global: big snapshot

This commit is contained in:
nym21
2026-04-13 22:46:56 +02:00
parent c3cef71aa3
commit 765261648d
89 changed files with 4138 additions and 149 deletions

View File

@@ -1,22 +1,18 @@
use brk_cohort::ByAddrType;
use brk_traversable::Traversable;
use brk_types::{BasisPointsSigned32, StoredI64, StoredU64, Version};
use derive_more::{Deref, DerefMut};
use crate::{
indexes,
internal::{LazyRollingDeltasFromHeight, WindowStartVec, Windows},
};
use super::AddrCountsVecs;
use super::{AddrCountsVecs, WithAddrTypes};
type AddrDelta = LazyRollingDeltasFromHeight<StoredU64, StoredI64, BasisPointsSigned32>;
#[derive(Clone, Traversable)]
pub struct DeltaVecs {
pub all: AddrDelta,
#[traversable(flatten)]
pub by_addr_type: ByAddrType<AddrDelta>,
}
#[derive(Clone, Deref, DerefMut, Traversable)]
pub struct DeltaVecs(#[traversable(flatten)] pub WithAddrTypes<AddrDelta>);
impl DeltaVecs {
pub(crate) fn new(
@@ -45,6 +41,6 @@ impl DeltaVecs {
)
});
Self { all, by_addr_type }
Self(WithAddrTypes { all, by_addr_type })
}
}

View File

@@ -0,0 +1,74 @@
//! Exposed address count tracking — running counters of how many addresses
//! are currently in (or have ever been in) the exposed set, per address type
//! plus an aggregated `all`. See the parent [`super`] module for the
//! definition of "exposed" and how it varies by address type.
mod state;
mod vecs;
pub use state::AddrTypeToExposedAddrCount;
pub use vecs::ExposedAddrCountAllVecs;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Indexes, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, Exit, Rw, StorageMode};
use crate::indexes;
/// Exposed address counts: funded (currently at-risk) and total (ever at-risk).
#[derive(Traversable)]
pub struct ExposedAddrCountsVecs<M: StorageMode = Rw> {
    pub funded: ExposedAddrCountAllVecs<M>,
    pub total: ExposedAddrCountAllVecs<M>,
}

impl ExposedAddrCountsVecs {
    /// Open (or create) both count variants in `db`, under the
    /// `exposed_addr_count` / `total_exposed_addr_count` name prefixes.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        let funded =
            ExposedAddrCountAllVecs::forced_import(db, "exposed_addr_count", version, indexes)?;
        let total = ExposedAddrCountAllVecs::forced_import(
            db,
            "total_exposed_addr_count",
            version,
            indexes,
        )?;
        Ok(Self { funded, total })
    }

    /// Shortest stateful length across both variants.
    pub(crate) fn min_stateful_len(&self) -> usize {
        usize::min(
            self.funded.min_stateful_len(),
            self.total.min_stateful_len(),
        )
    }

    /// Parallel iterator over every height-indexed stored vec (funded first,
    /// then total).
    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        let funded = self.funded.par_iter_height_mut();
        let total = self.total.par_iter_height_mut();
        funded.chain(total)
    }

    /// Reset the height-indexed vecs of both variants.
    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.funded.reset_height()?;
        self.total.reset_height()
    }

    /// Finalize derived vecs for both variants.
    pub(crate) fn compute_rest(&mut self, starting_indexes: &Indexes, exit: &Exit) -> Result<()> {
        self.funded.compute_rest(starting_indexes, exit)?;
        self.total.compute_rest(starting_indexes, exit)
    }
}

View File

@@ -0,0 +1,42 @@
use brk_cohort::ByAddrType;
use brk_types::{Height, StoredU64};
use derive_more::{Deref, DerefMut};
use vecdb::ReadableVec;
use crate::internal::PerBlock;
use super::vecs::ExposedAddrCountAllVecs;
/// Runtime counter for exposed address counts per address type.
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AddrTypeToExposedAddrCount(ByAddrType<u64>);

impl AddrTypeToExposedAddrCount {
    /// Total across every address type.
    #[inline]
    pub(crate) fn sum(&self) -> u64 {
        self.0.values().copied().sum()
    }
}

impl From<(&ExposedAddrCountAllVecs, Height)> for AddrTypeToExposedAddrCount {
    /// Recover the counters from the last fully-written block (the one just
    /// before `starting_height`); starting from genesis yields all zeros.
    #[inline]
    fn from((vecs, starting_height): (&ExposedAddrCountAllVecs, Height)) -> Self {
        let Some(prev_height) = starting_height.decremented() else {
            return Self::default();
        };
        let read = |per_block: &PerBlock<StoredU64>| -> u64 {
            per_block.height.collect_one(prev_height).unwrap().into()
        };
        let per_type = &vecs.by_addr_type;
        Self(ByAddrType {
            p2pk65: read(&per_type.p2pk65),
            p2pk33: read(&per_type.p2pk33),
            p2pkh: read(&per_type.p2pkh),
            p2sh: read(&per_type.p2sh),
            p2wpkh: read(&per_type.p2wpkh),
            p2wsh: read(&per_type.p2wsh),
            p2tr: read(&per_type.p2tr),
            p2a: read(&per_type.p2a),
        })
    }
}

View File

@@ -0,0 +1,30 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{StoredU64, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{Database, Rw, StorageMode};
use crate::{
distribution::addr::WithAddrTypes,
indexes,
internal::PerBlock,
};
/// Exposed address count (`all` + per-type) for a single variant (funded or total).
#[derive(Deref, DerefMut, Traversable)]
pub struct ExposedAddrCountAllVecs<M: StorageMode = Rw>(
    #[traversable(flatten)] pub WithAddrTypes<PerBlock<StoredU64, M>>,
);

impl ExposedAddrCountAllVecs {
    /// Open (or create) the underlying `all` + per-type vecs under `name`.
    pub(crate) fn forced_import(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        WithAddrTypes::<PerBlock<StoredU64>>::forced_import(db, name, version, indexes).map(Self)
    }
}

View File

@@ -0,0 +1,95 @@
//! Exposed address tracking (quantum / pubkey-exposure sense).
//!
//! An address is "exposed" once its public key is in the blockchain. Once
//! exposed, any funds at that address are at cryptographic risk (e.g. from
//! a quantum attacker capable of recovering the private key from the pubkey).
//!
//! When the pubkey gets exposed depends on the address type:
//!
//! - **P2PK33, P2PK65, P2TR**: the pubkey (or P2TR's tweaked output key) is
//! directly in the locking script of the funding output. These addresses are
//! exposed the moment they receive any funds.
//! - **P2PKH, P2SH, P2WPKH, P2WSH**: the locking script contains a hash of
//! the pubkey/script. The pubkey is only revealed when spending. Note that
//! even the spending tx itself exposes the pubkey while the address still
//! holds funds — during the mempool window between broadcast and confirmation,
//! the pubkey is visible while the UTXO being spent is still unspent on-chain.
//! So every spent address of these types has had at least one moment with
//! funds at quantum risk.
//! - **P2A**: anyone-can-spend, no pubkey at all. Excluded from both counters.
//!
//! Formally, with `is_funding_exposed` = `output_type.pubkey_exposed_at_funding()`:
//! - `funded` (count): `(utxo_count > 0) AND (is_funding_exposed OR spent_txo_count >= 1)`
//! - `total` (count): `(is_funding_exposed AND ever received) OR spent_txo_count >= 1`
//! - `supply` (sats): sum of balances of addresses currently in the funded set
//!
//! For P2PK/P2TR types this means `total ≡ total_addr_count` and
//! `funded ≡ funded_addr_count` (every address of those types is exposed by
//! virtue of existing). For P2PKH/P2SH/P2WPKH/P2WSH it's the strict subset of
//! addresses that have been spent from. The aggregate `all` exposed counter
//! sums these, giving "Bitcoin addresses currently with funds at quantum risk".
//!
//! All metrics are tracked as running counters and require no extra fields
//! on the address data — they're maintained via delta detection in
//! `process_received` and `process_sent`.
mod count;
mod supply;
pub use count::{AddrTypeToExposedAddrCount, ExposedAddrCountsVecs};
pub use supply::{AddrTypeToExposedAddrSupply, ExposedAddrSupplyVecs};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Indexes, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, Exit, Rw, StorageMode};
use crate::indexes;
/// Top-level container for all exposed address tracking: counts (funded +
/// total) plus the funded supply.
#[derive(Traversable)]
pub struct ExposedAddrVecs<M: StorageMode = Rw> {
    pub count: ExposedAddrCountsVecs<M>,
    pub supply: ExposedAddrSupplyVecs<M>,
}

impl ExposedAddrVecs {
    /// Open (or create) both facets in `db`.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        let count = ExposedAddrCountsVecs::forced_import(db, version, indexes)?;
        let supply = ExposedAddrSupplyVecs::forced_import(db, version, indexes)?;
        Ok(Self { count, supply })
    }

    /// Shortest stateful length across counts and supply.
    pub(crate) fn min_stateful_len(&self) -> usize {
        usize::min(
            self.count.min_stateful_len(),
            self.supply.min_stateful_len(),
        )
    }

    /// Parallel iterator over every height-indexed stored vec (counts first,
    /// then supply).
    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        let count = self.count.par_iter_height_mut();
        let supply = self.supply.par_iter_height_mut();
        count.chain(supply)
    }

    /// Reset the height-indexed vecs of both facets.
    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.count.reset_height()?;
        self.supply.reset_height()
    }

    /// Finalize derived vecs for both facets.
    pub(crate) fn compute_rest(&mut self, starting_indexes: &Indexes, exit: &Exit) -> Result<()> {
        self.count.compute_rest(starting_indexes, exit)?;
        self.supply.compute_rest(starting_indexes, exit)
    }
}

View File

@@ -0,0 +1,10 @@
//! Exposed address supply (sats) tracking — running sum of balances held by
//! addresses currently in the funded exposed set, per address type plus an
//! aggregated `all`. See the parent [`super`] module for the definition of
//! "exposed" and how it varies by address type.
mod state;
mod vecs;
pub use state::AddrTypeToExposedAddrSupply;
pub use vecs::ExposedAddrSupplyVecs;

View File

@@ -0,0 +1,43 @@
use brk_cohort::ByAddrType;
use brk_types::{Height, Sats};
use derive_more::{Deref, DerefMut};
use vecdb::ReadableVec;
use crate::internal::PerBlock;
use super::vecs::ExposedAddrSupplyVecs;
/// Runtime running counter for the total balance (sats) held by funded
/// exposed addresses, per address type.
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AddrTypeToExposedAddrSupply(ByAddrType<u64>);

impl AddrTypeToExposedAddrSupply {
    /// Total exposed supply (sats) across every address type.
    #[inline]
    pub(crate) fn sum(&self) -> u64 {
        self.0.values().copied().sum()
    }
}

impl From<(&ExposedAddrSupplyVecs, Height)> for AddrTypeToExposedAddrSupply {
    /// Recover the running supply from the last fully-written block (the
    /// one just before `starting_height`); starting from genesis yields zeros.
    #[inline]
    fn from((vecs, starting_height): (&ExposedAddrSupplyVecs, Height)) -> Self {
        let Some(prev_height) = starting_height.decremented() else {
            return Self::default();
        };
        let read = |per_block: &PerBlock<Sats>| -> u64 {
            u64::from(per_block.height.collect_one(prev_height).unwrap())
        };
        let per_type = &vecs.by_addr_type;
        Self(ByAddrType {
            p2pk65: read(&per_type.p2pk65),
            p2pk33: read(&per_type.p2pk33),
            p2pkh: read(&per_type.p2pkh),
            p2sh: read(&per_type.p2sh),
            p2wpkh: read(&per_type.p2wpkh),
            p2wsh: read(&per_type.p2wsh),
            p2tr: read(&per_type.p2tr),
            p2a: read(&per_type.p2a),
        })
    }
}

View File

@@ -0,0 +1,33 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Sats, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{Database, Rw, StorageMode};
use crate::{
distribution::addr::WithAddrTypes,
indexes,
internal::PerBlock,
};
/// Exposed address supply (sats) — `all` + per-address-type. Tracks the total
/// balance held by addresses currently in the funded exposed set.
#[derive(Deref, DerefMut, Traversable)]
pub struct ExposedAddrSupplyVecs<M: StorageMode = Rw>(
    #[traversable(flatten)] pub WithAddrTypes<PerBlock<Sats, M>>,
);

impl ExposedAddrSupplyVecs {
    /// Open (or create) the underlying `all` + per-type sats vecs.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        WithAddrTypes::<PerBlock<Sats>>::forced_import(db, "exposed_addr_supply", version, indexes)
            .map(Self)
    }
}

View File

@@ -2,16 +2,26 @@ mod activity;
mod addr_count;
mod data;
mod delta;
mod exposed;
mod indexes;
mod new_addr_count;
mod reused;
mod total_addr_count;
mod type_map;
mod with_addr_types;
pub use activity::{AddrActivityVecs, AddrTypeToActivityCounts};
pub use addr_count::{AddrCountsVecs, AddrTypeToAddrCount};
pub use data::AddrsDataVecs;
pub use delta::DeltaVecs;
pub use exposed::{
AddrTypeToExposedAddrCount, AddrTypeToExposedAddrSupply, ExposedAddrVecs,
};
pub use indexes::AnyAddrIndexesVecs;
pub use new_addr_count::NewAddrCountVecs;
pub use reused::{
AddrTypeToReusedAddrCount, AddrTypeToReusedAddrUseCount, ReusedAddrVecs,
};
pub use total_addr_count::TotalAddrCountVecs;
pub use type_map::{AddrTypeToTypeIndexMap, AddrTypeToVec, HeightToAddrTypeToVec};
pub use with_addr_types::WithAddrTypes;

View File

