mempool: snap

This commit is contained in:
nym21
2026-05-10 00:24:02 +02:00
parent c52a076bfc
commit fe5f30bca6
36 changed files with 847 additions and 720 deletions

View File

@@ -0,0 +1,138 @@
//! Snapshot-side cluster primitives: connected-component discovery
//! over `SnapTx` adjacency, topological ordering, and the glue that
//! feeds the cluster into [`brk_types::linearize`] (Single Fee
//! Linearization).
//!
//! A *cluster* is the connected component of a tx in the dependency
//! graph (`parents ∪ children`), bounded by Core 31's
//! `MAX_CLUSTER_COUNT_LIMIT = 64`. The SFL algorithm itself lives in
//! `brk_types` since it has no mempool deps and is shared with the
//! confirmed-cpfp path in `brk_query`.
use std::collections::VecDeque;
use brk_types::{ChunkInput, CpfpClusterChunk, CpfpClusterTxIndex, linearize};
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use smallvec::SmallVec;
use crate::steps::{SnapTx, TxIndex};
/// Maximum number of transactions kept in a single cluster. Matches
/// Bitcoin Core 31's `MAX_CLUSTER_COUNT_LIMIT`. Any connected component
/// above this size is malformed under Core's policy, so `walk_cluster`
/// stops collecting members once it has gathered this many.
pub(crate) const MAX_CLUSTER: usize = 64;
/// Capped DFS over the undirected dependency graph (`parents ∪
/// children`) starting from `seed`. Returns the connected component
/// truncated to `MAX_CLUSTER`, with `seed` at index 0.
///
/// Returns an empty `Vec` when `seed` does not resolve in `txs`.
/// Neighbor indices that do not resolve in `txs` (dangling edges) are
/// skipped rather than reported, so every returned index is safe to use
/// for direct indexing into `txs`, which `linearize_component` relies on.
pub(crate) fn walk_cluster(txs: &[SnapTx], seed: TxIndex) -> Vec<TxIndex> {
    if txs.get(seed.as_usize()).is_none() {
        return Vec::new();
    }
    let mut visited: FxHashSet<TxIndex> =
        FxHashSet::with_capacity_and_hasher(MAX_CLUSTER, FxBuildHasher);
    visited.insert(seed);
    let mut out: Vec<TxIndex> = Vec::with_capacity(MAX_CLUSTER);
    out.push(seed);
    let mut stack: Vec<TxIndex> = vec![seed];
    while let Some(idx) = stack.pop() {
        // Only validated indices are ever stacked, so this lookup is
        // expected to succeed; the guard stays as a cheap safety net.
        let Some(t) = txs.get(idx.as_usize()) else {
            continue;
        };
        for &n in t.parents.iter().chain(t.children.iter()) {
            // The cap is checked before each neighbor so the result
            // never grows past `MAX_CLUSTER`.
            if out.len() >= MAX_CLUSTER {
                return out;
            }
            // A dangling edge would make downstream `txs[idx]` lookups
            // panic; leave it out of the component instead.
            if n.as_usize() >= txs.len() {
                continue;
            }
            if visited.insert(n) {
                out.push(n);
                stack.push(n);
            }
        }
    }
    out
}
/// Linearize the connected component into chunks. Topo-sorts members,
/// remaps parent edges to cluster-local indices, and runs SFL. Returns
/// `(members, chunks)` where `members` is the topo-ordered `TxIndex`
/// list and `chunks[*].txs` are local indices into `members`. Callers
/// must filter singletons before calling - the singleton's `chunk_rate`
/// is `fee/vsize`, set elsewhere.
pub(crate) fn linearize_component(
txs: &[SnapTx],
component: &[TxIndex],
) -> (Vec<TxIndex>, Vec<CpfpClusterChunk>) {
let members = topo_sort(txs, component);
let local_of = build_local_index(&members);
let parents_local: Vec<SmallVec<[CpfpClusterTxIndex; 2]>> = members
.iter()
.map(|idx| {
txs[idx.as_usize()]
.parents
.iter()
.filter_map(|p| local_of.get(p).copied())
.collect()
})
.collect();
let inputs: Vec<ChunkInput<'_>> = members
.iter()
.zip(&parents_local)
.map(|(idx, ps)| {
let t = &txs[idx.as_usize()];
ChunkInput {
fee: t.fee,
vsize: t.vsize,
parents: ps.as_slice(),
}
})
.collect();
let chunks = linearize(&inputs);
(members, chunks)
}
/// Kahn's topological sort over the connected component, restricted to
/// in-cluster parent edges. Returns members in an order where every tx
/// follows all its in-cluster parents.
fn topo_sort(txs: &[SnapTx], component: &[TxIndex]) -> Vec<TxIndex> {
let n = component.len();
let pos: FxHashMap<TxIndex, usize> = component
.iter()
.enumerate()
.map(|(i, &x)| (x, i))
.collect();
let mut indeg: Vec<u32> = vec![0; n];
let mut children: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, &idx) in component.iter().enumerate() {
let Some(t) = txs.get(idx.as_usize()) else {
continue;
};
indeg[i] = t.parents.iter().filter(|p| pos.contains_key(p)).count() as u32;
for &c in t.children.iter() {
if let Some(&ci) = pos.get(&c) {
children[i].push(ci);
}
}
}
let mut queue: VecDeque<usize> = (0..n).filter(|&i| indeg[i] == 0).collect();
let mut out: Vec<TxIndex> = Vec::with_capacity(n);
while let Some(i) = queue.pop_front() {
out.push(component[i]);
for &c in &children[i] {
indeg[c] -= 1;
if indeg[c] == 0 {
queue.push_back(c);
}
}
}
out
}
/// `members[i]`'s wire index, keyed by snapshot `TxIndex`. Built once
/// so per-tx parent edges can be remapped without a linear scan.
pub(crate) fn build_local_index(members: &[TxIndex]) -> FxHashMap<TxIndex, CpfpClusterTxIndex> {
members
.iter()
.enumerate()
.map(|(i, &idx)| (idx, CpfpClusterTxIndex::from(i as u32)))
.collect()
}

View File

