global: big snapshot

This commit is contained in:
nym21
2026-04-26 23:12:17 +02:00
parent 2210443e37
commit 7a0b4b5890
125 changed files with 3833 additions and 3129 deletions

View File

@@ -11,7 +11,7 @@ exclude = ["examples/"]
[dependencies]
bitcoin = { workspace = true }
brk_error = { workspace = true }
brk_rpc = { workspace = true, features = ["corepc"] }
brk_rpc = { workspace = true }
brk_types = { workspace = true }
derive_more = { workspace = true }
tracing = { workspace = true }

View File

@@ -26,8 +26,8 @@ fn main() -> Result<()> {
thread::sleep(Duration::from_secs(5));
// Basic mempool info
let info = mempool.get_info();
let block_stats = mempool.get_block_stats();
let info = mempool.info();
let block_stats = mempool.block_stats();
let total_fees: u64 = block_stats.iter().map(|s| u64::from(s.total_fee)).sum();
println!("\n=== Mempool Info ===");
println!(" Transactions: {}", info.count);
@@ -38,7 +38,7 @@ fn main() -> Result<()> {
);
// Fee recommendations (like mempool.space)
let fees = mempool.get_fees();
let fees = mempool.fees();
println!("\n=== Recommended Fees (sat/vB) ===");
println!(" No Priority {:.4}", f64::from(fees.economy_fee));
println!(" Low Priority {:.4}", f64::from(fees.hour_fee));
@@ -63,7 +63,7 @@ fn main() -> Result<()> {
}
// Address tracking stats
let addrs = mempool.get_addrs();
let addrs = mempool.addrs();
println!("\n=== Address Tracking ===");
println!(" Addresses with pending txs: {}", addrs.len());

View File

@@ -1,175 +0,0 @@
use std::ops::{Index, IndexMut};
use brk_types::TxidPrefix;
use rustc_hash::FxHashMap;
use super::tx_node::TxNode;
use crate::{
entry::Entry,
types::{PoolIndex, TxIndex},
};
/// Type-safe wrapper around Vec<TxNode> that only allows PoolIndex access.
pub struct Graph(Vec<TxNode>);
impl Graph {
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl Index<PoolIndex> for Graph {
type Output = TxNode;
#[inline]
fn index(&self, idx: PoolIndex) -> &Self::Output {
&self.0[idx.as_usize()]
}
}
impl IndexMut<PoolIndex> for Graph {
#[inline]
fn index_mut(&mut self, idx: PoolIndex) -> &mut Self::Output {
&mut self.0[idx.as_usize()]
}
}
/// Build a dependency graph from mempool entries.
pub fn build_graph(entries: &[Option<Entry>]) -> Graph {
let mut live: Vec<(TxIndex, &Entry)> = Vec::with_capacity(entries.len());
for (i, opt) in entries.iter().enumerate() {
if let Some(e) = opt.as_ref() {
live.push((TxIndex::from(i), e));
}
}
if live.is_empty() {
return Graph(Vec::new());
}
let mut prefix_to_pool: FxHashMap<TxidPrefix, PoolIndex> =
FxHashMap::with_capacity_and_hasher(live.len(), Default::default());
for (i, (_, entry)) in live.iter().enumerate() {
prefix_to_pool.insert(entry.txid_prefix(), PoolIndex::from(i));
}
let mut nodes: Vec<TxNode> = live
.iter()
.map(|(tx_index, entry)| {
let mut node = TxNode::new(*tx_index, entry.fee, entry.vsize);
for parent_prefix in &entry.depends {
if let Some(&parent_pool_idx) = prefix_to_pool.get(parent_prefix) {
node.parents.push(parent_pool_idx);
}
}
node
})
.collect();
// Populate children via direct indexing; no intermediate edge vec.
// Reading parents[j] as a Copy value releases the immutable borrow
// before the mutable borrow of children's owner.
for i in 0..nodes.len() {
let plen = nodes[i].parents.len();
for j in 0..plen {
let parent_idx = nodes[i].parents[j].as_usize();
nodes[parent_idx].children.push(PoolIndex::from(i));
}
}
Graph(nodes)
}
#[cfg(test)]
mod bench {
use std::time::Instant;
use bitcoin::hashes::Hash;
use brk_types::{Sats, Timestamp, Txid, VSize};
use smallvec::SmallVec;
use super::build_graph;
use crate::entry::Entry;
/// Synthetic mempool: mostly singletons, some CPFP chains/trees.
fn synthetic_mempool(n: usize) -> Vec<Option<Entry>> {
let make_txid = |i: usize| -> Txid {
let mut bytes = [0u8; 32];
bytes[0..8].copy_from_slice(&(i as u64).to_ne_bytes());
bytes[8..16].copy_from_slice(&((i as u64).wrapping_mul(2654435761)).to_ne_bytes());
Txid::from(bitcoin::Txid::from_slice(&bytes).unwrap())
};
let mut entries: Vec<Option<Entry>> = Vec::with_capacity(n);
let mut txids: Vec<Txid> = Vec::with_capacity(n);
for i in 0..n {
let txid = make_txid(i);
txids.push(txid.clone());
// 95% singletons, 4% 1-parent, 1% 2-parent (mimics real mempool).
let depends: SmallVec<[brk_types::TxidPrefix; 2]> = match i % 100 {
0..=94 => SmallVec::new(),
95..=98 if i > 0 => {
let p = (i.wrapping_mul(7919)) % i;
std::iter::once(brk_types::TxidPrefix::from(&txids[p])).collect()
}
_ if i > 1 => {
let p1 = (i.wrapping_mul(7919)) % i;
let p2 = (i.wrapping_mul(6151)) % i;
[
brk_types::TxidPrefix::from(&txids[p1]),
brk_types::TxidPrefix::from(&txids[p2]),
]
.into_iter()
.collect()
}
_ => SmallVec::new(),
};
entries.push(Some(Entry {
txid,
fee: Sats::from((i as u64).wrapping_mul(137) % 10_000 + 1),
vsize: VSize::from(250u64),
size: 250,
ancestor_fee: Sats::from(0u64),
ancestor_vsize: VSize::from(250u64),
depends,
first_seen: Timestamp::now(),
}));
}
entries
}
#[test]
#[ignore = "perf benchmark; run with --ignored --nocapture"]
fn perf_build_graph() {
let sizes = [1_000usize, 10_000, 50_000, 100_000, 300_000];
eprintln!();
eprintln!("build_graph perf (release, single call):");
eprintln!(" n build");
eprintln!(" ------------------------");
for &n in &sizes {
let entries = synthetic_mempool(n);
// Warm up allocator.
let _ = build_graph(&entries);
let t = Instant::now();
let g = build_graph(&entries);
let dt = t.elapsed();
let ns = dt.as_nanos();
let pretty = if ns >= 1_000_000 {
format!("{:.2} ms", ns as f64 / 1_000_000.0)
} else {
format!("{:.2} µs", ns as f64 / 1_000.0)
};
eprintln!(" {:<10} {:<10} ({} nodes)", n, pretty, g.len());
}
eprintln!();
}
}

View File

@@ -1,47 +0,0 @@
use brk_types::{FeeRate, Sats, Timestamp, Txid, TxidPrefix, VSize};
use smallvec::SmallVec;
/// A mempool transaction entry.
///
/// Stores only the data needed for fee estimation and block building.
/// Ancestor values are pre-computed by Bitcoin Core (correctly handling shared ancestors).
#[derive(Debug, Clone)]
pub struct Entry {
pub txid: Txid,
pub fee: Sats,
pub vsize: VSize,
/// Serialized tx size in bytes (witness + non-witness), from the raw tx.
pub size: u64,
/// Pre-computed ancestor fee (self + all ancestors, no double-counting)
pub ancestor_fee: Sats,
/// Pre-computed ancestor vsize (self + all ancestors, no double-counting)
pub ancestor_vsize: VSize,
/// Parent txid prefixes (most txs have 0-2 parents)
pub depends: SmallVec<[TxidPrefix; 2]>,
/// When this tx was first seen in the mempool
pub first_seen: Timestamp,
}
impl Entry {
#[inline]
pub fn fee_rate(&self) -> FeeRate {
FeeRate::from((self.fee, self.vsize))
}
/// Ancestor fee rate (package rate for CPFP).
#[inline]
pub fn ancestor_fee_rate(&self) -> FeeRate {
FeeRate::from((self.ancestor_fee, self.ancestor_vsize))
}
/// Effective fee rate for display.
#[inline]
pub fn effective_fee_rate(&self) -> FeeRate {
self.fee_rate().max(self.ancestor_fee_rate())
}
#[inline]
pub fn txid_prefix(&self) -> TxidPrefix {
TxidPrefix::from(&self.txid)
}
}

View File

@@ -1,84 +0,0 @@
use brk_types::TxidPrefix;
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use crate::{entry::Entry, types::TxIndex};
/// Pool of mempool entries with slot recycling.
///
/// Uses a slot-based storage where removed entries leave holes
/// that get reused for new entries, avoiding index invalidation.
#[derive(Default)]
pub struct EntryPool {
entries: Vec<Option<Entry>>,
prefix_to_idx: FxHashMap<TxidPrefix, TxIndex>,
parent_to_children: FxHashMap<TxidPrefix, SmallVec<[TxidPrefix; 2]>>,
free_slots: Vec<TxIndex>,
}
impl EntryPool {
/// Insert an entry, returning its index.
pub fn insert(&mut self, prefix: TxidPrefix, entry: Entry) -> TxIndex {
for parent in &entry.depends {
self.parent_to_children
.entry(*parent)
.or_default()
.push(prefix);
}
let idx = match self.free_slots.pop() {
Some(idx) => {
self.entries[idx.as_usize()] = Some(entry);
idx
}
None => {
let idx = TxIndex::from(self.entries.len());
self.entries.push(Some(entry));
idx
}
};
self.prefix_to_idx.insert(prefix, idx);
idx
}
/// Get an entry by its txid prefix.
pub fn get(&self, prefix: &TxidPrefix) -> Option<&Entry> {
let idx = self.prefix_to_idx.get(prefix)?;
self.entries.get(idx.as_usize())?.as_ref()
}
/// Get direct children of a transaction (txs that depend on it).
pub fn children(&self, prefix: &TxidPrefix) -> &[TxidPrefix] {
self.parent_to_children
.get(prefix)
.map(SmallVec::as_slice)
.unwrap_or_default()
}
/// Remove an entry by its txid prefix.
pub fn remove(&mut self, prefix: &TxidPrefix) {
if let Some(idx) = self.prefix_to_idx.remove(prefix) {
if let Some(entry) = self.entries.get(idx.as_usize()).and_then(|e| e.as_ref()) {
for parent in &entry.depends {
if let Some(children) = self.parent_to_children.get_mut(parent) {
children.retain(|c| c != prefix);
if children.is_empty() {
self.parent_to_children.remove(parent);
}
}
}
}
self.parent_to_children.remove(prefix);
if let Some(slot) = self.entries.get_mut(idx.as_usize()) {
*slot = None;
}
self.free_slots.push(idx);
}
}
/// Get the entries slice for block building.
pub fn entries(&self) -> &[Option<Entry>] {
&self.entries
}
}

View File

@@ -1,11 +1,172 @@
mod addrs;
mod block_builder;
mod entry;
mod entry_pool;
mod projected_blocks;
mod sync;
mod tx_store;
mod types;
//! Live mempool monitor for the brk indexer.
//!
//! One pull cycle, five pipeline steps:
//!
//! 1. [`steps::fetcher::Fetcher`]: three batched RPCs against bitcoind
//! (verbose listing + raw txs for new entries + raw txs for
//! confirmed parents). Pure I/O.
//! 2. [`steps::preparer::Preparer`]: turn raw bytes into a typed diff
//! (`Pulled { added, removed }`), classifying additions as
//! Fresh or Revived and removals as Replaced or Vanished.
//! Pure CPU, no locks.
//! 3. [`steps::applier::Applier`]: apply the diff to the five-bucket
//! [`stores::state::MempoolState`] (info, txs, addrs, entries,
//! graveyard) under brief write locks.
//! 4. [`steps::resolver::Resolver`]: fill prevouts whose parents are
//! in the live mempool (run after every successful apply)
//! or via an external resolver supplied by the caller
//! (typically the brk indexer for confirmed parents).
//! 5. [`steps::rebuilder::Rebuilder`]: throttled rebuild of the
//! projected-blocks `Snapshot` consumed by the API.
//!
//! [`Mempool`] is the public entry point. `Mempool::start` drives the
//! cycle on a 1-second tick.
//!
//! Source layout:
//!
//! - `steps/` - one file or folder per pipeline step.
//! - `stores/` - the state buckets held inside `MempoolState` plus
//! the value types they contain.
pub use projected_blocks::{BlockStats, RecommendedFees, Snapshot};
pub use sync::{Mempool, MempoolInner};
mod steps;
mod stores;
pub use steps::preparer::Removal;
pub use steps::rebuilder::projected_blocks::{BlockStats, RecommendedFees, Snapshot};
pub use stores::{Entry, EntryPool, Tombstone, TxGraveyard, TxStore};
use std::{sync::Arc, thread, time::Duration};
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{AddrBytes, MempoolInfo, TxOut, Txid, Vout};
use parking_lot::RwLockReadGuard;
use tracing::error;
use crate::{
steps::{fetcher::Fetcher, preparer::Preparer, rebuilder::Rebuilder, resolver::Resolver},
stores::{AddrTracker, MempoolState},
};
/// Public entry point to the mempool monitor.
///
/// Cheaply cloneable: wraps an `Arc` over the private state so clones
/// share a single live mempool. See the crate-level docs for the
/// pipeline shape.
#[derive(Clone)]
pub struct Mempool(Arc<Inner>);
struct Inner {
client: Client,
state: MempoolState,
rebuilder: Rebuilder,
}
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(Inner {
client: client.clone(),
state: MempoolState::default(),
rebuilder: Rebuilder::default(),
}))
}
pub fn info(&self) -> MempoolInfo {
self.0.state.info.read().clone()
}
pub fn snapshot(&self) -> Arc<Snapshot> {
self.0.rebuilder.snapshot()
}
pub fn fees(&self) -> RecommendedFees {
self.0.rebuilder.fees()
}
pub fn block_stats(&self) -> Vec<BlockStats> {
self.0.rebuilder.block_stats()
}
pub fn next_block_hash(&self) -> u64 {
self.0.rebuilder.next_block_hash()
}
pub fn addr_state_hash(&self, addr: &AddrBytes) -> u64 {
self.0.state.addrs.read().stats_hash(addr)
}
pub fn txs(&self) -> RwLockReadGuard<'_, TxStore> {
self.0.state.txs.read()
}
pub fn entries(&self) -> RwLockReadGuard<'_, EntryPool> {
self.0.state.entries.read()
}
pub fn addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
self.0.state.addrs.read()
}
pub fn graveyard(&self) -> RwLockReadGuard<'_, TxGraveyard> {
self.0.state.graveyard.read()
}
/// Start an infinite update loop with a 1 second interval.
pub fn start(&self) {
self.start_with(|| {});
}
/// Variant of `start` that runs `after_update` after every cycle.
/// Used by `brk_cli` to drive `Query::fill_mempool_prevouts` so
/// indexer-resolvable prevouts get filled in place each tick.
pub fn start_with(&self, mut after_update: impl FnMut()) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
after_update();
thread::sleep(Duration::from_secs(1));
}
}
/// Fill any remaining `prevout == None` inputs on live mempool
/// txs using `resolver`. Only call this if you have an external
/// data source for confirmed parents (typically the brk indexer);
/// in-mempool same-cycle parents are filled automatically by
/// `MempoolState::apply` and don't need an external resolver.
pub fn fill_prevouts<F>(&self, resolver: F) -> bool
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
Resolver::resolve_external(&self.0.state, resolver)
}
/// One sync cycle: fetch -> prepare -> apply -> resolve -> (maybe) rebuild.
/// The resolve step only runs when `apply` reported a change (no
/// new txs means no new unresolved prevouts to fill); the rebuild
/// step is throttled by `Rebuilder` regardless.
pub fn update(&self) -> Result<()> {
let inner = &*self.0;
let fetched = Fetcher::fetch(
&inner.client,
&inner.state.txs.read(),
&inner.state.graveyard.read(),
)?;
let pulled = Preparer::prepare(
fetched,
&inner.state.txs.read(),
&inner.state.graveyard.read(),
);
if inner.state.apply(pulled) {
Resolver::resolve_in_mempool(&inner.state);
inner.rebuilder.mark_dirty();
}
inner.rebuilder.tick(&inner.client, &inner.state.entries);
Ok(())
}
}

