mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-06-03 03:33:38 -07:00
global: improve par writes
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::{Traversable, TreeNode};
|
||||
use brk_types::{DateIndex, Dollars, Version};
|
||||
use vecdb::{AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyExportableVec, AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, PcoVec,
|
||||
};
|
||||
|
||||
use crate::{Indexes, indexes};
|
||||
|
||||
@@ -91,6 +94,17 @@ impl PricePercentiles {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
self.vecs
|
||||
.iter_mut()
|
||||
.flatten()
|
||||
.filter_map(|v| v.dateindex.as_mut())
|
||||
.map(|v| v as &mut dyn AnyStoredVec)
|
||||
.collect::<Vec<_>>()
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
/// Validate computed versions or reset if mismatched.
|
||||
pub fn validate_computed_version_or_reset(&mut self, version: Version) -> Result<()> {
|
||||
for vec in self.vecs.iter_mut().flatten() {
|
||||
|
||||
@@ -5,6 +5,7 @@ use brk_grouper::ByAddressType;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Height, StoredU64, Version};
|
||||
use derive_deref::{Deref, DerefMut};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyStoredVec, AnyVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec,
|
||||
TypedVecIterator,
|
||||
@@ -88,6 +89,22 @@ impl AddressTypeToHeightToAddressCount {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
let inner = &mut self.0;
|
||||
[
|
||||
&mut inner.p2pk65 as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2pk33 as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2pkh as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2sh as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2wpkh as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2wsh as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2tr as &mut dyn AnyStoredVec,
|
||||
&mut inner.p2a as &mut dyn AnyStoredVec,
|
||||
]
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
|
||||
self.p2pk65.safe_write(exit)?;
|
||||
self.p2pk33.safe_write(exit)?;
|
||||
|
||||
@@ -7,6 +7,7 @@ use brk_types::{
|
||||
P2PKHAddressIndex, P2SHAddressIndex, P2TRAddressIndex, P2WPKHAddressIndex, P2WSHAddressIndex,
|
||||
TypeIndex, Version,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportOptions, ImportableVec, Reader, Stamp,
|
||||
};
|
||||
@@ -81,6 +82,11 @@ macro_rules! define_any_address_indexes_vecs {
|
||||
$(self.$field.stamped_write_maybe_with_changes(stamp, with_changes)?;)*
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
vec![$(&mut self.$field as &mut dyn AnyStoredVec),*].into_par_iter()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use brk_traversable::Traversable;
|
||||
use brk_types::{
|
||||
EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, Version,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportOptions, ImportableVec, Stamp,
|
||||
};
|
||||
@@ -63,4 +64,13 @@ impl AddressesDataVecs {
|
||||
.stamped_write_maybe_with_changes(stamp, with_changes)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
vec![
|
||||
&mut self.loaded as &mut dyn AnyStoredVec,
|
||||
&mut self.empty as &mut dyn AnyStoredVec,
|
||||
]
|
||||
.into_par_iter()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use brk_error::Result;
|
||||
use brk_grouper::{CohortContext, Filter, Filtered};
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, StoredU64, Version};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyStoredVec, AnyVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec,
|
||||
PcoVec,
|
||||
@@ -113,6 +114,20 @@ impl AddressCohortVecs {
|
||||
.min(self.metrics.supply.min_len())
|
||||
.min(self.metrics.activity.min_len())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_vecs_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
rayon::iter::once(&mut self.height_to_addr_count as &mut dyn AnyStoredVec)
|
||||
.chain(self.metrics.par_iter_mut())
|
||||
}
|
||||
|
||||
/// Commit state to disk (separate from vec writes for parallelization).
|
||||
pub fn write_state(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
if let Some(state) = self.state.as_mut() {
|
||||
state.inner.write(height, cleanup)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Filtered for AddressCohortVecs {
|
||||
@@ -224,17 +239,6 @@ impl DynCohortVecs for AddressCohortVecs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
|
||||
self.height_to_addr_count.write()?;
|
||||
self.metrics.write()?;
|
||||
|
||||
if let Some(state) = self.state.as_mut() {
|
||||
state.inner.commit(height)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_rest_part1(
|
||||
&mut self,
|
||||
indexes: &indexes::Vecs,
|
||||
|
||||
@@ -11,7 +11,7 @@ use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, Version};
|
||||
use derive_deref::{Deref, DerefMut};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{Database, Exit, IterableVec};
|
||||
use vecdb::{AnyStoredVec, Database, Exit, IterableVec};
|
||||
|
||||
use crate::{Indexes, indexes, price, stateful::DynCohortVecs};
|
||||
|
||||
@@ -222,10 +222,20 @@ impl AddressCohorts {
|
||||
})
|
||||
}
|
||||
|
||||
/// Write stateful vectors for separate cohorts.
|
||||
pub fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_vecs_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
// Collect all vecs from all cohorts
|
||||
self.0
|
||||
.iter_mut()
|
||||
.flat_map(|v| v.par_iter_vecs_mut().collect::<Vec<_>>())
|
||||
.collect::<Vec<_>>()
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
/// Commit all states to disk (separate from vec writes for parallelization).
|
||||
pub fn commit_all_states(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
self.par_iter_separate_mut()
|
||||
.try_for_each(|v| v.write_stateful_vecs(height))
|
||||
.try_for_each(|v| v.write_state(height, cleanup))
|
||||
}
|
||||
|
||||
/// Get minimum height from all separate cohorts' height-indexed vectors.
|
||||
|
||||
@@ -34,9 +34,6 @@ pub trait DynCohortVecs: Send + Sync {
|
||||
date_price: Option<Option<Dollars>>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Write stateful vectors to disk.
|
||||
fn write_stateful_vecs(&mut self, height: Height) -> Result<()>;
|
||||
|
||||
/// First phase of post-processing computations.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn compute_rest_part1(
|
||||
|
||||
@@ -6,7 +6,8 @@ use brk_error::Result;
|
||||
use brk_grouper::{CohortContext, Filter, Filtered, StateLevel};
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
|
||||
use vecdb::{Database, Exit, IterableVec};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyStoredVec, Database, Exit, IterableVec};
|
||||
|
||||
use crate::{
|
||||
Indexes, indexes, price,
|
||||
@@ -87,6 +88,19 @@ impl UTXOCohortVecs {
|
||||
state.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_vecs_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
self.metrics.par_iter_mut()
|
||||
}
|
||||
|
||||
/// Commit state to disk (separate from vec writes for parallelization).
|
||||
pub fn write_state(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
if let Some(state) = self.state.as_mut() {
|
||||
state.write(height, cleanup)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Filtered for UTXOCohortVecs {
|
||||
@@ -189,16 +203,6 @@ impl DynCohortVecs for UTXOCohortVecs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
|
||||
self.metrics.write()?;
|
||||
|
||||
if let Some(state) = self.state.as_mut() {
|
||||
state.commit(height)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_rest_part1(
|
||||
&mut self,
|
||||
indexes: &indexes::Vecs,
|
||||
|
||||
@@ -9,16 +9,18 @@ use std::path::Path;
|
||||
use brk_error::Result;
|
||||
use brk_grouper::{
|
||||
AmountFilter, ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount,
|
||||
ByMaxAge, ByMinAge, BySpendableType, ByTerm, ByYear, Filter, Filtered, StateLevel, Term,
|
||||
TimeFilter, UTXOGroups, DAYS_10Y, DAYS_12Y, DAYS_15Y, DAYS_1D, DAYS_1M, DAYS_1W, DAYS_1Y,
|
||||
ByMaxAge, ByMinAge, BySpendableType, ByTerm, ByYear, DAYS_1D, DAYS_1M, DAYS_1W, DAYS_1Y,
|
||||
DAYS_2M, DAYS_2Y, DAYS_3M, DAYS_3Y, DAYS_4M, DAYS_4Y, DAYS_5M, DAYS_5Y, DAYS_6M, DAYS_6Y,
|
||||
DAYS_7Y, DAYS_8Y,
|
||||
DAYS_7Y, DAYS_8Y, DAYS_10Y, DAYS_12Y, DAYS_15Y, Filter, Filtered, StateLevel, Term, TimeFilter,
|
||||
UTXOGroups,
|
||||
};
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Version, Year};
|
||||
use brk_types::{
|
||||
Bitcoin, DateIndex, Dollars, HalvingEpoch, Height, OutputType, Sats, Version, Year,
|
||||
};
|
||||
use derive_deref::{Deref, DerefMut};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{Database, Exit, IterableVec};
|
||||
use vecdb::{AnyStoredVec, Database, Exit, IterableVec};
|
||||
|
||||
use crate::{
|
||||
Indexes,
|
||||
@@ -372,18 +374,20 @@ impl UTXOCohorts {
|
||||
})
|
||||
}
|
||||
|
||||
/// Write stateful vectors for separate and aggregate cohorts.
|
||||
pub fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
|
||||
// Flush separate cohorts (includes metrics + state)
|
||||
self.par_iter_separate_mut()
|
||||
.try_for_each(|v| v.write_stateful_vecs(height))?;
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_vecs_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
// Collect all vecs from all cohorts (separate + aggregate)
|
||||
self.0
|
||||
.iter_mut()
|
||||
.flat_map(|v| v.par_iter_vecs_mut().collect::<Vec<_>>())
|
||||
.collect::<Vec<_>>()
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
// Write aggregate cohorts' metrics (including price_percentiles)
|
||||
// Note: aggregate cohorts no longer maintain price_to_amount state
|
||||
for v in self.0.iter_aggregate_mut() {
|
||||
v.metrics.write()?;
|
||||
}
|
||||
Ok(())
|
||||
/// Commit all states to disk (separate from vec writes for parallelization).
|
||||
pub fn commit_all_states(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
self.par_iter_separate_mut()
|
||||
.try_for_each(|v| v.write_state(height, cleanup))
|
||||
}
|
||||
|
||||
/// Get minimum height from all separate cohorts' height-indexed vectors.
|
||||
|
||||
@@ -53,11 +53,11 @@ pub fn process_address_updates(
|
||||
|
||||
/// Flush checkpoint to disk (pure I/O, no processing).
|
||||
///
|
||||
/// Writes all accumulated data:
|
||||
/// - Cohort stateful vectors
|
||||
/// Writes all accumulated data in parallel:
|
||||
/// - Cohort stateful vectors (parallel internally)
|
||||
/// - Height-indexed vectors
|
||||
/// - Address indexes and data (parallel)
|
||||
/// - Transaction output index mappings (parallel)
|
||||
/// - Address indexes and data
|
||||
/// - Transaction output index mappings
|
||||
/// - Chain state
|
||||
///
|
||||
/// Set `with_changes=true` near chain tip to enable rollback support.
|
||||
@@ -67,43 +67,48 @@ pub fn write(
|
||||
chain_state: &[BlockState],
|
||||
with_changes: bool,
|
||||
) -> Result<()> {
|
||||
use rayon::prelude::*;
|
||||
|
||||
info!("Writing to disk...");
|
||||
let i = Instant::now();
|
||||
|
||||
// Flush cohort states (separate + aggregate)
|
||||
vecs.utxo_cohorts.write_stateful_vecs(height)?;
|
||||
vecs.address_cohorts.write_stateful_vecs(height)?;
|
||||
|
||||
// Flush height-indexed vectors
|
||||
vecs.height_to_unspendable_supply.write()?;
|
||||
vecs.height_to_opreturn_supply.write()?;
|
||||
vecs.addresstype_to_height_to_addr_count.write()?;
|
||||
vecs.addresstype_to_height_to_empty_addr_count.write()?;
|
||||
|
||||
// Flush large vecs in parallel
|
||||
let stamp = Stamp::from(height);
|
||||
let any_address_indexes = &mut vecs.any_address_indexes;
|
||||
let addresses_data = &mut vecs.addresses_data;
|
||||
let txoutindex_to_txinindex = &mut vecs.txoutindex_to_txinindex;
|
||||
|
||||
let (addr_result, txout_result) = rayon::join(
|
||||
|| {
|
||||
any_address_indexes
|
||||
.write(stamp, with_changes)
|
||||
.and(addresses_data.write(stamp, with_changes))
|
||||
},
|
||||
|| txoutindex_to_txinindex.stamped_write_maybe_with_changes(stamp, with_changes),
|
||||
);
|
||||
addr_result?;
|
||||
txout_result?;
|
||||
|
||||
// Sync in-memory chain_state to persisted and flush
|
||||
// Prepare chain_state before parallel write
|
||||
vecs.chain_state.truncate_if_needed(Height::ZERO)?;
|
||||
for block_state in chain_state {
|
||||
vecs.chain_state.push(block_state.supply.clone());
|
||||
}
|
||||
vecs.chain_state
|
||||
.stamped_write_maybe_with_changes(stamp, with_changes)?;
|
||||
|
||||
// Write all vecs in parallel using chained iterators
|
||||
vecs.any_address_indexes
|
||||
.par_iter_mut()
|
||||
.chain(vecs.addresses_data.par_iter_mut())
|
||||
.chain(vecs.addresstype_to_height_to_addr_count.par_iter_mut())
|
||||
.chain(
|
||||
vecs.addresstype_to_height_to_empty_addr_count
|
||||
.par_iter_mut(),
|
||||
)
|
||||
.chain(rayon::iter::once(
|
||||
&mut vecs.txoutindex_to_txinindex as &mut dyn AnyStoredVec,
|
||||
))
|
||||
.chain(rayon::iter::once(
|
||||
&mut vecs.chain_state as &mut dyn AnyStoredVec,
|
||||
))
|
||||
.chain(rayon::iter::once(
|
||||
&mut vecs.height_to_unspendable_supply as &mut dyn AnyStoredVec,
|
||||
))
|
||||
.chain(rayon::iter::once(
|
||||
&mut vecs.height_to_opreturn_supply as &mut dyn AnyStoredVec,
|
||||
))
|
||||
.chain(vecs.utxo_cohorts.par_iter_vecs_mut())
|
||||
.chain(vecs.address_cohorts.par_iter_vecs_mut())
|
||||
.try_for_each(|v| v.any_stamped_write_maybe_with_changes(stamp, with_changes))?;
|
||||
|
||||
// Commit states after vec writes
|
||||
let cleanup = with_changes;
|
||||
vecs.utxo_cohorts.commit_all_states(height, cleanup)?;
|
||||
vecs.address_cohorts.commit_all_states(height, cleanup)?;
|
||||
|
||||
info!("Wrote in {:?}", i.elapsed());
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, Height, Sats, StoredF64, Version};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyStoredVec, AnyVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec};
|
||||
|
||||
use crate::{
|
||||
@@ -121,6 +122,16 @@ impl ActivityMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
vec![
|
||||
&mut self.height_to_sent as &mut dyn AnyStoredVec,
|
||||
&mut self.height_to_satblocks_destroyed as &mut dyn AnyStoredVec,
|
||||
&mut self.height_to_satdays_destroyed as &mut dyn AnyStoredVec,
|
||||
]
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
/// Validate computed versions against base version.
|
||||
pub fn validate_computed_versions(&mut self, _base_version: Version) -> Result<()> {
|
||||
// Validation logic for computed vecs
|
||||
|
||||
@@ -28,7 +28,8 @@ use brk_error::Result;
|
||||
use brk_grouper::Filter;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Version};
|
||||
use vecdb::{Exit, IterableVec};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyStoredVec, Exit, IterableVec};
|
||||
|
||||
use crate::{Indexes, indexes, price, stateful::states::CohortState};
|
||||
|
||||
@@ -125,6 +126,28 @@ impl CohortMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
let mut vecs: Vec<&mut dyn AnyStoredVec> = Vec::new();
|
||||
|
||||
vecs.extend(self.supply.par_iter_mut().collect::<Vec<_>>());
|
||||
vecs.extend(self.activity.par_iter_mut().collect::<Vec<_>>());
|
||||
|
||||
if let Some(realized) = self.realized.as_mut() {
|
||||
vecs.extend(realized.par_iter_mut().collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
if let Some(unrealized) = self.unrealized.as_mut() {
|
||||
vecs.extend(unrealized.par_iter_mut().collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
if let Some(price_paid) = self.price_paid.as_mut() {
|
||||
vecs.extend(price_paid.par_iter_mut().collect::<Vec<_>>());
|
||||
}
|
||||
|
||||
vecs.into_par_iter()
|
||||
}
|
||||
|
||||
/// Validate computed versions against base version.
|
||||
pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
|
||||
self.supply.validate_computed_versions(base_version)?;
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{DateIndex, Dollars, Height, Version};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec};
|
||||
|
||||
use crate::{
|
||||
@@ -121,6 +122,24 @@ impl PricePaidMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
let mut vecs: Vec<&mut dyn AnyStoredVec> = vec![
|
||||
&mut self.height_to_min_price_paid,
|
||||
&mut self.height_to_max_price_paid,
|
||||
];
|
||||
if let Some(pp) = self.price_percentiles.as_mut() {
|
||||
vecs.extend(
|
||||
pp.vecs
|
||||
.iter_mut()
|
||||
.flatten()
|
||||
.filter_map(|v| v.dateindex.as_mut())
|
||||
.map(|v| v as &mut dyn AnyStoredVec),
|
||||
);
|
||||
}
|
||||
vecs.into_par_iter()
|
||||
}
|
||||
|
||||
/// Validate computed versions or reset if mismatched.
|
||||
pub fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
|
||||
if let Some(price_percentiles) = self.price_percentiles.as_mut() {
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, StoredF32, StoredF64, Version};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec, PcoVec};
|
||||
|
||||
use crate::{
|
||||
@@ -448,6 +449,24 @@ impl RealizedMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
let mut vecs: Vec<&mut dyn AnyStoredVec> = vec![
|
||||
&mut self.height_to_realized_cap,
|
||||
&mut self.height_to_realized_profit,
|
||||
&mut self.height_to_realized_loss,
|
||||
&mut self.height_to_value_created,
|
||||
&mut self.height_to_value_destroyed,
|
||||
];
|
||||
if let Some(v) = self.height_to_adjusted_value_created.as_mut() {
|
||||
vecs.push(v);
|
||||
}
|
||||
if let Some(v) = self.height_to_adjusted_value_destroyed.as_mut() {
|
||||
vecs.push(v);
|
||||
}
|
||||
vecs.into_par_iter()
|
||||
}
|
||||
|
||||
/// Validate computed versions against base version.
|
||||
pub fn validate_computed_versions(&mut self, _base_version: Version) -> Result<()> {
|
||||
// Validation logic for computed vecs
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{Bitcoin, DateIndex, Dollars, Height, Sats, StoredU64, Version};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyStoredVec, AnyVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableVec, PcoVec,
|
||||
TypedVecIterator,
|
||||
@@ -137,6 +138,15 @@ impl SupplyMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
vec![
|
||||
&mut self.height_to_supply as &mut dyn AnyStoredVec,
|
||||
&mut self.height_to_utxo_count as &mut dyn AnyStoredVec,
|
||||
]
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
/// Validate computed versions against base version.
|
||||
pub fn validate_computed_versions(&mut self, _base_version: Version) -> Result<()> {
|
||||
// Validation logic for computed vecs
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{DateIndex, Dollars, Height, Sats, Version};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{
|
||||
AnyStoredVec, EagerVec, Exit, GenericStoredVec, ImportableVec, IterableCloneableVec, PcoVec,
|
||||
};
|
||||
@@ -231,6 +232,21 @@ impl UnrealizedMetrics {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a parallel iterator over all vecs for parallel writing.
|
||||
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
|
||||
vec![
|
||||
&mut self.height_to_supply_in_profit as &mut dyn AnyStoredVec,
|
||||
&mut self.height_to_supply_in_loss as &mut dyn AnyStoredVec,
|
||||
&mut self.height_to_unrealized_profit as &mut dyn AnyStoredVec,
|
||||
&mut self.height_to_unrealized_loss as &mut dyn AnyStoredVec,
|
||||
&mut self.dateindex_to_supply_in_profit as &mut dyn AnyStoredVec,
|
||||
&mut self.dateindex_to_supply_in_loss as &mut dyn AnyStoredVec,
|
||||
&mut self.dateindex_to_unrealized_profit as &mut dyn AnyStoredVec,
|
||||
&mut self.dateindex_to_unrealized_loss as &mut dyn AnyStoredVec,
|
||||
]
|
||||
.into_par_iter()
|
||||
}
|
||||
|
||||
/// Compute aggregate values from separate cohorts.
|
||||
pub fn compute_from_stateful(
|
||||
&mut self,
|
||||
|
||||
@@ -173,14 +173,11 @@ impl AddressCohortState {
|
||||
)
|
||||
});
|
||||
|
||||
self.inner.decrement_(
|
||||
&addr_supply,
|
||||
addressdata.realized_cap,
|
||||
realized_price,
|
||||
);
|
||||
self.inner
|
||||
.decrement_(&addr_supply, addressdata.realized_cap, realized_price);
|
||||
}
|
||||
|
||||
pub fn commit(&mut self, height: Height) -> Result<()> {
|
||||
self.inner.commit(height)
|
||||
pub fn write(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
self.inner.write(height, cleanup)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,9 +369,9 @@ impl CohortState {
|
||||
}
|
||||
|
||||
/// Flush state to disk at checkpoint.
|
||||
pub fn commit(&mut self, height: Height) -> Result<()> {
|
||||
pub fn write(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
if let Some(p) = self.price_to_amount.as_mut() {
|
||||
p.flush(height)?;
|
||||
p.write(height, cleanup)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -144,9 +144,7 @@ impl PriceToAmount {
|
||||
|
||||
for (&price, &amount) in state.iter() {
|
||||
cumsum += u64::from(amount);
|
||||
while idx < PERCENTILES_LEN
|
||||
&& cumsum >= total * u64::from(PERCENTILES[idx]) / 100
|
||||
{
|
||||
while idx < PERCENTILES_LEN && cumsum >= total * u64::from(PERCENTILES[idx]) / 100 {
|
||||
result[idx] = price;
|
||||
idx += 1;
|
||||
}
|
||||
@@ -181,16 +179,19 @@ impl PriceToAmount {
|
||||
.collect::<BTreeMap<Height, PathBuf>>())
|
||||
}
|
||||
|
||||
pub fn flush(&mut self, height: Height) -> Result<()> {
|
||||
/// Flush state to disk, optionally cleaning up old state files.
|
||||
pub fn write(&mut self, height: Height, cleanup: bool) -> Result<()> {
|
||||
self.apply_pending();
|
||||
|
||||
let files = self.read_dir(Some(height))?;
|
||||
if cleanup {
|
||||
let files = self.read_dir(Some(height))?;
|
||||
|
||||
for (_, path) in files
|
||||
.iter()
|
||||
.take(files.len().saturating_sub(STATE_TO_KEEP - 1))
|
||||
{
|
||||
fs::remove_file(path)?;
|
||||
for (_, path) in files
|
||||
.iter()
|
||||
.take(files.len().saturating_sub(STATE_TO_KEEP - 1))
|
||||
{
|
||||
fs::remove_file(path)?;
|
||||
}
|
||||
}
|
||||
|
||||
fs::write(self.path_state(height), self.state.u().serialize()?)?;
|
||||
|
||||
Reference in New Issue
Block a user