diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index bff27fe22..29a3be445 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -703,6 +703,25 @@ impl Vecs { Height::ZERO }; + // Import aggregate cohorts' price_to_amount + // We need to drop the borrows first to access utxo_cohorts directly + drop(separate_utxo_vecs); + drop(separate_address_vecs); + let starting_height = if starting_height.is_not_zero() + && self + .utxo_cohorts + .import_aggregate_price_to_amount(starting_height)? + == starting_height + { + starting_height + } else { + Height::ZERO + }; + // Re-collect the separate vecs + let mut separate_utxo_vecs = + self.utxo_cohorts.iter_separate_mut().collect::>(); + let mut separate_address_vecs = + self.address_cohorts.iter_separate_mut().collect::>(); // info!("starting_height = {starting_height}"); diff --git a/crates/brk_computer/src/stateful/utxo_cohort.rs b/crates/brk_computer/src/stateful/utxo_cohort.rs index 010995f3f..eaeb1f52d 100644 --- a/crates/brk_computer/src/stateful/utxo_cohort.rs +++ b/crates/brk_computer/src/stateful/utxo_cohort.rs @@ -8,7 +8,7 @@ use vecdb::{Database, Exit, IterableVec}; use crate::{ Indexes, PriceToAmount, UTXOCohortState, - grouped::PERCENTILES_LEN, + grouped::{PERCENTILES, PERCENTILES_LEN}, indexes, price, stateful::{ common, @@ -53,7 +53,11 @@ impl Vecs { state_starting_height: None, state: if state_level.is_full() { - Some(UTXOCohortState::new(states_path, &full_name, compute_dollars)) + Some(UTXOCohortState::new( + states_path, + &full_name, + compute_dollars, + )) } else { None }, @@ -191,9 +195,10 @@ impl CohortVecs for Vecs { impl Vecs { /// Compute percentile prices for aggregate cohorts that have standalone price_to_amount. /// Returns NaN array if price_to_amount is None or empty. - pub fn compute_percentile_prices_from_standalone(&self, supply: Sats) -> [Dollars; PERCENTILES_LEN] { - use crate::grouped::PERCENTILES; - + pub fn compute_percentile_prices_from_standalone( + &self, + supply: Sats, + ) -> [Dollars; PERCENTILES_LEN] { let mut result = [Dollars::NAN; PERCENTILES_LEN]; let price_to_amount = match self.price_to_amount.as_ref() { @@ -205,14 +210,14 @@ impl Vecs { return result; } - let total = u64::from(supply); - let targets = PERCENTILES.map(|p| total * u64::from(p) / 100); + let total = supply; + let targets = PERCENTILES.map(|p| total * p / 100); - let mut accumulated = 0u64; + let mut accumulated = Sats::ZERO; let mut pct_idx = 0; for (&price, &sats) in price_to_amount.iter() { - accumulated += u64::from(sats); + accumulated += sats; while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] { result[pct_idx] = price; diff --git a/crates/brk_computer/src/stateful/utxo_cohorts.rs b/crates/brk_computer/src/stateful/utxo_cohorts.rs index 1a070ae6c..365e08c39 100644 --- a/crates/brk_computer/src/stateful/utxo_cohorts.rs +++ b/crates/brk_computer/src/stateful/utxo_cohorts.rs @@ -12,6 +12,7 @@ use brk_types::{ Version, }; use derive_deref::{Deref, DerefMut}; +use rayon::prelude::*; use rustc_hash::FxHashMap; use vecdb::{Database, Exit, IterableVec, VecIndex}; @@ -1514,19 +1515,18 @@ impl Vecs { // Handle age transitions for aggregate cohorts' price_to_amount // Check which cohorts the UTXO was in vs is now in, and increment/decrement accordingly - if let Some(price) = block_state.price { + // Only process if there's remaining supply (like CohortState::increment/decrement do) + if let Some(price) = block_state.price + && block_state.supply.value > Sats::ZERO + { aggregate_p2a.iter_mut().for_each(|(filter, p2a)| { let is = filter.contains_time(days_old); let was = filter.contains_time(prev_days_old); if is && !was { - if let Some(p2a) = p2a.as_mut() { - p2a.increment(price, &block_state.supply); - } + p2a.as_mut().unwrap().increment(price, &block_state.supply); } else if was && !is { - if let Some(p2a) = p2a.as_mut() { - p2a.decrement(price, &block_state.supply); - } + p2a.as_mut().unwrap().decrement(price, &block_state.supply); } }); } @@ -1551,7 +1551,10 @@ impl Vecs { .. } = &mut self.0; - let mut time_based_vecs = age_range.iter_mut().chain(epoch.iter_mut()).collect::>(); + let mut time_based_vecs = age_range + .iter_mut() + .chain(epoch.iter_mut()) + .collect::>(); // Collect aggregate cohorts' filter and p2a for iteration let mut aggregate_p2a: Vec<(Filter, Option<&mut crate::PriceToAmount>)> = vec![ @@ -1613,50 +1616,40 @@ impl Vecs { .spendable .iter_typed() .for_each(|(output_type, supply_state)| { - type_ - .get_mut(output_type) - .state - .as_mut() - .unwrap() - .send( - supply_state, - current_price, - prev_price, - blocks_old, - days_old_float, - older_than_hour, - ) + type_.get_mut(output_type).state.as_mut().unwrap().send( + supply_state, + current_price, + prev_price, + blocks_old, + days_old_float, + older_than_hour, + ) }); sent.by_size_group .iter_typed() .for_each(|(group, supply_state)| { - amount_range - .get_mut(group) - .state - .as_mut() - .unwrap() - .send( - supply_state, - current_price, - prev_price, - blocks_old, - days_old_float, - older_than_hour, - ); + amount_range.get_mut(group).state.as_mut().unwrap().send( + supply_state, + current_price, + prev_price, + blocks_old, + days_old_float, + older_than_hour, + ); }); // Update aggregate cohorts' price_to_amount using filter.contains_time() if let Some(prev_price) = prev_price { let supply_state = &sent.spendable_supply; if supply_state.value.is_not_zero() { - aggregate_p2a.iter_mut().for_each(|(filter, p2a)| { - if filter.contains_time(days_old) { - if let Some(p2a) = p2a.as_mut() { - p2a.decrement(prev_price, supply_state); - } - } - }); + aggregate_p2a + .iter_mut() + .filter(|(f, _)| f.contains_time(days_old)) + .map(|(_, p2a)| p2a) + .for_each(|p2a| { + p2a.as_mut().unwrap().decrement(prev_price, supply_state); + }); } } }); @@ -1679,13 +1672,15 @@ impl Vecs { if let Some(price) = price && supply_state.value.is_not_zero() { - self.0.iter_aggregate_mut().for_each(|v| { - if v.filter().contains_time(0) { - if let Some(p2a) = v.price_to_amount.as_mut() { - p2a.increment(price, &supply_state); - } - } - }); + self.0 + .iter_aggregate_mut() + .filter(|v| v.filter().contains_time(0)) + .for_each(|v| { + v.price_to_amount + .as_mut() + .unwrap() + .increment(price, &supply_state); + }); } self.type_.iter_mut().for_each(|vecs| { @@ -1843,39 +1838,60 @@ impl Vecs { }) } + /// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint + pub fn import_aggregate_price_to_amount(&mut self, height: Height) -> Result { + let mut min_height = height; + for v in self.0.iter_aggregate_mut() { + if let Some(p2a) = v.price_to_amount.as_mut() { + min_height = min_height.min(p2a.import_at_or_before(height)?); + } + } + Ok(min_height) + } + /// Compute and push percentiles for aggregate cohorts (all, sth, lth). /// Must be called after receive()/send() when price_to_amount is up to date. pub fn truncate_push_aggregate_percentiles(&mut self, height: Height) -> Result<()> { - // First, compute supplies for each aggregate cohort by summing age_range sub-cohorts - let supplies: Vec<(Filter, Sats)> = self + let age_range_data: Vec<_> = self .0 - .iter_aggregate() - .map(|v| { - let filter = v.filter().clone(); - let supply = self - .0 - .age_range - .iter() - .filter(|sub| filter.includes(sub.filter())) - .map(|sub| sub.state.as_ref().unwrap().supply.value) - .fold(Sats::ZERO, |acc, v| acc + v); - (filter, supply) + .age_range + .iter() + .map(|sub| { + ( + sub.filter().clone(), + sub.state.as_ref().unwrap().supply.value, + ) }) .collect(); - // Then, compute and push percentiles for each aggregate cohort - for (filter, supply) in supplies { + let results: Vec<_> = self + .0 + .par_iter_aggregate() + .map(|v| { + if v.price_to_amount.is_none() { + panic!(); + } + let filter = v.filter().clone(); + let supply = age_range_data + .iter() + .filter(|(sub_filter, _)| filter.includes(sub_filter)) + .map(|(_, value)| *value) + .fold(Sats::ZERO, |acc, v| acc + v); + let percentiles = v.compute_percentile_prices_from_standalone(supply); + (filter, percentiles) + }) + .collect(); + + // Push results sequentially (requires &mut) + for (filter, percentiles) in results { let v = self .0 .iter_aggregate_mut() .find(|v| v.filter() == &filter) .unwrap(); - if v.price_to_amount.is_some() { - let percentiles = v.compute_percentile_prices_from_standalone(supply); - if let Some(pp) = v.inner.price_percentiles.as_mut() { - pp.truncate_push(height, &percentiles)?; - } + if let Some(pp) = v.inner.price_percentiles.as_mut() { + pp.truncate_push(height, &percentiles)?; } } diff --git a/crates/brk_grouper/src/by_amount_range.rs b/crates/brk_grouper/src/by_amount_range.rs index 52d59a2d3..9816879fc 100644 --- a/crates/brk_grouper/src/by_amount_range.rs +++ b/crates/brk_grouper/src/by_amount_range.rs @@ -31,7 +31,7 @@ impl ByAmountRange { F: FnMut(Filter) -> T, { Self { - _0sats: create(Filter::Amount(AmountFilter::LowerThan(Sats::_1))), + _0sats: create(Filter::Amount(AmountFilter::Range(Sats::ZERO..Sats::_1))), _1sat_to_10sats: create(Filter::Amount(AmountFilter::Range(Sats::_1..Sats::_10))), _10sats_to_100sats: create(Filter::Amount(AmountFilter::Range(Sats::_10..Sats::_100))), _100sats_to_1k_sats: create(Filter::Amount(AmountFilter::Range(Sats::_100..Sats::_1K))), @@ -45,7 +45,7 @@ impl ByAmountRange { _100btc_to_1k_btc: create(Filter::Amount(AmountFilter::Range(Sats::_100BTC..Sats::_1K_BTC))), _1k_btc_to_10k_btc: create(Filter::Amount(AmountFilter::Range(Sats::_1K_BTC..Sats::_10K_BTC))), _10k_btc_to_100k_btc: create(Filter::Amount(AmountFilter::Range(Sats::_10K_BTC..Sats::_100K_BTC))), - _100k_btc_or_more: create(Filter::Amount(AmountFilter::GreaterOrEqual(Sats::_100K_BTC))), + _100k_btc_or_more: create(Filter::Amount(AmountFilter::Range(Sats::_100K_BTC..Sats::MAX))), } } diff --git a/crates/brk_grouper/src/by_term.rs b/crates/brk_grouper/src/by_term.rs index ec664ff29..b7bfb7c39 100644 --- a/crates/brk_grouper/src/by_term.rs +++ b/crates/brk_grouper/src/by_term.rs @@ -1,4 +1,5 @@ use brk_traversable::Traversable; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::{Filter, Term}; @@ -26,4 +27,18 @@ impl ByTerm { pub fn iter_mut(&mut self) -> impl Iterator { [&mut self.short, &mut self.long].into_iter() } + + pub fn par_iter(&self) -> impl ParallelIterator + where + T: Send + Sync, + { + [&self.short, &self.long].into_par_iter() + } + + pub fn par_iter_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + [&mut self.short, &mut self.long].into_par_iter() + } } diff --git a/crates/brk_grouper/src/utxo.rs b/crates/brk_grouper/src/utxo.rs index aac60fe1b..4eef7a17d 100644 --- a/crates/brk_grouper/src/utxo.rs +++ b/crates/brk_grouper/src/utxo.rs @@ -102,11 +102,25 @@ impl UTXOGroups { [&self.all].into_iter().chain(self.term.iter()) } + pub fn par_iter_aggregate(&self) -> impl ParallelIterator + where + T: Send + Sync, + { + [&self.all].into_par_iter().chain(self.term.par_iter()) + } + /// Iterator over aggregate cohorts (all, sth, lth) that compute values from sub-cohorts. /// These are cohorts with StateLevel::PriceOnly that derive values from stateful sub-cohorts. pub fn iter_aggregate_mut(&mut self) -> impl Iterator { + [&mut self.all].into_iter().chain(self.term.iter_mut()) + } + + pub fn par_iter_aggregate_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { [&mut self.all] - .into_iter() - .chain(self.term.iter_mut()) + .into_par_iter() + .chain(self.term.par_iter_mut()) } } diff --git a/crates/brk_types/src/dollars.rs b/crates/brk_types/src/dollars.rs index aec17b5f5..04ce0836c 100644 --- a/crates/brk_types/src/dollars.rs +++ b/crates/brk_types/src/dollars.rs @@ -36,6 +36,10 @@ impl Dollars { pub fn is_negative(&self) -> bool { self.0 < 0.0 } + + pub fn is_zero(&self) -> bool { + self.0 == 0.0 + } } impl From for Dollars { diff --git a/crates/brk_types/src/sats.rs b/crates/brk_types/src/sats.rs index b30cdae94..33fea5bea 100644 --- a/crates/brk_types/src/sats.rs +++ b/crates/brk_types/src/sats.rs @@ -132,6 +132,13 @@ impl Mul for Sats { } } +impl Mul for Sats { + type Output = Self; + fn mul(self, rhs: u8) -> Self::Output { + Sats::from(self.0.checked_mul(rhs as u64).unwrap()) + } +} + impl Mul for Sats { type Output = Self; fn mul(self, rhs: u64) -> Self::Output { @@ -146,10 +153,17 @@ impl Mul for Sats { } } +impl Mul for Sats { + type Output = Self; + fn mul(self, rhs: f64) -> Self::Output { + Sats::from((self.0 as f64 * rhs) as u64) + } +} + impl Mul for Sats { type Output = Self; fn mul(self, rhs: StoredF64) -> Self::Output { - Sats::from((self.0 as f64 * f64::from(rhs)) as u64) + self * f64::from(rhs) } }