computer: stateful snapshot

This commit is contained in:
nym21
2025-12-18 15:32:47 +01:00
parent a86085c2db
commit c5e912593a
51 changed files with 532 additions and 575 deletions

7
Cargo.lock generated
View File

@@ -4193,8 +4193,6 @@ dependencies = [
[[package]]
name = "rawdb"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b50bc582b92390d3efc2dabb509f77c1d8078eaa07c2bb3528bd414c0d337425"
dependencies = [
"libc",
"log",
@@ -5385,8 +5383,6 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vecdb"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09567d773f1f8fd20bfe9822f5c1d6c1bb148f1edc88fe2676ac93b692ad561f"
dependencies = [
"ctrlc",
"log",
@@ -5394,6 +5390,7 @@ dependencies = [
"parking_lot",
"pco",
"rawdb",
"rayon",
"serde",
"serde_json",
"thiserror 2.0.17",
@@ -5405,8 +5402,6 @@ dependencies = [
[[package]]
name = "vecdb_derive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5297336eb5d686ea22acddd2368c1b97d131f035c6d9a736d61087a318546e41"
dependencies = [
"quote",
"syn 2.0.111",

View File

@@ -81,8 +81,8 @@ serde_derive = "1.0.228"
serde_json = { version = "1.0.145", features = ["float_roundtrip"] }
smallvec = "1.15.1"
tokio = { version = "1.48.0", features = ["rt-multi-thread"] }
vecdb = { version = "0.4.2", features = ["derive", "serde_json", "pco"] }
# vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] }
# vecdb = { version = "0.4.2", features = ["derive", "serde_json", "pco"] }
vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] }
# vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] }
[workspace.metadata.release]

View File

@@ -1,15 +0,0 @@
use std::path::Path;
use brk_computer::PriceToAmount;
use brk_error::Result;
use brk_types::Height;
/// Debug utility: loads a persisted `PriceToAmount` state snapshot from disk
/// and dumps it (this file is being deleted in this commit).
pub fn main() -> Result<()> {
    // Resolve ~/.brk/computed/stateful/states.
    // NOTE(review): `HOME` unwrap panics if the env var is unset — acceptable
    // for a throwaway debug binary.
    let path = Path::new(&std::env::var("HOME").unwrap())
        .join(".brk")
        .join("computed/stateful/states");
    // Open the state for one specific address cohort.
    // assumes `create` opens-or-creates the on-disk state — TODO confirm in brk_computer.
    let mut price_to_amount = PriceToAmount::create(&path, "addrs_above_1btc_under_10btc");
    // Import the snapshot at (or before) block height 890000; `dbg!` prints the
    // height actually imported, then the resulting state.
    dbg!(price_to_amount.import_at_or_before(Height::new(890000))?);
    dbg!(price_to_amount);
    Ok(())
}

View File

@@ -3,7 +3,7 @@ use brk_traversable::{Traversable, TreeNode};
use brk_types::{DateIndex, Dollars, Version};
use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec};
use crate::{Indexes, indexes, stateful::Flushable};
use crate::{Indexes, indexes};
use super::{ComputedVecsFromDateIndex, Source, VecBuilderOptions};
@@ -62,11 +62,7 @@ impl PricePercentiles {
Ok(())
}
pub fn compute_rest(
&mut self,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
pub fn compute_rest(&mut self, starting_indexes: &Indexes, exit: &Exit) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {
vec.compute_rest(
starting_indexes,
@@ -85,17 +81,6 @@ impl PricePercentiles {
}
}
impl Flushable for PricePercentiles {
fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {
if let Some(dateindex_vec) = vec.dateindex.as_mut() {
dateindex_vec.safe_flush(exit)?;
}
}
Ok(())
}
}
impl PricePercentiles {
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {

View File

@@ -22,18 +22,12 @@ mod market;
mod pools;
mod price;
mod stateful;
// mod stateful_old;
mod states;
mod traits;
mod utils;
use indexes::Indexes;
use utils::OptionExt;
// pub use pools::*;
pub use states::PriceToAmount;
use states::*;
#[derive(Clone, Traversable)]
pub struct Computer {
pub blks: blks::Vecs,

View File

@@ -61,16 +61,16 @@ impl AddressTypeToHeightToAddressCount {
})?))
}
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
use vecdb::AnyStoredVec;
self.p2pk65.safe_flush(exit)?;
self.p2pk33.safe_flush(exit)?;
self.p2pkh.safe_flush(exit)?;
self.p2sh.safe_flush(exit)?;
self.p2wpkh.safe_flush(exit)?;
self.p2wsh.safe_flush(exit)?;
self.p2tr.safe_flush(exit)?;
self.p2a.safe_flush(exit)?;
self.p2pk65.safe_write(exit)?;
self.p2pk33.safe_write(exit)?;
self.p2pkh.safe_write(exit)?;
self.p2sh.safe_write(exit)?;
self.p2wpkh.safe_write(exit)?;
self.p2wsh.safe_write(exit)?;
self.p2tr.safe_write(exit)?;
self.p2a.safe_write(exit)?;
Ok(())
}

View File

@@ -10,7 +10,7 @@ use smallvec::{Array, SmallVec};
use std::collections::hash_map::Entry;
/// A hashmap for each address type, keyed by TypeIndex.
#[derive(Debug, Deref, DerefMut)]
#[derive(Debug, Clone, Deref, DerefMut)]
pub struct AddressTypeToTypeIndexMap<T>(ByAddressType<FxHashMap<TypeIndex, T>>);
impl<T> Default for AddressTypeToTypeIndexMap<T> {

View File

@@ -15,7 +15,7 @@ use crate::{
Indexes,
grouped::{ComputedVecsFromHeight, Source, VecBuilderOptions},
indexes, price,
stateful::cohorts::AddressCohortState,
stateful::states::AddressCohortState,
};
use super::super::metrics::{CohortMetrics, ImportConfig};
@@ -224,9 +224,9 @@ impl DynCohortVecs for AddressCohortVecs {
Ok(())
}
fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.height_to_addr_count.safe_write(exit)?;
self.metrics.safe_flush(exit)?;
self.metrics.safe_write(exit)?;
if let Some(state) = self.state.as_mut() {
state.inner.commit(height)?;

View File

@@ -222,10 +222,10 @@ impl AddressCohorts {
})
}
/// Flush stateful vectors for separate cohorts.
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
/// Write stateful vectors for separate cohorts.
pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))
.try_for_each(|v| v.safe_write_stateful_vecs(height, exit))
}
/// Get minimum height from all separate cohorts' height-indexed vectors.

View File

@@ -8,19 +8,12 @@
mod address;
mod address_cohorts;
mod state;
mod state_address;
mod state_utxo;
mod traits;
mod utxo;
mod utxo_cohorts;
pub use crate::states::Flushable;
pub use address::AddressCohortVecs;
pub use address_cohorts::AddressCohorts;
pub use state::CohortState;
pub use state_address::AddressCohortState;
pub use state_utxo::UTXOCohortState;
pub use traits::{CohortVecs, DynCohortVecs};
pub use utxo::UTXOCohortVecs;
pub use utxo_cohorts::UTXOCohorts;

View File

@@ -4,7 +4,7 @@ use brk_error::Result;
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
use vecdb::{Exit, IterableVec};
use crate::{indexes, price, Indexes};
use crate::{Indexes, indexes, price};
/// Dynamic dispatch trait for cohort vectors.
///
@@ -34,8 +34,8 @@ pub trait DynCohortVecs: Send + Sync {
date_price: Option<Option<Dollars>>,
) -> Result<()>;
/// Flush stateful vectors to disk.
fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>;
/// Write stateful vectors to disk.
fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>;
/// First phase of post-processing computations.
#[allow(clippy::too_many_arguments)]

