global: snapshot

This commit is contained in:
nym21
2026-03-08 01:30:30 +01:00
parent cf6c755e51
commit 6bb5c63db7
23 changed files with 2024 additions and 198 deletions

View File

@@ -15,7 +15,7 @@ use crate::{blocks, distribution::DynCohortVecs, indexes, prices};
use crate::distribution::metrics::{
AllCohortMetrics, BasicCohortMetrics, CohortMetricsBase, CoreCohortMetrics,
ExtendedAdjustedCohortMetrics, ExtendedCohortMetrics, ImportConfig, MinimalCohortMetrics,
SupplyMetrics,
ProfitabilityMetrics, SupplyMetrics,
};
use super::{percentiles::PercentileCache, vecs::UTXOCohortVecs};
@@ -39,6 +39,7 @@ pub struct UTXOCohorts<M: StorageMode = Rw> {
pub amount_range: ByAmountRange<UTXOCohortVecs<MinimalCohortMetrics<M>>>,
pub lt_amount: ByLowerThanAmount<UTXOCohortVecs<MinimalCohortMetrics<M>>>,
pub type_: BySpendableType<UTXOCohortVecs<MinimalCohortMetrics<M>>>,
pub profitability: ProfitabilityMetrics<M>,
#[traversable(skip)]
pub(super) percentile_cache: PercentileCache,
/// Cached partition_point positions for tick_tock boundary searches.
@@ -141,6 +142,9 @@ impl UTXOCohorts<Rw> {
AllCohortMetrics::forced_import_with_supply(&all_cfg, all_supply)?,
);
// Phase 3b: Import profitability metrics (derived from "all" during k-way merge).
let profitability = ProfitabilityMetrics::forced_import(db, v, indexes)?;
// Phase 4: Import aggregate cohorts.
// sth: ExtendedAdjustedCohortMetrics
@@ -227,6 +231,7 @@ impl UTXOCohorts<Rw> {
amount_range,
lt_amount,
ge_amount,
profitability,
percentile_cache: PercentileCache::default(),
tick_tock_cached_positions: [0; 20],
})
@@ -590,6 +595,7 @@ impl UTXOCohorts<Rw> {
for v in self.type_.iter_mut() {
vecs.extend(v.metrics.collect_all_vecs_mut());
}
vecs.extend(self.profitability.collect_all_vecs_mut());
vecs.into_par_iter()
}
@@ -599,12 +605,13 @@ impl UTXOCohorts<Rw> {
.try_for_each(|v| v.write_state(height, cleanup))
}
/// Get minimum height from all separate cohorts' height-indexed vectors.
/// Get minimum height from all separate cohorts' + profitability height-indexed vectors.
pub(crate) fn min_separate_stateful_height_len(&self) -> Height {
self.iter_separate()
.map(|v| Height::from(v.min_stateful_height_len()))
.min()
.unwrap_or_default()
.min(Height::from(self.profitability.min_stateful_height_len()))
}
/// Import state for all separate cohorts at or before given height.
@@ -650,7 +657,6 @@ impl UTXOCohorts<Rw> {
for v in self.lt_amount.iter_mut() {
v.metrics.validate_computed_versions(base_version)?;
}
Ok(())
}
}

View File