View File

@@ -0,0 +1,69 @@
use brk_types::{MempoolInfo, Transaction, Txid};
use crate::{
steps::preparer::{Addition, Pulled},
stores::{AddrTracker, EntryPool, TxGraveyard, TxStore},
};
/// Applies a prepared diff to in-memory mempool state.
///
/// Removals are torn down first: each tx+entry is moved into the
/// graveyard with its removal reason.
///
/// Additions then publish to live state. For `Revived` additions the
/// tx body is exhumed from the graveyard (no clone); for `Fresh` ones
/// the tx arrives inline from the Preparer.
///
/// Finally the graveyard evicts entries past its retention window.
pub struct Applier;
impl Applier {
/// Apply `pulled` to all buckets. Returns true if anything changed.
pub fn apply(
pulled: Pulled,
info: &mut MempoolInfo,
txs: &mut TxStore,
addrs: &mut AddrTracker,
entries: &mut EntryPool,
graveyard: &mut TxGraveyard,
) -> bool {
let Pulled { added, removed } = pulled;
let has_changes = !added.is_empty() || !removed.is_empty();
for (prefix, reason) in removed {
let Some(entry) = entries.remove(&prefix) else {
continue;
};
let txid = entry.txid.clone();
let Some(tx) = txs.remove(&txid) else {
continue;
};
info.remove(&tx, entry.fee);
addrs.remove_tx(&tx, &txid);
graveyard.bury(txid, tx, entry, reason);
}
let mut to_store: Vec<(Txid, Transaction)> = Vec::with_capacity(added.len());
for addition in added {
let (tx, entry) = match addition {
Addition::Fresh { tx, entry } => (tx, entry),
Addition::Revived { entry } => {
let Some(tomb) = graveyard.exhume(&entry.txid) else {
continue;
};
(tomb.tx, entry)
}
};
info.add(&tx, entry.fee);
addrs.add_tx(&tx, &entry.txid);
let txid = entry.txid.clone();
entries.insert(entry);
to_store.push((txid, tx));
}
txs.extend(to_store);
graveyard.evict_old();
has_changes
}
}

View File

@@ -0,0 +1,10 @@
use brk_rpc::RawTx;
use brk_types::{MempoolEntryInfo, Txid};
use rustc_hash::FxHashMap;
/// Raw RPC output for one pull cycle. Pure data; no interpretation.
pub struct Fetched {
pub entries_info: Vec<MempoolEntryInfo>,
pub new_raws: FxHashMap<Txid, RawTx>,
pub parent_raws: FxHashMap<Txid, RawTx>,
}

View File