View File

@@ -9,9 +9,8 @@ use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
use vecdb::{Database, Exit, IterableVec};
use crate::{
Indexes,
indexes, price,
stateful::{CohortVecs, DynCohortVecs, cohorts::UTXOCohortState},
Indexes, indexes, price,
stateful::{CohortVecs, DynCohortVecs, states::UTXOCohortState},
};
use super::super::metrics::{CohortMetrics, ImportConfig};
@@ -120,14 +119,25 @@ impl DynCohortVecs for UTXOCohortVecs {
prev_height = state.import_at_or_before(prev_height)?;
// Restore supply state from height-indexed vectors
state.supply.value = self.metrics.supply.height_to_supply.read_once(prev_height)?;
state.supply.utxo_count = *self.metrics.supply.height_to_utxo_count.read_once(prev_height)?;
state.supply.value = self
.metrics
.supply
.height_to_supply
.read_once(prev_height)?;
state.supply.utxo_count = *self
.metrics
.supply
.height_to_utxo_count
.read_once(prev_height)?;
// Restore realized cap if present
if let Some(realized_metrics) = self.metrics.realized.as_mut()
&& let Some(realized_state) = state.realized.as_mut() {
realized_state.cap = realized_metrics.height_to_realized_cap.read_once(prev_height)?;
}
&& let Some(realized_state) = state.realized.as_mut()
{
realized_state.cap = realized_metrics
.height_to_realized_cap
.read_once(prev_height)?;
}
let result = prev_height.incremented();
self.state_starting_height = Some(result);
@@ -179,8 +189,8 @@ impl DynCohortVecs for UTXOCohortVecs {
Ok(())
}
fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.metrics.safe_flush(exit)?;
fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.metrics.safe_write(exit)?;
if let Some(state) = self.state.as_mut() {
state.commit(height)?;

View File

@@ -349,16 +349,16 @@ impl UTXOCohorts {
})
}
/// Flush stateful vectors for separate and aggregate cohorts.
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
/// Write stateful vectors for separate and aggregate cohorts.
pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
// Flush separate cohorts (includes metrics + state)
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_flush_stateful_vecs(height, exit))?;
.try_for_each(|v| v.safe_write_stateful_vecs(height, exit))?;
// Flush aggregate cohorts' metrics (including price_percentiles)
// Write aggregate cohorts' metrics (including price_percentiles)
// Note: aggregate cohorts no longer maintain price_to_amount state
for v in self.0.iter_aggregate_mut() {
v.metrics.safe_flush(exit)?;
v.metrics.safe_write(exit)?;
}
Ok(())
}

View File

@@ -3,7 +3,7 @@
use brk_grouper::{Filter, Filtered};
use brk_types::{Dollars, Height};
use crate::states::Transacted;
use crate::stateful::states::Transacted;
use super::UTXOCohorts;

View File

@@ -5,7 +5,10 @@ use brk_types::{CheckedSub, HalvingEpoch, Height};
use rustc_hash::FxHashMap;
use vecdb::VecIndex;
use crate::{states::{BlockState, Transacted}, utils::OptionExt};
use crate::{
stateful::states::{BlockState, Transacted},
utils::OptionExt,
};
use super::UTXOCohorts;

View File

@@ -9,7 +9,7 @@
use brk_grouper::AGE_BOUNDARIES;
use brk_types::{ONE_DAY_IN_SEC, Timestamp};
use crate::states::BlockState;
use crate::stateful::states::BlockState;
use super::UTXOCohorts;

View File

