computer: fenwick + per block profitability

This commit is contained in:
nym21
2026-03-08 13:46:31 +01:00
parent 7f1f6044dc
commit a4857ee8f4
11 changed files with 848 additions and 342 deletions
@@ -0,0 +1,429 @@
use brk_cohort::{
compute_profitability_boundaries, Filter, PROFITABILITY_RANGE_COUNT,
};
use brk_types::{Cents, CentsCompact, Sats};
use crate::{
distribution::state::PendingDelta,
internal::{FenwickNode, FenwickTree, PERCENTILES, PERCENTILES_LEN},
};
use super::COST_BASIS_PRICE_DIGITS;
/// Number of age range cohorts (21: 20 boundaries + 1 unbounded).
const AGE_RANGE_COUNT: usize = 21;
// Tier boundaries for 5-significant-digit dollar bucketing.
// Matches the rounding used by `Cents::round_to_dollar(5)`.
const TIER0_COUNT: usize = 100_000; // $0-$99,999 exact dollars
const TIER1_COUNT: usize = 90_000; // $100,000-$999,990 step $10
const OVERFLOW: usize = 1; // $1,000,000+ clamped to last bucket
const TIER1_START: usize = TIER0_COUNT;
/// Total number of buckets.
const TREE_SIZE: usize = TIER0_COUNT + TIER1_COUNT + OVERFLOW; // 190,001
/// 4-field Fenwick tree node for combined cost basis tracking.
#[derive(Clone, Copy, Default)]
pub(super) struct CostBasisNode {
all_sats: i64,
sth_sats: i64,
all_usd: i128,
sth_usd: i128,
}
impl FenwickNode for CostBasisNode {
#[inline(always)]
fn add_assign(&mut self, other: &Self) {
self.all_sats += other.all_sats;
self.sth_sats += other.sth_sats;
self.all_usd += other.all_usd;
self.sth_usd += other.sth_usd;
}
}
/// Combined Fenwick tree for per-block accurate percentile and profitability queries.
#[derive(Clone)]
pub(super) struct CostBasisFenwick {
tree: FenwickTree<CostBasisNode>,
/// Running totals (sum of all underlying frequencies).
totals: CostBasisNode,
/// Pre-computed: which age-range cohort index is STH?
is_sth: [bool; AGE_RANGE_COUNT],
initialized: bool,
}
// ---------------------------------------------------------------------------
// Bucket mapping: 5-significant-digit dollar precision
// Uses Cents::round_to_dollar(5) for rounding, then maps rounded dollars
// to a flat bucket index across two tiers.
// ---------------------------------------------------------------------------
/// Map rounded dollars to a flat bucket index.
/// Prices >= $1M are clamped to the last bucket.
#[inline]
fn dollars_to_bucket(dollars: u64) -> usize {
if dollars < 100_000 {
dollars as usize
} else if dollars < 1_000_000 {
TIER1_START + ((dollars - 100_000) / 10) as usize
} else {
TREE_SIZE - 1 // overflow bucket for $1M+
}
}
/// Convert a bucket index back to a price in Cents.
#[inline]
fn bucket_to_cents(bucket: usize) -> Cents {
let dollars: u64 = if bucket < TIER1_START {
bucket as u64
} else if bucket < TREE_SIZE - 1 {
100_000 + (bucket - TIER1_START) as u64 * 10
} else {
1_000_000
};
Cents::from(dollars * 100)
}
/// Map a CentsCompact price to a bucket index.
#[inline]
fn price_to_bucket(price: CentsCompact) -> usize {
let rounded = Cents::from(price).round_to_dollar(COST_BASIS_PRICE_DIGITS);
dollars_to_bucket(u64::from(rounded) / 100)
}
/// Map a Cents price to a bucket index.
#[inline]
fn cents_to_bucket(price: Cents) -> usize {
let rounded = price.round_to_dollar(COST_BASIS_PRICE_DIGITS);
dollars_to_bucket(u64::from(rounded) / 100)
}
// ---------------------------------------------------------------------------
// CostBasisFenwick implementation
// ---------------------------------------------------------------------------
impl CostBasisFenwick {
pub(super) fn new() -> Self {
Self {
tree: FenwickTree::new(TREE_SIZE),
totals: CostBasisNode::default(),
is_sth: [false; AGE_RANGE_COUNT],
initialized: false,
}
}
pub(super) fn is_initialized(&self) -> bool {
self.initialized
}
/// Pre-compute `is_sth` lookup from the STH filter and age-range filters.
pub(super) fn compute_is_sth<'a>(
&mut self,
sth_filter: &Filter,
age_range_filters: impl Iterator<Item = &'a Filter>,
) {
for (i, f) in age_range_filters.enumerate() {
self.is_sth[i] = sth_filter.includes(f);
}
}
pub(super) fn is_sth_at(&self, age_range_idx: usize) -> bool {
self.is_sth[age_range_idx]
}
/// Apply a net delta from a pending map entry.
pub(super) fn apply_delta(
&mut self,
price: CentsCompact,
pending: &PendingDelta,
is_sth: bool,
) {
let net_sats = u64::from(pending.inc) as i64 - u64::from(pending.dec) as i64;
if net_sats == 0 {
return;
}
let bucket = price_to_bucket(price);
let net_usd = price.as_u128() as i128 * net_sats as i128;
let delta = CostBasisNode {
all_sats: net_sats,
sth_sats: if is_sth { net_sats } else { 0 },
all_usd: net_usd,
sth_usd: if is_sth { net_usd } else { 0 },
};
self.tree.add(bucket, &delta);
self.totals.add_assign(&delta);
}
/// Bulk-initialize from BTreeMaps (one per age-range cohort).
/// Call after state import when all pending maps have been drained.
pub(super) fn bulk_init<'a>(
&mut self,
maps: impl Iterator<Item = (&'a std::collections::BTreeMap<CentsCompact, Sats>, bool)>,
) {
self.tree.reset();
self.totals = CostBasisNode::default();
for (map, is_sth) in maps {
for (&price, &sats) in map.iter() {
let bucket = price_to_bucket(price);
let s = u64::from(sats) as i64;
let usd = price.as_u128() as i128 * s as i128;
let node = CostBasisNode {
all_sats: s,
sth_sats: if is_sth { s } else { 0 },
all_usd: usd,
sth_usd: if is_sth { usd } else { 0 },
};
self.tree.add_raw(bucket, &node);
self.totals.add_assign(&node);
}
}
self.tree.build_in_place();
self.initialized = true;
}
/// Reset to uninitialized empty state.
pub(super) fn reset(&mut self) {
self.tree.reset();
self.totals = CostBasisNode::default();
self.initialized = false;
}
// -----------------------------------------------------------------------
// Percentile queries
// -----------------------------------------------------------------------
/// Compute sat-weighted and usd-weighted percentile prices for ALL cohort.
pub(super) fn percentiles_all(&self) -> PercentileResult {
self.compute_percentiles(
self.totals.all_sats,
self.totals.all_usd,
|n| n.all_sats,
|n| n.all_usd,
)
}
/// Compute percentile prices for STH cohort.
pub(super) fn percentiles_sth(&self) -> PercentileResult {
self.compute_percentiles(
self.totals.sth_sats,
self.totals.sth_usd,
|n| n.sth_sats,
|n| n.sth_usd,
)
}
/// Compute percentile prices for LTH cohort (all - sth per node).
pub(super) fn percentiles_lth(&self) -> PercentileResult {
self.compute_percentiles(
self.totals.all_sats - self.totals.sth_sats,
self.totals.all_usd - self.totals.sth_usd,
|n| n.all_sats - n.sth_sats,
|n| n.all_usd - n.sth_usd,
)
}
fn compute_percentiles(
&self,
total_sats: i64,
total_usd: i128,
sat_field: impl Fn(&CostBasisNode) -> i64,
usd_field: impl Fn(&CostBasisNode) -> i128,
) -> PercentileResult {
let mut result = PercentileResult::default();
if total_sats <= 0 {
return result;
}
// Sat-weighted percentiles: find first bucket where cumulative >= target
for (i, &p) in PERCENTILES.iter().enumerate() {
let target = (total_sats * i64::from(p) / 100 - 1).max(0);
let bucket = self.tree.kth(target, &sat_field);
result.sat_prices[i] = bucket_to_cents(bucket);
}
// USD-weighted percentiles
if total_usd > 0 {
for (i, &p) in PERCENTILES.iter().enumerate() {
let target = (total_usd * i128::from(p) / 100 - 1).max(0);
let bucket = self.tree.kth(target, &usd_field);
result.usd_prices[i] = bucket_to_cents(bucket);
}
}
// Min/max via kth(0) and kth(total-1)
result.min_price = bucket_to_cents(self.tree.kth(0i64, &sat_field));
result.max_price = bucket_to_cents(self.tree.kth(total_sats - 1, &sat_field));
result
}
// -----------------------------------------------------------------------
// Profitability queries (all cohort only)
// -----------------------------------------------------------------------
/// Compute profitability range buckets from current spot price.
/// Returns 25 ranges: (sats, usd_raw) per range.
pub(super) fn profitability(
&self,
spot_price: Cents,
) -> [(u64, u128); PROFITABILITY_RANGE_COUNT] {
let mut result = [(0u64, 0u128); PROFITABILITY_RANGE_COUNT];
if self.totals.all_sats <= 0 {
return result;
}
let boundaries = compute_profitability_boundaries(spot_price);
let mut prev_sats: i64 = 0;
let mut prev_usd: i128 = 0;
for (i, &boundary) in boundaries.iter().enumerate() {
let boundary_bucket = cents_to_bucket(boundary);
// prefix_sum through the bucket BEFORE the boundary
let cum = if boundary_bucket > 0 {
self.tree.prefix_sum(boundary_bucket - 1)
} else {
CostBasisNode::default()
};
let range_sats = cum.all_sats - prev_sats;
let range_usd = cum.all_usd - prev_usd;
result[i] = (range_sats.max(0) as u64, range_usd.max(0) as u128);
prev_sats = cum.all_sats;
prev_usd = cum.all_usd;
}
// Last range: everything >= last boundary
let remaining_sats = self.totals.all_sats - prev_sats;
let remaining_usd = self.totals.all_usd - prev_usd;
result[PROFITABILITY_RANGE_COUNT - 1] =
(remaining_sats.max(0) as u64, remaining_usd.max(0) as u128);
result
}
}
/// Result of a percentile computation for one cohort.
#[derive(Default)]
pub(super) struct PercentileResult {
pub sat_prices: [Cents; PERCENTILES_LEN],
pub usd_prices: [Cents; PERCENTILES_LEN],
pub min_price: Cents,
pub max_price: Cents,
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeMap;
#[test]
fn bucket_round_trip() {
// Low prices: exact dollar precision
let price = CentsCompact::new(5000_00); // $5000
let bucket = price_to_bucket(price);
let back = bucket_to_cents(bucket);
assert_eq!(u64::from(back), 5000 * 100);
// High price: $90,000 → rounded to $90,000 (already 5 digits)
let price = CentsCompact::new(90_000_00);
let bucket = price_to_bucket(price);
let back = bucket_to_cents(bucket);
assert_eq!(u64::from(back), 90_000 * 100);
// Tier 1: $123,456 → rounded to $123,460
let price = CentsCompact::new(123_456_00);
let bucket = price_to_bucket(price);
let back = bucket_to_cents(bucket);
assert_eq!(u64::from(back), 123_460 * 100);
// Overflow: $2,000,000 → clamped to $1,000,000
let price = CentsCompact::new(2_000_000_00);
let bucket = price_to_bucket(price);
assert_eq!(bucket, TREE_SIZE - 1);
assert_eq!(u64::from(bucket_to_cents(bucket)), 1_000_000 * 100);
}
#[test]
fn bucket_edge_cases() {
// $0
assert_eq!(price_to_bucket(CentsCompact::new(0)), 0);
assert_eq!(u64::from(bucket_to_cents(0)), 0);
// $1
let bucket = price_to_bucket(CentsCompact::new(100));
assert_eq!(bucket, 1);
// Max CentsCompact
let bucket = price_to_bucket(CentsCompact::MAX);
assert!(bucket < TREE_SIZE);
}
#[test]
fn bulk_init_and_percentiles() {
let mut fenwick = CostBasisFenwick::new();
// Create a simple BTreeMap: 100 sats at $10,000, 100 sats at $50,000
let mut map = BTreeMap::new();
map.insert(CentsCompact::new(10_000_00), Sats::from(100u64));
map.insert(CentsCompact::new(50_000_00), Sats::from(100u64));
fenwick.bulk_init(std::iter::once((&map, true)));
assert!(fenwick.is_initialized());
let result = fenwick.percentiles_all();
// Median (50th percentile) should be at $10,000 (first 100 sats)
// since target = 200 * 50/100 = 100, and first 100 sats are at $10,000
assert_eq!(u64::from(result.sat_prices[9]), 10_000 * 100); // index 9 = 50th percentile
// Min should be $10,000, max should be $50,000
assert_eq!(u64::from(result.min_price), 10_000 * 100);
assert_eq!(u64::from(result.max_price), 50_000 * 100);
}
#[test]
fn apply_delta_updates_totals() {
let mut fenwick = CostBasisFenwick::new();
fenwick.initialized = true;
let price = CentsCompact::new(10_000_00);
fenwick.apply_delta(price, &PendingDelta { inc: Sats::from(500u64), dec: Sats::ZERO }, true);
assert_eq!(fenwick.totals.all_sats, 500);
assert_eq!(fenwick.totals.sth_sats, 500);
fenwick.apply_delta(price, &PendingDelta { inc: Sats::ZERO, dec: Sats::from(200u64) }, true);
assert_eq!(fenwick.totals.all_sats, 300);
assert_eq!(fenwick.totals.sth_sats, 300);
// Non-STH delta
fenwick.apply_delta(price, &PendingDelta { inc: Sats::from(100u64), dec: Sats::ZERO }, false);
assert_eq!(fenwick.totals.all_sats, 400);
assert_eq!(fenwick.totals.sth_sats, 300);
}
#[test]
fn profitability_ranges_sum_to_total() {
let mut fenwick = CostBasisFenwick::new();
let mut map = BTreeMap::new();
// Spread sats across different prices
map.insert(CentsCompact::new(1_000_00), Sats::from(1000u64));
map.insert(CentsCompact::new(10_000_00), Sats::from(2000u64));
map.insert(CentsCompact::new(50_000_00), Sats::from(3000u64));
map.insert(CentsCompact::new(100_000_00), Sats::from(4000u64));
fenwick.bulk_init(std::iter::once((&map, false)));
let spot = Cents::from(50_000u64 * 100);
let prof = fenwick.profitability(spot);
let total_sats: u64 = prof.iter().map(|(s, _)| s).sum();
assert_eq!(total_sats, 10_000);
}
}
@@ -2,7 +2,7 @@ use std::path::Path;
use brk_cohort::{
ByAgeRange, ByAmountRange, ByClass, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge,
ByMinAge, BySpendableType, CohortContext, Filter, Term,
ByMinAge, BySpendableType, CohortContext, Filter, Filtered, Term,
};
use brk_error::Result;
use brk_traversable::Traversable;
@@ -26,7 +26,7 @@ use crate::{
prices,
};
use super::{percentiles::PercentileCache, vecs::UTXOCohortVecs};
use super::{fenwick::CostBasisFenwick, vecs::UTXOCohortVecs};
const VERSION: Version = Version::new(0);
@@ -48,7 +48,7 @@ pub struct UTXOCohorts<M: StorageMode = Rw> {
pub profitability: ProfitabilityMetrics<M>,
pub matured: ByAgeRange<ValueFromHeight<M>>,
#[traversable(skip)]
pub(super) percentile_cache: PercentileCache,
pub(super) fenwick: CostBasisFenwick,
/// Cached partition_point positions for tick_tock boundary searches.
/// Avoids O(log n) binary search per boundary per block; scans forward
/// from last known position (typically O(1) per boundary).
@@ -244,11 +244,57 @@ impl UTXOCohorts<Rw> {
ge_amount,
profitability,
matured,
percentile_cache: PercentileCache::default(),
fenwick: CostBasisFenwick::new(),
tick_tock_cached_positions: [0; 20],
})
}
/// Initialize the Fenwick tree from all age-range BTreeMaps.
/// Call after state import when all pending maps have been drained.
pub(crate) fn init_fenwick_if_needed(&mut self) {
if self.fenwick.is_initialized() {
return;
}
let Self {
sth, fenwick, age_range, ..
} = self;
fenwick.compute_is_sth(&sth.metrics.filter, age_range.iter().map(|v| v.filter()));
let maps: Vec<_> = age_range
.iter()
.enumerate()
.filter_map(|(i, sub)| {
let state = sub.state.as_ref()?;
let map = state.cost_basis_map();
if map.is_empty() {
return None;
}
Some((map, fenwick.is_sth_at(i)))
})
.collect();
fenwick.bulk_init(maps.into_iter());
}
/// Apply pending deltas from all age-range cohorts to the Fenwick tree.
/// Call after receive/send, before push_cohort_states.
pub(crate) fn update_fenwick_from_pending(&mut self) {
if !self.fenwick.is_initialized() {
return;
}
// Destructure to get separate borrows on fenwick and age_range
let Self {
fenwick, age_range, ..
} = self;
for (i, sub) in age_range.iter().enumerate() {
if let Some(state) = sub.state.as_ref() {
let is_sth = fenwick.is_sth_at(i);
state.for_each_cost_basis_pending(|&price, delta| {
fenwick.apply_delta(price, delta, is_sth);
});
}
}
}
/// Push maturation sats to the matured vecs for the given height.
pub(crate) fn push_maturation(
&mut self,
@@ -1,3 +1,4 @@
mod fenwick;
mod groups;
mod percentiles;
mod receive;
@@ -5,4 +6,7 @@ mod send;
mod tick_tock;
mod vecs;
/// Rounding precision for UTXO cost basis prices (5 significant digits in dollars).
const COST_BASIS_PRICE_DIGITS: i32 = 5;
pub use groups::*;
@@ -1,51 +1,21 @@
use std::{cmp::Reverse, collections::BinaryHeap, fs, path::Path};
use brk_cohort::{
compute_profitability_boundaries, Filtered, PROFITABILITY_BOUNDARY_COUNT,
PROFITABILITY_RANGE_COUNT, PROFIT_COUNT, TERM_NAMES,
};
use brk_cohort::{Filtered, PROFITABILITY_RANGE_COUNT, PROFIT_COUNT, TERM_NAMES};
use brk_error::Result;
use brk_types::{Cents, CentsCompact, CostBasisDistribution, Date, Dollars, Height, Sats};
use crate::internal::{PERCENTILES, PERCENTILES_LEN};
use crate::distribution::metrics::{CostBasis, ProfitabilityMetrics};
use super::fenwick::PercentileResult;
use super::groups::UTXOCohorts;
const COST_BASIS_PRICE_DIGITS: i32 = 5;
#[derive(Clone, Default)]
pub(super) struct CachedPercentiles {
sat_result: [Cents; PERCENTILES_LEN],
usd_result: [Cents; PERCENTILES_LEN],
min_price: Cents,
max_price: Cents,
}
impl CachedPercentiles {
fn push(&self, height: Height, cost_basis: &mut CostBasis) -> Result<()> {
cost_basis.truncate_push_minmax(height, self.min_price, self.max_price)?;
cost_basis.truncate_push_percentiles(height, &self.sat_result, &self.usd_result)
}
}
/// Cached percentile + profitability results for all/sth/lth.
/// Avoids re-merging 21 BTreeMaps on every block.
#[derive(Clone, Default)]
pub(super) struct PercentileCache {
all: CachedPercentiles,
sth: CachedPercentiles,
lth: CachedPercentiles,
profitability: [(u64, u128); PROFITABILITY_RANGE_COUNT],
initialized: bool,
}
use super::COST_BASIS_PRICE_DIGITS;
impl UTXOCohorts {
/// Compute and push percentiles + profitability for aggregate cohorts.
///
/// Full K-way merge only runs at day boundaries or when the cache is empty.
/// For intermediate blocks, pushes cached values.
/// Percentiles and profitability are computed per-block from the Fenwick tree.
/// Disk distributions are written only at day boundaries via K-way merge.
pub(crate) fn truncate_push_aggregate_percentiles(
&mut self,
height: Height,
@@ -53,105 +23,84 @@ impl UTXOCohorts {
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
if date_opt.is_some() || !self.percentile_cache.initialized {
self.recompute_cache(spot_price, date_opt, states_path)?;
if self.fenwick.is_initialized() {
// Per-block accurate percentiles from Fenwick tree
self.push_percentiles_from_fenwick(height)?;
// Per-block accurate profitability from Fenwick tree with current spot_price
let prof = self.fenwick.profitability(spot_price);
push_profitability(height, &prof, &mut self.profitability)?;
}
self.push_cached(height)
}
/// Full K-way merge: recompute percentiles + profitability from scratch, update cache.
fn recompute_cache(
&mut self,
spot_price: Cents,
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
let collect_merged = date_opt.is_some();
let boundaries = compute_profitability_boundaries(spot_price);
let targets = {
let sth_filter = self.sth.metrics.filter.clone();
let mut totals = AllSthLth::<(u64, u128)>::default();
let maps: Vec<_> = self
.age_range
.iter()
.filter_map(|sub| {
let state = sub.state.as_ref()?;
let map = state.cost_basis_map();
if map.is_empty() {
return None;
}
let is_sth = sth_filter.includes(sub.filter());
let mut cs = 0u64;
let mut cu = 0u128;
for (&price, &sats) in map.iter() {
let s = u64::from(sats);
cs += s;
cu += price.as_u128() * s as u128;
}
totals.all.0 += cs;
totals.all.1 += cu;
let term = totals.term_mut(is_sth);
term.0 += cs;
term.1 += cu;
Some((map, is_sth))
})
.collect();
let cap = if collect_merged {
maps.iter().map(|(m, _)| m.len()).max().unwrap_or(0)
} else {
0
};
let all_has_data = totals.all.0 > 0;
let mut targets = totals.map(|(sats, usd)| PercTarget::new(sats, usd, cap));
self.percentile_cache.profitability = Default::default();
if all_has_data {
merge_k_way(
&maps,
&mut targets,
&boundaries,
&mut self.percentile_cache.profitability,
collect_merged,
);
}
targets
};
self.percentile_cache.all = targets.all.to_cached();
self.percentile_cache.sth = targets.sth.to_cached();
self.percentile_cache.lth = targets.lth.to_cached();
self.percentile_cache.initialized = true;
// Disk distributions only at day boundaries
if let Some(date) = date_opt {
write_distribution(states_path, "all", date, targets.all.merged)?;
write_distribution(states_path, TERM_NAMES.short.id, date, targets.sth.merged)?;
write_distribution(states_path, TERM_NAMES.long.id, date, targets.lth.merged)?;
self.write_disk_distributions(date, states_path)?;
}
Ok(())
}
/// Push cached percentile + profitability values.
fn push_cached(&mut self, height: Height) -> Result<()> {
self.percentile_cache
.all
.push(height, &mut self.all.metrics.cost_basis)?;
self.percentile_cache
.sth
.push(height, &mut self.sth.metrics.cost_basis)?;
self.percentile_cache
.lth
.push(height, &mut self.lth.metrics.cost_basis)?;
push_profitability(
height,
&self.percentile_cache.profitability,
&mut self.profitability,
)
/// Push Fenwick-computed percentiles for all/sth/lth to vecs.
fn push_percentiles_from_fenwick(&mut self, height: Height) -> Result<()> {
let all = self.fenwick.percentiles_all();
push_percentile_result(height, &all, &mut self.all.metrics.cost_basis)?;
let sth = self.fenwick.percentiles_sth();
push_percentile_result(height, &sth, &mut self.sth.metrics.cost_basis)?;
let lth = self.fenwick.percentiles_lth();
push_percentile_result(height, &lth, &mut self.lth.metrics.cost_basis)?;
Ok(())
}
/// K-way merge only for writing daily cost basis distributions to disk.
fn write_disk_distributions(&mut self, date: Date, states_path: &Path) -> Result<()> {
let sth_filter = self.sth.metrics.filter.clone();
let maps: Vec<_> = self
.age_range
.iter()
.filter_map(|sub| {
let state = sub.state.as_ref()?;
let map = state.cost_basis_map();
if map.is_empty() {
return None;
}
let is_sth = sth_filter.includes(sub.filter());
Some((map, is_sth))
})
.collect();
if maps.is_empty() {
return Ok(());
}
let cap = maps.iter().map(|(m, _)| m.len()).max().unwrap_or(0);
let mut targets = AllSthLth {
all: MergeTarget::new(cap),
sth: MergeTarget::new(cap),
lth: MergeTarget::new(cap),
};
merge_k_way(&maps, &mut targets);
write_distribution(states_path, "all", date, targets.all.merged)?;
write_distribution(states_path, TERM_NAMES.short.id, date, targets.sth.merged)?;
write_distribution(states_path, TERM_NAMES.long.id, date, targets.lth.merged)?;
Ok(())
}
}
/// Push a PercentileResult to cost basis vecs.
fn push_percentile_result(
height: Height,
result: &PercentileResult,
cost_basis: &mut CostBasis,
) -> Result<()> {
cost_basis.truncate_push_minmax(height, result.min_price, result.max_price)?;
cost_basis.truncate_push_percentiles(height, &result.sat_prices, &result.usd_prices)
}
/// Convert raw (cents × sats) accumulator to Dollars (÷ 100 for cents→dollars, ÷ 1e8 for sats).
@@ -215,14 +164,67 @@ fn write_distribution(
Ok(())
}
// ---------------------------------------------------------------------------
// K-way merge (retained only for disk distribution writes)
// ---------------------------------------------------------------------------
struct AllSthLth<T> {
all: T,
sth: T,
lth: T,
}
impl<T> AllSthLth<T> {
fn term_mut(&mut self, is_sth: bool) -> &mut T {
if is_sth { &mut self.sth } else { &mut self.lth }
}
fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
f(&mut self.all);
f(&mut self.sth);
f(&mut self.lth);
}
}
/// Merge target that only collects rounded (price, sats) pairs for disk distribution.
struct MergeTarget {
price_sats: u64,
merged: Vec<(CentsCompact, Sats)>,
}
impl MergeTarget {
fn new(cap: usize) -> Self {
Self {
price_sats: 0,
merged: Vec::with_capacity(cap),
}
}
#[inline]
fn accumulate(&mut self, amount: u64) {
self.price_sats += amount;
}
fn finalize_price(&mut self, price: Cents) {
if self.price_sats > 0 {
let rounded: CentsCompact = price.round_to_dollar(COST_BASIS_PRICE_DIGITS).into();
if let Some((lp, ls)) = self.merged.last_mut()
&& *lp == rounded
{
*ls += Sats::from(self.price_sats);
} else {
self.merged.push((rounded, Sats::from(self.price_sats)));
}
}
self.price_sats = 0;
}
}
/// K-way merge via BinaryHeap over BTreeMap iterators.
/// Also accumulates profitability buckets for the "all" target using cursor approach.
/// Only builds merged distribution for disk writes.
fn merge_k_way(
maps: &[(&std::collections::BTreeMap<CentsCompact, Sats>, bool)],
targets: &mut AllSthLth<PercTarget>,
boundaries: &[Cents; PROFITABILITY_BOUNDARY_COUNT],
prof: &mut [(u64, u128); PROFITABILITY_RANGE_COUNT],
collect_merged: bool,
targets: &mut AllSthLth<MergeTarget>,
) {
let mut iters: Vec<_> = maps
.iter()
@@ -238,33 +240,21 @@ fn merge_k_way(
}
let mut current_price: Option<CentsCompact> = None;
let mut boundary_idx = 0usize;
while let Some(Reverse((price, ci))) = heap.pop() {
let (ref mut iter, is_sth) = iters[ci];
let (_, &sats) = iter.next().unwrap();
let amount = u64::from(sats);
let price_cents = Cents::from(price);
let usd = price_cents.as_u128() * amount as u128;
if let Some(prev) = current_price
&& prev != price
{
targets.for_each_mut(|t| t.finalize_price(prev.into(), collect_merged));
targets.for_each_mut(|t| t.finalize_price(prev.into()));
}
current_price = Some(price);
targets.all.accumulate(amount, usd);
targets.term_mut(is_sth).accumulate(amount, usd);
// Profitability: advance cursor past boundaries (prices are ascending)
while boundary_idx < PROFITABILITY_BOUNDARY_COUNT
&& price_cents >= boundaries[boundary_idx]
{
boundary_idx += 1;
}
prof[boundary_idx].0 += amount;
prof[boundary_idx].1 += usd;
targets.all.accumulate(amount);
targets.term_mut(is_sth).accumulate(amount);
if let Some(&(&next_price, _)) = iter.peek() {
heap.push(Reverse((next_price, ci)));
@@ -272,148 +262,6 @@ fn merge_k_way(
}
if let Some(price) = current_price {
targets.for_each_mut(|t| t.finalize_price(price.into(), collect_merged));
targets.for_each_mut(|t| t.finalize_price(price.into()));
}
}
struct AllSthLth<T> {
all: T,
sth: T,
lth: T,
}
impl<T: Default> Default for AllSthLth<T> {
fn default() -> Self {
Self {
all: T::default(),
sth: T::default(),
lth: T::default(),
}
}
}
impl<T> AllSthLth<T> {
fn term_mut(&mut self, is_sth: bool) -> &mut T {
if is_sth { &mut self.sth } else { &mut self.lth }
}
fn map<U>(self, mut f: impl FnMut(T) -> U) -> AllSthLth<U> {
AllSthLth {
all: f(self.all),
sth: f(self.sth),
lth: f(self.lth),
}
}
fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
f(&mut self.all);
f(&mut self.sth);
f(&mut self.lth);
}
}
struct PercTarget {
total_sats: u64,
total_usd: u128,
cum_sats: u64,
cum_usd: u128,
sat_idx: usize,
usd_idx: usize,
sat_targets: [u64; PERCENTILES_LEN],
usd_targets: [u128; PERCENTILES_LEN],
sat_result: [Cents; PERCENTILES_LEN],
usd_result: [Cents; PERCENTILES_LEN],
price_sats: u64,
price_usd: u128,
min_price: Cents,
max_price: Cents,
merged: Vec<(CentsCompact, Sats)>,
}
impl PercTarget {
fn new(total_sats: u64, total_usd: u128, merged_cap: usize) -> Self {
Self {
sat_targets: if total_sats > 0 {
PERCENTILES.map(|p| total_sats * u64::from(p) / 100)
} else {
[0; PERCENTILES_LEN]
},
usd_targets: if total_usd > 0 {
PERCENTILES.map(|p| total_usd * u128::from(p) / 100)
} else {
[0; PERCENTILES_LEN]
},
total_sats,
total_usd,
cum_sats: 0,
cum_usd: 0,
sat_idx: 0,
usd_idx: 0,
sat_result: [Cents::ZERO; PERCENTILES_LEN],
usd_result: [Cents::ZERO; PERCENTILES_LEN],
price_sats: 0,
price_usd: 0,
min_price: Cents::ZERO,
max_price: Cents::ZERO,
merged: Vec::with_capacity(merged_cap),
}
}
fn to_cached(&self) -> CachedPercentiles {
CachedPercentiles {
sat_result: self.sat_result,
usd_result: self.usd_result,
min_price: self.min_price,
max_price: self.max_price,
}
}
#[inline]
fn accumulate(&mut self, amount: u64, usd: u128) {
self.price_sats += amount;
self.price_usd += usd;
}
fn finalize_price(&mut self, price: Cents, collect_merged: bool) {
if self.price_sats > 0 {
if self.min_price == Cents::ZERO {
self.min_price = price;
}
self.max_price = price;
if collect_merged {
let rounded: CentsCompact = price.round_to_dollar(COST_BASIS_PRICE_DIGITS).into();
if let Some((lp, ls)) = self.merged.last_mut()
&& *lp == rounded
{
*ls += Sats::from(self.price_sats);
} else {
self.merged.push((rounded, Sats::from(self.price_sats)));
}
}
}
self.cum_sats += self.price_sats;
self.cum_usd += self.price_usd;
if self.total_sats > 0 {
while self.sat_idx < PERCENTILES_LEN
&& self.cum_sats >= self.sat_targets[self.sat_idx]
{
self.sat_result[self.sat_idx] = price;
self.sat_idx += 1;
}
}
if self.total_usd > 0 {
while self.usd_idx < PERCENTILES_LEN
&& self.cum_usd >= self.usd_targets[self.usd_idx]
{
self.usd_result[self.usd_idx] = price;
self.usd_idx += 1;
}
}
self.price_sats = 0;
self.price_usd = 0;
}
}
@@ -207,6 +207,9 @@ pub(crate) fn process_blocks(
let mut cache = AddressCache::new();
debug!("AddressCache created, entering main loop");
// Initialize Fenwick tree from imported BTreeMap state (one-time)
vecs.utxo_cohorts.init_fenwick_if_needed();
// Reusable hashsets (avoid per-block allocation)
let mut received_addresses = ByAddressType::<FxHashSet<TypeIndex>>::default();
let mut seen_senders = ByAddressType::<FxHashSet<TypeIndex>>::default();
@@ -414,6 +417,9 @@ pub(crate) fn process_blocks(
);
addr_result?;
// Update Fenwick tree from pending deltas (must happen before push_cohort_states drains pending)
vecs.utxo_cohorts.update_fenwick_from_pending();
// Push to height-indexed vectors
vecs.addr_count
.truncate_push_height(height, addr_counts.sum(), &addr_counts)?;
@@ -3,7 +3,7 @@ use std::{collections::BTreeMap, path::Path};
use brk_error::Result;
use brk_types::{Age, Cents, CentsCompact, CentsSats, CentsSquaredSats, CostBasisSnapshot, Height, Sats, SupplyState};
use super::super::cost_basis::{CostBasisData, RealizedOps, UnrealizedState};
use super::super::cost_basis::{CostBasisData, PendingDelta, RealizedOps, UnrealizedState};
pub struct SendPrecomputed {
pub sats: Sats,
@@ -93,6 +93,10 @@ impl<R: RealizedOps> CohortState<R> {
Ok(())
}
pub(crate) fn for_each_cost_basis_pending(&self, f: impl FnMut(&CentsCompact, &PendingDelta)) {
self.cost_basis_data.for_each_pending(f);
}
pub(crate) fn apply_pending(&mut self) {
self.cost_basis_data.apply_pending();
}
@@ -24,11 +24,18 @@ struct PendingRaw {
investor_cap_dec: CentsSquaredSats,
}
/// Pending increments and decrements for a single price bucket.
#[derive(Clone, Copy, Debug, Default)]
pub struct PendingDelta {
pub inc: Sats,
pub dec: Sats,
}
#[derive(Clone, Debug)]
pub struct CostBasisData {
pathbuf: PathBuf,
state: Option<State>,
pending: FxHashMap<CentsCompact, (Sats, Sats)>,
pending: FxHashMap<CentsCompact, PendingDelta>,
pending_raw: PendingRaw,
cache: Option<CachedUnrealizedState>,
rounding_digits: Option<i32>,
@@ -121,7 +128,7 @@ impl CostBasisData {
investor_cap: CentsSquaredSats,
) {
let price = self.round_price(price);
self.pending.entry(price.into()).or_default().0 += sats;
self.pending.entry(price.into()).or_default().inc += sats;
self.pending_raw.cap_inc += price_sats;
if investor_cap != CentsSquaredSats::ZERO {
self.pending_raw.investor_cap_inc += investor_cap;
@@ -141,7 +148,7 @@ impl CostBasisData {
investor_cap: CentsSquaredSats,
) {
let price = self.round_price(price);
self.pending.entry(price.into()).or_default().1 += sats;
self.pending.entry(price.into()).or_default().dec += sats;
self.pending_raw.cap_dec += price_sats;
if investor_cap != CentsSquaredSats::ZERO {
self.pending_raw.investor_cap_dec += investor_cap;
@@ -151,13 +158,17 @@ impl CostBasisData {
}
}
pub(crate) fn for_each_pending(&self, mut f: impl FnMut(&CentsCompact, &PendingDelta)) {
self.pending.iter().for_each(|(k, v)| f(k, v));
}
pub(crate) fn apply_pending(&mut self) {
if self.pending.is_empty() {
return;
}
self.generation = self.generation.wrapping_add(1);
let map = &mut self.state.as_mut().unwrap().base.map;
for (cents, (inc, dec)) in self.pending.drain() {
for (cents, PendingDelta { inc, dec }) in self.pending.drain() {
match map.entry(cents) {
Entry::Occupied(mut e) => {
*e.get_mut() += inc;
@@ -1,5 +1,7 @@
use brk_types::StoredF32;
use super::fenwick::FenwickTree;
/// Fast expanding percentile tracker using a Fenwick tree (Binary Indexed Tree).
///
/// Values are discretized to 10 BPS (0.1%) resolution and tracked in
@@ -9,9 +11,7 @@ use brk_types::StoredF32;
/// - 0.1% value resolution (10 BPS granularity)
#[derive(Clone)]
pub(crate) struct ExpandingPercentiles {
/// Fenwick tree storing cumulative frequency counts.
/// 1-indexed: tree[0] is unused, tree[1..=TREE_SIZE] hold data.
tree: Vec<u32>,
tree: FenwickTree<u32>,
count: u32,
}
@@ -24,7 +24,7 @@ const TREE_SIZE: usize = (MAX_BPS / BUCKET_BPS) as usize + 1;
impl Default for ExpandingPercentiles {
fn default() -> Self {
Self {
tree: vec![0u32; TREE_SIZE + 1], // 1-indexed
tree: FenwickTree::new(TREE_SIZE),
count: 0,
}
}
@@ -36,16 +36,15 @@ impl ExpandingPercentiles {
}
pub fn reset(&mut self) {
self.tree.iter_mut().for_each(|v| *v = 0);
self.tree.reset();
self.count = 0;
}
/// Convert f32 ratio to bucket index (1-indexed for Fenwick).
/// Convert f32 ratio to 0-indexed bucket.
#[inline]
fn to_bucket(value: f32) -> usize {
let bps = (value as f64 * 10000.0).round() as i32;
let bucket = (bps / BUCKET_BPS).clamp(0, TREE_SIZE as i32 - 1);
bucket as usize + 1
(bps / BUCKET_BPS).clamp(0, TREE_SIZE as i32 - 1) as usize
}
/// Bulk-load values in O(n + N) instead of O(n log N).
@@ -57,16 +56,9 @@ impl ExpandingPercentiles {
continue;
}
self.count += 1;
self.tree[Self::to_bucket(v)] += 1;
}
// Convert flat frequencies to Fenwick tree in O(N)
for i in 1..=TREE_SIZE {
let parent = i + (i & i.wrapping_neg());
if parent <= TREE_SIZE {
let val = self.tree[i];
self.tree[parent] += val;
}
self.tree.add_raw(Self::to_bucket(v), &1);
}
self.tree.build_in_place();
}
/// Add a value. O(log N).
@@ -76,28 +68,7 @@ impl ExpandingPercentiles {
return;
}
self.count += 1;
let mut i = Self::to_bucket(value);
while i <= TREE_SIZE {
self.tree[i] += 1;
i += i & i.wrapping_neg();
}
}
/// Find the bucket containing the k-th element (1-indexed k).
/// Uses the standard Fenwick tree walk-down in O(log N).
#[inline]
fn kth(&self, mut k: u32) -> usize {
let mut pos = 0;
let mut bit = 1 << (usize::BITS - 1 - TREE_SIZE.leading_zeros());
while bit > 0 {
let next = pos + bit;
if next <= TREE_SIZE && self.tree[next] < k {
k -= self.tree[next];
pos = next;
}
bit >>= 1;
}
pos + 1
self.tree.add(Self::to_bucket(value), &1);
}
/// Compute 6 percentiles in one call. O(6 × log N).
@@ -109,7 +80,9 @@ impl ExpandingPercentiles {
}
for (i, &q) in qs.iter().enumerate() {
let k = ((q * self.count as f64).ceil() as u32).clamp(1, self.count);
out[i] = (self.kth(k) as u32 - 1) * BUCKET_BPS as u32;
// kth with 0-indexed k: k-1; result is 0-indexed bucket
let bucket = self.tree.kth(k - 1, |n| *n);
out[i] = bucket as u32 * BUCKET_BPS as u32;
}
}
}
@@ -0,0 +1,182 @@
/// Trait for types that can be stored in a Fenwick tree.
pub(crate) trait FenwickNode: Clone + Copy + Default {
fn add_assign(&mut self, other: &Self);
}
impl FenwickNode for u32 {
#[inline(always)]
fn add_assign(&mut self, other: &Self) {
*self += other;
}
}
/// Generic Fenwick tree (Binary Indexed Tree) over arbitrary node types.
///
/// Uses 0-indexed buckets externally; 1-indexed internally.
/// Provides O(log N) point-update, prefix-sum, and kth walk-down.
#[derive(Clone)]
pub(crate) struct FenwickTree<N: FenwickNode> {
/// 1-indexed tree array. Position 0 is unused.
tree: Vec<N>,
size: usize,
}
impl<N: FenwickNode> FenwickTree<N> {
pub fn new(size: usize) -> Self {
Self {
tree: vec![N::default(); size + 1],
size,
}
}
#[inline]
pub fn size(&self) -> usize {
self.size
}
pub fn reset(&mut self) {
self.tree.fill(N::default());
}
/// Point-update: add `delta` to the node at `bucket` (0-indexed).
#[inline]
pub fn add(&mut self, bucket: usize, delta: &N) {
let mut i = bucket + 1;
while i <= self.size {
self.tree[i].add_assign(delta);
i += i & i.wrapping_neg();
}
}
/// Prefix sum of buckets [0, bucket] inclusive (0-indexed).
pub fn prefix_sum(&self, bucket: usize) -> N {
let mut result = N::default();
let mut i = bucket + 1;
while i > 0 {
result.add_assign(&self.tree[i]);
i -= i & i.wrapping_neg();
}
result
}
/// Find the 0-indexed bucket containing the k-th element (0-indexed k).
///
/// `field_fn` extracts the relevant count field from a node.
/// The value type `V` must support comparison and subtraction
/// (works with `u32`, `i64`, `i128`).
#[inline]
pub fn kth<V, F>(&self, k: V, field_fn: F) -> usize
where
V: Copy + PartialOrd + std::ops::SubAssign,
F: Fn(&N) -> V,
{
debug_assert!(self.size > 0);
let mut pos = 0usize;
let mut remaining = k;
let mut bit = 1usize << (usize::BITS - 1 - self.size.leading_zeros() as u32);
while bit > 0 {
let next = pos + bit;
if next <= self.size {
let val = field_fn(&self.tree[next]);
if remaining >= val {
remaining -= val;
pos = next;
}
}
bit >>= 1;
}
pos // 0-indexed bucket
}
/// Write a raw frequency delta at a bucket. Does NOT maintain the Fenwick invariant.
/// Call [`build_in_place`] after all raw writes.
#[inline]
pub fn add_raw(&mut self, bucket: usize, delta: &N) {
self.tree[bucket + 1].add_assign(delta);
}
/// Convert raw frequencies (written via [`add_raw`]) into a valid Fenwick tree. O(size).
pub fn build_in_place(&mut self) {
for i in 1..=self.size {
let parent = i + (i & i.wrapping_neg());
if parent <= self.size {
let child = self.tree[i];
self.tree[parent].add_assign(&child);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_add_and_prefix_sum() {
let mut tree = FenwickTree::<u32>::new(10);
tree.add(0, &3);
tree.add(1, &2);
tree.add(5, &7);
assert_eq!(tree.prefix_sum(0), 3);
assert_eq!(tree.prefix_sum(1), 5);
assert_eq!(tree.prefix_sum(4), 5);
assert_eq!(tree.prefix_sum(5), 12);
assert_eq!(tree.prefix_sum(9), 12);
}
#[test]
fn kth_walk_down() {
let mut tree = FenwickTree::<u32>::new(5);
// freq: [3, 2, 0, 5, 1]
tree.add(0, &3);
tree.add(1, &2);
tree.add(3, &5);
tree.add(4, &1);
// kth(0) = first element → bucket 0
assert_eq!(tree.kth(0u32, |n| *n), 0);
// kth(2) = 3rd element → bucket 0 (last of bucket 0)
assert_eq!(tree.kth(2u32, |n| *n), 0);
// kth(3) = 4th element → bucket 1
assert_eq!(tree.kth(3u32, |n| *n), 1);
// kth(4) = 5th element → bucket 1
assert_eq!(tree.kth(4u32, |n| *n), 1);
// kth(5) = 6th element → bucket 3 (bucket 2 is empty)
assert_eq!(tree.kth(5u32, |n| *n), 3);
// kth(10) = 11th element → bucket 4
assert_eq!(tree.kth(10u32, |n| *n), 4);
}
#[test]
fn build_in_place_matches_add() {
let mut tree_add = FenwickTree::<u32>::new(8);
tree_add.add(0, &5);
tree_add.add(2, &3);
tree_add.add(5, &7);
tree_add.add(7, &1);
let mut tree_bulk = FenwickTree::<u32>::new(8);
tree_bulk.add_raw(0, &5);
tree_bulk.add_raw(2, &3);
tree_bulk.add_raw(5, &7);
tree_bulk.add_raw(7, &1);
tree_bulk.build_in_place();
for i in 0..8 {
assert_eq!(
tree_add.prefix_sum(i),
tree_bulk.prefix_sum(i),
"mismatch at bucket {i}"
);
}
}
#[test]
fn reset_clears_all() {
let mut tree = FenwickTree::<u32>::new(10);
tree.add(3, &42);
tree.reset();
assert_eq!(tree.prefix_sum(9), 0);
}
}
+6 -3
View File
@@ -1,12 +1,15 @@
mod aggregation;
mod drawdown;
mod expanding_percentiles;
mod fenwick;
mod sliding_distribution;
mod sliding_median;
pub(crate) mod sliding_window;
mod expanding_percentiles;
mod sliding_window;
pub(crate) use aggregation::*;
pub(crate) use drawdown::*;
pub(crate) use expanding_percentiles::*;
pub(crate) use fenwick::*;
pub(crate) use sliding_distribution::*;
pub(crate) use sliding_median::*;
pub(crate) use expanding_percentiles::*;
pub(crate) use sliding_window::*;
+2 -2
View File
@@ -1,5 +1,5 @@
mod aggregate;
pub(crate) mod algo;
mod algo;
mod containers;
mod db_utils;
mod derived;
@@ -8,7 +8,7 @@ mod from_tx;
mod indexes;
mod rolling;
mod traits;
pub mod transform;
mod transform;
mod value;
pub(crate) use aggregate::*;