mempool: cleanups

This commit is contained in:
nym21
2026-05-07 18:56:44 +02:00
parent f4910efd7d
commit d2b8992932
13 changed files with 196 additions and 226 deletions

View File

@@ -4,31 +4,6 @@ use brk_error::Result;
use brk_mempool::Mempool; use brk_mempool::Mempool;
use brk_rpc::{Auth, Client}; use brk_rpc::{Auth, Client};
#[derive(Debug, Clone)]
struct MempoolStats {
info_count: usize,
tx_count: usize,
unresolved_count: usize,
addr_count: usize,
outpoint_spend_count: usize,
graveyard_tombstone_count: usize,
graveyard_order_count: usize,
}
impl From<&Mempool> for MempoolStats {
fn from(mempool: &Mempool) -> Self {
Self {
info_count: mempool.info().count,
tx_count: mempool.tx_count(),
unresolved_count: mempool.unresolved_count(),
addr_count: mempool.addr_count(),
outpoint_spend_count: mempool.outpoint_spend_count(),
graveyard_tombstone_count: mempool.graveyard_tombstone_count(),
graveyard_order_count: mempool.graveyard_order_count(),
}
}
}
fn main() -> Result<()> { fn main() -> Result<()> {
brk_logger::init(None)?; brk_logger::init(None)?;
@@ -48,9 +23,9 @@ fn main() -> Result<()> {
loop { loop {
thread::sleep(Duration::from_secs(5)); thread::sleep(Duration::from_secs(5));
let stats = MempoolStats::from(&mempool); let info_count = mempool.info().count;
let stats = mempool.stats();
let snapshot = mempool.snapshot(); let snapshot = mempool.snapshot();
let blocks_tx_total: usize = snapshot.blocks.iter().map(|b| b.len()).sum(); let blocks_tx_total: usize = snapshot.blocks.iter().map(|b| b.len()).sum();
println!( println!(
@@ -58,18 +33,18 @@ fn main() -> Result<()> {
graveyard.tombstones={} graveyard.order={} \ graveyard.tombstones={} graveyard.order={} \
snap.txs.len={} snap.blocks={} snap.blocks_txs={} \ snap.txs.len={} snap.blocks={} snap.blocks_txs={} \
rebuilds={} skip.clean={}", rebuilds={} skip.clean={}",
stats.info_count, info_count,
stats.tx_count, stats.txs,
stats.unresolved_count, stats.unresolved,
stats.addr_count, stats.addrs,
stats.outpoint_spend_count, stats.outpoint_spends,
stats.graveyard_tombstone_count, stats.graveyard_tombstones,
stats.graveyard_order_count, stats.graveyard_order,
snapshot.txs_len(), snapshot.txs.len(),
snapshot.blocks.len(), snapshot.blocks.len(),
blocks_tx_total, blocks_tx_total,
mempool.rebuild_count(), stats.rebuilds,
mempool.skip_clean_count(), stats.skip_cleans,
); );
} }
} }

View File

@@ -5,12 +5,13 @@
//! the proxy fallback). The walk is a pair of capped DFSes, then the //! the proxy fallback). The walk is a pair of capped DFSes, then the
//! cluster wire shape is materialized from the visited set. //! cluster wire shape is materialized from the visited set.
use std::cmp::Reverse;
use brk_types::{ use brk_types::{
CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate, CpfpCluster, CpfpClusterChunk, CpfpClusterTx, CpfpClusterTxIndex, CpfpEntry, CpfpInfo, FeeRate,
SigOps, TxidPrefix, VSize, SigOps, TxidPrefix, VSize,
}; };
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet}; use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use smallvec::SmallVec;
use crate::Mempool; use crate::Mempool;
use crate::steps::{SnapTx, TxIndex}; use crate::steps::{SnapTx, TxIndex};
@@ -39,7 +40,7 @@ impl Mempool {
} }
} }
pub(crate) fn build_cpfp_info( fn build_cpfp_info(
txs: &[SnapTx], txs: &[SnapTx],
seed_idx: TxIndex, seed_idx: TxIndex,
seed: &SnapTx, seed: &SnapTx,
@@ -158,39 +159,29 @@ fn build_cluster(
} }
} }
/// Group cluster members into chunks by descending `chunk_rate`. Cluster
/// size is bounded by `2 * MAX + 1` so a sort-then-fold is cheaper and
/// simpler than a hashmap keyed on `f64` bits.
fn chunk_groups( fn chunk_groups(
members: &[TxIndex], members: &[TxIndex],
txs: &[SnapTx], txs: &[SnapTx],
local_of: &FxHashMap<TxIndex, CpfpClusterTxIndex>, local_of: &FxHashMap<TxIndex, CpfpClusterTxIndex>,
) -> Vec<CpfpClusterChunk> { ) -> Vec<CpfpClusterChunk> {
let mut groups: FxHashMap<u64, (FeeRate, SmallVec<[CpfpClusterTxIndex; 4]>)> = let mut entries: Vec<(FeeRate, CpfpClusterTxIndex)> = members
FxHashMap::with_capacity_and_hasher(members.len(), FxBuildHasher); .iter()
let mut order: Vec<u64> = Vec::new(); .filter_map(|&idx| Some((txs.get(idx.as_usize())?.chunk_rate, local_of[&idx])))
for &idx in members { .collect();
let Some(t) = txs.get(idx.as_usize()) else { entries.sort_by_key(|e| Reverse(e.0));
continue;
}; let mut chunks: Vec<CpfpClusterChunk> = Vec::new();
let key = f64::from(t.chunk_rate).to_bits(); for (rate, local) in entries {
let local = local_of[&idx]; match chunks.last_mut() {
groups Some(last) if last.feerate == rate => last.txs.push(local),
.entry(key) _ => chunks.push(CpfpClusterChunk {
.and_modify(|(_, v)| v.push(local)) txs: vec![local],
.or_insert_with(|| {
order.push(key);
let mut v: SmallVec<[CpfpClusterTxIndex; 4]> = SmallVec::new();
v.push(local);
(t.chunk_rate, v)
});
}
order.sort_by_key(|k| std::cmp::Reverse(groups[k].0));
order
.into_iter()
.map(|k| {
let (rate, txs) = groups.remove(&k).unwrap();
CpfpClusterChunk {
txs: txs.into_vec(),
feerate: rate, feerate: rate,
} }),
}) }
.collect() }
chunks
} }

