diff --git a/Cargo.lock b/Cargo.lock index 92731e493..f5efe8f31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -601,6 +601,7 @@ dependencies = [ "clap", "color-eyre", "log", + "mimalloc", "minreq", "serde", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 3a9c7eff4..bfe64148f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ fjall = "3.0.0-rc.6" # fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "434979ef59d8fd2b36b91e6ff759a36d19a397ee", package = "fjall" } jiff = "0.2.16" log = "0.4.29" +mimalloc = { version = "0.1.48", features = ["v3"] } minreq = { version = "2.14.1", features = ["https", "serde_json"] } parking_lot = "0.12.5" rayon = "1.11.0" diff --git a/crates/brk_cli/Cargo.toml b/crates/brk_cli/Cargo.toml index 3c7d96ee4..2de769fe6 100644 --- a/crates/brk_cli/Cargo.toml +++ b/crates/brk_cli/Cargo.toml @@ -16,20 +16,21 @@ brk_error = { workspace = true } brk_fetcher = { workspace = true } brk_indexer = { workspace = true } brk_iterator = { workspace = true } +brk_logger = { workspace = true } brk_mempool = { workspace = true } brk_query = { workspace = true } -brk_logger = { workspace = true } brk_reader = { workspace = true } brk_rpc = { workspace = true } brk_server = { workspace = true } -vecdb = { workspace = true } clap = { version = "4.5.53", features = ["derive", "string"] } color-eyre = { workspace = true } log = { workspace = true } +mimalloc = { workspace = true } minreq = { workspace = true } serde = { workspace = true } tokio = { workspace = true } toml = "0.9.8" +vecdb = { workspace = true } zip = { version = "6.0.0", default-features = false, features = ["deflate"] } [[bin]] diff --git a/crates/brk_cli/src/main.rs b/crates/brk_cli/src/main.rs index d884dcb48..0a2a91eca 100644 --- a/crates/brk_cli/src/main.rs +++ b/crates/brk_cli/src/main.rs @@ -19,6 +19,7 @@ use brk_query::AsyncQuery; use brk_reader::Reader; use brk_server::{Server, VERSION}; use log::info; +use mimalloc::MiMalloc; use vecdb::Exit; mod config; @@ -27,6 +28,9 @@ mod website; use crate::{config::Config, paths::*}; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + pub fn main() -> color_eyre::Result<()> { // Can't increase main thread's stack size, thus we need to use another thread thread::Builder::new() diff --git a/crates/brk_computer/.gitignore b/crates/brk_computer/.gitignore new file mode 100644 index 000000000..b2e818da8 --- /dev/null +++ b/crates/brk_computer/.gitignore @@ -0,0 +1 @@ +bottlenecks.md diff --git a/crates/brk_computer/src/grouped/price_percentiles.rs b/crates/brk_computer/src/grouped/price_percentiles.rs index 430ab2908..f71e7d3f4 100644 --- a/crates/brk_computer/src/grouped/price_percentiles.rs +++ b/crates/brk_computer/src/grouped/price_percentiles.rs @@ -105,6 +105,16 @@ impl PricePercentiles { } Ok(()) } + + /// Validate computed versions or reset if mismatched. + pub fn validate_computed_version_or_reset(&mut self, version: Version) -> Result<()> { + for vec in self.vecs.iter_mut().flatten() { + if let Some(height_vec) = vec.height.as_mut() { + height_vec.validate_computed_version_or_reset(version)?; + } + } + Ok(()) + } } impl Traversable for PricePercentiles { diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index 880ff7c91..7f47d505d 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -182,84 +182,84 @@ impl Computer { info!("Computed prices in {:?}", i.elapsed()); } - thread::scope(|scope| -> Result<()> { - let blks = scope.spawn(|| -> Result<()> { - info!("Computing BLKs metadata..."); - let i = Instant::now(); - self.blks - .compute(indexer, &starting_indexes, reader, exit)?; - info!("Computed blk in {:?}", i.elapsed()); - Ok(()) - }); + // thread::scope(|scope| -> Result<()> { + // let blks = scope.spawn(|| -> Result<()> { + info!("Computing BLKs metadata..."); + let i = Instant::now(); + self.blks + .compute(indexer, &starting_indexes, reader, exit)?; + info!("Computed blk in {:?}", i.elapsed()); + // Ok(()) + // }); - let constants = scope.spawn(|| -> Result<()> { - info!("Computing constants..."); - let i = Instant::now(); - self.constants - .compute(&self.indexes, &starting_indexes, exit)?; - info!("Computed constants in {:?}", i.elapsed()); - Ok(()) - }); + // let constants = scope.spawn(|| -> Result<()> { + info!("Computing constants..."); + let i = Instant::now(); + self.constants + .compute(&self.indexes, &starting_indexes, exit)?; + info!("Computed constants in {:?}", i.elapsed()); + // Ok(()) + // }); - let chain = scope.spawn(|| -> Result<()> { - info!("Computing chain..."); - let i = Instant::now(); - self.chain.compute( - indexer, - &self.indexes, - &starting_indexes, - self.price.as_ref(), - exit, - )?; - info!("Computed chain in {:?}", i.elapsed()); - Ok(()) - }); + // let chain = scope.spawn(|| -> Result<()> { + info!("Computing chain..."); + let i = Instant::now(); + self.chain.compute( + indexer, + &self.indexes, + &starting_indexes, + self.price.as_ref(), + exit, + )?; + info!("Computed chain in {:?}", i.elapsed()); + // Ok(()) + // }); - if let Some(price) = self.price.as_ref() { - info!("Computing market..."); - let i = Instant::now(); - self.market.compute(price, &starting_indexes, exit)?; - info!("Computed market in {:?}", i.elapsed()); - } + if let Some(price) = self.price.as_ref() { + info!("Computing market..."); + let i = Instant::now(); + self.market.compute(price, &starting_indexes, exit)?; + info!("Computed market in {:?}", i.elapsed()); + } - blks.join().unwrap()?; - constants.join().unwrap()?; - chain.join().unwrap()?; - Ok(()) - })?; + // blks.join().unwrap()?; + // constants.join().unwrap()?; + // chain.join().unwrap()?; + // Ok(()) + // })?; let starting_indexes_clone = starting_indexes.clone(); - thread::scope(|scope| -> Result<()> { - let pools = scope.spawn(|| -> Result<()> { - info!("Computing pools..."); - let i = Instant::now(); - self.pools.compute( - indexer, - &self.indexes, - &starting_indexes_clone, - &self.chain, - self.price.as_ref(), - exit, - )?; - info!("Computed pools in {:?}", i.elapsed()); - Ok(()) - }); + // thread::scope(|scope| -> Result<()> { + // let pools = scope.spawn(|| -> Result<()> { + info!("Computing pools..."); + let i = Instant::now(); + self.pools.compute( + indexer, + &self.indexes, + &starting_indexes_clone, + &self.chain, + self.price.as_ref(), + exit, + )?; + info!("Computed pools in {:?}", i.elapsed()); + // Ok(()) + // }); - info!("Computing stateful..."); - let i = Instant::now(); - self.stateful.compute( - indexer, - &self.indexes, - &self.chain, - self.price.as_ref(), - &mut starting_indexes, - exit, - )?; - info!("Computed stateful in {:?}", i.elapsed()); + info!("Computing stateful..."); + let i = Instant::now(); + self.stateful.compute( + indexer, + &self.indexes, + &self.chain, + self.price.as_ref(), + &mut starting_indexes, + exit, + )?; + info!("Computed stateful in {:?}", i.elapsed()); - pools.join().unwrap()?; - Ok(()) - })?; + // pools.join().unwrap()?; + // Ok(()) + // })?; info!("Computing cointime..."); let i = Instant::now(); diff --git a/crates/brk_computer/src/stateful/address/address_count.rs b/crates/brk_computer/src/stateful/address/address_count.rs index e0f2c516e..8183f7593 100644 --- a/crates/brk_computer/src/stateful/address/address_count.rs +++ b/crates/brk_computer/src/stateful/address/address_count.rs @@ -97,6 +97,19 @@ impl AddressTypeToHeightToAddressCount { .truncate_push(height, addresstype_to_usize.p2a.into())?; Ok(()) } + + pub fn reset(&mut self) -> Result<()> { + use vecdb::GenericStoredVec; + self.p2pk65.reset()?; + self.p2pk33.reset()?; + self.p2pkh.reset()?; + self.p2sh.reset()?; + self.p2wpkh.reset()?; + self.p2wsh.reset()?; + self.p2tr.reset()?; + self.p2a.reset()?; + Ok(()) + } } /// Address count per address type, indexed by various indexes (dateindex, etc.). diff --git a/crates/brk_computer/src/stateful/cohorts/address.rs b/crates/brk_computer/src/stateful/cohorts/address.rs index 17ac7cc8f..d462ebf0d 100644 --- a/crates/brk_computer/src/stateful/cohorts/address.rs +++ b/crates/brk_computer/src/stateful/cohorts/address.rs @@ -128,24 +128,52 @@ impl DynCohortVecs for AddressCohortVecs { fn reset_state_starting_height(&mut self) { self.reset_starting_height(); + if let Some(state) = self.state.as_mut() { + state.reset(); + } } fn import_state(&mut self, starting_height: Height) -> Result { + use vecdb::GenericStoredVec; + // Import state from runtime state if present if let Some(state) = self.state.as_mut() { - let imported = state.inner.import_at_or_before(starting_height)?; - self.starting_height = Some(imported); + // State files are saved AT height H, so to resume at H+1 we need to import at H + // Decrement first, then increment result to match expected starting_height + if let Some(mut prev_height) = starting_height.decremented() { + // Import price_to_amount state file (may adjust prev_height to actual file found) + prev_height = state.inner.import_at_or_before(prev_height)?; - // Restore addr_count from last known value - if let Some(prev_height) = imported.decremented() { - use vecdb::TypedVecIterator; - state.addr_count = *self - .height_to_addr_count - .into_iter() - .get_unwrap(prev_height); + // Restore supply state from height-indexed vectors + state.inner.supply.value = self + .metrics + .supply + .height_to_supply + .read_once(prev_height)?; + state.inner.supply.utxo_count = *self + .metrics + .supply + .height_to_utxo_count + .read_once(prev_height)?; + state.addr_count = *self.height_to_addr_count.read_once(prev_height)?; + + // Restore realized cap if present + if let Some(realized_metrics) = self.metrics.realized.as_mut() + && let Some(realized_state) = state.inner.realized.as_mut() + { + realized_state.cap = realized_metrics + .height_to_realized_cap + .read_once(prev_height)?; + } + + let result = prev_height.incremented(); + self.starting_height = Some(result); + Ok(result) + } else { + // starting_height is 0, nothing to import + self.starting_height = Some(Height::ZERO); + Ok(Height::ZERO) } - - Ok(imported) } else { self.starting_height = Some(starting_height); Ok(starting_height) diff --git a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs index 78a700ec0..e25cf656f 100644 --- a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs +++ b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs @@ -236,14 +236,12 @@ impl AddressCohorts { .unwrap_or_default() } - /// Import state for all separate cohorts at given height. - /// - /// Note: This follows the same pattern as UTXOCohorts - errors are ignored - /// and the start_mode logic ensures we're in a valid state before calling. - pub fn import_separate_states(&mut self, height: Height) { - self.par_iter_separate_mut().for_each(|v| { - let _ = v.import_state(height); - }); + /// Import state for all separate cohorts at or before given height. + /// Returns true if all imports succeeded and returned the expected height. + pub fn import_separate_states(&mut self, height: Height) -> bool { + self.par_iter_separate_mut() + .map(|v| v.import_state(height).unwrap_or_default()) + .all(|h| h == height) } /// Reset state heights for all separate cohorts. diff --git a/crates/brk_computer/src/stateful/cohorts/state_address.rs b/crates/brk_computer/src/stateful/cohorts/state_address.rs index 65bea84f4..2081092fe 100644 --- a/crates/brk_computer/src/stateful/cohorts/state_address.rs +++ b/crates/brk_computer/src/stateful/cohorts/state_address.rs @@ -21,6 +21,18 @@ impl AddressCohortState { } } + /// Reset state for fresh start. + pub fn reset(&mut self) { + self.addr_count = 0; + self.inner.supply = crate::SupplyState::default(); + self.inner.sent = Sats::ZERO; + self.inner.satblocks_destroyed = Sats::ZERO; + self.inner.satdays_destroyed = Sats::ZERO; + if let Some(realized) = self.inner.realized.as_mut() { + *realized = crate::RealizedState::NAN; + } + } + pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> { self.inner.reset_price_to_amount_if_needed() } diff --git a/crates/brk_computer/src/stateful/cohorts/state_utxo.rs b/crates/brk_computer/src/stateful/cohorts/state_utxo.rs index c633ba41f..dc3c12766 100644 --- a/crates/brk_computer/src/stateful/cohorts/state_utxo.rs +++ b/crates/brk_computer/src/stateful/cohorts/state_utxo.rs @@ -1,9 +1,11 @@ use std::path::Path; use brk_error::Result; +use brk_types::Sats; use derive_deref::{Deref, DerefMut}; use super::CohortState; +use crate::{RealizedState, SupplyState}; #[derive(Clone, Deref, DerefMut)] pub struct UTXOCohortState(CohortState); @@ -16,4 +18,15 @@ impl UTXOCohortState { pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> { self.0.reset_price_to_amount_if_needed() } + + /// Reset state for fresh start. + pub fn reset(&mut self) { + self.0.supply = SupplyState::default(); + self.0.sent = Sats::ZERO; + self.0.satblocks_destroyed = Sats::ZERO; + self.0.satdays_destroyed = Sats::ZERO; + if let Some(realized) = self.0.realized.as_mut() { + *realized = RealizedState::NAN; + } + } } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo.rs b/crates/brk_computer/src/stateful/cohorts/utxo.rs index 3572ab943..8eae22512 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo.rs @@ -92,9 +92,12 @@ impl UTXOCohortVecs { self.state_starting_height = Some(height); } - /// Reset state starting height to zero. + /// Reset state starting height to zero and reset state values. pub fn reset_state_starting_height(&mut self) { self.state_starting_height = Some(Height::ZERO); + if let Some(state) = self.state.as_mut() { + state.reset(); + } } /// Compute percentile prices from standalone price_to_amount. @@ -150,14 +153,40 @@ impl DynCohortVecs for UTXOCohortVecs { fn reset_state_starting_height(&mut self) { self.state_starting_height = Some(Height::ZERO); + if let Some(state) = self.state.as_mut() { + state.reset(); + } } fn import_state(&mut self, starting_height: Height) -> Result { + use vecdb::GenericStoredVec; + // Import state from runtime state if present if let Some(state) = self.state.as_mut() { - let imported = state.import_at_or_before(starting_height)?; - self.state_starting_height = Some(imported); - Ok(imported) + // State files are saved AT height H, so to resume at H+1 we need to import at H + // Decrement first, then increment result to match expected starting_height + if let Some(mut prev_height) = starting_height.decremented() { + // Import price_to_amount state file (may adjust prev_height to actual file found) + prev_height = state.import_at_or_before(prev_height)?; + + // Restore supply state from height-indexed vectors + state.supply.value = self.metrics.supply.height_to_supply.read_once(prev_height)?; + state.supply.utxo_count = *self.metrics.supply.height_to_utxo_count.read_once(prev_height)?; + + // Restore realized cap if present + if let Some(realized_metrics) = self.metrics.realized.as_mut() + && let Some(realized_state) = state.realized.as_mut() { + realized_state.cap = realized_metrics.height_to_realized_cap.read_once(prev_height)?; + } + + let result = prev_height.incremented(); + self.state_starting_height = Some(result); + Ok(result) + } else { + // starting_height is 0, nothing to import + self.state_starting_height = Some(Height::ZERO); + Ok(Height::ZERO) + } } else { self.state_starting_height = Some(starting_height); Ok(starting_height) diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs index 03e782158..62f828160 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs @@ -344,14 +344,18 @@ impl UTXOCohorts { }) } - /// Flush stateful vectors for separate cohorts. + /// Flush stateful vectors for separate and aggregate cohorts. pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + // Flush separate cohorts (includes metrics + state) self.par_iter_separate_mut() .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?; - self.0 - .par_iter_aggregate_mut() - .try_for_each(|v| v.price_to_amount.flush_at_height(height, exit)) + // Flush aggregate cohorts' price_to_amount state AND metrics (including price_percentiles) + for v in self.0.iter_aggregate_mut() { + v.price_to_amount.flush_at_height(height, exit)?; + v.metrics.safe_flush(exit)?; + } + Ok(()) } /// Reset aggregate cohorts' price_to_amount for fresh start. @@ -382,11 +386,12 @@ impl UTXOCohorts { .unwrap_or_default() } - /// Import state for all separate cohorts at given height. - pub fn import_separate_states(&mut self, height: Height) { - self.par_iter_separate_mut().for_each(|v| { - let _ = v.import_state(height); - }); + /// Import state for all separate cohorts at or before given height. + /// Returns true if all imports succeeded and returned the expected height. + pub fn import_separate_states(&mut self, height: Height) -> bool { + self.par_iter_separate_mut() + .map(|v| v.import_state(height).unwrap_or_default()) + .all(|h| h == height) } /// Reset state heights for all separate cohorts. @@ -463,9 +468,17 @@ impl UTXOCohorts { Ok(()) } - /// Validate computed versions for all separate cohorts. + /// Validate computed versions for all cohorts (separate and aggregate). pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { + // Validate separate cohorts self.par_iter_separate_mut() - .try_for_each(|v| v.validate_computed_versions(base_version)) + .try_for_each(|v| v.validate_computed_versions(base_version))?; + + // Validate aggregate cohorts' price_percentiles + for v in self.0.iter_aggregate_mut() { + v.validate_computed_versions(base_version)?; + } + + Ok(()) } } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs index b6c9b8361..f8309060a 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs @@ -69,8 +69,8 @@ impl UTXOCohorts { last_timestamp.difference_in_days_between_float(block_state.timestamp); let older_than_hour = last_timestamp .checked_sub(block_state.timestamp) - .map(|d| d.is_more_than_hour()) - .unwrap_or(false); + .unwrap() + .is_more_than_hour(); // Update time-based cohorts time_cohorts diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs index bee1018d8..38e1dc50b 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs @@ -6,7 +6,7 @@ use brk_grouper::{Filter, Filtered, UTXOGroups}; use brk_types::{ONE_DAY_IN_SEC, Sats, Timestamp}; -use crate::{states::BlockState, utils::OptionExt, PriceToAmount}; +use crate::{PriceToAmount, states::BlockState, utils::OptionExt}; use super::UTXOCohorts; diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index 2e44d669c..f8eca7397 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -66,6 +66,8 @@ pub fn process_blocks( ctx.price.is_some() ); + info!("Setting up references..."); + // References to vectors using correct field paths // From indexer.vecs: let height_to_first_txindex = &indexer.vecs.tx.height_to_first_txindex; @@ -99,6 +101,8 @@ pub fn process_blocks( let height_to_price_vec = &ctx.height_to_price; let height_to_timestamp_vec = &ctx.height_to_timestamp; + info!("Creating iterators..."); + // Create iterators for sequential access let mut height_to_first_txindex_iter = height_to_first_txindex.into_iter(); let mut height_to_first_txoutindex_iter = height_to_first_txoutindex.into_iter(); @@ -116,13 +120,19 @@ pub fn process_blocks( let mut height_to_price_iter = height_to_price.map(|v| v.into_iter()); let mut dateindex_to_price_iter = dateindex_to_price.map(|v| v.into_iter()); + info!("Building txoutindex_to_height map..."); + // Build txoutindex -> height map for input processing let txoutindex_to_height = build_txoutindex_to_height_map(height_to_first_txoutindex); + info!("Creating readers..."); + // Create readers for parallel data access let ir = IndexerReaders::new(indexer); let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); + info!("Creating address iterators..."); + // Create iterators for first address indexes per type let mut first_p2a_iter = indexer .vecs @@ -165,6 +175,8 @@ pub fn process_blocks( .height_to_first_p2wshaddressindex .into_iter(); + info!("Recovering running totals..."); + // Track running totals - recover from previous height if resuming let ( mut unspendable_supply, @@ -172,23 +184,29 @@ pub fn process_blocks( mut addresstype_to_addr_count, mut addresstype_to_empty_addr_count, ) = if starting_height > Height::ZERO { + info!("Reading unspendable_supply..."); let prev_height = starting_height.decremented().unwrap(); - ( - vecs.height_to_unspendable_supply - .into_iter() - .get_unwrap(prev_height), - vecs.height_to_opreturn_supply - .into_iter() - .get_unwrap(prev_height), - AddressTypeToAddressCount::from(( - &vecs.addresstype_to_height_to_addr_count, - starting_height, - )), - AddressTypeToAddressCount::from(( - &vecs.addresstype_to_height_to_empty_addr_count, - starting_height, - )), - ) + let unspendable = vecs + .height_to_unspendable_supply + .into_iter() + .get_unwrap(prev_height); + info!("Reading opreturn_supply..."); + let opreturn = vecs + .height_to_opreturn_supply + .into_iter() + .get_unwrap(prev_height); + info!("Reading addresstype_to_addr_count..."); + let addr_count = AddressTypeToAddressCount::from(( + &vecs.addresstype_to_height_to_addr_count, + starting_height, + )); + info!("Reading addresstype_to_empty_addr_count..."); + let empty_addr_count = AddressTypeToAddressCount::from(( + &vecs.addresstype_to_height_to_empty_addr_count, + starting_height, + )); + info!("Recovery complete."); + (unspendable, opreturn, addr_count, empty_addr_count) } else { ( Sats::ZERO, @@ -204,13 +222,13 @@ pub fn process_blocks( let mut empty_cache: AddressTypeToTypeIndexMap = AddressTypeToTypeIndexMap::default(); + info!("Starting main block iteration..."); + // Main block iteration for height in starting_height.to_usize()..=last_height.to_usize() { let height = Height::from(height); - if height.to_usize() % 10000 == 0 { - info!("Processing chain at {}...", height); - } + info!("Processing chain at {}...", height); // Get block metadata let first_txindex = height_to_first_txindex_iter.get_unwrap(height); @@ -452,7 +470,14 @@ pub fn process_blocks( // Drop readers before flush to release mmap handles drop(vr); - flush_checkpoint(vecs, height, &mut loaded_cache, &mut empty_cache, exit)?; + flush_checkpoint( + vecs, + height, + chain_state, + &mut loaded_cache, + &mut empty_cache, + exit, + )?; // Recreate readers after flush to pick up new data vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); @@ -462,7 +487,14 @@ pub fn process_blocks( // Final flush let _lock = exit.lock(); drop(vr); - flush_checkpoint(vecs, last_height, &mut loaded_cache, &mut empty_cache, exit)?; + flush_checkpoint( + vecs, + last_height, + chain_state, + &mut loaded_cache, + &mut empty_cache, + exit, + )?; Ok(()) } @@ -491,23 +523,15 @@ fn push_cohort_states( dateindex: Option, date_price: Option>, ) -> Result<()> { - utxo_cohorts - .par_iter_separate_mut() - .map(|v| v as &mut dyn DynCohortVecs) - .chain( - address_cohorts - .par_iter_separate_mut() - .map(|v| v as &mut dyn DynCohortVecs), - ) - .try_for_each(|v| { - v.truncate_push(height)?; - v.compute_then_truncate_push_unrealized_states( - height, - height_price, - dateindex, - date_price, - ) - })?; + utxo_cohorts.par_iter_separate_mut().try_for_each(|v| { + v.truncate_push(height)?; + v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price) + })?; + + address_cohorts.par_iter_separate_mut().try_for_each(|v| { + v.truncate_push(height)?; + v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price) + })?; Ok(()) } @@ -518,11 +542,12 @@ fn push_cohort_states( /// - Cohort stateful vectors /// - Height-indexed vectors /// - Address data caches (loaded and empty) -/// - Chain state +/// - Chain state (synced from in-memory to persisted) #[allow(clippy::too_many_arguments)] fn flush_checkpoint( vecs: &mut Vecs, height: Height, + chain_state: &[BlockState], loaded_cache: &mut AddressTypeToTypeIndexMap, empty_cache: &mut AddressTypeToTypeIndexMap, exit: &Exit, @@ -564,9 +589,15 @@ fn flush_checkpoint( exit, )?; - // Flush chain state and txoutindex_to_txinindex with stamp + // Flush txoutindex_to_txinindex with stamp vecs.txoutindex_to_txinindex .stamped_flush_with_changes(height.into())?; + + // Sync in-memory chain_state to persisted and flush + vecs.chain_state.truncate_if_needed(Height::ZERO)?; + for block_state in chain_state { + vecs.chain_state.push(block_state.supply.clone()); + } vecs.chain_state.stamped_flush_with_changes(height.into())?; Ok(()) diff --git a/crates/brk_computer/src/stateful/compute/readers.rs b/crates/brk_computer/src/stateful/compute/readers.rs index e4f985400..534e1ed73 100644 --- a/crates/brk_computer/src/stateful/compute/readers.rs +++ b/crates/brk_computer/src/stateful/compute/readers.rs @@ -74,40 +74,32 @@ impl VecsReaders { pub fn build_txoutindex_to_txindex<'a>( block_first_txindex: TxIndex, block_tx_count: u64, - txindex_to_output_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, + txindex_to_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, ) -> Vec { - let first = block_first_txindex.to_usize(); - - let counts: Vec = (0..block_tx_count as usize) - .map(|offset| { - let txindex = TxIndex::from(first + offset); - u64::from(txindex_to_output_count.get_unwrap(txindex)) - }) - .collect(); - - let total: u64 = counts.iter().sum(); - let mut result = Vec::with_capacity(total as usize); - - for (offset, &count) in counts.iter().enumerate() { - let txindex = TxIndex::from(first + offset); - result.extend(std::iter::repeat_n(txindex, count as usize)); - } - - result + build_index_to_txindex(block_first_txindex, block_tx_count, txindex_to_count) } /// Build txinindex -> txindex mapping for a block. pub fn build_txinindex_to_txindex<'a>( block_first_txindex: TxIndex, block_tx_count: u64, - txindex_to_input_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, + txindex_to_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, +) -> Vec { + build_index_to_txindex(block_first_txindex, block_tx_count, txindex_to_count) +} + +/// Build index -> txindex mapping for a block (shared implementation). +fn build_index_to_txindex<'a>( + block_first_txindex: TxIndex, + block_tx_count: u64, + txindex_to_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, ) -> Vec { let first = block_first_txindex.to_usize(); let counts: Vec = (0..block_tx_count as usize) .map(|offset| { let txindex = TxIndex::from(first + offset); - u64::from(txindex_to_input_count.get_unwrap(txindex)) + u64::from(txindex_to_count.get_unwrap(txindex)) }) .collect(); diff --git a/crates/brk_computer/src/stateful/compute/recover.rs b/crates/brk_computer/src/stateful/compute/recover.rs index 739648a27..2ba950725 100644 --- a/crates/brk_computer/src/stateful/compute/recover.rs +++ b/crates/brk_computer/src/stateful/compute/recover.rs @@ -15,18 +15,19 @@ use super::super::AddressesDataVecs; /// Result of state recovery. pub struct RecoveredState { - /// Height to start processing from. + /// Height to start processing from. Zero means fresh start. pub starting_height: Height, - /// Whether state was successfully restored (vs starting fresh). - pub restored: bool, } /// Perform state recovery for resuming from checkpoint. /// /// Rolls back state vectors and imports cohort states. -/// Returns the recovered state information. +/// Validates that all rollbacks and imports are consistent. +/// Returns Height::ZERO if any validation fails (triggers fresh start). pub fn recover_state( height: Height, + chain_state_rollback: vecdb::Result, + txoutindex_rollback: vecdb::Result, any_address_indexes: &mut AnyAddressIndexesVecs, addresses_data: &mut AddressesDataVecs, utxo_cohorts: &mut UTXOCohorts, @@ -38,24 +39,45 @@ pub fn recover_state( let address_indexes_rollback = any_address_indexes.rollback_before(stamp); let address_data_rollback = addresses_data.rollback_before(stamp); - // Verify rollback consistency (uses rollback_states helper) - let _consistent_height = rollback_states( - stamp, - Ok(stamp), // chain_state handled separately + // Verify rollback consistency - all must agree on the same height + let consistent_height = rollback_states( + chain_state_rollback, + txoutindex_rollback, address_indexes_rollback, address_data_rollback, ); - // Import cohort states - utxo_cohorts.import_separate_states(height); - address_cohorts.import_separate_states(height); + // If rollbacks are inconsistent, start fresh + if consistent_height.is_zero() { + return Ok(RecoveredState { + starting_height: Height::ZERO, + }); + } - // Import aggregate price_to_amount - let _ = import_aggregate_price_to_amount(height, utxo_cohorts)?; + // Import UTXO cohort states - all must succeed + if !utxo_cohorts.import_separate_states(height) { + return Ok(RecoveredState { + starting_height: Height::ZERO, + }); + } + + // Import address cohort states - all must succeed + if !address_cohorts.import_separate_states(height) { + return Ok(RecoveredState { + starting_height: Height::ZERO, + }); + } + + // Import aggregate price_to_amount - must match height + let imported = import_aggregate_price_to_amount(height, utxo_cohorts)?; + if imported != height { + return Ok(RecoveredState { + starting_height: Height::ZERO, + }); + } Ok(RecoveredState { starting_height: height, - restored: true, }) } @@ -85,12 +107,16 @@ pub fn reset_state( Ok(RecoveredState { starting_height: Height::ZERO, - restored: false, }) } /// Check if we can resume from a checkpoint or need to start fresh. pub fn determine_start_mode(computed_min: Height, chain_state_height: Height) -> StartMode { + // No data to resume from + if chain_state_height.is_zero() { + return StartMode::Fresh; + } + match computed_min.cmp(&chain_state_height) { Ordering::Greater => unreachable!("min height > chain state height"), Ordering::Equal => StartMode::Resume(chain_state_height), @@ -108,32 +134,42 @@ pub enum StartMode { /// Rollback state vectors to before a given stamp. /// -/// Returns the consistent starting height if all vectors agree, +/// Returns the consistent starting height if ALL rollbacks succeed and agree, /// otherwise returns Height::ZERO (need fresh start). fn rollback_states( - _stamp: Stamp, chain_state_rollback: vecdb::Result, + txoutindex_rollback: vecdb::Result, address_indexes_rollbacks: Result>, address_data_rollbacks: Result<[Stamp; 2]>, ) -> Height { let mut heights: BTreeSet = BTreeSet::new(); - if let Ok(s) = chain_state_rollback { + // All rollbacks must succeed - any error means fresh start + let Ok(s) = chain_state_rollback else { + return Height::ZERO; + }; + heights.insert(Height::from(s).incremented()); + + let Ok(s) = txoutindex_rollback else { + return Height::ZERO; + }; + heights.insert(Height::from(s).incremented()); + + let Ok(stamps) = address_indexes_rollbacks else { + return Height::ZERO; + }; + for s in stamps { heights.insert(Height::from(s).incremented()); } - if let Ok(stamps) = address_indexes_rollbacks { - for s in stamps { - heights.insert(Height::from(s).incremented()); - } - } - - if let Ok(stamps) = address_data_rollbacks { - for s in stamps { - heights.insert(Height::from(s).incremented()); - } + let Ok(stamps) = address_data_rollbacks else { + return Height::ZERO; + }; + for s in stamps { + heights.insert(Height::from(s).incremented()); } + // All must agree on the same height if heights.len() == 1 { heights.pop_first().unwrap() } else { diff --git a/crates/brk_computer/src/stateful/metrics/mod.rs b/crates/brk_computer/src/stateful/metrics/mod.rs index 93e3733e8..6a8e0c916 100644 --- a/crates/brk_computer/src/stateful/metrics/mod.rs +++ b/crates/brk_computer/src/stateful/metrics/mod.rs @@ -134,6 +134,10 @@ impl CohortMetrics { realized.validate_computed_versions(base_version)?; } + if let Some(price_paid) = self.price_paid.as_mut() { + price_paid.validate_computed_versions(base_version)?; + } + Ok(()) } @@ -151,10 +155,8 @@ impl CohortMetrics { self.price_paid.as_mut(), height_price, ) { - // Push price paid min/max price_paid.truncate_push_minmax(height, state)?; - // Compute unrealized states from price_to_amount let (height_unrealized_state, date_unrealized_state) = state.compute_unrealized_states(height_price, date_price.unwrap()); @@ -165,7 +167,6 @@ impl CohortMetrics { date_unrealized_state.as_ref(), )?; - // Compute and push price percentiles price_paid.truncate_push_percentiles(height, state)?; } diff --git a/crates/brk_computer/src/stateful/metrics/price_paid.rs b/crates/brk_computer/src/stateful/metrics/price_paid.rs index 32dc21674..90d7045f9 100644 --- a/crates/brk_computer/src/stateful/metrics/price_paid.rs +++ b/crates/brk_computer/src/stateful/metrics/price_paid.rs @@ -117,6 +117,14 @@ impl PricePaidMetrics { Ok(()) } + /// Validate computed versions or reset if mismatched. + pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { + if let Some(price_percentiles) = self.price_percentiles.as_mut() { + price_percentiles.validate_computed_version_or_reset(base_version)?; + } + Ok(()) + } + /// Compute aggregate values from separate cohorts. pub fn compute_from_stateful( &mut self, diff --git a/crates/brk_computer/src/stateful/metrics/realized.rs b/crates/brk_computer/src/stateful/metrics/realized.rs index 491356ec5..005f45a38 100644 --- a/crates/brk_computer/src/stateful/metrics/realized.rs +++ b/crates/brk_computer/src/stateful/metrics/realized.rs @@ -434,8 +434,12 @@ impl RealizedMetrics { self.height_to_realized_loss.safe_write(exit)?; self.height_to_value_created.safe_write(exit)?; self.height_to_value_destroyed.safe_write(exit)?; - self.height_to_adjusted_value_created.um().safe_write(exit)?; - self.height_to_adjusted_value_destroyed.um().safe_write(exit)?; + if let Some(v) = self.height_to_adjusted_value_created.as_mut() { + v.safe_write(exit)?; + } + if let Some(v) = self.height_to_adjusted_value_destroyed.as_mut() { + v.safe_write(exit)?; + } Ok(()) } diff --git a/crates/brk_computer/src/stateful/process/address_lookup.rs b/crates/brk_computer/src/stateful/process/address_lookup.rs index 5d5bf050d..2e4dffbb3 100644 --- a/crates/brk_computer/src/stateful/process/address_lookup.rs +++ b/crates/brk_computer/src/stateful/process/address_lookup.rs @@ -40,36 +40,44 @@ where output_type: OutputType, type_index: TypeIndex, ) -> (&mut LoadedAddressDataWithSource, bool, bool) { - let mut is_new = false; - let mut from_empty = false; + use std::collections::hash_map::Entry; - let addr_data = self - .loaded - .get_mut(output_type) - .unwrap() - .entry(type_index) - .or_insert_with(|| { + let map = self.loaded.get_mut(output_type).unwrap(); + + match map.entry(type_index) { + Entry::Occupied(entry) => { + // Entry already exists - check its source + let data = entry.into_mut(); + let is_new = data.is_new(); + let from_empty = data.is_from_emptyaddressdata(); + (data, is_new, from_empty) + } + Entry::Vacant(entry) => { // Check if it was in empty set - if let Some(empty_data) = self.empty.get_mut(output_type).unwrap().remove(&type_index) { - from_empty = true; - return empty_data.into(); + if let Some(empty_data) = + self.empty.get_mut(output_type).unwrap().remove(&type_index) + { + let data = entry.insert(empty_data.into()); + return (data, false, true); } // Look up from storage or create new match (self.get_address_data)(output_type, type_index) { Some(data) => { - is_new = data.is_new(); - from_empty = data.is_from_emptyaddressdata(); - data + let is_new = data.is_new(); + let from_empty = data.is_from_emptyaddressdata(); + let data = entry.insert(data); + (data, is_new, from_empty) } None => { - is_new = true; - WithAddressDataSource::New(LoadedAddressData::default()) + let data = entry.insert(WithAddressDataSource::New( + LoadedAddressData::default(), + )); + (data, true, false) } } - }); - - (addr_data, is_new, from_empty) + } + } } /// Get address data for a send operation (must exist). diff --git a/crates/brk_computer/src/stateful/process/sent.rs b/crates/brk_computer/src/stateful/process/sent.rs index 1c8eff606..eba9ce89c 100644 --- a/crates/brk_computer/src/stateful/process/sent.rs +++ b/crates/brk_computer/src/stateful/process/sent.rs @@ -48,8 +48,8 @@ where let days_old = current_timestamp.difference_in_days_between_float(prev_timestamp); let older_than_hour = current_timestamp .checked_sub(prev_timestamp) - .map(|d| d.is_more_than_hour()) - .unwrap_or(false); + .unwrap() + .is_more_than_hour(); for (output_type, vec) in by_type.unwrap().into_iter() { for (type_index, value) in vec { @@ -78,8 +78,10 @@ where addr_data.send(value, prev_price)?; if will_be_empty { - // Address becoming empty - debug_assert!(new_balance.is_zero()); + // Address becoming empty - invariant check + if new_balance.is_not_zero() { + unreachable!() + } *addr_count.get_mut(output_type).unwrap() -= 1; *empty_addr_count.get_mut(output_type).unwrap() += 1; diff --git a/crates/brk_computer/src/stateful/vecs.rs b/crates/brk_computer/src/stateful/vecs.rs index e555203e6..1d067b03b 100644 --- a/crates/brk_computer/src/stateful/vecs.rs +++ b/crates/brk_computer/src/stateful/vecs.rs @@ -270,72 +270,76 @@ impl Vecs { // 2. Determine start mode and recover/reset state let start_mode = determine_start_mode(stateful_min, chain_state_height); - let (starting_height, mut chain_state) = match start_mode { + // Try to resume from checkpoint, fall back to fresh start if needed + let recovered_height = match start_mode { StartMode::Resume(height) => { let stamp = Stamp::from(height); - // Rollback BytesVec state (not handled by recover_state) - let _ = self.chain_state.rollback_before(stamp); - let _ = self.txoutindex_to_txinindex.rollback_before(stamp); + // Rollback BytesVec state and capture results for validation + let chain_state_rollback = self.chain_state.rollback_before(stamp); + let txoutindex_rollback = self.txoutindex_to_txinindex.rollback_before(stamp); - // Use recover_state for address and cohort state recovery + // Validate all rollbacks and imports are consistent let recovered = recover_state( height, + chain_state_rollback, + txoutindex_rollback, &mut self.any_address_indexes, &mut self.addresses_data, &mut self.utxo_cohorts, &mut self.address_cohorts, )?; - // Recover chain_state from stored values - let chain_state = if !recovered.starting_height.is_zero() { - let height_to_timestamp = &indexes.height_to_timestamp_fixed; - let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height); - - let mut height_to_timestamp_iter = height_to_timestamp.into_iter(); - let mut height_to_price_iter = height_to_price.map(|v| v.into_iter()); - let mut chain_state_iter = self.chain_state.into_iter(); - - (0..recovered.starting_height.to_usize()) - .map(|h| { - let h = Height::from(h); - BlockState { - supply: chain_state_iter.get_unwrap(h), - price: height_to_price_iter.as_mut().map(|v| *v.get_unwrap(h)), - timestamp: height_to_timestamp_iter.get_unwrap(h), - } - }) - .collect() - } else { - vec![] - }; - - info!( - "State recovery: {} at height {}", - if recovered.restored { "resumed from checkpoint" } else { "fresh start" }, - recovered.starting_height - ); - (recovered.starting_height, chain_state) + if recovered.starting_height.is_zero() { + info!("State recovery validation failed, falling back to fresh start"); + } + recovered.starting_height } - StartMode::Fresh => { - // Reset BytesVec state - self.txoutindex_to_txinindex.reset()?; + StartMode::Fresh => Height::ZERO, + }; - // Use reset_state for cohort and address state reset - let recovered = reset_state( - &mut self.any_address_indexes, - &mut self.addresses_data, - &mut self.utxo_cohorts, - &mut self.address_cohorts, - )?; + // Fresh start: reset all state + let (starting_height, mut chain_state) = if recovered_height.is_zero() { + self.chain_state.reset()?; + self.txoutindex_to_txinindex.reset()?; + self.height_to_unspendable_supply.reset()?; + self.height_to_opreturn_supply.reset()?; + self.addresstype_to_height_to_addr_count.reset()?; + self.addresstype_to_height_to_empty_addr_count.reset()?; + reset_state( + &mut self.any_address_indexes, + &mut self.addresses_data, + &mut self.utxo_cohorts, + &mut self.address_cohorts, + )?; - info!( - "State recovery: {} at height {}", - if recovered.restored { "resumed from checkpoint" } else { "fresh start" }, - recovered.starting_height - ); - (recovered.starting_height, vec![]) - } + info!("State recovery: fresh start"); + (Height::ZERO, vec![]) + } else { + // Recover chain_state from stored values + let height_to_timestamp = &indexes.height_to_timestamp_fixed; + let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height); + + let mut height_to_timestamp_iter = height_to_timestamp.into_iter(); + let mut height_to_price_iter = height_to_price.map(|v| v.into_iter()); + let mut chain_state_iter = self.chain_state.into_iter(); + + let chain_state = (0..recovered_height.to_usize()) + .map(|h| { + let h = Height::from(h); + BlockState { + supply: chain_state_iter.get_unwrap(h), + price: height_to_price_iter.as_mut().map(|v| *v.get_unwrap(h)), + timestamp: height_to_timestamp_iter.get_unwrap(h), + } + }) + .collect(); + + info!( + "State recovery: resumed from checkpoint at height {}", + recovered_height + ); + (recovered_height, chain_state) }; // 2b. Validate computed versions diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index 4930d30d5..6e4e89d8d 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -29,4 +29,4 @@ vecdb = { workspace = true } [dev-dependencies] color-eyre = { workspace = true } -mimalloc = { version = "0.1.48", features = ["v3"] } +mimalloc = { workspace = true }