mempool: fixes

This commit is contained in:
nym21
2026-05-06 19:31:18 +02:00
parent 086bfd9938
commit cb74087f27
10 changed files with 157 additions and 209 deletions

View File

@@ -23,6 +23,7 @@ pub use cluster_ref::ClusterRef;
pub use local_idx::LocalIdx;
use smallvec::SmallVec;
use tracing::warn;
/// A connected component of the mempool graph, stored in topological
/// order (parents before children) and SFL-linearized into chunks.
@@ -47,8 +48,20 @@ pub struct Cluster<I> {
impl<I> Cluster<I> {
pub fn new(nodes: Vec<ClusterNode<I>>) -> Self {
let nodes = Self::permute_to_topo_order(nodes);
let chunk_masks = sfl::linearize(&nodes);
let (chunks, node_to_chunk) = Self::materialize_chunks(&chunk_masks, nodes.len());
let (chunks, node_to_chunk) = if nodes.len() < sfl::BITMASK_LIMIT {
let chunk_masks = sfl::linearize(&nodes);
Self::materialize_chunks(&chunk_masks, nodes.len())
} else {
// Bitcoin Core 30+ caps clusters at 100, but pre-BIP431 nodes
// (or relay-policy edge cases) can produce larger connected
// components. Fall back to a trivial linearization so the
// mempool loop survives instead of panicking.
warn!(
"cluster size {} >= u128 capacity, using trivial linearization",
nodes.len()
);
Self::trivial_chunks(&nodes)
};
Self {
nodes,
chunks,
@@ -56,6 +69,47 @@ impl<I> Cluster<I> {
}
}
/// Fallback linearization for oversized clusters: emit each node as
/// its own chunk (topo-ordered), then run the same stack-merge as
/// `sfl::canonicalize` to restore the non-increasing fee_rate
/// invariant the partitioner relies on. Suboptimal partitioning
/// for that one cluster, but topology is preserved (merges only
/// join consecutive topo-ordered runs).
fn trivial_chunks(nodes: &[ClusterNode<I>]) -> (Vec<Chunk>, Vec<ChunkId>) {
let mut out: Vec<Chunk> = Vec::with_capacity(nodes.len());
for (i, node) in nodes.iter().enumerate() {
let mut txs: SmallVec<[LocalIdx; 4]> = SmallVec::new();
txs.push(LocalIdx::from(i));
let mut cur = Chunk {
txs,
fee: node.fee,
vsize: node.vsize,
};
while let Some(top) = out.last() {
if cur.fee_rate() <= top.fee_rate() {
break;
}
let prev = out.pop().unwrap();
let mut merged_txs = prev.txs;
merged_txs.extend(cur.txs.iter().copied());
cur = Chunk {
txs: merged_txs,
fee: prev.fee + cur.fee,
vsize: prev.vsize + cur.vsize,
};
}
out.push(cur);
}
let mut node_to_chunk = vec![ChunkId::ZERO; nodes.len()];
for (cid, chunk) in out.iter().enumerate() {
let chunk_id = ChunkId::from(cid);
for &local in &chunk.txs {
node_to_chunk[local.as_usize()] = chunk_id;
}
}
(out, node_to_chunk)
}
/// O(1) chunk lookup for a node.
#[inline]
pub fn chunk_of(&self, local: LocalIdx) -> &Chunk {

View File

@@ -27,7 +27,7 @@ use super::ClusterNode;
const BRUTE_FORCE_LIMIT: usize = 18;
/// Cluster nodes are indexed by `u128` bitmask, so `n < 128`. Bitcoin
/// Core's cluster cap is 100, so this leaves comfortable margin.
const BITMASK_LIMIT: usize = 128;
pub(super) const BITMASK_LIMIT: usize = 128;
/// Raw SFL output: a chunk's bitmask plus its totals. `Cluster::new`
/// converts these into final `Chunk`s with topo-ordered `txs`, so the
@@ -45,8 +45,12 @@ impl ChunkMask {
}
/// Linearize a cluster into SFL chunks.
///
/// Precondition: `nodes.len() < BITMASK_LIMIT`. `Cluster::new` enforces
/// this by dispatching oversized clusters to a trivial fallback before
/// reaching here, so the check is `debug_assert!` rather than runtime.
pub(super) fn linearize<I>(nodes: &[ClusterNode<I>]) -> Vec<ChunkMask> {
assert!(
debug_assert!(
nodes.len() < BITMASK_LIMIT,
"cluster size {} exceeds u128 capacity",
nodes.len()

View File

@@ -13,7 +13,12 @@
//! 5. [`steps::rebuilder::Rebuilder`] - throttled rebuild of the
//! projected-blocks `Snapshot`.
use std::{sync::Arc, thread, time::Duration};
use std::{
panic::{AssertUnwindSafe, catch_unwind},
sync::Arc,
thread,
time::Duration,
};
use brk_error::Result;
use brk_rpc::Client;
@@ -125,12 +130,29 @@ impl Mempool {
}
/// Variant of `start` that runs `after_update` after every cycle.
///
/// `update` and `after_update` are wrapped in `catch_unwind` so an
/// unexpected panic in either step doesn't kill the loop and freeze
/// the mempool snapshot. `parking_lot` locks don't poison, so state
/// remains usable after a panic.
pub fn start_with(&self, mut after_update: impl FnMut()) {
loop {
if let Err(e) = self.update() {
error!("update failed: {e}");
let outcome = catch_unwind(AssertUnwindSafe(|| {
if let Err(e) = self.update() {
error!("update failed: {e}");
}
after_update();
}));
if let Err(payload) = outcome {
let msg = if let Some(s) = payload.downcast_ref::<&'static str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"<non-string panic payload>".to_string()
};
error!("mempool update panicked, continuing loop: {msg}");
}
after_update();
thread::sleep(Duration::from_secs(1));
}
}

View File

@@ -1,4 +1,5 @@
use brk_types::{Transaction, Txid, TxidPrefix};
use tracing::warn;
use crate::{
TxEntry, TxRemoval,
@@ -37,6 +38,17 @@ impl Applier {
};
let txid = entry.txid;
let Some(tx) = s.txs.remove(&txid) else {
// entries had this prefix but txs didn't — a state divergence
// that should be impossible: publish/bury both touch them
// together under one write_all guard. Reaching this branch
// means a prior cycle left the two stores out of sync (panic
// mid-publish, prefix collision, etc). The slot has been
// freed by entries.remove, but addrs/outpoint_spends/info may
// still hold stale references that we can't repair without
// the tx body. Log loudly so the corruption is visible.
warn!(
"mempool bury: entry present but tx missing for txid={txid} - addr/outpoint state may be stale"
);
return;
};
s.info.remove(&tx, entry.fee);

View File

@@ -5,7 +5,7 @@ pub use fetched::Fetched;
use brk_error::Result;
use brk_rpc::{Client, RawTx};
use brk_types::{MempoolEntryInfo, Txid};
use rustc_hash::FxHashMap;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::stores::{MempoolState, TxGraveyard, TxStore};
@@ -77,13 +77,16 @@ impl Fetcher {
}
fn unique_confirmed_parents(new_raws: &FxHashMap<Txid, RawTx>, known: &TxStore) -> Vec<Txid> {
let mut v = new_raws
// Iterating new_raws.values() yields txs in arbitrary FxHashMap order,
// so duplicates of the same parent are typically non-adjacent. Dedup
// via a FxHashSet so a parent shared by N new txs is fetched once.
let mut seen: FxHashSet<Txid> = FxHashSet::default();
new_raws
.values()
.flat_map(|raw| &raw.tx.input)
.map(|txin| Txid::from(txin.previous_output.txid))
.filter(|prev| !known.contains(prev) && !new_raws.contains_key(prev))
.collect::<Vec<_>>();
v.dedup();
v
.filter(|prev| seen.insert(*prev))
.collect()
}
}

View File

@@ -23,6 +23,12 @@ pub struct EntryPool {
impl EntryPool {
pub fn insert(&mut self, entry: TxEntry) -> TxIndex {
let prefix = entry.txid_prefix();
debug_assert!(
!self.prefix_to_idx.contains_key(&prefix),
"TxidPrefix collision in EntryPool: prefix {prefix:?} already mapped. \
Birthday-rare on SHA-256d, but if it ever fires the previous slot \
leaks because outpoint_spends still references it."
);
let idx = self.claim_slot(entry);
self.prefix_to_idx.insert(prefix, idx);
idx

View File

@@ -254,4 +254,13 @@ impl Query {
})
.collect())
}
/// Opaque content hash that changes whenever the projected next
/// block changes. Same value used as the mempool ETag, surfaced as
/// JSON so external monitors can detect a frozen update loop by
/// polling: if the value doesn't change for tens of seconds on a
/// live network, the mempool sync has stalled.
pub fn mempool_hash(&self) -> Result<u64> {
Ok(self.require_mempool()?.next_block_hash())
}
}

View File

@@ -32,6 +32,25 @@ impl MempoolRoutes for ApiRouter<AppState> {
},
),
)
.api_route(
"/api/mempool/hash",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.respond_json(&headers, state.mempool_strategy(), &uri, |q| q.mempool_hash())
.await
},
|op| {
op.id("get_mempool_hash")
.mempool_tag()
.summary("Mempool content hash")
.description("Returns an opaque `u64` that changes whenever the projected next block changes. Same value as the mempool ETag. Useful as a freshness/liveness signal: if it stays constant for tens of seconds on a live network, the mempool sync loop has stalled.")
.json_response::<u64>()
.not_modified()
.server_error()
},
),
)
.api_route(
"/api/mempool/txids",
get_with(