computer: stateful snapshot

This commit is contained in:
nym21
2025-12-18 11:18:18 +01:00
parent 59f1296d56
commit a76139c0ea
2 changed files with 86 additions and 108 deletions
@@ -8,16 +8,16 @@
//! 5. Push to height-indexed vectors
//! 6. Periodically flush checkpoints
use std::{mem, thread};
use std::thread;
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_indexer::Indexer;
use brk_types::{DateIndex, Height, OutputType, Sats, TypeIndex};
use log::info;
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex};
use vecdb::{Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex};
use crate::stateful::compute::flush::flush;
use crate::states::{BlockState, Transacted};
use crate::utils::OptionExt;
use crate::{chain, indexes, price};
@@ -29,7 +29,6 @@ use super::{
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators,
TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
flush::flush_checkpoint as flush_checkpoint_full,
};
use crate::stateful::address::AddressTypeToAddressCount;
use crate::stateful::process::{
@@ -489,7 +488,7 @@ pub fn process_blocks(
// Drop readers before flush to release mmap handles
drop(vr);
flush_checkpoint(
flush(
vecs,
height,
chain_state,
@@ -506,7 +505,7 @@ pub fn process_blocks(
// Final flush
let _lock = exit.lock();
drop(vr);
flush_checkpoint(
flush(
vecs,
last_height,
chain_state,
@@ -556,65 +555,3 @@ fn push_cohort_states(
Ok(())
}
/// Flush checkpoint to disk.
///
/// Flushes all accumulated data including:
/// - Cohort stateful vectors
/// - Height-indexed vectors
/// - Address data caches (loaded and empty)
/// - Chain state (synced from in-memory to persisted)
#[allow(clippy::too_many_arguments)]
fn flush_checkpoint(
vecs: &mut Vecs,
height: Height,
chain_state: &[BlockState],
loaded_cache: &mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
exit: &Exit,
) -> Result<()> {
info!("Flushing checkpoint at height {}...", height);
// Flush height-indexed vectors
vecs.height_to_unspendable_supply.safe_write(exit)?;
vecs.height_to_opreturn_supply.safe_write(exit)?;
vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?;
vecs.addresstype_to_height_to_empty_addr_count
.safe_flush(exit)?;
// Process and flush address data updates
let empty_updates = mem::take(empty_cache);
let loaded_updates = mem::take(loaded_cache);
flush_checkpoint_full(
height,
&mut vecs
.utxo_cohorts
.par_iter_separate_mut()
.map(|v| v as &mut dyn DynCohortVecs)
.collect::<Vec<_>>()[..],
&mut vecs
.address_cohorts
.par_iter_separate_mut()
.map(|v| v as &mut dyn DynCohortVecs)
.collect::<Vec<_>>()[..],
&mut vecs.any_address_indexes,
&mut vecs.addresses_data,
empty_updates,
loaded_updates,
true,
exit,
)?;
// Flush txoutindex_to_txinindex with stamp
vecs.txoutindex_to_txinindex
.stamped_flush_with_changes(height.into())?;
// Sync in-memory chain_state to persisted and flush
vecs.chain_state.truncate_if_needed(Height::ZERO)?;
for block_state in chain_state {
vecs.chain_state.push(block_state.supply.clone());
}
vecs.chain_state.stamped_flush_with_changes(height.into())?;
Ok(())
}
@@ -1,36 +1,27 @@
//! State flushing logic for checkpoints.
//!
//! Handles periodic flushing of all stateful data to disk,
//! including cohort states, address data, and chain state.
//! including address data and chain state.
use std::mem;
use brk_error::Result;
use brk_types::{AnyAddressIndex, Height};
use log::info;
use vecdb::{Exit, Stamp};
use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp};
use crate::stateful::process::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses,
process_loaded_addresses,
use crate::{
stateful::{
Vecs,
process::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses,
process_loaded_addresses,
},
},
states::BlockState,
};
use super::super::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs};
use super::super::cohorts::DynCohortVecs;
/// Flush all cohort stateful vectors.
pub fn flush_cohort_states(
height: Height,
utxo_vecs: &mut [&mut dyn DynCohortVecs],
address_vecs: &mut [&mut dyn DynCohortVecs],
exit: &Exit,
) -> Result<()> {
for v in utxo_vecs.iter_mut() {
v.safe_flush_stateful_vecs(height, exit)?;
}
for v in address_vecs.iter_mut() {
v.safe_flush_stateful_vecs(height, exit)?;
}
Ok(())
}
/// Apply address index updates to the index storage.
fn apply_address_index_updates(
@@ -45,40 +36,90 @@ fn apply_address_index_updates(
Ok(())
}
/// Full state flush at a checkpoint.
/// Flush checkpoint to disk.
///
/// This is the main entry point for checkpoint flushing:
/// 1. Flush cohort stateful vectors
/// 2. Process address data updates (empty and loaded)
/// 3. Update address indexes
/// 4. Stamped flush address indexes and data
/// 5. Flush chain state
/// Flushes all accumulated data including:
/// - Cohort stateful vectors
/// - Height-indexed vectors
/// - Address data caches (loaded and empty)
/// - Chain state (synced from in-memory to persisted)
#[allow(clippy::too_many_arguments)]
pub fn flush_checkpoint(
pub fn flush(
vecs: &mut Vecs,
height: Height,
chain_state: &[BlockState],
loaded_cache: &mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
exit: &Exit,
) -> Result<()> {
info!("Flushing checkpoint at height {}...", height);
let _lock = exit.lock();
// Flush cohort states (separate + aggregate)
vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
vecs.address_cohorts
.safe_flush_stateful_vecs(height, exit)?;
// Flush height-indexed vectors
vecs.height_to_unspendable_supply.safe_write(exit)?;
vecs.height_to_opreturn_supply.safe_write(exit)?;
vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?;
vecs.addresstype_to_height_to_empty_addr_count
.safe_flush(exit)?;
// Process and flush address data updates
let empty_updates = mem::take(empty_cache);
let loaded_updates = mem::take(loaded_cache);
flush_address_data(
height,
&mut vecs.any_address_indexes,
&mut vecs.addresses_data,
empty_updates,
loaded_updates,
true,
)?;
// Flush txoutindex_to_txinindex with stamp
vecs.txoutindex_to_txinindex
.stamped_flush_with_changes(height.into())?;
// Sync in-memory chain_state to persisted and flush
vecs.chain_state.truncate_if_needed(Height::ZERO)?;
for block_state in chain_state {
vecs.chain_state.push(block_state.supply.clone());
}
vecs.chain_state.stamped_flush_with_changes(height.into())?;
Ok(())
}
/// Flush address data at a checkpoint.
///
/// Note: Cohort states are flushed separately before this is called.
///
/// 1. Process address data updates (empty and loaded)
/// 2. Update address indexes
/// 3. Stamped flush address indexes and data
fn flush_address_data(
height: Height,
utxo_vecs: &mut [&mut dyn DynCohortVecs],
address_vecs: &mut [&mut dyn DynCohortVecs],
address_indexes: &mut AnyAddressIndexesVecs,
addresses_data: &mut AddressesDataVecs,
empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
with_changes: bool,
exit: &Exit,
) -> Result<()> {
info!("Flushing at height {}...", height);
info!("Flushing address data at height {}...", height);
// 1. Flush cohort states
flush_cohort_states(height, utxo_vecs, address_vecs, exit)?;
// 2. Process address updates - empty first, then loaded
// 1. Process address updates - empty first, then loaded
let empty_result = process_empty_addresses(addresses_data, empty_updates)?;
let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?;
let all_updates = empty_result.merge(loaded_result);
// 3. Apply index updates
// 2. Apply index updates
apply_address_index_updates(address_indexes, all_updates)?;
// 4. Stamped flush
// 3. Stamped flush
let stamp = Stamp::from(height);
address_indexes.flush(stamp, with_changes)?;
addresses_data.flush(stamp, with_changes)?;