indexer + store: fjall v3 test (with caching layer)

This commit is contained in:
nym21
2025-12-07 21:53:19 +01:00
parent b88f0bab56
commit f4a1384dc4
7 changed files with 45 additions and 23 deletions

6
Cargo.lock generated
View File

@@ -1246,7 +1246,7 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"brk_error",
"byteview 0.6.1",
"byteview 0.9.1",
"derive_deref",
"itoa",
"jiff",
@@ -2073,8 +2073,6 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "fjall"
version = "3.0.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91b735d557d6636f69dedbea4d7a465e6095a00409b2b146a64cf4c136ab833"
dependencies = [
"byteorder-lite",
"byteview 0.9.1",
@@ -2973,8 +2971,6 @@ dependencies = [
[[package]]
name = "lsm-tree"
version = "3.0.0-rc.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4e32220e039b7e8e1474a5206eb32c002c9d18fe0770d87c693e00b782f15ff"
dependencies = [
"byteorder-lite",
"byteview 0.9.1",

View File

@@ -57,13 +57,14 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" }
brk_types = { version = "0.0.111", path = "crates/brk_types" }
brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["pco", "derive"] }
brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" }
byteview = "=0.6.1"
# byteview = "0.9.1"
# byteview = "=0.6.1"
byteview = "0.9.1"
color-eyre = "0.6.5"
derive_deref = "1.1.1"
fjall2 = { version = "2.11.8", package = "brk_fjall" }
# fjall2 = { path = "../fjall2", package = "brk_fjall" }
fjall3 = { version = "3.0.0-rc.5", package = "fjall" }
# fjall3 = { version = "3.0.0-rc.5", package = "fjall" }
fjall3 = { path = "../fjall3", package = "fjall" }
# fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "434979ef59d8fd2b36b91e6ff759a36d19a397ee", package = "fjall" }
jiff = "0.2.16"
log = "0.4.29"
minreq = { version = "2.14.1", features = ["https", "serde_json"] }

View File

@@ -49,7 +49,7 @@ fn main() -> Result<()> {
});
let i = Instant::now();
indexer.checked_index(&blocks, &client, &exit)?;
indexer.index(&blocks, &client, &exit)?;
info!("Done in {:?}", i.elapsed());
// We want to benchmark the drop too

View File

@@ -12,16 +12,16 @@ mod constants;
mod indexes;
mod processor;
mod readers;
mod stores_v2;
// mod stores_v3;
// mod stores_v2;
mod stores_v3;
mod vecs;
use constants::*;
pub use indexes::*;
pub use processor::*;
pub use readers::*;
pub use stores_v2::*;
// pub use stores_v3::*;
// pub use stores_v2::*;
pub use stores_v3::*;
pub use vecs::*;
#[derive(Clone)]

View File

@@ -113,7 +113,7 @@ impl<'a> BlockProcessor<'a> {
/// Compute TXIDs in parallel (CPU-intensive operation).
pub fn compute_txids(&self) -> Result<Vec<ComputedTx<'a>>> {
let should_check_collisions =
let will_check_collisions =
self.check_collisions && self.stores.txidprefix_to_txindex.needs(self.height);
let base_txindex = self.indexes.txindex;
@@ -125,7 +125,7 @@ impl<'a> BlockProcessor<'a> {
let txid = Txid::from(tx.compute_txid());
let txid_prefix = TxidPrefix::from(&txid);
let prev_txindex_opt = if should_check_collisions {
let prev_txindex_opt = if will_check_collisions {
self.stores
.txidprefix_to_txindex
.get(&txid_prefix)?

View File

@@ -33,10 +33,6 @@ pub struct Stores {
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
// pub addresstype_to_addressindex_and_txindex: Store<AddressTypeAddressIndexTxIndex, Unit>,
// pub addresshash_to_typeindex: Store<AddressHash, TypeIndex>,
// pub addresstype_to_addressindex_and_unspentoutpoint:
// Store<AddressTypeAddressIndexOutPoint, Unit>,
}
impl Stores {
@@ -64,6 +60,7 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Random,
10,
)
};
@@ -75,6 +72,7 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Vec,
0,
)
};
@@ -86,6 +84,7 @@ impl Stores {
version,
Mode3::Any,
Kind3::Vec,
0,
)
};
@@ -99,6 +98,7 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Sequential,
0,
)?,
addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index(
create_addresshash_to_addressindex_store,
@@ -116,6 +116,7 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Random,
0,
)?,
txidprefix_to_txindex: Store::import(
database_ref,
@@ -124,6 +125,7 @@ impl Stores {
version,
Mode3::PushOnly,
Kind3::Random,
10,
)?,
})
}

View File

@@ -22,6 +22,7 @@ pub struct StoreFjallV3<Key, Value> {
keyspace: Keyspace,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
prev_puts: Vec<FxHashMap<Key, Value>>,
}
const MAJOR_FJALL_VERSION: Version = Version::new(3);
@@ -47,6 +48,7 @@ where
version: Version,
mode: Mode3,
kind: Kind3,
cached_commits: usize,
) -> Result<Self> {
fs::create_dir_all(path)?;
@@ -62,12 +64,18 @@ where
},
)?;
let mut prev_puts = vec![];
for _ in 0..cached_commits {
prev_puts.push(FxHashMap::default());
}
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
keyspace,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
prev_puts,
})
}
@@ -107,8 +115,16 @@ where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
return Ok(Some(Cow::Borrowed(v)));
}
for prev in &self.prev_puts {
if let Some(v) = prev.get(key) {
return Ok(Some(Cow::Borrowed(v)));
}
}
if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
@@ -220,7 +236,14 @@ where
fn commit_f3(&mut self, height: Height) -> Result<()> {
self.export_meta_if_needed(height)?;
let mut items = mem::take(&mut self.puts)
let puts = mem::take(&mut self.puts);
if !self.prev_puts.is_empty() {
self.prev_puts.pop();
self.prev_puts.insert(0, puts.clone());
}
let mut items = puts
.into_iter()
.map(|(key, value)| Item::Value { key, value })
.chain(