diff --git a/crates/brk_computer/src/stateful/address/height_type_vec.rs b/crates/brk_computer/src/stateful/address/height_type_vec.rs index f746c0375..1f5e93de1 100644 --- a/crates/brk_computer/src/stateful/address/height_type_vec.rs +++ b/crates/brk_computer/src/stateful/address/height_type_vec.rs @@ -10,6 +10,13 @@ use super::type_vec::AddressTypeToVec; #[derive(Debug, Default, Deref, DerefMut)] pub struct HeightToAddressTypeToVec(FxHashMap>); +impl HeightToAddressTypeToVec { + /// Create with pre-allocated capacity for unique heights. + pub fn with_capacity(capacity: usize) -> Self { + Self(FxHashMap::with_capacity_and_hasher(capacity, Default::default())) + } +} + impl HeightToAddressTypeToVec { /// Merge another map into this one. pub fn merge_mut(&mut self, other: Self) { diff --git a/crates/brk_computer/src/stateful/address/type_index_map.rs b/crates/brk_computer/src/stateful/address/type_index_map.rs index 8ae2652f1..ca3696e24 100644 --- a/crates/brk_computer/src/stateful/address/type_index_map.rs +++ b/crates/brk_computer/src/stateful/address/type_index_map.rs @@ -29,6 +29,20 @@ impl Default for AddressTypeToTypeIndexMap { } impl AddressTypeToTypeIndexMap { + /// Create with pre-allocated capacity per address type. + pub fn with_capacity(capacity: usize) -> Self { + Self(ByAddressType { + p2a: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2pk33: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2pk65: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2pkh: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2sh: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2tr: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2wpkh: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + p2wsh: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + }) + } + /// Merge two maps, consuming other and extending self. 
pub fn merge(mut self, mut other: Self) -> Self { Self::merge_single(&mut self.p2a, &mut other.p2a); diff --git a/crates/brk_computer/src/stateful/address/type_vec.rs b/crates/brk_computer/src/stateful/address/type_vec.rs index 47472ac36..e111c9dcc 100644 --- a/crates/brk_computer/src/stateful/address/type_vec.rs +++ b/crates/brk_computer/src/stateful/address/type_vec.rs @@ -24,6 +24,22 @@ impl Default for AddressTypeToVec { } } +impl AddressTypeToVec { + /// Create with pre-allocated capacity per address type. + pub fn with_capacity(capacity: usize) -> Self { + Self(ByAddressType { + p2a: Vec::with_capacity(capacity), + p2pk33: Vec::with_capacity(capacity), + p2pk65: Vec::with_capacity(capacity), + p2pkh: Vec::with_capacity(capacity), + p2sh: Vec::with_capacity(capacity), + p2tr: Vec::with_capacity(capacity), + p2wpkh: Vec::with_capacity(capacity), + p2wsh: Vec::with_capacity(capacity), + }) + } +} + impl AddressTypeToVec { /// Merge two AddressTypeToVec, consuming other. pub fn merge(mut self, mut other: Self) -> Self { diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index e71d0e5c1..2b2f576f2 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -275,22 +275,24 @@ pub fn process_blocks( .tick_tock_next_block(chain_state, timestamp); }); - // Process outputs (receive) - let outputs_result = process_outputs( - first_txoutindex, - output_count, - &txoutindex_to_txindex, - &indexer.vecs.txout.txoutindex_to_value, - &indexer.vecs.txout.txoutindex_to_outputtype, - &indexer.vecs.txout.txoutindex_to_typeindex, - &ir, - &first_addressindexes, - &loaded_cache, - &empty_cache, - &vr, - &vecs.any_address_indexes, - &vecs.addresses_data, - ); + let outputs_handle = scope.spawn(|| { + // Process outputs (receive) + process_outputs( + first_txoutindex, + output_count, + &txoutindex_to_txindex, + 
&indexer.vecs.txout.txoutindex_to_value, + &indexer.vecs.txout.txoutindex_to_outputtype, + &indexer.vecs.txout.txoutindex_to_typeindex, + &ir, + &first_addressindexes, + &loaded_cache, + &empty_cache, + &vr, + &vecs.any_address_indexes, + &vecs.addresses_data, + ) + }); // Process inputs (send) - skip coinbase input let inputs_result = if input_count > 1 { @@ -322,6 +324,8 @@ pub fn process_blocks( } }; + let outputs_result = outputs_handle.join().unwrap(); + (outputs_result, inputs_result) }); @@ -503,13 +507,13 @@ pub fn process_blocks( /// Reset per-block values for all separate cohorts. fn reset_block_values(utxo_cohorts: &mut UTXOCohorts, address_cohorts: &mut AddressCohorts) { - utxo_cohorts.par_iter_separate_mut().for_each(|v| { + utxo_cohorts.iter_separate_mut().for_each(|v| { if let Some(state) = v.state.as_mut() { state.reset_single_iteration_values(); } }); - address_cohorts.par_iter_separate_mut().for_each(|v| { + address_cohorts.iter_separate_mut().for_each(|v| { if let Some(state) = v.state.as_mut() { state.inner.reset_single_iteration_values(); } @@ -525,12 +529,14 @@ fn push_cohort_states( dateindex: Option, date_price: Option>, ) -> Result<()> { - utxo_cohorts.par_iter_separate_mut().try_for_each(|v| { + utxo_cohorts.iter_separate_mut().try_for_each(|v| { + // utxo_cohorts.par_iter_separate_mut().try_for_each(|v| { v.truncate_push(height)?; v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price) })?; - address_cohorts.par_iter_separate_mut().try_for_each(|v| { + address_cohorts.iter_separate_mut().try_for_each(|v| { + // address_cohorts.par_iter_separate_mut().try_for_each(|v| { v.truncate_push(height)?; v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price) })?; diff --git a/crates/brk_computer/src/stateful/compute/readers.rs b/crates/brk_computer/src/stateful/compute/readers.rs index 534e1ed73..93fcf18e3 100644 --- a/crates/brk_computer/src/stateful/compute/readers.rs +++ 
b/crates/brk_computer/src/stateful/compute/readers.rs @@ -11,7 +11,6 @@ use crate::stateful::address::{AddressesDataVecs, AnyAddressIndexesVecs}; /// Cached readers for indexer vectors. pub struct IndexerReaders { - pub txinindex_to_outpoint: Reader, pub txindex_to_first_txoutindex: Reader, pub txoutindex_to_value: Reader, pub txoutindex_to_outputtype: Reader, @@ -21,7 +20,6 @@ pub struct IndexerReaders { impl IndexerReaders { pub fn new(indexer: &Indexer) -> Self { Self { - txinindex_to_outpoint: indexer.vecs.txin.txinindex_to_outpoint.create_reader(), txindex_to_first_txoutindex: indexer .vecs .tx diff --git a/crates/brk_computer/src/stateful/process/inputs.rs b/crates/brk_computer/src/stateful/process/inputs.rs index f4253a842..de8779f48 100644 --- a/crates/brk_computer/src/stateful/process/inputs.rs +++ b/crates/brk_computer/src/stateful/process/inputs.rs @@ -11,7 +11,7 @@ use brk_types::{ }; use rayon::prelude::*; use rustc_hash::FxHashMap; -use vecdb::{BytesVec, GenericStoredVec, PcoVec}; +use vecdb::{BytesVec, GenericStoredVec, PcoVec, VecIterator}; use crate::stateful::address::{ AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, @@ -41,7 +41,7 @@ pub struct InputsResult { pub txoutindex_to_txinindex_updates: Vec<(TxOutIndex, TxInIndex)>, } -/// Process inputs (spent UTXOs) for a block in parallel. +/// Process inputs (spent UTXOs) for a block. /// /// For each input: /// 1. Read outpoint, resolve to txoutindex @@ -50,6 +50,9 @@ pub struct InputsResult { /// 4. Look up address data if input references an address type /// 5. Accumulate into height_to_sent map /// 6. Track address-specific data for address cohort processing +/// +/// Uses parallel reads followed by sequential accumulation to avoid +/// expensive merge overhead from rayon's fold/reduce pattern. 
#[allow(clippy::too_many_arguments)] pub fn process_inputs( first_txinindex: usize, @@ -70,16 +73,26 @@ pub fn process_inputs( any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> InputsResult { - let (height_to_sent, sent_data, address_data, txindex_vecs, txoutindex_to_txinindex_updates) = (first_txinindex - ..first_txinindex + input_count) + // Phase 1: Sequential collect of outpoints (uses iterator's page cache) + // This avoids decompressing the same PcoVec page ~1000 times per page + let outpoints: Vec = { + let mut iter = txinindex_to_outpoint + .clean_iter() + .expect("Failed to create outpoint iterator"); + iter.set_position_to(first_txinindex); + iter.set_end_to(first_txinindex + input_count); + iter.collect() + }; + + // Phase 2: Parallel reads - collect all input data (outpoints already in memory) + let items: Vec<_> = (0..input_count) .into_par_iter() - .map(|i| { - let txinindex = TxInIndex::from(i); - let local_idx = i - first_txinindex; + .map(|local_idx| { + let txinindex = TxInIndex::from(first_txinindex + local_idx); let txindex = txinindex_to_txindex[local_idx]; - // Get outpoint and resolve to txoutindex - let outpoint = txinindex_to_outpoint.read_unwrap(txinindex, &ir.txinindex_to_outpoint); + // Get outpoint from pre-collected vec and resolve to txoutindex + let outpoint = outpoints[local_idx]; let first_txoutindex = txindex_to_first_txoutindex .read_unwrap(outpoint.txindex(), &ir.txindex_to_first_txoutindex); let txoutindex = first_txoutindex + outpoint.vout(); @@ -121,78 +134,51 @@ pub fn process_inputs( Some((typeindex, txindex, value, addr_data_opt)), ) }) - .fold( - || { - ( - FxHashMap::::default(), - HeightToAddressTypeToVec::default(), - AddressTypeToTypeIndexMap::::default(), - AddressTypeToTypeIndexMap::::default(), - Vec::<(TxOutIndex, TxInIndex)>::new(), - ) - }, - |(mut height_to_sent, mut sent_data, mut address_data, mut txindex_vecs, mut txoutindex_to_txinindex_updates), - (txinindex, txoutindex, 
prev_height, value, output_type, addr_info)| { - height_to_sent - .entry(prev_height) - .or_default() - .iterate(value, output_type); + .collect(); - txoutindex_to_txinindex_updates.push((txoutindex, txinindex)); + // Phase 3: Sequential accumulation - no merge overhead + // Estimate: unique heights bounded by block depth, addresses spread across ~8 types + let estimated_unique_heights = (input_count / 4).max(16); + let estimated_per_type = (input_count / 8).max(8); + let mut height_to_sent = FxHashMap::::with_capacity_and_hasher( + estimated_unique_heights, + Default::default(), + ); + let mut sent_data = HeightToAddressTypeToVec::with_capacity(estimated_unique_heights); + let mut address_data = + AddressTypeToTypeIndexMap::::with_capacity(estimated_per_type); + let mut txindex_vecs = + AddressTypeToTypeIndexMap::::with_capacity(estimated_per_type); + let mut txoutindex_to_txinindex_updates = Vec::with_capacity(input_count); - if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info { - sent_data - .entry(prev_height) - .or_default() - .get_mut(output_type) - .unwrap() - .push((typeindex, value)); + for (txinindex, txoutindex, prev_height, value, output_type, addr_info) in items { + height_to_sent + .entry(prev_height) + .or_default() + .iterate(value, output_type); - if let Some(addr_data) = addr_data_opt { - address_data.insert_for_type(output_type, typeindex, addr_data); - } + txoutindex_to_txinindex_updates.push((txoutindex, txinindex)); - txindex_vecs - .get_mut(output_type) - .unwrap() - .entry(typeindex) - .or_insert_with(TxIndexVec::new) - .push(txindex); - } + if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info { + sent_data + .entry(prev_height) + .or_default() + .get_mut(output_type) + .unwrap() + .push((typeindex, value)); - (height_to_sent, sent_data, address_data, txindex_vecs, txoutindex_to_txinindex_updates) - }, - ) - .reduce( - || { - ( - FxHashMap::::default(), - HeightToAddressTypeToVec::default(), - 
AddressTypeToTypeIndexMap::::default(), - AddressTypeToTypeIndexMap::::default(), - Vec::<(TxOutIndex, TxInIndex)>::new(), - ) - }, - |(mut h1, mut s1, a1, tx1, updates1), (h2, s2, a2, tx2, updates2)| { - // Merge height_to_sent maps - for (k, v) in h2 { - *h1.entry(k).or_default() += v; - } + if let Some(addr_data) = addr_data_opt { + address_data.insert_for_type(output_type, typeindex, addr_data); + } - // Merge sent_data maps - s1.merge_mut(s2); - - // Merge txoutindex_to_txinindex updates (extend longest with shortest) - let (mut updates, updates_consumed) = if updates1.len() > updates2.len() { - (updates1, updates2) - } else { - (updates2, updates1) - }; - updates.extend(updates_consumed); - - (h1, s1, a1.merge(a2), tx1.merge_vec(tx2), updates) - }, - ); + txindex_vecs + .get_mut(output_type) + .unwrap() + .entry(typeindex) + .or_default() + .push(txindex); + } + } InputsResult { height_to_sent, diff --git a/crates/brk_computer/src/stateful/process/outputs.rs b/crates/brk_computer/src/stateful/process/outputs.rs index 20d22bb99..d745d73ef 100644 --- a/crates/brk_computer/src/stateful/process/outputs.rs +++ b/crates/brk_computer/src/stateful/process/outputs.rs @@ -1,6 +1,6 @@ -//! Parallel output processing. +//! Output processing. //! -//! Processes a block's outputs (new UTXOs) in parallel, building: +//! Processes a block's outputs (new UTXOs), building: //! - Transacted: aggregated supply by output type and amount range //! 
- Address data for address cohort tracking (optional) @@ -8,9 +8,8 @@ use brk_grouper::ByAddressType; use brk_types::{ AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex, }; -use rayon::prelude::*; use smallvec::SmallVec; -use vecdb::{BytesVec, GenericStoredVec}; +use vecdb::{BytesVec, GenericStoredVec, VecIterator}; use crate::stateful::address::{ AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, @@ -36,10 +35,10 @@ pub struct OutputsResult { pub txindex_vecs: AddressTypeToTypeIndexMap, } -/// Process outputs (new UTXOs) for a block in parallel. +/// Process outputs (new UTXOs) for a block. /// /// For each output: -/// 1. Read value and output type from indexer +/// 1. Read value and output type from indexer (sequential via iterators) /// 2. Accumulate into Transacted by type and amount /// 3. Look up address data if output is an address type /// 4. Track address-specific data for address cohort processing @@ -60,92 +59,74 @@ pub fn process_outputs( any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> OutputsResult { - let (transacted, received_data, address_data, txindex_vecs) = (first_txoutindex - ..first_txoutindex + output_count) - .into_par_iter() - .map(|i| { - let txoutindex = TxOutIndex::from(i); - let local_idx = i - first_txoutindex; - let txindex = txoutindex_to_txindex[local_idx]; + // Sequential iterators for value and outputtype (cache-friendly) + let mut value_iter = txoutindex_to_value + .clean_iter() + .expect("Failed to create value iterator"); + value_iter.set_position_to(first_txoutindex); + value_iter.set_end_to(first_txoutindex + output_count); - let value = txoutindex_to_value.read_unwrap(txoutindex, &ir.txoutindex_to_value); - let output_type = - txoutindex_to_outputtype.read_unwrap(txoutindex, &ir.txoutindex_to_outputtype); + let mut outputtype_iter = txoutindex_to_outputtype + .clean_iter() + .expect("Failed to create outputtype iterator"); + 
outputtype_iter.set_position_to(first_txoutindex); + outputtype_iter.set_end_to(first_txoutindex + output_count); - // Non-address outputs don't need typeindex or address lookup - if output_type.is_not_address() { - return (value, output_type, None); - } + // Pre-allocate result structures + let estimated_per_type = (output_count / 8).max(8); + let mut transacted = Transacted::default(); + let mut received_data = AddressTypeToVec::with_capacity(estimated_per_type); + let mut address_data = + AddressTypeToTypeIndexMap::::with_capacity(estimated_per_type); + let mut txindex_vecs = + AddressTypeToTypeIndexMap::::with_capacity(estimated_per_type); - let typeindex = - txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); + // Single pass: read and accumulate + for local_idx in 0..output_count { + let txoutindex = TxOutIndex::from(first_txoutindex + local_idx); + let txindex = txoutindex_to_txindex[local_idx]; - // Look up address data - let addr_data_opt = get_address_data( - output_type, - typeindex, - first_addressindexes, - loaded_cache, - empty_cache, - vr, - any_address_indexes, - addresses_data, - ); + let value = value_iter.next().unwrap(); + let output_type = outputtype_iter.next().unwrap(); - ( - value, - output_type, - Some((typeindex, txindex, value, addr_data_opt)), - ) - }) - .fold( - || { - ( - Transacted::default(), - AddressTypeToVec::default(), - AddressTypeToTypeIndexMap::::default(), - AddressTypeToTypeIndexMap::::default(), - ) - }, - |(mut transacted, mut received_data, mut address_data, mut txindex_vecs), - (value, output_type, addr_info)| { - transacted.iterate(value, output_type); + transacted.iterate(value, output_type); - if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info { - received_data - .get_mut(output_type) - .unwrap() - .push((typeindex, value)); + if output_type.is_not_address() { + continue; + } - if let Some(addr_data) = addr_data_opt { - address_data.insert_for_type(output_type, typeindex, 
addr_data); - } + // typeindex only for addresses (random access) + let typeindex = + txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); - txindex_vecs - .get_mut(output_type) - .unwrap() - .entry(typeindex) - .or_insert_with(TxIndexVec::new) - .push(txindex); - } + received_data + .get_mut(output_type) + .unwrap() + .push((typeindex, value)); - (transacted, received_data, address_data, txindex_vecs) - }, - ) - .reduce( - || { - ( - Transacted::default(), - AddressTypeToVec::default(), - AddressTypeToTypeIndexMap::::default(), - AddressTypeToTypeIndexMap::::default(), - ) - }, - |(t1, r1, a1, tx1), (t2, r2, a2, tx2)| { - (t1 + t2, r1.merge(r2), a1.merge(a2), tx1.merge_vec(tx2)) - }, + let addr_data_opt = get_address_data( + output_type, + typeindex, + first_addressindexes, + loaded_cache, + empty_cache, + vr, + any_address_indexes, + addresses_data, ); + if let Some(addr_data) = addr_data_opt { + address_data.insert_for_type(output_type, typeindex, addr_data); + } + + txindex_vecs + .get_mut(output_type) + .unwrap() + .entry(typeindex) + .or_default() + .push(txindex); + } + OutputsResult { transacted, received_data, diff --git a/crates/brk_indexer/examples/indexer.rs b/crates/brk_indexer/examples/indexer.rs index fc150f90b..e5a6df413 100644 --- a/crates/brk_indexer/examples/indexer.rs +++ b/crates/brk_indexer/examples/indexer.rs @@ -10,6 +10,7 @@ use brk_iterator::Blocks; use brk_reader::Reader; use brk_rpc::{Auth, Client}; use log::{debug, info}; +use mimalloc::MiMalloc; use vecdb::Exit; #[global_allocator] diff --git a/crates/brk_indexer/examples/indexer_read.rs b/crates/brk_indexer/examples/indexer_read.rs index e7ba35c87..2577a2b17 100644 --- a/crates/brk_indexer/examples/indexer_read.rs +++ b/crates/brk_indexer/examples/indexer_read.rs @@ -1,5 +1,6 @@ use brk_error::Result; use brk_indexer::Indexer; +use mimalloc::MiMalloc; // use brk_types::Sats; use std::{fs, path::Path}; diff --git 
a/crates/brk_indexer/examples/indexer_read_speed.rs b/crates/brk_indexer/examples/indexer_read_speed.rs index 4bf236a5e..b14dab7e4 100644 --- a/crates/brk_indexer/examples/indexer_read_speed.rs +++ b/crates/brk_indexer/examples/indexer_read_speed.rs @@ -1,7 +1,9 @@ +use std::{fs, path::Path, time::Instant}; + use brk_error::Result; use brk_indexer::Indexer; use brk_types::Sats; -use std::{fs, path::Path, time::Instant}; +use mimalloc::MiMalloc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc;