From b8e57f4788a83fd508cf138203ab1218ed207216 Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 19 Mar 2026 22:21:27 +0100 Subject: [PATCH] global: snapshot part 1 --- .../src/distribution/metrics/cohort/all.rs | 19 +- .../src/distribution/metrics/cohort/basic.rs | 8 +- .../src/distribution/metrics/cohort/core.rs | 2 +- .../distribution/metrics/cohort/extended.rs | 20 +- .../distribution/metrics/cost_basis/mod.rs | 87 +++- .../src/distribution/metrics/mod.rs | 16 +- .../distribution/metrics/relative/for_all.rs | 4 +- .../metrics/relative/with_extended.rs | 6 +- .../distribution/metrics/unrealized/base.rs | 127 ------ .../distribution/metrics/unrealized/full.rs | 100 +++-- .../distribution/metrics/unrealized/mod.rs | 16 +- .../state/cost_basis/unrealized.rs | 26 +- .../src/internal/algo/aggregation.rs | 422 ------------------ crates/brk_computer/src/internal/algo/mod.rs | 2 - .../internal/per_block/computed/aggregated.rs | 85 +++- .../per_block/computed/distribution.rs | 314 +++++++++++-- 16 files changed, 555 insertions(+), 699 deletions(-) delete mode 100644 crates/brk_computer/src/distribution/metrics/unrealized/base.rs delete mode 100644 crates/brk_computer/src/internal/algo/aggregation.rs diff --git a/crates/brk_computer/src/distribution/metrics/cohort/all.rs b/crates/brk_computer/src/distribution/metrics/cohort/all.rs index 25e507ab3..0eb7b2187 100644 --- a/crates/brk_computer/src/distribution/metrics/cohort/all.rs +++ b/crates/brk_computer/src/distribution/metrics/cohort/all.rs @@ -11,7 +11,7 @@ use crate::{ blocks, distribution::metrics::{ ActivityFull, AdjustedSopr, CohortMetricsBase, CostBasis, ImportConfig, OutputsBase, - RealizedFull, RelativeForAll, SupplyCore, UnrealizedFull, UnrealizedLike, + RealizedFull, RelativeForAll, SupplyCore, UnrealizedFull, }, prices, }; @@ -136,6 +136,23 @@ impl AllCohortMetrics { exit, )?; + self.cost_basis.compute_prices( + starting_indexes, + &self.unrealized.invested_capital.in_profit.cents.height, + &self.unrealized.invested_capital.in_loss.cents.height, + &self.supply.in_profit.sats.height, + &self.supply.in_loss.sats.height, + &self.unrealized.investor_cap_in_profit_raw, + &self.unrealized.investor_cap_in_loss_raw, + exit, + )?; + + self.unrealized.compute_sentiment( + starting_indexes, + &prices.spot.cents.height, + exit, + )?; + self.relative.compute( starting_indexes.height, &self.supply, diff --git a/crates/brk_computer/src/distribution/metrics/cohort/basic.rs b/crates/brk_computer/src/distribution/metrics/cohort/basic.rs index 1e4b66aee..10397313d 100644 --- a/crates/brk_computer/src/distribution/metrics/cohort/basic.rs +++ b/crates/brk_computer/src/distribution/metrics/cohort/basic.rs @@ -7,7 +7,7 @@ use vecdb::{AnyStoredVec, Exit, ReadableVec, Rw, StorageMode}; use crate::{ distribution::metrics::{ ActivityCore, CohortMetricsBase, ImportConfig, OutputsBase, RealizedCore, RelativeToAll, - SupplyCore, UnrealizedBase, + SupplyCore, UnrealizedCore, }, prices, }; @@ -22,7 +22,7 @@ pub struct BasicCohortMetrics { pub outputs: Box>, pub activity: Box>, pub realized: Box>, - pub unrealized: Box>, + pub unrealized: Box>, #[traversable(flatten)] pub relative: Box>, } @@ -30,7 +30,7 @@ pub struct BasicCohortMetrics { impl CohortMetricsBase for BasicCohortMetrics { type ActivityVecs = ActivityCore; type RealizedVecs = RealizedCore; - type UnrealizedVecs = UnrealizedBase; + type UnrealizedVecs = UnrealizedCore; impl_cohort_accessors!(); @@ -48,7 +48,7 @@ impl CohortMetricsBase for BasicCohortMetrics { impl BasicCohortMetrics { pub(crate) fn forced_import(cfg: &ImportConfig) -> Result { let supply = SupplyCore::forced_import(cfg)?; - let unrealized = UnrealizedBase::forced_import(cfg)?; + let unrealized = UnrealizedCore::forced_import(cfg)?; let realized = RealizedCore::forced_import(cfg)?; let relative = RelativeToAll::forced_import(cfg)?; diff --git a/crates/brk_computer/src/distribution/metrics/cohort/core.rs b/crates/brk_computer/src/distribution/metrics/cohort/core.rs index 52df05068..08586fb10 100644 --- a/crates/brk_computer/src/distribution/metrics/cohort/core.rs +++ b/crates/brk_computer/src/distribution/metrics/cohort/core.rs @@ -92,7 +92,7 @@ impl CoreCohortMetrics { )?; self.unrealized.compute_from_stateful( starting_indexes, - &others.iter().map(|v| &v.unrealized_base().core).collect::>(), + &others.iter().map(|v| v.unrealized_core()).collect::>(), exit, )?; diff --git a/crates/brk_computer/src/distribution/metrics/cohort/extended.rs b/crates/brk_computer/src/distribution/metrics/cohort/extended.rs index dae5391df..74e2cbf9a 100644 --- a/crates/brk_computer/src/distribution/metrics/cohort/extended.rs +++ b/crates/brk_computer/src/distribution/metrics/cohort/extended.rs @@ -11,7 +11,7 @@ use crate::{ blocks, distribution::metrics::{ ActivityFull, CohortMetricsBase, CostBasis, ImportConfig, OutputsBase, RealizedFull, - RelativeWithExtended, SupplyCore, UnrealizedFull, UnrealizedLike, + RelativeWithExtended, SupplyCore, UnrealizedFull, }, prices, }; @@ -114,6 +114,23 @@ impl ExtendedCohortMetrics { exit, )?; + self.cost_basis.compute_prices( + starting_indexes, + &self.unrealized.invested_capital.in_profit.cents.height, + &self.unrealized.invested_capital.in_loss.cents.height, + &self.supply.in_profit.sats.height, + &self.supply.in_loss.sats.height, + &self.unrealized.investor_cap_in_profit_raw, + &self.unrealized.investor_cap_in_loss_raw, + exit, + )?; + + self.unrealized.compute_sentiment( + starting_indexes, + &prices.spot.cents.height, + exit, + )?; + self.relative.compute( starting_indexes.height, &self.supply, @@ -126,4 +143,5 @@ impl ExtendedCohortMetrics { Ok(()) } + } diff --git a/crates/brk_computer/src/distribution/metrics/cost_basis/mod.rs b/crates/brk_computer/src/distribution/metrics/cost_basis/mod.rs index e826ab7e8..0e4635844 100644 --- a/crates/brk_computer/src/distribution/metrics/cost_basis/mod.rs +++ b/crates/brk_computer/src/distribution/metrics/cost_basis/mod.rs @@ -1,16 +1,25 @@ use brk_error::Result; use brk_traversable::Traversable; -use brk_types::{BasisPoints16, Cents, Version}; -use vecdb::{AnyStoredVec, AnyVec, Rw, StorageMode, WritableVec}; +use brk_types::{BasisPoints16, Cents, Height, Indexes, Sats, Version}; +use brk_types::CentsSquaredSats; +use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, Rw, StorageMode, WritableVec}; -use crate::internal::{PerBlock, PercentPerBlock, PercentilesVecs, Price, PERCENTILES_LEN}; +use crate::internal::{FiatPerBlock, PerBlock, PercentPerBlock, PercentilesVecs, Price, PERCENTILES_LEN}; use super::ImportConfig; -/// Cost basis metrics: min/max + percentiles + supply density. +#[derive(Traversable)] +pub struct CostBasisSide { + pub per_coin: FiatPerBlock, + pub per_dollar: FiatPerBlock, +} + +/// Cost basis metrics: min/max + profit/loss splits + percentiles + supply density. /// Used by all/sth/lth cohorts only. #[derive(Traversable)] pub struct CostBasis { + pub in_profit: CostBasisSide, + pub in_loss: CostBasisSide, pub min: Price>, pub max: Price>, pub percentiles: PercentilesVecs, @@ -21,6 +30,14 @@ pub struct CostBasis { impl CostBasis { pub(crate) fn forced_import(cfg: &ImportConfig) -> Result { Ok(Self { + in_profit: CostBasisSide { + per_coin: cfg.import("cost_basis_in_profit_per_coin", Version::ZERO)?, + per_dollar: cfg.import("cost_basis_in_profit_per_dollar", Version::ZERO)?, + }, + in_loss: CostBasisSide { + per_coin: cfg.import("cost_basis_in_loss_per_coin", Version::ZERO)?, + per_dollar: cfg.import("cost_basis_in_loss_per_dollar", Version::ZERO)?, + }, min: cfg.import("cost_basis_min", Version::ZERO)?, max: cfg.import("cost_basis_max", Version::ZERO)?, percentiles: PercentilesVecs::forced_import( @@ -84,6 +101,10 @@ impl CostBasis { pub(crate) fn collect_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> { let mut vecs: Vec<&mut dyn AnyStoredVec> = vec![ + &mut self.in_profit.per_coin.cents.height, + &mut self.in_profit.per_dollar.cents.height, + &mut self.in_loss.per_coin.cents.height, + &mut self.in_loss.per_dollar.cents.height, &mut self.min.cents.height, &mut self.max.cents.height, &mut self.supply_density.bps.height, @@ -102,4 +123,62 @@ impl CostBasis { ); vecs } + + pub(crate) fn compute_prices( + &mut self, + starting_indexes: &Indexes, + invested_cap_in_profit: &impl ReadableVec, + invested_cap_in_loss: &impl ReadableVec, + supply_in_profit_sats: &impl ReadableVec, + supply_in_loss_sats: &impl ReadableVec, + investor_cap_in_profit_raw: &impl ReadableVec, + investor_cap_in_loss_raw: &impl ReadableVec, + exit: &Exit, + ) -> Result<()> { + self.in_profit.per_coin.cents.height.compute_transform2( + starting_indexes.height, + invested_cap_in_profit, + supply_in_profit_sats, + |(h, invested_cents, supply_sats, ..)| { + let supply = supply_sats.as_u128(); + if supply == 0 { return (h, Cents::ZERO); } + (h, Cents::new((invested_cents.as_u128() * Sats::ONE_BTC_U128 / supply) as u64)) + }, + exit, + )?; + self.in_loss.per_coin.cents.height.compute_transform2( + starting_indexes.height, + invested_cap_in_loss, + supply_in_loss_sats, + |(h, invested_cents, supply_sats, ..)| { + let supply = supply_sats.as_u128(); + if supply == 0 { return (h, Cents::ZERO); } + (h, Cents::new((invested_cents.as_u128() * Sats::ONE_BTC_U128 / supply) as u64)) + }, + exit, + )?; + self.in_profit.per_dollar.cents.height.compute_transform2( + starting_indexes.height, + investor_cap_in_profit_raw, + invested_cap_in_profit, + |(h, investor_cap, invested_cents, ..)| { + let invested_raw = invested_cents.as_u128() * Sats::ONE_BTC_U128; + if invested_raw == 0 { return (h, Cents::ZERO); } + (h, Cents::new((investor_cap.inner() / invested_raw) as u64)) + }, + exit, + )?; + self.in_loss.per_dollar.cents.height.compute_transform2( + starting_indexes.height, + investor_cap_in_loss_raw, + invested_cap_in_loss, + |(h, investor_cap, invested_cents, ..)| { + let invested_raw = invested_cents.as_u128() * Sats::ONE_BTC_U128; + if invested_raw == 0 { return (h, Cents::ZERO); } + (h, Cents::new((investor_cap.inner() / invested_raw) as u64)) + }, + exit, + )?; + Ok(()) + } } diff --git a/crates/brk_computer/src/distribution/metrics/mod.rs b/crates/brk_computer/src/distribution/metrics/mod.rs index 8b261fc82..b3a91199f 100644 --- a/crates/brk_computer/src/distribution/metrics/mod.rs +++ b/crates/brk_computer/src/distribution/metrics/mod.rs @@ -125,7 +125,7 @@ pub use realized::{ pub use relative::{RelativeForAll, RelativeToAll, RelativeWithExtended}; pub use supply::{SupplyBase, SupplyCore}; pub use unrealized::{ - UnrealizedBase, UnrealizedBasic, UnrealizedCore, UnrealizedFull, UnrealizedLike, + UnrealizedBasic, UnrealizedCore, UnrealizedFull, UnrealizedLike, UnrealizedMinimal, }; @@ -212,12 +212,12 @@ pub trait CohortMetricsBase: self.realized_mut().as_core_mut() } - /// Convenience: access unrealized as `&UnrealizedBase` (via `UnrealizedLike::as_base`). - fn unrealized_base(&self) -> &UnrealizedBase { - self.unrealized().as_base() + /// Convenience: access unrealized as `&UnrealizedCore` (via `UnrealizedLike::as_core`). + fn unrealized_core(&self) -> &UnrealizedCore { + self.unrealized().as_core() } - fn unrealized_base_mut(&mut self) -> &mut UnrealizedBase { - self.unrealized_mut().as_base_mut() + fn unrealized_core_mut(&mut self) -> &mut UnrealizedCore { + self.unrealized_mut().as_core_mut() } fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { @@ -314,11 +314,11 @@ pub trait CohortMetricsBase: &others.iter().map(|v| v.realized_core()).collect::>(), exit, )?; - self.unrealized_base_mut().compute_from_stateful( + self.unrealized_core_mut().compute_from_stateful( starting_indexes, &others .iter() - .map(|v| v.unrealized_base()) + .map(|v| v.unrealized_core()) .collect::>(), exit, )?; diff --git a/crates/brk_computer/src/distribution/metrics/relative/for_all.rs b/crates/brk_computer/src/distribution/metrics/relative/for_all.rs index 652d7bbeb..8a9566dc8 100644 --- a/crates/brk_computer/src/distribution/metrics/relative/for_all.rs +++ b/crates/brk_computer/src/distribution/metrics/relative/for_all.rs @@ -38,12 +38,12 @@ impl RelativeForAll { self.base.compute( max_from, supply, - &unrealized.inner.core.basic, + &unrealized.inner.basic, market_cap, exit, )?; self.extended_own_pnl - .compute(max_from, &unrealized.inner.core, &unrealized.gross_pnl.usd.height, exit)?; + .compute(max_from, &unrealized.inner, &unrealized.gross_pnl.usd.height, exit)?; Ok(()) } } diff --git a/crates/brk_computer/src/distribution/metrics/relative/with_extended.rs b/crates/brk_computer/src/distribution/metrics/relative/with_extended.rs index af52b7ce1..8d56c7bba 100644 --- a/crates/brk_computer/src/distribution/metrics/relative/with_extended.rs +++ b/crates/brk_computer/src/distribution/metrics/relative/with_extended.rs @@ -48,7 +48,7 @@ impl RelativeWithExtended { self.base.compute( max_from, supply, - &unrealized.inner.core.basic, + &unrealized.inner.basic, market_cap, exit, )?; @@ -59,9 +59,9 @@ impl RelativeWithExtended { exit, )?; self.extended_own_market_cap - .compute(max_from, &unrealized.inner.core, own_market_cap, exit)?; + .compute(max_from, &unrealized.inner, own_market_cap, exit)?; self.extended_own_pnl - .compute(max_from, &unrealized.inner.core, &unrealized.gross_pnl.usd.height, exit)?; + .compute(max_from, &unrealized.inner, &unrealized.gross_pnl.usd.height, exit)?; Ok(()) } } diff --git a/crates/brk_computer/src/distribution/metrics/unrealized/base.rs b/crates/brk_computer/src/distribution/metrics/unrealized/base.rs deleted file mode 100644 index 678b77adc..000000000 --- a/crates/brk_computer/src/distribution/metrics/unrealized/base.rs +++ /dev/null @@ -1,127 +0,0 @@ -use brk_error::Result; -use brk_traversable::Traversable; -use brk_types::{CentsSquaredSats, Height, Indexes, Version}; -use derive_more::{Deref, DerefMut}; -use vecdb::{AnyStoredVec, AnyVec, BytesVec, Exit, ReadableVec, Rw, StorageMode, WritableVec}; - -use crate::distribution::{metrics::ImportConfig, state::UnrealizedState}; - -use super::UnrealizedCore; - -#[derive(Deref, DerefMut, Traversable)] -pub struct UnrealizedBase { - #[deref] - #[deref_mut] - #[traversable(flatten)] - pub core: UnrealizedCore, - - #[traversable(hidden)] - pub investor_cap_in_profit_raw: M::Stored>, - #[traversable(hidden)] - pub investor_cap_in_loss_raw: M::Stored>, -} - -impl UnrealizedBase { - pub(crate) fn forced_import(cfg: &ImportConfig) -> Result { - let v0 = Version::ZERO; - - let core = UnrealizedCore::forced_import(cfg)?; - - let investor_cap_in_profit_raw = cfg.import("investor_cap_in_profit_raw", v0)?; - let investor_cap_in_loss_raw = cfg.import("investor_cap_in_loss_raw", v0)?; - - Ok(Self { - core, - investor_cap_in_profit_raw, - investor_cap_in_loss_raw, - }) - } - - pub(crate) fn min_stateful_len(&self) -> usize { - self.core - .min_stateful_len() - .min(self.investor_cap_in_profit_raw.len()) - .min(self.investor_cap_in_loss_raw.len()) - } - - #[inline(always)] - pub(crate) fn push_state(&mut self, state: &UnrealizedState) { - self.core.push_state(state); - - self.investor_cap_in_profit_raw - .push(CentsSquaredSats::new(state.investor_cap_in_profit_raw)); - self.investor_cap_in_loss_raw - .push(CentsSquaredSats::new(state.investor_cap_in_loss_raw)); - } - - pub(crate) fn collect_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> { - let mut vecs = self.core.collect_vecs_mut(); - vecs.push(&mut self.investor_cap_in_profit_raw as &mut dyn AnyStoredVec); - vecs.push(&mut self.investor_cap_in_loss_raw as &mut dyn AnyStoredVec); - vecs - } - - pub(crate) fn compute_from_stateful( - &mut self, - starting_indexes: &Indexes, - others: &[&Self], - exit: &Exit, - ) -> Result<()> { - let core_refs: Vec<&UnrealizedCore> = - others.iter().map(|o| &o.core).collect(); - self.core - .compute_from_stateful(starting_indexes, &core_refs, exit)?; - - let start = self - .investor_cap_in_profit_raw - .len() - .min(self.investor_cap_in_loss_raw.len()); - let end = others - .iter() - .map(|o| o.investor_cap_in_profit_raw.len()) - .min() - .unwrap_or(0); - - let investor_profit_ranges: Vec> = others - .iter() - .map(|o| o.investor_cap_in_profit_raw.collect_range_at(start, end)) - .collect(); - let investor_loss_ranges: Vec> = others - .iter() - .map(|o| o.investor_cap_in_loss_raw.collect_range_at(start, end)) - .collect(); - - self.investor_cap_in_profit_raw - .truncate_if_needed_at(start)?; - self.investor_cap_in_loss_raw - .truncate_if_needed_at(start)?; - - for i in start..end { - let local_i = i - start; - - let mut sum_investor_profit = CentsSquaredSats::ZERO; - let mut sum_investor_loss = CentsSquaredSats::ZERO; - - for idx in 0..others.len() { - sum_investor_profit += investor_profit_ranges[idx][local_i]; - sum_investor_loss += investor_loss_ranges[idx][local_i]; - } - - self.investor_cap_in_profit_raw - .push(sum_investor_profit); - self.investor_cap_in_loss_raw - .push(sum_investor_loss); - } - - Ok(()) - } - - pub(crate) fn compute_rest( - &mut self, - starting_indexes: &Indexes, - exit: &Exit, - ) -> Result<()> { - self.core.compute_rest(starting_indexes, exit)?; - Ok(()) - } -} diff --git a/crates/brk_computer/src/distribution/metrics/unrealized/full.rs b/crates/brk_computer/src/distribution/metrics/unrealized/full.rs index 9c5599e5f..c5929d749 100644 --- a/crates/brk_computer/src/distribution/metrics/unrealized/full.rs +++ b/crates/brk_computer/src/distribution/metrics/unrealized/full.rs @@ -1,14 +1,14 @@ use brk_error::Result; use brk_traversable::Traversable; -use brk_types::{Cents, CentsSigned, Height, Indexes, Sats, Version}; +use brk_types::{Cents, CentsSigned, CentsSquaredSats, Height, Indexes, Sats, Version}; use derive_more::{Deref, DerefMut}; -use vecdb::{AnyStoredVec, Exit, ReadableVec, Rw, StorageMode}; +use vecdb::{AnyStoredVec, AnyVec, BytesVec, Exit, ReadableVec, Rw, StorageMode, WritableVec}; use crate::distribution::state::UnrealizedState; use crate::internal::{CentsSubtractToCentsSigned, FiatPerBlock}; use crate::{distribution::metrics::ImportConfig, prices}; -use super::UnrealizedBase; +use super::UnrealizedCore; #[derive(Traversable)] pub struct UnrealizedSentiment { @@ -28,43 +28,64 @@ pub struct UnrealizedFull { #[deref] #[deref_mut] #[traversable(flatten)] - pub inner: UnrealizedBase, + pub inner: UnrealizedCore, pub gross_pnl: FiatPerBlock, pub invested_capital: UnrealizedInvestedCapital, + #[traversable(hidden)] + pub investor_cap_in_profit_raw: M::Stored>, + #[traversable(hidden)] + pub investor_cap_in_loss_raw: M::Stored>, + pub sentiment: UnrealizedSentiment, } impl UnrealizedFull { pub(crate) fn forced_import(cfg: &ImportConfig) -> Result { let v0 = Version::ZERO; - let inner = UnrealizedBase::forced_import(cfg)?; + let inner = UnrealizedCore::forced_import(cfg)?; let gross_pnl = cfg.import("unrealized_gross_pnl", v0)?; + let invested_capital = UnrealizedInvestedCapital { + in_profit: cfg.import("invested_capital_in_profit", v0)?, + in_loss: cfg.import("invested_capital_in_loss", v0)?, + }; + + let investor_cap_in_profit_raw = cfg.import("investor_cap_in_profit_raw", v0)?; + let investor_cap_in_loss_raw = cfg.import("investor_cap_in_loss_raw", v0)?; + let sentiment = UnrealizedSentiment { pain_index: cfg.import("pain_index", v0)?, greed_index: cfg.import("greed_index", v0)?, net: cfg.import("net_sentiment", Version::ONE)?, }; - let invested_capital = UnrealizedInvestedCapital { - in_profit: cfg.import("invested_capital_in_profit", v0)?, - in_loss: cfg.import("invested_capital_in_loss", v0)?, - }; - Ok(Self { inner, gross_pnl, invested_capital, + investor_cap_in_profit_raw, + investor_cap_in_loss_raw, sentiment, }) } + pub(crate) fn min_stateful_len(&self) -> usize { + self.inner + .min_stateful_len() + .min(self.investor_cap_in_profit_raw.len()) + .min(self.investor_cap_in_loss_raw.len()) + } + #[inline(always)] pub(crate) fn push_state_all(&mut self, state: &UnrealizedState) { self.inner.push_state(state); + self.investor_cap_in_profit_raw + .push(CentsSquaredSats::new(state.investor_cap_in_profit_raw)); + self.investor_cap_in_loss_raw + .push(CentsSquaredSats::new(state.investor_cap_in_loss_raw)); } pub(crate) fn collect_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> { @@ -72,6 +93,8 @@ impl UnrealizedFull { vecs.push(&mut self.gross_pnl.cents.height as &mut dyn AnyStoredVec); vecs.push(&mut self.invested_capital.in_profit.cents.height as &mut dyn AnyStoredVec); vecs.push(&mut self.invested_capital.in_loss.cents.height as &mut dyn AnyStoredVec); + vecs.push(&mut self.investor_cap_in_profit_raw as &mut dyn AnyStoredVec); + vecs.push(&mut self.investor_cap_in_loss_raw as &mut dyn AnyStoredVec); vecs.push(&mut self.sentiment.pain_index.cents.height as &mut dyn AnyStoredVec); vecs.push(&mut self.sentiment.greed_index.cents.height as &mut dyn AnyStoredVec); vecs.push(&mut self.sentiment.net.cents.height as &mut dyn AnyStoredVec); @@ -88,10 +111,11 @@ impl UnrealizedFull { ) -> Result<()> { self.inner.compute_rest(starting_indexes, exit)?; + // gross_pnl = profit + loss self.gross_pnl.cents.height.compute_add( starting_indexes.height, - &self.inner.core.basic.profit.cents.height, - &self.inner.core.basic.loss.cents.height, + &self.inner.basic.profit.cents.height, + &self.inner.basic.loss.cents.height, exit, )?; @@ -100,8 +124,8 @@ impl UnrealizedFull { starting_indexes.height, supply_in_profit_sats, &prices.spot.cents.height, - &self.inner.core.basic.profit.cents.height, - |(h, supply_sats, spot, profit, ..)| { + &self.inner.basic.profit.cents.height, + |(h, supply_sats, spot, profit, ..): (_, Sats, Cents, Cents, _)| { let market_value = supply_sats.as_u128() * spot.as_u128() / Sats::ONE_BTC_U128; (h, Cents::new(market_value.saturating_sub(profit.as_u128()) as u64)) }, @@ -113,44 +137,33 @@ impl UnrealizedFull { starting_indexes.height, supply_in_loss_sats, &prices.spot.cents.height, - &self.inner.core.basic.loss.cents.height, - |(h, supply_sats, spot, loss, ..)| { + &self.inner.basic.loss.cents.height, + |(h, supply_sats, spot, loss, ..): (_, Sats, Cents, Cents, _)| { let market_value = supply_sats.as_u128() * spot.as_u128() / Sats::ONE_BTC_U128; (h, Cents::new((market_value + loss.as_u128()) as u64)) }, exit, )?; - self.compute_rest_extended(prices, starting_indexes, exit)?; - - self.sentiment - .net - .cents - .height - .compute_binary::( - starting_indexes.height, - &self.sentiment.greed_index.cents.height, - &self.sentiment.pain_index.cents.height, - exit, - )?; - Ok(()) } - fn compute_rest_extended( + /// Compute sentiment using investor_price (original formula). + /// Called after cost_basis.in_profit/loss are computed at the cohort level. + pub(crate) fn compute_sentiment( &mut self, - prices: &prices::Vecs, starting_indexes: &Indexes, + spot: &impl ReadableVec, exit: &Exit, ) -> Result<()> { // greed = spot - investor_price_winners - // investor_price = investor_cap / invested_cap (both in CentsSats) - // invested_cap is now in Cents (already divided by ONE_BTC), so multiply back + // investor_price = investor_cap / invested_cap + // invested_cap is in Cents (already / ONE_BTC), multiply back for CentsSats scale self.sentiment.greed_index.cents.height.compute_transform3( starting_indexes.height, - &self.inner.investor_cap_in_profit_raw, + &self.investor_cap_in_profit_raw, &self.invested_capital.in_profit.cents.height, - &prices.spot.cents.height, + spot, |(h, investor_cap, invested_cap_cents, spot, ..)| { let invested_cap_raw = invested_cap_cents.as_u128() * Sats::ONE_BTC_U128; if invested_cap_raw == 0 { @@ -166,9 +179,9 @@ impl UnrealizedFull { // pain = investor_price_losers - spot self.sentiment.pain_index.cents.height.compute_transform3( starting_indexes.height, - &self.inner.investor_cap_in_loss_raw, + &self.investor_cap_in_loss_raw, &self.invested_capital.in_loss.cents.height, - &prices.spot.cents.height, + spot, |(h, investor_cap, invested_cap_cents, spot, ..)| { let invested_cap_raw = invested_cap_cents.as_u128() * Sats::ONE_BTC_U128; if invested_cap_raw == 0 { @@ -181,7 +194,18 @@ impl UnrealizedFull { exit, )?; + // net = greed - pain + self.sentiment + .net + .cents + .height + .compute_binary::( + starting_indexes.height, + &self.sentiment.greed_index.cents.height, + &self.sentiment.pain_index.cents.height, + exit, + )?; + Ok(()) } - } diff --git a/crates/brk_computer/src/distribution/metrics/unrealized/mod.rs b/crates/brk_computer/src/distribution/metrics/unrealized/mod.rs index 4c59616e3..a3e850006 100644 --- a/crates/brk_computer/src/distribution/metrics/unrealized/mod.rs +++ b/crates/brk_computer/src/distribution/metrics/unrealized/mod.rs @@ -1,11 +1,9 @@ -mod base; mod basic; mod core; mod full; mod minimal; pub use self::core::UnrealizedCore; -pub use base::UnrealizedBase; pub use basic::UnrealizedBasic; pub use full::UnrealizedFull; pub use minimal::UnrealizedMinimal; @@ -17,8 +15,8 @@ use vecdb::{Exit, ReadableVec}; use crate::{distribution::state::UnrealizedState, prices}; pub trait UnrealizedLike: Send + Sync { - fn as_base(&self) -> &UnrealizedBase; - fn as_base_mut(&mut self) -> &mut UnrealizedBase; + fn as_core(&self) -> &UnrealizedCore; + fn as_core_mut(&mut self) -> &mut UnrealizedCore; fn min_stateful_len(&self) -> usize; fn push_state(&mut self, state: &UnrealizedState); fn compute_rest( @@ -31,11 +29,11 @@ pub trait UnrealizedLike: Send + Sync { ) -> Result<()>; } -impl UnrealizedLike for UnrealizedBase { - fn as_base(&self) -> &UnrealizedBase { +impl UnrealizedLike for UnrealizedCore { + fn as_core(&self) -> &UnrealizedCore { self } - fn as_base_mut(&mut self) -> &mut UnrealizedBase { + fn as_core_mut(&mut self) -> &mut UnrealizedCore { self } fn min_stateful_len(&self) -> usize { @@ -58,10 +56,10 @@ impl UnrealizedLike for UnrealizedBase { } impl UnrealizedLike for UnrealizedFull { - fn as_base(&self) -> &UnrealizedBase { + fn as_core(&self) -> &UnrealizedCore { &self.inner } - fn as_base_mut(&mut self) -> &mut UnrealizedBase { + fn as_core_mut(&mut self) -> &mut UnrealizedCore { &mut self.inner } fn min_stateful_len(&self) -> usize { diff --git a/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs b/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs index 4f7b4f51b..7e884de54 100644 --- a/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs +++ b/crates/brk_computer/src/distribution/state/cost_basis/unrealized.rs @@ -10,9 +10,7 @@ pub struct UnrealizedState { pub supply_in_loss: Sats, pub unrealized_profit: Cents, pub unrealized_loss: Cents, - /// Raw Σ(price² × sats) for UTXOs in profit. Used for aggregation. pub investor_cap_in_profit_raw: u128, - /// Raw Σ(price² × sats) for UTXOs in loss. Used for aggregation. pub investor_cap_in_loss_raw: u128, } @@ -36,12 +34,10 @@ pub struct WithoutCapital { pub(crate) unrealized_loss: u128, } -/// Full cache state: core + invested capital + investor cap (128 bytes, 2 cache lines). +/// Full cache state: core + investor cap (for sentiment computation). #[derive(Debug, Default, Clone)] pub struct WithCapital { core: WithoutCapital, - invested_capital_in_profit: u128, - invested_capital_in_loss: u128, investor_cap_in_profit: u128, investor_cap_in_loss: u128, } @@ -120,30 +116,26 @@ impl Accumulate for WithCapital { #[inline(always)] fn accumulate_profit(&mut self, price_u128: u128, sats: Sats) { self.core.supply_in_profit += sats; - let invested_capital = price_u128 * sats.as_u128(); - self.invested_capital_in_profit += invested_capital; - self.investor_cap_in_profit += price_u128 * invested_capital; + let invested = price_u128 * sats.as_u128(); + self.investor_cap_in_profit += price_u128 * invested; } #[inline(always)] fn accumulate_loss(&mut self, price_u128: u128, sats: Sats) { self.core.supply_in_loss += sats; - let invested_capital = price_u128 * sats.as_u128(); - self.invested_capital_in_loss += invested_capital; - self.investor_cap_in_loss += price_u128 * invested_capital; + let invested = price_u128 * sats.as_u128(); + self.investor_cap_in_loss += price_u128 * invested; } #[inline(always)] fn deaccumulate_profit(&mut self, price_u128: u128, sats: Sats) { self.core.supply_in_profit -= sats; - let invested_capital = price_u128 * sats.as_u128(); - self.invested_capital_in_profit -= invested_capital; - self.investor_cap_in_profit -= price_u128 * invested_capital; + let invested = price_u128 * sats.as_u128(); + self.investor_cap_in_profit -= price_u128 * invested; } #[inline(always)] fn deaccumulate_loss(&mut self, price_u128: u128, sats: Sats) { self.core.supply_in_loss -= sats; - let invested_capital = price_u128 * sats.as_u128(); - self.invested_capital_in_loss -= invested_capital; - self.investor_cap_in_loss -= price_u128 * invested_capital; + let invested = price_u128 * sats.as_u128(); + self.investor_cap_in_loss -= price_u128 * invested; } } diff --git a/crates/brk_computer/src/internal/algo/aggregation.rs b/crates/brk_computer/src/internal/algo/aggregation.rs deleted file mode 100644 index 98d523195..000000000 --- a/crates/brk_computer/src/internal/algo/aggregation.rs +++ /dev/null @@ -1,422 +0,0 @@ -use std::collections::VecDeque; - -use brk_error::Result; -use brk_types::{CheckedSub, StoredU64, get_percentile}; -use schemars::JsonSchema; -use vecdb::{ - AnyStoredVec, AnyVec, EagerVec, Exit, PcoVec, ReadableVec, VecIndex, VecValue, WritableVec, -}; - -use crate::internal::ComputedVecValue; - -fn validate_and_start( - vec: &mut EagerVec>, - combined_version: vecdb::Version, - current_start: I, -) -> Result { - vec.validate_computed_version_or_reset(combined_version)?; - Ok(current_start.min(I::from(vec.len()))) -} - -#[allow(clippy::too_many_arguments)] -pub(crate) fn compute_aggregations( - max_from: I, - source: &impl ReadableVec, - first_indexes: &impl ReadableVec, - count_indexes: &impl ReadableVec, - exit: &Exit, - skip_count: usize, - mut min: Option<&mut EagerVec>>, - mut max: Option<&mut EagerVec>>, - mut sum: Option<&mut EagerVec>>, - mut cumulative: Option<&mut EagerVec>>, - mut median: Option<&mut EagerVec>>, - mut pct10: Option<&mut EagerVec>>, - mut pct25: Option<&mut EagerVec>>, - mut pct75: Option<&mut EagerVec>>, - mut pct90: Option<&mut EagerVec>>, -) -> Result<()> -where - I: VecIndex, - T: ComputedVecValue + JsonSchema, - A: VecIndex + VecValue + CheckedSub, -{ - let combined_version = source.version() + first_indexes.version() + count_indexes.version(); - - macro_rules! validate_vec { - ($($vec:ident),*) => {{ - let mut idx = max_from; - $(if let Some(ref mut v) = $vec { - idx = validate_and_start(v, combined_version, idx)?; - })* - idx - }}; - } - - let index = validate_vec!( - min, max, sum, cumulative, median, pct10, pct25, pct75, pct90 - ); - - let needs_min = min.is_some(); - let needs_max = max.is_some(); - let needs_sum = sum.is_some(); - let needs_cumulative = cumulative.is_some(); - let needs_percentiles = median.is_some() - || pct10.is_some() - || pct25.is_some() - || pct75.is_some() - || pct90.is_some(); - let needs_minmax = needs_min || needs_max; - let needs_sum_or_cumulative = needs_sum || needs_cumulative; - - if !needs_minmax && !needs_sum_or_cumulative && !needs_percentiles { - return Ok(()); - } - - let mut cumulative_val = cumulative.as_ref().map(|cumulative_vec| { - index.decremented().map_or(T::from(0_usize), |idx| { - cumulative_vec - .collect_one_at(idx.to_usize()) - .unwrap_or(T::from(0_usize)) - }) - }); - - let start = index.to_usize(); - - // Truncate all vecs to start once, so the loop only pushes - macro_rules! truncate_vec { - ($($vec:ident),*) => { - $(if let Some(ref mut v) = $vec { - v.truncate_if_needed_at(start)?; - })* - }; - } - truncate_vec!( - min, max, sum, cumulative, median, pct10, pct25, pct75, pct90 - ); - - let fi_len = first_indexes.len(); - let first_indexes_batch: Vec = first_indexes.collect_range_at(start, fi_len); - let count_indexes_batch: Vec = count_indexes.collect_range_at(start, fi_len); - - let mut values: Vec = Vec::new(); - - first_indexes_batch - .into_iter() - .zip(count_indexes_batch) - .enumerate() - .try_for_each(|(_, (first_index, count_index))| -> Result<()> { - let count = u64::from(count_index) as usize; - - // Effective count after skipping (e.g., skip coinbase for fee calculations) - let effective_count = count.saturating_sub(skip_count); - let effective_first_index = first_index + skip_count.min(count); - - // Fast path: only min/max needed, no sorting or allocation required - if needs_minmax && !needs_percentiles && !needs_sum_or_cumulative { - let efi = effective_first_index.to_usize(); - let mut min_val: Option = None; - let mut max_val: Option = None; - - source.for_each_range_at(efi, efi + effective_count, |val| { - if needs_min { - min_val = Some(min_val.map_or(val, |m| if val < m { val } else { m })); - } - if needs_max { - max_val = Some(max_val.map_or(val, |m| if val > m { val } else { m })); - } - }); - - if let Some(ref mut min_vec) = min { - min_vec.push(min_val.or(max_val).unwrap_or_else(|| T::from(0_usize))); - } - if let Some(ref mut max_vec) = max { - max_vec.push(max_val.or(min_val).unwrap_or_else(|| T::from(0_usize))); - } - } else if needs_percentiles || needs_minmax { - source.collect_range_into_at( - effective_first_index.to_usize(), - effective_first_index.to_usize() + effective_count, - &mut values, - ); - - if values.is_empty() { - macro_rules! push_zero { - ($($vec:ident),*) => { - $(if let Some(ref mut v) = $vec { - v.push(T::from(0_usize)); - })* - }; - } - push_zero!(max, pct90, pct75, median, pct25, pct10, min, sum); - if let Some(ref mut cumulative_vec) = cumulative { - cumulative_vec.push(cumulative_val.unwrap()); - } - } else if needs_percentiles { - let sum_val = if needs_sum_or_cumulative { - Some(values.iter().copied().fold(T::from(0), |a, b| a + b)) - } else { - None - }; - - values.sort_unstable(); - - if let Some(ref mut max_vec) = max { - max_vec.push(*values.last().unwrap()); - } - if let Some(ref mut pct90_vec) = pct90 { - pct90_vec.push(get_percentile(&values, 0.90)); - } - if let Some(ref mut pct75_vec) = pct75 { - pct75_vec.push(get_percentile(&values, 0.75)); - } - if let Some(ref mut median_vec) = median { - median_vec.push(get_percentile(&values, 0.50)); - } - if let Some(ref mut pct25_vec) = pct25 { - pct25_vec.push(get_percentile(&values, 0.25)); - } - if let Some(ref mut pct10_vec) = pct10 { - pct10_vec.push(get_percentile(&values, 0.10)); - } - if let Some(ref mut min_vec) = min { - min_vec.push(*values.first().unwrap()); - } - - if let Some(sum_val) = sum_val { - if let Some(ref mut sum_vec) = sum { - sum_vec.push(sum_val); - } - if let Some(ref mut cumulative_vec) = cumulative { - let t = cumulative_val.unwrap() + sum_val; - cumulative_val.replace(t); - cumulative_vec.push(t); - } - } - } else if needs_minmax { - // Single pass for min + max + optional sum - let (min_val, max_val, sum_val, _len) = values.iter().copied().fold( - (values[0], values[0], T::from(0_usize), 0_usize), - |(mn, mx, s, c), v| (mn.min(v), mx.max(v), s + v, c + 1), - ); - - if let Some(ref mut min_vec) = min { - min_vec.push(min_val); - } - if let Some(ref mut max_vec) = max { - max_vec.push(max_val); - } - - if needs_sum_or_cumulative { - if let Some(ref mut sum_vec) = sum { - sum_vec.push(sum_val); - } - if let Some(ref mut cumulative_vec) = cumulative { - let t = cumulative_val.unwrap() + sum_val; - cumulative_val.replace(t); - cumulative_vec.push(t); - } - } - } - } else if needs_sum_or_cumulative { - let efi = effective_first_index.to_usize(); - let sum_val = source.fold_range_at( - efi, - efi + effective_count, - T::from(0_usize), - |acc, val| acc + val, - ); - - if let Some(ref mut sum_vec) = sum { - sum_vec.push(sum_val); - } - if let Some(ref mut cumulative_vec) = cumulative { - let t = cumulative_val.unwrap() + sum_val; - cumulative_val.replace(t); - cumulative_vec.push(t); - } - } - - Ok(()) - })?; - - let _lock = exit.lock(); - - macro_rules! write_vec { - ($($vec:ident),*) => { - $(if let Some(v) = $vec { v.write()?; })* - }; - } - - write_vec!( - min, max, sum, cumulative, median, pct10, pct25, pct75, pct90 - ); - - Ok(()) -} - -#[allow(clippy::too_many_arguments)] -pub(crate) fn compute_aggregations_nblock_window( - max_from: I, - source: &(impl ReadableVec + Sized), - first_indexes: &impl ReadableVec, - count_indexes: &impl ReadableVec, - n_blocks: usize, - exit: &Exit, - min: &mut EagerVec>, - max: &mut EagerVec>, - median: &mut EagerVec>, - pct10: &mut EagerVec>, - pct25: &mut EagerVec>, - pct75: &mut EagerVec>, - pct90: &mut EagerVec>, -) -> Result<()> -where - I: VecIndex, - T: ComputedVecValue + CheckedSub + JsonSchema, - A: VecIndex + VecValue + CheckedSub, -{ - let combined_version = source.version() + first_indexes.version() + count_indexes.version(); - - let mut idx = max_from; - for vec in [ - &mut *min, - &mut *max, - &mut *median, - &mut *pct10, - &mut *pct25, - &mut *pct75, - &mut *pct90, - ] { - idx = validate_and_start(vec, combined_version, idx)?; - } - let index = idx; - - let start = index.to_usize(); - let fi_len = first_indexes.len(); - - let batch_start = start.saturating_sub(n_blocks - 1); - let first_indexes_batch: Vec = first_indexes.collect_range_at(batch_start, fi_len); - let count_indexes_all: Vec = count_indexes.collect_range_at(batch_start, fi_len); - - let zero = T::from(0_usize); - - for vec in [ - &mut *min, - &mut *max, - &mut *median, - &mut *pct10, - &mut *pct25, - &mut *pct75, - &mut *pct90, - ] { - vec.truncate_if_needed_at(start)?; - } - - // Persistent sorted window: O(n) merge-insert for new block, O(n) merge-filter - // for expired block. Avoids re-sorting every block. Cursor reads only the new - // block (~1 page decompress vs original's ~4). Ring buffer caches per-block - // sorted values for O(1) expiry. - // Peak memory: 2 × ~15k window elements + n_blocks × ~2500 cached ≈ 360 KB. - let mut block_ring: VecDeque> = VecDeque::with_capacity(n_blocks + 1); - let mut cursor = source.cursor(); - let mut sorted_window: Vec = Vec::new(); - let mut merge_buf: Vec = Vec::new(); - - // Pre-fill initial window blocks [window_start_of_first..start) - let window_start_of_first = start.saturating_sub(n_blocks - 1); - for block_idx in window_start_of_first..start { - let fi = first_indexes_batch[block_idx - batch_start].to_usize(); - let count = u64::from(count_indexes_all[block_idx - batch_start]) as usize; - if cursor.position() < fi { - cursor.advance(fi - cursor.position()); - } - let mut bv = Vec::with_capacity(count); - cursor.for_each(count, |v: T| bv.push(v)); - bv.sort_unstable(); - sorted_window.extend_from_slice(&bv); - block_ring.push_back(bv); - } - // Initial sorted_window was built by extending individually sorted blocks — - // stable sort detects these sorted runs and merges in O(n × log(k)) instead of O(n log n). - sorted_window.sort(); - - for j in 0..(fi_len - start) { - let idx = start + j; - - // Read and sort new block's values - let fi = first_indexes_batch[idx - batch_start].to_usize(); - let count = u64::from(count_indexes_all[idx - batch_start]) as usize; - if cursor.position() < fi { - cursor.advance(fi - cursor.position()); - } - let mut new_block = Vec::with_capacity(count); - cursor.for_each(count, |v: T| new_block.push(v)); - new_block.sort_unstable(); - - // Merge-insert new sorted block into sorted_window: O(n+m) - merge_buf.clear(); - merge_buf.reserve(sorted_window.len() + new_block.len()); - let (mut si, mut ni) = (0, 0); - while si < sorted_window.len() && ni < new_block.len() { - if sorted_window[si] <= new_block[ni] { - merge_buf.push(sorted_window[si]); - si += 1; - } else { - merge_buf.push(new_block[ni]); - ni += 1; - } - } - merge_buf.extend_from_slice(&sorted_window[si..]); - merge_buf.extend_from_slice(&new_block[ni..]); - std::mem::swap(&mut sorted_window, &mut merge_buf); - - block_ring.push_back(new_block); - - // Expire oldest block: merge-filter its sorted values from sorted_window in O(n) - if block_ring.len() > n_blocks { - let expired = block_ring.pop_front().unwrap(); - - merge_buf.clear(); - merge_buf.reserve(sorted_window.len()); - let mut ei = 0; - for &v in &sorted_window { - if ei < expired.len() && v == expired[ei] { - ei += 1; - } else { - merge_buf.push(v); - } - } - std::mem::swap(&mut sorted_window, &mut merge_buf); - } - - if sorted_window.is_empty() { - for vec in [ - &mut *min, - &mut *max, - &mut *median, - &mut *pct10, - &mut *pct25, - &mut *pct75, - &mut *pct90, - ] { - vec.push(zero); - } - } else { - max.push(*sorted_window.last().unwrap()); - pct90.push(get_percentile(&sorted_window, 0.90)); - pct75.push(get_percentile(&sorted_window, 0.75)); - median.push(get_percentile(&sorted_window, 0.50)); - pct25.push(get_percentile(&sorted_window, 0.25)); - pct10.push(get_percentile(&sorted_window, 0.10)); - min.push(*sorted_window.first().unwrap()); - } - } - - let _lock = exit.lock(); - for vec in [min, max, median, pct10, pct25, pct75, pct90] { - vec.write()?; - } - - Ok(()) -} diff --git a/crates/brk_computer/src/internal/algo/mod.rs b/crates/brk_computer/src/internal/algo/mod.rs index 88e971047..038de0808 100644 --- a/crates/brk_computer/src/internal/algo/mod.rs +++ b/crates/brk_computer/src/internal/algo/mod.rs @@ -1,4 +1,3 @@ -mod aggregation; mod drawdown; mod expanding_percentiles; mod fenwick; @@ -6,7 +5,6 @@ mod sliding_distribution; mod sliding_median; mod sliding_window; -pub(crate) use aggregation::*; pub(crate) use drawdown::*; pub(crate) use expanding_percentiles::*; pub(crate) use fenwick::*; diff --git a/crates/brk_computer/src/internal/per_block/computed/aggregated.rs b/crates/brk_computer/src/internal/per_block/computed/aggregated.rs index c3de8955c..0f504be05 100644 --- a/crates/brk_computer/src/internal/per_block/computed/aggregated.rs +++ b/crates/brk_computer/src/internal/per_block/computed/aggregated.rs @@ -2,13 +2,15 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::Height; use schemars::JsonSchema; -use vecdb::{Database, Exit, ReadableVec, Rw, StorageMode, VecIndex, VecValue, Version}; +use vecdb::{ + AnyStoredVec, AnyVec, Database, Exit, ReadableVec, Rw, StorageMode, VecIndex, VecValue, + Version, WritableVec, +}; use crate::{ indexes, internal::{ CachedWindowStarts, NumericValue, PerBlock, RollingComplete, WindowStarts, - algo::compute_aggregations, }, }; @@ -68,23 +70,68 @@ where f64: From, A: VecIndex + VecValue + brk_types::CheckedSub, { - compute_aggregations( - max_from, - source, - first_indexes, - count_indexes, - exit, - skip_count, - None, - None, - Some(&mut self.sum.height), - Some(&mut self.cumulative.height), - None, - None, - None, - None, - None, - )?; + let combined_version = + source.version() + first_indexes.version() + count_indexes.version(); + + let mut index = max_from; + index = { + self.sum + .height + .validate_computed_version_or_reset(combined_version)?; + index.min(Height::from(self.sum.height.len())) + }; + index = { + self.cumulative + .height + .validate_computed_version_or_reset(combined_version)?; + index.min(Height::from(self.cumulative.height.len())) + }; + + let start = index.to_usize(); + + self.sum.height.truncate_if_needed_at(start)?; + self.cumulative.height.truncate_if_needed_at(start)?; + + let mut cumulative_val = index.decremented().map_or(T::from(0_usize), |idx| { + self.cumulative + .height + .collect_one_at(idx.to_usize()) + .unwrap_or(T::from(0_usize)) + }); + + let fi_len = first_indexes.len(); + let first_indexes_batch: Vec = first_indexes.collect_range_at(start, fi_len); + let count_indexes_batch: Vec = + count_indexes.collect_range_at(start, fi_len); + + first_indexes_batch + .into_iter() + .zip(count_indexes_batch) + .try_for_each(|(first_index, count_index)| -> Result<()> { + let count = u64::from(count_index) as usize; + let effective_count = count.saturating_sub(skip_count); + let effective_first_index = first_index + skip_count.min(count); + + let efi = effective_first_index.to_usize(); + let sum_val = source.fold_range_at( + efi, + efi + effective_count, + T::from(0_usize), + |acc, val| acc + val, + ); + + self.sum.height.push(sum_val); + cumulative_val = cumulative_val + sum_val; + self.cumulative.height.push(cumulative_val); + + Ok(()) + })?; + + let _lock = exit.lock(); + self.sum.height.write()?; + self.cumulative.height.write()?; + drop(_lock); + self.rolling .compute(max_from, windows, &self.sum.height, exit)?; Ok(()) diff --git a/crates/brk_computer/src/internal/per_block/computed/distribution.rs b/crates/brk_computer/src/internal/per_block/computed/distribution.rs index f49198edb..643aa4ca4 100644 --- a/crates/brk_computer/src/internal/per_block/computed/distribution.rs +++ b/crates/brk_computer/src/internal/per_block/computed/distribution.rs @@ -1,19 +1,18 @@ +use std::collections::VecDeque; + use brk_error::Result; use brk_traversable::Traversable; -use brk_types::Height; +use brk_types::{Height, get_percentile}; use derive_more::{Deref, DerefMut}; use schemars::JsonSchema; use vecdb::{ - CheckedSub, Database, Exit, ReadableVec, Rw, StorageMode, - VecIndex, VecValue, Version, + AnyStoredVec, AnyVec, CheckedSub, Database, Exit, ReadableVec, Rw, StorageMode, VecIndex, + VecValue, Version, WritableVec, }; use crate::{ indexes, - internal::{ - ComputedVecValue, DistributionStats, NumericValue, PerBlock, - algo::{compute_aggregations, compute_aggregations_nblock_window}, - }, + internal::{ComputedVecValue, DistributionStats, NumericValue, PerBlock}, }; #[derive(Deref, DerefMut, Traversable)] @@ -46,24 +45,110 @@ impl PerBlockDistribution { where A: VecIndex + VecValue + brk_types::CheckedSub, { - let s = &mut self.0; - compute_aggregations( - max_from, - source, - first_indexes, - count_indexes, - exit, - skip_count, - Some(&mut s.min.height), - Some(&mut s.max.height), - None, - None, - Some(&mut s.median.height), - Some(&mut s.pct10.height), - Some(&mut s.pct25.height), - Some(&mut s.pct75.height), - Some(&mut s.pct90.height), - ) + let DistributionStats { + min, + max, + pct10, + pct25, + median, + pct75, + pct90, + } = &mut self.0; + + let min = &mut min.height; + let max = &mut max.height; + let pct10 = &mut pct10.height; + let pct25 = &mut pct25.height; + let median = &mut median.height; + let pct75 = &mut pct75.height; + let pct90 = &mut pct90.height; + + let combined_version = + source.version() + first_indexes.version() + count_indexes.version(); + + let mut index = max_from; + for vec in [ + &mut *min, + &mut *max, + &mut *median, + &mut *pct10, + &mut *pct25, + &mut *pct75, + &mut *pct90, + ] { + vec.validate_computed_version_or_reset(combined_version)?; + index = index.min(Height::from(vec.len())); + } + + let start = index.to_usize(); + + for vec in [ + &mut *min, + &mut *max, + &mut *median, + &mut *pct10, + &mut *pct25, + &mut *pct75, + &mut *pct90, + ] { + vec.truncate_if_needed_at(start)?; + } + + let fi_len = first_indexes.len(); + let first_indexes_batch: Vec = first_indexes.collect_range_at(start, fi_len); + let count_indexes_batch: Vec = + count_indexes.collect_range_at(start, fi_len); + + let mut values: Vec = Vec::new(); + + first_indexes_batch + .into_iter() + .zip(count_indexes_batch) + .try_for_each(|(first_index, count_index)| -> Result<()> { + let count = u64::from(count_index) as usize; + let effective_count = count.saturating_sub(skip_count); + let effective_first_index = first_index + skip_count.min(count); + + source.collect_range_into_at( + effective_first_index.to_usize(), + effective_first_index.to_usize() + effective_count, + &mut values, + ); + + if values.is_empty() { + let zero = T::from(0_usize); + for vec in [ + &mut *min, + &mut *max, + &mut *median, + &mut *pct10, + &mut *pct25, + &mut *pct75, + &mut *pct90, + ] { + vec.push(zero); + } + } else { + values.sort_unstable(); + + max.push(*values.last().unwrap()); + pct90.push(get_percentile(&values, 0.90)); + pct75.push(get_percentile(&values, 0.75)); + median.push(get_percentile(&values, 0.50)); + pct25.push(get_percentile(&values, 0.25)); + pct10.push(get_percentile(&values, 0.10)); + min.push(*values.first().unwrap()); + } + + Ok(()) + })?; + + let _lock = exit.lock(); + for vec in [min, max, median, pct10, pct25, pct75, pct90] { + vec.write()?; + } + + Ok(()) } pub(crate) fn compute_from_nblocks( @@ -79,21 +164,168 @@ impl PerBlockDistribution { T: CheckedSub, A: VecIndex + VecValue + brk_types::CheckedSub, { - let s = &mut self.0; - compute_aggregations_nblock_window( - max_from, - source, - first_indexes, - count_indexes, - n_blocks, - exit, - &mut s.min.height, - &mut s.max.height, - &mut s.median.height, - &mut s.pct10.height, - &mut s.pct25.height, - &mut s.pct75.height, - &mut s.pct90.height, - ) + let DistributionStats { + min, + max, + pct10, + pct25, + median, + pct75, + pct90, + } = &mut self.0; + + let min = &mut min.height; + let max = &mut max.height; + let pct10 = &mut pct10.height; + let pct25 = &mut pct25.height; + let median = &mut median.height; + let pct75 = &mut pct75.height; + let pct90 = &mut pct90.height; + + let combined_version = + source.version() + first_indexes.version() + count_indexes.version(); + + let mut index = max_from; + for vec in [ + &mut *min, + &mut *max, + &mut *median, + &mut *pct10, + &mut *pct25, + &mut *pct75, + &mut *pct90, + ] { + vec.validate_computed_version_or_reset(combined_version)?; + index = index.min(Height::from(vec.len())); + } + + let start = index.to_usize(); + let fi_len = first_indexes.len(); + + let batch_start = start.saturating_sub(n_blocks - 1); + let first_indexes_batch: Vec = first_indexes.collect_range_at(batch_start, fi_len); + let count_indexes_all: Vec = + count_indexes.collect_range_at(batch_start, fi_len); + + let zero = T::from(0_usize); + + for vec in [ + &mut *min, + &mut *max, + &mut *median, + &mut *pct10, + &mut *pct25, + &mut *pct75, + &mut *pct90, + ] { + vec.truncate_if_needed_at(start)?; + } + + // Persistent sorted window: O(n) merge-insert for new block, O(n) merge-filter + // for expired block. Avoids re-sorting every block. Cursor reads only the new + // block (~1 page decompress vs original's ~4). Ring buffer caches per-block + // sorted values for O(1) expiry. + // Peak memory: 2 × ~15k window elements + n_blocks × ~2500 cached ≈ 360 KB. + let mut block_ring: VecDeque> = VecDeque::with_capacity(n_blocks + 1); + let mut cursor = source.cursor(); + let mut sorted_window: Vec = Vec::new(); + let mut merge_buf: Vec = Vec::new(); + + // Pre-fill initial window blocks [window_start_of_first..start) + let window_start_of_first = start.saturating_sub(n_blocks - 1); + for block_idx in window_start_of_first..start { + let fi = first_indexes_batch[block_idx - batch_start].to_usize(); + let count = u64::from(count_indexes_all[block_idx - batch_start]) as usize; + if cursor.position() < fi { + cursor.advance(fi - cursor.position()); + } + let mut bv = Vec::with_capacity(count); + cursor.for_each(count, |v: T| bv.push(v)); + bv.sort_unstable(); + sorted_window.extend_from_slice(&bv); + block_ring.push_back(bv); + } + // Initial sorted_window was built by extending individually sorted blocks — + // stable sort detects these sorted runs and merges in O(n × log(k)) instead of O(n log n). + sorted_window.sort(); + + for j in 0..(fi_len - start) { + let idx = start + j; + + // Read and sort new block's values + let fi = first_indexes_batch[idx - batch_start].to_usize(); + let count = u64::from(count_indexes_all[idx - batch_start]) as usize; + if cursor.position() < fi { + cursor.advance(fi - cursor.position()); + } + let mut new_block = Vec::with_capacity(count); + cursor.for_each(count, |v: T| new_block.push(v)); + new_block.sort_unstable(); + + // Merge-insert new sorted block into sorted_window: O(n+m) + merge_buf.clear(); + merge_buf.reserve(sorted_window.len() + new_block.len()); + let (mut si, mut ni) = (0, 0); + while si < sorted_window.len() && ni < new_block.len() { + if sorted_window[si] <= new_block[ni] { + merge_buf.push(sorted_window[si]); + si += 1; + } else { + merge_buf.push(new_block[ni]); + ni += 1; + } + } + merge_buf.extend_from_slice(&sorted_window[si..]); + merge_buf.extend_from_slice(&new_block[ni..]); + std::mem::swap(&mut sorted_window, &mut merge_buf); + + block_ring.push_back(new_block); + + // Expire oldest block: merge-filter its sorted values from sorted_window in O(n) + if block_ring.len() > n_blocks { + let expired = block_ring.pop_front().unwrap(); + + merge_buf.clear(); + merge_buf.reserve(sorted_window.len()); + let mut ei = 0; + for &v in &sorted_window { + if ei < expired.len() && v == expired[ei] { + ei += 1; + } else { + merge_buf.push(v); + } + } + std::mem::swap(&mut sorted_window, &mut merge_buf); + } + + if sorted_window.is_empty() { + for vec in [ + &mut *min, + &mut *max, + &mut *median, + &mut *pct10, + &mut *pct25, + &mut *pct75, + &mut *pct90, + ] { + vec.push(zero); + } + } else { + max.push(*sorted_window.last().unwrap()); + pct90.push(get_percentile(&sorted_window, 0.90)); + pct75.push(get_percentile(&sorted_window, 0.75)); + median.push(get_percentile(&sorted_window, 0.50)); + pct25.push(get_percentile(&sorted_window, 0.25)); + pct10.push(get_percentile(&sorted_window, 0.10)); + min.push(*sorted_window.first().unwrap()); + } + } + + let _lock = exit.lock(); + for vec in [min, max, median, pct10, pct25, pct75, pct90] { + vec.write()?; + } + + Ok(()) } }