From d2b8992932d8717dc4f1c83e12a1835d50eaa97f Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 7 May 2026 18:56:44 +0200 Subject: [PATCH] mempool: cleanups --- crates/brk_mempool/examples/mempool.rs | 49 ++------- crates/brk_mempool/src/cpfp.rs | 53 ++++----- crates/brk_mempool/src/diagnostics.rs | 34 ++++++ crates/brk_mempool/src/inner.rs | 19 ---- crates/brk_mempool/src/lib.rs | 104 +++++++----------- crates/brk_mempool/src/prevouts.rs | 20 ++-- crates/brk_mempool/src/state.rs | 35 ++++++ crates/brk_mempool/src/steps/applier.rs | 52 ++++----- crates/brk_mempool/src/steps/fetcher/mod.rs | 8 +- crates/brk_mempool/src/steps/preparer/mod.rs | 12 +- crates/brk_mempool/src/steps/rebuilder/mod.rs | 28 ++--- .../src/steps/rebuilder/snapshot/mod.rs | 4 - crates/brk_mempool/src/stores/mod.rs | 4 +- 13 files changed, 196 insertions(+), 226 deletions(-) create mode 100644 crates/brk_mempool/src/diagnostics.rs delete mode 100644 crates/brk_mempool/src/inner.rs create mode 100644 crates/brk_mempool/src/state.rs diff --git a/crates/brk_mempool/examples/mempool.rs b/crates/brk_mempool/examples/mempool.rs index ebf9c032d..85b1873b5 100644 --- a/crates/brk_mempool/examples/mempool.rs +++ b/crates/brk_mempool/examples/mempool.rs @@ -4,31 +4,6 @@ use brk_error::Result; use brk_mempool::Mempool; use brk_rpc::{Auth, Client}; -#[derive(Debug, Clone)] -struct MempoolStats { - info_count: usize, - tx_count: usize, - unresolved_count: usize, - addr_count: usize, - outpoint_spend_count: usize, - graveyard_tombstone_count: usize, - graveyard_order_count: usize, -} - -impl From<&Mempool> for MempoolStats { - fn from(mempool: &Mempool) -> Self { - Self { - info_count: mempool.info().count, - tx_count: mempool.tx_count(), - unresolved_count: mempool.unresolved_count(), - addr_count: mempool.addr_count(), - outpoint_spend_count: mempool.outpoint_spend_count(), - graveyard_tombstone_count: mempool.graveyard_tombstone_count(), - graveyard_order_count: mempool.graveyard_order_count(), - } - } -} - fn main() -> Result<()> { brk_logger::init(None)?; @@ -48,9 +23,9 @@ fn main() -> Result<()> { loop { thread::sleep(Duration::from_secs(5)); - let stats = MempoolStats::from(&mempool); + let info_count = mempool.info().count; + let stats = mempool.stats(); let snapshot = mempool.snapshot(); - let blocks_tx_total: usize = snapshot.blocks.iter().map(|b| b.len()).sum(); println!( @@ -58,18 +33,18 @@ fn main() -> Result<()> { graveyard.tombstones={} graveyard.order={} \ snap.txs.len={} snap.blocks={} snap.blocks_txs={} \ rebuilds={} skip.clean={}", - stats.info_count, - stats.tx_count, - stats.unresolved_count, - stats.addr_count, - stats.outpoint_spend_count, - stats.graveyard_tombstone_count, - stats.graveyard_order_count, - snapshot.txs_len(), + info_count, + stats.txs, + stats.unresolved, + stats.addrs, + stats.outpoint_spends, + stats.graveyard_tombstones, + stats.graveyard_order, + snapshot.txs.len(), snapshot.blocks.len(), blocks_tx_total, - mempool.rebuild_count(), - mempool.skip_clean_count(), + stats.rebuilds, + stats.skip_cleans, ); } } diff --git a/crates/brk_mempool/src/cpfp.rs b/crates/brk_mempool/src/cpfp.rs index d86e77a0e..a681e9b87 100644 --- a/crates/brk_mempool/src/cpfp.rs +++ b/crates/brk_mempool/src/cpfp.rs @@ -5,12 +5,13 @@ //! the proxy fallback). The walk is a pair of capped DFSes, then the //! cluster wire shape is materialized from the visited set. +use std::cmp::Reverse; + use brk_types::{ CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, SigOps, TxidPrefix, VSize, }; use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; -use smallvec::SmallVec; use crate::Mempool; use crate::steps::{SnapTx, TxIndex}; @@ -39,7 +40,7 @@ impl Mempool { } } -pub(crate) fn build_cpfp_info( +fn build_cpfp_info( txs: &[SnapTx], seed_idx: TxIndex, seed: &SnapTx, @@ -158,39 +159,29 @@ fn build_cluster( } } +/// Group cluster members into chunks by descending `chunk_rate`. Cluster +/// size is bounded by `2 * MAX + 1` so a sort-then-fold is cheaper and +/// simpler than a hashmap keyed on `f64` bits. fn chunk_groups( members: &[TxIndex], txs: &[SnapTx], local_of: &FxHashMap, ) -> Vec { - let mut groups: FxHashMap)> = - FxHashMap::with_capacity_and_hasher(members.len(), FxBuildHasher); - let mut order: Vec = Vec::new(); - for &idx in members { - let Some(t) = txs.get(idx.as_usize()) else { - continue; - }; - let key = f64::from(t.chunk_rate).to_bits(); - let local = local_of[&idx]; - groups - .entry(key) - .and_modify(|(_, v)| v.push(local)) - .or_insert_with(|| { - order.push(key); - let mut v: SmallVec<[CpfpClusterTxIndex; 4]> = SmallVec::new(); - v.push(local); - (t.chunk_rate, v) - }); - } - order.sort_by_key(|k| std::cmp::Reverse(groups[k].0)); - order - .into_iter() - .map(|k| { - let (rate, txs) = groups.remove(&k).unwrap(); - CpfpClusterChunk { - txs: txs.into_vec(), + let mut entries: Vec<(FeeRate, CpfpClusterTxIndex)> = members + .iter() + .filter_map(|&idx| Some((txs.get(idx.as_usize())?.chunk_rate, local_of[&idx]))) + .collect(); + entries.sort_by_key(|e| Reverse(e.0)); + + let mut chunks: Vec = Vec::new(); + for (rate, local) in entries { + match chunks.last_mut() { + Some(last) if last.feerate == rate => last.txs.push(local), + _ => chunks.push(CpfpClusterChunk { + txs: vec![local], feerate: rate, - } - }) - .collect() + }), + } + } + chunks } diff --git a/crates/brk_mempool/src/diagnostics.rs b/crates/brk_mempool/src/diagnostics.rs new file mode 100644 index 000000000..e0812b7d9 --- /dev/null +++ b/crates/brk_mempool/src/diagnostics.rs @@ -0,0 +1,34 @@ +//! Cycle-internal counters surfaced for observability and the +//! `examples/mempool.rs` driver. Captured under a single read guard +//! by `MempoolStats::from(&Mempool)`. + +use crate::Mempool; + +#[derive(Debug, Clone, Default)] +pub struct MempoolStats { + pub txs: usize, + pub unresolved: usize, + pub addrs: usize, + pub outpoint_spends: usize, + pub graveyard_tombstones: usize, + pub graveyard_order: usize, + pub rebuilds: u64, + pub skip_cleans: u64, +} + +impl From<&Mempool> for MempoolStats { + fn from(mempool: &Mempool) -> Self { + let inner = mempool.read(); + let rebuilder = mempool.rebuilder(); + Self { + txs: inner.txs.len(), + unresolved: inner.txs.unresolved().len(), + addrs: inner.addrs.len(), + outpoint_spends: inner.outpoint_spends.len(), + graveyard_tombstones: inner.graveyard.tombstones_len(), + graveyard_order: inner.graveyard.order_len(), + rebuilds: rebuilder.rebuild_count(), + skip_cleans: rebuilder.skip_clean_count(), + } + } +} diff --git a/crates/brk_mempool/src/inner.rs b/crates/brk_mempool/src/inner.rs deleted file mode 100644 index 4f6f16a61..000000000 --- a/crates/brk_mempool/src/inner.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Single-locked container for the live mempool. -//! -//! All cycle steps and read-side accessors take a guard on this one -//! lock. The substructures are plain owned types — they used to each -//! own a RwLock, but the canonical lock-order discipline disappears -//! when there's nothing to order. - -use brk_types::MempoolInfo; - -use crate::stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore}; - -#[derive(Default)] -pub struct MempoolInner { - pub info: MempoolInfo, - pub txs: TxStore, - pub addrs: AddrTracker, - pub outpoint_spends: OutpointSpends, - pub graveyard: TxGraveyard, -} diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index fc44d4814..0bc21857e 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -10,7 +10,7 @@ //! 2. [`steps::preparer::Preparer`] - decode and classify into //! `TxsPulled { added, removed }`. Pure CPU. //! 3. [`steps::applier::Applier`] - apply the diff to -//! [`inner::MempoolInner`] under a single write lock. +//! [`state::State`] under a single write lock. //! 4. [`prevouts::fill`] - fills `prevout: None` inputs in one pass, //! using same-cycle in-mempool parents directly and the //! caller-supplied resolver (default: `getrawtransaction`) for @@ -19,6 +19,7 @@ //! projected-blocks `Snapshot` from the same-cycle GBT and min fee. use std::{ + cmp::Reverse, panic::{AssertUnwindSafe, catch_unwind}, sync::Arc, thread, @@ -35,12 +36,14 @@ use parking_lot::{RwLock, RwLockReadGuard}; use tracing::error; mod cpfp; -mod inner; +mod diagnostics; mod prevouts; mod rbf; +mod state; pub(crate) mod steps; pub(crate) mod stores; +pub use diagnostics::MempoolStats; pub use rbf::{RbfForTx, RbfNode}; use steps::{Applier, Fetched, Fetcher, Preparer, Rebuilder}; pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval}; @@ -51,23 +54,23 @@ pub use stores::{TxGraveyard, TxStore, TxTombstone}; /// `TxOut` if the parent is reachable, `None` otherwise. pub type PrevoutResolver = Box Option + Send + Sync>; -pub(crate) use inner::MempoolInner; +pub(crate) use state::State; /// Cheaply cloneable: clones share one live mempool via `Arc`. #[derive(Clone)] -pub struct Mempool(Arc); +pub struct Mempool(Arc); -struct Inner { +struct Shared { client: Client, - lock: RwLock, + state: RwLock, rebuilder: Rebuilder, } impl Mempool { pub fn new(client: &Client) -> Self { - Self(Arc::new(Inner { + Self(Arc::new(Shared { client: client.clone(), - lock: RwLock::new(MempoolInner::default()), + state: RwLock::new(State::default()), rebuilder: Rebuilder::default(), })) } @@ -80,12 +83,13 @@ impl Mempool { self.0.rebuilder.snapshot() } - pub fn rebuild_count(&self) -> u64 { - self.0.rebuilder.rebuild_count() + /// One-shot diagnostic counters captured under a single read guard. + pub fn stats(&self) -> MempoolStats { + MempoolStats::from(self) } - pub fn skip_clean_count(&self) -> u64 { - self.0.rebuilder.skip_clean_count() + pub(crate) fn rebuilder(&self) -> &Rebuilder { + &self.0.rebuilder } pub fn fees(&self) -> RecommendedFees { @@ -108,9 +112,9 @@ impl Mempool { /// input list is walked to rule out `TxidPrefix` collisions. pub fn lookup_spender(&self, txid: &Txid, vout: Vout) -> Option<(Txid, Vin)> { let key = OutpointPrefix::new(TxidPrefix::from(txid), vout); - let inner = self.read(); - let spender_prefix = inner.outpoint_spends.get(&key)?; - let spender = inner.txs.record_by_prefix(&spender_prefix)?; + let state = self.read(); + let spender_prefix = state.outpoint_spends.get(&key)?; + let spender = state.txs.record_by_prefix(&spender_prefix)?; let vin_pos = spender .tx .input @@ -119,32 +123,8 @@ impl Mempool { Some((spender.entry.txid, Vin::from(vin_pos))) } - pub(crate) fn read(&self) -> RwLockReadGuard<'_, MempoolInner> { - self.0.lock.read() - } - - pub fn tx_count(&self) -> usize { - self.read().txs.len() - } - - pub fn unresolved_count(&self) -> usize { - self.read().txs.unresolved().len() - } - - pub fn addr_count(&self) -> usize { - self.read().addrs.len() - } - - pub fn outpoint_spend_count(&self) -> usize { - self.read().outpoint_spends.len() - } - - pub fn graveyard_tombstone_count(&self) -> usize { - self.read().graveyard.tombstones_len() - } - - pub fn graveyard_order_count(&self) -> usize { - self.read().graveyard.order_len() + pub(crate) fn read(&self) -> RwLockReadGuard<'_, State> { + self.0.state.read() } pub fn contains_txid(&self, txid: &Txid) -> bool { @@ -159,8 +139,8 @@ impl Mempool { /// Apply `f` to a `Vanished` tombstone's tx body if present. /// `Replaced` tombstones return `None` because the tx will not confirm. pub fn with_vanished_tx(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option { - let inner = self.read(); - let tomb = inner.graveyard.get(txid)?; + let state = self.read(); + let tomb = state.graveyard.get(txid)?; matches!(tomb.reason(), TxRemoval::Vanished).then(|| f(&tomb.tx)) } @@ -182,15 +162,15 @@ impl Mempool { /// Live mempool txs touching `addr`, newest first by `first_seen`, /// capped at `limit`. Returns owned `Transaction`s. pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec { - let inner = self.read(); - let Some(entry) = inner.addrs.get(addr) else { + let state = self.read(); + let Some(entry) = state.addrs.get(addr) else { return vec![]; }; let mut ordered: Vec<(Timestamp, &Txid)> = entry .txids .iter() .map(|txid| { - let first_seen = inner + let first_seen = state .txs .entry(txid) .map(|e| e.first_seen) @@ -198,10 +178,10 @@ impl Mempool { (first_seen, txid) }) .collect(); - ordered.sort_unstable_by_key(|b| std::cmp::Reverse(b.0)); + ordered.sort_unstable_by_key(|b| Reverse(b.0)); ordered .into_iter() - .filter_map(|(_, txid)| inner.txs.get(txid).cloned()) + .filter_map(|(_, txid)| state.txs.get(txid).cloned()) .take(limit) .collect() } @@ -247,20 +227,10 @@ impl Mempool { /// the buried entry's `first_seen` to avoid flicker between drop /// and indexer catch-up. pub fn transaction_times(&self, txids: &[Txid]) -> Vec { - let inner = self.read(); + let state = self.read(); txids .iter() - .map(|txid| { - if let Some(e) = inner.txs.entry(txid) { - return u64::from(e.first_seen); - } - if let Some(tomb) = inner.graveyard.get(txid) - && matches!(tomb.reason(), TxRemoval::Vanished) - { - return u64::from(tomb.entry.first_seen); - } - 0 - }) + .map(|txid| state.first_seen(txid).map_or(0, u64::from)) .collect() } @@ -314,9 +284,9 @@ impl Mempool { where F: Fn(&Txid, Vout) -> Option, { - let Inner { + let Shared { client, - lock, + state, rebuilder, } = &*self.0; @@ -325,14 +295,14 @@ impl Mempool { new_raws, gbt, min_fee, - }) = Fetcher::fetch(client, lock)? + }) = Fetcher::fetch(client, state)? else { return Ok(()); }; - let pulled = Preparer::prepare(entries_info, new_raws, lock); - let changed = Applier::apply(lock, pulled); - prevouts::fill(lock, resolver); - rebuilder.tick(lock, changed, &gbt, min_fee); + let pulled = Preparer::prepare(entries_info, new_raws, state); + let changed = Applier::apply(state, pulled); + prevouts::fill(state, resolver); + rebuilder.tick(state, changed, &gbt, min_fee); Ok(()) } diff --git a/crates/brk_mempool/src/prevouts.rs b/crates/brk_mempool/src/prevouts.rs index 4a6afb002..6c0b53444 100644 --- a/crates/brk_mempool/src/prevouts.rs +++ b/crates/brk_mempool/src/prevouts.rs @@ -25,7 +25,7 @@ use brk_types::{TxOut, Txid, TxidPrefix, Vin, Vout}; use parking_lot::RwLock; use tracing::warn; -use crate::{MempoolInner, stores::TxStore}; +use crate::{State, stores::TxStore}; /// Default resolver: per-call `getrawtransaction` against the bitcoind /// RPC client `Mempool` already holds. Requires `txindex=1`. On any @@ -61,13 +61,13 @@ type HoleBatch = Vec<(TxidPrefix, Txid, Holes)>; /// in-mempool parents are filled lock-locally; the remainder go /// through `resolver` outside any lock. Returns true iff anything /// was written. -pub(crate) fn fill(lock: &RwLock, resolver: F) -> bool +pub(crate) fn fill(lock: &RwLock, resolver: F) -> bool where F: Fn(&Txid, Vout) -> Option, { let (in_mempool, holes) = { - let inner = lock.read(); - gather(&inner.txs) + let state = lock.read(); + gather(&state.txs) }; let external = resolve_external(holes, resolver); @@ -75,9 +75,9 @@ where return false; } - let mut inner = lock.write(); - write_fills(&mut inner, in_mempool); - write_fills(&mut inner, external); + let mut state = lock.write(); + write_fills(&mut state, in_mempool); + write_fills(&mut state, external); true } @@ -136,10 +136,10 @@ where .collect() } -fn write_fills(inner: &mut MempoolInner, fills: FillBatch) { +fn write_fills(state: &mut State, fills: FillBatch) { for (prefix, txid, tx_fills) in fills { - for prevout in inner.txs.apply_fills(&prefix, tx_fills) { - inner.addrs.add_input(&txid, &prevout); + for prevout in state.txs.apply_fills(&prefix, tx_fills) { + state.addrs.add_input(&txid, &prevout); } } } diff --git a/crates/brk_mempool/src/state.rs b/crates/brk_mempool/src/state.rs new file mode 100644 index 000000000..5d4b80bd0 --- /dev/null +++ b/crates/brk_mempool/src/state.rs @@ -0,0 +1,35 @@ +//! Single-locked container for the live mempool. +//! +//! All cycle steps and read-side accessors take a guard on this one +//! lock. The substructures are plain owned types — they used to each +//! own a RwLock, but the canonical lock-order discipline disappears +//! when there's nothing to order. + +use brk_types::{MempoolInfo, Timestamp, Txid}; + +use crate::{ + TxRemoval, + stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore}, +}; + +#[derive(Default)] +pub struct State { + pub info: MempoolInfo, + pub txs: TxStore, + pub addrs: AddrTracker, + pub outpoint_spends: OutpointSpends, + pub graveyard: TxGraveyard, +} + +impl State { + /// `first_seen` for a tx that's live or in a `Vanished` tombstone. + /// Smooths the flicker between drop and indexer catch-up; `Replaced` + /// tombstones are excluded since the tx will not confirm. + pub fn first_seen(&self, txid: &Txid) -> Option { + if let Some(e) = self.txs.entry(txid) { + return Some(e.first_seen); + } + let tomb = self.graveyard.get(txid)?; + matches!(tomb.reason(), TxRemoval::Vanished).then_some(tomb.entry.first_seen) + } +} diff --git a/crates/brk_mempool/src/steps/applier.rs b/crates/brk_mempool/src/steps/applier.rs index f056f3db4..2db118164 100644 --- a/crates/brk_mempool/src/steps/applier.rs +++ b/crates/brk_mempool/src/steps/applier.rs @@ -2,8 +2,7 @@ use brk_types::{Transaction, TxidPrefix}; use parking_lot::RwLock; use crate::{ - TxEntry, TxRemoval, - inner::MempoolInner, + State, TxEntry, TxRemoval, steps::preparer::{TxAddition, TxsPulled}, }; @@ -13,61 +12,58 @@ pub struct Applier; impl Applier { /// Returns true iff anything changed. - pub fn apply(lock: &RwLock, pulled: TxsPulled) -> bool { + pub fn apply(lock: &RwLock, pulled: TxsPulled) -> bool { let TxsPulled { added, removed } = pulled; let has_changes = !added.is_empty() || !removed.is_empty(); - let mut inner = lock.write(); - Self::bury_removals(&mut inner, removed); - Self::publish_additions(&mut inner, added); - inner.graveyard.evict_old(); + let mut state = lock.write(); + Self::bury_removals(&mut state, removed); + Self::publish_additions(&mut state, added); + state.graveyard.evict_old(); has_changes } - fn bury_removals(inner: &mut MempoolInner, removed: Vec<(TxidPrefix, TxRemoval)>) { + fn bury_removals(state: &mut State, removed: Vec<(TxidPrefix, TxRemoval)>) { for (prefix, reason) in removed { - Self::bury_one(inner, &prefix, reason); + Self::bury_one(state, &prefix, reason); } } - fn bury_one(inner: &mut MempoolInner, prefix: &TxidPrefix, reason: TxRemoval) { - let Some(record) = inner.txs.remove_by_prefix(prefix) else { + fn bury_one(state: &mut State, prefix: &TxidPrefix, reason: TxRemoval) { + let Some(record) = state.txs.remove_by_prefix(prefix) else { return; }; let txid = record.entry.txid; - inner.info.remove(&record.tx, record.entry.fee); - inner.addrs.remove_tx(&record.tx, &txid); - inner.outpoint_spends.remove_spends(&record.tx, *prefix); - inner.graveyard.bury(txid, record.tx, record.entry, reason); + state.info.remove(&record.tx, record.entry.fee); + state.addrs.remove_tx(&record.tx, &txid); + state.outpoint_spends.remove_spends(&record.tx, *prefix); + state.graveyard.bury(txid, record.tx, record.entry, reason); } - fn publish_additions(inner: &mut MempoolInner, added: Vec) { + fn publish_additions(state: &mut State, added: Vec) { for addition in added { - if let Some((tx, entry)) = Self::resolve_addition(inner, addition) { - Self::publish_one(inner, tx, entry); + if let Some((tx, entry)) = Self::resolve_addition(state, addition) { + Self::publish_one(state, tx, entry); } } } - fn resolve_addition( - inner: &mut MempoolInner, - addition: TxAddition, - ) -> Option<(Transaction, TxEntry)> { + fn resolve_addition(state: &mut State, addition: TxAddition) -> Option<(Transaction, TxEntry)> { match addition { TxAddition::Fresh { tx, entry } => Some((tx, entry)), TxAddition::Revived { entry } => { - let tomb = inner.graveyard.exhume(&entry.txid)?; + let tomb = state.graveyard.exhume(&entry.txid)?; Some((tomb.tx, entry)) } } } - fn publish_one(inner: &mut MempoolInner, tx: Transaction, entry: TxEntry) { + fn publish_one(state: &mut State, tx: Transaction, entry: TxEntry) { let prefix = entry.txid_prefix(); - inner.info.add(&tx, entry.fee); - inner.addrs.add_tx(&tx, &entry.txid); - inner.outpoint_spends.insert_spends(&tx, prefix); - inner.txs.insert(tx, entry); + state.info.add(&tx, entry.fee); + state.addrs.add_tx(&tx, &entry.txid); + state.outpoint_spends.insert_spends(&tx, prefix); + state.txs.insert(tx, entry); } } diff --git a/crates/brk_mempool/src/steps/fetcher/mod.rs b/crates/brk_mempool/src/steps/fetcher/mod.rs index 889e4ed37..720425467 100644 --- a/crates/brk_mempool/src/steps/fetcher/mod.rs +++ b/crates/brk_mempool/src/steps/fetcher/mod.rs @@ -8,7 +8,7 @@ use brk_types::{MempoolEntryInfo, Txid}; use parking_lot::RwLock; use crate::{ - MempoolInner, + State, stores::{TxGraveyard, TxStore}, }; @@ -29,7 +29,7 @@ const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; pub struct Fetcher; impl Fetcher { - pub fn fetch(client: &Client, lock: &RwLock) -> Result> { + pub fn fetch(client: &Client, lock: &RwLock) -> Result> { let Some(MempoolState { entries, gbt, @@ -39,8 +39,8 @@ impl Fetcher { return Ok(None); }; let new_txids = { - let inner = lock.read(); - Self::new_txids(&entries, &inner.txs, &inner.graveyard) + let state = lock.read(); + Self::new_txids(&entries, &state.txs, &state.graveyard) }; let new_raws = client.get_raw_transactions(&new_txids)?; Ok(Some(Fetched { diff --git a/crates/brk_mempool/src/steps/preparer/mod.rs b/crates/brk_mempool/src/steps/preparer/mod.rs index ff1a6ce39..f28be4330 100644 --- a/crates/brk_mempool/src/steps/preparer/mod.rs +++ b/crates/brk_mempool/src/steps/preparer/mod.rs @@ -1,5 +1,5 @@ //! Turn `Fetched` raws into a typed diff for the Applier. Pure CPU, -//! holds a read guard on `MempoolInner` for the cycle. New txs are +//! holds a read guard on `State` for the cycle. New txs are //! classified into three buckets: //! //! - **live** - already in `known`, skipped. @@ -17,7 +17,7 @@ use parking_lot::RwLock; use rustc_hash::{FxHashMap, FxHashSet}; use crate::{ - MempoolInner, + State, stores::{TxGraveyard, TxStore}, }; @@ -37,13 +37,13 @@ impl Preparer { pub fn prepare( entries_info: Vec, new_raws: FxHashMap, - lock: &RwLock, + lock: &RwLock, ) -> TxsPulled { - let inner = lock.read(); + let state = lock.read(); let live = Self::live_set(&entries_info); - let added = Self::classify_additions(entries_info, new_raws, &inner.txs, &inner.graveyard); - let removed = TxRemoval::classify(&live, &added, &inner.txs); + let added = Self::classify_additions(entries_info, new_raws, &state.txs, &state.graveyard); + let removed = TxRemoval::classify(&live, &added, &state.txs); TxsPulled { added, removed } } diff --git a/crates/brk_mempool/src/steps/rebuilder/mod.rs b/crates/brk_mempool/src/steps/rebuilder/mod.rs index 2faa3b6e3..1785f45c8 100644 --- a/crates/brk_mempool/src/steps/rebuilder/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/mod.rs @@ -8,7 +8,7 @@ use brk_types::{FeeRate, TxidPrefix}; use parking_lot::RwLock; use rustc_hash::FxHashSet; -use crate::inner::MempoolInner; +use crate::State; use partition::Partitioner; use snapshot::{PrefixIndex, builder}; @@ -32,10 +32,12 @@ pub struct Rebuilder { impl Rebuilder { /// Mark dirty if the cycle changed mempool state, then rebuild iff /// the dirty bit is set. Cycle pacing is the driver loop's job; the - /// rebuild itself is pure CPU on already-fetched data. + /// rebuild itself is pure CPU on already-fetched data. The dirty + /// bit is cleared only after the snapshot is published, so a panic + /// in `build_snapshot` retries on the next cycle. pub fn tick( &self, - lock: &RwLock, + lock: &RwLock, changed: bool, gbt: &[BlockTemplateTx], min_fee: FeeRate, @@ -43,7 +45,8 @@ impl Rebuilder { if changed { self.dirty.store(true, Ordering::Release); } - if !self.try_claim_rebuild() { + if !self.dirty.load(Ordering::Acquire) { + self.skip_clean.fetch_add(1, Ordering::Relaxed); return; } *self.snapshot.write() = Arc::new(Self::build_snapshot(lock, gbt, min_fee)); @@ -60,13 +63,13 @@ impl Rebuilder { } fn build_snapshot( - lock: &RwLock, + lock: &RwLock, gbt: &[BlockTemplateTx], min_fee: FeeRate, ) -> Snapshot { let (txs, prefix_to_idx) = { - let inner = lock.read(); - builder::build_txs(&inner.txs) + let state = lock.read(); + builder::build_txs(&state.txs) }; let block0 = Self::block_from_gbt(gbt, &prefix_to_idx); @@ -94,15 +97,4 @@ impl Rebuilder { pub fn snapshot(&self) -> Arc { self.snapshot.read().clone() } - - /// True iff dirty. The dirty bit is cleared in `tick` only after - /// the snapshot is published, so a panic in `build_snapshot` - /// retries on the next cycle. - fn try_claim_rebuild(&self) -> bool { - if !self.dirty.load(Ordering::Acquire) { - self.skip_clean.fetch_add(1, Ordering::Relaxed); - return false; - } - true - } } diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs index 05ecf65b4..79e2e5465 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs @@ -87,10 +87,6 @@ impl Snapshot { self.prefix_to_idx.get(prefix).copied() } - pub fn txs_len(&self) -> usize { - self.txs.len() - } - /// Effective chunk rate for a live tx by prefix, or `None` if the /// tx isn't in this snapshot. pub fn chunk_rate_for(&self, prefix: &TxidPrefix) -> Option { diff --git a/crates/brk_mempool/src/stores/mod.rs b/crates/brk_mempool/src/stores/mod.rs index 4251dcb0d..63db69cad 100644 --- a/crates/brk_mempool/src/stores/mod.rs +++ b/crates/brk_mempool/src/stores/mod.rs @@ -1,6 +1,6 @@ //! Stateful in-memory holders. After Phase 3 they're plain owned -//! types (no internal locks) — `MempoolInner` aggregates them under a -//! single `RwLock` in `crate::inner`. +//! types (no internal locks) — `State` aggregates them under a +//! single `RwLock` in `crate::state`. pub mod addr_tracker; pub(crate) mod outpoint_spends;