mempool: fixes

This commit is contained in:
nym21
2026-05-10 16:23:06 +02:00
parent 774580ee11
commit dd6eca138b
10 changed files with 227 additions and 134 deletions

View File

@@ -3,7 +3,6 @@
use std::{ use std::{
fs, fs,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc,
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -13,7 +12,6 @@ use brk_reader::{Reader, XORBytes};
use brk_rpc::Client; use brk_rpc::Client;
use brk_types::{BlockHash, Height}; use brk_types::{BlockHash, Height};
use fjall::PersistMode; use fjall::PersistMode;
use parking_lot::RwLock;
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use vecdb::{ use vecdb::{
Exit, RawDBError, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode, WritableVec, unlikely, Exit, RawDBError, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode, WritableVec, unlikely,
@@ -39,13 +37,25 @@ pub struct Indexer<M: StorageMode = Rw> {
path: PathBuf, path: PathBuf,
pub vecs: Vecs<M>, pub vecs: Vecs<M>,
pub stores: Stores, pub stores: Stores,
tip_blockhash: Arc<RwLock<BlockHash>>,
safe_lengths: SafeLengths, safe_lengths: SafeLengths,
} }
impl<M: StorageMode> Indexer<M> { impl<M: StorageMode> Indexer<M> {
/// Tip block hash at the pipeline-safe ceiling.
///
/// Reads the on-disk blockhash vec at `safe_lengths.height - 1` so
/// the answer always agrees with `safe_lengths`. The indexer's loop
/// pushes new hashes per block before `safe_lengths` advances (that
/// only happens after the compute pass via
/// [`Indexer::advance_safe_lengths`]); reading from a live cache
/// here would mint a tip ahead of every safe-bound endpoint and
/// cause cache etags to invalidate before the data they cover is
/// actually queryable.
pub fn tip_blockhash(&self) -> BlockHash { pub fn tip_blockhash(&self) -> BlockHash {
*self.tip_blockhash.read() match self.safe_lengths().height.decremented() {
Some(h) => self.vecs.blocks.blockhash.collect_one(h).unwrap_or_default(),
None => BlockHash::default(),
}
} }
/// Pipeline-safe `Lengths` snapshot shared with `Query`. Writers /// Pipeline-safe `Lengths` snapshot shared with `Query`. Writers
@@ -83,8 +93,6 @@ impl Indexer {
let stores = Stores::forced_import(&indexed_path, VERSION)?; let stores = Stores::forced_import(&indexed_path, VERSION)?;
info!("Imported stores in {:?}", i.elapsed()); info!("Imported stores in {:?}", i.elapsed());
let tip_blockhash = vecs.blocks.blockhash.collect_last().unwrap_or_default();
let safe_lengths = SafeLengths::new(); let safe_lengths = SafeLengths::new();
if let Some(lengths) = Lengths::from_local(&vecs, &stores) { if let Some(lengths) = Lengths::from_local(&vecs, &stores) {
safe_lengths.advance(lengths); safe_lengths.advance(lengths);
@@ -94,7 +102,6 @@ impl Indexer {
path: indexed_path.clone(), path: indexed_path.clone(),
vecs, vecs,
stores, stores,
tip_blockhash: Arc::new(RwLock::new(tip_blockhash)),
safe_lengths, safe_lengths,
}) })
}; };
@@ -122,7 +129,6 @@ impl Indexer {
fn full_reset(&mut self) -> Result<()> { fn full_reset(&mut self) -> Result<()> {
info!("Full reset..."); info!("Full reset...");
self.safe_lengths.reset(); self.safe_lengths.reset();
*self.tip_blockhash.write() = BlockHash::default();
self.vecs.reset()?; self.vecs.reset()?;
let stores_path = self.path.join("stores"); let stores_path = self.path.join("stores");
fs::remove_dir_all(&stores_path).ok(); fs::remove_dir_all(&stores_path).ok();
@@ -188,9 +194,6 @@ impl Indexer {
debug!("Rollback stores done."); debug!("Rollback stores done.");
self.vecs.rollback_if_needed(&starting_lengths)?; self.vecs.rollback_if_needed(&starting_lengths)?;
debug!("Rollback vecs done."); debug!("Rollback vecs done.");
if let Some(hash) = prev_hash.as_ref() {
*self.tip_blockhash.write() = *hash;
}
drop(lock); drop(lock);
let mut lengths = starting_lengths; let mut lengths = starting_lengths;
@@ -312,8 +315,6 @@ impl Indexer {
export(stores, vecs, height)?; export(stores, vecs, height)?;
readers = Readers::new(vecs); readers = Readers::new(vecs);
} }
*self.tip_blockhash.write() = block.block_hash().into();
} }
drop(readers); drop(readers);
@@ -388,7 +389,6 @@ impl ReadOnlyClone for Indexer {
path: self.path.clone(), path: self.path.clone(),
vecs: self.vecs.read_only_clone(), vecs: self.vecs.read_only_clone(),
stores: self.stores.clone(), stores: self.stores.clone(),
tip_blockhash: self.tip_blockhash.clone(),
safe_lengths: self.safe_lengths.clone(), safe_lengths: self.safe_lengths.clone(),
} }
} }

View File

@@ -4,9 +4,10 @@
//! //!
//! 1. [`steps::fetcher::Fetcher`] - one mixed batched RPC for //! 1. [`steps::fetcher::Fetcher`] - one mixed batched RPC for
//! `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`, //! `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`,
//! then a `getmempoolentry` batch and a `getrawtransaction` batch //! then a single mixed `getmempoolentry`+`getrawtransaction` batch
//! on new txids only. The GBT is validated to be a subset of the //! on new txids only. GBT-only txs are synthesized inline from the
//! txid listing; on mismatch the cycle is skipped. //! GBT payload so block 0 matches Core's selection exactly without
//! a follow-up entry fetch that could race the listing.
//! 2. [`steps::preparer::Preparer`] - decode and classify into //! 2. [`steps::preparer::Preparer`] - decode and classify into
//! `TxsPulled { added, removed }`. Pure CPU. //! `TxsPulled { added, removed }`. Pure CPU.
//! 3. [`steps::applier::Applier`] - apply the diff to //! 3. [`steps::applier::Applier`] - apply the diff to
@@ -345,20 +346,17 @@ impl Mempool {
.. ..
} = &*self.0; } = &*self.0;
let Some(Fetched { let Fetched {
live_txids, live_txids,
new_entries, new_entries,
new_txs, new_txs,
gbt, gbt_txids,
min_fee, min_fee,
}) = Fetcher::fetch(client, state)? } = Fetcher::fetch(client, state)?;
else {
return Ok(());
};
let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state); let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state);
let changed = Applier::apply(state, rebuilder, pulled); let changed = Applier::apply(state, rebuilder, pulled);
Prevouts::fill(state, resolver); Prevouts::fill(state, resolver);
rebuilder.tick(state, changed, &gbt, min_fee); rebuilder.tick(state, changed, &gbt_txids, min_fee);
Ok(()) Ok(())
} }

