diff --git a/crates/brk_indexer/examples/indexer_bench2.rs b/crates/brk_indexer/examples/indexer_bench2.rs index 46bc3cb7e..e95df4d29 100644 --- a/crates/brk_indexer/examples/indexer_bench2.rs +++ b/crates/brk_indexer/examples/indexer_bench2.rs @@ -55,7 +55,7 @@ fn main() -> Result<()> { loop { let i = Instant::now(); - indexer.checked_index(&blocks, &client, &exit)?; + indexer.index(&blocks, &client, &exit)?; info!("Done in {:?}", i.elapsed()); sleep(Duration::from_secs(60)); diff --git a/crates/brk_indexer/src/stores_v3.rs b/crates/brk_indexer/src/stores_v3.rs index 24722ca4e..a9f35d8ca 100644 --- a/crates/brk_indexer/src/stores_v3.rs +++ b/crates/brk_indexer/src/stores_v3.rs @@ -47,7 +47,7 @@ impl Stores { let database_ref = &database; let create_addresshash_to_addressindex_store = |index| { - Store::import( + Store::import_cached( database_ref, path, &format!("h2i{}", index), @@ -66,7 +66,6 @@ impl Stores { version, Mode3::PushOnly, Kind3::Vec, - 0, ) }; @@ -78,7 +77,6 @@ impl Stores { version, Mode3::Any, Kind3::Vec, - 0, ) }; @@ -92,7 +90,6 @@ impl Stores { version, Mode3::PushOnly, Kind3::Sequential, - 0, )?, addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index( create_addresshash_to_addressindex_store, @@ -110,9 +107,8 @@ impl Stores { version, Mode3::PushOnly, Kind3::Random, - 0, )?, - txidprefix_to_txindex: Store::import( + txidprefix_to_txindex: Store::import_cached( database_ref, path, "txidprefix_to_txindex", diff --git a/crates/brk_store/src/fjall_v3/mod.rs b/crates/brk_store/src/fjall_v3/mod.rs index 4add6186d..6318645f5 100644 --- a/crates/brk_store/src/fjall_v3/mod.rs +++ b/crates/brk_store/src/fjall_v3/mod.rs @@ -3,29 +3,13 @@ use std::{borrow::Cow, cmp::Ordering, fmt::Debug, fs, hash::Hash, mem, path::Pat use brk_error::Result; use brk_types::{Height, Version}; use byteview_f3::ByteView; -use fjall3::{ - Database, Keyspace, KeyspaceCreateOptions, - config::{ - BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry, PartitioningPolicy, PinningPolicy, - }, -}; - -mod meta; - -use meta::*; +use fjall3::{Database, Keyspace, KeyspaceCreateOptions, config::*}; use rustc_hash::{FxHashMap, FxHashSet}; -use crate::any::AnyStore; +mod meta; +use meta::*; -#[derive(Clone)] -pub struct StoreFjallV3 { - meta: StoreMeta, - name: &'static str, - keyspace: Keyspace, - puts: FxHashMap, - dels: FxHashSet, - prev_puts: Vec>, -} +use crate::any::AnyStore; const MAJOR_FJALL_VERSION: Version = Version::new(3); @@ -35,6 +19,16 @@ pub fn open_fjall3_database(path: &Path) -> fjall3::Result { .open() } +#[derive(Clone)] +pub struct StoreFjallV3 { + meta: StoreMeta, + name: &'static str, + keyspace: Keyspace, + puts: FxHashMap, + dels: FxHashSet, + cache: Option>, +} + impl StoreFjallV3 where K: Debug + Clone + From + Ord + Eq + Hash, @@ -49,7 +43,30 @@ where version: Version, mode: Mode3, kind: Kind3, - cached_commits: usize, + ) -> Result { + Self::import_inner(db, path, name, version, mode, kind, None) + } + + pub fn import_cached( + db: &Database, + path: &Path, + name: &str, + version: Version, + mode: Mode3, + kind: Kind3, + max_batches: u16, + ) -> Result { + Self::import_inner(db, path, name, version, mode, kind, Some(max_batches)) + } + + fn import_inner( + db: &Database, + path: &Path, + name: &str, + version: Version, + mode: Mode3, + kind: Kind3, + max_batches: Option, ) -> Result { fs::create_dir_all(path)?; @@ -65,10 +82,7 @@ where }, )?; - let mut prev_puts = vec![]; - for _ in 0..cached_commits { - prev_puts.push(FxHashMap::default()); - } + let cache = max_batches.map(|max_batches| Cache::new(max_batches)); Ok(Self { meta, @@ -76,7 +90,7 @@ where keyspace, puts: FxHashMap::default(), dels: FxHashSet::default(), - prev_puts, + cache, }) } @@ -125,10 +139,10 @@ where return Ok(Some(Cow::Borrowed(v))); } - for prev in &self.prev_puts { - if let Some(v) = prev.get(key) { - return Ok(Some(Cow::Borrowed(v))); - } + if let Some(cache) = &self.cache + && let Some(v) = cache.get(key) + { + return Ok(Some(Cow::Borrowed(v))); } if let Some(slice) = self.keyspace.get(ByteView::from(key))? { @@ -143,6 +157,12 @@ where self.keyspace.is_empty().map_err(|e| e.into()) } + #[inline] + pub fn insert(&mut self, key: K, value: V) { + let _ = self.dels.is_empty() || self.dels.remove(&key); + self.puts.insert(key, value); + } + #[inline] pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { if self.needs(height) { @@ -150,19 +170,11 @@ where } } - #[inline] - pub fn insert(&mut self, key: K, value: V) { - let _ = self.dels.is_empty() || self.dels.remove(&key); - self.puts.insert(key, value); - } - #[inline] pub fn remove(&mut self, key: K) { - // Hot path: key was recently inserted if self.puts.remove(&key).is_some() { return; } - let newly_inserted = self.dels.insert(key); debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); } @@ -191,6 +203,13 @@ where fn needs(&self, height: Height) -> bool { self.meta.needs(height) } + + fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { + if !self.has(height) { + self.meta.export(height)?; + } + Ok(()) + } } impl AnyStore for StoreFjallV3 @@ -200,7 +219,7 @@ where ByteView: From + From, Self: Send + Sync, { - fn keyspace(&self) -> &fjall3::Keyspace { + fn keyspace(&self) -> &Keyspace { &self.keyspace } @@ -213,11 +232,7 @@ where } fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { - if self.has(height) { - return Ok(()); - } - self.meta.export(height)?; - Ok(()) + self.export_meta_if_needed(height) } fn name(&self) -> &'static str { @@ -244,48 +259,38 @@ where self.export_meta_if_needed(height)?; let puts = mem::take(&mut self.puts); + let dels = mem::take(&mut self.dels); - if !self.prev_puts.is_empty() { - self.prev_puts.pop(); - self.prev_puts.insert(0, puts.clone()); - } - - let mut items = puts - .into_iter() - .map(|(key, value)| Item::Value { key, value }) - .chain( - mem::take(&mut self.dels) - .into_iter() - .map(|key| Item::Tomb(key)), - ) - .collect::>(); - - if items.is_empty() { + if puts.is_empty() && dels.is_empty() { return Ok(()); } - items.sort_unstable(); + // Insert into cache here + if let Some(cache) = &mut self.cache { + for (k, v) in &puts { + cache.insert(k.clone(), v.clone()); + } + cache.commit(); + } - // let mut batch = OwnedWriteBatch::with_capacity(self.db.clone(), items.len()); - // let p = self.keyspace(); - // for item in items { - // match item { - // Item::Value { key, value } => { - // batch.insert(p, ByteView::from(key), ByteView::from(value)) - // } - // Item::Tomb(key) => batch.remove(p, ByteView::from(key)), - // } - // } - // batch.commit()?; + let mut items: Vec<_> = puts + .into_iter() + .map(|(key, value)| Item::Value { key, value }) + .chain(dels.into_iter().map(Item::Tomb)) + .collect(); + + items.sort_unstable(); let mut ingestion = self.keyspace.start_ingestion()?; for item in items { match item { Item::Value { key, value } => { - ingestion.write(ByteView::from(key), ByteView::from(value)) + ingestion.write(ByteView::from(key), ByteView::from(value))?; } - Item::Tomb(key) => ingestion.write_tombstone(ByteView::from(key)), - }? + Item::Tomb(key) => { + ingestion.write_tombstone(ByteView::from(key))?; + } + } } ingestion.finish()?; @@ -293,45 +298,43 @@ where } } -pub enum Item { +enum Item { Value { key: K, value: V }, Tomb(K), } - -impl Ord for Item { - fn cmp(&self, other: &Self) -> Ordering { - self.key().cmp(other.key()) - } -} - -impl PartialOrd for Item { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for Item { - fn eq(&self, other: &Self) -> bool { - self.key() == other.key() - } -} - -impl Eq for Item {} - impl Item { + #[inline] fn key(&self) -> &K { match self { Self::Value { key, .. } | Self::Tomb(key) => key, } } } +impl Ord for Item { + #[inline] + fn cmp(&self, other: &Self) -> Ordering { + self.key().cmp(other.key()) + } +} +impl PartialOrd for Item { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl PartialEq for Item { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.key() == other.key() + } +} +impl Eq for Item {} #[derive(Debug, Clone, Copy)] pub enum Mode3 { Any, PushOnly, } - impl Mode3 { pub fn is_any(&self) -> bool { matches!(*self, Self::Any) @@ -348,7 +351,6 @@ pub enum Kind3 { Sequential, Vec, } - impl Kind3 { pub fn is_sequential(&self) -> bool { matches!(*self, Self::Sequential) @@ -362,3 +364,42 @@ impl Kind3 { !matches!(*self, Self::Vec) } } + +#[derive(Clone)] +struct Cache { + index: FxHashMap, + current_batch: u16, + max_batches: u16, +} +impl Cache { + fn new(max_batches: u16) -> Self { + Self { + index: FxHashMap::default(), + current_batch: 0, + max_batches, + } + } + + #[inline] + fn get(&self, key: &K) -> Option<&V> { + self.index.get(key).map(|(v, _)| v) + } + + #[inline] + fn insert(&mut self, key: K, value: V) { + self.index.insert(key, (value, self.current_batch)); + } + + fn commit(&mut self) { + let max = self.max_batches; + let current = self.current_batch; + self.index + .retain(|_, (_, batch)| current.wrapping_sub(*batch) < max); + self.current_batch = self.current_batch.wrapping_add(1); + } + + fn clear(&mut self) { + self.index.clear(); + self.current_batch = 0; + } +}