mmpl: new, mempool + rpc: fixes

This commit is contained in:
nym21
2026-05-14 13:59:15 +02:00
parent 528c134f26
commit 90aca2e048
36 changed files with 1269 additions and 453 deletions

21
crates/mmpl/Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[package]
name = "mmpl"
description = "A CLI to stream Bitcoin mempool events as NDJSON"
version.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[dependencies]
brk_error = { workspace = true }
brk_mempool = { workspace = true }
brk_rpc = { workspace = true }
brk_types = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
[[bin]]
name = "mmpl"
path = "src/main.rs"

83
crates/mmpl/src/args.rs Normal file
View File

@@ -0,0 +1,83 @@
use std::path::PathBuf;
use brk_error::{Error, Result};
use brk_rpc::{Auth, Client};
pub struct Args {
bitcoindir: Option<PathBuf>,
rpcconnect: Option<String>,
rpcport: Option<u16>,
rpccookiefile: Option<PathBuf>,
rpcuser: Option<String>,
rpcpassword: Option<String>,
}
impl Args {
pub fn parse(raw: Vec<String>) -> Result<Self> {
let mut bitcoindir = None;
let mut rpcconnect = None;
let mut rpcport = None;
let mut rpccookiefile = None;
let mut rpcuser = None;
let mut rpcpassword = None;
let mut iter = raw.into_iter();
while let Some(a) = iter.next() {
let rest = a
.strip_prefix("--")
.ok_or_else(|| Error::Parse(format!("unexpected arg: '{a}'")))?;
let (key, value) = match rest.split_once('=') {
Some((k, v)) => (k.to_string(), v.to_string()),
None => (
rest.to_string(),
iter.next()
.ok_or_else(|| Error::Parse(format!("--{rest} requires a value")))?,
),
};
match key.as_str() {
"bitcoindir" => bitcoindir = Some(PathBuf::from(value)),
"rpcconnect" => rpcconnect = Some(value),
"rpcport" => {
rpcport = Some(value.parse().map_err(|_| {
Error::Parse(format!("--rpcport: '{value}' is not a valid port"))
})?);
}
"rpccookiefile" => rpccookiefile = Some(PathBuf::from(value)),
"rpcuser" => rpcuser = Some(value),
"rpcpassword" => rpcpassword = Some(value),
other => return Err(Error::Parse(format!("unknown flag --{other}"))),
}
}
Ok(Self {
bitcoindir,
rpcconnect,
rpcport,
rpccookiefile,
rpcuser,
rpcpassword,
})
}
pub fn rpc(&self) -> Result<Client> {
let host = self.rpcconnect.as_deref().unwrap_or("localhost");
let port = self.rpcport.unwrap_or(8332);
let url = format!("http://{host}:{port}");
let bitcoin_dir = self
.bitcoindir
.clone()
.unwrap_or_else(Client::default_bitcoin_path);
let cookie = self
.rpccookiefile
.clone()
.unwrap_or_else(|| bitcoin_dir.join(".cookie"));
let auth = if cookie.is_file() {
Auth::CookieFile(cookie)
} else if let (Some(u), Some(p)) = (self.rpcuser.as_deref(), self.rpcpassword.as_deref()) {
Auth::UserPass(u.to_string(), p.to_string())
} else {
return Err(Error::Parse(
"no RPC auth: cookie file missing and --rpcuser/--rpcpassword not set".into(),
));
};
Client::new(&url, auth)
}
}

105
crates/mmpl/src/emitter.rs Normal file
View File

