use std::path::Path; use brk_error::Result; use brk_grouper::{CohortContext, Filter, Filtered}; use brk_traversable::Traversable; use brk_types::{Bitcoin, DateIndex, Dollars, Height, StoredU64, Version}; use rayon::prelude::*; use vecdb::{ AnyStoredVec, AnyVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableCloneableVec, IterableVec, PcoVec, }; use crate::{ Indexes, grouped::{ComputedVecsFromHeight, Source, VecBuilderOptions}, indexes, price, stateful::states::AddressCohortState, }; use super::{ super::metrics::{CohortMetrics, ImportConfig, SupplyMetrics}, traits::{CohortVecs, DynCohortVecs}, }; const VERSION: Version = Version::ZERO; /// Address cohort with metrics and optional runtime state. #[derive(Clone, Traversable)] pub struct AddressCohortVecs { /// Starting height when state was imported starting_height: Option, /// Runtime state for block-by-block processing #[traversable(skip)] pub state: Option, /// Metric vectors #[traversable(flatten)] pub metrics: CohortMetrics, /// Address count at each height pub height_to_addr_count: EagerVec>, /// Address count indexed by various dimensions pub indexes_to_addr_count: ComputedVecsFromHeight, } impl AddressCohortVecs { /// Import address cohort from database. /// /// `all_supply` is the supply metrics from the "all" cohort, used as global /// sources for `*_rel_to_market_cap` ratios. Pass `None` if not available. #[allow(clippy::too_many_arguments)] pub fn forced_import( db: &Database, filter: Filter, name: &str, version: Version, indexes: &indexes::Vecs, price: Option<&price::Vecs>, states_path: Option<&Path>, all_supply: Option<&SupplyMetrics>, ) -> Result { let compute_dollars = price.is_some(); let full_name = CohortContext::Address.full_name(&filter, name); let cfg = ImportConfig { db, filter, full_name: &full_name, context: CohortContext::Address, version, indexes, price, }; let height_to_addr_count = EagerVec::forced_import( db, &cfg.name("addr_count"), version + VERSION + Version::ZERO, )?; Ok(Self { starting_height: None, state: states_path .map(|path| AddressCohortState::new(path, &full_name, compute_dollars)), metrics: CohortMetrics::forced_import(&cfg, all_supply)?, indexes_to_addr_count: ComputedVecsFromHeight::forced_import( db, &cfg.name("addr_count"), Source::Vec(height_to_addr_count.boxed_clone()), version + VERSION + Version::ZERO, indexes, VecBuilderOptions::default().add_last(), )?, height_to_addr_count, }) } /// Get the starting height when state was imported. pub fn starting_height(&self) -> Option { self.starting_height } /// Set the starting height. pub fn set_starting_height(&mut self, height: Height) { self.starting_height = Some(height); } /// Reset starting height to zero. pub fn reset_starting_height(&mut self) { self.starting_height = Some(Height::ZERO); } /// Get minimum length across height-indexed vectors. pub fn min_len(&self) -> usize { self.height_to_addr_count .len() .min(self.metrics.supply.min_len()) .min(self.metrics.activity.min_len()) } /// Returns a parallel iterator over all vecs for parallel writing. pub fn par_iter_vecs_mut(&mut self) -> impl ParallelIterator { rayon::iter::once(&mut self.height_to_addr_count as &mut dyn AnyStoredVec) .chain(self.metrics.par_iter_mut()) } /// Commit state to disk (separate from vec writes for parallelization). pub fn write_state(&mut self, height: Height, cleanup: bool) -> Result<()> { if let Some(state) = self.state.as_mut() { state.inner.write(height, cleanup)?; } Ok(()) } } impl Filtered for AddressCohortVecs { fn filter(&self) -> &Filter { &self.metrics.filter } } impl DynCohortVecs for AddressCohortVecs { fn min_height_vecs_len(&self) -> usize { self.min_len() } fn reset_state_starting_height(&mut self) { self.reset_starting_height(); if let Some(state) = self.state.as_mut() { state.reset(); } } fn import_state(&mut self, starting_height: Height) -> Result { use vecdb::GenericStoredVec; // Import state from runtime state if present if let Some(state) = self.state.as_mut() { // State files are saved AT height H, so to resume at H+1 we need to import at H // Decrement first, then increment result to match expected starting_height if let Some(mut prev_height) = starting_height.decremented() { // Import price_to_amount state file (may adjust prev_height to actual file found) prev_height = state.inner.import_at_or_before(prev_height)?; // Restore supply state from height-indexed vectors state.inner.supply.value = self .metrics .supply .height_to_supply .read_once(prev_height)?; state.inner.supply.utxo_count = *self .metrics .supply .height_to_utxo_count .read_once(prev_height)?; state.addr_count = *self.height_to_addr_count.read_once(prev_height)?; // Restore realized cap if present if let Some(realized_metrics) = self.metrics.realized.as_mut() && let Some(realized_state) = state.inner.realized.as_mut() { realized_state.cap = realized_metrics .height_to_realized_cap .read_once(prev_height)?; } let result = prev_height.incremented(); self.starting_height = Some(result); Ok(result) } else { // starting_height is 0, nothing to import self.starting_height = Some(Height::ZERO); Ok(Height::ZERO) } } else { self.starting_height = Some(starting_height); Ok(starting_height) } } fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> { use vecdb::GenericStoredVec; self.height_to_addr_count .validate_computed_version_or_reset( base_version + self.height_to_addr_count.inner_version(), )?; self.metrics.validate_computed_versions(base_version)?; Ok(()) } fn truncate_push(&mut self, height: Height) -> Result<()> { if self.starting_height.is_some_and(|h| h > height) { return Ok(()); } // Push addr_count from state if let Some(state) = self.state.as_ref() { self.height_to_addr_count .truncate_push(height, state.addr_count.into())?; self.metrics.truncate_push(height, &state.inner)?; } Ok(()) } fn compute_then_truncate_push_unrealized_states( &mut self, height: Height, height_price: Option, dateindex: Option, date_price: Option>, ) -> Result<()> { if let Some(state) = self.state.as_mut() { self.metrics.compute_then_truncate_push_unrealized_states( height, height_price, dateindex, date_price, &mut state.inner, )?; } Ok(()) } fn compute_rest_part1( &mut self, indexes: &indexes::Vecs, price: Option<&price::Vecs>, starting_indexes: &Indexes, exit: &Exit, ) -> Result<()> { self.indexes_to_addr_count.compute_rest( indexes, starting_indexes, exit, Some(&self.height_to_addr_count), )?; self.metrics .compute_rest_part1(indexes, price, starting_indexes, exit)?; Ok(()) } } impl CohortVecs for AddressCohortVecs { fn compute_from_stateful( &mut self, starting_indexes: &Indexes, others: &[&Self], exit: &Exit, ) -> Result<()> { self.height_to_addr_count.compute_sum_of_others( starting_indexes.height, others .iter() .map(|v| &v.height_to_addr_count) .collect::>() .as_slice(), exit, )?; self.metrics.compute_from_stateful( starting_indexes, &others.iter().map(|v| &v.metrics).collect::>(), exit, )?; Ok(()) } fn compute_rest_part2( &mut self, indexes: &indexes::Vecs, price: Option<&price::Vecs>, starting_indexes: &Indexes, height_to_supply: &impl IterableVec, height_to_market_cap: Option<&impl IterableVec>, dateindex_to_market_cap: Option<&impl IterableVec>, exit: &Exit, ) -> Result<()> { self.metrics.compute_rest_part2( indexes, price, starting_indexes, height_to_supply, height_to_market_cap, dateindex_to_market_cap, exit, )?; Ok(()) } }