mempool: fixes

This commit is contained in:
nym21
2026-05-10 14:04:08 +02:00
parent fe5f30bca6
commit 774580ee11
14 changed files with 231 additions and 174 deletions

View File

@@ -22,7 +22,10 @@ use std::{
any::Any,
cmp::Reverse,
panic::{AssertUnwindSafe, catch_unwind},
sync::Arc,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
@@ -34,8 +37,8 @@ use brk_types::{
MempoolInfo, MempoolRecentTx, NextBlockHash, OutpointPrefix, OutputType, Sats, Timestamp,
Transaction, TxOut, Txid, TxidPrefix, Vin, Vout,
};
use rustc_hash::FxHashSet;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::error;
mod cluster;
@@ -54,9 +57,15 @@ pub(crate) use steps::{BlockStats, RecommendedFees, TxEntry, TxRemoval};
pub(crate) use stores::{TxStore, TxTombstone};
/// Confirmed-parent prevout resolver passed to [`Mempool::update_with`] /
/// [`Mempool::start_with`]. Receives `(parent_txid, vout)`, returns the
/// `TxOut` if the parent is reachable, `None` otherwise.
pub type PrevoutResolver = Box<dyn Fn(&Txid, Vout) -> Option<TxOut> + Send + Sync>;
/// [`Mempool::start_with`]. Receives a slice of `(parent_txid, vout)`
/// holes and returns the subset that resolved. Unresolved holes are
/// simply omitted from the map; the next cycle retries automatically.
///
/// Batched so the RPC implementation can pack one round-trip per cycle
/// (deduping by parent txid so a tx with N inputs from one parent costs
/// one fetch); the indexer implementation just loops over local reads.
pub type PrevoutResolver =
Box<dyn Fn(&[(Txid, Vout)]) -> FxHashMap<(Txid, Vout), TxOut> + Send + Sync>;
pub(crate) use state::State;
@@ -68,6 +77,7 @@ struct Inner {
client: Client,
state: RwLock<State>,
rebuilder: Rebuilder,
started: AtomicBool,
}
impl Mempool {
@@ -76,6 +86,7 @@ impl Mempool {
client: client.clone(),
state: RwLock::new(State::default()),
rebuilder: Rebuilder::default(),
started: AtomicBool::new(false),
}))
}
@@ -257,10 +268,7 @@ impl Mempool {
/// evicted RBF predecessor reports the package-effective rate it
/// had in the mempool, not a misleading isolated `fee/vsize`.
pub fn graveyard_fee_rate(&self, txid: &Txid) -> Option<FeeRate> {
self.read()
.graveyard
.get(txid)
.map(|tomb| tomb.chunk_rate)
self.read().graveyard.get(txid).map(|tomb| tomb.chunk_rate)
}
/// `first_seen` Unix-second timestamps for `txids`, in input order.
@@ -292,8 +300,16 @@ impl Mempool {
/// overruns `PERIOD`, the next cycle starts immediately.
pub fn start_with<F>(&self, resolver: F)
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
F: Fn(&[(Txid, Vout)]) -> FxHashMap<(Txid, Vout), TxOut>,
{
if self
.0
.started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
panic!("Mempool::start_with already running on this instance");
}
const PERIOD: Duration = Duration::from_millis(500);
loop {
let started = Instant::now();
@@ -303,7 +319,10 @@ impl Mempool {
}
}));
if let Err(payload) = outcome {
error!("mempool update panicked, continuing loop: {}", panic_msg(&payload));
error!(
"mempool update panicked, continuing loop: {}",
panic_msg(&payload)
);
}
if let Some(rest) = PERIOD.checked_sub(started.elapsed()) {
thread::sleep(rest);
@@ -311,38 +330,32 @@ impl Mempool {
}
}
/// One sync cycle with the default RPC resolver. Equivalent to
/// `update_with(rpc_resolver)`. Standalone consumers (Core +
/// `txindex=1`) get a one-line driver loop.
pub fn update(&self) -> Result<()> {
self.update_with(Prevouts::rpc_resolver(self.0.client.clone()))
}
/// One sync cycle: fetch, prepare, apply, fill prevouts, maybe
/// rebuild. The resolver MUST resolve confirmed prevouts only;
/// mempool-to-mempool chains are wired internally and the
/// resolver is never called for them.
pub fn update_with<F>(&self, resolver: F) -> Result<()>
fn update_with<F>(&self, resolver: F) -> Result<()>
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
F: Fn(&[(Txid, Vout)]) -> FxHashMap<(Txid, Vout), TxOut>,
{
let Inner {
client,
state,
rebuilder,
..
} = &*self.0;
let Some(Fetched {
live_txids,
new_entries,
new_raws,
new_txs,
gbt,
min_fee,
}) = Fetcher::fetch(client, state)?
else {
return Ok(());
};
let pulled = Preparer::prepare(&live_txids, new_entries, new_raws, state);
let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state);
let changed = Applier::apply(state, rebuilder, pulled);
Prevouts::fill(state, resolver);
rebuilder.tick(state, changed, &gbt, min_fee);

