diff --git a/crates/brk_computer/src/grouped/price_percentiles.rs b/crates/brk_computer/src/grouped/price_percentiles.rs index c9072c9bb..c370ea94f 100644 --- a/crates/brk_computer/src/grouped/price_percentiles.rs +++ b/crates/brk_computer/src/grouped/price_percentiles.rs @@ -1,7 +1,7 @@ use brk_error::Result; use brk_traversable::{Traversable, TreeNode}; use brk_types::{Dollars, Height, Version}; -use vecdb::{AnyExportableVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec}; +use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec}; use crate::{Indexes, indexes}; @@ -83,6 +83,15 @@ impl PricePercentiles { .position(|&p| p == percentile) .and_then(|i| self.vecs[i].as_ref()) } + + pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + for vec in self.vecs.iter_mut().flatten() { + if let Some(height_vec) = vec.height.as_mut() { + height_vec.safe_flush(exit)?; + } + } + Ok(()) + } } impl Traversable for PricePercentiles { diff --git a/crates/brk_computer/src/stateful/address_cohorts.rs b/crates/brk_computer/src/stateful/address_cohorts.rs index fd814ba05..ff3f22766 100644 --- a/crates/brk_computer/src/stateful/address_cohorts.rs +++ b/crates/brk_computer/src/stateful/address_cohorts.rs @@ -5,6 +5,7 @@ use brk_grouper::{AddressGroups, AmountFilter, Filter, Filtered}; use brk_traversable::Traversable; use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; use derive_deref::{Deref, DerefMut}; +use rayon::prelude::*; use vecdb::{Database, Exit, IterableVec}; use crate::{ @@ -132,7 +133,7 @@ impl Vecs { } pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { - self.iter_separate_mut() + self.par_iter_separate_mut() .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit)) } } diff --git a/crates/brk_computer/src/stateful/common.rs b/crates/brk_computer/src/stateful/common.rs index ccf676f3b..854396be2 100644 --- a/crates/brk_computer/src/stateful/common.rs +++ b/crates/brk_computer/src/stateful/common.rs @@ -1011,6 +1011,10 @@ impl Vecs { .um() .safe_flush(exit)?; } + + if let Some(price_percentiles) = self.price_percentiles.as_mut() { + price_percentiles.safe_flush(exit)?; + } } state.commit(height)?; diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index 208fe380d..df37cdbd4 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -517,11 +517,10 @@ impl Vecs { let starting_height = { drop(separate_utxo_vecs); drop(separate_address_vecs); - let result = if starting_height.is_not_zero() - && self - .utxo_cohorts - .import_aggregate_price_to_amount(starting_height)? - == starting_height + let imported_height = self + .utxo_cohorts + .import_aggregate_price_to_amount(starting_height)?; + let result = if starting_height.is_not_zero() && imported_height == starting_height { starting_height } else { @@ -755,15 +754,6 @@ impl Vecs { let typeindex = txoutindex_to_typeindex .read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); - // Debug: track the exact bad typeindex - let ti: usize = typeindex.into(); - if ti == 254909199 { - eprintln!( - "DEBUG outputs EXACT: output_type={:?}, txoutindex={}, typeindex={}", - output_type, txoutindex.to_usize(), ti - ); - } - let addressdata_opt = Self::get_addressdatawithsource( output_type, typeindex, @@ -795,15 +785,8 @@ impl Vecs { transacted.iterate(value, output_type); if let Some((typeindex, addressdata_opt)) = typeindex_with_addressdata_opt { - let ti: usize = typeindex.into(); - if ti == 254909199 { - eprintln!("DEBUG fold outputs EXACT: output_type={:?}, typeindex={}, has_addressdata={}", output_type, ti, addressdata_opt.is_some()); - } if let Some(addressdata) = addressdata_opt { - if ti == 254909199 { - eprintln!("DEBUG fold inserting to addressdatawithsource EXACT: output_type={:?}, is_new={}", output_type, addressdata.is_new()); - } addresstype_to_typeindex_to_addressdatawithsource .insert_for_type(output_type, typeindex, addressdata); } @@ -874,15 +857,6 @@ impl Vecs { let typeindex = txoutindex_to_typeindex .read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); - // Debug: track the exact bad typeindex - let ti: usize = typeindex.into(); - if ti == 254909199 { - eprintln!( - "DEBUG inputs EXACT: input_type={:?}, txoutindex={}, typeindex={}", - input_type, txoutindex.to_usize(), ti - ); - } - let addressdata_opt = Self::get_addressdatawithsource( input_type, typeindex, @@ -932,14 +906,7 @@ impl Vecs { if let Some((typeindex, addressdata_opt)) = typeindex_with_addressdata_opt { - let ti: usize = typeindex.into(); - if ti == 254909199 { - eprintln!("DEBUG fold inputs EXACT: output_type={:?}, typeindex={}, has_addressdata={}", output_type, ti, addressdata_opt.is_some()); - } if let Some(addressdata) = addressdata_opt { - if ti == 254909199 { - eprintln!("DEBUG fold inputs inserting EXACT: output_type={:?}, is_new={}", output_type, addressdata.is_new()); - } addresstype_to_typeindex_to_addressdatawithsource .insert_for_type(output_type, typeindex, addressdata); } @@ -1405,16 +1372,8 @@ impl Vecs { any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> Option> { - let typeindex_usize: usize = typeindex.into(); - if typeindex_usize == 254909199 { - eprintln!("DEBUG get_addressdatawithsource EXACT: address_type={:?}, typeindex={}", address_type, typeindex_usize); - } let first = *first_addressindexes.get(address_type).unwrap(); if first <= typeindex { - let first_usize: usize = first.into(); - if typeindex_usize == 254909199 { - eprintln!("DEBUG get_addressdatawithsource returning New EXACT: address_type={:?}, first={}", address_type, first_usize); - } return Some(WithAddressDataSource::New(LoadedAddressData::default())); } @@ -1478,12 +1437,9 @@ impl Vecs { ) -> Result<()> { info!("Flushing..."); - self.utxo_cohorts - .par_iter_separate_mut() - .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?; + self.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; self.address_cohorts - .par_iter_separate_mut() - .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?; + .safe_flush_stateful_vecs(height, exit)?; self.height_to_unspendable_supply.safe_flush(exit)?; self.height_to_opreturn_supply.safe_flush(exit)?; self.addresstype_to_height_to_addr_count @@ -1509,10 +1465,6 @@ impl Vecs { let anyaddressindex = AnyAddressIndex::from(emptyaddressindex); - let ti: usize = typeindex.into(); - if address_type == OutputType::P2SH && ti > 30_000_000 { - eprintln!("DEBUG insert1 (empty New): P2SH typeindex={}", ti); - } addresstype_to_typeindex_to_new_or_updated_anyaddressindex .get_mut(address_type) .unwrap() @@ -1538,10 +1490,6 @@ impl Vecs { let anyaddressindex = emptyaddressindex.into(); - let ti: usize = typeindex.into(); - if address_type == OutputType::P2SH && ti > 30_000_000 { - eprintln!("DEBUG insert2 (empty FromLoaded): P2SH typeindex={}", ti); - } addresstype_to_typeindex_to_new_or_updated_anyaddressindex .get_mut(address_type) .unwrap() @@ -1564,10 +1512,6 @@ impl Vecs { let anyaddressindex = AnyAddressIndex::from(loadedaddressindex); - let ti: usize = typeindex.into(); - if address_type == OutputType::P2SH && ti > 30_000_000 { - eprintln!("DEBUG insert3 (loaded New): P2SH typeindex={}", ti); - } addresstype_to_typeindex_to_new_or_updated_anyaddressindex .get_mut(address_type) .unwrap() @@ -1593,10 +1537,6 @@ impl Vecs { let anyaddressindex = loadedaddressindex.into(); - let ti: usize = typeindex.into(); - if address_type == OutputType::P2SH && ti > 30_000_000 { - eprintln!("DEBUG insert4 (loaded FromEmpty): P2SH typeindex={}", ti); - } addresstype_to_typeindex_to_new_or_updated_anyaddressindex .get_mut(address_type) .unwrap() @@ -1610,14 +1550,6 @@ impl Vecs { addresstype_to_typeindex_to_new_or_updated_anyaddressindex.into_sorted_iter() { for (typeindex, anyaddressindex) in sorted { - // Debug: log right before the call that fails - let typeindex_usize: usize = typeindex.into(); - if address_type == OutputType::P2SH && typeindex_usize > 30_000_000 { - eprintln!( - "DEBUG flush update_or_push: address_type={:?}, typeindex={}, anyaddressindex={:?}", - address_type, typeindex_usize, anyaddressindex - ); - } self.any_address_indexes.update_or_push( address_type, typeindex, diff --git a/crates/brk_computer/src/stateful/transaction_processing.rs b/crates/brk_computer/src/stateful/transaction_processing.rs index 380bc7e34..c425c0d0d 100644 --- a/crates/brk_computer/src/stateful/transaction_processing.rs +++ b/crates/brk_computer/src/stateful/transaction_processing.rs @@ -33,11 +33,6 @@ impl AddressTypeToVec<(TypeIndex, Sats)> { ) { self.unwrap().into_iter().for_each(|(_type, vec)| { vec.into_iter().for_each(|(type_index, value)| { - let type_index_usize: usize = type_index.into(); - if type_index_usize == 254909199 { - eprintln!("DEBUG process_received EXACT: _type={:?}, type_index={}", _type, type_index_usize); - } - let mut is_new = false; let mut from_any_empty = false; @@ -52,18 +47,12 @@ impl AddressTypeToVec<(TypeIndex, Sats)> { .remove(&type_index) .map(|ad| { from_any_empty = true; - if type_index_usize == 254909199 { - eprintln!("DEBUG process_received from_empty EXACT: _type={:?}, is_new={}", _type, ad.is_new()); - } ad.into() }) .unwrap_or_else(|| { let addressdata = stored_or_new_addresstype_to_typeindex_to_addressdatawithsource .remove_for_type(_type, &type_index); - if type_index_usize == 254909199 { - eprintln!("DEBUG process_received from_stored_or_new EXACT: _type={:?}, is_new={}", _type, addressdata.is_new()); - } is_new = addressdata.is_new(); from_any_empty = addressdata.is_from_emptyaddressdata(); addressdata @@ -156,11 +145,6 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { v.unwrap().into_iter().try_for_each(|(_type, vec)| { vec.into_iter().try_for_each(|(type_index, value)| { - let type_index_usize: usize = type_index.into(); - if type_index_usize == 254909199 { - eprintln!("DEBUG process_sent EXACT: _type={:?}, type_index={}", _type, type_index_usize); - } - let typeindex_to_loadedaddressdata = addresstype_to_typeindex_to_loadedaddressdata.get_mut_unwrap(_type); @@ -202,9 +186,6 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { let addressdata = typeindex_to_loadedaddressdata.remove(&type_index).unwrap(); - if type_index_usize == 254909199 { - eprintln!("DEBUG process_sent will_be_empty EXACT: _type={:?}", _type); - } addresstype_to_typeindex_to_emptyaddressdata .get_mut(_type) .unwrap() diff --git a/crates/brk_computer/src/stateful/utxo_cohorts.rs b/crates/brk_computer/src/stateful/utxo_cohorts.rs index 0b33226fb..61d622465 100644 --- a/crates/brk_computer/src/stateful/utxo_cohorts.rs +++ b/crates/brk_computer/src/stateful/utxo_cohorts.rs @@ -8,8 +8,8 @@ use brk_grouper::{ }; use brk_traversable::Traversable; use brk_types::{ - Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Timestamp, - Version, ONE_DAY_IN_SEC, + Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, ONE_DAY_IN_SEC, OutputType, + Sats, Timestamp, Version, }; use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; @@ -41,12 +41,17 @@ impl Vecs { let v = version + VERSION + Version::ZERO; // Helper to create a cohort - booleans are now derived from filter - let create = - |filter: Filter, state_level: StateLevel| -> Result { - utxo_cohort::Vecs::forced_import( - db, filter, v, indexes, price, states_path, state_level, - ) - }; + let create = |filter: Filter, state_level: StateLevel| -> Result { + utxo_cohort::Vecs::forced_import( + db, + filter, + v, + indexes, + price, + states_path, + state_level, + ) + }; let full = |f: Filter| create(f, StateLevel::Full); let none = |f: Filter| create(f, StateLevel::None); @@ -158,19 +163,45 @@ impl Vecs { amount_range: ByAmountRange { _0sats: full(Filter::Amount(AmountFilter::LowerThan(Sats::_1)))?, _1sat_to_10sats: full(Filter::Amount(AmountFilter::Range(Sats::_1..Sats::_10)))?, - _10sats_to_100sats: full(Filter::Amount(AmountFilter::Range(Sats::_10..Sats::_100)))?, - _100sats_to_1k_sats: full(Filter::Amount(AmountFilter::Range(Sats::_100..Sats::_1K)))?, - _1k_sats_to_10k_sats: full(Filter::Amount(AmountFilter::Range(Sats::_1K..Sats::_10K)))?, - _10k_sats_to_100k_sats: full(Filter::Amount(AmountFilter::Range(Sats::_10K..Sats::_100K)))?, - _100k_sats_to_1m_sats: full(Filter::Amount(AmountFilter::Range(Sats::_100K..Sats::_1M)))?, - _1m_sats_to_10m_sats: full(Filter::Amount(AmountFilter::Range(Sats::_1M..Sats::_10M)))?, - _10m_sats_to_1btc: full(Filter::Amount(AmountFilter::Range(Sats::_10M..Sats::_1BTC)))?, - _1btc_to_10btc: full(Filter::Amount(AmountFilter::Range(Sats::_1BTC..Sats::_10BTC)))?, - _10btc_to_100btc: full(Filter::Amount(AmountFilter::Range(Sats::_10BTC..Sats::_100BTC)))?, - _100btc_to_1k_btc: full(Filter::Amount(AmountFilter::Range(Sats::_100BTC..Sats::_1K_BTC)))?, - _1k_btc_to_10k_btc: full(Filter::Amount(AmountFilter::Range(Sats::_1K_BTC..Sats::_10K_BTC)))?, - _10k_btc_to_100k_btc: full(Filter::Amount(AmountFilter::Range(Sats::_10K_BTC..Sats::_100K_BTC)))?, - _100k_btc_or_more: full(Filter::Amount(AmountFilter::GreaterOrEqual(Sats::_100K_BTC)))?, + _10sats_to_100sats: full(Filter::Amount(AmountFilter::Range( + Sats::_10..Sats::_100, + )))?, + _100sats_to_1k_sats: full(Filter::Amount(AmountFilter::Range( + Sats::_100..Sats::_1K, + )))?, + _1k_sats_to_10k_sats: full(Filter::Amount(AmountFilter::Range( + Sats::_1K..Sats::_10K, + )))?, + _10k_sats_to_100k_sats: full(Filter::Amount(AmountFilter::Range( + Sats::_10K..Sats::_100K, + )))?, + _100k_sats_to_1m_sats: full(Filter::Amount(AmountFilter::Range( + Sats::_100K..Sats::_1M, + )))?, + _1m_sats_to_10m_sats: full(Filter::Amount(AmountFilter::Range( + Sats::_1M..Sats::_10M, + )))?, + _10m_sats_to_1btc: full(Filter::Amount(AmountFilter::Range( + Sats::_10M..Sats::_1BTC, + )))?, + _1btc_to_10btc: full(Filter::Amount(AmountFilter::Range( + Sats::_1BTC..Sats::_10BTC, + )))?, + _10btc_to_100btc: full(Filter::Amount(AmountFilter::Range( + Sats::_10BTC..Sats::_100BTC, + )))?, + _100btc_to_1k_btc: full(Filter::Amount(AmountFilter::Range( + Sats::_100BTC..Sats::_1K_BTC, + )))?, + _1k_btc_to_10k_btc: full(Filter::Amount(AmountFilter::Range( + Sats::_1K_BTC..Sats::_10K_BTC, + )))?, + _10k_btc_to_100k_btc: full(Filter::Amount(AmountFilter::Range( + Sats::_10K_BTC..Sats::_100K_BTC, + )))?, + _100k_btc_or_more: full(Filter::Amount(AmountFilter::GreaterOrEqual( + Sats::_100K_BTC, + )))?, }, lt_amount: ByLowerThanAmount { @@ -579,14 +610,17 @@ impl Vecs { pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { // Flush stateful cohorts - self.iter_separate_mut() + self.par_iter_separate_mut() .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?; - // Flush aggregate cohorts' price_to_amount - self.0.iter_aggregate_mut().try_for_each(|v| { + // Flush aggregate cohorts' price_to_amount and price_percentiles + self.0.par_iter_aggregate_mut().try_for_each(|v| { if let Some(p2a) = v.price_to_amount.as_mut() { p2a.flush(height)?; } + if let Some(pp) = v.inner.price_percentiles.as_mut() { + pp.safe_flush(exit)?; + } Ok(()) }) } @@ -602,15 +636,28 @@ impl Vecs { }) } - /// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint + /// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint. + /// Returns the height to start processing from (checkpoint_height + 1), matching the + /// behavior of `common::import_state` for separate cohorts. + /// + /// Note: We don't check inner.min_height_vecs_len() for aggregate cohorts because their + /// inner vecs (height_to_supply, etc.) are computed post-hoc by compute_overlapping_vecs, + /// not maintained during the main processing loop. pub fn import_aggregate_price_to_amount(&mut self, height: Height) -> Result { - let mut min_height = height; + // Match separate vecs behavior: decrement height to get prev_height + let Some(mut prev_height) = height.decremented() else { + // height is 0, return ZERO (caller will handle this) + return Ok(Height::ZERO); + }; + for v in self.0.iter_aggregate_mut() { if let Some(p2a) = v.price_to_amount.as_mut() { - min_height = min_height.min(p2a.import_at_or_before(height)?); + // Match separate vecs: update prev_height to the checkpoint found + prev_height = prev_height.min(p2a.import_at_or_before(prev_height)?); } } - Ok(min_height) + // Return prev_height + 1, matching separate vecs behavior + Ok(prev_height.incremented()) } /// Compute and push percentiles for aggregate cohorts (all, sth, lth). @@ -620,12 +667,7 @@ impl Vecs { .0 .age_range .iter() - .map(|sub| { - ( - sub.filter().clone(), - sub.state.u().supply.value, - ) - }) + .map(|sub| (sub.filter().clone(), sub.state.u().supply.value)) .collect(); let results: Vec<_> = self