diff --git a/crates/brk_monitor/src/mempool/block_builder/audit.rs b/crates/brk_monitor/src/mempool/block_builder/audit.rs index 13ace27d4..c90233e61 100644 --- a/crates/brk_monitor/src/mempool/block_builder/audit.rs +++ b/crates/brk_monitor/src/mempool/block_builder/audit.rs @@ -64,8 +64,8 @@ pub struct AuditTx { pub ancestor_vsize: VSize, /// Already selected into a block pub used: bool, - /// Already in modified priority queue - pub in_modified: bool, + /// Generation counter for invalidating stale heap entries + pub generation: u32, } impl AuditTx { @@ -88,31 +88,20 @@ impl AuditTx { ancestor_fee, ancestor_vsize, used: false, - in_modified: false, + generation: 0, } } - - #[inline] - pub fn has_higher_score_than(&self, other: &Self) -> bool { - has_higher_fee_rate( - self.ancestor_fee, - self.ancestor_vsize, - other.ancestor_fee, - other.ancestor_vsize, - ) - } } -/// Priority queue entry for the modified queue. -/// Stores a snapshot of ancestor values at time of insertion. +/// Priority queue entry. Stores snapshot of score at insertion time. #[derive(Clone, Copy)] pub struct TxPriority { - /// Pool index (for indexing into pool array) pub pool_idx: PoolIndex, - /// Snapshot of ancestor fee at insertion time - pub ancestor_fee: Sats, - /// Snapshot of ancestor vsize at insertion time - pub ancestor_vsize: VSize, + /// Score snapshot for heap ordering + ancestor_fee: Sats, + ancestor_vsize: VSize, + /// Generation at insertion (detects stale entries) + pub generation: u32, } impl TxPriority { @@ -121,11 +110,18 @@ impl TxPriority { pool_idx: tx.pool_idx, ancestor_fee: tx.ancestor_fee, ancestor_vsize: tx.ancestor_vsize, + generation: tx.generation, } } + /// Check if this entry is stale (tx was updated since insertion). 
#[inline] - pub fn has_higher_score_than(&self, other: &Self) -> bool { + pub fn is_stale(&self, tx: &AuditTx) -> bool { + self.generation != tx.generation + } + + #[inline] + fn has_higher_score_than(&self, other: &Self) -> bool { has_higher_fee_rate( self.ancestor_fee, self.ancestor_vsize, diff --git a/crates/brk_monitor/src/mempool/block_builder/build.rs b/crates/brk_monitor/src/mempool/block_builder/build.rs index 0a1e7989f..94e539933 100644 --- a/crates/brk_monitor/src/mempool/block_builder/build.rs +++ b/crates/brk_monitor/src/mempool/block_builder/build.rs @@ -8,14 +8,8 @@ use crate::mempool::{MempoolEntry, MempoolTxIndex, PoolIndex, SelectedTx}; /// Number of projected blocks to build const NUM_PROJECTED_BLOCKS: usize = 8; -/// Estimated txs per block (for partial sort optimization) -const TXS_PER_BLOCK: usize = 4000; - /// Build projected blocks from mempool entries. -/// -/// Returns SelectedTx (with effective fee rate) grouped by block, in mining priority order. pub fn build_projected_blocks(entries: &[Option]) -> Vec> { - // Collect live entries let live: Vec<(MempoolTxIndex, &MempoolEntry)> = entries .iter() .enumerate() @@ -26,30 +20,20 @@ pub fn build_projected_blocks(entries: &[Option]) -> Vec Vec { - // Create mapping from TxidPrefix to pool index - let prefix_to_pool_idx: FxHashMap = live + // Map TxidPrefix -> pool index + let prefix_to_idx: FxHashMap = live .iter() .enumerate() - .map(|(pool_idx, (_, entry))| (entry.txid_prefix(), PoolIndex::from(pool_idx))) + .map(|(i, (_, entry))| (entry.txid_prefix(), PoolIndex::from(i))) .collect(); // Build pool with parent relationships - // Use Bitcoin Core's pre-computed ancestor_fee and ancestor_vsize let mut pool: Vec = live .iter() .enumerate() @@ -64,10 +48,10 @@ fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec { entry.ancestor_vsize, ); - // Find in-mempool parents from depends list (provided by Bitcoin Core) + // Add in-mempool parents for parent_prefix in &entry.depends { - if 
let Some(&parent_pool_idx) = prefix_to_pool_idx.get(parent_prefix) { - tx.parents.push(parent_pool_idx); + if let Some(&parent_idx) = prefix_to_idx.get(parent_prefix) { + tx.parents.push(parent_idx); } } @@ -76,43 +60,12 @@ fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec { .collect(); // Build child relationships (reverse of parents) - for pool_idx in 0..pool.len() { - let parents = pool[pool_idx].parents.clone(); - for parent_pool_idx in parents { - pool[parent_pool_idx.as_usize()].children.push(PoolIndex::from(pool_idx)); + for i in 0..pool.len() { + let parents = pool[i].parents.clone(); + for parent_idx in parents { + pool[parent_idx.as_usize()].children.push(PoolIndex::from(i)); } } pool } - -/// Partial sort: only fully sort the top N txs needed for blocks. -/// Returns pool indices sorted by ancestor score. -fn partial_sort_by_score(pool: &Pool) -> Vec { - let mut indices: Vec = (0..pool.len()).map(PoolIndex::from).collect(); - let needed = NUM_PROJECTED_BLOCKS * TXS_PER_BLOCK; - - // Comparator: descending by score, then ascending by index (deterministic tiebreaker) - let cmp = |a: &PoolIndex, b: &PoolIndex| -> std::cmp::Ordering { - let tx_a = &pool[*a]; - let tx_b = &pool[*b]; - if tx_b.has_higher_score_than(tx_a) { - std::cmp::Ordering::Greater - } else if tx_a.has_higher_score_than(tx_b) { - std::cmp::Ordering::Less - } else { - a.cmp(b) - } - }; - - if indices.len() > needed { - // Partition: move top `needed` to front (unordered), then sort just those - indices.select_nth_unstable_by(needed, cmp); - indices[..needed].sort_unstable_by(cmp); - indices.truncate(needed); - } else { - indices.sort_unstable_by(cmp); - } - - indices -} diff --git a/crates/brk_monitor/src/mempool/block_builder/selection.rs b/crates/brk_monitor/src/mempool/block_builder/selection.rs index 54e24e882..46ea7cf6b 100644 --- a/crates/brk_monitor/src/mempool/block_builder/selection.rs +++ b/crates/brk_monitor/src/mempool/block_builder/selection.rs @@ -1,174 
+1,233 @@ use std::collections::BinaryHeap; use brk_types::FeeRate; +use rustc_hash::FxHashSet; use super::audit::{Pool, TxPriority}; -use crate::mempool::{PoolIndex, SelectedTx}; +use crate::mempool::{MempoolTxIndex, PoolIndex, SelectedTx}; /// Target vsize per block (~1MB, derived from 4MW weight limit) const BLOCK_VSIZE_LIMIT: u64 = 1_000_000; -/// Select transactions into blocks using the two-source algorithm. +/// How many packages to look ahead when current doesn't fit +const LOOK_AHEAD: usize = 100; + +/// A CPFP package - atomic unit that must be included together. +struct Package { + /// Transactions in topological order (parents before children) + txs: Vec<(MempoolTxIndex, FeeRate)>, + /// Combined vsize of all txs + vsize: u64, + /// Package fee rate (same for all txs) + fee_rate: FeeRate, +} + +/// Select transactions into projected blocks. /// -/// Takes pool indices (sorted by score), returns SelectedTx with effective fee rate at selection time. -pub fn select_into_blocks( - pool: &mut Pool, - sorted_pool_indices: Vec, - num_blocks: usize, -) -> Vec> { - let mut blocks: Vec> = Vec::with_capacity(num_blocks); - let mut current_block: Vec = Vec::new(); - let mut current_vsize: u64 = 0; +/// Algorithm: +/// 1. Select txs via heap, grouping CPFP chains into atomic packages +/// 2. Sort packages by fee rate +/// 3. 
Partition into blocks (packages are atomic, never split) +pub fn select_into_blocks(pool: &mut Pool, num_blocks: usize) -> Vec> { + // Phase 1: Select and group into packages + let packages = select_packages(pool, num_blocks); - let mut sorted_iter = sorted_pool_indices.into_iter().peekable(); - let mut modified_queue: BinaryHeap = BinaryHeap::new(); + // Phase 2: Partition packages into blocks + partition_into_blocks(packages, num_blocks) +} - 'outer: loop { - // Pick best candidate from either sorted list or modified queue - let best_pool_idx = pick_best_candidate(pool, &mut sorted_iter, &mut modified_queue); - let Some(pool_idx) = best_pool_idx else { - break; - }; +/// Select txs and group CPFP chains into atomic packages. +fn select_packages(pool: &mut Pool, num_blocks: usize) -> Vec { + let target_vsize = BLOCK_VSIZE_LIMIT * num_blocks as u64; + let mut total_vsize: u64 = 0; + let mut packages: Vec = Vec::new(); - // Skip if already used - if pool[pool_idx].used { + let mut heap: BinaryHeap = (0..pool.len()) + .map(|i| TxPriority::new(&pool[PoolIndex::from(i)])) + .collect(); + + while let Some(entry) = heap.pop() { + let tx = &pool[entry.pool_idx]; + + if tx.used || entry.is_stale(tx) { continue; } - // Capture the package rate BEFORE selecting ancestors - // This is the rate that justified this tx (and its ancestors) for inclusion - let package_rate = { - let tx = &pool[pool_idx]; - FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize)) - }; + // Package rate at selection time + let package_rate = FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize)); - // Select this tx and all its unselected ancestors - let selected = select_with_ancestors(pool, pool_idx); + // Select this tx and all unselected ancestors (parents first) + let ancestors = select_with_ancestors(pool, entry.pool_idx); - for sel_pool_idx in selected { - let tx = &pool[sel_pool_idx]; - let tx_vsize = u64::from(tx.vsize); + let mut package_vsize: u64 = 0; + let mut txs = 
Vec::with_capacity(ancestors.len()); - // Check if tx fits in current block - if current_vsize + tx_vsize > BLOCK_VSIZE_LIMIT && !current_block.is_empty() { - blocks.push(std::mem::take(&mut current_block)); - current_vsize = 0; + for sel_idx in ancestors { + let vsize = u64::from(pool[sel_idx].vsize); + txs.push((pool[sel_idx].entries_idx, package_rate)); + package_vsize += vsize; - if blocks.len() >= num_blocks { - // Early exit - we have enough blocks - break 'outer; - } - } + update_descendants(pool, sel_idx, &mut heap); + } - // Effective fee rate = the package rate at selection time. - // This is the mining score that determined which block this tx lands in. - // For CPFP, both parent and child get the same package rate (the child's score). - current_block.push(SelectedTx { - entries_idx: tx.entries_idx, - effective_fee_rate: package_rate, - }); - current_vsize += tx_vsize; + total_vsize += package_vsize; + packages.push(Package { + txs, + vsize: package_vsize, + fee_rate: package_rate, + }); - // Update descendants' ancestor scores - update_descendants(pool, sel_pool_idx, &mut modified_queue); + if total_vsize >= target_vsize { + break; } } - // Don't forget the last block - if !current_block.is_empty() && blocks.len() < num_blocks { - blocks.push(current_block); + packages +} + +/// Sort packages by fee rate and partition into blocks. 
+fn partition_into_blocks(mut packages: Vec, num_blocks: usize) -> Vec> { + // Sort by fee rate descending + packages.sort_unstable_by(|a, b| b.fee_rate.cmp(&a.fee_rate)); + + // Debug: show top and bottom packages after sorting + log::info!("=== Top 10 packages after sorting ==="); + for (i, pkg) in packages.iter().take(10).enumerate() { + log::info!( + " #{}: rate={:.4} sat/vB, vsize={}, txs={}", + i, + f64::from(pkg.fee_rate), + pkg.vsize, + pkg.txs.len() + ); + } + log::info!("=== Bottom 10 packages after sorting ==="); + let start = packages.len().saturating_sub(10); + for (i, pkg) in packages.iter().skip(start).enumerate() { + log::info!( + " #{}: rate={:.4} sat/vB, vsize={}, txs={}", + start + i, + f64::from(pkg.fee_rate), + pkg.vsize, + pkg.txs.len() + ); } - // Post-process: fix fee rate ordering violations between adjacent blocks. - // This handles cases where a tx's score improved after its target block was full. - fix_block_ordering(&mut blocks); + let mut blocks: Vec> = Vec::with_capacity(num_blocks); + let mut current_block: Vec = Vec::new(); + let mut current_block_packages: Vec<(FeeRate, u64, usize)> = Vec::new(); // for debug + let mut current_vsize: u64 = 0; + let mut used = vec![false; packages.len()]; - // Log how many txs were left unselected - let total_selected: usize = blocks.iter().map(|b| b.len()).sum(); - log::debug!( - "Selected {} txs into {} blocks, modified_queue has {} remaining", - total_selected, - blocks.len(), - modified_queue.len() - ); + let mut i = 0; + while i < packages.len() && blocks.len() < num_blocks { + if used[i] { + i += 1; + continue; + } + + let remaining = BLOCK_VSIZE_LIMIT.saturating_sub(current_vsize); + + if packages[i].vsize <= remaining { + // Package fits in current block + current_block_packages.push((packages[i].fee_rate, packages[i].vsize, packages[i].txs.len())); + add_package_to_block(&packages[i], &mut current_block); + current_vsize += packages[i].vsize; + used[i] = true; + i += 1; + } else if 
current_block.is_empty() { + // Empty block - add package anyway (handles edge case) + current_block_packages.push((packages[i].fee_rate, packages[i].vsize, packages[i].txs.len())); + add_package_to_block(&packages[i], &mut current_block); + current_vsize += packages[i].vsize; + used[i] = true; + i += 1; + } else { + // Look ahead for a smaller package that fits + let mut found = false; + let look_ahead_end = (i + LOOK_AHEAD).min(packages.len()); + + for j in (i + 1)..look_ahead_end { + if used[j] || packages[j].vsize > remaining { + continue; + } + log::debug!( + "Look-ahead: adding pkg #{} (rate={:.4}) to block {} (min so far={:.4})", + j, + f64::from(packages[j].fee_rate), + blocks.len() + 1, + current_block_packages.iter().map(|(r, _, _)| f64::from(*r)).fold(f64::INFINITY, f64::min) + ); + current_block_packages.push((packages[j].fee_rate, packages[j].vsize, packages[j].txs.len())); + add_package_to_block(&packages[j], &mut current_block); + current_vsize += packages[j].vsize; + used[j] = true; + found = true; + break; + } + + if !found { + // No package fits, start new block + log_block_debug(blocks.len() + 1, &current_block_packages); + blocks.push(std::mem::take(&mut current_block)); + current_block_packages.clear(); + current_vsize = 0; + } + } + } + + if !current_block.is_empty() && blocks.len() < num_blocks { + log_block_debug(blocks.len() + 1, &current_block_packages); + blocks.push(current_block); + } blocks } -/// Pick the best candidate from sorted list or modified queue. -/// Returns a pool index. 
-fn pick_best_candidate( - pool: &Pool, - sorted_iter: &mut std::iter::Peekable>, - modified_queue: &mut BinaryHeap, -) -> Option { - // Skip used txs in sorted iterator - while sorted_iter.peek().is_some_and(|&idx| pool[idx].used) { - sorted_iter.next(); +fn log_block_debug(block_num: usize, packages: &[(FeeRate, u64, usize)]) { + if packages.is_empty() { + return; } + let mut sorted: Vec<_> = packages.to_vec(); + sorted.sort_by(|a, b| b.0.cmp(&a.0)); - // Skip used txs and stale entries in modified queue. - // A tx can be pushed multiple times as its score improves (when different ancestors are selected). - // For example: tx C depends on A and B. When A is selected, C is pushed with score 2.0. - // When B is selected, C is pushed again with score 4.0. The queue now has two entries for C. - // We skip the stale 2.0 entry and use the current 4.0 entry. - while let Some(p) = modified_queue.peek() { - let tx = &pool[p.pool_idx]; - if tx.used { - modified_queue.pop(); - continue; - } - // Check if this queue entry has outdated snapshot (a newer entry exists with better score) - if p.ancestor_fee != tx.ancestor_fee || p.ancestor_vsize != tx.ancestor_vsize { - modified_queue.pop(); - continue; - } - break; + log::info!("=== Block {} - {} packages ===", block_num, packages.len()); + log::info!(" Top 5:"); + for (rate, vsize, txs) in sorted.iter().take(5) { + log::info!(" rate={:.4} sat/vB, vsize={}, txs={}", f64::from(*rate), vsize, txs); } - - let sorted_best = sorted_iter.peek().map(|&idx| &pool[idx]); - let modified_best = modified_queue.peek().map(|p| &pool[p.pool_idx]); - - match (sorted_best, modified_best) { - (None, None) => None, - (Some(_), None) => sorted_iter.next(), - (None, Some(_)) => { - let p = modified_queue.pop().unwrap(); - Some(p.pool_idx) - } - (Some(sorted_tx), Some(modified_tx)) => { - // Compare CURRENT scores from pool (not stale snapshots) - if sorted_tx.has_higher_score_than(modified_tx) { - sorted_iter.next() - } else { - let p = 
modified_queue.pop().unwrap(); - Some(p.pool_idx) - } - } + log::info!(" Bottom 5:"); + let start = sorted.len().saturating_sub(5); + for (rate, vsize, txs) in sorted.iter().skip(start) { + log::info!(" rate={:.4} sat/vB, vsize={}, txs={}", f64::from(*rate), vsize, txs); } } -/// Select a tx and all its unselected ancestors (topological order). -/// Returns pool indices with parents before children. -fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec { - let mut to_select: Vec = Vec::new(); +fn add_package_to_block(package: &Package, block: &mut Vec) { + for (entries_idx, effective_fee_rate) in &package.txs { + block.push(SelectedTx { + entries_idx: *entries_idx, + effective_fee_rate: *effective_fee_rate, + }); + } +} - // Stack entries: (pool_idx, parents_processed) +/// Select a tx and all its unselected ancestors in topological order. +fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec { + let mut result: Vec = Vec::new(); let mut stack: Vec<(PoolIndex, bool)> = vec![(pool_idx, false)]; - while let Some((current, parents_processed)) = stack.pop() { - if pool[current].used { + while let Some((idx, parents_done)) = stack.pop() { + if pool[idx].used { continue; } - if parents_processed { - // All parents handled, select this tx - pool[current].used = true; - to_select.push(current); + if parents_done { + pool[idx].used = true; + result.push(idx); } else { - // First visit: push self for post-processing, then push parents - stack.push((current, true)); - for &parent in &pool[current].parents { + stack.push((idx, true)); + for &parent in &pool[idx].parents { if !pool[parent].used { stack.push((parent, false)); } @@ -176,122 +235,35 @@ fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec } } - to_select -} - -/// Fix fee rate ordering violations between blocks. -/// Ensures Block[i].min >= Block[i+1].max for all adjacent blocks. -/// -/// Uses cached min/max indices to avoid O(n) scans on each iteration. 
-fn fix_block_ordering(blocks: &mut [Vec]) { - if blocks.len() < 2 { - return; - } - - // Cache (min_idx, max_idx) for each block - let mut cache: Vec<(usize, usize)> = blocks - .iter() - .map(|block| find_min_max_indices(block)) - .collect(); - - let mut iterations = 0; - const MAX_ITERATIONS: usize = 100; - - loop { - let mut changed = false; - iterations += 1; - - for i in 0..blocks.len() - 1 { - let (curr_min_idx, _) = cache[i]; - let (_, next_max_idx) = cache[i + 1]; - - // Skip empty blocks - if blocks[i].is_empty() || blocks[i + 1].is_empty() { - continue; - } - - let curr_min = blocks[i][curr_min_idx].effective_fee_rate; - let next_max = blocks[i + 1][next_max_idx].effective_fee_rate; - - if next_max > curr_min { - // Swap: high-fee tx to earlier block, low-fee tx to later block - let high_tx = blocks[i + 1].swap_remove(next_max_idx); - let low_tx = blocks[i].swap_remove(curr_min_idx); - blocks[i].push(high_tx); - blocks[i + 1].push(low_tx); - - // Recompute cache only for affected blocks - cache[i] = find_min_max_indices(&blocks[i]); - cache[i + 1] = find_min_max_indices(&blocks[i + 1]); - changed = true; - } - } - - if !changed || iterations >= MAX_ITERATIONS { - break; - } - } - - if iterations >= MAX_ITERATIONS { - log::warn!("fix_block_ordering: reached max iterations, some violations may remain"); - } -} - -/// Find indices of min and max fee rate transactions in a block. -fn find_min_max_indices(block: &[SelectedTx]) -> (usize, usize) { - if block.is_empty() { - return (0, 0); - } - let mut min_idx = 0; - let mut max_idx = 0; - for (i, tx) in block.iter().enumerate().skip(1) { - if tx.effective_fee_rate < block[min_idx].effective_fee_rate { - min_idx = i; - } - if tx.effective_fee_rate > block[max_idx].effective_fee_rate { - max_idx = i; - } - } - (min_idx, max_idx) + result } /// Update descendants' ancestor scores after selecting a tx. -/// Takes a pool index. 
-fn update_descendants( - pool: &mut Pool, - selected_pool_idx: PoolIndex, - modified_queue: &mut BinaryHeap, -) { - let selected_fee = pool[selected_pool_idx].fee; - let selected_vsize = pool[selected_pool_idx].vsize; +fn update_descendants(pool: &mut Pool, selected_idx: PoolIndex, heap: &mut BinaryHeap) { + let selected_fee = pool[selected_idx].fee; + let selected_vsize = pool[selected_idx].vsize; - // Track visited to avoid double-subtracting in diamond patterns - let mut visited = rustc_hash::FxHashSet::default(); - - // BFS through children (children are pool indices) - let mut stack: Vec = pool[selected_pool_idx].children.to_vec(); + // Track visited to avoid double-updates in diamond patterns + let mut visited: FxHashSet = FxHashSet::default(); + let mut stack: Vec = pool[selected_idx].children.to_vec(); while let Some(child_idx) = stack.pop() { - // Skip if already visited (handles diamond patterns) if !visited.insert(child_idx) { continue; } let child = &mut pool[child_idx]; - if child.used { continue; } - // Subtract selected tx from ancestor totals + // Update ancestor totals child.ancestor_fee -= selected_fee; child.ancestor_vsize -= selected_vsize; - // Always re-push to modified queue with updated score. - // This may create duplicates, but we handle that by checking - // if the tx is used or if the snapshot is stale when popping. - modified_queue.push(TxPriority::new(child)); - child.in_modified = true; + // Increment generation and re-push to heap + child.generation += 1; + heap.push(TxPriority::new(child)); // Continue to grandchildren stack.extend(child.children.iter().copied());