View File

@@ -1,4 +1,4 @@
use brk_rpc::{BlockTemplateTx, RawTx};
use brk_rpc::BlockTemplateTx;
use brk_types::{FeeRate, MempoolEntryInfo, Txid};
use rustc_hash::FxHashMap;
@@ -9,7 +9,7 @@ pub struct Fetched {
/// `MempoolEntryInfo` for newly-observed txids only (existing ones
/// keep their first-sight entry on the live store).
pub new_entries: Vec<MempoolEntryInfo>,
pub new_raws: FxHashMap<Txid, RawTx>,
pub new_txs: FxHashMap<Txid, bitcoin::Transaction>,
pub gbt: Vec<BlockTemplateTx>,
pub min_fee: FeeRate,
}

View File

@@ -41,11 +41,11 @@ impl Fetcher {
Self::new_txids(&live_txids, &state.txs)
};
let new_entries = client.fetch_mempool_entries(&new_txids)?;
let new_raws = client.get_raw_transactions(&new_txids)?;
let new_txs = client.get_raw_transactions(&new_txids)?;
Ok(Some(Fetched {
live_txids,
new_entries,
new_raws,
new_txs,
gbt,
min_fee,
}))

View File

@@ -12,7 +12,6 @@
//! state on the live store. Removals are inferred by cross-referencing
//! inputs against the full `live_txids` set from the cycle's pull.
use brk_rpc::RawTx;
use brk_types::{MempoolEntryInfo, Transaction, Txid, TxidPrefix, Vout};
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
@@ -40,13 +39,13 @@ impl Preparer {
pub fn prepare(
live_txids: &[Txid],
new_entries: Vec<MempoolEntryInfo>,
new_raws: FxHashMap<Txid, RawTx>,
new_txs: FxHashMap<Txid, bitcoin::Transaction>,
lock: &RwLock<State>,
) -> TxsPulled {
let state = lock.read();
let live: FxHashSet<TxidPrefix> = live_txids.iter().map(TxidPrefix::from).collect();
let added = Self::classify_additions(new_entries, new_raws, &state.txs, &state.graveyard);
let added = Self::classify_additions(new_entries, new_txs, &state.txs, &state.graveyard);
let removed = Self::classify_removals(&live, &added, &state.txs);
TxsPulled { added, removed }
@@ -54,13 +53,13 @@ impl Preparer {
fn classify_additions(
new_entries: Vec<MempoolEntryInfo>,
mut new_raws: FxHashMap<Txid, RawTx>,
mut new_txs: FxHashMap<Txid, bitcoin::Transaction>,
known: &TxStore,
graveyard: &TxGraveyard,
) -> Vec<TxAddition> {
new_entries
.iter()
.filter_map(|info| Self::classify_addition(info, known, graveyard, &mut new_raws))
.filter_map(|info| Self::classify_addition(info, known, graveyard, &mut new_txs))
.collect()
}
@@ -68,7 +67,7 @@ impl Preparer {
info: &MempoolEntryInfo,
known: &TxStore,
graveyard: &TxGraveyard,
new_raws: &mut FxHashMap<Txid, RawTx>,
new_txs: &mut FxHashMap<Txid, bitcoin::Transaction>,
) -> Option<TxAddition> {
if known.contains(&info.txid) {
return None;
@@ -76,8 +75,8 @@ impl Preparer {
if let Some(tomb) = graveyard.get(&info.txid) {
return Some(TxAddition::revived(info, tomb));
}
let raw = new_raws.remove(&info.txid)?;
Some(TxAddition::fresh(info, raw, known))
let tx = new_txs.remove(&info.txid)?;
Some(TxAddition::fresh(info, tx, known))
}
/// One `(prefix, reason)` per known tx that's gone from the live set,

View File

@@ -9,9 +9,6 @@
//! (preserving `rbf`, `size`). The Applier exhumes the cached tx
//! body. No raw decoding.
use std::mem;
use brk_rpc::RawTx;
use brk_types::{MempoolEntryInfo, SigOps, Transaction, TxIn, TxOut, TxStatus, Txid, Vout};
use crate::{TxTombstone, stores::TxStore};
@@ -27,39 +24,44 @@ impl TxAddition {
/// Resolves prevouts against the live mempool only. Confirmed
/// parents land with `prevout: None` and are filled by the
/// resolver supplied to `Mempool::update_with` in the same cycle.
pub(super) fn fresh(info: &MempoolEntryInfo, raw: RawTx, mempool_txs: &TxStore) -> Self {
let total_size = raw.hex.len() / 2;
let rbf = raw.tx.input.iter().any(|i| i.sequence.is_rbf());
let tx = Self::build_tx(info, raw, total_size, mempool_txs);
pub(super) fn fresh(
info: &MempoolEntryInfo,
tx: bitcoin::Transaction,
mempool_txs: &TxStore,
) -> Self {
let total_size = tx.total_size();
let rbf = tx.input.iter().any(|i| i.sequence.is_rbf());
let built = Self::build_tx(info, tx, total_size, mempool_txs);
let entry = TxEntry::new(info, total_size as u64, rbf);
Self::Fresh { tx, entry }
Self::Fresh { tx: built, entry }
}
fn build_tx(
info: &MempoolEntryInfo,
mut raw: RawTx,
tx: bitcoin::Transaction,
total_size: usize,
mempool_txs: &TxStore,
) -> Transaction {
let input = mem::take(&mut raw.tx.input)
let input = tx
.input
.into_iter()
.map(|txin| Self::build_txin(txin, mempool_txs))
.collect();
let mut tx = Transaction {
let mut built = Transaction {
index: None,
txid: info.txid,
version: raw.tx.version.into(),
version: tx.version.into(),
total_sigop_cost: SigOps::ZERO,
weight: info.weight,
lock_time: raw.tx.lock_time.into(),
lock_time: tx.lock_time.into(),
total_size,
fee: info.fee,
input,
output: raw.tx.output.into_iter().map(TxOut::from).collect(),
output: tx.output.into_iter().map(TxOut::from).collect(),
status: TxStatus::UNCONFIRMED,
};
tx.total_sigop_cost = tx.total_sigop_cost();
tx
built.total_sigop_cost = built.total_sigop_cost();
built
}
pub(super) fn revived(info: &MempoolEntryInfo, tomb: &TxTombstone) -> Self {

View File

@@ -5,6 +5,9 @@ use brk_types::Txid;
/// `Replaced` = at least one freshly added tx this cycle spends one of
/// its inputs (BIP-125 replacement inferred from conflicting outpoints).
/// `by` is the immediate successor; the chain extends if `by` is itself
/// later replaced. Walk it forward via `TxGraveyard::replacement_root_of`.
///
/// `Vanished` = any other reason we can't distinguish from the data at
/// hand (mined, expired, evicted, or replaced by a tx we didn't fetch
/// due to the per-cycle fetch cap).

View File

@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use brk_rpc::Client;
use brk_types::{TxOut, Txid, TxidPrefix, Vin, Vout};
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::warn;
use crate::{State, stores::TxStore};
@@ -33,15 +34,16 @@ type Fills = Vec<(Vin, TxOut)>;
type Holes = Vec<(Vin, Txid, Vout)>;
type FillBatch = Vec<(Txid, Fills)>;
type HoleBatch = Vec<(Txid, Holes)>;
type Resolved = FxHashMap<(Txid, Vout), TxOut>;
impl Prevouts {
/// Fill every unfilled prevout the cycle can resolve. Same-cycle
/// in-mempool parents are filled lock-locally; the remainder go
/// through `resolver` outside any lock. Returns true iff anything
/// was written.
/// through `resolver` (one batched call) outside any lock. Returns
/// true iff anything was written.
pub fn fill<F>(lock: &RwLock<State>, resolver: F) -> bool
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
F: Fn(&[(Txid, Vout)]) -> Resolved,
{
let (in_mempool, holes) = {
let state = lock.read();
@@ -63,28 +65,41 @@ impl Prevouts {
true
}
/// Default resolver: per-call `getrawtransaction` against the
/// bitcoind RPC client `Mempool` already holds. Requires
/// `txindex=1`. On any failure logs once with a hint, then returns
/// `None`; the next cycle retries automatically.
pub fn rpc_resolver(client: Client) -> impl Fn(&Txid, Vout) -> Option<TxOut> {
/// Default resolver: one batched `getrawtransaction` per cycle,
/// deduped by parent txid. Requires bitcoind with `txindex=1`.
pub fn rpc_resolver(client: Client) -> impl Fn(&[(Txid, Vout)]) -> Resolved {
let warned = AtomicBool::new(false);
move |txid, vout| {
let bt: &bitcoin::Txid = txid.into();
match client.get_raw_transaction(bt, None as Option<&bitcoin::BlockHash>) {
Ok(tx) => tx
.output
.get(usize::from(vout))
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into()))),
move |holes: &[(Txid, Vout)]| {
if holes.is_empty() {
return Resolved::default();
}
let mut seen: FxHashSet<Txid> = FxHashSet::default();
let unique: Vec<Txid> = holes
.iter()
.filter_map(|(t, _)| seen.insert(*t).then_some(*t))
.collect();
let parents = match client.get_raw_transactions(&unique) {
Ok(map) => {
warned.store(false, Ordering::Relaxed);
map
}
Err(_) => {
if !warned.swap(true, Ordering::Relaxed) {
warn!(
"mempool: getrawtransaction missed for {txid}; ensure bitcoind is running with txindex=1"
"mempool: getrawtransaction batch failed; ensure bitcoind is running with txindex=1"
);
}
None
return Resolved::default();
}
}
};
holes
.iter()
.filter_map(|(txid, vout)| {
let o = parents.get(txid)?.output.get(usize::from(*vout))?;
let txout = TxOut::from((o.script_pubkey.clone(), o.value.into()));
Some(((*txid, *vout), txout))
})
.collect()
}
}
@@ -92,9 +107,6 @@ impl Prevouts {
/// same-cycle in-mempool fill (parent is live) or an external hole
/// (parent is confirmed or unknown).
fn gather(txs: &TxStore) -> (FillBatch, HoleBatch) {
if txs.unresolved().is_empty() {
return (Vec::new(), Vec::new());
}
let mut filled: FillBatch = Vec::new();
let mut holes: HoleBatch = Vec::new();
for prefix in txs.unresolved() {
@@ -127,17 +139,33 @@ impl Prevouts {
(filled, holes)
}
/// Flatten holes into one `(prev_txid, vout)` slice, invoke the
/// resolver once, then re-attribute resolved entries to their
/// consumer txs. Mempool double-spend rules guarantee every
/// `(prev_txid, vout)` key is unique across the batch, so no
/// dedup is needed before calling.
fn resolve_external<F>(holes: HoleBatch, resolver: F) -> FillBatch
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
F: Fn(&[(Txid, Vout)]) -> Resolved,
{
let total: usize = holes.iter().map(|(_, h)| h.len()).sum();
let mut flat: Vec<(Txid, Vout)> = Vec::with_capacity(total);
for (_, tx_holes) in &holes {
for (_, prev_txid, vout) in tx_holes {
flat.push((*prev_txid, *vout));
}
}
let mut resolved = resolver(&flat);
if resolved.is_empty() {
return Vec::new();
}
holes
.into_iter()
.filter_map(|(txid, holes)| {
let fills: Fills = holes
.filter_map(|(txid, tx_holes)| {
let fills: Fills = tx_holes
.into_iter()
.filter_map(|(vin, prev_txid, vout)| {
resolver(&prev_txid, vout).map(|o| (vin, o))
resolved.remove(&(prev_txid, vout)).map(|o| (vin, o))
})
.collect();
(!fills.is_empty()).then_some((txid, fills))

View File

@@ -16,8 +16,6 @@
//! block if it fits; otherwise advance to the next block (unless we
//! are already on the last one, which absorbs everything remaining).
use std::cmp::Reverse;
use brk_types::{FeeRate, VSize};
use rustc_hash::FxHashSet;
@@ -64,6 +62,10 @@ fn sorted_candidates(
(!excluded.contains(&idx)).then_some((idx, t.vsize, t.chunk_rate))
})
.collect();
cands.sort_by_key(|(_, _, rate)| Reverse(*rate));
cands.sort_by(|(a_idx, _, a_rate), (b_idx, _, b_rate)| {
b_rate
.cmp(a_rate)
.then_with(|| txs[a_idx.as_usize()].txid.cmp(&txs[b_idx.as_usize()].txid))
});
cands
}

View File

@@ -9,9 +9,10 @@ pub use stats::BlockStats;
pub use tx::SnapTx;
pub use tx_index::TxIndex;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::hash::{Hash, Hasher};
use brk_types::{FeeRate, NextBlockHash, RecommendedFees, Txid, TxidPrefix};
use rustc_hash::FxHasher;
use fees::Fees;
@@ -49,7 +50,7 @@ impl Snapshot {
) -> Self {
let block_stats = BlockStats::for_blocks(&blocks, &txs);
let fees = Fees::compute(&block_stats, min_fee);
let next_block_hash = Self::hash_next_block(&blocks);
let next_block_hash = Self::hash_next_block(&blocks, &txs);
Self {
txs,
blocks,
@@ -60,12 +61,18 @@ impl Snapshot {
}
}
fn hash_next_block(blocks: &[Vec<TxIndex>]) -> NextBlockHash {
/// Content tag over block 0 in template order. Hashes txids, not
/// `TxIndex` slots, because slot assignment is per-cycle and
/// unstable; the txid set is the actual identity of "what's in the
/// next block".
fn hash_next_block(blocks: &[Vec<TxIndex>], txs: &[SnapTx]) -> NextBlockHash {
let Some(block) = blocks.first() else {
return NextBlockHash::ZERO;
};
let mut hasher = DefaultHasher::new();
block.hash(&mut hasher);
let mut hasher = FxHasher::default();
for idx in block {
txs[idx.as_usize()].txid.hash(&mut hasher);
}
NextBlockHash::new(hasher.finish())
}

View File

@@ -1,11 +1,11 @@
use std::{
collections::hash_map::Entry as MapEntry,
hash::{DefaultHasher, Hash, Hasher},
hash::{Hash, Hasher},
};
use brk_types::{AddrBytes, AddrMempoolStats, Transaction, TxOut, Txid};
use derive_more::Deref;
use rustc_hash::FxHashMap;
use rustc_hash::{FxHashMap, FxHasher};
mod addr_entry;
@@ -52,7 +52,7 @@ impl AddrTracker {
let Some(entry) = self.0.get(addr) else {
return 0;
};
let mut hasher = DefaultHasher::new();
let mut hasher = FxHasher::default();
entry.stats.hash(&mut hasher);
hasher.finish()
}

View File

@@ -44,7 +44,9 @@ impl TxGraveyard {
}
/// Walk forward through `Replaced { by }` to the terminal replacer.
/// Returns `txid` itself if it isn't replaced (live or `Vanished`).
/// Returns the first txid in the chain that isn't a `Replaced`
/// tombstone: live, `Vanished`, or unknown (chain broken because an
/// intermediate `by` aged out of the graveyard).
pub fn replacement_root_of(&self, mut txid: Txid) -> Txid {
while let Some(TxRemoval::Replaced { by }) =
self.tombstones.get(&txid).map(|t| &t.removal)

View File

@@ -4,8 +4,9 @@ use brk_mempool::{Mempool, PrevoutResolver, RbfForTx, RbfNode};
use brk_types::{
BlockTemplate, BlockTemplateDiff, CheckedSub, FeeRate, MempoolBlock, MempoolInfo,
MempoolRecentTx, NextBlockHash, OutputType, RbfResponse, RbfTx, RecommendedFees,
ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix, TypeIndex,
ReplacementNode, Sats, Timestamp, TxOut, TxOutIndex, Txid, TxidPrefix, TypeIndex, Vout,
};
use rustc_hash::FxHashMap;
const RECENT_REPLACEMENTS_LIMIT: usize = 25;
@@ -31,53 +32,46 @@ impl Query {
Ok(mempool.block_stats().iter().map(MempoolBlock::from).collect())
}
/// Indexer-backed resolver for confirmed-parent prevouts. Pass
/// the returned closure to `Mempool::start_with` /
/// `Mempool::update_with`; the mempool driver calls it post-apply
/// for every still-unfilled `prevout == None` input.
///
/// Reads go through `read_once` rather than a captured
/// `VecReader`: `VecReader::stored_len` is snapshotted at
/// construction, so a long-lived reader paired with fresh
/// `safe_lengths` would let `safe.tx_index` / `safe.txout_index`
/// advance past the reader's frozen length and panic in
/// `reader.get()`. `read_once` rebinds against the current vec
/// length per call and lets newly indexed parents become
/// resolvable on the next cycle.
/// Indexer-backed resolver for confirmed-parent prevouts.
pub fn indexer_prevout_resolver(&self) -> PrevoutResolver {
let query = self.clone();
let indexer = self.0.indexer;
Box::new(move |prev_txid, vout| {
let safe = query.safe_lengths();
let prev_tx_index = indexer
.stores
.txid_prefix_to_tx_index
.get(&TxidPrefix::from(prev_txid))
.ok()??
.into_owned();
if prev_tx_index >= safe.tx_index {
return None;
Box::new(move |holes: &[(Txid, Vout)]| {
if holes.is_empty() {
return FxHashMap::default();
}
let first_txout: TxOutIndex = indexer
.vecs
.transactions
.first_txout_index
.read_once(prev_tx_index)
.ok()?;
let txout = first_txout + vout;
if txout >= safe.txout_index {
return None;
}
let output_type: OutputType = indexer.vecs.outputs.output_type.read_once(txout).ok()?;
let type_index: TypeIndex = indexer.vecs.outputs.type_index.read_once(txout).ok()?;
let value: Sats = indexer.vecs.outputs.value.read_once(txout).ok()?;
let script_pubkey = indexer
.vecs
.addrs
.addr_readers()
.script_pubkey(output_type, type_index);
Some(TxOut::from((script_pubkey, value)))
let safe = indexer.safe_lengths();
let first_txout_reader = indexer.vecs.transactions.first_txout_index.reader();
let output_type_reader = indexer.vecs.outputs.output_type.reader();
let type_index_reader = indexer.vecs.outputs.type_index.reader();
let value_reader = indexer.vecs.outputs.value.reader();
let addr_readers = indexer.vecs.addrs.addr_readers();
holes
.iter()
.filter_map(|(prev_txid, vout)| {
let prev_tx_index = indexer
.stores
.txid_prefix_to_tx_index
.get(&TxidPrefix::from(prev_txid))
.ok()??
.into_owned();
if prev_tx_index >= safe.tx_index {
return None;
}
let first_txout: TxOutIndex =
first_txout_reader.try_get(usize::from(prev_tx_index))?;
let txout = first_txout + *vout;
if txout >= safe.txout_index {
return None;
}
let txout_idx = usize::from(txout);
let output_type: OutputType = output_type_reader.try_get(txout_idx)?;
let type_index: TypeIndex = type_index_reader.try_get(txout_idx)?;
let value: Sats = value_reader.try_get(txout_idx)?;
let script_pubkey = addr_readers.script_pubkey(output_type, type_index);
Some(((*prev_txid, *vout), TxOut::from((script_pubkey, value))))
})
.collect()
})
}

View File

@@ -7,7 +7,7 @@ use std::{
use bitcoin::ScriptBuf;
use brk_error::Result;
use brk_types::{BlockHash, Hex, Sats, Txid};
use brk_types::{BlockHash, Sats, Txid};
mod client;
mod methods;
@@ -41,15 +41,6 @@ pub struct BlockTemplateTx {
pub fee: Sats,
}
/// A transaction fetched from Core alongside the exact hex bytes Core
/// returned, so downstream code can re-emit the raw tx without re-
/// serializing (which could diverge on segwit flag encoding, etc.).
#[derive(Debug, Clone)]
pub struct RawTx {
pub tx: bitcoin::Transaction,
pub hex: Hex,
}
#[derive(Clone, Debug)]
pub enum Auth {
None,

View File

@@ -24,7 +24,7 @@ use tracing::{debug, info};
/// The mempool fetcher tolerates these per-item failures silently.
const RPC_NOT_FOUND: i32 = -5;
use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, RawTx, TxOutInfo};
use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo};
/// Per-batch request count for `get_block_hashes_range`,
/// `fetch_mempool_entries`, and `fetch_raw_transactions`. Sized so the
@@ -94,8 +94,13 @@ fn build_gbt(raw: GbtResponseRaw) -> Vec<BlockTemplateTx> {
.collect()
}
/// Convert bitcoind's `mempoolminfee` (BTC/kvB f64) to sat/vB. Round-trip
/// via integer sat/kvB (bitcoind's native CFeeRate unit) so the f64 drift
/// in the JSON-decoded value can't push 1.0 sat/vB to 1.0...e-13 above 1.0
/// and trip `ceil_to(0.001)` downstream.
fn build_min_fee(raw: GetMempoolInfo) -> FeeRate {
FeeRate::from(raw.mempool_min_fee * 100_000.0)
let sat_per_kvb = (raw.mempool_min_fee * 100_000_000.0).round() as u64;
FeeRate::from(sat_per_kvb as f64 / 1000.0)
}
impl Client {
@@ -247,55 +252,70 @@ impl Client {
.collect()
}
pub fn get_raw_transaction<'a, T, H>(
pub fn get_raw_transaction<'a, T>(&self, txid: &'a T) -> Result<bitcoin::Transaction>
where
&'a T: Into<&'a bitcoin::Txid>,
{
let hex = self.get_raw_transaction_hex(txid)?;
Ok(encode::deserialize_hex::<bitcoin::Transaction>(&hex)?)
}
pub fn get_raw_transaction_from<'a, T, H>(
&self,
txid: &'a T,
block_hash: Option<&'a H>,
block_hash: &'a H,
) -> Result<bitcoin::Transaction>
where
&'a T: Into<&'a bitcoin::Txid>,
&'a H: Into<&'a bitcoin::BlockHash>,
{
let hex = self.get_raw_transaction_hex(txid, block_hash)?;
let tx = encode::deserialize_hex::<bitcoin::Transaction>(&hex)?;
Ok(tx)
let hex = self.get_raw_transaction_hex_from(txid, block_hash)?;
Ok(encode::deserialize_hex::<bitcoin::Transaction>(&hex)?)
}
pub fn get_raw_transaction_hex<'a, T, H>(
pub fn get_raw_transaction_hex<'a, T>(&self, txid: &'a T) -> Result<String>
where
&'a T: Into<&'a bitcoin::Txid>,
{
let txid: &bitcoin::Txid = txid.into();
let args = [serde_json::to_value(txid)?, Value::Bool(false)];
self.0.call_with_retry("getrawtransaction", &args)
}
pub fn get_raw_transaction_hex_from<'a, T, H>(
&self,
txid: &'a T,
block_hash: Option<&'a H>,
block_hash: &'a H,
) -> Result<String>
where
&'a T: Into<&'a bitcoin::Txid>,
&'a H: Into<&'a bitcoin::BlockHash>,
{
let txid: &bitcoin::Txid = txid.into();
let mut args: Vec<Value> = vec![serde_json::to_value(txid)?, Value::Bool(false)];
if let Some(bh) = block_hash {
let bh: &bitcoin::BlockHash = bh.into();
args.push(serde_json::to_value(bh)?);
}
let bh: &bitcoin::BlockHash = block_hash.into();
let args = [
serde_json::to_value(txid)?,
Value::Bool(false),
serde_json::to_value(bh)?,
];
self.0.call_with_retry("getrawtransaction", &args)
}
pub fn get_mempool_raw_tx(&self, txid: &Txid) -> Result<RawTx> {
let hex = self.get_raw_transaction_hex(txid, None as Option<&BlockHash>)?;
let tx = encode::deserialize_hex::<bitcoin::Transaction>(&hex)?;
Ok(RawTx {
tx,
hex: hex.into(),
})
pub fn get_mempool_raw_tx(&self, txid: &Txid) -> Result<bitcoin::Transaction> {
self.get_raw_transaction(txid)
}
/// Batched `getrawtransaction` over a slice of txids. Returns a map keyed
/// by txid containing the deserialized tx and its raw hex. Individual
/// failures (e.g. a tx that evicted between the listing and this call)
/// are logged and dropped so a single bad entry doesn't kill the batch.
/// by txid containing the deserialized tx. Individual failures (e.g. a
/// tx that evicted between the listing and this call) are logged and
/// dropped so a single bad entry doesn't kill the batch.
///
/// Chunked at `BATCH_CHUNK` requests per round-trip.
pub fn get_raw_transactions(&self, txids: &[Txid]) -> Result<FxHashMap<Txid, RawTx>> {
let mut out: FxHashMap<Txid, RawTx> =
pub fn get_raw_transactions(
&self,
txids: &[Txid],
) -> Result<FxHashMap<Txid, bitcoin::Transaction>> {
let mut out: FxHashMap<Txid, bitcoin::Transaction> =
FxHashMap::with_capacity_and_hasher(txids.len(), Default::default());
for chunk in txids.chunks(BATCH_CHUNK) {
@@ -311,14 +331,10 @@ impl Client {
for (txid, res) in chunk.iter().zip(results) {
match res.and_then(|hex| {
let tx = encode::deserialize_hex::<bitcoin::Transaction>(&hex)?;
Ok::<_, Error>(RawTx {
tx,
hex: hex.into(),
})
Ok(encode::deserialize_hex::<bitcoin::Transaction>(&hex)?)
}) {
Ok(raw) => {
out.insert(*txid, raw);
Ok(tx) => {
out.insert(*txid, tx);
}
Err(Error::CorepcRPC(JsonRpcError::Rpc(rpc))) if rpc.code == RPC_NOT_FOUND => {}
Err(e) => {