computer: stateful snapshot

This commit is contained in:
nym21
2025-12-17 14:22:31 +01:00
parent a006cefd71
commit 9b2f334130
26 changed files with 488 additions and 278 deletions
Generated
+1
View File
@@ -601,6 +601,7 @@ dependencies = [
"clap",
"color-eyre",
"log",
"mimalloc",
"minreq",
"serde",
"tokio",
+1
View File
@@ -65,6 +65,7 @@ fjall = "3.0.0-rc.6"
# fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "434979ef59d8fd2b36b91e6ff759a36d19a397ee", package = "fjall" }
jiff = "0.2.16"
log = "0.4.29"
mimalloc = { version = "0.1.48", features = ["v3"] }
minreq = { version = "2.14.1", features = ["https", "serde_json"] }
parking_lot = "0.12.5"
rayon = "1.11.0"
+3 -2
View File
@@ -16,20 +16,21 @@ brk_error = { workspace = true }
brk_fetcher = { workspace = true }
brk_indexer = { workspace = true }
brk_iterator = { workspace = true }
brk_logger = { workspace = true }
brk_mempool = { workspace = true }
brk_query = { workspace = true }
brk_logger = { workspace = true }
brk_reader = { workspace = true }
brk_rpc = { workspace = true }
brk_server = { workspace = true }
vecdb = { workspace = true }
clap = { version = "4.5.53", features = ["derive", "string"] }
color-eyre = { workspace = true }
log = { workspace = true }
mimalloc = { workspace = true }
minreq = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
toml = "0.9.8"
vecdb = { workspace = true }
zip = { version = "6.0.0", default-features = false, features = ["deflate"] }
[[bin]]
+4
View File
@@ -19,6 +19,7 @@ use brk_query::AsyncQuery;
use brk_reader::Reader;
use brk_server::{Server, VERSION};
use log::info;
use mimalloc::MiMalloc;
use vecdb::Exit;
mod config;
@@ -27,6 +28,9 @@ mod website;
use crate::{config::Config, paths::*};
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
pub fn main() -> color_eyre::Result<()> {
// Can't increase main thread's stack size, thus we need to use another thread
thread::Builder::new()
+1
View File
@@ -0,0 +1 @@
bottlenecks.md
@@ -105,6 +105,16 @@ impl PricePercentiles {
}
Ok(())
}
/// Validate computed versions or reset if mismatched.
pub fn validate_computed_version_or_reset(&mut self, version: Version) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {
if let Some(height_vec) = vec.height.as_mut() {
height_vec.validate_computed_version_or_reset(version)?;
}
}
Ok(())
}
}
impl Traversable for PricePercentiles {
+70 -70
View File
@@ -182,84 +182,84 @@ impl Computer {
info!("Computed prices in {:?}", i.elapsed());
}
thread::scope(|scope| -> Result<()> {
let blks = scope.spawn(|| -> Result<()> {
info!("Computing BLKs metadata...");
let i = Instant::now();
self.blks
.compute(indexer, &starting_indexes, reader, exit)?;
info!("Computed blk in {:?}", i.elapsed());
Ok(())
});
// thread::scope(|scope| -> Result<()> {
// let blks = scope.spawn(|| -> Result<()> {
info!("Computing BLKs metadata...");
let i = Instant::now();
self.blks
.compute(indexer, &starting_indexes, reader, exit)?;
info!("Computed blk in {:?}", i.elapsed());
// Ok(())
// });
let constants = scope.spawn(|| -> Result<()> {
info!("Computing constants...");
let i = Instant::now();
self.constants
.compute(&self.indexes, &starting_indexes, exit)?;
info!("Computed constants in {:?}", i.elapsed());
Ok(())
});
// let constants = scope.spawn(|| -> Result<()> {
info!("Computing constants...");
let i = Instant::now();
self.constants
.compute(&self.indexes, &starting_indexes, exit)?;
info!("Computed constants in {:?}", i.elapsed());
// Ok(())
// });
let chain = scope.spawn(|| -> Result<()> {
info!("Computing chain...");
let i = Instant::now();
self.chain.compute(
indexer,
&self.indexes,
&starting_indexes,
self.price.as_ref(),
exit,
)?;
info!("Computed chain in {:?}", i.elapsed());
Ok(())
});
// let chain = scope.spawn(|| -> Result<()> {
info!("Computing chain...");
let i = Instant::now();
self.chain.compute(
indexer,
&self.indexes,
&starting_indexes,
self.price.as_ref(),
exit,
)?;
info!("Computed chain in {:?}", i.elapsed());
// Ok(())
// });
if let Some(price) = self.price.as_ref() {
info!("Computing market...");
let i = Instant::now();
self.market.compute(price, &starting_indexes, exit)?;
info!("Computed market in {:?}", i.elapsed());
}
if let Some(price) = self.price.as_ref() {
info!("Computing market...");
let i = Instant::now();
self.market.compute(price, &starting_indexes, exit)?;
info!("Computed market in {:?}", i.elapsed());
}
blks.join().unwrap()?;
constants.join().unwrap()?;
chain.join().unwrap()?;
Ok(())
})?;
// blks.join().unwrap()?;
// constants.join().unwrap()?;
// chain.join().unwrap()?;
// Ok(())
// })?;
let starting_indexes_clone = starting_indexes.clone();
thread::scope(|scope| -> Result<()> {
let pools = scope.spawn(|| -> Result<()> {
info!("Computing pools...");
let i = Instant::now();
self.pools.compute(
indexer,
&self.indexes,
&starting_indexes_clone,
&self.chain,
self.price.as_ref(),
exit,
)?;
info!("Computed pools in {:?}", i.elapsed());
Ok(())
});
// thread::scope(|scope| -> Result<()> {
// let pools = scope.spawn(|| -> Result<()> {
info!("Computing pools...");
let i = Instant::now();
self.pools.compute(
indexer,
&self.indexes,
&starting_indexes_clone,
&self.chain,
self.price.as_ref(),
exit,
)?;
info!("Computed pools in {:?}", i.elapsed());
// Ok(())
// });
info!("Computing stateful...");
let i = Instant::now();
self.stateful.compute(
indexer,
&self.indexes,
&self.chain,
self.price.as_ref(),
&mut starting_indexes,
exit,
)?;
info!("Computed stateful in {:?}", i.elapsed());
info!("Computing stateful...");
let i = Instant::now();
self.stateful.compute(
indexer,
&self.indexes,
&self.chain,
self.price.as_ref(),
&mut starting_indexes,
exit,
)?;
info!("Computed stateful in {:?}", i.elapsed());
pools.join().unwrap()?;
Ok(())
})?;
// pools.join().unwrap()?;
// Ok(())
// })?;
info!("Computing cointime...");
let i = Instant::now();
@@ -97,6 +97,19 @@ impl AddressTypeToHeightToAddressCount {
.truncate_push(height, addresstype_to_usize.p2a.into())?;
Ok(())
}
pub fn reset(&mut self) -> Result<()> {
use vecdb::GenericStoredVec;
self.p2pk65.reset()?;
self.p2pk33.reset()?;
self.p2pkh.reset()?;
self.p2sh.reset()?;
self.p2wpkh.reset()?;
self.p2wsh.reset()?;
self.p2tr.reset()?;
self.p2a.reset()?;
Ok(())
}
}
/// Address count per address type, indexed by various indexes (dateindex, etc.).
@@ -128,24 +128,52 @@ impl DynCohortVecs for AddressCohortVecs {
fn reset_state_starting_height(&mut self) {
self.reset_starting_height();
if let Some(state) = self.state.as_mut() {
state.reset();
}
}
fn import_state(&mut self, starting_height: Height) -> Result<Height> {
use vecdb::GenericStoredVec;
// Import state from runtime state if present
if let Some(state) = self.state.as_mut() {
let imported = state.inner.import_at_or_before(starting_height)?;
self.starting_height = Some(imported);
// State files are saved AT height H, so to resume at H+1 we need to import at H
// Decrement first, then increment result to match expected starting_height
if let Some(mut prev_height) = starting_height.decremented() {
// Import price_to_amount state file (may adjust prev_height to actual file found)
prev_height = state.inner.import_at_or_before(prev_height)?;
// Restore addr_count from last known value
if let Some(prev_height) = imported.decremented() {
use vecdb::TypedVecIterator;
state.addr_count = *self
.height_to_addr_count
.into_iter()
.get_unwrap(prev_height);
// Restore supply state from height-indexed vectors
state.inner.supply.value = self
.metrics
.supply
.height_to_supply
.read_once(prev_height)?;
state.inner.supply.utxo_count = *self
.metrics
.supply
.height_to_utxo_count
.read_once(prev_height)?;
state.addr_count = *self.height_to_addr_count.read_once(prev_height)?;
// Restore realized cap if present
if let Some(realized_metrics) = self.metrics.realized.as_mut()
&& let Some(realized_state) = state.inner.realized.as_mut()
{
realized_state.cap = realized_metrics
.height_to_realized_cap
.read_once(prev_height)?;
}
let result = prev_height.incremented();
self.starting_height = Some(result);
Ok(result)
} else {
// starting_height is 0, nothing to import
self.starting_height = Some(Height::ZERO);
Ok(Height::ZERO)
}
Ok(imported)
} else {
self.starting_height = Some(starting_height);
Ok(starting_height)
@@ -236,14 +236,12 @@ impl AddressCohorts {
.unwrap_or_default()
}
/// Import state for all separate cohorts at given height.
///
/// Note: This follows the same pattern as UTXOCohorts - errors are ignored
/// and the start_mode logic ensures we're in a valid state before calling.
pub fn import_separate_states(&mut self, height: Height) {
self.par_iter_separate_mut().for_each(|v| {
let _ = v.import_state(height);
});
/// Import state for all separate cohorts at or before given height.
/// Returns true if all imports succeeded and returned the expected height.
pub fn import_separate_states(&mut self, height: Height) -> bool {
self.par_iter_separate_mut()
.map(|v| v.import_state(height).unwrap_or_default())
.all(|h| h == height)
}
/// Reset state heights for all separate cohorts.
@@ -21,6 +21,18 @@ impl AddressCohortState {
}
}
/// Reset state for fresh start.
pub fn reset(&mut self) {
self.addr_count = 0;
self.inner.supply = crate::SupplyState::default();
self.inner.sent = Sats::ZERO;
self.inner.satblocks_destroyed = Sats::ZERO;
self.inner.satdays_destroyed = Sats::ZERO;
if let Some(realized) = self.inner.realized.as_mut() {
*realized = crate::RealizedState::NAN;
}
}
pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> {
self.inner.reset_price_to_amount_if_needed()
}
@@ -1,9 +1,11 @@
use std::path::Path;
use brk_error::Result;
use brk_types::Sats;
use derive_deref::{Deref, DerefMut};
use super::CohortState;
use crate::{RealizedState, SupplyState};
#[derive(Clone, Deref, DerefMut)]
pub struct UTXOCohortState(CohortState);
@@ -16,4 +18,15 @@ impl UTXOCohortState {
pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> {
self.0.reset_price_to_amount_if_needed()
}
/// Reset state for fresh start.
pub fn reset(&mut self) {
self.0.supply = SupplyState::default();
self.0.sent = Sats::ZERO;
self.0.satblocks_destroyed = Sats::ZERO;
self.0.satdays_destroyed = Sats::ZERO;
if let Some(realized) = self.0.realized.as_mut() {
*realized = RealizedState::NAN;
}
}
}
@@ -92,9 +92,12 @@ impl UTXOCohortVecs {
self.state_starting_height = Some(height);
}
/// Reset state starting height to zero.
/// Reset state starting height to zero and reset state values.
pub fn reset_state_starting_height(&mut self) {
self.state_starting_height = Some(Height::ZERO);
if let Some(state) = self.state.as_mut() {
state.reset();
}
}
/// Compute percentile prices from standalone price_to_amount.
@@ -150,14 +153,40 @@ impl DynCohortVecs for UTXOCohortVecs {
fn reset_state_starting_height(&mut self) {
self.state_starting_height = Some(Height::ZERO);
if let Some(state) = self.state.as_mut() {
state.reset();
}
}
fn import_state(&mut self, starting_height: Height) -> Result<Height> {
use vecdb::GenericStoredVec;
// Import state from runtime state if present
if let Some(state) = self.state.as_mut() {
let imported = state.import_at_or_before(starting_height)?;
self.state_starting_height = Some(imported);
Ok(imported)
// State files are saved AT height H, so to resume at H+1 we need to import at H
// Decrement first, then increment result to match expected starting_height
if let Some(mut prev_height) = starting_height.decremented() {
// Import price_to_amount state file (may adjust prev_height to actual file found)
prev_height = state.import_at_or_before(prev_height)?;
// Restore supply state from height-indexed vectors
state.supply.value = self.metrics.supply.height_to_supply.read_once(prev_height)?;
state.supply.utxo_count = *self.metrics.supply.height_to_utxo_count.read_once(prev_height)?;
// Restore realized cap if present
if let Some(realized_metrics) = self.metrics.realized.as_mut()
&& let Some(realized_state) = state.realized.as_mut() {
realized_state.cap = realized_metrics.height_to_realized_cap.read_once(prev_height)?;
}
let result = prev_height.incremented();
self.state_starting_height = Some(result);
Ok(result)
} else {
// starting_height is 0, nothing to import
self.state_starting_height = Some(Height::ZERO);
Ok(Height::ZERO)
}
} else {
self.state_starting_height = Some(starting_height);
Ok(starting_height)
@@ -344,14 +344,18 @@ impl UTXOCohorts {
})
}
/// Flush stateful vectors for separate cohorts.
/// Flush stateful vectors for separate and aggregate cohorts.
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
// Flush separate cohorts (includes metrics + state)
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?;
self.0
.par_iter_aggregate_mut()
.try_for_each(|v| v.price_to_amount.flush_at_height(height, exit))
// Flush aggregate cohorts' price_to_amount state AND metrics (including price_percentiles)
for v in self.0.iter_aggregate_mut() {
v.price_to_amount.flush_at_height(height, exit)?;
v.metrics.safe_flush(exit)?;
}
Ok(())
}
/// Reset aggregate cohorts' price_to_amount for fresh start.
@@ -382,11 +386,12 @@ impl UTXOCohorts {
.unwrap_or_default()
}
/// Import state for all separate cohorts at given height.
pub fn import_separate_states(&mut self, height: Height) {
self.par_iter_separate_mut().for_each(|v| {
let _ = v.import_state(height);
});
/// Import state for all separate cohorts at or before given height.
/// Returns true if all imports succeeded and returned the expected height.
pub fn import_separate_states(&mut self, height: Height) -> bool {
self.par_iter_separate_mut()
.map(|v| v.import_state(height).unwrap_or_default())
.all(|h| h == height)
}
/// Reset state heights for all separate cohorts.
@@ -463,9 +468,17 @@ impl UTXOCohorts {
Ok(())
}
/// Validate computed versions for all separate cohorts.
/// Validate computed versions for all cohorts (separate and aggregate).
pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
// Validate separate cohorts
self.par_iter_separate_mut()
.try_for_each(|v| v.validate_computed_versions(base_version))
.try_for_each(|v| v.validate_computed_versions(base_version))?;
// Validate aggregate cohorts' price_percentiles
for v in self.0.iter_aggregate_mut() {
v.validate_computed_versions(base_version)?;
}
Ok(())
}
}
@@ -69,8 +69,8 @@ impl UTXOCohorts {
last_timestamp.difference_in_days_between_float(block_state.timestamp);
let older_than_hour = last_timestamp
.checked_sub(block_state.timestamp)
.map(|d| d.is_more_than_hour())
.unwrap_or(false);
.unwrap()
.is_more_than_hour();
// Update time-based cohorts
time_cohorts
@@ -6,7 +6,7 @@
use brk_grouper::{Filter, Filtered, UTXOGroups};
use brk_types::{ONE_DAY_IN_SEC, Sats, Timestamp};
use crate::{states::BlockState, utils::OptionExt, PriceToAmount};
use crate::{PriceToAmount, states::BlockState, utils::OptionExt};
use super::UTXOCohorts;
@@ -66,6 +66,8 @@ pub fn process_blocks(
ctx.price.is_some()
);
info!("Setting up references...");
// References to vectors using correct field paths
// From indexer.vecs:
let height_to_first_txindex = &indexer.vecs.tx.height_to_first_txindex;
@@ -99,6 +101,8 @@ pub fn process_blocks(
let height_to_price_vec = &ctx.height_to_price;
let height_to_timestamp_vec = &ctx.height_to_timestamp;
info!("Creating iterators...");
// Create iterators for sequential access
let mut height_to_first_txindex_iter = height_to_first_txindex.into_iter();
let mut height_to_first_txoutindex_iter = height_to_first_txoutindex.into_iter();
@@ -116,13 +120,19 @@ pub fn process_blocks(
let mut height_to_price_iter = height_to_price.map(|v| v.into_iter());
let mut dateindex_to_price_iter = dateindex_to_price.map(|v| v.into_iter());
info!("Building txoutindex_to_height map...");
// Build txoutindex -> height map for input processing
let txoutindex_to_height = build_txoutindex_to_height_map(height_to_first_txoutindex);
info!("Creating readers...");
// Create readers for parallel data access
let ir = IndexerReaders::new(indexer);
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
info!("Creating address iterators...");
// Create iterators for first address indexes per type
let mut first_p2a_iter = indexer
.vecs
@@ -165,6 +175,8 @@ pub fn process_blocks(
.height_to_first_p2wshaddressindex
.into_iter();
info!("Recovering running totals...");
// Track running totals - recover from previous height if resuming
let (
mut unspendable_supply,
@@ -172,23 +184,29 @@ pub fn process_blocks(
mut addresstype_to_addr_count,
mut addresstype_to_empty_addr_count,
) = if starting_height > Height::ZERO {
info!("Reading unspendable_supply...");
let prev_height = starting_height.decremented().unwrap();
(
vecs.height_to_unspendable_supply
.into_iter()
.get_unwrap(prev_height),
vecs.height_to_opreturn_supply
.into_iter()
.get_unwrap(prev_height),
AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_addr_count,
starting_height,
)),
AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_empty_addr_count,
starting_height,
)),
)
let unspendable = vecs
.height_to_unspendable_supply
.into_iter()
.get_unwrap(prev_height);
info!("Reading opreturn_supply...");
let opreturn = vecs
.height_to_opreturn_supply
.into_iter()
.get_unwrap(prev_height);
info!("Reading addresstype_to_addr_count...");
let addr_count = AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_addr_count,
starting_height,
));
info!("Reading addresstype_to_empty_addr_count...");
let empty_addr_count = AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_empty_addr_count,
starting_height,
));
info!("Recovery complete.");
(unspendable, opreturn, addr_count, empty_addr_count)
} else {
(
Sats::ZERO,
@@ -204,13 +222,13 @@ pub fn process_blocks(
let mut empty_cache: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource> =
AddressTypeToTypeIndexMap::default();
info!("Starting main block iteration...");
// Main block iteration
for height in starting_height.to_usize()..=last_height.to_usize() {
let height = Height::from(height);
if height.to_usize() % 10000 == 0 {
info!("Processing chain at {}...", height);
}
info!("Processing chain at {}...", height);
// Get block metadata
let first_txindex = height_to_first_txindex_iter.get_unwrap(height);
@@ -452,7 +470,14 @@ pub fn process_blocks(
// Drop readers before flush to release mmap handles
drop(vr);
flush_checkpoint(vecs, height, &mut loaded_cache, &mut empty_cache, exit)?;
flush_checkpoint(
vecs,
height,
chain_state,
&mut loaded_cache,
&mut empty_cache,
exit,
)?;
// Recreate readers after flush to pick up new data
vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
@@ -462,7 +487,14 @@ pub fn process_blocks(
// Final flush
let _lock = exit.lock();
drop(vr);
flush_checkpoint(vecs, last_height, &mut loaded_cache, &mut empty_cache, exit)?;
flush_checkpoint(
vecs,
last_height,
chain_state,
&mut loaded_cache,
&mut empty_cache,
exit,
)?;
Ok(())
}
@@ -491,23 +523,15 @@ fn push_cohort_states(
dateindex: Option<DateIndex>,
date_price: Option<Option<brk_types::Dollars>>,
) -> Result<()> {
utxo_cohorts
.par_iter_separate_mut()
.map(|v| v as &mut dyn DynCohortVecs)
.chain(
address_cohorts
.par_iter_separate_mut()
.map(|v| v as &mut dyn DynCohortVecs),
)
.try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(
height,
height_price,
dateindex,
date_price,
)
})?;
utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price)
})?;
address_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price)
})?;
Ok(())
}
@@ -518,11 +542,12 @@ fn push_cohort_states(
/// - Cohort stateful vectors
/// - Height-indexed vectors
/// - Address data caches (loaded and empty)
/// - Chain state
/// - Chain state (synced from in-memory to persisted)
#[allow(clippy::too_many_arguments)]
fn flush_checkpoint(
vecs: &mut Vecs,
height: Height,
chain_state: &[BlockState],
loaded_cache: &mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
exit: &Exit,
@@ -564,9 +589,15 @@ fn flush_checkpoint(
exit,
)?;
// Flush chain state and txoutindex_to_txinindex with stamp
// Flush txoutindex_to_txinindex with stamp
vecs.txoutindex_to_txinindex
.stamped_flush_with_changes(height.into())?;
// Sync in-memory chain_state to persisted and flush
vecs.chain_state.truncate_if_needed(Height::ZERO)?;
for block_state in chain_state {
vecs.chain_state.push(block_state.supply.clone());
}
vecs.chain_state.stamped_flush_with_changes(height.into())?;
Ok(())
@@ -74,40 +74,32 @@ impl VecsReaders {
pub fn build_txoutindex_to_txindex<'a>(
block_first_txindex: TxIndex,
block_tx_count: u64,
txindex_to_output_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
txindex_to_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
) -> Vec<TxIndex> {
let first = block_first_txindex.to_usize();
let counts: Vec<u64> = (0..block_tx_count as usize)
.map(|offset| {
let txindex = TxIndex::from(first + offset);
u64::from(txindex_to_output_count.get_unwrap(txindex))
})
.collect();
let total: u64 = counts.iter().sum();
let mut result = Vec::with_capacity(total as usize);
for (offset, &count) in counts.iter().enumerate() {
let txindex = TxIndex::from(first + offset);
result.extend(std::iter::repeat_n(txindex, count as usize));
}
result
build_index_to_txindex(block_first_txindex, block_tx_count, txindex_to_count)
}
/// Build txinindex -> txindex mapping for a block.
pub fn build_txinindex_to_txindex<'a>(
block_first_txindex: TxIndex,
block_tx_count: u64,
txindex_to_input_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
txindex_to_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
) -> Vec<TxIndex> {
build_index_to_txindex(block_first_txindex, block_tx_count, txindex_to_count)
}
/// Build index -> txindex mapping for a block (shared implementation).
fn build_index_to_txindex<'a>(
block_first_txindex: TxIndex,
block_tx_count: u64,
txindex_to_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
) -> Vec<TxIndex> {
let first = block_first_txindex.to_usize();
let counts: Vec<u64> = (0..block_tx_count as usize)
.map(|offset| {
let txindex = TxIndex::from(first + offset);
u64::from(txindex_to_input_count.get_unwrap(txindex))
u64::from(txindex_to_count.get_unwrap(txindex))
})
.collect();
@@ -15,18 +15,19 @@ use super::super::AddressesDataVecs;
/// Result of state recovery.
pub struct RecoveredState {
/// Height to start processing from.
/// Height to start processing from. Zero means fresh start.
pub starting_height: Height,
/// Whether state was successfully restored (vs starting fresh).
pub restored: bool,
}
/// Perform state recovery for resuming from checkpoint.
///
/// Rolls back state vectors and imports cohort states.
/// Returns the recovered state information.
/// Validates that all rollbacks and imports are consistent.
/// Returns Height::ZERO if any validation fails (triggers fresh start).
pub fn recover_state(
height: Height,
chain_state_rollback: vecdb::Result<Stamp>,
txoutindex_rollback: vecdb::Result<Stamp>,
any_address_indexes: &mut AnyAddressIndexesVecs,
addresses_data: &mut AddressesDataVecs,
utxo_cohorts: &mut UTXOCohorts,
@@ -38,24 +39,45 @@ pub fn recover_state(
let address_indexes_rollback = any_address_indexes.rollback_before(stamp);
let address_data_rollback = addresses_data.rollback_before(stamp);
// Verify rollback consistency (uses rollback_states helper)
let _consistent_height = rollback_states(
stamp,
Ok(stamp), // chain_state handled separately
// Verify rollback consistency - all must agree on the same height
let consistent_height = rollback_states(
chain_state_rollback,
txoutindex_rollback,
address_indexes_rollback,
address_data_rollback,
);
// Import cohort states
utxo_cohorts.import_separate_states(height);
address_cohorts.import_separate_states(height);
// If rollbacks are inconsistent, start fresh
if consistent_height.is_zero() {
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
// Import aggregate price_to_amount
let _ = import_aggregate_price_to_amount(height, utxo_cohorts)?;
// Import UTXO cohort states - all must succeed
if !utxo_cohorts.import_separate_states(height) {
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
// Import address cohort states - all must succeed
if !address_cohorts.import_separate_states(height) {
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
// Import aggregate price_to_amount - must match height
let imported = import_aggregate_price_to_amount(height, utxo_cohorts)?;
if imported != height {
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
Ok(RecoveredState {
starting_height: height,
restored: true,
})
}
@@ -85,12 +107,16 @@ pub fn reset_state(
Ok(RecoveredState {
starting_height: Height::ZERO,
restored: false,
})
}
/// Check if we can resume from a checkpoint or need to start fresh.
pub fn determine_start_mode(computed_min: Height, chain_state_height: Height) -> StartMode {
// No data to resume from
if chain_state_height.is_zero() {
return StartMode::Fresh;
}
match computed_min.cmp(&chain_state_height) {
Ordering::Greater => unreachable!("min height > chain state height"),
Ordering::Equal => StartMode::Resume(chain_state_height),
@@ -108,32 +134,42 @@ pub enum StartMode {
/// Rollback state vectors to before a given stamp.
///
/// Returns the consistent starting height if all vectors agree,
/// Returns the consistent starting height if ALL rollbacks succeed and agree,
/// otherwise returns Height::ZERO (need fresh start).
fn rollback_states(
_stamp: Stamp,
chain_state_rollback: vecdb::Result<Stamp>,
txoutindex_rollback: vecdb::Result<Stamp>,
address_indexes_rollbacks: Result<Vec<Stamp>>,
address_data_rollbacks: Result<[Stamp; 2]>,
) -> Height {
let mut heights: BTreeSet<Height> = BTreeSet::new();
if let Ok(s) = chain_state_rollback {
// All rollbacks must succeed - any error means fresh start
let Ok(s) = chain_state_rollback else {
return Height::ZERO;
};
heights.insert(Height::from(s).incremented());
let Ok(s) = txoutindex_rollback else {
return Height::ZERO;
};
heights.insert(Height::from(s).incremented());
let Ok(stamps) = address_indexes_rollbacks else {
return Height::ZERO;
};
for s in stamps {
heights.insert(Height::from(s).incremented());
}
if let Ok(stamps) = address_indexes_rollbacks {
for s in stamps {
heights.insert(Height::from(s).incremented());
}
}
if let Ok(stamps) = address_data_rollbacks {
for s in stamps {
heights.insert(Height::from(s).incremented());
}
let Ok(stamps) = address_data_rollbacks else {
return Height::ZERO;
};
for s in stamps {
heights.insert(Height::from(s).incremented());
}
// All must agree on the same height
if heights.len() == 1 {
heights.pop_first().unwrap()
} else {
@@ -134,6 +134,10 @@ impl CohortMetrics {
realized.validate_computed_versions(base_version)?;
}
if let Some(price_paid) = self.price_paid.as_mut() {
price_paid.validate_computed_versions(base_version)?;
}
Ok(())
}
@@ -151,10 +155,8 @@ impl CohortMetrics {
self.price_paid.as_mut(),
height_price,
) {
// Push price paid min/max
price_paid.truncate_push_minmax(height, state)?;
// Compute unrealized states from price_to_amount
let (height_unrealized_state, date_unrealized_state) =
state.compute_unrealized_states(height_price, date_price.unwrap());
@@ -165,7 +167,6 @@ impl CohortMetrics {
date_unrealized_state.as_ref(),
)?;
// Compute and push price percentiles
price_paid.truncate_push_percentiles(height, state)?;
}
@@ -117,6 +117,14 @@ impl PricePaidMetrics {
Ok(())
}
/// Validate computed versions or reset if mismatched.
pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
if let Some(price_percentiles) = self.price_percentiles.as_mut() {
price_percentiles.validate_computed_version_or_reset(base_version)?;
}
Ok(())
}
/// Compute aggregate values from separate cohorts.
pub fn compute_from_stateful(
&mut self,
@@ -434,8 +434,12 @@ impl RealizedMetrics {
self.height_to_realized_loss.safe_write(exit)?;
self.height_to_value_created.safe_write(exit)?;
self.height_to_value_destroyed.safe_write(exit)?;
self.height_to_adjusted_value_created.um().safe_write(exit)?;
self.height_to_adjusted_value_destroyed.um().safe_write(exit)?;
if let Some(v) = self.height_to_adjusted_value_created.as_mut() {
v.safe_write(exit)?;
}
if let Some(v) = self.height_to_adjusted_value_destroyed.as_mut() {
v.safe_write(exit)?;
}
Ok(())
}
@@ -40,36 +40,44 @@ where
output_type: OutputType,
type_index: TypeIndex,
) -> (&mut LoadedAddressDataWithSource, bool, bool) {
let mut is_new = false;
let mut from_empty = false;
use std::collections::hash_map::Entry;
let addr_data = self
.loaded
.get_mut(output_type)
.unwrap()
.entry(type_index)
.or_insert_with(|| {
let map = self.loaded.get_mut(output_type).unwrap();
match map.entry(type_index) {
Entry::Occupied(entry) => {
// Entry already exists - check its source
let data = entry.into_mut();
let is_new = data.is_new();
let from_empty = data.is_from_emptyaddressdata();
(data, is_new, from_empty)
}
Entry::Vacant(entry) => {
// Check if it was in empty set
if let Some(empty_data) = self.empty.get_mut(output_type).unwrap().remove(&type_index) {
from_empty = true;
return empty_data.into();
if let Some(empty_data) =
self.empty.get_mut(output_type).unwrap().remove(&type_index)
{
let data = entry.insert(empty_data.into());
return (data, false, true);
}
// Look up from storage or create new
match (self.get_address_data)(output_type, type_index) {
Some(data) => {
is_new = data.is_new();
from_empty = data.is_from_emptyaddressdata();
data
let is_new = data.is_new();
let from_empty = data.is_from_emptyaddressdata();
let data = entry.insert(data);
(data, is_new, from_empty)
}
None => {
is_new = true;
WithAddressDataSource::New(LoadedAddressData::default())
let data = entry.insert(WithAddressDataSource::New(
LoadedAddressData::default(),
));
(data, true, false)
}
}
});
(addr_data, is_new, from_empty)
}
}
}
/// Get address data for a send operation (must exist).
@@ -48,8 +48,8 @@ where
let days_old = current_timestamp.difference_in_days_between_float(prev_timestamp);
let older_than_hour = current_timestamp
.checked_sub(prev_timestamp)
.map(|d| d.is_more_than_hour())
.unwrap_or(false);
.unwrap()
.is_more_than_hour();
for (output_type, vec) in by_type.unwrap().into_iter() {
for (type_index, value) in vec {
@@ -78,8 +78,10 @@ where
addr_data.send(value, prev_price)?;
if will_be_empty {
// Address becoming empty
debug_assert!(new_balance.is_zero());
// Address becoming empty - invariant check
if new_balance.is_not_zero() {
unreachable!()
}
*addr_count.get_mut(output_type).unwrap() -= 1;
*empty_addr_count.get_mut(output_type).unwrap() += 1;
+55 -51
View File
@@ -270,72 +270,76 @@ impl Vecs {
// 2. Determine start mode and recover/reset state
let start_mode = determine_start_mode(stateful_min, chain_state_height);
let (starting_height, mut chain_state) = match start_mode {
// Try to resume from checkpoint, fall back to fresh start if needed
let recovered_height = match start_mode {
StartMode::Resume(height) => {
let stamp = Stamp::from(height);
// Rollback BytesVec state (not handled by recover_state)
let _ = self.chain_state.rollback_before(stamp);
let _ = self.txoutindex_to_txinindex.rollback_before(stamp);
// Rollback BytesVec state and capture results for validation
let chain_state_rollback = self.chain_state.rollback_before(stamp);
let txoutindex_rollback = self.txoutindex_to_txinindex.rollback_before(stamp);
// Use recover_state for address and cohort state recovery
// Validate all rollbacks and imports are consistent
let recovered = recover_state(
height,
chain_state_rollback,
txoutindex_rollback,
&mut self.any_address_indexes,
&mut self.addresses_data,
&mut self.utxo_cohorts,
&mut self.address_cohorts,
)?;
// Recover chain_state from stored values
let chain_state = if !recovered.starting_height.is_zero() {
let height_to_timestamp = &indexes.height_to_timestamp_fixed;
let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height);
let mut height_to_timestamp_iter = height_to_timestamp.into_iter();
let mut height_to_price_iter = height_to_price.map(|v| v.into_iter());
let mut chain_state_iter = self.chain_state.into_iter();
(0..recovered.starting_height.to_usize())
.map(|h| {
let h = Height::from(h);
BlockState {
supply: chain_state_iter.get_unwrap(h),
price: height_to_price_iter.as_mut().map(|v| *v.get_unwrap(h)),
timestamp: height_to_timestamp_iter.get_unwrap(h),
}
})
.collect()
} else {
vec![]
};
info!(
"State recovery: {} at height {}",
if recovered.restored { "resumed from checkpoint" } else { "fresh start" },
recovered.starting_height
);
(recovered.starting_height, chain_state)
if recovered.starting_height.is_zero() {
info!("State recovery validation failed, falling back to fresh start");
}
recovered.starting_height
}
StartMode::Fresh => {
// Reset BytesVec state
self.txoutindex_to_txinindex.reset()?;
StartMode::Fresh => Height::ZERO,
};
// Use reset_state for cohort and address state reset
let recovered = reset_state(
&mut self.any_address_indexes,
&mut self.addresses_data,
&mut self.utxo_cohorts,
&mut self.address_cohorts,
)?;
// Fresh start: reset all state
let (starting_height, mut chain_state) = if recovered_height.is_zero() {
self.chain_state.reset()?;
self.txoutindex_to_txinindex.reset()?;
self.height_to_unspendable_supply.reset()?;
self.height_to_opreturn_supply.reset()?;
self.addresstype_to_height_to_addr_count.reset()?;
self.addresstype_to_height_to_empty_addr_count.reset()?;
reset_state(
&mut self.any_address_indexes,
&mut self.addresses_data,
&mut self.utxo_cohorts,
&mut self.address_cohorts,
)?;
info!(
"State recovery: {} at height {}",
if recovered.restored { "resumed from checkpoint" } else { "fresh start" },
recovered.starting_height
);
(recovered.starting_height, vec![])
}
info!("State recovery: fresh start");
(Height::ZERO, vec![])
} else {
// Recover chain_state from stored values
let height_to_timestamp = &indexes.height_to_timestamp_fixed;
let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height);
let mut height_to_timestamp_iter = height_to_timestamp.into_iter();
let mut height_to_price_iter = height_to_price.map(|v| v.into_iter());
let mut chain_state_iter = self.chain_state.into_iter();
let chain_state = (0..recovered_height.to_usize())
.map(|h| {
let h = Height::from(h);
BlockState {
supply: chain_state_iter.get_unwrap(h),
price: height_to_price_iter.as_mut().map(|v| *v.get_unwrap(h)),
timestamp: height_to_timestamp_iter.get_unwrap(h),
}
})
.collect();
info!(
"State recovery: resumed from checkpoint at height {}",
recovered_height
);
(recovered_height, chain_state)
};
// 2b. Validate computed versions
+1 -1
View File
@@ -29,4 +29,4 @@ vecdb = { workspace = true }
[dev-dependencies]
color-eyre = { workspace = true }
mimalloc = { version = "0.1.48", features = ["v3"] }
mimalloc = { workspace = true }