From 5cc3fbfa6e10fcd984da1468e938ed8593383cf8 Mon Sep 17 00:00:00 2001 From: nym21 Date: Tue, 12 May 2026 22:33:09 +0200 Subject: [PATCH] crates: snapshot --- crates/brk_client/src/lib.rs | 4 +- crates/brk_computer/src/prices/compute.rs | 105 +++-------- crates/brk_mempool/Cargo.toml | 1 + crates/brk_mempool/src/lib.rs | 30 +-- crates/brk_mempool/src/stores/mod.rs | 2 + crates/brk_mempool/src/stores/output_bins.rs | 23 +++ crates/brk_mempool/src/stores/tx_store.rs | 25 ++- crates/brk_oracle/examples/compare_digits.rs | 18 +- crates/brk_oracle/examples/determinism.rs | 20 +- crates/brk_oracle/examples/noise.rs | 29 +-- crates/brk_oracle/examples/report.rs | 19 +- crates/brk_oracle/examples/sweep_digits.rs | 18 +- crates/brk_oracle/examples/sweep_tolerance.rs | 18 +- crates/brk_oracle/examples/validate.rs | 20 +- crates/brk_oracle/src/config.rs | 40 ++++ crates/brk_oracle/src/histogram.rs | 41 +++++ crates/brk_oracle/src/lib.rs | 172 ++++++++---------- crates/brk_query/Cargo.toml | 1 + crates/brk_query/src/impl/addr.rs | 55 +++--- crates/brk_query/src/impl/price.rs | 60 +++++- crates/brk_query/src/lib.rs | 8 +- crates/brk_server/src/api/addrs.rs | 39 +++- crates/brk_server/src/api/mining.rs | 3 +- crates/brk_server/src/api/server.rs | 10 +- crates/brk_server/src/state.rs | 51 +++++- 25 files changed, 450 insertions(+), 362 deletions(-) create mode 100644 crates/brk_mempool/src/stores/output_bins.rs create mode 100644 crates/brk_oracle/src/config.rs create mode 100644 crates/brk_oracle/src/histogram.rs diff --git a/crates/brk_client/src/lib.rs b/crates/brk_client/src/lib.rs index ab0a03702..be6c2a7ff 100644 --- a/crates/brk_client/src/lib.rs +++ b/crates/brk_client/src/lib.rs @@ -9009,7 +9009,7 @@ impl BrkClient { /// Health check /// - /// Returns the health status of the API server, including uptime information. + /// Liveness probe. Returns server identity, uptime, and indexed/computed heights from local state only (no bitcoind round-trip). For real chain-tip catch-up, see `/api/server/sync`. /// /// Endpoint: `GET /health` pub fn get_health(&self) -> Result { @@ -9294,7 +9294,7 @@ impl BrkClient { /// Address transactions /// - /// Get transaction history for an address, sorted with newest first. Returns up to 50 entries: mempool transactions first, then confirmed transactions filling the remainder. To paginate further confirmed transactions, use `/address/{address}/txs/chain/{last_seen_txid}`. + /// Get transaction history for an address, newest first. Returns up to 50 mempool transactions plus a confirmed page sized to fill the response to 50 total (chain floor of 25, so 25-50 confirmed depending on mempool weight). To paginate further confirmed history, use `/address/{address}/txs/chain/{last_seen_txid}`. /// /// *[Mempool.space docs](https://mempool.space/docs/api/rest#get-address-transactions)* /// diff --git a/crates/brk_computer/src/prices/compute.rs b/crates/brk_computer/src/prices/compute.rs index 01ca8672c..24facff32 100644 --- a/crates/brk_computer/src/prices/compute.rs +++ b/crates/brk_computer/src/prices/compute.rs @@ -1,8 +1,8 @@ use std::ops::Range; -use brk_error::{Error, Result}; +use brk_error::Result; use brk_indexer::{Indexer, Lengths}; -use brk_oracle::{Config, NUM_BINS, Oracle, START_HEIGHT, bin_to_cents, cents_to_bin}; +use brk_oracle::{Config, Histogram, Oracle, START_HEIGHT, bin_to_cents, cents_to_bin}; use brk_types::{Cents, OutputType, Sats, TxIndex, TxOutIndex}; use tracing::info; use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, StorageMode, VecIndex, WritableVec}; @@ -112,7 +112,7 @@ impl Vecs { let seed_bin = cents_to_bin(prev_cents.inner() as f64); let warmup = config.window_size.min(committed - START_HEIGHT); let mut oracle = Oracle::from_checkpoint(seed_bin, config, |o| { - Self::feed_blocks(o, indexer, (committed - warmup)..committed); + Self::feed_blocks(o, indexer, (committed - warmup)..committed, None); }); let num_new = total_heights - committed; @@ -121,7 +121,8 @@ impl Vecs { committed, total_heights ); - let ref_bins = Self::feed_blocks(&mut oracle, indexer, committed..total_heights); + let ref_bins = + Self::feed_blocks(&mut oracle, indexer, committed..total_heights, None); for (i, ref_bin) in ref_bins.into_iter().enumerate() { self.spot @@ -150,32 +151,14 @@ impl Vecs { } /// Feed a range of blocks from the indexer into an Oracle (skipping coinbase), - /// returning per-block ref_bin values. Uncapped: derives boundaries from - /// raw indexer vec lengths. Use during compute, when the indexer is - /// quiescent and `safe_lengths` is still pinned at the pre-pass value. - fn feed_blocks( + /// returning per-block ref_bin values. + /// + /// Pass `cap = None` from compute paths, when the indexer is quiescent and + /// raw vec lengths are authoritative. Pass `cap = Some(&safe_lengths)` from + /// reader paths so concurrent writer pushes past the cap are invisible. + pub fn feed_blocks( oracle: &mut Oracle, - indexer: &Indexer, - range: Range, - ) -> Vec { - Self::feed_blocks_inner(oracle, indexer, range, None) - } - - /// Capped variant: derives boundaries from `cap` instead of raw vec - /// lengths, so concurrent writer pushes past `cap` are invisible. - /// Reader paths (live_oracle) use this with the current `safe_lengths`. - fn feed_blocks_capped( - oracle: &mut Oracle, - indexer: &Indexer, - range: Range, - cap: &Lengths, - ) -> Vec { - Self::feed_blocks_inner(oracle, indexer, range, Some(cap)) - } - - fn feed_blocks_inner( - oracle: &mut Oracle, - indexer: &Indexer, + indexer: &Indexer, range: Range, cap: Option<&Lengths>, ) -> Vec { @@ -208,24 +191,24 @@ impl Vecs { let mut ref_bins = Vec::with_capacity(range.len()); - // Cursor avoids per-block PcoVec page decompression for - // the tx-indexed first_txout_index lookup. The accessed - // tx_index values (first_tx_index + 1) are strictly increasing - // across blocks, so the cursor only advances forward. + // Cursor avoids per-block PcoVec page decompression for the + // tx-indexed first_txout_index lookup. Accessed tx_index values + // (first_tx_index + 1) are strictly increasing across blocks, + // so the cursor only advances forward. let mut txout_cursor = indexer.vecs.transactions.first_txout_index.cursor(); - // Reusable buffers — avoid per-block allocation + // Reusable buffers: avoid per-block allocation let mut values: Vec = Vec::new(); let mut output_types: Vec = Vec::new(); - for (idx, _h) in range.enumerate() { + for idx in 0..range.len() { let first_tx_index = first_tx_indexes[idx]; let next_first_tx_index = first_tx_indexes .get(idx + 1) .copied() .unwrap_or(TxIndex::from(total_txs)); - let next_out_first = out_firsts + let out_end = out_firsts .get(idx + 1) .copied() .unwrap_or(TxOutIndex::from(total_outputs)) @@ -235,9 +218,8 @@ impl Vecs { txout_cursor.advance(target - txout_cursor.position()); txout_cursor.next().unwrap().to_usize() } else { - next_out_first + out_end }; - let out_end = next_out_first; indexer .vecs @@ -250,10 +232,10 @@ impl Vecs { &mut output_types, ); - let mut hist = [0u32; NUM_BINS]; - for i in 0..values.len() { - if let Some(bin) = oracle.output_to_bin(values[i], output_types[i]) { - hist[bin] += 1; + let mut hist = Histogram::zeros(); + for (sats, output_type) in values.iter().zip(&output_types) { + if let Some(bin) = oracle.output_to_bin(*sats, *output_type) { + hist.increment(bin); } } @@ -263,42 +245,3 @@ impl Vecs { ref_bins } } - -impl Vecs { - /// Returns an Oracle seeded from the last committed price, with the last - /// window_size blocks already processed. Ready for additional blocks (e.g. mempool). - pub fn live_oracle(&self, indexer: &Indexer) -> Result { - let config = Config::default(); - let safe_lengths = indexer.safe_lengths(); - let height = safe_lengths.height.to_usize(); - let last_idx = self - .spot - .cents - .height - .len() - .checked_sub(1) - .ok_or(Error::NotFound( - "oracle prices not yet computed".to_string(), - ))?; - let last_cents = self - .spot - .cents - .height - .collect_one_at(last_idx) - .ok_or(Error::NotFound( - "oracle prices not yet computed".to_string(), - ))?; - let seed_bin = cents_to_bin(last_cents.inner() as f64); - let window_size = config.window_size; - let oracle = Oracle::from_checkpoint(seed_bin, config, |o| { - Vecs::feed_blocks_capped( - o, - indexer, - height.saturating_sub(window_size)..height, - &safe_lengths, - ); - }); - - Ok(oracle) - } -} diff --git a/crates/brk_mempool/Cargo.toml b/crates/brk_mempool/Cargo.toml index 93418110a..ad72c0c80 100644 --- a/crates/brk_mempool/Cargo.toml +++ b/crates/brk_mempool/Cargo.toml @@ -11,6 +11,7 @@ exclude = ["examples/"] [dependencies] bitcoin = { workspace = true } brk_error = { workspace = true } +brk_oracle = { workspace = true } brk_rpc = { workspace = true } brk_types = { workspace = true } derive_more = { workspace = true } diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index eea10ab88..9db8c345a 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -32,11 +32,12 @@ use std::{ }; use brk_error::Result; +use brk_oracle::Histogram; use brk_rpc::Client; use brk_types::{ AddrBytes, AddrMempoolStats, BlockTemplate, BlockTemplateDiff, FeeRate, MempoolBlock, - MempoolInfo, MempoolRecentTx, NextBlockHash, OutpointPrefix, OutputType, Sats, Timestamp, - Transaction, TxOut, Txid, TxidPrefix, Vin, Vout, + MempoolInfo, MempoolRecentTx, NextBlockHash, OutpointPrefix, Timestamp, Transaction, TxOut, + Txid, TxidPrefix, Vin, Vout, }; use parking_lot::{RwLock, RwLockReadGuard}; use rustc_hash::{FxHashMap, FxHashSet}; @@ -236,19 +237,20 @@ impl Mempool { .collect() } - /// Apply `f` to an iterator over `(value, output_type)` for every output - /// of every live mempool tx. The lock is held for the duration of the call. - pub fn process_live_outputs( - &self, - f: impl FnOnce(&mut dyn Iterator) -> R, - ) -> R { + /// Histogram of pre-bucketed oracle bins across all live mempool tx + /// outputs. Bins are computed once on insert (see `OutputBins`), so this + /// hot path is `O(eligible outputs)` of integer increments. Used by + /// `live_price` to blend the mempool into the committed oracle without + /// re-parsing scripts per request. + pub fn live_histogram(&self) -> Histogram { + let mut hist = Histogram::zeros(); let state = self.read(); - let mut iter = state - .txs - .values() - .flat_map(|tx| &tx.output) - .map(|txout| (txout.value, txout.type_())); - f(&mut iter) + for (_, record) in state.txs.records() { + for bin in record.output_bins.iter() { + hist.increment(bin as usize); + } + } + hist } /// Effective fee rate for a live tx: snapshot's linearized chunk diff --git a/crates/brk_mempool/src/stores/mod.rs b/crates/brk_mempool/src/stores/mod.rs index 4836377f6..fbb378e96 100644 --- a/crates/brk_mempool/src/stores/mod.rs +++ b/crates/brk_mempool/src/stores/mod.rs @@ -5,10 +5,12 @@ pub(crate) mod addr_tracker; pub(crate) mod outpoint_spends; +pub(crate) mod output_bins; pub(crate) mod tx_graveyard; pub(crate) mod tx_store; pub(crate) use addr_tracker::AddrTracker; pub(crate) use outpoint_spends::OutpointSpends; +pub(crate) use output_bins::OutputBins; pub(crate) use tx_graveyard::{TxGraveyard, TxTombstone}; pub(crate) use tx_store::TxStore; diff --git a/crates/brk_mempool/src/stores/output_bins.rs b/crates/brk_mempool/src/stores/output_bins.rs new file mode 100644 index 000000000..c7166f825 --- /dev/null +++ b/crates/brk_mempool/src/stores/output_bins.rs @@ -0,0 +1,23 @@ +use brk_oracle::default_eligible_bin; +use brk_types::Transaction; +use smallvec::SmallVec; + +/// Pre-bucketed oracle bins for a tx's eligible outputs. Computed once on +/// insert so `Mempool::live_histogram` can bin all live outputs without +/// re-parsing scripts or recomputing eligibility per request. +pub struct OutputBins(SmallVec<[u16; 4]>); + +impl OutputBins { + pub fn from_tx(tx: &Transaction) -> Self { + Self( + tx.output + .iter() + .filter_map(|o| default_eligible_bin(o.value, o.type_())) + .collect(), + ) + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.0.iter().copied() + } +} diff --git a/crates/brk_mempool/src/stores/tx_store.rs b/crates/brk_mempool/src/stores/tx_store.rs index 32b70a84c..be99b8359 100644 --- a/crates/brk_mempool/src/stores/tx_store.rs +++ b/crates/brk_mempool/src/stores/tx_store.rs @@ -1,15 +1,28 @@ use brk_types::{MempoolRecentTx, Transaction, TxOut, Txid, TxidPrefix, Vin}; use rustc_hash::{FxHashMap, FxHashSet}; -use crate::TxEntry; +use crate::{TxEntry, stores::OutputBins}; const RECENT_CAP: usize = 10; -/// Per-tx record: live tx body and its mempool entry, kept under one -/// key so a single map probe returns both. +/// Per-tx record: live tx body, its mempool entry, and the pre-bucketed +/// oracle bins for its outputs. Kept under one key so a single map probe +/// returns everything readers need. pub struct TxRecord { pub tx: Transaction, pub entry: TxEntry, + pub output_bins: OutputBins, +} + +impl TxRecord { + pub fn new(tx: Transaction, entry: TxEntry) -> Self { + let output_bins = OutputBins::from_tx(&tx); + Self { + tx, + entry, + output_bins, + } + } } /// Live-pool index keyed by `TxidPrefix`. The full `Txid` lives in @@ -63,10 +76,6 @@ impl TxStore { self.records.values().map(|r| &r.entry.txid) } - pub fn values(&self) -> impl Iterator { - self.records.values().map(|r| &r.tx) - } - pub fn insert(&mut self, tx: Transaction, entry: TxEntry) { let prefix = entry.txid_prefix(); debug_assert!( @@ -77,7 +86,7 @@ impl TxStore { if tx.input.iter().any(|i| i.prevout.is_none()) { self.unresolved.insert(prefix); } - self.records.insert(prefix, TxRecord { tx, entry }); + self.records.insert(prefix, TxRecord::new(tx, entry)); } fn sample_recent(&mut self, txid: &Txid, tx: &Transaction) { diff --git a/crates/brk_oracle/examples/compare_digits.rs b/crates/brk_oracle/examples/compare_digits.rs index 71ca15596..0d1a47476 100644 --- a/crates/brk_oracle/examples/compare_digits.rs +++ b/crates/brk_oracle/examples/compare_digits.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::time::Instant; use brk_indexer::Indexer; -use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, START_HEIGHT, cents_to_bin, sats_to_bin}; +use brk_oracle::{Config, Histogram, NUM_BINS, Oracle, PRICES, cents_to_bin, sats_to_bin}; use brk_types::{OutputType, Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -159,7 +159,7 @@ fn main() { let ref_config = Config::default(); let earliest_start = *start_heights.iter().min().unwrap(); - for h in START_HEIGHT..total_heights { + for h in earliest_start..total_heights { let ft = first_tx_index[h]; let next_ft = first_tx_index .get(h + 1) @@ -187,10 +187,6 @@ fn main() { .unwrap_or(TxOutIndex::from(total_outputs)) .to_usize(); - if h < earliest_start { - continue; - } - let values: Vec = indexer .vecs .outputs @@ -203,8 +199,8 @@ fn main() { .collect_range_at(out_start, out_end); // Build full histogram and per-digit histograms. - let mut full_hist = [0u32; NUM_BINS]; - let mut digit_hist = [[0u32; NUM_BINS]; 9]; + let mut full_hist = Histogram::zeros(); + let mut digit_hist: [Histogram; 9] = std::array::from_fn(|_| Histogram::zeros()); for (sats, output_type) in values.into_iter().zip(output_types) { if ref_config.excluded_output_types.contains(&output_type) { @@ -214,11 +210,11 @@ fn main() { continue; } if let Some(bin) = sats_to_bin(sats) { - full_hist[bin] += 1; + full_hist.increment(bin); if is_round(*sats) { let d = leading_digit(*sats); if (1..=9).contains(&d) { - digit_hist[(d - 1) as usize][bin] += 1; + digit_hist[(d - 1) as usize].increment(bin); } } } @@ -227,7 +223,7 @@ fn main() { // Feed each (mask, start_height) combo. for (mi, &(mask, _)) in masks.iter().enumerate() { // Build filtered histogram for this mask. - let mut hist = full_hist; + let mut hist = full_hist.clone(); (0..9usize).for_each(|d| { if mask & (1 << d) != 0 { for bin in 0..NUM_BINS { diff --git a/crates/brk_oracle/examples/determinism.rs b/crates/brk_oracle/examples/determinism.rs index 57a54e1a8..080d23433 100644 --- a/crates/brk_oracle/examples/determinism.rs +++ b/crates/brk_oracle/examples/determinism.rs @@ -11,7 +11,9 @@ use std::path::PathBuf; use brk_indexer::Indexer; -use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, START_HEIGHT, cents_to_bin, sats_to_bin}; +use brk_oracle::{ + Config, Histogram, Oracle, PRICES, START_HEIGHT, cents_to_bin, default_eligible_bin, +}; use brk_types::{OutputType, Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -52,8 +54,6 @@ fn main() { let first_tx_index: Vec = indexer.vecs.transactions.first_tx_index.collect(); let out_first: Vec = indexer.vecs.outputs.first_txout_index.collect(); - let ref_config = Config::default(); - // Reference oracle at 575k. let ref_start = START_HEIGHT; let mut ref_oracle = Oracle::new(seed_bin(ref_start), Config::default()); @@ -112,18 +112,10 @@ fn main() { .output_type .collect_range_at(out_start, out_end); - let mut hist = [0u32; NUM_BINS]; + let mut hist = Histogram::zeros(); for (sats, output_type) in values.into_iter().zip(output_types) { - if ref_config.excluded_output_types.contains(&output_type) { - continue; - } - if *sats < ref_config.min_sats - || (ref_config.exclude_common_round_values && sats.is_common_round_value()) - { - continue; - } - if let Some(bin) = sats_to_bin(sats) { - hist[bin] += 1; + if let Some(bin) = default_eligible_bin(sats, output_type) { + hist.increment(bin as usize); } } diff --git a/crates/brk_oracle/examples/noise.rs b/crates/brk_oracle/examples/noise.rs index aae9d09c3..04463570b 100644 --- a/crates/brk_oracle/examples/noise.rs +++ b/crates/brk_oracle/examples/noise.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::time::Instant; use brk_indexer::Indexer; -use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, cents_to_bin, sats_to_bin}; +use brk_oracle::{Config, Histogram, Oracle, PRICES, cents_to_bin, default_eligible_bin}; use brk_types::{Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -19,7 +19,7 @@ fn bins_to_pct(bins: f64) -> f64 { (10.0_f64.powf(bins / BPD) - 1.0) * 100.0 } -fn price_seed_bin(start_height: usize) -> f64 { +fn seed_bin(start_height: usize) -> f64 { let price: f64 = PRICES .lines() .nth(start_height - 1) @@ -30,9 +30,7 @@ fn price_seed_bin(start_height: usize) -> f64 { } /// Clamp the top N bins in `src` down to the (N+1)th highest value, writing into `dst`. -fn clamp_top_n(src: &[u32; NUM_BINS], dst: &mut [u32; NUM_BINS], n: usize) { - // Find the (n+1)th largest value. - // Collect non-zero counts, sort descending, take the (n+1)th. +fn clamp_top_n(src: &Histogram, dst: &mut Histogram, n: usize) { let mut top: Vec = src.iter().copied().filter(|&v| v > 0).collect(); top.sort_unstable_by(|a, b| b.cmp(a)); let clamp_to = if top.len() > n { top[n] } else { 0 }; @@ -102,7 +100,7 @@ fn main() { let total_blocks = total_heights - lowest; struct BlockData { - hist: Box<[u32; NUM_BINS]>, + hist: Histogram, high_bin: f64, low_bin: f64, } @@ -144,21 +142,12 @@ fn main() { .unwrap_or(TxOutIndex::from(total_outputs)) .to_usize(); - let mut hist = Box::new([0u32; NUM_BINS]); + let mut hist = Histogram::zeros(); for i in out_start..out_end { let sats: Sats = value_reader.get(i); let output_type = output_type_reader.get(i); - if config.excluded_output_types.contains(&output_type) { - continue; - } - if *sats < config.min_sats { - continue; - } - if config.exclude_common_round_values && sats.is_common_round_value() { - continue; - } - if let Some(bin) = sats_to_bin(sats) { - hist[bin] += 1; + if let Some(bin) = default_eligible_bin(sats, output_type) { + hist.increment(bin as usize); } } @@ -206,7 +195,7 @@ fn main() { println!("{}", "-".repeat(72)); for &start_height in &start_heights { - let mut oracle = Oracle::new(price_seed_bin(start_height), config.clone()); + let mut oracle = Oracle::new(seed_bin(start_height), config.clone()); let block_offset = start_height - lowest; let mut worst_err: f64 = 0.0; @@ -217,7 +206,7 @@ fn main() { let mut total_sq_err: f64 = 0.0; let mut total_measured: u64 = 0; - let mut clamped_hist = [0u32; NUM_BINS]; + let mut clamped_hist = Histogram::zeros(); for (i, bd) in blocks[block_offset..].iter().enumerate() { if clamp_n > 0 { clamp_top_n(&bd.hist, &mut clamped_hist, clamp_n); diff --git a/crates/brk_oracle/examples/report.rs b/crates/brk_oracle/examples/report.rs index 375e67f42..b125322aa 100644 --- a/crates/brk_oracle/examples/report.rs +++ b/crates/brk_oracle/examples/report.rs @@ -6,7 +6,8 @@ use std::path::PathBuf; use brk_indexer::Indexer; use brk_oracle::{ - Config, NUM_BINS, Oracle, PRICES, START_HEIGHT, bin_to_cents, cents_to_bin, sats_to_bin, + Config, Histogram, Oracle, PRICES, START_HEIGHT, bin_to_cents, cents_to_bin, + default_eligible_bin, }; use brk_types::{OutputType, Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -188,8 +189,6 @@ fn main() { let first_tx_index: Vec = indexer.vecs.transactions.first_tx_index.collect(); let out_first: Vec = indexer.vecs.outputs.first_txout_index.collect(); - let ref_config = Config::default(); - let mut year_stats: Vec = Vec::new(); let mut overall = YearStats::new(0); let mut worst_blocks: Vec = Vec::new(); @@ -238,18 +237,10 @@ fn main() { .output_type .collect_range_at(out_start, out_end); - let mut hist = [0u32; NUM_BINS]; + let mut hist = Histogram::zeros(); for (sats, output_type) in values.into_iter().zip(output_types) { - if ref_config.excluded_output_types.contains(&output_type) { - continue; - } - if *sats < ref_config.min_sats - || (ref_config.exclude_common_round_values && sats.is_common_round_value()) - { - continue; - } - if let Some(bin) = sats_to_bin(sats) { - hist[bin] += 1; + if let Some(bin) = default_eligible_bin(sats, output_type) { + hist.increment(bin as usize); } } diff --git a/crates/brk_oracle/examples/sweep_digits.rs b/crates/brk_oracle/examples/sweep_digits.rs index a022cfff7..0d412d642 100644 --- a/crates/brk_oracle/examples/sweep_digits.rs +++ b/crates/brk_oracle/examples/sweep_digits.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; use std::time::Instant; use brk_indexer::Indexer; -use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, START_HEIGHT, cents_to_bin, sats_to_bin}; +use brk_oracle::{Config, Histogram, Oracle, PRICES, cents_to_bin, sats_to_bin}; use brk_types::{OutputType, Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -117,7 +117,7 @@ impl Stats { } struct BlockData { - full_hist: Box<[u32; NUM_BINS]>, + full_hist: Histogram, /// (bin_index, leading_digit) for outputs that are round values. round_outputs: Vec<(u16, u8)>, high_bin: f64, @@ -173,7 +173,7 @@ fn main() { let total_blocks = total_heights - sweep_start; let mut blocks: Vec = Vec::with_capacity(total_blocks); - for h in START_HEIGHT..total_heights { + for h in sweep_start..total_heights { let ft = first_tx_index[h]; let next_ft = first_tx_index .get(h + 1) @@ -201,10 +201,6 @@ fn main() { .unwrap_or(TxOutIndex::from(total_outputs)) .to_usize(); - if h < sweep_start { - continue; - } - let values: Vec = indexer .vecs .outputs @@ -216,7 +212,7 @@ fn main() { .output_type .collect_range_at(out_start, out_end); - let mut full_hist = Box::new([0u32; NUM_BINS]); + let mut full_hist = Histogram::zeros(); let mut round_outputs = Vec::new(); for (sats, output_type) in values.into_iter().zip(output_types) { @@ -227,7 +223,7 @@ fn main() { continue; } if let Some(bin) = sats_to_bin(sats) { - full_hist[bin] += 1; + full_hist.increment(bin); if is_round(*sats) { let d = leading_digit(*sats); if (1..=9).contains(&d) { @@ -260,7 +256,7 @@ fn main() { } } - let mem_hists = blocks.len() * std::mem::size_of::<[u32; NUM_BINS]>(); + let mem_hists = blocks.len() * std::mem::size_of::(); let mem_rounds: usize = blocks.iter().map(|b| b.round_outputs.len() * 3).sum(); eprintln!( "\r {} blocks precomputed ({:.1} GB hists + {:.0} MB rounds) in {:.1}s", @@ -308,7 +304,7 @@ fn main() { let mut stats = Stats::new(); for bd in blocks.iter() { - let mut hist = *bd.full_hist; + let mut hist = bd.full_hist.clone(); for &(bin, digit) in &bd.round_outputs { if mask & (1 << (digit - 1)) != 0 { hist[bin as usize] -= 1; diff --git a/crates/brk_oracle/examples/sweep_tolerance.rs b/crates/brk_oracle/examples/sweep_tolerance.rs index d33a57fdc..f64c9d0a6 100644 --- a/crates/brk_oracle/examples/sweep_tolerance.rs +++ b/crates/brk_oracle/examples/sweep_tolerance.rs @@ -12,7 +12,7 @@ use std::path::PathBuf; use std::time::Instant; use brk_indexer::Indexer; -use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, START_HEIGHT, cents_to_bin, sats_to_bin}; +use brk_oracle::{Config, Histogram, Oracle, PRICES, cents_to_bin, sats_to_bin}; use brk_types::{OutputType, Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -114,7 +114,7 @@ struct RoundOutput { } struct BlockData { - full_hist: Box<[u32; NUM_BINS]>, + full_hist: Histogram, round_outputs: Vec, high_bin: f64, low_bin: f64, @@ -175,7 +175,7 @@ fn main() { // Outputs beyond 5% relative error will never be filtered at any tolerance. let max_tolerance: f64 = 0.05; - for h in START_HEIGHT..total_heights { + for h in sweep_start..total_heights { let ft = first_tx_index[h]; let next_ft = first_tx_index .get(h + 1) @@ -203,10 +203,6 @@ fn main() { .unwrap_or(TxOutIndex::from(total_outputs)) .to_usize(); - if h < sweep_start { - continue; - } - let values: Vec = indexer .vecs .outputs @@ -218,7 +214,7 @@ fn main() { .output_type .collect_range_at(out_start, out_end); - let mut full_hist = Box::new([0u32; NUM_BINS]); + let mut full_hist = Histogram::zeros(); let mut round_outputs = Vec::new(); for (sats, output_type) in values.into_iter().zip(output_types) { @@ -229,7 +225,7 @@ fn main() { continue; } if let Some(bin) = sats_to_bin(sats) { - full_hist[bin] += 1; + full_hist.increment(bin); let d = leading_digit(*sats); if (1..=9).contains(&d) { let rel_err = relative_roundness(*sats); @@ -267,7 +263,7 @@ fn main() { } } - let mem_hists = blocks.len() * std::mem::size_of::<[u32; NUM_BINS]>(); + let mem_hists = blocks.len() * std::mem::size_of::(); let mem_rounds: usize = blocks .iter() .map(|b| b.round_outputs.len() * std::mem::size_of::()) @@ -350,7 +346,7 @@ fn main() { let mut stats = Stats::new(); for bd in blocks.iter() { - let mut hist = *bd.full_hist; + let mut hist = bd.full_hist.clone(); // Remove outputs matching this tolerance + mask. let tol_f32 = tolerance as f32; diff --git a/crates/brk_oracle/examples/validate.rs b/crates/brk_oracle/examples/validate.rs index 07941c731..9a5af5244 100644 --- a/crates/brk_oracle/examples/validate.rs +++ b/crates/brk_oracle/examples/validate.rs @@ -9,7 +9,9 @@ use std::path::PathBuf; use brk_indexer::Indexer; -use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, START_HEIGHT, cents_to_bin, sats_to_bin}; +use brk_oracle::{ + Config, Histogram, Oracle, PRICES, START_HEIGHT, cents_to_bin, default_eligible_bin, +}; use brk_types::{OutputType, Sats, TxIndex, TxOutIndex}; use vecdb::{AnyVec, ReadableVec, VecIndex}; @@ -155,8 +157,6 @@ fn main() { let first_tx_index: Vec = indexer.vecs.transactions.first_tx_index.collect(); let out_first: Vec = indexer.vecs.outputs.first_txout_index.collect(); - let ref_config = Config::default(); - for h in START_HEIGHT..total_heights { let ft = first_tx_index[h]; let next_ft = first_tx_index @@ -197,18 +197,10 @@ fn main() { .output_type .collect_range_at(out_start, out_end); - let mut hist = [0u32; NUM_BINS]; + let mut hist = Histogram::zeros(); for (sats, output_type) in values.into_iter().zip(output_types) { - if ref_config.excluded_output_types.contains(&output_type) { - continue; - } - if *sats < ref_config.min_sats - || (ref_config.exclude_common_round_values && sats.is_common_round_value()) - { - continue; - } - if let Some(bin) = sats_to_bin(sats) { - hist[bin] += 1; + if let Some(bin) = default_eligible_bin(sats, output_type) { + hist.increment(bin as usize); } } diff --git a/crates/brk_oracle/src/config.rs b/crates/brk_oracle/src/config.rs new file mode 100644 index 000000000..3d88e8deb --- /dev/null +++ b/crates/brk_oracle/src/config.rs @@ -0,0 +1,40 @@ +use brk_types::OutputType; + +/// Dust floor used by `Config::default()` and `default_eligible_bin`. +pub(crate) const DEFAULT_MIN_SATS: u64 = 1000; + +/// Output types skipped by `Config::default()` (noisy) and the source of +/// truth for `default_eligible_bin`'s precomputed exclusion mask. +pub(crate) const DEFAULT_EXCLUDED_OUTPUT_TYPES: &[OutputType] = + &[OutputType::P2TR, OutputType::P2WSH]; + +#[derive(Clone)] +pub struct Config { + /// EMA decay: 2/(N+1) where N is span in blocks. 2/7 = 6-block span. + pub alpha: f64, + /// Ring buffer depth. 12 blocks for deterministic convergence at any start height. + pub window_size: usize, + /// Search window bins below/above previous estimate. Asymmetric for log-scale. + pub search_below: usize, + pub search_above: usize, + /// Minimum output value in sats (dust filter). + pub min_sats: u64, + /// Exclude round BTC amounts that create false stencil matches. + pub exclude_common_round_values: bool, + /// Output types to ignore (e.g. P2TR, P2WSH are noisy). + pub excluded_output_types: Vec, +} + +impl Default for Config { + fn default() -> Self { + Self { + alpha: 2.0 / 7.0, + window_size: 12, + search_below: 9, + search_above: 11, + min_sats: DEFAULT_MIN_SATS, + exclude_common_round_values: true, + excluded_output_types: DEFAULT_EXCLUDED_OUTPUT_TYPES.to_vec(), + } + } +} diff --git a/crates/brk_oracle/src/histogram.rs b/crates/brk_oracle/src/histogram.rs new file mode 100644 index 000000000..2d901486a --- /dev/null +++ b/crates/brk_oracle/src/histogram.rs @@ -0,0 +1,41 @@ +use crate::NUM_BINS; + +/// Per-block oracle histogram: count of eligible outputs per bin. Wraps +/// the raw `[u32; NUM_BINS]` so callers can't pass arbitrary bin-indexed +/// arrays to `Oracle::process_histogram`. Deref to the underlying array +/// gives indexing for read paths. +#[derive(Clone)] +pub struct Histogram([u32; NUM_BINS]); + +impl Histogram { + #[inline] + pub fn zeros() -> Self { + Self([0; NUM_BINS]) + } + + #[inline] + pub fn increment(&mut self, bin: usize) { + self.0[bin] += 1; + } +} + +impl Default for Histogram { + fn default() -> Self { + Self::zeros() + } +} + +impl std::ops::Deref for Histogram { + type Target = [u32; NUM_BINS]; + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for Histogram { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/crates/brk_oracle/src/lib.rs b/crates/brk_oracle/src/lib.rs index 5e2374621..a5c06bb93 100644 --- a/crates/brk_oracle/src/lib.rs +++ b/crates/brk_oracle/src/lib.rs @@ -3,7 +3,14 @@ //! Detects round-dollar transaction patterns ($1, $5, $10, ... $10,000) in Bitcoin //! block outputs to derive the current price without any exchange data. -use brk_types::{Block, Cents, Dollars, OutputType, Sats}; +use brk_types::{Cents, Dollars, OutputType, Sats}; + +mod config; +mod histogram; + +use config::{DEFAULT_EXCLUDED_OUTPUT_TYPES, DEFAULT_MIN_SATS}; +pub use config::Config; +pub use histogram::Histogram; /// Pre-oracle dollar prices, one per line, heights 0..630_000. pub const PRICES: &str = include_str!("prices.txt"); @@ -55,6 +62,33 @@ pub fn sats_to_bin(sats: Sats) -> Option { } } +/// Bitmask form of `DEFAULT_EXCLUDED_OUTPUT_TYPES`, evaluated at compile +/// time so `default_eligible_bin` checks membership with a single AND. +const DEFAULT_EXCLUDED_MASK: u16 = { + let mut mask = 0u16; + let mut i = 0; + while i < DEFAULT_EXCLUDED_OUTPUT_TYPES.len() { + mask |= 1u16 << DEFAULT_EXCLUDED_OUTPUT_TYPES[i] as u8; + i += 1; + } + mask +}; + +/// Bin index for `(sats, output_type)` under `Config::default()` rules. +/// Returns `None` for excluded types (P2TR/P2WSH), dust, round-BTC values, +/// or out-of-range bins. Mirror of `Oracle::output_to_bin` for callers that +/// can pre-bin outputs at write time and don't have an `Oracle` handle. +#[inline(always)] +pub fn default_eligible_bin(sats: Sats, output_type: OutputType) -> Option { + if DEFAULT_EXCLUDED_MASK & (1u16 << output_type as u8) != 0 { + return None; + } + if *sats < DEFAULT_MIN_SATS || sats.is_common_round_value() { + return None; + } + sats_to_bin(sats).map(|b| b as u16) +} + /// Converts a fractional bin to a USD price in cents. /// For a $D output at price P: sats = D * 1e8 / P, so P = 10^(10 - bin/200) dollars, /// where 10 = log10($100 reference * 1e8 sats/BTC). @@ -140,40 +174,9 @@ fn find_best_bin( best_bin as f64 + sub_bin } -#[derive(Clone)] -pub struct Config { - /// EMA decay: 2/(N+1) where N is span in blocks. 2/7 = 6-block span. - pub alpha: f64, - /// Ring buffer depth. 12 blocks for deterministic convergence at any start height. - pub window_size: usize, - /// Search window bins below/above previous estimate. Asymmetric for log-scale. - pub search_below: usize, - pub search_above: usize, - /// Minimum output value in sats (dust filter). - pub min_sats: u64, - /// Exclude round BTC amounts that create false stencil matches. - pub exclude_common_round_values: bool, - /// Output types to ignore (e.g. P2TR, P2WSH are noisy). - pub excluded_output_types: Vec, -} - -impl Default for Config { - fn default() -> Self { - Self { - alpha: 2.0 / 7.0, - window_size: 12, - search_below: 9, - search_above: 11, - min_sats: 1000, - exclude_common_round_values: true, - excluded_output_types: vec![OutputType::P2TR, OutputType::P2WSH], - } - } -} - #[derive(Clone)] pub struct Oracle { - histograms: Vec<[u32; NUM_BINS]>, + histograms: Vec, ema: Box<[f64; NUM_BINS]>, cursor: usize, filled: usize, @@ -196,7 +199,7 @@ impl Oracle { .iter() .fold(0u16, |mask, ot| mask | (1 << *ot as u8)); Self { - histograms: vec![[0u32; NUM_BINS]; window_size], + histograms: vec![Histogram::zeros(); window_size], ema: Box::new([0.0; NUM_BINS]), cursor: 0, filled: 0, @@ -208,81 +211,21 @@ impl Oracle { } } - pub fn process_block(&mut self, block: &Block) -> f64 { - self.process_outputs( - block - .txdata - .iter() - .skip(1) // skip coinbase - .flat_map(|tx| &tx.output) - .map(|txout| { - ( - Sats::from(txout.value), - OutputType::from(&txout.script_pubkey), - ) - }), - ) - } - - pub fn process_outputs(&mut self, outputs: impl Iterator) -> f64 { - let mut hist = [0u32; NUM_BINS]; - for (sats, output_type) in outputs { - if let Some(bin) = self.eligible_bin(sats, output_type) { - hist[bin] += 1; - } - } - self.ingest(&hist) - } - - /// Create an oracle restored from a known price. - /// `fill` should feed warmup blocks to populate the ring buffer. - /// ref_bin is anchored to the checkpoint regardless of warmup drift. + /// Create an oracle restored from a known price. `fill` should call + /// `process_histogram` for the warmup blocks; during warmup the ring + /// fills without recomputing EMA or searching, then we recompute once + /// at the end so the first non-warmup call has a primed EMA. pub fn from_checkpoint(ref_bin: f64, config: Config, fill: impl FnOnce(&mut Self)) -> Self { let mut oracle = Self::new(ref_bin, config); oracle.warmup = true; fill(&mut oracle); oracle.warmup = false; oracle.recompute_ema(); - oracle.ref_bin = ref_bin; oracle } - pub fn process_histogram(&mut self, hist: &[u32; NUM_BINS]) -> f64 { - self.ingest(hist) - } - - pub fn ref_bin(&self) -> f64 { - self.ref_bin - } - - pub fn price_cents(&self) -> Cents { - bin_to_cents(self.ref_bin).into() - } - - pub fn price_dollars(&self) -> Dollars { - self.price_cents().into() - } - - #[inline(always)] - pub fn output_to_bin(&self, sats: Sats, output_type: OutputType) -> Option { - self.eligible_bin(sats, output_type) - } - - #[inline(always)] - fn eligible_bin(&self, sats: Sats, output_type: OutputType) -> Option { - if self.excluded_mask & (1 << output_type as u8) != 0 { - return None; - } - if *sats < self.config.min_sats - || (self.config.exclude_common_round_values && sats.is_common_round_value()) - { - return None; - } - sats_to_bin(sats) - } - - fn ingest(&mut self, hist: &[u32; NUM_BINS]) -> f64 { - self.histograms[self.cursor] = *hist; + pub fn process_histogram(&mut self, hist: &Histogram) -> f64 { + self.histograms[self.cursor] = hist.clone(); self.cursor = (self.cursor + 1) % self.config.window_size; if self.filled < self.config.window_size { self.filled += 1; @@ -301,6 +244,35 @@ impl Oracle { self.ref_bin } + pub fn ref_bin(&self) -> f64 { + self.ref_bin + } + + pub fn price_cents(&self) -> Cents { + bin_to_cents(self.ref_bin).into() + } + + pub fn price_dollars(&self) -> Dollars { + self.price_cents().into() + } + + /// Config-aware bin index for `(sats, output_type)`. Returns `None` + /// for excluded types, dust, round-BTC values, or out-of-range bins. + /// Callers under `Config::default()` should use `default_eligible_bin` + /// (free function) to skip the `&self` indirection. + #[inline(always)] + pub fn output_to_bin(&self, sats: Sats, output_type: OutputType) -> Option { + if self.excluded_mask & (1 << output_type as u8) != 0 { + return None; + } + if *sats < self.config.min_sats + || (self.config.exclude_common_round_values && sats.is_common_round_value()) + { + return None; + } + sats_to_bin(sats) + } + fn recompute_ema(&mut self) { self.ema.fill(0.0); for age in 0..self.filled { diff --git a/crates/brk_query/Cargo.toml b/crates/brk_query/Cargo.toml index 84e8615b3..70701ba49 100644 --- a/crates/brk_query/Cargo.toml +++ b/crates/brk_query/Cargo.toml @@ -17,6 +17,7 @@ brk_computer = { workspace = true } brk_error = { workspace = true, features = ["jiff", "vecdb"] } brk_indexer = { workspace = true } brk_mempool = { workspace = true } +brk_oracle = { workspace = true } brk_reader = { workspace = true } brk_rpc = { workspace = true } brk_traversable = { workspace = true } diff --git a/crates/brk_query/src/impl/addr.rs b/crates/brk_query/src/impl/addr.rs index 40fb80e43..b1958ca57 100644 --- a/crates/brk_query/src/impl/addr.rs +++ b/crates/brk_query/src/impl/addr.rs @@ -85,25 +85,6 @@ impl Query { }) } - /// Esplora `/address/:address/txs` first page: up to `mempool_limit` - /// mempool entries (newest first), then chain entries fill the response - /// up to `total_limit`. Pagination is path-style via `/txs/chain/:after_txid`. - pub fn addr_txs( - &self, - addr: Addr, - total_limit: usize, - mempool_limit: usize, - ) -> Result> { - let mut out = if self.mempool().is_some() { - self.addr_mempool_txs(&addr, mempool_limit)? - } else { - Vec::new() - }; - let chain_limit = total_limit.saturating_sub(out.len()); - out.extend(self.addr_txs_chain(&addr, None, chain_limit)?); - Ok(out) - } - pub fn addr_txs_chain( &self, addr: &Addr, @@ -236,8 +217,15 @@ impl Query { Ok(mempool.addr_txs(&bytes, limit)) } - /// Height of the last on-chain activity for an address (last tx_index → height). - pub fn addr_last_activity_height(&self, addr: &Addr) -> Result { + /// Height of the last on-chain activity for an address (last tx_index to height). + /// With `before_txid`, returns the newest activity strictly older than that + /// cursor. Used by paginated chain etags so a new tx above the cursor + /// doesn't invalidate deeper pages. + pub fn addr_last_activity_height( + &self, + addr: &Addr, + before_txid: Option<&Txid>, + ) -> Result { let (output_type, type_index) = self.resolve_addr(addr)?; let store = self .indexer() @@ -246,12 +234,25 @@ impl Query { .get(output_type) .data()?; let tx_index_len = self.safe_lengths().tx_index; - let last_tx_index = store - .prefix(type_index) - .rev() - .map(|(key, _): (AddrIndexTxIndex, Unit)| key.tx_index()) - .find(|tx_index| *tx_index < tx_index_len) - .ok_or(Error::UnknownAddr)?; + let last_tx_index = match before_txid { + Some(txid) => { + let before_tx_index = self.resolve_tx_index(txid)?; + let min = AddrIndexTxIndex::min_for_addr(type_index); + let cursor = AddrIndexTxIndex::from((type_index, before_tx_index)); + store + .range(min..cursor) + .rev() + .map(|(key, _): (AddrIndexTxIndex, Unit)| key.tx_index()) + .find(|tx_index| *tx_index < tx_index_len) + .ok_or(Error::UnknownAddr)? + } + None => store + .prefix(type_index) + .rev() + .map(|(key, _): (AddrIndexTxIndex, Unit)| key.tx_index()) + .find(|tx_index| *tx_index < tx_index_len) + .ok_or(Error::UnknownAddr)?, + }; self.confirmed_status_height(last_tx_index) } diff --git a/crates/brk_query/src/impl/price.rs b/crates/brk_query/src/impl/price.rs index 3c10d7407..482833ad3 100644 --- a/crates/brk_query/src/impl/price.rs +++ b/crates/brk_query/src/impl/price.rs @@ -1,20 +1,68 @@ -use brk_error::Result; +use std::sync::Arc; + +use brk_computer::prices::Vecs as PricesVecs; +use brk_error::{Error, Result}; +use brk_oracle::{Config, Oracle, cents_to_bin}; use brk_types::{ Dollars, ExchangeRates, HistoricalPrice, HistoricalPriceEntry, Hour4, INDEX_EPOCH, Timestamp, }; -use vecdb::ReadableVec; +use vecdb::{AnyVec, ReadableVec, VecIndex}; use crate::Query; impl Query { pub fn live_price(&self) -> Result { - let mut oracle = self.computer().prices.live_oracle(self.indexer())?; + let base = self.cached_oracle()?; + Ok(match self.mempool() { + Some(mempool) => { + let mut oracle = (*base).clone(); + oracle.process_histogram(&mempool.live_histogram()); + oracle.price_dollars() + } + None => base.price_dollars(), + }) + } - if let Some(mempool) = self.mempool() { - mempool.process_live_outputs(|iter| oracle.process_outputs(iter)); + /// Oracle warmed by the last `window_size` committed blocks, seeded from + /// the last committed price. Cached per tip height; rebuilt on advance or + /// reorg. Reads are capped at `safe_lengths` so concurrent indexer writes + /// stay invisible. + fn cached_oracle(&self) -> Result> { + let safe_lengths = self.safe_lengths(); + let height = safe_lengths.height; + + if let Some(oracle) = self + .0 + .live_oracle + .read() + .unwrap() + .as_ref() + .filter(|(h, _)| *h == height) + .map(|(_, o)| o.clone()) + { + return Ok(oracle); } - Ok(oracle.price_dollars()) + let cents_height = &self.computer().prices.spot.cents.height; + let last_cents = cents_height + .len() + .checked_sub(1) + .and_then(|i| cents_height.collect_one_at(i)) + .ok_or_else(|| Error::NotFound("oracle prices not yet computed".to_string()))?; + + let config = Config::default(); + let seed_bin = cents_to_bin(last_cents.inner() as f64); + let tip = height.to_usize(); + let warmup_range = tip.saturating_sub(config.window_size)..tip; + let oracle = Arc::new(Oracle::from_checkpoint(seed_bin, config, |o| { + PricesVecs::feed_blocks(o, self.indexer(), warmup_range, Some(&safe_lengths)); + })); + + let mut cache = self.0.live_oracle.write().unwrap(); + if cache.as_ref().is_none_or(|(h, _)| *h != height) { + *cache = Some((height, oracle.clone())); + } + Ok(oracle) } pub fn historical_price(&self, timestamp: Option) -> Result { diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index 034611592..e59d5c61d 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -1,12 +1,16 @@ #![doc = include_str!("../README.md")] #![allow(clippy::module_inception)] -use std::{path::Path, sync::Arc}; +use std::{ + path::Path, + sync::{Arc, RwLock}, +}; use brk_computer::Computer; use brk_error::{OptionData, Result}; use brk_indexer::{Indexer, Lengths}; use brk_mempool::Mempool; +use brk_oracle::Oracle; use brk_reader::Reader; use brk_rpc::Client; use brk_types::{BlockHash, BlockHashPrefix, Height, SyncStatus}; @@ -32,6 +36,7 @@ struct QueryInner<'a> { indexer: &'a Indexer, computer: &'a Computer, mempool: Option, + live_oracle: RwLock)>>, } impl Query { @@ -54,6 +59,7 @@ impl Query { indexer, computer, mempool, + live_oracle: RwLock::new(None), })) } diff --git a/crates/brk_server/src/api/addrs.rs b/crates/brk_server/src/api/addrs.rs index fe74abfd1..a9f641776 100644 --- a/crates/brk_server/src/api/addrs.rs +++ b/crates/brk_server/src/api/addrs.rs @@ -11,6 +11,14 @@ use crate::{ params::{AddrAfterTxidParam, AddrParam, Empty, ValidateAddrParam}, }; +/// Esplora `/txs` and `/txs/chain` page sizes. Wire-protocol constants from +/// mempool.space/esplora, not deployment policy. `/txs` returns up to +/// `MEMPOOL_PAGE` mempool entries plus a chain page sized to reach +/// `TXS_TOTAL_TARGET` total, floored at `CHAIN_PAGE`. +const MEMPOOL_PAGE: usize = 50; +const CHAIN_PAGE: usize = 25; +const TXS_TOTAL_TARGET: usize = 50; + pub trait AddrRoutes { fn add_addr_routes(self) -> Self; } @@ -26,7 +34,7 @@ impl AddrRoutes for ApiRouter { _: Empty, State(state): State | { - let strategy = state.addr_strategy(Version::ONE, &path.addr, false); + let strategy = state.addr_strategy(Version::ONE, &path.addr, false, None); state.respond_json(&headers, strategy, &uri, move |q| q.addr(path.addr)).await }, |op| op .id("get_address") @@ -49,13 +57,24 @@ impl AddrRoutes for ApiRouter { _: Empty, State(state): State | { - let strategy = state.addr_strategy(Version::ONE, &path.addr, false); - state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs(path.addr, 50, 50)).await + let strategy = state.addr_strategy(Version::ONE, &path.addr, false, None); + state.respond_json(&headers, strategy, &uri, move |q| { + let mempool_txs = if q.mempool().is_some() { + q.addr_mempool_txs(&path.addr, MEMPOOL_PAGE)? + } else { + Vec::new() + }; + let chain_limit = TXS_TOTAL_TARGET.saturating_sub(mempool_txs.len()).max(CHAIN_PAGE); + let chain_txs = q.addr_txs_chain(&path.addr, None, chain_limit)?; + let mut out = mempool_txs; + out.extend(chain_txs); + Ok(out) + }).await }, |op| op .id("get_address_txs") .addrs_tag() .summary("Address transactions") - .description("Get transaction history for an address, sorted with newest first. Returns up to 50 entries: mempool transactions first, then confirmed transactions filling the remainder. To paginate further confirmed transactions, use `/address/{address}/txs/chain/{last_seen_txid}`.\n\n*[Mempool.space docs](https://mempool.space/docs/api/rest#get-address-transactions)*") + .description("Get transaction history for an address, newest first. Returns up to 50 mempool transactions plus a confirmed page sized to fill the response to 50 total (chain floor of 25, so 25-50 confirmed depending on mempool weight). To paginate further confirmed history, use `/address/{address}/txs/chain/{last_seen_txid}`.\n\n*[Mempool.space docs](https://mempool.space/docs/api/rest#get-address-transactions)*") .json_response::>() .not_modified() .bad_request() @@ -72,8 +91,8 @@ impl AddrRoutes for ApiRouter { _: Empty, State(state): State | { - let strategy = state.addr_strategy(Version::ONE, &path.addr, true); - state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs_chain(&path.addr, None, 25)).await + let strategy = state.addr_strategy(Version::ONE, &path.addr, true, None); + state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs_chain(&path.addr, None, CHAIN_PAGE)).await }, |op| op .id("get_address_confirmed_txs") .addrs_tag() @@ -95,8 +114,8 @@ impl AddrRoutes for ApiRouter { _: Empty, State(state): State | { - let strategy = state.addr_strategy(Version::ONE, &path.addr, true); - state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs_chain(&path.addr, Some(path.after_txid), 25)).await + let strategy = state.addr_strategy(Version::ONE, &path.addr, true, Some(&path.after_txid)); + state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs_chain(&path.addr, Some(path.after_txid), CHAIN_PAGE)).await }, |op| op .id("get_address_confirmed_txs_after") .addrs_tag() @@ -119,7 +138,7 @@ impl AddrRoutes for ApiRouter { State(state): State | { let hash = state.sync(|q| q.addr_mempool_hash(&path.addr)).unwrap_or(0); - state.respond_json(&headers, CacheStrategy::MempoolHash(hash), &uri, move |q| q.addr_mempool_txs(&path.addr, 50)).await + state.respond_json(&headers, CacheStrategy::MempoolHash(hash), &uri, move |q| q.addr_mempool_txs(&path.addr, MEMPOOL_PAGE)).await }, |op| op .id("get_address_mempool_txs") .addrs_tag() @@ -141,7 +160,7 @@ impl AddrRoutes for ApiRouter { _: Empty, State(state): State | { - let strategy = state.addr_strategy(Version::ONE, &path.addr, false); + let strategy = state.addr_strategy(Version::ONE, &path.addr, false, None); let max_utxos = state.max_utxos; state.respond_json(&headers, strategy, &uri, move |q| q.addr_utxos(path.addr, max_utxos)).await }, |op| op diff --git a/crates/brk_server/src/api/mining.rs b/crates/brk_server/src/api/mining.rs index 86385b6d0..0e02e15e4 100644 --- a/crates/brk_server/src/api/mining.rs +++ b/crates/brk_server/src/api/mining.rs @@ -135,7 +135,8 @@ impl MiningRoutes for ApiRouter { "/api/v1/mining/pool/{slug}/blocks", get_with( async |uri: Uri, headers: HeaderMap, Path(path): Path, _: Empty, State(state): State| { - state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_blocks(path.slug, None, POOL_BLOCKS_LIMIT)).await + let strategy = state.pool_blocks_strategy(Version::ONE, path.slug); + state.respond_json(&headers, strategy, &uri, move |q| q.pool_blocks(path.slug, None, POOL_BLOCKS_LIMIT)).await }, |op| { op.id("get_pool_blocks") diff --git a/crates/brk_server/src/api/server.rs b/crates/brk_server/src/api/server.rs index f5941adaf..0c52978fb 100644 --- a/crates/brk_server/src/api/server.rs +++ b/crates/brk_server/src/api/server.rs @@ -29,13 +29,7 @@ impl ServerRoutes for ApiRouter { let uptime = state.started_instant.elapsed(); let started_at = state.started_at.to_string(); let sync = state - .run(move |q| { - let tip_height = q - .client() - .get_last_height() - .unwrap_or(q.height()); - q.sync_status(tip_height) - }) + .run(move |q| q.sync_status(q.height())) .await .expect("health sync task panicked"); let mut response = axum::Json(Health { @@ -57,7 +51,7 @@ impl ServerRoutes for ApiRouter { op.id("get_health") .server_tag() .summary("Health check") - .description("Returns the health status of the API server, including uptime information.") + .description("Liveness probe. Returns server identity, uptime, and indexed/computed heights from local state only (no bitcoind round-trip). For real chain-tip catch-up, see `/api/server/sync`.") .json_response::() }, ), diff --git a/crates/brk_server/src/state.rs b/crates/brk_server/src/state.rs index d2949c10c..d9639d656 100644 --- a/crates/brk_server/src/state.rs +++ b/crates/brk_server/src/state.rs @@ -6,13 +6,13 @@ use axum::{ }; use brk_query::AsyncQuery; use brk_types::{ - Addr, BlockHash, BlockHashPrefix, Date, Height, ONE_HOUR_IN_SEC, Timestamp as BrkTimestamp, - Txid, Version, + Addr, BlockHash, BlockHashPrefix, Date, Height, ONE_HOUR_IN_SEC, PoolSlug, + Timestamp as BrkTimestamp, Txid, Version, }; use derive_more::Deref; use jiff::Timestamp; use serde::Serialize; -use vecdb::ReadableVec; +use vecdb::{ReadableVec, VecIndex}; use crate::{CacheParams, CacheStrategy, Error, Website, extended::ResponseExtended}; @@ -70,16 +70,26 @@ impl AppState { }) } - /// Smart address caching: checks mempool activity first (unless `chain_only`), then on-chain. - /// - Address has mempool txs → `MempoolHash(addr_specific_hash)` - /// - No mempool, has on-chain activity → `BlockBound(last_activity_block)` - /// - Unknown address → `Tip` - pub fn addr_strategy(&self, version: Version, addr: &Addr, chain_only: bool) -> CacheStrategy { + /// Smart address caching. Checks mempool activity first (unless `chain_only`), then on-chain. + /// - Address has mempool txs: `MempoolHash(addr_specific_hash)` + /// - No mempool, has on-chain activity: `BlockBound(last_activity_block)` + /// - Unknown address: `Tip` + /// + /// `before_txid` narrows the on-chain branch to the newest activity strictly + /// older than the cursor, so paginated chain pages stay cacheable when newer + /// activity arrives above the cursor. + pub fn addr_strategy( + &self, + version: Version, + addr: &Addr, + chain_only: bool, + before_txid: Option<&Txid>, + ) -> CacheStrategy { self.sync(|q| { if !chain_only && let Some(mempool_hash) = q.addr_mempool_hash(addr) { return CacheStrategy::MempoolHash(mempool_hash); } - q.addr_last_activity_height(addr) + q.addr_last_activity_height(addr, before_txid) .and_then(|h| { let block_hash = q.block_hash_by_height(h)?; Ok(CacheStrategy::BlockBound( @@ -135,6 +145,29 @@ impl AppState { }) } + /// `BlockBound` on the pool's last-mined block hash, `Tip` if the pool has + /// never mined. Lets the no-cursor pool-blocks page stay cached when *other* + /// pools mine; only invalidates when this pool itself mines. + pub fn pool_blocks_strategy(&self, version: Version, slug: PoolSlug) -> CacheStrategy { + self.sync(|q| { + let tip = q.height().to_usize(); + let last = q + .computer() + .pools + .pool_heights + .read() + .get(&slug) + .and_then(|heights| { + let pos = heights.partition_point(|h| h.to_usize() <= tip); + pos.checked_sub(1).map(|i| heights[i]) + }); + match last.and_then(|h| q.block_hash_by_height(h).ok()) { + Some(hash) => CacheStrategy::BlockBound(version, BlockHashPrefix::from(&hash)), + None => CacheStrategy::Tip, + } + }) + } + pub fn mempool_strategy(&self) -> CacheStrategy { let hash = self.sync(|q| q.mempool().map(|m| m.next_block_hash().into()).unwrap_or(0)); CacheStrategy::MempoolHash(hash)