diff --git a/crates/brk_monitor/src/mempool/block_builder/selection.rs b/crates/brk_monitor/src/mempool/block_builder/selection.rs index 88afc0902..54e24e882 100644 --- a/crates/brk_monitor/src/mempool/block_builder/selection.rs +++ b/crates/brk_monitor/src/mempool/block_builder/selection.rs @@ -149,32 +149,29 @@ fn pick_best_candidate( } /// Select a tx and all its unselected ancestors (topological order). -/// Takes and returns pool indices. +/// Returns pool indices with parents before children. fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec { let mut to_select: Vec = Vec::new(); - let mut stack = vec![pool_idx]; - // DFS to find all unselected ancestors - while let Some(current) = stack.pop() { - let tx = &pool[current]; - if tx.used { + // Stack entries: (pool_idx, parents_processed) + let mut stack: Vec<(PoolIndex, bool)> = vec![(pool_idx, false)]; + + while let Some((current, parents_processed)) = stack.pop() { + if pool[current].used { continue; } - // Push unselected parents onto stack (process parents first) - let has_unselected_parents = tx.parents.iter().any(|&p| !pool[p].used); - if has_unselected_parents { - stack.push(current); // Re-add self to process after parents - for &parent in &tx.parents { - if !pool[parent].used { - stack.push(parent); - } - } + if parents_processed { + // All parents handled, select this tx + pool[current].used = true; + to_select.push(current); } else { - // All parents selected, can select this one - if !pool[current].used { - pool[current].used = true; - to_select.push(current); + // First visit: push self for post-processing, then push parents + stack.push((current, true)); + for &parent in &pool[current].parents { + if !pool[parent].used { + stack.push((parent, false)); + } } } } @@ -183,50 +180,56 @@ fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec } /// Fix fee rate ordering violations between blocks. -/// Swaps txs between adjacent blocks until Block N's min >= Block N+1's max. +/// Ensures Block[i].min >= Block[i+1].max for all adjacent blocks. +/// +/// Uses cached min/max indices to avoid O(n) scans on each iteration. fn fix_block_ordering(blocks: &mut [Vec]) { - // Iterate until no more swaps needed - let mut changed = true; - let mut iterations = 0; - const MAX_ITERATIONS: usize = 100; // Prevent infinite loops + if blocks.len() < 2 { + return; + } - while changed && iterations < MAX_ITERATIONS { - changed = false; + // Cache (min_idx, max_idx) for each block + let mut cache: Vec<(usize, usize)> = blocks + .iter() + .map(|block| find_min_max_indices(block)) + .collect(); + + let mut iterations = 0; + const MAX_ITERATIONS: usize = 100; + + loop { + let mut changed = false; iterations += 1; - for i in 0..blocks.len().saturating_sub(1) { - // Find min in block i and max in block i+1 - let Some(curr_min_idx) = blocks[i] - .iter() - .enumerate() - .min_by_key(|(_, s)| s.effective_fee_rate) - .map(|(idx, _)| idx) - else { - continue; - }; + for i in 0..blocks.len() - 1 { + let (curr_min_idx, _) = cache[i]; + let (_, next_max_idx) = cache[i + 1]; - let Some(next_max_idx) = blocks[i + 1] - .iter() - .enumerate() - .max_by_key(|(_, s)| s.effective_fee_rate) - .map(|(idx, _)| idx) - else { + // Skip empty blocks + if blocks[i].is_empty() || blocks[i + 1].is_empty() { continue; - }; + } let curr_min = blocks[i][curr_min_idx].effective_fee_rate; let next_max = blocks[i + 1][next_max_idx].effective_fee_rate; - // If violation exists, swap the two txs if next_max > curr_min { - // Swap: move high-fee tx to earlier block, low-fee tx to later block + // Swap: high-fee tx to earlier block, low-fee tx to later block let high_tx = blocks[i + 1].swap_remove(next_max_idx); let low_tx = blocks[i].swap_remove(curr_min_idx); blocks[i].push(high_tx); blocks[i + 1].push(low_tx); + + // Recompute cache only for affected blocks + cache[i] = find_min_max_indices(&blocks[i]); + cache[i + 1] = find_min_max_indices(&blocks[i + 1]); changed = true; } } + + if !changed || iterations >= MAX_ITERATIONS { + break; + } } if iterations >= MAX_ITERATIONS { @@ -234,6 +237,24 @@ fn fix_block_ordering(blocks: &mut [Vec]) { } } +/// Find indices of min and max fee rate transactions in a block. +fn find_min_max_indices(block: &[SelectedTx]) -> (usize, usize) { + if block.is_empty() { + return (0, 0); + } + let mut min_idx = 0; + let mut max_idx = 0; + for (i, tx) in block.iter().enumerate().skip(1) { + if tx.effective_fee_rate < block[min_idx].effective_fee_rate { + min_idx = i; + } + if tx.effective_fee_rate > block[max_idx].effective_fee_rate { + max_idx = i; + } + } + (min_idx, max_idx) +} + /// Update descendants' ancestor scores after selecting a tx. /// Takes a pool index. fn update_descendants( diff --git a/crates/brk_monitor/src/mempool/monitor.rs b/crates/brk_monitor/src/mempool/monitor.rs index 8e7083bef..6729d2af0 100644 --- a/crates/brk_monitor/src/mempool/monitor.rs +++ b/crates/brk_monitor/src/mempool/monitor.rs @@ -133,7 +133,7 @@ impl MempoolInner { let new_txs = self.fetch_new_txs(¤t_txids); // Apply changes using the entry info (has fees) and full txs (for addresses) - let has_changes = self.apply_changes(&entries_info, &new_txs); + let has_changes = self.apply_changes(&entries_info, new_txs); if has_changes { self.dirty.store(true, Ordering::Release); @@ -146,13 +146,19 @@ impl MempoolInner { /// Fetch full transaction data for new txids (needed for address tracking). fn fetch_new_txs(&self, current_txids: &FxHashSet) -> FxHashMap { - let txs = self.txs.read(); - current_txids - .iter() - .filter(|txid| !txs.contains_key(*txid)) - .take(MAX_TX_FETCHES_PER_CYCLE) - .cloned() - .collect::>() + // Collect txids to fetch while holding read lock, then release it + let txids_to_fetch: Vec = { + let txs = self.txs.read(); + current_txids + .iter() + .filter(|txid| !txs.contains_key(*txid)) + .take(MAX_TX_FETCHES_PER_CYCLE) + .cloned() + .collect() + }; + + // Make RPC calls without holding the lock + txids_to_fetch .into_iter() .filter_map(|txid| { self.client @@ -167,7 +173,7 @@ impl MempoolInner { fn apply_changes( &self, entries_info: &[MempoolEntryInfo], - new_txs: &FxHashMap, + new_txs: FxHashMap, ) -> bool { // Build lookup map for current entries let current_entries: FxHashMap = entries_info @@ -194,7 +200,7 @@ impl MempoolInner { let tx = tx_with_hex.tx(); info.remove(tx); - Self::update_address_stats_on_removal(tx, txid, &mut addresses); + Self::update_address_stats(tx, txid, &mut addresses, false); // Remove from slot-based storage if let Some(idx) = block_state.txid_prefix_to_idx.remove(&prefix) { @@ -208,7 +214,7 @@ impl MempoolInner { }); // Add new transactions - for (txid, tx_with_hex) in new_txs { + for (txid, tx_with_hex) in &new_txs { let tx = tx_with_hex.tx(); let prefix = TxidPrefix::from(txid); @@ -220,7 +226,7 @@ impl MempoolInner { let entry = MempoolEntry::from_info(entry_info); info.add(tx); - Self::update_address_stats_on_addition(tx, txid, &mut addresses); + Self::update_address_stats(tx, txid, &mut addresses, true); // Allocate slot let idx = if let Some(idx) = block_state.free_indices.pop() { @@ -234,7 +240,7 @@ impl MempoolInner { block_state.txid_prefix_to_idx.insert(prefix, idx); } - txs.extend(new_txs.clone()); + txs.extend(new_txs); had_removals || had_additions } @@ -281,60 +287,42 @@ impl MempoolInner { *self.snapshot.write() = snapshot; } - fn update_address_stats_on_removal( + fn update_address_stats( tx: &brk_types::Transaction, txid: &Txid, addresses: &mut FxHashMap)>, + is_addition: bool, ) { - // Inputs: undo "sending" state + // Inputs: track sending tx.input .iter() .flat_map(|txin| txin.prevout.as_ref()) .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) .for_each(|(txout, bytes)| { let (stats, set) = addresses.entry(bytes).or_default(); - set.remove(txid); - stats.sent(txout); + if is_addition { + set.insert(txid.clone()); + stats.sending(txout); + } else { + set.remove(txid); + stats.sent(txout); + } stats.update_tx_count(set.len() as u32); }); - // Outputs: undo "receiving" state + // Outputs: track receiving tx.output .iter() .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) .for_each(|(txout, bytes)| { let (stats, set) = addresses.entry(bytes).or_default(); - set.remove(txid); - stats.received(txout); - stats.update_tx_count(set.len() as u32); - }); - } - - fn update_address_stats_on_addition( - tx: &brk_types::Transaction, - txid: &Txid, - addresses: &mut FxHashMap)>, - ) { - // Inputs: mark as "sending" - tx.input - .iter() - .flat_map(|txin| txin.prevout.as_ref()) - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.insert(txid.clone()); - stats.sending(txout); - stats.update_tx_count(set.len() as u32); - }); - - // Outputs: mark as "receiving" - tx.output - .iter() - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.insert(txid.clone()); - stats.receiving(txout); + if is_addition { + set.insert(txid.clone()); + stats.receiving(txout); + } else { + set.remove(txid); + stats.received(txout); + } stats.update_tx_count(set.len() as u32); }); }