From 90aca2e04875a339237fbf94633482d5679b399e Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 14 May 2026 13:59:15 +0200 Subject: [PATCH] mmpl: new, mempool + rpc: fixes --- Cargo.lock | 91 +++- Cargo.toml | 6 +- crates/blk/Cargo.toml | 2 +- crates/blk/src/args.rs | 11 +- crates/blk/src/fields.rs | 406 ++++++++++-------- crates/blk/src/formatter.rs | 6 +- crates/blk/src/main.rs | 10 +- crates/blk/src/mode.rs | 18 +- crates/blk/src/selector.rs | 20 +- crates/blk/src/usage.rs | 45 +- crates/brk_iterator/src/lib.rs | 2 +- crates/brk_mempool/src/cycle.rs | 57 +++ crates/brk_mempool/src/cycle_diff.rs | 11 + crates/brk_mempool/src/lib.rs | 69 ++- crates/brk_mempool/src/steps/applier.rs | 68 ++- .../brk_mempool/src/steps/fetcher/fetched.rs | 14 +- crates/brk_mempool/src/steps/fetcher/mod.rs | 50 +-- crates/brk_mempool/src/steps/mod.rs | 3 +- crates/brk_mempool/src/steps/preparer/mod.rs | 2 +- .../src/steps/preparer/tx_addition.rs | 19 +- .../src/steps/preparer/tx_removal.rs | 2 +- crates/brk_mempool/src/steps/prevouts.rs | 6 +- .../stores/addr_tracker/addr_transitions.rs | 41 ++ .../src/stores/addr_tracker/mod.rs | 57 ++- crates/brk_mempool/src/stores/mod.rs | 2 +- crates/brk_reader/src/canonical.rs | 2 +- crates/brk_rpc/src/lib.rs | 24 +- crates/brk_rpc/src/methods.rs | 195 ++++----- crates/brk_types/src/addr_bytes.rs | 2 +- crates/brk_types/src/recommended_fees.rs | 2 +- crates/mmpl/Cargo.toml | 21 + crates/mmpl/src/args.rs | 83 ++++ crates/mmpl/src/emitter.rs | 105 +++++ crates/mmpl/src/event.rs | 160 +++++++ crates/mmpl/src/main.rs | 61 +++ crates/mmpl/src/usage.rs | 49 +++ 36 files changed, 1269 insertions(+), 453 deletions(-) create mode 100644 crates/brk_mempool/src/cycle.rs create mode 100644 crates/brk_mempool/src/cycle_diff.rs create mode 100644 crates/brk_mempool/src/stores/addr_tracker/addr_transitions.rs create mode 100644 crates/mmpl/Cargo.toml create mode 100644 crates/mmpl/src/args.rs create mode 100644 crates/mmpl/src/emitter.rs create mode 100644 crates/mmpl/src/event.rs create mode 100644 crates/mmpl/src/main.rs create mode 100644 crates/mmpl/src/usage.rs diff --git a/Cargo.lock b/Cargo.lock index 4a421a57b..3f374b02e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -769,9 +769,9 @@ checksum = "1c53ba0f290bfc610084c05582d9c5d421662128fc69f4bf236707af6fd321b9" [[package]] name = "cc" -version = "1.2.61" +version = "1.2.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d" +checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" dependencies = [ "find-msvc-tools", "jobserver", @@ -969,9 +969,9 @@ dependencies = [ [[package]] name = "corepc-types" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1583872320eb2ac629c36753023fd072f1ca1b3b74b20cc62bab055b54278789" +checksum = "b96c7869aa8234d10a41cbe3a1697bcb3a2482c48d9eb3541b3a4014a81afdad" dependencies = [ "bitcoin", "serde", @@ -1501,9 +1501,9 @@ checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" [[package]] name = "hashbrown" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" [[package]] name = "heck" @@ -1511,6 +1511,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex-conservative" version = "0.2.2" @@ -1796,7 +1802,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.17.0", + "hashbrown 0.17.1", "serde", "serde_core", ] @@ -1810,6 +1816,23 @@ dependencies = [ "compare", ] +[[package]] +name = "is-terminal" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "is_ci" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7655c9839580ee829dfacba1d1278c2b7883e50a277ff7541299489d6bdfdc45" + [[package]] name = "itertools" version = "0.13.0" @@ -2019,9 +2042,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db9a0d582c2874f68138a16ce1867e0ffde6c0bb0a0df85e1f36d04146db488a" +checksum = "7ef0d4ed8669f8f8826eb00dc878084aa8f253506c4fd5e8f58f5bce72ddb97e" [[package]] name = "matchit" @@ -2086,6 +2109,19 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mmpl" +version = "0.3.0-beta.9" +dependencies = [ + "brk_error", + "brk_mempool", + "brk_rpc", + "brk_types", + "rustc-hash", + "serde", + "serde_json", +] + [[package]] name = "nom" version = "7.1.3" @@ -2155,6 +2191,10 @@ name = "owo-colors" version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d211803b9b6b570f68772237e415a029d5a50c65d382910b879fb19d3271f94d" +dependencies = [ + "supports-color 2.1.0", + "supports-color 3.0.2", +] [[package]] name = "parking_lot" @@ -2200,9 +2240,9 @@ dependencies = [ [[package]] name = "pco" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89d71ab3c07ed898defa4915bdc2a963131d811a1eab0eeacfac65c94cdeae8" +checksum = "553ccdc7f6999785559af4998c79712a5ab820e26b68bad9146609c19587ec82" dependencies = [ "better_io", "dtype_dispatch", @@ -2887,6 +2927,25 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "supports-color" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6398cde53adc3c4557306a96ce67b302968513830a77a95b2b17305d9719a89" +dependencies = [ + "is-terminal", + "is_ci", +] + +[[package]] +name = "supports-color" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c64fc7232dd8d2e4ac5ce4ef302b1d81e0b80d055b9d77c7c4f51f6aa4c867d6" +dependencies = [ + "is_ci", +] + [[package]] name = "syn" version = "2.0.117" @@ -3000,9 +3059,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.52.2" +version = "1.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "110a78583f19d5cdb2c5ccf321d1290344e71313c6c37d43520d386027d18386" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "libc", "mio", @@ -3302,7 +3361,7 @@ dependencies = [ "itoa", "libc", "log", - "lz4_flex 0.13.0", + "lz4_flex 0.13.1", "parking_lot", "pco", "rawdb", @@ -3819,9 +3878,9 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" dependencies = [ "zerofrom-derive", ] diff --git a/Cargo.toml b/Cargo.toml index 0e9bda973..c2a865861 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,14 +65,14 @@ brk_website = { version = "0.3.0-beta.9", path = "crates/brk_website" } byteview = "0.10.1" color-eyre = "0.6.5" corepc-jsonrpc = { package = "jsonrpc", version = "0.19.0", features = ["simple_http"], default-features = false } -corepc-types = { version = "0.12.0", features = ["std"], default-features = false } +corepc-types = { version = "0.13.0", features = ["std"], default-features = false } derive_more = { version = "2.1.1", features = ["deref", "deref_mut"] } fjall = "=3.0.4" indexmap = { version = "2.14.0", features = ["serde"] } jiff = { version = "0.2.24", features = ["perf-inline", "tz-system"], default-features = false } owo-colors = "4.3.0" parking_lot = "0.12.5" -pco = "1.0.1" +pco = "1.0.2" rayon = "1.12.0" rustc-hash = "2.1.2" schemars = { version = "1.2.1", features = ["indexmap2"] } @@ -81,7 +81,7 @@ serde_bytes = "0.11.19" serde_derive = "1.0.228" serde_json = { version = "1.0.149", features = ["float_roundtrip", "preserve_order"] } smallvec = "1.15.1" -tokio = { version = "1.52.2", features = ["rt-multi-thread"] } +tokio = { version = "1.52.3", features = ["rt-multi-thread"] } tower-http = { version = "0.6.10", features = ["catch-panic", "compression-br", "compression-gzip", "compression-zstd", "cors", "normalize-path", "timeout", "trace"] } tower-layer = "0.3" tracing = { version = "0.1", default-features = false, features = ["std"] } diff --git a/crates/blk/Cargo.toml b/crates/blk/Cargo.toml index bd7c8320f..faf86a356 100644 --- a/crates/blk/Cargo.toml +++ b/crates/blk/Cargo.toml @@ -13,7 +13,7 @@ brk_error = { workspace = true } brk_reader = { workspace = true } brk_rpc = { workspace = true } brk_types = { workspace = true } -owo-colors = { workspace = true } +owo-colors = { workspace = true, features = ["supports-colors"] } serde_json = { workspace = true } [[bin]] diff --git a/crates/blk/src/args.rs b/crates/blk/src/args.rs index 1e5a47c9f..2cb6ed1fe 100644 --- a/crates/blk/src/args.rs +++ b/crates/blk/src/args.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{collections::HashSet, path::PathBuf}; use brk_error::{Error, Result}; use brk_rpc::{Auth, Client}; @@ -66,6 +66,9 @@ impl Args { } continue; } + if a.starts_with('-') { + return Err(Error::Parse(format!("unknown flag {a}"))); + } positional.push(a); } @@ -74,6 +77,12 @@ impl Args { .next() .ok_or_else(|| Error::Parse("missing selector".into()))?; let paths: Vec = iter.map(|f| Path::parse(&f)).collect::>()?; + let mut seen = HashSet::with_capacity(paths.len()); + for p in &paths { + if !seen.insert(p.raw.as_str()) { + return Err(Error::Parse(format!("duplicate field '{}'", p.raw))); + } + } Ok(Self { selector, paths, diff --git a/crates/blk/src/fields.rs b/crates/blk/src/fields.rs index 844dce093..1fc1e25bf 100644 --- a/crates/blk/src/fields.rs +++ b/crates/blk/src/fields.rs @@ -6,29 +6,126 @@ use bitcoin::{ }; use brk_error::{Error, Result}; use brk_types::ReadBlock; -use serde_json::{Value, json}; +use serde_json::{Map, Value, json}; use crate::path::{Path, Step}; +// `hex` is intentionally absent: matches `bitcoin-cli getblock 2` +// and keeps NDJSON dumps tractable. Still reachable explicitly via `blk N hex`. +const BLOCK_FIELDS: &[&str] = &[ + "height", + "hash", + "version", + "version_hex", + "merkle", + "time", + "nonce", + "bits", + "difficulty", + "prev", + "txs", + "n_inputs", + "n_outputs", + "witness_txs", + "size", + "strippedsize", + "weight", + "subsidy", + "coinbase", + "coinbase_hex", + "header_hex", + "tx", +]; + +const TX_FIELDS: &[&str] = &[ + "txid", + "wtxid", + "version", + "locktime", + "size", + "base_size", + "vsize", + "weight", + "inputs", + "outputs", + "is_coinbase", + "has_witness", + "is_rbf", + "total_out", + "hex", + "vin", + "vout", +]; + +const VIN_FIELDS: &[&str] = &[ + "prev_txid", + "prev_vout", + "sequence", + "script_sig", + "script_sig_asm", + "witness", + "has_witness", + "is_rbf", + "coinbase", +]; + +const VOUT_FIELDS: &[&str] = &[ + "value", + "script_pubkey", + "script_pubkey_asm", + "type", + "address", +]; + pub struct Ctx<'a> { block: &'a ReadBlock, + network: Network, size_weight: OnceCell<(usize, usize)>, } impl<'a> Ctx<'a> { - pub fn new(block: &'a ReadBlock) -> Self { + pub fn new(block: &'a ReadBlock, network: Network) -> Self { Self { block, + network, size_weight: OnceCell::new(), } } pub fn resolve(&self, path: &Path) -> Result { let (step, rest) = pop(&path.steps)?; + self.block_field(&step.name, step.index, rest) + } + + pub fn resolve_str(&self, path: &Path) -> Result { + Ok(match self.resolve(path)? { + Value::String(s) => s, + other => other.to_string(), + }) + } + + pub fn full(&self) -> Value { + let mut obj = Map::with_capacity(BLOCK_FIELDS.len()); + for &name in BLOCK_FIELDS { + obj.insert( + name.into(), + self.block_field(name, None, &[]).expect("known block field"), + ); + } + Value::Object(obj) + } + + fn size_and_weight(&self) -> (usize, usize) { + *self + .size_weight + .get_or_init(|| self.block.total_size_and_weight()) + } + + fn block_field(&self, name: &str, index: Option, rest: &[Step]) -> Result { let b = self.block; let raw: &Block = b; - let scalar = |v| scalar_leaf(v, step, rest); - match step.name.as_str() { + let scalar = |v| scalar_leaf(v, name, index, rest); + match name { "height" => scalar(json!(*b.height())), "hash" => scalar(json!(b.hash().to_string())), "time" => scalar(json!(b.header.time)), @@ -37,7 +134,7 @@ impl<'a> Ctx<'a> { "{:08x}", b.header.version.to_consensus() as u32 ))), - "bits" => scalar(json!(b.header.bits.to_consensus())), + "bits" => scalar(json!(format!("{:08x}", b.header.bits.to_consensus()))), "nonce" => scalar(json!(b.header.nonce)), "prev" => scalar(json!(b.header.prev_blockhash.to_string())), "merkle" => scalar(json!(b.header.merkle_root.to_string())), @@ -62,102 +159,154 @@ impl<'a> Ctx<'a> { "header_hex" => scalar(json!(serialize_hex(&b.header))), "hex" => scalar(json!(serialize_hex(raw))), "coinbase" => scalar(json!(b.coinbase_tag().as_str())), - "tx" => pick(&b.txdata, step, rest, |i, tx| resolve_tx(tx, i == 0, rest)), + "coinbase_hex" => { + debug_assert!( + !b.txdata.is_empty() && !b.txdata[0].input.is_empty(), + "consensus-valid block has a coinbase tx with at least one input" + ); + scalar(json!(b.txdata[0].input[0].script_sig.to_hex_string())) + } + "tx" => pick(&b.txdata, name, index, |i, tx| { + self.resolve_tx(tx, i == 0, rest) + }), other => Err(unknown("block", other)), } } - pub fn resolve_str(&self, path: &Path) -> Result { - Ok(match self.resolve(path)? { - Value::String(s) => s, - other => other.to_string(), - }) + fn resolve_tx(&self, tx: &Transaction, is_coinbase: bool, steps: &[Step]) -> Result { + if steps.is_empty() { + let mut obj = Map::with_capacity(TX_FIELDS.len()); + for &name in TX_FIELDS { + obj.insert( + name.into(), + self.tx_field(tx, is_coinbase, name, None, &[]) + .expect("known tx field"), + ); + } + return Ok(Value::Object(obj)); + } + let (step, rest) = pop(steps)?; + self.tx_field(tx, is_coinbase, &step.name, step.index, rest) } - pub fn full(&self) -> Value { - let b = self.block; - let (size, weight) = self.size_and_weight(); - let tx: Vec = b - .txdata - .iter() - .enumerate() - .map(|(i, tx)| tx_to_value(tx, i == 0)) - .collect(); - json!({ - "height": *b.height(), - "hash": b.hash().to_string(), - "version": b.header.version.to_consensus(), - "version_hex": format!("{:08x}", b.header.version.to_consensus() as u32), - "merkle": b.header.merkle_root.to_string(), - "time": b.header.time, - "nonce": b.header.nonce, - "bits": b.header.bits.to_consensus(), - "difficulty": b.header.difficulty_float(), - "prev": b.header.prev_blockhash.to_string(), - "txs": b.txdata.len(), - "n_inputs": b.txdata.iter().map(|t| t.input.len()).sum::(), - "n_outputs": b.txdata.iter().map(|t| t.output.len()).sum::(), - "witness_txs": b.txdata.iter().filter(|t| tx_has_witness(t)).count(), - "size": size, - "strippedsize": (weight - size) / 3, - "weight": weight, - "subsidy": subsidy_sats(*b.height()), - "coinbase": b.coinbase_tag().as_str(), - "header_hex": serialize_hex(&b.header), - "tx": tx, - }) + fn tx_field( + &self, + tx: &Transaction, + is_coinbase: bool, + name: &str, + index: Option, + rest: &[Step], + ) -> Result { + let scalar = |v| scalar_leaf(v, name, index, rest); + match name { + "txid" => scalar(json!(tx.compute_txid().to_string())), + "wtxid" => scalar(json!(tx.compute_wtxid().to_string())), + "version" => scalar(json!(tx.version.0)), + "locktime" => scalar(json!(tx.lock_time.to_consensus_u32())), + "size" => scalar(json!(tx.total_size())), + "base_size" => scalar(json!(tx.base_size())), + "vsize" => scalar(json!(tx.vsize())), + "weight" => scalar(json!(tx.weight().to_wu())), + "inputs" => scalar(json!(tx.input.len())), + "outputs" => scalar(json!(tx.output.len())), + "is_coinbase" => scalar(json!(is_coinbase)), + "has_witness" => scalar(json!(tx_has_witness(tx))), + "is_rbf" => scalar(json!(tx_is_rbf(tx))), + "total_out" => scalar(json!(tx_total_out(tx))), + "hex" => scalar(json!(serialize_hex(tx))), + "vin" => pick(&tx.input, name, index, |j, vin| { + resolve_vin(vin, is_coinbase && j == 0, rest) + }), + "vout" => pick(&tx.output, name, index, |_, vout| { + self.resolve_vout(vout, rest) + }), + other => Err(unknown("tx", other)), + } } - fn size_and_weight(&self) -> (usize, usize) { - *self - .size_weight - .get_or_init(|| self.block.total_size_and_weight()) + fn resolve_vout(&self, vout: &TxOut, steps: &[Step]) -> Result { + if steps.is_empty() { + let mut obj = Map::with_capacity(VOUT_FIELDS.len()); + for &name in VOUT_FIELDS { + obj.insert( + name.into(), + self.vout_field(vout, name, None, &[]) + .expect("known vout field"), + ); + } + return Ok(Value::Object(obj)); + } + let (step, rest) = pop(steps)?; + self.vout_field(vout, &step.name, step.index, rest) } -} -fn resolve_tx(tx: &Transaction, is_coinbase: bool, steps: &[Step]) -> Result { - if steps.is_empty() { - return Ok(tx_to_value(tx, is_coinbase)); + fn vout_field( + &self, + vout: &TxOut, + name: &str, + index: Option, + rest: &[Step], + ) -> Result { + let scalar = |v| scalar_leaf(v, name, index, rest); + match name { + "value" => scalar(json!(vout.value.to_sat())), + "script_pubkey" => scalar(json!(vout.script_pubkey.to_hex_string())), + "script_pubkey_asm" => scalar(json!(vout.script_pubkey.to_asm_string())), + "type" => scalar(json!(script_type(&vout.script_pubkey))), + "address" => scalar(self.address_value(&vout.script_pubkey)), + other => Err(unknown("vout", other)), + } } - let (step, rest) = pop(steps)?; - let scalar = |v| scalar_leaf(v, step, rest); - match step.name.as_str() { - "txid" => scalar(json!(tx.compute_txid().to_string())), - "wtxid" => scalar(json!(tx.compute_wtxid().to_string())), - "version" => scalar(json!(tx.version.0)), - "locktime" => scalar(json!(tx.lock_time.to_consensus_u32())), - "size" => scalar(json!(tx.total_size())), - "base_size" => scalar(json!(tx.base_size())), - "vsize" => scalar(json!(tx.vsize())), - "weight" => scalar(json!(tx.weight().to_wu())), - "inputs" => scalar(json!(tx.input.len())), - "outputs" => scalar(json!(tx.output.len())), - "is_coinbase" => scalar(json!(is_coinbase)), - "has_witness" => scalar(json!(tx_has_witness(tx))), - "is_rbf" => scalar(json!(tx_is_rbf(tx))), - "total_out" => scalar(json!(tx_total_out(tx))), - "hex" => scalar(json!(serialize_hex(tx))), - "vin" => pick(&tx.input, step, rest, |j, vin| { - resolve_vin(vin, is_coinbase && j == 0, rest) - }), - "vout" => pick(&tx.output, step, rest, |_, vout| resolve_vout(vout, rest)), - other => Err(unknown("tx", other)), + + fn address_value(&self, s: &ScriptBuf) -> Value { + Address::from_script(s, self.network) + .map(|a| Value::String(a.to_string())) + .unwrap_or(Value::Null) } } fn resolve_vin(vin: &TxIn, is_coinbase: bool, steps: &[Step]) -> Result { if steps.is_empty() { - return Ok(vin_to_value(vin, is_coinbase)); + let mut obj = Map::with_capacity(VIN_FIELDS.len()); + for &name in VIN_FIELDS { + obj.insert( + name.into(), + vin_field(vin, is_coinbase, name, None, &[]).expect("known vin field"), + ); + } + return Ok(Value::Object(obj)); } let (step, rest) = pop(steps)?; - let scalar = |v| scalar_leaf(v, step, rest); - match step.name.as_str() { + vin_field(vin, is_coinbase, &step.name, step.index, rest) +} + +fn vin_field( + vin: &TxIn, + is_coinbase: bool, + name: &str, + index: Option, + rest: &[Step], +) -> Result { + let scalar = |v| scalar_leaf(v, name, index, rest); + match name { "prev_txid" => scalar(json!(vin.previous_output.txid.to_string())), "prev_vout" => scalar(json!(vin.previous_output.vout)), "sequence" => scalar(json!(vin.sequence.0)), "script_sig" => scalar(json!(vin.script_sig.to_hex_string())), "script_sig_asm" => scalar(json!(vin.script_sig.to_asm_string())), - "witness" => scalar(witness_to_value(vin)), + "witness" => { + if !rest.is_empty() { + return Err(Error::Parse( + "'witness' element has no fields to drill into".into(), + )); + } + let items: Vec = vin + .witness + .iter() + .map(|w| w.to_lower_hex_string()) + .collect(); + pick(&items, name, index, |_, hex| Ok(Value::String(hex.clone()))) + } "has_witness" => scalar(json!(!vin.witness.is_empty())), "is_rbf" => scalar(json!(vin.sequence.is_rbf())), "coinbase" => scalar(json!(is_coinbase)), @@ -165,33 +314,17 @@ fn resolve_vin(vin: &TxIn, is_coinbase: bool, steps: &[Step]) -> Result { } } -fn resolve_vout(vout: &TxOut, steps: &[Step]) -> Result { - if steps.is_empty() { - return Ok(vout_to_value(vout)); - } - let (step, rest) = pop(steps)?; - let scalar = |v| scalar_leaf(v, step, rest); - match step.name.as_str() { - "value" => scalar(json!(vout.value.to_sat())), - "script_pubkey" => scalar(json!(vout.script_pubkey.to_hex_string())), - "script_pubkey_asm" => scalar(json!(vout.script_pubkey.to_asm_string())), - "type" => scalar(json!(script_type(&vout.script_pubkey))), - "address" => scalar(address_value(&vout.script_pubkey)), - other => Err(unknown("vout", other)), - } -} - fn pick( items: &[T], - step: &Step, - _rest: &[Step], + name: &str, + index: Option, mut resolve: impl FnMut(usize, &T) -> Result, ) -> Result { - match step.index { + match index { Some(i) => { let item = items .get(i) - .ok_or_else(|| out_of_range(&step.name, i, items.len()))?; + .ok_or_else(|| out_of_range(name, i, items.len()))?; resolve(i, item) } None => Ok(Value::Array( @@ -210,14 +343,13 @@ fn pop(steps: &[Step]) -> Result<(&Step, &[Step])> { .ok_or_else(|| Error::Parse("empty path segment".into())) } -fn scalar_leaf(v: Value, step: &Step, rest: &[Step]) -> Result { - if step.index.is_some() { - return Err(Error::Parse(format!("'{}' is not an array", step.name))); +fn scalar_leaf(v: Value, name: &str, index: Option, rest: &[Step]) -> Result { + if index.is_some() { + return Err(Error::Parse(format!("'{name}' is not an array"))); } if !rest.is_empty() { return Err(Error::Parse(format!( - "'{}' is a scalar; nothing to drill into", - step.name + "'{name}' has no fields to drill into" ))); } Ok(v) @@ -233,59 +365,6 @@ fn unknown(level: &str, name: &str) -> Error { )) } -fn tx_to_value(tx: &Transaction, is_coinbase: bool) -> Value { - let vin: Vec = tx - .input - .iter() - .enumerate() - .map(|(j, v)| vin_to_value(v, is_coinbase && j == 0)) - .collect(); - let vout: Vec = tx.output.iter().map(vout_to_value).collect(); - json!({ - "txid": tx.compute_txid().to_string(), - "wtxid": tx.compute_wtxid().to_string(), - "version": tx.version.0, - "locktime": tx.lock_time.to_consensus_u32(), - "size": tx.total_size(), - "base_size": tx.base_size(), - "vsize": tx.vsize(), - "weight": tx.weight().to_wu(), - "inputs": tx.input.len(), - "outputs": tx.output.len(), - "is_coinbase": is_coinbase, - "has_witness": tx_has_witness(tx), - "is_rbf": tx_is_rbf(tx), - "total_out": tx_total_out(tx), - "hex": serialize_hex(tx), - "vin": vin, - "vout": vout, - }) -} - -fn vin_to_value(vin: &TxIn, is_coinbase: bool) -> Value { - json!({ - "prev_txid": vin.previous_output.txid.to_string(), - "prev_vout": vin.previous_output.vout, - "sequence": vin.sequence.0, - "script_sig": vin.script_sig.to_hex_string(), - "script_sig_asm": vin.script_sig.to_asm_string(), - "witness": witness_to_value(vin), - "has_witness": !vin.witness.is_empty(), - "is_rbf": vin.sequence.is_rbf(), - "coinbase": is_coinbase, - }) -} - -fn vout_to_value(vout: &TxOut) -> Value { - json!({ - "value": vout.value.to_sat(), - "script_pubkey": vout.script_pubkey.to_hex_string(), - "script_pubkey_asm": vout.script_pubkey.to_asm_string(), - "type": script_type(&vout.script_pubkey), - "address": address_value(&vout.script_pubkey), - }) -} - fn tx_has_witness(tx: &Transaction) -> bool { tx.input.iter().any(|i| !i.witness.is_empty()) } @@ -307,15 +386,6 @@ fn subsidy_sats(height: u32) -> u64 { } } -fn witness_to_value(vin: &TxIn) -> Value { - Value::Array( - vin.witness - .iter() - .map(|w| Value::String(w.to_lower_hex_string())) - .collect(), - ) -} - fn script_type(s: &ScriptBuf) -> &'static str { if s.is_p2pkh() { "p2pkh" @@ -335,9 +405,3 @@ fn script_type(s: &ScriptBuf) -> &'static str { "unknown" } } - -fn address_value(s: &ScriptBuf) -> Value { - Address::from_script(s, Network::Bitcoin) - .map(|a| Value::String(a.to_string())) - .unwrap_or(Value::Null) -} diff --git a/crates/blk/src/formatter.rs b/crates/blk/src/formatter.rs index d21302640..40f065843 100644 --- a/crates/blk/src/formatter.rs +++ b/crates/blk/src/formatter.rs @@ -15,16 +15,18 @@ impl Formatter { pub fn format(&self, ctx: &Ctx) -> Result { match self.mode { - Mode::Bare => self.bare(ctx), + Mode::Bare => self.bare(ctx, false), Mode::Tsv => self.tsv(ctx), Mode::Json => Ok(serde_json::to_string(&self.object(ctx)?)?), + Mode::Pretty if self.fields.len() == 1 => self.bare(ctx, true), Mode::Pretty => Ok(serde_json::to_string_pretty(&self.object(ctx)?)?), } } - fn bare(&self, ctx: &Ctx) -> Result { + fn bare(&self, ctx: &Ctx, pretty: bool) -> Result { Ok(match ctx.resolve(&self.fields[0])? { Value::String(s) => s, + other if pretty => serde_json::to_string_pretty(&other)?, other => other.to_string(), }) } diff --git a/crates/blk/src/main.rs b/crates/blk/src/main.rs index 973ed7437..852a10170 100644 --- a/crates/blk/src/main.rs +++ b/crates/blk/src/main.rs @@ -37,17 +37,19 @@ fn run() -> Result<()> { let client = args.rpc()?; let (start, end) = Selector::parse(&args.selector, &client)?; + let network = client.get_network()?; - let mode = Mode::pick(args.pretty, args.compact, args.paths.len()); + let mode = Mode::pick(args.pretty, args.compact, args.paths.len())?; let reader = Reader::new(args.blocks_dir(), &client); let formatter = Formatter::new(mode, args.paths); - let parser_threads = std::thread::available_parallelism() + let parser_threads = (std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(2) - / 2; + / 2) + .max(1); for block in reader.range_with(start, end, parser_threads)?.iter() { let block = block?; - let line = formatter.format(&Ctx::new(&block))?; + let line = formatter.format(&Ctx::new(&block, network))?; if !line.is_empty() { println!("{line}"); } diff --git a/crates/blk/src/mode.rs b/crates/blk/src/mode.rs index 3c7c982ca..371063f1a 100644 --- a/crates/blk/src/mode.rs +++ b/crates/blk/src/mode.rs @@ -1,3 +1,5 @@ +use brk_error::{Error, Result}; + #[derive(Clone, Copy)] pub enum Mode { Bare, @@ -7,8 +9,18 @@ pub enum Mode { } impl Mode { - pub fn pick(pretty: bool, compact: bool, n_fields: usize) -> Self { - if pretty { + pub fn pick(pretty: bool, compact: bool, n_fields: usize) -> Result { + if pretty && compact { + return Err(Error::Parse( + "--pretty and --compact are mutually exclusive".into(), + )); + } + if compact && n_fields == 0 { + return Err(Error::Parse( + "--compact requires at least one field".into(), + )); + } + Ok(if pretty { Self::Pretty } else if n_fields == 0 { Self::Json @@ -18,6 +30,6 @@ impl Mode { Self::Tsv } else { Self::Json - } + }) } } diff --git a/crates/blk/src/selector.rs b/crates/blk/src/selector.rs index fa62f96b5..85bc5a496 100644 --- a/crates/blk/src/selector.rs +++ b/crates/blk/src/selector.rs @@ -6,13 +6,15 @@ pub struct Selector; impl Selector { pub fn parse(s: &str, client: &Client) -> Result<(Height, Height)> { - let (start, end) = match s.split_once("..") { - Some((a, b)) => (Self::endpoint(a, client)?, Self::endpoint(b, client)?), - None => { - let h = Self::endpoint(s, client)?; - (h, h) - } + let (a, b) = s.split_once("..").unwrap_or((s, s)); + let needs_tip = |p: &str| p == "tip" || p.starts_with("tip-"); + let tip = if needs_tip(a) || needs_tip(b) { + Some(client.get_last_height()?) + } else { + None }; + let start = Self::endpoint(a, tip)?; + let end = Self::endpoint(b, tip)?; if end < start { return Err(Error::Parse(format!( "range end {end} before start {start}" @@ -21,15 +23,15 @@ impl Selector { Ok((start, end)) } - fn endpoint(s: &str, client: &Client) -> Result { + fn endpoint(s: &str, tip: Option) -> Result { if s == "tip" { - return client.get_last_height(); + return Ok(tip.expect("tip pre-resolved when input contains 'tip'")); } if let Some(rest) = s.strip_prefix("tip-") { let n: u32 = rest .parse() .map_err(|_| Error::Parse(format!("bad tip offset: {s}")))?; - let tip = client.get_last_height()?; + let tip = tip.expect("tip pre-resolved when input contains 'tip'"); return tip .checked_sub(n) .ok_or_else(|| Error::Parse(format!("tip-{n} underflows genesis"))); diff --git a/crates/blk/src/usage.rs b/crates/blk/src/usage.rs index c085a21ee..eb6e9fe6f 100644 --- a/crates/blk/src/usage.rs +++ b/crates/blk/src/usage.rs @@ -1,4 +1,4 @@ -use owo_colors::OwoColorize; +use owo_colors::{OwoColorize, Stream}; const SEL_W: usize = 5; // longest selector token: "tip-N" const LABEL_W: usize = 28; // longest label across OUTPUT/OPTIONS/EXAMPLES (= example cmd "blk 800000 tx.0.vout.0.value") @@ -7,18 +7,18 @@ const PH_W: usize = LABEL_W - FLAG_W - 1; // placeholder column width so flag+ph const GAP: usize = 4; pub fn print() { - println!("{} - inspect a Bitcoin Core block", "blk".bold()); + println!("{} - inspect a Bitcoin Core block", bold("blk")); println!(); section("USAGE"); println!( " blk {} [{} ...] [OPTIONS]", - "".bright_black(), - "".bright_black() + dim(""), + dim("") ); println!( " {}", - "no fields = full block as JSON (analog of `bitcoin-cli getblock 2`)".bright_black() + dim("no fields = full block as JSON (analog of `bitcoin-cli getblock 2`)") ); println!(); @@ -32,15 +32,15 @@ pub fn print() { section("FIELDS"); println!( " {}", - "dotted paths drill into nested data; omit an index for arrays".bright_black() + dim("dotted paths drill into nested data, omit an index for arrays") ); println!(); group("block"); fields(&[ "height, hash, time, version, version_hex, bits, nonce,", "prev, merkle, difficulty, txs, n_inputs, n_outputs,", - "witness_txs, size, strippedsize, weight, subsidy, coinbase,", - "header_hex, hex", + "witness_txs, size, strippedsize, weight, subsidy,", + "coinbase, coinbase_hex, header_hex, hex", ]); println!(); group_note("tx.i", "omit i for all txs"); @@ -61,14 +61,14 @@ pub fn print() { println!(); println!( " {}", - "Naked tx / tx.i / vin / vout returns the whole sub-object as JSON.".bright_black() + dim("Naked tx / tx.i / vin / vout returns the whole sub-object as JSON.") ); println!(); section("OUTPUT"); out("no fields", "full block JSON object, one per line (NDJSON)"); out("1 field", "bare value, one per line"); - out("2+ fields", "compact JSON object, one per line (NDJSON)"); + out("2+ fields", "JSON object, one per line (NDJSON)"); out("-p, --pretty", "pretty JSON object instead"); out( "-c, --compact", @@ -115,18 +115,18 @@ pub fn print() { } fn section(name: &str) { - println!("{}", format!("{name}:").bold()); + println!("{}", bold(&format!("{name}:"))); } fn group(name: &str) { - println!(" {}", format!("{name}:").bold()); + println!(" {}", bold(&format!("{name}:"))); } fn group_note(name: &str, note: &str) { println!( " {} {}", - format!("{name}:").bold(), - format!("({note})").bright_black() + bold(&format!("{name}:")), + dim(&format!("({note})")) ); } @@ -143,7 +143,7 @@ fn pad(s: &str, width: usize) -> String { fn sel(token: &str, desc: &str) { println!( " {}{}{}{desc}", - token.bright_black(), + dim(token), pad(token, SEL_W), " ".repeat(GAP), ); @@ -161,12 +161,12 @@ fn opt(flag: &str, ph: &str, desc: &str, default: Option<&str>) { let head = format!( " {flag}{} {}{}{}", pad(flag, FLAG_W), - ph.bright_black(), + dim(ph), pad(ph, PH_W), " ".repeat(GAP), ); match default { - Some(d) => println!("{head}{desc} {}", d.bright_black()), + Some(d) => println!("{head}{desc} {}", dim(d)), None => println!("{head}{desc}"), } } @@ -176,6 +176,15 @@ fn ex(cmd: &str, note: &str) { " {cmd}{}{}{}", pad(cmd, LABEL_W), " ".repeat(GAP), - format!("# {note}").bright_black() + dim(&format!("# {note}")) ); } + +fn bold(s: &str) -> String { + s.if_supports_color(Stream::Stdout, |t| t.bold()).to_string() +} + +fn dim(s: &str) -> String { + s.if_supports_color(Stream::Stdout, |t| t.bright_black()) + .to_string() +} diff --git a/crates/brk_iterator/src/lib.rs b/crates/brk_iterator/src/lib.rs index 195fddd3e..b7dc70258 100644 --- a/crates/brk_iterator/src/lib.rs +++ b/crates/brk_iterator/src/lib.rs @@ -118,7 +118,7 @@ impl Blocks { BlockRange::After { hash } => { let start = if let Some(hash) = hash.as_ref() { let block_info = client.get_block_header_info(hash)?; - (block_info.height + 1).into() + Height::from((block_info.height + 1) as u64) } else { Height::ZERO }; diff --git a/crates/brk_mempool/src/cycle.rs b/crates/brk_mempool/src/cycle.rs new file mode 100644 index 000000000..9a660eaca --- /dev/null +++ b/crates/brk_mempool/src/cycle.rs @@ -0,0 +1,57 @@ +//! Per-cycle event report returned by [`super::Mempool::tick`]. + +use std::{sync::Arc, time::Duration}; + +use brk_types::{AddrBytes, BlockHash, FeeRate, Height, MempoolInfo, Sats, Timestamp, Txid, VSize}; + +use crate::{Snapshot, TxRemoval}; + +/// One pull cycle's worth of changes. Produced by +/// [`super::Mempool::tick`] after fetch → prepare → apply → prevouts → +/// rebuild. The snapshot is always present (the rebuilder runs every +/// cycle); compare `next_block_hash` across cycles if you need to +/// detect whether the projection actually changed. +pub struct Cycle { + pub added: Vec, + pub removed: Vec, + /// Addresses that went from 0 → 1+ live mempool txs this cycle. + /// Same-cycle enter-then-leave is collapsed (no event in either list). + pub addr_enters: Vec, + /// Addresses that went from 1+ → 0 live mempool txs this cycle. + pub addr_leaves: Vec, + /// Latest confirmed block. Compare to the prior cycle's `tip_hash` + /// to detect a new block. + pub tip_hash: BlockHash, + pub tip_height: Height, + pub info: MempoolInfo, + pub snapshot: Arc, + pub took: Duration, +} + +#[derive(Debug, Clone, Copy)] +pub struct TxAdded { + pub txid: Txid, + pub fee: Sats, + pub vsize: VSize, + pub fee_rate: FeeRate, + pub first_seen: Timestamp, + pub kind: AddedKind, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AddedKind { + /// First time we've seen this txid. + Fresh, + /// Re-entered the pool after a prior removal still in the graveyard. + Revived, +} + +#[derive(Debug, Clone, Copy)] +pub struct TxRemoved { + pub txid: Txid, + pub reason: TxRemoval, + /// Package-effective rate at burial. Same value the tx reported + /// while alive - RBF predecessors keep their package rate, not a + /// misleading isolated fee/vsize. + pub chunk_rate: FeeRate, +} diff --git a/crates/brk_mempool/src/cycle_diff.rs b/crates/brk_mempool/src/cycle_diff.rs new file mode 100644 index 000000000..e922f79e5 --- /dev/null +++ b/crates/brk_mempool/src/cycle_diff.rs @@ -0,0 +1,11 @@ +//! Per-cycle accumulator threaded through the pipeline steps and +//! drained into the public [`crate::Cycle`] at end of cycle. + +use crate::{TxAdded, TxRemoved, stores::AddrTransitions}; + +#[derive(Default)] +pub struct CycleDiff { + pub added: Vec, + pub removed: Vec, + pub addrs: AddrTransitions, +} diff --git a/crates/brk_mempool/src/lib.rs b/crates/brk_mempool/src/lib.rs index 1e547a296..ce861b8cd 100644 --- a/crates/brk_mempool/src/lib.rs +++ b/crates/brk_mempool/src/lib.rs @@ -16,8 +16,8 @@ //! pass, using same-cycle in-mempool parents directly and the //! caller-supplied resolver (default: `getrawtransaction`) for //! confirmed parents. -//! 5. [`steps::rebuilder::Rebuilder`] - throttled rebuild of the -//! projected-blocks `Snapshot` from the same-cycle GBT and min fee. +//! 5. [`steps::rebuilder::Rebuilder`] - rebuild of the projected-blocks +//! `Snapshot` from the same-cycle GBT and min fee. use std::{ any::Any, @@ -45,20 +45,24 @@ use tracing::error; mod cluster; mod cpfp; +mod cycle; +mod cycle_diff; mod diagnostics; mod rbf; mod state; pub(crate) mod steps; pub(crate) mod stores; +pub use cycle::{AddedKind, Cycle, TxAdded, TxRemoved}; pub use diagnostics::MempoolStats; pub use rbf::{RbfForTx, RbfNode}; -pub use steps::Snapshot; +pub use steps::{Snapshot, TxRemoval}; use steps::{Applier, Fetched, Fetcher, Preparer, Prevouts, Rebuilder}; -pub(crate) use steps::{BlockStats, RecommendedFees, TxEntry, TxRemoval}; -pub(crate) use stores::{TxStore, TxTombstone}; +pub(crate) use cycle_diff::CycleDiff; +pub(crate) use steps::{BlockStats, RecommendedFees, TxEntry}; +pub(crate) use stores::{AddrTransitions, TxStore, TxTombstone}; -/// Confirmed-parent prevout resolver passed to [`Mempool::update_with`] / +/// Confirmed-parent prevout resolver passed to [`Mempool::tick_with`] / /// [`Mempool::start_with`]. Receives a slice of `(parent_txid, vout)` /// holes and returns the subset that resolved. Unresolved holes are /// simply omitted from the map; the next cycle retries automatically. @@ -326,7 +330,9 @@ impl Mempool { /// Infinite update loop with a 500ms interval. Resolves /// confirmed-parent prevouts via the default `getrawtransaction` - /// resolver; requires bitcoind started with `txindex=1`. + /// resolver; requires bitcoind started with `txindex=1`. Drops + /// per-cycle [`Cycle`] events on the floor - use [`Mempool::tick`] + /// to consume them. pub fn start(&self) { self.start_with(Prevouts::rpc_resolver(self.0.client.clone())); } @@ -355,7 +361,7 @@ impl Mempool { loop { let started = Instant::now(); let outcome = catch_unwind(AssertUnwindSafe(|| { - if let Err(e) = self.update_with(&resolver) { + if let Err(e) = self.tick_with(&resolver) { error!("update failed: {e}"); } })); @@ -371,14 +377,23 @@ impl Mempool { } } - /// One sync cycle: fetch, prepare, apply, fill prevouts, maybe - /// rebuild. The resolver MUST resolve confirmed prevouts only; - /// mempool-to-mempool chains are wired internally and the - /// resolver is never called for them. - fn update_with(&self, resolver: F) -> Result<()> + /// One sync cycle: fetch, prepare, apply, fill prevouts, rebuild. + /// Returns a [`Cycle`] reporting everything that changed. Uses the + /// default `getrawtransaction` resolver for confirmed-parent + /// prevouts (requires `txindex=1`). + pub fn tick(&self) -> Result { + self.tick_with(Prevouts::rpc_resolver(self.0.client.clone())) + } + + /// Variant of [`Mempool::tick`] with a caller-supplied resolver for + /// confirmed-parent prevouts. The resolver MUST resolve confirmed + /// prevouts only; mempool-to-mempool chains are wired internally + /// and the resolver is never called for them. + pub fn tick_with(&self, resolver: F) -> Result where F: Fn(&[(Txid, Vout)]) -> FxHashMap<(Txid, Vout), TxOut>, { + let started = Instant::now(); let Inner { client, state, @@ -387,18 +402,30 @@ impl Mempool { } = &*self.0; let Fetched { - live_txids, + state: rpc, new_entries, new_txs, - gbt_txids, - min_fee, + block_template_txids, } = Fetcher::fetch(client, state)?; - let pulled = Preparer::prepare(&live_txids, new_entries, new_txs, state); - Applier::apply(state, rebuilder, pulled); - Prevouts::fill(state, resolver); - rebuilder.tick(state, &gbt_txids, min_fee); + let pulled = Preparer::prepare(&rpc.live_txids, new_entries, new_txs, state); + let mut diff = CycleDiff::default(); + Applier::apply(state, rebuilder, pulled, &mut diff); + Prevouts::fill(state, &mut diff, resolver); + rebuilder.tick(state, &block_template_txids, rpc.min_fee); + let CycleDiff { added, removed, addrs } = diff; + let (addr_enters, addr_leaves) = addrs.into_vecs(); - Ok(()) + Ok(Cycle { + added, + removed, + addr_enters, + addr_leaves, + tip_hash: rpc.tip_hash, + tip_height: rpc.tip_height, + info: self.info(), + snapshot: rebuilder.snapshot(), + took: started.elapsed(), + }) } } diff --git a/crates/brk_mempool/src/steps/applier.rs b/crates/brk_mempool/src/steps/applier.rs index 70b876a60..3fa498193 100644 --- a/crates/brk_mempool/src/steps/applier.rs +++ b/crates/brk_mempool/src/steps/applier.rs @@ -2,7 +2,8 @@ use brk_types::{Transaction, TxidPrefix}; use parking_lot::RwLock; use crate::{ - State, TxEntry, TxRemoval, + AddrTransitions, CycleDiff, State, TxEntry, TxRemoval, + cycle::{TxAdded, TxRemoved}, steps::{ preparer::{TxAddition, TxsPulled}, rebuilder::{Rebuilder, Snapshot}, @@ -10,7 +11,8 @@ use crate::{ }; /// Applies a prepared diff to in-memory mempool state under one write -/// guard. Body proceeds: bury removed → publish added → evict. +/// guard. Body proceeds: bury removed → publish added → evict. Events +/// are pushed into the caller-supplied [`CycleDiff`] accumulator. pub struct Applier; impl Applier { @@ -19,44 +21,75 @@ impl Applier { /// package-aware via local linearization). The fallback to /// `entry.fee_rate()` is unreachable in steady state - every burial /// target was alive at the previous tick, so the snapshot has it. - pub fn apply(lock: &RwLock, rebuilder: &Rebuilder, pulled: TxsPulled) { + pub fn apply( + lock: &RwLock, + rebuilder: &Rebuilder, + pulled: TxsPulled, + diff: &mut CycleDiff, + ) { let TxsPulled { added, removed } = pulled; let mut state = lock.write(); - Self::bury_removals(&mut state, rebuilder, removed); - Self::publish_additions(&mut state, added); + Self::bury_removals(&mut state, rebuilder, &mut diff.addrs, &mut diff.removed, removed); + Self::publish_additions(&mut state, &mut diff.addrs, &mut diff.added, added); state.graveyard.evict_old(); } fn bury_removals( state: &mut State, rebuilder: &Rebuilder, + transitions: &mut AddrTransitions, + events: &mut Vec, removed: Vec<(TxidPrefix, TxRemoval)>, ) { let snapshot = rebuilder.snapshot(); + events.reserve(removed.len()); for (prefix, reason) in removed { - Self::bury_one(state, &snapshot, &prefix, reason); + if let Some(ev) = Self::bury_one(state, &snapshot, transitions, &prefix, reason) { + events.push(ev); + } } } - fn bury_one(state: &mut State, snapshot: &Snapshot, prefix: &TxidPrefix, reason: TxRemoval) { - let Some(record) = state.txs.remove_by_prefix(prefix) else { - return; - }; + fn bury_one( + state: &mut State, + snapshot: &Snapshot, + transitions: &mut AddrTransitions, + prefix: &TxidPrefix, + reason: TxRemoval, + ) -> Option { + let record = state.txs.remove_by_prefix(prefix)?; let chunk_rate = snapshot .chunk_rate_for(prefix) .unwrap_or_else(|| record.entry.fee_rate()); + let txid = record.entry.txid; state.info.remove(&record.tx, record.entry.fee); - state.addrs.remove_tx(&record.tx); + state.addrs.remove_tx(transitions, &record.tx); state.outpoint_spends.remove_spends(&record.tx, *prefix); state .graveyard .bury(record.tx, record.entry, chunk_rate, reason); + Some(TxRemoved { txid, reason, chunk_rate }) } - fn publish_additions(state: &mut State, added: Vec) { + fn publish_additions( + state: &mut State, + transitions: &mut AddrTransitions, + events: &mut Vec, + added: Vec, + ) { + events.reserve(added.len()); for addition in added { + let kind = addition.kind(); if let Some((tx, entry)) = Self::resolve_addition(state, addition) { - Self::publish_one(state, tx, entry); + events.push(TxAdded { + txid: entry.txid, + fee: entry.fee, + vsize: entry.vsize, + fee_rate: entry.fee_rate(), + first_seen: entry.first_seen, + kind, + }); + Self::publish_one(state, transitions, tx, entry); } } } @@ -71,10 +104,15 @@ impl Applier { } } - fn publish_one(state: &mut State, tx: Transaction, entry: TxEntry) { + fn publish_one( + state: &mut State, + transitions: &mut AddrTransitions, + tx: Transaction, + entry: TxEntry, + ) { let prefix = entry.txid_prefix(); state.info.add(&tx, entry.fee); - state.addrs.add_tx(&tx); + state.addrs.add_tx(transitions, &tx); state.outpoint_spends.insert_spends(&tx, prefix); state.txs.insert(tx, entry); } diff --git a/crates/brk_mempool/src/steps/fetcher/fetched.rs b/crates/brk_mempool/src/steps/fetcher/fetched.rs index aac3742dd..8c1a78060 100644 --- a/crates/brk_mempool/src/steps/fetcher/fetched.rs +++ b/crates/brk_mempool/src/steps/fetcher/fetched.rs @@ -1,10 +1,13 @@ -use brk_types::{FeeRate, MempoolEntryInfo, Txid}; +use brk_rpc::MempoolState; +use brk_types::{MempoolEntryInfo, Txid}; use rustc_hash::FxHashMap; pub struct Fetched { - /// Every txid currently in the mempool (from `getrawmempool false`). - /// Used to derive the `live` set for removal classification. - pub live_txids: Vec, + /// Passthrough fields from the batched RPC fetch: live txid set, + /// fee floor, chain tip. `live_txids` is the union of + /// `getrawmempool` and `getblocktemplate` (see [`super::Fetcher::fetch`]), + /// so downstream sees a single coherent "live" view. + pub state: MempoolState, /// `MempoolEntryInfo` for newly-observed txids only (existing ones /// keep their first-sight entry on the live store). pub new_entries: Vec, @@ -13,6 +16,5 @@ pub struct Fetched { /// already been folded into `new_entries`/`new_txs` (or were already /// in the pool); the Rebuilder only needs the txid sequence to /// project Core's exact selection. - pub gbt_txids: Vec, - pub min_fee: FeeRate, + pub block_template_txids: Vec, } diff --git a/crates/brk_mempool/src/steps/fetcher/mod.rs b/crates/brk_mempool/src/steps/fetcher/mod.rs index 15db713c7..f45820b88 100644 --- a/crates/brk_mempool/src/steps/fetcher/mod.rs +++ b/crates/brk_mempool/src/steps/fetcher/mod.rs @@ -3,7 +3,7 @@ mod fetched; pub use fetched::Fetched; use brk_error::Result; -use brk_rpc::{Client, MempoolState}; +use brk_rpc::Client; use brk_types::{MempoolEntryInfo, Timestamp, Txid, VSize}; use parking_lot::RwLock; use rustc_hash::FxHashSet; @@ -28,37 +28,31 @@ const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; /// Core's exact selection because we never ask for that data twice. /// /// Confirmed prevouts are resolved post-apply by the caller-supplied -/// resolver passed to `Mempool::update_with`, so the in-crate path no +/// resolver passed to `Mempool::tick_with`, so the in-crate path no /// longer issues a third batch for parents. pub struct Fetcher; impl Fetcher { pub fn fetch(client: &Client, lock: &RwLock) -> Result { - let MempoolState { - live_txids, - gbt, - min_fee, - } = client.fetch_mempool_state()?; + let (mut state, block_template) = client.fetch_mempool_state()?; // One read snapshot decides both the RPC fetch list and the // GBT-synthesis set, so they agree on what's "already known". - // Graveyard txs are treated as known so a re-broadcast still - // flows through `Preparer::classify_addition` and lands as - // [`crate::TxAddition::Revived`]. let (new_txids, gbt_synth_set) = { - let state = lock.read(); + let mempool = lock.read(); let mut gbt_txids: FxHashSet = - FxHashSet::with_capacity_and_hasher(gbt.len(), Default::default()); + FxHashSet::with_capacity_and_hasher(block_template.len(), Default::default()); let mut gbt_synth_set: FxHashSet = FxHashSet::default(); - for g in &gbt { + for g in &block_template { gbt_txids.insert(g.txid); - if !state.txs.contains(&g.txid) { + if !mempool.txs.contains(&g.txid) { gbt_synth_set.insert(g.txid); } } - let new_txids: Vec = live_txids + let new_txids: Vec = state + .live_txids .iter() - .filter(|t| !state.txs.contains(t) && !gbt_txids.contains(t)) + .filter(|t| !mempool.txs.contains(t) && !gbt_txids.contains(t)) .take(MAX_TX_FETCHES_PER_CYCLE) .copied() .collect(); @@ -69,17 +63,18 @@ impl Fetcher { new_entries.reserve(gbt_synth_set.len()); new_txs.reserve(gbt_synth_set.len()); - // Consume `gbt` by value: GBT-only txs move their body and - // depends into the synthesis path (no clones), and the GBT - // ordering is captured as a `Vec` for the Rebuilder, which - // is the only downstream consumer and only reads txids. + // Consume `block_template` by value: GBT-only txs move their + // body and depends into the synthesis path (no clones), and + // the GBT ordering is captured as a `Vec` for the + // Rebuilder, which is the only downstream consumer and only + // reads txids. // // GBT carries no per-tx arrival timestamp. `now` is correct to // within ~1 cycle for a tx that just entered Core's mempool // (the only kind that triggers synthesis: not in our pool yet // means it just appeared this cycle). let now = Timestamp::now(); - let gbt_txids: Vec = gbt + let block_template_txids: Vec = block_template .into_iter() .map(|g| { let txid = g.txid; @@ -98,12 +93,19 @@ impl Fetcher { }) .collect(); + // Promote `live_txids` to the union of `getrawmempool` and GBT: + // the two RPC views can disagree by a cycle, so a tx visible to + // GBT but missing from `getrawmempool` (or vice versa) is still + // alive. Without the union, GBT-only txs would oscillate enter ↔ + // leave every cycle as `Preparer::classify_removals` buried what + // GBT had just resurrected. + state.live_txids.extend(block_template_txids.iter().copied()); + Ok(Fetched { - live_txids, + state, new_entries, new_txs, - gbt_txids, - min_fee, + block_template_txids, }) } } diff --git a/crates/brk_mempool/src/steps/mod.rs b/crates/brk_mempool/src/steps/mod.rs index 58a0e2bcf..4fdfe5c71 100644 --- a/crates/brk_mempool/src/steps/mod.rs +++ b/crates/brk_mempool/src/steps/mod.rs @@ -9,7 +9,8 @@ mod rebuilder; pub(crate) use applier::Applier; pub(crate) use fetcher::{Fetched, Fetcher}; -pub(crate) use preparer::{Preparer, TxEntry, TxRemoval}; +pub(crate) use preparer::{Preparer, TxEntry}; +pub use preparer::TxRemoval; pub(crate) use prevouts::Prevouts; pub(crate) use rebuilder::{BlockStats, RecommendedFees, Rebuilder, SnapTx, TxIndex}; pub use rebuilder::Snapshot; diff --git a/crates/brk_mempool/src/steps/preparer/mod.rs b/crates/brk_mempool/src/steps/preparer/mod.rs index cd20380b9..7e2207c93 100644 --- a/crates/brk_mempool/src/steps/preparer/mod.rs +++ b/crates/brk_mempool/src/steps/preparer/mod.rs @@ -6,7 +6,7 @@ //! - **fresh** - decoded from `new_raws`, prevouts resolved against //! the live mempool only. Confirmed-parent prevouts land as //! `prevout: None` and are filled post-apply by the resolver passed -//! to `Mempool::update_with`. +//! to `Mempool::tick_with`. //! //! Existing entries are not re-classified - they keep their first-sight //! state on the live store. Removals are inferred by cross-referencing diff --git a/crates/brk_mempool/src/steps/preparer/tx_addition.rs b/crates/brk_mempool/src/steps/preparer/tx_addition.rs index 56bf51cee..8957db248 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_addition.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_addition.rs @@ -4,14 +4,14 @@ //! prevouts against the live mempool (same-cycle parents), build a //! full `Transaction` + `Entry`. Confirmed parents land as //! `prevout: None` and are filled post-apply by the resolver passed -//! to `Mempool::update_with`. +//! to `Mempool::tick_with`. //! - **Revived** - tx in the graveyard. Rebuild the `Entry` only //! (preserving `rbf`, `size`). The Applier exhumes the cached tx //! body. No raw decoding. use brk_types::{MempoolEntryInfo, SigOps, Transaction, TxIn, TxOut, TxStatus, Txid, Vout}; -use crate::{TxTombstone, stores::TxStore}; +use crate::{TxTombstone, cycle::AddedKind, stores::TxStore}; use super::TxEntry; @@ -21,9 +21,16 @@ pub enum TxAddition { } impl TxAddition { + pub fn kind(&self) -> AddedKind { + match self { + Self::Fresh { .. } => AddedKind::Fresh, + Self::Revived { .. } => AddedKind::Revived, + } + } + /// Resolves prevouts against the live mempool only. Confirmed /// parents land with `prevout: None` and are filled by the - /// resolver supplied to `Mempool::update_with` in the same cycle. + /// resolver supplied to `Mempool::tick_with` in the same cycle. pub(super) fn fresh( info: &MempoolEntryInfo, tx: bitcoin::Transaction, @@ -64,8 +71,12 @@ impl TxAddition { built } + /// Preserves the tomb's original `first_seen`: bitcoind resets the + /// timestamp on re-acceptance (and GBT synthesis carries "now"), but + /// the consumer wants the first-ever sighting, not the latest one. pub(super) fn revived(info: &MempoolEntryInfo, tomb: &TxTombstone) -> Self { - let entry = TxEntry::new(info, tomb.entry.size, tomb.entry.rbf); + let mut entry = TxEntry::new(info, tomb.entry.size, tomb.entry.rbf); + entry.first_seen = tomb.entry.first_seen; Self::Revived { entry } } diff --git a/crates/brk_mempool/src/steps/preparer/tx_removal.rs b/crates/brk_mempool/src/steps/preparer/tx_removal.rs index 451a9b2f1..4242fc878 100644 --- a/crates/brk_mempool/src/steps/preparer/tx_removal.rs +++ b/crates/brk_mempool/src/steps/preparer/tx_removal.rs @@ -11,7 +11,7 @@ use brk_types::Txid; /// `Vanished` = any other reason we can't distinguish from the data at /// hand (mined, expired, evicted, or replaced by a tx we didn't fetch /// due to the per-cycle fetch cap). -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum TxRemoval { Replaced { by: Txid }, Vanished, diff --git a/crates/brk_mempool/src/steps/prevouts.rs b/crates/brk_mempool/src/steps/prevouts.rs index 08068082c..2569677ff 100644 --- a/crates/brk_mempool/src/steps/prevouts.rs +++ b/crates/brk_mempool/src/steps/prevouts.rs @@ -26,7 +26,7 @@ use parking_lot::RwLock; use rustc_hash::{FxHashMap, FxHashSet}; use tracing::warn; -use crate::{State, stores::TxStore}; +use crate::{CycleDiff, State, stores::TxStore}; pub struct Prevouts; @@ -41,7 +41,7 @@ impl Prevouts { /// in-mempool parents are filled lock-locally; the remainder go /// through `resolver` (one batched call) outside any lock. Returns /// true iff anything was written. - pub fn fill(lock: &RwLock, resolver: F) -> bool + pub fn fill(lock: &RwLock, diff: &mut CycleDiff, resolver: F) -> bool where F: Fn(&[(Txid, Vout)]) -> Resolved, { @@ -59,7 +59,7 @@ impl Prevouts { for (txid, fills) in in_mempool.into_iter().chain(external) { let prefix = TxidPrefix::from(&txid); for prevout in state.txs.apply_fills(&prefix, fills) { - state.addrs.add_input(&txid, &prevout); + state.addrs.add_input(&mut diff.addrs, &txid, &prevout); } } true diff --git a/crates/brk_mempool/src/stores/addr_tracker/addr_transitions.rs b/crates/brk_mempool/src/stores/addr_tracker/addr_transitions.rs new file mode 100644 index 000000000..5a7aaa626 --- /dev/null +++ b/crates/brk_mempool/src/stores/addr_tracker/addr_transitions.rs @@ -0,0 +1,41 @@ +//! Per-cycle 0↔1+ address transition buffer. +//! +//! Lives on the stack inside [`crate::Mempool::tick_with`], not on a +//! long-lived store, so the set naturally resets between cycles. +//! Same-cycle cancellation (enter→leave, leave→enter, and the 3-step +//! enter→leave→enter / leave→enter→leave variants) is encapsulated on +//! the recording methods so callers just announce raw 0↔1+ flips. + +use brk_types::AddrBytes; +use rustc_hash::FxHashSet; + +#[derive(Default)] +pub struct AddrTransitions { + enters: FxHashSet, + leaves: FxHashSet, +} + +impl AddrTransitions { + /// Address just went 0 → 1+ live mempool txs. Cancels a pending + /// `leave` for the same address in this cycle. + pub fn record_enter(&mut self, bytes: AddrBytes) { + if !self.leaves.remove(&bytes) { + self.enters.insert(bytes); + } + } + + /// Address just went 1+ → 0 live mempool txs. Cancels a pending + /// `enter` for the same address in this cycle. + pub fn record_leave(&mut self, bytes: AddrBytes) { + if !self.enters.remove(&bytes) { + self.leaves.insert(bytes); + } + } + + pub fn into_vecs(self) -> (Vec, Vec) { + ( + self.enters.into_iter().collect(), + self.leaves.into_iter().collect(), + ) + } +} diff --git a/crates/brk_mempool/src/stores/addr_tracker/mod.rs b/crates/brk_mempool/src/stores/addr_tracker/mod.rs index 645534d2e..14d4b2a76 100644 --- a/crates/brk_mempool/src/stores/addr_tracker/mod.rs +++ b/crates/brk_mempool/src/stores/addr_tracker/mod.rs @@ -8,37 +8,39 @@ use derive_more::Deref; use rustc_hash::{FxHashMap, FxHasher}; mod addr_entry; +mod addr_transitions; use addr_entry::AddrEntry; +pub use addr_transitions::AddrTransitions; #[derive(Default, Deref)] pub struct AddrTracker(FxHashMap); impl AddrTracker { - pub fn add_tx(&mut self, tx: &Transaction) { + pub fn add_tx(&mut self, transitions: &mut AddrTransitions, tx: &Transaction) { let txid = &tx.txid; for txin in &tx.input { if let Some(prevout) = txin.prevout.as_ref() { - self.add_input(txid, prevout); + self.add_input(transitions, txid, prevout); } } for txout in &tx.output { if let Some(bytes) = txout.addr_bytes() { - self.apply_add(bytes, txid, |stats| stats.receiving(txout)); + self.apply_add(transitions, bytes, txid, |stats| stats.receiving(txout)); } } } - pub fn remove_tx(&mut self, tx: &Transaction) { + pub fn remove_tx(&mut self, transitions: &mut AddrTransitions, tx: &Transaction) { let txid = &tx.txid; for txin in &tx.input { if let Some(prevout) = txin.prevout.as_ref() { - self.remove_input(txid, prevout); + self.remove_input(transitions, txid, prevout); } } for txout in &tx.output { if let Some(bytes) = txout.addr_bytes() { - self.apply_remove(bytes, txid, |stats| stats.received(txout)); + self.apply_remove(transitions, bytes, txid, |stats| stats.received(txout)); } } } @@ -62,34 +64,58 @@ impl AddrTracker { /// previously `None` has been filled, and by `add_tx` for each /// resolved input. Inputs whose prevout doesn't resolve to an addr /// are no-ops. - pub fn add_input(&mut self, txid: &Txid, prevout: &TxOut) { + pub fn add_input( + &mut self, + transitions: &mut AddrTransitions, + txid: &Txid, + prevout: &TxOut, + ) { let Some(bytes) = prevout.addr_bytes() else { return; }; - self.apply_add(bytes, txid, |stats| stats.sending(prevout)); + self.apply_add(transitions, bytes, txid, |stats| stats.sending(prevout)); } - fn remove_input(&mut self, txid: &Txid, prevout: &TxOut) { + fn remove_input( + &mut self, + transitions: &mut AddrTransitions, + txid: &Txid, + prevout: &TxOut, + ) { let Some(bytes) = prevout.addr_bytes() else { return; }; - self.apply_remove(bytes, txid, |stats| stats.sent(prevout)); + self.apply_remove(transitions, bytes, txid, |stats| stats.sent(prevout)); } fn apply_add( &mut self, + transitions: &mut AddrTransitions, bytes: AddrBytes, txid: &Txid, update_stats: impl FnOnce(&mut AddrMempoolStats), ) { - let entry = self.0.entry(bytes).or_default(); - entry.txids.insert(*txid); - update_stats(&mut entry.stats); - entry.stats.update_tx_count(entry.txids.len() as u32); + match self.0.entry(bytes) { + MapEntry::Occupied(mut occupied) => { + let entry = occupied.get_mut(); + entry.txids.insert(*txid); + update_stats(&mut entry.stats); + entry.stats.update_tx_count(entry.txids.len() as u32); + } + MapEntry::Vacant(vacant) => { + let key = vacant.key().clone(); + let entry = vacant.insert(AddrEntry::default()); + entry.txids.insert(*txid); + update_stats(&mut entry.stats); + entry.stats.update_tx_count(entry.txids.len() as u32); + transitions.record_enter(key); + } + } } fn apply_remove( &mut self, + transitions: &mut AddrTransitions, bytes: AddrBytes, txid: &Txid, update_stats: impl FnOnce(&mut AddrMempoolStats), @@ -102,7 +128,8 @@ impl AddrTracker { update_stats(&mut entry.stats); let len = entry.txids.len(); if len == 0 { - occupied.remove(); + let (bytes, _) = occupied.remove_entry(); + transitions.record_leave(bytes); } else { entry.stats.update_tx_count(len as u32); } diff --git a/crates/brk_mempool/src/stores/mod.rs b/crates/brk_mempool/src/stores/mod.rs index fbb378e96..a5ece6154 100644 --- a/crates/brk_mempool/src/stores/mod.rs +++ b/crates/brk_mempool/src/stores/mod.rs @@ -9,7 +9,7 @@ pub(crate) mod output_bins; pub(crate) mod tx_graveyard; pub(crate) mod tx_store; -pub(crate) use addr_tracker::AddrTracker; +pub(crate) use addr_tracker::{AddrTracker, AddrTransitions}; pub(crate) use outpoint_spends::OutpointSpends; pub(crate) use output_bins::OutputBins; pub(crate) use tx_graveyard::{TxGraveyard, TxTombstone}; diff --git a/crates/brk_reader/src/canonical.rs b/crates/brk_reader/src/canonical.rs index 094f9b142..65e318480 100644 --- a/crates/brk_reader/src/canonical.rs +++ b/crates/brk_reader/src/canonical.rs @@ -14,7 +14,7 @@ pub struct CanonicalRange { impl CanonicalRange { pub fn walk(client: &Client, anchor: Option<&BlockHash>, tip: Height) -> Result { let start = match anchor { - Some(hash) => Height::from(client.get_block_header_info(hash)?.height + 1), + Some(hash) => Height::from((client.get_block_header_info(hash)?.height + 1) as u64), None => Height::ZERO, }; let mut range = Self::between(client, start, tip)?; diff --git a/crates/brk_rpc/src/lib.rs b/crates/brk_rpc/src/lib.rs index c3f31f2d4..7713fc9de 100644 --- a/crates/brk_rpc/src/lib.rs +++ b/crates/brk_rpc/src/lib.rs @@ -5,36 +5,16 @@ use std::{ time::Duration, }; -use bitcoin::ScriptBuf; use brk_error::Result; -use brk_types::{BlockHash, Sats, Txid, Weight}; +use brk_types::{Sats, Txid, Weight}; mod client; mod methods; use client::ClientInner; +pub use corepc_types::v17::{GetBlockHeaderVerbose, GetBlockVerboseOne, GetTxOut}; pub use methods::MempoolState; -#[derive(Debug, Clone)] -pub struct BlockInfo { - pub height: usize, - pub confirmations: i64, -} - -#[derive(Debug, Clone)] -pub struct BlockHeaderInfo { - pub height: usize, - pub confirmations: i64, - pub previous_block_hash: Option, -} - -#[derive(Debug, Clone)] -pub struct TxOutInfo { - pub coinbase: bool, - pub value: Sats, - pub script_pub_key: ScriptBuf, -} - /// One transaction from `getblocktemplate`. Carries the full decoded /// body and stats so block 0 can be projected without a follow-up /// `getmempoolentry`/`getrawtransaction` per tx; that follow-up was the diff --git a/crates/brk_rpc/src/methods.rs b/crates/brk_rpc/src/methods.rs index 7483bb3f7..78803f064 100644 --- a/crates/brk_rpc/src/methods.rs +++ b/crates/brk_rpc/src/methods.rs @@ -9,13 +9,14 @@ use brk_types::{ use corepc_jsonrpc::error::Error as JsonRpcError; use corepc_types::{ v17::{ - GetBlockCount, GetBlockHash, GetBlockHeader, GetBlockHeaderVerbose, GetBlockVerboseOne, - GetBlockVerboseZero, GetRawMempool, GetTxOut, + BlockTemplateTransaction, GetBlockCount, GetBlockHash, GetBlockHeader, + GetBlockHeaderVerbose, GetBlockTemplate, GetBlockVerboseOne, GetBlockVerboseZero, + GetRawMempool, GetTxOut, }, - v24::GetMempoolInfo, + v28::GetBlockchainInfo, + v24::{GetMempoolInfo, MempoolEntry}, }; use rustc_hash::FxHashMap; -use serde::Deserialize; use serde_json::Value; use tracing::{debug, info}; @@ -24,7 +25,7 @@ use tracing::{debug, info}; /// The mempool fetcher tolerates these per-item failures silently. const RPC_NOT_FOUND: i32 = -5; -use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo}; +use crate::{BlockTemplateTx, Client}; /// Per-batch request count for `get_block_hashes_range`, /// `fetch_new_pool_data`, and `get_raw_transactions`. Sized so the JSON @@ -34,46 +35,21 @@ use crate::{BlockHeaderInfo, BlockInfo, BlockTemplateTx, Client, TxOutInfo}; /// the wire batch is twice that. const BATCH_CHUNK: usize = 2000; -/// Live mempool state fetched in one batched bitcoind round-trip: -/// `getblocktemplate` + `getrawmempool false` + `getmempoolinfo`. Each -/// `gbt` entry carries the full decoded tx and stats so block 0 is -/// projected directly from Core's selection without a follow-up entry -/// fetch that could race the eviction of one of those txs. +/// Mempool snapshot data that survives one fetch cycle: the live +/// txid set, fee floor, and chain tip. Returned alongside the raw +/// `block_template` (which Fetcher consumes for GBT synthesis) by +/// `Client::fetch_mempool_state`. pub struct MempoolState { pub live_txids: Vec, - pub gbt: Vec, pub min_fee: FeeRate, + /// Chain tip's hash (block-template's `previousblockhash`). + /// Compared between cycles to detect newly mined blocks. + pub tip_hash: BlockHash, + /// Chain tip's height (block-template's `height` minus one). + pub tip_height: Height, } -#[derive(Deserialize)] -struct MempoolEntryRaw { - vsize: VSize, - weight: Weight, - time: Timestamp, - fees: MempoolEntryFeesRaw, - depends: Vec, -} - -#[derive(Deserialize)] -struct MempoolEntryFeesRaw { - base: Bitcoin, -} - -#[derive(Deserialize)] -struct GbtResponseRaw { - transactions: Vec, -} - -#[derive(Deserialize)] -struct GbtTxRaw { - data: String, - txid: bitcoin::Txid, - fee: u64, - weight: u64, - depends: Vec, -} - -fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result { +fn build_entry(txid: Txid, e: MempoolEntry) -> Result { let depends = e .depends .iter() @@ -81,36 +57,47 @@ fn build_entry(txid: Txid, e: MempoolEntryRaw) -> Result { .collect::>>()?; Ok(MempoolEntryInfo { txid, - vsize: e.vsize, - weight: e.weight, - fee: Sats::from(e.fees.base), - first_seen: e.time, + vsize: VSize::from(e.vsize as u64), + weight: Weight::from(e.weight as u64), + fee: Sats::from(Bitcoin::from(e.fees.base)), + first_seen: Timestamp::from(e.time), depends, }) } -fn build_gbt(raw: GbtResponseRaw) -> Result> { - // Pass 1: decode bodies and stash the 1-based GBT-array indices - // aside so we can drop each `data` hex string and `GbtTxRaw` as +fn build_gbt(raw: GetBlockTemplate) -> Result> { + // Pass 1: decode bodies and stash the 1-based GBT-array indices aside + // so each `data` hex string and `BlockTemplateTransaction` drops as // soon as the tx is pushed. let n = raw.transactions.len(); - let mut depends_idx: Vec> = Vec::with_capacity(n); + let mut depends_idx: Vec> = Vec::with_capacity(n); let mut result: Vec = Vec::with_capacity(n); for t in raw.transactions { - depends_idx.push(t.depends); + let BlockTemplateTransaction { + data, + txid, + depends, + fee, + weight, + .. + } = t; + depends_idx.push(depends); result.push(BlockTemplateTx { - txid: Txid::from(t.txid), - fee: Sats::from(t.fee), - weight: Weight::from(t.weight), + txid: Client::parse_txid(&txid, "gbt txid")?, + fee: Sats::from(fee as u64), + weight: Weight::from(weight), depends: Vec::new(), - tx: encode::deserialize_hex(&t.data)?, + tx: encode::deserialize_hex(&data)?, }); } // Pass 2: resolve indices to txids now that the array is complete. for (i, indices) in depends_idx.iter().enumerate() { let resolved: Vec = indices .iter() - .filter_map(|d| result.get((*d as usize).checked_sub(1)?).map(|t| t.txid)) + .filter_map(|d| { + let idx = usize::try_from(*d).ok()?.checked_sub(1)?; + result.get(idx).map(|t| t.txid) + }) .collect(); result[i].depends = resolved; } @@ -150,18 +137,13 @@ impl Client { .map_err(|e| Error::Parse(format!("decode getblock: {e}"))) } - pub fn get_block_info<'a, H>(&self, hash: &'a H) -> Result + pub fn get_block_info<'a, H>(&self, hash: &'a H) -> Result where &'a H: Into<&'a bitcoin::BlockHash>, { let hash: &bitcoin::BlockHash = hash.into(); - let r: GetBlockVerboseOne = self - .0 - .call_with_retry("getblock", &[serde_json::to_value(hash)?, Value::from(1u8)])?; - Ok(BlockInfo { - height: r.height as usize, - confirmations: r.confirmations, - }) + self.0 + .call_with_retry("getblock", &[serde_json::to_value(hash)?, Value::from(1u8)]) } pub fn get_block_header<'a, H>(&self, hash: &'a H) -> Result @@ -177,23 +159,13 @@ impl Client { bitcoin::consensus::deserialize::(&bytes).map_err(Error::from) } - pub fn get_block_header_info<'a, H>(&self, hash: &'a H) -> Result + pub fn get_block_header_info<'a, H>(&self, hash: &'a H) -> Result where &'a H: Into<&'a bitcoin::BlockHash>, { let hash: &bitcoin::BlockHash = hash.into(); - let r: GetBlockHeaderVerbose = self - .0 - .call_with_retry("getblockheader", &[serde_json::to_value(hash)?])?; - let previous_block_hash = r - .previous_block_hash - .map(|s| Self::parse_block_hash(&s, "previousblockhash")) - .transpose()?; - Ok(BlockHeaderInfo { - height: r.height as usize, - confirmations: r.confirmations, - previous_block_hash, - }) + self.0 + .call_with_retry("getblockheader", &[serde_json::to_value(hash)?]) } pub fn get_block_hash(&self, height: H) -> Result @@ -244,7 +216,7 @@ impl Client { txid: &Txid, vout: Vout, include_mempool: Option, - ) -> Result> { + ) -> Result> { let txid: &bitcoin::Txid = txid.into(); let mut args: Vec = vec![ serde_json::to_value(txid)?, @@ -253,19 +225,7 @@ impl Client { if let Some(mempool) = include_mempool { args.push(Value::Bool(mempool)); } - let r: Option = self.0.call_with_retry("gettxout", &args)?; - match r { - Some(r) => { - let script_pub_key = bitcoin::ScriptBuf::from_hex(&r.script_pubkey.hex) - .map_err(|e| Error::Parse(format!("script hex: {e}")))?; - Ok(Some(TxOutInfo { - coinbase: r.coinbase, - value: Sats::from(Bitcoin::from(r.value)), - script_pub_key, - })) - } - None => Ok(None), - } + self.0.call_with_retry("gettxout", &args) } pub fn get_raw_mempool(&self) -> Result> { @@ -394,7 +354,11 @@ impl Client { /// carries each tx's full body and stats, so block 0 is exact even /// when a tx vanishes from the mempool listing between the GBT and /// `getrawmempool` calls; no follow-up entry fetch can race it. - pub fn fetch_mempool_state(&self) -> Result { + /// Returns the passthrough `MempoolState` and the raw + /// `block_template` (consumed downstream by GBT synthesis), in one + /// batched round-trip: `getblocktemplate` + `getrawmempool false` + /// + `getmempoolinfo`. + pub fn fetch_mempool_state(&self) -> Result<(MempoolState, Vec)> { let requests: [(&str, Vec); 3] = [ ( "getblocktemplate", @@ -404,7 +368,7 @@ impl Client { ("getmempoolinfo", vec![]), ]; let mut out = self.0.call_mixed_batch(&requests)?.into_iter(); - let gbt_raw = out.next().ok_or(Error::Internal("missing gbt"))??; + let template_raw = out.next().ok_or(Error::Internal("missing gbt"))??; let txids_raw = out.next().ok_or(Error::Internal("missing rawmempool"))??; let info_raw = out.next().ok_or(Error::Internal("missing mempoolinfo"))??; @@ -413,14 +377,23 @@ impl Client { .iter() .map(|s| Self::parse_txid(s, "mempool txid")) .collect::>>()?; - let gbt = build_gbt(serde_json::from_str(gbt_raw.get())?)?; + let template: GetBlockTemplate = serde_json::from_str(template_raw.get())?; + let tip_hash = Self::parse_block_hash(&template.previous_block_hash, "previousblockhash")?; + let tip_height = Height::from(u64::try_from(template.height - 1).map_err(|_| { + Error::Parse(format!("gbt height out of range: {}", template.height)) + })?); + let block_template = build_gbt(template)?; let min_fee = build_min_fee(serde_json::from_str(info_raw.get())?); - Ok(MempoolState { - live_txids, - gbt, - min_fee, - }) + Ok(( + MempoolState { + live_txids, + min_fee, + tip_hash, + tip_height, + }, + block_template, + )) } /// Mixed batch of `getmempoolentry` + `getrawtransaction` for the @@ -453,7 +426,7 @@ impl Client { let raw_res = iter.next().ok_or(Error::Internal("missing raw"))?; match entry_res.and_then(|raw| { - let me: MempoolEntryRaw = serde_json::from_str(raw.get())?; + let me: MempoolEntry = serde_json::from_str(raw.get())?; build_entry(*txid, me) }) { Ok(info) => entries.push(info), @@ -488,23 +461,31 @@ impl Client { loop { let info = self.get_block_header_info(¤t)?; if info.confirmations > 0 { - return Ok((info.height.into(), current)); + return Ok((Height::from(info.height as u64), current)); } - current = info.previous_block_hash.ok_or(Error::NotFound( + let prev = info.previous_block_hash.ok_or(Error::NotFound( "Reached genesis without finding main chain".into(), ))?; + current = Self::parse_block_hash(&prev, "previousblockhash")?; } } + pub fn get_blockchain_info(&self) -> Result { + self.0.call_with_retry("getblockchaininfo", &[]) + } + + /// Bitcoin network the connected node is running on, derived from + /// `getblockchaininfo.chain`. + pub fn get_network(&self) -> Result { + let chain = self.get_blockchain_info()?.chain; + bitcoin::Network::from_core_arg(&chain) + .map_err(|e| Error::Parse(format!("getblockchaininfo.chain '{chain}': {e}"))) + } + pub fn wait_for_synced_node(&self) -> Result<()> { - #[derive(Deserialize)] - struct SyncProgress { - headers: u64, - blocks: u64, - } let is_synced = || -> Result { - let p: SyncProgress = self.0.call_with_retry("getblockchaininfo", &[])?; - Ok(p.headers == p.blocks) + let info = self.get_blockchain_info()?; + Ok(info.headers == info.blocks) }; if !is_synced()? { diff --git a/crates/brk_types/src/addr_bytes.rs b/crates/brk_types/src/addr_bytes.rs index 6b535b0fb..58965ae40 100644 --- a/crates/brk_types/src/addr_bytes.rs +++ b/crates/brk_types/src/addr_bytes.rs @@ -8,7 +8,7 @@ use super::{ P2WSHBytes, }; -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum AddrBytes { P2PK65(P2PK65Bytes), // 65 P2PK33(P2PK33Bytes), // 33 diff --git a/crates/brk_types/src/recommended_fees.rs b/crates/brk_types/src/recommended_fees.rs index 20414c2d8..95e527327 100644 --- a/crates/brk_types/src/recommended_fees.rs +++ b/crates/brk_types/src/recommended_fees.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::FeeRate; /// Recommended fee rates in sat/vB -#[derive(Debug, Default, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct RecommendedFees { /// Fee rate for fastest confirmation (next block) diff --git a/crates/mmpl/Cargo.toml b/crates/mmpl/Cargo.toml new file mode 100644 index 000000000..75d962310 --- /dev/null +++ b/crates/mmpl/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "mmpl" +description = "A CLI to stream Bitcoin mempool events as NDJSON" +version.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +brk_error = { workspace = true } +brk_mempool = { workspace = true } +brk_rpc = { workspace = true } +brk_types = { workspace = true } +rustc-hash = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } + +[[bin]] +name = "mmpl" +path = "src/main.rs" diff --git a/crates/mmpl/src/args.rs b/crates/mmpl/src/args.rs new file mode 100644 index 000000000..0c21ce4f5 --- /dev/null +++ b/crates/mmpl/src/args.rs @@ -0,0 +1,83 @@ +use std::path::PathBuf; + +use brk_error::{Error, Result}; +use brk_rpc::{Auth, Client}; + +pub struct Args { + bitcoindir: Option, + rpcconnect: Option, + rpcport: Option, + rpccookiefile: Option, + rpcuser: Option, + rpcpassword: Option, +} + +impl Args { + pub fn parse(raw: Vec) -> Result { + let mut bitcoindir = None; + let mut rpcconnect = None; + let mut rpcport = None; + let mut rpccookiefile = None; + let mut rpcuser = None; + let mut rpcpassword = None; + let mut iter = raw.into_iter(); + while let Some(a) = iter.next() { + let rest = a + .strip_prefix("--") + .ok_or_else(|| Error::Parse(format!("unexpected arg: '{a}'")))?; + let (key, value) = match rest.split_once('=') { + Some((k, v)) => (k.to_string(), v.to_string()), + None => ( + rest.to_string(), + iter.next() + .ok_or_else(|| Error::Parse(format!("--{rest} requires a value")))?, + ), + }; + match key.as_str() { + "bitcoindir" => bitcoindir = Some(PathBuf::from(value)), + "rpcconnect" => rpcconnect = Some(value), + "rpcport" => { + rpcport = Some(value.parse().map_err(|_| { + Error::Parse(format!("--rpcport: '{value}' is not a valid port")) + })?); + } + "rpccookiefile" => rpccookiefile = Some(PathBuf::from(value)), + "rpcuser" => rpcuser = Some(value), + "rpcpassword" => rpcpassword = Some(value), + other => return Err(Error::Parse(format!("unknown flag --{other}"))), + } + } + Ok(Self { + bitcoindir, + rpcconnect, + rpcport, + rpccookiefile, + rpcuser, + rpcpassword, + }) + } + + pub fn rpc(&self) -> Result { + let host = self.rpcconnect.as_deref().unwrap_or("localhost"); + let port = self.rpcport.unwrap_or(8332); + let url = format!("http://{host}:{port}"); + let bitcoin_dir = self + .bitcoindir + .clone() + .unwrap_or_else(Client::default_bitcoin_path); + let cookie = self + .rpccookiefile + .clone() + .unwrap_or_else(|| bitcoin_dir.join(".cookie")); + let auth = if cookie.is_file() { + Auth::CookieFile(cookie) + } else if let (Some(u), Some(p)) = (self.rpcuser.as_deref(), self.rpcpassword.as_deref()) { + Auth::UserPass(u.to_string(), p.to_string()) + } else { + return Err(Error::Parse( + "no RPC auth: cookie file missing and --rpcuser/--rpcpassword not set".into(), + )); + }; + Client::new(&url, auth) + } +} diff --git a/crates/mmpl/src/emitter.rs b/crates/mmpl/src/emitter.rs new file mode 100644 index 000000000..7f115268a --- /dev/null +++ b/crates/mmpl/src/emitter.rs @@ -0,0 +1,105 @@ +//! Per-cycle NDJSON emitter. Owns the cycle-over-cycle memory used to +//! turn the always-fresh `Cycle` into change-only events for `tip`, +//! `block`, and `fees`. + +use std::{ + io::{self, Write}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use brk_mempool::Cycle; +use brk_types::{Addr, AddrBytes, BlockHash, NextBlockHash, RecommendedFees, Txid}; +use rustc_hash::FxHashSet; + +use crate::event::Event; + +/// Cycle-over-cycle memory for change-event detection. `None` on the +/// first cycle, so the very first `Tip` / `Block` / `Fees` always +/// fires - downstream consumers get a baseline without a special-case +/// "current state" RPC. +/// +/// `prev_block0` is `None` on cold start so the first `block` event +/// reports the entire template as `added` (one big line, then small +/// deltas). +#[derive(Default)] +pub struct Emitter { + prev_tip_hash: Option, + prev_next_block_hash: Option, + prev_block0: Option>, + prev_fees: Option, +} + +impl Emitter { + /// Writes every event for one cycle and flushes once at the end. + /// Per-line flushes would cost one syscall per event on busy cycles; + /// the cycle period (~500ms) is the real "live" granularity. + pub fn emit(&mut self, out: &mut W, cycle: &Cycle) -> io::Result<()> { + let t = now_secs(); + for tx in &cycle.added { + write_line(out, &Event::enter(t, tx))?; + } + for tx in &cycle.removed { + write_line(out, &Event::leave(t, tx))?; + } + for bytes in &cycle.addr_enters { + Self::emit_addr(out, t, bytes, Event::addr_enter)?; + } + for bytes in &cycle.addr_leaves { + Self::emit_addr(out, t, bytes, Event::addr_leave)?; + } + if self.prev_tip_hash != Some(cycle.tip_hash) { + self.prev_tip_hash = Some(cycle.tip_hash); + write_line(out, &Event::tip(t, cycle.tip_hash, cycle.tip_height))?; + } + let next_block_hash = cycle.snapshot.next_block_hash; + if self.prev_next_block_hash != Some(next_block_hash) { + self.prev_next_block_hash = Some(next_block_hash); + let current: FxHashSet = cycle.snapshot.block0_txids().collect(); + let (added, removed) = match &self.prev_block0 { + Some(prev) => ( + current.difference(prev).copied().collect(), + prev.difference(¤t).copied().collect(), + ), + None => (current.iter().copied().collect(), Vec::new()), + }; + write_line(out, &Event::block(t, next_block_hash, added, removed))?; + self.prev_block0 = Some(current); + } + if self.prev_fees.as_ref() != Some(&cycle.snapshot.fees) { + self.prev_fees = Some(cycle.snapshot.fees.clone()); + write_line(out, &Event::fees(t, &cycle.snapshot.fees))?; + } + write_line(out, &Event::summary(t, cycle))?; + out.flush() + } + + /// Render an `AddrBytes` and emit it via `make_event`. Unrenderable + /// bytes (e.g. exotic non-standard scripts) drop a one-line warning + /// to stderr - the event stream stays clean for downstream `jq`. + fn emit_addr( + out: &mut W, + t: f64, + bytes: &AddrBytes, + make_event: fn(f64, Addr) -> Event, + ) -> io::Result<()> { + match Addr::try_from(bytes) { + Ok(addr) => write_line(out, &make_event(t, addr)), + Err(e) => { + eprintln!("mmpl: skipping addr event: {e}"); + Ok(()) + } + } + } +} + +fn write_line(out: &mut W, ev: &Event) -> io::Result<()> { + serde_json::to_writer(&mut *out, ev).map_err(io::Error::other)?; + out.write_all(b"\n") +} + +fn now_secs() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs_f64() +} diff --git a/crates/mmpl/src/event.rs b/crates/mmpl/src/event.rs new file mode 100644 index 000000000..65a2583f1 --- /dev/null +++ b/crates/mmpl/src/event.rs @@ -0,0 +1,160 @@ +//! NDJSON event schema. One [`Event`] per line; consumers pipe to +//! `jq` / `grep` to filter. Per-event fields are flat (no nested +//! objects) so `jq -c 'select(...)'` works without `..` walks. + +use brk_mempool::{Cycle, TxAdded, TxRemoval, TxRemoved}; +use brk_types::{ + Addr, BlockHash, FeeRate, Height, NextBlockHash, RecommendedFees, Sats, Timestamp, Txid, VSize, +}; +use serde::Serialize; + +#[derive(Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum Event { + /// A tx entered the pool this cycle (either brand new or revived + /// from the graveyard - the stream collapses both to one event). + Enter { + t: f64, + txid: Txid, + vsize: VSize, + fee: Sats, + rate: FeeRate, + first_seen: Timestamp, + }, + /// A tx left the pool this cycle. `rate` is the package-effective + /// rate at burial, not raw fee/vsize. + Leave { + t: f64, + txid: Txid, + #[serde(flatten)] + reason: LeaveReason, + rate: FeeRate, + }, + /// An address went 0 → 1+ live mempool txs this cycle. Same-cycle + /// flip-flops are collapsed by the upstream tracker (no event). + AddrEnter { t: f64, addr: Addr }, + /// An address went 1+ → 0 live mempool txs this cycle. + AddrLeave { t: f64, addr: Addr }, + /// New confirmed block: bitcoind's chain tip moved since the last + /// cycle. `height` is the tip's own height (one less than the next + /// block being templated). + Tip { + t: f64, + hash: BlockHash, + height: Height, + }, + /// The projected next block changed (different tx set or order). + /// `hash` is the same opaque content hash used as the mempool ETag. + /// `added`/`removed` is the txid-level diff against the previous + /// template; on the very first cycle `added` is the full template + /// and `removed` is empty. + Block { + t: f64, + hash: NextBlockHash, + added: Vec, + removed: Vec, + }, + /// Recommended fee rates changed since the last cycle. + Fees { + t: f64, + fastest: FeeRate, + half_hour: FeeRate, + hour: FeeRate, + economy: FeeRate, + minimum: FeeRate, + }, + /// Per-cycle heartbeat. Always emitted, even on idle cycles, so + /// downstream consumers see a steady pulse and can spot stalls. + /// `addr_enters`/`addr_leaves` count the post-cancellation 0↔1+ + /// address transitions this cycle. + Cycle { + t: f64, + added: usize, + removed: usize, + addr_enters: usize, + addr_leaves: usize, + count: usize, + vsize: VSize, + fee: Sats, + took_ms: u64, + }, +} + +#[derive(Serialize)] +#[serde(tag = "reason", rename_all = "snake_case")] +pub enum LeaveReason { + Replaced { by: Txid }, + Vanished, +} + +impl Event { + pub fn enter(t: f64, tx: &TxAdded) -> Self { + Self::Enter { + t, + txid: tx.txid, + vsize: tx.vsize, + fee: tx.fee, + rate: tx.fee_rate, + first_seen: tx.first_seen, + } + } + + pub fn leave(t: f64, tx: &TxRemoved) -> Self { + Self::Leave { + t, + txid: tx.txid, + reason: LeaveReason::from(tx.reason), + rate: tx.chunk_rate, + } + } + + pub fn addr_enter(t: f64, addr: Addr) -> Self { + Self::AddrEnter { t, addr } + } + + pub fn addr_leave(t: f64, addr: Addr) -> Self { + Self::AddrLeave { t, addr } + } + + pub fn tip(t: f64, hash: BlockHash, height: Height) -> Self { + Self::Tip { t, hash, height } + } + + pub fn block(t: f64, hash: NextBlockHash, added: Vec, removed: Vec) -> Self { + Self::Block { t, hash, added, removed } + } + + pub fn fees(t: f64, fees: &RecommendedFees) -> Self { + Self::Fees { + t, + fastest: fees.fastest_fee, + half_hour: fees.half_hour_fee, + hour: fees.hour_fee, + economy: fees.economy_fee, + minimum: fees.minimum_fee, + } + } + + pub fn summary(t: f64, cycle: &Cycle) -> Self { + Self::Cycle { + t, + added: cycle.added.len(), + removed: cycle.removed.len(), + addr_enters: cycle.addr_enters.len(), + addr_leaves: cycle.addr_leaves.len(), + count: cycle.info.count, + vsize: cycle.info.vsize, + fee: cycle.info.total_fee, + took_ms: cycle.took.as_millis() as u64, + } + } +} + +impl From for LeaveReason { + fn from(reason: TxRemoval) -> Self { + match reason { + TxRemoval::Replaced { by } => Self::Replaced { by }, + TxRemoval::Vanished => Self::Vanished, + } + } +} diff --git a/crates/mmpl/src/main.rs b/crates/mmpl/src/main.rs new file mode 100644 index 000000000..166d28ca4 --- /dev/null +++ b/crates/mmpl/src/main.rs @@ -0,0 +1,61 @@ +mod args; +mod emitter; +mod event; +mod usage; + +use std::{ + io::{self, BufWriter}, + process::ExitCode, + thread, + time::{Duration, Instant}, +}; + +use brk_error::Result; +use brk_mempool::Mempool; + +use args::Args; +use emitter::Emitter; + +const PERIOD: Duration = Duration::from_millis(500); + +fn main() -> ExitCode { + match run() { + Ok(()) => ExitCode::SUCCESS, + Err(e) => { + eprintln!("mmpl: {e}"); + ExitCode::from(1) + } + } +} + +fn run() -> Result<()> { + let raw: Vec = std::env::args().skip(1).collect(); + if raw.iter().any(|a| matches!(a.as_str(), "-h" | "--help")) { + usage::print(); + return Ok(()); + } + let args = Args::parse(raw)?; + let client = args.rpc()?; + let mempool = Mempool::new(&client); + + let stdout = io::stdout(); + let mut out = BufWriter::new(stdout.lock()); + let mut emitter = Emitter::default(); + + loop { + let started = Instant::now(); + match mempool.tick() { + Ok(cycle) => match emitter.emit(&mut out, &cycle) { + Ok(()) => {} + // Broken pipe (e.g. `mmpl | head`) is a normal end-of-stream. + Err(e) if e.kind() == io::ErrorKind::BrokenPipe => return Ok(()), + Err(e) => return Err(e.into()), + }, + // Transient RPC failure - log, then retry on the next tick. + Err(e) => eprintln!("mmpl: tick failed: {e}"), + } + if let Some(rest) = PERIOD.checked_sub(started.elapsed()) { + thread::sleep(rest); + } + } +} diff --git a/crates/mmpl/src/usage.rs b/crates/mmpl/src/usage.rs new file mode 100644 index 000000000..b0862a590 --- /dev/null +++ b/crates/mmpl/src/usage.rs @@ -0,0 +1,49 @@ +// Raw string contains `{`/`}` literals (JSON), so it can't be the +// format string of `print!`. Pass via positional arg. +#[allow(clippy::print_literal)] +pub fn print() { + print!( + "{}", + r#"mmpl - stream Bitcoin mempool events as NDJSON + +Usage: + mmpl [options] + +Options: + --bitcoindir Bitcoin data dir (default: platform-specific) + --rpcconnect RPC host (default: localhost) + --rpcport RPC port (default: 8332) + --rpccookiefile Cookie file (default: /.cookie) + --rpcuser RPC username (if no cookie file) + --rpcpassword RPC password (if no cookie file) + -h, --help Show this help + +Events (one JSON object per line): + Per-tx (one event per change): + {"kind":"enter","t":..,"txid":..,"vsize":..,"fee":..,"rate":..,"first_seen":..} + {"kind":"leave","t":..,"txid":..,"reason":"vanished","rate":..} + {"kind":"leave","t":..,"txid":..,"reason":"replaced","by":..,"rate":..} + + Per-address (0 <-> 1+ live mempool txs): + {"kind":"addr_enter","t":..,"addr":..} + {"kind":"addr_leave","t":..,"addr":..} + + State changes (fires only when the value changed): + {"kind":"tip","t":..,"hash":..,"height":..} (new confirmed block) + {"kind":"block","t":..,"hash":..,"added":[txid..],"removed":[txid..]} + (next-block template changed; first cycle + emits the full template as `added`) + {"kind":"fees","t":..,"fastest":..,"half_hour":..,"hour":..,"economy":..,"minimum":..} + + Per-cycle heartbeat (always emitted): + {"kind":"cycle","t":..,"added":N,"removed":N,"addr_enters":N,"addr_leaves":N, + "count":N,"vsize":N,"fee":N,"took_ms":N} + +Examples: + mmpl | jq -c 'select(.kind=="enter" and .rate>=50)' + mmpl | jq -c 'select(.kind=="tip")' + mmpl | grep -v '"kind":"cycle"' + mmpl | jq -c 'select(.reason=="replaced")' +"# + ); +}