diff --git a/Cargo.lock b/Cargo.lock index 41cc273dd..cf9a84d6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,7 +363,9 @@ dependencies = [ [[package]] name = "brk-corepc-client" -version = "0.11.0" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a735b1f9b9e14301a267946f66b5e72bb147ce6e002ac1a34f5857f6849a5e24" dependencies = [ "bitcoin", "brk-corepc-jsonrpc", @@ -375,7 +377,9 @@ dependencies = [ [[package]] name = "brk-corepc-jsonrpc" -version = "0.19.0" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb0983c8009f2d34fa8dce3f08c2ab3aa1ea0cf092cc47be8934219b0b383eb" dependencies = [ "base64 0.22.1", "serde", @@ -384,7 +388,9 @@ dependencies = [ [[package]] name = "brk-corepc-types" -version = "0.11.0" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3f711507c9872a538ab57241486a3c59bf5827651a725928b862f296828f9b0" dependencies = [ "bitcoin", "serde", @@ -641,7 +647,6 @@ dependencies = [ "crossbeam", "derive_more", "parking_lot", - "rayon", "rlimit", "rustc-hash", "tracing", @@ -2545,6 +2550,8 @@ dependencies = [ [[package]] name = "rawdb" version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23b5d5fae99af33e8d0c82763b890c469dcf18b48600ed78b0d70fce4dbe189" dependencies = [ "libc", "log", @@ -3437,6 +3444,8 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" [[package]] name = "vecdb" version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5fe60956ddba8c141ca8020aaf5bea55683475b83d19006c5f44b85c71bf974" dependencies = [ "itoa", "libc", @@ -3458,6 +3467,8 @@ dependencies = [ [[package]] name = "vecdb_derive" version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "789897c1999d5d74f977020ad3d449846df046194103a4afcbac6d49baeaaffc" dependencies = [ "quote", "syn", 
diff --git a/Cargo.toml b/Cargo.toml index 730f0f4a0..9c0975564 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,10 +65,10 @@ brk_types = { version = "0.3.0-beta.1", path = "crates/brk_types" } brk_website = { version = "0.3.0-beta.1", path = "crates/brk_website" } byteview = "0.10.1" color-eyre = "0.6.5" -# corepc-client = { package = "brk-corepc-client", version = "0.11.0", features = ["client-sync"] } -corepc-client = { package = "brk-corepc-client", path = "../corepc/client", features = ["client-sync"] } -# corepc-jsonrpc = { package = "brk-corepc-jsonrpc", version = "0.19.0", features = ["simple_http"], default-features = false } -corepc-jsonrpc = { package = "brk-corepc-jsonrpc", path = "../corepc/jsonrpc", features = ["simple_http"], default-features = false } +corepc-client = { package = "brk-corepc-client", version = "0.11.1", features = ["client-sync"] } +# corepc-client = { package = "brk-corepc-client", path = "../corepc/client", features = ["client-sync"] } +corepc-jsonrpc = { package = "brk-corepc-jsonrpc", version = "0.19.1", features = ["simple_http"], default-features = false } +# corepc-jsonrpc = { package = "brk-corepc-jsonrpc", path = "../corepc/jsonrpc", features = ["simple_http"], default-features = false } derive_more = { version = "2.1.1", features = ["deref", "deref_mut"] } fjall = "=3.0.4" indexmap = { version = "2.14.0", features = ["serde"] } @@ -89,8 +89,8 @@ tower-http = { version = "0.6.8", features = ["catch-panic", "compression-br", " tower-layer = "0.3" tracing = { version = "0.1", default-features = false, features = ["std"] } ureq = { version = "3.3.0", features = ["json"] } -# vecdb = { version = "0.9.3", features = ["derive", "serde_json", "pco", "schemars"] } -vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco", "schemars"] } +vecdb = { version = "0.9.3", features = ["derive", "serde_json", "pco", "schemars"] } +# vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", 
"pco", "schemars"] } [workspace.metadata.release] shared-version = true diff --git a/crates/brk_iterator/src/lib.rs b/crates/brk_iterator/src/lib.rs index ad3a68be0..195fddd3e 100644 --- a/crates/brk_iterator/src/lib.rs +++ b/crates/brk_iterator/src/lib.rs @@ -88,12 +88,12 @@ impl Blocks { if count <= 10 { State::new_rpc(client.clone(), start, end, hash_opt) } else { - State::new_reader(reader.clone(), start, end, hash_opt) + State::new_reader(reader.clone(), start, end, hash_opt)? } } Source::Rpc { client } => State::new_rpc(client.clone(), start, end, hash_opt), Source::Reader { reader, .. } => { - State::new_reader(reader.clone(), start, end, hash_opt) + State::new_reader(reader.clone(), start, end, hash_opt)? } }; diff --git a/crates/brk_iterator/src/state.rs b/crates/brk_iterator/src/state.rs index 859b74b28..eaf70cc5f 100644 --- a/crates/brk_iterator/src/state.rs +++ b/crates/brk_iterator/src/state.rs @@ -1,5 +1,6 @@ use std::vec; +use brk_error::Result; use brk_reader::{Reader, Receiver}; use brk_rpc::Client; use brk_types::{BlockHash, Height, ReadBlock}; @@ -40,10 +41,10 @@ impl State { start: Height, end: Height, after_hash: Option, - ) -> Self { - State::Reader { - receiver: reader.read(Some(start), Some(end)), + ) -> Result { + Ok(State::Reader { + receiver: reader.range(start, end)?, after_hash, - } + }) } } diff --git a/crates/brk_reader/Cargo.toml b/crates/brk_reader/Cargo.toml index 68b8ce3bb..a892e9512 100644 --- a/crates/brk_reader/Cargo.toml +++ b/crates/brk_reader/Cargo.toml @@ -19,6 +19,5 @@ crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } derive_more = { workspace = true } tracing = { workspace = true } parking_lot = { workspace = true } -rayon = { workspace = true } rlimit = "0.11.0" rustc-hash = { workspace = true } diff --git a/crates/brk_reader/README.md b/crates/brk_reader/README.md index ed9a1b403..bcf3a5242 100644 --- a/crates/brk_reader/README.md +++ b/crates/brk_reader/README.md @@ -1,12 +1,14 @@ # brk_reader 
-Streams Bitcoin blocks from Bitcoin Core's raw `blk*.dat` files in chain order. +Streams Bitcoin blocks from Bitcoin Core's raw `blk*.dat` files in +canonical chain order, skipping orphans. ## Requirements A running Bitcoin Core node with RPC access. The reader needs: -- The `blocks/` directory (for `blk*.dat` files) -- RPC connection (to resolve block heights and filter orphan blocks) + +- The `blocks/` directory (to read `blk*.dat` files) +- RPC connection (to resolve the canonical chain up front) ## Quick Start @@ -18,42 +20,62 @@ let client = Client::new( )?; let reader = Reader::new(bitcoin_dir.join("blocks"), &client); -// Stream the entire chain -for block in reader.read(None, None) { +// Everything from genesis to the current tip +for block in reader.after(None)?.iter() { println!("{}: {}", block.height(), block.hash()); } -// Or a specific range (inclusive) -for block in reader.read(Some(Height::new(800_000)), Some(Height::new(850_000))) { +// Everything strictly after a known hash (typical sync / catchup pattern) +for block in reader.after(Some(last_known_hash))?.iter() { + // ... +} + +// A specific inclusive height range +for block in reader.range(Height::new(800_000), Height::new(850_000))?.iter() { // ... } ``` +`Reader` is thread-safe and cheap to clone (Arc-backed). + ## What You Get Each `ReadBlock` gives you access to: -| Field | Description | -|-------|-------------| -| `block.height()` | Block height | -| `block.hash()` | Block hash | -| `block.header` | Block header (timestamp, nonce, difficulty, ...) | -| `block.txdata` | All transactions | -| `block.coinbase_tag()` | Miner's coinbase tag | -| `block.metadata()` | Position in the blk file | -| `block.tx_metadata()` | Per-transaction blk file positions | - -`Reader` is thread-safe and cheap to clone (Arc-backed). 
+| Field | Description | +| ---------------------- | ---------------------------------------- | +| `block.height()` | Block height | +| `block.hash()` | Block hash | +| `block.header` | Block header (timestamp, nonce, ...) | +| `block.txdata` | All transactions | +| `block.coinbase_tag()` | Miner's coinbase tag | +| `block.metadata()` | Position in the blk file | +| `block.tx_metadata()` | Per-transaction blk file positions | ## How It Works -Three-thread pipeline connected by bounded channels: +Two-stage pipeline, one reader thread plus `N` parser threads +(default `N = 1`, configurable via `after_with` / `range_with`): ```text -blk*.dat ──► File Reader ──► Parser Pool ──► Orderer ──► Receiver - 1 thread up to 4 1 thread +canonical chain ──► Reader thread ──► Parser pool ──► Receiver +(pre-fetched walks blk files, N workers in canonical order + hashes via RPC) peeks headers, decode bodies + ships hits ``` -1. **File reader** binary-searches to the starting blk file, scans for magic bytes, segments raw blocks -2. **Parser pool** XOR-decodes and deserializes blocks in parallel, skips out-of-range blocks via header timestamp, filters orphans via RPC -3. **Orderer** buffers out-of-order arrivals, validates `prev_blockhash` continuity, emits blocks sequentially +1. **`CanonicalRange`** asks bitcoind once, up front, for the canonical + block hash at every height in the target window — one batched + JSON-RPC call, no per-block RPC chatter. +2. **Reader thread** walks blk files in order, scans each for block + magic, and for every block found hashes its 80-byte header and + looks the hash up in the canonical map. Orphans short-circuit + before the block bytes are cloned. +3. **Parser pool** (scoped threads) fully decodes canonical bodies in + parallel and serialises output through an in-order reorder buffer. + The consumer always receives blocks in canonical-height order. 
+ +Orphans can never be mistaken for canonical blocks, and a missing +canonical block produces a hard error instead of a silent drop. See +`src/pipeline.rs` for the orchestration and `src/canonical.rs` for the +filter map. diff --git a/crates/brk_reader/examples/after_bench.rs b/crates/brk_reader/examples/after_bench.rs index 2f289c203..55affad99 100644 --- a/crates/brk_reader/examples/after_bench.rs +++ b/crates/brk_reader/examples/after_bench.rs @@ -1,12 +1,12 @@ -//! End-to-end benchmark: `Reader::after` (rayon-parallel + reorder thread) -//! versus `Reader::after_canonical` (1 reader + N parser threads + canonical -//! hash filter). +//! Benchmark `Reader::after` / `Reader::after_with` across three +//! parser-thread counts (1 / 4 / 16). //! //! Three phases: //! -//! 1. **Tail scenarios** — pick an anchor `N` blocks below the chain tip -//! and run each implementation `REPEATS` times. Exercises the tail -//! (≤10) and forward (>10) code paths under realistic catchup ranges. +//! 1. **Tail scenarios** — pick an anchor `N` blocks below the chain +//! tip and run each config `REPEATS` times. Exercises the tail +//! (≤10) and forward (>10) code paths under realistic catchup +//! ranges. //! 2. **Partial reindex** — anchor=`None` but stop after //! `PARTIAL_LIMIT` blocks. Exercises the early-chain blk files //! where blocks are small and dense-parsing isn't the bottleneck. 
@@ -29,6 +29,7 @@ use brk_types::{BlockHash, Height, ReadBlock}; const SCENARIOS: &[usize] = &[5, 10, 100, 1_000, 10_000]; const REPEATS: usize = 3; const PARTIAL_LIMIT: usize = 400_000; +const PARSER_COUNTS: &[usize] = &[1, 4, 16]; fn main() -> Result<()> { let bitcoin_dir = Client::default_bitcoin_path(); @@ -42,75 +43,64 @@ fn main() -> Result<()> { println!("Tip: {tip}"); println!(); println!( - "{:>10} {:>16} {:>12} {:>12} {:>10}", - "blocks", "impl", "best", "avg", "blk/s" + "{:>10} {:>12} {:>12} {:>12} {:>10}", + "blocks", "parsers", "best", "avg", "blk/s" ); - println!("{}", "-".repeat(68)); + println!("{}", "-".repeat(64)); for &n in SCENARIOS { let anchor_height = Height::from(tip.saturating_sub(n as u32)); let anchor_hash = client.get_block_hash(*anchor_height as u64)?; let anchor = Some(BlockHash::from(anchor_hash)); - let after = bench(REPEATS, || reader.after(anchor.clone()))?; - print_row(n, "after", &after); - - let canonical_1 = bench(REPEATS, || reader.after_canonical(anchor.clone()))?; - print_row(n, "canonical[p=1]", &canonical_1); - - let canonical_4 = - bench(REPEATS, || reader.after_canonical_with(anchor.clone(), 4))?; - print_row(n, "canonical[p=4]", &canonical_4); - - let canonical_16 = - bench(REPEATS, || reader.after_canonical_with(anchor.clone(), 16))?; - print_row(n, "canonical[p=16]", &canonical_16); - - sanity_check(n, &after, &canonical_1); - sanity_check(n, &after, &canonical_4); - sanity_check(n, &after, &canonical_16); + let mut first: Option = None; + for &p in PARSER_COUNTS { + let stats = bench(REPEATS, || reader.after_with(anchor.clone(), p))?; + print_row(n, p, &stats); + if let Some(baseline) = &first { + sanity_check(n, baseline, &stats); + } else { + first = Some(stats); + } + } println!(); } println!(); println!("Partial reindex (genesis → {PARTIAL_LIMIT} blocks), one run per config:"); println!( - "{:>10} {:>16} {:>12} {:>10}", - "blocks", "impl", "elapsed", "blk/s" + "{:>10} {:>12} {:>12} {:>10}", + "blocks", "parsers", 
"elapsed", "blk/s" ); - println!("{}", "-".repeat(54)); - - let after_partial = run_bounded(PARTIAL_LIMIT, || reader.after(None))?; - print_full_row("after", &after_partial); - let p1_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical(None))?; - print_full_row("canonical[p=1]", &p1_partial); - sanity_check_full(&after_partial, &p1_partial); - let p4_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical_with(None, 4))?; - print_full_row("canonical[p=4]", &p4_partial); - sanity_check_full(&after_partial, &p4_partial); - let p16_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical_with(None, 16))?; - print_full_row("canonical[p=16]", &p16_partial); - sanity_check_full(&after_partial, &p16_partial); + println!("{}", "-".repeat(50)); + let mut partial_baseline: Option = None; + for &p in PARSER_COUNTS { + let run = run_bounded(PARTIAL_LIMIT, || reader.after_with(None, p))?; + print_full_row(p, &run); + if let Some(baseline) = &partial_baseline { + sanity_check_full(baseline, &run); + } else { + partial_baseline = Some(run); + } + } println!(); println!("Full reindex (genesis → tip), one run per config:"); println!( - "{:>10} {:>16} {:>12} {:>10}", - "blocks", "impl", "elapsed", "blk/s" + "{:>10} {:>12} {:>12} {:>10}", + "blocks", "parsers", "elapsed", "blk/s" ); - println!("{}", "-".repeat(54)); - - let after_full = run_once(|| reader.after(None))?; - print_full_row("after", &after_full); - let p1_full = run_once(|| reader.after_canonical(None))?; - print_full_row("canonical[p=1]", &p1_full); - sanity_check_full(&after_full, &p1_full); - let p4_full = run_once(|| reader.after_canonical_with(None, 4))?; - print_full_row("canonical[p=4]", &p4_full); - sanity_check_full(&after_full, &p4_full); - let p16_full = run_once(|| reader.after_canonical_with(None, 16))?; - print_full_row("canonical[p=16]", &p16_full); - sanity_check_full(&after_full, &p16_full); + println!("{}", "-".repeat(50)); + let mut full_baseline: Option = None; + for &p in 
PARSER_COUNTS { + let run = run_once(|| reader.after_with(None, p))?; + print_full_row(p, &run); + if let Some(baseline) = &full_baseline { + sanity_check_full(baseline, &run); + } else { + full_baseline = Some(run); + } + } Ok(()) } @@ -152,28 +142,28 @@ where }) } -fn print_row(requested: usize, label: &str, s: &RunStats) { +fn print_row(requested: usize, parsers: usize, s: &RunStats) { let blk_per_s = if s.best.is_zero() { 0.0 } else { s.count as f64 / s.best.as_secs_f64() }; println!( - "{:>10} {:>16} {:>12?} {:>12?} {:>10.0}", - requested, label, s.best, s.avg, blk_per_s + "{:>10} {:>12} {:>12?} {:>12?} {:>10.0}", + requested, parsers, s.best, s.avg, blk_per_s ); } -fn sanity_check(requested: usize, after: &RunStats, canonical: &RunStats) { - if after.count != canonical.count { +fn sanity_check(requested: usize, baseline: &RunStats, stats: &RunStats) { + if baseline.count != stats.count { println!( - " ⚠ block count mismatch: after={} canonical={}", - after.count, canonical.count + " ⚠ block count mismatch: {} vs {}", + baseline.count, stats.count ); - } else if after.count != requested { + } else if baseline.count != requested { println!( " (note: got {} blocks, requested {}; tip may have advanced)", - after.count, requested + baseline.count, requested ); } } @@ -221,23 +211,23 @@ where Ok(FullRun { elapsed, count }) } -fn print_full_row(label: &str, run: &FullRun) { +fn print_full_row(parsers: usize, run: &FullRun) { let blk_per_s = if run.elapsed.is_zero() { 0.0 } else { run.count as f64 / run.elapsed.as_secs_f64() }; println!( - "{:>10} {:>16} {:>12?} {:>10.0}", - run.count, label, run.elapsed, blk_per_s + "{:>10} {:>12} {:>12?} {:>10.0}", + run.count, parsers, run.elapsed, blk_per_s ); } -fn sanity_check_full(after: &FullRun, canonical: &FullRun) { - if after.count != canonical.count { +fn sanity_check_full(baseline: &FullRun, run: &FullRun) { + if baseline.count != run.count { println!( - " ⚠ block count mismatch vs after: {} vs {}", - after.count, 
canonical.count + " ⚠ block count mismatch: {} vs {}", + baseline.count, run.count ); } } diff --git a/crates/brk_reader/examples/blk_heights.rs b/crates/brk_reader/examples/blk_heights.rs index 801f170da..bba87aef3 100644 --- a/crates/brk_reader/examples/blk_heights.rs +++ b/crates/brk_reader/examples/blk_heights.rs @@ -18,7 +18,7 @@ fn main() -> Result<()> { let mut max_drop_at: u16 = 0; for (&blk_index, blk_path) in blk_map.iter() { - match reader.get_first_block_height(blk_path, xor_bytes) { + match reader.first_block_height(blk_path, xor_bytes) { Ok(height) => { let h = u32::from(height); let drop = prev_height.map(|p| p.saturating_sub(h)).unwrap_or(0); diff --git a/crates/brk_reader/examples/reader.rs b/crates/brk_reader/examples/reader.rs index d2518ab9f..aab012083 100644 --- a/crates/brk_reader/examples/reader.rs +++ b/crates/brk_reader/examples/reader.rs @@ -12,9 +12,9 @@ fn main() -> Result<()> { let reader = Reader::new(bitcoin_dir.join("blocks"), &client); - // Stream all blocks + // Stream all blocks from genesis to the current tip. 
let i = std::time::Instant::now(); - for block in reader.read(None, None) { + for block in reader.after(None)?.iter() { println!("{}: {}", block.height(), block.hash()); } println!("Full read: {:?}", i.elapsed()); diff --git a/crates/brk_reader/examples/reader_single.rs b/crates/brk_reader/examples/reader_single.rs index 9025d3d8a..11f3a50d0 100644 --- a/crates/brk_reader/examples/reader_single.rs +++ b/crates/brk_reader/examples/reader_single.rs @@ -19,7 +19,7 @@ fn main() -> Result<()> { let height = Height::new(h); let i = std::time::Instant::now(); - if let Some(block) = reader.read(Some(height), Some(height)).iter().next() { + if let Some(block) = reader.range(height, height)?.iter().next() { println!( "height={} hash={} txs={} coinbase=\"{:?}\" ({:?})", block.height(), diff --git a/crates/brk_reader/src/canonical.rs b/crates/brk_reader/src/canonical.rs index f9a03a5ca..31bc05064 100644 --- a/crates/brk_reader/src/canonical.rs +++ b/crates/brk_reader/src/canonical.rs @@ -1,71 +1,18 @@ -//! Canonical-hash pipeline for `Reader::after`. +//! `CanonicalRange`: a pre-fetched map from canonical block hash to +//! offset-from-`start`. The reader uses this as the authoritative +//! filter for "is this block on the main chain?". //! -//! Bitcoin Core stores accepted blocks in append-only `blk*.dat` files -//! under the data dir, XOR-encoded with a per-datadir key. A "blk -//! file" contains every block the node ever accepted — including -//! blocks that were later orphaned by a reorg — in acceptance order, -//! not height order. This module turns "give me every block after -//! `hash` up to the tip" into an ordered `ReadBlock` stream drawn from -//! those files while skipping orphans. +//! Every canonical hash in the target height window is fetched from +//! bitcoind up front via [`get_block_hashes_range`], so the scan +//! pipeline never needs a per-block RPC call (which is what caused the +//! original silent-drop reorg bug). //! -//! How it works: -//! -//! 1. 
[`CanonicalRange::walk`] asks bitcoind once, up front, for the -//! canonical block hash at every height in the target window. This -//! is one batched JSON-RPC request — no per-block RPC overhead. -//! 2. The reader walks blk files in order and scans each one for the -//! block magic prefix. For every block found, -//! [`peek_canonical_offset`] hashes the 80-byte header and looks -//! the hash up in the canonical map. Orphans short-circuit here, -//! before any bytes are cloned. -//! 3. Canonical hits are cloned into [`ScannedBlock`]s and shipped -//! over a channel to a small pool of parser workers, which run -//! [`parse_canonical_body`] to fully decode the block. -//! 4. Parsers serialise their output through [`ReorderState`] so that -//! the consumer receives blocks in canonical-height order even if -//! the blk files emitted them out of order. -//! -//! Ranges of at most `TAIL_THRESHOLD` blocks take a specialised -//! [`pipeline_tail`] path that reverse-scans the newest blk files in -//! 5 MB chunks — cheaper than walking forward from genesis for a -//! handful of tip blocks. -//! -//! Public entry points: [`ReaderInner::after_canonical`] and -//! [`ReaderInner::after_canonical_with`]. Coexists with the original -//! `read` / `read_rev` / `after` so the two can be A/B-tested. +//! 
[`get_block_hashes_range`]: brk_rpc::Client::get_block_hashes_range -use std::{ - fs::{self, File}, - io::{Cursor, Read, Seek, SeekFrom}, - ops::ControlFlow, - sync::atomic::{AtomicBool, Ordering}, - thread, -}; - -use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable}; -use brk_error::{Error, Result}; +use brk_error::Result; use brk_rpc::Client; -use brk_types::{BlkMetadata, Block, BlockHash, BlockHashPrefix, Height, ReadBlock}; -use crossbeam::channel::{Receiver, Sender, bounded}; -use parking_lot::Mutex; +use brk_types::{BlockHash, BlockHashPrefix, Height}; use rustc_hash::FxHashMap; -use tracing::{error, warn}; - -use crate::{BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex, scan::scan_bytes}; - -const BOUND_CAP: usize = 50; -const TAIL_CHUNK: usize = 5 * 1024 * 1024; -/// Up to this many canonical blocks → tail pipeline. Beyond → forward. -const TAIL_THRESHOLD: usize = 10; -/// Default parser-thread count for `after_canonical`. The indexer is -/// CPU-bound on the consumer side, so 1 parser thread + 1 reader thread -/// (= 2 total) leaves the rest of the cores for the indexer. Bench tools -/// that drain the channel cheaply can override via `after_canonical_with`. -const DEFAULT_PARSER_THREADS: usize = 1; - -// ───────────────────────────────────────────────────────────────────────────── -// CanonicalRange — the only RPC-aware piece in this file. -// ───────────────────────────────────────────────────────────────────────────── /// Every canonical block hash in a contiguous height window, resolved /// from bitcoind once up front. `hashes[i]` is the canonical hash at @@ -78,15 +25,20 @@ pub struct CanonicalRange { } impl CanonicalRange { - /// Resolves canonical hashes for every height strictly after `anchor` - /// up to `tip` inclusive. `anchor = None` starts at genesis. + /// Resolves canonical hashes for every height strictly after + /// `anchor` up to `tip` inclusive. `anchor = None` starts at + /// genesis. 
pub fn walk(client: &Client, anchor: Option, tip: Height) -> Result { let start = match anchor { Some(hash) => Height::from(client.get_block_header_info(&hash)?.height + 1), None => Height::ZERO, }; + Self::between(client, start, tip) + } - if start > tip { + /// Resolves canonical hashes for every height in `start..=end`. + pub fn between(client: &Client, start: Height, end: Height) -> Result { + if start > end { return Ok(Self { start, hashes: Vec::new(), @@ -94,7 +46,7 @@ impl CanonicalRange { }); } - let hashes = client.get_block_hashes_range(*start, *tip)?; + let hashes = client.get_block_hashes_range(*start, *end)?; let mut by_prefix = FxHashMap::with_capacity_and_hasher(hashes.len(), Default::default()); by_prefix.extend( @@ -124,454 +76,8 @@ impl CanonicalRange { /// the full hash so prefix collisions from orphaned blocks are /// rejected. #[inline] - fn offset_of(&self, hash: &BlockHash) -> Option { + pub(crate) fn offset_of(&self, hash: &BlockHash) -> Option { let offset = *self.by_prefix.get(&BlockHashPrefix::from(hash))?; (self.hashes[offset as usize] == *hash).then_some(offset) } } - -// ───────────────────────────────────────────────────────────────────────────── -// Block parsing — cheap header peek first, full body parse only on a hit. -// ───────────────────────────────────────────────────────────────────────────── - -const HEADER_LEN: usize = 80; - -/// Returns the canonical offset of `bytes` if its header hashes to a -/// known canonical block, otherwise `None`. Does not allocate and does -/// not mutate `bytes`: the header is copied onto a stack buffer and -/// XOR-decoded there so an orphan short-circuits cleanly and a -/// canonical hit can still be cloned out intact. 
-fn peek_canonical_offset( - bytes: &[u8], - mut xor_state: XORIndex, - xor_bytes: XORBytes, - canonical: &CanonicalRange, -) -> Option { - if bytes.len() < HEADER_LEN { - return None; - } - let mut header_buf = [0u8; HEADER_LEN]; - header_buf.copy_from_slice(&bytes[..HEADER_LEN]); - xor_state.bytes(&mut header_buf, xor_bytes); - let header = Header::consensus_decode(&mut &header_buf[..]).ok()?; - canonical.offset_of(&BlockHash::from(header.block_hash())) -} - -/// Full XOR-decode + parse for a block that has already been confirmed -/// canonical by `peek_canonical_offset`. Takes owned `bytes` so it can -/// mutate them in place and hand them to the resulting `ReadBlock`. -fn parse_canonical_body( - mut bytes: Vec, - metadata: BlkMetadata, - mut xor_state: XORIndex, - xor_bytes: XORBytes, - height: Height, -) -> Result { - xor_state.bytes(&mut bytes, xor_bytes); - let mut cursor = Cursor::new(bytes); - let header = Header::consensus_decode(&mut cursor)?; - let bitcoin_hash = header.block_hash(); - let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize; - let mut txdata = Vec::with_capacity(tx_count); - let mut tx_metadata = Vec::with_capacity(tx_count); - let mut tx_offsets = Vec::with_capacity(tx_count); - for _ in 0..tx_count { - let tx_start = cursor.position() as u32; - tx_offsets.push(tx_start); - let tx = Transaction::consensus_decode(&mut cursor)?; - let tx_len = cursor.position() as u32 - tx_start; - txdata.push(tx); - tx_metadata.push(BlkMetadata::new(metadata.position() + tx_start, tx_len)); - } - - let raw_bytes = cursor.into_inner(); - let mut block = Block::from((height, bitcoin_hash, bitcoin::Block { header, txdata })); - block.set_raw_data(raw_bytes, tx_offsets); - Ok(ReadBlock::from((block, metadata, tx_metadata))) -} - -// ───────────────────────────────────────────────────────────────────────────── -// Public entry — drop-in replacement for `Reader::after`. 
-// ───────────────────────────────────────────────────────────────────────────── - -impl ReaderInner { - /// Streams every canonical block strictly after `hash` (or from - /// genesis when `None`) up to the current chain tip, in canonical - /// order. Uses the default parser-thread count; see - /// [`after_canonical_with`](Self::after_canonical_with) to override. - pub fn after_canonical(&self, hash: Option) -> Result> { - self.after_canonical_with(hash, DEFAULT_PARSER_THREADS) - } - - /// Like [`after_canonical`](Self::after_canonical) but with a - /// configurable number of parser threads. `parser_threads = 1` is - /// the minimal-thread default (1 reader + 1 parser, uncontended - /// mutex). Higher values trade extra cores for throughput on dense - /// ranges where the parser is the bottleneck. - pub fn after_canonical_with( - &self, - hash: Option, - parser_threads: usize, - ) -> Result> { - let parser_threads = parser_threads.max(1); - let tip = self.client.get_last_height()?; - let canonical = CanonicalRange::walk(&self.client, hash, tip)?; - - if canonical.is_empty() { - return Ok(bounded(0).1); - } - - let paths = BlkIndexToBlkPath::scan(&self.blocks_dir); - *self.blk_index_to_blk_path.write() = paths.clone(); - - let (send, recv) = bounded(BOUND_CAP); - let xor_bytes = self.xor_bytes; - let use_tail = canonical.len() <= TAIL_THRESHOLD; - let first_blk_index = if use_tail { - 0 - } else { - self.find_start_blk_index(Some(canonical.start), &paths, xor_bytes) - .unwrap_or_default() - }; - - thread::spawn(move || { - let result = if use_tail { - pipeline_tail(&paths, xor_bytes, &canonical, &send) - } else { - pipeline_forward( - &paths, - first_blk_index, - xor_bytes, - &canonical, - &send, - parser_threads, - ) - }; - if let Err(e) = result { - error!("after_canonical pipeline failed: {e}"); - } - }); - - Ok(recv) - } -} - -// ───────────────────────────────────────────────────────────────────────────── -// Forward pipeline — 1 reader + N parsers + shared 
in-order emission. -// ───────────────────────────────────────────────────────────────────────────── - -/// A raw block the reader has already confirmed is on the canonical -/// chain, shipped to the parser pool for full decoding. -struct ScannedBlock { - metadata: BlkMetadata, - bytes: Vec, - xor_state: XORIndex, - canonical_offset: u32, -} - -/// In-order emission buffer shared between the parser threads. Access -/// is serialised through a `parking_lot::Mutex`; at `parser_threads = 1` -/// the lock is always uncontended. -struct ReorderState { - next_offset: u32, - /// Ahead-of-line matches keyed by canonical offset; drained - /// contiguously each time `next_offset` advances. Bounded in - /// practice by parser-thread scheduling jitter — see module doc. - pending: FxHashMap, - send_to_consumer: Sender, -} - -impl ReorderState { - fn new(send_to_consumer: Sender) -> Self { - Self { - next_offset: 0, - pending: FxHashMap::default(), - send_to_consumer, - } - } - - /// Accepts a parsed canonical block; emits it and drains any - /// contiguous pending matches. Returns `false` iff the consumer - /// dropped the receiver — a pure liveness signal. Completion is - /// checked by the caller via `next_offset`. - fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool { - use std::cmp::Ordering::*; - match offset.cmp(&self.next_offset) { - Equal => { - if self.send_to_consumer.send(block).is_err() { - return false; - } - self.next_offset += 1; - while let Some(next) = self.pending.remove(&self.next_offset) { - if self.send_to_consumer.send(next).is_err() { - return false; - } - self.next_offset += 1; - } - true - } - Greater => { - self.pending.insert(offset, block); - true - } - // Unreachable in practice: each canonical hash appears at - // exactly one offset and each block is parsed once. 
- Less => true, - } - } -} - -/// Forward pipeline: the reader (this thread) scans blk files and -/// ships canonical hits to a scoped parser pool via `parser_send`; -/// parsers decode bodies and serialise emission through `reorder`. -/// Scoped threads let every parser borrow `canonical`, `reorder`, and -/// `done` directly — no `Arc` required. -/// -/// A reorder buffer is required even at `parser_threads = 1` because -/// canonical blocks can arrive out of order across blk files (bitcoind -/// doesn't write in strict chain order during initial sync, headers- -/// first body fetch, or reindex). -fn pipeline_forward( - paths: &BlkIndexToBlkPath, - first_blk_index: u16, - xor_bytes: XORBytes, - canonical: &CanonicalRange, - send: &Sender, - parser_threads: usize, -) -> Result<()> { - let (parser_send, parser_recv) = bounded::(BOUND_CAP); - let reorder = Mutex::new(ReorderState::new(send.clone())); - let target_canonical_count = canonical.len() as u32; - let done = AtomicBool::new(false); - - thread::scope(|scope| -> Result<()> { - for _ in 0..parser_threads { - let parser_recv = parser_recv.clone(); - scope.spawn(|| { - parser_loop( - parser_recv, - &reorder, - &done, - canonical, - xor_bytes, - target_canonical_count, - ) - }); - } - // Every parser owns its own clone; ours would otherwise keep - // the channel "alive" and leak a dangling receiver. - drop(parser_recv); - - let read_result = read_and_dispatch( - paths, - first_blk_index, - xor_bytes, - canonical, - &parser_send, - &done, - ); - // Signal end-of-input to the parsers so they exit their `for` - // loops and the scope can join them. 
- drop(parser_send); - read_result - })?; - - let pipeline_cancelled = done.load(Ordering::Relaxed); - let emitted = reorder.lock().next_offset as usize; - if !pipeline_cancelled && emitted < canonical.len() { - return Err(Error::Internal( - "after_canonical forward pipeline: blk files missing canonical blocks", - )); - } - Ok(()) -} - -/// Full-body parse + in-order emit loop run by every scoped parser -/// worker in `pipeline_forward`. Drains `parser_recv` to exhaustion. -fn parser_loop( - parser_recv: Receiver, - reorder: &Mutex, - done: &AtomicBool, - canonical: &CanonicalRange, - xor_bytes: XORBytes, - target_canonical_count: u32, -) { - for ScannedBlock { metadata, bytes, xor_state, canonical_offset } in parser_recv { - if done.load(Ordering::Relaxed) { - continue; - } - let height = Height::from(*canonical.start + canonical_offset); - let block = match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height) { - Ok(block) => block, - Err(e) => { - warn!("parse_canonical_body failed: {e}"); - continue; - } - }; - let pipeline_finished = { - let mut state = reorder.lock(); - !state.try_emit(canonical_offset, block) - || state.next_offset >= target_canonical_count - }; - if pipeline_finished { - done.store(true, Ordering::Relaxed); - } - } -} - -/// Walk blk files from `first_blk_index`, scan each one, and ship -/// canonical blocks to the parser pool. Non-canonical blocks are -/// rejected via `peek_canonical_offset` *before* being cloned — the -/// cheap filter is what lets a sparse catchup avoid allocating for the -/// ~99% of blocks outside the window. -fn read_and_dispatch( - paths: &BlkIndexToBlkPath, - first_blk_index: u16, - xor_bytes: XORBytes, - canonical: &CanonicalRange, - parser_send: &Sender, - done: &AtomicBool, -) -> Result<()> { - for (&blk_index, blk_path) in paths.range(first_blk_index..) 
{ - if done.load(Ordering::Relaxed) { - return Ok(()); - } - - let mut bytes = fs::read(blk_path).map_err(|e| { - error!("Failed to read blk file {}: {e}", blk_path.display()); - Error::Internal("Failed to read blk file") - })?; - - let result = scan_bytes( - &mut bytes, - blk_index, - 0, - xor_bytes, - |metadata, block_bytes, xor_state| { - if done.load(Ordering::Relaxed) { - return ControlFlow::Break(()); - } - let Some(canonical_offset) = - peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical) - else { - return ControlFlow::Continue(()); - }; - let scanned = ScannedBlock { - metadata, - bytes: block_bytes.to_vec(), - xor_state, - canonical_offset, - }; - if parser_send.send(scanned).is_err() { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) - } - }, - ); - - if result.interrupted { - return Ok(()); - } - } - Ok(()) -} - -// ───────────────────────────────────────────────────────────────────────────── -// Tail pipeline — reverse-scan the newest blk files in 5 MB chunks until -// every canonical hash has been matched, then emit them forward. -// ───────────────────────────────────────────────────────────────────────────── - -fn pipeline_tail( - paths: &BlkIndexToBlkPath, - xor_bytes: XORBytes, - canonical: &CanonicalRange, - send: &Sender, -) -> Result<()> { - let mut slots: Vec> = (0..canonical.len()).map(|_| None).collect(); - let mut remaining = canonical.len(); - // Carries the bytes before a chunk's first magic into the next - // (earlier) chunk so blocks straddling the boundary survive. 
- let mut spillover: Vec = Vec::new(); - - 'files: for (&blk_index, path) in paths.iter().rev() { - let mut file = File::open(path).map_err(|_| Error::Internal("Failed to open blk file"))?; - let file_len = file.metadata().map(|m| m.len() as usize).unwrap_or(0); - if file_len == 0 { - continue; - } - - let mut read_end = file_len; - spillover.clear(); - - while read_end > 0 && remaining > 0 { - let read_start = read_end.saturating_sub(TAIL_CHUNK); - let chunk_len = read_end - read_start; - read_end = read_start; - - file.seek(SeekFrom::Start(read_start as u64)) - .map_err(|_| Error::Internal("Failed to seek blk file"))?; - let mut buf = vec![0u8; chunk_len + spillover.len()]; - file.read_exact(&mut buf[..chunk_len]) - .map_err(|_| Error::Internal("Failed to read blk chunk"))?; - buf[chunk_len..].copy_from_slice(&spillover); - spillover.clear(); - - let result = scan_bytes( - &mut buf, - blk_index, - read_start, - xor_bytes, - |metadata, block_bytes, xor_state| { - let Some(offset) = - peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical) - else { - return ControlFlow::Continue(()); - }; - if slots[offset as usize].is_some() { - return ControlFlow::Continue(()); - } - let height = Height::from(*canonical.start + offset); - match parse_canonical_body( - block_bytes.to_vec(), - metadata, - xor_state, - xor_bytes, - height, - ) { - Ok(block) => { - slots[offset as usize] = Some(block); - remaining -= 1; - } - Err(e) => warn!("parse_canonical_body failed in tail pipeline: {e}"), - } - if remaining == 0 { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) - } - }, - ); - - if remaining == 0 { - break 'files; - } - if read_start > 0 { - spillover.extend_from_slice(&buf[..result.first_magic.unwrap_or(buf.len())]); - } - } - } - - if remaining > 0 { - return Err(Error::Internal( - "after_canonical tail pipeline: blk files missing canonical blocks", - )); - } - - for block in slots.into_iter().flatten() { - if send.send(block).is_err() { - return 
Ok(()); - } - } - Ok(()) -} diff --git a/crates/brk_reader/src/decode.rs b/crates/brk_reader/src/decode.rs deleted file mode 100644 index 4c887f92e..000000000 --- a/crates/brk_reader/src/decode.rs +++ /dev/null @@ -1,73 +0,0 @@ -use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor}; -use brk_error::Result; -use brk_rpc::Client; -use brk_types::{BlkMetadata, Block, Height, ReadBlock}; - -use crate::{XORBytes, XORIndex}; - -/// Margin for timestamp non-monotonicity -const TIMESTAMP_MARGIN: u32 = 3 * 3600; - -#[allow(clippy::too_many_arguments)] -pub fn decode_block( - mut bytes: Vec, - metadata: BlkMetadata, - client: &Client, - mut xor_i: XORIndex, - xor_bytes: XORBytes, - start: Option, - end: Option, - start_time: u32, - end_time: u32, -) -> Result> { - xor_i.bytes(bytes.as_mut_slice(), xor_bytes); - - let mut cursor = Cursor::new(bytes); - - let header = Header::consensus_decode(&mut cursor)?; - - // Skip blocks clearly outside the target range using header timestamp - if header.time < start_time.saturating_sub(TIMESTAMP_MARGIN) - || (end_time > 0 && header.time > end_time.saturating_add(TIMESTAMP_MARGIN)) - { - return Ok(None); - } - - let hash = header.block_hash(); - - let Ok(block_header_result) = client.get_block_header_info(&hash) else { - return Ok(None); - }; - - let height = Height::from(block_header_result.height); - - if start.is_some_and(|s| s > height) || end.is_some_and(|e| e < height) { - return Ok(None); - } - if block_header_result.confirmations <= 0 { - return Ok(None); - } - - let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize; - - let mut txdata = Vec::with_capacity(tx_count); - let mut tx_metadata = Vec::with_capacity(tx_count); - let mut tx_offsets = Vec::with_capacity(tx_count); - for _ in 0..tx_count { - let offset = cursor.position() as u32; - tx_offsets.push(offset); - let position = metadata.position() + offset; - let tx = Transaction::consensus_decode(&mut cursor)?; - txdata.push(tx); - let 
len = cursor.position() as u32 - offset; - tx_metadata.push(BlkMetadata::new(position, len)); - } - - let block_bytes = cursor.into_inner(); - - let block = bitcoin::Block { header, txdata }; - let mut block = Block::from((height, hash, block)); - block.set_raw_data(block_bytes, tx_offsets); - - Ok(Some(ReadBlock::from((block, metadata, tx_metadata)))) -} diff --git a/crates/brk_reader/src/lib.rs b/crates/brk_reader/src/lib.rs index 5f9ce3979..efb60ffb7 100644 --- a/crates/brk_reader/src/lib.rs +++ b/crates/brk_reader/src/lib.rs @@ -2,13 +2,11 @@ use std::{ collections::BTreeMap, - fs::{self, File}, - io::{Read, Seek, SeekFrom}, - ops::ControlFlow, + fs::File, + io::Read, os::unix::fs::FileExt, path::{Path, PathBuf}, sync::Arc, - thread, }; use bitcoin::{block::Header, consensus::Decodable}; @@ -17,33 +15,34 @@ use brk_error::{Error, Result}; use brk_rpc::Client; use brk_types::{BlkPosition, BlockHash, Height, ReadBlock}; pub use crossbeam::channel::Receiver; -use crossbeam::channel::bounded; use derive_more::Deref; use parking_lot::{RwLock, RwLockReadGuard}; -use rayon::prelude::*; -use tracing::{error, warn}; mod blk_index_to_blk_path; mod canonical; -mod decode; +mod parse; +mod pipeline; mod scan; mod xor_bytes; mod xor_index; pub use canonical::CanonicalRange; -use decode::*; use scan::*; pub use xor_bytes::*; pub use xor_index::*; -const BOUND_CAP: usize = 50; +/// How many blk files to step back from the binary-search hit in +/// [`ReaderInner::find_start_blk_index`]. Guards against blocks that +/// bitcoind wrote to the "current" file slightly out of height order +/// (e.g. the tail of a reorg landing in an earlier file index than +/// its successors). +const START_BLK_INDEX_BACKOFF: usize = 21; +/// Handle to a Bitcoin Core blk-file reader. 
/// -/// Bitcoin BLK file reader -/// -/// Thread safe and free to clone -/// -/// +/// Cheap to clone (`Arc`-backed) and thread-safe: all streaming +/// methods take `&self` and the returned `Receiver` can be +/// drained from any thread. #[derive(Debug, Clone, Deref)] pub struct Reader(Arc); @@ -141,265 +140,57 @@ impl ReaderInner { }) } - /// Returns a receiver streaming `ReadBlock`s from `hash + 1` to the chain tip. - /// If `hash` is `None`, starts from genesis. + // ───────────────────────────────────────────────────────────────── + // Public streaming API — all calls delegate to `pipeline::spawn`. + // ───────────────────────────────────────────────────────────────── + + /// Streams every canonical block strictly after `hash` (or from + /// genesis when `None`) up to the current chain tip, in canonical + /// order. Uses the default parser-thread count; see + /// [`after_with`](Self::after_with) to override. pub fn after(&self, hash: Option) -> Result> { - let start = if let Some(hash) = hash.as_ref() { - let info = self.client.get_block_header_info(hash)?; - Height::from(info.height + 1) - } else { - Height::ZERO - }; - let end = self.client.get_last_height()?; - - if end < start { - return Ok(bounded(0).1); - } - - if *end - *start < 10 { - let mut blocks: Vec<_> = self.read_rev(Some(start), Some(end)).iter().collect(); - blocks.reverse(); - - let (send, recv) = bounded(blocks.len()); - for block in blocks { - let _ = send.send(block); - } - return Ok(recv); - } - - Ok(self.read(Some(start), Some(end))) + self.after_with(hash, pipeline::DEFAULT_PARSER_THREADS) } - /// Returns a crossbeam channel receiver that streams `ReadBlock`s in chain order. - /// - /// Both `start` and `end` are inclusive. `None` means unbounded. 
- pub fn read(&self, start: Option, end: Option) -> Receiver { - if let (Some(s), Some(e)) = (start, end) - && s > e - { - let (_, recv) = bounded(0); - return recv; - } - - let client = self.client.clone(); - - let (send_bytes, recv_bytes) = bounded(BOUND_CAP / 2); - let (send_block, recv_block) = bounded(BOUND_CAP); - let (send_ordered, recv_ordered) = bounded(BOUND_CAP); - - let blk_index_to_blk_path = BlkIndexToBlkPath::scan(&self.blocks_dir); - *self.blk_index_to_blk_path.write() = blk_index_to_blk_path.clone(); - - let xor_bytes = self.xor_bytes; - - let first_blk_index = self - .find_start_blk_index(start, &blk_index_to_blk_path, xor_bytes) - .unwrap_or_default(); - - let get_block_time = |h: Height| -> u32 { - self.client - .get_block_hash(*h as u64) - .ok() - .and_then(|hash| self.client.get_block_header(&hash).ok()) - .map(|h| h.time) - .unwrap_or(0) - }; - - let start_time = start.filter(|h| **h > 0).map(&get_block_time).unwrap_or(0); - let end_time = end.map(&get_block_time).unwrap_or(0); - - thread::spawn(move || { - let _ = blk_index_to_blk_path.range(first_blk_index..).try_for_each( - move |(blk_index, blk_path)| { - let Ok(mut bytes) = fs::read(blk_path) else { - error!("Failed to read blk file: {}", blk_path.display()); - return ControlFlow::Break(()); - }; - let result = scan_bytes( - &mut bytes, - *blk_index, - 0, - xor_bytes, - |metadata, block_bytes, xor_i| { - // Send owned bytes to the rayon parser pool. 
- if send_bytes.send((metadata, block_bytes.to_vec(), xor_i)).is_err() { - return ControlFlow::Break(()); - } - ControlFlow::Continue(()) - }, - ); - if result.interrupted { - return ControlFlow::Break(()); - } - ControlFlow::Continue(()) - }, - ); - }); - - thread::spawn(move || { - // Private pool to prevent collision with the global pool - let parser_pool = rayon::ThreadPoolBuilder::new() - .num_threads(4.min(thread::available_parallelism().unwrap().get() / 2)) - .build() - .expect("Failed to create parser thread pool"); - - parser_pool.install(|| { - let _ = - recv_bytes - .into_iter() - .par_bridge() - .try_for_each(|(metadata, bytes, xor_i)| { - let position = metadata.position(); - match decode_block( - bytes, metadata, &client, xor_i, xor_bytes, start, end, start_time, - end_time, - ) { - Ok(Some(block)) => { - if send_block.send(block).is_err() { - return ControlFlow::Break(()); - } - } - Ok(None) => {} // Block filtered out (outside range, unconfirmed) - Err(e) => { - warn!("Failed to decode block at {position}: {e}"); - } - } - ControlFlow::Continue(()) - }); - }); - }); - - thread::spawn(move || { - let mut current_height = start.unwrap_or_default(); - let mut prev_hash: Option = None; - let mut future_blocks = BTreeMap::default(); - - let _ = recv_block - .iter() - .try_for_each(|block| -> ControlFlow<(), _> { - let mut opt = if current_height == block.height() { - Some(block) - } else { - future_blocks.insert(block.height(), block); - None - }; - - while let Some(block) = opt.take().or_else(|| { - if !future_blocks.is_empty() { - future_blocks.remove(¤t_height) - } else { - None - } - }) { - if let Some(expected_prev) = prev_hash.as_ref() && block.header.prev_blockhash != expected_prev.into() { - error!( - "Chain discontinuity detected at height {}: expected prev_hash {}, got {}. 
Stopping iteration.", - *block.height(), - expected_prev, - block.header.prev_blockhash - ); - return ControlFlow::Break(()); - } - - prev_hash = Some(block.hash().clone()); - - if send_ordered.send(block).is_err() { - return ControlFlow::Break(()); - } - - current_height.increment(); - - if end.is_some_and(|end| current_height > end) { - return ControlFlow::Break(()); - } - } - - ControlFlow::Continue(()) - }); - }); - - recv_ordered + /// Like [`after`](Self::after) but with a configurable number of + /// parser threads. `parser_threads = 1` is the minimal-thread + /// default (1 reader + 1 parser, uncontended mutex). Higher values + /// trade extra cores for throughput on dense ranges where the + /// parser is the bottleneck. + pub fn after_with( + &self, + hash: Option, + parser_threads: usize, + ) -> Result> { + let tip = self.client.get_last_height()?; + let canonical = CanonicalRange::walk(&self.client, hash, tip)?; + pipeline::spawn(self, canonical, parser_threads) } - /// Streams `ReadBlock`s in reverse order (newest first) by scanning - /// `.blk` files from the tail. Efficient for reading recent blocks. - /// Both `start` and `end` are inclusive. `None` means unbounded. 
- pub fn read_rev(&self, start: Option, end: Option) -> Receiver { - const CHUNK: usize = 5 * 1024 * 1024; - - if let (Some(s), Some(e)) = (start, end) - && s > e - { - return bounded(0).1; - } - - let client = self.client.clone(); - let xor_bytes = self.xor_bytes; - let paths = BlkIndexToBlkPath::scan(&self.blocks_dir); - *self.blk_index_to_blk_path.write() = paths.clone(); - let (send, recv) = bounded(BOUND_CAP); - - thread::spawn(move || { - let mut head = Vec::new(); - - for (&blk_index, path) in paths.iter().rev() { - let file_len = fs::metadata(path).map(|m| m.len() as usize).unwrap_or(0); - if file_len == 0 { - continue; - } - let Ok(mut file) = File::open(path) else { - return; - }; - let mut read_end = file_len; - - while read_end > 0 { - let read_start = read_end.saturating_sub(CHUNK); - let chunk_len = read_end - read_start; - read_end = read_start; - - let _ = file.seek(SeekFrom::Start(read_start as u64)); - let mut buf = vec![0u8; chunk_len + head.len()]; - if file.read_exact(&mut buf[..chunk_len]).is_err() { - return; - } - buf[chunk_len..].copy_from_slice(&head); - head.clear(); - - let mut blocks = Vec::new(); - let result = scan_bytes( - &mut buf, - blk_index, - read_start, - xor_bytes, - |metadata, bytes, xor_i| { - // `decode_block` needs owned bytes — it XOR- - // decodes in place before parsing. - if let Ok(Some(block)) = decode_block( - bytes.to_vec(), metadata, &client, xor_i, xor_bytes, start, end, 0, 0, - ) { - blocks.push(block); - } - ControlFlow::Continue(()) - }, - ); - - for block in blocks.into_iter().rev() { - let done = start.is_some_and(|s| block.height() <= s); - if send.send(block).is_err() || done { - return; - } - } - - if read_start > 0 { - head = buf[..result.first_magic.unwrap_or(buf.len())].to_vec(); - } - } - } - }); - - recv + /// Streams every canonical block in the inclusive height range + /// `start..=end` in canonical order, via the same pipeline as + /// [`after`](Self::after). 
+ pub fn range(&self, start: Height, end: Height) -> Result> { + self.range_with(start, end, pipeline::DEFAULT_PARSER_THREADS) } + /// Like [`range`](Self::range) but with a configurable number of + /// parser threads. See [`after_with`](Self::after_with) for the + /// parser-count tradeoff. + pub fn range_with( + &self, + start: Height, + end: Height, + parser_threads: usize, + ) -> Result> { + let canonical = CanonicalRange::between(&self.client, start, end)?; + pipeline::spawn(self, canonical, parser_threads) + } + + /// Binary-searches `blk_index_to_blk_path` for the first file + /// whose earliest block height is ≤ `target_start`, then backs + /// off a few files as a safety margin for blocks that were written + /// out of height order (see [`START_BLK_INDEX_BACKOFF`]). fn find_start_blk_index( &self, target_start: Option, @@ -411,7 +202,6 @@ impl ReaderInner { }; let blk_indices: Vec = blk_index_to_blk_path.keys().copied().collect(); - if blk_indices.is_empty() { return Ok(0); } @@ -424,37 +214,33 @@ impl ReaderInner { let mid = (left + right) / 2; let blk_index = blk_indices[mid]; - if let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) { - match self.get_first_block_height(blk_path, xor_bytes) { - Ok(height) => { - if height <= target_start { - best_start_idx = mid; - left = mid + 1; - } else { - if mid == 0 { - break; - } - right = mid - 1; - } - } - Err(_) => { - left = mid + 1; - } - } - } else { + let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) else { break; + }; + match self.first_block_height(blk_path, xor_bytes) { + Ok(height) if height <= target_start => { + best_start_idx = mid; + left = mid + 1; + } + Ok(_) => { + if mid == 0 { + break; + } + right = mid - 1; + } + Err(_) => { + left = mid + 1; + } } } - // buffer for worst-case scenarios when a block as far behind - let final_idx = best_start_idx.saturating_sub(21); - + let final_idx = best_start_idx.saturating_sub(START_BLK_INDEX_BACKOFF); 
Ok(blk_indices.get(final_idx).copied().unwrap_or(0)) } - pub fn get_first_block_height( + pub fn first_block_height( &self, - blk_path: &PathBuf, + blk_path: &Path, xor_bytes: XORBytes, ) -> Result { let mut file = File::open(blk_path)?; diff --git a/crates/brk_reader/src/parse.rs b/crates/brk_reader/src/parse.rs new file mode 100644 index 000000000..332b64191 --- /dev/null +++ b/crates/brk_reader/src/parse.rs @@ -0,0 +1,73 @@ +//! Pure block parsing — XOR decoding, header and body decode. +//! +//! Split into a cheap header peek and a full body parse so the scan +//! loop can reject non-canonical blocks without copying them. No RPC, +//! no threading, no state. + +use std::io::Cursor; + +use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable}; +use brk_error::{Error, Result}; +use brk_types::{BlkMetadata, Block, BlockHash, Height, ReadBlock}; + +use crate::{XORBytes, XORIndex, canonical::CanonicalRange}; + +const HEADER_LEN: usize = 80; + +/// Returns the canonical offset of `bytes` if its header hashes to a +/// known canonical block, otherwise `None`. Does not allocate and does +/// not mutate `bytes`: the header is copied onto a stack buffer and +/// XOR-decoded there so an orphan short-circuits cleanly and a +/// canonical hit can still be cloned out intact. +pub fn peek_canonical_offset( + bytes: &[u8], + mut xor_state: XORIndex, + xor_bytes: XORBytes, + canonical: &CanonicalRange, +) -> Option { + if bytes.len() < HEADER_LEN { + return None; + } + let mut header_buf = [0u8; HEADER_LEN]; + header_buf.copy_from_slice(&bytes[..HEADER_LEN]); + xor_state.bytes(&mut header_buf, xor_bytes); + let header = Header::consensus_decode(&mut &header_buf[..]).ok()?; + canonical.offset_of(&BlockHash::from(header.block_hash())) +} + +/// Full XOR-decode + parse for a block that has already been confirmed +/// canonical by `peek_canonical_offset`. Takes owned `bytes` so it can +/// mutate them in place and hand them to the resulting `ReadBlock`. 
+pub fn parse_canonical_body( + mut bytes: Vec, + metadata: BlkMetadata, + mut xor_state: XORIndex, + xor_bytes: XORBytes, + height: Height, +) -> Result { + if bytes.len() < HEADER_LEN { + return Err(Error::Internal("Block bytes shorter than header")); + } + + xor_state.bytes(&mut bytes, xor_bytes); + let mut cursor = Cursor::new(bytes); + let header = Header::consensus_decode(&mut cursor)?; + let bitcoin_hash = header.block_hash(); + let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize; + let mut txdata = Vec::with_capacity(tx_count); + let mut tx_metadata = Vec::with_capacity(tx_count); + let mut tx_offsets = Vec::with_capacity(tx_count); + for _ in 0..tx_count { + let tx_start = cursor.position() as u32; + tx_offsets.push(tx_start); + let tx = Transaction::consensus_decode(&mut cursor)?; + let tx_len = cursor.position() as u32 - tx_start; + txdata.push(tx); + tx_metadata.push(BlkMetadata::new(metadata.position() + tx_start, tx_len)); + } + + let raw_bytes = cursor.into_inner(); + let mut block = Block::from((height, bitcoin_hash, bitcoin::Block { header, txdata })); + block.set_raw_data(raw_bytes, tx_offsets); + Ok(ReadBlock::from((block, metadata, tx_metadata))) +} diff --git a/crates/brk_reader/src/pipeline.rs b/crates/brk_reader/src/pipeline.rs new file mode 100644 index 000000000..02554bf7b --- /dev/null +++ b/crates/brk_reader/src/pipeline.rs @@ -0,0 +1,470 @@ +//! The actual pipeline turning a blk-file scan into an ordered +//! `ReadBlock` stream. [`spawn`] picks between two strategies: +//! +//! * **[`pipeline_forward`]** — one reader thread walks blk files in +//! order, peeks each block's header against the pre-fetched +//! `CanonicalRange`, and ships canonical hits over an mpmc channel +//! to a scoped parser pool of `parser_threads` workers, which decode +//! bodies in parallel and serialise emission through a shared +//! [`ReorderState`] mutex. Used when the range is larger than +//! `TAIL_THRESHOLD`. +//! 
* **[`pipeline_tail`]** — single-threaded reverse scan of the +//! newest blk files in 5 MB chunks, buffering every canonical match +//! in offset-indexed slots and then emitting through [`ReorderState`] +//! in the same order. Used for `canonical.len() <= TAIL_THRESHOLD`, +//! where the channel + lock overhead of the forward pipeline would +//! dominate. +//! +//! Both pipelines route emission through [`ReorderState`], which +//! verifies `block.header.prev_blockhash` against the previously +//! emitted block's hash and aborts cleanly if the canonical-hash batch +//! that produced the stream was stitched across a mid-batch reorg. +//! +//! Canonical blocks can also arrive out of order across blk files +//! (bitcoind doesn't write in strict chain order during initial sync, +//! headers-first body fetch, or reindex), so the reorder buffer is +//! required even at `parser_threads = 1`. + +use std::{ + fs::{self, File}, + io::{Read, Seek, SeekFrom}, + ops::ControlFlow, + sync::atomic::{AtomicBool, Ordering}, + thread, +}; + +use brk_error::{Error, Result}; +use brk_types::{BlkMetadata, BlockHash, Height, ReadBlock}; +use crossbeam::channel::{Receiver, Sender, bounded}; +use parking_lot::Mutex; +use rustc_hash::FxHashMap; +use tracing::{error, warn}; + +use crate::{ + BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex, + canonical::CanonicalRange, + parse::{parse_canonical_body, peek_canonical_offset}, + scan::scan_bytes, +}; + +const CHANNEL_CAPACITY: usize = 50; +const TAIL_CHUNK: usize = 5 * 1024 * 1024; +/// Up to this many canonical blocks → tail pipeline. Beyond → forward. +const TAIL_THRESHOLD: usize = 10; + +/// Default parser-thread count for [`ReaderInner::after`]. The indexer +/// is CPU-bound on the consumer side, so 1 parser + 1 reader (= 2 +/// threads total) leaves the rest of the cores for the indexer. Bench +/// tools that drain the channel cheaply can override via +/// [`ReaderInner::after_with`]. 
+pub(crate) const DEFAULT_PARSER_THREADS: usize = 1; + +// ───────────────────────────────────────────────────────────────────────────── +// Shared pipeline entry — called by `Reader::after_with` and `Reader::range_with`. +// ───────────────────────────────────────────────────────────────────────────── + +/// Spawns the reader worker and (for non-tail ranges) a scoped parser +/// pool, and returns the consumer receiver. Shared backend for +/// `after_with` and `range_with`. +pub(crate) fn spawn( + reader: &ReaderInner, + canonical: CanonicalRange, + parser_threads: usize, +) -> Result> { + let parser_threads = parser_threads.max(1); + + if canonical.is_empty() { + return Ok(bounded(0).1); + } + + let paths = BlkIndexToBlkPath::scan(reader.blocks_dir()); + *reader.blk_index_to_blk_path.write() = paths.clone(); + + let (send, recv) = bounded(CHANNEL_CAPACITY); + let xor_bytes = reader.xor_bytes(); + let use_tail = canonical.len() <= TAIL_THRESHOLD; + let first_blk_index = if use_tail { + 0 + } else { + reader + .find_start_blk_index(Some(canonical.start), &paths, xor_bytes) + .unwrap_or_default() + }; + + thread::spawn(move || { + let result = if use_tail { + pipeline_tail(&paths, xor_bytes, &canonical, &send) + } else { + pipeline_forward( + &paths, + first_blk_index, + xor_bytes, + &canonical, + &send, + parser_threads, + ) + }; + if let Err(e) = result { + error!("Reader canonical pipeline failed: {e}"); + } + }); + + Ok(recv) +} + +// ───────────────────────────────────────────────────────────────────────────── +// Forward pipeline — 1 reader + N parsers + shared in-order emission. +// ───────────────────────────────────────────────────────────────────────────── + +/// A raw block the reader has already confirmed is on the canonical +/// chain, shipped to the parser pool for full decoding. 
+struct ScannedBlock { + metadata: BlkMetadata, + bytes: Vec, + xor_state: XORIndex, + canonical_offset: u32, +} + +/// In-order emission buffer shared between the parser threads. Access +/// is serialised through a `parking_lot::Mutex`; at `parser_threads = 1` +/// the lock is always uncontended. +/// +/// Also enforces **chain continuity**: before emitting each block it +/// checks that `block.header.prev_blockhash` matches the previously- +/// emitted block's hash. A mismatch means the canonical-hash batch +/// that produced this stream was stitched across a mid-batch reorg, +/// so we stop emitting cleanly and let the caller retry. +struct ReorderState { + next_offset: u32, + /// Ahead-of-line matches keyed by canonical offset; drained + /// contiguously each time `next_offset` advances. Bounded in + /// practice by parser-thread scheduling jitter. + pending: FxHashMap, + send_to_consumer: Sender, + /// Hash of the last block successfully emitted, used to verify + /// continuity with the next one. `None` before the first emit. + last_emitted_hash: Option, + /// Flipped when a continuity check fails. + chain_broken: bool, +} + +impl ReorderState { + fn new(send_to_consumer: Sender) -> Self { + Self { + next_offset: 0, + pending: FxHashMap::default(), + send_to_consumer, + last_emitted_hash: None, + chain_broken: false, + } + } + + /// Accepts a parsed canonical block; emits it and drains any + /// contiguous pending matches. Returns `false` once the pipeline + /// should stop — either the consumer dropped the receiver or a + /// chain-continuity check failed. Completion (all blocks emitted) + /// is checked by the caller via `next_offset`. 
+ fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool { + use std::cmp::Ordering::*; + match offset.cmp(&self.next_offset) { + Equal => { + if !self.send_in_order(block) { + return false; + } + while let Some(next) = self.pending.remove(&self.next_offset) { + if !self.send_in_order(next) { + return false; + } + } + true + } + Greater => { + self.pending.insert(offset, block); + true + } + // Unreachable in practice: each canonical hash appears at + // exactly one offset and each block is parsed once. + Less => true, + } + } + + /// Verifies `block.prev_blockhash` against the last emitted hash, + /// sends the block, and bumps `next_offset`. Returns `false` on + /// continuity failure or consumer drop. + fn send_in_order(&mut self, block: ReadBlock) -> bool { + if let Some(last) = &self.last_emitted_hash { + let prev = BlockHash::from(block.header.prev_blockhash); + if prev != *last { + warn!( + "canonical chain broken at offset {}: expected prev={} got {}", + self.next_offset, last, prev, + ); + self.chain_broken = true; + return false; + } + } + let hash = block.hash().clone(); + if self.send_to_consumer.send(block).is_err() { + return false; + } + self.last_emitted_hash = Some(hash); + self.next_offset += 1; + true + } +} + +fn pipeline_forward( + paths: &BlkIndexToBlkPath, + first_blk_index: u16, + xor_bytes: XORBytes, + canonical: &CanonicalRange, + send: &Sender, + parser_threads: usize, +) -> Result<()> { + let (parser_send, parser_recv) = bounded::(CHANNEL_CAPACITY); + let reorder = Mutex::new(ReorderState::new(send.clone())); + let done = AtomicBool::new(false); + + thread::scope(|scope| -> Result<()> { + for _ in 0..parser_threads { + let parser_recv = parser_recv.clone(); + scope.spawn(|| parser_loop(parser_recv, &reorder, &done, canonical, xor_bytes)); + } + // Every parser owns its own clone; ours would otherwise keep + // the channel "alive" and leak a dangling receiver. 
+ drop(parser_recv); + + let read_result = read_and_dispatch( + paths, + first_blk_index, + xor_bytes, + canonical, + &parser_send, + &done, + ); + // Signal end-of-input to the parsers so they exit their `for` + // loops and the scope can join them. + drop(parser_send); + read_result + })?; + + let state = reorder.lock(); + if state.chain_broken { + return Err(Error::Internal( + "forward pipeline: canonical batch stitched across a reorg", + )); + } + let pipeline_cancelled = done.load(Ordering::Relaxed); + let emitted = state.next_offset as usize; + if !pipeline_cancelled && emitted < canonical.len() { + return Err(Error::Internal( + "forward pipeline: blk files missing canonical blocks", + )); + } + Ok(()) +} + +/// Full-body parse + in-order emit loop run by every scoped parser +/// worker in `pipeline_forward`. Drains `parser_recv` to exhaustion. +fn parser_loop( + parser_recv: Receiver, + reorder: &Mutex, + done: &AtomicBool, + canonical: &CanonicalRange, + xor_bytes: XORBytes, +) { + for ScannedBlock { metadata, bytes, xor_state, canonical_offset } in parser_recv { + if done.load(Ordering::Relaxed) { + continue; + } + let height = Height::from(*canonical.start + canonical_offset); + let block = match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height) { + Ok(block) => block, + Err(e) => { + warn!("parse_canonical_body failed: {e}"); + continue; + } + }; + let pipeline_finished = { + let mut state = reorder.lock(); + !state.try_emit(canonical_offset, block) + || state.next_offset as usize >= canonical.len() + }; + if pipeline_finished { + done.store(true, Ordering::Relaxed); + } + } +} + +/// Walk blk files from `first_blk_index`, scan each one, and ship +/// canonical blocks to the parser pool. Non-canonical blocks are +/// rejected via `peek_canonical_offset` *before* being cloned — the +/// cheap filter is what lets a sparse catchup avoid allocating for the +/// ~99% of blocks outside the window. 
+fn read_and_dispatch(
+    paths: &BlkIndexToBlkPath,
+    first_blk_index: u16,
+    xor_bytes: XORBytes,
+    canonical: &CanonicalRange,
+    parser_send: &Sender<ScannedBlock>,
+    done: &AtomicBool,
+) -> Result<()> {
+    for (&blk_index, blk_path) in paths.range(first_blk_index..) {
+        // Checked once per file so a finished or cancelled pipeline
+        // stops before paying for another whole-file read.
+        if done.load(Ordering::Relaxed) {
+            return Ok(());
+        }
+
+        let mut bytes = fs::read(blk_path).map_err(|e| {
+            error!("Failed to read blk file {}: {e}", blk_path.display());
+            Error::Internal("Failed to read blk file")
+        })?;
+
+        let result = scan_bytes(
+            &mut bytes,
+            blk_index,
+            0,
+            xor_bytes,
+            |metadata, block_bytes, xor_state| {
+                // Checked again per block so the scan can stop mid-file.
+                if done.load(Ordering::Relaxed) {
+                    return ControlFlow::Break(());
+                }
+                let Some(canonical_offset) =
+                    peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
+                else {
+                    return ControlFlow::Continue(());
+                };
+                // Only blocks that passed the canonical filter are copied.
+                let scanned = ScannedBlock {
+                    metadata,
+                    bytes: block_bytes.to_vec(),
+                    xor_state,
+                    canonical_offset,
+                };
+                // A failed send stops the scan instead of raising an error.
+                if parser_send.send(scanned).is_err() {
+                    ControlFlow::Break(())
+                } else {
+                    ControlFlow::Continue(())
+                }
+            },
+        );
+
+        // An interrupted scan ends the walk and is reported as success.
+        if result.interrupted {
+            return Ok(());
+        }
+    }
+    Ok(())
+}
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Tail pipeline — reverse-scan the newest blk files in 5 MB chunks until
+// every canonical hash has been matched, then emit them forward.
+// ─────────────────────────────────────────────────────────────────────────────
+
+/// Single-threaded tail-range pipeline for small `canonical.len()`.
+/// Walks blk files in reverse-index order, reads each one in 5 MB
+/// chunks from tail to head, and stuffs every canonical match into an
+/// offset-indexed `slots` vec. Once every canonical block is matched,
+/// emits them in order through [`ReorderState`] (which doubles as the
+/// shared continuity checker). Bails on missing blocks or a chain
+/// break just like [`pipeline_forward`].
+fn pipeline_tail( + paths: &BlkIndexToBlkPath, + xor_bytes: XORBytes, + canonical: &CanonicalRange, + send: &Sender, +) -> Result<()> { + let mut slots: Vec> = (0..canonical.len()).map(|_| None).collect(); + let mut remaining = canonical.len(); + // Carries the bytes before a chunk's first magic into the next + // (earlier) chunk so blocks straddling the boundary survive. + let mut spillover: Vec = Vec::new(); + + 'files: for (&blk_index, path) in paths.iter().rev() { + let mut file = File::open(path).map_err(|_| Error::Internal("Failed to open blk file"))?; + let file_len = file.metadata().map(|m| m.len() as usize).unwrap_or(0); + if file_len == 0 { + continue; + } + + let mut read_end = file_len; + spillover.clear(); + + while read_end > 0 && remaining > 0 { + let read_start = read_end.saturating_sub(TAIL_CHUNK); + let chunk_len = read_end - read_start; + read_end = read_start; + + file.seek(SeekFrom::Start(read_start as u64)) + .map_err(|_| Error::Internal("Failed to seek blk file"))?; + let mut buf = vec![0u8; chunk_len + spillover.len()]; + file.read_exact(&mut buf[..chunk_len]) + .map_err(|_| Error::Internal("Failed to read blk chunk"))?; + buf[chunk_len..].copy_from_slice(&spillover); + spillover.clear(); + + let result = scan_bytes( + &mut buf, + blk_index, + read_start, + xor_bytes, + |metadata, block_bytes, xor_state| { + let Some(offset) = + peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical) + else { + return ControlFlow::Continue(()); + }; + if slots[offset as usize].is_some() { + return ControlFlow::Continue(()); + } + let height = Height::from(*canonical.start + offset); + match parse_canonical_body( + block_bytes.to_vec(), + metadata, + xor_state, + xor_bytes, + height, + ) { + Ok(block) => { + slots[offset as usize] = Some(block); + remaining -= 1; + } + Err(e) => warn!("parse_canonical_body failed in tail pipeline: {e}"), + } + if remaining == 0 { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + }, + ); + + if 
remaining == 0 { + break 'files; + } + if read_start > 0 { + spillover.extend_from_slice(&buf[..result.first_magic.unwrap_or(buf.len())]); + } + } + } + + if remaining > 0 { + return Err(Error::Internal( + "tail pipeline: blk files missing canonical blocks", + )); + } + + // Emit in canonical order via the same `ReorderState` the forward + // pipeline uses, which verifies `prev_blockhash` continuity between + // adjacent blocks as a side effect of `try_emit`. + let mut reorder = ReorderState::new(send.clone()); + for (offset, block) in slots.into_iter().flatten().enumerate() { + if !reorder.try_emit(offset as u32, block) { + break; + } + } + if reorder.chain_broken { + return Err(Error::Internal( + "tail pipeline: canonical batch stitched across a reorg", + )); + } + Ok(()) +} diff --git a/crates/brk_rpc/src/backend/corepc.rs b/crates/brk_rpc/src/backend/corepc.rs index 1a9b040e2..399dedf65 100644 --- a/crates/brk_rpc/src/backend/corepc.rs +++ b/crates/brk_rpc/src/backend/corepc.rs @@ -184,8 +184,6 @@ impl ClientInner { /// range `start..=end`. Internally splits into JSON-RPC batches of /// `BATCH_CHUNK` requests so a 1M-block reindex doesn't try to push /// a 50 MB request body or hold every response in memory at once. - /// Each chunk is one HTTP round-trip — still drops the per-call - /// overhead that dominates a sequential `get_block_hash` loop. /// /// Returns hashes in canonical order (`start`, `start+1`, …, `end`). pub fn get_block_hashes_range( @@ -214,9 +212,6 @@ impl ClientInner { end: u64, out: &mut Vec, ) -> Result<()> { - // Build raw param strings up front so each `Request` can borrow - // them; `corepc_jsonrpc::Client::build_request` takes a borrowed - // `&RawValue`. 
let params: Vec> = (start..=end) .map(|h| { RawValue::from_string(format!("[{h}]")).map_err(|e| Error::Parse(e.to_string())) diff --git a/website/scripts/options/network.js b/website/scripts/options/network.js index d8a12647d..95634db66 100644 --- a/website/scripts/options/network.js +++ b/website/scripts/options/network.js @@ -32,15 +32,8 @@ import { * @returns {PartialOptionsGroup} */ export function createNetworkSection() { - const { - blocks, - transactions, - inputs, - outputs, - supply, - addrs, - cohorts, - } = brk.series; + const { blocks, transactions, inputs, outputs, supply, addrs, cohorts } = + brk.series; const st = colors.scriptType; @@ -213,7 +206,7 @@ export function createNetworkSection() { ], }, { - name: "Count", + name: "Uses", tree: chartsFromCount({ pattern: addrs.reused.uses.reusedAddrUseCount[key], title, @@ -236,7 +229,7 @@ export function createNetworkSection() { tree: [ { name: "Compare", - title: title("Exposed Address Count"), + title: title("Quantum Exposed Address Count"), bottom: [ line({ series: addrs.exposed.count.funded[key], @@ -253,7 +246,7 @@ export function createNetworkSection() { }, { name: "Funded", - title: title("Funded Exposed Addresses"), + title: title("Funded Quantum Exposed Address Count"), bottom: [ line({ series: addrs.exposed.count.funded[key], @@ -264,7 +257,7 @@ export function createNetworkSection() { }, { name: "Total", - title: title("Total Exposed Addresses"), + title: title("Total Quantum Exposed Address Count"), bottom: [ line({ series: addrs.exposed.count.total[key], @@ -276,7 +269,7 @@ export function createNetworkSection() { }, { name: "Supply", - title: title("Supply in Exposed Addresses"), + title: title("Supply in Quantum Exposed Addresses"), bottom: satsBtcUsd({ pattern: addrs.exposed.supply[key], name: "Supply", @@ -331,7 +324,6 @@ export function createNetworkSection() { ]; }; - /** * Build a "By Type" subtree: Compare (count / tx count / tx %) plus a * per-type drill-down with the same three 
metrics. @@ -352,7 +344,7 @@ export function createNetworkSection() { name: "Compare", tree: [ { - name: `${label} Count`, + name: "Count", tree: [ ...ROLLING_WINDOWS.map((w) => ({ name: w.name, @@ -414,7 +406,7 @@ export function createNetworkSection() { ], }, { - name: "TX %", + name: "TX Share", tree: [ ...ROLLING_WINDOWS.map((w) => ({ name: w.name, @@ -450,7 +442,7 @@ export function createNetworkSection() { name: t.name, tree: [ { - name: `${label} Count`, + name: "Count", tree: chartsFromCount({ pattern: count[t.key], metric: `${t.name} ${label} Count`, @@ -468,7 +460,7 @@ export function createNetworkSection() { }), }, { - name: "TX %", + name: "TX Share", tree: chartsFromPercentCumulative({ pattern: txPercent[t.key], metric: `Share of Transactions with ${t.name} ${lowerLabel}`, @@ -511,101 +503,26 @@ export function createNetworkSection() { name: "Supply", }), }, - { name: "Unspendable", - title: "Unspendable Supply", - bottom: satsBtcUsdFrom({ - source: supply.burned, - key: "cumulative", - name: "All Time", - }), - }, - { - name: "OP_RETURN", - title: "OP_RETURN Burned", - bottom: satsBtcUsd({ - pattern: outputs.value.opReturn.cumulative, - name: "All Time", - }), - }, - ], - }, - - // Transactions - { - name: "Transactions", - tree: [ - { - name: "Count", - tree: chartsFromFullPerBlock({ - pattern: transactions.count.total, - metric: "Transaction Count", - unit: Unit.count, - }), - }, - { - name: "Volume", - tree: satsBtcUsdFullTree({ - pattern: transactions.volume.transferVolume, - metric: "Transaction Volume", - }), - }, - { - name: "Effective Fee Rate", - tree: chartsFromBlockAnd6b({ - pattern: transactions.fees.effectiveFeeRate, - metric: "Effective Transaction Fee Rate", - unit: Unit.feeRate, - }), - }, - { - name: "Fee", - tree: chartsFromBlockAnd6b({ - pattern: transactions.fees.fee, - metric: "Transaction Fee", - unit: Unit.sats, - }), - }, - { - name: "Weight", - tree: chartsFromBlockAnd6b({ - pattern: transactions.size.weight, - metric: 
"Transaction Weight", - unit: Unit.wu, - }), - }, - { - name: "vSize", - tree: chartsFromBlockAnd6b({ - pattern: transactions.size.vsize, - metric: "Transaction vSize", - unit: Unit.vb, - }), - }, - { - name: "Versions", - tree: chartsFromCountEntries({ - entries: entries(transactions.versions), - metric: "Transaction Versions", - unit: Unit.count, - }), - }, - { - name: "Velocity", - title: "Transaction Velocity", - bottom: [ - line({ - series: supply.velocity.native, - name: "BTC", - unit: Unit.ratio, - }), - line({ - series: supply.velocity.fiat, - name: "USD", - color: colors.usd, - unit: Unit.ratio, - }), + tree: [ + { + name: "Total", + title: "Unspendable Supply", + bottom: satsBtcUsdFrom({ + source: supply.burned, + key: "cumulative", + name: "All Time", + }), + }, + { + name: "OP_RETURN", + title: "OP_RETURN Burned", + bottom: satsBtcUsd({ + pattern: outputs.value.opReturn.cumulative, + name: "All Time", + }), + }, ], }, ], @@ -696,6 +613,93 @@ export function createNetworkSection() { ], }, + // Transactions + { + name: "Transactions", + tree: [ + { + name: "Count", + tree: chartsFromFullPerBlock({ + pattern: transactions.count.total, + metric: "Transaction Count", + unit: Unit.count, + }), + }, + { + name: "Per Second", + tree: averagesArray({ + windows: transactions.volume.txPerSec, + metric: "Transactions per Second", + unit: Unit.perSec, + }), + }, + { + name: "Volume", + tree: satsBtcUsdFullTree({ + pattern: transactions.volume.transferVolume, + metric: "Transaction Volume", + }), + }, + { + name: "Effective Fee Rate", + tree: chartsFromBlockAnd6b({ + pattern: transactions.fees.effectiveFeeRate, + metric: "Effective Transaction Fee Rate", + unit: Unit.feeRate, + }), + }, + { + name: "Fee", + tree: chartsFromBlockAnd6b({ + pattern: transactions.fees.fee, + metric: "Transaction Fee", + unit: Unit.sats, + }), + }, + { + name: "Weight", + tree: chartsFromBlockAnd6b({ + pattern: transactions.size.weight, + metric: "Transaction Weight", + unit: Unit.wu, + 
}), + }, + { + name: "vSize", + tree: chartsFromBlockAnd6b({ + pattern: transactions.size.vsize, + metric: "Transaction vSize", + unit: Unit.vb, + }), + }, + { + name: "Versions", + tree: chartsFromCountEntries({ + entries: entries(transactions.versions), + metric: "Transaction Versions", + unit: Unit.count, + }), + }, + { + name: "Velocity", + title: "Transaction Velocity", + bottom: [ + line({ + series: supply.velocity.native, + name: "BTC", + unit: Unit.ratio, + }), + line({ + series: supply.velocity.fiat, + name: "USD", + color: colors.usd, + unit: Unit.ratio, + }), + ], + }, + ], + }, + // UTXOs { name: "UTXOs", @@ -741,37 +745,6 @@ export function createNetworkSection() { }, ], }, - { - name: "Inputs", - tree: [ - { - name: "Count", - tree: chartsFromAggregatedPerBlock({ - pattern: inputs.count, - metric: "Input Count", - unit: Unit.count, - }), - }, - { - name: "Per Second", - tree: averagesArray({ - windows: inputs.perSec, - metric: "Inputs per Second", - unit: Unit.perSec, - }), - }, - { - name: "By Type", - tree: createByTypeTree({ - label: "Prev-Out", - count: inputs.byType.inputCount, - txCount: inputs.byType.txCount, - txPercent: inputs.byType.txPercent, - types: inputTypes, - }), - }, - ], - }, { name: "Outputs", tree: [ @@ -803,6 +776,37 @@ export function createNetworkSection() { }, ], }, + { + name: "Inputs", + tree: [ + { + name: "Count", + tree: chartsFromAggregatedPerBlock({ + pattern: inputs.count, + metric: "Input Count", + unit: Unit.count, + }), + }, + { + name: "Per Second", + tree: averagesArray({ + windows: inputs.perSec, + metric: "Inputs per Second", + unit: Unit.perSec, + }), + }, + { + name: "By Type", + tree: createByTypeTree({ + label: "Prev-Out", + count: inputs.byType.inputCount, + txCount: inputs.byType.txCount, + txPercent: inputs.byType.txPercent, + types: inputTypes, + }), + }, + ], + }, { name: "Throughput", tree: ROLLING_WINDOWS.map((w) => ({ @@ -863,7 +867,6 @@ export function createNetworkSection() { }, ], }, - ], }; }