website: snapshot

This commit is contained in:
nym21
2026-02-02 18:39:42 +01:00
parent cf4bc470e4
commit b23d20ea05
44 changed files with 2637 additions and 2800 deletions

View File

@@ -1,7 +1,7 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{
EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, Version,
EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex, Height, Version,
};
use rayon::prelude::*;
use vecdb::{
@@ -10,10 +10,10 @@ use vecdb::{
const SAVED_STAMPED_CHANGES: u16 = 10;
/// Storage for both loaded and empty address data.
/// Storage for both funded and empty address data.
#[derive(Clone, Traversable)]
pub struct AddressesDataVecs {
pub loaded: BytesVec<LoadedAddressIndex, LoadedAddressData>,
pub funded: BytesVec<FundedAddressIndex, FundedAddressData>,
pub empty: BytesVec<EmptyAddressIndex, EmptyAddressData>,
}
@@ -21,8 +21,8 @@ impl AddressesDataVecs {
/// Import from database.
pub fn forced_import(db: &Database, version: Version) -> Result<Self> {
Ok(Self {
loaded: BytesVec::forced_import_with(
ImportOptions::new(db, "loadedaddressdata", version)
funded: BytesVec::forced_import_with(
ImportOptions::new(db, "fundedaddressdata", version)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
empty: BytesVec::forced_import_with(
@@ -32,31 +32,31 @@ impl AddressesDataVecs {
})
}
/// Get minimum stamped height across loaded and empty data.
/// Get minimum stamped height across funded and empty data.
pub fn min_stamped_height(&self) -> Height {
Height::from(self.loaded.stamp())
Height::from(self.funded.stamp())
.incremented()
.min(Height::from(self.empty.stamp()).incremented())
}
/// Rollback both loaded and empty data to before the given stamp.
/// Rollback both funded and empty data to before the given stamp.
pub fn rollback_before(&mut self, stamp: Stamp) -> Result<[Stamp; 2]> {
Ok([
self.loaded.rollback_before(stamp)?,
self.funded.rollback_before(stamp)?,
self.empty.rollback_before(stamp)?,
])
}
/// Reset both loaded and empty data.
/// Reset both funded and empty data.
pub fn reset(&mut self) -> Result<()> {
self.loaded.reset()?;
self.funded.reset()?;
self.empty.reset()?;
Ok(())
}
/// Flush both loaded and empty data with stamp.
/// Flush both funded and empty data with stamp.
pub fn write(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> {
self.loaded
self.funded
.stamped_write_maybe_with_changes(stamp, with_changes)?;
self.empty
.stamped_write_maybe_with_changes(stamp, with_changes)?;
@@ -66,7 +66,7 @@ impl AddressesDataVecs {
/// Returns a parallel iterator over all vecs for parallel writing.
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
vec![
&mut self.loaded as &mut dyn AnyStoredVec,
&mut self.funded as &mut dyn AnyStoredVec,
&mut self.empty as &mut dyn AnyStoredVec,
]
.into_par_iter()

View File

@@ -136,7 +136,7 @@ define_any_address_indexes_vecs!(
impl AnyAddressIndexesVecs {
/// Process index updates in parallel by address type.
/// Accepts two maps (e.g. from empty and loaded processing) and merges per-thread.
/// Accepts two maps (e.g. from empty and funded processing) and merges per-thread.
/// Updates existing entries and pushes new ones (sorted).
/// Returns (update_count, push_count).
pub fn par_batch_update(

View File

@@ -1,5 +1,5 @@
use brk_cohort::ByAddressType;
use brk_types::{AnyAddressDataIndexEnum, LoadedAddressData, OutputType, TypeIndex};
use brk_types::{AnyAddressDataIndexEnum, FundedAddressData, OutputType, TypeIndex};
use crate::distribution::{
address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs},
@@ -7,7 +7,7 @@ use crate::distribution::{
};
use super::super::cohort::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec, WithAddressDataSource,
EmptyAddressDataWithSource, FundedAddressDataWithSource, TxIndexVec, WithAddressDataSource,
update_tx_counts,
};
use super::lookup::AddressLookup;
@@ -15,7 +15,7 @@ use super::lookup::AddressLookup;
/// Cache for address data within a flush interval.
pub struct AddressCache {
/// Addresses with non-zero balance
loaded: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
funded: AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
/// Addresses that became empty (zero balance)
empty: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
}
@@ -29,15 +29,15 @@ impl Default for AddressCache {
impl AddressCache {
pub fn new() -> Self {
Self {
loaded: AddressTypeToTypeIndexMap::default(),
funded: AddressTypeToTypeIndexMap::default(),
empty: AddressTypeToTypeIndexMap::default(),
}
}
/// Check if address is in cache (either loaded or empty).
/// Check if address is in cache (either funded or empty).
#[inline]
pub fn contains(&self, address_type: OutputType, typeindex: TypeIndex) -> bool {
self.loaded
self.funded
.get(address_type)
.is_some_and(|m| m.contains_key(&typeindex))
|| self
@@ -46,24 +46,24 @@ impl AddressCache {
.is_some_and(|m| m.contains_key(&typeindex))
}
/// Merge address data into loaded cache.
/// Merge address data into funded cache.
#[inline]
pub fn merge_loaded(&mut self, data: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>) {
self.loaded.merge_mut(data);
pub fn merge_funded(&mut self, data: AddressTypeToTypeIndexMap<FundedAddressDataWithSource>) {
self.funded.merge_mut(data);
}
/// Create an AddressLookup view into this cache.
#[inline]
pub fn as_lookup(&mut self) -> AddressLookup<'_> {
AddressLookup {
loaded: &mut self.loaded,
funded: &mut self.funded,
empty: &mut self.empty,
}
}
/// Update transaction counts for addresses.
pub fn update_tx_counts(&mut self, txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>) {
update_tx_counts(&mut self.loaded, &mut self.empty, txindex_vecs);
update_tx_counts(&mut self.funded, &mut self.empty, txindex_vecs);
}
/// Take the cache contents for flushing, leaving empty caches.
@@ -71,18 +71,18 @@ impl AddressCache {
&mut self,
) -> (
AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
) {
(
std::mem::take(&mut self.empty),
std::mem::take(&mut self.loaded),
std::mem::take(&mut self.funded),
)
}
}
/// Load address data from storage or create new.
///
/// Returns None if address is already in cache (loaded or empty).
/// Returns None if address is already in cache (funded or empty).
#[allow(clippy::too_many_arguments)]
pub fn load_uncached_address_data(
address_type: OutputType,
@@ -92,11 +92,11 @@ pub fn load_uncached_address_data(
vr: &VecsReaders,
any_address_indexes: &AnyAddressIndexesVecs,
addresses_data: &AddressesDataVecs,
) -> Option<LoadedAddressDataWithSource> {
) -> Option<FundedAddressDataWithSource> {
// Check if this is a new address (typeindex >= first for this height)
let first = *first_addressindexes.get(address_type).unwrap();
if first <= typeindex {
return Some(WithAddressDataSource::New(LoadedAddressData::default()));
return Some(WithAddressDataSource::New(FundedAddressData::default()));
}
// Skip if already in cache
@@ -109,13 +109,13 @@ pub fn load_uncached_address_data(
let anyaddressindex = any_address_indexes.get(address_type, typeindex, reader);
Some(match anyaddressindex.to_enum() {
AnyAddressDataIndexEnum::Loaded(loaded_index) => {
let reader = &vr.anyaddressindex_to_anyaddressdata.loaded;
AnyAddressDataIndexEnum::Funded(funded_index) => {
let reader = &vr.anyaddressindex_to_anyaddressdata.funded;
// Use get_any_or_read_unwrap to check updated layer (needed after rollback)
let loaded_data = addresses_data
.loaded
.get_any_or_read_unwrap(loaded_index, reader);
WithAddressDataSource::FromLoaded(loaded_index, loaded_data)
let funded_data = addresses_data
.funded
.get_any_or_read_unwrap(funded_index, reader);
WithAddressDataSource::FromFunded(funded_index, funded_data)
}
AnyAddressDataIndexEnum::Empty(empty_index) => {
let reader = &vr.anyaddressindex_to_anyaddressdata.empty;

View File

@@ -1,9 +1,9 @@
use brk_types::{LoadedAddressData, OutputType, TypeIndex};
use brk_types::{FundedAddressData, OutputType, TypeIndex};
use crate::distribution::address::AddressTypeToTypeIndexMap;
use super::super::cohort::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, WithAddressDataSource,
EmptyAddressDataWithSource, FundedAddressDataWithSource, WithAddressDataSource,
};
/// Tracking status of an address - determines cohort update strategy.
@@ -19,7 +19,7 @@ pub enum TrackingStatus {
/// Context for looking up and storing address data during block processing.
pub struct AddressLookup<'a> {
pub loaded: &'a mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
pub funded: &'a mut AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
pub empty: &'a mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
}
@@ -28,21 +28,21 @@ impl<'a> AddressLookup<'a> {
&mut self,
output_type: OutputType,
type_index: TypeIndex,
) -> (&mut LoadedAddressDataWithSource, TrackingStatus) {
) -> (&mut FundedAddressDataWithSource, TrackingStatus) {
use std::collections::hash_map::Entry;
let map = self.loaded.get_mut(output_type).unwrap();
let map = self.funded.get_mut(output_type).unwrap();
match map.entry(type_index) {
Entry::Occupied(entry) => {
// Address is in cache. Need to determine if it's been processed
// by process_received (added to a cohort) or just loaded this block.
// by process_received (added to a cohort) or just funded this block.
//
// - If wrapper is New AND funded_txo_count == 0: hasn't received yet,
// was just created in process_outputs this block → New
// - If wrapper is New AND funded_txo_count > 0: received in previous
// block but still in cache (no flush) → Tracked
// - If wrapper is FromLoaded: loaded from storage → Tracked
// - If wrapper is FromFunded: funded from storage → Tracked
// - If wrapper is FromEmpty AND utxo_count == 0: still empty → WasEmpty
// - If wrapper is FromEmpty AND utxo_count > 0: already received → Tracked
let status = match entry.get() {
@@ -53,7 +53,7 @@ impl<'a> AddressLookup<'a> {
TrackingStatus::Tracked
}
}
WithAddressDataSource::FromLoaded(..) => TrackingStatus::Tracked,
WithAddressDataSource::FromFunded(..) => TrackingStatus::Tracked,
WithAddressDataSource::FromEmpty(_, data) => {
if data.utxo_count() == 0 {
TrackingStatus::WasEmpty
@@ -71,7 +71,7 @@ impl<'a> AddressLookup<'a> {
return (entry.insert(empty_data.into()), TrackingStatus::WasEmpty);
}
(
entry.insert(WithAddressDataSource::New(LoadedAddressData::default())),
entry.insert(WithAddressDataSource::New(FundedAddressData::default())),
TrackingStatus::New,
)
}
@@ -83,18 +83,18 @@ impl<'a> AddressLookup<'a> {
&mut self,
output_type: OutputType,
type_index: TypeIndex,
) -> &mut LoadedAddressDataWithSource {
self.loaded
) -> &mut FundedAddressDataWithSource {
self.funded
.get_mut(output_type)
.unwrap()
.get_mut(&type_index)
.expect("Address must exist for send")
}
/// Move address from loaded to empty set.
/// Move address from funded to empty set.
pub fn move_to_empty(&mut self, output_type: OutputType, type_index: TypeIndex) {
let data = self
.loaded
.funded
.get_mut(output_type)
.unwrap()
.remove(&type_index)

View File

@@ -1,40 +1,40 @@
use brk_error::Result;
use brk_types::{
AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex,
AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex,
OutputType, TypeIndex,
};
use vecdb::AnyVec;
use crate::distribution::{AddressTypeToTypeIndexMap, AddressesDataVecs};
use super::with_source::{EmptyAddressDataWithSource, LoadedAddressDataWithSource};
use super::with_source::{EmptyAddressDataWithSource, FundedAddressDataWithSource};
/// Process loaded address data updates.
/// Process funded address data updates.
///
/// Handles:
/// - New loaded address: push to loaded storage
/// - Updated loaded address (was loaded): update in place
/// - Transition empty -> loaded: delete from empty, push to loaded
pub fn process_loaded_addresses(
/// - New funded address: push to funded storage
/// - Updated funded address (was funded): update in place
/// - Transition empty -> funded: delete from empty, push to funded
pub fn process_funded_addresses(
addresses_data: &mut AddressesDataVecs,
loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
funded_updates: AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
) -> Result<AddressTypeToTypeIndexMap<AnyAddressIndex>> {
let total: usize = loaded_updates.iter().map(|(_, m)| m.len()).sum();
let total: usize = funded_updates.iter().map(|(_, m)| m.len()).sum();
let mut updates: Vec<(LoadedAddressIndex, LoadedAddressData)> = Vec::with_capacity(total);
let mut updates: Vec<(FundedAddressIndex, FundedAddressData)> = Vec::with_capacity(total);
let mut deletes: Vec<EmptyAddressIndex> = Vec::with_capacity(total);
let mut pushes: Vec<(OutputType, TypeIndex, LoadedAddressData)> = Vec::with_capacity(total);
let mut pushes: Vec<(OutputType, TypeIndex, FundedAddressData)> = Vec::with_capacity(total);
for (address_type, items) in loaded_updates.into_iter() {
for (address_type, items) in funded_updates.into_iter() {
for (typeindex, source) in items {
match source {
LoadedAddressDataWithSource::New(data) => {
FundedAddressDataWithSource::New(data) => {
pushes.push((address_type, typeindex, data));
}
LoadedAddressDataWithSource::FromLoaded(index, data) => {
FundedAddressDataWithSource::FromFunded(index, data) => {
updates.push((index, data));
}
LoadedAddressDataWithSource::FromEmpty(empty_index, data) => {
FundedAddressDataWithSource::FromEmpty(empty_index, data) => {
deletes.push(empty_index);
pushes.push((address_type, typeindex, data));
}
@@ -49,16 +49,16 @@ pub fn process_loaded_addresses(
// Phase 2: Updates (in-place)
for (index, data) in updates {
addresses_data.loaded.update(index, data)?;
addresses_data.funded.update(index, data)?;
}
// Phase 3: Pushes (fill holes first, then pure pushes)
let mut result = AddressTypeToTypeIndexMap::with_capacity(pushes.len() / 4);
let holes_count = addresses_data.loaded.holes().len();
let holes_count = addresses_data.funded.holes().len();
let mut pushes_iter = pushes.into_iter();
for (address_type, typeindex, data) in pushes_iter.by_ref().take(holes_count) {
let index = addresses_data.loaded.fill_first_hole_or_push(data)?;
let index = addresses_data.funded.fill_first_hole_or_push(data)?;
result
.get_mut(address_type)
.unwrap()
@@ -66,14 +66,14 @@ pub fn process_loaded_addresses(
}
// Pure pushes - no holes remain
addresses_data.loaded.reserve_pushed(pushes_iter.len());
let mut next_index = addresses_data.loaded.len();
addresses_data.funded.reserve_pushed(pushes_iter.len());
let mut next_index = addresses_data.funded.len();
for (address_type, typeindex, data) in pushes_iter {
addresses_data.loaded.push(data);
result
.get_mut(address_type)
.unwrap()
.insert(typeindex, AnyAddressIndex::from(LoadedAddressIndex::from(next_index)));
addresses_data.funded.push(data);
result.get_mut(address_type).unwrap().insert(
typeindex,
AnyAddressIndex::from(FundedAddressIndex::from(next_index)),
);
next_index += 1;
}
@@ -85,7 +85,7 @@ pub fn process_loaded_addresses(
/// Handles:
/// - New empty address: push to empty storage
/// - Updated empty address (was empty): update in place
/// - Transition loaded -> empty: delete from loaded, push to empty
/// - Transition funded -> empty: delete from funded, push to empty
pub fn process_empty_addresses(
addresses_data: &mut AddressesDataVecs,
empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
@@ -93,7 +93,7 @@ pub fn process_empty_addresses(
let total: usize = empty_updates.iter().map(|(_, m)| m.len()).sum();
let mut updates: Vec<(EmptyAddressIndex, EmptyAddressData)> = Vec::with_capacity(total);
let mut deletes: Vec<LoadedAddressIndex> = Vec::with_capacity(total);
let mut deletes: Vec<FundedAddressIndex> = Vec::with_capacity(total);
let mut pushes: Vec<(OutputType, TypeIndex, EmptyAddressData)> = Vec::with_capacity(total);
for (address_type, items) in empty_updates.into_iter() {
@@ -105,8 +105,8 @@ pub fn process_empty_addresses(
EmptyAddressDataWithSource::FromEmpty(index, data) => {
updates.push((index, data));
}
EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => {
deletes.push(loaded_index);
EmptyAddressDataWithSource::FromFunded(funded_index, data) => {
deletes.push(funded_index);
pushes.push((address_type, typeindex, data));
}
}
@@ -114,8 +114,8 @@ pub fn process_empty_addresses(
}
// Phase 1: Deletes (creates holes)
for loaded_index in deletes {
addresses_data.loaded.delete(loaded_index);
for funded_index in deletes {
addresses_data.funded.delete(funded_index);
}
// Phase 2: Updates (in-place)
@@ -141,10 +141,10 @@ pub fn process_empty_addresses(
let mut next_index = addresses_data.empty.len();
for (address_type, typeindex, data) in pushes_iter {
addresses_data.empty.push(data);
result
.get_mut(address_type)
.unwrap()
.insert(typeindex, AnyAddressIndex::from(EmptyAddressIndex::from(next_index)));
result.get_mut(address_type).unwrap().insert(
typeindex,
AnyAddressIndex::from(EmptyAddressIndex::from(next_index)),
);
next_index += 1;
}

View File

@@ -2,7 +2,7 @@ use brk_cohort::{AmountBucket, ByAddressType};
use brk_error::Result;
use brk_types::{Age, CentsUnsigned, CheckedSub, Height, Sats, Timestamp, TypeIndex};
use rustc_hash::FxHashSet;
use vecdb::{unlikely, VecIndex};
use vecdb::{VecIndex, unlikely};
use crate::distribution::{
address::{AddressTypeToActivityCounts, HeightToAddressTypeToVec},
@@ -131,7 +131,7 @@ pub fn process_sent(
*type_addr_count -= 1;
*type_empty_count += 1;
// Move from loaded to empty
// Move from funded to empty
lookup.move_to_empty(output_type, type_index);
} else {
// Add to new cohort
@@ -151,7 +151,14 @@ pub fn process_sent(
.state
.as_mut()
.unwrap()
.send(addr_data, value, current_price.unwrap(), prev_price.unwrap(), peak_price.unwrap(), age)?;
.send(
addr_data,
value,
current_price.unwrap(),
prev_price.unwrap(),
peak_price.unwrap(),
age,
)?;
}
}
}

View File

@@ -1,6 +1,6 @@
use crate::distribution::address::AddressTypeToTypeIndexMap;
use super::with_source::{EmptyAddressDataWithSource, LoadedAddressDataWithSource, TxIndexVec};
use super::with_source::{EmptyAddressDataWithSource, FundedAddressDataWithSource, TxIndexVec};
/// Update tx_count for addresses based on unique transactions they participated in.
///
@@ -8,10 +8,10 @@ use super::with_source::{EmptyAddressDataWithSource, LoadedAddressDataWithSource
/// 1. Deduplicate transaction indexes (an address may appear in multiple inputs/outputs of same tx)
/// 2. Add the unique count to the address's tx_count field
///
/// Addresses are looked up in loaded_cache first, then empty_cache.
/// NOTE: This should be called AFTER merging parallel-fetched address data into loaded_cache.
/// Addresses are looked up in funded_cache first, then empty_cache.
/// NOTE: This should be called AFTER merging parallel-fetched address data into funded_cache.
pub fn update_tx_counts(
loaded_cache: &mut AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
funded_cache: &mut AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
empty_cache: &mut AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
mut txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>,
) {
@@ -32,7 +32,7 @@ pub fn update_tx_counts(
{
let tx_count = txindex_vec.len() as u32;
if let Some(addr_data) = loaded_cache
if let Some(addr_data) = funded_cache
.get_mut(address_type)
.unwrap()
.get_mut(&typeindex)

View File

@@ -1,8 +1,10 @@
use brk_types::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, TxIndex};
use brk_types::{
EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex, TxIndex,
};
use smallvec::SmallVec;
/// Loaded address data with source tracking for flush operations.
pub type LoadedAddressDataWithSource = WithAddressDataSource<LoadedAddressData>;
/// Funded address data with source tracking for flush operations.
pub type FundedAddressDataWithSource = WithAddressDataSource<FundedAddressData>;
/// Empty address data with source tracking for flush operations.
pub type EmptyAddressDataWithSource = WithAddressDataSource<EmptyAddressData>;
@@ -18,9 +20,9 @@ pub type TxIndexVec = SmallVec<[TxIndex; 4]>;
pub enum WithAddressDataSource<T> {
/// Brand new address (never seen before)
New(T),
/// Loaded from loaded address storage (with original index)
FromLoaded(LoadedAddressIndex, T),
/// Loaded from empty address storage (with original index)
/// Funded from funded address storage (with original index)
FromFunded(FundedAddressIndex, T),
/// Funded from empty address storage (with original index)
FromEmpty(EmptyAddressIndex, T),
}
@@ -29,7 +31,7 @@ impl<T> std::ops::Deref for WithAddressDataSource<T> {
fn deref(&self) -> &Self::Target {
match self {
Self::New(v) | Self::FromLoaded(_, v) | Self::FromEmpty(_, v) => v,
Self::New(v) | Self::FromFunded(_, v) | Self::FromEmpty(_, v) => v,
}
}
}
@@ -37,28 +39,28 @@ impl<T> std::ops::Deref for WithAddressDataSource<T> {
impl<T> std::ops::DerefMut for WithAddressDataSource<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Self::New(v) | Self::FromLoaded(_, v) | Self::FromEmpty(_, v) => v,
Self::New(v) | Self::FromFunded(_, v) | Self::FromEmpty(_, v) => v,
}
}
}
impl From<WithAddressDataSource<EmptyAddressData>> for WithAddressDataSource<LoadedAddressData> {
impl From<WithAddressDataSource<EmptyAddressData>> for WithAddressDataSource<FundedAddressData> {
#[inline]
fn from(value: WithAddressDataSource<EmptyAddressData>) -> Self {
match value {
WithAddressDataSource::New(v) => Self::New(v.into()),
WithAddressDataSource::FromLoaded(i, v) => Self::FromLoaded(i, v.into()),
WithAddressDataSource::FromFunded(i, v) => Self::FromFunded(i, v.into()),
WithAddressDataSource::FromEmpty(i, v) => Self::FromEmpty(i, v.into()),
}
}
}
impl From<WithAddressDataSource<LoadedAddressData>> for WithAddressDataSource<EmptyAddressData> {
impl From<WithAddressDataSource<FundedAddressData>> for WithAddressDataSource<EmptyAddressData> {
#[inline]
fn from(value: WithAddressDataSource<LoadedAddressData>) -> Self {
fn from(value: WithAddressDataSource<FundedAddressData>) -> Self {
match value {
WithAddressDataSource::New(v) => Self::New(v.into()),
WithAddressDataSource::FromLoaded(i, v) => Self::FromLoaded(i, v.into()),
WithAddressDataSource::FromFunded(i, v) => Self::FromFunded(i, v.into()),
WithAddressDataSource::FromEmpty(i, v) => Self::FromEmpty(i, v.into()),
}
}

View File

@@ -13,7 +13,7 @@ use crate::distribution::address::HeightToAddressTypeToVec;
use super::super::{
cache::{AddressCache, load_uncached_address_data},
cohort::{LoadedAddressDataWithSource, TxIndexVec},
cohort::{FundedAddressDataWithSource, TxIndexVec},
};
/// Result of processing inputs for a block.
@@ -23,7 +23,7 @@ pub struct InputsResult {
/// Per-height, per-address-type sent data: (typeindex, value) for each address.
pub sent_data: HeightToAddressTypeToVec<(TypeIndex, Sats)>,
/// Address data looked up during processing, keyed by (address_type, typeindex).
pub address_data: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
pub address_data: AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
/// Transaction indexes per address for tx_count tracking.
pub txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>,
}
@@ -100,7 +100,7 @@ pub fn process_inputs(
);
let mut sent_data = HeightToAddressTypeToVec::with_capacity(estimated_unique_heights);
let mut address_data =
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::with_capacity(estimated_per_type);
AddressTypeToTypeIndexMap::<FundedAddressDataWithSource>::with_capacity(estimated_per_type);
let mut txindex_vecs =
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);

View File

@@ -11,7 +11,7 @@ use crate::distribution::{
use super::super::{
cache::{AddressCache, load_uncached_address_data},
cohort::{LoadedAddressDataWithSource, TxIndexVec},
cohort::{FundedAddressDataWithSource, TxIndexVec},
};
/// Result of processing outputs for a block.
@@ -21,7 +21,7 @@ pub struct OutputsResult {
/// Per-address-type received data: (typeindex, value) for each address.
pub received_data: AddressTypeToVec<(TypeIndex, Sats)>,
/// Address data looked up during processing, keyed by (address_type, typeindex).
pub address_data: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
pub address_data: AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
/// Transaction indexes per address for tx_count tracking.
pub txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>,
}
@@ -50,7 +50,7 @@ pub fn process_outputs(
let mut transacted = Transacted::default();
let mut received_data = AddressTypeToVec::with_capacity(estimated_per_type);
let mut address_data =
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::with_capacity(estimated_per_type);
AddressTypeToTypeIndexMap::<FundedAddressDataWithSource>::with_capacity(estimated_per_type);
let mut txindex_vecs =
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);

View File

@@ -263,8 +263,8 @@ pub fn process_blocks(
});
// Merge new address data into current cache
cache.merge_loaded(outputs_result.address_data);
cache.merge_loaded(inputs_result.address_data);
cache.merge_funded(outputs_result.address_data);
cache.merge_funded(inputs_result.address_data);
// Combine txindex_vecs from outputs and inputs, then update tx_count
let combined_txindex_vecs = outputs_result
@@ -425,14 +425,14 @@ pub fn process_blocks(
// Drop readers to release mmap handles
drop(vr);
let (empty_updates, loaded_updates) = cache.take();
let (empty_updates, funded_updates) = cache.take();
// Process address updates (mutations)
process_address_updates(
&mut vecs.addresses_data,
&mut vecs.any_address_indexes,
empty_updates,
loaded_updates,
funded_updates,
)?;
let _lock = exit.lock();
@@ -451,14 +451,14 @@ pub fn process_blocks(
let _lock = exit.lock();
drop(vr);
let (empty_updates, loaded_updates) = cache.take();
let (empty_updates, funded_updates) = cache.take();
// Process address updates (mutations)
process_address_updates(
&mut vecs.addresses_data,
&mut vecs.any_address_indexes,
empty_updates,
loaded_updates,
funded_updates,
)?;
// Write to disk (pure I/O) - save changes for rollback

View File

@@ -140,7 +140,7 @@ impl VecsReaders {
p2wsh: any_address_indexes.p2wsh.create_reader(),
},
anyaddressindex_to_anyaddressdata: ByAnyAddress {
loaded: addresses_data.loaded.create_reader(),
funded: addresses_data.funded.create_reader(),
empty: addresses_data.empty.create_reader(),
},
}

View File

@@ -9,8 +9,8 @@ use vecdb::{AnyStoredVec, GenericStoredVec, Stamp};
use crate::distribution::{
Vecs,
block::{
EmptyAddressDataWithSource, LoadedAddressDataWithSource, process_empty_addresses,
process_loaded_addresses,
EmptyAddressDataWithSource, FundedAddressDataWithSource, process_empty_addresses,
process_funded_addresses,
},
state::BlockState,
};
@@ -21,7 +21,7 @@ use super::super::address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAdd
///
/// Applies all accumulated address changes to storage structures:
/// - Processes empty address transitions
/// - Processes loaded address transitions
/// - Processes funded address transitions
/// - Updates address indexes
///
/// Call this before `flush()` to prepare data for writing.
@@ -29,14 +29,14 @@ pub fn process_address_updates(
addresses_data: &mut AddressesDataVecs,
address_indexes: &mut AnyAddressIndexesVecs,
empty_updates: AddressTypeToTypeIndexMap<EmptyAddressDataWithSource>,
loaded_updates: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
funded_updates: AddressTypeToTypeIndexMap<FundedAddressDataWithSource>,
) -> Result<()> {
info!("Processing address updates...");
let i = Instant::now();
let empty_result = process_empty_addresses(addresses_data, empty_updates)?;
let loaded_result = process_loaded_addresses(addresses_data, loaded_updates)?;
address_indexes.par_batch_update(empty_result, loaded_result)?;
let funded_result = process_funded_addresses(addresses_data, funded_updates)?;
address_indexes.par_batch_update(empty_result, funded_result)?;
info!("Processed address updates in {:?}", i.elapsed());

View File

@@ -1,7 +1,7 @@
use std::path::Path;
use brk_error::Result;
use brk_types::{Age, CentsUnsigned, Height, LoadedAddressData, Sats, SupplyState};
use brk_types::{Age, CentsUnsigned, FundedAddressData, Height, Sats, SupplyState};
use vecdb::unlikely;
use super::{super::cost_basis::RealizedState, base::CohortState};
@@ -42,7 +42,7 @@ impl AddressCohortState {
pub fn send(
&mut self,
addressdata: &mut LoadedAddressData,
addressdata: &mut FundedAddressData,
value: Sats,
current_price: CentsUnsigned,
prev_price: CentsUnsigned,
@@ -54,7 +54,10 @@ impl AddressCohortState {
let current = addressdata.cost_basis_snapshot();
self.inner.send_address(
&SupplyState { utxo_count: 1, value },
&SupplyState {
utxo_count: 1,
value,
},
current_price,
prev_price,
ath,
@@ -68,7 +71,7 @@ impl AddressCohortState {
pub fn receive(
&mut self,
address_data: &mut LoadedAddressData,
address_data: &mut FundedAddressData,
value: Sats,
price: CentsUnsigned,
) {
@@ -77,7 +80,7 @@ impl AddressCohortState {
pub fn receive_outputs(
&mut self,
address_data: &mut LoadedAddressData,
address_data: &mut FundedAddressData,
value: Sats,
price: CentsUnsigned,
output_count: u32,
@@ -87,19 +90,23 @@ impl AddressCohortState {
let current = address_data.cost_basis_snapshot();
self.inner.receive_address(
&SupplyState { utxo_count: output_count as u64, value },
&SupplyState {
utxo_count: output_count as u64,
value,
},
price,
&current,
&prev,
);
}
pub fn add(&mut self, addressdata: &LoadedAddressData) {
pub fn add(&mut self, addressdata: &FundedAddressData) {
self.addr_count += 1;
self.inner.increment_snapshot(&addressdata.cost_basis_snapshot());
self.inner
.increment_snapshot(&addressdata.cost_basis_snapshot());
}
pub fn subtract(&mut self, addressdata: &LoadedAddressData) {
pub fn subtract(&mut self, addressdata: &FundedAddressData) {
let snapshot = addressdata.cost_basis_snapshot();
// Check for potential underflow before it happens
@@ -111,7 +118,11 @@ impl AddressCohortState {
Address supply: {}\n\
Realized price: {}\n\
This means the address is not properly tracked in this cohort.",
self.addr_count, self.inner.supply, addressdata, snapshot.supply_state, snapshot.realized_price
self.addr_count,
self.inner.supply,
addressdata,
snapshot.supply_state,
snapshot.realized_price
);
}
if unlikely(self.inner.supply.value < snapshot.supply_state.value) {
@@ -122,7 +133,11 @@ impl AddressCohortState {
Address supply: {}\n\
Realized price: {}\n\
This means the address is not properly tracked in this cohort.",
self.addr_count, self.inner.supply, addressdata, snapshot.supply_state, snapshot.realized_price
self.addr_count,
self.inner.supply,
addressdata,
snapshot.supply_state,
snapshot.realized_price
);
}

View File

@@ -4,7 +4,7 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_traversable::Traversable;
use brk_types::{
DateIndex, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex,
DateIndex, EmptyAddressData, EmptyAddressIndex, FundedAddressData, FundedAddressIndex, Height,
SupplyState, Version,
};
use tracing::{debug, info};
@@ -55,8 +55,8 @@ pub struct Vecs {
/// Growth rate (new / addr_count) - lazy ratio with distribution stats, global + per-type
pub growth_rate: GrowthRateVecs,
pub loadedaddressindex:
LazyVecFrom1<LoadedAddressIndex, LoadedAddressIndex, LoadedAddressIndex, LoadedAddressData>,
pub fundedaddressindex:
LazyVecFrom1<FundedAddressIndex, FundedAddressIndex, FundedAddressIndex, FundedAddressData>,
pub emptyaddressindex:
LazyVecFrom1<EmptyAddressIndex, EmptyAddressIndex, EmptyAddressIndex, EmptyAddressData>,
}
@@ -92,8 +92,8 @@ impl Vecs {
)?;
// Create address data BytesVecs first so we can also use them for identity mappings
let loadedaddressindex_to_loadedaddressdata = BytesVec::forced_import_with(
vecdb::ImportOptions::new(&db, "loadedaddressdata", version)
let fundedaddressindex_to_fundedaddressdata = BytesVec::forced_import_with(
vecdb::ImportOptions::new(&db, "fundedaddressdata", version)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?;
let emptyaddressindex_to_emptyaddressdata = BytesVec::forced_import_with(
@@ -102,10 +102,10 @@ impl Vecs {
)?;
// Identity mappings for traversable
let loadedaddressindex = LazyVecFrom1::init(
"loadedaddressindex",
let fundedaddressindex = LazyVecFrom1::init(
"fundedaddressindex",
version,
loadedaddressindex_to_loadedaddressdata.boxed_clone(),
fundedaddressindex_to_fundedaddressdata.boxed_clone(),
|index, _| Some(index),
);
let emptyaddressindex = LazyVecFrom1::init(
@@ -156,10 +156,10 @@ impl Vecs {
any_address_indexes: AnyAddressIndexesVecs::forced_import(&db, version)?,
addresses_data: AddressesDataVecs {
loaded: loadedaddressindex_to_loadedaddressdata,
funded: fundedaddressindex_to_fundedaddressdata,
empty: emptyaddressindex_to_emptyaddressdata,
},
loadedaddressindex,
fundedaddressindex,
emptyaddressindex,
db,