global: snapshot

This commit is contained in:
nym21
2026-03-31 22:53:25 +02:00
parent d038141a8a
commit ae26db6df2
83 changed files with 3398 additions and 710 deletions

View File

@@ -12,7 +12,6 @@ exclude = ["examples/"]
bitcoin = { workspace = true }
brk_error = { workspace = true, features = ["fjall", "vecdb"] }
brk_cohort = { workspace = true }
brk_iterator = { workspace = true }
brk_logger = { workspace = true }
brk_reader = { workspace = true }
brk_rpc = { workspace = true, features = ["corepc"] }

View File

@@ -7,7 +7,6 @@ use std::{
use brk_alloc::Mimalloc;
use brk_indexer::Indexer;
use brk_iterator::Blocks;
use brk_reader::Reader;
use brk_rpc::{Auth, Client};
use tracing::{debug, info};
@@ -33,9 +32,6 @@ fn main() -> color_eyre::Result<()> {
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
debug!("Reader created.");
let blocks = Blocks::new(&client, &reader);
debug!("Blocks created.");
let mut indexer = Indexer::forced_import(&outputs_dir)?;
debug!("Indexer imported.");
@@ -44,7 +40,7 @@ fn main() -> color_eyre::Result<()> {
loop {
let i = Instant::now();
indexer.checked_index(&blocks, &client, &exit)?;
indexer.checked_index(&reader, &client, &exit)?;
info!("Done in {:?}", i.elapsed());
Mimalloc::collect();

View File

@@ -9,7 +9,6 @@ use brk_alloc::Mimalloc;
use brk_bencher::Bencher;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_iterator::Blocks;
use brk_reader::Reader;
use brk_rpc::{Auth, Client};
use tracing::{debug, info};
@@ -33,8 +32,6 @@ fn main() -> Result<()> {
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
let blocks = Blocks::new(&client, &reader);
let mut indexer = Indexer::forced_import(&outputs_dir)?;
let mut bencher =
@@ -50,7 +47,7 @@ fn main() -> Result<()> {
});
let i = Instant::now();
indexer.index(&blocks, &client, &exit)?;
indexer.index(&reader, &client, &exit)?;
info!("Done in {:?}", i.elapsed());
// We want to benchmark the drop too

View File

@@ -9,7 +9,6 @@ use brk_alloc::Mimalloc;
use brk_bencher::Bencher;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_iterator::Blocks;
use brk_reader::Reader;
use brk_rpc::{Auth, Client};
use tracing::{debug, info};
@@ -33,8 +32,6 @@ fn main() -> Result<()> {
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
let blocks = Blocks::new(&client, &reader);
let mut indexer = Indexer::forced_import(&outputs_dir)?;
let mut bencher =
@@ -51,7 +48,7 @@ fn main() -> Result<()> {
loop {
let i = Instant::now();
indexer.index(&blocks, &client, &exit)?;
indexer.index(&reader, &client, &exit)?;
info!("Done in {:?}", i.elapsed());
Mimalloc::collect();

View File

@@ -4,7 +4,7 @@ use brk_types::{TxIndex, Txid, TxidPrefix, Version};
// One version for all data sources
// Increment on **change _OR_ addition**
pub const VERSION: Version = Version::new(25);
pub const VERSION: Version = Version::new(26);
pub const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
/// Known duplicate Bitcoin transactions (BIP30)

View File

@@ -8,12 +8,14 @@ use std::{
};
use brk_error::Result;
use brk_iterator::Blocks;
use brk_reader::Reader;
use brk_rpc::Client;
use brk_types::Height;
use fjall::PersistMode;
use tracing::{debug, info};
use vecdb::{Exit, RawDBError, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode};
use vecdb::{
Exit, RawDBError, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode, WritableVec, unlikely,
};
mod constants;
mod indexes;
mod processor;
@@ -93,22 +95,22 @@ impl Indexer {
}
}
pub fn index(&mut self, blocks: &Blocks, client: &Client, exit: &Exit) -> Result<Indexes> {
self.index_(blocks, client, exit, false)
pub fn index(&mut self, reader: &Reader, client: &Client, exit: &Exit) -> Result<Indexes> {
self.index_(reader, client, exit, false)
}
pub fn checked_index(
&mut self,
blocks: &Blocks,
reader: &Reader,
client: &Client,
exit: &Exit,
) -> Result<Indexes> {
self.index_(blocks, client, exit, true)
self.index_(reader, client, exit, true)
}
fn index_(
&mut self,
blocks: &Blocks,
reader: &Reader,
client: &Client,
exit: &Exit,
check_collisions: bool,
@@ -172,13 +174,13 @@ impl Indexer {
let stores_res = s.spawn(|| -> Result<()> {
let i = Instant::now();
stores.commit(height)?;
info!("Stores exported in {:?}", i.elapsed());
debug!("Stores exported in {:?}", i.elapsed());
Ok(())
});
let vecs_res = s.spawn(|| -> Result<()> {
let i = Instant::now();
vecs.flush(height)?;
info!("Vecs exported in {:?}", i.elapsed());
debug!("Vecs exported in {:?}", i.elapsed());
Ok(())
});
stores_res.join().unwrap()?;
@@ -195,13 +197,22 @@ impl Indexer {
let vecs = &mut self.vecs;
let stores = &mut self.stores;
for block in blocks.after(prev_hash)? {
for block in reader.after(prev_hash)?.iter() {
let height = block.height();
info!("Indexing block {height}...");
if unlikely(height.is_multiple_of(100)) {
info!("Indexing block {height}...");
} else {
debug!("Indexing block {height}...");
}
indexes.height = height;
vecs.blocks.position.push(block.metadata().position());
block.tx_metadata().iter().for_each(|m| {
vecs.transactions.position.push(m.position());
});
let mut processor = BlockProcessor {
block: &block,
height,
@@ -271,13 +282,13 @@ impl Indexer {
for task in tasks {
task().map_err(vecdb::RawDBError::other)?;
}
info!("Stores committed in {:?}", i.elapsed());
debug!("Stores committed in {:?}", i.elapsed());
let i = Instant::now();
fjall_db
.persist(PersistMode::SyncData)
.map_err(RawDBError::other)?;
info!("Stores persisted in {:?}", i.elapsed());
debug!("Stores persisted in {:?}", i.elapsed());
}
db.compact()?;

View File

@@ -28,14 +28,14 @@ impl BlockProcessor<'_> {
.blockhash_prefix_to_height
.insert(blockhash_prefix, height);
self.stores
.height_to_coinbase_tag
.insert(height, self.block.coinbase_tag().into());
self.vecs
.blocks
.blockhash
.checked_push(height, blockhash.clone())?;
self.vecs
.blocks
.coinbase_tag
.checked_push(height, self.block.coinbase_tag())?;
self.vecs
.blocks
.difficulty
@@ -53,21 +53,28 @@ impl BlockProcessor<'_> {
pub fn push_block_size_and_weight(&mut self, txs: &[ComputedTx]) -> Result<()> {
let overhead = bitcoin::block::Header::SIZE + bitcoin::VarInt::from(txs.len()).size();
let mut total_size = overhead;
let mut weight_wu = overhead * 4;
for ct in txs {
let base = ct.base_size as usize;
let total = ct.total_size as usize;
total_size += total;
weight_wu += base * 3 + total;
let mut weight = overhead * 4;
let mut sw_txs = 0u32;
let mut sw_size = 0usize;
let mut sw_weight = 0usize;
for tx in txs {
total_size += tx.total_size as usize;
weight += tx.weight();
if tx.is_segwit() {
sw_txs += 1;
sw_size += tx.total_size as usize;
sw_weight += tx.weight();
}
}
self.vecs
.blocks
.total
.checked_push(self.height, total_size.into())?;
self.vecs
.blocks
.weight
.checked_push(self.height, weight_wu.into())?;
let h = self.height;
let blocks = &mut self.vecs.blocks;
blocks.total.checked_push(h, total_size.into())?;
blocks.weight.checked_push(h, weight.into())?;
blocks.segwit_txs.checked_push(h, sw_txs.into())?;
blocks.segwit_size.checked_push(h, sw_size.into())?;
blocks.segwit_weight.checked_push(h, sw_weight.into())?;
Ok(())
}
}

View File

@@ -48,6 +48,18 @@ pub struct ComputedTx<'a> {
pub total_size: u32,
}
impl ComputedTx<'_> {
#[inline]
pub fn is_segwit(&self) -> bool {
self.base_size != self.total_size
}
#[inline]
pub fn weight(&self) -> usize {
self.base_size as usize * 3 + self.total_size as usize
}
}
/// Reusable buffers cleared and refilled each block to avoid allocation churn.
#[derive(Default)]
pub struct BlockBuffers {

View File

@@ -7,11 +7,11 @@ use brk_error::Result;
use brk_store::{AnyStore, Kind, Mode, Store};
use brk_types::{
AddrHash, AddrIndexOutPoint, AddrIndexTxIndex, BlockHashPrefix, Height, OutPoint, OutputType,
StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
};
use fjall::{Database, PersistMode};
use rayon::prelude::*;
use tracing::info;
use tracing::{debug, info};
use vecdb::{AnyVec, ReadableVec, VecIndex};
use crate::{Indexes, constants::DUPLICATE_TXID_PREFIXES};
@@ -26,7 +26,6 @@ pub struct Stores {
pub addr_type_to_addr_index_and_tx_index: ByAddrType<Store<AddrIndexTxIndex, Unit>>,
pub addr_type_to_addr_index_and_unspent_outpoint: ByAddrType<Store<AddrIndexOutPoint, Unit>>,
pub blockhash_prefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txid_prefix_to_tx_index: Store<TxidPrefix, TxIndex>,
}
@@ -88,14 +87,6 @@ impl Stores {
Ok(Self {
db: database.clone(),
height_to_coinbase_tag: Store::import(
database_ref,
path,
"height_to_coinbase_tag",
version,
Mode::PushOnly,
Kind::Sequential,
)?,
addr_type_to_addr_hash_to_addr_index: ByAddrType::new_with_index(
create_addr_hash_to_addr_index_store,
)?,
@@ -135,7 +126,6 @@ impl Stores {
fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStore> {
[
&self.blockhash_prefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txid_prefix_to_tx_index,
]
.into_iter()
@@ -159,7 +149,6 @@ impl Stores {
fn par_iter_any_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStore> {
[
&mut self.blockhash_prefix_to_height as &mut dyn AnyStore,
&mut self.height_to_coinbase_tag,
&mut self.txid_prefix_to_tx_index,
]
.into_par_iter()
@@ -184,11 +173,11 @@ impl Stores {
let i = Instant::now();
self.par_iter_any_mut()
.try_for_each(|store| store.commit(height))?;
info!("Stores committed in {:?}", i.elapsed());
debug!("Stores committed in {:?}", i.elapsed());
let i = Instant::now();
self.db.persist(PersistMode::SyncData)?;
info!("Stores persisted in {:?}", i.elapsed());
debug!("Stores persisted in {:?}", i.elapsed());
Ok(())
}
@@ -210,7 +199,6 @@ impl Stores {
}
take!(self.blockhash_prefix_to_height);
take!(self.height_to_coinbase_tag);
take!(self.txid_prefix_to_tx_index);
for store in self.addr_type_to_addr_hash_to_addr_index.values_mut() {
@@ -257,7 +245,6 @@ impl Stores {
fn is_empty(&self) -> Result<bool> {
Ok(self.blockhash_prefix_to_height.is_empty()?
&& self.txid_prefix_to_tx_index.is_empty()?
&& self.height_to_coinbase_tag.is_empty()?
&& self
.addr_type_to_addr_hash_to_addr_index
.values()
@@ -286,12 +273,6 @@ impl Stores {
},
);
(starting_indexes.height.to_usize()..vecs.blocks.blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
});
for addr_type in OutputType::ADDR_TYPES {
for hash in vecs.iter_addr_hashes_from(addr_type, starting_indexes.height)? {
self.addr_type_to_addr_hash_to_addr_index

View File

@@ -1,6 +1,9 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{BlockHash, Height, StoredF64, StoredU64, Timestamp, Version, Weight};
use brk_types::{
BlkPosition, BlockHash, CoinbaseTag, Height, StoredF64, StoredU32, StoredU64, Timestamp,
Version, Weight,
};
use rayon::prelude::*;
use vecdb::{
AnyStoredVec, BytesVec, Database, ImportableVec, PcoVec, Rw, Stamp, StorageMode, WritableVec,
@@ -11,6 +14,7 @@ use crate::parallel_import;
#[derive(Traversable)]
pub struct BlocksVecs<M: StorageMode = Rw> {
pub blockhash: M::Stored<BytesVec<Height, BlockHash>>,
pub coinbase_tag: M::Stored<BytesVec<Height, CoinbaseTag>>,
#[traversable(wrap = "difficulty", rename = "value")]
pub difficulty: M::Stored<PcoVec<Height, StoredF64>>,
/// Doesn't guarantee continuity due to possible reorgs and more generally the nature of mining
@@ -20,45 +24,85 @@ pub struct BlocksVecs<M: StorageMode = Rw> {
pub total: M::Stored<PcoVec<Height, StoredU64>>,
#[traversable(wrap = "weight", rename = "base")]
pub weight: M::Stored<PcoVec<Height, Weight>>,
#[traversable(hidden)]
pub position: M::Stored<PcoVec<Height, BlkPosition>>,
pub segwit_txs: M::Stored<PcoVec<Height, StoredU32>>,
pub segwit_size: M::Stored<PcoVec<Height, StoredU64>>,
pub segwit_weight: M::Stored<PcoVec<Height, Weight>>,
}
impl BlocksVecs {
pub fn forced_import(db: &Database, version: Version) -> Result<Self> {
let (blockhash, difficulty, timestamp, total, weight) = parallel_import! {
blockhash = BytesVec::forced_import(db, "blockhash", version),
difficulty = PcoVec::forced_import(db, "difficulty", version),
timestamp = PcoVec::forced_import(db, "timestamp", version),
total_size = PcoVec::forced_import(db, "total_size", version),
weight = PcoVec::forced_import(db, "block_weight", version),
};
Ok(Self {
let (
blockhash,
coinbase_tag,
difficulty,
timestamp,
total,
weight,
position,
segwit_txs,
segwit_size,
segwit_weight,
) = parallel_import! {
blockhash = BytesVec::forced_import(db, "blockhash", version),
coinbase_tag = BytesVec::forced_import(db, "coinbase_tag", version),
difficulty = PcoVec::forced_import(db, "difficulty", version),
timestamp = PcoVec::forced_import(db, "timestamp", version),
total_size = PcoVec::forced_import(db, "total_size", version),
weight = PcoVec::forced_import(db, "block_weight", version),
position = PcoVec::forced_import(db, "block_position", version),
segwit_txs = PcoVec::forced_import(db, "segwit_txs", version),
segwit_size = PcoVec::forced_import(db, "segwit_size", version),
segwit_weight = PcoVec::forced_import(db, "segwit_weight", version),
};
Ok(Self {
blockhash,
coinbase_tag,
difficulty,
timestamp,
total,
weight,
position,
segwit_txs,
segwit_size,
segwit_weight,
})
}
pub fn truncate(&mut self, height: Height, stamp: Stamp) -> Result<()> {
self.blockhash
.truncate_if_needed_with_stamp(height, stamp)?;
self.coinbase_tag
.truncate_if_needed_with_stamp(height, stamp)?;
self.difficulty
.truncate_if_needed_with_stamp(height, stamp)?;
self.timestamp
.truncate_if_needed_with_stamp(height, stamp)?;
self.total.truncate_if_needed_with_stamp(height, stamp)?;
self.weight.truncate_if_needed_with_stamp(height, stamp)?;
self.position.truncate_if_needed_with_stamp(height, stamp)?;
self.segwit_txs
.truncate_if_needed_with_stamp(height, stamp)?;
self.segwit_size
.truncate_if_needed_with_stamp(height, stamp)?;
self.segwit_weight
.truncate_if_needed_with_stamp(height, stamp)?;
Ok(())
}
pub fn par_iter_mut_any(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
[
&mut self.blockhash as &mut dyn AnyStoredVec,
&mut self.coinbase_tag,
&mut self.difficulty,
&mut self.timestamp,
&mut self.total,
&mut self.weight,
&mut self.position,
&mut self.segwit_txs,
&mut self.segwit_size,
&mut self.segwit_weight,
]
.into_par_iter()
}

View File

@@ -1,8 +1,8 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{
Height, RawLockTime, StoredBool, StoredU32, TxInIndex, TxIndex, TxOutIndex, TxVersion, Txid,
Version,
BlkPosition, Height, RawLockTime, StoredBool, StoredU32, TxInIndex, TxIndex, TxOutIndex,
TxVersion, Txid, Version,
};
use rayon::prelude::*;
use vecdb::{
@@ -23,6 +23,8 @@ pub struct TransactionsVecs<M: StorageMode = Rw> {
pub is_explicitly_rbf: M::Stored<PcoVec<TxIndex, StoredBool>>,
pub first_txin_index: M::Stored<PcoVec<TxIndex, TxInIndex>>,
pub first_txout_index: M::Stored<BytesVec<TxIndex, TxOutIndex>>,
#[traversable(hidden)]
pub position: M::Stored<PcoVec<TxIndex, BlkPosition>>,
}
pub struct TxMetadataVecs<'a> {
@@ -70,6 +72,7 @@ impl TransactionsVecs {
is_explicitly_rbf,
first_txin_index,
first_txout_index,
position,
) = parallel_import! {
first_tx_index = PcoVec::forced_import(db, "first_tx_index", version),
height = PcoVec::forced_import(db, "height", version),
@@ -81,6 +84,7 @@ impl TransactionsVecs {
is_explicitly_rbf = PcoVec::forced_import(db, "is_explicitly_rbf", version),
first_txin_index = PcoVec::forced_import(db, "first_txin_index", version),
first_txout_index = BytesVec::forced_import(db, "first_txout_index", version),
position = PcoVec::forced_import(db, "tx_position", version),
};
Ok(Self {
first_tx_index,
@@ -93,6 +97,7 @@ impl TransactionsVecs {
is_explicitly_rbf,
first_txin_index,
first_txout_index,
position,
})
}
@@ -115,6 +120,8 @@ impl TransactionsVecs {
.truncate_if_needed_with_stamp(tx_index, stamp)?;
self.first_txout_index
.truncate_if_needed_with_stamp(tx_index, stamp)?;
self.position
.truncate_if_needed_with_stamp(tx_index, stamp)?;
Ok(())
}
@@ -130,6 +137,7 @@ impl TransactionsVecs {
&mut self.is_explicitly_rbf,
&mut self.first_txin_index,
&mut self.first_txout_index,
&mut self.position,
]
.into_par_iter()
}