computer: stateful snapshot

This commit is contained in:
nym21
2025-12-18 09:35:26 +01:00
parent df09b3aa28
commit 14ae41c7ba
10 changed files with 191 additions and 179 deletions

View File

@@ -10,6 +10,13 @@ use super::type_vec::AddressTypeToVec;
#[derive(Debug, Default, Deref, DerefMut)]
pub struct HeightToAddressTypeToVec<T>(FxHashMap<Height, AddressTypeToVec<T>>);
impl<T> HeightToAddressTypeToVec<T> {
    /// Build an empty map pre-sized to hold `capacity` distinct heights
    /// without rehashing.
    pub fn with_capacity(capacity: usize) -> Self {
        let inner = FxHashMap::with_capacity_and_hasher(capacity, Default::default());
        Self(inner)
    }
}
impl<T> HeightToAddressTypeToVec<T> {
/// Merge another map into this one.
pub fn merge_mut(&mut self, other: Self) {

View File

@@ -29,6 +29,20 @@ impl<T> Default for AddressTypeToTypeIndexMap<T> {
}
impl<T> AddressTypeToTypeIndexMap<T> {
/// Create with pre-allocated capacity per address type.
///
/// Each of the eight per-type maps is sized to `capacity` up front so
/// inserts up to that count never trigger a rehash.
pub fn with_capacity(capacity: usize) -> Self {
    // One sizing closure keeps the eight field initializers identical.
    let sized_map = || FxHashMap::with_capacity_and_hasher(capacity, Default::default());
    Self(ByAddressType {
        p2a: sized_map(),
        p2pk33: sized_map(),
        p2pk65: sized_map(),
        p2pkh: sized_map(),
        p2sh: sized_map(),
        p2tr: sized_map(),
        p2wpkh: sized_map(),
        p2wsh: sized_map(),
    })
}
/// Merge two maps, consuming other and extending self.
pub fn merge(mut self, mut other: Self) -> Self {
Self::merge_single(&mut self.p2a, &mut other.p2a);

View File

@@ -24,6 +24,22 @@ impl<T> Default for AddressTypeToVec<T> {
}
}
impl<T> AddressTypeToVec<T> {
    /// Build a container whose eight per-address-type vectors each start
    /// with room for `capacity` elements, avoiding grow-and-copy churn.
    pub fn with_capacity(capacity: usize) -> Self {
        // Single allocator closure so every field is sized the same way.
        let sized_vec = || Vec::with_capacity(capacity);
        Self(ByAddressType {
            p2a: sized_vec(),
            p2pk33: sized_vec(),
            p2pk65: sized_vec(),
            p2pkh: sized_vec(),
            p2sh: sized_vec(),
            p2tr: sized_vec(),
            p2wpkh: sized_vec(),
            p2wsh: sized_vec(),
        })
    }
}
impl<T> AddressTypeToVec<T> {
/// Merge two AddressTypeToVec, consuming other.
pub fn merge(mut self, mut other: Self) -> Self {

View File

@@ -275,22 +275,24 @@ pub fn process_blocks(
.tick_tock_next_block(chain_state, timestamp);
});
// Process outputs (receive)
let outputs_result = process_outputs(
first_txoutindex,
output_count,
&txoutindex_to_txindex,
&indexer.vecs.txout.txoutindex_to_value,
&indexer.vecs.txout.txoutindex_to_outputtype,
&indexer.vecs.txout.txoutindex_to_typeindex,
&ir,
&first_addressindexes,
&loaded_cache,
&empty_cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
);
let outputs_handle = scope.spawn(|| {
// Process outputs (receive)
process_outputs(
first_txoutindex,
output_count,
&txoutindex_to_txindex,
&indexer.vecs.txout.txoutindex_to_value,
&indexer.vecs.txout.txoutindex_to_outputtype,
&indexer.vecs.txout.txoutindex_to_typeindex,
&ir,
&first_addressindexes,
&loaded_cache,
&empty_cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
)
});
// Process inputs (send) - skip coinbase input
let inputs_result = if input_count > 1 {
@@ -322,6 +324,8 @@ pub fn process_blocks(
}
};
let outputs_result = outputs_handle.join().unwrap();
(outputs_result, inputs_result)
});
@@ -503,13 +507,13 @@ pub fn process_blocks(
/// Reset per-block values for all separate cohorts.
fn reset_block_values(utxo_cohorts: &mut UTXOCohorts, address_cohorts: &mut AddressCohorts) {
utxo_cohorts.par_iter_separate_mut().for_each(|v| {
utxo_cohorts.iter_separate_mut().for_each(|v| {
if let Some(state) = v.state.as_mut() {
state.reset_single_iteration_values();
}
});
address_cohorts.par_iter_separate_mut().for_each(|v| {
address_cohorts.iter_separate_mut().for_each(|v| {
if let Some(state) = v.state.as_mut() {
state.inner.reset_single_iteration_values();
}
@@ -525,12 +529,14 @@ fn push_cohort_states(
dateindex: Option<DateIndex>,
date_price: Option<Option<brk_types::Dollars>>,
) -> Result<()> {
utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
utxo_cohorts.iter_separate_mut().try_for_each(|v| {
// utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price)
})?;
address_cohorts.par_iter_separate_mut().try_for_each(|v| {
address_cohorts.iter_separate_mut().try_for_each(|v| {
// address_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price)
})?;

View File

@@ -11,7 +11,6 @@ use crate::stateful::address::{AddressesDataVecs, AnyAddressIndexesVecs};
/// Cached readers for indexer vectors.
pub struct IndexerReaders {
pub txinindex_to_outpoint: Reader,
pub txindex_to_first_txoutindex: Reader,
pub txoutindex_to_value: Reader,
pub txoutindex_to_outputtype: Reader,
@@ -21,7 +20,6 @@ pub struct IndexerReaders {
impl IndexerReaders {
pub fn new(indexer: &Indexer) -> Self {
Self {
txinindex_to_outpoint: indexer.vecs.txin.txinindex_to_outpoint.create_reader(),
txindex_to_first_txoutindex: indexer
.vecs
.tx

View File

@@ -11,7 +11,7 @@ use brk_types::{
};
use rayon::prelude::*;
use rustc_hash::FxHashMap;
use vecdb::{BytesVec, GenericStoredVec, PcoVec};
use vecdb::{BytesVec, GenericStoredVec, PcoVec, VecIterator};
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
@@ -41,7 +41,7 @@ pub struct InputsResult {
pub txoutindex_to_txinindex_updates: Vec<(TxOutIndex, TxInIndex)>,
}
/// Process inputs (spent UTXOs) for a block in parallel.
/// Process inputs (spent UTXOs) for a block.
///
/// For each input:
/// 1. Read outpoint, resolve to txoutindex
@@ -50,6 +50,9 @@ pub struct InputsResult {
/// 4. Look up address data if input references an address type
/// 5. Accumulate into height_to_sent map
/// 6. Track address-specific data for address cohort processing
///
/// Uses parallel reads followed by sequential accumulation to avoid
/// expensive merge overhead from rayon's fold/reduce pattern.
#[allow(clippy::too_many_arguments)]
pub fn process_inputs(
first_txinindex: usize,
@@ -70,16 +73,26 @@ pub fn process_inputs(
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> InputsResult {
let (height_to_sent, sent_data, address_data, txindex_vecs, txoutindex_to_txinindex_updates) = (first_txinindex
..first_txinindex + input_count)
// Phase 1: Sequential collect of outpoints (uses iterator's page cache)
// This avoids decompressing the same PcoVec page ~1000 times per page
let outpoints: Vec<OutPoint> = {
let mut iter = txinindex_to_outpoint
.clean_iter()
.expect("Failed to create outpoint iterator");
iter.set_position_to(first_txinindex);
iter.set_end_to(first_txinindex + input_count);
iter.collect()
};
// Phase 2: Parallel reads - collect all input data (outpoints already in memory)
let items: Vec<_> = (0..input_count)
.into_par_iter()
.map(|i| {
let txinindex = TxInIndex::from(i);
let local_idx = i - first_txinindex;
.map(|local_idx| {
let txinindex = TxInIndex::from(first_txinindex + local_idx);
let txindex = txinindex_to_txindex[local_idx];
// Get outpoint and resolve to txoutindex
let outpoint = txinindex_to_outpoint.read_unwrap(txinindex, &ir.txinindex_to_outpoint);
// Get outpoint from pre-collected vec and resolve to txoutindex
let outpoint = outpoints[local_idx];
let first_txoutindex = txindex_to_first_txoutindex
.read_unwrap(outpoint.txindex(), &ir.txindex_to_first_txoutindex);
let txoutindex = first_txoutindex + outpoint.vout();
@@ -121,78 +134,51 @@ pub fn process_inputs(
Some((typeindex, txindex, value, addr_data_opt)),
)
})
.fold(
|| {
(
FxHashMap::<Height, Transacted>::default(),
HeightToAddressTypeToVec::default(),
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::default(),
AddressTypeToTypeIndexMap::<TxIndexVec>::default(),
Vec::<(TxOutIndex, TxInIndex)>::new(),
)
},
|(mut height_to_sent, mut sent_data, mut address_data, mut txindex_vecs, mut txoutindex_to_txinindex_updates),
(txinindex, txoutindex, prev_height, value, output_type, addr_info)| {
height_to_sent
.entry(prev_height)
.or_default()
.iterate(value, output_type);
.collect();
txoutindex_to_txinindex_updates.push((txoutindex, txinindex));
// Phase 2: Sequential accumulation - no merge overhead
// Estimate: unique heights bounded by block depth, addresses spread across ~8 types
let estimated_unique_heights = (input_count / 4).max(16);
let estimated_per_type = (input_count / 8).max(8);
let mut height_to_sent = FxHashMap::<Height, Transacted>::with_capacity_and_hasher(
estimated_unique_heights,
Default::default(),
);
let mut sent_data = HeightToAddressTypeToVec::with_capacity(estimated_unique_heights);
let mut address_data =
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::with_capacity(estimated_per_type);
let mut txindex_vecs =
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);
let mut txoutindex_to_txinindex_updates = Vec::with_capacity(input_count);
if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info {
sent_data
.entry(prev_height)
.or_default()
.get_mut(output_type)
.unwrap()
.push((typeindex, value));
for (txinindex, txoutindex, prev_height, value, output_type, addr_info) in items {
height_to_sent
.entry(prev_height)
.or_default()
.iterate(value, output_type);
if let Some(addr_data) = addr_data_opt {
address_data.insert_for_type(output_type, typeindex, addr_data);
}
txoutindex_to_txinindex_updates.push((txoutindex, txinindex));
txindex_vecs
.get_mut(output_type)
.unwrap()
.entry(typeindex)
.or_insert_with(TxIndexVec::new)
.push(txindex);
}
if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info {
sent_data
.entry(prev_height)
.or_default()
.get_mut(output_type)
.unwrap()
.push((typeindex, value));
(height_to_sent, sent_data, address_data, txindex_vecs, txoutindex_to_txinindex_updates)
},
)
.reduce(
|| {
(
FxHashMap::<Height, Transacted>::default(),
HeightToAddressTypeToVec::default(),
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::default(),
AddressTypeToTypeIndexMap::<TxIndexVec>::default(),
Vec::<(TxOutIndex, TxInIndex)>::new(),
)
},
|(mut h1, mut s1, a1, tx1, updates1), (h2, s2, a2, tx2, updates2)| {
// Merge height_to_sent maps
for (k, v) in h2 {
*h1.entry(k).or_default() += v;
}
if let Some(addr_data) = addr_data_opt {
address_data.insert_for_type(output_type, typeindex, addr_data);
}
// Merge sent_data maps
s1.merge_mut(s2);
// Merge txoutindex_to_txinindex updates (extend longest with shortest)
let (mut updates, updates_consumed) = if updates1.len() > updates2.len() {
(updates1, updates2)
} else {
(updates2, updates1)
};
updates.extend(updates_consumed);
(h1, s1, a1.merge(a2), tx1.merge_vec(tx2), updates)
},
);
txindex_vecs
.get_mut(output_type)
.unwrap()
.entry(typeindex)
.or_default()
.push(txindex);
}
}
InputsResult {
height_to_sent,

View File

@@ -1,6 +1,6 @@
//! Parallel output processing.
//! Output processing.
//!
//! Processes a block's outputs (new UTXOs) in parallel, building:
//! Processes a block's outputs (new UTXOs), building:
//! - Transacted: aggregated supply by output type and amount range
//! - Address data for address cohort tracking (optional)
@@ -8,9 +8,8 @@ use brk_grouper::ByAddressType;
use brk_types::{
AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex,
};
use rayon::prelude::*;
use smallvec::SmallVec;
use vecdb::{BytesVec, GenericStoredVec};
use vecdb::{BytesVec, GenericStoredVec, VecIterator};
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
@@ -36,10 +35,10 @@ pub struct OutputsResult {
pub txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>,
}
/// Process outputs (new UTXOs) for a block in parallel.
/// Process outputs (new UTXOs) for a block.
///
/// For each output:
/// 1. Read value and output type from indexer
/// 1. Read value and output type from indexer (sequential via iterators)
/// 2. Accumulate into Transacted by type and amount
/// 3. Look up address data if output is an address type
/// 4. Track address-specific data for address cohort processing
@@ -60,92 +59,74 @@ pub fn process_outputs(
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> OutputsResult {
let (transacted, received_data, address_data, txindex_vecs) = (first_txoutindex
..first_txoutindex + output_count)
.into_par_iter()
.map(|i| {
let txoutindex = TxOutIndex::from(i);
let local_idx = i - first_txoutindex;
let txindex = txoutindex_to_txindex[local_idx];
// Sequential iterators for value and outputtype (cache-friendly)
let mut value_iter = txoutindex_to_value
.clean_iter()
.expect("Failed to create value iterator");
value_iter.set_position_to(first_txoutindex);
value_iter.set_end_to(first_txoutindex + output_count);
let value = txoutindex_to_value.read_unwrap(txoutindex, &ir.txoutindex_to_value);
let output_type =
txoutindex_to_outputtype.read_unwrap(txoutindex, &ir.txoutindex_to_outputtype);
let mut outputtype_iter = txoutindex_to_outputtype
.clean_iter()
.expect("Failed to create outputtype iterator");
outputtype_iter.set_position_to(first_txoutindex);
outputtype_iter.set_end_to(first_txoutindex + output_count);
// Non-address outputs don't need typeindex or address lookup
if output_type.is_not_address() {
return (value, output_type, None);
}
// Pre-allocate result structures
let estimated_per_type = (output_count / 8).max(8);
let mut transacted = Transacted::default();
let mut received_data = AddressTypeToVec::with_capacity(estimated_per_type);
let mut address_data =
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::with_capacity(estimated_per_type);
let mut txindex_vecs =
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);
let typeindex =
txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
// Single pass: read and accumulate
for local_idx in 0..output_count {
let txoutindex = TxOutIndex::from(first_txoutindex + local_idx);
let txindex = txoutindex_to_txindex[local_idx];
// Look up address data
let addr_data_opt = get_address_data(
output_type,
typeindex,
first_addressindexes,
loaded_cache,
empty_cache,
vr,
any_address_indexes,
addresses_data,
);
let value = value_iter.next().unwrap();
let output_type = outputtype_iter.next().unwrap();
(
value,
output_type,
Some((typeindex, txindex, value, addr_data_opt)),
)
})
.fold(
|| {
(
Transacted::default(),
AddressTypeToVec::default(),
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::default(),
AddressTypeToTypeIndexMap::<TxIndexVec>::default(),
)
},
|(mut transacted, mut received_data, mut address_data, mut txindex_vecs),
(value, output_type, addr_info)| {
transacted.iterate(value, output_type);
transacted.iterate(value, output_type);
if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info {
received_data
.get_mut(output_type)
.unwrap()
.push((typeindex, value));
if output_type.is_not_address() {
continue;
}
if let Some(addr_data) = addr_data_opt {
address_data.insert_for_type(output_type, typeindex, addr_data);
}
// typeindex only for addresses (random access)
let typeindex =
txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
txindex_vecs
.get_mut(output_type)
.unwrap()
.entry(typeindex)
.or_insert_with(TxIndexVec::new)
.push(txindex);
}
received_data
.get_mut(output_type)
.unwrap()
.push((typeindex, value));
(transacted, received_data, address_data, txindex_vecs)
},
)
.reduce(
|| {
(
Transacted::default(),
AddressTypeToVec::default(),
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::default(),
AddressTypeToTypeIndexMap::<TxIndexVec>::default(),
)
},
|(t1, r1, a1, tx1), (t2, r2, a2, tx2)| {
(t1 + t2, r1.merge(r2), a1.merge(a2), tx1.merge_vec(tx2))
},
let addr_data_opt = get_address_data(
output_type,
typeindex,
first_addressindexes,
loaded_cache,
empty_cache,
vr,
any_address_indexes,
addresses_data,
);
if let Some(addr_data) = addr_data_opt {
address_data.insert_for_type(output_type, typeindex, addr_data);
}
txindex_vecs
.get_mut(output_type)
.unwrap()
.entry(typeindex)
.or_default()
.push(txindex);
}
OutputsResult {
transacted,
received_data,