From 33a92cfad41a50bb3156de2659f5ec77fb0d7b3d Mon Sep 17 00:00:00 2001 From: nym21 Date: Mon, 20 Oct 2025 11:33:48 +0200 Subject: [PATCH] store: faster everything --- crates/brk_indexer/src/stores_v2.rs | 27 ++++--- crates/brk_store/examples/main.rs | 4 +- crates/brk_store/src/any.rs | 2 +- crates/brk_store/src/lib.rs | 4 +- crates/brk_store/src/v2/meta.rs | 12 +-- crates/brk_store/src/v2/mod.rs | 120 ++++++---------------------- crates/brk_store/src/v3/mod.rs | 24 +++--- 7 files changed, 62 insertions(+), 131 deletions(-) diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index 59d967199..c0c589a39 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -7,7 +7,7 @@ use brk_structs::{ AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, }; -use fjall2::{PersistMode, TransactionalKeyspace}; +use fjall2::{Keyspace, PersistMode}; use rayon::prelude::*; use vecdb::{AnyVec, StoredIndex, VecIterator}; @@ -17,7 +17,7 @@ use super::Vecs; #[derive(Clone)] pub struct Stores { - pub keyspace: TransactionalKeyspace, + pub keyspace: Keyspace, pub addressbyteshash_to_typeindex: Store, pub blockhashprefix_to_height: Store, @@ -361,8 +361,9 @@ impl Stores { } } } else { - self.blockhashprefix_to_height.reset()?; - self.addressbyteshash_to_typeindex.reset()?; + unreachable!(); + // self.blockhashprefix_to_height.reset()?; + // self.addressbyteshash_to_typeindex.reset()?; } if starting_indexes.txindex != TxIndex::ZERO { @@ -384,11 +385,12 @@ impl Stores { } }); } else { - self.txidprefix_to_txindex.reset()?; + unreachable!(); + // self.txidprefix_to_txindex.reset()?; } if starting_indexes.txoutindex != TxOutIndex::ZERO { - // todo!(); + todo!(); // let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter(); // vecs.txoutindex_to_outputtype // .iter_at(starting_indexes.txoutindex) @@ -404,12 +406,13 @@ impl Stores { // .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex))); // }); } else { - self.addresstype_to_typeindex_and_txindex - .iter_mut() - .try_for_each(|s| s.reset())?; - self.addresstype_to_typeindex_and_unspentoutpoint - .iter_mut() - .try_for_each(|s| s.reset())?; + unreachable!(); + // self.addresstype_to_typeindex_and_txindex + // .iter_mut() + // .try_for_each(|s| s.reset())?; + // self.addresstype_to_typeindex_and_unspentoutpoint + // .iter_mut() + // .try_for_each(|s| s.reset())?; } self.commit(starting_indexes.height.decremented().unwrap_or_default())?; diff --git a/crates/brk_store/examples/main.rs b/crates/brk_store/examples/main.rs index 96b4fcb3b..391a43fbc 100644 --- a/crates/brk_store/examples/main.rs +++ b/crates/brk_store/examples/main.rs @@ -1,9 +1,9 @@ -use std::path::Path; +// use std::path::Path; use brk_error::Result; fn main() -> Result<()> { - let p = Path::new("./examples/_fjall"); + // let p = Path::new("./examples/_fjall"); // let _keyspace = brk_store::open_keyspace(p)?; diff --git a/crates/brk_store/src/any.rs b/crates/brk_store/src/any.rs index f4eff3c87..91c1128bf 100644 --- a/crates/brk_store/src/any.rs +++ b/crates/brk_store/src/any.rs @@ -4,7 +4,7 @@ use brk_structs::{Height, Version}; pub trait AnyStore: Send + Sync { fn commit(&mut self, height: Height) -> Result<()>; fn persist(&self) -> Result<()>; - fn reset(&mut self) -> Result<()>; + // fn reset(&mut self) -> Result<()>; fn name(&self) -> &'static str; fn height(&self) -> Option; fn has(&self, height: Height) -> bool; diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index e475ffef3..e3f9977d2 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -2,8 +2,8 @@ mod any; mod v2; -mod v3; +// mod v3; pub use any::*; pub use v2::*; -pub use v3::*; +// pub use v3::*; diff --git a/crates/brk_store/src/v2/meta.rs b/crates/brk_store/src/v2/meta.rs index 04f058788..e0e0b483b 100644 --- a/crates/brk_store/src/v2/meta.rs +++ b/crates/brk_store/src/v2/meta.rs @@ -5,7 +5,7 @@ use std::{ use brk_error::Result; use brk_structs::Version; -use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle}; +use fjall2::{Keyspace, PartitionHandle, PersistMode}; use super::Height; @@ -18,13 +18,13 @@ pub struct StoreMeta { impl StoreMeta { pub fn checked_open( - keyspace: &TransactionalKeyspace, + keyspace: &Keyspace, path: &Path, version: Version, open_partition_handle: F, - ) -> Result<(Self, TransactionalPartitionHandle)> + ) -> Result<(Self, PartitionHandle)> where - F: Fn() -> Result, + F: Fn() -> Result, { fs::create_dir_all(path)?; @@ -60,10 +60,6 @@ impl StoreMeta { height.write(&self.path_height()) } - pub fn reset(&mut self) { - self.height.take(); - } - pub fn path(&self) -> &Path { &self.pathbuf } diff --git a/crates/brk_store/src/v2/mod.rs b/crates/brk_store/src/v2/mod.rs index cc3c57f47..ea49c04f7 100644 --- a/crates/brk_store/src/v2/mod.rs +++ b/crates/brk_store/src/v2/mod.rs @@ -1,42 +1,35 @@ -use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path, sync::Arc}; +use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path}; use brk_error::Result; use brk_structs::{Height, Version}; use byteview6::ByteView; -use fjall2::{ - PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace, - TransactionalPartitionHandle, -}; -use parking_lot::RwLock; +use fjall2::{InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode}; use rustc_hash::{FxHashMap, FxHashSet}; use crate::any::AnyStore; mod meta; -use log::info; use meta::*; #[derive(Clone)] pub struct StoreV2 { meta: StoreMeta, name: &'static str, - keyspace: TransactionalKeyspace, - partition: Arc>>, - rtx: Arc>>, + keyspace: Keyspace, + // no, make this faster + // no ! remove it altogether, reset too + partition: PartitionHandle, puts: FxHashMap, dels: FxHashSet, - bloom_filters: Option, } -// const CHECK_COLLISIONS: bool = true; const MAJOR_FJALL_VERSION: Version = Version::TWO; -pub fn open_keyspace(path: &Path) -> fjall2::Result { +pub fn open_keyspace(path: &Path) -> fjall2::Result { fjall2::Config::new(path.join("fjall")) - // .cache_size(1024 * 1024 * 1024) // for tests only .max_write_buffer_size(32 * 1024 * 1024) - .open_transactional() + .open() } impl StoreV2 @@ -46,12 +39,11 @@ where ByteView: From + From, { fn open_partition_handle( - keyspace: &TransactionalKeyspace, + keyspace: &Keyspace, name: &str, bloom_filters: Option, - ) -> Result { + ) -> Result { let mut options = PartitionCreateOptions::default() - // .max_memtable_size(64 * 1024 * 1024) // for tests only .max_memtable_size(8 * 1024 * 1024) .manual_journal_persist(true); @@ -63,7 +55,7 @@ where } pub fn import( - keyspace: &TransactionalKeyspace, + keyspace: &Keyspace, path: &Path, name: &str, version: Version, @@ -83,17 +75,13 @@ where }, )?; - let rtx = keyspace.read_tx(); - Ok(Self { meta, name: Box::leak(Box::new(name.to_string())), keyspace: keyspace.clone(), - partition: Arc::new(RwLock::new(Some(partition))), - rtx: Arc::new(RwLock::new(Some(rtx))), + partition, puts: FxHashMap::default(), dels: FxHashSet::default(), - bloom_filters, }) } @@ -104,13 +92,7 @@ where { if let Some(v) = self.puts.get(key) { Ok(Some(Cow::Borrowed(v))) - } else if let Some(slice) = self - .rtx - .read() - .as_ref() - .unwrap() - .get(self.partition.read().as_ref().unwrap(), ByteView::from(key))? - { + } else if let Some(slice) = self.partition.get(ByteView::from(key))? { Ok(Some(Cow::Owned(V::from(ByteView::from(&*slice))))) } else { Ok(None) @@ -118,20 +100,12 @@ where } pub fn is_empty(&self) -> Result { - self.rtx - .read() - .as_ref() - .unwrap() - .is_empty(self.partition.read().as_ref().unwrap()) - .map_err(|e| e.into()) + self.partition.is_empty().map_err(|e| e.into()) } pub fn iter(&self) -> impl Iterator { - self.rtx - .read() - .as_ref() - .unwrap() - .iter(self.partition.read().as_ref().unwrap()) + self.partition + .iter() .map(|res| res.unwrap()) .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } @@ -146,7 +120,6 @@ where #[inline] pub fn remove(&mut self, key: K) { - // Hot path: key was recently inserted if self.puts.remove(&key).is_some() { return; } @@ -162,19 +135,6 @@ where } } - // pub fn retain_or_del(&mut self, retain: F) - // where - // F: Fn(&K, &mut V) -> bool, - // { - // self.puts.retain(|k, v| { - // let ret = retain(k, v); - // if !ret { - // self.dels.insert(k.clone()); - // } - // ret - // }); - // } - #[inline] fn has(&self, height: Height) -> bool { self.meta.has(height) @@ -204,27 +164,17 @@ where return Ok(()); } - let mut rtx = self.rtx.write(); - let _ = rtx.take(); + let mut items = self + .puts + .drain() + .map(|(key, value)| InnerItem::Value { key, value }) + .chain(self.dels.drain().map(|key| InnerItem::WeakTombstone(key))) + .collect::>(); + items.sort_unstable(); - let mut wtx = self.keyspace.write_tx(); - - let partition = self.partition.read(); - - let partition = partition.as_ref().unwrap(); - - wtx.remove_batch(partition, self.dels.drain().map(ByteView::from)); - - wtx.insert_batch( - partition, - self.puts - .drain() - .map(|(k, v)| (ByteView::from(k), ByteView::from(v))), - ); - - wtx.commit()?; - - rtx.replace(self.keyspace.read_tx()); + self.keyspace + .batch() + .commit_single_partition(&self.partition, items)?; Ok(()) } @@ -239,24 +189,6 @@ where self.name } - fn reset(&mut self) -> Result<()> { - info!("Resetting {}...", self.name); - - let mut opt = self.partition.write(); - - let partition = opt.take().unwrap(); - - self.keyspace.delete_partition(partition)?; - - self.meta.reset(); - - let partition = Self::open_partition_handle(&self.keyspace, self.name, self.bloom_filters)?; - - opt.replace(partition); - - Ok(()) - } - fn height(&self) -> Option { self.meta.height() } diff --git a/crates/brk_store/src/v3/mod.rs b/crates/brk_store/src/v3/mod.rs index d3ce8b78e..ae33c7cb3 100644 --- a/crates/brk_store/src/v3/mod.rs +++ b/crates/brk_store/src/v3/mod.rs @@ -217,26 +217,26 @@ where self.name } - fn reset(&mut self) -> Result<()> { - info!("Resetting {}...", self.name); + // fn reset(&mut self) -> Result<()> { + // info!("Resetting {}...", self.name); - todo!(); + // todo!(); - let mut opt = self.keyspace.write(); + // let mut opt = self.keyspace.write(); - let keyspace = opt.take().unwrap(); + // let keyspace = opt.take().unwrap(); - // Doesn't exist yet - // self.database.remove_keyspace(keyspace)?; + // // Doesn't exist yet + // // self.database.remove_keyspace(keyspace)?; - self.meta.reset(); + // self.meta.reset(); - let keyspace = Self::open_keyspace(&self.database, self.name)?; + // let keyspace = Self::open_keyspace(&self.database, self.name)?; - opt.replace(keyspace); + // opt.replace(keyspace); - Ok(()) - } + // Ok(()) + // } fn height(&self) -> Option { self.meta.height()