View File

@@ -1,4 +1,3 @@
use brk_rpc::BlockTemplateTx;
use brk_types::{FeeRate, MempoolEntryInfo, Txid}; use brk_types::{FeeRate, MempoolEntryInfo, Txid};
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
@@ -10,6 +9,10 @@ pub struct Fetched {
/// keep their first-sight entry on the live store). /// keep their first-sight entry on the live store).
pub new_entries: Vec<MempoolEntryInfo>, pub new_entries: Vec<MempoolEntryInfo>,
pub new_txs: FxHashMap<Txid, bitcoin::Transaction>, pub new_txs: FxHashMap<Txid, bitcoin::Transaction>,
pub gbt: Vec<BlockTemplateTx>, /// Block 0 ordering from `getblocktemplate`. Bodies and stats have
/// already been folded into `new_entries`/`new_txs` (or were already
/// in the pool); the Rebuilder only needs the txid sequence to
/// project Core's exact selection.
pub gbt_txids: Vec<Txid>,
pub min_fee: FeeRate, pub min_fee: FeeRate,
} }

View File

@@ -4,64 +4,106 @@ pub use fetched::Fetched;
use brk_error::Result; use brk_error::Result;
use brk_rpc::{Client, MempoolState}; use brk_rpc::{Client, MempoolState};
use brk_types::Txid; use brk_types::{MempoolEntryInfo, Timestamp, Txid, VSize};
use parking_lot::RwLock; use parking_lot::RwLock;
use rustc_hash::FxHashSet;
use crate::{State, stores::TxStore}; use crate::State;
/// Cap before the batch RPC so we never hand bitcoind an unbounded batch. /// Cap before the batch RPC so we never hand bitcoind an unbounded batch.
/// GBT-synthesized entries are not subject to this cap; they're bounded
/// by the block weight limit Core enforces on its own template.
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
/// Three batched round-trips per cycle, scaling with churn rather than /// Two batched round-trips per cycle, scaling with churn rather than
/// mempool size: `getblocktemplate` + `getrawmempool false` + /// mempool size: `getblocktemplate` + `getrawmempool false` +
/// `getmempoolinfo` in one mixed batch; then `getmempoolentry` and /// `getmempoolinfo` in one mixed batch; then `getmempoolentry` +
/// `getrawtransaction` per *new* txid only. /// `getrawtransaction` for *new* non-GBT txids in a second mixed batch.
/// ///
/// `getblocktemplate` is validated to be a subset of the txid listing /// GBT entries already carry the full tx body and stats, so any GBT tx
/// inside the RPC layer; mismatches return `Ok(None)` so the cycle is /// not yet in the local pool is materialized inline from the GBT
/// skipped without polluting downstream state. /// payload instead of being refetched. That removes the GBT/listing
/// race that used to skip cycles when a tx vanished from the mempool
/// between the GBT and `getrawmempool` calls: block 0 always reflects
/// Core's exact selection because we never ask for that data twice.
/// ///
/// Confirmed prevouts are resolved post-apply by the caller-supplied /// Confirmed prevouts are resolved post-apply by the caller-supplied
/// resolver passed to `Mempool::update_with`, so the in-crate path no /// resolver passed to `Mempool::update_with`, so the in-crate path no
/// longer issues a fourth batch for parents. /// longer issues a third batch for parents.
pub struct Fetcher; pub struct Fetcher;
impl Fetcher { impl Fetcher {
pub fn fetch(client: &Client, lock: &RwLock<State>) -> Result<Option<Fetched>> { pub fn fetch(client: &Client, lock: &RwLock<State>) -> Result<Fetched> {
let Some(MempoolState { let MempoolState {
live_txids, live_txids,
gbt, gbt,
min_fee, min_fee,
}) = client.fetch_mempool_state()? } = client.fetch_mempool_state()?;
else {
return Ok(None); // One read snapshot decides both the RPC fetch list and the
}; // GBT-synthesis set, so they agree on what's "already known".
let new_txids = { // Graveyard txs are treated as known so a re-broadcast still
// flows through `Preparer::classify_addition` and lands as
// [`crate::TxAddition::Revived`].
let (new_txids, gbt_synth_set) = {
let state = lock.read(); let state = lock.read();
Self::new_txids(&live_txids, &state.txs) let mut gbt_txids: FxHashSet<Txid> =
FxHashSet::with_capacity_and_hasher(gbt.len(), Default::default());
let mut gbt_synth_set: FxHashSet<Txid> = FxHashSet::default();
for g in &gbt {
gbt_txids.insert(g.txid);
if !state.txs.contains(&g.txid) {
gbt_synth_set.insert(g.txid);
}
}
let new_txids: Vec<Txid> = live_txids
.iter()
.filter(|t| !state.txs.contains(t) && !gbt_txids.contains(t))
.take(MAX_TX_FETCHES_PER_CYCLE)
.copied()
.collect();
(new_txids, gbt_synth_set)
}; };
let new_entries = client.fetch_mempool_entries(&new_txids)?;
let new_txs = client.get_raw_transactions(&new_txids)?; let (mut new_entries, mut new_txs) = client.fetch_new_pool_data(&new_txids)?;
Ok(Some(Fetched { new_entries.reserve(gbt_synth_set.len());
new_txs.reserve(gbt_synth_set.len());
// Consume `gbt` by value: GBT-only txs move their body and
// depends into the synthesis path (no clones), and the GBT
// ordering is captured as a `Vec<Txid>` for the Rebuilder, which
// is the only downstream consumer and only reads txids.
//
// GBT carries no per-tx arrival timestamp. `now` is correct to
// within ~1 cycle for a tx that just entered Core's mempool
// (the only kind that triggers synthesis: not in our pool yet
// means it just appeared this cycle).
let now = Timestamp::now();
let gbt_txids: Vec<Txid> = gbt
.into_iter()
.map(|g| {
let txid = g.txid;
if gbt_synth_set.contains(&txid) {
new_entries.push(MempoolEntryInfo {
txid,
vsize: VSize::from(g.weight),
weight: g.weight,
fee: g.fee,
first_seen: now,
depends: g.depends,
});
new_txs.insert(txid, g.tx);
}
txid
})
.collect();
Ok(Fetched {
live_txids, live_txids,
new_entries, new_entries,
new_txs, new_txs,
gbt, gbt_txids,
min_fee, min_fee,
})) })
}
/// Live txids the local store hasn't seen yet. Graveyard txs are
/// included so a re-broadcast (post-reorg or a peer republishing)
/// flows through `Preparer::classify_addition` and lands as
/// [`crate::TxAddition::Revived`] instead of sitting orphaned for
/// the full graveyard retention.
fn new_txids(live_txids: &[Txid], known: &TxStore) -> Vec<Txid> {
live_txids
.iter()
.filter(|txid| !known.contains(txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.copied()
.collect()
} }
} }

View File

@@ -6,7 +6,6 @@ use std::{
}, },
}; };
use brk_rpc::BlockTemplateTx;
use brk_types::{FeeRate, NextBlockHash, Txid, TxidPrefix}; use brk_types::{FeeRate, NextBlockHash, Txid, TxidPrefix};
use parking_lot::RwLock; use parking_lot::RwLock;
use rustc_hash::FxHashSet; use rustc_hash::FxHashSet;
@@ -45,7 +44,7 @@ impl Rebuilder {
&self, &self,
lock: &RwLock<State>, lock: &RwLock<State>,
changed: bool, changed: bool,
gbt: &[BlockTemplateTx], gbt_txids: &[Txid],
min_fee: FeeRate, min_fee: FeeRate,
) { ) {
if changed { if changed {
@@ -55,7 +54,7 @@ impl Rebuilder {
self.skip_clean.fetch_add(1, Ordering::Relaxed); self.skip_clean.fetch_add(1, Ordering::Relaxed);
return; return;
} }
let snap = Self::build_snapshot(lock, gbt, min_fee); let snap = Self::build_snapshot(lock, gbt_txids, min_fee);
let block0_set: FxHashSet<Txid> = snap.block0_txids().collect(); let block0_set: FxHashSet<Txid> = snap.block0_txids().collect();
let next_hash = snap.next_block_hash; let next_hash = snap.next_block_hash;
*self.snapshot.write() = Arc::new(snap); *self.snapshot.write() = Arc::new(snap);
@@ -93,7 +92,7 @@ impl Rebuilder {
fn build_snapshot( fn build_snapshot(
lock: &RwLock<State>, lock: &RwLock<State>,
gbt: &[BlockTemplateTx], gbt_txids: &[Txid],
min_fee: FeeRate, min_fee: FeeRate,
) -> Snapshot { ) -> Snapshot {
let (txs, prefix_to_idx) = { let (txs, prefix_to_idx) = {
@@ -102,12 +101,15 @@ impl Rebuilder {
}; };
// Block 0 from `getblocktemplate`: Core's actual selection. // Block 0 from `getblocktemplate`: Core's actual selection.
// Fetcher already validated GBT ⊆ live txid listing, so any // The Fetcher synthesizes pool entries for GBT txs that aren't
// drop here is a same-cycle race and the partitioner picks up // already present (using GBT's inline body + stats), so this
// the slack so callers always see NUM_BLOCKS blocks. // lookup always resolves and block 0 matches Core exactly.
let block0: Vec<TxIndex> = gbt // The `filter_map` only drops if a tx was concurrently evicted
// from `txs` between `build_txs` and the rebuild, which the
// partitioner backfills so callers still see `NUM_BLOCKS`.
let block0: Vec<TxIndex> = gbt_txids
.iter() .iter()
.filter_map(|t| prefix_to_idx.get(&TxidPrefix::from(&t.txid)).copied()) .filter_map(|txid| prefix_to_idx.get(&TxidPrefix::from(txid)).copied())
.collect(); .collect();
let excluded: FxHashSet<TxIndex> = block0.iter().copied().collect(); let excluded: FxHashSet<TxIndex> = block0.iter().copied().collect();
let rest = Partitioner::partition(&txs, &excluded, NUM_BLOCKS.saturating_sub(1)); let rest = Partitioner::partition(&txs, &excluded, NUM_BLOCKS.saturating_sub(1));

View File

@@ -499,10 +499,11 @@ impl Query {
// === Helper methods === // === Helper methods ===
/// Hash to height. The prefix store keys on the first 8 bytes of /// Hash to height, clamped to the safe-lengths snapshot. The prefix
/// the hash, so the resolved height is verified against the full /// store keys on the first 8 bytes of the hash, so the resolved
/// `blockhash[height]` before being returned. Prefix collisions /// height is verified against the full `blockhash[height]` before
/// (or unknown hashes) surface as `NotFound`. /// being returned. Prefix collisions, unknown hashes, and hashes
/// past the snapshot all surface as `NotFound`.
pub fn height_by_hash(&self, hash: &BlockHash) -> Result<Height> { pub fn height_by_hash(&self, hash: &BlockHash) -> Result<Height> {
let indexer = self.indexer(); let indexer = self.indexer();
let prefix = BlockHashPrefix::from(hash); let prefix = BlockHashPrefix::from(hash);
@@ -512,6 +513,9 @@ impl Query {
.get(&prefix)? .get(&prefix)?
.map(|h| *h) .map(|h| *h)
.ok_or(Error::NotFound("Block not found".into()))?; .ok_or(Error::NotFound("Block not found".into()))?;
if height >= self.safe_lengths().height {
return Err(Error::NotFound("Block not found".into()));
}
match indexer.vecs.blocks.blockhash.get(height) { match indexer.vecs.blocks.blockhash.get(height) {
Some(stored) if &stored == hash => Ok(height), Some(stored) if &stored == hash => Ok(height),
_ => Err(Error::NotFound("Block not found".into())), _ => Err(Error::NotFound("Block not found".into())),

View File

@@ -184,8 +184,8 @@ impl Query {
} }
// Snapshot tip-derived state together so the historical-branch ETag stays // Snapshot tip-derived state together so the historical-branch ETag stays
// self-consistent: stable_count is computed from tip_height, hash_prefix // self-consistent: tip_height and hash_prefix both reflect the safe-bound
// is the live tip. // tip, and stable_count is computed from tip_height.
let tip_height = self.height(); let tip_height = self.height();
let hash_prefix = self.tip_hash_prefix(); let hash_prefix = self.tip_hash_prefix();
let stable_count = self.stable_count(params.index, total, tip_height); let stable_count = self.stable_count(params.index, total, tip_height);

View File

@@ -77,7 +77,7 @@ impl Query {
self.indexer().safe_lengths() self.indexer().safe_lengths()
} }
/// Tip block hash, cached in the indexer. /// Tip block hash at the pipeline-safe ceiling.
#[inline] #[inline]
pub fn tip_blockhash(&self) -> BlockHash { pub fn tip_blockhash(&self) -> BlockHash {
self.indexer().tip_blockhash() self.indexer().tip_blockhash()

View File

@@ -7,7 +7,7 @@ use std::{
use bitcoin::ScriptBuf; use bitcoin::ScriptBuf;
use brk_error::Result; use brk_error::Result;
use brk_types::{BlockHash, Sats, Txid}; use brk_types::{BlockHash, Sats, Txid, Weight};
mod client; mod client;
mod methods; mod methods;
@@ -35,10 +35,19 @@ pub struct TxOutInfo {
pub script_pub_key: ScriptBuf, pub script_pub_key: ScriptBuf,
} }
/// One transaction from `getblocktemplate`. Carries the full decoded
/// body and stats so block 0 can be projected without a follow-up
/// `getmempoolentry`/`getrawtransaction` per tx; that follow-up was the
/// source of the GBT/listing race that used to skip cycles.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BlockTemplateTx { pub struct BlockTemplateTx {
pub txid: Txid, pub txid: Txid,
pub fee: Sats, pub fee: Sats,
pub weight: Weight,
/// Parent txids also in this template (Core's own ancestor
/// accounting, resolved from the wire-level 1-based indices).
pub depends: Vec<Txid>,
pub tx: bitcoin::Transaction,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@@ -27,16 +27,18 @@ const RPC_NOT_FOUND: i32 = -5;
use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo}; use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo};
/// Per-batch request count for `get_block_hashes_range`, /// Per-batch request count for `get_block_hashes_range`,
/// `fetch_mempool_entries`, and `fetch_raw_transactions`. Sized so the /// `fetch_new_pool_data`, and `get_raw_transactions`. Sized so the JSON
/// JSON request body stays well under a megabyte and bitcoind doesn't /// request body stays well under a megabyte and bitcoind doesn't spend
/// spend too long on a single batch before yielding results. /// too long on a single batch before yielding results. For the mixed
/// `getmempoolentry`+`getrawtransaction` batch this is the *txid* count;
/// the wire batch is twice that.
const BATCH_CHUNK: usize = 2000; const BATCH_CHUNK: usize = 2000;
/// Live mempool state fetched in one batched bitcoind round-trip: /// Live mempool state fetched in one batched bitcoind round-trip:
/// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`. /// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`. Each
/// `gbt` is validated to be a subset of `live_txids` before /// `gbt` entry carries the full decoded tx and stats so block 0 is
/// construction; on mismatch the cycle is skipped (`Ok(None)`) so we /// projected directly from Core's selection without a follow-up entry
/// never publish a block 0 missing txids Core would actually mine. /// fetch that could race the eviction of one of those txs.
pub struct MempoolState { pub struct MempoolState {
pub live_txids: Vec<Txid>, pub live_txids: Vec<Txid>,
pub gbt: Vec<BlockTemplateTx>, pub gbt: Vec<BlockTemplateTx>,
@@ -64,8 +66,11 @@ struct GbtResponseRaw {
#[derive(Deserialize)] #[derive(Deserialize)]
struct GbtTxRaw { struct GbtTxRaw {
data: String,
txid: bitcoin::Txid, txid: bitcoin::Txid,
fee: u64, fee: u64,
weight: u64,
depends: Vec<u32>,
} }
fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result<MempoolEntryInfo> { fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result<MempoolEntryInfo> {
@@ -84,14 +89,32 @@ fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result<MempoolEntryInfo> {
}) })
} }
fn build_gbt(raw: GbtResponseRaw) -> Vec<BlockTemplateTx> { fn build_gbt(raw: GbtResponseRaw) -> Result<Vec<BlockTemplateTx>> {
raw.transactions // Pass 1: decode bodies and stash the 1-based GBT-array indices
.into_iter() // aside so we can drop each `data` hex string and `GbtTxRaw` as
.map(|t| BlockTemplateTx { // soon as the tx is pushed.
let n = raw.transactions.len();
let mut depends_idx: Vec<Vec<u32>> = Vec::with_capacity(n);
let mut result: Vec<BlockTemplateTx> = Vec::with_capacity(n);
for t in raw.transactions {
depends_idx.push(t.depends);
result.push(BlockTemplateTx {
txid: Txid::from(t.txid), txid: Txid::from(t.txid),
fee: Sats::from(t.fee), fee: Sats::from(t.fee),
}) weight: Weight::from(t.weight),
.collect() depends: Vec::new(),
tx: encode::deserialize_hex(&t.data)?,
});
}
// Pass 2: resolve indices to txids now that the array is complete.
for (i, indices) in depends_idx.iter().enumerate() {
let resolved: Vec<Txid> = indices
.iter()
.filter_map(|d| result.get((*d as usize).checked_sub(1)?).map(|t| t.txid))
.collect();
result[i].depends = resolved;
}
Ok(result)
} }
/// Convert bitcoind's `mempoolminfee` (BTC/kvB f64) to sat/vB. Round-trip /// Convert bitcoind's `mempoolminfee` (BTC/kvB f64) to sat/vB. Round-trip
@@ -367,15 +390,11 @@ impl Client {
} }
/// Core's projected next block + live mempool txid set + /// Core's projected next block + live mempool txid set +
/// `mempoolminfee`, fetched in a single bitcoind round-trip. /// `mempoolminfee`, fetched in a single bitcoind round-trip. GBT
/// `getblocktemplate` runs first so any tx arriving between the /// carries each tx's full body and stats, so block 0 is exact even
/// intra-batch calls lands in the txid listing only, preserving /// when a tx vanishes from the mempool listing between the GBT and
/// GBT ⊆ txids for the common race. Validates that every GBT txid /// `getrawmempool` calls; no follow-up entry fetch can race it.
/// is present in the txid listing and returns `Ok(None)` on pub fn fetch_mempool_state(&self) -> Result<MempoolState> {
/// mismatch so the caller can skip the cycle: republishing block 0
/// with missing txids would diverge from Core's exact selection.
/// Other failures bubble up as `Err`.
pub fn fetch_mempool_state(&self) -> Result<Option<MempoolState>> {
let requests: [(&str, Vec<Value>); 3] = [ let requests: [(&str, Vec<Value>); 3] = [
( (
"getblocktemplate", "getblocktemplate",
@@ -394,56 +413,72 @@ impl Client {
.iter() .iter()
.map(|s| Self::parse_txid(s, "mempool txid")) .map(|s| Self::parse_txid(s, "mempool txid"))
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?); let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?)?;
let min_fee = build_min_fee(serde_json::from_str(info_raw.get())?); let min_fee = build_min_fee(serde_json::from_str(info_raw.get())?);
let live_set: rustc_hash::FxHashSet<Txid> = live_txids.iter().copied().collect(); Ok(MempoolState {
let missing = gbt.iter().filter(|t| !live_set.contains(&t.txid)).count();
if missing > 0 {
#[cfg(debug_assertions)]
tracing::warn!(
missing,
gbt_total = gbt.len(),
"getblocktemplate has {missing} txids not in mempool listing; skipping cycle"
);
return Ok(None);
}
Ok(Some(MempoolState {
live_txids, live_txids,
gbt, gbt,
min_fee, min_fee,
})) })
} }
/// Batched `getmempoolentry` for the given txids. Returns /// Mixed batch of `getmempoolentry` + `getrawtransaction` for the
/// `MempoolEntryInfo` per successful lookup. Per-item -5 (NOT_FOUND /// same txid set in one round-trip. Returns the entries vec and the
/// — tx evicted/replaced between the txid listing and this call) /// raw-tx map keyed by txid. Per-item -5 (NOT_FOUND — tx evicted
/// drops silently; transport-level failures still propagate. /// between the listing and this call) drops silently for either leg;
/// Chunked at `BATCH_CHUNK` requests per round-trip. /// transport-level failures still propagate. Chunked at `BATCH_CHUNK`
pub fn fetch_mempool_entries(&self, txids: &[Txid]) -> Result<Vec<MempoolEntryInfo>> { /// txids per round-trip (2× that on the wire).
let mut out: Vec<MempoolEntryInfo> = Vec::with_capacity(txids.len()); pub fn fetch_new_pool_data(
&self,
txids: &[Txid],
) -> Result<(Vec<MempoolEntryInfo>, FxHashMap<Txid, bitcoin::Transaction>)> {
let mut entries: Vec<MempoolEntryInfo> = Vec::with_capacity(txids.len());
let mut txs: FxHashMap<Txid, bitcoin::Transaction> =
FxHashMap::with_capacity_and_hasher(txids.len(), Default::default());
for chunk in txids.chunks(BATCH_CHUNK) { for chunk in txids.chunks(BATCH_CHUNK) {
let args = chunk.iter().map(|t| { let mut requests: Vec<(&str, Vec<Value>)> = Vec::with_capacity(chunk.len() * 2);
let bt: &bitcoin::Txid = t.into(); for txid in chunk {
vec![serde_json::to_value(bt).unwrap_or(Value::Null)] let bt: &bitcoin::Txid = txid.into();
}); let tv = serde_json::to_value(bt).unwrap_or(Value::Null);
let results: Vec<Result<MempoolEntryRaw>> = requests.push(("getmempoolentry", vec![tv.clone()]));
self.0.call_batch_per_item("getmempoolentry", args)?; requests.push(("getrawtransaction", vec![tv, Value::Bool(false)]));
}
for (txid, res) in chunk.iter().zip(results) { let results = self.0.call_mixed_batch(&requests)?;
match res { let mut iter = results.into_iter();
Ok(raw) => out.push(build_entry(*txid, raw)?), for txid in chunk {
let entry_res = iter.next().ok_or(Error::Internal("missing entry"))?;
let raw_res = iter.next().ok_or(Error::Internal("missing raw"))?;
match entry_res.and_then(|raw| {
let me: MempoolEntryRaw = serde_json::from_str(raw.get())?;
build_entry(*txid, me)
}) {
Ok(info) => entries.push(info),
Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {} Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {}
Err(e) => { Err(e) => {
debug!(txid = %txid, error = %e, "getmempoolentry batch: item failed") debug!(txid = %txid, error = %e, "getmempoolentry mixed batch: item failed")
}
}
match raw_res.and_then(|raw| {
let hex: String = serde_json::from_str(raw.get())?;
Ok(encode::deserialize_hex::<bitcoin::Transaction>(&hex)?)
}) {
Ok(tx) => {
txs.insert(*txid, tx);
}
Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {}
Err(e) => {
debug!(txid = %txid, error = %e, "getrawtransaction mixed batch: item failed")
} }
} }
} }
} }
Ok(out) Ok((entries, txs))
} }
pub fn get_closest_valid_height(&self, hash: BlockHash) -> Result<(Height, BlockHash)> { pub fn get_closest_valid_height(&self, hash: BlockHash) -> Result<(Height, BlockHash)> {