global: snapshot

This commit is contained in:
nym21
2026-03-02 19:44:45 +01:00
parent 4e7cd9ab6f
commit ccb2db2309
37 changed files with 337 additions and 1412 deletions

View File

@@ -3,16 +3,13 @@ use std::thread;
use brk_cohort::ByAddressType;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::{
Cents, Date, Day1, Height, OutputType, Sats, StoredU64, Timestamp, TxIndex, TypeIndex,
};
use brk_types::{Cents, Date, Height, OutputType, Sats, Timestamp, TxIndex, TypeIndex, ONE_DAY_IN_SEC};
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use tracing::{debug, info};
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex};
use crate::{
blocks,
distribution::{
address::{AddressTypeToActivityCounts, AddressTypeToAddressCount},
block::{
@@ -45,7 +42,6 @@ pub(crate) fn process_blocks(
inputs: &inputs::Vecs,
outputs: &outputs::Vecs,
transactions: &transactions::Vecs,
blocks: &blocks::Vecs,
starting_height: Height,
last_height: Height,
chain_state: &mut Vec<BlockState>,
@@ -73,9 +69,6 @@ pub(crate) fn process_blocks(
let height_to_tx_count = &transactions.count.tx_count.height;
let height_to_output_count = &outputs.count.total_count.full.sum_cumulative.sum.0;
let height_to_input_count = &inputs.count.full.sum_cumulative.sum.0;
let height_to_date = &blocks.time.date;
let day1_to_first_height = &indexes.day1.first_height;
let day1_to_height_count = &indexes.day1.height_count;
let txindex_to_output_count = &indexes.txindex.output_count;
let txindex_to_input_count = &indexes.txindex.input_count;
@@ -201,21 +194,16 @@ pub(crate) fn process_blocks(
// Track activity counts - reset each block
let mut activity_counts = AddressTypeToActivityCounts::default();
// Pre-collect lazy vecs that don't support iterators
let height_to_date_vec: Vec<Date> = height_to_date.collect_range_at(start_usize, end_usize);
debug!("creating AddressCache");
let mut cache = AddressCache::new();
debug!("AddressCache created, entering main loop");
// Cache for day1 lookups - same day1 repeats ~140 times per day
let mut cached_day1: Option<Day1> = None;
let mut cached_date_first_height = Height::ZERO;
let mut cached_date_height_count = StoredU64::default();
// Reusable hashsets for received addresses (avoid per-block allocation)
let mut received_addresses = ByAddressType::<FxHashSet<TypeIndex>>::default();
// Track earliest chain_state modification from sends (for incremental supply_state writes)
let mut min_supply_modified: Option<Height> = None;
// Main block iteration
for height in starting_height.to_usize()..=last_height.to_usize() {
let height = Height::from(height);
@@ -413,8 +401,13 @@ pub(crate) fn process_blocks(
// Main thread: Update UTXO cohorts
vecs.utxo_cohorts
.receive(transacted, height, timestamp, block_price);
vecs.utxo_cohorts
.send(height_to_sent, chain_state, ctx.price_range_max);
if let Some(min_h) = vecs.utxo_cohorts
.send(height_to_sent, chain_state, ctx.price_range_max)
{
min_supply_modified = Some(
min_supply_modified.map_or(min_h, |cur| cur.min(min_h)),
);
}
});
// Push to height-indexed vectors
@@ -428,26 +421,12 @@ pub(crate) fn process_blocks(
vecs.address_activity
.truncate_push_height(height, &activity_counts)?;
// Get date info for unrealized state computation (cold path - once per day)
// Cache day1 lookups: same day1 repeats ~140 times per day,
// avoiding redundant PcoVec page decompressions.
let date = height_to_date_vec[offset];
let day1 = Day1::try_from(date).unwrap();
let (date_first_height, date_height_count) = if cached_day1 == Some(day1) {
(cached_date_first_height, cached_date_height_count)
} else {
let fh: Height = day1_to_first_height.collect_one(day1).unwrap();
let hc = day1_to_height_count.collect_one(day1).unwrap();
cached_day1 = Some(day1);
cached_date_first_height = fh;
cached_date_height_count = hc;
(fh, hc)
};
let is_date_last_height =
date_first_height + Height::from(date_height_count).decremented().unwrap() == height;
let day1_opt = is_date_last_height.then_some(day1);
let h = height.to_usize();
let is_last_of_day = height == last_height
|| *cached_timestamps[h] / ONE_DAY_IN_SEC
!= *cached_timestamps[h + 1] / ONE_DAY_IN_SEC;
let date_opt = is_last_of_day.then(|| Date::from(timestamp));
// Push cohort states and compute unrealized
push_cohort_states(
&mut vecs.utxo_cohorts,
&mut vecs.address_cohorts,
@@ -455,25 +434,13 @@ pub(crate) fn process_blocks(
block_price,
)?;
// Compute and push percentiles for aggregate cohorts (all, sth, lth)
vecs.utxo_cohorts.truncate_push_aggregate_percentiles(
height,
block_price,
day1_opt,
date_opt,
&vecs.states_path,
)?;
// Compute unrealized peak regret by age range (once per day)
if day1_opt.is_some() {
vecs.utxo_cohorts.compute_and_push_peak_regret(
chain_state,
height,
timestamp,
block_price,
ctx.price_range_max,
)?;
}
// Periodic checkpoint flush
if height != last_height
&& height != Height::ZERO
@@ -495,7 +462,8 @@ pub(crate) fn process_blocks(
let _lock = exit.lock();
// Write to disk (pure I/O) - no changes saved for periodic flushes
write(vecs, height, chain_state, false)?;
write(vecs, height, chain_state, min_supply_modified, false)?;
min_supply_modified = None;
vecs.flush()?;
// Recreate readers
@@ -519,7 +487,7 @@ pub(crate) fn process_blocks(
)?;
// Write to disk (pure I/O) - save changes for rollback
write(vecs, last_height, chain_state, true)?;
write(vecs, last_height, chain_state, min_supply_modified, true)?;
Ok(())
}