global + blk

This commit is contained in:
nym21
2026-05-07 14:02:53 +02:00
parent 9347b42c9a
commit cc9ebfaf42
30 changed files with 1044 additions and 140 deletions

9
Cargo.lock generated
View File

@@ -296,6 +296,15 @@ checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3"
[[package]] [[package]]
name = "blk" name = "blk"
version = "0.3.0-beta.7" version = "0.3.0-beta.7"
dependencies = [
"bitcoin",
"brk_error",
"brk_reader",
"brk_rpc",
"brk_types",
"owo-colors",
"serde_json",
]
[[package]] [[package]]
name = "brk" name = "brk"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "blk" name = "blk"
description = "A command line tool to inspect Bitcoin Core blocks" description = "A CLI to inspect Bitcoin Core blocks"
version.workspace = true version.workspace = true
edition.workspace = true edition.workspace = true
license.workspace = true license.workspace = true
@@ -8,6 +8,13 @@ homepage.workspace = true
repository.workspace = true repository.workspace = true
[dependencies] [dependencies]
bitcoin = { workspace = true }
brk_error = { workspace = true }
brk_reader = { workspace = true }
brk_rpc = { workspace = true }
brk_types = { workspace = true }
owo-colors = { workspace = true }
serde_json = { workspace = true }
[[bin]] [[bin]]
name = "blk" name = "blk"

27
crates/blk/README.md Normal file
View File

@@ -0,0 +1,27 @@
# blk
A CLI to inspect Bitcoin Core blocks.
Reads `blk*.dat` files directly via [`brk_reader`](../brk_reader) and resolves
the chain tip / heights via the Bitcoin Core RPC. Output is shell-friendly:
bare values, NDJSON, pretty JSON, or TSV.
## Install
```sh
cargo install --path crates/blk
```
## Quick start
```sh
blk 800000 hash # bare hash
blk 800000 height hash time # one compact JSON line
blk 800000 tx.0.vout.0.value # coinbase output 0 sats
blk 0..2 hash tx.0.txid # 3 NDJSON lines
blk tip tx.0 # whole coinbase tx as JSON
```
## Reference
Run `blk --help` for the full field/selector/option reference.

131
crates/blk/src/args.rs Normal file
View File

@@ -0,0 +1,131 @@
use std::path::PathBuf;
use brk_error::{Error, Result};
use brk_rpc::{Auth, Client};
use crate::path::Path;
pub struct Args {
pub selector: String,
pub paths: Vec<Path>,
pub pretty: bool,
pub compact: bool,
bitcoindir: Option<PathBuf>,
blocksdir: Option<PathBuf>,
rpcconnect: Option<String>,
rpcport: Option<u16>,
rpccookiefile: Option<PathBuf>,
rpcuser: Option<String>,
rpcpassword: Option<String>,
}
impl Args {
pub fn parse(raw: Vec<String>) -> Result<Self> {
let mut pretty = false;
let mut compact = false;
let mut bitcoindir = None;
let mut blocksdir = None;
let mut rpcconnect = None;
let mut rpcport = None;
let mut rpccookiefile = None;
let mut rpcuser = None;
let mut rpcpassword = None;
let mut positional: Vec<String> = Vec::new();
let mut iter = raw.into_iter();
while let Some(a) = iter.next() {
if a == "-p" || a == "--pretty" {
pretty = true;
continue;
}
if a == "-c" || a == "--compact" {
compact = true;
continue;
}
if let Some(rest) = a.strip_prefix("--") {
let (key, value) = match rest.split_once('=') {
Some((k, v)) => (k.to_string(), v.to_string()),
None => (
rest.to_string(),
iter.next().ok_or_else(|| {
Error::Parse(format!("--{rest} requires a value"))
})?,
),
};
match key.as_str() {
"bitcoindir" => bitcoindir = Some(PathBuf::from(value)),
"blocksdir" => blocksdir = Some(PathBuf::from(value)),
"rpcconnect" => rpcconnect = Some(value),
"rpcport" => {
rpcport = Some(value.parse().map_err(|_| {
Error::Parse(format!("--rpcport: '{value}' is not a valid port"))
})?);
}
"rpccookiefile" => rpccookiefile = Some(PathBuf::from(value)),
"rpcuser" => rpcuser = Some(value),
"rpcpassword" => rpcpassword = Some(value),
other => return Err(Error::Parse(format!("unknown flag --{other}"))),
}
continue;
}
positional.push(a);
}
let mut iter = positional.into_iter();
let selector = iter
.next()
.ok_or_else(|| Error::Parse("missing selector".into()))?;
let paths: Vec<Path> = iter.map(|f| Path::parse(&f)).collect::<Result<_>>()?;
if paths.is_empty() {
return Err(Error::Parse(
"missing field. ask for at least one (e.g. `blk 0 hash`)".into(),
));
}
Ok(Self {
selector,
paths,
pretty,
compact,
bitcoindir,
blocksdir,
rpcconnect,
rpcport,
rpccookiefile,
rpcuser,
rpcpassword,
})
}
pub fn bitcoin_dir(&self) -> PathBuf {
self.bitcoindir
.clone()
.unwrap_or_else(Client::default_bitcoin_path)
}
pub fn blocks_dir(&self) -> PathBuf {
self.blocksdir
.clone()
.unwrap_or_else(|| self.bitcoin_dir().join("blocks"))
}
pub fn rpc(&self) -> Result<Client> {
let host = self.rpcconnect.as_deref().unwrap_or("localhost");
let port = self.rpcport.unwrap_or(8332);
let url = format!("http://{host}:{port}");
let cookie = self
.rpccookiefile
.clone()
.unwrap_or_else(|| self.bitcoin_dir().join(".cookie"));
let auth = if cookie.is_file() {
Auth::CookieFile(cookie)
} else if let (Some(u), Some(p)) =
(self.rpcuser.as_deref(), self.rpcpassword.as_deref())
{
Auth::UserPass(u.to_string(), p.to_string())
} else {
return Err(Error::Parse(
"no RPC auth: cookie file missing and --rpcuser/--rpcpassword not set".into(),
));
};
Client::new(&url, auth)
}
}

309
crates/blk/src/fields.rs Normal file
View File

