diff --git a/crates/brk_computer/src/stateful/address/height_type_vec.rs b/crates/brk_computer/src/stateful/address/height_type_vec.rs index 1f5e93de1..d7580ab54 100644 --- a/crates/brk_computer/src/stateful/address/height_type_vec.rs +++ b/crates/brk_computer/src/stateful/address/height_type_vec.rs @@ -13,18 +13,14 @@ pub struct HeightToAddressTypeToVec(FxHashMap>); impl HeightToAddressTypeToVec { /// Create with pre-allocated capacity for unique heights. pub fn with_capacity(capacity: usize) -> Self { - Self(FxHashMap::with_capacity_and_hasher(capacity, Default::default())) + Self(FxHashMap::with_capacity_and_hasher( + capacity, + Default::default(), + )) } } impl HeightToAddressTypeToVec { - /// Merge another map into this one. - pub fn merge_mut(&mut self, other: Self) { - for (height, vec) in other.0 { - self.entry(height).or_default().merge_mut(vec); - } - } - /// Consume and iterate over (Height, AddressTypeToVec) pairs. pub fn into_iter(self) -> impl Iterator)> { self.0.into_iter() diff --git a/crates/brk_computer/src/stateful/address/type_vec.rs b/crates/brk_computer/src/stateful/address/type_vec.rs index e111c9dcc..fa8627c61 100644 --- a/crates/brk_computer/src/stateful/address/type_vec.rs +++ b/crates/brk_computer/src/stateful/address/type_vec.rs @@ -1,7 +1,5 @@ //! Per-address-type vector. -use std::mem; - use brk_grouper::ByAddressType; use derive_deref::{Deref, DerefMut}; @@ -41,40 +39,6 @@ impl AddressTypeToVec { } impl AddressTypeToVec { - /// Merge two AddressTypeToVec, consuming other. - pub fn merge(mut self, mut other: Self) -> Self { - Self::merge_single(&mut self.p2a, &mut other.p2a); - Self::merge_single(&mut self.p2pk33, &mut other.p2pk33); - Self::merge_single(&mut self.p2pk65, &mut other.p2pk65); - Self::merge_single(&mut self.p2pkh, &mut other.p2pkh); - Self::merge_single(&mut self.p2sh, &mut other.p2sh); - Self::merge_single(&mut self.p2tr, &mut other.p2tr); - Self::merge_single(&mut self.p2wpkh, &mut other.p2wpkh); - Self::merge_single(&mut self.p2wsh, &mut other.p2wsh); - self - } - - /// Merge in place. - pub fn merge_mut(&mut self, mut other: Self) { - Self::merge_single(&mut self.p2a, &mut other.p2a); - Self::merge_single(&mut self.p2pk33, &mut other.p2pk33); - Self::merge_single(&mut self.p2pk65, &mut other.p2pk65); - Self::merge_single(&mut self.p2pkh, &mut other.p2pkh); - Self::merge_single(&mut self.p2sh, &mut other.p2sh); - Self::merge_single(&mut self.p2tr, &mut other.p2tr); - Self::merge_single(&mut self.p2wpkh, &mut other.p2wpkh); - Self::merge_single(&mut self.p2wsh, &mut other.p2wsh); - } - - fn merge_single(own: &mut Vec, other: &mut Vec) { - if own.len() >= other.len() { - own.append(other); - } else { - other.append(own); - mem::swap(own, other); - } - } - /// Unwrap the inner ByAddressType. pub fn unwrap(self) -> ByAddressType> { self.0 diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index 2b2f576f2..ee260f29d 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -27,8 +27,8 @@ use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}; use super::super::vecs::Vecs; use super::{ BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1, - BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, VecsReaders, - build_txinindex_to_txindex, build_txoutindex_to_txindex, + BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators, + TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex, flush::flush_checkpoint as flush_checkpoint_full, }; use crate::stateful::address::AddressTypeToAddressCount; @@ -131,6 +131,10 @@ pub fn process_blocks( let ir = IndexerReaders::new(indexer); let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); + // Create reusable iterators for sequential txout/txin reads (16KB buffered) + let mut txout_iters = TxOutIterators::new(indexer); + let mut txin_iters = TxInIterators::new(indexer); + info!("Creating address iterators..."); // Create iterators for first address indexes per type @@ -267,6 +271,17 @@ pub fn process_blocks( // Reset per-block values for all separate cohorts reset_block_values(&mut vecs.utxo_cohorts, &mut vecs.address_cohorts); + // Collect output/input data using reusable iterators (16KB buffered reads) + // Must be done before thread::scope since iterators aren't Send + let (output_values, output_types, output_typeindexes) = + txout_iters.collect_block_outputs(first_txoutindex, output_count); + + let input_outpoints = if input_count > 1 { + txin_iters.collect_block_outpoints(first_txinindex + 1, input_count - 1) + } else { + Vec::new() + }; + // Process outputs and inputs in parallel with tick-tock let (outputs_result, inputs_result) = thread::scope(|scope| { // Tick-tock age transitions in background @@ -278,13 +293,11 @@ pub fn process_blocks( let outputs_handle = scope.spawn(|| { // Process outputs (receive) process_outputs( - first_txoutindex, output_count, &txoutindex_to_txindex, - &indexer.vecs.txout.txoutindex_to_value, - &indexer.vecs.txout.txoutindex_to_outputtype, - &indexer.vecs.txout.txoutindex_to_typeindex, - &ir, + &output_values, + &output_types, + &output_typeindexes, &first_addressindexes, &loaded_cache, &empty_cache, @@ -300,7 +313,7 @@ pub fn process_blocks( first_txinindex + 1, // Skip coinbase input_count - 1, &txinindex_to_txindex[1..], // Skip coinbase - &indexer.vecs.txin.txinindex_to_outpoint, + &input_outpoints, &indexer.vecs.tx.txindex_to_first_txoutindex, &indexer.vecs.txout.txoutindex_to_value, &indexer.vecs.txout.txoutindex_to_outputtype, @@ -562,11 +575,6 @@ fn flush_checkpoint( ) -> Result<()> { info!("Flushing checkpoint at height {}...", height); - // Flush cohort states - vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; - vecs.address_cohorts - .safe_flush_stateful_vecs(height, exit)?; - // Flush height-indexed vectors vecs.height_to_unspendable_supply.safe_write(exit)?; vecs.height_to_opreturn_supply.safe_write(exit)?; diff --git a/crates/brk_computer/src/stateful/compute/mod.rs b/crates/brk_computer/src/stateful/compute/mod.rs index 06b8bf76d..e8ce132f7 100644 --- a/crates/brk_computer/src/stateful/compute/mod.rs +++ b/crates/brk_computer/src/stateful/compute/mod.rs @@ -17,7 +17,8 @@ mod recover; pub use block_loop::process_blocks; pub use context::ComputeContext; pub use readers::{ - IndexerReaders, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex, + IndexerReaders, TxInIterators, TxOutIterators, VecsReaders, build_txinindex_to_txindex, + build_txoutindex_to_txindex, }; pub use recover::{StartMode, determine_start_mode, recover_state, reset_state}; diff --git a/crates/brk_computer/src/stateful/compute/readers.rs b/crates/brk_computer/src/stateful/compute/readers.rs index 93fcf18e3..123327ce7 100644 --- a/crates/brk_computer/src/stateful/compute/readers.rs +++ b/crates/brk_computer/src/stateful/compute/readers.rs @@ -4,8 +4,11 @@ use brk_grouper::{ByAddressType, ByAnyAddress}; use brk_indexer::Indexer; -use brk_types::{OutputType, StoredU64, TxIndex}; -use vecdb::{BoxedVecIterator, GenericStoredVec, Reader, VecIndex}; +use brk_types::{OutPoint, OutputType, Sats, StoredU64, TxInIndex, TxIndex, TxOutIndex, TypeIndex}; +use vecdb::{ + BoxedVecIterator, BytesVecIterator, GenericStoredVec, PcodecVecIterator, Reader, VecIndex, + VecIterator, +}; use crate::stateful::address::{AddressesDataVecs, AnyAddressIndexesVecs}; @@ -32,6 +35,70 @@ impl IndexerReaders { } } +/// Reusable iterators for txout vectors (16KB buffered reads). +/// +/// Iterators are created once and re-positioned each block to avoid +/// creating new file handles repeatedly. +pub struct TxOutIterators<'a> { + value_iter: BytesVecIterator<'a, TxOutIndex, Sats>, + outputtype_iter: BytesVecIterator<'a, TxOutIndex, OutputType>, + typeindex_iter: BytesVecIterator<'a, TxOutIndex, TypeIndex>, +} + +impl<'a> TxOutIterators<'a> { + pub fn new(indexer: &'a Indexer) -> Self { + Self { + value_iter: indexer.vecs.txout.txoutindex_to_value.into_iter(), + outputtype_iter: indexer.vecs.txout.txoutindex_to_outputtype.into_iter(), + typeindex_iter: indexer.vecs.txout.txoutindex_to_typeindex.into_iter(), + } + } + + /// Collect output data for a block range using buffered iteration. + pub fn collect_block_outputs( + &mut self, + first_txoutindex: usize, + output_count: usize, + ) -> (Vec, Vec, Vec) { + let mut values = Vec::with_capacity(output_count); + let mut output_types = Vec::with_capacity(output_count); + let mut type_indexes = Vec::with_capacity(output_count); + + for i in first_txoutindex..first_txoutindex + output_count { + values.push(self.value_iter.get_at_unwrap(i)); + output_types.push(self.outputtype_iter.get_at_unwrap(i)); + type_indexes.push(self.typeindex_iter.get_at_unwrap(i)); + } + + (values, output_types, type_indexes) + } +} + +/// Reusable iterator for txin outpoints (PcoVec - avoids repeated page decompression). +pub struct TxInIterators<'a> { + outpoint_iter: PcodecVecIterator<'a, TxInIndex, OutPoint>, +} + +impl<'a> TxInIterators<'a> { + pub fn new(indexer: &'a Indexer) -> Self { + Self { + outpoint_iter: indexer.vecs.txin.txinindex_to_outpoint.into_iter(), + } + } + + /// Collect outpoints for a block range using buffered iteration. + /// This avoids repeated PcoVec page decompression (~1000x speedup). + pub fn collect_block_outpoints( + &mut self, + first_txinindex: usize, + input_count: usize, + ) -> Vec { + (first_txinindex..first_txinindex + input_count) + .map(|i| self.outpoint_iter.get_at_unwrap(i)) + .collect() + } +} + /// Cached readers for stateful vectors. pub struct VecsReaders { pub addresstypeindex_to_anyaddressindex: ByAddressType, diff --git a/crates/brk_computer/src/stateful/process/inputs.rs b/crates/brk_computer/src/stateful/process/inputs.rs index de8779f48..9ed0359fb 100644 --- a/crates/brk_computer/src/stateful/process/inputs.rs +++ b/crates/brk_computer/src/stateful/process/inputs.rs @@ -11,7 +11,7 @@ use brk_types::{ }; use rayon::prelude::*; use rustc_hash::FxHashMap; -use vecdb::{BytesVec, GenericStoredVec, PcoVec, VecIterator}; +use vecdb::{BytesVec, GenericStoredVec}; use crate::stateful::address::{ AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, @@ -44,12 +44,13 @@ pub struct InputsResult { /// Process inputs (spent UTXOs) for a block. /// /// For each input: -/// 1. Read outpoint, resolve to txoutindex -/// 2. Get the creation height from txoutindex_to_height map -/// 3. Read value and type from the referenced output -/// 4. Look up address data if input references an address type -/// 5. Accumulate into height_to_sent map -/// 6. Track address-specific data for address cohort processing +/// 1. Use pre-collected outpoint (from reusable iterator, avoids PcoVec re-decompression) +/// 2. Resolve outpoint to txoutindex +/// 3. Get the creation height from txoutindex_to_height map +/// 4. Read value and type from the referenced output (random access via mmap) +/// 5. Look up address data if input references an address type +/// 6. Accumulate into height_to_sent map +/// 7. Track address-specific data for address cohort processing /// /// Uses parallel reads followed by sequential accumulation to avoid /// expensive merge overhead from rayon's fold/reduce pattern. @@ -58,7 +59,8 @@ pub fn process_inputs( first_txinindex: usize, input_count: usize, txinindex_to_txindex: &[TxIndex], - txinindex_to_outpoint: &PcoVec, + // Pre-collected outpoints (from reusable iterator with page caching) + outpoints: &[OutPoint], txindex_to_first_txoutindex: &BytesVec, txoutindex_to_value: &BytesVec, txoutindex_to_outputtype: &BytesVec, @@ -73,18 +75,7 @@ pub fn process_inputs( any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> InputsResult { - // Phase 1: Sequential collect of outpoints (uses iterator's page cache) - // This avoids decompressing the same PcoVec page ~1000 times per page - let outpoints: Vec = { - let mut iter = txinindex_to_outpoint - .clean_iter() - .expect("Failed to create outpoint iterator"); - iter.set_position_to(first_txinindex); - iter.set_end_to(first_txinindex + input_count); - iter.collect() - }; - - // Phase 2: Parallel reads - collect all input data (outpoints already in memory) + // Parallel reads - collect all input data (outpoints already in memory) let items: Vec<_> = (0..input_count) .into_par_iter() .map(|local_idx| { diff --git a/crates/brk_computer/src/stateful/process/outputs.rs b/crates/brk_computer/src/stateful/process/outputs.rs index d745d73ef..69407e575 100644 --- a/crates/brk_computer/src/stateful/process/outputs.rs +++ b/crates/brk_computer/src/stateful/process/outputs.rs @@ -6,16 +6,16 @@ use brk_grouper::ByAddressType; use brk_types::{ - AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex, + AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TypeIndex, }; use smallvec::SmallVec; -use vecdb::{BytesVec, GenericStoredVec, VecIterator}; +use vecdb::GenericStoredVec; use crate::stateful::address::{ AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, }; use crate::stateful::compute::VecsReaders; -use crate::{stateful::IndexerReaders, states::Transacted}; +use crate::states::Transacted; use super::super::address::AddressTypeToVec; use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource}; @@ -38,19 +38,18 @@ pub struct OutputsResult { /// Process outputs (new UTXOs) for a block. /// /// For each output: -/// 1. Read value and output type from indexer (sequential via iterators) +/// 1. Read pre-collected value, output type, and typeindex /// 2. Accumulate into Transacted by type and amount /// 3. Look up address data if output is an address type /// 4. Track address-specific data for address cohort processing #[allow(clippy::too_many_arguments)] pub fn process_outputs( - first_txoutindex: usize, output_count: usize, txoutindex_to_txindex: &[TxIndex], - txoutindex_to_value: &BytesVec, - txoutindex_to_outputtype: &BytesVec, - txoutindex_to_typeindex: &BytesVec, - ir: &IndexerReaders, + // Pre-collected output data (from reusable iterators with 16KB buffered reads) + values: &[Sats], + output_types: &[OutputType], + typeindexes: &[TypeIndex], // Address lookup parameters first_addressindexes: &ByAddressType, loaded_cache: &AddressTypeToTypeIndexMap, @@ -59,19 +58,6 @@ pub fn process_outputs( any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> OutputsResult { - // Sequential iterators for value and outputtype (cache-friendly) - let mut value_iter = txoutindex_to_value - .clean_iter() - .expect("Failed to create value iterator"); - value_iter.set_position_to(first_txoutindex); - value_iter.set_end_to(first_txoutindex + output_count); - - let mut outputtype_iter = txoutindex_to_outputtype - .clean_iter() - .expect("Failed to create outputtype iterator"); - outputtype_iter.set_position_to(first_txoutindex); - outputtype_iter.set_end_to(first_txoutindex + output_count); - // Pre-allocate result structures let estimated_per_type = (output_count / 8).max(8); let mut transacted = Transacted::default(); @@ -81,13 +67,11 @@ pub fn process_outputs( let mut txindex_vecs = AddressTypeToTypeIndexMap::::with_capacity(estimated_per_type); - // Single pass: read and accumulate + // Single pass: read from pre-collected vecs and accumulate for local_idx in 0..output_count { - let txoutindex = TxOutIndex::from(first_txoutindex + local_idx); let txindex = txoutindex_to_txindex[local_idx]; - - let value = value_iter.next().unwrap(); - let output_type = outputtype_iter.next().unwrap(); + let value = values[local_idx]; + let output_type = output_types[local_idx]; transacted.iterate(value, output_type); @@ -95,9 +79,7 @@ pub fn process_outputs( continue; } - // typeindex only for addresses (random access) - let typeindex = - txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); + let typeindex = typeindexes[local_idx]; received_data .get_mut(output_type) diff --git a/crates/brk_computer/src/states/fenwick.rs b/crates/brk_computer/src/states/fenwick.rs index 65f63acf7..0fd9e2f96 100644 --- a/crates/brk_computer/src/states/fenwick.rs +++ b/crates/brk_computer/src/states/fenwick.rs @@ -42,6 +42,7 @@ impl FenwickTree { } /// Get prefix sum of elements 0..=idx. O(log n). + #[allow(unused)] pub fn prefix_sum(&self, idx: usize) -> u64 { let mut sum = 0u64; let mut i = idx + 1; // Convert to 1-indexed @@ -84,11 +85,13 @@ impl FenwickTree { } /// Get total sum of all elements. O(log n). + #[allow(unused)] pub fn total(&self) -> u64 { self.prefix_sum(self.len.saturating_sub(1)) } /// Reset all values to zero. O(n). + #[allow(unused)] pub fn clear(&mut self) { self.tree.fill(0); } diff --git a/crates/brk_computer/src/states/price_buckets.rs b/crates/brk_computer/src/states/price_buckets.rs index fcfdce9b6..945f9784a 100644 --- a/crates/brk_computer/src/states/price_buckets.rs +++ b/crates/brk_computer/src/states/price_buckets.rs @@ -12,6 +12,7 @@ use crate::grouped::{PERCENTILES, PERCENTILES_LEN}; const MIN_PRICE: f64 = 0.001; /// Maximum price tracked ($100M for future-proofing). +#[allow(unused)] const MAX_PRICE: f64 = 100_000_000.0; /// Base for logarithmic buckets (0.1% precision). @@ -97,11 +98,13 @@ impl PriceBuckets { } /// Check if empty. + #[allow(unused)] pub fn is_empty(&self) -> bool { self.total == Sats::ZERO } /// Get total supply. + #[allow(unused)] pub fn total(&self) -> Sats { self.total } @@ -127,12 +130,14 @@ impl PriceBuckets { } /// Get amount in a specific bucket. + #[allow(unused)] pub fn get_bucket(&self, bucket: usize) -> Sats { self.buckets.get(bucket).copied().unwrap_or(Sats::ZERO) } /// Iterate over non-empty buckets in a price range. /// Used for unrealized computation flip range. + #[allow(unused)] pub fn iter_range( &self, from_price: Dollars, @@ -158,6 +163,7 @@ impl PriceBuckets { } /// Iterate over all non-empty buckets (for full unrealized computation). + #[allow(unused)] pub fn iter(&self) -> impl Iterator + '_ { self.buckets .iter() @@ -172,6 +178,7 @@ impl PriceBuckets { } /// Get the lowest price bucket with non-zero amount. + #[allow(unused)] pub fn min_price(&self) -> Option { self.buckets .iter() @@ -180,6 +187,7 @@ impl PriceBuckets { } /// Get the highest price bucket with non-zero amount. + #[allow(unused)] pub fn max_price(&self) -> Option { self.buckets .iter() @@ -188,6 +196,7 @@ impl PriceBuckets { } /// Clear all data. + #[allow(unused)] pub fn clear(&mut self) { self.fenwick.clear(); self.buckets.fill(Sats::ZERO);