mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-06-10 06:53:33 -07:00
computer: stateful snapshot
This commit is contained in:
@@ -1,11 +1,11 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::{Traversable, TreeNode};
|
||||
use brk_types::{Dollars, Height, Version};
|
||||
use brk_types::{DateIndex, Dollars, Version};
|
||||
use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec};
|
||||
|
||||
use crate::{Indexes, indexes, stateful::Flushable};
|
||||
|
||||
use super::{ComputedVecsFromHeight, Source, VecBuilderOptions};
|
||||
use super::{ComputedVecsFromDateIndex, Source, VecBuilderOptions};
|
||||
|
||||
pub const PERCENTILES: [u8; 19] = [
|
||||
5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95,
|
||||
@@ -14,7 +14,7 @@ pub const PERCENTILES_LEN: usize = PERCENTILES.len();
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PricePercentiles {
|
||||
pub vecs: [Option<ComputedVecsFromHeight<Dollars>>; PERCENTILES_LEN],
|
||||
pub vecs: [Option<ComputedVecsFromDateIndex<Dollars>>; PERCENTILES_LEN],
|
||||
}
|
||||
|
||||
const VERSION: Version = Version::ZERO;
|
||||
@@ -29,7 +29,7 @@ impl PricePercentiles {
|
||||
) -> Result<Self> {
|
||||
let vecs = PERCENTILES.map(|p| {
|
||||
compute.then(|| {
|
||||
ComputedVecsFromHeight::forced_import(
|
||||
ComputedVecsFromDateIndex::forced_import(
|
||||
db,
|
||||
&format!("{name}_price_pct{p:02}"),
|
||||
Source::Compute,
|
||||
@@ -44,17 +44,19 @@ impl PricePercentiles {
|
||||
Ok(Self { vecs })
|
||||
}
|
||||
|
||||
/// Push percentile prices at date boundary.
|
||||
/// Only called when dateindex is Some (last height of the day).
|
||||
pub fn truncate_push(
|
||||
&mut self,
|
||||
height: Height,
|
||||
dateindex: DateIndex,
|
||||
percentile_prices: &[Dollars; PERCENTILES_LEN],
|
||||
) -> Result<()> {
|
||||
for (i, vec) in self.vecs.iter_mut().enumerate() {
|
||||
if let Some(v) = vec {
|
||||
v.height
|
||||
v.dateindex
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.truncate_push(height, percentile_prices[i])?;
|
||||
.truncate_push(dateindex, percentile_prices[i])?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -62,22 +64,20 @@ impl PricePercentiles {
|
||||
|
||||
pub fn compute_rest(
|
||||
&mut self,
|
||||
indexes: &indexes::Vecs,
|
||||
starting_indexes: &Indexes,
|
||||
exit: &Exit,
|
||||
) -> Result<()> {
|
||||
for vec in self.vecs.iter_mut().flatten() {
|
||||
vec.compute_rest(
|
||||
indexes,
|
||||
starting_indexes,
|
||||
exit,
|
||||
None::<&EagerVec<PcoVec<Height, Dollars>>>,
|
||||
None::<&EagerVec<PcoVec<DateIndex, Dollars>>>,
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get(&self, percentile: u8) -> Option<&ComputedVecsFromHeight<Dollars>> {
|
||||
pub fn get(&self, percentile: u8) -> Option<&ComputedVecsFromDateIndex<Dollars>> {
|
||||
PERCENTILES
|
||||
.iter()
|
||||
.position(|&p| p == percentile)
|
||||
@@ -88,8 +88,8 @@ impl PricePercentiles {
|
||||
impl Flushable for PricePercentiles {
|
||||
fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
|
||||
for vec in self.vecs.iter_mut().flatten() {
|
||||
if let Some(height_vec) = vec.height.as_mut() {
|
||||
height_vec.safe_flush(exit)?;
|
||||
if let Some(dateindex_vec) = vec.dateindex.as_mut() {
|
||||
dateindex_vec.safe_flush(exit)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -99,8 +99,8 @@ impl Flushable for PricePercentiles {
|
||||
impl PricePercentiles {
|
||||
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
|
||||
for vec in self.vecs.iter_mut().flatten() {
|
||||
if let Some(height_vec) = vec.height.as_mut() {
|
||||
height_vec.safe_write(exit)?;
|
||||
if let Some(dateindex_vec) = vec.dateindex.as_mut() {
|
||||
dateindex_vec.safe_write(exit)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -109,8 +109,8 @@ impl PricePercentiles {
|
||||
/// Validate computed versions or reset if mismatched.
|
||||
pub fn validate_computed_version_or_reset(&mut self, version: Version) -> Result<()> {
|
||||
for vec in self.vecs.iter_mut().flatten() {
|
||||
if let Some(height_vec) = vec.height.as_mut() {
|
||||
height_vec.validate_computed_version_or_reset(version)?;
|
||||
if let Some(dateindex_vec) = vec.dateindex.as_mut() {
|
||||
dateindex_vec.validate_computed_version_or_reset(version)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -15,7 +15,7 @@ mod traits;
|
||||
mod utxo;
|
||||
mod utxo_cohorts;
|
||||
|
||||
pub use crate::states::{Flushable, HeightFlushable};
|
||||
pub use crate::states::Flushable;
|
||||
pub use address::AddressCohortVecs;
|
||||
pub use address_cohorts::AddressCohorts;
|
||||
pub use state::CohortState;
|
||||
|
||||
@@ -347,4 +347,10 @@ impl CohortState {
|
||||
pub fn max_price(&self) -> Option<&Dollars> {
|
||||
self.price_to_amount.as_ref()?.last_key_value().map(|(k, _)| k)
|
||||
}
|
||||
|
||||
/// Get iterator over price_to_amount for merged percentile computation.
|
||||
/// Returns None if price data is not tracked for this cohort.
|
||||
pub fn price_to_amount_iter(&self) -> Option<impl Iterator<Item = (&Dollars, &Sats)>> {
|
||||
self.price_to_amount.as_ref().map(|p| p.iter())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,11 @@ use std::path::Path;
|
||||
use brk_error::Result;
|
||||
use brk_grouper::{CohortContext, Filter, Filtered, StateLevel};
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, Version};
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
|
||||
use vecdb::{Database, Exit, IterableVec};
|
||||
|
||||
use crate::{
|
||||
Indexes, PriceToAmount,
|
||||
grouped::{PERCENTILES, PERCENTILES_LEN},
|
||||
Indexes,
|
||||
indexes, price,
|
||||
stateful::{CohortVecs, DynCohortVecs, cohorts::UTXOCohortState},
|
||||
};
|
||||
@@ -23,14 +22,10 @@ pub struct UTXOCohortVecs {
|
||||
/// Starting height when state was imported
|
||||
state_starting_height: Option<Height>,
|
||||
|
||||
/// Runtime state for block-by-block processing
|
||||
/// Runtime state for block-by-block processing (separate cohorts only)
|
||||
#[traversable(skip)]
|
||||
pub state: Option<UTXOCohortState>,
|
||||
|
||||
/// For aggregate cohorts that only need price_to_amount for percentiles
|
||||
#[traversable(skip)]
|
||||
pub price_to_amount: Option<PriceToAmount>,
|
||||
|
||||
/// Metric vectors
|
||||
#[traversable(flatten)]
|
||||
pub metrics: CohortMetrics,
|
||||
@@ -72,12 +67,6 @@ impl UTXOCohortVecs {
|
||||
None
|
||||
},
|
||||
|
||||
price_to_amount: if state_level.is_price_only() && compute_dollars {
|
||||
Some(PriceToAmount::create(states_path, &full_name))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
|
||||
metrics: CohortMetrics::forced_import(&cfg)?,
|
||||
})
|
||||
}
|
||||
@@ -99,45 +88,6 @@ impl UTXOCohortVecs {
|
||||
state.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute percentile prices from standalone price_to_amount.
|
||||
/// Returns NaN array if price_to_amount is None or empty.
|
||||
pub fn compute_percentile_prices_from_standalone(
|
||||
&self,
|
||||
supply: Sats,
|
||||
) -> [Dollars; PERCENTILES_LEN] {
|
||||
let mut result = [Dollars::NAN; PERCENTILES_LEN];
|
||||
|
||||
let price_to_amount = match self.price_to_amount.as_ref() {
|
||||
Some(p) => p,
|
||||
None => return result,
|
||||
};
|
||||
|
||||
if price_to_amount.is_empty() || supply == Sats::ZERO {
|
||||
return result;
|
||||
}
|
||||
|
||||
let total = supply;
|
||||
let targets = PERCENTILES.map(|p| total * p as u64 / 100);
|
||||
|
||||
let mut accumulated = Sats::ZERO;
|
||||
let mut pct_idx = 0;
|
||||
|
||||
for (&price, &sats) in price_to_amount.iter() {
|
||||
accumulated += sats;
|
||||
|
||||
while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] {
|
||||
result[pct_idx] = price;
|
||||
pct_idx += 1;
|
||||
}
|
||||
|
||||
if pct_idx >= PERCENTILES_LEN {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl Filtered for UTXOCohortVecs {
|
||||
|
||||
@@ -18,9 +18,14 @@ use derive_deref::{Deref, DerefMut};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{Database, Exit, IterableVec};
|
||||
|
||||
use crate::{Indexes, indexes, price, stateful::DynCohortVecs};
|
||||
use crate::{
|
||||
Indexes,
|
||||
grouped::{PERCENTILES, PERCENTILES_LEN},
|
||||
indexes, price,
|
||||
stateful::DynCohortVecs,
|
||||
};
|
||||
|
||||
use super::{CohortVecs, HeightFlushable, UTXOCohortVecs};
|
||||
use super::{CohortVecs, UTXOCohortVecs};
|
||||
|
||||
const VERSION: Version = Version::new(0);
|
||||
|
||||
@@ -350,34 +355,14 @@ impl UTXOCohorts {
|
||||
self.par_iter_separate_mut()
|
||||
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?;
|
||||
|
||||
// Flush aggregate cohorts' price_to_amount state AND metrics (including price_percentiles)
|
||||
// Flush aggregate cohorts' metrics (including price_percentiles)
|
||||
// Note: aggregate cohorts no longer maintain price_to_amount state
|
||||
for v in self.0.iter_aggregate_mut() {
|
||||
v.price_to_amount.flush_at_height(height, exit)?;
|
||||
v.metrics.safe_flush(exit)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reset aggregate cohorts' price_to_amount for fresh start.
|
||||
pub fn reset_aggregate_price_to_amount(&mut self) -> Result<()> {
|
||||
self.0
|
||||
.iter_aggregate_mut()
|
||||
.try_for_each(|v| v.price_to_amount.reset())
|
||||
}
|
||||
|
||||
/// Import aggregate cohorts' price_to_amount when resuming from checkpoint.
|
||||
pub fn import_aggregate_price_to_amount(&mut self, height: Height) -> Result<Height> {
|
||||
let Some(mut prev_height) = height.decremented() else {
|
||||
return Ok(Height::ZERO);
|
||||
};
|
||||
|
||||
for v in self.0.iter_aggregate_mut() {
|
||||
prev_height = prev_height.min(v.price_to_amount.import_at_or_before(prev_height)?);
|
||||
}
|
||||
|
||||
Ok(prev_height.incremented())
|
||||
}
|
||||
|
||||
/// Get minimum height from all separate cohorts' height-indexed vectors.
|
||||
pub fn min_separate_height_vecs_len(&self) -> Height {
|
||||
self.iter_separate()
|
||||
@@ -412,57 +397,115 @@ impl UTXOCohorts {
|
||||
}
|
||||
|
||||
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
|
||||
/// Must be called after receive()/send() when price_to_amount is up to date.
|
||||
pub fn truncate_push_aggregate_percentiles(&mut self, height: Height) -> Result<()> {
|
||||
// Collect supply values from age_range cohorts
|
||||
/// Computes on-demand by merging age_range cohorts' price_to_amount data.
|
||||
/// This avoids maintaining redundant aggregate price_to_amount maps.
|
||||
pub fn truncate_push_aggregate_percentiles(&mut self, dateindex: DateIndex) -> Result<()> {
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::BinaryHeap;
|
||||
|
||||
// Collect (filter, supply, price_to_amount as Vec) from age_range cohorts
|
||||
let age_range_data: Vec<_> = self
|
||||
.0
|
||||
.age_range
|
||||
.iter()
|
||||
.map(|sub| {
|
||||
(
|
||||
sub.filter().clone(),
|
||||
sub.state
|
||||
.as_ref()
|
||||
.map(|s| s.supply.value)
|
||||
.unwrap_or(Sats::ZERO),
|
||||
)
|
||||
.filter_map(|sub| {
|
||||
let state = sub.state.as_ref()?;
|
||||
let entries: Vec<(Dollars, Sats)> = state
|
||||
.price_to_amount_iter()?
|
||||
.map(|(&p, &a)| (p, a))
|
||||
.collect();
|
||||
Some((sub.filter().clone(), state.supply.value, entries))
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Compute percentiles for each aggregate cohort in parallel
|
||||
let results: Vec<_> = self
|
||||
.0
|
||||
.par_iter_aggregate()
|
||||
.filter_map(|v| {
|
||||
v.price_to_amount.as_ref()?;
|
||||
let filter = v.filter().clone();
|
||||
let supply = age_range_data
|
||||
.iter()
|
||||
.filter(|(sub_filter, _)| filter.includes(sub_filter))
|
||||
.map(|(_, value)| *value)
|
||||
.fold(Sats::ZERO, |acc, v| acc + v);
|
||||
let percentiles = v.compute_percentile_prices_from_standalone(supply);
|
||||
Some((filter, percentiles))
|
||||
})
|
||||
.collect();
|
||||
// Compute percentiles for each aggregate filter
|
||||
for aggregate in self.0.iter_aggregate_mut() {
|
||||
let filter = aggregate.filter().clone();
|
||||
|
||||
// Push results sequentially (requires &mut)
|
||||
for (filter, percentiles) in results {
|
||||
let v = self
|
||||
.0
|
||||
.iter_aggregate_mut()
|
||||
.find(|v| v.filter() == &filter)
|
||||
.unwrap();
|
||||
|
||||
if let Some(pp) = v
|
||||
// Get price_percentiles storage, skip if not configured
|
||||
let Some(pp) = aggregate
|
||||
.metrics
|
||||
.price_paid
|
||||
.as_mut()
|
||||
.and_then(|p| p.price_percentiles.as_mut())
|
||||
{
|
||||
pp.truncate_push(height, &percentiles)?;
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Collect relevant cohort data for this aggregate
|
||||
let relevant: Vec<_> = age_range_data
|
||||
.iter()
|
||||
.filter(|(sub_filter, _, _)| filter.includes(sub_filter))
|
||||
.collect();
|
||||
|
||||
// Calculate total supply
|
||||
let total_supply: u64 = relevant.iter().map(|(_, s, _)| u64::from(*s)).sum();
|
||||
|
||||
if total_supply == 0 {
|
||||
pp.truncate_push(dateindex, &[Dollars::NAN; PERCENTILES_LEN])?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// K-way merge using min-heap: O(n log k) where k = number of cohorts
|
||||
// Each heap entry: (price, amount, cohort_idx, entry_idx)
|
||||
let mut heap: BinaryHeap<Reverse<(Dollars, usize, usize)>> = BinaryHeap::new();
|
||||
|
||||
// Initialize heap with first entry from each cohort
|
||||
for (cohort_idx, (_, _, entries)) in relevant.iter().enumerate() {
|
||||
if !entries.is_empty() {
|
||||
heap.push(Reverse((entries[0].0, cohort_idx, 0)));
|
||||
}
|
||||
}
|
||||
|
||||
let targets = PERCENTILES.map(|p| total_supply * u64::from(p) / 100);
|
||||
let mut result = [Dollars::NAN; PERCENTILES_LEN];
|
||||
let mut accumulated = 0u64;
|
||||
let mut pct_idx = 0;
|
||||
let mut current_price: Option<Dollars> = None;
|
||||
let mut amount_at_price = 0u64;
|
||||
|
||||
while let Some(Reverse((price, cohort_idx, entry_idx))) = heap.pop() {
|
||||
let (_, _, entries) = relevant[cohort_idx];
|
||||
let (_, amount) = entries[entry_idx];
|
||||
|
||||
// If price changed, finalize previous price
|
||||
if let Some(current_price) = current_price
|
||||
&& current_price != price
|
||||
{
|
||||
accumulated += amount_at_price;
|
||||
|
||||
while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] {
|
||||
result[pct_idx] = current_price;
|
||||
pct_idx += 1;
|
||||
}
|
||||
|
||||
if pct_idx >= PERCENTILES_LEN {
|
||||
break;
|
||||
}
|
||||
|
||||
amount_at_price = 0;
|
||||
}
|
||||
|
||||
current_price = Some(price);
|
||||
amount_at_price += u64::from(amount);
|
||||
|
||||
// Push next entry from this cohort
|
||||
let next_idx = entry_idx + 1;
|
||||
if next_idx < entries.len() {
|
||||
heap.push(Reverse((entries[next_idx].0, cohort_idx, next_idx)));
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize last price
|
||||
if let Some(price) = current_price {
|
||||
accumulated += amount_at_price;
|
||||
while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] {
|
||||
result[pct_idx] = price;
|
||||
pct_idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pp.truncate_push(dateindex, &result)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -28,22 +28,6 @@ impl UTXOCohorts {
|
||||
v.state.as_mut().unwrap().receive(&supply_state, price);
|
||||
});
|
||||
|
||||
// Update aggregate cohorts' price_to_amount
|
||||
// New UTXOs have days_old = 0, so check if filter includes day 0
|
||||
if let Some(price) = price
|
||||
&& supply_state.value.is_not_zero()
|
||||
{
|
||||
self.0
|
||||
.iter_aggregate_mut()
|
||||
.filter(|v| v.filter().contains_time(0))
|
||||
.for_each(|v| {
|
||||
v.price_to_amount
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.increment(price, &supply_state);
|
||||
});
|
||||
}
|
||||
|
||||
// Update output type cohorts
|
||||
self.type_.iter_mut().for_each(|vecs| {
|
||||
let output_type = match vecs.filter() {
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
//! Processing spent inputs (UTXOs being spent).
|
||||
|
||||
use brk_grouper::{Filter, Filtered, TimeFilter, UTXOGroups};
|
||||
use brk_grouper::{Filter, Filtered, TimeFilter};
|
||||
use brk_types::{CheckedSub, HalvingEpoch, Height};
|
||||
use rustc_hash::FxHashMap;
|
||||
use vecdb::VecIndex;
|
||||
|
||||
use crate::{states::{BlockState, Transacted}, utils::OptionExt, PriceToAmount};
|
||||
use crate::{states::{BlockState, Transacted}, utils::OptionExt};
|
||||
|
||||
use super::UTXOCohorts;
|
||||
|
||||
@@ -23,35 +23,14 @@ impl UTXOCohorts {
|
||||
return;
|
||||
}
|
||||
|
||||
let UTXOGroups {
|
||||
all,
|
||||
term,
|
||||
age_range,
|
||||
epoch,
|
||||
type_,
|
||||
amount_range,
|
||||
..
|
||||
} = &mut self.0;
|
||||
|
||||
// Time-based cohorts: age_range + epoch
|
||||
let mut time_cohorts: Vec<_> = age_range
|
||||
let mut time_cohorts: Vec<_> = self
|
||||
.0
|
||||
.age_range
|
||||
.iter_mut()
|
||||
.chain(epoch.iter_mut())
|
||||
.chain(self.0.epoch.iter_mut())
|
||||
.collect();
|
||||
|
||||
// Aggregate cohorts' price_to_amount
|
||||
let mut aggregate_p2a: Vec<(Filter, Option<&mut PriceToAmount>)> = vec![
|
||||
(all.filter().clone(), all.price_to_amount.as_mut()),
|
||||
(
|
||||
term.short.filter().clone(),
|
||||
term.short.price_to_amount.as_mut(),
|
||||
),
|
||||
(
|
||||
term.long.filter().clone(),
|
||||
term.long.price_to_amount.as_mut(),
|
||||
),
|
||||
];
|
||||
|
||||
let last_block = chain_state.last().unwrap();
|
||||
let last_timestamp = last_block.timestamp;
|
||||
let current_price = last_block.price;
|
||||
@@ -98,7 +77,7 @@ impl UTXOCohorts {
|
||||
.spendable
|
||||
.iter_typed()
|
||||
.for_each(|(output_type, supply_state)| {
|
||||
type_.get_mut(output_type).state.um().send(
|
||||
self.0.type_.get_mut(output_type).state.um().send(
|
||||
supply_state,
|
||||
current_price,
|
||||
prev_price,
|
||||
@@ -112,7 +91,7 @@ impl UTXOCohorts {
|
||||
sent.by_size_group
|
||||
.iter_typed()
|
||||
.for_each(|(group, supply_state)| {
|
||||
amount_range.get_mut(group).state.um().send(
|
||||
self.0.amount_range.get_mut(group).state.um().send(
|
||||
supply_state,
|
||||
current_price,
|
||||
prev_price,
|
||||
@@ -121,19 +100,6 @@ impl UTXOCohorts {
|
||||
older_than_hour,
|
||||
);
|
||||
});
|
||||
|
||||
// Update aggregate cohorts' price_to_amount
|
||||
if let Some(prev_price) = prev_price {
|
||||
let supply_state = &sent.spendable_supply;
|
||||
if supply_state.value.is_not_zero() {
|
||||
aggregate_p2a
|
||||
.iter_mut()
|
||||
.filter(|(f, _)| f.contains_time(days_old))
|
||||
.for_each(|(_, p2a)| {
|
||||
p2a.um().decrement(prev_price, supply_state);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,10 +3,10 @@
|
||||
//! When a new block arrives, UTXOs age. Some cross day boundaries
|
||||
//! and need to move between age-based cohorts.
|
||||
|
||||
use brk_grouper::{Filter, Filtered, UTXOGroups};
|
||||
use brk_types::{ONE_DAY_IN_SEC, Sats, Timestamp};
|
||||
use brk_grouper::{Filter, Filtered};
|
||||
use brk_types::{ONE_DAY_IN_SEC, Timestamp};
|
||||
|
||||
use crate::{PriceToAmount, states::BlockState, utils::OptionExt};
|
||||
use crate::states::BlockState;
|
||||
|
||||
use super::UTXOCohorts;
|
||||
|
||||
@@ -27,33 +27,14 @@ impl UTXOCohorts {
|
||||
let elapsed = (*timestamp).saturating_sub(*prev_timestamp);
|
||||
let threshold = ONE_DAY_IN_SEC.saturating_sub(elapsed);
|
||||
|
||||
// Extract mutable references to avoid borrow checker issues
|
||||
let UTXOGroups {
|
||||
all,
|
||||
term,
|
||||
age_range,
|
||||
..
|
||||
} = &mut self.0;
|
||||
|
||||
// Collect age_range cohorts with their filters and states
|
||||
let mut age_cohorts: Vec<(Filter, &mut Option<_>)> = age_range
|
||||
let mut age_cohorts: Vec<(Filter, &mut Option<_>)> = self
|
||||
.0
|
||||
.age_range
|
||||
.iter_mut()
|
||||
.map(|v| (v.filter().clone(), &mut v.state))
|
||||
.collect();
|
||||
|
||||
// Collect aggregate cohorts' price_to_amount for age transitions
|
||||
let mut aggregate_p2a: Vec<(Filter, Option<&mut PriceToAmount>)> = vec![
|
||||
(all.filter().clone(), all.price_to_amount.as_mut()),
|
||||
(
|
||||
term.short.filter().clone(),
|
||||
term.short.price_to_amount.as_mut(),
|
||||
),
|
||||
(
|
||||
term.long.filter().clone(),
|
||||
term.long.price_to_amount.as_mut(),
|
||||
),
|
||||
];
|
||||
|
||||
// Process blocks that might cross a day boundary
|
||||
chain_state
|
||||
.iter()
|
||||
@@ -86,22 +67,6 @@ impl UTXOCohorts {
|
||||
.decrement(&block_state.supply, block_state.price);
|
||||
}
|
||||
});
|
||||
|
||||
// Update aggregate cohorts' price_to_amount
|
||||
if let Some(price) = block_state.price
|
||||
&& block_state.supply.value > Sats::ZERO
|
||||
{
|
||||
aggregate_p2a.iter_mut().for_each(|(filter, p2a)| {
|
||||
let is_now = filter.contains_time(curr_days);
|
||||
let was_before = filter.contains_time(prev_days);
|
||||
|
||||
if is_now && !was_before {
|
||||
p2a.um().increment(price, &block_state.supply);
|
||||
} else if was_before && !is_now {
|
||||
p2a.um().decrement(price, &block_state.supply);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -457,8 +457,10 @@ pub fn process_blocks(
|
||||
)?;
|
||||
|
||||
// Compute and push percentiles for aggregate cohorts (all, sth, lth)
|
||||
vecs.utxo_cohorts
|
||||
.truncate_push_aggregate_percentiles(height)?;
|
||||
if let Some(dateindex) = dateindex_opt {
|
||||
vecs.utxo_cohorts
|
||||
.truncate_push_aggregate_percentiles(dateindex)?;
|
||||
}
|
||||
|
||||
// Periodic checkpoint flush
|
||||
if height != last_height
|
||||
|
||||
@@ -9,9 +9,9 @@ use brk_error::Result;
|
||||
use brk_types::Height;
|
||||
use vecdb::Stamp;
|
||||
|
||||
use super::super::AddressesDataVecs;
|
||||
use super::super::address::AnyAddressIndexesVecs;
|
||||
use super::super::cohorts::{AddressCohorts, UTXOCohorts};
|
||||
use super::super::AddressesDataVecs;
|
||||
|
||||
/// Result of state recovery.
|
||||
pub struct RecoveredState {
|
||||
@@ -68,14 +68,6 @@ pub fn recover_state(
|
||||
});
|
||||
}
|
||||
|
||||
// Import aggregate price_to_amount - must match height
|
||||
let imported = import_aggregate_price_to_amount(height, utxo_cohorts)?;
|
||||
if imported != height {
|
||||
return Ok(RecoveredState {
|
||||
starting_height: Height::ZERO,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(RecoveredState {
|
||||
starting_height: height,
|
||||
})
|
||||
@@ -102,9 +94,6 @@ pub fn reset_state(
|
||||
utxo_cohorts.reset_separate_price_to_amount()?;
|
||||
address_cohorts.reset_separate_price_to_amount()?;
|
||||
|
||||
// Reset aggregate cohorts' price_to_amount
|
||||
utxo_cohorts.reset_aggregate_price_to_amount()?;
|
||||
|
||||
Ok(RecoveredState {
|
||||
starting_height: Height::ZERO,
|
||||
})
|
||||
@@ -176,22 +165,3 @@ fn rollback_states(
|
||||
Height::ZERO
|
||||
}
|
||||
}
|
||||
|
||||
/// Import aggregate price_to_amount for UTXO cohorts.
|
||||
fn import_aggregate_price_to_amount(
|
||||
starting_height: Height,
|
||||
utxo_cohorts: &mut UTXOCohorts,
|
||||
) -> Result<Height> {
|
||||
if starting_height.is_zero() {
|
||||
return Ok(Height::ZERO);
|
||||
}
|
||||
|
||||
let imported = utxo_cohorts.import_aggregate_price_to_amount(starting_height)?;
|
||||
|
||||
Ok(if imported == starting_height {
|
||||
starting_height
|
||||
} else {
|
||||
Height::ZERO
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -142,6 +142,7 @@ impl CohortMetrics {
|
||||
}
|
||||
|
||||
/// Compute and push unrealized states.
|
||||
/// Percentiles are only computed at date boundaries (when dateindex is Some).
|
||||
pub fn compute_then_truncate_push_unrealized_states(
|
||||
&mut self,
|
||||
height: Height,
|
||||
@@ -167,7 +168,10 @@ impl CohortMetrics {
|
||||
date_unrealized_state.as_ref(),
|
||||
)?;
|
||||
|
||||
price_paid.truncate_push_percentiles(height, state)?;
|
||||
// Only compute expensive percentiles at date boundaries (~144x reduction)
|
||||
if let Some(dateindex) = dateindex {
|
||||
price_paid.truncate_push_percentiles(dateindex, state)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Dollars, Height, Version};
|
||||
use brk_types::{DateIndex, Dollars, Height, Version};
|
||||
use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec};
|
||||
|
||||
use crate::{
|
||||
@@ -98,11 +98,16 @@ impl PricePaidMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Push price percentiles from state.
|
||||
pub fn truncate_push_percentiles(&mut self, height: Height, state: &CohortState) -> Result<()> {
|
||||
/// Push price percentiles from state at date boundary.
|
||||
/// Only called when at the last height of a day.
|
||||
pub fn truncate_push_percentiles(
|
||||
&mut self,
|
||||
dateindex: DateIndex,
|
||||
state: &CohortState,
|
||||
) -> Result<()> {
|
||||
if let Some(price_percentiles) = self.price_percentiles.as_mut() {
|
||||
let percentile_prices = state.compute_percentile_prices();
|
||||
price_percentiles.truncate_push(height, &percentile_prices)?;
|
||||
price_percentiles.truncate_push(dateindex, &percentile_prices)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
//! are forgotten during flush operations.
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_types::Height;
|
||||
use vecdb::Exit;
|
||||
|
||||
/// Trait for components that can be flushed to disk.
|
||||
@@ -16,24 +15,6 @@ pub trait Flushable {
|
||||
fn safe_flush(&mut self, exit: &Exit) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Trait for stateful components that track data indexed by height.
|
||||
///
|
||||
/// This ensures consistent patterns for:
|
||||
/// - Flushing state at checkpoints
|
||||
/// - Importing state when resuming from a checkpoint
|
||||
/// - Resetting state when starting from scratch
|
||||
pub trait HeightFlushable {
|
||||
/// Flush state to disk at the given height checkpoint.
|
||||
fn flush_at_height(&mut self, height: Height, exit: &Exit) -> Result<()>;
|
||||
|
||||
/// Import state from the most recent checkpoint at or before the given height.
|
||||
/// Returns the actual height that was imported.
|
||||
fn import_at_or_before(&mut self, height: Height) -> Result<Height>;
|
||||
|
||||
/// Reset state for starting from scratch.
|
||||
fn reset(&mut self) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Blanket implementation for Option<T> where T: Flushable
|
||||
impl<T: Flushable> Flushable for Option<T> {
|
||||
fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
|
||||
@@ -43,28 +24,3 @@ impl<T: Flushable> Flushable for Option<T> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Blanket implementation for Option<T> where T: HeightFlushable
|
||||
impl<T: HeightFlushable> HeightFlushable for Option<T> {
|
||||
fn flush_at_height(&mut self, height: Height, exit: &Exit) -> Result<()> {
|
||||
if let Some(inner) = self.as_mut() {
|
||||
inner.flush_at_height(height, exit)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn import_at_or_before(&mut self, height: Height) -> Result<Height> {
|
||||
if let Some(inner) = self.as_mut() {
|
||||
inner.import_at_or_before(height)
|
||||
} else {
|
||||
Ok(height)
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self) -> Result<()> {
|
||||
if let Some(inner) = self.as_mut() {
|
||||
inner.reset()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,12 +9,10 @@ use brk_types::{Dollars, Height, Sats};
|
||||
use derive_deref::{Deref, DerefMut};
|
||||
use pco::standalone::{simple_decompress, simpler_compress};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use vecdb::{Bytes, Exit};
|
||||
use vecdb::Bytes;
|
||||
|
||||
use crate::{states::SupplyState, utils::OptionExt};
|
||||
|
||||
use super::HeightFlushable;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PriceToAmount {
|
||||
pathbuf: PathBuf,
|
||||
@@ -34,10 +32,9 @@ impl PriceToAmount {
|
||||
|
||||
pub fn import_at_or_before(&mut self, height: Height) -> Result<Height> {
|
||||
let files = self.read_dir(None)?;
|
||||
let (&height, path) = files
|
||||
.range(..=height)
|
||||
.next_back()
|
||||
.ok_or(Error::NotFound("No price state found at or before height".into()))?;
|
||||
let (&height, path) = files.range(..=height).next_back().ok_or(Error::NotFound(
|
||||
"No price state found at or before height".into(),
|
||||
))?;
|
||||
self.state = Some(State::deserialize(&fs::read(path)?)?);
|
||||
Ok(height)
|
||||
}
|
||||
@@ -114,10 +111,7 @@ impl PriceToAmount {
|
||||
fs::remove_file(path)?;
|
||||
}
|
||||
|
||||
fs::write(
|
||||
self.path_state(height),
|
||||
self.state.u().serialize()?,
|
||||
)?;
|
||||
fs::write(self.path_state(height), self.state.u().serialize()?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -130,22 +124,6 @@ impl PriceToAmount {
|
||||
}
|
||||
}
|
||||
|
||||
impl HeightFlushable for PriceToAmount {
|
||||
fn flush_at_height(&mut self, height: Height, _exit: &Exit) -> Result<()> {
|
||||
self.flush(height)
|
||||
}
|
||||
|
||||
fn import_at_or_before(&mut self, height: Height) -> Result<Height> {
|
||||
PriceToAmount::import_at_or_before(self, height)
|
||||
}
|
||||
|
||||
fn reset(&mut self) -> Result<()> {
|
||||
self.clean()?;
|
||||
self.init();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug, Deref, DerefMut, Serialize, Deserialize)]
|
||||
struct State(BTreeMap<Dollars, Sats>);
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ use super::StoredU64;
|
||||
JsonSchema,
|
||||
Hash,
|
||||
)]
|
||||
#[allow(clippy::duplicated_attributes)]
|
||||
#[schemars(example = 0, example = 210_000, example = 420_000, example = 840_000)]
|
||||
pub struct Height(u32);
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use serde::Deserialize;
|
||||
/// Maximum number of results to return. Defaults to 100 if not specified.
|
||||
#[derive(Debug, Deref, Deserialize, JsonSchema)]
|
||||
#[serde(transparent)]
|
||||
#[allow(clippy::duplicated_attributes)]
|
||||
#[schemars(default, example = 1, example = 10, example = 100)]
|
||||
pub struct Limit(usize);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user