diff --git a/crates/brk_computer/src/blks.rs b/crates/brk_computer/src/blks.rs index 2af253b03..3320ab19c 100644 --- a/crates/brk_computer/src/blks.rs +++ b/crates/brk_computer/src/blks.rs @@ -53,6 +53,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexer, starting_indexes, reader, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/chain/compute.rs b/crates/brk_computer/src/chain/compute.rs index 42082292f..1ac68ad31 100644 --- a/crates/brk_computer/src/chain/compute.rs +++ b/crates/brk_computer/src/chain/compute.rs @@ -21,6 +21,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexer, indexes, txins, starting_indexes, price, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/cointime.rs b/crates/brk_computer/src/cointime.rs index 3abe35db5..b6ffb84ab 100644 --- a/crates/brk_computer/src/cointime.rs +++ b/crates/brk_computer/src/cointime.rs @@ -191,6 +191,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexes, starting_indexes, price, chain, stateful, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/constants.rs b/crates/brk_computer/src/constants.rs index d98525949..ac28518bb 100644 --- a/crates/brk_computer/src/constants.rs +++ b/crates/brk_computer/src/constants.rs @@ -179,6 +179,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexes, starting_indexes, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/fetched.rs b/crates/brk_computer/src/fetched.rs index 3469332eb..a48db36b4 100644 --- a/crates/brk_computer/src/fetched.rs +++ b/crates/brk_computer/src/fetched.rs @@ -62,6 +62,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexer, indexes, starting_indexes, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/indexes.rs b/crates/brk_computer/src/indexes.rs index 74595dce1..84b819ca9 100644 --- a/crates/brk_computer/src/indexes.rs +++ b/crates/brk_computer/src/indexes.rs @@ -235,6 +235,7 @@ impl Vecs { exit: &Exit, ) -> Result { let indexes = self.compute_(indexer, starting_indexes, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(indexes) } diff --git a/crates/brk_computer/src/market/compute.rs b/crates/brk_computer/src/market/compute.rs index 366da226d..4ccfcc94e 100644 --- a/crates/brk_computer/src/market/compute.rs +++ b/crates/brk_computer/src/market/compute.rs @@ -21,6 +21,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(price, starting_indexes, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/pools/mod.rs b/crates/brk_computer/src/pools/mod.rs index 75a8edbcc..980244031 100644 --- a/crates/brk_computer/src/pools/mod.rs +++ b/crates/brk_computer/src/pools/mod.rs @@ -82,6 +82,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexer, indexes, starting_indexes, chain, price, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/price.rs b/crates/brk_computer/src/price.rs index 4cc46ff23..0d50694d4 100644 --- a/crates/brk_computer/src/price.rs +++ b/crates/brk_computer/src/price.rs @@ -176,6 +176,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexes, starting_indexes, fetched, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs index af9647ee9..934301f2a 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs @@ -437,7 +437,7 @@ impl UTXOCohorts { let state = sub.state.as_ref()?; let entries: Vec<(Dollars, Sats)> = state .price_to_amount_iter()? - .map(|(&p, &a)| (p, a)) + .map(|(p, &a)| (p, a)) .collect(); Some((sub.filter().clone(), state.supply.value, entries)) }) diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index b981bc596..162cf243d 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -473,6 +473,7 @@ pub fn process_blocks( // Write to disk (pure I/O) - no changes saved for periodic flushes write(vecs, height, chain_state, false)?; + vecs.flush()?; // Recreate readers vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); diff --git a/crates/brk_computer/src/stateful/compute/write.rs b/crates/brk_computer/src/stateful/compute/write.rs index 7d3225d35..b9911a70f 100644 --- a/crates/brk_computer/src/stateful/compute/write.rs +++ b/crates/brk_computer/src/stateful/compute/write.rs @@ -3,6 +3,7 @@ use std::time::Instant; use brk_error::Result; use brk_types::Height; use log::info; +use rayon::prelude::*; use vecdb::{AnyStoredVec, GenericStoredVec, Stamp}; use crate::stateful::{ @@ -61,9 +62,8 @@ pub fn write( chain_state: &[BlockState], with_changes: bool, ) -> Result<()> { - use rayon::prelude::*; - info!("Writing to disk..."); + let i = Instant::now(); let stamp = Stamp::from(height); @@ -74,7 +74,6 @@ pub fn write( vecs.chain_state.push(block_state.supply.clone()); } - // Write all vecs in parallel using chained iterators vecs.any_address_indexes .par_iter_mut() .chain(vecs.addresses_data.par_iter_mut()) diff --git a/crates/brk_computer/src/stateful/metrics/price_paid.rs b/crates/brk_computer/src/stateful/metrics/price_paid.rs index ad05f6c96..18c1709d4 100644 --- a/crates/brk_computer/src/stateful/metrics/price_paid.rs +++ b/crates/brk_computer/src/stateful/metrics/price_paid.rs @@ -81,14 +81,14 @@ impl PricePaidMetrics { height, state .price_to_amount_first_key_value() - .map(|(&dollars, _)| dollars) + .map(|(dollars, _)| dollars) .unwrap_or(Dollars::NAN), )?; self.height_to_max_price_paid.truncate_push( height, state .price_to_amount_last_key_value() - .map(|(&dollars, _)| dollars) + .map(|(dollars, _)| dollars) .unwrap_or(Dollars::NAN), )?; Ok(()) diff --git a/crates/brk_computer/src/stateful/states/cohort.rs b/crates/brk_computer/src/stateful/states/cohort.rs index 41887c0e9..56f432221 100644 --- a/crates/brk_computer/src/stateful/states/cohort.rs +++ b/crates/brk_computer/src/stateful/states/cohort.rs @@ -76,12 +76,12 @@ impl CohortState { } /// Get first (lowest) price entry in distribution. - pub fn price_to_amount_first_key_value(&self) -> Option<(&Dollars, &Sats)> { + pub fn price_to_amount_first_key_value(&self) -> Option<(Dollars, &Sats)> { self.price_to_amount.as_ref()?.first_key_value() } /// Get last (highest) price entry in distribution. - pub fn price_to_amount_last_key_value(&self) -> Option<(&Dollars, &Sats)> { + pub fn price_to_amount_last_key_value(&self) -> Option<(Dollars, &Sats)> { self.price_to_amount.as_ref()?.last_key_value() } @@ -377,7 +377,7 @@ impl CohortState { self.price_to_amount .as_ref()? .first_key_value() - .map(|(&k, _)| k) + .map(|(k, _)| k) } /// Get last (highest) price in distribution. @@ -385,12 +385,12 @@ impl CohortState { self.price_to_amount .as_ref()? .last_key_value() - .map(|(&k, _)| k) + .map(|(k, _)| k) } /// Get iterator over price_to_amount for merged percentile computation. /// Returns None if price data is not tracked for this cohort. - pub fn price_to_amount_iter(&self) -> Option> { + pub fn price_to_amount_iter(&self) -> Option> { self.price_to_amount.as_ref().map(|p| p.iter()) } } diff --git a/crates/brk_computer/src/stateful/states/price_to_amount.rs b/crates/brk_computer/src/stateful/states/price_to_amount.rs index 3b52d0925..a8e5b8091 100644 --- a/crates/brk_computer/src/stateful/states/price_to_amount.rs +++ b/crates/brk_computer/src/stateful/states/price_to_amount.rs @@ -1,11 +1,12 @@ use std::{ collections::BTreeMap, fs, + ops::Bound, path::{Path, PathBuf}, }; use brk_error::{Error, Result}; -use brk_types::{Dollars, Height, Sats}; +use brk_types::{CentsCompact, Dollars, Height, Sats}; use derive_deref::{Deref, DerefMut}; use pco::standalone::{simple_decompress, simpler_compress}; use rustc_hash::FxHashMap; @@ -25,7 +26,7 @@ pub struct PriceToAmount { state: Option, /// Pending deltas: (total_increment, total_decrement) per price. /// Flushed to BTreeMap before reads and at end of block. - pending: FxHashMap, + pending: FxHashMap, } const STATE_AT_: &str = "state_at_"; @@ -57,49 +58,71 @@ impl PriceToAmount { ); } - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator { self.assert_pending_empty(); - self.state.u().iter() + self.state.u().iter().map(|(k, v)| (k.to_dollars(), v)) } - /// Iterate over entries in a price range with custom bounds. - pub fn range>( + /// Iterate over entries in a price range with explicit bounds. + pub fn range( &self, - range: R, - ) -> impl Iterator { + bounds: (Bound, Bound), + ) -> impl Iterator { self.assert_pending_empty(); - self.state.u().range(range) + + let start = match bounds.0 { + Bound::Included(d) => Bound::Included(CentsCompact::from(d)), + Bound::Excluded(d) => Bound::Excluded(CentsCompact::from(d)), + Bound::Unbounded => Bound::Unbounded, + }; + + let end = match bounds.1 { + Bound::Included(d) => Bound::Included(CentsCompact::from(d)), + Bound::Excluded(d) => Bound::Excluded(CentsCompact::from(d)), + Bound::Unbounded => Bound::Unbounded, + }; + + self.state + .u() + .range((start, end)) + .map(|(k, v)| (k.to_dollars(), v)) } pub fn is_empty(&self) -> bool { self.pending.is_empty() && self.state.u().is_empty() } - pub fn first_key_value(&self) -> Option<(&Dollars, &Sats)> { + pub fn first_key_value(&self) -> Option<(Dollars, &Sats)> { self.assert_pending_empty(); - self.state.u().first_key_value() + self.state + .u() + .first_key_value() + .map(|(k, v)| (k.to_dollars(), v)) } - pub fn last_key_value(&self) -> Option<(&Dollars, &Sats)> { + pub fn last_key_value(&self) -> Option<(Dollars, &Sats)> { self.assert_pending_empty(); - self.state.u().last_key_value() + self.state + .u() + .last_key_value() + .map(|(k, v)| (k.to_dollars(), v)) } /// Accumulate increment in pending batch. O(1). pub fn increment(&mut self, price: Dollars, supply_state: &SupplyState) { - self.pending.entry(price).or_default().0 += supply_state.value; + self.pending.entry(CentsCompact::from(price)).or_default().0 += supply_state.value; } /// Accumulate decrement in pending batch. O(1). pub fn decrement(&mut self, price: Dollars, supply_state: &SupplyState) { - self.pending.entry(price).or_default().1 += supply_state.value; + self.pending.entry(CentsCompact::from(price)).or_default().1 += supply_state.value; } /// Apply pending deltas to BTreeMap. O(k log n) where k = unique prices in pending. /// Must be called before any read operations. pub fn apply_pending(&mut self) { - for (price, (inc, dec)) in self.pending.drain() { - let entry = self.state.um().entry(price).or_default(); + for (cents, (inc, dec)) in self.pending.drain() { + let entry = self.state.um().entry(cents).or_default(); *entry += inc; if *entry < dec { panic!( @@ -108,12 +131,15 @@ impl PriceToAmount { Price: {}\n\ Current + increments: {}\n\ Trying to decrement by: {}", - self.pathbuf, price, entry, dec + self.pathbuf, + cents.to_dollars(), + entry, + dec ); } *entry -= dec; if *entry == Sats::ZERO { - self.state.um().remove(&price); + self.state.um().remove(¢s); } } } @@ -142,10 +168,10 @@ impl PriceToAmount { let mut cumsum = 0u64; let mut idx = 0; - for (&price, &amount) in state.iter() { + for (¢s, &amount) in state.iter() { cumsum += u64::from(amount); while idx < PERCENTILES_LEN && cumsum >= total * u64::from(PERCENTILES[idx]) / 100 { - result[idx] = price; + result[idx] = cents.to_dollars(); idx += 1; } } @@ -208,15 +234,14 @@ impl PriceToAmount { } #[derive(Clone, Default, Debug, Deref, DerefMut, Serialize, Deserialize)] -struct State(BTreeMap); +struct State(BTreeMap); const COMPRESSION_LEVEL: usize = 4; impl State { fn serialize(&self) -> vecdb::Result> { - let keys: Vec = self.keys().cloned().map(f64::from).collect(); - - let values: Vec = self.values().cloned().map(u64::from).collect(); + let keys: Vec = self.keys().map(|k| i32::from(*k)).collect(); + let values: Vec = self.values().map(|v| u64::from(*v)).collect(); let compressed_keys = simpler_compress(&keys, COMPRESSION_LEVEL)?; let compressed_values = simpler_compress(&values, COMPRESSION_LEVEL)?; @@ -234,13 +259,13 @@ impl State { let entry_count = usize::from_bytes(&data[0..8])?; let keys_len = usize::from_bytes(&data[8..16])?; - let keys: Vec = simple_decompress(&data[16..16 + keys_len])?; + let keys: Vec = simple_decompress(&data[16..16 + keys_len])?; let values: Vec = simple_decompress(&data[16 + keys_len..])?; - let map: BTreeMap = keys + let map: BTreeMap = keys .into_iter() .zip(values) - .map(|(k, v)| (Dollars::from(k), Sats::from(v))) + .map(|(k, v)| (CentsCompact::from(k), Sats::from(v))) .collect(); assert_eq!(map.len(), entry_count); diff --git a/crates/brk_computer/src/stateful/states/unrealized.rs b/crates/brk_computer/src/stateful/states/unrealized.rs index 40daa8fba..e60a033dd 100644 --- a/crates/brk_computer/src/stateful/states/unrealized.rs +++ b/crates/brk_computer/src/stateful/states/unrealized.rs @@ -133,7 +133,7 @@ impl CachedUnrealizedState { // Handle flipped entries (only iterate the small range between prices) if new_price > old_price { // Price went up: entries where old < price <= new flip from loss to profit - for (&price, &sats) in + for (price, &sats) in price_to_amount.range((Bound::Excluded(old_price), Bound::Included(new_price))) { // Move from loss to profit @@ -170,7 +170,7 @@ impl CachedUnrealizedState { } } else if new_price < old_price { // Price went down: entries where new < price <= old flip from profit to loss - for (&price, &sats) in + for (price, &sats) in price_to_amount.range((Bound::Excluded(new_price), Bound::Included(old_price))) { // Move from profit to loss @@ -209,7 +209,7 @@ impl CachedUnrealizedState { ) -> UnrealizedState { let mut state = UnrealizedState::ZERO; - for (&price, &sats) in price_to_amount.iter() { + for (price, &sats) in price_to_amount.iter() { if price <= current_price { state.supply_in_profit += sats; if price < current_price { diff --git a/crates/brk_computer/src/stateful/vecs.rs b/crates/brk_computer/src/stateful/vecs.rs index f68327c28..db70ebf29 100644 --- a/crates/brk_computer/src/stateful/vecs.rs +++ b/crates/brk_computer/src/stateful/vecs.rs @@ -489,7 +489,13 @@ impl Vecs { exit, )?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } + + pub fn flush(&self) -> Result<()> { + self.db.flush()?; + Ok(()) + } } diff --git a/crates/brk_computer/src/txins.rs b/crates/brk_computer/src/txins.rs index 2aa1600e1..4e2e2c5d3 100644 --- a/crates/brk_computer/src/txins.rs +++ b/crates/brk_computer/src/txins.rs @@ -12,7 +12,7 @@ use vecdb::{ use super::Indexes; -const ONE_GB: usize = 1024 * 1024 * 1024; +const BATCH_SIZE: usize = 2 * 1024 * 1024 * 1024 / size_of::(); #[derive(Clone, Traversable)] pub struct Vecs { @@ -49,26 +49,33 @@ impl Vecs { indexer: &Indexer, starting_indexes: &Indexes, exit: &Exit, + ) -> Result<()> { + self.compute_(indexer, starting_indexes, exit)?; + let _lock = exit.lock(); + self.db.compact()?; + Ok(()) + } + + fn compute_( + &mut self, + indexer: &Indexer, + starting_indexes: &Indexes, + exit: &Exit, ) -> Result<()> { let target = indexer.vecs.txin.txinindex_to_outpoint.len(); if target == 0 { return Ok(()); } - let min = self - .txinindex_to_txoutindex - .len() - .min(self.txinindex_to_value.len()) - .min(starting_indexes.txinindex.to_usize()); + let len1 = self.txinindex_to_txoutindex.len(); + let len2 = self.txinindex_to_value.len(); + let starting = starting_indexes.txinindex.to_usize(); + let min = len1.min(len2).min(starting); if min >= target { return Ok(()); } - info!("TxIns: computing {} entries ({} to {})", target - min, min, target); - - const BATCH_SIZE: usize = ONE_GB / size_of::(); - let mut outpoint_iter = indexer.vecs.txin.txinindex_to_outpoint.iter()?; let mut first_txoutindex_iter = indexer.vecs.tx.txindex_to_first_txoutindex.iter()?; let mut value_iter = indexer.vecs.txout.txoutindex_to_value.iter()?; @@ -116,16 +123,18 @@ impl Vecs { .truncate_push(entry.txinindex, entry.value)?; } + if batch_end < target { + info!("TxIns: {:.2}%", batch_end as f64 / target as f64 * 100.0); + } + + let _lock = exit.lock(); + self.txinindex_to_txoutindex.write()?; + self.txinindex_to_value.write()?; + self.db.flush()?; + batch_start = batch_end; } - { - let _lock = exit.lock(); - self.txinindex_to_txoutindex.flush()?; - self.txinindex_to_value.flush()?; - } - - self.db.compact()?; Ok(()) } } diff --git a/crates/brk_computer/src/txouts.rs b/crates/brk_computer/src/txouts.rs index f5158a79c..1ec9fd91b 100644 --- a/crates/brk_computer/src/txouts.rs +++ b/crates/brk_computer/src/txouts.rs @@ -6,14 +6,11 @@ use brk_traversable::Traversable; use brk_types::{Height, TxInIndex, TxOutIndex, Version}; use log::info; use vecdb::{ - AnyVec, BytesVec, Database, Exit, GenericStoredVec, ImportableVec, PAGE_SIZE, Stamp, - TypedVecIterator, + AnyStoredVec, AnyVec, BytesVec, Database, Exit, GenericStoredVec, ImportableVec, PAGE_SIZE, + Stamp, TypedVecIterator, VecIndex, }; -use super::{txins, Indexes}; - -const ONE_GB: usize = 1024 * 1024 * 1024; -const BATCH_SIZE: usize = ONE_GB / size_of::<(TxOutIndex, TxInIndex)>(); +use super::{Indexes, txins}; #[derive(Clone, Traversable)] pub struct Vecs { @@ -51,6 +48,7 @@ impl Vecs { exit: &Exit, ) -> Result<()> { self.compute_(indexer, txins, starting_indexes, exit)?; + let _lock = exit.lock(); self.db.compact()?; Ok(()) } @@ -62,66 +60,111 @@ impl Vecs { starting_indexes: &Indexes, exit: &Exit, ) -> Result<()> { - let target_txoutindex = indexer.vecs.txout.txoutindex_to_value.len(); - let target_txinindex = txins.txinindex_to_txoutindex.len(); - - if target_txoutindex == 0 { + let target_height = indexer.vecs.block.height_to_blockhash.len(); + if target_height == 0 { return Ok(()); } + let target_height = Height::from(target_height - 1); - let target_height = Height::from(indexer.vecs.block.height_to_blockhash.len() - 1); - - let min_txoutindex = - TxOutIndex::from(self.txoutindex_to_txinindex.len()).min(starting_indexes.txoutindex); - let min_txinindex = usize::from(starting_indexes.txinindex); + // Find min_height from current vec length + let current_txoutindex = self.txoutindex_to_txinindex.len(); + let min_txoutindex = current_txoutindex.min(starting_indexes.txoutindex.to_usize()); let starting_stamp = Stamp::from(starting_indexes.height); let _ = self.txoutindex_to_txinindex.rollback_before(starting_stamp); self.txoutindex_to_txinindex - .truncate_if_needed(min_txoutindex)?; + .truncate_if_needed(TxOutIndex::from(min_txoutindex))?; - self.txoutindex_to_txinindex - .fill_to(target_txoutindex, TxInIndex::UNSPENT)?; + let mut height_to_first_txoutindex = + indexer.vecs.txout.height_to_first_txoutindex.iter()?; + let mut height_to_first_txinindex = indexer.vecs.txin.height_to_first_txinindex.iter()?; + let mut txinindex_to_txoutindex = txins.txinindex_to_txoutindex.iter()?; - if min_txinindex < target_txinindex { - info!( - "TxOuts: computing spend mappings ({} to {})", - min_txinindex, target_txinindex - ); - - let mut txoutindex_iter = txins.txinindex_to_txoutindex.iter()?; - let mut pairs: Vec<(TxOutIndex, TxInIndex)> = Vec::with_capacity(BATCH_SIZE); - - let mut batch_start = min_txinindex; - while batch_start < target_txinindex { - let batch_end = (batch_start + BATCH_SIZE).min(target_txinindex); - - pairs.clear(); - for i in batch_start..batch_end { - let txinindex = TxInIndex::from(i); - let txoutindex = txoutindex_iter.get_unwrap(txinindex); - - if txoutindex.is_coinbase() { - continue; - } - - pairs.push((txoutindex, txinindex)); - } - - pairs.sort_unstable_by_key(|(txoutindex, _)| *txoutindex); - - for &(txoutindex, txinindex) in &pairs { - self.txoutindex_to_txinindex.update(txoutindex, txinindex)?; - } - - batch_start = batch_end; + // Find starting height from min_txoutindex + let mut min_height = Height::ZERO; + for h in 0..=target_height.to_usize() { + let txoutindex = height_to_first_txoutindex.get_unwrap(Height::from(h)); + if txoutindex.to_usize() > min_txoutindex { + break; } + min_height = Height::from(h); + } + + // Validate: computed height must not exceed starting height + assert!( + min_height <= starting_indexes.height, + "txouts min_height ({}) exceeds starting_indexes.height ({})", + min_height, + starting_indexes.height + ); + + const HEIGHT_BATCH: u32 = 10_000; + let mut pairs: Vec<(TxOutIndex, TxInIndex)> = Vec::new(); + + let mut batch_start_height = min_height; + while batch_start_height <= target_height { + let batch_end_height = (batch_start_height + HEIGHT_BATCH).min(target_height); + + // Fill txoutindex up to batch_end_height + 1 + let batch_txoutindex = if batch_end_height >= target_height { + indexer.vecs.txout.txoutindex_to_value.len() + } else { + height_to_first_txoutindex + .get_unwrap(batch_end_height + 1_u32) + .to_usize() + }; + self.txoutindex_to_txinindex + .fill_to(batch_txoutindex, TxInIndex::UNSPENT)?; + + // Get txin range for this height batch + let txin_start = height_to_first_txinindex + .get_unwrap(batch_start_height) + .to_usize(); + let txin_end = if batch_end_height >= target_height { + txins.txinindex_to_txoutindex.len() + } else { + height_to_first_txinindex + .get_unwrap(batch_end_height + 1_u32) + .to_usize() + }; + + // Collect and process txins + pairs.clear(); + for i in txin_start..txin_end { + let txinindex = TxInIndex::from(i); + let txoutindex = txinindex_to_txoutindex.get_unwrap(txinindex); + + if txoutindex.is_coinbase() { + continue; + } + + pairs.push((txoutindex, txinindex)); + } + + pairs.sort_unstable_by_key(|(txoutindex, _)| *txoutindex); + + for &(txoutindex, txinindex) in &pairs { + self.txoutindex_to_txinindex.update(txoutindex, txinindex)?; + } + + if batch_end_height < target_height { + let _lock = exit.lock(); + self.txoutindex_to_txinindex.write()?; + info!( + "TxOuts: {:.2}%", + batch_end_height.to_usize() as f64 / target_height.to_usize() as f64 * 100.0 + ); + self.db.flush()?; + } + + batch_start_height = batch_end_height + 1_u32; } let _lock = exit.lock(); self.txoutindex_to_txinindex .stamped_write_with_changes(Stamp::from(target_height))?; + self.db.flush()?; Ok(()) } diff --git a/crates/brk_types/src/centscompact.rs b/crates/brk_types/src/centscompact.rs new file mode 100644 index 000000000..aa0aeb2c6 --- /dev/null +++ b/crates/brk_types/src/centscompact.rs @@ -0,0 +1,62 @@ +use serde::{Deserialize, Serialize}; + +use super::Dollars; + +/// Compact representation of USD cents as i32. +/// +/// Used as a memory-efficient BTreeMap key instead of Dollars (f64). +/// Supports prices from $0.00 to $21,474,836.47 (i32::MAX / 100). +/// +/// Memory savings: 4 bytes vs 8 bytes per key, plus eliminates +/// floating-point precision issues that create duplicate keys. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct CentsCompact(i32); + +impl CentsCompact { + pub const ZERO: Self = Self(0); + + /// Convert to Dollars for display/computation + #[inline] + pub fn to_dollars(self) -> Dollars { + Dollars::from(self.0 as f64 / 100.0) + } +} + +impl From for CentsCompact { + #[inline] + fn from(value: Dollars) -> Self { + let f = f64::from(value); + if f.is_nan() || f < 0.0 { + Self::ZERO + } else { + let cents = (f * 100.0).round(); + assert!( + cents <= i32::MAX as f64, + "Price ${} exceeds CentsCompact max (~$21.5M)", + f + ); + Self(cents as i32) + } + } +} + +impl From for CentsCompact { + #[inline] + fn from(value: i32) -> Self { + Self(value) + } +} + +impl From for i32 { + #[inline] + fn from(value: CentsCompact) -> Self { + value.0 + } +} + +impl From for Dollars { + #[inline] + fn from(value: CentsCompact) -> Self { + value.to_dollars() + } +} diff --git a/crates/brk_types/src/lib.rs b/crates/brk_types/src/lib.rs index d3dc3cd17..2b35ff09d 100644 --- a/crates/brk_types/src/lib.rs +++ b/crates/brk_types/src/lib.rs @@ -35,6 +35,7 @@ mod blocktimestamp; mod blockweightentry; mod bytes; mod cents; +mod centscompact; mod datarange; mod datarangeformat; mod date; @@ -189,6 +190,7 @@ pub use blocktimestamp::*; pub use blockweightentry::*; pub use bytes::*; pub use cents::*; +pub use centscompact::*; pub use datarange::*; pub use datarangeformat::*; pub use date::*; diff --git a/crates/brk_types/src/loadedaddressdata.rs b/crates/brk_types/src/loadedaddressdata.rs index ef965ceeb..119bb5876 100644 --- a/crates/brk_types/src/loadedaddressdata.rs +++ b/crates/brk_types/src/loadedaddressdata.rs @@ -30,6 +30,9 @@ impl LoadedAddressData { (u64::from(self.received) - u64::from(self.sent)).into() } + /// Max realized price for CentsCompact (i32::MAX / 100) + const MAX_REALIZED_PRICE: f64 = 21_000_000.0; + pub fn realized_price(&self) -> Dollars { let p = (self.realized_cap / Bitcoin::from(self.balance())).round_to(4); if p.is_negative() { @@ -41,7 +44,7 @@ impl LoadedAddressData { )); panic!(""); } - p + p.min(Dollars::from(Self::MAX_REALIZED_PRICE)) } #[inline] diff --git a/crates/brk_types/src/outpoint.rs b/crates/brk_types/src/outpoint.rs index 4e5836ba9..a5aea1fe4 100644 --- a/crates/brk_types/src/outpoint.rs +++ b/crates/brk_types/src/outpoint.rs @@ -25,6 +25,9 @@ impl OutPoint { #[inline(always)] pub fn vout(self) -> Vout { + if self.is_coinbase() { + return Vout::MAX; + } Vout::from(self.0 as u32) }