From cb74087f27d2f78dbeb3e3d2227a707c1c5ca524 Mon Sep 17 00:00:00 2001 From: nym21 Date: Wed, 6 May 2026 19:31:18 +0200 Subject: [PATCH] mempool: fixes --- Cargo.lock | 207 ++---------------- Cargo.toml | 4 +- crates/brk_mempool/src/cluster/mod.rs | 58 ++++- crates/brk_mempool/src/cluster/sfl.rs | 8 +- crates/brk_mempool/src/lib.rs | 30 ++- crates/brk_mempool/src/steps/applier.rs | 12 + crates/brk_mempool/src/steps/fetcher/mod.rs | 13 +- .../brk_mempool/src/stores/entry_pool/mod.rs | 6 + crates/brk_query/src/impl/mempool.rs | 9 + crates/brk_server/src/api/mempool.rs | 19 ++ 10 files changed, 157 insertions(+), 209 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d158d27e6..8b94b55d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -775,9 +775,9 @@ dependencies = [ [[package]] name = "cfg-if" -version = "1.0.4" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" @@ -1277,12 +1277,6 @@ dependencies = [ "spin", ] -[[package]] -name = "foldhash" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" - [[package]] name = "font-kit" version = "0.14.3" @@ -1413,23 +1407,10 @@ checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", - "r-efi 5.3.0", + "r-efi", "wasip2", ] -[[package]] -name = "getrandom" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" -dependencies = [ - "cfg-if", - "libc", - "r-efi 6.0.0", - "wasip2", - "wasip3", -] - [[package]] name = "gif" version = "0.12.0" @@ -1469,15 +1450,6 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -[[package]] -name = "hashbrown" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" -dependencies = [ - "foldhash", -] - [[package]] name = "hashbrown" version = "0.16.1" @@ -1697,12 +1669,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "id-arena" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" - [[package]] name = "idna" version = "1.1.0" @@ -1880,12 +1846,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" -[[package]] -name = "leb128fmt" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" - [[package]] name = "lexopt" version = "0.3.2" @@ -2301,16 +2261,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" -[[package]] -name = "prettyplease" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" -dependencies = [ - "proc-macro2", - "syn", -] - [[package]] name = "proc-macro2" version = "1.0.106" @@ -2354,12 +2304,6 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" -[[package]] -name = "r-efi" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" - [[package]] name = "rand_core" version = "0.6.4" @@ -2386,7 +2330,9 @@ dependencies = [ [[package]] name = "rawdb" -version = "0.10.2" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b22c0f9f22e83612f89b67f341029c59d652f59f88d053037ad464ff4f1f886" dependencies = [ "libc", "log", @@ -2904,7 +2850,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.4.2", + "getrandom 0.3.4", "once_cell", "rustix", "windows-sys 0.61.2", @@ -3278,6 +3224,8 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" [[package]] name = "vecdb" version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ca57cedd42c0c7d8a343c06ab9c311be28a731e5d1e4101ef671d9a9af409a8" dependencies = [ "itoa", "libc", @@ -3298,7 +3246,9 @@ dependencies = [ [[package]] name = "vecdb_derive" -version = "0.10.2" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d5dadc8894e9260b3ac87613a3cdf2f26c319b1957018b7f87a4e6713c9a14d" dependencies = [ "quote", "syn", @@ -3332,16 +3282,7 @@ version = "1.0.3+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" dependencies = [ - "wit-bindgen 0.57.1", -] - -[[package]] -name = "wasip3" -version = "0.4.0+wasi-0.3.0-rc-2026-01-06" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" -dependencies = [ - "wit-bindgen 0.51.0", + "wit-bindgen", ] [[package]] @@ -3389,40 +3330,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "wasm-encoder" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" -dependencies = [ - "leb128fmt", - "wasmparser", -] - -[[package]] -name = "wasm-metadata" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" -dependencies = [ - "anyhow", - "indexmap", - "wasm-encoder", - "wasmparser", -] - -[[package]] -name = "wasmparser" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" -dependencies = [ - "bitflags 2.11.1", - "hashbrown 0.15.5", - "indexmap", - "semver", -] - [[package]] name = "web-sys" version = "0.3.97" @@ -3635,100 +3542,12 @@ dependencies = [ "winapi", ] -[[package]] -name = "wit-bindgen" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" -dependencies = [ - "wit-bindgen-rust-macro", -] - [[package]] name = "wit-bindgen" version = "0.57.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" -[[package]] -name = "wit-bindgen-core" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" -dependencies = [ - "anyhow", - "heck", - "wit-parser", -] - -[[package]] -name = "wit-bindgen-rust" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" -dependencies = [ - "anyhow", - "heck", - "indexmap", - "prettyplease", - "syn", - "wasm-metadata", - "wit-bindgen-core", - "wit-component", -] - -[[package]] -name = "wit-bindgen-rust-macro" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" -dependencies = [ - "anyhow", - "prettyplease", - "proc-macro2", - "quote", - "syn", - "wit-bindgen-core", - "wit-bindgen-rust", -] - -[[package]] -name = "wit-component" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" -dependencies = [ - "anyhow", - "bitflags 2.11.1", - "indexmap", - "log", - "serde", - "serde_derive", - "serde_json", - "wasm-encoder", - "wasm-metadata", - "wasmparser", - "wit-parser", -] - -[[package]] -name = "wit-parser" -version = "0.244.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" -dependencies = [ - "anyhow", - "id-arena", - "indexmap", - "log", - "semver", - "serde", - "serde_derive", - "serde_json", - "unicode-xid", - "wasmparser", -] - [[package]] name = "writeable" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index ebc69edd3..f37430f42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,8 +86,8 @@ tower-http = { version = "0.6.9", features = ["catch-panic", "compression-br", " tower-layer = "0.3" tracing = { version = "0.1", default-features = false, features = ["std"] } ureq = { version = "3.3.0", features = ["json"] } -# vecdb = { version = "=0.10.2", features = ["derive", "serde_json", "pco", "schemars"] } -vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco", "schemars"] } +vecdb = { version = "=0.10.2", features = ["derive", "serde_json", "pco", "schemars"] } +# vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco", "schemars"] } [workspace.metadata.release] shared-version = true diff --git a/crates/brk_mempool/src/cluster/mod.rs b/crates/brk_mempool/src/cluster/mod.rs index 75dac089e..c40ac9fe3 100644 --- a/crates/brk_mempool/src/cluster/mod.rs +++ b/crates/brk_mempool/src/cluster/mod.rs @@ -23,6 +23,7 @@ pub use cluster_ref::ClusterRef; pub use local_idx::LocalIdx; use smallvec::SmallVec; +use tracing::warn; /// A connected component of the mempool graph, stored in topological /// order (parents before children) and SFL-linearized into chunks. @@ -47,8 +48,20 @@ pub struct Cluster { impl Cluster { pub fn new(nodes: Vec>) -> Self { let nodes = Self::permute_to_topo_order(nodes); - let chunk_masks = sfl::linearize(&nodes); - let (chunks, node_to_chunk) = Self::materialize_chunks(&chunk_masks, nodes.len()); + let (chunks, node_to_chunk) = if nodes.len() < sfl::BITMASK_LIMIT { + let chunk_masks = sfl::linearize(&nodes); + Self::materialize_chunks(&chunk_masks, nodes.len()) + } else { + // Bitcoin Core 30+ caps clusters at 100, but pre-BIP431 nodes + // (or relay-policy edge cases) can produce larger connected + // components. Fall back to a trivial linearization so the + // mempool loop survives instead of panicking. + warn!( + "cluster size {} >= u128 capacity, using trivial linearization", + nodes.len() + ); + Self::trivial_chunks(&nodes) + }; Self { nodes, chunks, @@ -56,6 +69,47 @@ impl Cluster { } } + /// Fallback linearization for oversized clusters: emit each node as + /// its own chunk (topo-ordered), then run the same stack-merge as + /// `sfl::canonicalize` to restore the non-increasing fee_rate + /// invariant the partitioner relies on. Suboptimal partitioning + /// for that one cluster, but topology is preserved (merges only + /// join consecutive topo-ordered runs). + fn trivial_chunks(nodes: &[ClusterNode]) -> (Vec, Vec) { + let mut out: Vec = Vec::with_capacity(nodes.len()); + for (i, node) in nodes.iter().enumerate() { + let mut txs: SmallVec<[LocalIdx; 4]> = SmallVec::new(); + txs.push(LocalIdx::from(i)); + let mut cur = Chunk { + txs, + fee: node.fee, + vsize: node.vsize, + }; + while let Some(top) = out.last() { + if cur.fee_rate() <= top.fee_rate() { + break; + } + let prev = out.pop().unwrap(); + let mut merged_txs = prev.txs; + merged_txs.extend(cur.txs.iter().copied()); + cur = Chunk { + txs: merged_txs, + fee: prev.fee + cur.fee, + vsize: prev.vsize + cur.vsize, + }; + } + out.push(cur); + } + let mut node_to_chunk = vec![ChunkId::ZERO; nodes.len()]; + for (cid, chunk) in out.iter().enumerate() { + let chunk_id = ChunkId::from(cid); + for &local in &chunk.txs { + node_to_chunk[local.as_usize()] = chunk_id; + } + } + (out, node_to_chunk) + } + /// O(1) chunk lookup for a node. #[inline] pub fn chunk_of(&self, local: LocalIdx) -> &Chunk { diff --git a/crates/brk_mempool/src/cluster/sfl.rs b/crates/brk_mempool/src/cluster/sfl.rs index 380bbf398..67b52a10d 100644 --- a/crates/brk_mempool/src/cluster/sfl.rs +++ b/crates/brk_mempool/src/cluster/sfl.rs @@ -27,7 +27,7 @@ use super::ClusterNode; const BRUTE_FORCE_LIMIT: usize = 18; /// Cluster nodes are indexed by `u128` bitmask, so `n < 128`. Bitcoin /// Core's cluster cap is 100, so this leaves comfortable margin. -const BITMASK_LIMIT: usize = 128; +pub(super) const BITMASK_LIMIT: usize = 128; /// Raw SFL output: a chunk's bitmask plus its totals. `Cluster::new` /// converts these into final `Chunk`s with topo-ordered `txs`, so the @@ -45,8 +45,12 @@ impl ChunkMask { } /// Linearize a cluster into SFL chunks. +/// +/// Precondition: `nodes.len() < BITMASK_LIMIT`. `Cluster::new` enforces +/// this by dispatching oversized clusters to a trivial fallback before +/// reaching here, so the check is `debug_assert!` rather than runtime. pub(super) fn linearize(nodes: &[ClusterNode]) -> Vec { - assert!( + debug_assert!( nodes.len() < BITMASK_LIMIT, "cluster size {} exceeds u128 capacity", nodes.len() diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index c006265f3..5fe34343b 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -13,7 +13,12 @@ //! 5. [`steps::rebuilder::Rebuilder`] - throttled rebuild of the //! projected-blocks `Snapshot`. -use std::{sync::Arc, thread, time::Duration}; +use std::{ + panic::{AssertUnwindSafe, catch_unwind}, + sync::Arc, + thread, + time::Duration, +}; use brk_error::Result; use brk_rpc::Client; @@ -125,12 +130,29 @@ impl Mempool { } /// Variant of `start` that runs `after_update` after every cycle. + /// + /// `update` and `after_update` are wrapped in `catch_unwind` so an + /// unexpected panic in either step doesn't kill the loop and freeze + /// the mempool snapshot. `parking_lot` locks don't poison, so state + /// remains usable after a panic. pub fn start_with(&self, mut after_update: impl FnMut()) { loop { - if let Err(e) = self.update() { - error!("update failed: {e}"); + let outcome = catch_unwind(AssertUnwindSafe(|| { + if let Err(e) = self.update() { + error!("update failed: {e}"); + } + after_update(); + })); + if let Err(payload) = outcome { + let msg = if let Some(s) = payload.downcast_ref::<&'static str>() { + (*s).to_string() + } else if let Some(s) = payload.downcast_ref::() { + s.clone() + } else { + "".to_string() + }; + error!("mempool update panicked, continuing loop: {msg}"); } - after_update(); thread::sleep(Duration::from_secs(1)); } } diff --git a/crates/brk_mempool/src/steps/applier.rs b/crates/brk_mempool/src/steps/applier.rs index 2b8f782e4..6c1e72300 100644 --- a/crates/brk_mempool/src/steps/applier.rs +++ b/crates/brk_mempool/src/steps/applier.rs @@ -1,4 +1,5 @@ use brk_types::{Transaction, Txid, TxidPrefix}; +use tracing::warn; use crate::{ TxEntry, TxRemoval, @@ -37,6 +38,17 @@ impl Applier { }; let txid = entry.txid; let Some(tx) = s.txs.remove(&txid) else { + // entries had this prefix but txs didn't — a state divergence + // that should be impossible: publish/bury both touch them + // together under one write_all guard. Reaching this branch + // means a prior cycle left the two stores out of sync (panic + // mid-publish, prefix collision, etc). The slot has been + // freed by entries.remove, but addrs/outpoint_spends/info may + // still hold stale references that we can't repair without + // the tx body. Log loudly so the corruption is visible. + warn!( + "mempool bury: entry present but tx missing for txid={txid} - addr/outpoint state may be stale" + ); return; }; s.info.remove(&tx, entry.fee); diff --git a/crates/brk_mempool/src/steps/fetcher/mod.rs b/crates/brk_mempool/src/steps/fetcher/mod.rs index f73ec2b06..900078f90 100644 --- a/crates/brk_mempool/src/steps/fetcher/mod.rs +++ b/crates/brk_mempool/src/steps/fetcher/mod.rs @@ -5,7 +5,7 @@ pub use fetched::Fetched; use brk_error::Result; use brk_rpc::{Client, RawTx}; use brk_types::{MempoolEntryInfo, Txid}; -use rustc_hash::FxHashMap; +use rustc_hash::{FxHashMap, FxHashSet}; use crate::stores::{MempoolState, TxGraveyard, TxStore}; @@ -77,13 +77,16 @@ impl Fetcher { } fn unique_confirmed_parents(new_raws: &FxHashMap, known: &TxStore) -> Vec { - let mut v = new_raws + // Iterating new_raws.values() yields txs in arbitrary FxHashMap order, + // so duplicates of the same parent are typically non-adjacent. Dedup + // via a FxHashSet so a parent shared by N new txs is fetched once. + let mut seen: FxHashSet = FxHashSet::default(); + new_raws .values() .flat_map(|raw| &raw.tx.input) .map(|txin| Txid::from(txin.previous_output.txid)) .filter(|prev| !known.contains(prev) && !new_raws.contains_key(prev)) - .collect::>(); - v.dedup(); - v + .filter(|prev| seen.insert(*prev)) + .collect() } } diff --git a/crates/brk_mempool/src/stores/entry_pool/mod.rs b/crates/brk_mempool/src/stores/entry_pool/mod.rs index 9a9bd6a95..b2bf6e0b4 100644 --- a/crates/brk_mempool/src/stores/entry_pool/mod.rs +++ b/crates/brk_mempool/src/stores/entry_pool/mod.rs @@ -23,6 +23,12 @@ pub struct EntryPool { impl EntryPool { pub fn insert(&mut self, entry: TxEntry) -> TxIndex { let prefix = entry.txid_prefix(); + debug_assert!( + !self.prefix_to_idx.contains_key(&prefix), + "TxidPrefix collision in EntryPool: prefix {prefix:?} already mapped. \ + Birthday-rare on SHA-256d, but if it ever fires the previous slot \ + leaks because outpoint_spends still references it." + ); let idx = self.claim_slot(entry); self.prefix_to_idx.insert(prefix, idx); idx diff --git a/crates/brk_query/src/impl/mempool.rs b/crates/brk_query/src/impl/mempool.rs index 2d439e2c2..805116baa 100644 --- a/crates/brk_query/src/impl/mempool.rs +++ b/crates/brk_query/src/impl/mempool.rs @@ -254,4 +254,13 @@ impl Query { }) .collect()) } + + /// Opaque content hash that changes whenever the projected next + /// block changes. Same value used as the mempool ETag, surfaced as + /// JSON so external monitors can detect a frozen update loop by + /// polling: if the value doesn't change for tens of seconds on a + /// live network, the mempool sync has stalled. + pub fn mempool_hash(&self) -> Result { + Ok(self.require_mempool()?.next_block_hash()) + } } diff --git a/crates/brk_server/src/api/mempool.rs b/crates/brk_server/src/api/mempool.rs index 04573d2a1..b53b7d77e 100644 --- a/crates/brk_server/src/api/mempool.rs +++ b/crates/brk_server/src/api/mempool.rs @@ -32,6 +32,25 @@ impl MempoolRoutes for ApiRouter { }, ), ) + .api_route( + "/api/mempool/hash", + get_with( + async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State| { + state + .respond_json(&headers, state.mempool_strategy(), &uri, |q| q.mempool_hash()) + .await + }, + |op| { + op.id("get_mempool_hash") + .mempool_tag() + .summary("Mempool content hash") + .description("Returns an opaque `u64` that changes whenever the projected next block changes. Same value as the mempool ETag. Useful as a freshness/liveness signal: if it stays constant for tens of seconds on a live network, the mempool sync loop has stalled.") + .json_response::() + .not_modified() + .server_error() + }, + ), + ) .api_route( "/api/mempool/txids", get_with(