@@ -0,0 +1,309 @@
use std::cell::OnceCell;
use bitcoin::{
Address, Block, Network, ScriptBuf, Transaction, TxIn, TxOut, consensus::encode::serialize_hex,
hex::DisplayHex,
};
use brk_error::{Error, Result};
use brk_types::ReadBlock;
use serde_json::{Value, json};
use crate::path::{Path, Step};
pub struct Ctx<'a> {
block: &'a ReadBlock,
size_weight: OnceCell<(usize, usize)>,
}
impl<'a> Ctx<'a> {
pub fn new(block: &'a ReadBlock) -> Self {
Self {
block,
size_weight: OnceCell::new(),
}
}
pub fn resolve(&self, path: &Path) -> Result<Value> {
let (step, rest) = pop(&path.steps)?;
let b = self.block;
let raw: &Block = b;
let scalar = |v| scalar_leaf(v, step, rest);
match step.name.as_str() {
"height" => scalar(json!(*b.height())),
"hash" => scalar(json!(b.hash().to_string())),
"time" => scalar(json!(b.header.time)),
"version" => scalar(json!(b.header.version.to_consensus())),
"version_hex" => scalar(json!(format!(
"{:08x}",
b.header.version.to_consensus() as u32
))),
"bits" => scalar(json!(b.header.bits.to_consensus())),
"nonce" => scalar(json!(b.header.nonce)),
"prev" => scalar(json!(b.header.prev_blockhash.to_string())),
"merkle" => scalar(json!(b.header.merkle_root.to_string())),
"difficulty" => scalar(json!(b.header.difficulty_float())),
"txs" => scalar(json!(b.txdata.len())),
"n_inputs" => scalar(json!(
b.txdata.iter().map(|tx| tx.input.len()).sum::<usize>()
)),
"n_outputs" => scalar(json!(
b.txdata.iter().map(|tx| tx.output.len()).sum::<usize>()
)),
"witness_txs" => scalar(json!(
b.txdata.iter().filter(|tx| tx_has_witness(tx)).count()
)),
"size" => scalar(json!(self.size_and_weight().0)),
"weight" => scalar(json!(self.size_and_weight().1)),
"strippedsize" => {
let (size, weight) = self.size_and_weight();
scalar(json!((weight - size) / 3))
}
"subsidy" => scalar(json!(subsidy_sats(*b.height()))),
"header_hex" => scalar(json!(serialize_hex(&b.header))),
"hex" => scalar(json!(serialize_hex(raw))),
"coinbase" => scalar(json!(b.coinbase_tag().as_str())),
"tx" => pick(&b.txdata, step, rest, |i, tx| resolve_tx(tx, i == 0, rest)),
other => Err(unknown("block", other)),
}
}
pub fn resolve_str(&self, path: &Path) -> Result<String> {
Ok(match self.resolve(path)? {
Value::String(s) => s,
other => other.to_string(),
})
}
fn size_and_weight(&self) -> (usize, usize) {
*self
.size_weight
.get_or_init(|| self.block.total_size_and_weight())
}
}
fn resolve_tx(tx: &Transaction, is_coinbase: bool, steps: &[Step]) -> Result<Value> {
if steps.is_empty() {
return Ok(tx_to_value(tx, is_coinbase));
}
let (step, rest) = pop(steps)?;
let scalar = |v| scalar_leaf(v, step, rest);
match step.name.as_str() {
"txid" => scalar(json!(tx.compute_txid().to_string())),
"wtxid" => scalar(json!(tx.compute_wtxid().to_string())),
"version" => scalar(json!(tx.version.0)),
"locktime" => scalar(json!(tx.lock_time.to_consensus_u32())),
"size" => scalar(json!(tx.total_size())),
"base_size" => scalar(json!(tx.base_size())),
"vsize" => scalar(json!(tx.vsize())),
"weight" => scalar(json!(tx.weight().to_wu())),
"inputs" => scalar(json!(tx.input.len())),
"outputs" => scalar(json!(tx.output.len())),
"is_coinbase" => scalar(json!(is_coinbase)),
"has_witness" => scalar(json!(tx_has_witness(tx))),
"is_rbf" => scalar(json!(tx_is_rbf(tx))),
"total_out" => scalar(json!(tx_total_out(tx))),
"hex" => scalar(json!(serialize_hex(tx))),
"vin" => pick(&tx.input, step, rest, |j, vin| {
resolve_vin(vin, is_coinbase && j == 0, rest)
}),
"vout" => pick(&tx.output, step, rest, |_, vout| resolve_vout(vout, rest)),
other => Err(unknown("tx", other)),
}
}
fn resolve_vin(vin: &TxIn, is_coinbase: bool, steps: &[Step]) -> Result<Value> {
if steps.is_empty() {
return Ok(vin_to_value(vin, is_coinbase));
}
let (step, rest) = pop(steps)?;
let scalar = |v| scalar_leaf(v, step, rest);
match step.name.as_str() {
"prev_txid" => scalar(json!(vin.previous_output.txid.to_string())),
"prev_vout" => scalar(json!(vin.previous_output.vout)),
"sequence" => scalar(json!(vin.sequence.0)),
"script_sig" => scalar(json!(vin.script_sig.to_hex_string())),
"script_sig_asm" => scalar(json!(vin.script_sig.to_asm_string())),
"witness" => scalar(witness_to_value(vin)),
"has_witness" => scalar(json!(!vin.witness.is_empty())),
"is_rbf" => scalar(json!(vin.sequence.is_rbf())),
"coinbase" => scalar(json!(is_coinbase)),
other => Err(unknown("vin", other)),
}
}
fn resolve_vout(vout: &TxOut, steps: &[Step]) -> Result<Value> {
if steps.is_empty() {
return Ok(vout_to_value(vout));
}
let (step, rest) = pop(steps)?;
let scalar = |v| scalar_leaf(v, step, rest);
match step.name.as_str() {
"value" => scalar(json!(vout.value.to_sat())),
"script_pubkey" => scalar(json!(vout.script_pubkey.to_hex_string())),
"script_pubkey_asm" => scalar(json!(vout.script_pubkey.to_asm_string())),
"type" => scalar(json!(script_type(&vout.script_pubkey))),
"address" => scalar(address_value(&vout.script_pubkey)),
other => Err(unknown("vout", other)),
}
}
fn pick<T>(
items: &[T],
step: &Step,
_rest: &[Step],
mut resolve: impl FnMut(usize, &T) -> Result<Value>,
) -> Result<Value> {
match step.index {
Some(i) => {
let item = items
.get(i)
.ok_or_else(|| out_of_range(&step.name, i, items.len()))?;
resolve(i, item)
}
None => Ok(Value::Array(
items
.iter()
.enumerate()
.map(|(i, item)| resolve(i, item))
.collect::<Result<_>>()?,
)),
}
}
fn pop(steps: &[Step]) -> Result<(&Step, &[Step])> {
steps
.split_first()
.ok_or_else(|| Error::Parse("empty path segment".into()))
}
fn scalar_leaf(v: Value, step: &Step, rest: &[Step]) -> Result<Value> {
if step.index.is_some() {
return Err(Error::Parse(format!("'{}' is not an array", step.name)));
}
if !rest.is_empty() {
return Err(Error::Parse(format!(
"'{}' is a scalar; nothing to drill into",
step.name
)));
}
Ok(v)
}
fn out_of_range(name: &str, i: usize, len: usize) -> Error {
Error::Parse(format!("{name}.{i} out of range (len {len})"))
}
fn unknown(level: &str, name: &str) -> Error {
Error::Parse(format!(
"unknown {level} field '{name}' (run `blk --help` for the list)"
))
}
fn tx_to_value(tx: &Transaction, is_coinbase: bool) -> Value {
let vin: Vec<Value> = tx
.input
.iter()
.enumerate()
.map(|(j, v)| vin_to_value(v, is_coinbase && j == 0))
.collect();
let vout: Vec<Value> = tx.output.iter().map(vout_to_value).collect();
json!({
"txid": tx.compute_txid().to_string(),
"wtxid": tx.compute_wtxid().to_string(),
"version": tx.version.0,
"locktime": tx.lock_time.to_consensus_u32(),
"size": tx.total_size(),
"base_size": tx.base_size(),
"vsize": tx.vsize(),
"weight": tx.weight().to_wu(),
"inputs": tx.input.len(),
"outputs": tx.output.len(),
"is_coinbase": is_coinbase,
"has_witness": tx_has_witness(tx),
"is_rbf": tx_is_rbf(tx),
"total_out": tx_total_out(tx),
"hex": serialize_hex(tx),
"vin": vin,
"vout": vout,
})
}
fn vin_to_value(vin: &TxIn, is_coinbase: bool) -> Value {
json!({
"prev_txid": vin.previous_output.txid.to_string(),
"prev_vout": vin.previous_output.vout,
"sequence": vin.sequence.0,
"script_sig": vin.script_sig.to_hex_string(),
"script_sig_asm": vin.script_sig.to_asm_string(),
"witness": witness_to_value(vin),
"has_witness": !vin.witness.is_empty(),
"is_rbf": vin.sequence.is_rbf(),
"coinbase": is_coinbase,
})
}
fn vout_to_value(vout: &TxOut) -> Value {
json!({
"value": vout.value.to_sat(),
"script_pubkey": vout.script_pubkey.to_hex_string(),
"script_pubkey_asm": vout.script_pubkey.to_asm_string(),
"type": script_type(&vout.script_pubkey),
"address": address_value(&vout.script_pubkey),
})
}
fn tx_has_witness(tx: &Transaction) -> bool {
tx.input.iter().any(|i| !i.witness.is_empty())
}
fn tx_is_rbf(tx: &Transaction) -> bool {
tx.input.iter().any(|i| i.sequence.is_rbf())
}
fn tx_total_out(tx: &Transaction) -> u64 {
tx.output.iter().map(|o| o.value.to_sat()).sum()
}
fn subsidy_sats(height: u32) -> u64 {
let halvings = height / 210_000;
if halvings >= 64 {
0
} else {
(50 * 100_000_000u64) >> halvings
}
}
fn witness_to_value(vin: &TxIn) -> Value {
Value::Array(
vin.witness
.iter()
.map(|w| Value::String(w.to_lower_hex_string()))
.collect(),
)
}
fn script_type(s: &ScriptBuf) -> &'static str {
if s.is_p2pkh() {
"p2pkh"
} else if s.is_p2sh() {
"p2sh"
} else if s.is_p2wpkh() {
"p2wpkh"
} else if s.is_p2wsh() {
"p2wsh"
} else if s.is_p2tr() {
"p2tr"
} else if s.is_op_return() {
"op_return"
} else if s.is_p2pk() {
"p2pk"
} else {
"unknown"
}
}
fn address_value(s: &ScriptBuf) -> Value {
Address::from_script(s, Network::Bitcoin)
.map(|a| Value::String(a.to_string()))
.unwrap_or(Value::Null)
}

