global: final snapshot and fixes before release

This commit is contained in:
nym21
2026-03-22 23:16:52 +01:00
parent 514fdc40ee
commit 514b0513de
34 changed files with 323 additions and 210 deletions

View File

@@ -4,8 +4,8 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{
Cents, EmptyAddrData, EmptyAddrIndex, FundedAddrData, FundedAddrIndex, Height,
Indexes, StoredF64, SupplyState, Timestamp, TxIndex, Version,
Cents, EmptyAddrData, EmptyAddrIndex, FundedAddrData, FundedAddrIndex, Height, Indexes,
StoredF64, SupplyState, Timestamp, TxIndex, Version,
};
use tracing::{debug, info};
use vecdb::{
@@ -23,15 +23,16 @@ use crate::{
state::BlockState,
},
indexes, inputs,
internal::{CachedWindowStarts, PerBlockCumulativeRolling, db_utils::{finalize_db, open_db}},
internal::{
CachedWindowStarts, PerBlockCumulativeRolling,
db_utils::{finalize_db, open_db},
},
outputs, prices, transactions,
};
use super::{
AddrCohorts, AddrsDataVecs, AnyAddrIndexesVecs, RangeMap, UTXOCohorts,
addr::{
AddrCountsVecs, AddrActivityVecs, DeltaVecs, NewAddrCountVecs, TotalAddrCountVecs,
},
addr::{AddrActivityVecs, AddrCountsVecs, DeltaVecs, NewAddrCountVecs, TotalAddrCountVecs},
};
const VERSION: Version = Version::new(22);
@@ -48,8 +49,7 @@ pub struct AddrMetricsVecs<M: StorageMode = Rw> {
pub funded_index:
LazyVecFrom1<FundedAddrIndex, FundedAddrIndex, FundedAddrIndex, FundedAddrData>,
#[traversable(wrap = "indexes", rename = "empty")]
pub empty_index:
LazyVecFrom1<EmptyAddrIndex, EmptyAddrIndex, EmptyAddrIndex, EmptyAddrData>,
pub empty_index: LazyVecFrom1<EmptyAddrIndex, EmptyAddrIndex, EmptyAddrIndex, EmptyAddrData>,
}
#[derive(Traversable)]
@@ -73,23 +73,26 @@ pub struct Vecs<M: StorageMode = Rw> {
pub coinblocks_destroyed: PerBlockCumulativeRolling<StoredF64, StoredF64, M>,
pub addrs: AddrMetricsVecs<M>,
/// In-memory block state for UTXO processing. Persisted via supply_state.
/// Kept across compute() calls to avoid O(n) rebuild on resume.
/// In-memory state that does NOT survive rollback.
/// Grouped so that adding a new field automatically gets it reset.
#[traversable(skip)]
chain_state: Vec<BlockState>,
/// In-memory tx_index→height reverse lookup. Kept across compute() calls.
#[traversable(skip)]
tx_index_to_height: RangeMap<TxIndex, Height>,
caches: DistributionTransientState,
}
/// Cached height→price mapping. Incrementally extended, O(new_blocks) on resume.
#[traversable(skip)]
cached_prices: Vec<Cents>,
/// Cached height→timestamp mapping. Incrementally extended, O(new_blocks) on resume.
#[traversable(skip)]
cached_timestamps: Vec<Timestamp>,
/// Cached sparse table for O(1) range-max price queries. Incrementally extended.
#[traversable(skip)]
cached_price_range_max: PriceRangeMax,
/// In-memory state that does NOT survive rollback.
/// On rollback, the entire struct is replaced with `Default::default()`.
#[derive(Clone, Default)]
struct DistributionTransientState {
/// Block state for UTXO processing. Persisted via supply_state.
chain_state: Vec<BlockState>,
/// tx_index→height reverse lookup.
tx_index_to_height: RangeMap<TxIndex, Height>,
/// Height→price mapping. Incrementally extended.
prices: Vec<Cents>,
/// Height→timestamp mapping. Incrementally extended.
timestamps: Vec<Timestamp>,
/// Sparse table for O(1) range-max price queries. Incrementally extended.
price_range_max: PriceRangeMax,
}
const SAVED_STAMPED_CHANGES: u16 = 10;
@@ -109,9 +112,11 @@ impl Vecs {
let version = parent_version + VERSION;
let utxo_cohorts = UTXOCohorts::forced_import(&db, version, indexes, &states_path, cached_starts)?;
let utxo_cohorts =
UTXOCohorts::forced_import(&db, version, indexes, &states_path, cached_starts)?;
let addr_cohorts = AddrCohorts::forced_import(&db, version, indexes, &states_path, cached_starts)?;
let addr_cohorts =
AddrCohorts::forced_import(&db, version, indexes, &states_path, cached_starts)?;
// Create address data BytesVecs first so we can also use them for identity mappings
let funded_addr_index_to_funded_addr_data = BytesVec::forced_import_with(
@@ -147,8 +152,7 @@ impl Vecs {
let total_addr_count = TotalAddrCountVecs::forced_import(&db, version, indexes)?;
// Per-block delta of total (global + per-type)
let new_addr_count =
NewAddrCountVecs::forced_import(&db, version, indexes, cached_starts)?;
let new_addr_count = NewAddrCountVecs::forced_import(&db, version, indexes, cached_starts)?;
// Growth rate: delta change + rate (global + per-type)
let delta = DeltaVecs::new(version, &addr_count, cached_starts, indexes);
@@ -186,12 +190,7 @@ impl Vecs {
funded: funded_addr_index_to_funded_addr_data,
empty: empty_addr_index_to_empty_addr_data,
},
chain_state: Vec::new(),
tx_index_to_height: RangeMap::default(),
cached_prices: Vec::new(),
cached_timestamps: Vec::new(),
cached_price_range_max: PriceRangeMax::default(),
caches: DistributionTransientState::default(),
db,
states_path,
@@ -201,6 +200,12 @@ impl Vecs {
Ok(this)
}
/// Reset in-memory caches that become stale after rollback.
fn reset_in_memory_caches(&mut self) {
self.utxo_cohorts.reset_caches();
self.caches = DistributionTransientState::default();
}
/// Main computation loop.
///
/// Processes blocks to compute UTXO and address cohort metrics:
@@ -222,32 +227,6 @@ impl Vecs {
starting_indexes: &mut Indexes,
exit: &Exit,
) -> Result<()> {
let cache_target_len = prices
.spot
.cents
.height
.len()
.min(blocks.time.timestamp_monotonic.len());
let cache_current_len = self.cached_prices.len();
if cache_target_len < cache_current_len {
self.cached_prices.truncate(cache_target_len);
self.cached_timestamps.truncate(cache_target_len);
self.cached_price_range_max.truncate(cache_target_len);
} else if cache_target_len > cache_current_len {
let new_prices = prices
.spot
.cents
.height
.collect_range_at(cache_current_len, cache_target_len);
let new_timestamps = blocks
.time
.timestamp_monotonic
.collect_range_at(cache_current_len, cache_target_len);
self.cached_prices.extend(new_prices);
self.cached_timestamps.extend(new_timestamps);
}
self.cached_price_range_max.extend(&self.cached_prices);
// 1. Find minimum height we have data for across stateful vecs
let current_height = Height::from(self.supply_state.len());
let min_stateful = self.min_stateful_len();
@@ -281,9 +260,6 @@ impl Vecs {
&mut self.addr_cohorts,
)?;
if recovered.starting_height.is_zero() {
info!("State recovery validation failed, falling back to fresh start");
}
debug!(
"recover_state completed, starting_height={}",
recovered.starting_height
@@ -295,12 +271,14 @@ impl Vecs {
debug!("recovered_height={}", recovered_height);
// Take chain_state and tx_index_to_height out of self to avoid borrow conflicts
let mut chain_state = std::mem::take(&mut self.chain_state);
let mut tx_index_to_height = std::mem::take(&mut self.tx_index_to_height);
let needs_fresh_start = recovered_height.is_zero();
let needs_rollback = recovered_height < current_height;
// Recover or reuse chain_state
let starting_height = if recovered_height.is_zero() {
if needs_fresh_start || needs_rollback {
self.reset_in_memory_caches();
}
if needs_fresh_start {
self.supply_state.reset()?;
self.addrs.funded.reset_height()?;
self.addrs.empty.reset_height()?;
@@ -311,11 +289,44 @@ impl Vecs {
&mut self.utxo_cohorts,
&mut self.addr_cohorts,
)?;
chain_state.clear();
tx_index_to_height.truncate(0);
info!("State recovery: fresh start");
}
// Populate price/timestamp caches from the prices module.
// Must happen AFTER rollback/reset (which clears caches) but BEFORE
// chain_state rebuild (which reads from them).
let cache_target_len = prices
.spot
.cents
.height
.len()
.min(blocks.time.timestamp_monotonic.len());
let cache_current_len = self.caches.prices.len();
if cache_target_len < cache_current_len {
self.caches.prices.truncate(cache_target_len);
self.caches.timestamps.truncate(cache_target_len);
self.caches.price_range_max.truncate(cache_target_len);
} else if cache_target_len > cache_current_len {
let new_prices = prices
.spot
.cents
.height
.collect_range_at(cache_current_len, cache_target_len);
let new_timestamps = blocks
.time
.timestamp_monotonic
.collect_range_at(cache_current_len, cache_target_len);
self.caches.prices.extend(new_prices);
self.caches.timestamps.extend(new_timestamps);
}
self.caches.price_range_max.extend(&self.caches.prices);
// Take chain_state and tx_index_to_height out of self to avoid borrow conflicts
let mut chain_state = std::mem::take(&mut self.caches.chain_state);
let mut tx_index_to_height = std::mem::take(&mut self.caches.tx_index_to_height);
// Recover or reuse chain_state
let starting_height = if recovered_height.is_zero() {
Height::ZERO
} else if chain_state.len() == usize::from(recovered_height) {
// Normal resume: chain_state already matches, reuse as-is
@@ -335,8 +346,8 @@ impl Vecs {
.enumerate()
.map(|(h, supply)| BlockState {
supply,
price: self.cached_prices[h],
timestamp: self.cached_timestamps[h],
price: self.caches.prices[h],
timestamp: self.caches.timestamps[h],
})
.collect();
debug!("chain_state rebuilt");
@@ -352,12 +363,11 @@ impl Vecs {
starting_indexes.height = starting_height;
}
// 2b. Validate computed versions
// 2c. Validate computed versions
debug!("validating computed versions");
let base_version = VERSION;
self.utxo_cohorts.validate_computed_versions(base_version)?;
self.addr_cohorts
.validate_computed_versions(base_version)?;
self.addr_cohorts.validate_computed_versions(base_version)?;
debug!("computed versions validated");
// 3. Get last height from indexer
@@ -371,9 +381,9 @@ impl Vecs {
if starting_height <= last_height {
debug!("calling process_blocks");
let cached_prices = std::mem::take(&mut self.cached_prices);
let cached_timestamps = std::mem::take(&mut self.cached_timestamps);
let cached_price_range_max = std::mem::take(&mut self.cached_price_range_max);
let prices = std::mem::take(&mut self.caches.prices);
let timestamps = std::mem::take(&mut self.caches.timestamps);
let price_range_max = std::mem::take(&mut self.caches.price_range_max);
process_blocks(
self,
@@ -386,27 +396,33 @@ impl Vecs {
last_height,
&mut chain_state,
&mut tx_index_to_height,
&cached_prices,
&cached_timestamps,
&cached_price_range_max,
&prices,
&timestamps,
&price_range_max,
exit,
)?;
self.cached_prices = cached_prices;
self.cached_timestamps = cached_timestamps;
self.cached_price_range_max = cached_price_range_max;
self.caches.prices = prices;
self.caches.timestamps = timestamps;
self.caches.price_range_max = price_range_max;
}
// Put chain_state and tx_index_to_height back
self.chain_state = chain_state;
self.tx_index_to_height = tx_index_to_height;
self.caches.chain_state = chain_state;
self.caches.tx_index_to_height = tx_index_to_height;
// 5. Compute aggregates (overlapping cohorts from separate cohorts)
info!("Computing overlapping cohorts...");
{
let (r1, r2) = rayon::join(
|| self.utxo_cohorts.compute_overlapping_vecs(starting_indexes, exit),
|| self.addr_cohorts.compute_overlapping_vecs(starting_indexes, exit),
|| {
self.utxo_cohorts
.compute_overlapping_vecs(starting_indexes, exit)
},
|| {
self.addr_cohorts
.compute_overlapping_vecs(starting_indexes, exit)
},
);
r1?;
r2?;
@@ -420,8 +436,14 @@ impl Vecs {
info!("Computing rest part 1...");
{
let (r1, r2) = rayon::join(
|| self.utxo_cohorts.compute_rest_part1(prices, starting_indexes, exit),
|| self.addr_cohorts.compute_rest_part1(prices, starting_indexes, exit),
|| {
self.utxo_cohorts
.compute_rest_part1(prices, starting_indexes, exit)
},
|| {
self.addr_cohorts
.compute_rest_part1(prices, starting_indexes, exit)
},
);
r1?;
r2?;
@@ -442,11 +464,9 @@ impl Vecs {
self.addrs
.activity
.compute_rest(starting_indexes.height, exit)?;
self.addrs.new.compute(
starting_indexes.height,
&self.addrs.total,
exit,
)?;
self.addrs
.new
.compute(starting_indexes.height, &self.addrs.total, exit)?;
// 7. Compute rest part2 (relative metrics)
let height_to_market_cap = self
@@ -468,7 +488,14 @@ impl Vecs {
exit,
)?;
let all_utxo_count = self.utxo_cohorts.all.metrics.outputs.unspent_count.height.read_only_clone();
let all_utxo_count = self
.utxo_cohorts
.all
.metrics
.outputs
.unspent_count
.height
.read_only_clone();
self.addr_cohorts
.compute_rest_part2(prices, starting_indexes, &all_utxo_count, exit)?;