diff --git a/Cargo.lock b/Cargo.lock index 534a176e3..a4ef43bac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1756,9 +1756,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.0" +version = "1.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad" +checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc" [[package]] name = "outref" @@ -1780,9 +1780,9 @@ checksum = "1036865bb9422d3300cf723f657c2851d0e9ab12567854b1f4eba3d77decf564" [[package]] name = "oxc" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68df167eecf9d9d0c8f5e0a7daa7cc5fb1627df45f6e285b3e6726a7325dd458" +checksum = "45dac9dff4aa3da5b483ec7f7180b0af4a82882c3b35e67c8f9221e117bf0c93" dependencies = [ "oxc_allocator", "oxc_ast", @@ -1823,9 +1823,9 @@ dependencies = [ [[package]] name = "oxc_allocator" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80925d1f320efc034e1af4b88057c7a115d5163c73c5543000cb1d9d40097457" +checksum = "d9e49310ddfd3bc659d60b9f72bb0fbdb7b23f9bca5b4906056bf1d7d1a502d2" dependencies = [ "allocator-api2", "assert-unchecked", @@ -1837,9 +1837,9 @@ dependencies = [ [[package]] name = "oxc_ast" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7d38367501a804dee978311c96d6740c65369347fbab4bb9def75c98faab640" +checksum = "54af74d151e1a61d57ec8699f1e8b6729d3817fe763c2ecbacb945822998ea3e" dependencies = [ "bitflags", "cow-utils", @@ -1854,9 +1854,9 @@ dependencies = [ [[package]] name = "oxc_ast_macros" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab500f253c07b126d10b5b707ee6f40abdb1676f31a7e896684419ce1214f333" +checksum = "d85874efff8c6b1f8b3adf8f3d8624e52ffab8a44da1e2e792de6a0303a9abb8" dependencies = [ "proc-macro2", "quote", @@ -1865,9 +1865,9 @@ dependencies = [ [[package]] name = "oxc_ast_visit" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ff7e8da546667e4f5e4e7c811518c069cdb3e9126b9496e3678401da2150947" +checksum = "03bdf81b8db7952a841d15141e9efc40c8dd01720b9f1779b37f6d3ae5c9e7e4" dependencies = [ "oxc_allocator", "oxc_ast", @@ -1877,9 +1877,9 @@ dependencies = [ [[package]] name = "oxc_cfg" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e0f65c0e170808c197ef205a442d50a5e4992e9247d8da986c4c591a88d557c" +checksum = "bc06be42ba66c3ab03fb82d973038ee8f5806cf8a7cd23beaa05f262cd63eee3" dependencies = [ "bitflags", "itertools", @@ -1892,9 +1892,9 @@ dependencies = [ [[package]] name = "oxc_codegen" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5a6662210485fc557bd26bcbfa5441fac4d74d632297caee5e01d886bf17599" +checksum = "fd09d5789bd90a760aa1e1a634fdd20e2b43981b590639264703e6760d33e5e6" dependencies = [ "bitflags", "cow-utils", @@ -1913,9 +1913,9 @@ dependencies = [ [[package]] name = "oxc_data_structures" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0863880597e1723b2c51d4212294010d3e5f600d82c6c2cb8aaf7e6bd9400732" +checksum = "f92e1c4325cef51dda4296fd92302a6c3117325609efb81ac3f0996e7e44977b" dependencies = [ "assert-unchecked", "ropey", @@ -1923,9 +1923,9 @@ dependencies = [ [[package]] name = "oxc_diagnostics" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c180f07354c135fc434766b905135443d88f2d8b392c0ec22c468df19db889ce" +checksum = "4866163037145687f7197fb70bad1fd0c109e9e2f70659f3eb7f038cd3168bd6" dependencies = [ "cow-utils", "oxc-miette", @@ -1933,9 +1933,9 @@ dependencies = [ [[package]] name = "oxc_ecmascript" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1539cce5d32af3b1bb496e219272c3c85ac1719b7c2ba08b98c4688c71f2067e" +checksum = "d0fbfb5f543a10fb1264a5c24731ab700b5e6f7bedcc2c39792267039824216c" dependencies = [ "cow-utils", "num-bigint", @@ -1947,9 +1947,9 @@ dependencies = [ [[package]] name = "oxc_estree" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fda7b4e0bc5b16cc64eb19c519f13982968211bb35a7cd7ec50188f9e020cb" +checksum = "84fade441037d1c4f5929f278f7ed74bc5b5928f35f5bc27ffb569512762622d" [[package]] name = "oxc_index" @@ -1959,9 +1959,9 @@ checksum = "2fa07b0cfa997730afed43705766ef27792873fdf5215b1391949fec678d2392" [[package]] name = "oxc_mangler" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a0c5e897ed00988f4a8c665344adef77337f2f2caf07e51559f52fa85d146be" +checksum = "1bfbf4a5f8bb51ec1b97100b4faaf50379f97d635bba0a39d95aa613c51fe630" dependencies = [ "fixedbitset", "itertools", @@ -1976,9 +1976,9 @@ dependencies = [ [[package]] name = "oxc_minifier" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb6c12dbb296d6a07c433821c282c17d27118781751df0b6c788b0da7e17810b" +checksum = "ea5c83bfaef4a1d5d8ffdf2b10bceadd57c8b15ccade74ae7a6672c6af31d095" dependencies = [ "cow-utils", "oxc_allocator", @@ -1998,9 +1998,9 @@ dependencies = [ [[package]] name = "oxc_parser" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb503b32077fb3ee4466eae1d38e5360bc7bbe5d3e0d48b23bb7d26e556386de" +checksum = "6b702c0462e5a67c845d7fafe236d8bb253ae0f4c8ff44a980b916a154862534" dependencies = [ "assert-unchecked", "bitflags", @@ -2021,9 +2021,9 @@ dependencies = [ [[package]] name = "oxc_regular_expression" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6578e43fd0f44b6c1f00b9dc7aac0d65b8e85abb5b576aedd2f56ec37430837b" +checksum = "b672d8601f80b9828342e0a38f6cc6735a04e2e4251c0c46476df55842df711a" dependencies = [ "oxc_allocator", "oxc_ast_macros", @@ -2037,9 +2037,9 @@ dependencies = [ [[package]] name = "oxc_semantic" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0dd10e2c69820e02ad4f9a55d53484bd353567c4784b72a11843a81d917ee04" +checksum = "ae29a491046d24b7cbac9ce0602e1c30c52f7065013e7212d716f283800a92ab" dependencies = [ "assert-unchecked", "itertools", @@ -2074,9 +2074,9 @@ dependencies = [ [[package]] name = "oxc_span" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7492a8a4c6e018a6156b6b8a69ff5ab48cc034f895563b40902d1a0e78bef04" +checksum = "c059e07f57c3299f54dfed3ba2f58dcc183ad68102d1186f8a4c5f546a2b9c5e" dependencies = [ "compact_str", "oxc-miette", @@ -2087,9 +2087,9 @@ dependencies = [ [[package]] name = "oxc_syntax" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35c96f9478b96246f760b5cc54abf4044233fb6a52781e9cdc635b6dc5734752" +checksum = "2888043d4a47ee54903a229f3cfbab1126223c9b819505e900485993467a04d1" dependencies = [ "assert-unchecked", "bitflags", @@ -2108,9 +2108,9 @@ dependencies = [ [[package]] name = "oxc_traverse" -version = "0.58.0" +version = "0.58.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bf9f3c23fc9fdf3f64d1e9b103b3aa54f91e3297ab201a922e1b66767350ce" +checksum = "6b09630f1f467e91901e1451d651cb8fef0cab7b3facedecc022d6065842cd8a" dependencies = [ "compact_str", "itoa", @@ -2844,11 +2844,10 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.18.0" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c317e0a526ee6120d8dabad239c8dadca62b24b6f168914bbbc8e2fb1f0e567" +checksum = "488960f40a3fd53d72c2a29a58722561dee8afdd175bd88e3db4677d7b2ba600" dependencies = [ - "cfg-if", "fastrand", "getrandom 0.3.1", "once_cell", @@ -2979,9 +2978,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" dependencies = [ "bytes", "futures-core", diff --git a/crates/brk_cli/src/query.rs b/crates/brk_cli/src/query.rs index 58bd3fa0e..326c992c2 100644 --- a/crates/brk_cli/src/query.rs +++ b/crates/brk_cli/src/query.rs @@ -8,7 +8,11 @@ use crate::run::RunConfig; pub fn query(params: QueryParams) -> color_eyre::Result<()> { let config = RunConfig::import(None)?; - let mut indexer = Indexer::new(config.indexeddir(), config.check_collisions())?; + let mut indexer = Indexer::new( + config.indexeddir(), + config.compressed(), + config.check_collisions(), + )?; indexer.import_vecs()?; let mut computer = Computer::new(config.computeddir(), None); diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs index 1ae67bed1..a850caa7c 100644 --- a/crates/brk_cli/src/run.rs +++ b/crates/brk_cli/src/run.rs @@ -26,7 +26,11 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> { let parser = brk_parser::Parser::new(config.blocksdir(), rpc); - let mut indexer = Indexer::new(config.indexeddir(), config.check_collisions())?; + let mut indexer = Indexer::new( + config.indexeddir(), + config.compressed(), + config.check_collisions(), + )?; indexer.import_stores()?; indexer.import_vecs()?; @@ -103,6 +107,10 @@ pub struct RunConfig { #[arg(short, long)] mode: Option, + /// Activate compression of datasets, set to true to save disk space or false if prioritize speed, default: true, saved + #[arg(short, long, value_name = "BOOL")] + compressed: Option, + /// Activate fetching prices from exchanges APIs and the computation of all related datasets, default: false, saved #[arg(short, long, value_name = "BOOL")] fetch: Option, @@ -171,6 +179,10 @@ impl RunConfig { config_saved.fetch = Some(fetch); } + if let Some(compressed) = config_args.compressed.take() { + config_saved.compressed = Some(compressed); + } + if let Some(website) = config_args.website.take() { config_saved.website = Some(website); } @@ -387,6 +399,10 @@ impl RunConfig { self.fetch.is_some_and(|b| b) } + pub fn compressed(&self) -> bool { + self.compressed.is_none_or(|b| b) + } + pub fn check_collisions(&self) -> bool { self.check_collisions.is_some_and(|b| b) } diff --git a/crates/brk_computer/examples/main.rs b/crates/brk_computer/examples/main.rs index 062787e96..b37b9241c 100644 --- a/crates/brk_computer/examples/main.rs +++ b/crates/brk_computer/examples/main.rs @@ -28,7 +28,7 @@ pub fn main() -> color_eyre::Result<()> { let outputs_dir = Path::new("../../_outputs"); - let mut indexer = Indexer::new(outputs_dir.join("indexed"), true)?; + let mut indexer = Indexer::new(outputs_dir.join("indexed"), true, true)?; indexer.import_stores()?; indexer.import_vecs()?; diff --git a/crates/brk_computer/src/storage/vecs/base.rs b/crates/brk_computer/src/storage/vecs/base.rs index aee9dda16..46e407576 100644 --- a/crates/brk_computer/src/storage/vecs/base.rs +++ b/crates/brk_computer/src/storage/vecs/base.rs @@ -87,7 +87,7 @@ where fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> { let path = self.path_computed_version(); if version.validate(path.as_ref()).is_err() { - self.reset_file()?; + self.reset()?; } version.write(path.as_ref())?; Ok(()) diff --git a/crates/brk_indexer/examples/main.rs b/crates/brk_indexer/examples/main.rs index 69ff767b2..d4cbb3a33 100644 --- a/crates/brk_indexer/examples/main.rs +++ b/crates/brk_indexer/examples/main.rs @@ -2,12 +2,11 @@ use std::path::Path; use brk_core::{default_bitcoin_path, dot_brk_path}; use brk_exit::Exit; -use brk_indexer::{Indexer, rpc::RpcApi}; +use brk_indexer::Indexer; use brk_parser::{ Parser, rpc::{self}, }; -use log::info; fn main() -> color_eyre::Result<()> { color_eyre::install()?; @@ -26,24 +25,12 @@ fn main() -> color_eyre::Result<()> { let outputs = dot_brk_path().join("outputs"); - let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), true)?; + let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), true, true)?; + indexer.import_stores()?; indexer.import_vecs()?; - // loop { - let block_count = rpc.get_block_count()?; - - info!("{block_count} blocks found."); - indexer.index(&parser, rpc, &exit)?; - info!("Waiting for new blocks..."); - - // while block_count == rpc.get_block_count()? { - // sleep(Duration::from_secs(1)) - // } - // } - - #[allow(unreachable_code)] Ok(()) } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 7e89e53cd..df4bc143d 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -38,21 +38,27 @@ pub struct Indexer { vecs: Option, stores: Option, check_collisions: bool, + compressed: Compressed, } impl Indexer { - pub fn new(indexes_dir: PathBuf, check_collisions: bool) -> color_eyre::Result { + pub fn new( + indexes_dir: PathBuf, + compressed: bool, + check_collisions: bool, + ) -> color_eyre::Result { setrlimit()?; Ok(Self { path: indexes_dir, vecs: None, stores: None, + compressed: Compressed::from(compressed), check_collisions, }) } pub fn import_vecs(&mut self) -> color_eyre::Result<()> { - self.vecs = Some(Vecs::import(&self.path.join("vecs"))?); + self.vecs = Some(Vecs::import(&self.path.join("vecs"), self.compressed)?); Ok(()) } @@ -131,7 +137,7 @@ impl Indexer { idxs.height = height; - let check_collisions = self.check_collisions && height > Height::new(886_000); + let check_collisions = self.check_collisions && height > Height::new(200_000); let blockhash = BlockHash::from(blockhash); let blockhash_prefix = BlockHashPrefix::from(&blockhash); diff --git a/crates/brk_indexer/src/vecs/base.rs b/crates/brk_indexer/src/vecs/base.rs index 4d515cf2b..f6e8d5198 100644 --- a/crates/brk_indexer/src/vecs/base.rs +++ b/crates/brk_indexer/src/vecs/base.rs @@ -23,7 +23,7 @@ where pub fn import(path: &Path, version: Version, compressed: Compressed) -> brk_vec::Result { let mut vec = brk_vec::StorableVec::forced_import(path, version, compressed)?; - vec.init_big_cache()?; + vec.enable_large_cache(); Ok(Self { height: Height::try_from(Self::path_height_(path).as_path()).ok(), @@ -51,8 +51,7 @@ where pub fn flush(&mut self, height: Height) -> io::Result<()> { height.write(&self.path_height())?; - self.vec.flush()?; - self.vec.init_big_cache() + self.vec.flush() } } diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 37d5f3349..1bf3c813f 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -64,24 +64,24 @@ pub struct Vecs { } impl Vecs { - pub fn import(path: &Path) -> color_eyre::Result { + pub fn import(path: &Path, compressed: Compressed) -> color_eyre::Result { fs::create_dir_all(path)?; Ok(Self { addressindex_to_addresstype: StorableVec::import( &path.join("addressindex_to_addresstype"), Version::from(1), - Compressed::YES, + compressed, )?, addressindex_to_addresstypeindex: StorableVec::import( &path.join("addressindex_to_addresstypeindex"), Version::from(1), - Compressed::YES, + compressed, )?, addressindex_to_height: StorableVec::import( &path.join("addressindex_to_height"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_blockhash: StorableVec::import( &path.join("height_to_blockhash"), @@ -91,102 +91,102 @@ impl Vecs { height_to_difficulty: StorableVec::import( &path.join("height_to_difficulty"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_addressindex: StorableVec::import( &path.join("height_to_first_addressindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_emptyindex: StorableVec::import( &path.join("height_to_first_emptyindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_multisigindex: StorableVec::import( &path.join("height_to_first_multisigindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_opreturnindex: StorableVec::import( &path.join("height_to_first_opreturnindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_pushonlyindex: StorableVec::import( &path.join("height_to_first_pushonlyindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_txindex: StorableVec::import( &path.join("height_to_first_txindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_txinindex: StorableVec::import( &path.join("height_to_first_txinindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_txoutindex: StorableVec::import( &path.join("height_to_first_txoutindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_unknownindex: StorableVec::import( &path.join("height_to_first_unkownindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2pk33index: StorableVec::import( &path.join("height_to_first_p2pk33index"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2pk65index: StorableVec::import( &path.join("height_to_first_p2pk65index"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2pkhindex: StorableVec::import( &path.join("height_to_first_p2pkhindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2shindex: StorableVec::import( &path.join("height_to_first_p2shindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2trindex: StorableVec::import( &path.join("height_to_first_p2trindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2wpkhindex: StorableVec::import( &path.join("height_to_first_p2wpkhindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_first_p2wshindex: StorableVec::import( &path.join("height_to_first_p2wshindex"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_size: StorableVec::import( &path.join("height_to_size"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_timestamp: StorableVec::import( &path.join("height_to_timestamp"), Version::from(1), - Compressed::YES, + compressed, )?, height_to_weight: StorableVec::import( &path.join("height_to_weight"), Version::from(1), - Compressed::YES, + compressed, )?, p2pk33index_to_p2pk33addressbytes: StorableVec::import( &path.join("p2pk33index_to_p2pk33addressbytes"), @@ -226,7 +226,7 @@ impl Vecs { txindex_to_first_txinindex: StorableVec::import( &path.join("txindex_to_first_txinindex"), Version::from(1), - Compressed::YES, + compressed, )?, txindex_to_first_txoutindex: StorableVec::import( &path.join("txindex_to_first_txoutindex"), @@ -236,12 +236,12 @@ impl Vecs { txindex_to_height: StorableVec::import( &path.join("txindex_to_height"), Version::from(1), - Compressed::YES, + compressed, )?, txindex_to_locktime: StorableVec::import( &path.join("txindex_to_locktime"), Version::from(1), - Compressed::YES, + compressed, )?, txindex_to_txid: StorableVec::import( &path.join("txindex_to_txid"), @@ -251,37 +251,37 @@ impl Vecs { txindex_to_base_size: StorableVec::import( &path.join("txindex_to_base_size"), Version::from(1), - Compressed::YES, + compressed, )?, txindex_to_total_size: StorableVec::import( &path.join("txindex_to_total_size"), Version::from(1), - Compressed::YES, + compressed, )?, txindex_to_is_explicitly_rbf: StorableVec::import( &path.join("txindex_to_is_explicitly_rbf"), Version::from(1), - Compressed::YES, + compressed, )?, txindex_to_txversion: StorableVec::import( &path.join("txindex_to_txversion"), Version::from(1), - Compressed::YES, + compressed, )?, txinindex_to_txoutindex: StorableVec::import( &path.join("txinindex_to_txoutindex"), Version::from(1), - Compressed::YES, + compressed, )?, txoutindex_to_addressindex: StorableVec::import( &path.join("txoutindex_to_addressindex"), Version::from(1), - Compressed::YES, + compressed, )?, txoutindex_to_value: StorableVec::import( &path.join("txoutindex_to_value"), Version::from(1), - Compressed::YES, + compressed, )?, }) } diff --git a/crates/brk_query/examples/main.rs b/crates/brk_query/examples/main.rs index 6539099dc..81f22a45b 100644 --- a/crates/brk_query/examples/main.rs +++ b/crates/brk_query/examples/main.rs @@ -9,7 +9,7 @@ pub fn main() -> color_eyre::Result<()> { let outputs_dir = Path::new("../../_outputs"); - let mut indexer = Indexer::new(outputs_dir.join("indexed"), true)?; + let mut indexer = Indexer::new(outputs_dir.join("indexed"), true, true)?; indexer.import_vecs()?; let mut computer = Computer::new(outputs_dir.join("computed"), None); diff --git a/crates/brk_server/Cargo.toml b/crates/brk_server/Cargo.toml index c196245b9..95fca48d8 100644 --- a/crates/brk_server/Cargo.toml +++ b/crates/brk_server/Cargo.toml @@ -21,7 +21,7 @@ color-eyre = { workspace = true } jiff = { workspace = true } log = { workspace = true } minreq = { workspace = true } -oxc = { version = "0.58.0", features = ["codegen", "minifier"] } +oxc = { version = "0.58.1", features = ["codegen", "minifier"] } serde = { workspace = true } tokio = { version = "1.44.1", features = ["full"] } tower-http = { version = "0.6.2", features = ["compression-full"] } diff --git a/crates/brk_server/examples/main.rs b/crates/brk_server/examples/main.rs index a98f9bd9d..842f53865 100644 --- a/crates/brk_server/examples/main.rs +++ b/crates/brk_server/examples/main.rs @@ -31,7 +31,7 @@ pub fn main() -> color_eyre::Result<()> { let outputs_dir = Path::new("../../_outputs"); - let mut indexer = Indexer::new(outputs_dir.join("indexed"), true)?; + let mut indexer = Indexer::new(outputs_dir.join("indexed"), true, true)?; indexer.import_stores()?; indexer.import_vecs()?; diff --git a/crates/brk_vec/.gitignore b/crates/brk_vec/.gitignore index 709561520..d1d84f03b 100644 --- a/crates/brk_vec/.gitignore +++ b/crates/brk_vec/.gitignore @@ -1 +1,2 @@ /vec +_lib.rs diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs index 53a1d3926..660d95013 100644 --- a/crates/brk_vec/examples/main.rs +++ b/crates/brk_vec/examples/main.rs @@ -12,9 +12,9 @@ fn main() -> Result<(), Box> { (0..21_u32).for_each(|v| { vec.push(v); }); - dbg!(vec.get(0)?); // Some(0) - dbg!(vec.get(20)?); // Some(0) - dbg!(vec.get(21)?); // None + dbg!(vec.get(0)?); + dbg!(vec.get(20)?); + dbg!(vec.get(21)?); vec.flush()?; } @@ -23,13 +23,13 @@ fn main() -> Result<(), Box> { let mut vec: StorableVec = StorableVec::forced_import(Path::new("./vec"), Version::from(1), Compressed::YES)?; - dbg!(vec.get(0)?); // 0 - dbg!(vec.read(0)?); // 0 - dbg!(vec.read(1)?); // 0 - dbg!(vec.read(2)?); // 0 - dbg!(vec.read(20)?); // 0 - dbg!(vec.get(20)?); // 0 - dbg!(vec.read(0)?); // 0 + dbg!(vec.get(0)?); + dbg!(vec.read(0)?); + dbg!(vec.read(1)?); + dbg!(vec.read(2)?); + dbg!(vec.read(20)?); + dbg!(vec.get(20)?); + dbg!(vec.read(0)?); vec.push(21); vec.push(22); @@ -45,18 +45,18 @@ fn main() -> Result<(), Box> { let mut vec: StorableVec = StorableVec::forced_import(Path::new("./vec"), Version::from(1), Compressed::YES)?; - vec.init_big_cache()?; + vec.enable_large_cache(); - dbg!(vec.get(0)?); // 0 - dbg!(vec.get(20)?); // 0 - dbg!(vec.get(21)?); // 0 - dbg!(vec.get(22)?); // 0 + dbg!(vec.get(0)?); + dbg!(vec.get(20)?); + dbg!(vec.get(21)?); + dbg!(vec.get(22)?); vec.truncate_if_needed(14)?; - dbg!(vec.get(0)?); // 0 - dbg!(vec.get(5)?); // 0 - dbg!(vec.get(20)?); // 0 + dbg!(vec.get(0)?); + dbg!(vec.get(5)?); + dbg!(vec.get(20)?); vec.iter(|(_, v)| { dbg!(v); diff --git a/crates/brk_vec/src/enums/mod.rs b/crates/brk_vec/src/enums/mod.rs index 9198ca8e0..acf1f9efb 100644 --- a/crates/brk_vec/src/enums/mod.rs +++ b/crates/brk_vec/src/enums/mod.rs @@ -1,5 +1,7 @@ mod error; mod value; +mod values; pub use error::*; pub use value::*; +pub use values::*; diff --git a/crates/brk_vec/src/enums/values.rs b/crates/brk_vec/src/enums/values.rs new file mode 100644 index 000000000..dd3d2bed2 --- /dev/null +++ b/crates/brk_vec/src/enums/values.rs @@ -0,0 +1,83 @@ +use std::ops::Range; + +use memmap2::Mmap; +use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; + +use crate::MAX_PAGE_SIZE; + +use super::Result; + +#[derive(Debug)] +pub enum Values { + Owned(Box<[T]>), + Ref(Box), +} + +impl Values { + const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; + const SIZE_OF_T: usize = size_of::(); + + pub fn get(&self, index: usize) -> Result> + where + T: TryFromBytes + IntoBytes + Immutable + KnownLayout, + { + let index = Self::index_to_decoded_index(index); + + Ok(match self { + Self::Owned(a) => a.get(index), + Self::Ref(m) => { + let range = Self::index_to_byte_range(index); + let source = &m[range]; + Some(T::try_ref_from_bytes(source)?) + } + }) + } + + pub fn as_arr(&self) -> &[T] { + match self { + Self::Owned(a) => a, + Self::Ref(_) => unreachable!(), + } + } + + pub fn as_mmap(&self) -> &Mmap { + match self { + Self::Owned(_) => unreachable!(), + Self::Ref(m) => m, + } + } + + #[inline] + fn index_to_byte_range(index: usize) -> Range { + let index = Self::index_to_byte_index(index) as usize; + index..(index + Self::SIZE_OF_T) + } + + #[inline] + fn index_to_byte_index(index: usize) -> u64 { + (index * Self::SIZE_OF_T) as u64 + } + + #[inline(always)] + fn index_to_decoded_index(index: usize) -> usize { + index % Self::PER_PAGE + } +} + +impl From> for Values { + fn from(value: Box<[T]>) -> Self { + Self::Owned(value) + } +} + +impl From for Values { + fn from(value: Mmap) -> Self { + Self::Ref(Box::new(value)) + } +} + +impl Default for Values { + fn default() -> Self { + Self::Owned(vec![].into_boxed_slice()) + } +} diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index ccaad4b1f..eb9eda45c 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -5,9 +5,8 @@ use std::{ cmp::Ordering, - fmt::Debug, fs::{self, File, OpenOptions}, - io::{self, Seek, SeekFrom, Write}, + io::{self, Read, Seek, SeekFrom, Write}, marker::PhantomData, mem, path::{Path, PathBuf}, @@ -15,9 +14,9 @@ use std::{ }; pub use memmap2; -use memmap2::Mmap; use rayon::prelude::*; pub use zerocopy; +use zstd::DEFAULT_COMPRESSION_LEVEL; mod enums; mod structs; @@ -26,46 +25,22 @@ mod traits; pub use enums::*; pub use structs::*; pub use traits::*; -use zstd::DEFAULT_COMPRESSION_LEVEL; const ONE_KIB: usize = 1024; -const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; +pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; const ONE_MIB: usize = ONE_KIB * ONE_KIB; const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; -type SmallCache = Option<(usize, Box<[T]>)>; - -/// -/// A very small, fast, efficient and simple storable Vec -/// -/// Reads (imports of Mmap) are lazy -/// -/// Stores only raw data without any overhead, and doesn't even have a header -/// -/// The file isn't portable for speed reasons (TODO: but could be ?) -/// -/// If you don't call `.flush()` it just acts as a normal Vec -/// +#[allow(private_interfaces)] #[derive(Debug)] -pub struct StorableVec { - version: Version, - pathbuf: PathBuf, - stored_len: Length, - compressed: Compressed, - - // Compressed - decoded_pages: Option>>>, - decoded_page: SmallCache, - pages: CompressedPagesMetadata, - - // Raw - // raw_pages: Vec>>, - // raw_page: memmap2::Mmap, - // file: File, - // file_position: u64, - // buf: Vec, - pushed: Vec, - phantom: PhantomData, +pub enum StorableVec { + Raw { + base: Base, + }, + Compressed { + base: Base, + pages_meta: CompressedPagesMetadata, + }, } impl StorableVec @@ -75,7 +50,6 @@ where { pub const SIZE_OF_T: usize = size_of::(); pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - /// In bytes pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; @@ -97,130 +71,15 @@ where } pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { - fs::create_dir_all(path)?; + let base = Base::import(path, version, compressed)?; - let version_path = Self::path_version_(path); - version.validate(version_path.as_ref())?; - version.write(version_path.as_ref())?; + if *compressed { + let pages_meta = CompressedPagesMetadata::read(Self::path_pages_meta_(path).as_path())?; - let compressed_path = Self::path_compressed_(path); - compressed.validate(compressed_path.as_ref())?; - compressed.write(compressed_path.as_ref())?; - - let stored_len = Length::try_from(Self::path_length_(path).as_path())?; - - let pages = CompressedPagesMetadata::read(Self::path_pages_(path).as_path())?; - - Ok(Self { - version, - compressed, - pathbuf: path.to_owned(), - stored_len, - decoded_pages: None, - pushed: vec![], - pages, - decoded_page: None, - phantom: PhantomData, - }) - } - - fn open_file(&self) -> io::Result { - Self::open_file_(&self.path_vec()) - } - fn open_file_(path: &Path) -> io::Result { - OpenOptions::new() - .read(true) - .create(true) - .truncate(false) - .append(true) - .open(path) - } - - #[inline(always)] - fn mmap(&self, page: &CompressedPageMetadata) -> io::Result { - let len = page.bytes_len as usize; - let offset = page.start; - let file = self.open_file()?; - - Ok(unsafe { - memmap2::MmapOptions::new() - .len(len) - .offset(offset) - .map(&file)? - }) - } - - fn decode(&self, page_index: usize) -> Result> { - if self.pages.len() <= page_index { - return Err(Error::ExpectVecToHaveIndex); + Ok(Self::Compressed { base, pages_meta }) + } else { + Ok(Self::Raw { base }) } - - let page = self.pages.get(page_index).unwrap(); - - let mmap = self.mmap(page)?; - - let decoded = zstd::decode_all(&mmap[..]); - - if decoded.is_err() { - dbg!((page, page_index, &mmap[..], &mmap.len(), &decoded)); - } - - Ok(decoded? - .chunks(Self::SIZE_OF_T) - .map(|slice| T::try_read_from_bytes(slice).unwrap()) - .collect::>() - .into_boxed_slice()) - } - - pub fn open_then_read(&self, index: I) -> Result> { - self.open_then_read_(Self::i_to_usize(index)?) - } - fn open_then_read_(&self, index: usize) -> Result> { - Ok(self - .decode(Self::index_to_page_index(index))? - .get(Self::index_to_decoded_index(index)) - .cloned()) - } - - pub fn init_big_cache(&mut self) -> io::Result<()> { - self.decoded_pages.replace(vec![]); - self.reset_big_cache() - } - - fn reset_big_cache(&mut self) -> io::Result<()> { - if self.decoded_pages.is_none() { - return Ok(()); - } - - let big_cache = self.decoded_pages.as_mut().unwrap(); - - big_cache.par_iter_mut().for_each(|lock| { - lock.take(); - }); - - let len = (*self.stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize; - let len = Self::CACHE_LENGTH.min(len); - - if big_cache.len() != len { - big_cache.resize_with(len, Default::default); - } - - Ok(()) - } - - fn reset_caches(&mut self) -> io::Result<()> { - self.decoded_page.take(); - self.reset_big_cache() - } - - #[inline(always)] - fn index_to_page_index(index: usize) -> usize { - index / Self::PER_PAGE - } - - #[inline(always)] - fn index_to_decoded_index(index: usize) -> usize { - index % Self::PER_PAGE } #[inline] @@ -231,7 +90,7 @@ where match self.index_to_pushed_index(index) { Ok(index) => { if let Some(index) = index { - return Ok(self.pushed.get(index).map(|v| Value::Ref(v))); + return Ok(self.pushed().get(index).map(|v| Value::Ref(v))); } } Err(Error::IndexTooHigh) => return Ok(None), @@ -239,24 +98,22 @@ where Err(error) => return Err(error), } - if let Some(big_cache) = self - .decoded_pages - .as_ref() - .and_then(|v| if v.is_empty() { None } else { Some(v) }) - { + let large_cache_len = self.large_cache_len(); + if large_cache_len != 0 { let page_index = Self::index_to_page_index(index); - let last_index = *self.stored_len - 1; - let max_page_index = last_index / Self::PER_PAGE; - - let min_page_index = (max_page_index + 1) - big_cache.len(); + let last_index = self.stored_len() - 1; + let max_page_index = Self::index_to_page_index(last_index); + let min_page_index = (max_page_index + 1) - large_cache_len; if page_index >= min_page_index { - return Ok(big_cache + let values = self + .pages() + .unwrap() .get(page_index - min_page_index) .ok_or(Error::MmapsVecIsTooSmall)? - .get_or_init(|| self.decode(page_index).unwrap()) - .get(Self::index_to_decoded_index(index)) - .map(|v| Value::Ref(v))); + .get_or_init(|| self.decode_page(page_index).unwrap()); + + return Ok(values.get(index)?.map(|v| Value::Ref(v))); } } @@ -272,7 +129,7 @@ where match self.index_to_pushed_index(index) { Ok(index) => { if let Some(index) = index { - return Ok(self.pushed.get(index)); + return Ok(self.pushed().get(index)); } } Err(Error::IndexTooHigh) => return Ok(None), @@ -282,17 +139,12 @@ where let page_index = Self::index_to_page_index(index); - if self.decoded_page.as_ref().is_none_or(|b| b.0 != page_index) { - self.decoded_page - .replace((page_index, self.decode(page_index)?)); + if self.page().is_none_or(|b| b.0 != page_index) { + let values = self.decode_page(page_index)?; + self.mut_page().replace((page_index, values)); } - Ok(self - .decoded_page - .as_ref() - .unwrap() - .1 - .get(Self::index_to_decoded_index(index))) + self.page().unwrap().1.get(index) } pub fn read_last(&mut self) -> Result> { @@ -303,6 +155,26 @@ where self.read_(len - 1) } + pub fn open_then_read(&self, index: I) -> Result> { + self.open_then_read_(Self::i_to_usize(index)?) + } + fn open_then_read_(&self, index: usize) -> Result> { + Ok(match self { + Self::Raw { .. } => { + let mut file = self.open_file()?; + let byte_index = Self::index_to_byte_index(index); + file.seek(SeekFrom::Start(byte_index))?; + let mut buf = vec![0; Self::SIZE_OF_T]; + file.read_exact(&mut buf)?; + T::try_ref_from_bytes(&buf[..]).ok().map(|v| v.to_owned()) + } + Self::Compressed { .. } => self + .decode_page(Self::index_to_page_index(index))? + .get(index)? + .cloned(), + }) + } + pub fn iter(&mut self, f: F) -> Result<()> where F: FnMut((I, &T)) -> Result<()>, @@ -314,11 +186,11 @@ where where F: FnMut((I, &T)) -> Result<()>, { - if !self.pushed.is_empty() { + if !self.is_pushed_empty() { return Err(Error::UnsupportedUnflushedState); } - let stored_len = I::from(*self.stored_len); + let stored_len = I::from(self.stored_len()); while index < stored_len { let v = self.read(index)?.unwrap(); @@ -333,11 +205,11 @@ where where F: FnMut((I, T, &mut Self)) -> Result<()>, { - if !self.pushed.is_empty() { + if !self.is_pushed_empty() { return Err(Error::UnsupportedUnflushedState); } - let stored_len = I::from(*self.stored_len); + let stored_len = I::from(self.stored_len()); while index < stored_len { let v = self.read(index)?.unwrap().clone(); @@ -349,11 +221,11 @@ where } pub fn collect_range(&self, from: Option, to: Option) -> Result> { - if !self.pushed.is_empty() { + if !self.is_pushed_empty() { return Err(Error::UnsupportedUnflushedState); } - let len = *self.stored_len; + let len = self.stored_len(); let from = from.map_or(0, |from| { if from >= 0 { @@ -375,31 +247,87 @@ where return Err(Error::RangeFromAfterTo); } - let mut small_cache: SmallCache = None; + let mut page: Option<(usize, Values)> = None; let values = (from..=to) .flat_map(|index| { let page_index = Self::index_to_page_index(index); - if small_cache.as_ref().is_none_or(|b| b.0 != page_index) { - small_cache.replace((page_index, self.decode(page_index).unwrap())); + if page.as_ref().is_none_or(|b| b.0 != page_index) { + let values = self.decode_page(page_index).unwrap(); + page.replace((page_index, values)); } - small_cache - .as_ref() - .unwrap() - .1 - .get(Self::index_to_decoded_index(index)) - .cloned() + page.as_ref().unwrap().1.get(index).ok().flatten().cloned() }) .collect::>(); Ok(values) } + fn decode_page(&self, page_index: usize) -> Result> { + Self::decode_page_( + self.stored_len(), + page_index, + self.file(), + match self { + Self::Raw { .. } => None, + Self::Compressed { pages_meta, .. } => Some(pages_meta), + }, + ) + } + + fn decode_page_( + stored_len: usize, + page_index: usize, + file: &File, + compressed_pages_meta: Option<&CompressedPagesMetadata>, + ) -> Result> { + if Self::page_index_to_index(page_index) >= stored_len { + return Err(Error::IndexTooHigh); + } + + let (len, offset) = if let Some(pages_meta) = compressed_pages_meta { + if pages_meta.len() <= page_index { + return Err(Error::ExpectVecToHaveIndex); + } + let page = pages_meta.get(page_index).unwrap(); + (page.bytes_len as usize, page.start) + } else { + (Self::PAGE_SIZE, Self::page_index_to_byte_index(page_index)) + }; + + let mmap = unsafe { + memmap2::MmapOptions::new() + .len(len) + .offset(offset) + .map(file)? + }; + + let compressed = compressed_pages_meta.is_some(); + + if compressed { + let decoded = zstd::decode_all(&mmap[..]); + + if decoded.is_err() { + dbg!((len, offset, page_index, &mmap[..], &mmap.len(), &decoded)); + } + + Ok(Values::from( + decoded? + .chunks(Self::SIZE_OF_T) + .map(|slice| T::try_read_from_bytes(slice).unwrap()) + .collect::>() + .into_boxed_slice(), + )) + } else { + Ok(Values::from(mmap)) + } + } + #[inline] pub fn push(&mut self, value: T) { - self.pushed.push(value) + self.mut_pushed().push(value) } #[inline] @@ -411,7 +339,7 @@ where Ok(()) } Ordering::Equal => { - self.pushed.push(value); + self.mut_pushed().push(value); Ok(()) } Ordering::Less => { @@ -421,120 +349,188 @@ where } } - #[inline] - pub fn len(&self) -> usize { - *self.stored_len + self.pushed_len() - } - - #[inline] - pub fn pushed_len(&self) -> usize { - self.pushed.len() - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - #[inline] - pub fn has(&self, index: I) -> Result { - Ok(self.has_(Self::i_to_usize(index)?)) - } - #[inline] - fn has_(&self, index: usize) -> bool { - index < self.len() - } - - #[inline] - pub fn hasnt(&self, index: I) -> Result { - self.has(index).map(|b| !b) - } - pub fn flush(&mut self) -> io::Result<()> { - if self.pushed.is_empty() { + let pushed_len = self.pushed_len(); + + if pushed_len == 0 { return Ok(()); } - let mut file = self.open_file()?; + let stored_len = self.stored_len(); - let (starting_page_index, values) = if *self.stored_len % Self::PER_PAGE != 0 { - if self.pages.is_empty() { - unreachable!() - } + let bytes = match self { + Self::Compressed { base, pages_meta } => { + let (starting_page_index, values) = if *base.stored_len % Self::PER_PAGE != 0 { + if pages_meta.is_empty() { + unreachable!() + } - let last_page_index = self.pages.len() - 1; + let last_page_index = pages_meta.len() - 1; - let values = if let Some(values) = self.decoded_pages.as_mut().and_then(|big_cache| { - big_cache - .last_mut() - .and_then(|lock| lock.take()) - .map(|b| b.into_vec()) - }) { - values - } else if self - .decoded_page - .as_ref() - .is_some_and(|(page_index, _)| *page_index == last_page_index) - { - self.decoded_page.take().unwrap().1.into_vec() - } else { - self.decode(last_page_index) - .inspect_err(|_| { - dbg!(last_page_index, &self.pages); - }) - .unwrap() - .into_vec() - }; + let values = if let Some(values) = base + .pages + .as_mut() + .and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take())) + { + values + } else if base + .page + .as_ref() + .is_some_and(|(page_index, _)| *page_index == last_page_index) + { + base.page.take().unwrap().1 + } else { + Self::decode_page_( + stored_len, + last_page_index, + &base.file, + Some(pages_meta), + ) + .inspect_err(|_| { + dbg!(last_page_index, &pages_meta); + }) + .unwrap() + }; - let file_len = self.pages.pop().unwrap().start; + let file_len = pages_meta.pop().unwrap().start; - Self::file_set_len(&mut file, file_len)?; + file_set_len(&mut base.file, file_len)?; - (last_page_index, values) - } else { - (self.pages.len(), vec![]) - }; - - self.stored_len += self.pushed_len(); - - let compressed = values - .into_par_iter() - .chain(mem::take(&mut self.pushed).into_par_iter()) - .chunks(Self::PER_PAGE) - .map(|chunk| (Self::compress_chunk(&chunk), chunk.len())) - .collect::>(); - - compressed - .iter() - .enumerate() - .for_each(|(i, (compressed_bytes, values_len))| { - let page_index = starting_page_index + i; - - let start = if page_index != 0 { - let prev = self.pages.get(page_index - 1).unwrap(); - prev.start + prev.bytes_len as u64 + (last_page_index, values) } else { - 0 + (pages_meta.len(), Values::default()) }; - let bytes_len = compressed_bytes.len() as u32; - let values_len = *values_len as u32; + let compressed = Vec::from(values.as_arr()) + .into_par_iter() + .chain(mem::take(&mut base.pushed).into_par_iter()) + .chunks(Self::PER_PAGE) + .map(|chunk| (Self::compress_chunk(chunk.as_ref()), chunk.len())) + .collect::>(); - let page = CompressedPageMetadata::new(start, bytes_len, values_len); + compressed + .iter() + .enumerate() + .for_each(|(i, (compressed_bytes, values_len))| { + let page_index = starting_page_index + i; - self.pages.push(page_index, page); - }); + let start = if page_index != 0 { + let prev = pages_meta.get(page_index - 1).unwrap(); + prev.start + prev.bytes_len as u64 + } else { + 0 + }; - let compressed = compressed - .into_iter() - .flat_map(|(v, _)| v) - .collect::>(); + let bytes_len = compressed_bytes.len() as u32; + let values_len = *values_len as u32; - self.pages.write()?; - file.write_all(&compressed)?; - self.reset_caches()?; + let page = CompressedPageMetadata::new(start, bytes_len, values_len); - self.write_length()?; + pages_meta.push(page_index, page); + }); + + pages_meta.write()?; + + compressed + .into_iter() + .flat_map(|(v, _)| v) + .collect::>() + } + Self::Raw { base } => { + let pushed = &mut base.pushed; + + let mut bytes: Vec = vec![0; pushed.len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + mem::take(pushed) + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); + + bytes + } + }; + + self.mut_file().write_all(&bytes)?; + + self.reset_caches(); + + self.increase_stored_len(pushed_len); + + self.write_stored_length()?; + + Ok(()) + } + + pub fn truncate_if_needed(&mut self, index: I) -> Result<()> { + let index = Self::i_to_usize(index)?; + + if index >= self.stored_len() { + return Ok(()); + } + + if index == 0 { + self.reset()?; + return Ok(()); + } + + let page_index = Self::index_to_page_index(index); + + let values = match self { + Self::Compressed { .. } => self.decode_page(page_index)?, + Self::Raw { .. } => Values::default(), + }; + + let (len, bytes) = match self { + Self::Compressed { pages_meta, .. } => { + let mut page = pages_meta.truncate(page_index).unwrap(); + + let len = page.start; + + let decoded_index = Self::index_to_decoded_index(index); + + let compressed = if decoded_index != 0 { + let chunk = &values.as_arr()[..decoded_index]; + + let compressed = Self::compress_chunk(chunk); + + page.values_len = chunk.len() as u32; + page.bytes_len = compressed.len() as u32; + + pages_meta.push(page_index, page); + + compressed + } else { + vec![].into_boxed_slice() + }; + + pages_meta.write()?; + + (len, compressed) + } + Self::Raw { .. } => { + // let value_at_index = self.open_then_read_(index).ok(); + + let len = Self::index_to_byte_index(index); + + (len, vec![].into_boxed_slice()) + } + }; + + let file = self.mut_file(); + + file_set_len(file, len)?; + + if !bytes.is_empty() { + file.write_all(&bytes)?; + } + + self.set_stored_len(index); + + self.write_stored_length()?; + + self.reset_caches(); Ok(()) } @@ -558,62 +554,49 @@ where .into_boxed_slice() } - pub fn truncate_if_needed(&mut self, index: I) -> Result<()> { - let index = Self::i_to_usize(index)?; - - if index >= *self.stored_len { - return Ok(()); - } - - if index == 0 { - self.reset_file()?; - return Ok(()); - } - - let page_index = Self::index_to_page_index(index); - - let values = self.decode(page_index)?; - let mut page = self.pages.truncate(page_index).unwrap(); - - let mut file = self.open_file()?; - Self::file_set_len(&mut file, page.start)?; - - let decoded_index = Self::index_to_decoded_index(index); - - if decoded_index != 0 { - let chunk = &values[..decoded_index]; - - let compressed = Self::compress_chunk(chunk); - - page.values_len = chunk.len() as u32; - page.bytes_len = compressed.len() as u32; - - file.write_all(&compressed)?; - - self.pages.push(page_index, page); - } - - self.pages.write()?; - - *self.stored_len = index; - self.write_length()?; - - self.reset_caches()?; - - Ok(()) + pub fn enable_large_cache(&mut self) { + self.mut_pages().replace(vec![]); + self.reset_large_cache(); } - pub fn reset_file(&mut self) -> Result<()> { - let mut file = self.open_file()?; - Self::file_set_len(&mut file, 0)?; - *self.stored_len = 0; - self.reset_caches()?; - Ok(()) + pub fn disable_large_cache(&mut self) { + self.mut_base().pages.take(); } - fn file_set_len(file: &mut File, len: u64) -> io::Result<()> { - file.set_len(len)?; - file.seek(SeekFrom::End(0))?; + fn reset_large_cache(&mut self) { + let stored_len = self.stored_len(); + + if let Some(pages) = self.mut_pages().as_mut() { + pages.par_iter_mut().for_each(|lock| { + lock.take(); + }); + + let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize; + let len = Self::CACHE_LENGTH.min(len); + + if pages.len() != len { + pages.resize_with(len, Default::default); + } + } + } + + fn large_cache_len(&self) -> usize { + self.pages().map_or(0, |v| v.len()) + } + + fn reset_small_cache(&mut self) { + self.mut_base().page.take(); + } + + fn reset_caches(&mut self) { + self.reset_small_cache(); + self.reset_large_cache(); + } + + pub fn reset(&mut self) -> Result<()> { + self.mut_base().reset_file()?; + self.reset_stored_len(); + self.reset_caches(); Ok(()) } @@ -624,11 +607,11 @@ where #[inline] fn index_to_pushed_index(&self, index: usize) -> Result> { - let file_len = *self.stored_len; + let stored_len = self.stored_len(); - if index >= file_len { - let index = index - file_len; - if index >= self.pushed.len() { + if index >= stored_len { + let index = index - stored_len; + if index >= self.pushed_len() { Err(Error::IndexTooHigh) } else { Ok(Some(index)) @@ -638,6 +621,139 @@ where } } + #[inline] + fn index_to_byte_index(index: usize) -> u64 { + (index * Self::SIZE_OF_T) as u64 + } + + #[inline(always)] + fn index_to_page_index(index: usize) -> usize { + index / Self::PER_PAGE + } + + #[inline(always)] + fn page_index_to_index(page_index: usize) -> usize { + page_index * Self::PER_PAGE + } + + #[inline(always)] + fn page_index_to_byte_index(page_index: usize) -> u64 { + (page_index * Self::PAGE_SIZE) as u64 + } + + #[inline(always)] + fn index_to_decoded_index(index: usize) -> usize { + index % Self::PER_PAGE + } + + #[inline] + fn path_pages_meta_(path: &Path) -> PathBuf { + path.join("pages_meta") + } + + #[inline] + fn page(&self) -> Option<&(usize, Values)> { + self.base().page.as_ref() + } + + #[inline] + fn mut_page(&mut self) -> &mut Option<(usize, Values)> { + &mut self.mut_base().page + } + + #[inline] + fn pages(&self) -> Option<&Vec>>> { + self.base().pages.as_ref() + } + + #[inline] + fn mut_pages(&mut self) -> &mut Option>>> { + &mut self.mut_base().pages + } + + #[inline] + pub fn len(&self) -> usize { + self.stored_len() + self.pushed_len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[inline] + pub fn has(&self, index: I) -> Result { + Ok(self.has_(Self::i_to_usize(index)?)) + } + #[inline] + fn has_(&self, index: usize) -> bool { + index < self.len() + } + + #[inline] + pub fn hasnt(&self, index: I) -> Result { + self.has(index).map(|b| !b) + } + + #[inline] + fn pushed(&self) -> &Vec { + &self.base().pushed + } + + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + &mut self.mut_base().pushed + } + + #[inline] + pub fn pushed_len(&self) -> usize { + self.pushed().len() + } + + #[inline] + fn is_pushed_empty(&self) -> bool { + self.pushed_len() == 0 + } + + #[inline] + fn stored_len(&self) -> usize { + *self.base().stored_len + } + + #[inline] + fn set_stored_len(&mut self, len: usize) { + *self.mut_base().stored_len = len; + } + + fn increase_stored_len(&mut self, len: usize) { + *self.mut_base().stored_len += len; + } + + #[inline] + fn reset_stored_len(&mut self) { + self.set_stored_len(0); + } + + fn write_stored_length(&self) -> io::Result<()> { + self.base().write_stored_length() + } + + #[inline] + pub fn path(&self) -> &Path { + &self.base().pathbuf + } + + fn file(&self) -> &File { + &self.base().file + } + + fn mut_file(&mut self) -> &mut File { + &mut self.mut_base().file + } + + fn open_file(&self) -> io::Result { + self.base().open_file() + } + pub fn file_name(&self) -> String { self.path() .file_name() @@ -648,8 +764,91 @@ where } #[inline] - pub fn path(&self) -> &Path { - &self.pathbuf + pub fn version(&self) -> Version { + self.base().version + } + + #[inline] + fn compressed(&self) -> Compressed { + self.base().compressed + } + + #[inline] + fn base(&self) -> &Base { + match self { + Self::Raw { base, .. } => base, + Self::Compressed { base, .. } => base, + } + } + + #[inline] + fn mut_base(&mut self) -> &mut Base { + match self { + Self::Raw { base, .. } => base, + Self::Compressed { base, .. } => base, + } + } + + pub fn index_type_to_string(&self) -> &str { + std::any::type_name::() + } +} + +#[derive(Debug)] +struct Base { + pub version: Version, + pub pathbuf: PathBuf, + pub stored_len: Length, + pub compressed: Compressed, + pub page: Option<(usize, Values)>, + pub pages: Option>>>, + pub pushed: Vec, + pub file: File, + pub phantom: PhantomData, +} + +impl Base { + pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { + fs::create_dir_all(path)?; + + let version_path = Self::path_version_(path); + version.validate(version_path.as_ref())?; + version.write(version_path.as_ref())?; + + let compressed_path = Self::path_compressed_(path); + compressed.validate(compressed_path.as_ref())?; + compressed.write(compressed_path.as_ref())?; + + let stored_len = Length::try_from(Self::path_length_(path).as_path())?; + + Ok(Self { + version, + compressed, + pathbuf: path.to_owned(), + file: Self::open_file_(Self::path_vec_(path).as_path())?, + stored_len, + page: None, + pages: None, + pushed: vec![], + phantom: PhantomData, + }) + } + + fn open_file(&self) -> io::Result { + Self::open_file_(&self.path_vec()) + } + fn open_file_(path: &Path) -> io::Result { + OpenOptions::new() + .read(true) + .create(true) + .truncate(false) + .append(true) + .open(path) + } + + fn reset_file(&mut self) -> Result<()> { + file_set_len(&mut self.file, 0)?; + Ok(()) } #[inline] @@ -658,10 +857,10 @@ where } #[inline] fn path_vec_(path: &Path) -> PathBuf { - path.join("vec.zstd") + path.join("vec") } - fn write_length(&self) -> io::Result<()> { + fn write_stored_length(&self) -> io::Result<()> { self.stored_len.write(&self.path_length()) } #[inline] @@ -673,11 +872,6 @@ where path.join("length") } - #[inline] - fn path_pages_(path: &Path) -> PathBuf { - path.join("pages") - } - #[inline] fn path_version_(path: &Path) -> PathBuf { path.join("version") @@ -687,14 +881,6 @@ where fn path_compressed_(path: &Path) -> PathBuf { path.join("compressed") } - - pub fn index_type_to_string(&self) -> &str { - std::any::type_name::() - } - - pub fn version(&self) -> Version { - self.version - } } impl Clone for StorableVec @@ -703,6 +889,12 @@ where T: StoredType, { fn clone(&self) -> Self { - Self::import(&self.pathbuf, self.version, self.compressed).unwrap() + Self::import(self.path(), self.version(), self.compressed()).unwrap() } } + +fn file_set_len(file: &mut File, len: u64) -> io::Result<()> { + file.set_len(len)?; + file.seek(SeekFrom::End(0))?; + Ok(()) +} diff --git a/crates/brk_vec/src/structs/back.rs b/crates/brk_vec/src/structs/back.rs deleted file mode 100644 index fbd6dd506..000000000 --- a/crates/brk_vec/src/structs/back.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::{fs::File, sync::OnceLock}; - -use super::CompressedPagesMetadata; - -type CompressedPage = Option<(usize, Box<[T]>)>; - -pub enum Back { - Raw { - raw_pages: Vec>>, - raw_page: memmap2::Mmap, - file: File, - file_position: u64, - buf: Vec, - }, - Compressed { - decoded_pages: Option>>>, - decoded_page: CompressedPage, - pages: CompressedPagesMetadata, - }, -} diff --git a/crates/brk_vec/src/structs/page.rs b/crates/brk_vec/src/structs/compressed_page_meta.rs similarity index 100% rename from crates/brk_vec/src/structs/page.rs rename to crates/brk_vec/src/structs/compressed_page_meta.rs diff --git a/crates/brk_vec/src/structs/pages.rs b/crates/brk_vec/src/structs/compressed_pages_meta.rs similarity index 100% rename from crates/brk_vec/src/structs/pages.rs rename to crates/brk_vec/src/structs/compressed_pages_meta.rs diff --git a/crates/brk_vec/src/structs/mod.rs b/crates/brk_vec/src/structs/mod.rs index a17374000..e6feedea9 100644 --- a/crates/brk_vec/src/structs/mod.rs +++ b/crates/brk_vec/src/structs/mod.rs @@ -1,15 +1,13 @@ -mod back; mod compressed; +mod compressed_page_meta; +mod compressed_pages_meta; mod length; -mod page; -mod pages; mod unsafe_slice; mod version; -pub use back::*; pub use compressed::*; +pub use compressed_page_meta::*; +pub use compressed_pages_meta::*; pub use length::*; -pub use page::*; -pub use pages::*; pub use unsafe_slice::*; pub use version::*; diff --git a/crates/brk_vec/src/traits/any.rs b/crates/brk_vec/src/traits/any.rs index d8324e30c..ef5280910 100644 --- a/crates/brk_vec/src/traits/any.rs +++ b/crates/brk_vec/src/traits/any.rs @@ -9,7 +9,11 @@ pub trait AnyStorableVec: Send + Sync { fn index_type_to_string(&self) -> &str; fn len(&self) -> usize; fn is_empty(&self) -> bool; - fn collect_range_values(&self, from: Option, to: Option) -> Result>; + fn collect_range_values( + &self, + from: Option, + to: Option, + ) -> Result>; fn flush(&mut self) -> io::Result<()>; } @@ -38,7 +42,11 @@ where self.flush() } - fn collect_range_values(&self, from: Option, to: Option) -> Result> { + fn collect_range_values( + &self, + from: Option, + to: Option, + ) -> Result> { Ok(self .collect_range(from, to)? .into_iter()