mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: MASSIVE snapshot
This commit is contained in:
@@ -3,11 +3,11 @@ use std::thread;
|
||||
use brk_cohort::ByAddressType;
|
||||
use brk_error::Result;
|
||||
use brk_indexer::Indexer;
|
||||
use brk_types::{CentsUnsigned, DateIndex, Dollars, Height, OutputType, Sats, TxIndex, TypeIndex};
|
||||
use brk_types::{Cents, Date, Day1, Height, OutputType, Sats, StoredU64, TxIndex, TypeIndex};
|
||||
use rayon::prelude::*;
|
||||
use rustc_hash::FxHashSet;
|
||||
use tracing::{debug, info};
|
||||
use vecdb::{Exit, IterableVec, TypedVecIterator, VecIndex};
|
||||
use vecdb::{AnyVec, Exit, ReadableVec, VecIndex};
|
||||
|
||||
use crate::{
|
||||
blocks,
|
||||
@@ -20,7 +20,7 @@ use crate::{
|
||||
compute::write::{process_address_updates, write},
|
||||
state::{BlockState, Transacted},
|
||||
},
|
||||
indexes, inputs, outputs, price, transactions,
|
||||
indexes, inputs, outputs, prices, transactions,
|
||||
};
|
||||
|
||||
use super::{
|
||||
@@ -30,13 +30,13 @@ use super::{
|
||||
vecs::Vecs,
|
||||
},
|
||||
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
|
||||
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, TxInIterators, TxOutIterators,
|
||||
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, TxInReaders, TxOutReaders,
|
||||
VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
|
||||
};
|
||||
|
||||
/// Process all blocks from starting_height to last_height.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn process_blocks(
|
||||
pub(crate) fn process_blocks(
|
||||
vecs: &mut Vecs,
|
||||
indexer: &Indexer,
|
||||
indexes: &indexes::Vecs,
|
||||
@@ -44,7 +44,7 @@ pub fn process_blocks(
|
||||
outputs: &outputs::Vecs,
|
||||
transactions: &transactions::Vecs,
|
||||
blocks: &blocks::Vecs,
|
||||
price: Option<&price::Vecs>,
|
||||
prices: &prices::Vecs,
|
||||
starting_height: Height,
|
||||
last_height: Height,
|
||||
chain_state: &mut Vec<BlockState>,
|
||||
@@ -52,7 +52,7 @@ pub fn process_blocks(
|
||||
) -> Result<()> {
|
||||
// Create computation context with pre-computed vectors for thread-safe access
|
||||
debug!("creating ComputeContext");
|
||||
let ctx = ComputeContext::new(starting_height, last_height, blocks, price);
|
||||
let ctx = ComputeContext::new(starting_height, last_height, blocks, prices);
|
||||
debug!("ComputeContext created");
|
||||
|
||||
if ctx.starting_height > ctx.last_height {
|
||||
@@ -72,34 +72,39 @@ pub fn process_blocks(
|
||||
// From blocks:
|
||||
let height_to_timestamp = &blocks.time.timestamp_monotonic;
|
||||
let height_to_date = &blocks.time.date;
|
||||
let dateindex_to_first_height = &indexes.dateindex.first_height;
|
||||
let dateindex_to_height_count = &indexes.dateindex.height_count;
|
||||
let day1_to_first_height = &indexes.day1.first_height;
|
||||
let day1_to_height_count = &indexes.day1.height_count;
|
||||
let txindex_to_output_count = &indexes.txindex.output_count;
|
||||
let txindex_to_input_count = &indexes.txindex.input_count;
|
||||
|
||||
// From price (optional) - use cents for computation:
|
||||
let height_to_price = price.map(|p| &p.cents.split.height.close);
|
||||
let dateindex_to_price = price.map(|p| &p.cents.split.dateindex.close);
|
||||
// From price - use cents for computation:
|
||||
let height_to_price = &prices.cents.price;
|
||||
|
||||
// Access pre-computed vectors from context for thread-safe access
|
||||
let height_to_price_vec = &ctx.height_to_price;
|
||||
let height_to_timestamp_vec = &ctx.height_to_timestamp;
|
||||
|
||||
// Create iterators for sequential access
|
||||
let mut height_to_first_txindex_iter = height_to_first_txindex.into_iter();
|
||||
let mut height_to_first_txoutindex_iter = height_to_first_txoutindex.into_iter();
|
||||
let mut height_to_first_txinindex_iter = height_to_first_txinindex.into_iter();
|
||||
let mut height_to_tx_count_iter = height_to_tx_count.into_iter();
|
||||
let mut height_to_output_count_iter = height_to_output_count.into_iter();
|
||||
let mut height_to_input_count_iter = height_to_input_count.into_iter();
|
||||
let mut height_to_timestamp_iter = height_to_timestamp.into_iter();
|
||||
let mut height_to_date_iter = height_to_date.into_iter();
|
||||
let mut dateindex_to_first_height_iter = dateindex_to_first_height.into_iter();
|
||||
let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter();
|
||||
let mut txindex_to_output_count_iter = txindex_to_output_count.iter();
|
||||
let mut txindex_to_input_count_iter = txindex_to_input_count.iter();
|
||||
let mut height_to_price_iter = height_to_price.map(|v| v.into_iter());
|
||||
let mut dateindex_to_price_iter = dateindex_to_price.map(|v| v.into_iter());
|
||||
// Range for pre-collecting height-indexed vecs
|
||||
let start_usize = starting_height.to_usize();
|
||||
let end_usize = last_height.to_usize() + 1;
|
||||
|
||||
// Pre-collect height-indexed vecs for the block range (bulk read before hot loop)
|
||||
let height_to_first_txindex_vec: Vec<TxIndex> =
|
||||
height_to_first_txindex.collect_range_at(start_usize, end_usize);
|
||||
let height_to_first_txoutindex_vec: Vec<_> =
|
||||
height_to_first_txoutindex.collect_range_at(start_usize, end_usize);
|
||||
let height_to_first_txinindex_vec: Vec<_> =
|
||||
height_to_first_txinindex.collect_range_at(start_usize, end_usize);
|
||||
let height_to_tx_count_vec: Vec<_> =
|
||||
height_to_tx_count.collect_range_at(start_usize, end_usize);
|
||||
let height_to_output_count_vec: Vec<_> =
|
||||
height_to_output_count.collect_range_at(start_usize, end_usize);
|
||||
let height_to_input_count_vec: Vec<_> =
|
||||
height_to_input_count.collect_range_at(start_usize, end_usize);
|
||||
let height_to_timestamp_collected: Vec<_> =
|
||||
height_to_timestamp.collect_range_at(start_usize, end_usize);
|
||||
let height_to_price_collected: Vec<_> =
|
||||
height_to_price.collect_range_at(start_usize, end_usize);
|
||||
|
||||
debug!("creating VecsReaders");
|
||||
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
|
||||
@@ -108,8 +113,11 @@ pub fn process_blocks(
|
||||
// Build txindex -> height lookup map for efficient prev_height computation
|
||||
debug!("building txindex_to_height RangeMap");
|
||||
let mut txindex_to_height: RangeMap<TxIndex, Height> = {
|
||||
let mut map = RangeMap::with_capacity(last_height.to_usize() + 1);
|
||||
for first_txindex in indexer.vecs.transactions.first_txindex.into_iter() {
|
||||
let first_txindex_len = indexer.vecs.transactions.first_txindex.len();
|
||||
let all_first_txindexes: Vec<TxIndex> =
|
||||
indexer.vecs.transactions.first_txindex.collect_range_at(0, first_txindex_len);
|
||||
let mut map = RangeMap::with_capacity(first_txindex_len);
|
||||
for first_txindex in all_first_txindexes {
|
||||
map.push(first_txindex);
|
||||
}
|
||||
map
|
||||
@@ -117,18 +125,18 @@ pub fn process_blocks(
|
||||
debug!("txindex_to_height RangeMap built");
|
||||
|
||||
// Create reusable iterators for sequential txout/txin reads (16KB buffered)
|
||||
let mut txout_iters = TxOutIterators::new(indexer);
|
||||
let mut txin_iters = TxInIterators::new(indexer, inputs, &mut txindex_to_height);
|
||||
let txout_iters = TxOutReaders::new(indexer);
|
||||
let mut txin_iters = TxInReaders::new(indexer, inputs, &mut txindex_to_height);
|
||||
|
||||
// Create iterators for first address indexes per type
|
||||
let mut first_p2a_iter = indexer.vecs.addresses.first_p2aaddressindex.into_iter();
|
||||
let mut first_p2pk33_iter = indexer.vecs.addresses.first_p2pk33addressindex.into_iter();
|
||||
let mut first_p2pk65_iter = indexer.vecs.addresses.first_p2pk65addressindex.into_iter();
|
||||
let mut first_p2pkh_iter = indexer.vecs.addresses.first_p2pkhaddressindex.into_iter();
|
||||
let mut first_p2sh_iter = indexer.vecs.addresses.first_p2shaddressindex.into_iter();
|
||||
let mut first_p2tr_iter = indexer.vecs.addresses.first_p2traddressindex.into_iter();
|
||||
let mut first_p2wpkh_iter = indexer.vecs.addresses.first_p2wpkhaddressindex.into_iter();
|
||||
let mut first_p2wsh_iter = indexer.vecs.addresses.first_p2wshaddressindex.into_iter();
|
||||
// Pre-collect first address indexes per type for the block range
|
||||
let first_p2a_vec = indexer.vecs.addresses.first_p2aaddressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2pk33_vec = indexer.vecs.addresses.first_p2pk33addressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2pk65_vec = indexer.vecs.addresses.first_p2pk65addressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2pkh_vec = indexer.vecs.addresses.first_p2pkhaddressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2sh_vec = indexer.vecs.addresses.first_p2shaddressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2tr_vec = indexer.vecs.addresses.first_p2traddressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2wpkh_vec = indexer.vecs.addresses.first_p2wpkhaddressindex.collect_range_at(start_usize, end_usize);
|
||||
let first_p2wsh_vec = indexer.vecs.addresses.first_p2wshaddressindex.collect_range_at(start_usize, end_usize);
|
||||
|
||||
// Track running totals - recover from previous height if resuming
|
||||
debug!("recovering addr_counts from height {}", starting_height);
|
||||
@@ -151,48 +159,58 @@ pub fn process_blocks(
|
||||
// Track activity counts - reset each block
|
||||
let mut activity_counts = AddressTypeToActivityCounts::default();
|
||||
|
||||
// Pre-collect lazy vecs that don't support iterators
|
||||
let height_to_date_vec: Vec<Date> = height_to_date.collect_range_at(start_usize, end_usize);
|
||||
|
||||
debug!("creating AddressCache");
|
||||
let mut cache = AddressCache::new();
|
||||
debug!("AddressCache created, entering main loop");
|
||||
|
||||
// Cache for day1 lookups - same day1 repeats ~140 times per day
|
||||
let mut cached_day1 = Day1::default();
|
||||
let mut cached_date_first_height = Height::ZERO;
|
||||
let mut cached_date_height_count = StoredU64::default();
|
||||
|
||||
// Reusable hashsets for received addresses (avoid per-block allocation)
|
||||
let mut received_addresses = ByAddressType::<FxHashSet<TypeIndex>>::default();
|
||||
|
||||
// Main block iteration
|
||||
for height in starting_height.to_usize()..=last_height.to_usize() {
|
||||
let height = Height::from(height);
|
||||
|
||||
info!("Processing chain at {}...", height);
|
||||
|
||||
// Get block metadata
|
||||
let first_txindex = height_to_first_txindex_iter.get_unwrap(height);
|
||||
let tx_count = u64::from(height_to_tx_count_iter.get_unwrap(height));
|
||||
let first_txoutindex = height_to_first_txoutindex_iter
|
||||
.get_unwrap(height)
|
||||
.to_usize();
|
||||
let output_count = u64::from(height_to_output_count_iter.get_unwrap(height)) as usize;
|
||||
let first_txinindex = height_to_first_txinindex_iter.get_unwrap(height).to_usize();
|
||||
let input_count = u64::from(height_to_input_count_iter.get_unwrap(height)) as usize;
|
||||
let timestamp = height_to_timestamp_iter.get_unwrap(height);
|
||||
let block_price = height_to_price_iter.as_mut().map(|v| *v.get_unwrap(height));
|
||||
// Get block metadata from pre-collected vecs
|
||||
let offset = height.to_usize() - start_usize;
|
||||
let first_txindex = height_to_first_txindex_vec[offset];
|
||||
let tx_count = u64::from(height_to_tx_count_vec[offset]);
|
||||
let first_txoutindex = height_to_first_txoutindex_vec[offset].to_usize();
|
||||
let output_count = u64::from(height_to_output_count_vec[offset]) as usize;
|
||||
let first_txinindex = height_to_first_txinindex_vec[offset].to_usize();
|
||||
let input_count = u64::from(height_to_input_count_vec[offset]) as usize;
|
||||
let timestamp = height_to_timestamp_collected[offset];
|
||||
let block_price = height_to_price_collected[offset];
|
||||
|
||||
// Debug validation: verify context methods match iterator values
|
||||
// Debug validation: verify context methods match pre-collected values
|
||||
debug_assert_eq!(ctx.timestamp_at(height), timestamp);
|
||||
debug_assert_eq!(ctx.price_at(height), block_price);
|
||||
|
||||
// Build txindex mappings for this block
|
||||
// Build txindex mappings for this block (pass ReadableVec refs directly)
|
||||
let txoutindex_to_txindex =
|
||||
build_txoutindex_to_txindex(first_txindex, tx_count, &mut txindex_to_output_count_iter);
|
||||
build_txoutindex_to_txindex(first_txindex, tx_count, txindex_to_output_count);
|
||||
let txinindex_to_txindex =
|
||||
build_txinindex_to_txindex(first_txindex, tx_count, &mut txindex_to_input_count_iter);
|
||||
build_txinindex_to_txindex(first_txindex, tx_count, txindex_to_input_count);
|
||||
|
||||
// Get first address indexes for this height
|
||||
// Get first address indexes for this height from pre-collected vecs
|
||||
let first_addressindexes = ByAddressType {
|
||||
p2a: TypeIndex::from(first_p2a_iter.get_unwrap(height).to_usize()),
|
||||
p2pk33: TypeIndex::from(first_p2pk33_iter.get_unwrap(height).to_usize()),
|
||||
p2pk65: TypeIndex::from(first_p2pk65_iter.get_unwrap(height).to_usize()),
|
||||
p2pkh: TypeIndex::from(first_p2pkh_iter.get_unwrap(height).to_usize()),
|
||||
p2sh: TypeIndex::from(first_p2sh_iter.get_unwrap(height).to_usize()),
|
||||
p2tr: TypeIndex::from(first_p2tr_iter.get_unwrap(height).to_usize()),
|
||||
p2wpkh: TypeIndex::from(first_p2wpkh_iter.get_unwrap(height).to_usize()),
|
||||
p2wsh: TypeIndex::from(first_p2wsh_iter.get_unwrap(height).to_usize()),
|
||||
p2a: TypeIndex::from(first_p2a_vec[offset].to_usize()),
|
||||
p2pk33: TypeIndex::from(first_p2pk33_vec[offset].to_usize()),
|
||||
p2pk65: TypeIndex::from(first_p2pk65_vec[offset].to_usize()),
|
||||
p2pkh: TypeIndex::from(first_p2pkh_vec[offset].to_usize()),
|
||||
p2sh: TypeIndex::from(first_p2sh_vec[offset].to_usize()),
|
||||
p2tr: TypeIndex::from(first_p2tr_vec[offset].to_usize()),
|
||||
p2wpkh: TypeIndex::from(first_p2wpkh_vec[offset].to_usize()),
|
||||
p2wsh: TypeIndex::from(first_p2wsh_vec[offset].to_usize()),
|
||||
};
|
||||
|
||||
// Reset per-block values for all separate cohorts
|
||||
@@ -213,7 +231,7 @@ pub fn process_blocks(
|
||||
};
|
||||
|
||||
// Process outputs and inputs in parallel with tick-tock
|
||||
let (outputs_result, inputs_result) = thread::scope(|scope| {
|
||||
let (outputs_result, inputs_result) = thread::scope(|scope| -> Result<_> {
|
||||
// Tick-tock age transitions in background
|
||||
scope.spawn(|| {
|
||||
vecs.utxo_cohorts
|
||||
@@ -247,7 +265,7 @@ pub fn process_blocks(
|
||||
&vr,
|
||||
&vecs.any_address_indexes,
|
||||
&vecs.addresses_data,
|
||||
)
|
||||
)?
|
||||
} else {
|
||||
InputsResult {
|
||||
height_to_sent: Default::default(),
|
||||
@@ -257,10 +275,10 @@ pub fn process_blocks(
|
||||
}
|
||||
};
|
||||
|
||||
let outputs_result = outputs_handle.join().unwrap();
|
||||
let outputs_result = outputs_handle.join().unwrap()?;
|
||||
|
||||
(outputs_result, inputs_result)
|
||||
});
|
||||
Ok((outputs_result, inputs_result))
|
||||
})?;
|
||||
|
||||
// Merge new address data into current cache
|
||||
cache.merge_funded(outputs_result.address_data);
|
||||
@@ -302,16 +320,14 @@ pub fn process_blocks(
|
||||
});
|
||||
|
||||
// Build set of addresses that received this block (for detecting "both" in sent)
|
||||
let received_addresses: ByAddressType<FxHashSet<TypeIndex>> = {
|
||||
let mut sets = ByAddressType::<FxHashSet<TypeIndex>>::default();
|
||||
for (output_type, vec) in outputs_result.received_data.iter() {
|
||||
let set = sets.get_mut_unwrap(output_type);
|
||||
for (type_index, _) in vec {
|
||||
set.insert(*type_index);
|
||||
}
|
||||
// Reuse pre-allocated hashsets: clear preserves capacity, avoiding reallocation
|
||||
received_addresses.values_mut().for_each(|set| set.clear());
|
||||
for (output_type, vec) in outputs_result.received_data.iter() {
|
||||
let set = received_addresses.get_mut_unwrap(output_type);
|
||||
for (type_index, _) in vec {
|
||||
set.insert(*type_index);
|
||||
}
|
||||
sets
|
||||
};
|
||||
}
|
||||
|
||||
// Process UTXO cohorts and Address cohorts in parallel
|
||||
// - Main thread: UTXO cohorts receive/send
|
||||
@@ -339,12 +355,12 @@ pub fn process_blocks(
|
||||
&mut vecs.address_cohorts,
|
||||
&mut lookup,
|
||||
block_price,
|
||||
ctx.price_range_max.as_ref(),
|
||||
&ctx.price_range_max,
|
||||
&mut addr_counts,
|
||||
&mut empty_addr_counts,
|
||||
&mut activity_counts,
|
||||
&received_addresses,
|
||||
height_to_price_vec.as_deref(),
|
||||
height_to_price_vec,
|
||||
height_to_timestamp_vec,
|
||||
height,
|
||||
timestamp,
|
||||
@@ -356,7 +372,7 @@ pub fn process_blocks(
|
||||
vecs.utxo_cohorts
|
||||
.receive(transacted, height, timestamp, block_price);
|
||||
vecs.utxo_cohorts
|
||||
.send(height_to_sent, chain_state, ctx.price_range_max.as_ref());
|
||||
.send(height_to_sent, chain_state, &ctx.price_range_max);
|
||||
});
|
||||
|
||||
// Push to height-indexed vectors
|
||||
@@ -370,17 +386,24 @@ pub fn process_blocks(
|
||||
vecs.address_activity
|
||||
.truncate_push_height(height, &activity_counts)?;
|
||||
|
||||
// Get date info for unrealized state computation
|
||||
let date = height_to_date_iter.get_unwrap(height);
|
||||
let dateindex = DateIndex::try_from(date).unwrap();
|
||||
let date_first_height = dateindex_to_first_height_iter.get_unwrap(dateindex);
|
||||
let date_height_count = dateindex_to_height_count_iter.get_unwrap(dateindex);
|
||||
// Get date info for unrealized state computation (cold path - once per day)
|
||||
// Cache day1 lookups: same day1 repeats ~140 times per day,
|
||||
// avoiding redundant PcoVec page decompressions.
|
||||
let date = height_to_date_vec[offset];
|
||||
let day1 = Day1::try_from(date).unwrap();
|
||||
let (date_first_height, date_height_count) = if day1 == cached_day1 {
|
||||
(cached_date_first_height, cached_date_height_count)
|
||||
} else {
|
||||
let fh: Height = day1_to_first_height.collect_one(day1).unwrap();
|
||||
let hc = day1_to_height_count.collect_one(day1).unwrap();
|
||||
cached_day1 = day1;
|
||||
cached_date_first_height = fh;
|
||||
cached_date_height_count = hc;
|
||||
(fh, hc)
|
||||
};
|
||||
let is_date_last_height =
|
||||
date_first_height + Height::from(date_height_count).decremented().unwrap() == height;
|
||||
let date_price = dateindex_to_price_iter
|
||||
.as_mut()
|
||||
.map(|v| is_date_last_height.then(|| *v.get_unwrap(dateindex)));
|
||||
let dateindex_opt = is_date_last_height.then_some(dateindex);
|
||||
let day1_opt = is_date_last_height.then_some(day1);
|
||||
|
||||
// Push cohort states and compute unrealized
|
||||
push_cohort_states(
|
||||
@@ -388,33 +411,26 @@ pub fn process_blocks(
|
||||
&mut vecs.address_cohorts,
|
||||
height,
|
||||
block_price,
|
||||
dateindex_opt,
|
||||
date_price,
|
||||
)?;
|
||||
|
||||
// Compute and push percentiles for aggregate cohorts (all, sth, lth)
|
||||
if let Some(dateindex) = dateindex_opt {
|
||||
let spot = date_price
|
||||
.flatten()
|
||||
.map(|c| c.to_dollars())
|
||||
.unwrap_or(Dollars::NAN);
|
||||
vecs.utxo_cohorts
|
||||
.truncate_push_aggregate_percentiles(dateindex, spot, &vecs.states_path)?;
|
||||
let spot = block_price.to_dollars();
|
||||
vecs.utxo_cohorts.truncate_push_aggregate_percentiles(
|
||||
height,
|
||||
spot,
|
||||
day1_opt,
|
||||
&vecs.states_path,
|
||||
)?;
|
||||
|
||||
// Compute unrealized peak regret by age range (once per day)
|
||||
// Aggregate cohorts (all, term, etc.) get values via compute_from_stateful
|
||||
if let Some(spot_cents) = block_price
|
||||
&& let Some(price_range_max) = ctx.price_range_max.as_ref()
|
||||
{
|
||||
vecs.utxo_cohorts.compute_and_push_peak_regret(
|
||||
chain_state,
|
||||
height,
|
||||
timestamp,
|
||||
spot_cents,
|
||||
price_range_max,
|
||||
dateindex,
|
||||
)?;
|
||||
}
|
||||
// Compute unrealized peak regret by age range (once per day)
|
||||
if let Some(day1) = day1_opt {
|
||||
vecs.utxo_cohorts.compute_and_push_peak_regret(
|
||||
chain_state,
|
||||
height,
|
||||
timestamp,
|
||||
block_price,
|
||||
&ctx.price_range_max,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Periodic checkpoint flush
|
||||
@@ -487,20 +503,16 @@ fn push_cohort_states(
|
||||
utxo_cohorts: &mut UTXOCohorts,
|
||||
address_cohorts: &mut AddressCohorts,
|
||||
height: Height,
|
||||
height_price: Option<CentsUnsigned>,
|
||||
dateindex: Option<DateIndex>,
|
||||
date_price: Option<Option<CentsUnsigned>>,
|
||||
height_price: Cents,
|
||||
) -> Result<()> {
|
||||
// utxo_cohorts.iter_separate_mut().try_for_each(|v| {
|
||||
utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
|
||||
v.truncate_push(height)?;
|
||||
v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price)
|
||||
v.compute_then_truncate_push_unrealized_states(height, height_price)
|
||||
})?;
|
||||
|
||||
// address_cohorts.iter_separate_mut().try_for_each(|v| {
|
||||
address_cohorts.par_iter_separate_mut().try_for_each(|v| {
|
||||
v.truncate_push(height)?;
|
||||
v.compute_then_truncate_push_unrealized_states(height, height_price, dateindex, date_price)
|
||||
v.compute_then_truncate_push_unrealized_states(height, height_price)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user