@@ -23,18 +23,16 @@ use crate::{
address::AddressTypeToAddressCount,
compute::write::{process_address_updates, write},
process::{
AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource,
build_txoutindex_to_height_map, process_inputs, process_outputs, process_received,
process_sent, update_tx_counts,
AddressCache, InputsResult, build_txoutindex_to_height_map, process_inputs,
process_outputs, process_received, process_sent,
},
states::{BlockState, Transacted},
},
states::{BlockState, Transacted},
utils::OptionExt,
};
use super::{
super::{
address::AddressTypeToTypeIndexMap,
cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts},
vecs::Vecs,
},
@@ -225,11 +223,7 @@ pub fn process_blocks(
)
};
// Persistent address data caches (accumulate across blocks, flushed at checkpoints)
let mut loaded_cache: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource> =
AddressTypeToTypeIndexMap::default();
let mut empty_cache: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource> =
AddressTypeToTypeIndexMap::default();
let mut cache = AddressCache::new();
info!("Starting main block iteration...");
@@ -304,8 +298,7 @@ pub fn process_blocks(
&output_types,
&output_typeindexes,
&first_addressindexes,
&loaded_cache,
&empty_cache,
&cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
@@ -326,8 +319,7 @@ pub fn process_blocks(
&txoutindex_to_height,
&ir,
&first_addressindexes,
&loaded_cache,
&empty_cache,
&cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
@@ -347,15 +339,15 @@ pub fn process_blocks(
(outputs_result, inputs_result)
});
// Merge new address data into caches
loaded_cache.merge_mut(outputs_result.address_data);
loaded_cache.merge_mut(inputs_result.address_data);
// Merge new address data into current cache
cache.merge_loaded(outputs_result.address_data);
cache.merge_loaded(inputs_result.address_data);
// Combine txindex_vecs from outputs and inputs, then update tx_count
let combined_txindex_vecs = outputs_result
.txindex_vecs
.merge_vec(inputs_result.txindex_vecs);
update_tx_counts(&mut loaded_cache, &mut empty_cache, combined_txindex_vecs);
cache.update_tx_counts(combined_txindex_vecs);
let mut transacted = outputs_result.transacted;
let mut height_to_sent = inputs_result.height_to_sent;
@@ -398,15 +390,7 @@ pub fn process_blocks(
thread::scope(|scope| {
// Spawn address cohort processing in background thread
scope.spawn(|| {
// Create lookup closure that returns None (data was pre-fetched in parallel phase)
let get_address_data =
|_output_type, _type_index| -> Option<LoadedAddressDataWithSource> { None };
let mut lookup = AddressLookup {
get_address_data,
loaded: &mut loaded_cache,
empty: &mut empty_cache,
};
let mut lookup = cache.as_lookup();
// Process received outputs (addresses receiving funds)
process_received(
@@ -491,39 +475,43 @@ pub fn process_blocks(
{
let _lock = exit.lock();
// Drop readers before flush to release mmap handles
// Drop readers to release mmap handles
drop(vr);
let (empty_updates, loaded_updates) = cache.take();
// Process address updates (mutations)
process_address_updates(
&mut vecs.addresses_data,
&mut vecs.any_address_indexes,
std::mem::take(&mut empty_cache),
std::mem::take(&mut loaded_cache),
empty_updates,
loaded_updates,
)?;
// Flush to disk (pure I/O) - no changes saved for periodic flushes
// Write to disk (pure I/O) - no changes saved for periodic flushes
write(vecs, height, chain_state, false, exit)?;
// Recreate readers after flush to pick up new data
// Recreate readers
vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
}
}
// Final flush - always save changes for rollback support
// Final write - always save changes for rollback support
{
let _lock = exit.lock();
drop(vr);
let (empty_updates, loaded_updates) = cache.take();
// Process address updates (mutations)
process_address_updates(
&mut vecs.addresses_data,
&mut vecs.any_address_indexes,
std::mem::take(&mut empty_cache),
std::mem::take(&mut loaded_cache),
empty_updates,
loaded_updates,
)?;
// Flush to disk (pure I/O) - save changes for rollback
// Write to disk (pure I/O) - save changes for rollback
write(vecs, last_height, chain_state, true, exit)?;
}

View File

@@ -4,20 +4,15 @@
//! - `process_address_updates`: applies cached address changes to storage
//! - `flush`: writes all data to disk
use std::time::Instant;
use brk_error::Result;
use brk_types::Height;
use log::info;
use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp};
use crate::{
stateful::{
Vecs,
process::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses,
process_loaded_addresses,
},
use crate::stateful::{
Vecs,
process::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses,
process_loaded_addresses,
},
states::BlockState,
};
@@ -38,34 +33,15 @@ pub fn process_address_updates(
empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
) -> Result<()> {
let t0 = Instant::now();
// Process address data transitions
let empty_result = process_empty_addresses(addresses_data, empty_updates)?;
let t1 = Instant::now();
let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?;
let t2 = Instant::now();
let all_updates = empty_result.merge(loaded_result);
let t3 = Instant::now();
// Apply index updates
for (address_type, sorted) in all_updates.into_sorted_iter() {
for (typeindex, any_index) in sorted {
address_indexes.update_or_push(address_type, typeindex, any_index)?;
}
}
let t4 = Instant::now();
info!(
"process_address_updates: empty={:?} loaded={:?} merge={:?} indexes={:?} total={:?}",
t1 - t0,
t2 - t1,
t3 - t2,
t4 - t3,
t4 - t0
);
Ok(())
}
@@ -87,23 +63,17 @@ pub fn write(
with_changes: bool,
exit: &Exit,
) -> Result<()> {
let t0 = Instant::now();
// Flush cohort states (separate + aggregate)
vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
let t1 = Instant::now();
vecs.utxo_cohorts.safe_write_stateful_vecs(height, exit)?;
vecs.address_cohorts
.safe_flush_stateful_vecs(height, exit)?;
let t2 = Instant::now();
.safe_write_stateful_vecs(height, exit)?;
// Flush height-indexed vectors
vecs.height_to_unspendable_supply.safe_write(exit)?;
vecs.height_to_opreturn_supply.safe_write(exit)?;
vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?;
vecs.addresstype_to_height_to_addr_count.safe_write(exit)?;
vecs.addresstype_to_height_to_empty_addr_count
.safe_flush(exit)?;
let t3 = Instant::now();
.safe_write(exit)?;
// Flush large vecs in parallel
let stamp = Stamp::from(height);
@@ -111,29 +81,16 @@ pub fn write(
let addresses_data = &mut vecs.addresses_data;
let txoutindex_to_txinindex = &mut vecs.txoutindex_to_txinindex;
let ((addr_result, addr_idx_time, addr_data_time), (txout_result, txout_time)) = rayon::join(
let (addr_result, txout_result) = rayon::join(
|| {
let t0 = Instant::now();
let r1 = any_address_indexes.write(stamp, with_changes);
let t1 = Instant::now();
let r2 = addresses_data.write(stamp, with_changes);
let t2 = Instant::now();
let r = r1.and(r2);
(r, t1 - t0, t2 - t1)
},
|| {
let t = Instant::now();
let r = txoutindex_to_txinindex.stamped_write_maybe_with_changes(stamp, with_changes);
(r, t.elapsed())
any_address_indexes
.write(stamp, with_changes)
.and(addresses_data.write(stamp, with_changes))
},
|| txoutindex_to_txinindex.stamped_write_maybe_with_changes(stamp, with_changes),
);
addr_result?;
txout_result?;
let t4 = Instant::now();
info!(
" parallel breakdown: addr_idx={:?} addr_data={:?} txout={:?}",
addr_idx_time, addr_data_time, txout_time
);
// Sync in-memory chain_state to persisted and flush
vecs.chain_state.truncate_if_needed(Height::ZERO)?;
@@ -142,18 +99,6 @@ pub fn write(
}
vecs.chain_state
.stamped_write_maybe_with_changes(stamp, with_changes)?;
let t5 = Instant::now();
info!(
"flush: utxo={:?} addr={:?} height={:?} parallel={:?} chain={:?} total={:?} (with_changes={})",
t1 - t0,
t2 - t1,
t3 - t2,
t4 - t3,
t5 - t4,
t5 - t0,
with_changes
);
Ok(())
}

View File

@@ -113,8 +113,8 @@ impl ActivityMetrics {
Ok(())
}
/// Flush height-indexed vectors to disk.
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_sent.safe_write(exit)?;
self.height_to_satblocks_destroyed.safe_write(exit)?;
self.height_to_satdays_destroyed.safe_write(exit)?;

View File

@@ -30,7 +30,7 @@ use brk_traversable::Traversable;
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
use vecdb::{Exit, IterableVec};
use crate::{Indexes, indexes, price, stateful::cohorts::CohortState};
use crate::{Indexes, indexes, price, stateful::states::CohortState};
/// All metrics for a cohort, organized by category.
#[derive(Clone, Traversable)]
@@ -105,21 +105,21 @@ impl CohortMetrics {
Ok(())
}
/// Flush height-indexed vectors to disk.
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
self.supply.safe_flush(exit)?;
self.activity.safe_flush(exit)?;
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.supply.safe_write(exit)?;
self.activity.safe_write(exit)?;
if let Some(realized) = self.realized.as_mut() {
realized.safe_flush(exit)?;
realized.safe_write(exit)?;
}
if let Some(unrealized) = self.unrealized.as_mut() {
unrealized.safe_flush(exit)?;
unrealized.safe_write(exit)?;
}
if let Some(price_paid) = self.price_paid.as_mut() {
price_paid.safe_flush(exit)?;
price_paid.safe_write(exit)?;
}
Ok(())

View File

@@ -10,8 +10,7 @@ use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVe
use crate::{
Indexes,
grouped::{ComputedVecsFromHeight, PricePercentiles, Source, VecBuilderOptions},
stateful::cohorts::CohortState,
states::Flushable,
stateful::states::CohortState,
};
use super::ImportConfig;
@@ -112,12 +111,12 @@ impl PricePaidMetrics {
Ok(())
}
/// Flush height-indexed vectors to disk.
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_min_price_paid.safe_write(exit)?;
self.height_to_max_price_paid.safe_write(exit)?;
if let Some(price_percentiles) = self.price_percentiles.as_mut() {
price_percentiles.safe_flush(exit)?;
price_percentiles.safe_write(exit)?;
}
Ok(())
}

View File

@@ -14,7 +14,7 @@ use crate::{
VecBuilderOptions,
},
indexes, price,
states::RealizedState,
stateful::states::RealizedState,
utils::OptionExt,
};
@@ -411,11 +411,16 @@ impl RealizedMetrics {
/// Push realized state values to height-indexed vectors.
pub fn truncate_push(&mut self, height: Height, state: &RealizedState) -> Result<()> {
self.height_to_realized_cap.truncate_push(height, state.cap)?;
self.height_to_realized_profit.truncate_push(height, state.profit)?;
self.height_to_realized_loss.truncate_push(height, state.loss)?;
self.height_to_value_created.truncate_push(height, state.value_created)?;
self.height_to_value_destroyed.truncate_push(height, state.value_destroyed)?;
self.height_to_realized_cap
.truncate_push(height, state.cap)?;
self.height_to_realized_profit
.truncate_push(height, state.profit)?;
self.height_to_realized_loss
.truncate_push(height, state.loss)?;
self.height_to_value_created
.truncate_push(height, state.value_created)?;
self.height_to_value_destroyed
.truncate_push(height, state.value_destroyed)?;
if let Some(v) = self.height_to_adjusted_value_created.as_mut() {
v.truncate_push(height, state.adj_value_created)?;
@@ -427,8 +432,8 @@ impl RealizedMetrics {
Ok(())
}
/// Flush height-indexed vectors to disk.
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_realized_cap.safe_write(exit)?;
self.height_to_realized_profit.safe_write(exit)?;
self.height_to_realized_loss.safe_write(exit)?;
@@ -458,55 +463,74 @@ impl RealizedMetrics {
) -> Result<()> {
self.height_to_realized_cap.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_realized_cap).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_realized_cap)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_realized_profit.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_realized_profit).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_realized_profit)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_realized_loss.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_realized_loss).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_realized_loss)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_value_created.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_value_created).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_value_created)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_value_destroyed.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_value_destroyed).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_value_destroyed)
.collect::<Vec<_>>(),
exit,
)?;
if self.height_to_adjusted_value_created.is_some() {
self.height_to_adjusted_value_created.um().compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| {
v.height_to_adjusted_value_created
.as_ref()
.unwrap_or(&v.height_to_value_created)
})
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_adjusted_value_destroyed.um().compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| {
v.height_to_adjusted_value_destroyed
.as_ref()
.unwrap_or(&v.height_to_value_destroyed)
})
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_adjusted_value_created
.um()
.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| {
v.height_to_adjusted_value_created
.as_ref()
.unwrap_or(&v.height_to_value_created)
})
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_adjusted_value_destroyed
.um()
.compute_sum_of_others(
starting_indexes.height,
&others
.iter()
.map(|v| {
v.height_to_adjusted_value_destroyed
.as_ref()
.unwrap_or(&v.height_to_value_destroyed)
})
.collect::<Vec<_>>(),
exit,
)?;
}
Ok(())

