computer: stateful snapshot

This commit is contained in:
nym21
2025-12-18 11:37:33 +01:00
parent a76139c0ea
commit edbec6fd5c
2 changed files with 91 additions and 90 deletions
@@ -17,25 +17,31 @@ use brk_types::{DateIndex, Height, OutputType, Sats, TypeIndex};
use log::info;
use vecdb::{Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex};
use crate::stateful::compute::flush::flush;
use crate::states::{BlockState, Transacted};
use crate::utils::OptionExt;
use crate::{chain, indexes, price};
use crate::{
chain, indexes, price,
stateful::{
address::AddressTypeToAddressCount,
compute::flush::{flush, process_address_updates},
process::{
AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource,
build_txoutindex_to_height_map, process_inputs, process_outputs, process_received,
process_sent, update_tx_counts,
},
},
states::{BlockState, Transacted},
utils::OptionExt,
};
use super::super::address::AddressTypeToTypeIndexMap;
use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts};
use super::super::vecs::Vecs;
use super::{
super::{
address::AddressTypeToTypeIndexMap,
cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts},
vecs::Vecs,
},
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators,
TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
};
use crate::stateful::address::AddressTypeToAddressCount;
use crate::stateful::process::{
AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource,
build_txoutindex_to_height_map, process_inputs, process_outputs, process_received,
process_sent, update_tx_counts,
};
/// Process all blocks from starting_height to last_height.
#[allow(clippy::too_many_arguments)]
@@ -488,15 +494,17 @@ pub fn process_blocks(
// Drop readers before flush to release mmap handles
drop(vr);
flush(
vecs,
height,
chain_state,
&mut loaded_cache,
&mut empty_cache,
exit,
// Process address updates (mutations)
process_address_updates(
&mut vecs.addresses_data,
&mut vecs.any_address_indexes,
std::mem::take(&mut empty_cache),
std::mem::take(&mut loaded_cache),
)?;
// Flush to disk (pure I/O)
flush(vecs, height, chain_state, exit)?;
// Recreate readers after flush to pick up new data
vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
}
@@ -505,15 +513,18 @@ pub fn process_blocks(
// Final flush
let _lock = exit.lock();
drop(vr);
flush(
vecs,
last_height,
chain_state,
&mut loaded_cache,
&mut empty_cache,
exit,
// Process address updates (mutations)
process_address_updates(
&mut vecs.addresses_data,
&mut vecs.any_address_indexes,
std::mem::take(&mut empty_cache),
std::mem::take(&mut loaded_cache),
)?;
// Flush to disk (pure I/O)
flush(vecs, last_height, chain_state, exit)?;
Ok(())
}
@@ -1,12 +1,13 @@
//! State flushing logic for checkpoints.
//!
//! Handles periodic flushing of all stateful data to disk,
//! including address data and chain state.
//! Separates processing (mutations) from flushing (I/O):
//! - `process_address_updates`: applies cached address changes to storage
//! - `flush`: writes all data to disk
use std::mem;
use std::time::Instant;
use brk_error::Result;
use brk_types::{AnyAddressIndex, Height};
use brk_types::Height;
use log::info;
use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp};
@@ -23,38 +24,67 @@ use crate::{
use super::super::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs};
/// Apply address index updates to the index storage.
fn apply_address_index_updates(
/// Process address updates from caches.
///
/// Applies all accumulated address changes to storage structures:
/// - Processes empty address transitions
/// - Processes loaded address transitions
/// - Updates address indexes
///
/// Call this before `flush()` to prepare data for writing.
pub fn process_address_updates(
addresses_data: &mut AddressesDataVecs,
address_indexes: &mut AnyAddressIndexesVecs,
updates: AddressTypeToTypeIndexMap<AnyAddressIndex>,
empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
) -> Result<()> {
for (address_type, sorted) in updates.into_sorted_iter() {
let t0 = Instant::now();
// Process address data transitions
let empty_result = process_empty_addresses(addresses_data, empty_updates)?;
let t1 = Instant::now();
let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?;
let t2 = Instant::now();
let all_updates = empty_result.merge(loaded_result);
let t3 = Instant::now();
// Apply index updates
for (address_type, sorted) in all_updates.into_sorted_iter() {
for (typeindex, any_index) in sorted {
address_indexes.update_or_push(address_type, typeindex, any_index)?;
}
}
let t4 = Instant::now();
info!(
"process_address_updates: empty={:?} loaded={:?} merge={:?} indexes={:?} total={:?}",
t1 - t0,
t2 - t1,
t3 - t2,
t4 - t3,
t4 - t0
);
Ok(())
}
/// Flush checkpoint to disk.
/// Flush checkpoint to disk (pure I/O, no processing).
///
/// Flushes all accumulated data including:
/// Writes all accumulated data:
/// - Cohort stateful vectors
/// - Height-indexed vectors
/// - Address data caches (loaded and empty)
/// - Chain state (synced from in-memory to persisted)
#[allow(clippy::too_many_arguments)]
/// - Address indexes and data
/// - Transaction output index mappings
/// - Chain state
pub fn flush(
vecs: &mut Vecs,
height: Height,
chain_state: &[BlockState],
loaded_cache: &mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
exit: &Exit,
) -> Result<()> {
info!("Flushing checkpoint at height {}...", height);
let _lock = exit.lock();
info!("Flushing at height {}...", height);
// Flush cohort states (separate + aggregate)
vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
@@ -68,19 +98,12 @@ pub fn flush(
vecs.addresstype_to_height_to_empty_addr_count
.safe_flush(exit)?;
// Process and flush address data updates
let empty_updates = mem::take(empty_cache);
let loaded_updates = mem::take(loaded_cache);
flush_address_data(
height,
&mut vecs.any_address_indexes,
&mut vecs.addresses_data,
empty_updates,
loaded_updates,
true,
)?;
// Flush address data
let stamp = Stamp::from(height);
vecs.any_address_indexes.flush(stamp, true)?;
vecs.addresses_data.flush(stamp, true)?;
// Flush txoutindex_to_txinindex with stamp
// Flush txoutindex_to_txinindex
vecs.txoutindex_to_txinindex
.stamped_flush_with_changes(height.into())?;
@@ -93,36 +116,3 @@ pub fn flush(
Ok(())
}
/// Flush address data at a checkpoint.
///
/// Note: Cohort states are flushed separately before this is called.
///
/// 1. Process address data updates (empty and loaded)
/// 2. Update address indexes
/// 3. Stamped flush address indexes and data
fn flush_address_data(
height: Height,
address_indexes: &mut AnyAddressIndexesVecs,
addresses_data: &mut AddressesDataVecs,
empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
with_changes: bool,
) -> Result<()> {
info!("Flushing address data at height {}...", height);
// 1. Process address updates - empty first, then loaded
let empty_result = process_empty_addresses(addresses_data, empty_updates)?;
let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?;
let all_updates = empty_result.merge(loaded_result);
// 2. Apply index updates
apply_address_index_updates(address_indexes, all_updates)?;
// 3. Stamped flush
let stamp = Stamp::from(height);
address_indexes.flush(stamp, with_changes)?;
addresses_data.flush(stamp, with_changes)?;
Ok(())
}