@@ -1,7 +1,7 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, StoredU64, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{Database, Exit, Rw, StorageMode};
use crate::{
@@ -9,15 +9,14 @@ use crate::{
internal::{PerBlockCumulativeRolling, WindowStartVec, Windows},
};
use super::TotalAddrCountVecs;
use super::{TotalAddrCountVecs, WithAddrTypes};
/// New address count per block (global + per-type)
#[derive(Traversable)]
pub struct NewAddrCountVecs<M: StorageMode = Rw> {
pub all: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
/// New address count per block (global + per-type).
#[derive(Deref, DerefMut, Traversable)]
pub struct NewAddrCountVecs<M: StorageMode = Rw>(
#[traversable(flatten)]
pub by_addr_type: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
}
pub WithAddrTypes<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
);
impl NewAddrCountVecs {
pub(crate) fn forced_import(
@@ -26,25 +25,11 @@ impl NewAddrCountVecs {
indexes: &indexes::Vecs,
cached_starts: &Windows<&WindowStartVec>,
) -> Result<Self> {
let all = PerBlockCumulativeRolling::forced_import(
db,
"new_addr_count",
version,
indexes,
cached_starts,
)?;
let by_addr_type = ByAddrType::new_with_name(|name| {
PerBlockCumulativeRolling::forced_import(
db,
&format!("{name}_new_addr_count"),
version,
indexes,
cached_starts,
)
})?;
Ok(Self { all, by_addr_type })
Ok(Self(WithAddrTypes::<
PerBlockCumulativeRolling<StoredU64, StoredU64>,
>::forced_import(
db, "new_addr_count", version, indexes, cached_starts
)?))
}
pub(crate) fn compute(
@@ -53,11 +38,12 @@ impl NewAddrCountVecs {
total_addr_count: &TotalAddrCountVecs,
exit: &Exit,
) -> Result<()> {
self.all.compute(max_from, exit, |height_vec| {
self.0.all.compute(max_from, exit, |height_vec| {
Ok(height_vec.compute_change(max_from, &total_addr_count.all.height, 1, exit)?)
})?;
for ((_, new), (_, total)) in self
.0
.by_addr_type
.iter_mut()
.zip(total_addr_count.by_addr_type.iter())

View File

@@ -0,0 +1,78 @@
//! Reused address count tracking — running counters of how many addresses
//! are currently in (or have ever been in) the reused set, per address type
//! plus an aggregated `all`. See the parent [`super`] module for the
//! definition of "reused".
//!
//! Two counters are exposed:
//! - `funded`: addresses currently funded AND with `funded_txo_count > 1`
//! - `total`: addresses that have ever satisfied `funded_txo_count > 1` (monotonic)
mod state;
mod vecs;
pub use state::AddrTypeToReusedAddrCount;
pub use vecs::ReusedAddrCountAllVecs;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Indexes, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, Exit, Rw, StorageMode};
use crate::indexes;
/// Reused address counts: funded (currently with balance) and total (ever reused).
#[derive(Traversable)]
pub struct ReusedAddrCountsVecs<M: StorageMode = Rw> {
    pub funded: ReusedAddrCountAllVecs<M>,
    pub total: ReusedAddrCountAllVecs<M>,
}

impl ReusedAddrCountsVecs {
    /// Open (or create) both count variants in `db`, under the
    /// `reused_addr_count` / `total_reused_addr_count` name prefixes.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        let funded =
            ReusedAddrCountAllVecs::forced_import(db, "reused_addr_count", version, indexes)?;
        let total = ReusedAddrCountAllVecs::forced_import(
            db,
            "total_reused_addr_count",
            version,
            indexes,
        )?;
        Ok(Self { funded, total })
    }

    /// Shortest stateful length across both variants.
    pub(crate) fn min_stateful_len(&self) -> usize {
        usize::min(
            self.funded.min_stateful_len(),
            self.total.min_stateful_len(),
        )
    }

    /// Parallel iterator over every height-indexed stored vec (funded first,
    /// then total).
    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        let funded = self.funded.par_iter_height_mut();
        let total = self.total.par_iter_height_mut();
        funded.chain(total)
    }

    /// Reset the height-indexed vecs of both variants.
    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.funded.reset_height()?;
        self.total.reset_height()
    }

    /// Finalize derived vecs for both variants.
    pub(crate) fn compute_rest(&mut self, starting_indexes: &Indexes, exit: &Exit) -> Result<()> {
        self.funded.compute_rest(starting_indexes, exit)?;
        self.total.compute_rest(starting_indexes, exit)
    }
}

View File

@@ -0,0 +1,42 @@
use brk_cohort::ByAddrType;
use brk_types::{Height, StoredU64};
use derive_more::{Deref, DerefMut};
use vecdb::ReadableVec;
use crate::internal::PerBlock;
use super::vecs::ReusedAddrCountAllVecs;
/// Runtime counter for reused address counts per address type.
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AddrTypeToReusedAddrCount(ByAddrType<u64>);

impl AddrTypeToReusedAddrCount {
    /// Total across every address type.
    #[inline]
    pub(crate) fn sum(&self) -> u64 {
        self.0.values().copied().sum()
    }
}

impl From<(&ReusedAddrCountAllVecs, Height)> for AddrTypeToReusedAddrCount {
    /// Recover the counters from the last fully-written block (the one just
    /// before `starting_height`); starting from genesis yields all zeros.
    #[inline]
    fn from((vecs, starting_height): (&ReusedAddrCountAllVecs, Height)) -> Self {
        let Some(prev_height) = starting_height.decremented() else {
            return Self::default();
        };
        let read = |per_block: &PerBlock<StoredU64>| -> u64 {
            per_block.height.collect_one(prev_height).unwrap().into()
        };
        let per_type = &vecs.by_addr_type;
        Self(ByAddrType {
            p2pk65: read(&per_type.p2pk65),
            p2pk33: read(&per_type.p2pk33),
            p2pkh: read(&per_type.p2pkh),
            p2sh: read(&per_type.p2sh),
            p2wpkh: read(&per_type.p2wpkh),
            p2wsh: read(&per_type.p2wsh),
            p2tr: read(&per_type.p2tr),
            p2a: read(&per_type.p2a),
        })
    }
}

View File

@@ -0,0 +1,30 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{StoredU64, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{Database, Rw, StorageMode};
use crate::{
distribution::addr::WithAddrTypes,
indexes,
internal::PerBlock,
};
/// Reused address count (`all` + per-type) for a single variant (funded or total).
#[derive(Deref, DerefMut, Traversable)]
pub struct ReusedAddrCountAllVecs<M: StorageMode = Rw>(
    #[traversable(flatten)] pub WithAddrTypes<PerBlock<StoredU64, M>>,
);

impl ReusedAddrCountAllVecs {
    /// Open (or create) the underlying `all` + per-type vecs under `name`.
    pub(crate) fn forced_import(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        WithAddrTypes::<PerBlock<StoredU64>>::forced_import(db, name, version, indexes).map(Self)
    }
}

View File

@@ -0,0 +1,85 @@
//! Reused address tracking.
//!
//! An address is "reused" if its lifetime `funded_txo_count > 1` — i.e. it
//! has received more than one output across its lifetime. This is the
//! simplest output-multiplicity proxy for address linkability.
//!
//! Two facets are tracked here:
//! - [`count`] — how many distinct addresses are currently reused (funded)
//! and how many have *ever* been reused (total). Per address type plus
//! an aggregated `all`.
//! - [`uses`] — per-block count of outputs going to addresses that were
//! already reused, plus the derived percent over total address-output
//! count (denominator from `scripts::count`).
mod count;
mod uses;
pub use count::{AddrTypeToReusedAddrCount, ReusedAddrCountsVecs};
pub use uses::{AddrTypeToReusedAddrUseCount, ReusedAddrUsesVecs};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Indexes, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, Exit, Rw, StorageMode};
use crate::{
indexes,
internal::{WindowStartVec, Windows},
scripts,
};
/// Top-level container for all reused address tracking: counts (funded +
/// total) plus per-block uses (count + percent).
#[derive(Traversable)]
pub struct ReusedAddrVecs<M: StorageMode = Rw> {
    pub count: ReusedAddrCountsVecs<M>,
    pub uses: ReusedAddrUsesVecs<M>,
}

impl ReusedAddrVecs {
    /// Open (or create) both facets in `db`. Only the uses facet needs the
    /// cached rolling-window starts.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        let count = ReusedAddrCountsVecs::forced_import(db, version, indexes)?;
        let uses = ReusedAddrUsesVecs::forced_import(db, version, indexes, cached_starts)?;
        Ok(Self { count, uses })
    }

    /// Shortest stateful length across counts and uses.
    pub(crate) fn min_stateful_len(&self) -> usize {
        usize::min(self.count.min_stateful_len(), self.uses.min_stateful_len())
    }

    /// Parallel iterator over every height-indexed stored vec (counts first,
    /// then uses).
    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        let count = self.count.par_iter_height_mut();
        let uses = self.uses.par_iter_height_mut();
        count.chain(uses)
    }

    /// Reset the height-indexed vecs of both facets.
    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.count.reset_height()?;
        self.uses.reset_height()
    }

    /// Finalize derived vecs; the uses facet additionally needs the
    /// address-output-count denominators from `scripts::count`.
    pub(crate) fn compute_rest(
        &mut self,
        starting_indexes: &Indexes,
        scripts_count: &scripts::CountVecs,
        exit: &Exit,
    ) -> Result<()> {
        self.count.compute_rest(starting_indexes, exit)?;
        self.uses
            .compute_rest(starting_indexes, scripts_count, exit)
    }
}

View File

@@ -0,0 +1,8 @@
//! Per-block reused-address-use tracking. See [`vecs::ReusedAddrUsesVecs`]
//! for the full description of the metric.
mod state;
mod vecs;
pub use state::AddrTypeToReusedAddrUseCount;
pub use vecs::ReusedAddrUsesVecs;

View File

@@ -0,0 +1,22 @@
use brk_cohort::ByAddrType;
use derive_more::{Deref, DerefMut};
/// Per-block running counter of reused address uses, per address type.
/// Reset at the start of each block (no disk recovery needed since the
/// per-block flow is reconstructed from `process_received` deterministically).
#[derive(Debug, Default, Deref, DerefMut)]
pub struct AddrTypeToReusedAddrUseCount(ByAddrType<u64>);

impl AddrTypeToReusedAddrUseCount {
    /// Total uses across every address type for the current block.
    #[inline]
    pub(crate) fn sum(&self) -> u64 {
        self.0.values().copied().sum()
    }

    /// Zero every per-type counter (called at each block boundary).
    #[inline]
    pub(crate) fn reset(&mut self) {
        self.0.values_mut().for_each(|counter| *counter = 0);
    }
}

View File

@@ -0,0 +1,152 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{BasisPoints16, Height, Indexes, OutputType, StoredU64, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, Exit, Rw, StorageMode};
use crate::{
distribution::addr::WithAddrTypes,
indexes,
internal::{
PerBlockCumulativeRolling, PercentCumulativeRolling, RatioU64Bp16, WindowStartVec, Windows,
},
scripts,
};
use super::state::AddrTypeToReusedAddrUseCount;
/// Per-block reused-address-use metrics. A "use" is a single output going
/// to an address (not deduplicated): an address receiving N outputs in one
/// block contributes N. The count only includes uses going to addresses
/// that were *already* reused at the moment of the use, so the use that
/// makes an address reused is not itself counted.
///
/// The denominator for the percent (total address-output count) lives in
/// `scripts::count` and is reused here rather than duplicated.
#[derive(Traversable)]
pub struct ReusedAddrUsesVecs<M: StorageMode = Rw> {
    // Raw per-block use counts, `all` + per-type.
    pub reused_addr_use_count:
        WithAddrTypes<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    // Percent of uses over the address-output count, derived in `compute_rest`.
    pub reused_addr_use_percent: WithAddrTypes<PercentCumulativeRolling<BasisPoints16, M>>,
}

impl ReusedAddrUsesVecs {
    // Open (or create) both the count and percent vecs. The count side goes
    // through the `WithAddrTypes` helper; the percent side is assembled by
    // hand because `PercentCumulativeRolling::forced_import` takes no
    // `cached_starts` argument, so no `WithAddrTypes` constructor fits it.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        let reused_addr_use_count =
            WithAddrTypes::<PerBlockCumulativeRolling<StoredU64, StoredU64>>::forced_import(
                db,
                "reused_addr_use_count",
                version,
                indexes,
                cached_starts,
            )?;
        let percent_name = "reused_addr_use_percent";
        let reused_addr_use_percent = WithAddrTypes {
            all: PercentCumulativeRolling::forced_import(db, percent_name, version, indexes)?,
            // Per-type percent vecs are named "{type}_{percent_name}",
            // mirroring the `WithAddrTypes` naming convention.
            by_addr_type: ByAddrType::new_with_name(|type_name| {
                PercentCumulativeRolling::forced_import(
                    db,
                    &format!("{type_name}_{percent_name}"),
                    version,
                    indexes,
                )
            })?,
        };
        Ok(Self {
            reused_addr_use_count,
            reused_addr_use_percent,
        })
    }

    // Only the count side carries pushed state; the percent vecs are fully
    // derived from it (and from scripts::count) in `compute_rest`.
    pub(crate) fn min_stateful_len(&self) -> usize {
        self.reused_addr_use_count.min_stateful_len()
    }

    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        self.reused_addr_use_count.par_iter_height_mut()
    }

    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.reused_addr_use_count.reset_height()
    }

    // Record one block's use counts: the aggregate sum plus the per-type
    // values (iterated in `ByAddrType` order).
    #[inline(always)]
    pub(crate) fn push_height(&mut self, reused: &AddrTypeToReusedAddrUseCount) {
        self.reused_addr_use_count
            .push_height(reused.sum(), reused.values().copied());
    }

    // Finalize counts first, then derive the percent series: the `all`
    // percent over the global address-output count, then each per-type
    // percent over its matching per-type denominator.
    pub(crate) fn compute_rest(
        &mut self,
        starting_indexes: &Indexes,
        scripts_count: &scripts::CountVecs,
        exit: &Exit,
    ) -> Result<()> {
        self.reused_addr_use_count
            .compute_rest(starting_indexes.height, exit)?;
        compute_one_percent(
            &mut self.reused_addr_use_percent.all,
            &self.reused_addr_use_count.all,
            &scripts_count.addr_output_count,
            starting_indexes.height,
            exit,
        )?;
        for otype in OutputType::ADDR_TYPES {
            compute_one_percent(
                self.reused_addr_use_percent
                    .by_addr_type
                    .get_mut_unwrap(otype),
                self.reused_addr_use_count.by_addr_type.get_unwrap(otype),
                denom_for_type(scripts_count, otype),
                starting_indexes.height,
                exit,
            )?;
        }
        Ok(())
    }
}
// Derive one percent series as `reused / denom` (as basis points, per the
// `BasisPoints16` target type), over the cumulative height vecs and over
// each rolling-window sum vec, window by window.
#[inline]
fn compute_one_percent(
    percent: &mut PercentCumulativeRolling<BasisPoints16>,
    reused: &PerBlockCumulativeRolling<StoredU64, StoredU64>,
    denom: &PerBlockCumulativeRolling<StoredU64, StoredU64>,
    starting_height: Height,
    exit: &Exit,
) -> Result<()> {
    // `RatioU64Bp16` is the u64/u64 -> BasisPoints16 ratio operator
    // (presumably division scaled to basis points — see its definition).
    percent.compute_binary::<StoredU64, StoredU64, RatioU64Bp16, _, _, _, _>(
        starting_height,
        &reused.cumulative.height,
        &denom.cumulative.height,
        // Rolling-window numerators and denominators, paired positionally.
        reused.sum.as_array().map(|w| &w.height),
        denom.sum.as_array().map(|w| &w.height),
        exit,
    )
}
/// Pick the per-type address-output-count denominator matching `otype`.
/// Arms are listed in `ByAddrType` field order; non-address output types
/// never reach here because callers iterate `OutputType::ADDR_TYPES`.
#[inline]
fn denom_for_type(
    scripts_count: &scripts::CountVecs,
    otype: OutputType,
) -> &PerBlockCumulativeRolling<StoredU64, StoredU64> {
    match otype {
        OutputType::P2PK65 => &scripts_count.p2pk65,
        OutputType::P2PK33 => &scripts_count.p2pk33,
        OutputType::P2PKH => &scripts_count.p2pkh,
        OutputType::P2SH => &scripts_count.p2sh,
        OutputType::P2WPKH => &scripts_count.p2wpkh,
        OutputType::P2WSH => &scripts_count.p2wsh,
        OutputType::P2TR => &scripts_count.p2tr,
        OutputType::P2A => &scripts_count.p2a,
        _ => unreachable!("OutputType::ADDR_TYPES contains only address types"),
    }
}

