diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index db3bbceb0..619f923b6 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -17,25 +17,31 @@ use brk_types::{DateIndex, Height, OutputType, Sats, TypeIndex}; use log::info; use vecdb::{Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex}; -use crate::stateful::compute::flush::flush; -use crate::states::{BlockState, Transacted}; -use crate::utils::OptionExt; -use crate::{chain, indexes, price}; +use crate::{ + chain, indexes, price, + stateful::{ + address::AddressTypeToAddressCount, + compute::flush::{flush, process_address_updates}, + process::{ + AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource, + build_txoutindex_to_height_map, process_inputs, process_outputs, process_received, + process_sent, update_tx_counts, + }, + }, + states::{BlockState, Transacted}, + utils::OptionExt, +}; -use super::super::address::AddressTypeToTypeIndexMap; -use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}; -use super::super::vecs::Vecs; use super::{ + super::{ + address::AddressTypeToTypeIndexMap, + cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}, + vecs::Vecs, + }, BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1, BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators, TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex, }; -use crate::stateful::address::AddressTypeToAddressCount; -use crate::stateful::process::{ - AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource, - build_txoutindex_to_height_map, process_inputs, process_outputs, process_received, - process_sent, update_tx_counts, -}; /// Process all blocks from starting_height to last_height. #[allow(clippy::too_many_arguments)] @@ -488,15 +494,17 @@ pub fn process_blocks( // Drop readers before flush to release mmap handles drop(vr); - flush( - vecs, - height, - chain_state, - &mut loaded_cache, - &mut empty_cache, - exit, + // Process address updates (mutations) + process_address_updates( + &mut vecs.addresses_data, + &mut vecs.any_address_indexes, + std::mem::take(&mut empty_cache), + std::mem::take(&mut loaded_cache), )?; + // Flush to disk (pure I/O) + flush(vecs, height, chain_state, exit)?; + // Recreate readers after flush to pick up new data vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); } @@ -505,15 +513,18 @@ pub fn process_blocks( // Final flush let _lock = exit.lock(); drop(vr); - flush( - vecs, - last_height, - chain_state, - &mut loaded_cache, - &mut empty_cache, - exit, + + // Process address updates (mutations) + process_address_updates( + &mut vecs.addresses_data, + &mut vecs.any_address_indexes, + std::mem::take(&mut empty_cache), + std::mem::take(&mut loaded_cache), )?; + // Flush to disk (pure I/O) + flush(vecs, last_height, chain_state, exit)?; + Ok(()) } diff --git a/crates/brk_computer/src/stateful/compute/flush.rs b/crates/brk_computer/src/stateful/compute/flush.rs index 890f07683..e0f9dc554 100644 --- a/crates/brk_computer/src/stateful/compute/flush.rs +++ b/crates/brk_computer/src/stateful/compute/flush.rs @@ -1,12 +1,13 @@ //! State flushing logic for checkpoints. //! -//! Handles periodic flushing of all stateful data to disk, -//! including address data and chain state. +//! Separates processing (mutations) from flushing (I/O): +//! - `process_address_updates`: applies cached address changes to storage +//! - `flush`: writes all data to disk -use std::mem; +use std::time::Instant; use brk_error::Result; -use brk_types::{AnyAddressIndex, Height}; +use brk_types::Height; use log::info; use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp}; @@ -23,38 +24,67 @@ use crate::{ use super::super::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs}; -/// Apply address index updates to the index storage. -fn apply_address_index_updates( +/// Process address updates from caches. +/// +/// Applies all accumulated address changes to storage structures: +/// - Processes empty address transitions +/// - Processes loaded address transitions +/// - Updates address indexes +/// +/// Call this before `flush()` to prepare data for writing. +pub fn process_address_updates( + addresses_data: &mut AddressesDataVecs, address_indexes: &mut AnyAddressIndexesVecs, - updates: AddressTypeToTypeIndexMap, + empty_updates: AddressTypeToTypeIndexMap, + loaded_updates: AddressTypeToTypeIndexMap, ) -> Result<()> { - for (address_type, sorted) in updates.into_sorted_iter() { + let t0 = Instant::now(); + + // Process address data transitions + let empty_result = process_empty_addresses(addresses_data, empty_updates)?; + let t1 = Instant::now(); + + let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?; + let t2 = Instant::now(); + + let all_updates = empty_result.merge(loaded_result); + let t3 = Instant::now(); + + // Apply index updates + for (address_type, sorted) in all_updates.into_sorted_iter() { for (typeindex, any_index) in sorted { address_indexes.update_or_push(address_type, typeindex, any_index)?; } } + let t4 = Instant::now(); + + info!( + "process_address_updates: empty={:?} loaded={:?} merge={:?} indexes={:?} total={:?}", + t1 - t0, + t2 - t1, + t3 - t2, + t4 - t3, + t4 - t0 + ); + Ok(()) } -/// Flush checkpoint to disk. +/// Flush checkpoint to disk (pure I/O, no processing). /// -/// Flushes all accumulated data including: +/// Writes all accumulated data: /// - Cohort stateful vectors /// - Height-indexed vectors -/// - Address data caches (loaded and empty) -/// - Chain state (synced from in-memory to persisted) -#[allow(clippy::too_many_arguments)] +/// - Address indexes and data +/// - Transaction output index mappings +/// - Chain state pub fn flush( vecs: &mut Vecs, height: Height, chain_state: &[BlockState], - loaded_cache: &mut AddressTypeToTypeIndexMap, - empty_cache: &mut AddressTypeToTypeIndexMap, exit: &Exit, ) -> Result<()> { - info!("Flushing checkpoint at height {}...", height); - - let _lock = exit.lock(); + info!("Flushing at height {}...", height); // Flush cohort states (separate + aggregate) vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; @@ -68,19 +98,12 @@ pub fn flush( vecs.addresstype_to_height_to_empty_addr_count .safe_flush(exit)?; - // Process and flush address data updates - let empty_updates = mem::take(empty_cache); - let loaded_updates = mem::take(loaded_cache); - flush_address_data( - height, - &mut vecs.any_address_indexes, - &mut vecs.addresses_data, - empty_updates, - loaded_updates, - true, - )?; + // Flush address data + let stamp = Stamp::from(height); + vecs.any_address_indexes.flush(stamp, true)?; + vecs.addresses_data.flush(stamp, true)?; - // Flush txoutindex_to_txinindex with stamp + // Flush txoutindex_to_txinindex vecs.txoutindex_to_txinindex .stamped_flush_with_changes(height.into())?; @@ -93,36 +116,3 @@ pub fn flush( Ok(()) } - -/// Flush address data at a checkpoint. -/// -/// Note: Cohort states are flushed separately before this is called. -/// -/// 1. Process address data updates (empty and loaded) -/// 2. Update address indexes -/// 3. Stamped flush address indexes and data -fn flush_address_data( - height: Height, - address_indexes: &mut AnyAddressIndexesVecs, - addresses_data: &mut AddressesDataVecs, - empty_updates: AddressTypeToTypeIndexMap, - loaded_updates: AddressTypeToTypeIndexMap, - with_changes: bool, -) -> Result<()> { - info!("Flushing address data at height {}...", height); - - // 1. Process address updates - empty first, then loaded - let empty_result = process_empty_addresses(addresses_data, empty_updates)?; - let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?; - let all_updates = empty_result.merge(loaded_result); - - // 2. Apply index updates - apply_address_index_updates(address_indexes, all_updates)?; - - // 3. Stamped flush - let stamp = Stamp::from(height); - address_indexes.flush(stamp, with_changes)?; - addresses_data.flush(stamp, with_changes)?; - - Ok(()) -}