@@ -0,0 +1,105 @@
//! Per-cycle NDJSON emitter. Owns the cycle-over-cycle memory used to
//! turn the always-fresh `Cycle` into change-only events for `tip`,
//! `block`, and `fees`.
use std::{
io::{self, Write},
time::{SystemTime, UNIX_EPOCH},
};
use brk_mempool::Cycle;
use brk_types::{Addr, AddrBytes, BlockHash, NextBlockHash, RecommendedFees, Txid};
use rustc_hash::FxHashSet;
use crate::event::Event;
/// Cycle-over-cycle memory for change-event detection. `None` on the
/// first cycle, so the very first `Tip` / `Block` / `Fees` always
/// fires - downstream consumers get a baseline without a special-case
/// "current state" RPC.
///
/// `prev_block0` is `None` on cold start so the first `block` event
/// reports the entire template as `added` (one big line, then small
/// deltas).
#[derive(Default)]
pub struct Emitter {
prev_tip_hash: Option<BlockHash>,
prev_next_block_hash: Option<NextBlockHash>,
prev_block0: Option<FxHashSet<Txid>>,
prev_fees: Option<RecommendedFees>,
}
impl Emitter {
/// Writes every event for one cycle and flushes once at the end.
/// Per-line flushes would cost one syscall per event on busy cycles;
/// the cycle period (~500ms) is the real "live" granularity.
pub fn emit<W: Write>(&mut self, out: &mut W, cycle: &Cycle) -> io::Result<()> {
let t = now_secs();
for tx in &cycle.added {
write_line(out, &Event::enter(t, tx))?;
}
for tx in &cycle.removed {
write_line(out, &Event::leave(t, tx))?;
}
for bytes in &cycle.addr_enters {
Self::emit_addr(out, t, bytes, Event::addr_enter)?;
}
for bytes in &cycle.addr_leaves {
Self::emit_addr(out, t, bytes, Event::addr_leave)?;
}
if self.prev_tip_hash != Some(cycle.tip_hash) {
self.prev_tip_hash = Some(cycle.tip_hash);
write_line(out, &Event::tip(t, cycle.tip_hash, cycle.tip_height))?;
}
let next_block_hash = cycle.snapshot.next_block_hash;
if self.prev_next_block_hash != Some(next_block_hash) {
self.prev_next_block_hash = Some(next_block_hash);
let current: FxHashSet<Txid> = cycle.snapshot.block0_txids().collect();
let (added, removed) = match &self.prev_block0 {
Some(prev) => (
current.difference(prev).copied().collect(),
prev.difference(&current).copied().collect(),
),
None => (current.iter().copied().collect(), Vec::new()),
};
write_line(out, &Event::block(t, next_block_hash, added, removed))?;
self.prev_block0 = Some(current);
}
if self.prev_fees.as_ref() != Some(&cycle.snapshot.fees) {
self.prev_fees = Some(cycle.snapshot.fees.clone());
write_line(out, &Event::fees(t, &cycle.snapshot.fees))?;
}
write_line(out, &Event::summary(t, cycle))?;
out.flush()
}
/// Render an `AddrBytes` and emit it via `make_event`. Unrenderable
/// bytes (e.g. exotic non-standard scripts) drop a one-line warning
/// to stderr - the event stream stays clean for downstream `jq`.
fn emit_addr<W: Write>(
out: &mut W,
t: f64,
bytes: &AddrBytes,
make_event: fn(f64, Addr) -> Event,
) -> io::Result<()> {
match Addr::try_from(bytes) {
Ok(addr) => write_line(out, &make_event(t, addr)),
Err(e) => {
eprintln!("mmpl: skipping addr event: {e}");
Ok(())
}
}
}
}
fn write_line<W: Write>(out: &mut W, ev: &Event) -> io::Result<()> {
serde_json::to_writer(&mut *out, ev).map_err(io::Error::other)?;
out.write_all(b"\n")
}
fn now_secs() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs_f64()
}

160
crates/mmpl/src/event.rs Normal file
View File

