global: snapshot

This commit is contained in:
nym21
2026-03-02 13:34:45 +01:00
parent 7cb1bfa667
commit 4d97cec869
57 changed files with 1724 additions and 2011 deletions

View File

@@ -48,6 +48,7 @@ pub(crate) fn process_blocks(
starting_height: Height,
last_height: Height,
chain_state: &mut Vec<BlockState>,
txindex_to_height: &mut RangeMap<TxIndex, Height>,
exit: &Exit,
) -> Result<()> {
// Create computation context with pre-computed vectors for thread-safe access
@@ -110,26 +111,28 @@ pub(crate) fn process_blocks(
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
debug!("VecsReaders created");
// Build txindex -> height lookup map for efficient prev_height computation
debug!("building txindex_to_height RangeMap");
let mut txindex_to_height: RangeMap<TxIndex, Height> = {
let first_txindex_len = indexer.vecs.transactions.first_txindex.len();
let all_first_txindexes: Vec<TxIndex> = indexer
// Extend txindex_to_height RangeMap with new entries (incremental, O(new_blocks))
let target_len = indexer.vecs.transactions.first_txindex.len();
let current_len = txindex_to_height.len();
if current_len < target_len {
debug!("extending txindex_to_height RangeMap from {} to {}", current_len, target_len);
let new_entries: Vec<TxIndex> = indexer
.vecs
.transactions
.first_txindex
.collect_range_at(0, first_txindex_len);
let mut map = RangeMap::with_capacity(first_txindex_len);
for first_txindex in all_first_txindexes {
map.push(first_txindex);
.collect_range_at(current_len, target_len);
for first_txindex in new_entries {
txindex_to_height.push(first_txindex);
}
map
};
debug!("txindex_to_height RangeMap built");
} else if current_len > target_len {
debug!("truncating txindex_to_height RangeMap from {} to {}", current_len, target_len);
txindex_to_height.truncate(target_len);
}
debug!("txindex_to_height RangeMap ready ({} entries)", txindex_to_height.len());
// Create reusable iterators and buffers for per-block reads
let mut txout_iters = TxOutReaders::new(indexer);
let mut txin_iters = TxInReaders::new(indexer, inputs, &mut txindex_to_height);
let mut txin_iters = TxInReaders::new(indexer, inputs, txindex_to_height);
let mut txout_to_txindex_buf = IndexToTxIndexBuf::new();
let mut txin_to_txindex_buf = IndexToTxIndexBuf::new();

View File

@@ -12,7 +12,7 @@ const CACHE_SIZE: usize = 8;
///
/// Includes an LRU cache of recently accessed ranges to avoid binary search
/// when there's locality in access patterns.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RangeMap<I, V> {
/// Sorted vec of first_index values. Position in vec = value.
first_indexes: Vec<I>,
@@ -44,6 +44,17 @@ impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
}
}
/// Number of ranges stored.
pub(crate) fn len(&self) -> usize {
self.first_indexes.len()
}
/// Truncate to `new_len` ranges and clear the cache.
pub(crate) fn truncate(&mut self, new_len: usize) {
self.first_indexes.truncate(new_len);
self.cache_len = 0;
}
/// Push a new first_index. Value is implicitly the current length.
/// Must be called in order (first_index must be >= all previous).
#[inline]

View File

@@ -5,7 +5,7 @@ use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{
Day1, EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex, Height,
SupplyState, Version,
SupplyState, TxIndex, Version,
};
use tracing::{debug, info};
use vecdb::{
@@ -23,7 +23,7 @@ use crate::{
};
use super::{
AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, UTXOCohorts,
AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, RangeMap, UTXOCohorts,
address::{
AddrCountsVecs, AddressActivityVecs, GrowthRateVecs, NewAddrCountVecs, TotalAddrCountVecs,
},
@@ -61,6 +61,14 @@ pub struct Vecs<M: StorageMode = Rw> {
LazyVecFrom1<FundedAddressIndex, FundedAddressIndex, FundedAddressIndex, FundedAddressData>,
pub emptyaddressindex:
LazyVecFrom1<EmptyAddressIndex, EmptyAddressIndex, EmptyAddressIndex, EmptyAddressData>,
/// In-memory block state for UTXO processing. Persisted via supply_state.
/// Kept across compute() calls to avoid O(n) rebuild on resume.
#[traversable(skip)]
chain_state: Vec<BlockState>,
/// In-memory txindex→height reverse lookup. Kept across compute() calls.
#[traversable(skip)]
txindex_to_height: RangeMap<TxIndex, Height>,
}
const SAVED_STAMPED_CHANGES: u16 = 10;
@@ -148,6 +156,9 @@ impl Vecs {
fundedaddressindex,
emptyaddressindex,
chain_state: Vec::new(),
txindex_to_height: RangeMap::default(),
db,
states_path,
};
@@ -230,8 +241,12 @@ impl Vecs {
debug!("recovered_height={}", recovered_height);
// Fresh start: reset all state
let (starting_height, mut chain_state) = if recovered_height.is_zero() {
// Take chain_state and txindex_to_height out of self to avoid borrow conflicts
let mut chain_state = std::mem::take(&mut self.chain_state);
let mut txindex_to_height = std::mem::take(&mut self.txindex_to_height);
// Recover or reuse chain_state
let starting_height = if recovered_height.is_zero() {
self.supply_state.reset()?;
self.addr_count.reset_height()?;
self.empty_addr_count.reset_height()?;
@@ -243,11 +258,18 @@ impl Vecs {
&mut self.address_cohorts,
)?;
chain_state.clear();
txindex_to_height.truncate(0);
info!("State recovery: fresh start");
(Height::ZERO, vec![])
Height::ZERO
} else if chain_state.len() == usize::from(recovered_height) {
// Normal resume: chain_state already matches, reuse as-is
debug!("reusing in-memory chain_state ({} entries)", chain_state.len());
recovered_height
} else {
// Recover chain_state from stored values
debug!("recovering chain_state from stored values");
// Rollback or first run after restart: rebuild from supply_state
debug!("rebuilding chain_state from stored values");
let height_to_timestamp = &blocks.time.timestamp_monotonic;
let height_to_price = &prices.price.cents.height;
@@ -257,7 +279,7 @@ impl Vecs {
debug!("building supply_state vec for {} heights", recovered_height);
let supply_state_data: Vec<_> = self.supply_state.collect_range_at(0, end);
let chain_state = supply_state_data
chain_state = supply_state_data
.into_iter()
.enumerate()
.map(|(h, supply)| BlockState {
@@ -266,9 +288,12 @@ impl Vecs {
timestamp: timestamp_data[h],
})
.collect();
debug!("chain_state vec built");
debug!("chain_state rebuilt");
(recovered_height, chain_state)
// Truncate RangeMap to match (entries are immutable, safe to keep)
txindex_to_height.truncate(end);
recovered_height
};
// Update starting_indexes if we need to recompute from an earlier point
@@ -316,10 +341,15 @@ impl Vecs {
starting_height,
last_height,
&mut chain_state,
&mut txindex_to_height,
exit,
)?;
}
// Put chain_state and txindex_to_height back
self.chain_state = chain_state;
self.txindex_to_height = txindex_to_height;
// 5. Compute aggregates (overlapping cohorts from separate cohorts)
aggregates::compute_overlapping(
&mut self.utxo_cohorts,