vec: compression part 2 and done

This commit is contained in:
nym21
2025-03-14 16:00:47 +01:00
parent c459a3033d
commit a995eb2929
22 changed files with 799 additions and 524 deletions

View File

@@ -8,7 +8,11 @@ use crate::run::RunConfig;
pub fn query(params: QueryParams) -> color_eyre::Result<()> {
let config = RunConfig::import(None)?;
let mut indexer = Indexer::new(config.indexeddir(), config.check_collisions())?;
let mut indexer = Indexer::new(
config.indexeddir(),
config.compressed(),
config.check_collisions(),
)?;
indexer.import_vecs()?;
let mut computer = Computer::new(config.computeddir(), None);

View File

@@ -26,7 +26,11 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> {
let parser = brk_parser::Parser::new(config.blocksdir(), rpc);
let mut indexer = Indexer::new(config.indexeddir(), config.check_collisions())?;
let mut indexer = Indexer::new(
config.indexeddir(),
config.compressed(),
config.check_collisions(),
)?;
indexer.import_stores()?;
indexer.import_vecs()?;
@@ -103,6 +107,10 @@ pub struct RunConfig {
#[arg(short, long)]
mode: Option<Mode>,
/// Activate compression of datasets, set to true to save disk space or false to prioritize speed, default: true, saved
#[arg(short, long, value_name = "BOOL")]
compressed: Option<bool>,
/// Activate fetching prices from exchanges APIs and the computation of all related datasets, default: false, saved
#[arg(short, long, value_name = "BOOL")]
fetch: Option<bool>,
@@ -171,6 +179,10 @@ impl RunConfig {
config_saved.fetch = Some(fetch);
}
if let Some(compressed) = config_args.compressed.take() {
config_saved.compressed = Some(compressed);
}
if let Some(website) = config_args.website.take() {
config_saved.website = Some(website);
}
@@ -387,6 +399,10 @@ impl RunConfig {
self.fetch.is_some_and(|b| b)
}
pub fn compressed(&self) -> bool {
self.compressed.is_none_or(|b| b)
}
pub fn check_collisions(&self) -> bool {
self.check_collisions.is_some_and(|b| b)
}

View File

@@ -28,7 +28,7 @@ pub fn main() -> color_eyre::Result<()> {
let outputs_dir = Path::new("../../_outputs");
let mut indexer = Indexer::new(outputs_dir.join("indexed"), true)?;
let mut indexer = Indexer::new(outputs_dir.join("indexed"), true, true)?;
indexer.import_stores()?;
indexer.import_vecs()?;

View File

@@ -87,7 +87,7 @@ where
fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> {
let path = self.path_computed_version();
if version.validate(path.as_ref()).is_err() {
self.reset_file()?;
self.reset()?;
}
version.write(path.as_ref())?;
Ok(())

View File

@@ -2,12 +2,11 @@ use std::path::Path;
use brk_core::{default_bitcoin_path, dot_brk_path};
use brk_exit::Exit;
use brk_indexer::{Indexer, rpc::RpcApi};
use brk_indexer::Indexer;
use brk_parser::{
Parser,
rpc::{self},
};
use log::info;
fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
@@ -26,24 +25,12 @@ fn main() -> color_eyre::Result<()> {
let outputs = dot_brk_path().join("outputs");
let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), true)?;
let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), true, true)?;
indexer.import_stores()?;
indexer.import_vecs()?;
// loop {
let block_count = rpc.get_block_count()?;
info!("{block_count} blocks found.");
indexer.index(&parser, rpc, &exit)?;
info!("Waiting for new blocks...");
// while block_count == rpc.get_block_count()? {
// sleep(Duration::from_secs(1))
// }
// }
#[allow(unreachable_code)]
Ok(())
}

View File

@@ -38,21 +38,27 @@ pub struct Indexer {
vecs: Option<Vecs>,
stores: Option<Stores>,
check_collisions: bool,
compressed: Compressed,
}
impl Indexer {
pub fn new(indexes_dir: PathBuf, check_collisions: bool) -> color_eyre::Result<Self> {
pub fn new(
indexes_dir: PathBuf,
compressed: bool,
check_collisions: bool,
) -> color_eyre::Result<Self> {
setrlimit()?;
Ok(Self {
path: indexes_dir,
vecs: None,
stores: None,
compressed: Compressed::from(compressed),
check_collisions,
})
}
pub fn import_vecs(&mut self) -> color_eyre::Result<()> {
self.vecs = Some(Vecs::import(&self.path.join("vecs"))?);
self.vecs = Some(Vecs::import(&self.path.join("vecs"), self.compressed)?);
Ok(())
}
@@ -131,7 +137,7 @@ impl Indexer {
idxs.height = height;
let check_collisions = self.check_collisions && height > Height::new(886_000);
let check_collisions = self.check_collisions && height > Height::new(200_000);
let blockhash = BlockHash::from(blockhash);
let blockhash_prefix = BlockHashPrefix::from(&blockhash);

View File

@@ -23,7 +23,7 @@ where
pub fn import(path: &Path, version: Version, compressed: Compressed) -> brk_vec::Result<Self> {
let mut vec = brk_vec::StorableVec::forced_import(path, version, compressed)?;
vec.init_big_cache()?;
vec.enable_large_cache();
Ok(Self {
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
@@ -51,8 +51,7 @@ where
pub fn flush(&mut self, height: Height) -> io::Result<()> {
height.write(&self.path_height())?;
self.vec.flush()?;
self.vec.init_big_cache()
self.vec.flush()
}
}

View File

@@ -64,24 +64,24 @@ pub struct Vecs {
}
impl Vecs {
pub fn import(path: &Path) -> color_eyre::Result<Self> {
pub fn import(path: &Path, compressed: Compressed) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
Ok(Self {
addressindex_to_addresstype: StorableVec::import(
&path.join("addressindex_to_addresstype"),
Version::from(1),
Compressed::YES,
compressed,
)?,
addressindex_to_addresstypeindex: StorableVec::import(
&path.join("addressindex_to_addresstypeindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
addressindex_to_height: StorableVec::import(
&path.join("addressindex_to_height"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_blockhash: StorableVec::import(
&path.join("height_to_blockhash"),
@@ -91,102 +91,102 @@ impl Vecs {
height_to_difficulty: StorableVec::import(
&path.join("height_to_difficulty"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_addressindex: StorableVec::import(
&path.join("height_to_first_addressindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_emptyindex: StorableVec::import(
&path.join("height_to_first_emptyindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_multisigindex: StorableVec::import(
&path.join("height_to_first_multisigindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_opreturnindex: StorableVec::import(
&path.join("height_to_first_opreturnindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_pushonlyindex: StorableVec::import(
&path.join("height_to_first_pushonlyindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_txindex: StorableVec::import(
&path.join("height_to_first_txindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_txinindex: StorableVec::import(
&path.join("height_to_first_txinindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_txoutindex: StorableVec::import(
&path.join("height_to_first_txoutindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_unknownindex: StorableVec::import(
&path.join("height_to_first_unkownindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2pk33index: StorableVec::import(
&path.join("height_to_first_p2pk33index"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2pk65index: StorableVec::import(
&path.join("height_to_first_p2pk65index"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2pkhindex: StorableVec::import(
&path.join("height_to_first_p2pkhindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2shindex: StorableVec::import(
&path.join("height_to_first_p2shindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2trindex: StorableVec::import(
&path.join("height_to_first_p2trindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2wpkhindex: StorableVec::import(
&path.join("height_to_first_p2wpkhindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_first_p2wshindex: StorableVec::import(
&path.join("height_to_first_p2wshindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_size: StorableVec::import(
&path.join("height_to_size"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_timestamp: StorableVec::import(
&path.join("height_to_timestamp"),
Version::from(1),
Compressed::YES,
compressed,
)?,
height_to_weight: StorableVec::import(
&path.join("height_to_weight"),
Version::from(1),
Compressed::YES,
compressed,
)?,
p2pk33index_to_p2pk33addressbytes: StorableVec::import(
&path.join("p2pk33index_to_p2pk33addressbytes"),
@@ -226,7 +226,7 @@ impl Vecs {
txindex_to_first_txinindex: StorableVec::import(
&path.join("txindex_to_first_txinindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txindex_to_first_txoutindex: StorableVec::import(
&path.join("txindex_to_first_txoutindex"),
@@ -236,12 +236,12 @@ impl Vecs {
txindex_to_height: StorableVec::import(
&path.join("txindex_to_height"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txindex_to_locktime: StorableVec::import(
&path.join("txindex_to_locktime"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txindex_to_txid: StorableVec::import(
&path.join("txindex_to_txid"),
@@ -251,37 +251,37 @@ impl Vecs {
txindex_to_base_size: StorableVec::import(
&path.join("txindex_to_base_size"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txindex_to_total_size: StorableVec::import(
&path.join("txindex_to_total_size"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txindex_to_is_explicitly_rbf: StorableVec::import(
&path.join("txindex_to_is_explicitly_rbf"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txindex_to_txversion: StorableVec::import(
&path.join("txindex_to_txversion"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txinindex_to_txoutindex: StorableVec::import(
&path.join("txinindex_to_txoutindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txoutindex_to_addressindex: StorableVec::import(
&path.join("txoutindex_to_addressindex"),
Version::from(1),
Compressed::YES,
compressed,
)?,
txoutindex_to_value: StorableVec::import(
&path.join("txoutindex_to_value"),
Version::from(1),
Compressed::YES,
compressed,
)?,
})
}

View File

@@ -9,7 +9,7 @@ pub fn main() -> color_eyre::Result<()> {
let outputs_dir = Path::new("../../_outputs");
let mut indexer = Indexer::new(outputs_dir.join("indexed"), true)?;
let mut indexer = Indexer::new(outputs_dir.join("indexed"), true, true)?;
indexer.import_vecs()?;
let mut computer = Computer::new(outputs_dir.join("computed"), None);

View File

@@ -21,7 +21,7 @@ color-eyre = { workspace = true }
jiff = { workspace = true }
log = { workspace = true }
minreq = { workspace = true }
oxc = { version = "0.58.0", features = ["codegen", "minifier"] }
oxc = { version = "0.58.1", features = ["codegen", "minifier"] }
serde = { workspace = true }
tokio = { version = "1.44.1", features = ["full"] }
tower-http = { version = "0.6.2", features = ["compression-full"] }

View File

@@ -31,7 +31,7 @@ pub fn main() -> color_eyre::Result<()> {
let outputs_dir = Path::new("../../_outputs");
let mut indexer = Indexer::new(outputs_dir.join("indexed"), true)?;
let mut indexer = Indexer::new(outputs_dir.join("indexed"), true, true)?;
indexer.import_stores()?;
indexer.import_vecs()?;

View File

@@ -1 +1,2 @@
/vec
_lib.rs

View File

@@ -12,9 +12,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
(0..21_u32).for_each(|v| {
vec.push(v);
});
dbg!(vec.get(0)?); // Some(0)
dbg!(vec.get(20)?); // Some(0)
dbg!(vec.get(21)?); // None
dbg!(vec.get(0)?);
dbg!(vec.get(20)?);
dbg!(vec.get(21)?);
vec.flush()?;
}
@@ -23,13 +23,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut vec: StorableVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::from(1), Compressed::YES)?;
dbg!(vec.get(0)?); // 0
dbg!(vec.read(0)?); // 0
dbg!(vec.read(1)?); // 0
dbg!(vec.read(2)?); // 0
dbg!(vec.read(20)?); // 0
dbg!(vec.get(20)?); // 0
dbg!(vec.read(0)?); // 0
dbg!(vec.get(0)?);
dbg!(vec.read(0)?);
dbg!(vec.read(1)?);
dbg!(vec.read(2)?);
dbg!(vec.read(20)?);
dbg!(vec.get(20)?);
dbg!(vec.read(0)?);
vec.push(21);
vec.push(22);
@@ -45,18 +45,18 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut vec: StorableVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::from(1), Compressed::YES)?;
vec.init_big_cache()?;
vec.enable_large_cache();
dbg!(vec.get(0)?); // 0
dbg!(vec.get(20)?); // 0
dbg!(vec.get(21)?); // 0
dbg!(vec.get(22)?); // 0
dbg!(vec.get(0)?);
dbg!(vec.get(20)?);
dbg!(vec.get(21)?);
dbg!(vec.get(22)?);
vec.truncate_if_needed(14)?;
dbg!(vec.get(0)?); // 0
dbg!(vec.get(5)?); // 0
dbg!(vec.get(20)?); // 0
dbg!(vec.get(0)?);
dbg!(vec.get(5)?);
dbg!(vec.get(20)?);
vec.iter(|(_, v)| {
dbg!(v);

View File

@@ -1,5 +1,7 @@
mod error;
mod value;
mod values;
pub use error::*;
pub use value::*;
pub use values::*;

View File

@@ -0,0 +1,83 @@
use std::ops::Range;

use memmap2::Mmap;
use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};

use crate::MAX_PAGE_SIZE;

use super::Result;

/// One page of stored values, either decoded into owned memory
/// (`Owned`) or viewed zero-copy through a memory map of the raw file
/// (`Ref`).
#[derive(Debug)]
pub enum Values<T> {
    /// Decoded (e.g. decompressed) page held in memory.
    Owned(Box<[T]>),
    /// Zero-copy view into the raw on-disk page.
    Ref(Box<Mmap>),
}

impl<T> Values<T> {
    /// Number of values per page.
    /// NOTE(review): assumes `0 < size_of::<T>() <= MAX_PAGE_SIZE`;
    /// otherwise this divides by zero or truncates to 0 — confirm upstream.
    const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
    const SIZE_OF_T: usize = size_of::<T>();

    /// Returns a reference to the value at the given *global* `index`,
    /// reduced modulo the page size to a within-page offset.
    ///
    /// Returns `Ok(None)` when the offset lies past the end of the page.
    /// Previously the `Ref` arm used `&m[range]`, which panics on a
    /// short (e.g. last, partially-filled) page; it now bounds-checks
    /// like the `Owned` arm does via `slice::get`.
    ///
    /// # Errors
    /// Propagates the zerocopy conversion error when the mapped bytes
    /// are not a valid `T`.
    pub fn get(&self, index: usize) -> Result<Option<&T>>
    where
        T: TryFromBytes + IntoBytes + Immutable + KnownLayout,
    {
        let index = Self::index_to_decoded_index(index);

        Ok(match self {
            Self::Owned(slice) => slice.get(index),
            Self::Ref(mmap) => {
                let range = Self::index_to_byte_range(index);
                // Bounds-checked slice: out-of-range yields `None`
                // instead of panicking, matching the `Owned` arm.
                match mmap.get(range) {
                    Some(source) => Some(T::try_ref_from_bytes(source)?),
                    None => None,
                }
            }
        })
    }

    /// Borrows the decoded values.
    ///
    /// # Panics
    /// Panics if this page is the `Ref` (mmap-backed) variant.
    pub fn as_arr(&self) -> &[T] {
        match self {
            Self::Owned(a) => a,
            Self::Ref(_) => unreachable!(),
        }
    }

    /// Borrows the underlying memory map.
    ///
    /// # Panics
    /// Panics if this page is the `Owned` (decoded) variant.
    pub fn as_mmap(&self) -> &Mmap {
        match self {
            Self::Owned(_) => unreachable!(),
            Self::Ref(m) => m,
        }
    }

    /// Byte range of the value at `index` within the raw page.
    #[inline]
    fn index_to_byte_range(index: usize) -> Range<usize> {
        let index = Self::index_to_byte_index(index) as usize;
        index..(index + Self::SIZE_OF_T)
    }

    /// Byte offset of the value at `index` within the raw page.
    #[inline]
    fn index_to_byte_index(index: usize) -> u64 {
        (index * Self::SIZE_OF_T) as u64
    }

    /// Maps a global value index to its offset within a single page.
    #[inline(always)]
    fn index_to_decoded_index(index: usize) -> usize {
        index % Self::PER_PAGE
    }
}

impl<T> From<Box<[T]>> for Values<T> {
    fn from(value: Box<[T]>) -> Self {
        Self::Owned(value)
    }
}

impl<T> From<Mmap> for Values<T> {
    fn from(value: Mmap) -> Self {
        Self::Ref(Box::new(value))
    }
}

impl<T> Default for Values<T> {
    /// An empty owned page.
    fn default() -> Self {
        Self::Owned(vec![].into_boxed_slice())
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +0,0 @@
use std::{fs::File, sync::OnceLock};
use super::CompressedPagesMetadata;
// A decoded page: `(page_index, values)` when loaded, `None` otherwise.
type CompressedPage<T> = Option<(usize, Box<[T]>)>;

// Storage backend of a storable vec: either raw (uncompressed) file
// access or compressed pages decoded on demand.
// NOTE(review): this file is deleted by this commit (diff header
// `@@ -1,20 +0,0 @@`); kept here only as the pre-change representation.
pub enum Back<T> {
    // Uncompressed backend: values read straight from the file / mmaps.
    Raw {
        // Lazily-initialized mmap slots — presumably one per page; TODO confirm.
        raw_pages: Vec<OnceLock<Box<memmap2::Mmap>>>,
        raw_page: memmap2::Mmap,
        file: File,
        // Current write position in `file` — assumed byte offset; TODO confirm.
        file_position: u64,
        // Scratch buffer for pending writes.
        buf: Vec<u8>,
    },
    // Compressed backend: pages are decompressed into memory on access.
    Compressed {
        // Lazily-decoded pages, when page caching is enabled.
        decoded_pages: Option<Vec<OnceLock<Box<[T]>>>>,
        // Most recently decoded single page.
        decoded_page: CompressedPage<T>,
        // On-disk layout metadata for the compressed pages.
        pages: CompressedPagesMetadata,
    },
}

View File

@@ -1,15 +1,13 @@
mod back;
mod compressed;
mod compressed_page_meta;
mod compressed_pages_meta;
mod length;
mod page;
mod pages;
mod unsafe_slice;
mod version;
pub use back::*;
pub use compressed::*;
pub use compressed_page_meta::*;
pub use compressed_pages_meta::*;
pub use length::*;
pub use page::*;
pub use pages::*;
pub use unsafe_slice::*;
pub use version::*;

View File

@@ -9,7 +9,11 @@ pub trait AnyStorableVec: Send + Sync {
fn index_type_to_string(&self) -> &str;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn collect_range_values(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<serde_json::Value>>;
fn collect_range_values(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>>;
fn flush(&mut self) -> io::Result<()>;
}
@@ -38,7 +42,11 @@ where
self.flush()
}
fn collect_range_values(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<serde_json::Value>> {
fn collect_range_values(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>> {
Ok(self
.collect_range(from, to)?
.into_iter()