View File

@@ -0,0 +1,66 @@
use brk_error::Result;
use serde_json::{Map, Value};
use crate::{fields::Ctx, mode::Mode, path::Path};
pub struct Formatter {
mode: Mode,
fields: Vec<Path>,
}
impl Formatter {
pub fn new(mode: Mode, fields: Vec<Path>) -> Self {
Self { mode, fields }
}
pub fn format(&self, ctx: &Ctx) -> Result<String> {
match self.mode {
Mode::Bare => self.bare(ctx),
Mode::Tsv => self.tsv(ctx),
Mode::Json => Ok(serde_json::to_string(&self.object(ctx)?)?),
Mode::Pretty => Ok(serde_json::to_string_pretty(&self.object(ctx)?)?),
}
}
fn bare(&self, ctx: &Ctx) -> Result<String> {
let mut out = String::new();
flatten(&ctx.resolve(&self.fields[0])?, &mut out);
Ok(out)
}
fn tsv(&self, ctx: &Ctx) -> Result<String> {
let mut row = String::new();
for (i, path) in self.fields.iter().enumerate() {
if i > 0 {
row.push('\t');
}
for c in ctx.resolve_str(path)?.chars() {
row.push(if matches!(c, '\t' | '\n' | '\r') { ' ' } else { c });
}
}
Ok(row)
}
fn object(&self, ctx: &Ctx) -> Result<Value> {
let mut obj = Map::with_capacity(self.fields.len());
for path in &self.fields {
obj.insert(path.raw.clone(), ctx.resolve(path)?);
}
Ok(Value::Object(obj))
}
}
fn flatten(v: &Value, out: &mut String) {
match v {
Value::Array(arr) => arr.iter().for_each(|item| flatten(item, out)),
Value::String(s) => push_line(out, s),
other => push_line(out, &other.to_string()),
}
}
fn push_line(out: &mut String, s: &str) {
if !out.is_empty() {
out.push('\n');
}
out.push_str(s);
}

View File

@@ -1 +1,52 @@
fn main() {} mod args;
mod fields;
mod formatter;
mod mode;
mod path;
mod selector;
mod usage;
use std::process::ExitCode;
use brk_error::Result;
use brk_reader::Reader;
use args::Args;
use fields::Ctx;
use formatter::Formatter;
use mode::Mode;
use selector::Selector;
fn main() -> ExitCode {
match run() {
Ok(()) => ExitCode::SUCCESS,
Err(e) => {
eprintln!("blk: {e}");
ExitCode::from(1)
}
}
}
fn run() -> Result<()> {
let raw: Vec<String> = std::env::args().skip(1).collect();
if raw.is_empty() || raw.iter().any(|a| matches!(a.as_str(), "-h" | "--help")) {
usage::print();
return Ok(());
}
let args = Args::parse(raw)?;
let client = args.rpc()?;
let (start, end) = Selector::parse(&args.selector, &client)?;
let mode = Mode::pick(args.pretty, args.compact, args.paths.len());
let reader = Reader::new(args.blocks_dir(), &client);
let formatter = Formatter::new(mode, args.paths);
for block in reader.range(start, end)?.iter() {
let block = block?;
let line = formatter.format(&Ctx::new(&block))?;
if !line.is_empty() {
println!("{line}");
}
}
Ok(())
}

21
crates/blk/src/mode.rs Normal file
View File

@@ -0,0 +1,21 @@
#[derive(Clone, Copy)]
pub enum Mode {
Bare,
Tsv,
Json,
Pretty,
}
impl Mode {
pub fn pick(pretty: bool, compact: bool, n_fields: usize) -> Self {
if pretty {
Self::Pretty
} else if n_fields == 1 {
Self::Bare
} else if compact {
Self::Tsv
} else {
Self::Json
}
}
}

40
crates/blk/src/path.rs Normal file
View File

@@ -0,0 +1,40 @@
use brk_error::{Error, Result};
pub struct Step {
pub name: String,
pub index: Option<usize>,
}
pub struct Path {
pub raw: String,
pub steps: Vec<Step>,
}
impl Path {
pub fn parse(s: &str) -> Result<Self> {
let parts: Vec<&str> = s.split('.').collect();
let mut steps = Vec::new();
let mut i = 0;
while i < parts.len() {
let name = parts[i];
if name.is_empty() {
return Err(Error::Parse(format!("bad path '{s}': empty segment")));
}
if name.parse::<usize>().is_ok() {
return Err(Error::Parse(format!(
"bad path '{s}': '{name}' must follow a field name"
)));
}
let index = parts.get(i + 1).and_then(|p| p.parse::<usize>().ok());
steps.push(Step {
name: name.to_string(),
index,
});
i += if index.is_some() { 2 } else { 1 };
}
Ok(Self {
raw: s.to_string(),
steps,
})
}
}

