From 9347b42c9a491b585f2ef734a9d57bd32dd1cffe Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 7 May 2026 11:21:37 +0200 Subject: [PATCH] mempool: fixes --- Cargo.lock | 4 + crates/blk/.gitignore | 2 + crates/blk/Cargo.toml | 14 ++ crates/blk/src/main.rs | 1 + crates/brk_client/src/lib.rs | 9 + crates/brk_mempool/examples/mempool.rs | 29 ++- crates/brk_mempool/src/cluster/mod.rs | 2 +- crates/brk_mempool/src/lib.rs | 142 ++++++++++++- crates/brk_mempool/src/rbf.rs | 132 ++++++++++++ crates/brk_mempool/src/stats.rs | 43 ++++ crates/brk_mempool/src/steps/applier.rs | 21 +- crates/brk_mempool/src/steps/rebuilder/mod.rs | 8 +- crates/brk_mempool/src/stores/state.rs | 16 +- crates/brk_query/src/impl/addr.rs | 30 +-- crates/brk_query/src/impl/cpfp.rs | 18 +- crates/brk_query/src/impl/mempool.rs | 198 ++++++------------ crates/brk_query/src/impl/price.rs | 7 +- crates/brk_query/src/impl/tx.rs | 57 +++-- crates/brk_server/src/state.rs | 2 +- modules/brk-client/index.js | 14 ++ packages/brk_client/brk_client/__init__.py | 8 + 21 files changed, 518 insertions(+), 239 deletions(-) create mode 100644 crates/blk/.gitignore create mode 100644 crates/blk/Cargo.toml create mode 100644 crates/blk/src/main.rs create mode 100644 crates/brk_mempool/src/rbf.rs create mode 100644 crates/brk_mempool/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index 8b94b55d3..7f241dfbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,10 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +[[package]] +name = "blk" +version = "0.3.0-beta.7" + [[package]] name = "brk" version = "0.3.0-beta.7" diff --git a/crates/blk/.gitignore b/crates/blk/.gitignore new file mode 100644 index 000000000..88b06327a --- /dev/null +++ b/crates/blk/.gitignore @@ -0,0 +1,2 @@ +*.md +!README.md diff --git a/crates/blk/Cargo.toml b/crates/blk/Cargo.toml new file mode 100644 index 000000000..3d4927598 --- /dev/null +++ b/crates/blk/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "blk" +description = "A command line tool to inspect Bitcoin Core blocks" +version.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] + +[[bin]] +name = "blk" +path = "src/main.rs" diff --git a/crates/blk/src/main.rs b/crates/blk/src/main.rs new file mode 100644 index 000000000..f328e4d9d --- /dev/null +++ b/crates/blk/src/main.rs @@ -0,0 +1 @@ +fn main() {} diff --git a/crates/brk_client/src/lib.rs b/crates/brk_client/src/lib.rs index 7357b59a8..1a32a3a82 100644 --- a/crates/brk_client/src/lib.rs +++ b/crates/brk_client/src/lib.rs @@ -9236,6 +9236,15 @@ impl BrkClient { self.base.get_json(&format!("/api/mempool")) } + /// Mempool content hash + /// + /// Returns an opaque `u64` that changes whenever the projected next block changes. Same value as the mempool ETag. Useful as a freshness/liveness signal: if it stays constant for tens of seconds on a live network, the mempool sync loop has stalled. + /// + /// Endpoint: `GET /api/mempool/hash` + pub fn get_mempool_hash(&self) -> Result { + self.base.get_json(&format!("/api/mempool/hash")) + } + /// Live BTC/USD price /// /// Returns the current BTC/USD price in dollars, derived from on-chain round-dollar output patterns in the last 12 blocks plus mempool. diff --git a/crates/brk_mempool/examples/mempool.rs b/crates/brk_mempool/examples/mempool.rs index 4ed06ec3e..6107b3c34 100644 --- a/crates/brk_mempool/examples/mempool.rs +++ b/crates/brk_mempool/examples/mempool.rs @@ -1,7 +1,7 @@ use std::{thread, time::Duration}; use brk_error::Result; -use brk_mempool::Mempool; +use brk_mempool::{Mempool, MempoolStats}; use brk_rpc::{Auth, Client}; fn main() -> Result<()> { @@ -23,12 +23,7 @@ fn main() -> Result<()> { loop { thread::sleep(Duration::from_secs(5)); - let info = mempool.info(); - let entries = mempool.entries(); - let txs = mempool.txs(); - let addrs = mempool.addrs(); - let graveyard = mempool.graveyard(); - let outpoint_spends = mempool.state().outpoint_spends.read(); + let stats = MempoolStats::from(&mempool); let snapshot = mempool.snapshot(); let cluster_nodes_total: usize = snapshot.clusters.iter().map(|c| c.nodes.len()).sum(); @@ -42,16 +37,16 @@ fn main() -> Result<()> { snap.clusters={} snap.cluster_nodes={} snap.cluster_of.len={} snap.cluster_of.active={} \ snap.blocks={} snap.blocks_txs={} \ rebuilds={} skip.clean={} skip.throttled={}", - info.count, - entries.entries().len(), - entries.active_count(), - entries.free_slots_count(), - txs.len(), - txs.unresolved().len(), - addrs.len(), - outpoint_spends.len(), - graveyard.tombstones_len(), - graveyard.order_len(), + stats.info_count, + stats.entry_slot_count, + stats.entry_active_count, + stats.entry_free_count, + stats.tx_count, + stats.unresolved_count, + stats.addr_count, + stats.outpoint_spend_count, + stats.graveyard_tombstone_count, + stats.graveyard_order_count, snapshot.clusters.len(), cluster_nodes_total, snapshot.cluster_of_len(), diff --git a/crates/brk_mempool/src/cluster/mod.rs b/crates/brk_mempool/src/cluster/mod.rs index c40ac9fe3..39f203b82 100644 --- a/crates/brk_mempool/src/cluster/mod.rs +++ b/crates/brk_mempool/src/cluster/mod.rs @@ -162,7 +162,7 @@ impl Cluster { } } - debug_assert_eq!(out.len(), n, "cluster contained a cycle"); + assert_eq!(out.len(), n, "cluster contained a cycle"); out } diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index 5fe34343b..71c43a534 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -22,17 +22,24 @@ use std::{ use brk_error::Result; use brk_rpc::Client; -use brk_types::{AddrBytes, MempoolInfo, OutpointPrefix, TxOut, Txid, TxidPrefix, Vin, Vout}; +use brk_types::{ + AddrBytes, AddrMempoolStats, FeeRate, MempoolInfo, MempoolRecentTx, OutpointPrefix, OutputType, + Sats, Timestamp, Transaction, TxOut, Txid, TxidPrefix, Vin, Vout, +}; use parking_lot::RwLockReadGuard; use tracing::error; pub mod cluster; mod cpfp; +mod rbf; +mod stats; pub(crate) mod steps; pub(crate) mod stores; #[cfg(test)] mod tests; +pub use rbf::{RbfForTx, RbfNode}; +pub use stats::MempoolStats; use steps::{Applier, Fetcher, Preparer, Rebuilder, Resolver}; pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval}; use stores::{AddrTracker, MempoolState}; @@ -108,22 +115,145 @@ impl Mempool { Some((spender_txid, Vin::from(vin_pos))) } - pub fn txs(&self) -> RwLockReadGuard<'_, TxStore> { + pub(crate) fn txs(&self) -> RwLockReadGuard<'_, TxStore> { self.0.state.txs.read() } - pub fn entries(&self) -> RwLockReadGuard<'_, EntryPool> { + pub(crate) fn entries(&self) -> RwLockReadGuard<'_, EntryPool> { self.0.state.entries.read() } - pub fn addrs(&self) -> RwLockReadGuard<'_, AddrTracker> { + pub(crate) fn addrs(&self) -> RwLockReadGuard<'_, AddrTracker> { self.0.state.addrs.read() } - pub fn graveyard(&self) -> RwLockReadGuard<'_, TxGraveyard> { + pub(crate) fn graveyard(&self) -> RwLockReadGuard<'_, TxGraveyard> { self.0.state.graveyard.read() } + pub fn contains_txid(&self, txid: &Txid) -> bool { + self.txs().contains(txid) + } + + /// Apply `f` to the live tx body if present. + pub fn with_tx(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option { + self.txs().get(txid).map(f) + } + + /// Apply `f` to a `Vanished` tombstone's tx body if present. + /// `Replaced` tombstones return `None` because the tx will not confirm. + pub fn with_vanished_tx( + &self, + txid: &Txid, + f: impl FnOnce(&Transaction) -> R, + ) -> Option { + let graveyard = self.graveyard(); + let tomb = graveyard.get(txid)?; + matches!(tomb.reason(), TxRemoval::Vanished).then(|| f(&tomb.tx)) + } + + /// Snapshot of all live mempool txids. + pub fn txids(&self) -> Vec { + self.txs().keys().cloned().collect() + } + + /// Snapshot of recent live txs. + pub fn recent_txs(&self) -> Vec { + self.txs().recent().to_vec() + } + + /// Per-address mempool stats. `None` if the address has no live mempool activity. + pub fn addr_stats(&self, addr: &AddrBytes) -> Option { + self.addrs().get(addr).map(|e| e.stats.clone()) + } + + /// Live mempool txs touching `addr`, newest first by `first_seen`, + /// capped at `limit`. Acquires `txs`, `addrs`, `entries` in canonical + /// order; returns owned `Transaction`s so the lock is released + /// before the caller does anything else with them. + pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec { + let txs = self.txs(); + let addrs = self.addrs(); + let entries = self.entries(); + let Some(entry) = addrs.get(addr) else { + return vec![]; + }; + let mut ordered: Vec<(Timestamp, &Txid)> = entry + .txids + .iter() + .map(|txid| { + let first_seen = entries + .get(&TxidPrefix::from(txid)) + .map(|e| e.first_seen) + .unwrap_or_default(); + (first_seen, txid) + }) + .collect(); + ordered.sort_unstable_by_key(|b| std::cmp::Reverse(b.0)); + ordered + .into_iter() + .filter_map(|(_, txid)| txs.get(txid).cloned()) + .take(limit) + .collect() + } + + /// Apply `f` to an iterator over `(value, output_type)` for every output + /// of every live mempool tx. The lock is held for the duration of the call. + pub fn process_live_outputs( + &self, + f: impl FnOnce(&mut dyn Iterator) -> R, + ) -> R { + let txs = self.txs(); + let mut iter = txs + .values() + .flat_map(|tx| &tx.output) + .map(|txout| (txout.value, txout.type_())); + f(&mut iter) + } + + /// Effective fee rate for a live mempool tx: the seed's chunk rate from + /// the latest snapshot, with fall-back to the entry's simple `fee/vsize` + /// when the snapshot doesn't yet contain it. + pub fn live_effective_fee_rate(&self, prefix: &TxidPrefix) -> Option { + let entries = self.entries(); + if let Some(seed_idx) = entries.idx_of(prefix) + && let Some(rate) = self.snapshot().chunk_rate_of(seed_idx) + { + return Some(rate); + } + entries.get(prefix).map(|e| e.fee_rate()) + } + + /// Fee rate snapshotted into a graveyard tomb at burial. + pub fn graveyard_fee_rate(&self, txid: &Txid) -> Option { + self.graveyard() + .get(txid) + .map(|tomb| tomb.entry.fee_rate()) + } + + /// `first_seen` Unix-second timestamps for `txids`, in input order. + /// Returns 0 for unknown txids. `Vanished` graveyard tombstones fall + /// back to the buried entry's `first_seen` so a tx doesn't flicker + /// to 0 in the brief window between mempool drop and indexer catch-up. + pub fn transaction_times(&self, txids: &[Txid]) -> Vec { + let entries = self.entries(); + let graveyard = self.graveyard(); + txids + .iter() + .map(|txid| { + if let Some(e) = entries.get(&TxidPrefix::from(txid)) { + return u64::from(e.first_seen); + } + if let Some(tomb) = graveyard.get(txid) + && matches!(tomb.reason(), TxRemoval::Vanished) + { + return u64::from(tomb.entry.first_seen); + } + 0 + }) + .collect() + } + /// Infinite update loop with a 1 second interval. pub fn start(&self) { self.start_with(|| {}); @@ -185,7 +315,7 @@ impl Mempool { Ok(()) } - pub fn state(&self) -> &MempoolState { + pub(crate) fn state(&self) -> &MempoolState { &self.0.state } } diff --git a/crates/brk_mempool/src/rbf.rs b/crates/brk_mempool/src/rbf.rs new file mode 100644 index 000000000..43d93b54b --- /dev/null +++ b/crates/brk_mempool/src/rbf.rs @@ -0,0 +1,132 @@ +//! RBF (Replace-By-Fee) tree extraction from the live mempool + +//! graveyard. +//! +//! Both methods return owned, lock-free `RbfNode` trees so the caller +//! (typically `brk_query`) can enrich each node with indexer-resident +//! data (`mined`, effective fee rate) after the mempool lock window +//! closes. Doing the enrichment under the lock would re-enter +//! `Mempool` indirectly via `effective_fee_rate` and recursively +//! acquire the same `entries`/`graveyard` read locks, which can +//! deadlock against a queued writer in `parking_lot`. + +use brk_types::{Sats, Timestamp, Transaction, Txid, TxidPrefix, VSize}; +use rustc_hash::FxHashSet; + +use crate::{ + Mempool, TxEntry, TxRemoval, TxStore, + stores::{EntryPool, TxGraveyard}, +}; + +/// One node in an RBF replacement tree, populated entirely from +/// mempool state. The caller layers on `mined` and effective fee rate +/// after the lock has been released. +#[derive(Debug, Clone)] +pub struct RbfNode { + pub txid: Txid, + pub fee: Sats, + pub vsize: VSize, + /// Sum of the tx's output amounts. + pub value: Sats, + pub first_seen: Timestamp, + /// BIP-125 signaling: at least one input has sequence < 0xffffffff-1. + pub rbf: bool, + /// `true` iff any predecessor in this subtree was non-signaling. + pub full_rbf: bool, + pub replaces: Vec, +} + +/// Result of [`Mempool::rbf_for_tx`]. +#[derive(Debug, Clone, Default)] +pub struct RbfForTx { + /// Tree rooted at the requested tx's terminal replacer. `None` if + /// the tx is unknown to both the live pool and the graveyard. + pub root: Option, + /// Direct predecessors of the requested tx (txids only). + pub replaces: Vec, +} + +impl Mempool { + /// Build the RBF tree relevant to `txid`: walk forward through + /// `Replaced { by }` links to the terminal replacer, return its + /// full predecessor tree, plus the requested tx's own direct + /// predecessors. Single read-lock window in canonical order. + pub fn rbf_for_tx(&self, txid: &Txid) -> RbfForTx { + let txs = self.txs(); + let entries = self.entries(); + let graveyard = self.graveyard(); + + let root_txid = walk_to_replacement_root(&graveyard, *txid); + let replaces: Vec = graveyard.predecessors_of(txid).map(|(p, _)| *p).collect(); + let root = build_node(&root_txid, &txs, &entries, &graveyard); + RbfForTx { root, replaces } + } + + /// Recent terminal-replacer trees, most-recent replacement first, + /// deduplicated by tree root, capped at `limit`. When + /// `full_rbf_only` is true, drops trees with no non-signaling + /// predecessor anywhere. + pub fn recent_rbf_trees(&self, full_rbf_only: bool, limit: usize) -> Vec { + let txs = self.txs(); + let entries = self.entries(); + let graveyard = self.graveyard(); + + let mut seen: FxHashSet = FxHashSet::default(); + graveyard + .replaced_iter_recent_first() + .filter_map(|(_, by)| { + let root = walk_to_replacement_root(&graveyard, *by); + seen.insert(root).then_some(root) + }) + .filter_map(|root| build_node(&root, &txs, &entries, &graveyard)) + .filter(|n| !full_rbf_only || n.full_rbf) + .take(limit) + .collect() + } +} + +fn walk_to_replacement_root(graveyard: &TxGraveyard, mut root: Txid) -> Txid { + while let Some(TxRemoval::Replaced { by }) = graveyard.get(&root).map(|t| t.reason()) { + root = *by; + } + root +} + +fn build_node( + txid: &Txid, + txs: &TxStore, + entries: &EntryPool, + graveyard: &TxGraveyard, +) -> Option { + let (tx, entry) = resolve_node(txid, txs, entries, graveyard)?; + + let replaces: Vec = graveyard + .predecessors_of(txid) + .filter_map(|(pred, _)| build_node(pred, txs, entries, graveyard)) + .collect(); + + let full_rbf = replaces.iter().any(|c| !c.rbf || c.full_rbf); + let value: Sats = tx.output.iter().map(|o| o.value).sum(); + + Some(RbfNode { + txid: *txid, + fee: entry.fee, + vsize: entry.vsize, + value, + first_seen: entry.first_seen, + rbf: entry.rbf, + full_rbf, + replaces, + }) +} + +fn resolve_node<'a>( + txid: &Txid, + txs: &'a TxStore, + entries: &'a EntryPool, + graveyard: &'a TxGraveyard, +) -> Option<(&'a Transaction, &'a TxEntry)> { + if let (Some(tx), Some(entry)) = (txs.get(txid), entries.get(&TxidPrefix::from(txid))) { + return Some((tx, entry)); + } + graveyard.get(txid).map(|tomb| (&tomb.tx, &tomb.entry)) +} diff --git a/crates/brk_mempool/src/stats.rs b/crates/brk_mempool/src/stats.rs new file mode 100644 index 000000000..c73fec71a --- /dev/null +++ b/crates/brk_mempool/src/stats.rs @@ -0,0 +1,43 @@ +//! Owned snapshot of mempool in-memory counters for diagnostic display. + +use crate::Mempool; + +#[derive(Debug, Clone)] +pub struct MempoolStats { + pub info_count: usize, + pub tx_count: usize, + pub unresolved_count: usize, + pub addr_count: usize, + pub entry_slot_count: usize, + pub entry_active_count: usize, + pub entry_free_count: usize, + pub outpoint_spend_count: usize, + pub graveyard_tombstone_count: usize, + pub graveyard_order_count: usize, +} + +impl From<&Mempool> for MempoolStats { + /// Acquires every sub-lock in canonical order to build a coherent + /// snapshot. Cheap; locks are released as soon as the counts are read. + fn from(mempool: &Mempool) -> Self { + let state = mempool.state(); + let info = state.info.read(); + let txs = state.txs.read(); + let addrs = state.addrs.read(); + let entries = state.entries.read(); + let outpoint_spends = state.outpoint_spends.read(); + let graveyard = state.graveyard.read(); + Self { + info_count: info.count, + tx_count: txs.len(), + unresolved_count: txs.unresolved().len(), + addr_count: addrs.len(), + entry_slot_count: entries.entries().len(), + entry_active_count: entries.active_count(), + entry_free_count: entries.free_slots_count(), + outpoint_spend_count: outpoint_spends.len(), + graveyard_tombstone_count: graveyard.tombstones_len(), + graveyard_order_count: graveyard.order_len(), + } + } +} diff --git a/crates/brk_mempool/src/steps/applier.rs b/crates/brk_mempool/src/steps/applier.rs index 6c1e72300..2655b7e65 100644 --- a/crates/brk_mempool/src/steps/applier.rs +++ b/crates/brk_mempool/src/steps/applier.rs @@ -33,24 +33,25 @@ impl Applier { } fn bury_one(s: &mut LockedState, prefix: &TxidPrefix, reason: TxRemoval) { - let Some((idx, entry)) = s.entries.remove(prefix) else { + let Some(txid) = s.entries.get(prefix).map(|e| e.txid) else { return; }; - let txid = entry.txid; - let Some(tx) = s.txs.remove(&txid) else { + if !s.txs.contains(&txid) { // entries had this prefix but txs didn't — a state divergence // that should be impossible: publish/bury both touch them // together under one write_all guard. Reaching this branch - // means a prior cycle left the two stores out of sync (panic - // mid-publish, prefix collision, etc). The slot has been - // freed by entries.remove, but addrs/outpoint_spends/info may - // still hold stale references that we can't repair without - // the tx body. Log loudly so the corruption is visible. + // means a prior cycle left the two stores out of sync (e.g. + // panic mid-publish before `txs.extend` ran). Skip the bury + // entirely: freeing the entries slot here would let + // outpoint_spends point at a slot the next insert recycles + // for an unrelated tx. warn!( - "mempool bury: entry present but tx missing for txid={txid} - addr/outpoint state may be stale" + "mempool bury: entry present but tx missing for txid={txid} - skipping bury to preserve outpoint_spends integrity" ); return; - }; + } + let (idx, entry) = s.entries.remove(prefix).expect("entry present"); + let tx = s.txs.remove(&txid).expect("tx present"); s.info.remove(&tx, entry.fee); s.addrs.remove_tx(&tx, &txid); s.outpoint_spends.remove_spends(&tx, idx); diff --git a/crates/brk_mempool/src/steps/rebuilder/mod.rs b/crates/brk_mempool/src/steps/rebuilder/mod.rs index eda989dc0..3e12613cf 100644 --- a/crates/brk_mempool/src/steps/rebuilder/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/mod.rs @@ -50,6 +50,7 @@ impl Rebuilder { return; } self.publish(Self::build_snapshot(client, state)); + self.dirty.store(false, Ordering::Release); self.rebuild_count.fetch_add(1, Ordering::Relaxed); } @@ -89,9 +90,9 @@ impl Rebuilder { } /// Returns true iff dirty and the throttle window has elapsed. On - /// success, clears the dirty bit and starts a new throttle window; - /// on failure, leaves all state untouched so the next cycle can - /// retry. + /// success, starts a new throttle window. The dirty bit is cleared + /// by `tick` only after `publish` returns, so a panic in + /// `build_snapshot` leaves dirty set and the next cycle retries. fn try_claim_rebuild(&self) -> bool { if !self.dirty.load(Ordering::Acquire) { self.skip_clean.fetch_add(1, Ordering::Relaxed); @@ -103,7 +104,6 @@ impl Rebuilder { return false; } *last = Some(Instant::now()); - self.dirty.store(false, Ordering::Release); true } diff --git a/crates/brk_mempool/src/stores/state.rs b/crates/brk_mempool/src/stores/state.rs index a1d30916e..a783ed96b 100644 --- a/crates/brk_mempool/src/stores/state.rs +++ b/crates/brk_mempool/src/stores/state.rs @@ -6,8 +6,20 @@ use super::{AddrTracker, EntryPool, OutpointSpends, TxGraveyard, TxStore}; /// The six buckets making up live mempool state. /// /// Each bucket has its own `RwLock` so readers of different buckets -/// don't contend with each other. The Applier takes all six write -/// locks in a fixed order for a brief window once per cycle. +/// don't contend with each other. Any code that takes more than one +/// lock must follow the canonical partial order +/// `info → txs → addrs → entries → outpoint_spends → graveyard`, +/// otherwise a reader-holds-A-wants-B / writer-holds-B-wants-A +/// circular wait can deadlock. The Applier takes all six write locks +/// in this order for a brief window once per cycle via +/// [`MempoolState::write_all`]; multi-lock readers inside the crate +/// take a (canonical-order) subset inline. +/// +/// This discipline is *internal* to `brk_mempool`: external crates +/// only see `Mempool` methods that bundle each multi-lock operation +/// behind a single call (e.g. `Mempool::lookup_spender`, +/// `Mempool::addr_txs`, `Mempool::rbf_for_tx`), so callers can never +/// take the order wrong because they don't get to choose. #[derive(Default)] pub struct MempoolState { pub(crate) info: RwLock, diff --git a/crates/brk_query/src/impl/addr.rs b/crates/brk_query/src/impl/addr.rs index bde9caa54..40fb80e43 100644 --- a/crates/brk_query/src/impl/addr.rs +++ b/crates/brk_query/src/impl/addr.rs @@ -4,8 +4,8 @@ use bitcoin::{Network, PublicKey, ScriptBuf}; use brk_error::{Error, OptionData, Result}; use brk_types::{ Addr, AddrBytes, AddrChainStats, AddrHash, AddrIndexOutPoint, AddrIndexTxIndex, AddrStats, - AnyAddrDataIndexEnum, Dollars, Height, OutputType, Timestamp, Transaction, TxIndex, TxStatus, - Txid, TxidPrefix, TypeIndex, Unit, Utxo, Vout, + AnyAddrDataIndexEnum, Dollars, Height, OutputType, Transaction, TxIndex, TxStatus, Txid, + TypeIndex, Unit, Utxo, Vout, }; use vecdb::VecIndex; @@ -80,7 +80,7 @@ impl Query { }, mempool_stats: self .mempool() - .and_then(|m| m.addrs().get(&bytes).map(|e| e.stats.clone())) + .and_then(|m| m.addr_stats(&bytes)) .unwrap_or_default(), }) } @@ -233,29 +233,7 @@ impl Query { pub fn addr_mempool_txs(&self, addr: &Addr, limit: usize) -> Result> { let bytes = AddrBytes::from_str(addr)?; let mempool = self.mempool().ok_or(Error::MempoolNotAvailable)?; - let addrs = mempool.addrs(); - let Some(entry) = addrs.get(&bytes) else { - return Ok(vec![]); - }; - let entries = mempool.entries(); - let mut ordered: Vec<(Timestamp, &Txid)> = entry - .txids - .iter() - .map(|txid| { - let first_seen = entries - .get(&TxidPrefix::from(txid)) - .map(|e| e.first_seen) - .unwrap_or_default(); - (first_seen, txid) - }) - .collect(); - ordered.sort_unstable_by_key(|b| std::cmp::Reverse(b.0)); - let txs = mempool.txs(); - Ok(ordered - .into_iter() - .filter_map(|(_, txid)| txs.get(txid).cloned()) - .take(limit) - .collect()) + Ok(mempool.addr_txs(&bytes, limit)) } /// Height of the last on-chain activity for an address (last tx_index → height). diff --git a/crates/brk_query/src/impl/cpfp.rs b/crates/brk_query/src/impl/cpfp.rs index 7c66e2810..6ed54daeb 100644 --- a/crates/brk_query/src/impl/cpfp.rs +++ b/crates/brk_query/src/impl/cpfp.rs @@ -62,16 +62,10 @@ impl Query { pub fn effective_fee_rate(&self, txid: &Txid) -> Result { let prefix = TxidPrefix::from(txid); - if let Some(mempool) = self.mempool() { - let entries = mempool.entries(); - if let Some(seed_idx) = entries.idx_of(&prefix) - && let Some(rate) = mempool.snapshot().chunk_rate_of(seed_idx) - { - return Ok(rate); - } - if let Some(entry) = entries.get(&prefix) { - return Ok(entry.fee_rate()); - } + if let Some(mempool) = self.mempool() + && let Some(rate) = mempool.live_effective_fee_rate(&prefix) + { + return Ok(rate); } if let Ok(idx) = self.resolve_tx_index(txid) @@ -87,9 +81,9 @@ impl Query { } if let Some(mempool) = self.mempool() - && let Some(tomb) = mempool.graveyard().get(txid) + && let Some(rate) = mempool.graveyard_fee_rate(txid) { - return Ok(tomb.entry.fee_rate()); + return Ok(rate); } Err(Error::UnknownTxid) diff --git a/crates/brk_query/src/impl/mempool.rs b/crates/brk_query/src/impl/mempool.rs index 805116baa..3dd6f6aea 100644 --- a/crates/brk_query/src/impl/mempool.rs +++ b/crates/brk_query/src/impl/mempool.rs @@ -1,11 +1,10 @@ use brk_error::{Error, Result}; -use brk_mempool::{EntryPool, Mempool, TxEntry, TxGraveyard, TxRemoval, TxStore, TxTombstone}; +use brk_mempool::{Mempool, RbfForTx, RbfNode}; use brk_types::{ - CheckedSub, MempoolBlock, MempoolInfo, MempoolRecentTx, OutputType, RbfResponse, RbfTx, - RecommendedFees, ReplacementNode, Sats, Timestamp, Transaction, TxOut, TxOutIndex, Txid, - TxidPrefix, TypeIndex, + CheckedSub, FeeRate, MempoolBlock, MempoolInfo, MempoolRecentTx, OutputType, RbfResponse, RbfTx, + RecommendedFees, ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix, + TypeIndex, }; -use rustc_hash::FxHashSet; use vecdb::VecIndex; use crate::Query; @@ -22,8 +21,7 @@ impl Query { } pub fn mempool_txids(&self) -> Result> { - let txs = self.require_mempool()?.txs(); - Ok(txs.keys().cloned().collect()) + Ok(self.require_mempool()?.txids()) } pub fn recommended_fees(&self) -> Result { @@ -100,159 +98,89 @@ impl Query { } pub fn mempool_recent(&self) -> Result> { - Ok(self.require_mempool()?.txs().recent().to_vec()) + Ok(self.require_mempool()?.recent_txs()) } /// RBF history for a tx, matching mempool.space's - /// `GET /api/v1/tx/:txid/rbf`. Walks forward through the graveyard - /// to find the latest known replacer (tree root), then recursively - /// walks `predecessors_of` backward to build the tree. `replaces` - /// is the requested tx's own direct predecessors. + /// `GET /api/v1/tx/:txid/rbf`. The mempool builds the owned + /// replacement tree (terminal replacer + recursive predecessors) + /// under one read-lock window; this method then enriches each node + /// with `mined` + effective fee rate, both of which need the + /// indexer/computer. pub fn tx_rbf(&self, txid: &Txid) -> Result { - let mempool = self.require_mempool()?; - let txs = mempool.txs(); - let entries = mempool.entries(); - let graveyard = mempool.graveyard(); - - let root_txid = Self::walk_to_replacement_root(&graveyard, *txid); - - let replaces_vec: Vec = graveyard.predecessors_of(txid).map(|(p, _)| *p).collect(); - let replaces = (!replaces_vec.is_empty()).then_some(replaces_vec); - - let replacements = self.build_rbf_node(&root_txid, None, &txs, &entries, &graveyard); - + let RbfForTx { root, replaces } = self.require_mempool()?.rbf_for_tx(txid); + let replacements = root.map(|n| self.enrich_rbf_node(n, None)); + let replaces = (!replaces.is_empty()).then_some(replaces); Ok(RbfResponse { replacements, replaces, }) } - /// Walk forward through `Replaced { by }` links to the terminal - /// replacer of an RBF chain. Returns `txid` itself if it's already - /// the root. - fn walk_to_replacement_root(graveyard: &TxGraveyard, mut root: Txid) -> Txid { - while let Some(TxRemoval::Replaced { by }) = graveyard.get(&root).map(TxTombstone::reason) { - root = *by; - } - root - } - - /// Resolve a txid to the data we need for an `RbfTx`. The live - /// pool takes priority; the graveyard is the fallback. Returns - /// `None` if the tx has no known data in either. - fn resolve_rbf_node_data<'a>( - txid: &Txid, - txs: &'a TxStore, - entries: &'a EntryPool, - graveyard: &'a TxGraveyard, - ) -> Option<(&'a Transaction, &'a TxEntry)> { - if let (Some(tx), Some(entry)) = (txs.get(txid), entries.get(&TxidPrefix::from(txid))) { - return Some((tx, entry)); - } - graveyard.get(txid).map(|tomb| (&tomb.tx, &tomb.entry)) - } - - /// Recursively build an RBF tree node rooted at `txid`. - /// Predecessors are always in the graveyard (that's where - /// `Removal::Replaced` lives), so the recursion only needs the - /// graveyard; the live pool is consulted for the root. - /// - /// `rate` matches mempool.space's `tx.effectiveFeePerVsize` via - /// `Query::effective_fee_rate`, with a fall-back to the entry's - /// simple `fee/vsize` when the rate lookup fails. - fn build_rbf_node( - &self, - txid: &Txid, - successor_time: Option, - txs: &TxStore, - entries: &EntryPool, - graveyard: &TxGraveyard, - ) -> Option { - let (tx, entry) = Self::resolve_rbf_node_data(txid, txs, entries, graveyard)?; - - let replaces: Vec = graveyard - .predecessors_of(txid) - .filter_map(|(pred_txid, _)| { - self.build_rbf_node(pred_txid, Some(entry.first_seen), txs, entries, graveyard) - }) - .collect(); - - let full_rbf = replaces.iter().any(|c| !c.tx.rbf || c.full_rbf); - - let interval = successor_time - .and_then(|st| st.checked_sub(entry.first_seen)) - .map(|d| *d); - - let value: Sats = tx.output.iter().map(|o| o.value).sum(); - let mined = self.resolve_tx_index(txid).is_ok().then_some(true); - let rate = self - .effective_fee_rate(txid) - .unwrap_or_else(|_| entry.fee_rate()); - - Some(ReplacementNode { - tx: RbfTx { - txid: *txid, - fee: entry.fee, - vsize: entry.vsize, - value, - rate, - time: entry.first_seen, - rbf: entry.rbf, - full_rbf: Some(full_rbf), - mined, - }, - time: entry.first_seen, - full_rbf, - interval, - mined, - replaces, - }) - } - /// Recent RBF replacements across the whole mempool, matching /// mempool.space's `GET /api/v1/replacements` and /// `GET /api/v1/fullrbf/replacements`. Each entry is a complete /// replacement tree rooted at the terminal replacer; same shape as /// `tx_rbf().replacements`. Ordered by most-recent replacement - /// event first (matches mempool.space's reversed-`replacedBy` - /// iteration) and capped at 25 entries. When `full_rbf_only` is + /// event first and capped at 25 entries. When `full_rbf_only` is /// true, only trees with at least one non-signaling predecessor /// are returned. pub fn recent_replacements(&self, full_rbf_only: bool) -> Result> { - let mempool = self.require_mempool()?; - let txs = mempool.txs(); - let entries = mempool.entries(); - let graveyard = mempool.graveyard(); - - // A predecessor's `by` may itself be replaced; walk the chain - // forward to the terminal replacer for each tree, dedup so each - // tree is emitted once at its first (most recent) sighting. - let mut seen: FxHashSet = FxHashSet::default(); - Ok(graveyard - .replaced_iter_recent_first() - .filter_map(|(_, by)| { - let root = Self::walk_to_replacement_root(&graveyard, *by); - seen.insert(root).then_some(root) - }) - .filter_map(|root| self.build_rbf_node(&root, None, &txs, &entries, &graveyard)) - .filter(|node| !full_rbf_only || node.full_rbf) - .take(RECENT_REPLACEMENTS_LIMIT) + Ok(self + .require_mempool()? + .recent_rbf_trees(full_rbf_only, RECENT_REPLACEMENTS_LIMIT) + .into_iter() + .map(|n| self.enrich_rbf_node(n, None)) .collect()) } + /// Layer indexer-resident data (`mined`, effective fee rate) onto + /// a `RbfNode` tree. Runs after the mempool lock window has closed + /// because `effective_fee_rate` re-enters `Mempool` and would + /// recursively acquire the same read locks otherwise. + fn enrich_rbf_node( + &self, + node: RbfNode, + successor_time: Option, + ) -> ReplacementNode { + let interval = successor_time + .and_then(|st| st.checked_sub(node.first_seen)) + .map(|d| *d); + let mined = self.resolve_tx_index(&node.txid).is_ok().then_some(true); + let rate = self + .effective_fee_rate(&node.txid) + .unwrap_or_else(|_| FeeRate::from((node.fee, node.vsize))); + let first_seen = node.first_seen; + let replaces = node + .replaces + .into_iter() + .map(|child| self.enrich_rbf_node(child, Some(first_seen))) + .collect(); + ReplacementNode { + tx: RbfTx { + txid: node.txid, + fee: node.fee, + vsize: node.vsize, + value: node.value, + rate, + time: first_seen, + rbf: node.rbf, + full_rbf: Some(node.full_rbf), + mined, + }, + time: first_seen, + full_rbf: node.full_rbf, + interval, + mined, + replaces, + } + } + /// `first_seen` Unix-second timestamps for each txid, matching /// mempool.space's `POST /api/v1/transaction-times`. Returns 0 for /// unknown txids, in input order. pub fn transaction_times(&self, txids: &[Txid]) -> Result> { - let entries = self.require_mempool()?.entries(); - Ok(txids - .iter() - .map(|txid| { - entries - .get(&TxidPrefix::from(txid)) - .map_or(0, |e| u64::from(e.first_seen)) - }) - .collect()) + Ok(self.require_mempool()?.transaction_times(txids)) } /// Opaque content hash that changes whenever the projected next diff --git a/crates/brk_query/src/impl/price.rs b/crates/brk_query/src/impl/price.rs index c0340b289..3c10d7407 100644 --- a/crates/brk_query/src/impl/price.rs +++ b/crates/brk_query/src/impl/price.rs @@ -11,12 +11,7 @@ impl Query { let mut oracle = self.computer().prices.live_oracle(self.indexer())?; if let Some(mempool) = self.mempool() { - let txs = mempool.txs(); - oracle.process_outputs( - txs.values() - .flat_map(|tx| &tx.output) - .map(|txout| (txout.value, txout.type_())), - ); + mempool.process_live_outputs(|iter| oracle.process_outputs(iter)); } Ok(oracle.price_dollars()) diff --git a/crates/brk_query/src/impl/tx.rs b/crates/brk_query/src/impl/tx.rs index 049be6828..c8dc68179 100644 --- a/crates/brk_query/src/impl/tx.rs +++ b/crates/brk_query/src/impl/tx.rs @@ -97,21 +97,40 @@ impl Query { // ── Transaction queries ──────────────────────────────────────── - /// Map a mempool transaction by txid through `f`, returning `None` - /// if no mempool is attached or the txid is not in mempool. - fn map_mempool_tx(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option { - self.mempool()?.txs().get(txid).map(f) + /// Resolve a tx body across the three sources in order: live mempool, + /// indexer (via `indexed`), then `Vanished` graveyard tombstone. + /// The graveyard fallback only fires when the indexer reports + /// `UnknownTxid`, covering the brief race where a mined tx has been + /// buried by `Applier` but `safe_lengths.tx_index` has not yet + /// advanced to cover it. `Replaced` tombstones are excluded — those + /// txs will never confirm. + fn lookup_tx( + &self, + txid: &Txid, + f: impl Fn(&Transaction) -> R, + indexed: impl FnOnce(TxIndex) -> Result, + ) -> Result { + if let Some(mempool) = self.mempool() + && let Some(r) = mempool.with_tx(txid, &f) + { + return Ok(r); + } + match self.resolve_tx_index_bounded(txid) { + Ok(idx) => indexed(idx), + Err(Error::UnknownTxid) => self + .mempool() + .and_then(|m| m.with_vanished_tx(txid, &f)) + .ok_or(Error::UnknownTxid), + Err(e) => Err(e), + } } pub fn transaction(&self, txid: &Txid) -> Result { - if let Some(tx) = self.map_mempool_tx(txid, Transaction::clone) { - return Ok(tx); - } - self.transaction_by_index(self.resolve_tx_index_bounded(txid)?) + self.lookup_tx(txid, Transaction::clone, |idx| self.transaction_by_index(idx)) } pub fn transaction_status(&self, txid: &Txid) -> Result { - if self.mempool().is_some_and(|m| m.txs().contains_key(txid)) { + if self.mempool().is_some_and(|m| m.contains_txid(txid)) { return Ok(TxStatus::UNCONFIRMED); } let (_, height) = self.resolve_tx(txid)?; @@ -119,23 +138,23 @@ impl Query { } pub fn transaction_raw(&self, txid: &Txid) -> Result> { - if let Some(bytes) = self.map_mempool_tx(txid, Transaction::encode_bytes) { - return Ok(bytes); - } - self.transaction_raw_by_index(self.resolve_tx_index_bounded(txid)?) + self.lookup_tx(txid, Transaction::encode_bytes, |idx| { + self.transaction_raw_by_index(idx) + }) } pub fn transaction_hex(&self, txid: &Txid) -> Result { - if let Some(hex) = self.map_mempool_tx(txid, |tx| tx.encode_bytes().to_lower_hex_string()) { - return Ok(hex); - } - self.transaction_hex_by_index(self.resolve_tx_index_bounded(txid)?) + self.lookup_tx( + txid, + |tx| tx.encode_bytes().to_lower_hex_string(), + |idx| self.transaction_hex_by_index(idx), + ) } // ── Outspend queries ─────────────────────────────────────────── pub fn outspend(&self, txid: &Txid, vout: Vout) -> Result { - if self.mempool().is_some_and(|m| m.txs().contains_key(txid)) { + if self.mempool().is_some_and(|m| m.contains_txid(txid)) { return Ok(self.mempool_outspend(txid, vout)); } let (_, first_txout, output_count) = self.resolve_tx_outputs(txid)?; @@ -151,7 +170,7 @@ impl Query { pub fn outspends(&self, txid: &Txid) -> Result> { if let Some(mempool) = self.mempool() - && let Some(output_count) = mempool.txs().get(txid).map(|tx| tx.output.len()) + && let Some(output_count) = mempool.with_tx(txid, |tx| tx.output.len()) { return Ok((0..output_count) .map(|i| self.mempool_outspend(txid, Vout::from(i))) diff --git a/crates/brk_server/src/state.rs b/crates/brk_server/src/state.rs index befc84598..b000ab15e 100644 --- a/crates/brk_server/src/state.rs +++ b/crates/brk_server/src/state.rs @@ -122,7 +122,7 @@ impl AppState { pub fn tx_strategy(&self, version: Version, txid: &Txid) -> CacheStrategy { self.sync(|q| { if let Some(mempool) = q.mempool() - && mempool.txs().contains(txid) + && mempool.contains_txid(txid) { return CacheStrategy::MempoolHash(mempool.next_block_hash()); } diff --git a/modules/brk-client/index.js b/modules/brk-client/index.js index d309f5753..2e204331f 100644 --- a/modules/brk-client/index.js +++ b/modules/brk-client/index.js @@ -10839,6 +10839,20 @@ class BrkClient extends BrkClientBase { return this.getJson(path, { signal, onValue }); } + /** + * Mempool content hash + * + * Returns an opaque `u64` that changes whenever the projected next block changes. Same value as the mempool ETag. Useful as a freshness/liveness signal: if it stays constant for tens of seconds on a live network, the mempool sync loop has stalled. + * + * Endpoint: `GET /api/mempool/hash` + * @param {{ signal?: AbortSignal, onValue?: (value: number) => void }} [options] + * @returns {Promise} + */ + async getMempoolHash({ signal, onValue } = {}) { + const path = `/api/mempool/hash`; + return this.getJson(path, { signal, onValue }); + } + /** * Live BTC/USD price * diff --git a/packages/brk_client/brk_client/__init__.py b/packages/brk_client/brk_client/__init__.py index a7b336037..29ea237e4 100644 --- a/packages/brk_client/brk_client/__init__.py +++ b/packages/brk_client/brk_client/__init__.py @@ -8033,6 +8033,14 @@ class BrkClient(BrkClientBase): Endpoint: `GET /api/mempool`""" return self.get_json('/api/mempool') + def get_mempool_hash(self) -> int: + """Mempool content hash. + + Returns an opaque `u64` that changes whenever the projected next block changes. Same value as the mempool ETag. Useful as a freshness/liveness signal: if it stays constant for tens of seconds on a live network, the mempool sync loop has stalled. + + Endpoint: `GET /api/mempool/hash`""" + return self.get_json('/api/mempool/hash') + def get_live_price(self) -> Dollars: """Live BTC/USD price.