mempool: fixes

This commit is contained in:
nym21
2026-04-30 12:38:34 +02:00
parent 43f3be4924
commit 9b42b40a36
14 changed files with 408 additions and 79 deletions

View File

@@ -173,10 +173,17 @@ impl Query {
let prefix = u32::from(type_index).to_be_bytes();
// Match mempool.space's electrs cap: refuse addresses with >500 UTXOs.
// Bounds worst-case work and response size, prevents heavy-address DDoS.
const MAX_UTXOS: usize = 500;
let outpoints: Vec<(TxIndex, Vout)> = store
.prefix(prefix)
.map(|(key, _): (AddrIndexOutPoint, Unit)| (key.tx_index(), key.vout()))
.take(MAX_UTXOS + 1)
.collect();
if outpoints.len() > MAX_UTXOS {
return Err(Error::TooManyUtxos);
}
let txid_reader = vecs.transactions.txid.reader();
let first_txout_index_reader = vecs.transactions.first_txout_index.reader();

View File

@@ -1,14 +1,12 @@
use std::cmp::Ordering;
use brk_error::{Error, Result};
use brk_error::{Error, OptionData, Result};
use brk_mempool::{EntryPool, TxEntry, TxGraveyard, TxRemoval, TxStore, TxTombstone};
use brk_types::{
CheckedSub, CpfpEntry, CpfpInfo, FeeRate, MempoolBlock, MempoolInfo, MempoolRecentTx,
OutputType, RbfResponse, RbfTx, RecommendedFees, ReplacementNode, Sats, Timestamp, Transaction,
TxOut, TxOutIndex, Txid, TxidPrefix, TypeIndex, VSize, Weight,
TxIndex, TxInIndex, TxOut, TxOutIndex, Txid, TxidPrefix, TypeIndex, VSize, Weight,
};
use rustc_hash::FxHashSet;
use vecdb::VecIndex;
use vecdb::{AnyVec, ReadableVec, VecIndex};
use crate::Query;
@@ -96,74 +94,190 @@ impl Query {
pub fn cpfp(&self, txid: &Txid) -> Result<CpfpInfo> {
let mempool = self.mempool().ok_or(Error::MempoolNotAvailable)?;
let entries = mempool.entries();
let prefix = TxidPrefix::from(txid);
Ok(mempool
.cpfp_info(&prefix)
.unwrap_or_else(|| self.confirmed_cpfp(txid)))
}
let Some(entry) = entries.get(&prefix) else {
return Ok(CpfpInfo::default());
/// CPFP cluster for a confirmed tx: the connected component of
/// same-block parent/child edges, reconstructed by BFS on demand.
/// Walks entirely in `TxIndex` space using direct vec reads (height,
/// weight, fee) - skips full `Transaction` reconstruction and avoids
/// `txid -> tx_index` lookups by reading `OutPoint`'s packed
/// `tx_index` directly. Capped at 25 each side to match Bitcoin
/// Core's default mempool chain limits and mempool.space's own
/// truncation. `effectiveFeePerVsize` is the simple package rate;
/// mempool's `calculateGoodBlockCpfp` chunk-rate algorithm is not
/// ported.
fn confirmed_cpfp(&self, txid: &Txid) -> CpfpInfo {
const MAX: usize = 25;
let Ok(seed_idx) = self.resolve_tx_index(txid) else {
return CpfpInfo::default();
};
let Ok(seed_height) = self.confirmed_status_height(seed_idx) else {
return CpfpInfo::default();
};
// Ancestor walk doubles as package-rate aggregation. Stale
// `depends` entries pointing at mined/evicted txs are silently
// dropped via the live `entries.get` probe, so the aggregates
// reflect only in-pool ancestors.
let mut ancestors = Vec::new();
let mut visited: FxHashSet<TxidPrefix> = FxHashSet::default();
let mut package_fee = u64::from(entry.fee);
let mut package_vsize = u64::from(entry.vsize);
let mut stack: Vec<TxidPrefix> = entry.depends.to_vec();
while let Some(p) = stack.pop() {
if !visited.insert(p) {
continue;
}
if let Some(anc) = entries.get(&p) {
package_fee += u64::from(anc.fee);
package_vsize += u64::from(anc.vsize);
ancestors.push(CpfpEntry {
txid: anc.txid.clone(),
weight: Weight::from(anc.vsize),
fee: anc.fee,
});
stack.extend(anc.depends.iter().cloned());
let indexer = self.indexer();
let computer = self.computer();
// Block's tx_index range. Reduces the per-neighbor height check to a
// pair of integer compares (vs `tx_heights.get_shared` which acquires
// a read lock and walks a `RangeMap`).
let Ok(block_first) = indexer
.vecs
.transactions
.first_tx_index
.collect_one(seed_height)
.data()
else {
return CpfpInfo::default();
};
let block_end = indexer
.vecs
.transactions
.first_tx_index
.collect_one(seed_height.incremented())
.unwrap_or_else(|| TxIndex::from(indexer.vecs.transactions.txid.len()));
let same_block = |idx: TxIndex| idx >= block_first && idx < block_end;
let mut first_txin = indexer.vecs.transactions.first_txin_index.cursor();
let mut first_txout = indexer.vecs.transactions.first_txout_index.cursor();
let mut outpoint = indexer.vecs.inputs.outpoint.cursor();
let mut spent = computer.outputs.spent.txin_index.cursor();
let mut spending_tx = indexer.vecs.inputs.tx_index.cursor();
let mut visited: FxHashSet<TxIndex> = FxHashSet::with_capacity_and_hasher(
2 * MAX + 1,
Default::default(),
);
visited.insert(seed_idx);
let mut ancestor_idxs: Vec<TxIndex> = Vec::with_capacity(MAX);
let mut queue: Vec<TxIndex> = vec![seed_idx];
'a: while let Some(cur) = queue.pop() {
let Ok(start) = first_txin.get(cur.to_usize()).data() else { continue };
let Ok(end) = first_txin.get(cur.to_usize() + 1).data() else { continue };
for i in usize::from(start)..usize::from(end) {
let Ok(op) = outpoint.get(i).data() else { continue };
if op.is_coinbase() {
continue;
}
let parent = op.tx_index();
if !visited.insert(parent) || !same_block(parent) {
continue;
}
ancestor_idxs.push(parent);
queue.push(parent);
if ancestor_idxs.len() >= MAX {
break 'a;
}
}
}
let mut descendants = Vec::new();
for child_prefix in entries.children(&prefix) {
if let Some(e) = entries.get(&child_prefix) {
descendants.push(CpfpEntry {
txid: e.txid.clone(),
weight: Weight::from(e.vsize),
fee: e.fee,
});
let mut descendant_idxs: Vec<TxIndex> = Vec::with_capacity(MAX);
let mut queue: Vec<TxIndex> = vec![seed_idx];
'd: while let Some(cur) = queue.pop() {
let Ok(start) = first_txout.get(cur.to_usize()).data() else { continue };
let Ok(end) = first_txout.get(cur.to_usize() + 1).data() else { continue };
for i in usize::from(start)..usize::from(end) {
let Ok(txin_idx) = spent.get(i).data() else { continue };
if txin_idx == TxInIndex::UNSPENT {
continue;
}
let Ok(child) = spending_tx.get(usize::from(txin_idx)).data() else { continue };
if !visited.insert(child) || !same_block(child) {
continue;
}
descendant_idxs.push(child);
queue.push(child);
if descendant_idxs.len() >= MAX {
break 'd;
}
}
}
let self_rate = entry.fee_rate();
let package_rate = FeeRate::from((Sats::from(package_fee), VSize::from(package_vsize)));
let effective_fee_per_vsize = if package_rate > self_rate {
package_rate
} else {
self_rate
// Phase 2: bulk-fetch (weight, fee) for seed + cluster, cursors opened
// once and reads issued in tx_index order for sequential page locality.
let mut all = Vec::with_capacity(1 + ancestor_idxs.len() + descendant_idxs.len());
all.push(seed_idx);
all.extend(&ancestor_idxs);
all.extend(&descendant_idxs);
let Ok(weights_fees) = self.txs_weight_fee(&all) else {
return CpfpInfo::default();
};
let txid_reader = indexer.vecs.transactions.txid.reader();
let entry_at = |i: usize, idx: TxIndex| {
let (weight, fee) = weights_fees[i];
CpfpEntry {
txid: txid_reader.get(idx.to_usize()),
weight,
fee,
}
};
let (seed_weight, seed_fee) = weights_fees[0];
let seed_vsize = VSize::from(seed_weight);
let ancestors: Vec<CpfpEntry> = ancestor_idxs
.iter()
.enumerate()
.map(|(k, &idx)| entry_at(1 + k, idx))
.collect();
let descendants: Vec<CpfpEntry> = descendant_idxs
.iter()
.enumerate()
.map(|(k, &idx)| entry_at(1 + ancestor_idxs.len() + k, idx))
.collect();
let (sum_fee, sum_vsize) = ancestors
.iter()
.chain(descendants.iter())
.fold((u64::from(seed_fee), u64::from(seed_vsize)), |(f, v), e| {
(f + u64::from(e.fee), v + u64::from(VSize::from(e.weight)))
});
let package_rate = FeeRate::from((Sats::from(sum_fee), VSize::from(sum_vsize)));
let effective = FeeRate::from((seed_fee, seed_vsize)).max(package_rate);
let best_descendant = descendants
.iter()
.max_by(|a, b| {
FeeRate::from((a.fee, a.weight))
.partial_cmp(&FeeRate::from((b.fee, b.weight)))
.unwrap_or(Ordering::Equal)
})
.max_by_key(|e| FeeRate::from((e.fee, e.weight)))
.cloned();
Ok(CpfpInfo {
CpfpInfo {
ancestors,
best_descendant,
descendants,
effective_fee_per_vsize: Some(effective_fee_per_vsize),
fee: Some(entry.fee),
adjusted_vsize: Some(entry.vsize),
})
effective_fee_per_vsize: Some(effective),
fee: Some(seed_fee),
adjusted_vsize: Some(seed_vsize),
}
}
/// Bulk read `(weight, fee)` for many tx_indexes. Cursors opened once;
/// reads issued in ascending `tx_index` order for sequential I/O,
/// results returned in the caller's order.
fn txs_weight_fee(&self, idxs: &[TxIndex]) -> Result<Vec<(Weight, Sats)>> {
if idxs.is_empty() {
return Ok(vec![]);
}
let indexer = self.indexer();
let computer = self.computer();
let mut base_size = indexer.vecs.transactions.base_size.cursor();
let mut total_size = indexer.vecs.transactions.total_size.cursor();
let mut fee_cursor = computer.transactions.fees.fee.tx_index.cursor();
let mut order: Vec<usize> = (0..idxs.len()).collect();
order.sort_unstable_by_key(|&i| idxs[i]);
let mut out = vec![(Weight::default(), Sats::ZERO); idxs.len()];
for &pos in &order {
let i = idxs[pos].to_usize();
let bs = base_size.get(i).data()?;
let ts = total_size.get(i).data()?;
let f = fee_cursor.get(i).data()?;
out[pos] = (Weight::from_sizes(*bs, *ts), f);
}
Ok(out)
}
/// RBF history for a tx, matching mempool.space's