View File

@@ -17,7 +17,7 @@ use crate::{
VecBuilderOptions,
},
indexes, price,
states::SupplyState,
stateful::states::SupplyState,
};
use super::ImportConfig;
@@ -130,8 +130,8 @@ impl SupplyMetrics {
Ok(())
}
/// Flush height-indexed vectors to disk.
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_supply.safe_write(exit)?;
self.height_to_utxo_count.safe_write(exit)?;
Ok(())
@@ -152,12 +152,18 @@ impl SupplyMetrics {
) -> Result<()> {
self.height_to_supply.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_supply).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_supply)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_utxo_count.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_utxo_count).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_utxo_count)
.collect::<Vec<_>>(),
exit,
)?;
Ok(())
@@ -244,7 +250,13 @@ impl SupplyMetrics {
dateindex_to_market_cap: Option<&impl IterableVec<DateIndex, Dollars>>,
_exit: &Exit,
) -> Result<()> {
let _ = (indexes, price, height_to_supply, height_to_market_cap, dateindex_to_market_cap);
let _ = (
indexes,
price,
height_to_supply,
height_to_market_cap,
dateindex_to_market_cap,
);
// Supply relative metrics computed here if needed
Ok(())

View File

@@ -5,7 +5,9 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{DateIndex, Dollars, Height, Sats, Version};
use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableCloneableVec, PcoVec};
use vecdb::{
AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableCloneableVec, PcoVec,
};
use crate::{
Indexes,
@@ -13,7 +15,7 @@ use crate::{
ComputedHeightValueVecs, ComputedValueVecsFromDateIndex, ComputedVecsFromDateIndex, Source,
VecBuilderOptions,
},
states::UnrealizedState,
stateful::states::UnrealizedState,
};
use super::ImportConfig;
@@ -216,8 +218,8 @@ impl UnrealizedMetrics {
Ok(())
}
/// Flush height-indexed vectors to disk.
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_supply_in_profit.safe_write(exit)?;
self.height_to_supply_in_loss.safe_write(exit)?;
self.height_to_unrealized_profit.safe_write(exit)?;
@@ -238,42 +240,66 @@ impl UnrealizedMetrics {
) -> Result<()> {
self.height_to_supply_in_profit.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_supply_in_profit).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_supply_in_profit)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_supply_in_loss.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_supply_in_loss).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_supply_in_loss)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_unrealized_profit.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_unrealized_profit).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_unrealized_profit)
.collect::<Vec<_>>(),
exit,
)?;
self.height_to_unrealized_loss.compute_sum_of_others(
starting_indexes.height,
&others.iter().map(|v| &v.height_to_unrealized_loss).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.height_to_unrealized_loss)
.collect::<Vec<_>>(),
exit,
)?;
self.dateindex_to_supply_in_profit.compute_sum_of_others(
starting_indexes.dateindex,
&others.iter().map(|v| &v.dateindex_to_supply_in_profit).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.dateindex_to_supply_in_profit)
.collect::<Vec<_>>(),
exit,
)?;
self.dateindex_to_supply_in_loss.compute_sum_of_others(
starting_indexes.dateindex,
&others.iter().map(|v| &v.dateindex_to_supply_in_loss).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.dateindex_to_supply_in_loss)
.collect::<Vec<_>>(),
exit,
)?;
self.dateindex_to_unrealized_profit.compute_sum_of_others(
starting_indexes.dateindex,
&others.iter().map(|v| &v.dateindex_to_unrealized_profit).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.dateindex_to_unrealized_profit)
.collect::<Vec<_>>(),
exit,
)?;
self.dateindex_to_unrealized_loss.compute_sum_of_others(
starting_indexes.dateindex,
&others.iter().map(|v| &v.dateindex_to_unrealized_loss).collect::<Vec<_>>(),
&others
.iter()
.map(|v| &v.dateindex_to_unrealized_loss)
.collect::<Vec<_>>(),
exit,
)?;
Ok(())

View File

@@ -7,10 +7,12 @@
//!
//! ```text
//! stateful/
//! ├── address/ # Address type handling (indexes, data storage)
//! ├── address/ # Address type collections (type_vec, type_index_map, etc.)
//! ├── cohorts/ # Cohort traits and state management
//! ├── compute/ # Block processing pipeline
//! ├── metrics/ # Metric vectors organized by category
//! ├── compute/ # Block processing loop and I/O
//! ├── metrics/ # Metric vectors organized by category
//! ├── process/ # Transaction processing (inputs, outputs, cache)
//! └── vecs.rs # Main vectors container
//! ```
//!
//! ## Data Flow
@@ -26,20 +28,17 @@ pub mod cohorts;
pub mod compute;
pub mod metrics;
mod process;
mod states;
mod vecs;
use states::*;
pub use vecs::Vecs;
// Address re-exports
pub use address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
};
pub use address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs};
// Cohort re-exports
pub use cohorts::{
AddressCohorts, CohortVecs, DynCohortVecs, Flushable, UTXOCohorts,
};
pub use cohorts::{AddressCohorts, CohortVecs, DynCohortVecs, UTXOCohorts};
// Compute re-exports
pub use compute::IndexerReaders;

View File

