global: snapshot

This commit is contained in:
nym21
2026-01-27 00:30:58 +01:00
parent 3d01822d27
commit ec1f2de5cf
41 changed files with 2712 additions and 5334 deletions

View File

@@ -0,0 +1,375 @@
//! Address activity tracking - per-block counts of address behaviors.
//!
//! Tracks global and per-address-type activity metrics:
//!
//! | Metric | Description |
//! |--------|-------------|
//! | `receiving` | Unique addresses that received this block |
//! | `sending` | Unique addresses that sent this block |
//! | `reactivated` | Addresses that were empty and now have funds |
//! | `both` | Addresses that both sent AND received same block |
//! | `balance_increased` | Receive-only addresses (balance definitely increased) |
//! | `balance_decreased` | Send-only addresses (balance definitely decreased) |
//!
//! Note: `balance_increased` and `balance_decreased` exclude "both" addresses
//! since their net balance change requires more complex tracking.
use brk_cohort::ByAddressType;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, StoredU32, Version};
use derive_more::{Deref, DerefMut};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, AnyVec, Database, Exit, GenericStoredVec};
use crate::{ComputeIndexes, indexes, internal::ComputedFromHeightDistribution};
/// Per-block activity counts - reset each block.
///
/// Note: `balance_increased` and `balance_decreased` are derived:
/// - `balance_increased = receiving - both` (receive-only addresses)
/// - `balance_decreased = sending - both` (send-only addresses)
#[derive(Debug, Default, Clone)]
pub struct BlockActivityCounts {
pub reactivated: u32,
pub sending: u32,
pub receiving: u32,
pub both: u32,
}
impl BlockActivityCounts {
/// Reset all counts to zero.
#[inline]
pub fn reset(&mut self) {
*self = Self::default();
}
}
/// Per-address-type activity counts - aggregated during block processing.
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AddressTypeToActivityCounts(pub ByAddressType<BlockActivityCounts>);
impl AddressTypeToActivityCounts {
/// Reset all per-type counts.
pub fn reset(&mut self) {
self.0.values_mut().for_each(|v| v.reset());
}
/// Sum all types to get totals.
pub fn totals(&self) -> BlockActivityCounts {
let mut total = BlockActivityCounts::default();
for counts in self.0.values() {
total.reactivated += counts.reactivated;
total.sending += counts.sending;
total.receiving += counts.receiving;
total.both += counts.both;
}
total
}
}
/// Activity count vectors for a single category (e.g., one address type or "all").
#[derive(Clone, Traversable)]
pub struct ActivityCountVecs {
pub reactivated: ComputedFromHeightDistribution<StoredU32>,
pub sending: ComputedFromHeightDistribution<StoredU32>,
pub receiving: ComputedFromHeightDistribution<StoredU32>,
pub balance_increased: ComputedFromHeightDistribution<StoredU32>,
pub balance_decreased: ComputedFromHeightDistribution<StoredU32>,
pub both: ComputedFromHeightDistribution<StoredU32>,
}
impl ActivityCountVecs {
pub fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
) -> Result<Self> {
Ok(Self {
reactivated: ComputedFromHeightDistribution::forced_import(
db,
&format!("{name}_reactivated"),
version,
indexes,
)?,
sending: ComputedFromHeightDistribution::forced_import(
db,
&format!("{name}_sending"),
version,
indexes,
)?,
receiving: ComputedFromHeightDistribution::forced_import(
db,
&format!("{name}_receiving"),
version,
indexes,
)?,
balance_increased: ComputedFromHeightDistribution::forced_import(
db,
&format!("{name}_balance_increased"),
version,
indexes,
)?,
balance_decreased: ComputedFromHeightDistribution::forced_import(
db,
&format!("{name}_balance_decreased"),
version,
indexes,
)?,
both: ComputedFromHeightDistribution::forced_import(
db,
&format!("{name}_both"),
version,
indexes,
)?,
})
}
pub fn min_stateful_height(&self) -> usize {
self.reactivated
.height
.len()
.min(self.sending.height.len())
.min(self.receiving.height.len())
.min(self.balance_increased.height.len())
.min(self.balance_decreased.height.len())
.min(self.both.height.len())
}
pub fn par_iter_height_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
[
&mut self.reactivated.height as &mut dyn AnyStoredVec,
&mut self.sending.height as &mut dyn AnyStoredVec,
&mut self.receiving.height as &mut dyn AnyStoredVec,
&mut self.balance_increased.height as &mut dyn AnyStoredVec,
&mut self.balance_decreased.height as &mut dyn AnyStoredVec,
&mut self.both.height as &mut dyn AnyStoredVec,
]
.into_par_iter()
}
pub fn reset_height(&mut self) -> Result<()> {
self.reactivated.height.reset()?;
self.sending.height.reset()?;
self.receiving.height.reset()?;
self.balance_increased.height.reset()?;
self.balance_decreased.height.reset()?;
self.both.height.reset()?;
Ok(())
}
pub fn truncate_push_height(
&mut self,
height: Height,
counts: &BlockActivityCounts,
) -> Result<()> {
self.reactivated
.height
.truncate_push(height, counts.reactivated.into())?;
self.sending
.height
.truncate_push(height, counts.sending.into())?;
self.receiving
.height
.truncate_push(height, counts.receiving.into())?;
// Derived: balance_increased = receiving - both (receive-only addresses)
self.balance_increased
.height
.truncate_push(height, (counts.receiving - counts.both).into())?;
// Derived: balance_decreased = sending - both (send-only addresses)
self.balance_decreased
.height
.truncate_push(height, (counts.sending - counts.both).into())?;
self.both
.height
.truncate_push(height, counts.both.into())?;
Ok(())
}
pub fn compute_rest(
&mut self,
indexes: &indexes::Vecs,
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.reactivated
.compute_rest(indexes, starting_indexes, exit)?;
self.sending
.compute_rest(indexes, starting_indexes, exit)?;
self.receiving
.compute_rest(indexes, starting_indexes, exit)?;
self.balance_increased
.compute_rest(indexes, starting_indexes, exit)?;
self.balance_decreased
.compute_rest(indexes, starting_indexes, exit)?;
self.both.compute_rest(indexes, starting_indexes, exit)?;
Ok(())
}
}
/// Per-address-type activity count vecs.
#[derive(Clone, Deref, DerefMut, Traversable)]
pub struct AddressTypeToActivityCountVecs(ByAddressType<ActivityCountVecs>);
impl From<ByAddressType<ActivityCountVecs>> for AddressTypeToActivityCountVecs {
#[inline]
fn from(value: ByAddressType<ActivityCountVecs>) -> Self {
Self(value)
}
}
impl AddressTypeToActivityCountVecs {
pub fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
) -> Result<Self> {
Ok(Self::from(
ByAddressType::<ActivityCountVecs>::new_with_name(|type_name| {
ActivityCountVecs::forced_import(db, &format!("{type_name}_{name}"), version, indexes)
})?,
))
}
pub fn min_stateful_height(&self) -> usize {
self.0.values().map(|v| v.min_stateful_height()).min().unwrap_or(0)
}
pub fn par_iter_height_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
let inner = &mut self.0;
let mut vecs: Vec<&mut dyn AnyStoredVec> = Vec::new();
for type_vecs in [
&mut inner.p2pk65,
&mut inner.p2pk33,
&mut inner.p2pkh,
&mut inner.p2sh,
&mut inner.p2wpkh,
&mut inner.p2wsh,
&mut inner.p2tr,
&mut inner.p2a,
] {
vecs.push(&mut type_vecs.reactivated.height);
vecs.push(&mut type_vecs.sending.height);
vecs.push(&mut type_vecs.receiving.height);
vecs.push(&mut type_vecs.balance_increased.height);
vecs.push(&mut type_vecs.balance_decreased.height);
vecs.push(&mut type_vecs.both.height);
}
vecs.into_par_iter()
}
pub fn reset_height(&mut self) -> Result<()> {
self.p2pk65.reset_height()?;
self.p2pk33.reset_height()?;
self.p2pkh.reset_height()?;
self.p2sh.reset_height()?;
self.p2wpkh.reset_height()?;
self.p2wsh.reset_height()?;
self.p2tr.reset_height()?;
self.p2a.reset_height()?;
Ok(())
}
pub fn truncate_push_height(
&mut self,
height: Height,
counts: &AddressTypeToActivityCounts,
) -> Result<()> {
self.p2pk65
.truncate_push_height(height, &counts.p2pk65)?;
self.p2pk33
.truncate_push_height(height, &counts.p2pk33)?;
self.p2pkh
.truncate_push_height(height, &counts.p2pkh)?;
self.p2sh.truncate_push_height(height, &counts.p2sh)?;
self.p2wpkh
.truncate_push_height(height, &counts.p2wpkh)?;
self.p2wsh
.truncate_push_height(height, &counts.p2wsh)?;
self.p2tr.truncate_push_height(height, &counts.p2tr)?;
self.p2a.truncate_push_height(height, &counts.p2a)?;
Ok(())
}
pub fn compute_rest(
&mut self,
indexes: &indexes::Vecs,
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.p2pk65.compute_rest(indexes, starting_indexes, exit)?;
self.p2pk33.compute_rest(indexes, starting_indexes, exit)?;
self.p2pkh.compute_rest(indexes, starting_indexes, exit)?;
self.p2sh.compute_rest(indexes, starting_indexes, exit)?;
self.p2wpkh.compute_rest(indexes, starting_indexes, exit)?;
self.p2wsh.compute_rest(indexes, starting_indexes, exit)?;
self.p2tr.compute_rest(indexes, starting_indexes, exit)?;
self.p2a.compute_rest(indexes, starting_indexes, exit)?;
Ok(())
}
}
/// Storage for activity metrics (global + per type).
#[derive(Clone, Traversable)]
pub struct AddressActivityVecs {
pub all: ActivityCountVecs,
#[traversable(flatten)]
pub by_addresstype: AddressTypeToActivityCountVecs,
}
impl AddressActivityVecs {
pub fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
) -> Result<Self> {
Ok(Self {
all: ActivityCountVecs::forced_import(db, name, version, indexes)?,
by_addresstype: AddressTypeToActivityCountVecs::forced_import(
db, name, version, indexes,
)?,
})
}
pub fn min_stateful_height(&self) -> usize {
self.all.min_stateful_height().min(self.by_addresstype.min_stateful_height())
}
pub fn par_iter_height_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
self.all
.par_iter_height_mut()
.chain(self.by_addresstype.par_iter_height_mut())
}
pub fn reset_height(&mut self) -> Result<()> {
self.all.reset_height()?;
self.by_addresstype.reset_height()?;
Ok(())
}
pub fn truncate_push_height(
&mut self,
height: Height,
counts: &AddressTypeToActivityCounts,
) -> Result<()> {
let totals = counts.totals();
self.all.truncate_push_height(height, &totals)?;
self.by_addresstype.truncate_push_height(height, counts)?;
Ok(())
}
pub fn compute_rest(
&mut self,
indexes: &indexes::Vecs,
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.all.compute_rest(indexes, starting_indexes, exit)?;
self.by_addresstype
.compute_rest(indexes, starting_indexes, exit)?;
Ok(())
}
}

