computer: trying the new stateful

This commit is contained in:
nym21
2025-12-16 21:59:13 +01:00
parent 1ad8d8a631
commit 4b2ada14a0
74 changed files with 1896 additions and 1841 deletions

View File

@@ -68,25 +68,61 @@ impl Vecs {
macro_rules! computed_h {
($name:expr, $opts:expr) => {
ComputedVecsFromHeight::forced_import(&db, $name, Source::Compute, v0, indexes, $opts)?
ComputedVecsFromHeight::forced_import(
&db,
$name,
Source::Compute,
v0,
indexes,
$opts,
)?
};
($name:expr, $v:expr, $opts:expr) => {
ComputedVecsFromHeight::forced_import(&db, $name, Source::Compute, $v, indexes, $opts)?
ComputedVecsFromHeight::forced_import(
&db,
$name,
Source::Compute,
$v,
indexes,
$opts,
)?
};
}
macro_rules! computed_di {
($name:expr, $opts:expr) => {
ComputedVecsFromDateIndex::forced_import(&db, $name, Source::Compute, v0, indexes, $opts)?
ComputedVecsFromDateIndex::forced_import(
&db,
$name,
Source::Compute,
v0,
indexes,
$opts,
)?
};
}
macro_rules! ratio_di {
($name:expr) => {
ComputedRatioVecsFromDateIndex::forced_import(&db, $name, Source::None, v0, indexes, true)?
ComputedRatioVecsFromDateIndex::forced_import(
&db,
$name,
Source::None,
v0,
indexes,
true,
)?
};
}
macro_rules! value_h {
($name:expr) => {
ComputedValueVecsFromHeight::forced_import(&db, $name, Source::Compute, v1, last(), compute_dollars, indexes)?
ComputedValueVecsFromHeight::forced_import(
&db,
$name,
Source::Compute,
v1,
last(),
compute_dollars,
indexes,
)?
};
}
@@ -95,7 +131,10 @@ impl Vecs {
indexes_to_coinblocks_stored: computed_h!("coinblocks_stored", sum_cum()),
indexes_to_liveliness: computed_h!("liveliness", last()),
indexes_to_vaultedness: computed_h!("vaultedness", last()),
indexes_to_activity_to_vaultedness_ratio: computed_h!("activity_to_vaultedness_ratio", last()),
indexes_to_activity_to_vaultedness_ratio: computed_h!(
"activity_to_vaultedness_ratio",
last()
),
indexes_to_vaulted_supply: value_h!("vaulted_supply"),
indexes_to_active_supply: value_h!("active_supply"),
indexes_to_thermo_cap: computed_h!("thermo_cap", v1, last()),
@@ -114,9 +153,18 @@ impl Vecs {
indexes_to_cointime_price: computed_h!("cointime_price", last()),
indexes_to_cointime_cap: computed_h!("cointime_cap", last()),
indexes_to_cointime_price_ratio: ratio_di!("cointime_price"),
indexes_to_cointime_adj_inflation_rate: computed_di!("cointime_adj_inflation_rate", last()),
indexes_to_cointime_adj_tx_btc_velocity: computed_di!("cointime_adj_tx_btc_velocity", last()),
indexes_to_cointime_adj_tx_usd_velocity: computed_di!("cointime_adj_tx_usd_velocity", last()),
indexes_to_cointime_adj_inflation_rate: computed_di!(
"cointime_adj_inflation_rate",
last()
),
indexes_to_cointime_adj_tx_btc_velocity: computed_di!(
"cointime_adj_tx_btc_velocity",
last()
),
indexes_to_cointime_adj_tx_usd_velocity: computed_di!(
"cointime_adj_tx_usd_velocity",
last()
),
db,
};
@@ -157,7 +205,7 @@ impl Vecs {
stateful: &stateful::Vecs,
exit: &Exit,
) -> Result<()> {
let circulating_supply = &stateful.utxo_cohorts.all.inner.height_to_supply;
let circulating_supply = &stateful.utxo_cohorts.all.metrics.supply.height_to_supply;
self.indexes_to_coinblocks_created
.compute_all(indexes, starting_indexes, exit, |vec| {
@@ -170,8 +218,12 @@ impl Vecs {
Ok(())
})?;
let indexes_to_coinblocks_destroyed =
&stateful.utxo_cohorts.all.inner.indexes_to_coinblocks_destroyed;
let indexes_to_coinblocks_destroyed = &stateful
.utxo_cohorts
.all
.metrics
.activity
.indexes_to_coinblocks_destroyed;
self.indexes_to_coinblocks_stored
.compute_all(indexes, starting_indexes, exit, |vec| {
@@ -294,24 +346,22 @@ impl Vecs {
})?;
if let Some(price) = price {
let realized_cap = stateful
let realized_cap = &stateful
.utxo_cohorts
.all
.inner
.height_to_realized_cap
.as_ref()
.unwrap();
.metrics
.realized
.u()
.height_to_realized_cap;
let realized_price = stateful
.utxo_cohorts
.all
.inner
.metrics
.realized
.u()
.indexes_to_realized_price
.as_ref()
.unwrap()
.height
.as_ref()
.unwrap();
.u();
self.indexes_to_thermo_cap
.compute_all(indexes, starting_indexes, exit, |vec| {

View File

@@ -83,7 +83,6 @@ impl PricePercentiles {
.position(|&p| p == percentile)
.and_then(|i| self.vecs[i].as_ref())
}
}
impl Flushable for PricePercentiles {

View File

@@ -22,7 +22,7 @@ mod market;
mod pools;
mod price;
mod stateful;
// mod stateful_new;
// mod stateful_old;
mod states;
mod traits;
mod utils;

View File

@@ -163,7 +163,7 @@ impl DynCohortVecs for AddressCohortVecs {
}
fn truncate_push(&mut self, height: Height) -> Result<()> {
if self.starting_height.map_or(false, |h| h > height) {
if self.starting_height.is_some_and(|h| h > height) {
return Ok(());
}

View File

@@ -13,7 +13,7 @@ use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
use vecdb::{Database, Exit, IterableVec};
use crate::{Indexes, indexes, price, stateful_new::DynCohortVecs};
use crate::{Indexes, indexes, price, stateful::DynCohortVecs};
use super::{AddressCohortVecs, CohortVecs};
@@ -255,12 +255,11 @@ impl AddressCohorts {
/// Reset price_to_amount for all separate cohorts (called during fresh start).
pub fn reset_separate_price_to_amount(&mut self) -> Result<()> {
self.par_iter_separate_mut()
.try_for_each(|v| {
if let Some(state) = v.state.as_mut() {
state.reset_price_to_amount_if_needed()?;
}
Ok(())
})
self.par_iter_separate_mut().try_for_each(|v| {
if let Some(state) = v.state.as_mut() {
state.reset_price_to_amount_if_needed()?;
}
Ok(())
})
}
}

View File

@@ -80,39 +80,36 @@ impl CohortState {
pub fn increment(&mut self, supply: &SupplyState, price: Option<Dollars>) {
self.supply += supply;
if supply.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
if supply.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut() {
let price = price.unwrap();
realized.increment(supply, price);
self.price_to_amount.as_mut().unwrap().increment(price, supply);
}
}
}
/// Remove supply from this cohort (e.g., when UTXO ages out of cohort).
pub fn decrement(&mut self, supply: &SupplyState, price: Option<Dollars>) {
self.supply -= supply;
if supply.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
if supply.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut() {
let price = price.unwrap();
realized.decrement(supply, price);
self.price_to_amount.as_mut().unwrap().decrement(price, supply);
}
}
}
/// Process received output (new UTXO in cohort).
pub fn receive(&mut self, supply: &SupplyState, price: Option<Dollars>) {
self.supply += supply;
if supply.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
if supply.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut() {
let price = price.unwrap();
realized.receive(supply, price);
self.price_to_amount.as_mut().unwrap().increment(price, supply);
}
}
}
/// Process spent input (UTXO leaving cohort).

View File

@@ -12,7 +12,7 @@ use crate::{
Indexes, PriceToAmount, UTXOCohortState,
grouped::{PERCENTILES, PERCENTILES_LEN},
indexes, price,
stateful_new::{CohortVecs, DynCohortVecs},
stateful::{CohortVecs, DynCohortVecs},
};
use super::super::metrics::{CohortMetrics, ImportConfig};
@@ -169,13 +169,13 @@ impl DynCohortVecs for UTXOCohortVecs {
}
fn truncate_push(&mut self, height: Height) -> Result<()> {
if self.state_starting_height.map_or(false, |h| h > height) {
if self.state_starting_height.is_some_and(|h| h > height) {
return Ok(());
}
// Push from state to metrics
if let Some(state) = self.state.as_ref() {
self.metrics.truncate_push(height, &state)?;
self.metrics.truncate_push(height, state)?;
}
Ok(())

View File

@@ -18,7 +18,7 @@ use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
use vecdb::{Database, Exit, GenericStoredVec, IterableVec};
use crate::{Indexes, indexes, price, stateful_new::DynCohortVecs};
use crate::{Indexes, indexes, price, stateful::DynCohortVecs};
use super::{CohortVecs, HeightFlushable, UTXOCohortVecs};
@@ -398,13 +398,12 @@ impl UTXOCohorts {
/// Reset price_to_amount for all separate cohorts (called during fresh start).
pub fn reset_separate_price_to_amount(&mut self) -> Result<()> {
self.par_iter_separate_mut()
.try_for_each(|v| {
if let Some(state) = v.state.as_mut() {
state.reset_price_to_amount_if_needed()?;
}
Ok(())
})
self.par_iter_separate_mut().try_for_each(|v| {
if let Some(state) = v.state.as_mut() {
state.reset_price_to_amount_if_needed()?;
}
Ok(())
})
}
/// Compute and push percentiles for aggregate cohorts (all, sth, lth).
@@ -415,7 +414,15 @@ impl UTXOCohorts {
.0
.age_range
.iter()
.map(|sub| (sub.filter().clone(), sub.state.as_ref().map(|s| s.supply.value).unwrap_or(Sats::ZERO)))
.map(|sub| {
(
sub.filter().clone(),
sub.state
.as_ref()
.map(|s| s.supply.value)
.unwrap_or(Sats::ZERO),
)
})
.collect();
// Compute percentiles for each aggregate cohort in parallel
@@ -423,9 +430,7 @@ impl UTXOCohorts {
.0
.par_iter_aggregate()
.filter_map(|v| {
if v.price_to_amount.is_none() {
return None;
}
v.price_to_amount.as_ref()?;
let filter = v.filter().clone();
let supply = age_range_data
.iter()
@@ -445,7 +450,12 @@ impl UTXOCohorts {
.find(|v| v.filter() == &filter)
.unwrap();
if let Some(pp) = v.metrics.price_paid.as_mut().and_then(|p| p.price_percentiles.as_mut()) {
if let Some(pp) = v
.metrics
.price_paid
.as_mut()
.and_then(|p| p.price_percentiles.as_mut())
{
pp.truncate_push(height, &percentiles)?;
}
}

View File

@@ -14,7 +14,7 @@ use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_indexer::Indexer;
use brk_types::{
DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TxInIndex, TxOutIndex, TypeIndex,
DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TypeIndex,
};
use log::info;
use rayon::prelude::*;
@@ -33,8 +33,8 @@ use super::{
build_txinindex_to_txindex, build_txoutindex_to_txindex,
flush::flush_checkpoint as flush_checkpoint_full,
};
use crate::stateful_new::address::AddressTypeToAddressCount;
use crate::stateful_new::process::{
use crate::stateful::address::AddressTypeToAddressCount;
use crate::stateful::process::{
AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource,
build_txoutindex_to_height_map, process_inputs, process_outputs, process_received,
process_sent, update_tx_counts,

View File

@@ -8,7 +8,7 @@ use brk_types::{AnyAddressIndex, Height};
use log::info;
use vecdb::{Exit, Stamp};
use crate::stateful_new::process::{
use crate::stateful::process::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses,
process_loaded_addresses,
};

View File

@@ -14,16 +14,13 @@ mod flush;
mod readers;
mod recover;
pub use aggregates::{compute_overlapping, compute_rest_part1, compute_rest_part2};
pub use block_loop::process_blocks;
pub use context::ComputeContext;
pub use flush::{flush_checkpoint, flush_cohort_states};
pub use readers::{
IndexerReaders, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
};
pub use recover::{
RecoveredState, StartMode, determine_start_mode, find_min_height,
import_aggregate_price_to_amount, import_cohort_states, reset_all_state, rollback_states,
StartMode, determine_start_mode,
};
/// Flush checkpoint interval (every N blocks).

View File

@@ -7,7 +7,7 @@ use brk_indexer::Indexer;
use brk_types::{OutputType, StoredU64, TxIndex};
use vecdb::{BoxedVecIterator, GenericStoredVec, Reader, VecIndex};
use crate::stateful_new::address::{AddressesDataVecs, AnyAddressIndexesVecs};
use crate::stateful::address::{AddressesDataVecs, AnyAddressIndexesVecs};
/// Cached readers for indexer vectors.
pub struct IndexerReaders {
@@ -22,7 +22,11 @@ impl IndexerReaders {
pub fn new(indexer: &Indexer) -> Self {
Self {
txinindex_to_outpoint: indexer.vecs.txin.txinindex_to_outpoint.create_reader(),
txindex_to_first_txoutindex: indexer.vecs.tx.txindex_to_first_txoutindex.create_reader(),
txindex_to_first_txoutindex: indexer
.vecs
.tx
.txindex_to_first_txoutindex
.create_reader(),
txoutindex_to_value: indexer.vecs.txout.txoutindex_to_value.create_reader(),
txoutindex_to_outputtype: indexer.vecs.txout.txoutindex_to_outputtype.create_reader(),
txoutindex_to_typeindex: indexer.vecs.txout.txoutindex_to_typeindex.create_reader(),
@@ -86,7 +90,7 @@ pub fn build_txoutindex_to_txindex<'a>(
for (offset, &count) in counts.iter().enumerate() {
let txindex = TxIndex::from(first + offset);
result.extend(std::iter::repeat(txindex).take(count as usize));
result.extend(std::iter::repeat_n(txindex, count as usize));
}
result

View File

@@ -75,7 +75,7 @@ impl ActivityMetrics {
Source::Compute,
cfg.version + v0,
cfg.indexes,
sum.clone(),
sum,
)?,
indexes_to_coindays_destroyed: ComputedVecsFromHeight::forced_import(

View File

@@ -469,8 +469,8 @@ impl RelativeMetrics {
starting_indexes,
exit,
|v| {
if let Some(dateindex_vec) = unrealized.indexes_to_supply_in_profit.bitcoin.dateindex.as_ref() {
if let Some(supply_dateindex) = supply.indexes_to_supply.bitcoin.dateindex.as_ref() {
if let Some(dateindex_vec) = unrealized.indexes_to_supply_in_profit.bitcoin.dateindex.as_ref()
&& let Some(supply_dateindex) = supply.indexes_to_supply.bitcoin.dateindex.as_ref() {
v.compute_percentage(
starting_indexes.dateindex,
dateindex_vec,
@@ -478,7 +478,6 @@ impl RelativeMetrics {
exit,
)?;
}
}
Ok(())
},
)?;
@@ -487,8 +486,8 @@ impl RelativeMetrics {
starting_indexes,
exit,
|v| {
if let Some(dateindex_vec) = unrealized.indexes_to_supply_in_loss.bitcoin.dateindex.as_ref() {
if let Some(supply_dateindex) = supply.indexes_to_supply.bitcoin.dateindex.as_ref() {
if let Some(dateindex_vec) = unrealized.indexes_to_supply_in_loss.bitcoin.dateindex.as_ref()
&& let Some(supply_dateindex) = supply.indexes_to_supply.bitcoin.dateindex.as_ref() {
v.compute_percentage(
starting_indexes.dateindex,
dateindex_vec,
@@ -496,7 +495,6 @@ impl RelativeMetrics {
exit,
)?;
}
}
Ok(())
},
)?;
@@ -554,8 +552,8 @@ impl RelativeMetrics {
)?;
}
if let Some(dateindex_to_mc) = dateindex_to_market_cap {
if let Some(unrealized) = unrealized {
if let Some(dateindex_to_mc) = dateindex_to_market_cap
&& let Some(unrealized) = unrealized {
self.indexes_to_unrealized_profit_rel_to_market_cap.compute_all(
starting_indexes,
exit,
@@ -583,7 +581,6 @@ impl RelativeMetrics {
},
)?;
}
}
// TODO: Remaining relative metrics to implement:
// - indexes_to_supply_in_profit/loss_rel_to_circulating_supply

View File

@@ -237,12 +237,12 @@ impl SupplyMetrics {
&mut self,
indexes: &indexes::Vecs,
price: Option<&price::Vecs>,
starting_indexes: &Indexes,
_starting_indexes: &Indexes,
height_to_supply: &impl IterableVec<Height, Bitcoin>,
_dateindex_to_supply: &impl IterableVec<DateIndex, Bitcoin>,
height_to_market_cap: Option<&impl IterableVec<Height, Dollars>>,
dateindex_to_market_cap: Option<&impl IterableVec<DateIndex, Dollars>>,
exit: &Exit,
_exit: &Exit,
) -> Result<()> {
let _ = (indexes, price, height_to_supply, height_to_market_cap, dateindex_to_market_cap);

File diff suppressed because it is too large Load Diff

View File

@@ -2,7 +2,7 @@ use brk_error::Result;
use brk_types::AnyAddressIndex;
use super::EmptyAddressDataWithSource;
use crate::stateful_new::{AddressTypeToTypeIndexMap, AddressesDataVecs};
use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs};
/// Process empty address data updates.
///

View File

@@ -13,15 +13,19 @@ use rayon::prelude::*;
use rustc_hash::FxHashMap;
use vecdb::{BytesVec, GenericStoredVec, PcoVec};
use crate::stateful_new::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs};
use crate::stateful_new::compute::VecsReaders;
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
};
use crate::stateful::compute::VecsReaders;
use crate::{
stateful_new::{IndexerReaders, process::RangeMap},
stateful::{IndexerReaders, process::RangeMap},
states::Transacted,
};
use super::super::address::HeightToAddressTypeToVec;
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource};
use super::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource,
};
/// Result of processing inputs for a block.
pub struct InputsResult {
@@ -202,6 +206,7 @@ pub fn process_inputs(
/// Look up address data from storage or determine if new.
///
/// Returns None if address is already in loaded or empty cache.
#[allow(clippy::too_many_arguments)]
fn get_address_data(
address_type: OutputType,
typeindex: TypeIndex,

View File

@@ -2,7 +2,7 @@ use brk_error::Result;
use brk_types::AnyAddressIndex;
use super::LoadedAddressDataWithSource;
use crate::stateful_new::{AddressTypeToTypeIndexMap, AddressesDataVecs};
use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs};
/// Process loaded address data updates.
///

View File

@@ -12,9 +12,11 @@ use rayon::prelude::*;
use smallvec::SmallVec;
use vecdb::{BytesVec, GenericStoredVec};
use crate::stateful_new::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs};
use crate::stateful_new::compute::VecsReaders;
use crate::{stateful_new::IndexerReaders, states::Transacted};
use crate::stateful::address::{
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
};
use crate::stateful::compute::VecsReaders;
use crate::{stateful::IndexerReaders, states::Transacted};
use super::super::address::AddressTypeToVec;
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource};
@@ -90,7 +92,11 @@ pub fn process_outputs(
addresses_data,
);
(value, output_type, Some((typeindex, txindex, value, addr_data_opt)))
(
value,
output_type,
Some((typeindex, txindex, value, addr_data_opt)),
)
})
.fold(
|| {
@@ -151,6 +157,7 @@ pub fn process_outputs(
/// Look up address data from storage or determine if new.
///
/// Returns None if address is already in loaded or empty cache.
#[allow(clippy::too_many_arguments)]
fn get_address_data(
address_type: OutputType,
typeindex: TypeIndex,

View File

@@ -2,7 +2,7 @@
//!
//! Updates tx_count on address data after deduplicating transaction indexes.
use crate::stateful_new::address::AddressTypeToTypeIndexMap;
use crate::stateful::address::AddressTypeToTypeIndexMap;
use super::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec};

View File

@@ -1,57 +0,0 @@
//! Stateful computation for Bitcoin UTXO and address cohort metrics.
//!
//! This module processes blockchain data to compute metrics for various cohorts
//! (groups of UTXOs or addresses filtered by age, amount, type, etc.).
//!
//! ## Module Structure
//!
//! ```text
//! stateful/
//! ├── address/ # Address type handling (indexes, data storage)
//! ├── cohorts/ # Cohort traits and state management
//! ├── compute/ # Block processing pipeline
//! └── metrics/ # Metric vectors organized by category
//! ```
//!
//! ## Data Flow
//!
//! 1. **Import**: Load from checkpoint or start fresh
//! 2. **Process blocks**: For each block, process outputs/inputs in parallel
//! 3. **Update cohorts**: Track supply, realized/unrealized P&L per cohort
//! 4. **Flush**: Periodically checkpoint state to disk
//! 5. **Compute aggregates**: Derive aggregate cohorts from separate cohorts
pub mod address;
pub mod cohorts;
pub mod compute;
pub mod metrics;
mod process;
mod vecs;
use process::*;
pub use vecs::Vecs;
// Address re-exports
pub use address::{
AddressTypeToTypeIndexMap, AddressTypeToVec, AddressesDataVecs, AnyAddressIndexesVecs,
HeightToAddressTypeToVec,
};
// Cohort re-exports
pub use cohorts::{
AddressCohortVecs, AddressCohorts, CohortState, CohortVecs, DynCohortVecs, Flushable,
HeightFlushable, UTXOCohortVecs, UTXOCohorts,
};
// Compute re-exports
pub use compute::{
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, VecsReaders,
};
// Metrics re-exports
pub use metrics::{
ActivityMetrics, CohortMetrics, ImportConfig, PricePaidMetrics, RealizedMetrics,
RelativeMetrics, SupplyMetrics, UnrealizedMetrics,
};

File diff suppressed because it is too large Load Diff