@@ -0,0 +1,127 @@
//! Address data update processing for flush operations.
//!
//! Handles transitions between loaded (non-zero balance) and empty (zero balance) states:
//! - New addresses: push to storage
//! - Updated addresses: update in place
//! - State transitions: delete from source, push to destination
use brk_error::Result;
use brk_types::{
AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex,
OutputType, TypeIndex,
};
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource};
use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs};
/// Process loaded address data updates.
///
/// Handles:
/// - New loaded address: push to loaded storage
/// - Updated loaded address (was loaded): update in place
/// - Transition empty -> loaded: delete from empty, push to loaded
///
/// Returns, per address type, the `TypeIndex -> AnyAddressIndex` mapping for
/// every address that received a NEW storage slot (pushed entries only —
/// in-place updates keep their existing index and are not reported).
///
/// # Errors
/// Propagates errors from the underlying storage `update` /
/// `fill_first_hole_or_push` calls.
pub fn process_loaded_addresses(
    addresses_data: &mut AddressesDataVecs,
    loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
) -> Result<AddressTypeToTypeIndexMap<AnyAddressIndex>> {
    // Upper bound on each bucket's size: total entries across all address types.
    // Over-reserves per Vec, but avoids repeated reallocation in one pass.
    let total: usize = loaded_updates.iter().map(|(_, m)| m.len()).sum();
    let mut updates: Vec<(LoadedAddressIndex, LoadedAddressData)> = Vec::with_capacity(total);
    let mut deletes: Vec<EmptyAddressIndex> = Vec::with_capacity(total);
    let mut pushes: Vec<(OutputType, TypeIndex, LoadedAddressData)> = Vec::with_capacity(total);
    // Bucket every update by the operation it requires on storage.
    for (address_type, items) in loaded_updates.into_iter() {
        for (typeindex, source) in items {
            match source {
                // Brand-new address: needs a fresh slot in loaded storage.
                LoadedAddressDataWithSource::New(data) => {
                    pushes.push((address_type, typeindex, data));
                }
                // Already loaded: overwrite its existing slot.
                LoadedAddressDataWithSource::FromLoaded(index, data) => {
                    updates.push((index, data));
                }
                // Was empty, now has balance: free the empty slot, push to loaded.
                LoadedAddressDataWithSource::FromEmpty(empty_index, data) => {
                    deletes.push(empty_index);
                    pushes.push((address_type, typeindex, data));
                }
            }
        }
    }
    // Phase 1: Deletes (creates holes)
    for empty_index in deletes {
        addresses_data.empty.delete(empty_index);
    }
    // Phase 2: Updates (in-place)
    for (index, data) in updates {
        addresses_data.loaded.update(index, data)?;
    }
    // Phase 3: Pushes (fills holes, then grows)
    // Deletes run first so `fill_first_hole_or_push` can reuse freed slots
    // before growing storage — presumably keeps the loaded vec compact; verify
    // against AddressesDataVecs semantics.
    let mut result = AddressTypeToTypeIndexMap::default();
    for (address_type, typeindex, data) in pushes {
        let index = addresses_data.loaded.fill_first_hole_or_push(data)?;
        result
            .get_mut(address_type)
            .unwrap()
            .insert(typeindex, AnyAddressIndex::from(index));
    }
    Ok(result)
}
/// Process empty address data updates.
///
/// Handles:
/// - New empty address: push to empty storage
/// - Updated empty address (was empty): update in place
/// - Transition loaded -> empty: delete from loaded, push to empty
///
/// # Errors
///
/// Propagates storage errors from `update` and `fill_first_hole_or_push`.
pub fn process_empty_addresses(
    addresses_data: &mut AddressesDataVecs,
    empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
) -> Result<AddressTypeToTypeIndexMap<AnyAddressIndex>> {
    let capacity: usize = empty_updates.iter().map(|(_, m)| m.len()).sum();

    // Bucket every incoming record by the storage operation it requires.
    let mut in_place: Vec<(EmptyAddressIndex, EmptyAddressData)> = Vec::with_capacity(capacity);
    let mut loaded_deletes: Vec<LoadedAddressIndex> = Vec::with_capacity(capacity);
    let mut new_entries: Vec<(OutputType, TypeIndex, EmptyAddressData)> =
        Vec::with_capacity(capacity);

    for (address_type, items) in empty_updates.into_iter() {
        for (typeindex, source) in items {
            match source {
                // Already in the empty vec: overwrite in place.
                EmptyAddressDataWithSource::FromEmpty(index, data) => {
                    in_place.push((index, data));
                }
                // Was loaded: remove from the loaded vec, re-insert into empty.
                EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => {
                    loaded_deletes.push(loaded_index);
                    new_entries.push((address_type, typeindex, data));
                }
                // Never stored before: plain insert into empty.
                EmptyAddressDataWithSource::New(data) => {
                    new_entries.push((address_type, typeindex, data));
                }
            }
        }
    }

    // Phase 1: drop entries that migrated out of the loaded storage.
    for loaded_index in loaded_deletes {
        addresses_data.loaded.delete(loaded_index);
    }

    // Phase 2: in-place updates of entries already in empty storage.
    for (index, data) in in_place {
        addresses_data.empty.update(index, data)?;
    }

    // Phase 3: inserts reuse existing holes in empty storage before growing it.
    let mut result = AddressTypeToTypeIndexMap::default();
    for (address_type, typeindex, data) in new_entries {
        let index = addresses_data.empty.fill_first_hole_or_push(data)?;
        result
            .get_mut(address_type)
            .unwrap()
            .insert(typeindex, AnyAddressIndex::from(index));
    }

    Ok(result)
}

View File

@@ -0,0 +1,77 @@
//! Address data cache for flush intervals.
//!
//! Accumulates address data across blocks within a flush interval.
//! Data is flushed to disk at checkpoints.
use brk_types::{OutputType, TypeIndex};
use super::super::address::AddressTypeToTypeIndexMap;
use super::{AddressLookup, EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec};
/// Cache for address data within a flush interval.
///
/// Accumulates address data across blocks; contents are drained at flush
/// checkpoints via `take`.
pub struct AddressCache {
    /// Addresses with non-zero balance
    loaded: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
    /// Addresses that became empty (zero balance)
    empty: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
}
impl Default for AddressCache {
    // Delegates to `new` so both construction paths stay in sync.
    fn default() -> Self {
        Self::new()
    }
}
impl AddressCache {
    /// Create a cache with empty `loaded` and `empty` maps.
    pub fn new() -> Self {
        Self {
            loaded: AddressTypeToTypeIndexMap::default(),
            empty: AddressTypeToTypeIndexMap::default(),
        }
    }

    /// Check if address is in cache (either loaded or empty).
    #[inline]
    pub fn contains(&self, address_type: OutputType, typeindex: TypeIndex) -> bool {
        let in_loaded = self
            .loaded
            .get(address_type)
            .is_some_and(|m| m.contains_key(&typeindex));
        let in_empty = self
            .empty
            .get(address_type)
            .is_some_and(|m| m.contains_key(&typeindex));
        in_loaded || in_empty
    }

    /// Merge address data into loaded cache.
    #[inline]
    pub fn merge_loaded(&mut self, data: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>) {
        self.loaded.merge_mut(data);
    }

    /// Create an AddressLookup view into this cache.
    #[inline]
    pub fn as_lookup(&mut self) -> AddressLookup<'_> {
        let Self { loaded, empty } = self;
        AddressLookup { loaded, empty }
    }

    /// Update transaction counts for addresses.
    pub fn update_tx_counts(&mut self, txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>) {
        super::update_tx_counts(&mut self.loaded, &mut self.empty, txindex_vecs);
    }

    /// Take the cache contents for flushing, leaving empty caches.
    ///
    /// Returns `(empty, loaded)` in that order.
    pub fn take(
        &mut self,
    ) -> (
        AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
        AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
    ) {
        let drained_empty = std::mem::take(&mut self.empty);
        let drained_loaded = std::mem::take(&mut self.loaded);
        (drained_empty, drained_loaded)
    }
}

View File