View File

@@ -0,0 +1,34 @@
//! Cycle-internal counters surfaced for observability and the
//! `examples/mempool.rs` driver. Captured under a single read guard
//! by `MempoolStats::from(&Mempool)`.
use crate::Mempool;
#[derive(Debug, Clone, Default)]
pub struct MempoolStats {
pub txs: usize,
pub unresolved: usize,
pub addrs: usize,
pub outpoint_spends: usize,
pub graveyard_tombstones: usize,
pub graveyard_order: usize,
pub rebuilds: u64,
pub skip_cleans: u64,
}
impl From<&Mempool> for MempoolStats {
fn from(mempool: &Mempool) -> Self {
let inner = mempool.read();
let rebuilder = mempool.rebuilder();
Self {
txs: inner.txs.len(),
unresolved: inner.txs.unresolved().len(),
addrs: inner.addrs.len(),
outpoint_spends: inner.outpoint_spends.len(),
graveyard_tombstones: inner.graveyard.tombstones_len(),
graveyard_order: inner.graveyard.order_len(),
rebuilds: rebuilder.rebuild_count(),
skip_cleans: rebuilder.skip_clean_count(),
}
}
}

View File

@@ -1,19 +0,0 @@
//! Single-locked container for the live mempool.
//!
//! All cycle steps and read-side accessors take a guard on this one
//! lock. The substructures are plain owned types — they used to each
//! own a RwLock, but the canonical lock-order discipline disappears
//! when there's nothing to order.
use brk_types::MempoolInfo;
use crate::stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore};
#[derive(Default)]
pub struct MempoolInner {
pub info: MempoolInfo,
pub txs: TxStore,
pub addrs: AddrTracker,
pub outpoint_spends: OutpointSpends,
pub graveyard: TxGraveyard,
}

View File