View File

@@ -106,7 +106,7 @@ impl AddressTypeToAddrCountVecs {
))
}
pub fn min_len(&self) -> usize {
pub fn min_stateful_height(&self) -> usize {
self.p2pk65
.height
.len()
@@ -242,8 +242,8 @@ impl AddrCountVecs {
})
}
pub fn min_len(&self) -> usize {
self.all.height.len().min(self.by_addresstype.min_len())
pub fn min_stateful_height(&self) -> usize {
self.all.height.len().min(self.by_addresstype.min_stateful_height())
}
pub fn par_iter_height_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {

View File

@@ -0,0 +1,92 @@
//! Growth rate: new_addr_count / addr_count (global + per-type)
use brk_cohort::{ByAddressType, zip2_by_addresstype};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, StoredF32, StoredU64, Version};
use vecdb::{Database, Exit, IterableCloneableVec};
use crate::{
ComputeIndexes, indexes,
internal::{LazyBinaryComputedFromHeightDistribution, RatioU64F32},
};
use super::{AddrCountVecs, NewAddrCountVecs};
/// Growth rate by type - lazy ratio with distribution stats
pub type GrowthRateByType =
ByAddressType<LazyBinaryComputedFromHeightDistribution<StoredF32, StoredU64, StoredU64>>;
/// Growth rate: new_addr_count / addr_count (global + per-type)
#[derive(Clone, Traversable)]
pub struct GrowthRateVecs {
pub all: LazyBinaryComputedFromHeightDistribution<StoredF32, StoredU64, StoredU64>,
#[traversable(flatten)]
pub by_addresstype: GrowthRateByType,
}
impl GrowthRateVecs {
pub fn forced_import(
db: &Database,
version: Version,
indexes: &indexes::Vecs,
new_addr_count: &NewAddrCountVecs,
addr_count: &AddrCountVecs,
) -> Result<Self> {
let all = make_growth_rate(
db,
"growth_rate",
version,
indexes,
&new_addr_count.all.height,
&addr_count.all.height,
)?;
let by_addresstype: GrowthRateByType = zip2_by_addresstype(
&new_addr_count.by_addresstype,
&addr_count.by_addresstype,
|name, new, addr| {
make_growth_rate(
db,
&format!("{name}_growth_rate"),
version,
indexes,
&new.height,
&addr.height,
)
},
)?;
Ok(Self { all, by_addresstype })
}
pub fn derive_from(
&mut self,
indexes: &indexes::Vecs,
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.all.derive_from(indexes, starting_indexes, exit)?;
for vecs in self.by_addresstype.values_mut() {
vecs.derive_from(indexes, starting_indexes, exit)?;
}
Ok(())
}
}
fn make_growth_rate<V1, V2>(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
new: &V1,
addr: &V2,
) -> Result<LazyBinaryComputedFromHeightDistribution<StoredF32, StoredU64, StoredU64>>
where
V1: IterableCloneableVec<Height, StoredU64>,
V2: IterableCloneableVec<Height, StoredU64>,
{
LazyBinaryComputedFromHeightDistribution::<StoredF32, StoredU64, StoredU64>::forced_import::<
RatioU64F32,
>(db, name, version, new.boxed_clone(), addr.boxed_clone(), indexes)
}

View File

@@ -1,9 +1,17 @@
mod activity;
mod address_count;
mod data;
mod growth_rate;
mod indexes;
mod new_addr_count;
mod total_addr_count;
mod type_map;
pub use activity::{AddressActivityVecs, AddressTypeToActivityCounts};
pub use address_count::{AddrCountVecs, AddressTypeToAddressCount};
pub use data::AddressesDataVecs;
pub use growth_rate::GrowthRateVecs;
pub use indexes::AnyAddressIndexesVecs;
pub use new_addr_count::NewAddrCountVecs;
pub use total_addr_count::TotalAddrCountVecs;
pub use type_map::{AddressTypeToTypeIndexMap, AddressTypeToVec, HeightToAddressTypeToVec};

View File

@@ -0,0 +1,83 @@
//! New address count: delta of total_addr_count (global + per-type)
use brk_cohort::{ByAddressType, zip_by_addresstype};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, StoredU64, Version};
use vecdb::{Database, Exit, TypedVecIterator};
use crate::{ComputeIndexes, indexes, internal::LazyComputedFromHeightFull};
use super::TotalAddrCountVecs;
/// New addresses by type - lazy delta with stored dateindex stats
pub type NewAddrCountByType = ByAddressType<LazyComputedFromHeightFull<StoredU64, StoredU64>>;
/// New address count per block (global + per-type)
#[derive(Clone, Traversable)]
pub struct NewAddrCountVecs {
pub all: LazyComputedFromHeightFull<StoredU64, StoredU64>,
#[traversable(flatten)]
pub by_addresstype: NewAddrCountByType,
}
impl NewAddrCountVecs {
pub fn forced_import(
db: &Database,
version: Version,
indexes: &indexes::Vecs,
total_addr_count: &TotalAddrCountVecs,
) -> Result<Self> {
let all = LazyComputedFromHeightFull::forced_import_with_init(
db,
"new_addr_count",
version,
total_addr_count.all.height.clone(),
indexes,
delta_init_fn,
)?;
let by_addresstype: NewAddrCountByType = zip_by_addresstype(
&total_addr_count.by_addresstype,
|name, total| {
LazyComputedFromHeightFull::forced_import_with_init(
db,
&format!("{name}_new_addr_count"),
version,
total.height.clone(),
indexes,
delta_init_fn,
)
},
)?;
Ok(Self { all, by_addresstype })
}
pub fn derive_from(
&mut self,
indexes: &indexes::Vecs,
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.all.derive_from(indexes, starting_indexes, exit)?;
for vecs in self.by_addresstype.values_mut() {
vecs.derive_from(indexes, starting_indexes, exit)?;
}
Ok(())
}
}
/// Delta init function: value[h] = source[h] - source[h-1]
fn delta_init_fn(
h: Height,
total_iter: &mut dyn TypedVecIterator<I = Height, T = StoredU64, Item = StoredU64>,
) -> Option<StoredU64> {
let current: u64 = total_iter.get(h)?.into();
let prev: u64 = h
.decremented()
.and_then(|prev_h| total_iter.get(prev_h))
.map(|v: StoredU64| v.into())
.unwrap_or(0);
Some(StoredU64::from(current.saturating_sub(prev)))
}

