global: snapshot part 1

This commit is contained in:
nym21
2026-03-19 22:21:27 +01:00
parent 19bd17566f
commit b8e57f4788
16 changed files with 555 additions and 699 deletions

View File

@@ -11,7 +11,7 @@ use crate::{
blocks,
distribution::metrics::{
ActivityFull, AdjustedSopr, CohortMetricsBase, CostBasis, ImportConfig, OutputsBase,
RealizedFull, RelativeForAll, SupplyCore, UnrealizedFull, UnrealizedLike,
RealizedFull, RelativeForAll, SupplyCore, UnrealizedFull,
},
prices,
};
@@ -136,6 +136,23 @@ impl AllCohortMetrics {
exit,
)?;
self.cost_basis.compute_prices(
starting_indexes,
&self.unrealized.invested_capital.in_profit.cents.height,
&self.unrealized.invested_capital.in_loss.cents.height,
&self.supply.in_profit.sats.height,
&self.supply.in_loss.sats.height,
&self.unrealized.investor_cap_in_profit_raw,
&self.unrealized.investor_cap_in_loss_raw,
exit,
)?;
self.unrealized.compute_sentiment(
starting_indexes,
&prices.spot.cents.height,
exit,
)?;
self.relative.compute(
starting_indexes.height,
&self.supply,

View File

@@ -7,7 +7,7 @@ use vecdb::{AnyStoredVec, Exit, ReadableVec, Rw, StorageMode};
use crate::{
distribution::metrics::{
ActivityCore, CohortMetricsBase, ImportConfig, OutputsBase, RealizedCore, RelativeToAll,
SupplyCore, UnrealizedBase,
SupplyCore, UnrealizedCore,
},
prices,
};
@@ -22,7 +22,7 @@ pub struct BasicCohortMetrics<M: StorageMode = Rw> {
pub outputs: Box<OutputsBase<M>>,
pub activity: Box<ActivityCore<M>>,
pub realized: Box<RealizedCore<M>>,
pub unrealized: Box<UnrealizedBase<M>>,
pub unrealized: Box<UnrealizedCore<M>>,
#[traversable(flatten)]
pub relative: Box<RelativeToAll<M>>,
}
@@ -30,7 +30,7 @@ pub struct BasicCohortMetrics<M: StorageMode = Rw> {
impl CohortMetricsBase for BasicCohortMetrics {
type ActivityVecs = ActivityCore;
type RealizedVecs = RealizedCore;
type UnrealizedVecs = UnrealizedBase;
type UnrealizedVecs = UnrealizedCore;
impl_cohort_accessors!();
@@ -48,7 +48,7 @@ impl CohortMetricsBase for BasicCohortMetrics {
impl BasicCohortMetrics {
pub(crate) fn forced_import(cfg: &ImportConfig) -> Result<Self> {
let supply = SupplyCore::forced_import(cfg)?;
let unrealized = UnrealizedBase::forced_import(cfg)?;
let unrealized = UnrealizedCore::forced_import(cfg)?;
let realized = RealizedCore::forced_import(cfg)?;
let relative = RelativeToAll::forced_import(cfg)?;

View File

@@ -92,7 +92,7 @@ impl CoreCohortMetrics {
)?;
self.unrealized.compute_from_stateful(
starting_indexes,
&others.iter().map(|v| &v.unrealized_base().core).collect::<Vec<_>>(),
&others.iter().map(|v| v.unrealized_core()).collect::<Vec<_>>(),
exit,
)?;

View File

@@ -11,7 +11,7 @@ use crate::{
blocks,
distribution::metrics::{
ActivityFull, CohortMetricsBase, CostBasis, ImportConfig, OutputsBase, RealizedFull,
RelativeWithExtended, SupplyCore, UnrealizedFull, UnrealizedLike,
RelativeWithExtended, SupplyCore, UnrealizedFull,
},
prices,
};
@@ -114,6 +114,23 @@ impl ExtendedCohortMetrics {
exit,
)?;
self.cost_basis.compute_prices(
starting_indexes,
&self.unrealized.invested_capital.in_profit.cents.height,
&self.unrealized.invested_capital.in_loss.cents.height,
&self.supply.in_profit.sats.height,
&self.supply.in_loss.sats.height,
&self.unrealized.investor_cap_in_profit_raw,
&self.unrealized.investor_cap_in_loss_raw,
exit,
)?;
self.unrealized.compute_sentiment(
starting_indexes,
&prices.spot.cents.height,
exit,
)?;
self.relative.compute(
starting_indexes.height,
&self.supply,
@@ -126,4 +143,5 @@ impl ExtendedCohortMetrics {
Ok(())
}
}

View File

@@ -1,16 +1,25 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{BasisPoints16, Cents, Version};
use vecdb::{AnyStoredVec, AnyVec, Rw, StorageMode, WritableVec};
use brk_types::{BasisPoints16, Cents, Height, Indexes, Sats, Version};
use brk_types::CentsSquaredSats;
use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, Rw, StorageMode, WritableVec};
use crate::internal::{PerBlock, PercentPerBlock, PercentilesVecs, Price, PERCENTILES_LEN};
use crate::internal::{FiatPerBlock, PerBlock, PercentPerBlock, PercentilesVecs, Price, PERCENTILES_LEN};
use super::ImportConfig;
/// Cost-basis prices for one side (in-profit or in-loss) of a cohort's supply.
///
/// NOTE(review): the previous comment ("min/max + percentiles + supply density")
/// describes the enclosing `CostBasis` struct, not this one — confirm intent.
#[derive(Traversable)]
pub struct CostBasisSide<M: StorageMode = Rw> {
// Average cost basis per coin (invested capital / supply); see `compute_prices`.
pub per_coin: FiatPerBlock<Cents, M>,
// Investor price per invested dollar (investor cap / invested capital); see `compute_prices`.
pub per_dollar: FiatPerBlock<Cents, M>,
}
/// Cost basis metrics: min/max + profit/loss splits + percentiles + supply density.
/// Used by all/sth/lth cohorts only.
#[derive(Traversable)]
pub struct CostBasis<M: StorageMode = Rw> {
pub in_profit: CostBasisSide<M>,
pub in_loss: CostBasisSide<M>,
pub min: Price<PerBlock<Cents, M>>,
pub max: Price<PerBlock<Cents, M>>,
pub percentiles: PercentilesVecs<M>,
@@ -21,6 +30,14 @@ pub struct CostBasis<M: StorageMode = Rw> {
impl CostBasis {
pub(crate) fn forced_import(cfg: &ImportConfig) -> Result<Self> {
Ok(Self {
in_profit: CostBasisSide {
per_coin: cfg.import("cost_basis_in_profit_per_coin", Version::ZERO)?,
per_dollar: cfg.import("cost_basis_in_profit_per_dollar", Version::ZERO)?,
},
in_loss: CostBasisSide {
per_coin: cfg.import("cost_basis_in_loss_per_coin", Version::ZERO)?,
per_dollar: cfg.import("cost_basis_in_loss_per_dollar", Version::ZERO)?,
},
min: cfg.import("cost_basis_min", Version::ZERO)?,
max: cfg.import("cost_basis_max", Version::ZERO)?,
percentiles: PercentilesVecs::forced_import(
@@ -84,6 +101,10 @@ impl CostBasis {
pub(crate) fn collect_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
let mut vecs: Vec<&mut dyn AnyStoredVec> = vec![
&mut self.in_profit.per_coin.cents.height,
&mut self.in_profit.per_dollar.cents.height,
&mut self.in_loss.per_coin.cents.height,
&mut self.in_loss.per_dollar.cents.height,
&mut self.min.cents.height,
&mut self.max.cents.height,
&mut self.supply_density.bps.height,
@@ -102,4 +123,62 @@ impl CostBasis {
);
vecs
}
/// Derives the four per-side cost-basis price series from already-computed
/// unrealized aggregates, per block height.
///
/// For each side (profit / loss):
/// - `per_coin`   = invested capital (cents) * ONE_BTC / supply (sats)
/// - `per_dollar` = raw investor cap (cents²·sats) / (invested capital * ONE_BTC)
///
/// Every division guards a zero denominator by emitting `Cents::ZERO`.
/// Math is done in u128 to avoid overflow before narrowing back to u64 cents.
pub(crate) fn compute_prices(
&mut self,
starting_indexes: &Indexes,
invested_cap_in_profit: &impl ReadableVec<Height, Cents>,
invested_cap_in_loss: &impl ReadableVec<Height, Cents>,
supply_in_profit_sats: &impl ReadableVec<Height, Sats>,
supply_in_loss_sats: &impl ReadableVec<Height, Sats>,
investor_cap_in_profit_raw: &impl ReadableVec<Height, CentsSquaredSats>,
investor_cap_in_loss_raw: &impl ReadableVec<Height, CentsSquaredSats>,
exit: &Exit,
) -> Result<()> {
// per_coin (profit side): average acquisition price of coins in profit.
self.in_profit.per_coin.cents.height.compute_transform2(
starting_indexes.height,
invested_cap_in_profit,
supply_in_profit_sats,
|(h, invested_cents, supply_sats, ..)| {
let supply = supply_sats.as_u128();
// No supply in profit at this height -> price is undefined; emit zero.
if supply == 0 { return (h, Cents::ZERO); }
(h, Cents::new((invested_cents.as_u128() * Sats::ONE_BTC_U128 / supply) as u64))
},
exit,
)?;
// per_coin (loss side): same formula over the in-loss slice of supply.
self.in_loss.per_coin.cents.height.compute_transform2(
starting_indexes.height,
invested_cap_in_loss,
supply_in_loss_sats,
|(h, invested_cents, supply_sats, ..)| {
let supply = supply_sats.as_u128();
if supply == 0 { return (h, Cents::ZERO); }
(h, Cents::new((invested_cents.as_u128() * Sats::ONE_BTC_U128 / supply) as u64))
},
exit,
)?;
// per_dollar (profit side): investor price — capital-weighted acquisition
// price. invested_cap is stored in Cents (already divided by ONE_BTC), so
// multiply back to match the raw cents²·sats scale of investor_cap.
self.in_profit.per_dollar.cents.height.compute_transform2(
starting_indexes.height,
investor_cap_in_profit_raw,
invested_cap_in_profit,
|(h, investor_cap, invested_cents, ..)| {
let invested_raw = invested_cents.as_u128() * Sats::ONE_BTC_U128;
// No invested capital -> investor price undefined; emit zero.
if invested_raw == 0 { return (h, Cents::ZERO); }
(h, Cents::new((investor_cap.inner() / invested_raw) as u64))
},
exit,
)?;
// per_dollar (loss side): same derivation over the in-loss accumulators.
self.in_loss.per_dollar.cents.height.compute_transform2(
starting_indexes.height,
investor_cap_in_loss_raw,
invested_cap_in_loss,
|(h, investor_cap, invested_cents, ..)| {
let invested_raw = invested_cents.as_u128() * Sats::ONE_BTC_U128;
if invested_raw == 0 { return (h, Cents::ZERO); }
(h, Cents::new((investor_cap.inner() / invested_raw) as u64))
},
exit,
)?;
Ok(())
}
}

View File

@@ -125,7 +125,7 @@ pub use realized::{
pub use relative::{RelativeForAll, RelativeToAll, RelativeWithExtended};
pub use supply::{SupplyBase, SupplyCore};
pub use unrealized::{
UnrealizedBase, UnrealizedBasic, UnrealizedCore, UnrealizedFull, UnrealizedLike,
UnrealizedBasic, UnrealizedCore, UnrealizedFull, UnrealizedLike,
UnrealizedMinimal,
};
@@ -212,12 +212,12 @@ pub trait CohortMetricsBase:
self.realized_mut().as_core_mut()
}
/// Convenience: access unrealized as `&UnrealizedBase` (via `UnrealizedLike::as_base`).
fn unrealized_base(&self) -> &UnrealizedBase {
self.unrealized().as_base()
/// Convenience: access unrealized as `&UnrealizedCore` (via `UnrealizedLike::as_core`).
fn unrealized_core(&self) -> &UnrealizedCore {
self.unrealized().as_core()
}
fn unrealized_base_mut(&mut self) -> &mut UnrealizedBase {
self.unrealized_mut().as_base_mut()
fn unrealized_core_mut(&mut self) -> &mut UnrealizedCore {
self.unrealized_mut().as_core_mut()
}
fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
@@ -314,11 +314,11 @@ pub trait CohortMetricsBase:
&others.iter().map(|v| v.realized_core()).collect::<Vec<_>>(),
exit,
)?;
self.unrealized_base_mut().compute_from_stateful(
self.unrealized_core_mut().compute_from_stateful(
starting_indexes,
&others
.iter()
.map(|v| v.unrealized_base())
.map(|v| v.unrealized_core())
.collect::<Vec<_>>(),
exit,
)?;

View File

@@ -38,12 +38,12 @@ impl RelativeForAll {
self.base.compute(
max_from,
supply,
&unrealized.inner.core.basic,
&unrealized.inner.basic,
market_cap,
exit,
)?;
self.extended_own_pnl
.compute(max_from, &unrealized.inner.core, &unrealized.gross_pnl.usd.height, exit)?;
.compute(max_from, &unrealized.inner, &unrealized.gross_pnl.usd.height, exit)?;
Ok(())
}
}

View File

@@ -48,7 +48,7 @@ impl RelativeWithExtended {
self.base.compute(
max_from,
supply,
&unrealized.inner.core.basic,
&unrealized.inner.basic,
market_cap,
exit,
)?;
@@ -59,9 +59,9 @@ impl RelativeWithExtended {
exit,
)?;
self.extended_own_market_cap
.compute(max_from, &unrealized.inner.core, own_market_cap, exit)?;
.compute(max_from, &unrealized.inner, own_market_cap, exit)?;
self.extended_own_pnl
.compute(max_from, &unrealized.inner.core, &unrealized.gross_pnl.usd.height, exit)?;
.compute(max_from, &unrealized.inner, &unrealized.gross_pnl.usd.height, exit)?;
Ok(())
}
}

View File

@@ -1,127 +0,0 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{CentsSquaredSats, Height, Indexes, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{AnyStoredVec, AnyVec, BytesVec, Exit, ReadableVec, Rw, StorageMode, WritableVec};
use crate::distribution::{metrics::ImportConfig, state::UnrealizedState};
use super::UnrealizedCore;
/// Unrealized-PnL storage: the shared core plus raw investor-cap accumulators
/// (Σ price² × sats, per the `UnrealizedState` field docs) kept for
/// downstream sentiment/price math.
#[derive(Deref, DerefMut, Traversable)]
pub struct UnrealizedBase<M: StorageMode = Rw> {
#[deref]
#[deref_mut]
#[traversable(flatten)]
// Common unrealized vectors; `Deref`/`DerefMut` expose them directly.
pub core: UnrealizedCore<M>,
// Raw Σ(price² × sats) for UTXOs in profit; hidden from traversal output.
#[traversable(hidden)]
pub investor_cap_in_profit_raw: M::Stored<BytesVec<Height, CentsSquaredSats>>,
// Raw Σ(price² × sats) for UTXOs in loss; hidden from traversal output.
#[traversable(hidden)]
pub investor_cap_in_loss_raw: M::Stored<BytesVec<Height, CentsSquaredSats>>,
}
impl UnrealizedBase {
/// Imports/creates the core vectors plus the two raw investor-cap vectors.
pub(crate) fn forced_import(cfg: &ImportConfig) -> Result<Self> {
let v0 = Version::ZERO;
let core = UnrealizedCore::forced_import(cfg)?;
let investor_cap_in_profit_raw = cfg.import("investor_cap_in_profit_raw", v0)?;
let investor_cap_in_loss_raw = cfg.import("investor_cap_in_loss_raw", v0)?;
Ok(Self {
core,
investor_cap_in_profit_raw,
investor_cap_in_loss_raw,
})
}
/// Shortest persisted length across the core and both raw vectors —
/// i.e. the height up to which all stateful vectors are consistent.
pub(crate) fn min_stateful_len(&self) -> usize {
self.core
.min_stateful_len()
.min(self.investor_cap_in_profit_raw.len())
.min(self.investor_cap_in_loss_raw.len())
}
/// Appends one block's snapshot: core state plus both raw investor caps.
#[inline(always)]
pub(crate) fn push_state(&mut self, state: &UnrealizedState) {
self.core.push_state(state);
self.investor_cap_in_profit_raw
.push(CentsSquaredSats::new(state.investor_cap_in_profit_raw));
self.investor_cap_in_loss_raw
.push(CentsSquaredSats::new(state.investor_cap_in_loss_raw));
}
/// Gathers mutable handles to every stored vector (core + raw) —
/// presumably for batched write/flush by the caller; confirm at call sites.
pub(crate) fn collect_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
let mut vecs = self.core.collect_vecs_mut();
vecs.push(&mut self.investor_cap_in_profit_raw as &mut dyn AnyStoredVec);
vecs.push(&mut self.investor_cap_in_loss_raw as &mut dyn AnyStoredVec);
vecs
}
/// Recomputes this aggregate as the element-wise sum of `others`' vectors.
/// Core vectors delegate to `UnrealizedCore`; the raw investor-cap vectors
/// resume from their own shortest persisted length (`start`) up to the
/// shortest source length (`end`).
pub(crate) fn compute_from_stateful(
&mut self,
starting_indexes: &Indexes,
others: &[&Self],
exit: &Exit,
) -> Result<()> {
let core_refs: Vec<&UnrealizedCore> =
others.iter().map(|o| &o.core).collect();
self.core
.compute_from_stateful(starting_indexes, &core_refs, exit)?;
// Resume point: shortest of our two raw vectors.
let start = self
.investor_cap_in_profit_raw
.len()
.min(self.investor_cap_in_loss_raw.len());
// NOTE(review): `end` only considers the profit-side lengths of `others`;
// if a source's loss vector were shorter, the loss read below could be
// out of range — confirm both vectors are always pushed in lockstep.
let end = others
.iter()
.map(|o| o.investor_cap_in_profit_raw.len())
.min()
.unwrap_or(0);
// Batch-read each source's [start, end) range once, before summing.
let investor_profit_ranges: Vec<Vec<CentsSquaredSats>> = others
.iter()
.map(|o| o.investor_cap_in_profit_raw.collect_range_at(start, end))
.collect();
let investor_loss_ranges: Vec<Vec<CentsSquaredSats>> = others
.iter()
.map(|o| o.investor_cap_in_loss_raw.collect_range_at(start, end))
.collect();
// Drop any tail beyond `start` so the loop below only pushes.
self.investor_cap_in_profit_raw
.truncate_if_needed_at(start)?;
self.investor_cap_in_loss_raw
.truncate_if_needed_at(start)?;
for i in start..end {
let local_i = i - start;
let mut sum_investor_profit = CentsSquaredSats::ZERO;
let mut sum_investor_loss = CentsSquaredSats::ZERO;
// Sum the per-cohort values at this height across all sources.
for idx in 0..others.len() {
sum_investor_profit += investor_profit_ranges[idx][local_i];
sum_investor_loss += investor_loss_ranges[idx][local_i];
}
self.investor_cap_in_profit_raw
.push(sum_investor_profit);
self.investor_cap_in_loss_raw
.push(sum_investor_loss);
}
Ok(())
}
/// Derived (non-stateful) computations; only the core has any here —
/// the raw vectors are inputs, not outputs, of derived metrics.
pub(crate) fn compute_rest(
&mut self,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.core.compute_rest(starting_indexes, exit)?;
Ok(())
}
}

View File

@@ -1,14 +1,14 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Cents, CentsSigned, Height, Indexes, Sats, Version};
use brk_types::{Cents, CentsSigned, CentsSquaredSats, Height, Indexes, Sats, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{AnyStoredVec, Exit, ReadableVec, Rw, StorageMode};
use vecdb::{AnyStoredVec, AnyVec, BytesVec, Exit, ReadableVec, Rw, StorageMode, WritableVec};
use crate::distribution::state::UnrealizedState;
use crate::internal::{CentsSubtractToCentsSigned, FiatPerBlock};
use crate::{distribution::metrics::ImportConfig, prices};
use super::UnrealizedBase;
use super::UnrealizedCore;
#[derive(Traversable)]
pub struct UnrealizedSentiment<M: StorageMode = Rw> {
@@ -28,43 +28,64 @@ pub struct UnrealizedFull<M: StorageMode = Rw> {
#[deref]
#[deref_mut]
#[traversable(flatten)]
pub inner: UnrealizedBase<M>,
pub inner: UnrealizedCore<M>,
pub gross_pnl: FiatPerBlock<Cents, M>,
pub invested_capital: UnrealizedInvestedCapital<M>,
#[traversable(hidden)]
pub investor_cap_in_profit_raw: M::Stored<BytesVec<Height, CentsSquaredSats>>,
#[traversable(hidden)]
pub investor_cap_in_loss_raw: M::Stored<BytesVec<Height, CentsSquaredSats>>,
pub sentiment: UnrealizedSentiment<M>,
}
impl UnrealizedFull {
pub(crate) fn forced_import(cfg: &ImportConfig) -> Result<Self> {
let v0 = Version::ZERO;
let inner = UnrealizedBase::forced_import(cfg)?;
let inner = UnrealizedCore::forced_import(cfg)?;
let gross_pnl = cfg.import("unrealized_gross_pnl", v0)?;
let invested_capital = UnrealizedInvestedCapital {
in_profit: cfg.import("invested_capital_in_profit", v0)?,
in_loss: cfg.import("invested_capital_in_loss", v0)?,
};
let investor_cap_in_profit_raw = cfg.import("investor_cap_in_profit_raw", v0)?;
let investor_cap_in_loss_raw = cfg.import("investor_cap_in_loss_raw", v0)?;
let sentiment = UnrealizedSentiment {
pain_index: cfg.import("pain_index", v0)?,
greed_index: cfg.import("greed_index", v0)?,
net: cfg.import("net_sentiment", Version::ONE)?,
};
let invested_capital = UnrealizedInvestedCapital {
in_profit: cfg.import("invested_capital_in_profit", v0)?,
in_loss: cfg.import("invested_capital_in_loss", v0)?,
};
Ok(Self {
inner,
gross_pnl,
invested_capital,
investor_cap_in_profit_raw,
investor_cap_in_loss_raw,
sentiment,
})
}
pub(crate) fn min_stateful_len(&self) -> usize {
self.inner
.min_stateful_len()
.min(self.investor_cap_in_profit_raw.len())
.min(self.investor_cap_in_loss_raw.len())
}
#[inline(always)]
pub(crate) fn push_state_all(&mut self, state: &UnrealizedState) {
self.inner.push_state(state);
self.investor_cap_in_profit_raw
.push(CentsSquaredSats::new(state.investor_cap_in_profit_raw));
self.investor_cap_in_loss_raw
.push(CentsSquaredSats::new(state.investor_cap_in_loss_raw));
}
pub(crate) fn collect_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
@@ -72,6 +93,8 @@ impl UnrealizedFull {
vecs.push(&mut self.gross_pnl.cents.height as &mut dyn AnyStoredVec);
vecs.push(&mut self.invested_capital.in_profit.cents.height as &mut dyn AnyStoredVec);
vecs.push(&mut self.invested_capital.in_loss.cents.height as &mut dyn AnyStoredVec);
vecs.push(&mut self.investor_cap_in_profit_raw as &mut dyn AnyStoredVec);
vecs.push(&mut self.investor_cap_in_loss_raw as &mut dyn AnyStoredVec);
vecs.push(&mut self.sentiment.pain_index.cents.height as &mut dyn AnyStoredVec);
vecs.push(&mut self.sentiment.greed_index.cents.height as &mut dyn AnyStoredVec);
vecs.push(&mut self.sentiment.net.cents.height as &mut dyn AnyStoredVec);
@@ -88,10 +111,11 @@ impl UnrealizedFull {
) -> Result<()> {
self.inner.compute_rest(starting_indexes, exit)?;
// gross_pnl = profit + loss
self.gross_pnl.cents.height.compute_add(
starting_indexes.height,
&self.inner.core.basic.profit.cents.height,
&self.inner.core.basic.loss.cents.height,
&self.inner.basic.profit.cents.height,
&self.inner.basic.loss.cents.height,
exit,
)?;
@@ -100,8 +124,8 @@ impl UnrealizedFull {
starting_indexes.height,
supply_in_profit_sats,
&prices.spot.cents.height,
&self.inner.core.basic.profit.cents.height,
|(h, supply_sats, spot, profit, ..)| {
&self.inner.basic.profit.cents.height,
|(h, supply_sats, spot, profit, ..): (_, Sats, Cents, Cents, _)| {
let market_value = supply_sats.as_u128() * spot.as_u128() / Sats::ONE_BTC_U128;
(h, Cents::new(market_value.saturating_sub(profit.as_u128()) as u64))
},
@@ -113,44 +137,33 @@ impl UnrealizedFull {
starting_indexes.height,
supply_in_loss_sats,
&prices.spot.cents.height,
&self.inner.core.basic.loss.cents.height,
|(h, supply_sats, spot, loss, ..)| {
&self.inner.basic.loss.cents.height,
|(h, supply_sats, spot, loss, ..): (_, Sats, Cents, Cents, _)| {
let market_value = supply_sats.as_u128() * spot.as_u128() / Sats::ONE_BTC_U128;
(h, Cents::new((market_value + loss.as_u128()) as u64))
},
exit,
)?;
self.compute_rest_extended(prices, starting_indexes, exit)?;
self.sentiment
.net
.cents
.height
.compute_binary::<Cents, Cents, CentsSubtractToCentsSigned>(
starting_indexes.height,
&self.sentiment.greed_index.cents.height,
&self.sentiment.pain_index.cents.height,
exit,
)?;
Ok(())
}
fn compute_rest_extended(
/// Compute sentiment using investor_price (original formula).
/// Called after cost_basis.in_profit/loss are computed at the cohort level.
pub(crate) fn compute_sentiment(
&mut self,
prices: &prices::Vecs,
starting_indexes: &Indexes,
spot: &impl ReadableVec<Height, Cents>,
exit: &Exit,
) -> Result<()> {
// greed = spot - investor_price_winners
// investor_price = investor_cap / invested_cap (both in CentsSats)
// invested_cap is now in Cents (already divided by ONE_BTC), so multiply back
// investor_price = investor_cap / invested_cap
// invested_cap is in Cents (already / ONE_BTC), multiply back for CentsSats scale
self.sentiment.greed_index.cents.height.compute_transform3(
starting_indexes.height,
&self.inner.investor_cap_in_profit_raw,
&self.investor_cap_in_profit_raw,
&self.invested_capital.in_profit.cents.height,
&prices.spot.cents.height,
spot,
|(h, investor_cap, invested_cap_cents, spot, ..)| {
let invested_cap_raw = invested_cap_cents.as_u128() * Sats::ONE_BTC_U128;
if invested_cap_raw == 0 {
@@ -166,9 +179,9 @@ impl UnrealizedFull {
// pain = investor_price_losers - spot
self.sentiment.pain_index.cents.height.compute_transform3(
starting_indexes.height,
&self.inner.investor_cap_in_loss_raw,
&self.investor_cap_in_loss_raw,
&self.invested_capital.in_loss.cents.height,
&prices.spot.cents.height,
spot,
|(h, investor_cap, invested_cap_cents, spot, ..)| {
let invested_cap_raw = invested_cap_cents.as_u128() * Sats::ONE_BTC_U128;
if invested_cap_raw == 0 {
@@ -181,7 +194,18 @@ impl UnrealizedFull {
exit,
)?;
// net = greed - pain
self.sentiment
.net
.cents
.height
.compute_binary::<Cents, Cents, CentsSubtractToCentsSigned>(
starting_indexes.height,
&self.sentiment.greed_index.cents.height,
&self.sentiment.pain_index.cents.height,
exit,
)?;
Ok(())
}
}

View File

@@ -1,11 +1,9 @@
mod base;
mod basic;
mod core;
mod full;
mod minimal;
pub use self::core::UnrealizedCore;
pub use base::UnrealizedBase;
pub use basic::UnrealizedBasic;
pub use full::UnrealizedFull;
pub use minimal::UnrealizedMinimal;
@@ -17,8 +15,8 @@ use vecdb::{Exit, ReadableVec};
use crate::{distribution::state::UnrealizedState, prices};
pub trait UnrealizedLike: Send + Sync {
fn as_base(&self) -> &UnrealizedBase;
fn as_base_mut(&mut self) -> &mut UnrealizedBase;
fn as_core(&self) -> &UnrealizedCore;
fn as_core_mut(&mut self) -> &mut UnrealizedCore;
fn min_stateful_len(&self) -> usize;
fn push_state(&mut self, state: &UnrealizedState);
fn compute_rest(
@@ -31,11 +29,11 @@ pub trait UnrealizedLike: Send + Sync {
) -> Result<()>;
}
impl UnrealizedLike for UnrealizedBase {
fn as_base(&self) -> &UnrealizedBase {
impl UnrealizedLike for UnrealizedCore {
fn as_core(&self) -> &UnrealizedCore {
self
}
fn as_base_mut(&mut self) -> &mut UnrealizedBase {
fn as_core_mut(&mut self) -> &mut UnrealizedCore {
self
}
fn min_stateful_len(&self) -> usize {
@@ -58,10 +56,10 @@ impl UnrealizedLike for UnrealizedBase {
}
impl UnrealizedLike for UnrealizedFull {
fn as_base(&self) -> &UnrealizedBase {
fn as_core(&self) -> &UnrealizedCore {
&self.inner
}
fn as_base_mut(&mut self) -> &mut UnrealizedBase {
fn as_core_mut(&mut self) -> &mut UnrealizedCore {
&mut self.inner
}
fn min_stateful_len(&self) -> usize {

View File

@@ -10,9 +10,7 @@ pub struct UnrealizedState {
pub supply_in_loss: Sats,
pub unrealized_profit: Cents,
pub unrealized_loss: Cents,
/// Raw Σ(price² × sats) for UTXOs in profit. Used for aggregation.
pub investor_cap_in_profit_raw: u128,
/// Raw Σ(price² × sats) for UTXOs in loss. Used for aggregation.
pub investor_cap_in_loss_raw: u128,
}
@@ -36,12 +34,10 @@ pub struct WithoutCapital {
pub(crate) unrealized_loss: u128,
}
/// Full cache state: core + invested capital + investor cap (128 bytes, 2 cache lines).
/// Full cache state: core + investor cap (for sentiment computation).
#[derive(Debug, Default, Clone)]
pub struct WithCapital {
core: WithoutCapital,
invested_capital_in_profit: u128,
invested_capital_in_loss: u128,
investor_cap_in_profit: u128,
investor_cap_in_loss: u128,
}
@@ -120,30 +116,26 @@ impl Accumulate for WithCapital {
#[inline(always)]
fn accumulate_profit(&mut self, price_u128: u128, sats: Sats) {
self.core.supply_in_profit += sats;
let invested_capital = price_u128 * sats.as_u128();
self.invested_capital_in_profit += invested_capital;
self.investor_cap_in_profit += price_u128 * invested_capital;
let invested = price_u128 * sats.as_u128();
self.investor_cap_in_profit += price_u128 * invested;
}
#[inline(always)]
fn accumulate_loss(&mut self, price_u128: u128, sats: Sats) {
self.core.supply_in_loss += sats;
let invested_capital = price_u128 * sats.as_u128();
self.invested_capital_in_loss += invested_capital;
self.investor_cap_in_loss += price_u128 * invested_capital;
let invested = price_u128 * sats.as_u128();
self.investor_cap_in_loss += price_u128 * invested;
}
#[inline(always)]
fn deaccumulate_profit(&mut self, price_u128: u128, sats: Sats) {
self.core.supply_in_profit -= sats;
let invested_capital = price_u128 * sats.as_u128();
self.invested_capital_in_profit -= invested_capital;
self.investor_cap_in_profit -= price_u128 * invested_capital;
let invested = price_u128 * sats.as_u128();
self.investor_cap_in_profit -= price_u128 * invested;
}
#[inline(always)]
fn deaccumulate_loss(&mut self, price_u128: u128, sats: Sats) {
self.core.supply_in_loss -= sats;
let invested_capital = price_u128 * sats.as_u128();
self.invested_capital_in_loss -= invested_capital;
self.investor_cap_in_loss -= price_u128 * invested_capital;
let invested = price_u128 * sats.as_u128();
self.investor_cap_in_loss -= price_u128 * invested;
}
}

View File

@@ -1,422 +0,0 @@
use std::collections::VecDeque;
use brk_error::Result;
use brk_types::{CheckedSub, StoredU64, get_percentile};
use schemars::JsonSchema;
use vecdb::{
AnyStoredVec, AnyVec, EagerVec, Exit, PcoVec, ReadableVec, VecIndex, VecValue, WritableVec,
};
use crate::internal::ComputedVecValue;
/// Validates `vec` against `combined_version` (resetting it on mismatch),
/// then returns the earliest index still needing recomputation: the smaller
/// of `current_start` and the vector's (possibly reset) length.
fn validate_and_start<I: VecIndex, T: ComputedVecValue + JsonSchema>(
vec: &mut EagerVec<PcoVec<I, T>>,
combined_version: vecdb::Version,
current_start: I,
) -> Result<I> {
vec.validate_computed_version_or_reset(combined_version)?;
Ok(current_start.min(I::from(vec.len())))
}
/// Computes per-`I` aggregations (min/max/sum/cumulative/percentiles) over
/// ranges of `source`, where each output index `I` covers the source slice
/// `[first_indexes[i] + skip, first_indexes[i] + count_indexes[i])`.
///
/// Each aggregation target is optional; only requested vectors are validated,
/// truncated, and appended to. The per-range work picks the cheapest path for
/// the requested combination: a streaming min/max pass, a collect(+sort) pass
/// when percentiles are involved, or a pure fold for sum/cumulative only.
/// All requested vectors are written out at the end under the exit lock.
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute_aggregations<I, T, A>(
max_from: I,
source: &impl ReadableVec<A, T>,
first_indexes: &impl ReadableVec<I, A>,
count_indexes: &impl ReadableVec<I, StoredU64>,
exit: &Exit,
skip_count: usize,
mut min: Option<&mut EagerVec<PcoVec<I, T>>>,
mut max: Option<&mut EagerVec<PcoVec<I, T>>>,
mut sum: Option<&mut EagerVec<PcoVec<I, T>>>,
mut cumulative: Option<&mut EagerVec<PcoVec<I, T>>>,
mut median: Option<&mut EagerVec<PcoVec<I, T>>>,
mut pct10: Option<&mut EagerVec<PcoVec<I, T>>>,
mut pct25: Option<&mut EagerVec<PcoVec<I, T>>>,
mut pct75: Option<&mut EagerVec<PcoVec<I, T>>>,
mut pct90: Option<&mut EagerVec<PcoVec<I, T>>>,
) -> Result<()>
where
I: VecIndex,
T: ComputedVecValue + JsonSchema,
A: VecIndex + VecValue + CheckedSub<A>,
{
let combined_version = source.version() + first_indexes.version() + count_indexes.version();
// Validate every requested vec and fold their restart points into one
// common starting index (the minimum across all of them).
macro_rules! validate_vec {
($($vec:ident),*) => {{
let mut idx = max_from;
$(if let Some(ref mut v) = $vec {
idx = validate_and_start(v, combined_version, idx)?;
})*
idx
}};
}
let index = validate_vec!(
min, max, sum, cumulative, median, pct10, pct25, pct75, pct90
);
let needs_min = min.is_some();
let needs_max = max.is_some();
let needs_sum = sum.is_some();
let needs_cumulative = cumulative.is_some();
let needs_percentiles = median.is_some()
|| pct10.is_some()
|| pct25.is_some()
|| pct75.is_some()
|| pct90.is_some();
let needs_minmax = needs_min || needs_max;
let needs_sum_or_cumulative = needs_sum || needs_cumulative;
// Nothing requested: done.
if !needs_minmax && !needs_sum_or_cumulative && !needs_percentiles {
return Ok(());
}
// Seed the running cumulative from the last persisted value before `index`
// (zero when starting from the beginning).
let mut cumulative_val = cumulative.as_ref().map(|cumulative_vec| {
index.decremented().map_or(T::from(0_usize), |idx| {
cumulative_vec
.collect_one_at(idx.to_usize())
.unwrap_or(T::from(0_usize))
})
});
let start = index.to_usize();
// Truncate all vecs to start once, so the loop only pushes
macro_rules! truncate_vec {
($($vec:ident),*) => {
$(if let Some(ref mut v) = $vec {
v.truncate_if_needed_at(start)?;
})*
};
}
truncate_vec!(
min, max, sum, cumulative, median, pct10, pct25, pct75, pct90
);
let fi_len = first_indexes.len();
// Batch-read the index/count tables once for the whole remaining range.
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(start, fi_len);
let count_indexes_batch: Vec<StoredU64> = count_indexes.collect_range_at(start, fi_len);
// Scratch buffer reused across ranges to avoid per-range allocation.
let mut values: Vec<T> = Vec::new();
first_indexes_batch
.into_iter()
.zip(count_indexes_batch)
.enumerate()
.try_for_each(|(_, (first_index, count_index))| -> Result<()> {
let count = u64::from(count_index) as usize;
// Effective count after skipping (e.g., skip coinbase for fee calculations)
let effective_count = count.saturating_sub(skip_count);
let effective_first_index = first_index + skip_count.min(count);
// Fast path: only min/max needed, no sorting or allocation required
if needs_minmax && !needs_percentiles && !needs_sum_or_cumulative {
let efi = effective_first_index.to_usize();
let mut min_val: Option<T> = None;
let mut max_val: Option<T> = None;
source.for_each_range_at(efi, efi + effective_count, |val| {
if needs_min {
min_val = Some(min_val.map_or(val, |m| if val < m { val } else { m }));
}
if needs_max {
max_val = Some(max_val.map_or(val, |m| if val > m { val } else { m }));
}
});
// Empty range -> fall back to the other extreme, then zero.
if let Some(ref mut min_vec) = min {
min_vec.push(min_val.or(max_val).unwrap_or_else(|| T::from(0_usize)));
}
if let Some(ref mut max_vec) = max {
max_vec.push(max_val.or(min_val).unwrap_or_else(|| T::from(0_usize)));
}
} else if needs_percentiles || needs_minmax {
// Materialize the range: needed for sorting (percentiles) and
// for the combined min/max/sum fold below.
source.collect_range_into_at(
effective_first_index.to_usize(),
effective_first_index.to_usize() + effective_count,
&mut values,
);
if values.is_empty() {
// Empty range: every requested stat gets zero; cumulative
// simply carries the running total forward.
macro_rules! push_zero {
($($vec:ident),*) => {
$(if let Some(ref mut v) = $vec {
v.push(T::from(0_usize));
})*
};
}
push_zero!(max, pct90, pct75, median, pct25, pct10, min, sum);
if let Some(ref mut cumulative_vec) = cumulative {
cumulative_vec.push(cumulative_val.unwrap());
}
} else if needs_percentiles {
// Sum must be taken before (or after — order-independent)
// the sort; taken here while values are still unsorted.
let sum_val = if needs_sum_or_cumulative {
Some(values.iter().copied().fold(T::from(0), |a, b| a + b))
} else {
None
};
values.sort_unstable();
// After sorting, extremes are the ends of the slice.
if let Some(ref mut max_vec) = max {
max_vec.push(*values.last().unwrap());
}
if let Some(ref mut pct90_vec) = pct90 {
pct90_vec.push(get_percentile(&values, 0.90));
}
if let Some(ref mut pct75_vec) = pct75 {
pct75_vec.push(get_percentile(&values, 0.75));
}
if let Some(ref mut median_vec) = median {
median_vec.push(get_percentile(&values, 0.50));
}
if let Some(ref mut pct25_vec) = pct25 {
pct25_vec.push(get_percentile(&values, 0.25));
}
if let Some(ref mut pct10_vec) = pct10 {
pct10_vec.push(get_percentile(&values, 0.10));
}
if let Some(ref mut min_vec) = min {
min_vec.push(*values.first().unwrap());
}
if let Some(sum_val) = sum_val {
if let Some(ref mut sum_vec) = sum {
sum_vec.push(sum_val);
}
if let Some(ref mut cumulative_vec) = cumulative {
let t = cumulative_val.unwrap() + sum_val;
cumulative_val.replace(t);
cumulative_vec.push(t);
}
}
} else if needs_minmax {
// Single pass for min + max + optional sum
let (min_val, max_val, sum_val, _len) = values.iter().copied().fold(
(values[0], values[0], T::from(0_usize), 0_usize),
|(mn, mx, s, c), v| (mn.min(v), mx.max(v), s + v, c + 1),
);
if let Some(ref mut min_vec) = min {
min_vec.push(min_val);
}
if let Some(ref mut max_vec) = max {
max_vec.push(max_val);
}
if needs_sum_or_cumulative {
if let Some(ref mut sum_vec) = sum {
sum_vec.push(sum_val);
}
if let Some(ref mut cumulative_vec) = cumulative {
let t = cumulative_val.unwrap() + sum_val;
cumulative_val.replace(t);
cumulative_vec.push(t);
}
}
}
} else if needs_sum_or_cumulative {
// Sum/cumulative only: streaming fold, no buffer needed.
let efi = effective_first_index.to_usize();
let sum_val = source.fold_range_at(
efi,
efi + effective_count,
T::from(0_usize),
|acc, val| acc + val,
);
if let Some(ref mut sum_vec) = sum {
sum_vec.push(sum_val);
}
if let Some(ref mut cumulative_vec) = cumulative {
let t = cumulative_val.unwrap() + sum_val;
cumulative_val.replace(t);
cumulative_vec.push(t);
}
}
Ok(())
})?;
// Persist every requested vector while holding the exit lock so a
// shutdown cannot interleave with the writes.
let _lock = exit.lock();
macro_rules! write_vec {
($($vec:ident),*) => {
$(if let Some(v) = $vec { v.write()?; })*
};
}
write_vec!(
min, max, sum, cumulative, median, pct10, pct25, pct75, pct90
);
Ok(())
}
/// Computes rolling distribution statistics (min / max / median / p10 / p25 /
/// p75 / p90) over a sliding window of the last `n_blocks` blocks.
///
/// For each block height from the resume point to the end of `first_indexes`,
/// the statistics are taken over every `source` value belonging to that block
/// and the `n_blocks - 1` blocks before it (each block's value span is given
/// by `first_indexes` + `count_indexes`). Exactly one value is pushed per
/// output vec per block; an empty window pushes `T::from(0)` to all seven.
///
/// Resumable: `validate_and_start` clamps the start height to the shortest
/// already-computed output vec (resetting any vec whose stored version no
/// longer matches the combined input version), so only missing heights are
/// recomputed and all seven vecs stay in lockstep.
///
/// NOTE(review): assumes `n_blocks >= 1` — `n_blocks - 1` below would
/// underflow for 0. Callers appear to pass fixed window sizes; confirm.
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute_aggregations_nblock_window<I, T, A>(
    max_from: I,
    source: &(impl ReadableVec<A, T> + Sized),
    first_indexes: &impl ReadableVec<I, A>,
    count_indexes: &impl ReadableVec<I, StoredU64>,
    n_blocks: usize,
    exit: &Exit,
    min: &mut EagerVec<PcoVec<I, T>>,
    max: &mut EagerVec<PcoVec<I, T>>,
    median: &mut EagerVec<PcoVec<I, T>>,
    pct10: &mut EagerVec<PcoVec<I, T>>,
    pct25: &mut EagerVec<PcoVec<I, T>>,
    pct75: &mut EagerVec<PcoVec<I, T>>,
    pct90: &mut EagerVec<PcoVec<I, T>>,
) -> Result<()>
where
    I: VecIndex,
    T: ComputedVecValue + CheckedSub + JsonSchema,
    A: VecIndex + VecValue + CheckedSub<A>,
{
    // Any change to any input invalidates all outputs, so the versions are
    // summed into one value for staleness detection.
    let combined_version = source.version() + first_indexes.version() + count_indexes.version();
    let mut idx = max_from;
    // Resume point = minimum valid start across all seven output vecs.
    for vec in [
        &mut *min,
        &mut *max,
        &mut *median,
        &mut *pct10,
        &mut *pct25,
        &mut *pct75,
        &mut *pct90,
    ] {
        idx = validate_and_start(vec, combined_version, idx)?;
    }
    let index = idx;
    let start = index.to_usize();
    let fi_len = first_indexes.len();
    // Earliest block whose values fall inside the first window to compute;
    // per-block spans are fetched from this point onward.
    let batch_start = start.saturating_sub(n_blocks - 1);
    let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(batch_start, fi_len);
    let count_indexes_all: Vec<StoredU64> = count_indexes.collect_range_at(batch_start, fi_len);
    // Pushed for all seven stats when a window holds no values.
    let zero = T::from(0_usize);
    for vec in [
        &mut *min,
        &mut *max,
        &mut *median,
        &mut *pct10,
        &mut *pct25,
        &mut *pct75,
        &mut *pct90,
    ] {
        vec.truncate_if_needed_at(start)?;
    }
    // Persistent sorted window: O(n) merge-insert for new block, O(n) merge-filter
    // for expired block. Avoids re-sorting every block. Cursor reads only the new
    // block (~1 page decompress vs original's ~4). Ring buffer caches per-block
    // sorted values for O(1) expiry.
    // Peak memory: 2 × ~15k window elements + n_blocks × ~2500 cached ≈ 360 KB.
    let mut block_ring: VecDeque<Vec<T>> = VecDeque::with_capacity(n_blocks + 1);
    let mut cursor = source.cursor();
    // Invariant: `sorted_window` is the sorted multiset union of every block
    // currently held in `block_ring`.
    let mut sorted_window: Vec<T> = Vec::new();
    // Scratch buffer reused by both the merge-insert and merge-filter passes
    // to avoid a fresh allocation per block.
    let mut merge_buf: Vec<T> = Vec::new();
    // Pre-fill initial window blocks [window_start_of_first..start)
    let window_start_of_first = start.saturating_sub(n_blocks - 1);
    for block_idx in window_start_of_first..start {
        let fi = first_indexes_batch[block_idx - batch_start].to_usize();
        let count = u64::from(count_indexes_all[block_idx - batch_start]) as usize;
        // Skip forward over any source entries between the cursor and this
        // block's first value (the cursor only moves forward).
        if cursor.position() < fi {
            cursor.advance(fi - cursor.position());
        }
        let mut bv = Vec::with_capacity(count);
        cursor.for_each(count, |v: T| bv.push(v));
        bv.sort_unstable();
        sorted_window.extend_from_slice(&bv);
        block_ring.push_back(bv);
    }
    // Initial sorted_window was built by extending individually sorted blocks —
    // stable sort detects these sorted runs and merges in O(n × log(k)) instead of O(n log n).
    sorted_window.sort();
    for j in 0..(fi_len - start) {
        let idx = start + j;
        // Read and sort new block's values
        let fi = first_indexes_batch[idx - batch_start].to_usize();
        let count = u64::from(count_indexes_all[idx - batch_start]) as usize;
        if cursor.position() < fi {
            cursor.advance(fi - cursor.position());
        }
        let mut new_block = Vec::with_capacity(count);
        cursor.for_each(count, |v: T| new_block.push(v));
        new_block.sort_unstable();
        // Merge-insert new sorted block into sorted_window: O(n+m)
        merge_buf.clear();
        merge_buf.reserve(sorted_window.len() + new_block.len());
        let (mut si, mut ni) = (0, 0);
        while si < sorted_window.len() && ni < new_block.len() {
            if sorted_window[si] <= new_block[ni] {
                merge_buf.push(sorted_window[si]);
                si += 1;
            } else {
                merge_buf.push(new_block[ni]);
                ni += 1;
            }
        }
        // One of the two runs is exhausted; append the other's tail.
        merge_buf.extend_from_slice(&sorted_window[si..]);
        merge_buf.extend_from_slice(&new_block[ni..]);
        std::mem::swap(&mut sorted_window, &mut merge_buf);
        block_ring.push_back(new_block);
        // Expire oldest block: merge-filter its sorted values from sorted_window in O(n)
        if block_ring.len() > n_blocks {
            let expired = block_ring.pop_front().unwrap();
            merge_buf.clear();
            merge_buf.reserve(sorted_window.len());
            let mut ei = 0;
            // Both sequences are sorted and `expired` is a sub-multiset of
            // `sorted_window` (it was merged in when its block entered), so
            // this single pass drops exactly one occurrence per expired value.
            for &v in &sorted_window {
                if ei < expired.len() && v == expired[ei] {
                    ei += 1;
                } else {
                    merge_buf.push(v);
                }
            }
            std::mem::swap(&mut sorted_window, &mut merge_buf);
        }
        if sorted_window.is_empty() {
            // No values in the window: keep all seven vecs aligned with a zero.
            for vec in [
                &mut *min,
                &mut *max,
                &mut *median,
                &mut *pct10,
                &mut *pct25,
                &mut *pct75,
                &mut *pct90,
            ] {
                vec.push(zero);
            }
        } else {
            // Window is sorted, so extremes are the ends and percentiles are
            // direct lookups.
            max.push(*sorted_window.last().unwrap());
            pct90.push(get_percentile(&sorted_window, 0.90));
            pct75.push(get_percentile(&sorted_window, 0.75));
            median.push(get_percentile(&sorted_window, 0.50));
            pct25.push(get_percentile(&sorted_window, 0.25));
            pct10.push(get_percentile(&sorted_window, 0.10));
            min.push(*sorted_window.first().unwrap());
        }
    }
    // Persist all outputs while holding the exit lock (same pattern as the
    // other aggregation routines in this file).
    let _lock = exit.lock();
    for vec in [min, max, median, pct10, pct25, pct75, pct90] {
        vec.write()?;
    }
    Ok(())
}

View File

@@ -1,4 +1,3 @@
mod aggregation;
mod drawdown;
mod expanding_percentiles;
mod fenwick;
@@ -6,7 +5,6 @@ mod sliding_distribution;
mod sliding_median;
mod sliding_window;
pub(crate) use aggregation::*;
pub(crate) use drawdown::*;
pub(crate) use expanding_percentiles::*;
pub(crate) use fenwick::*;

View File

@@ -2,13 +2,15 @@ use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::Height;
use schemars::JsonSchema;
use vecdb::{Database, Exit, ReadableVec, Rw, StorageMode, VecIndex, VecValue, Version};
use vecdb::{
AnyStoredVec, AnyVec, Database, Exit, ReadableVec, Rw, StorageMode, VecIndex, VecValue,
Version, WritableVec,
};
use crate::{
indexes,
internal::{
CachedWindowStarts, NumericValue, PerBlock, RollingComplete, WindowStarts,
algo::compute_aggregations,
},
};
@@ -68,23 +70,68 @@ where
f64: From<T>,
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
compute_aggregations(
max_from,
source,
first_indexes,
count_indexes,
exit,
skip_count,
None,
None,
Some(&mut self.sum.height),
Some(&mut self.cumulative.height),
None,
None,
None,
None,
None,
)?;
let combined_version =
source.version() + first_indexes.version() + count_indexes.version();
let mut index = max_from;
index = {
self.sum
.height
.validate_computed_version_or_reset(combined_version)?;
index.min(Height::from(self.sum.height.len()))
};
index = {
self.cumulative
.height
.validate_computed_version_or_reset(combined_version)?;
index.min(Height::from(self.cumulative.height.len()))
};
let start = index.to_usize();
self.sum.height.truncate_if_needed_at(start)?;
self.cumulative.height.truncate_if_needed_at(start)?;
let mut cumulative_val = index.decremented().map_or(T::from(0_usize), |idx| {
self.cumulative
.height
.collect_one_at(idx.to_usize())
.unwrap_or(T::from(0_usize))
});
let fi_len = first_indexes.len();
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(start, fi_len);
let count_indexes_batch: Vec<brk_types::StoredU64> =
count_indexes.collect_range_at(start, fi_len);
first_indexes_batch
.into_iter()
.zip(count_indexes_batch)
.try_for_each(|(first_index, count_index)| -> Result<()> {
let count = u64::from(count_index) as usize;
let effective_count = count.saturating_sub(skip_count);
let effective_first_index = first_index + skip_count.min(count);
let efi = effective_first_index.to_usize();
let sum_val = source.fold_range_at(
efi,
efi + effective_count,
T::from(0_usize),
|acc, val| acc + val,
);
self.sum.height.push(sum_val);
cumulative_val = cumulative_val + sum_val;
self.cumulative.height.push(cumulative_val);
Ok(())
})?;
let _lock = exit.lock();
self.sum.height.write()?;
self.cumulative.height.write()?;
drop(_lock);
self.rolling
.compute(max_from, windows, &self.sum.height, exit)?;
Ok(())

View File

@@ -1,19 +1,18 @@
use std::collections::VecDeque;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::Height;
use brk_types::{Height, get_percentile};
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use vecdb::{
CheckedSub, Database, Exit, ReadableVec, Rw, StorageMode,
VecIndex, VecValue, Version,
AnyStoredVec, AnyVec, CheckedSub, Database, Exit, ReadableVec, Rw, StorageMode, VecIndex,
VecValue, Version, WritableVec,
};
use crate::{
indexes,
internal::{
ComputedVecValue, DistributionStats, NumericValue, PerBlock,
algo::{compute_aggregations, compute_aggregations_nblock_window},
},
internal::{ComputedVecValue, DistributionStats, NumericValue, PerBlock},
};
#[derive(Deref, DerefMut, Traversable)]
@@ -46,24 +45,110 @@ impl<T: NumericValue + JsonSchema> PerBlockDistribution<T> {
where
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
let s = &mut self.0;
compute_aggregations(
max_from,
source,
first_indexes,
count_indexes,
exit,
skip_count,
Some(&mut s.min.height),
Some(&mut s.max.height),
None,
None,
Some(&mut s.median.height),
Some(&mut s.pct10.height),
Some(&mut s.pct25.height),
Some(&mut s.pct75.height),
Some(&mut s.pct90.height),
)
let DistributionStats {
min,
max,
pct10,
pct25,
median,
pct75,
pct90,
} = &mut self.0;
let min = &mut min.height;
let max = &mut max.height;
let pct10 = &mut pct10.height;
let pct25 = &mut pct25.height;
let median = &mut median.height;
let pct75 = &mut pct75.height;
let pct90 = &mut pct90.height;
let combined_version =
source.version() + first_indexes.version() + count_indexes.version();
let mut index = max_from;
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.validate_computed_version_or_reset(combined_version)?;
index = index.min(Height::from(vec.len()));
}
let start = index.to_usize();
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.truncate_if_needed_at(start)?;
}
let fi_len = first_indexes.len();
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(start, fi_len);
let count_indexes_batch: Vec<brk_types::StoredU64> =
count_indexes.collect_range_at(start, fi_len);
let mut values: Vec<T> = Vec::new();
first_indexes_batch
.into_iter()
.zip(count_indexes_batch)
.try_for_each(|(first_index, count_index)| -> Result<()> {
let count = u64::from(count_index) as usize;
let effective_count = count.saturating_sub(skip_count);
let effective_first_index = first_index + skip_count.min(count);
source.collect_range_into_at(
effective_first_index.to_usize(),
effective_first_index.to_usize() + effective_count,
&mut values,
);
if values.is_empty() {
let zero = T::from(0_usize);
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.push(zero);
}
} else {
values.sort_unstable();
max.push(*values.last().unwrap());
pct90.push(get_percentile(&values, 0.90));
pct75.push(get_percentile(&values, 0.75));
median.push(get_percentile(&values, 0.50));
pct25.push(get_percentile(&values, 0.25));
pct10.push(get_percentile(&values, 0.10));
min.push(*values.first().unwrap());
}
Ok(())
})?;
let _lock = exit.lock();
for vec in [min, max, median, pct10, pct25, pct75, pct90] {
vec.write()?;
}
Ok(())
}
pub(crate) fn compute_from_nblocks<A>(
@@ -79,21 +164,168 @@ impl<T: NumericValue + JsonSchema> PerBlockDistribution<T> {
T: CheckedSub,
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
let s = &mut self.0;
compute_aggregations_nblock_window(
max_from,
source,
first_indexes,
count_indexes,
n_blocks,
exit,
&mut s.min.height,
&mut s.max.height,
&mut s.median.height,
&mut s.pct10.height,
&mut s.pct25.height,
&mut s.pct75.height,
&mut s.pct90.height,
)
let DistributionStats {
min,
max,
pct10,
pct25,
median,
pct75,
pct90,
} = &mut self.0;
let min = &mut min.height;
let max = &mut max.height;
let pct10 = &mut pct10.height;
let pct25 = &mut pct25.height;
let median = &mut median.height;
let pct75 = &mut pct75.height;
let pct90 = &mut pct90.height;
let combined_version =
source.version() + first_indexes.version() + count_indexes.version();
let mut index = max_from;
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.validate_computed_version_or_reset(combined_version)?;
index = index.min(Height::from(vec.len()));
}
let start = index.to_usize();
let fi_len = first_indexes.len();
let batch_start = start.saturating_sub(n_blocks - 1);
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(batch_start, fi_len);
let count_indexes_all: Vec<brk_types::StoredU64> =
count_indexes.collect_range_at(batch_start, fi_len);
let zero = T::from(0_usize);
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.truncate_if_needed_at(start)?;
}
// Persistent sorted window: O(n) merge-insert for new block, O(n) merge-filter
// for expired block. Avoids re-sorting every block. Cursor reads only the new
// block (~1 page decompress vs original's ~4). Ring buffer caches per-block
// sorted values for O(1) expiry.
// Peak memory: 2 × ~15k window elements + n_blocks × ~2500 cached ≈ 360 KB.
let mut block_ring: VecDeque<Vec<T>> = VecDeque::with_capacity(n_blocks + 1);
let mut cursor = source.cursor();
let mut sorted_window: Vec<T> = Vec::new();
let mut merge_buf: Vec<T> = Vec::new();
// Pre-fill initial window blocks [window_start_of_first..start)
let window_start_of_first = start.saturating_sub(n_blocks - 1);
for block_idx in window_start_of_first..start {
let fi = first_indexes_batch[block_idx - batch_start].to_usize();
let count = u64::from(count_indexes_all[block_idx - batch_start]) as usize;
if cursor.position() < fi {
cursor.advance(fi - cursor.position());
}
let mut bv = Vec::with_capacity(count);
cursor.for_each(count, |v: T| bv.push(v));
bv.sort_unstable();
sorted_window.extend_from_slice(&bv);
block_ring.push_back(bv);
}
// Initial sorted_window was built by extending individually sorted blocks —
// stable sort detects these sorted runs and merges in O(n × log(k)) instead of O(n log n).
sorted_window.sort();
for j in 0..(fi_len - start) {
let idx = start + j;
// Read and sort new block's values
let fi = first_indexes_batch[idx - batch_start].to_usize();
let count = u64::from(count_indexes_all[idx - batch_start]) as usize;
if cursor.position() < fi {
cursor.advance(fi - cursor.position());
}
let mut new_block = Vec::with_capacity(count);
cursor.for_each(count, |v: T| new_block.push(v));
new_block.sort_unstable();
// Merge-insert new sorted block into sorted_window: O(n+m)
merge_buf.clear();
merge_buf.reserve(sorted_window.len() + new_block.len());
let (mut si, mut ni) = (0, 0);
while si < sorted_window.len() && ni < new_block.len() {
if sorted_window[si] <= new_block[ni] {
merge_buf.push(sorted_window[si]);
si += 1;
} else {
merge_buf.push(new_block[ni]);
ni += 1;
}
}
merge_buf.extend_from_slice(&sorted_window[si..]);
merge_buf.extend_from_slice(&new_block[ni..]);
std::mem::swap(&mut sorted_window, &mut merge_buf);
block_ring.push_back(new_block);
// Expire oldest block: merge-filter its sorted values from sorted_window in O(n)
if block_ring.len() > n_blocks {
let expired = block_ring.pop_front().unwrap();
merge_buf.clear();
merge_buf.reserve(sorted_window.len());
let mut ei = 0;
for &v in &sorted_window {
if ei < expired.len() && v == expired[ei] {
ei += 1;
} else {
merge_buf.push(v);
}
}
std::mem::swap(&mut sorted_window, &mut merge_buf);
}
if sorted_window.is_empty() {
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.push(zero);
}
} else {
max.push(*sorted_window.last().unwrap());
pct90.push(get_percentile(&sorted_window, 0.90));
pct75.push(get_percentile(&sorted_window, 0.75));
median.push(get_percentile(&sorted_window, 0.50));
pct25.push(get_percentile(&sorted_window, 0.25));
pct10.push(get_percentile(&sorted_window, 0.10));
min.push(*sorted_window.first().unwrap());
}
}
let _lock = exit.lock();
for vec in [min, max, median, pct10, pct25, pct75, pct90] {
vec.write()?;
}
Ok(())
}
}