global: snapshot

This commit is contained in:
nym21
2026-03-26 15:57:22 +01:00
parent 6d3307c0df
commit 18bb4186a8
72 changed files with 2013 additions and 1150 deletions

View File

@@ -6,6 +6,7 @@ use brk_error::Result;
use brk_iterator::Blocks;
use brk_rpc::Client;
use brk_types::Height;
use fjall::PersistMode;
use tracing::{debug, info};
use vecdb::{Exit, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode};
mod constants;
@@ -107,6 +108,8 @@ impl Indexer {
exit: &Exit,
check_collisions: bool,
) -> Result<Indexes> {
self.vecs.db.sync_bg_tasks()?;
debug!("Starting indexing...");
let last_blockhash = self.vecs.blocks.blockhash.collect_last();
@@ -248,11 +251,32 @@ impl Indexer {
drop(readers);
if !is_export_height(indexes.height) {
export(stores, vecs, indexes.height)?;
}
let lock = exit.lock();
let tasks = self.stores.take_all_pending_ingests(indexes.height)?;
self.vecs.stamped_write(indexes.height)?;
let fjall_db = self.stores.db.clone();
self.vecs.compact()?;
self.vecs.db.run_bg(move |db| {
let _lock = lock;
if !tasks.is_empty() {
let i = Instant::now();
for task in tasks {
task().map_err(vecdb::RawDBError::other)?;
}
info!("Stores committed in {:?}", i.elapsed());
let i = Instant::now();
fjall_db
.persist(PersistMode::SyncData)
.map_err(vecdb::RawDBError::other)?;
info!("Stores persisted in {:?}", i.elapsed());
}
db.flush()?;
db.compact()?;
Ok(())
});
Ok(starting_indexes)
}

View File

@@ -6,8 +6,8 @@ use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_store::{AnyStore, Kind, Mode, Store};
use brk_types::{
AddrHash, AddrIndexOutPoint, AddrIndexTxIndex, BlockHashPrefix, Height, OutPoint,
OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
AddrHash, AddrIndexOutPoint, AddrIndexTxIndex, BlockHashPrefix, Height, OutPoint, OutputType,
StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
};
use fjall::{Database, PersistMode};
use rayon::prelude::*;
@@ -24,8 +24,7 @@ pub struct Stores {
pub addr_type_to_addr_hash_to_addr_index: ByAddrType<Store<AddrHash, TypeIndex>>,
pub addr_type_to_addr_index_and_tx_index: ByAddrType<Store<AddrIndexTxIndex, Unit>>,
pub addr_type_to_addr_index_and_unspent_outpoint:
ByAddrType<Store<AddrIndexOutPoint, Unit>>,
pub addr_type_to_addr_index_and_unspent_outpoint: ByAddrType<Store<AddrIndexOutPoint, Unit>>,
pub blockhash_prefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txid_prefix_to_tx_index: Store<TxidPrefix, TxIndex>,
@@ -194,6 +193,39 @@ impl Stores {
Ok(())
}
/// Takes all pending puts/dels from every store and returns closures
/// that can ingest them on a background thread.
#[allow(clippy::type_complexity)]
pub fn take_all_pending_ingests(
&mut self,
height: Height,
) -> Result<Vec<Box<dyn FnOnce() -> Result<()> + Send>>> {
let h = height;
let mut tasks = Vec::new();
macro_rules! take {
($store:expr) => {
tasks.extend($store.take_pending_ingest(h)?);
};
}
take!(self.blockhash_prefix_to_height);
take!(self.height_to_coinbase_tag);
take!(self.txid_prefix_to_tx_index);
for store in self.addr_type_to_addr_hash_to_addr_index.values_mut() {
take!(store);
}
for store in self.addr_type_to_addr_index_and_tx_index.values_mut() {
take!(store);
}
for store in self.addr_type_to_addr_index_and_unspent_outpoint.values_mut() {
take!(store);
}
Ok(tasks)
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
@@ -368,11 +400,7 @@ impl Stores {
let addr_type = output_type;
let addr_index = type_index;
addr_index_tx_index_to_remove.insert((
addr_type,
addr_index,
spending_tx_index,
));
addr_index_tx_index_to_remove.insert((addr_type, addr_index, spending_tx_index));
self.addr_type_to_addr_index_and_unspent_outpoint
.get_mut_unwrap(addr_type)

View File

@@ -30,7 +30,7 @@ use crate::Indexes;
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
#[traversable(skip)]
db: Database,
pub db: Database,
pub blocks: BlocksVecs<M>,
#[traversable(wrap = "transactions", rename = "raw")]
pub transactions: TransactionsVecs<M>,
@@ -121,8 +121,7 @@ impl Vecs {
}
pub fn flush(&mut self, height: Height) -> Result<()> {
self.par_iter_mut_any_stored_vec()
.try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?;
self.stamped_write(height)?;
self.db.flush()?;
Ok(())
}
@@ -137,6 +136,12 @@ impl Vecs {
.unwrap()
}
pub fn stamped_write(&mut self, height: Height) -> Result<()> {
self.par_iter_mut_any_stored_vec()
.try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?;
Ok(())
}
pub fn compact(&self) -> Result<()> {
self.db.compact()?;
Ok(())
@@ -168,7 +173,4 @@ impl Vecs {
.chain(self.scripts.par_iter_mut_any())
}
pub fn db(&self) -> &Database {
&self.db
}
}