diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index e4690f72e..73a689fac 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -3,7 +3,6 @@ use std::{ fs, path::{Path, PathBuf}, - sync::Arc, thread, time::{Duration, Instant}, }; @@ -13,7 +12,6 @@ use brk_reader::{Reader, XORBytes}; use brk_rpc::Client; use brk_types::{BlockHash, Height}; use fjall::PersistMode; -use parking_lot::RwLock; use tracing::{debug, error, info}; use vecdb::{ Exit, RawDBError, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode, WritableVec, unlikely, @@ -39,13 +37,25 @@ pub struct Indexer { path: PathBuf, pub vecs: Vecs, pub stores: Stores, - tip_blockhash: Arc>, safe_lengths: SafeLengths, } impl Indexer { + /// Tip block hash at the pipeline-safe ceiling. + /// + /// Reads the on-disk blockhash vec at `safe_lengths.height - 1` so + /// the answer always agrees with `safe_lengths`. The indexer's loop + /// pushes new hashes per block before `safe_lengths` advances (that + /// only happens after the compute pass via + /// [`Indexer::advance_safe_lengths`]); reading from a live cache + /// here would mint a tip ahead of every safe-bound endpoint and + /// cause cache etags to invalidate before the data they cover is + /// actually queryable. pub fn tip_blockhash(&self) -> BlockHash { - *self.tip_blockhash.read() + match self.safe_lengths().height.decremented() { + Some(h) => self.vecs.blocks.blockhash.collect_one(h).unwrap_or_default(), + None => BlockHash::default(), + } } /// Pipeline-safe `Lengths` snapshot shared with `Query`. Writers @@ -83,8 +93,6 @@ impl Indexer { let stores = Stores::forced_import(&indexed_path, VERSION)?; info!("Imported stores in {:?}", i.elapsed()); - let tip_blockhash = vecs.blocks.blockhash.collect_last().unwrap_or_default(); - let safe_lengths = SafeLengths::new(); if let Some(lengths) = Lengths::from_local(&vecs, &stores) { safe_lengths.advance(lengths); @@ -94,7 +102,6 @@ impl Indexer { path: indexed_path.clone(), vecs, stores, - tip_blockhash: Arc::new(RwLock::new(tip_blockhash)), safe_lengths, }) }; @@ -122,7 +129,6 @@ impl Indexer { fn full_reset(&mut self) -> Result<()> { info!("Full reset..."); self.safe_lengths.reset(); - *self.tip_blockhash.write() = BlockHash::default(); self.vecs.reset()?; let stores_path = self.path.join("stores"); fs::remove_dir_all(&stores_path).ok(); @@ -188,9 +194,6 @@ impl Indexer { debug!("Rollback stores done."); self.vecs.rollback_if_needed(&starting_lengths)?; debug!("Rollback vecs done."); - if let Some(hash) = prev_hash.as_ref() { - *self.tip_blockhash.write() = *hash; - } drop(lock); let mut lengths = starting_lengths; @@ -312,8 +315,6 @@ impl Indexer { export(stores, vecs, height)?; readers = Readers::new(vecs); } - - *self.tip_blockhash.write() = block.block_hash().into(); } drop(readers); @@ -388,7 +389,6 @@ impl ReadOnlyClone for Indexer { path: self.path.clone(), vecs: self.vecs.read_only_clone(), stores: self.stores.clone(), - tip_blockhash: self.tip_blockhash.clone(), safe_lengths: self.safe_lengths.clone(), } } diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index cfea5de3a..f311d46c0 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -4,9 +4,10 @@ //! //! 1. [`steps::fetcher::Fetcher`] - one mixed batched RPC for //! `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`, -//! then a `getmempoolentry` batch and a `getrawtransaction` batch -//! on new txids only. The GBT is validated to be a subset of the -//! txid listing; on mismatch the cycle is skipped. +//! then a single mixed `getmempoolentry`+`getrawtransaction` batch +//! on new txids only. GBT-only txs are synthesized inline from the +//! GBT payload so block 0 matches Core's selection exactly without +//! a follow-up entry fetch that could race the listing. //! 2. [`steps::preparer::Preparer`] - decode and classify into //! `TxsPulled { added, removed }`. Pure CPU. //! 3. [`steps::applier::Applier`] - apply the diff to @@ -345,20 +346,17 @@ impl Mempool { .. } = &*self.0; - let Some(Fetched { + let Fetched { live_txids, new_entries, new_txs, - gbt, + gbt_txids, min_fee, - }) = Fetcher::fetch(client, state)? - else { - return Ok(()); - }; + } = Fetcher::fetch(client, state)?; let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state); let changed = Applier::apply(state, rebuilder, pulled); Prevouts::fill(state, resolver); - rebuilder.tick(state, changed, &gbt, min_fee); + rebuilder.tick(state, changed, &gbt_txids, min_fee); Ok(()) } diff --git a/crates/brk_mempool/src/steps/fetcher/fetched.rs b/crates/brk_mempool/src/steps/fetcher/fetched.rs index 4b0851b10..aac3742dd 100644 --- a/crates/brk_mempool/src/steps/fetcher/fetched.rs +++ b/crates/brk_mempool/src/steps/fetcher/fetched.rs @@ -1,4 +1,3 @@ -use brk_rpc::BlockTemplateTx; use brk_types::{FeeRate, MempoolEntryInfo, Txid}; use rustc_hash::FxHashMap; @@ -10,6 +9,10 @@ pub struct Fetched { /// keep their first-sight entry on the live store). pub new_entries: Vec, pub new_txs: FxHashMap, - pub gbt: Vec, + /// Block 0 ordering from `getblocktemplate`. Bodies and stats have + /// already been folded into `new_entries`/`new_txs` (or were already + /// in the pool); the Rebuilder only needs the txid sequence to + /// project Core's exact selection. + pub gbt_txids: Vec, pub min_fee: FeeRate, } diff --git a/crates/brk_mempool/src/steps/fetcher/mod.rs b/crates/brk_mempool/src/steps/fetcher/mod.rs index 086592c5e..15db713c7 100644 --- a/crates/brk_mempool/src/steps/fetcher/mod.rs +++ b/crates/brk_mempool/src/steps/fetcher/mod.rs @@ -4,64 +4,106 @@ pub use fetched::Fetched; use brk_error::Result; use brk_rpc::{Client, MempoolState}; -use brk_types::Txid; +use brk_types::{MempoolEntryInfo, Timestamp, Txid, VSize}; use parking_lot::RwLock; +use rustc_hash::FxHashSet; -use crate::{State, stores::TxStore}; +use crate::State; /// Cap before the batch RPC so we never hand bitcoind an unbounded batch. +/// GBT-synthesized entries are not subject to this cap; they're bounded +/// by the block weight limit Core enforces on its own template. const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; -/// Three batched round-trips per cycle, scaling with churn rather than +/// Two batched round-trips per cycle, scaling with churn rather than /// mempool size: `getblocktemplate` + `getrawmempool false` + -/// `getmempoolinfo` in one mixed batch; then `getmempoolentry` and -/// `getrawtransaction` per *new* txid only. +/// `getmempoolinfo` in one mixed batch; then `getmempoolentry` + +/// `getrawtransaction` for *new* non-GBT txids in a second mixed batch. /// -/// `getblocktemplate` is validated to be a subset of the txid listing -/// inside the RPC layer; mismatches return `Ok(None)` so the cycle is -/// skipped without polluting downstream state. +/// GBT entries already carry the full tx body and stats, so any GBT tx +/// not yet in the local pool is materialized inline from the GBT +/// payload instead of being refetched. That removes the GBT/listing +/// race that used to skip cycles when a tx vanished from the mempool +/// between the GBT and `getrawmempool` calls: block 0 always reflects +/// Core's exact selection because we never ask for that data twice. /// /// Confirmed prevouts are resolved post-apply by the caller-supplied /// resolver passed to `Mempool::update_with`, so the in-crate path no -/// longer issues a fourth batch for parents. +/// longer issues a third batch for parents. pub struct Fetcher; impl Fetcher { - pub fn fetch(client: &Client, lock: &RwLock) -> Result> { - let Some(MempoolState { + pub fn fetch(client: &Client, lock: &RwLock) -> Result { + let MempoolState { live_txids, gbt, min_fee, - }) = client.fetch_mempool_state()? - else { - return Ok(None); - }; - let new_txids = { + } = client.fetch_mempool_state()?; + + // One read snapshot decides both the RPC fetch list and the + // GBT-synthesis set, so they agree on what's "already known". + // Graveyard txs are treated as known so a re-broadcast still + // flows through `Preparer::classify_addition` and lands as + // [`crate::TxAddition::Revived`]. + let (new_txids, gbt_synth_set) = { let state = lock.read(); - Self::new_txids(&live_txids, &state.txs) + let mut gbt_txids: FxHashSet = + FxHashSet::with_capacity_and_hasher(gbt.len(), Default::default()); + let mut gbt_synth_set: FxHashSet = FxHashSet::default(); + for g in &gbt { + gbt_txids.insert(g.txid); + if !state.txs.contains(&g.txid) { + gbt_synth_set.insert(g.txid); + } + } + let new_txids: Vec = live_txids + .iter() + .filter(|t| !state.txs.contains(t) && !gbt_txids.contains(t)) + .take(MAX_TX_FETCHES_PER_CYCLE) + .copied() + .collect(); + (new_txids, gbt_synth_set) }; - let new_entries = client.fetch_mempool_entries(&new_txids)?; - let new_txs = client.get_raw_transactions(&new_txids)?; - Ok(Some(Fetched { + + let (mut new_entries, mut new_txs) = client.fetch_new_pool_data(&new_txids)?; + new_entries.reserve(gbt_synth_set.len()); + new_txs.reserve(gbt_synth_set.len()); + + // Consume `gbt` by value: GBT-only txs move their body and + // depends into the synthesis path (no clones), and the GBT + // ordering is captured as a `Vec` for the Rebuilder, which + // is the only downstream consumer and only reads txids. + // + // GBT carries no per-tx arrival timestamp. `now` is correct to + // within ~1 cycle for a tx that just entered Core's mempool + // (the only kind that triggers synthesis: not in our pool yet + // means it just appeared this cycle). + let now = Timestamp::now(); + let gbt_txids: Vec = gbt + .into_iter() + .map(|g| { + let txid = g.txid; + if gbt_synth_set.contains(&txid) { + new_entries.push(MempoolEntryInfo { + txid, + vsize: VSize::from(g.weight), + weight: g.weight, + fee: g.fee, + first_seen: now, + depends: g.depends, + }); + new_txs.insert(txid, g.tx); + } + txid + }) + .collect(); + + Ok(Fetched { live_txids, new_entries, new_txs, - gbt, + gbt_txids, min_fee, - })) - } - - /// Live txids the local store hasn't seen yet. Graveyard txs are - /// included so a re-broadcast (post-reorg or a peer republishing) - /// flows through `Preparer::classify_addition` and lands as - /// [`crate::TxAddition::Revived`] instead of sitting orphaned for - /// the full graveyard retention. - fn new_txids(live_txids: &[Txid], known: &TxStore) -> Vec { - live_txids - .iter() - .filter(|txid| !known.contains(txid)) - .take(MAX_TX_FETCHES_PER_CYCLE) - .copied() - .collect() + }) } } diff --git a/crates/brk_mempool/src/steps/rebuilder/mod.rs b/crates/brk_mempool/src/steps/rebuilder/mod.rs index e16ffb9ad..86b9355ca 100644 --- a/crates/brk_mempool/src/steps/rebuilder/mod.rs +++ b/crates/brk_mempool/src/steps/rebuilder/mod.rs @@ -6,7 +6,6 @@ use std::{ }, }; -use brk_rpc::BlockTemplateTx; use brk_types::{FeeRate, NextBlockHash, Txid, TxidPrefix}; use parking_lot::RwLock; use rustc_hash::FxHashSet; @@ -45,7 +44,7 @@ impl Rebuilder { &self, lock: &RwLock, changed: bool, - gbt: &[BlockTemplateTx], + gbt_txids: &[Txid], min_fee: FeeRate, ) { if changed { @@ -55,7 +54,7 @@ impl Rebuilder { self.skip_clean.fetch_add(1, Ordering::Relaxed); return; } - let snap = Self::build_snapshot(lock, gbt, min_fee); + let snap = Self::build_snapshot(lock, gbt_txids, min_fee); let block0_set: FxHashSet = snap.block0_txids().collect(); let next_hash = snap.next_block_hash; *self.snapshot.write() = Arc::new(snap); @@ -93,7 +92,7 @@ impl Rebuilder { fn build_snapshot( lock: &RwLock, - gbt: &[BlockTemplateTx], + gbt_txids: &[Txid], min_fee: FeeRate, ) -> Snapshot { let (txs, prefix_to_idx) = { @@ -102,12 +101,15 @@ impl Rebuilder { }; // Block 0 from `getblocktemplate`: Core's actual selection. - // Fetcher already validated GBT ⊆ live txid listing, so any - // drop here is a same-cycle race and the partitioner picks up - // the slack so callers always see NUM_BLOCKS blocks. - let block0: Vec = gbt + // The Fetcher synthesizes pool entries for GBT txs that aren't + // already present (using GBT's inline body + stats), so this + // lookup always resolves and block 0 matches Core exactly. + // The `filter_map` only drops if a tx was concurrently evicted + // from `txs` between `build_txs` and the rebuild, which the + // partitioner backfills so callers still see `NUM_BLOCKS`. + let block0: Vec = gbt_txids .iter() - .filter_map(|t| prefix_to_idx.get(&TxidPrefix::from(&t.txid)).copied()) + .filter_map(|txid| prefix_to_idx.get(&TxidPrefix::from(txid)).copied()) .collect(); let excluded: FxHashSet = block0.iter().copied().collect(); let rest = Partitioner::partition(&txs, &excluded, NUM_BLOCKS.saturating_sub(1)); diff --git a/crates/brk_query/src/impl/block/info.rs b/crates/brk_query/src/impl/block/info.rs index fb18ea4cf..0c8985a08 100644 --- a/crates/brk_query/src/impl/block/info.rs +++ b/crates/brk_query/src/impl/block/info.rs @@ -499,10 +499,11 @@ impl Query { // === Helper methods === - /// Hash to height. The prefix store keys on the first 8 bytes of - /// the hash, so the resolved height is verified against the full - /// `blockhash[height]` before being returned. Prefix collisions - /// (or unknown hashes) surface as `NotFound`. + /// Hash to height, clamped to the safe-lengths snapshot. The prefix + /// store keys on the first 8 bytes of the hash, so the resolved + /// height is verified against the full `blockhash[height]` before + /// being returned. Prefix collisions, unknown hashes, and hashes + /// past the snapshot all surface as `NotFound`. pub fn height_by_hash(&self, hash: &BlockHash) -> Result { let indexer = self.indexer(); let prefix = BlockHashPrefix::from(hash); @@ -512,6 +513,9 @@ impl Query { .get(&prefix)? .map(|h| *h) .ok_or(Error::NotFound("Block not found".into()))?; + if height >= self.safe_lengths().height { + return Err(Error::NotFound("Block not found".into())); + } match indexer.vecs.blocks.blockhash.get(height) { Some(stored) if &stored == hash => Ok(height), _ => Err(Error::NotFound("Block not found".into())), diff --git a/crates/brk_query/src/impl/series.rs b/crates/brk_query/src/impl/series.rs index 642d6404f..1ccf7c46e 100644 --- a/crates/brk_query/src/impl/series.rs +++ b/crates/brk_query/src/impl/series.rs @@ -184,8 +184,8 @@ impl Query { } // Snapshot tip-derived state together so the historical-branch ETag stays - // self-consistent: stable_count is computed from tip_height, hash_prefix - // is the live tip. + // self-consistent: tip_height and hash_prefix both reflect the safe-bound + // tip, and stable_count is computed from tip_height. let tip_height = self.height(); let hash_prefix = self.tip_hash_prefix(); let stable_count = self.stable_count(params.index, total, tip_height); diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index 10742b0db..034611592 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -77,7 +77,7 @@ impl Query { self.indexer().safe_lengths() } - /// Tip block hash, cached in the indexer. + /// Tip block hash at the pipeline-safe ceiling. #[inline] pub fn tip_blockhash(&self) -> BlockHash { self.indexer().tip_blockhash() diff --git a/crates/brk_rpc/src/lib.rs b/crates/brk_rpc/src/lib.rs index a59f5ef61..c3f31f2d4 100644 --- a/crates/brk_rpc/src/lib.rs +++ b/crates/brk_rpc/src/lib.rs @@ -7,7 +7,7 @@ use std::{ use bitcoin::ScriptBuf; use brk_error::Result; -use brk_types::{BlockHash, Sats, Txid}; +use brk_types::{BlockHash, Sats, Txid, Weight}; mod client; mod methods; @@ -35,10 +35,19 @@ pub struct TxOutInfo { pub script_pub_key: ScriptBuf, } +/// One transaction from `getblocktemplate`. Carries the full decoded +/// body and stats so block 0 can be projected without a follow-up +/// `getmempoolentry`/`getrawtransaction` per tx; that follow-up was the +/// source of the GBT/listing race that used to skip cycles. #[derive(Debug, Clone)] pub struct BlockTemplateTx { pub txid: Txid, pub fee: Sats, + pub weight: Weight, + /// Parent txids also in this template (Core's own ancestor + /// accounting, resolved from the wire-level 1-based indices). + pub depends: Vec, + pub tx: bitcoin::Transaction, } #[derive(Clone, Debug)] diff --git a/crates/brk_rpc/src/methods.rs b/crates/brk_rpc/src/methods.rs index 87d9b8f4a..7483bb3f7 100644 --- a/crates/brk_rpc/src/methods.rs +++ b/crates/brk_rpc/src/methods.rs @@ -27,16 +27,18 @@ const RPC_NOT_FOUND: i32 = -5; use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo}; /// Per-batch request count for `get_block_hashes_range`, -/// `fetch_mempool_entries`, and `fetch_raw_transactions`. Sized so the -/// JSON request body stays well under a megabyte and bitcoind doesn't -/// spend too long on a single batch before yielding results. +/// `fetch_new_pool_data`, and `get_raw_transactions`. Sized so the JSON +/// request body stays well under a megabyte and bitcoind doesn't spend +/// too long on a single batch before yielding results. For the mixed +/// `getmempoolentry`+`getrawtransaction` batch this is the *txid* count; +/// the wire batch is twice that. const BATCH_CHUNK: usize = 2000; /// Live mempool state fetched in one batched bitcoind round-trip: -/// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`. -/// `gbt` is validated to be a subset of `live_txids` before -/// construction; on mismatch the cycle is skipped (`Ok(None)`) so we -/// never publish a block 0 missing txids Core would actually mine. +/// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`. Each +/// `gbt` entry carries the full decoded tx and stats so block 0 is +/// projected directly from Core's selection without a follow-up entry +/// fetch that could race the eviction of one of those txs. pub struct MempoolState { pub live_txids: Vec, pub gbt: Vec, @@ -64,8 +66,11 @@ struct GbtResponseRaw { #[derive(Deserialize)] struct GbtTxRaw { + data: String, txid: bitcoin::Txid, fee: u64, + weight: u64, + depends: Vec, } fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result { @@ -84,14 +89,32 @@ fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result { }) } -fn build_gbt(raw: GbtResponseRaw) -> Vec { - raw.transactions - .into_iter() - .map(|t| BlockTemplateTx { +fn build_gbt(raw: GbtResponseRaw) -> Result> { + // Pass 1: decode bodies and stash the 1-based GBT-array indices + // aside so we can drop each `data` hex string and `GbtTxRaw` as + // soon as the tx is pushed. + let n = raw.transactions.len(); + let mut depends_idx: Vec> = Vec::with_capacity(n); + let mut result: Vec = Vec::with_capacity(n); + for t in raw.transactions { + depends_idx.push(t.depends); + result.push(BlockTemplateTx { txid: Txid::from(t.txid), fee: Sats::from(t.fee), - }) - .collect() + weight: Weight::from(t.weight), + depends: Vec::new(), + tx: encode::deserialize_hex(&t.data)?, + }); + } + // Pass 2: resolve indices to txids now that the array is complete. + for (i, indices) in depends_idx.iter().enumerate() { + let resolved: Vec = indices + .iter() + .filter_map(|d| result.get((*d as usize).checked_sub(1)?).map(|t| t.txid)) + .collect(); + result[i].depends = resolved; + } + Ok(result) } /// Convert bitcoind's `mempoolminfee` (BTC/kvB f64) to sat/vB. Round-trip @@ -367,15 +390,11 @@ impl Client { } /// Core's projected next block + live mempool txid set + - /// `mempoolminfee`, fetched in a single bitcoind round-trip. - /// `getblocktemplate` runs first so any tx arriving between the - /// intra-batch calls lands in the txid listing only, preserving - /// GBT ⊆ txids for the common race. Validates that every GBT txid - /// is present in the txid listing and returns `Ok(None)` on - /// mismatch so the caller can skip the cycle: republishing block 0 - /// with missing txids would diverge from Core's exact selection. - /// Other failures bubble up as `Err`. - pub fn fetch_mempool_state(&self) -> Result> { + /// `mempoolminfee`, fetched in a single bitcoind round-trip. GBT + /// carries each tx's full body and stats, so block 0 is exact even + /// when a tx vanishes from the mempool listing between the GBT and + /// `getrawmempool` calls; no follow-up entry fetch can race it. + pub fn fetch_mempool_state(&self) -> Result { let requests: [(&str, Vec); 3] = [ ( "getblocktemplate", @@ -394,56 +413,72 @@ impl Client { .iter() .map(|s| Self::parse_txid(s, "mempool txid")) .collect::>>()?; - let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?); + let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?)?; let min_fee = build_min_fee(serde_json::from_str(info_raw.get())?); - let live_set: rustc_hash::FxHashSet = live_txids.iter().copied().collect(); - let missing = gbt.iter().filter(|t| !live_set.contains(&t.txid)).count(); - if missing > 0 { - #[cfg(debug_assertions)] - tracing::warn!( - missing, - gbt_total = gbt.len(), - "getblocktemplate has {missing} txids not in mempool listing; skipping cycle" - ); - return Ok(None); - } - - Ok(Some(MempoolState { + Ok(MempoolState { live_txids, gbt, min_fee, - })) + }) } - /// Batched `getmempoolentry` for the given txids. Returns - /// `MempoolEntryInfo` per successful lookup. Per-item -5 (NOT_FOUND - /// — tx evicted/replaced between the txid listing and this call) - /// drops silently; transport-level failures still propagate. - /// Chunked at `BATCH_CHUNK` requests per round-trip. - pub fn fetch_mempool_entries(&self, txids: &[Txid]) -> Result> { - let mut out: Vec = Vec::with_capacity(txids.len()); + /// Mixed batch of `getmempoolentry` + `getrawtransaction` for the + /// same txid set in one round-trip. Returns the entries vec and the + /// raw-tx map keyed by txid. Per-item -5 (NOT_FOUND — tx evicted + /// between the listing and this call) drops silently for either leg; + /// transport-level failures still propagate. Chunked at `BATCH_CHUNK` + /// txids per round-trip (2× that on the wire). + pub fn fetch_new_pool_data( + &self, + txids: &[Txid], + ) -> Result<(Vec, FxHashMap)> { + let mut entries: Vec = Vec::with_capacity(txids.len()); + let mut txs: FxHashMap = + FxHashMap::with_capacity_and_hasher(txids.len(), Default::default()); for chunk in txids.chunks(BATCH_CHUNK) { - let args = chunk.iter().map(|t| { - let bt: &bitcoin::Txid = t.into(); - vec![serde_json::to_value(bt).unwrap_or(Value::Null)] - }); - let results: Vec> = - self.0.call_batch_per_item("getmempoolentry", args)?; + let mut requests: Vec<(&str, Vec)> = Vec::with_capacity(chunk.len() * 2); + for txid in chunk { + let bt: &bitcoin::Txid = txid.into(); + let tv = serde_json::to_value(bt).unwrap_or(Value::Null); + requests.push(("getmempoolentry", vec![tv.clone()])); + requests.push(("getrawtransaction", vec![tv, Value::Bool(false)])); + } - for (txid, res) in chunk.iter().zip(results) { - match res { - Ok(raw) => out.push(build_entry(*txid, raw)?), + let results = self.0.call_mixed_batch(&requests)?; + let mut iter = results.into_iter(); + for txid in chunk { + let entry_res = iter.next().ok_or(Error::Internal("missing entry"))?; + let raw_res = iter.next().ok_or(Error::Internal("missing raw"))?; + + match entry_res.and_then(|raw| { + let me: MempoolEntryRaw = serde_json::from_str(raw.get())?; + build_entry(*txid, me) + }) { + Ok(info) => entries.push(info), Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {} Err(e) => { - debug!(txid = %txid, error = %e, "getmempoolentry batch: item failed") + debug!(txid = %txid, error = %e, "getmempoolentry mixed batch: item failed") + } + } + + match raw_res.and_then(|raw| { + let hex: String = serde_json::from_str(raw.get())?; + Ok(encode::deserialize_hex::(&hex)?) + }) { + Ok(tx) => { + txs.insert(*txid, tx); + } + Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {} + Err(e) => { + debug!(txid = %txid, error = %e, "getrawtransaction mixed batch: item failed") } } } } - Ok(out) + Ok((entries, txs)) } pub fn get_closest_valid_height(&self, hash: BlockHash) -> Result<(Height, BlockHash)> {