View File

@@ -0,0 +1,72 @@
//! Total address count: addr_count + empty_addr_count (global + per-type)
use brk_cohort::{ByAddressType, zip2_by_addresstype};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{StoredU64, Version};
use vecdb::{Database, Exit, IterableCloneableVec};
use crate::{ComputeIndexes, indexes, internal::{LazyBinaryComputedFromHeightLast, U64Plus}};
use super::AddrCountVecs;
/// Total addresses by type - lazy sum with all derived indexes
pub type TotalAddrCountByType =
ByAddressType<LazyBinaryComputedFromHeightLast<StoredU64, StoredU64, StoredU64>>;
/// Total address count (global + per-type) with all derived indexes
#[derive(Clone, Traversable)]
pub struct TotalAddrCountVecs {
pub all: LazyBinaryComputedFromHeightLast<StoredU64, StoredU64, StoredU64>,
#[traversable(flatten)]
pub by_addresstype: TotalAddrCountByType,
}
impl TotalAddrCountVecs {
pub fn forced_import(
db: &Database,
version: Version,
indexes: &indexes::Vecs,
addr_count: &AddrCountVecs,
empty_addr_count: &AddrCountVecs,
) -> Result<Self> {
let all = LazyBinaryComputedFromHeightLast::forced_import::<U64Plus>(
db,
"total_addr_count",
version,
addr_count.all.height.boxed_clone(),
empty_addr_count.all.height.boxed_clone(),
indexes,
)?;
let by_addresstype: TotalAddrCountByType = zip2_by_addresstype(
&addr_count.by_addresstype,
&empty_addr_count.by_addresstype,
|name, addr, empty| {
LazyBinaryComputedFromHeightLast::forced_import::<U64Plus>(
db,
&format!("{name}_total_addr_count"),
version,
addr.height.boxed_clone(),
empty.height.boxed_clone(),
indexes,
)
},
)?;
Ok(Self { all, by_addresstype })
}
pub fn derive_from(
&mut self,
indexes: &indexes::Vecs,
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.all.derive_from(indexes, starting_indexes, exit)?;
for vecs in self.by_addresstype.values_mut() {
vecs.derive_from(indexes, starting_indexes, exit)?;
}
Ok(())
}
}