View File

@@ -0,0 +1,40 @@
use brk_error::{Error, Result};
use brk_rpc::Client;
use brk_types::{CheckedSub, Height};
pub struct Selector;
impl Selector {
pub fn parse(s: &str, client: &Client) -> Result<(Height, Height)> {
let (start, end) = match s.split_once("..") {
Some((a, b)) => (Self::endpoint(a, client)?, Self::endpoint(b, client)?),
None => {
let h = Self::endpoint(s, client)?;
(h, h)
}
};
if end < start {
return Err(Error::Parse(format!("range end {end} before start {start}")));
}
Ok((start, end))
}
fn endpoint(s: &str, client: &Client) -> Result<Height> {
if s == "tip" {
return client.get_last_height();
}
if let Some(rest) = s.strip_prefix("tip-") {
let n: u32 = rest
.parse()
.map_err(|_| Error::Parse(format!("bad tip offset: {s}")))?;
let tip = client.get_last_height()?;
return tip
.checked_sub(n)
.ok_or_else(|| Error::Parse(format!("tip-{n} underflows genesis")));
}
let n: u32 = s
.parse()
.map_err(|_| Error::Parse(format!("bad height: {s}")))?;
Ok(Height::new(n))
}
}

155
crates/blk/src/usage.rs Normal file
View File

@@ -0,0 +1,155 @@
use owo_colors::OwoColorize;
const SEL_W: usize = 5; // longest selector token: "tip-N"
const LABEL_W: usize = 28; // longest label across OUTPUT/OPTIONS/EXAMPLES (= example cmd "blk 800000 tx.0.vout.0.value")
const FLAG_W: usize = 15; // longest flag: "--rpccookiefile"
const PH_W: usize = LABEL_W - FLAG_W - 1; // placeholder column width so flag+ph total = LABEL_W
const GAP: usize = 4;
pub fn print() {
println!("{} - inspect a Bitcoin Core block", "blk".bold());
println!();
section("USAGE");
println!(
" blk {} {} [field ...] [OPTIONS]",
"<selector>".bright_black(),
"<field>".bright_black()
);
println!();
section("SELECTOR");
sel("<n>", "single height (e.g. 800000)");
sel("tip", "current chain tip");
sel("tip-N", "tip minus N");
sel("a..b", "inclusive range, endpoints can be height/tip/tip-N");
println!();
section("FIELDS");
println!(
" {}",
"dotted paths drill into nested data; omit an index for arrays"
.bright_black()
);
println!();
group("block");
fields(&[
"height, hash, time, version, version_hex, bits, nonce,",
"prev, merkle, difficulty, txs, n_inputs, n_outputs,",
"witness_txs, size, strippedsize, weight, subsidy, coinbase,",
"header_hex, hex",
]);
println!();
group_note("tx.i", "omit i for all txs");
fields(&[
"txid, wtxid, version, locktime, size, base_size, vsize,",
"weight, inputs, outputs, is_coinbase, has_witness, is_rbf,",
"total_out, hex",
]);
println!();
group_note("tx.i.vin.j", "omit j for all inputs");
fields(&[
"prev_txid, prev_vout, sequence, script_sig, script_sig_asm,",
"witness, has_witness, is_rbf, coinbase",
]);
println!();
group_note("tx.i.vout.j", "omit j for all outputs");
fields(&["value, script_pubkey, script_pubkey_asm, type, address"]);
println!();
println!(
" {}",
"Naked tx / tx.i / vin / vout returns the whole sub-object as JSON."
.bright_black()
);
println!();
section("OUTPUT");
out("1 field", "bare value, one per line");
out("2+ fields", "compact JSON object, one per line (NDJSON)");
out("-p, --pretty", "pretty JSON object instead");
out("-c, --compact", "tab-separated values, no field names (TSV)");
println!();
section("OPTIONS");
opt("--bitcoindir", "<PATH>", "Bitcoin directory", Some("[OS default]"));
opt("--blocksdir", "<PATH>", "Blocks directory", Some("[<bitcoindir>/blocks]"));
opt("--rpcconnect", "<IP>", "RPC host", Some("[localhost]"));
opt("--rpcport", "<PORT>", "RPC port", Some("[8332]"));
opt("--rpccookiefile", "<PATH>", "RPC cookie file", Some("[<bitcoindir>/.cookie]"));
opt("--rpcuser", "<USERNAME>", "RPC username", None);
opt("--rpcpassword", "<PASSWORD>", "RPC password", None);
println!();
section("EXAMPLES");
ex("blk 800000 hash", "bare hash");
ex("blk 800000 height hash time", "one compact JSON line");
ex("blk 800000 tx.0.txid", "coinbase txid");
ex("blk 800000 tx.txid", "all txids in block (array)");
ex("blk 800000 tx.0.vout.0.value", "coinbase output 0 sats");
ex("blk 800000 tx.0.vout.value", "all output sats for tx 0");
ex("blk 800000 tx.vout.value", "array of arrays (per tx)");
ex("blk 0..2 hash tx.0.txid", "3 NDJSON lines");
ex("blk tip tx.0", "whole coinbase tx as JSON");
}
fn section(name: &str) {
println!("{}", format!("{name}:").bold());
}
fn group(name: &str) {
println!(" {}", format!("{name}:").bold());
}
fn group_note(name: &str, note: &str) {
println!(
" {} {}",
format!("{name}:").bold(),
format!("({note})").bright_black()
);
}
fn fields(lines: &[&str]) {
for line in lines {
println!(" {line}");
}
}
fn pad(s: &str, width: usize) -> String {
" ".repeat(width.saturating_sub(s.len()))
}
fn sel(token: &str, desc: &str) {
println!(
" {}{}{}{desc}",
token.bright_black(),
pad(token, SEL_W),
" ".repeat(GAP),
);
}
fn out(label: &str, desc: &str) {
println!(" {label}{}{}{desc}", pad(label, LABEL_W), " ".repeat(GAP));
}
fn opt(flag: &str, ph: &str, desc: &str, default: Option<&str>) {
let head = format!(
" {flag}{} {}{}{}",
pad(flag, FLAG_W),
ph.bright_black(),
pad(ph, PH_W),
" ".repeat(GAP),
);
match default {
Some(d) => println!("{head}{desc} {}", d.bright_black()),
None => println!("{head}{desc}"),
}
}
fn ex(cmd: &str, note: &str) {
println!(
" {cmd}{}{}{}",
pad(cmd, LABEL_W),
" ".repeat(GAP),
format!("# {note}").bright_black()
);
}

View File

