mempool: fixes

This commit is contained in:
nym21
2026-05-10 19:40:02 +02:00
parent dd6eca138b
commit 445c60a6f1
5 changed files with 11 additions and 40 deletions

View File

@@ -32,7 +32,7 @@ fn main() -> Result<()> {
"info.count={} txs={} unresolved={} addrs={} outpoints={} \ "info.count={} txs={} unresolved={} addrs={} outpoints={} \
graveyard.tombstones={} graveyard.order={} \ graveyard.tombstones={} graveyard.order={} \
snap.txs.len={} snap.blocks={} snap.blocks_txs={} \ snap.txs.len={} snap.blocks={} snap.blocks_txs={} \
rebuilds={} skip.clean={}", rebuilds={}",
info_count, info_count,
stats.txs, stats.txs,
stats.unresolved, stats.unresolved,
@@ -44,7 +44,6 @@ fn main() -> Result<()> {
snapshot.blocks.len(), snapshot.blocks.len(),
blocks_tx_total, blocks_tx_total,
stats.rebuilds, stats.rebuilds,
stats.skip_cleans,
); );
} }
} }

View File

@@ -13,7 +13,6 @@ pub struct MempoolStats {
pub graveyard_tombstones: usize, pub graveyard_tombstones: usize,
pub graveyard_order: usize, pub graveyard_order: usize,
pub rebuilds: u64, pub rebuilds: u64,
pub skip_cleans: u64,
} }
impl From<&Mempool> for MempoolStats { impl From<&Mempool> for MempoolStats {
@@ -28,7 +27,6 @@ impl From<&Mempool> for MempoolStats {
graveyard_tombstones: state.graveyard.tombstones_len(), graveyard_tombstones: state.graveyard.tombstones_len(),
graveyard_order: state.graveyard.order_len(), graveyard_order: state.graveyard.order_len(),
rebuilds: rebuilder.rebuild_count(), rebuilds: rebuilder.rebuild_count(),
skip_cleans: rebuilder.skip_clean_count(),
} }
} }
} }

View File

@@ -354,9 +354,9 @@ impl Mempool {
min_fee, min_fee,
} = Fetcher::fetch(client, state)?; } = Fetcher::fetch(client, state)?;
let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state); let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state);
let changed = Applier::apply(state, rebuilder, pulled); Applier::apply(state, rebuilder, pulled);
Prevouts::fill(state, resolver); Prevouts::fill(state, resolver);
rebuilder.tick(state, changed, &gbt_txids, min_fee); rebuilder.tick(state, &gbt_txids, min_fee);
Ok(()) Ok(())
} }

View File

@@ -14,23 +14,17 @@ use crate::{
pub struct Applier; pub struct Applier;
impl Applier { impl Applier {
/// Returns true iff anything changed.
///
/// `rebuilder` supplies the previous cycle's snapshot. Burial reads /// `rebuilder` supplies the previous cycle's snapshot. Burial reads
/// each tomb's `chunk_rate` from the snapshot (always-fresh, /// each tomb's `chunk_rate` from the snapshot (always-fresh,
/// package-aware via local linearization). The fallback to /// package-aware via local linearization). The fallback to
/// `entry.fee_rate()` is unreachable in steady state - every burial /// `entry.fee_rate()` is unreachable in steady state - every burial
/// target was alive at the previous tick, so the snapshot has it. /// target was alive at the previous tick, so the snapshot has it.
pub fn apply(lock: &RwLock<State>, rebuilder: &Rebuilder, pulled: TxsPulled) -> bool { pub fn apply(lock: &RwLock<State>, rebuilder: &Rebuilder, pulled: TxsPulled) {
let TxsPulled { added, removed } = pulled; let TxsPulled { added, removed } = pulled;
let has_changes = !added.is_empty() || !removed.is_empty();
let mut state = lock.write(); let mut state = lock.write();
Self::bury_removals(&mut state, rebuilder, removed); Self::bury_removals(&mut state, rebuilder, removed);
Self::publish_additions(&mut state, added); Self::publish_additions(&mut state, added);
state.graveyard.evict_old(); state.graveyard.evict_old();
has_changes
} }
fn bury_removals( fn bury_removals(

View File

@@ -2,7 +2,7 @@ use std::{
collections::VecDeque, collections::VecDeque,
sync::{ sync::{
Arc, Arc,
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
}, },
}; };
@@ -29,31 +29,16 @@ pub struct Rebuilder {
snapshot: RwLock<Arc<Snapshot>>, snapshot: RwLock<Arc<Snapshot>>,
/// Past block-0 txid sets keyed by `next_block_hash`, oldest first. /// Past block-0 txid sets keyed by `next_block_hash`, oldest first.
history: RwLock<VecDeque<(NextBlockHash, FxHashSet<Txid>)>>, history: RwLock<VecDeque<(NextBlockHash, FxHashSet<Txid>)>>,
dirty: AtomicBool,
rebuild_count: AtomicU64, rebuild_count: AtomicU64,
skip_clean: AtomicU64,
} }
impl Rebuilder { impl Rebuilder {
/// Mark dirty if the cycle changed mempool state, then rebuild iff /// Rebuild the snapshot every cycle. The build is pure CPU on
/// the dirty bit is set. Cycle pacing is the driver loop's job; the /// already-fetched data and `min_fee` participates in the result,
/// rebuild itself is pure CPU on already-fetched data. The dirty /// so a "skip if no add/remove" gate would freeze the served fees
/// bit is cleared only after the snapshot is published, so a panic /// when Core's `mempoolminfee` drifts on a quiet pool. Cycle pacing
/// in `build_snapshot` retries on the next cycle. /// is the driver loop's job.
pub fn tick( pub fn tick(&self, lock: &RwLock<State>, gbt_txids: &[Txid], min_fee: FeeRate) {
&self,
lock: &RwLock<State>,
changed: bool,
gbt_txids: &[Txid],
min_fee: FeeRate,
) {
if changed {
self.dirty.store(true, Ordering::Release);
}
if !self.dirty.load(Ordering::Acquire) {
self.skip_clean.fetch_add(1, Ordering::Relaxed);
return;
}
let snap = Self::build_snapshot(lock, gbt_txids, min_fee); let snap = Self::build_snapshot(lock, gbt_txids, min_fee);
let block0_set: FxHashSet<Txid> = snap.block0_txids().collect(); let block0_set: FxHashSet<Txid> = snap.block0_txids().collect();
let next_hash = snap.next_block_hash; let next_hash = snap.next_block_hash;
@@ -67,7 +52,6 @@ impl Rebuilder {
} }
drop(hist); drop(hist);
self.dirty.store(false, Ordering::Release);
self.rebuild_count.fetch_add(1, Ordering::Relaxed); self.rebuild_count.fetch_add(1, Ordering::Relaxed);
} }
@@ -86,10 +70,6 @@ impl Rebuilder {
self.rebuild_count.load(Ordering::Relaxed) self.rebuild_count.load(Ordering::Relaxed)
} }
pub fn skip_clean_count(&self) -> u64 {
self.skip_clean.load(Ordering::Relaxed)
}
fn build_snapshot( fn build_snapshot(
lock: &RwLock<State>, lock: &RwLock<State>,
gbt_txids: &[Txid], gbt_txids: &[Txid],