global: snapshot

This commit is contained in:
nym21
2026-03-15 00:57:53 +01:00
parent 0d177494d9
commit 9e36a4188a
50 changed files with 2765 additions and 1239 deletions

View File

@@ -1,3 +1,5 @@
use std::thread;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::Indexes;
@@ -15,22 +17,40 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
// Sequential: time → lookback (dependency chain)
self.time
.timestamp
.compute(indexer, indexes, starting_indexes, exit)?;
self.lookback
.compute(&self.time, starting_indexes, exit)?;
self.count
.compute(indexer, starting_indexes, exit)?;
self.interval
.compute(indexer, starting_indexes, exit)?;
self.size
.compute(indexer, &self.lookback, starting_indexes, exit)?;
self.weight
.compute(indexer, starting_indexes, exit)?;
self.difficulty
.compute(indexer, indexes, starting_indexes, exit)?;
self.halving.compute(indexes, starting_indexes, exit)?;
// Parallel: remaining sub-modules are independent of each other.
// size depends on lookback (already computed above).
let Vecs {
lookback,
count,
interval,
size,
weight,
difficulty,
halving,
..
} = self;
thread::scope(|s| -> Result<()> {
let r1 = s.spawn(|| count.compute(indexer, starting_indexes, exit));
let r2 = s.spawn(|| interval.compute(indexer, starting_indexes, exit));
let r3 = s.spawn(|| weight.compute(indexer, starting_indexes, exit));
let r4 =
s.spawn(|| difficulty.compute(indexer, indexes, starting_indexes, exit));
let r5 = s.spawn(|| halving.compute(indexes, starting_indexes, exit));
size.compute(indexer, &*lookback, starting_indexes, exit)?;
r1.join().unwrap()?;
r2.join().unwrap()?;
r3.join().unwrap()?;
r4.join().unwrap()?;
r5.join().unwrap()?;
Ok(())
})?;
let _lock = exit.lock();
self.db.compact()?;

View File

