diff --git a/Cargo.lock b/Cargo.lock index eab7848a9..ecba31013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4193,8 +4193,6 @@ dependencies = [ [[package]] name = "rawdb" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b50bc582b92390d3efc2dabb509f77c1d8078eaa07c2bb3528bd414c0d337425" dependencies = [ "libc", "log", @@ -5385,8 +5383,6 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" [[package]] name = "vecdb" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09567d773f1f8fd20bfe9822f5c1d6c1bb148f1edc88fe2676ac93b692ad561f" dependencies = [ "ctrlc", "log", @@ -5394,6 +5390,7 @@ dependencies = [ "parking_lot", "pco", "rawdb", + "rayon", "serde", "serde_json", "thiserror 2.0.17", @@ -5405,8 +5402,6 @@ dependencies = [ [[package]] name = "vecdb_derive" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5297336eb5d686ea22acddd2368c1b97d131f035c6d9a736d61087a318546e41" dependencies = [ "quote", "syn 2.0.111", diff --git a/Cargo.toml b/Cargo.toml index b80ef281c..039e23ccc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,8 +81,8 @@ serde_derive = "1.0.228" serde_json = { version = "1.0.145", features = ["float_roundtrip"] } smallvec = "1.15.1" tokio = { version = "1.48.0", features = ["rt-multi-thread"] } -vecdb = { version = "0.4.2", features = ["derive", "serde_json", "pco"] } -# vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] } +# vecdb = { version = "0.4.2", features = ["derive", "serde_json", "pco"] } +vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] } # vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] } [workspace.metadata.release] diff --git a/crates/brk_computer/examples/price_to_amount.rs b/crates/brk_computer/examples/price_to_amount.rs deleted file mode 100644 index 7eec910c7..000000000 --- a/crates/brk_computer/examples/price_to_amount.rs +++ /dev/null @@ -1,15 +0,0 @@ -use std::path::Path; - -use brk_computer::PriceToAmount; -use brk_error::Result; -use brk_types::Height; - -pub fn main() -> Result<()> { - let path = Path::new(&std::env::var("HOME").unwrap()) - .join(".brk") - .join("computed/stateful/states"); - let mut price_to_amount = PriceToAmount::create(&path, "addrs_above_1btc_under_10btc"); - dbg!(price_to_amount.import_at_or_before(Height::new(890000))?); - dbg!(price_to_amount); - Ok(()) -} diff --git a/crates/brk_computer/src/grouped/price_percentiles.rs b/crates/brk_computer/src/grouped/price_percentiles.rs index a70c37db1..2e052a4f4 100644 --- a/crates/brk_computer/src/grouped/price_percentiles.rs +++ b/crates/brk_computer/src/grouped/price_percentiles.rs @@ -3,7 +3,7 @@ use brk_traversable::{Traversable, TreeNode}; use brk_types::{DateIndex, Dollars, Version}; use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec}; -use crate::{Indexes, indexes, stateful::Flushable}; +use crate::{Indexes, indexes}; use super::{ComputedVecsFromDateIndex, Source, VecBuilderOptions}; @@ -62,11 +62,7 @@ impl PricePercentiles { Ok(()) } - pub fn compute_rest( - &mut self, - starting_indexes: &Indexes, - exit: &Exit, - ) -> Result<()> { + pub fn compute_rest(&mut self, starting_indexes: &Indexes, exit: &Exit) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { vec.compute_rest( starting_indexes, @@ -85,17 +81,6 @@ impl PricePercentiles { } } -impl Flushable for PricePercentiles { - fn safe_flush(&mut self, exit: &Exit) -> Result<()> { - for vec in self.vecs.iter_mut().flatten() { - if let Some(dateindex_vec) = vec.dateindex.as_mut() { - dateindex_vec.safe_flush(exit)?; - } - } - Ok(()) - } -} - impl PricePercentiles { pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index 7f47d505d..c2af98103 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -22,18 +22,12 @@ mod market; mod pools; mod price; mod stateful; -// mod stateful_old; -mod states; mod traits; mod utils; use indexes::Indexes; use utils::OptionExt; -// pub use pools::*; -pub use states::PriceToAmount; -use states::*; - #[derive(Clone, Traversable)] pub struct Computer { pub blks: blks::Vecs, diff --git a/crates/brk_computer/src/stateful/address/address_count.rs b/crates/brk_computer/src/stateful/address/address_count.rs index 8183f7593..4cb7f291a 100644 --- a/crates/brk_computer/src/stateful/address/address_count.rs +++ b/crates/brk_computer/src/stateful/address/address_count.rs @@ -61,16 +61,16 @@ impl AddressTypeToHeightToAddressCount { })?)) } - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { use vecdb::AnyStoredVec; - self.p2pk65.safe_flush(exit)?; - self.p2pk33.safe_flush(exit)?; - self.p2pkh.safe_flush(exit)?; - self.p2sh.safe_flush(exit)?; - self.p2wpkh.safe_flush(exit)?; - self.p2wsh.safe_flush(exit)?; - self.p2tr.safe_flush(exit)?; - self.p2a.safe_flush(exit)?; + self.p2pk65.safe_write(exit)?; + self.p2pk33.safe_write(exit)?; + self.p2pkh.safe_write(exit)?; + self.p2sh.safe_write(exit)?; + self.p2wpkh.safe_write(exit)?; + self.p2wsh.safe_write(exit)?; + self.p2tr.safe_write(exit)?; + self.p2a.safe_write(exit)?; Ok(()) } diff --git a/crates/brk_computer/src/stateful/address/type_index_map.rs b/crates/brk_computer/src/stateful/address/type_index_map.rs index ca3696e24..6e2b25c8e 100644 --- a/crates/brk_computer/src/stateful/address/type_index_map.rs +++ b/crates/brk_computer/src/stateful/address/type_index_map.rs @@ -10,7 +10,7 @@ use smallvec::{Array, SmallVec}; use std::collections::hash_map::Entry; /// A hashmap for each address type, keyed by TypeIndex. -#[derive(Debug, Deref, DerefMut)] +#[derive(Debug, Clone, Deref, DerefMut)] pub struct AddressTypeToTypeIndexMap(ByAddressType>); impl Default for AddressTypeToTypeIndexMap { diff --git a/crates/brk_computer/src/stateful/cohorts/address.rs b/crates/brk_computer/src/stateful/cohorts/address.rs index b6c0bcf45..639a24a25 100644 --- a/crates/brk_computer/src/stateful/cohorts/address.rs +++ b/crates/brk_computer/src/stateful/cohorts/address.rs @@ -15,7 +15,7 @@ use crate::{ Indexes, grouped::{ComputedVecsFromHeight, Source, VecBuilderOptions}, indexes, price, - stateful::cohorts::AddressCohortState, + stateful::states::AddressCohortState, }; use super::super::metrics::{CohortMetrics, ImportConfig}; @@ -224,9 +224,9 @@ impl DynCohortVecs for AddressCohortVecs { Ok(()) } - fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { self.height_to_addr_count.safe_write(exit)?; - self.metrics.safe_flush(exit)?; + self.metrics.safe_write(exit)?; if let Some(state) = self.state.as_mut() { state.inner.commit(height)?; diff --git a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs index e25cf656f..a89dbb3b3 100644 --- a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs +++ b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs @@ -222,10 +222,10 @@ impl AddressCohorts { }) } - /// Flush stateful vectors for separate cohorts. - pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + /// Write stateful vectors for separate cohorts. + pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { self.par_iter_separate_mut() - .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit)) + .try_for_each(|v| v.safe_write_stateful_vecs(height, exit)) } /// Get minimum height from all separate cohorts' height-indexed vectors. diff --git a/crates/brk_computer/src/stateful/cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/mod.rs index c95e8d27a..80b94b938 100644 --- a/crates/brk_computer/src/stateful/cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/mod.rs @@ -8,19 +8,12 @@ mod address; mod address_cohorts; -mod state; -mod state_address; -mod state_utxo; mod traits; mod utxo; mod utxo_cohorts; -pub use crate::states::Flushable; pub use address::AddressCohortVecs; pub use address_cohorts::AddressCohorts; -pub use state::CohortState; -pub use state_address::AddressCohortState; -pub use state_utxo::UTXOCohortState; pub use traits::{CohortVecs, DynCohortVecs}; pub use utxo::UTXOCohortVecs; pub use utxo_cohorts::UTXOCohorts; diff --git a/crates/brk_computer/src/stateful/cohorts/traits.rs b/crates/brk_computer/src/stateful/cohorts/traits.rs index d92978010..81e8de277 100644 --- a/crates/brk_computer/src/stateful/cohorts/traits.rs +++ b/crates/brk_computer/src/stateful/cohorts/traits.rs @@ -4,7 +4,7 @@ use brk_error::Result; use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; use vecdb::{Exit, IterableVec}; -use crate::{indexes, price, Indexes}; +use crate::{Indexes, indexes, price}; /// Dynamic dispatch trait for cohort vectors. /// @@ -34,8 +34,8 @@ pub trait DynCohortVecs: Send + Sync { date_price: Option>, ) -> Result<()>; - /// Flush stateful vectors to disk. - fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>; + /// Write stateful vectors to disk. + fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>; /// First phase of post-processing computations. #[allow(clippy::too_many_arguments)] diff --git a/crates/brk_computer/src/stateful/cohorts/utxo.rs b/crates/brk_computer/src/stateful/cohorts/utxo.rs index 080e409d8..7ebaa5092 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo.rs @@ -9,9 +9,8 @@ use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; use vecdb::{Database, Exit, IterableVec}; use crate::{ - Indexes, - indexes, price, - stateful::{CohortVecs, DynCohortVecs, cohorts::UTXOCohortState}, + Indexes, indexes, price, + stateful::{CohortVecs, DynCohortVecs, states::UTXOCohortState}, }; use super::super::metrics::{CohortMetrics, ImportConfig}; @@ -120,14 +119,25 @@ impl DynCohortVecs for UTXOCohortVecs { prev_height = state.import_at_or_before(prev_height)?; // Restore supply state from height-indexed vectors - state.supply.value = self.metrics.supply.height_to_supply.read_once(prev_height)?; - state.supply.utxo_count = *self.metrics.supply.height_to_utxo_count.read_once(prev_height)?; + state.supply.value = self + .metrics + .supply + .height_to_supply + .read_once(prev_height)?; + state.supply.utxo_count = *self + .metrics + .supply + .height_to_utxo_count + .read_once(prev_height)?; // Restore realized cap if present if let Some(realized_metrics) = self.metrics.realized.as_mut() - && let Some(realized_state) = state.realized.as_mut() { - realized_state.cap = realized_metrics.height_to_realized_cap.read_once(prev_height)?; - } + && let Some(realized_state) = state.realized.as_mut() + { + realized_state.cap = realized_metrics + .height_to_realized_cap + .read_once(prev_height)?; + } let result = prev_height.incremented(); self.state_starting_height = Some(result); @@ -179,8 +189,8 @@ impl DynCohortVecs for UTXOCohortVecs { Ok(()) } - fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { - self.metrics.safe_flush(exit)?; + fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + self.metrics.safe_write(exit)?; if let Some(state) = self.state.as_mut() { state.commit(height)?; diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs index d59779939..992c15ccf 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs @@ -349,16 +349,16 @@ impl UTXOCohorts { }) } - /// Flush stateful vectors for separate and aggregate cohorts. - pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + /// Write stateful vectors for separate and aggregate cohorts. + pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { // Flush separate cohorts (includes metrics + state) self.par_iter_separate_mut() - .try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?; + .try_for_each(|v| v.safe_write_stateful_vecs(height, exit))?; - // Flush aggregate cohorts' metrics (including price_percentiles) + // Write aggregate cohorts' metrics (including price_percentiles) // Note: aggregate cohorts no longer maintain price_to_amount state for v in self.0.iter_aggregate_mut() { - v.metrics.safe_flush(exit)?; + v.metrics.safe_write(exit)?; } Ok(()) } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs index 7df52517f..c22767356 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/receive.rs @@ -3,7 +3,7 @@ use brk_grouper::{Filter, Filtered}; use brk_types::{Dollars, Height}; -use crate::states::Transacted; +use crate::stateful::states::Transacted; use super::UTXOCohorts; diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs index 0a6843224..b497a7a4a 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/send.rs @@ -5,7 +5,10 @@ use brk_types::{CheckedSub, HalvingEpoch, Height}; use rustc_hash::FxHashMap; use vecdb::VecIndex; -use crate::{states::{BlockState, Transacted}, utils::OptionExt}; +use crate::{ + stateful::states::{BlockState, Transacted}, + utils::OptionExt, +}; use super::UTXOCohorts; diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs index dd45d72a3..8f31ff4a9 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs @@ -9,7 +9,7 @@ use brk_grouper::AGE_BOUNDARIES; use brk_types::{ONE_DAY_IN_SEC, Timestamp}; -use crate::states::BlockState; +use crate::stateful::states::BlockState; use super::UTXOCohorts; diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index 8c1d81d31..a07504a85 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -23,18 +23,16 @@ use crate::{ address::AddressTypeToAddressCount, compute::write::{process_address_updates, write}, process::{ - AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource, - build_txoutindex_to_height_map, process_inputs, process_outputs, process_received, - process_sent, update_tx_counts, + AddressCache, InputsResult, build_txoutindex_to_height_map, process_inputs, + process_outputs, process_received, process_sent, }, + states::{BlockState, Transacted}, }, - states::{BlockState, Transacted}, utils::OptionExt, }; use super::{ super::{ - address::AddressTypeToTypeIndexMap, cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}, vecs::Vecs, }, @@ -225,11 +223,7 @@ pub fn process_blocks( ) }; - // Persistent address data caches (accumulate across blocks, flushed at checkpoints) - let mut loaded_cache: AddressTypeToTypeIndexMap = - AddressTypeToTypeIndexMap::default(); - let mut empty_cache: AddressTypeToTypeIndexMap = - AddressTypeToTypeIndexMap::default(); + let mut cache = AddressCache::new(); info!("Starting main block iteration..."); @@ -304,8 +298,7 @@ pub fn process_blocks( &output_types, &output_typeindexes, &first_addressindexes, - &loaded_cache, - &empty_cache, + &cache, &vr, &vecs.any_address_indexes, &vecs.addresses_data, @@ -326,8 +319,7 @@ pub fn process_blocks( &txoutindex_to_height, &ir, &first_addressindexes, - &loaded_cache, - &empty_cache, + &cache, &vr, &vecs.any_address_indexes, &vecs.addresses_data, @@ -347,15 +339,15 @@ pub fn process_blocks( (outputs_result, inputs_result) }); - // Merge new address data into caches - loaded_cache.merge_mut(outputs_result.address_data); - loaded_cache.merge_mut(inputs_result.address_data); + // Merge new address data into current cache + cache.merge_loaded(outputs_result.address_data); + cache.merge_loaded(inputs_result.address_data); // Combine txindex_vecs from outputs and inputs, then update tx_count let combined_txindex_vecs = outputs_result .txindex_vecs .merge_vec(inputs_result.txindex_vecs); - update_tx_counts(&mut loaded_cache, &mut empty_cache, combined_txindex_vecs); + cache.update_tx_counts(combined_txindex_vecs); let mut transacted = outputs_result.transacted; let mut height_to_sent = inputs_result.height_to_sent; @@ -398,15 +390,7 @@ pub fn process_blocks( thread::scope(|scope| { // Spawn address cohort processing in background thread scope.spawn(|| { - // Create lookup closure that returns None (data was pre-fetched in parallel phase) - let get_address_data = - |_output_type, _type_index| -> Option { None }; - - let mut lookup = AddressLookup { - get_address_data, - loaded: &mut loaded_cache, - empty: &mut empty_cache, - }; + let mut lookup = cache.as_lookup(); // Process received outputs (addresses receiving funds) process_received( @@ -491,39 +475,43 @@ pub fn process_blocks( { let _lock = exit.lock(); - // Drop readers before flush to release mmap handles + // Drop readers to release mmap handles drop(vr); + let (empty_updates, loaded_updates) = cache.take(); + // Process address updates (mutations) process_address_updates( &mut vecs.addresses_data, &mut vecs.any_address_indexes, - std::mem::take(&mut empty_cache), - std::mem::take(&mut loaded_cache), + empty_updates, + loaded_updates, )?; - // Flush to disk (pure I/O) - no changes saved for periodic flushes + // Write to disk (pure I/O) - no changes saved for periodic flushes write(vecs, height, chain_state, false, exit)?; - // Recreate readers after flush to pick up new data + // Recreate readers vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); } } - // Final flush - always save changes for rollback support + // Final write - always save changes for rollback support { let _lock = exit.lock(); drop(vr); + let (empty_updates, loaded_updates) = cache.take(); + // Process address updates (mutations) process_address_updates( &mut vecs.addresses_data, &mut vecs.any_address_indexes, - std::mem::take(&mut empty_cache), - std::mem::take(&mut loaded_cache), + empty_updates, + loaded_updates, )?; - // Flush to disk (pure I/O) - save changes for rollback + // Write to disk (pure I/O) - save changes for rollback write(vecs, last_height, chain_state, true, exit)?; } diff --git a/crates/brk_computer/src/stateful/compute/write.rs b/crates/brk_computer/src/stateful/compute/write.rs index 694d68ae4..b0078ed92 100644 --- a/crates/brk_computer/src/stateful/compute/write.rs +++ b/crates/brk_computer/src/stateful/compute/write.rs @@ -4,20 +4,15 @@ //! - `process_address_updates`: applies cached address changes to storage //! - `flush`: writes all data to disk -use std::time::Instant; - use brk_error::Result; use brk_types::Height; -use log::info; use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp}; -use crate::{ - stateful::{ - Vecs, - process::{ - EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses, - process_loaded_addresses, - }, +use crate::stateful::{ + Vecs, + process::{ + EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses, + process_loaded_addresses, }, states::BlockState, }; @@ -38,34 +33,15 @@ pub fn process_address_updates( empty_updates: AddressTypeToTypeIndexMap, loaded_updates: AddressTypeToTypeIndexMap, ) -> Result<()> { - let t0 = Instant::now(); - - // Process address data transitions let empty_result = process_empty_addresses(addresses_data, empty_updates)?; - let t1 = Instant::now(); - let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?; - let t2 = Instant::now(); - let all_updates = empty_result.merge(loaded_result); - let t3 = Instant::now(); - // Apply index updates for (address_type, sorted) in all_updates.into_sorted_iter() { for (typeindex, any_index) in sorted { address_indexes.update_or_push(address_type, typeindex, any_index)?; } } - let t4 = Instant::now(); - - info!( - "process_address_updates: empty={:?} loaded={:?} merge={:?} indexes={:?} total={:?}", - t1 - t0, - t2 - t1, - t3 - t2, - t4 - t3, - t4 - t0 - ); Ok(()) } @@ -87,23 +63,17 @@ pub fn write( with_changes: bool, exit: &Exit, ) -> Result<()> { - let t0 = Instant::now(); - // Flush cohort states (separate + aggregate) - vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; - let t1 = Instant::now(); - + vecs.utxo_cohorts.safe_write_stateful_vecs(height, exit)?; vecs.address_cohorts - .safe_flush_stateful_vecs(height, exit)?; - let t2 = Instant::now(); + .safe_write_stateful_vecs(height, exit)?; // Flush height-indexed vectors vecs.height_to_unspendable_supply.safe_write(exit)?; vecs.height_to_opreturn_supply.safe_write(exit)?; - vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?; + vecs.addresstype_to_height_to_addr_count.safe_write(exit)?; vecs.addresstype_to_height_to_empty_addr_count - .safe_flush(exit)?; - let t3 = Instant::now(); + .safe_write(exit)?; // Flush large vecs in parallel let stamp = Stamp::from(height); @@ -111,29 +81,16 @@ pub fn write( let addresses_data = &mut vecs.addresses_data; let txoutindex_to_txinindex = &mut vecs.txoutindex_to_txinindex; - let ((addr_result, addr_idx_time, addr_data_time), (txout_result, txout_time)) = rayon::join( + let (addr_result, txout_result) = rayon::join( || { - let t0 = Instant::now(); - let r1 = any_address_indexes.write(stamp, with_changes); - let t1 = Instant::now(); - let r2 = addresses_data.write(stamp, with_changes); - let t2 = Instant::now(); - let r = r1.and(r2); - (r, t1 - t0, t2 - t1) - }, - || { - let t = Instant::now(); - let r = txoutindex_to_txinindex.stamped_write_maybe_with_changes(stamp, with_changes); - (r, t.elapsed()) + any_address_indexes + .write(stamp, with_changes) + .and(addresses_data.write(stamp, with_changes)) }, + || txoutindex_to_txinindex.stamped_write_maybe_with_changes(stamp, with_changes), ); addr_result?; txout_result?; - let t4 = Instant::now(); - info!( - " parallel breakdown: addr_idx={:?} addr_data={:?} txout={:?}", - addr_idx_time, addr_data_time, txout_time - ); // Sync in-memory chain_state to persisted and flush vecs.chain_state.truncate_if_needed(Height::ZERO)?; @@ -142,18 +99,6 @@ pub fn write( } vecs.chain_state .stamped_write_maybe_with_changes(stamp, with_changes)?; - let t5 = Instant::now(); - - info!( - "flush: utxo={:?} addr={:?} height={:?} parallel={:?} chain={:?} total={:?} (with_changes={})", - t1 - t0, - t2 - t1, - t3 - t2, - t4 - t3, - t5 - t4, - t5 - t0, - with_changes - ); Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/activity.rs b/crates/brk_computer/src/stateful/metrics/activity.rs index 8da020d5b..0aa30537a 100644 --- a/crates/brk_computer/src/stateful/metrics/activity.rs +++ b/crates/brk_computer/src/stateful/metrics/activity.rs @@ -113,8 +113,8 @@ impl ActivityMetrics { Ok(()) } - /// Flush height-indexed vectors to disk. - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + /// Write height-indexed vectors to disk. + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { self.height_to_sent.safe_write(exit)?; self.height_to_satblocks_destroyed.safe_write(exit)?; self.height_to_satdays_destroyed.safe_write(exit)?; diff --git a/crates/brk_computer/src/stateful/metrics/mod.rs b/crates/brk_computer/src/stateful/metrics/mod.rs index 844f16b73..b30a9eef2 100644 --- a/crates/brk_computer/src/stateful/metrics/mod.rs +++ b/crates/brk_computer/src/stateful/metrics/mod.rs @@ -30,7 +30,7 @@ use brk_traversable::Traversable; use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version}; use vecdb::{Exit, IterableVec}; -use crate::{Indexes, indexes, price, stateful::cohorts::CohortState}; +use crate::{Indexes, indexes, price, stateful::states::CohortState}; /// All metrics for a cohort, organized by category. #[derive(Clone, Traversable)] @@ -105,21 +105,21 @@ impl CohortMetrics { Ok(()) } - /// Flush height-indexed vectors to disk. - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { - self.supply.safe_flush(exit)?; - self.activity.safe_flush(exit)?; + /// Write height-indexed vectors to disk. + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { + self.supply.safe_write(exit)?; + self.activity.safe_write(exit)?; if let Some(realized) = self.realized.as_mut() { - realized.safe_flush(exit)?; + realized.safe_write(exit)?; } if let Some(unrealized) = self.unrealized.as_mut() { - unrealized.safe_flush(exit)?; + unrealized.safe_write(exit)?; } if let Some(price_paid) = self.price_paid.as_mut() { - price_paid.safe_flush(exit)?; + price_paid.safe_write(exit)?; } Ok(()) diff --git a/crates/brk_computer/src/stateful/metrics/price_paid.rs b/crates/brk_computer/src/stateful/metrics/price_paid.rs index 449e9b443..afa6959c2 100644 --- a/crates/brk_computer/src/stateful/metrics/price_paid.rs +++ b/crates/brk_computer/src/stateful/metrics/price_paid.rs @@ -10,8 +10,7 @@ use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVe use crate::{ Indexes, grouped::{ComputedVecsFromHeight, PricePercentiles, Source, VecBuilderOptions}, - stateful::cohorts::CohortState, - states::Flushable, + stateful::states::CohortState, }; use super::ImportConfig; @@ -112,12 +111,12 @@ impl PricePaidMetrics { Ok(()) } - /// Flush height-indexed vectors to disk. - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + /// Write height-indexed vectors to disk. + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { self.height_to_min_price_paid.safe_write(exit)?; self.height_to_max_price_paid.safe_write(exit)?; if let Some(price_percentiles) = self.price_percentiles.as_mut() { - price_percentiles.safe_flush(exit)?; + price_percentiles.safe_write(exit)?; } Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/realized.rs b/crates/brk_computer/src/stateful/metrics/realized.rs index 005f45a38..4dd0fa676 100644 --- a/crates/brk_computer/src/stateful/metrics/realized.rs +++ b/crates/brk_computer/src/stateful/metrics/realized.rs @@ -14,7 +14,7 @@ use crate::{ VecBuilderOptions, }, indexes, price, - states::RealizedState, + stateful::states::RealizedState, utils::OptionExt, }; @@ -411,11 +411,16 @@ impl RealizedMetrics { /// Push realized state values to height-indexed vectors. pub fn truncate_push(&mut self, height: Height, state: &RealizedState) -> Result<()> { - self.height_to_realized_cap.truncate_push(height, state.cap)?; - self.height_to_realized_profit.truncate_push(height, state.profit)?; - self.height_to_realized_loss.truncate_push(height, state.loss)?; - self.height_to_value_created.truncate_push(height, state.value_created)?; - self.height_to_value_destroyed.truncate_push(height, state.value_destroyed)?; + self.height_to_realized_cap + .truncate_push(height, state.cap)?; + self.height_to_realized_profit + .truncate_push(height, state.profit)?; + self.height_to_realized_loss + .truncate_push(height, state.loss)?; + self.height_to_value_created + .truncate_push(height, state.value_created)?; + self.height_to_value_destroyed + .truncate_push(height, state.value_destroyed)?; if let Some(v) = self.height_to_adjusted_value_created.as_mut() { v.truncate_push(height, state.adj_value_created)?; @@ -427,8 +432,8 @@ impl RealizedMetrics { Ok(()) } - /// Flush height-indexed vectors to disk. - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + /// Write height-indexed vectors to disk. + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { self.height_to_realized_cap.safe_write(exit)?; self.height_to_realized_profit.safe_write(exit)?; self.height_to_realized_loss.safe_write(exit)?; @@ -458,55 +463,74 @@ impl RealizedMetrics { ) -> Result<()> { self.height_to_realized_cap.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_realized_cap).collect::>(), + &others + .iter() + .map(|v| &v.height_to_realized_cap) + .collect::>(), exit, )?; self.height_to_realized_profit.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_realized_profit).collect::>(), + &others + .iter() + .map(|v| &v.height_to_realized_profit) + .collect::>(), exit, )?; self.height_to_realized_loss.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_realized_loss).collect::>(), + &others + .iter() + .map(|v| &v.height_to_realized_loss) + .collect::>(), exit, )?; self.height_to_value_created.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_value_created).collect::>(), + &others + .iter() + .map(|v| &v.height_to_value_created) + .collect::>(), exit, )?; self.height_to_value_destroyed.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_value_destroyed).collect::>(), + &others + .iter() + .map(|v| &v.height_to_value_destroyed) + .collect::>(), exit, )?; if self.height_to_adjusted_value_created.is_some() { - self.height_to_adjusted_value_created.um().compute_sum_of_others( - starting_indexes.height, - &others - .iter() - .map(|v| { - v.height_to_adjusted_value_created - .as_ref() - .unwrap_or(&v.height_to_value_created) - }) - .collect::>(), - exit, - )?; - self.height_to_adjusted_value_destroyed.um().compute_sum_of_others( - starting_indexes.height, - &others - .iter() - .map(|v| { - v.height_to_adjusted_value_destroyed - .as_ref() - .unwrap_or(&v.height_to_value_destroyed) - }) - .collect::>(), - exit, - )?; + self.height_to_adjusted_value_created + .um() + .compute_sum_of_others( + starting_indexes.height, + &others + .iter() + .map(|v| { + v.height_to_adjusted_value_created + .as_ref() + .unwrap_or(&v.height_to_value_created) + }) + .collect::>(), + exit, + )?; + self.height_to_adjusted_value_destroyed + .um() + .compute_sum_of_others( + starting_indexes.height, + &others + .iter() + .map(|v| { + v.height_to_adjusted_value_destroyed + .as_ref() + .unwrap_or(&v.height_to_value_destroyed) + }) + .collect::>(), + exit, + )?; } Ok(()) diff --git a/crates/brk_computer/src/stateful/metrics/supply.rs b/crates/brk_computer/src/stateful/metrics/supply.rs index 26622a936..ddbcbf4d0 100644 --- a/crates/brk_computer/src/stateful/metrics/supply.rs +++ b/crates/brk_computer/src/stateful/metrics/supply.rs @@ -17,7 +17,7 @@ use crate::{ VecBuilderOptions, }, indexes, price, - states::SupplyState, + stateful::states::SupplyState, }; use super::ImportConfig; @@ -130,8 +130,8 @@ impl SupplyMetrics { Ok(()) } - /// Flush height-indexed vectors to disk. - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + /// Write height-indexed vectors to disk. + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { self.height_to_supply.safe_write(exit)?; self.height_to_utxo_count.safe_write(exit)?; Ok(()) @@ -152,12 +152,18 @@ impl SupplyMetrics { ) -> Result<()> { self.height_to_supply.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_supply).collect::>(), + &others + .iter() + .map(|v| &v.height_to_supply) + .collect::>(), exit, )?; self.height_to_utxo_count.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_utxo_count).collect::>(), + &others + .iter() + .map(|v| &v.height_to_utxo_count) + .collect::>(), exit, )?; Ok(()) @@ -244,7 +250,13 @@ impl SupplyMetrics { dateindex_to_market_cap: Option<&impl IterableVec>, _exit: &Exit, ) -> Result<()> { - let _ = (indexes, price, height_to_supply, height_to_market_cap, dateindex_to_market_cap); + let _ = ( + indexes, + price, + height_to_supply, + height_to_market_cap, + dateindex_to_market_cap, + ); // Supply relative metrics computed here if needed Ok(()) diff --git a/crates/brk_computer/src/stateful/metrics/unrealized.rs b/crates/brk_computer/src/stateful/metrics/unrealized.rs index c5f1a96ff..a6aadc245 100644 --- a/crates/brk_computer/src/stateful/metrics/unrealized.rs +++ b/crates/brk_computer/src/stateful/metrics/unrealized.rs @@ -5,7 +5,9 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::{DateIndex, Dollars, Height, Sats, Version}; -use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableCloneableVec, PcoVec}; +use vecdb::{ + AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableCloneableVec, PcoVec, +}; use crate::{ Indexes, @@ -13,7 +15,7 @@ use crate::{ ComputedHeightValueVecs, ComputedValueVecsFromDateIndex, ComputedVecsFromDateIndex, Source, VecBuilderOptions, }, - states::UnrealizedState, + stateful::states::UnrealizedState, }; use super::ImportConfig; @@ -216,8 +218,8 @@ impl UnrealizedMetrics { Ok(()) } - /// Flush height-indexed vectors to disk. - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { + /// Write height-indexed vectors to disk. + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { self.height_to_supply_in_profit.safe_write(exit)?; self.height_to_supply_in_loss.safe_write(exit)?; self.height_to_unrealized_profit.safe_write(exit)?; @@ -238,42 +240,66 @@ impl UnrealizedMetrics { ) -> Result<()> { self.height_to_supply_in_profit.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_supply_in_profit).collect::>(), + &others + .iter() + .map(|v| &v.height_to_supply_in_profit) + .collect::>(), exit, )?; self.height_to_supply_in_loss.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_supply_in_loss).collect::>(), + &others + .iter() + .map(|v| &v.height_to_supply_in_loss) + .collect::>(), exit, )?; self.height_to_unrealized_profit.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_unrealized_profit).collect::>(), + &others + .iter() + .map(|v| &v.height_to_unrealized_profit) + .collect::>(), exit, )?; self.height_to_unrealized_loss.compute_sum_of_others( starting_indexes.height, - &others.iter().map(|v| &v.height_to_unrealized_loss).collect::>(), + &others + .iter() + .map(|v| &v.height_to_unrealized_loss) + .collect::>(), exit, )?; self.dateindex_to_supply_in_profit.compute_sum_of_others( starting_indexes.dateindex, - &others.iter().map(|v| &v.dateindex_to_supply_in_profit).collect::>(), + &others + .iter() + .map(|v| &v.dateindex_to_supply_in_profit) + .collect::>(), exit, )?; self.dateindex_to_supply_in_loss.compute_sum_of_others( starting_indexes.dateindex, - &others.iter().map(|v| &v.dateindex_to_supply_in_loss).collect::>(), + &others + .iter() + .map(|v| &v.dateindex_to_supply_in_loss) + .collect::>(), exit, )?; self.dateindex_to_unrealized_profit.compute_sum_of_others( starting_indexes.dateindex, - &others.iter().map(|v| &v.dateindex_to_unrealized_profit).collect::>(), + &others + .iter() + .map(|v| &v.dateindex_to_unrealized_profit) + .collect::>(), exit, )?; self.dateindex_to_unrealized_loss.compute_sum_of_others( starting_indexes.dateindex, - &others.iter().map(|v| &v.dateindex_to_unrealized_loss).collect::>(), + &others + .iter() + .map(|v| &v.dateindex_to_unrealized_loss) + .collect::>(), exit, )?; Ok(()) diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index 5f76f07fa..cd06f7489 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -7,10 +7,12 @@ //! //! ```text //! stateful/ -//! ├── address/ # Address type handling (indexes, data storage) +//! ├── address/ # Address type collections (type_vec, type_index_map, etc.) //! ├── cohorts/ # Cohort traits and state management -//! ├── compute/ # Block processing pipeline -//! └── metrics/ # Metric vectors organized by category +//! ├── compute/ # Block processing loop and I/O +//! ├── metrics/ # Metric vectors organized by category +//! ├── process/ # Transaction processing (inputs, outputs, cache) +//! └── vecs.rs # Main vectors container //! ``` //! //! ## Data Flow @@ -26,20 +28,17 @@ pub mod cohorts; pub mod compute; pub mod metrics; mod process; +mod states; mod vecs; - +use states::*; pub use vecs::Vecs; // Address re-exports -pub use address::{ - AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, -}; +pub use address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs}; // Cohort re-exports -pub use cohorts::{ - AddressCohorts, CohortVecs, DynCohortVecs, Flushable, UTXOCohorts, -}; +pub use cohorts::{AddressCohorts, CohortVecs, DynCohortVecs, UTXOCohorts}; // Compute re-exports pub use compute::IndexerReaders; diff --git a/crates/brk_computer/src/stateful/process/address_updates.rs b/crates/brk_computer/src/stateful/process/address_updates.rs new file mode 100644 index 000000000..7512eabf0 --- /dev/null +++ b/crates/brk_computer/src/stateful/process/address_updates.rs @@ -0,0 +1,127 @@ +//! Address data update processing for flush operations. +//! +//! Handles transitions between loaded (non-zero balance) and empty (zero balance) states: +//! - New addresses: push to storage +//! - Updated addresses: update in place +//! - State transitions: delete from source, push to destination + +use brk_error::Result; +use brk_types::{ + AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, + OutputType, TypeIndex, +}; + +use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource}; +use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; + +/// Process loaded address data updates. +/// +/// Handles: +/// - New loaded address: push to loaded storage +/// - Updated loaded address (was loaded): update in place +/// - Transition empty -> loaded: delete from empty, push to loaded +pub fn process_loaded_addresses( + addresses_data: &mut AddressesDataVecs, + loaded_updates: AddressTypeToTypeIndexMap, +) -> Result> { + let total: usize = loaded_updates.iter().map(|(_, m)| m.len()).sum(); + + let mut updates: Vec<(LoadedAddressIndex, LoadedAddressData)> = Vec::with_capacity(total); + let mut deletes: Vec = Vec::with_capacity(total); + let mut pushes: Vec<(OutputType, TypeIndex, LoadedAddressData)> = Vec::with_capacity(total); + + for (address_type, items) in loaded_updates.into_iter() { + for (typeindex, source) in items { + match source { + LoadedAddressDataWithSource::New(data) => { + pushes.push((address_type, typeindex, data)); + } + LoadedAddressDataWithSource::FromLoaded(index, data) => { + updates.push((index, data)); + } + LoadedAddressDataWithSource::FromEmpty(empty_index, data) => { + deletes.push(empty_index); + pushes.push((address_type, typeindex, data)); + } + } + } + } + + // Phase 1: Deletes (creates holes) + for empty_index in deletes { + addresses_data.empty.delete(empty_index); + } + + // Phase 2: Updates (in-place) + for (index, data) in updates { + addresses_data.loaded.update(index, data)?; + } + + // Phase 3: Pushes (fills holes, then grows) + let mut result = AddressTypeToTypeIndexMap::default(); + for (address_type, typeindex, data) in pushes { + let index = addresses_data.loaded.fill_first_hole_or_push(data)?; + result + .get_mut(address_type) + .unwrap() + .insert(typeindex, AnyAddressIndex::from(index)); + } + + Ok(result) +} + +/// Process empty address data updates. +/// +/// Handles: +/// - New empty address: push to empty storage +/// - Updated empty address (was empty): update in place +/// - Transition loaded -> empty: delete from loaded, push to empty +pub fn process_empty_addresses( + addresses_data: &mut AddressesDataVecs, + empty_updates: AddressTypeToTypeIndexMap, +) -> Result> { + let total: usize = empty_updates.iter().map(|(_, m)| m.len()).sum(); + + let mut updates: Vec<(EmptyAddressIndex, EmptyAddressData)> = Vec::with_capacity(total); + let mut deletes: Vec = Vec::with_capacity(total); + let mut pushes: Vec<(OutputType, TypeIndex, EmptyAddressData)> = Vec::with_capacity(total); + + for (address_type, items) in empty_updates.into_iter() { + for (typeindex, source) in items { + match source { + EmptyAddressDataWithSource::New(data) => { + pushes.push((address_type, typeindex, data)); + } + EmptyAddressDataWithSource::FromEmpty(index, data) => { + updates.push((index, data)); + } + EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => { + deletes.push(loaded_index); + pushes.push((address_type, typeindex, data)); + } + } + } + } + + // Phase 1: Deletes (creates holes) + for loaded_index in deletes { + addresses_data.loaded.delete(loaded_index); + } + + // Phase 2: Updates (in-place) + for (index, data) in updates { + addresses_data.empty.update(index, data)?; + } + + // Phase 3: Pushes (fills holes, then grows) + let mut result = AddressTypeToTypeIndexMap::default(); + for (address_type, typeindex, data) in pushes { + let index = addresses_data.empty.fill_first_hole_or_push(data)?; + result + .get_mut(address_type) + .unwrap() + .insert(typeindex, AnyAddressIndex::from(index)); + } + + Ok(result) +} diff --git a/crates/brk_computer/src/stateful/process/cache.rs b/crates/brk_computer/src/stateful/process/cache.rs new file mode 100644 index 000000000..d8d536b94 --- /dev/null +++ b/crates/brk_computer/src/stateful/process/cache.rs @@ -0,0 +1,77 @@ +//! Address data cache for flush intervals. +//! +//! Accumulates address data across blocks within a flush interval. +//! Data is flushed to disk at checkpoints. + +use brk_types::{OutputType, TypeIndex}; + +use super::super::address::AddressTypeToTypeIndexMap; +use super::{AddressLookup, EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec}; + +/// Cache for address data within a flush interval. +pub struct AddressCache { + /// Addresses with non-zero balance + loaded: AddressTypeToTypeIndexMap, + /// Addresses that became empty (zero balance) + empty: AddressTypeToTypeIndexMap, +} + +impl Default for AddressCache { + fn default() -> Self { + Self::new() + } +} + +impl AddressCache { + pub fn new() -> Self { + Self { + loaded: AddressTypeToTypeIndexMap::default(), + empty: AddressTypeToTypeIndexMap::default(), + } + } + + /// Check if address is in cache (either loaded or empty). + #[inline] + pub fn contains(&self, address_type: OutputType, typeindex: TypeIndex) -> bool { + self.loaded + .get(address_type) + .is_some_and(|m| m.contains_key(&typeindex)) + || self + .empty + .get(address_type) + .is_some_and(|m| m.contains_key(&typeindex)) + } + + /// Merge address data into loaded cache. + #[inline] + pub fn merge_loaded(&mut self, data: AddressTypeToTypeIndexMap) { + self.loaded.merge_mut(data); + } + + /// Create an AddressLookup view into this cache. + #[inline] + pub fn as_lookup(&mut self) -> AddressLookup<'_> { + AddressLookup { + loaded: &mut self.loaded, + empty: &mut self.empty, + } + } + + /// Update transaction counts for addresses. + pub fn update_tx_counts(&mut self, txindex_vecs: AddressTypeToTypeIndexMap) { + super::update_tx_counts(&mut self.loaded, &mut self.empty, txindex_vecs); + } + + /// Take the cache contents for flushing, leaving empty caches. + pub fn take( + &mut self, + ) -> ( + AddressTypeToTypeIndexMap, + AddressTypeToTypeIndexMap, + ) { + ( + std::mem::take(&mut self.empty), + std::mem::take(&mut self.loaded), + ) + } +} diff --git a/crates/brk_computer/src/stateful/process/empty_addresses.rs b/crates/brk_computer/src/stateful/process/empty_addresses.rs deleted file mode 100644 index b0e44c519..000000000 --- a/crates/brk_computer/src/stateful/process/empty_addresses.rs +++ /dev/null @@ -1,65 +0,0 @@ -use brk_error::Result; -use brk_types::{AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, LoadedAddressIndex, OutputType, TypeIndex}; - -use super::EmptyAddressDataWithSource; -use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; - -/// Process empty address data updates. -/// -/// Handles three cases: -/// - New empty address: push to empty storage -/// - Updated empty address (was empty): update in place -/// - Transition loaded -> empty: delete from loaded, push to empty -/// -/// Optimized to batch operations: deletes first (creates holes), then updates, then pushes. -pub fn process_empty_addresses( - addresses_data: &mut AddressesDataVecs, - empty_updates: AddressTypeToTypeIndexMap, -) -> Result> { - // Estimate capacity from input size - let total: usize = empty_updates.iter().map(|(_, m)| m.len()).sum(); - - // Collect operations by type (no sorting needed) - let mut updates: Vec<(EmptyAddressIndex, EmptyAddressData)> = Vec::with_capacity(total); - let mut deletes: Vec = Vec::with_capacity(total); - let mut pushes: Vec<(OutputType, TypeIndex, EmptyAddressData)> = Vec::with_capacity(total); - - for (address_type, items) in empty_updates.into_iter() { - for (typeindex, source) in items { - match source { - EmptyAddressDataWithSource::New(data) => { - pushes.push((address_type, typeindex, data)); - } - EmptyAddressDataWithSource::FromEmpty(index, data) => { - updates.push((index, data)); - } - EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => { - deletes.push(loaded_index); - pushes.push((address_type, typeindex, data)); - } - } - } - } - - // Phase 1: All deletes (creates holes in loaded vec) - for loaded_index in deletes { - addresses_data.loaded.delete(loaded_index); - } - - // Phase 2: All updates (in-place in empty vec) - for (index, data) in updates { - addresses_data.empty.update(index, data)?; - } - - // Phase 3: All pushes (fills holes in empty vec, then grows) - let mut result = AddressTypeToTypeIndexMap::default(); - for (address_type, typeindex, data) in pushes { - let index = addresses_data.empty.fill_first_hole_or_push(data)?; - result - .get_mut(address_type) - .unwrap() - .insert(typeindex, AnyAddressIndex::from(index)); - } - - Ok(result) -} diff --git a/crates/brk_computer/src/stateful/process/inputs.rs b/crates/brk_computer/src/stateful/process/inputs.rs index 9ed0359fb..fb901428a 100644 --- a/crates/brk_computer/src/stateful/process/inputs.rs +++ b/crates/brk_computer/src/stateful/process/inputs.rs @@ -17,15 +17,13 @@ use crate::stateful::address::{ AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, }; use crate::stateful::compute::VecsReaders; -use crate::{ - stateful::{IndexerReaders, process::RangeMap}, - states::Transacted, -}; +use crate::stateful::states::Transacted; + +use super::AddressCache; +use crate::stateful::{IndexerReaders, process::RangeMap}; use super::super::address::HeightToAddressTypeToVec; -use super::{ - EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource, -}; +use super::{LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource}; /// Result of processing inputs for a block. pub struct InputsResult { @@ -69,8 +67,7 @@ pub fn process_inputs( ir: &IndexerReaders, // Address lookup parameters first_addressindexes: &ByAddressType, - loaded_cache: &AddressTypeToTypeIndexMap, - empty_cache: &AddressTypeToTypeIndexMap, + cache: &AddressCache, vr: &VecsReaders, any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, @@ -109,8 +106,7 @@ pub fn process_inputs( input_type, typeindex, first_addressindexes, - loaded_cache, - empty_cache, + cache, vr, any_address_indexes, addresses_data, @@ -188,8 +184,7 @@ fn get_address_data( address_type: OutputType, typeindex: TypeIndex, first_addressindexes: &ByAddressType, - loaded_cache: &AddressTypeToTypeIndexMap, - empty_cache: &AddressTypeToTypeIndexMap, + cache: &AddressCache, vr: &VecsReaders, any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, @@ -201,15 +196,7 @@ fn get_address_data( } // Skip if already in cache - if loaded_cache - .get(address_type) - .unwrap() - .contains_key(&typeindex) - || empty_cache - .get(address_type) - .unwrap() - .contains_key(&typeindex) - { + if cache.contains(address_type, typeindex) { return None; } diff --git a/crates/brk_computer/src/stateful/process/loaded_addresses.rs b/crates/brk_computer/src/stateful/process/loaded_addresses.rs deleted file mode 100644 index f527df890..000000000 --- a/crates/brk_computer/src/stateful/process/loaded_addresses.rs +++ /dev/null @@ -1,65 +0,0 @@ -use brk_error::Result; -use brk_types::{AnyAddressIndex, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, OutputType, TypeIndex}; - -use super::LoadedAddressDataWithSource; -use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; - -/// Process loaded address data updates. -/// -/// Handles three cases: -/// - New loaded address: push to loaded storage -/// - Updated loaded address (was loaded): update in place -/// - Transition empty -> loaded: delete from empty, push to loaded -/// -/// Optimized to batch operations: deletes first (creates holes), then updates, then pushes. -pub fn process_loaded_addresses( - addresses_data: &mut AddressesDataVecs, - loaded_updates: AddressTypeToTypeIndexMap, -) -> Result> { - // Estimate capacity from input size - let total: usize = loaded_updates.iter().map(|(_, m)| m.len()).sum(); - - // Collect operations by type (no sorting needed) - let mut updates: Vec<(LoadedAddressIndex, LoadedAddressData)> = Vec::with_capacity(total); - let mut deletes: Vec = Vec::with_capacity(total); - let mut pushes: Vec<(OutputType, TypeIndex, LoadedAddressData)> = Vec::with_capacity(total); - - for (address_type, items) in loaded_updates.into_iter() { - for (typeindex, source) in items { - match source { - LoadedAddressDataWithSource::New(data) => { - pushes.push((address_type, typeindex, data)); - } - LoadedAddressDataWithSource::FromLoaded(index, data) => { - updates.push((index, data)); - } - LoadedAddressDataWithSource::FromEmpty(empty_index, data) => { - deletes.push(empty_index); - pushes.push((address_type, typeindex, data)); - } - } - } - } - - // Phase 1: All deletes (creates holes in empty vec) - for empty_index in deletes { - addresses_data.empty.delete(empty_index); - } - - // Phase 2: All updates (in-place in loaded vec) - for (index, data) in updates { - addresses_data.loaded.update(index, data)?; - } - - // Phase 3: All pushes (fills holes in loaded vec, then grows) - let mut result = AddressTypeToTypeIndexMap::default(); - for (address_type, typeindex, data) in pushes { - let index = addresses_data.loaded.fill_first_hole_or_push(data)?; - result - .get_mut(address_type) - .unwrap() - .insert(typeindex, AnyAddressIndex::from(index)); - } - - Ok(result) -} diff --git a/crates/brk_computer/src/stateful/process/address_lookup.rs b/crates/brk_computer/src/stateful/process/lookup.rs similarity index 55% rename from crates/brk_computer/src/stateful/process/address_lookup.rs rename to crates/brk_computer/src/stateful/process/lookup.rs index 2e4dffbb3..b37d8f7ab 100644 --- a/crates/brk_computer/src/stateful/process/address_lookup.rs +++ b/crates/brk_computer/src/stateful/process/lookup.rs @@ -1,37 +1,23 @@ -//! Address data lookup and source tracking. -//! -//! Handles looking up existing address data from storage and tracking -//! whether addresses are new, from storage, or previously empty. +//! Address data lookup during block processing. -use brk_types::{EmptyAddressData, LoadedAddressData, OutputType, TypeIndex}; +use brk_types::{LoadedAddressData, OutputType, TypeIndex}; use super::super::address::AddressTypeToTypeIndexMap; -pub use super::WithAddressDataSource; - -/// Loaded address data with source tracking for flush operations. -pub type LoadedAddressDataWithSource = WithAddressDataSource; - -/// Empty address data with source tracking for flush operations. -pub type EmptyAddressDataWithSource = WithAddressDataSource; +use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource}; /// Context for looking up and storing address data during block processing. /// -/// Uses the same pattern as the original stateful module: +/// All addresses should be pre-fetched into the cache before using this. /// - `loaded`: addresses with non-zero balance (wrapped with source info) /// - `empty`: addresses that became empty this block (wrapped with source info) -pub struct AddressLookup<'a, F> { - /// Function to get existing address data from storage - pub get_address_data: F, +pub struct AddressLookup<'a> { /// Loaded addresses touched in current block pub loaded: &'a mut AddressTypeToTypeIndexMap, /// Empty addresses touched in current block pub empty: &'a mut AddressTypeToTypeIndexMap, } -impl<'a, F> AddressLookup<'a, F> -where - F: FnMut(OutputType, TypeIndex) -> Option, -{ +impl<'a> AddressLookup<'a> { /// Get or create address data for a receive operation. /// /// Returns (address_data, is_new, from_empty) @@ -61,26 +47,15 @@ where return (data, false, true); } - // Look up from storage or create new - match (self.get_address_data)(output_type, type_index) { - Some(data) => { - let is_new = data.is_new(); - let from_empty = data.is_from_emptyaddressdata(); - let data = entry.insert(data); - (data, is_new, from_empty) - } - None => { - let data = entry.insert(WithAddressDataSource::New( - LoadedAddressData::default(), - )); - (data, true, false) - } - } + // Not found - create new address + let data = + entry.insert(WithAddressDataSource::New(LoadedAddressData::default())); + (data, true, false) } } } - /// Get address data for a send operation (must exist). + /// Get address data for a send operation (must exist in cache). pub fn get_for_send( &mut self, output_type: OutputType, @@ -89,11 +64,8 @@ where self.loaded .get_mut(output_type) .unwrap() - .entry(type_index) - .or_insert_with(|| { - (self.get_address_data)(output_type, type_index) - .expect("Address must exist for send") - }) + .get_mut(&type_index) + .expect("Address must exist for send") } /// Move address from loaded to empty set. diff --git a/crates/brk_computer/src/stateful/process/mod.rs b/crates/brk_computer/src/stateful/process/mod.rs index 343b9185b..c85e84644 100644 --- a/crates/brk_computer/src/stateful/process/mod.rs +++ b/crates/brk_computer/src/stateful/process/mod.rs @@ -1,7 +1,7 @@ -mod address_lookup; -mod empty_addresses; +mod address_updates; +mod cache; mod inputs; -mod loaded_addresses; +mod lookup; mod outputs; mod range_map; mod received; @@ -9,10 +9,10 @@ mod sent; mod tx_counts; mod with_source; -pub use address_lookup::*; -pub use empty_addresses::*; +pub use address_updates::*; +pub use cache::*; pub use inputs::*; -pub use loaded_addresses::*; +pub use lookup::*; pub use outputs::*; pub use range_map::*; pub use received::*; diff --git a/crates/brk_computer/src/stateful/process/outputs.rs b/crates/brk_computer/src/stateful/process/outputs.rs index 69407e575..8ae677361 100644 --- a/crates/brk_computer/src/stateful/process/outputs.rs +++ b/crates/brk_computer/src/stateful/process/outputs.rs @@ -5,23 +5,17 @@ //! - Address data for address cohort tracking (optional) use brk_grouper::ByAddressType; -use brk_types::{ - AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TypeIndex, -}; -use smallvec::SmallVec; +use brk_types::{AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TypeIndex}; use vecdb::GenericStoredVec; use crate::stateful::address::{ AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs, }; use crate::stateful::compute::VecsReaders; -use crate::states::Transacted; +use crate::stateful::states::Transacted; use super::super::address::AddressTypeToVec; -use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource}; - -/// SmallVec for transaction indexes - most addresses have few transactions per block. -pub type TxIndexVec = SmallVec<[TxIndex; 4]>; +use super::{AddressCache, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource}; /// Result of processing outputs for a block. pub struct OutputsResult { @@ -52,8 +46,7 @@ pub fn process_outputs( typeindexes: &[TypeIndex], // Address lookup parameters first_addressindexes: &ByAddressType, - loaded_cache: &AddressTypeToTypeIndexMap, - empty_cache: &AddressTypeToTypeIndexMap, + cache: &AddressCache, vr: &VecsReaders, any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, @@ -90,8 +83,7 @@ pub fn process_outputs( output_type, typeindex, first_addressindexes, - loaded_cache, - empty_cache, + cache, vr, any_address_indexes, addresses_data, @@ -125,8 +117,7 @@ fn get_address_data( address_type: OutputType, typeindex: TypeIndex, first_addressindexes: &ByAddressType, - loaded_cache: &AddressTypeToTypeIndexMap, - empty_cache: &AddressTypeToTypeIndexMap, + cache: &AddressCache, vr: &VecsReaders, any_address_indexes: &AnyAddressIndexesVecs, addresses_data: &AddressesDataVecs, @@ -138,15 +129,7 @@ fn get_address_data( } // Skip if already in cache - if loaded_cache - .get(address_type) - .unwrap() - .contains_key(&typeindex) - || empty_cache - .get(address_type) - .unwrap() - .contains_key(&typeindex) - { + if cache.contains(address_type, typeindex) { return None; } diff --git a/crates/brk_computer/src/stateful/process/range_map.rs b/crates/brk_computer/src/stateful/process/range_map.rs index 88a44ce7c..0d33bf052 100644 --- a/crates/brk_computer/src/stateful/process/range_map.rs +++ b/crates/brk_computer/src/stateful/process/range_map.rs @@ -1,6 +1,6 @@ -//! Main block processing loop. +//! Range-based lookup map. //! -//! Iterates through blocks, processing outputs (receive) and inputs (send) in parallel. +//! Maps ranges of indices to values for efficient reverse lookups. use std::collections::BTreeMap; diff --git a/crates/brk_computer/src/stateful/process/received.rs b/crates/brk_computer/src/stateful/process/received.rs index f7c4a8626..9b9cac28e 100644 --- a/crates/brk_computer/src/stateful/process/received.rs +++ b/crates/brk_computer/src/stateful/process/received.rs @@ -6,11 +6,11 @@ //! - Empty addresses become non-empty again use brk_grouper::{ByAddressType, Filtered}; -use brk_types::{Dollars, OutputType, Sats, TypeIndex}; +use brk_types::{Dollars, Sats, TypeIndex}; use super::super::address::AddressTypeToVec; use super::super::cohorts::AddressCohorts; -use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource}; +use super::lookup::AddressLookup; /// Process received outputs for address cohorts. /// @@ -18,16 +18,14 @@ use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource}; /// 1. Look up or create address data /// 2. Update address balance and cohort membership /// 3. Update cohort states (add/subtract for boundary crossings, receive otherwise) -pub fn process_received( +pub fn process_received( received_data: AddressTypeToVec<(TypeIndex, Sats)>, cohorts: &mut AddressCohorts, - lookup: &mut AddressLookup, + lookup: &mut AddressLookup<'_>, price: Option, addr_count: &mut ByAddressType, empty_addr_count: &mut ByAddressType, -) where - F: FnMut(OutputType, TypeIndex) -> Option, -{ +) { for (output_type, vec) in received_data.unwrap().into_iter() { if vec.is_empty() { continue; diff --git a/crates/brk_computer/src/stateful/process/sent.rs b/crates/brk_computer/src/stateful/process/sent.rs index eba9ce89c..247bca0d1 100644 --- a/crates/brk_computer/src/stateful/process/sent.rs +++ b/crates/brk_computer/src/stateful/process/sent.rs @@ -7,12 +7,12 @@ use brk_error::Result; use brk_grouper::{ByAddressType, Filtered}; -use brk_types::{CheckedSub, Dollars, Height, OutputType, Sats, Timestamp, TypeIndex}; +use brk_types::{CheckedSub, Dollars, Height, Sats, Timestamp, TypeIndex}; use vecdb::VecIndex; use super::super::address::HeightToAddressTypeToVec; use super::super::cohorts::AddressCohorts; -use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource}; +use super::lookup::AddressLookup; /// Process sent outputs for address cohorts. /// @@ -25,10 +25,10 @@ use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource}; /// Note: Takes separate price/timestamp slices instead of chain_state to allow /// parallel execution with UTXO cohort processing (which mutates chain_state). #[allow(clippy::too_many_arguments)] -pub fn process_sent( +pub fn process_sent( sent_data: HeightToAddressTypeToVec<(TypeIndex, Sats)>, cohorts: &mut AddressCohorts, - lookup: &mut AddressLookup, + lookup: &mut AddressLookup<'_>, current_price: Option, addr_count: &mut ByAddressType, empty_addr_count: &mut ByAddressType, @@ -36,10 +36,7 @@ pub fn process_sent( height_to_timestamp: &[Timestamp], current_height: Height, current_timestamp: Timestamp, -) -> Result<()> -where - F: FnMut(OutputType, TypeIndex) -> Option, -{ +) -> Result<()> { for (prev_height, by_type) in sent_data.into_iter() { let prev_price = height_to_price.map(|v| v[prev_height.to_usize()]); let prev_timestamp = height_to_timestamp[prev_height.to_usize()]; diff --git a/crates/brk_computer/src/stateful/process/with_source.rs b/crates/brk_computer/src/stateful/process/with_source.rs index 60b5fa08d..0d8cd64e6 100644 --- a/crates/brk_computer/src/stateful/process/with_source.rs +++ b/crates/brk_computer/src/stateful/process/with_source.rs @@ -1,12 +1,22 @@ -//! Address data wrapper that tracks its source for flush operations. +//! Address data types with source tracking for flush operations. -use brk_types::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex}; +use brk_types::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, TxIndex}; +use smallvec::SmallVec; + +/// Loaded address data with source tracking for flush operations. +pub type LoadedAddressDataWithSource = WithAddressDataSource; + +/// Empty address data with source tracking for flush operations. +pub type EmptyAddressDataWithSource = WithAddressDataSource; + +/// SmallVec for transaction indexes - most addresses have few transactions per block. +pub type TxIndexVec = SmallVec<[TxIndex; 4]>; /// Address data wrapped with its source location for flush operations. /// /// This enum tracks where the data came from so it can be correctly /// updated or created during the flush phase. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum WithAddressDataSource { /// Brand new address (never seen before) New(T), diff --git a/crates/brk_computer/src/stateful/cohorts/state_address.rs b/crates/brk_computer/src/stateful/states/address_cohort.rs similarity index 96% rename from crates/brk_computer/src/stateful/cohorts/state_address.rs rename to crates/brk_computer/src/stateful/states/address_cohort.rs index 2081092fe..dfb2d8d5f 100644 --- a/crates/brk_computer/src/stateful/cohorts/state_address.rs +++ b/crates/brk_computer/src/stateful/states/address_cohort.rs @@ -3,7 +3,7 @@ use std::path::Path; use brk_error::Result; use brk_types::{Dollars, Height, LoadedAddressData, Sats}; -use crate::SupplyState; +use crate::stateful::states::{RealizedState, SupplyState}; use super::CohortState; @@ -24,12 +24,12 @@ impl AddressCohortState { /// Reset state for fresh start. pub fn reset(&mut self) { self.addr_count = 0; - self.inner.supply = crate::SupplyState::default(); + self.inner.supply = SupplyState::default(); self.inner.sent = Sats::ZERO; self.inner.satblocks_destroyed = Sats::ZERO; self.inner.satdays_destroyed = Sats::ZERO; if let Some(realized) = self.inner.realized.as_mut() { - *realized = crate::RealizedState::NAN; + *realized = RealizedState::NAN; } } diff --git a/crates/brk_computer/src/states/block.rs b/crates/brk_computer/src/stateful/states/block.rs similarity index 100% rename from crates/brk_computer/src/states/block.rs rename to crates/brk_computer/src/stateful/states/block.rs diff --git a/crates/brk_computer/src/stateful/cohorts/state.rs b/crates/brk_computer/src/stateful/states/cohort.rs similarity index 98% rename from crates/brk_computer/src/stateful/cohorts/state.rs rename to crates/brk_computer/src/stateful/states/cohort.rs index 9e1ae9878..3388a22b9 100644 --- a/crates/brk_computer/src/stateful/cohorts/state.rs +++ b/crates/brk_computer/src/stateful/states/cohort.rs @@ -7,11 +7,9 @@ use std::path::Path; use brk_error::Result; use brk_types::{Dollars, Height, Sats}; -use crate::{ - CachedUnrealizedState, PriceToAmount, RealizedState, SupplyState, UnrealizedState, - grouped::PERCENTILES_LEN, - utils::OptionExt, -}; +use crate::{grouped::PERCENTILES_LEN, utils::OptionExt}; + +use super::{CachedUnrealizedState, PriceToAmount, RealizedState, SupplyState, UnrealizedState}; /// State tracked for each cohort during computation. #[derive(Clone)] diff --git a/crates/brk_computer/src/states/fenwick.rs b/crates/brk_computer/src/stateful/states/fenwick.rs similarity index 100% rename from crates/brk_computer/src/states/fenwick.rs rename to crates/brk_computer/src/stateful/states/fenwick.rs diff --git a/crates/brk_computer/src/states/mod.rs b/crates/brk_computer/src/stateful/states/mod.rs similarity index 70% rename from crates/brk_computer/src/states/mod.rs rename to crates/brk_computer/src/stateful/states/mod.rs index ec22918fd..4d27cfa7e 100644 --- a/crates/brk_computer/src/states/mod.rs +++ b/crates/brk_computer/src/stateful/states/mod.rs @@ -1,20 +1,22 @@ +mod address_cohort; mod block; -// mod cohorts; +mod cohort; mod fenwick; -mod flushable; mod price_buckets; mod price_to_amount; mod realized; mod supply; mod transacted; mod unrealized; +mod utxo_cohort; +pub use address_cohort::*; pub use block::*; -// pub use cohorts::*; -pub use flushable::*; +pub use cohort::*; pub use price_buckets::*; pub use price_to_amount::*; pub use realized::*; pub use supply::*; pub use transacted::*; pub use unrealized::*; +pub use utxo_cohort::*; diff --git a/crates/brk_computer/src/states/price_buckets.rs b/crates/brk_computer/src/stateful/states/price_buckets.rs similarity index 100% rename from crates/brk_computer/src/states/price_buckets.rs rename to crates/brk_computer/src/stateful/states/price_buckets.rs diff --git a/crates/brk_computer/src/states/price_to_amount.rs b/crates/brk_computer/src/stateful/states/price_to_amount.rs similarity index 98% rename from crates/brk_computer/src/states/price_to_amount.rs rename to crates/brk_computer/src/stateful/states/price_to_amount.rs index e91b5464f..e765fa884 100644 --- a/crates/brk_computer/src/states/price_to_amount.rs +++ b/crates/brk_computer/src/stateful/states/price_to_amount.rs @@ -11,9 +11,9 @@ use pco::standalone::{simple_decompress, simpler_compress}; use serde::{Deserialize, Serialize}; use vecdb::Bytes; -use crate::{grouped::PERCENTILES_LEN, states::SupplyState, utils::OptionExt}; +use crate::{grouped::PERCENTILES_LEN, utils::OptionExt}; -use super::PriceBuckets; +use super::{PriceBuckets, SupplyState}; #[derive(Clone, Debug)] pub struct PriceToAmount { diff --git a/crates/brk_computer/src/states/realized.rs b/crates/brk_computer/src/stateful/states/realized.rs similarity index 100% rename from crates/brk_computer/src/states/realized.rs rename to crates/brk_computer/src/stateful/states/realized.rs diff --git a/crates/brk_computer/src/states/supply.rs b/crates/brk_computer/src/stateful/states/supply.rs similarity index 100% rename from crates/brk_computer/src/states/supply.rs rename to crates/brk_computer/src/stateful/states/supply.rs diff --git a/crates/brk_computer/src/states/transacted.rs b/crates/brk_computer/src/stateful/states/transacted.rs similarity index 100% rename from crates/brk_computer/src/states/transacted.rs rename to crates/brk_computer/src/stateful/states/transacted.rs diff --git a/crates/brk_computer/src/states/unrealized.rs b/crates/brk_computer/src/stateful/states/unrealized.rs similarity index 99% rename from crates/brk_computer/src/states/unrealized.rs rename to crates/brk_computer/src/stateful/states/unrealized.rs index 5e5073444..40daa8fba 100644 --- a/crates/brk_computer/src/states/unrealized.rs +++ b/crates/brk_computer/src/stateful/states/unrealized.rs @@ -3,7 +3,7 @@ use std::ops::Bound; use brk_types::{Dollars, Sats}; use vecdb::CheckedSub; -use crate::PriceToAmount; +use super::PriceToAmount; #[derive(Debug, Default, Clone)] pub struct UnrealizedState { diff --git a/crates/brk_computer/src/stateful/cohorts/state_utxo.rs b/crates/brk_computer/src/stateful/states/utxo_cohort.rs similarity index 92% rename from crates/brk_computer/src/stateful/cohorts/state_utxo.rs rename to crates/brk_computer/src/stateful/states/utxo_cohort.rs index dc3c12766..2b2c3e1e5 100644 --- a/crates/brk_computer/src/stateful/cohorts/state_utxo.rs +++ b/crates/brk_computer/src/stateful/states/utxo_cohort.rs @@ -4,8 +4,7 @@ use brk_error::Result; use brk_types::Sats; use derive_deref::{Deref, DerefMut}; -use super::CohortState; -use crate::{RealizedState, SupplyState}; +use super::{CohortState, RealizedState, SupplyState}; #[derive(Clone, Deref, DerefMut)] pub struct UTXOCohortState(CohortState); diff --git a/crates/brk_computer/src/stateful/vecs.rs b/crates/brk_computer/src/stateful/vecs.rs index 1d067b03b..ab9a5ada6 100644 --- a/crates/brk_computer/src/stateful/vecs.rs +++ b/crates/brk_computer/src/stateful/vecs.rs @@ -3,24 +3,34 @@ use std::path::Path; use brk_error::Result; -use log::info; use brk_indexer::Indexer; use brk_traversable::Traversable; -use brk_types::{Dollars, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, Sats, StoredU64, TxInIndex, TxOutIndex, Version}; +use brk_types::{ + Dollars, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, + Sats, StoredU64, TxInIndex, TxOutIndex, Version, +}; +use log::info; use vecdb::{ - AnyStoredVec, BytesVec, Database, EagerVec, Exit, ImportableVec, IterableCloneableVec, - LazyVecFrom1, PAGE_SIZE, PcoVec, + AnyStoredVec, AnyVec, BytesVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, + IterableCloneableVec, LazyVecFrom1, PAGE_SIZE, PcoVec, Stamp, TypedVecIterator, VecIndex, }; use crate::{ - Indexes, SupplyState, chain, - grouped::{ComputedValueVecsFromHeight, ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source, VecBuilderOptions}, + Indexes, chain, + grouped::{ + ComputedValueVecsFromHeight, ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source, + VecBuilderOptions, + }, indexes, price, + stateful::{ + compute::{StartMode, determine_start_mode, process_blocks, recover_state, reset_state}, + states::BlockState, + }, utils::OptionExt, }; use super::{ - AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, UTXOCohorts, + AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, SupplyState, UTXOCohorts, address::{AddressTypeToHeightToAddressCount, AddressTypeToIndexesToAddressCount}, compute::aggregates, }; @@ -245,12 +255,6 @@ impl Vecs { starting_indexes: &mut Indexes, exit: &Exit, ) -> Result<()> { - use super::compute::{ - StartMode, determine_start_mode, process_blocks, recover_state, reset_state, - }; - use crate::states::BlockState; - use vecdb::{AnyVec, GenericStoredVec, Stamp, TypedVecIterator, VecIndex}; - // 1. Find minimum computed height for recovery let chain_state_height = Height::from(self.chain_state.len()); @@ -345,7 +349,8 @@ impl Vecs { // 2b. Validate computed versions let base_version = VERSION; self.utxo_cohorts.validate_computed_versions(base_version)?; - self.address_cohorts.validate_computed_versions(base_version)?; + self.address_cohorts + .validate_computed_versions(base_version)?; // 3. Get last height from indexer let last_height = Height::from( diff --git a/crates/brk_computer/src/states/flushable.rs b/crates/brk_computer/src/states/flushable.rs deleted file mode 100644 index 9bb339bec..000000000 --- a/crates/brk_computer/src/states/flushable.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! Traits for consistent state flushing and importing. -//! -//! These traits ensure all stateful components follow the same patterns -//! for checkpoint/resume operations, preventing bugs where new fields -//! are forgotten during flush operations. - -use brk_error::Result; -use vecdb::Exit; - -/// Trait for components that can be flushed to disk. -/// -/// This is for simple flush operations that don't require height tracking. -pub trait Flushable { - /// Safely flush data to disk with fsync for durability. - fn safe_flush(&mut self, exit: &Exit) -> Result<()>; -} - -/// Blanket implementation for Option where T: Flushable -impl Flushable for Option { - fn safe_flush(&mut self, exit: &Exit) -> Result<()> { - if let Some(inner) = self.as_mut() { - inner.safe_flush(exit)?; - } - Ok(()) - } -}