mempool: snapshot 3

This commit is contained in:
nym21
2025-12-13 17:34:34 +01:00
parent c5e9b75261
commit db57db4bd9
3 changed files with 224 additions and 303 deletions

View File

@@ -64,8 +64,8 @@ pub struct AuditTx {
pub ancestor_vsize: VSize,
/// Already selected into a block
pub used: bool,
/// Already in modified priority queue
pub in_modified: bool,
/// Generation counter for invalidating stale heap entries
pub generation: u32,
}
impl AuditTx {
@@ -88,31 +88,20 @@ impl AuditTx {
ancestor_fee,
ancestor_vsize,
used: false,
in_modified: false,
generation: 0,
}
}
#[inline]
pub fn has_higher_score_than(&self, other: &Self) -> bool {
has_higher_fee_rate(
self.ancestor_fee,
self.ancestor_vsize,
other.ancestor_fee,
other.ancestor_vsize,
)
}
}
/// Priority queue entry for the modified queue.
/// Stores a snapshot of ancestor values at time of insertion.
/// Priority queue entry. Stores snapshot of score at insertion time.
#[derive(Clone, Copy)]
pub struct TxPriority {
/// Pool index (for indexing into pool array)
pub pool_idx: PoolIndex,
/// Snapshot of ancestor fee at insertion time
pub ancestor_fee: Sats,
/// Snapshot of ancestor vsize at insertion time
pub ancestor_vsize: VSize,
/// Score snapshot for heap ordering
ancestor_fee: Sats,
ancestor_vsize: VSize,
/// Generation at insertion (detects stale entries)
pub generation: u32,
}
impl TxPriority {
@@ -121,11 +110,18 @@ impl TxPriority {
pool_idx: tx.pool_idx,
ancestor_fee: tx.ancestor_fee,
ancestor_vsize: tx.ancestor_vsize,
generation: tx.generation,
}
}
/// Check if this entry is stale (tx was updated since insertion).
#[inline]
pub fn has_higher_score_than(&self, other: &Self) -> bool {
pub fn is_stale(&self, tx: &AuditTx) -> bool {
self.generation != tx.generation
}
#[inline]
fn has_higher_score_than(&self, other: &Self) -> bool {
has_higher_fee_rate(
self.ancestor_fee,
self.ancestor_vsize,

View File

@@ -8,14 +8,8 @@ use crate::mempool::{MempoolEntry, MempoolTxIndex, PoolIndex, SelectedTx};
/// Number of projected blocks to build
const NUM_PROJECTED_BLOCKS: usize = 8;
/// Estimated txs per block (for partial sort optimization)
const TXS_PER_BLOCK: usize = 4000;
/// Build projected blocks from mempool entries.
///
/// Returns SelectedTx (with effective fee rate) grouped by block, in mining priority order.
pub fn build_projected_blocks(entries: &[Option<MempoolEntry>]) -> Vec<Vec<SelectedTx>> {
// Collect live entries
let live: Vec<(MempoolTxIndex, &MempoolEntry)> = entries
.iter()
.enumerate()
@@ -26,30 +20,20 @@ pub fn build_projected_blocks(entries: &[Option<MempoolEntry>]) -> Vec<Vec<Selec
return Vec::new();
}
// Build AuditTx pool with pre-computed ancestor values from Bitcoin Core
let mut pool = Pool::new(build_audit_pool(&live));
// Sort by ancestor score (partial sort for efficiency)
let sorted = partial_sort_by_score(&pool);
// Run selection algorithm
select_into_blocks(&mut pool, sorted, NUM_PROJECTED_BLOCKS)
select_into_blocks(&mut pool, NUM_PROJECTED_BLOCKS)
}
/// Build the AuditTx pool with parent/child relationships.
/// AuditTx.parents and .children store pool indices (for graph traversal).
/// AuditTx.entries_idx stores the original entries index (for final output).
/// Uses Bitcoin Core's pre-computed ancestor values (correct, no double-counting).
fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec<AuditTx> {
// Create mapping from TxidPrefix to pool index
let prefix_to_pool_idx: FxHashMap<TxidPrefix, PoolIndex> = live
// Map TxidPrefix -> pool index
let prefix_to_idx: FxHashMap<TxidPrefix, PoolIndex> = live
.iter()
.enumerate()
.map(|(pool_idx, (_, entry))| (entry.txid_prefix(), PoolIndex::from(pool_idx)))
.map(|(i, (_, entry))| (entry.txid_prefix(), PoolIndex::from(i)))
.collect();
// Build pool with parent relationships
// Use Bitcoin Core's pre-computed ancestor_fee and ancestor_vsize
let mut pool: Vec<AuditTx> = live
.iter()
.enumerate()
@@ -64,10 +48,10 @@ fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec<AuditTx> {
entry.ancestor_vsize,
);
// Find in-mempool parents from depends list (provided by Bitcoin Core)
// Add in-mempool parents
for parent_prefix in &entry.depends {
if let Some(&parent_pool_idx) = prefix_to_pool_idx.get(parent_prefix) {
tx.parents.push(parent_pool_idx);
if let Some(&parent_idx) = prefix_to_idx.get(parent_prefix) {
tx.parents.push(parent_idx);
}
}
@@ -76,43 +60,12 @@ fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec<AuditTx> {
.collect();
// Build child relationships (reverse of parents)
for pool_idx in 0..pool.len() {
let parents = pool[pool_idx].parents.clone();
for parent_pool_idx in parents {
pool[parent_pool_idx.as_usize()].children.push(PoolIndex::from(pool_idx));
for i in 0..pool.len() {
let parents = pool[i].parents.clone();
for parent_idx in parents {
pool[parent_idx.as_usize()].children.push(PoolIndex::from(i));
}
}
pool
}
/// Partial sort: only fully sort the top N txs needed for blocks.
/// Returns pool indices sorted by ancestor score.
fn partial_sort_by_score(pool: &Pool) -> Vec<PoolIndex> {
let mut indices: Vec<PoolIndex> = (0..pool.len()).map(PoolIndex::from).collect();
let needed = NUM_PROJECTED_BLOCKS * TXS_PER_BLOCK;
// Comparator: descending by score, then ascending by index (deterministic tiebreaker)
let cmp = |a: &PoolIndex, b: &PoolIndex| -> std::cmp::Ordering {
let tx_a = &pool[*a];
let tx_b = &pool[*b];
if tx_b.has_higher_score_than(tx_a) {
std::cmp::Ordering::Greater
} else if tx_a.has_higher_score_than(tx_b) {
std::cmp::Ordering::Less
} else {
a.cmp(b)
}
};
if indices.len() > needed {
// Partition: move top `needed` to front (unordered), then sort just those
indices.select_nth_unstable_by(needed, cmp);
indices[..needed].sort_unstable_by(cmp);
indices.truncate(needed);
} else {
indices.sort_unstable_by(cmp);
}
indices
}

View File

@@ -1,174 +1,233 @@
use std::collections::BinaryHeap;
use brk_types::FeeRate;
use rustc_hash::FxHashSet;
use super::audit::{Pool, TxPriority};
use crate::mempool::{PoolIndex, SelectedTx};
use crate::mempool::{MempoolTxIndex, PoolIndex, SelectedTx};
/// Target vsize per block (~1MB, derived from 4MW weight limit)
const BLOCK_VSIZE_LIMIT: u64 = 1_000_000;
/// Select transactions into blocks using the two-source algorithm.
/// How many packages to look ahead when current doesn't fit
const LOOK_AHEAD: usize = 100;
/// A CPFP package - atomic unit that must be included together.
struct Package {
/// Transactions in topological order (parents before children)
txs: Vec<(MempoolTxIndex, FeeRate)>,
/// Combined vsize of all txs
vsize: u64,
/// Package fee rate (same for all txs)
fee_rate: FeeRate,
}
/// Select transactions into projected blocks.
///
/// Takes pool indices (sorted by score), returns SelectedTx with effective fee rate at selection time.
pub fn select_into_blocks(
pool: &mut Pool,
sorted_pool_indices: Vec<PoolIndex>,
num_blocks: usize,
) -> Vec<Vec<SelectedTx>> {
let mut blocks: Vec<Vec<SelectedTx>> = Vec::with_capacity(num_blocks);
let mut current_block: Vec<SelectedTx> = Vec::new();
let mut current_vsize: u64 = 0;
/// Algorithm:
/// 1. Select txs via heap, grouping CPFP chains into atomic packages
/// 2. Sort packages by fee rate
/// 3. Partition into blocks (packages are atomic, never split)
pub fn select_into_blocks(pool: &mut Pool, num_blocks: usize) -> Vec<Vec<SelectedTx>> {
// Phase 1: Select and group into packages
let packages = select_packages(pool, num_blocks);
let mut sorted_iter = sorted_pool_indices.into_iter().peekable();
let mut modified_queue: BinaryHeap<TxPriority> = BinaryHeap::new();
// Phase 2: Partition packages into blocks
partition_into_blocks(packages, num_blocks)
}
'outer: loop {
// Pick best candidate from either sorted list or modified queue
let best_pool_idx = pick_best_candidate(pool, &mut sorted_iter, &mut modified_queue);
let Some(pool_idx) = best_pool_idx else {
break;
};
/// Select txs and group CPFP chains into atomic packages.
fn select_packages(pool: &mut Pool, num_blocks: usize) -> Vec<Package> {
let target_vsize = BLOCK_VSIZE_LIMIT * num_blocks as u64;
let mut total_vsize: u64 = 0;
let mut packages: Vec<Package> = Vec::new();
// Skip if already used
if pool[pool_idx].used {
let mut heap: BinaryHeap<TxPriority> = (0..pool.len())
.map(|i| TxPriority::new(&pool[PoolIndex::from(i)]))
.collect();
while let Some(entry) = heap.pop() {
let tx = &pool[entry.pool_idx];
if tx.used || entry.is_stale(tx) {
continue;
}
// Capture the package rate BEFORE selecting ancestors
// This is the rate that justified this tx (and its ancestors) for inclusion
let package_rate = {
let tx = &pool[pool_idx];
FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize))
};
// Package rate at selection time
let package_rate = FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize));
// Select this tx and all its unselected ancestors
let selected = select_with_ancestors(pool, pool_idx);
// Select this tx and all unselected ancestors (parents first)
let ancestors = select_with_ancestors(pool, entry.pool_idx);
for sel_pool_idx in selected {
let tx = &pool[sel_pool_idx];
let tx_vsize = u64::from(tx.vsize);
let mut package_vsize: u64 = 0;
let mut txs = Vec::with_capacity(ancestors.len());
// Check if tx fits in current block
if current_vsize + tx_vsize > BLOCK_VSIZE_LIMIT && !current_block.is_empty() {
blocks.push(std::mem::take(&mut current_block));
current_vsize = 0;
for sel_idx in ancestors {
let vsize = u64::from(pool[sel_idx].vsize);
txs.push((pool[sel_idx].entries_idx, package_rate));
package_vsize += vsize;
if blocks.len() >= num_blocks {
// Early exit - we have enough blocks
break 'outer;
}
}
update_descendants(pool, sel_idx, &mut heap);
}
// Effective fee rate = the package rate at selection time.
// This is the mining score that determined which block this tx lands in.
// For CPFP, both parent and child get the same package rate (the child's score).
current_block.push(SelectedTx {
entries_idx: tx.entries_idx,
effective_fee_rate: package_rate,
});
current_vsize += tx_vsize;
total_vsize += package_vsize;
packages.push(Package {
txs,
vsize: package_vsize,
fee_rate: package_rate,
});
// Update descendants' ancestor scores
update_descendants(pool, sel_pool_idx, &mut modified_queue);
if total_vsize >= target_vsize {
break;
}
}
// Don't forget the last block
if !current_block.is_empty() && blocks.len() < num_blocks {
blocks.push(current_block);
packages
}
/// Sort packages by fee rate and partition into blocks.
fn partition_into_blocks(mut packages: Vec<Package>, num_blocks: usize) -> Vec<Vec<SelectedTx>> {
// Sort by fee rate descending
packages.sort_unstable_by(|a, b| b.fee_rate.cmp(&a.fee_rate));
// Debug: show top and bottom packages after sorting
log::info!("=== Top 10 packages after sorting ===");
for (i, pkg) in packages.iter().take(10).enumerate() {
log::info!(
" #{}: rate={:.4} sat/vB, vsize={}, txs={}",
i,
f64::from(pkg.fee_rate),
pkg.vsize,
pkg.txs.len()
);
}
log::info!("=== Bottom 10 packages after sorting ===");
let start = packages.len().saturating_sub(10);
for (i, pkg) in packages.iter().skip(start).enumerate() {
log::info!(
" #{}: rate={:.4} sat/vB, vsize={}, txs={}",
start + i,
f64::from(pkg.fee_rate),
pkg.vsize,
pkg.txs.len()
);
}
// Post-process: fix fee rate ordering violations between adjacent blocks.
// This handles cases where a tx's score improved after its target block was full.
fix_block_ordering(&mut blocks);
let mut blocks: Vec<Vec<SelectedTx>> = Vec::with_capacity(num_blocks);
let mut current_block: Vec<SelectedTx> = Vec::new();
let mut current_block_packages: Vec<(FeeRate, u64, usize)> = Vec::new(); // for debug
let mut current_vsize: u64 = 0;
let mut used = vec![false; packages.len()];
// Log how many txs were left unselected
let total_selected: usize = blocks.iter().map(|b| b.len()).sum();
log::debug!(
"Selected {} txs into {} blocks, modified_queue has {} remaining",
total_selected,
blocks.len(),
modified_queue.len()
);
let mut i = 0;
while i < packages.len() && blocks.len() < num_blocks {
if used[i] {
i += 1;
continue;
}
let remaining = BLOCK_VSIZE_LIMIT.saturating_sub(current_vsize);
if packages[i].vsize <= remaining {
// Package fits in current block
current_block_packages.push((packages[i].fee_rate, packages[i].vsize, packages[i].txs.len()));
add_package_to_block(&packages[i], &mut current_block);
current_vsize += packages[i].vsize;
used[i] = true;
i += 1;
} else if current_block.is_empty() {
// Empty block - add package anyway (handles edge case)
current_block_packages.push((packages[i].fee_rate, packages[i].vsize, packages[i].txs.len()));
add_package_to_block(&packages[i], &mut current_block);
current_vsize += packages[i].vsize;
used[i] = true;
i += 1;
} else {
// Look ahead for a smaller package that fits
let mut found = false;
let look_ahead_end = (i + LOOK_AHEAD).min(packages.len());
for j in (i + 1)..look_ahead_end {
if used[j] || packages[j].vsize > remaining {
continue;
}
log::debug!(
"Look-ahead: adding pkg #{} (rate={:.4}) to block {} (min so far={:.4})",
j,
f64::from(packages[j].fee_rate),
blocks.len() + 1,
current_block_packages.iter().map(|(r, _, _)| f64::from(*r)).fold(f64::INFINITY, f64::min)
);
current_block_packages.push((packages[j].fee_rate, packages[j].vsize, packages[j].txs.len()));
add_package_to_block(&packages[j], &mut current_block);
current_vsize += packages[j].vsize;
used[j] = true;
found = true;
break;
}
if !found {
// No package fits, start new block
log_block_debug(blocks.len() + 1, &current_block_packages);
blocks.push(std::mem::take(&mut current_block));
current_block_packages.clear();
current_vsize = 0;
}
}
}
if !current_block.is_empty() && blocks.len() < num_blocks {
log_block_debug(blocks.len() + 1, &current_block_packages);
blocks.push(current_block);
}
blocks
}
/// Pick the best candidate from sorted list or modified queue.
/// Returns a pool index.
fn pick_best_candidate(
pool: &Pool,
sorted_iter: &mut std::iter::Peekable<std::vec::IntoIter<PoolIndex>>,
modified_queue: &mut BinaryHeap<TxPriority>,
) -> Option<PoolIndex> {
// Skip used txs in sorted iterator
while sorted_iter.peek().is_some_and(|&idx| pool[idx].used) {
sorted_iter.next();
fn log_block_debug(block_num: usize, packages: &[(FeeRate, u64, usize)]) {
if packages.is_empty() {
return;
}
let mut sorted: Vec<_> = packages.to_vec();
sorted.sort_by(|a, b| b.0.cmp(&a.0));
// Skip used txs and stale entries in modified queue.
// A tx can be pushed multiple times as its score improves (when different ancestors are selected).
// For example: tx C depends on A and B. When A is selected, C is pushed with score 2.0.
// When B is selected, C is pushed again with score 4.0. The queue now has two entries for C.
// We skip the stale 2.0 entry and use the current 4.0 entry.
while let Some(p) = modified_queue.peek() {
let tx = &pool[p.pool_idx];
if tx.used {
modified_queue.pop();
continue;
}
// Check if this queue entry has outdated snapshot (a newer entry exists with better score)
if p.ancestor_fee != tx.ancestor_fee || p.ancestor_vsize != tx.ancestor_vsize {
modified_queue.pop();
continue;
}
break;
log::info!("=== Block {} - {} packages ===", block_num, packages.len());
log::info!(" Top 5:");
for (rate, vsize, txs) in sorted.iter().take(5) {
log::info!(" rate={:.4} sat/vB, vsize={}, txs={}", f64::from(*rate), vsize, txs);
}
let sorted_best = sorted_iter.peek().map(|&idx| &pool[idx]);
let modified_best = modified_queue.peek().map(|p| &pool[p.pool_idx]);
match (sorted_best, modified_best) {
(None, None) => None,
(Some(_), None) => sorted_iter.next(),
(None, Some(_)) => {
let p = modified_queue.pop().unwrap();
Some(p.pool_idx)
}
(Some(sorted_tx), Some(modified_tx)) => {
// Compare CURRENT scores from pool (not stale snapshots)
if sorted_tx.has_higher_score_than(modified_tx) {
sorted_iter.next()
} else {
let p = modified_queue.pop().unwrap();
Some(p.pool_idx)
}
}
log::info!(" Bottom 5:");
let start = sorted.len().saturating_sub(5);
for (rate, vsize, txs) in sorted.iter().skip(start) {
log::info!(" rate={:.4} sat/vB, vsize={}, txs={}", f64::from(*rate), vsize, txs);
}
}
/// Select a tx and all its unselected ancestors (topological order).
/// Returns pool indices with parents before children.
fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec<PoolIndex> {
let mut to_select: Vec<PoolIndex> = Vec::new();
fn add_package_to_block(package: &Package, block: &mut Vec<SelectedTx>) {
for (entries_idx, effective_fee_rate) in &package.txs {
block.push(SelectedTx {
entries_idx: *entries_idx,
effective_fee_rate: *effective_fee_rate,
});
}
}
// Stack entries: (pool_idx, parents_processed)
/// Select a tx and all its unselected ancestors in topological order.
fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec<PoolIndex> {
let mut result: Vec<PoolIndex> = Vec::new();
let mut stack: Vec<(PoolIndex, bool)> = vec![(pool_idx, false)];
while let Some((current, parents_processed)) = stack.pop() {
if pool[current].used {
while let Some((idx, parents_done)) = stack.pop() {
if pool[idx].used {
continue;
}
if parents_processed {
// All parents handled, select this tx
pool[current].used = true;
to_select.push(current);
if parents_done {
pool[idx].used = true;
result.push(idx);
} else {
// First visit: push self for post-processing, then push parents
stack.push((current, true));
for &parent in &pool[current].parents {
stack.push((idx, true));
for &parent in &pool[idx].parents {
if !pool[parent].used {
stack.push((parent, false));
}
@@ -176,122 +235,35 @@ fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec<PoolIndex>
}
}
to_select
}
/// Fix fee rate ordering violations between blocks.
/// Ensures Block[i].min >= Block[i+1].max for all adjacent blocks.
///
/// Uses cached min/max indices to avoid O(n) scans on each iteration.
fn fix_block_ordering(blocks: &mut [Vec<SelectedTx>]) {
if blocks.len() < 2 {
return;
}
// Cache (min_idx, max_idx) for each block
let mut cache: Vec<(usize, usize)> = blocks
.iter()
.map(|block| find_min_max_indices(block))
.collect();
let mut iterations = 0;
const MAX_ITERATIONS: usize = 100;
loop {
let mut changed = false;
iterations += 1;
for i in 0..blocks.len() - 1 {
let (curr_min_idx, _) = cache[i];
let (_, next_max_idx) = cache[i + 1];
// Skip empty blocks
if blocks[i].is_empty() || blocks[i + 1].is_empty() {
continue;
}
let curr_min = blocks[i][curr_min_idx].effective_fee_rate;
let next_max = blocks[i + 1][next_max_idx].effective_fee_rate;
if next_max > curr_min {
// Swap: high-fee tx to earlier block, low-fee tx to later block
let high_tx = blocks[i + 1].swap_remove(next_max_idx);
let low_tx = blocks[i].swap_remove(curr_min_idx);
blocks[i].push(high_tx);
blocks[i + 1].push(low_tx);
// Recompute cache only for affected blocks
cache[i] = find_min_max_indices(&blocks[i]);
cache[i + 1] = find_min_max_indices(&blocks[i + 1]);
changed = true;
}
}
if !changed || iterations >= MAX_ITERATIONS {
break;
}
}
if iterations >= MAX_ITERATIONS {
log::warn!("fix_block_ordering: reached max iterations, some violations may remain");
}
}
/// Find indices of min and max fee rate transactions in a block.
fn find_min_max_indices(block: &[SelectedTx]) -> (usize, usize) {
if block.is_empty() {
return (0, 0);
}
let mut min_idx = 0;
let mut max_idx = 0;
for (i, tx) in block.iter().enumerate().skip(1) {
if tx.effective_fee_rate < block[min_idx].effective_fee_rate {
min_idx = i;
}
if tx.effective_fee_rate > block[max_idx].effective_fee_rate {
max_idx = i;
}
}
(min_idx, max_idx)
result
}
/// Update descendants' ancestor scores after selecting a tx.
/// Takes a pool index.
fn update_descendants(
pool: &mut Pool,
selected_pool_idx: PoolIndex,
modified_queue: &mut BinaryHeap<TxPriority>,
) {
let selected_fee = pool[selected_pool_idx].fee;
let selected_vsize = pool[selected_pool_idx].vsize;
fn update_descendants(pool: &mut Pool, selected_idx: PoolIndex, heap: &mut BinaryHeap<TxPriority>) {
let selected_fee = pool[selected_idx].fee;
let selected_vsize = pool[selected_idx].vsize;
// Track visited to avoid double-subtracting in diamond patterns
let mut visited = rustc_hash::FxHashSet::default();
// BFS through children (children are pool indices)
let mut stack: Vec<PoolIndex> = pool[selected_pool_idx].children.to_vec();
// Track visited to avoid double-updates in diamond patterns
let mut visited: FxHashSet<PoolIndex> = FxHashSet::default();
let mut stack: Vec<PoolIndex> = pool[selected_idx].children.to_vec();
while let Some(child_idx) = stack.pop() {
// Skip if already visited (handles diamond patterns)
if !visited.insert(child_idx) {
continue;
}
let child = &mut pool[child_idx];
if child.used {
continue;
}
// Subtract selected tx from ancestor totals
// Update ancestor totals
child.ancestor_fee -= selected_fee;
child.ancestor_vsize -= selected_vsize;
// Always re-push to modified queue with updated score.
// This may create duplicates, but we handle that by checking
// if the tx is used or if the snapshot is stale when popping.
modified_queue.push(TxPriority::new(child));
child.in_modified = true;
// Increment generation and re-push to heap
child.generation += 1;
heap.push(TxPriority::new(child));
// Continue to grandchildren
stack.extend(child.children.iter().copied());