diff --git a/crates/brk_computer/src/cointime/compute.rs b/crates/brk_computer/src/cointime/compute.rs index a02af0d93..382d634cf 100644 --- a/crates/brk_computer/src/cointime/compute.rs +++ b/crates/brk_computer/src/cointime/compute.rs @@ -21,25 +21,36 @@ impl Vecs { self.activity .compute(starting_indexes, blocks, distribution, exit)?; - // Supply computes next (depends on activity) - self.supply - .compute(starting_indexes, distribution, &self.activity, exit)?; + // Phase 2: supply, adjusted, value are independent (all depend only on activity) + let (r1, r2) = rayon::join( + || { + self.supply + .compute(starting_indexes, distribution, &self.activity, exit) + }, + || { + rayon::join( + || { + self.adjusted + .compute(starting_indexes, supply_vecs, &self.activity, exit) + }, + || { + self.value.compute( + starting_indexes, + prices, + blocks, + distribution, + &self.activity, + exit, + ) + }, + ) + }, + ); + r1?; + r2.0?; + r2.1?; - // Adjusted velocity metrics (BTC) - can compute without price - self.adjusted - .compute(starting_indexes, supply_vecs, &self.activity, exit)?; - - // Value computes (cointime value destroyed/created/stored, VOCDD) - self.value.compute( - starting_indexes, - prices, - blocks, - distribution, - &self.activity, - exit, - )?; - - // Cap computes (thermo, investor, vaulted, active, cointime caps) + // Cap depends on activity + value self.cap.compute( starting_indexes, mining, @@ -49,21 +60,27 @@ impl Vecs { exit, )?; - // Pricing computes (all prices derived from caps) - self.pricing.compute( - starting_indexes, - prices, - blocks, - distribution, - &self.activity, - &self.supply, - &self.cap, - exit, - )?; - - // Reserve Risk computes (depends on value.vocdd and price) - self.reserve_risk - .compute(starting_indexes, blocks, prices, &self.value, exit)?; + // Phase 4: pricing and reserve_risk are independent + let (r3, r4) = rayon::join( + || { + self.pricing.compute( + starting_indexes, + prices, + blocks, + distribution, + &self.activity, + &self.supply, + &self.cap, + exit, + ) + }, + || { + self.reserve_risk + .compute(starting_indexes, blocks, prices, &self.value, exit) + }, + ); + r3?; + r4?; let _lock = exit.lock(); self.db.compact()?; diff --git a/crates/brk_computer/src/distribution/block/cohort/received.rs b/crates/brk_computer/src/distribution/block/cohort/received.rs index e36c2325d..ecb243d20 100644 --- a/crates/brk_computer/src/distribution/block/cohort/received.rs +++ b/crates/brk_computer/src/distribution/block/cohort/received.rs @@ -9,6 +9,13 @@ use crate::distribution::{ use super::super::cache::{AddressLookup, TrackingStatus}; +/// Aggregated receive data for a single address within a block. +#[derive(Default)] +struct AggregatedReceive { + total_value: Sats, + output_count: u32, +} + #[allow(clippy::too_many_arguments)] pub(crate) fn process_received( received_data: AddressTypeToVec<(TypeIndex, Sats)>, @@ -19,7 +26,9 @@ pub(crate) fn process_received( empty_addr_count: &mut ByAddressType, activity_counts: &mut AddressTypeToActivityCounts, ) { - let mut aggregated: FxHashMap = FxHashMap::default(); + let max_type_len = received_data.iter().map(|(_, v)| v.len()).max().unwrap_or(0); + let mut aggregated: FxHashMap = + FxHashMap::with_capacity_and_hasher(max_type_len, Default::default()); for (output_type, vec) in received_data.unwrap().into_iter() { if vec.is_empty() { @@ -32,14 +41,13 @@ pub(crate) fn process_received( let type_activity = activity_counts.get_mut_unwrap(output_type); // Aggregate receives by address - each address processed exactly once - // Track (total_value, output_count) for correct UTXO counting for (type_index, value) in vec { let entry = aggregated.entry(type_index).or_default(); - entry.0 += value; - entry.1 += 1; + entry.total_value += value; + entry.output_count += 1; } - for (type_index, (total_value, output_count)) in aggregated.drain() { + for (type_index, recv) in aggregated.drain() { let (addr_data, status) = lookup.get_or_create_for_receive(output_type, type_index); // Track receiving activity - each address in receive aggregation @@ -62,8 +70,8 @@ pub(crate) fn process_received( if is_new_entry { // New/was-empty address - just add to cohort - addr_data.receive_outputs(total_value, price, output_count); - let new_bucket = AmountBucket::from(total_value); + addr_data.receive_outputs(recv.total_value, price, recv.output_count); + let new_bucket = AmountBucket::from(recv.total_value); cohorts .amount_range .get_mut_by_bucket(new_bucket) @@ -73,7 +81,7 @@ pub(crate) fn process_received( .add(addr_data); } else { let prev_balance = addr_data.balance(); - let new_balance = prev_balance + total_value; + let new_balance = prev_balance + recv.total_value; let prev_bucket = AmountBucket::from(prev_balance); let new_bucket = AmountBucket::from(new_balance); @@ -97,13 +105,13 @@ pub(crate) fn process_received( type_index, prev_balance, new_balance, - total_value, + recv.total_value, addr_data ); } cohort_state.subtract(addr_data); - addr_data.receive_outputs(total_value, price, output_count); + addr_data.receive_outputs(recv.total_value, price, recv.output_count); cohorts .amount_range .get_mut_by_bucket(new_bucket) @@ -119,7 +127,7 @@ pub(crate) fn process_received( .state .as_mut() .unwrap() - .receive_outputs(addr_data, total_value, price, output_count); + .receive_outputs(addr_data, recv.total_value, price, recv.output_count); } } } diff --git a/crates/brk_computer/src/distribution/block/utxo/inputs.rs b/crates/brk_computer/src/distribution/block/utxo/inputs.rs index 6efe71ae0..a236f70ad 100644 --- a/crates/brk_computer/src/distribution/block/utxo/inputs.rs +++ b/crates/brk_computer/src/distribution/block/utxo/inputs.rs @@ -57,40 +57,48 @@ pub(crate) fn process_inputs( any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> Result { - let items: Vec<_> = (0..input_count) - .into_par_iter() - .map(|local_idx| -> Result<_> { - let txindex = txinindex_to_txindex[local_idx]; + let map_fn = |local_idx: usize| -> Result<_> { + let txindex = txinindex_to_txindex[local_idx]; - let prev_height = txinindex_to_prev_height[local_idx]; - let value = txinindex_to_value[local_idx]; - let input_type = txinindex_to_outputtype[local_idx]; + let prev_height = txinindex_to_prev_height[local_idx]; + let value = txinindex_to_value[local_idx]; + let input_type = txinindex_to_outputtype[local_idx]; - if input_type.is_not_address() { - return Ok((prev_height, value, input_type, None)); - } + if input_type.is_not_address() { + return Ok((prev_height, value, input_type, None)); + } - let typeindex = txinindex_to_typeindex[local_idx]; + let typeindex = txinindex_to_typeindex[local_idx]; - // Look up address data - let addr_data_opt = load_uncached_address_data( - input_type, - typeindex, - first_addressindexes, - cache, - vr, - any_address_indexes, - addresses_data, - )?; + // Look up address data + let addr_data_opt = load_uncached_address_data( + input_type, + typeindex, + first_addressindexes, + cache, + vr, + any_address_indexes, + addresses_data, + )?; - Ok(( - prev_height, - value, - input_type, - Some((typeindex, txindex, value, addr_data_opt)), - )) - }) - .collect::>>()?; + Ok(( + prev_height, + value, + input_type, + Some((typeindex, txindex, value, addr_data_opt)), + )) + }; + + let items: Vec<_> = if input_count < 128 { + (0..input_count) + .map(map_fn) + .collect::>>()? + } else { + (0..input_count) + .into_par_iter() + .map(map_fn) + .collect::>>()? + }; // Phase 2: Sequential accumulation - no merge overhead // Estimate: unique heights bounded by block depth, addresses spread across ~8 types diff --git a/crates/brk_computer/src/distribution/block/utxo/outputs.rs b/crates/brk_computer/src/distribution/block/utxo/outputs.rs index 4d4b706b1..e8b83c239 100644 --- a/crates/brk_computer/src/distribution/block/utxo/outputs.rs +++ b/crates/brk_computer/src/distribution/block/utxo/outputs.rs @@ -48,38 +48,46 @@ pub(crate) fn process_outputs( ) -> Result { let output_count = txoutdata_vec.len(); - // Phase 1: Parallel address lookups (mmap reads) - let items: Vec<_> = (0..output_count) - .into_par_iter() - .map(|local_idx| -> Result<_> { - let txoutdata = &txoutdata_vec[local_idx]; - let value = txoutdata.value; - let output_type = txoutdata.outputtype; + // Phase 1: Address lookups (mmap reads) — parallel for large blocks, sequential for small + let map_fn = |local_idx: usize| -> Result<_> { + let txoutdata = &txoutdata_vec[local_idx]; + let value = txoutdata.value; + let output_type = txoutdata.outputtype; - if output_type.is_not_address() { - return Ok((value, output_type, None)); - } + if output_type.is_not_address() { + return Ok((value, output_type, None)); + } - let typeindex = txoutdata.typeindex; - let txindex = txoutindex_to_txindex[local_idx]; + let typeindex = txoutdata.typeindex; + let txindex = txoutindex_to_txindex[local_idx]; - let addr_data_opt = load_uncached_address_data( - output_type, - typeindex, - first_addressindexes, - cache, - vr, - any_address_indexes, - addresses_data, - )?; + let addr_data_opt = load_uncached_address_data( + output_type, + typeindex, + first_addressindexes, + cache, + vr, + any_address_indexes, + addresses_data, + )?; - Ok(( - value, - output_type, - Some((typeindex, txindex, value, addr_data_opt)), - )) - }) - .collect::>>()?; + Ok(( + value, + output_type, + Some((typeindex, txindex, value, addr_data_opt)), + )) + }; + + let items: Vec<_> = if output_count < 128 { + (0..output_count) + .map(map_fn) + .collect::>>()? + } else { + (0..output_count) + .into_par_iter() + .map(map_fn) + .collect::>>()? + }; // Phase 2: Sequential accumulation let estimated_per_type = (output_count / 8).max(8); diff --git a/crates/brk_computer/src/distribution/cohorts/utxo/fenwick.rs b/crates/brk_computer/src/distribution/cohorts/utxo/fenwick.rs index 252f894a2..d8cfc7b57 100644 --- a/crates/brk_computer/src/distribution/cohorts/utxo/fenwick.rs +++ b/crates/brk_computer/src/distribution/cohorts/utxo/fenwick.rs @@ -239,26 +239,40 @@ impl CostBasisFenwick { return result; } - // Sat-weighted percentiles: find first bucket where cumulative >= target + // Build sorted sat targets: [min=0, percentiles..., max=total-1] + let mut sat_targets = [0i64; PERCENTILES_LEN + 2]; + sat_targets[0] = 0; // min for (i, &p) in PERCENTILES.iter().enumerate() { - let target = (total_sats * i64::from(p) / 100 - 1).max(0); - let bucket = self.tree.kth(target, &sat_field); - result.sat_prices[i] = bucket_to_cents(bucket); + sat_targets[i + 1] = (total_sats * i64::from(p) / 100 - 1).max(0); } + sat_targets[PERCENTILES_LEN + 1] = total_sats - 1; // max - // USD-weighted percentiles + let mut sat_buckets = [0usize; PERCENTILES_LEN + 2]; + self.tree + .batch_kth(&sat_targets, &sat_field, &mut sat_buckets); + + result.min_price = bucket_to_cents(sat_buckets[0]); + for i in 0..PERCENTILES_LEN { + result.sat_prices[i] = bucket_to_cents(sat_buckets[i + 1]); + } + result.max_price = bucket_to_cents(sat_buckets[PERCENTILES_LEN + 1]); + + // USD-weighted percentiles (batch) if total_usd > 0 { + let mut usd_targets = [0i128; PERCENTILES_LEN]; for (i, &p) in PERCENTILES.iter().enumerate() { - let target = (total_usd * i128::from(p) / 100 - 1).max(0); - let bucket = self.tree.kth(target, &usd_field); - result.usd_prices[i] = bucket_to_cents(bucket); + usd_targets[i] = (total_usd * i128::from(p) / 100 - 1).max(0); + } + + let mut usd_buckets = [0usize; PERCENTILES_LEN]; + self.tree + .batch_kth(&usd_targets, &usd_field, &mut usd_buckets); + + for i in 0..PERCENTILES_LEN { + result.usd_prices[i] = bucket_to_cents(usd_buckets[i]); } } - // Min/max via kth(0) and kth(total-1) - result.min_price = bucket_to_cents(self.tree.kth(0i64, &sat_field)); - result.max_price = bucket_to_cents(self.tree.kth(total_sats - 1, &sat_field)); - result } diff --git a/crates/brk_computer/src/distribution/cohorts/utxo/groups.rs b/crates/brk_computer/src/distribution/cohorts/utxo/groups.rs index fd8e78d3c..2cb82b4f4 100644 --- a/crates/brk_computer/src/distribution/cohorts/utxo/groups.rs +++ b/crates/brk_computer/src/distribution/cohorts/utxo/groups.rs @@ -331,6 +331,28 @@ impl UTXOCohorts { .chain(type_.par_iter_mut().map(|x| x as &mut dyn DynCohortVecs)) } + /// Sequential mutable iterator over all separate (stateful) cohorts. + /// Use instead of `par_iter_separate_mut` when per-item work is trivial. + pub(crate) fn iter_separate_mut( + &mut self, + ) -> impl Iterator { + let Self { + age_range, + epoch, + class, + amount_range, + type_, + .. + } = self; + age_range + .iter_mut() + .map(|x| x as &mut dyn DynCohortVecs) + .chain(epoch.iter_mut().map(|x| x as &mut dyn DynCohortVecs)) + .chain(class.iter_mut().map(|x| x as &mut dyn DynCohortVecs)) + .chain(amount_range.iter_mut().map(|x| x as &mut dyn DynCohortVecs)) + .chain(type_.iter_mut().map(|x| x as &mut dyn DynCohortVecs)) + } + /// Immutable iterator over all separate (stateful) cohorts. pub(crate) fn iter_separate(&self) -> impl Iterator { self.age_range @@ -705,21 +727,20 @@ impl UTXOCohorts { /// Reset state heights for all separate cohorts. pub(crate) fn reset_separate_state_heights(&mut self) { - self.par_iter_separate_mut().for_each(|v| { - v.reset_state_starting_height(); - }); + self.iter_separate_mut() + .for_each(|v| v.reset_state_starting_height()); } /// Reset cost_basis_data for all separate cohorts (called during fresh start). pub(crate) fn reset_separate_cost_basis_data(&mut self) -> Result<()> { - self.par_iter_separate_mut() + self.iter_separate_mut() .try_for_each(|v| v.reset_cost_basis_data_if_needed()) } /// Validate computed versions for all cohorts. pub(crate) fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { // Validate separate cohorts - self.par_iter_separate_mut() + self.iter_separate_mut() .try_for_each(|v| v.validate_computed_versions(base_version))?; // Validate aggregate cohorts diff --git a/crates/brk_computer/src/distribution/cohorts/utxo/receive.rs b/crates/brk_computer/src/distribution/cohorts/utxo/receive.rs index ae0add351..43b7d986e 100644 --- a/crates/brk_computer/src/distribution/cohorts/utxo/receive.rs +++ b/crates/brk_computer/src/distribution/cohorts/utxo/receive.rs @@ -46,18 +46,22 @@ impl UTXOCohorts { .unwrap() .receive_utxo_snapshot(&supply_state, &snapshot); - // Update output type cohorts + // Update output type cohorts (skip types with no outputs this block) self.type_.iter_typed_mut().for_each(|(output_type, vecs)| { - vecs.state - .as_mut() - .unwrap() - .receive_utxo(received.by_type.get(output_type), price) + let supply_state = received.by_type.get(output_type); + if supply_state.utxo_count > 0 { + vecs.state + .as_mut() + .unwrap() + .receive_utxo(supply_state, price) + } }); - // Update amount range cohorts + // Update amount range cohorts (skip empty ranges) received .by_size_group .iter_typed() + .filter(|(_, supply_state)| supply_state.utxo_count > 0) .for_each(|(group, supply_state)| { self.amount_range .get_mut(group) diff --git a/crates/brk_computer/src/distribution/cohorts/utxo/send.rs b/crates/brk_computer/src/distribution/cohorts/utxo/send.rs index 6c0394ebd..b72631c22 100644 --- a/crates/brk_computer/src/distribution/cohorts/utxo/send.rs +++ b/crates/brk_computer/src/distribution/cohorts/utxo/send.rs @@ -94,10 +94,11 @@ impl UTXOCohorts { .supply -= &sent.spendable_supply; } - // Update output type cohorts + // Update output type cohorts (skip zero-supply entries) sent.by_type .spendable .iter_typed() + .filter(|(_, supply_state)| supply_state.utxo_count > 0) .for_each(|(output_type, supply_state)| { self.type_ .get_mut(output_type) @@ -107,9 +108,10 @@ impl UTXOCohorts { .send_utxo(supply_state, current_price, prev_price, peak_price, age) }); - // Update amount range cohorts + // Update amount range cohorts (skip zero-supply entries) sent.by_size_group .iter_typed() + .filter(|(_, supply_state)| supply_state.utxo_count > 0) .for_each(|(group, supply_state)| { self.amount_range .get_mut(group) diff --git a/crates/brk_computer/src/distribution/compute/block_loop.rs b/crates/brk_computer/src/distribution/compute/block_loop.rs index 76fb46d24..a2afb7f2a 100644 --- a/crates/brk_computer/src/distribution/compute/block_loop.rs +++ b/crates/brk_computer/src/distribution/compute/block_loop.rs @@ -238,12 +238,6 @@ pub(crate) fn process_blocks( debug_assert_eq!(ctx.timestamp_at(height), timestamp); debug_assert_eq!(ctx.price_at(height), block_price); - // Build txindex mappings for this block (reuses internal buffers) - let txoutindex_to_txindex = - txout_to_txindex_buf.build(first_txindex, tx_count, txindex_to_output_count); - let txinindex_to_txindex = - txin_to_txindex_buf.build(first_txindex, tx_count, txindex_to_input_count); - // Get first address indexes for this height from pre-collected vecs let first_addressindexes = ByAddressType { p2a: TypeIndex::from(first_p2a_vec[offset].to_usize()), @@ -259,23 +253,19 @@ pub(crate) fn process_blocks( // Reset per-block activity counts activity_counts.reset(); - // Collect output/input data using reusable iterators (16KB buffered reads) - // Must be done before rayon::join since iterators aren't Send - let txoutdata_vec = txout_iters.collect_block_outputs(first_txoutindex, output_count); - - let (input_values, input_prev_heights, input_outputtypes, input_typeindexes) = - if input_count > 1 { - txin_iters.collect_block_inputs(first_txinindex + 1, input_count - 1, height) - } else { - (&[][..], &[][..], &[][..], &[][..]) - }; - - // Process outputs, inputs, and tick-tock in parallel via rayon::join + // Process outputs, inputs, and tick-tock in parallel via rayon::join. + // Collection (build txindex mappings + bulk mmap reads) is merged into the + // processing closures so outputs and inputs collection overlap each other + // and tick-tock, instead of running sequentially before the join. let (matured, oi_result) = rayon::join( || vecs.utxo_cohorts.tick_tock_next_block(chain_state, timestamp), || -> Result<_> { let (outputs_result, inputs_result) = rayon::join( || { + let txoutindex_to_txindex = txout_to_txindex_buf + .build(first_txindex, tx_count, txindex_to_output_count); + let txoutdata_vec = + txout_iters.collect_block_outputs(first_txoutindex, output_count); process_outputs( txoutindex_to_txindex, txoutdata_vec, @@ -288,6 +278,14 @@ pub(crate) fn process_blocks( }, || -> Result<_> { if input_count > 1 { + let txinindex_to_txindex = txin_to_txindex_buf + .build(first_txindex, tx_count, txindex_to_input_count); + let (input_values, input_prev_heights, input_outputtypes, input_typeindexes) = + txin_iters.collect_block_inputs( + first_txinindex + 1, + input_count - 1, + height, + ); process_inputs( input_count - 1, &txinindex_to_txindex[1..], diff --git a/crates/brk_computer/src/distribution/range_map.rs b/crates/brk_computer/src/distribution/range_map.rs index c302cd747..0eea71d5f 100644 --- a/crates/brk_computer/src/distribution/range_map.rs +++ b/crates/brk_computer/src/distribution/range_map.rs @@ -1,8 +1,8 @@ use std::marker::PhantomData; -/// Number of ranges to cache. Small enough for O(1) linear scan, -/// large enough to cover the "hot" source blocks in a typical block. -const CACHE_SIZE: usize = 8; +/// Direct-mapped cache size. Power of 2 for fast masking. +const CACHE_SIZE: usize = 128; +const CACHE_MASK: usize = CACHE_SIZE - 1; /// Maps ranges of indices to values for efficient reverse lookups. /// @@ -10,15 +10,13 @@ const CACHE_SIZE: usize = 8; /// in a sorted Vec and uses binary search to find the value for any index. /// The value is derived from the position in the Vec. /// -/// Includes an LRU cache of recently accessed ranges to avoid binary search -/// when there's locality in access patterns. +/// Includes a direct-mapped cache for O(1) lookups when there's locality. #[derive(Debug, Clone)] pub struct RangeMap { /// Sorted vec of first_index values. Position in vec = value. first_indexes: Vec, - /// LRU cache: (range_low, range_high, value, age). Lower age = more recent. - cache: [(I, I, V, u8); CACHE_SIZE], - cache_len: u8, + /// Direct-mapped cache: (range_low, range_high, value, occupied). Inline for zero indirection. + cache: [(I, I, V, bool); CACHE_SIZE], _phantom: PhantomData, } @@ -26,14 +24,13 @@ impl Default for RangeMap { fn default() -> Self { Self { first_indexes: Vec::new(), - cache: [(I::default(), I::default(), V::default(), 0); CACHE_SIZE], - cache_len: 0, + cache: [(I::default(), I::default(), V::default(), false); CACHE_SIZE], _phantom: PhantomData, } } } -impl + Copy + Default> RangeMap { +impl, V: From + Copy + Default> RangeMap { /// Number of ranges stored. pub(crate) fn len(&self) -> usize { self.first_indexes.len() @@ -42,7 +39,7 @@ impl + Copy + Default> RangeMap { /// Truncate to `new_len` ranges and clear the cache. pub(crate) fn truncate(&mut self, new_len: usize) { self.first_indexes.truncate(new_len); - self.cache_len = 0; + self.clear_cache(); } /// Push a new first_index. Value is implicitly the current length. @@ -66,21 +63,11 @@ impl + Copy + Default> RangeMap { return None; } - let cache_len = self.cache_len as usize; - - // Check cache first (linear scan of small array) - for i in 0..cache_len { - let (low, high, value, _) = self.cache[i]; - if index >= low && index < high { - // Cache hit - mark as most recently used - if self.cache[i].3 != 0 { - for j in 0..cache_len { - self.cache[j].3 = self.cache[j].3.saturating_add(1); - } - self.cache[i].3 = 0; - } - return Some(value); - } + // Direct-mapped cache lookup: O(1), no aging + let slot = Self::cache_slot(&index); + let entry = &self.cache[slot]; + if entry.3 && index >= entry.0 && index < entry.1 { + return Some(entry.2); } // Cache miss - binary search @@ -88,15 +75,12 @@ impl + Copy + Default> RangeMap { if pos > 0 { let value = V::from(pos - 1); let low = self.first_indexes[pos - 1]; - - // For last range, use low as high (special marker) - // The check `index < high` will fail, but `index >= low` handles it - let high = self.first_indexes.get(pos).copied().unwrap_or(low); let is_last = pos == self.first_indexes.len(); - // Add to cache (skip if last range - unbounded high is tricky) + // Cache non-last ranges (last range has unbounded high) if !is_last { - self.add_to_cache(low, high, value); + let high = self.first_indexes[pos]; + self.cache[slot] = (low, high, value, true); } Some(value) @@ -106,29 +90,14 @@ impl + Copy + Default> RangeMap { } #[inline] - fn add_to_cache(&mut self, low: I, high: I, value: V) { - let cache_len = self.cache_len as usize; + fn cache_slot(index: &I) -> usize { + let v: usize = (*index).into(); + v & CACHE_MASK + } - // Age all entries - for i in 0..cache_len { - self.cache[i].3 = self.cache[i].3.saturating_add(1); - } - - if cache_len < CACHE_SIZE { - // Not full - append - self.cache[cache_len] = (low, high, value, 0); - self.cache_len += 1; - } else { - // Full - evict oldest (highest age) - let mut oldest_idx = 0; - let mut oldest_age = 0u8; - for i in 0..CACHE_SIZE { - if self.cache[i].3 > oldest_age { - oldest_age = self.cache[i].3; - oldest_idx = i; - } - } - self.cache[oldest_idx] = (low, high, value, 0); + fn clear_cache(&mut self) { + for entry in self.cache.iter_mut() { + entry.3 = false; } } } diff --git a/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs b/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs index 47a9dc6d3..4b03194de 100644 --- a/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs +++ b/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs @@ -89,6 +89,8 @@ impl CachedStateRaw { pub struct CachedUnrealizedState { state: CachedStateRaw, at_price: CentsCompact, + /// Cached output to skip redundant u128 divisions when nothing changed. + cached_output: Option, } impl CachedUnrealizedState { @@ -98,6 +100,7 @@ impl CachedUnrealizedState { Self { state, at_price: price, + cached_output: None, } } @@ -110,11 +113,18 @@ impl CachedUnrealizedState { let new_price: CentsCompact = new_price.into(); if new_price != self.at_price { self.update_for_price_change(new_price, map); + self.cached_output = None; } - self.state.to_output() + if let Some(ref output) = self.cached_output { + return output.clone(); + } + let output = self.state.to_output(); + self.cached_output = Some(output.clone()); + output } pub(crate) fn on_receive(&mut self, price: Cents, sats: Sats) { + self.cached_output = None; let price: CentsCompact = price.into(); let sats_u128 = sats.as_u128(); let price_u128 = price.as_u128(); @@ -139,6 +149,7 @@ impl CachedUnrealizedState { } pub(crate) fn on_send(&mut self, price: Cents, sats: Sats) { + self.cached_output = None; let price: CentsCompact = price.into(); let sats_u128 = sats.as_u128(); let price_u128 = price.as_u128(); diff --git a/crates/brk_computer/src/inputs/spent/compute.rs b/crates/brk_computer/src/inputs/spent/compute.rs index 8890e6d74..2b1bb7ff8 100644 --- a/crates/brk_computer/src/inputs/spent/compute.rs +++ b/crates/brk_computer/src/inputs/spent/compute.rs @@ -42,10 +42,14 @@ impl Vecs { let value_reader = indexer.vecs.outputs.value.reader(); let actual_total = target - min; let mut entries: Vec = Vec::with_capacity(actual_total.min(BATCH_SIZE)); + // Pre-allocate output buffers for scatter-write pattern + let mut out_txoutindex: Vec = Vec::new(); + let mut out_value: Vec = Vec::new(); let mut batch_start = min; while batch_start < target { let batch_end = (batch_start + BATCH_SIZE).min(target); + let batch_len = batch_end - batch_start; entries.clear(); let mut j = 0usize; @@ -55,7 +59,7 @@ impl Vecs { .outpoint .for_each_range_at(batch_start, batch_end, |outpoint| { entries.push(Entry { - txinindex: TxInIndex::from(batch_start + j), + original_idx: j, txindex: outpoint.txindex(), vout: outpoint.vout(), txoutindex: TxOutIndex::COINBASE, @@ -64,7 +68,7 @@ impl Vecs { j += 1; }); - // Coinbase entries (txindex MAX) sorted to end + // Sort 1: by txindex (group by transaction for sequential first_txoutindex reads) entries.sort_unstable_by_key(|e| e.txindex); for entry in &mut entries { if entry.txindex.is_coinbase() { @@ -74,6 +78,7 @@ impl Vecs { first_txoutindex_reader.get(entry.txindex.to_usize()) + entry.vout; } + // Sort 2: by txoutindex (sequential value reads) entries.sort_unstable_by_key(|e| e.txoutindex); for entry in &mut entries { if entry.txoutindex.is_coinbase() { @@ -82,11 +87,22 @@ impl Vecs { entry.value = value_reader.get(entry.txoutindex.to_usize()); } - entries.sort_unstable_by_key(|e| e.txinindex); + // Scatter-write to output buffers using original_idx (avoids Sort 3) + out_txoutindex.clear(); + out_txoutindex.resize(batch_len, TxOutIndex::COINBASE); + out_value.clear(); + out_value.resize(batch_len, Sats::MAX); + for entry in &entries { + out_txoutindex[entry.original_idx] = entry.txoutindex; + out_value[entry.original_idx] = entry.value; + } + + for i in 0..batch_len { + let txinindex = TxInIndex::from(batch_start + i); self.txoutindex - .truncate_push(entry.txinindex, entry.txoutindex)?; - self.value.truncate_push(entry.txinindex, entry.value)?; + .truncate_push(txinindex, out_txoutindex[i])?; + self.value.truncate_push(txinindex, out_value[i])?; } if batch_end < target { @@ -106,7 +122,7 @@ impl Vecs { } struct Entry { - txinindex: TxInIndex, + original_idx: usize, txindex: TxIndex, vout: Vout, txoutindex: TxOutIndex, diff --git a/crates/brk_computer/src/internal/algo/fenwick.rs b/crates/brk_computer/src/internal/algo/fenwick.rs index bf4cd309d..acf96e68b 100644 --- a/crates/brk_computer/src/internal/algo/fenwick.rs +++ b/crates/brk_computer/src/internal/algo/fenwick.rs @@ -88,6 +88,39 @@ impl FenwickTree { pos // 0-indexed bucket } + /// Batch kth for sorted targets. Processes all targets at each tree level + /// for better cache locality vs individual kth() calls. + /// + /// `sorted_targets` must be sorted ascending. `out` receives the 0-indexed bucket + /// for each target. Both slices must have the same length. + #[inline] + pub fn batch_kth(&self, sorted_targets: &[V], field_fn: &F, out: &mut [usize]) + where + V: Copy + PartialOrd + std::ops::SubAssign, + F: Fn(&N) -> V, + { + let k = sorted_targets.len(); + debug_assert_eq!(out.len(), k); + debug_assert!(self.size > 0); + out.fill(0); + // Copy targets so we can subtract in-place + let mut remaining: smallvec::SmallVec<[V; 24]> = sorted_targets.into(); + let mut bit = 1usize << (usize::BITS - 1 - self.size.leading_zeros() as u32); + while bit > 0 { + for i in 0..k { + let next = out[i] + bit; + if next <= self.size { + let val = field_fn(&self.tree[next]); + if remaining[i] >= val { + remaining[i] -= val; + out[i] = next; + } + } + } + bit >>= 1; + } + } + /// Write a raw frequency delta at a bucket. Does NOT maintain the Fenwick invariant. /// Call [`build_in_place`] after all raw writes. #[inline] diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index c4e26d867..38549e7bc 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -150,6 +150,14 @@ impl Computer { )?)) })?; + let pools_handle = big_thread().spawn_scoped(s, || -> Result<_> { + Ok(Box::new(pools::Vecs::forced_import( + &computed_path, + VERSION, + &indexes, + )?)) + })?; + let cointime = Box::new(cointime::Vecs::forced_import( &computed_path, VERSION, @@ -160,23 +168,33 @@ impl Computer { let mining = mining_handle.join().unwrap()?; let transactions = transactions_handle.join().unwrap()?; let scripts = scripts_handle.join().unwrap()?; - - let pools = Box::new(pools::Vecs::forced_import( - &computed_path, - VERSION, - &indexes, - )?); + let pools = pools_handle.join().unwrap()?; Ok((blocks, mining, transactions, scripts, pools, cointime)) }) })?; - let distribution = timed("Imported distribution", || -> Result<_> { - Ok(Box::new(distribution::Vecs::forced_import( - &computed_path, - VERSION, - &indexes, - )?)) + // Market and distribution are independent; import in parallel. + // Supply depends on distribution so it runs after. + let (distribution, market) = timed("Imported distribution/market", || { + thread::scope(|s| -> Result<_> { + let market_handle = big_thread().spawn_scoped(s, || -> Result<_> { + Ok(Box::new(market::Vecs::forced_import( + &computed_path, + VERSION, + &indexes, + )?)) + })?; + + let distribution = Box::new(distribution::Vecs::forced_import( + &computed_path, + VERSION, + &indexes, + )?); + + let market = market_handle.join().unwrap()?; + Ok((distribution, market)) + }) })?; let supply = timed("Imported supply", || -> Result<_> { @@ -188,14 +206,6 @@ impl Computer { )?)) })?; - let market = timed("Imported market", || -> Result<_> { - Ok(Box::new(market::Vecs::forced_import( - &computed_path, - VERSION, - &indexes, - )?)) - })?; - info!("Total import time: {:?}", import_start.elapsed()); let this = Self { diff --git a/crates/brk_computer/src/market/compute.rs b/crates/brk_computer/src/market/compute.rs index 0142c7e26..20fefd516 100644 --- a/crates/brk_computer/src/market/compute.rs +++ b/crates/brk_computer/src/market/compute.rs @@ -19,33 +19,50 @@ impl Vecs { starting_indexes: &Indexes, exit: &Exit, ) -> Result<()> { - self.ath.compute(prices, blocks, starting_indexes, exit)?; + // Phase 1: Independent sub-modules in parallel + let (r1, r2) = rayon::join( + || { + rayon::join( + || self.ath.compute(prices, blocks, starting_indexes, exit), + || self.lookback.compute(blocks, prices, starting_indexes, exit), + ) + }, + || { + rayon::join( + || self.range.compute(prices, blocks, starting_indexes, exit), + || { + self.moving_average + .compute(blocks, prices, starting_indexes, exit) + }, + ) + }, + ); + r1.0?; + r1.1?; + r2.0?; + r2.1?; - // Lookback metrics (independent) - self.lookback - .compute(blocks, prices, starting_indexes, exit)?; - - // Returns metrics (depends on lookback) - self.returns - .compute(prices, blocks, &self.lookback, starting_indexes, exit)?; - - // Range metrics (independent) - self.range.compute(prices, blocks, starting_indexes, exit)?; - - // Moving average metrics (independent) - self.moving_average - .compute(blocks, prices, starting_indexes, exit)?; - - // DCA metrics (depends on lookback for lump sum comparison) - self.dca.compute( - indexes, - prices, - blocks, - &self.lookback, - starting_indexes, - exit, - )?; + // Phase 2: Depend on lookback + let (r3, r4) = rayon::join( + || { + self.returns + .compute(prices, blocks, &self.lookback, starting_indexes, exit) + }, + || { + self.dca.compute( + indexes, + prices, + blocks, + &self.lookback, + starting_indexes, + exit, + ) + }, + ); + r3?; + r4?; + // Phase 3: Depends on returns, range, moving_average self.indicators.compute( &mining.rewards, &self.returns,