computer: snapshot

This commit is contained in:
nym21
2025-12-04 00:39:22 +01:00
parent d27cc02e8c
commit a1f31a14be
17 changed files with 443 additions and 300 deletions

View File

@@ -2,14 +2,16 @@ use brk_error::{Error, Result};
use brk_traversable::Traversable;
use brk_types::{CheckedSub, StoredU64, Version};
use vecdb::{
AnyStoredVec, AnyVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec,
PcoVec, VecIndex, VecValue,
AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec, PcoVec,
VecIndex, VecValue,
};
use crate::utils::{get_percentile, OptionExt};
use crate::utils::{OptionExt, get_percentile};
use super::ComputedVecValue;
const VERSION: Version = Version::ZERO;
#[derive(Clone, Debug, Traversable)]
pub struct EagerVecsBuilder<I, T>
where
@@ -30,8 +32,6 @@ where
pub cumulative: Option<Box<EagerVec<PcoVec<I, T>>>>,
}
const VERSION: Version = Version::ZERO;
impl<I, T> EagerVecsBuilder<I, T>
where
I: VecIndex,
@@ -45,29 +45,42 @@ where
) -> Result<Self> {
let only_one_active = options.is_only_one_active();
let suffix = |s: &str| format!("{name}_{s}");
let maybe_suffix = |s: &str| if only_one_active { name.to_string() } else { suffix(s) };
let maybe_suffix = |s: &str| {
if only_one_active {
name.to_string()
} else {
suffix(s)
}
};
let v = version + VERSION;
macro_rules! import {
($s:expr) => { Box::new(EagerVec::forced_import(db, &maybe_suffix($s), v).unwrap()) };
($s:expr) => {
Box::new(EagerVec::forced_import(db, &maybe_suffix($s), v).unwrap())
};
}
let s = Self {
first: options.first.then(|| import!("first")),
last: options.last.then(|| Box::new(EagerVec::forced_import(db, name, v).unwrap())),
last: options
.last
.then(|| Box::new(EagerVec::forced_import(db, name, v).unwrap())),
min: options.min.then(|| import!("min")),
max: options.max.then(|| import!("max")),
median: options.median.then(|| import!("median")),
average: options.average.then(|| import!("avg")),
sum: options.sum.then(|| {
let sum_name = if !options.last && !options.average && !options.min && !options.max {
let sum_name = if !options.last && !options.average && !options.min && !options.max
{
name.to_string()
} else {
maybe_suffix("sum")
};
Box::new(EagerVec::forced_import(db, &sum_name, v).unwrap())
}),
cumulative: options.cumulative.then(|| Box::new(EagerVec::forced_import(db, &suffix("cumulative"), v).unwrap())),
cumulative: options
.cumulative
.then(|| Box::new(EagerVec::forced_import(db, &suffix("cumulative"), v).unwrap())),
pct90: options.pct90.then(|| import!("pct90")),
pct75: options.pct75.then(|| import!("pct75")),
pct25: options.pct25.then(|| import!("pct25")),
@@ -77,6 +90,125 @@ where
Ok(s)
}
/// True when any percentile-family output (p90/p75/median/p25/p10) is enabled.
#[inline]
fn needs_percentiles(&self) -> bool {
    [
        self.pct90.is_some(),
        self.pct75.is_some(),
        self.median.is_some(),
        self.pct25.is_some(),
        self.pct10.is_some(),
    ]
    .iter()
    .any(|&enabled| enabled)
}
/// True when either the `min` or the `max` output vec is enabled.
#[inline]
fn needs_minmax(&self) -> bool {
    // De Morgan form of `max.is_some() || min.is_some()`.
    !(self.max.is_none() && self.min.is_none())
}
/// True when a `sum` or `cumulative` output vec is enabled.
#[inline]
fn needs_sum_or_cumulative(&self) -> bool {
    !matches!((&self.sum, &self.cumulative), (None, None))
}
/// True when any of `average`, `sum`, or `cumulative` is enabled.
#[inline]
fn needs_average_sum_or_cumulative(&self) -> bool {
    if self.average.is_some() {
        return true;
    }
    self.needs_sum_or_cumulative()
}
/// Compute min/max in O(n) without sorting or collecting
#[inline]
fn compute_minmax_streaming(
&mut self,
index: usize,
iter: impl Iterator<Item = T>,
) -> Result<()> {
let mut min_val: Option<T> = None;
let mut max_val: Option<T> = None;
let need_min = self.min.is_some();
let need_max = self.max.is_some();
for val in iter {
if need_min {
min_val = Some(min_val.map_or(val, |m| if val < m { val } else { m }));
}
if need_max {
max_val = Some(max_val.map_or(val, |m| if val > m { val } else { m }));
}
}
if let Some(min) = self.min.as_mut() {
min.truncate_push_at(index, min_val.unwrap())?;
}
if let Some(max) = self.max.as_mut() {
max.truncate_push_at(index, max_val.unwrap())?;
}
Ok(())
}
/// Compute min/max from collected values in O(n) without sorting
#[inline]
fn compute_minmax_from_slice(&mut self, index: usize, values: &[T]) -> Result<()> {
if let Some(min) = self.min.as_mut() {
min.truncate_push_at(index, *values.iter().min().unwrap())?;
}
if let Some(max) = self.max.as_mut() {
max.truncate_push_at(index, *values.iter().max().unwrap())?;
}
Ok(())
}
/// Compute percentiles from sorted values (assumes values is already sorted)
fn compute_percentiles_from_sorted(&mut self, index: usize, values: &[T]) -> Result<()> {
if let Some(max) = self.max.as_mut() {
max.truncate_push_at(index, *values.last().ok_or(Error::Str("expect some"))?)?;
}
if let Some(pct90) = self.pct90.as_mut() {
pct90.truncate_push_at(index, get_percentile(values, 0.90))?;
}
if let Some(pct75) = self.pct75.as_mut() {
pct75.truncate_push_at(index, get_percentile(values, 0.75))?;
}
if let Some(median) = self.median.as_mut() {
median.truncate_push_at(index, get_percentile(values, 0.50))?;
}
if let Some(pct25) = self.pct25.as_mut() {
pct25.truncate_push_at(index, get_percentile(values, 0.25))?;
}
if let Some(pct10) = self.pct10.as_mut() {
pct10.truncate_push_at(index, get_percentile(values, 0.10))?;
}
if let Some(min) = self.min.as_mut() {
min.truncate_push_at(index, *values.first().unwrap())?;
}
Ok(())
}
/// Compute sum, average, and cumulative from values.
///
/// Consumes `values`, folds them into a single sum in one O(n) pass,
/// then pushes each enabled aggregate output at `index`. `cumulative`
/// is the running total carried across calls; it is advanced in place
/// whenever the cumulative vec is enabled.
///
/// NOTE(review): assumes `values` is non-empty when `average` is
/// enabled — `sum / len` with `len == 0` would divide by zero; confirm
/// callers guarantee this.
/// NOTE(review): assumes `*cumulative` is `Some` whenever
/// `self.cumulative` is `Some` (`cumulative.unwrap()` panics
/// otherwise) — confirm the caller seeds it before the first call.
fn compute_aggregates(
&mut self,
index: usize,
values: Vec<T>,
cumulative: &mut Option<T>,
) -> Result<()> {
let len = values.len();
// Single fold; T::from(0) is the additive identity.
let sum = values.into_iter().fold(T::from(0), |a, b| a + b);
if let Some(average) = self.average.as_mut() {
average.truncate_push_at(index, sum / len)?;
}
if self.needs_sum_or_cumulative() {
if let Some(sum_vec) = self.sum.as_mut() {
sum_vec.truncate_push_at(index, sum)?;
}
if let Some(cumulative_vec) = self.cumulative.as_mut() {
// Advance the running total, then record it.
let t = cumulative.unwrap() + sum;
cumulative.replace(t);
cumulative_vec.truncate_push_at(index, t)?;
}
}
Ok(())
}
pub fn extend(
&mut self,
max_from: I,
@@ -170,95 +302,33 @@ where
last.truncate_push_at(index, v)?;
}
let needs_sum_or_cumulative = self.sum.is_some() || self.cumulative.is_some();
let needs_average_sum_or_cumulative =
needs_sum_or_cumulative || self.average.is_some();
let needs_sorted = self.max.is_some()
|| self.pct90.is_some()
|| self.pct75.is_some()
|| self.median.is_some()
|| self.pct25.is_some()
|| self.pct10.is_some()
|| self.min.is_some();
let needs_values = needs_sorted || needs_average_sum_or_cumulative;
let needs_percentiles = self.needs_percentiles();
let needs_minmax = self.needs_minmax();
let needs_aggregates = self.needs_average_sum_or_cumulative();
if needs_values {
// Fast path: only min/max needed, no sorting or allocation required
if needs_minmax && !needs_percentiles && !needs_aggregates {
source_iter.set_position(first_index);
self.compute_minmax_streaming(
index,
(&mut source_iter).take(*count_index as usize),
)?;
} else if needs_percentiles || needs_aggregates {
source_iter.set_position(first_index);
let mut values = (&mut source_iter)
.take(*count_index as usize)
.collect::<Vec<_>>();
if needs_sorted {
if needs_percentiles {
values.sort_unstable();
if let Some(max) = self.max.as_mut() {
max.truncate_push_at(
index,
*values
.last()
.ok_or(Error::Str("expect some"))
.inspect_err(|_| {
dbg!(
&values,
max.name(),
index,
first_indexes.name(),
first_index,
count_indexes.name(),
count_index,
source.len(),
source.name()
);
})
.unwrap(),
)?;
}
if let Some(pct90) = self.pct90.as_mut() {
pct90.truncate_push_at(index, get_percentile(&values, 0.90))?;
}
if let Some(pct75) = self.pct75.as_mut() {
pct75.truncate_push_at(index, get_percentile(&values, 0.75))?;
}
if let Some(median) = self.median.as_mut() {
median.truncate_push_at(index, get_percentile(&values, 0.50))?;
}
if let Some(pct25) = self.pct25.as_mut() {
pct25.truncate_push_at(index, get_percentile(&values, 0.25))?;
}
if let Some(pct10) = self.pct10.as_mut() {
pct10.truncate_push_at(index, get_percentile(&values, 0.10))?;
}
if let Some(min) = self.min.as_mut() {
min.truncate_push_at(index, *values.first().unwrap())?;
}
self.compute_percentiles_from_sorted(index, &values)?;
} else if needs_minmax {
// We have values collected but only need min/max (along with aggregates)
self.compute_minmax_from_slice(index, &values)?;
}
if needs_average_sum_or_cumulative {
let len = values.len();
let sum = values.into_iter().fold(T::from(0), |a, b| a + b);
if let Some(average) = self.average.as_mut() {
let avg = sum / len;
average.truncate_push_at(index, avg)?;
}
if needs_sum_or_cumulative {
if let Some(sum_vec) = self.sum.as_mut() {
sum_vec.truncate_push_at(index, sum)?;
}
if let Some(cumulative_vec) = self.cumulative.as_mut() {
let t = cumulative.unwrap() + sum;
cumulative.replace(t);
cumulative_vec.truncate_push_at(index, t)?;
}
}
if needs_aggregates {
self.compute_aggregates(index, values, &mut cumulative)?;
}
}
@@ -282,13 +352,8 @@ where
where
A: VecIndex + VecValue + CheckedSub<A>,
{
if self.pct90.is_some()
|| self.pct75.is_some()
|| self.median.is_some()
|| self.pct25.is_some()
|| self.pct10.is_some()
{
panic!("unsupported");
if self.needs_percentiles() {
panic!("percentiles unsupported in from_aligned");
}
self.validate_computed_version_or_reset(
@@ -334,59 +399,50 @@ where
last.truncate_push_at(index, v)?;
}
let needs_sum_or_cumulative = self.sum.is_some() || self.cumulative.is_some();
let needs_average_sum_or_cumulative =
needs_sum_or_cumulative || self.average.is_some();
let needs_sorted = self.max.is_some() || self.min.is_some();
let needs_values = needs_sorted || needs_average_sum_or_cumulative;
let needs_minmax = self.needs_minmax();
let needs_aggregates = self.needs_average_sum_or_cumulative();
if needs_values {
if needs_sorted {
if needs_minmax || needs_aggregates {
// Min/max: use streaming O(n) instead of sort O(n log n)
if needs_minmax {
if let Some(max) = self.max.as_mut() {
let source_max_iter = source_max_iter.um();
source_max_iter.set_position(first_index);
let mut values = source_max_iter
.take(*count_index as usize)
.collect::<Vec<_>>();
values.sort_unstable();
max.truncate_push_at(index, *values.last().unwrap())?;
let max_val =
source_max_iter.take(*count_index as usize).max().unwrap();
max.truncate_push_at(index, max_val)?;
}
if let Some(min) = self.min.as_mut() {
let source_min_iter = source_min_iter.um();
source_min_iter.set_position(first_index);
let mut values = source_min_iter
.take(*count_index as usize)
.collect::<Vec<_>>();
values.sort_unstable();
min.truncate_push_at(index, *values.first().unwrap())?;
let min_val =
source_min_iter.take(*count_index as usize).min().unwrap();
min.truncate_push_at(index, min_val)?;
}
}
if needs_average_sum_or_cumulative {
if needs_aggregates {
if let Some(average) = self.average.as_mut() {
let source_average_iter = source_average_iter.um();
source_average_iter.set_position(first_index);
let values = source_average_iter
let mut len = 0usize;
let sum = (&mut *source_average_iter)
.take(*count_index as usize)
.collect::<Vec<_>>();
let len = values.len();
let cumulative = values.into_iter().fold(T::from(0), |a, b| a + b);
.inspect(|_| len += 1)
.fold(T::from(0), |a, b| a + b);
// TODO: Multiply by count then divide by cumulative
// Right now it's not 100% accurate as there could be more or less elements in the lower timeframe (28 days vs 31 days in a month for example)
let avg = cumulative / len;
let avg = sum / len;
average.truncate_push_at(index, avg)?;
}
if needs_sum_or_cumulative {
if self.needs_sum_or_cumulative() {
let source_sum_iter = source_sum_iter.um();
source_sum_iter.set_position(first_index);
let values = source_sum_iter
let sum = source_sum_iter
.take(*count_index as usize)
.collect::<Vec<_>>();
let sum = values.into_iter().fold(T::from(0), |a, b| a + b);
.fold(T::from(0), |a, b| a + b);
if let Some(sum_vec) = self.sum.as_mut() {
sum_vec.truncate_push_at(index, sum)?;
@@ -415,45 +471,57 @@ where
))
}
/// The `first`-of-period vec; `u()` (from `OptionExt`) presumably panics if `first` was not enabled — confirm.
#[inline]
pub fn unwrap_first(&self) -> &EagerVec<PcoVec<I, T>> {
self.first.u()
}
/// The `average` vec; `u()` (from `OptionExt`) presumably panics if `average` was not enabled — confirm.
#[inline]
#[allow(unused)]
pub fn unwrap_average(&self) -> &EagerVec<PcoVec<I, T>> {
self.average.u()
}
/// The `sum` vec; `u()` (from `OptionExt`) presumably panics if `sum` was not enabled — confirm.
#[inline]
pub fn unwrap_sum(&self) -> &EagerVec<PcoVec<I, T>> {
self.sum.u()
}
/// The `max` vec; `u()` (from `OptionExt`) presumably panics if `max` was not enabled — confirm.
#[inline]
pub fn unwrap_max(&self) -> &EagerVec<PcoVec<I, T>> {
self.max.u()
}
/// The 90th-percentile vec; `u()` (from `OptionExt`) presumably panics if `pct90` was not enabled — confirm.
#[inline]
#[allow(unused)]
pub fn unwrap_pct90(&self) -> &EagerVec<PcoVec<I, T>> {
self.pct90.u()
}
/// The 75th-percentile vec; `u()` (from `OptionExt`) presumably panics if `pct75` was not enabled — confirm.
#[inline]
#[allow(unused)]
pub fn unwrap_pct75(&self) -> &EagerVec<PcoVec<I, T>> {
self.pct75.u()
}
/// The median (50th-percentile) vec; `u()` (from `OptionExt`) presumably panics if `median` was not enabled — confirm.
#[inline]
#[allow(unused)]
pub fn unwrap_median(&self) -> &EagerVec<PcoVec<I, T>> {
self.median.u()
}
/// The 25th-percentile vec; `u()` (from `OptionExt`) presumably panics if `pct25` was not enabled — confirm.
#[inline]
#[allow(unused)]
pub fn unwrap_pct25(&self) -> &EagerVec<PcoVec<I, T>> {
self.pct25.u()
}
/// The 10th-percentile vec; `u()` (from `OptionExt`) presumably panics if `pct10` was not enabled — confirm.
#[inline]
#[allow(unused)]
pub fn unwrap_pct10(&self) -> &EagerVec<PcoVec<I, T>> {
self.pct10.u()
}
/// The `min` vec; `u()` (from `OptionExt`) presumably panics if `min` was not enabled — confirm.
#[inline]
pub fn unwrap_min(&self) -> &EagerVec<PcoVec<I, T>> {
self.min.u()
}
/// The `last`-of-period vec; `u()` (from `OptionExt`) presumably panics if `last` was not enabled — confirm.
#[inline]
pub fn unwrap_last(&self) -> &EagerVec<PcoVec<I, T>> {
self.last.u()
}
#[inline]
#[allow(unused)]
pub fn unwrap_cumulative(&self) -> &EagerVec<PcoVec<I, T>> {
self.cumulative.u()

View File

@@ -149,15 +149,17 @@ where
if i.to_usize() >= len_source.vec_len() {
return None;
}
let vec = S1I::inclusive_range_from(i, source.vec_len())
let mut sum = T::from(0);
let mut len = 0usize;
for v in S1I::inclusive_range_from(i, source.vec_len())
.flat_map(|i| source.get_at(i))
.collect::<Vec<_>>();
if vec.is_empty() {
{
sum += v;
len += 1;
}
if len == 0 {
return None;
}
let mut sum = T::from(0);
let len = vec.len();
vec.into_iter().for_each(|v| sum += v);
Some(sum / len)
},
))
@@ -179,14 +181,17 @@ where
if i.to_usize() >= len_source.vec_len() {
return None;
}
let vec = S1I::inclusive_range_from(i, source.vec_len())
let mut sum = T::from(0);
let mut has_values = false;
for v in S1I::inclusive_range_from(i, source.vec_len())
.flat_map(|i| source.get_at(i))
.collect::<Vec<_>>();
if vec.is_empty() {
{
sum += v;
has_values = true;
}
if !has_values {
return None;
}
let mut sum = T::from(0);
vec.into_iter().for_each(|v| sum += v);
Some(sum)
},
))

View File

@@ -210,6 +210,14 @@ impl ComputedRatioVecsFromDateIndex {
sorted.sort_unstable();
// Cache mutable refs before the loop to avoid repeated unwrap chains
let pct1_vec = self.ratio_pct1.um().dateindex.um();
let pct2_vec = self.ratio_pct2.um().dateindex.um();
let pct5_vec = self.ratio_pct5.um().dateindex.um();
let pct95_vec = self.ratio_pct95.um().dateindex.um();
let pct98_vec = self.ratio_pct98.um().dateindex.um();
let pct99_vec = self.ratio_pct99.um().dateindex.um();
self.ratio
.dateindex
.as_ref()
@@ -219,94 +227,22 @@ impl ComputedRatioVecsFromDateIndex {
.skip(starting_dateindex.to_usize())
.try_for_each(|(index, ratio)| -> Result<()> {
if index < min_ratio_date_usize {
self.ratio_pct5
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, StoredF32::NAN)?;
self.ratio_pct2
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, StoredF32::NAN)?;
self.ratio_pct1
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, StoredF32::NAN)?;
self.ratio_pct95
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, StoredF32::NAN)?;
self.ratio_pct98
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, StoredF32::NAN)?;
self.ratio_pct99
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, StoredF32::NAN)?;
pct1_vec.truncate_push_at(index, StoredF32::NAN)?;
pct2_vec.truncate_push_at(index, StoredF32::NAN)?;
pct5_vec.truncate_push_at(index, StoredF32::NAN)?;
pct95_vec.truncate_push_at(index, StoredF32::NAN)?;
pct98_vec.truncate_push_at(index, StoredF32::NAN)?;
pct99_vec.truncate_push_at(index, StoredF32::NAN)?;
} else {
let pos = sorted.binary_search(&ratio).unwrap_or_else(|pos| pos);
sorted.insert(pos, ratio);
self.ratio_pct1
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, get_percentile(&sorted, 0.01))?;
self.ratio_pct2
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, get_percentile(&sorted, 0.02))?;
self.ratio_pct5
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, get_percentile(&sorted, 0.05))?;
self.ratio_pct95
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, get_percentile(&sorted, 0.95))?;
self.ratio_pct98
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, get_percentile(&sorted, 0.98))?;
self.ratio_pct99
.as_mut()
.unwrap()
.dateindex
.as_mut()
.unwrap()
.truncate_push_at(index, get_percentile(&sorted, 0.99))?;
pct1_vec.truncate_push_at(index, get_percentile(&sorted, 0.01))?;
pct2_vec.truncate_push_at(index, get_percentile(&sorted, 0.02))?;
pct5_vec.truncate_push_at(index, get_percentile(&sorted, 0.05))?;
pct95_vec.truncate_push_at(index, get_percentile(&sorted, 0.95))?;
pct98_vec.truncate_push_at(index, get_percentile(&sorted, 0.98))?;
pct99_vec.truncate_push_at(index, get_percentile(&sorted, 0.99))?;
}
Ok(())

View File

@@ -54,7 +54,12 @@ impl Vecs {
macro_rules! import_di {
($name:expr) => {
ComputedVecsFromDateIndex::forced_import(
db, &suffix($name), Source::Compute, version, indexes, last.clone(),
db,
&suffix($name),
Source::Compute,
version,
indexes,
last.clone(),
)?
};
}
@@ -62,19 +67,42 @@ impl Vecs {
Ok(Self {
id,
indexes_to_blocks_mined: ComputedVecsFromHeight::forced_import(
db, &suffix("blocks_mined"), Source::Compute, version, indexes, sum_cum.clone(),
db,
&suffix("blocks_mined"),
Source::Compute,
version,
indexes,
sum_cum,
)?,
indexes_to_1w_blocks_mined: import_di!("1w_blocks_mined"),
indexes_to_1m_blocks_mined: import_di!("1m_blocks_mined"),
indexes_to_1y_blocks_mined: import_di!("1y_blocks_mined"),
indexes_to_subsidy: ComputedValueVecsFromHeight::forced_import(
db, &suffix("subsidy"), Source::Compute, version, sum_cum.clone(), compute_dollars, indexes,
db,
&suffix("subsidy"),
Source::Compute,
version,
sum_cum,
compute_dollars,
indexes,
)?,
indexes_to_fee: ComputedValueVecsFromHeight::forced_import(
db, &suffix("fee"), Source::Compute, version, sum_cum.clone(), compute_dollars, indexes,
db,
&suffix("fee"),
Source::Compute,
version,
sum_cum,
compute_dollars,
indexes,
)?,
indexes_to_coinbase: ComputedValueVecsFromHeight::forced_import(
db, &suffix("coinbase"), Source::Compute, version, sum_cum, compute_dollars, indexes,
db,
&suffix("coinbase"),
Source::Compute,
version,
sum_cum,
compute_dollars,
indexes,
)?,
indexes_to_dominance: import_di!("dominance"),
indexes_to_1d_dominance: import_di!("1d_dominance"),

View File

@@ -6,7 +6,7 @@ use brk_types::{
P2PKHAddressIndex, P2SHAddressIndex, P2TRAddressIndex, P2WPKHAddressIndex, P2WSHAddressIndex,
TypeIndex,
};
use vecdb::{AnyStoredVec, BytesVec, GenericStoredVec, Reader, Stamp};
use vecdb::{AnyStoredVec, AnyVec, BytesVec, GenericStoredVec, Reader, Stamp};
#[derive(Clone, Traversable)]
pub struct AnyAddressIndexesVecs {
@@ -99,6 +99,24 @@ impl AnyAddressIndexesVecs {
typeindex: TypeIndex,
anyaddressindex: AnyAddressIndex,
) -> Result<()> {
let vec_len = match address_type {
OutputType::P2PK33 => self.p2pk33.len(),
OutputType::P2PK65 => self.p2pk65.len(),
OutputType::P2PKH => self.p2pkh.len(),
OutputType::P2SH => self.p2sh.len(),
OutputType::P2TR => self.p2tr.len(),
OutputType::P2WPKH => self.p2wpkh.len(),
OutputType::P2WSH => self.p2wsh.len(),
OutputType::P2A => self.p2a.len(),
_ => unreachable!(),
};
let typeindex_usize: usize = typeindex.into();
if typeindex_usize > vec_len {
eprintln!(
"DEBUG update_or_push: address_type={:?}, typeindex={}, vec_len={}, anyaddressindex={:?}",
address_type, typeindex_usize, vec_len, anyaddressindex
);
}
(match address_type {
OutputType::P2PK33 => self
.p2pk33

View File

@@ -55,6 +55,12 @@ type TxIndexVec = SmallVec<[TxIndex; 4]>;
const VERSION: Version = Version::new(21);
const BIP30_DUPLICATE_COINBASE_HEIGHT_1: u32 = 91_842;
const BIP30_DUPLICATE_COINBASE_HEIGHT_2: u32 = 91_880;
const BIP30_ORIGINAL_COINBASE_HEIGHT_1: u32 = 91_812;
const BIP30_ORIGINAL_COINBASE_HEIGHT_2: u32 = 91_722;
const FLUSH_INTERVAL: usize = 10_000;
#[derive(Clone, Traversable)]
pub struct Vecs {
db: Database,
@@ -461,18 +467,9 @@ impl Vecs {
Ordering::Less => Height::ZERO,
};
// info!("stateful_starting_height = {stateful_starting_height}");
// let stateful_starting_height = stateful_starting_height
// .checked_sub(Height::new(1))
// .unwrap_or_default();
// info!("stateful_starting_height = {stateful_starting_height}");
let starting_height = starting_indexes.height.min(stateful_starting_height);
// info!("starting_height = {starting_height}");
let last_height = Height::from(indexer.vecs.height_to_blockhash.stamp());
// info!("last_height = {last_height}");
if starting_height <= last_height {
// info!("starting_height = {starting_height}");
let stamp = starting_height.into();
let starting_height = if starting_height.is_not_zero() {
@@ -480,12 +477,6 @@ impl Vecs {
.into_iter()
.chain(self.any_address_indexes.rollback_before(stamp)?)
.chain(self.addresses_data.rollback_before(stamp)?)
// .enumerate()
// .map(|(i, s)| {
// let h = Height::from(s).incremented();
// // dbg!((i, s, h));
// h
// })
.map(Height::from)
.map(Height::incremented)
.collect::<BTreeSet<Height>>();
@@ -509,7 +500,6 @@ impl Vecs {
} else {
Height::ZERO
};
// info!("starting_height = {starting_height}");
let starting_height = if starting_height.is_not_zero()
&& separate_address_vecs
@@ -543,8 +533,6 @@ impl Vecs {
result
};
// info!("starting_height = {starting_height}");
let mut chain_state: Vec<BlockState>;
if starting_height.is_not_zero() {
chain_state = self
@@ -568,8 +556,6 @@ impl Vecs {
} else {
info!("Starting processing utxos from the start");
// std::process::exit(0);
chain_state = vec![];
self.any_address_indexes.reset()?;
@@ -770,6 +756,15 @@ impl Vecs {
let typeindex = txoutindex_to_typeindex
.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
let typeindex_usize: usize = typeindex.into();
if output_type == OutputType::P2SH && typeindex_usize > 100_000_000 {
let txoutindex_usize: usize = txoutindex.into();
eprintln!(
"DEBUG P2SH bad typeindex at read: txoutindex={}, typeindex={}, txindex={}",
txoutindex_usize, typeindex_usize, txindex.to_usize()
);
}
let addressdata_opt = Self::get_addressdatawithsource(
output_type,
typeindex,
@@ -1095,34 +1090,34 @@ impl Vecs {
.unwrap();
});
if chain_state_starting_height > height {
dbg!(chain_state_starting_height, height);
panic!("temp, just making sure")
}
debug_assert!(
chain_state_starting_height <= height,
"chain_state_starting_height ({chain_state_starting_height}) > height ({height})"
);
unspendable_supply += transacted
.by_type
.unspendable
.as_vec()
.into_iter()
.map(|state| state.value)
.sum::<Sats>()
// NOTE: If ByUnspendableType gains more fields, change to .as_vec().into_iter().map(|s| s.value).sum()
unspendable_supply += transacted.by_type.unspendable.opreturn.value
+ height_to_unclaimed_rewards_iter.get_unwrap(height);
opreturn_supply += transacted.by_type.unspendable.opreturn.value;
if height == Height::new(0) {
if height == Height::ZERO {
transacted = Transacted::default();
unspendable_supply += Sats::FIFTY_BTC;
} else if height == Height::new(91_842) || height == Height::new(91_880) {
// Need to destroy invalid coinbases due to duplicate txids
if height == Height::new(91_842) {
height_to_sent.entry(Height::new(91_812)).or_default()
} else if height == Height::new(BIP30_DUPLICATE_COINBASE_HEIGHT_1)
|| height == Height::new(BIP30_DUPLICATE_COINBASE_HEIGHT_2)
{
if height == Height::new(BIP30_DUPLICATE_COINBASE_HEIGHT_1) {
height_to_sent
.entry(Height::new(BIP30_ORIGINAL_COINBASE_HEIGHT_1))
.or_default()
} else {
height_to_sent.entry(Height::new(91_722)).or_default()
height_to_sent
.entry(Height::new(BIP30_ORIGINAL_COINBASE_HEIGHT_2))
.or_default()
}
.iterate(Sats::FIFTY_BTC, OutputType::P2PK65);
};
}
// Push current block state before processing sends and receives
chain_state.push(BlockState {
@@ -1182,7 +1177,7 @@ impl Vecs {
if height != last_height
&& height != Height::ZERO
&& height.to_usize() % 10_000 == 0
&& height.to_usize() % FLUSH_INTERVAL == 0
{
let _lock = exit.lock();
@@ -1388,7 +1383,16 @@ impl Vecs {
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> Option<WithAddressDataSource<LoadedAddressData>> {
if *first_addressindexes.get(address_type).unwrap() <= typeindex {
let first = *first_addressindexes.get(address_type).unwrap();
if first <= typeindex {
let typeindex_usize: usize = typeindex.into();
let first_usize: usize = first.into();
if typeindex_usize > 100_000_000 {
eprintln!(
"DEBUG get_addressdatawithsource NEW: address_type={:?}, typeindex={}, first_addressindex={}",
address_type, typeindex_usize, first_usize
);
}
return Some(WithAddressDataSource::New(LoadedAddressData::default()));
}
@@ -1474,6 +1478,13 @@ impl Vecs {
addresstype_to_typeindex_to_emptyaddressdata.into_sorted_iter()
{
for (typeindex, emptyaddressdata_with_source) in sorted.into_iter() {
let typeindex_usize: usize = typeindex.into();
if typeindex_usize > 100_000_000 {
eprintln!(
"DEBUG emptyaddressdata: address_type={:?}, typeindex={}, variant={:?}",
address_type, typeindex_usize, std::mem::discriminant(&emptyaddressdata_with_source)
);
}
match emptyaddressdata_with_source {
WithAddressDataSource::New(emptyaddressdata) => {
let emptyaddressindex = self
@@ -1521,6 +1532,13 @@ impl Vecs {
addresstype_to_typeindex_to_loadedaddressdata.into_sorted_iter()
{
for (typeindex, loadedaddressdata_with_source) in sorted.into_iter() {
let typeindex_usize: usize = typeindex.into();
if typeindex_usize > 100_000_000 {
eprintln!(
"DEBUG loadedaddressdata: address_type={:?}, typeindex={}, variant={:?}",
address_type, typeindex_usize, std::mem::discriminant(&loadedaddressdata_with_source)
);
}
match loadedaddressdata_with_source {
WithAddressDataSource::New(loadedaddressdata) => {
let loadedaddressindex = self

View File

@@ -61,13 +61,20 @@ pub fn build_txoutindex_to_txindex<'a>(
block_tx_count: u64,
txindex_to_output_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
) -> Vec<TxIndex> {
let mut vec = Vec::new();
let block_first_txindex = block_first_txindex.to_usize();
for tx_offset in 0..block_tx_count as usize {
let txindex = TxIndex::from(block_first_txindex + tx_offset);
let output_count = u64::from(txindex_to_output_count.get_unwrap(txindex));
let counts: Vec<_> = (0..block_tx_count as usize)
.map(|tx_offset| {
let txindex = TxIndex::from(block_first_txindex + tx_offset);
u64::from(txindex_to_output_count.get_unwrap(txindex))
})
.collect();
let total: u64 = counts.iter().sum();
let mut vec = Vec::with_capacity(total as usize);
for (tx_offset, &output_count) in counts.iter().enumerate() {
let txindex = TxIndex::from(block_first_txindex + tx_offset);
for _ in 0..output_count {
vec.push(txindex);
}
@@ -81,13 +88,20 @@ pub fn build_txinindex_to_txindex<'a>(
block_tx_count: u64,
txindex_to_input_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>,
) -> Vec<TxIndex> {
let mut vec = Vec::new();
let block_first_txindex = block_first_txindex.to_usize();
for tx_offset in 0..block_tx_count as usize {
let txindex = TxIndex::from(block_first_txindex + tx_offset);
let input_count = u64::from(txindex_to_input_count.get_unwrap(txindex));
let counts: Vec<_> = (0..block_tx_count as usize)
.map(|tx_offset| {
let txindex = TxIndex::from(block_first_txindex + tx_offset);
u64::from(txindex_to_input_count.get_unwrap(txindex))
})
.collect();
let total: u64 = counts.iter().sum();
let mut vec = Vec::with_capacity(total as usize);
for (tx_offset, &input_count) in counts.iter().enumerate() {
let txindex = TxIndex::from(block_first_txindex + tx_offset);
for _ in 0..input_count {
vec.push(txindex);
}

View File

@@ -73,11 +73,10 @@ impl AddressTypeToVec<(TypeIndex, Sats)> {
let amount = prev_amount + value;
if is_new
|| from_any_empty
|| vecs.amount_range.get_mut(amount).filter().clone()
!= vecs.amount_range.get_mut(prev_amount).filter().clone()
{
let filters_differ =
vecs.amount_range.get(amount).filter() != vecs.amount_range.get(prev_amount).filter();
if is_new || from_any_empty || filters_differ {
if !is_new && !from_any_empty {
vecs.amount_range
.get_mut(prev_amount)
@@ -162,10 +161,10 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> {
let will_be_empty = addressdata.has_1_utxos();
if will_be_empty
|| vecs.amount_range.get_mut(amount).filter().clone()
!= vecs.amount_range.get_mut(prev_amount).filter().clone()
{
let filters_differ =
vecs.amount_range.get(amount).filter() != vecs.amount_range.get(prev_amount).filter();
if will_be_empty || filters_differ {
vecs.amount_range
.get_mut(prev_amount)
.state.um()

View File

@@ -1,4 +1,4 @@
use std::{ops::ControlFlow, path::Path};
use std::path::Path;
use brk_error::Result;
use brk_grouper::{
@@ -9,7 +9,7 @@ use brk_grouper::{
use brk_traversable::Traversable;
use brk_types::{
Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Timestamp,
Version,
Version, ONE_DAY_IN_SEC,
};
use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
@@ -214,6 +214,11 @@ impl Vecs {
let prev_timestamp = chain_state.last().unwrap().timestamp;
// Only blocks whose age % ONE_DAY >= threshold can cross a day boundary.
// Saves 1 subtraction + 2 divisions per block vs computing days_old directly.
let elapsed = (*timestamp).saturating_sub(*prev_timestamp);
let threshold = ONE_DAY_IN_SEC.saturating_sub(elapsed);
// Extract all mutable references upfront to avoid borrow checker issues
// Use a single destructuring to get non-overlapping mutable borrows
let UTXOGroups {
@@ -241,15 +246,19 @@ impl Vecs {
),
];
let _ = chain_state
chain_state
.iter()
.try_for_each(|block_state| -> ControlFlow<()> {
.filter(|block_state| {
let age = (*prev_timestamp).saturating_sub(*block_state.timestamp);
age % ONE_DAY_IN_SEC >= threshold
})
.for_each(|block_state| {
let prev_days_old =
prev_timestamp.difference_in_days_between(block_state.timestamp);
let days_old = timestamp.difference_in_days_between(block_state.timestamp);
if prev_days_old == days_old {
return ControlFlow::Continue(());
return;
}
vecs.iter_mut().for_each(|(filter, state)| {
@@ -286,8 +295,6 @@ impl Vecs {
}
});
}
ControlFlow::Continue(())
});
}
@@ -325,8 +332,9 @@ impl Vecs {
),
];
let last_timestamp = chain_state.last().unwrap().timestamp;
let current_price = chain_state.last().unwrap().price;
let last_block = chain_state.last().unwrap();
let last_timestamp = last_block.timestamp;
let current_price = last_block.price;
let chain_state_len = chain_state.len();