@@ -1,48 +1,24 @@
//! CPFP (Child Pays For Parent) walk over a `Snapshot`'s adjacency.
//!
//! The snapshot stores per-tx parent/child edges in `TxIndex` space and
//! per-tx `(fee, vsize)` we need for chunking.
//!
//! Three independent walks:
//! - `ancestors_idx`: capped DFS up `parents` only.
//! - `descendants_idx`: capped DFS down `children` only.
//! - cluster `members`: capped DFS over `parents ∪ children`, i.e. the
//! connected component of the seed in the in-mempool dependency
//! graph. Required to match Core 31's cluster mempool semantics:
//! siblings (sharing a parent) and cousins (sharing a descendant)
//! belong to the same cluster but are missed by ancestor/descendant
//! walks alone.
//!
//! The cluster is then linearized via `brk_types::linearize` (single fee
//! linearization) so chunks reflect Core's CPFP "lift": a child whose
//! rate exceeds its parent's gets folded into a chunk with the parent
//! at the combined feerate. The seed's chunk feerate is what
//! `effective_fee_per_vsize` reports.
use std::collections::VecDeque;
//! - `ancestors`: capped DFS up `parents` only.
//! - `descendants`: capped DFS down `children` only.
//! - cluster: the connected component over `parents ∪ children`,
//! linearized via [`crate::cluster`] for the cluster wire shape and
//! the seed's chunk feerate.
use brk_types::{
CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate,
SigOps, TxidPrefix, VSize,
CPFP_CHAIN_LIMIT, CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate,
SigOps, TxidPrefix, VSize, find_seed_chunk,
};
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use rustc_hash::{FxBuildHasher, FxHashSet};
use crate::{
Mempool,
chunking::{ChunkInput, linearize},
cluster::{build_local_index, linearize_component, walk_cluster},
steps::{SnapTx, TxIndex},
};
/// Cap matches Bitcoin Core's default mempool ancestor/descendant
/// chain limits and mempool.space's truncation.
const MAX: usize = 25;
/// Cluster cap matches Bitcoin Core 31's `MAX_CLUSTER_COUNT_LIMIT`
/// (max txs in a single cluster-mempool cluster). Sized large enough
/// to hold the whole connected component for any policy-conformant
/// cluster, then truncated.
const MAX_CLUSTER: usize = 64;
impl Mempool {
/// CPFP info for a live mempool tx. Returns `None` only when the
/// tx isn't in the mempool, so callers can fall through to the
@@ -92,39 +68,30 @@ fn build_cpfp_info(
}
}
/// Walk the graph from `seed` along `next` and lift the visited indices
/// into wire-shape `CpfpEntry`s in one go.
/// Capped DFS from `seed` (exclusive) along `next`, lifted directly to
/// wire-shape `CpfpEntry`s. Used for both ancestor and descendant walks.
fn collect_entries(
txs: &[SnapTx],
seed: TxIndex,
next: impl Fn(&SnapTx) -> &[TxIndex],
) -> Vec<CpfpEntry> {
walk(txs, seed, next)
.iter()
.filter_map(|&i| txs.get(i.as_usize()).map(CpfpEntry::from))
.collect()
}
/// Capped DFS from `seed` (exclusive), following the neighbors yielded
/// by `next`. Used for both the ancestor and descendant walks.
fn walk(txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex]) -> Vec<TxIndex> {
let Some(seed_node) = txs.get(seed.as_usize()) else {
return Vec::new();
};
let mut visited: FxHashSet<TxIndex> =
FxHashSet::with_capacity_and_hasher(MAX + 1, FxBuildHasher);
FxHashSet::with_capacity_and_hasher(CPFP_CHAIN_LIMIT + 1, FxBuildHasher);
visited.insert(seed);
let mut out: Vec<TxIndex> = Vec::with_capacity(MAX);
let mut out: Vec<CpfpEntry> = Vec::with_capacity(CPFP_CHAIN_LIMIT);
let mut stack: Vec<TxIndex> = next(seed_node).to_vec();
while let Some(idx) = stack.pop() {
if out.len() >= MAX {
if out.len() >= CPFP_CHAIN_LIMIT {
break;
}
if !visited.insert(idx) {
continue;
}
out.push(idx);
if let Some(t) = txs.get(idx.as_usize()) {
out.push(CpfpEntry::from(t));
stack.extend(next(t).iter().copied());
}
}
@@ -132,8 +99,8 @@ fn walk(txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex]) ->
}
/// Wire-shape `CpfpCluster` plus the seed's chunk feerate. Members are
/// the connected component of the seed in the dependency graph, then
/// topologically sorted (parents before children) so wire indices and
/// the connected component of the seed in the dependency graph,
/// topologically ordered (parents before children) so wire indices and
/// chunk-internal ordering are valid for client-side reconstruction.
/// Returns `(None, seed_per_tx_rate)` for singletons (matches
/// mempool.space, which omits `cluster` when no relations exist).
@@ -148,12 +115,15 @@ fn build_cluster(
return (None, seed_per_tx_rate);
}
let members = topo_sort(txs, &component);
let local_of = build_local_index(&members);
let (cluster_txs, vsizes) = collect_cluster_members(txs, &members, &local_of);
let chunks = linearize_cluster(&cluster_txs, &vsizes);
let (chunk_index, seed_chunk_rate) =
locate_seed_chunk(local_of[&seed_idx], &chunks, seed_per_tx_rate);
let (members, chunks) = linearize_component(txs, &component);
let cluster_txs = build_wire_members(txs, &members);
let seed_local = CpfpClusterTxIndex::from(
members
.iter()
.position(|&i| i == seed_idx)
.map_or(0, |p| p as u32),
);
let (chunk_index, seed_chunk_rate) = find_seed_chunk(&chunks, seed_local, seed_per_tx_rate);
(
Some(CpfpCluster {
@@ -165,141 +135,25 @@ fn build_cluster(
)
}
/// `members[i]`'s wire index, keyed by snapshot `TxIndex`. Built once
/// so per-tx parent edges can be remapped without a linear scan.
fn build_local_index(members: &[TxIndex]) -> FxHashMap<TxIndex, CpfpClusterTxIndex> {
/// Materialize wire-shape `CpfpClusterTx`s for every topo-ordered
/// member with parent edges remapped to local indices.
fn build_wire_members(txs: &[SnapTx], members: &[TxIndex]) -> Vec<CpfpClusterTx> {
let local_of = build_local_index(members);
members
.iter()
.enumerate()
.map(|(i, &idx)| (idx, CpfpClusterTxIndex::from(i as u32)))
.map(|&idx| {
let t = &txs[idx.as_usize()];
CpfpClusterTx {
txid: t.txid,
weight: t.weight,
fee: t.fee,
parents: t
.parents
.iter()
.filter_map(|p| local_of.get(p).copied())
.collect(),
}
})
.collect()
}
/// Materialize wire-shape `CpfpClusterTx`s for every member with parent
/// edges remapped to local indices, plus the parallel `vsize` column the
/// linearizer needs (not carried on `CpfpClusterTx`, which only stores
/// weight).
fn collect_cluster_members(
txs: &[SnapTx],
members: &[TxIndex],
local_of: &FxHashMap<TxIndex, CpfpClusterTxIndex>,
) -> (Vec<CpfpClusterTx>, Vec<VSize>) {
let mut cluster_txs: Vec<CpfpClusterTx> = Vec::with_capacity(members.len());
let mut vsizes: Vec<VSize> = Vec::with_capacity(members.len());
for &idx in members {
let Some(t) = txs.get(idx.as_usize()) else {
continue;
};
let parents: Vec<CpfpClusterTxIndex> = t
.parents
.iter()
.filter_map(|p| local_of.get(p).copied())
.collect();
cluster_txs.push(CpfpClusterTx {
txid: t.txid,
weight: t.weight,
fee: t.fee,
parents,
});
vsizes.push(t.vsize);
}
(cluster_txs, vsizes)
}
/// Single-fee-linearize the cluster, borrowing parents from the
/// already-built `cluster_txs` so no re-allocation is needed.
fn linearize_cluster(cluster_txs: &[CpfpClusterTx], vsizes: &[VSize]) -> Vec<CpfpClusterChunk> {
let inputs: Vec<ChunkInput<'_>> = cluster_txs
.iter()
.zip(vsizes)
.map(|(c, &vsize)| ChunkInput {
fee: c.fee,
vsize,
parents: &c.parents,
})
.collect();
linearize(&inputs)
}
/// Find the chunk containing the seed and return its index plus rate.
/// Falls back to `(0, seed_per_tx_rate)` when the seed isn't in any
/// chunk - shouldn't happen but keeps the wire shape valid.
fn locate_seed_chunk(
seed_local: CpfpClusterTxIndex,
chunks: &[CpfpClusterChunk],
seed_per_tx_rate: FeeRate,
) -> (u32, FeeRate) {
chunks
.iter()
.enumerate()
.find(|(_, ch)| ch.txs.contains(&seed_local))
.map(|(i, ch)| (i as u32, ch.feerate))
.unwrap_or((0, seed_per_tx_rate))
}
/// Capped DFS over the undirected dependency graph (`parents ∪
/// children`) starting from `seed`. Returns the connected component
/// truncated to `MAX_CLUSTER`, with `seed` at index 0.
fn walk_cluster(txs: &[SnapTx], seed: TxIndex) -> Vec<TxIndex> {
if txs.get(seed.as_usize()).is_none() {
return Vec::new();
}
let mut visited: FxHashSet<TxIndex> =
FxHashSet::with_capacity_and_hasher(MAX_CLUSTER, FxBuildHasher);
visited.insert(seed);
let mut out: Vec<TxIndex> = Vec::with_capacity(MAX_CLUSTER);
out.push(seed);
let mut stack: Vec<TxIndex> = vec![seed];
while let Some(idx) = stack.pop() {
let Some(t) = txs.get(idx.as_usize()) else {
continue;
};
for &n in t.parents.iter().chain(t.children.iter()) {
if out.len() >= MAX_CLUSTER {
return out;
}
if visited.insert(n) {
out.push(n);
stack.push(n);
}
}
}
out
}
/// Kahn's topological sort over the connected component, restricted to
/// in-cluster parent edges. Returns members in an order where every tx
/// follows all its in-cluster parents.
fn topo_sort(txs: &[SnapTx], component: &[TxIndex]) -> Vec<TxIndex> {
let n = component.len();
let pos: FxHashMap<TxIndex, usize> = component
.iter()
.enumerate()
.map(|(i, &x)| (x, i))
.collect();
let mut indeg: Vec<u32> = vec![0; n];
let mut children: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, &idx) in component.iter().enumerate() {
let Some(t) = txs.get(idx.as_usize()) else {
continue;
};
indeg[i] = t.parents.iter().filter(|p| pos.contains_key(p)).count() as u32;
for &c in t.children.iter() {
if let Some(&ci) = pos.get(&c) {
children[i].push(ci);
}
}
}
let mut queue: VecDeque<usize> = (0..n).filter(|&i| indeg[i] == 0).collect();
let mut out: Vec<TxIndex> = Vec::with_capacity(n);
while let Some(i) = queue.pop_front() {
out.push(component[i]);
for &c in &children[i] {
indeg[c] -= 1;
if indeg[c] == 0 {
queue.push_back(c);
}
}
}
out
}

View File

@@ -18,15 +18,15 @@ pub struct MempoolStats {
impl From<&Mempool> for MempoolStats {
fn from(mempool: &Mempool) -> Self {
let inner = mempool.read();
let state = mempool.read();
let rebuilder = mempool.rebuilder();
Self {
txs: inner.txs.len(),
unresolved: inner.txs.unresolved().len(),
addrs: inner.addrs.len(),
outpoint_spends: inner.outpoint_spends.len(),
graveyard_tombstones: inner.graveyard.tombstones_len(),
graveyard_order: inner.graveyard.order_len(),
txs: state.txs.len(),
unresolved: state.txs.unresolved().len(),
addrs: state.addrs.len(),
outpoint_spends: state.outpoint_spends.len(),
graveyard_tombstones: state.graveyard.tombstones_len(),
graveyard_order: state.graveyard.order_len(),
rebuilds: rebuilder.rebuild_count(),
skip_cleans: rebuilder.skip_clean_count(),
}

View File

@@ -3,10 +3,10 @@
//! One pull cycle, five steps:
//!
//! 1. [`steps::fetcher::Fetcher`] - one mixed batched RPC for
//! `getrawmempool verbose` + `getblocktemplate` + `getmempoolinfo`,
//! then a second batch for `getrawtransaction` on new entries. The
//! GBT is validated to be a subset of the verbose listing; on
//! mismatch the cycle is skipped.
//! `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`,
//! then a `getmempoolentry` batch and a `getrawtransaction` batch
//! on new txids only. The GBT is validated to be a subset of the
//! txid listing; on mismatch the cycle is skipped.
//! 2. [`steps::preparer::Preparer`] - decode and classify into
//! `TxsPulled { added, removed }`. Pure CPU.
//! 3. [`steps::applier::Applier`] - apply the diff to
@@ -38,7 +38,7 @@ use rustc_hash::FxHashSet;
use parking_lot::{RwLock, RwLockReadGuard};
use tracing::error;
pub mod chunking;
mod cluster;
mod cpfp;
mod diagnostics;
mod rbf;
@@ -46,12 +46,12 @@ mod state;
pub(crate) mod steps;
pub(crate) mod stores;
pub use chunking::{ChunkInput, linearize};
pub use diagnostics::MempoolStats;
pub use rbf::{RbfForTx, RbfNode};
pub use steps::Snapshot;
use steps::{Applier, Fetched, Fetcher, Preparer, Prevouts, Rebuilder};
pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval};
pub use stores::{TxGraveyard, TxStore, TxTombstone};
pub(crate) use steps::{BlockStats, RecommendedFees, TxEntry, TxRemoval};
pub(crate) use stores::{TxStore, TxTombstone};
/// Confirmed-parent prevout resolver passed to [`Mempool::update_with`] /
/// [`Mempool::start_with`]. Receives `(parent_txid, vout)`, returns the
@@ -62,9 +62,9 @@ pub(crate) use state::State;
/// Cheaply cloneable: clones share one live mempool via `Arc`.
#[derive(Clone)]
pub struct Mempool(Arc<Shared>);
pub struct Mempool(Arc<Inner>);
struct Shared {
struct Inner {
client: Client,
state: RwLock<State>,
rebuilder: Rebuilder,
@@ -72,7 +72,7 @@ struct Shared {
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(Shared {
Self(Arc::new(Inner {
client: client.clone(),
state: RwLock::new(State::default()),
rebuilder: Rebuilder::default(),
@@ -112,16 +112,14 @@ impl Mempool {
/// (block 0) with aggregate stats and full tx bodies in GBT order.
pub fn block_template(&self) -> BlockTemplate {
let snap = self.snapshot();
let stats = MempoolBlock::from(&snap.block_stats[0]);
let txids: Vec<Txid> = snap.blocks[0]
.iter()
.map(|idx| snap.txs[idx.as_usize()].txid)
.collect();
let transactions = self.collect_txs(&txids);
BlockTemplate {
hash: snap.next_block_hash,
stats,
transactions,
stats: snap
.block_stats
.first()
.map(MempoolBlock::from)
.unwrap_or_default(),
transactions: self.collect_txs(snap.block0_txids()),
}
}
@@ -133,26 +131,20 @@ impl Mempool {
pub fn block_template_diff(&self, since: NextBlockHash) -> Option<BlockTemplateDiff> {
let past = self.0.rebuilder.historical_block0(since)?;
let snap = self.snapshot();
let current: FxHashSet<Txid> = snap.blocks[0]
.iter()
.map(|idx| snap.txs[idx.as_usize()].txid)
.collect();
let added_txids: Vec<Txid> = current.difference(&past).copied().collect();
let removed: Vec<Txid> = past.difference(&current).copied().collect();
let added = self.collect_txs(&added_txids);
let current: FxHashSet<Txid> = snap.block0_txids().collect();
Some(BlockTemplateDiff {
hash: snap.next_block_hash,
since,
added,
removed,
added: self.collect_txs(current.difference(&past).copied()),
removed: past.difference(&current).copied().collect(),
})
}
fn collect_txs(&self, txids: &[Txid]) -> Vec<Transaction> {
fn collect_txs(&self, txids: impl IntoIterator<Item = Txid>) -> Vec<Transaction> {
let state = self.read();
txids
.iter()
.filter_map(|txid| state.txs.get(txid).cloned())
.into_iter()
.filter_map(|txid| state.txs.get(&txid).cloned())
.collect()
}
@@ -191,9 +183,7 @@ impl Mempool {
/// Apply `f` to a `Vanished` tombstone's tx body if present.
/// `Replaced` tombstones return `None` because the tx will not confirm.
pub fn with_vanished_tx<R>(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option<R> {
let state = self.read();
let tomb = state.graveyard.get(txid)?;
matches!(tomb.reason(), TxRemoval::Vanished).then(|| f(&tomb.tx))
self.read().graveyard.get_vanished(txid).map(|t| f(&t.tx))
}
/// Snapshot of all live mempool txids.
@@ -240,8 +230,8 @@ impl Mempool {
&self,
f: impl FnOnce(&mut dyn Iterator<Item = (Sats, OutputType)>) -> R,
) -> R {
let inner = self.read();
let mut iter = inner
let state = self.read();
let mut iter = state
.txs
.values()
.flat_map(|tx| &tx.output)
@@ -249,9 +239,9 @@ impl Mempool {
f(&mut iter)
}
/// Effective fee rate for a live tx: snapshot's chunk rate when
/// the tx is in the latest snapshot, falling back to the entry's
/// `fee/vsize` if not yet ingested.
/// Effective fee rate for a live tx: snapshot's linearized chunk
/// rate. Falls back to `fee/vsize` for txs added since the latest
/// snapshot was built (apply -> same-cycle tick gap).
pub fn live_effective_fee_rate(&self, prefix: &TxidPrefix) -> Option<FeeRate> {
if let Some(rate) = self.snapshot().chunk_rate_for(prefix) {
return Some(rate);
@@ -262,16 +252,15 @@ impl Mempool {
.map(|e| e.fee_rate())
}
/// Effective fee rate (Core's chunk rate) snapshotted into the
/// tomb's entry at burial - same value `live_effective_fee_rate`
/// returns while the tx is alive, so an evicted RBF predecessor
/// reports the package-effective rate it had in the mempool, not a
/// misleading isolated `fee/vsize`.
/// Linearized chunk rate captured at burial - same value
/// `live_effective_fee_rate` returned while the tx was alive, so an
/// evicted RBF predecessor reports the package-effective rate it
/// had in the mempool, not a misleading isolated `fee/vsize`.
pub fn graveyard_fee_rate(&self, txid: &Txid) -> Option<FeeRate> {
self.read()
.graveyard
.get(txid)
.map(|tomb| tomb.entry.chunk_rate)
.map(|tomb| tomb.chunk_rate)
}
/// `first_seen` Unix-second timestamps for `txids`, in input order.
@@ -286,7 +275,7 @@ impl Mempool {
.collect()
}
/// Infinite update loop with a 1 second interval. Resolves
/// Infinite update loop with a 500ms interval. Resolves
/// confirmed-parent prevouts via the default `getrawtransaction`
/// resolver; requires bitcoind started with `txindex=1`.
pub fn start(&self) {
@@ -298,14 +287,14 @@ impl Mempool {
/// Each cycle is wrapped in `catch_unwind` so a panic doesn't
/// freeze the snapshot; `parking_lot` locks don't poison.
///
/// Sleep is `PERIOD - work_duration`, so a 700ms cycle followed by
/// a 200ms cycle still ticks roughly every `PERIOD`. When work
/// Sleep is `PERIOD - work_duration`, so a 350ms cycle followed by
/// a 100ms cycle still ticks roughly every `PERIOD`. When work
/// overruns `PERIOD`, the next cycle starts immediately.
pub fn start_with<F>(&self, resolver: F)
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
const PERIOD: Duration = Duration::from_secs(1);
const PERIOD: Duration = Duration::from_millis(500);
loop {
let started = Instant::now();
let outcome = catch_unwind(AssertUnwindSafe(|| {
@@ -337,14 +326,15 @@ impl Mempool {
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
let Shared {
let Inner {
client,
state,
rebuilder,
} = &*self.0;
let Some(Fetched {
entries_info,
live_txids,
new_entries,
new_raws,
gbt,
min_fee,
@@ -352,8 +342,8 @@ impl Mempool {
else {
return Ok(());
};
let pulled = Preparer::prepare(entries_info, new_raws, state);
let changed = Applier::apply(state, pulled);
let pulled = Preparer::prepare(&live_txids, new_entries, new_raws, state);
let changed = Applier::apply(state, rebuilder, pulled);
Prevouts::fill(state, resolver);
rebuilder.tick(state, changed, &gbt, min_fee);

View File

@@ -6,7 +6,7 @@
use brk_types::{Sats, Timestamp, Transaction, Txid, TxidPrefix, VSize};
use rustc_hash::FxHashSet;
use crate::{Mempool, TxEntry, TxRemoval, TxStore, stores::TxGraveyard};
use crate::{Mempool, TxEntry, TxStore, stores::TxGraveyard};
#[derive(Debug, Clone)]
pub struct RbfNode {
@@ -35,15 +35,15 @@ impl Mempool {
/// and return its full predecessor tree, plus the requested tx's
/// direct predecessors. Single read-lock window.
pub fn rbf_for_tx(&self, txid: &Txid) -> RbfForTx {
let inner = self.read();
let state = self.read();
let root_txid = walk_to_replacement_root(&inner.graveyard, *txid);
let replaces: Vec<Txid> = inner
let root_txid = state.graveyard.replacement_root_of(*txid);
let replaces: Vec<Txid> = state
.graveyard
.predecessors_of(txid)
.map(|(p, _)| *p)
.collect();
let root = build_node(&root_txid, &inner.txs, &inner.graveyard);
let root = build_node(&root_txid, &state.txs, &state.graveyard);
RbfForTx { root, replaces }
}
@@ -51,30 +51,23 @@ impl Mempool {
/// by root, capped at `limit`. `full_rbf_only` drops trees with no
/// non-signaling predecessor.
pub fn recent_rbf_trees(&self, full_rbf_only: bool, limit: usize) -> Vec<RbfNode> {
let inner = self.read();
let state = self.read();
let mut seen: FxHashSet<Txid> = FxHashSet::default();
inner
state
.graveyard
.replaced_iter_recent_first()
.filter_map(|(_, by)| {
let root = walk_to_replacement_root(&inner.graveyard, *by);
let root = state.graveyard.replacement_root_of(*by);
seen.insert(root).then_some(root)
})
.filter_map(|root| build_node(&root, &inner.txs, &inner.graveyard))
.filter_map(|root| build_node(&root, &state.txs, &state.graveyard))
.filter(|n| !full_rbf_only || n.full_rbf)
.take(limit)
.collect()
}
}
fn walk_to_replacement_root(graveyard: &TxGraveyard, mut root: Txid) -> Txid {
while let Some(TxRemoval::Replaced { by }) = graveyard.get(&root).map(|t| t.reason()) {
root = *by;
}
root
}
fn build_node(txid: &Txid, txs: &TxStore, graveyard: &TxGraveyard) -> Option<RbfNode> {
let (tx, entry) = resolve_node(txid, txs, graveyard)?;

View File

@@ -7,10 +7,7 @@
use brk_types::{MempoolInfo, Timestamp, Txid};
use crate::{
TxRemoval,
stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore},
};
use crate::stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore};
#[derive(Default)]
pub struct State {
@@ -29,7 +26,6 @@ impl State {
if let Some(e) = self.txs.entry(txid) {
return Some(e.first_seen);
}
let tomb = self.graveyard.get(txid)?;
matches!(tomb.reason(), TxRemoval::Vanished).then_some(tomb.entry.first_seen)
self.graveyard.get_vanished(txid).map(|t| t.entry.first_seen)
}
}

View File

@@ -3,7 +3,10 @@ use parking_lot::RwLock;
use crate::{
State, TxEntry, TxRemoval,
steps::preparer::{TxAddition, TxsPulled},
steps::{
preparer::{TxAddition, TxsPulled},
rebuilder::{Rebuilder, Snapshot},
},
};
/// Applies a prepared diff to in-memory mempool state under one write
@@ -12,33 +15,48 @@ pub struct Applier;
impl Applier {
/// Returns true iff anything changed.
pub fn apply(lock: &RwLock<State>, pulled: TxsPulled) -> bool {
///
/// `rebuilder` supplies the previous cycle's snapshot. Burial reads
/// each tomb's `chunk_rate` from the snapshot (always-fresh,
/// package-aware via local linearization). The fallback to
/// `entry.fee_rate()` is unreachable in steady state - every burial
/// target was alive at the previous tick, so the snapshot has it.
pub fn apply(lock: &RwLock<State>, rebuilder: &Rebuilder, pulled: TxsPulled) -> bool {
let TxsPulled { added, removed } = pulled;
let has_changes = !added.is_empty() || !removed.is_empty();
let mut state = lock.write();
Self::bury_removals(&mut state, removed);
Self::bury_removals(&mut state, rebuilder, removed);
Self::publish_additions(&mut state, added);
state.graveyard.evict_old();
has_changes
}
fn bury_removals(state: &mut State, removed: Vec<(TxidPrefix, TxRemoval)>) {
fn bury_removals(
state: &mut State,
rebuilder: &Rebuilder,
removed: Vec<(TxidPrefix, TxRemoval)>,
) {
let snapshot = rebuilder.snapshot();
for (prefix, reason) in removed {
Self::bury_one(state, &prefix, reason);
Self::bury_one(state, &snapshot, &prefix, reason);
}
}
fn bury_one(state: &mut State, prefix: &TxidPrefix, reason: TxRemoval) {
fn bury_one(state: &mut State, snapshot: &Snapshot, prefix: &TxidPrefix, reason: TxRemoval) {
let Some(record) = state.txs.remove_by_prefix(prefix) else {
return;
};
let txid = record.entry.txid;
let chunk_rate = snapshot
.chunk_rate_for(prefix)
.unwrap_or_else(|| record.entry.fee_rate());
state.info.remove(&record.tx, record.entry.fee);
state.addrs.remove_tx(&record.tx, &txid);
state.addrs.remove_tx(&record.tx);
state.outpoint_spends.remove_spends(&record.tx, *prefix);
state.graveyard.bury(txid, record.tx, record.entry, reason);
state
.graveyard
.bury(record.tx, record.entry, chunk_rate, reason);
}
fn publish_additions(state: &mut State, added: Vec<TxAddition>) {
@@ -62,7 +80,7 @@ impl Applier {
fn publish_one(state: &mut State, tx: Transaction, entry: TxEntry) {
let prefix = entry.txid_prefix();
state.info.add(&tx, entry.fee);
state.addrs.add_tx(&tx, &entry.txid);
state.addrs.add_tx(&tx);
state.outpoint_spends.insert_spends(&tx, prefix);
state.txs.insert(tx, entry);
}

View File

@@ -3,7 +3,12 @@ use brk_types::{FeeRate, MempoolEntryInfo, Txid};
use rustc_hash::FxHashMap;
pub struct Fetched {
pub entries_info: Vec<MempoolEntryInfo>,
/// Every txid currently in the mempool (from `getrawmempool false`).
/// Used to derive the `live` set for removal classification.
pub live_txids: Vec<Txid>,
/// `MempoolEntryInfo` for newly-observed txids only (existing ones
/// keep their first-sight entry on the live store).
pub new_entries: Vec<MempoolEntryInfo>,
pub new_raws: FxHashMap<Txid, RawTx>,
pub gbt: Vec<BlockTemplateTx>,
pub min_fee: FeeRate,

View File

@@ -4,34 +4,32 @@ pub use fetched::Fetched;
use brk_error::Result;
use brk_rpc::{Client, MempoolState};
use brk_types::{MempoolEntryInfo, Txid};
use brk_types::Txid;
use parking_lot::RwLock;
use crate::{
State,
stores::{TxGraveyard, TxStore},
};
use crate::{State, stores::TxStore};
/// Cap before the batch RPC so we never hand bitcoind an unbounded batch.
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
/// Two batched round-trips per cycle regardless of mempool size:
/// `getrawmempool verbose` + `getblocktemplate` + `getmempoolinfo` in
/// one mixed batch, then `getrawtransaction` for new txs.
/// Three batched round-trips per cycle, scaling with churn rather than
/// mempool size: `getblocktemplate` + `getrawmempool false` +
/// `getmempoolinfo` in one mixed batch; then `getmempoolentry` and
/// `getrawtransaction` per *new* txid only.
///
/// `getblocktemplate` is validated to be a subset of the verbose
/// listing inside the RPC layer; mismatches return `Ok(None)` so the
/// cycle is skipped without polluting downstream state.
/// `getblocktemplate` is validated to be a subset of the txid listing
/// inside the RPC layer; mismatches return `Ok(None)` so the cycle is
/// skipped without polluting downstream state.
///
/// Confirmed prevouts are resolved post-apply by the caller-supplied
/// resolver passed to `Mempool::update_with`, so the in-crate path no
/// longer issues a third batch for parents.
/// longer issues a fourth batch for parents.
pub struct Fetcher;
impl Fetcher {
pub fn fetch(client: &Client, lock: &RwLock<State>) -> Result<Option<Fetched>> {
let Some(MempoolState {
entries,
live_txids,
gbt,
min_fee,
}) = client.fetch_mempool_state()?
@@ -40,27 +38,30 @@ impl Fetcher {
};
let new_txids = {
let state = lock.read();
Self::new_txids(&entries, &state.txs, &state.graveyard)
Self::new_txids(&live_txids, &state.txs)
};
let new_entries = client.fetch_mempool_entries(&new_txids)?;
let new_raws = client.get_raw_transactions(&new_txids)?;
Ok(Some(Fetched {
entries_info: entries,
live_txids,
new_entries,
new_raws,
gbt,
min_fee,
}))
}
fn new_txids(
entries_info: &[MempoolEntryInfo],
known: &TxStore,
graveyard: &TxGraveyard,
) -> Vec<Txid> {
entries_info
/// Live txids the local store hasn't seen yet. Graveyard txs are
/// included so a re-broadcast (post-reorg or a peer republishing)
/// flows through `Preparer::classify_addition` and lands as
/// [`crate::TxAddition::Revived`] instead of sitting orphaned for
/// the full graveyard retention.
fn new_txids(live_txids: &[Txid], known: &TxStore) -> Vec<Txid> {
live_txids
.iter()
.filter(|info| !known.contains(&info.txid) && !graveyard.contains(&info.txid))
.filter(|txid| !known.contains(txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.map(|info| info.txid)
.copied()
.collect()
}
}

View File

@@ -1,13 +1,15 @@
//! The five pipeline steps. See the crate-level docs for the cycle.
//! The five pipeline steps, in cycle order. See the crate-level docs
//! for the full cycle narrative.
mod applier;
mod fetcher;
pub(crate) mod preparer;
mod preparer;
mod prevouts;
pub(crate) mod rebuilder;
mod rebuilder;
pub use applier::Applier;
pub use fetcher::{Fetched, Fetcher};
pub use preparer::{Preparer, TxEntry, TxRemoval};
pub use prevouts::Prevouts;
pub use rebuilder::{BlockStats, Rebuilder, RecommendedFees, SnapTx, Snapshot, TxIndex};
pub(crate) use applier::Applier;
pub(crate) use fetcher::{Fetched, Fetcher};
pub(crate) use preparer::{Preparer, TxEntry, TxRemoval};
pub(crate) use prevouts::Prevouts;
pub(crate) use rebuilder::{BlockStats, RecommendedFees, Rebuilder, SnapTx, TxIndex};
pub use rebuilder::Snapshot;

View File

@@ -1,18 +1,19 @@
//! Turn `Fetched` raws into a typed diff for the Applier. Pure CPU,
//! holds a read guard on `State` for the cycle. New txs are
//! classified into three buckets:
//! holds a read guard on `State` for the cycle. New entries are
//! classified into two buckets:
//!
//! - **live** - already in `known`, skipped.
//! - **revivable** - in the graveyard, resurrected from the tombstone.
//! - **fresh** - decoded from `new_raws`, prevouts resolved against
//! the live mempool only. Confirmed-parent prevouts land as
//! `prevout: None` and are filled post-apply by the resolver passed
//! to `Mempool::update_with`.
//!
//! Removals are inferred by cross-referencing inputs.
//! Existing entries are not re-classified - they keep their first-sight
//! state on the live store. Removals are inferred by cross-referencing
//! inputs against the full `live_txids` set from the cycle's pull.
use brk_rpc::RawTx;
use brk_types::{MempoolEntryInfo, Txid, TxidPrefix};
use brk_types::{MempoolEntryInfo, Transaction, Txid, TxidPrefix, Vout};
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
@@ -31,39 +32,39 @@ pub use tx_entry::TxEntry;
pub use tx_removal::TxRemoval;
pub use txs_pulled::TxsPulled;
type SpentBy = FxHashMap<(Txid, Vout), Txid>;
pub struct Preparer;
impl Preparer {
pub fn prepare(
entries_info: Vec<MempoolEntryInfo>,
live_txids: &[Txid],
new_entries: Vec<MempoolEntryInfo>,
new_raws: FxHashMap<Txid, RawTx>,
lock: &RwLock<State>,
) -> TxsPulled {
let state = lock.read();
let live: FxHashSet<TxidPrefix> = entries_info
.iter()
.map(|info| TxidPrefix::from(&info.txid))
.collect();
let added = Self::classify_additions(entries_info, new_raws, &state.txs, &state.graveyard);
let removed = TxRemoval::classify(&live, &added, &state.txs);
let live: FxHashSet<TxidPrefix> = live_txids.iter().map(TxidPrefix::from).collect();
let added = Self::classify_additions(new_entries, new_raws, &state.txs, &state.graveyard);
let removed = Self::classify_removals(&live, &added, &state.txs);
TxsPulled { added, removed }
}
fn classify_additions(
entries_info: Vec<MempoolEntryInfo>,
new_entries: Vec<MempoolEntryInfo>,
mut new_raws: FxHashMap<Txid, RawTx>,
known: &TxStore,
graveyard: &TxGraveyard,
) -> Vec<TxAddition> {
entries_info
new_entries
.iter()
.filter_map(|info| Self::classify(info, known, graveyard, &mut new_raws))
.filter_map(|info| Self::classify_addition(info, known, graveyard, &mut new_raws))
.collect()
}
fn classify(
fn classify_addition(
info: &MempoolEntryInfo,
known: &TxStore,
graveyard: &TxGraveyard,
@@ -78,4 +79,44 @@ impl Preparer {
let raw = new_raws.remove(&info.txid)?;
Some(TxAddition::fresh(info, raw, known))
}
/// Diff `known` against the cycle's live prefix set.
///
/// Emits one `(prefix, reason)` pair for every record in `known` whose
/// prefix is absent from `live`, preserving `known.records()` iteration
/// order. The reason is derived from the outpoints spent by this
/// cycle's `Fresh` additions (see `build_spent_by` / `removal_reason`).
fn classify_removals(
    live: &FxHashSet<TxidPrefix>,
    added: &[TxAddition],
    known: &TxStore,
) -> Vec<(TxidPrefix, TxRemoval)> {
    let spent_by = Self::build_spent_by(added);
    let mut removed = Vec::new();
    for (prefix, record) in known.records() {
        // Still listed by the node this cycle: not a removal.
        if live.contains(prefix) {
            continue;
        }
        removed.push((*prefix, Self::removal_reason(&record.tx, &spent_by)));
    }
    removed
}
/// Why `tx` left the pool: `Replaced { by }` when any of its input
/// outpoints is claimed in `spent_by` (the first matching input, in
/// input order, decides `by`), otherwise `Vanished`.
fn removal_reason(tx: &Transaction, spent_by: &SpentBy) -> TxRemoval {
    for txin in tx.input.iter() {
        if let Some(&by) = spent_by.get(&(txin.txid, txin.vout)) {
            return TxRemoval::Replaced { by };
        }
    }
    TxRemoval::Vanished
}
/// Only `Fresh` additions carry tx input data. Revived txs were
/// already in-pool, so they can't be new spenders of anything.
fn build_spent_by(added: &[TxAddition]) -> SpentBy {
let mut spent_by: SpentBy = FxHashMap::default();
for addition in added {
if let TxAddition::Fresh { tx, .. } = addition {
for txin in &tx.input {
spent_by.insert((txin.txid, txin.vout), tx.txid);
}
}
}
spent_by
}
}

View File

@@ -69,7 +69,7 @@ impl TxAddition {
fn build_txin(txin: bitcoin::TxIn, mempool_txs: &TxStore) -> TxIn {
let prev_txid: Txid = txin.previous_output.txid.into();
let prev_vout = usize::from(Vout::from(txin.previous_output.vout));
let prev_vout = Vout::from(txin.previous_output.vout);
let prevout = Self::resolve_prevout(&prev_txid, prev_vout, mempool_txs);
TxIn {
@@ -78,7 +78,7 @@ impl TxAddition {
is_coinbase: false,
prevout,
txid: prev_txid,
vout: txin.previous_output.vout.into(),
vout: prev_vout,
script_sig: txin.script_sig,
script_sig_asm: (),
witness: txin.witness.into(),
@@ -88,10 +88,10 @@ impl TxAddition {
}
}
fn resolve_prevout(prev_txid: &Txid, prev_vout: usize, mempool_txs: &TxStore) -> Option<TxOut> {
fn resolve_prevout(prev_txid: &Txid, prev_vout: Vout, mempool_txs: &TxStore) -> Option<TxOut> {
let prev = mempool_txs.get(prev_txid)?;
prev.output
.get(prev_vout)
.get(usize::from(prev_vout))
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value)))
}
}

View File

@@ -2,9 +2,8 @@ use brk_types::{FeeRate, MempoolEntryInfo, Sats, Timestamp, Txid, TxidPrefix, VS
use smallvec::SmallVec;
/// A mempool transaction entry. Carries the per-tx facts needed for
/// projection, plus the snapshot-time `chunk_rate` (Core's cluster-mempool
/// chunk fee rate, or the proxy fallback) used as the effective rate
/// for partitioning, fee tiers, and CPFP.
/// projection. Chunk rates live on the snapshot (linearized fresh each
/// cycle) - not stored here.
#[derive(Debug, Clone)]
pub struct TxEntry {
pub txid: Txid,
@@ -17,11 +16,6 @@ pub struct TxEntry {
pub first_seen: Timestamp,
/// BIP-125 explicit signaling: any input has sequence < 0xfffffffe.
pub rbf: bool,
/// Effective per-vbyte rate Core would mine this tx at. From
/// `MempoolEntryInfo::chunk_rate()`: Core 31+ uses `fees.chunk /
/// (chunkweight/4)`, older Core falls back to
/// `max(ancestor_rate, descendant_pkg_rate)`.
pub chunk_rate: FeeRate,
}
impl TxEntry {
@@ -35,7 +29,6 @@ impl TxEntry {
depends: info.depends.iter().map(TxidPrefix::from).collect(),
first_seen: info.first_seen,
rbf,
chunk_rate: info.chunk_rate(),
}
}

View File

@@ -1,12 +1,7 @@
//! Why a tx left the mempool between two pull cycles, plus the
//! classifier that diffs the live prefix set against `known` to
//! produce one [`TxRemoval`] per loser.
//! Why a tx left the mempool between two pull cycles. The diff that
//! produces one [`TxRemoval`] per loser lives on [`super::Preparer`].
use brk_types::{Transaction, Txid, TxidPrefix, Vout};
use rustc_hash::{FxHashMap, FxHashSet};
use super::TxAddition;
use crate::stores::TxStore;
use brk_types::Txid;
/// `Replaced` = at least one freshly added tx this cycle spends one of
/// its inputs (BIP-125 replacement inferred from conflicting outpoints).
@@ -18,47 +13,3 @@ pub enum TxRemoval {
Replaced { by: Txid },
Vanished,
}
type SpentBy = FxHashMap<(Txid, Vout), Txid>;
impl TxRemoval {
/// Returns `(prefix, reason)` pairs in iteration order of `known`.
pub(super) fn classify(
live: &FxHashSet<TxidPrefix>,
added: &[TxAddition],
known: &TxStore,
) -> Vec<(TxidPrefix, Self)> {
let spent_by = Self::build_spent_by(added);
known
.records()
.filter_map(|(prefix, record)| {
if live.contains(prefix) {
return None;
}
Some((*prefix, Self::find_removal(&record.tx, &spent_by)))
})
.collect()
}
fn find_removal(tx: &Transaction, spent_by: &SpentBy) -> Self {
tx.input
.iter()
.find_map(|i| spent_by.get(&(i.txid, i.vout)).cloned())
.map_or(Self::Vanished, |by| Self::Replaced { by })
}
/// Only `Fresh` additions carry tx input data. Revived txs were
/// already in-pool, so they can't be new spenders of anything.
fn build_spent_by(added: &[TxAddition]) -> SpentBy {
let mut spent_by: SpentBy = FxHashMap::default();
for addition in added {
if let TxAddition::Fresh { tx, .. } = addition {
for txin in &tx.input {
spent_by.insert((txin.txid, txin.vout), tx.txid);
}
}
}
spent_by
}
}

View File

@@ -54,8 +54,12 @@ impl Prevouts {
}
let mut state = lock.write();
Self::write_fills(&mut state, in_mempool);
Self::write_fills(&mut state, external);
for (txid, fills) in in_mempool.into_iter().chain(external) {
let prefix = TxidPrefix::from(&txid);
for prevout in state.txs.apply_fills(&prefix, fills) {
state.addrs.add_input(&txid, &prevout);
}
}
true
}
@@ -140,13 +144,4 @@ impl Prevouts {
})
.collect()
}
fn write_fills(state: &mut State, fills: FillBatch) {
for (txid, tx_fills) in fills {
let prefix = TxidPrefix::from(&txid);
for prevout in state.txs.apply_fills(&prefix, tx_fills) {
state.addrs.add_input(&txid, &prevout);
}
}
}
}

View File

@@ -14,7 +14,7 @@ use rustc_hash::FxHashSet;
use crate::State;
use partition::Partitioner;
use snapshot::{PrefixIndex, builder};
use snapshot::build_txs;
mod partition;
mod snapshot;
@@ -56,24 +56,20 @@ impl Rebuilder {
return;
}
let snap = Self::build_snapshot(lock, gbt, min_fee);
let block0_set: FxHashSet<Txid> = snap.blocks[0]
.iter()
.map(|idx| snap.txs[idx.as_usize()].txid)
.collect();
let block0_set: FxHashSet<Txid> = snap.block0_txids().collect();
let next_hash = snap.next_block_hash;
*self.snapshot.write() = Arc::new(snap);
self.push_history(next_hash, block0_set);
self.dirty.store(false, Ordering::Release);
self.rebuild_count.fetch_add(1, Ordering::Relaxed);
}
fn push_history(&self, hash: NextBlockHash, set: FxHashSet<Txid>) {
let mut hist = self.history.write();
hist.retain(|(h, _)| *h != hash);
hist.push_back((hash, set));
hist.retain(|(h, _)| *h != next_hash);
hist.push_back((next_hash, block0_set));
while hist.len() > HISTORY {
hist.pop_front();
}
drop(hist);
self.dirty.store(false, Ordering::Release);
self.rebuild_count.fetch_add(1, Ordering::Relaxed);
}
/// Past block-0 txid set for `hash`, or `None` if it has aged out
@@ -102,10 +98,17 @@ impl Rebuilder {
) -> Snapshot {
let (txs, prefix_to_idx) = {
let state = lock.read();
builder::build_txs(&state.txs)
build_txs(&state.txs)
};
let block0 = Self::block_from_gbt(gbt, &prefix_to_idx);
// Block 0 from `getblocktemplate`: Core's actual selection.
// Fetcher already validated GBT ⊆ live txid listing, so any
// drop here is a same-cycle race and the partitioner picks up
// the slack so callers always see NUM_BLOCKS blocks.
let block0: Vec<TxIndex> = gbt
.iter()
.filter_map(|t| prefix_to_idx.get(&TxidPrefix::from(&t.txid)).copied())
.collect();
let excluded: FxHashSet<TxIndex> = block0.iter().copied().collect();
let rest = Partitioner::partition(&txs, &excluded, NUM_BLOCKS.saturating_sub(1));
@@ -116,17 +119,6 @@ impl Rebuilder {
Snapshot::build(txs, blocks, prefix_to_idx, min_fee)
}
/// Block 0 from `getblocktemplate`: Core's actual selection. Maps
/// each GBT txid back to its `TxIndex` via the per-build prefix
/// index. Fetcher already validated GBT ⊆ verbose mempool, so any
/// drop here is a same-cycle race and the partitioner picks up the
/// slack so callers always see eight blocks.
fn block_from_gbt(gbt: &[BlockTemplateTx], prefix_to_idx: &PrefixIndex) -> Vec<TxIndex> {
gbt.iter()
.filter_map(|t| prefix_to_idx.get(&TxidPrefix::from(&t.txid)).copied())
.collect()
}
pub fn snapshot(&self) -> Arc<Snapshot> {
self.snapshot.read().clone()
}

View File

@@ -4,9 +4,10 @@
//! `/api/v1/fees/mempool-blocks` as a coarse fee-tier gradient.
//!
//! No topological gate: a child can sit before its parent within a
//! tied-rate run, but cluster members share a `chunk_rate` so they
//! land in the same block in the common case, and the only output is
//! a per-block rate distribution where intra-block order is invisible.
//! tied-rate run, but every cluster member shares its chunk's
//! `chunk_rate` (linearized at snapshot time) so chunk-mates land in
//! the same block, and the only output is a per-block rate
//! distribution where intra-block order is invisible.
//!
//! The final block is a catch-all (no vsize cap) so leftover tail
//! vsize is accounted for instead of silently dropped.
@@ -17,7 +18,7 @@
use std::cmp::Reverse;
use brk_types::VSize;
use brk_types::{FeeRate, VSize};
use rustc_hash::FxHashSet;
use super::snapshot::{SnapTx, TxIndex};
@@ -33,12 +34,12 @@ impl Partitioner {
if num_remaining_blocks == 0 {
return Vec::new();
}
let sorted = sorted_indices(txs, excluded);
let sorted = sorted_candidates(txs, excluded);
let mut blocks: Vec<Vec<TxIndex>> = (0..num_remaining_blocks).map(|_| Vec::new()).collect();
let mut block_vsize = VSize::default();
let mut current = 0;
let last = num_remaining_blocks - 1;
for (idx, vsize) in sorted {
for (idx, vsize, _) in sorted {
let fits = vsize <= VSize::MAX_BLOCK.saturating_sub(block_vsize);
if !fits && current < last && !blocks[current].is_empty() {
current += 1;
@@ -51,8 +52,11 @@ impl Partitioner {
}
}
fn sorted_indices(txs: &[SnapTx], excluded: &FxHashSet<TxIndex>) -> Vec<(TxIndex, VSize)> {
let mut cands: Vec<(TxIndex, VSize, _)> = txs
fn sorted_candidates(
txs: &[SnapTx],
excluded: &FxHashSet<TxIndex>,
) -> Vec<(TxIndex, VSize, FeeRate)> {
let mut cands: Vec<(TxIndex, VSize, FeeRate)> = txs
.iter()
.enumerate()
.filter_map(|(i, t)| {
@@ -61,5 +65,5 @@ fn sorted_indices(txs: &[SnapTx], excluded: &FxHashSet<TxIndex>) -> Vec<(TxIndex
})
.collect();
cands.sort_by_key(|(_, _, rate)| Reverse(*rate));
cands.into_iter().map(|(i, v, _)| (i, v)).collect()
cands
}

View File

@@ -1,45 +1,55 @@
//! Build the per-tx adjacency for a snapshot from the live `TxStore`.
//! Build the per-tx adjacency for a snapshot from the live `TxStore`,
//! then linearize chunk rates over every multi-tx cluster.
//!
//! One pass over the live records to assign compact `TxIndex`es and a
//! `prefix -> TxIndex` map, then per entry resolve `depends` against
//! it to produce parent edges. Children are mirrored from parents in
//! a second pass. Cross-pool parents (confirmed or evicted) are
//! dropped silently - the live pool reflects what miners actually see,
//! and any stale `depends` entry is self-healing.
//! it to produce parent edges. Children are mirrored from parents in a
//! second pass. Cross-pool parents (confirmed or evicted) are dropped
//! silently - the live pool reflects what miners actually see, and any
//! stale `depends` entry is self-healing.
//!
//! Final pass: walk every connected component and run Single Fee
//! Linearization over it (see [`crate::cluster`]); each member's
//! `chunk_rate` is overwritten with its chunk's feerate. Singletons
//! keep the `fee/vsize` seed set in `live_tx`.
//!
//! The prefix map is returned alongside the txs so the rebuilder can
//! reuse it for GBT mapping and the final `Snapshot::build` step
//! without reconstructing it.
use std::mem;
use brk_types::TxidPrefix;
use rustc_hash::{FxBuildHasher, FxHashMap};
use smallvec::SmallVec;
use crate::TxEntry;
use crate::stores::TxStore;
use crate::{
TxEntry,
cluster::{linearize_component, walk_cluster},
stores::TxStore,
};
use super::{SnapTx, TxIndex};
pub type PrefixIndex = FxHashMap<TxidPrefix, TxIndex>;
pub fn build_txs(txs: &TxStore) -> (Vec<SnapTx>, PrefixIndex) {
let (prefix_to_idx, ordered) = compact_index(txs);
let mut snap_txs: Vec<SnapTx> = ordered.iter().map(|e| live_tx(e, &prefix_to_idx)).collect();
let n = txs.len();
let mut prefix_to_idx: PrefixIndex =
FxHashMap::with_capacity_and_hasher(n, FxBuildHasher);
for (i, (prefix, _)) in txs.records().enumerate() {
prefix_to_idx.insert(*prefix, TxIndex::from(i));
}
let mut snap_txs: Vec<SnapTx> = txs
.records()
.map(|(_, record)| live_tx(&record.entry, &prefix_to_idx))
.collect();
mirror_children(&mut snap_txs);
refresh_chunk_rates(&mut snap_txs);
(snap_txs, prefix_to_idx)
}
fn compact_index(txs: &TxStore) -> (PrefixIndex, Vec<&TxEntry>) {
let mut map: PrefixIndex = FxHashMap::with_capacity_and_hasher(txs.len(), FxBuildHasher);
let mut ordered: Vec<&TxEntry> = Vec::with_capacity(txs.len());
for (i, (prefix, record)) in txs.records().enumerate() {
map.insert(*prefix, TxIndex::from(i));
ordered.push(&record.entry);
}
(map, ordered)
}
fn live_tx(e: &TxEntry, prefix_to_idx: &PrefixIndex) -> SnapTx {
let parents: SmallVec<[TxIndex; 2]> = e
.depends
@@ -52,24 +62,160 @@ fn live_tx(e: &TxEntry, prefix_to_idx: &PrefixIndex) -> SnapTx {
vsize: e.vsize,
weight: e.weight,
size: e.size,
chunk_rate: e.chunk_rate,
chunk_rate: e.fee_rate(),
parents,
children: SmallVec::new(),
}
}
fn mirror_children(txs: &mut [SnapTx]) {
let edges: Vec<(TxIndex, TxIndex)> = txs
.iter()
.enumerate()
.flat_map(|(i, t)| {
let child = TxIndex::from(i);
t.parents.iter().map(move |&p| (p, child))
})
.collect();
for (parent, child) in edges {
if let Some(t) = txs.get_mut(parent.as_usize()) {
t.children.push(child);
for i in 0..txs.len() {
let child = TxIndex::from(i);
let parents = mem::take(&mut txs[i].parents);
for &p in &parents {
if let Some(t) = txs.get_mut(p.as_usize()) {
t.children.push(child);
}
}
txs[i].parents = parents;
}
}
/// Overwrite `chunk_rate` on every tx that belongs to a multi-tx
/// component with the feerate of the linearized chunk it landed in.
///
/// Seeds are scanned in index order. A `seen` bitmap skips txs already
/// covered by an earlier walk, and txs with neither parents nor
/// children are skipped outright so they keep the rate they came in
/// with. `walk_cluster` caps a component at `MAX_CLUSTER`, so members
/// past the cap stay unmarked and become seeds of a later walk.
fn refresh_chunk_rates(snap_txs: &mut [SnapTx]) {
    let mut seen = vec![false; snap_txs.len()];
    for seed in 0..snap_txs.len() {
        if seen[seed] {
            continue;
        }
        // Either branch below ends with the seed marked, so mark it now.
        seen[seed] = true;

        let isolated = snap_txs[seed].parents.is_empty() && snap_txs[seed].children.is_empty();
        if isolated {
            continue;
        }

        let component = walk_cluster(snap_txs, TxIndex::from(seed));
        for member in &component {
            seen[member.as_usize()] = true;
        }
        if component.len() < 2 {
            continue;
        }

        // `chunk.txs` holds positions into `members`, not global indices.
        let (members, chunks) = linearize_component(snap_txs, &component);
        for chunk in &chunks {
            for &local in &chunk.txs {
                let global = members[u32::from(local) as usize];
                snap_txs[global.as_usize()].chunk_rate = chunk.feerate;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering};

    use bitcoin::hashes::Hash;
    use brk_types::{FeeRate, Sats, Txid, VSize, Weight};

    use super::*;

    /// Test fixture: an unlinked `SnapTx` whose `chunk_rate` starts at
    /// plain `fee / vsize`. The txid comes from a process-wide counter
    /// so every tx is distinguishable in debug output; the cluster
    /// code itself keys off `TxIndex`, not `txid`.
    fn snap_tx(fee: Sats, vsize: VSize) -> SnapTx {
        static NEXT_ID: AtomicU32 = AtomicU32::new(0);
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let mut raw = [0u8; 32];
        raw[..4].copy_from_slice(&id.to_le_bytes());
        SnapTx {
            txid: Txid::from(bitcoin::Txid::from_byte_array(raw)),
            fee,
            vsize,
            weight: Weight::from(vsize),
            size: u64::from(vsize),
            chunk_rate: FeeRate::from((fee, vsize)),
            parents: SmallVec::new(),
            children: SmallVec::new(),
        }
    }

    /// Record a `parent -> child` dependency on both endpoints.
    fn link(txs: &mut [SnapTx], parent: usize, child: usize) {
        txs[parent].children.push(TxIndex::from(child));
        txs[child].parents.push(TxIndex::from(parent));
    }

    fn sats(n: u64) -> Sats {
        Sats::from(n)
    }

    fn vsize(n: u64) -> VSize {
        VSize::from(n)
    }

    /// A tx with no edges must come out with the rate it went in with.
    #[test]
    fn singleton_keeps_fee_per_vsize() {
        let mut txs = vec![snap_tx(sats(1000), vsize(100))];
        let before = txs[0].chunk_rate;
        refresh_chunk_rates(&mut txs);
        assert_eq!(txs[0].chunk_rate, before);
    }

    /// A high-fee child lifts its low-fee parent to the package rate.
    #[test]
    fn two_tx_cpfp_lift() {
        let mut txs = vec![
            snap_tx(sats(100), vsize(100)),
            snap_tx(sats(1900), vsize(100)),
        ];
        link(&mut txs, 0, 1);
        let parent_before = txs[0].chunk_rate;
        refresh_chunk_rates(&mut txs);
        let package = FeeRate::from((sats(2000), vsize(200)));
        assert!(txs[0].chunk_rate > parent_before);
        assert_eq!(txs[0].chunk_rate, txs[1].chunk_rate);
        assert_eq!(txs[0].chunk_rate, package);
    }

    /// grandparent -> parent -> child where only the child pays well:
    /// all three end up in one chunk at the combined rate.
    #[test]
    fn three_tx_chain_chunks_correctly() {
        let mut txs = vec![
            snap_tx(sats(100), vsize(100)),
            snap_tx(sats(100), vsize(100)),
            snap_tx(sats(5800), vsize(100)),
        ];
        link(&mut txs, 0, 1);
        link(&mut txs, 1, 2);
        refresh_chunk_rates(&mut txs);
        let combined = FeeRate::from((sats(6000), vsize(300)));
        for tx in &txs {
            assert_eq!(tx.chunk_rate, combined);
        }
    }

    /// Two unrelated parent/child pairs must not influence each other.
    #[test]
    fn disjoint_clusters_linearized_independently() {
        let mut txs = vec![
            snap_tx(sats(100), vsize(100)),
            snap_tx(sats(1900), vsize(100)),
            snap_tx(sats(500), vsize(100)),
            snap_tx(sats(4500), vsize(100)),
        ];
        link(&mut txs, 0, 1);
        link(&mut txs, 2, 3);
        refresh_chunk_rates(&mut txs);
        let (first, second) = (txs[0].chunk_rate, txs[2].chunk_rate);
        assert_eq!(first, txs[1].chunk_rate);
        assert_eq!(second, txs[3].chunk_rate);
        assert_ne!(first, second);
    }

    /// A 100-tx chain exceeds the cluster cap; the pass must still
    /// complete without panicking.
    #[test]
    fn cluster_cap_does_not_panic() {
        let n = 100;
        let mut txs: Vec<SnapTx> = (0..n).map(|_| snap_tx(sats(1000), vsize(100))).collect();
        for child in 1..n {
            link(&mut txs, child - 1, child);
        }
        refresh_chunk_rates(&mut txs);
    }
}

View File

@@ -74,7 +74,7 @@ impl Fees {
previous_fee: Option<FeeRate>,
min_fee: FeeRate,
) -> FeeRate {
let median = block.median_fee_rate();
let median = block.fee_range[3];
let use_fee = previous_fee.map_or(median, |prev| FeeRate::mean(median, prev));
let vsize = u64::from(block.total_vsize);
if vsize <= EMPTY_BLOCK_VSIZE || median < min_fee {

View File

@@ -1,26 +1,26 @@
pub mod builder;
mod builder;
mod fees;
mod stats;
mod tx;
mod tx_index;
pub use builder::PrefixIndex;
pub(crate) use builder::{PrefixIndex, build_txs};
pub use stats::BlockStats;
pub use tx::SnapTx;
pub use tx_index::TxIndex;
use std::hash::{DefaultHasher, Hash, Hasher};
use brk_types::{FeeRate, NextBlockHash, RecommendedFees, TxidPrefix};
use brk_types::{FeeRate, NextBlockHash, RecommendedFees, Txid, TxidPrefix};
use fees::Fees;
#[derive(Default)]
pub struct Snapshot {
/// Dense per-tx data indexed by `TxIndex`. Each entry carries the
/// chunk rate (Core's chunk-mempool truth or proxy fallback) plus
/// resolved parent/child adjacency, so CPFP queries don't re-read
/// any external state.
/// linearized chunk rate (computed locally at snapshot build time)
/// plus resolved parent/child adjacency, so CPFP queries don't
/// re-read any external state.
pub txs: Vec<SnapTx>,
/// Projected blocks. `blocks[0]` is Core's `getblocktemplate`
/// (Bitcoin Core's actual selection); the rest are greedy-packed
@@ -33,8 +33,8 @@ pub struct Snapshot {
pub next_block_hash: NextBlockHash,
/// Per-snapshot `TxidPrefix -> TxIndex` index, so live queries can
/// resolve a prefix to the snapshot's compact index without
/// re-walking `txs`. Built once by `builder::build_txs` and reused
/// by the rebuilder for GBT mapping.
/// re-walking `txs`. Built once by `build_txs` and reused by the
/// rebuilder for GBT mapping.
prefix_to_idx: PrefixIndex,
}
@@ -47,17 +47,7 @@ impl Snapshot {
prefix_to_idx: PrefixIndex,
min_fee: FeeRate,
) -> Self {
let block_stats: Vec<BlockStats> = blocks
.iter()
.enumerate()
.map(|(i, block)| {
if i == 0 {
BlockStats::compute_core(block, &txs)
} else {
BlockStats::compute_projected(block, &txs)
}
})
.collect();
let block_stats = BlockStats::for_blocks(&blocks, &txs);
let fees = Fees::compute(&block_stats, min_fee);
let next_block_hash = Self::hash_next_block(&blocks);
Self {
@@ -87,8 +77,20 @@ impl Snapshot {
self.prefix_to_idx.get(prefix).copied()
}
/// Effective chunk rate for a live tx by prefix, or `None` if the
/// tx isn't in this snapshot.
/// Txids of `blocks[0]` (Core's `getblocktemplate` selection), in
/// the order they are stored in that block. Yields nothing when the
/// snapshot has no blocks, e.g. a default snapshot.
pub fn block0_txids(&self) -> impl Iterator<Item = Txid> + '_ {
    self.blocks
        .iter()
        .take(1)
        .flatten()
        .map(|idx| self.txs[idx.as_usize()].txid)
}
/// Linearized chunk rate for a live tx by prefix. Always fresh
/// (recomputed each snapshot), package-aware (CPFP and ancestor
/// chains lift correctly), and equals `fee/vsize` for singletons.
/// Returns `None` if the tx isn't in this snapshot.
pub fn chunk_rate_for(&self, prefix: &TxidPrefix) -> Option<FeeRate> {
let idx = self.idx_of(prefix)?;
Some(self.txs[idx.as_usize()].chunk_rate)

View File

@@ -32,14 +32,18 @@ pub struct BlockStats {
}
impl BlockStats {
/// Block 0 (Core's actual selection): exact 0/10/25/50/75/90/100.
pub fn compute_core(block: &[TxIndex], txs: &[SnapTx]) -> Self {
Self::compute(block, txs, CORE_PERCENTILES)
}
/// Blocks 1..N (projected): clipped 5/95 bounds to hide outliers.
pub fn compute_projected(block: &[TxIndex], txs: &[SnapTx]) -> Self {
Self::compute(block, txs, PROJECTED_PERCENTILES)
/// Stats for every block in `blocks`, in order. `blocks[0]` is
/// computed with `CORE_PERCENTILES` (Core's exact 0..100 range);
/// every later block uses `PROJECTED_PERCENTILES` (the clipped 5..95
/// range that hides CPFP / stale-GBT outliers). An empty `blocks`
/// yields an empty vec.
pub fn for_blocks(blocks: &[Vec<TxIndex>], txs: &[SnapTx]) -> Vec<Self> {
    let mut stats = Vec::with_capacity(blocks.len());
    if let Some((core, projected)) = blocks.split_first() {
        stats.push(Self::compute(core, txs, CORE_PERCENTILES));
        stats.extend(
            projected
                .iter()
                .map(|block| Self::compute(block, txs, PROJECTED_PERCENTILES)),
        );
    }
    stats
}
/// Vsize-weighted percentile distribution over `chunk_rate` -
@@ -78,10 +82,6 @@ impl BlockStats {
fee_range,
}
}
pub fn median_fee_rate(&self) -> FeeRate {
self.fee_range[3]
}
}
impl From<&BlockStats> for MempoolBlock {

View File

@@ -3,10 +3,11 @@ use smallvec::SmallVec;
use super::TxIndex;
/// Frozen per-tx view used by the snapshot. Holds the chunk rate
/// (Core's `fees.chunk` / `chunkweight` when available, else proxy)
/// plus resolved parent/child adjacency in `TxIndex` space, so
/// CPFP queries are a pure walk over `Snapshot.txs`.
/// Frozen per-tx view used by the snapshot. `chunk_rate` is the
/// linearized chunk feerate (local Single Fee Linearization, run fresh
/// every snapshot); singletons report `fee/vsize`. Parent/child
/// adjacency in `TxIndex` space, so CPFP queries are a pure walk over
/// `Snapshot.txs`.
#[derive(Clone, Debug)]
pub struct SnapTx {
pub txid: Txid,

View File

@@ -15,7 +15,8 @@ use addr_entry::AddrEntry;
pub struct AddrTracker(FxHashMap<AddrBytes, AddrEntry>);
impl AddrTracker {
pub fn add_tx(&mut self, tx: &Transaction, txid: &Txid) {
pub fn add_tx(&mut self, tx: &Transaction) {
let txid = &tx.txid;
for txin in &tx.input {
if let Some(prevout) = txin.prevout.as_ref() {
self.add_input(txid, prevout);
@@ -28,7 +29,8 @@ impl AddrTracker {
}
}
pub fn remove_tx(&mut self, tx: &Transaction, txid: &Txid) {
pub fn remove_tx(&mut self, tx: &Transaction) {
let txid = &tx.txid;
for txin in &tx.input {
if let Some(prevout) = txin.prevout.as_ref() {
self.remove_input(txid, prevout);

View File

@@ -1,13 +1,14 @@
//! Stateful in-memory holders. After Phase 3 they're plain owned
//! types (no internal locks) — `State` aggregates them under a
//! single `RwLock` in `crate::state`.
//! In-memory holders for live mempool state. Plain owned types with
//! no internal locks: `crate::state::State` aggregates them under a
//! single `RwLock` so the cycle steps and read-side accessors share
//! one lock-order discipline.
pub mod addr_tracker;
pub(crate) mod addr_tracker;
pub(crate) mod outpoint_spends;
pub mod tx_graveyard;
pub mod tx_store;
pub(crate) mod tx_graveyard;
pub(crate) mod tx_store;
pub use addr_tracker::AddrTracker;
pub(crate) use addr_tracker::AddrTracker;
pub(crate) use outpoint_spends::OutpointSpends;
pub use tx_graveyard::{TxGraveyard, TxTombstone};
pub use tx_store::TxStore;
pub(crate) use tx_graveyard::{TxGraveyard, TxTombstone};
pub(crate) use tx_store::TxStore;

View File

@@ -13,11 +13,7 @@ pub struct OutpointSpends(FxHashMap<OutpointPrefix, TxidPrefix>);
impl OutpointSpends {
pub fn insert_spends(&mut self, tx: &Transaction, spender: TxidPrefix) {
for input in &tx.input {
if input.is_coinbase {
continue;
}
let key = OutpointPrefix::new(TxidPrefix::from(&input.txid), input.vout);
for key in spent_outpoints(tx) {
self.0.insert(key, spender);
}
}
@@ -25,11 +21,7 @@ impl OutpointSpends {
/// Only removes entries whose stored prefix still matches `spender`,
/// so an outpoint already re-claimed by a later spender is left alone.
pub fn remove_spends(&mut self, tx: &Transaction, spender: TxidPrefix) {
for input in &tx.input {
if input.is_coinbase {
continue;
}
let key = OutpointPrefix::new(TxidPrefix::from(&input.txid), input.vout);
for key in spent_outpoints(tx) {
if self.0.get(&key) == Some(&spender) {
self.0.remove(&key);
}
@@ -41,3 +33,10 @@ impl OutpointSpends {
self.0.get(key).copied()
}
}
/// Prefix-keyed outpoints consumed by `tx`, one per input in input
/// order. Inputs flagged `is_coinbase` are skipped.
fn spent_outpoints(tx: &Transaction) -> impl Iterator<Item = OutpointPrefix> + '_ {
    tx.input.iter().filter_map(|input| {
        (!input.is_coinbase)
            .then(|| OutpointPrefix::new(TxidPrefix::from(&input.txid), input.vout))
    })
}

View File

@@ -3,12 +3,12 @@ use std::{
time::{Duration, Instant},
};
use brk_types::{Transaction, Txid};
use brk_types::{FeeRate, Transaction, Txid};
use rustc_hash::FxHashMap;
mod tombstone;
pub use tombstone::TxTombstone;
pub(crate) use tombstone::TxTombstone;
use crate::{TxEntry, TxRemoval};
@@ -23,10 +23,6 @@ pub struct TxGraveyard {
}
impl TxGraveyard {
pub fn contains(&self, txid: &Txid) -> bool {
self.tombstones.contains_key(txid)
}
pub fn tombstones_len(&self) -> usize {
self.tombstones.len()
}
@@ -39,6 +35,25 @@ impl TxGraveyard {
self.tombstones.get(txid)
}
/// Tombstone for `txid` only when its removal reason is `Vanished`
/// (mined, expired, or dropped). A missing tombstone and a `Replaced`
/// tombstone both yield `None`; a replaced tx will not confirm.
pub fn get_vanished(&self, txid: &Txid) -> Option<&TxTombstone> {
    self.tombstones
        .get(txid)
        .filter(|tomb| matches!(tomb.removal, TxRemoval::Vanished))
}
/// Follow `Replaced { by }` tombstones forward from `txid` until
/// reaching a txid that has no tombstone or whose tombstone is not
/// `Replaced`. Returns `txid` unchanged when it was never replaced
/// (live or `Vanished`).
pub fn replacement_root_of(&self, mut txid: Txid) -> Txid {
    loop {
        match self.tombstones.get(&txid).map(|tomb| &tomb.removal) {
            // Hop to the replacer and look it up in turn.
            Some(TxRemoval::Replaced { by }) => txid = *by,
            _ => return txid,
        }
    }
}
/// Tombstones marked as `Replaced { by: replacer }`. Used to walk
/// backward through RBF history: given a tx that's still live (or
/// in the graveyard), find every tx it displaced.
@@ -61,18 +76,33 @@ impl TxGraveyard {
pub fn replaced_iter_recent_first(&self) -> impl Iterator<Item = (&Txid, &Txid)> {
self.order.iter().rev().filter_map(|(t, txid)| {
let ts = self.tombstones.get(txid)?;
if ts.removed_at() != *t {
if ts.removed_at != *t {
return None;
}
Some((txid, ts.replaced_by()?))
})
}
pub fn bury(&mut self, txid: Txid, tx: Transaction, entry: TxEntry, removal: TxRemoval) {
let now = Instant::now();
self.tombstones
.insert(txid, TxTombstone::new(tx, entry, removal, now));
self.order.push_back((now, txid));
/// Record `tx` as removed right now. The tombstone is keyed by
/// `entry.txid` and stamped with the current `Instant`; the same
/// `(instant, txid)` pair is appended to the `order` queue. An
/// existing tombstone for the same txid is overwritten.
pub fn bury(
    &mut self,
    tx: Transaction,
    entry: TxEntry,
    chunk_rate: FeeRate,
    removal: TxRemoval,
) {
    // Copy the key out before `entry` moves into the tombstone.
    let txid = entry.txid;
    let removed_at = Instant::now();
    let tombstone = TxTombstone {
        tx,
        entry,
        chunk_rate,
        removal,
        removed_at,
    };
    self.tombstones.insert(txid, tombstone);
    self.order.push_back((removed_at, txid));
}
/// Remove and return the tombstone, e.g. when the tx comes back to life.
@@ -92,7 +122,7 @@ impl TxGraveyard {
}
let (_, txid) = self.order.pop_front().unwrap();
if let Some(ts) = self.tombstones.get(&txid)
&& ts.removed_at() == t
&& ts.removed_at == t
{
self.tombstones.remove(&txid);
}

View File

@@ -1,41 +1,23 @@
use std::time::Instant;
use brk_types::{Transaction, Txid};
use brk_types::{FeeRate, Transaction, Txid};
use crate::{TxEntry, TxRemoval};
/// A buried mempool tx, retained for reappearance detection and
/// post-mine analytics.
/// post-mine analytics. `chunk_rate` is the linearized chunk feerate at
/// burial time - same value `live_effective_fee_rate` reported while
/// the tx was alive, so an evicted RBF predecessor reports the
/// package-effective rate, not a misleading isolated `fee/vsize`.
pub struct TxTombstone {
pub tx: Transaction,
pub entry: TxEntry,
removal: TxRemoval,
removed_at: Instant,
pub chunk_rate: FeeRate,
pub removal: TxRemoval,
pub removed_at: Instant,
}
impl TxTombstone {
pub(crate) fn new(
tx: Transaction,
entry: TxEntry,
removal: TxRemoval,
removed_at: Instant,
) -> Self {
Self {
tx,
entry,
removal,
removed_at,
}
}
pub fn reason(&self) -> &TxRemoval {
&self.removal
}
pub(crate) fn removed_at(&self) -> Instant {
self.removed_at
}
pub(crate) fn replaced_by(&self) -> Option<&Txid> {
match &self.removal {
TxRemoval::Replaced { by } => Some(by),

View File

@@ -34,10 +34,6 @@ impl TxStore {
self.records.len()
}
pub fn is_empty(&self) -> bool {
self.records.is_empty()
}
pub fn get(&self, txid: &Txid) -> Option<&Transaction> {
self.records.get(&TxidPrefix::from(txid)).map(|r| &r.tx)
}

View File

@@ -10,10 +10,10 @@
//! carries the same chunk-rate semantics the live mempool produces.
use brk_error::{Error, OptionData, Result};
use brk_mempool::{ChunkInput, linearize};
use brk_types::{
CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, Height, Sats,
TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight,
CPFP_CHAIN_LIMIT, ChunkInput, CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry,
CpfpInfo, FeeRate, Height, Sats, TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight,
find_seed_chunk, linearize,
};
use rustc_hash::{FxBuildHasher, FxHashMap};
use smallvec::SmallVec;
@@ -21,10 +21,6 @@ use vecdb::{ReadableVec, VecIndex};
use crate::Query;
/// Cap matches Bitcoin Core's default mempool ancestor/descendant
/// chain limits and mempool.space's truncation.
const MAX: usize = 25;
struct WalkResult {
/// Cluster members in `[ancestors..., seed, descendants...]` order,
/// each paired with its in-cluster parent edges resolved to the
@@ -58,13 +54,12 @@ impl Query {
/// Effective fee rate for `txid` using the same chunk-rate semantics
/// across paths:
///
/// - Live mempool: snapshot's per-tx `chunk_rate` (Core's
/// `fees.chunk` / `chunkweight`, or proxy fallback). If the tx is
/// in the pool but not in the latest snapshot (e.g. just added),
/// falls back to the entry's simple `fee/vsize`.
/// - Live mempool: snapshot's per-tx linearized `chunk_rate`. If
/// the tx is in the pool but not in the latest snapshot (e.g.
/// just added), falls back to the entry's simple `fee/vsize`.
/// - Confirmed: precomputed `effective_fee_rate.tx_index`.
/// - Graveyard-only RBF predecessor: simple `fee/vsize` snapshotted
/// at burial.
/// - Graveyard-only RBF predecessor: linearized chunk rate
/// captured at burial.
///
/// Returns `Error::UnknownTxid` for txids not seen in any of those.
pub fn effective_fee_rate(&self, txid: &Txid) -> Result<FeeRate> {
@@ -158,7 +153,7 @@ impl Query {
/// BFS the seed's same-block ancestors (via `outpoint`) and
/// descendants (via `spent.txin_index` -> `spending_tx`), capped
/// at `MAX` each side to match Core/mempool.space. Returns members
/// at `CPFP_CHAIN_LIMIT` each side to match Core/mempool.space. Returns members
/// laid out as `[ancestors..., seed, descendants...]` so the seed's
/// local index is `ancestors.len()`.
fn walk_same_block_cluster(&self, seed: TxIndex, height: Height) -> Result<WalkResult> {
@@ -202,7 +197,7 @@ impl Query {
};
let mut visited: FxHashMap<TxIndex, ()> =
FxHashMap::with_capacity_and_hasher(2 * MAX + 1, FxBuildHasher);
FxHashMap::with_capacity_and_hasher(2 * CPFP_CHAIN_LIMIT + 1, FxBuildHasher);
visited.insert(seed, ());
// Ancestor BFS: each push records (tx_index, raw parent tx_indices)
@@ -212,7 +207,7 @@ impl Query {
let mut stack: Vec<SmallVec<[TxIndex; 2]>> = vec![seed_inputs.clone()];
'a: while let Some(parents) = stack.pop() {
for parent in parents {
if ancestors.len() >= MAX {
if ancestors.len() >= CPFP_CHAIN_LIMIT {
break 'a;
}
if visited.insert(parent, ()).is_some() || !same_block(parent) {
@@ -249,7 +244,7 @@ impl Query {
}
descendants.push((child, walk_inputs(child)));
stack.push(child);
if descendants.len() >= MAX {
if descendants.len() >= CPFP_CHAIN_LIMIT {
break 'd;
}
}
@@ -336,12 +331,7 @@ fn build_cpfp_info(
})
.collect();
let chunks = linearize(&inputs);
let (chunk_index, seed_rate) = chunks
.iter()
.enumerate()
.find(|(_, ch)| ch.txs.contains(&seed_local))
.map(|(i, ch)| (i as u32, ch.feerate))
.unwrap_or((0, seed.rate));
let (chunk_index, seed_rate) = find_seed_chunk(&chunks, seed_local, seed.rate);
let cluster_txs: Vec<CpfpClusterTx> = members
.iter()
.map(|m| CpfpClusterTx {

View File

@@ -26,45 +26,35 @@ const RPC_NOT_FOUND: i32 = -5;
use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, RawTx, TxOutInfo};
/// Per-batch request count for `get_block_hashes_range`. Sized so the
/// Per-batch request count for `get_block_hashes_range`,
/// `fetch_mempool_entries`, and `fetch_raw_transactions`. Sized so the
/// JSON request body stays well under a megabyte and bitcoind doesn't
/// spend too long on a single batch before yielding results.
const BATCH_CHUNK: usize = 2000;
/// Live mempool state fetched in one batched bitcoind round-trip:
/// `getrawmempool verbose` + `getblocktemplate` + `getmempoolinfo`.
/// `gbt` is validated to be a subset of `entries` before construction;
/// callers that want strict consistency should rely on this fact.
/// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`.
/// `gbt` is validated to be a subset of `live_txids` before
/// construction; on mismatch the cycle is skipped (`Ok(None)`) so we
/// never publish a block 0 missing txids Core would actually mine.
pub struct MempoolState {
pub entries: Vec<MempoolEntryInfo>,
pub live_txids: Vec<Txid>,
pub gbt: Vec<BlockTemplateTx>,
pub min_fee: FeeRate,
}
#[derive(Deserialize)]
struct VerboseEntryRaw {
struct MempoolEntryRaw {
vsize: VSize,
weight: Weight,
time: Timestamp,
#[serde(rename = "ancestorcount")]
ancestor_count: u64,
#[serde(rename = "ancestorsize")]
ancestor_size: VSize,
#[serde(rename = "descendantsize")]
descendant_size: VSize,
fees: VerboseFeesRaw,
fees: MempoolEntryFeesRaw,
depends: Vec<String>,
#[serde(rename = "chunkweight", default)]
chunk_weight: Option<Weight>,
}
#[derive(Deserialize)]
struct VerboseFeesRaw {
struct MempoolEntryFeesRaw {
base: Bitcoin,
ancestor: Bitcoin,
descendant: Bitcoin,
#[serde(default)]
chunk: Option<Bitcoin>,
}
#[derive(Deserialize)]
@@ -78,31 +68,20 @@ struct GbtTxRaw {
fee: u64,
}
fn build_verbose(raw: FxHashMap<String, VerboseEntryRaw>) -> Result<Vec<MempoolEntryInfo>> {
raw.into_iter()
.map(|(txid_str, e)| {
let depends = e
.depends
.iter()
.map(|s| Client::parse_txid(s, "depends txid"))
.collect::<Result<Vec<_>>>()?;
Ok(MempoolEntryInfo {
txid: Client::parse_txid(&txid_str, "mempool txid")?,
vsize: e.vsize,
weight: e.weight,
fee: Sats::from(e.fees.base),
first_seen: e.time,
ancestor_count: e.ancestor_count,
ancestor_size: e.ancestor_size,
ancestor_fee: Sats::from(e.fees.ancestor),
descendant_size: e.descendant_size,
descendant_fee: Sats::from(e.fees.descendant),
chunk_fee: e.fees.chunk.map(Sats::from),
chunk_weight: e.chunk_weight,
depends,
})
})
.collect()
/// Convert one raw `getmempoolentry` payload into a `MempoolEntryInfo`
/// for `txid`.
///
/// Every string in `depends` is parsed with `Client::parse_txid`; the
/// first one that fails to parse aborts the conversion and its error is
/// returned. `fee` is taken from `fees.base` and `first_seen` from
/// `time`.
fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result<MempoolEntryInfo> {
    let mut depends = Vec::with_capacity(e.depends.len());
    for parent in &e.depends {
        depends.push(Client::parse_txid(parent, "depends txid")?);
    }

    let fee = Sats::from(e.fees.base);
    Ok(MempoolEntryInfo {
        txid,
        vsize: e.vsize,
        weight: e.weight,
        fee,
        first_seen: e.time,
        depends,
    })
}
fn build_gbt(raw: GbtResponseRaw) -> Vec<BlockTemplateTx> {
@@ -371,55 +350,86 @@ impl Client {
Ok(Txid::from(txid))
}
/// Verbose mempool listing + Core's projected next block + live
/// Core's projected next block + live mempool txid set +
/// `mempoolminfee`, fetched in a single bitcoind round-trip.
/// `getblocktemplate` runs first so that any tx arriving between
/// the two intra-batch calls lands in the verbose listing only,
/// preserving GBT ⊆ verbose for the common race. Validates that
/// every GBT txid is present in the verbose listing and returns
/// `Ok(None)` on mismatch so the caller can skip the cycle:
/// republishing block 0 with missing txids would diverge from
/// Core's exact selection. Other failures bubble up as `Err`.
/// `getblocktemplate` runs first so any tx arriving between the
/// intra-batch calls lands in the txid listing only, preserving
/// GBT ⊆ txids for the common race. Validates that every GBT txid
/// is present in the txid listing and returns `Ok(None)` on
/// mismatch so the caller can skip the cycle: republishing block 0
/// with missing txids would diverge from Core's exact selection.
/// Other failures bubble up as `Err`.
pub fn fetch_mempool_state(&self) -> Result<Option<MempoolState>> {
let requests: [(&str, Vec<Value>); 3] = [
(
"getblocktemplate",
vec![serde_json::json!({ "rules": ["segwit"] })],
),
("getrawmempool", vec![Value::Bool(true)]),
("getrawmempool", vec![Value::Bool(false)]),
("getmempoolinfo", vec![]),
];
let mut out = self.0.call_mixed_batch(&requests)?.into_iter();
let gbt_raw = out.next().ok_or(Error::Internal("missing gbt"))??;
let verbose_raw = out.next().ok_or(Error::Internal("missing verbose"))??;
let txids_raw = out.next().ok_or(Error::Internal("missing rawmempool"))??;
let info_raw = out.next().ok_or(Error::Internal("missing mempoolinfo"))??;
let verbose: FxHashMap<String, VerboseEntryRaw> = serde_json::from_str(verbose_raw.get())?;
let entries = build_verbose(verbose)?;
let txid_strs: Vec<String> = serde_json::from_str(txids_raw.get())?;
let live_txids: Vec<Txid> = txid_strs
.iter()
.map(|s| Self::parse_txid(s, "mempool txid"))
.collect::<Result<Vec<_>>>()?;
let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?);
let min_fee = build_min_fee(serde_json::from_str(info_raw.get())?);
#[cfg(debug_assertions)]
{
let entry_set: rustc_hash::FxHashSet<Txid> = entries.iter().map(|e| e.txid).collect();
let missing = gbt.iter().filter(|t| !entry_set.contains(&t.txid)).count();
if missing > 0 {
tracing::warn!(
missing,
gbt_total = gbt.len(),
"getblocktemplate has {missing} txids not in verbose mempool; skipping cycle"
);
return Ok(None);
}
let live_set: rustc_hash::FxHashSet<Txid> = live_txids.iter().copied().collect();
let missing = gbt.iter().filter(|t| !live_set.contains(&t.txid)).count();
if missing > 0 {
#[cfg(debug_assertions)]
tracing::warn!(
missing,
gbt_total = gbt.len(),
"getblocktemplate has {missing} txids not in mempool listing; skipping cycle"
);
return Ok(None);
}
Ok(Some(MempoolState {
entries,
live_txids,
gbt,
min_fee,
}))
}
/// Batched `getmempoolentry` for the given txids. Returns
/// `MempoolEntryInfo` per successful lookup. Per-item -5 (NOT_FOUND
/// — tx evicted/replaced between the txid listing and this call)
/// drops silently; transport-level failures still propagate.
/// Chunked at `BATCH_CHUNK` requests per round-trip.
pub fn fetch_mempool_entries(&self, txids: &[Txid]) -> Result<Vec<MempoolEntryInfo>> {
let mut out: Vec<MempoolEntryInfo> = Vec::with_capacity(txids.len());
for chunk in txids.chunks(BATCH_CHUNK) {
let args = chunk.iter().map(|t| {
let bt: &bitcoin::Txid = t.into();
vec![serde_json::to_value(bt).unwrap_or(Value::Null)]
});
let results: Vec<Result<MempoolEntryRaw>> =
self.0.call_batch_per_item("getmempoolentry", args)?;
for (txid, res) in chunk.iter().zip(results) {
match res {
Ok(raw) => out.push(build_entry(*txid, raw)?),
Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {}
Err(e) => {
debug!(txid = %txid, error = %e, "getmempoolentry batch: item failed")
}
}
}
}
Ok(out)
}
pub fn get_closest_valid_height(&self, hash: BlockHash) -> Result<(Height, BlockHash)> {
debug!("Get closest valid height...");

View File

@@ -0,0 +1,9 @@
use crate::{CpfpClusterTxIndex, Sats, VSize};
/// One cluster member's input to Single Fee Linearization: its
/// `(fee, vsize)` and parent edges as local indices into the same array.
pub struct ChunkInput<'a> {
/// Fee of this cluster member.
pub fee: Sats,
/// Virtual size of this cluster member.
pub vsize: VSize,
/// This member's parent edges, expressed as local indices into the
/// same array of inputs that this member is part of.
pub parents: &'a [CpfpClusterTxIndex],
}

View File

@@ -14,3 +14,20 @@ pub struct CpfpClusterChunk {
pub txs: Vec<CpfpClusterTxIndex>,
pub feerate: FeeRate,
}
/// Locate the chunk whose `txs` contains `seed_local`.
///
/// Returns `(chunk_index, feerate)` for the first such chunk in slice
/// order. When no chunk lists the seed - not expected for a
/// well-formed linearization - returns `(0, fallback)` so callers
/// still get a valid wire shape.
pub fn find_seed_chunk(
    chunks: &[CpfpClusterChunk],
    seed_local: CpfpClusterTxIndex,
    fallback: FeeRate,
) -> (u32, FeeRate) {
    for (pos, chunk) in chunks.iter().enumerate() {
        if chunk.txs.contains(&seed_local) {
            return (pos as u32, chunk.feerate);
        }
    }
    (0, fallback)
}

View File

@@ -1,34 +1,21 @@
//! Cluster mempool linearization (Core 31's "Single Fee Linearization").
//! Single Fee Linearization (Bitcoin Core 31's cluster mempool SFL).
//!
//! Given a topologically ordered cluster (parents before children) with
//! per-tx `(fee, vsize)` and parent edges as local indices, partition the
//! cluster into chunks ordered by descending feerate, where each chunk is
//! the highest-rate ancestor-closed set of remaining txs.
//! Partition a topo-ordered cluster into chunks ordered by descending
//! feerate, where each chunk is the highest-rate ancestor-closed set
//! of remaining txs. Spec-equivalent to Core's `fees.chunk` /
//! `chunkweight`; works on any Core version.
//!
//! The "lift" merging this implements is what makes CPFP visible at the
//! cluster level: a child whose rate exceeds its parent's rate gets folded
//! into a chunk with the parent, and the chunk's rate is the combined
//! `(parent_fee + child_fee) / (parent_vsize + child_vsize)`. Cascades
//! upward through any further parents until rates are non-increasing.
//! The "lift" this implements is what makes CPFP visible at the
//! cluster level: a child whose rate exceeds its parent's gets folded
//! into a chunk with the parent at the combined `(parent_fee +
//! child_fee) / (parent_vsize + child_vsize)`. Cascades upward through
//! any further parents until rates are non-increasing.
//!
//! This is the proxy-fallback case; under Core 31+ each tx's `fees.chunk`
//! / `chunkweight` already encodes the chunked rate, so all members of a
//! chunk would share that rate. Computing locally from `(fee, vsize)`
//! gives the same answer either way and works on older Core too.
//!
//! Complexity is `O(n^2)` per linearization (n bounded by cluster cap),
//! matching mempool.space's frontend implementation.
//! `O(n^2)` per linearization, `n` bounded by the cluster cap.
use brk_types::{CpfpClusterChunk, CpfpClusterTxIndex, FeeRate, Sats, VSize};
use rustc_hash::{FxBuildHasher, FxHashSet};
/// One cluster member: its `(fee, vsize)` and parent edges as
/// local indices into the same array.
pub struct ChunkInput<'a> {
pub fee: Sats,
pub vsize: VSize,
pub parents: &'a [CpfpClusterTxIndex],
}
use crate::{ChunkInput, CpfpClusterChunk, CpfpClusterTxIndex, FeeRate, Sats, VSize};
/// Linearize `items` into chunks. `items` must be in topological order
/// (parents before children); `parents` indices must point earlier in

View File

@@ -1,13 +1,21 @@
mod chunk_input;
mod cluster;
mod cluster_chunk;
mod cluster_tx;
mod cluster_tx_index;
mod entry;
mod info;
mod linearize;
pub use chunk_input::ChunkInput;
pub use cluster::CpfpCluster;
pub use cluster_chunk::CpfpClusterChunk;
pub use cluster_chunk::{CpfpClusterChunk, find_seed_chunk};
pub use cluster_tx::CpfpClusterTx;
pub use cluster_tx_index::CpfpClusterTxIndex;
pub use entry::CpfpEntry;
pub use info::CpfpInfo;
pub use linearize::linearize;
/// Bitcoin Core's default mempool ancestor/descendant chain cap, also
/// used by mempool.space-style truncation in CPFP walks.
pub const CPFP_CHAIN_LIMIT: usize = 25;

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::{FeeRate, Sats, VSize};
/// Block info in a mempool.space-like format for fee estimation.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Default, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MempoolBlock {
/// Total serialized block size in bytes (witness + non-witness).

View File

@@ -1,6 +1,6 @@
use crate::{FeeRate, Sats, Timestamp, Txid, VSize, Weight};
use crate::{Sats, Timestamp, Txid, VSize, Weight};
/// Mempool entry info from Bitcoin Core's `getrawmempool true`.
/// Mempool entry info from Bitcoin Core's `getmempoolentry`.
#[derive(Debug, Clone)]
pub struct MempoolEntryInfo {
pub txid: Txid,
@@ -8,32 +8,6 @@ pub struct MempoolEntryInfo {
pub weight: Weight,
pub fee: Sats,
pub first_seen: Timestamp,
pub ancestor_count: u64,
pub ancestor_size: VSize,
pub ancestor_fee: Sats,
pub descendant_size: VSize,
pub descendant_fee: Sats,
/// Total fee of the cluster mempool chunk this tx belongs to.
/// Present from Bitcoin Core 31+ (cluster mempool); absent on
/// older Core, in which case rate-callers fall back to
/// `max(ancestor_rate, descendant_pkg_rate)`.
pub chunk_fee: Option<Sats>,
pub chunk_weight: Option<Weight>,
/// Parent txids in the mempool.
pub depends: Vec<Txid>,
}
impl MempoolEntryInfo {
/// Effective per-vbyte rate Core would mine this tx at. Uses the
/// Core-31 `fees.chunk` / `chunkweight` chunk fields when present;
/// otherwise falls back to `max(ancestor_rate, descendant_pkg_rate)`,
/// which bounds the predictive error in deep clusters.
pub fn chunk_rate(&self) -> FeeRate {
if let (Some(chunk_fee), Some(chunk_weight)) = (self.chunk_fee, self.chunk_weight) {
return FeeRate::from((chunk_fee, VSize::from(chunk_weight)));
}
let anc = FeeRate::from((self.ancestor_fee, self.ancestor_size));
let desc = FeeRate::from((self.descendant_fee, self.descendant_size));
anc.max(desc)
}
}