From 774580ee11d52a90bd8f272ac2cf0147c311aff9 Mon Sep 17 00:00:00 2001 From: nym21 Date: Sun, 10 May 2026 14:04:08 +0200 Subject: [PATCH] mempool: fixes --- crates/brk_mempool/src/lib.rs | 57 ++++++++----- .../brk_mempool/src/steps/fetcher/fetched.rs | 4 +- crates/brk_mempool/src/steps/fetcher/mod.rs | 4 +- crates/brk_mempool/src/steps/preparer/mod.rs | 15 ++-- .../src/steps/preparer/tx_addition.rs | 34 ++++---- .../src/steps/preparer/tx_removal.rs | 3 + crates/brk_mempool/src/steps/prevouts.rs | 78 ++++++++++++------ .../src/steps/rebuilder/partition.rs | 8 +- .../src/steps/rebuilder/snapshot/mod.rs | 17 ++-- .../src/stores/addr_tracker/mod.rs | 6 +- .../src/stores/tx_graveyard/mod.rs | 4 +- crates/brk_query/src/impl/mempool.rs | 82 +++++++++---------- crates/brk_rpc/src/lib.rs | 11 +-- crates/brk_rpc/src/methods.rs | 82 +++++++++++-------- 14 files changed, 231 insertions(+), 174 deletions(-) diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index 0927ab55c..cfea5de3a 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -22,7 +22,10 @@ use std::{ any::Any, cmp::Reverse, panic::{AssertUnwindSafe, catch_unwind}, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, thread, time::{Duration, Instant}, }; @@ -34,8 +37,8 @@ use brk_types::{ MempoolInfo, MempoolRecentTx, NextBlockHash, OutpointPrefix, OutputType, Sats, Timestamp, Transaction, TxOut, Txid, TxidPrefix, Vin, Vout, }; -use rustc_hash::FxHashSet; use parking_lot::{RwLock, RwLockReadGuard}; +use rustc_hash::{FxHashMap, FxHashSet}; use tracing::error; mod cluster; @@ -54,9 +57,15 @@ pub(crate) use steps::{BlockStats, RecommendedFees, TxEntry, TxRemoval}; pub(crate) use stores::{TxStore, TxTombstone}; /// Confirmed-parent prevout resolver passed to [`Mempool::update_with`] / -/// [`Mempool::start_with`]. Receives `(parent_txid, vout)`, returns the -/// `TxOut` if the parent is reachable, `None` otherwise. -pub type PrevoutResolver = Box Option + Send + Sync>; +/// [`Mempool::start_with`]. Receives a slice of `(parent_txid, vout)` +/// holes and returns the subset that resolved. Unresolved holes are +/// simply omitted from the map; the next cycle retries automatically. +/// +/// Batched so the RPC implementation can pack one round-trip per cycle +/// (deduping by parent txid so a tx with N inputs from one parent costs +/// one fetch); the indexer implementation just loops over local reads. +pub type PrevoutResolver = + Box FxHashMap<(Txid, Vout), TxOut> + Send + Sync>; pub(crate) use state::State; @@ -68,6 +77,7 @@ struct Inner { client: Client, state: RwLock, rebuilder: Rebuilder, + started: AtomicBool, } impl Mempool { @@ -76,6 +86,7 @@ impl Mempool { client: client.clone(), state: RwLock::new(State::default()), rebuilder: Rebuilder::default(), + started: AtomicBool::new(false), })) } @@ -257,10 +268,7 @@ impl Mempool { /// evicted RBF predecessor reports the package-effective rate it /// had in the mempool, not a misleading isolated `fee/vsize`. pub fn graveyard_fee_rate(&self, txid: &Txid) -> Option { - self.read() - .graveyard - .get(txid) - .map(|tomb| tomb.chunk_rate) + self.read().graveyard.get(txid).map(|tomb| tomb.chunk_rate) } /// `first_seen` Unix-second timestamps for `txids`, in input order. @@ -292,8 +300,16 @@ impl Mempool { /// overruns `PERIOD`, the next cycle starts immediately. pub fn start_with(&self, resolver: F) where - F: Fn(&Txid, Vout) -> Option, + F: Fn(&[(Txid, Vout)]) -> FxHashMap<(Txid, Vout), TxOut>, { + if self + .0 + .started + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + panic!("Mempool::start_with already running on this instance"); + } const PERIOD: Duration = Duration::from_millis(500); loop { let started = Instant::now(); @@ -303,7 +319,10 @@ impl Mempool { } })); if let Err(payload) = outcome { - error!("mempool update panicked, continuing loop: {}", panic_msg(&payload)); + error!( + "mempool update panicked, continuing loop: {}", + panic_msg(&payload) + ); } if let Some(rest) = PERIOD.checked_sub(started.elapsed()) { thread::sleep(rest); @@ -311,38 +330,32 @@ impl Mempool { } } - /// One sync cycle with the default RPC resolver. Equivalent to - /// `update_with(rpc_resolver)`. Standalone consumers (Core + - /// `txindex=1`) get a one-line driver loop. - pub fn update(&self) -> Result<()> { - self.update_with(Prevouts::rpc_resolver(self.0.client.clone())) - } - /// One sync cycle: fetch, prepare, apply, fill prevouts, maybe /// rebuild. The resolver MUST resolve confirmed prevouts only; /// mempool-to-mempool chains are wired internally and the /// resolver is never called for them. - pub fn update_with(&self, resolver: F) -> Result<()> + fn update_with(&self, resolver: F) -> Result<()> where - F: Fn(&Txid, Vout) -> Option, + F: Fn(&[(Txid, Vout)]) -> FxHashMap<(Txid, Vout), TxOut>, { let Inner { client, state, rebuilder, + .. } = &*self.0; let Some(Fetched { live_txids, new_entries, - new_raws, + new_txs, gbt, min_fee, }) = Fetcher::fetch(client, state)? else { return Ok(()); }; - let pulled = Preparer::prepare(&live_txids, new_entries, new_raws, state); + let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state); let changed = Applier::apply(state, rebuilder, pulled); Prevouts::fill(state, resolver); rebuilder.tick(state, changed, &gbt, min_fee); diff --git a/crates/brk_mempool/src/steps/fetcher/fetched.rs b/crates/brk_mempool/src/steps/fetcher/fetched.rs index c00f33eb5..4b0851b10 100644 --- a/crates/brk_mempool/src/steps/fetcher/fetched.rs +++ b/crates/brk_mempool/src/steps/fetcher/fetched.rs @@ -1,4 +1,4 @@ -use brk_rpc::{BlockTemplateTx, RawTx}; +use brk_rpc::BlockTemplateTx; use brk_types::{FeeRate, MempoolEntryInfo, Txid}; use rustc_hash::FxHashMap; @@ -9,7 +9,7 @@ pub struct Fetched { /// `MempoolEntryInfo` for newly-observed txids only (existing ones /// keep their first-sight entry on the live store). pub new_entries: Vec, - pub new_raws: FxHashMap, + pub new_txs: FxHashMap, pub gbt: Vec, pub min_fee: FeeRate, } diff --git a/crates/brk_mempool/src/steps/fetcher/mod.rs b/crates/brk_mempool/src/steps/fetcher/mod.rs index d32bdbfeb..086592c5e 100644 --- a/crates/brk_mempool/src/steps/fetcher/mod.rs +++ b/crates/brk_mempool/src/steps/fetcher/mod.rs @@ -41,11 +41,11 @@ impl Fetcher { Self::new_txids(&live_txids, &state.txs) }; let new_entries = client.fetch_mempool_entries(&new_txids)?; - let new_raws = client.get_raw_transactions(&new_txids)?; + let new_txs = client.get_raw_transactions(&new_txids)?; Ok(Some(Fetched { live_txids, new_entries, - new_raws, + new_txs, gbt, min_fee, })) diff --git a/crates/brk_mempool/src/steps/preparer/mod.rs b/crates/brk_mempool/src/steps/preparer/mod.rs index b018c5526..cd20380b9 100644 --- a/crates/brk_mempool/src/steps/preparer/mod.rs +++ b/crates/brk_mempool/src/steps/preparer/mod.rs @@ -12,7 +12,6 @@ //! state on the live store. Removals are inferred by cross-referencing //! inputs against the full `live_txids` set from the cycle's pull. -use brk_rpc::RawTx; use brk_types::{MempoolEntryInfo, Transaction, Txid, TxidPrefix, Vout}; use parking_lot::RwLock; use rustc_hash::{FxHashMap, FxHashSet}; @@ -40,13 +39,13 @@ impl Preparer { pub fn prepare( live_txids: &[Txid], new_entries: Vec, - new_raws: FxHashMap, + new_txs: FxHashMap, lock: &RwLock, ) -> TxsPulled { let state = lock.read(); let live: FxHashSet = live_txids.iter().map(TxidPrefix::from).collect(); - let added = Self::classify_additions(new_entries, new_raws, &state.txs, &state.graveyard); + let added = Self::classify_additions(new_entries, new_txs, &state.txs, &state.graveyard); let removed = Self::classify_removals(&live, &added, &state.txs); TxsPulled { added, removed } @@ -54,13 +53,13 @@ impl Preparer { fn classify_additions( new_entries: Vec, - mut new_raws: FxHashMap, + mut new_txs: FxHashMap, known: &TxStore, graveyard: &TxGraveyard, ) -> Vec { new_entries .iter() - .filter_map(|info| Self::classify_addition(info, known, graveyard, &mut new_raws)) + .filter_map(|info| Self::classify_addition(info, known, graveyard, &mut new_txs)) .collect() } @@ -68,7 +67,7 @@ impl Preparer { info: &MempoolEntryInfo, known: &TxStore, graveyard: &TxGraveyard, - new_raws: &mut FxHashMap, + new_txs: &mut FxHashMap, ) -> Option { if known.contains(&info.txid) { return None; @@ -76,8 +75,8 @@ impl Preparer { if let Some(tomb) = graveyard.get(&info.txid) { return Some(TxAddition::revived(info, tomb)); } - let raw = new_raws.remove(&info.txid)?; - Some(TxAddition::fresh(info, raw, known)) + let tx = new_txs.remove(&info.txid)?; + Some(TxAddition::fresh(info, tx, known)) } /// One `(prefix, reason)` per known tx that's gone from the live set, diff --git a/crates/brk_mempool/src/steps/preparer/tx_addition.rs b/crates/brk_mempool/src/steps/preparer/tx_addition.rs index 70e7b6600..56bf51cee 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_addition.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_addition.rs @@ -9,9 +9,6 @@ //! (preserving `rbf`, `size`). The Applier exhumes the cached tx //! body. No raw decoding. -use std::mem; - -use brk_rpc::RawTx; use brk_types::{MempoolEntryInfo, SigOps, Transaction, TxIn, TxOut, TxStatus, Txid, Vout}; use crate::{TxTombstone, stores::TxStore}; @@ -27,39 +24,44 @@ impl TxAddition { /// Resolves prevouts against the live mempool only. Confirmed /// parents land with `prevout: None` and are filled by the /// resolver supplied to `Mempool::update_with` in the same cycle. - pub(super) fn fresh(info: &MempoolEntryInfo, raw: RawTx, mempool_txs: &TxStore) -> Self { - let total_size = raw.hex.len() / 2; - let rbf = raw.tx.input.iter().any(|i| i.sequence.is_rbf()); - let tx = Self::build_tx(info, raw, total_size, mempool_txs); + pub(super) fn fresh( + info: &MempoolEntryInfo, + tx: bitcoin::Transaction, + mempool_txs: &TxStore, + ) -> Self { + let total_size = tx.total_size(); + let rbf = tx.input.iter().any(|i| i.sequence.is_rbf()); + let built = Self::build_tx(info, tx, total_size, mempool_txs); let entry = TxEntry::new(info, total_size as u64, rbf); - Self::Fresh { tx, entry } + Self::Fresh { tx: built, entry } } fn build_tx( info: &MempoolEntryInfo, - mut raw: RawTx, + tx: bitcoin::Transaction, total_size: usize, mempool_txs: &TxStore, ) -> Transaction { - let input = mem::take(&mut raw.tx.input) + let input = tx + .input .into_iter() .map(|txin| Self::build_txin(txin, mempool_txs)) .collect(); - let mut tx = Transaction { + let mut built = Transaction { index: None, txid: info.txid, - version: raw.tx.version.into(), + version: tx.version.into(), total_sigop_cost: SigOps::ZERO, weight: info.weight, - lock_time: raw.tx.lock_time.into(), + lock_time: tx.lock_time.into(), total_size, fee: info.fee, input, - output: raw.tx.output.into_iter().map(TxOut::from).collect(), + output: tx.output.into_iter().map(TxOut::from).collect(), status: TxStatus::UNCONFIRMED, }; - tx.total_sigop_cost = tx.total_sigop_cost(); - tx + built.total_sigop_cost = built.total_sigop_cost(); + built } pub(super) fn revived(info: &MempoolEntryInfo, tomb: &TxTombstone) -> Self { diff --git a/crates/brk_mempool/src/steps/preparer/tx_removal.rs b/crates/brk_mempool/src/steps/preparer/tx_removal.rs index 883ffd6a8..451a9b2f1 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_removal.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_removal.rs @@ -5,6 +5,9 @@ use brk_types::Txid; /// `Replaced` = at least one freshly added tx this cycle spends one of /// its inputs (BIP-125 replacement inferred from conflicting outpoints). +/// `by` is the immediate successor; the chain extends if `by` is itself +/// later replaced. Walk it forward via `TxGraveyard::replacement_root_of`. +/// /// `Vanished` = any other reason we can't distinguish from the data at /// hand (mined, expired, evicted, or replaced by a tx we didn't fetch /// due to the per-cycle fetch cap). diff --git a/crates/brk_mempool/src/steps/prevouts.rs b/crates/brk_mempool/src/steps/prevouts.rs index 5739f5ae0..08068082c 100644 --- a/crates/brk_mempool/src/steps/prevouts.rs +++ b/crates/brk_mempool/src/steps/prevouts.rs @@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use brk_rpc::Client; use brk_types::{TxOut, Txid, TxidPrefix, Vin, Vout}; use parking_lot::RwLock; +use rustc_hash::{FxHashMap, FxHashSet}; use tracing::warn; use crate::{State, stores::TxStore}; @@ -33,15 +34,16 @@ type Fills = Vec<(Vin, TxOut)>; type Holes = Vec<(Vin, Txid, Vout)>; type FillBatch = Vec<(Txid, Fills)>; type HoleBatch = Vec<(Txid, Holes)>; +type Resolved = FxHashMap<(Txid, Vout), TxOut>; impl Prevouts { /// Fill every unfilled prevout the cycle can resolve. Same-cycle /// in-mempool parents are filled lock-locally; the remainder go - /// through `resolver` outside any lock. Returns true iff anything - /// was written. + /// through `resolver` (one batched call) outside any lock. Returns + /// true iff anything was written. pub fn fill(lock: &RwLock, resolver: F) -> bool where - F: Fn(&Txid, Vout) -> Option, + F: Fn(&[(Txid, Vout)]) -> Resolved, { let (in_mempool, holes) = { let state = lock.read(); @@ -63,28 +65,41 @@ impl Prevouts { true } - /// Default resolver: per-call `getrawtransaction` against the - /// bitcoind RPC client `Mempool` already holds. Requires - /// `txindex=1`. On any failure logs once with a hint, then returns - /// `None`; the next cycle retries automatically. - pub fn rpc_resolver(client: Client) -> impl Fn(&Txid, Vout) -> Option { + /// Default resolver: one batched `getrawtransaction` per cycle, + /// deduped by parent txid. Requires bitcoind with `txindex=1`. + pub fn rpc_resolver(client: Client) -> impl Fn(&[(Txid, Vout)]) -> Resolved { let warned = AtomicBool::new(false); - move |txid, vout| { - let bt: &bitcoin::Txid = txid.into(); - match client.get_raw_transaction(bt, None as Option<&bitcoin::BlockHash>) { - Ok(tx) => tx - .output - .get(usize::from(vout)) - .map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into()))), + move |holes: &[(Txid, Vout)]| { + if holes.is_empty() { + return Resolved::default(); + } + let mut seen: FxHashSet = FxHashSet::default(); + let unique: Vec = holes + .iter() + .filter_map(|(t, _)| seen.insert(*t).then_some(*t)) + .collect(); + let parents = match client.get_raw_transactions(&unique) { + Ok(map) => { + warned.store(false, Ordering::Relaxed); + map + } Err(_) => { if !warned.swap(true, Ordering::Relaxed) { warn!( - "mempool: getrawtransaction missed for {txid}; ensure bitcoind is running with txindex=1" + "mempool: getrawtransaction batch failed; ensure bitcoind is running with txindex=1" ); } - None + return Resolved::default(); } - } + }; + holes + .iter() + .filter_map(|(txid, vout)| { + let o = parents.get(txid)?.output.get(usize::from(*vout))?; + let txout = TxOut::from((o.script_pubkey.clone(), o.value.into())); + Some(((*txid, *vout), txout)) + }) + .collect() } } @@ -92,9 +107,6 @@ impl Prevouts { /// same-cycle in-mempool fill (parent is live) or an external hole /// (parent is confirmed or unknown). fn gather(txs: &TxStore) -> (FillBatch, HoleBatch) { - if txs.unresolved().is_empty() { - return (Vec::new(), Vec::new()); - } let mut filled: FillBatch = Vec::new(); let mut holes: HoleBatch = Vec::new(); for prefix in txs.unresolved() { @@ -127,17 +139,33 @@ impl Prevouts { (filled, holes) } + /// Flatten holes into one `(prev_txid, vout)` slice, invoke the + /// resolver once, then re-attribute resolved entries to their + /// consumer txs. Mempool double-spend rules guarantee every + /// `(prev_txid, vout)` key is unique across the batch, so no + /// dedup is needed before calling. fn resolve_external(holes: HoleBatch, resolver: F) -> FillBatch where - F: Fn(&Txid, Vout) -> Option, + F: Fn(&[(Txid, Vout)]) -> Resolved, { + let total: usize = holes.iter().map(|(_, h)| h.len()).sum(); + let mut flat: Vec<(Txid, Vout)> = Vec::with_capacity(total); + for (_, tx_holes) in &holes { + for (_, prev_txid, vout) in tx_holes { + flat.push((*prev_txid, *vout)); + } + } + let mut resolved = resolver(&flat); + if resolved.is_empty() { + return Vec::new(); + } holes .into_iter() - .filter_map(|(txid, holes)| { - let fills: Fills = holes + .filter_map(|(txid, tx_holes)| { + let fills: Fills = tx_holes .into_iter() .filter_map(|(vin, prev_txid, vout)| { - resolver(&prev_txid, vout).map(|o| (vin, o)) + resolved.remove(&(prev_txid, vout)).map(|o| (vin, o)) }) .collect(); (!fills.is_empty()).then_some((txid, fills)) diff --git a/crates/brk_mempool/src/steps/rebuilder/partition.rs b/crates/brk_mempool/src/steps/rebuilder/partition.rs index f4ca5d04d..135ebaf87 100644 --- a/crates/brk_mempool/src/steps/rebuilder/partition.rs +++ b/crates/brk_mempool/src/steps/rebuilder/partition.rs @@ -16,8 +16,6 @@ //! block if it fits; otherwise advance to the next block (unless we //! are already on the last one, which absorbs everything remaining). -use std::cmp::Reverse; - use brk_types::{FeeRate, VSize}; use rustc_hash::FxHashSet; @@ -64,6 +62,10 @@ fn sorted_candidates( (!excluded.contains(&idx)).then_some((idx, t.vsize, t.chunk_rate)) }) .collect(); - cands.sort_by_key(|(_, _, rate)| Reverse(*rate)); + cands.sort_by(|(a_idx, _, a_rate), (b_idx, _, b_rate)| { + b_rate + .cmp(a_rate) + .then_with(|| txs[a_idx.as_usize()].txid.cmp(&txs[b_idx.as_usize()].txid)) + }); cands } diff --git a/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs b/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs index 2033fc5ec..be4d26699 100644 --- a/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/snapshot/mod.rs @@ -9,9 +9,10 @@ pub use stats::BlockStats; pub use tx::SnapTx; pub use tx_index::TxIndex; -use std::hash::{DefaultHasher, Hash, Hasher}; +use std::hash::{Hash, Hasher}; use brk_types::{FeeRate, NextBlockHash, RecommendedFees, Txid, TxidPrefix}; +use rustc_hash::FxHasher; use fees::Fees; @@ -49,7 +50,7 @@ impl Snapshot { ) -> Self { let block_stats = BlockStats::for_blocks(&blocks, &txs); let fees = Fees::compute(&block_stats, min_fee); - let next_block_hash = Self::hash_next_block(&blocks); + let next_block_hash = Self::hash_next_block(&blocks, &txs); Self { txs, blocks, @@ -60,12 +61,18 @@ impl Snapshot { } } - fn hash_next_block(blocks: &[Vec]) -> NextBlockHash { + /// Content tag over block 0 in template order. Hashes txids, not + /// `TxIndex` slots, because slot assignment is per-cycle and + /// unstable; the txid set is the actual identity of "what's in the + /// next block". + fn hash_next_block(blocks: &[Vec], txs: &[SnapTx]) -> NextBlockHash { let Some(block) = blocks.first() else { return NextBlockHash::ZERO; }; - let mut hasher = DefaultHasher::new(); - block.hash(&mut hasher); + let mut hasher = FxHasher::default(); + for idx in block { + txs[idx.as_usize()].txid.hash(&mut hasher); + } NextBlockHash::new(hasher.finish()) } diff --git a/crates/brk_mempool/src/stores/addr_tracker/mod.rs b/crates/brk_mempool/src/stores/addr_tracker/mod.rs index c625615f3..645534d2e 100644 --- a/crates/brk_mempool/src/stores/addr_tracker/mod.rs +++ b/crates/brk_mempool/src/stores/addr_tracker/mod.rs @@ -1,11 +1,11 @@ use std::{ collections::hash_map::Entry as MapEntry, - hash::{DefaultHasher, Hash, Hasher}, + hash::{Hash, Hasher}, }; use brk_types::{AddrBytes, AddrMempoolStats, Transaction, TxOut, Txid}; use derive_more::Deref; -use rustc_hash::FxHashMap; +use rustc_hash::{FxHashMap, FxHasher}; mod addr_entry; @@ -52,7 +52,7 @@ impl AddrTracker { let Some(entry) = self.0.get(addr) else { return 0; }; - let mut hasher = DefaultHasher::new(); + let mut hasher = FxHasher::default(); entry.stats.hash(&mut hasher); hasher.finish() } diff --git a/crates/brk_mempool/src/stores/tx_graveyard/mod.rs b/crates/brk_mempool/src/stores/tx_graveyard/mod.rs index db420494c..3fa858b6e 100644 --- a/crates/brk_mempool/src/stores/tx_graveyard/mod.rs +++ b/crates/brk_mempool/src/stores/tx_graveyard/mod.rs @@ -44,7 +44,9 @@ impl TxGraveyard { } /// Walk forward through `Replaced { by }` to the terminal replacer. - /// Returns `txid` itself if it isn't replaced (live or `Vanished`). + /// Returns the first txid in the chain that isn't a `Replaced` + /// tombstone: live, `Vanished`, or unknown (chain broken because an + /// intermediate `by` aged out of the graveyard). pub fn replacement_root_of(&self, mut txid: Txid) -> Txid { while let Some(TxRemoval::Replaced { by }) = self.tombstones.get(&txid).map(|t| &t.removal) diff --git a/crates/brk_query/src/impl/mempool.rs b/crates/brk_query/src/impl/mempool.rs index b9e68fe42..2dca0ac55 100644 --- a/crates/brk_query/src/impl/mempool.rs +++ b/crates/brk_query/src/impl/mempool.rs @@ -4,8 +4,9 @@ use brk_mempool::{Mempool, PrevoutResolver, RbfForTx, RbfNode}; use brk_types::{ BlockTemplate, BlockTemplateDiff, CheckedSub, FeeRate, MempoolBlock, MempoolInfo, MempoolRecentTx, NextBlockHash, OutputType, RbfResponse, RbfTx, RecommendedFees, - ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix, TypeIndex, + ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix, TypeIndex, Vout, }; +use rustc_hash::FxHashMap; const RECENT_REPLACEMENTS_LIMIT: usize = 25; @@ -31,53 +32,46 @@ impl Query { Ok(mempool.block_stats().iter().map(MempoolBlock::from).collect()) } - /// Indexer-backed resolver for confirmed-parent prevouts. Pass - /// the returned closure to `Mempool::start_with` / - /// `Mempool::update_with`; the mempool driver calls it post-apply - /// for every still-unfilled `prevout == None` input. - /// - /// Reads go through `read_once` rather than a captured - /// `VecReader`: `VecReader::stored_len` is snapshotted at - /// construction, so a long-lived reader paired with fresh - /// `safe_lengths` would let `safe.tx_index` / `safe.txout_index` - /// advance past the reader's frozen length and panic in - /// `reader.get()`. `read_once` rebinds against the current vec - /// length per call and lets newly indexed parents become - /// resolvable on the next cycle. + /// Indexer-backed resolver for confirmed-parent prevouts. pub fn indexer_prevout_resolver(&self) -> PrevoutResolver { - let query = self.clone(); let indexer = self.0.indexer; - Box::new(move |prev_txid, vout| { - let safe = query.safe_lengths(); - let prev_tx_index = indexer - .stores - .txid_prefix_to_tx_index - .get(&TxidPrefix::from(prev_txid)) - .ok()?? - .into_owned(); - if prev_tx_index >= safe.tx_index { - return None; + Box::new(move |holes: &[(Txid, Vout)]| { + if holes.is_empty() { + return FxHashMap::default(); } - let first_txout: TxOutIndex = indexer - .vecs - .transactions - .first_txout_index - .read_once(prev_tx_index) - .ok()?; - let txout = first_txout + vout; - if txout >= safe.txout_index { - return None; - } - let output_type: OutputType = indexer.vecs.outputs.output_type.read_once(txout).ok()?; - let type_index: TypeIndex = indexer.vecs.outputs.type_index.read_once(txout).ok()?; - let value: Sats = indexer.vecs.outputs.value.read_once(txout).ok()?; - let script_pubkey = indexer - .vecs - .addrs - .addr_readers() - .script_pubkey(output_type, type_index); - Some(TxOut::from((script_pubkey, value))) + let safe = indexer.safe_lengths(); + let first_txout_reader = indexer.vecs.transactions.first_txout_index.reader(); + let output_type_reader = indexer.vecs.outputs.output_type.reader(); + let type_index_reader = indexer.vecs.outputs.type_index.reader(); + let value_reader = indexer.vecs.outputs.value.reader(); + let addr_readers = indexer.vecs.addrs.addr_readers(); + holes + .iter() + .filter_map(|(prev_txid, vout)| { + let prev_tx_index = indexer + .stores + .txid_prefix_to_tx_index + .get(&TxidPrefix::from(prev_txid)) + .ok()?? + .into_owned(); + if prev_tx_index >= safe.tx_index { + return None; + } + let first_txout: TxOutIndex = + first_txout_reader.try_get(usize::from(prev_tx_index))?; + let txout = first_txout + *vout; + if txout >= safe.txout_index { + return None; + } + let txout_idx = usize::from(txout); + let output_type: OutputType = output_type_reader.try_get(txout_idx)?; + let type_index: TypeIndex = type_index_reader.try_get(txout_idx)?; + let value: Sats = value_reader.try_get(txout_idx)?; + let script_pubkey = addr_readers.script_pubkey(output_type, type_index); + Some(((*prev_txid, *vout), TxOut::from((script_pubkey, value)))) + }) + .collect() }) } diff --git a/crates/brk_rpc/src/lib.rs b/crates/brk_rpc/src/lib.rs index fa947df4a..a59f5ef61 100644 --- a/crates/brk_rpc/src/lib.rs +++ b/crates/brk_rpc/src/lib.rs @@ -7,7 +7,7 @@ use std::{ use bitcoin::ScriptBuf; use brk_error::Result; -use brk_types::{BlockHash, Hex, Sats, Txid}; +use brk_types::{BlockHash, Sats, Txid}; mod client; mod methods; @@ -41,15 +41,6 @@ pub struct BlockTemplateTx { pub fee: Sats, } -/// A transaction fetched from Core alongside the exact hex bytes Core -/// returned, so downstream code can re-emit the raw tx without re- -/// serializing (which could diverge on segwit flag encoding, etc.). -#[derive(Debug, Clone)] -pub struct RawTx { - pub tx: bitcoin::Transaction, - pub hex: Hex, -} - #[derive(Clone, Debug)] pub enum Auth { None, diff --git a/crates/brk_rpc/src/methods.rs b/crates/brk_rpc/src/methods.rs index 55d6c2103..87d9b8f4a 100644 --- a/crates/brk_rpc/src/methods.rs +++ b/crates/brk_rpc/src/methods.rs @@ -24,7 +24,7 @@ use tracing::{debug, info}; /// The mempool fetcher tolerates these per-item failures silently. const RPC_NOT_FOUND: i32 = -5; -use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, RawTx, TxOutInfo}; +use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo}; /// Per-batch request count for `get_block_hashes_range`, /// `fetch_mempool_entries`, and `fetch_raw_transactions`. Sized so the @@ -94,8 +94,13 @@ fn build_gbt(raw: GbtResponseRaw) -> Vec { .collect() } +/// Convert bitcoind's `mempoolminfee` (BTC/kvB f64) to sat/vB. Round-trip +/// via integer sat/kvB (bitcoind's native CFeeRate unit) so the f64 drift +/// in the JSON-decoded value can't push 1.0 sat/vB to 1.0...e-13 above 1.0 +/// and trip `ceil_to(0.001)` downstream. fn build_min_fee(raw: GetMempoolInfo) -> FeeRate { - FeeRate::from(raw.mempool_min_fee * 100_000.0) + let sat_per_kvb = (raw.mempool_min_fee * 100_000_000.0).round() as u64; + FeeRate::from(sat_per_kvb as f64 / 1000.0) } impl Client { @@ -247,55 +252,70 @@ impl Client { .collect() } - pub fn get_raw_transaction<'a, T, H>( + pub fn get_raw_transaction<'a, T>(&self, txid: &'a T) -> Result + where + &'a T: Into<&'a bitcoin::Txid>, + { + let hex = self.get_raw_transaction_hex(txid)?; + Ok(encode::deserialize_hex::(&hex)?) + } + + pub fn get_raw_transaction_from<'a, T, H>( &self, txid: &'a T, - block_hash: Option<&'a H>, + block_hash: &'a H, ) -> Result where &'a T: Into<&'a bitcoin::Txid>, &'a H: Into<&'a bitcoin::BlockHash>, { - let hex = self.get_raw_transaction_hex(txid, block_hash)?; - let tx = encode::deserialize_hex::(&hex)?; - Ok(tx) + let hex = self.get_raw_transaction_hex_from(txid, block_hash)?; + Ok(encode::deserialize_hex::(&hex)?) } - pub fn get_raw_transaction_hex<'a, T, H>( + pub fn get_raw_transaction_hex<'a, T>(&self, txid: &'a T) -> Result + where + &'a T: Into<&'a bitcoin::Txid>, + { + let txid: &bitcoin::Txid = txid.into(); + let args = [serde_json::to_value(txid)?, Value::Bool(false)]; + self.0.call_with_retry("getrawtransaction", &args) + } + + pub fn get_raw_transaction_hex_from<'a, T, H>( &self, txid: &'a T, - block_hash: Option<&'a H>, + block_hash: &'a H, ) -> Result where &'a T: Into<&'a bitcoin::Txid>, &'a H: Into<&'a bitcoin::BlockHash>, { let txid: &bitcoin::Txid = txid.into(); - let mut args: Vec = vec![serde_json::to_value(txid)?, Value::Bool(false)]; - if let Some(bh) = block_hash { - let bh: &bitcoin::BlockHash = bh.into(); - args.push(serde_json::to_value(bh)?); - } + let bh: &bitcoin::BlockHash = block_hash.into(); + let args = [ + serde_json::to_value(txid)?, + Value::Bool(false), + serde_json::to_value(bh)?, + ]; self.0.call_with_retry("getrawtransaction", &args) } - pub fn get_mempool_raw_tx(&self, txid: &Txid) -> Result { - let hex = self.get_raw_transaction_hex(txid, None as Option<&BlockHash>)?; - let tx = encode::deserialize_hex::(&hex)?; - Ok(RawTx { - tx, - hex: hex.into(), - }) + pub fn get_mempool_raw_tx(&self, txid: &Txid) -> Result { + self.get_raw_transaction(txid) } /// Batched `getrawtransaction` over a slice of txids. Returns a map keyed - /// by txid containing the deserialized tx and its raw hex. Individual - /// failures (e.g. a tx that evicted between the listing and this call) - /// are logged and dropped so a single bad entry doesn't kill the batch. + /// by txid containing the deserialized tx. Individual failures (e.g. a + /// tx that evicted between the listing and this call) are logged and + /// dropped so a single bad entry doesn't kill the batch. /// /// Chunked at `BATCH_CHUNK` requests per round-trip. - pub fn get_raw_transactions(&self, txids: &[Txid]) -> Result> { - let mut out: FxHashMap = + pub fn get_raw_transactions( + &self, + txids: &[Txid], + ) -> Result> { + let mut out: FxHashMap = FxHashMap::with_capacity_and_hasher(txids.len(), Default::default()); for chunk in txids.chunks(BATCH_CHUNK) { @@ -311,14 +331,10 @@ impl Client { for (txid, res) in chunk.iter().zip(results) { match res.and_then(|hex| { - let tx = encode::deserialize_hex::(&hex)?; - Ok::<_, Error>(RawTx { - tx, - hex: hex.into(), - }) + Ok(encode::deserialize_hex::(&hex)?) }) { - Ok(raw) => { - out.insert(*txid, raw); + Ok(tx) => { + out.insert(*txid, tx); } Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {} Err(e) => {