From abde9ed1623b5a66536b8898514f9fa337612002 Mon Sep 17 00:00:00 2001 From: nym21 Date: Wed, 10 Dec 2025 17:36:12 +0100 Subject: [PATCH] global: fully replace fjall2 by fjall3 --- Cargo.lock | 120 +----- Cargo.toml | 6 +- crates/brk_error/Cargo.toml | 3 +- crates/brk_error/src/lib.rs | 19 +- crates/brk_indexer/Cargo.toml | 3 +- crates/brk_indexer/src/lib.rs | 6 +- crates/brk_indexer/src/processor.rs | 2 +- .../src/{stores_v3.rs => stores.rs} | 37 +- crates/brk_indexer/src/stores_v2.rs | 344 ---------------- crates/brk_store/Cargo.toml | 6 +- crates/brk_store/src/any.rs | 9 +- crates/brk_store/src/fjall_v2/meta.rs | 92 ----- crates/brk_store/src/fjall_v2/mod.rs | 312 -------------- crates/brk_store/src/fjall_v3/mod.rs | 380 ------------------ crates/brk_store/src/item.rs | 38 ++ crates/brk_store/src/kind.rs | 25 ++ crates/brk_store/src/lib.rs | 327 ++++++++++++++- crates/brk_store/src/{fjall_v3 => }/meta.rs | 2 +- crates/brk_store/src/mode.rs | 15 + 19 files changed, 447 insertions(+), 1299 deletions(-) rename crates/brk_indexer/src/{stores_v3.rs => stores.rs} (94%) delete mode 100644 crates/brk_indexer/src/stores_v2.rs delete mode 100644 crates/brk_store/src/fjall_v2/meta.rs delete mode 100644 crates/brk_store/src/fjall_v2/mod.rs delete mode 100644 crates/brk_store/src/fjall_v3/mod.rs create mode 100644 crates/brk_store/src/item.rs create mode 100644 crates/brk_store/src/kind.rs rename crates/brk_store/src/{fjall_v3 => }/meta.rs (98%) create mode 100644 crates/brk_store/src/mode.rs diff --git a/Cargo.lock b/Cargo.lock index fb5c9e5d2..420915e2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -644,7 +644,6 @@ version = "0.0.111" dependencies = [ "bitcoin", "bitcoincore-rpc", - "brk_fjall", "fjall", "jiff", "minreq", @@ -665,23 +664,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "brk_fjall" -version = "2.11.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da285ef974591f84284f6ab500dfd903c7a9bf8674e2c17f8b35a44a20726ee1" -dependencies = [ - "byteorder", - "byteview 0.6.1", - "dashmap", - "log", - "lsm-tree 2.10.4", - "path-absolutize", - "std-semaphore", - "tempfile", - "xxhash-rust", -] - [[package]] name = "brk_grouper" version = "0.0.111" @@ -700,7 +682,6 @@ dependencies = [ "bitcoin", "brk_bencher", "brk_error", - "brk_fjall", "brk_grouper", "brk_iterator", "brk_logger", @@ -1241,10 +1222,8 @@ name = "brk_store" version = "0.0.111" dependencies = [ "brk_error", - "brk_fjall", "brk_types", - "byteview 0.6.1", - "byteview 0.9.1", + "byteview", "fjall", "rustc-hash", ] @@ -1287,7 +1266,7 @@ version = "0.0.111" dependencies = [ "bitcoin", "brk_error", - "byteview 0.9.1", + "byteview", "derive_deref", "itoa", "jiff", @@ -1356,12 +1335,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" -[[package]] -name = "byteview" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5" - [[package]] name = "byteview" version = "0.9.1" @@ -1943,12 +1916,6 @@ dependencies = [ "libloading", ] -[[package]] -name = "double-ended-peekable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" - [[package]] name = "dragonbox_ecma" version = "0.0.5" @@ -2113,14 +2080,16 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "fjall" -version = "3.0.0-rc.5" +version = "3.0.0-rc.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff63be4348f42ed3c0c50175785ff14d0a833915c6b499a31f91d0e3ec5fc337" dependencies = [ "byteorder-lite", - "byteview 0.9.1", + "byteview", "dashmap", "flume", "log", - "lsm-tree 3.0.0-rc.5", + "lsm-tree", "lz4_flex 0.11.5", "tempfile", "xxhash-rust", @@ -2404,12 +2373,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" -[[package]] -name = "guardian" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f" - [[package]] name = "half" version = "2.7.1" @@ -2997,34 +2960,12 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "lsm-tree" -version = "2.10.4" +version = "3.0.0-rc.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15" -dependencies = [ - "byteorder", - "crossbeam-skiplist", - "double-ended-peekable", - "enum_dispatch", - "guardian", - "interval-heap", - "log", - "lz4_flex 0.11.5", - "path-absolutize", - "quick_cache", - "rustc-hash", - "self_cell", - "tempfile", - "value-log", - "varint-rs", - "xxhash-rust", -] - -[[package]] -name = "lsm-tree" -version = "3.0.0-rc.5" +checksum = "315d36f307af4d53f1030d6561de3fb6b914d5c242c353be101ddff91527c4b4" dependencies = [ "byteorder-lite", - "byteview 0.9.1", + "byteview", "crossbeam-skiplist", "enum_dispatch", "interval-heap", @@ -3850,24 +3791,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "path-absolutize" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" -dependencies = [ - "path-dedot", -] - -[[package]] -name = "path-dedot" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" -dependencies = [ - "once_cell", -] - [[package]] name = "pathdiff" version = "0.2.3" @@ -4847,12 +4770,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "std-semaphore" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" - [[package]] name = "str_indices" version = "0.4.4" @@ -5430,23 +5347,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "value-log" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c" -dependencies = [ - "byteorder", - "byteview 0.6.1", - "interval-heap", - "log", - "path-absolutize", - "rustc-hash", - "tempfile", - "varint-rs", - "xxhash-rust", -] - [[package]] name = "value-trait" version = "0.12.1" diff --git a/Cargo.toml b/Cargo.toml index 6053dfd44..2a96d3e5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,13 +57,11 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" } brk_types = { version = "0.0.111", path = "crates/brk_types" } brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["pco", "derive"] } brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" } -# byteview = "=0.6.1" byteview = "0.9.1" color-eyre = "0.6.5" derive_deref = "1.1.1" -fjall2 = { version = "2.11.8", package = "brk_fjall" } -# fjall3 = { version = "3.0.0-rc.5", package = "fjall" } -fjall3 = { path = "../fjall3", package = "fjall" } +fjall = "3.0.0-rc.6" +# fjall3 = { path = "../fjall3", package = "fjall" } # fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "434979ef59d8fd2b36b91e6ff759a36d19a397ee", package = "fjall" } jiff = "0.2.16" log = "0.4.29" diff --git a/crates/brk_error/Cargo.toml b/crates/brk_error/Cargo.toml index 836bb2a26..91a5f9882 100644 --- a/crates/brk_error/Cargo.toml +++ b/crates/brk_error/Cargo.toml @@ -11,8 +11,7 @@ build = "build.rs" [dependencies] bitcoin = { workspace = true } bitcoincore-rpc = { workspace = true } -fjall2 = { workspace = true } -fjall3 = { workspace = true } +fjall = { workspace = true } jiff = { workspace = true } minreq = { workspace = true } serde_json = { workspace = true } diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index 7e6d827fc..8288025a8 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -12,8 +12,7 @@ pub enum Error { IO(io::Error), BitcoinRPC(bitcoincore_rpc::Error), Jiff(jiff::Error), - FjallV2(fjall2::Error), - FjallV3(fjall3::Error), + Fjall(fjall::Error), VecDB(vecdb::Error), RawDB(vecdb::RawDBError), Minreq(minreq::Error), @@ -142,17 +141,10 @@ impl From for Error { } } -impl From for Error { +impl From for Error { #[inline] - fn from(value: fjall3::Error) -> Self { - Self::FjallV3(value) - } -} - -impl From for Error { - #[inline] - fn from(value: fjall2::Error) -> Self { - Self::FjallV2(value) + fn from(value: fjall::Error) -> Self { + Self::Fjall(value) } } @@ -172,8 +164,7 @@ impl fmt::Display for Error { Error::BitcoinHexError(error) => Display::fmt(&error, f), Error::BitcoinHexToArrayError(error) => Display::fmt(&error, f), Error::BitcoinRPC(error) => Display::fmt(&error, f), - Error::FjallV2(error) => Display::fmt(&error, f), - Error::FjallV3(error) => Display::fmt(&error, f), + Error::Fjall(error) => Display::fmt(&error, f), Error::IO(error) => Display::fmt(&error, f), Error::Jiff(error) => Display::fmt(&error, f), Error::Minreq(error) => Display::fmt(&error, f), diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index 6f8dda0c9..78b8635df 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -20,8 +20,7 @@ brk_rpc = { workspace = true } brk_store = { workspace = true } brk_types = { workspace = true } brk_traversable = { workspace = true } -fjall2 = { workspace = true } -fjall3 = { workspace = true } +fjall = { workspace = true } log = { workspace = true } rayon = { workspace = true } rustc-hash = { workspace = true } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 58458c1d5..d449edeb0 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -12,16 +12,14 @@ mod constants; mod indexes; mod processor; mod readers; -// mod stores_v2; -mod stores_v3; +mod stores; mod vecs; use constants::*; pub use indexes::*; pub use processor::*; pub use readers::*; -// pub use stores_v2::*; -pub use stores_v3::*; +pub use stores::*; pub use vecs::*; #[derive(Clone)] diff --git a/crates/brk_indexer/src/processor.rs b/crates/brk_indexer/src/processor.rs index b5c720b5c..dcdc9a293 100644 --- a/crates/brk_indexer/src/processor.rs +++ b/crates/brk_indexer/src/processor.rs @@ -221,7 +221,7 @@ impl<'a> BlockProcessor<'a> { { txindex } else { - return Err(Error::Str("Can't find txid = {txid}")); + return Err(Error::UnknownTxid); }; let txoutindex = self diff --git a/crates/brk_indexer/src/stores_v3.rs b/crates/brk_indexer/src/stores.rs similarity index 94% rename from crates/brk_indexer/src/stores_v3.rs rename to crates/brk_indexer/src/stores.rs index a9f35d8ca..0a40442a5 100644 --- a/crates/brk_indexer/src/stores_v3.rs +++ b/crates/brk_indexer/src/stores.rs @@ -2,12 +2,12 @@ use std::{fs, path::Path, time::Instant}; use brk_error::Result; use brk_grouper::ByAddressType; -use brk_store::{AnyStore, Kind3, Mode3, StoreFjallV3 as Store}; +use brk_store::{AnyStore, Kind, Mode, Store}; use brk_types::{ AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout, }; -use fjall3::{Database, PersistMode}; +use fjall::{Database, PersistMode}; use log::info; use rayon::prelude::*; use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator}; @@ -36,7 +36,7 @@ impl Stores { fs::create_dir_all(&pathbuf)?; - let database = match brk_store::open_fjall3_database(path) { + let database = match brk_store::open_database(path) { Ok(database) => database, Err(_) => { fs::remove_dir_all(path)?; @@ -47,14 +47,13 @@ impl Stores { let database_ref = &database; let create_addresshash_to_addressindex_store = |index| { - Store::import_cached( + Store::import( database_ref, path, &format!("h2i{}", index), version, - Mode3::PushOnly, - Kind3::Random, - 10, + Mode::PushOnly, + Kind::Random, ) }; @@ -64,8 +63,8 @@ impl Stores { path, &format!("a2t{}", index), version, - Mode3::PushOnly, - Kind3::Vec, + Mode::PushOnly, + Kind::Vec, ) }; @@ -75,8 +74,8 @@ impl Stores { path, &format!("a2u{}", index), version, - Mode3::Any, - Kind3::Vec, + Mode::Any, + Kind::Vec, ) }; @@ -88,8 +87,8 @@ impl Stores { path, "height_to_coinbase_tag", version, - Mode3::PushOnly, - Kind3::Sequential, + Mode::PushOnly, + Kind::Sequential, )?, addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index( create_addresshash_to_addressindex_store, @@ -105,17 +104,17 @@ impl Stores { path, "blockhashprefix_to_height", version, - Mode3::PushOnly, - Kind3::Random, + Mode::PushOnly, + Kind::Random, )?, txidprefix_to_txindex: Store::import_cached( database_ref, path, "txidprefix_to_txindex", version, - Mode3::PushOnly, - Kind3::Random, - 10, + Mode::PushOnly, + Kind::Recent, + 5, )?, }) } @@ -170,7 +169,7 @@ impl Stores { .par_values_mut() .map(|s| s as &mut dyn AnyStore), ) // Changed from par_iter_mut() - .try_for_each(|store| store.commit_f3(height))?; + .try_for_each(|store| store.commit(height))?; info!("Commits done in {:?}", i.elapsed()); let i = Instant::now(); diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs deleted file mode 100644 index fabbb923a..000000000 --- a/crates/brk_indexer/src/stores_v2.rs +++ /dev/null @@ -1,344 +0,0 @@ -use std::{fs, path::Path}; - -use brk_error::Result; -use brk_grouper::ByAddressType; -use brk_store::{AnyStore, Mode, StoreFjallV2 as Store, Type}; -use brk_types::{ - AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, - OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout, -}; -use fjall2::{CompressionType as Compression, PersistMode, TransactionalKeyspace}; -use rayon::prelude::*; -use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator}; - -use crate::Indexes; - -use super::Vecs; - -#[derive(Clone)] -pub struct Stores { - pub keyspace: TransactionalKeyspace, - - pub addresstype_to_addresshash_to_addressindex: ByAddressType>, - pub addresstype_to_addressindex_and_txindex: ByAddressType>, - pub addresstype_to_addressindex_and_unspentoutpoint: - ByAddressType>, - pub blockhashprefix_to_height: Store, - pub height_to_coinbase_tag: Store, - pub txidprefix_to_txindex: Store, -} - -impl Stores { - pub fn forced_import(parent: &Path, version: Version) -> Result { - let pathbuf = parent.join("stores"); - let path = pathbuf.as_path(); - - fs::create_dir_all(&pathbuf)?; - - let keyspace = match brk_store::open_keyspace(path) { - Ok(keyspace) => keyspace, - Err(_) => { - fs::remove_dir_all(path)?; - return Self::forced_import(path, version); - } - }; - - let keyspace_ref = &keyspace; - - let create_addresshash_to_addressindex_store = |index| { - Store::import( - keyspace_ref, - path, - &format!("h2i{}", index), - version, - Mode::UniquePushOnly(Type::Random), - Compression::Lz4, - ) - }; - - let create_addressindex_to_txindex_store = |index| { - Store::import( - keyspace_ref, - path, - &format!("a2t{}", index), - version, - Mode::VecLike, - Compression::Lz4, - ) - }; - - let create_addressindex_to_unspentoutpoint_store = |index| { - Store::import( - keyspace_ref, - path, - &format!("a2u{}", index), - version, - Mode::VecLike, - Compression::Lz4, - ) - }; - - Ok(Self { - keyspace: keyspace.clone(), - - height_to_coinbase_tag: Store::import( - keyspace_ref, - path, - "h2c", - version, - Mode::UniquePushOnly(Type::Sequential), - Compression::Lz4, - )?, - addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index( - create_addresshash_to_addressindex_store, - )?, - blockhashprefix_to_height: Store::import( - keyspace_ref, - path, - "b2h", - version, - Mode::UniquePushOnly(Type::Random), - Compression::Lz4, - )?, - txidprefix_to_txindex: Store::import( - keyspace_ref, - path, - "t2t", - version, - Mode::UniquePushOnly(Type::Random), - Compression::Lz4, - )?, - addresstype_to_addressindex_and_txindex: ByAddressType::new_with_index( - create_addressindex_to_txindex_store, - )?, - addresstype_to_addressindex_and_unspentoutpoint: ByAddressType::new_with_index( - create_addressindex_to_unspentoutpoint_store, - )?, - }) - } - - pub fn starting_height(&self) -> Height { - [ - &self.blockhashprefix_to_height as &dyn AnyStore, - &self.height_to_coinbase_tag, - &self.txidprefix_to_txindex, - ] - .into_iter() - .chain( - self.addresstype_to_addresshash_to_addressindex - .values() - .map(|s| s as &dyn AnyStore), - ) - .chain( - self.addresstype_to_addressindex_and_txindex - .values() - .map(|s| s as &dyn AnyStore), - ) - .chain( - self.addresstype_to_addressindex_and_unspentoutpoint - .values() - .map(|s| s as &dyn AnyStore), - ) - .map(|store| store.height().map(Height::incremented).unwrap_or_default()) - .min() - .unwrap() - } - - pub fn commit(&mut self, height: Height) -> Result<()> { - let tuples = [ - &mut self.blockhashprefix_to_height as &mut dyn AnyStore, - &mut self.height_to_coinbase_tag, - &mut self.txidprefix_to_txindex, - ] - .into_par_iter() - .chain( - self.addresstype_to_addresshash_to_addressindex - .par_values_mut() - .map(|s| s as &mut dyn AnyStore), - ) - .chain( - self.addresstype_to_addressindex_and_txindex - .par_values_mut() - .map(|s| s as &mut dyn AnyStore), - ) - .chain( - self.addresstype_to_addressindex_and_unspentoutpoint - .par_values_mut() - .map(|s| s as &mut dyn AnyStore), - ) - .map(|store| { - let items = store.take_all_f2(); - store.export_meta_if_needed(height)?; - Ok((store.partition(), items)) - }) - .collect::>>()?; - - self.keyspace.inner().batch().commit_partitions(tuples)?; - - self.keyspace - .persist(PersistMode::SyncData) - .map_err(|e| e.into()) - } - - pub fn rollback_if_needed( - &mut self, - vecs: &mut Vecs, - starting_indexes: &Indexes, - ) -> Result<()> { - if self.blockhashprefix_to_height.is_empty()? - && self.txidprefix_to_txindex.is_empty()? - && self.height_to_coinbase_tag.is_empty()? - && self - .addresstype_to_addresshash_to_addressindex - .values() - .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? - && self - .addresstype_to_addressindex_and_txindex - .values() - .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? - && self - .addresstype_to_addressindex_and_unspentoutpoint - .values() - .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? - { - return Ok(()); - } - - if starting_indexes.height != Height::ZERO { - vecs.height_to_blockhash - .iter()? - .skip(starting_indexes.height.to_usize()) - .map(BlockHashPrefix::from) - .for_each(|prefix| { - self.blockhashprefix_to_height.remove(prefix); - }); - - (starting_indexes.height.to_usize()..vecs.height_to_blockhash.len()) - .map(Height::from) - .for_each(|h| { - self.height_to_coinbase_tag.remove(h); - }); - - // Remove address hashes for all address types starting from rollback height - for address_type in [ - OutputType::P2PK65, - OutputType::P2PK33, - OutputType::P2PKH, - OutputType::P2SH, - OutputType::P2WPKH, - OutputType::P2WSH, - OutputType::P2TR, - OutputType::P2A, - ] { - for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? { - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(address_type) - .remove(hash); - } - } - } else { - unreachable!(); - } - - if starting_indexes.txindex != TxIndex::ZERO { - vecs.txindex_to_txid - .iter()? - .enumerate() - .skip(starting_indexes.txindex.to_usize()) - .for_each(|(txindex, txid)| { - let txindex = TxIndex::from(txindex); - let txidprefix = TxidPrefix::from(&txid); - - let is_known_dup = - crate::DUPLICATE_TXID_PREFIXES - .iter() - .any(|(dup_prefix, dup_txindex)| { - txindex == *dup_txindex && txidprefix == *dup_prefix - }); - - if !is_known_dup { - self.txidprefix_to_txindex.remove(txidprefix); - } - }); - } else { - unreachable!(); - } - - if starting_indexes.txoutindex != TxOutIndex::ZERO { - let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?; - let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; - vecs.txoutindex_to_outputtype - .iter()? - .enumerate() - .skip(starting_indexes.txoutindex.to_usize()) - .zip( - vecs.txoutindex_to_typeindex - .iter()? - .skip(starting_indexes.txoutindex.to_usize()), - ) - .filter(|((_, outputtype), _)| outputtype.is_address()) - .for_each(|((txoutindex, addresstype), addressindex)| { - let txindex = txoutindex_to_txindex_iter.get_at_unwrap(txoutindex); - - self.addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .remove(AddressIndexTxIndex::from((addressindex, txindex))); - - let vout = Vout::from( - txoutindex.to_usize() - - txindex_to_first_txoutindex_iter - .get_unwrap(txindex) - .to_usize(), - ); - let outpoint = OutPoint::new(txindex, vout); - - self.addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .remove(AddressIndexOutPoint::from((addressindex, outpoint))); - }); - - // Add back outputs that were spent after the rollback point - let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; - let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?; - let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?; - vecs.txinindex_to_outpoint - .iter()? - .skip(starting_indexes.txinindex.to_usize()) - .for_each(|outpoint| { - if outpoint.is_coinbase() { - return; - } - - let txindex = outpoint.txindex(); - let vout = outpoint.vout(); - - // Calculate txoutindex from txindex and vout - let txoutindex = txindex_to_first_txoutindex_iter.get_unwrap(txindex) + vout; - - // Only process if this output was created before the rollback point - if txoutindex < starting_indexes.txoutindex { - let outputtype = txoutindex_to_outputtype_iter.get_unwrap(txoutindex); - - if outputtype.is_address() { - let addresstype = outputtype; - let addressindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex); - - self.addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .remove(AddressIndexTxIndex::from((addressindex, txindex))); - - self.addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit); - } - } - }); - } else { - unreachable!(); - } - - self.commit(starting_indexes.height.decremented().unwrap_or_default())?; - - Ok(()) - } -} diff --git a/crates/brk_store/Cargo.toml b/crates/brk_store/Cargo.toml index 615fd34ad..3552fc635 100644 --- a/crates/brk_store/Cargo.toml +++ b/crates/brk_store/Cargo.toml @@ -13,8 +13,6 @@ build = "build.rs" [dependencies] brk_error = { workspace = true } brk_types = { workspace = true } -byteview_f2 = { version = "=0.6.1", package = "byteview" } -byteview_f3 = { version = "0.9.1", package = "byteview" } -fjall2 = { workspace = true } -fjall3 = { workspace = true } +byteview = { workspace = true } +fjall = { workspace = true } rustc-hash = { workspace = true } diff --git a/crates/brk_store/src/any.rs b/crates/brk_store/src/any.rs index 587d3e259..e35ce79b6 100644 --- a/crates/brk_store/src/any.rs +++ b/crates/brk_store/src/any.rs @@ -1,5 +1,6 @@ use brk_error::Result; use brk_types::{Height, Version}; +use fjall::Keyspace; pub trait AnyStore: Send + Sync { fn name(&self) -> &'static str; @@ -8,10 +9,6 @@ pub trait AnyStore: Send + Sync { fn needs(&self, height: Height) -> bool; fn version(&self) -> Version; fn export_meta_if_needed(&mut self, height: Height) -> Result<()>; - fn keyspace(&self) -> &fjall3::Keyspace; - fn partition(&self) -> &fjall2::PartitionHandle; - fn take_all_f2(&mut self) -> Vec; - fn commit_f3(&mut self, height: Height) -> Result<()>; - // fn take_all_f3(&mut self) -> Vec; - // fn take_all_f3(&mut self) -> Box>; + fn keyspace(&self) -> &Keyspace; + fn commit(&mut self, height: Height) -> Result<()>; } diff --git a/crates/brk_store/src/fjall_v2/meta.rs b/crates/brk_store/src/fjall_v2/meta.rs deleted file mode 100644 index 8e5e91071..000000000 --- a/crates/brk_store/src/fjall_v2/meta.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::{ - fs, io, - path::{Path, PathBuf}, -}; - -use brk_error::Result; -use brk_types::Version; -use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle}; - -use super::Height; - -#[derive(Debug, Clone)] -pub struct StoreMeta { - pathbuf: PathBuf, - version: Version, - height: Option, -} - -impl StoreMeta { - pub fn checked_open( - keyspace: &TransactionalKeyspace, - path: &Path, - version: Version, - open_partition_handle: F, - ) -> Result<(Self, TransactionalPartitionHandle)> - where - F: Fn() -> Result, - { - fs::create_dir_all(path)?; - - let mut partition = open_partition_handle()?; - - if Version::try_from(Self::path_version_(path).as_path()) - .is_ok_and(|prev_version| version != prev_version) - { - fs::remove_dir_all(path)?; - keyspace.delete_partition(partition)?; - keyspace.persist(PersistMode::SyncAll)?; - fs::create_dir(path)?; - partition = open_partition_handle()?; - } - - let slf = Self { - pathbuf: path.to_owned(), - version, - height: Height::try_from(Self::path_height_(path).as_path()).ok(), - }; - - slf.version.write(&slf.path_version())?; - - Ok((slf, partition)) - } - - pub fn version(&self) -> Version { - self.version - } - - pub fn export(&mut self, height: Height) -> io::Result<()> { - self.height = Some(height); - height.write(&self.path_height()) - } - - pub fn path(&self) -> &Path { - &self.pathbuf - } - - fn path_version(&self) -> PathBuf { - Self::path_version_(&self.pathbuf) - } - fn path_version_(path: &Path) -> PathBuf { - path.join("version") - } - - #[inline] - pub fn height(&self) -> Option { - self.height - } - #[inline] - pub fn needs(&self, height: Height) -> bool { - self.height.is_none_or(|self_height| height > self_height) - } - #[inline] - pub fn has(&self, height: Height) -> bool { - !self.needs(height) - } - fn path_height(&self) -> PathBuf { - Self::path_height_(&self.pathbuf) - } - fn path_height_(path: &Path) -> PathBuf { - path.join("height") - } -} diff --git a/crates/brk_store/src/fjall_v2/mod.rs b/crates/brk_store/src/fjall_v2/mod.rs deleted file mode 100644 index 9e86db8e9..000000000 --- a/crates/brk_store/src/fjall_v2/mod.rs +++ /dev/null @@ -1,312 +0,0 @@ -use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, mem, path::Path}; - -use brk_error::Result; -use brk_types::{Height, Version}; -use byteview_f2::ByteView; -use fjall2::{ - CompressionType, InnerItem, PartitionCreateOptions, TransactionalKeyspace, - TransactionalPartitionHandle, ValueType, -}; -use rustc_hash::{FxHashMap, FxHashSet}; - -use crate::any::AnyStore; - -mod meta; - -use meta::*; - -#[derive(Clone)] -pub struct StoreFjallV2 { - meta: StoreMeta, - name: &'static str, - keyspace: TransactionalKeyspace, - partition: TransactionalPartitionHandle, - puts: FxHashMap, - dels: FxHashSet, -} - -const MAJOR_FJALL_VERSION: Version = Version::TWO; - -pub fn open_keyspace(path: &Path) -> fjall2::Result { - fjall2::Config::new(path.join("fjall")) - .manual_journal_persist(true) - .max_write_buffer_size(256 * 1_024 * 1_024) - .open_transactional() -} - -impl StoreFjallV2 -where - K: Debug + Clone + From + Ord + Eq + Hash, - V: Debug + Clone + From, - ByteView: From + From, -{ - fn open_partition_handle( - keyspace: &TransactionalKeyspace, - name: &str, - mode: Mode, - compression: CompressionType, - ) -> Result { - let mut options = PartitionCreateOptions::default() - .compression(compression) - .manual_journal_persist(true); - - if mode.is_unique_push_only() { - options = options.bloom_filter_bits(Some(7)); - } else { - options = options - .max_memtable_size(8 * 1024 * 1024) - .bloom_filter_bits(None); - } - - keyspace.open_partition(name, options).map_err(|e| e.into()) - } - - pub fn import( - keyspace: &TransactionalKeyspace, - path: &Path, - name: &str, - version: Version, - mode: Mode, - compression: CompressionType, - ) -> Result { - fs::create_dir_all(path)?; - - let (meta, partition) = StoreMeta::checked_open( - keyspace, - &path.join(format!("meta/{name}")), - MAJOR_FJALL_VERSION + version, - || { - Self::open_partition_handle(keyspace, name, mode, compression).inspect_err(|e| { - eprintln!("{e}"); - eprintln!("Delete {path:?} and try again"); - }) - }, - )?; - - Ok(Self { - meta, - name: Box::leak(Box::new(name.to_string())), - keyspace: keyspace.clone(), - partition, - puts: FxHashMap::default(), - dels: FxHashSet::default(), - }) - } - - #[inline] - pub fn get<'a>(&'a self, key: &'a K) -> Result>> - where - ByteView: From<&'a K>, - { - if let Some(v) = self.puts.get(key) { - Ok(Some(Cow::Borrowed(v))) - } else if let Some(slice) = self.partition.get(ByteView::from(key))? { - Ok(Some(Cow::Owned(V::from(ByteView::from(&*slice))))) - } else { - Ok(None) - } - } - - pub fn is_empty(&self) -> Result { - self.keyspace - .read_tx() - .is_empty(&self.partition) - .map_err(|e| e.into()) - } - - pub fn iter(&self) -> impl Iterator { - self.keyspace - .read_tx() - .iter(&self.partition) - .map(|res| res.unwrap()) - .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) - } - - #[inline] - pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { - if self.needs(height) { - self.insert(key, value); - } - } - - #[inline] - pub fn insert(&mut self, key: K, value: V) { - let _ = self.dels.is_empty() || self.dels.remove(&key); - self.puts.insert(key, value); - } - - #[inline] - pub fn remove(&mut self, key: K) { - if self.puts.remove(&key).is_some() { - return; - } - - let newly_inserted = self.dels.insert(key); - debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); - } - - #[inline] - pub fn remove_if_needed(&mut self, key: K, height: Height) { - if self.needs(height) { - self.remove(key) - } - } - - #[inline] - pub fn approximate_len(&self) -> usize { - self.partition.approximate_len() - } - - #[inline] - fn has(&self, height: Height) -> bool { - self.meta.has(height) - } - - #[inline] - fn needs(&self, height: Height) -> bool { - self.meta.needs(height) - } -} - -impl AnyStore for StoreFjallV2 -where - K: Debug + Clone + From + Ord + Eq + Hash + 'static, - V: Debug + Clone + From + 'static, - ByteView: From + From, - Self: Send + Sync, -{ - fn keyspace(&self) -> &fjall3::Keyspace { - panic!() - } - - fn partition(&self) -> &fjall2::PartitionHandle { - self.partition.inner() - } - - fn take_all_f2(&mut self) -> Vec { - let mut items = mem::take(&mut self.puts) - .into_iter() - .map(|(key, value)| Item::Value { key, value }) - .chain( - mem::take(&mut self.dels) - .into_iter() - .map(|key| Item::Tomb(key)), - ) - .collect::>(); - items.sort_unstable(); - items.into_iter().map(InnerItem::from).collect() - } - - // fn take_all_f3(&mut self) -> Vec { - // panic!() - // } - - fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { - if self.has(height) { - return Ok(()); - } - self.meta.export(height)?; - Ok(()) - } - - fn name(&self) -> &'static str { - self.name - } - - fn height(&self) -> Option { - self.meta.height() - } - - fn has(&self, height: Height) -> bool { - self.has(height) - } - - fn needs(&self, height: Height) -> bool { - self.needs(height) - } - - fn version(&self) -> Version { - self.meta.version() - } - - fn commit_f3(&mut self, _: Height) -> Result<()> { - Ok(()) - } -} - -enum Item { - Value { key: K, value: V }, - Tomb(K), -} - -impl Ord for Item { - fn cmp(&self, other: &Self) -> cmp::Ordering { - self.key().cmp(other.key()) - } -} - -impl PartialOrd for Item { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for Item { - fn eq(&self, other: &Self) -> bool { - self.key() == other.key() - } -} - -impl Eq for Item {} - -impl Item { - fn key(&self) -> &K { - match self { - Self::Value { key, .. } | Self::Tomb(key) => key, - } - } -} - -impl From> for InnerItem -where - K: Into, - V: Into, -{ - #[inline] - fn from(value: Item) -> Self { - match value { - Item::Value { key, value } => Self { - key: key.into().into(), - value: value.into().into(), - value_type: ValueType::Value, - }, - Item::Tomb(key) => Self { - key: key.into().into(), - value: [].into(), - value_type: ValueType::Tombstone, - }, - } - } -} - -#[derive(Debug, Clone, Copy)] -pub enum Mode { - VecLike, - UniquePushOnly(Type), -} - -#[derive(Debug, Clone, Copy)] -pub enum Type { - Random, - Sequential, -} - -impl Mode { - pub fn is_vec_like(&self) -> bool { - matches!(*self, Self::VecLike) - } - - pub fn is_unique_push_only(&self) -> bool { - matches!(*self, Self::UniquePushOnly(_)) - } -} diff --git a/crates/brk_store/src/fjall_v3/mod.rs b/crates/brk_store/src/fjall_v3/mod.rs deleted file mode 100644 index a90e62f8a..000000000 --- a/crates/brk_store/src/fjall_v3/mod.rs +++ /dev/null @@ -1,380 +0,0 @@ -use std::{borrow::Cow, cmp::Ordering, fmt::Debug, fs, hash::Hash, mem, path::Path}; - -use brk_error::Result; -use brk_types::{Height, Version}; -use byteview_f3::ByteView; -use fjall3::{Database, Keyspace, KeyspaceCreateOptions, config::*}; -use rustc_hash::{FxHashMap, FxHashSet}; - -mod meta; -use meta::*; - -use crate::any::AnyStore; - -const MAJOR_FJALL_VERSION: Version = Version::new(3); - -pub fn open_fjall3_database(path: &Path) -> fjall3::Result { - Database::builder(path.join("fjall")) - .cache_size(2 * 1024 * 1024 * 1024) - .open() -} - -#[derive(Clone)] -pub struct StoreFjallV3 { - meta: StoreMeta, - name: &'static str, - keyspace: Keyspace, - puts: FxHashMap, - dels: FxHashSet, - caches: Vec>, -} - -impl StoreFjallV3 -where - K: Debug + Clone + From + Ord + Eq + Hash, - V: Debug + Clone + From, - ByteView: From + From, - Self: Send + Sync, -{ - pub fn import( - db: &Database, - path: &Path, - name: &str, - version: Version, - mode: Mode3, - kind: Kind3, - ) -> Result { - Self::import_inner(db, path, name, version, mode, kind, 0) - } - - pub fn import_cached( - db: &Database, - path: &Path, - name: &str, - version: Version, - mode: Mode3, - kind: Kind3, - max_batches: u8, - ) -> Result { - Self::import_inner(db, path, name, version, mode, kind, max_batches) - } - - fn import_inner( - db: &Database, - path: &Path, - name: &str, - version: Version, - mode: Mode3, - kind: Kind3, - max_batches: u8, - ) -> Result { - fs::create_dir_all(path)?; - - let (meta, keyspace) = StoreMeta::checked_open( - db, - &path.join(format!("meta/{name}")), - MAJOR_FJALL_VERSION + version, - || { - Self::open_keyspace(db, name, mode, kind).inspect_err(|e| { - eprintln!("{e}"); - eprintln!("Delete {path:?} and try again"); - }) - }, - )?; - - let mut caches = vec![]; - for _ in 0..max_batches { - caches.push(FxHashMap::default()); - } - - Ok(Self { - meta, - name: Box::leak(Box::new(name.to_string())), - keyspace, - puts: FxHashMap::default(), - dels: FxHashSet::default(), - caches, - }) - } - - fn open_keyspace( - database: &Database, - name: &str, - _mode: Mode3, - kind: Kind3, - ) -> Result { - let mut options = KeyspaceCreateOptions::default() - .manual_journal_persist(true) - .expect_point_read_hits(true) - .filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true])) - .index_block_partitioning_policy(PartitioningPolicy::new([false, false, true])); - - if kind.is_not_vec() { - options = options.filter_policy(FilterPolicy::new([ - FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)), - FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)), - FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)), - FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)), - ])); - } else { - options = options - .max_memtable_size(8 * 1024 * 1024) - .filter_policy(FilterPolicy::disabled()); - } - - if kind.is_sequential() { - options = options - .filter_block_partitioning_policy(PartitioningPolicy::all(true)) - .index_block_partitioning_policy(PartitioningPolicy::all(true)) - .filter_block_pinning_policy(PinningPolicy::all(false)) - .index_block_pinning_policy(PinningPolicy::all(false)); - } - - database.keyspace(name, || options).map_err(|e| e.into()) - } - - #[inline] - pub fn get<'a>(&'a self, key: &'a K) -> Result>> - where - ByteView: From<&'a K>, - { - if let Some(v) = self.puts.get(key) { - return Ok(Some(Cow::Borrowed(v))); - } - - for cache in &self.caches { - if let Some(v) = cache.get(key) { - return Ok(Some(Cow::Borrowed(v))); - } - } - - if let Some(slice) = self.keyspace.get(ByteView::from(key))? { - Ok(Some(Cow::Owned(V::from(ByteView::from(slice))))) - } else { - Ok(None) - } - } - - #[inline] - pub fn is_empty(&self) -> Result { - self.keyspace.is_empty().map_err(|e| e.into()) - } - - #[inline] - pub fn insert(&mut self, key: K, value: V) { - let _ = self.dels.is_empty() || self.dels.remove(&key); - self.puts.insert(key, value); - } - - #[inline] - pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { - if self.needs(height) { - self.insert(key, value); - } - } - - #[inline] - pub fn remove(&mut self, key: K) { - if self.puts.remove(&key).is_some() { - return; - } - let newly_inserted = self.dels.insert(key); - debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); - } - - #[inline] - pub fn remove_if_needed(&mut self, key: K, height: Height) { - if self.needs(height) { - self.remove(key) - } - } - - #[inline] - pub fn iter(&self) -> impl Iterator { - self.keyspace - .iter() - .map(|res| res.into_inner().unwrap()) - .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) - } - - #[inline] - fn has(&self, height: Height) -> bool { - self.meta.has(height) - } - - #[inline] - pub fn needs(&self, height: Height) -> bool { - self.meta.needs(height) - } - - fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { - if !self.has(height) { - self.meta.export(height)?; - } - Ok(()) - } - - fn ingest<'a>( - keyspace: &Keyspace, - puts: impl Iterator, - dels: impl Iterator, - ) -> Result<()> - where - ByteView: From<&'a K> + From<&'a V>, - K: 'a, - V: 'a, - { - let mut items: Vec> = puts - .map(|(key, value)| Item::Value { key, value }) - .chain(dels.map(Item::Tomb)) - .collect(); - - items.sort_unstable(); - - let mut ingestion = keyspace.start_ingestion()?; - for item in items { - match item { - Item::Value { key, value } => { - ingestion.write(ByteView::from(key), ByteView::from(value))?; - } - Item::Tomb(key) => { - ingestion.write_tombstone(ByteView::from(key))?; - } - } - } - ingestion.finish()?; - - Ok(()) - } -} - -impl AnyStore for StoreFjallV3 -where - K: Debug + Clone + From + Ord + Eq + Hash, - V: Debug + Clone + From, - for<'a> ByteView: From + From + From<&'a K> + From<&'a V>, - Self: Send + Sync, -{ - fn keyspace(&self) -> &Keyspace { - &self.keyspace - } - - fn take_all_f2(&mut self) -> Vec { - vec![] - } - - fn partition(&self) -> &fjall2::PartitionHandle { - panic!() - } - - fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { - self.export_meta_if_needed(height) - } - - fn name(&self) -> &'static str { - self.name - } - - fn height(&self) -> Option { - self.meta.height() - } - - fn has(&self, height: Height) -> bool { - self.has(height) - } - - fn needs(&self, height: Height) -> bool { - self.needs(height) - } - - fn version(&self) -> Version { - self.meta.version() - } - - fn commit_f3(&mut self, height: Height) -> Result<()> { - self.export_meta_if_needed(height)?; - - let puts = mem::take(&mut self.puts); - let dels = mem::take(&mut self.dels); - - if puts.is_empty() && dels.is_empty() { - return Ok(()); - } - - Self::ingest(&self.keyspace, puts.iter(), dels.iter())?; - - if !self.caches.is_empty() { - self.caches.pop(); - self.caches.insert(0, puts); - } - - Ok(()) - } -} - -enum Item { - Value { key: K, value: V }, - Tomb(K), -} -impl Item { - #[inline] - fn key(&self) -> &K { - match self { - Self::Value { key, .. } | Self::Tomb(key) => key, - } - } -} -impl Ord for Item { - #[inline] - fn cmp(&self, other: &Self) -> Ordering { - self.key().cmp(other.key()) - } -} -impl PartialOrd for Item { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} -impl PartialEq for Item { - #[inline] - fn eq(&self, other: &Self) -> bool { - self.key() == other.key() - } -} -impl Eq for Item {} - -#[derive(Debug, Clone, Copy)] -pub enum Mode3 { - Any, - PushOnly, -} -impl Mode3 { - pub fn is_any(&self) -> bool { - matches!(*self, Self::Any) - } - - pub fn is_push_only(&self) -> bool { - matches!(*self, Self::PushOnly) - } -} - -#[derive(Debug, Clone, Copy)] -pub enum Kind3 { - Random, - Sequential, - Vec, -} -impl Kind3 { - pub fn is_sequential(&self) -> bool { - matches!(*self, Self::Sequential) - } - - pub fn is_random(&self) -> bool { - matches!(*self, Self::Random) - } - - pub fn is_not_vec(&self) -> bool { - !matches!(*self, Self::Vec) - } -} diff --git a/crates/brk_store/src/item.rs b/crates/brk_store/src/item.rs new file mode 100644 index 000000000..f57b8d2de --- /dev/null +++ b/crates/brk_store/src/item.rs @@ -0,0 +1,38 @@ +use std::cmp::Ordering; + +pub enum Item { + Value { key: K, value: V }, + Tomb(K), +} + +impl Item { + #[inline] + fn key(&self) -> &K { + match self { + Self::Value { key, .. } | Self::Tomb(key) => key, + } + } +} + +impl Ord for Item { + #[inline] + fn cmp(&self, other: &Self) -> Ordering { + self.key().cmp(other.key()) + } +} + +impl PartialOrd for Item { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for Item { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.key() == other.key() + } +} + +impl Eq for Item {} diff --git a/crates/brk_store/src/kind.rs b/crates/brk_store/src/kind.rs new file mode 100644 index 000000000..85aa67bbe --- /dev/null +++ b/crates/brk_store/src/kind.rs @@ -0,0 +1,25 @@ +#[derive(Debug, Clone, Copy)] +pub enum Kind { + Recent, + Random, + Sequential, + Vec, +} + +impl Kind { + pub fn is_sequential(&self) -> bool { + matches!(*self, Self::Sequential) + } + + pub fn is_recent(&self) -> bool { + matches!(*self, Self::Recent) + } + + pub fn is_random(&self) -> bool { + matches!(*self, Self::Random) + } + + pub fn is_vec(&self) -> bool { + matches!(*self, Self::Vec) + } +} diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index 83c3c4af0..e6650de5f 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -1,9 +1,328 @@ #![doc = include_str!("../README.md")] +use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path}; + +use brk_error::Result; +use brk_types::{Height, Version}; +use byteview::ByteView; +use fjall::{Database, Keyspace, KeyspaceCreateOptions, config::*}; +use rustc_hash::{FxHashMap, FxHashSet}; + mod any; -mod fjall_v2; -mod fjall_v3; +mod item; +mod kind; +mod meta; +mod mode; pub use any::*; -pub use fjall_v2::*; -pub use fjall_v3::*; +pub use item::*; +pub use kind::*; +pub use meta::*; +pub use mode::*; + +const MAJOR_FJALL_VERSION: Version = Version::new(3); + +pub fn open_database(path: &Path) -> fjall::Result { + Database::builder(path.join("fjall")) + .cache_size(3 * 1024 * 1024 * 1024) + .open() +} + +#[derive(Clone)] +pub struct Store { + meta: StoreMeta, + name: &'static str, + keyspace: Keyspace, + puts: FxHashMap, + dels: FxHashSet, + caches: Vec>, +} + +impl Store +where + K: Debug + Clone + From + Ord + Eq + Hash, + V: Debug + Clone + From, + ByteView: From + From, + Self: Send + Sync, +{ + pub fn import( + db: &Database, + path: &Path, + name: &str, + version: Version, + mode: Mode, + kind: Kind, + ) -> Result { + Self::import_inner(db, path, name, version, mode, kind, 0) + } + + pub fn import_cached( + db: &Database, + path: &Path, + name: &str, + version: Version, + mode: Mode, + kind: Kind, + max_batches: u8, + ) -> Result { + Self::import_inner(db, path, name, version, mode, kind, max_batches) + } + + fn import_inner( + db: &Database, + path: &Path, + name: &str, + version: Version, + mode: Mode, + kind: Kind, + max_batches: u8, + ) -> Result { + fs::create_dir_all(path)?; + + let (meta, keyspace) = StoreMeta::checked_open( + db, + &path.join(format!("meta/{name}")), + MAJOR_FJALL_VERSION + version, + || { + Self::open_keyspace(db, name, mode, kind).inspect_err(|e| { + eprintln!("{e}"); + eprintln!("Delete {path:?} and try again"); + }) + }, + )?; + + let mut caches = vec![]; + for _ in 0..max_batches { + caches.push(FxHashMap::default()); + } + + Ok(Self { + meta, + name: Box::leak(Box::new(name.to_string())), + keyspace, + puts: FxHashMap::default(), + dels: FxHashSet::default(), + caches, + }) + } + + fn open_keyspace(database: &Database, name: &str, _mode: Mode, kind: Kind) -> Result { + let mut options = KeyspaceCreateOptions::default() + .manual_journal_persist(true) + .filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true])) + .index_block_partitioning_policy(PartitioningPolicy::new([false, false, true])); + + match kind { + Kind::Random => { + options = options + .filter_block_pinning_policy(PinningPolicy::new([true, true, true, false])) + .filter_policy(FilterPolicy::new([ + FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate( + 0.0001, + )), + FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)), + FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)), + FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(9.0)), + ])); + } + Kind::Recent => { + options = options + .expect_point_read_hits(true) + .filter_policy(FilterPolicy::new([ + FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate( + 0.0001, + )), + FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)), + FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)), + FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)), + ])); + } + Kind::Sequential => { + options = options + .filter_block_partitioning_policy(PartitioningPolicy::all(true)) + .index_block_partitioning_policy(PartitioningPolicy::all(true)) + .filter_block_pinning_policy(PinningPolicy::all(false)) + .index_block_pinning_policy(PinningPolicy::all(false)); + } + Kind::Vec => { + options = options + .max_memtable_size(8 * 1024 * 1024) + .filter_policy(FilterPolicy::disabled()) + .filter_block_pinning_policy(PinningPolicy::all(false)) + .index_block_pinning_policy(PinningPolicy::all(false)); + } + } + + database.keyspace(name, || options).map_err(|e| e.into()) + } + + #[inline] + pub fn get<'a>(&'a self, key: &'a K) -> Result>> + where + ByteView: From<&'a K>, + { + if let Some(v) = self.puts.get(key) { + return Ok(Some(Cow::Borrowed(v))); + } + + for cache in &self.caches { + if let Some(v) = cache.get(key) { + return Ok(Some(Cow::Borrowed(v))); + } + } + + if let Some(slice) = self.keyspace.get(ByteView::from(key))? { + Ok(Some(Cow::Owned(V::from(ByteView::from(slice))))) + } else { + Ok(None) + } + } + + #[inline] + pub fn is_empty(&self) -> Result { + self.keyspace.is_empty().map_err(|e| e.into()) + } + + #[inline] + pub fn insert(&mut self, key: K, value: V) { + let _ = self.dels.is_empty() || self.dels.remove(&key); + self.puts.insert(key, value); + } + + #[inline] + pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { + if self.needs(height) { + self.insert(key, value); + } + } + + #[inline] + pub fn remove(&mut self, key: K) { + if self.puts.remove(&key).is_some() { + return; + } + let newly_inserted = self.dels.insert(key); + debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); + } + + #[inline] + pub fn remove_if_needed(&mut self, key: K, height: Height) { + if self.needs(height) { + self.remove(key) + } + } + + #[inline] + pub fn iter(&self) -> impl Iterator { + self.keyspace + .iter() + .map(|res| res.into_inner().unwrap()) + .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) + } + + #[inline] + fn has(&self, height: Height) -> bool { + self.meta.has(height) + } + + #[inline] + pub fn needs(&self, height: Height) -> bool { + self.meta.needs(height) + } + + fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { + if !self.has(height) { + self.meta.export(height)?; + } + Ok(()) + } + + fn ingest<'a>( + keyspace: &Keyspace, + puts: impl Iterator, + dels: impl Iterator, + ) -> Result<()> + where + ByteView: From<&'a K> + From<&'a V>, + K: 'a, + V: 'a, + { + let mut items: Vec> = puts + .map(|(key, value)| Item::Value { key, value }) + .chain(dels.map(Item::Tomb)) + .collect(); + + items.sort_unstable(); + + let mut ingestion = keyspace.start_ingestion()?; + for item in items { + match item { + Item::Value { key, value } => { + ingestion.write(ByteView::from(key), ByteView::from(value))?; + } + Item::Tomb(key) => { + ingestion.write_tombstone(ByteView::from(key))?; + } + } + } + ingestion.finish()?; + + Ok(()) + } +} + +impl AnyStore for Store +where + K: Debug + Clone + From + Ord + Eq + Hash, + V: Debug + Clone + From, + for<'a> ByteView: From + From + From<&'a K> + From<&'a V>, + Self: Send + Sync, +{ + fn keyspace(&self) -> &Keyspace { + &self.keyspace + } + + fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { + self.export_meta_if_needed(height) + } + + fn name(&self) -> &'static str { + self.name + } + + fn height(&self) -> Option { + self.meta.height() + } + + fn has(&self, height: Height) -> bool { + self.has(height) + } + + fn needs(&self, height: Height) -> bool { + self.needs(height) + } + + fn version(&self) -> Version { + self.meta.version() + } + + fn commit(&mut self, height: Height) -> Result<()> { + self.export_meta_if_needed(height)?; + + let puts = mem::take(&mut self.puts); + let dels = mem::take(&mut self.dels); + + if puts.is_empty() && dels.is_empty() { + return Ok(()); + } + + Self::ingest(&self.keyspace, puts.iter(), dels.iter())?; + + if !self.caches.is_empty() { + self.caches.pop(); + self.caches.insert(0, puts); + } + + Ok(()) + } +} diff --git a/crates/brk_store/src/fjall_v3/meta.rs b/crates/brk_store/src/meta.rs similarity index 98% rename from crates/brk_store/src/fjall_v3/meta.rs rename to crates/brk_store/src/meta.rs index 843913bfb..58bab452c 100644 --- a/crates/brk_store/src/fjall_v3/meta.rs +++ b/crates/brk_store/src/meta.rs @@ -5,7 +5,7 @@ use std::{ use brk_error::Result; use brk_types::Version; -use fjall3::{Database, Keyspace}; +use fjall::{Database, Keyspace}; use super::Height; diff --git a/crates/brk_store/src/mode.rs b/crates/brk_store/src/mode.rs new file mode 100644 index 000000000..04a3cc2d2 --- /dev/null +++ b/crates/brk_store/src/mode.rs @@ -0,0 +1,15 @@ +#[derive(Debug, Clone, Copy)] +pub enum Mode { + Any, + PushOnly, +} + +impl Mode { + pub fn is_any(&self) -> bool { + matches!(*self, Self::Any) + } + + pub fn is_push_only(&self) -> bool { + matches!(*self, Self::PushOnly) + } +}