diff --git a/Cargo.lock b/Cargo.lock index f5efe8f31..60a489b5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -630,6 +630,7 @@ dependencies = [ "color-eyre", "derive_deref", "log", + "mimalloc", "pco", "rayon", "rustc-hash", @@ -2918,13 +2919,13 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall", + "redox_syscall 0.6.0", ] [[package]] @@ -3785,7 +3786,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -4233,6 +4234,15 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "redox_syscall" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "redox_users" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index bfe64148f..bd0afb915 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,10 @@ inherits = "release" [profile.dist] inherits = "release" +[profile.profiling] +inherits = "release" +debug = true + [workspace.dependencies] aide = { version = "0.16.0-alpha.1", features = ["axum-json", "axum-query"] } axum = "0.8.7" diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index e52b26b04..92be1b5ea 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -33,3 +33,4 @@ vecdb = { workspace = true } [dev-dependencies] color-eyre = { workspace = true } +mimalloc = { workspace = true } diff --git a/crates/brk_computer/examples/computer.rs b/crates/brk_computer/examples/computer.rs 
index 743ad60f6..6b6f5a4f7 100644 --- a/crates/brk_computer/examples/computer.rs +++ b/crates/brk_computer/examples/computer.rs @@ -12,8 +12,12 @@ use brk_indexer::Indexer; use brk_iterator::Blocks; use brk_reader::Reader; use brk_rpc::{Auth, Client}; +use mimalloc::MiMalloc; use vecdb::Exit; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + pub fn main() -> color_eyre::Result<()> { color_eyre::install()?; diff --git a/crates/brk_computer/examples/computer_bench.rs b/crates/brk_computer/examples/computer_bench.rs index 905f8d082..752396053 100644 --- a/crates/brk_computer/examples/computer_bench.rs +++ b/crates/brk_computer/examples/computer_bench.rs @@ -9,8 +9,12 @@ use brk_iterator::Blocks; use brk_reader::Reader; use brk_rpc::{Auth, Client}; use log::{debug, info}; +use mimalloc::MiMalloc; use vecdb::Exit; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + pub fn main() -> Result<()> { // Can't increase main thread's stack size, thus we need to use another thread thread::Builder::new() diff --git a/crates/brk_computer/examples/computer_read.rs b/crates/brk_computer/examples/computer_read.rs index 6ab57d6ba..0ee329251 100644 --- a/crates/brk_computer/examples/computer_read.rs +++ b/crates/brk_computer/examples/computer_read.rs @@ -5,8 +5,12 @@ use brk_error::Result; use brk_fetcher::Fetcher; use brk_indexer::Indexer; use brk_types::TxIndex; +use mimalloc::MiMalloc; use vecdb::{Exit, GenericStoredVec}; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + pub fn main() -> Result<()> { // Can't increase main thread's stack size, thus we need to use another thread thread::Builder::new() diff --git a/crates/brk_computer/examples/debug_indexer.rs b/crates/brk_computer/examples/debug_indexer.rs index 6ffd0d1d4..fcb18992f 100644 --- a/crates/brk_computer/examples/debug_indexer.rs +++ b/crates/brk_computer/examples/debug_indexer.rs @@ -2,8 +2,12 @@ use std::{env, path::Path}; use brk_indexer::Indexer; use brk_types::{Height, P2PKHAddressIndex, 
P2SHAddressIndex, TxOutIndex, TypeIndex}; +use mimalloc::MiMalloc; use vecdb::GenericStoredVec; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + fn main() -> color_eyre::Result<()> { color_eyre::install()?; @@ -15,54 +19,94 @@ fn main() -> color_eyre::Result<()> { let reader_typeindex = indexer.vecs.txout.txoutindex_to_typeindex.create_reader(); let reader_txindex = indexer.vecs.txout.txoutindex_to_txindex.create_reader(); let reader_txid = indexer.vecs.tx.txindex_to_txid.create_reader(); - let reader_height_to_first_txoutindex = indexer.vecs.txout.height_to_first_txoutindex.create_reader(); - let reader_p2pkh = indexer.vecs.address.p2pkhaddressindex_to_p2pkhbytes.create_reader(); - let reader_p2sh = indexer.vecs.address.p2shaddressindex_to_p2shbytes.create_reader(); + let reader_height_to_first_txoutindex = indexer + .vecs + .txout + .height_to_first_txoutindex + .create_reader(); + let reader_p2pkh = indexer + .vecs + .address + .p2pkhaddressindex_to_p2pkhbytes + .create_reader(); + let reader_p2sh = indexer + .vecs + .address + .p2shaddressindex_to_p2shbytes + .create_reader(); // Check what's stored at typeindex 254909199 in both P2PKH and P2SH vecs let typeindex = TypeIndex::from(254909199_usize); let p2pkh_bytes = indexer .vecs - .address.p2pkhaddressindex_to_p2pkhbytes + .address + .p2pkhaddressindex_to_p2pkhbytes .read(P2PKHAddressIndex::from(typeindex), &reader_p2pkh); println!("P2PKH at typeindex 254909199: {:?}", p2pkh_bytes); let p2sh_bytes = indexer .vecs - .address.p2shaddressindex_to_p2shbytes + .address + .p2shaddressindex_to_p2shbytes .read(P2SHAddressIndex::from(typeindex), &reader_p2sh); println!("P2SH at typeindex 254909199: {:?}", p2sh_bytes); // Check first P2SH index at height 476152 - let reader_first_p2sh = indexer.vecs.address.height_to_first_p2shaddressindex.create_reader(); - let reader_first_p2pkh = indexer.vecs.address.height_to_first_p2pkhaddressindex.create_reader(); - let first_p2sh_at_476152 = 
indexer.vecs.address.height_to_first_p2shaddressindex.read(Height::from(476152_usize), &reader_first_p2sh); - let first_p2pkh_at_476152 = indexer.vecs.address.height_to_first_p2pkhaddressindex.read(Height::from(476152_usize), &reader_first_p2pkh); - println!("First P2SH index at height 476152: {:?}", first_p2sh_at_476152); - println!("First P2PKH index at height 476152: {:?}", first_p2pkh_at_476152); + let reader_first_p2sh = indexer + .vecs + .address + .height_to_first_p2shaddressindex + .create_reader(); + let reader_first_p2pkh = indexer + .vecs + .address + .height_to_first_p2pkhaddressindex + .create_reader(); + let first_p2sh_at_476152 = indexer + .vecs + .address + .height_to_first_p2shaddressindex + .read(Height::from(476152_usize), &reader_first_p2sh); + let first_p2pkh_at_476152 = indexer + .vecs + .address + .height_to_first_p2pkhaddressindex + .read(Height::from(476152_usize), &reader_first_p2pkh); + println!( + "First P2SH index at height 476152: {:?}", + first_p2sh_at_476152 + ); + println!( + "First P2PKH index at height 476152: {:?}", + first_p2pkh_at_476152 + ); // Check the problematic txoutindexes found during debugging for txoutindex_usize in [653399433_usize, 653399443_usize] { let txoutindex = TxOutIndex::from(txoutindex_usize); let outputtype = indexer .vecs - .txout.txoutindex_to_outputtype + .txout + .txoutindex_to_outputtype .read(txoutindex, &reader_outputtype) .unwrap(); let typeindex = indexer .vecs - .txout.txoutindex_to_typeindex + .txout + .txoutindex_to_typeindex .read(txoutindex, &reader_typeindex) .unwrap(); let txindex = indexer .vecs - .txout.txoutindex_to_txindex + .txout + .txoutindex_to_txindex .read(txoutindex, &reader_txindex) .unwrap(); let txid = indexer .vecs - .tx.txindex_to_txid + .tx + .txindex_to_txid .read(txindex, &reader_txid) .unwrap(); @@ -71,7 +115,8 @@ fn main() -> color_eyre::Result<()> { for h in 0..900_000_usize { let first_txoutindex = indexer .vecs - .txout.height_to_first_txoutindex + .txout + 
.height_to_first_txoutindex .read(Height::from(h), &reader_height_to_first_txoutindex); if let Ok(first) = first_txoutindex { if usize::from(first) > txoutindex_usize { diff --git a/crates/brk_computer/src/stateful/cohorts/state.rs b/crates/brk_computer/src/stateful/cohorts/state.rs index ed1b6c993..9e1ae9878 100644 --- a/crates/brk_computer/src/stateful/cohorts/state.rs +++ b/crates/brk_computer/src/stateful/cohorts/state.rs @@ -9,7 +9,7 @@ use brk_types::{Dollars, Height, Sats}; use crate::{ CachedUnrealizedState, PriceToAmount, RealizedState, SupplyState, UnrealizedState, - grouped::{PERCENTILES, PERCENTILES_LEN}, + grouped::PERCENTILES_LEN, utils::OptionExt, }; @@ -321,38 +321,12 @@ impl CohortState { } /// Compute prices at percentile thresholds. + /// Uses O(19 * log n) Fenwick tree queries instead of O(n) iteration. pub fn compute_percentile_prices(&self) -> [Dollars; PERCENTILES_LEN] { - let mut result = [Dollars::NAN; PERCENTILES_LEN]; - - let price_to_amount = match self.price_to_amount.as_ref() { - Some(p) => p, - None => return result, - }; - - if price_to_amount.is_empty() || self.supply.value == Sats::ZERO { - return result; + match self.price_to_amount.as_ref() { + Some(p) if !p.is_empty() => p.compute_percentiles(), + _ => [Dollars::NAN; PERCENTILES_LEN], } - - let total = u64::from(self.supply.value); - let targets = PERCENTILES.map(|p| total * u64::from(p) / 100); - - let mut accumulated = 0u64; - let mut pct_idx = 0; - - for (&price, &sats) in price_to_amount.iter() { - accumulated += u64::from(sats); - - while pct_idx < PERCENTILES_LEN && accumulated >= targets[pct_idx] { - result[pct_idx] = price; - pct_idx += 1; - } - - if pct_idx >= PERCENTILES_LEN { - break; - } - } - - result } /// Compute unrealized profit/loss at current price. 
diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs index 1a3eb0411..dd45d72a3 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/tick_tock.rs @@ -2,8 +2,11 @@ //! //! When a new block arrives, UTXOs age. Some cross day boundaries //! and need to move between age-based cohorts. +//! +//! Optimization: Instead of iterating all ~800k blocks O(n), we binary search +//! for blocks at each day boundary O(k * log n) where k = number of boundaries. -use brk_grouper::{Filter, Filtered}; +use brk_grouper::AGE_BOUNDARIES; use brk_types::{ONE_DAY_IN_SEC, Timestamp}; use crate::states::BlockState; @@ -15,58 +18,70 @@ impl UTXOCohorts { /// /// UTXOs age with each block. When they cross day boundaries, /// they move between age-based cohorts (e.g., from "0-1d" to "1-7d"). + /// + /// Complexity: O(k * (log n + m)) where: + /// - k = 19 boundaries to check + /// - n = total blocks in chain_state + /// - m = blocks crossing each boundary (typically 0-2 per boundary per block) pub fn tick_tock_next_block(&mut self, chain_state: &[BlockState], timestamp: Timestamp) { if chain_state.is_empty() { return; } let prev_timestamp = chain_state.last().unwrap().timestamp; - - // Optimization: Only blocks whose age % ONE_DAY >= threshold can cross a day boundary. - // Saves computation vs checking days_old for every block. 
let elapsed = (*timestamp).saturating_sub(*prev_timestamp); - let threshold = ONE_DAY_IN_SEC.saturating_sub(elapsed); - // Collect age_range cohorts with their filters and states - let mut age_cohorts: Vec<(Filter, &mut Option<_>)> = self - .0 - .age_range - .iter_mut() - .map(|v| (v.filter().clone(), &mut v.state)) - .collect(); + // Skip if no time has passed + if elapsed == 0 { + return; + } - // Process blocks that might cross a day boundary - chain_state - .iter() - .filter(|block_state| { - let age = (*prev_timestamp).saturating_sub(*block_state.timestamp); - age % ONE_DAY_IN_SEC >= threshold - }) - .for_each(|block_state| { + // Get age_range cohort states (indexed 0..20) + // Cohort i covers days [BOUNDARIES[i-1], BOUNDARIES[i]) + // Cohort 0 covers [0, 1) days + // Cohort 19 covers [15*365, infinity) days + let mut age_cohorts: Vec<_> = self.0.age_range.iter_mut().map(|v| &mut v.state).collect(); + + // For each boundary, find blocks that just crossed it + for (boundary_idx, &boundary_days) in AGE_BOUNDARIES.iter().enumerate() { + let boundary_seconds = (boundary_days as u32) * ONE_DAY_IN_SEC; + + // Blocks crossing boundary B have timestamps in (prev - B*DAY, curr - B*DAY] + // prev_days < B and curr_days >= B + // means: block was younger than B days, now is B days or older + let upper_timestamp = (*timestamp).saturating_sub(boundary_seconds); + let lower_timestamp = (*prev_timestamp).saturating_sub(boundary_seconds); + + // Skip if the range is empty (would happen if boundary > chain age) + if upper_timestamp <= lower_timestamp { + continue; + } + + // Binary search to find blocks in the timestamp range (lower, upper] + let start_idx = chain_state.partition_point(|b| *b.timestamp <= lower_timestamp); + let end_idx = chain_state.partition_point(|b| *b.timestamp <= upper_timestamp); + + // Process blocks that crossed this boundary + for block_state in &chain_state[start_idx..end_idx] { + // Double-check the day boundary was actually crossed + // (handles 
edge cases with day boundaries) let prev_days = prev_timestamp.difference_in_days_between(block_state.timestamp); let curr_days = timestamp.difference_in_days_between(block_state.timestamp); - if prev_days == curr_days { - return; + if prev_days >= boundary_days || curr_days < boundary_days { + continue; } - // Update age_range cohort states - age_cohorts.iter_mut().for_each(|(filter, state)| { - let is_now = filter.contains_time(curr_days); - let was_before = filter.contains_time(prev_days); - - if is_now && !was_before { - state - .as_mut() - .unwrap() - .increment(&block_state.supply, block_state.price); - } else if was_before && !is_now { - state - .as_mut() - .unwrap() - .decrement(&block_state.supply, block_state.price); - } - }); - }); + // Block crossed from cohort[boundary_idx] to cohort[boundary_idx + 1] + // Decrement from the "younger" cohort + if let Some(state) = age_cohorts[boundary_idx].as_mut() { + state.decrement(&block_state.supply, block_state.price); + } + // Increment in the "older" cohort + if let Some(state) = age_cohorts[boundary_idx + 1].as_mut() { + state.increment(&block_state.supply, block_state.price); + } + } + } } } diff --git a/crates/brk_computer/src/states/fenwick.rs b/crates/brk_computer/src/states/fenwick.rs new file mode 100644 index 000000000..65f63acf7 --- /dev/null +++ b/crates/brk_computer/src/states/fenwick.rs @@ -0,0 +1,132 @@ +//! Fenwick Tree (Binary Indexed Tree) for O(log n) prefix sums. +//! +//! Used for efficient percentile computation over price distributions. + +/// Fenwick tree for O(log n) prefix sum queries and updates. +/// +/// Supports: +/// - `add(idx, delta)`: O(log n) - add delta to position idx +/// - `prefix_sum(idx)`: O(log n) - sum of elements 0..=idx +/// - `lower_bound(target)`: O(log n) - find smallest idx where prefix_sum >= target +#[derive(Clone, Debug)] +pub struct FenwickTree { + tree: Vec, + len: usize, +} + +impl FenwickTree { + /// Create a new Fenwick tree with given capacity. 
+ pub fn new(len: usize) -> Self { + Self { + tree: vec![0; len + 1], // 1-indexed + len, + } + } + + /// Add delta to position idx. O(log n). + pub fn add(&mut self, idx: usize, delta: u64) { + let mut i = idx + 1; // Convert to 1-indexed + while i <= self.len { + self.tree[i] += delta; + i += i & i.wrapping_neg(); // Add LSB + } + } + + /// Subtract delta from position idx. O(log n). + pub fn sub(&mut self, idx: usize, delta: u64) { + let mut i = idx + 1; + while i <= self.len { + self.tree[i] -= delta; + i += i & i.wrapping_neg(); + } + } + + /// Get prefix sum of elements 0..=idx. O(log n). + pub fn prefix_sum(&self, idx: usize) -> u64 { + let mut sum = 0u64; + let mut i = idx + 1; // Convert to 1-indexed + while i > 0 { + sum += self.tree[i]; + i -= i & i.wrapping_neg(); // Remove LSB + } + sum + } + + /// Find smallest index where prefix_sum >= target. O(log n). + /// Returns None if no such index exists (target > total sum). + pub fn lower_bound(&self, target: u64) -> Option { + if target == 0 { + return Some(0); + } + + let mut sum = 0u64; + let mut pos = 0usize; + + // Find highest bit position + let mut bit = 1usize << (usize::BITS - 1 - self.len.leading_zeros()); + + while bit > 0 { + let next_pos = pos + bit; + if next_pos <= self.len && sum + self.tree[next_pos] < target { + sum += self.tree[next_pos]; + pos = next_pos; + } + bit >>= 1; + } + + // pos is now the largest index where prefix_sum < target + // So pos + 1 is the smallest where prefix_sum >= target + if pos < self.len { + Some(pos) // Convert back to 0-indexed + } else { + None + } + } + + /// Get total sum of all elements. O(log n). + pub fn total(&self) -> u64 { + self.prefix_sum(self.len.saturating_sub(1)) + } + + /// Reset all values to zero. O(n). 
+ pub fn clear(&mut self) { + self.tree.fill(0); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_basic_operations() { + let mut ft = FenwickTree::new(10); + + ft.add(0, 5); + ft.add(2, 3); + ft.add(5, 7); + + assert_eq!(ft.prefix_sum(0), 5); + assert_eq!(ft.prefix_sum(1), 5); + assert_eq!(ft.prefix_sum(2), 8); + assert_eq!(ft.prefix_sum(5), 15); + assert_eq!(ft.total(), 15); + } + + #[test] + fn test_lower_bound() { + let mut ft = FenwickTree::new(10); + + ft.add(0, 10); + ft.add(2, 20); + ft.add(5, 30); + + assert_eq!(ft.lower_bound(5), Some(0)); + assert_eq!(ft.lower_bound(10), Some(0)); + assert_eq!(ft.lower_bound(11), Some(2)); + assert_eq!(ft.lower_bound(30), Some(2)); + assert_eq!(ft.lower_bound(31), Some(5)); + assert_eq!(ft.lower_bound(60), Some(5)); + assert_eq!(ft.lower_bound(61), None); + } +} diff --git a/crates/brk_computer/src/states/mod.rs b/crates/brk_computer/src/states/mod.rs index 2619d068f..ec22918fd 100644 --- a/crates/brk_computer/src/states/mod.rs +++ b/crates/brk_computer/src/states/mod.rs @@ -1,6 +1,8 @@ mod block; // mod cohorts; +mod fenwick; mod flushable; +mod price_buckets; mod price_to_amount; mod realized; mod supply; @@ -10,6 +12,7 @@ mod unrealized; pub use block::*; // pub use cohorts::*; pub use flushable::*; +pub use price_buckets::*; pub use price_to_amount::*; pub use realized::*; pub use supply::*; diff --git a/crates/brk_computer/src/states/price_buckets.rs b/crates/brk_computer/src/states/price_buckets.rs new file mode 100644 index 000000000..fcfdce9b6 --- /dev/null +++ b/crates/brk_computer/src/states/price_buckets.rs @@ -0,0 +1,244 @@ +//! Logarithmic price buckets with Fenwick tree for O(log n) percentile queries. +//! +//! Uses logarithmic buckets to maintain constant relative precision across all price levels. +//! Bucket i represents prices in range [MIN_PRICE * BASE^i, MIN_PRICE * BASE^(i+1)). 
+ +use brk_types::{Dollars, Sats}; + +use super::fenwick::FenwickTree; +use crate::grouped::{PERCENTILES, PERCENTILES_LEN}; + +/// Minimum price tracked (sub-cent for early Bitcoin days). +const MIN_PRICE: f64 = 0.001; + +/// Maximum price tracked ($100M for future-proofing). +const MAX_PRICE: f64 = 100_000_000.0; + +/// Base for logarithmic buckets (0.1% precision). +const BASE: f64 = 1.001; + +/// Pre-computed ln(BASE) for efficiency. +const LN_BASE: f64 = 0.0009995003; // ln(1.001) + +/// Pre-computed ln(MIN_PRICE) for efficiency. +const LN_MIN_PRICE: f64 = -6.907755279; // ln(0.001) + +/// Number of buckets needed: ceil(ln(MAX/MIN) / ln(BASE)). +/// ln(100_000_000 / 0.001) / ln(1.001) ≈ 25,328 +const NUM_BUCKETS: usize = 25_400; // Rounded up for safety + +/// Logarithmic price buckets with O(log n) percentile queries. +#[derive(Clone, Debug)] +pub struct PriceBuckets { + /// Fenwick tree for O(log n) prefix sums. + fenwick: FenwickTree, + /// Direct bucket access for iteration (needed for unrealized computation). + buckets: Vec, + /// Total supply tracked. + total: Sats, +} + +impl Default for PriceBuckets { + fn default() -> Self { + Self::new() + } +} + +impl PriceBuckets { + /// Create new empty price buckets. + pub fn new() -> Self { + Self { + fenwick: FenwickTree::new(NUM_BUCKETS), + buckets: vec![Sats::ZERO; NUM_BUCKETS], + total: Sats::ZERO, + } + } + + /// Convert price to bucket index. O(1). + #[inline] + pub fn price_to_bucket(price: Dollars) -> usize { + let price_f64 = f64::from(price); + if price_f64 <= MIN_PRICE { + return 0; + } + let bucket = ((price_f64.ln() - LN_MIN_PRICE) / LN_BASE) as usize; + bucket.min(NUM_BUCKETS - 1) + } + + /// Convert bucket index to representative price (bucket midpoint). O(1). 
+ #[inline] + pub fn bucket_to_price(bucket: usize) -> Dollars { + // Use geometric mean of bucket range for better accuracy + let low = MIN_PRICE * BASE.powi(bucket as i32); + let high = low * BASE; + Dollars::from((low * high).sqrt()) + } + + /// Add amount at given price. O(log n). + pub fn increment(&mut self, price: Dollars, amount: Sats) { + if amount == Sats::ZERO { + return; + } + let bucket = Self::price_to_bucket(price); + self.fenwick.add(bucket, u64::from(amount)); + self.buckets[bucket] += amount; + self.total += amount; + } + + /// Remove amount at given price. O(log n). + pub fn decrement(&mut self, price: Dollars, amount: Sats) { + if amount == Sats::ZERO { + return; + } + let bucket = Self::price_to_bucket(price); + self.fenwick.sub(bucket, u64::from(amount)); + self.buckets[bucket] -= amount; + self.total -= amount; + } + + /// Check if empty. + pub fn is_empty(&self) -> bool { + self.total == Sats::ZERO + } + + /// Get total supply. + pub fn total(&self) -> Sats { + self.total + } + + /// Compute all percentile prices. O(19 * log n) ≈ O(323 ops). + pub fn compute_percentiles(&self) -> [Dollars; PERCENTILES_LEN] { + let mut result = [Dollars::NAN; PERCENTILES_LEN]; + + if self.total == Sats::ZERO { + return result; + } + + let total = u64::from(self.total); + + for (i, &percentile) in PERCENTILES.iter().enumerate() { + let target = total * u64::from(percentile) / 100; + if let Some(bucket) = self.fenwick.lower_bound(target) { + result[i] = Self::bucket_to_price(bucket); + } + } + + result + } + + /// Get amount in a specific bucket. + pub fn get_bucket(&self, bucket: usize) -> Sats { + self.buckets.get(bucket).copied().unwrap_or(Sats::ZERO) + } + + /// Iterate over non-empty buckets in a price range. + /// Used for unrealized computation flip range. 
+ pub fn iter_range( + &self, + from_price: Dollars, + to_price: Dollars, + ) -> impl Iterator + '_ { + let from_bucket = Self::price_to_bucket(from_price); + let to_bucket = Self::price_to_bucket(to_price); + + let (start, end) = if from_bucket <= to_bucket { + (from_bucket, to_bucket) + } else { + (to_bucket, from_bucket) + }; + + (start..=end).filter_map(move |bucket| { + let amount = self.buckets[bucket]; + if amount > Sats::ZERO { + Some((Self::bucket_to_price(bucket), amount)) + } else { + None + } + }) + } + + /// Iterate over all non-empty buckets (for full unrealized computation). + pub fn iter(&self) -> impl Iterator + '_ { + self.buckets + .iter() + .enumerate() + .filter_map(|(bucket, &amount)| { + if amount > Sats::ZERO { + Some((Self::bucket_to_price(bucket), amount)) + } else { + None + } + }) + } + + /// Get the lowest price bucket with non-zero amount. + pub fn min_price(&self) -> Option { + self.buckets + .iter() + .position(|&s| s > Sats::ZERO) + .map(Self::bucket_to_price) + } + + /// Get the highest price bucket with non-zero amount. + pub fn max_price(&self) -> Option { + self.buckets + .iter() + .rposition(|&s| s > Sats::ZERO) + .map(Self::bucket_to_price) + } + + /// Clear all data. 
+ pub fn clear(&mut self) { + self.fenwick.clear(); + self.buckets.fill(Sats::ZERO); + self.total = Sats::ZERO; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_conversion() { + // Test price -> bucket -> price roundtrip + let prices = [0.01, 1.0, 100.0, 10000.0, 50000.0, 100000.0]; + + for &price in &prices { + let bucket = PriceBuckets::price_to_bucket(Dollars::from(price)); + let recovered = PriceBuckets::bucket_to_price(bucket); + let ratio = f64::from(recovered) / price; + // Should be within 0.1% (our bucket precision) + assert!( + (0.999..=1.001).contains(&ratio), + "price={}, recovered={}, ratio={}", + price, + f64::from(recovered), + ratio + ); + } + } + + #[test] + fn test_percentiles() { + let mut buckets = PriceBuckets::new(); + + // Add 100 sats at $10, 200 sats at $20, 300 sats at $30 + buckets.increment(Dollars::from(10.0), Sats::from(100u64)); + buckets.increment(Dollars::from(20.0), Sats::from(200u64)); + buckets.increment(Dollars::from(30.0), Sats::from(300u64)); + + // Total = 600 sats + // 50th percentile = 300 sats = should be around $20-$30 + let percentiles = buckets.compute_percentiles(); + + // Median (index 9 in PERCENTILES which is 50%) + let median = percentiles[9]; // PERCENTILES[9] = 50 + let median_f64 = f64::from(median); + assert!( + (15.0..=35.0).contains(&median_f64), + "median={} should be around $20-$30", + median_f64 + ); + } +} diff --git a/crates/brk_computer/src/states/price_to_amount.rs b/crates/brk_computer/src/states/price_to_amount.rs index 0667c13f0..e91b5464f 100644 --- a/crates/brk_computer/src/states/price_to_amount.rs +++ b/crates/brk_computer/src/states/price_to_amount.rs @@ -11,12 +11,17 @@ use pco::standalone::{simple_decompress, simpler_compress}; use serde::{Deserialize, Serialize}; use vecdb::Bytes; -use crate::{states::SupplyState, utils::OptionExt}; +use crate::{grouped::PERCENTILES_LEN, states::SupplyState, utils::OptionExt}; + +use super::PriceBuckets; #[derive(Clone, 
Debug)] pub struct PriceToAmount { pathbuf: PathBuf, state: Option, + /// Logarithmic buckets for O(log n) percentile queries. + /// Rebuilt on load, not persisted. + buckets: Option, } const STATE_AT_: &str = "state_at_"; @@ -27,6 +32,7 @@ impl PriceToAmount { Self { pathbuf: path.join(format!("{name}_price_to_amount")), state: None, + buckets: None, } } @@ -35,7 +41,16 @@ impl PriceToAmount { let (&height, path) = files.range(..=height).next_back().ok_or(Error::NotFound( "No price state found at or before height".into(), ))?; - self.state = Some(State::deserialize(&fs::read(path)?)?); + let state = State::deserialize(&fs::read(path)?)?; + + // Rebuild buckets from loaded state + let mut buckets = PriceBuckets::new(); + for (&price, &amount) in state.iter() { + buckets.increment(price, amount); + } + + self.state = Some(state); + self.buckets = Some(buckets); Ok(height) } @@ -65,6 +80,9 @@ impl PriceToAmount { pub fn increment(&mut self, price: Dollars, supply_state: &SupplyState) { *self.state.um().entry(price).or_default() += supply_state.value; + if let Some(buckets) = self.buckets.as_mut() { + buckets.increment(price, supply_state.value); + } } pub fn decrement(&mut self, price: Dollars, supply_state: &SupplyState) { @@ -73,6 +91,9 @@ impl PriceToAmount { if *amount == Sats::ZERO { self.state.um().remove(&price); } + if let Some(buckets) = self.buckets.as_mut() { + buckets.decrement(price, supply_state.value); + } } else { dbg!(price, &self.pathbuf); unreachable!(); @@ -81,6 +102,16 @@ impl PriceToAmount { pub fn init(&mut self) { self.state.replace(State::default()); + self.buckets.replace(PriceBuckets::new()); + } + + /// Compute percentile prices using O(log n) Fenwick tree queries. 
+ pub fn compute_percentiles(&self) -> [Dollars; PERCENTILES_LEN] { + if let Some(buckets) = self.buckets.as_ref() { + buckets.compute_percentiles() + } else { + [Dollars::NAN; PERCENTILES_LEN] + } } pub fn clean(&mut self) -> Result<()> { diff --git a/crates/brk_grouper/src/by_age_range.rs b/crates/brk_grouper/src/by_age_range.rs index 53cf3a833..123a2c2ed 100644 --- a/crates/brk_grouper/src/by_age_range.rs +++ b/crates/brk_grouper/src/by_age_range.rs @@ -3,6 +3,30 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::{Filter, TimeFilter}; +/// Age boundaries in days. Defines the cohort ranges: +/// [0, B[0]), [B[0], B[1]), [B[1], B[2]), ..., [B[n-1], ∞) +pub const AGE_BOUNDARIES: [usize; 19] = [ + 1, // up_to_1d | _1d_to_1w + 7, // _1d_to_1w | _1w_to_1m + 30, // _1w_to_1m | _1m_to_2m + 2 * 30, // _1m_to_2m | _2m_to_3m + 3 * 30, // _2m_to_3m | _3m_to_4m + 4 * 30, // _3m_to_4m | _4m_to_5m + 5 * 30, // _4m_to_5m | _5m_to_6m + 6 * 30, // _5m_to_6m | _6m_to_1y + 365, // _6m_to_1y | _1y_to_2y + 2 * 365, // _1y_to_2y | _2y_to_3y + 3 * 365, // _2y_to_3y | _3y_to_4y + 4 * 365, // _3y_to_4y | _4y_to_5y + 5 * 365, // _4y_to_5y | _5y_to_6y + 6 * 365, // _5y_to_6y | _6y_to_7y + 7 * 365, // _6y_to_7y | _7y_to_8y + 8 * 365, // _7y_to_8y | _8y_to_10y + 10 * 365, // _8y_to_10y | _10y_to_12y + 12 * 365, // _10y_to_12y | _12y_to_15y + 15 * 365, // _12y_to_15y | from_15y +]; + #[derive(Default, Clone, Traversable)] pub struct ByAgeRange { pub up_to_1d: T, @@ -33,26 +57,26 @@ impl ByAgeRange { F: FnMut(Filter) -> T, { Self { - up_to_1d: create(Filter::Time(TimeFilter::Range(0..1))), - _1d_to_1w: create(Filter::Time(TimeFilter::Range(1..7))), - _1w_to_1m: create(Filter::Time(TimeFilter::Range(7..30))), - _1m_to_2m: create(Filter::Time(TimeFilter::Range(30..2 * 30))), - _2m_to_3m: create(Filter::Time(TimeFilter::Range(2 * 30..3 * 30))), - _3m_to_4m: create(Filter::Time(TimeFilter::Range(3 * 30..4 * 30))), - _4m_to_5m: create(Filter::Time(TimeFilter::Range(4 * 30..5 * 
30))), - _5m_to_6m: create(Filter::Time(TimeFilter::Range(5 * 30..6 * 30))), - _6m_to_1y: create(Filter::Time(TimeFilter::Range(6 * 30..365))), - _1y_to_2y: create(Filter::Time(TimeFilter::Range(365..2 * 365))), - _2y_to_3y: create(Filter::Time(TimeFilter::Range(2 * 365..3 * 365))), - _3y_to_4y: create(Filter::Time(TimeFilter::Range(3 * 365..4 * 365))), - _4y_to_5y: create(Filter::Time(TimeFilter::Range(4 * 365..5 * 365))), - _5y_to_6y: create(Filter::Time(TimeFilter::Range(5 * 365..6 * 365))), - _6y_to_7y: create(Filter::Time(TimeFilter::Range(6 * 365..7 * 365))), - _7y_to_8y: create(Filter::Time(TimeFilter::Range(7 * 365..8 * 365))), - _8y_to_10y: create(Filter::Time(TimeFilter::Range(8 * 365..10 * 365))), - _10y_to_12y: create(Filter::Time(TimeFilter::Range(10 * 365..12 * 365))), - _12y_to_15y: create(Filter::Time(TimeFilter::Range(12 * 365..15 * 365))), - from_15y: create(Filter::Time(TimeFilter::GreaterOrEqual(15 * 365))), + up_to_1d: create(Filter::Time(TimeFilter::Range(0..AGE_BOUNDARIES[0]))), + _1d_to_1w: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[0]..AGE_BOUNDARIES[1]))), + _1w_to_1m: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[1]..AGE_BOUNDARIES[2]))), + _1m_to_2m: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[2]..AGE_BOUNDARIES[3]))), + _2m_to_3m: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[3]..AGE_BOUNDARIES[4]))), + _3m_to_4m: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[4]..AGE_BOUNDARIES[5]))), + _4m_to_5m: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[5]..AGE_BOUNDARIES[6]))), + _5m_to_6m: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[6]..AGE_BOUNDARIES[7]))), + _6m_to_1y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[7]..AGE_BOUNDARIES[8]))), + _1y_to_2y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[8]..AGE_BOUNDARIES[9]))), + _2y_to_3y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[9]..AGE_BOUNDARIES[10]))), + _3y_to_4y: 
create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[10]..AGE_BOUNDARIES[11]))), + _4y_to_5y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[11]..AGE_BOUNDARIES[12]))), + _5y_to_6y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[12]..AGE_BOUNDARIES[13]))), + _6y_to_7y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[13]..AGE_BOUNDARIES[14]))), + _7y_to_8y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[14]..AGE_BOUNDARIES[15]))), + _8y_to_10y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[15]..AGE_BOUNDARIES[16]))), + _10y_to_12y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[16]..AGE_BOUNDARIES[17]))), + _12y_to_15y: create(Filter::Time(TimeFilter::Range(AGE_BOUNDARIES[17]..AGE_BOUNDARIES[18]))), + from_15y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[18]))), } } diff --git a/crates/brk_grouper/src/by_max_age.rs b/crates/brk_grouper/src/by_max_age.rs index 60275312f..cf24ea8fe 100644 --- a/crates/brk_grouper/src/by_max_age.rs +++ b/crates/brk_grouper/src/by_max_age.rs @@ -1,4 +1,4 @@ -use super::{Filter, TimeFilter}; +use super::{Filter, TimeFilter, AGE_BOUNDARIES}; use brk_traversable::Traversable; use rayon::prelude::*; @@ -30,24 +30,24 @@ impl ByMaxAge { F: FnMut(Filter) -> T, { Self { - _1w: create(Filter::Time(TimeFilter::LowerThan(7))), - _1m: create(Filter::Time(TimeFilter::LowerThan(30))), - _2m: create(Filter::Time(TimeFilter::LowerThan(2 * 30))), - _3m: create(Filter::Time(TimeFilter::LowerThan(3 * 30))), - _4m: create(Filter::Time(TimeFilter::LowerThan(4 * 30))), - _5m: create(Filter::Time(TimeFilter::LowerThan(5 * 30))), - _6m: create(Filter::Time(TimeFilter::LowerThan(6 * 30))), - _1y: create(Filter::Time(TimeFilter::LowerThan(365))), - _2y: create(Filter::Time(TimeFilter::LowerThan(2 * 365))), - _3y: create(Filter::Time(TimeFilter::LowerThan(3 * 365))), - _4y: create(Filter::Time(TimeFilter::LowerThan(4 * 365))), - _5y: create(Filter::Time(TimeFilter::LowerThan(5 * 365))), - _6y: 
create(Filter::Time(TimeFilter::LowerThan(6 * 365))), - _7y: create(Filter::Time(TimeFilter::LowerThan(7 * 365))), - _8y: create(Filter::Time(TimeFilter::LowerThan(8 * 365))), - _10y: create(Filter::Time(TimeFilter::LowerThan(10 * 365))), - _12y: create(Filter::Time(TimeFilter::LowerThan(12 * 365))), - _15y: create(Filter::Time(TimeFilter::LowerThan(15 * 365))), + _1w: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[1]))), + _1m: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[2]))), + _2m: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[3]))), + _3m: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[4]))), + _4m: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[5]))), + _5m: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[6]))), + _6m: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[7]))), + _1y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[8]))), + _2y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[9]))), + _3y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[10]))), + _4y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[11]))), + _5y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[12]))), + _6y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[13]))), + _7y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[14]))), + _8y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[15]))), + _10y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[16]))), + _12y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[17]))), + _15y: create(Filter::Time(TimeFilter::LowerThan(AGE_BOUNDARIES[18]))), } } diff --git a/crates/brk_grouper/src/by_min_age.rs b/crates/brk_grouper/src/by_min_age.rs index 993316f81..91c269bd3 100644 --- a/crates/brk_grouper/src/by_min_age.rs +++ b/crates/brk_grouper/src/by_min_age.rs @@ -1,7 +1,7 @@ use brk_traversable::Traversable; use rayon::prelude::*; -use super::{Filter, TimeFilter}; +use super::{Filter, 
TimeFilter, AGE_BOUNDARIES}; #[derive(Default, Clone, Traversable)] pub struct ByMinAge { @@ -31,24 +31,24 @@ impl ByMinAge { F: FnMut(Filter) -> T, { Self { - _1d: create(Filter::Time(TimeFilter::GreaterOrEqual(1))), - _1w: create(Filter::Time(TimeFilter::GreaterOrEqual(7))), - _1m: create(Filter::Time(TimeFilter::GreaterOrEqual(30))), - _2m: create(Filter::Time(TimeFilter::GreaterOrEqual(2 * 30))), - _3m: create(Filter::Time(TimeFilter::GreaterOrEqual(3 * 30))), - _4m: create(Filter::Time(TimeFilter::GreaterOrEqual(4 * 30))), - _5m: create(Filter::Time(TimeFilter::GreaterOrEqual(5 * 30))), - _6m: create(Filter::Time(TimeFilter::GreaterOrEqual(6 * 30))), - _1y: create(Filter::Time(TimeFilter::GreaterOrEqual(365))), - _2y: create(Filter::Time(TimeFilter::GreaterOrEqual(2 * 365))), - _3y: create(Filter::Time(TimeFilter::GreaterOrEqual(3 * 365))), - _4y: create(Filter::Time(TimeFilter::GreaterOrEqual(4 * 365))), - _5y: create(Filter::Time(TimeFilter::GreaterOrEqual(5 * 365))), - _6y: create(Filter::Time(TimeFilter::GreaterOrEqual(6 * 365))), - _7y: create(Filter::Time(TimeFilter::GreaterOrEqual(7 * 365))), - _8y: create(Filter::Time(TimeFilter::GreaterOrEqual(8 * 365))), - _10y: create(Filter::Time(TimeFilter::GreaterOrEqual(10 * 365))), - _12y: create(Filter::Time(TimeFilter::GreaterOrEqual(12 * 365))), + _1d: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[0]))), + _1w: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[1]))), + _1m: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[2]))), + _2m: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[3]))), + _3m: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[4]))), + _4m: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[5]))), + _5m: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[6]))), + _6m: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[7]))), + _1y: 
create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[8]))), + _2y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[9]))), + _3y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[10]))), + _4y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[11]))), + _5y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[12]))), + _6y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[13]))), + _7y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[14]))), + _8y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[15]))), + _10y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[16]))), + _12y: create(Filter::Time(TimeFilter::GreaterOrEqual(AGE_BOUNDARIES[17]))), } } diff --git a/crates/brk_indexer/examples/indexer.rs b/crates/brk_indexer/examples/indexer.rs index 55d6b9303..fc150f90b 100644 --- a/crates/brk_indexer/examples/indexer.rs +++ b/crates/brk_indexer/examples/indexer.rs @@ -12,6 +12,10 @@ use brk_rpc::{Auth, Client}; use log::{debug, info}; use vecdb::Exit; +use mimalloc::MiMalloc; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + fn main() -> color_eyre::Result<()> { color_eyre::install()?; diff --git a/crates/brk_indexer/examples/indexer_read.rs b/crates/brk_indexer/examples/indexer_read.rs index e168b7a6c..e7ba35c87 100644 --- a/crates/brk_indexer/examples/indexer_read.rs +++ b/crates/brk_indexer/examples/indexer_read.rs @@ -3,6 +3,10 @@ use brk_indexer::Indexer; // use brk_types::Sats; use std::{fs, path::Path}; +use mimalloc::MiMalloc; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + fn main() -> Result<()> { brk_logger::init(Some(Path::new(".log")))?; diff --git a/crates/brk_indexer/examples/indexer_read_speed.rs b/crates/brk_indexer/examples/indexer_read_speed.rs index 6934e4a8d..4bf236a5e 100644 --- a/crates/brk_indexer/examples/indexer_read_speed.rs +++ b/crates/brk_indexer/examples/indexer_read_speed.rs @@ -3,6 +3,10 @@ use brk_indexer::Indexer; use brk_types::Sats; use std::{fs,
path::Path, time::Instant}; +use mimalloc::MiMalloc; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + fn run_benchmark(indexer: &Indexer) -> (Sats, std::time::Duration, usize) { let start = Instant::now(); let mut sum = Sats::ZERO; diff --git a/scripts/samply_computer.sh b/scripts/samply_computer.sh new file mode 100755 index 000000000..242ae5d8b --- /dev/null +++ b/scripts/samply_computer.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +cargo build --profile profiling --example computer && samply record ./target/profiling/examples/computer