diff --git a/crates/brk_computer/src/grouped/price_percentiles.rs b/crates/brk_computer/src/grouped/price_percentiles.rs index f71e7d3f4..a70c37db1 100644 --- a/crates/brk_computer/src/grouped/price_percentiles.rs +++ b/crates/brk_computer/src/grouped/price_percentiles.rs @@ -1,11 +1,11 @@ use brk_error::Result; use brk_traversable::{Traversable, TreeNode}; -use brk_types::{Dollars, Height, Version}; +use brk_types::{DateIndex, Dollars, Version}; use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec}; use crate::{Indexes, indexes, stateful::Flushable}; -use super::{ComputedVecsFromHeight, Source, VecBuilderOptions}; +use super::{ComputedVecsFromDateIndex, Source, VecBuilderOptions}; pub const PERCENTILES: [u8; 19] = [ 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, @@ -14,7 +14,7 @@ pub const PERCENTILES_LEN: usize = PERCENTILES.len(); #[derive(Clone)] pub struct PricePercentiles { - pub vecs: [Option>; PERCENTILES_LEN], + pub vecs: [Option>; PERCENTILES_LEN], } const VERSION: Version = Version::ZERO; @@ -29,7 +29,7 @@ impl PricePercentiles { ) -> Result { let vecs = PERCENTILES.map(|p| { compute.then(|| { - ComputedVecsFromHeight::forced_import( + ComputedVecsFromDateIndex::forced_import( db, &format!("{name}_price_pct{p:02}"), Source::Compute, @@ -44,17 +44,19 @@ impl PricePercentiles { Ok(Self { vecs }) } + /// Push percentile prices at date boundary. + /// Only called when dateindex is Some (last height of the day). pub fn truncate_push( &mut self, - height: Height, + dateindex: DateIndex, percentile_prices: &[Dollars; PERCENTILES_LEN], ) -> Result<()> { for (i, vec) in self.vecs.iter_mut().enumerate() { if let Some(v) = vec { - v.height + v.dateindex .as_mut() .unwrap() - .truncate_push(height, percentile_prices[i])?; + .truncate_push(dateindex, percentile_prices[i])?; } } Ok(()) @@ -62,22 +64,20 @@ impl PricePercentiles { pub fn compute_rest( &mut self, - indexes: &indexes::Vecs, starting_indexes: &Indexes, exit: &Exit, ) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { vec.compute_rest( - indexes, starting_indexes, exit, - None::<&EagerVec>>, + None::<&EagerVec>>, )?; } Ok(()) } - pub fn get(&self, percentile: u8) -> Option<&ComputedVecsFromHeight> { + pub fn get(&self, percentile: u8) -> Option<&ComputedVecsFromDateIndex> { PERCENTILES .iter() .position(|&p| p == percentile) @@ -88,8 +88,8 @@ impl PricePercentiles { impl Flushable for PricePercentiles { fn safe_flush(&mut self, exit: &Exit) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { - if let Some(height_vec) = vec.height.as_mut() { - height_vec.safe_flush(exit)?; + if let Some(dateindex_vec) = vec.dateindex.as_mut() { + dateindex_vec.safe_flush(exit)?; } } Ok(()) @@ -99,8 +99,8 @@ impl Flushable for PricePercentiles { impl PricePercentiles { pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { - if let Some(height_vec) = vec.height.as_mut() { - height_vec.safe_write(exit)?; + if let Some(dateindex_vec) = vec.dateindex.as_mut() { + dateindex_vec.safe_write(exit)?; } } Ok(()) @@ -109,8 +109,8 @@ impl PricePercentiles { /// Validate computed versions or reset if mismatched. pub fn validate_computed_version_or_reset(&mut self, version: Version) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { - if let Some(height_vec) = vec.height.as_mut() { - height_vec.validate_computed_version_or_reset(version)?; + if let Some(dateindex_vec) = vec.dateindex.as_mut() { + dateindex_vec.validate_computed_version_or_reset(version)?; } } Ok(()) diff --git a/crates/brk_computer/src/stateful/cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/mod.rs index 65a34dd4b..c95e8d27a 100644 --- a/crates/brk_computer/src/stateful/cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/mod.rs @@ -15,7 +15,7 @@ mod traits; mod utxo; mod utxo_cohorts; -pub use crate::states::{Flushable, HeightFlushable}; +pub use crate::states::Flushable; pub use address::AddressCohortVecs; pub use address_cohorts::AddressCohorts; pub use state::CohortState; diff --git a/crates/brk_computer/src/stateful/cohorts/state.rs b/crates/brk_computer/src/stateful/cohorts/state.rs index 0ba52f9e0..14131af67 100644 --- a/crates/brk_computer/src/stateful/cohorts/state.rs +++ b/crates/brk_computer/src/stateful/cohorts/state.rs @@ -347,4 +347,10 @@ impl CohortState { pub fn max_price(&self) -> Option<&Dollars> { self.price_to_amount.as_ref()?.last_key_value().map(|(k, _)| k) } + + /// Get iterator over price_to_amount for merged percentile computation. + /// Returns None if price data is not tracked for this cohort. + pub fn price_to_amount_iter(&self) -> Option> { + self.price_to_amount.as_ref().map(|p| p.iter()) + } } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo.rs b/crates/brk_computer/src/stateful/cohorts/utxo.rs index 8eae22512..f38eb4201 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo.rs @@ -5,12 +5,11 @@ use std::path::Path; use brk_error::Result; use brk_grouper::{CohortContext, Filter, Filtered, StateLevel}; use brk_traversable::Traversable; -use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, Version}; +use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; use vecdb::{Database, Exit, IterableVec}; use crate::{ - Indexes, PriceToAmount, - grouped::{PERCENTILES, PERCENTILES_LEN}, + Indexes, indexes, price, stateful::{CohortVecs, DynCohortVecs, cohorts::UTXOCohortState}, }; @@ -23,14 +22,10 @@ pub struct UTXOCohortVecs { /// Starting height when state was imported state_starting_height: Option, - /// Runtime state for block-by-block processing + /// Runtime state for block-by-block processing (separate cohorts only) #[traversable(skip)] pub state: Option, - /// For aggregate cohorts that only need price_to_amount for percentiles - #[traversable(skip)] - pub price_to_amount: Option, - /// Metric vectors #[traversable(flatten)] pub metrics: CohortMetrics, @@ -72,12 +67,6 @@ impl UTXOCohortVecs { None }, - price_to_amount: if state_level.is_price_only() && compute_dollars { - Some(PriceToAmount::create(states_path, &full_name)) - } else { - None - }, - metrics: CohortMetrics::forced_import(&cfg)?, }) } @@ -99,45 +88,6 @@ impl UTXOCohortVecs { state.reset(); } } - - /// Compute percentile prices from standalone price_to_amount. - /// Returns NaN array if price_to_amount is None or empty. - pub fn compute_percentile_prices_from_standalone( - &self, - supply: Sats, - ) -> [Dollars; PERCENTILES_LEN] { - let mut result = [Dollars::NAN; PERCENTILES_LEN]; - - let price_to_amount = match self.price_to_amount.as_ref() { - Some(p) => p, - None => return result, - }; - - if price_to_amount.is_empty() || supply == Sats::ZERO { - return result; - } - - let total = supply; - let targets = PERCENTILES.map(|p| total * p as u64 / 100); - - let mut accumulated = Sats::ZERO; - let mut pct_idx = 0; - - for (&price, &sats) in price_to_amount.iter() { - accumulated += sats; - - while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] { - result[pct_idx] = price; - pct_idx += 1; - } - - if pct_idx >= PERCENTILES_LEN { - break; - } - } - - result - } } impl Filtered for UTXOCohortVecs { diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs index 62f828160..d59779939 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs @@ -18,9 +18,14 @@ use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; use vecdb::{Database, Exit, IterableVec}; -use crate::{Indexes, indexes, price, stateful::DynCohortVecs}; +use crate::{ + Indexes, + grouped::{PERCENTILES, PERCENTILES_LEN}, + indexes, price, + stateful::DynCohortVecs, +}; -use super::{CohortVecs, HeightFlushable, UTXOCohortVecs}; +use super::{CohortVecs, UTXOCohortVecs}; const VERSION: Version = Version::new(0); @@ -350,34 +355,14 @@ impl UTXOCohorts { self.par_iter_separate_mut() .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?; - // Flush aggregate cohorts' price_to_amount state AND metrics (including price_percentiles) + // Flush aggregate cohorts' metrics (including price_percentiles) + // Note: aggregate cohorts no longer maintain price_to_amount state for v in self.0.iter_aggregate_mut() { - v.price_to_amount.flush_at_height(height, exit)?; v.metrics.safe_flush(exit)?; } Ok(()) } - /// Reset aggregate cohorts' price_to_amount for fresh start. - pub fn reset_aggregate_price_to_amount(&mut self) -> Result<()> { - self.0 - .iter_aggregate_mut() - .try_for_each(|v| v.price_to_amount.reset()) - } - - /// Import aggregate cohorts' price_to_amount when resuming from checkpoint. - pub fn import_aggregate_price_to_amount(&mut self, height: Height) -> Result { - let Some(mut prev_height) = height.decremented() else { - return Ok(Height::ZERO); - }; - - for v in self.0.iter_aggregate_mut() { - prev_height = prev_height.min(v.price_to_amount.import_at_or_before(prev_height)?); - } - - Ok(prev_height.incremented()) - } - /// Get minimum height from all separate cohorts' height-indexed vectors. pub fn min_separate_height_vecs_len(&self) -> Height { self.iter_separate() @@ -412,57 +397,115 @@ impl UTXOCohorts { } /// Compute and push percentiles for aggregate cohorts (all, sth, lth). - /// Must be called after receive()/send() when price_to_amount is up to date. - pub fn truncate_push_aggregate_percentiles(&mut self, height: Height) -> Result<()> { - // Collect supply values from age_range cohorts + /// Computes on-demand by merging age_range cohorts' price_to_amount data. + /// This avoids maintaining redundant aggregate price_to_amount maps. + pub fn truncate_push_aggregate_percentiles(&mut self, dateindex: DateIndex) -> Result<()> { + use std::cmp::Reverse; + use std::collections::BinaryHeap; + + // Collect (filter, supply, price_to_amount as Vec) from age_range cohorts let age_range_data: Vec<_> = self .0 .age_range .iter() - .map(|sub| { - ( - sub.filter().clone(), - sub.state - .as_ref() - .map(|s| s.supply.value) - .unwrap_or(Sats::ZERO), - ) + .filter_map(|sub| { + let state = sub.state.as_ref()?; + let entries: Vec<(Dollars, Sats)> = state + .price_to_amount_iter()? + .map(|(&p, &a)| (p, a)) + .collect(); + Some((sub.filter().clone(), state.supply.value, entries)) }) .collect(); - // Compute percentiles for each aggregate cohort in parallel - let results: Vec<_> = self - .0 - .par_iter_aggregate() - .filter_map(|v| { - v.price_to_amount.as_ref()?; - let filter = v.filter().clone(); - let supply = age_range_data - .iter() - .filter(|(sub_filter, _)| filter.includes(sub_filter)) - .map(|(_, value)| *value) - .fold(Sats::ZERO, |acc, v| acc + v); - let percentiles = v.compute_percentile_prices_from_standalone(supply); - Some((filter, percentiles)) - }) - .collect(); + // Compute percentiles for each aggregate filter + for aggregate in self.0.iter_aggregate_mut() { + let filter = aggregate.filter().clone(); - // Push results sequentially (requires &mut) - for (filter, percentiles) in results { - let v = self - .0 - .iter_aggregate_mut() - .find(|v| v.filter() == &filter) - .unwrap(); - - if let Some(pp) = v + // Get price_percentiles storage, skip if not configured + let Some(pp) = aggregate .metrics .price_paid .as_mut() .and_then(|p| p.price_percentiles.as_mut()) - { - pp.truncate_push(height, &percentiles)?; + else { + continue; + }; + + // Collect relevant cohort data for this aggregate + let relevant: Vec<_> = age_range_data + .iter() + .filter(|(sub_filter, _, _)| filter.includes(sub_filter)) + .collect(); + + // Calculate total supply + let total_supply: u64 = relevant.iter().map(|(_, s, _)| u64::from(*s)).sum(); + + if total_supply == 0 { + pp.truncate_push(dateindex, &[Dollars::NAN; PERCENTILES_LEN])?; + continue; } + + // K-way merge using min-heap: O(n log k) where k = number of cohorts + // Each heap entry: (price, amount, cohort_idx, entry_idx) + let mut heap: BinaryHeap> = BinaryHeap::new(); + + // Initialize heap with first entry from each cohort + for (cohort_idx, (_, _, entries)) in relevant.iter().enumerate() { + if !entries.is_empty() { + heap.push(Reverse((entries[0].0, cohort_idx, 0))); + } + } + + let targets = PERCENTILES.map(|p| total_supply * u64::from(p) / 100); + let mut result = [Dollars::NAN; PERCENTILES_LEN]; + let mut accumulated = 0u64; + let mut pct_idx = 0; + let mut current_price: Option = None; + let mut amount_at_price = 0u64; + + while let Some(Reverse((price, cohort_idx, entry_idx))) = heap.pop() { + let (_, _, entries) = relevant[cohort_idx]; + let (_, amount) = entries[entry_idx]; + + // If price changed, finalize previous price + if let Some(current_price) = current_price + && current_price != price + { + accumulated += amount_at_price; + + while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] { + result[pct_idx] = current_price; + pct_idx += 1; + } + + if pct_idx >= PERCENTILES_LEN { + break; + } + + amount_at_price = 0; + } + + current_price = Some(price); + amount_at_price += u64::from(amount); + + // Push next entry from this cohort + let next_idx = entry_idx + 1; + if next_idx < entries.len() { + heap.push(Reverse((entries[next_idx].0, cohort_idx, next_idx))); + } + } + + // Finalize last price + if let Some(price) = current_price { + accumulated += amount_at_price; + while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] { + result[pct_idx] = price; + pct_idx += 1; + } + } + + pp.truncate_push(dateindex, &result)?; } Ok(()) diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs index 89854c3ae..7df52517f 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs @@ -28,22 +28,6 @@ impl UTXOCohorts { v.state.as_mut().unwrap().receive(&supply_state, price); }); - // Update aggregate cohorts' price_to_amount - // New UTXOs have days_old = 0, so check if filter includes day 0 - if let Some(price) = price - && supply_state.value.is_not_zero() - { - self.0 - .iter_aggregate_mut() - .filter(|v| v.filter().contains_time(0)) - .for_each(|v| { - v.price_to_amount - .as_mut() - .unwrap() - .increment(price, &supply_state); - }); - } - // Update output type cohorts self.type_.iter_mut().for_each(|vecs| { let output_type = match vecs.filter() { diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs index f8309060a..0a6843224 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs @@ -1,11 +1,11 @@ //! Processing spent inputs (UTXOs being spent). -use brk_grouper::{Filter, Filtered, TimeFilter, UTXOGroups}; +use brk_grouper::{Filter, Filtered, TimeFilter}; use brk_types::{CheckedSub, HalvingEpoch, Height}; use rustc_hash::FxHashMap; use vecdb::VecIndex; -use crate::{states::{BlockState, Transacted}, utils::OptionExt, PriceToAmount}; +use crate::{states::{BlockState, Transacted}, utils::OptionExt}; use super::UTXOCohorts; @@ -23,35 +23,14 @@ impl UTXOCohorts { return; } - let UTXOGroups { - all, - term, - age_range, - epoch, - type_, - amount_range, - .. - } = &mut self.0; - // Time-based cohorts: age_range + epoch - let mut time_cohorts: Vec<_> = age_range + let mut time_cohorts: Vec<_> = self + .0 + .age_range .iter_mut() - .chain(epoch.iter_mut()) + .chain(self.0.epoch.iter_mut()) .collect(); - // Aggregate cohorts' price_to_amount - let mut aggregate_p2a: Vec<(Filter, Option<&mut PriceToAmount>)> = vec![ - (all.filter().clone(), all.price_to_amount.as_mut()), - ( - term.short.filter().clone(), - term.short.price_to_amount.as_mut(), - ), - ( - term.long.filter().clone(), - term.long.price_to_amount.as_mut(), - ), - ]; - let last_block = chain_state.last().unwrap(); let last_timestamp = last_block.timestamp; let current_price = last_block.price; @@ -98,7 +77,7 @@ impl UTXOCohorts { .spendable .iter_typed() .for_each(|(output_type, supply_state)| { - type_.get_mut(output_type).state.um().send( + self.0.type_.get_mut(output_type).state.um().send( supply_state, current_price, prev_price, @@ -112,7 +91,7 @@ impl UTXOCohorts { sent.by_size_group .iter_typed() .for_each(|(group, supply_state)| { - amount_range.get_mut(group).state.um().send( + self.0.amount_range.get_mut(group).state.um().send( supply_state, current_price, prev_price, @@ -121,19 +100,6 @@ impl UTXOCohorts { older_than_hour, ); }); - - // Update aggregate cohorts' price_to_amount - if let Some(prev_price) = prev_price { - let supply_state = &sent.spendable_supply; - if supply_state.value.is_not_zero() { - aggregate_p2a - .iter_mut() - .filter(|(f, _)| f.contains_time(days_old)) - .for_each(|(_, p2a)| { - p2a.um().decrement(prev_price, supply_state); - }); - } - } } } } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs index 38e1dc50b..1a3eb0411 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs @@ -3,10 +3,10 @@ //! When a new block arrives, UTXOs age. Some cross day boundaries //! and need to move between age-based cohorts. -use brk_grouper::{Filter, Filtered, UTXOGroups}; -use brk_types::{ONE_DAY_IN_SEC, Sats, Timestamp}; +use brk_grouper::{Filter, Filtered}; +use brk_types::{ONE_DAY_IN_SEC, Timestamp}; -use crate::{PriceToAmount, states::BlockState, utils::OptionExt}; +use crate::states::BlockState; use super::UTXOCohorts; @@ -27,33 +27,14 @@ impl UTXOCohorts { let elapsed = (*timestamp).saturating_sub(*prev_timestamp); let threshold = ONE_DAY_IN_SEC.saturating_sub(elapsed); - // Extract mutable references to avoid borrow checker issues - let UTXOGroups { - all, - term, - age_range, - .. - } = &mut self.0; - // Collect age_range cohorts with their filters and states - let mut age_cohorts: Vec<(Filter, &mut Option<_>)> = age_range + let mut age_cohorts: Vec<(Filter, &mut Option<_>)> = self + .0 + .age_range .iter_mut() .map(|v| (v.filter().clone(), &mut v.state)) .collect(); - // Collect aggregate cohorts' price_to_amount for age transitions - let mut aggregate_p2a: Vec<(Filter, Option<&mut PriceToAmount>)> = vec![ - (all.filter().clone(), all.price_to_amount.as_mut()), - ( - term.short.filter().clone(), - term.short.price_to_amount.as_mut(), - ), - ( - term.long.filter().clone(), - term.long.price_to_amount.as_mut(), - ), - ]; - // Process blocks that might cross a day boundary chain_state .iter() @@ -86,22 +67,6 @@ impl UTXOCohorts { .decrement(&block_state.supply, block_state.price); } }); - - // Update aggregate cohorts' price_to_amount - if let Some(price) = block_state.price - && block_state.supply.value > Sats::ZERO - { - aggregate_p2a.iter_mut().for_each(|(filter, p2a)| { - let is_now = filter.contains_time(curr_days); - let was_before = filter.contains_time(prev_days); - - if is_now && !was_before { - p2a.um().increment(price, &block_state.supply); - } else if was_before && !is_now { - p2a.um().decrement(price, &block_state.supply); - } - }); - } }); } } diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index f8eca7397..e71d0e5c1 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -457,8 +457,10 @@ pub fn process_blocks( )?; // Compute and push percentiles for aggregate cohorts (all, sth, lth) - vecs.utxo_cohorts - .truncate_push_aggregate_percentiles(height)?; + if let Some(dateindex) = dateindex_opt { + vecs.utxo_cohorts + .truncate_push_aggregate_percentiles(dateindex)?; + } // Periodic checkpoint flush if height != last_height diff --git a/crates/brk_computer/src/stateful/compute/recover.rs b/crates/brk_computer/src/stateful/compute/recover.rs index 2ba950725..a1f62b17a 100644 --- a/crates/brk_computer/src/stateful/compute/recover.rs +++ b/crates/brk_computer/src/stateful/compute/recover.rs @@ -9,9 +9,9 @@ use brk_error::Result; use brk_types::Height; use vecdb::Stamp; +use super::super::AddressesDataVecs; use super::super::address::AnyAddressIndexesVecs; use super::super::cohorts::{AddressCohorts, UTXOCohorts}; -use super::super::AddressesDataVecs; /// Result of state recovery. pub struct RecoveredState { @@ -68,14 +68,6 @@ pub fn recover_state( }); } - // Import aggregate price_to_amount - must match height - let imported = import_aggregate_price_to_amount(height, utxo_cohorts)?; - if imported != height { - return Ok(RecoveredState { - starting_height: Height::ZERO, - }); - } - Ok(RecoveredState { starting_height: height, }) @@ -102,9 +94,6 @@ pub fn reset_state( utxo_cohorts.reset_separate_price_to_amount()?; address_cohorts.reset_separate_price_to_amount()?; - // Reset aggregate cohorts' price_to_amount - utxo_cohorts.reset_aggregate_price_to_amount()?; - Ok(RecoveredState { starting_height: Height::ZERO, }) @@ -176,22 +165,3 @@ fn rollback_states( Height::ZERO } } - -/// Import aggregate price_to_amount for UTXO cohorts. -fn import_aggregate_price_to_amount( - starting_height: Height, - utxo_cohorts: &mut UTXOCohorts, -) -> Result { - if starting_height.is_zero() { - return Ok(Height::ZERO); - } - - let imported = utxo_cohorts.import_aggregate_price_to_amount(starting_height)?; - - Ok(if imported == starting_height { - starting_height - } else { - Height::ZERO - }) -} - diff --git a/crates/brk_computer/src/stateful/metrics/mod.rs b/crates/brk_computer/src/stateful/metrics/mod.rs index 6a8e0c916..5afe0e331 100644 --- a/crates/brk_computer/src/stateful/metrics/mod.rs +++ b/crates/brk_computer/src/stateful/metrics/mod.rs @@ -142,6 +142,7 @@ impl CohortMetrics { } /// Compute and push unrealized states. + /// Percentiles are only computed at date boundaries (when dateindex is Some). pub fn compute_then_truncate_push_unrealized_states( &mut self, height: Height, @@ -167,7 +168,10 @@ impl CohortMetrics { date_unrealized_state.as_ref(), )?; - price_paid.truncate_push_percentiles(height, state)?; + // Only compute expensive percentiles at date boundaries (~144x reduction) + if let Some(dateindex) = dateindex { + price_paid.truncate_push_percentiles(dateindex, state)?; + } } Ok(()) diff --git a/crates/brk_computer/src/stateful/metrics/price_paid.rs b/crates/brk_computer/src/stateful/metrics/price_paid.rs index 90d7045f9..449e9b443 100644 --- a/crates/brk_computer/src/stateful/metrics/price_paid.rs +++ b/crates/brk_computer/src/stateful/metrics/price_paid.rs @@ -4,7 +4,7 @@ use brk_error::Result; use brk_traversable::Traversable; -use brk_types::{Dollars, Height, Version}; +use brk_types::{DateIndex, Dollars, Height, Version}; use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec}; use crate::{ @@ -98,11 +98,16 @@ impl PricePaidMetrics { Ok(()) } - /// Push price percentiles from state. - pub fn truncate_push_percentiles(&mut self, height: Height, state: &CohortState) -> Result<()> { + /// Push price percentiles from state at date boundary. + /// Only called when at the last height of a day. + pub fn truncate_push_percentiles( + &mut self, + dateindex: DateIndex, + state: &CohortState, + ) -> Result<()> { if let Some(price_percentiles) = self.price_percentiles.as_mut() { let percentile_prices = state.compute_percentile_prices(); - price_percentiles.truncate_push(height, &percentile_prices)?; + price_percentiles.truncate_push(dateindex, &percentile_prices)?; } Ok(()) } diff --git a/crates/brk_computer/src/states/flushable.rs b/crates/brk_computer/src/states/flushable.rs index e97180dbe..9bb339bec 100644 --- a/crates/brk_computer/src/states/flushable.rs +++ b/crates/brk_computer/src/states/flushable.rs @@ -5,7 +5,6 @@ //! are forgotten during flush operations. use brk_error::Result; -use brk_types::Height; use vecdb::Exit; /// Trait for components that can be flushed to disk. @@ -16,24 +15,6 @@ pub trait Flushable { fn safe_flush(&mut self, exit: &Exit) -> Result<()>; } -/// Trait for stateful components that track data indexed by height. -/// -/// This ensures consistent patterns for: -/// - Flushing state at checkpoints -/// - Importing state when resuming from a checkpoint -/// - Resetting state when starting from scratch -pub trait HeightFlushable { - /// Flush state to disk at the given height checkpoint. - fn flush_at_height(&mut self, height: Height, exit: &Exit) -> Result<()>; - - /// Import state from the most recent checkpoint at or before the given height. - /// Returns the actual height that was imported. - fn import_at_or_before(&mut self, height: Height) -> Result; - - /// Reset state for starting from scratch. - fn reset(&mut self) -> Result<()>; -} - /// Blanket implementation for Option where T: Flushable impl Flushable for Option { fn safe_flush(&mut self, exit: &Exit) -> Result<()> { @@ -43,28 +24,3 @@ impl Flushable for Option { Ok(()) } } - -/// Blanket implementation for Option where T: HeightFlushable -impl HeightFlushable for Option { - fn flush_at_height(&mut self, height: Height, exit: &Exit) -> Result<()> { - if let Some(inner) = self.as_mut() { - inner.flush_at_height(height, exit)?; - } - Ok(()) - } - - fn import_at_or_before(&mut self, height: Height) -> Result { - if let Some(inner) = self.as_mut() { - inner.import_at_or_before(height) - } else { - Ok(height) - } - } - - fn reset(&mut self) -> Result<()> { - if let Some(inner) = self.as_mut() { - inner.reset()?; - } - Ok(()) - } -} diff --git a/crates/brk_computer/src/states/price_to_amount.rs b/crates/brk_computer/src/states/price_to_amount.rs index 343fb9e89..40480e7be 100644 --- a/crates/brk_computer/src/states/price_to_amount.rs +++ b/crates/brk_computer/src/states/price_to_amount.rs @@ -9,12 +9,10 @@ use brk_types::{Dollars, Height, Sats}; use derive_deref::{Deref, DerefMut}; use pco::standalone::{simple_decompress, simpler_compress}; use serde::{Deserialize, Serialize}; -use vecdb::{Bytes, Exit}; +use vecdb::Bytes; use crate::{states::SupplyState, utils::OptionExt}; -use super::HeightFlushable; - #[derive(Clone, Debug)] pub struct PriceToAmount { pathbuf: PathBuf, @@ -34,10 +32,9 @@ impl PriceToAmount { pub fn import_at_or_before(&mut self, height: Height) -> Result { let files = self.read_dir(None)?; - let (&height, path) = files - .range(..=height) - .next_back() - .ok_or(Error::NotFound("No price state found at or before height".into()))?; + let (&height, path) = files.range(..=height).next_back().ok_or(Error::NotFound( + "No price state found at or before height".into(), + ))?; self.state = Some(State::deserialize(&fs::read(path)?)?); Ok(height) } @@ -114,10 +111,7 @@ impl PriceToAmount { fs::remove_file(path)?; } - fs::write( - self.path_state(height), - self.state.u().serialize()?, - )?; + fs::write(self.path_state(height), self.state.u().serialize()?)?; Ok(()) } @@ -130,22 +124,6 @@ impl PriceToAmount { } } -impl HeightFlushable for PriceToAmount { - fn flush_at_height(&mut self, height: Height, _exit: &Exit) -> Result<()> { - self.flush(height) - } - - fn import_at_or_before(&mut self, height: Height) -> Result { - PriceToAmount::import_at_or_before(self, height) - } - - fn reset(&mut self) -> Result<()> { - self.clean()?; - self.init(); - Ok(()) - } -} - #[derive(Clone, Default, Debug, Deref, DerefMut, Serialize, Deserialize)] struct State(BTreeMap); diff --git a/crates/brk_types/src/height.rs b/crates/brk_types/src/height.rs index 0352957e3..4304b9055 100644 --- a/crates/brk_types/src/height.rs +++ b/crates/brk_types/src/height.rs @@ -30,6 +30,7 @@ use super::StoredU64; JsonSchema, Hash, )] +#[allow(clippy::duplicated_attributes)] #[schemars(example = 0, example = 210_000, example = 420_000, example = 840_000)] pub struct Height(u32); diff --git a/crates/brk_types/src/limit.rs b/crates/brk_types/src/limit.rs index 9507cd32c..5b01c17a5 100644 --- a/crates/brk_types/src/limit.rs +++ b/crates/brk_types/src/limit.rs @@ -5,6 +5,7 @@ use serde::Deserialize; /// Maximum number of results to return. Defaults to 100 if not specified. #[derive(Debug, Deref, Deserialize, JsonSchema)] #[serde(transparent)] +#[allow(clippy::duplicated_attributes)] #[schemars(default, example = 1, example = 10, example = 100)] pub struct Limit(usize);