@@ -138,7 +138,7 @@ impl Lengths {
} }
/// Read current local lengths. `None` pre-genesis. /// Read current local lengths. `None` pre-genesis.
pub fn from_local(vecs: &mut Vecs, stores: &Stores) -> Option<Self> { pub fn from_local(vecs: &Vecs, stores: &Stores) -> Option<Self> {
let height = vecs.next_height().min(stores.next_height()); let height = vecs.next_height().min(stores.next_height());
Self::collect_at(height, vecs) Self::collect_at(height, vecs)
} }
@@ -146,7 +146,7 @@ impl Lengths {
/// Read lengths to resume at `required_height`. Reorg-aware: /// Read lengths to resume at `required_height`. Reorg-aware:
/// - if local is ahead, clamp down to `required_height`; /// - if local is ahead, clamp down to `required_height`;
/// - if local is behind, return `None` (caller must full-reset). /// - if local is behind, return `None` (caller must full-reset).
pub fn resume_at(required_height: Height, vecs: &mut Vecs, stores: &Stores) -> Option<Self> { pub fn resume_at(required_height: Height, vecs: &Vecs, stores: &Stores) -> Option<Self> {
let local = vecs.next_height().min(stores.next_height()); let local = vecs.next_height().min(stores.next_height());
if local < required_height { if local < required_height {
return None; return None;
@@ -163,7 +163,7 @@ impl Lengths {
Self::collect_at(height, vecs) Self::collect_at(height, vecs)
} }
fn collect_at(height: Height, vecs: &mut Vecs) -> Option<Self> { fn collect_at(height: Height, vecs: &Vecs) -> Option<Self> {
Some(Self { Some(Self {
empty_output_index: next_index( empty_output_index: next_index(
&vecs.scripts.empty.first_index, &vecs.scripts.empty.first_index,

View File

@@ -85,12 +85,17 @@ impl Indexer {
let tip_blockhash = vecs.blocks.blockhash.collect_last().unwrap_or_default(); let tip_blockhash = vecs.blocks.blockhash.collect_last().unwrap_or_default();
let safe_lengths = SafeLengths::new();
if let Some(lengths) = Lengths::from_local(&vecs, &stores) {
safe_lengths.advance(lengths);
}
Ok(Self { Ok(Self {
path: indexed_path.clone(), path: indexed_path.clone(),
vecs, vecs,
stores, stores,
tip_blockhash: Arc::new(RwLock::new(tip_blockhash)), tip_blockhash: Arc::new(RwLock::new(tip_blockhash)),
safe_lengths: SafeLengths::new(), safe_lengths,
}) })
}; };
@@ -157,7 +162,7 @@ impl Indexer {
let (starting_lengths, prev_hash) = if let Some(hash) = last_blockhash { let (starting_lengths, prev_hash) = if let Some(hash) = last_blockhash {
let (height, hash) = client.get_closest_valid_height(hash)?; let (height, hash) = client.get_closest_valid_height(hash)?;
match Lengths::resume_at(height.incremented(), &mut self.vecs, &self.stores) { match Lengths::resume_at(height.incremented(), &self.vecs, &self.stores) {
Some(starting_lengths) => { Some(starting_lengths) => {
if starting_lengths.height > client.get_last_height()? { if starting_lengths.height > client.get_last_height()? {
info!("Up to date, nothing to index."); info!("Up to date, nothing to index.");
@@ -368,7 +373,7 @@ impl Indexer {
/// bg ingest first so stores are queryable at the new bound. /// bg ingest first so stores are queryable at the new bound.
pub fn advance_safe_lengths(&mut self) -> Result<()> { pub fn advance_safe_lengths(&mut self) -> Result<()> {
self.vecs.db.sync_bg_tasks()?; self.vecs.db.sync_bg_tasks()?;
if let Some(lengths) = Lengths::from_local(&mut self.vecs, &self.stores) { if let Some(lengths) = Lengths::from_local(&self.vecs, &self.stores) {
self.safe_lengths.advance(lengths); self.safe_lengths.advance(lengths);
} }
Ok(()) Ok(())

View File

@@ -199,6 +199,28 @@ impl AddrsVecs {
.into_par_iter() .into_par_iter()
} }
pub fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
[
&self.p2pk65.first_index as &dyn AnyStoredVec,
&self.p2pk33.first_index,
&self.p2pkh.first_index,
&self.p2sh.first_index,
&self.p2wpkh.first_index,
&self.p2wsh.first_index,
&self.p2tr.first_index,
&self.p2a.first_index,
&self.p2pk65.bytes,
&self.p2pk33.bytes,
&self.p2pkh.bytes,
&self.p2sh.bytes,
&self.p2wpkh.bytes,
&self.p2wsh.bytes,
&self.p2tr.bytes,
&self.p2a.bytes,
]
.into_iter()
}
/// Get address bytes by output type, using the cached VecReader for the specific address type. /// Get address bytes by output type, using the cached VecReader for the specific address type.
/// Returns None if the index doesn't exist yet. /// Returns None if the index doesn't exist yet.
pub fn get_bytes_by_type( pub fn get_bytes_by_type(

View File

@@ -109,4 +109,20 @@ impl BlocksVecs {
] ]
.into_par_iter() .into_par_iter()
} }
pub fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
[
&self.blockhash.inner as &dyn AnyStoredVec,
&self.coinbase_tag,
&self.difficulty,
&self.timestamp.inner,
&self.total,
&self.weight,
&self.position,
&self.segwit_txs,
&self.segwit_size,
&self.segwit_weight,
]
.into_iter()
}
} }

View File

@@ -57,4 +57,15 @@ impl InputsVecs {
] ]
.into_par_iter() .into_par_iter()
} }
pub fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
[
&self.first_txin_index as &dyn AnyStoredVec,
&self.outpoint,
&self.tx_index,
&self.output_type,
&self.type_index,
]
.into_iter()
}
} }

View File

@@ -126,8 +126,8 @@ impl Vecs {
Ok(()) Ok(())
} }
pub fn next_height(&mut self) -> Height { pub fn next_height(&self) -> Height {
self.par_iter_mut_any_stored_vec() self.iter_any_stored_vec()
.map(|vec| { .map(|vec| {
let h = Height::from(vec.stamp()); let h = Height::from(vec.stamp());
if h > Height::ZERO { h.incremented() } else { h } if h > Height::ZERO { h.incremented() } else { h }
@@ -172,4 +172,14 @@ impl Vecs {
.chain(self.addrs.par_iter_mut_any()) .chain(self.addrs.par_iter_mut_any())
.chain(self.scripts.par_iter_mut_any()) .chain(self.scripts.par_iter_mut_any())
} }
fn iter_any_stored_vec(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
self.blocks
.iter_any()
.chain(self.transactions.iter_any())
.chain(self.inputs.iter_any())
.chain(self.outputs.iter_any())
.chain(self.addrs.iter_any())
.chain(self.scripts.iter_any())
}
} }

View File

@@ -64,4 +64,15 @@ impl OutputsVecs {
] ]
.into_par_iter() .into_par_iter()
} }
pub fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
[
&self.first_txout_index as &dyn AnyStoredVec,
&self.value,
&self.output_type,
&self.type_index,
&self.tx_index,
]
.into_iter()
}
} }

View File

@@ -120,4 +120,18 @@ impl ScriptsVecs {
] ]
.into_par_iter() .into_par_iter()
} }
pub fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
[
&self.empty.first_index as &dyn AnyStoredVec,
&self.op_return.first_index,
&self.p2ms.first_index,
&self.unknown.first_index,
&self.empty.to_tx_index,
&self.op_return.to_tx_index,
&self.p2ms.to_tx_index,
&self.unknown.to_tx_index,
]
.into_iter()
}
} }

View File

@@ -142,4 +142,21 @@ impl TransactionsVecs {
] ]
.into_par_iter() .into_par_iter()
} }
pub fn iter_any(&self) -> impl Iterator<Item = &dyn AnyStoredVec> {
[
&self.first_tx_index as &dyn AnyStoredVec,
&self.txid,
&self.tx_version,
&self.raw_locktime,
&self.base_size,
&self.total_size,
&self.total_sigop_cost,
&self.is_explicitly_rbf,
&self.first_txin_index,
&self.first_txout_index,
&self.position,
]
.into_iter()
}
} }

