computer: fix flushes

This commit is contained in:
nym21
2025-12-05 17:54:01 +01:00
parent 82050c7c01
commit cfc5f7633b
6 changed files with 98 additions and 129 deletions

View File

@@ -1,7 +1,7 @@
use brk_error::Result;
use brk_traversable::{Traversable, TreeNode};
use brk_types::{Dollars, Height, Version};
use vecdb::{AnyExportableVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec};
use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec};
use crate::{Indexes, indexes};
@@ -83,6 +83,15 @@ impl PricePercentiles {
.position(|&p| p == percentile)
.and_then(|i| self.vecs[i].as_ref())
}
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {
if let Some(height_vec) = vec.height.as_mut() {
height_vec.safe_flush(exit)?;
}
}
Ok(())
}
}
impl Traversable for PricePercentiles {

View File

@@ -5,6 +5,7 @@ use brk_grouper::{AddressGroups, AmountFilter, Filter, Filtered};
use brk_traversable::Traversable;
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
use vecdb::{Database, Exit, IterableVec};
use crate::{
@@ -132,7 +133,7 @@ impl Vecs {
}
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.iter_separate_mut()
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))
}
}

View File

@@ -1011,6 +1011,10 @@ impl Vecs {
.um()
.safe_flush(exit)?;
}
if let Some(price_percentiles) = self.price_percentiles.as_mut() {
price_percentiles.safe_flush(exit)?;
}
}
state.commit(height)?;

View File