@@ -0,0 +1,80 @@
mod fetched;
pub use fetched::Fetched;
use brk_error::Result;
use brk_rpc::{Client, RawTx};
use brk_types::{MempoolEntryInfo, Txid};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::stores::{TxGraveyard, TxStore};
/// Cap on how many new txs we fetch per cycle (applied before the batch RPC
/// so we never hand bitcoind an unbounded batch).
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
/// Talks to Bitcoin Core. Three batched round-trips regardless of
/// mempool size:
/// 1. `getrawmempool verbose` - authoritative listing
/// 2. `getrawtransaction` batch - every new tx (txids not in
/// `known` / `graveyard`, capped at `MAX_TX_FETCHES_PER_CYCLE`)
/// 3. `getrawtransaction` batch - unique confirmed parents of those
/// new txs that aren't resolvable from `known` or step 2.
///
/// Step 3 is best-effort: without `-txindex`, Core returns -5 for every
/// confirmed parent and the batch yields an empty map. `brk_query`
/// fills missing prevouts at read time from the indexer, so this is
/// purely a latency optimization when `-txindex` is available.
pub struct Fetcher;
impl Fetcher {
pub fn fetch(client: &Client, known: &TxStore, graveyard: &TxGraveyard) -> Result<Fetched> {
let entries_info = client.get_raw_mempool_verbose()?;
let new_txids = Self::new_txids(&entries_info, known, graveyard);
let new_raws = client.get_raw_transactions(&new_txids)?;
let parent_txids = Self::unique_confirmed_parents(&new_raws, known);
let parent_raws = client.get_raw_transactions(&parent_txids)?;
Ok(Fetched {
entries_info,
new_raws,
parent_raws,
})
}
/// Txids in the listing that we don't already have cached (live or
/// buried) and therefore need to fetch raw bytes for. Order-preserving
/// so the batch matches the listing order for debuggability.
fn new_txids(
entries_info: &[MempoolEntryInfo],
known: &TxStore,
graveyard: &TxGraveyard,
) -> Vec<Txid> {
entries_info
.iter()
.filter(|info| !known.contains(&info.txid) && !graveyard.contains(&info.txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.map(|info| info.txid.clone())
.collect()
}
/// Parent txids referenced by `new_raws` inputs that aren't already
/// resolvable: not in the mempool store, not in `new_raws` itself.
fn unique_confirmed_parents(
new_raws: &FxHashMap<Txid, RawTx>,
known: &TxStore,
) -> Vec<Txid> {
let mut set: FxHashSet<Txid> = FxHashSet::default();
for raw in new_raws.values() {
for txin in &raw.tx.input {
let prev: Txid = txin.previous_output.txid.into();
if !known.contains_key(&prev) && !new_raws.contains_key(&prev) {
set.insert(prev);
}
}
}
set.into_iter().collect()
}
}

View File

@@ -0,0 +1,7 @@
//! The five pipeline steps. See the crate-level docs for the cycle.
pub mod applier;
pub mod fetcher;
pub mod preparer;
pub mod rebuilder;
pub mod resolver;

View File

@@ -0,0 +1,124 @@
//! Classification and construction of newly-observed mempool txs.
//!
//! Two kinds of arrival:
//! - **Fresh**: the tx is unknown to us, so we decode the raw bytes,
//! resolve prevouts against `known` or `parent_raws`, and build a
//! full `Transaction` + `Entry`.
//! - **Revived**: the tx is in the graveyard. We rebuild the `Entry`
//! (preserving `first_seen` / `rbf` / `size`) and let the Applier
//! exhume the cached tx body. No raw decoding.
use std::mem;
use brk_rpc::RawTx;
use brk_types::{
MempoolEntryInfo, Timestamp, Transaction, TxIn, TxOut, TxStatus, Txid, TxidPrefix, VSize, Vout,
};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use crate::stores::{Entry, Tombstone, TxStore};
/// A newly observed tx. `Fresh` carries decoded raw data (just parsed
/// from `new_raws`); `Revived` carries only the rebuilt entry because
/// the tx body is still sitting in the graveyard and will be exhumed
/// by the Applier.
pub enum Addition {
Fresh { tx: Transaction, entry: Entry },
Revived { entry: Entry },
}
/// Decode a raw tx into a full `Fresh` addition. Resolves prevouts
/// against the live mempool first, then `parent_raws` (confirmed
/// parents fetched in step 3 of the Fetcher pipeline). Inputs whose
/// parent isn't in either source land with `prevout: None` and are
/// filled later by the Resolver or by `brk_query` at read time.
pub(super) fn fresh(
info: &MempoolEntryInfo,
mut raw: RawTx,
parent_raws: &FxHashMap<Txid, RawTx>,
mempool_txs: &TxStore,
) -> Addition {
let total_size = raw.hex.len() / 2;
let rbf = raw.tx.input.iter().any(|i| i.sequence.is_rbf());
let input = mem::take(&mut raw.tx.input)
.into_iter()
.map(|txin| {
let prev_txid: Txid = txin.previous_output.txid.into();
let prev_vout = usize::from(Vout::from(txin.previous_output.vout));
let prevout = if let Some(prev) = mempool_txs.get(&prev_txid) {
prev.output
.get(prev_vout)
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value)))
} else if let Some(parent) = parent_raws.get(&prev_txid) {
parent
.tx
.output
.get(prev_vout)
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into())))
} else {
None
};
TxIn {
// Mempool txs are never coinbase (Core rejects
// them from the pool entirely). A missing prevout
// only means we couldn't resolve the confirmed
// parent (no `-txindex`); brk_query fills it at
// read time from the indexer.
is_coinbase: false,
prevout,
txid: prev_txid,
vout: txin.previous_output.vout.into(),
script_sig: txin.script_sig,
script_sig_asm: (),
witness: txin.witness.into(),
sequence: txin.sequence.into(),
inner_redeem_script_asm: (),
inner_witness_script_asm: (),
}
})
.collect();
let mut tx = Transaction {
index: None,
txid: info.txid.clone(),
version: raw.tx.version.into(),
total_sigop_cost: 0,
weight: info.weight.into(),
lock_time: raw.tx.lock_time.into(),
total_size,
fee: info.fee,
input,
output: raw.tx.output.into_iter().map(TxOut::from).collect(),
status: TxStatus::UNCONFIRMED,
};
tx.total_sigop_cost = tx.total_sigop_cost();
let entry = build_entry(info, tx.total_size as u64, rbf, Timestamp::now());
Addition::Fresh { tx, entry }
}
/// Resurrect an entry from a tombstone. The tx body stays buried
/// until the Applier exhumes it; we only rebuild the `Entry` so the
/// preserved `first_seen` / `rbf` / `size` carry over.
pub(super) fn revived(info: &MempoolEntryInfo, tomb: &Tombstone) -> Addition {
let entry = build_entry(info, tomb.entry.size, tomb.entry.rbf, tomb.entry.first_seen);
Addition::Revived { entry }
}
fn build_entry(info: &MempoolEntryInfo, size: u64, rbf: bool, first_seen: Timestamp) -> Entry {
let depends: SmallVec<[TxidPrefix; 2]> = info.depends.iter().map(TxidPrefix::from).collect();
Entry {
txid: info.txid.clone(),
fee: info.fee,
vsize: VSize::from(info.vsize),
size,
depends,
first_seen,
rbf,
}
}

View File

@@ -0,0 +1,61 @@
//! Pipeline step 2: turn `Fetched` raws into a typed diff for the Applier.
//!
//! Pure CPU work, no locks. Three classes of new tx are handled:
//! - **live**: already in `known`, skipped (no update needed)
//! - **revivable**: in the graveyard, resurrected from the tombstone
//! - **fresh**: decoded from `new_raws`, prevouts resolved against
//! `known` or `parent_raws`, RBF detected from the raw tx
//!
//! Removals come from cross-referencing inputs (see `removed.rs`).
mod added;
mod pulled;
mod removed;
pub use added::Addition;
pub use pulled::Pulled;
pub use removed::Removal;
use brk_types::TxidPrefix;
use rustc_hash::FxHashSet;
use crate::{
steps::fetcher::Fetched,
stores::{TxGraveyard, TxStore},
};
pub struct Preparer;
impl Preparer {
pub fn prepare(fetched: Fetched, known: &TxStore, graveyard: &TxGraveyard) -> Pulled {
let Fetched {
entries_info,
mut new_raws,
parent_raws,
} = fetched;
let mut added: Vec<Addition> = Vec::new();
let mut live: FxHashSet<TxidPrefix> =
FxHashSet::with_capacity_and_hasher(entries_info.len(), Default::default());
for info in &entries_info {
live.insert(TxidPrefix::from(&info.txid));
if known.contains(&info.txid) {
continue;
}
if let Some(tomb) = graveyard.get(&info.txid) {
added.push(added::revived(info, tomb));
continue;
}
let Some(raw) = new_raws.remove(&info.txid) else {
continue;
};
added.push(added::fresh(info, raw, &parent_raws, known));
}
let removed = removed::classify(&live, &added, known);
Pulled { added, removed }
}
}

View File

@@ -0,0 +1,10 @@
use brk_types::TxidPrefix;
use rustc_hash::FxHashMap;
use super::{Addition, Removal};
/// Output of one pull cycle: the full diff, ready for the Applier.
pub struct Pulled {
pub added: Vec<Addition>,
pub removed: FxHashMap<TxidPrefix, Removal>,
}

View File

@@ -0,0 +1,58 @@
//! Classification of txs that left the mempool between two pull cycles.
//!
//! `Replaced` = at least one added tx this cycle spends one of its
//! inputs (BIP-125 replacement inferred from conflicting outpoints).
//! `Vanished` = any other reason we can't distinguish from the data
//! at hand (mined, expired, evicted, or replaced by a tx we didn't
//! fetch due to the per-cycle fetch cap).
use brk_types::{Txid, TxidPrefix, Vout};
use rustc_hash::{FxHashMap, FxHashSet};
use super::added::Addition;
use crate::stores::TxStore;
#[derive(Debug)]
pub enum Removal {
Replaced { by: Txid },
Vanished,
}
/// Diff the store against Core's listing. `live` is the set of txid
/// prefixes Core returned this cycle; anything in `known` whose prefix
/// isn't in `live` left the pool. Each loser is classified by cross-
/// referencing its inputs against the freshly added txs' inputs.
pub(super) fn classify(
live: &FxHashSet<TxidPrefix>,
added: &[Addition],
known: &TxStore,
) -> FxHashMap<TxidPrefix, Removal> {
// (parent txid, vout) -> Txid of the new tx that spends it.
// Only `Fresh` additions carry tx input data; revived txs were
// already in-pool and can't be "new spenders" of anything.
let mut spent_by: FxHashMap<(Txid, Vout), Txid> = FxHashMap::default();
for addition in added {
if let Addition::Fresh { tx, .. } = addition {
for txin in &tx.input {
spent_by.insert((txin.txid.clone(), txin.vout), tx.txid.clone());
}
}
}
known
.iter()
.filter_map(|(txid, tx)| {
let prefix = TxidPrefix::from(txid);
if live.contains(&prefix) {
return None;
}
let removal = tx
.input
.iter()
.find_map(|i| spent_by.get(&(i.txid.clone(), i.vout)).cloned())
.map(|by| Removal::Replaced { by })
.unwrap_or(Removal::Vanished);
Some((prefix, removal))
})
.collect()
}

View File

