computer: fix stateful

This commit is contained in:
nym21
2025-12-03 15:43:50 +01:00
parent f48ad577d3
commit fcc74ba212
8 changed files with 169 additions and 82 deletions

View File

@@ -703,6 +703,25 @@ impl Vecs {
Height::ZERO
};
// Import aggregate cohorts' price_to_amount
// We need to drop the borrows first to access utxo_cohorts directly
drop(separate_utxo_vecs);
drop(separate_address_vecs);
let starting_height = if starting_height.is_not_zero()
&& self
.utxo_cohorts
.import_aggregate_price_to_amount(starting_height)?
== starting_height
{
starting_height
} else {
Height::ZERO
};
// Re-collect the separate vecs
let mut separate_utxo_vecs =
self.utxo_cohorts.iter_separate_mut().collect::<Vec<_>>();
let mut separate_address_vecs =
self.address_cohorts.iter_separate_mut().collect::<Vec<_>>();
// info!("starting_height = {starting_height}");

View File

@@ -8,7 +8,7 @@ use vecdb::{Database, Exit, IterableVec};
use crate::{
Indexes, PriceToAmount, UTXOCohortState,
grouped::PERCENTILES_LEN,
grouped::{PERCENTILES, PERCENTILES_LEN},
indexes, price,
stateful::{
common,
@@ -53,7 +53,11 @@ impl Vecs {
state_starting_height: None,
state: if state_level.is_full() {
Some(UTXOCohortState::new(states_path, &full_name, compute_dollars))
Some(UTXOCohortState::new(
states_path,
&full_name,
compute_dollars,
))
} else {
None
},
@@ -191,9 +195,10 @@ impl CohortVecs for Vecs {
impl Vecs {
/// Compute percentile prices for aggregate cohorts that have standalone price_to_amount.
/// Returns NaN array if price_to_amount is None or empty.
pub fn compute_percentile_prices_from_standalone(&self, supply: Sats) -> [Dollars; PERCENTILES_LEN] {
use crate::grouped::PERCENTILES;
pub fn compute_percentile_prices_from_standalone(
&self,
supply: Sats,
) -> [Dollars; PERCENTILES_LEN] {
let mut result = [Dollars::NAN; PERCENTILES_LEN];
let price_to_amount = match self.price_to_amount.as_ref() {
@@ -205,14 +210,14 @@ impl Vecs {
return result;
}
let total = u64::from(supply);
let targets = PERCENTILES.map(|p| total * u64::from(p) / 100);
let total = supply;
let targets = PERCENTILES.map(|p| total * p / 100);
let mut accumulated = 0u64;
let mut accumulated = Sats::ZERO;
let mut pct_idx = 0;
for (&price, &sats) in price_to_amount.iter() {
accumulated += u64::from(sats);
accumulated += sats;
while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] {
result[pct_idx] = price;

View File

@@ -12,6 +12,7 @@ use brk_types::{
Version,
};
use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
use rustc_hash::FxHashMap;
use vecdb::{Database, Exit, IterableVec, VecIndex};
@@ -1514,19 +1515,18 @@ impl Vecs {
// Handle age transitions for aggregate cohorts' price_to_amount
// Check which cohorts the UTXO was in vs is now in, and increment/decrement accordingly
if let Some(price) = block_state.price {
// Only process if there's remaining supply (like CohortState::increment/decrement do)
if let Some(price) = block_state.price
&& block_state.supply.value > Sats::ZERO
{
aggregate_p2a.iter_mut().for_each(|(filter, p2a)| {
let is = filter.contains_time(days_old);
let was = filter.contains_time(prev_days_old);
if is && !was {
if let Some(p2a) = p2a.as_mut() {
p2a.increment(price, &block_state.supply);
}
p2a.as_mut().unwrap().increment(price, &block_state.supply);
} else if was && !is {
if let Some(p2a) = p2a.as_mut() {
p2a.decrement(price, &block_state.supply);
}
p2a.as_mut().unwrap().decrement(price, &block_state.supply);
}
});
}
@@ -1551,7 +1551,10 @@ impl Vecs {
..
} = &mut self.0;
let mut time_based_vecs = age_range.iter_mut().chain(epoch.iter_mut()).collect::<Vec<_>>();
let mut time_based_vecs = age_range
.iter_mut()
.chain(epoch.iter_mut())
.collect::<Vec<_>>();
// Collect aggregate cohorts' filter and p2a for iteration
let mut aggregate_p2a: Vec<(Filter, Option<&mut crate::PriceToAmount>)> = vec![
@@ -1613,50 +1616,40 @@ impl Vecs {
.spendable
.iter_typed()
.for_each(|(output_type, supply_state)| {
type_
.get_mut(output_type)
.state
.as_mut()
.unwrap()
.send(
supply_state,
current_price,
prev_price,
blocks_old,
days_old_float,
older_than_hour,
)
type_.get_mut(output_type).state.as_mut().unwrap().send(
supply_state,
current_price,
prev_price,
blocks_old,
days_old_float,
older_than_hour,
)
});
sent.by_size_group
.iter_typed()
.for_each(|(group, supply_state)| {
amount_range
.get_mut(group)
.state
.as_mut()
.unwrap()
.send(
supply_state,
current_price,
prev_price,
blocks_old,
days_old_float,
older_than_hour,
);
amount_range.get_mut(group).state.as_mut().unwrap().send(
supply_state,
current_price,
prev_price,
blocks_old,
days_old_float,
older_than_hour,
);
});
// Update aggregate cohorts' price_to_amount using filter.contains_time()
if let Some(prev_price) = prev_price {
let supply_state = &sent.spendable_supply;
if supply_state.value.is_not_zero() {
aggregate_p2a.iter_mut().for_each(|(filter, p2a)| {
if filter.contains_time(days_old) {
if let Some(p2a) = p2a.as_mut() {
p2a.decrement(prev_price, supply_state);
}
}
});
aggregate_p2a
.iter_mut()
.filter(|(f, _)| f.contains_time(days_old))
.map(|(_, p2a)| p2a)
.for_each(|p2a| {
p2a.as_mut().unwrap().decrement(prev_price, supply_state);
});
}
}
});
@@ -1679,13 +1672,15 @@ impl Vecs {
if let Some(price) = price
&& supply_state.value.is_not_zero()
{
self.0.iter_aggregate_mut().for_each(|v| {
if v.filter().contains_time(0) {
if let Some(p2a) = v.price_to_amount.as_mut() {
p2a.increment(price, &supply_state);
}
}
});
self.0
.iter_aggregate_mut()
.filter(|v| v.filter().contains_time(0))
.for_each(|v| {
v.price_to_amount
.as_mut()
.unwrap()
.increment(price, &supply_state);
});
}
self.type_.iter_mut().for_each(|vecs| {
@@ -1843,39 +1838,60 @@ impl Vecs {
})
}
/// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint
pub fn import_aggregate_price_to_amount(&mut self, height: Height) -> Result<Height> {
let mut min_height = height;
for v in self.0.iter_aggregate_mut() {
if let Some(p2a) = v.price_to_amount.as_mut() {
min_height = min_height.min(p2a.import_at_or_before(height)?);
}
}
Ok(min_height)
}
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
/// Must be called after receive()/send() when price_to_amount is up to date.
pub fn truncate_push_aggregate_percentiles(&mut self, height: Height) -> Result<()> {
// First, compute supplies for each aggregate cohort by summing age_range sub-cohorts
let supplies: Vec<(Filter, Sats)> = self
let age_range_data: Vec<_> = self
.0
.iter_aggregate()
.map(|v| {
let filter = v.filter().clone();
let supply = self
.0
.age_range
.iter()
.filter(|sub| filter.includes(sub.filter()))
.map(|sub| sub.state.as_ref().unwrap().supply.value)
.fold(Sats::ZERO, |acc, v| acc + v);
(filter, supply)
.age_range
.iter()
.map(|sub| {
(
sub.filter().clone(),
sub.state.as_ref().unwrap().supply.value,
)
})
.collect();
// Then, compute and push percentiles for each aggregate cohort
for (filter, supply) in supplies {
let results: Vec<_> = self
.0
.par_iter_aggregate()
.map(|v| {
if v.price_to_amount.is_none() {
panic!();
}
let filter = v.filter().clone();
let supply = age_range_data
.iter()
.filter(|(sub_filter, _)| filter.includes(sub_filter))
.map(|(_, value)| *value)
.fold(Sats::ZERO, |acc, v| acc + v);
let percentiles = v.compute_percentile_prices_from_standalone(supply);
(filter, percentiles)
})
.collect();
// Push results sequentially (requires &mut)
for (filter, percentiles) in results {
let v = self
.0
.iter_aggregate_mut()
.find(|v| v.filter() == &filter)
.unwrap();
if v.price_to_amount.is_some() {
let percentiles = v.compute_percentile_prices_from_standalone(supply);
if let Some(pp) = v.inner.price_percentiles.as_mut() {
pp.truncate_push(height, &percentiles)?;
}
if let Some(pp) = v.inner.price_percentiles.as_mut() {
pp.truncate_push(height, &percentiles)?;
}
}