Files
brk/crates/brk_computer/src/distribution/addr/activity.rs
2026-03-17 11:01:21 +01:00

304 lines
8.9 KiB
Rust

//! Address activity tracking - per-block counts of address behaviors.
//!
//! Tracks global and per-address-type activity metrics:
//!
//! | Metric | Description |
//! |--------|-------------|
//! | `receiving` | Unique addresses that received this block |
//! | `sending` | Unique addresses that sent this block |
//! | `reactivated` | Addresses that were empty and now have funds |
//! | `both` | Addresses that both sent AND received same block |
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, StoredU32, Version};
use derive_more::{Deref, DerefMut};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, AnyVec, Database, Exit, Rw, StorageMode, WritableVec};
use crate::{
indexes,
internal::{CachedWindowStarts, PerBlockRollingAverage},
};
/// Per-block activity counts - reset each block.
#[derive(Debug, Default, Clone)]
pub struct BlockActivityCounts {
pub reactivated: u32,
pub sending: u32,
pub receiving: u32,
pub both: u32,
}
impl BlockActivityCounts {
/// Reset all counts to zero.
#[inline]
pub(crate) fn reset(&mut self) {
*self = Self::default();
}
}
/// Per-address-type activity counts - aggregated during block processing.
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AddrTypeToActivityCounts(pub ByAddrType<BlockActivityCounts>);
impl AddrTypeToActivityCounts {
/// Reset all per-type counts.
pub(crate) fn reset(&mut self) {
self.0.values_mut().for_each(|v| v.reset());
}
/// Sum all types to get totals.
pub(crate) fn totals(&self) -> BlockActivityCounts {
let mut total = BlockActivityCounts::default();
for counts in self.0.values() {
total.reactivated += counts.reactivated;
total.sending += counts.sending;
total.receiving += counts.receiving;
total.both += counts.both;
}
total
}
}
/// Activity count vectors for a single category (e.g., one address type or "all").
#[derive(Traversable)]
pub struct ActivityCountVecs<M: StorageMode = Rw> {
pub reactivated: PerBlockRollingAverage<StoredU32, M>,
pub sending: PerBlockRollingAverage<StoredU32, M>,
pub receiving: PerBlockRollingAverage<StoredU32, M>,
pub both: PerBlockRollingAverage<StoredU32, M>,
}
impl ActivityCountVecs {
pub(crate) fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
Ok(Self {
reactivated: PerBlockRollingAverage::forced_import(
db,
&format!("{name}_reactivated"),
version,
indexes,
cached_starts,
)?,
sending: PerBlockRollingAverage::forced_import(
db,
&format!("{name}_sending"),
version,
indexes,
cached_starts,
)?,
receiving: PerBlockRollingAverage::forced_import(
db,
&format!("{name}_receiving"),
version,
indexes,
cached_starts,
)?,
both: PerBlockRollingAverage::forced_import(
db,
&format!("{name}_both"),
version,
indexes,
cached_starts,
)?,
})
}
pub(crate) fn min_stateful_len(&self) -> usize {
self.reactivated
.base
.len()
.min(self.sending.base.len())
.min(self.receiving.base.len())
.min(self.both.base.len())
}
pub(crate) fn par_iter_height_mut(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
[
&mut self.reactivated.base as &mut dyn AnyStoredVec,
&mut self.sending.base as &mut dyn AnyStoredVec,
&mut self.receiving.base as &mut dyn AnyStoredVec,
&mut self.both.base as &mut dyn AnyStoredVec,
]
.into_par_iter()
}
pub(crate) fn reset_height(&mut self) -> Result<()> {
self.reactivated.base.reset()?;
self.sending.base.reset()?;
self.receiving.base.reset()?;
self.both.base.reset()?;
Ok(())
}
#[inline(always)]
pub(crate) fn push_height(&mut self, counts: &BlockActivityCounts) {
self.reactivated.base.push(counts.reactivated.into());
self.sending.base.push(counts.sending.into());
self.receiving.base.push(counts.receiving.into());
self.both.base.push(counts.both.into());
}
pub(crate) fn compute_rest(
&mut self,
max_from: Height,
exit: &Exit,
) -> Result<()> {
self.reactivated.compute_rest(max_from, exit)?;
self.sending.compute_rest(max_from, exit)?;
self.receiving.compute_rest(max_from, exit)?;
self.both.compute_rest(max_from, exit)?;
Ok(())
}
}
/// Per-address-type activity count vecs.
#[derive(Deref, DerefMut, Traversable)]
pub struct AddrTypeToActivityCountVecs<M: StorageMode = Rw>(ByAddrType<ActivityCountVecs<M>>);
impl From<ByAddrType<ActivityCountVecs>> for AddrTypeToActivityCountVecs {
#[inline]
fn from(value: ByAddrType<ActivityCountVecs>) -> Self {
Self(value)
}
}
impl AddrTypeToActivityCountVecs {
pub(crate) fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
Ok(Self::from(
ByAddrType::<ActivityCountVecs>::new_with_name(|type_name| {
ActivityCountVecs::forced_import(
db,
&format!("{type_name}_{name}"),
version,
indexes,
cached_starts,
)
})?,
))
}
pub(crate) fn min_stateful_len(&self) -> usize {
self.0
.values()
.map(|v| v.min_stateful_len())
.min()
.unwrap_or(0)
}
pub(crate) fn par_iter_height_mut(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
let mut vecs: Vec<&mut dyn AnyStoredVec> = Vec::new();
for type_vecs in self.0.values_mut() {
vecs.push(&mut type_vecs.reactivated.base);
vecs.push(&mut type_vecs.sending.base);
vecs.push(&mut type_vecs.receiving.base);
vecs.push(&mut type_vecs.both.base);
}
vecs.into_par_iter()
}
pub(crate) fn reset_height(&mut self) -> Result<()> {
for v in self.0.values_mut() {
v.reset_height()?;
}
Ok(())
}
pub(crate) fn compute_rest(
&mut self,
max_from: Height,
exit: &Exit,
) -> Result<()> {
for type_vecs in self.0.values_mut() {
type_vecs.compute_rest(max_from, exit)?;
}
Ok(())
}
#[inline(always)]
pub(crate) fn push_height(&mut self, counts: &AddrTypeToActivityCounts) {
for (vecs, c) in self.0.values_mut().zip(counts.0.values()) {
vecs.push_height(c);
}
}
}
/// Storage for activity metrics (global + per type).
#[derive(Traversable)]
pub struct AddrActivityVecs<M: StorageMode = Rw> {
pub all: ActivityCountVecs<M>,
#[traversable(flatten)]
pub by_addr_type: AddrTypeToActivityCountVecs<M>,
}
impl AddrActivityVecs {
pub(crate) fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
Ok(Self {
all: ActivityCountVecs::forced_import(db, name, version, indexes, cached_starts)?,
by_addr_type: AddrTypeToActivityCountVecs::forced_import(
db, name, version, indexes, cached_starts,
)?,
})
}
pub(crate) fn min_stateful_len(&self) -> usize {
self.all
.min_stateful_len()
.min(self.by_addr_type.min_stateful_len())
}
pub(crate) fn par_iter_height_mut(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
self.all
.par_iter_height_mut()
.chain(self.by_addr_type.par_iter_height_mut())
}
pub(crate) fn reset_height(&mut self) -> Result<()> {
self.all.reset_height()?;
self.by_addr_type.reset_height()?;
Ok(())
}
pub(crate) fn compute_rest(
&mut self,
max_from: Height,
exit: &Exit,
) -> Result<()> {
self.all.compute_rest(max_from, exit)?;
self.by_addr_type.compute_rest(max_from, exit)?;
Ok(())
}
#[inline(always)]
pub(crate) fn push_height(&mut self, counts: &AddrTypeToActivityCounts) {
let totals = counts.totals();
self.all.push_height(&totals);
self.by_addr_type.push_height(counts);
}
}