View File

@@ -2,10 +2,14 @@ use brk_cohort::{AmountBucket, ByAddressType};
use brk_types::{Dollars, Sats, TypeIndex};
use rustc_hash::FxHashMap;
use crate::distribution::{address::AddressTypeToVec, cohorts::AddressCohorts};
use crate::distribution::{
address::{AddressTypeToActivityCounts, AddressTypeToVec},
cohorts::AddressCohorts,
};
use super::super::cache::{AddressLookup, TrackingStatus};
#[allow(clippy::too_many_arguments)]
pub fn process_received(
received_data: AddressTypeToVec<(TypeIndex, Sats)>,
cohorts: &mut AddressCohorts,
@@ -13,6 +17,7 @@ pub fn process_received(
price: Option<Dollars>,
addr_count: &mut ByAddressType<u64>,
empty_addr_count: &mut ByAddressType<u64>,
activity_counts: &mut AddressTypeToActivityCounts,
) {
for (output_type, vec) in received_data.unwrap().into_iter() {
if vec.is_empty() {
@@ -22,6 +27,7 @@ pub fn process_received(
// Cache mutable refs for this address type
let type_addr_count = addr_count.get_mut(output_type).unwrap();
let type_empty_count = empty_addr_count.get_mut(output_type).unwrap();
let type_activity = activity_counts.get_mut_unwrap(output_type);
// Aggregate receives by address - each address processed exactly once
// Track (total_value, output_count) for correct UTXO counting
@@ -35,6 +41,9 @@ pub fn process_received(
for (type_index, (total_value, output_count)) in aggregated {
let (addr_data, status) = lookup.get_or_create_for_receive(output_type, type_index);
// Track receiving activity - each address in receive aggregation
type_activity.receiving += 1;
match status {
TrackingStatus::New => {
*type_addr_count += 1;
@@ -42,6 +51,8 @@ pub fn process_received(
TrackingStatus::WasEmpty => {
*type_addr_count += 1;
*type_empty_count -= 1;
// Reactivated - was empty, now has funds
type_activity.reactivated += 1;
}
TrackingStatus::Tracked => {}
}

View File

@@ -1,9 +1,13 @@
use brk_cohort::{AmountBucket, ByAddressType};
use brk_error::Result;
use brk_types::{Age, CheckedSub, Dollars, Height, Sats, Timestamp, TypeIndex};
use rustc_hash::FxHashSet;
use vecdb::{unlikely, VecIndex};
use crate::distribution::{address::HeightToAddressTypeToVec, cohorts::AddressCohorts};
use crate::distribution::{
address::{AddressTypeToActivityCounts, HeightToAddressTypeToVec},
cohorts::AddressCohorts,
};
use super::super::cache::AddressLookup;
@@ -25,11 +29,16 @@ pub fn process_sent(
current_price: Option<Dollars>,
addr_count: &mut ByAddressType<u64>,
empty_addr_count: &mut ByAddressType<u64>,
activity_counts: &mut AddressTypeToActivityCounts,
received_addresses: &ByAddressType<FxHashSet<TypeIndex>>,
height_to_price: Option<&[Dollars]>,
height_to_timestamp: &[Timestamp],
current_height: Height,
current_timestamp: Timestamp,
) -> Result<()> {
// Track unique senders per address type (simple set, no extra data needed)
let mut seen_senders: ByAddressType<FxHashSet<TypeIndex>> = ByAddressType::default();
for (prev_height, by_type) in sent_data.into_iter() {
let prev_price = height_to_price.map(|v| v[prev_height.to_usize()]);
let prev_timestamp = height_to_timestamp[prev_height.to_usize()];
@@ -40,12 +49,26 @@ pub fn process_sent(
// Cache mutable refs for this address type
let type_addr_count = addr_count.get_mut(output_type).unwrap();
let type_empty_count = empty_addr_count.get_mut(output_type).unwrap();
let type_activity = activity_counts.get_mut_unwrap(output_type);
let type_received = received_addresses.get_unwrap(output_type);
let type_seen = seen_senders.get_mut_unwrap(output_type);
for (type_index, value) in vec {
let addr_data = lookup.get_for_send(output_type, type_index);
let prev_balance = addr_data.balance();
let new_balance = prev_balance.checked_sub(value).unwrap();
// On first encounter of this address this block, track activity
if type_seen.insert(type_index) {
type_activity.sending += 1;
// Track "both" - addresses that sent AND received this block
if type_received.contains(&type_index) {
type_activity.both += 1;
}
}
let will_be_empty = addr_data.has_1_utxos();
// Compute buckets once

View File

@@ -5,13 +5,14 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::{DateIndex, Height, OutputType, Sats, TxIndex, TypeIndex};
use rayon::prelude::*;
use rustc_hash::FxHashSet;
use tracing::info;
use vecdb::{Exit, IterableVec, TypedVecIterator, VecIndex};
use crate::{
blocks,
distribution::{
address::AddressTypeToAddressCount,
address::{AddressTypeToActivityCounts, AddressTypeToAddressCount},
block::{
AddressCache, InputsResult, process_inputs, process_outputs, process_received,
process_sent,
@@ -139,6 +140,9 @@ pub fn process_blocks(
)
};
// Track activity counts - reset each block
let mut activity_counts = AddressTypeToActivityCounts::default();
let mut cache = AddressCache::new();
// Main block iteration
@@ -184,6 +188,9 @@ pub fn process_blocks(
// Reset per-block values for all separate cohorts
reset_block_values(&mut vecs.utxo_cohorts, &mut vecs.address_cohorts);
// Reset per-block activity counts
activity_counts.reset();
// Collect output/input data using reusable iterators (16KB buffered reads)
// Must be done before thread::scope since iterators aren't Send
let txoutdata_vec = txout_iters.collect_block_outputs(first_txoutindex, output_count);
@@ -284,6 +291,18 @@ pub fn process_blocks(
timestamp,
});
// Build set of addresses that received this block (for detecting "both" in sent)
let received_addresses: ByAddressType<FxHashSet<TypeIndex>> = {
let mut sets = ByAddressType::<FxHashSet<TypeIndex>>::default();
for (output_type, vec) in outputs_result.received_data.iter() {
let set = sets.get_mut_unwrap(output_type);
for (type_index, _) in vec {
set.insert(*type_index);
}
}
sets
};
// Process UTXO cohorts and Address cohorts in parallel
// - Main thread: UTXO cohorts receive/send
// - Spawned thread: Address cohorts process_received/process_sent
@@ -300,6 +319,7 @@ pub fn process_blocks(
block_price,
&mut addr_counts,
&mut empty_addr_counts,
&mut activity_counts,
);
// Process sent inputs (addresses sending funds)
@@ -311,6 +331,8 @@ pub fn process_blocks(
block_price,
&mut addr_counts,
&mut empty_addr_counts,
&mut activity_counts,
&received_addresses,
height_to_price_vec.as_deref(),
height_to_timestamp_vec,
height,
@@ -333,6 +355,8 @@ pub fn process_blocks(
empty_addr_counts.sum(),
&empty_addr_counts,
)?;
vecs.address_activity
.truncate_push_height(height, &activity_counts)?;
// Get date info for unrealized state computation
let date = height_to_date_iter.get_unwrap(height);

View File

@@ -76,6 +76,7 @@ pub fn write(
.chain(vecs.addresses_data.par_iter_mut())
.chain(vecs.addr_count.par_iter_height_mut())
.chain(vecs.empty_addr_count.par_iter_height_mut())
.chain(vecs.address_activity.par_iter_height_mut())
.chain(rayon::iter::once(
&mut vecs.chain_state as &mut dyn AnyStoredVec,
))

View File

@@ -23,7 +23,10 @@ use crate::{
};
use super::{
AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, UTXOCohorts, address::AddrCountVecs,
AddressCohorts, AddressesDataVecs, AnyAddressIndexesVecs, UTXOCohorts,
address::{
AddrCountVecs, AddressActivityVecs, GrowthRateVecs, NewAddrCountVecs, TotalAddrCountVecs,
},
compute::aggregates,
};
@@ -43,6 +46,15 @@ pub struct Vecs {
pub addr_count: AddrCountVecs,
pub empty_addr_count: AddrCountVecs,
pub address_activity: AddressActivityVecs,
/// Total addresses ever seen (addr_count + empty_addr_count) - lazy, global + per-type
pub total_addr_count: TotalAddrCountVecs,
/// New addresses per block (delta of total) - lazy height, stored dateindex stats, global + per-type
pub new_addr_count: NewAddrCountVecs,
/// Growth rate (new / addr_count) - lazy ratio with distribution stats, global + per-type
pub growth_rate: GrowthRateVecs,
pub loadedaddressindex:
LazyVecFrom1<LoadedAddressIndex, LoadedAddressIndex, LoadedAddressIndex, LoadedAddressData>,
pub emptyaddressindex:
@@ -103,19 +115,41 @@ impl Vecs {
|index, _| Some(index),
);
let addr_count = AddrCountVecs::forced_import(&db, "addr_count", version, indexes)?;
let empty_addr_count =
AddrCountVecs::forced_import(&db, "empty_addr_count", version, indexes)?;
let address_activity =
AddressActivityVecs::forced_import(&db, "address_activity", version, indexes)?;
// Lazy total = addr_count + empty_addr_count (global + per-type, with all derived indexes)
let total_addr_count = TotalAddrCountVecs::forced_import(
&db,
version,
indexes,
&addr_count,
&empty_addr_count,
)?;
// Lazy delta of total (global + per-type)
let new_addr_count =
NewAddrCountVecs::forced_import(&db, version, indexes, &total_addr_count)?;
// Growth rate: new / addr_count (global + per-type)
let growth_rate =
GrowthRateVecs::forced_import(&db, version, indexes, &new_addr_count, &addr_count)?;
let this = Self {
chain_state: BytesVec::forced_import_with(
vecdb::ImportOptions::new(&db, "chain", version)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
addr_count: AddrCountVecs::forced_import(&db, "addr_count", version, indexes)?,
empty_addr_count: AddrCountVecs::forced_import(
&db,
"empty_addr_count",
version,
indexes,
)?,
addr_count,
empty_addr_count,
address_activity,
total_addr_count,
new_addr_count,
growth_rate,
utxo_cohorts,
address_cohorts,
@@ -210,6 +244,7 @@ impl Vecs {
self.chain_state.reset()?;
self.addr_count.reset_height()?;
self.empty_addr_count.reset_height()?;
self.address_activity.reset_height()?;
reset_state(
&mut self.any_address_indexes,
&mut self.addresses_data,
@@ -306,6 +341,20 @@ impl Vecs {
.compute_rest(indexes, starting_indexes, exit)?;
self.empty_addr_count
.compute_rest(indexes, starting_indexes, exit)?;
self.address_activity
.compute_rest(indexes, starting_indexes, exit)?;
// 6c. Derive total_addr_count dateindex stats (height is lazy sum)
self.total_addr_count
.derive_from(indexes, starting_indexes, exit)?;
// 6d. Derive new_addr_count dateindex stats (height is lazy delta)
self.new_addr_count
.derive_from(indexes, starting_indexes, exit)?;
// 6e. Derive growth_rate dateindex stats (height is lazy ratio)
self.growth_rate
.derive_from(indexes, starting_indexes, exit)?;
// 7. Compute rest part2 (relative metrics)
let supply_metrics = &self.utxo_cohorts.all.metrics.supply;
@@ -354,8 +403,9 @@ impl Vecs {
.min(Height::from(self.chain_state.len()))
.min(self.any_address_indexes.min_stamped_height())
.min(self.addresses_data.min_stamped_height())
.min(Height::from(self.addr_count.min_len()))
.min(Height::from(self.empty_addr_count.min_len()))
.min(Height::from(self.addr_count.min_stateful_height()))
.min(Height::from(self.empty_addr_count.min_stateful_height()))
.min(Height::from(self.address_activity.min_stateful_height()))
}
/// Get minimum length across all dateindex-indexed stateful vectors.