@@ -32,7 +32,7 @@ pub(super) struct CostBasisNode {
}
impl CostBasisNode {
#[inline]
#[inline(always)]
fn new(sats: i64, usd: i128, is_sth: bool) -> Self {
Self {
all_sats: sats,
@@ -237,7 +237,7 @@ impl CostBasisFenwick {
let mut sat_buckets = [0usize; PERCENTILES_LEN + 2];
self.tree
.batch_kth(&sat_targets, &sat_field, &mut sat_buckets);
.kth(&sat_targets, &sat_field, &mut sat_buckets);
result.min_price = bucket_to_cents(sat_buckets[0]);
(0..PERCENTILES_LEN).for_each(|i| {
@@ -254,7 +254,7 @@ impl CostBasisFenwick {
let mut usd_buckets = [0usize; PERCENTILES_LEN];
self.tree
.batch_kth(&usd_targets, &usd_field, &mut usd_buckets);
.kth(&usd_targets, &usd_field, &mut usd_buckets);
(0..PERCENTILES_LEN).for_each(|i| {
result.usd_prices[i] = bucket_to_cents(usd_buckets[i]);
@@ -310,16 +310,16 @@ impl CostBasisFenwick {
}
// -----------------------------------------------------------------------
// Profitability queries (all cohort only)
// Profitability queries
// -----------------------------------------------------------------------
/// Compute profitability range buckets from current spot price.
/// Returns 25 ranges: (sats, usd_raw) per range.
/// Returns 25 ranges with all/sth splits.
pub(super) fn profitability(
&self,
spot_price: Cents,
) -> [(u64, u128); PROFITABILITY_RANGE_COUNT] {
let mut result = [(0u64, 0u128); PROFITABILITY_RANGE_COUNT];
) -> [ProfitabilityRangeResult; PROFITABILITY_RANGE_COUNT] {
let mut result = [ProfitabilityRangeResult::ZERO; PROFITABILITY_RANGE_COUNT];
if self.totals.all_sats <= 0 {
return result;
@@ -327,34 +327,54 @@ impl CostBasisFenwick {
let boundaries = compute_profitability_boundaries(spot_price);
let mut prev_sats: i64 = 0;
let mut prev_usd: i128 = 0;
let mut prev = CostBasisNode::default();
for (i, &boundary) in boundaries.iter().enumerate() {
let boundary_bucket = cents_to_bucket(boundary);
// prefix_sum through the bucket BEFORE the boundary
let cum = if boundary_bucket > 0 {
self.tree.prefix_sum(boundary_bucket - 1)
} else {
CostBasisNode::default()
};
let range_sats = cum.all_sats - prev_sats;
let range_usd = cum.all_usd - prev_usd;
result[i] = (range_sats.max(0) as u64, range_usd.max(0) as u128);
prev_sats = cum.all_sats;
prev_usd = cum.all_usd;
result[i] = ProfitabilityRangeResult {
all_sats: (cum.all_sats - prev.all_sats).max(0) as u64,
all_usd: (cum.all_usd - prev.all_usd).max(0) as u128,
sth_sats: (cum.sth_sats - prev.sth_sats).max(0) as u64,
sth_usd: (cum.sth_usd - prev.sth_usd).max(0) as u128,
};
prev = cum;
}
// Last range: everything >= last boundary
let remaining_sats = self.totals.all_sats - prev_sats;
let remaining_usd = self.totals.all_usd - prev_usd;
result[PROFITABILITY_RANGE_COUNT - 1] =
(remaining_sats.max(0) as u64, remaining_usd.max(0) as u128);
result[PROFITABILITY_RANGE_COUNT - 1] = ProfitabilityRangeResult {
all_sats: (self.totals.all_sats - prev.all_sats).max(0) as u64,
all_usd: (self.totals.all_usd - prev.all_usd).max(0) as u128,
sth_sats: (self.totals.sth_sats - prev.sth_sats).max(0) as u64,
sth_usd: (self.totals.sth_usd - prev.sth_usd).max(0) as u128,
};
result
}
}
/// Per-range profitability result with all/sth split.
///
/// One entry per profitability range bucket. `usd` fields hold the raw
/// (cents × sats) accumulator, not dollars — callers convert downstream.
#[derive(Clone, Copy)]
pub(super) struct ProfitabilityRangeResult {
    // Supply (sats) across all holders whose cost basis falls in this range.
    pub all_sats: u64,
    // Raw USD accumulator (cents × sats) for all holders in this range.
    pub all_usd: u128,
    // `sth` subset (presumably short-term holders — matches `is_sth` flag
    // used elsewhere in this module; TODO confirm) of `all_sats`.
    pub sth_sats: u64,
    // `sth` subset of `all_usd`.
    pub sth_usd: u128,
}
impl ProfitabilityRangeResult {
    /// All-zero result; used to initialize the fixed-size output array
    /// before ranges are filled in.
    const ZERO: Self = Self {
        all_sats: 0,
        all_usd: 0,
        sth_sats: 0,
        sth_usd: 0,
    };
}
/// Result of a percentile computation for one cohort.
#[derive(Default)]
pub(super) struct PercentileResult {

View File

@@ -25,7 +25,7 @@ use crate::{
state::UTXOCohortState,
},
indexes,
internal::{AmountPerBlock, CachedWindowStarts},
internal::{AmountPerBlockCumulativeWithSums, CachedWindowStarts},
prices,
};
@@ -50,7 +50,7 @@ pub struct UTXOCohorts<M: StorageMode = Rw> {
#[traversable(rename = "type")]
pub type_: SpendableType<UTXOCohortVecs<TypeCohortMetrics<M>>>,
pub profitability: ProfitabilityMetrics<M>,
pub matured: AgeRange<AmountPerBlock<M>>,
pub matured: AgeRange<AmountPerBlockCumulativeWithSums<M>>,
#[traversable(skip)]
pub(super) fenwick: CostBasisFenwick,
/// Cached partition_point positions for tick_tock boundary searches.
@@ -178,7 +178,7 @@ impl UTXOCohorts<Rw> {
);
// Phase 3b: Import profitability metrics (derived from "all" during k-way merge).
let profitability = ProfitabilityMetrics::forced_import(db, v, indexes)?;
let profitability = ProfitabilityMetrics::forced_import(db, v, indexes, cached_starts)?;
// Phase 4: Import aggregate cohorts.
@@ -256,10 +256,17 @@ impl UTXOCohorts<Rw> {
let under_amount = UnderAmount::try_new(&minimal_no_state)?;
let over_amount = OverAmount::try_new(&minimal_no_state)?;
let prefix = CohortContext::Utxo.prefix();
let matured = AgeRange::try_new(&|_f: Filter,
name: &'static str|
-> Result<AmountPerBlock> {
AmountPerBlock::forced_import(db, &format!("utxo_{name}_matured"), v, indexes)
-> Result<AmountPerBlockCumulativeWithSums> {
AmountPerBlockCumulativeWithSums::forced_import(
db,
&format!("{prefix}_{name}_matured_supply"),
v,
indexes,
cached_starts,
)
})?;
Ok(Self {
@@ -338,7 +345,7 @@ impl UTXOCohorts<Rw> {
matured: &AgeRange<Sats>,
) -> Result<()> {
for (v, &sats) in self.matured.iter_mut().zip(matured.iter()) {
v.sats.height.truncate_push(height, sats)?;
v.base.sats.height.truncate_push(height, sats)?;
}
Ok(())
}
@@ -509,10 +516,13 @@ impl UTXOCohorts<Rw> {
.try_for_each(|v| v.compute_rest_part1(prices, starting_indexes, exit))?;
}
// Compute matured cents from sats × price
// Compute matured cumulative + cents from sats × price
self.matured
.par_iter_mut()
.try_for_each(|v| v.compute(prices, starting_indexes.height, exit))?;
.try_for_each(|v| v.compute_rest(starting_indexes.height, prices, exit))?;
// Compute profitability supply cents and realized price
self.profitability.compute(prices, starting_indexes, exit)?;
Ok(())
}
@@ -709,8 +719,10 @@ impl UTXOCohorts<Rw> {
}
vecs.extend(self.profitability.collect_all_vecs_mut());
for v in self.matured.iter_mut() {
vecs.push(&mut v.sats.height);
vecs.push(&mut v.cents.height);
vecs.push(&mut v.base.sats.height);
vecs.push(&mut v.base.cents.height);
vecs.push(&mut v.cumulative.sats.height);
vecs.push(&mut v.cumulative.cents.height);
}
vecs.into_par_iter()
}
@@ -727,7 +739,7 @@ impl UTXOCohorts<Rw> {
.chain(
self.matured
.iter()
.map(|v| Height::from(v.min_stateful_len())),
.map(|v| Height::from(v.base.min_stateful_len())),
)
.min()
.unwrap_or_default()

View File

@@ -6,7 +6,7 @@ use brk_types::{BasisPoints16, Cents, CentsCompact, CostBasisDistribution, Date,
use crate::distribution::metrics::{CostBasis, ProfitabilityMetrics};
use super::fenwick::PercentileResult;
use super::fenwick::{PercentileResult, ProfitabilityRangeResult};
use super::groups::UTXOCohorts;
use super::COST_BASIS_PRICE_DIGITS;
@@ -104,7 +104,7 @@ fn push_cost_basis(
}
/// Convert raw (cents × sats) accumulator to Dollars (÷ 100 for cents→dollars, ÷ 1e8 for sats).
#[inline]
#[inline(always)]
fn raw_usd_to_dollars(raw: u128) -> Dollars {
Dollars::from(raw as f64 / 1e10)
}
@@ -112,25 +112,41 @@ fn raw_usd_to_dollars(raw: u128) -> Dollars {
/// Push profitability range + profit/loss aggregate values to vecs.
fn push_profitability(
height: Height,
buckets: &[(u64, u128); PROFITABILITY_RANGE_COUNT],
buckets: &[ProfitabilityRangeResult; PROFITABILITY_RANGE_COUNT],
metrics: &mut ProfitabilityMetrics,
) -> Result<()> {
// Truncate all buckets once upfront to avoid per-push checks
metrics.truncate(height)?;
// Push 25 range buckets
for (i, bucket) in metrics.range.as_array_mut().into_iter().enumerate() {
let (sats, usd_raw) = buckets[i];
bucket.truncate_push(height, Sats::from(sats), raw_usd_to_dollars(usd_raw))?;
let r = &buckets[i];
bucket.push(
Sats::from(r.all_sats),
Sats::from(r.sth_sats),
raw_usd_to_dollars(r.all_usd),
raw_usd_to_dollars(r.sth_usd),
);
}
// Profit: forward cumulative sum over ranges[0..15], pushed in reverse.
// profit[0] (breakeven) = sum(0..=13), ..., profit[13] (_500pct) = ranges[0]
let profit_arr = metrics.profit.as_array_mut();
let mut cum_sats = 0u64;
let mut cum_sth_sats = 0u64;
let mut cum_usd = 0u128;
let mut cum_sth_usd = 0u128;
for i in 0..PROFIT_COUNT {
cum_sats += buckets[i].0;
cum_usd += buckets[i].1;
profit_arr[PROFIT_COUNT - 1 - i]
.truncate_push(height, Sats::from(cum_sats), raw_usd_to_dollars(cum_usd))?;
cum_sats += buckets[i].all_sats;
cum_sth_sats += buckets[i].sth_sats;
cum_usd += buckets[i].all_usd;
cum_sth_usd += buckets[i].sth_usd;
profit_arr[PROFIT_COUNT - 1 - i].push(
Sats::from(cum_sats),
Sats::from(cum_sth_sats),
raw_usd_to_dollars(cum_usd),
raw_usd_to_dollars(cum_sth_usd),
);
}
// Loss: backward cumulative sum over ranges[15..25], pushed in reverse.
@@ -138,12 +154,21 @@ fn push_profitability(
let loss_arr = metrics.loss.as_array_mut();
let loss_count = loss_arr.len();
cum_sats = 0;
cum_sth_sats = 0;
cum_usd = 0;
cum_sth_usd = 0;
for i in 0..loss_count {
cum_sats += buckets[PROFITABILITY_RANGE_COUNT - 1 - i].0;
cum_usd += buckets[PROFITABILITY_RANGE_COUNT - 1 - i].1;
loss_arr[loss_count - 1 - i]
.truncate_push(height, Sats::from(cum_sats), raw_usd_to_dollars(cum_usd))?;
let r = &buckets[PROFITABILITY_RANGE_COUNT - 1 - i];
cum_sats += r.all_sats;
cum_sth_sats += r.sth_sats;
cum_usd += r.all_usd;
cum_sth_usd += r.sth_usd;
loss_arr[loss_count - 1 - i].push(
Sats::from(cum_sats),
Sats::from(cum_sth_sats),
raw_usd_to_dollars(cum_usd),
raw_usd_to_dollars(cum_sth_usd),
);
}
Ok(())

View File

@@ -16,8 +16,8 @@ impl UTXOCohorts<Rw> {
/// Since timestamps are monotonic, positions only advance forward.
/// Complexity: O(k * c) where k = 20 boundaries, c = ~1 (forward scan steps).
///
/// Returns how many sats matured INTO each cohort from the younger adjacent one.
/// `under_1h` is always zero since nothing ages into the youngest cohort.
/// Returns how many sats matured OUT OF each cohort into the older adjacent one.
/// `over_15y` is always zero since nothing ages out of the oldest cohort.
pub(crate) fn tick_tock_next_block(
&mut self,
chain_state: &[BlockState],
@@ -92,7 +92,7 @@ impl UTXOCohorts<Rw> {
if let Some(state) = age_cohorts[boundary_idx + 1].as_mut() {
state.increment_snapshot(&snapshot);
}
matured[boundary_idx + 1] += block_state.supply.value;
matured[boundary_idx] += block_state.supply.value;
}
}

View File

@@ -1,21 +1,43 @@
use brk_cohort::{Loss, Profit, ProfitabilityRange};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Dollars, Height, Sats, Version};
use vecdb::{AnyStoredVec, AnyVec, Database, Rw, StorageMode, WritableVec};
use brk_types::{
BasisPoints32, BasisPointsSigned32, Cents, Dollars, Height, Indexes, Sats, StoredF32, Version,
};
use vecdb::{AnyStoredVec, AnyVec, Database, Exit, Rw, StorageMode, WritableVec};
use crate::{indexes, internal::PerBlock};
use crate::{
indexes,
internal::{
AmountPerBlock, AmountPerBlockWithDeltas, CachedWindowStarts, Identity, LazyPerBlock,
PerBlock, PriceWithRatioPerBlock, RatioPerBlock,
},
prices,
};
/// Generic pair of a metric over all entries (`all`) and its `sth`
/// (presumably short-term-holder — TODO confirm) counterpart.
///
/// `Sth` defaults to the same type as `All`, so `WithSth<T>` is the
/// common case where both sides share one representation.
#[derive(Traversable)]
pub struct WithSth<All, Sth = All> {
    // Metric covering the full population.
    pub all: All,
    // The `sth` subset of the same metric.
    pub sth: Sth,
}
/// Supply + realized cap for a single profitability bucket.
#[derive(Traversable)]
pub struct ProfitabilityBucket<M: StorageMode = Rw> {
pub supply: PerBlock<Sats, M>,
pub realized_cap: PerBlock<Dollars, M>,
pub supply: WithSth<AmountPerBlockWithDeltas<M>, AmountPerBlock<M>>,
pub realized_cap: WithSth<PerBlock<Dollars, M>>,
pub realized_price: PriceWithRatioPerBlock<M>,
pub mvrv: LazyPerBlock<StoredF32>,
pub nupl: RatioPerBlock<BasisPointsSigned32, M>,
}
impl<M: StorageMode> ProfitabilityBucket<M> {
fn min_len(&self) -> usize {
self.supply.height.len().min(self.realized_cap.height.len())
self.supply
.all
.sats
.height
.len()
.min(self.realized_cap.all.height.len())
}
}
@@ -25,45 +47,154 @@ impl ProfitabilityBucket {
name: &str,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
let realized_price = PriceWithRatioPerBlock::forced_import(
db,
&format!("{name}_realized_price"),
version,
indexes,
)?;
let mvrv = LazyPerBlock::from_lazy::<Identity<StoredF32>, BasisPoints32>(
&format!("{name}_mvrv"),
version,
&realized_price.ratio,
);
Ok(Self {
supply: PerBlock::forced_import(
supply: WithSth {
all: AmountPerBlockWithDeltas::forced_import(
db,
&format!("{name}_supply"),
version,
indexes,
cached_starts,
)?,
sth: AmountPerBlock::forced_import(
db,
&format!("{name}_sth_supply"),
version,
indexes,
)?,
},
realized_cap: WithSth {
all: PerBlock::forced_import(
db,
&format!("{name}_realized_cap"),
version,
indexes,
)?,
sth: PerBlock::forced_import(
db,
&format!("{name}_sth_realized_cap"),
version,
indexes,
)?,
},
realized_price,
mvrv,
nupl: RatioPerBlock::forced_import_raw(
db,
&format!("{name}_supply"),
version,
indexes,
)?,
realized_cap: PerBlock::forced_import(
db,
&format!("{name}_realized_cap"),
version,
&format!("{name}_nupl"),
version + Version::ONE,
indexes,
)?,
})
}
pub(crate) fn truncate_push(
#[inline(always)]
/// Truncate the four directly-pushed vecs (all/sth supply sats and
/// all/sth realized cap) back to `height` if they extend past it.
/// Derived vecs (realized price, mvrv, nupl) are recomputed elsewhere
/// and are not touched here.
pub(crate) fn truncate(&mut self, height: Height) -> Result<()> {
    self.supply.all.sats.height.truncate_if_needed(height)?;
    self.supply.sth.sats.height.truncate_if_needed(height)?;
    self.realized_cap.all.height.truncate_if_needed(height)?;
    self.realized_cap.sth.height.truncate_if_needed(height)?;
    Ok(())
}
#[inline(always)]
pub(crate) fn push(
&mut self,
height: Height,
supply: Sats,
sth_supply: Sats,
realized_cap: Dollars,
sth_realized_cap: Dollars,
) {
self.supply.all.sats.height.push(supply);
self.supply.sth.sats.height.push(sth_supply);
self.realized_cap.all.height.push(realized_cap);
self.realized_cap.sth.height.push(sth_realized_cap);
}
pub(crate) fn compute(
&mut self,
prices: &prices::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.supply.height.truncate_push(height, supply)?;
self.realized_cap
.height
.truncate_push(height, realized_cap)?;
let max_from = starting_indexes.height;
self.supply.all.compute(prices, max_from, exit)?;
self.supply.sth.compute(prices, max_from, exit)?;
// Realized price cents = realized_cap_cents × ONE_BTC / supply_sats
self.realized_price.cents.height.compute_transform2(
max_from,
&self.realized_cap.all.height,
&self.supply.all.sats.height,
|(i, cap_dollars, supply_sats, ..)| {
let cap_cents = Cents::from(cap_dollars).as_u128();
let supply = supply_sats.as_u128();
if supply == 0 {
(i, Cents::ZERO)
} else {
(i, Cents::from(cap_cents * Sats::ONE_BTC_U128 / supply))
}
},
exit,
)?;
// Ratio (spot / realized_price) → feeds MVRV lazily
self.realized_price
.compute_ratio(starting_indexes, &prices.spot.cents.height, exit)?;
// NUPL = (spot - realized_price) / spot
self.nupl.bps.height.compute_transform2(
max_from,
&prices.spot.cents.height,
&self.realized_price.cents.height,
|(i, spot, realized, ..)| {
let p = spot.as_u128();
if p == 0 {
(i, BasisPointsSigned32::ZERO)
} else {
let rp = realized.as_u128();
let bps = ((p as i128 - rp as i128) * 10000) / p as i128;
(i, BasisPointsSigned32::from(bps as i32))
}
},
exit,
)?;
Ok(())
}
pub(crate) fn collect_all_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
vec![
&mut self.supply.height as &mut dyn AnyStoredVec,
&mut self.realized_cap.height as &mut dyn AnyStoredVec,
&mut self.supply.all.inner.sats.height as &mut dyn AnyStoredVec,
&mut self.supply.all.inner.cents.height as &mut dyn AnyStoredVec,
&mut self.supply.sth.sats.height as &mut dyn AnyStoredVec,
&mut self.supply.sth.cents.height as &mut dyn AnyStoredVec,
&mut self.realized_cap.all.height as &mut dyn AnyStoredVec,
&mut self.realized_cap.sth.height as &mut dyn AnyStoredVec,
&mut self.realized_price.cents.height as &mut dyn AnyStoredVec,
&mut self.realized_price.bps.height as &mut dyn AnyStoredVec,
&mut self.nupl.bps.height as &mut dyn AnyStoredVec,
]
}
}
/// All profitability metrics: 25 ranges + 15 profit thresholds + 10 loss thresholds.
/// All profitability metrics: 25 ranges + 14 profit thresholds + 9 loss thresholds.
#[derive(Traversable)]
pub struct ProfitabilityMetrics<M: StorageMode = Rw> {
pub range: ProfitabilityRange<ProfitabilityBucket<M>>,
@@ -72,32 +203,46 @@ pub struct ProfitabilityMetrics<M: StorageMode = Rw> {
}
impl<M: StorageMode> ProfitabilityMetrics<M> {
pub(crate) fn min_stateful_len(&self) -> usize {
self.range.iter()
pub fn iter(&self) -> impl Iterator<Item = &ProfitabilityBucket<M>> {
self.range
.iter()
.chain(self.profit.iter())
.chain(self.loss.iter())
.map(|b| b.min_len())
.min()
.unwrap_or(0)
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut ProfitabilityBucket<M>> {
self.range
.iter_mut()
.chain(self.profit.iter_mut())
.chain(self.loss.iter_mut())
}
pub(crate) fn min_stateful_len(&self) -> usize {
self.iter().map(|b| b.min_len()).min().unwrap_or(0)
}
}
impl ProfitabilityMetrics {
/// Truncate every bucket (range + profit + loss, via `iter_mut`) back
/// to `height`, stopping at the first error.
pub(crate) fn truncate(&mut self, height: Height) -> Result<()> {
    self.iter_mut().try_for_each(|b| b.truncate(height))
}
pub(crate) fn forced_import(
db: &Database,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
let range = ProfitabilityRange::try_new(|name| {
ProfitabilityBucket::forced_import(db, name, version, indexes)
ProfitabilityBucket::forced_import(db, name, version, indexes, cached_starts)
})?;
let profit = Profit::try_new(|name| {
ProfitabilityBucket::forced_import(db, name, version, indexes)
ProfitabilityBucket::forced_import(db, name, version, indexes, cached_starts)
})?;
let loss = Loss::try_new(|name| {
ProfitabilityBucket::forced_import(db, name, version, indexes)
ProfitabilityBucket::forced_import(db, name, version, indexes, cached_starts)
})?;
Ok(Self {
@@ -107,18 +252,21 @@ impl ProfitabilityMetrics {
})
}
/// Run `compute` on every bucket (range + profit + loss, via
/// `iter_mut`), propagating the first error encountered.
pub(crate) fn compute(
    &mut self,
    prices: &prices::Vecs,
    starting_indexes: &Indexes,
    exit: &Exit,
) -> Result<()> {
    self.iter_mut()
        .try_for_each(|b| b.compute(prices, starting_indexes, exit))
}
pub(crate) fn collect_all_vecs_mut(&mut self) -> Vec<&mut dyn AnyStoredVec> {
let mut vecs = Vec::new();
for bucket in self.range.iter_mut() {
vecs.extend(bucket.collect_all_vecs_mut());
}
for bucket in self.profit.iter_mut() {
vecs.extend(bucket.collect_all_vecs_mut());
}
for bucket in self.loss.iter_mut() {
for bucket in self.iter_mut() {
vecs.extend(bucket.collect_all_vecs_mut());
}
vecs
}
}

View File

@@ -1,12 +1,12 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Cents, Height, Indexes, Version};
use brk_types::{Cents, Dollars, Height, Indexes, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{AnyStoredVec, AnyVec, Exit, Rw, StorageMode, WritableVec};
use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableCloneableVec, Rw, StorageMode, WritableVec};
use crate::{
distribution::{metrics::ImportConfig, state::UnrealizedState},
internal::FiatPerBlockCumulativeWithSums,
internal::{FiatPerBlockCumulativeWithSums, LazyPerBlock, NegCentsUnsignedToDollars},
};
use super::UnrealizedMinimal;
@@ -19,16 +19,28 @@ pub struct UnrealizedBasic<M: StorageMode = Rw> {
pub minimal: UnrealizedMinimal<M>,
pub profit: FiatPerBlockCumulativeWithSums<Cents, M>,
pub loss: FiatPerBlockCumulativeWithSums<Cents, M>,
#[traversable(wrap = "loss", rename = "negative")]
pub neg_loss: LazyPerBlock<Dollars, Cents>,
}
impl UnrealizedBasic {
pub(crate) fn forced_import(cfg: &ImportConfig) -> Result<Self> {
let v1 = Version::ONE;
let loss: FiatPerBlockCumulativeWithSums<Cents> = cfg.import("unrealized_loss", v1)?;
let neg_loss = LazyPerBlock::from_computed::<NegCentsUnsignedToDollars>(
&cfg.name("neg_unrealized_loss"),
cfg.version,
loss.base.cents.height.read_only_boxed_clone(),
&loss.base.cents,
);
Ok(Self {
minimal: UnrealizedMinimal::forced_import(cfg)?,
profit: cfg.import("unrealized_profit", v1)?,
loss: cfg.import("unrealized_loss", v1)?,
loss,
neg_loss,
})
}

View File

@@ -2,18 +2,16 @@ use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Cents, CentsSigned, Height, Indexes, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{AnyStoredVec, Exit, ReadableCloneableVec, Rw, StorageMode};
use vecdb::{AnyStoredVec, Exit, Rw, StorageMode};
use crate::{
distribution::{
metrics::ImportConfig,
state::UnrealizedState,
},
internal::{CentsSubtractToCentsSigned, FiatPerBlock, LazyPerBlock, NegCentsUnsignedToDollars},
internal::{CentsSubtractToCentsSigned, FiatPerBlock},
};
use brk_types::Dollars;
use super::UnrealizedBasic;
#[derive(Deref, DerefMut, Traversable)]
@@ -23,27 +21,16 @@ pub struct UnrealizedCore<M: StorageMode = Rw> {
#[traversable(flatten)]
pub basic: UnrealizedBasic<M>,
#[traversable(wrap = "loss", rename = "negative")]
pub neg_loss: LazyPerBlock<Dollars, Cents>,
pub net_pnl: FiatPerBlock<CentsSigned, M>,
}
impl UnrealizedCore {
pub(crate) fn forced_import(cfg: &ImportConfig) -> Result<Self> {
let basic = UnrealizedBasic::forced_import(cfg)?;
let neg_unrealized_loss = LazyPerBlock::from_computed::<NegCentsUnsignedToDollars>(
&cfg.name("neg_unrealized_loss"),
cfg.version,
basic.loss.base.cents.height.read_only_boxed_clone(),
&basic.loss.base.cents,
);
let net_unrealized_pnl = cfg.import("net_unrealized_pnl", Version::ZERO)?;
Ok(Self {
basic,
neg_loss: neg_unrealized_loss,
net_pnl: net_unrealized_pnl,
})
}

View File

@@ -1,6 +1,6 @@
use brk_error::Result;
use brk_types::{Bitcoin, Dollars, Indexes, Sats, StoredF32};
use vecdb::{Exit, ReadableVec};
use brk_types::{Bitcoin, Dollars, Indexes, StoredF32};
use vecdb::Exit;
use super::{gini, Vecs};
use crate::{distribution, internal::RatioDollarsBp32, market, mining, transactions};
@@ -180,14 +180,12 @@ impl Vecs {
// Seller Exhaustion Constant: % supply_in_profit × 30d_volatility
self.seller_exhaustion_constant
.height
.compute_transform2(
.compute_transform3(
starting_indexes.height,
&all_metrics.supply.in_profit.sats.height,
&market.volatility._1m.height,
|(i, profit_sats, volatility, ..)| {
let total_sats: Sats = supply_total_sats
.collect_one(i)
.unwrap_or_default();
supply_total_sats,
|(i, profit_sats, volatility, total_sats, ..)| {
let total = total_sats.as_u128() as f64;
if total == 0.0 {
(i, StoredF32::from(0.0f32))

View File

@@ -1,6 +1,6 @@
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::{Indexes, Sats, TxInIndex, TxIndex, TxOutIndex, Vout};
use brk_types::{Indexes, Sats, TxIndex, TxOutIndex, Vout};
use tracing::info;
use vecdb::{AnyStoredVec, AnyVec, Database, Exit, ReadableVec, VecIndex, WritableVec};
@@ -98,11 +98,11 @@ impl Vecs {
out_value[entry.original_idx] = entry.value;
}
self.txout_index.truncate_if_needed_at(batch_start)?;
self.value.truncate_if_needed_at(batch_start)?;
for i in 0..batch_len {
let txin_index = TxInIndex::from(batch_start + i);
self.txout_index
.truncate_push(txin_index, out_txout_index[i])?;
self.value.truncate_push(txin_index, out_value[i])?;
self.txout_index.push(out_txout_index[i]);
self.value.push(out_value[i]);
}
if batch_end < target {

View File

@@ -71,18 +71,23 @@ impl ExpandingPercentiles {
self.tree.add(Self::to_bucket(value), &1);
}
/// Compute 6 percentiles in one call. O(6 × log N).
/// Quantiles q must be in (0, 1). Output is in BPS.
/// Compute 6 percentiles in one call via kth. O(6 × log N) but with
/// shared tree traversal across all 6 targets for better cache locality.
/// Quantiles q must be sorted ascending in (0, 1). Output is in BPS.
pub fn quantiles(&self, qs: &[f64; 6], out: &mut [u32; 6]) {
if self.count == 0 {
out.iter_mut().for_each(|o| *o = 0);
return;
}
let mut targets = [0u32; 6];
for (i, &q) in qs.iter().enumerate() {
let k = ((q * self.count as f64).ceil() as u32).clamp(1, self.count);
// kth with 0-indexed k: k-1; result is 0-indexed bucket
let bucket = self.tree.kth(k - 1, |n| *n);
out[i] = bucket as u32 * BUCKET_BPS as u32;
targets[i] = k - 1; // 0-indexed
}
let mut buckets = [0usize; 6];
self.tree.kth(&targets, &|n: &u32| *n, &mut buckets);
for (i, bucket) in buckets.iter().enumerate() {
out[i] = *bucket as u32 * BUCKET_BPS as u32;
}
}
}

View File

@@ -54,42 +54,15 @@ impl<N: FenwickNode> FenwickTree<N> {
result
}
/// Find the 0-indexed bucket containing the k-th element (0-indexed k).
/// Find the 0-indexed bucket containing the k-th element for each target.
///
/// `field_fn` extracts the relevant count field from a node.
/// The value type `V` must support comparison and subtraction
/// (works with `u32`, `i64`, `i128`).
#[inline]
pub fn kth<V, F>(&self, k: V, field_fn: F) -> usize
where
V: Copy + PartialOrd + std::ops::SubAssign,
F: Fn(&N) -> V,
{
debug_assert!(self.size > 0);
let mut pos = 0usize;
let mut remaining = k;
let mut bit = 1usize << (usize::BITS - 1 - self.size.leading_zeros());
while bit > 0 {
let next = pos + bit;
if next <= self.size {
let val = field_fn(&self.tree[next]);
if remaining >= val {
remaining -= val;
pos = next;
}
}
bit >>= 1;
}
pos // 0-indexed bucket
}
/// Batch kth for sorted targets. Processes all targets at each tree level
/// for better cache locality vs individual kth() calls.
/// `sorted_targets` must be sorted ascending. `out` receives the 0-indexed
/// bucket for each target. Both slices must have the same length.
///
/// `sorted_targets` must be sorted ascending. `out` receives the 0-indexed bucket
/// for each target. Both slices must have the same length.
/// Processes all targets at each tree level for better cache locality.
#[inline]
pub fn batch_kth<V, F>(&self, sorted_targets: &[V], field_fn: &F, out: &mut [usize])
pub fn kth<V, F>(&self, sorted_targets: &[V], field_fn: &F, out: &mut [usize])
where
V: Copy + PartialOrd + std::ops::SubAssign,
F: Fn(&N) -> V,
@@ -162,18 +135,14 @@ mod tests {
tree.add(3, &5);
tree.add(4, &1);
// kth(0) = first element → bucket 0
assert_eq!(tree.kth(0u32, |n| *n), 0);
// kth(2) = 3rd element → bucket 0 (last of bucket 0)
assert_eq!(tree.kth(2u32, |n| *n), 0);
// kth(3) = 4th element → bucket 1
assert_eq!(tree.kth(3u32, |n| *n), 1);
// kth(4) = 5th element → bucket 1
assert_eq!(tree.kth(4u32, |n| *n), 1);
// kth(5) = 6th element → bucket 3 (bucket 2 is empty)
assert_eq!(tree.kth(5u32, |n| *n), 3);
// kth(10) = 11th element → bucket 4
assert_eq!(tree.kth(10u32, |n| *n), 4);
let mut out = [0usize; 6];
tree.kth(&[0u32, 2, 3, 4, 5, 10], &|n: &u32| *n, &mut out);
assert_eq!(out[0], 0); // kth(0) → bucket 0
assert_eq!(out[1], 0); // kth(2) → bucket 0 (last of bucket 0)
assert_eq!(out[2], 1); // kth(3) → bucket 1
assert_eq!(out[3], 1); // kth(4) → bucket 1
assert_eq!(out[4], 3); // kth(5) → bucket 3 (bucket 2 is empty)
assert_eq!(out[5], 4); // kth(10) → bucket 4
}
#[test]

View File

@@ -8,6 +8,11 @@ use super::sliding_window::SlidingWindowSorted;
/// Compute all 8 rolling distribution stats (avg, min, max, p10, p25, median, p75, p90)
/// in a single sorted-vec pass per window.
///
/// When computing multiple windows from the same source, pass the same
/// `&mut Option<(usize, Vec<f64>)>` cache to each call — the first call reads
/// and caches, subsequent calls reuse if their range is covered.
/// Process the largest window first (1y) so its cache covers all smaller windows.
#[allow(clippy::too_many_arguments)]
pub fn compute_rolling_distribution_from_starts<I, T, A>(
max_from: I,
@@ -22,6 +27,7 @@ pub fn compute_rolling_distribution_from_starts<I, T, A>(
p75_out: &mut EagerVec<PcoVec<I, T>>,
p90_out: &mut EagerVec<PcoVec<I, T>>,
exit: &Exit,
values_cache: &mut Option<(usize, Vec<f64>)>,
) -> Result<()>
where
I: VecIndex,
@@ -68,8 +74,21 @@ where
} else {
0
};
let mut partial_values: Vec<f64> = Vec::with_capacity(end - range_start);
values.for_each_range_at(range_start, end, |a: A| partial_values.push(f64::from(a)));
// Reuse cached values if the cache covers our range, otherwise read and cache.
let need_read = match values_cache.as_ref() {
Some((cached_start, cached)) => {
range_start < *cached_start || end > *cached_start + cached.len()
}
None => true,
};
if need_read {
let mut v = Vec::with_capacity(end - range_start);
values.for_each_range_at(range_start, end, |a: A| v.push(f64::from(a)));
*values_cache = Some((range_start, v));
}
let (cached_start, cached) = values_cache.as_ref().unwrap();
let partial_values = &cached[(range_start - cached_start)..(end - cached_start)];
let capacity = if skip > 0 && skip < end {
let first_start = window_starts.collect_one_at(skip).unwrap().to_usize();
@@ -83,7 +102,7 @@ where
let mut window = SlidingWindowSorted::with_capacity(capacity);
if skip > 0 {
window.reconstruct(&partial_values, range_start, skip);
window.reconstruct(partial_values, range_start, skip);
}
let starts_batch = window_starts.collect_range_at(skip, end);
@@ -92,7 +111,7 @@ where
let i = skip + j;
let v = partial_values[i - range_start];
let start_usize = start.to_usize();
window.advance(v, start_usize, &partial_values, range_start);
window.advance(v, start_usize, partial_values, range_start);
if window.is_empty() {
let zero = T::from(0.0);

View File

@@ -26,10 +26,20 @@ impl<A> Windows<A> {
[&self._24h, &self._1w, &self._1m, &self._1y]
}
/// Largest window first (1y, 1m, 1w, 24h).
pub fn as_array_largest_first(&self) -> [&A; 4] {
[&self._1y, &self._1m, &self._1w, &self._24h]
}
pub fn as_mut_array(&mut self) -> [&mut A; 4] {
[&mut self._24h, &mut self._1w, &mut self._1m, &mut self._1y]
}
/// Largest window first (1y, 1m, 1w, 24h).
pub fn as_mut_array_largest_first(&mut self) -> [&mut A; 4] {
[&mut self._1y, &mut self._1m, &mut self._1w, &mut self._24h]
}
pub fn as_mut_array_from_1w(&mut self) -> [&mut A; 3] {
[&mut self._1w, &mut self._1m, &mut self._1y]
}

View File

@@ -1,7 +1,7 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Cents, Height, Sats, Version};
use vecdb::{Database, EagerVec, Exit, PcoVec, Rw, StorageMode};
use vecdb::{Database, Exit, Rw, StorageMode};
use crate::{
indexes,
@@ -37,17 +37,6 @@ impl AmountPerBlockCumulative {
})
}
pub(crate) fn compute_with(
&mut self,
max_from: Height,
prices: &prices::Vecs,
exit: &Exit,
compute_sats: impl FnOnce(&mut EagerVec<PcoVec<Height, Sats>>) -> Result<()>,
) -> Result<()> {
compute_sats(&mut self.base.sats.height)?;
self.compute(prices, max_from, exit)
}
pub(crate) fn compute(
&mut self,
prices: &prices::Vecs,

View File

@@ -6,6 +6,7 @@ mod lazy;
mod lazy_derived_resolutions;
mod lazy_rolling_sum;
mod rolling_distribution;
mod with_deltas;
pub use base::*;
pub use cumulative::*;
@@ -15,3 +16,4 @@ pub use lazy::*;
pub use lazy_derived_resolutions::*;
pub use lazy_rolling_sum::*;
pub use rolling_distribution::*;
pub use with_deltas::*;

View File

@@ -42,11 +42,13 @@ impl RollingDistributionSlot {
sats_source: &impl ReadableVec<Height, Sats>,
cents_source: &impl ReadableVec<Height, Cents>,
exit: &Exit,
sats_cache: &mut Option<(usize, Vec<f64>)>,
cents_cache: &mut Option<(usize, Vec<f64>)>,
) -> Result<()> {
let d = &mut self.distribution;
macro_rules! compute_unit {
($unit:ident, $source:expr) => {
($unit:ident, $source:expr, $cache:expr) => {
compute_rolling_distribution_from_starts(
max_from,
starts,
@@ -60,11 +62,12 @@ impl RollingDistributionSlot {
&mut d.pct75.$unit.height,
&mut d.pct90.$unit.height,
exit,
$cache,
)?
};
}
compute_unit!(sats, sats_source);
compute_unit!(cents, cents_source);
compute_unit!(sats, sats_source, sats_cache);
compute_unit!(cents, cents_source, cents_cache);
Ok(())
}
@@ -104,8 +107,23 @@ impl RollingDistributionAmountPerBlock {
cents_source: &impl ReadableVec<Height, Cents>,
exit: &Exit,
) -> Result<()> {
for (slot, starts) in self.0.as_mut_array().into_iter().zip(windows.as_array()) {
slot.compute(max_from, *starts, sats_source, cents_source, exit)?;
let mut sats_cache = None;
let mut cents_cache = None;
for (slot, starts) in self
.0
.as_mut_array_largest_first()
.into_iter()
.zip(windows.as_array_largest_first())
{
slot.compute(
max_from,
*starts,
sats_source,
cents_source,
exit,
&mut sats_cache,
&mut cents_cache,
)?;
}
Ok(())
}

View File

@@ -0,0 +1,41 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{BasisPointsSigned32, Sats, SatsSigned, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{Database, Rw, StorageMode};
use crate::{
indexes,
internal::{AmountPerBlock, CachedWindowStarts, LazyRollingDeltasFromHeight},
};
/// An `AmountPerBlock` extended with lazily computed rolling-window deltas
/// (absolute sats change and basis-points change) derived from the inner
/// per-height sats vec.
#[derive(Deref, DerefMut, Traversable)]
pub struct AmountPerBlockWithDeltas<M: StorageMode = Rw> {
// Deref/DerefMut forward to `inner`, so this type is a drop-in superset
// of `AmountPerBlock`; `flatten` merges its fields into traversal output.
#[deref]
#[deref_mut]
#[traversable(flatten)]
pub inner: AmountPerBlock<M>,
// Lazy rolling deltas keyed off `inner.sats.height`.
pub delta: LazyRollingDeltasFromHeight<Sats, SatsSigned, BasisPointsSigned32>,
}
impl AmountPerBlockWithDeltas {
/// Imports (or force-creates) the base `AmountPerBlock` vecs under `name`,
/// then wires up the lazy delta vecs as `"{name}_delta"`.
///
/// The delta vecs are versioned one above `version` so they invalidate
/// independently of the base vecs.
///
/// # Errors
/// Propagates any error from `AmountPerBlock::forced_import`.
pub(crate) fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
let inner = AmountPerBlock::forced_import(db, name, version, indexes)?;
let delta = LazyRollingDeltasFromHeight::new(
&format!("{name}_delta"),
version + Version::ONE,
&inner.sats.height,
cached_starts,
indexes,
);
Ok(Self { inner, delta })
}
}

View File

@@ -46,6 +46,7 @@ where
T: Copy + Ord + From<f64> + Default,
f64: From<T>,
{
let mut values_cache = None;
macro_rules! compute_window {
($w:ident) => {
compute_rolling_distribution_from_starts(
@@ -61,13 +62,15 @@ where
&mut self.0.pct75.$w.height,
&mut self.0.pct90.$w.height,
exit,
&mut values_cache,
)?
};
}
compute_window!(_24h);
compute_window!(_1w);
compute_window!(_1m);
// Largest window first: its cache covers all smaller windows.
compute_window!(_1y);
compute_window!(_1m);
compute_window!(_1w);
compute_window!(_24h);
Ok(())
}

View File

@@ -325,25 +325,33 @@ impl Computer {
.compute(indexer, &self.indexes, &starting_indexes, exit)
})?;
timed("Computed inputs", || {
self.inputs.compute(
indexer,
&self.indexes,
&self.blocks,
&starting_indexes,
exit,
)
})?;
timed("Computed scripts", || {
self.scripts.compute(
indexer,
&self.outputs,
&self.prices,
&starting_indexes,
exit,
)
})?;
// inputs and scripts are independent — parallelize
let (inputs_result, scripts_result) = rayon::join(
|| {
timed("Computed inputs", || {
self.inputs.compute(
indexer,
&self.indexes,
&self.blocks,
&starting_indexes,
exit,
)
})
},
|| {
timed("Computed scripts", || {
self.scripts.compute(
indexer,
&self.outputs,
&self.prices,
&starting_indexes,
exit,
)
})
},
);
inputs_result?;
scripts_result?;
timed("Computed outputs", || {
self.outputs.compute(

View File

@@ -18,62 +18,68 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.coinbase.compute(
starting_indexes.height,
prices,
exit,
|vec| {
// Cursors avoid per-height PcoVec page decompression for the
// tx-indexed lookups. Coinbase tx_index values are strictly
// increasing, so the cursors only advance forward.
let mut txout_cursor = indexer.vecs.transactions.first_txout_index.cursor();
let mut count_cursor = indexes.tx_index.output_count.cursor();
vec.compute_transform(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
|(height, tx_index, ..)| {
let ti = tx_index.to_usize();
txout_cursor.advance(ti - txout_cursor.position());
let first_txout_index = txout_cursor.next().unwrap().to_usize();
count_cursor.advance(ti - count_cursor.position());
let output_count: usize = count_cursor.next().unwrap().into();
let sats = indexer.vecs.outputs.value.fold_range_at(
first_txout_index,
first_txout_index + output_count,
Sats::ZERO,
|acc, v| acc + v,
);
(height, sats)
},
exit,
)?;
Ok(())
},
)?;
// Coinbase fee is 0, so including it in the sum doesn't affect the result
// coinbase and fees are independent — parallelize
let window_starts = lookback.window_starts();
self.fees.compute(
starting_indexes.height,
&window_starts,
prices,
exit,
|vec| {
vec.compute_sum_from_indexes(
let (r_coinbase, r_fees) = rayon::join(
|| {
self.coinbase.compute(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexes.height.tx_index_count,
&transactions_fees.fee.tx_index,
prices,
exit,
)?;
Ok(())
|vec| {
let mut txout_cursor =
indexer.vecs.transactions.first_txout_index.cursor();
let mut count_cursor = indexes.tx_index.output_count.cursor();
vec.compute_transform(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
|(height, tx_index, ..)| {
let ti = tx_index.to_usize();
txout_cursor.advance(ti - txout_cursor.position());
let first_txout_index =
txout_cursor.next().unwrap().to_usize();
count_cursor.advance(ti - count_cursor.position());
let output_count: usize =
count_cursor.next().unwrap().into();
let sats = indexer.vecs.outputs.value.fold_range_at(
first_txout_index,
first_txout_index + output_count,
Sats::ZERO,
|acc, v| acc + v,
);
(height, sats)
},
exit,
)?;
Ok(())
},
)
},
)?;
|| {
self.fees.compute(
starting_indexes.height,
&window_starts,
prices,
exit,
|vec| {
vec.compute_sum_from_indexes(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexes.height.tx_index_count,
&transactions_fees.fee.tx_index,
exit,
)?;
Ok(())
},
)
},
);
r_coinbase?;
r_fees?;
self.subsidy.base.sats.height.compute_transform2(
starting_indexes.height,
@@ -110,7 +116,6 @@ impl Vecs {
},
)?;
// All-time cumulative fee dominance
self.fee_dominance
.compute_binary::<Sats, Sats, RatioSatsBp16>(
starting_indexes.height,
@@ -119,7 +124,6 @@ impl Vecs {
exit,
)?;
// Rolling fee dominance = sum(fees) / sum(coinbase)
self.fee_dominance_rolling
.compute_binary::<Sats, Sats, RatioSatsBp16, _, _>(
starting_indexes.height,
@@ -128,7 +132,6 @@ impl Vecs {
exit,
)?;
// All-time cumulative subsidy dominance
self.subsidy_dominance
.compute_binary::<Sats, Sats, RatioSatsBp16>(
starting_indexes.height,
@@ -144,7 +147,6 @@ impl Vecs {
exit,
)?;
// Fee Ratio Multiple: sum(coinbase) / sum(fees) per rolling window
self.fee_ratio_multiple
.compute_binary::<Dollars, Dollars, RatioDollarsBp32, _, _>(
starting_indexes.height,

View File

@@ -22,7 +22,7 @@ impl Vecs {
let version = parent_version;
let count = CountVecs::forced_import(&db, version, indexes, cached_starts)?;
let value = ValueVecs::forced_import(&db, version, indexes)?;
let value = ValueVecs::forced_import(&db, version, indexes, cached_starts)?;
let adoption = AdoptionVecs::forced_import(&db, version, indexes)?;
let this = Self {

View File

@@ -14,7 +14,7 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.op_return.compute_with(
self.op_return.compute(
starting_indexes.height,
prices,
exit,

View File

@@ -3,20 +3,22 @@ use brk_types::Version;
use vecdb::Database;
use super::Vecs;
use crate::{indexes, internal::AmountPerBlockCumulative};
use crate::{indexes, internal::{AmountPerBlockCumulativeWithSums, CachedWindowStarts}};
impl Vecs {
pub(crate) fn forced_import(
db: &Database,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
Ok(Self {
op_return: AmountPerBlockCumulative::forced_import(
op_return: AmountPerBlockCumulativeWithSums::forced_import(
db,
"op_return_value",
version,
indexes,
cached_starts,
)?,
})
}

View File

@@ -1,9 +1,9 @@
use brk_traversable::Traversable;
use vecdb::{Rw, StorageMode};
use crate::internal::AmountPerBlockCumulative;
use crate::internal::AmountPerBlockCumulativeWithSums;
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
pub op_return: AmountPerBlockCumulative<M>,
pub op_return: AmountPerBlockCumulativeWithSums<M>,
}

View File

@@ -14,47 +14,7 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.op_return.compute(
starting_indexes.height,
prices,
exit,
|height_vec| {
// Validate computed versions against dependencies
let op_return_dep_version = scripts.value.op_return.base.sats.height.version();
height_vec.validate_computed_version_or_reset(op_return_dep_version)?;
// Copy per-block op_return values from scripts
let scripts_target = scripts.value.op_return.base.sats.height.len();
if scripts_target > 0 {
let target_height = Height::from(scripts_target - 1);
let current_len = height_vec.len();
let starting_height =
Height::from(current_len.min(starting_indexes.height.to_usize()));
if starting_height <= target_height {
let start = starting_height.to_usize();
let end = target_height.to_usize() + 1;
scripts.value.op_return.base.sats.height.fold_range_at(
start,
end,
start,
|idx, value| {
height_vec.truncate_push(Height::from(idx), value).unwrap();
idx + 1
},
);
}
}
height_vec.write()?;
Ok(())
},
)?;
// 2. Compute unspendable supply = op_return + unclaimed_rewards + genesis (at height 0)
// Get reference to op_return height vec for computing unspendable
let op_return_height = &self.op_return.base.sats.height;
let op_return_height = &scripts.value.op_return.base.sats.height;
let unclaimed_height = &mining.rewards.unclaimed.base.sats.height;
self.unspendable.compute(

View File

@@ -13,13 +13,6 @@ impl Vecs {
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
Ok(Self {
op_return: AmountPerBlockCumulativeWithSums::forced_import(
db,
"op_return_supply",
version,
indexes,
cached_starts,
)?,
unspendable: AmountPerBlockCumulativeWithSums::forced_import(
db,
"unspendable_supply",

View File

@@ -5,6 +5,5 @@ use crate::internal::AmountPerBlockCumulativeWithSums;
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
pub op_return: AmountPerBlockCumulativeWithSums<M>,
pub unspendable: AmountPerBlockCumulativeWithSums<M>,
}

View File

@@ -20,17 +20,19 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
// Count computes first
self.count
.compute(indexer, &blocks.lookback, starting_indexes, exit)?;
// Versions depends on count
self.versions
.compute(indexer, starting_indexes, exit)?;
// Size computes next (uses 6-block rolling window)
self.size
.compute(indexer, indexes, starting_indexes, exit)?;
// count, versions, size are independent — parallelize
let (r1, (r2, r3)) = rayon::join(
|| self.count.compute(indexer, &blocks.lookback, starting_indexes, exit),
|| {
rayon::join(
|| self.versions.compute(indexer, starting_indexes, exit),
|| self.size.compute(indexer, indexes, starting_indexes, exit),
)
},
);
r1?;
r2?;
r3?;
// Fees depends on size
self.fees

View File

@@ -18,21 +18,29 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.input_value.compute_sum_from_indexes(
starting_indexes.tx_index,
&indexer.vecs.transactions.first_txin_index,
&indexes.tx_index.input_count,
&txins.spent.value,
exit,
)?;
self.output_value.compute_sum_from_indexes(
starting_indexes.tx_index,
&indexer.vecs.transactions.first_txout_index,
&indexes.tx_index.output_count,
&indexer.vecs.outputs.value,
exit,
)?;
// input_value and output_value are independent — parallelize
let (r1, r2) = rayon::join(
|| {
self.input_value.compute_sum_from_indexes(
starting_indexes.tx_index,
&indexer.vecs.transactions.first_txin_index,
&indexes.tx_index.input_count,
&txins.spent.value,
exit,
)
},
|| {
self.output_value.compute_sum_from_indexes(
starting_indexes.tx_index,
&indexer.vecs.transactions.first_txout_index,
&indexes.tx_index.output_count,
&indexer.vecs.outputs.value,
exit,
)
},
);
r1?;
r2?;
self.fee.tx_index.compute_transform2(
starting_indexes.tx_index,

View File

@@ -1,10 +1,9 @@
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::{Indexes, StoredU64, TxVersion};
use vecdb::{Exit, ReadableVec, VecIndex};
use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use super::Vecs;
use crate::internal::PerBlockCumulativeWithSums;
impl Vecs {
pub(crate) fn compute(
@@ -13,30 +12,86 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
let tx_vany = |tx_vany: &mut PerBlockCumulativeWithSums<StoredU64, StoredU64>,
tx_version: TxVersion| {
let tx_version_vec = &indexer.vecs.transactions.tx_version;
// Cursor avoids per-transaction PcoVec page decompression.
// Txindex values are sequential, so the cursor only advances forward.
let mut cursor = tx_version_vec.cursor();
tx_vany.compute(starting_indexes.height, exit, |vec| {
vec.compute_filtered_count_from_indexes(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexer.vecs.transactions.txid,
|tx_index| {
let ti = tx_index.to_usize();
cursor.advance(ti - cursor.position());
cursor.next().unwrap() == tx_version
},
exit,
)?;
Ok(())
})
};
tx_vany(&mut self.v1, TxVersion::ONE)?;
tx_vany(&mut self.v2, TxVersion::TWO)?;
tx_vany(&mut self.v3, TxVersion::THREE)?;
let dep_version = indexer.vecs.transactions.tx_version.version()
+ indexer.vecs.transactions.first_tx_index.version()
+ indexer.vecs.transactions.txid.version();
for vec in [
&mut self.v1.base.height,
&mut self.v2.base.height,
&mut self.v3.base.height,
] {
vec.validate_and_truncate(dep_version, starting_indexes.height)?;
}
let skip = self
.v1
.base
.height
.len()
.min(self.v2.base.height.len())
.min(self.v3.base.height.len());
let first_tx_index = &indexer.vecs.transactions.first_tx_index;
let end = first_tx_index.len();
if skip >= end {
return Ok(());
}
// Truncate all 3 to skip, then push (no per-element bounds checks).
self.v1.base.height.truncate_if_needed_at(skip)?;
self.v2.base.height.truncate_if_needed_at(skip)?;
self.v3.base.height.truncate_if_needed_at(skip)?;
// Single cursor over tx_version — scanned once for all 3 version counts.
let mut cursor = indexer.vecs.transactions.tx_version.cursor();
let fi_batch = first_tx_index.collect_range_at(skip, end);
let txid_len = indexer.vecs.transactions.txid.len();
for (j, first_index) in fi_batch.iter().enumerate() {
let next_first = fi_batch
.get(j + 1)
.map(|fi| fi.to_usize())
.unwrap_or(txid_len);
let mut c1: usize = 0;
let mut c2: usize = 0;
let mut c3: usize = 0;
let fi = first_index.to_usize();
cursor.advance(fi - cursor.position());
for _ in fi..next_first {
match cursor.next().unwrap() {
TxVersion::ONE => c1 += 1,
TxVersion::TWO => c2 += 1,
TxVersion::THREE => c3 += 1,
_ => {}
}
}
self.v1.base.height.push(StoredU64::from(c1 as u64));
self.v2.base.height.push(StoredU64::from(c2 as u64));
self.v3.base.height.push(StoredU64::from(c3 as u64));
if self.v1.base.height.batch_limit_reached() {
let _lock = exit.lock();
self.v1.base.height.write()?;
self.v2.base.height.write()?;
self.v3.base.height.write()?;
}
}
{
let _lock = exit.lock();
self.v1.base.height.write()?;
self.v2.base.height.write()?;
self.v3.base.height.write()?;
}
// Derive cumulative + sums from base
self.v1.compute_rest(starting_indexes.height, exit)?;
self.v2.compute_rest(starting_indexes.height, exit)?;
self.v3.compute_rest(starting_indexes.height, exit)?;
Ok(())
}

View File

@@ -22,36 +22,44 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.sent_sum.compute(
starting_indexes.height,
prices,
exit,
|sats_vec| {
Ok(sats_vec.compute_filtered_sum_from_indexes(
// sent_sum and received_sum are independent — parallelize
let (r1, r2) = rayon::join(
|| {
self.sent_sum.compute(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexes.height.tx_index_count,
&fees_vecs.input_value,
|sats| !sats.is_max(),
prices,
exit,
)?)
|sats_vec| {
Ok(sats_vec.compute_filtered_sum_from_indexes(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexes.height.tx_index_count,
&fees_vecs.input_value,
|sats| !sats.is_max(),
exit,
)?)
},
)
},
)?;
self.received_sum.compute(
starting_indexes.height,
prices,
exit,
|sats_vec| {
Ok(sats_vec.compute_sum_from_indexes(
|| {
self.received_sum.compute(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexes.height.tx_index_count,
&fees_vecs.output_value,
prices,
exit,
)?)
|sats_vec| {
Ok(sats_vec.compute_sum_from_indexes(
starting_indexes.height,
&indexer.vecs.transactions.first_tx_index,
&indexes.height.tx_index_count,
&fees_vecs.output_value,
exit,
)?)
},
)
},
)?;
);
r1?;
r2?;
self.tx_per_sec
.height