diff --git a/Cargo.lock b/Cargo.lock index 962c8b844..80d10b318 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -625,6 +625,7 @@ dependencies = [ "brk_store", "brk_traversable", "brk_types", + "color-eyre", "derive_deref", "log", "pco", @@ -706,6 +707,7 @@ dependencies = [ "brk_store", "brk_traversable", "brk_types", + "color-eyre", "fjall", "log", "rayon", diff --git a/Cargo.toml b/Cargo.toml index 19534e352..f1548c28c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", featur brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" } byteview = "=0.6.1" # byteview = "~0.8.0" +color-eyre = "0.6.5" derive_deref = "1.1.1" fjall2 = { version = "2.11.8", package = "brk_fjall" } # fjall2 = { path = "../fjall2", package = "brk_fjall" } diff --git a/crates/brk_cli/Cargo.toml b/crates/brk_cli/Cargo.toml index e2eece151..7fb7f8c21 100644 --- a/crates/brk_cli/Cargo.toml +++ b/crates/brk_cli/Cargo.toml @@ -23,7 +23,7 @@ brk_rpc = { workspace = true } brk_server = { workspace = true } vecdb = { workspace = true } clap = { version = "4.5.53", features = ["derive", "string"] } -color-eyre = "0.6.5" +color-eyre = { workspace = true } log = { workspace = true } minreq = { workspace = true } serde = { workspace = true } diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index a48502e1f..519699d42 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -30,3 +30,6 @@ rustc-hash = { workspace = true } serde = { workspace = true } smallvec = "1.15.1" vecdb = { workspace = true } + +[dev-dependencies] +color-eyre = { workspace = true } diff --git a/crates/brk_computer/examples/computer.rs b/crates/brk_computer/examples/computer.rs index 35a8c942e..743ad60f6 100644 --- a/crates/brk_computer/examples/computer.rs +++ b/crates/brk_computer/examples/computer.rs @@ -14,13 +14,17 @@ use brk_reader::Reader; use brk_rpc::{Auth, Client}; use vecdb::Exit; -pub fn main() -> Result<()> { +pub fn main() -> color_eyre::Result<()> { + color_eyre::install()?; + // Can't increase main thread's stack size, thus we need to use another thread thread::Builder::new() .stack_size(512 * 1024 * 1024) .spawn(run)? .join() - .unwrap() + .unwrap()?; + + Ok(()) } fn run() -> Result<()> { diff --git a/crates/brk_computer/src/grouped/builder_eager.rs b/crates/brk_computer/src/grouped/builder_eager.rs index 1126a2a8e..11d4b613a 100644 --- a/crates/brk_computer/src/grouped/builder_eager.rs +++ b/crates/brk_computer/src/grouped/builder_eager.rs @@ -2,14 +2,16 @@ use brk_error::{Error, Result}; use brk_traversable::Traversable; use brk_types::{CheckedSub, StoredU64, Version}; use vecdb::{ - AnyStoredVec, AnyVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec, - PcoVec, VecIndex, VecValue, + AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec, PcoVec, + VecIndex, VecValue, }; -use crate::utils::{get_percentile, OptionExt}; +use crate::utils::{OptionExt, get_percentile}; use super::ComputedVecValue; +const VERSION: Version = Version::ZERO; + #[derive(Clone, Debug, Traversable)] pub struct EagerVecsBuilder where @@ -30,8 +32,6 @@ where pub cumulative: Option>>>, } -const VERSION: Version = Version::ZERO; - impl EagerVecsBuilder where I: VecIndex, @@ -45,29 +45,42 @@ where ) -> Result { let only_one_active = options.is_only_one_active(); let suffix = |s: &str| format!("{name}_{s}"); - let maybe_suffix = |s: &str| if only_one_active { name.to_string() } else { suffix(s) }; + let maybe_suffix = |s: &str| { + if only_one_active { + name.to_string() + } else { + suffix(s) + } + }; let v = version + VERSION; macro_rules! import { - ($s:expr) => { Box::new(EagerVec::forced_import(db, &maybe_suffix($s), v).unwrap()) }; + ($s:expr) => { + Box::new(EagerVec::forced_import(db, &maybe_suffix($s), v).unwrap()) + }; } let s = Self { first: options.first.then(|| import!("first")), - last: options.last.then(|| Box::new(EagerVec::forced_import(db, name, v).unwrap())), + last: options + .last + .then(|| Box::new(EagerVec::forced_import(db, name, v).unwrap())), min: options.min.then(|| import!("min")), max: options.max.then(|| import!("max")), median: options.median.then(|| import!("median")), average: options.average.then(|| import!("avg")), sum: options.sum.then(|| { - let sum_name = if !options.last && !options.average && !options.min && !options.max { + let sum_name = if !options.last && !options.average && !options.min && !options.max + { name.to_string() } else { maybe_suffix("sum") }; Box::new(EagerVec::forced_import(db, &sum_name, v).unwrap()) }), - cumulative: options.cumulative.then(|| Box::new(EagerVec::forced_import(db, &suffix("cumulative"), v).unwrap())), + cumulative: options + .cumulative + .then(|| Box::new(EagerVec::forced_import(db, &suffix("cumulative"), v).unwrap())), pct90: options.pct90.then(|| import!("pct90")), pct75: options.pct75.then(|| import!("pct75")), pct25: options.pct25.then(|| import!("pct25")), @@ -77,6 +90,125 @@ where Ok(s) } + #[inline] + fn needs_percentiles(&self) -> bool { + self.pct90.is_some() + || self.pct75.is_some() + || self.median.is_some() + || self.pct25.is_some() + || self.pct10.is_some() + } + + #[inline] + fn needs_minmax(&self) -> bool { + self.max.is_some() || self.min.is_some() + } + + #[inline] + fn needs_sum_or_cumulative(&self) -> bool { + self.sum.is_some() || self.cumulative.is_some() + } + + #[inline] + fn needs_average_sum_or_cumulative(&self) -> bool { + self.needs_sum_or_cumulative() || self.average.is_some() + } + + /// Compute min/max in O(n) without sorting or collecting + #[inline] + fn compute_minmax_streaming( + &mut self, + index: usize, + iter: impl Iterator, + ) -> Result<()> { + let mut min_val: Option = None; + let mut max_val: Option = None; + let need_min = self.min.is_some(); + let need_max = self.max.is_some(); + + for val in iter { + if need_min { + min_val = Some(min_val.map_or(val, |m| if val < m { val } else { m })); + } + if need_max { + max_val = Some(max_val.map_or(val, |m| if val > m { val } else { m })); + } + } + + if let Some(min) = self.min.as_mut() { + min.truncate_push_at(index, min_val.unwrap())?; + } + if let Some(max) = self.max.as_mut() { + max.truncate_push_at(index, max_val.unwrap())?; + } + Ok(()) + } + + /// Compute min/max from collected values in O(n) without sorting + #[inline] + fn compute_minmax_from_slice(&mut self, index: usize, values: &[T]) -> Result<()> { + if let Some(min) = self.min.as_mut() { + min.truncate_push_at(index, *values.iter().min().unwrap())?; + } + if let Some(max) = self.max.as_mut() { + max.truncate_push_at(index, *values.iter().max().unwrap())?; + } + Ok(()) + } + + /// Compute percentiles from sorted values (assumes values is already sorted) + fn compute_percentiles_from_sorted(&mut self, index: usize, values: &[T]) -> Result<()> { + if let Some(max) = self.max.as_mut() { + max.truncate_push_at(index, *values.last().ok_or(Error::Str("expect some"))?)?; + } + if let Some(pct90) = self.pct90.as_mut() { + pct90.truncate_push_at(index, get_percentile(values, 0.90))?; + } + if let Some(pct75) = self.pct75.as_mut() { + pct75.truncate_push_at(index, get_percentile(values, 0.75))?; + } + if let Some(median) = self.median.as_mut() { + median.truncate_push_at(index, get_percentile(values, 0.50))?; + } + if let Some(pct25) = self.pct25.as_mut() { + pct25.truncate_push_at(index, get_percentile(values, 0.25))?; + } + if let Some(pct10) = self.pct10.as_mut() { + pct10.truncate_push_at(index, get_percentile(values, 0.10))?; + } + if let Some(min) = self.min.as_mut() { + min.truncate_push_at(index, *values.first().unwrap())?; + } + Ok(()) + } + + /// Compute sum, average, and cumulative from values + fn compute_aggregates( + &mut self, + index: usize, + values: Vec, + cumulative: &mut Option, + ) -> Result<()> { + let len = values.len(); + let sum = values.into_iter().fold(T::from(0), |a, b| a + b); + + if let Some(average) = self.average.as_mut() { + average.truncate_push_at(index, sum / len)?; + } + + if self.needs_sum_or_cumulative() { + if let Some(sum_vec) = self.sum.as_mut() { + sum_vec.truncate_push_at(index, sum)?; + } + if let Some(cumulative_vec) = self.cumulative.as_mut() { + let t = cumulative.unwrap() + sum; + cumulative.replace(t); + cumulative_vec.truncate_push_at(index, t)?; + } + } + Ok(()) + } + pub fn extend( &mut self, max_from: I, @@ -170,95 +302,33 @@ where last.truncate_push_at(index, v)?; } - let needs_sum_or_cumulative = self.sum.is_some() || self.cumulative.is_some(); - let needs_average_sum_or_cumulative = - needs_sum_or_cumulative || self.average.is_some(); - let needs_sorted = self.max.is_some() - || self.pct90.is_some() - || self.pct75.is_some() - || self.median.is_some() - || self.pct25.is_some() - || self.pct10.is_some() - || self.min.is_some(); - let needs_values = needs_sorted || needs_average_sum_or_cumulative; + let needs_percentiles = self.needs_percentiles(); + let needs_minmax = self.needs_minmax(); + let needs_aggregates = self.needs_average_sum_or_cumulative(); - if needs_values { + // Fast path: only min/max needed, no sorting or allocation required + if needs_minmax && !needs_percentiles && !needs_aggregates { + source_iter.set_position(first_index); + self.compute_minmax_streaming( + index, + (&mut source_iter).take(*count_index as usize), + )?; + } else if needs_percentiles || needs_aggregates { source_iter.set_position(first_index); let mut values = (&mut source_iter) .take(*count_index as usize) .collect::>(); - if needs_sorted { + if needs_percentiles { values.sort_unstable(); - - if let Some(max) = self.max.as_mut() { - max.truncate_push_at( - index, - *values - .last() - .ok_or(Error::Str("expect some")) - .inspect_err(|_| { - dbg!( - &values, - max.name(), - index, - first_indexes.name(), - first_index, - count_indexes.name(), - count_index, - source.len(), - source.name() - ); - }) - .unwrap(), - )?; - } - - if let Some(pct90) = self.pct90.as_mut() { - pct90.truncate_push_at(index, get_percentile(&values, 0.90))?; - } - - if let Some(pct75) = self.pct75.as_mut() { - pct75.truncate_push_at(index, get_percentile(&values, 0.75))?; - } - - if let Some(median) = self.median.as_mut() { - median.truncate_push_at(index, get_percentile(&values, 0.50))?; - } - - if let Some(pct25) = self.pct25.as_mut() { - pct25.truncate_push_at(index, get_percentile(&values, 0.25))?; - } - - if let Some(pct10) = self.pct10.as_mut() { - pct10.truncate_push_at(index, get_percentile(&values, 0.10))?; - } - - if let Some(min) = self.min.as_mut() { - min.truncate_push_at(index, *values.first().unwrap())?; - } + self.compute_percentiles_from_sorted(index, &values)?; + } else if needs_minmax { + // We have values collected but only need min/max (along with aggregates) + self.compute_minmax_from_slice(index, &values)?; } - if needs_average_sum_or_cumulative { - let len = values.len(); - let sum = values.into_iter().fold(T::from(0), |a, b| a + b); - - if let Some(average) = self.average.as_mut() { - let avg = sum / len; - average.truncate_push_at(index, avg)?; - } - - if needs_sum_or_cumulative { - if let Some(sum_vec) = self.sum.as_mut() { - sum_vec.truncate_push_at(index, sum)?; - } - - if let Some(cumulative_vec) = self.cumulative.as_mut() { - let t = cumulative.unwrap() + sum; - cumulative.replace(t); - cumulative_vec.truncate_push_at(index, t)?; - } - } + if needs_aggregates { + self.compute_aggregates(index, values, &mut cumulative)?; } } @@ -282,13 +352,8 @@ where where A: VecIndex + VecValue + CheckedSub, { - if self.pct90.is_some() - || self.pct75.is_some() - || self.median.is_some() - || self.pct25.is_some() - || self.pct10.is_some() - { - panic!("unsupported"); + if self.needs_percentiles() { + panic!("percentiles unsupported in from_aligned"); } self.validate_computed_version_or_reset( @@ -334,59 +399,50 @@ where last.truncate_push_at(index, v)?; } - let needs_sum_or_cumulative = self.sum.is_some() || self.cumulative.is_some(); - let needs_average_sum_or_cumulative = - needs_sum_or_cumulative || self.average.is_some(); - let needs_sorted = self.max.is_some() || self.min.is_some(); - let needs_values = needs_sorted || needs_average_sum_or_cumulative; + let needs_minmax = self.needs_minmax(); + let needs_aggregates = self.needs_average_sum_or_cumulative(); - if needs_values { - if needs_sorted { + if needs_minmax || needs_aggregates { + // Min/max: use streaming O(n) instead of sort O(n log n) + if needs_minmax { if let Some(max) = self.max.as_mut() { let source_max_iter = source_max_iter.um(); source_max_iter.set_position(first_index); - let mut values = source_max_iter - .take(*count_index as usize) - .collect::>(); - values.sort_unstable(); - max.truncate_push_at(index, *values.last().unwrap())?; + let max_val = + source_max_iter.take(*count_index as usize).max().unwrap(); + max.truncate_push_at(index, max_val)?; } if let Some(min) = self.min.as_mut() { let source_min_iter = source_min_iter.um(); source_min_iter.set_position(first_index); - let mut values = source_min_iter - .take(*count_index as usize) - .collect::>(); - values.sort_unstable(); - min.truncate_push_at(index, *values.first().unwrap())?; + let min_val = + source_min_iter.take(*count_index as usize).min().unwrap(); + min.truncate_push_at(index, min_val)?; } } - if needs_average_sum_or_cumulative { + if needs_aggregates { if let Some(average) = self.average.as_mut() { let source_average_iter = source_average_iter.um(); source_average_iter.set_position(first_index); - let values = source_average_iter + let mut len = 0usize; + let sum = (&mut *source_average_iter) .take(*count_index as usize) - .collect::>(); - - let len = values.len(); - let cumulative = values.into_iter().fold(T::from(0), |a, b| a + b); + .inspect(|_| len += 1) + .fold(T::from(0), |a, b| a + b); // TODO: Multiply by count then divide by cumulative // Right now it's not 100% accurate as there could be more or less elements in the lower timeframe (28 days vs 31 days in a month for example) - let avg = cumulative / len; + let avg = sum / len; average.truncate_push_at(index, avg)?; } - if needs_sum_or_cumulative { + if self.needs_sum_or_cumulative() { let source_sum_iter = source_sum_iter.um(); source_sum_iter.set_position(first_index); - let values = source_sum_iter + let sum = source_sum_iter .take(*count_index as usize) - .collect::>(); - - let sum = values.into_iter().fold(T::from(0), |a, b| a + b); + .fold(T::from(0), |a, b| a + b); if let Some(sum_vec) = self.sum.as_mut() { sum_vec.truncate_push_at(index, sum)?; @@ -415,45 +471,57 @@ where )) } + #[inline] pub fn unwrap_first(&self) -> &EagerVec> { self.first.u() } + #[inline] #[allow(unused)] pub fn unwrap_average(&self) -> &EagerVec> { self.average.u() } + #[inline] pub fn unwrap_sum(&self) -> &EagerVec> { self.sum.u() } + #[inline] pub fn unwrap_max(&self) -> &EagerVec> { self.max.u() } + #[inline] #[allow(unused)] pub fn unwrap_pct90(&self) -> &EagerVec> { self.pct90.u() } + #[inline] #[allow(unused)] pub fn unwrap_pct75(&self) -> &EagerVec> { self.pct75.u() } + #[inline] #[allow(unused)] pub fn unwrap_median(&self) -> &EagerVec> { self.median.u() } + #[inline] #[allow(unused)] pub fn unwrap_pct25(&self) -> &EagerVec> { self.pct25.u() } + #[inline] #[allow(unused)] pub fn unwrap_pct10(&self) -> &EagerVec> { self.pct10.u() } + #[inline] pub fn unwrap_min(&self) -> &EagerVec> { self.min.u() } + #[inline] pub fn unwrap_last(&self) -> &EagerVec> { self.last.u() } + #[inline] #[allow(unused)] pub fn unwrap_cumulative(&self) -> &EagerVec> { self.cumulative.u() diff --git a/crates/brk_computer/src/grouped/builder_lazy.rs b/crates/brk_computer/src/grouped/builder_lazy.rs index 97a886fe5..698e14341 100644 --- a/crates/brk_computer/src/grouped/builder_lazy.rs +++ b/crates/brk_computer/src/grouped/builder_lazy.rs @@ -149,15 +149,17 @@ where if i.to_usize() >= len_source.vec_len() { return None; } - let vec = S1I::inclusive_range_from(i, source.vec_len()) + let mut sum = T::from(0); + let mut len = 0usize; + for v in S1I::inclusive_range_from(i, source.vec_len()) .flat_map(|i| source.get_at(i)) - .collect::>(); - if vec.is_empty() { + { + sum += v; + len += 1; + } + if len == 0 { return None; } - let mut sum = T::from(0); - let len = vec.len(); - vec.into_iter().for_each(|v| sum += v); Some(sum / len) }, )) @@ -179,14 +181,17 @@ where if i.to_usize() >= len_source.vec_len() { return None; } - let vec = S1I::inclusive_range_from(i, source.vec_len()) + let mut sum = T::from(0); + let mut has_values = false; + for v in S1I::inclusive_range_from(i, source.vec_len()) .flat_map(|i| source.get_at(i)) - .collect::>(); - if vec.is_empty() { + { + sum += v; + has_values = true; + } + if !has_values { return None; } - let mut sum = T::from(0); - vec.into_iter().for_each(|v| sum += v); Some(sum) }, )) diff --git a/crates/brk_computer/src/grouped/ratio_from_dateindex.rs b/crates/brk_computer/src/grouped/ratio_from_dateindex.rs index 33e9d10f6..94849abb6 100644 --- a/crates/brk_computer/src/grouped/ratio_from_dateindex.rs +++ b/crates/brk_computer/src/grouped/ratio_from_dateindex.rs @@ -210,6 +210,14 @@ impl ComputedRatioVecsFromDateIndex { sorted.sort_unstable(); + // Cache mutable refs before the loop to avoid repeated unwrap chains + let pct1_vec = self.ratio_pct1.um().dateindex.um(); + let pct2_vec = self.ratio_pct2.um().dateindex.um(); + let pct5_vec = self.ratio_pct5.um().dateindex.um(); + let pct95_vec = self.ratio_pct95.um().dateindex.um(); + let pct98_vec = self.ratio_pct98.um().dateindex.um(); + let pct99_vec = self.ratio_pct99.um().dateindex.um(); + self.ratio .dateindex .as_ref() @@ -219,94 +227,22 @@ impl ComputedRatioVecsFromDateIndex { .skip(starting_dateindex.to_usize()) .try_for_each(|(index, ratio)| -> Result<()> { if index < min_ratio_date_usize { - self.ratio_pct5 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, StoredF32::NAN)?; - self.ratio_pct2 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, StoredF32::NAN)?; - self.ratio_pct1 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, StoredF32::NAN)?; - self.ratio_pct95 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, StoredF32::NAN)?; - self.ratio_pct98 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, StoredF32::NAN)?; - self.ratio_pct99 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, StoredF32::NAN)?; + pct1_vec.truncate_push_at(index, StoredF32::NAN)?; + pct2_vec.truncate_push_at(index, StoredF32::NAN)?; + pct5_vec.truncate_push_at(index, StoredF32::NAN)?; + pct95_vec.truncate_push_at(index, StoredF32::NAN)?; + pct98_vec.truncate_push_at(index, StoredF32::NAN)?; + pct99_vec.truncate_push_at(index, StoredF32::NAN)?; } else { let pos = sorted.binary_search(&ratio).unwrap_or_else(|pos| pos); sorted.insert(pos, ratio); - self.ratio_pct1 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, get_percentile(&sorted, 0.01))?; - self.ratio_pct2 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, get_percentile(&sorted, 0.02))?; - self.ratio_pct5 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, get_percentile(&sorted, 0.05))?; - self.ratio_pct95 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, get_percentile(&sorted, 0.95))?; - self.ratio_pct98 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, get_percentile(&sorted, 0.98))?; - self.ratio_pct99 - .as_mut() - .unwrap() - .dateindex - .as_mut() - .unwrap() - .truncate_push_at(index, get_percentile(&sorted, 0.99))?; + pct1_vec.truncate_push_at(index, get_percentile(&sorted, 0.01))?; + pct2_vec.truncate_push_at(index, get_percentile(&sorted, 0.02))?; + pct5_vec.truncate_push_at(index, get_percentile(&sorted, 0.05))?; + pct95_vec.truncate_push_at(index, get_percentile(&sorted, 0.95))?; + pct98_vec.truncate_push_at(index, get_percentile(&sorted, 0.98))?; + pct99_vec.truncate_push_at(index, get_percentile(&sorted, 0.99))?; } Ok(()) diff --git a/crates/brk_computer/src/pools/vecs.rs b/crates/brk_computer/src/pools/vecs.rs index c8fd3b812..e9c711711 100644 --- a/crates/brk_computer/src/pools/vecs.rs +++ b/crates/brk_computer/src/pools/vecs.rs @@ -54,7 +54,12 @@ impl Vecs { macro_rules! import_di { ($name:expr) => { ComputedVecsFromDateIndex::forced_import( - db, &suffix($name), Source::Compute, version, indexes, last.clone(), + db, + &suffix($name), + Source::Compute, + version, + indexes, + last.clone(), )? }; } @@ -62,19 +67,42 @@ impl Vecs { Ok(Self { id, indexes_to_blocks_mined: ComputedVecsFromHeight::forced_import( - db, &suffix("blocks_mined"), Source::Compute, version, indexes, sum_cum.clone(), + db, + &suffix("blocks_mined"), + Source::Compute, + version, + indexes, + sum_cum, )?, indexes_to_1w_blocks_mined: import_di!("1w_blocks_mined"), indexes_to_1m_blocks_mined: import_di!("1m_blocks_mined"), indexes_to_1y_blocks_mined: import_di!("1y_blocks_mined"), indexes_to_subsidy: ComputedValueVecsFromHeight::forced_import( - db, &suffix("subsidy"), Source::Compute, version, sum_cum.clone(), compute_dollars, indexes, + db, + &suffix("subsidy"), + Source::Compute, + version, + sum_cum, + compute_dollars, + indexes, )?, indexes_to_fee: ComputedValueVecsFromHeight::forced_import( - db, &suffix("fee"), Source::Compute, version, sum_cum.clone(), compute_dollars, indexes, + db, + &suffix("fee"), + Source::Compute, + version, + sum_cum, + compute_dollars, + indexes, )?, indexes_to_coinbase: ComputedValueVecsFromHeight::forced_import( - db, &suffix("coinbase"), Source::Compute, version, sum_cum, compute_dollars, indexes, + db, + &suffix("coinbase"), + Source::Compute, + version, + sum_cum, + compute_dollars, + indexes, )?, indexes_to_dominance: import_di!("dominance"), indexes_to_1d_dominance: import_di!("1d_dominance"), diff --git a/crates/brk_computer/src/stateful/address_indexes.rs b/crates/brk_computer/src/stateful/address_indexes.rs index 1206c9f67..6541fb28e 100644 --- a/crates/brk_computer/src/stateful/address_indexes.rs +++ b/crates/brk_computer/src/stateful/address_indexes.rs @@ -6,7 +6,7 @@ use brk_types::{ P2PKHAddressIndex, P2SHAddressIndex, P2TRAddressIndex, P2WPKHAddressIndex, P2WSHAddressIndex, TypeIndex, }; -use vecdb::{AnyStoredVec, BytesVec, GenericStoredVec, Reader, Stamp}; +use vecdb::{AnyStoredVec, AnyVec, BytesVec, GenericStoredVec, Reader, Stamp}; #[derive(Clone, Traversable)] pub struct AnyAddressIndexesVecs { @@ -99,6 +99,24 @@ impl AnyAddressIndexesVecs { typeindex: TypeIndex, anyaddressindex: AnyAddressIndex, ) -> Result<()> { + let vec_len = match address_type { + OutputType::P2PK33 => self.p2pk33.len(), + OutputType::P2PK65 => self.p2pk65.len(), + OutputType::P2PKH => self.p2pkh.len(), + OutputType::P2SH => self.p2sh.len(), + OutputType::P2TR => self.p2tr.len(), + OutputType::P2WPKH => self.p2wpkh.len(), + OutputType::P2WSH => self.p2wsh.len(), + OutputType::P2A => self.p2a.len(), + _ => unreachable!(), + }; + let typeindex_usize: usize = typeindex.into(); + if typeindex_usize > vec_len { + eprintln!( + "DEBUG update_or_push: address_type={:?}, typeindex={}, vec_len={}, anyaddressindex={:?}", + address_type, typeindex_usize, vec_len, anyaddressindex + ); + } (match address_type { OutputType::P2PK33 => self .p2pk33 diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index 8477566ee..95a7565b1 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -55,6 +55,12 @@ type TxIndexVec = SmallVec<[TxIndex; 4]>; const VERSION: Version = Version::new(21); +const BIP30_DUPLICATE_COINBASE_HEIGHT_1: u32 = 91_842; +const BIP30_DUPLICATE_COINBASE_HEIGHT_2: u32 = 91_880; +const BIP30_ORIGINAL_COINBASE_HEIGHT_1: u32 = 91_812; +const BIP30_ORIGINAL_COINBASE_HEIGHT_2: u32 = 91_722; +const FLUSH_INTERVAL: usize = 10_000; + #[derive(Clone, Traversable)] pub struct Vecs { db: Database, @@ -461,18 +467,9 @@ impl Vecs { Ordering::Less => Height::ZERO, }; - // info!("stateful_starting_height = {stateful_starting_height}"); - // let stateful_starting_height = stateful_starting_height - // .checked_sub(Height::new(1)) - // .unwrap_or_default(); - // info!("stateful_starting_height = {stateful_starting_height}"); - let starting_height = starting_indexes.height.min(stateful_starting_height); - // info!("starting_height = {starting_height}"); let last_height = Height::from(indexer.vecs.height_to_blockhash.stamp()); - // info!("last_height = {last_height}"); if starting_height <= last_height { - // info!("starting_height = {starting_height}"); let stamp = starting_height.into(); let starting_height = if starting_height.is_not_zero() { @@ -480,12 +477,6 @@ impl Vecs { .into_iter() .chain(self.any_address_indexes.rollback_before(stamp)?) .chain(self.addresses_data.rollback_before(stamp)?) - // .enumerate() - // .map(|(i, s)| { - // let h = Height::from(s).incremented(); - // // dbg!((i, s, h)); - // h - // }) .map(Height::from) .map(Height::incremented) .collect::>(); @@ -509,7 +500,6 @@ impl Vecs { } else { Height::ZERO }; - // info!("starting_height = {starting_height}"); let starting_height = if starting_height.is_not_zero() && separate_address_vecs @@ -543,8 +533,6 @@ impl Vecs { result }; - // info!("starting_height = {starting_height}"); - let mut chain_state: Vec; if starting_height.is_not_zero() { chain_state = self @@ -568,8 +556,6 @@ impl Vecs { } else { info!("Starting processing utxos from the start"); - // std::process::exit(0); - chain_state = vec![]; self.any_address_indexes.reset()?; @@ -770,6 +756,15 @@ impl Vecs { let typeindex = txoutindex_to_typeindex .read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); + let typeindex_usize: usize = typeindex.into(); + if output_type == OutputType::P2SH && typeindex_usize > 100_000_000 { + let txoutindex_usize: usize = txoutindex.into(); + eprintln!( + "DEBUG P2SH bad typeindex at read: txoutindex={}, typeindex={}, txindex={}", + txoutindex_usize, typeindex_usize, txindex.to_usize() + ); + } + let addressdata_opt = Self::get_addressdatawithsource( output_type, typeindex, @@ -1095,34 +1090,34 @@ impl Vecs { .unwrap(); }); - if chain_state_starting_height > height { - dbg!(chain_state_starting_height, height); - panic!("temp, just making sure") - } + debug_assert!( + chain_state_starting_height <= height, + "chain_state_starting_height ({chain_state_starting_height}) > height ({height})" + ); - unspendable_supply += transacted - .by_type - .unspendable - .as_vec() - .into_iter() - .map(|state| state.value) - .sum::() + // NOTE: If ByUnspendableType gains more fields, change to .as_vec().into_iter().map(|s| s.value).sum() + unspendable_supply += transacted.by_type.unspendable.opreturn.value + height_to_unclaimed_rewards_iter.get_unwrap(height); opreturn_supply += transacted.by_type.unspendable.opreturn.value; - if height == Height::new(0) { + if height == Height::ZERO { transacted = Transacted::default(); unspendable_supply += Sats::FIFTY_BTC; - } else if height == Height::new(91_842) || height == Height::new(91_880) { - // Need to destroy invalid coinbases due to duplicate txids - if height == Height::new(91_842) { - height_to_sent.entry(Height::new(91_812)).or_default() + } else if height == Height::new(BIP30_DUPLICATE_COINBASE_HEIGHT_1) + || height == Height::new(BIP30_DUPLICATE_COINBASE_HEIGHT_2) + { + if height == Height::new(BIP30_DUPLICATE_COINBASE_HEIGHT_1) { + height_to_sent + .entry(Height::new(BIP30_ORIGINAL_COINBASE_HEIGHT_1)) + .or_default() } else { - height_to_sent.entry(Height::new(91_722)).or_default() + height_to_sent + .entry(Height::new(BIP30_ORIGINAL_COINBASE_HEIGHT_2)) + .or_default() } .iterate(Sats::FIFTY_BTC, OutputType::P2PK65); - }; + } // Push current block state before processing sends and receives chain_state.push(BlockState { @@ -1182,7 +1177,7 @@ impl Vecs { if height != last_height && height != Height::ZERO - && height.to_usize() % 10_000 == 0 + && height.to_usize() % FLUSH_INTERVAL == 0 { let _lock = exit.lock(); @@ -1388,7 +1383,16 @@ impl Vecs { any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, ) -> Option> { - if *first_addressindexes.get(address_type).unwrap() <= typeindex { + let first = *first_addressindexes.get(address_type).unwrap(); + if first <= typeindex { + let typeindex_usize: usize = typeindex.into(); + let first_usize: usize = first.into(); + if typeindex_usize > 100_000_000 { + eprintln!( + "DEBUG get_addressdatawithsource NEW: address_type={:?}, typeindex={}, first_addressindex={}", + address_type, typeindex_usize, first_usize + ); + } return Some(WithAddressDataSource::New(LoadedAddressData::default())); } @@ -1474,6 +1478,13 @@ impl Vecs { addresstype_to_typeindex_to_emptyaddressdata.into_sorted_iter() { for (typeindex, emptyaddressdata_with_source) in sorted.into_iter() { + let typeindex_usize: usize = typeindex.into(); + if typeindex_usize > 100_000_000 { + eprintln!( + "DEBUG emptyaddressdata: address_type={:?}, typeindex={}, variant={:?}", + address_type, typeindex_usize, std::mem::discriminant(&emptyaddressdata_with_source) + ); + } match emptyaddressdata_with_source { WithAddressDataSource::New(emptyaddressdata) => { let emptyaddressindex = self @@ -1521,6 +1532,13 @@ impl Vecs { addresstype_to_typeindex_to_loadedaddressdata.into_sorted_iter() { for (typeindex, loadedaddressdata_with_source) in sorted.into_iter() { + let typeindex_usize: usize = typeindex.into(); + if typeindex_usize > 100_000_000 { + eprintln!( + "DEBUG loadedaddressdata: address_type={:?}, typeindex={}, variant={:?}", + address_type, typeindex_usize, std::mem::discriminant(&loadedaddressdata_with_source) + ); + } match loadedaddressdata_with_source { WithAddressDataSource::New(loadedaddressdata) => { let loadedaddressindex = self diff --git a/crates/brk_computer/src/stateful/readers.rs b/crates/brk_computer/src/stateful/readers.rs index 7cac3e696..5597fac56 100644 --- a/crates/brk_computer/src/stateful/readers.rs +++ b/crates/brk_computer/src/stateful/readers.rs @@ -61,13 +61,20 @@ pub fn build_txoutindex_to_txindex<'a>( block_tx_count: u64, txindex_to_output_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, ) -> Vec { - let mut vec = Vec::new(); - let block_first_txindex = block_first_txindex.to_usize(); - for tx_offset in 0..block_tx_count as usize { - let txindex = TxIndex::from(block_first_txindex + tx_offset); - let output_count = u64::from(txindex_to_output_count.get_unwrap(txindex)); + let counts: Vec<_> = (0..block_tx_count as usize) + .map(|tx_offset| { + let txindex = TxIndex::from(block_first_txindex + tx_offset); + u64::from(txindex_to_output_count.get_unwrap(txindex)) + }) + .collect(); + + let total: u64 = counts.iter().sum(); + let mut vec = Vec::with_capacity(total as usize); + + for (tx_offset, &output_count) in counts.iter().enumerate() { + let txindex = TxIndex::from(block_first_txindex + tx_offset); for _ in 0..output_count { vec.push(txindex); } @@ -81,13 +88,20 @@ pub fn build_txinindex_to_txindex<'a>( block_tx_count: u64, txindex_to_input_count: &mut BoxedVecIterator<'a, TxIndex, StoredU64>, ) -> Vec { - let mut vec = Vec::new(); - let block_first_txindex = block_first_txindex.to_usize(); - for tx_offset in 0..block_tx_count as usize { - let txindex = TxIndex::from(block_first_txindex + tx_offset); - let input_count = u64::from(txindex_to_input_count.get_unwrap(txindex)); + let counts: Vec<_> = (0..block_tx_count as usize) + .map(|tx_offset| { + let txindex = TxIndex::from(block_first_txindex + tx_offset); + u64::from(txindex_to_input_count.get_unwrap(txindex)) + }) + .collect(); + + let total: u64 = counts.iter().sum(); + let mut vec = Vec::with_capacity(total as usize); + + for (tx_offset, &input_count) in counts.iter().enumerate() { + let txindex = TxIndex::from(block_first_txindex + tx_offset); for _ in 0..input_count { vec.push(txindex); } diff --git a/crates/brk_computer/src/stateful/transaction_processing.rs b/crates/brk_computer/src/stateful/transaction_processing.rs index b55156123..b95716ccb 100644 --- a/crates/brk_computer/src/stateful/transaction_processing.rs +++ b/crates/brk_computer/src/stateful/transaction_processing.rs @@ -73,11 +73,10 @@ impl AddressTypeToVec<(TypeIndex, Sats)> { let amount = prev_amount + value; - if is_new - || from_any_empty - || vecs.amount_range.get_mut(amount).filter().clone() - != vecs.amount_range.get_mut(prev_amount).filter().clone() - { + let filters_differ = + vecs.amount_range.get(amount).filter() != vecs.amount_range.get(prev_amount).filter(); + + if is_new || from_any_empty || filters_differ { if !is_new && !from_any_empty { vecs.amount_range .get_mut(prev_amount) @@ -162,10 +161,10 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { let will_be_empty = addressdata.has_1_utxos(); - if will_be_empty - || vecs.amount_range.get_mut(amount).filter().clone() - != vecs.amount_range.get_mut(prev_amount).filter().clone() - { + let filters_differ = + vecs.amount_range.get(amount).filter() != vecs.amount_range.get(prev_amount).filter(); + + if will_be_empty || filters_differ { vecs.amount_range .get_mut(prev_amount) .state.um() diff --git a/crates/brk_computer/src/stateful/utxo_cohorts.rs b/crates/brk_computer/src/stateful/utxo_cohorts.rs index a590685db..0b33226fb 100644 --- a/crates/brk_computer/src/stateful/utxo_cohorts.rs +++ b/crates/brk_computer/src/stateful/utxo_cohorts.rs @@ -1,4 +1,4 @@ -use std::{ops::ControlFlow, path::Path}; +use std::path::Path; use brk_error::Result; use brk_grouper::{ @@ -9,7 +9,7 @@ use brk_grouper::{ use brk_traversable::Traversable; use brk_types::{ Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Timestamp, - Version, + Version, ONE_DAY_IN_SEC, }; use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; @@ -214,6 +214,11 @@ impl Vecs { let prev_timestamp = chain_state.last().unwrap().timestamp; + // Only blocks whose age % ONE_DAY >= threshold can cross a day boundary. + // Saves 1 subtraction + 2 divisions per block vs computing days_old directly. + let elapsed = (*timestamp).saturating_sub(*prev_timestamp); + let threshold = ONE_DAY_IN_SEC.saturating_sub(elapsed); + // Extract all mutable references upfront to avoid borrow checker issues // Use a single destructuring to get non-overlapping mutable borrows let UTXOGroups { @@ -241,15 +246,19 @@ impl Vecs { ), ]; - let _ = chain_state + chain_state .iter() - .try_for_each(|block_state| -> ControlFlow<()> { + .filter(|block_state| { + let age = (*prev_timestamp).saturating_sub(*block_state.timestamp); + age % ONE_DAY_IN_SEC >= threshold + }) + .for_each(|block_state| { let prev_days_old = prev_timestamp.difference_in_days_between(block_state.timestamp); let days_old = timestamp.difference_in_days_between(block_state.timestamp); if prev_days_old == days_old { - return ControlFlow::Continue(()); + return; } vecs.iter_mut().for_each(|(filter, state)| { @@ -286,8 +295,6 @@ impl Vecs { } }); } - - ControlFlow::Continue(()) }); } @@ -325,8 +332,9 @@ impl Vecs { ), ]; - let last_timestamp = chain_state.last().unwrap().timestamp; - let current_price = chain_state.last().unwrap().price; + let last_block = chain_state.last().unwrap(); + let last_timestamp = last_block.timestamp; + let current_price = last_block.price; let chain_state_len = chain_state.len(); diff --git a/crates/brk_grouper/src/by_amount_range.rs b/crates/brk_grouper/src/by_amount_range.rs index 9816879fc..c76236dc0 100644 --- a/crates/brk_grouper/src/by_amount_range.rs +++ b/crates/brk_grouper/src/by_amount_range.rs @@ -49,6 +49,41 @@ impl ByAmountRange { } } + #[allow(clippy::inconsistent_digit_grouping)] + pub fn get(&self, value: Sats) -> &T { + if value == Sats::ZERO { + &self._0sats + } else if value < Sats::_10 { + &self._1sat_to_10sats + } else if value < Sats::_100 { + &self._10sats_to_100sats + } else if value < Sats::_1K { + &self._100sats_to_1k_sats + } else if value < Sats::_10K { + &self._1k_sats_to_10k_sats + } else if value < Sats::_100K { + &self._10k_sats_to_100k_sats + } else if value < Sats::_1M { + &self._100k_sats_to_1m_sats + } else if value < Sats::_10M { + &self._1m_sats_to_10m_sats + } else if value < Sats::_1BTC { + &self._10m_sats_to_1btc + } else if value < Sats::_10BTC { + &self._1btc_to_10btc + } else if value < Sats::_100BTC { + &self._10btc_to_100btc + } else if value < Sats::_1K_BTC { + &self._100btc_to_1k_btc + } else if value < Sats::_10K_BTC { + &self._1k_btc_to_10k_btc + } else if value < Sats::_100K_BTC { + &self._10k_btc_to_100k_btc + } else { + &self._100k_btc_or_more + } + } + #[allow(clippy::inconsistent_digit_grouping)] pub fn get_mut(&mut self, value: Sats) -> &mut T { if value == Sats::ZERO { diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index d45c4affb..538953d7f 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -26,3 +26,6 @@ log = { workspace = true } rayon = { workspace = true } rustc-hash = { workspace = true } vecdb = { workspace = true } + +[dev-dependencies] +color-eyre = { workspace = true } diff --git a/crates/brk_indexer/examples/indexer.rs b/crates/brk_indexer/examples/indexer.rs index f43c514ce..55d6b9303 100644 --- a/crates/brk_indexer/examples/indexer.rs +++ b/crates/brk_indexer/examples/indexer.rs @@ -5,7 +5,6 @@ use std::{ time::{Duration, Instant}, }; -use brk_error::Result; use brk_indexer::Indexer; use brk_iterator::Blocks; use brk_reader::Reader; @@ -13,7 +12,9 @@ use brk_rpc::{Auth, Client}; use log::{debug, info}; use vecdb::Exit; -fn main() -> Result<()> { +fn main() -> color_eyre::Result<()> { + color_eyre::install()?; + brk_logger::init(Some(Path::new(".log")))?; let bitcoin_dir = Client::default_bitcoin_path();