@@ -1,65 +0,0 @@
use brk_error::Result;
use brk_types::{AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, LoadedAddressIndex, OutputType, TypeIndex};
use super::EmptyAddressDataWithSource;
use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs};
/// Process empty address data updates.
///
/// Handles three cases:
/// - New empty address: push to empty storage
/// - Updated empty address (was empty): update in place
/// - Transition loaded -> empty: delete from loaded, push to empty
///
/// Optimized to batch operations: deletes first (creates holes), then updates, then pushes.
///
/// # Errors
///
/// Propagates storage errors from `update` and `fill_first_hole_or_push`.
pub fn process_empty_addresses(
    addresses_data: &mut AddressesDataVecs,
    empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
) -> Result<AddressTypeToTypeIndexMap<AnyAddressIndex>> {
    // Estimate capacity from input size
    let total: usize = empty_updates.iter().map(|(_, m)| m.len()).sum();
    // Collect operations by type (no sorting needed)
    let mut updates: Vec<(EmptyAddressIndex, EmptyAddressData)> = Vec::with_capacity(total);
    let mut deletes: Vec<LoadedAddressIndex> = Vec::with_capacity(total);
    let mut pushes: Vec<(OutputType, TypeIndex, EmptyAddressData)> = Vec::with_capacity(total);
    for (address_type, items) in empty_updates.into_iter() {
        for (typeindex, source) in items {
            match source {
                // Never stored before: will be appended/hole-filled in the empty vec.
                EmptyAddressDataWithSource::New(data) => {
                    pushes.push((address_type, typeindex, data));
                }
                // Already in the empty vec: overwrite in place.
                EmptyAddressDataWithSource::FromEmpty(index, data) => {
                    updates.push((index, data));
                }
                // Was loaded: remove from loaded vec, re-add to empty vec.
                EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => {
                    deletes.push(loaded_index);
                    pushes.push((address_type, typeindex, data));
                }
            }
        }
    }
    // Phase 1: All deletes (creates holes in loaded vec)
    for loaded_index in deletes {
        addresses_data.loaded.delete(loaded_index);
    }
    // Phase 2: All updates (in-place in empty vec)
    for (index, data) in updates {
        addresses_data.empty.update(index, data)?;
    }
    // Phase 3: All pushes (fills holes in empty vec, then grows)
    let mut result = AddressTypeToTypeIndexMap::default();
    for (address_type, typeindex, data) in pushes {
        let index = addresses_data.empty.fill_first_hole_or_push(data)?;
        result
            .get_mut(address_type)
            .unwrap()
            .insert(typeindex, AnyAddressIndex::from(index));
    }
    Ok(result)
}

View File

@@ -17,15 +17,13 @@ use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
};
use crate::stateful::compute::VecsReaders;
use crate::{
stateful::{IndexerReaders, process::RangeMap},
states::Transacted,
};
use crate::stateful::states::Transacted;
use super::AddressCache;
use crate::stateful::{IndexerReaders, process::RangeMap};
use super::super::address::HeightToAddressTypeToVec;
use super::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource,
};
use super::{LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource};
/// Result of processing inputs for a block.
pub struct InputsResult {
@@ -69,8 +67,7 @@ pub fn process_inputs(
ir: &IndexerReaders,
// Address lookup parameters
first_addressindexes: &ByAddressType<TypeIndex>,
loaded_cache: &AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
cache: &AddressCache,
vr: &VecsReaders,
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
@@ -109,8 +106,7 @@ pub fn process_inputs(
input_type,
typeindex,
first_addressindexes,
loaded_cache,
empty_cache,
cache,
vr,
any_address_indexes,
addresses_data,
@@ -188,8 +184,7 @@ fn get_address_data(
address_type: OutputType,
typeindex: TypeIndex,
first_addressindexes: &ByAddressType<TypeIndex>,
loaded_cache: &AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
cache: &AddressCache,
vr: &VecsReaders,
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
@@ -201,15 +196,7 @@ fn get_address_data(
}
// Skip if already in cache
if loaded_cache
.get(address_type)
.unwrap()
.contains_key(&typeindex)
|| empty_cache
.get(address_type)
.unwrap()
.contains_key(&typeindex)
{
if cache.contains(address_type, typeindex) {
return None;
}

View File

@@ -1,65 +0,0 @@
use brk_error::Result;
use brk_types::{AnyAddressIndex, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, OutputType, TypeIndex};
use super::LoadedAddressDataWithSource;
use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs};
/// Process loaded address data updates.
///
/// Handles three cases:
/// - New loaded address: push to loaded storage
/// - Updated loaded address (was loaded): update in place
/// - Transition empty -> loaded: delete from empty, push to loaded
///
/// Optimized to batch operations: deletes first (creates holes), then updates, then pushes.
///
/// # Errors
///
/// Propagates storage errors from `update` and `fill_first_hole_or_push`.
pub fn process_loaded_addresses(
    addresses_data: &mut AddressesDataVecs,
    loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
) -> Result<AddressTypeToTypeIndexMap<AnyAddressIndex>> {
    // Estimate capacity from input size
    let total: usize = loaded_updates.iter().map(|(_, m)| m.len()).sum();
    // Collect operations by type (no sorting needed)
    let mut updates: Vec<(LoadedAddressIndex, LoadedAddressData)> = Vec::with_capacity(total);
    let mut deletes: Vec<EmptyAddressIndex> = Vec::with_capacity(total);
    let mut pushes: Vec<(OutputType, TypeIndex, LoadedAddressData)> = Vec::with_capacity(total);
    for (address_type, items) in loaded_updates.into_iter() {
        for (typeindex, source) in items {
            match source {
                // Never stored before: will be appended/hole-filled in the loaded vec.
                LoadedAddressDataWithSource::New(data) => {
                    pushes.push((address_type, typeindex, data));
                }
                // Already in the loaded vec: overwrite in place.
                LoadedAddressDataWithSource::FromLoaded(index, data) => {
                    updates.push((index, data));
                }
                // Was empty: remove from empty vec, re-add to loaded vec.
                LoadedAddressDataWithSource::FromEmpty(empty_index, data) => {
                    deletes.push(empty_index);
                    pushes.push((address_type, typeindex, data));
                }
            }
        }
    }
    // Phase 1: All deletes (creates holes in empty vec)
    for empty_index in deletes {
        addresses_data.empty.delete(empty_index);
    }
    // Phase 2: All updates (in-place in loaded vec)
    for (index, data) in updates {
        addresses_data.loaded.update(index, data)?;
    }
    // Phase 3: All pushes (fills holes in loaded vec, then grows)
    let mut result = AddressTypeToTypeIndexMap::default();
    for (address_type, typeindex, data) in pushes {
        let index = addresses_data.loaded.fill_first_hole_or_push(data)?;
        result
            .get_mut(address_type)
            .unwrap()
            .insert(typeindex, AnyAddressIndex::from(index));
    }
    Ok(result)
}

View File

@@ -1,37 +1,23 @@
//! Address data lookup and source tracking.
//!
//! Handles looking up existing address data from storage and tracking
//! whether addresses are new, from storage, or previously empty.
//! Address data lookup during block processing.
use brk_types::{EmptyAddressData, LoadedAddressData, OutputType, TypeIndex};
use brk_types::{LoadedAddressData, OutputType, TypeIndex};
use super::super::address::AddressTypeToTypeIndexMap;
pub use super::WithAddressDataSource;
/// Loaded address data with source tracking for flush operations.
pub type LoadedAddressDataWithSource = WithAddressDataSource<LoadedAddressData>;
/// Empty address data with source tracking for flush operations.
pub type EmptyAddressDataWithSource = WithAddressDataSource<EmptyAddressData>;
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource};
/// Context for looking up and storing address data during block processing.
///
/// Uses the same pattern as the original stateful module:
/// All addresses should be pre-fetched into the cache before using this.
/// - `loaded`: addresses with non-zero balance (wrapped with source info)
/// - `empty`: addresses that became empty this block (wrapped with source info)
pub struct AddressLookup<'a, F> {
/// Function to get existing address data from storage
pub get_address_data: F,
pub struct AddressLookup<'a> {
/// Loaded addresses touched in current block
pub loaded: &'a mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
/// Empty addresses touched in current block
pub empty: &'a mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
}
impl<'a, F> AddressLookup<'a, F>
where
F: FnMut(OutputType, TypeIndex) -> Option<LoadedAddressDataWithSource>,
{
impl<'a> AddressLookup<'a> {
/// Get or create address data for a receive operation.
///
/// Returns (address_data, is_new, from_empty)
@@ -61,26 +47,15 @@ where
return (data, false, true);
}
// Look up from storage or create new
match (self.get_address_data)(output_type, type_index) {
Some(data) => {
let is_new = data.is_new();
let from_empty = data.is_from_emptyaddressdata();
let data = entry.insert(data);
(data, is_new, from_empty)
}
None => {
let data = entry.insert(WithAddressDataSource::New(
LoadedAddressData::default(),
));
(data, true, false)
}
}
// Not found - create new address
let data =
entry.insert(WithAddressDataSource::New(LoadedAddressData::default()));
(data, true, false)
}
}
}
/// Get address data for a send operation (must exist).
/// Get address data for a send operation (must exist in cache).
pub fn get_for_send(
&mut self,
output_type: OutputType,
@@ -89,11 +64,8 @@ where
self.loaded
.get_mut(output_type)
.unwrap()
.entry(type_index)
.or_insert_with(|| {
(self.get_address_data)(output_type, type_index)
.expect("Address must exist for send")
})
.get_mut(&type_index)
.expect("Address must exist for send")
}
/// Move address from loaded to empty set.

