mempool: fixes

This commit is contained in:
nym21
2026-05-08 00:16:37 +02:00
parent d2b8992932
commit 9d18e2db9b
17 changed files with 571 additions and 361 deletions

View File

@@ -86,7 +86,7 @@ tower-http = { version = "0.6.10", features = ["catch-panic", "compression-br",
tower-layer = "0.3"
tracing = { version = "0.1", default-features = false, features = ["std"] }
ureq = { version = "3.3.0", features = ["json"] }
vecdb = { version = "=0.10.3", features = ["derive", "serde_json", "pco", "schemars"] }
vecdb = { version = "0.10.3", features = ["derive", "serde_json", "pco", "schemars"] }
# vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco", "schemars"] }
[workspace.metadata.release]

View File

@@ -0,0 +1,89 @@
//! Cluster mempool linearization (Core 31's "Single Fee Linearization").
//!
//! Given a topologically ordered cluster (parents before children) with
//! per-tx `(fee, vsize)` and parent edges as local indices, partition the
//! cluster into chunks ordered by descending feerate, where each chunk is
//! the highest-rate ancestor-closed set of remaining txs.
//!
//! The "lift" merging this implements is what makes CPFP visible at the
//! cluster level: a child whose rate exceeds its parent's rate gets folded
//! into a chunk with the parent, and the chunk's rate is the combined
//! `(parent_fee + child_fee) / (parent_vsize + child_vsize)`. Cascades
//! upward through any further parents until rates are non-increasing.
//!
//! This is the proxy-fallback case; under Core 31+ each tx's `fees.chunk`
//! / `chunkweight` already encodes the chunked rate, so all members of a
//! chunk would share that rate. Computing locally from `(fee, vsize)`
//! gives the same answer either way and works on older Core too.
//!
//! Complexity is `O(n^2)` per linearization (n bounded by cluster cap),
//! matching mempool.space's frontend implementation.
use brk_types::{CpfpClusterChunk, CpfpClusterTxIndex, FeeRate, Sats, VSize};
use rustc_hash::{FxBuildHasher, FxHashSet};
/// One cluster member handed to `linearize`: its `(fee, vsize)` and
/// parent edges as local indices into the same array.
pub struct ChunkInput<'a> {
/// Fee of this tx on its own; summed per candidate set by `linearize`.
pub fee: Sats,
/// Virtual size of this tx on its own; summed alongside `fee`.
pub vsize: VSize,
/// Positions of this tx's parents inside the `items` slice passed to
/// `linearize`. They are used to index that slice directly.
pub parents: &'a [CpfpClusterTxIndex],
}
/// Linearize `items` into chunks. `items` must be in topological order
/// (parents before children); `parents` indices must point earlier in
/// the slice. Returns chunks sorted by descending feerate, with each
/// chunk's `txs` listed in the input topological order.
pub fn linearize(items: &[ChunkInput<'_>]) -> Vec<CpfpClusterChunk> {
let n = items.len();
if n == 0 {
return Vec::new();
}
let mut remaining: Vec<bool> = vec![true; n];
let mut chunks: Vec<CpfpClusterChunk> = Vec::new();
while remaining.iter().any(|&r| r) {
let mut best: Option<(FeeRate, FxHashSet<u32>)> = None;
for i in 0..n {
if !remaining[i] {
continue;
}
let mut anc: FxHashSet<u32> =
FxHashSet::with_capacity_and_hasher(8, FxBuildHasher);
let mut stack: Vec<u32> = vec![i as u32];
while let Some(x) = stack.pop() {
if !anc.insert(x) {
continue;
}
for &p in items[x as usize].parents {
let pu: u32 = u32::from(p);
if remaining[pu as usize] && !anc.contains(&pu) {
stack.push(pu);
}
}
}
let mut fee = Sats::ZERO;
let mut vsize = VSize::from(0u64);
for &x in &anc {
fee += items[x as usize].fee;
vsize += items[x as usize].vsize;
}
let rate = FeeRate::from((fee, vsize));
match &best {
Some((br, _)) if *br >= rate => {}
_ => best = Some((rate, anc)),
}
}
let (rate, set) = best.expect("at least one remaining tx");
let mut indices: Vec<u32> = set.into_iter().collect();
indices.sort_unstable();
for &x in &indices {
remaining[x as usize] = false;
}
let txs: Vec<CpfpClusterTxIndex> =
indices.into_iter().map(CpfpClusterTxIndex::from).collect();
chunks.push(CpfpClusterChunk { txs, feerate: rate });
}
chunks
}

View File

@@ -1,11 +1,25 @@
//! CPFP (Child Pays For Parent) walk over a `Snapshot`'s adjacency.
//!
//! The snapshot stores per-tx parent/child edges in `TxIndex` space and
//! a per-tx `chunk_rate` (Core's `fees.chunk` / `chunkweight` truth, or
//! the proxy fallback). The walk is a pair of capped DFSes, then the
//! cluster wire shape is materialized from the visited set.
//! per-tx `(fee, vsize)` we need for chunking.
//!
//! Three independent walks:
//! - `ancestors_idx`: capped DFS up `parents` only.
//! - `descendants_idx`: capped DFS down `children` only.
//! - cluster `members`: capped DFS over `parents ∪ children`, i.e. the
//! connected component of the seed in the in-mempool dependency
//! graph. Required to match Core 31's cluster mempool semantics:
//! siblings (sharing a parent) and cousins (sharing a descendant)
//! belong to the same cluster but are missed by ancestor/descendant
//! walks alone.
//!
//! The cluster is then linearized via `chunking::linearize` (single fee
//! linearization) so chunks reflect Core's CPFP "lift": a child whose
//! rate exceeds its parent's gets folded into a chunk with the parent
//! at the combined feerate. The seed's chunk feerate is what
//! `effective_fee_per_vsize` reports.
use std::cmp::Reverse;
use std::collections::VecDeque;
use brk_types::{
CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate,
@@ -13,13 +27,22 @@ use brk_types::{
};
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use crate::Mempool;
use crate::steps::{SnapTx, TxIndex};
use crate::{
Mempool,
chunking::{ChunkInput, linearize},
steps::{SnapTx, TxIndex},
};
/// Cap matches Bitcoin Core's default mempool ancestor/descendant
/// chain limits and mempool.space's truncation.
const MAX: usize = 25;
/// Cluster cap matches Bitcoin Core 31's `MAX_CLUSTER_COUNT_LIMIT`
/// (max txs in a single cluster-mempool cluster). Sized large enough
/// to hold the whole connected component for any policy-conformant
/// cluster, then truncated.
const MAX_CLUSTER: usize = 64;
impl Mempool {
/// CPFP info for a live mempool tx. Returns `None` only when the
/// tx isn't in the mempool, so callers can fall through to the
@@ -46,30 +69,21 @@ fn build_cpfp_info(
seed: &SnapTx,
sigops: SigOps,
) -> CpfpInfo {
let ancestors_idx = walk(txs, seed_idx, |t| &t.parents);
let descendants_idx = walk(txs, seed_idx, |t| &t.children);
let ancestors: Vec<CpfpEntry> = ancestors_idx
.iter()
.filter_map(|&i| txs.get(i.as_usize()).map(CpfpEntry::from))
.collect();
let descendants: Vec<CpfpEntry> = descendants_idx
.iter()
.filter_map(|&i| txs.get(i.as_usize()).map(CpfpEntry::from))
.collect();
let ancestors = collect_entries(txs, seed_idx, |t| &t.parents);
let descendants = collect_entries(txs, seed_idx, |t| &t.children);
let best_descendant = descendants
.iter()
.max_by_key(|e| FeeRate::from((e.fee, e.weight)))
.cloned();
let cluster = build_cluster(txs, seed_idx, &ancestors_idx, &descendants_idx);
let (cluster, effective_fee_per_vsize) = build_cluster(txs, seed_idx, seed);
let vsize = VSize::from(seed.weight);
CpfpInfo {
ancestors,
best_descendant,
descendants,
effective_fee_per_vsize: seed.chunk_rate,
effective_fee_per_vsize,
sigops,
fee: seed.fee,
vsize,
@@ -78,17 +92,30 @@ fn build_cpfp_info(
}
}
/// Run `walk` from `seed` along `next`, then convert every visited index
/// that still resolves in `txs` into its wire-shape `CpfpEntry`,
/// preserving visit order.
fn collect_entries(
    txs: &[SnapTx],
    seed: TxIndex,
    next: impl Fn(&SnapTx) -> &[TxIndex],
) -> Vec<CpfpEntry> {
    let visited = walk(txs, seed, next);
    let mut entries: Vec<CpfpEntry> = Vec::with_capacity(visited.len());
    for idx in visited {
        // Indices that do not resolve in the snapshot are skipped.
        if let Some(tx) = txs.get(idx.as_usize()) {
            entries.push(CpfpEntry::from(tx));
        }
    }
    entries
}
/// Capped DFS from `seed` (exclusive), following the neighbors yielded
/// by `next`. Used for both the ancestor and descendant walks.
fn walk(txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex]) -> Vec<TxIndex> {
let Some(seed_node) = txs.get(seed.as_usize()) else {
return Vec::new();
};
let mut visited: FxHashSet<TxIndex> =
FxHashSet::with_capacity_and_hasher(MAX + 1, FxBuildHasher);
visited.insert(seed);
let mut out: Vec<TxIndex> = Vec::with_capacity(MAX);
let mut stack: Vec<TxIndex> = txs
.get(seed.as_usize())
.map(|t| next(t).to_vec())
.unwrap_or_default();
let mut stack: Vec<TxIndex> = next(seed_node).to_vec();
while let Some(idx) = stack.pop() {
if out.len() >= MAX {
break;
@@ -104,84 +131,175 @@ fn walk(txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex]) ->
out
}
/// Wire-shape `CpfpCluster`. Members are emitted in `[ancestors..., seed,
/// descendants...]` order so the seed's index inside the cluster is
/// `ancestors.len()`. Chunks group txs by exact `chunk_rate` value: under
/// Core 31 this matches Core's actual chunks; under proxy fallback it
/// produces a fine-grained but consistent breakdown.
/// Wire-shape `CpfpCluster` plus the seed's chunk feerate. Members are
/// the connected component of the seed in the dependency graph, then
/// topologically sorted (parents before children) so wire indices and
/// chunk-internal ordering are valid for client-side reconstruction.
/// Returns `(None, seed_per_tx_rate)` for singletons (matches
/// mempool.space, which omits `cluster` when no relations exist).
fn build_cluster(
txs: &[SnapTx],
seed_idx: TxIndex,
ancestors: &[TxIndex],
descendants: &[TxIndex],
) -> CpfpCluster {
let members: Vec<TxIndex> = ancestors
.iter()
.copied()
.chain(std::iter::once(seed_idx))
.chain(descendants.iter().copied())
.collect();
seed: &SnapTx,
) -> (Option<CpfpCluster>, FeeRate) {
let seed_per_tx_rate = FeeRate::from((seed.fee, seed.vsize));
let component = walk_cluster(txs, seed_idx);
if component.len() <= 1 {
return (None, seed_per_tx_rate);
}
let local_of: FxHashMap<TxIndex, CpfpClusterTxIndex> = members
let members = topo_sort(txs, &component);
let local_of = build_local_index(&members);
let (cluster_txs, vsizes) = collect_cluster_members(txs, &members, &local_of);
let chunks = linearize_cluster(&cluster_txs, &vsizes);
let (chunk_index, seed_chunk_rate) =
locate_seed_chunk(local_of[&seed_idx], &chunks, seed_per_tx_rate);
(
Some(CpfpCluster {
txs: cluster_txs,
chunks,
chunk_index,
}),
seed_chunk_rate,
)
}
/// `members[i]`'s wire index, keyed by snapshot `TxIndex`. Built once
/// so per-tx parent edges can be remapped without a linear scan.
fn build_local_index(members: &[TxIndex]) -> FxHashMap<TxIndex, CpfpClusterTxIndex> {
members
.iter()
.enumerate()
.map(|(i, &idx)| (idx, CpfpClusterTxIndex::from(i as u32)))
.collect();
.collect()
}
let cluster_txs: Vec<CpfpClusterTx> = members
/// Build the wire-shape `CpfpClusterTx` for each member, with parent
/// edges translated to cluster-local indices (parents outside the
/// cluster are dropped), together with a parallel column of `vsize`s
/// for the linearizer. Members that do not resolve in `txs` are skipped
/// in both columns, so the two vectors always have equal length.
fn collect_cluster_members(
    txs: &[SnapTx],
    members: &[TxIndex],
    local_of: &FxHashMap<TxIndex, CpfpClusterTxIndex>,
) -> (Vec<CpfpClusterTx>, Vec<VSize>) {
    members
        .iter()
        .copied()
        .filter_map(|idx| txs.get(idx.as_usize()))
        .map(|tx| {
            let parents: Vec<CpfpClusterTxIndex> = tx
                .parents
                .iter()
                .filter_map(|parent| local_of.get(parent).copied())
                .collect();
            let wire = CpfpClusterTx {
                txid: tx.txid,
                weight: tx.weight,
                fee: tx.fee,
                parents,
            };
            (wire, tx.vsize)
        })
        .unzip()
}
/// Single-fee-linearize the cluster, borrowing parents from the
/// already-built `cluster_txs` so no re-allocation is needed.
fn linearize_cluster(cluster_txs: &[CpfpClusterTx], vsizes: &[VSize]) -> Vec<CpfpClusterChunk> {
let inputs: Vec<ChunkInput<'_>> = cluster_txs
.iter()
.filter_map(|&idx| {
let t = txs.get(idx.as_usize())?;
Some(CpfpClusterTx {
txid: t.txid,
weight: t.weight,
fee: t.fee,
parents: t
.parents
.iter()
.filter_map(|p| local_of.get(p).copied())
.collect(),
})
.zip(vsizes)
.map(|(c, &vsize)| ChunkInput {
fee: c.fee,
vsize,
parents: &c.parents,
})
.collect();
let chunks = chunk_groups(&members, txs, &local_of);
let seed_local = local_of[&seed_idx];
let chunk_index = chunks
.iter()
.position(|ch| ch.txs.contains(&seed_local))
.unwrap_or(0) as u32;
CpfpCluster {
txs: cluster_txs,
chunks,
chunk_index,
}
linearize(&inputs)
}
/// Group cluster members into chunks by descending `chunk_rate`. Cluster
/// size is bounded by `2 * MAX + 1` so a sort-then-fold is cheaper and
/// simpler than a hashmap keyed on `f64` bits.
fn chunk_groups(
members: &[TxIndex],
txs: &[SnapTx],
local_of: &FxHashMap<TxIndex, CpfpClusterTxIndex>,
) -> Vec<CpfpClusterChunk> {
let mut entries: Vec<(FeeRate, CpfpClusterTxIndex)> = members
/// Find the chunk containing the seed and return its index plus rate.
/// Falls back to `(0, seed_per_tx_rate)` when the seed isn't in any
/// chunk - shouldn't happen but keeps the wire shape valid.
fn locate_seed_chunk(
seed_local: CpfpClusterTxIndex,
chunks: &[CpfpClusterChunk],
seed_per_tx_rate: FeeRate,
) -> (u32, FeeRate) {
chunks
.iter()
.filter_map(|&idx| Some((txs.get(idx.as_usize())?.chunk_rate, local_of[&idx])))
.collect();
entries.sort_by_key(|e| Reverse(e.0));
.enumerate()
.find(|(_, ch)| ch.txs.contains(&seed_local))
.map(|(i, ch)| (i as u32, ch.feerate))
.unwrap_or((0, seed_per_tx_rate))
}
let mut chunks: Vec<CpfpClusterChunk> = Vec::new();
for (rate, local) in entries {
match chunks.last_mut() {
Some(last) if last.feerate == rate => last.txs.push(local),
_ => chunks.push(CpfpClusterChunk {
txs: vec![local],
feerate: rate,
}),
/// Capped DFS over the undirected dependency graph (`parents ∪
/// children`) starting from `seed`. Returns the connected component
/// truncated to `MAX_CLUSTER`, with `seed` at index 0.
fn walk_cluster(txs: &[SnapTx], seed: TxIndex) -> Vec<TxIndex> {
if txs.get(seed.as_usize()).is_none() {
return Vec::new();
}
let mut visited: FxHashSet<TxIndex> =
FxHashSet::with_capacity_and_hasher(MAX_CLUSTER, FxBuildHasher);
visited.insert(seed);
let mut out: Vec<TxIndex> = Vec::with_capacity(MAX_CLUSTER);
out.push(seed);
let mut stack: Vec<TxIndex> = vec![seed];
while let Some(idx) = stack.pop() {
let Some(t) = txs.get(idx.as_usize()) else {
continue;
};
for &n in t.parents.iter().chain(t.children.iter()) {
if out.len() >= MAX_CLUSTER {
return out;
}
if visited.insert(n) {
out.push(n);
stack.push(n);
}
}
}
chunks
out
}
/// Topologically order `component` with Kahn's algorithm, counting only
/// parent edges whose other end is also in the component. Every tx in
/// the result comes after all of its in-cluster parents; txs with no
/// in-cluster parents are emitted first, in `component` order.
fn topo_sort(txs: &[SnapTx], component: &[TxIndex]) -> Vec<TxIndex> {
    let len = component.len();
    // Snapshot index -> slot in `component`.
    let slot_of: FxHashMap<TxIndex, usize> =
        component.iter().copied().zip(0usize..).collect();

    // Per slot: number of unmet in-cluster parents, and in-cluster children.
    let mut unmet: Vec<u32> = vec![0; len];
    let mut kids: Vec<Vec<usize>> = vec![Vec::new(); len];
    for (slot, idx) in component.iter().enumerate() {
        if let Some(tx) = txs.get(idx.as_usize()) {
            unmet[slot] = tx
                .parents
                .iter()
                .filter(|&parent| slot_of.contains_key(parent))
                .count() as u32;
            kids[slot].extend(
                tx.children
                    .iter()
                    .filter_map(|child| slot_of.get(child).copied()),
            );
        }
    }

    let mut ready: VecDeque<usize> = unmet
        .iter()
        .enumerate()
        .filter(|&(_, &count)| count == 0)
        .map(|(slot, _)| slot)
        .collect();
    let mut order: Vec<TxIndex> = Vec::with_capacity(len);
    while let Some(slot) = ready.pop_front() {
        order.push(component[slot]);
        for &kid in &kids[slot] {
            unmet[kid] -= 1;
            if unmet[kid] == 0 {
                ready.push_back(kid);
            }
        }
    }
    order
}

View File

@@ -11,14 +11,15 @@
//! `TxsPulled { added, removed }`. Pure CPU.
//! 3. [`steps::applier::Applier`] - apply the diff to
//! [`state::State`] under a single write lock.
//! 4. [`prevouts::fill`] - fills `prevout: None` inputs in one pass,
//! using same-cycle in-mempool parents directly and the
//! 4. [`steps::Prevouts::fill`] - fills `prevout: None` inputs in one
//! pass, using same-cycle in-mempool parents directly and the
//! caller-supplied resolver (default: `getrawtransaction`) for
//! confirmed parents.
//! 5. [`steps::rebuilder::Rebuilder`] - throttled rebuild of the
//! projected-blocks `Snapshot` from the same-cycle GBT and min fee.
use std::{
any::Any,
cmp::Reverse,
panic::{AssertUnwindSafe, catch_unwind},
sync::Arc,
@@ -35,17 +36,18 @@ use brk_types::{
use parking_lot::{RwLock, RwLockReadGuard};
use tracing::error;
pub mod chunking;
mod cpfp;
mod diagnostics;
mod prevouts;
mod rbf;
mod state;
pub(crate) mod steps;
pub(crate) mod stores;
pub use chunking::{ChunkInput, linearize};
pub use diagnostics::MempoolStats;
pub use rbf::{RbfForTx, RbfNode};
use steps::{Applier, Fetched, Fetcher, Preparer, Rebuilder};
use steps::{Applier, Fetched, Fetcher, Preparer, Prevouts, Rebuilder};
pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval};
pub use stores::{TxGraveyard, TxStore, TxTombstone};
@@ -166,23 +168,19 @@ impl Mempool {
let Some(entry) = state.addrs.get(addr) else {
return vec![];
};
let mut ordered: Vec<(Timestamp, &Txid)> = entry
let mut ordered: Vec<(Timestamp, &Transaction)> = entry
.txids
.iter()
.map(|txid| {
let first_seen = state
.txs
.entry(txid)
.map(|e| e.first_seen)
.unwrap_or_default();
(first_seen, txid)
.filter_map(|txid| {
let record = state.txs.record_by_prefix(&TxidPrefix::from(txid))?;
Some((record.entry.first_seen, &record.tx))
})
.collect();
ordered.sort_unstable_by_key(|b| Reverse(b.0));
ordered
.into_iter()
.filter_map(|(_, txid)| state.txs.get(txid).cloned())
.take(limit)
.map(|(_, tx)| tx.clone())
.collect()
}
@@ -238,7 +236,7 @@ impl Mempool {
/// confirmed-parent prevouts via the default `getrawtransaction`
/// resolver; requires bitcoind started with `txindex=1`.
pub fn start(&self) {
self.start_with(prevouts::rpc_resolver(self.0.client.clone()));
self.start_with(Prevouts::rpc_resolver(self.0.client.clone()));
}
/// Variant of `start` that uses a caller-supplied resolver for
@@ -256,14 +254,7 @@ impl Mempool {
}
}));
if let Err(payload) = outcome {
let msg = if let Some(s) = payload.downcast_ref::<&'static str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"<non-string panic payload>".to_string()
};
error!("mempool update panicked, continuing loop: {msg}");
error!("mempool update panicked, continuing loop: {}", panic_msg(&payload));
}
thread::sleep(Duration::from_secs(1));
}
@@ -273,7 +264,7 @@ impl Mempool {
/// `update_with(rpc_resolver)`. Standalone consumers (Core +
/// `txindex=1`) get a one-line driver loop.
pub fn update(&self) -> Result<()> {
self.update_with(prevouts::rpc_resolver(self.0.client.clone()))
self.update_with(Prevouts::rpc_resolver(self.0.client.clone()))
}
/// One sync cycle: fetch, prepare, apply, fill prevouts, maybe
@@ -301,9 +292,17 @@ impl Mempool {
};
let pulled = Preparer::prepare(entries_info, new_raws, state);
let changed = Applier::apply(state, pulled);
prevouts::fill(state, resolver);
Prevouts::fill(state, resolver);
rebuilder.tick(state, changed, &gbt, min_fee);
Ok(())
}
}
/// Extract a human-readable message from a panic payload.
///
/// Panics raised with a string literal carry a `&'static str`; panics
/// raised with format arguments carry a `String`. Anything else yields
/// `"<non-string panic payload>"`.
///
/// Callers holding the `Box<dyn Any + Send>` returned by `catch_unwind`
/// tend to pass `&payload`. That reference unsize-coerces with the `Box`
/// itself as the `Any`, so a direct downcast to `&str` / `String` would
/// never match. A boxed payload is therefore unwrapped first, making
/// both `panic_msg(&payload)` and `panic_msg(&*payload)` work.
fn panic_msg(payload: &(dyn Any + Send)) -> &str {
    let payload: &(dyn Any + Send) = match payload.downcast_ref::<Box<dyn Any + Send>>() {
        Some(boxed) => &**boxed,
        None => payload,
    };
    payload
        .downcast_ref::<&'static str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
        .unwrap_or("<non-string panic payload>")
}

View File

@@ -1,145 +0,0 @@
//! Prevout fill plumbing.
//!
//! A fresh tx can land in the store with `prevout: None` on some
//! inputs when the Preparer can't see the parent (parent arrived in
//! the same cycle as the child, or parent is confirmed and we don't
//! have an indexer hooked up). [`fill`] runs after each successful
//! `Applier::apply` and closes both gaps in one pass:
//!
//! 1. Snapshot under a read guard, walking `txs.unresolved()` once.
//! For each hole, if the parent is also in the live pool we record
//! a fill directly (cheap, lock-local). Otherwise we record the
//! hole for external resolution.
//! 2. Drop the read guard. Call `resolver` on the remaining holes
//! (typically `getrawtransaction` or an indexer lookup); failures
//! are simply skipped and retried next cycle.
//! 3. Take the write guard once and fold both fill batches into the
//! `TxStore` via `apply_fills` -> `add_input`. Idempotent: each
//! fill checks `prevout.is_none()` and bails if the tx was already
//! removed or filled between phases.
use std::sync::atomic::{AtomicBool, Ordering};
use brk_rpc::Client;
use brk_types::{TxOut, Txid, TxidPrefix, Vin, Vout};
use parking_lot::RwLock;
use tracing::warn;
use crate::{State, stores::TxStore};
/// Default resolver: per-call `getrawtransaction` against the bitcoind
/// RPC client `Mempool` already holds. Requires `txindex=1`. On any
/// failure logs once with a hint, then returns `None`; the next cycle
/// retries automatically.
pub(crate) fn rpc_resolver(client: Client) -> impl Fn(&Txid, Vout) -> Option<TxOut> {
let warned = AtomicBool::new(false);
move |txid, vout| {
let bt: &bitcoin::Txid = txid.into();
match client.get_raw_transaction(bt, None as Option<&bitcoin::BlockHash>) {
Ok(tx) => tx
.output
.get(usize::from(vout))
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into()))),
Err(_) => {
if !warned.swap(true, Ordering::Relaxed) {
warn!(
"mempool: getrawtransaction missed for {txid}; ensure bitcoind is running with txindex=1"
);
}
None
}
}
}
}
type Fills = Vec<(Vin, TxOut)>;
type Holes = Vec<(Vin, Txid, Vout)>;
type FillBatch = Vec<(TxidPrefix, Txid, Fills)>;
type HoleBatch = Vec<(TxidPrefix, Txid, Holes)>;
/// Fill every unfilled prevout the cycle can resolve. Same-cycle
/// in-mempool parents are filled lock-locally; the remainder go
/// through `resolver` outside any lock. Returns true iff anything
/// was written.
pub(crate) fn fill<F>(lock: &RwLock<State>, resolver: F) -> bool
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
let (in_mempool, holes) = {
let state = lock.read();
gather(&state.txs)
};
let external = resolve_external(holes, resolver);
if in_mempool.is_empty() && external.is_empty() {
return false;
}
let mut state = lock.write();
write_fills(&mut state, in_mempool);
write_fills(&mut state, external);
true
}
/// Single pass over `txs.unresolved()`: bucket each hole into a
/// same-cycle in-mempool fill (parent is live) or an external hole
/// (parent is confirmed or unknown).
fn gather(txs: &TxStore) -> (FillBatch, HoleBatch) {
if txs.unresolved().is_empty() {
return (Vec::new(), Vec::new());
}
let mut filled: FillBatch = Vec::new();
let mut holes: HoleBatch = Vec::new();
for prefix in txs.unresolved() {
let Some(record) = txs.record_by_prefix(prefix) else {
continue;
};
let mut tx_fills: Fills = Vec::new();
let mut tx_holes: Holes = Vec::new();
for (i, txin) in record.tx.input.iter().enumerate() {
if txin.prevout.is_some() {
continue;
}
let vin = Vin::from(i);
if let Some(parent) = txs.get(&txin.txid)
&& let Some(out) = parent.output.get(usize::from(txin.vout))
{
tx_fills.push((vin, out.clone()));
} else {
tx_holes.push((vin, txin.txid, txin.vout));
}
}
let txid = record.entry.txid;
if !tx_fills.is_empty() {
filled.push((*prefix, txid, tx_fills));
}
if !tx_holes.is_empty() {
holes.push((*prefix, txid, tx_holes));
}
}
(filled, holes)
}
fn resolve_external<F>(holes: HoleBatch, resolver: F) -> FillBatch
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
holes
.into_iter()
.filter_map(|(prefix, txid, holes)| {
let fills: Fills = holes
.into_iter()
.filter_map(|(vin, prev_txid, vout)| resolver(&prev_txid, vout).map(|o| (vin, o)))
.collect();
(!fills.is_empty()).then_some((prefix, txid, fills))
})
.collect()
}
fn write_fills(state: &mut State, fills: FillBatch) {
for (prefix, txid, tx_fills) in fills {
for prevout in state.txs.apply_fills(&prefix, tx_fills) {
state.addrs.add_input(&txid, &prevout);
}
}
}

View File

@@ -103,8 +103,7 @@ fn resolve_node<'a>(
txs: &'a TxStore,
graveyard: &'a TxGraveyard,
) -> Option<(&'a Transaction, &'a TxEntry)> {
if let Some(record) = txs.record_by_prefix(&TxidPrefix::from(txid)) {
return Some((&record.tx, &record.entry));
}
graveyard.get(txid).map(|tomb| (&tomb.tx, &tomb.entry))
txs.record_by_prefix(&TxidPrefix::from(txid))
.map(|r| (&r.tx, &r.entry))
.or_else(|| graveyard.get(txid).map(|t| (&t.tx, &t.entry)))
}

View File

@@ -1,11 +1,13 @@
//! The four pipeline steps. See the crate-level docs for the cycle.
//! The five pipeline steps. See the crate-level docs for the cycle.
mod applier;
mod fetcher;
pub(crate) mod preparer;
mod prevouts;
pub(crate) mod rebuilder;
pub use applier::Applier;
pub use fetcher::{Fetched, Fetcher};
pub use preparer::{Preparer, TxEntry, TxRemoval};
pub use prevouts::Prevouts;
pub use rebuilder::{BlockStats, Rebuilder, RecommendedFees, SnapTx, Snapshot, TxIndex};

View File

@@ -41,20 +41,16 @@ impl Preparer {
) -> TxsPulled {
let state = lock.read();
let live = Self::live_set(&entries_info);
let live: FxHashSet<TxidPrefix> = entries_info
.iter()
.map(|info| TxidPrefix::from(&info.txid))
.collect();
let added = Self::classify_additions(entries_info, new_raws, &state.txs, &state.graveyard);
let removed = TxRemoval::classify(&live, &added, &state.txs);
TxsPulled { added, removed }
}
fn live_set(entries_info: &[MempoolEntryInfo]) -> FxHashSet<TxidPrefix> {
entries_info
.iter()
.map(|info| TxidPrefix::from(&info.txid))
.collect()
}
fn classify_additions(
entries_info: Vec<MempoolEntryInfo>,
mut new_raws: FxHashMap<Txid, RawTx>,

View File

@@ -0,0 +1,152 @@
//! Prevout fill plumbing.
//!
//! A fresh tx can land in the store with `prevout: None` on some
//! inputs when the Preparer can't see the parent (parent arrived in
//! the same cycle as the child, or parent is confirmed and we don't
//! have an indexer hooked up). [`Prevouts::fill`] runs after each
//! successful `Applier::apply` and closes both gaps in one pass:
//!
//! 1. Snapshot under a read guard, walking `txs.unresolved()` once.
//! For each hole, if the parent is also in the live pool we record
//! a fill directly (cheap, lock-local). Otherwise we record the
//! hole for external resolution.
//! 2. Drop the read guard. Call `resolver` on the remaining holes
//! (typically `getrawtransaction` or an indexer lookup); failures
//! are simply skipped and retried next cycle.
//! 3. Take the write guard once and fold both fill batches into the
//! `TxStore` via `apply_fills` -> `add_input`. Idempotent: each
//! fill checks `prevout.is_none()` and bails if the tx was already
//! removed or filled between phases.
use std::sync::atomic::{AtomicBool, Ordering};
use brk_rpc::Client;
use brk_types::{TxOut, Txid, TxidPrefix, Vin, Vout};
use parking_lot::RwLock;
use tracing::warn;
use crate::{State, stores::TxStore};
/// Stateless namespace for the prevout-fill step; all functionality
/// lives in associated functions.
pub struct Prevouts;
/// Resolved prevouts for one tx: `(input index, output being spent)`.
type Fills = Vec<(Vin, TxOut)>;
/// Unresolved inputs of one tx: `(input index, parent txid, parent
/// output index)`.
type Holes = Vec<(Vin, Txid, Vout)>;
/// Fills grouped per spending tx, keyed by that tx's txid.
type FillBatch = Vec<(Txid, Fills)>;
/// Holes grouped per spending tx, keyed by that tx's txid.
type HoleBatch = Vec<(Txid, Holes)>;
impl Prevouts {
    /// Fill every unfilled prevout the cycle can resolve. Same-cycle
    /// in-mempool parents are filled lock-locally; the remainder go
    /// through `resolver` outside any lock.
    ///
    /// Returns true iff at least one prevout was actually written. A
    /// batch whose fills were all dropped by `apply_fills` (tx removed
    /// or already filled between the read and write phases) does not
    /// count.
    pub fn fill<F>(lock: &RwLock<State>, resolver: F) -> bool
    where
        F: Fn(&Txid, Vout) -> Option<TxOut>,
    {
        // Phase 1: snapshot under the read guard, released at block end.
        let (in_mempool, holes) = {
            let state = lock.read();
            Self::gather(&state.txs)
        };

        // Phase 2: external lookups with no lock held.
        let external = Self::resolve_external(holes, resolver);
        if in_mempool.is_empty() && external.is_empty() {
            return false;
        }

        // Phase 3: one write guard for both batches. Both calls must
        // run, so evaluate them before combining the results.
        let mut state = lock.write();
        let wrote_local = Self::write_fills(&mut state, in_mempool);
        let wrote_external = Self::write_fills(&mut state, external);
        wrote_local || wrote_external
    }

    /// Default resolver: per-call `getrawtransaction` against the
    /// bitcoind RPC client `Mempool` already holds. Requires
    /// `txindex=1`. The first failure is logged with a hint (later ones
    /// are silent); every failure returns `None` so the next cycle
    /// retries automatically. A `vout` past the end of the fetched tx's
    /// outputs also yields `None`.
    pub fn rpc_resolver(client: Client) -> impl Fn(&Txid, Vout) -> Option<TxOut> {
        // Flipped on the first failure so the hint is logged only once.
        let warned = AtomicBool::new(false);
        move |txid, vout| {
            let bt: &bitcoin::Txid = txid.into();
            match client.get_raw_transaction(bt, None as Option<&bitcoin::BlockHash>) {
                Ok(tx) => tx
                    .output
                    .get(usize::from(vout))
                    .map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into()))),
                Err(_) => {
                    if !warned.swap(true, Ordering::Relaxed) {
                        warn!(
                            "mempool: getrawtransaction missed for {txid}; ensure bitcoind is running with txindex=1"
                        );
                    }
                    None
                }
            }
        }
    }

    /// Single pass over `txs.unresolved()`: bucket each hole into a
    /// same-cycle in-mempool fill (parent is live and has that output)
    /// or an external hole (everything else). Txs contributing nothing
    /// to a bucket are left out of it.
    fn gather(txs: &TxStore) -> (FillBatch, HoleBatch) {
        if txs.unresolved().is_empty() {
            return (Vec::new(), Vec::new());
        }
        let mut filled: FillBatch = Vec::new();
        let mut holes: HoleBatch = Vec::new();
        for prefix in txs.unresolved() {
            let Some(record) = txs.record_by_prefix(prefix) else {
                continue;
            };
            let mut tx_fills: Fills = Vec::new();
            let mut tx_holes: Holes = Vec::new();
            for (i, txin) in record.tx.input.iter().enumerate() {
                if txin.prevout.is_some() {
                    continue;
                }
                let vin = Vin::from(i);
                if let Some(parent) = txs.get(&txin.txid)
                    && let Some(out) = parent.output.get(usize::from(txin.vout))
                {
                    tx_fills.push((vin, out.clone()));
                } else {
                    tx_holes.push((vin, txin.txid, txin.vout));
                }
            }
            let txid = record.entry.txid;
            if !tx_fills.is_empty() {
                filled.push((txid, tx_fills));
            }
            if !tx_holes.is_empty() {
                holes.push((txid, tx_holes));
            }
        }
        (filled, holes)
    }

    /// Run `resolver` over every hole; holes it cannot resolve are
    /// dropped, and txs left with no resolved hole are omitted.
    fn resolve_external<F>(holes: HoleBatch, resolver: F) -> FillBatch
    where
        F: Fn(&Txid, Vout) -> Option<TxOut>,
    {
        holes
            .into_iter()
            .filter_map(|(txid, holes)| {
                let fills: Fills = holes
                    .into_iter()
                    .filter_map(|(vin, prev_txid, vout)| {
                        resolver(&prev_txid, vout).map(|o| (vin, o))
                    })
                    .collect();
                (!fills.is_empty()).then_some((txid, fills))
            })
            .collect()
    }

    /// Apply `fills` to the tx store and fold each prevout that
    /// `apply_fills` reports back into the address tracker. Returns
    /// true iff at least one prevout was reported.
    fn write_fills(state: &mut State, fills: FillBatch) -> bool {
        let mut wrote = false;
        for (txid, tx_fills) in fills {
            let prefix = TxidPrefix::from(&txid);
            for prevout in state.txs.apply_fills(&prefix, tx_fills) {
                state.addrs.add_input(&txid, &prevout);
                wrote = true;
            }
        }
        wrote
    }
}