@@ -1,12 +1,15 @@
use std::{cmp::Reverse, collections::BinaryHeap, fs, path::Path};
use brk_cohort::{Filtered, TERM_NAMES};
use brk_cohort::{
compute_profitability_boundaries, Filtered, PROFITABILITY_BOUNDARY_COUNT,
PROFITABILITY_RANGE_COUNT, PROFIT_COUNT, TERM_NAMES,
};
use brk_error::Result;
use brk_types::{Cents, CentsCompact, CostBasisDistribution, Date, Height, Sats};
use brk_types::{Cents, CentsCompact, CostBasisDistribution, Date, Dollars, Height, Sats};
use crate::internal::{PERCENTILES, PERCENTILES_LEN};
use crate::distribution::metrics::{CohortMetricsBase, CostBasis};
use crate::distribution::metrics::{CostBasis, ProfitabilityMetrics};
use super::groups::UTXOCohorts;
@@ -27,45 +30,47 @@ impl CachedPercentiles {
}
}
/// Cached percentile results for all/sth/lth.
/// Cached percentile + profitability results for all/sth/lth.
/// Avoids re-merging 21 BTreeMaps on every block.
#[derive(Clone, Default)]
pub(super) struct PercentileCache {
all: CachedPercentiles,
sth: CachedPercentiles,
lth: CachedPercentiles,
profitability: [(u64, u128); PROFITABILITY_RANGE_COUNT],
initialized: bool,
}
impl UTXOCohorts {
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
/// Compute and push percentiles + profitability for aggregate cohorts.
///
/// Full K-way merge only runs at day boundaries or when the cache is empty.
/// For intermediate blocks, pushes cached percentile arrays.
/// For intermediate blocks, pushes cached values.
pub(crate) fn truncate_push_aggregate_percentiles(
&mut self,
height: Height,
spot_price: Cents,
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
if date_opt.is_some() || !self.percentile_cache.initialized {
self.merge_and_push_percentiles(height, date_opt, states_path)
} else {
self.push_cached_percentiles(height)
self.recompute_cache(spot_price, date_opt, states_path)?;
}
self.push_cached(height)
}
/// Full K-way merge: compute percentiles from scratch, update cache, push.
fn merge_and_push_percentiles(
/// Full K-way merge: recompute percentiles + profitability from scratch, update cache.
fn recompute_cache(
&mut self,
height: Height,
spot_price: Cents,
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
let collect_merged = date_opt.is_some();
let boundaries = compute_profitability_boundaries(spot_price);
let targets = {
let sth_filter = self.sth.metrics.filter().clone();
let sth_filter = self.sth.metrics.filter.clone();
let mut totals = AllSthLth::<(u64, u128)>::default();
let maps: Vec<_> = self
@@ -101,31 +106,26 @@ impl UTXOCohorts {
};
let all_has_data = totals.all.0 > 0;
let mut targets = totals.map(|(sats, usd)| PercTarget::new(sats, usd, cap));
self.percentile_cache.profitability = Default::default();
if all_has_data {
merge_k_way(&maps, &mut targets, collect_merged);
merge_k_way(
&maps,
&mut targets,
&boundaries,
&mut self.percentile_cache.profitability,
collect_merged,
);
}
targets
};
// Update cache + push
self.percentile_cache.all = targets.all.to_cached();
self.percentile_cache.sth = targets.sth.to_cached();
self.percentile_cache.lth = targets.lth.to_cached();
self.percentile_cache.initialized = true;
self.percentile_cache
.all
.push(height, &mut self.all.metrics.cost_basis)?;
self.percentile_cache
.sth
.push(height, &mut self.sth.metrics.cost_basis)?;
self.percentile_cache
.lth
.push(height, &mut self.lth.metrics.cost_basis)?;
// Serialize full distribution at day boundaries
if let Some(date) = date_opt {
write_distribution(states_path, "all", date, targets.all.merged)?;
write_distribution(states_path, TERM_NAMES.short.id, date, targets.sth.merged)?;
@@ -135,8 +135,8 @@ impl UTXOCohorts {
Ok(())
}
/// Fast path: push cached percentile arrays.
fn push_cached_percentiles(&mut self, height: Height) -> Result<()> {
/// Push cached percentile + profitability values.
fn push_cached(&mut self, height: Height) -> Result<()> {
self.percentile_cache
.all
.push(height, &mut self.all.metrics.cost_basis)?;
@@ -146,10 +146,60 @@ impl UTXOCohorts {
self.percentile_cache
.lth
.push(height, &mut self.lth.metrics.cost_basis)?;
Ok(())
push_profitability(
height,
&self.percentile_cache.profitability,
&mut self.profitability,
)
}
}
/// Convert a raw accumulator of (price-in-cents × amount-in-sats) into Dollars.
///
/// Scaling: ÷100 turns cents into dollars and ÷1e8 turns sats into BTC,
/// i.e. a single division by 1e10.
#[inline]
fn raw_usd_to_dollars(raw: u128) -> Dollars {
    const CENTS_TIMES_SATS_PER_DOLLAR: f64 = 1e10;
    Dollars::from(raw as f64 / CENTS_TIMES_SATS_PER_DOLLAR)
}
/// Push profitability range + profit/loss aggregate values to vecs.
///
/// `buckets[i]` is the (sats, raw cents×sats USD) pair accumulated for the
/// i-th profitability price range during the k-way merge; raw USD values are
/// converted to `Dollars` via `raw_usd_to_dollars` before being pushed.
///
/// At `height`, this pushes:
/// - one (supply, realized cap) pair per range bucket;
/// - `ByProfit` thresholds: forward cumulative sums over ranges[0..PROFIT_COUNT],
///   written in reverse index order (profit[0] = breakeven = sum of all profit ranges);
/// - `ByLoss` thresholds: backward cumulative sums over the tail ranges,
///   likewise written in reverse (loss[0] = breakeven = sum of all loss ranges).
///
/// NOTE(review): the indexed `buckets[i]` access assumes `metrics.range` yields
/// exactly PROFITABILITY_RANGE_COUNT entries — confirm the ByProfitabilityRange
/// array length matches, otherwise this panics.
fn push_profitability(
height: Height,
buckets: &[(u64, u128); PROFITABILITY_RANGE_COUNT],
metrics: &mut ProfitabilityMetrics,
) -> Result<()> {
// Push 25 range buckets
for (i, bucket) in metrics.range.as_array_mut().into_iter().enumerate() {
let (sats, usd_raw) = buckets[i];
bucket.truncate_push(height, Sats::from(sats), raw_usd_to_dollars(usd_raw))?;
}
// ByProfit: forward cumulative sum over ranges[0..15], pushed in reverse.
// profit[0] (breakeven) = sum(0..=14), ..., profit[14] (_1000pct) = ranges[0]
let profit_arr = metrics.profit.as_array_mut();
// Running totals shared by both cumulative passes below.
let mut cum_sats = 0u64;
let mut cum_usd = 0u128;
for i in 0..PROFIT_COUNT {
cum_sats += buckets[i].0;
cum_usd += buckets[i].1;
// Reverse write: the deepest threshold gets only the most extreme range.
profit_arr[PROFIT_COUNT - 1 - i]
.truncate_push(height, Sats::from(cum_sats), raw_usd_to_dollars(cum_usd))?;
}
// ByLoss: backward cumulative sum over ranges[15..25], pushed in reverse.
// loss[0] (breakeven) = sum(15..=24), ..., loss[9] (_90pct) = ranges[24]
let loss_arr = metrics.loss.as_array_mut();
let loss_count = loss_arr.len();
// Reset the accumulators for the loss pass.
cum_sats = 0;
cum_usd = 0;
for i in 0..loss_count {
// Walk the range array from its deepest-loss end toward breakeven.
cum_sats += buckets[PROFITABILITY_RANGE_COUNT - 1 - i].0;
cum_usd += buckets[PROFITABILITY_RANGE_COUNT - 1 - i].1;
loss_arr[loss_count - 1 - i]
.truncate_push(height, Sats::from(cum_sats), raw_usd_to_dollars(cum_usd))?;
}
Ok(())
}
fn write_distribution(
states_path: &Path,
name: &str,
@@ -166,9 +216,12 @@ fn write_distribution(
}
/// K-way merge via BinaryHeap over BTreeMap iterators.
/// Also accumulates profitability buckets for the "all" target using cursor approach.
fn merge_k_way(
maps: &[(&std::collections::BTreeMap<CentsCompact, Sats>, bool)],
targets: &mut AllSthLth<PercTarget>,
boundaries: &[Cents; PROFITABILITY_BOUNDARY_COUNT],
prof: &mut [(u64, u128); PROFITABILITY_RANGE_COUNT],
collect_merged: bool,
) {
let mut iters: Vec<_> = maps
@@ -185,36 +238,40 @@ fn merge_k_way(
}
let mut current_price: Option<CentsCompact> = None;
let mut early_exit = false;
let mut boundary_idx = 0usize;
while let Some(Reverse((price, ci))) = heap.pop() {
let (ref mut iter, is_sth) = iters[ci];
let (_, &sats) = iter.next().unwrap();
let amount = u64::from(sats);
let usd = Cents::from(price).as_u128() * amount as u128;
let price_cents = Cents::from(price);
let usd = price_cents.as_u128() * amount as u128;
if let Some(prev) = current_price
&& prev != price
{
targets.for_each_mut(|t| t.finalize_price(prev.into(), collect_merged));
if !collect_merged && targets.all_match(|t| t.done()) {
early_exit = true;
break;
}
}
current_price = Some(price);
targets.all.accumulate(amount, usd);
targets.term_mut(is_sth).accumulate(amount, usd);
// Profitability: advance cursor past boundaries (prices are ascending)
while boundary_idx < PROFITABILITY_BOUNDARY_COUNT
&& price_cents >= boundaries[boundary_idx]
{
boundary_idx += 1;
}
prof[boundary_idx].0 += amount;
prof[boundary_idx].1 += usd;
if let Some(&(&next_price, _)) = iter.peek() {
heap.push(Reverse((next_price, ci)));
}
}
if !early_exit
&& let Some(price) = current_price
{
if let Some(price) = current_price {
targets.for_each_mut(|t| t.finalize_price(price.into(), collect_merged));
}
}
@@ -254,9 +311,6 @@ impl<T> AllSthLth<T> {
f(&mut self.lth);
}
fn all_match(&self, mut f: impl FnMut(&T) -> bool) -> bool {
f(&self.all) && f(&self.sth) && f(&self.lth)
}
}
struct PercTarget {
@@ -362,8 +416,4 @@ impl PercTarget {
self.price_usd = 0;
}
fn done(&self) -> bool {
(self.total_sats == 0 || self.sat_idx >= PERCENTILES_LEN)
&& (self.total_usd == 0 || self.usd_idx >= PERCENTILES_LEN)
}
}

View File

@@ -438,6 +438,7 @@ pub(crate) fn process_blocks(
vecs.utxo_cohorts.truncate_push_aggregate_percentiles(
height,
block_price,
date_opt,
&vecs.states_path,
)?;

View File

@@ -10,10 +10,10 @@ use crate::{
indexes,
internal::{
CentsType, ComputedFromHeight, ComputedFromHeightCumulative,
ComputedFromHeightCumulativeSum, ComputedFromHeightRatio, FiatFromHeight, NumericValue,
PercentFromHeight, PercentRollingWindows, Price, RollingDelta1m, RollingDeltaExcept1m,
RollingWindow24h, RollingWindows, RollingWindowsFrom1w,
ValueFromHeight, ValueFromHeightCumulative,
ComputedFromHeightCumulativeSum, ComputedFromHeightRatio, FiatFromHeight,
FiatRollingDelta1m, FiatRollingDeltaExcept1m, NumericValue, PercentFromHeight,
PercentRollingWindows, Price, RollingDelta1m, RollingDeltaExcept1m, RollingWindow24h,
RollingWindows, RollingWindowsFrom1w, ValueFromHeight, ValueFromHeightCumulative,
},
};
@@ -96,6 +96,16 @@ impl<S: NumericValue + JsonSchema, C: NumericValue + JsonSchema> ConfigImport
Self::forced_import(cfg.db, &cfg.name(suffix), cfg.version + offset, cfg.indexes)
}
}
impl<S: NumericValue + JsonSchema, C: CentsType> ConfigImport for FiatRollingDelta1m<S, C> {
fn config_import(cfg: &ImportConfig, suffix: &str, offset: Version) -> Result<Self> {
Self::forced_import(cfg.db, &cfg.name(suffix), cfg.version + offset, cfg.indexes)
}
}
impl<S: NumericValue + JsonSchema, C: CentsType> ConfigImport for FiatRollingDeltaExcept1m<S, C> {
fn config_import(cfg: &ImportConfig, suffix: &str, offset: Version) -> Result<Self> {
Self::forced_import(cfg.db, &cfg.name(suffix), cfg.version + offset, cfg.indexes)
}
}
impl<T: BytesVecValue> ConfigImport for BytesVec<Height, T> {
fn config_import(cfg: &ImportConfig, suffix: &str, offset: Version) -> Result<Self> {
Ok(Self::forced_import(

View File

@@ -36,6 +36,7 @@ mod cohort;
mod config;
mod cost_basis;
mod outputs;
mod profitability;
mod realized;
mod relative;
mod supply;
@@ -48,6 +49,7 @@ pub use cohort::{
};
pub use config::ImportConfig;
pub use cost_basis::CostBasis;
pub use profitability::ProfitabilityMetrics;
pub use outputs::OutputsMetrics;
pub use realized::{
RealizedAdjusted, RealizedBase, RealizedCore, RealizedFull, RealizedLike, RealizedMinimal,

View File

@@ -0,0 +1,124 @@
use brk_cohort::{ByLoss, ByProfit, ByProfitabilityRange};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Dollars, Height, Sats, Version};
use vecdb::{AnyStoredVec, AnyVec, Database, Rw, StorageMode, WritableVec};
use crate::{indexes, internal::ComputedFromHeight};
/// Supply + realized cap for a single profitability bucket.
///
/// Both fields are height-indexed vectors; `truncate_push` appends one entry
/// per block to each.
#[derive(Traversable)]
pub struct ProfitabilityBucket<M: StorageMode = Rw> {
// Total supply (in sats) attributed to this bucket at each height.
pub supply: ComputedFromHeight<Sats, M>,
// Realized capitalization (in dollars) of that supply at each height.
pub realized_cap: ComputedFromHeight<Dollars, M>,
}
impl<M: StorageMode> ProfitabilityBucket<M> {
    /// Shorter of the two height-indexed vectors — the bucket's consistent length.
    fn min_len(&self) -> usize {
        let supply_len = self.supply.height.len();
        let realized_cap_len = self.realized_cap.height.len();
        supply_len.min(realized_cap_len)
    }
}
impl ProfitabilityBucket {
    /// Import (or create) the two height-indexed vectors backing this bucket,
    /// namespacing them as `{name}_supply` and `{name}_realized_cap`.
    fn forced_import(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        let supply =
            ComputedFromHeight::forced_import(db, &format!("{name}_supply"), version, indexes)?;
        let realized_cap = ComputedFromHeight::forced_import(
            db,
            &format!("{name}_realized_cap"),
            version,
            indexes,
        )?;
        Ok(Self {
            supply,
            realized_cap,
        })
    }

    /// Append one (supply, realized cap) entry at `height` to both vectors,
    /// truncating any stale data at or beyond that height first.
    pub(crate) fn truncate_push(
        &mut self,
        height: Height,
        supply: Sats,
        realized_cap: Dollars,
    ) -> Result<()> {
        // Supply first, then realized cap — mirrors the field declaration order.
        self.supply.height.truncate_push(height, supply)?;
        self.realized_cap.height.truncate_push(height, realized_cap)
    }

    /// Expose both backing vectors as mutable trait objects (supply first).
    pub(crate) fn collect_all_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
        let mut out: Vec<&mut dyn AnyStoredVec> = Vec::with_capacity(2);
        out.push(&mut self.supply.height);
        out.push(&mut self.realized_cap.height);
        out
    }
}
/// All profitability metrics: 25 ranges + 15 profit thresholds + 10 loss thresholds.
#[derive(Traversable)]
pub struct ProfitabilityMetrics<M: StorageMode = Rw> {
// One bucket per profitability price range (25 in total).
pub range: ByProfitabilityRange<ProfitabilityBucket<M>>,
// Cumulative "in profit by at least X%" thresholds (15 in total).
pub profit: ByProfit<ProfitabilityBucket<M>>,
// Cumulative "in loss by at least X%" thresholds (10 in total).
pub loss: ByLoss<ProfitabilityBucket<M>>,
}
impl<M: StorageMode> ProfitabilityMetrics<M> {
    /// Minimum consistent length across every bucket (ranges, profit, loss).
    ///
    /// Returns 0 when there are no buckets at all.
    pub(crate) fn min_stateful_height_len(&self) -> usize {
        let every_bucket = self
            .range
            .iter()
            .chain(self.profit.iter())
            .chain(self.loss.iter());
        every_bucket
            .map(|bucket| bucket.min_len())
            .min()
            .unwrap_or(0)
    }
}
impl ProfitabilityMetrics {
    /// Import (or create) every profitability bucket: 25 ranges, 15 profit
    /// thresholds, and 10 loss thresholds, each named by its group's naming scheme.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        Ok(Self {
            range: ByProfitabilityRange::try_new(|name| {
                ProfitabilityBucket::forced_import(db, name, version, indexes)
            })?,
            profit: ByProfit::try_new(|name| {
                ProfitabilityBucket::forced_import(db, name, version, indexes)
            })?,
            loss: ByLoss::try_new(|name| {
                ProfitabilityBucket::forced_import(db, name, version, indexes)
            })?,
        })
    }

    /// Collect every backing vector across all buckets, in range → profit → loss order.
    pub(crate) fn collect_all_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
        self.range
            .iter_mut()
            .chain(self.profit.iter_mut())
            .chain(self.loss.iter_mut())
            .flat_map(|bucket| bucket.collect_all_vecs_mut())
            .collect()
    }
}

View File

@@ -10,8 +10,8 @@ use crate::{
blocks,
distribution::state::RealizedOps,
internal::{
ComputedFromHeight, LazyFromHeight, NegCentsUnsignedToDollars, RatioCents64,
RollingDelta1m, RollingWindow24h,
ComputedFromHeight, FiatRollingDelta1m, LazyFromHeight, NegCentsUnsignedToDollars,
RatioCents64, RollingWindow24h,
},
prices,
};
@@ -27,7 +27,7 @@ pub struct RealizedCore<M: StorageMode = Rw> {
#[traversable(flatten)]
pub minimal: RealizedMinimal<M>,
pub realized_cap_delta: RollingDelta1m<Cents, CentsSigned, M>,
pub realized_cap_delta: FiatRollingDelta1m<Cents, CentsSigned, M>,
pub neg_realized_loss: LazyFromHeight<Dollars, Cents>,
pub net_realized_pnl: ComputedFromHeight<CentsSigned, M>,

View File

@@ -14,12 +14,12 @@ use crate::{
blocks,
distribution::state::RealizedState,
internal::{
CentsUnsignedToDollars, ComputedFromHeight, ComputedFromHeightCumulative, FiatFromHeight,
CentsUnsignedToDollars, ComputedFromHeight, ComputedFromHeightCumulative,
ComputedFromHeightRatio, ComputedFromHeightRatioPercentiles,
ComputedFromHeightRatioStdDevBands, LazyFromHeight, PercentFromHeight,
PercentRollingWindows, Price, RatioCents64, RatioCentsBp32,
RatioCentsSignedCentsBps32, RatioCentsSignedDollarsBps32, RatioDollarsBp32,
RollingDelta1m, RollingDeltaExcept1m, RollingWindows, RollingWindowsFrom1w,
ComputedFromHeightRatioStdDevBands, FiatFromHeight, FiatRollingDelta1m,
FiatRollingDeltaExcept1m, LazyFromHeight, PercentFromHeight, PercentRollingWindows, Price,
RatioCents64, RatioCentsBp32, RatioCentsSignedCentsBps32, RatioCentsSignedDollarsBps32,
RatioDollarsBp32, RollingWindows, RollingWindowsFrom1w,
},
prices,
};
@@ -59,12 +59,12 @@ pub struct RealizedFull<M: StorageMode = Rw> {
pub net_realized_pnl_cumulative: ComputedFromHeight<CentsSigned, M>,
pub net_realized_pnl_sum_extended: RollingWindowsFrom1w<CentsSigned, M>,
pub net_pnl_delta: RollingDelta1m<CentsSigned, CentsSigned, M>,
pub net_pnl_delta_extended: RollingDeltaExcept1m<CentsSigned, CentsSigned, M>,
pub net_pnl_delta: FiatRollingDelta1m<CentsSigned, CentsSigned, M>,
pub net_pnl_delta_extended: FiatRollingDeltaExcept1m<CentsSigned, CentsSigned, M>,
pub net_pnl_change_1m_rel_to_realized_cap: PercentFromHeight<BasisPointsSigned32, M>,
pub net_pnl_change_1m_rel_to_market_cap: PercentFromHeight<BasisPointsSigned32, M>,
pub realized_cap_delta_extended: RollingDeltaExcept1m<Cents, CentsSigned, M>,
pub realized_cap_delta_extended: FiatRollingDeltaExcept1m<Cents, CentsSigned, M>,
pub investor_price: Price<ComputedFromHeight<Cents, M>>,
pub investor_price_ratio: ComputedFromHeightRatio<M>,
@@ -468,14 +468,14 @@ impl RealizedFull {
self.net_pnl_change_1m_rel_to_realized_cap
.compute_binary::<CentsSigned, Cents, RatioCentsSignedCentsBps32>(
starting_indexes.height,
&self.net_pnl_delta.change_1m.height,
&self.net_pnl_delta.change_1m.cents.height,
&self.base.core.minimal.realized_cap_cents.height,
exit,
)?;
self.net_pnl_change_1m_rel_to_market_cap
.compute_binary::<CentsSigned, Dollars, RatioCentsSignedDollarsBps32>(
starting_indexes.height,
&self.net_pnl_delta.change_1m.height,
&self.net_pnl_delta.change_1m.cents.height,
height_to_market_cap,
exit,
)?;