@@ -0,0 +1,160 @@
//! NDJSON event schema. One [`Event`] per line; consumers pipe to
//! `jq` / `grep` to filter. Per-event fields are flat (no nested
//! objects) so `jq -c 'select(...)'` works without `..` walks.
use brk_mempool::{Cycle, TxAdded, TxRemoval, TxRemoved};
use brk_types::{
Addr, BlockHash, FeeRate, Height, NextBlockHash, RecommendedFees, Sats, Timestamp, Txid, VSize,
};
use serde::Serialize;
#[derive(Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Event {
/// A tx entered the pool this cycle (either brand new or revived
/// from the graveyard - the stream collapses both to one event).
Enter {
t: f64,
txid: Txid,
vsize: VSize,
fee: Sats,
rate: FeeRate,
first_seen: Timestamp,
},
/// A tx left the pool this cycle. `rate` is the package-effective
/// rate at burial, not raw fee/vsize.
Leave {
t: f64,
txid: Txid,
#[serde(flatten)]
reason: LeaveReason,
rate: FeeRate,
},
/// An address went 0 → 1+ live mempool txs this cycle. Same-cycle
/// flip-flops are collapsed by the upstream tracker (no event).
AddrEnter { t: f64, addr: Addr },
/// An address went 1+ → 0 live mempool txs this cycle.
AddrLeave { t: f64, addr: Addr },
/// New confirmed block: bitcoind's chain tip moved since the last
/// cycle. `height` is the tip's own height (one less than the next
/// block being templated).
Tip {
t: f64,
hash: BlockHash,
height: Height,
},
/// The projected next block changed (different tx set or order).
/// `hash` is the same opaque content hash used as the mempool ETag.
/// `added`/`removed` is the txid-level diff against the previous
/// template; on the very first cycle `added` is the full template
/// and `removed` is empty.
Block {
t: f64,
hash: NextBlockHash,
added: Vec<Txid>,
removed: Vec<Txid>,
},
/// Recommended fee rates changed since the last cycle.
Fees {
t: f64,
fastest: FeeRate,
half_hour: FeeRate,
hour: FeeRate,
economy: FeeRate,
minimum: FeeRate,
},
/// Per-cycle heartbeat. Always emitted, even on idle cycles, so
/// downstream consumers see a steady pulse and can spot stalls.
/// `addr_enters`/`addr_leaves` count the post-cancellation 0↔1+
/// address transitions this cycle.
Cycle {
t: f64,
added: usize,
removed: usize,
addr_enters: usize,
addr_leaves: usize,
count: usize,
vsize: VSize,
fee: Sats,
took_ms: u64,
},
}
#[derive(Serialize)]
#[serde(tag = "reason", rename_all = "snake_case")]
pub enum LeaveReason {
Replaced { by: Txid },
Vanished,
}
impl Event {
pub fn enter(t: f64, tx: &TxAdded) -> Self {
Self::Enter {
t,
txid: tx.txid,
vsize: tx.vsize,
fee: tx.fee,
rate: tx.fee_rate,
first_seen: tx.first_seen,
}
}
pub fn leave(t: f64, tx: &TxRemoved) -> Self {
Self::Leave {
t,
txid: tx.txid,
reason: LeaveReason::from(tx.reason),
rate: tx.chunk_rate,
}
}
pub fn addr_enter(t: f64, addr: Addr) -> Self {
Self::AddrEnter { t, addr }
}
pub fn addr_leave(t: f64, addr: Addr) -> Self {
Self::AddrLeave { t, addr }
}
pub fn tip(t: f64, hash: BlockHash, height: Height) -> Self {
Self::Tip { t, hash, height }
}
pub fn block(t: f64, hash: NextBlockHash, added: Vec<Txid>, removed: Vec<Txid>) -> Self {
Self::Block { t, hash, added, removed }
}
pub fn fees(t: f64, fees: &RecommendedFees) -> Self {
Self::Fees {
t,
fastest: fees.fastest_fee,
half_hour: fees.half_hour_fee,
hour: fees.hour_fee,
economy: fees.economy_fee,
minimum: fees.minimum_fee,
}
}
pub fn summary(t: f64, cycle: &Cycle) -> Self {
Self::Cycle {
t,
added: cycle.added.len(),
removed: cycle.removed.len(),
addr_enters: cycle.addr_enters.len(),
addr_leaves: cycle.addr_leaves.len(),
count: cycle.info.count,
vsize: cycle.info.vsize,
fee: cycle.info.total_fee,
took_ms: cycle.took.as_millis() as u64,
}
}
}
impl From<TxRemoval> for LeaveReason {
fn from(reason: TxRemoval) -> Self {
match reason {
TxRemoval::Replaced { by } => Self::Replaced { by },
TxRemoval::Vanished => Self::Vanished,
}
}
}

