#![doc = include_str!("../README.md")] use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, ops::Range, path::Path}; use brk_error::Result; use brk_types::{Height, Version}; use byteview::ByteView; use fjall::{Database, Keyspace, KeyspaceCreateOptions, config::*}; use rustc_hash::{FxHashMap, FxHashSet}; mod any; mod item; mod kind; mod meta; mod mode; pub use any::*; pub use item::*; pub use kind::*; pub use meta::*; pub use mode::*; const MAJOR_FJALL_VERSION: Version = Version::new(3); pub fn open_database(path: &Path) -> fjall::Result { Database::builder(path.join("fjall")) .cache_size(3 * 1024 * 1024 * 1024) .max_cached_files(Some(512)) .open() } #[derive(Clone)] pub struct Store { meta: StoreMeta, name: &'static str, keyspace: Keyspace, puts: FxHashMap, dels: FxHashSet, caches: Vec>, } impl Store where K: Debug + Clone + From + Ord + Eq + Hash, V: Debug + Clone + From, ByteView: From + From, Self: Send + Sync, { pub fn import( db: &Database, path: &Path, name: &str, version: Version, mode: Mode, kind: Kind, ) -> Result { Self::import_inner(db, path, name, version, mode, kind, 0) } pub fn import_cached( db: &Database, path: &Path, name: &str, version: Version, mode: Mode, kind: Kind, max_batches: u8, ) -> Result { Self::import_inner(db, path, name, version, mode, kind, max_batches) } fn import_inner( db: &Database, path: &Path, name: &str, version: Version, mode: Mode, kind: Kind, max_batches: u8, ) -> Result { fs::create_dir_all(path)?; let (meta, keyspace) = StoreMeta::checked_open( &path.join(format!("meta/{name}")), MAJOR_FJALL_VERSION + version, || { Self::open_keyspace(db, name, mode, kind).inspect_err(|e| { eprintln!("{e}"); eprintln!("Delete {path:?} and try again"); }) }, )?; let mut caches = vec![]; for _ in 0..max_batches { caches.push(FxHashMap::default()); } Ok(Self { meta, name: Box::leak(Box::new(name.to_string())), keyspace, puts: FxHashMap::default(), dels: FxHashSet::default(), caches, }) } fn open_keyspace(database: &Database, name: &str, _mode: Mode, kind: Kind) -> Result { let mut options = KeyspaceCreateOptions::default() .manual_journal_persist(true) .filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true])) .index_block_partitioning_policy(PartitioningPolicy::new([false, false, true])); match kind { Kind::Random => { options = options .filter_block_pinning_policy(PinningPolicy::new([true, true, true, false])) .filter_policy(FilterPolicy::new([ FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate( 0.0001, )), FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)), FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)), FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(9.0)), ])); } Kind::Recent => { options = options .expect_point_read_hits(true) .filter_policy(FilterPolicy::new([ FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate( 0.0001, )), FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)), FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)), FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)), ])); } Kind::Vec => { options = options .max_memtable_size(8 * 1024 * 1024) .filter_policy(FilterPolicy::disabled()) .filter_block_pinning_policy(PinningPolicy::all(false)) .index_block_pinning_policy(PinningPolicy::all(false)); } } database.keyspace(name, || options).map_err(|e| e.into()) } #[inline] pub fn get<'a>(&'a self, key: &'a K) -> Result>> where ByteView: From<&'a K>, { if let Some(v) = self.puts.get(key) { return Ok(Some(Cow::Borrowed(v))); } for cache in &self.caches { if let Some(v) = cache.get(key) { return Ok(Some(Cow::Borrowed(v))); } } if let Some(slice) = self.keyspace.get(ByteView::from(key))? { Ok(Some(Cow::Owned(V::from(ByteView::from(slice))))) } else { Ok(None) } } #[inline] pub fn is_empty(&self) -> Result { self.keyspace.is_empty().map_err(|e| e.into()) } #[inline] pub fn insert(&mut self, key: K, value: V) { let _ = self.dels.is_empty() || self.dels.remove(&key); self.puts.insert(key, value); } #[inline] pub fn remove(&mut self, key: K) { if self.puts.remove(&key).is_some() { return; } let newly_inserted = self.dels.insert(key); debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); } /// Clear all caches. Call after bulk removals (e.g., rollback) to prevent stale reads. #[inline] pub fn clear_caches(&mut self) { for cache in &mut self.caches { *cache = FxHashMap::default(); } } /// Takes buffered puts/dels and returns a closure that ingests them into the keyspace. /// The store is left with empty buffers, ready for the next batch. #[allow(clippy::type_complexity)] pub fn take_pending_ingest( &mut self, height: Height, ) -> Result Result<()> + Send>>> where K: Send + 'static, V: Send + 'static, for<'a> ByteView: From<&'a K> + From<&'a V>, { self.export_meta_if_needed(height)?; let puts = mem::take(&mut self.puts); let dels = mem::take(&mut self.dels); if puts.is_empty() && dels.is_empty() { return Ok(None); } let keyspace = self.keyspace.clone(); Ok(Some(Box::new(move || { Self::ingest(&keyspace, puts.iter(), dels.iter()) }))) } #[inline] pub fn iter(&self) -> impl Iterator { self.keyspace .iter() .map(|res| res.into_inner().unwrap()) .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } #[inline] pub fn prefix>( &self, prefix: P, ) -> impl DoubleEndedIterator + '_ { self.keyspace .prefix(prefix) .map(|res| res.into_inner().unwrap()) .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } #[inline] pub fn range>( &self, range: Range, ) -> impl DoubleEndedIterator + '_ { let start: ByteView = range.start.into(); let end: ByteView = range.end.into(); self.keyspace .range(start..end) .map(|res| res.into_inner().unwrap()) .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } pub fn approximate_len(&self) -> usize { self.keyspace.approximate_len() } #[inline] fn has(&self, height: Height) -> bool { self.meta.has(height) } #[inline] pub fn needs(&self, height: Height) -> bool { self.meta.needs(height) } fn export_meta(&mut self, height: Height) -> Result<()> { self.meta.export(height)?; Ok(()) } fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { if !self.has(height) { self.export_meta(height)?; } Ok(()) } fn ingest<'a>( keyspace: &Keyspace, puts: impl Iterator, dels: impl Iterator, ) -> Result<()> where ByteView: From<&'a K> + From<&'a V>, K: 'a, V: 'a, { let mut items: Vec> = puts .map(|(key, value)| Item::Value { key, value }) .chain(dels.map(Item::Tomb)) .collect(); items.sort_unstable(); let mut ingestion = keyspace.start_ingestion()?; for item in items { match item { Item::Value { key, value } => { ingestion.write(ByteView::from(key), ByteView::from(value))?; } Item::Tomb(key) => { ingestion.write_weak_tombstone(ByteView::from(key))?; } } } ingestion.finish()?; Ok(()) } } impl AnyStore for Store where K: Debug + Clone + From + Ord + Eq + Hash, V: Debug + Clone + From, for<'a> ByteView: From + From + From<&'a K> + From<&'a V>, Self: Send + Sync, { fn keyspace(&self) -> &Keyspace { &self.keyspace } fn export_meta(&mut self, height: Height) -> Result<()> { self.export_meta(height) } fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { self.export_meta_if_needed(height) } fn name(&self) -> &'static str { self.name } fn height(&self) -> Option { self.meta.height() } fn has(&self, height: Height) -> bool { self.has(height) } fn needs(&self, height: Height) -> bool { self.needs(height) } fn version(&self) -> Version { self.meta.version() } fn commit(&mut self, height: Height) -> Result<()> { self.export_meta_if_needed(height)?; let puts = mem::take(&mut self.puts); let dels = mem::take(&mut self.dels); if puts.is_empty() && dels.is_empty() { return Ok(()); } Self::ingest(&self.keyspace, puts.iter(), dels.iter())?; if !self.caches.is_empty() { self.caches.pop(); self.caches.insert(0, puts); } Ok(()) } }