From 2ec3ca8308089eb3ad69a4b24afc89cd5197a8b5 Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 11 Dec 2025 18:34:23 +0100 Subject: [PATCH] computer: stateful: refactor part 2 --- .../src/stateful_new/address/address_count.rs | 13 + .../stateful_new/address/height_type_vec.rs | 5 + .../stateful_new/address/type_index_map.rs | 41 +++ .../stateful_new/cohorts/address_cohorts.rs | 36 ++ .../src/stateful_new/cohorts/utxo.rs | 45 ++- .../stateful_new/cohorts/utxo_cohorts/mod.rs | 81 ++++- .../src/stateful_new/compute/block_loop.rs | 323 ++++++++++++++---- .../src/stateful_new/compute/flush.rs | 14 +- .../src/stateful_new/compute/mod.rs | 4 +- .../src/stateful_new/compute/recover.rs | 33 +- .../src/stateful_new/metrics/mod.rs | 1 + .../src/stateful_new/metrics/relative.rs | 155 ++++++++- .../src/stateful_new/metrics/supply.rs | 2 +- .../stateful_new/process/address_lookup.rs | 105 ++++++ .../stateful_new/process/empty_addresses.rs | 11 +- .../src/stateful_new/process/inputs.rs | 133 +++++++- .../stateful_new/process/loaded_addresses.rs | 11 +- .../src/stateful_new/process/mod.rs | 10 + .../src/stateful_new/process/outputs.rs | 144 +++++++- .../src/stateful_new/process/received.rs | 92 +++++ .../src/stateful_new/process/sent.rs | 122 +++++++ .../src/stateful_new/process/tx_counts.rs | 53 +++ .../src/stateful_new/process/with_source.rs | 73 ++++ crates/brk_computer/src/stateful_new/vecs.rs | 265 +++++++++++--- crates/brk_grouper/src/address.rs | 4 + crates/brk_grouper/src/utxo.rs | 8 + crates/brk_store/src/lib.rs | 2 +- crates/brk_types/src/addressdata_source.rs | 12 - crates/brk_types/src/lib.rs | 2 - 29 files changed, 1609 insertions(+), 191 deletions(-) create mode 100644 crates/brk_computer/src/stateful_new/process/address_lookup.rs create mode 100644 crates/brk_computer/src/stateful_new/process/received.rs create mode 100644 crates/brk_computer/src/stateful_new/process/sent.rs create mode 100644 crates/brk_computer/src/stateful_new/process/tx_counts.rs create mode 
100644 crates/brk_computer/src/stateful_new/process/with_source.rs delete mode 100644 crates/brk_types/src/addressdata_source.rs diff --git a/crates/brk_computer/src/stateful_new/address/address_count.rs b/crates/brk_computer/src/stateful_new/address/address_count.rs index eda8f02ac..e0f2c516e 100644 --- a/crates/brk_computer/src/stateful_new/address/address_count.rs +++ b/crates/brk_computer/src/stateful_new/address/address_count.rs @@ -61,6 +61,19 @@ impl AddressTypeToHeightToAddressCount { })?)) } + pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + use vecdb::AnyStoredVec; + self.p2pk65.safe_flush(exit)?; + self.p2pk33.safe_flush(exit)?; + self.p2pkh.safe_flush(exit)?; + self.p2sh.safe_flush(exit)?; + self.p2wpkh.safe_flush(exit)?; + self.p2wsh.safe_flush(exit)?; + self.p2tr.safe_flush(exit)?; + self.p2a.safe_flush(exit)?; + Ok(()) + } + pub fn truncate_push( &mut self, height: Height, diff --git a/crates/brk_computer/src/stateful_new/address/height_type_vec.rs b/crates/brk_computer/src/stateful_new/address/height_type_vec.rs index 535bcc4d0..f746c0375 100644 --- a/crates/brk_computer/src/stateful_new/address/height_type_vec.rs +++ b/crates/brk_computer/src/stateful_new/address/height_type_vec.rs @@ -17,4 +17,9 @@ impl HeightToAddressTypeToVec { self.entry(height).or_default().merge_mut(vec); } } + + /// Consume and iterate over (Height, AddressTypeToVec) pairs. + pub fn into_iter(self) -> impl Iterator)> { + self.0.into_iter() + } } diff --git a/crates/brk_computer/src/stateful_new/address/type_index_map.rs b/crates/brk_computer/src/stateful_new/address/type_index_map.rs index 9565f7cab..3caaa3f6f 100644 --- a/crates/brk_computer/src/stateful_new/address/type_index_map.rs +++ b/crates/brk_computer/src/stateful_new/address/type_index_map.rs @@ -49,6 +49,18 @@ impl AddressTypeToTypeIndexMap { own.extend(other.drain()); } + /// Merge another map into self, consuming other. 
+ pub fn merge_mut(&mut self, mut other: Self) { + Self::merge_single(&mut self.p2a, &mut other.p2a); + Self::merge_single(&mut self.p2pk33, &mut other.p2pk33); + Self::merge_single(&mut self.p2pk65, &mut other.p2pk65); + Self::merge_single(&mut self.p2pkh, &mut other.p2pkh); + Self::merge_single(&mut self.p2sh, &mut other.p2sh); + Self::merge_single(&mut self.p2tr, &mut other.p2tr); + Self::merge_single(&mut self.p2wpkh, &mut other.p2wpkh); + Self::merge_single(&mut self.p2wsh, &mut other.p2wsh); + } + /// Insert a value for a specific address type and typeindex. pub fn insert_for_type(&mut self, address_type: OutputType, typeindex: TypeIndex, value: T) { self.get_mut(address_type).unwrap().insert(typeindex, value); @@ -76,6 +88,11 @@ impl AddressTypeToTypeIndexMap { pub fn into_iter(self) -> impl Iterator)> { self.0.into_iter() } + + /// Iterate mutably over entries by address type. + pub fn iter_mut(&mut self) -> impl Iterator)> { + self.0.iter_mut() + } } impl AddressTypeToTypeIndexMap> @@ -104,3 +121,27 @@ where self } } + +impl AddressTypeToTypeIndexMap> { + /// Merge two maps of Vec values, concatenating vectors. 
+ pub fn merge_vecs(mut self, other: Self) -> Self { + for (address_type, other_map) in other.0.into_iter() { + let self_map = self.0.get_mut_unwrap(address_type); + for (typeindex, mut other_vec) in other_map { + match self_map.entry(typeindex) { + Entry::Occupied(mut entry) => { + let self_vec = entry.get_mut(); + if other_vec.len() > self_vec.len() { + mem::swap(self_vec, &mut other_vec); + } + self_vec.extend(other_vec); + } + Entry::Vacant(entry) => { + entry.insert(other_vec); + } + } + } + } + self + } +} diff --git a/crates/brk_computer/src/stateful_new/cohorts/address_cohorts.rs b/crates/brk_computer/src/stateful_new/cohorts/address_cohorts.rs index a05a2e14a..49fd92795 100644 --- a/crates/brk_computer/src/stateful_new/cohorts/address_cohorts.rs +++ b/crates/brk_computer/src/stateful_new/cohorts/address_cohorts.rs @@ -227,4 +227,40 @@ impl AddressCohorts { self.par_iter_separate_mut() .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit)) } + + /// Get minimum height from all separate cohorts' height-indexed vectors. + pub fn min_separate_height_vecs_len(&self) -> Height { + self.iter_separate() + .map(|v| Height::from(v.min_height_vecs_len())) + .min() + .unwrap_or_default() + } + + /// Import state for all separate cohorts at given height. + /// + /// Note: This follows the same pattern as UTXOCohorts - errors are ignored + /// and the start_mode logic ensures we're in a valid state before calling. + pub fn import_separate_states(&mut self, height: Height) { + self.par_iter_separate_mut().for_each(|v| { + let _ = v.import_state(height); + }); + } + + /// Reset state heights for all separate cohorts. + pub fn reset_separate_state_heights(&mut self) { + self.par_iter_separate_mut().for_each(|v| { + v.reset_state_starting_height(); + }); + } + + /// Reset price_to_amount for all separate cohorts (called during fresh start). 
+ pub fn reset_separate_price_to_amount(&mut self) -> Result<()> { + self.par_iter_separate_mut() + .try_for_each(|v| { + if let Some(state) = v.state.as_mut() { + state.reset_price_to_amount_if_needed()?; + } + Ok(()) + }) + } } diff --git a/crates/brk_computer/src/stateful_new/cohorts/utxo.rs b/crates/brk_computer/src/stateful_new/cohorts/utxo.rs index 7b28d7af8..deddb762d 100644 --- a/crates/brk_computer/src/stateful_new/cohorts/utxo.rs +++ b/crates/brk_computer/src/stateful_new/cohorts/utxo.rs @@ -5,11 +5,13 @@ use std::path::Path; use brk_error::Result; use brk_grouper::{CohortContext, Filter, Filtered, StateLevel}; use brk_traversable::Traversable; -use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; +use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, Version}; use vecdb::{Database, Exit, IterableVec}; use crate::{ - Indexes, PriceToAmount, UTXOCohortState, indexes, price, + Indexes, PriceToAmount, UTXOCohortState, + grouped::{PERCENTILES, PERCENTILES_LEN}, + indexes, price, stateful_new::{CohortVecs, DynCohortVecs}, }; @@ -94,6 +96,45 @@ impl UTXOCohortVecs { pub fn reset_state_starting_height(&mut self) { self.state_starting_height = Some(Height::ZERO); } + + /// Compute percentile prices from standalone price_to_amount. + /// Returns NaN array if price_to_amount is None or empty. 
+ pub fn compute_percentile_prices_from_standalone( + &self, + supply: Sats, + ) -> [Dollars; PERCENTILES_LEN] { + let mut result = [Dollars::NAN; PERCENTILES_LEN]; + + let price_to_amount = match self.price_to_amount.as_ref() { + Some(p) => p, + None => return result, + }; + + if price_to_amount.is_empty() || supply == Sats::ZERO { + return result; + } + + let total = supply; + let targets = PERCENTILES.map(|p| total * p as u64 / 100); + + let mut accumulated = Sats::ZERO; + let mut pct_idx = 0; + + for (&price, &sats) in price_to_amount.iter() { + accumulated += sats; + + while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] { + result[pct_idx] = price; + pct_idx += 1; + } + + if pct_idx >= PERCENTILES_LEN { + break; + } + } + + result + } } impl Filtered for UTXOCohortVecs { diff --git a/crates/brk_computer/src/stateful_new/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful_new/cohorts/utxo_cohorts/mod.rs index 426372bdc..b28453422 100644 --- a/crates/brk_computer/src/stateful_new/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful_new/cohorts/utxo_cohorts/mod.rs @@ -16,7 +16,7 @@ use brk_traversable::Traversable; use brk_types::{Bitcoin, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Version}; use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; -use vecdb::{Database, Exit, IterableVec}; +use vecdb::{Database, Exit, GenericStoredVec, IterableVec}; use crate::{Indexes, indexes, price, stateful_new::DynCohortVecs}; @@ -373,4 +373,83 @@ impl UTXOCohorts { Ok(prev_height.incremented()) } + + /// Get minimum height from all separate cohorts' height-indexed vectors. + pub fn min_separate_height_vecs_len(&self) -> Height { + self.iter_separate() + .map(|v| Height::from(v.min_height_vecs_len())) + .min() + .unwrap_or_default() + } + + /// Import state for all separate cohorts at given height. 
+ pub fn import_separate_states(&mut self, height: Height) { + self.par_iter_separate_mut().for_each(|v| { + let _ = v.import_state(height); + }); + } + + /// Reset state heights for all separate cohorts. + pub fn reset_separate_state_heights(&mut self) { + self.par_iter_separate_mut().for_each(|v| { + v.reset_state_starting_height(); + }); + } + + /// Reset price_to_amount for all separate cohorts (called during fresh start). + pub fn reset_separate_price_to_amount(&mut self) -> Result<()> { + self.par_iter_separate_mut() + .try_for_each(|v| { + if let Some(state) = v.state.as_mut() { + state.reset_price_to_amount_if_needed()?; + } + Ok(()) + }) + } + + /// Compute and push percentiles for aggregate cohorts (all, sth, lth). + /// Must be called after receive()/send() when price_to_amount is up to date. + pub fn truncate_push_aggregate_percentiles(&mut self, height: Height) -> Result<()> { + // Collect supply values from age_range cohorts + let age_range_data: Vec<_> = self + .0 + .age_range + .iter() + .map(|sub| (sub.filter().clone(), sub.state.as_ref().map(|s| s.supply.value).unwrap_or(Sats::ZERO))) + .collect(); + + // Compute percentiles for each aggregate cohort in parallel + let results: Vec<_> = self + .0 + .par_iter_aggregate() + .filter_map(|v| { + if v.price_to_amount.is_none() { + return None; + } + let filter = v.filter().clone(); + let supply = age_range_data + .iter() + .filter(|(sub_filter, _)| filter.includes(sub_filter)) + .map(|(_, value)| *value) + .fold(Sats::ZERO, |acc, v| acc + v); + let percentiles = v.compute_percentile_prices_from_standalone(supply); + Some((filter, percentiles)) + }) + .collect(); + + // Push results sequentially (requires &mut) + for (filter, percentiles) in results { + let v = self + .0 + .iter_aggregate_mut() + .find(|v| v.filter() == &filter) + .unwrap(); + + if let Some(pp) = v.metrics.price_paid.as_mut().and_then(|p| p.price_percentiles.as_mut()) { + pp.truncate_push(height, &percentiles)?; + } + } + + Ok(()) + } } 
diff --git a/crates/brk_computer/src/stateful_new/compute/block_loop.rs b/crates/brk_computer/src/stateful_new/compute/block_loop.rs index 5708d76c8..eb59a8dbd 100644 --- a/crates/brk_computer/src/stateful_new/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful_new/compute/block_loop.rs @@ -13,25 +13,29 @@ use std::thread; use brk_error::Result; use brk_grouper::ByAddressType; use brk_indexer::Indexer; -use brk_types::{DateIndex, Height, OutputType, Sats}; +use brk_types::{DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TypeIndex}; use log::info; -use vecdb::{Exit, GenericStoredVec, IterableVec, VecIndex}; +use rayon::prelude::*; +use vecdb::{AnyStoredVec, Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex}; use crate::states::{BlockState, Transacted}; +use crate::utils::OptionExt; use crate::{chain, indexes, price}; +use super::super::address::AddressTypeToTypeIndexMap; use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}; use super::super::vecs::Vecs; use super::{ - FLUSH_INTERVAL, IndexerReaders, build_txinindex_to_txindex, build_txoutindex_to_height_map, - build_txoutindex_to_txindex, process_inputs, process_outputs, + BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1, + BIP30_ORIGINAL_HEIGHT_2, FLUSH_INTERVAL, IndexerReaders, VecsReaders, build_txinindex_to_txindex, + build_txoutindex_to_txindex, flush::flush_checkpoint as flush_checkpoint_full, +}; +use crate::stateful_new::address::AddressTypeToAddressCount; +use crate::stateful_new::process::{ + AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource, + build_txoutindex_to_height_map, process_inputs, process_outputs, process_received, + process_sent, update_tx_counts, }; - -/// BIP30 duplicate coinbase heights - must handle specially. 
-const BIP30_DUPLICATE_HEIGHT_1: u32 = 91_842; -const BIP30_DUPLICATE_HEIGHT_2: u32 = 91_880; -const BIP30_ORIGINAL_HEIGHT_1: u32 = 91_812; -const BIP30_ORIGINAL_HEIGHT_2: u32 = 91_722; /// Process all blocks from starting_height to last_height. #[allow(clippy::too_many_arguments)] @@ -55,48 +59,104 @@ pub fn process_blocks( starting_height, last_height ); - // Pre-compute iterators for fast access - let mut height_to_first_txindex = indexes.height_to_first_txindex.boxed_iter(); - let mut height_to_tx_count = chain.height_to_tx_count.boxed_iter(); - let mut height_to_first_txoutindex = indexes.height_to_first_txoutindex.boxed_iter(); - let mut height_to_output_count = chain.height_to_output_count.boxed_iter(); - let mut height_to_first_txinindex = indexes.height_to_first_txinindex.boxed_iter(); - let mut height_to_input_count = chain.height_to_input_count.boxed_iter(); - let mut height_to_timestamp = chain.height_to_timestamp.boxed_iter(); - let mut height_to_unclaimed_rewards = chain.height_to_unclaimed_reward.boxed_iter(); - let mut height_to_date = indexes.height_to_date.boxed_iter(); - let mut dateindex_to_first_height = indexes.dateindex_to_first_height.boxed_iter(); - let mut dateindex_to_height_count = indexes.dateindex_to_height_count.boxed_iter(); - let mut txindex_to_output_count = chain.txindex_to_output_count.boxed_iter(); - let mut txindex_to_input_count = chain.txindex_to_input_count.boxed_iter(); + // References to vectors using correct field paths + // From indexer.vecs: + let height_to_first_txindex = &indexer.vecs.height_to_first_txindex; + let height_to_first_txoutindex = &indexer.vecs.height_to_first_txoutindex; + let height_to_first_txinindex = &indexer.vecs.height_to_first_txinindex; - let mut height_to_price = price.map(|p| p.height_to_close.boxed_iter()); - let mut dateindex_to_price = price.map(|p| p.dateindex_to_close.boxed_iter()); + // From chain (via .height.u() or .height.unwrap_sum() patterns): + let height_to_tx_count = 
chain.indexes_to_tx_count.height.u(); + let height_to_output_count = chain.indexes_to_output_count.height.unwrap_sum(); + let height_to_input_count = chain.indexes_to_input_count.height.unwrap_sum(); + let height_to_unclaimed_rewards = chain.indexes_to_unclaimed_rewards.sats.height.as_ref().unwrap(); + + // From indexes: + let height_to_timestamp = &indexes.height_to_timestamp_fixed; + let height_to_date = &indexes.height_to_date_fixed; + let dateindex_to_first_height = &indexes.dateindex_to_first_height; + let dateindex_to_height_count = &indexes.dateindex_to_height_count; + let txindex_to_output_count = &indexes.txindex_to_output_count; + let txindex_to_input_count = &indexes.txindex_to_input_count; + + // From price (optional): + let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height); + let dateindex_to_price = price.map(|p| p.timeindexes_to_price_close.dateindex.u()); + + // Collect price and timestamp vectors for process_sent (needs slice access, not iterators) + // These are used in the spawned thread and need to be separate from chain_state + let height_to_price_vec: Option> = + height_to_price.map(|v| v.into_iter().map(|d| *d).collect()); + let height_to_timestamp_vec: Vec = height_to_timestamp.into_iter().collect(); + + // Create iterators for sequential access + let mut height_to_first_txindex_iter = height_to_first_txindex.into_iter(); + let mut height_to_first_txoutindex_iter = height_to_first_txoutindex.into_iter(); + let mut height_to_first_txinindex_iter = height_to_first_txinindex.into_iter(); + let mut height_to_tx_count_iter = height_to_tx_count.into_iter(); + let mut height_to_output_count_iter = height_to_output_count.into_iter(); + let mut height_to_input_count_iter = height_to_input_count.into_iter(); + let mut height_to_unclaimed_rewards_iter = height_to_unclaimed_rewards.into_iter(); + let mut height_to_timestamp_iter = height_to_timestamp.into_iter(); + let mut height_to_date_iter = height_to_date.into_iter(); + let mut 
dateindex_to_first_height_iter = dateindex_to_first_height.into_iter(); + let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter(); + let mut txindex_to_output_count_iter = txindex_to_output_count.iter(); + let mut txindex_to_input_count_iter = txindex_to_input_count.iter(); + let mut height_to_price_iter = height_to_price.map(|v| v.into_iter()); + let mut dateindex_to_price_iter = dateindex_to_price.map(|v| v.into_iter()); // Build txoutindex -> height map for input processing - let txoutindex_to_height = build_txoutindex_to_height_map(&indexes.height_to_first_txoutindex); + let txoutindex_to_height = build_txoutindex_to_height_map(height_to_first_txoutindex); // Create readers for parallel data access let ir = IndexerReaders::new(indexer); + let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); - // Track running totals - let mut unspendable_supply = Sats::ZERO; - let mut opreturn_supply = Sats::ZERO; - let mut addresstype_to_addr_count = ByAddressType::::default(); - let mut addresstype_to_empty_addr_count = ByAddressType::::default(); + // Create iterators for first address indexes per type + let mut first_p2a_iter = indexer.vecs.height_to_first_p2aaddressindex.into_iter(); + let mut first_p2pk33_iter = indexer.vecs.height_to_first_p2pk33addressindex.into_iter(); + let mut first_p2pk65_iter = indexer.vecs.height_to_first_p2pk65addressindex.into_iter(); + let mut first_p2pkh_iter = indexer.vecs.height_to_first_p2pkhaddressindex.into_iter(); + let mut first_p2sh_iter = indexer.vecs.height_to_first_p2shaddressindex.into_iter(); + let mut first_p2tr_iter = indexer.vecs.height_to_first_p2traddressindex.into_iter(); + let mut first_p2wpkh_iter = indexer.vecs.height_to_first_p2wpkhaddressindex.into_iter(); + let mut first_p2wsh_iter = indexer.vecs.height_to_first_p2wshaddressindex.into_iter(); - // Recover initial values if resuming - if starting_height > Height::ZERO { - let prev_height = 
starting_height.decremented().unwrap(); - unspendable_supply = vecs - .height_to_unspendable_supply - .get(prev_height) - .unwrap_or_default(); - opreturn_supply = vecs - .height_to_opreturn_supply - .get(prev_height) - .unwrap_or_default(); - } + // Track running totals - recover from previous height if resuming + let (mut unspendable_supply, mut opreturn_supply, mut addresstype_to_addr_count, mut addresstype_to_empty_addr_count) = + if starting_height > Height::ZERO { + let prev_height = starting_height.decremented().unwrap(); + ( + vecs.height_to_unspendable_supply + .into_iter() + .get_unwrap(prev_height), + vecs.height_to_opreturn_supply + .into_iter() + .get_unwrap(prev_height), + AddressTypeToAddressCount::from(( + &vecs.addresstype_to_height_to_addr_count, + starting_height, + )), + AddressTypeToAddressCount::from(( + &vecs.addresstype_to_height_to_empty_addr_count, + starting_height, + )), + ) + } else { + ( + Sats::ZERO, + Sats::ZERO, + AddressTypeToAddressCount::default(), + AddressTypeToAddressCount::default(), + ) + }; + + // Persistent address data caches (accumulate across blocks, flushed at checkpoints) + let mut loaded_cache: AddressTypeToTypeIndexMap = + AddressTypeToTypeIndexMap::default(); + let mut empty_cache: AddressTypeToTypeIndexMap = + AddressTypeToTypeIndexMap::default(); // Main block iteration for height in starting_height.to_usize()..=last_height.to_usize() { @@ -107,20 +167,32 @@ pub fn process_blocks( } // Get block metadata - let first_txindex = height_to_first_txindex.get_unwrap(height); - let tx_count = u64::from(height_to_tx_count.get_unwrap(height)); - let first_txoutindex = height_to_first_txoutindex.get_unwrap(height).to_usize(); - let output_count = u64::from(height_to_output_count.get_unwrap(height)) as usize; - let first_txinindex = height_to_first_txinindex.get_unwrap(height).to_usize(); - let input_count = u64::from(height_to_input_count.get_unwrap(height)) as usize; - let timestamp = 
height_to_timestamp.get_unwrap(height); - let block_price = height_to_price.as_mut().map(|v| v.get_unwrap(height)); + let first_txindex = height_to_first_txindex_iter.get_unwrap(height); + let tx_count = u64::from(height_to_tx_count_iter.get_unwrap(height)); + let first_txoutindex = height_to_first_txoutindex_iter.get_unwrap(height).to_usize(); + let output_count = u64::from(height_to_output_count_iter.get_unwrap(height)) as usize; + let first_txinindex = height_to_first_txinindex_iter.get_unwrap(height).to_usize(); + let input_count = u64::from(height_to_input_count_iter.get_unwrap(height)) as usize; + let timestamp = height_to_timestamp_iter.get_unwrap(height); + let block_price = height_to_price_iter.as_mut().map(|v| *v.get_unwrap(height)); // Build txindex mappings for this block let txoutindex_to_txindex = - build_txoutindex_to_txindex(first_txindex, tx_count, &mut txindex_to_output_count); + build_txoutindex_to_txindex(first_txindex, tx_count, &mut txindex_to_output_count_iter); let txinindex_to_txindex = - build_txinindex_to_txindex(first_txindex, tx_count, &mut txindex_to_input_count); + build_txinindex_to_txindex(first_txindex, tx_count, &mut txindex_to_input_count_iter); + + // Get first address indexes for this height + let first_addressindexes = ByAddressType { + p2a: TypeIndex::from(first_p2a_iter.get_unwrap(height).to_usize()), + p2pk33: TypeIndex::from(first_p2pk33_iter.get_unwrap(height).to_usize()), + p2pk65: TypeIndex::from(first_p2pk65_iter.get_unwrap(height).to_usize()), + p2pkh: TypeIndex::from(first_p2pkh_iter.get_unwrap(height).to_usize()), + p2sh: TypeIndex::from(first_p2sh_iter.get_unwrap(height).to_usize()), + p2tr: TypeIndex::from(first_p2tr_iter.get_unwrap(height).to_usize()), + p2wpkh: TypeIndex::from(first_p2wpkh_iter.get_unwrap(height).to_usize()), + p2wsh: TypeIndex::from(first_p2wsh_iter.get_unwrap(height).to_usize()), + }; // Reset per-block values for all separate cohorts reset_block_values(&mut vecs.utxo_cohorts, &mut 
vecs.address_cohorts); @@ -142,6 +214,12 @@ pub fn process_blocks( &indexer.vecs.txoutindex_to_outputtype, &indexer.vecs.txoutindex_to_typeindex, &ir, + &first_addressindexes, + &loaded_cache, + &empty_cache, + &vr, + &vecs.any_address_indexes, + &vecs.addresses_data, ); // Process inputs (send) - skip coinbase input @@ -157,23 +235,40 @@ pub fn process_blocks( &indexer.vecs.txoutindex_to_typeindex, &txoutindex_to_height, &ir, + &first_addressindexes, + &loaded_cache, + &empty_cache, + &vr, + &vecs.any_address_indexes, + &vecs.addresses_data, ) } else { - super::InputsResult { + InputsResult { height_to_sent: Default::default(), sent_data: Default::default(), + address_data: Default::default(), + txindex_vecs: Default::default(), } }; (outputs_result, inputs_result) }); + // Merge new address data into caches + loaded_cache.merge_mut(outputs_result.address_data); + loaded_cache.merge_mut(inputs_result.address_data); + + // Combine txindex_vecs from outputs and inputs, then update tx_count + let combined_txindex_vecs = + outputs_result.txindex_vecs.merge_vec(inputs_result.txindex_vecs); + update_tx_counts(&mut loaded_cache, &mut empty_cache, combined_txindex_vecs); + let mut transacted = outputs_result.transacted; let mut height_to_sent = inputs_result.height_to_sent; // Update supply tracking unspendable_supply += transacted.by_type.unspendable.opreturn.value - + height_to_unclaimed_rewards.get_unwrap(height); + + height_to_unclaimed_rewards_iter.get_unwrap(height); opreturn_supply += transacted.by_type.unspendable.opreturn.value; // Handle special cases @@ -203,9 +298,53 @@ pub fn process_blocks( timestamp, }); - // Update UTXO cohorts - vecs.utxo_cohorts.receive(transacted, height, block_price); - vecs.utxo_cohorts.send(height_to_sent, chain_state); + // Process UTXO cohorts and Address cohorts in parallel + // - Main thread: UTXO cohorts receive/send + // - Spawned thread: Address cohorts process_received/process_sent + thread::scope(|scope| { + // Spawn address 
cohort processing in background thread + scope.spawn(|| { + // Create lookup closure that returns None (data was pre-fetched in parallel phase) + let get_address_data = + |_output_type, _type_index| -> Option { None }; + + let mut lookup = AddressLookup { + get_address_data, + loaded: &mut loaded_cache, + empty: &mut empty_cache, + }; + + // Process received outputs (addresses receiving funds) + process_received( + outputs_result.received_data, + &mut vecs.address_cohorts, + &mut lookup, + block_price, + &mut addresstype_to_addr_count, + &mut addresstype_to_empty_addr_count, + ); + + // Process sent inputs (addresses sending funds) + // Uses separate price/timestamp vecs to avoid borrowing chain_state + process_sent( + inputs_result.sent_data, + &mut vecs.address_cohorts, + &mut lookup, + block_price, + &mut addresstype_to_addr_count, + &mut addresstype_to_empty_addr_count, + height_to_price_vec.as_deref(), + &height_to_timestamp_vec, + height, + timestamp, + ) + .unwrap(); + }); + + // Main thread: Update UTXO cohorts + vecs.utxo_cohorts.receive(transacted, height, block_price); + vecs.utxo_cohorts.send(height_to_sent, chain_state); + }); // Push to height-indexed vectors vecs.height_to_unspendable_supply @@ -218,15 +357,15 @@ pub fn process_blocks( .truncate_push(height, &addresstype_to_empty_addr_count)?; // Get date info for unrealized state computation - let date = height_to_date.get_unwrap(height); + let date = height_to_date_iter.get_unwrap(height); let dateindex = DateIndex::try_from(date).unwrap(); - let date_first_height = dateindex_to_first_height.get_unwrap(dateindex); - let date_height_count = dateindex_to_height_count.get_unwrap(dateindex); + let date_first_height = dateindex_to_first_height_iter.get_unwrap(dateindex); + let date_height_count = dateindex_to_height_count_iter.get_unwrap(dateindex); let is_date_last_height = date_first_height + Height::from(date_height_count).decremented().unwrap() == height; - let date_price = dateindex_to_price + let 
date_price = dateindex_to_price_iter .as_mut() - .map(|v| is_date_last_height.then(|| v.get_unwrap(dateindex))); + .map(|v| is_date_last_height.then(|| *v.get_unwrap(dateindex))); let dateindex_opt = is_date_last_height.then_some(dateindex); // Push cohort states and compute unrealized @@ -239,19 +378,31 @@ pub fn process_blocks( date_price, )?; + // Compute and push percentiles for aggregate cohorts (all, sth, lth) + vecs.utxo_cohorts + .truncate_push_aggregate_percentiles(height)?; + // Periodic checkpoint flush if height != last_height && height != Height::ZERO && height.to_usize() % FLUSH_INTERVAL == 0 { let _lock = exit.lock(); - flush_checkpoint(vecs, height, exit)?; + + // Drop readers before flush to release mmap handles + drop(vr); + + flush_checkpoint(vecs, height, &mut loaded_cache, &mut empty_cache, exit)?; + + // Recreate readers after flush to pick up new data + vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); } } // Final flush let _lock = exit.lock(); - flush_checkpoint(vecs, last_height, exit)?; + drop(vr); + flush_checkpoint(vecs, last_height, &mut loaded_cache, &mut empty_cache, exit)?; Ok(()) } @@ -302,7 +453,20 @@ fn push_cohort_states( } /// Flush checkpoint to disk. 
-fn flush_checkpoint(vecs: &mut Vecs, height: Height, exit: &Exit) -> Result<()> { +/// +/// Flushes all accumulated data including: +/// - Cohort stateful vectors +/// - Height-indexed vectors +/// - Address data caches (loaded and empty) +/// - Chain state +#[allow(clippy::too_many_arguments)] +fn flush_checkpoint( + vecs: &mut Vecs, + height: Height, + loaded_cache: &mut AddressTypeToTypeIndexMap, + empty_cache: &mut AddressTypeToTypeIndexMap, + exit: &Exit, +) -> Result<()> { info!("Flushing checkpoint at height {}...", height); // Flush cohort states @@ -316,8 +480,31 @@ fn flush_checkpoint(vecs: &mut Vecs, height: Height, exit: &Exit) -> Result<()> vecs.addresstype_to_height_to_empty_addr_count .safe_flush(exit)?; + // Process and flush address data updates + let empty_updates = std::mem::take(empty_cache); + let loaded_updates = std::mem::take(loaded_cache); + flush_checkpoint_full( + height, + &mut vecs + .utxo_cohorts + .par_iter_separate_mut() + .map(|v| v as &mut dyn DynCohortVecs) + .collect::>()[..], + &mut vecs + .address_cohorts + .par_iter_separate_mut() + .map(|v| v as &mut dyn DynCohortVecs) + .collect::>()[..], + &mut vecs.any_address_indexes, + &mut vecs.addresses_data, + empty_updates, + loaded_updates, + true, + exit, + )?; + // Flush chain state with stamp - vecs.chain_state.safe_write_with_stamp(height.into(), exit)?; + vecs.chain_state.stamped_flush_with_changes(height.into())?; Ok(()) } diff --git a/crates/brk_computer/src/stateful_new/compute/flush.rs b/crates/brk_computer/src/stateful_new/compute/flush.rs index 0815bb2fa..dee590bb8 100644 --- a/crates/brk_computer/src/stateful_new/compute/flush.rs +++ b/crates/brk_computer/src/stateful_new/compute/flush.rs @@ -4,14 +4,14 @@ //! including cohort states, address data, and chain state. 
use brk_error::Result; -use brk_types::{ - AddressDataSource, AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, Height, - LoadedAddressData, LoadedAddressIndex, -}; +use brk_types::{AnyAddressIndex, Height}; use log::info; use vecdb::{Exit, Stamp}; -use crate::stateful_new::process::{process_empty_addresses, process_loaded_addresses}; +use crate::stateful_new::process::{ + EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses, + process_loaded_addresses, +}; use super::super::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs}; use super::super::cohorts::DynCohortVecs; @@ -60,8 +60,8 @@ pub fn flush_checkpoint( address_vecs: &mut [&mut dyn DynCohortVecs], address_indexes: &mut AnyAddressIndexesVecs, addresses_data: &mut AddressesDataVecs, - empty_updates: AddressTypeToTypeIndexMap>, - loaded_updates: AddressTypeToTypeIndexMap>, + empty_updates: AddressTypeToTypeIndexMap, + loaded_updates: AddressTypeToTypeIndexMap, with_changes: bool, exit: &Exit, ) -> Result<()> { diff --git a/crates/brk_computer/src/stateful_new/compute/mod.rs b/crates/brk_computer/src/stateful_new/compute/mod.rs index f99046d2e..a07b6bd9f 100644 --- a/crates/brk_computer/src/stateful_new/compute/mod.rs +++ b/crates/brk_computer/src/stateful_new/compute/mod.rs @@ -8,14 +8,14 @@ //! 5. 
Compute aggregate cohorts from separate cohorts mod aggregates; -// mod block_loop; +mod block_loop; mod context; mod flush; mod readers; mod recover; pub use aggregates::{compute_overlapping, compute_rest_part1, compute_rest_part2}; -// pub use block_loop::process_blocks; +pub use block_loop::process_blocks; pub use context::ComputeContext; pub use flush::{flush_checkpoint, flush_cohort_states}; pub use readers::{ diff --git a/crates/brk_computer/src/stateful_new/compute/recover.rs b/crates/brk_computer/src/stateful_new/compute/recover.rs index e204c3ece..df53ea537 100644 --- a/crates/brk_computer/src/stateful_new/compute/recover.rs +++ b/crates/brk_computer/src/stateful_new/compute/recover.rs @@ -77,19 +77,28 @@ pub enum StartMode { /// Returns the consistent starting height if all vectors agree, /// otherwise returns Height::ZERO (need fresh start). pub fn rollback_states( - stamp: Stamp, - chain_state_rollback: Result, - address_indexes_rollbacks: Vec>, - address_data_rollbacks: Vec>, + _stamp: Stamp, + chain_state_rollback: vecdb::Result, + address_indexes_rollbacks: Result>, + address_data_rollbacks: Result<[Stamp; 2]>, ) -> Height { - let mut heights: BTreeSet = [chain_state_rollback] - .into_iter() - .chain(address_indexes_rollbacks) - .chain(address_data_rollbacks) - .filter_map(|r| r.ok()) - .map(Height::from) - .map(Height::incremented) - .collect(); + let mut heights: BTreeSet = BTreeSet::new(); + + if let Ok(s) = chain_state_rollback { + heights.insert(Height::from(s).incremented()); + } + + if let Ok(stamps) = address_indexes_rollbacks { + for s in stamps { + heights.insert(Height::from(s).incremented()); + } + } + + if let Ok(stamps) = address_data_rollbacks { + for s in stamps { + heights.insert(Height::from(s).incremented()); + } + } if heights.len() == 1 { heights.pop_first().unwrap() diff --git a/crates/brk_computer/src/stateful_new/metrics/mod.rs b/crates/brk_computer/src/stateful_new/metrics/mod.rs index 6c6a978b0..c2e0c9d14 100644 --- 
a/crates/brk_computer/src/stateful_new/metrics/mod.rs +++ b/crates/brk_computer/src/stateful_new/metrics/mod.rs @@ -283,6 +283,7 @@ impl CohortMetrics { height_to_realized_cap, dateindex_to_realized_cap, &self.supply, + self.unrealized.as_ref(), self.realized.as_ref(), exit, )?; diff --git a/crates/brk_computer/src/stateful_new/metrics/relative.rs b/crates/brk_computer/src/stateful_new/metrics/relative.rs index 359074453..47c4b89e7 100644 --- a/crates/brk_computer/src/stateful_new/metrics/relative.rs +++ b/crates/brk_computer/src/stateful_new/metrics/relative.rs @@ -414,6 +414,13 @@ impl RelativeMetrics { } /// Second phase of computed metrics (ratios, relative values). + /// + /// This computes percentage ratios comparing cohort metrics to global metrics: + /// - Supply relative to circulating supply + /// - Supply in profit/loss relative to own supply and circulating supply + /// - Unrealized profit/loss relative to market cap, own market cap, total unrealized + /// + /// See `stateful/common/compute.rs` lines 800-1200 for the full original implementation. 
#[allow(clippy::too_many_arguments)] pub fn compute_rest_part2( &mut self, @@ -426,10 +433,11 @@ impl RelativeMetrics { _height_to_realized_cap: Option<&impl IterableVec>, _dateindex_to_realized_cap: Option<&impl IterableVec>, supply: &SupplyMetrics, + unrealized: Option<&super::UnrealizedMetrics>, _realized: Option<&RealizedMetrics>, exit: &Exit, ) -> Result<()> { - // Supply relative to circulating supply + // === Supply Relative to Circulating Supply === if let Some(v) = self.indexes_to_supply_rel_to_circulating_supply.as_mut() { v.compute_all(indexes, starting_indexes, exit, |v| { v.compute_percentage( @@ -442,9 +450,150 @@ impl RelativeMetrics { })?; } - let _ = (dateindex_to_supply, height_to_market_cap, dateindex_to_market_cap); + // === Supply in Profit/Loss Relative to Own Supply === + if let Some(unrealized) = unrealized { + self.height_to_supply_in_profit_rel_to_own_supply.compute_percentage( + starting_indexes.height, + &unrealized.height_to_supply_in_profit_value.bitcoin, + &supply.height_to_supply_value.bitcoin, + exit, + )?; + self.height_to_supply_in_loss_rel_to_own_supply.compute_percentage( + starting_indexes.height, + &unrealized.height_to_supply_in_loss_value.bitcoin, + &supply.height_to_supply_value.bitcoin, + exit, + )?; - // Additional relative metrics computed here + self.indexes_to_supply_in_profit_rel_to_own_supply.compute_all( + starting_indexes, + exit, + |v| { + if let Some(dateindex_vec) = unrealized.indexes_to_supply_in_profit.bitcoin.dateindex.as_ref() { + if let Some(supply_dateindex) = supply.indexes_to_supply.bitcoin.dateindex.as_ref() { + v.compute_percentage( + starting_indexes.dateindex, + dateindex_vec, + supply_dateindex, + exit, + )?; + } + } + Ok(()) + }, + )?; + + self.indexes_to_supply_in_loss_rel_to_own_supply.compute_all( + starting_indexes, + exit, + |v| { + if let Some(dateindex_vec) = unrealized.indexes_to_supply_in_loss.bitcoin.dateindex.as_ref() { + if let Some(supply_dateindex) = 
supply.indexes_to_supply.bitcoin.dateindex.as_ref() { + v.compute_percentage( + starting_indexes.dateindex, + dateindex_vec, + supply_dateindex, + exit, + )?; + } + } + Ok(()) + }, + )?; + } + + // === Supply in Profit/Loss Relative to Circulating Supply === + if let (Some(unrealized), Some(v)) = ( + unrealized, + self.height_to_supply_in_profit_rel_to_circulating_supply.as_mut(), + ) { + v.compute_percentage( + starting_indexes.height, + &unrealized.height_to_supply_in_profit_value.bitcoin, + height_to_supply, + exit, + )?; + } + if let (Some(unrealized), Some(v)) = ( + unrealized, + self.height_to_supply_in_loss_rel_to_circulating_supply.as_mut(), + ) { + v.compute_percentage( + starting_indexes.height, + &unrealized.height_to_supply_in_loss_value.bitcoin, + height_to_supply, + exit, + )?; + } + + // === Unrealized vs Market Cap === + if let (Some(unrealized), Some(height_to_mc)) = (unrealized, height_to_market_cap) { + self.height_to_unrealized_profit_rel_to_market_cap.compute_percentage( + starting_indexes.height, + &unrealized.height_to_unrealized_profit, + height_to_mc, + exit, + )?; + self.height_to_unrealized_loss_rel_to_market_cap.compute_percentage( + starting_indexes.height, + &unrealized.height_to_unrealized_loss, + height_to_mc, + exit, + )?; + self.height_to_neg_unrealized_loss_rel_to_market_cap.compute_percentage( + starting_indexes.height, + &unrealized.height_to_neg_unrealized_loss, + height_to_mc, + exit, + )?; + self.height_to_net_unrealized_pnl_rel_to_market_cap.compute_percentage( + starting_indexes.height, + &unrealized.height_to_net_unrealized_pnl, + height_to_mc, + exit, + )?; + } + + if let Some(dateindex_to_mc) = dateindex_to_market_cap { + if let Some(unrealized) = unrealized { + self.indexes_to_unrealized_profit_rel_to_market_cap.compute_all( + starting_indexes, + exit, + |v| { + v.compute_percentage( + starting_indexes.dateindex, + &unrealized.dateindex_to_unrealized_profit, + dateindex_to_mc, + exit, + )?; + Ok(()) + }, + )?; + 
self.indexes_to_unrealized_loss_rel_to_market_cap.compute_all( + starting_indexes, + exit, + |v| { + v.compute_percentage( + starting_indexes.dateindex, + &unrealized.dateindex_to_unrealized_loss, + dateindex_to_mc, + exit, + )?; + Ok(()) + }, + )?; + } + } + + // TODO: Remaining relative metrics to implement: + // - indexes_to_supply_in_profit/loss_rel_to_circulating_supply + // - height_to_unrealized_*_rel_to_own_market_cap + // - height_to_unrealized_*_rel_to_own_total_unrealized_pnl + // - indexes_to_unrealized_*_rel_to_own_market_cap + // - indexes_to_unrealized_*_rel_to_own_total_unrealized_pnl + // See stateful/common/compute.rs for patterns. + + let _ = dateindex_to_supply; Ok(()) } } diff --git a/crates/brk_computer/src/stateful_new/metrics/supply.rs b/crates/brk_computer/src/stateful_new/metrics/supply.rs index 05197aefe..455dabfa5 100644 --- a/crates/brk_computer/src/stateful_new/metrics/supply.rs +++ b/crates/brk_computer/src/stateful_new/metrics/supply.rs @@ -4,7 +4,7 @@ use brk_error::Result; use brk_traversable::Traversable; -use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, StoredF64, StoredU64, Version}; +use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, StoredU64, Version}; use vecdb::{ AnyStoredVec, AnyVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec, PcoVec, TypedVecIterator, diff --git a/crates/brk_computer/src/stateful_new/process/address_lookup.rs b/crates/brk_computer/src/stateful_new/process/address_lookup.rs new file mode 100644 index 000000000..5d5bf050d --- /dev/null +++ b/crates/brk_computer/src/stateful_new/process/address_lookup.rs @@ -0,0 +1,105 @@ +//! Address data lookup and source tracking. +//! +//! Handles looking up existing address data from storage and tracking +//! whether addresses are new, from storage, or previously empty. 
+ +use brk_types::{EmptyAddressData, LoadedAddressData, OutputType, TypeIndex}; + +use super::super::address::AddressTypeToTypeIndexMap; +pub use super::WithAddressDataSource; + +/// Loaded address data with source tracking for flush operations. +pub type LoadedAddressDataWithSource = WithAddressDataSource; + +/// Empty address data with source tracking for flush operations. +pub type EmptyAddressDataWithSource = WithAddressDataSource; + +/// Context for looking up and storing address data during block processing. +/// +/// Uses the same pattern as the original stateful module: +/// - `loaded`: addresses with non-zero balance (wrapped with source info) +/// - `empty`: addresses that became empty this block (wrapped with source info) +pub struct AddressLookup<'a, F> { + /// Function to get existing address data from storage + pub get_address_data: F, + /// Loaded addresses touched in current block + pub loaded: &'a mut AddressTypeToTypeIndexMap, + /// Empty addresses touched in current block + pub empty: &'a mut AddressTypeToTypeIndexMap, +} + +impl<'a, F> AddressLookup<'a, F> +where + F: FnMut(OutputType, TypeIndex) -> Option, +{ + /// Get or create address data for a receive operation. 
+ /// + /// Returns (address_data, is_new, from_empty) + pub fn get_or_create_for_receive( + &mut self, + output_type: OutputType, + type_index: TypeIndex, + ) -> (&mut LoadedAddressDataWithSource, bool, bool) { + let mut is_new = false; + let mut from_empty = false; + + let addr_data = self + .loaded + .get_mut(output_type) + .unwrap() + .entry(type_index) + .or_insert_with(|| { + // Check if it was in empty set + if let Some(empty_data) = self.empty.get_mut(output_type).unwrap().remove(&type_index) { + from_empty = true; + return empty_data.into(); + } + + // Look up from storage or create new + match (self.get_address_data)(output_type, type_index) { + Some(data) => { + is_new = data.is_new(); + from_empty = data.is_from_emptyaddressdata(); + data + } + None => { + is_new = true; + WithAddressDataSource::New(LoadedAddressData::default()) + } + } + }); + + (addr_data, is_new, from_empty) + } + + /// Get address data for a send operation (must exist). + pub fn get_for_send( + &mut self, + output_type: OutputType, + type_index: TypeIndex, + ) -> &mut LoadedAddressDataWithSource { + self.loaded + .get_mut(output_type) + .unwrap() + .entry(type_index) + .or_insert_with(|| { + (self.get_address_data)(output_type, type_index) + .expect("Address must exist for send") + }) + } + + /// Move address from loaded to empty set. 
+ pub fn move_to_empty(&mut self, output_type: OutputType, type_index: TypeIndex) { + let data = self + .loaded + .get_mut(output_type) + .unwrap() + .remove(&type_index) + .unwrap(); + + self.empty + .get_mut(output_type) + .unwrap() + .insert(type_index, data.into()); + } +} diff --git a/crates/brk_computer/src/stateful_new/process/empty_addresses.rs b/crates/brk_computer/src/stateful_new/process/empty_addresses.rs index ce7e77c5e..21d056426 100644 --- a/crates/brk_computer/src/stateful_new/process/empty_addresses.rs +++ b/crates/brk_computer/src/stateful_new/process/empty_addresses.rs @@ -1,6 +1,7 @@ use brk_error::Result; -use brk_types::{AddressDataSource, AnyAddressIndex, EmptyAddressData}; +use brk_types::AnyAddressIndex; +use super::EmptyAddressDataWithSource; use crate::stateful_new::{AddressTypeToTypeIndexMap, AddressesDataVecs}; /// Process empty address data updates. @@ -11,24 +12,24 @@ use crate::stateful_new::{AddressTypeToTypeIndexMap, AddressesDataVecs}; /// - Transition loaded -> empty: delete from loaded, push to empty pub fn process_empty_addresses( addresses_data: &mut AddressesDataVecs, - empty_updates: AddressTypeToTypeIndexMap>, + empty_updates: AddressTypeToTypeIndexMap, ) -> Result> { let mut result = AddressTypeToTypeIndexMap::default(); for (address_type, sorted) in empty_updates.into_sorted_iter() { for (typeindex, source) in sorted { match source { - AddressDataSource::New(data) => { + EmptyAddressDataWithSource::New(data) => { let index = addresses_data.empty.fill_first_hole_or_push(data)?; result .get_mut(address_type) .unwrap() .insert(typeindex, AnyAddressIndex::from(index)); } - AddressDataSource::FromEmpty((index, data)) => { + EmptyAddressDataWithSource::FromEmpty(index, data) => { addresses_data.empty.update(index, data)?; } - AddressDataSource::FromLoaded((loaded_index, data)) => { + EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => { addresses_data.loaded.delete(loaded_index); let empty_index = 
addresses_data.empty.fill_first_hole_or_push(data)?; result diff --git a/crates/brk_computer/src/stateful_new/process/inputs.rs b/crates/brk_computer/src/stateful_new/process/inputs.rs index 78cfaf87b..786e54b8b 100644 --- a/crates/brk_computer/src/stateful_new/process/inputs.rs +++ b/crates/brk_computer/src/stateful_new/process/inputs.rs @@ -4,17 +4,24 @@ //! - height_to_sent: map from creation height -> Transacted for sends //! - Address data for address cohort tracking (optional) -use brk_types::{Height, OutPoint, OutputType, Sats, TxInIndex, TxIndex, TxOutIndex, TypeIndex}; +use brk_grouper::ByAddressType; +use brk_types::{ + AnyAddressDataIndexEnum, Height, LoadedAddressData, OutPoint, OutputType, Sats, TxInIndex, + TxIndex, TxOutIndex, TypeIndex, +}; use rayon::prelude::*; use rustc_hash::FxHashMap; use vecdb::{BytesVec, GenericStoredVec, PcoVec}; +use crate::stateful_new::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs}; +use crate::stateful_new::compute::VecsReaders; use crate::{ stateful_new::{IndexerReaders, process::RangeMap}, states::Transacted, }; use super::super::address::HeightToAddressTypeToVec; +use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource}; /// Result of processing inputs for a block. pub struct InputsResult { @@ -22,6 +29,10 @@ pub struct InputsResult { pub height_to_sent: FxHashMap, /// Per-height, per-address-type sent data: (typeindex, value) for each address. pub sent_data: HeightToAddressTypeToVec<(TypeIndex, Sats)>, + /// Address data looked up during processing, keyed by (address_type, typeindex). + pub address_data: AddressTypeToTypeIndexMap, + /// Transaction indexes per address for tx_count tracking. + pub txindex_vecs: AddressTypeToTypeIndexMap, } /// Process inputs (spent UTXOs) for a block in parallel. @@ -30,8 +41,9 @@ pub struct InputsResult { /// 1. Read outpoint, resolve to txoutindex /// 2. 
Get the creation height from txoutindex_to_height map /// 3. Read value and type from the referenced output -/// 4. Accumulate into height_to_sent map -/// 5. Track address-specific data if input references an address type +/// 4. Look up address data if input references an address type +/// 5. Accumulate into height_to_sent map +/// 6. Track address-specific data for address cohort processing #[allow(clippy::too_many_arguments)] pub fn process_inputs( first_txinindex: usize, @@ -44,13 +56,21 @@ pub fn process_inputs( txoutindex_to_typeindex: &BytesVec, txoutindex_to_height: &RangeMap, ir: &IndexerReaders, + // Address lookup parameters + first_addressindexes: &ByAddressType, + loaded_cache: &AddressTypeToTypeIndexMap, + empty_cache: &AddressTypeToTypeIndexMap, + vr: &VecsReaders, + any_address_indexes: &AnyAddressIndexesVecs, + addresses_data: &AddressesDataVecs, ) -> InputsResult { - let (height_to_sent, sent_data) = (first_txinindex..first_txinindex + input_count) + let (height_to_sent, sent_data, address_data, txindex_vecs) = (first_txinindex + ..first_txinindex + input_count) .into_par_iter() .map(|i| { let txinindex = TxInIndex::from(i); let local_idx = i - first_txinindex; - let _txindex = txinindex_to_txindex[local_idx]; + let txindex = txinindex_to_txindex[local_idx]; // Get outpoint and resolve to txoutindex let outpoint = txinindex_to_outpoint.read_unwrap(txinindex, &ir.txinindex_to_outpoint); @@ -66,7 +86,7 @@ pub fn process_inputs( let input_type = txoutindex_to_outputtype.read_unwrap(txoutindex, &ir.txoutindex_to_outputtype); - // Non-address inputs don't need typeindex + // Non-address inputs don't need typeindex or address lookup if input_type.is_not_address() { return (prev_height, value, input_type, None); } @@ -74,31 +94,62 @@ pub fn process_inputs( let typeindex = txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); - (prev_height, value, input_type, Some((typeindex, value))) + // Look up address data + let addr_data_opt 
= get_address_data( + input_type, + typeindex, + first_addressindexes, + loaded_cache, + empty_cache, + vr, + any_address_indexes, + addresses_data, + ); + + ( + prev_height, + value, + input_type, + Some((typeindex, txindex, value, addr_data_opt)), + ) }) .fold( || { ( FxHashMap::::default(), HeightToAddressTypeToVec::default(), + AddressTypeToTypeIndexMap::::default(), + AddressTypeToTypeIndexMap::::default(), ) }, - |(mut height_to_sent, mut sent_data), (prev_height, value, output_type, addr_data)| { + |(mut height_to_sent, mut sent_data, mut address_data, mut txindex_vecs), + (prev_height, value, output_type, addr_info)| { height_to_sent .entry(prev_height) .or_default() .iterate(value, output_type); - if let Some((typeindex, value)) = addr_data { + if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info { sent_data .entry(prev_height) .or_default() .get_mut(output_type) .unwrap() .push((typeindex, value)); + + if let Some(addr_data) = addr_data_opt { + address_data.insert_for_type(output_type, typeindex, addr_data); + } + + txindex_vecs + .get_mut(output_type) + .unwrap() + .entry(typeindex) + .or_insert_with(TxIndexVec::new) + .push(txindex); } - (height_to_sent, sent_data) + (height_to_sent, sent_data, address_data, txindex_vecs) }, ) .reduce( @@ -106,9 +157,11 @@ pub fn process_inputs( ( FxHashMap::::default(), HeightToAddressTypeToVec::default(), + AddressTypeToTypeIndexMap::::default(), + AddressTypeToTypeIndexMap::::default(), ) }, - |(mut h1, mut s1), (h2, s2)| { + |(mut h1, mut s1, a1, tx1), (h2, s2, a2, tx2)| { // Merge height_to_sent maps for (k, v) in h2 { *h1.entry(k).or_default() += v; @@ -117,12 +170,68 @@ pub fn process_inputs( // Merge sent_data maps s1.merge_mut(s2); - (h1, s1) + (h1, s1, a1.merge(a2), tx1.merge_vec(tx2)) }, ); InputsResult { height_to_sent, sent_data, + address_data, + txindex_vecs, } } + +/// Look up address data from storage or determine if new. 
+/// +/// Returns None if address is already in loaded or empty cache. +fn get_address_data( + address_type: OutputType, + typeindex: TypeIndex, + first_addressindexes: &ByAddressType, + loaded_cache: &AddressTypeToTypeIndexMap, + empty_cache: &AddressTypeToTypeIndexMap, + vr: &VecsReaders, + any_address_indexes: &AnyAddressIndexesVecs, + addresses_data: &AddressesDataVecs, +) -> Option { + // Check if this is a new address (typeindex >= first for this height) + let first = *first_addressindexes.get(address_type).unwrap(); + if first <= typeindex { + return Some(WithAddressDataSource::New(LoadedAddressData::default())); + } + + // Skip if already in cache + if loaded_cache + .get(address_type) + .unwrap() + .contains_key(&typeindex) + || empty_cache + .get(address_type) + .unwrap() + .contains_key(&typeindex) + { + return None; + } + + // Read from storage + let reader = vr.address_reader(address_type); + let anyaddressindex = any_address_indexes.get(address_type, typeindex, reader); + + Some(match anyaddressindex.to_enum() { + AnyAddressDataIndexEnum::Loaded(loaded_index) => { + let reader = &vr.anyaddressindex_to_anyaddressdata.loaded; + let loaded_data = addresses_data + .loaded + .get_pushed_or_read_unwrap(loaded_index, reader); + WithAddressDataSource::FromLoaded(loaded_index, loaded_data) + } + AnyAddressDataIndexEnum::Empty(empty_index) => { + let reader = &vr.anyaddressindex_to_anyaddressdata.empty; + let empty_data = addresses_data + .empty + .get_pushed_or_read_unwrap(empty_index, reader); + WithAddressDataSource::FromEmpty(empty_index, empty_data.into()) + } + }) +} diff --git a/crates/brk_computer/src/stateful_new/process/loaded_addresses.rs b/crates/brk_computer/src/stateful_new/process/loaded_addresses.rs index 3580f7bc5..94d442566 100644 --- a/crates/brk_computer/src/stateful_new/process/loaded_addresses.rs +++ b/crates/brk_computer/src/stateful_new/process/loaded_addresses.rs @@ -1,6 +1,7 @@ use brk_error::Result; -use brk_types::{AddressDataSource, 
AnyAddressIndex, LoadedAddressData}; +use brk_types::AnyAddressIndex; +use super::LoadedAddressDataWithSource; use crate::stateful_new::{AddressTypeToTypeIndexMap, AddressesDataVecs}; /// Process loaded address data updates. @@ -11,24 +12,24 @@ use crate::stateful_new::{AddressTypeToTypeIndexMap, AddressesDataVecs}; /// - Transition empty -> loaded: delete from empty, push to loaded pub fn process_loaded_addresses( addresses_data: &mut AddressesDataVecs, - loaded_updates: AddressTypeToTypeIndexMap>, + loaded_updates: AddressTypeToTypeIndexMap, ) -> Result> { let mut result = AddressTypeToTypeIndexMap::default(); for (address_type, sorted) in loaded_updates.into_sorted_iter() { for (typeindex, source) in sorted { match source { - AddressDataSource::New(data) => { + LoadedAddressDataWithSource::New(data) => { let index = addresses_data.loaded.fill_first_hole_or_push(data)?; result .get_mut(address_type) .unwrap() .insert(typeindex, AnyAddressIndex::from(index)); } - AddressDataSource::FromLoaded((index, data)) => { + LoadedAddressDataWithSource::FromLoaded(index, data) => { addresses_data.loaded.update(index, data)?; } - AddressDataSource::FromEmpty((empty_index, data)) => { + LoadedAddressDataWithSource::FromEmpty(empty_index, data) => { addresses_data.empty.delete(empty_index); let loaded_index = addresses_data.loaded.fill_first_hole_or_push(data)?; result diff --git a/crates/brk_computer/src/stateful_new/process/mod.rs b/crates/brk_computer/src/stateful_new/process/mod.rs index 7ee37a162..343b9185b 100644 --- a/crates/brk_computer/src/stateful_new/process/mod.rs +++ b/crates/brk_computer/src/stateful_new/process/mod.rs @@ -1,11 +1,21 @@ +mod address_lookup; mod empty_addresses; mod inputs; mod loaded_addresses; mod outputs; mod range_map; +mod received; +mod sent; +mod tx_counts; +mod with_source; +pub use address_lookup::*; pub use empty_addresses::*; pub use inputs::*; pub use loaded_addresses::*; pub use outputs::*; pub use range_map::*; +pub use received::*; 
+pub use sent::*; +pub use tx_counts::*; +pub use with_source::*; diff --git a/crates/brk_computer/src/stateful_new/process/outputs.rs b/crates/brk_computer/src/stateful_new/process/outputs.rs index c471ac936..378b935d8 100644 --- a/crates/brk_computer/src/stateful_new/process/outputs.rs +++ b/crates/brk_computer/src/stateful_new/process/outputs.rs @@ -4,13 +4,23 @@ //! - Transacted: aggregated supply by output type and amount range //! - Address data for address cohort tracking (optional) -use brk_types::{OutputType, Sats, TxIndex, TxOutIndex, TypeIndex}; +use brk_grouper::ByAddressType; +use brk_types::{ + AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex, +}; use rayon::prelude::*; +use smallvec::SmallVec; use vecdb::{BytesVec, GenericStoredVec}; +use crate::stateful_new::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs}; +use crate::stateful_new::compute::VecsReaders; use crate::{stateful_new::IndexerReaders, states::Transacted}; use super::super::address::AddressTypeToVec; +use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource}; + +/// SmallVec for transaction indexes - most addresses have few transactions per block. +pub type TxIndexVec = SmallVec<[TxIndex; 4]>; /// Result of processing outputs for a block. pub struct OutputsResult { @@ -18,6 +28,10 @@ pub struct OutputsResult { pub transacted: Transacted, /// Per-address-type received data: (typeindex, value) for each address. pub received_data: AddressTypeToVec<(TypeIndex, Sats)>, + /// Address data looked up during processing, keyed by (address_type, typeindex). + pub address_data: AddressTypeToTypeIndexMap, + /// Transaction indexes per address for tx_count tracking. + pub txindex_vecs: AddressTypeToTypeIndexMap, } /// Process outputs (new UTXOs) for a block in parallel. @@ -25,7 +39,9 @@ pub struct OutputsResult { /// For each output: /// 1. Read value and output type from indexer /// 2. 
Accumulate into Transacted by type and amount -/// 3. Track address-specific data if output is an address type +/// 3. Look up address data if output is an address type +/// 4. Track address-specific data for address cohort processing +#[allow(clippy::too_many_arguments)] pub fn process_outputs( first_txoutindex: usize, output_count: usize, @@ -34,19 +50,27 @@ pub fn process_outputs( txoutindex_to_outputtype: &BytesVec, txoutindex_to_typeindex: &BytesVec, ir: &IndexerReaders, + // Address lookup parameters + first_addressindexes: &ByAddressType, + loaded_cache: &AddressTypeToTypeIndexMap, + empty_cache: &AddressTypeToTypeIndexMap, + vr: &VecsReaders, + any_address_indexes: &AnyAddressIndexesVecs, + addresses_data: &AddressesDataVecs, ) -> OutputsResult { - let (transacted, received_data) = (first_txoutindex..first_txoutindex + output_count) + let (transacted, received_data, address_data, txindex_vecs) = (first_txoutindex + ..first_txoutindex + output_count) .into_par_iter() .map(|i| { let txoutindex = TxOutIndex::from(i); let local_idx = i - first_txoutindex; - let _txindex = txoutindex_to_txindex[local_idx]; + let txindex = txoutindex_to_txindex[local_idx]; let value = txoutindex_to_value.read_unwrap(txoutindex, &ir.txoutindex_to_value); let output_type = txoutindex_to_outputtype.read_unwrap(txoutindex, &ir.txoutindex_to_outputtype); - // Non-address outputs don't need typeindex + // Non-address outputs don't need typeindex or address lookup if output_type.is_not_address() { return (value, output_type, None); } @@ -54,30 +78,126 @@ pub fn process_outputs( let typeindex = txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex); - (value, output_type, Some((typeindex, value))) + // Look up address data + let addr_data_opt = get_address_data( + output_type, + typeindex, + first_addressindexes, + loaded_cache, + empty_cache, + vr, + any_address_indexes, + addresses_data, + ); + + (value, output_type, Some((typeindex, txindex, value, 
addr_data_opt))) }) .fold( - || (Transacted::default(), AddressTypeToVec::default()), - |(mut transacted, mut received_data), (value, output_type, addr_data)| { + || { + ( + Transacted::default(), + AddressTypeToVec::default(), + AddressTypeToTypeIndexMap::::default(), + AddressTypeToTypeIndexMap::::default(), + ) + }, + |(mut transacted, mut received_data, mut address_data, mut txindex_vecs), + (value, output_type, addr_info)| { transacted.iterate(value, output_type); - if let Some((typeindex, value)) = addr_data { + if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info { received_data .get_mut(output_type) .unwrap() .push((typeindex, value)); + + if let Some(addr_data) = addr_data_opt { + address_data.insert_for_type(output_type, typeindex, addr_data); + } + + txindex_vecs + .get_mut(output_type) + .unwrap() + .entry(typeindex) + .or_insert_with(TxIndexVec::new) + .push(txindex); } - (transacted, received_data) + (transacted, received_data, address_data, txindex_vecs) }, ) .reduce( - || (Transacted::default(), AddressTypeToVec::default()), - |(t1, r1), (t2, r2)| (t1 + t2, r1.merge(r2)), + || { + ( + Transacted::default(), + AddressTypeToVec::default(), + AddressTypeToTypeIndexMap::::default(), + AddressTypeToTypeIndexMap::::default(), + ) + }, + |(t1, r1, a1, tx1), (t2, r2, a2, tx2)| { + (t1 + t2, r1.merge(r2), a1.merge(a2), tx1.merge_vec(tx2)) + }, ); OutputsResult { transacted, received_data, + address_data, + txindex_vecs, } } + +/// Look up address data from storage or determine if new. +/// +/// Returns None if address is already in loaded or empty cache. 
+fn get_address_data( + address_type: OutputType, + typeindex: TypeIndex, + first_addressindexes: &ByAddressType, + loaded_cache: &AddressTypeToTypeIndexMap, + empty_cache: &AddressTypeToTypeIndexMap, + vr: &VecsReaders, + any_address_indexes: &AnyAddressIndexesVecs, + addresses_data: &AddressesDataVecs, +) -> Option { + // Check if this is a new address (typeindex >= first for this height) + let first = *first_addressindexes.get(address_type).unwrap(); + if first <= typeindex { + return Some(WithAddressDataSource::New(LoadedAddressData::default())); + } + + // Skip if already in cache + if loaded_cache + .get(address_type) + .unwrap() + .contains_key(&typeindex) + || empty_cache + .get(address_type) + .unwrap() + .contains_key(&typeindex) + { + return None; + } + + // Read from storage + let reader = vr.address_reader(address_type); + let anyaddressindex = any_address_indexes.get(address_type, typeindex, reader); + + Some(match anyaddressindex.to_enum() { + AnyAddressDataIndexEnum::Loaded(loaded_index) => { + let reader = &vr.anyaddressindex_to_anyaddressdata.loaded; + let loaded_data = addresses_data + .loaded + .get_pushed_or_read_unwrap(loaded_index, reader); + WithAddressDataSource::FromLoaded(loaded_index, loaded_data) + } + AnyAddressDataIndexEnum::Empty(empty_index) => { + let reader = &vr.anyaddressindex_to_anyaddressdata.empty; + let empty_data = addresses_data + .empty + .get_pushed_or_read_unwrap(empty_index, reader); + WithAddressDataSource::FromEmpty(empty_index, empty_data.into()) + } + }) +} diff --git a/crates/brk_computer/src/stateful_new/process/received.rs b/crates/brk_computer/src/stateful_new/process/received.rs new file mode 100644 index 000000000..f7c4a8626 --- /dev/null +++ b/crates/brk_computer/src/stateful_new/process/received.rs @@ -0,0 +1,92 @@ +//! Process received outputs for address cohorts. +//! +//! Updates address cohort states when addresses receive funds: +//! - New addresses enter a cohort +//! 
- Existing addresses may cross cohort boundaries
+//! - Empty addresses become non-empty again
+
+use brk_grouper::{ByAddressType, Filtered};
+use brk_types::{Dollars, OutputType, Sats, TypeIndex};
+
+use super::super::address::AddressTypeToVec;
+use super::super::cohorts::AddressCohorts;
+use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource};
+
+/// Process received outputs for address cohorts.
+///
+/// For each received output:
+/// 1. Look up or create address data
+/// 2. Update address balance and cohort membership
+/// 3. Update cohort states (add/subtract for boundary crossings, receive otherwise)
+///
+/// `addr_count` is incremented for every address that is new or comes back
+/// from the empty set; `empty_addr_count` is decremented only in the latter
+/// case. `price` is passed through unchanged to the `receive` calls.
+///
+/// # Panics
+///
+/// Panics if an output type has no slot in `addr_count` / `empty_addr_count`,
+/// or if the targeted `amount_range` cohort has no `state` (all of these are
+/// unwrapped).
+// NOTE(review): `F` is used in the `where` clause but never declared, and
+// `AddressLookup`, `ByAddressType` and `Option` carry no generic arguments in
+// this copy of the patch (presumably lost in extraction) - restore them from
+// the original commit before applying.
+pub fn process_received(
+    received_data: AddressTypeToVec<(TypeIndex, Sats)>,
+    cohorts: &mut AddressCohorts,
+    lookup: &mut AddressLookup,
+    price: Option,
+    addr_count: &mut ByAddressType,
+    empty_addr_count: &mut ByAddressType,
+) where
+    F: FnMut(OutputType, TypeIndex) -> Option,
+{
+    for (output_type, vec) in received_data.unwrap().into_iter() {
+        // Nothing was received for this output type.
+        if vec.is_empty() {
+            continue;
+        }
+
+        for (type_index, value) in vec {
+            let (addr_data, is_new, from_empty) =
+                lookup.get_or_create_for_receive(output_type, type_index);
+
+            // Update address counts: a new or previously empty address joins
+            // the non-empty set; a previously empty one also leaves the empty set.
+            if is_new || from_empty {
+                *addr_count.get_mut(output_type).unwrap() += 1;
+                if from_empty {
+                    *empty_addr_count.get_mut(output_type).unwrap() -= 1;
+                }
+            }
+
+            let prev_balance = addr_data.balance();
+            let new_balance = prev_balance + value;
+
+            // Check if crossing cohort boundary: the cohorts selected by the
+            // balance before and after the receive are compared by filter.
+            let prev_cohort = cohorts.amount_range.get(prev_balance);
+            let new_cohort = cohorts.amount_range.get(new_balance);
+            let filters_differ = prev_cohort.filter() != new_cohort.filter();
+
+            if is_new || from_empty || filters_differ {
+                // Address entering or changing cohorts
+                if !is_new && !from_empty {
+                    // Subtract from old cohort, using the address data as it
+                    // was before this receive is applied.
+                    cohorts
+                        .amount_range
+                        .get_mut(prev_balance)
+                        .state
+                        .as_mut()
+                        .unwrap()
+                        .subtract(addr_data);
+                }
+
+                // Update address data (must happen between subtract and add)
+                addr_data.receive(value, price);
+
+                // Add to new cohort, using the updated address data
+                cohorts
+                    .amount_range
+                    .get_mut(new_balance)
+                    .state
+                    .as_mut()
+                    .unwrap()
+                    .add(addr_data);
+            } else {
+                // Address staying in same cohort - update in place.
+                // `addr_data.receive` is not called directly on this path;
+                // presumably the cohort state's `receive` updates the address
+                // data as well - TODO confirm.
+                cohorts
+                    .amount_range
+                    .get_mut(new_balance)
+                    .state
+                    .as_mut()
+                    .unwrap()
+                    .receive(addr_data, value, price);
+            }
+        }
+    }
+}
diff --git a/crates/brk_computer/src/stateful_new/process/sent.rs b/crates/brk_computer/src/stateful_new/process/sent.rs
new file mode 100644
index 000000000..1c8eff606
--- /dev/null
+++ b/crates/brk_computer/src/stateful_new/process/sent.rs
@@ -0,0 +1,122 @@
+//! Process sent outputs for address cohorts.
+//!
+//! Updates address cohort states when addresses send funds:
+//! - Addresses may cross cohort boundaries
+//! - Addresses may become empty (0 balance)
+//! - Age metrics (blocks_old, days_old) are tracked for sent UTXOs
+
+use brk_error::Result;
+use brk_grouper::{ByAddressType, Filtered};
+use brk_types::{CheckedSub, Dollars, Height, OutputType, Sats, Timestamp, TypeIndex};
+use vecdb::VecIndex;
+
+use super::super::address::HeightToAddressTypeToVec;
+use super::super::cohorts::AddressCohorts;
+use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource};
+
+/// Process sent outputs for address cohorts.
+///
+/// For each spent UTXO:
+/// 1. Look up address data
+/// 2. Calculate age metrics (blocks_old, days_old)
+/// 3. Update address balance and cohort membership
+/// 4. Handle addresses becoming empty
+///
+/// Note: Takes separate price/timestamp slices instead of chain_state to allow
+/// parallel execution with UTXO cohort processing (which mutates chain_state).
+///
+/// # Errors
+///
+/// Propagates any error returned by the address data's `send` or by the
+/// cohort state's `send`.
+///
+/// # Panics
+///
+/// Panics if a previous height is out of bounds for `height_to_price` /
+/// `height_to_timestamp`, if `value` exceeds the address balance, if an output
+/// type has no counter slot, or if the targeted `amount_range` cohort has no
+/// `state` (all of these are indexed or unwrapped without a fallback).
+// NOTE(review): `F` is used in the `where` clause but never declared, and
+// `AddressLookup`, `ByAddressType` and `Option` carry no generic arguments in
+// this copy of the patch (presumably lost in extraction) - restore them from
+// the original commit before applying.
+#[allow(clippy::too_many_arguments)]
+pub fn process_sent(
+    sent_data: HeightToAddressTypeToVec<(TypeIndex, Sats)>,
+    cohorts: &mut AddressCohorts,
+    lookup: &mut AddressLookup,
+    current_price: Option,
+    addr_count: &mut ByAddressType,
+    empty_addr_count: &mut ByAddressType,
+    height_to_price: Option<&[Dollars]>,
+    height_to_timestamp: &[Timestamp],
+    current_height: Height,
+    current_timestamp: Timestamp,
+) -> Result<()>
+where
+    F: FnMut(OutputType, TypeIndex) -> Option,
+{
+    for (prev_height, by_type) in sent_data.into_iter() {
+        // Price (when price data is available) and timestamp recorded at the
+        // height the spent outputs are grouped under.
+        let prev_price = height_to_price.map(|v| v[prev_height.to_usize()]);
+        let prev_timestamp = height_to_timestamp[prev_height.to_usize()];
+
+        // Age metrics are computed once per previous height.
+        // Assumes prev_height <= current_height - TODO confirm against caller.
+        let blocks_old = current_height.to_usize() - prev_height.to_usize();
+        let days_old = current_timestamp.difference_in_days_between_float(prev_timestamp);
+        // Falls back to `false` when the timestamp subtraction yields `None`.
+        let older_than_hour = current_timestamp
+            .checked_sub(prev_timestamp)
+            .map(|d| d.is_more_than_hour())
+            .unwrap_or(false);
+
+        for (output_type, vec) in by_type.unwrap().into_iter() {
+            for (type_index, value) in vec {
+                let addr_data = lookup.get_for_send(output_type, type_index);
+
+                let prev_balance = addr_data.balance();
+                let new_balance = prev_balance.checked_sub(value).unwrap();
+                // Presumably true when the address holds exactly one UTXO, so
+                // this spend empties it; the debug assertion below checks that
+                // the balance agrees.
+                let will_be_empty = addr_data.has_1_utxos();
+
+                // Check if crossing cohort boundary: the cohorts selected by the
+                // balance before and after the spend are compared by filter.
+                let prev_cohort = cohorts.amount_range.get(prev_balance);
+                let new_cohort = cohorts.amount_range.get(new_balance);
+                let filters_differ = prev_cohort.filter() != new_cohort.filter();
+
+                if will_be_empty || filters_differ {
+                    // NOTE(review): on this path `current_price`, `blocks_old`,
+                    // `days_old` and `older_than_hour` are not used - confirm
+                    // that is intended.
+
+                    // Subtract from old cohort, using the address data as it
+                    // was before this spend is applied.
+                    cohorts
+                        .amount_range
+                        .get_mut(prev_balance)
+                        .state
+                        .as_mut()
+                        .unwrap()
+                        .subtract(addr_data);
+
+                    // Update address data
+                    addr_data.send(value, prev_price)?;
+
+                    if will_be_empty {
+                        // Address becoming empty
+                        debug_assert!(new_balance.is_zero());
+
+                        *addr_count.get_mut(output_type).unwrap() -= 1;
+                        *empty_addr_count.get_mut(output_type).unwrap() += 1;
+
+                        // Move from loaded to empty
+                        lookup.move_to_empty(output_type, type_index);
+                    } else {
+                        // Add to new cohort, using the updated address data
+                        cohorts
+                            .amount_range
+                            .get_mut(new_balance)
+                            .state
+                            .as_mut()
+                            .unwrap()
+                            .add(addr_data);
+                    }
+                } else {
+                    // Address staying in same cohort - update in place.
+                    // `addr_data.send` is not called directly on this path;
+                    // presumably the cohort state's `send` updates the address
+                    // data as well - TODO confirm.
+                    cohorts
+                        .amount_range
+                        .get_mut(new_balance)
+                        .state
+                        .as_mut()
+                        .unwrap()
+                        .send(
+                            addr_data,
+                            value,
+                            current_price,
+                            prev_price,
+                            blocks_old,
+                            days_old,
+                            older_than_hour,
+                        )?;
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/crates/brk_computer/src/stateful_new/process/tx_counts.rs b/crates/brk_computer/src/stateful_new/process/tx_counts.rs
new file mode 100644
index 000000000..38b142a26
--- /dev/null
+++ b/crates/brk_computer/src/stateful_new/process/tx_counts.rs
@@ -0,0 +1,53 @@
+//! Transaction count tracking per address.
+//!
+//! Updates tx_count on address data after deduplicating transaction indexes.
+
+use crate::stateful_new::address::AddressTypeToTypeIndexMap;
+
+use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec};
+
+/// Update tx_count for addresses based on unique transactions they participated in.
+///
+/// For each address:
+/// 1. Deduplicate transaction indexes (an address may appear in multiple inputs/outputs of same tx)
+/// 2. Add the unique count to the address's tx_count field
+///
+/// Addresses are looked up in loaded_cache first, then empty_cache.
+/// NOTE: This should be called AFTER merging parallel-fetched address data into loaded_cache.
+///
+/// Addresses found in neither cache are skipped without error.
+///
+/// # Panics
+///
+/// Panics if an address type has no map in `loaded_cache` or `empty_cache`
+/// (both lookups are unwrapped).
+// NOTE(review): the generic arguments below were missing in this copy of the
+// patch and have been reconstructed from the file's imports and the parameter
+// names - confirm against the original commit.
+pub fn update_tx_counts(
+    loaded_cache: &mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
+    empty_cache: &mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
+    txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>,
+) {
+    // Single pass: each per-address vector is owned here, so it can be
+    // deduplicated right before it is counted instead of in a separate walk.
+    for (address_type, typeindex, mut txindex_vec) in txindex_vecs
+        .into_iter()
+        .flat_map(|(t, m)| m.into_iter().map(move |(i, v)| (t, i, v)))
+    {
+        // An address may appear several times in the same transaction:
+        // count every transaction index once.
+        if txindex_vec.len() > 1 {
+            txindex_vec.sort_unstable();
+            txindex_vec.dedup();
+        }
+
+        // Assumes the number of distinct transactions touching one address in
+        // this batch fits in `u32` - the cast would truncate otherwise.
+        let tx_count = txindex_vec.len() as u32;
+
+        // Loaded cache takes precedence; the empty cache is only consulted
+        // when the address is not in the loaded one.
+        if let Some(addr_data) = loaded_cache
+            .get_mut(address_type)
+            .unwrap()
+            .get_mut(&typeindex)
+        {
+            addr_data.tx_count += tx_count;
+        } else if let Some(addr_data) = empty_cache
+            .get_mut(address_type)
+            .unwrap()
+            .get_mut(&typeindex)
+        {
+            addr_data.tx_count += tx_count;
+        }
+    }
+}
diff --git a/crates/brk_computer/src/stateful_new/process/with_source.rs b/crates/brk_computer/src/stateful_new/process/with_source.rs
new file mode 100644
index 000000000..5f9a6beea
--- /dev/null
+++ b/crates/brk_computer/src/stateful_new/process/with_source.rs
@@ -0,0 +1,73 @@
+//! Address data wrapper that tracks its source for flush operations.
+
+use brk_types::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex};
+
+/// Address data wrapped with its source location for flush operations.
+///
+/// This enum tracks where the data came from so it can be correctly
+/// updated or created during the flush phase.
+// NOTE(review): the `<T>` parameters and the concrete types in the two `From`
+// impl headers were missing in this copy of the patch; they are reconstructed
+// from the variant payloads, `type Target = T`, and the two data types this
+// file imports - confirm against the original commit.
+#[derive(Debug)]
+pub enum WithAddressDataSource<T> {
+    /// Brand new address (never seen before)
+    New(T),
+    /// Loaded from loaded address storage (with original index)
+    FromLoaded(LoadedAddressIndex, T),
+    /// Loaded from empty address storage (with original index)
+    FromEmpty(EmptyAddressIndex, T),
+}
+
+impl<T> WithAddressDataSource<T> {
+    /// Returns `true` for the `New` variant.
+    pub fn is_new(&self) -> bool {
+        matches!(self, Self::New(_))
+    }
+
+    /// Returns `true` for the `FromEmpty` variant.
+    pub fn is_from_emptyaddressdata(&self) -> bool {
+        matches!(self, Self::FromEmpty(..))
+    }
+
+    /// Mutable access to the wrapped data, whatever its source.
+    ///
+    /// Same result as the `DerefMut` impl below; as an inherent method it can
+    /// be called without `std::ops::DerefMut` being in scope.
+    pub fn deref_mut(&mut self) -> &mut T {
+        match self {
+            Self::New(v) | Self::FromLoaded(_, v) | Self::FromEmpty(_, v) => v,
+        }
+    }
+}
+
+/// Shared access to the wrapped data, whatever its source.
+impl<T> std::ops::Deref for WithAddressDataSource<T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        match self {
+            Self::New(v) | Self::FromLoaded(_, v) | Self::FromEmpty(_, v) => v,
+        }
+    }
+}
+
+/// Mutable access to the wrapped data, whatever its source.
+impl<T> std::ops::DerefMut for WithAddressDataSource<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        match self {
+            Self::New(v) | Self::FromLoaded(_, v) | Self::FromEmpty(_, v) => v,
+        }
+    }
+}
+
+/// Converts the wrapped data from empty to loaded form; the source tag and its
+/// storage index are carried over unchanged.
+impl From<WithAddressDataSource<EmptyAddressData>> for WithAddressDataSource<LoadedAddressData> {
+    #[inline]
+    fn from(value: WithAddressDataSource<EmptyAddressData>) -> Self {
+        match value {
+            WithAddressDataSource::New(v) => Self::New(v.into()),
+            WithAddressDataSource::FromLoaded(i, v) => Self::FromLoaded(i, v.into()),
+            WithAddressDataSource::FromEmpty(i, v) => Self::FromEmpty(i, v.into()),
+        }
+    }
+}
+
+/// Converts the wrapped data from loaded to empty form; the source tag and its
+/// storage index are carried over unchanged.
+impl From<WithAddressDataSource<LoadedAddressData>> for WithAddressDataSource<EmptyAddressData> {
+    #[inline]
+    fn from(value: WithAddressDataSource<LoadedAddressData>) -> Self {
+        match value {
+            WithAddressDataSource::New(v) => Self::New(v.into()),
+            WithAddressDataSource::FromLoaded(i, v) => Self::FromLoaded(i, v.into()),
+            WithAddressDataSource::FromEmpty(i, v) => Self::FromEmpty(i, v.into()),
+        }
+    }
+}
diff --git a/crates/brk_computer/src/stateful_new/vecs.rs b/crates/brk_computer/src/stateful_new/vecs.rs
index 16d42e67f..7d810c628 100644
--- a/crates/brk_computer/src/stateful_new/vecs.rs
+++ b/crates/brk_computer/src/stateful_new/vecs.rs
@@ -15,6 +15,7 @@ use crate::{
Indexes, SupplyState, chain, grouped::{ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source, VecBuilderOptions}, indexes, price, + utils::OptionExt, }; use super::{ @@ -174,56 +175,228 @@ impl Vecs { /// 3. Flushes checkpoints periodically /// 4. Computes aggregate cohorts from separate cohorts /// 5. Computes derived metrics - /// - /// NOTE: This is a placeholder. The full implementation needs to be ported - /// from stateful/mod.rs once all the supporting methods on UTXOCohorts, - /// AddressCohorts, and state types are implemented. #[allow(clippy::too_many_arguments)] pub fn compute( &mut self, - _indexer: &Indexer, - _indexes: &indexes::Vecs, - _chain: &chain::Vecs, - _price: Option<&price::Vecs>, - _starting_indexes: &mut Indexes, - _exit: &Exit, + indexer: &Indexer, + indexes: &indexes::Vecs, + chain: &chain::Vecs, + price: Option<&price::Vecs>, + starting_indexes: &mut Indexes, + exit: &Exit, ) -> Result<()> { - // The full compute implementation requires these methods to be implemented: - // - // On UTXOCohorts: - // - tick_tock_next_block(&chain_state, timestamp) - // - receive(transacted, height, price) - // - send(height_to_sent, &mut chain_state) - // - truncate_push_aggregate_percentiles(height) - // - import_aggregate_price_to_amount(height) - // - reset_aggregate_price_to_amount() - // - // On UTXOCohortState: - // - reset_block_values() - // - reset_price_to_amount() - // - // On AddressCohortState: - // - inner.reset_block_values() - // - inner.reset_price_to_amount() - // - // On AddressTypeToHeightToAddressCount: - // - safe_flush(exit) - // - truncate_push(height, &count) - // - // See stateful/mod.rs:368-1397 for the full implementation. - // - // The basic structure is: - // 1. Validate computed versions against base version - // 2. Find min stateful height and recover state - // 3. For each block: - // a. Reset per-block values - // b. Process outputs in parallel (receive) - // c. Process inputs in parallel (send) - // d. 
Push to height-indexed vectors - // e. Flush checkpoint every 10,000 blocks - // 4. Compute aggregate cohorts from separate cohorts - // 5. Compute rest_part1 (dateindex mappings) - // 6. Compute rest_part2 (ratios and relative metrics) + use super::compute::{ + StartMode, determine_start_mode, process_blocks, + }; + use crate::states::BlockState; + use vecdb::{AnyVec, GenericStoredVec, Stamp, TypedVecIterator, VecIndex}; + + // 1. Find minimum computed height for recovery + let chain_state_height = Height::from(self.chain_state.len()); + + // Get minimum heights without holding mutable references + let utxo_min = self.utxo_cohorts.min_separate_height_vecs_len(); + let address_min = self.address_cohorts.min_separate_height_vecs_len(); + + let stateful_min = utxo_min + .min(address_min) + .min(Height::from(self.chain_state.len())) + .min(self.any_address_indexes.min_stamped_height()) + .min(self.addresses_data.min_stamped_height()) + .min(Height::from(self.height_to_unspendable_supply.len())) + .min(Height::from(self.height_to_opreturn_supply.len())); + + // 2. 
Determine start mode and recover state + let start_mode = determine_start_mode(stateful_min, chain_state_height); + + let (starting_height, mut chain_state) = match start_mode { + StartMode::Resume(height) => { + let stamp = Stamp::from(height); + + // Rollback state vectors + let _ = self.chain_state.rollback_before(stamp); + let _ = self.any_address_indexes.rollback_before(stamp); + let _ = self.addresses_data.rollback_before(stamp); + + // Import cohort states + self.utxo_cohorts.import_separate_states(height); + self.address_cohorts.import_separate_states(height); + + // Import aggregate price_to_amount + let _ = self.utxo_cohorts.import_aggregate_price_to_amount(height); + + // Recover chain_state from stored values + let chain_state = if !height.is_zero() { + let height_to_timestamp = &indexes.height_to_timestamp_fixed; + let height_to_price = price.map(|p| &p.chainindexes_to_price_close.height); + + let mut height_to_timestamp_iter = height_to_timestamp.into_iter(); + let mut height_to_price_iter = height_to_price.map(|v| v.into_iter()); + let mut chain_state_iter = self.chain_state.into_iter(); + + (0..height.to_usize()) + .map(|h| { + let h = Height::from(h); + BlockState { + supply: chain_state_iter.get_unwrap(h), + price: height_to_price_iter.as_mut().map(|v| *v.get_unwrap(h)), + timestamp: height_to_timestamp_iter.get_unwrap(h), + } + }) + .collect() + } else { + vec![] + }; + + (height, chain_state) + } + StartMode::Fresh => { + // Reset all state + self.any_address_indexes.reset()?; + self.addresses_data.reset()?; + + // Reset state heights + self.utxo_cohorts.reset_separate_state_heights(); + self.address_cohorts.reset_separate_state_heights(); + + // Reset price_to_amount for all separate cohorts + self.utxo_cohorts.reset_separate_price_to_amount()?; + self.address_cohorts.reset_separate_price_to_amount()?; + + // Reset aggregate cohorts' price_to_amount + self.utxo_cohorts.reset_aggregate_price_to_amount()?; + + (Height::ZERO, vec![]) + } + }; + + 
// 3. Get last height from indexer + let last_height = Height::from(indexer.vecs.height_to_blockhash.len().saturating_sub(1)); + + // 4. Process blocks + if starting_height <= last_height { + process_blocks( + self, + indexer, + indexes, + chain, + price, + starting_height, + last_height, + &mut chain_state, + exit, + )?; + } + + // 5. Compute aggregates (overlapping cohorts from separate cohorts) + self.utxo_cohorts + .compute_overlapping_vecs(starting_indexes, exit)?; + self.address_cohorts + .compute_overlapping_vecs(starting_indexes, exit)?; + + // 6. Compute rest part1 (dateindex mappings) + self.utxo_cohorts + .compute_rest_part1(indexes, price, starting_indexes, exit)?; + self.address_cohorts + .compute_rest_part1(indexes, price, starting_indexes, exit)?; + + // 7. Compute indexes_to_market_cap from dateindex supply + if let Some(indexes_to_market_cap) = self.indexes_to_market_cap.as_mut() { + indexes_to_market_cap.compute_all(starting_indexes, exit, |v| { + v.compute_transform( + starting_indexes.dateindex, + self.utxo_cohorts + .all + .metrics + .supply + .indexes_to_supply + .dollars + .as_ref() + .unwrap() + .dateindex + .as_ref() + .unwrap(), + |(i, v, ..)| (i, v), + exit, + )?; + Ok(()) + })?; + } + + // 8. 
Compute rest part2 (relative metrics) + let height_to_supply = &self + .utxo_cohorts + .all + .metrics + .supply + .height_to_supply_value + .bitcoin + .clone(); + + let dateindex_to_supply = self + .utxo_cohorts + .all + .metrics + .supply + .indexes_to_supply + .bitcoin + .dateindex + .clone(); + + let height_to_market_cap = self.height_to_market_cap.clone(); + + let dateindex_to_market_cap = self + .indexes_to_market_cap + .as_ref() + .map(|v| v.dateindex.u().clone()); + + let height_to_realized_cap = self + .utxo_cohorts + .all + .metrics + .realized + .as_ref() + .map(|r| r.height_to_realized_cap.clone()); + + let dateindex_to_realized_cap = self + .utxo_cohorts + .all + .metrics + .realized + .as_ref() + .map(|r| r.indexes_to_realized_cap.dateindex.unwrap_last().clone()); + + let dateindex_to_supply_ref = dateindex_to_supply.u(); + let height_to_market_cap_ref = height_to_market_cap.as_ref(); + let dateindex_to_market_cap_ref = dateindex_to_market_cap.as_ref(); + let height_to_realized_cap_ref = height_to_realized_cap.as_ref(); + let dateindex_to_realized_cap_ref = dateindex_to_realized_cap.as_ref(); + + self.utxo_cohorts.compute_rest_part2( + indexes, + price, + starting_indexes, + height_to_supply, + dateindex_to_supply_ref, + height_to_market_cap_ref, + dateindex_to_market_cap_ref, + height_to_realized_cap_ref, + dateindex_to_realized_cap_ref, + exit, + )?; + + self.address_cohorts.compute_rest_part2( + indexes, + price, + starting_indexes, + height_to_supply, + dateindex_to_supply_ref, + height_to_market_cap_ref, + dateindex_to_market_cap_ref, + height_to_realized_cap_ref, + dateindex_to_realized_cap_ref, + exit, + )?; self.db.compact()?; Ok(()) diff --git a/crates/brk_grouper/src/address.rs b/crates/brk_grouper/src/address.rs index 1c5f2c18c..e3162aca6 100644 --- a/crates/brk_grouper/src/address.rs +++ b/crates/brk_grouper/src/address.rs @@ -49,6 +49,10 @@ impl AddressGroups { .chain(self.lt_amount.par_iter_mut()) } + pub fn iter_separate(&self) -> impl 
Iterator { + self.amount_range.iter() + } + pub fn iter_separate_mut(&mut self) -> impl Iterator { self.amount_range.iter_mut() } diff --git a/crates/brk_grouper/src/utxo.rs b/crates/brk_grouper/src/utxo.rs index 8cfcd9292..50ddce915 100644 --- a/crates/brk_grouper/src/utxo.rs +++ b/crates/brk_grouper/src/utxo.rs @@ -84,6 +84,14 @@ impl UTXOGroups { .chain(self.type_.par_iter_mut()) } + pub fn iter_separate(&self) -> impl Iterator { + self.age_range + .iter() + .chain(self.epoch.iter()) + .chain(self.amount_range.iter()) + .chain(self.type_.iter()) + } + pub fn iter_separate_mut(&mut self) -> impl Iterator { self.age_range .iter_mut() diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index f0c50b9bf..0f0865b47 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -25,7 +25,7 @@ const MAJOR_FJALL_VERSION: Version = Version::new(3); pub fn open_database(path: &Path) -> fjall::Result { Database::builder(path.join("fjall")) .cache_size(3 * 1024 * 1024 * 1024) - .max_cached_files(Some(1024)) + .max_cached_files(Some(512)) .open() } diff --git a/crates/brk_types/src/addressdata_source.rs b/crates/brk_types/src/addressdata_source.rs deleted file mode 100644 index 1d2af20a7..000000000 --- a/crates/brk_types/src/addressdata_source.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::{EmptyAddressIndex, LoadedAddressIndex}; - -/// Source of address data update (where the data came from). -#[derive(Clone)] -pub enum AddressDataSource { - /// Brand new address, not in any storage yet. - New(T), - /// From empty address storage. - FromEmpty((EmptyAddressIndex, T)), - /// From loaded address storage. 
- FromLoaded((LoadedAddressIndex, T)), -} diff --git a/crates/brk_types/src/lib.rs b/crates/brk_types/src/lib.rs index 385fa74a8..6a5dabd70 100644 --- a/crates/brk_types/src/lib.rs +++ b/crates/brk_types/src/lib.rs @@ -5,7 +5,6 @@ pub use vecdb::{CheckedSub, Exit, PrintableIndex, Version}; mod address; mod addressbytes; mod addresschainstats; -mod addressdata_source; mod addresshash; mod addressindexoutpoint; mod addressindextxindex; @@ -104,7 +103,6 @@ mod yearindex; pub use address::*; pub use addressbytes::*; pub use addresschainstats::*; -pub use addressdata_source::*; pub use addresshash::*; pub use addressindexoutpoint::*; pub use addressindextxindex::*;