mempool: fixes

This commit is contained in:
nym21
2026-05-07 11:21:37 +02:00
parent cb74087f27
commit 9347b42c9a
21 changed files with 518 additions and 239 deletions

View File

@@ -1,7 +1,7 @@
use std::{thread, time::Duration};
use brk_error::Result;
use brk_mempool::Mempool;
use brk_mempool::{Mempool, MempoolStats};
use brk_rpc::{Auth, Client};
fn main() -> Result<()> {
@@ -23,12 +23,7 @@ fn main() -> Result<()> {
loop {
thread::sleep(Duration::from_secs(5));
let info = mempool.info();
let entries = mempool.entries();
let txs = mempool.txs();
let addrs = mempool.addrs();
let graveyard = mempool.graveyard();
let outpoint_spends = mempool.state().outpoint_spends.read();
let stats = MempoolStats::from(&mempool);
let snapshot = mempool.snapshot();
let cluster_nodes_total: usize = snapshot.clusters.iter().map(|c| c.nodes.len()).sum();
@@ -42,16 +37,16 @@ fn main() -> Result<()> {
snap.clusters={} snap.cluster_nodes={} snap.cluster_of.len={} snap.cluster_of.active={} \
snap.blocks={} snap.blocks_txs={} \
rebuilds={} skip.clean={} skip.throttled={}",
info.count,
entries.entries().len(),
entries.active_count(),
entries.free_slots_count(),
txs.len(),
txs.unresolved().len(),
addrs.len(),
outpoint_spends.len(),
graveyard.tombstones_len(),
graveyard.order_len(),
stats.info_count,
stats.entry_slot_count,
stats.entry_active_count,
stats.entry_free_count,
stats.tx_count,
stats.unresolved_count,
stats.addr_count,
stats.outpoint_spend_count,
stats.graveyard_tombstone_count,
stats.graveyard_order_count,
snapshot.clusters.len(),
cluster_nodes_total,
snapshot.cluster_of_len(),

View File

@@ -162,7 +162,7 @@ impl<I> Cluster<I> {
}
}
debug_assert_eq!(out.len(), n, "cluster contained a cycle");
assert_eq!(out.len(), n, "cluster contained a cycle");
out
}

View File

@@ -22,17 +22,24 @@ use std::{
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{AddrBytes, MempoolInfo, OutpointPrefix, TxOut, Txid, TxidPrefix, Vin, Vout};
use brk_types::{
AddrBytes, AddrMempoolStats, FeeRate, MempoolInfo, MempoolRecentTx, OutpointPrefix, OutputType,
Sats, Timestamp, Transaction, TxOut, Txid, TxidPrefix, Vin, Vout,
};
use parking_lot::RwLockReadGuard;
use tracing::error;
pub mod cluster;
mod cpfp;
mod rbf;
mod stats;
pub(crate) mod steps;
pub(crate) mod stores;
#[cfg(test)]
mod tests;
pub use rbf::{RbfForTx, RbfNode};
pub use stats::MempoolStats;
use steps::{Applier, Fetcher, Preparer, Rebuilder, Resolver};
pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval};
use stores::{AddrTracker, MempoolState};
@@ -108,22 +115,145 @@ impl Mempool {
Some((spender_txid, Vin::from(vin_pos)))
}
pub fn txs(&self) -> RwLockReadGuard<'_, TxStore> {
pub(crate) fn txs(&self) -> RwLockReadGuard<'_, TxStore> {
self.0.state.txs.read()
}
pub fn entries(&self) -> RwLockReadGuard<'_, EntryPool> {
pub(crate) fn entries(&self) -> RwLockReadGuard<'_, EntryPool> {
self.0.state.entries.read()
}
pub fn addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
pub(crate) fn addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
self.0.state.addrs.read()
}
pub fn graveyard(&self) -> RwLockReadGuard<'_, TxGraveyard> {
pub(crate) fn graveyard(&self) -> RwLockReadGuard<'_, TxGraveyard> {
self.0.state.graveyard.read()
}
/// Whether `txid` is present in the `txs` store. The read guard is
/// released before returning.
pub fn contains_txid(&self, txid: &Txid) -> bool {
    let txs = self.txs();
    txs.contains(txid)
}
/// Run `f` against the live tx body for `txid`, returning its result,
/// or `None` when the `txs` store has no such tx. The `txs` read guard
/// stays held while `f` runs.
pub fn with_tx<R>(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option<R> {
    let txs = self.txs();
    let tx = txs.get(txid)?;
    Some(f(tx))
}
/// Run `f` against the tx body stored in the graveyard tombstone for
/// `txid`, but only when that tombstone's removal reason is `Vanished`.
/// `Replaced` tombstones return `None` because the tx will not confirm;
/// a txid with no tombstone also returns `None`. The `graveyard` read
/// guard stays held while `f` runs.
pub fn with_vanished_tx<R>(
    &self,
    txid: &Txid,
    f: impl FnOnce(&Transaction) -> R,
) -> Option<R> {
    let graveyard = self.graveyard();
    match graveyard.get(txid) {
        Some(tomb) if matches!(tomb.reason(), TxRemoval::Vanished) => Some(f(&tomb.tx)),
        _ => None,
    }
}
/// Owned copy of every txid currently keyed in the `txs` store, in the
/// store's iteration order.
pub fn txids(&self) -> Vec<Txid> {
    let txs = self.txs();
    txs.keys().copied().collect()
}
/// Owned copy of the `txs` store's recent-tx list.
pub fn recent_txs(&self) -> Vec<MempoolRecentTx> {
    let txs = self.txs();
    let recent = txs.recent();
    recent.to_vec()
}
/// Cloned mempool stats for `addr`, or `None` when the address tracker
/// has no entry for it (no live mempool activity).
pub fn addr_stats(&self, addr: &AddrBytes) -> Option<AddrMempoolStats> {
    let addrs = self.addrs();
    let entry = addrs.get(addr)?;
    Some(entry.stats.clone())
}
/// Live mempool txs touching `addr`, newest first by `first_seen`,
/// capped at `limit`. Acquires `txs`, `addrs`, `entries` in canonical
/// order; returns owned `Transaction`s so the lock is released
/// before the caller does anything else with them.
pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec<Transaction> {
let txs = self.txs();
let addrs = self.addrs();
let entries = self.entries();
let Some(entry) = addrs.get(addr) else {
return vec![];
};
let mut ordered: Vec<(Timestamp, &Txid)> = entry
.txids
.iter()
.map(|txid| {
let first_seen = entries
.get(&TxidPrefix::from(txid))
.map(|e| e.first_seen)
.unwrap_or_default();
(first_seen, txid)
})
.collect();
ordered.sort_unstable_by_key(|b| std::cmp::Reverse(b.0));
ordered
.into_iter()
.filter_map(|(_, txid)| txs.get(txid).cloned())
.take(limit)
.collect()
}
/// Hand `f` a lazy iterator of `(value, output_type)` pairs covering
/// every output of every tx in the `txs` store, and return whatever
/// `f` returns. The `txs` read guard is held until `f` has finished.
pub fn process_live_outputs<R>(
    &self,
    f: impl FnOnce(&mut dyn Iterator<Item = (Sats, OutputType)>) -> R,
) -> R {
    let txs = self.txs();
    let mut outputs = txs
        .values()
        .flat_map(|tx| tx.output.iter())
        .map(|out| (out.value, out.type_()));
    f(&mut outputs)
}
/// Effective fee rate for a live mempool tx: the seed's chunk rate from
/// the latest snapshot, with fall-back to the entry's simple `fee/vsize`
/// when the snapshot doesn't yet contain it.
pub fn live_effective_fee_rate(&self, prefix: &TxidPrefix) -> Option<FeeRate> {
let entries = self.entries();
if let Some(seed_idx) = entries.idx_of(prefix)
&& let Some(rate) = self.snapshot().chunk_rate_of(seed_idx)
{
return Some(rate);
}
entries.get(prefix).map(|e| e.fee_rate())
}
/// Fee rate of the entry stored in the graveyard tombstone for `txid`
/// (the entry as it was at burial), or `None` when there is no
/// tombstone.
pub fn graveyard_fee_rate(&self, txid: &Txid) -> Option<FeeRate> {
    let graveyard = self.graveyard();
    let tomb = graveyard.get(txid)?;
    Some(tomb.entry.fee_rate())
}
/// `first_seen` timestamps (as `u64`, Unix seconds) for `txids`, one
/// per input txid and in the same order.
///
/// A live entry wins. Otherwise a `Vanished` graveyard tombstone
/// supplies the buried entry's `first_seen`, so a tx doesn't flicker
/// to 0 in the brief window between mempool drop and indexer catch-up.
/// Anything else (unknown txid, or a tombstone with another removal
/// reason) yields 0.
pub fn transaction_times(&self, txids: &[Txid]) -> Vec<u64> {
    let entries = self.entries();
    let graveyard = self.graveyard();

    let mut times = Vec::with_capacity(txids.len());
    for txid in txids {
        let live = entries.get(&TxidPrefix::from(txid)).map(|e| e.first_seen);
        let first_seen = live.or_else(|| {
            graveyard
                .get(txid)
                .filter(|tomb| matches!(tomb.reason(), TxRemoval::Vanished))
                .map(|tomb| tomb.entry.first_seen)
        });
        times.push(first_seen.map_or(0, u64::from));
    }
    times
}
/// Infinite update loop with a 1 second interval.
pub fn start(&self) {
self.start_with(|| {});
@@ -185,7 +315,7 @@ impl Mempool {
Ok(())
}
pub fn state(&self) -> &MempoolState {
pub(crate) fn state(&self) -> &MempoolState {
&self.0.state
}
}

View File

@@ -0,0 +1,132 @@
//! RBF (Replace-By-Fee) tree extraction from the live mempool +
//! graveyard.
//!
//! Both methods return owned, lock-free `RbfNode` trees so the caller
//! (typically `brk_query`) can enrich each node with indexer-resident
//! data (`mined`, effective fee rate) after the mempool lock window
//! closes. Doing the enrichment under the lock would re-enter
//! `Mempool` indirectly via `effective_fee_rate` and recursively
//! acquire the same `entries`/`graveyard` read locks, which can
//! deadlock against a queued writer in `parking_lot`.
use brk_types::{Sats, Timestamp, Transaction, Txid, TxidPrefix, VSize};
use rustc_hash::FxHashSet;
use crate::{
Mempool, TxEntry, TxRemoval, TxStore,
stores::{EntryPool, TxGraveyard},
};
/// One node in an RBF replacement tree, populated entirely from
/// mempool state. The caller layers on `mined` and effective fee rate
/// after the lock has been released.
///
/// Nodes are assembled by `build_node` from a tx body plus its entry:
/// the live stores are used when both hold the txid, the graveyard
/// tombstone otherwise.
#[derive(Debug, Clone)]
pub struct RbfNode {
/// Txid this node describes.
pub txid: Txid,
/// Copied from the resolved entry's `fee`.
pub fee: Sats,
/// Copied from the resolved entry's `vsize`.
pub vsize: VSize,
/// Sum of the tx's output amounts.
pub value: Sats,
/// Copied from the resolved entry's `first_seen`.
pub first_seen: Timestamp,
/// BIP-125 signaling: at least one input has sequence < 0xffffffff-1.
pub rbf: bool,
/// `true` iff any predecessor in this subtree was non-signaling.
pub full_rbf: bool,
/// Nodes for the txs the graveyard lists as predecessors of `txid`,
/// each built recursively with its own predecessors. Predecessors
/// that cannot be resolved are omitted.
pub replaces: Vec<RbfNode>,
}
/// Result of [`Mempool::rbf_for_tx`].
#[derive(Debug, Clone, Default)]
pub struct RbfForTx {
/// Tree rooted at the requested tx's terminal replacer (the tx
/// reached by following `Replaced { by }` links from the requested
/// txid; the requested tx itself when it has no such link). `None`
/// when that terminal tx cannot be resolved from either the live
/// pool or the graveyard, which includes the case where the
/// requested tx is unknown to both.
pub root: Option<RbfNode>,
/// Direct predecessors of the requested tx (txids only).
pub replaces: Vec<Txid>,
}
impl Mempool {
    /// RBF context for `txid`.
    ///
    /// Follows `Replaced { by }` links forward from `txid` to the
    /// terminal replacer and returns that tx's full predecessor tree,
    /// together with the txids the graveyard lists as direct
    /// predecessors of `txid` itself. The `txs`, `entries` and
    /// `graveyard` read guards are taken once, in canonical order, and
    /// held for the whole call.
    pub fn rbf_for_tx(&self, txid: &Txid) -> RbfForTx {
        let txs = self.txs();
        let entries = self.entries();
        let graveyard = self.graveyard();

        let terminal = walk_to_replacement_root(&graveyard, *txid);
        RbfForTx {
            root: build_node(&terminal, &txs, &entries, &graveyard),
            replaces: graveyard
                .predecessors_of(txid)
                .map(|(pred, _)| *pred)
                .collect(),
        }
    }

    /// Up to `limit` replacement trees, most recent replacement first.
    ///
    /// Each recent replacement is mapped to its terminal replacer; a
    /// given terminal replacer contributes at most one tree. Roots
    /// that cannot be resolved are skipped, and with `full_rbf_only`
    /// set, trees whose root is not flagged `full_rbf` (no
    /// non-signaling predecessor anywhere) are skipped as well.
    /// Skipped trees do not count toward `limit`.
    pub fn recent_rbf_trees(&self, full_rbf_only: bool, limit: usize) -> Vec<RbfNode> {
        let txs = self.txs();
        let entries = self.entries();
        let graveyard = self.graveyard();

        let mut trees = Vec::new();
        if limit == 0 {
            return trees;
        }

        let mut seen_roots: FxHashSet<Txid> = FxHashSet::default();
        for (_, by) in graveyard.replaced_iter_recent_first() {
            let root = walk_to_replacement_root(&graveyard, *by);
            if !seen_roots.insert(root) {
                // This tree was already emitted (or already rejected).
                continue;
            }
            let Some(node) = build_node(&root, &txs, &entries, &graveyard) else {
                continue;
            };
            if full_rbf_only && !node.full_rbf {
                continue;
            }
            trees.push(node);
            if trees.len() == limit {
                break;
            }
        }
        trees
    }
}
fn walk_to_replacement_root(graveyard: &TxGraveyard, mut root: Txid) -> Txid {
while let Some(TxRemoval::Replaced { by }) = graveyard.get(&root).map(|t| t.reason()) {
root = *by;
}
root
}
fn build_node(
txid: &Txid,
txs: &TxStore,
entries: &EntryPool,
graveyard: &TxGraveyard,
) -> Option<RbfNode> {
let (tx, entry) = resolve_node(txid, txs, entries, graveyard)?;
let replaces: Vec<RbfNode> = graveyard
.predecessors_of(txid)
.filter_map(|(pred, _)| build_node(pred, txs, entries, graveyard))
.collect();
let full_rbf = replaces.iter().any(|c| !c.rbf || c.full_rbf);
let value: Sats = tx.output.iter().map(|o| o.value).sum();
Some(RbfNode {
txid: *txid,
fee: entry.fee,
vsize: entry.vsize,
value,
first_seen: entry.first_seen,
rbf: entry.rbf,
full_rbf,
replaces,
})
}
/// Locate the tx body and entry for `txid`.
///
/// The live stores are used only when *both* `txs` and `entries` know
/// the txid; otherwise the pair is taken from the graveyard tombstone.
/// Returns `None` when neither source has it.
fn resolve_node<'a>(
    txid: &Txid,
    txs: &'a TxStore,
    entries: &'a EntryPool,
    graveyard: &'a TxGraveyard,
) -> Option<(&'a Transaction, &'a TxEntry)> {
    match (txs.get(txid), entries.get(&TxidPrefix::from(txid))) {
        (Some(tx), Some(entry)) => Some((tx, entry)),
        _ => graveyard.get(txid).map(|tomb| (&tomb.tx, &tomb.entry)),
    }
}

View File

@@ -0,0 +1,43 @@
//! Owned snapshot of mempool in-memory counters for diagnostic display.
use crate::Mempool;
/// Point-in-time counters read from each mempool state bucket. Built
/// via `MempoolStats::from(&mempool)`; holds plain numbers only, no
/// locks or references.
#[derive(Debug, Clone)]
pub struct MempoolStats {
/// `count` field of the `info` bucket.
pub info_count: usize,
/// `len()` of the `txs` store.
pub tx_count: usize,
/// `len()` of the `txs` store's `unresolved()` collection.
pub unresolved_count: usize,
/// `len()` of the address tracker.
pub addr_count: usize,
/// `len()` of the entry pool's `entries()` storage.
pub entry_slot_count: usize,
/// The entry pool's `active_count()`.
pub entry_active_count: usize,
/// The entry pool's `free_slots_count()`.
pub entry_free_count: usize,
/// `len()` of the `outpoint_spends` bucket.
pub outpoint_spend_count: usize,
/// The graveyard's `tombstones_len()`.
pub graveyard_tombstone_count: usize,
/// The graveyard's `order_len()`.
pub graveyard_order_count: usize,
}
impl From<&Mempool> for MempoolStats {
    /// Read every counter under one set of read guards.
    ///
    /// All six buckets are locked in the canonical order
    /// (`info`, `txs`, `addrs`, `entries`, `outpoint_spends`,
    /// `graveyard`) and stay locked together until the counts have been
    /// copied out, so the numbers describe one coherent state. Only
    /// lengths and counters are read; the guards drop on return.
    fn from(mempool: &Mempool) -> Self {
        let state = mempool.state();

        let info = state.info.read();
        let txs = state.txs.read();
        let addrs = state.addrs.read();
        let entries = state.entries.read();
        let outpoint_spends = state.outpoint_spends.read();
        let graveyard = state.graveyard.read();

        let info_count = info.count;
        let tx_count = txs.len();
        let unresolved_count = txs.unresolved().len();
        let addr_count = addrs.len();
        let entry_slot_count = entries.entries().len();
        let entry_active_count = entries.active_count();
        let entry_free_count = entries.free_slots_count();
        let outpoint_spend_count = outpoint_spends.len();
        let graveyard_tombstone_count = graveyard.tombstones_len();
        let graveyard_order_count = graveyard.order_len();

        Self {
            info_count,
            tx_count,
            unresolved_count,
            addr_count,
            entry_slot_count,
            entry_active_count,
            entry_free_count,
            outpoint_spend_count,
            graveyard_tombstone_count,
            graveyard_order_count,
        }
    }
}

View File

@@ -33,24 +33,25 @@ impl Applier {
}
fn bury_one(s: &mut LockedState, prefix: &TxidPrefix, reason: TxRemoval) {
let Some((idx, entry)) = s.entries.remove(prefix) else {
let Some(txid) = s.entries.get(prefix).map(|e| e.txid) else {
return;
};
let txid = entry.txid;
let Some(tx) = s.txs.remove(&txid) else {
if !s.txs.contains(&txid) {
// entries had this prefix but txs didn't — a state divergence
// that should be impossible: publish/bury both touch them
// together under one write_all guard. Reaching this branch
// means a prior cycle left the two stores out of sync (panic
// mid-publish, prefix collision, etc). The slot has been
// freed by entries.remove, but addrs/outpoint_spends/info may
// still hold stale references that we can't repair without
// the tx body. Log loudly so the corruption is visible.
// means a prior cycle left the two stores out of sync (e.g.
// panic mid-publish before `txs.extend` ran). Skip the bury
// entirely: freeing the entries slot here would let
// outpoint_spends point at a slot the next insert recycles
// for an unrelated tx.
warn!(
"mempool bury: entry present but tx missing for txid={txid} - addr/outpoint state may be stale"
"mempool bury: entry present but tx missing for txid={txid} - skipping bury to preserve outpoint_spends integrity"
);
return;
};
}
let (idx, entry) = s.entries.remove(prefix).expect("entry present");
let tx = s.txs.remove(&txid).expect("tx present");
s.info.remove(&tx, entry.fee);
s.addrs.remove_tx(&tx, &txid);
s.outpoint_spends.remove_spends(&tx, idx);

View File

@@ -50,6 +50,7 @@ impl Rebuilder {
return;
}
self.publish(Self::build_snapshot(client, state));
self.dirty.store(false, Ordering::Release);
self.rebuild_count.fetch_add(1, Ordering::Relaxed);
}
@@ -89,9 +90,9 @@ impl Rebuilder {
}
/// Returns true iff dirty and the throttle window has elapsed. On
/// success, clears the dirty bit and starts a new throttle window;
/// on failure, leaves all state untouched so the next cycle can
/// retry.
/// success, starts a new throttle window. The dirty bit is cleared
/// by `tick` only after `publish` returns, so a panic in
/// `build_snapshot` leaves dirty set and the next cycle retries.
fn try_claim_rebuild(&self) -> bool {
if !self.dirty.load(Ordering::Acquire) {
self.skip_clean.fetch_add(1, Ordering::Relaxed);
@@ -103,7 +104,6 @@ impl Rebuilder {
return false;
}
*last = Some(Instant::now());
self.dirty.store(false, Ordering::Release);
true
}

View File

@@ -6,8 +6,20 @@ use super::{AddrTracker, EntryPool, OutpointSpends, TxGraveyard, TxStore};
/// The six buckets making up live mempool state.
///
/// Each bucket has its own `RwLock` so readers of different buckets
/// don't contend with each other. The Applier takes all six write
/// locks in a fixed order for a brief window once per cycle.
/// don't contend with each other. Any code that takes more than one
/// lock must follow the canonical partial order
/// `info → txs → addrs → entries → outpoint_spends → graveyard`,
/// otherwise a reader-holds-A-wants-B / writer-holds-B-wants-A
/// circular wait can deadlock. The Applier takes all six write locks
/// in this order for a brief window once per cycle via
/// [`MempoolState::write_all`]; multi-lock readers inside the crate
/// take a (canonical-order) subset inline.
///
/// This discipline is *internal* to `brk_mempool`: external crates
/// only see `Mempool` methods that bundle each multi-lock operation
/// behind a single call (e.g. `Mempool::lookup_spender`,
/// `Mempool::addr_txs`, `Mempool::rbf_for_tx`), so callers can never
/// take the order wrong because they don't get to choose.
#[derive(Default)]
pub struct MempoolState {
pub(crate) info: RwLock<MempoolInfo>,