61
crates/mmpl/src/main.rs Normal file
View File

@@ -0,0 +1,61 @@
mod args;
mod emitter;
mod event;
mod usage;
use std::{
io::{self, BufWriter},
process::ExitCode,
thread,
time::{Duration, Instant},
};
use brk_error::Result;
use brk_mempool::Mempool;
use args::Args;
use emitter::Emitter;
const PERIOD: Duration = Duration::from_millis(500);
fn main() -> ExitCode {
match run() {
Ok(()) => ExitCode::SUCCESS,
Err(e) => {
eprintln!("mmpl: {e}");
ExitCode::from(1)
}
}
}
fn run() -> Result<()> {
let raw: Vec<String> = std::env::args().skip(1).collect();
if raw.iter().any(|a| matches!(a.as_str(), "-h" | "--help")) {
usage::print();
return Ok(());
}
let args = Args::parse(raw)?;
let client = args.rpc()?;
let mempool = Mempool::new(&client);
let stdout = io::stdout();
let mut out = BufWriter::new(stdout.lock());
let mut emitter = Emitter::default();
loop {
let started = Instant::now();
match mempool.tick() {
Ok(cycle) => match emitter.emit(&mut out, &cycle) {
Ok(()) => {}
// Broken pipe (e.g. `mmpl | head`) is a normal end-of-stream.
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
Err(e) => return Err(e.into()),
},
// Transient RPC failure - log, then retry on the next tick.
Err(e) => eprintln!("mmpl: tick failed: {e}"),
}
if let Some(rest) = PERIOD.checked_sub(started.elapsed()) {
thread::sleep(rest);
}
}
}

49
crates/mmpl/src/usage.rs Normal file
View File

@@ -0,0 +1,49 @@
// Raw string contains `{`/`}` literals (JSON), so it can't be the
// format string of `print!`. Pass via positional arg.
#[allow(clippy::print_literal)]
pub fn print() {
print!(
"{}",
r#"mmpl - stream Bitcoin mempool events as NDJSON
Usage:
mmpl [options]
Options:
--bitcoindir <path> Bitcoin data dir (default: platform-specific)
--rpcconnect <host> RPC host (default: localhost)
--rpcport <port> RPC port (default: 8332)
--rpccookiefile <path> Cookie file (default: <bitcoindir>/.cookie)
--rpcuser <user> RPC username (if no cookie file)
--rpcpassword <pass> RPC password (if no cookie file)
-h, --help Show this help
Events (one JSON object per line):
Per-tx (one event per change):
{"kind":"enter","t":..,"txid":..,"vsize":..,"fee":..,"rate":..,"first_seen":..}
{"kind":"leave","t":..,"txid":..,"reason":"vanished","rate":..}
{"kind":"leave","t":..,"txid":..,"reason":"replaced","by":..,"rate":..}
Per-address (0 <-> 1+ live mempool txs):
{"kind":"addr_enter","t":..,"addr":..}
{"kind":"addr_leave","t":..,"addr":..}
State changes (fires only when the value changed):
{"kind":"tip","t":..,"hash":..,"height":..} (new confirmed block)
{"kind":"block","t":..,"hash":..,"added":[txid..],"removed":[txid..]}
(next-block template changed; first cycle
emits the full template as `added`)
{"kind":"fees","t":..,"fastest":..,"half_hour":..,"hour":..,"economy":..,"minimum":..}
Per-cycle heartbeat (always emitted):
{"kind":"cycle","t":..,"added":N,"removed":N,"addr_enters":N,"addr_leaves":N,
"count":N,"vsize":N,"fee":N,"took_ms":N}
Examples:
mmpl | jq -c 'select(.kind=="enter" and .rate>=50)'
mmpl | jq -c 'select(.kind=="tip")'
mmpl | grep -v '"kind":"cycle"'
mmpl | jq -c 'select(.reason=="replaced")'
"#
);
}