View File

@@ -52,7 +52,7 @@ impl Partitioner {
}
fn sorted_indices(txs: &[SnapTx], excluded: &FxHashSet<TxIndex>) -> Vec<(TxIndex, VSize)> {
let mut cands: Vec<(TxIndex, VSize, brk_types::FeeRate)> = txs
let mut cands: Vec<(TxIndex, VSize, _)> = txs
.iter()
.enumerate()
.filter_map(|(i, t)| {
@@ -61,8 +61,5 @@ fn sorted_indices(txs: &[SnapTx], excluded: &FxHashSet<TxIndex>) -> Vec<(TxIndex
})
.collect();
cands.sort_by_key(|(_, _, rate)| Reverse(*rate));
cands
.into_iter()
.map(|(idx, vsize, _)| (idx, vsize))
.collect()
cands.into_iter().map(|(i, v, _)| (i, v)).collect()
}

View File

@@ -23,10 +23,6 @@ use super::{SnapTx, TxIndex};
pub type PrefixIndex = FxHashMap<TxidPrefix, TxIndex>;
pub fn build_txs(txs: &TxStore) -> (Vec<SnapTx>, PrefixIndex) {
if txs.is_empty() {
return (Vec::new(), PrefixIndex::default());
}
let (prefix_to_idx, ordered) = compact_index(txs);
let mut snap_txs: Vec<SnapTx> = ordered.iter().map(|e| live_tx(e, &prefix_to_idx)).collect();

View File

@@ -11,6 +11,17 @@ const CORE_PERCENTILES: [f64; 7] = [0.0, 0.10, 0.25, 0.50, 0.75, 0.90, 1.00];
/// columns of an otherwise tightly clustered fee tier.
const PROJECTED_PERCENTILES: [f64; 7] = [0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95];
/// Per-block aggregate stats for a projected block.
///
/// `block_stats[0]` mirrors Bitcoin Core's `getblocktemplate` - the
/// node's actual next-block selection. `fee_range` spans the full
/// 0..100 percentiles.
///
/// `block_stats[1..]` are a coarse greedy-packed projection by
/// descending chunk rate, useful as a client-facing fee-tier gradient
/// but not a prediction of what miners will include. Their `fee_range`
/// is clipped to 5..95 percentiles so a single stale-GBT leftover or
/// CPFP orphan doesn't dominate the min/max columns.
#[derive(Debug, Clone, Default)]
pub struct BlockStats {
pub tx_count: u32,

View File

@@ -17,37 +17,27 @@ pub struct AddrTracker(FxHashMap<AddrBytes, AddrEntry>);
impl AddrTracker {
pub fn add_tx(&mut self, tx: &Transaction, txid: &Txid) {
for txin in &tx.input {
let Some(prevout) = txin.prevout.as_ref() else {
continue;
};
let Some(bytes) = prevout.addr_bytes() else {
continue;
};
self.apply_add(bytes, txid, |stats| stats.sending(prevout));
if let Some(prevout) = txin.prevout.as_ref() {
self.add_input(txid, prevout);
}
}
for txout in &tx.output {
let Some(bytes) = txout.addr_bytes() else {
continue;
};
self.apply_add(bytes, txid, |stats| stats.receiving(txout));
if let Some(bytes) = txout.addr_bytes() {
self.apply_add(bytes, txid, |stats| stats.receiving(txout));
}
}
}
pub fn remove_tx(&mut self, tx: &Transaction, txid: &Txid) {
for txin in &tx.input {
let Some(prevout) = txin.prevout.as_ref() else {
continue;
};
let Some(bytes) = prevout.addr_bytes() else {
continue;
};
self.apply_remove(bytes, txid, |stats| stats.sent(prevout));
if let Some(prevout) = txin.prevout.as_ref() {
self.remove_input(txid, prevout);
}
}
for txout in &tx.output {
let Some(bytes) = txout.addr_bytes() else {
continue;
};
self.apply_remove(bytes, txid, |stats| stats.received(txout));
if let Some(bytes) = txout.addr_bytes() {
self.apply_remove(bytes, txid, |stats| stats.received(txout));
}
}
}
@@ -67,8 +57,9 @@ impl AddrTracker {
/// Fold a single newly-resolved input into the per-address stats.
/// Called by the prevout-fill paths after a prevout that was
/// previously `None` has been filled. Inputs whose prevout doesn't
/// resolve to an addr are no-ops.
/// previously `None` has been filled, and by `add_tx` for each
/// resolved input. Inputs whose prevout doesn't resolve to an addr
/// are no-ops.
pub fn add_input(&mut self, txid: &Txid, prevout: &TxOut) {
let Some(bytes) = prevout.addr_bytes() else {
return;
@@ -76,6 +67,13 @@ impl AddrTracker {
self.apply_add(bytes, txid, |stats| stats.sending(prevout));
}
/// Undo a single resolved input's contribution to the per-address
/// stats. Counterpart of `add_input`, used by `remove_tx` for each
/// input whose prevout is present. A prevout that doesn't resolve to
/// an addr is a no-op.
fn remove_input(&mut self, txid: &Txid, prevout: &TxOut) {
    if let Some(addr) = prevout.addr_bytes() {
        self.apply_remove(addr, txid, |entry| entry.sent(prevout));
    }
}
fn apply_add(
&mut self,
bytes: AddrBytes,

View File

@@ -10,9 +10,10 @@
//! carries the same chunk-rate semantics the live mempool produces.
use brk_error::{Error, OptionData, Result};
use brk_mempool::{ChunkInput, linearize};
use brk_types::{
CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate,
Height, Sats, TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight,
CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, Height, Sats,
TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight,
};
use rustc_hash::{FxBuildHasher, FxHashMap};
use smallvec::SmallVec;
@@ -323,65 +324,52 @@ fn build_cpfp_info(
.max_by_key(|e| FeeRate::from((e.fee, e.weight)))
.cloned();
let cluster_txs: Vec<CpfpClusterTx> = members
.iter()
.map(|m| CpfpClusterTx {
txid: m.txid,
weight: m.weight,
fee: m.fee,
parents: m.parents.iter().copied().collect(),
})
.collect();
let chunks = chunk_groups(members);
let chunk_index = chunks
.iter()
.position(|ch| ch.txs.contains(&seed_local))
.map(|i| i as u32)
.unwrap_or(0);
let (cluster, effective_fee_per_vsize) = if members.len() <= 1 {
(None, seed.rate)
} else {
let inputs: Vec<ChunkInput<'_>> = members
.iter()
.map(|m| ChunkInput {
fee: m.fee,
vsize: m.vsize,
parents: m.parents.as_slice(),
})
.collect();
let chunks = linearize(&inputs);
let (chunk_index, seed_rate) = chunks
.iter()
.enumerate()
.find(|(_, ch)| ch.txs.contains(&seed_local))
.map(|(i, ch)| (i as u32, ch.feerate))
.unwrap_or((0, seed.rate));
let cluster_txs: Vec<CpfpClusterTx> = members
.iter()
.map(|m| CpfpClusterTx {
txid: m.txid,
weight: m.weight,
fee: m.fee,
parents: m.parents.iter().copied().collect(),
})
.collect();
(
Some(CpfpCluster {
txs: cluster_txs,
chunks,
chunk_index,
}),
seed_rate,
)
};
CpfpInfo {
ancestors,
best_descendant,
descendants,
effective_fee_per_vsize: seed.rate,
effective_fee_per_vsize,
sigops,
fee: seed.fee,
vsize: seed.vsize,
adjusted_vsize: sigops.adjust_vsize(seed.vsize),
cluster: CpfpCluster {
txs: cluster_txs,
chunks,
chunk_index,
},
cluster,
}
}
/// Bucket cluster members by identical `rate` and return one chunk per
/// distinct rate, highest rate first.
///
/// Rates are compared by the bit pattern of their `f64` value, so only
/// exactly equal rates share a chunk. Within a chunk, `txs` lists the
/// members' local indices in `members` order. Only each member's own
/// `rate` is consulted; parent/child relations play no part here.
fn chunk_groups(members: &[Member]) -> Vec<CpfpClusterChunk> {
    let mut buckets: FxHashMap<u64, (FeeRate, SmallVec<[CpfpClusterTxIndex; 4]>)> =
        FxHashMap::with_capacity_and_hasher(members.len(), FxBuildHasher);
    // Bucket keys in first-seen order, so the stable sort below keeps
    // that order among buckets whose rates compare equal.
    let mut first_seen: Vec<u64> = Vec::new();

    for (position, member) in members.iter().enumerate() {
        let rate_bits = f64::from(member.rate).to_bits();
        let local_index = CpfpClusterTxIndex::from(position as u32);
        if let Some((_, indices)) = buckets.get_mut(&rate_bits) {
            indices.push(local_index);
            continue;
        }
        first_seen.push(rate_bits);
        let mut indices: SmallVec<[CpfpClusterTxIndex; 4]> = SmallVec::new();
        indices.push(local_index);
        buckets.insert(rate_bits, (member.rate, indices));
    }

    // Descending by rate: compare right-hand side against left-hand side.
    first_seen.sort_by(|lhs, rhs| buckets[rhs].0.cmp(&buckets[lhs].0));

    let mut chunks = Vec::with_capacity(first_seen.len());
    for rate_bits in first_seen {
        let (feerate, indices) = buckets.remove(&rate_bits).unwrap();
        chunks.push(CpfpClusterChunk {
            txs: indices.into_vec(),
            feerate,
        });
    }
    chunks
}

View File

@@ -16,6 +16,8 @@ pub struct CpfpInfo {
/// Descendant transactions in the CPFP chain.
pub descendants: Vec<CpfpEntry>,
/// Effective fee rate considering CPFP relationships (sat/vB).
/// This is the seed's chunk feerate after lift-merging, i.e. the
/// rate Core/mempool.space would surface for this tx.
pub effective_fee_per_vsize: FeeRate,
/// BIP-141 sigop cost for the seed tx (witness sigops count as 1,
/// legacy and P2SH-redeem sigops count as 4).
@@ -27,6 +29,8 @@ pub struct CpfpInfo {
/// Policy-adjusted virtual size: `max(vsize, sigops * 5)`.
pub adjusted_vsize: VSize,
/// Cluster the seed belongs to: full tx list, SFL-linearized chunks,
/// and the seed's chunk index.
pub cluster: CpfpCluster,
/// and the seed's chunk index. Omitted when the seed has no
/// ancestors and no descendants (matches mempool.space).
#[serde(skip_serializing_if = "Option::is_none")]
pub cluster: Option<CpfpCluster>,
}

View File

@@ -421,13 +421,16 @@ Matches mempool.space/bitcoin-cli behavior.
* @property {(CpfpEntry|null)=} bestDescendant - Best (highest fee rate) descendant, if any.
* @property {CpfpEntry[]} descendants - Descendant transactions in the CPFP chain.
* @property {FeeRate} effectiveFeePerVsize - Effective fee rate considering CPFP relationships (sat/vB).
This is the seed's chunk feerate after lift-merging, i.e. the
rate Core/mempool.space would surface for this tx.
* @property {SigOps} sigops - BIP-141 sigop cost for the seed tx (witness sigops count as 1,
legacy and P2SH-redeem sigops count as 4).
* @property {Sats} fee - Transaction fee (sats).
* @property {VSize} vsize - Virtual size of the seed tx (vbytes).
* @property {VSize} adjustedVsize - Policy-adjusted virtual size: `max(vsize, sigops * 5)`.
* @property {CpfpCluster} cluster - Cluster the seed belongs to: full tx list, SFL-linearized chunks,
and the seed's chunk index.
* @property {(CpfpCluster|null)=} cluster - Cluster the seed belongs to: full tx list, SFL-linearized chunks,
and the seed's chunk index. Omitted when the seed has no
ancestors and no descendants (matches mempool.space).
*/
/**
* Range parameters with output format for API query parameters.

View File

@@ -716,13 +716,16 @@ class CpfpInfo(TypedDict):
bestDescendant: Best (highest fee rate) descendant, if any.
descendants: Descendant transactions in the CPFP chain.
effectiveFeePerVsize: Effective fee rate considering CPFP relationships (sat/vB).
This is the seed's chunk feerate after lift-merging, i.e. the
rate Core/mempool.space would surface for this tx.
sigops: BIP-141 sigop cost for the seed tx (witness sigops count as 1,
legacy and P2SH-redeem sigops count as 4).
fee: Transaction fee (sats).
vsize: Virtual size of the seed tx (vbytes).
adjustedVsize: Policy-adjusted virtual size: `max(vsize, sigops * 5)`.
cluster: Cluster the seed belongs to: full tx list, SFL-linearized chunks,
and the seed's chunk index.
and the seed's chunk index. Omitted when the seed has no
ancestors and no descendants (matches mempool.space).
"""
ancestors: List[CpfpEntry]
bestDescendant: Union[CpfpEntry, None]
@@ -732,7 +735,7 @@ and the seed's chunk index.
fee: Sats
vsize: VSize
adjustedVsize: VSize
cluster: CpfpCluster
cluster: Union[CpfpCluster, None]
class DataRangeFormat(TypedDict):
"""