global: snapshot

This commit is contained in:
nym21
2026-03-02 15:28:13 +01:00
parent 4d97cec869
commit 4e7cd9ab6f
21 changed files with 595 additions and 373 deletions

View File

@@ -3,7 +3,9 @@ use std::thread;
use brk_cohort::ByAddressType;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::{Cents, Date, Day1, Height, OutputType, Sats, StoredU64, TxIndex, TypeIndex};
use brk_types::{
Cents, Date, Day1, Height, OutputType, Sats, StoredU64, Timestamp, TxIndex, TypeIndex,
};
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use tracing::{debug, info};
@@ -20,7 +22,7 @@ use crate::{
compute::write::{process_address_updates, write},
state::{BlockState, Transacted},
},
indexes, inputs, outputs, prices, transactions,
indexes, inputs, outputs, transactions,
};
use super::{
@@ -30,8 +32,8 @@ use super::{
vecs::Vecs,
},
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexToTxIndexBuf, TxInReaders,
TxOutReaders, VecsReaders,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexToTxIndexBuf, PriceRangeMax,
TxInReaders, TxOutReaders, VecsReaders,
};
/// Process all blocks from starting_height to last_height.
@@ -44,52 +46,45 @@ pub(crate) fn process_blocks(
outputs: &outputs::Vecs,
transactions: &transactions::Vecs,
blocks: &blocks::Vecs,
prices: &prices::Vecs,
starting_height: Height,
last_height: Height,
chain_state: &mut Vec<BlockState>,
txindex_to_height: &mut RangeMap<TxIndex, Height>,
cached_prices: &[Cents],
cached_timestamps: &[Timestamp],
cached_price_range_max: &PriceRangeMax,
exit: &Exit,
) -> Result<()> {
// Create computation context with pre-computed vectors for thread-safe access
debug!("creating ComputeContext");
let ctx = ComputeContext::new(starting_height, last_height, blocks, prices);
debug!("ComputeContext created");
let ctx = ComputeContext {
starting_height,
last_height,
height_to_timestamp: cached_timestamps,
height_to_price: cached_prices,
price_range_max: cached_price_range_max,
};
if ctx.starting_height > ctx.last_height {
return Ok(());
}
// References to vectors using correct field paths
// From indexer.vecs:
let height_to_first_txindex = &indexer.vecs.transactions.first_txindex;
let height_to_first_txoutindex = &indexer.vecs.outputs.first_txoutindex;
let height_to_first_txinindex = &indexer.vecs.inputs.first_txinindex;
// From transactions and inputs/outputs (via .height or .height.sum_cumulative.sum patterns):
let height_to_tx_count = &transactions.count.tx_count.height;
let height_to_output_count = &outputs.count.total_count.full.sum_cumulative.sum.0;
let height_to_input_count = &inputs.count.full.sum_cumulative.sum.0;
// From blocks:
let height_to_timestamp = &blocks.time.timestamp_monotonic;
let height_to_date = &blocks.time.date;
let day1_to_first_height = &indexes.day1.first_height;
let day1_to_height_count = &indexes.day1.height_count;
let txindex_to_output_count = &indexes.txindex.output_count;
let txindex_to_input_count = &indexes.txindex.input_count;
// From price - use cents for computation:
let height_to_price = &prices.price.cents.height;
let height_to_price_vec = cached_prices;
let height_to_timestamp_vec = cached_timestamps;
// Access pre-computed vectors from context for thread-safe access
let height_to_price_vec = &ctx.height_to_price;
let height_to_timestamp_vec = &ctx.height_to_timestamp;
// Range for pre-collecting height-indexed vecs
let start_usize = starting_height.to_usize();
let end_usize = last_height.to_usize() + 1;
// Pre-collect height-indexed vecs for the block range (bulk read before hot loop)
let height_to_first_txindex_vec: Vec<TxIndex> =
height_to_first_txindex.collect_range_at(start_usize, end_usize);
let height_to_first_txoutindex_vec: Vec<_> =
@@ -102,10 +97,8 @@ pub(crate) fn process_blocks(
height_to_output_count.collect_range_at(start_usize, end_usize);
let height_to_input_count_vec: Vec<_> =
height_to_input_count.collect_range_at(start_usize, end_usize);
let height_to_timestamp_collected: Vec<_> =
height_to_timestamp.collect_range_at(start_usize, end_usize);
let height_to_price_collected: Vec<_> =
height_to_price.collect_range_at(start_usize, end_usize);
let height_to_timestamp_collected = &cached_timestamps[start_usize..end_usize];
let height_to_price_collected = &cached_prices[start_usize..end_usize];
debug!("creating VecsReaders");
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
@@ -115,7 +108,10 @@ pub(crate) fn process_blocks(
let target_len = indexer.vecs.transactions.first_txindex.len();
let current_len = txindex_to_height.len();
if current_len < target_len {
debug!("extending txindex_to_height RangeMap from {} to {}", current_len, target_len);
debug!(
"extending txindex_to_height RangeMap from {} to {}",
current_len, target_len
);
let new_entries: Vec<TxIndex> = indexer
.vecs
.transactions
@@ -125,10 +121,16 @@ pub(crate) fn process_blocks(
txindex_to_height.push(first_txindex);
}
} else if current_len > target_len {
debug!("truncating txindex_to_height RangeMap from {} to {}", current_len, target_len);
debug!(
"truncating txindex_to_height RangeMap from {} to {}",
current_len, target_len
);
txindex_to_height.truncate(target_len);
}
debug!("txindex_to_height RangeMap ready ({} entries)", txindex_to_height.len());
debug!(
"txindex_to_height RangeMap ready ({} entries)",
txindex_to_height.len()
);
// Create reusable iterators and buffers for per-block reads
let mut txout_iters = TxOutReaders::new(indexer);
@@ -395,7 +397,7 @@ pub(crate) fn process_blocks(
&mut vecs.address_cohorts,
&mut lookup,
block_price,
&ctx.price_range_max,
ctx.price_range_max,
&mut addr_counts,
&mut empty_addr_counts,
&mut activity_counts,
@@ -412,7 +414,7 @@ pub(crate) fn process_blocks(
vecs.utxo_cohorts
.receive(transacted, height, timestamp, block_price);
vecs.utxo_cohorts
.send(height_to_sent, chain_state, &ctx.price_range_max);
.send(height_to_sent, chain_state, ctx.price_range_max);
});
// Push to height-indexed vectors
@@ -468,7 +470,7 @@ pub(crate) fn process_blocks(
height,
timestamp,
block_price,
&ctx.price_range_max,
ctx.price_range_max,
)?;
}

View File

@@ -2,150 +2,123 @@ use std::time::Instant;
use brk_types::{Cents, Height, Timestamp};
use tracing::debug;
use vecdb::{ReadableVec, VecIndex};
use crate::{blocks, prices};
use vecdb::VecIndex;
/// Sparse table for O(1) range maximum queries on prices.
/// Uses O(n log n) space (~140MB for 880k blocks).
/// Vec<Vec> per level for incremental O(new_blocks * log n) extension.
#[derive(Debug, Clone, Default)]
pub struct PriceRangeMax {
/// Flattened table: table[k * n + i] = max of 2^k elements starting at index i
/// Using flat layout for better cache locality.
table: Vec<Cents>,
/// Number of elements
levels: Vec<Vec<Cents>>,
n: usize,
}
impl PriceRangeMax {
/// Build sparse table from high prices. O(n log n) time and space.
pub(crate) fn build(prices: &[Cents]) -> Self {
let start = Instant::now();
let n = prices.len();
if n == 0 {
return Self {
table: vec![],
n: 0,
};
pub(crate) fn extend(&mut self, prices: &[Cents]) {
let new_n = prices.len();
if new_n <= self.n || new_n == 0 {
return;
}
// levels = floor(log2(n)) + 1
let levels = (usize::BITS - n.leading_zeros()) as usize;
let start = Instant::now();
let old_n = self.n;
let new_levels_count = (usize::BITS - new_n.leading_zeros()) as usize;
// Allocate flat table: levels * n elements
let mut table = vec![Cents::ZERO; levels * n];
while self.levels.len() < new_levels_count {
self.levels.push(Vec::new());
}
// Base case: level 0 = original prices
table[..n].copy_from_slice(prices);
self.levels[0].extend_from_slice(&prices[old_n..new_n]);
// Build each level from the previous
// table[k][i] = max(table[k-1][i], table[k-1][i + 2^(k-1)])
for k in 1..levels {
let prev_offset = (k - 1) * n;
let curr_offset = k * n;
for k in 1..new_levels_count {
let half = 1 << (k - 1);
let end = n.saturating_sub(1 << k) + 1;
let new_end = if new_n >= (1 << k) {
new_n + 1 - (1 << k)
} else {
0
};
// Use split_at_mut to avoid bounds checks in the loop
let (prev_level, rest) = table.split_at_mut(curr_offset);
let prev = &prev_level[prev_offset..prev_offset + n];
let curr = &mut rest[..n];
for i in 0..end {
curr[i] = prev[i].max(prev[i + half]);
let old_end = self.levels[k].len();
if new_end > old_end {
let (prev_levels, curr_levels) = self.levels.split_at_mut(k);
let prev = &prev_levels[k - 1];
let curr = &mut curr_levels[0];
curr.reserve(new_end - old_end);
for i in old_end..new_end {
curr.push(prev[i].max(prev[i + half]));
}
}
}
self.n = new_n;
let elapsed = start.elapsed();
let total_entries: usize = self.levels.iter().map(|l| l.len()).sum();
debug!(
"PriceRangeMax built: {} heights, {} levels, {:.2}MB, {:.2}ms",
n,
levels,
(levels * n * std::mem::size_of::<Cents>()) as f64 / 1_000_000.0,
"PriceRangeMax extended: {} -> {} heights ({} new), {} levels, {:.2}MB, {:.2}ms",
old_n,
new_n,
new_n - old_n,
new_levels_count,
(total_entries * std::mem::size_of::<Cents>()) as f64 / 1_000_000.0,
elapsed.as_secs_f64() * 1000.0
);
Self { table, n }
}
/// Query maximum value in range [l, r] (inclusive). O(1) time.
pub(crate) fn truncate(&mut self, new_n: usize) {
if new_n >= self.n {
return;
}
if new_n == 0 {
self.levels.clear();
self.n = 0;
return;
}
let new_levels_count = (usize::BITS - new_n.leading_zeros()) as usize;
self.levels.truncate(new_levels_count);
for k in 0..new_levels_count {
let valid = if new_n >= (1 << k) {
new_n + 1 - (1 << k)
} else {
0
};
self.levels[k].truncate(valid);
}
self.n = new_n;
}
#[inline]
pub(crate) fn range_max(&self, l: usize, r: usize) -> Cents {
debug_assert!(l <= r && r < self.n);
let len = r - l + 1;
// k = floor(log2(len))
let k = (usize::BITS - len.leading_zeros() - 1) as usize;
let half = 1 << k;
// max of [l, l + 2^k) and [r - 2^k + 1, r + 1)
let offset = k * self.n;
let level = &self.levels[k];
unsafe {
let a = *self.table.get_unchecked(offset + l);
let b = *self.table.get_unchecked(offset + r + 1 - half);
let a = *level.get_unchecked(l);
let b = *level.get_unchecked(r + 1 - half);
a.max(b)
}
}
/// Query maximum value in height range. O(1) time.
#[inline]
pub(crate) fn max_between(&self, from: Height, to: Height) -> Cents {
self.range_max(from.to_usize(), to.to_usize())
}
}
/// Context shared across block processing.
pub struct ComputeContext {
/// Starting height for this computation run
pub struct ComputeContext<'a> {
pub starting_height: Height,
/// Last height to process
pub last_height: Height,
/// Pre-computed height -> timestamp mapping
pub height_to_timestamp: Vec<Timestamp>,
/// Pre-computed height -> price mapping
pub height_to_price: Vec<Cents>,
/// Sparse table for O(1) range max queries on high prices.
/// Used for computing max price during UTXO holding periods (peak regret).
pub price_range_max: PriceRangeMax,
pub height_to_timestamp: &'a [Timestamp],
pub height_to_price: &'a [Cents],
pub price_range_max: &'a PriceRangeMax,
}
impl ComputeContext {
/// Create a new computation context.
pub(crate) fn new(
starting_height: Height,
last_height: Height,
blocks: &blocks::Vecs,
prices: &prices::Vecs,
) -> Self {
let height_to_timestamp: Vec<Timestamp> =
blocks.time.timestamp_monotonic.collect();
let height_to_price: Vec<Cents> =
prices.price.cents.height.collect();
// Build sparse table for O(1) range max queries on prices
// Used for computing peak price during UTXO holding periods (peak regret)
let price_range_max = PriceRangeMax::build(&height_to_price);
Self {
starting_height,
last_height,
height_to_timestamp,
height_to_price,
price_range_max,
}
}
/// Get price at height.
impl<'a> ComputeContext<'a> {
pub(crate) fn price_at(&self, height: Height) -> Cents {
self.height_to_price[height.to_usize()]
}
/// Get timestamp at height.
pub(crate) fn timestamp_at(&self, height: Height) -> Timestamp {
self.height_to_timestamp[height.to_usize()]
}

View File

@@ -34,16 +34,6 @@ impl<I: Default + Copy, V: Default + Copy> Default for RangeMap<I, V> {
}
impl<I: Ord + Copy + Default, V: From<usize> + Copy + Default> RangeMap<I, V> {
/// Create with pre-allocated capacity.
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self {
first_indexes: Vec::with_capacity(capacity),
cache: [(I::default(), I::default(), V::default(), 0); CACHE_SIZE],
cache_len: 0,
_phantom: PhantomData,
}
}
/// Number of ranges stored.
pub(crate) fn len(&self) -> usize {
self.first_indexes.len()

View File

@@ -4,8 +4,8 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{
Day1, EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex, Height,
SupplyState, TxIndex, Version,
Cents, Day1, EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex,
Height, SupplyState, Timestamp, TxIndex, Version,
};
use tracing::{debug, info};
use vecdb::{
@@ -16,7 +16,10 @@ use vecdb::{
use crate::{
ComputeIndexes, blocks,
distribution::{
compute::{StartMode, determine_start_mode, process_blocks, recover_state, reset_state},
compute::{
PriceRangeMax, StartMode, determine_start_mode, process_blocks, recover_state,
reset_state,
},
state::BlockState,
},
indexes, inputs, outputs, prices, transactions,
@@ -69,6 +72,16 @@ pub struct Vecs<M: StorageMode = Rw> {
/// In-memory txindex→height reverse lookup. Kept across compute() calls.
#[traversable(skip)]
txindex_to_height: RangeMap<TxIndex, Height>,
/// Cached height→price mapping. Incrementally extended, O(new_blocks) on resume.
#[traversable(skip)]
cached_prices: Vec<Cents>,
/// Cached height→timestamp mapping. Incrementally extended, O(new_blocks) on resume.
#[traversable(skip)]
cached_timestamps: Vec<Timestamp>,
/// Cached sparse table for O(1) range-max price queries. Incrementally extended.
#[traversable(skip)]
cached_price_range_max: PriceRangeMax,
}
const SAVED_STAMPED_CHANGES: u16 = 10;
@@ -159,6 +172,10 @@ impl Vecs {
chain_state: Vec::new(),
txindex_to_height: RangeMap::default(),
cached_prices: Vec::new(),
cached_timestamps: Vec::new(),
cached_price_range_max: PriceRangeMax::default(),
db,
states_path,
};
@@ -194,6 +211,32 @@ impl Vecs {
starting_indexes: &mut ComputeIndexes,
exit: &Exit,
) -> Result<()> {
let cache_target_len = prices
.price
.cents
.height
.len()
.min(blocks.time.timestamp_monotonic.len());
let cache_current_len = self.cached_prices.len();
if cache_target_len < cache_current_len {
self.cached_prices.truncate(cache_target_len);
self.cached_timestamps.truncate(cache_target_len);
self.cached_price_range_max.truncate(cache_target_len);
} else if cache_target_len > cache_current_len {
let new_prices = prices
.price
.cents
.height
.collect_range_at(cache_current_len, cache_target_len);
let new_timestamps = blocks
.time
.timestamp_monotonic
.collect_range_at(cache_current_len, cache_target_len);
self.cached_prices.extend(new_prices);
self.cached_timestamps.extend(new_timestamps);
}
self.cached_price_range_max.extend(&self.cached_prices);
// 1. Find minimum height we have data for across stateful vecs
let current_height = Height::from(self.supply_state.len());
let min_stateful = self.min_stateful_height_len();
@@ -268,15 +311,9 @@ impl Vecs {
debug!("reusing in-memory chain_state ({} entries)", chain_state.len());
recovered_height
} else {
// Rollback or first run after restart: rebuild from supply_state
debug!("rebuilding chain_state from stored values");
let height_to_timestamp = &blocks.time.timestamp_monotonic;
let height_to_price = &prices.price.cents.height;
let end = usize::from(recovered_height);
let timestamp_data: Vec<_> = height_to_timestamp.collect_range_at(0, end);
let price_data: Vec<_> = height_to_price.collect_range_at(0, end);
debug!("building supply_state vec for {} heights", recovered_height);
let supply_state_data: Vec<_> = self.supply_state.collect_range_at(0, end);
chain_state = supply_state_data
@@ -284,8 +321,8 @@ impl Vecs {
.enumerate()
.map(|(h, supply)| BlockState {
supply,
price: price_data[h],
timestamp: timestamp_data[h],
price: self.cached_prices[h],
timestamp: self.cached_timestamps[h],
})
.collect();
debug!("chain_state rebuilt");
@@ -329,6 +366,12 @@ impl Vecs {
// 4. Process blocks
if starting_height <= last_height {
debug!("calling process_blocks");
let cached_prices = std::mem::take(&mut self.cached_prices);
let cached_timestamps = std::mem::take(&mut self.cached_timestamps);
let cached_price_range_max =
std::mem::take(&mut self.cached_price_range_max);
process_blocks(
self,
indexer,
@@ -337,13 +380,19 @@ impl Vecs {
outputs,
transactions,
blocks,
prices,
starting_height,
last_height,
&mut chain_state,
&mut txindex_to_height,
&cached_prices,
&cached_timestamps,
&cached_price_range_max,
exit,
)?;
self.cached_prices = cached_prices;
self.cached_timestamps = cached_timestamps;
self.cached_price_range_max = cached_price_range_max;
}
// Put chain_state and txindex_to_height back