mempool: use bitcoin projected block, rest is a very simple prediction

This commit is contained in:
nym21
2026-05-07 18:30:26 +02:00
parent 1b39d21bbe
commit f4910efd7d
69 changed files with 4340 additions and 5906 deletions

View File

@@ -2,16 +2,18 @@
//! `brk_mempool`) and the confirmed-tx path built here from indexer
//! and computer vecs.
//!
//! Confirmed clusters are built on demand by walking the same-block
//! Confirmed clusters are materialized on demand by walking same-block
//! parent/child edges in `TxIndex` space (no `Transaction`
//! reconstruction, no `txid tx_index` lookup), then handing the
//! resulting `brk_mempool::cluster::Cluster` to `Cluster::to_cpfp_info`
//! — the same wire converter the mempool path uses, so both produce
//! identical `CpfpInfo` shapes.
//! reconstruction, no `txid -> tx_index` lookup), then assembling the
//! wire shape directly. The seed's effective fee rate and the per-chunk
//! grouping both read precomputed `effective_fee_rate.tx_index`, which
//! carries the same chunk-rate semantics the live mempool produces.
use brk_error::{Error, OptionData, Result};
use brk_mempool::cluster::{Cluster, ClusterNode, LocalIdx};
use brk_types::{CpfpInfo, FeeRate, Height, TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight};
use brk_types::{
CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate,
Height, Sats, TxInIndex, TxIndex, Txid, TxidPrefix, VSize, Weight,
};
use rustc_hash::{FxBuildHasher, FxHashMap};
use smallvec::SmallVec;
use vecdb::{ReadableVec, VecIndex};
@@ -23,15 +25,20 @@ use crate::Query;
const MAX: usize = 25;
struct WalkResult {
/// Cluster members in build order (`[seed, ancestors..., descendants...]`),
/// each paired with its in-cluster parent edges already resolved to
/// `LocalIdx`. Vec position equals the node's `LocalIdx`.
nodes: Vec<(TxIndex, SmallVec<[LocalIdx; 2]>)>,
/// Pre-permutation `LocalIdx` of the seed. Equals `ancestor_count`
/// because all of seed's in-cluster ancestors topo-sort before it
/// and only ancestors do, so after `Cluster::new` permutes nodes
/// into topological order seed lands at this exact position.
seed_local: LocalIdx,
/// Cluster members in `[ancestors..., seed, descendants...]` order,
/// each paired with its in-cluster parent edges resolved to the
/// member's local index. The seed's local index is `ancestors.len()`.
members: Vec<(TxIndex, SmallVec<[CpfpClusterTxIndex; 2]>)>,
seed_local: CpfpClusterTxIndex,
}
/// One cluster member with every per-tx value `build_cpfp_info` needs,
/// resolved from indexer/computer vecs by `resolve_members`.
struct Member {
/// Transaction id, read from the indexer's `transactions.txid` vec.
txid: Txid,
/// Fee, read from the computer's `fees.fee.tx_index` vec.
fee: Sats,
/// Weight built via `Weight::from_sizes(base_size, total_size)`.
weight: Weight,
/// Virtual size derived from `weight` (`VSize::from(weight)`).
vsize: VSize,
/// Precomputed `effective_fee_rate.tx_index` value; reported as the
/// seed's effective fee rate and used as the chunk grouping key.
rate: FeeRate,
/// In-cluster parent edges, expressed as local indices into the
/// member list.
parents: SmallVec<[CpfpClusterTxIndex; 2]>,
}
impl Query {
@@ -47,14 +54,14 @@ impl Query {
self.confirmed_cpfp(txid)
}
/// Effective fee rate for `txid` using the same SFL chunk-rate
/// semantics across paths:
/// Effective fee rate for `txid` using the same chunk-rate semantics
/// across paths:
///
/// - Live mempool: snapshot `cluster_of` lookup → seed's chunk rate.
/// If the tx is in the pool but not in the latest snapshot (e.g.
/// just added), falls back to the entry's simple `fee/vsize`.
/// - Confirmed: precomputed `effective_fee_rate.tx_index` (the same
/// SFL chunk rate, computed at index time).
/// - Live mempool: snapshot's per-tx `chunk_rate` (Core's
/// `fees.chunk` / `chunkweight`, or proxy fallback). If the tx is
/// in the pool but not in the latest snapshot (e.g. just added),
/// falls back to the entry's simple `fee/vsize`.
/// - Confirmed: precomputed `effective_fee_rate.tx_index`.
/// - Graveyard-only RBF predecessor: simple `fee/vsize` snapshotted
/// at burial.
///
@@ -90,13 +97,17 @@ impl Query {
}
/// CPFP cluster for a confirmed tx: the connected component of
/// same-block parent/child edges, walked on demand. SFL runs on
/// the result so `effectiveFeePerVsize` matches the live path's
/// chunk-rate semantics.
/// same-block parent/child edges, walked on demand. Per-tx
/// `effective_fee_rate.tx_index` provides each member's chunk rate.
fn confirmed_cpfp(&self, txid: &Txid) -> Result<CpfpInfo> {
let tx_index = self.resolve_tx_index(txid)?;
let height = self.confirmed_status_height(tx_index)?;
let (cluster, seed_local) = self.build_confirmed_cluster(tx_index, height)?;
let WalkResult {
members,
seed_local,
} = self.walk_same_block_cluster(tx_index, height)?;
let resolved = self.resolve_members(&members)?;
let sigops = self
.indexer()
.vecs
@@ -104,20 +115,52 @@ impl Query {
.total_sigop_cost
.collect_one(tx_index)
.data()?;
Ok(cluster.to_cpfp_info(seed_local, sigops))
Ok(build_cpfp_info(&resolved, seed_local, sigops))
}
/// Walk the seed's same-block parent/child edges, materialize each
/// member's `(txid, weight, fee)` from indexer/computer cursors,
/// and build a `Cluster<TxIndex>`. The seed's `LocalIdx` comes
/// straight from the walk (`ancestor_count`), since `Cluster::new`
/// preserves the "ancestors before seed before descendants" ordering
/// that defines that index.
fn build_confirmed_cluster(
fn resolve_members(
&self,
seed: TxIndex,
height: Height,
) -> Result<(Cluster<TxIndex>, LocalIdx)> {
members: &[(TxIndex, SmallVec<[CpfpClusterTxIndex; 2]>)],
) -> Result<Vec<Member>> {
let indexer = self.indexer();
let computer = self.computer();
let mut base_size = indexer.vecs.transactions.base_size.cursor();
let mut total_size = indexer.vecs.transactions.total_size.cursor();
let mut fee_cursor = computer.transactions.fees.fee.tx_index.cursor();
let mut rate_cursor = computer
.transactions
.fees
.effective_fee_rate
.tx_index
.cursor();
let txid_reader = indexer.vecs.transactions.txid.reader();
members
.iter()
.map(|(tx_index, parents)| {
let i = tx_index.to_usize();
let weight =
Weight::from_sizes(*base_size.get(i).data()?, *total_size.get(i).data()?);
let vsize = VSize::from(weight);
Ok(Member {
txid: txid_reader.get(i),
fee: fee_cursor.get(i).data()?,
weight,
vsize,
rate: rate_cursor.get(i).data()?,
parents: parents.clone(),
})
})
.collect()
}
/// BFS the seed's same-block ancestors (via `outpoint`) and
/// descendants (via `spent.txin_index` -> `spending_tx`), capped
/// at `MAX` each side to match Core/mempool.space. Returns members
/// laid out as `[ancestors..., seed, descendants...]` so the seed's
/// local index is `ancestors.len()`.
fn walk_same_block_cluster(&self, seed: TxIndex, height: Height) -> Result<WalkResult> {
let indexer = self.indexer();
let computer = self.computer();
let safe = self.safe_lengths();
@@ -131,46 +174,6 @@ impl Query {
};
let same_block = |idx: TxIndex| idx >= block_first && idx < block_end;
let WalkResult { nodes, seed_local } = self.walk_same_block_edges(seed, same_block);
let mut base_size = indexer.vecs.transactions.base_size.cursor();
let mut total_size = indexer.vecs.transactions.total_size.cursor();
let mut fee_cursor = computer.transactions.fees.fee.tx_index.cursor();
let txid_reader = indexer.vecs.transactions.txid.reader();
let cluster_nodes: Vec<ClusterNode<TxIndex>> = nodes
.into_iter()
.map(|(tx_index, parents)| {
let i = tx_index.to_usize();
let weight =
Weight::from_sizes(*base_size.get(i).data()?, *total_size.get(i).data()?);
Ok(ClusterNode {
id: tx_index,
txid: txid_reader.get(i),
fee: fee_cursor.get(i).data()?,
vsize: VSize::from(weight),
weight,
parents,
})
})
.collect::<Result<_>>()?;
Ok((Cluster::new(cluster_nodes), seed_local))
}
/// BFS the seed's same-block ancestors (via `outpoint`) and
/// descendants (via `spent.txin_index` → `spending_tx`), capped
/// at `MAX` each side to match Core/mempool.space. Each node is
/// pushed in build order with its full parent-outpoint list, then
/// at end of walk those lists are filtered against the membership
/// map to keep only in-cluster parents (resolved to `LocalIdx`).
fn walk_same_block_edges(
&self,
seed: TxIndex,
same_block: impl Fn(TxIndex) -> bool,
) -> WalkResult {
let indexer = self.indexer();
let computer = self.computer();
let mut first_txin = indexer.vecs.transactions.first_txin_index.cursor();
let mut first_txout = indexer.vecs.transactions.first_txout_index.cursor();
let mut outpoint = indexer.vecs.inputs.outpoint.cursor();
@@ -197,41 +200,32 @@ impl Query {
out
};
let mut raw: Vec<(TxIndex, SmallVec<[TxIndex; 2]>)> = Vec::with_capacity(2 * MAX + 1);
let mut local_of: FxHashMap<TxIndex, LocalIdx> =
let mut visited: FxHashMap<TxIndex, ()> =
FxHashMap::with_capacity_and_hasher(2 * MAX + 1, FxBuildHasher);
raw.push((seed, walk_inputs(seed)));
local_of.insert(seed, LocalIdx::ZERO);
visited.insert(seed, ());
// Ancestor BFS. Stack holds indices into `raw`; each pop reads
// that node's already-recorded parents and explores any same-block
// ones we haven't visited yet. `walk_inputs` runs at push time so
// parents are ready for the post-walk filter.
let mut stack: Vec<usize> = vec![0];
let mut ancestor_count: usize = 0;
'a: while let Some(idx) = stack.pop() {
let parents = raw[idx].1.clone();
// Ancestor BFS: each push records (tx_index, raw parent tx_indices)
// so we can filter against final cluster membership at the end.
let seed_inputs = walk_inputs(seed);
let mut ancestors: Vec<(TxIndex, SmallVec<[TxIndex; 2]>)> = Vec::new();
let mut stack: Vec<SmallVec<[TxIndex; 2]>> = vec![seed_inputs.clone()];
'a: while let Some(parents) = stack.pop() {
for parent in parents {
if ancestor_count >= MAX {
if ancestors.len() >= MAX {
break 'a;
}
if local_of.contains_key(&parent) || !same_block(parent) {
if visited.insert(parent, ()).is_some() || !same_block(parent) {
continue;
}
let new_idx = raw.len();
raw.push((parent, walk_inputs(parent)));
local_of.insert(parent, LocalIdx::from(new_idx));
stack.push(new_idx);
ancestor_count += 1;
let parent_inputs = walk_inputs(parent);
ancestors.push((parent, parent_inputs.clone()));
stack.push(parent_inputs);
}
}
// Descendant BFS. Stack holds tx_indices since we look up each
// tx's txouts via `first_txout`/`spent`/`spending_tx`. `local_of`
// already contains the seed and every ancestor, so they're
// skipped by the membership check.
// Descendant BFS via spent outputs.
let mut descendants: Vec<(TxIndex, SmallVec<[TxIndex; 2]>)> = Vec::new();
let mut stack: Vec<TxIndex> = vec![seed];
let mut descendant_count = 0;
'd: while let Some(cur) = stack.pop() {
let Ok(start) = first_txout.get(cur.to_usize()).data() else {
continue;
@@ -249,39 +243,145 @@ impl Query {
let Ok(child) = spending_tx.get(usize::from(txin_idx)).data() else {
continue;
};
if local_of.contains_key(&child) || !same_block(child) {
if visited.insert(child, ()).is_some() || !same_block(child) {
continue;
}
let new_idx = raw.len();
raw.push((child, walk_inputs(child)));
local_of.insert(child, LocalIdx::from(new_idx));
descendants.push((child, walk_inputs(child)));
stack.push(child);
descendant_count += 1;
if descendant_count >= MAX {
if descendants.len() >= MAX {
break 'd;
}
}
}
// Filter each node's full input list against `local_of` to keep
// only in-cluster parents, resolved to their `LocalIdx`.
let nodes: Vec<(TxIndex, SmallVec<[LocalIdx; 2]>)> = raw
// Lay members out as [ancestors_reverse..., seed, descendants...]
// so that, when the ancestors form a single chain walked back from
// the seed, parents come before children. Reversing the discovery
// order is good enough for wire output; chunk grouping doesn't
// depend on member order.
let ancestor_count = ancestors.len();
let total = ancestor_count + 1 + descendants.len();
let mut local_of: FxHashMap<TxIndex, CpfpClusterTxIndex> =
FxHashMap::with_capacity_and_hasher(total, FxBuildHasher);
let mut members: Vec<(TxIndex, SmallVec<[TxIndex; 2]>)> = Vec::with_capacity(total);
for (tx, raw_parents) in ancestors.into_iter().rev() {
local_of.insert(tx, CpfpClusterTxIndex::from(members.len() as u32));
members.push((tx, raw_parents));
}
let seed_local = CpfpClusterTxIndex::from(members.len() as u32);
local_of.insert(seed, seed_local);
members.push((seed, seed_inputs));
for (tx, raw_parents) in descendants {
local_of.insert(tx, CpfpClusterTxIndex::from(members.len() as u32));
members.push((tx, raw_parents));
}
let resolved: Vec<(TxIndex, SmallVec<[CpfpClusterTxIndex; 2]>)> = members
.into_iter()
.map(|(tx_index, full_inputs)| {
let parents: SmallVec<[LocalIdx; 2]> = full_inputs
.map(|(tx, raw_parents)| {
let parents: SmallVec<[CpfpClusterTxIndex; 2]> = raw_parents
.iter()
.filter_map(|p| local_of.get(p).copied())
.collect();
(tx_index, parents)
(tx, parents)
})
.collect();
// Seed's pre-permutation index is 0; after `Cluster::new` topo-sorts
// it lands at `ancestor_count` (all in-cluster ancestors come first,
// and only ancestors do).
WalkResult {
nodes,
seed_local: LocalIdx::from(ancestor_count),
}
Ok(WalkResult {
members: resolved,
seed_local,
})
}
}
/// Assemble the wire-level `CpfpInfo` for the seed from already-resolved
/// cluster members.
///
/// `members` is laid out as `[ancestors..., seed, descendants...]` and
/// `seed_local` is the seed's position in that slice. Everything before
/// the seed is reported as an ancestor, everything after as a descendant.
/// The seed's own `rate` becomes `effective_fee_per_vsize`, and
/// `chunk_index` points at the chunk (from `chunk_groups`) that contains
/// the seed, defaulting to `0` if none does.
///
/// # Panics
///
/// Panics if `seed_local` is not a valid index into `members`.
fn build_cpfp_info(
    members: &[Member],
    seed_local: CpfpClusterTxIndex,
    sigops: brk_types::SigOps,
) -> CpfpInfo {
    let seed_pos = u32::from(seed_local) as usize;
    let seed = &members[seed_pos];

    // Ancestors and descendants share the same wire entry shape.
    let as_entry = |m: &Member| CpfpEntry {
        txid: m.txid,
        weight: m.weight,
        fee: m.fee,
    };
    let before_seed = &members[..seed_pos];
    let after_seed = &members[seed_pos + 1..];
    let ancestors: Vec<CpfpEntry> = before_seed.iter().map(as_entry).collect();
    let descendants: Vec<CpfpEntry> = after_seed.iter().map(as_entry).collect();

    // Descendant with the highest simple fee/weight rate, if any.
    let best_descendant = descendants
        .iter()
        .max_by_key(|entry| FeeRate::from((entry.fee, entry.weight)))
        .cloned();

    let mut cluster_txs: Vec<CpfpClusterTx> = Vec::with_capacity(members.len());
    for m in members {
        cluster_txs.push(CpfpClusterTx {
            txid: m.txid,
            weight: m.weight,
            fee: m.fee,
            parents: m.parents.iter().copied().collect(),
        });
    }

    let chunks = chunk_groups(members);
    let chunk_index = chunks
        .iter()
        .position(|chunk| chunk.txs.contains(&seed_local))
        .map_or(0, |pos| pos as u32);

    let adjusted_vsize = sigops.adjust_vsize(seed.vsize);
    let cluster = CpfpCluster {
        txs: cluster_txs,
        chunks,
        chunk_index,
    };

    CpfpInfo {
        ancestors,
        best_descendant,
        descendants,
        effective_fee_per_vsize: seed.rate,
        sigops,
        fee: seed.fee,
        vsize: seed.vsize,
        adjusted_vsize,
        cluster,
    }
}
/// Group cluster members into chunks by their effective fee rate.
///
/// Members whose `rate` has the same `f64` bit pattern land in the same
/// chunk; within a chunk, local indices appear in member order. Chunks
/// are returned sorted by rate, highest first. The sort is stable, so
/// chunks that compare equal keep first-seen order.
fn chunk_groups(members: &[Member]) -> Vec<CpfpClusterChunk> {
    let mut by_rate: FxHashMap<u64, (FeeRate, SmallVec<[CpfpClusterTxIndex; 4]>)> =
        FxHashMap::with_capacity_and_hasher(members.len(), FxBuildHasher);
    // Rate keys in the order they were first encountered.
    let mut first_seen: Vec<u64> = Vec::new();

    for (pos, member) in members.iter().enumerate() {
        let bits = f64::from(member.rate).to_bits();
        let local = CpfpClusterTxIndex::from(pos as u32);
        if let Some((_, txs)) = by_rate.get_mut(&bits) {
            txs.push(local);
        } else {
            first_seen.push(bits);
            let mut txs: SmallVec<[CpfpClusterTxIndex; 4]> = SmallVec::new();
            txs.push(local);
            by_rate.insert(bits, (member.rate, txs));
        }
    }

    // Highest rate first.
    first_seen.sort_by_key(|bits| std::cmp::Reverse(by_rate[bits].0));

    let mut chunks: Vec<CpfpClusterChunk> = Vec::with_capacity(first_seen.len());
    for bits in first_seen {
        let (feerate, txs) = by_rate.remove(&bits).unwrap();
        chunks.push(CpfpClusterChunk {
            txs: txs.into_vec(),
            feerate,
        });
    }
    chunks
}

View File

@@ -1,13 +1,11 @@
use crate::Query;
use brk_error::{Error, Result};
use brk_mempool::{Mempool, RbfForTx, RbfNode};
use brk_mempool::{Mempool, PrevoutResolver, RbfForTx, RbfNode};
use brk_types::{
CheckedSub, FeeRate, MempoolBlock, MempoolInfo, MempoolRecentTx, OutputType, RbfResponse, RbfTx,
RecommendedFees, ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix,
CheckedSub, FeeRate, MempoolBlock, MempoolInfo, MempoolRecentTx, OutputType, RbfResponse,
RbfTx, RecommendedFees, ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix,
TypeIndex,
};
use vecdb::VecIndex;
use crate::Query;
const RECENT_REPLACEMENTS_LIMIT: usize = 25;
@@ -49,44 +47,52 @@ impl Query {
Ok(blocks)
}
/// Fill any `prevout == None` inputs on live mempool txs from the
/// indexer. Driver calls this once per cycle right after
/// `mempool.update()`. Returns true if at least one was filled.
pub fn fill_mempool_prevouts(&self) -> bool {
let Some(mempool) = self.mempool() else {
return false;
};
/// Indexer-backed resolver for confirmed-parent prevouts. Pass
/// the returned closure to `Mempool::start_with` /
/// `Mempool::update_with`; the mempool driver calls it post-apply
/// for every still-unfilled `prevout == None` input.
///
/// Reads go through `read_once` rather than a captured
/// `VecReader`: `VecReader::stored_len` is snapshotted at
/// construction, so a long-lived reader paired with fresh
/// `safe_lengths` would let `safe.tx_index` / `safe.txout_index`
/// advance past the reader's frozen length and panic in
/// `reader.get()`. `read_once` rebinds against the current vec
/// length per call and lets newly indexed parents become
/// resolvable on the next cycle.
pub fn indexer_prevout_resolver(&self) -> PrevoutResolver {
let query = self.clone();
let indexer = self.0.indexer;
let indexer = self.indexer();
let stores = &indexer.stores;
let safe = self.safe_lengths();
let tx_index_len = safe.tx_index;
let txout_index_len = safe.txout_index;
let first_txout_index_reader = indexer.vecs.transactions.first_txout_index.reader();
let output_type_reader = indexer.vecs.outputs.output_type.reader();
let type_index_reader = indexer.vecs.outputs.type_index.reader();
let value_reader = indexer.vecs.outputs.value.reader();
let addr_readers = indexer.vecs.addrs.addr_readers();
mempool.fill_prevouts(|prev_txid, vout| {
let prev_tx_index = stores
Box::new(move |prev_txid, vout| {
let safe = query.safe_lengths();
let prev_tx_index = indexer
.stores
.txid_prefix_to_tx_index
.get(&TxidPrefix::from(prev_txid))
.ok()??
.into_owned();
if prev_tx_index >= tx_index_len {
if prev_tx_index >= safe.tx_index {
return None;
}
let first_txout: TxOutIndex = first_txout_index_reader.get(prev_tx_index.to_usize());
let first_txout: TxOutIndex = indexer
.vecs
.transactions
.first_txout_index
.read_once(prev_tx_index)
.ok()?;
let txout = first_txout + vout;
if txout >= txout_index_len {
if txout >= safe.txout_index {
return None;
}
let txout_index = usize::from(txout);
let output_type: OutputType = output_type_reader.get(txout_index);
let type_index: TypeIndex = type_index_reader.get(txout_index);
let value: Sats = value_reader.get(txout_index);
let script_pubkey = addr_readers.script_pubkey(output_type, type_index);
let output_type: OutputType = indexer.vecs.outputs.output_type.read_once(txout).ok()?;
let type_index: TypeIndex = indexer.vecs.outputs.type_index.read_once(txout).ok()?;
let value: Sats = indexer.vecs.outputs.value.read_once(txout).ok()?;
let script_pubkey = indexer
.vecs
.addrs
.addr_readers()
.script_pubkey(output_type, type_index);
Some(TxOut::from((script_pubkey, value)))
})
}
@@ -125,11 +131,7 @@ impl Query {
/// Layer `mined` + effective fee rate onto an `RbfNode` tree.
/// Must run after the mempool lock has dropped (effective_fee_rate
/// re-enters Mempool).
fn enrich_rbf_node(
&self,
node: RbfNode,
successor_time: Option<Timestamp>,
) -> ReplacementNode {
fn enrich_rbf_node(&self, node: RbfNode, successor_time: Option<Timestamp>) -> ReplacementNode {
let interval = successor_time
.and_then(|st| st.checked_sub(node.first_seen))
.map(|d| *d);

View File

@@ -123,7 +123,9 @@ impl Query {
}
pub fn transaction(&self, txid: &Txid) -> Result<Transaction> {
self.lookup_tx(txid, Transaction::clone, |idx| self.transaction_by_index(idx))
self.lookup_tx(txid, Transaction::clone, |idx| {
self.transaction_by_index(idx)
})
}
pub fn transaction_status(&self, txid: &Txid) -> Result<TxStatus> {