@@ -517,11 +517,10 @@ impl Vecs {
let starting_height = {
drop(separate_utxo_vecs);
drop(separate_address_vecs);
let result = if starting_height.is_not_zero()
&& self
.utxo_cohorts
.import_aggregate_price_to_amount(starting_height)?
== starting_height
let imported_height = self
.utxo_cohorts
.import_aggregate_price_to_amount(starting_height)?;
let result = if starting_height.is_not_zero() && imported_height == starting_height
{
starting_height
} else {
@@ -755,15 +754,6 @@ impl Vecs {
let typeindex = txoutindex_to_typeindex
.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
// Debug: track the exact bad typeindex
let ti: usize = typeindex.into();
if ti == 254909199 {
eprintln!(
"DEBUG outputs EXACT: output_type={:?}, txoutindex={}, typeindex={}",
output_type, txoutindex.to_usize(), ti
);
}
let addressdata_opt = Self::get_addressdatawithsource(
output_type,
typeindex,
@@ -795,15 +785,8 @@ impl Vecs {
transacted.iterate(value, output_type);
if let Some((typeindex, addressdata_opt)) = typeindex_with_addressdata_opt {
let ti: usize = typeindex.into();
if ti == 254909199 {
eprintln!("DEBUG fold outputs EXACT: output_type={:?}, typeindex={}, has_addressdata={}", output_type, ti, addressdata_opt.is_some());
}
if let Some(addressdata) = addressdata_opt
{
if ti == 254909199 {
eprintln!("DEBUG fold inserting to addressdatawithsource EXACT: output_type={:?}, is_new={}", output_type, addressdata.is_new());
}
addresstype_to_typeindex_to_addressdatawithsource
.insert_for_type(output_type, typeindex, addressdata);
}
@@ -874,15 +857,6 @@ impl Vecs {
let typeindex = txoutindex_to_typeindex
.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
// Debug: track the exact bad typeindex
let ti: usize = typeindex.into();
if ti == 254909199 {
eprintln!(
"DEBUG inputs EXACT: input_type={:?}, txoutindex={}, typeindex={}",
input_type, txoutindex.to_usize(), ti
);
}
let addressdata_opt = Self::get_addressdatawithsource(
input_type,
typeindex,
@@ -932,14 +906,7 @@ impl Vecs {
if let Some((typeindex, addressdata_opt)) =
typeindex_with_addressdata_opt
{
let ti: usize = typeindex.into();
if ti == 254909199 {
eprintln!("DEBUG fold inputs EXACT: output_type={:?}, typeindex={}, has_addressdata={}", output_type, ti, addressdata_opt.is_some());
}
if let Some(addressdata) = addressdata_opt {
if ti == 254909199 {
eprintln!("DEBUG fold inputs inserting EXACT: output_type={:?}, is_new={}", output_type, addressdata.is_new());
}
addresstype_to_typeindex_to_addressdatawithsource
.insert_for_type(output_type, typeindex, addressdata);
}
@@ -1405,16 +1372,8 @@ impl Vecs {
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> Option<WithAddressDataSource<LoadedAddressData>> {
let typeindex_usize: usize = typeindex.into();
if typeindex_usize == 254909199 {
eprintln!("DEBUG get_addressdatawithsource EXACT: address_type={:?}, typeindex={}", address_type, typeindex_usize);
}
let first = *first_addressindexes.get(address_type).unwrap();
if first <= typeindex {
let first_usize: usize = first.into();
if typeindex_usize == 254909199 {
eprintln!("DEBUG get_addressdatawithsource returning New EXACT: address_type={:?}, first={}", address_type, first_usize);
}
return Some(WithAddressDataSource::New(LoadedAddressData::default()));
}
@@ -1478,12 +1437,9 @@ impl Vecs {
) -> Result<()> {
info!("Flushing...");
self.utxo_cohorts
.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?;
self.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
self.address_cohorts
.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?;
.safe_flush_stateful_vecs(height, exit)?;
self.height_to_unspendable_supply.safe_flush(exit)?;
self.height_to_opreturn_supply.safe_flush(exit)?;
self.addresstype_to_height_to_addr_count
@@ -1509,10 +1465,6 @@ impl Vecs {
let anyaddressindex = AnyAddressIndex::from(emptyaddressindex);
let ti: usize = typeindex.into();
if address_type == OutputType::P2SH && ti > 30_000_000 {
eprintln!("DEBUG insert1 (empty New): P2SH typeindex={}", ti);
}
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(address_type)
.unwrap()
@@ -1538,10 +1490,6 @@ impl Vecs {
let anyaddressindex = emptyaddressindex.into();
let ti: usize = typeindex.into();
if address_type == OutputType::P2SH && ti > 30_000_000 {
eprintln!("DEBUG insert2 (empty FromLoaded): P2SH typeindex={}", ti);
}
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(address_type)
.unwrap()
@@ -1564,10 +1512,6 @@ impl Vecs {
let anyaddressindex = AnyAddressIndex::from(loadedaddressindex);
let ti: usize = typeindex.into();
if address_type == OutputType::P2SH && ti > 30_000_000 {
eprintln!("DEBUG insert3 (loaded New): P2SH typeindex={}", ti);
}
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(address_type)
.unwrap()
@@ -1593,10 +1537,6 @@ impl Vecs {
let anyaddressindex = loadedaddressindex.into();
let ti: usize = typeindex.into();
if address_type == OutputType::P2SH && ti > 30_000_000 {
eprintln!("DEBUG insert4 (loaded FromEmpty): P2SH typeindex={}", ti);
}
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(address_type)
.unwrap()
@@ -1610,14 +1550,6 @@ impl Vecs {
addresstype_to_typeindex_to_new_or_updated_anyaddressindex.into_sorted_iter()
{
for (typeindex, anyaddressindex) in sorted {
// Debug: log right before the call that fails
let typeindex_usize: usize = typeindex.into();
if address_type == OutputType::P2SH && typeindex_usize > 30_000_000 {
eprintln!(
"DEBUG flush update_or_push: address_type={:?}, typeindex={}, anyaddressindex={:?}",
address_type, typeindex_usize, anyaddressindex
);
}
self.any_address_indexes.update_or_push(
address_type,
typeindex,

View File

@@ -33,11 +33,6 @@ impl AddressTypeToVec<(TypeIndex, Sats)> {
) {
self.unwrap().into_iter().for_each(|(_type, vec)| {
vec.into_iter().for_each(|(type_index, value)| {
let type_index_usize: usize = type_index.into();
if type_index_usize == 254909199 {
eprintln!("DEBUG process_received EXACT: _type={:?}, type_index={}", _type, type_index_usize);
}
let mut is_new = false;
let mut from_any_empty = false;
@@ -52,18 +47,12 @@ impl AddressTypeToVec<(TypeIndex, Sats)> {
.remove(&type_index)
.map(|ad| {
from_any_empty = true;
if type_index_usize == 254909199 {
eprintln!("DEBUG process_received from_empty EXACT: _type={:?}, is_new={}", _type, ad.is_new());
}
ad.into()
})
.unwrap_or_else(|| {
let addressdata =
stored_or_new_addresstype_to_typeindex_to_addressdatawithsource
.remove_for_type(_type, &type_index);
if type_index_usize == 254909199 {
eprintln!("DEBUG process_received from_stored_or_new EXACT: _type={:?}, is_new={}", _type, addressdata.is_new());
}
is_new = addressdata.is_new();
from_any_empty = addressdata.is_from_emptyaddressdata();
addressdata
@@ -156,11 +145,6 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> {
v.unwrap().into_iter().try_for_each(|(_type, vec)| {
vec.into_iter().try_for_each(|(type_index, value)| {
let type_index_usize: usize = type_index.into();
if type_index_usize == 254909199 {
eprintln!("DEBUG process_sent EXACT: _type={:?}, type_index={}", _type, type_index_usize);
}
let typeindex_to_loadedaddressdata =
addresstype_to_typeindex_to_loadedaddressdata.get_mut_unwrap(_type);
@@ -202,9 +186,6 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> {
let addressdata =
typeindex_to_loadedaddressdata.remove(&type_index).unwrap();
if type_index_usize == 254909199 {
eprintln!("DEBUG process_sent will_be_empty EXACT: _type={:?}", _type);
}
addresstype_to_typeindex_to_emptyaddressdata
.get_mut(_type)
.unwrap()

View File

@@ -8,8 +8,8 @@ use brk_grouper::{
};
use brk_traversable::Traversable;
use brk_types::{
Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Timestamp,
Version, ONE_DAY_IN_SEC,
Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, ONE_DAY_IN_SEC, OutputType,
Sats, Timestamp, Version,
};
use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
@@ -41,12 +41,17 @@ impl Vecs {
let v = version + VERSION + Version::ZERO;
// Helper to create a cohort - booleans are now derived from filter
let create =
|filter: Filter, state_level: StateLevel| -> Result<utxo_cohort::Vecs> {
utxo_cohort::Vecs::forced_import(
db, filter, v, indexes, price, states_path, state_level,
)
};
let create = |filter: Filter, state_level: StateLevel| -> Result<utxo_cohort::Vecs> {
utxo_cohort::Vecs::forced_import(
db,
filter,
v,
indexes,
price,
states_path,
state_level,
)
};
let full = |f: Filter| create(f, StateLevel::Full);
let none = |f: Filter| create(f, StateLevel::None);
@@ -158,19 +163,45 @@ impl Vecs {
amount_range: ByAmountRange {
_0sats: full(Filter::Amount(AmountFilter::LowerThan(Sats::_1)))?,
_1sat_to_10sats: full(Filter::Amount(AmountFilter::Range(Sats::_1..Sats::_10)))?,
_10sats_to_100sats: full(Filter::Amount(AmountFilter::Range(Sats::_10..Sats::_100)))?,
_100sats_to_1k_sats: full(Filter::Amount(AmountFilter::Range(Sats::_100..Sats::_1K)))?,
_1k_sats_to_10k_sats: full(Filter::Amount(AmountFilter::Range(Sats::_1K..Sats::_10K)))?,
_10k_sats_to_100k_sats: full(Filter::Amount(AmountFilter::Range(Sats::_10K..Sats::_100K)))?,
_100k_sats_to_1m_sats: full(Filter::Amount(AmountFilter::Range(Sats::_100K..Sats::_1M)))?,
_1m_sats_to_10m_sats: full(Filter::Amount(AmountFilter::Range(Sats::_1M..Sats::_10M)))?,
_10m_sats_to_1btc: full(Filter::Amount(AmountFilter::Range(Sats::_10M..Sats::_1BTC)))?,
_1btc_to_10btc: full(Filter::Amount(AmountFilter::Range(Sats::_1BTC..Sats::_10BTC)))?,
_10btc_to_100btc: full(Filter::Amount(AmountFilter::Range(Sats::_10BTC..Sats::_100BTC)))?,
_100btc_to_1k_btc: full(Filter::Amount(AmountFilter::Range(Sats::_100BTC..Sats::_1K_BTC)))?,
_1k_btc_to_10k_btc: full(Filter::Amount(AmountFilter::Range(Sats::_1K_BTC..Sats::_10K_BTC)))?,
_10k_btc_to_100k_btc: full(Filter::Amount(AmountFilter::Range(Sats::_10K_BTC..Sats::_100K_BTC)))?,
_100k_btc_or_more: full(Filter::Amount(AmountFilter::GreaterOrEqual(Sats::_100K_BTC)))?,
_10sats_to_100sats: full(Filter::Amount(AmountFilter::Range(
Sats::_10..Sats::_100,
)))?,
_100sats_to_1k_sats: full(Filter::Amount(AmountFilter::Range(
Sats::_100..Sats::_1K,
)))?,
_1k_sats_to_10k_sats: full(Filter::Amount(AmountFilter::Range(
Sats::_1K..Sats::_10K,
)))?,
_10k_sats_to_100k_sats: full(Filter::Amount(AmountFilter::Range(
Sats::_10K..Sats::_100K,
)))?,
_100k_sats_to_1m_sats: full(Filter::Amount(AmountFilter::Range(
Sats::_100K..Sats::_1M,
)))?,
_1m_sats_to_10m_sats: full(Filter::Amount(AmountFilter::Range(
Sats::_1M..Sats::_10M,
)))?,
_10m_sats_to_1btc: full(Filter::Amount(AmountFilter::Range(
Sats::_10M..Sats::_1BTC,
)))?,
_1btc_to_10btc: full(Filter::Amount(AmountFilter::Range(
Sats::_1BTC..Sats::_10BTC,
)))?,
_10btc_to_100btc: full(Filter::Amount(AmountFilter::Range(
Sats::_10BTC..Sats::_100BTC,
)))?,
_100btc_to_1k_btc: full(Filter::Amount(AmountFilter::Range(
Sats::_100BTC..Sats::_1K_BTC,
)))?,
_1k_btc_to_10k_btc: full(Filter::Amount(AmountFilter::Range(
Sats::_1K_BTC..Sats::_10K_BTC,
)))?,
_10k_btc_to_100k_btc: full(Filter::Amount(AmountFilter::Range(
Sats::_10K_BTC..Sats::_100K_BTC,
)))?,
_100k_btc_or_more: full(Filter::Amount(AmountFilter::GreaterOrEqual(
Sats::_100K_BTC,
)))?,
},
lt_amount: ByLowerThanAmount {
@@ -579,14 +610,17 @@ impl Vecs {
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
// Flush stateful cohorts
self.iter_separate_mut()
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?;
// Flush aggregate cohorts' price_to_amount
self.0.iter_aggregate_mut().try_for_each(|v| {
// Flush aggregate cohorts' price_to_amount and price_percentiles
self.0.par_iter_aggregate_mut().try_for_each(|v| {
if let Some(p2a) = v.price_to_amount.as_mut() {
p2a.flush(height)?;
}
if let Some(pp) = v.inner.price_percentiles.as_mut() {
pp.safe_flush(exit)?;
}
Ok(())
})
}
@@ -602,15 +636,28 @@ impl Vecs {
})
}
/// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint
/// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint.
/// Returns the height to start processing from (checkpoint_height + 1), matching the
/// behavior of `common::import_state` for separate cohorts.
///
/// Note: We don't check inner.min_height_vecs_len() for aggregate cohorts because their
/// inner vecs (height_to_supply, etc.) are computed post-hoc by compute_overlapping_vecs,
/// not maintained during the main processing loop.
pub fn import_aggregate_price_to_amount(&mut self, height: Height) -> Result<Height> {
let mut min_height = height;
// Match separate vecs behavior: decrement height to get prev_height
let Some(mut prev_height) = height.decremented() else {
// height is 0, return ZERO (caller will handle this)
return Ok(Height::ZERO);
};
for v in self.0.iter_aggregate_mut() {
if let Some(p2a) = v.price_to_amount.as_mut() {
min_height = min_height.min(p2a.import_at_or_before(height)?);
// Match separate vecs: update prev_height to the checkpoint found
prev_height = prev_height.min(p2a.import_at_or_before(prev_height)?);
}
}
Ok(min_height)
// Return prev_height + 1, matching separate vecs behavior
Ok(prev_height.incremented())
}
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
@@ -620,12 +667,7 @@ impl Vecs {
.0
.age_range
.iter()
.map(|sub| {
(
sub.filter().clone(),
sub.state.u().supply.value,
)
})
.map(|sub| (sub.filter().clone(), sub.state.u().supply.value))
.collect();
let results: Vec<_> = self