2
crates/brk_mempool/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
*.md
!README.md

View File

@@ -96,14 +96,12 @@ impl Mempool {
self.0.state.addrs.read().stats_hash(addr) self.0.state.addrs.read().stats_hash(addr)
} }
/// Look up the mempool tx that spends `(txid, vout)`. Returns /// Mempool tx spending `(txid, vout)`, or `None`. The spender's
/// `(spender_txid, vin)` if the outpoint is spent in the mempool, /// input list is walked to rule out `TxidPrefix` collisions.
/// `None` otherwise. The spender's input list is walked to rule
/// out a `TxidPrefix` collision before returning a match.
pub fn lookup_spender(&self, txid: &Txid, vout: Vout) -> Option<(Txid, Vin)> { pub fn lookup_spender(&self, txid: &Txid, vout: Vout) -> Option<(Txid, Vin)> {
let key = OutpointPrefix::new(TxidPrefix::from(txid), vout); let key = OutpointPrefix::new(TxidPrefix::from(txid), vout);
let txs = self.0.state.txs.read(); let txs = self.txs();
let entries = self.0.state.entries.read(); let entries = self.entries();
let outpoint_spends = self.0.state.outpoint_spends.read(); let outpoint_spends = self.0.state.outpoint_spends.read();
let idx = outpoint_spends.get(&key)?; let idx = outpoint_spends.get(&key)?;
let spender_txid = entries.slot(idx)?.txid; let spender_txid = entries.slot(idx)?.txid;
@@ -168,9 +166,7 @@ impl Mempool {
} }
/// Live mempool txs touching `addr`, newest first by `first_seen`, /// Live mempool txs touching `addr`, newest first by `first_seen`,
/// capped at `limit`. Acquires `txs`, `addrs`, `entries` in canonical /// capped at `limit`. Returns owned `Transaction`s.
/// order; returns owned `Transaction`s so the lock is released
/// before the caller does anything else with them.
pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec<Transaction> { pub fn addr_txs(&self, addr: &AddrBytes, limit: usize) -> Vec<Transaction> {
let txs = self.txs(); let txs = self.txs();
let addrs = self.addrs(); let addrs = self.addrs();
@@ -211,9 +207,8 @@ impl Mempool {
f(&mut iter) f(&mut iter)
} }
/// Effective fee rate for a live mempool tx: the seed's chunk rate from /// Effective fee rate for a live tx: seed's snapshot chunk rate,
/// the latest snapshot, with fall-back to the entry's simple `fee/vsize` /// falling back to the entry's `fee/vsize` if not yet in the snapshot.
/// when the snapshot doesn't yet contain it.
pub fn live_effective_fee_rate(&self, prefix: &TxidPrefix) -> Option<FeeRate> { pub fn live_effective_fee_rate(&self, prefix: &TxidPrefix) -> Option<FeeRate> {
let entries = self.entries(); let entries = self.entries();
if let Some(seed_idx) = entries.idx_of(prefix) if let Some(seed_idx) = entries.idx_of(prefix)
@@ -232,9 +227,9 @@ impl Mempool {
} }
/// `first_seen` Unix-second timestamps for `txids`, in input order. /// `first_seen` Unix-second timestamps for `txids`, in input order.
/// Returns 0 for unknown txids. `Vanished` graveyard tombstones fall /// Returns 0 for unknown txids. `Vanished` tombstones fall back to
/// back to the buried entry's `first_seen` so a tx doesn't flicker /// the buried entry's `first_seen` to avoid flicker between drop
/// to 0 in the brief window between mempool drop and indexer catch-up. /// and indexer catch-up.
pub fn transaction_times(&self, txids: &[Txid]) -> Vec<u64> { pub fn transaction_times(&self, txids: &[Txid]) -> Vec<u64> {
let entries = self.entries(); let entries = self.entries();
let graveyard = self.graveyard(); let graveyard = self.graveyard();
@@ -260,11 +255,8 @@ impl Mempool {
} }
/// Variant of `start` that runs `after_update` after every cycle. /// Variant of `start` that runs `after_update` after every cycle.
/// /// Both steps are wrapped in `catch_unwind` so a panic doesn't
/// `update` and `after_update` are wrapped in `catch_unwind` so an /// freeze the snapshot; `parking_lot` locks don't poison.
/// unexpected panic in either step doesn't kill the loop and freeze
/// the mempool snapshot. `parking_lot` locks don't poison, so state
/// remains usable after a panic.
pub fn start_with(&self, mut after_update: impl FnMut()) { pub fn start_with(&self, mut after_update: impl FnMut()) {
loop { loop {
let outcome = catch_unwind(AssertUnwindSafe(|| { let outcome = catch_unwind(AssertUnwindSafe(|| {
@@ -288,9 +280,8 @@ impl Mempool {
} }
/// Fill remaining `prevout == None` inputs via an external /// Fill remaining `prevout == None` inputs via an external
/// resolver (typically the brk indexer for confirmed parents). /// resolver (typically the indexer for confirmed parents).
/// Same-cycle in-mempool parents are filled automatically by /// In-mempool parents are filled automatically each cycle.
/// `Resolver::resolve_in_mempool` after each `Applier::apply`.
pub fn fill_prevouts<F>(&self, resolver: F) -> bool pub fn fill_prevouts<F>(&self, resolver: F) -> bool
where where
F: Fn(&Txid, Vout) -> Option<TxOut>, F: Fn(&Txid, Vout) -> Option<TxOut>,

View File

@@ -1,13 +1,7 @@
//! RBF (Replace-By-Fee) tree extraction from the live mempool + //! RBF tree extraction. Returns owned trees so the caller can enrich
//! graveyard. //! with indexer data (`mined`, effective fee rate) after the lock
//! //! drops: enriching under the lock re-enters `Mempool` and would
//! Both methods return owned, lock-free `RbfNode` trees so the caller //! recursively acquire the same read locks.
//! (typically `brk_query`) can enrich each node with indexer-resident
//! data (`mined`, effective fee rate) after the mempool lock window
//! closes. Doing the enrichment under the lock would re-enter
//! `Mempool` indirectly via `effective_fee_rate` and recursively
//! acquire the same `entries`/`graveyard` read locks, which can
//! deadlock against a queued writer in `parking_lot`.
use brk_types::{Sats, Timestamp, Transaction, Txid, TxidPrefix, VSize}; use brk_types::{Sats, Timestamp, Transaction, Txid, TxidPrefix, VSize};
use rustc_hash::FxHashSet; use rustc_hash::FxHashSet;
@@ -17,15 +11,11 @@ use crate::{
stores::{EntryPool, TxGraveyard}, stores::{EntryPool, TxGraveyard},
}; };
/// One node in an RBF replacement tree, populated entirely from
/// mempool state. The caller layers on `mined` and effective fee rate
/// after the lock has been released.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RbfNode { pub struct RbfNode {
pub txid: Txid, pub txid: Txid,
pub fee: Sats, pub fee: Sats,
pub vsize: VSize, pub vsize: VSize,
/// Sum of the tx's output amounts.
pub value: Sats, pub value: Sats,
pub first_seen: Timestamp, pub first_seen: Timestamp,
/// BIP-125 signaling: at least one input has sequence < 0xffffffff-1. /// BIP-125 signaling: at least one input has sequence < 0xffffffff-1.
@@ -35,21 +25,18 @@ pub struct RbfNode {
pub replaces: Vec<RbfNode>, pub replaces: Vec<RbfNode>,
} }
/// Result of [`Mempool::rbf_for_tx`].
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct RbfForTx { pub struct RbfForTx {
/// Tree rooted at the requested tx's terminal replacer. `None` if /// Tree rooted at the terminal replacer. `None` if `txid` is unknown.
/// the tx is unknown to both the live pool and the graveyard.
pub root: Option<RbfNode>, pub root: Option<RbfNode>,
/// Direct predecessors of the requested tx (txids only). /// Direct predecessors of the requested tx (txids only).
pub replaces: Vec<Txid>, pub replaces: Vec<Txid>,
} }
impl Mempool { impl Mempool {
/// Build the RBF tree relevant to `txid`: walk forward through /// Walk forward through `Replaced { by }` to the terminal replacer
/// `Replaced { by }` links to the terminal replacer, return its /// and return its full predecessor tree, plus the requested tx's
/// full predecessor tree, plus the requested tx's own direct /// direct predecessors. Single read-lock window in canonical order.
/// predecessors. Single read-lock window in canonical order.
pub fn rbf_for_tx(&self, txid: &Txid) -> RbfForTx { pub fn rbf_for_tx(&self, txid: &Txid) -> RbfForTx {
let txs = self.txs(); let txs = self.txs();
let entries = self.entries(); let entries = self.entries();
@@ -61,10 +48,9 @@ impl Mempool {
RbfForTx { root, replaces } RbfForTx { root, replaces }
} }
/// Recent terminal-replacer trees, most-recent replacement first, /// Recent terminal-replacer trees, most-recent first, deduplicated
/// deduplicated by tree root, capped at `limit`. When /// by root, capped at `limit`. `full_rbf_only` drops trees with no
/// `full_rbf_only` is true, drops trees with no non-signaling /// non-signaling predecessor.
/// predecessor anywhere.
pub fn recent_rbf_trees(&self, full_rbf_only: bool, limit: usize) -> Vec<RbfNode> { pub fn recent_rbf_trees(&self, full_rbf_only: bool, limit: usize) -> Vec<RbfNode> {
let txs = self.txs(); let txs = self.txs();
let entries = self.entries(); let entries = self.entries();

View File

@@ -17,8 +17,6 @@ pub struct MempoolStats {
} }
impl From<&Mempool> for MempoolStats { impl From<&Mempool> for MempoolStats {
/// Acquires every sub-lock in canonical order to build a coherent
/// snapshot. Cheap; locks are released as soon as the counts are read.
fn from(mempool: &Mempool) -> Self { fn from(mempool: &Mempool) -> Self {
let state = mempool.state(); let state = mempool.state();
let info = state.info.read(); let info = state.info.read();

View File

@@ -37,17 +37,10 @@ impl Applier {
return; return;
}; };
if !s.txs.contains(&txid) { if !s.txs.contains(&txid) {
// entries had this prefix but txs didn't — a state divergence // Skip bury on entries/txs divergence: freeing the slot here
// that should be impossible: publish/bury both touch them // would let outpoint_spends point at a slot the next insert
// together under one write_all guard. Reaching this branch // recycles for an unrelated tx.
// means a prior cycle left the two stores out of sync (e.g. warn!("mempool bury: entry present but tx missing for txid={txid}");
// panic mid-publish before `txs.extend` ran). Skip the bury
// entirely: freeing the entries slot here would let
// outpoint_spends point at a slot the next insert recycles
// for an unrelated tx.
warn!(
"mempool bury: entry present but tx missing for txid={txid} - skipping bury to preserve outpoint_spends integrity"
);
return; return;
} }
let (idx, entry) = s.entries.remove(prefix).expect("entry present"); let (idx, entry) = s.entries.remove(prefix).expect("entry present");

View File

@@ -89,10 +89,9 @@ impl Rebuilder {
} }
} }
/// Returns true iff dirty and the throttle window has elapsed. On /// True iff dirty and the throttle window has elapsed. The dirty
/// success, starts a new throttle window. The dirty bit is cleared /// bit is cleared in `tick` only after `publish` returns, so a
/// by `tick` only after `publish` returns, so a panic in /// panic in `build_snapshot` retries on the next cycle.
/// `build_snapshot` leaves dirty set and the next cycle retries.
fn try_claim_rebuild(&self) -> bool { fn try_claim_rebuild(&self) -> bool {
if !self.dirty.load(Ordering::Acquire) { if !self.dirty.load(Ordering::Acquire) {
self.skip_clean.fetch_add(1, Ordering::Relaxed); self.skip_clean.fetch_add(1, Ordering::Relaxed);

View File

@@ -3,23 +3,11 @@ use parking_lot::{RwLock, RwLockWriteGuard};
use super::{AddrTracker, EntryPool, OutpointSpends, TxGraveyard, TxStore}; use super::{AddrTracker, EntryPool, OutpointSpends, TxGraveyard, TxStore};
/// The six buckets making up live mempool state. /// The six buckets making up live mempool state. Each has its own
/// /// `RwLock`. Multi-lock code must follow the canonical order
/// Each bucket has its own `RwLock` so readers of different buckets /// `info → txs → addrs → entries → outpoint_spends → graveyard` to
/// don't contend with each other. Any code that takes more than one /// avoid circular waits. External callers go through bundled
/// lock must follow the canonical partial order /// `Mempool` methods so they can't take the order wrong.
/// `info → txs → addrs → entries → outpoint_spends → graveyard`,
/// otherwise a reader-holds-A-wants-B / writer-holds-B-wants-A
/// circular wait can deadlock. The Applier takes all six write locks
/// in this order for a brief window once per cycle via
/// [`MempoolState::write_all`]; multi-lock readers inside the crate
/// take a (canonical-order) subset inline.
///
/// This discipline is *internal* to `brk_mempool`: external crates
/// only see `Mempool` methods that bundle each multi-lock operation
/// behind a single call (e.g. `Mempool::lookup_spender`,
/// `Mempool::addr_txs`, `Mempool::rbf_for_tx`), so callers can never
/// take the order wrong because they don't get to choose.
#[derive(Default)] #[derive(Default)]
pub struct MempoolState { pub struct MempoolState {
pub(crate) info: RwLock<MempoolInfo>, pub(crate) info: RwLock<MempoolInfo>,

View File

@@ -48,7 +48,9 @@ impl Query {
for h in start..=tip { for h in start..=tip {
let block_ts = ts_cursor.get(h).data()?; let block_ts = ts_cursor.get(h).data()?;
if block_ts <= target { if block_ts <= target {
best = Some((h, block_ts)); if best.is_none_or(|(_, bts)| block_ts > bts) {
best = Some((h, block_ts));
}
above_streak = 0; above_streak = 0;
} else { } else {
above_streak += 1; above_streak += 1;
@@ -63,12 +65,15 @@ impl Query {
for h in (0..start).rev() { for h in (0..start).rev() {
let block_ts = ts_cursor.get(h).data()?; let block_ts = ts_cursor.get(h).data()?;
if block_ts <= target { if block_ts <= target {
best = Some((h, block_ts)); if best.is_none_or(|(_, bts)| block_ts > bts) {
break; best = Some((h, block_ts));
} }
above_streak += 1; above_streak = 0;
if above_streak >= MTP_TERMINAL_STREAK { } else {
break; above_streak += 1;
if above_streak >= MTP_TERMINAL_STREAK {
break;
}
} }
} }
} }

View File

@@ -50,14 +50,8 @@ impl Query {
} }
/// Fill any `prevout == None` inputs on live mempool txs from the /// Fill any `prevout == None` inputs on live mempool txs from the
/// indexer, mutating them in place. Cheap when the unresolved set /// indexer. Driver calls this once per cycle right after
/// is empty (the steady-state with `-txindex` on); otherwise resolves /// `mempool.update()`. Returns true if at least one was filled.
/// each missing prevout via the same lookup chain used for confirmed
/// txs: `txid → tx_index → first_txout_index + vout → output_type
/// / type_index / value → script_pubkey`.
///
/// Driver calls this once per cycle, right after `mempool.update()`.
/// Returns true if at least one prevout was filled.
pub fn fill_mempool_prevouts(&self) -> bool { pub fn fill_mempool_prevouts(&self) -> bool {
let Some(mempool) = self.mempool() else { let Some(mempool) = self.mempool() else {
return false; return false;
@@ -101,12 +95,10 @@ impl Query {
Ok(self.require_mempool()?.recent_txs()) Ok(self.require_mempool()?.recent_txs())
} }
/// RBF history for a tx, matching mempool.space's /// RBF history for a tx. Matches mempool.space's
/// `GET /api/v1/tx/:txid/rbf`. The mempool builds the owned /// `GET /api/v1/tx/:txid/rbf`. Mempool builds the owned tree under
/// replacement tree (terminal replacer + recursive predecessors) /// one read-lock window; this then layers on `mined` + effective
/// under one read-lock window; this method then enriches each node /// fee rate from the indexer/computer.
/// with `mined` + effective fee rate, both of which need the
/// indexer/computer.
pub fn tx_rbf(&self, txid: &Txid) -> Result<RbfResponse> { pub fn tx_rbf(&self, txid: &Txid) -> Result<RbfResponse> {
let RbfForTx { root, replaces } = self.require_mempool()?.rbf_for_tx(txid); let RbfForTx { root, replaces } = self.require_mempool()?.rbf_for_tx(txid);
let replacements = root.map(|n| self.enrich_rbf_node(n, None)); let replacements = root.map(|n| self.enrich_rbf_node(n, None));
@@ -117,14 +109,10 @@ impl Query {
}) })
} }
/// Recent RBF replacements across the whole mempool, matching /// Recent RBF replacements. Matches mempool.space's
/// mempool.space's `GET /api/v1/replacements` and /// `GET /api/v1/replacements` and `GET /api/v1/fullrbf/replacements`.
/// `GET /api/v1/fullrbf/replacements`. Each entry is a complete /// Most-recent first, capped at 25. `full_rbf_only` keeps only
/// replacement tree rooted at the terminal replacer; same shape as /// trees with at least one non-signaling predecessor.
/// `tx_rbf().replacements`. Ordered by most-recent replacement
/// event first and capped at 25 entries. When `full_rbf_only` is
/// true, only trees with at least one non-signaling predecessor
/// are returned.
pub fn recent_replacements(&self, full_rbf_only: bool) -> Result<Vec<ReplacementNode>> { pub fn recent_replacements(&self, full_rbf_only: bool) -> Result<Vec<ReplacementNode>> {
Ok(self Ok(self
.require_mempool()? .require_mempool()?
@@ -134,10 +122,9 @@ impl Query {
.collect()) .collect())
} }
/// Layer indexer-resident data (`mined`, effective fee rate) onto /// Layer `mined` + effective fee rate onto an `RbfNode` tree.
/// a `RbfNode` tree. Runs after the mempool lock window has closed /// Must run after the mempool lock has dropped (effective_fee_rate
/// because `effective_fee_rate` re-enters `Mempool` and would /// re-enters Mempool).
/// recursively acquire the same read locks otherwise.
fn enrich_rbf_node( fn enrich_rbf_node(
&self, &self,
node: RbfNode, node: RbfNode,
@@ -176,18 +163,14 @@ impl Query {
} }
} }
/// `first_seen` Unix-second timestamps for each txid, matching /// `first_seen` Unix-second timestamps. Matches mempool.space's
/// mempool.space's `POST /api/v1/transaction-times`. Returns 0 for /// `POST /api/v1/transaction-times`. Returns 0 for unknowns.
/// unknown txids, in input order.
pub fn transaction_times(&self, txids: &[Txid]) -> Result<Vec<u64>> { pub fn transaction_times(&self, txids: &[Txid]) -> Result<Vec<u64>> {
Ok(self.require_mempool()?.transaction_times(txids)) Ok(self.require_mempool()?.transaction_times(txids))
} }
/// Opaque content hash that changes whenever the projected next /// Content hash of the projected next block. Same value as the
/// block changes. Same value used as the mempool ETag, surfaced as /// mempool ETag. Polling lets monitors detect a stalled sync.
/// JSON so external monitors can detect a frozen update loop by
/// polling: if the value doesn't change for tens of seconds on a
/// live network, the mempool sync has stalled.
pub fn mempool_hash(&self) -> Result<u64> { pub fn mempool_hash(&self) -> Result<u64> {
Ok(self.require_mempool()?.next_block_hash()) Ok(self.require_mempool()?.next_block_hash())
} }

View File

@@ -97,13 +97,10 @@ impl Query {
// ── Transaction queries ──────────────────────────────────────── // ── Transaction queries ────────────────────────────────────────
/// Resolve a tx body across the three sources in order: live mempool, /// Resolve a tx body: live mempool → indexer → `Vanished` tombstone.
/// indexer (via `indexed`), then `Vanished` graveyard tombstone. /// The tombstone fallback covers the race where a mined tx has been
/// The graveyard fallback only fires when the indexer reports /// buried but `safe_lengths.tx_index` hasn't caught up. `Replaced`
/// `UnknownTxid`, covering the brief race where a mined tx has been /// tombstones are excluded since they will never confirm.
/// buried by `Applier` but `safe_lengths.tx_index` has not yet
/// advanced to cover it. `Replaced` tombstones are excluded — those
/// txs will never confirm.
fn lookup_tx<R>( fn lookup_tx<R>(
&self, &self,
txid: &Txid, txid: &Txid,