diff --git a/crates/brk_computer/src/grouped/price_percentiles.rs b/crates/brk_computer/src/grouped/price_percentiles.rs index 02de23099..430ab2908 100644 --- a/crates/brk_computer/src/grouped/price_percentiles.rs +++ b/crates/brk_computer/src/grouped/price_percentiles.rs @@ -94,8 +94,10 @@ impl Flushable for PricePercentiles { } Ok(()) } +} - fn safe_write(&mut self, exit: &Exit) -> Result<()> { +impl PricePercentiles { + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { if let Some(height_vec) = vec.height.as_mut() { height_vec.safe_write(exit)?; diff --git a/crates/brk_computer/src/stateful/address/type_index_map.rs b/crates/brk_computer/src/stateful/address/type_index_map.rs index 3caaa3f6f..8ae2652f1 100644 --- a/crates/brk_computer/src/stateful/address/type_index_map.rs +++ b/crates/brk_computer/src/stateful/address/type_index_map.rs @@ -66,14 +66,6 @@ impl AddressTypeToTypeIndexMap { self.get_mut(address_type).unwrap().insert(typeindex, value); } - /// Remove and return a value for a specific address type and typeindex. - pub fn remove_for_type(&mut self, address_type: OutputType, typeindex: &TypeIndex) -> T { - self.get_mut(address_type) - .unwrap() - .remove(typeindex) - .unwrap() - } - /// Iterate over sorted entries by address type. pub fn into_sorted_iter(self) -> impl Iterator)> { self.0.into_iter().map(|(output_type, map)| { @@ -122,26 +114,3 @@ where } } -impl AddressTypeToTypeIndexMap> { - /// Merge two maps of Vec values, concatenating vectors. - pub fn merge_vecs(mut self, other: Self) -> Self { - for (address_type, other_map) in other.0.into_iter() { - let self_map = self.0.get_mut_unwrap(address_type); - for (typeindex, mut other_vec) in other_map { - match self_map.entry(typeindex) { - Entry::Occupied(mut entry) => { - let self_vec = entry.get_mut(); - if other_vec.len() > self_vec.len() { - mem::swap(self_vec, &mut other_vec); - } - self_vec.extend(other_vec); - } - Entry::Vacant(entry) => { - entry.insert(other_vec); - } - } - } - } - self - } -} diff --git a/crates/brk_computer/src/stateful/cohorts/address.rs b/crates/brk_computer/src/stateful/cohorts/address.rs index 0c15d7791..17ac7cc8f 100644 --- a/crates/brk_computer/src/stateful/cohorts/address.rs +++ b/crates/brk_computer/src/stateful/cohorts/address.rs @@ -15,7 +15,7 @@ use crate::{ Indexes, grouped::{ComputedVecsFromHeight, Source, VecBuilderOptions}, indexes, price, - states::AddressCohortState, + stateful::cohorts::AddressCohortState, }; use super::super::metrics::{CohortMetrics, ImportConfig}; diff --git a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs index 2b22224ff..78a700ec0 100644 --- a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs +++ b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs @@ -262,4 +262,10 @@ impl AddressCohorts { Ok(()) }) } + + /// Validate computed versions for all separate cohorts. + pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { + self.par_iter_separate_mut() + .try_for_each(|v| v.validate_computed_versions(base_version)) + } } diff --git a/crates/brk_computer/src/stateful/cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/mod.rs index bbf2c7b5c..65a34dd4b 100644 --- a/crates/brk_computer/src/stateful/cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/mod.rs @@ -9,14 +9,18 @@ mod address; mod address_cohorts; mod state; +mod state_address; +mod state_utxo; mod traits; mod utxo; mod utxo_cohorts; +pub use crate::states::{Flushable, HeightFlushable}; pub use address::AddressCohortVecs; pub use address_cohorts::AddressCohorts; -pub use crate::states::{Flushable, HeightFlushable}; pub use state::CohortState; +pub use state_address::AddressCohortState; +pub use state_utxo::UTXOCohortState; pub use traits::{CohortVecs, DynCohortVecs}; pub use utxo::UTXOCohortVecs; pub use utxo_cohorts::UTXOCohorts; diff --git a/crates/brk_computer/src/stateful/cohorts/state.rs b/crates/brk_computer/src/stateful/cohorts/state.rs index f09dfb679..0ba52f9e0 100644 --- a/crates/brk_computer/src/stateful/cohorts/state.rs +++ b/crates/brk_computer/src/stateful/cohorts/state.rs @@ -58,7 +58,7 @@ impl CohortState { } /// Reset price_to_amount if needed (for starting fresh). - pub fn reset_price_to_amount(&mut self) -> Result<()> { + pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> { if let Some(p) = self.price_to_amount.as_mut() { p.clean()?; p.init(); @@ -66,8 +66,18 @@ impl CohortState { Ok(()) } + /// Get first (lowest) price entry in distribution. + pub fn price_to_amount_first_key_value(&self) -> Option<(&Dollars, &Sats)> { + self.price_to_amount.u().first_key_value() + } + + /// Get last (highest) price entry in distribution. + pub fn price_to_amount_last_key_value(&self) -> Option<(&Dollars, &Sats)> { + self.price_to_amount.u().last_key_value() + } + /// Reset per-block values before processing next block. - pub fn reset_block_values(&mut self) { + pub fn reset_single_iteration_values(&mut self) { self.sent = Sats::ZERO; self.satdays_destroyed = Sats::ZERO; self.satblocks_destroyed = Sats::ZERO; @@ -88,6 +98,22 @@ impl CohortState { } } + /// Add supply with pre-computed realized cap (for address cohorts). + pub fn increment_( + &mut self, + supply: &SupplyState, + realized_cap: Dollars, + realized_price: Dollars, + ) { + self.supply += supply; + + if supply.value > Sats::ZERO + && let Some(realized) = self.realized.as_mut() { + realized.increment_(realized_cap); + self.price_to_amount.as_mut().unwrap().increment(realized_price, supply); + } + } + /// Remove supply from this cohort (e.g., when UTXO ages out of cohort). pub fn decrement(&mut self, supply: &SupplyState, price: Option) { self.supply -= supply; @@ -100,15 +126,56 @@ impl CohortState { } } + /// Remove supply with pre-computed realized cap (for address cohorts). + pub fn decrement_( + &mut self, + supply: &SupplyState, + realized_cap: Dollars, + realized_price: Dollars, + ) { + self.supply -= supply; + + if supply.value > Sats::ZERO + && let Some(realized) = self.realized.as_mut() { + realized.decrement_(realized_cap); + self.price_to_amount.as_mut().unwrap().decrement(realized_price, supply); + } + } + /// Process received output (new UTXO in cohort). pub fn receive(&mut self, supply: &SupplyState, price: Option) { + self.receive_( + supply, + price, + price.map(|price| (price, supply)), + None, + ); + } + + /// Process received output with custom price_to_amount updates (for address cohorts). + pub fn receive_( + &mut self, + supply: &SupplyState, + price: Option, + price_to_amount_increment: Option<(Dollars, &SupplyState)>, + price_to_amount_decrement: Option<(Dollars, &SupplyState)>, + ) { self.supply += supply; if supply.value > Sats::ZERO && let Some(realized) = self.realized.as_mut() { let price = price.unwrap(); realized.receive(supply, price); - self.price_to_amount.as_mut().unwrap().increment(price, supply); + + if let Some((price, supply)) = price_to_amount_increment + && supply.value.is_not_zero() { + self.price_to_amount.as_mut().unwrap().increment(price, supply); + } + + if let Some((price, supply)) = price_to_amount_decrement + && supply.value.is_not_zero() { + self.price_to_amount.as_mut().unwrap().decrement(price, supply); + } } } @@ -121,6 +188,31 @@ impl CohortState { blocks_old: usize, days_old: f64, older_than_hour: bool, + ) { + self.send_( + supply, + current_price, + prev_price, + blocks_old, + days_old, + older_than_hour, + None, + prev_price.map(|prev_price| (prev_price, supply)), + ); + } + + /// Process spent input with custom price_to_amount updates (for address cohorts). + #[allow(clippy::too_many_arguments)] + pub fn send_( + &mut self, + supply: &SupplyState, + current_price: Option, + prev_price: Option, + blocks_old: usize, + days_old: f64, + older_than_hour: bool, + price_to_amount_increment: Option<(Dollars, &SupplyState)>, + price_to_amount_decrement: Option<(Dollars, &SupplyState)>, ) { if supply.utxo_count == 0 { return; @@ -138,7 +230,16 @@ impl CohortState { let current_price = current_price.unwrap(); let prev_price = prev_price.unwrap(); realized.send(supply, current_price, prev_price, older_than_hour); - self.price_to_amount.as_mut().unwrap().decrement(prev_price, supply); + + if let Some((price, supply)) = price_to_amount_increment + && supply.value.is_not_zero() { + self.price_to_amount.as_mut().unwrap().increment(price, supply); + } + + if let Some((price, supply)) = price_to_amount_decrement + && supply.value.is_not_zero() { + self.price_to_amount.as_mut().unwrap().decrement(price, supply); + } } } } @@ -178,6 +279,15 @@ impl CohortState { result } + /// Compute unrealized profit/loss at current price (alias for compatibility). + pub fn compute_unrealized_states( + &self, + height_price: Dollars, + date_price: Option, + ) -> (UnrealizedState, Option) { + self.compute_unrealized(height_price, date_price) + } + /// Compute unrealized profit/loss at current price. pub fn compute_unrealized( &self, diff --git a/crates/brk_computer/src/stateful/cohorts/state_address.rs b/crates/brk_computer/src/stateful/cohorts/state_address.rs new file mode 100644 index 000000000..65bea84f4 --- /dev/null +++ b/crates/brk_computer/src/stateful/cohorts/state_address.rs @@ -0,0 +1,128 @@ +use std::path::Path; + +use brk_error::Result; +use brk_types::{Dollars, Height, LoadedAddressData, Sats}; + +use crate::SupplyState; + +use super::CohortState; + +#[derive(Clone)] +pub struct AddressCohortState { + pub addr_count: u64, + pub inner: CohortState, +} + +impl AddressCohortState { + pub fn new(path: &Path, name: &str, compute_dollars: bool) -> Self { + Self { + addr_count: 0, + inner: CohortState::new(path, name, compute_dollars), + } + } + + pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> { + self.inner.reset_price_to_amount_if_needed() + } + + pub fn reset_single_iteration_values(&mut self) { + self.inner.reset_single_iteration_values(); + } + + #[allow(clippy::too_many_arguments)] + pub fn send( + &mut self, + addressdata: &mut LoadedAddressData, + value: Sats, + current_price: Option, + prev_price: Option, + blocks_old: usize, + days_old: f64, + older_than_hour: bool, + ) -> Result<()> { + let compute_price = current_price.is_some(); + + let prev_realized_price = compute_price.then(|| addressdata.realized_price()); + let prev_supply_state = SupplyState { + utxo_count: addressdata.utxo_count() as u64, + value: addressdata.balance(), + }; + + addressdata.send(value, prev_price)?; + + let supply_state = SupplyState { + utxo_count: addressdata.utxo_count() as u64, + value: addressdata.balance(), + }; + + self.inner.send_( + &SupplyState { + utxo_count: 1, + value, + }, + current_price, + prev_price, + blocks_old, + days_old, + older_than_hour, + compute_price.then(|| (addressdata.realized_price(), &supply_state)), + prev_realized_price.map(|prev_price| (prev_price, &prev_supply_state)), + ); + + Ok(()) + } + + pub fn receive( + &mut self, + address_data: &mut LoadedAddressData, + value: Sats, + price: Option, + ) { + let compute_price = price.is_some(); + + let prev_realized_price = compute_price.then(|| address_data.realized_price()); + let prev_supply_state = SupplyState { + utxo_count: address_data.utxo_count() as u64, + value: address_data.balance(), + }; + + address_data.receive(value, price); + + let supply_state = SupplyState { + utxo_count: address_data.utxo_count() as u64, + value: address_data.balance(), + }; + + self.inner.receive_( + &SupplyState { + utxo_count: 1, + value, + }, + price, + compute_price.then(|| (address_data.realized_price(), &supply_state)), + prev_realized_price.map(|prev_price| (prev_price, &prev_supply_state)), + ); + } + + pub fn add(&mut self, addressdata: &LoadedAddressData) { + self.addr_count += 1; + self.inner.increment_( + &addressdata.into(), + addressdata.realized_cap, + addressdata.realized_price(), + ); + } + + pub fn subtract(&mut self, addressdata: &LoadedAddressData) { + self.addr_count = self.addr_count.checked_sub(1).unwrap(); + self.inner.decrement_( + &addressdata.into(), + addressdata.realized_cap, + addressdata.realized_price(), + ); + } + + pub fn commit(&mut self, height: Height) -> Result<()> { + self.inner.commit(height) + } +} diff --git a/crates/brk_computer/src/stateful/cohorts/state_utxo.rs b/crates/brk_computer/src/stateful/cohorts/state_utxo.rs new file mode 100644 index 000000000..c633ba41f --- /dev/null +++ b/crates/brk_computer/src/stateful/cohorts/state_utxo.rs @@ -0,0 +1,19 @@ +use std::path::Path; + +use brk_error::Result; +use derive_deref::{Deref, DerefMut}; + +use super::CohortState; + +#[derive(Clone, Deref, DerefMut)] +pub struct UTXOCohortState(CohortState); + +impl UTXOCohortState { + pub fn new(path: &Path, name: &str, compute_dollars: bool) -> Self { + Self(CohortState::new(path, name, compute_dollars)) + } + + pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> { + self.0.reset_price_to_amount_if_needed() + } +} diff --git a/crates/brk_computer/src/stateful/cohorts/utxo.rs b/crates/brk_computer/src/stateful/cohorts/utxo.rs index df44eb7b1..3572ab943 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo.rs @@ -9,10 +9,10 @@ use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, Version}; use vecdb::{Database, Exit, IterableVec}; use crate::{ - Indexes, PriceToAmount, UTXOCohortState, + Indexes, PriceToAmount, grouped::{PERCENTILES, PERCENTILES_LEN}, indexes, price, - stateful::{CohortVecs, DynCohortVecs}, + stateful::{CohortVecs, DynCohortVecs, cohorts::UTXOCohortState}, }; use super::super::metrics::{CohortMetrics, ImportConfig}; diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs index df5b9bc4e..03e782158 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs @@ -16,7 +16,7 @@ use brk_traversable::Traversable; use brk_types::{Bitcoin, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Version}; use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; -use vecdb::{Database, Exit, GenericStoredVec, IterableVec}; +use vecdb::{Database, Exit, IterableVec}; use crate::{Indexes, indexes, price, stateful::DynCohortVecs}; @@ -462,4 +462,10 @@ impl UTXOCohorts { Ok(()) } + + /// Validate computed versions for all separate cohorts. + pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { + self.par_iter_separate_mut() + .try_for_each(|v| v.validate_computed_versions(base_version)) + } } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs index bc3479c86..b6c9b8361 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs @@ -19,6 +19,10 @@ impl UTXOCohorts { height_to_sent: FxHashMap, chain_state: &mut [BlockState], ) { + if chain_state.is_empty() { + return; + } + let UTXOGroups { all, term, @@ -65,8 +69,8 @@ impl UTXOCohorts { last_timestamp.difference_in_days_between_float(block_state.timestamp); let older_than_hour = last_timestamp .checked_sub(block_state.timestamp) - .unwrap() - .is_more_than_hour(); + .map(|d| d.is_more_than_hour()) + .unwrap_or(false); // Update time-based cohorts time_cohorts diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index 59cba840b..2e44d669c 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -13,9 +13,7 @@ use std::{mem, thread}; use brk_error::Result; use brk_grouper::ByAddressType; use brk_indexer::Indexer; -use brk_types::{ - DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TypeIndex, -}; +use brk_types::{DateIndex, Height, OutputType, Sats, TypeIndex}; use log::info; use rayon::prelude::*; use vecdb::{AnyStoredVec, Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex}; @@ -29,7 +27,7 @@ use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}; use super::super::vecs::Vecs; use super::{ BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1, - BIP30_ORIGINAL_HEIGHT_2, FLUSH_INTERVAL, IndexerReaders, VecsReaders, + BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex, flush::flush_checkpoint as flush_checkpoint_full, }; @@ -53,13 +51,19 @@ pub fn process_blocks( chain_state: &mut Vec, exit: &Exit, ) -> Result<()> { - if starting_height > last_height { + // Create computation context with pre-computed vectors for thread-safe access + let ctx = ComputeContext::new(starting_height, last_height, indexes, price); + + if ctx.starting_height > ctx.last_height { return Ok(()); } info!( - "Processing blocks {} to {}...", - starting_height, last_height + "Processing blocks {} to {} (compute_dollars: {}, price_data: {})...", + ctx.starting_height, + ctx.last_height, + ctx.compute_dollars, + ctx.price.is_some() ); // References to vectors using correct field paths @@ -91,11 +95,9 @@ pub fn process_blocks( let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height); let dateindex_to_price = price.map(|p| p.timeindexes_to_price_close.dateindex.u()); - // Collect price and timestamp vectors for process_sent (needs slice access, not iterators) - // These are used in the spawned thread and need to be separate from chain_state - let height_to_price_vec: Option> = - height_to_price.map(|v| v.into_iter().map(|d| *d).collect()); - let height_to_timestamp_vec: Vec = height_to_timestamp.into_iter().collect(); + // Access pre-computed vectors from context for thread-safe access + let height_to_price_vec = &ctx.height_to_price; + let height_to_timestamp_vec = &ctx.height_to_timestamp; // Create iterators for sequential access let mut height_to_first_txindex_iter = height_to_first_txindex.into_iter(); @@ -222,6 +224,10 @@ pub fn process_blocks( let timestamp = height_to_timestamp_iter.get_unwrap(height); let block_price = height_to_price_iter.as_mut().map(|v| *v.get_unwrap(height)); + // Debug validation: verify context methods match iterator values + debug_assert_eq!(ctx.timestamp_at(height), timestamp); + debug_assert_eq!(ctx.price_at(height), block_price); + // Build txindex mappings for this block let txoutindex_to_txindex = build_txoutindex_to_txindex(first_txindex, tx_count, &mut txindex_to_output_count_iter); @@ -382,7 +388,7 @@ pub fn process_blocks( &mut addresstype_to_addr_count, &mut addresstype_to_empty_addr_count, height_to_price_vec.as_deref(), - &height_to_timestamp_vec, + height_to_timestamp_vec, height, timestamp, ) diff --git a/crates/brk_computer/src/stateful/compute/context.rs b/crates/brk_computer/src/stateful/compute/context.rs index 433e5797f..e142d1fcf 100644 --- a/crates/brk_computer/src/stateful/compute/context.rs +++ b/crates/brk_computer/src/stateful/compute/context.rs @@ -3,7 +3,7 @@ use brk_types::{Dollars, Height, Timestamp}; use vecdb::VecIndex; -use crate::price; +use crate::{indexes, price}; /// Context shared across block processing. pub struct ComputeContext<'a> { @@ -27,6 +27,30 @@ pub struct ComputeContext<'a> { } impl<'a> ComputeContext<'a> { + /// Create a new computation context. + pub fn new( + starting_height: Height, + last_height: Height, + indexes: &indexes::Vecs, + price: Option<&'a price::Vecs>, + ) -> Self { + let height_to_timestamp: Vec = + indexes.height_to_timestamp_fixed.into_iter().collect(); + + let height_to_price: Option> = price + .map(|p| &p.chainindexes_to_price_close.height) + .map(|v| v.into_iter().map(|d| *d).collect()); + + Self { + starting_height, + last_height, + compute_dollars: price.is_some(), + price, + height_to_timestamp, + height_to_price, + } + } + /// Get price at height (None if no price data or height out of range). pub fn price_at(&self, height: Height) -> Option { self.height_to_price.as_ref()?.get(height.to_usize()).copied() diff --git a/crates/brk_computer/src/stateful/compute/mod.rs b/crates/brk_computer/src/stateful/compute/mod.rs index f8826c570..06b8bf76d 100644 --- a/crates/brk_computer/src/stateful/compute/mod.rs +++ b/crates/brk_computer/src/stateful/compute/mod.rs @@ -7,7 +7,7 @@ //! 4. Periodically flush to disk //! 5. Compute aggregate cohorts from separate cohorts -mod aggregates; +pub mod aggregates; mod block_loop; mod context; mod flush; @@ -19,9 +19,7 @@ pub use context::ComputeContext; pub use readers::{ IndexerReaders, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex, }; -pub use recover::{ - StartMode, determine_start_mode, -}; +pub use recover::{StartMode, determine_start_mode, recover_state, reset_state}; /// Flush checkpoint interval (every N blocks). pub const FLUSH_INTERVAL: usize = 10_000; diff --git a/crates/brk_computer/src/stateful/compute/recover.rs b/crates/brk_computer/src/stateful/compute/recover.rs index df53ea537..739648a27 100644 --- a/crates/brk_computer/src/stateful/compute/recover.rs +++ b/crates/brk_computer/src/stateful/compute/recover.rs @@ -7,10 +7,11 @@ use std::collections::BTreeSet; use brk_error::Result; use brk_types::Height; -use vecdb::{AnyVec, Stamp}; +use vecdb::Stamp; use super::super::address::AnyAddressIndexesVecs; -use super::super::cohorts::{DynCohortVecs, UTXOCohorts}; +use super::super::cohorts::{AddressCohorts, UTXOCohorts}; +use super::super::AddressesDataVecs; /// Result of state recovery. pub struct RecoveredState { @@ -20,39 +21,72 @@ pub struct RecoveredState { pub restored: bool, } -/// Determine starting height from vector lengths. -pub fn find_min_height( - utxo_vecs: &[&mut dyn DynCohortVecs], - address_vecs: &[&mut dyn DynCohortVecs], - chain_state_len: usize, - address_indexes_min_height: Height, - address_data_min_height: Height, - other_vec_lens: &[usize], -) -> Height { - let utxo_min = utxo_vecs - .iter() - .map(|v| Height::from(v.min_height_vecs_len())) - .min() - .unwrap_or_default(); +/// Perform state recovery for resuming from checkpoint. +/// +/// Rolls back state vectors and imports cohort states. +/// Returns the recovered state information. +pub fn recover_state( + height: Height, + any_address_indexes: &mut AnyAddressIndexesVecs, + addresses_data: &mut AddressesDataVecs, + utxo_cohorts: &mut UTXOCohorts, + address_cohorts: &mut AddressCohorts, +) -> Result { + let stamp = Stamp::from(height); - let address_min = address_vecs - .iter() - .map(|v| Height::from(v.min_height_vecs_len())) - .min() - .unwrap_or_default(); + // Rollback address state vectors + let address_indexes_rollback = any_address_indexes.rollback_before(stamp); + let address_data_rollback = addresses_data.rollback_before(stamp); - let other_min = other_vec_lens - .iter() - .map(|&len| Height::from(len)) - .min() - .unwrap_or_default(); + // Verify rollback consistency (uses rollback_states helper) + let _consistent_height = rollback_states( + stamp, + Ok(stamp), // chain_state handled separately + address_indexes_rollback, + address_data_rollback, + ); - utxo_min - .min(address_min) - .min(Height::from(chain_state_len)) - .min(address_indexes_min_height) - .min(address_data_min_height) - .min(other_min) + // Import cohort states + utxo_cohorts.import_separate_states(height); + address_cohorts.import_separate_states(height); + + // Import aggregate price_to_amount + let _ = import_aggregate_price_to_amount(height, utxo_cohorts)?; + + Ok(RecoveredState { + starting_height: height, + restored: true, + }) +} + +/// Reset all state for fresh start. +/// +/// Resets all state vectors and cohort states. +pub fn reset_state( + any_address_indexes: &mut AnyAddressIndexesVecs, + addresses_data: &mut AddressesDataVecs, + utxo_cohorts: &mut UTXOCohorts, + address_cohorts: &mut AddressCohorts, +) -> Result { + // Reset address state + any_address_indexes.reset()?; + addresses_data.reset()?; + + // Reset cohort state heights + utxo_cohorts.reset_separate_state_heights(); + address_cohorts.reset_separate_state_heights(); + + // Reset price_to_amount for all cohorts + utxo_cohorts.reset_separate_price_to_amount()?; + address_cohorts.reset_separate_price_to_amount()?; + + // Reset aggregate cohorts' price_to_amount + utxo_cohorts.reset_aggregate_price_to_amount()?; + + Ok(RecoveredState { + starting_height: Height::ZERO, + restored: false, + }) } /// Check if we can resume from a checkpoint or need to start fresh. @@ -76,7 +110,7 @@ pub enum StartMode { /// /// Returns the consistent starting height if all vectors agree, /// otherwise returns Height::ZERO (need fresh start). -pub fn rollback_states( +fn rollback_states( _stamp: Stamp, chain_state_rollback: vecdb::Result, address_indexes_rollbacks: Result>, @@ -107,32 +141,8 @@ pub fn rollback_states( } } -/// Import state for all separate cohorts. -/// -/// Returns the starting height if all imports succeed with the same height, -/// otherwise returns Height::ZERO. -pub fn import_cohort_states( - starting_height: Height, - cohorts: &mut [&mut dyn DynCohortVecs], -) -> Height { - if starting_height.is_zero() { - return Height::ZERO; - } - - let all_match = cohorts - .iter_mut() - .map(|v| v.import_state(starting_height).unwrap_or_default()) - .all(|h| h == starting_height); - - if all_match { - starting_height - } else { - Height::ZERO - } -} - /// Import aggregate price_to_amount for UTXO cohorts. -pub fn import_aggregate_price_to_amount( +fn import_aggregate_price_to_amount( starting_height: Height, utxo_cohorts: &mut UTXOCohorts, ) -> Result { @@ -149,24 +159,3 @@ pub fn import_aggregate_price_to_amount( }) } -/// Reset all state for fresh start. -pub fn reset_all_state( - address_indexes: &mut AnyAddressIndexesVecs, - utxo_vecs: &mut [&mut dyn DynCohortVecs], - address_vecs: &mut [&mut dyn DynCohortVecs], - utxo_cohorts: &mut UTXOCohorts, -) -> Result<()> { - address_indexes.reset()?; - - for v in utxo_vecs.iter_mut() { - v.reset_state_starting_height(); - } - - for v in address_vecs.iter_mut() { - v.reset_state_starting_height(); - } - - utxo_cohorts.reset_aggregate_price_to_amount()?; - - Ok(()) -} diff --git a/crates/brk_computer/src/stateful/metrics/mod.rs b/crates/brk_computer/src/stateful/metrics/mod.rs index c2e0c9d14..93e3733e8 100644 --- a/crates/brk_computer/src/stateful/metrics/mod.rs +++ b/crates/brk_computer/src/stateful/metrics/mod.rs @@ -30,7 +30,7 @@ use brk_traversable::Traversable; use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; use vecdb::{Exit, IterableVec}; -use crate::{Indexes, indexes, price, states::CohortState}; +use crate::{Indexes, indexes, price, stateful::cohorts::CohortState}; /// All metrics for a cohort, organized by category. #[derive(Clone, Traversable)] diff --git a/crates/brk_computer/src/stateful/metrics/price_paid.rs b/crates/brk_computer/src/stateful/metrics/price_paid.rs index cc3006670..32dc21674 100644 --- a/crates/brk_computer/src/stateful/metrics/price_paid.rs +++ b/crates/brk_computer/src/stateful/metrics/price_paid.rs @@ -10,7 +10,8 @@ use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVe use crate::{ Indexes, grouped::{ComputedVecsFromHeight, PricePercentiles, Source, VecBuilderOptions}, - states::{CohortState, Flushable}, + stateful::cohorts::CohortState, + states::Flushable, }; use super::ImportConfig; diff --git a/crates/brk_computer/src/stateful/process/with_source.rs b/crates/brk_computer/src/stateful/process/with_source.rs index 5f9a6beea..60b5fa08d 100644 --- a/crates/brk_computer/src/stateful/process/with_source.rs +++ b/crates/brk_computer/src/stateful/process/with_source.rs @@ -24,12 +24,6 @@ impl WithAddressDataSource { pub fn is_from_emptyaddressdata(&self) -> bool { matches!(self, Self::FromEmpty(..)) } - - pub fn deref_mut(&mut self) -> &mut T { - match self { - Self::New(v) | Self::FromLoaded(_, v) | Self::FromEmpty(_, v) => v, - } - } } impl std::ops::Deref for WithAddressDataSource { diff --git a/crates/brk_computer/src/stateful/vecs.rs b/crates/brk_computer/src/stateful/vecs.rs index 05c95d2be..e555203e6 100644 --- a/crates/brk_computer/src/stateful/vecs.rs +++ b/crates/brk_computer/src/stateful/vecs.rs @@ -3,9 +3,10 @@ use std::path::Path; use brk_error::Result; +use log::info; use brk_indexer::Indexer; use brk_traversable::Traversable; -use brk_types::{Dollars, Height, Sats, StoredU64, TxInIndex, TxOutIndex, Version}; +use brk_types::{Dollars, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, Sats, StoredU64, TxInIndex, TxOutIndex, Version}; use vecdb::{ AnyStoredVec, BytesVec, Database, EagerVec, Exit, ImportableVec, IterableCloneableVec, LazyVecFrom1, PAGE_SIZE, PcoVec, @@ -13,7 +14,7 @@ use vecdb::{ use crate::{ Indexes, SupplyState, chain, - grouped::{ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source, VecBuilderOptions}, + grouped::{ComputedValueVecsFromHeight, ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source, VecBuilderOptions}, indexes, price, utils::OptionExt, }; @@ -21,6 +22,7 @@ use crate::{ use super::{ AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, UTXOCohorts, address::{AddressTypeToHeightToAddressCount, AddressTypeToIndexesToAddressCount}, + compute::aggregates, }; const VERSION: Version = Version::new(21); @@ -51,10 +53,16 @@ pub struct Vecs { // --- pub addresstype_to_indexes_to_addr_count: AddressTypeToIndexesToAddressCount, pub addresstype_to_indexes_to_empty_addr_count: AddressTypeToIndexesToAddressCount, + pub indexes_to_unspendable_supply: ComputedValueVecsFromHeight, + pub indexes_to_opreturn_supply: ComputedValueVecsFromHeight, pub indexes_to_addr_count: ComputedVecsFromHeight, pub indexes_to_empty_addr_count: ComputedVecsFromHeight, pub height_to_market_cap: Option>, pub indexes_to_market_cap: Option>, + pub loadedaddressindex_to_loadedaddressindex: + LazyVecFrom1, + pub emptyaddressindex_to_emptyaddressindex: + LazyVecFrom1, } const SAVED_STAMPED_CHANGES: u16 = 10; @@ -79,6 +87,30 @@ impl Vecs { let utxo_cohorts = UTXOCohorts::forced_import(&db, version, indexes, price, &states_path)?; + // Create address data BytesVecs first so we can also use them for identity mappings + let loadedaddressindex_to_loadedaddressdata = BytesVec::forced_import_with( + vecdb::ImportOptions::new(&db, "loadedaddressdata", v0) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?; + let emptyaddressindex_to_emptyaddressdata = BytesVec::forced_import_with( + vecdb::ImportOptions::new(&db, "emptyaddressdata", v0) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?; + + // Identity mappings for traversable + let loadedaddressindex_to_loadedaddressindex = LazyVecFrom1::init( + "loadedaddressindex", + v0, + loadedaddressindex_to_loadedaddressdata.boxed_clone(), + |index, _| Some(index), + ); + let emptyaddressindex_to_emptyaddressindex = LazyVecFrom1::init( + "emptyaddressindex", + v0, + emptyaddressindex_to_emptyaddressdata.boxed_clone(), + |index, _| Some(index), + ); + Ok(Self { chain_state: BytesVec::forced_import_with( vecdb::ImportOptions::new(&db, "chain", v0) @@ -90,7 +122,25 @@ impl Vecs { )?, height_to_unspendable_supply: EagerVec::forced_import(&db, "unspendable_supply", v0)?, + indexes_to_unspendable_supply: ComputedValueVecsFromHeight::forced_import( + &db, + "unspendable_supply", + Source::None, + v0, + VecBuilderOptions::default().add_last(), + compute_dollars, + indexes, + )?, height_to_opreturn_supply: EagerVec::forced_import(&db, "opreturn_supply", v0)?, + indexes_to_opreturn_supply: ComputedValueVecsFromHeight::forced_import( + &db, + "opreturn_supply", + Source::None, + v0, + VecBuilderOptions::default().add_last(), + compute_dollars, + indexes, + )?, indexes_to_addr_count: ComputedVecsFromHeight::forced_import( &db, @@ -166,7 +216,12 @@ impl Vecs { )?, any_address_indexes: AnyAddressIndexesVecs::forced_import(&db, v0)?, - addresses_data: AddressesDataVecs::forced_import(&db, v0)?, + addresses_data: AddressesDataVecs { + loaded: loadedaddressindex_to_loadedaddressdata, + empty: emptyaddressindex_to_emptyaddressdata, + }, + loadedaddressindex_to_loadedaddressindex, + emptyaddressindex_to_emptyaddressindex, db, }) @@ -190,7 +245,9 @@ impl Vecs { starting_indexes: &mut Indexes, exit: &Exit, ) -> Result<()> { - use super::compute::{StartMode, determine_start_mode, process_blocks}; + use super::compute::{ + StartMode, determine_start_mode, process_blocks, recover_state, reset_state, + }; use crate::states::BlockState; use vecdb::{AnyVec, GenericStoredVec, Stamp, TypedVecIterator, VecIndex}; @@ -210,28 +267,28 @@ impl Vecs { .min(Height::from(self.height_to_unspendable_supply.len())) .min(Height::from(self.height_to_opreturn_supply.len())); - // 2. Determine start mode and recover state + // 2. Determine start mode and recover/reset state let start_mode = determine_start_mode(stateful_min, chain_state_height); let (starting_height, mut chain_state) = match start_mode { StartMode::Resume(height) => { let stamp = Stamp::from(height); - // Rollback state vectors + // Rollback BytesVec state (not handled by recover_state) let _ = self.chain_state.rollback_before(stamp); let _ = self.txoutindex_to_txinindex.rollback_before(stamp); - let _ = self.any_address_indexes.rollback_before(stamp); - let _ = self.addresses_data.rollback_before(stamp); - // Import cohort states - self.utxo_cohorts.import_separate_states(height); - self.address_cohorts.import_separate_states(height); - - // Import aggregate price_to_amount - let _ = self.utxo_cohorts.import_aggregate_price_to_amount(height); + // Use recover_state for address and cohort state recovery + let recovered = recover_state( + height, + &mut self.any_address_indexes, + &mut self.addresses_data, + &mut self.utxo_cohorts, + &mut self.address_cohorts, + )?; // Recover chain_state from stored values - let chain_state = if !height.is_zero() { + let chain_state = if !recovered.starting_height.is_zero() { let height_to_timestamp = &indexes.height_to_timestamp_fixed; let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height); @@ -239,7 +296,7 @@ impl Vecs { let mut height_to_price_iter = height_to_price.map(|v| v.into_iter()); let mut chain_state_iter = self.chain_state.into_iter(); - (0..height.to_usize()) + (0..recovered.starting_height.to_usize()) .map(|h| { let h = Height::from(h); BlockState { @@ -253,29 +310,39 @@ impl Vecs { vec![] }; - (height, chain_state) + info!( + "State recovery: {} at height {}", + if recovered.restored { "resumed from checkpoint" } else { "fresh start" }, + recovered.starting_height + ); + (recovered.starting_height, chain_state) } StartMode::Fresh => { - // Reset all state + // Reset BytesVec state self.txoutindex_to_txinindex.reset()?; - self.any_address_indexes.reset()?; - self.addresses_data.reset()?; - // Reset state heights - self.utxo_cohorts.reset_separate_state_heights(); - self.address_cohorts.reset_separate_state_heights(); + // Use reset_state for cohort and address state reset + let recovered = reset_state( + &mut self.any_address_indexes, + &mut self.addresses_data, + &mut self.utxo_cohorts, + &mut self.address_cohorts, + )?; - // Reset price_to_amount for all separate cohorts - self.utxo_cohorts.reset_separate_price_to_amount()?; - self.address_cohorts.reset_separate_price_to_amount()?; - - // Reset aggregate cohorts' price_to_amount - self.utxo_cohorts.reset_aggregate_price_to_amount()?; - - (Height::ZERO, vec![]) + info!( + "State recovery: {} at height {}", + if recovered.restored { "resumed from checkpoint" } else { "fresh start" }, + recovered.starting_height + ); + (recovered.starting_height, vec![]) } }; + // 2b. Validate computed versions + let base_version = VERSION; + self.utxo_cohorts.validate_computed_versions(base_version)?; + self.address_cohorts.validate_computed_versions(base_version)?; + // 3. Get last height from indexer let last_height = Height::from( indexer @@ -302,16 +369,22 @@ impl Vecs { } // 5. Compute aggregates (overlapping cohorts from separate cohorts) - self.utxo_cohorts - .compute_overlapping_vecs(starting_indexes, exit)?; - self.address_cohorts - .compute_overlapping_vecs(starting_indexes, exit)?; + aggregates::compute_overlapping( + &mut self.utxo_cohorts, + &mut self.address_cohorts, + starting_indexes, + exit, + )?; // 6. Compute rest part1 (dateindex mappings) - self.utxo_cohorts - .compute_rest_part1(indexes, price, starting_indexes, exit)?; - self.address_cohorts - .compute_rest_part1(indexes, price, starting_indexes, exit)?; + aggregates::compute_rest_part1( + &mut self.utxo_cohorts, + &mut self.address_cohorts, + indexes, + price, + starting_indexes, + exit, + )?; // 7. Compute indexes_to_market_cap from dateindex supply if let Some(indexes_to_market_cap) = self.indexes_to_market_cap.as_mut() { @@ -336,6 +409,22 @@ impl Vecs { })?; } + // 7b. Compute indexes for unspendable and opreturn supply + self.indexes_to_unspendable_supply.compute_rest( + indexes, + price, + starting_indexes, + exit, + Some(&self.height_to_unspendable_supply), + )?; + self.indexes_to_opreturn_supply.compute_rest( + indexes, + price, + starting_indexes, + exit, + Some(&self.height_to_opreturn_supply), + )?; + // 8. Compute rest part2 (relative metrics) let height_to_supply = &self .utxo_cohorts @@ -385,20 +474,9 @@ impl Vecs { let height_to_realized_cap_ref = height_to_realized_cap.as_ref(); let dateindex_to_realized_cap_ref = dateindex_to_realized_cap.as_ref(); - self.utxo_cohorts.compute_rest_part2( - indexes, - price, - starting_indexes, - height_to_supply, - dateindex_to_supply_ref, - height_to_market_cap_ref, - dateindex_to_market_cap_ref, - height_to_realized_cap_ref, - dateindex_to_realized_cap_ref, - exit, - )?; - - self.address_cohorts.compute_rest_part2( + aggregates::compute_rest_part2( + &mut self.utxo_cohorts, + &mut self.address_cohorts, indexes, price, starting_indexes, diff --git a/crates/brk_computer/src/states/cohorts/common.rs b/crates/brk_computer/src/states/cohorts/common.rs index df99ad6f6..dc0a49788 100644 --- a/crates/brk_computer/src/states/cohorts/common.rs +++ b/crates/brk_computer/src/states/cohorts/common.rs @@ -4,9 +4,9 @@ use brk_error::Result; use brk_types::{CheckedSub, Dollars, Height, Sats}; use crate::{ + PriceToAmount, RealizedState, SupplyState, UnrealizedState, grouped::{PERCENTILES, PERCENTILES_LEN}, utils::OptionExt, - PriceToAmount, RealizedState, SupplyState, UnrealizedState, }; #[derive(Clone)] @@ -337,12 +337,7 @@ impl CohortState { update_state(price, height_price, sats, &mut height_unrealized_state); if let Some(date_price) = date_price { - update_state( - price, - date_price, - sats, - date_unrealized_state.um(), - ) + update_state(price, date_price, sats, date_unrealized_state.um()) } }); diff --git a/crates/brk_computer/src/states/flushable.rs b/crates/brk_computer/src/states/flushable.rs index 449e93f39..e97180dbe 100644 --- a/crates/brk_computer/src/states/flushable.rs +++ b/crates/brk_computer/src/states/flushable.rs @@ -12,11 +12,8 @@ use vecdb::Exit; /// /// This is for simple flush operations that don't require height tracking. pub trait Flushable { - /// Safely flush data to disk. + /// Safely flush data to disk with fsync for durability. fn safe_flush(&mut self, exit: &Exit) -> Result<()>; - - /// Write to mmap without fsync. Data visible to readers immediately but not durable. - fn safe_write(&mut self, exit: &Exit) -> Result<()>; } /// Trait for stateful components that track data indexed by height. @@ -45,13 +42,6 @@ impl Flushable for Option { } Ok(()) } - - fn safe_write(&mut self, exit: &Exit) -> Result<()> { - if let Some(inner) = self.as_mut() { - inner.safe_write(exit)?; - } - Ok(()) - } } /// Blanket implementation for Option where T: HeightFlushable diff --git a/crates/brk_computer/src/states/mod.rs b/crates/brk_computer/src/states/mod.rs index 2eed18059..2619d068f 100644 --- a/crates/brk_computer/src/states/mod.rs +++ b/crates/brk_computer/src/states/mod.rs @@ -1,5 +1,5 @@ mod block; -mod cohorts; +// mod cohorts; mod flushable; mod price_to_amount; mod realized; @@ -8,7 +8,7 @@ mod transacted; mod unrealized; pub use block::*; -pub use cohorts::*; +// pub use cohorts::*; pub use flushable::*; pub use price_to_amount::*; pub use realized::*;