distribution: speed improvements

This commit is contained in:
nym21
2026-03-08 21:49:14 +01:00
parent d55377e169
commit bb2458c765
15 changed files with 387 additions and 251 deletions

View File

@@ -21,25 +21,36 @@ impl Vecs {
self.activity
.compute(starting_indexes, blocks, distribution, exit)?;
// Supply computes next (depends on activity)
self.supply
.compute(starting_indexes, distribution, &self.activity, exit)?;
// Phase 2: supply, adjusted, value are independent (all depend only on activity)
let (r1, r2) = rayon::join(
|| {
self.supply
.compute(starting_indexes, distribution, &self.activity, exit)
},
|| {
rayon::join(
|| {
self.adjusted
.compute(starting_indexes, supply_vecs, &self.activity, exit)
},
|| {
self.value.compute(
starting_indexes,
prices,
blocks,
distribution,
&self.activity,
exit,
)
},
)
},
);
r1?;
r2.0?;
r2.1?;
// Adjusted velocity metrics (BTC) - can compute without price
self.adjusted
.compute(starting_indexes, supply_vecs, &self.activity, exit)?;
// Value computes (cointime value destroyed/created/stored, VOCDD)
self.value.compute(
starting_indexes,
prices,
blocks,
distribution,
&self.activity,
exit,
)?;
// Cap computes (thermo, investor, vaulted, active, cointime caps)
// Cap depends on activity + value
self.cap.compute(
starting_indexes,
mining,
@@ -49,21 +60,27 @@ impl Vecs {
exit,
)?;
// Pricing computes (all prices derived from caps)
self.pricing.compute(
starting_indexes,
prices,
blocks,
distribution,
&self.activity,
&self.supply,
&self.cap,
exit,
)?;
// Reserve Risk computes (depends on value.vocdd and price)
self.reserve_risk
.compute(starting_indexes, blocks, prices, &self.value, exit)?;
// Phase 4: pricing and reserve_risk are independent
let (r3, r4) = rayon::join(
|| {
self.pricing.compute(
starting_indexes,
prices,
blocks,
distribution,
&self.activity,
&self.supply,
&self.cap,
exit,
)
},
|| {
self.reserve_risk
.compute(starting_indexes, blocks, prices, &self.value, exit)
},
);
r3?;
r4?;
let _lock = exit.lock();
self.db.compact()?;

View File

@@ -9,6 +9,13 @@ use crate::distribution::{
use super::super::cache::{AddressLookup, TrackingStatus};
/// Aggregated receive data for a single address within a block.
#[derive(Default)]
struct AggregatedReceive {
    /// Sum of all output values this address received in the block.
    total_value: Sats,
    /// Number of outputs received; kept separately so UTXO counts stay correct
    /// when one address receives several outputs in the same block.
    output_count: u32,
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn process_received(
received_data: AddressTypeToVec<(TypeIndex, Sats)>,
@@ -19,7 +26,9 @@ pub(crate) fn process_received(
empty_addr_count: &mut ByAddressType<u64>,
activity_counts: &mut AddressTypeToActivityCounts,
) {
let mut aggregated: FxHashMap<TypeIndex, (Sats, u32)> = FxHashMap::default();
let max_type_len = received_data.iter().map(|(_, v)| v.len()).max().unwrap_or(0);
let mut aggregated: FxHashMap<TypeIndex, AggregatedReceive> =
FxHashMap::with_capacity_and_hasher(max_type_len, Default::default());
for (output_type, vec) in received_data.unwrap().into_iter() {
if vec.is_empty() {
@@ -32,14 +41,13 @@ pub(crate) fn process_received(
let type_activity = activity_counts.get_mut_unwrap(output_type);
// Aggregate receives by address - each address processed exactly once
// Track (total_value, output_count) for correct UTXO counting
for (type_index, value) in vec {
let entry = aggregated.entry(type_index).or_default();
entry.0 += value;
entry.1 += 1;
entry.total_value += value;
entry.output_count += 1;
}
for (type_index, (total_value, output_count)) in aggregated.drain() {
for (type_index, recv) in aggregated.drain() {
let (addr_data, status) = lookup.get_or_create_for_receive(output_type, type_index);
// Track receiving activity - each address in receive aggregation
@@ -62,8 +70,8 @@ pub(crate) fn process_received(
if is_new_entry {
// New/was-empty address - just add to cohort
addr_data.receive_outputs(total_value, price, output_count);
let new_bucket = AmountBucket::from(total_value);
addr_data.receive_outputs(recv.total_value, price, recv.output_count);
let new_bucket = AmountBucket::from(recv.total_value);
cohorts
.amount_range
.get_mut_by_bucket(new_bucket)
@@ -73,7 +81,7 @@ pub(crate) fn process_received(
.add(addr_data);
} else {
let prev_balance = addr_data.balance();
let new_balance = prev_balance + total_value;
let new_balance = prev_balance + recv.total_value;
let prev_bucket = AmountBucket::from(prev_balance);
let new_bucket = AmountBucket::from(new_balance);
@@ -97,13 +105,13 @@ pub(crate) fn process_received(
type_index,
prev_balance,
new_balance,
total_value,
recv.total_value,
addr_data
);
}
cohort_state.subtract(addr_data);
addr_data.receive_outputs(total_value, price, output_count);
addr_data.receive_outputs(recv.total_value, price, recv.output_count);
cohorts
.amount_range
.get_mut_by_bucket(new_bucket)
@@ -119,7 +127,7 @@ pub(crate) fn process_received(
.state
.as_mut()
.unwrap()
.receive_outputs(addr_data, total_value, price, output_count);
.receive_outputs(addr_data, recv.total_value, price, recv.output_count);
}
}
}

View File

@@ -57,40 +57,48 @@ pub(crate) fn process_inputs(
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> Result<InputsResult> {
let items: Vec<_> = (0..input_count)
.into_par_iter()
.map(|local_idx| -> Result<_> {
let txindex = txinindex_to_txindex[local_idx];
let map_fn = |local_idx: usize| -> Result<_> {
let txindex = txinindex_to_txindex[local_idx];
let prev_height = txinindex_to_prev_height[local_idx];
let value = txinindex_to_value[local_idx];
let input_type = txinindex_to_outputtype[local_idx];
let prev_height = txinindex_to_prev_height[local_idx];
let value = txinindex_to_value[local_idx];
let input_type = txinindex_to_outputtype[local_idx];
if input_type.is_not_address() {
return Ok((prev_height, value, input_type, None));
}
if input_type.is_not_address() {
return Ok((prev_height, value, input_type, None));
}
let typeindex = txinindex_to_typeindex[local_idx];
let typeindex = txinindex_to_typeindex[local_idx];
// Look up address data
let addr_data_opt = load_uncached_address_data(
input_type,
typeindex,
first_addressindexes,
cache,
vr,
any_address_indexes,
addresses_data,
)?;
// Look up address data
let addr_data_opt = load_uncached_address_data(
input_type,
typeindex,
first_addressindexes,
cache,
vr,
any_address_indexes,
addresses_data,
)?;
Ok((
prev_height,
value,
input_type,
Some((typeindex, txindex, value, addr_data_opt)),
))
})
.collect::<Result<Vec<_>>>()?;
Ok((
prev_height,
value,
input_type,
Some((typeindex, txindex, value, addr_data_opt)),
))
};
let items: Vec<_> = if input_count < 128 {
(0..input_count)
.map(map_fn)
.collect::<Result<Vec<_>>>()?
} else {
(0..input_count)
.into_par_iter()
.map(map_fn)
.collect::<Result<Vec<_>>>()?
};
// Phase 2: Sequential accumulation - no merge overhead
// Estimate: unique heights bounded by block depth, addresses spread across ~8 types

View File

@@ -48,38 +48,46 @@ pub(crate) fn process_outputs(
) -> Result<OutputsResult> {
let output_count = txoutdata_vec.len();
// Phase 1: Parallel address lookups (mmap reads)
let items: Vec<_> = (0..output_count)
.into_par_iter()
.map(|local_idx| -> Result<_> {
let txoutdata = &txoutdata_vec[local_idx];
let value = txoutdata.value;
let output_type = txoutdata.outputtype;
// Phase 1: Address lookups (mmap reads) — parallel for large blocks, sequential for small
let map_fn = |local_idx: usize| -> Result<_> {
let txoutdata = &txoutdata_vec[local_idx];
let value = txoutdata.value;
let output_type = txoutdata.outputtype;
if output_type.is_not_address() {
return Ok((value, output_type, None));
}
if output_type.is_not_address() {
return Ok((value, output_type, None));
}
let typeindex = txoutdata.typeindex;
let txindex = txoutindex_to_txindex[local_idx];
let typeindex = txoutdata.typeindex;
let txindex = txoutindex_to_txindex[local_idx];
let addr_data_opt = load_uncached_address_data(
output_type,
typeindex,
first_addressindexes,
cache,
vr,
any_address_indexes,
addresses_data,
)?;
let addr_data_opt = load_uncached_address_data(
output_type,
typeindex,
first_addressindexes,
cache,
vr,
any_address_indexes,
addresses_data,
)?;
Ok((
value,
output_type,
Some((typeindex, txindex, value, addr_data_opt)),
))
})
.collect::<Result<Vec<_>>>()?;
Ok((
value,
output_type,
Some((typeindex, txindex, value, addr_data_opt)),
))
};
let items: Vec<_> = if output_count < 128 {
(0..output_count)
.map(map_fn)
.collect::<Result<Vec<_>>>()?
} else {
(0..output_count)
.into_par_iter()
.map(map_fn)
.collect::<Result<Vec<_>>>()?
};
// Phase 2: Sequential accumulation
let estimated_per_type = (output_count / 8).max(8);

View File

@@ -239,26 +239,40 @@ impl CostBasisFenwick {
return result;
}
// Sat-weighted percentiles: find first bucket where cumulative >= target
// Build sorted sat targets: [min=0, percentiles..., max=total-1]
let mut sat_targets = [0i64; PERCENTILES_LEN + 2];
sat_targets[0] = 0; // min
for (i, &p) in PERCENTILES.iter().enumerate() {
let target = (total_sats * i64::from(p) / 100 - 1).max(0);
let bucket = self.tree.kth(target, &sat_field);
result.sat_prices[i] = bucket_to_cents(bucket);
sat_targets[i + 1] = (total_sats * i64::from(p) / 100 - 1).max(0);
}
sat_targets[PERCENTILES_LEN + 1] = total_sats - 1; // max
// USD-weighted percentiles
let mut sat_buckets = [0usize; PERCENTILES_LEN + 2];
self.tree
.batch_kth(&sat_targets, &sat_field, &mut sat_buckets);
result.min_price = bucket_to_cents(sat_buckets[0]);
for i in 0..PERCENTILES_LEN {
result.sat_prices[i] = bucket_to_cents(sat_buckets[i + 1]);
}
result.max_price = bucket_to_cents(sat_buckets[PERCENTILES_LEN + 1]);
// USD-weighted percentiles (batch)
if total_usd > 0 {
let mut usd_targets = [0i128; PERCENTILES_LEN];
for (i, &p) in PERCENTILES.iter().enumerate() {
let target = (total_usd * i128::from(p) / 100 - 1).max(0);
let bucket = self.tree.kth(target, &usd_field);
result.usd_prices[i] = bucket_to_cents(bucket);
usd_targets[i] = (total_usd * i128::from(p) / 100 - 1).max(0);
}
let mut usd_buckets = [0usize; PERCENTILES_LEN];
self.tree
.batch_kth(&usd_targets, &usd_field, &mut usd_buckets);
for i in 0..PERCENTILES_LEN {
result.usd_prices[i] = bucket_to_cents(usd_buckets[i]);
}
}
// Min/max via kth(0) and kth(total-1)
result.min_price = bucket_to_cents(self.tree.kth(0i64, &sat_field));
result.max_price = bucket_to_cents(self.tree.kth(total_sats - 1, &sat_field));
result
}

View File

@@ -331,6 +331,28 @@ impl UTXOCohorts<Rw> {
.chain(type_.par_iter_mut().map(|x| x as &mut dyn DynCohortVecs))
}
/// Sequential mutable iterator over all separate (stateful) cohorts.
/// Use instead of `par_iter_separate_mut` when per-item work is trivial.
pub(crate) fn iter_separate_mut(
&mut self,
) -> impl Iterator<Item = &mut dyn DynCohortVecs> {
let Self {
age_range,
epoch,
class,
amount_range,
type_,
..
} = self;
age_range
.iter_mut()
.map(|x| x as &mut dyn DynCohortVecs)
.chain(epoch.iter_mut().map(|x| x as &mut dyn DynCohortVecs))
.chain(class.iter_mut().map(|x| x as &mut dyn DynCohortVecs))
.chain(amount_range.iter_mut().map(|x| x as &mut dyn DynCohortVecs))
.chain(type_.iter_mut().map(|x| x as &mut dyn DynCohortVecs))
}
/// Immutable iterator over all separate (stateful) cohorts.
pub(crate) fn iter_separate(&self) -> impl Iterator<Item = &dyn DynCohortVecs> {
self.age_range
@@ -705,21 +727,20 @@ impl UTXOCohorts<Rw> {
/// Reset state heights for all separate cohorts.
pub(crate) fn reset_separate_state_heights(&mut self) {
self.par_iter_separate_mut().for_each(|v| {
v.reset_state_starting_height();
});
self.iter_separate_mut()
.for_each(|v| v.reset_state_starting_height());
}
/// Reset cost_basis_data for all separate cohorts (called during fresh start).
pub(crate) fn reset_separate_cost_basis_data(&mut self) -> Result<()> {
self.par_iter_separate_mut()
self.iter_separate_mut()
.try_for_each(|v| v.reset_cost_basis_data_if_needed())
}
/// Validate computed versions for all cohorts.
pub(crate) fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
// Validate separate cohorts
self.par_iter_separate_mut()
self.iter_separate_mut()
.try_for_each(|v| v.validate_computed_versions(base_version))?;
// Validate aggregate cohorts

View File

@@ -46,18 +46,22 @@ impl UTXOCohorts<Rw> {
.unwrap()
.receive_utxo_snapshot(&supply_state, &snapshot);
// Update output type cohorts
// Update output type cohorts (skip types with no outputs this block)
self.type_.iter_typed_mut().for_each(|(output_type, vecs)| {
vecs.state
.as_mut()
.unwrap()
.receive_utxo(received.by_type.get(output_type), price)
let supply_state = received.by_type.get(output_type);
if supply_state.utxo_count > 0 {
vecs.state
.as_mut()
.unwrap()
.receive_utxo(supply_state, price)
}
});
// Update amount range cohorts
// Update amount range cohorts (skip empty ranges)
received
.by_size_group
.iter_typed()
.filter(|(_, supply_state)| supply_state.utxo_count > 0)
.for_each(|(group, supply_state)| {
self.amount_range
.get_mut(group)

View File

@@ -94,10 +94,11 @@ impl UTXOCohorts<Rw> {
.supply -= &sent.spendable_supply;
}
// Update output type cohorts
// Update output type cohorts (skip zero-supply entries)
sent.by_type
.spendable
.iter_typed()
.filter(|(_, supply_state)| supply_state.utxo_count > 0)
.for_each(|(output_type, supply_state)| {
self.type_
.get_mut(output_type)
@@ -107,9 +108,10 @@ impl UTXOCohorts<Rw> {
.send_utxo(supply_state, current_price, prev_price, peak_price, age)
});
// Update amount range cohorts
// Update amount range cohorts (skip zero-supply entries)
sent.by_size_group
.iter_typed()
.filter(|(_, supply_state)| supply_state.utxo_count > 0)
.for_each(|(group, supply_state)| {
self.amount_range
.get_mut(group)

View File

@@ -238,12 +238,6 @@ pub(crate) fn process_blocks(
debug_assert_eq!(ctx.timestamp_at(height), timestamp);
debug_assert_eq!(ctx.price_at(height), block_price);
// Build txindex mappings for this block (reuses internal buffers)
let txoutindex_to_txindex =
txout_to_txindex_buf.build(first_txindex, tx_count, txindex_to_output_count);
let txinindex_to_txindex =
txin_to_txindex_buf.build(first_txindex, tx_count, txindex_to_input_count);
// Get first address indexes for this height from pre-collected vecs
let first_addressindexes = ByAddressType {
p2a: TypeIndex::from(first_p2a_vec[offset].to_usize()),
@@ -259,23 +253,19 @@ pub(crate) fn process_blocks(
// Reset per-block activity counts
activity_counts.reset();
// Collect output/input data using reusable iterators (16KB buffered reads)
// Must be done before rayon::join since iterators aren't Send
let txoutdata_vec = txout_iters.collect_block_outputs(first_txoutindex, output_count);
let (input_values, input_prev_heights, input_outputtypes, input_typeindexes) =
if input_count > 1 {
txin_iters.collect_block_inputs(first_txinindex + 1, input_count - 1, height)
} else {
(&[][..], &[][..], &[][..], &[][..])
};
// Process outputs, inputs, and tick-tock in parallel via rayon::join
// Process outputs, inputs, and tick-tock in parallel via rayon::join.
// Collection (build txindex mappings + bulk mmap reads) is merged into the
// processing closures so outputs and inputs collection overlap each other
// and tick-tock, instead of running sequentially before the join.
let (matured, oi_result) = rayon::join(
|| vecs.utxo_cohorts.tick_tock_next_block(chain_state, timestamp),
|| -> Result<_> {
let (outputs_result, inputs_result) = rayon::join(
|| {
let txoutindex_to_txindex = txout_to_txindex_buf
.build(first_txindex, tx_count, txindex_to_output_count);
let txoutdata_vec =
txout_iters.collect_block_outputs(first_txoutindex, output_count);
process_outputs(
txoutindex_to_txindex,
txoutdata_vec,
@@ -288,6 +278,14 @@ pub(crate) fn process_blocks(
},
|| -> Result<_> {
if input_count > 1 {
let txinindex_to_txindex = txin_to_txindex_buf
.build(first_txindex, tx_count, txindex_to_input_count);
let (input_values, input_prev_heights, input_outputtypes, input_typeindexes) =
txin_iters.collect_block_inputs(
first_txinindex + 1,
input_count - 1,
height,
);
process_inputs(
input_count - 1,
&txinindex_to_txindex[1..],

View File

@@ -1,8 +1,8 @@
use std::marker::PhantomData;
/// Number of ranges to cache. Small enough for O(1) linear scan,
/// large enough to cover the "hot" source blocks in a typical block.
const CACHE_SIZE: usize = 8;
/// Direct-mapped cache size. Power of 2 for fast masking.
const CACHE_SIZE: usize = 128;
const CACHE_MASK: usize = CACHE_SIZE - 1;
/// Maps ranges of indices to values for efficient reverse lookups.
///
@@ -10,15 +10,13 @@ const CACHE_SIZE: usize = 8;
/// in a sorted Vec and uses binary search to find the value for any index.
/// The value is derived from the position in the Vec.
///
/// Includes an LRU cache of recently accessed ranges to avoid binary search
/// when there's locality in access patterns.
/// Includes a direct-mapped cache for O(1) lookups when there's locality.
#[derive(Debug, Clone)]
pub struct RangeMap<I, V> {
/// Sorted vec of first_index values. Position in vec = value.
first_indexes: Vec<I>,
/// LRU cache: (range_low, range_high, value, age). Lower age = more recent.
cache: [(I, I, V, u8); CACHE_SIZE],
cache_len: u8,
/// Direct-mapped cache: (range_low, range_high, value, occupied). Inline for zero indirection.
cache: [(I, I, V, bool); CACHE_SIZE],
_phantom: PhantomData<V>,
}
@@ -26,14 +24,13 @@ impl<I: Default + Copy, V: Default + Copy> Default for RangeMap<I, V> {
fn default() -> Self {
Self {
first_indexes: Vec::new(),
cache: [(I::default(), I::default(), V::default(), 0); CACHE_SIZE],
cache_len: 0,
cache: [(I::default(), I::default(), V::default(), false); CACHE_SIZE],
_phantom: PhantomData,
}
}
}
impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
impl<I: Ord + Copy + Default + Into<usize>, V: From<usize> + Copy + Default> RangeMap<I, V> {
/// Number of ranges stored.
pub(crate) fn len(&self) -> usize {
self.first_indexes.len()
@@ -42,7 +39,7 @@ impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
/// Truncate to `new_len` ranges and clear the cache.
pub(crate) fn truncate(&mut self, new_len: usize) {
self.first_indexes.truncate(new_len);
self.cache_len = 0;
self.clear_cache();
}
/// Push a new first_index. Value is implicitly the current length.
@@ -66,21 +63,11 @@ impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
return None;
}
let cache_len = self.cache_len as usize;
// Check cache first (linear scan of small array)
for i in 0..cache_len {
let (low, high, value, _) = self.cache[i];
if index >= low && index < high {
// Cache hit - mark as most recently used
if self.cache[i].3 != 0 {
for j in 0..cache_len {
self.cache[j].3 = self.cache[j].3.saturating_add(1);
}
self.cache[i].3 = 0;
}
return Some(value);
}
// Direct-mapped cache lookup: O(1), no aging
let slot = Self::cache_slot(&index);
let entry = &self.cache[slot];
if entry.3 && index >= entry.0 && index < entry.1 {
return Some(entry.2);
}
// Cache miss - binary search
@@ -88,15 +75,12 @@ impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
if pos > 0 {
let value = V::from(pos - 1);
let low = self.first_indexes[pos - 1];
// For last range, use low as high (special marker)
// The check `index < high` will fail, but `index >= low` handles it
let high = self.first_indexes.get(pos).copied().unwrap_or(low);
let is_last = pos == self.first_indexes.len();
// Add to cache (skip if last range - unbounded high is tricky)
// Cache non-last ranges (last range has unbounded high)
if !is_last {
self.add_to_cache(low, high, value);
let high = self.first_indexes[pos];
self.cache[slot] = (low, high, value, true);
}
Some(value)
@@ -106,29 +90,14 @@ impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
}
#[inline]
fn add_to_cache(&mut self, low: I, high: I, value: V) {
let cache_len = self.cache_len as usize;
/// Direct-mapped slot for `index`: masking works because `CACHE_SIZE`
/// is a power of two, so `& CACHE_MASK` is equivalent to a modulo.
fn cache_slot(index: &I) -> usize {
    Into::<usize>::into(*index) & CACHE_MASK
}
// Age all entries
for i in 0..cache_len {
self.cache[i].3 = self.cache[i].3.saturating_add(1);
}
if cache_len < CACHE_SIZE {
// Not full - append
self.cache[cache_len] = (low, high, value, 0);
self.cache_len += 1;
} else {
// Full - evict oldest (highest age)
let mut oldest_idx = 0;
let mut oldest_age = 0u8;
for i in 0..CACHE_SIZE {
if self.cache[i].3 > oldest_age {
oldest_age = self.cache[i].3;
oldest_idx = i;
}
}
self.cache[oldest_idx] = (low, high, value, 0);
/// Invalidate every cache slot by dropping its occupied flag; the stale
/// range/value fields are ignored while the flag is false.
fn clear_cache(&mut self) {
    self.cache.iter_mut().for_each(|slot| slot.3 = false);
}
}

View File

@@ -89,6 +89,8 @@ impl CachedStateRaw {
pub struct CachedUnrealizedState {
    /// Underlying raw accumulated state the outputs are derived from.
    state: CachedStateRaw,
    /// Price the state was last adjusted for; a differing incoming price
    /// triggers `update_for_price_change` before the state is read.
    at_price: CentsCompact,
    /// Cached output to skip redundant u128 divisions when nothing changed.
    cached_output: Option<UnrealizedState>,
}
impl CachedUnrealizedState {
@@ -98,6 +100,7 @@ impl CachedUnrealizedState {
Self {
state,
at_price: price,
cached_output: None,
}
}
@@ -110,11 +113,18 @@ impl CachedUnrealizedState {
let new_price: CentsCompact = new_price.into();
if new_price != self.at_price {
self.update_for_price_change(new_price, map);
self.cached_output = None;
}
self.state.to_output()
if let Some(ref output) = self.cached_output {
return output.clone();
}
let output = self.state.to_output();
self.cached_output = Some(output.clone());
output
}
pub(crate) fn on_receive(&mut self, price: Cents, sats: Sats) {
self.cached_output = None;
let price: CentsCompact = price.into();
let sats_u128 = sats.as_u128();
let price_u128 = price.as_u128();
@@ -139,6 +149,7 @@ impl CachedUnrealizedState {
}
pub(crate) fn on_send(&mut self, price: Cents, sats: Sats) {
self.cached_output = None;
let price: CentsCompact = price.into();
let sats_u128 = sats.as_u128();
let price_u128 = price.as_u128();

View File

@@ -42,10 +42,14 @@ impl Vecs {
let value_reader = indexer.vecs.outputs.value.reader();
let actual_total = target - min;
let mut entries: Vec<Entry> = Vec::with_capacity(actual_total.min(BATCH_SIZE));
// Pre-allocate output buffers for scatter-write pattern
let mut out_txoutindex: Vec<TxOutIndex> = Vec::new();
let mut out_value: Vec<Sats> = Vec::new();
let mut batch_start = min;
while batch_start < target {
let batch_end = (batch_start + BATCH_SIZE).min(target);
let batch_len = batch_end - batch_start;
entries.clear();
let mut j = 0usize;
@@ -55,7 +59,7 @@ impl Vecs {
.outpoint
.for_each_range_at(batch_start, batch_end, |outpoint| {
entries.push(Entry {
txinindex: TxInIndex::from(batch_start + j),
original_idx: j,
txindex: outpoint.txindex(),
vout: outpoint.vout(),
txoutindex: TxOutIndex::COINBASE,
@@ -64,7 +68,7 @@ impl Vecs {
j += 1;
});
// Coinbase entries (txindex MAX) sorted to end
// Sort 1: by txindex (group by transaction for sequential first_txoutindex reads)
entries.sort_unstable_by_key(|e| e.txindex);
for entry in &mut entries {
if entry.txindex.is_coinbase() {
@@ -74,6 +78,7 @@ impl Vecs {
first_txoutindex_reader.get(entry.txindex.to_usize()) + entry.vout;
}
// Sort 2: by txoutindex (sequential value reads)
entries.sort_unstable_by_key(|e| e.txoutindex);
for entry in &mut entries {
if entry.txoutindex.is_coinbase() {
@@ -82,11 +87,22 @@ impl Vecs {
entry.value = value_reader.get(entry.txoutindex.to_usize());
}
entries.sort_unstable_by_key(|e| e.txinindex);
// Scatter-write to output buffers using original_idx (avoids Sort 3)
out_txoutindex.clear();
out_txoutindex.resize(batch_len, TxOutIndex::COINBASE);
out_value.clear();
out_value.resize(batch_len, Sats::MAX);
for entry in &entries {
out_txoutindex[entry.original_idx] = entry.txoutindex;
out_value[entry.original_idx] = entry.value;
}
for i in 0..batch_len {
let txinindex = TxInIndex::from(batch_start + i);
self.txoutindex
.truncate_push(entry.txinindex, entry.txoutindex)?;
self.value.truncate_push(entry.txinindex, entry.value)?;
.truncate_push(txinindex, out_txoutindex[i])?;
self.value.truncate_push(txinindex, out_value[i])?;
}
if batch_end < target {
@@ -106,7 +122,7 @@ impl Vecs {
}
struct Entry {
txinindex: TxInIndex,
original_idx: usize,
txindex: TxIndex,
vout: Vout,
txoutindex: TxOutIndex,

View File

@@ -88,6 +88,39 @@ impl<N: FenwickNode> FenwickTree<N> {
pos // 0-indexed bucket
}
/// Batch kth for sorted targets. Processes all targets at each tree level
/// for better cache locality vs individual kth() calls.
///
/// `sorted_targets` must be sorted ascending. `out` receives the 0-indexed bucket
/// for each target. Both slices must have the same length.
///
/// NOTE(review): the tree must be non-empty — with `self.size == 0` the shift
/// computation below would overflow (only debug-asserted); confirm callers
/// never pass an empty tree.
#[inline]
pub fn batch_kth<V, F>(&self, sorted_targets: &[V], field_fn: &F, out: &mut [usize])
where
    V: Copy + PartialOrd + std::ops::SubAssign,
    F: Fn(&N) -> V,
{
    let k = sorted_targets.len();
    debug_assert_eq!(out.len(), k);
    debug_assert!(self.size > 0);
    // Every search starts at the virtual root (position 0).
    out.fill(0);
    // Copy targets so we can subtract in-place
    let mut remaining: smallvec::SmallVec<[V; 24]> = sorted_targets.into();
    // Highest power of two <= self.size: the top level of the Fenwick descent.
    let mut bit = 1usize << (usize::BITS - 1 - self.size.leading_zeros() as u32);
    while bit > 0 {
        // Advance ALL targets one level before descending — this is the whole
        // point of the batch variant: each tree node is touched once per level
        // instead of once per target.
        for i in 0..k {
            let next = out[i] + bit;
            if next <= self.size {
                let val = field_fn(&self.tree[next]);
                if remaining[i] >= val {
                    // Cumulative sum through `next` still <= target: keep it
                    // and search the right subtree for the remainder.
                    remaining[i] -= val;
                    out[i] = next;
                }
            }
        }
        bit >>= 1;
    }
}
/// Write a raw frequency delta at a bucket. Does NOT maintain the Fenwick invariant.
/// Call [`build_in_place`] after all raw writes.
#[inline]

View File

@@ -150,6 +150,14 @@ impl Computer {
)?))
})?;
let pools_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(pools::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
})?;
let cointime = Box::new(cointime::Vecs::forced_import(
&computed_path,
VERSION,
@@ -160,23 +168,33 @@ impl Computer {
let mining = mining_handle.join().unwrap()?;
let transactions = transactions_handle.join().unwrap()?;
let scripts = scripts_handle.join().unwrap()?;
let pools = Box::new(pools::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?);
let pools = pools_handle.join().unwrap()?;
Ok((blocks, mining, transactions, scripts, pools, cointime))
})
})?;
let distribution = timed("Imported distribution", || -> Result<_> {
Ok(Box::new(distribution::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
// Market and distribution are independent; import in parallel.
// Supply depends on distribution so it runs after.
let (distribution, market) = timed("Imported distribution/market", || {
thread::scope(|s| -> Result<_> {
let market_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(market::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
})?;
let distribution = Box::new(distribution::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?);
let market = market_handle.join().unwrap()?;
Ok((distribution, market))
})
})?;
let supply = timed("Imported supply", || -> Result<_> {
@@ -188,14 +206,6 @@ impl Computer {
)?))
})?;
let market = timed("Imported market", || -> Result<_> {
Ok(Box::new(market::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
})?;
info!("Total import time: {:?}", import_start.elapsed());
let this = Self {

View File

@@ -19,33 +19,50 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.ath.compute(prices, blocks, starting_indexes, exit)?;
// Phase 1: Independent sub-modules in parallel
let (r1, r2) = rayon::join(
|| {
rayon::join(
|| self.ath.compute(prices, blocks, starting_indexes, exit),
|| self.lookback.compute(blocks, prices, starting_indexes, exit),
)
},
|| {
rayon::join(
|| self.range.compute(prices, blocks, starting_indexes, exit),
|| {
self.moving_average
.compute(blocks, prices, starting_indexes, exit)
},
)
},
);
r1.0?;
r1.1?;
r2.0?;
r2.1?;
// Lookback metrics (independent)
self.lookback
.compute(blocks, prices, starting_indexes, exit)?;
// Returns metrics (depends on lookback)
self.returns
.compute(prices, blocks, &self.lookback, starting_indexes, exit)?;
// Range metrics (independent)
self.range.compute(prices, blocks, starting_indexes, exit)?;
// Moving average metrics (independent)
self.moving_average
.compute(blocks, prices, starting_indexes, exit)?;
// DCA metrics (depends on lookback for lump sum comparison)
self.dca.compute(
indexes,
prices,
blocks,
&self.lookback,
starting_indexes,
exit,
)?;
// Phase 2: Depend on lookback
let (r3, r4) = rayon::join(
|| {
self.returns
.compute(prices, blocks, &self.lookback, starting_indexes, exit)
},
|| {
self.dca.compute(
indexes,
prices,
blocks,
&self.lookback,
starting_indexes,
exit,
)
},
);
r3?;
r4?;
// Phase 3: Depends on returns, range, moving_average
self.indicators.compute(
&mining.rewards,
&self.returns,