@@ -10,7 +10,7 @@
//! 2. [`steps::preparer::Preparer`] - decode and classify into //! 2. [`steps::preparer::Preparer`] - decode and classify into
//! `TxsPulled { added, removed }`. Pure CPU. //! `TxsPulled { added, removed }`. Pure CPU.
//! 3. [`steps::applier::Applier`] - apply the diff to //! 3. [`steps::applier::Applier`] - apply the diff to
//! [`inner::MempoolInner`] under a single write lock. //! [`state::State`] under a single write lock.
//! 4. [`prevouts::fill`] - fills `prevout: None` inputs in one pass, //! 4. [`prevouts::fill`] - fills `prevout: None` inputs in one pass,
//! using same-cycle in-mempool parents directly and the //! using same-cycle in-mempool parents directly and the
//! caller-supplied resolver (default: `getrawtransaction`) for //! caller-supplied resolver (default: `getrawtransaction`) for
@@ -19,6 +19,7 @@
//! projected-blocks `Snapshot` from the same-cycle GBT and min fee. //! projected-blocks `Snapshot` from the same-cycle GBT and min fee.
use std::{ use std::{
cmp::Reverse,
panic::{AssertUnwindSafe, catch_unwind}, panic::{AssertUnwindSafe, catch_unwind},
sync::Arc, sync::Arc,
thread, thread,
@@ -35,12 +36,14 @@ use parking_lot::{RwLock, RwLockReadGuard};
use tracing::error; use tracing::error;
mod cpfp; mod cpfp;
mod inner; mod diagnostics;
mod prevouts; mod prevouts;
mod rbf; mod rbf;
mod state;
pub(crate) mod steps; pub(crate) mod steps;
pub(crate) mod stores; pub(crate) mod stores;
pub use diagnostics::MempoolStats;
pub use rbf::{RbfForTx, RbfNode}; pub use rbf::{RbfForTx, RbfNode};
use steps::{Applier, Fetched, Fetcher, Preparer, Rebuilder}; use steps::{Applier, Fetched, Fetcher, Preparer, Rebuilder};
pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval}; pub use steps::{BlockStats, RecommendedFees, Snapshot, TxEntry, TxRemoval};
@@ -51,23 +54,23 @@ pub use stores::{TxGraveyard, TxStore, TxTombstone};
/// `TxOut` if the parent is reachable, `None` otherwise. /// `TxOut` if the parent is reachable, `None` otherwise.
pub type PrevoutResolver = Box<dyn Fn(&Txid, Vout) -> Option<TxOut> + Send + Sync>; pub type PrevoutResolver = Box<dyn Fn(&Txid, Vout) -> Option<TxOut> + Send + Sync>;
pub(crate) use inner::MempoolInner; pub(crate) use state::State;
/// Cheaply cloneable: clones share one live mempool via `Arc`. /// Cheaply cloneable: clones share one live mempool via `Arc`.
#[derive(Clone)] #[derive(Clone)]
pub struct Mempool(Arc<Inner>); pub struct Mempool(Arc<Shared>);
struct Inner { struct Shared {
client: Client, client: Client,
lock: RwLock<MempoolInner>, state: RwLock<State>,
rebuilder: Rebuilder, rebuilder: Rebuilder,
} }
impl Mempool { impl Mempool {
pub fn new(client: &Client) -> Self { pub fn new(client: &Client) -> Self {
Self(Arc::new(Inner { Self(Arc::new(Shared {
client: client.clone(), client: client.clone(),
lock: RwLock::new(MempoolInner::default()), state: RwLock::new(State::default()),
rebuilder: Rebuilder::default(), rebuilder: Rebuilder::default(),
})) }))
} }
@@ -80,12 +83,13 @@ impl Mempool {
self.0.rebuilder.snapshot() self.0.rebuilder.snapshot()
} }
pub fn rebuild_count(&self) -> u64 { /// One-shot diagnostic counters captured under a single read guard.
self.0.rebuilder.rebuild_count() pub fn stats(&self) -> MempoolStats {
MempoolStats::from(self)
} }
pub fn skip_clean_count(&self) -> u64 { pub(crate) fn rebuilder(&self) -> &Rebuilder {
self.0.rebuilder.skip_clean_count() &self.0.rebuilder
} }
pub fn fees(&self) -> RecommendedFees { pub fn fees(&self) -> RecommendedFees {
@@ -108,9 +112,9 @@ impl Mempool {
/// input list is walked to rule out `TxidPrefix` collisions. /// input list is walked to rule out `TxidPrefix` collisions.
pub fn lookup_spender(&self, txid: &Txid, vout: Vout) -> Option<(Txid, Vin)> { pub fn lookup_spender(&self, txid: &Txid, vout: Vout) -> Option<(Txid, Vin)> {
let key = OutpointPrefix::new(TxidPrefix::from(txid), vout); let key = OutpointPrefix::new(TxidPrefix::from(txid), vout);
let inner = self.read(); let state = self.read();
let spender_prefix = inner.outpoint_spends.get(&key)?; let spender_prefix = state.outpoint_spends.get(&key)?;
let spender = inner.txs.record_by_prefix(&spender_prefix)?; let spender = state.txs.record_by_prefix(&spender_prefix)?;
let vin_pos = spender let vin_pos = spender
.tx .tx
.input .input
@@ -119,32 +123,8 @@ impl Mempool {
Some((spender.entry.txid, Vin::from(vin_pos))) Some((spender.entry.txid, Vin::from(vin_pos)))
} }
pub(crate) fn read(&self) -> RwLockReadGuard<'_, MempoolInner> { pub(crate) fn read(&self) -> RwLockReadGuard<'_, State> {
self.0.lock.read() self.0.state.read()
}
pub fn tx_count(&self) -> usize {
self.read().txs.len()
}
pub fn unresolved_count(&self) -> usize {
self.read().txs.unresolved().len()
}
pub fn addr_count(&self) -> usize {
self.read().addrs.len()
}
pub fn outpoint_spend_count(&self) -> usize {
self.read().outpoint_spends.len()
}
pub fn graveyard_tombstone_count(&self) -> usize {
self.read().graveyard.tombstones_len()
}
pub fn graveyard_order_count(&self) -> usize {
self.read().graveyard.order_len()
} }
pub fn contains_txid(&self, txid: &Txid) -> bool { pub fn contains_txid(&self, txid: &Txid) -> bool {
@@ -159,8 +139,8 @@ impl Mempool {
/// Apply `f` to a `Vanished` tombstone's tx body if present. /// Apply `f` to a `Vanished` tombstone's tx body if present.
/// `Replaced` tombstones return `None` because the tx will not confirm. /// `Replaced` tombstones return `None` because the tx will not confirm.
pub fn with_vanished_tx<R>(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option<R> { pub fn with_vanished_tx<R>(&self, txid: &Txid, f: impl FnOnce(&Transaction) -> R) -> Option<R> {
let inner = self.read(); let state = self.read();
let tomb = inner.graveyard.get(txid)?; let tomb = state.graveyard.get(txid)?;
matches!(tomb.reason(), TxRemoval::Vanished).then(|| f(&tomb.tx)) matches!(tomb.reason(), TxRemoval::Vanished).then(|| f(&tomb.tx))
} }
@@ -182,15 +162,15 @@ impl Mempool {
/// Live mempool txs touching `addr`, newest first by `first_seen`, /// Live mempool txs touching `addr`, newest first by `first_seen`,
/// capped at `limit`. Returns owned `Transaction`s. /// capped at `limit`. Returns owned `Transaction`s.
pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec<Transaction> { pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec<Transaction> {
let inner = self.read(); let state = self.read();
let Some(entry) = inner.addrs.get(addr) else { let Some(entry) = state.addrs.get(addr) else {
return vec![]; return vec![];
}; };
let mut ordered: Vec<(Timestamp, &Txid)> = entry let mut ordered: Vec<(Timestamp, &Txid)> = entry
.txids .txids
.iter() .iter()
.map(|txid| { .map(|txid| {
let first_seen = inner let first_seen = state
.txs .txs
.entry(txid) .entry(txid)
.map(|e| e.first_seen) .map(|e| e.first_seen)
@@ -198,10 +178,10 @@ impl Mempool {
(first_seen, txid) (first_seen, txid)
}) })
.collect(); .collect();
ordered.sort_unstable_by_key(|b| std::cmp::Reverse(b.0)); ordered.sort_unstable_by_key(|b| Reverse(b.0));
ordered ordered
.into_iter() .into_iter()
.filter_map(|(_, txid)| inner.txs.get(txid).cloned()) .filter_map(|(_, txid)| state.txs.get(txid).cloned())
.take(limit) .take(limit)
.collect() .collect()
} }
@@ -247,20 +227,10 @@ impl Mempool {
/// the buried entry's `first_seen` to avoid flicker between drop /// the buried entry's `first_seen` to avoid flicker between drop
/// and indexer catch-up. /// and indexer catch-up.
pub fn transaction_times(&self, txids: &[Txid]) -> Vec<u64> { pub fn transaction_times(&self, txids: &[Txid]) -> Vec<u64> {
let inner = self.read(); let state = self.read();
txids txids
.iter() .iter()
.map(|txid| { .map(|txid| state.first_seen(txid).map_or(0, u64::from))
if let Some(e) = inner.txs.entry(txid) {
return u64::from(e.first_seen);
}
if let Some(tomb) = inner.graveyard.get(txid)
&& matches!(tomb.reason(), TxRemoval::Vanished)
{
return u64::from(tomb.entry.first_seen);
}
0
})
.collect() .collect()
} }
@@ -314,9 +284,9 @@ impl Mempool {
where where
F: Fn(&Txid, Vout) -> Option<TxOut>, F: Fn(&Txid, Vout) -> Option<TxOut>,
{ {
let Inner { let Shared {
client, client,
lock, state,
rebuilder, rebuilder,
} = &*self.0; } = &*self.0;
@@ -325,14 +295,14 @@ impl Mempool {
new_raws, new_raws,
gbt, gbt,
min_fee, min_fee,
}) = Fetcher::fetch(client, lock)? }) = Fetcher::fetch(client, state)?
else { else {
return Ok(()); return Ok(());
}; };
let pulled = Preparer::prepare(entries_info, new_raws, lock); let pulled = Preparer::prepare(entries_info, new_raws, state);
let changed = Applier::apply(lock, pulled); let changed = Applier::apply(state, pulled);
prevouts::fill(lock, resolver); prevouts::fill(state, resolver);
rebuilder.tick(lock, changed, &gbt, min_fee); rebuilder.tick(state, changed, &gbt, min_fee);
Ok(()) Ok(())
} }

View File

@@ -25,7 +25,7 @@ use brk_types::{TxOut, Txid, TxidPrefix, Vin, Vout};
use parking_lot::RwLock; use parking_lot::RwLock;
use tracing::warn; use tracing::warn;
use crate::{MempoolInner, stores::TxStore}; use crate::{State, stores::TxStore};
/// Default resolver: per-call `getrawtransaction` against the bitcoind /// Default resolver: per-call `getrawtransaction` against the bitcoind
/// RPC client `Mempool` already holds. Requires `txindex=1`. On any /// RPC client `Mempool` already holds. Requires `txindex=1`. On any
@@ -61,13 +61,13 @@ type HoleBatch = Vec<(TxidPrefix, Txid, Holes)>;
/// in-mempool parents are filled lock-locally; the remainder go /// in-mempool parents are filled lock-locally; the remainder go
/// through `resolver` outside any lock. Returns true iff anything /// through `resolver` outside any lock. Returns true iff anything
/// was written. /// was written.
pub(crate) fn fill<F>(lock: &RwLock<MempoolInner>, resolver: F) -> bool pub(crate) fn fill<F>(lock: &RwLock<State>, resolver: F) -> bool
where where
F: Fn(&Txid, Vout) -> Option<TxOut>, F: Fn(&Txid, Vout) -> Option<TxOut>,
{ {
let (in_mempool, holes) = { let (in_mempool, holes) = {
let inner = lock.read(); let state = lock.read();
gather(&inner.txs) gather(&state.txs)
}; };
let external = resolve_external(holes, resolver); let external = resolve_external(holes, resolver);
@@ -75,9 +75,9 @@ where
return false; return false;
} }
let mut inner = lock.write(); let mut state = lock.write();
write_fills(&mut inner, in_mempool); write_fills(&mut state, in_mempool);
write_fills(&mut inner, external); write_fills(&mut state, external);
true true
} }
@@ -136,10 +136,10 @@ where
.collect() .collect()
} }
fn write_fills(inner: &mut MempoolInner, fills: FillBatch) { fn write_fills(state: &mut State, fills: FillBatch) {
for (prefix, txid, tx_fills) in fills { for (prefix, txid, tx_fills) in fills {
for prevout in inner.txs.apply_fills(&prefix, tx_fills) { for prevout in state.txs.apply_fills(&prefix, tx_fills) {
inner.addrs.add_input(&txid, &prevout); state.addrs.add_input(&txid, &prevout);
} }
} }
} }

View File

@@ -0,0 +1,35 @@
//! Single-locked container for the live mempool.
//!
//! All cycle steps and read-side accessors take a guard on this one
//! lock. The substructures are plain owned types — they used to each
//! own a RwLock, but the canonical lock-order discipline disappears
//! when there's nothing to order.
use brk_types::{MempoolInfo, Timestamp, Txid};
use crate::{
TxRemoval,
stores::{AddrTracker, OutpointSpends, TxGraveyard, TxStore},
};
#[derive(Default)]
pub struct State {
pub info: MempoolInfo,
pub txs: TxStore,
pub addrs: AddrTracker,
pub outpoint_spends: OutpointSpends,
pub graveyard: TxGraveyard,
}
impl State {
/// `first_seen` for a tx that's live or in a `Vanished` tombstone.
/// Smooths the flicker between drop and indexer catch-up; `Replaced`
/// tombstones are excluded since the tx will not confirm.
pub fn first_seen(&self, txid: &Txid) -> Option<Timestamp> {
if let Some(e) = self.txs.entry(txid) {
return Some(e.first_seen);
}
let tomb = self.graveyard.get(txid)?;
matches!(tomb.reason(), TxRemoval::Vanished).then_some(tomb.entry.first_seen)
}
}

View File

@@ -2,8 +2,7 @@ use brk_types::{Transaction, TxidPrefix};
use parking_lot::RwLock; use parking_lot::RwLock;
use crate::{ use crate::{
TxEntry, TxRemoval, State, TxEntry, TxRemoval,
inner::MempoolInner,
steps::preparer::{TxAddition, TxsPulled}, steps::preparer::{TxAddition, TxsPulled},
}; };
@@ -13,61 +12,58 @@ pub struct Applier;
impl Applier { impl Applier {
/// Returns true iff anything changed. /// Returns true iff anything changed.
pub fn apply(lock: &RwLock<MempoolInner>, pulled: TxsPulled) -> bool { pub fn apply(lock: &RwLock<State>, pulled: TxsPulled) -> bool {
let TxsPulled { added, removed } = pulled; let TxsPulled { added, removed } = pulled;
let has_changes = !added.is_empty() || !removed.is_empty(); let has_changes = !added.is_empty() || !removed.is_empty();
let mut inner = lock.write(); let mut state = lock.write();
Self::bury_removals(&mut inner, removed); Self::bury_removals(&mut state, removed);
Self::publish_additions(&mut inner, added); Self::publish_additions(&mut state, added);
inner.graveyard.evict_old(); state.graveyard.evict_old();
has_changes has_changes
} }
fn bury_removals(inner: &mut MempoolInner, removed: Vec<(TxidPrefix, TxRemoval)>) { fn bury_removals(state: &mut State, removed: Vec<(TxidPrefix, TxRemoval)>) {
for (prefix, reason) in removed { for (prefix, reason) in removed {
Self::bury_one(inner, &prefix, reason); Self::bury_one(state, &prefix, reason);
} }
} }
fn bury_one(inner: &mut MempoolInner, prefix: &TxidPrefix, reason: TxRemoval) { fn bury_one(state: &mut State, prefix: &TxidPrefix, reason: TxRemoval) {
let Some(record) = inner.txs.remove_by_prefix(prefix) else { let Some(record) = state.txs.remove_by_prefix(prefix) else {
return; return;
}; };
let txid = record.entry.txid; let txid = record.entry.txid;
inner.info.remove(&record.tx, record.entry.fee); state.info.remove(&record.tx, record.entry.fee);
inner.addrs.remove_tx(&record.tx, &txid); state.addrs.remove_tx(&record.tx, &txid);
inner.outpoint_spends.remove_spends(&record.tx, *prefix); state.outpoint_spends.remove_spends(&record.tx, *prefix);
inner.graveyard.bury(txid, record.tx, record.entry, reason); state.graveyard.bury(txid, record.tx, record.entry, reason);
} }
fn publish_additions(inner: &mut MempoolInner, added: Vec<TxAddition>) { fn publish_additions(state: &mut State, added: Vec<TxAddition>) {
for addition in added { for addition in added {
if let Some((tx, entry)) = Self::resolve_addition(inner, addition) { if let Some((tx, entry)) = Self::resolve_addition(state, addition) {
Self::publish_one(inner, tx, entry); Self::publish_one(state, tx, entry);
} }
} }
} }
fn resolve_addition( fn resolve_addition(state: &mut State, addition: TxAddition) -> Option<(Transaction, TxEntry)> {
inner: &mut MempoolInner,
addition: TxAddition,
) -> Option<(Transaction, TxEntry)> {
match addition { match addition {
TxAddition::Fresh { tx, entry } => Some((tx, entry)), TxAddition::Fresh { tx, entry } => Some((tx, entry)),
TxAddition::Revived { entry } => { TxAddition::Revived { entry } => {
let tomb = inner.graveyard.exhume(&entry.txid)?; let tomb = state.graveyard.exhume(&entry.txid)?;
Some((tomb.tx, entry)) Some((tomb.tx, entry))
} }
} }
} }
fn publish_one(inner: &mut MempoolInner, tx: Transaction, entry: TxEntry) { fn publish_one(state: &mut State, tx: Transaction, entry: TxEntry) {
let prefix = entry.txid_prefix(); let prefix = entry.txid_prefix();
inner.info.add(&tx, entry.fee); state.info.add(&tx, entry.fee);
inner.addrs.add_tx(&tx, &entry.txid); state.addrs.add_tx(&tx, &entry.txid);
inner.outpoint_spends.insert_spends(&tx, prefix); state.outpoint_spends.insert_spends(&tx, prefix);
inner.txs.insert(tx, entry); state.txs.insert(tx, entry);
} }
} }

View File

@@ -8,7 +8,7 @@ use brk_types::{MempoolEntryInfo, Txid};
use parking_lot::RwLock; use parking_lot::RwLock;
use crate::{ use crate::{
MempoolInner, State,
stores::{TxGraveyard, TxStore}, stores::{TxGraveyard, TxStore},
}; };
@@ -29,7 +29,7 @@ const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
pub struct Fetcher; pub struct Fetcher;
impl Fetcher { impl Fetcher {
pub fn fetch(client: &Client, lock: &RwLock<MempoolInner>) -> Result<Option<Fetched>> { pub fn fetch(client: &Client, lock: &RwLock<State>) -> Result<Option<Fetched>> {
let Some(MempoolState { let Some(MempoolState {
entries, entries,
gbt, gbt,
@@ -39,8 +39,8 @@ impl Fetcher {
return Ok(None); return Ok(None);
}; };
let new_txids = { let new_txids = {
let inner = lock.read(); let state = lock.read();
Self::new_txids(&entries, &inner.txs, &inner.graveyard) Self::new_txids(&entries, &state.txs, &state.graveyard)
}; };
let new_raws = client.get_raw_transactions(&new_txids)?; let new_raws = client.get_raw_transactions(&new_txids)?;
Ok(Some(Fetched { Ok(Some(Fetched {

View File

@@ -1,5 +1,5 @@
//! Turn `Fetched` raws into a typed diff for the Applier. Pure CPU, //! Turn `Fetched` raws into a typed diff for the Applier. Pure CPU,
//! holds a read guard on `MempoolInner` for the cycle. New txs are //! holds a read guard on `State` for the cycle. New txs are
//! classified into three buckets: //! classified into three buckets:
//! //!
//! - **live** - already in `known`, skipped. //! - **live** - already in `known`, skipped.
@@ -17,7 +17,7 @@ use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet}; use rustc_hash::{FxHashMap, FxHashSet};
use crate::{ use crate::{
MempoolInner, State,
stores::{TxGraveyard, TxStore}, stores::{TxGraveyard, TxStore},
}; };
@@ -37,13 +37,13 @@ impl Preparer {
pub fn prepare( pub fn prepare(
entries_info: Vec<MempoolEntryInfo>, entries_info: Vec<MempoolEntryInfo>,
new_raws: FxHashMap<Txid, RawTx>, new_raws: FxHashMap<Txid, RawTx>,
lock: &RwLock<MempoolInner>, lock: &RwLock<State>,
) -> TxsPulled { ) -> TxsPulled {
let inner = lock.read(); let state = lock.read();
let live = Self::live_set(&entries_info); let live = Self::live_set(&entries_info);
let added = Self::classify_additions(entries_info, new_raws, &inner.txs, &inner.graveyard); let added = Self::classify_additions(entries_info, new_raws, &state.txs, &state.graveyard);
let removed = TxRemoval::classify(&live, &added, &inner.txs); let removed = TxRemoval::classify(&live, &added, &state.txs);
TxsPulled { added, removed } TxsPulled { added, removed }
} }

View File

@@ -8,7 +8,7 @@ use brk_types::{FeeRate, TxidPrefix};
use parking_lot::RwLock; use parking_lot::RwLock;
use rustc_hash::FxHashSet; use rustc_hash::FxHashSet;
use crate::inner::MempoolInner; use crate::State;
use partition::Partitioner; use partition::Partitioner;
use snapshot::{PrefixIndex, builder}; use snapshot::{PrefixIndex, builder};
@@ -32,10 +32,12 @@ pub struct Rebuilder {
impl Rebuilder { impl Rebuilder {
/// Mark dirty if the cycle changed mempool state, then rebuild iff /// Mark dirty if the cycle changed mempool state, then rebuild iff
/// the dirty bit is set. Cycle pacing is the driver loop's job; the /// the dirty bit is set. Cycle pacing is the driver loop's job; the
/// rebuild itself is pure CPU on already-fetched data. /// rebuild itself is pure CPU on already-fetched data. The dirty
/// bit is cleared only after the snapshot is published, so a panic
/// in `build_snapshot` retries on the next cycle.
pub fn tick( pub fn tick(
&self, &self,
lock: &RwLock<MempoolInner>, lock: &RwLock<State>,
changed: bool, changed: bool,
gbt: &[BlockTemplateTx], gbt: &[BlockTemplateTx],
min_fee: FeeRate, min_fee: FeeRate,
@@ -43,7 +45,8 @@ impl Rebuilder {
if changed { if changed {
self.dirty.store(true, Ordering::Release); self.dirty.store(true, Ordering::Release);
} }
if !self.try_claim_rebuild() { if !self.dirty.load(Ordering::Acquire) {
self.skip_clean.fetch_add(1, Ordering::Relaxed);
return; return;
} }
*self.snapshot.write() = Arc::new(Self::build_snapshot(lock, gbt, min_fee)); *self.snapshot.write() = Arc::new(Self::build_snapshot(lock, gbt, min_fee));
@@ -60,13 +63,13 @@ impl Rebuilder {
} }
fn build_snapshot( fn build_snapshot(
lock: &RwLock<MempoolInner>, lock: &RwLock<State>,
gbt: &[BlockTemplateTx], gbt: &[BlockTemplateTx],
min_fee: FeeRate, min_fee: FeeRate,
) -> Snapshot { ) -> Snapshot {
let (txs, prefix_to_idx) = { let (txs, prefix_to_idx) = {
let inner = lock.read(); let state = lock.read();
builder::build_txs(&inner.txs) builder::build_txs(&state.txs)
}; };
let block0 = Self::block_from_gbt(gbt, &prefix_to_idx); let block0 = Self::block_from_gbt(gbt, &prefix_to_idx);
@@ -94,15 +97,4 @@ impl Rebuilder {
pub fn snapshot(&self) -> Arc<Snapshot> { pub fn snapshot(&self) -> Arc<Snapshot> {
self.snapshot.read().clone() self.snapshot.read().clone()
} }
/// True iff dirty. The dirty bit is cleared in `tick` only after
/// the snapshot is published, so a panic in `build_snapshot`
/// retries on the next cycle.
fn try_claim_rebuild(&self) -> bool {
if !self.dirty.load(Ordering::Acquire) {
self.skip_clean.fetch_add(1, Ordering::Relaxed);
return false;
}
true
}
} }

View File

@@ -87,10 +87,6 @@ impl Snapshot {
self.prefix_to_idx.get(prefix).copied() self.prefix_to_idx.get(prefix).copied()
} }
pub fn txs_len(&self) -> usize {
self.txs.len()
}
/// Effective chunk rate for a live tx by prefix, or `None` if the /// Effective chunk rate for a live tx by prefix, or `None` if the
/// tx isn't in this snapshot. /// tx isn't in this snapshot.
pub fn chunk_rate_for(&self, prefix: &TxidPrefix) -> Option<FeeRate> { pub fn chunk_rate_for(&self, prefix: &TxidPrefix) -> Option<FeeRate> {

View File

@@ -1,6 +1,6 @@
//! Stateful in-memory holders. After Phase 3 they're plain owned //! Stateful in-memory holders. After Phase 3 they're plain owned
//! types (no internal locks) — `MempoolInner` aggregates them under a //! types (no internal locks) — `State` aggregates them under a
//! single `RwLock` in `crate::inner`. //! single `RwLock` in `crate::state`.
pub mod addr_tracker; pub mod addr_tracker;
pub(crate) mod outpoint_spends; pub(crate) mod outpoint_spends;