global: snapshot

nym21
2026-03-04 23:21:56 +01:00
parent 9e23de4ba1
commit ef0b77baa8
51 changed files with 2109 additions and 5730 deletions

View File

@@ -15,14 +15,8 @@ impl Vecs {
) -> Result<()> {
let window_starts = blocks.count.window_starts();
let circulating_supply = &distribution
.utxo_cohorts
.all
.metrics
.supply
.total
.sats
.height;
let all_metrics = &distribution.utxo_cohorts.all.metrics;
let circulating_supply = &all_metrics.supply.total.sats.height;
self.coinblocks_created
.compute(starting_indexes.height, &window_starts, exit, |vec| {
@@ -35,12 +29,7 @@ impl Vecs {
Ok(())
})?;
let coinblocks_destroyed = &distribution
.utxo_cohorts
.all
.metrics
.activity
.coinblocks_destroyed;
let coinblocks_destroyed = &all_metrics.activity.coinblocks_destroyed;
self.coinblocks_stored
.compute(starting_indexes.height, &window_starts, exit, |vec| {

View File

@@ -17,22 +17,9 @@ impl Vecs {
value: &value::Vecs,
exit: &Exit,
) -> Result<()> {
let realized_cap_cents = &distribution
.utxo_cohorts
.all
.metrics
.realized
.realized_cap_cents
.height;
let circulating_supply = &distribution
.utxo_cohorts
.all
.metrics
.supply
.total
.btc
.height;
let all_metrics = &distribution.utxo_cohorts.all.metrics;
let realized_cap_cents = &all_metrics.realized.realized_cap_cents.height;
let circulating_supply = &all_metrics.supply.total.btc.height;
self.thermo_cap.cents.height.compute_transform(
starting_indexes.height,

View File

@@ -19,22 +19,9 @@ impl Vecs {
cap: &cap::Vecs,
exit: &Exit,
) -> Result<()> {
let circulating_supply = &distribution
.utxo_cohorts
.all
.metrics
.supply
.total
.btc
.height;
let realized_price = &distribution
.utxo_cohorts
.all
.metrics
.realized
.realized_price
.cents
.height;
let all_metrics = &distribution.utxo_cohorts.all.metrics;
let circulating_supply = &all_metrics.supply.total.btc.height;
let realized_price = &all_metrics.realized.realized_price.cents.height;
self.vaulted_price.cents.height.compute_transform2(
starting_indexes.height,

View File

@@ -18,28 +18,10 @@ impl Vecs {
) -> Result<()> {
let window_starts = blocks.count.window_starts();
let coinblocks_destroyed = &distribution
.utxo_cohorts
.all
.metrics
.activity
.coinblocks_destroyed;
let coindays_destroyed = &distribution
.utxo_cohorts
.all
.metrics
.activity
.coindays_destroyed;
let circulating_supply = &distribution
.utxo_cohorts
.all
.metrics
.supply
.total
.btc
.height;
let all_metrics = &distribution.utxo_cohorts.all.metrics;
let coinblocks_destroyed = &all_metrics.activity.coinblocks_destroyed;
let coindays_destroyed = &all_metrics.activity.coindays_destroyed;
let circulating_supply = &all_metrics.supply.total.btc.height;
self.cointime_value_destroyed.compute(
starting_indexes.height,

View File

@@ -19,6 +19,8 @@ pub(crate) fn process_received(
empty_addr_count: &mut ByAddressType<u64>,
activity_counts: &mut AddressTypeToActivityCounts,
) {
let mut aggregated: FxHashMap<TypeIndex, (Sats, u32)> = FxHashMap::default();
for (output_type, vec) in received_data.unwrap().into_iter() {
if vec.is_empty() {
continue;
@@ -31,14 +33,13 @@ pub(crate) fn process_received(
// Aggregate receives by address - each address processed exactly once
// Track (total_value, output_count) for correct UTXO counting
let mut aggregated: FxHashMap<TypeIndex, (Sats, u32)> = FxHashMap::default();
for (type_index, value) in vec {
let entry = aggregated.entry(type_index).or_default();
entry.0 += value;
entry.1 += 1;
}
for (type_index, (total_value, output_count)) in aggregated {
for (type_index, (total_value, output_count)) in aggregated.drain() {
let (addr_data, status) = lookup.get_or_create_for_receive(output_type, type_index);
// Track receiving activity - each address in receive aggregation
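
The hunk above hoists the `aggregated` map out of the per-block path and switches the consuming loop to `drain()`, which empties the map in place while keeping its allocation. A minimal sketch of the aggregate-then-drain pattern, using std's HashMap in place of FxHashMap (rustc-hash) and hypothetical key/value types:

use std::collections::HashMap;

/// Standalone sketch: aggregate (total_value, output_count) per address,
/// then drain so the caller-owned map can be reused without reallocating.
fn process_received_sketch(buf: &mut HashMap<u32, (u64, u32)>, outputs: &[(u32, u64)]) {
    for &(type_index, value) in outputs {
        // Each address is aggregated once, however many outputs it has.
        let entry = buf.entry(type_index).or_default();
        entry.0 += value;
        entry.1 += 1;
    }
    // drain() yields owned entries and leaves capacity intact for the next block.
    for (type_index, (total_value, output_count)) in buf.drain() {
        let _ = (type_index, total_value, output_count); // process the address here
    }
}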

View File

@@ -40,9 +40,9 @@ pub(crate) fn process_sent(
height_to_timestamp: &[Timestamp],
current_height: Height,
current_timestamp: Timestamp,
seen_senders: &mut ByAddressType<FxHashSet<TypeIndex>>,
) -> Result<()> {
// Track unique senders per address type (simple set, no extra data needed)
let mut seen_senders: ByAddressType<FxHashSet<TypeIndex>> = ByAddressType::default();
seen_senders.values_mut().for_each(|set| set.clear());
for (receive_height, by_type) in sent_data.into_iter() {
let prev_price = height_to_price[receive_height.to_usize()];
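
Same idea for `seen_senders`: the per-call FxHashSet allocation becomes a caller-owned parameter that is cleared on entry, so its capacity survives across blocks. A reduced sketch with a plain HashSet and hypothetical indices:

use std::collections::HashSet;

/// Counts unique senders while reusing the caller's scratch set.
fn count_unique_senders(seen: &mut HashSet<u32>, senders: &[u32]) -> usize {
    seen.clear(); // keeps the allocation, drops last block's contents
    senders.iter().filter(|&&s| seen.insert(s)).count()
}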

View File

@@ -62,15 +62,15 @@ pub(crate) fn process_inputs(
.map(|local_idx| -> Result<_> {
let txindex = txinindex_to_txindex[local_idx];
let prev_height = *txinindex_to_prev_height.get(local_idx).unwrap();
let value = *txinindex_to_value.get(local_idx).unwrap();
let input_type = *txinindex_to_outputtype.get(local_idx).unwrap();
let prev_height = txinindex_to_prev_height[local_idx];
let value = txinindex_to_value[local_idx];
let input_type = txinindex_to_outputtype[local_idx];
if input_type.is_not_address() {
return Ok((prev_height, value, input_type, None));
}
let typeindex = *txinindex_to_typeindex.get(local_idx).unwrap();
let typeindex = txinindex_to_typeindex[local_idx];
// Look up address data
let addr_data_opt = load_uncached_address_data(

View File

@@ -1,6 +1,7 @@
use brk_cohort::ByAddressType;
use brk_error::Result;
use brk_types::{FundedAddressData, Sats, TxIndex, TypeIndex};
use rayon::prelude::*;
use smallvec::SmallVec;
use crate::distribution::{
@@ -47,7 +48,40 @@ pub(crate) fn process_outputs(
) -> Result<OutputsResult> {
let output_count = txoutdata_vec.len();
// Pre-allocate result structures
// Phase 1: Parallel address lookups (mmap reads)
let items: Vec<_> = (0..output_count)
.into_par_iter()
.map(|local_idx| -> Result<_> {
let txoutdata = &txoutdata_vec[local_idx];
let value = txoutdata.value;
let output_type = txoutdata.outputtype;
if output_type.is_not_address() {
return Ok((value, output_type, None));
}
let typeindex = txoutdata.typeindex;
let txindex = txoutindex_to_txindex[local_idx];
let addr_data_opt = load_uncached_address_data(
output_type,
typeindex,
first_addressindexes,
cache,
vr,
any_address_indexes,
addresses_data,
)?;
Ok((
value,
output_type,
Some((typeindex, txindex, value, addr_data_opt)),
))
})
.collect::<Result<Vec<_>>>()?;
// Phase 2: Sequential accumulation
let estimated_per_type = (output_count / 8).max(8);
let mut transacted = Transacted::default();
let mut received_data = AddressTypeToVec::with_capacity(estimated_per_type);
@@ -58,45 +92,26 @@ pub(crate) fn process_outputs(
let mut txindex_vecs =
AddressTypeToTypeIndexMap::<SmallVec<[TxIndex; 4]>>::with_capacity(estimated_per_type);
// Single pass: read from pre-collected vecs and accumulate
for (local_idx, txoutdata) in txoutdata_vec.iter().enumerate() {
let txindex = txoutindex_to_txindex[local_idx];
let value = txoutdata.value;
let output_type = txoutdata.outputtype;
for (value, output_type, addr_info) in items {
transacted.iterate(value, output_type);
if output_type.is_not_address() {
continue;
if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info {
received_data
.get_mut(output_type)
.unwrap()
.push((typeindex, value));
if let Some(addr_data) = addr_data_opt {
address_data.insert_for_type(output_type, typeindex, addr_data);
}
txindex_vecs
.get_mut(output_type)
.unwrap()
.entry(typeindex)
.or_default()
.push(txindex);
}
let typeindex = txoutdata.typeindex;
received_data
.get_mut(output_type)
.unwrap()
.push((typeindex, value));
let addr_data_opt = load_uncached_address_data(
output_type,
typeindex,
first_addressindexes,
cache,
vr,
any_address_indexes,
addresses_data,
)?;
if let Some(addr_data) = addr_data_opt {
address_data.insert_for_type(output_type, typeindex, addr_data);
}
txindex_vecs
.get_mut(output_type)
.unwrap()
.entry(typeindex)
.or_default()
.push(txindex);
}
Ok(OutputsResult {
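
The rewrite above splits output processing into a parallel, read-only lookup phase and a sequential accumulation phase over the collected items. A minimal sketch of that shape, assuming the rayon crate and a stand-in fallible lookup:

use rayon::prelude::*;

fn two_phase(ids: &[u64]) -> Result<u64, String> {
    // Phase 1: parallel, side-effect-free lookups; collecting into
    // Result<Vec<_>, _> short-circuits on the first error.
    let items: Vec<u64> = ids
        .par_iter()
        .map(|&id| -> Result<u64, String> {
            Ok(id * 2) // stand-in for an mmap-backed address lookup
        })
        .collect::<Result<Vec<_>, _>>()?;
    // Phase 2: sequential accumulation into mutable state, no synchronization.
    Ok(items.into_iter().sum())
}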

View File

@@ -1,27 +1,16 @@
use std::{cmp::Reverse, collections::BinaryHeap, fs, path::Path};
use std::path::Path;
use brk_cohort::{
ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge, ByMinAge,
BySpendableType, ByYear, CohortContext, Filter, Filtered, TERM_NAMES, Term,
BySpendableType, ByYear, CohortContext, Filter, Term,
};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{
BasisPoints16, Cents, CentsCompact, CostBasisDistribution, Date, Dollars, Height, Indexes,
Sats, Version,
};
use brk_types::{Dollars, Height, Indexes, Version};
use rayon::prelude::*;
use vecdb::{
AnyStoredVec, Database, Exit, ReadOnlyClone, ReadableVec, Rw, StorageMode, WritableVec,
};
use vecdb::{AnyStoredVec, Database, Exit, ReadOnlyClone, ReadableVec, Rw, StorageMode};
use crate::{
blocks,
distribution::DynCohortVecs,
indexes,
internal::{PERCENTILES, PERCENTILES_LEN, compute_spot_percentile_rank},
prices,
};
use crate::{blocks, distribution::DynCohortVecs, indexes, prices};
use crate::distribution::metrics::{
AdjustedCohortMetrics, AllCohortMetrics, BasicCohortMetrics, CohortMetricsBase,
@@ -34,9 +23,6 @@ use crate::distribution::state::UTXOCohortState;
const VERSION: Version = Version::new(0);
/// Significant digits for cost basis prices (after rounding to dollars).
const COST_BASIS_PRICE_DIGITS: i32 = 5;
/// All UTXO cohorts organized by filter type.
///
/// Each group uses a concrete metrics type matching its required features:
@@ -63,6 +49,18 @@ pub struct UTXOCohorts<M: StorageMode = Rw> {
pub type_: BySpendableType<UTXOCohortVecs<BasicCohortMetrics<M>>>,
}
macro_rules! collect_separate {
($self:expr, $method:ident, $trait_ref:ty) => {{
let mut v: Vec<$trait_ref> = Vec::with_capacity(UTXOCohorts::SEPARATE_COHORT_CAPACITY);
v.extend($self.age_range.$method().map(|x| x as $trait_ref));
v.extend($self.epoch.$method().map(|x| x as $trait_ref));
v.extend($self.year.$method().map(|x| x as $trait_ref));
v.extend($self.amount_range.$method().map(|x| x as $trait_ref));
v.extend($self.type_.$method().map(|x| x as $trait_ref));
v
}};
}
impl UTXOCohorts<Rw> {
/// Import all UTXO cohorts from database.
pub(crate) fn forced_import(
@@ -236,54 +234,23 @@ impl UTXOCohorts<Rw> {
})
}
/// ~71 separate cohorts (21 age + 5 epoch + 18 year + 15 amount + 12 type)
const SEPARATE_COHORT_CAPACITY: usize = 80;
pub(crate) fn par_iter_separate_mut(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn DynCohortVecs> {
let mut v: Vec<&mut dyn DynCohortVecs> = Vec::new();
v.extend(
self.age_range
.iter_mut()
.map(|x| x as &mut dyn DynCohortVecs),
);
v.extend(self.epoch.iter_mut().map(|x| x as &mut dyn DynCohortVecs));
v.extend(self.year.iter_mut().map(|x| x as &mut dyn DynCohortVecs));
v.extend(
self.amount_range
.iter_mut()
.map(|x| x as &mut dyn DynCohortVecs),
);
v.extend(self.type_.iter_mut().map(|x| x as &mut dyn DynCohortVecs));
v.into_par_iter()
collect_separate!(self, iter_mut, &mut dyn DynCohortVecs).into_par_iter()
}
/// Immutable iterator over all separate (stateful) cohorts.
pub(crate) fn iter_separate(&self) -> impl Iterator<Item = &dyn DynCohortVecs> {
let mut v: Vec<&dyn DynCohortVecs> = Vec::new();
v.extend(self.age_range.iter().map(|x| x as &dyn DynCohortVecs));
v.extend(self.epoch.iter().map(|x| x as &dyn DynCohortVecs));
v.extend(self.year.iter().map(|x| x as &dyn DynCohortVecs));
v.extend(self.amount_range.iter().map(|x| x as &dyn DynCohortVecs));
v.extend(self.type_.iter().map(|x| x as &dyn DynCohortVecs));
v.into_iter()
collect_separate!(self, iter, &dyn DynCohortVecs).into_iter()
}
/// Mutable iterator over all separate cohorts (non-parallel).
pub(crate) fn iter_separate_mut(&mut self) -> impl Iterator<Item = &mut dyn DynCohortVecs> {
let mut v: Vec<&mut dyn DynCohortVecs> = Vec::new();
v.extend(
self.age_range
.iter_mut()
.map(|x| x as &mut dyn DynCohortVecs),
);
v.extend(self.epoch.iter_mut().map(|x| x as &mut dyn DynCohortVecs));
v.extend(self.year.iter_mut().map(|x| x as &mut dyn DynCohortVecs));
v.extend(
self.amount_range
.iter_mut()
.map(|x| x as &mut dyn DynCohortVecs),
);
v.extend(self.type_.iter_mut().map(|x| x as &mut dyn DynCohortVecs));
v.into_iter()
collect_separate!(self, iter_mut, &mut dyn DynCohortVecs).into_iter()
}
pub(crate) fn compute_overlapping_vecs(
@@ -310,13 +277,9 @@ impl UTXOCohorts<Rw> {
// sth: aggregate of matching age_range
{
let sth_filter = self.sth.metrics.filter().clone();
let matching: Vec<_> = age_range
let sources_dyn: Vec<&dyn CohortMetricsBase> = age_range
.iter()
.filter(|v| sth_filter.includes(v.metrics.filter()))
.collect();
let sources_dyn: Vec<&dyn CohortMetricsBase> = matching
.iter()
.map(|v| &v.metrics as &dyn CohortMetricsBase)
.collect();
self.sth
@@ -327,13 +290,9 @@ impl UTXOCohorts<Rw> {
// lth: aggregate of matching age_range
{
let lth_filter = self.lth.metrics.filter().clone();
let matching: Vec<_> = age_range
let sources_dyn: Vec<&dyn CohortMetricsBase> = age_range
.iter()
.filter(|v| lth_filter.includes(v.metrics.filter()))
.collect();
let sources_dyn: Vec<&dyn CohortMetricsBase> = matching
.iter()
.map(|v| &v.metrics as &dyn CohortMetricsBase)
.collect();
self.lth
@@ -343,54 +302,36 @@ impl UTXOCohorts<Rw> {
// min_age: base from matching age_range
self.min_age
.iter_mut()
.collect::<Vec<_>>()
.into_par_iter()
.par_iter_mut()
.try_for_each(|vecs| -> Result<()> {
let filter = vecs.metrics.filter().clone();
let matching: Vec<_> = age_range
let sources_dyn: Vec<&dyn CohortMetricsBase> = age_range
.iter()
.filter(|v| filter.includes(v.metrics.filter()))
.collect();
let sources_dyn: Vec<&dyn CohortMetricsBase> = matching
.iter()
.map(|v| &v.metrics as &dyn CohortMetricsBase)
.collect();
vecs.metrics
.compute_base_from_others(starting_indexes, &sources_dyn, exit)?;
Ok(())
.compute_base_from_others(starting_indexes, &sources_dyn, exit)
})?;
// max_age: base + peak_regret from matching age_range
self.max_age
.iter_mut()
.collect::<Vec<_>>()
.into_par_iter()
.par_iter_mut()
.try_for_each(|vecs| -> Result<()> {
let filter = vecs.metrics.filter().clone();
let matching: Vec<_> = age_range
let sources_dyn: Vec<&dyn CohortMetricsBase> = age_range
.iter()
.filter(|v| filter.includes(v.metrics.filter()))
.collect();
let sources_dyn: Vec<&dyn CohortMetricsBase> = matching
.iter()
.map(|v| &v.metrics as &dyn CohortMetricsBase)
.collect();
vecs.metrics
.compute_base_from_others(starting_indexes, &sources_dyn, exit)?;
Ok(())
.compute_base_from_others(starting_indexes, &sources_dyn, exit)
})?;
// ge_amount, lt_amount: base only from matching amount_range
self.ge_amount
.iter_mut()
.chain(self.lt_amount.iter_mut())
.collect::<Vec<_>>()
.into_par_iter()
.par_iter_mut()
.chain(self.lt_amount.par_iter_mut())
.try_for_each(|vecs| {
let filter = vecs.metrics.filter().clone();
let sources_dyn: Vec<&dyn CohortMetricsBase> = amount_range
@@ -415,7 +356,7 @@ impl UTXOCohorts<Rw> {
) -> Result<()> {
// 1. Compute all metrics except net_sentiment (all cohorts via DynCohortVecs)
{
let mut all: Vec<&mut dyn DynCohortVecs> = Vec::new();
let mut all: Vec<&mut dyn DynCohortVecs> = Vec::with_capacity(Self::SEPARATE_COHORT_CAPACITY + 3);
all.push(&mut self.all);
all.push(&mut self.sth);
all.push(&mut self.lth);
@@ -708,7 +649,7 @@ impl UTXOCohorts<Rw> {
pub(crate) fn par_iter_vecs_mut(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
let mut vecs: Vec<&mut dyn AnyStoredVec> = Vec::new();
let mut vecs: Vec<&mut dyn AnyStoredVec> = Vec::with_capacity(2048);
vecs.extend(self.all.metrics.collect_all_vecs_mut());
vecs.extend(self.sth.metrics.collect_all_vecs_mut());
vecs.extend(self.lth.metrics.collect_all_vecs_mut());
@@ -777,229 +718,6 @@ impl UTXOCohorts<Rw> {
.try_for_each(|v| v.reset_cost_basis_data_if_needed())
}
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
pub(crate) fn truncate_push_aggregate_percentiles(
&mut self,
height: Height,
spot: Cents,
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
// Collect (filter, entries, total_sats, total_usd) from age_range cohorts.
let age_range_data: Vec<_> = self
.age_range
.iter()
.filter_map(|sub| {
let state = sub.state.as_ref()?;
let mut total_sats: u64 = 0;
let mut total_usd: u128 = 0;
let entries: Vec<(Cents, Sats)> = state
.cost_basis_data_iter()
.map(|(price, &sats)| {
let sats_u64 = u64::from(sats);
let price_u128 = price.as_u128();
total_sats += sats_u64;
total_usd += price_u128 * sats_u64 as u128;
(price, sats)
})
.collect();
Some((sub.filter().clone(), entries, total_sats, total_usd))
})
.collect();
// Build list of (filter, cost_basis_extended, cohort_name) for aggregate cohorts
struct AggregateTarget<'a> {
filter: Filter,
extended: &'a mut crate::distribution::metrics::CostBasisExtended,
cohort_name: Option<&'static str>,
}
let mut targets = [
AggregateTarget {
filter: self.all.metrics.filter().clone(),
extended: &mut self.all.metrics.cost_basis.extended,
cohort_name: Some("all"),
},
AggregateTarget {
filter: self.sth.metrics.filter().clone(),
extended: &mut self.sth.metrics.cost_basis.extended,
cohort_name: Some(TERM_NAMES.short.id),
},
AggregateTarget {
filter: self.lth.metrics.filter().clone(),
extended: &mut self.lth.metrics.cost_basis.extended,
cohort_name: Some(TERM_NAMES.long.id),
},
];
for target in targets.iter_mut() {
let filter = &target.filter;
let mut total_sats: u64 = 0;
let mut total_usd: u128 = 0;
let relevant: Vec<_> = age_range_data
.iter()
.filter(|(sub_filter, _, _, _)| filter.includes(sub_filter))
.map(|(_, entries, cohort_sats, cohort_usd)| {
total_sats += cohort_sats;
total_usd += cohort_usd;
entries
})
.collect();
if total_sats == 0 {
let nan_prices = [Cents::ZERO; PERCENTILES_LEN];
target
.extended
.percentiles
.truncate_push(height, &nan_prices)?;
target
.extended
.invested_capital
.truncate_push(height, &nan_prices)?;
target
.extended
.spot_cost_basis_percentile
.bps
.height
.truncate_push(height, BasisPoints16::ZERO)?;
target
.extended
.spot_invested_capital_percentile
.bps
.height
.truncate_push(height, BasisPoints16::ZERO)?;
continue;
}
// K-way merge using min-heap
let mut heap: BinaryHeap<Reverse<(Cents, usize, usize)>> = BinaryHeap::new();
for (cohort_idx, entries) in relevant.iter().enumerate() {
if !entries.is_empty() {
heap.push(Reverse((entries[0].0, cohort_idx, 0)));
}
}
let sat_targets = PERCENTILES.map(|p| total_sats * u64::from(p) / 100);
let usd_targets = PERCENTILES.map(|p| total_usd * u128::from(p) / 100);
let mut sat_result = [Cents::ZERO; PERCENTILES_LEN];
let mut usd_result = [Cents::ZERO; PERCENTILES_LEN];
let mut cumsum_sats: u64 = 0;
let mut cumsum_usd: u128 = 0;
let mut sat_idx = 0;
let mut usd_idx = 0;
let mut current_price: Option<Cents> = None;
let mut sats_at_price: u64 = 0;
let mut usd_at_price: u128 = 0;
let collect_merged = date_opt.is_some();
let max_unique_prices = if collect_merged {
relevant.iter().map(|e| e.len()).max().unwrap_or(0)
} else {
0
};
let mut merged: Vec<(CentsCompact, Sats)> = Vec::with_capacity(max_unique_prices);
let mut finalize_price = |price: Cents, sats: u64, usd: u128| {
cumsum_sats += sats;
cumsum_usd += usd;
if sat_idx < PERCENTILES_LEN || usd_idx < PERCENTILES_LEN {
while sat_idx < PERCENTILES_LEN && cumsum_sats >= sat_targets[sat_idx] {
sat_result[sat_idx] = price;
sat_idx += 1;
}
while usd_idx < PERCENTILES_LEN && cumsum_usd >= usd_targets[usd_idx] {
usd_result[usd_idx] = price;
usd_idx += 1;
}
}
if collect_merged {
let rounded: CentsCompact =
price.round_to_dollar(COST_BASIS_PRICE_DIGITS).into();
if let Some((last_price, last_sats)) = merged.last_mut()
&& *last_price == rounded
{
*last_sats += Sats::from(sats);
} else {
merged.push((rounded, Sats::from(sats)));
}
}
};
while let Some(Reverse((price, cohort_idx, entry_idx))) = heap.pop() {
let entries = relevant[cohort_idx];
let (_, amount) = entries[entry_idx];
let amount_u64 = u64::from(amount);
let price_u128 = price.as_u128();
if let Some(prev_price) = current_price
&& prev_price != price
{
finalize_price(prev_price, sats_at_price, usd_at_price);
sats_at_price = 0;
usd_at_price = 0;
}
current_price = Some(price);
sats_at_price += amount_u64;
usd_at_price += price_u128 * amount_u64 as u128;
let next_idx = entry_idx + 1;
if next_idx < entries.len() {
heap.push(Reverse((entries[next_idx].0, cohort_idx, next_idx)));
}
}
if let Some(price) = current_price {
finalize_price(price, sats_at_price, usd_at_price);
}
target
.extended
.percentiles
.truncate_push(height, &sat_result)?;
target
.extended
.invested_capital
.truncate_push(height, &usd_result)?;
let rank = compute_spot_percentile_rank(&sat_result, spot);
target
.extended
.spot_cost_basis_percentile
.bps
.height
.truncate_push(height, rank)?;
let rank = compute_spot_percentile_rank(&usd_result, spot);
target
.extended
.spot_invested_capital_percentile
.bps
.height
.truncate_push(height, rank)?;
// Write daily cost basis snapshot
if let Some(date) = date_opt
&& let Some(cohort_name) = target.cohort_name
{
let dir = states_path.join(format!("utxo_{cohort_name}_cost_basis/by_date"));
fs::create_dir_all(&dir)?;
let path = dir.join(date.to_string());
fs::write(
path,
CostBasisDistribution::serialize_iter(merged.into_iter())?,
)?;
}
}
Ok(())
}
/// Validate computed versions for all cohorts.
pub(crate) fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
// Validate separate cohorts
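
The collect_separate! macro above folds five hand-written extend blocks into one definition parameterized by the iterator method and the trait-object type. A reduced sketch of the same macro shape, with hypothetical cohort types:

trait DynCohort {
    fn flush(&mut self);
}

struct A;
impl DynCohort for A {
    fn flush(&mut self) {}
}

struct Cohorts {
    age: Vec<A>,
    epoch: Vec<A>,
}

macro_rules! collect_groups {
    ($self:expr, $method:ident, $trait_ref:ty) => {{
        // One arm covers both `iter` -> `&dyn` and `iter_mut` -> `&mut dyn`.
        let mut v: Vec<$trait_ref> = Vec::new();
        v.extend($self.age.$method().map(|x| x as $trait_ref));
        v.extend($self.epoch.$method().map(|x| x as $trait_ref));
        v
    }};
}

impl Cohorts {
    fn iter_separate_mut(&mut self) -> impl Iterator<Item = &mut dyn DynCohort> {
        collect_groups!(self, iter_mut, &mut dyn DynCohort).into_iter()
    }
}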

View File

@@ -1,4 +1,5 @@
mod groups;
mod percentiles;
mod receive;
mod send;
mod tick_tock;

View File

@@ -0,0 +1,322 @@
use std::{cmp::Reverse, collections::BinaryHeap, fs, path::Path};
use brk_cohort::{Filtered, TERM_NAMES};
use brk_error::Result;
use brk_types::{
BasisPoints16, Cents, CentsCompact, CostBasisDistribution, Date, Height, Sats,
};
use vecdb::WritableVec;
use crate::internal::{PERCENTILES, PERCENTILES_LEN, compute_spot_percentile_rank};
use crate::distribution::metrics::{CohortMetricsBase, CostBasisExtended};
use super::groups::UTXOCohorts;
/// Significant digits for cost basis prices (after rounding to dollars).
const COST_BASIS_PRICE_DIGITS: i32 = 5;
impl UTXOCohorts {
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
///
/// Single K-way merge pass over all age_range cohorts computes percentiles
/// for all 3 targets simultaneously, since each cohort belongs to exactly
/// one of STH/LTH and always contributes to ALL.
///
/// Uses a BinaryHeap over the BTreeMap iterators directly: an O(N log K)
/// merge with zero intermediate Vec allocation.
pub(crate) fn truncate_push_aggregate_percentiles(
&mut self,
height: Height,
spot: Cents,
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
let collect_merged = date_opt.is_some();
// Phase 1: compute totals + merge.
// Scoped so age_range borrows release before push_target borrows self.all/sth/lth.
let targets = {
let sth_filter = self.sth.metrics.filter().clone();
let mut totals = AllSthLth::<(u64, u128)>::default();
// Collect BTreeMap refs from age_range, skip empty, compute totals.
let maps: Vec<_> = self
.age_range
.iter()
.filter_map(|sub| {
let state = sub.state.as_ref()?;
let map = state.cost_basis_map();
if map.is_empty() {
return None;
}
let is_sth = sth_filter.includes(sub.filter());
let mut cs = 0u64;
let mut cu = 0u128;
for (&price, &sats) in map.iter() {
let s = u64::from(sats);
cs += s;
cu += price.as_u128() * s as u128;
}
totals.all.0 += cs;
totals.all.1 += cu;
let term = totals.term_mut(is_sth);
term.0 += cs;
term.1 += cu;
Some((map, is_sth))
})
.collect();
let cap = if collect_merged {
maps.iter().map(|(m, _)| m.len()).max().unwrap_or(0)
} else {
0
};
let all_has_data = totals.all.0 > 0;
let mut targets = totals.map(|(sats, usd)| PercTarget::new(sats, usd, cap));
// K-way merge via BinaryHeap + BTreeMap iterators (no Vec copies)
if all_has_data {
let mut iters: Vec<_> = maps
.iter()
.map(|(map, is_sth)| (map.iter().peekable(), *is_sth))
.collect();
let mut heap: BinaryHeap<Reverse<(CentsCompact, usize)>> =
BinaryHeap::with_capacity(iters.len());
for (i, (iter, _)) in iters.iter_mut().enumerate() {
if let Some(&(&price, _)) = iter.peek() {
heap.push(Reverse((price, i)));
}
}
let mut current_price: Option<CentsCompact> = None;
let mut early_exit = false;
while let Some(Reverse((price, ci))) = heap.pop() {
let (ref mut iter, is_sth) = iters[ci];
let (_, &sats) = iter.next().unwrap();
let amount = u64::from(sats);
let usd = Cents::from(price).as_u128() * amount as u128;
if let Some(prev) = current_price
&& prev != price
{
targets
.for_each_mut(|t| t.finalize_price(prev.into(), collect_merged));
if !collect_merged && targets.all_match(|t| t.done()) {
early_exit = true;
break;
}
}
current_price = Some(price);
targets.all.accumulate(amount, usd);
targets.term_mut(is_sth).accumulate(amount, usd);
if let Some(&(&next_price, _)) = iter.peek() {
heap.push(Reverse((next_price, ci)));
}
}
if !early_exit
&& let Some(price) = current_price
{
targets.for_each_mut(|t| t.finalize_price(price.into(), collect_merged));
}
}
targets
};
// Phase 2: push results (borrows self.all/sth/lth mutably)
push_target(
height, spot, date_opt, states_path, targets.all,
&mut self.all.metrics.cost_basis.extended, "all",
)?;
push_target(
height, spot, date_opt, states_path, targets.sth,
&mut self.sth.metrics.cost_basis.extended, TERM_NAMES.short.id,
)?;
push_target(
height, spot, date_opt, states_path, targets.lth,
&mut self.lth.metrics.cost_basis.extended, TERM_NAMES.long.id,
)?;
Ok(())
}
}
struct AllSthLth<T> {
all: T,
sth: T,
lth: T,
}
impl<T: Default> Default for AllSthLth<T> {
fn default() -> Self {
Self {
all: T::default(),
sth: T::default(),
lth: T::default(),
}
}
}
impl<T> AllSthLth<T> {
fn term_mut(&mut self, is_sth: bool) -> &mut T {
if is_sth { &mut self.sth } else { &mut self.lth }
}
fn map<U>(self, mut f: impl FnMut(T) -> U) -> AllSthLth<U> {
AllSthLth {
all: f(self.all),
sth: f(self.sth),
lth: f(self.lth),
}
}
fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
f(&mut self.all);
f(&mut self.sth);
f(&mut self.lth);
}
fn all_match(&self, mut f: impl FnMut(&T) -> bool) -> bool {
f(&self.all) && f(&self.sth) && f(&self.lth)
}
}
struct PercTarget {
total_sats: u64,
total_usd: u128,
cum_sats: u64,
cum_usd: u128,
sat_idx: usize,
usd_idx: usize,
sat_targets: [u64; PERCENTILES_LEN],
usd_targets: [u128; PERCENTILES_LEN],
sat_result: [Cents; PERCENTILES_LEN],
usd_result: [Cents; PERCENTILES_LEN],
price_sats: u64,
price_usd: u128,
merged: Vec<(CentsCompact, Sats)>,
}
impl PercTarget {
fn new(total_sats: u64, total_usd: u128, merged_cap: usize) -> Self {
Self {
sat_targets: if total_sats > 0 {
PERCENTILES.map(|p| total_sats * u64::from(p) / 100)
} else {
[0; PERCENTILES_LEN]
},
usd_targets: if total_usd > 0 {
PERCENTILES.map(|p| total_usd * u128::from(p) / 100)
} else {
[0; PERCENTILES_LEN]
},
total_sats,
total_usd,
cum_sats: 0,
cum_usd: 0,
sat_idx: 0,
usd_idx: 0,
sat_result: [Cents::ZERO; PERCENTILES_LEN],
usd_result: [Cents::ZERO; PERCENTILES_LEN],
price_sats: 0,
price_usd: 0,
merged: Vec::with_capacity(merged_cap),
}
}
#[inline]
fn accumulate(&mut self, amount: u64, usd: u128) {
self.price_sats += amount;
self.price_usd += usd;
}
fn finalize_price(&mut self, price: Cents, collect_merged: bool) {
if collect_merged && self.price_sats > 0 {
let rounded: CentsCompact = price.round_to_dollar(COST_BASIS_PRICE_DIGITS).into();
if let Some((lp, ls)) = self.merged.last_mut()
&& *lp == rounded
{
*ls += Sats::from(self.price_sats);
} else {
self.merged.push((rounded, Sats::from(self.price_sats)));
}
}
self.cum_sats += self.price_sats;
self.cum_usd += self.price_usd;
if self.total_sats > 0 {
while self.sat_idx < PERCENTILES_LEN
&& self.cum_sats >= self.sat_targets[self.sat_idx]
{
self.sat_result[self.sat_idx] = price;
self.sat_idx += 1;
}
}
if self.total_usd > 0 {
while self.usd_idx < PERCENTILES_LEN
&& self.cum_usd >= self.usd_targets[self.usd_idx]
{
self.usd_result[self.usd_idx] = price;
self.usd_idx += 1;
}
}
self.price_sats = 0;
self.price_usd = 0;
}
fn done(&self) -> bool {
(self.total_sats == 0 || self.sat_idx >= PERCENTILES_LEN)
&& (self.total_usd == 0 || self.usd_idx >= PERCENTILES_LEN)
}
}
#[allow(clippy::too_many_arguments)]
fn push_target(
height: Height,
spot: Cents,
date_opt: Option<Date>,
states_path: &Path,
target: PercTarget,
ext: &mut CostBasisExtended,
name: &str,
) -> Result<()> {
ext.percentiles.truncate_push(height, &target.sat_result)?;
ext.invested_capital
.truncate_push(height, &target.usd_result)?;
let sat_rank = if target.total_sats > 0 {
compute_spot_percentile_rank(&target.sat_result, spot)
} else {
BasisPoints16::ZERO
};
ext.spot_cost_basis_percentile
.bps
.height
.truncate_push(height, sat_rank)?;
let usd_rank = if target.total_usd > 0 {
compute_spot_percentile_rank(&target.usd_result, spot)
} else {
BasisPoints16::ZERO
};
ext.spot_invested_capital_percentile
.bps
.height
.truncate_push(height, usd_rank)?;
if let Some(date) = date_opt {
let dir = states_path.join(format!("utxo_{name}_cost_basis/by_date"));
fs::create_dir_all(&dir)?;
fs::write(
dir.join(date.to_string()),
CostBasisDistribution::serialize_iter(target.merged.into_iter())?,
)?;
}
Ok(())
}
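
The core of the new file is a K-way merge that pulls directly from the cohorts' sorted BTreeMap iterators through a min-heap, instead of first copying every cohort's entries into a Vec. A self-contained sketch of that merge shape over plain u64 keys:

use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

/// Merge K sorted maps into one globally ordered stream: O(N log K),
/// no intermediate per-cohort Vec.
fn kway_merge(maps: &[BTreeMap<u64, u64>]) -> Vec<(u64, u64)> {
    let mut iters: Vec<_> = maps.iter().map(|m| m.iter().peekable()).collect();
    let mut heap: BinaryHeap<Reverse<(u64, usize)>> = BinaryHeap::with_capacity(iters.len());
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some(&(&k, _)) = it.peek() {
            heap.push(Reverse((k, i)));
        }
    }
    let mut out = Vec::new();
    // Reverse turns the max-heap into a min-heap, so pop() yields the
    // globally smallest pending key; each pop re-arms that iterator.
    while let Some(Reverse((k, i))) = heap.pop() {
        let (_, &v) = iters[i].next().unwrap();
        out.push((k, v));
        if let Some(&(&nk, _)) = iters[i].peek() {
            heap.push(Reverse((nk, i)));
        }
    }
    out
}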

View File

@@ -1,4 +1,4 @@
use brk_types::{Cents, Height, Timestamp};
use brk_types::{CostBasisSnapshot, Cents, Height, Timestamp};
use vecdb::Rw;
use crate::distribution::state::Transacted;
@@ -23,25 +23,28 @@ impl UTXOCohorts<Rw> {
) {
let supply_state = received.spendable_supply;
// Pre-compute snapshot once for the 3 cohorts sharing the same supply_state
let snapshot = CostBasisSnapshot::from_utxo(price, &supply_state);
// New UTXOs go into up_to_1h, current epoch, and current year
self.age_range
.up_to_1h
.state
.as_mut()
.unwrap()
.receive_utxo(&supply_state, price);
.receive_utxo_snapshot(&supply_state, &snapshot);
self.epoch
.mut_vec_from_height(height)
.state
.as_mut()
.unwrap()
.receive_utxo(&supply_state, price);
.receive_utxo_snapshot(&supply_state, &snapshot);
self.year
.mut_vec_from_timestamp(timestamp)
.state
.as_mut()
.unwrap()
.receive_utxo(&supply_state, price);
.receive_utxo_snapshot(&supply_state, &snapshot);
// Update output type cohorts
self.type_.iter_typed_mut().for_each(|(output_type, vecs)| {
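
receive_utxo previously rebuilt the cost-basis snapshot (a price-times-sats multiplication) inside each of the three cohort updates; the hunk hoists it so the multiplication happens once. A reduced sketch of the hoisting, with hypothetical types:

#[derive(Clone, Copy)]
struct Snapshot {
    price_sats: u128,
}

struct Cohort {
    total: u128,
}

impl Cohort {
    // Consumes the pre-computed snapshot instead of redoing price * sats.
    fn receive_snapshot(&mut self, s: &Snapshot) {
        self.total += s.price_sats;
    }
}

fn receive(price: u64, sats: u64, cohorts: &mut [Cohort]) {
    // Hoisted: one multiplication shared by every cohort with the same state.
    let snapshot = Snapshot {
        price_sats: price as u128 * sats as u128,
    };
    for c in cohorts.iter_mut() {
        c.receive_snapshot(&snapshot);
    }
}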

View File

@@ -4,7 +4,7 @@ use vecdb::{Rw, VecIndex};
use crate::distribution::{
compute::PriceRangeMax,
state::{BlockState, Transacted},
state::{BlockState, CohortState, Transacted},
};
use super::groups::UTXOCohorts;
@@ -50,47 +50,49 @@ impl UTXOCohorts<Rw> {
// This is the max price between receive and send heights
let peak_price = price_range_max.max_between(receive_height, send_height);
// Update age range cohort (direct index lookup)
self.age_range
.get_mut(age)
.state
.as_mut()
.unwrap()
.send_utxo(
&sent.spendable_supply,
current_price,
prev_price,
peak_price,
age,
);
// Update epoch cohort (direct lookup by height)
self.epoch
.mut_vec_from_height(receive_height)
.state
.as_mut()
.unwrap()
.send_utxo(
&sent.spendable_supply,
current_price,
prev_price,
peak_price,
age,
);
// Update year cohort (direct lookup by timestamp)
self.year
.mut_vec_from_timestamp(block_state.timestamp)
.state
.as_mut()
.unwrap()
.send_utxo(
&sent.spendable_supply,
current_price,
prev_price,
peak_price,
age,
);
// Pre-compute once for age_range, epoch, year (all share sent.spendable_supply)
if let Some(pre) = CohortState::precompute_send(
&sent.spendable_supply,
current_price,
prev_price,
peak_price,
age,
) {
self.age_range
.get_mut(age)
.state
.as_mut()
.unwrap()
.send_utxo_precomputed(&sent.spendable_supply, &pre);
self.epoch
.mut_vec_from_height(receive_height)
.state
.as_mut()
.unwrap()
.send_utxo_precomputed(&sent.spendable_supply, &pre);
self.year
.mut_vec_from_timestamp(block_state.timestamp)
.state
.as_mut()
.unwrap()
.send_utxo_precomputed(&sent.spendable_supply, &pre);
} else if sent.spendable_supply.utxo_count > 0 {
// Zero-value UTXOs: just subtract supply
self.age_range.get_mut(age).state.as_mut().unwrap().supply -=
&sent.spendable_supply;
self.epoch
.mut_vec_from_height(receive_height)
.state
.as_mut()
.unwrap()
.supply -= &sent.spendable_supply;
self.year
.mut_vec_from_timestamp(block_state.timestamp)
.state
.as_mut()
.unwrap()
.supply -= &sent.spendable_supply;
}
// Update output type cohorts
sent.by_type
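
The send path gets the same treatment, with one wrinkle: precompute_send returns Option, and None (zero-value UTXOs) falls through to a bare supply subtraction. A sketch of that gate, with hypothetical fields:

#[derive(Clone, Copy)]
struct Supply {
    value: u64,
    utxo_count: u64,
}

struct Pre {
    current_ps: u128,
}

fn precompute_send(s: Supply, price: u64) -> Option<Pre> {
    if s.utxo_count == 0 || s.value == 0 {
        return None; // nothing to realize
    }
    Some(Pre {
        current_ps: price as u128 * s.value as u128,
    })
}

struct Cohort {
    value: u64,
    utxo_count: u64,
    realized: u128,
}

impl Cohort {
    fn send_precomputed(&mut self, s: Supply, pre: &Pre) {
        self.value -= s.value;
        self.utxo_count -= s.utxo_count;
        self.realized += pre.current_ps;
    }
}

fn send_all(cohorts: &mut [Cohort], s: Supply, price: u64) {
    if let Some(pre) = precompute_send(s, price) {
        // Shared Pre: the multiplications happen once for all cohorts.
        for c in cohorts.iter_mut() {
            c.send_precomputed(s, &pre);
        }
    } else if s.utxo_count > 0 {
        // Zero-value UTXOs: only the supply counters change.
        for c in cohorts.iter_mut() {
            c.utxo_count -= s.utxo_count;
        }
    }
}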

View File

@@ -1,5 +1,5 @@
use brk_cohort::AGE_BOUNDARIES;
use brk_types::{ONE_HOUR_IN_SEC, Timestamp};
use brk_types::{CostBasisSnapshot, ONE_HOUR_IN_SEC, Timestamp};
use vecdb::Rw;
use crate::distribution::state::BlockState;
@@ -63,11 +63,13 @@ impl UTXOCohorts<Rw> {
// Move supply from younger cohort to older cohort
for block_state in &chain_state[start_idx..end_idx] {
let snapshot =
CostBasisSnapshot::from_utxo(block_state.price, &block_state.supply);
if let Some(state) = age_cohorts[boundary_idx].as_mut() {
state.decrement(&block_state.supply, block_state.price);
state.decrement_snapshot(&snapshot);
}
if let Some(state) = age_cohorts[boundary_idx + 1].as_mut() {
state.increment(&block_state.supply, block_state.price);
state.increment_snapshot(&snapshot);
}
}
}

View File

@@ -200,8 +200,9 @@ pub(crate) fn process_blocks(
let mut cache = AddressCache::new();
debug!("AddressCache created, entering main loop");
// Reusable hashsets for received addresses (avoid per-block allocation)
// Reusable hashsets (avoid per-block allocation)
let mut received_addresses = ByAddressType::<FxHashSet<TypeIndex>>::default();
let mut seen_senders = ByAddressType::<FxHashSet<TypeIndex>>::default();
// Track earliest chain_state modification from sends (for incremental supply_state writes)
let mut min_supply_modified: Option<Height> = None;
@@ -259,7 +260,7 @@ pub(crate) fn process_blocks(
if input_count > 1 {
txin_iters.collect_block_inputs(first_txinindex + 1, input_count - 1, height)
} else {
(Vec::new(), Vec::new(), Vec::new(), Vec::new())
(&[][..], &[][..], &[][..], &[][..])
};
// Process outputs and inputs in parallel with tick-tock
@@ -274,7 +275,7 @@ pub(crate) fn process_blocks(
// Process outputs (receive)
process_outputs(
txoutindex_to_txindex,
&txoutdata_vec,
txoutdata_vec,
&first_addressindexes,
&cache,
&vr,
@@ -288,10 +289,10 @@ pub(crate) fn process_blocks(
process_inputs(
input_count - 1,
&txinindex_to_txindex[1..], // Skip coinbase
&input_values,
&input_outputtypes,
&input_typeindexes,
&input_prev_heights,
input_values,
input_outputtypes,
input_typeindexes,
input_prev_heights,
&first_addressindexes,
&cache,
&vr,
@@ -346,7 +347,7 @@ pub(crate) fn process_blocks(
// Push current block state before processing cohort updates
chain_state.push(BlockState {
supply: transacted.spendable_supply.clone(),
supply: transacted.spendable_supply,
price: block_price,
timestamp,
});
@@ -396,6 +397,7 @@ pub(crate) fn process_blocks(
height_to_timestamp_vec,
height,
timestamp,
&mut seen_senders,
)
.unwrap();
});
@@ -510,15 +512,21 @@ fn push_cohort_states(
height: Height,
height_price: Cents,
) -> Result<()> {
utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price)
})?;
address_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price)
})?;
let (r1, r2) = rayon::join(
|| {
utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price)
})
},
|| {
address_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price)
})
},
);
r1?;
r2?;
Ok(())
}
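
The two sequential try_for_each passes become a rayon::join so UTXO and address cohorts are pushed in parallel; both closures return Result, and both are checked afterwards so neither error is dropped. A minimal sketch, assuming the rayon crate:

fn push_both() -> Result<(), String> {
    let (r1, r2): (Result<(), String>, Result<(), String>) = rayon::join(
        || Ok(()), // e.g. push UTXO cohort states
        || Ok(()), // e.g. push address cohort states
    );
    // join always runs both closures to completion, so propagate afterwards.
    r1?;
    r2?;
    Ok(())
}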

View File

@@ -25,6 +25,7 @@ pub struct TxOutReaders<'a> {
values_buf: Vec<Sats>,
outputtypes_buf: Vec<OutputType>,
typeindexes_buf: Vec<TypeIndex>,
txoutdata_buf: Vec<TxOutData>,
}
impl<'a> TxOutReaders<'a> {
@@ -34,6 +35,7 @@ impl<'a> TxOutReaders<'a> {
values_buf: Vec::new(),
outputtypes_buf: Vec::new(),
typeindexes_buf: Vec::new(),
txoutdata_buf: Vec::new(),
}
}
@@ -42,7 +44,7 @@ impl<'a> TxOutReaders<'a> {
&mut self,
first_txoutindex: usize,
output_count: usize,
) -> Vec<TxOutData> {
) -> &[TxOutData] {
let end = first_txoutindex + output_count;
self.indexer.vecs.outputs.value.collect_range_into_at(
first_txoutindex,
@@ -60,25 +62,32 @@ impl<'a> TxOutReaders<'a> {
&mut self.typeindexes_buf,
);
self.values_buf
.iter()
.zip(&self.outputtypes_buf)
.zip(&self.typeindexes_buf)
.map(|((&value, &outputtype), &typeindex)| TxOutData {
value,
outputtype,
typeindex,
})
.collect()
self.txoutdata_buf.clear();
self.txoutdata_buf.extend(
self.values_buf
.iter()
.zip(&self.outputtypes_buf)
.zip(&self.typeindexes_buf)
.map(|((&value, &outputtype), &typeindex)| TxOutData {
value,
outputtype,
typeindex,
}),
);
&self.txoutdata_buf
}
}
/// Readers for txin vectors. Reuses outpoint buffer across blocks.
/// Readers for txin vectors. Reuses all buffers across blocks.
pub struct TxInReaders<'a> {
indexer: &'a Indexer,
txins: &'a inputs::Vecs,
txindex_to_height: &'a mut RangeMap<TxIndex, Height>,
outpoints_buf: Vec<OutPoint>,
values_buf: Vec<Sats>,
prev_heights_buf: Vec<Height>,
outputtypes_buf: Vec<OutputType>,
typeindexes_buf: Vec<TypeIndex>,
}
impl<'a> TxInReaders<'a> {
@@ -92,45 +101,45 @@ impl<'a> TxInReaders<'a> {
txins,
txindex_to_height,
outpoints_buf: Vec::new(),
values_buf: Vec::new(),
prev_heights_buf: Vec::new(),
outputtypes_buf: Vec::new(),
typeindexes_buf: Vec::new(),
}
}
/// Collect input data for a block range using bulk reads.
/// Outpoint buffer is reused across blocks; returned vecs are fresh (caller-owned).
/// Collect input data for a block range using bulk reads with buffer reuse.
pub(crate) fn collect_block_inputs(
&mut self,
first_txinindex: usize,
input_count: usize,
current_height: Height,
) -> (Vec<Sats>, Vec<Height>, Vec<OutputType>, Vec<TypeIndex>) {
) -> (&[Sats], &[Height], &[OutputType], &[TypeIndex]) {
let end = first_txinindex + input_count;
let values: Vec<Sats> = self
.txins
.spent
.value
.collect_range_at(first_txinindex, end);
self.txins.spent.value.collect_range_into_at(
first_txinindex,
end,
&mut self.values_buf,
);
self.indexer.vecs.inputs.outpoint.collect_range_into_at(
first_txinindex,
end,
&mut self.outpoints_buf,
);
let outputtypes: Vec<OutputType> = self
.indexer
.vecs
.inputs
.outputtype
.collect_range_at(first_txinindex, end);
let typeindexes: Vec<TypeIndex> = self
.indexer
.vecs
.inputs
.typeindex
.collect_range_at(first_txinindex, end);
self.indexer.vecs.inputs.outputtype.collect_range_into_at(
first_txinindex,
end,
&mut self.outputtypes_buf,
);
self.indexer.vecs.inputs.typeindex.collect_range_into_at(
first_txinindex,
end,
&mut self.typeindexes_buf,
);
let prev_heights: Vec<Height> = self
.outpoints_buf
.iter()
.map(|outpoint| {
self.prev_heights_buf.clear();
self.prev_heights_buf.extend(
self.outpoints_buf.iter().map(|outpoint| {
if outpoint.is_coinbase() {
current_height
} else {
@@ -138,10 +147,15 @@ impl<'a> TxInReaders<'a> {
.get(outpoint.txindex())
.unwrap_or(current_height)
}
})
.collect();
}),
);
(values, prev_heights, outputtypes, typeindexes)
(
&self.values_buf,
&self.prev_heights_buf,
&self.outputtypes_buf,
&self.typeindexes_buf,
)
}
}
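
Returning &[T] views into long-lived buffers, instead of fresh Vecs, means each reader allocates once for the whole run; the borrow ends before the next call, which clears and refills the buffer. A sketch of the shape with a hypothetical reader:

struct Reader {
    values_buf: Vec<u64>,
}

impl Reader {
    // The returned slice borrows self, so the caller must be done with it
    // before calling collect_range again; clear() keeps the capacity.
    fn collect_range(&mut self, start: u64, end: u64) -> &[u64] {
        self.values_buf.clear();
        self.values_buf.extend(start..end); // stand-in for a bulk vecdb read
        &self.values_buf
    }
}

fn demo(r: &mut Reader) -> u64 {
    let mut total = 0;
    for block in 0..3u64 {
        let vals = r.collect_range(block * 10, block * 10 + 10);
        total += vals.iter().sum::<u64>(); // use, then drop the borrow
    }
    total
}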

View File

@@ -70,7 +70,7 @@ pub(crate) fn write(
vecs.supply_state
.truncate_if_needed(Height::from(truncate_to))?;
for block_state in &chain_state[truncate_to..] {
vecs.supply_state.push(block_state.supply.clone());
vecs.supply_state.push(block_state.supply);
}
vecs.any_address_indexes

View File

@@ -109,30 +109,19 @@ impl ActivityMetrics {
others: &[&Self],
exit: &Exit,
) -> Result<()> {
self.sent.base.sats.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.sent.base.sats.height)
.collect::<Vec<_>>(),
exit,
)?;
self.satblocks_destroyed.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.satblocks_destroyed)
.collect::<Vec<_>>(),
exit,
)?;
self.satdays_destroyed.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.satdays_destroyed)
.collect::<Vec<_>>(),
exit,
)?;
macro_rules! sum_others {
($($field:tt).+) => {
self.$($field).+.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.$($field).+).collect::<Vec<_>>(),
exit,
)?
};
}
sum_others!(sent.base.sats.height);
sum_others!(satblocks_destroyed);
sum_others!(satdays_destroyed);
Ok(())
}
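
The sum_others! macro matches a dotted field path as repeated tt fragments, so one arm serves fields at any nesting depth. A reduced sketch of the same shape, with hypothetical metric types:

struct Series {
    total: u64,
}

impl Series {
    fn sum_of_others(&mut self, others: &[&Series]) {
        self.total = others.iter().map(|o| o.total).sum();
    }
}

struct Inner {
    height: Series,
}

struct Metrics {
    sent: Inner,
    destroyed: Series,
}

impl Metrics {
    fn aggregate(&mut self, others: &[&Metrics]) {
        // $($field:tt).+ matches `sent.height` just as well as `destroyed`.
        macro_rules! sum_others {
            ($($field:tt).+) => {
                self.$($field).+.sum_of_others(
                    &others.iter().map(|v| &v.$($field).+).collect::<Vec<_>>(),
                )
            };
        }
        sum_others!(sent.height);
        sum_others!(destroyed);
    }
}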

View File

@@ -157,36 +157,22 @@ impl BasicCohortMetrics {
others: &[&Self],
exit: &Exit,
) -> Result<()> {
self.supply.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.supply).collect::<Vec<_>>(),
exit,
)?;
self.outputs.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.outputs).collect::<Vec<_>>(),
exit,
)?;
self.activity.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.activity).collect::<Vec<_>>(),
exit,
)?;
self.realized.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.realized).collect::<Vec<_>>(),
exit,
)?;
self.unrealized.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.unrealized).collect::<Vec<_>>(),
exit,
)?;
self.cost_basis.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.cost_basis).collect::<Vec<_>>(),
exit,
)?;
macro_rules! aggregate {
($field:ident) => {
self.$field.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &*v.$field).collect::<Vec<_>>(),
exit,
)?
};
}
aggregate!(supply);
aggregate!(outputs);
aggregate!(activity);
aggregate!(realized);
aggregate!(unrealized);
aggregate!(cost_basis);
Ok(())
}
}

View File

@@ -187,42 +187,22 @@ pub trait CohortMetricsBase: Send + Sync {
where
Self: Sized,
{
self.supply_mut().compute_from_stateful(
starting_indexes,
&others.iter().map(|v| v.supply()).collect::<Vec<_>>(),
exit,
)?;
self.outputs_mut().compute_from_stateful(
starting_indexes,
&others.iter().map(|v| v.outputs()).collect::<Vec<_>>(),
exit,
)?;
self.activity_mut().compute_from_stateful(
starting_indexes,
&others.iter().map(|v| v.activity()).collect::<Vec<_>>(),
exit,
)?;
self.realized_base_mut().compute_from_stateful(
starting_indexes,
&others.iter().map(|v| v.realized_base()).collect::<Vec<_>>(),
exit,
)?;
self.unrealized_base_mut().compute_from_stateful(
starting_indexes,
&others
.iter()
.map(|v| v.unrealized_base())
.collect::<Vec<_>>(),
exit,
)?;
self.cost_basis_base_mut().compute_from_stateful(
starting_indexes,
&others
.iter()
.map(|v| v.cost_basis_base())
.collect::<Vec<_>>(),
exit,
)?;
macro_rules! aggregate {
($self_mut:ident, $accessor:ident) => {
self.$self_mut().compute_from_stateful(
starting_indexes,
&others.iter().map(|v| v.$accessor()).collect::<Vec<_>>(),
exit,
)?
};
}
aggregate!(supply_mut, supply);
aggregate!(outputs_mut, outputs);
aggregate!(activity_mut, activity);
aggregate!(realized_base_mut, realized_base);
aggregate!(unrealized_base_mut, unrealized_base);
aggregate!(cost_basis_base_mut, cost_basis_base);
Ok(())
}
}

View File

@@ -233,21 +233,24 @@ impl RealizedBase {
}
pub(crate) fn min_stateful_height_len(&self) -> usize {
self.realized_cap
.height
.len()
.min(self.realized_profit.height.len())
.min(self.realized_loss.height.len())
.min(self.investor_price.cents.height.len())
.min(self.cap_raw.len())
.min(self.investor_cap_raw.len())
.min(self.profit_value_created.height.len())
.min(self.profit_value_destroyed.height.len())
.min(self.loss_value_created.height.len())
.min(self.loss_value_destroyed.height.len())
.min(self.peak_regret.height.len())
.min(self.sent_in_profit.base.sats.height.len())
.min(self.sent_in_loss.base.sats.height.len())
[
self.realized_cap.height.len(),
self.realized_profit.height.len(),
self.realized_loss.height.len(),
self.investor_price.cents.height.len(),
self.cap_raw.len(),
self.investor_cap_raw.len(),
self.profit_value_created.height.len(),
self.profit_value_destroyed.height.len(),
self.loss_value_created.height.len(),
self.loss_value_destroyed.height.len(),
self.peak_regret.height.len(),
self.sent_in_profit.base.sats.height.len(),
self.sent_in_loss.base.sats.height.len(),
]
.into_iter()
.min()
.unwrap()
}
pub(crate) fn truncate_push(&mut self, height: Height, state: &RealizedState) -> Result<()> {
@@ -320,30 +323,19 @@ impl RealizedBase {
others: &[&Self],
exit: &Exit,
) -> Result<()> {
self.realized_cap_cents.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.realized_cap_cents.height)
.collect::<Vec<_>>(),
exit,
)?;
self.realized_profit.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.realized_profit.height)
.collect::<Vec<_>>(),
exit,
)?;
self.realized_loss.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.realized_loss.height)
.collect::<Vec<_>>(),
exit,
)?;
macro_rules! sum_others {
($($field:tt).+) => {
self.$($field).+.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.$($field).+).collect::<Vec<_>>(),
exit,
)?
};
}
sum_others!(realized_cap_cents.height);
sum_others!(realized_profit.height);
sum_others!(realized_loss.height);
// Aggregate raw values for investor_price computation
let investor_price_dep_version = others
@@ -404,62 +396,13 @@ impl RealizedBase {
self.investor_price.cents.height.write()?;
}
self.profit_value_created.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.profit_value_created.height)
.collect::<Vec<_>>(),
exit,
)?;
self.profit_value_destroyed.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.profit_value_destroyed.height)
.collect::<Vec<_>>(),
exit,
)?;
self.loss_value_created.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.loss_value_created.height)
.collect::<Vec<_>>(),
exit,
)?;
self.loss_value_destroyed.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.loss_value_destroyed.height)
.collect::<Vec<_>>(),
exit,
)?;
self.peak_regret.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.peak_regret.height)
.collect::<Vec<_>>(),
exit,
)?;
self.sent_in_profit.base.sats.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.sent_in_profit.base.sats.height)
.collect::<Vec<_>>(),
exit,
)?;
self.sent_in_loss.base.sats.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.sent_in_loss.base.sats.height)
.collect::<Vec<_>>(),
exit,
)?;
sum_others!(profit_value_created.height);
sum_others!(profit_value_destroyed.height);
sum_others!(loss_value_created.height);
sum_others!(loss_value_destroyed.height);
sum_others!(peak_regret.height);
sum_others!(sent_in_profit.base.sats.height);
sum_others!(sent_in_loss.base.sats.height);
Ok(())
}

View File

@@ -186,60 +186,22 @@ impl UnrealizedBase {
others: &[&Self],
exit: &Exit,
) -> Result<()> {
self.supply_in_profit.sats.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.supply_in_profit.sats.height)
.collect::<Vec<_>>(),
exit,
)?;
self.supply_in_loss.sats.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.supply_in_loss.sats.height)
.collect::<Vec<_>>(),
exit,
)?;
self.unrealized_profit.cents.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.unrealized_profit.cents.height)
.collect::<Vec<_>>(),
exit,
)?;
self.unrealized_loss.cents.height.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.unrealized_loss.cents.height)
.collect::<Vec<_>>(),
exit,
)?;
self.invested_capital_in_profit
.cents
.height
.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.invested_capital_in_profit.cents.height)
.collect::<Vec<_>>(),
exit,
)?;
self.invested_capital_in_loss
.cents
.height
.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| &v.invested_capital_in_loss.cents.height)
.collect::<Vec<_>>(),
exit,
)?;
macro_rules! sum_others {
($($field:tt).+) => {
self.$($field).+.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.$($field).+).collect::<Vec<_>>(),
exit,
)?
};
}
sum_others!(supply_in_profit.sats.height);
sum_others!(supply_in_loss.sats.height);
sum_others!(unrealized_profit.cents.height);
sum_others!(unrealized_loss.cents.height);
sum_others!(invested_capital_in_profit.cents.height);
sum_others!(invested_capital_in_loss.cents.height);
// Raw values for aggregation
let start = self

View File

@@ -1,10 +1,20 @@
use std::path::Path;
use std::{collections::BTreeMap, path::Path};
use brk_error::Result;
use brk_types::{Age, Cents, CentsSats, CostBasisSnapshot, Height, Sats, SupplyState};
use brk_types::{Age, Cents, CentsCompact, CentsSats, CentsSquaredSats, CostBasisSnapshot, Height, Sats, SupplyState};
use super::super::cost_basis::{CostBasisData, Percentiles, RealizedState, UnrealizedState};
pub struct SendPrecomputed {
pub sats: Sats,
pub prev_price: Cents,
pub age: Age,
pub current_ps: CentsSats,
pub prev_ps: CentsSats,
pub ath_ps: CentsSats,
pub prev_investor_cap: CentsSquaredSats,
}
pub struct CohortState {
pub supply: SupplyState,
pub realized: RealizedState,
@@ -73,10 +83,6 @@ impl CohortState {
self.realized.reset_single_iteration_values();
}
pub(crate) fn increment(&mut self, supply: &SupplyState, price: Cents) {
self.increment_snapshot(&CostBasisSnapshot::from_utxo(price, supply));
}
pub(crate) fn increment_snapshot(&mut self, s: &CostBasisSnapshot) {
self.supply += &s.supply_state;
@@ -92,10 +98,6 @@ impl CohortState {
}
}
pub(crate) fn decrement(&mut self, supply: &SupplyState, price: Cents) {
self.decrement_snapshot(&CostBasisSnapshot::from_utxo(price, supply));
}
pub(crate) fn decrement_snapshot(&mut self, s: &CostBasisSnapshot) {
self.supply -= &s.supply_state;
@@ -112,19 +114,27 @@ impl CohortState {
}
pub(crate) fn receive_utxo(&mut self, supply: &SupplyState, price: Cents) {
self.receive_utxo_snapshot(supply, &CostBasisSnapshot::from_utxo(price, supply));
}
/// Like receive_utxo but takes a pre-computed snapshot to avoid redundant multiplication
/// when the same supply/price is used across multiple cohorts.
pub(crate) fn receive_utxo_snapshot(
&mut self,
supply: &SupplyState,
snapshot: &CostBasisSnapshot,
) {
self.supply += supply;
if supply.value > Sats::ZERO {
let sats = supply.value;
self.realized.receive(snapshot.realized_price, supply.value);
// Compute once using typed values
let price_sats = CentsSats::from_price_sats(price, sats);
let investor_cap = price_sats.to_investor_cap(price);
self.realized.receive(price, sats);
self.cost_basis_data
.increment(price, sats, price_sats, investor_cap);
self.cost_basis_data.increment(
snapshot.realized_price,
supply.value,
snapshot.price_sats,
snapshot.investor_cap,
);
}
}
@@ -160,6 +170,51 @@ impl CohortState {
}
}
/// Pre-computed values for send_utxo when the same supply/prices are shared
/// across multiple cohorts (age_range, epoch, year).
pub(crate) fn precompute_send(
supply: &SupplyState,
current_price: Cents,
prev_price: Cents,
ath: Cents,
age: Age,
) -> Option<SendPrecomputed> {
if supply.utxo_count == 0 || supply.value == Sats::ZERO {
return None;
}
let sats = supply.value;
let current_ps = CentsSats::from_price_sats(current_price, sats);
let prev_ps = CentsSats::from_price_sats(prev_price, sats);
let ath_ps = CentsSats::from_price_sats(ath, sats);
let prev_investor_cap = prev_ps.to_investor_cap(prev_price);
Some(SendPrecomputed {
sats,
prev_price,
age,
current_ps,
prev_ps,
ath_ps,
prev_investor_cap,
})
}
pub(crate) fn send_utxo_precomputed(
&mut self,
supply: &SupplyState,
pre: &SendPrecomputed,
) {
self.supply -= supply;
self.sent += pre.sats;
self.satblocks_destroyed += pre.age.satblocks_destroyed(pre.sats);
self.satdays_destroyed += pre.age.satdays_destroyed(pre.sats);
self.realized
.send(pre.sats, pre.current_ps, pre.prev_ps, pre.ath_ps, pre.prev_investor_cap);
self.cost_basis_data
.decrement(pre.prev_price, pre.sats, pre.prev_ps, pre.prev_investor_cap);
}
pub(crate) fn send_utxo(
&mut self,
supply: &SupplyState,
@@ -168,33 +223,10 @@ impl CohortState {
ath: Cents,
age: Age,
) {
if supply.utxo_count == 0 {
return;
}
self.supply -= supply;
if supply.value > Sats::ZERO {
self.sent += supply.value;
self.satblocks_destroyed += age.satblocks_destroyed(supply.value);
self.satdays_destroyed += age.satdays_destroyed(supply.value);
let cp = current_price;
let pp = prev_price;
let ath_price = ath;
let sats = supply.value;
// Compute ONCE using typed values
let current_ps = CentsSats::from_price_sats(cp, sats);
let prev_ps = CentsSats::from_price_sats(pp, sats);
let ath_ps = CentsSats::from_price_sats(ath_price, sats);
let prev_investor_cap = prev_ps.to_investor_cap(pp);
self.realized
.send(sats, current_ps, prev_ps, ath_ps, prev_investor_cap);
self.cost_basis_data
.decrement(pp, sats, prev_ps, prev_investor_cap);
if let Some(pre) = Self::precompute_send(supply, current_price, prev_price, ath, age) {
self.send_utxo_precomputed(supply, &pre);
} else if supply.utxo_count > 0 {
self.supply -= supply;
}
}
@@ -263,7 +295,7 @@ impl CohortState {
self.cost_basis_data.write(height, cleanup)
}
pub(crate) fn cost_basis_data_iter(&self) -> impl Iterator<Item = (Cents, &Sats)> {
self.cost_basis_data.iter().map(|(k, v)| (k.into(), v))
pub(crate) fn cost_basis_map(&self) -> &BTreeMap<CentsCompact, Sats> {
self.cost_basis_data.map()
}
}

View File

@@ -34,6 +34,8 @@ pub struct CostBasisData {
percentiles_dirty: bool,
cached_percentiles: Option<Percentiles>,
rounding_digits: Option<i32>,
/// Monotonically increasing counter, bumped on each apply_pending that makes actual changes.
generation: u64,
}
const STATE_TO_KEEP: usize = 10;
@@ -49,6 +51,7 @@ impl CostBasisData {
percentiles_dirty: true,
cached_percentiles: None,
rounding_digits: None,
generation: 0,
}
}
@@ -93,15 +96,9 @@ impl CostBasisData {
&& self.pending_raw.investor_cap_dec == CentsSquaredSats::ZERO
}
pub(crate) fn iter(&self) -> impl Iterator<Item = (CentsCompact, &Sats)> {
pub(crate) fn map(&self) -> &CostBasisMap {
self.assert_pending_empty();
self.state
.as_ref()
.unwrap()
.base
.map
.iter()
.map(|(&k, v)| (k, v))
&self.state.as_ref().unwrap().base.map
}
pub(crate) fn is_empty(&self) -> bool {
@@ -183,18 +180,14 @@ impl CostBasisData {
}
pub(crate) fn apply_pending(&mut self) {
if !self.pending.is_empty() {
self.percentiles_dirty = true;
if self.pending.is_empty() && self.pending_raw_is_zero() {
return;
}
self.generation = self.generation.wrapping_add(1);
self.percentiles_dirty = true;
let map = &mut self.state.as_mut().unwrap().base.map;
for (cents, (inc, dec)) in self.pending.drain() {
let entry = self
.state
.as_mut()
.unwrap()
.base
.map
.entry(cents)
.or_default();
let entry = map.entry(cents).or_default();
*entry += inc;
if *entry < dec {
panic!(
@@ -211,7 +204,7 @@ impl CostBasisData {
}
*entry -= dec;
if *entry == Sats::ZERO {
self.state.as_mut().unwrap().base.map.remove(&cents);
map.remove(&cents);
}
}
@@ -267,7 +260,8 @@ impl CostBasisData {
if !self.percentiles_dirty {
return self.cached_percentiles;
}
self.cached_percentiles = Percentiles::compute(self.iter().map(|(k, &v)| (k, v)));
self.cached_percentiles =
Percentiles::compute_from_map(&self.state.as_ref().unwrap().base.map);
self.percentiles_dirty = false;
self.cached_percentiles
}

View File

@@ -1,7 +1,9 @@
use brk_types::{Cents, CentsCompact, Sats};
use brk_types::Cents;
use crate::internal::{PERCENTILES, PERCENTILES_LEN};
use super::CostBasisMap;
#[derive(Clone, Copy, Debug)]
pub struct Percentiles {
/// Sat-weighted: percentiles by coin count
@@ -11,19 +13,17 @@ pub struct Percentiles {
}
impl Percentiles {
/// Compute both sat-weighted and USD-weighted percentiles in a single pass.
/// Takes an iterator over (price, sats) pairs, assumed sorted by price ascending.
pub(crate) fn compute(iter: impl Iterator<Item = (CentsCompact, Sats)>) -> Option<Self> {
// Collect to allow two passes: one for totals, one for percentiles
let entries: Vec<_> = iter.collect();
if entries.is_empty() {
/// Compute both sat-weighted and USD-weighted percentiles in two passes over the BTreeMap.
/// Avoids intermediate Vec allocation by iterating the map directly.
pub(crate) fn compute_from_map(map: &CostBasisMap) -> Option<Self> {
if map.is_empty() {
return None;
}
// Compute totals
// First pass: compute totals
let mut total_sats: u64 = 0;
let mut total_usd: u128 = 0;
for &(cents, sats) in &entries {
for (&cents, &sats) in map.iter() {
total_sats += u64::from(sats);
total_usd += cents.as_u128() * sats.as_u128();
}
@@ -32,6 +32,12 @@ impl Percentiles {
return None;
}
// Precompute targets to avoid repeated multiplication in the inner loop
let sat_targets: [u64; PERCENTILES_LEN] =
PERCENTILES.map(|p| total_sats * u64::from(p) / 100);
let usd_targets: [u128; PERCENTILES_LEN] =
PERCENTILES.map(|p| total_usd * u128::from(p) / 100);
let mut sat_weighted = [Cents::ZERO; PERCENTILES_LEN];
let mut usd_weighted = [Cents::ZERO; PERCENTILES_LEN];
let mut cumsum_sats: u64 = 0;
@@ -39,20 +45,17 @@ impl Percentiles {
let mut sat_idx = 0;
let mut usd_idx = 0;
for (cents, sats) in entries {
// Second pass: compute percentiles
for (&cents, &sats) in map.iter() {
cumsum_sats += u64::from(sats);
cumsum_usd += cents.as_u128() * sats.as_u128();
while sat_idx < PERCENTILES_LEN
&& cumsum_sats >= total_sats * u64::from(PERCENTILES[sat_idx]) / 100
{
while sat_idx < PERCENTILES_LEN && cumsum_sats >= sat_targets[sat_idx] {
sat_weighted[sat_idx] = cents.into();
sat_idx += 1;
}
while usd_idx < PERCENTILES_LEN
&& cumsum_usd >= total_usd * u128::from(PERCENTILES[usd_idx]) / 100
{
while usd_idx < PERCENTILES_LEN && cumsum_usd >= usd_targets[usd_idx] {
usd_weighted[usd_idx] = cents.into();
usd_idx += 1;
}
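The rewritten compute_from_map is a standard weighted-percentile sweep: one pass for totals, precomputed per-percentile targets, then a cumulative pass that emits a value each time the running sum crosses the next target. The same algorithm in isolation, over a plain BTreeMap (single-weighted for brevity; the diff tracks sat and USD weights simultaneously):

use std::collections::BTreeMap;

/// Weighted percentiles over a (value -> weight) map. BTreeMap iteration
/// is ascending by key, so a single cumulative sweep suffices.
fn weighted_percentiles(map: &BTreeMap<u64, u64>, pcts: &[u64]) -> Vec<u64> {
    let total: u64 = map.values().sum();
    if total == 0 {
        return Vec::new();
    }
    // Precompute targets once, mirroring sat_targets / usd_targets above.
    let targets: Vec<u64> = pcts.iter().map(|p| total * p / 100).collect();
    let mut out = Vec::with_capacity(pcts.len());
    let (mut cum, mut i) = (0u64, 0usize);
    for (&value, &weight) in map {
        cum += weight;
        while i < targets.len() && cum >= targets[i] {
            out.push(value);
            i += 1;
        }
    }
    out
}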

View File

@@ -6,7 +6,9 @@ use vecdb::{
VecIndex, VecValue, Version,
};
use crate::internal::{ComputedVecValue, DistributionStats};
use crate::internal::{
ComputedVecValue, DistributionStats, compute_aggregations, compute_aggregations_nblock_window,
};
#[derive(Traversable)]
pub struct Distribution<I: VecIndex, T: ComputedVecValue + JsonSchema, M: StorageMode = Rw> {
@@ -50,7 +52,7 @@ impl<I: VecIndex, T: ComputedVecValue + JsonSchema> Distribution<I, T> {
where
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
crate::internal::compute_aggregations(
compute_aggregations(
max_from,
source,
first_indexes,
@@ -87,7 +89,7 @@ impl<I: VecIndex, T: ComputedVecValue + JsonSchema> Distribution<I, T> {
where
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
crate::internal::compute_aggregations_nblock_window(
compute_aggregations_nblock_window(
max_from,
source,
first_indexes,

View File

@@ -6,7 +6,7 @@ use vecdb::{
VecIndex, VecValue, Version,
};
use crate::internal::ComputedVecValue;
use crate::internal::{ComputedVecValue, compute_aggregations};
use super::Distribution;
@@ -43,7 +43,7 @@ impl<I: VecIndex, T: ComputedVecValue + JsonSchema> Full<I, T> {
where
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
crate::internal::compute_aggregations(
compute_aggregations(
max_from,
source,
first_indexes,

View File

@@ -68,11 +68,8 @@ where
} else {
0
};
let partial_values: Vec<f64> = values
.collect_range_at(range_start, end)
.into_iter()
.map(|a| f64::from(a))
.collect();
let mut partial_values: Vec<f64> = Vec::with_capacity(end - range_start);
values.for_each_range_at(range_start, end, |a: A| partial_values.push(f64::from(a)));
let capacity = if skip > 0 && skip < end {
let first_start = window_starts.collect_one_at(skip).unwrap().to_usize();
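This hunk (and the identical one in the next file) replaces collect_range_at, which allocates a Vec per call, with a callback-based for_each_range_at that pushes into a pre-sized buffer. The method names are from the diff; the trait below only illustrates the shape such an API plausibly has, not vecdb's actual definition:

/// Illustrative only: a reader offering both allocating and visitor-style access.
trait RangeRead<T: Copy> {
    fn collect_range_at(&self, start: usize, end: usize) -> Vec<T>;
    /// Calls `f` once per element in [start, end), no intermediate Vec.
    fn for_each_range_at(&self, start: usize, end: usize, f: impl FnMut(T));
}

impl<T: Copy> RangeRead<T> for Vec<T> {
    fn collect_range_at(&self, start: usize, end: usize) -> Vec<T> {
        self[start..end].to_vec()
    }
    fn for_each_range_at(&self, start: usize, end: usize, mut f: impl FnMut(T)) {
        for &x in &self[start..end] {
            f(x);
        }
    }
}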

View File

@@ -45,11 +45,8 @@ where
} else {
0
};
let partial_values: Vec<f64> = values
.collect_range_at(range_start, end)
.into_iter()
.map(|a| f64::from(a))
.collect();
let mut partial_values: Vec<f64> = Vec::with_capacity(end - range_start);
values.for_each_range_at(range_start, end, |a: A| partial_values.push(f64::from(a)));
let capacity = if skip > 0 && skip < end {
let first_start = window_starts.collect_one_at(skip).unwrap().to_usize();

View File

@@ -39,9 +39,8 @@ impl SortedBlocks {
// Find the block where value belongs: first block whose max >= value (clamped to the last block)
let block_idx = self
.blocks
.iter()
.position(|b| *b.last().unwrap() >= value)
.unwrap_or(self.blocks.len() - 1);
.partition_point(|b| *b.last().unwrap() < value)
.min(self.blocks.len() - 1);
let block = &mut self.blocks[block_idx];
let pos = block.partition_point(|a| *a < value);
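partition_point replaces the linear iter().position() scan with a binary search: it returns the index of the first element for which the predicate flips to false, in O(log n), and the .min(..) keeps the clamping behaviour of the old unwrap_or. A small standalone illustration of the semantics:

/// Index of the first block whose max is >= value; clamp to the last
/// block so values above every max still land somewhere.
fn first_block_for(maxes: &[u64], value: u64) -> usize {
    maxes
        .partition_point(|&max| max < value)
        .min(maxes.len().saturating_sub(1))
}

// first_block_for(&[10, 20, 30], 15) == 1
// first_block_for(&[10, 20, 30], 99) == 2 (clamped)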

View File

@@ -126,25 +126,26 @@ impl TDigest {
return;
}
let total: f64 = self.centroids.iter().map(|c| c.weight).sum();
let mut merged: Vec<Centroid> = Vec::with_capacity(self.centroids.len());
let total = self.count as f64;
let mut cum = 0.0;
let mut write_idx = 0;
for c in &self.centroids {
if let Some(last) = merged.last_mut() {
let q = (cum + last.weight / 2.0) / total;
let limit = (4.0 * self.compression * q * (1.0 - q)).floor().max(1.0);
if last.weight + c.weight <= limit {
let new_weight = last.weight + c.weight;
last.mean = (last.mean * last.weight + c.mean * c.weight) / new_weight;
last.weight = new_weight;
continue;
}
for read_idx in 1..self.centroids.len() {
let c = self.centroids[read_idx];
let last = &mut self.centroids[write_idx];
let q = (cum + last.weight / 2.0) / total;
let limit = (4.0 * self.compression * q * (1.0 - q)).floor().max(1.0);
if last.weight + c.weight <= limit {
let new_weight = last.weight + c.weight;
last.mean = (last.mean * last.weight + c.mean * c.weight) / new_weight;
last.weight = new_weight;
} else {
cum += last.weight;
write_idx += 1;
self.centroids[write_idx] = c;
}
merged.push(*c);
}
self.centroids = merged;
self.centroids.truncate(write_idx + 1);
}
/// Batch quantile query in a single pass. `qs` must be sorted ascending.
@@ -167,7 +168,7 @@ impl TDigest {
return;
}
let total: f64 = self.centroids.iter().map(|c| c.weight).sum();
let total = self.count as f64;
let mut cum = 0.0;
let mut ci = 0;
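The merge rewrite drops the scratch Vec<Centroid> in favour of read/write two-index compaction: write trails read, adjacent centroids are folded in place while they fit under the t-digest size limit, and the tail is truncated. The pattern in generic form (the predicate and fold are stand-ins for the weight-limit test and weighted-mean merge above):

/// In-place adjacent merge: `can_merge` decides, `combine` folds, no allocation.
fn merge_adjacent<T: Copy>(
    items: &mut Vec<T>,
    can_merge: impl Fn(&T, &T) -> bool,
    combine: impl Fn(T, T) -> T,
) {
    if items.is_empty() {
        return;
    }
    let mut write = 0;
    for read in 1..items.len() {
        let next = items[read];
        if can_merge(&items[write], &next) {
            items[write] = combine(items[write], next);
        } else {
            write += 1;
            items[write] = next;
        }
    }
    items.truncate(write + 1);
}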

View File

@@ -33,20 +33,16 @@ impl<B: BpsType> PercentFromHeight<B> {
indexes: &indexes::Vecs,
) -> Result<Self> {
let bps = ComputedFromHeight::forced_import(db, &format!("{name}_bps"), version, indexes)?;
let bps_clone = bps.height.read_only_boxed_clone();
let ratio = LazyFromHeight::from_computed::<B::ToRatio>(
&format!("{name}_ratio"),
version,
bps.height.read_only_boxed_clone(),
bps_clone.clone(),
&bps,
);
let percent = LazyFromHeight::from_computed::<B::ToPercent>(
name,
version,
bps.height.read_only_boxed_clone(),
&bps,
);
let percent = LazyFromHeight::from_computed::<B::ToPercent>(name, version, bps_clone, &bps);
Ok(Self {
bps,

View File

@@ -8,7 +8,7 @@ use vecdb::{
use crate::{
blocks, indexes,
internal::{ComputedFromHeightStdDevExtended, Price, TDigest},
internal::{ComputedFromHeightStdDevExtended, Price, PriceTimesRatioBp32Cents, TDigest},
};
use super::{super::ComputedFromHeight, ComputedFromHeightRatio};
@@ -200,8 +200,6 @@ impl ComputedFromHeightRatioExtension {
metric_price: &impl ReadableVec<Height, Cents>,
exit: &Exit,
) -> Result<()> {
use crate::internal::PriceTimesRatioBp32Cents;
macro_rules! compute_band {
($usd_field:ident, $band_source:expr) => {
self.$usd_field

View File

@@ -6,9 +6,10 @@ use vecdb::{
WritableVec,
};
use crate::{blocks, indexes};
use crate::internal::{ComputedFromHeight, Price};
use crate::{
blocks, indexes,
internal::{ComputedFromHeight, Price, PriceTimesRatioCents},
};
use super::ComputedFromHeightStdDev;
@@ -172,8 +173,7 @@ impl ComputedFromHeightStdDevExtended {
const MULTIPLIERS: [f32; 12] = [
0.5, 1.0, 1.5, 2.0, 2.5, 3.0, -0.5, -1.0, -1.5, -2.0, -2.5, -3.0,
];
let band_vecs: Vec<_> = self.mut_band_height_vecs().collect();
for (vec, mult) in band_vecs.into_iter().zip(MULTIPLIERS) {
for (vec, mult) in self.mut_band_height_vecs().zip(MULTIPLIERS) {
for (offset, _) in source_data.iter().enumerate() {
let index = start + offset;
let average = sma_data[offset];
@@ -214,8 +214,6 @@ impl ComputedFromHeightStdDevExtended {
metric_price: &impl ReadableVec<Height, Cents>,
exit: &Exit,
) -> Result<()> {
use crate::internal::PriceTimesRatioCents;
macro_rules! compute_band {
($usd_field:ident, $band_source:expr) => {
self.$usd_field

View File

@@ -52,6 +52,19 @@ impl ComputedFromHeightStdDev {
exit: &Exit,
source: &impl ReadableVec<Height, StoredF32>,
) -> Result<()> {
if self.days == usize::MAX {
self.sma
.height
.compute_sma_(starting_indexes.height, source, usize::MAX, exit, None)?;
self.sd.height.compute_expanding_sd(
starting_indexes.height,
source,
&self.sma.height,
exit,
)?;
return Ok(());
}
let window_starts = blocks.count.start_vec(self.days);
self.sma.height.compute_rolling_average(
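Here days == usize::MAX acts as an "entire history" sentinel: instead of a rolling window, the SMA and SD expand from genesis. A toy sketch of the expanding-mean branch (compute_sma_ and compute_expanding_sd are the crate's names; this helper is only illustrative):

/// Expanding mean: each position averages everything seen so far.
fn expanding_mean(source: &[f64]) -> Vec<f64> {
    let mut out = Vec::with_capacity(source.len());
    let mut sum = 0.0;
    for (i, &v) in source.iter().enumerate() {
        sum += v;
        out.push(sum / (i + 1) as f64);
    }
    out
}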

View File

@@ -144,11 +144,10 @@ impl Vecs {
}
// Lump-sum returns per holding period, computed from the lookback price
let lookback_dca2 = lookback.price_lookback.as_dca_period();
for (returns, (lookback_price, _)) in self
.period_lump_sum_return
.iter_mut()
.zip(lookback_dca2.iter_with_days())
.zip(lookback_dca.iter_with_days())
{
returns.compute_binary::<Cents, Cents, RatioDiffCentsBps32>(
starting_indexes.height,

View File

@@ -2,35 +2,26 @@ use brk_error::Result;
use brk_types::{BasisPoints16, Dollars, Indexes};
use vecdb::Exit;
use super::{super::range, Vecs};
use crate::{
blocks, distribution,
internal::{RatioDollarsBp32, Windows},
mining, prices, transactions,
use super::{
super::{moving_average, range, returns},
Vecs, gini, macd, rsi,
};
use crate::{blocks, distribution, internal::RatioDollarsBp32, mining, prices, transactions};
fn tf_multiplier(tf: &str) -> usize {
match tf {
"24h" => 1,
"1w" => 7,
"1m" => 30,
"1y" => 365,
_ => unreachable!(),
}
}
const TF_MULTIPLIERS: [usize; 4] = [1, 7, 30, 365];
impl Vecs {
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute(
&mut self,
rewards: &mining::RewardsVecs,
returns: &super::super::returns::Vecs,
returns: &returns::Vecs,
range: &range::Vecs,
prices: &prices::Vecs,
blocks: &blocks::Vecs,
distribution: &distribution::Vecs,
transactions: &transactions::Vecs,
moving_average: &super::super::moving_average::Vecs,
moving_average: &moving_average::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
@@ -72,22 +63,23 @@ impl Vecs {
}
// RSI per timeframe
for (tf, rsi_chain) in Windows::<()>::SUFFIXES
let return_sources = [
&returns.price_return._24h.ratio.height,
&returns.price_return._1w.ratio.height,
&returns.price_return._1m.ratio.height,
&returns.price_return._1y.ratio.height,
];
for ((rsi_chain, ret), &m) in self
.rsi
.as_mut_array()
.into_iter()
.zip(self.rsi.as_mut_array())
.zip(return_sources)
.zip(&TF_MULTIPLIERS)
{
let m = tf_multiplier(tf);
let returns_source = match tf {
"24h" => &returns.price_return._24h.ratio.height,
"1w" => &returns.price_return._1w.ratio.height,
"1m" => &returns.price_return._1m.ratio.height,
"1y" => &returns.price_return._1y.ratio.height,
_ => unreachable!(),
};
super::rsi::compute(
rsi::compute(
rsi_chain,
blocks,
returns_source,
ret,
14 * m,
3 * m,
starting_indexes,
@@ -96,12 +88,8 @@ impl Vecs {
}
// MACD per timeframe
for (tf, macd_chain) in Windows::<()>::SUFFIXES
.into_iter()
.zip(self.macd.as_mut_array())
{
let m = tf_multiplier(tf);
super::macd::compute(
for (macd_chain, &m) in self.macd.as_mut_array().into_iter().zip(&TF_MULTIPLIERS) {
macd::compute(
macd_chain,
blocks,
prices,
@@ -114,21 +102,22 @@ impl Vecs {
}
// Gini (per height)
super::gini::compute(&mut self.gini, distribution, starting_indexes, exit)?;
gini::compute(&mut self.gini, distribution, starting_indexes, exit)?;
// NVT: market_cap / tx_volume_24h
let market_cap = &distribution
.utxo_cohorts
.all
.metrics
.supply
.total
.usd
.height;
self.nvt
.bps
.compute_binary::<Dollars, Dollars, RatioDollarsBp32>(
starting_indexes.height,
&distribution
.utxo_cohorts
.all
.metrics
.supply
.total
.usd
.height,
market_cap,
&transactions.volume.sent_sum.rolling._24h.usd.height,
exit,
)?;
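The TF_MULTIPLIERS refactor trades string matching for parallel constant arrays zipped together, so timeframe labels, return sources, and multipliers stay aligned by position. A condensed illustration of the idea (array contents mirror the diff; the function is hypothetical):

const TFS: [&str; 4] = ["24h", "1w", "1m", "1y"];
const MULTS: [usize; 4] = [1, 7, 30, 365];

// Pairs stay aligned by construction; no string match to fall out of sync.
fn timeframes() -> impl Iterator<Item = (&'static str, usize)> {
    TFS.into_iter().zip(MULTS)
}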

View File

@@ -46,24 +46,13 @@ impl Vecs {
let _24h_price_return_ratio = &self.price_return._24h.ratio.height;
self.price_return_24h_sd_1w.compute_all(
blocks,
starting_indexes,
exit,
_24h_price_return_ratio,
)?;
self.price_return_24h_sd_1m.compute_all(
blocks,
starting_indexes,
exit,
_24h_price_return_ratio,
)?;
self.price_return_24h_sd_1y.compute_all(
blocks,
starting_indexes,
exit,
_24h_price_return_ratio,
)?;
for sd in [
&mut self.price_return_24h_sd_1w,
&mut self.price_return_24h_sd_1m,
&mut self.price_return_24h_sd_1y,
] {
sd.compute_all(blocks, starting_indexes, exit, _24h_price_return_ratio)?;
}
// Downside returns: min(return, 0)
self.price_downside_24h.compute_transform(
@@ -77,24 +66,13 @@ impl Vecs {
)?;
// Downside deviation (SD of downside returns)
self.price_downside_24h_sd_1w.compute_all(
blocks,
starting_indexes,
exit,
&self.price_downside_24h,
)?;
self.price_downside_24h_sd_1m.compute_all(
blocks,
starting_indexes,
exit,
&self.price_downside_24h,
)?;
self.price_downside_24h_sd_1y.compute_all(
blocks,
starting_indexes,
exit,
&self.price_downside_24h,
)?;
for sd in [
&mut self.price_downside_24h_sd_1w,
&mut self.price_downside_24h_sd_1m,
&mut self.price_downside_24h_sd_1y,
] {
sd.compute_all(blocks, starting_indexes, exit, &self.price_downside_24h)?;
}
Ok(())
}

View File

@@ -30,78 +30,66 @@ impl Vecs {
&self.price_volatility_1y.height,
),
] {
compute_ratio(&mut out.height, starting_indexes_height, ret, vol, exit)?;
compute_divided(
&mut out.height,
starting_indexes_height,
ret,
vol,
1.0,
exit,
)?;
}
// Sortino ratios: returns / downside volatility (sd * sqrt(days))
compute_sortino(
&mut self.price_sortino_1w.height,
starting_indexes_height,
&returns.price_return._1w.ratio.height,
&returns.price_downside_24h_sd_1w.sd.height,
7.0_f32.sqrt(),
exit,
)?;
compute_sortino(
&mut self.price_sortino_1m.height,
starting_indexes_height,
&returns.price_return._1m.ratio.height,
&returns.price_downside_24h_sd_1m.sd.height,
30.0_f32.sqrt(),
exit,
)?;
compute_sortino(
&mut self.price_sortino_1y.height,
starting_indexes_height,
&returns.price_return._1y.ratio.height,
&returns.price_downside_24h_sd_1y.sd.height,
365.0_f32.sqrt(),
exit,
)?;
for (out, ret, sd, sqrt_days) in [
(
&mut self.price_sortino_1w,
&returns.price_return._1w.ratio.height,
&returns.price_downside_24h_sd_1w.sd.height,
7.0_f32.sqrt(),
),
(
&mut self.price_sortino_1m,
&returns.price_return._1m.ratio.height,
&returns.price_downside_24h_sd_1m.sd.height,
30.0_f32.sqrt(),
),
(
&mut self.price_sortino_1y,
&returns.price_return._1y.ratio.height,
&returns.price_downside_24h_sd_1y.sd.height,
365.0_f32.sqrt(),
),
] {
compute_divided(
&mut out.height,
starting_indexes_height,
ret,
sd,
sqrt_days,
exit,
)?;
}
Ok(())
}
}
fn compute_ratio(
fn compute_divided(
out: &mut EagerVec<PcoVec<Height, StoredF32>>,
starting_indexes_height: Height,
ret: &impl ReadableVec<Height, StoredF32>,
vol: &impl ReadableVec<Height, StoredF32>,
divisor: &impl ReadableVec<Height, StoredF32>,
divisor_scale: f32,
exit: &Exit,
) -> Result<()> {
out.compute_transform2(
starting_indexes_height,
ret,
vol,
|(h, ret, vol, ..)| {
let ratio = if *vol == 0.0 { 0.0 } else { *ret / *vol };
(h, StoredF32::from(ratio))
},
exit,
)?;
Ok(())
}
fn compute_sortino(
out: &mut EagerVec<PcoVec<Height, StoredF32>>,
starting_indexes_height: Height,
ret: &impl ReadableVec<Height, StoredF32>,
sd: &impl ReadableVec<Height, StoredF32>,
sqrt_days: f32,
exit: &Exit,
) -> Result<()> {
out.compute_transform2(
starting_indexes_height,
ret,
sd,
|(h, ret, sd, ..)| {
let downside_vol = (*sd) * sqrt_days;
let ratio = if downside_vol == 0.0 {
0.0
} else {
(*ret) / downside_vol
};
divisor,
|(h, ret, div, ..)| {
let denom = (*div) * divisor_scale;
let ratio = if denom == 0.0 { 0.0 } else { (*ret) / denom };
(h, StoredF32::from(ratio))
},
exit,
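compute_ratio and compute_sortino collapse into a single compute_divided parameterised by divisor_scale: Sharpe-style ratios pass 1.0, Sortino passes sqrt(days) to scale daily downside deviation to the period, and a zero denominator maps to 0.0 rather than dividing. The arithmetic in isolation:

/// ratio = ret / (div * scale), with 0 when the denominator is 0.
fn divided(ret: f32, div: f32, scale: f32) -> f32 {
    let denom = div * scale;
    if denom == 0.0 { 0.0 } else { ret / denom }
}

// Sharpe-style: divided(r, vol, 1.0); Sortino: divided(r, sd, 7.0_f32.sqrt())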

View File

@@ -34,33 +34,15 @@ impl Vecs {
exit,
)?;
self.hash_rate_sma_1w.height.compute_rolling_average(
starting_indexes.height,
&count_vecs.height_1w_ago,
&self.hash_rate.height,
exit,
)?;
self.hash_rate_sma_1m.height.compute_rolling_average(
starting_indexes.height,
&count_vecs.height_1m_ago,
&self.hash_rate.height,
exit,
)?;
self.hash_rate_sma_2m.height.compute_rolling_average(
starting_indexes.height,
&count_vecs.height_2m_ago,
&self.hash_rate.height,
exit,
)?;
self.hash_rate_sma_1y.height.compute_rolling_average(
starting_indexes.height,
&count_vecs.height_1y_ago,
&self.hash_rate.height,
exit,
)?;
let hash_rate = &self.hash_rate.height;
for (sma, window) in [
(&mut self.hash_rate_sma_1w.height, &count_vecs.height_1w_ago),
(&mut self.hash_rate_sma_1m.height, &count_vecs.height_1m_ago),
(&mut self.hash_rate_sma_2m.height, &count_vecs.height_2m_ago),
(&mut self.hash_rate_sma_1y.height, &count_vecs.height_1y_ago),
] {
sma.compute_rolling_average(starting_indexes.height, window, hash_rate, exit)?;
}
self.hash_rate_ath.height.compute_all_time_high(
starting_indexes.height,
@@ -121,33 +103,26 @@ impl Vecs {
exit,
)?;
self.hash_price_ths_min.height.compute_all_time_low_(
starting_indexes.height,
&self.hash_price_ths.height,
exit,
true,
)?;
self.hash_price_phs_min.height.compute_all_time_low_(
starting_indexes.height,
&self.hash_price_phs.height,
exit,
true,
)?;
self.hash_value_ths_min.height.compute_all_time_low_(
starting_indexes.height,
&self.hash_value_ths.height,
exit,
true,
)?;
self.hash_value_phs_min.height.compute_all_time_low_(
starting_indexes.height,
&self.hash_value_phs.height,
exit,
true,
)?;
for (min_vec, src_vec) in [
(
&mut self.hash_price_ths_min.height,
&self.hash_price_ths.height,
),
(
&mut self.hash_price_phs_min.height,
&self.hash_price_phs.height,
),
(
&mut self.hash_value_ths_min.height,
&self.hash_value_ths.height,
),
(
&mut self.hash_value_phs_min.height,
&self.hash_value_phs.height,
),
] {
min_vec.compute_all_time_low_(starting_indexes.height, src_vec, exit, true)?;
}
self.hash_price_rebound
.compute_binary::<StoredF32, StoredF32, RatioDiffF32Bps32>(

View File

@@ -90,8 +90,7 @@ impl Vecs {
(
height,
coinbase.checked_sub(fees).unwrap_or_else(|| {
dbg!(height, coinbase, fees);
panic!()
panic!("coinbase {coinbase:?} < fees {fees:?} at {height:?}")
}),
)
},

View File

@@ -188,22 +188,19 @@ impl Vecs {
.copied()
.unwrap_or(TxIndex::from(total_txs));
let next_out_first = out_firsts
.get(idx + 1)
.copied()
.unwrap_or(TxOutIndex::from(total_outputs))
.to_usize();
let out_start = if first_txindex.to_usize() + 1 < next_first_txindex.to_usize() {
let target = first_txindex.to_usize() + 1;
txout_cursor.advance(target - txout_cursor.position());
txout_cursor.next().unwrap().to_usize()
} else {
out_firsts
.get(idx + 1)
.copied()
.unwrap_or(TxOutIndex::from(total_outputs))
.to_usize()
next_out_first
};
let out_end = out_firsts
.get(idx + 1)
.copied()
.unwrap_or(TxOutIndex::from(total_outputs))
.to_usize();
let out_end = next_out_first;
indexer
.vecs

View File

@@ -55,19 +55,20 @@ impl Vecs {
exit,
)?;
let realized_cap = &distribution
.utxo_cohorts
.all
.metrics
.realized
.realized_cap
.height;
self.realized_cap_growth_rate
.bps
.height
.compute_rolling_ratio_change(
starting_indexes.height,
&blocks.count.height_1y_ago,
&distribution
.utxo_cohorts
.all
.metrics
.realized
.realized_cap
.height,
realized_cap,
exit,
)?;
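compute_rolling_ratio_change against height_1y_ago yields a trailing one-year growth rate: each height divides the current realized cap by its value one year of blocks earlier. Sketched standalone (the helper below is hypothetical, not the vecdb signature):

/// out[i] = values[i] / values[start[i]] - 1, where start[i] points one
/// window (here: one year of blocks) back; 0 when the base is 0.
fn rolling_ratio_change(values: &[f64], window_start: &[usize]) -> Vec<f64> {
    values
        .iter()
        .zip(window_start)
        .map(|(&v, &s)| {
            let base = values[s];
            if base == 0.0 { 0.0 } else { v / base - 1.0 }
        })
        .collect()
}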