View File

@@ -1,20 +1,18 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, StoredU64, Version};
use derive_more::{Deref, DerefMut};
use vecdb::{Database, Exit, Rw, StorageMode};
use crate::{indexes, internal::PerBlock};
use super::AddrCountsVecs;
use super::{AddrCountsVecs, WithAddrTypes};
/// Total address count (global + per-type) with all derived indexes
#[derive(Traversable)]
pub struct TotalAddrCountVecs<M: StorageMode = Rw> {
pub all: PerBlock<StoredU64, M>,
#[traversable(flatten)]
pub by_addr_type: ByAddrType<PerBlock<StoredU64, M>>,
}
/// Total address count (global + per-type) with all derived indexes.
#[derive(Deref, DerefMut, Traversable)]
pub struct TotalAddrCountVecs<M: StorageMode = Rw>(
#[traversable(flatten)] pub WithAddrTypes<PerBlock<StoredU64, M>>,
);
impl TotalAddrCountVecs {
pub(crate) fn forced_import(
@@ -22,13 +20,12 @@ impl TotalAddrCountVecs {
version: Version,
indexes: &indexes::Vecs,
) -> Result<Self> {
let all = PerBlock::forced_import(db, "total_addr_count", version, indexes)?;
let by_addr_type: ByAddrType<PerBlock<StoredU64>> = ByAddrType::new_with_name(|name| {
PerBlock::forced_import(db, &format!("{name}_total_addr_count"), version, indexes)
})?;
Ok(Self { all, by_addr_type })
Ok(Self(WithAddrTypes::<PerBlock<StoredU64>>::forced_import(
db,
"total_addr_count",
version,
indexes,
)?))
}
/// Eagerly compute total = addr_count + empty_addr_count.
@@ -39,14 +36,14 @@ impl TotalAddrCountVecs {
empty_addr_count: &AddrCountsVecs,
exit: &Exit,
) -> Result<()> {
self.all.height.compute_add(
self.0.all.height.compute_add(
max_from,
&addr_count.all.height,
&empty_addr_count.all.height,
exit,
)?;
for ((_, total), ((_, addr), (_, empty))) in self.by_addr_type.iter_mut().zip(
for ((_, total), ((_, addr), (_, empty))) in self.0.by_addr_type.iter_mut().zip(
addr_count
.by_addr_type
.iter()

View File

@@ -0,0 +1,173 @@
//! Generic `all` + per-`AddrType` container, mirrors the `WithSth` pattern
//! along the address-type axis. Used by every metric that tracks one
//! aggregate value alongside a per-address-type breakdown.
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, Indexes, Version};
use rayon::prelude::*;
use schemars::JsonSchema;
use vecdb::{AnyStoredVec, AnyVec, Database, EagerVec, Exit, PcoVec, WritableVec};
use crate::{
indexes,
internal::{NumericValue, PerBlock, PerBlockCumulativeRolling, WindowStartVec, Windows},
};
/// `all` aggregate plus per-`AddrType` breakdown.
///
/// Generic container used by every metric that tracks one aggregate value
/// alongside a per-address-type breakdown; the `impl` blocks below provide
/// constructors and bulk operations for the supported `T` shapes.
#[derive(Clone, Traversable)]
pub struct WithAddrTypes<T> {
    // Aggregate across every address type.
    pub all: T,
    // One `T` per address type; flattened so each type traverses as a
    // sibling of `all` rather than under an extra nesting level.
    #[traversable(flatten)]
    pub by_addr_type: ByAddrType<T>,
}
impl<T> WithAddrTypes<PerBlock<T>>
where
    T: NumericValue + JsonSchema,
{
    /// Open (or create) the `all` vec under `name` and each per-type vec
    /// under `"{type}_{name}"`.
    pub(crate) fn forced_import(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        Ok(Self {
            all: PerBlock::forced_import(db, name, version, indexes)?,
            by_addr_type: ByAddrType::new_with_name(|type_name| {
                PerBlock::forced_import(db, &format!("{type_name}_{name}"), version, indexes)
            })?,
        })
    }

    /// Shortest height-vec length across `all` and every per-type vec.
    pub(crate) fn min_stateful_len(&self) -> usize {
        std::iter::once(self.all.height.len())
            .chain(self.by_addr_type.values().map(|v| v.height.len()))
            .min()
            .unwrap()
    }

    /// Parallel iterator over every height-indexed stored vec (`all` first).
    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        let all = &mut self.all.height as &mut dyn AnyStoredVec;
        let per_type = self
            .by_addr_type
            .par_values_mut()
            .map(|v| &mut v.height as &mut dyn AnyStoredVec);
        rayon::iter::once(all).chain(per_type)
    }

    /// Reset `all` and every per-type height vec.
    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.all.height.reset()?;
        for per_type in self.by_addr_type.values_mut() {
            per_type.height.reset()?;
        }
        Ok(())
    }

    /// Push one block's aggregate plus its per-type values (in `ByAddrType`
    /// iteration order).
    #[inline(always)]
    pub(crate) fn push_height<U>(&mut self, total: U, per_type: impl IntoIterator<Item = U>)
    where
        U: Into<T>,
    {
        self.all.height.push(total.into());
        self.by_addr_type
            .values_mut()
            .zip(per_type)
            .for_each(|(vecs, value)| vecs.height.push(value.into()));
    }

    /// Compute `all.height` as the per-block sum of the per-type vecs.
    pub(crate) fn compute_rest(
        &mut self,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        let per_type_heights: Vec<&EagerVec<PcoVec<Height, T>>> =
            self.by_addr_type.values().map(|v| &v.height).collect();
        self.all
            .height
            .compute_sum_of_others(starting_indexes.height, &per_type_heights, exit)?;
        Ok(())
    }
}
impl<T, C> WithAddrTypes<PerBlockCumulativeRolling<T, C>>
where
    T: NumericValue + JsonSchema + Into<C>,
    C: NumericValue + JsonSchema,
{
    /// Open (or create) the `all` rolling vec under `name` and each
    /// per-type rolling vec under `"{type}_{name}"`.
    pub(crate) fn forced_import(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        Ok(Self {
            all: PerBlockCumulativeRolling::forced_import(
                db,
                name,
                version,
                indexes,
                cached_starts,
            )?,
            by_addr_type: ByAddrType::new_with_name(|type_name| {
                PerBlockCumulativeRolling::forced_import(
                    db,
                    &format!("{type_name}_{name}"),
                    version,
                    indexes,
                    cached_starts,
                )
            })?,
        })
    }

    /// Shortest block-vec length across `all` and every per-type vec.
    pub(crate) fn min_stateful_len(&self) -> usize {
        std::iter::once(self.all.block.len())
            .chain(self.by_addr_type.values().map(|v| v.block.len()))
            .min()
            .unwrap()
    }

    /// Parallel iterator over every block-indexed stored vec (`all` first).
    pub(crate) fn par_iter_height_mut(
        &mut self,
    ) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
        let all = &mut self.all.block as &mut dyn AnyStoredVec;
        let per_type = self
            .by_addr_type
            .par_values_mut()
            .map(|v| &mut v.block as &mut dyn AnyStoredVec);
        rayon::iter::once(all).chain(per_type)
    }

    /// Reset `all` and every per-type block vec.
    pub(crate) fn reset_height(&mut self) -> Result<()> {
        self.all.block.reset()?;
        for per_type in self.by_addr_type.values_mut() {
            per_type.block.reset()?;
        }
        Ok(())
    }

    /// Push one block's aggregate plus its per-type values (in `ByAddrType`
    /// iteration order).
    #[inline(always)]
    pub(crate) fn push_height<U>(&mut self, total: U, per_type: impl IntoIterator<Item = U>)
    where
        U: Into<T>,
    {
        self.all.block.push(total.into());
        self.by_addr_type
            .values_mut()
            .zip(per_type)
            .for_each(|(vecs, value)| vecs.block.push(value.into()));
    }

    /// Finalize `cumulative` / `sum` / `average` for `all` and every per-type vec.
    pub(crate) fn compute_rest(&mut self, max_from: Height, exit: &Exit) -> Result<()> {
        self.all.compute_rest(max_from, exit)?;
        for per_type in self.by_addr_type.values_mut() {
            per_type.compute_rest(max_from, exit)?;
        }
        Ok(())
    }
}

View File

@@ -3,7 +3,10 @@ use brk_types::{Cents, Sats, TypeIndex};
use rustc_hash::FxHashMap;
use crate::distribution::{
addr::{AddrTypeToActivityCounts, AddrTypeToVec},
addr::{
AddrTypeToActivityCounts, AddrTypeToExposedAddrCount, AddrTypeToExposedAddrSupply,
AddrTypeToReusedAddrCount, AddrTypeToReusedAddrUseCount, AddrTypeToVec,
},
cohorts::AddrCohorts,
};
@@ -25,6 +28,12 @@ pub(crate) fn process_received(
addr_count: &mut ByAddrType<u64>,
empty_addr_count: &mut ByAddrType<u64>,
activity_counts: &mut AddrTypeToActivityCounts,
reused_addr_count: &mut AddrTypeToReusedAddrCount,
total_reused_addr_count: &mut AddrTypeToReusedAddrCount,
reused_addr_use_count: &mut AddrTypeToReusedAddrUseCount,
exposed_addr_count: &mut AddrTypeToExposedAddrCount,
total_exposed_addr_count: &mut AddrTypeToExposedAddrCount,
exposed_addr_supply: &mut AddrTypeToExposedAddrSupply,
) {
let max_type_len = received_data
.iter()
@@ -43,6 +52,12 @@ pub(crate) fn process_received(
let type_addr_count = addr_count.get_mut(output_type).unwrap();
let type_empty_count = empty_addr_count.get_mut(output_type).unwrap();
let type_activity = activity_counts.get_mut_unwrap(output_type);
let type_reused_count = reused_addr_count.get_mut(output_type).unwrap();
let type_total_reused_count = total_reused_addr_count.get_mut(output_type).unwrap();
let type_reused_use_count = reused_addr_use_count.get_mut(output_type).unwrap();
let type_exposed_count = exposed_addr_count.get_mut(output_type).unwrap();
let type_total_exposed_count = total_exposed_addr_count.get_mut(output_type).unwrap();
let type_exposed_supply = exposed_addr_supply.get_mut(output_type).unwrap();
// Aggregate receives by address - each address processed exactly once
for (type_index, value) in vec {
@@ -57,6 +72,13 @@ pub(crate) fn process_received(
// Track receiving activity - each address in receive aggregation
type_activity.receiving += 1;
// Capture state BEFORE the receive mutates funded_txo_count
let was_funded = addr_data.is_funded();
let was_reused = addr_data.is_reused();
let funded_txo_count_before = addr_data.funded_txo_count;
let was_pubkey_exposed = addr_data.is_pubkey_exposed(output_type);
let exposed_contribution_before = addr_data.exposed_supply_contribution(output_type);
match status {
TrackingStatus::New => {
*type_addr_count += 1;
@@ -134,6 +156,54 @@ pub(crate) fn process_received(
.receive_outputs(addr_data, recv.total_value, price, recv.output_count);
}
}
// Update reused counts based on the post-receive state
let is_now_reused = addr_data.is_reused();
if is_now_reused && !was_reused {
// Newly crossed the reuse threshold this block
*type_reused_count += 1;
*type_total_reused_count += 1;
} else if is_now_reused && !was_funded {
// Already-reused address reactivating into the funded set
*type_reused_count += 1;
}
// Per-block reused-use count: every individual output to this
// address counts iff the address was already reused at the
// moment of that output. With aggregation, that means we
// skip enough outputs at the front to take the lifetime
// funding count from `funded_txo_count_before` past 1, then
// count the rest. `skipped` is `max(0, 2 - before)`.
let skipped = 2u32.saturating_sub(funded_txo_count_before);
let counted = recv.output_count.saturating_sub(skipped);
*type_reused_use_count += u64::from(counted);
// Update exposed counts. The address's pubkey-exposure state
// is unchanged by a receive (spent_txo_count unchanged), so we
// can use the captured `was_pubkey_exposed` for both pre and post.
// After the receive the address is always funded, so it's in the
// funded exposed set iff its pubkey is exposed.
//
// Funded exposed enters when the address wasn't funded before but
// is now AND its pubkey is exposed.
// Total exposed (pk_exposed_at_funding types only) increments on
// first-ever receive (status == TrackingStatus::New); for other
// types it's incremented in process_sent on the first spend.
if !was_funded && was_pubkey_exposed {
*type_exposed_count += 1;
}
if output_type.pubkey_exposed_at_funding()
&& matches!(status, TrackingStatus::New)
{
*type_total_exposed_count += 1;
}
// Update exposed supply via post-receive contribution delta.
let exposed_contribution_after =
addr_data.exposed_supply_contribution(output_type);
// Receives can only add to balance and membership, so the delta
// is always non-negative.
*type_exposed_supply += exposed_contribution_after - exposed_contribution_before;
}
}
}

View File

@@ -5,7 +5,10 @@ use rustc_hash::FxHashSet;
use vecdb::VecIndex;
use crate::distribution::{
addr::{AddrTypeToActivityCounts, HeightToAddrTypeToVec},
addr::{
AddrTypeToActivityCounts, AddrTypeToExposedAddrCount, AddrTypeToExposedAddrSupply,
AddrTypeToReusedAddrCount, HeightToAddrTypeToVec,
},
cohorts::AddrCohorts,
compute::PriceRangeMax,
};
@@ -35,6 +38,10 @@ pub(crate) fn process_sent(
addr_count: &mut ByAddrType<u64>,
empty_addr_count: &mut ByAddrType<u64>,
activity_counts: &mut AddrTypeToActivityCounts,
reused_addr_count: &mut AddrTypeToReusedAddrCount,
exposed_addr_count: &mut AddrTypeToExposedAddrCount,
total_exposed_addr_count: &mut AddrTypeToExposedAddrCount,
exposed_addr_supply: &mut AddrTypeToExposedAddrSupply,
received_addrs: &ByAddrType<FxHashSet<TypeIndex>>,
height_to_price: &[Cents],
height_to_timestamp: &[Timestamp],
@@ -57,6 +64,10 @@ pub(crate) fn process_sent(
let type_addr_count = addr_count.get_mut(output_type).unwrap();
let type_empty_count = empty_addr_count.get_mut(output_type).unwrap();
let type_activity = activity_counts.get_mut_unwrap(output_type);
let type_reused_count = reused_addr_count.get_mut(output_type).unwrap();
let type_exposed_count = exposed_addr_count.get_mut(output_type).unwrap();
let type_total_exposed_count = total_exposed_addr_count.get_mut(output_type).unwrap();
let type_exposed_supply = exposed_addr_supply.get_mut(output_type).unwrap();
let type_received = received_addrs.get(output_type);
let type_seen = seen_senders.get_mut_unwrap(output_type);
@@ -78,6 +89,11 @@ pub(crate) fn process_sent(
let will_be_empty = addr_data.has_1_utxos();
// Capture exposed state BEFORE the spend mutates spent_txo_count.
let was_pubkey_exposed = addr_data.is_pubkey_exposed(output_type);
let exposed_contribution_before =
addr_data.exposed_supply_contribution(output_type);
// Compute buckets once
let prev_bucket = AmountBucket::from(prev_balance);
let new_bucket = AmountBucket::from(new_balance);
@@ -91,6 +107,27 @@ pub(crate) fn process_sent(
.unwrap();
cohort_state.send(addr_data, value, current_price, prev_price, peak_price, age)?;
// addr_data.spent_txo_count is now incremented by 1.
// Update exposed supply via post-spend contribution delta.
let exposed_contribution_after =
addr_data.exposed_supply_contribution(output_type);
if exposed_contribution_after >= exposed_contribution_before {
*type_exposed_supply += exposed_contribution_after - exposed_contribution_before;
} else {
*type_exposed_supply -= exposed_contribution_before - exposed_contribution_after;
}
// Update exposed counts on first-ever pubkey exposure.
// For non-pk-exposed types this fires on the first spend; for
// pk-exposed types it never fires here (was_pubkey_exposed was
// already true at first receive in process_received).
if !was_pubkey_exposed {
*type_total_exposed_count += 1;
if !will_be_empty {
*type_exposed_count += 1;
}
}
// If crossing a bucket boundary, remove the (now-updated) address from old bucket
if will_be_empty || crossing_boundary {
@@ -101,6 +138,17 @@ pub(crate) fn process_sent(
if will_be_empty {
*type_addr_count -= 1;
*type_empty_count += 1;
// Reused addr leaving the funded reused set
if addr_data.is_reused() {
*type_reused_count -= 1;
}
// Exposed addr leaving the funded exposed set: was in set
// iff its pubkey was exposed pre-spend (since it was funded
// to be in process_sent in the first place), and now leaves
// because it's empty.
if was_pubkey_exposed {
*type_exposed_count -= 1;
}
lookup.move_to_empty(output_type, type_index);
} else if crossing_boundary {
cohorts

View File

@@ -11,7 +11,10 @@ use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, VecIndex, WritableVec, unli
use crate::{
distribution::{
addr::{AddrTypeToActivityCounts, AddrTypeToAddrCount},
addr::{
AddrTypeToActivityCounts, AddrTypeToAddrCount, AddrTypeToExposedAddrCount,
AddrTypeToExposedAddrSupply, AddrTypeToReusedAddrCount, AddrTypeToReusedAddrUseCount,
},
block::{
AddrCache, InputsResult, process_inputs, process_outputs, process_received,
process_sent,
@@ -192,22 +195,41 @@ pub(crate) fn process_blocks(
// Track running totals - recover from previous height if resuming
debug!("recovering addr_counts from height {}", starting_height);
let (mut addr_counts, mut empty_addr_counts) = if starting_height > Height::ZERO {
let addr_counts =
AddrTypeToAddrCount::from((&vecs.addrs.funded.by_addr_type, starting_height));
let empty_addr_counts =
AddrTypeToAddrCount::from((&vecs.addrs.empty.by_addr_type, starting_height));
(addr_counts, empty_addr_counts)
let (
mut addr_counts,
mut empty_addr_counts,
mut reused_addr_counts,
mut total_reused_addr_counts,
mut exposed_addr_counts,
mut total_exposed_addr_counts,
mut exposed_addr_supply,
) = if starting_height > Height::ZERO {
(
AddrTypeToAddrCount::from((&vecs.addrs.funded.by_addr_type, starting_height)),
AddrTypeToAddrCount::from((&vecs.addrs.empty.by_addr_type, starting_height)),
AddrTypeToReusedAddrCount::from((&vecs.addrs.reused.count.funded, starting_height)),
AddrTypeToReusedAddrCount::from((&vecs.addrs.reused.count.total, starting_height)),
AddrTypeToExposedAddrCount::from((&vecs.addrs.exposed.count.funded, starting_height)),
AddrTypeToExposedAddrCount::from((&vecs.addrs.exposed.count.total, starting_height)),
AddrTypeToExposedAddrSupply::from((&vecs.addrs.exposed.supply, starting_height)),
)
} else {
(
AddrTypeToAddrCount::default(),
AddrTypeToAddrCount::default(),
AddrTypeToReusedAddrCount::default(),
AddrTypeToReusedAddrCount::default(),
AddrTypeToExposedAddrCount::default(),
AddrTypeToExposedAddrCount::default(),
AddrTypeToExposedAddrSupply::default(),
)
};
debug!("addr_counts recovered");
// Track activity counts - reset each block
let mut activity_counts = AddrTypeToActivityCounts::default();
// Reused-use count - per-block flow, reset each block
let mut reused_addr_use_counts = AddrTypeToReusedAddrUseCount::default();
debug!("creating AddrCache");
let mut cache = AddrCache::new();
@@ -226,6 +248,8 @@ pub(crate) fn process_blocks(
.chain(vecs.addrs.funded.par_iter_height_mut())
.chain(vecs.addrs.empty.par_iter_height_mut())
.chain(vecs.addrs.activity.par_iter_height_mut())
.chain(vecs.addrs.reused.par_iter_height_mut())
.chain(vecs.addrs.exposed.par_iter_height_mut())
.chain(rayon::iter::once(
&mut vecs.coinblocks_destroyed.block as &mut dyn AnyStoredVec,
))
@@ -278,6 +302,7 @@ pub(crate) fn process_blocks(
// Reset per-block activity counts
activity_counts.reset();
reused_addr_use_counts.reset();
// Process outputs, inputs, and tick-tock in parallel via rayon::join.
// Collection (build tx_index mappings + bulk mmap reads) is merged into the
@@ -447,6 +472,12 @@ pub(crate) fn process_blocks(
&mut addr_counts,
&mut empty_addr_counts,
&mut activity_counts,
&mut reused_addr_counts,
&mut total_reused_addr_counts,
&mut reused_addr_use_counts,
&mut exposed_addr_counts,
&mut total_exposed_addr_counts,
&mut exposed_addr_supply,
);
// Process sent inputs (addresses sending funds)
@@ -459,6 +490,10 @@ pub(crate) fn process_blocks(
&mut addr_counts,
&mut empty_addr_counts,
&mut activity_counts,
&mut reused_addr_counts,
&mut exposed_addr_counts,
&mut total_exposed_addr_counts,
&mut exposed_addr_supply,
&received_addrs,
height_to_price_vec,
height_to_timestamp_vec,
@@ -481,6 +516,27 @@ pub(crate) fn process_blocks(
.empty
.push_height(empty_addr_counts.sum(), &empty_addr_counts);
vecs.addrs.activity.push_height(&activity_counts);
vecs.addrs.reused.count.funded.push_height(
reused_addr_counts.sum(),
reused_addr_counts.values().copied(),
);
vecs.addrs.reused.count.total.push_height(
total_reused_addr_counts.sum(),
total_reused_addr_counts.values().copied(),
);
vecs.addrs.reused.uses.push_height(&reused_addr_use_counts);
vecs.addrs.exposed.count.funded.push_height(
exposed_addr_counts.sum(),
exposed_addr_counts.values().copied(),
);
vecs.addrs.exposed.count.total.push_height(
total_exposed_addr_counts.sum(),
total_exposed_addr_counts.values().copied(),
);
vecs.addrs.exposed.supply.push_height(
exposed_addr_supply.sum(),
exposed_addr_supply.values().copied(),
);
let is_last_of_day = is_last_of_day[offset];
let date_opt = is_last_of_day.then(|| Date::from(timestamp));

View File

@@ -79,6 +79,8 @@ pub(crate) fn write(
.chain(vecs.addrs.funded.par_iter_height_mut())
.chain(vecs.addrs.empty.par_iter_height_mut())
.chain(vecs.addrs.activity.par_iter_height_mut())
.chain(vecs.addrs.reused.par_iter_height_mut())
.chain(vecs.addrs.exposed.par_iter_height_mut())
.chain(
[
&mut vecs.supply_state as &mut dyn AnyStoredVec,

View File

@@ -27,12 +27,15 @@ use crate::{
PerBlockCumulativeRolling, WindowStartVec, Windows,
db_utils::{finalize_db, open_db},
},
outputs, prices, transactions,
outputs, prices, scripts, transactions,
};
use super::{
AddrCohorts, AddrsDataVecs, AnyAddrIndexesVecs, RangeMap, UTXOCohorts,
addr::{AddrActivityVecs, AddrCountsVecs, DeltaVecs, NewAddrCountVecs, TotalAddrCountVecs},
addr::{
AddrActivityVecs, AddrCountsVecs, DeltaVecs, ExposedAddrVecs, NewAddrCountVecs,
ReusedAddrVecs, TotalAddrCountVecs,
},
};
const VERSION: Version = Version::new(22);
@@ -44,6 +47,8 @@ pub struct AddrMetricsVecs<M: StorageMode = Rw> {
pub activity: AddrActivityVecs<M>,
pub total: TotalAddrCountVecs<M>,
pub new: NewAddrCountVecs<M>,
pub reused: ReusedAddrVecs<M>,
pub exposed: ExposedAddrVecs<M>,
pub delta: DeltaVecs,
#[traversable(wrap = "indexes", rename = "funded")]
pub funded_index:
@@ -154,6 +159,13 @@ impl Vecs {
// Per-block delta of total (global + per-type)
let new_addr_count = NewAddrCountVecs::forced_import(&db, version, indexes, cached_starts)?;
// Reused address tracking (counts + per-block uses + percent)
let reused_addr_count =
ReusedAddrVecs::forced_import(&db, version, indexes, cached_starts)?;
// Exposed address tracking (counts + supply) - quantum / pubkey-exposure sense
let exposed_addr_vecs = ExposedAddrVecs::forced_import(&db, version, indexes)?;
// Growth rate: delta change + rate (global + per-type)
let delta = DeltaVecs::new(version, &addr_count, cached_starts, indexes);
@@ -169,6 +181,8 @@ impl Vecs {
activity: addr_activity,
total: total_addr_count,
new: new_addr_count,
reused: reused_addr_count,
exposed: exposed_addr_vecs,
delta,
funded_index: funded_addr_index,
empty_index: empty_addr_index,
@@ -221,6 +235,7 @@ impl Vecs {
indexes: &indexes::Vecs,
inputs: &inputs::Vecs,
outputs: &outputs::Vecs,
scripts: &scripts::Vecs,
transactions: &transactions::Vecs,
blocks: &blocks::Vecs,
prices: &prices::Vecs,
@@ -285,6 +300,8 @@ impl Vecs {
self.addrs.funded.reset_height()?;
self.addrs.empty.reset_height()?;
self.addrs.activity.reset_height()?;
self.addrs.reused.reset_height()?;
self.addrs.exposed.reset_height()?;
reset_state(
&mut self.any_addr_indexes,
&mut self.addrs_data,
@@ -454,6 +471,10 @@ impl Vecs {
// 6b. Compute address count sum (by addr_type -> all)
self.addrs.funded.compute_rest(starting_indexes, exit)?;
self.addrs.empty.compute_rest(starting_indexes, exit)?;
self.addrs
.reused
.compute_rest(starting_indexes, &scripts.count, exit)?;
self.addrs.exposed.compute_rest(starting_indexes, exit)?;
// 6c. Compute total_addr_count = addr_count + empty_addr_count
self.addrs.total.compute(
@@ -524,6 +545,8 @@ impl Vecs {
.min(Height::from(self.addrs.funded.min_stateful_len()))
.min(Height::from(self.addrs.empty.min_stateful_len()))
.min(Height::from(self.addrs.activity.min_stateful_len()))
.min(Height::from(self.addrs.reused.min_stateful_len()))
.min(Height::from(self.addrs.exposed.min_stateful_len()))
.min(Height::from(self.coinblocks_destroyed.block.len()))
}
}

View File

@@ -0,0 +1,104 @@
use brk_error::{OptionData, Result};
use brk_indexer::Indexer;
use brk_types::{Indexes, StoredU64};
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use super::Vecs;
use crate::internal::{
PerBlockFull, compute_by_addr_type_block_counts, compute_by_addr_type_tx_percents,
};
impl Vecs {
    /// Phase 1: walk inputs and populate `input_count` + `tx_count`.
    /// Independent of transactions, can run alongside other inputs work.
    ///
    /// For every block past the resume point, scans each tx's inputs, buckets
    /// them by the type of the output they spend, and pushes one per-block
    /// count per address type via `compute_by_addr_type_block_counts`.
    pub(crate) fn compute_counts(
        &mut self,
        indexer: &Indexer,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        // Combined version of every indexer vec this computation reads; a
        // bump in any of them invalidates previously computed data.
        let dep_version = indexer.vecs.inputs.output_type.version()
            + indexer.vecs.transactions.first_tx_index.version()
            + indexer.vecs.transactions.first_txin_index.version()
            + indexer.vecs.transactions.txid.version();
        for (_, v) in self.input_count.iter_mut() {
            v.block
                .validate_and_truncate(dep_version, starting_indexes.height)?;
        }
        for (_, v) in self.tx_count.iter_mut() {
            v.block
                .validate_and_truncate(dep_version, starting_indexes.height)?;
        }
        // Resume point: the shortest of all per-type vecs. They are pushed in
        // lockstep, so the minimum marks where work actually stopped.
        let skip = self
            .input_count
            .values()
            .map(|v| v.block.len())
            .min()
            .unwrap()
            .min(self.tx_count.values().map(|v| v.block.len()).min().unwrap());
        let first_tx_index = &indexer.vecs.transactions.first_tx_index;
        let end = first_tx_index.len();
        if skip >= end {
            // Everything already computed.
            return Ok(());
        }
        // Align every vec to the common resume height before appending.
        for (_, v) in self.input_count.iter_mut() {
            v.block.truncate_if_needed_at(skip)?;
        }
        for (_, v) in self.tx_count.iter_mut() {
            v.block.truncate_if_needed_at(skip)?;
        }
        let fi_batch = first_tx_index.collect_range_at(skip, end);
        let txid_len = indexer.vecs.transactions.txid.len();
        let total_txin_len = indexer.vecs.inputs.output_type.len();
        // Sequential cursors over input output-types and per-tx first-input
        // indexes; both only ever move forward as txs are scanned in order.
        let mut itype_cursor = indexer.vecs.inputs.output_type.cursor();
        let mut fi_in_cursor = indexer.vecs.transactions.first_txin_index.cursor();
        compute_by_addr_type_block_counts(
            &mut self.input_count,
            &mut self.tx_count,
            &fi_batch,
            txid_len,
            true, // skip coinbase (1 fake input)
            starting_indexes.height,
            exit,
            |tx_pos, per_tx| {
                // Input range of this tx: [fi_in, next_fi_in). The last tx's
                // range is closed by the total input count.
                let fi_in = fi_in_cursor.get(tx_pos).data()?.to_usize();
                let next_fi_in = if tx_pos + 1 < txid_len {
                    fi_in_cursor.get(tx_pos + 1).data()?.to_usize()
                } else {
                    total_txin_len
                };
                // Fast-forward the type cursor across any gap (e.g. the
                // inputs of skipped coinbase txs) to this tx's first input.
                itype_cursor.advance(fi_in - itype_cursor.position());
                for _ in fi_in..next_fi_in {
                    let otype = itype_cursor.next().unwrap();
                    per_tx[otype as usize] += 1;
                }
                Ok(())
            },
        )
    }
    /// Phase 2: derive `tx_percent` from `tx_count` and the total tx count.
    /// Must run after `transactions::Vecs::compute`.
    pub(crate) fn compute_percents(
        &mut self,
        transactions_count_total: &PerBlockFull<StoredU64>,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        compute_by_addr_type_tx_percents(
            &self.tx_count,
            &mut self.tx_percent,
            transactions_count_total,
            starting_indexes,
            exit,
        )
    }
}

View File

@@ -0,0 +1,48 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::Version;
use vecdb::Database;
use super::Vecs;
use crate::{
indexes,
internal::{PerBlockCumulativeRolling, PercentCumulativeRolling, WindowStartVec, Windows},
};
impl Vecs {
    /// Open (or create) every per-address-type vec under `db`, keeping the
    /// established on-disk names (`{name}_input_count`,
    /// `tx_count_with_{name}_in`, `tx_count_with_{name}_in_rel_to_all`).
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        let input_count = ByAddrType::new_with_name(|name| {
            PerBlockCumulativeRolling::forced_import(
                db,
                &format!("{name}_input_count"),
                version,
                indexes,
                cached_starts,
            )
        })?;
        let tx_count = ByAddrType::new_with_name(|name| {
            PerBlockCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_in"),
                version,
                indexes,
                cached_starts,
            )
        })?;
        let tx_percent = ByAddrType::new_with_name(|name| {
            PercentCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_in_rel_to_all"),
                version,
                indexes,
            )
        })?;
        Ok(Self {
            input_count,
            tx_count,
            tx_percent,
        })
    }
}

View File

@@ -0,0 +1,5 @@
mod compute;
mod import;
mod vecs;
pub use vecs::Vecs;

View File

@@ -0,0 +1,18 @@
use brk_cohort::ByAddrType;
use brk_traversable::Traversable;
use brk_types::{BasisPoints16, StoredU64};
use vecdb::{Rw, StorageMode};
use crate::internal::{PerBlockCumulativeRolling, PercentCumulativeRolling};
/// Per-address-type metrics on the input side of transactions. Every field is
/// keyed by the type of the *spent output* that an input consumes.
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
    /// Per-block, per-type total input count (granular). The "type" is the
    /// type of the spent output that the input consumes.
    pub input_count: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    /// Per-block, per-type count of TXs containing at least one input that
    /// spends an output of this type.
    pub tx_count: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    /// Per-type tx_count as a percent of total tx count.
    pub tx_percent: ByAddrType<PercentCumulativeRolling<BasisPoints16, M>>,
}

View File

@@ -0,0 +1,125 @@
//! Shared per-block per-address-type counters.
//!
//! Used by `outputs/by_type/` (counts outputs per type) and `inputs/by_type/`
//! (counts inputs per type). Walks each block's tx range, calls a scanner
//! callback that fills a `[u32; 12]` per-tx counter, and produces two
//! per-block aggregates in a single pass:
//!
//! - `entry_count` — total number of items (outputs / inputs) per type
//! - `tx_count` — number of txs that contain at least one item of each type
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::{BasisPoints16, Height, Indexes, OutputType, StoredU64, TxIndex};
use vecdb::{AnyStoredVec, Exit, VecIndex, WritableVec};
use crate::internal::{
PerBlockCumulativeRolling, PerBlockFull, PercentCumulativeRolling, RatioU64Bp16,
};
/// Per-block scan that simultaneously computes:
/// - `entry_count[type] += per_tx[type]` (sum of items)
/// - `tx_count[type] += 1 if per_tx[type] > 0` (presence flag)
///
/// `scan_tx` is called once per tx with a zeroed `[u32; 12]` buffer that
/// it must fill with the per-type item count for that tx.
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute_by_addr_type_block_counts(
    entry_count: &mut ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
    tx_count: &mut ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
    fi_batch: &[TxIndex],
    txid_len: usize,
    skip_first_tx: bool,
    starting_height: Height,
    exit: &Exit,
    mut scan_tx: impl FnMut(usize, &mut [u32; 12]) -> Result<()>,
) -> Result<()> {
    // Flush every per-type block vec to disk. Shared by the mid-loop batch
    // flush and the final flush; callers hold the exit lock.
    fn write_blocks(
        entry_count: &mut ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
        tx_count: &mut ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
    ) -> Result<()> {
        for (_, v) in entry_count.iter_mut() {
            v.block.write()?;
        }
        for (_, v) in tx_count.iter_mut() {
            v.block.write()?;
        }
        Ok(())
    }

    for (block_pos, first_tx) in fi_batch.iter().enumerate() {
        let block_first = first_tx.to_usize();
        // End of this block's tx range: next block's first tx, or the global
        // tx count for the last block.
        let block_end = fi_batch
            .get(block_pos + 1)
            .map(|v| v.to_usize())
            .unwrap_or(txid_len);
        let tx_start = if skip_first_tx {
            block_first + 1
        } else {
            block_first
        };
        let mut entries_per_block = [0u64; 12];
        let mut txs_per_block = [0u64; 12];
        for tx_pos in tx_start..block_end {
            let mut per_tx = [0u32; 12];
            scan_tx(tx_pos, &mut per_tx)?;
            for (slot, &n) in per_tx.iter().enumerate() {
                if n > 0 {
                    entries_per_block[slot] += u64::from(n);
                    txs_per_block[slot] += 1;
                }
            }
        }
        for otype in OutputType::ADDR_TYPES {
            let idx = otype as usize;
            entry_count
                .get_mut_unwrap(otype)
                .block
                .push(StoredU64::from(entries_per_block[idx]));
            tx_count
                .get_mut_unwrap(otype)
                .block
                .push(StoredU64::from(txs_per_block[idx]));
        }
        // Every vec is pushed in lockstep, so probing one is enough.
        if entry_count.p2pkh.block.batch_limit_reached() {
            let _lock = exit.lock();
            write_blocks(entry_count, tx_count)?;
        }
    }
    {
        let _lock = exit.lock();
        write_blocks(entry_count, tx_count)?;
    }
    for (_, v) in entry_count.iter_mut() {
        v.compute_rest(starting_height, exit)?;
    }
    for (_, v) in tx_count.iter_mut() {
        v.compute_rest(starting_height, exit)?;
    }
    Ok(())
}
/// Compute per-type tx-count percent over total tx count, for all 8 address types.
pub(crate) fn compute_by_addr_type_tx_percents(
    tx_count: &ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
    tx_percent: &mut ByAddrType<PercentCumulativeRolling<BasisPoints16>>,
    count_total: &PerBlockFull<StoredU64>,
    starting_indexes: &Indexes,
    exit: &Exit,
) -> Result<()> {
    for otype in OutputType::ADDR_TYPES {
        let counts = tx_count.get_unwrap(otype);
        let percent = tx_percent.get_mut_unwrap(otype);
        // Cumulative percent from the cumulative counts; rolling percents
        // from the four rolling-window sums — one shared ratio transform.
        percent.compute_binary::<StoredU64, StoredU64, RatioU64Bp16, _, _, _, _>(
            starting_indexes.height,
            &counts.cumulative.height,
            &count_total.cumulative.height,
            counts.sum.as_array().map(|w| &w.height),
            count_total.rolling.sum.as_array().map(|w| &w.height),
            exit,
        )?;
    }
    Ok(())
}

View File

@@ -1,5 +1,6 @@
pub(crate) mod algo;
mod amount;
mod by_type_counts;
mod cache_budget;
mod containers;
pub(crate) mod db_utils;
@@ -10,6 +11,7 @@ mod traits;
mod transform;
pub(crate) use amount::*;
pub(crate) use by_type_counts::*;
pub(crate) use cache_budget::*;
pub(crate) use containers::*;
pub(crate) use indexes::*;

View File

@@ -2,7 +2,7 @@ use std::collections::VecDeque;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, get_percentile};
use brk_types::{Height, VSize, get_percentile, get_weighted_percentile};
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use vecdb::{
@@ -154,6 +154,141 @@ impl<T: NumericValue + JsonSchema> PerBlockDistribution<T> {
Ok(())
}
/// Like `compute_with_skip` but uses vsize-weighted percentiles.
/// Each transaction's contribution to percentile rank is proportional to its vsize.
///
/// Per block: gathers the per-tx values and vsizes for that block's tx range
/// (skipping the first `skip_count` txs), then pushes min/max plus
/// vsize-weighted p10/p25/p50/p75/p90 into the seven stat vecs.
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute_with_skip_weighted<A>(
    &mut self,
    max_from: Height,
    source: &impl ReadableVec<A, T>,
    vsize_source: &impl ReadableVec<A, VSize>,
    first_indexes: &impl ReadableVec<Height, A>,
    count_indexes: &impl ReadableVec<Height, brk_types::StoredU64>,
    exit: &Exit,
    skip_count: usize,
) -> Result<()>
where
    A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
    // Destructure the seven stat vecs and work on their height-indexed views.
    let DistributionStats {
        min,
        max,
        pct10,
        pct25,
        median,
        pct75,
        pct90,
    } = &mut self.0;
    let min = &mut min.height;
    let max = &mut max.height;
    let pct10 = &mut pct10.height;
    let pct25 = &mut pct25.height;
    let median = &mut median.height;
    let pct75 = &mut pct75.height;
    let pct90 = &mut pct90.height;
    // A version bump in any dependency invalidates previously computed stats.
    let combined_version = source.version()
        + vsize_source.version()
        + first_indexes.version()
        + count_indexes.version();
    // Resume height: `max_from` capped by the shortest already-computed vec.
    let mut index = max_from;
    for vec in [
        &mut *min,
        &mut *max,
        &mut *median,
        &mut *pct10,
        &mut *pct25,
        &mut *pct75,
        &mut *pct90,
    ] {
        vec.validate_computed_version_or_reset(combined_version)?;
        index = index.min(Height::from(vec.len()));
    }
    let start = index.to_usize();
    // Align all seven vecs to the common resume height before appending.
    for vec in [
        &mut *min,
        &mut *max,
        &mut *median,
        &mut *pct10,
        &mut *pct25,
        &mut *pct75,
        &mut *pct90,
    ] {
        vec.truncate_if_needed_at(start)?;
    }
    let fi_len = first_indexes.len();
    let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(start, fi_len);
    let count_indexes_batch: Vec<brk_types::StoredU64> =
        count_indexes.collect_range_at(start, fi_len);
    let zero = T::from(0_usize);
    // Scratch buffers reused across blocks.
    // NOTE(review): assumes `collect_range_into_at` replaces the buffer's
    // contents rather than appending — confirm against its implementation.
    let mut values: Vec<T> = Vec::new();
    let mut vsizes: Vec<VSize> = Vec::new();
    let mut weighted: Vec<(T, VSize)> = Vec::new();
    first_indexes_batch
        .into_iter()
        .zip(count_indexes_batch)
        .try_for_each(|(first_index, count_index)| -> Result<()> {
            // Tx range for this block after dropping the first `skip_count`
            // txs (clamped so an under-full block yields an empty range).
            let count = u64::from(count_index) as usize;
            let effective_count = count.saturating_sub(skip_count);
            let effective_first_index = first_index + skip_count.min(count);
            let start_at = effective_first_index.to_usize();
            let end_at = start_at + effective_count;
            source.collect_range_into_at(start_at, end_at, &mut values);
            vsize_source.collect_range_into_at(start_at, end_at, &mut vsizes);
            weighted.clear();
            // Pair each value with its vsize weight. When skipping, zero
            // values are also dropped — presumably mirroring the unweighted
            // `compute_with_skip`; verify against that implementation.
            weighted.extend(
                values
                    .iter()
                    .copied()
                    .zip(vsizes.iter().copied())
                    .filter(|(v, _)| skip_count == 0 || *v > zero),
            );
            if weighted.is_empty() {
                // No data for this block: keep all seven vecs aligned by
                // pushing a zero placeholder into each.
                for vec in [
                    &mut *min,
                    &mut *max,
                    &mut *median,
                    &mut *pct10,
                    &mut *pct25,
                    &mut *pct75,
                    &mut *pct90,
                ] {
                    vec.push(zero);
                }
            } else {
                // Sort ascending by value so min/max are the ends and the
                // weighted percentiles can walk the cumulative vsize.
                weighted.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                max.push(weighted.last().unwrap().0);
                pct90.push(get_weighted_percentile(&weighted, 0.90));
                pct75.push(get_weighted_percentile(&weighted, 0.75));
                median.push(get_weighted_percentile(&weighted, 0.50));
                pct25.push(get_weighted_percentile(&weighted, 0.25));
                pct10.push(get_weighted_percentile(&weighted, 0.10));
                min.push(weighted.first().unwrap().0);
            }
            Ok(())
        })?;
    // Persist all seven vecs under the exit lock so a shutdown request
    // cannot interleave with the writes.
    let _lock = exit.lock();
    for vec in [min, max, median, pct10, pct25, pct75, pct90] {
        vec.write()?;
    }
    Ok(())
}
pub(crate) fn compute_from_nblocks<A>(
&mut self,
max_from: Height,

View File

@@ -0,0 +1,91 @@
//! PercentCumulativeRolling - cumulative percent + 4 rolling window percents.
//!
//! Mirrors `PerBlockCumulativeRolling` but for percentages derived from ratios
//! of cumulative values and rolling sums.
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, Version};
use vecdb::{BinaryTransform, Database, Exit, ReadableVec, Rw, StorageMode, VecValue};
use crate::{
indexes,
internal::{BpsType, PercentPerBlock, PercentRollingWindows},
};
/// Cumulative percent plus the four rolling-window percents for one metric.
#[derive(Traversable)]
pub struct PercentCumulativeRolling<B: BpsType, M: StorageMode = Rw> {
    /// Percent derived from the cumulative numerator / denominator pair.
    pub cumulative: PercentPerBlock<B, M>,
    /// Percents derived from the rolling-window sums (4 windows).
    #[traversable(flatten)]
    pub rolling: PercentRollingWindows<B, M>,
}
impl<B: BpsType> PercentCumulativeRolling<B> {
    /// Open (or create) both halves under `db`, suffixing the cumulative vec
    /// with `_cumulative` and the rolling windows with `_sum`.
    pub(crate) fn forced_import(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        Ok(Self {
            cumulative: PercentPerBlock::forced_import(
                db,
                &format!("{name}_cumulative"),
                version,
                indexes,
            )?,
            rolling: PercentRollingWindows::forced_import(
                db,
                &format!("{name}_sum"),
                version,
                indexes,
            )?,
        })
    }
    /// Alternate constructor that uses the same base name for both the
    /// cumulative `PercentPerBlock` and the `PercentRollingWindows`, relying on
    /// the window suffix to disambiguate. Useful for preserving legacy disk
    /// names where the two variants historically shared a prefix.
    pub(crate) fn forced_import_flat(
        db: &Database,
        name: &str,
        version: Version,
        indexes: &indexes::Vecs,
    ) -> Result<Self> {
        Ok(Self {
            cumulative: PercentPerBlock::forced_import(db, name, version, indexes)?,
            rolling: PercentRollingWindows::forced_import(db, name, version, indexes)?,
        })
    }
    /// Compute the cumulative percent, then the four rolling percents, all
    /// through the same binary transform `F`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn compute_binary<S1T, S2T, F, Rc1, Rc2, Rw1, Rw2>(
        &mut self,
        max_from: Height,
        cumulative_numerator: &Rc1,
        cumulative_denominator: &Rc2,
        rolling_numerators: [&Rw1; 4],
        rolling_denominators: [&Rw2; 4],
        exit: &Exit,
    ) -> Result<()>
    where
        S1T: VecValue,
        S2T: VecValue,
        Rc1: ReadableVec<Height, S1T>,
        Rc2: ReadableVec<Height, S2T>,
        Rw1: ReadableVec<Height, S1T>,
        Rw2: ReadableVec<Height, S2T>,
        F: BinaryTransform<S1T, S2T, B>,
    {
        self.cumulative.compute_binary::<S1T, S2T, F>(
            max_from,
            cumulative_numerator,
            cumulative_denominator,
            exit,
        )?;
        self.rolling.compute_binary::<S1T, S2T, F, Rw1, Rw2>(
            max_from,
            rolling_numerators,
            rolling_denominators,
            exit,
        )
    }
}

View File

@@ -1,10 +1,12 @@
mod base;
mod cumulative_rolling;
mod lazy;
mod lazy_windows;
mod vec;
mod windows;
pub use base::*;
pub use cumulative_rolling::*;
pub use lazy::*;
pub use lazy_windows::*;
pub use vec::*;

View File

@@ -2,7 +2,7 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{Indexes, TxIndex};
use brk_types::{Indexes, TxIndex, VSize};
use schemars::JsonSchema;
use vecdb::{Database, Exit, ReadableVec, Rw, StorageMode, Version};
@@ -113,4 +113,43 @@ where
Ok(())
}
/// Like `derive_from_with_skip` but uses vsize-weighted percentiles for the
/// per-block distribution. The rolling 6-block distribution stays count-based.
///
/// The first `skip_count` txs of each block (e.g. the coinbase) are excluded
/// from the per-block stats only.
#[allow(clippy::too_many_arguments)]
pub(crate) fn derive_from_with_skip_weighted(
    &mut self,
    indexer: &Indexer,
    indexes: &indexes::Vecs,
    starting_indexes: &Indexes,
    tx_index_source: &impl ReadableVec<TxIndex, T>,
    vsize_source: &impl ReadableVec<TxIndex, VSize>,
    exit: &Exit,
    skip_count: usize,
) -> Result<()>
where
    T: Copy + Ord + From<f64> + Default,
    f64: From<T>,
{
    // Per-block stats: vsize-weighted percentiles over each block's tx range.
    self.block.compute_with_skip_weighted(
        starting_indexes.height,
        tx_index_source,
        vsize_source,
        &indexer.vecs.transactions.first_tx_index,
        &indexes.height.tx_index_count,
        exit,
        skip_count,
    )?;
    // Rolling 6-block distribution: unweighted and without the skip.
    self.distribution._6b.compute_from_nblocks(
        starting_indexes.height,
        tx_index_source,
        &indexer.vecs.transactions.first_tx_index,
        &indexes.height.tx_index_count,
        6,
        exit,
    )?;
    Ok(())
}
}

View File

@@ -6,9 +6,9 @@
use brk_error::Result;
use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{Indexes, TxIndex};
use brk_types::{Indexes, TxIndex, VSize};
use schemars::JsonSchema;
use vecdb::{Database, EagerVec, Exit, ImportableVec, PcoVec, Rw, StorageMode, Version};
use vecdb::{Database, EagerVec, Exit, ImportableVec, PcoVec, ReadableVec, Rw, StorageMode, Version};
use crate::{
indexes,
@@ -65,4 +65,29 @@ where
skip_count,
)
}
/// Vsize-weighted variant of `derive_from_with_skip`: delegates to the inner
/// distribution's `derive_from_with_skip_weighted`, supplying `self.tx_index`
/// as the per-tx value source and forwarding `vsize_source` as the weights.
#[allow(clippy::too_many_arguments)]
pub(crate) fn derive_from_with_skip_weighted(
    &mut self,
    indexer: &Indexer,
    indexes: &indexes::Vecs,
    starting_indexes: &Indexes,
    vsize_source: &impl ReadableVec<TxIndex, VSize>,
    exit: &Exit,
    skip_count: usize,
) -> Result<()>
where
    T: Copy + Ord + From<f64> + Default,
    f64: From<T>,
{
    self.distribution.derive_from_with_skip_weighted(
        indexer,
        indexes,
        starting_indexes,
        &self.tx_index,
        vsize_source,
        exit,
        skip_count,
    )
}
}

View File

@@ -433,6 +433,7 @@ impl Computer {
&self.indexes,
&self.inputs,
&self.outputs,
&self.scripts,
&self.transactions,
&self.blocks,
&self.prices,

View File

@@ -116,16 +116,10 @@ impl Vecs {
.compute(prices, starting_indexes.height, exit)?;
self.fee_dominance
.compute_binary::<Sats, Sats, RatioSatsBp16>(
.compute_binary::<Sats, Sats, RatioSatsBp16, _, _, _, _>(
starting_indexes.height,
&self.fees.cumulative.sats.height,
&self.coinbase.cumulative.sats.height,
exit,
)?;
self.fee_dominance_rolling
.compute_binary::<Sats, Sats, RatioSatsBp16, _, _>(
starting_indexes.height,
self.fees.sum.as_array().map(|w| &w.sats.height),
self.coinbase.sum.as_array().map(|w| &w.sats.height),
exit,

View File

@@ -7,7 +7,7 @@ use crate::{
indexes,
internal::{
AmountPerBlockCumulative, AmountPerBlockCumulativeRolling, AmountPerBlockFull,
LazyPercentRollingWindows, OneMinusBp16, PercentPerBlock, PercentRollingWindows,
LazyPercentRollingWindows, OneMinusBp16, PercentCumulativeRolling, PercentPerBlock,
RatioRollingWindows, WindowStartVec, Windows,
},
};
@@ -19,13 +19,13 @@ impl Vecs {
indexes: &indexes::Vecs,
cached_starts: &Windows<&WindowStartVec>,
) -> Result<Self> {
let fee_dominance_rolling =
PercentRollingWindows::forced_import(db, "fee_dominance", version, indexes)?;
let fee_dominance =
PercentCumulativeRolling::forced_import_flat(db, "fee_dominance", version, indexes)?;
let subsidy_dominance_rolling = LazyPercentRollingWindows::from_rolling::<OneMinusBp16>(
"subsidy_dominance",
version,
&fee_dominance_rolling,
&fee_dominance.rolling,
);
Ok(Self {
@@ -51,8 +51,7 @@ impl Vecs {
version,
indexes,
)?,
fee_dominance: PercentPerBlock::forced_import(db, "fee_dominance", version, indexes)?,
fee_dominance_rolling,
fee_dominance,
subsidy_dominance: PercentPerBlock::forced_import(
db,
"subsidy_dominance",

View File

@@ -4,7 +4,7 @@ use vecdb::{EagerVec, PcoVec, Rw, StorageMode};
use crate::internal::{
AmountPerBlockCumulative, AmountPerBlockCumulativeRolling, AmountPerBlockFull,
LazyPercentRollingWindows, PercentPerBlock, PercentRollingWindows, RatioRollingWindows,
LazyPercentRollingWindows, PercentCumulativeRolling, PercentPerBlock, RatioRollingWindows,
};
#[derive(Traversable)]
@@ -15,9 +15,7 @@ pub struct Vecs<M: StorageMode = Rw> {
pub output_volume: M::Stored<EagerVec<PcoVec<Height, Sats>>>,
pub unclaimed: AmountPerBlockCumulative<M>,
#[traversable(wrap = "fees", rename = "dominance")]
pub fee_dominance: PercentPerBlock<BasisPoints16, M>,
#[traversable(wrap = "fees", rename = "dominance")]
pub fee_dominance_rolling: PercentRollingWindows<BasisPoints16, M>,
pub fee_dominance: PercentCumulativeRolling<BasisPoints16, M>,
#[traversable(wrap = "subsidy", rename = "dominance")]
pub subsidy_dominance: PercentPerBlock<BasisPoints16, M>,
#[traversable(wrap = "subsidy", rename = "dominance")]

View File

@@ -0,0 +1,104 @@
use brk_error::{OptionData, Result};
use brk_indexer::Indexer;
use brk_types::{Indexes, StoredU64};
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use super::Vecs;
use crate::internal::{
PerBlockFull, compute_by_addr_type_block_counts, compute_by_addr_type_tx_percents,
};
impl Vecs {
    /// Phase 1: walk outputs and populate `output_count` + `tx_count`.
    /// Independent of transactions, can run alongside other outputs work.
    ///
    /// Resumes incrementally: `skip` is the shortest already-computed length
    /// across all per-type vecs, so only the uncomputed tail is scanned.
    pub(crate) fn compute_counts(
        &mut self,
        indexer: &Indexer,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        // Combined version of every indexer vec this computation reads; a
        // change in any of them invalidates previously computed data.
        let dep_version = indexer.vecs.outputs.output_type.version()
            + indexer.vecs.transactions.first_tx_index.version()
            + indexer.vecs.transactions.first_txout_index.version()
            + indexer.vecs.transactions.txid.version();
        for (_, v) in self.output_count.iter_mut() {
            v.block
                .validate_and_truncate(dep_version, starting_indexes.height)?;
        }
        for (_, v) in self.tx_count.iter_mut() {
            v.block
                .validate_and_truncate(dep_version, starting_indexes.height)?;
        }
        // Shortest computed length across BOTH vec families: every family
        // must be re-aligned to the same resume point.
        let skip = self
            .output_count
            .values()
            .map(|v| v.block.len())
            .min()
            .unwrap()
            .min(self.tx_count.values().map(|v| v.block.len()).min().unwrap());
        let first_tx_index = &indexer.vecs.transactions.first_tx_index;
        let end = first_tx_index.len();
        if skip >= end {
            // Everything already computed — nothing to do.
            return Ok(());
        }
        // Trim any per-type vec that ran ahead of the common resume point.
        for (_, v) in self.output_count.iter_mut() {
            v.block.truncate_if_needed_at(skip)?;
        }
        for (_, v) in self.tx_count.iter_mut() {
            v.block.truncate_if_needed_at(skip)?;
        }
        // First tx index of each remaining block (presumably height-indexed —
        // TODO confirm against `first_tx_index`'s key type).
        let fi_batch = first_tx_index.collect_range_at(skip, end);
        let txid_len = indexer.vecs.transactions.txid.len();
        let total_txout_len = indexer.vecs.outputs.output_type.len();
        let mut otype_cursor = indexer.vecs.outputs.output_type.cursor();
        let mut fo_cursor = indexer.vecs.transactions.first_txout_index.cursor();
        compute_by_addr_type_block_counts(
            &mut self.output_count,
            &mut self.tx_count,
            &fi_batch,
            txid_len,
            false,
            starting_indexes.height,
            exit,
            // Per-tx scan: bump `per_tx[output_type]` once per output of the
            // transaction at `tx_pos`.
            |tx_pos, per_tx| {
                let fo = fo_cursor.get(tx_pos).data()?.to_usize();
                // The last tx's output range ends at the global output count.
                let next_fo = if tx_pos + 1 < txid_len {
                    fo_cursor.get(tx_pos + 1).data()?.to_usize()
                } else {
                    total_txout_len
                };
                // Cursors only move forward; jump to this tx's first output.
                otype_cursor.advance(fo - otype_cursor.position());
                for _ in fo..next_fo {
                    let otype = otype_cursor.next().unwrap();
                    per_tx[otype as usize] += 1;
                }
                Ok(())
            },
        )
    }

    /// Phase 2: derive `tx_percent` from `tx_count` and the total tx count.
    /// Must run after `transactions::Vecs::compute` (depends on tx count totals).
    pub(crate) fn compute_percents(
        &mut self,
        transactions_count_total: &PerBlockFull<StoredU64>,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        compute_by_addr_type_tx_percents(
            &self.tx_count,
            &mut self.tx_percent,
            transactions_count_total,
            starting_indexes,
            exit,
        )
    }
}

View File

@@ -0,0 +1,48 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::Version;
use vecdb::Database;
use super::Vecs;
use crate::{
indexes,
internal::{PerBlockCumulativeRolling, PercentCumulativeRolling, WindowStartVec, Windows},
};
impl Vecs {
    /// Opens (or creates) every per-address-type vec family backing this
    /// struct, naming each stored vec after its address type.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        // Granular per-block output counts, one family per address type.
        let output_count = ByAddrType::new_with_name(|name| {
            PerBlockCumulativeRolling::forced_import(
                db,
                &format!("{name}_output_count"),
                version,
                indexes,
                cached_starts,
            )
        })?;
        // "Tx has at least one output of this type" counters.
        let tx_count = ByAddrType::new_with_name(|name| {
            PerBlockCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_out"),
                version,
                indexes,
                cached_starts,
            )
        })?;
        // Same counters expressed relative to the total tx count.
        let tx_percent = ByAddrType::new_with_name(|name| {
            PercentCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_out_rel_to_all"),
                version,
                indexes,
            )
        })?;
        Ok(Self {
            output_count,
            tx_count,
            tx_percent,
        })
    }
}

View File

@@ -0,0 +1,5 @@
mod compute;
mod import;
mod vecs;
pub use vecs::Vecs;

View File

@@ -0,0 +1,16 @@
use brk_cohort::ByAddrType;
use brk_traversable::Traversable;
use brk_types::{BasisPoints16, StoredU64};
use vecdb::{Rw, StorageMode};
use crate::internal::{PerBlockCumulativeRolling, PercentCumulativeRolling};
/// Per-address-type output statistics vecs (counts and derived percentages).
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
    /// Per-block, per-type total output count (granular).
    pub output_count: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    /// Per-block, per-type count of TXs containing at least one output of this type.
    pub tx_count: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    /// Per-type tx_count as a percent of total tx count.
    pub tx_percent: ByAddrType<PercentCumulativeRolling<BasisPoints16, M>>,
}

View File

@@ -11,7 +11,7 @@ use crate::{
},
};
use super::{CountVecs, SpentVecs, Vecs};
use super::{ByTypeVecs, CountVecs, SpentVecs, Vecs};
impl Vecs {
pub(crate) fn forced_import(
@@ -25,8 +25,14 @@ impl Vecs {
let spent = SpentVecs::forced_import(&db, version)?;
let count = CountVecs::forced_import(&db, version, indexes, cached_starts)?;
let by_type = ByTypeVecs::forced_import(&db, version, indexes, cached_starts)?;
let this = Self { db, spent, count };
let this = Self {
db,
spent,
count,
by_type,
};
finalize_db(&this.db, &this)?;
Ok(this)
}

View File

@@ -1,3 +1,4 @@
pub mod by_type;
pub mod count;
pub mod spent;
@@ -7,6 +8,7 @@ mod import;
use brk_traversable::Traversable;
use vecdb::{Database, Rw, StorageMode};
pub use by_type::Vecs as ByTypeVecs;
pub use count::Vecs as CountVecs;
pub use spent::Vecs as SpentVecs;
@@ -19,4 +21,5 @@ pub struct Vecs<M: StorageMode = Rw> {
pub spent: SpentVecs<M>,
pub count: CountVecs<M>,
pub by_type: ByTypeVecs<M>,
}

View File

@@ -93,6 +93,26 @@ impl Vecs {
)?)
})?;
// addr_output_count = sum of the 8 address-type per-block counts.
// Lives here (not in addr/) because every consumer that asks "what
// fraction of address outputs are X" needs it as the denominator.
self.addr_output_count.block.compute_sum_of_others(
starting_indexes.height,
&[
&self.p2pk65.block,
&self.p2pk33.block,
&self.p2pkh.block,
&self.p2sh.block,
&self.p2wpkh.block,
&self.p2wsh.block,
&self.p2tr.block,
&self.p2a.block,
],
exit,
)?;
self.addr_output_count
.compute_rest(starting_indexes.height, exit)?;
self.op_return.compute(starting_indexes.height, exit, |v| {
Ok(v.compute_count_from_indexes(
starting_indexes.height,

View File

@@ -88,6 +88,13 @@ impl Vecs {
p2tr,
p2wpkh,
p2wsh,
addr_output_count: PerBlockCumulativeRolling::forced_import(
db,
"addr_output_count",
version,
indexes,
cached_starts,
)?,
op_return: PerBlockCumulativeRolling::forced_import(
db,
"op_return_count",

View File

@@ -15,6 +15,9 @@ pub struct Vecs<M: StorageMode = Rw> {
pub p2tr: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
pub p2wpkh: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
pub p2wsh: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
/// Sum of the 8 address-type per-block counts. Useful as a denominator
/// for any "fraction of address outputs that …" metric.
pub addr_output_count: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
pub op_return: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
pub empty_output: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,
pub unknown_output: PerBlockCumulativeRolling<StoredU64, StoredU64, M>,

View File

@@ -3,10 +3,9 @@ use brk_indexer::Indexer;
use brk_types::Indexes;
use vecdb::Exit;
use super::{Vecs, type_counts::compute_type_percents};
use crate::{blocks, indexes, inputs, outputs, prices};
use super::Vecs;
impl Vecs {
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute(
@@ -22,7 +21,7 @@ impl Vecs {
) -> Result<()> {
self.db.sync_bg_tasks()?;
let (r1, (r2, r3)) = rayon::join(
let (r1, (r2, (r3, (r4, r5)))) = rayon::join(
|| {
self.count
.compute(indexer, &blocks.lookback, starting_indexes, exit)
@@ -30,13 +29,56 @@ impl Vecs {
|| {
rayon::join(
|| self.versions.compute(indexer, starting_indexes, exit),
|| self.size.compute(indexer, indexes, starting_indexes, exit),
|| {
rayon::join(
|| self.size.compute(indexer, indexes, starting_indexes, exit),
|| {
rayon::join(
|| {
self.input_types
.compute(indexer, starting_indexes, exit)
},
|| {
self.output_types
.compute(indexer, starting_indexes, exit)
},
)
},
)
},
)
},
);
r1?;
r2?;
r3?;
r4?;
r5?;
let count_total = &self.count.total;
let (input_types, output_types) = (&mut self.input_types, &mut self.output_types);
let (r6, r7) = rayon::join(
|| {
compute_type_percents(
&input_types.by_type,
&mut input_types.percent,
count_total,
starting_indexes.height,
exit,
)
},
|| {
compute_type_percents(
&output_types.by_type,
&mut output_types.percent,
count_total,
starting_indexes.height,
exit,
)
},
);
r6?;
r7?;
self.fees.compute(
indexer,

View File

@@ -35,37 +35,25 @@ impl Vecs {
self.compute_fees(indexer, indexes, size_vecs, starting_indexes, exit)?;
let (r1, (r2, r3)) = rayon::join(
let vsize_source = &size_vecs.vsize.tx_index;
let (r1, r2) = rayon::join(
|| {
self.fee
.derive_from_with_skip(indexer, indexes, starting_indexes, exit, 1)
},
|| {
rayon::join(
|| {
self.fee_rate.derive_from_with_skip(
indexer,
indexes,
starting_indexes,
exit,
1,
)
},
|| {
self.effective_fee_rate.derive_from_with_skip(
indexer,
indexes,
starting_indexes,
exit,
1,
)
},
self.effective_fee_rate.derive_from_with_skip_weighted(
indexer,
indexes,
starting_indexes,
vsize_source,
exit,
1,
)
},
);
r1?;
r2?;
r3?;
Ok(())
}
@@ -86,7 +74,6 @@ impl Vecs {
.tx_index
.validate_computed_version_or_reset(dep_version)?;
self.fee_rate
.tx_index
.validate_computed_version_or_reset(dep_version)?;
self.effective_fee_rate
.tx_index
@@ -101,7 +88,7 @@ impl Vecs {
.fee
.tx_index
.len()
.min(self.fee_rate.tx_index.len())
.min(self.fee_rate.len())
.min(self.effective_fee_rate.tx_index.len())
.min(starting_indexes.tx_index.to_usize());
@@ -113,7 +100,6 @@ impl Vecs {
.tx_index
.truncate_if_needed(starting_indexes.tx_index)?;
self.fee_rate
.tx_index
.truncate_if_needed(starting_indexes.tx_index)?;
self.effective_fee_rate
.tx_index
@@ -185,7 +171,7 @@ impl Vecs {
input_values[j] - output_values[j]
};
self.fee.tx_index.push(fee);
self.fee_rate.tx_index.push(FeeRate::from((fee, vsizes[j])));
self.fee_rate.push(FeeRate::from((fee, vsizes[j])));
fees.push(fee);
}
@@ -205,14 +191,14 @@ impl Vecs {
if h % 1_000 == 0 {
let _lock = exit.lock();
self.fee.tx_index.write()?;
self.fee_rate.tx_index.write()?;
self.fee_rate.write()?;
self.effective_fee_rate.tx_index.write()?;
}
}
let _lock = exit.lock();
self.fee.tx_index.write()?;
self.fee_rate.tx_index.write()?;
self.fee_rate.write()?;
self.effective_fee_rate.tx_index.write()?;
Ok(())

View File

@@ -19,7 +19,7 @@ impl Vecs {
input_value: EagerVec::forced_import(db, "input_value", version)?,
output_value: EagerVec::forced_import(db, "output_value", version)?,
fee: PerTxDistribution::forced_import(db, "fee", v, indexes)?,
fee_rate: PerTxDistribution::forced_import(db, "fee_rate", v, indexes)?,
fee_rate: EagerVec::forced_import(db, "fee_rate", v)?,
effective_fee_rate: PerTxDistribution::forced_import(
db,
"effective_fee_rate",

View File

@@ -9,6 +9,6 @@ pub struct Vecs<M: StorageMode = Rw> {
pub input_value: M::Stored<EagerVec<PcoVec<TxIndex, Sats>>>,
pub output_value: M::Stored<EagerVec<PcoVec<TxIndex, Sats>>>,
pub fee: PerTxDistribution<Sats, M>,
pub fee_rate: PerTxDistribution<FeeRate, M>,
pub fee_rate: M::Stored<EagerVec<PcoVec<TxIndex, FeeRate>>>,
pub effective_fee_rate: PerTxDistribution<FeeRate, M>,
}

View File

@@ -12,7 +12,9 @@ use crate::{
},
};
use super::{CountVecs, FeesVecs, SizeVecs, Vecs, VersionsVecs, VolumeVecs};
use super::{
CountVecs, FeesVecs, InputTypesVecs, OutputTypesVecs, SizeVecs, Vecs, VersionsVecs, VolumeVecs,
};
impl Vecs {
pub(crate) fn forced_import(
@@ -30,6 +32,8 @@ impl Vecs {
let fees = FeesVecs::forced_import(&db, version, indexes)?;
let versions = VersionsVecs::forced_import(&db, version, indexes, cached_starts)?;
let volume = VolumeVecs::forced_import(&db, version, indexes, cached_starts)?;
let input_types = InputTypesVecs::forced_import(&db, version, indexes, cached_starts)?;
let output_types = OutputTypesVecs::forced_import(&db, version, indexes, cached_starts)?;
let this = Self {
db,
@@ -38,6 +42,8 @@ impl Vecs {
fees,
versions,
volume,
input_types,
output_types,
};
finalize_db(&this.db, &this)?;
Ok(this)

View File

@@ -0,0 +1,68 @@
use brk_error::{OptionData, Result};
use brk_indexer::Indexer;
use brk_types::Indexes;
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use super::{super::type_counts::compute_type_counts, Vecs};
impl Vecs {
    /// Counts, per block and per address type, the transactions that spend at
    /// least one previous output of that type (`by_type`).
    ///
    /// Resumes incrementally from the shortest already-computed per-type vec.
    pub(crate) fn compute(
        &mut self,
        indexer: &Indexer,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        // Combined version of every indexer vec this computation reads; a
        // change in any of them invalidates previously computed data.
        let dep_version = indexer.vecs.inputs.output_type.version()
            + indexer.vecs.transactions.first_tx_index.version()
            + indexer.vecs.transactions.first_txin_index.version()
            + indexer.vecs.transactions.txid.version();
        for (_, v) in self.by_type.iter_mut() {
            v.block
                .validate_and_truncate(dep_version, starting_indexes.height)?;
        }
        // Resume point: the shortest computed length across all types.
        let skip = self.by_type.values().map(|v| v.block.len()).min().unwrap();
        let first_tx_index = &indexer.vecs.transactions.first_tx_index;
        let end = first_tx_index.len();
        if skip >= end {
            // Nothing new to compute.
            return Ok(());
        }
        // Re-align every per-type vec to the common resume point.
        for (_, v) in self.by_type.iter_mut() {
            v.block.truncate_if_needed_at(skip)?;
        }
        let fi_batch = first_tx_index.collect_range_at(skip, end);
        let txid_len = indexer.vecs.transactions.txid.len();
        let total_txin_len = indexer.vecs.inputs.output_type.len();
        let mut itype_cursor = indexer.vecs.inputs.output_type.cursor();
        let mut fi_in_cursor = indexer.vecs.transactions.first_txin_index.cursor();
        compute_type_counts(
            &mut self.by_type,
            &fi_batch,
            txid_len,
            // skip_first_tx = true — presumably excludes the block's first tx
            // (the coinbase, which has no real previous outputs); TODO confirm.
            true,
            starting_indexes.height,
            exit,
            // Per-tx scan: return a u16 bitset with one bit per output type
            // seen among this transaction's inputs.
            |tx_pos| {
                let fi_in = fi_in_cursor.get(tx_pos).data()?.to_usize();
                // The last tx's input range ends at the global input count.
                let next_fi_in = if tx_pos + 1 < txid_len {
                    fi_in_cursor.get(tx_pos + 1).data()?.to_usize()
                } else {
                    total_txin_len
                };
                let mut seen: u16 = 0;
                // Cursors only move forward; jump to this tx's first input.
                itype_cursor.advance(fi_in - itype_cursor.position());
                for _ in fi_in..next_fi_in {
                    seen |= 1u16 << (itype_cursor.next().unwrap() as u8);
                }
                Ok(seen)
            },
        )
    }
}

View File

@@ -0,0 +1,39 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::Version;
use vecdb::Database;
use super::Vecs;
use crate::{
indexes,
internal::{PerBlockCumulativeRolling, PercentCumulativeRolling, WindowStartVec, Windows},
};
impl Vecs {
    /// Opens (or creates) the per-address-type input-side vec families,
    /// naming each stored vec after its address type.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        // "Tx spends at least one input of this type" counters.
        let by_type = ByAddrType::new_with_name(|name| {
            PerBlockCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_in"),
                version,
                indexes,
                cached_starts,
            )
        })?;
        // Same counters expressed relative to the total tx count.
        let percent = ByAddrType::new_with_name(|name| {
            PercentCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_in_rel_to_all"),
                version,
                indexes,
            )
        })?;
        Ok(Self { by_type, percent })
    }
}

View File

@@ -0,0 +1,5 @@
mod compute;
mod import;
mod vecs;
pub use vecs::Vecs;

View File

@@ -0,0 +1,12 @@
use brk_cohort::ByAddrType;
use brk_traversable::Traversable;
use brk_types::{BasisPoints16, StoredU64};
use vecdb::{Rw, StorageMode};
use crate::internal::{PerBlockCumulativeRolling, PercentCumulativeRolling};
/// Per-address-type input statistics vecs (counts and derived percentages).
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
    /// Per-block count of TXs spending at least one input of this type.
    pub by_type: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    /// Per-type count as a percent of the total tx count.
    pub percent: ByAddrType<PercentCumulativeRolling<BasisPoints16, M>>,
}

View File

@@ -1,9 +1,12 @@
pub mod count;
pub mod fees;
pub mod input_types;
pub mod output_types;
pub mod size;
pub mod versions;
pub mod volume;
mod type_counts;
mod compute;
mod import;
@@ -12,6 +15,8 @@ use vecdb::{Database, Rw, StorageMode};
pub use count::Vecs as CountVecs;
pub use fees::Vecs as FeesVecs;
pub use input_types::Vecs as InputTypesVecs;
pub use output_types::Vecs as OutputTypesVecs;
pub use size::Vecs as SizeVecs;
pub use versions::Vecs as VersionsVecs;
pub use volume::Vecs as VolumeVecs;
@@ -28,4 +33,6 @@ pub struct Vecs<M: StorageMode = Rw> {
pub fees: FeesVecs<M>,
pub versions: VersionsVecs<M>,
pub volume: VolumeVecs<M>,
pub input_types: InputTypesVecs<M>,
pub output_types: OutputTypesVecs<M>,
}

View File

@@ -0,0 +1,68 @@
use brk_error::{OptionData, Result};
use brk_indexer::Indexer;
use brk_types::Indexes;
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
use super::{super::type_counts::compute_type_counts, Vecs};
impl Vecs {
    /// Counts, per block and per address type, the transactions that create
    /// at least one output of that type (`by_type`).
    ///
    /// Resumes incrementally from the shortest already-computed per-type vec.
    pub(crate) fn compute(
        &mut self,
        indexer: &Indexer,
        starting_indexes: &Indexes,
        exit: &Exit,
    ) -> Result<()> {
        // Combined version of every indexer vec this computation reads; a
        // change in any of them invalidates previously computed data.
        let dep_version = indexer.vecs.outputs.output_type.version()
            + indexer.vecs.transactions.first_tx_index.version()
            + indexer.vecs.transactions.first_txout_index.version()
            + indexer.vecs.transactions.txid.version();
        for (_, v) in self.by_type.iter_mut() {
            v.block
                .validate_and_truncate(dep_version, starting_indexes.height)?;
        }
        // Resume point: the shortest computed length across all types.
        let skip = self.by_type.values().map(|v| v.block.len()).min().unwrap();
        let first_tx_index = &indexer.vecs.transactions.first_tx_index;
        let end = first_tx_index.len();
        if skip >= end {
            // Nothing new to compute.
            return Ok(());
        }
        // Re-align every per-type vec to the common resume point.
        for (_, v) in self.by_type.iter_mut() {
            v.block.truncate_if_needed_at(skip)?;
        }
        let fi_batch = first_tx_index.collect_range_at(skip, end);
        let txid_len = indexer.vecs.transactions.txid.len();
        let total_txout_len = indexer.vecs.outputs.output_type.len();
        let mut otype_cursor = indexer.vecs.outputs.output_type.cursor();
        let mut fo_cursor = indexer.vecs.transactions.first_txout_index.cursor();
        compute_type_counts(
            &mut self.by_type,
            &fi_batch,
            txid_len,
            // skip_first_tx = false — unlike the input-side pass, the first tx
            // of a block does create outputs and is counted.
            false,
            starting_indexes.height,
            exit,
            // Per-tx scan: return a u16 bitset with one bit per output type
            // seen among this transaction's outputs.
            |tx_pos| {
                let fo = fo_cursor.get(tx_pos).data()?.to_usize();
                // The last tx's output range ends at the global output count.
                let next_fo = if tx_pos + 1 < txid_len {
                    fo_cursor.get(tx_pos + 1).data()?.to_usize()
                } else {
                    total_txout_len
                };
                let mut seen: u16 = 0;
                // Cursors only move forward; jump to this tx's first output.
                otype_cursor.advance(fo - otype_cursor.position());
                for _ in fo..next_fo {
                    seen |= 1u16 << (otype_cursor.next().unwrap() as u8);
                }
                Ok(seen)
            },
        )
    }
}

View File

@@ -0,0 +1,39 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::Version;
use vecdb::Database;
use super::Vecs;
use crate::{
indexes,
internal::{PerBlockCumulativeRolling, PercentCumulativeRolling, WindowStartVec, Windows},
};
impl Vecs {
    /// Opens (or creates) the per-address-type output-side vec families,
    /// naming each stored vec after its address type.
    pub(crate) fn forced_import(
        db: &Database,
        version: Version,
        indexes: &indexes::Vecs,
        cached_starts: &Windows<&WindowStartVec>,
    ) -> Result<Self> {
        // "Tx creates at least one output of this type" counters.
        let by_type = ByAddrType::new_with_name(|name| {
            PerBlockCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_out"),
                version,
                indexes,
                cached_starts,
            )
        })?;
        // Same counters expressed relative to the total tx count.
        let percent = ByAddrType::new_with_name(|name| {
            PercentCumulativeRolling::forced_import(
                db,
                &format!("tx_count_with_{name}_out_rel_to_all"),
                version,
                indexes,
            )
        })?;
        Ok(Self { by_type, percent })
    }
}

View File

@@ -0,0 +1,5 @@
mod compute;
mod import;
mod vecs;
pub use vecs::Vecs;

View File

@@ -0,0 +1,12 @@
use brk_cohort::ByAddrType;
use brk_traversable::Traversable;
use brk_types::{BasisPoints16, StoredU64};
use vecdb::{Rw, StorageMode};
use crate::internal::{PerBlockCumulativeRolling, PercentCumulativeRolling};
/// Per-address-type output statistics vecs (counts and derived percentages).
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
    /// Per-block count of TXs creating at least one output of this type.
    pub by_type: ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64, M>>,
    /// Per-type count as a percent of the total tx count.
    pub percent: ByAddrType<PercentCumulativeRolling<BasisPoints16, M>>,
}

View File

@@ -0,0 +1,91 @@
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::{BasisPoints16, Height, OutputType, StoredU64, TxIndex};
use vecdb::{AnyStoredVec, Exit, VecIndex, WritableVec};
use crate::internal::{
PerBlockCumulativeRolling, PerBlockFull, PercentCumulativeRolling, RatioU64Bp16,
};
/// Shared driver for the input-side and output-side per-type tx counters.
///
/// For each block (one entry of `fi_batch`), calls `scan_tx` on every
/// transaction in the block; `scan_tx` returns a u16 bitset with one bit per
/// output type seen in that tx. Each set bit contributes one tx to that
/// type's per-block count.
///
/// - `fi_batch`: first tx index of each block still to compute.
/// - `txid_len`: global tx count, used as the end bound of the last block.
/// - `skip_first_tx`: when true, the first tx of every block is excluded
///   (presumably the coinbase on the input side — TODO confirm with callers).
pub(super) fn compute_type_counts(
    by_type: &mut ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
    fi_batch: &[TxIndex],
    txid_len: usize,
    skip_first_tx: bool,
    starting_height: Height,
    exit: &Exit,
    mut scan_tx: impl FnMut(usize) -> Result<u16>,
) -> Result<()> {
    for (j, first_tx) in fi_batch.iter().enumerate() {
        let fi = first_tx.to_usize();
        // This block's tx range ends where the next block starts, or at the
        // global tx count for the final block.
        let next_fi = fi_batch
            .get(j + 1)
            .map(|v| v.to_usize())
            .unwrap_or(txid_len);
        let start_tx = if skip_first_tx { fi + 1 } else { fi };
        // One slot per type discriminant; assumes every discriminant that
        // `scan_tx` can set is < 12 — TODO confirm against OutputType.
        let mut counts = [0u64; 12];
        for tx_pos in start_tx..next_fi {
            let seen = scan_tx(tx_pos)?;
            // Iterate set bits: trailing_zeros finds the lowest set bit's
            // index, `bits &= bits - 1` clears it (Kernighan's trick).
            let mut bits = seen;
            while bits != 0 {
                let idx = bits.trailing_zeros() as usize;
                counts[idx] += 1;
                bits &= bits - 1;
            }
        }
        // Push this block's count for each address type (non-address types
        // that scan_tx may have flagged are intentionally dropped here).
        for otype in OutputType::ADDR_TYPES {
            by_type
                .get_mut_unwrap(otype)
                .block
                .push(StoredU64::from(counts[otype as usize]));
        }
        // All per-type vecs grow in lockstep, so checking one (p2pkh) is
        // enough to know every vec hit its write-batch limit.
        if by_type.p2pkh.block.batch_limit_reached() {
            let _lock = exit.lock();
            for (_, v) in by_type.iter_mut() {
                v.block.write()?;
            }
        }
    }
    // Final flush of whatever didn't reach the batch limit.
    {
        let _lock = exit.lock();
        for (_, v) in by_type.iter_mut() {
            v.block.write()?;
        }
    }
    // Derive the cumulative/rolling companions from the per-block values.
    for (_, v) in by_type.iter_mut() {
        v.compute_rest(starting_height, exit)?;
    }
    Ok(())
}
/// Derives, for each address type, the per-type tx count as a percentage
/// (basis points) of the total tx count, for both the cumulative series and
/// every rolling window.
pub(super) fn compute_type_percents(
    by_type: &ByAddrType<PerBlockCumulativeRolling<StoredU64, StoredU64>>,
    percent: &mut ByAddrType<PercentCumulativeRolling<BasisPoints16>>,
    count_total: &PerBlockFull<StoredU64>,
    starting_height: Height,
    exit: &Exit,
) -> Result<()> {
    for otype in OutputType::ADDR_TYPES {
        let source = by_type.get_unwrap(otype);
        // Numerators come from the per-type vec, denominators from the total:
        // cumulative/cumulative, then window-by-window rolling sums.
        // NOTE(review): `source.sum` vs `count_total.rolling.sum` — the two
        // container types apparently nest their rolling sums differently;
        // confirm both map windows in the same order.
        percent
            .get_mut_unwrap(otype)
            .compute_binary::<StoredU64, StoredU64, RatioU64Bp16, _, _, _, _>(
                starting_height,
                &source.cumulative.height,
                &count_total.cumulative.height,
                source.sum.as_array().map(|w| &w.height),
                count_total.rolling.sum.as_array().map(|w| &w.height),
                exit,
            )?;
    }
    Ok(())
}