mempool: snapshot partial

This commit is contained in:
nym21
2025-12-13 16:42:54 +01:00
parent c59ac62e45
commit c5e9b75261
2 changed files with 103 additions and 94 deletions
@@ -149,32 +149,29 @@ fn pick_best_candidate(
}
/// Select a tx and all its unselected ancestors (topological order).
/// Takes and returns pool indices.
/// Returns pool indices with parents before children.
fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec<PoolIndex> {
let mut to_select: Vec<PoolIndex> = Vec::new();
let mut stack = vec![pool_idx];
// DFS to find all unselected ancestors
while let Some(current) = stack.pop() {
let tx = &pool[current];
if tx.used {
// Stack entries: (pool_idx, parents_processed)
let mut stack: Vec<(PoolIndex, bool)> = vec![(pool_idx, false)];
while let Some((current, parents_processed)) = stack.pop() {
if pool[current].used {
continue;
}
// Push unselected parents onto stack (process parents first)
let has_unselected_parents = tx.parents.iter().any(|&p| !pool[p].used);
if has_unselected_parents {
stack.push(current); // Re-add self to process after parents
for &parent in &tx.parents {
if !pool[parent].used {
stack.push(parent);
}
}
if parents_processed {
// All parents handled, select this tx
pool[current].used = true;
to_select.push(current);
} else {
// All parents selected, can select this one
if !pool[current].used {
pool[current].used = true;
to_select.push(current);
// First visit: push self for post-processing, then push parents
stack.push((current, true));
for &parent in &pool[current].parents {
if !pool[parent].used {
stack.push((parent, false));
}
}
}
}
@@ -183,50 +180,56 @@ fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec<PoolIndex>
}
/// Fix fee rate ordering violations between blocks.
/// Swaps txs between adjacent blocks until Block N's min >= Block N+1's max.
/// Ensures Block[i].min >= Block[i+1].max for all adjacent blocks.
///
/// Uses cached min/max indices to avoid O(n) scans on each iteration.
fn fix_block_ordering(blocks: &mut [Vec<SelectedTx>]) {
// Iterate until no more swaps needed
let mut changed = true;
let mut iterations = 0;
const MAX_ITERATIONS: usize = 100; // Prevent infinite loops
if blocks.len() < 2 {
return;
}
while changed && iterations < MAX_ITERATIONS {
changed = false;
// Cache (min_idx, max_idx) for each block
let mut cache: Vec<(usize, usize)> = blocks
.iter()
.map(|block| find_min_max_indices(block))
.collect();
let mut iterations = 0;
const MAX_ITERATIONS: usize = 100;
loop {
let mut changed = false;
iterations += 1;
for i in 0..blocks.len().saturating_sub(1) {
// Find min in block i and max in block i+1
let Some(curr_min_idx) = blocks[i]
.iter()
.enumerate()
.min_by_key(|(_, s)| s.effective_fee_rate)
.map(|(idx, _)| idx)
else {
continue;
};
for i in 0..blocks.len() - 1 {
let (curr_min_idx, _) = cache[i];
let (_, next_max_idx) = cache[i + 1];
let Some(next_max_idx) = blocks[i + 1]
.iter()
.enumerate()
.max_by_key(|(_, s)| s.effective_fee_rate)
.map(|(idx, _)| idx)
else {
// Skip empty blocks
if blocks[i].is_empty() || blocks[i + 1].is_empty() {
continue;
};
}
let curr_min = blocks[i][curr_min_idx].effective_fee_rate;
let next_max = blocks[i + 1][next_max_idx].effective_fee_rate;
// If violation exists, swap the two txs
if next_max > curr_min {
// Swap: move high-fee tx to earlier block, low-fee tx to later block
// Swap: high-fee tx to earlier block, low-fee tx to later block
let high_tx = blocks[i + 1].swap_remove(next_max_idx);
let low_tx = blocks[i].swap_remove(curr_min_idx);
blocks[i].push(high_tx);
blocks[i + 1].push(low_tx);
// Recompute cache only for affected blocks
cache[i] = find_min_max_indices(&blocks[i]);
cache[i + 1] = find_min_max_indices(&blocks[i + 1]);
changed = true;
}
}
if !changed || iterations >= MAX_ITERATIONS {
break;
}
}
if iterations >= MAX_ITERATIONS {
@@ -234,6 +237,24 @@ fn fix_block_ordering(blocks: &mut [Vec<SelectedTx>]) {
}
}
/// Find indices of min and max fee rate transactions in a block.
fn find_min_max_indices(block: &[SelectedTx]) -> (usize, usize) {
if block.is_empty() {
return (0, 0);
}
let mut min_idx = 0;
let mut max_idx = 0;
for (i, tx) in block.iter().enumerate().skip(1) {
if tx.effective_fee_rate < block[min_idx].effective_fee_rate {
min_idx = i;
}
if tx.effective_fee_rate > block[max_idx].effective_fee_rate {
max_idx = i;
}
}
(min_idx, max_idx)
}
/// Update descendants' ancestor scores after selecting a tx.
/// Takes a pool index.
fn update_descendants(
+37 -49
View File
@@ -133,7 +133,7 @@ impl MempoolInner {
let new_txs = self.fetch_new_txs(&current_txids);
// Apply changes using the entry info (has fees) and full txs (for addresses)
let has_changes = self.apply_changes(&entries_info, &new_txs);
let has_changes = self.apply_changes(&entries_info, new_txs);
if has_changes {
self.dirty.store(true, Ordering::Release);
@@ -146,13 +146,19 @@ impl MempoolInner {
/// Fetch full transaction data for new txids (needed for address tracking).
fn fetch_new_txs(&self, current_txids: &FxHashSet<Txid>) -> FxHashMap<Txid, TxWithHex> {
let txs = self.txs.read();
current_txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.cloned()
.collect::<Vec<_>>()
// Collect txids to fetch while holding read lock, then release it
let txids_to_fetch: Vec<Txid> = {
let txs = self.txs.read();
current_txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.cloned()
.collect()
};
// Make RPC calls without holding the lock
txids_to_fetch
.into_iter()
.filter_map(|txid| {
self.client
@@ -167,7 +173,7 @@ impl MempoolInner {
fn apply_changes(
&self,
entries_info: &[MempoolEntryInfo],
new_txs: &FxHashMap<Txid, TxWithHex>,
new_txs: FxHashMap<Txid, TxWithHex>,
) -> bool {
// Build lookup map for current entries
let current_entries: FxHashMap<TxidPrefix, &MempoolEntryInfo> = entries_info
@@ -194,7 +200,7 @@ impl MempoolInner {
let tx = tx_with_hex.tx();
info.remove(tx);
Self::update_address_stats_on_removal(tx, txid, &mut addresses);
Self::update_address_stats(tx, txid, &mut addresses, false);
// Remove from slot-based storage
if let Some(idx) = block_state.txid_prefix_to_idx.remove(&prefix) {
@@ -208,7 +214,7 @@ impl MempoolInner {
});
// Add new transactions
for (txid, tx_with_hex) in new_txs {
for (txid, tx_with_hex) in &new_txs {
let tx = tx_with_hex.tx();
let prefix = TxidPrefix::from(txid);
@@ -220,7 +226,7 @@ impl MempoolInner {
let entry = MempoolEntry::from_info(entry_info);
info.add(tx);
Self::update_address_stats_on_addition(tx, txid, &mut addresses);
Self::update_address_stats(tx, txid, &mut addresses, true);
// Allocate slot
let idx = if let Some(idx) = block_state.free_indices.pop() {
@@ -234,7 +240,7 @@ impl MempoolInner {
block_state.txid_prefix_to_idx.insert(prefix, idx);
}
txs.extend(new_txs.clone());
txs.extend(new_txs);
had_removals || had_additions
}
@@ -281,60 +287,42 @@ impl MempoolInner {
*self.snapshot.write() = snapshot;
}
fn update_address_stats_on_removal(
fn update_address_stats(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
is_addition: bool,
) {
// Inputs: undo "sending" state
// Inputs: track sending
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.sent(txout);
if is_addition {
set.insert(txid.clone());
stats.sending(txout);
} else {
set.remove(txid);
stats.sent(txout);
}
stats.update_tx_count(set.len() as u32);
});
// Outputs: undo "receiving" state
// Outputs: track receiving
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.received(txout);
stats.update_tx_count(set.len() as u32);
});
}
fn update_address_stats_on_addition(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: mark as "sending"
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.sending(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: mark as "receiving"
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.receiving(txout);
if is_addition {
set.insert(txid.clone());
stats.receiving(txout);
} else {
set.remove(txid);
stats.received(txout);
}
stats.update_tx_count(set.len() as u32);
});
}