@@ -0,0 +1,85 @@
use std::ops::{Index, IndexMut};
use brk_types::TxidPrefix;
use rustc_hash::FxHashMap;
use super::{pool_index::PoolIndex, tx_node::TxNode};
use crate::stores::{Entry, TxIndex};
/// Type-safe wrapper around Vec<TxNode> that only allows PoolIndex access.
pub struct Graph(Vec<TxNode>);
impl Graph {
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl Index<PoolIndex> for Graph {
type Output = TxNode;
#[inline]
fn index(&self, idx: PoolIndex) -> &Self::Output {
&self.0[idx.as_usize()]
}
}
impl IndexMut<PoolIndex> for Graph {
#[inline]
fn index_mut(&mut self, idx: PoolIndex) -> &mut Self::Output {
&mut self.0[idx.as_usize()]
}
}
/// Build a dependency graph from mempool entries.
pub fn build_graph(entries: &[Option<Entry>]) -> Graph {
// Pass 1: collect live entries and index their prefixes in lockstep.
// We can't resolve parent links yet because a parent may sit later in
// slot order than its child, so prefix_to_pool needs to be complete
// before we touch `entry.depends`.
let mut live: Vec<(TxIndex, &Entry)> = Vec::with_capacity(entries.len());
let mut prefix_to_pool: FxHashMap<TxidPrefix, PoolIndex> =
FxHashMap::with_capacity_and_hasher(entries.len(), Default::default());
for (i, opt) in entries.iter().enumerate() {
if let Some(e) = opt.as_ref() {
prefix_to_pool.insert(e.txid_prefix(), PoolIndex::from(live.len()));
live.push((TxIndex::from(i), e));
}
}
if live.is_empty() {
return Graph(Vec::new());
}
// Pass 2: materialize nodes with their parent edges.
let mut nodes: Vec<TxNode> = live
.iter()
.map(|(tx_index, entry)| {
let mut node = TxNode::new(*tx_index, entry.fee, entry.vsize);
for parent_prefix in &entry.depends {
if let Some(&parent_pool_idx) = prefix_to_pool.get(parent_prefix) {
node.parents.push(parent_pool_idx);
}
}
node
})
.collect();
// Pass 3: mirror parent edges as children. Direct indexing only;
// no intermediate edge vec.
for i in 0..nodes.len() {
let plen = nodes[i].parents.len();
for j in 0..plen {
let parent_idx = nodes[i].parents[j].as_usize();
nodes[parent_idx].children.push(PoolIndex::from(i));
}
}
Graph(nodes)
}

View File

@@ -0,0 +1,88 @@
//! Throwaway perf bench for `build_graph`.
//!
//! Run with `cargo test --release -p brk_mempool -- --ignored --nocapture
//! perf_build_graph`. Not part of the regular test sweep.
use std::time::Instant;
use bitcoin::hashes::Hash;
use brk_types::{Sats, Timestamp, Txid, TxidPrefix, VSize};
use smallvec::SmallVec;
use super::graph::build_graph;
use crate::stores::Entry;
/// Synthetic mempool: mostly singletons, some CPFP chains/trees.
fn synthetic_mempool(n: usize) -> Vec<Option<Entry>> {
let make_txid = |i: usize| -> Txid {
let mut bytes = [0u8; 32];
bytes[0..8].copy_from_slice(&(i as u64).to_ne_bytes());
bytes[8..16].copy_from_slice(&((i as u64).wrapping_mul(2654435761)).to_ne_bytes());
Txid::from(bitcoin::Txid::from_slice(&bytes).unwrap())
};
let mut entries: Vec<Option<Entry>> = Vec::with_capacity(n);
let mut txids: Vec<Txid> = Vec::with_capacity(n);
for i in 0..n {
let txid = make_txid(i);
txids.push(txid.clone());
// 95% singletons, 4% 1-parent, 1% 2-parent (mimics real mempool).
let depends: SmallVec<[TxidPrefix; 2]> = match i % 100 {
0..=94 => SmallVec::new(),
95..=98 if i > 0 => {
let p = (i.wrapping_mul(7919)) % i;
std::iter::once(TxidPrefix::from(&txids[p])).collect()
}
_ if i > 1 => {
let p1 = (i.wrapping_mul(7919)) % i;
let p2 = (i.wrapping_mul(6151)) % i;
[
TxidPrefix::from(&txids[p1]),
TxidPrefix::from(&txids[p2]),
]
.into_iter()
.collect()
}
_ => SmallVec::new(),
};
entries.push(Some(Entry {
txid,
fee: Sats::from((i as u64).wrapping_mul(137) % 10_000 + 1),
vsize: VSize::from(250u64),
size: 250,
depends,
first_seen: Timestamp::now(),
rbf: false,
}));
}
entries
}
#[test]
#[ignore = "perf benchmark; run with --ignored --nocapture"]
fn perf_build_graph() {
let sizes = [1_000usize, 10_000, 50_000, 100_000, 300_000];
eprintln!();
eprintln!("build_graph perf (release, single call):");
eprintln!(" n build");
eprintln!(" ------------------------");
for &n in &sizes {
let entries = synthetic_mempool(n);
// Warm up allocator.
let _ = build_graph(&entries);
let t = Instant::now();
let g = build_graph(&entries);
let dt = t.elapsed();
let ns = dt.as_nanos();
let pretty = if ns >= 1_000_000 {
format!("{:.2} ms", ns as f64 / 1_000_000.0)
} else {
format!("{:.2} µs", ns as f64 / 1_000.0)
};
eprintln!(" {:<10} {:<10} ({} nodes)", n, pretty, g.len());
}
eprintln!();
}

View File

@@ -15,8 +15,8 @@ use brk_types::{FeeRate, Sats, VSize};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use super::{graph::Graph, package::Package};
use crate::types::{PoolIndex, TxIndex};
use super::{graph::Graph, package::Package, pool_index::PoolIndex};
use crate::stores::TxIndex;
/// Cluster-local index for a node within one cluster's flat array.
type LocalIdx = u32;
@@ -59,13 +59,13 @@ pub fn linearize_clusters(graph: &Graph) -> Vec<Package> {
packages
}
/// BFS over (parents + children) adjacency to partition `graph` into
/// DFS over (parents + children) adjacency to partition `graph` into
/// connected components, each re-indexed locally.
fn find_components(graph: &Graph) -> Vec<Cluster> {
let n = graph.len();
let mut seen: Vec<bool> = vec![false; n];
let mut clusters: Vec<Cluster> = Vec::new();
let mut queue: Vec<PoolIndex> = Vec::new();
let mut stack: Vec<PoolIndex> = Vec::new();
for start in 0..n {
if seen[start] {
@@ -73,23 +73,23 @@ fn find_components(graph: &Graph) -> Vec<Cluster> {
}
let mut members: Vec<PoolIndex> = Vec::new();
queue.clear();
queue.push(PoolIndex::from(start));
stack.clear();
stack.push(PoolIndex::from(start));
seen[start] = true;
while let Some(idx) = queue.pop() {
while let Some(idx) = stack.pop() {
members.push(idx);
let node = &graph[idx];
for &p in &node.parents {
if !seen[p.as_usize()] {
seen[p.as_usize()] = true;
queue.push(p);
stack.push(p);
}
}
for &c in &node.children {
if !seen[c.as_usize()] {
seen[c.as_usize()] = true;
queue.push(c);
stack.push(c);
}
}
}

View File

@@ -72,8 +72,17 @@ pub fn linearize(cluster: &Cluster) -> Vec<Chunk> {
remaining &= !mask;
}
canonicalize(&mut chunks);
chunks
canonicalize(chunks)
}
/// Immutable inputs for the brute-force recursion. Packing them into a
/// struct keeps `recurse` to four moving args: `(idx, included, f, v)`.
struct Ctx<'a> {
topo_order: &'a [LocalIdx],
parents_mask: &'a [u128],
fee_of: &'a [u64],
vsize_of: &'a [u64],
remaining: u128,
}
/// Recursive enumeration of topologically-closed subsets of
@@ -85,86 +94,46 @@ fn best_subset(
fee_of: &[u64],
vsize_of: &[u64],
) -> (u128, u64, u64) {
let mut best = (0u128, 0u64, 1u64);
recurse(
0,
let ctx = Ctx {
topo_order,
parents_mask,
remaining,
0,
0,
0,
fee_of,
vsize_of,
&mut best,
);
remaining,
};
let mut best = (0u128, 0u64, 1u64);
recurse(&ctx, 0, 0, 0, 0, &mut best);
best
}
#[allow(clippy::too_many_arguments)]
fn recurse(
idx: usize,
topo_order: &[LocalIdx],
parents_mask: &[u128],
remaining: u128,
included: u128,
f: u64,
v: u64,
fee_of: &[u64],
vsize_of: &[u64],
best: &mut (u128, u64, u64),
) {
if idx == topo_order.len() {
fn recurse(ctx: &Ctx, idx: usize, included: u128, f: u64, v: u64, best: &mut (u128, u64, u64)) {
if idx == ctx.topo_order.len() {
if included != 0 && f as u128 * best.2 as u128 > best.1 as u128 * v as u128 {
*best = (included, f, v);
}
return;
}
let node = topo_order[idx];
let node = ctx.topo_order[idx];
let bit = 1u128 << node;
// Not in remaining, or a parent (within remaining) is excluded:
// this node is forced-excluded, no branching.
if (bit & remaining) == 0 || (parents_mask[node as usize] & remaining & !included) != 0 {
recurse(
idx + 1,
topo_order,
parents_mask,
remaining,
included,
f,
v,
fee_of,
vsize_of,
best,
);
if (bit & ctx.remaining) == 0
|| (ctx.parents_mask[node as usize] & ctx.remaining & !included) != 0
{
recurse(ctx, idx + 1, included, f, v, best);
return;
}
// Exclude
recurse(
idx + 1,
topo_order,
parents_mask,
remaining,
included,
f,
v,
fee_of,
vsize_of,
best,
);
recurse(ctx, idx + 1, included, f, v, best);
// Include
recurse(
ctx,
idx + 1,
topo_order,
parents_mask,
remaining,
included | bit,
f + fee_of[node as usize],
v + vsize_of[node as usize],
fee_of,
vsize_of,
f + ctx.fee_of[node as usize],
v + ctx.vsize_of[node as usize],
best,
);
}
@@ -239,10 +208,9 @@ fn best_ancestor_union(
/// Single-pass stack merge: for each incoming chunk, merge it into
/// the stack top while the merge would raise the top's feerate, then
/// push. O(n) total regardless of how many merges cascade.
fn canonicalize(chunks: &mut Vec<Chunk>) {
let taken = std::mem::take(chunks);
let mut out: Vec<Chunk> = Vec::with_capacity(taken.len());
for mut cur in taken {
fn canonicalize(chunks: Vec<Chunk>) -> Vec<Chunk> {
let mut out: Vec<Chunk> = Vec::with_capacity(chunks.len());
for mut cur in chunks {
while let Some(top) = out.last() {
if cur.fee as u128 * top.vsize as u128 > top.fee as u128 * cur.vsize as u128 {
let mut prev = out.pop().unwrap();
@@ -256,7 +224,7 @@ fn canonicalize(chunks: &mut Vec<Chunk>) {
}
out.push(cur);
}
*chunks = out;
out
}
#[inline]

View File

@@ -13,7 +13,7 @@ use smallvec::SmallVec;
use super::sfl::Chunk;
use super::{Cluster, ClusterNode, LocalIdx, kahn_topo_rank, sfl};
use crate::types::TxIndex;
use crate::stores::TxIndex;
/// Build a `Cluster` from `(fee, vsize)` tuples plus a list of
/// `(parent_local, child_local)` edges. Tx indices are assigned 0..n.

View File

@@ -296,9 +296,12 @@ impl DagRng {
}
}
/// `(fee, vsize)` per node + edge list. Used by random-DAG generators.
type FvAndEdges = (Vec<(u64, u64)>, Vec<(LocalIdx, LocalIdx)>);
/// Random DAG with `n` nodes: each node i > 0 has 0-3 parents drawn
/// uniformly from nodes {0..i}. Fees/vsizes are varied.
fn random_dag(n: usize, seed: u64) -> (Vec<(u64, u64)>, Vec<(LocalIdx, LocalIdx)>) {
fn random_dag(n: usize, seed: u64) -> FvAndEdges {
let mut rng = DagRng::new(seed);
let fees_vsizes: Vec<(u64, u64)> = (0..n)
.map(|_| {
@@ -324,6 +327,7 @@ fn random_dag(n: usize, seed: u64) -> (Vec<(u64, u64)>, Vec<(LocalIdx, LocalIdx)
(fees_vsizes, edges)
}
#[expect(dead_code, reason = "kept for ad-hoc oracle sweeps; called via uncommented stress tests")]
fn assert_optimal_on_random(n: usize, seed: u64) {
let (fv, edges) = random_dag(n, seed);
let cluster = super::make_cluster(&fv, &edges);

View File

@@ -29,10 +29,13 @@ impl Rng {
}
}
/// `(fee, vsize)` per node + edge list.
type FvAndEdges = (Vec<(u64, u64)>, Vec<(LocalIdx, LocalIdx)>);
/// Build a random DAG with `n` nodes. For each node `i` > 0, add a
/// random number of parents from nodes with index < i (guarantees
/// acyclic). Fee and vsize are random in a small range.
fn random_cluster(n: usize, seed: u64) -> (Vec<(u64, u64)>, Vec<(LocalIdx, LocalIdx)>) {
fn random_cluster(n: usize, seed: u64) -> FvAndEdges {
let mut rng = Rng::new(seed);
let mut fees_vsizes = Vec::with_capacity(n);
for _ in 0..n {

View File

@@ -2,11 +2,15 @@ mod graph;
mod linearize;
mod package;
mod partitioner;
mod pool_index;
mod tx_node;
#[cfg(test)]
mod graph_bench;
pub use package::Package;
use crate::entry::Entry;
use crate::stores::Entry;
/// Target vsize per block (~1MB, derived from 4MW weight limit).
pub(crate) const BLOCK_VSIZE: u64 = 1_000_000;

View File

@@ -1,6 +1,6 @@
use brk_types::FeeRate;
use crate::types::TxIndex;
use crate::stores::TxIndex;
/// A CPFP package: transactions the linearizer decided to mine together
/// because a child pays for its parent.

View File

@@ -34,16 +34,10 @@ pub fn partition_into_blocks(
let mut blocks: Vec<Vec<Package>> = Vec::with_capacity(num_blocks);
let normal_blocks = num_blocks.saturating_sub(1);
let mut idx = fill_normal_blocks(&mut slots, &mut blocks, normal_blocks, &mut cluster_next);
let idx = fill_normal_blocks(&mut slots, &mut blocks, normal_blocks, &mut cluster_next);
if blocks.len() < num_blocks {
let mut overflow: Vec<Package> = Vec::new();
while idx < slots.len() {
if let Some(pkg) = slots[idx].take() {
overflow.push(pkg);
}
idx += 1;
}
let overflow: Vec<Package> = slots[idx..].iter_mut().filter_map(Option::take).collect();
if !overflow.is_empty() {
blocks.push(overflow);
}

View File

@@ -1,7 +1,8 @@
use brk_types::{Sats, VSize};
use smallvec::SmallVec;
use crate::types::{PoolIndex, TxIndex};
use super::pool_index::PoolIndex;
use crate::stores::TxIndex;
/// A transaction node in the dependency graph.
///

View File

@@ -0,0 +1,111 @@
pub mod block_builder;
pub mod projected_blocks;
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::{SystemTime, UNIX_EPOCH},
};
use brk_rpc::Client;
use parking_lot::RwLock;
#[cfg(debug_assertions)]
use self::projected_blocks::verify::Verifier;
use self::{
block_builder::build_projected_blocks,
projected_blocks::{BlockStats, RecommendedFees, Snapshot},
};
use crate::stores::EntryPool;
/// Minimum interval between rebuilds (milliseconds).
const MIN_REBUILD_INTERVAL_MS: u64 = 1000;
/// Owns the projected-blocks `Snapshot` and the scheduling around its
/// rebuild.
///
/// Internally stateful: a `dirty` flag the Applier nudges after each
/// state change, a `last_rebuild_ms` throttle so we rebuild at most
/// once per `MIN_REBUILD_INTERVAL_MS` regardless of churn, and the
/// `Snapshot` itself swapped behind a cheap `Arc` so readers clone a
/// pointer, not the vectors inside.
#[derive(Default)]
pub struct Rebuilder {
snapshot: RwLock<Arc<Snapshot>>,
dirty: AtomicBool,
last_rebuild_ms: AtomicU64,
}
impl Rebuilder {
/// Signal that state has changed and a rebuild is eventually needed.
pub fn mark_dirty(&self) {
self.dirty.store(true, Ordering::Release);
}
/// Rebuild iff dirty and enough time has passed since the last
/// run. Takes a short read lock on `entries` while building and
/// a short write lock on the internal snapshot at swap time.
pub fn tick(&self, client: &Client, entries: &RwLock<EntryPool>) {
if !self.dirty.load(Ordering::Acquire) {
return;
}
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let last = self.last_rebuild_ms.load(Ordering::Acquire);
if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS {
return;
}
if self
.last_rebuild_ms
.compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
}
self.dirty.store(false, Ordering::Release);
let built = {
let entries = entries.read();
let entries_slice = entries.entries();
let blocks = build_projected_blocks(entries_slice);
#[cfg(debug_assertions)]
Verifier::check(client, &blocks, entries_slice);
#[cfg(not(debug_assertions))]
let _ = client;
Snapshot::build(blocks, entries_slice)
};
*self.snapshot.write() = Arc::new(built);
}
/// Cheap: reader clones an `Arc` pointer and releases the lock.
fn current(&self) -> Arc<Snapshot> {
self.snapshot.read().clone()
}
pub fn snapshot(&self) -> Arc<Snapshot> {
self.current()
}
pub fn fees(&self) -> RecommendedFees {
self.current().fees.clone()
}
pub fn block_stats(&self) -> Vec<BlockStats> {
self.current().block_stats.clone()
}
pub fn next_block_hash(&self) -> u64 {
self.current().next_block_hash
}
}

View File

@@ -3,10 +3,11 @@ use std::hash::{DefaultHasher, Hash, Hasher};
use brk_types::RecommendedFees;
use super::{
super::block_builder::Package,
fees,
stats::{self, BlockStats},
};
use crate::{block_builder::Package, entry::Entry, types::TxIndex};
use crate::stores::{Entry, TxIndex};
/// Immutable snapshot of projected blocks.
#[derive(Debug, Clone, Default)]
@@ -16,6 +17,12 @@ pub struct Snapshot {
pub blocks: Vec<Vec<TxIndex>>,
pub block_stats: Vec<BlockStats>,
pub fees: RecommendedFees,
/// ETag-like cache key for the first projected block. A hash of
/// the block's tx ordering, not a Bitcoin block header hash (no
/// header exists yet - it's a projection). Precomputed at build
/// time since the snapshot is immutable; `0` iff there are no
/// projected blocks.
pub next_block_hash: u64,
}
impl Snapshot {
@@ -28,21 +35,23 @@ impl Snapshot {
let fees = fees::compute_recommended_fees(&block_stats);
let blocks = blocks
let blocks: Vec<Vec<TxIndex>> = blocks
.into_iter()
.map(|block| block.into_iter().flat_map(|pkg| pkg.txs).collect())
.collect();
let next_block_hash = Self::hash_next_block(&blocks);
Self {
blocks,
block_stats,
fees,
next_block_hash,
}
}
/// Hash of the first projected block (the one about to be mined).
pub fn next_block_hash(&self) -> u64 {
let Some(block) = self.blocks.first() else {
fn hash_next_block(blocks: &[Vec<TxIndex>]) -> u64 {
let Some(block) = blocks.first() else {
return 0;
};
let mut hasher = DefaultHasher::new();

View File

@@ -1,6 +1,7 @@
use brk_types::{FeeRate, Sats, VSize};
use crate::{block_builder::Package, entry::Entry};
use super::super::block_builder::Package;
use crate::stores::Entry;
/// Statistics for a single projected block.
#[derive(Debug, Clone, Default)]
@@ -32,10 +33,6 @@ impl BlockStats {
/// containing package's `fee_rate` to the percentile distribution,
/// since that's the rate the miner collects per vsize.
pub fn compute_block_stats(block: &[Package], entries: &[Option<Entry>]) -> BlockStats {
if block.is_empty() {
return BlockStats::default();
}
let mut total_fee = Sats::default();
let mut total_vsize = VSize::default();
let mut total_size: u64 = 0;

View File

@@ -3,11 +3,8 @@ use brk_types::{Sats, SatsSigned, TxidPrefix};
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::{debug, warn};
use crate::{
block_builder::{BLOCK_VSIZE, Package},
entry::Entry,
types::TxIndex,
};
use super::super::block_builder::{BLOCK_VSIZE, Package};
use crate::stores::{Entry, TxIndex};
type PrefixSet = FxHashSet<TxidPrefix>;
type FeeByPrefix = FxHashMap<TxidPrefix, Sats>;
@@ -48,12 +45,12 @@ impl Verifier {
}
}
fn live_entry<'e>(
entries: &'e [Option<Entry>],
fn live_entry(
entries: &[Option<Entry>],
tx_index: TxIndex,
b: usize,
p: usize,
) -> &'e Entry {
) -> &Entry {
entries[tx_index.as_usize()]
.as_ref()
.unwrap_or_else(|| panic!("block {b} pkg {p}: dead tx_index {tx_index:?}"))

View File

@@ -0,0 +1,139 @@
//! Prevout resolution for live mempool txs.
//!
//! A fresh tx can land in the store with `prevout: None` on some
//! inputs when the Preparer can't see the parent (parent arrived in
//! the same cycle as the child, or parent is confirmed and Core
//! lacks `-txindex`). Two paths fix that, both writing through the
//! same `apply_fills` -> `add_input` plumbing:
//!
//! - [`Resolver::resolve_in_mempool`]: same-cycle parents from the
//! live `txs` map. Run by the orchestrator after each successful
//! `MempoolState::apply`. No external dependency.
//! - [`Resolver::resolve_external`]: caller-supplied resolver
//! (typically the brk indexer). Run on demand by API consumers
//! that have a confirmed-tx data source. Lock-free during the
//! resolver call.
//!
//! Both phases:
//! 1. Snapshot under `txs.read()`, gather work for unresolved txs
//! (early-exit if `txs.unresolved()` is empty).
//! 2. (external only) Call the resolver outside any lock.
//! 3. Write fills under `txs.write()` + `addrs.write()`, in that
//! order to match the Applier's lock order.
//!
//! Idempotent: `apply_fills` checks `prevout.is_none()` per input
//! and bails if the tx was removed between phases.
use brk_types::{TxOut, Txid, Vin, Vout};
use crate::stores::MempoolState;
/// Per-tx fills to apply: (vin index, resolved prevout).
type Fills = Vec<(Vin, TxOut)>;
/// Per-tx holes to resolve: (vin index, parent txid, parent vout).
type Holes = Vec<(Vin, Txid, Vout)>;
pub struct Resolver;
impl Resolver {
/// Fill prevouts whose parent is also live in the mempool.
///
/// Called by the orchestrator after each successful
/// `MempoolState::apply`. Catches parent/child pairs that arrived
/// in the same cycle: the Preparer resolves against a snapshot
/// taken before the cycle's adds were applied, so neither parent
/// nor child is in it; both are in `txs` by the time we run.
pub fn resolve_in_mempool(state: &MempoolState) -> bool {
let filled: Vec<(Txid, Fills)> = {
let txs = state.txs.read();
if txs.unresolved().is_empty() {
return false;
}
txs.unresolved()
.iter()
.filter_map(|txid| {
let tx = txs.get(txid)?;
let fills: Fills = tx
.input
.iter()
.enumerate()
.filter(|(_, txin)| txin.prevout.is_none())
.filter_map(|(i, txin)| {
let parent = txs.get(&txin.txid)?;
let out = parent.output.get(usize::from(txin.vout))?;
Some((Vin::from(i), out.clone()))
})
.collect();
(!fills.is_empty()).then_some((txid.clone(), fills))
})
.collect()
};
Self::write_back(state, filled)
}
/// Fill prevouts via an external resolver, typically backed by the
/// brk indexer for confirmed parents.
///
/// Phase 1 collects holes under `txs.read()`; phase 2 runs the
/// resolver outside any lock; phase 3 writes back. Holes already
/// resolvable from in-mempool parents have been filled by
/// [`Resolver::resolve_in_mempool`] in the preceding `apply`, so
/// anything reaching the resolver here is genuinely external.
pub fn resolve_external<F>(state: &MempoolState, resolver: F) -> bool
where
F: Fn(&Txid, Vout) -> Option<TxOut>,
{
let holes: Vec<(Txid, Holes)> = {
let txs = state.txs.read();
if txs.unresolved().is_empty() {
return false;
}
txs.unresolved()
.iter()
.filter_map(|txid| {
let tx = txs.get(txid)?;
let holes: Holes = tx
.input
.iter()
.enumerate()
.filter(|(_, txin)| txin.prevout.is_none())
.map(|(i, txin)| (Vin::from(i), txin.txid.clone(), txin.vout))
.collect();
(!holes.is_empty()).then_some((txid.clone(), holes))
})
.collect()
};
let filled: Vec<(Txid, Fills)> = holes
.into_iter()
.filter_map(|(txid, holes)| {
let fills: Fills = holes
.into_iter()
.filter_map(|(vin, prev_txid, vout)| {
resolver(&prev_txid, vout).map(|o| (vin, o))
})
.collect();
(!fills.is_empty()).then_some((txid, fills))
})
.collect();
Self::write_back(state, filled)
}
/// Apply per-tx fills under `txs.write()` + `addrs.write()`.
/// Each successful prevout write is folded into `AddrTracker` via
/// `add_input`. Lock order matches the Applier's (txs before addrs).
fn write_back(state: &MempoolState, fills: Vec<(Txid, Fills)>) -> bool {
if fills.is_empty() {
return false;
}
let mut txs = state.txs.write();
let mut addrs = state.addrs.write();
for (txid, tx_fills) in fills {
for prevout in txs.apply_fills(&txid, tx_fills) {
addrs.add_input(&txid, &prevout);
}
}
true
}
}

View File

@@ -1,4 +1,6 @@
use brk_types::{AddrBytes, AddrMempoolStats, Transaction, Txid};
use std::hash::{DefaultHasher, Hash, Hasher};
use brk_types::{AddrBytes, AddrMempoolStats, Transaction, TxOut, Txid};
use derive_more::Deref;
use rustc_hash::{FxHashMap, FxHashSet};
@@ -20,6 +22,34 @@ impl AddrTracker {
self.update(tx, txid, false);
}
/// Hash of an address's per-mempool stats. Stable while the address
/// is unchanged; cheaper to recompute than to track invalidation.
/// Returns 0 for unknown addresses (collision with a real hash is
/// astronomically unlikely and only costs one ETag false-hit if it
/// ever happens).
pub fn stats_hash(&self, addr: &AddrBytes) -> u64 {
let Some((stats, _)) = self.0.get(addr) else {
return 0;
};
let mut hasher = DefaultHasher::new();
stats.hash(&mut hasher);
hasher.finish()
}
/// Fold a single newly-resolved input into the per-address stats.
/// Called by the Resolver after a prevout that was previously
/// `None` has been filled. Inputs whose prevout doesn't resolve
/// to an addr are no-ops.
pub fn add_input(&mut self, txid: &Txid, prevout: &TxOut) {
let Some(bytes) = prevout.addr_bytes() else {
return;
};
let (stats, txids) = self.0.entry(bytes).or_default();
txids.insert(txid.clone());
stats.sending(prevout);
stats.update_tx_count(txids.len() as u32);
}
fn update(&mut self, tx: &Transaction, txid: &Txid, is_addition: bool) {
// Inputs: track sending
for txin in &tx.input {

View File

@@ -0,0 +1,39 @@
use brk_types::{FeeRate, Sats, Timestamp, Txid, TxidPrefix, VSize};
use smallvec::SmallVec;
/// A mempool transaction entry.
///
/// Stores only immutable per-tx facts. Ancestor aggregates are
/// deliberately not cached: they're derivable from the live
/// dependency graph, and any cached copy would go stale the moment
/// any ancestor confirms or is replaced.
#[derive(Debug, Clone)]
pub struct Entry {
pub txid: Txid,
pub fee: Sats,
pub vsize: VSize,
/// Serialized tx size in bytes (witness + non-witness), from the raw tx.
pub size: u64,
/// Parent txid prefixes (most txs have 0-2 parents).
///
/// May reference parents no longer in the pool; consumers resolve
/// against the live pool and drop misses, so staleness here is
/// self-healing.
pub depends: SmallVec<[TxidPrefix; 2]>,
/// When this tx was first seen in the mempool.
pub first_seen: Timestamp,
/// BIP-125 explicit signaling: any input has sequence < 0xfffffffe.
pub rbf: bool,
}
impl Entry {
#[inline]
pub fn fee_rate(&self) -> FeeRate {
FeeRate::from((self.fee, self.vsize))
}
#[inline]
pub fn txid_prefix(&self) -> TxidPrefix {
TxidPrefix::from(&self.txid)
}
}

View File

@@ -0,0 +1,72 @@
use brk_types::TxidPrefix;
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use super::{Entry, TxIndex};
/// Pool of mempool entries with slot recycling.
///
/// Slot-based storage: removed entries leave holes that are reused
/// by the next insert, so `TxIndex` stays stable for the lifetime of
/// an entry. Only stores what can't be derived: the entries
/// themselves, their prefix-to-slot index, and the free slot list.
#[derive(Default)]
pub struct EntryPool {
entries: Vec<Option<Entry>>,
prefix_to_idx: FxHashMap<TxidPrefix, TxIndex>,
free_slots: Vec<TxIndex>,
}
impl EntryPool {
/// Insert an entry, returning its index. The prefix is derived from
/// `entry.txid`, so the caller never has to pass it in.
pub fn insert(&mut self, entry: Entry) -> TxIndex {
let prefix = entry.txid_prefix();
let idx = match self.free_slots.pop() {
Some(idx) => {
self.entries[idx.as_usize()] = Some(entry);
idx
}
None => {
let idx = TxIndex::from(self.entries.len());
self.entries.push(Some(entry));
idx
}
};
self.prefix_to_idx.insert(prefix, idx);
idx
}
/// Get an entry by its txid prefix.
pub fn get(&self, prefix: &TxidPrefix) -> Option<&Entry> {
let idx = self.prefix_to_idx.get(prefix)?;
self.entries.get(idx.as_usize())?.as_ref()
}
/// Direct children of a transaction (txs whose `depends` includes
/// `prefix`). Derived on demand via a linear scan — called only by
/// the CPFP query endpoint, which is not on the hot path.
pub fn children(&self, prefix: &TxidPrefix) -> SmallVec<[TxidPrefix; 2]> {
let mut out: SmallVec<[TxidPrefix; 2]> = SmallVec::new();
for entry in self.entries.iter().flatten() {
if entry.depends.iter().any(|p| p == prefix) {
out.push(entry.txid_prefix());
}
}
out
}
/// Remove an entry by its txid prefix, returning it if present.
pub fn remove(&mut self, prefix: &TxidPrefix) -> Option<Entry> {
let idx = self.prefix_to_idx.remove(prefix)?;
let entry = self.entries.get_mut(idx.as_usize()).and_then(Option::take)?;
self.free_slots.push(idx);
Some(entry)
}
/// Get the entries slice for block building.
pub fn entries(&self) -> &[Option<Entry>] {
&self.entries
}
}

View File

@@ -0,0 +1,32 @@
//! State held inside the mempool, plus the value types stored in it.
//!
//! [`state::MempoolState`] aggregates four locked buckets:
//!
//! - [`tx_store::TxStore`] - full `Transaction` data for live txs.
//! - [`addr_tracker::AddrTracker`] - per-address mempool stats.
//! - [`entry_pool::EntryPool`] - slot-recycled `Entry` storage indexed
//! by [`tx_index::TxIndex`].
//! - [`tx_graveyard::TxGraveyard`] - recently-dropped txs as
//! [`tombstone::Tombstone`]s, retained for reappearance detection
//! and post-mine analytics.
//!
//! A fifth bucket, `info`, holds a `MempoolInfo` from `brk_types`,
//! so it has no file here.
pub mod addr_tracker;
pub mod entry;
pub mod entry_pool;
pub mod state;
pub mod tombstone;
pub mod tx_graveyard;
pub mod tx_index;
pub mod tx_store;
pub use addr_tracker::AddrTracker;
pub use entry::Entry;
pub use entry_pool::EntryPool;
pub use state::MempoolState;
pub use tombstone::Tombstone;
pub use tx_graveyard::TxGraveyard;
pub use tx_index::TxIndex;
pub use tx_store::TxStore;

View File

@@ -0,0 +1,35 @@
use brk_types::MempoolInfo;
use parking_lot::RwLock;
use super::{AddrTracker, EntryPool, TxGraveyard, TxStore};
use crate::steps::{applier::Applier, preparer::Pulled};
/// The five buckets making up live mempool state.
///
/// Each bucket has its own `RwLock` so readers of different buckets
/// don't contend with each other; the Applier takes all five write
/// locks in a fixed order for a brief window once per cycle.
#[derive(Default)]
pub struct MempoolState {
pub(crate) info: RwLock<MempoolInfo>,
pub(crate) txs: RwLock<TxStore>,
pub(crate) addrs: RwLock<AddrTracker>,
pub(crate) entries: RwLock<EntryPool>,
pub(crate) graveyard: RwLock<TxGraveyard>,
}
impl MempoolState {
/// Apply a prepared diff to all five buckets atomically. Returns
/// true iff the Applier observed any change. Same-cycle prevout
/// resolution is a separate pipeline step run by the orchestrator.
pub fn apply(&self, pulled: Pulled) -> bool {
Applier::apply(
pulled,
&mut self.info.write(),
&mut self.txs.write(),
&mut self.addrs.write(),
&mut self.entries.write(),
&mut self.graveyard.write(),
)
}
}

View File

@@ -0,0 +1,45 @@
use std::time::{Duration, Instant};
use brk_types::Transaction;
use super::Entry;
use crate::steps::preparer::Removal;
/// A buried mempool tx, retained for reappearance detection and
/// post-mine analytics.
pub struct Tombstone {
pub tx: Transaction,
pub entry: Entry,
removal: Removal,
removed_at: Instant,
}
impl Tombstone {
pub(super) fn new(tx: Transaction, entry: Entry, removal: Removal, removed_at: Instant) -> Self {
Self {
tx,
entry,
removal,
removed_at,
}
}
pub fn reason(&self) -> &Removal {
&self.removal
}
pub fn age(&self) -> Duration {
self.removed_at.elapsed()
}
pub(super) fn removed_at(&self) -> Instant {
self.removed_at
}
pub(super) fn replaced_by(&self) -> Option<&brk_types::Txid> {
match &self.removal {
Removal::Replaced { by } => Some(by),
Removal::Vanished => None,
}
}
}

View File

@@ -0,0 +1,82 @@
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
use brk_types::{Transaction, Txid};
use rustc_hash::FxHashMap;
use super::{Entry, Tombstone};
use crate::steps::preparer::Removal;
/// How long a dropped tx stays retained after removal.
const RETENTION: Duration = Duration::from_secs(60 * 60);
/// Recently-dropped txs retained for reappearance detection (Puller can revive
/// them without RPC) and post-mine analytics (RBF/replacement chains, etc.).
#[derive(Default)]
pub struct TxGraveyard {
tombstones: FxHashMap<Txid, Tombstone>,
order: VecDeque<(Instant, Txid)>,
}
impl TxGraveyard {
pub fn contains(&self, txid: &Txid) -> bool {
self.tombstones.contains_key(txid)
}
pub fn get(&self, txid: &Txid) -> Option<&Tombstone> {
self.tombstones.get(txid)
}
/// Tombstones marked as `Replaced { by: replacer }`. Used to walk
/// backward through RBF history: given a tx that's still live (or
/// in the graveyard), find every tx it displaced.
pub fn predecessors_of<'a>(
&'a self,
replacer: &'a Txid,
) -> impl Iterator<Item = (&'a Txid, &'a Tombstone)> {
self.tombstones
.iter()
.filter_map(move |(txid, ts)| (ts.replaced_by() == Some(replacer)).then_some((txid, ts)))
}
pub fn bury(&mut self, txid: Txid, tx: Transaction, entry: Entry, removal: Removal) {
let now = Instant::now();
self.tombstones
.insert(txid.clone(), Tombstone::new(tx, entry, removal, now));
self.order.push_back((now, txid));
}
/// Remove and return the tombstone, e.g. when the tx comes back to life.
pub fn exhume(&mut self, txid: &Txid) -> Option<Tombstone> {
self.tombstones.remove(txid)
}
/// Drop tombstones older than RETENTION. O(k) in the number of evictions.
///
/// The order queue may carry stale entries (from re-buries or prior
/// exhumes); the timestamp-match check skips those without disturbing
/// live tombstones.
pub fn evict_old(&mut self) {
while let Some(&(t, _)) = self.order.front() {
if t.elapsed() < RETENTION {
break;
}
let (_, txid) = self.order.pop_front().unwrap();
if let Some(ts) = self.tombstones.get(&txid)
&& ts.removed_at() == t
{
self.tombstones.remove(&txid);
}
}
}
pub fn len(&self) -> usize {
self.tombstones.len()
}
pub fn is_empty(&self) -> bool {
self.tombstones.is_empty()
}
}

View File

@@ -0,0 +1,90 @@
use brk_types::{MempoolRecentTx, Transaction, TxOut, Txid, Vin};
use derive_more::Deref;
use rustc_hash::{FxHashMap, FxHashSet};
const RECENT_CAP: usize = 10;
/// Store of full transaction data for API access.
#[derive(Default, Deref)]
pub struct TxStore {
#[deref]
txs: FxHashMap<Txid, Transaction>,
recent: Vec<MempoolRecentTx>,
/// Txids whose tx has at least one input with `prevout == None`.
/// Maintained on every `extend` / `remove` / `apply_fills` so the
/// post-update prevout filler can early-exit when this set is empty.
unresolved: FxHashSet<Txid>,
}
impl TxStore {
pub fn contains(&self, txid: &Txid) -> bool {
self.txs.contains_key(txid)
}
/// Insert each `(Txid, Transaction)` yielded by `items`, and push
/// up to `RECENT_CAP` of them onto the front of `recent` as the
/// newest-seen window (older entries fall off the end).
pub fn extend<I>(&mut self, items: I)
where
I: IntoIterator<Item = (Txid, Transaction)>,
{
let mut new_recent: Vec<MempoolRecentTx> = Vec::with_capacity(RECENT_CAP);
for (txid, tx) in items {
if new_recent.len() < RECENT_CAP {
new_recent.push(MempoolRecentTx::from((&txid, &tx)));
}
if tx.input.iter().any(|i| i.prevout.is_none()) {
self.unresolved.insert(txid.clone());
}
self.txs.insert(txid, tx);
}
let keep = RECENT_CAP.saturating_sub(new_recent.len());
new_recent.extend(self.recent.drain(..keep.min(self.recent.len())));
self.recent = new_recent;
}
pub fn recent(&self) -> &[MempoolRecentTx] {
&self.recent
}
/// Remove a single tx and return its stored data if present. `recent`
/// isn't touched: it's an "added" window, not a live-set mirror.
pub fn remove(&mut self, txid: &Txid) -> Option<Transaction> {
self.unresolved.remove(txid);
self.txs.remove(txid)
}
/// Set of txids with at least one unfilled prevout. Used by the
/// prevout filler as a cheap "is there any work?" gate.
pub fn unresolved(&self) -> &FxHashSet<Txid> {
&self.unresolved
}
/// Apply resolved prevouts to a tx in place. `fills` is `(vin, prevout)`.
/// Returns the prevouts that were actually written (so the caller can
/// fold them into `AddrTracker`). Updates `unresolved` if the tx is
/// fully resolved after the fill, and recomputes `total_sigop_cost`
/// since the P2SH and witness components depend on prevouts.
pub fn apply_fills(&mut self, txid: &Txid, fills: Vec<(Vin, TxOut)>) -> Vec<TxOut> {
let Some(tx) = self.txs.get_mut(txid) else {
return Vec::new();
};
let mut applied = Vec::with_capacity(fills.len());
for (vin, prevout) in fills {
if let Some(txin) = tx.input.get_mut(usize::from(vin))
&& txin.prevout.is_none()
{
txin.prevout = Some(prevout.clone());
applied.push(prevout);
}
}
if !applied.is_empty() {
tx.total_sigop_cost = tx.total_sigop_cost();
}
if !tx.input.iter().any(|i| i.prevout.is_none()) {
self.unresolved.remove(txid);
}
applied
}
}

View File

@@ -1,354 +0,0 @@
use std::{
hash::{DefaultHasher, Hash, Hasher},
mem,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use bitcoin::hex::DisplayHex;
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{
AddrBytes, BlockHash, MempoolEntryInfo, MempoolInfo, Timestamp, Transaction, TxIn, TxOut,
TxStatus, Txid, TxidPrefix, VSize, Vout,
};
use derive_more::Deref;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::FxHashMap;
use tracing::error;
use crate::{
addrs::AddrTracker,
block_builder::build_projected_blocks,
entry::Entry,
entry_pool::EntryPool,
projected_blocks::{BlockStats, RecommendedFees, Snapshot},
tx_store::TxStore,
types::TxWithHex,
};
/// Max new txs to fetch full data for per update cycle (for address tracking).
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
/// Minimum interval between rebuilds (milliseconds).
const MIN_REBUILD_INTERVAL_MS: u64 = 1000;
/// Mempool monitor.
///
/// Thread-safe wrapper around `MempoolInner`. Free to clone.
#[derive(Clone, Deref)]
pub struct Mempool(Arc<MempoolInner>);
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(MempoolInner::new(client.clone())))
}
}
/// Inner mempool state and logic.
pub struct MempoolInner {
client: Client,
info: RwLock<MempoolInfo>,
txs: RwLock<TxStore>,
addrs: RwLock<AddrTracker>,
entries: RwLock<EntryPool>,
snapshot: RwLock<Snapshot>,
dirty: AtomicBool,
last_rebuild_ms: AtomicU64,
}
impl MempoolInner {
pub fn new(client: Client) -> Self {
Self {
client,
info: RwLock::new(MempoolInfo::default()),
txs: RwLock::new(TxStore::default()),
addrs: RwLock::new(AddrTracker::default()),
entries: RwLock::new(EntryPool::default()),
snapshot: RwLock::new(Snapshot::default()),
dirty: AtomicBool::new(false),
last_rebuild_ms: AtomicU64::new(0),
}
}
pub fn get_info(&self) -> MempoolInfo {
self.info.read().clone()
}
pub fn get_fees(&self) -> RecommendedFees {
self.snapshot.read().fees.clone()
}
pub fn get_snapshot(&self) -> Snapshot {
self.snapshot.read().clone()
}
pub fn get_block_stats(&self) -> Vec<BlockStats> {
self.snapshot.read().block_stats.clone()
}
pub fn next_block_hash(&self) -> u64 {
self.snapshot.read().next_block_hash()
}
pub fn addr_hash(&self, addr: &AddrBytes) -> u64 {
let addrs = self.addrs.read();
let Some((stats, _)) = addrs.get(addr) else {
return 0;
};
let mut hasher = DefaultHasher::new();
stats.hash(&mut hasher);
hasher.finish()
}
pub fn get_txs(&self) -> RwLockReadGuard<'_, TxStore> {
self.txs.read()
}
pub fn get_entries(&self) -> RwLockReadGuard<'_, EntryPool> {
self.entries.read()
}
pub fn get_addrs(&self) -> RwLockReadGuard<'_, AddrTracker> {
self.addrs.read()
}
/// Start an infinite update loop with a 1 second interval.
pub fn start(&self) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
thread::sleep(Duration::from_secs(1));
}
}
/// Sync with Bitcoin Core mempool and rebuild projections if needed.
pub fn update(&self) -> Result<()> {
let entries_info = self.client.get_raw_mempool_verbose()?;
let new_txs = self.fetch_new_txs(&entries_info);
let has_changes = self.apply_changes(&entries_info, new_txs);
if has_changes {
self.dirty.store(true, Ordering::Release);
}
self.rebuild_if_needed();
Ok(())
}
/// Fetch full transaction data for new txids (needed for address tracking).
fn fetch_new_txs(&self, entries_info: &[MempoolEntryInfo]) -> FxHashMap<Txid, TxWithHex> {
let txs = self.txs.read();
entries_info
.iter()
.filter(|e| !txs.contains(&e.txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.filter_map(|entry| {
self.build_transaction(entry, &txs)
.ok()
.map(|tx| (entry.txid.clone(), tx))
})
.collect()
}
fn build_transaction(
&self,
entry: &MempoolEntryInfo,
mempool_txs: &TxStore,
) -> Result<TxWithHex> {
let (mut btc_tx, hex) = self.client.get_mempool_raw_tx(&entry.txid)?;
let total_size = hex.len() / 2;
let total_sigop_cost = btc_tx.total_sigop_cost(|_| None);
// Collect unique parent txids not in the mempool store,
// fetch each once instead of one get_tx_out per input
let mut parent_cache: FxHashMap<Txid, Vec<bitcoin::TxOut>> = FxHashMap::default();
for txin in &btc_tx.input {
let prev_txid: Txid = txin.previous_output.txid.into();
if !mempool_txs.contains_key(&prev_txid)
&& !parent_cache.contains_key(&prev_txid)
&& let Ok(prev) = self
.client
.get_raw_transaction(&prev_txid, None as Option<&BlockHash>)
{
parent_cache.insert(prev_txid, prev.output);
}
}
let input = mem::take(&mut btc_tx.input)
.into_iter()
.map(|txin| {
let prev_txid: Txid = txin.previous_output.txid.into();
let prev_vout = usize::from(Vout::from(txin.previous_output.vout));
let prevout = if let Some(prev) = mempool_txs.get(&prev_txid) {
prev.tx()
.output
.get(prev_vout)
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value)))
} else if let Some(outputs) = parent_cache.get(&prev_txid) {
outputs
.get(prev_vout)
.map(|o| TxOut::from((o.script_pubkey.clone(), o.value.into())))
} else {
None
};
TxIn {
is_coinbase: prevout.is_none(),
prevout,
txid: prev_txid,
vout: txin.previous_output.vout.into(),
script_sig: txin.script_sig,
script_sig_asm: (),
witness: txin
.witness
.iter()
.map(|w| w.to_lower_hex_string())
.collect(),
sequence: txin.sequence.into(),
inner_redeem_script_asm: (),
inner_witness_script_asm: (),
}
})
.collect();
let tx = Transaction {
index: None,
txid: entry.txid.clone(),
version: btc_tx.version.into(),
total_sigop_cost,
weight: entry.weight.into(),
lock_time: btc_tx.lock_time.into(),
total_size,
fee: entry.fee,
input,
output: btc_tx.output.into_iter().map(TxOut::from).collect(),
status: TxStatus::UNCONFIRMED,
};
Ok(TxWithHex::new(tx, hex))
}
/// Apply transaction additions and removals. Returns true if there were changes.
fn apply_changes(
&self,
entries_info: &[MempoolEntryInfo],
new_txs: FxHashMap<Txid, TxWithHex>,
) -> bool {
let entries_by_prefix: FxHashMap<TxidPrefix, &MempoolEntryInfo> = entries_info
.iter()
.map(|e| (TxidPrefix::from(&e.txid), e))
.collect();
let mut info = self.info.write();
let mut txs = self.txs.write();
let mut addrs = self.addrs.write();
let mut entries = self.entries.write();
let mut had_removals = false;
let had_additions = !new_txs.is_empty();
// Remove transactions no longer in mempool
txs.retain_or_remove(
|txid| entries_by_prefix.contains_key(&TxidPrefix::from(txid)),
|txid, tx_with_hex| {
had_removals = true;
let tx = tx_with_hex.tx();
let prefix = TxidPrefix::from(txid);
// Get fee from entries (before removing) - this is the authoritative fee from Bitcoin Core
let fee = entries.get(&prefix).map(|e| e.fee).unwrap_or_default();
info.remove(tx, fee);
addrs.remove_tx(tx, txid);
entries.remove(&prefix);
},
);
// Add new transactions
for (txid, tx_with_hex) in &new_txs {
let tx = tx_with_hex.tx();
let prefix = TxidPrefix::from(txid);
let Some(entry_info) = entries_by_prefix.get(&prefix) else {
continue;
};
info.add(tx, entry_info.fee);
addrs.add_tx(tx, txid);
entries.insert(
prefix,
Entry {
txid: entry_info.txid.clone(),
fee: entry_info.fee,
vsize: VSize::from(entry_info.vsize),
size: tx.total_size as u64,
ancestor_fee: entry_info.ancestor_fee,
ancestor_vsize: VSize::from(entry_info.ancestor_size),
depends: entry_info.depends.iter().map(TxidPrefix::from).collect(),
first_seen: Timestamp::now(),
},
);
}
txs.extend(new_txs);
had_removals || had_additions
}
/// Rebuild projected blocks if dirty and enough time has passed.
fn rebuild_if_needed(&self) {
if !self.dirty.load(Ordering::Acquire) {
return;
}
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let last = self.last_rebuild_ms.load(Ordering::Acquire);
if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS {
return;
}
if self
.last_rebuild_ms
.compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return;
}
self.dirty.store(false, Ordering::Release);
// let i = Instant::now();
self.rebuild_projected_blocks();
// debug!("mempool: rebuild_projected_blocks in {:?}", i.elapsed());
}
/// Rebuild projected blocks snapshot.
fn rebuild_projected_blocks(&self) {
let entries = self.entries.read();
let entries_slice = entries.entries();
let blocks = build_projected_blocks(entries_slice);
#[cfg(debug_assertions)]
crate::projected_blocks::verify::Verifier::check(&self.client, &blocks, entries_slice);
let snapshot = Snapshot::build(blocks, entries_slice);
*self.snapshot.write() = snapshot;
}
}

View File

@@ -1,56 +0,0 @@
use brk_types::{MempoolRecentTx, Txid};
use derive_more::Deref;
use rustc_hash::FxHashMap;
use crate::types::TxWithHex;
const RECENT_CAP: usize = 10;
/// Store of full transaction data for API access.
#[derive(Default, Deref)]
pub struct TxStore {
#[deref]
txs: FxHashMap<Txid, TxWithHex>,
recent: Vec<MempoolRecentTx>,
}
impl TxStore {
/// Check if a transaction exists.
pub fn contains(&self, txid: &Txid) -> bool {
self.txs.contains_key(txid)
}
/// Add transactions in bulk.
pub fn extend(&mut self, txs: FxHashMap<Txid, TxWithHex>) {
let mut new: Vec<_> = txs
.iter()
.take(RECENT_CAP)
.map(|(txid, tx_hex)| MempoolRecentTx::from((txid, tx_hex.tx())))
.collect();
let keep = RECENT_CAP.saturating_sub(new.len());
new.extend(self.recent.drain(..keep.min(self.recent.len())));
self.recent = new;
self.txs.extend(txs);
}
/// Last 10 transactions to enter the mempool.
pub fn recent(&self) -> &[MempoolRecentTx] {
&self.recent
}
/// Keep items matching predicate, call `on_remove` for each removed item.
pub fn retain_or_remove<K, R>(&mut self, mut keep: K, mut on_remove: R)
where
K: FnMut(&Txid) -> bool,
R: FnMut(&Txid, &TxWithHex),
{
self.txs.retain(|txid, tx| {
if keep(txid) {
true
} else {
on_remove(txid, tx);
false
}
});
}
}

View File

@@ -1,7 +0,0 @@
mod pool_index;
mod tx_index;
mod tx_with_hex;
pub use pool_index::PoolIndex;
pub use tx_index::TxIndex;
pub use tx_with_hex::TxWithHex;

View File

@@ -1,26 +0,0 @@
use brk_types::Transaction;
/// A transaction with its raw hex representation
#[derive(Debug, Clone)]
pub struct TxWithHex {
tx: Transaction,
hex: String,
}
impl TxWithHex {
pub fn new(tx: Transaction, hex: String) -> Self {
Self { tx, hex }
}
pub fn tx(&self) -> &Transaction {
&self.tx
}
pub fn hex(&self) -> &str {
&self.hex
}
pub fn into_parts(self) -> (Transaction, String) {
(self.tx, self.hex)
}
}