computer: stateful snapshot

This commit is contained in:
nym21
2025-12-18 10:53:47 +01:00
parent 14ae41c7ba
commit 59f1296d56
9 changed files with 131 additions and 110 deletions
@@ -13,18 +13,14 @@ pub struct HeightToAddressTypeToVec<T>(FxHashMap<Height, AddressTypeToVec<T>>);
impl<T> HeightToAddressTypeToVec<T> {
/// Create with pre-allocated capacity for unique heights.
pub fn with_capacity(capacity: usize) -> Self {
Self(FxHashMap::with_capacity_and_hasher(capacity, Default::default()))
Self(FxHashMap::with_capacity_and_hasher(
capacity,
Default::default(),
))
}
}
impl<T> HeightToAddressTypeToVec<T> {
/// Merge another map into this one.
pub fn merge_mut(&mut self, other: Self) {
for (height, vec) in other.0 {
self.entry(height).or_default().merge_mut(vec);
}
}
/// Consume and iterate over (Height, AddressTypeToVec) pairs.
pub fn into_iter(self) -> impl Iterator<Item = (Height, AddressTypeToVec<T>)> {
self.0.into_iter()
@@ -1,7 +1,5 @@
//! Per-address-type vector.
use std::mem;
use brk_grouper::ByAddressType;
use derive_deref::{Deref, DerefMut};
@@ -41,40 +39,6 @@ impl<T> AddressTypeToVec<T> {
}
impl<T> AddressTypeToVec<T> {
/// Merge two AddressTypeToVec, consuming other.
pub fn merge(mut self, mut other: Self) -> Self {
Self::merge_single(&mut self.p2a, &mut other.p2a);
Self::merge_single(&mut self.p2pk33, &mut other.p2pk33);
Self::merge_single(&mut self.p2pk65, &mut other.p2pk65);
Self::merge_single(&mut self.p2pkh, &mut other.p2pkh);
Self::merge_single(&mut self.p2sh, &mut other.p2sh);
Self::merge_single(&mut self.p2tr, &mut other.p2tr);
Self::merge_single(&mut self.p2wpkh, &mut other.p2wpkh);
Self::merge_single(&mut self.p2wsh, &mut other.p2wsh);
self
}
/// Merge in place.
pub fn merge_mut(&mut self, mut other: Self) {
Self::merge_single(&mut self.p2a, &mut other.p2a);
Self::merge_single(&mut self.p2pk33, &mut other.p2pk33);
Self::merge_single(&mut self.p2pk65, &mut other.p2pk65);
Self::merge_single(&mut self.p2pkh, &mut other.p2pkh);
Self::merge_single(&mut self.p2sh, &mut other.p2sh);
Self::merge_single(&mut self.p2tr, &mut other.p2tr);
Self::merge_single(&mut self.p2wpkh, &mut other.p2wpkh);
Self::merge_single(&mut self.p2wsh, &mut other.p2wsh);
}
fn merge_single(own: &mut Vec<T>, other: &mut Vec<T>) {
if own.len() >= other.len() {
own.append(other);
} else {
other.append(own);
mem::swap(own, other);
}
}
/// Unwrap the inner ByAddressType.
pub fn unwrap(self) -> ByAddressType<Vec<T>> {
self.0
@@ -27,8 +27,8 @@ use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts};
use super::super::vecs::Vecs;
use super::{
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, VecsReaders,
build_txinindex_to_txindex, build_txoutindex_to_txindex,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators,
TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
flush::flush_checkpoint as flush_checkpoint_full,
};
use crate::stateful::address::AddressTypeToAddressCount;
@@ -131,6 +131,10 @@ pub fn process_blocks(
let ir = IndexerReaders::new(indexer);
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
// Create reusable iterators for sequential txout/txin reads (16KB buffered)
let mut txout_iters = TxOutIterators::new(indexer);
let mut txin_iters = TxInIterators::new(indexer);
info!("Creating address iterators...");
// Create iterators for first address indexes per type
@@ -267,6 +271,17 @@ pub fn process_blocks(
// Reset per-block values for all separate cohorts
reset_block_values(&mut vecs.utxo_cohorts, &mut vecs.address_cohorts);
// Collect output/input data using reusable iterators (16KB buffered reads)
// Must be done before thread::scope since iterators aren't Send
let (output_values, output_types, output_typeindexes) =
txout_iters.collect_block_outputs(first_txoutindex, output_count);
let input_outpoints = if input_count > 1 {
txin_iters.collect_block_outpoints(first_txinindex + 1, input_count - 1)
} else {
Vec::new()
};
// Process outputs and inputs in parallel with tick-tock
let (outputs_result, inputs_result) = thread::scope(|scope| {
// Tick-tock age transitions in background
@@ -278,13 +293,11 @@ pub fn process_blocks(
let outputs_handle = scope.spawn(|| {
// Process outputs (receive)
process_outputs(
first_txoutindex,
output_count,
&txoutindex_to_txindex,
&indexer.vecs.txout.txoutindex_to_value,
&indexer.vecs.txout.txoutindex_to_outputtype,
&indexer.vecs.txout.txoutindex_to_typeindex,
&ir,
&output_values,
&output_types,
&output_typeindexes,
&first_addressindexes,
&loaded_cache,
&empty_cache,
@@ -300,7 +313,7 @@ pub fn process_blocks(
first_txinindex + 1, // Skip coinbase
input_count - 1,
&txinindex_to_txindex[1..], // Skip coinbase
&indexer.vecs.txin.txinindex_to_outpoint,
&input_outpoints,
&indexer.vecs.tx.txindex_to_first_txoutindex,
&indexer.vecs.txout.txoutindex_to_value,
&indexer.vecs.txout.txoutindex_to_outputtype,
@@ -562,11 +575,6 @@ fn flush_checkpoint(
) -> Result<()> {
info!("Flushing checkpoint at height {}...", height);
// Flush cohort states
vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
vecs.address_cohorts
.safe_flush_stateful_vecs(height, exit)?;
// Flush height-indexed vectors
vecs.height_to_unspendable_supply.safe_write(exit)?;
vecs.height_to_opreturn_supply.safe_write(exit)?;
@@ -17,7 +17,8 @@ mod recover;
pub use block_loop::process_blocks;
pub use context::ComputeContext;
pub use readers::{
IndexerReaders, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
IndexerReaders, TxInIterators, TxOutIterators, VecsReaders, build_txinindex_to_txindex,
build_txoutindex_to_txindex,
};
pub use recover::{StartMode, determine_start_mode, recover_state, reset_state};
@@ -4,8 +4,11 @@
use brk_grouper::{ByAddressType, ByAnyAddress};
use brk_indexer::Indexer;
use brk_types::{OutputType, StoredU64, TxIndex};
use vecdb::{BoxedVecIterator, GenericStoredVec, Reader, VecIndex};
use brk_types::{OutPoint, OutputType, Sats, StoredU64, TxInIndex, TxIndex, TxOutIndex, TypeIndex};
use vecdb::{
BoxedVecIterator, BytesVecIterator, GenericStoredVec, PcodecVecIterator, Reader, VecIndex,
VecIterator,
};
use crate::stateful::address::{AddressesDataVecs, AnyAddressIndexesVecs};
@@ -32,6 +35,70 @@ impl IndexerReaders {
}
}
/// Reusable iterators for txout vectors (16KB buffered reads).
///
/// Iterators are created once and re-positioned each block to avoid
/// creating new file handles repeatedly.
pub struct TxOutIterators<'a> {
value_iter: BytesVecIterator<'a, TxOutIndex, Sats>,
outputtype_iter: BytesVecIterator<'a, TxOutIndex, OutputType>,
typeindex_iter: BytesVecIterator<'a, TxOutIndex, TypeIndex>,
}
impl<'a> TxOutIterators<'a> {
pub fn new(indexer: &'a Indexer) -> Self {
Self {
value_iter: indexer.vecs.txout.txoutindex_to_value.into_iter(),
outputtype_iter: indexer.vecs.txout.txoutindex_to_outputtype.into_iter(),
typeindex_iter: indexer.vecs.txout.txoutindex_to_typeindex.into_iter(),
}
}
/// Collect output data for a block range using buffered iteration.
pub fn collect_block_outputs(
&mut self,
first_txoutindex: usize,
output_count: usize,
) -> (Vec<Sats>, Vec<OutputType>, Vec<TypeIndex>) {
let mut values = Vec::with_capacity(output_count);
let mut output_types = Vec::with_capacity(output_count);
let mut type_indexes = Vec::with_capacity(output_count);
for i in first_txoutindex..first_txoutindex + output_count {
values.push(self.value_iter.get_at_unwrap(i));
output_types.push(self.outputtype_iter.get_at_unwrap(i));
type_indexes.push(self.typeindex_iter.get_at_unwrap(i));
}
(values, output_types, type_indexes)
}
}
/// Reusable iterator for txin outpoints (PcoVec - avoids repeated page decompression).
pub struct TxInIterators<'a> {
outpoint_iter: PcodecVecIterator<'a, TxInIndex, OutPoint>,
}
impl<'a> TxInIterators<'a> {
pub fn new(indexer: &'a Indexer) -> Self {
Self {
outpoint_iter: indexer.vecs.txin.txinindex_to_outpoint.into_iter(),
}
}
/// Collect outpoints for a block range using buffered iteration.
/// This avoids repeated PcoVec page decompression (~1000x speedup).
pub fn collect_block_outpoints(
&mut self,
first_txinindex: usize,
input_count: usize,
) -> Vec<OutPoint> {
(first_txinindex..first_txinindex + input_count)
.map(|i| self.outpoint_iter.get_at_unwrap(i))
.collect()
}
}
/// Cached readers for stateful vectors.
pub struct VecsReaders {
pub addresstypeindex_to_anyaddressindex: ByAddressType<Reader>,
@@ -11,7 +11,7 @@ use brk_types::{
};
use rayon::prelude::*;
use rustc_hash::FxHashMap;
use vecdb::{BytesVec, GenericStoredVec, PcoVec, VecIterator};
use vecdb::{BytesVec, GenericStoredVec};
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
@@ -44,12 +44,13 @@ pub struct InputsResult {
/// Process inputs (spent UTXOs) for a block.
///
/// For each input:
/// 1. Read outpoint, resolve to txoutindex
/// 2. Get the creation height from txoutindex_to_height map
/// 3. Read value and type from the referenced output
/// 4. Look up address data if input references an address type
/// 5. Accumulate into height_to_sent map
/// 6. Track address-specific data for address cohort processing
/// 1. Use pre-collected outpoint (from reusable iterator, avoids PcoVec re-decompression)
/// 2. Resolve outpoint to txoutindex
/// 3. Get the creation height from txoutindex_to_height map
/// 4. Read value and type from the referenced output (random access via mmap)
/// 5. Look up address data if input references an address type
/// 6. Accumulate into height_to_sent map
/// 7. Track address-specific data for address cohort processing
///
/// Uses parallel reads followed by sequential accumulation to avoid
/// expensive merge overhead from rayon's fold/reduce pattern.
@@ -58,7 +59,8 @@ pub fn process_inputs(
first_txinindex: usize,
input_count: usize,
txinindex_to_txindex: &[TxIndex],
txinindex_to_outpoint: &PcoVec<TxInIndex, OutPoint>,
// Pre-collected outpoints (from reusable iterator with page caching)
outpoints: &[OutPoint],
txindex_to_first_txoutindex: &BytesVec<TxIndex, TxOutIndex>,
txoutindex_to_value: &BytesVec<TxOutIndex, Sats>,
txoutindex_to_outputtype: &BytesVec<TxOutIndex, OutputType>,
@@ -73,18 +75,7 @@ pub fn process_inputs(
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> InputsResult {
// Phase 1: Sequential collect of outpoints (uses iterator's page cache)
// This avoids decompressing the same PcoVec page ~1000 times per page
let outpoints: Vec<OutPoint> = {
let mut iter = txinindex_to_outpoint
.clean_iter()
.expect("Failed to create outpoint iterator");
iter.set_position_to(first_txinindex);
iter.set_end_to(first_txinindex + input_count);
iter.collect()
};
// Phase 2: Parallel reads - collect all input data (outpoints already in memory)
// Parallel reads - collect all input data (outpoints already in memory)
let items: Vec<_> = (0..input_count)
.into_par_iter()
.map(|local_idx| {
@@ -6,16 +6,16 @@
use brk_grouper::ByAddressType;
use brk_types::{
AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex,
AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TypeIndex,
};
use smallvec::SmallVec;
use vecdb::{BytesVec, GenericStoredVec, VecIterator};
use vecdb::GenericStoredVec;
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
};
use crate::stateful::compute::VecsReaders;
use crate::{stateful::IndexerReaders, states::Transacted};
use crate::states::Transacted;
use super::super::address::AddressTypeToVec;
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource};
@@ -38,19 +38,18 @@ pub struct OutputsResult {
/// Process outputs (new UTXOs) for a block.
///
/// For each output:
/// 1. Read value and output type from indexer (sequential via iterators)
/// 1. Read pre-collected value, output type, and typeindex
/// 2. Accumulate into Transacted by type and amount
/// 3. Look up address data if output is an address type
/// 4. Track address-specific data for address cohort processing
#[allow(clippy::too_many_arguments)]
pub fn process_outputs(
first_txoutindex: usize,
output_count: usize,
txoutindex_to_txindex: &[TxIndex],
txoutindex_to_value: &BytesVec<TxOutIndex, Sats>,
txoutindex_to_outputtype: &BytesVec<TxOutIndex, OutputType>,
txoutindex_to_typeindex: &BytesVec<TxOutIndex, TypeIndex>,
ir: &IndexerReaders,
// Pre-collected output data (from reusable iterators with 16KB buffered reads)
values: &[Sats],
output_types: &[OutputType],
typeindexes: &[TypeIndex],
// Address lookup parameters
first_addressindexes: &ByAddressType<TypeIndex>,
loaded_cache: &AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
@@ -59,19 +58,6 @@ pub fn process_outputs(
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> OutputsResult {
// Sequential iterators for value and outputtype (cache-friendly)
let mut value_iter = txoutindex_to_value
.clean_iter()
.expect("Failed to create value iterator");
value_iter.set_position_to(first_txoutindex);
value_iter.set_end_to(first_txoutindex + output_count);
let mut outputtype_iter = txoutindex_to_outputtype
.clean_iter()
.expect("Failed to create outputtype iterator");
outputtype_iter.set_position_to(first_txoutindex);
outputtype_iter.set_end_to(first_txoutindex + output_count);
// Pre-allocate result structures
let estimated_per_type = (output_count / 8).max(8);
let mut transacted = Transacted::default();
@@ -81,13 +67,11 @@ pub fn process_outputs(
let mut txindex_vecs =
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);
// Single pass: read and accumulate
// Single pass: read from pre-collected vecs and accumulate
for local_idx in 0..output_count {
let txoutindex = TxOutIndex::from(first_txoutindex + local_idx);
let txindex = txoutindex_to_txindex[local_idx];
let value = value_iter.next().unwrap();
let output_type = outputtype_iter.next().unwrap();
let value = values[local_idx];
let output_type = output_types[local_idx];
transacted.iterate(value, output_type);
@@ -95,9 +79,7 @@ pub fn process_outputs(
continue;
}
// typeindex only for addresses (random access)
let typeindex =
txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
let typeindex = typeindexes[local_idx];
received_data
.get_mut(output_type)
@@ -42,6 +42,7 @@ impl FenwickTree {
}
/// Get prefix sum of elements 0..=idx. O(log n).
#[allow(unused)]
pub fn prefix_sum(&self, idx: usize) -> u64 {
let mut sum = 0u64;
let mut i = idx + 1; // Convert to 1-indexed
@@ -84,11 +85,13 @@ impl FenwickTree {
}
/// Get total sum of all elements. O(log n).
#[allow(unused)]
pub fn total(&self) -> u64 {
self.prefix_sum(self.len.saturating_sub(1))
}
/// Reset all values to zero. O(n).
#[allow(unused)]
pub fn clear(&mut self) {
self.tree.fill(0);
}
@@ -12,6 +12,7 @@ use crate::grouped::{PERCENTILES, PERCENTILES_LEN};
const MIN_PRICE: f64 = 0.001;
/// Maximum price tracked ($100M for future-proofing).
#[allow(unused)]
const MAX_PRICE: f64 = 100_000_000.0;
/// Base for logarithmic buckets (0.1% precision).
@@ -97,11 +98,13 @@ impl PriceBuckets {
}
/// Check if empty.
#[allow(unused)]
pub fn is_empty(&self) -> bool {
self.total == Sats::ZERO
}
/// Get total supply.
#[allow(unused)]
pub fn total(&self) -> Sats {
self.total
}
@@ -127,12 +130,14 @@ impl PriceBuckets {
}
/// Get amount in a specific bucket.
#[allow(unused)]
pub fn get_bucket(&self, bucket: usize) -> Sats {
self.buckets.get(bucket).copied().unwrap_or(Sats::ZERO)
}
/// Iterate over non-empty buckets in a price range.
/// Used for unrealized computation flip range.
#[allow(unused)]
pub fn iter_range(
&self,
from_price: Dollars,
@@ -158,6 +163,7 @@ impl PriceBuckets {
}
/// Iterate over all non-empty buckets (for full unrealized computation).
#[allow(unused)]
pub fn iter(&self) -> impl Iterator<Item = (Dollars, Sats)> + '_ {
self.buckets
.iter()
@@ -172,6 +178,7 @@ impl PriceBuckets {
}
/// Get the lowest price bucket with non-zero amount.
#[allow(unused)]
pub fn min_price(&self) -> Option<Dollars> {
self.buckets
.iter()
@@ -180,6 +187,7 @@ impl PriceBuckets {
}
/// Get the highest price bucket with non-zero amount.
#[allow(unused)]
pub fn max_price(&self) -> Option<Dollars> {
self.buckets
.iter()
@@ -188,6 +196,7 @@ impl PriceBuckets {
}
/// Clear all data.
#[allow(unused)]
pub fn clear(&mut self) {
self.fenwick.clear();
self.buckets.fill(Sats::ZERO);