store: better caching layer

This commit is contained in:
nym21
2025-12-09 16:37:03 +01:00
parent 96b967f6fb
commit b8f77433b9
3 changed files with 143 additions and 106 deletions

View File

@@ -55,7 +55,7 @@ fn main() -> Result<()> {
loop {
let i = Instant::now();
indexer.checked_index(&blocks, &client, &exit)?;
indexer.index(&blocks, &client, &exit)?;
info!("Done in {:?}", i.elapsed());
sleep(Duration::from_secs(60));

View File

@@ -47,7 +47,7 @@ impl Stores {
let database_ref = &database;
let create_addresshash_to_addressindex_store = |index| {
Store::import(
Store::import_cached(
database_ref,
path,
&format!("h2i{}", index),
@@ -66,7 +66,6 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Vec,
0,
)
};
@@ -78,7 +77,6 @@ impl Stores {
version,
Mode3::Any,
Kind3::Vec,
0,
)
};
@@ -92,7 +90,6 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Sequential,
0,
)?,
addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index(
create_addresshash_to_addressindex_store,
@@ -110,9 +107,8 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Random,
0,
)?,
txidprefix_to_txindex: Store::import(
txidprefix_to_txindex: Store::import_cached(
database_ref,
path,
"txidprefix_to_txindex",

View File

@@ -3,29 +3,13 @@ use std::{borrow::Cow, cmp::Ordering, fmt::Debug, fs, hash::Hash, mem, path::Pat
use brk_error::Result;
use brk_types::{Height, Version};
use byteview_f3::ByteView;
use fjall3::{
Database, Keyspace, KeyspaceCreateOptions,
config::{
BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry, PartitioningPolicy, PinningPolicy,
},
};
mod meta;
use meta::*;
use fjall3::{Database, Keyspace, KeyspaceCreateOptions, config::*};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
mod meta;
use meta::*;
#[derive(Clone)]
pub struct StoreFjallV3<Key, Value> {
meta: StoreMeta,
name: &'static str,
keyspace: Keyspace,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
prev_puts: Vec<FxHashMap<Key, Value>>,
}
use crate::any::AnyStore;
const MAJOR_FJALL_VERSION: Version = Version::new(3);
@@ -35,6 +19,16 @@ pub fn open_fjall3_database(path: &Path) -> fjall3::Result<Database> {
.open()
}
#[derive(Clone)]
pub struct StoreFjallV3<K, V> {
meta: StoreMeta,
name: &'static str,
keyspace: Keyspace,
puts: FxHashMap<K, V>,
dels: FxHashSet<K>,
cache: Option<Cache<K, V>>,
}
impl<K, V> StoreFjallV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
@@ -49,7 +43,30 @@ where
version: Version,
mode: Mode3,
kind: Kind3,
cached_commits: usize,
) -> Result<Self> {
Self::import_inner(db, path, name, version, mode, kind, None)
}
pub fn import_cached(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode3,
kind: Kind3,
max_batches: u16,
) -> Result<Self> {
Self::import_inner(db, path, name, version, mode, kind, Some(max_batches))
}
fn import_inner(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode3,
kind: Kind3,
max_batches: Option<u16>,
) -> Result<Self> {
fs::create_dir_all(path)?;
@@ -65,10 +82,7 @@ where
},
)?;
let mut prev_puts = vec![];
for _ in 0..cached_commits {
prev_puts.push(FxHashMap::default());
}
let cache = max_batches.map(|max_batches| Cache::new(max_batches));
Ok(Self {
meta,
@@ -76,7 +90,7 @@ where
keyspace,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
prev_puts,
cache,
})
}
@@ -125,10 +139,10 @@ where
return Ok(Some(Cow::Borrowed(v)));
}
for prev in &self.prev_puts {
if let Some(v) = prev.get(key) {
return Ok(Some(Cow::Borrowed(v)));
}
if let Some(cache) = &self.cache
&& let Some(v) = cache.get(key)
{
return Ok(Some(Cow::Borrowed(v)));
}
if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
@@ -143,6 +157,12 @@ where
self.keyspace.is_empty().map_err(|e| e.into())
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
@@ -150,19 +170,11 @@ where
}
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn remove(&mut self, key: K) {
// Hot path: key was recently inserted
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
@@ -191,6 +203,13 @@ where
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
if !self.has(height) {
self.meta.export(height)?;
}
Ok(())
}
}
impl<K, V> AnyStore for StoreFjallV3<K, V>
@@ -200,7 +219,7 @@ where
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn keyspace(&self) -> &fjall3::Keyspace {
fn keyspace(&self) -> &Keyspace {
&self.keyspace
}
@@ -213,11 +232,7 @@ where
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
if self.has(height) {
return Ok(());
}
self.meta.export(height)?;
Ok(())
self.export_meta_if_needed(height)
}
fn name(&self) -> &'static str {
@@ -244,48 +259,38 @@ where
self.export_meta_if_needed(height)?;
let puts = mem::take(&mut self.puts);
let dels = mem::take(&mut self.dels);
if !self.prev_puts.is_empty() {
self.prev_puts.pop();
self.prev_puts.insert(0, puts.clone());
}
let mut items = puts
.into_iter()
.map(|(key, value)| Item::Value { key, value })
.chain(
mem::take(&mut self.dels)
.into_iter()
.map(|key| Item::Tomb(key)),
)
.collect::<Vec<_>>();
if items.is_empty() {
if puts.is_empty() && dels.is_empty() {
return Ok(());
}
items.sort_unstable();
// Insert into cache here
if let Some(cache) = &mut self.cache {
for (k, v) in &puts {
cache.insert(k.clone(), v.clone());
}
cache.commit();
}
// let mut batch = OwnedWriteBatch::with_capacity(self.db.clone(), items.len());
// let p = self.keyspace();
// for item in items {
// match item {
// Item::Value { key, value } => {
// batch.insert(p, ByteView::from(key), ByteView::from(value))
// }
// Item::Tomb(key) => batch.remove(p, ByteView::from(key)),
// }
// }
// batch.commit()?;
let mut items: Vec<_> = puts
.into_iter()
.map(|(key, value)| Item::Value { key, value })
.chain(dels.into_iter().map(Item::Tomb))
.collect();
items.sort_unstable();
let mut ingestion = self.keyspace.start_ingestion()?;
for item in items {
match item {
Item::Value { key, value } => {
ingestion.write(ByteView::from(key), ByteView::from(value))
ingestion.write(ByteView::from(key), ByteView::from(value))?;
}
Item::Tomb(key) => ingestion.write_tombstone(ByteView::from(key)),
}?
Item::Tomb(key) => {
ingestion.write_tombstone(ByteView::from(key))?;
}
}
}
ingestion.finish()?;
@@ -293,45 +298,43 @@ where
}
}
pub enum Item<K, V> {
enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
impl<K: Ord, V> Ord for Item<K, V> {
fn cmp(&self, other: &Self) -> Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
impl<K, V> Item<K, V> {
#[inline]
fn key(&self) -> &K {
match self {
Self::Value { key, .. } | Self::Tomb(key) => key,
}
}
}
impl<K: Ord, V> Ord for Item<K, V> {
#[inline]
fn cmp(&self, other: &Self) -> Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
#[inline]
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
#[derive(Debug, Clone, Copy)]
pub enum Mode3 {
Any,
PushOnly,
}
impl Mode3 {
pub fn is_any(&self) -> bool {
matches!(*self, Self::Any)
@@ -348,7 +351,6 @@ pub enum Kind3 {
Sequential,
Vec,
}
impl Kind3 {
pub fn is_sequential(&self) -> bool {
matches!(*self, Self::Sequential)
@@ -362,3 +364,42 @@ impl Kind3 {
!matches!(*self, Self::Vec)
}
}
#[derive(Clone)]
struct Cache<K, V> {
index: FxHashMap<K, (V, u16)>,
current_batch: u16,
max_batches: u16,
}
impl<K: Hash + Eq + Clone, V: Clone> Cache<K, V> {
fn new(max_batches: u16) -> Self {
Self {
index: FxHashMap::default(),
current_batch: 0,
max_batches,
}
}
#[inline]
fn get(&self, key: &K) -> Option<&V> {
self.index.get(key).map(|(v, _)| v)
}
#[inline]
fn insert(&mut self, key: K, value: V) {
self.index.insert(key, (value, self.current_batch));
}
fn commit(&mut self) {
let max = self.max_batches;
let current = self.current_batch;
self.index
.retain(|_, (_, batch)| current.wrapping_sub(*batch) < max);
self.current_batch = self.current_batch.wrapping_add(1);
}
fn clear(&mut self) {
self.index.clear();
self.current_batch = 0;
}
}