diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index ee260f29d..db3bbceb0 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -8,16 +8,16 @@ //! 5. Push to height-indexed vectors //! 6. Periodically flush checkpoints -use std::{mem, thread}; +use std::thread; use brk_error::Result; use brk_grouper::ByAddressType; use brk_indexer::Indexer; use brk_types::{DateIndex, Height, OutputType, Sats, TypeIndex}; use log::info; -use rayon::prelude::*; -use vecdb::{AnyStoredVec, Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex}; +use vecdb::{Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex}; +use crate::stateful::compute::flush::flush; use crate::states::{BlockState, Transacted}; use crate::utils::OptionExt; use crate::{chain, indexes, price}; @@ -29,7 +29,6 @@ use super::{ BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1, BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators, TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex, - flush::flush_checkpoint as flush_checkpoint_full, }; use crate::stateful::address::AddressTypeToAddressCount; use crate::stateful::process::{ @@ -489,7 +488,7 @@ pub fn process_blocks( // Drop readers before flush to release mmap handles drop(vr); - flush_checkpoint( + flush( vecs, height, chain_state, @@ -506,7 +505,7 @@ pub fn process_blocks( // Final flush let _lock = exit.lock(); drop(vr); - flush_checkpoint( + flush( vecs, last_height, chain_state, @@ -556,65 +555,3 @@ fn push_cohort_states( Ok(()) } - -/// Flush checkpoint to disk. -/// -/// Flushes all accumulated data including: -/// - Cohort stateful vectors -/// - Height-indexed vectors -/// - Address data caches (loaded and empty) -/// - Chain state (synced from in-memory to persisted) -#[allow(clippy::too_many_arguments)] -fn flush_checkpoint( - vecs: &mut Vecs, - height: Height, - chain_state: &[BlockState], - loaded_cache: &mut AddressTypeToTypeIndexMap, - empty_cache: &mut AddressTypeToTypeIndexMap, - exit: &Exit, -) -> Result<()> { - info!("Flushing checkpoint at height {}...", height); - - // Flush height-indexed vectors - vecs.height_to_unspendable_supply.safe_write(exit)?; - vecs.height_to_opreturn_supply.safe_write(exit)?; - vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?; - vecs.addresstype_to_height_to_empty_addr_count - .safe_flush(exit)?; - - // Process and flush address data updates - let empty_updates = mem::take(empty_cache); - let loaded_updates = mem::take(loaded_cache); - flush_checkpoint_full( - height, - &mut vecs - .utxo_cohorts - .par_iter_separate_mut() - .map(|v| v as &mut dyn DynCohortVecs) - .collect::>()[..], - &mut vecs - .address_cohorts - .par_iter_separate_mut() - .map(|v| v as &mut dyn DynCohortVecs) - .collect::>()[..], - &mut vecs.any_address_indexes, - &mut vecs.addresses_data, - empty_updates, - loaded_updates, - true, - exit, - )?; - - // Flush txoutindex_to_txinindex with stamp - vecs.txoutindex_to_txinindex - .stamped_flush_with_changes(height.into())?; - - // Sync in-memory chain_state to persisted and flush - vecs.chain_state.truncate_if_needed(Height::ZERO)?; - for block_state in chain_state { - vecs.chain_state.push(block_state.supply.clone()); - } - vecs.chain_state.stamped_flush_with_changes(height.into())?; - - Ok(()) -} diff --git a/crates/brk_computer/src/stateful/compute/flush.rs b/crates/brk_computer/src/stateful/compute/flush.rs index df051c385..890f07683 100644 --- a/crates/brk_computer/src/stateful/compute/flush.rs +++ b/crates/brk_computer/src/stateful/compute/flush.rs @@ -1,36 +1,27 @@ //! State flushing logic for checkpoints. //! //! Handles periodic flushing of all stateful data to disk, -//! including cohort states, address data, and chain state. +//! including address data and chain state. + +use std::mem; use brk_error::Result; use brk_types::{AnyAddressIndex, Height}; use log::info; -use vecdb::{Exit, Stamp}; +use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp}; -use crate::stateful::process::{ - EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses, - process_loaded_addresses, +use crate::{ + stateful::{ + Vecs, + process::{ + EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses, + process_loaded_addresses, + }, + }, + states::BlockState, }; use super::super::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs}; -use super::super::cohorts::DynCohortVecs; - -/// Flush all cohort stateful vectors. -pub fn flush_cohort_states( - height: Height, - utxo_vecs: &mut [&mut dyn DynCohortVecs], - address_vecs: &mut [&mut dyn DynCohortVecs], - exit: &Exit, -) -> Result<()> { - for v in utxo_vecs.iter_mut() { - v.safe_flush_stateful_vecs(height, exit)?; - } - for v in address_vecs.iter_mut() { - v.safe_flush_stateful_vecs(height, exit)?; - } - Ok(()) -} /// Apply address index updates to the index storage. fn apply_address_index_updates( @@ -45,40 +36,90 @@ fn apply_address_index_updates( Ok(()) } -/// Full state flush at a checkpoint. +/// Flush checkpoint to disk. /// -/// This is the main entry point for checkpoint flushing: -/// 1. Flush cohort stateful vectors -/// 2. Process address data updates (empty and loaded) -/// 3. Update address indexes -/// 4. Stamped flush address indexes and data -/// 5. Flush chain state +/// Flushes all accumulated data including: +/// - Cohort stateful vectors +/// - Height-indexed vectors +/// - Address data caches (loaded and empty) +/// - Chain state (synced from in-memory to persisted) #[allow(clippy::too_many_arguments)] -pub fn flush_checkpoint( +pub fn flush( + vecs: &mut Vecs, + height: Height, + chain_state: &[BlockState], + loaded_cache: &mut AddressTypeToTypeIndexMap, + empty_cache: &mut AddressTypeToTypeIndexMap, + exit: &Exit, +) -> Result<()> { + info!("Flushing checkpoint at height {}...", height); + + let _lock = exit.lock(); + + // Flush cohort states (separate + aggregate) + vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; + vecs.address_cohorts + .safe_flush_stateful_vecs(height, exit)?; + + // Flush height-indexed vectors + vecs.height_to_unspendable_supply.safe_write(exit)?; + vecs.height_to_opreturn_supply.safe_write(exit)?; + vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?; + vecs.addresstype_to_height_to_empty_addr_count + .safe_flush(exit)?; + + // Process and flush address data updates + let empty_updates = mem::take(empty_cache); + let loaded_updates = mem::take(loaded_cache); + flush_address_data( + height, + &mut vecs.any_address_indexes, + &mut vecs.addresses_data, + empty_updates, + loaded_updates, + true, + )?; + + // Flush txoutindex_to_txinindex with stamp + vecs.txoutindex_to_txinindex + .stamped_flush_with_changes(height.into())?; + + // Sync in-memory chain_state to persisted and flush + vecs.chain_state.truncate_if_needed(Height::ZERO)?; + for block_state in chain_state { + vecs.chain_state.push(block_state.supply.clone()); + } + vecs.chain_state.stamped_flush_with_changes(height.into())?; + + Ok(()) +} + +/// Flush address data at a checkpoint. +/// +/// Note: Cohort states are flushed separately before this is called. +/// +/// 1. Process address data updates (empty and loaded) +/// 2. Update address indexes +/// 3. Stamped flush address indexes and data +fn flush_address_data( height: Height, - utxo_vecs: &mut [&mut dyn DynCohortVecs], - address_vecs: &mut [&mut dyn DynCohortVecs], address_indexes: &mut AnyAddressIndexesVecs, addresses_data: &mut AddressesDataVecs, empty_updates: AddressTypeToTypeIndexMap, loaded_updates: AddressTypeToTypeIndexMap, with_changes: bool, - exit: &Exit, ) -> Result<()> { - info!("Flushing at height {}...", height); + info!("Flushing address data at height {}...", height); - // 1. Flush cohort states - flush_cohort_states(height, utxo_vecs, address_vecs, exit)?; - - // 2. Process address updates - empty first, then loaded + // 1. Process address updates - empty first, then loaded let empty_result = process_empty_addresses(addresses_data, empty_updates)?; let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?; let all_updates = empty_result.merge(loaded_result); - // 3. Apply index updates + // 2. Apply index updates apply_address_index_updates(address_indexes, all_updates)?; - // 4. Stamped flush + // 3. Stamped flush let stamp = Stamp::from(height); address_indexes.flush(stamp, with_changes)?; addresses_data.flush(stamp, with_changes)?;