View File

@@ -1,7 +1,7 @@
mod address_lookup;
mod empty_addresses;
mod address_updates;
mod cache;
mod inputs;
mod loaded_addresses;
mod lookup;
mod outputs;
mod range_map;
mod received;
@@ -9,10 +9,10 @@ mod sent;
mod tx_counts;
mod with_source;
pub use address_lookup::*;
pub use empty_addresses::*;
pub use address_updates::*;
pub use cache::*;
pub use inputs::*;
pub use loaded_addresses::*;
pub use lookup::*;
pub use outputs::*;
pub use range_map::*;
pub use received::*;

View File

@@ -5,23 +5,17 @@
//! - Address data for address cohort tracking (optional)
use brk_grouper::ByAddressType;
use brk_types::{
AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TypeIndex,
};
use smallvec::SmallVec;
use brk_types::{AnyAddressDataIndexEnum, LoadedAddressData, OutputType, Sats, TxIndex, TypeIndex};
use vecdb::GenericStoredVec;
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
};
use crate::stateful::compute::VecsReaders;
use crate::states::Transacted;
use crate::stateful::states::Transacted;
use super::super::address::AddressTypeToVec;
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource};
/// SmallVec for transaction indexes - most addresses have few transactions per block.
pub type TxIndexVec = SmallVec<[TxIndex; 4]>;
use super::{AddressCache, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource};
/// Result of processing outputs for a block.
pub struct OutputsResult {
@@ -52,8 +46,7 @@ pub fn process_outputs(
typeindexes: &[TypeIndex],
// Address lookup parameters
first_addressindexes: &ByAddressType<TypeIndex>,
loaded_cache: &AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
cache: &AddressCache,
vr: &VecsReaders,
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
@@ -90,8 +83,7 @@ pub fn process_outputs(
output_type,
typeindex,
first_addressindexes,
loaded_cache,
empty_cache,
cache,
vr,
any_address_indexes,
addresses_data,
@@ -125,8 +117,7 @@ fn get_address_data(
address_type: OutputType,
typeindex: TypeIndex,
first_addressindexes: &ByAddressType<TypeIndex>,
loaded_cache: &AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
empty_cache: &AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
cache: &AddressCache,
vr: &VecsReaders,
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
@@ -138,15 +129,7 @@ fn get_address_data(
}
// Skip if already in cache
if loaded_cache
.get(address_type)
.unwrap()
.contains_key(&typeindex)
|| empty_cache
.get(address_type)
.unwrap()
.contains_key(&typeindex)
{
if cache.contains(address_type, typeindex) {
return None;
}

View File

@@ -1,6 +1,6 @@
//! Main block processing loop.
//! Range-based lookup map.
//!
//! Iterates through blocks, processing outputs (receive) and inputs (send) in parallel.
//! Maps ranges of indices to values for efficient reverse lookups.
use std::collections::BTreeMap;

View File

@@ -6,11 +6,11 @@
//! - Empty addresses become non-empty again
use brk_grouper::{ByAddressType, Filtered};
use brk_types::{Dollars, OutputType, Sats, TypeIndex};
use brk_types::{Dollars, Sats, TypeIndex};
use super::super::address::AddressTypeToVec;
use super::super::cohorts::AddressCohorts;
use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource};
use super::lookup::AddressLookup;
/// Process received outputs for address cohorts.
///
@@ -18,16 +18,14 @@ use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource};
/// 1. Look up or create address data
/// 2. Update address balance and cohort membership
/// 3. Update cohort states (add/subtract for boundary crossings, receive otherwise)
pub fn process_received<F>(
pub fn process_received(
received_data: AddressTypeToVec<(TypeIndex, Sats)>,
cohorts: &mut AddressCohorts,
lookup: &mut AddressLookup<F>,
lookup: &mut AddressLookup<'_>,
price: Option<Dollars>,
addr_count: &mut ByAddressType<u64>,
empty_addr_count: &mut ByAddressType<u64>,
) where
F: FnMut(OutputType, TypeIndex) -> Option<LoadedAddressDataWithSource>,
{
) {
for (output_type, vec) in received_data.unwrap().into_iter() {
if vec.is_empty() {
continue;

View File

@@ -7,12 +7,12 @@
use brk_error::Result;
use brk_grouper::{ByAddressType, Filtered};
use brk_types::{CheckedSub, Dollars, Height, OutputType, Sats, Timestamp, TypeIndex};
use brk_types::{CheckedSub, Dollars, Height, Sats, Timestamp, TypeIndex};
use vecdb::VecIndex;
use super::super::address::HeightToAddressTypeToVec;
use super::super::cohorts::AddressCohorts;
use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource};
use super::lookup::AddressLookup;
/// Process sent outputs for address cohorts.
///
@@ -25,10 +25,10 @@ use super::address_lookup::{AddressLookup, LoadedAddressDataWithSource};
/// Note: Takes separate price/timestamp slices instead of chain_state to allow
/// parallel execution with UTXO cohort processing (which mutates chain_state).
#[allow(clippy::too_many_arguments)]
pub fn process_sent<F>(
pub fn process_sent(
sent_data: HeightToAddressTypeToVec<(TypeIndex, Sats)>,
cohorts: &mut AddressCohorts,
lookup: &mut AddressLookup<F>,
lookup: &mut AddressLookup<'_>,
current_price: Option<Dollars>,
addr_count: &mut ByAddressType<u64>,
empty_addr_count: &mut ByAddressType<u64>,
@@ -36,10 +36,7 @@ pub fn process_sent<F>(
height_to_timestamp: &[Timestamp],
current_height: Height,
current_timestamp: Timestamp,
) -> Result<()>
where
F: FnMut(OutputType, TypeIndex) -> Option<LoadedAddressDataWithSource>,
{
) -> Result<()> {
for (prev_height, by_type) in sent_data.into_iter() {
let prev_price = height_to_price.map(|v| v[prev_height.to_usize()]);
let prev_timestamp = height_to_timestamp[prev_height.to_usize()];

View File

@@ -1,12 +1,22 @@
//! Address data wrapper that tracks its source for flush operations.
//! Address data types with source tracking for flush operations.
use brk_types::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex};
use brk_types::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, TxIndex};
use smallvec::SmallVec;
/// Loaded address data with source tracking for flush operations.
pub type LoadedAddressDataWithSource = WithAddressDataSource<LoadedAddressData>;
/// Empty address data with source tracking for flush operations.
pub type EmptyAddressDataWithSource = WithAddressDataSource<EmptyAddressData>;
/// SmallVec for transaction indexes - most addresses have few transactions per block.
pub type TxIndexVec = SmallVec<[TxIndex; 4]>;
/// Address data wrapped with its source location for flush operations.
///
/// This enum tracks where the data came from so it can be correctly
/// updated or created during the flush phase.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum WithAddressDataSource<T> {
/// Brand new address (never seen before)
New(T),

View File

@@ -3,7 +3,7 @@ use std::path::Path;
use brk_error::Result;
use brk_types::{Dollars, Height, LoadedAddressData, Sats};
use crate::SupplyState;
use crate::stateful::states::{RealizedState, SupplyState};
use super::CohortState;
@@ -24,12 +24,12 @@ impl AddressCohortState {
/// Reset state for fresh start.
pub fn reset(&mut self) {
self.addr_count = 0;
self.inner.supply = crate::SupplyState::default();
self.inner.supply = SupplyState::default();
self.inner.sent = Sats::ZERO;
self.inner.satblocks_destroyed = Sats::ZERO;
self.inner.satdays_destroyed = Sats::ZERO;
if let Some(realized) = self.inner.realized.as_mut() {
*realized = crate::RealizedState::NAN;
*realized = RealizedState::NAN;
}
}

View File

@@ -7,11 +7,9 @@ use std::path::Path;
use brk_error::Result;
use brk_types::{Dollars, Height, Sats};
use crate::{
CachedUnrealizedState, PriceToAmount, RealizedState, SupplyState, UnrealizedState,
grouped::PERCENTILES_LEN,
utils::OptionExt,
};
use crate::{grouped::PERCENTILES_LEN, utils::OptionExt};
use super::{CachedUnrealizedState, PriceToAmount, RealizedState, SupplyState, UnrealizedState};
/// State tracked for each cohort during computation.
#[derive(Clone)]

View File

@@ -1,20 +1,22 @@
mod address_cohort;
mod block;
// mod cohorts;
mod cohort;
mod fenwick;
mod flushable;
mod price_buckets;
mod price_to_amount;
mod realized;
mod supply;
mod transacted;
mod unrealized;
mod utxo_cohort;
pub use address_cohort::*;
pub use block::*;
// pub use cohorts::*;
pub use flushable::*;
pub use cohort::*;
pub use price_buckets::*;
pub use price_to_amount::*;
pub use realized::*;
pub use supply::*;
pub use transacted::*;
pub use unrealized::*;
pub use utxo_cohort::*;

View File

@@ -11,9 +11,9 @@ use pco::standalone::{simple_decompress, simpler_compress};
use serde::{Deserialize, Serialize};
use vecdb::Bytes;
use crate::{grouped::PERCENTILES_LEN, states::SupplyState, utils::OptionExt};
use crate::{grouped::PERCENTILES_LEN, utils::OptionExt};
use super::PriceBuckets;
use super::{PriceBuckets, SupplyState};
#[derive(Clone, Debug)]
pub struct PriceToAmount {

View File

@@ -3,7 +3,7 @@ use std::ops::Bound;
use brk_types::{Dollars, Sats};
use vecdb::CheckedSub;
use crate::PriceToAmount;
use super::PriceToAmount;
#[derive(Debug, Default, Clone)]
pub struct UnrealizedState {

View File

@@ -4,8 +4,7 @@ use brk_error::Result;
use brk_types::Sats;
use derive_deref::{Deref, DerefMut};
use super::CohortState;
use crate::{RealizedState, SupplyState};
use super::{CohortState, RealizedState, SupplyState};
#[derive(Clone, Deref, DerefMut)]
pub struct UTXOCohortState(CohortState);

View File

@@ -3,24 +3,34 @@
use std::path::Path;
use brk_error::Result;
use log::info;
use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{Dollars, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, Sats, StoredU64, TxInIndex, TxOutIndex, Version};
use brk_types::{
Dollars, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex,
Sats, StoredU64, TxInIndex, TxOutIndex, Version,
};
use log::info;
use vecdb::{
AnyStoredVec, BytesVec, Database, EagerVec, Exit, ImportableVec, IterableCloneableVec,
LazyVecFrom1, PAGE_SIZE, PcoVec,
AnyStoredVec, AnyVec, BytesVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec,
IterableCloneableVec, LazyVecFrom1, PAGE_SIZE, PcoVec, Stamp, TypedVecIterator, VecIndex,
};
use crate::{
Indexes, SupplyState, chain,
grouped::{ComputedValueVecsFromHeight, ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source, VecBuilderOptions},
Indexes, chain,
grouped::{
ComputedValueVecsFromHeight, ComputedVecsFromDateIndex, ComputedVecsFromHeight, Source,
VecBuilderOptions,
},
indexes, price,
stateful::{
compute::{StartMode, determine_start_mode, process_blocks, recover_state, reset_state},
states::BlockState,
},
utils::OptionExt,
};
use super::{
AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, UTXOCohorts,
AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, SupplyState, UTXOCohorts,
address::{AddressTypeToHeightToAddressCount, AddressTypeToIndexesToAddressCount},
compute::aggregates,
};
@@ -245,12 +255,6 @@ impl Vecs {
starting_indexes: &mut Indexes,
exit: &Exit,
) -> Result<()> {
use super::compute::{
StartMode, determine_start_mode, process_blocks, recover_state, reset_state,
};
use crate::states::BlockState;
use vecdb::{AnyVec, GenericStoredVec, Stamp, TypedVecIterator, VecIndex};
// 1. Find minimum computed height for recovery
let chain_state_height = Height::from(self.chain_state.len());
@@ -345,7 +349,8 @@ impl Vecs {
// 2b. Validate computed versions
let base_version = VERSION;
self.utxo_cohorts.validate_computed_versions(base_version)?;
self.address_cohorts.validate_computed_versions(base_version)?;
self.address_cohorts
.validate_computed_versions(base_version)?;
// 3. Get last height from indexer
let last_height = Height::from(

View File

@@ -1,26 +0,0 @@
//! Traits for consistent state flushing and importing.
//!
//! These traits ensure all stateful components follow the same patterns
//! for checkpoint/resume operations, preventing bugs where new fields
//! are forgotten during flush operations.
use brk_error::Result;
use vecdb::Exit;
/// Trait for components that can be flushed to disk.
///
/// This is for simple flush operations that don't require height tracking.
pub trait Flushable {
    /// Safely flush data to disk with fsync for durability.
    ///
    /// # Errors
    ///
    /// Implementations should return an error when the flush cannot complete.
    fn safe_flush(&mut self, exit: &Exit) -> Result<()>;
}
/// Blanket implementation: an optional component flushes its inner value
/// when present and is a no-op when `None`.
impl<T: Flushable> Flushable for Option<T> {
    fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
        match self.as_mut() {
            Some(inner) => inner.safe_flush(exit),
            None => Ok(()),
        }
    }
}