global: snapshot

This commit is contained in:
nym21
2026-03-15 11:25:21 +01:00
parent 9e36a4188a
commit 9626c7de32
54 changed files with 884 additions and 750 deletions

View File

@@ -7,7 +7,7 @@ use brk_types::{
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use tracing::{debug, info};
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use crate::{
distribution::{
@@ -210,6 +210,22 @@ pub(crate) fn process_blocks(
// Initialize Fenwick tree from imported BTreeMap state (one-time)
vecs.utxo_cohorts.init_fenwick_if_needed();
// Pre-truncate all stored vecs to starting_height (one-time).
// This eliminates per-push truncation checks inside the block loop.
{
let start = starting_height.to_usize();
vecs.utxo_cohorts
.par_iter_vecs_mut()
.chain(vecs.address_cohorts.par_iter_vecs_mut())
.chain(vecs.addresses.funded.par_iter_height_mut())
.chain(vecs.addresses.empty.par_iter_height_mut())
.chain(vecs.addresses.activity.par_iter_height_mut())
.chain(rayon::iter::once(
&mut vecs.coinblocks_destroyed.base.height as &mut dyn AnyStoredVec,
))
.try_for_each(|v| v.any_truncate_if_needed_at(start))?;
}
// Reusable hashsets (avoid per-block allocation)
let mut received_addresses = ByAddressType::<FxHashSet<TypeIndex>>::default();
let mut seen_senders = ByAddressType::<FxHashSet<TypeIndex>>::default();
@@ -364,14 +380,13 @@ pub(crate) fn process_blocks(
blocks_old as u128 * u64::from(sent.spendable_supply.value) as u128
})
.sum();
vecs.coinblocks_destroyed.base.height.truncate_push(
height,
vecs.coinblocks_destroyed.base.height.push(
StoredF64::from(total_satblocks as f64 / Sats::ONE_BTC_U128 as f64),
)?;
);
}
// Record maturation (sats crossing age boundaries)
vecs.utxo_cohorts.push_maturation(height, &matured)?;
vecs.utxo_cohorts.push_maturation(&matured);
// Build set of addresses that received this block (for detecting "both" in sent)
// Reuse pre-allocated hashsets: clear preserves capacity, avoiding reallocation
@@ -437,14 +452,10 @@ pub(crate) fn process_blocks(
// Push to height-indexed vectors
vecs.addresses.funded
.truncate_push_height(height, address_counts.sum(), &address_counts)?;
vecs.addresses.empty.truncate_push_height(
height,
empty_address_counts.sum(),
&empty_address_counts,
)?;
vecs.addresses.activity
.truncate_push_height(height, &activity_counts)?;
.push_height(address_counts.sum(), &address_counts);
vecs.addresses.empty
.push_height(empty_address_counts.sum(), &empty_address_counts);
vecs.addresses.activity.push_height(&activity_counts);
let is_last_of_day = is_last_of_day[offset];
let date_opt = is_last_of_day.then(|| Date::from(timestamp));
@@ -454,11 +465,9 @@ pub(crate) fn process_blocks(
&mut vecs.address_cohorts,
height,
block_price,
date_opt.is_some(),
)?;
);
vecs.utxo_cohorts.truncate_push_aggregate_percentiles(
height,
vecs.utxo_cohorts.push_aggregate_percentiles(
block_price,
date_opt,
&vecs.states_path,
@@ -521,42 +530,29 @@ fn push_cohort_states(
address_cohorts: &mut AddressCohorts,
height: Height,
height_price: Cents,
is_day_boundary: bool,
) -> Result<()> {
) {
// Phase 1: push + unrealized (no reset yet — states still needed for aggregation)
let (r1, r2) = rayon::join(
rayon::join(
|| {
utxo_cohorts
.par_iter_separate_mut()
.try_for_each(|v| -> Result<()> {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(
height,
height_price,
is_day_boundary,
)?;
Ok(())
.for_each(|v| {
v.push_state(height);
v.push_unrealized_state(height_price);
})
},
|| {
address_cohorts
.par_iter_separate_mut()
.try_for_each(|v| -> Result<()> {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(
height,
height_price,
is_day_boundary,
)?;
Ok(())
.for_each(|v| {
v.push_state(height);
v.push_unrealized_state(height_price);
})
},
);
r1?;
r2?;
// Phase 2: aggregate age_range realized states → push to overlapping cohorts' RealizedFull
utxo_cohorts.push_overlapping_realized_full(height)?;
utxo_cohorts.push_overlapping_realized_full();
// Phase 3: reset per-block values
utxo_cohorts
@@ -565,6 +561,4 @@ fn push_cohort_states(
address_cohorts
.iter_separate_mut()
.for_each(|v| v.reset_single_iteration_values());
Ok(())
}