diff --git a/crates/brk_mempool/src/cluster.rs b/crates/brk_mempool/src/cluster.rs new file mode 100644 index 000000000..131b75ea2 --- /dev/null +++ b/crates/brk_mempool/src/cluster.rs @@ -0,0 +1,138 @@ +//! Snapshot-side cluster primitives: connected-component discovery +//! over `SnapTx` adjacency, topological ordering, and the glue that +//! feeds the cluster into [`brk_types::linearize`] (Single Fee +//! Linearization). +//! +//! A *cluster* is the connected component of a tx in the dependency +//! graph (`parents ∪ children`), bounded by Core 31's +//! `MAX_CLUSTER_COUNT_LIMIT = 64`. The SFL algorithm itself lives in +//! `brk_types` since it has no mempool deps and is shared with the +//! confirmed-cpfp path in `brk_query`. + +use std::collections::VecDeque; + +use brk_types::{ChunkInput, CpfpClusterChunk, CpfpClusterTxIndex, linearize}; +use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; +use smallvec::SmallVec; + +use crate::steps::{SnapTx, TxIndex}; + +/// Cluster cap matches Bitcoin Core 31's `MAX_CLUSTER_COUNT_LIMIT`. Any +/// connected component above this size is malformed under Core's policy +/// and gets truncated. +pub(crate) const MAX_CLUSTER: usize = 64; + +/// Capped DFS over the undirected dependency graph (`parents ∪ +/// children`) starting from `seed`. Returns the connected component +/// truncated to `MAX_CLUSTER`, with `seed` at index 0. +pub(crate) fn walk_cluster(txs: &[SnapTx], seed: TxIndex) -> Vec { + if txs.get(seed.as_usize()).is_none() { + return Vec::new(); + } + let mut visited: FxHashSet = + FxHashSet::with_capacity_and_hasher(MAX_CLUSTER, FxBuildHasher); + visited.insert(seed); + let mut out: Vec = Vec::with_capacity(MAX_CLUSTER); + out.push(seed); + let mut stack: Vec = vec![seed]; + while let Some(idx) = stack.pop() { + let Some(t) = txs.get(idx.as_usize()) else { + continue; + }; + for &n in t.parents.iter().chain(t.children.iter()) { + if out.len() >= MAX_CLUSTER { + return out; + } + if visited.insert(n) { + out.push(n); + stack.push(n); + } + } + } + out +} + +/// Linearize the connected component into chunks. Topo-sorts members, +/// remaps parent edges to cluster-local indices, and runs SFL. Returns +/// `(members, chunks)` where `members` is the topo-ordered `TxIndex` +/// list and `chunks[*].txs` are local indices into `members`. Callers +/// must filter singletons before calling - the singleton's `chunk_rate` +/// is `fee/vsize`, set elsewhere. +pub(crate) fn linearize_component( + txs: &[SnapTx], + component: &[TxIndex], +) -> (Vec, Vec) { + let members = topo_sort(txs, component); + let local_of = build_local_index(&members); + let parents_local: Vec> = members + .iter() + .map(|idx| { + txs[idx.as_usize()] + .parents + .iter() + .filter_map(|p| local_of.get(p).copied()) + .collect() + }) + .collect(); + let inputs: Vec> = members + .iter() + .zip(&parents_local) + .map(|(idx, ps)| { + let t = &txs[idx.as_usize()]; + ChunkInput { + fee: t.fee, + vsize: t.vsize, + parents: ps.as_slice(), + } + }) + .collect(); + let chunks = linearize(&inputs); + (members, chunks) +} + +/// Kahn's topological sort over the connected component, restricted to +/// in-cluster parent edges. Returns members in an order where every tx +/// follows all its in-cluster parents. +fn topo_sort(txs: &[SnapTx], component: &[TxIndex]) -> Vec { + let n = component.len(); + let pos: FxHashMap = component + .iter() + .enumerate() + .map(|(i, &x)| (x, i)) + .collect(); + let mut indeg: Vec = vec![0; n]; + let mut children: Vec> = vec![Vec::new(); n]; + for (i, &idx) in component.iter().enumerate() { + let Some(t) = txs.get(idx.as_usize()) else { + continue; + }; + indeg[i] = t.parents.iter().filter(|p| pos.contains_key(p)).count() as u32; + for &c in t.children.iter() { + if let Some(&ci) = pos.get(&c) { + children[i].push(ci); + } + } + } + let mut queue: VecDeque = (0..n).filter(|&i| indeg[i] == 0).collect(); + let mut out: Vec = Vec::with_capacity(n); + while let Some(i) = queue.pop_front() { + out.push(component[i]); + for &c in &children[i] { + indeg[c] -= 1; + if indeg[c] == 0 { + queue.push_back(c); + } + } + } + out +} + +/// `members[i]`'s wire index, keyed by snapshot `TxIndex`. Built once +/// so per-tx parent edges can be remapped without a linear scan. +pub(crate) fn build_local_index(members: &[TxIndex]) -> FxHashMap { + members + .iter() + .enumerate() + .map(|(i, &idx)| (idx, CpfpClusterTxIndex::from(i as u32))) + .collect() +} diff --git a/crates/brk_mempool/src/cpfp.rs b/crates/brk_mempool/src/cpfp.rs index f458add19..7505a558b 100644 --- a/crates/brk_mempool/src/cpfp.rs +++ b/crates/brk_mempool/src/cpfp.rs @@ -1,48 +1,24 @@ //! CPFP (Child Pays For Parent) walk over a `Snapshot`'s adjacency. //! -//! The snapshot stores per-tx parent/child edges in `TxIndex` space and -//! per-tx `(fee, vsize)` we need for chunking. -//! //! Three independent walks: -//! - `ancestors_idx`: capped DFS up `parents` only. -//! - `descendants_idx`: capped DFS down `children` only. -//! - cluster `members`: capped DFS over `parents ∪ children`, i.e. the -//! connected component of the seed in the in-mempool dependency -//! graph. Required to match Core 31's cluster mempool semantics: -//! siblings (sharing a parent) and cousins (sharing a descendant) -//! belong to the same cluster but are missed by ancestor/descendant -//! walks alone. -//! -//! The cluster is then linearized via `brk_types::linearize` (single fee -//! linearization) so chunks reflect Core's CPFP "lift": a child whose -//! rate exceeds its parent's gets folded into a chunk with the parent -//! at the combined feerate. The seed's chunk feerate is what -//! `effective_fee_per_vsize` reports. - -use std::collections::VecDeque; +//! - `ancestors`: capped DFS up `parents` only. +//! - `descendants`: capped DFS down `children` only. +//! - cluster: the connected component over `parents ∪ children`, +//! linearized via [`crate::cluster`] for the cluster wire shape and +//! the seed's chunk feerate. use brk_types::{ - CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, - SigOps, TxidPrefix, VSize, + CPFP_CHAIN_LIMIT, CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, + SigOps, TxidPrefix, VSize, find_seed_chunk, }; -use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; +use rustc_hash::{FxBuildHasher, FxHashSet}; use crate::{ Mempool, - chunking::{ChunkInput, linearize}, + cluster::{build_local_index, linearize_component, walk_cluster}, steps::{SnapTx, TxIndex}, }; -/// Cap matches Bitcoin Core's default mempool ancestor/descendant -/// chain limits and mempool.space's truncation. -const MAX: usize = 25; - -/// Cluster cap matches Bitcoin Core 31's `MAX_CLUSTER_COUNT_LIMIT` -/// (max txs in a single cluster-mempool cluster). Sized large enough -/// to hold the whole connected component for any policy-conformant -/// cluster, then truncated. -const MAX_CLUSTER: usize = 64; - impl Mempool { /// CPFP info for a live mempool tx. Returns `None` only when the /// tx isn't in the mempool, so callers can fall through to the @@ -92,39 +68,30 @@ fn build_cpfp_info( } } -/// Walk the graph from `seed` along `next` and lift the visited indices -/// into wire-shape `CpfpEntry`s in one go. +/// Capped DFS from `seed` (exclusive) along `next`, lifted directly to +/// wire-shape `CpfpEntry`s. Used for both ancestor and descendant walks. fn collect_entries( txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex], ) -> Vec { - walk(txs, seed, next) - .iter() - .filter_map(|&i| txs.get(i.as_usize()).map(CpfpEntry::from)) - .collect() -} - -/// Capped DFS from `seed` (exclusive), following the neighbors yielded -/// by `next`. Used for both the ancestor and descendant walks. -fn walk(txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex]) -> Vec { let Some(seed_node) = txs.get(seed.as_usize()) else { return Vec::new(); }; let mut visited: FxHashSet = - FxHashSet::with_capacity_and_hasher(MAX + 1, FxBuildHasher); + FxHashSet::with_capacity_and_hasher(CPFP_CHAIN_LIMIT + 1, FxBuildHasher); visited.insert(seed); - let mut out: Vec = Vec::with_capacity(MAX); + let mut out: Vec = Vec::with_capacity(CPFP_CHAIN_LIMIT); let mut stack: Vec = next(seed_node).to_vec(); while let Some(idx) = stack.pop() { - if out.len() >= MAX { + if out.len() >= CPFP_CHAIN_LIMIT { break; } if !visited.insert(idx) { continue; } - out.push(idx); if let Some(t) = txs.get(idx.as_usize()) { + out.push(CpfpEntry::from(t)); stack.extend(next(t).iter().copied()); } } @@ -132,8 +99,8 @@ fn walk(txs: &[SnapTx], seed: TxIndex, next: impl Fn(&SnapTx) -> &[TxIndex]) -> } /// Wire-shape `CpfpCluster` plus the seed's chunk feerate. Members are -/// the connected component of the seed in the dependency graph, then -/// topologically sorted (parents before children) so wire indices and +/// the connected component of the seed in the dependency graph, +/// topologically ordered (parents before children) so wire indices and /// chunk-internal ordering are valid for client-side reconstruction. /// Returns `(None, seed_per_tx_rate)` for singletons (matches /// mempool.space, which omits `cluster` when no relations exist). @@ -148,12 +115,15 @@ fn build_cluster( return (None, seed_per_tx_rate); } - let members = topo_sort(txs, &component); - let local_of = build_local_index(&members); - let (cluster_txs, vsizes) = collect_cluster_members(txs, &members, &local_of); - let chunks = linearize_cluster(&cluster_txs, &vsizes); - let (chunk_index, seed_chunk_rate) = - locate_seed_chunk(local_of[&seed_idx], &chunks, seed_per_tx_rate); + let (members, chunks) = linearize_component(txs, &component); + let cluster_txs = build_wire_members(txs, &members); + let seed_local = CpfpClusterTxIndex::from( + members + .iter() + .position(|&i| i == seed_idx) + .map_or(0, |p| p as u32), + ); + let (chunk_index, seed_chunk_rate) = find_seed_chunk(&chunks, seed_local, seed_per_tx_rate); ( Some(CpfpCluster { @@ -165,141 +135,25 @@ fn build_cluster( ) } -/// `members[i]`'s wire index, keyed by snapshot `TxIndex`. Built once -/// so per-tx parent edges can be remapped without a linear scan. -fn build_local_index(members: &[TxIndex]) -> FxHashMap { +/// Materialize wire-shape `CpfpClusterTx`s for every topo-ordered +/// member with parent edges remapped to local indices. +fn build_wire_members(txs: &[SnapTx], members: &[TxIndex]) -> Vec { + let local_of = build_local_index(members); members .iter() - .enumerate() - .map(|(i, &idx)| (idx, CpfpClusterTxIndex::from(i as u32))) + .map(|&idx| { + let t = &txs[idx.as_usize()]; + CpfpClusterTx { + txid: t.txid, + weight: t.weight, + fee: t.fee, + parents: t + .parents + .iter() + .filter_map(|p| local_of.get(p).copied()) + .collect(), + } + }) .collect() } -/// Materialize wire-shape `CpfpClusterTx`s for every member with parent -/// edges remapped to local indices, plus the parallel `vsize` column the -/// linearizer needs (not carried on `CpfpClusterTx`, which only stores -/// weight). -fn collect_cluster_members( - txs: &[SnapTx], - members: &[TxIndex], - local_of: &FxHashMap, -) -> (Vec, Vec) { - let mut cluster_txs: Vec = Vec::with_capacity(members.len()); - let mut vsizes: Vec = Vec::with_capacity(members.len()); - for &idx in members { - let Some(t) = txs.get(idx.as_usize()) else { - continue; - }; - let parents: Vec = t - .parents - .iter() - .filter_map(|p| local_of.get(p).copied()) - .collect(); - cluster_txs.push(CpfpClusterTx { - txid: t.txid, - weight: t.weight, - fee: t.fee, - parents, - }); - vsizes.push(t.vsize); - } - (cluster_txs, vsizes) -} - -/// Single-fee-linearize the cluster, borrowing parents from the -/// already-built `cluster_txs` so no re-allocation is needed. -fn linearize_cluster(cluster_txs: &[CpfpClusterTx], vsizes: &[VSize]) -> Vec { - let inputs: Vec> = cluster_txs - .iter() - .zip(vsizes) - .map(|(c, &vsize)| ChunkInput { - fee: c.fee, - vsize, - parents: &c.parents, - }) - .collect(); - linearize(&inputs) -} - -/// Find the chunk containing the seed and return its index plus rate. -/// Falls back to `(0, seed_per_tx_rate)` when the seed isn't in any -/// chunk - shouldn't happen but keeps the wire shape valid. -fn locate_seed_chunk( - seed_local: CpfpClusterTxIndex, - chunks: &[CpfpClusterChunk], - seed_per_tx_rate: FeeRate, -) -> (u32, FeeRate) { - chunks - .iter() - .enumerate() - .find(|(_, ch)| ch.txs.contains(&seed_local)) - .map(|(i, ch)| (i as u32, ch.feerate)) - .unwrap_or((0, seed_per_tx_rate)) -} - -/// Capped DFS over the undirected dependency graph (`parents ∪ -/// children`) starting from `seed`. Returns the connected component -/// truncated to `MAX_CLUSTER`, with `seed` at index 0. -fn walk_cluster(txs: &[SnapTx], seed: TxIndex) -> Vec { - if txs.get(seed.as_usize()).is_none() { - return Vec::new(); - } - let mut visited: FxHashSet = - FxHashSet::with_capacity_and_hasher(MAX_CLUSTER, FxBuildHasher); - visited.insert(seed); - let mut out: Vec = Vec::with_capacity(MAX_CLUSTER); - out.push(seed); - let mut stack: Vec = vec![seed]; - while let Some(idx) = stack.pop() { - let Some(t) = txs.get(idx.as_usize()) else { - continue; - }; - for &n in t.parents.iter().chain(t.children.iter()) { - if out.len() >= MAX_CLUSTER { - return out; - } - if visited.insert(n) { - out.push(n); - stack.push(n); - } - } - } - out -} - -/// Kahn's topological sort over the connected component, restricted to -/// in-cluster parent edges. Returns members in an order where every tx -/// follows all its in-cluster parents. -fn topo_sort(txs: &[SnapTx], component: &[TxIndex]) -> Vec { - let n = component.len(); - let pos: FxHashMap = component - .iter() - .enumerate() - .map(|(i, &x)| (x, i)) - .collect(); - let mut indeg: Vec = vec![0; n]; - let mut children: Vec> = vec![Vec::new(); n]; - for (i, &idx) in component.iter().enumerate() { - let Some(t) = txs.get(idx.as_usize()) else { - continue; - }; - indeg[i] = t.parents.iter().filter(|p| pos.contains_key(p)).count() as u32; - for &c in t.children.iter() { - if let Some(&ci) = pos.get(&c) { - children[i].push(ci); - } - } - } - let mut queue: VecDeque = (0..n).filter(|&i| indeg[i] == 0).collect(); - let mut out: Vec = Vec::with_capacity(n); - while let Some(i) = queue.pop_front() { - out.push(component[i]); - for &c in &children[i] { - indeg[c] -= 1; - if indeg[c] == 0 { - queue.push_back(c); - } - } - } - out -} diff --git a/crates/brk_mempool/src/diagnostics.rs b/crates/brk_mempool/src/diagnostics.rs index e0812b7d9..4e25aed86 100644 --- a/crates/brk_mempool/src/diagnostics.rs +++ b/crates/brk_mempool/src/diagnostics.rs @@ -18,15 +18,15 @@ pub struct MempoolStats { impl From<&Mempool> for MempoolStats { fn from(mempool: &Mempool) -> Self { - let inner = mempool.read(); + let state = mempool.read(); let rebuilder = mempool.rebuilder(); Self { - txs: inner.txs.len(), - unresolved: inner.txs.unresolved().len(), - addrs: inner.addrs.len(), - outpoint_spends: inner.outpoint_spends.len(), - graveyard_tombstones: inner.graveyard.tombstones_len(), - graveyard_order: inner.graveyard.order_len(), + txs: state.txs.len(), + unresolved: state.txs.unresolved().len(), + addrs: state.addrs.len(), + outpoint_spends: state.outpoint_spends.len(), + graveyard_tombstones: state.graveyard.tombstones_len(), + graveyard_order: state.graveyard.order_len(), rebuilds: rebuilder.rebuild_count(), skip_cleans: rebuilder.skip_clean_count(), } diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index a30c9ecc5..0927ab55c 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -3,10 +3,10 @@ //! One pull cycle, five steps: //! //! 1. [`steps::fetcher::Fetcher`] - one mixed batched RPC for -//! `getrawmempool verbose` + `getblocktemplate` + `getmempoolinfo`, -//! then a second batch for `getrawtransaction` on new entries. The -//! GBT is validated to be a subset of the verbose listing; on -//! mismatch the cycle is skipped. +//! `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`, +//! then a `getmempoolentry` batch and a `getrawtransaction` batch +//! on new txids only. The GBT is validated to be a subset of the +//! txid listing; on mismatch the cycle is skipped. //! 2. [`steps::preparer::Preparer`] - decode and classify into //! `TxsPulled { added, removed }`. Pure CPU. //! 3. [`steps::applier::Applier`] - apply the diff to @@ -38,7 +38,7 @@ use rustc_hash::FxHashSet; use parking_lot::{RwLock, RwLockReadGuard}; use tracing::error; -pub mod chunking; +mod cluster; mod cpfp; mod diagnostics; mod rbf; @@ -46,12 +46,12 @@ mod state; pub(crate) mod steps; pub(crate) mod stores; -pub use chunking::{ChunkInput, linearize}; pub use diagnostics::MempoolStats; pub use rbf::{RbfForTx, RbfNode}; +pub use steps::Snapshot; use steps::{Applier, Fetched, Fetcher, Preparer, Prevouts, Rebuilder}; -pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval}; -pub use stores::{TxGraveyard, TxStore, TxTombstone}; +pub(crate) use steps::{BlockStats, RecommendedFees, TxEntry, TxRemoval}; +pub(crate) use stores::{TxStore, TxTombstone}; /// Confirmed-parent prevout resolver passed to [`Mempool::update_with`] / /// [`Mempool::start_with`]. Receives `(parent_txid, vout)`, returns the @@ -62,9 +62,9 @@ pub(crate) use state::State; /// Cheaply cloneable: clones share one live mempool via `Arc`. #[derive(Clone)] -pub struct Mempool(Arc); +pub struct Mempool(Arc); -struct Shared { +struct Inner { client: Client, state: RwLock, rebuilder: Rebuilder, @@ -72,7 +72,7 @@ struct Shared { impl Mempool { pub fn new(client: &Client) -> Self { - Self(Arc::new(Shared { + Self(Arc::new(Inner { client: client.clone(), state: RwLock::new(State::default()), rebuilder: Rebuilder::default(), @@ -112,16 +112,14 @@ impl Mempool { /// (block 0) with aggregate stats and full tx bodies in GBT order. pub fn block_template(&self) -> BlockTemplate { let snap = self.snapshot(); - let stats = MempoolBlock::from(&snap.block_stats[0]); - let txids: Vec = snap.blocks[0] - .iter() - .map(|idx| snap.txs[idx.as_usize()].txid) - .collect(); - let transactions = self.collect_txs(&txids); BlockTemplate { hash: snap.next_block_hash, - stats, - transactions, + stats: snap + .block_stats + .first() + .map(MempoolBlock::from) + .unwrap_or_default(), + transactions: self.collect_txs(snap.block0_txids()), } } @@ -133,26 +131,20 @@ impl Mempool { pub fn block_template_diff(&self, since: NextBlockHash) -> Option { let past = self.0.rebuilder.historical_block0(since)?; let snap = self.snapshot(); - let current: FxHashSet = snap.blocks[0] - .iter() - .map(|idx| snap.txs[idx.as_usize()].txid) - .collect(); - let added_txids: Vec = current.difference(&past).copied().collect(); - let removed: Vec = past.difference(¤t).copied().collect(); - let added = self.collect_txs(&added_txids); + let current: FxHashSet = snap.block0_txids().collect(); Some(BlockTemplateDiff { hash: snap.next_block_hash, since, - added, - removed, + added: self.collect_txs(current.difference(&past).copied()), + removed: past.difference(¤t).copied().collect(), }) } - fn collect_txs(&self, txids: &[Txid]) -> Vec { + fn collect_txs(&self, txids: impl IntoIterator) -> Vec { let state = self.read(); txids - .iter() - .filter_map(|txid| state.txs.get(txid).cloned()) + .into_iter() + .filter_map(|txid| state.txs.get(&txid).cloned()) .collect() } @@ -191,9 +183,7 @@ impl Mempool { /// Apply `f` to a `Vanished` tombstone's tx body if present. /// `Replaced` tombstones return `None` because the tx will not confirm. pub fn with_vanished_tx(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option { - let state = self.read(); - let tomb = state.graveyard.get(txid)?; - matches!(tomb.reason(), TxRemoval::Vanished).then(|| f(&tomb.tx)) + self.read().graveyard.get_vanished(txid).map(|t| f(&t.tx)) } /// Snapshot of all live mempool txids. @@ -240,8 +230,8 @@ impl Mempool { &self, f: impl FnOnce(&mut dyn Iterator) -> R, ) -> R { - let inner = self.read(); - let mut iter = inner + let state = self.read(); + let mut iter = state .txs .values() .flat_map(|tx| &tx.output) @@ -249,9 +239,9 @@ impl Mempool { f(&mut iter) } - /// Effective fee rate for a live tx: snapshot's chunk rate when - /// the tx is in the latest snapshot, falling back to the entry's - /// `fee/vsize` if not yet ingested. + /// Effective fee rate for a live tx: snapshot's linearized chunk + /// rate. Falls back to `fee/vsize` for txs added since the latest + /// snapshot was built (apply -> same-cycle tick gap). pub fn live_effective_fee_rate(&self, prefix: &TxidPrefix) -> Option { if let Some(rate) = self.snapshot().chunk_rate_for(prefix) { return Some(rate); @@ -262,16 +252,15 @@ impl Mempool { .map(|e| e.fee_rate()) } - /// Effective fee rate (Core's chunk rate) snapshotted into the - /// tomb's entry at burial - same value `live_effective_fee_rate` - /// returns while the tx is alive, so an evicted RBF predecessor - /// reports the package-effective rate it had in the mempool, not a - /// misleading isolated `fee/vsize`. + /// Linearized chunk rate captured at burial - same value + /// `live_effective_fee_rate` returned while the tx was alive, so an + /// evicted RBF predecessor reports the package-effective rate it + /// had in the mempool, not a misleading isolated `fee/vsize`. pub fn graveyard_fee_rate(&self, txid: &Txid) -> Option { self.read() .graveyard .get(txid) - .map(|tomb| tomb.entry.chunk_rate) + .map(|tomb| tomb.chunk_rate) } /// `first_seen` Unix-second timestamps for `txids`, in input order. @@ -286,7 +275,7 @@ impl Mempool { .collect() } - /// Infinite update loop with a 1 second interval. Resolves + /// Infinite update loop with a 500ms interval. Resolves /// confirmed-parent prevouts via the default `getrawtransaction` /// resolver; requires bitcoind started with `txindex=1`. pub fn start(&self) { @@ -298,14 +287,14 @@ impl Mempool { /// Each cycle is wrapped in `catch_unwind` so a panic doesn't /// freeze the snapshot; `parking_lot` locks don't poison. /// - /// Sleep is `PERIOD - work_duration`, so a 700ms cycle followed by - /// a 200ms cycle still ticks roughly every `PERIOD`. When work + /// Sleep is `PERIOD - work_duration`, so a 350ms cycle followed by + /// a 100ms cycle still ticks roughly every `PERIOD`. When work /// overruns `PERIOD`, the next cycle starts immediately. pub fn start_with(&self, resolver: F) where F: Fn(&Txid, Vout) -> Option, { - const PERIOD: Duration = Duration::from_secs(1); + const PERIOD: Duration = Duration::from_millis(500); loop { let started = Instant::now(); let outcome = catch_unwind(AssertUnwindSafe(|| { @@ -337,14 +326,15 @@ impl Mempool { where F: Fn(&Txid, Vout) -> Option, { - let Shared { + let Inner { client, state, rebuilder, } = &*self.0; let Some(Fetched { - entries_info, + live_txids, + new_entries, new_raws, gbt, min_fee, @@ -352,8 +342,8 @@ impl Mempool { else { return Ok(()); }; - let pulled = Preparer::prepare(entries_info, new_raws, state); - let changed = Applier::apply(state, pulled); + let pulled = Preparer::prepare(&live_txids, new_entries, new_raws, state); + let changed = Applier::apply(state, rebuilder, pulled); Prevouts::fill(state, resolver); rebuilder.tick(state, changed, &gbt, min_fee); diff --git a/crates/brk_mempool/src/rbf.rs b/crates/brk_mempool/src/rbf.rs index 264635562..57cc36ae4 100644 --- a/crates/brk_mempool/src/rbf.rs +++ b/crates/brk_mempool/src/rbf.rs @@ -6,7 +6,7 @@ use brk_types::{Sats, Timestamp, Transaction, Txid, TxidPrefix, VSize}; use rustc_hash::FxHashSet; -use crate::{Mempool, TxEntry, TxRemoval, TxStore, stores::TxGraveyard}; +use crate::{Mempool, TxEntry, TxStore, stores::TxGraveyard}; #[derive(Debug, Clone)] pub struct RbfNode { @@ -35,15 +35,15 @@ impl Mempool { /// and return its full predecessor tree, plus the requested tx's /// direct predecessors. Single read-lock window. pub fn rbf_for_tx(&self, txid: &Txid) -> RbfForTx { - let inner = self.read(); + let state = self.read(); - let root_txid = walk_to_replacement_root(&inner.graveyard, *txid); - let replaces: Vec = inner + let root_txid = state.graveyard.replacement_root_of(*txid); + let replaces: Vec = state .graveyard .predecessors_of(txid) .map(|(p, _)| *p) .collect(); - let root = build_node(&root_txid, &inner.txs, &inner.graveyard); + let root = build_node(&root_txid, &state.txs, &state.graveyard); RbfForTx { root, replaces } } @@ -51,30 +51,23 @@ impl Mempool { /// by root, capped at `limit`. `full_rbf_only` drops trees with no /// non-signaling predecessor. pub fn recent_rbf_trees(&self, full_rbf_only: bool, limit: usize) -> Vec { - let inner = self.read(); + let state = self.read(); let mut seen: FxHashSet = FxHashSet::default(); - inner + state .graveyard .replaced_iter_recent_first() .filter_map(|(_, by)| { - let root = walk_to_replacement_root(&inner.graveyard, *by); + let root = state.graveyard.replacement_root_of(*by); seen.insert(root).then_some(root) }) - .filter_map(|root| build_node(&root, &inner.txs, &inner.graveyard)) + .filter_map(|root| build_node(&root, &state.txs, &state.graveyard)) .filter(|n| !full_rbf_only || n.full_rbf) .take(limit) .collect() } } -fn walk_to_replacement_root(graveyard: &TxGraveyard, mut root: Txid) -> Txid { - while let Some(TxRemoval::Replaced { by }) = graveyard.get(&root).map(|t| t.reason()) { - root = *by; - } - root -} - fn build_node(txid: &Txid, txs: &TxStore, graveyard: &TxGraveyard) -> Option { let (tx, entry) = resolve_node(txid, txs, graveyard)?; diff --git a/crates/brk_mempool/src/state.rs b/crates/brk_mempool/src/state.rs index 5d4b80bd0..b2bcf1018 100644 --- a/crates/brk_mempool/src/state.rs +++ b/crates/brk_mempool/src/state.rs @@ -7,10 +7,7 @@ use brk_types::{MempoolInfo, Timestamp, Txid}; -use crate::{ - TxRemoval, - stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore}, -}; +use crate::stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore}; #[derive(Default)] pub struct State { @@ -29,7 +26,6 @@ impl State { if let Some(e) = self.txs.entry(txid) { return Some(e.first_seen); } - let tomb = self.graveyard.get(txid)?; - matches!(tomb.reason(), TxRemoval::Vanished).then_some(tomb.entry.first_seen) + self.graveyard.get_vanished(txid).map(|t| t.entry.first_seen) } } diff --git a/crates/brk_mempool/src/steps/applier.rs b/crates/brk_mempool/src/steps/applier.rs index 2db118164..b1da85e82 100644 --- a/crates/brk_mempool/src/steps/applier.rs +++ b/crates/brk_mempool/src/steps/applier.rs @@ -3,7 +3,10 @@ use parking_lot::RwLock; use crate::{ State, TxEntry, TxRemoval, - steps::preparer::{TxAddition, TxsPulled}, + steps::{ + preparer::{TxAddition, TxsPulled}, + rebuilder::{Rebuilder, Snapshot}, + }, }; /// Applies a prepared diff to in-memory mempool state under one write @@ -12,33 +15,48 @@ pub struct Applier; impl Applier { /// Returns true iff anything changed. - pub fn apply(lock: &RwLock, pulled: TxsPulled) -> bool { + /// + /// `rebuilder` supplies the previous cycle's snapshot. Burial reads + /// each tomb's `chunk_rate` from the snapshot (always-fresh, + /// package-aware via local linearization). The fallback to + /// `entry.fee_rate()` is unreachable in steady state - every burial + /// target was alive at the previous tick, so the snapshot has it. + pub fn apply(lock: &RwLock, rebuilder: &Rebuilder, pulled: TxsPulled) -> bool { let TxsPulled { added, removed } = pulled; let has_changes = !added.is_empty() || !removed.is_empty(); let mut state = lock.write(); - Self::bury_removals(&mut state, removed); + Self::bury_removals(&mut state, rebuilder, removed); Self::publish_additions(&mut state, added); state.graveyard.evict_old(); has_changes } - fn bury_removals(state: &mut State, removed: Vec<(TxidPrefix, TxRemoval)>) { + fn bury_removals( + state: &mut State, + rebuilder: &Rebuilder, + removed: Vec<(TxidPrefix, TxRemoval)>, + ) { + let snapshot = rebuilder.snapshot(); for (prefix, reason) in removed { - Self::bury_one(state, &prefix, reason); + Self::bury_one(state, &snapshot, &prefix, reason); } } - fn bury_one(state: &mut State, prefix: &TxidPrefix, reason: TxRemoval) { + fn bury_one(state: &mut State, snapshot: &Snapshot, prefix: &TxidPrefix, reason: TxRemoval) { let Some(record) = state.txs.remove_by_prefix(prefix) else { return; }; - let txid = record.entry.txid; + let chunk_rate = snapshot + .chunk_rate_for(prefix) + .unwrap_or_else(|| record.entry.fee_rate()); state.info.remove(&record.tx, record.entry.fee); - state.addrs.remove_tx(&record.tx, &txid); + state.addrs.remove_tx(&record.tx); state.outpoint_spends.remove_spends(&record.tx, *prefix); - state.graveyard.bury(txid, record.tx, record.entry, reason); + state + .graveyard + .bury(record.tx, record.entry, chunk_rate, reason); } fn publish_additions(state: &mut State, added: Vec) { @@ -62,7 +80,7 @@ impl Applier { fn publish_one(state: &mut State, tx: Transaction, entry: TxEntry) { let prefix = entry.txid_prefix(); state.info.add(&tx, entry.fee); - state.addrs.add_tx(&tx, &entry.txid); + state.addrs.add_tx(&tx); state.outpoint_spends.insert_spends(&tx, prefix); state.txs.insert(tx, entry); } diff --git a/crates/brk_mempool/src/steps/fetcher/fetched.rs b/crates/brk_mempool/src/steps/fetcher/fetched.rs index e5c28db63..c00f33eb5 100644 --- a/crates/brk_mempool/src/steps/fetcher/fetched.rs +++ b/crates/brk_mempool/src/steps/fetcher/fetched.rs @@ -3,7 +3,12 @@ use brk_types::{FeeRate, MempoolEntryInfo, Txid}; use rustc_hash::FxHashMap; pub struct Fetched { - pub entries_info: Vec, + /// Every txid currently in the mempool (from `getrawmempool false`). + /// Used to derive the `live` set for removal classification. + pub live_txids: Vec, + /// `MempoolEntryInfo` for newly-observed txids only (existing ones + /// keep their first-sight entry on the live store). + pub new_entries: Vec, pub new_raws: FxHashMap, pub gbt: Vec, pub min_fee: FeeRate, diff --git a/crates/brk_mempool/src/steps/fetcher/mod.rs b/crates/brk_mempool/src/steps/fetcher/mod.rs index 720425467..d32bdbfeb 100644 --- a/crates/brk_mempool/src/steps/fetcher/mod.rs +++ b/crates/brk_mempool/src/steps/fetcher/mod.rs @@ -4,34 +4,32 @@ pub use fetched::Fetched; use brk_error::Result; use brk_rpc::{Client, MempoolState}; -use brk_types::{MempoolEntryInfo, Txid}; +use brk_types::Txid; use parking_lot::RwLock; -use crate::{ - State, - stores::{TxGraveyard, TxStore}, -}; +use crate::{State, stores::TxStore}; /// Cap before the batch RPC so we never hand bitcoind an unbounded batch. const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; -/// Two batched round-trips per cycle regardless of mempool size: -/// `getrawmempool verbose` + `getblocktemplate` + `getmempoolinfo` in -/// one mixed batch, then `getrawtransaction` for new txs. +/// Three batched round-trips per cycle, scaling with churn rather than +/// mempool size: `getblocktemplate` + `getrawmempool false` + +/// `getmempoolinfo` in one mixed batch; then `getmempoolentry` and +/// `getrawtransaction` per *new* txid only. /// -/// `getblocktemplate` is validated to be a subset of the verbose -/// listing inside the RPC layer; mismatches return `Ok(None)` so the -/// cycle is skipped without polluting downstream state. +/// `getblocktemplate` is validated to be a subset of the txid listing +/// inside the RPC layer; mismatches return `Ok(None)` so the cycle is +/// skipped without polluting downstream state. /// /// Confirmed prevouts are resolved post-apply by the caller-supplied /// resolver passed to `Mempool::update_with`, so the in-crate path no -/// longer issues a third batch for parents. +/// longer issues a fourth batch for parents. pub struct Fetcher; impl Fetcher { pub fn fetch(client: &Client, lock: &RwLock) -> Result> { let Some(MempoolState { - entries, + live_txids, gbt, min_fee, }) = client.fetch_mempool_state()? @@ -40,27 +38,30 @@ impl Fetcher { }; let new_txids = { let state = lock.read(); - Self::new_txids(&entries, &state.txs, &state.graveyard) + Self::new_txids(&live_txids, &state.txs) }; + let new_entries = client.fetch_mempool_entries(&new_txids)?; let new_raws = client.get_raw_transactions(&new_txids)?; Ok(Some(Fetched { - entries_info: entries, + live_txids, + new_entries, new_raws, gbt, min_fee, })) } - fn new_txids( - entries_info: &[MempoolEntryInfo], - known: &TxStore, - graveyard: &TxGraveyard, - ) -> Vec { - entries_info + /// Live txids the local store hasn't seen yet. Graveyard txs are + /// included so a re-broadcast (post-reorg or a peer republishing) + /// flows through `Preparer::classify_addition` and lands as + /// [`crate::TxAddition::Revived`] instead of sitting orphaned for + /// the full graveyard retention. + fn new_txids(live_txids: &[Txid], known: &TxStore) -> Vec { + live_txids .iter() - .filter(|info| !known.contains(&info.txid) && !graveyard.contains(&info.txid)) + .filter(|txid| !known.contains(txid)) .take(MAX_TX_FETCHES_PER_CYCLE) - .map(|info| info.txid) + .copied() .collect() } } diff --git a/crates/brk_mempool/src/steps/mod.rs b/crates/brk_mempool/src/steps/mod.rs index 4808b70e4..58a0e2bcf 100644 --- a/crates/brk_mempool/src/steps/mod.rs +++ b/crates/brk_mempool/src/steps/mod.rs @@ -1,13 +1,15 @@ -//! The five pipeline steps. See the crate-level docs for the cycle. +//! The five pipeline steps, in cycle order. See the crate-level docs +//! for the full cycle narrative. mod applier; mod fetcher; -pub(crate) mod preparer; +mod preparer; mod prevouts; -pub(crate) mod rebuilder; +mod rebuilder; -pub use applier::Applier; -pub use fetcher::{Fetched, Fetcher}; -pub use preparer::{Preparer, TxEntry, TxRemoval}; -pub use prevouts::Prevouts; -pub use rebuilder::{BlockStats, Rebuilder, RecommendedFees, SnapTx, Snapshot, TxIndex}; +pub(crate) use applier::Applier; +pub(crate) use fetcher::{Fetched, Fetcher}; +pub(crate) use preparer::{Preparer, TxEntry, TxRemoval}; +pub(crate) use prevouts::Prevouts; +pub(crate) use rebuilder::{BlockStats, RecommendedFees, Rebuilder, SnapTx, TxIndex}; +pub use rebuilder::Snapshot; diff --git a/crates/brk_mempool/src/steps/preparer/mod.rs b/crates/brk_mempool/src/steps/preparer/mod.rs index 5338a5f8a..b018c5526 100644 --- a/crates/brk_mempool/src/steps/preparer/mod.rs +++ b/crates/brk_mempool/src/steps/preparer/mod.rs @@ -1,18 +1,19 @@ //! Turn `Fetched` raws into a typed diff for the Applier. Pure CPU, -//! holds a read guard on `State` for the cycle. New txs are -//! classified into three buckets: +//! holds a read guard on `State` for the cycle. New entries are +//! classified into two buckets: //! -//! - **live** - already in `known`, skipped. //! - **revivable** - in the graveyard, resurrected from the tombstone. //! - **fresh** - decoded from `new_raws`, prevouts resolved against //! the live mempool only. Confirmed-parent prevouts land as //! `prevout: None` and are filled post-apply by the resolver passed //! to `Mempool::update_with`. //! -//! Removals are inferred by cross-referencing inputs. +//! Existing entries are not re-classified - they keep their first-sight +//! state on the live store. Removals are inferred by cross-referencing +//! inputs against the full `live_txids` set from the cycle's pull. use brk_rpc::RawTx; -use brk_types::{MempoolEntryInfo, Txid, TxidPrefix}; +use brk_types::{MempoolEntryInfo, Transaction, Txid, TxidPrefix, Vout}; use parking_lot::RwLock; use rustc_hash::{FxHashMap, FxHashSet}; @@ -31,39 +32,39 @@ pub use tx_entry::TxEntry; pub use tx_removal::TxRemoval; pub use txs_pulled::TxsPulled; +type SpentBy = FxHashMap<(Txid, Vout), Txid>; + pub struct Preparer; impl Preparer { pub fn prepare( - entries_info: Vec, + live_txids: &[Txid], + new_entries: Vec, new_raws: FxHashMap, lock: &RwLock, ) -> TxsPulled { let state = lock.read(); - let live: FxHashSet = entries_info - .iter() - .map(|info| TxidPrefix::from(&info.txid)) - .collect(); - let added = Self::classify_additions(entries_info, new_raws, &state.txs, &state.graveyard); - let removed = TxRemoval::classify(&live, &added, &state.txs); + let live: FxHashSet = live_txids.iter().map(TxidPrefix::from).collect(); + let added = Self::classify_additions(new_entries, new_raws, &state.txs, &state.graveyard); + let removed = Self::classify_removals(&live, &added, &state.txs); TxsPulled { added, removed } } fn classify_additions( - entries_info: Vec, + new_entries: Vec, mut new_raws: FxHashMap, known: &TxStore, graveyard: &TxGraveyard, ) -> Vec { - entries_info + new_entries .iter() - .filter_map(|info| Self::classify(info, known, graveyard, &mut new_raws)) + .filter_map(|info| Self::classify_addition(info, known, graveyard, &mut new_raws)) .collect() } - fn classify( + fn classify_addition( info: &MempoolEntryInfo, known: &TxStore, graveyard: &TxGraveyard, @@ -78,4 +79,44 @@ impl Preparer { let raw = new_raws.remove(&info.txid)?; Some(TxAddition::fresh(info, raw, known)) } + + /// One `(prefix, reason)` per known tx that's gone from the live set, + /// in `known` iteration order. + fn classify_removals( + live: &FxHashSet, + added: &[TxAddition], + known: &TxStore, + ) -> Vec<(TxidPrefix, TxRemoval)> { + let spent_by = Self::build_spent_by(added); + known + .records() + .filter_map(|(prefix, record)| { + if live.contains(prefix) { + return None; + } + Some((*prefix, Self::removal_reason(&record.tx, &spent_by))) + }) + .collect() + } + + fn removal_reason(tx: &Transaction, spent_by: &SpentBy) -> TxRemoval { + tx.input + .iter() + .find_map(|i| spent_by.get(&(i.txid, i.vout)).copied()) + .map_or(TxRemoval::Vanished, |by| TxRemoval::Replaced { by }) + } + + /// Only `Fresh` additions carry tx input data. Revived txs were + /// already in-pool, so they can't be new spenders of anything. + fn build_spent_by(added: &[TxAddition]) -> SpentBy { + let mut spent_by: SpentBy = FxHashMap::default(); + for addition in added { + if let TxAddition::Fresh { tx, .. } = addition { + for txin in &tx.input { + spent_by.insert((txin.txid, txin.vout), tx.txid); + } + } + } + spent_by + } } diff --git a/crates/brk_mempool/src/steps/preparer/tx_addition.rs b/crates/brk_mempool/src/steps/preparer/tx_addition.rs index 64d2e661c..70e7b6600 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_addition.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_addition.rs @@ -69,7 +69,7 @@ impl TxAddition { fn build_txin(txin: bitcoin::TxIn, mempool_txs: &TxStore) -> TxIn { let prev_txid: Txid = txin.previous_output.txid.into(); - let prev_vout = usize::from(Vout::from(txin.previous_output.vout)); + let prev_vout = Vout::from(txin.previous_output.vout); let prevout = Self::resolve_prevout(&prev_txid, prev_vout, mempool_txs); TxIn { @@ -78,7 +78,7 @@ impl TxAddition { is_coinbase: false, prevout, txid: prev_txid, - vout: txin.previous_output.vout.into(), + vout: prev_vout, script_sig: txin.script_sig, script_sig_asm: (), witness: txin.witness.into(), @@ -88,10 +88,10 @@ impl TxAddition { } } - fn resolve_prevout(prev_txid: &Txid, prev_vout: usize, mempool_txs: &TxStore) -> Option { + fn resolve_prevout(prev_txid: &Txid, prev_vout: Vout, mempool_txs: &TxStore) -> Option { let prev = mempool_txs.get(prev_txid)?; prev.output - .get(prev_vout) + .get(usize::from(prev_vout)) .map(|o| TxOut::from((o.script_pubkey.clone(), o.value))) } } diff --git a/crates/brk_mempool/src/steps/preparer/tx_entry.rs b/crates/brk_mempool/src/steps/preparer/tx_entry.rs index 3684060de..49c9ac935 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_entry.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_entry.rs @@ -2,9 +2,8 @@ use brk_types::{FeeRate, MempoolEntryInfo, Sats, Timestamp, Txid, TxidPrefix, VS use smallvec::SmallVec; /// A mempool transaction entry. Carries the per-tx facts needed for -/// projection, plus the snapshot-time `chunk_rate` (Core's cluster-mempool -/// chunk fee rate, or the proxy fallback) used as the effective rate -/// for partitioning, fee tiers, and CPFP. +/// projection. Chunk rates live on the snapshot (linearized fresh each +/// cycle) - not stored here. #[derive(Debug, Clone)] pub struct TxEntry { pub txid: Txid, @@ -17,11 +16,6 @@ pub struct TxEntry { pub first_seen: Timestamp, /// BIP-125 explicit signaling: any input has sequence < 0xfffffffe. pub rbf: bool, - /// Effective per-vbyte rate Core would mine this tx at. From - /// `MempoolEntryInfo::chunk_rate()`: Core 31+ uses `fees.chunk / - /// (chunkweight/4)`, older Core falls back to - /// `max(ancestor_rate, descendant_pkg_rate)`. - pub chunk_rate: FeeRate, } impl TxEntry { @@ -35,7 +29,6 @@ impl TxEntry { depends: info.depends.iter().map(TxidPrefix::from).collect(), first_seen: info.first_seen, rbf, - chunk_rate: info.chunk_rate(), } } diff --git a/crates/brk_mempool/src/steps/preparer/tx_removal.rs b/crates/brk_mempool/src/steps/preparer/tx_removal.rs index 65c3f7452..883ffd6a8 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_removal.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_removal.rs @@ -1,12 +1,7 @@ -//! Why a tx left the mempool between two pull cycles, plus the -//! classifier that diffs the live prefix set against `known` to -//! produce one [`TxRemoval`] per loser. +//! Why a tx left the mempool between two pull cycles. The diff that +//! produces one [`TxRemoval`] per loser lives on [`super::Preparer`]. -use brk_types::{Transaction, Txid, TxidPrefix, Vout}; -use rustc_hash::{FxHashMap, FxHashSet}; - -use super::TxAddition; -use crate::stores::TxStore; +use brk_types::Txid; /// `Replaced` = at least one freshly added tx this cycle spends one of /// its inputs (BIP-125 replacement inferred from conflicting outpoints). @@ -18,47 +13,3 @@ pub enum TxRemoval { Replaced { by: Txid }, Vanished, } - -type SpentBy = FxHashMap<(Txid, Vout), Txid>; - -impl TxRemoval { - /// Returns `(prefix, reason)` pairs in iteration order of `known`. - pub(super) fn classify( - live: &FxHashSet, - added: &[TxAddition], - known: &TxStore, - ) -> Vec<(TxidPrefix, Self)> { - let spent_by = Self::build_spent_by(added); - - known - .records() - .filter_map(|(prefix, record)| { - if live.contains(prefix) { - return None; - } - Some((*prefix, Self::find_removal(&record.tx, &spent_by))) - }) - .collect() - } - - fn find_removal(tx: &Transaction, spent_by: &SpentBy) -> Self { - tx.input - .iter() - .find_map(|i| spent_by.get(&(i.txid, i.vout)).cloned()) - .map_or(Self::Vanished, |by| Self::Replaced { by }) - } - - /// Only `Fresh` additions carry tx input data. Revived txs were - /// already in-pool, so they can't be new spenders of anything. - fn build_spent_by(added: &[TxAddition]) -> SpentBy { - let mut spent_by: SpentBy = FxHashMap::default(); - for addition in added { - if let TxAddition::Fresh { tx, .. } = addition { - for txin in &tx.input { - spent_by.insert((txin.txid, txin.vout), tx.txid); - } - } - } - spent_by - } -} diff --git a/crates/brk_mempool/src/steps/prevouts.rs b/crates/brk_mempool/src/steps/prevouts.rs index 25f266761..5739f5ae0 100644 --- a/crates/brk_mempool/src/steps/prevouts.rs +++ b/crates/brk_mempool/src/steps/prevouts.rs @@ -54,8 +54,12 @@ impl Prevouts { } let mut state = lock.write(); - Self::write_fills(&mut state, in_mempool); - Self::write_fills(&mut state, external); + for (txid, fills) in in_mempool.into_iter().chain(external) { + let prefix = TxidPrefix::from(&txid); + for prevout in state.txs.apply_fills(&prefix, fills) { + state.addrs.add_input(&txid, &prevout); + } + } true } @@ -140,13 +144,4 @@ impl Prevouts { }) .collect() } - - fn write_fills(state: &mut State, fills: FillBatch) { - for (txid, tx_fills) in fills { - let prefix = TxidPrefix::from(&txid); - for prevout in state.txs.apply_fills(&prefix, tx_fills) { - state.addrs.add_input(&txid, &prevout); - } - } - } } diff --git a/crates/brk_mempool/src/steps/rebuilder/mod.rs b/crates/brk_mempool/src/steps/rebuilder/mod.rs index 41607e59e..e16ffb9ad 100644 --- a/crates/brk_mempool/src/steps/rebuilder/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/mod.rs @@ -14,7 +14,7 @@ use rustc_hash::FxHashSet; use crate::State; use partition::Partitioner; -use snapshot::{PrefixIndex, builder}; +use snapshot::build_txs; mod partition; mod snapshot; @@ -56,24 +56,20 @@ impl Rebuilder { return; } let snap = Self::build_snapshot(lock, gbt, min_fee); - let block0_set: FxHashSet = snap.blocks[0] - .iter() - .map(|idx| snap.txs[idx.as_usize()].txid) - .collect(); + let block0_set: FxHashSet = snap.block0_txids().collect(); let next_hash = snap.next_block_hash; *self.snapshot.write() = Arc::new(snap); - self.push_history(next_hash, block0_set); - self.dirty.store(false, Ordering::Release); - self.rebuild_count.fetch_add(1, Ordering::Relaxed); - } - fn push_history(&self, hash: NextBlockHash, set: FxHashSet) { let mut hist = self.history.write(); - hist.retain(|(h, _)| *h != hash); - hist.push_back((hash, set)); + hist.retain(|(h, _)| *h != next_hash); + hist.push_back((next_hash, block0_set)); while hist.len() > HISTORY { hist.pop_front(); } + drop(hist); + + self.dirty.store(false, Ordering::Release); + self.rebuild_count.fetch_add(1, Ordering::Relaxed); } /// Past block-0 txid set for `hash`, or `None` if it has aged out @@ -102,10 +98,17 @@ impl Rebuilder { ) -> Snapshot { let (txs, prefix_to_idx) = { let state = lock.read(); - builder::build_txs(&state.txs) + build_txs(&state.txs) }; - let block0 = Self::block_from_gbt(gbt, &prefix_to_idx); + // Block 0 from `getblocktemplate`: Core's actual selection. + // Fetcher already validated GBT ⊆ live txid listing, so any + // drop here is a same-cycle race and the partitioner picks up + // the slack so callers always see NUM_BLOCKS blocks. + let block0: Vec = gbt + .iter() + .filter_map(|t| prefix_to_idx.get(&TxidPrefix::from(&t.txid)).copied()) + .collect(); let excluded: FxHashSet = block0.iter().copied().collect(); let rest = Partitioner::partition(&txs, &excluded, NUM_BLOCKS.saturating_sub(1)); @@ -116,17 +119,6 @@ impl Rebuilder { Snapshot::build(txs, blocks, prefix_to_idx, min_fee) } - /// Block 0 from `getblocktemplate`: Core's actual selection. Maps - /// each GBT txid back to its `TxIndex` via the per-build prefix - /// index. Fetcher already validated GBT ⊆ verbose mempool, so any - /// drop here is a same-cycle race and the partitioner picks up the - /// slack so callers always see eight blocks. - fn block_from_gbt(gbt: &[BlockTemplateTx], prefix_to_idx: &PrefixIndex) -> Vec { - gbt.iter() - .filter_map(|t| prefix_to_idx.get(&TxidPrefix::from(&t.txid)).copied()) - .collect() - } - pub fn snapshot(&self) -> Arc { self.snapshot.read().clone() } diff --git a/crates/brk_mempool/src/steps/rebuilder/partition.rs b/crates/brk_mempool/src/steps/rebuilder/partition.rs index 136ecce2c..f4ca5d04d 100644 --- a/crates/brk_mempool/src/steps/rebuilder/partition.rs +++ b/crates/brk_mempool/src/steps/rebuilder/partition.rs @@ -4,9 +4,10 @@ //! `/api/v1/fees/mempool-blocks` as a coarse fee-tier gradient. //! //! No topological gate: a child can sit before its parent within a -//! tied-rate run, but cluster members share a `chunk_rate` so they -//! land in the same block in the common case, and the only output is -//! a per-block rate distribution where intra-block order is invisible. +//! tied-rate run, but every cluster member shares its chunk's +//! `chunk_rate` (linearized at snapshot time) so chunk-mates land in +//! the same block, and the only output is a per-block rate +//! distribution where intra-block order is invisible. //! //! The final block is a catch-all (no vsize cap) so leftover tail //! vsize is accounted for instead of silently dropped. @@ -17,7 +18,7 @@ use std::cmp::Reverse; -use brk_types::VSize; +use brk_types::{FeeRate, VSize}; use rustc_hash::FxHashSet; use super::snapshot::{SnapTx, TxIndex}; @@ -33,12 +34,12 @@ impl Partitioner { if num_remaining_blocks == 0 { return Vec::new(); } - let sorted = sorted_indices(txs, excluded); + let sorted = sorted_candidates(txs, excluded); let mut blocks: Vec> = (0..num_remaining_blocks).map(|_| Vec::new()).collect(); let mut block_vsize = VSize::default(); let mut current = 0; let last = num_remaining_blocks - 1; - for (idx, vsize) in sorted { + for (idx, vsize, _) in sorted { let fits = vsize <= VSize::MAX_BLOCK.saturating_sub(block_vsize); if !fits && current < last && !blocks[current].is_empty() { current += 1; @@ -51,8 +52,11 @@ impl Partitioner { } } -fn sorted_indices(txs: &[SnapTx], excluded: &FxHashSet) -> Vec<(TxIndex, VSize)> { - let mut cands: Vec<(TxIndex, VSize, _)> = txs +fn sorted_candidates( + txs: &[SnapTx], + excluded: &FxHashSet, +) -> Vec<(TxIndex, VSize, FeeRate)> { + let mut cands: Vec<(TxIndex, VSize, FeeRate)> = txs .iter() .enumerate() .filter_map(|(i, t)| { @@ -61,5 +65,5 @@ fn sorted_indices(txs: &[SnapTx], excluded: &FxHashSet) -> Vec<(TxIndex }) .collect(); cands.sort_by_key(|(_, _, rate)| Reverse(*rate)); - cands.into_iter().map(|(i, v, _)| (i, v)).collect() + cands } diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/builder.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/builder.rs index 848ce4da7..cd8948e0c 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/builder.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/builder.rs @@ -1,45 +1,55 @@ -//! Build the per-tx adjacency for a snapshot from the live `TxStore`. +//! Build the per-tx adjacency for a snapshot from the live `TxStore`, +//! then linearize chunk rates over every multi-tx cluster. //! //! One pass over the live records to assign compact `TxIndex`es and a //! `prefix -> TxIndex` map, then per entry resolve `depends` against -//! it to produce parent edges. Children are mirrored from parents in -//! a second pass. Cross-pool parents (confirmed or evicted) are -//! dropped silently - the live pool reflects what miners actually see, -//! and any stale `depends` entry is self-healing. +//! it to produce parent edges. Children are mirrored from parents in a +//! second pass. Cross-pool parents (confirmed or evicted) are dropped +//! silently - the live pool reflects what miners actually see, and any +//! stale `depends` entry is self-healing. +//! +//! Final pass: walk every connected component and run Single Fee +//! Linearization over it (see [`crate::cluster`]); each member's +//! `chunk_rate` is overwritten with its chunk's feerate. Singletons +//! keep the `fee/vsize` seed set in `live_tx`. //! //! The prefix map is returned alongside the txs so the rebuilder can //! reuse it for GBT mapping and the final `Snapshot::build` step //! without reconstructing it. +use std::mem; + use brk_types::TxidPrefix; use rustc_hash::{FxBuildHasher, FxHashMap}; use smallvec::SmallVec; -use crate::TxEntry; -use crate::stores::TxStore; +use crate::{ + TxEntry, + cluster::{linearize_component, walk_cluster}, + stores::TxStore, +}; use super::{SnapTx, TxIndex}; pub type PrefixIndex = FxHashMap; pub fn build_txs(txs: &TxStore) -> (Vec, PrefixIndex) { - let (prefix_to_idx, ordered) = compact_index(txs); - let mut snap_txs: Vec = ordered.iter().map(|e| live_tx(e, &prefix_to_idx)).collect(); + let n = txs.len(); + let mut prefix_to_idx: PrefixIndex = + FxHashMap::with_capacity_and_hasher(n, FxBuildHasher); + for (i, (prefix, _)) in txs.records().enumerate() { + prefix_to_idx.insert(*prefix, TxIndex::from(i)); + } + let mut snap_txs: Vec = txs + .records() + .map(|(_, record)| live_tx(&record.entry, &prefix_to_idx)) + .collect(); mirror_children(&mut snap_txs); + refresh_chunk_rates(&mut snap_txs); (snap_txs, prefix_to_idx) } -fn compact_index(txs: &TxStore) -> (PrefixIndex, Vec<&TxEntry>) { - let mut map: PrefixIndex = FxHashMap::with_capacity_and_hasher(txs.len(), FxBuildHasher); - let mut ordered: Vec<&TxEntry> = Vec::with_capacity(txs.len()); - for (i, (prefix, record)) in txs.records().enumerate() { - map.insert(*prefix, TxIndex::from(i)); - ordered.push(&record.entry); - } - (map, ordered) -} - fn live_tx(e: &TxEntry, prefix_to_idx: &PrefixIndex) -> SnapTx { let parents: SmallVec<[TxIndex; 2]> = e .depends @@ -52,24 +62,160 @@ fn live_tx(e: &TxEntry, prefix_to_idx: &PrefixIndex) -> SnapTx { vsize: e.vsize, weight: e.weight, size: e.size, - chunk_rate: e.chunk_rate, + chunk_rate: e.fee_rate(), parents, children: SmallVec::new(), } } fn mirror_children(txs: &mut [SnapTx]) { - let edges: Vec<(TxIndex, TxIndex)> = txs - .iter() - .enumerate() - .flat_map(|(i, t)| { - let child = TxIndex::from(i); - t.parents.iter().map(move |&p| (p, child)) - }) - .collect(); - for (parent, child) in edges { - if let Some(t) = txs.get_mut(parent.as_usize()) { - t.children.push(child); + for i in 0..txs.len() { + let child = TxIndex::from(i); + let parents = mem::take(&mut txs[i].parents); + for &p in &parents { + if let Some(t) = txs.get_mut(p.as_usize()) { + t.children.push(child); + } + } + txs[i].parents = parents; + } +} + +/// Walk every multi-tx connected component once and overwrite each +/// member's `chunk_rate` with the linearized chunk's feerate. Visited +/// bitmap ensures each cluster is linearized exactly once. +fn refresh_chunk_rates(snap_txs: &mut [SnapTx]) { + let n = snap_txs.len(); + let mut visited = vec![false; n]; + for seed in 0..n { + if visited[seed] { + continue; + } + let t = &snap_txs[seed]; + if t.parents.is_empty() && t.children.is_empty() { + visited[seed] = true; + continue; + } + let component = walk_cluster(snap_txs, TxIndex::from(seed)); + for &m in &component { + visited[m.as_usize()] = true; + } + if component.len() <= 1 { + continue; + } + let (members, chunks) = linearize_component(snap_txs, &component); + for chunk in &chunks { + for &local in &chunk.txs { + let m = members[u32::from(local) as usize]; + snap_txs[m.as_usize()].chunk_rate = chunk.feerate; + } } } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + + use bitcoin::hashes::Hash; + use brk_types::{FeeRate, Sats, Txid, VSize, Weight}; + + use super::*; + + /// Build a `SnapTx` for tests. `txid` is auto-assigned from a + /// process-wide counter so each tx is distinguishable in + /// debug output; the cluster code itself keys off `TxIndex`, + /// not `txid`. + fn snap_tx(fee: Sats, vsize: VSize) -> SnapTx { + static COUNTER: AtomicU32 = AtomicU32::new(0); + let mut bytes = [0u8; 32]; + bytes[..4].copy_from_slice(&COUNTER.fetch_add(1, Ordering::Relaxed).to_le_bytes()); + SnapTx { + txid: Txid::from(bitcoin::Txid::from_byte_array(bytes)), + fee, + vsize, + weight: Weight::from(vsize), + size: u64::from(vsize), + chunk_rate: FeeRate::from((fee, vsize)), + parents: SmallVec::new(), + children: SmallVec::new(), + } + } + + fn link(txs: &mut [SnapTx], parent: usize, child: usize) { + txs[child].parents.push(TxIndex::from(parent)); + txs[parent].children.push(TxIndex::from(child)); + } + + fn sats(n: u64) -> Sats { + Sats::from(n) + } + + fn vsize(n: u64) -> VSize { + VSize::from(n) + } + + #[test] + fn singleton_keeps_fee_per_vsize() { + let mut txs = vec![snap_tx(sats(1000), vsize(100))]; + let seed = txs[0].chunk_rate; + refresh_chunk_rates(&mut txs); + assert_eq!(txs[0].chunk_rate, seed); + } + + #[test] + fn two_tx_cpfp_lift() { + let mut txs = vec![ + snap_tx(sats(100), vsize(100)), + snap_tx(sats(1900), vsize(100)), + ]; + link(&mut txs, 0, 1); + let parent_seed = txs[0].chunk_rate; + refresh_chunk_rates(&mut txs); + assert!(txs[0].chunk_rate > parent_seed); + assert_eq!(txs[0].chunk_rate, txs[1].chunk_rate); + assert_eq!(txs[0].chunk_rate, FeeRate::from((sats(2000), vsize(200)))); + } + + #[test] + fn three_tx_chain_chunks_correctly() { + let mut txs = vec![ + snap_tx(sats(100), vsize(100)), + snap_tx(sats(100), vsize(100)), + snap_tx(sats(5800), vsize(100)), + ]; + link(&mut txs, 0, 1); + link(&mut txs, 1, 2); + refresh_chunk_rates(&mut txs); + let combined = FeeRate::from((sats(6000), vsize(300))); + assert_eq!(txs[0].chunk_rate, combined); + assert_eq!(txs[1].chunk_rate, combined); + assert_eq!(txs[2].chunk_rate, combined); + } + + #[test] + fn disjoint_clusters_linearized_independently() { + let mut txs = vec![ + snap_tx(sats(100), vsize(100)), + snap_tx(sats(1900), vsize(100)), + snap_tx(sats(500), vsize(100)), + snap_tx(sats(4500), vsize(100)), + ]; + link(&mut txs, 0, 1); + link(&mut txs, 2, 3); + refresh_chunk_rates(&mut txs); + assert_eq!(txs[0].chunk_rate, txs[1].chunk_rate); + assert_eq!(txs[2].chunk_rate, txs[3].chunk_rate); + assert_ne!(txs[0].chunk_rate, txs[2].chunk_rate); + } + + #[test] + fn cluster_cap_does_not_panic() { + let n = 100; + let mut txs: Vec = (0..n).map(|_| snap_tx(sats(1000), vsize(100))).collect(); + for i in 1..n { + link(&mut txs, i - 1, i); + } + refresh_chunk_rates(&mut txs); + } +} diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/fees.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/fees.rs index fe30d9617..7be8b5533 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/fees.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/fees.rs @@ -74,7 +74,7 @@ impl Fees { previous_fee: Option, min_fee: FeeRate, ) -> FeeRate { - let median = block.median_fee_rate(); + let median = block.fee_range[3]; let use_fee = previous_fee.map_or(median, |prev| FeeRate::mean(median, prev)); let vsize = u64::from(block.total_vsize); if vsize <= EMPTY_BLOCK_VSIZE || median < min_fee { diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs index 57f5647df..2033fc5ec 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs @@ -1,26 +1,26 @@ -pub mod builder; +mod builder; mod fees; mod stats; mod tx; mod tx_index; -pub use builder::PrefixIndex; +pub(crate) use builder::{PrefixIndex, build_txs}; pub use stats::BlockStats; pub use tx::SnapTx; pub use tx_index::TxIndex; use std::hash::{DefaultHasher, Hash, Hasher}; -use brk_types::{FeeRate, NextBlockHash, RecommendedFees, TxidPrefix}; +use brk_types::{FeeRate, NextBlockHash, RecommendedFees, Txid, TxidPrefix}; use fees::Fees; #[derive(Default)] pub struct Snapshot { /// Dense per-tx data indexed by `TxIndex`. Each entry carries the - /// chunk rate (Core's chunk-mempool truth or proxy fallback) plus - /// resolved parent/child adjacency, so CPFP queries don't re-read - /// any external state. + /// linearized chunk rate (computed locally at snapshot build time) + /// plus resolved parent/child adjacency, so CPFP queries don't + /// re-read any external state. pub txs: Vec, /// Projected blocks. `blocks[0]` is Core's `getblocktemplate` /// (Bitcoin Core's actual selection); the rest are greedy-packed @@ -33,8 +33,8 @@ pub struct Snapshot { pub next_block_hash: NextBlockHash, /// Per-snapshot `TxidPrefix -> TxIndex` index, so live queries can /// resolve a prefix to the snapshot's compact index without - /// re-walking `txs`. Built once by `builder::build_txs` and reused - /// by the rebuilder for GBT mapping. + /// re-walking `txs`. Built once by `build_txs` and reused by the + /// rebuilder for GBT mapping. prefix_to_idx: PrefixIndex, } @@ -47,17 +47,7 @@ impl Snapshot { prefix_to_idx: PrefixIndex, min_fee: FeeRate, ) -> Self { - let block_stats: Vec = blocks - .iter() - .enumerate() - .map(|(i, block)| { - if i == 0 { - BlockStats::compute_core(block, &txs) - } else { - BlockStats::compute_projected(block, &txs) - } - }) - .collect(); + let block_stats = BlockStats::for_blocks(&blocks, &txs); let fees = Fees::compute(&block_stats, min_fee); let next_block_hash = Self::hash_next_block(&blocks); Self { @@ -87,8 +77,20 @@ impl Snapshot { self.prefix_to_idx.get(prefix).copied() } - /// Effective chunk rate for a live tx by prefix, or `None` if the - /// tx isn't in this snapshot. + /// Txids of `blocks[0]` (Core's `getblocktemplate` selection), + /// in template order. Empty for a default snapshot. + pub fn block0_txids(&self) -> impl Iterator + '_ { + self.blocks + .first() + .into_iter() + .flatten() + .map(|idx| self.txs[idx.as_usize()].txid) + } + + /// Linearized chunk rate for a live tx by prefix. Always fresh + /// (recomputed each snapshot), package-aware (CPFP and ancestor + /// chains lift correctly), and equals `fee/vsize` for singletons. + /// Returns `None` if the tx isn't in this snapshot. pub fn chunk_rate_for(&self, prefix: &TxidPrefix) -> Option { let idx = self.idx_of(prefix)?; Some(self.txs[idx.as_usize()].chunk_rate) diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/stats.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/stats.rs index 954a30b1f..4821db98f 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/stats.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/stats.rs @@ -32,14 +32,18 @@ pub struct BlockStats { } impl BlockStats { - /// Block 0 (Core's actual selection): exact 0/10/25/50/75/90/100. - pub fn compute_core(block: &[TxIndex], txs: &[SnapTx]) -> Self { - Self::compute(block, txs, CORE_PERCENTILES) - } - - /// Blocks 1..N (projected): clipped 5/95 bounds to hide outliers. - pub fn compute_projected(block: &[TxIndex], txs: &[SnapTx]) -> Self { - Self::compute(block, txs, PROJECTED_PERCENTILES) + /// Stats for every projected block in `blocks`, in order. `blocks[0]` + /// uses Core's exact 0..100 percentiles; the rest use the clipped + /// 5..95 range to hide CPFP / stale-GBT outliers. + pub fn for_blocks(blocks: &[Vec], txs: &[SnapTx]) -> Vec { + blocks + .iter() + .enumerate() + .map(|(i, block)| { + let pct = if i == 0 { CORE_PERCENTILES } else { PROJECTED_PERCENTILES }; + Self::compute(block, txs, pct) + }) + .collect() } /// Vsize-weighted percentile distribution over `chunk_rate` - @@ -78,10 +82,6 @@ impl BlockStats { fee_range, } } - - pub fn median_fee_rate(&self) -> FeeRate { - self.fee_range[3] - } } impl From<&BlockStats> for MempoolBlock { diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/tx.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/tx.rs index 9008e2004..e3b4c7d23 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/tx.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/tx.rs @@ -3,10 +3,11 @@ use smallvec::SmallVec; use super::TxIndex; -/// Frozen per-tx view used by the snapshot. Holds the chunk rate -/// (Core's `fees.chunk` / `chunkweight` when available, else proxy) -/// plus resolved parent/child adjacency in `TxIndex` space, so -/// CPFP queries are a pure walk over `Snapshot.txs`. +/// Frozen per-tx view used by the snapshot. `chunk_rate` is the +/// linearized chunk feerate (local Single Fee Linearization, run fresh +/// every snapshot); singletons report `fee/vsize`. Parent/child +/// adjacency in `TxIndex` space, so CPFP queries are a pure walk over +/// `Snapshot.txs`. #[derive(Clone, Debug)] pub struct SnapTx { pub txid: Txid, diff --git a/crates/brk_mempool/src/stores/addr_tracker/mod.rs b/crates/brk_mempool/src/stores/addr_tracker/mod.rs index 7423c4bd1..c625615f3 100644 --- a/crates/brk_mempool/src/stores/addr_tracker/mod.rs +++ b/crates/brk_mempool/src/stores/addr_tracker/mod.rs @@ -15,7 +15,8 @@ use addr_entry::AddrEntry; pub struct AddrTracker(FxHashMap); impl AddrTracker { - pub fn add_tx(&mut self, tx: &Transaction, txid: &Txid) { + pub fn add_tx(&mut self, tx: &Transaction) { + let txid = &tx.txid; for txin in &tx.input { if let Some(prevout) = txin.prevout.as_ref() { self.add_input(txid, prevout); @@ -28,7 +29,8 @@ impl AddrTracker { } } - pub fn remove_tx(&mut self, tx: &Transaction, txid: &Txid) { + pub fn remove_tx(&mut self, tx: &Transaction) { + let txid = &tx.txid; for txin in &tx.input { if let Some(prevout) = txin.prevout.as_ref() { self.remove_input(txid, prevout); diff --git a/crates/brk_mempool/src/stores/mod.rs b/crates/brk_mempool/src/stores/mod.rs index 63db69cad..4836377f6 100644 --- a/crates/brk_mempool/src/stores/mod.rs +++ b/crates/brk_mempool/src/stores/mod.rs @@ -1,13 +1,14 @@ -//! Stateful in-memory holders. After Phase 3 they're plain owned -//! types (no internal locks) — `State` aggregates them under a -//! single `RwLock` in `crate::state`. +//! In-memory holders for live mempool state. Plain owned types with +//! no internal locks: `crate::state::State` aggregates them under a +//! single `RwLock` so the cycle steps and read-side accessors share +//! one lock-order discipline. -pub mod addr_tracker; +pub(crate) mod addr_tracker; pub(crate) mod outpoint_spends; -pub mod tx_graveyard; -pub mod tx_store; +pub(crate) mod tx_graveyard; +pub(crate) mod tx_store; -pub use addr_tracker::AddrTracker; +pub(crate) use addr_tracker::AddrTracker; pub(crate) use outpoint_spends::OutpointSpends; -pub use tx_graveyard::{TxGraveyard, TxTombstone}; -pub use tx_store::TxStore; +pub(crate) use tx_graveyard::{TxGraveyard, TxTombstone}; +pub(crate) use tx_store::TxStore; diff --git a/crates/brk_mempool/src/stores/outpoint_spends.rs b/crates/brk_mempool/src/stores/outpoint_spends.rs index c87dd9fca..3707989df 100644 --- a/crates/brk_mempool/src/stores/outpoint_spends.rs +++ b/crates/brk_mempool/src/stores/outpoint_spends.rs @@ -13,11 +13,7 @@ pub struct OutpointSpends(FxHashMap); impl OutpointSpends { pub fn insert_spends(&mut self, tx: &Transaction, spender: TxidPrefix) { - for input in &tx.input { - if input.is_coinbase { - continue; - } - let key = OutpointPrefix::new(TxidPrefix::from(&input.txid), input.vout); + for key in spent_outpoints(tx) { self.0.insert(key, spender); } } @@ -25,11 +21,7 @@ impl OutpointSpends { /// Only removes entries whose stored prefix still matches `spender`, /// so an outpoint already re-claimed by a later spender is left alone. pub fn remove_spends(&mut self, tx: &Transaction, spender: TxidPrefix) { - for input in &tx.input { - if input.is_coinbase { - continue; - } - let key = OutpointPrefix::new(TxidPrefix::from(&input.txid), input.vout); + for key in spent_outpoints(tx) { if self.0.get(&key) == Some(&spender) { self.0.remove(&key); } @@ -41,3 +33,10 @@ impl OutpointSpends { self.0.get(key).copied() } } + +fn spent_outpoints(tx: &Transaction) -> impl Iterator + '_ { + tx.input + .iter() + .filter(|i| !i.is_coinbase) + .map(|i| OutpointPrefix::new(TxidPrefix::from(&i.txid), i.vout)) +} diff --git a/crates/brk_mempool/src/stores/tx_graveyard/mod.rs b/crates/brk_mempool/src/stores/tx_graveyard/mod.rs index 61eb732c3..db420494c 100644 --- a/crates/brk_mempool/src/stores/tx_graveyard/mod.rs +++ b/crates/brk_mempool/src/stores/tx_graveyard/mod.rs @@ -3,12 +3,12 @@ use std::{ time::{Duration, Instant}, }; -use brk_types::{Transaction, Txid}; +use brk_types::{FeeRate, Transaction, Txid}; use rustc_hash::FxHashMap; mod tombstone; -pub use tombstone::TxTombstone; +pub(crate) use tombstone::TxTombstone; use crate::{TxEntry, TxRemoval}; @@ -23,10 +23,6 @@ pub struct TxGraveyard { } impl TxGraveyard { - pub fn contains(&self, txid: &Txid) -> bool { - self.tombstones.contains_key(txid) - } - pub fn tombstones_len(&self) -> usize { self.tombstones.len() } @@ -39,6 +35,25 @@ impl TxGraveyard { self.tombstones.get(txid) } + /// Tombstone iff the tx vanished from the pool (mined, expired, or + /// dropped). `Replaced` tombstones return `None` because the tx + /// will not confirm. + pub fn get_vanished(&self, txid: &Txid) -> Option<&TxTombstone> { + let tomb = self.tombstones.get(txid)?; + matches!(tomb.removal, TxRemoval::Vanished).then_some(tomb) + } + + /// Walk forward through `Replaced { by }` to the terminal replacer. + /// Returns `txid` itself if it isn't replaced (live or `Vanished`). + pub fn replacement_root_of(&self, mut txid: Txid) -> Txid { + while let Some(TxRemoval::Replaced { by }) = + self.tombstones.get(&txid).map(|t| &t.removal) + { + txid = *by; + } + txid + } + /// Tombstones marked as `Replaced { by: replacer }`. Used to walk /// backward through RBF history: given a tx that's still live (or /// in the graveyard), find every tx it displaced. @@ -61,18 +76,33 @@ impl TxGraveyard { pub fn replaced_iter_recent_first(&self) -> impl Iterator { self.order.iter().rev().filter_map(|(t, txid)| { let ts = self.tombstones.get(txid)?; - if ts.removed_at() != *t { + if ts.removed_at != *t { return None; } Some((txid, ts.replaced_by()?)) }) } - pub fn bury(&mut self, txid: Txid, tx: Transaction, entry: TxEntry, removal: TxRemoval) { - let now = Instant::now(); - self.tombstones - .insert(txid, TxTombstone::new(tx, entry, removal, now)); - self.order.push_back((now, txid)); + pub fn bury( + &mut self, + tx: Transaction, + entry: TxEntry, + chunk_rate: FeeRate, + removal: TxRemoval, + ) { + let txid = entry.txid; + let removed_at = Instant::now(); + self.tombstones.insert( + txid, + TxTombstone { + tx, + entry, + chunk_rate, + removal, + removed_at, + }, + ); + self.order.push_back((removed_at, txid)); } /// Remove and return the tombstone, e.g. when the tx comes back to life. @@ -92,7 +122,7 @@ impl TxGraveyard { } let (_, txid) = self.order.pop_front().unwrap(); if let Some(ts) = self.tombstones.get(&txid) - && ts.removed_at() == t + && ts.removed_at == t { self.tombstones.remove(&txid); } diff --git a/crates/brk_mempool/src/stores/tx_graveyard/tombstone.rs b/crates/brk_mempool/src/stores/tx_graveyard/tombstone.rs index 13959185e..fefc26921 100644 --- a/crates/brk_mempool/src/stores/tx_graveyard/tombstone.rs +++ b/crates/brk_mempool/src/stores/tx_graveyard/tombstone.rs @@ -1,41 +1,23 @@ use std::time::Instant; -use brk_types::{Transaction, Txid}; +use brk_types::{FeeRate, Transaction, Txid}; use crate::{TxEntry, TxRemoval}; /// A buried mempool tx, retained for reappearance detection and -/// post-mine analytics. +/// post-mine analytics. `chunk_rate` is the linearized chunk feerate at +/// burial time - same value `live_effective_fee_rate` reported while +/// the tx was alive, so an evicted RBF predecessor reports the +/// package-effective rate, not a misleading isolated `fee/vsize`. pub struct TxTombstone { pub tx: Transaction, pub entry: TxEntry, - removal: TxRemoval, - removed_at: Instant, + pub chunk_rate: FeeRate, + pub removal: TxRemoval, + pub removed_at: Instant, } impl TxTombstone { - pub(crate) fn new( - tx: Transaction, - entry: TxEntry, - removal: TxRemoval, - removed_at: Instant, - ) -> Self { - Self { - tx, - entry, - removal, - removed_at, - } - } - - pub fn reason(&self) -> &TxRemoval { - &self.removal - } - - pub(crate) fn removed_at(&self) -> Instant { - self.removed_at - } - pub(crate) fn replaced_by(&self) -> Option<&Txid> { match &self.removal { TxRemoval::Replaced { by } => Some(by), diff --git a/crates/brk_mempool/src/stores/tx_store.rs b/crates/brk_mempool/src/stores/tx_store.rs index 5a82ac164..32b70a84c 100644 --- a/crates/brk_mempool/src/stores/tx_store.rs +++ b/crates/brk_mempool/src/stores/tx_store.rs @@ -34,10 +34,6 @@ impl TxStore { self.records.len() } - pub fn is_empty(&self) -> bool { - self.records.is_empty() - } - pub fn get(&self, txid: &Txid) -> Option<&Transaction> { self.records.get(&TxidPrefix::from(txid)).map(|r| &r.tx) } diff --git a/crates/brk_query/src/impl/cpfp.rs b/crates/brk_query/src/impl/cpfp.rs index 6e030b1dd..b5496ea22 100644 --- a/crates/brk_query/src/impl/cpfp.rs +++ b/crates/brk_query/src/impl/cpfp.rs @@ -10,10 +10,10 @@ //! carries the same chunk-rate semantics the live mempool produces. use brk_error::{Error, OptionData, Result}; -use brk_mempool::{ChunkInput, linearize}; use brk_types::{ - CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, Height, Sats, - TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight, + CPFP_CHAIN_LIMIT, ChunkInput, CpfpCluster, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, + CpfpInfo, FeeRate, Height, Sats, TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight, + find_seed_chunk, linearize, }; use rustc_hash::{FxBuildHasher, FxHashMap}; use smallvec::SmallVec; @@ -21,10 +21,6 @@ use vecdb::{ReadableVec, VecIndex}; use crate::Query; -/// Cap matches Bitcoin Core's default mempool ancestor/descendant -/// chain limits and mempool.space's truncation. -const MAX: usize = 25; - struct WalkResult { /// Cluster members in `[ancestors..., seed, descendants...]` order, /// each paired with its in-cluster parent edges resolved to the @@ -58,13 +54,12 @@ impl Query { /// Effective fee rate for `txid` using the same chunk-rate semantics /// across paths: /// - /// - Live mempool: snapshot's per-tx `chunk_rate` (Core's - /// `fees.chunk` / `chunkweight`, or proxy fallback). If the tx is - /// in the pool but not in the latest snapshot (e.g. just added), - /// falls back to the entry's simple `fee/vsize`. + /// - Live mempool: snapshot's per-tx linearized `chunk_rate`. If + /// the tx is in the pool but not in the latest snapshot (e.g. + /// just added), falls back to the entry's simple `fee/vsize`. /// - Confirmed: precomputed `effective_fee_rate.tx_index`. - /// - Graveyard-only RBF predecessor: simple `fee/vsize` snapshotted - /// at burial. + /// - Graveyard-only RBF predecessor: linearized chunk rate + /// captured at burial. /// /// Returns `Error::UnknownTxid` for txids not seen in any of those. pub fn effective_fee_rate(&self, txid: &Txid) -> Result { @@ -158,7 +153,7 @@ impl Query { /// BFS the seed's same-block ancestors (via `outpoint`) and /// descendants (via `spent.txin_index` -> `spending_tx`), capped - /// at `MAX` each side to match Core/mempool.space. Returns members + /// at `CPFP_CHAIN_LIMIT` each side to match Core/mempool.space. Returns members /// laid out as `[ancestors..., seed, descendants...]` so the seed's /// local index is `ancestors.len()`. fn walk_same_block_cluster(&self, seed: TxIndex, height: Height) -> Result { @@ -202,7 +197,7 @@ impl Query { }; let mut visited: FxHashMap = - FxHashMap::with_capacity_and_hasher(2 * MAX + 1, FxBuildHasher); + FxHashMap::with_capacity_and_hasher(2 * CPFP_CHAIN_LIMIT + 1, FxBuildHasher); visited.insert(seed, ()); // Ancestor BFS: each push records (tx_index, raw parent tx_indices) @@ -212,7 +207,7 @@ impl Query { let mut stack: Vec> = vec![seed_inputs.clone()]; 'a: while let Some(parents) = stack.pop() { for parent in parents { - if ancestors.len() >= MAX { + if ancestors.len() >= CPFP_CHAIN_LIMIT { break 'a; } if visited.insert(parent, ()).is_some() || !same_block(parent) { @@ -249,7 +244,7 @@ impl Query { } descendants.push((child, walk_inputs(child))); stack.push(child); - if descendants.len() >= MAX { + if descendants.len() >= CPFP_CHAIN_LIMIT { break 'd; } } @@ -336,12 +331,7 @@ fn build_cpfp_info( }) .collect(); let chunks = linearize(&inputs); - let (chunk_index, seed_rate) = chunks - .iter() - .enumerate() - .find(|(_, ch)| ch.txs.contains(&seed_local)) - .map(|(i, ch)| (i as u32, ch.feerate)) - .unwrap_or((0, seed.rate)); + let (chunk_index, seed_rate) = find_seed_chunk(&chunks, seed_local, seed.rate); let cluster_txs: Vec = members .iter() .map(|m| CpfpClusterTx { diff --git a/crates/brk_rpc/src/methods.rs b/crates/brk_rpc/src/methods.rs index daa6fd422..55d6c2103 100644 --- a/crates/brk_rpc/src/methods.rs +++ b/crates/brk_rpc/src/methods.rs @@ -26,45 +26,35 @@ const RPC_NOT_FOUND: i32 = -5; use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, RawTx, TxOutInfo}; -/// Per-batch request count for `get_block_hashes_range`. Sized so the +/// Per-batch request count for `get_block_hashes_range`, +/// `fetch_mempool_entries`, and `fetch_raw_transactions`. Sized so the /// JSON request body stays well under a megabyte and bitcoind doesn't /// spend too long on a single batch before yielding results. const BATCH_CHUNK: usize = 2000; /// Live mempool state fetched in one batched bitcoind round-trip: -/// `getrawmempool verbose` + `getblocktemplate` + `getmempoolinfo`. -/// `gbt` is validated to be a subset of `entries` before construction; -/// callers that want strict consistency should rely on this fact. +/// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`. +/// `gbt` is validated to be a subset of `live_txids` before +/// construction; on mismatch the cycle is skipped (`Ok(None)`) so we +/// never publish a block 0 missing txids Core would actually mine. pub struct MempoolState { - pub entries: Vec, + pub live_txids: Vec, pub gbt: Vec, pub min_fee: FeeRate, } #[derive(Deserialize)] -struct VerboseEntryRaw { +struct MempoolEntryRaw { vsize: VSize, weight: Weight, time: Timestamp, - #[serde(rename = "ancestorcount")] - ancestor_count: u64, - #[serde(rename = "ancestorsize")] - ancestor_size: VSize, - #[serde(rename = "descendantsize")] - descendant_size: VSize, - fees: VerboseFeesRaw, + fees: MempoolEntryFeesRaw, depends: Vec, - #[serde(rename = "chunkweight", default)] - chunk_weight: Option, } #[derive(Deserialize)] -struct VerboseFeesRaw { +struct MempoolEntryFeesRaw { base: Bitcoin, - ancestor: Bitcoin, - descendant: Bitcoin, - #[serde(default)] - chunk: Option, } #[derive(Deserialize)] @@ -78,31 +68,20 @@ struct GbtTxRaw { fee: u64, } -fn build_verbose(raw: FxHashMap) -> Result> { - raw.into_iter() - .map(|(txid_str, e)| { - let depends = e - .depends - .iter() - .map(|s| Client::parse_txid(s, "depends txid")) - .collect::>>()?; - Ok(MempoolEntryInfo { - txid: Client::parse_txid(&txid_str, "mempool txid")?, - vsize: e.vsize, - weight: e.weight, - fee: Sats::from(e.fees.base), - first_seen: e.time, - ancestor_count: e.ancestor_count, - ancestor_size: e.ancestor_size, - ancestor_fee: Sats::from(e.fees.ancestor), - descendant_size: e.descendant_size, - descendant_fee: Sats::from(e.fees.descendant), - chunk_fee: e.fees.chunk.map(Sats::from), - chunk_weight: e.chunk_weight, - depends, - }) - }) - .collect() +fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result { + let depends = e + .depends + .iter() + .map(|s| Client::parse_txid(s, "depends txid")) + .collect::>>()?; + Ok(MempoolEntryInfo { + txid, + vsize: e.vsize, + weight: e.weight, + fee: Sats::from(e.fees.base), + first_seen: e.time, + depends, + }) } fn build_gbt(raw: GbtResponseRaw) -> Vec { @@ -371,55 +350,86 @@ impl Client { Ok(Txid::from(txid)) } - /// Verbose mempool listing + Core's projected next block + live + /// Core's projected next block + live mempool txid set + /// `mempoolminfee`, fetched in a single bitcoind round-trip. - /// `getblocktemplate` runs first so that any tx arriving between - /// the two intra-batch calls lands in the verbose listing only, - /// preserving GBT ⊆ verbose for the common race. Validates that - /// every GBT txid is present in the verbose listing and returns - /// `Ok(None)` on mismatch so the caller can skip the cycle: - /// republishing block 0 with missing txids would diverge from - /// Core's exact selection. Other failures bubble up as `Err`. + /// `getblocktemplate` runs first so any tx arriving between the + /// intra-batch calls lands in the txid listing only, preserving + /// GBT ⊆ txids for the common race. Validates that every GBT txid + /// is present in the txid listing and returns `Ok(None)` on + /// mismatch so the caller can skip the cycle: republishing block 0 + /// with missing txids would diverge from Core's exact selection. + /// Other failures bubble up as `Err`. pub fn fetch_mempool_state(&self) -> Result> { let requests: [(&str, Vec); 3] = [ ( "getblocktemplate", vec![serde_json::json!({ "rules": ["segwit"] })], ), - ("getrawmempool", vec![Value::Bool(true)]), + ("getrawmempool", vec![Value::Bool(false)]), ("getmempoolinfo", vec![]), ]; let mut out = self.0.call_mixed_batch(&requests)?.into_iter(); let gbt_raw = out.next().ok_or(Error::Internal("missing gbt"))??; - let verbose_raw = out.next().ok_or(Error::Internal("missing verbose"))??; + let txids_raw = out.next().ok_or(Error::Internal("missing rawmempool"))??; let info_raw = out.next().ok_or(Error::Internal("missing mempoolinfo"))??; - let verbose: FxHashMap = serde_json::from_str(verbose_raw.get())?; - let entries = build_verbose(verbose)?; + let txid_strs: Vec = serde_json::from_str(txids_raw.get())?; + let live_txids: Vec = txid_strs + .iter() + .map(|s| Self::parse_txid(s, "mempool txid")) + .collect::>>()?; let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?); let min_fee = build_min_fee(serde_json::from_str(info_raw.get())?); - #[cfg(debug_assertions)] - { - let entry_set: rustc_hash::FxHashSet = entries.iter().map(|e| e.txid).collect(); - let missing = gbt.iter().filter(|t| !entry_set.contains(&t.txid)).count(); - if missing > 0 { - tracing::warn!( - missing, - gbt_total = gbt.len(), - "getblocktemplate has {missing} txids not in verbose mempool; skipping cycle" - ); - return Ok(None); - } + let live_set: rustc_hash::FxHashSet = live_txids.iter().copied().collect(); + let missing = gbt.iter().filter(|t| !live_set.contains(&t.txid)).count(); + if missing > 0 { + #[cfg(debug_assertions)] + tracing::warn!( + missing, + gbt_total = gbt.len(), + "getblocktemplate has {missing} txids not in mempool listing; skipping cycle" + ); + return Ok(None); } Ok(Some(MempoolState { - entries, + live_txids, gbt, min_fee, })) } + /// Batched `getmempoolentry` for the given txids. Returns + /// `MempoolEntryInfo` per successful lookup. Per-item -5 (NOT_FOUND + /// — tx evicted/replaced between the txid listing and this call) + /// drops silently; transport-level failures still propagate. + /// Chunked at `BATCH_CHUNK` requests per round-trip. + pub fn fetch_mempool_entries(&self, txids: &[Txid]) -> Result> { + let mut out: Vec = Vec::with_capacity(txids.len()); + + for chunk in txids.chunks(BATCH_CHUNK) { + let args = chunk.iter().map(|t| { + let bt: &bitcoin::Txid = t.into(); + vec![serde_json::to_value(bt).unwrap_or(Value::Null)] + }); + let results: Vec> = + self.0.call_batch_per_item("getmempoolentry", args)?; + + for (txid, res) in chunk.iter().zip(results) { + match res { + Ok(raw) => out.push(build_entry(*txid, raw)?), + Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {} + Err(e) => { + debug!(txid = %txid, error = %e, "getmempoolentry batch: item failed") + } + } + } + } + + Ok(out) + } + pub fn get_closest_valid_height(&self, hash: BlockHash) -> Result<(Height, BlockHash)> { debug!("Get closest valid height..."); diff --git a/crates/brk_types/src/cpfp/chunk_input.rs b/crates/brk_types/src/cpfp/chunk_input.rs new file mode 100644 index 000000000..5145cb854 --- /dev/null +++ b/crates/brk_types/src/cpfp/chunk_input.rs @@ -0,0 +1,9 @@ +use crate::{CpfpClusterTxIndex, Sats, VSize}; + +/// One cluster member's input to Single Fee Linearization: its +/// `(fee, vsize)` and parent edges as local indices into the same array. +pub struct ChunkInput<'a> { + pub fee: Sats, + pub vsize: VSize, + pub parents: &'a [CpfpClusterTxIndex], +} diff --git a/crates/brk_types/src/cpfp/cluster_chunk.rs b/crates/brk_types/src/cpfp/cluster_chunk.rs index 9558894e8..33b6b1d72 100644 --- a/crates/brk_types/src/cpfp/cluster_chunk.rs +++ b/crates/brk_types/src/cpfp/cluster_chunk.rs @@ -14,3 +14,20 @@ pub struct CpfpClusterChunk { pub txs: Vec, pub feerate: FeeRate, } + +/// Find the chunk containing `seed_local` and return `(chunk_index, +/// feerate)`. Falls back to `(0, fallback)` when the seed isn't in any +/// chunk - shouldn't happen for a well-formed linearization but keeps +/// callers' wire shape valid. +pub fn find_seed_chunk( + chunks: &[CpfpClusterChunk], + seed_local: CpfpClusterTxIndex, + fallback: FeeRate, +) -> (u32, FeeRate) { + chunks + .iter() + .enumerate() + .find(|(_, ch)| ch.txs.contains(&seed_local)) + .map(|(i, ch)| (i as u32, ch.feerate)) + .unwrap_or((0, fallback)) +} diff --git a/crates/brk_mempool/src/chunking.rs b/crates/brk_types/src/cpfp/linearize.rs similarity index 75% rename from crates/brk_mempool/src/chunking.rs rename to crates/brk_types/src/cpfp/linearize.rs index be1a9e612..11f32d9dc 100644 --- a/crates/brk_mempool/src/chunking.rs +++ b/crates/brk_types/src/cpfp/linearize.rs @@ -1,34 +1,21 @@ -//! Cluster mempool linearization (Core 31's "Single Fee Linearization"). +//! Single Fee Linearization (Bitcoin Core 31's cluster mempool SFL). //! -//! Given a topologically ordered cluster (parents before children) with -//! per-tx `(fee, vsize)` and parent edges as local indices, partition the -//! cluster into chunks ordered by descending feerate, where each chunk is -//! the highest-rate ancestor-closed set of remaining txs. +//! Partition a topo-ordered cluster into chunks ordered by descending +//! feerate, where each chunk is the highest-rate ancestor-closed set +//! of remaining txs. Spec-equivalent to Core's `fees.chunk` / +//! `chunkweight`; works on any Core version. //! -//! The "lift" merging this implements is what makes CPFP visible at the -//! cluster level: a child whose rate exceeds its parent's rate gets folded -//! into a chunk with the parent, and the chunk's rate is the combined -//! `(parent_fee + child_fee) / (parent_vsize + child_vsize)`. Cascades -//! upward through any further parents until rates are non-increasing. +//! The "lift" this implements is what makes CPFP visible at the +//! cluster level: a child whose rate exceeds its parent's gets folded +//! into a chunk with the parent at the combined `(parent_fee + +//! child_fee) / (parent_vsize + child_vsize)`. Cascades upward through +//! any further parents until rates are non-increasing. //! -//! This is the proxy-fallback case; under Core 31+ each tx's `fees.chunk` -//! / `chunkweight` already encodes the chunked rate, so all members of a -//! chunk would share that rate. Computing locally from `(fee, vsize)` -//! gives the same answer either way and works on older Core too. -//! -//! Complexity is `O(n^2)` per linearization (n bounded by cluster cap), -//! matching mempool.space's frontend implementation. +//! `O(n^2)` per linearization, `n` bounded by the cluster cap. -use brk_types::{CpfpClusterChunk, CpfpClusterTxIndex, FeeRate, Sats, VSize}; use rustc_hash::{FxBuildHasher, FxHashSet}; -/// One cluster member: its `(fee, vsize)` and parent edges as -/// local indices into the same array. -pub struct ChunkInput<'a> { - pub fee: Sats, - pub vsize: VSize, - pub parents: &'a [CpfpClusterTxIndex], -} +use crate::{ChunkInput, CpfpClusterChunk, CpfpClusterTxIndex, FeeRate, Sats, VSize}; /// Linearize `items` into chunks. `items` must be in topological order /// (parents before children); `parents` indices must point earlier in diff --git a/crates/brk_types/src/cpfp/mod.rs b/crates/brk_types/src/cpfp/mod.rs index d5da1286a..8ea08b136 100644 --- a/crates/brk_types/src/cpfp/mod.rs +++ b/crates/brk_types/src/cpfp/mod.rs @@ -1,13 +1,21 @@ +mod chunk_input; mod cluster; mod cluster_chunk; mod cluster_tx; mod cluster_tx_index; mod entry; mod info; +mod linearize; +pub use chunk_input::ChunkInput; pub use cluster::CpfpCluster; -pub use cluster_chunk::CpfpClusterChunk; +pub use cluster_chunk::{CpfpClusterChunk, find_seed_chunk}; pub use cluster_tx::CpfpClusterTx; pub use cluster_tx_index::CpfpClusterTxIndex; pub use entry::CpfpEntry; pub use info::CpfpInfo; +pub use linearize::linearize; + +/// Bitcoin Core's default mempool ancestor/descendant chain cap, also +/// used by mempool.space-style truncation in CPFP walks. +pub const CPFP_CHAIN_LIMIT: usize = 25; diff --git a/crates/brk_types/src/mempool_block.rs b/crates/brk_types/src/mempool_block.rs index b0943ef68..f0dc903df 100644 --- a/crates/brk_types/src/mempool_block.rs +++ b/crates/brk_types/src/mempool_block.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::{FeeRate, Sats, VSize}; /// Block info in a mempool.space like format for fee estimation. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct MempoolBlock { /// Total serialized block size in bytes (witness + non-witness). diff --git a/crates/brk_types/src/mempool_entry_info.rs b/crates/brk_types/src/mempool_entry_info.rs index 57c9513c5..eb6f9de67 100644 --- a/crates/brk_types/src/mempool_entry_info.rs +++ b/crates/brk_types/src/mempool_entry_info.rs @@ -1,6 +1,6 @@ -use crate::{FeeRate, Sats, Timestamp, Txid, VSize, Weight}; +use crate::{Sats, Timestamp, Txid, VSize, Weight}; -/// Mempool entry info from Bitcoin Core's `getrawmempool true`. +/// Mempool entry info from Bitcoin Core's `getmempoolentry`. #[derive(Debug, Clone)] pub struct MempoolEntryInfo { pub txid: Txid, @@ -8,32 +8,6 @@ pub struct MempoolEntryInfo { pub weight: Weight, pub fee: Sats, pub first_seen: Timestamp, - pub ancestor_count: u64, - pub ancestor_size: VSize, - pub ancestor_fee: Sats, - pub descendant_size: VSize, - pub descendant_fee: Sats, - /// Total fee of the cluster mempool chunk this tx belongs to. - /// Present from Bitcoin Core 31+ (cluster mempool); absent on - /// older Core, in which case rate-callers fall back to - /// `max(ancestor_rate, descendant_pkg_rate)`. - pub chunk_fee: Option, - pub chunk_weight: Option, /// Parent txids in the mempool. pub depends: Vec, } - -impl MempoolEntryInfo { - /// Effective per-vbyte rate Core would mine this tx at. Uses the - /// Core-31 `fees.chunk` / `chunkweight` chunk fields when present; - /// otherwise falls back to `max(ancestor_rate, descendant_pkg_rate)`, - /// which bounds the predictive error in deep clusters. - pub fn chunk_rate(&self) -> FeeRate { - if let (Some(chunk_fee), Some(chunk_weight)) = (self.chunk_fee, self.chunk_weight) { - return FeeRate::from((chunk_fee, VSize::from(chunk_weight))); - } - let anc = FeeRate::from((self.ancestor_fee, self.ancestor_size)); - let desc = FeeRate::from((self.descendant_fee, self.descendant_size)); - anc.max(desc) - } -}