reader: snap

This commit is contained in:
nym21
2026-04-14 01:37:04 +02:00
parent 283baca848
commit 4cd8d9eb56
17 changed files with 927 additions and 1144 deletions

View File

@@ -88,12 +88,12 @@ impl Blocks {
if count <= 10 {
State::new_rpc(client.clone(), start, end, hash_opt)
} else {
State::new_reader(reader.clone(), start, end, hash_opt)
State::new_reader(reader.clone(), start, end, hash_opt)?
}
}
Source::Rpc { client } => State::new_rpc(client.clone(), start, end, hash_opt),
Source::Reader { reader, .. } => {
State::new_reader(reader.clone(), start, end, hash_opt)
State::new_reader(reader.clone(), start, end, hash_opt)?
}
};

View File

@@ -1,5 +1,6 @@
use std::vec;
use brk_error::Result;
use brk_reader::{Reader, Receiver};
use brk_rpc::Client;
use brk_types::{BlockHash, Height, ReadBlock};
@@ -40,10 +41,10 @@ impl State {
start: Height,
end: Height,
after_hash: Option<BlockHash>,
) -> Self {
State::Reader {
receiver: reader.read(Some(start), Some(end)),
) -> Result<Self> {
Ok(State::Reader {
receiver: reader.range(start, end)?,
after_hash,
}
})
}
}

View File

@@ -19,6 +19,5 @@ crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
derive_more = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
rlimit = "0.11.0"
rustc-hash = { workspace = true }

View File

@@ -1,12 +1,14 @@
# brk_reader
Streams Bitcoin blocks from Bitcoin Core's raw `blk*.dat` files in chain order.
Streams Bitcoin blocks from Bitcoin Core's raw `blk*.dat` files in
canonical chain order, skipping orphans.
## Requirements
A running Bitcoin Core node with RPC access. The reader needs:
- The `blocks/` directory (for `blk*.dat` files)
- RPC connection (to resolve block heights and filter orphan blocks)
- The `blocks/` directory (to read `blk*.dat` files)
- RPC connection (to resolve the canonical chain up front)
## Quick Start
@@ -18,42 +20,62 @@ let client = Client::new(
)?;
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
// Stream the entire chain
for block in reader.read(None, None) {
// Everything from genesis to the current tip
for block in reader.after(None)?.iter() {
println!("{}: {}", block.height(), block.hash());
}
// Or a specific range (inclusive)
for block in reader.read(Some(Height::new(800_000)), Some(Height::new(850_000))) {
// Everything strictly after a known hash (typical sync / catchup pattern)
for block in reader.after(Some(last_known_hash))?.iter() {
// ...
}
// A specific inclusive height range
for block in reader.range(Height::new(800_000), Height::new(850_000))?.iter() {
// ...
}
```
`Reader` is thread-safe and cheap to clone (Arc-backed).
## What You Get
Each `ReadBlock` gives you access to:
| Field | Description |
|-------|-------------|
| `block.height()` | Block height |
| `block.hash()` | Block hash |
| `block.header` | Block header (timestamp, nonce, difficulty, ...) |
| `block.txdata` | All transactions |
| `block.coinbase_tag()` | Miner's coinbase tag |
| `block.metadata()` | Position in the blk file |
| `block.tx_metadata()` | Per-transaction blk file positions |
`Reader` is thread-safe and cheap to clone (Arc-backed).
| Field | Description |
| ---------------------- | ---------------------------------------- |
| `block.height()` | Block height |
| `block.hash()` | Block hash |
| `block.header` | Block header (timestamp, nonce, ...) |
| `block.txdata` | All transactions |
| `block.coinbase_tag()` | Miner's coinbase tag |
| `block.metadata()` | Position in the blk file |
| `block.tx_metadata()` | Per-transaction blk file positions |
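The `coinbase_tag()` accessor surfaces whatever printable text the miner embedded in the coinbase input script. As a rough, self-contained sketch of the idea only (not brk_reader's actual implementation), extracting such a tag amounts to mapping script bytes to printable ASCII:

```rust
/// Illustrative sketch: render coinbase script bytes as a printable
/// tag, replacing non-printable bytes with '.'. Miners embed strings
/// like "/Foundry USA Pool/" in the coinbase script this way.
fn coinbase_tag(script: &[u8]) -> String {
    script
        .iter()
        .map(|&b| if (0x20..0x7f).contains(&b) { b as char } else { '.' })
        .collect()
}
```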
## How It Works
Three-thread pipeline connected by bounded channels:
Two-stage pipeline: one reader thread plus `N` parser threads
(default `N = 1`, configurable via `after_with` / `range_with`):
```text
blk*.dat ──► File Reader ──► Parser Pool ──► Orderer ──► Receiver<ReadBlock>
             1 thread        up to 4         1 thread

canonical chain ──► Reader thread ──► Parser pool ──► Receiver<ReadBlock>
(pre-fetched        walks blk files,  N workers       in canonical order
 hashes via RPC)    peeks headers,    decode bodies
                    ships hits
```
1. **File reader** binary-searches to the starting blk file, scans for magic bytes, segments raw blocks
2. **Parser pool** XOR-decodes and deserializes blocks in parallel, skips out-of-range blocks via header timestamp, filters orphans via RPC
3. **Orderer** buffers out-of-order arrivals, validates `prev_blockhash` continuity, emits blocks sequentially
1. **`CanonicalRange`** asks bitcoind once, up front, for the canonical
block hash at every height in the target window — one batched
JSON-RPC call, no per-block RPC chatter.
2. **Reader thread** walks blk files in order, scans each for block
magic, and for every block found hashes its 80-byte header and
looks the hash up in the canonical map. Orphans short-circuit
before the block bytes are cloned.
3. **Parser pool** (scoped threads) fully decodes canonical bodies in
parallel and serialises output through an in-order reorder buffer.
The consumer always receives blocks in canonical-height order.
Orphans can never be mistaken for canonical blocks, and a missing
canonical block produces a hard error instead of a silent drop. See
`src/pipeline.rs` for the orchestration and `src/canonical.rs` for the
filter map.
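The in-order emission step can be pictured with a minimal, self-contained sketch (the real buffer in `src/pipeline.rs` additionally sends over a channel and tracks completion): parsed blocks arrive keyed by canonical offset, possibly out of order, and are released strictly in offset order.

```rust
use std::collections::HashMap;

/// Minimal reorder buffer: items may be pushed at out-of-order
/// offsets, but are only released once they are contiguous with the
/// emission front.
struct Reorder<T> {
    next: u32,
    pending: HashMap<u32, T>,
}

impl<T> Reorder<T> {
    fn new() -> Self {
        Self { next: 0, pending: HashMap::new() }
    }

    /// Accepts an item at `offset`; returns every item that is now
    /// contiguous with the front, in offset order.
    fn push(&mut self, offset: u32, item: T) -> Vec<T> {
        self.pending.insert(offset, item);
        let mut out = Vec::new();
        // Drain the pending map while the next expected offset is present.
        while let Some(next) = self.pending.remove(&self.next) {
            out.push(next);
            self.next += 1;
        }
        out
    }
}
```

Pushing offsets 2, 0, 1 releases nothing, then the offset-0 item, then offsets 1 and 2 together, which is the draining behaviour the consumer relies on.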

View File

@@ -1,12 +1,12 @@
//! End-to-end benchmark: `Reader::after` (rayon-parallel + reorder thread)
//! versus `Reader::after_canonical` (1 reader + N parser threads + canonical
//! hash filter).
//! Benchmark `Reader::after` / `Reader::after_with` across three
//! parser-thread counts (1 / 4 / 16).
//!
//! Three phases:
//!
//! 1. **Tail scenarios** — pick an anchor `N` blocks below the chain tip
//! and run each implementation `REPEATS` times. Exercises the tail
//! (≤10) and forward (>10) code paths under realistic catchup ranges.
//! 1. **Tail scenarios** — pick an anchor `N` blocks below the chain
//! tip and run each config `REPEATS` times. Exercises the tail
//! (≤10) and forward (>10) code paths under realistic catchup
//! ranges.
//! 2. **Partial reindex** — anchor=`None` but stop after
//! `PARTIAL_LIMIT` blocks. Exercises the early-chain blk files
//! where blocks are small and dense-parsing isn't the bottleneck.
@@ -29,6 +29,7 @@ use brk_types::{BlockHash, Height, ReadBlock};
const SCENARIOS: &[usize] = &[5, 10, 100, 1_000, 10_000];
const REPEATS: usize = 3;
const PARTIAL_LIMIT: usize = 400_000;
const PARSER_COUNTS: &[usize] = &[1, 4, 16];
fn main() -> Result<()> {
let bitcoin_dir = Client::default_bitcoin_path();
@@ -42,75 +43,64 @@ fn main() -> Result<()> {
println!("Tip: {tip}");
println!();
println!(
"{:>10} {:>16} {:>12} {:>12} {:>10}",
"blocks", "impl", "best", "avg", "blk/s"
"{:>10} {:>12} {:>12} {:>12} {:>10}",
"blocks", "parsers", "best", "avg", "blk/s"
);
println!("{}", "-".repeat(68));
println!("{}", "-".repeat(64));
for &n in SCENARIOS {
let anchor_height = Height::from(tip.saturating_sub(n as u32));
let anchor_hash = client.get_block_hash(*anchor_height as u64)?;
let anchor = Some(BlockHash::from(anchor_hash));
let after = bench(REPEATS, || reader.after(anchor.clone()))?;
print_row(n, "after", &after);
let canonical_1 = bench(REPEATS, || reader.after_canonical(anchor.clone()))?;
print_row(n, "canonical[p=1]", &canonical_1);
let canonical_4 =
bench(REPEATS, || reader.after_canonical_with(anchor.clone(), 4))?;
print_row(n, "canonical[p=4]", &canonical_4);
let canonical_16 =
bench(REPEATS, || reader.after_canonical_with(anchor.clone(), 16))?;
print_row(n, "canonical[p=16]", &canonical_16);
sanity_check(n, &after, &canonical_1);
sanity_check(n, &after, &canonical_4);
sanity_check(n, &after, &canonical_16);
let mut first: Option<RunStats> = None;
for &p in PARSER_COUNTS {
let stats = bench(REPEATS, || reader.after_with(anchor.clone(), p))?;
print_row(n, p, &stats);
if let Some(baseline) = &first {
sanity_check(n, baseline, &stats);
} else {
first = Some(stats);
}
}
println!();
}
println!();
println!("Partial reindex (genesis → {PARTIAL_LIMIT} blocks), one run per config:");
println!(
"{:>10} {:>16} {:>12} {:>10}",
"blocks", "impl", "elapsed", "blk/s"
"{:>10} {:>12} {:>12} {:>10}",
"blocks", "parsers", "elapsed", "blk/s"
);
println!("{}", "-".repeat(54));
let after_partial = run_bounded(PARTIAL_LIMIT, || reader.after(None))?;
print_full_row("after", &after_partial);
let p1_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical(None))?;
print_full_row("canonical[p=1]", &p1_partial);
sanity_check_full(&after_partial, &p1_partial);
let p4_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical_with(None, 4))?;
print_full_row("canonical[p=4]", &p4_partial);
sanity_check_full(&after_partial, &p4_partial);
let p16_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical_with(None, 16))?;
print_full_row("canonical[p=16]", &p16_partial);
sanity_check_full(&after_partial, &p16_partial);
println!("{}", "-".repeat(50));
let mut partial_baseline: Option<FullRun> = None;
for &p in PARSER_COUNTS {
let run = run_bounded(PARTIAL_LIMIT, || reader.after_with(None, p))?;
print_full_row(p, &run);
if let Some(baseline) = &partial_baseline {
sanity_check_full(baseline, &run);
} else {
partial_baseline = Some(run);
}
}
println!();
println!("Full reindex (genesis → tip), one run per config:");
println!(
"{:>10} {:>16} {:>12} {:>10}",
"blocks", "impl", "elapsed", "blk/s"
"{:>10} {:>12} {:>12} {:>10}",
"blocks", "parsers", "elapsed", "blk/s"
);
println!("{}", "-".repeat(54));
let after_full = run_once(|| reader.after(None))?;
print_full_row("after", &after_full);
let p1_full = run_once(|| reader.after_canonical(None))?;
print_full_row("canonical[p=1]", &p1_full);
sanity_check_full(&after_full, &p1_full);
let p4_full = run_once(|| reader.after_canonical_with(None, 4))?;
print_full_row("canonical[p=4]", &p4_full);
sanity_check_full(&after_full, &p4_full);
let p16_full = run_once(|| reader.after_canonical_with(None, 16))?;
print_full_row("canonical[p=16]", &p16_full);
sanity_check_full(&after_full, &p16_full);
println!("{}", "-".repeat(50));
let mut full_baseline: Option<FullRun> = None;
for &p in PARSER_COUNTS {
let run = run_once(|| reader.after_with(None, p))?;
print_full_row(p, &run);
if let Some(baseline) = &full_baseline {
sanity_check_full(baseline, &run);
} else {
full_baseline = Some(run);
}
}
Ok(())
}
@@ -152,28 +142,28 @@ where
})
}
fn print_row(requested: usize, label: &str, s: &RunStats) {
fn print_row(requested: usize, parsers: usize, s: &RunStats) {
let blk_per_s = if s.best.is_zero() {
0.0
} else {
s.count as f64 / s.best.as_secs_f64()
};
println!(
"{:>10} {:>16} {:>12?} {:>12?} {:>10.0}",
requested, label, s.best, s.avg, blk_per_s
"{:>10} {:>12} {:>12?} {:>12?} {:>10.0}",
requested, parsers, s.best, s.avg, blk_per_s
);
}
fn sanity_check(requested: usize, after: &RunStats, canonical: &RunStats) {
if after.count != canonical.count {
fn sanity_check(requested: usize, baseline: &RunStats, stats: &RunStats) {
if baseline.count != stats.count {
println!(
" ⚠ block count mismatch: after={} canonical={}",
after.count, canonical.count
" ⚠ block count mismatch: {} vs {}",
baseline.count, stats.count
);
} else if after.count != requested {
} else if baseline.count != requested {
println!(
" (note: got {} blocks, requested {}; tip may have advanced)",
after.count, requested
baseline.count, requested
);
}
}
@@ -221,23 +211,23 @@ where
Ok(FullRun { elapsed, count })
}
fn print_full_row(label: &str, run: &FullRun) {
fn print_full_row(parsers: usize, run: &FullRun) {
let blk_per_s = if run.elapsed.is_zero() {
0.0
} else {
run.count as f64 / run.elapsed.as_secs_f64()
};
println!(
"{:>10} {:>16} {:>12?} {:>10.0}",
run.count, label, run.elapsed, blk_per_s
"{:>10} {:>12} {:>12?} {:>10.0}",
run.count, parsers, run.elapsed, blk_per_s
);
}
fn sanity_check_full(after: &FullRun, canonical: &FullRun) {
if after.count != canonical.count {
fn sanity_check_full(baseline: &FullRun, run: &FullRun) {
if baseline.count != run.count {
println!(
" ⚠ block count mismatch vs after: {} vs {}",
after.count, canonical.count
" ⚠ block count mismatch: {} vs {}",
baseline.count, run.count
);
}
}

View File

@@ -18,7 +18,7 @@ fn main() -> Result<()> {
let mut max_drop_at: u16 = 0;
for (&blk_index, blk_path) in blk_map.iter() {
match reader.get_first_block_height(blk_path, xor_bytes) {
match reader.first_block_height(blk_path, xor_bytes) {
Ok(height) => {
let h = u32::from(height);
let drop = prev_height.map(|p| p.saturating_sub(h)).unwrap_or(0);

View File

@@ -12,9 +12,9 @@ fn main() -> Result<()> {
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
// Stream all blocks
// Stream all blocks from genesis to the current tip.
let i = std::time::Instant::now();
for block in reader.read(None, None) {
for block in reader.after(None)?.iter() {
println!("{}: {}", block.height(), block.hash());
}
println!("Full read: {:?}", i.elapsed());

View File

@@ -19,7 +19,7 @@ fn main() -> Result<()> {
let height = Height::new(h);
let i = std::time::Instant::now();
if let Some(block) = reader.read(Some(height), Some(height)).iter().next() {
if let Some(block) = reader.range(height, height)?.iter().next() {
println!(
"height={} hash={} txs={} coinbase=\"{:?}\" ({:?})",
block.height(),

View File

@@ -1,71 +1,18 @@
//! Canonical-hash pipeline for `Reader::after`.
//! `CanonicalRange`: a pre-fetched map from canonical block hash to
//! offset-from-`start`. The reader uses this as the authoritative
//! filter for "is this block on the main chain?".
//!
//! Bitcoin Core stores accepted blocks in append-only `blk*.dat` files
//! under the data dir, XOR-encoded with a per-datadir key. A "blk
//! file" contains every block the node ever accepted — including
//! blocks that were later orphaned by a reorg — in acceptance order,
//! not height order. This module turns "give me every block after
//! `hash` up to the tip" into an ordered `ReadBlock` stream drawn from
//! those files while skipping orphans.
//! Every canonical hash in the target height window is fetched from
//! bitcoind up front via [`get_block_hashes_range`], so the scan
//! pipeline never needs a per-block RPC call (which is what caused the
//! original silent-drop reorg bug).
//!
//! How it works:
//!
//! 1. [`CanonicalRange::walk`] asks bitcoind once, up front, for the
//! canonical block hash at every height in the target window. This
//! is one batched JSON-RPC request — no per-block RPC overhead.
//! 2. The reader walks blk files in order and scans each one for the
//! block magic prefix. For every block found,
//! [`peek_canonical_offset`] hashes the 80-byte header and looks
//! the hash up in the canonical map. Orphans short-circuit here,
//! before any bytes are cloned.
//! 3. Canonical hits are cloned into [`ScannedBlock`]s and shipped
//! over a channel to a small pool of parser workers, which run
//! [`parse_canonical_body`] to fully decode the block.
//! 4. Parsers serialise their output through [`ReorderState`] so that
//! the consumer receives blocks in canonical-height order even if
//! the blk files emitted them out of order.
//!
//! Ranges of at most `TAIL_THRESHOLD` blocks take a specialised
//! [`pipeline_tail`] path that reverse-scans the newest blk files in
//! 5 MB chunks — cheaper than walking forward from genesis for a
//! handful of tip blocks.
//!
//! Public entry points: [`ReaderInner::after_canonical`] and
//! [`ReaderInner::after_canonical_with`]. Coexists with the original
//! `read` / `read_rev` / `after` so the two can be A/B-tested.
//! [`get_block_hashes_range`]: brk_rpc::Client::get_block_hashes_range
use std::{
fs::{self, File},
io::{Cursor, Read, Seek, SeekFrom},
ops::ControlFlow,
sync::atomic::{AtomicBool, Ordering},
thread,
};
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable};
use brk_error::{Error, Result};
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{BlkMetadata, Block, BlockHash, BlockHashPrefix, Height, ReadBlock};
use crossbeam::channel::{Receiver, Sender, bounded};
use parking_lot::Mutex;
use brk_types::{BlockHash, BlockHashPrefix, Height};
use rustc_hash::FxHashMap;
use tracing::{error, warn};
use crate::{BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex, scan::scan_bytes};
const BOUND_CAP: usize = 50;
const TAIL_CHUNK: usize = 5 * 1024 * 1024;
/// Ranges of at most this many canonical blocks take the tail pipeline; anything larger takes the forward pipeline.
const TAIL_THRESHOLD: usize = 10;
/// Default parser-thread count for `after_canonical`. The indexer is
/// CPU-bound on the consumer side, so 1 parser thread + 1 reader thread
/// (= 2 total) leaves the rest of the cores for the indexer. Bench tools
/// that drain the channel cheaply can override via `after_canonical_with`.
const DEFAULT_PARSER_THREADS: usize = 1;
// ─────────────────────────────────────────────────────────────────────────────
// CanonicalRange — the only RPC-aware piece in this file.
// ─────────────────────────────────────────────────────────────────────────────
/// Every canonical block hash in a contiguous height window, resolved
/// from bitcoind once up front. `hashes[i]` is the canonical hash at
@@ -78,15 +25,20 @@ pub struct CanonicalRange {
}
impl CanonicalRange {
/// Resolves canonical hashes for every height strictly after `anchor`
/// up to `tip` inclusive. `anchor = None` starts at genesis.
/// Resolves canonical hashes for every height strictly after
/// `anchor` up to `tip` inclusive. `anchor = None` starts at
/// genesis.
pub fn walk(client: &Client, anchor: Option<BlockHash>, tip: Height) -> Result<Self> {
let start = match anchor {
Some(hash) => Height::from(client.get_block_header_info(&hash)?.height + 1),
None => Height::ZERO,
};
Self::between(client, start, tip)
}
if start > tip {
/// Resolves canonical hashes for every height in `start..=end`.
pub fn between(client: &Client, start: Height, end: Height) -> Result<Self> {
if start > end {
return Ok(Self {
start,
hashes: Vec::new(),
@@ -94,7 +46,7 @@ impl CanonicalRange {
});
}
let hashes = client.get_block_hashes_range(*start, *tip)?;
let hashes = client.get_block_hashes_range(*start, *end)?;
let mut by_prefix =
FxHashMap::with_capacity_and_hasher(hashes.len(), Default::default());
by_prefix.extend(
@@ -124,454 +76,8 @@ impl CanonicalRange {
/// the full hash so prefix collisions from orphaned blocks are
/// rejected.
#[inline]
fn offset_of(&self, hash: &BlockHash) -> Option<u32> {
pub(crate) fn offset_of(&self, hash: &BlockHash) -> Option<u32> {
let offset = *self.by_prefix.get(&BlockHashPrefix::from(hash))?;
(self.hashes[offset as usize] == *hash).then_some(offset)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Block parsing — cheap header peek first, full body parse only on a hit.
// ─────────────────────────────────────────────────────────────────────────────
const HEADER_LEN: usize = 80;
/// Returns the canonical offset of `bytes` if its header hashes to a
/// known canonical block, otherwise `None`. Does not allocate and does
/// not mutate `bytes`: the header is copied onto a stack buffer and
/// XOR-decoded there so an orphan short-circuits cleanly and a
/// canonical hit can still be cloned out intact.
fn peek_canonical_offset(
bytes: &[u8],
mut xor_state: XORIndex,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
) -> Option<u32> {
if bytes.len() < HEADER_LEN {
return None;
}
let mut header_buf = [0u8; HEADER_LEN];
header_buf.copy_from_slice(&bytes[..HEADER_LEN]);
xor_state.bytes(&mut header_buf, xor_bytes);
let header = Header::consensus_decode(&mut &header_buf[..]).ok()?;
canonical.offset_of(&BlockHash::from(header.block_hash()))
}
/// Full XOR-decode + parse for a block that has already been confirmed
/// canonical by `peek_canonical_offset`. Takes owned `bytes` so it can
/// mutate them in place and hand them to the resulting `ReadBlock`.
fn parse_canonical_body(
mut bytes: Vec<u8>,
metadata: BlkMetadata,
mut xor_state: XORIndex,
xor_bytes: XORBytes,
height: Height,
) -> Result<ReadBlock> {
xor_state.bytes(&mut bytes, xor_bytes);
let mut cursor = Cursor::new(bytes);
let header = Header::consensus_decode(&mut cursor)?;
let bitcoin_hash = header.block_hash();
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
let mut txdata = Vec::with_capacity(tx_count);
let mut tx_metadata = Vec::with_capacity(tx_count);
let mut tx_offsets = Vec::with_capacity(tx_count);
for _ in 0..tx_count {
let tx_start = cursor.position() as u32;
tx_offsets.push(tx_start);
let tx = Transaction::consensus_decode(&mut cursor)?;
let tx_len = cursor.position() as u32 - tx_start;
txdata.push(tx);
tx_metadata.push(BlkMetadata::new(metadata.position() + tx_start, tx_len));
}
let raw_bytes = cursor.into_inner();
let mut block = Block::from((height, bitcoin_hash, bitcoin::Block { header, txdata }));
block.set_raw_data(raw_bytes, tx_offsets);
Ok(ReadBlock::from((block, metadata, tx_metadata)))
}
// ─────────────────────────────────────────────────────────────────────────────
// Public entry — drop-in replacement for `Reader::after`.
// ─────────────────────────────────────────────────────────────────────────────
impl ReaderInner {
/// Streams every canonical block strictly after `hash` (or from
/// genesis when `None`) up to the current chain tip, in canonical
/// order. Uses the default parser-thread count; see
/// [`after_canonical_with`](Self::after_canonical_with) to override.
pub fn after_canonical(&self, hash: Option<BlockHash>) -> Result<Receiver<ReadBlock>> {
self.after_canonical_with(hash, DEFAULT_PARSER_THREADS)
}
/// Like [`after_canonical`](Self::after_canonical) but with a
/// configurable number of parser threads. `parser_threads = 1` is
/// the minimal-thread default (1 reader + 1 parser, uncontended
/// mutex). Higher values trade extra cores for throughput on dense
/// ranges where the parser is the bottleneck.
pub fn after_canonical_with(
&self,
hash: Option<BlockHash>,
parser_threads: usize,
) -> Result<Receiver<ReadBlock>> {
let parser_threads = parser_threads.max(1);
let tip = self.client.get_last_height()?;
let canonical = CanonicalRange::walk(&self.client, hash, tip)?;
if canonical.is_empty() {
return Ok(bounded(0).1);
}
let paths = BlkIndexToBlkPath::scan(&self.blocks_dir);
*self.blk_index_to_blk_path.write() = paths.clone();
let (send, recv) = bounded(BOUND_CAP);
let xor_bytes = self.xor_bytes;
let use_tail = canonical.len() <= TAIL_THRESHOLD;
let first_blk_index = if use_tail {
0
} else {
self.find_start_blk_index(Some(canonical.start), &paths, xor_bytes)
.unwrap_or_default()
};
thread::spawn(move || {
let result = if use_tail {
pipeline_tail(&paths, xor_bytes, &canonical, &send)
} else {
pipeline_forward(
&paths,
first_blk_index,
xor_bytes,
&canonical,
&send,
parser_threads,
)
};
if let Err(e) = result {
error!("after_canonical pipeline failed: {e}");
}
});
Ok(recv)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Forward pipeline — 1 reader + N parsers + shared in-order emission.
// ─────────────────────────────────────────────────────────────────────────────
/// A raw block the reader has already confirmed is on the canonical
/// chain, shipped to the parser pool for full decoding.
struct ScannedBlock {
metadata: BlkMetadata,
bytes: Vec<u8>,
xor_state: XORIndex,
canonical_offset: u32,
}
/// In-order emission buffer shared between the parser threads. Access
/// is serialised through a `parking_lot::Mutex`; at `parser_threads = 1`
/// the lock is always uncontended.
struct ReorderState {
next_offset: u32,
/// Ahead-of-line matches keyed by canonical offset; drained
/// contiguously each time `next_offset` advances. Bounded in
/// practice by parser-thread scheduling jitter — see module doc.
pending: FxHashMap<u32, ReadBlock>,
send_to_consumer: Sender<ReadBlock>,
}
impl ReorderState {
fn new(send_to_consumer: Sender<ReadBlock>) -> Self {
Self {
next_offset: 0,
pending: FxHashMap::default(),
send_to_consumer,
}
}
/// Accepts a parsed canonical block; emits it and drains any
/// contiguous pending matches. Returns `false` iff the consumer
/// dropped the receiver — a pure liveness signal. Completion is
/// checked by the caller via `next_offset`.
fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool {
use std::cmp::Ordering::*;
match offset.cmp(&self.next_offset) {
Equal => {
if self.send_to_consumer.send(block).is_err() {
return false;
}
self.next_offset += 1;
while let Some(next) = self.pending.remove(&self.next_offset) {
if self.send_to_consumer.send(next).is_err() {
return false;
}
self.next_offset += 1;
}
true
}
Greater => {
self.pending.insert(offset, block);
true
}
// Unreachable in practice: each canonical hash appears at
// exactly one offset and each block is parsed once.
Less => true,
}
}
}
/// Forward pipeline: the reader (this thread) scans blk files and
/// ships canonical hits to a scoped parser pool via `parser_send`;
/// parsers decode bodies and serialise emission through `reorder`.
/// Scoped threads let every parser borrow `canonical`, `reorder`, and
/// `done` directly — no `Arc` required.
///
/// A reorder buffer is required even at `parser_threads = 1` because
/// canonical blocks can arrive out of order across blk files (bitcoind
/// doesn't write in strict chain order during initial sync, headers-
/// first body fetch, or reindex).
fn pipeline_forward(
paths: &BlkIndexToBlkPath,
first_blk_index: u16,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
send: &Sender<ReadBlock>,
parser_threads: usize,
) -> Result<()> {
let (parser_send, parser_recv) = bounded::<ScannedBlock>(BOUND_CAP);
let reorder = Mutex::new(ReorderState::new(send.clone()));
let target_canonical_count = canonical.len() as u32;
let done = AtomicBool::new(false);
thread::scope(|scope| -> Result<()> {
for _ in 0..parser_threads {
let parser_recv = parser_recv.clone();
scope.spawn(|| {
parser_loop(
parser_recv,
&reorder,
&done,
canonical,
xor_bytes,
target_canonical_count,
)
});
}
// Every parser owns its own clone; ours would otherwise keep
// the channel "alive" and leak a dangling receiver.
drop(parser_recv);
let read_result = read_and_dispatch(
paths,
first_blk_index,
xor_bytes,
canonical,
&parser_send,
&done,
);
// Signal end-of-input to the parsers so they exit their `for`
// loops and the scope can join them.
drop(parser_send);
read_result
})?;
let pipeline_cancelled = done.load(Ordering::Relaxed);
let emitted = reorder.lock().next_offset as usize;
if !pipeline_cancelled && emitted < canonical.len() {
return Err(Error::Internal(
"after_canonical forward pipeline: blk files missing canonical blocks",
));
}
Ok(())
}
/// Full-body parse + in-order emit loop run by every scoped parser
/// worker in `pipeline_forward`. Drains `parser_recv` to exhaustion.
fn parser_loop(
parser_recv: Receiver<ScannedBlock>,
reorder: &Mutex<ReorderState>,
done: &AtomicBool,
canonical: &CanonicalRange,
xor_bytes: XORBytes,
target_canonical_count: u32,
) {
for ScannedBlock { metadata, bytes, xor_state, canonical_offset } in parser_recv {
if done.load(Ordering::Relaxed) {
continue;
}
let height = Height::from(*canonical.start + canonical_offset);
let block = match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height) {
Ok(block) => block,
Err(e) => {
warn!("parse_canonical_body failed: {e}");
continue;
}
};
let pipeline_finished = {
let mut state = reorder.lock();
!state.try_emit(canonical_offset, block)
|| state.next_offset >= target_canonical_count
};
if pipeline_finished {
done.store(true, Ordering::Relaxed);
}
}
}
/// Walk blk files from `first_blk_index`, scan each one, and ship
/// canonical blocks to the parser pool. Non-canonical blocks are
/// rejected via `peek_canonical_offset` *before* being cloned — the
/// cheap filter is what lets a sparse catchup avoid allocating for the
/// ~99% of blocks outside the window.
fn read_and_dispatch(
paths: &BlkIndexToBlkPath,
first_blk_index: u16,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
parser_send: &Sender<ScannedBlock>,
done: &AtomicBool,
) -> Result<()> {
for (&blk_index, blk_path) in paths.range(first_blk_index..) {
if done.load(Ordering::Relaxed) {
return Ok(());
}
let mut bytes = fs::read(blk_path).map_err(|e| {
error!("Failed to read blk file {}: {e}", blk_path.display());
Error::Internal("Failed to read blk file")
})?;
let result = scan_bytes(
&mut bytes,
blk_index,
0,
xor_bytes,
|metadata, block_bytes, xor_state| {
if done.load(Ordering::Relaxed) {
return ControlFlow::Break(());
}
let Some(canonical_offset) =
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
else {
return ControlFlow::Continue(());
};
let scanned = ScannedBlock {
metadata,
bytes: block_bytes.to_vec(),
xor_state,
canonical_offset,
};
if parser_send.send(scanned).is_err() {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
},
);
if result.interrupted {
return Ok(());
}
}
Ok(())
}
// ─────────────────────────────────────────────────────────────────────────────
// Tail pipeline — reverse-scan the newest blk files in 5 MB chunks until
// every canonical hash has been matched, then emit them forward.
// ─────────────────────────────────────────────────────────────────────────────
fn pipeline_tail(
paths: &BlkIndexToBlkPath,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
send: &Sender<ReadBlock>,
) -> Result<()> {
let mut slots: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
let mut remaining = canonical.len();
// Carries the bytes before a chunk's first magic into the next
// (earlier) chunk so blocks straddling the boundary survive.
let mut spillover: Vec<u8> = Vec::new();
'files: for (&blk_index, path) in paths.iter().rev() {
let mut file = File::open(path).map_err(|_| Error::Internal("Failed to open blk file"))?;
let file_len = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
if file_len == 0 {
continue;
}
let mut read_end = file_len;
spillover.clear();
while read_end > 0 && remaining > 0 {
let read_start = read_end.saturating_sub(TAIL_CHUNK);
let chunk_len = read_end - read_start;
read_end = read_start;
file.seek(SeekFrom::Start(read_start as u64))
.map_err(|_| Error::Internal("Failed to seek blk file"))?;
let mut buf = vec![0u8; chunk_len + spillover.len()];
file.read_exact(&mut buf[..chunk_len])
.map_err(|_| Error::Internal("Failed to read blk chunk"))?;
buf[chunk_len..].copy_from_slice(&spillover);
spillover.clear();
let result = scan_bytes(
&mut buf,
blk_index,
read_start,
xor_bytes,
|metadata, block_bytes, xor_state| {
let Some(offset) =
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
else {
return ControlFlow::Continue(());
};
if slots[offset as usize].is_some() {
return ControlFlow::Continue(());
}
let height = Height::from(*canonical.start + offset);
match parse_canonical_body(
block_bytes.to_vec(),
metadata,
xor_state,
xor_bytes,
height,
) {
Ok(block) => {
slots[offset as usize] = Some(block);
remaining -= 1;
}
Err(e) => warn!("parse_canonical_body failed in tail pipeline: {e}"),
}
if remaining == 0 {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
},
);
if remaining == 0 {
break 'files;
}
if read_start > 0 {
spillover.extend_from_slice(&buf[..result.first_magic.unwrap_or(buf.len())]);
}
}
}
if remaining > 0 {
return Err(Error::Internal(
"after_canonical tail pipeline: blk files missing canonical blocks",
));
}
for block in slots.into_iter().flatten() {
if send.send(block).is_err() {
return Ok(());
}
}
Ok(())
}

View File

@@ -1,73 +0,0 @@
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor};
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{BlkMetadata, Block, Height, ReadBlock};
use crate::{XORBytes, XORIndex};
/// Margin for timestamp non-monotonicity
const TIMESTAMP_MARGIN: u32 = 3 * 3600;
#[allow(clippy::too_many_arguments)]
pub fn decode_block(
mut bytes: Vec<u8>,
metadata: BlkMetadata,
client: &Client,
mut xor_i: XORIndex,
xor_bytes: XORBytes,
start: Option<Height>,
end: Option<Height>,
start_time: u32,
end_time: u32,
) -> Result<Option<ReadBlock>> {
xor_i.bytes(bytes.as_mut_slice(), xor_bytes);
let mut cursor = Cursor::new(bytes);
let header = Header::consensus_decode(&mut cursor)?;
// Skip blocks clearly outside the target range using header timestamp
if header.time < start_time.saturating_sub(TIMESTAMP_MARGIN)
|| (end_time > 0 && header.time > end_time.saturating_add(TIMESTAMP_MARGIN))
{
return Ok(None);
}
let hash = header.block_hash();
let Ok(block_header_result) = client.get_block_header_info(&hash) else {
return Ok(None);
};
let height = Height::from(block_header_result.height);
if start.is_some_and(|s| s > height) || end.is_some_and(|e| e < height) {
return Ok(None);
}
if block_header_result.confirmations <= 0 {
return Ok(None);
}
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
let mut txdata = Vec::with_capacity(tx_count);
let mut tx_metadata = Vec::with_capacity(tx_count);
let mut tx_offsets = Vec::with_capacity(tx_count);
for _ in 0..tx_count {
let offset = cursor.position() as u32;
tx_offsets.push(offset);
let position = metadata.position() + offset;
let tx = Transaction::consensus_decode(&mut cursor)?;
txdata.push(tx);
let len = cursor.position() as u32 - offset;
tx_metadata.push(BlkMetadata::new(position, len));
}
let block_bytes = cursor.into_inner();
let block = bitcoin::Block { header, txdata };
let mut block = Block::from((height, hash, block));
block.set_raw_data(block_bytes, tx_offsets);
Ok(Some(ReadBlock::from((block, metadata, tx_metadata))))
}
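The `xor_i` / `xor_bytes` pair above deobfuscates blk bytes that newer Bitcoin Core nodes XOR with the repeating 8-byte key stored in `blocks/xor.dat`. A minimal model — simplified names, not the crate's actual `XORIndex`/`XORBytes` API — showing why carrying a rolling index lets decoding start at any absolute file offset:

```rust
/// Rolling index into the repeating 8-byte obfuscation key.
#[derive(Clone, Copy)]
struct XorIndex(usize);

impl XorIndex {
    /// XOR `bytes` in place with the key, starting at this index.
    fn bytes(&mut self, bytes: &mut [u8], key: [u8; 8]) {
        for b in bytes.iter_mut() {
            *b ^= key[self.0 % 8];
            self.0 += 1;
        }
    }
}

fn main() {
    let key = [1, 2, 3, 4, 5, 6, 7, 8];
    let plain: Vec<u8> = (0..32).collect();

    // Obfuscate the whole "file" from offset 0.
    let mut file = plain.clone();
    XorIndex(0).bytes(&mut file, key);

    // Decode only bytes 5.. by seeding the index with that offset:
    // XOR is self-inverse, so the same pass recovers the plaintext.
    let mut tail = file[5..].to_vec();
    XorIndex(5).bytes(&mut tail, key);
    assert_eq!(tail.as_slice(), &plain[5..]);
}
```

Seeding the index with the block's position in the file is what lets the reverse scan decode a slice mid-file without touching anything before it.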

View File

@@ -2,13 +2,11 @@
use std::{
collections::BTreeMap,
fs::{self, File},
io::{Read, Seek, SeekFrom},
ops::ControlFlow,
fs::File,
io::Read,
os::unix::fs::FileExt,
path::{Path, PathBuf},
sync::Arc,
thread,
};
use bitcoin::{block::Header, consensus::Decodable};
@@ -17,33 +15,34 @@ use brk_error::{Error, Result};
use brk_rpc::Client;
use brk_types::{BlkPosition, BlockHash, Height, ReadBlock};
pub use crossbeam::channel::Receiver;
use crossbeam::channel::bounded;
use derive_more::Deref;
use parking_lot::{RwLock, RwLockReadGuard};
use rayon::prelude::*;
use tracing::{error, warn};
mod blk_index_to_blk_path;
mod canonical;
mod decode;
mod parse;
mod pipeline;
mod scan;
mod xor_bytes;
mod xor_index;
pub use canonical::CanonicalRange;
use decode::*;
use scan::*;
pub use xor_bytes::*;
pub use xor_index::*;
const BOUND_CAP: usize = 50;
/// How many blk files to step back from the binary-search hit in
/// [`ReaderInner::find_start_blk_index`]. Guards against blocks that
/// bitcoind wrote to the "current" file slightly out of height order
/// (e.g. the tail of a reorg landing in an earlier file index than
/// its successors).
const START_BLK_INDEX_BACKOFF: usize = 21;
/// Handle to a Bitcoin Core blk-file reader.
///
/// Bitcoin BLK file reader
///
/// Thread safe and free to clone
///
///
/// Cheap to clone (`Arc`-backed) and thread-safe: all streaming
/// methods take `&self` and the returned `Receiver<ReadBlock>` can be
/// drained from any thread.
#[derive(Debug, Clone, Deref)]
pub struct Reader(Arc<ReaderInner>);
@@ -141,265 +140,57 @@ impl ReaderInner {
})
}
/// Returns a receiver streaming `ReadBlock`s from `hash + 1` to the chain tip.
/// If `hash` is `None`, starts from genesis.
// ─────────────────────────────────────────────────────────────────
// Public streaming API — all calls delegate to `pipeline::spawn`.
// ─────────────────────────────────────────────────────────────────
/// Streams every canonical block strictly after `hash` (or from
/// genesis when `None`) up to the current chain tip, in canonical
/// order. Uses the default parser-thread count; see
/// [`after_with`](Self::after_with) to override.
pub fn after(&self, hash: Option<BlockHash>) -> Result<Receiver<ReadBlock>> {
let start = if let Some(hash) = hash.as_ref() {
let info = self.client.get_block_header_info(hash)?;
Height::from(info.height + 1)
} else {
Height::ZERO
};
let end = self.client.get_last_height()?;
if end < start {
return Ok(bounded(0).1);
}
if *end - *start < 10 {
let mut blocks: Vec<_> = self.read_rev(Some(start), Some(end)).iter().collect();
blocks.reverse();
let (send, recv) = bounded(blocks.len());
for block in blocks {
let _ = send.send(block);
}
return Ok(recv);
}
Ok(self.read(Some(start), Some(end)))
self.after_with(hash, pipeline::DEFAULT_PARSER_THREADS)
}
/// Returns a crossbeam channel receiver that streams `ReadBlock`s in chain order.
///
/// Both `start` and `end` are inclusive. `None` means unbounded.
pub fn read(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ReadBlock> {
if let (Some(s), Some(e)) = (start, end)
&& s > e
{
let (_, recv) = bounded(0);
return recv;
}
let client = self.client.clone();
let (send_bytes, recv_bytes) = bounded(BOUND_CAP / 2);
let (send_block, recv_block) = bounded(BOUND_CAP);
let (send_ordered, recv_ordered) = bounded(BOUND_CAP);
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(&self.blocks_dir);
*self.blk_index_to_blk_path.write() = blk_index_to_blk_path.clone();
let xor_bytes = self.xor_bytes;
let first_blk_index = self
.find_start_blk_index(start, &blk_index_to_blk_path, xor_bytes)
.unwrap_or_default();
let get_block_time = |h: Height| -> u32 {
self.client
.get_block_hash(*h as u64)
.ok()
.and_then(|hash| self.client.get_block_header(&hash).ok())
.map(|h| h.time)
.unwrap_or(0)
};
let start_time = start.filter(|h| **h > 0).map(&get_block_time).unwrap_or(0);
let end_time = end.map(&get_block_time).unwrap_or(0);
thread::spawn(move || {
let _ = blk_index_to_blk_path.range(first_blk_index..).try_for_each(
move |(blk_index, blk_path)| {
let Ok(mut bytes) = fs::read(blk_path) else {
error!("Failed to read blk file: {}", blk_path.display());
return ControlFlow::Break(());
};
let result = scan_bytes(
&mut bytes,
*blk_index,
0,
xor_bytes,
|metadata, block_bytes, xor_i| {
// Send owned bytes to the rayon parser pool.
if send_bytes.send((metadata, block_bytes.to_vec(), xor_i)).is_err() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
},
);
if result.interrupted {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
},
);
});
thread::spawn(move || {
// Private pool to prevent collision with the global pool
let parser_pool = rayon::ThreadPoolBuilder::new()
.num_threads(4.min(thread::available_parallelism().unwrap().get() / 2))
.build()
.expect("Failed to create parser thread pool");
parser_pool.install(|| {
let _ =
recv_bytes
.into_iter()
.par_bridge()
.try_for_each(|(metadata, bytes, xor_i)| {
let position = metadata.position();
match decode_block(
bytes, metadata, &client, xor_i, xor_bytes, start, end, start_time,
end_time,
) {
Ok(Some(block)) => {
if send_block.send(block).is_err() {
return ControlFlow::Break(());
}
}
Ok(None) => {} // Block filtered out (outside range, unconfirmed)
Err(e) => {
warn!("Failed to decode block at {position}: {e}");
}
}
ControlFlow::Continue(())
});
});
});
thread::spawn(move || {
let mut current_height = start.unwrap_or_default();
let mut prev_hash: Option<BlockHash> = None;
let mut future_blocks = BTreeMap::default();
let _ = recv_block
.iter()
.try_for_each(|block| -> ControlFlow<(), _> {
let mut opt = if current_height == block.height() {
Some(block)
} else {
future_blocks.insert(block.height(), block);
None
};
while let Some(block) = opt.take().or_else(|| {
if !future_blocks.is_empty() {
future_blocks.remove(&current_height)
} else {
None
}
}) {
if let Some(expected_prev) = prev_hash.as_ref() && block.header.prev_blockhash != expected_prev.into() {
error!(
"Chain discontinuity detected at height {}: expected prev_hash {}, got {}. Stopping iteration.",
*block.height(),
expected_prev,
block.header.prev_blockhash
);
return ControlFlow::Break(());
}
prev_hash = Some(block.hash().clone());
if send_ordered.send(block).is_err() {
return ControlFlow::Break(());
}
current_height.increment();
if end.is_some_and(|end| current_height > end) {
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
});
});
recv_ordered
/// Like [`after`](Self::after) but with a configurable number of
/// parser threads. `parser_threads = 1` is the minimal-thread
/// default (1 reader + 1 parser, uncontended mutex). Higher values
/// trade extra cores for throughput on dense ranges where the
/// parser is the bottleneck.
pub fn after_with(
&self,
hash: Option<BlockHash>,
parser_threads: usize,
) -> Result<Receiver<ReadBlock>> {
let tip = self.client.get_last_height()?;
let canonical = CanonicalRange::walk(&self.client, hash, tip)?;
pipeline::spawn(self, canonical, parser_threads)
}
/// Streams `ReadBlock`s in reverse order (newest first) by scanning
/// `.blk` files from the tail. Efficient for reading recent blocks.
/// Both `start` and `end` are inclusive. `None` means unbounded.
pub fn read_rev(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ReadBlock> {
const CHUNK: usize = 5 * 1024 * 1024;
if let (Some(s), Some(e)) = (start, end)
&& s > e
{
return bounded(0).1;
}
let client = self.client.clone();
let xor_bytes = self.xor_bytes;
let paths = BlkIndexToBlkPath::scan(&self.blocks_dir);
*self.blk_index_to_blk_path.write() = paths.clone();
let (send, recv) = bounded(BOUND_CAP);
thread::spawn(move || {
let mut head = Vec::new();
for (&blk_index, path) in paths.iter().rev() {
let file_len = fs::metadata(path).map(|m| m.len() as usize).unwrap_or(0);
if file_len == 0 {
continue;
}
let Ok(mut file) = File::open(path) else {
return;
};
let mut read_end = file_len;
while read_end > 0 {
let read_start = read_end.saturating_sub(CHUNK);
let chunk_len = read_end - read_start;
read_end = read_start;
let _ = file.seek(SeekFrom::Start(read_start as u64));
let mut buf = vec![0u8; chunk_len + head.len()];
if file.read_exact(&mut buf[..chunk_len]).is_err() {
return;
}
buf[chunk_len..].copy_from_slice(&head);
head.clear();
let mut blocks = Vec::new();
let result = scan_bytes(
&mut buf,
blk_index,
read_start,
xor_bytes,
|metadata, bytes, xor_i| {
// `decode_block` needs owned bytes — it XOR-
// decodes in place before parsing.
if let Ok(Some(block)) = decode_block(
bytes.to_vec(), metadata, &client, xor_i, xor_bytes, start, end, 0, 0,
) {
blocks.push(block);
}
ControlFlow::Continue(())
},
);
for block in blocks.into_iter().rev() {
let done = start.is_some_and(|s| block.height() <= s);
if send.send(block).is_err() || done {
return;
}
}
if read_start > 0 {
head = buf[..result.first_magic.unwrap_or(buf.len())].to_vec();
}
}
}
});
recv
/// Streams every canonical block in the inclusive height range
/// `start..=end` in canonical order, via the same pipeline as
/// [`after`](Self::after).
pub fn range(&self, start: Height, end: Height) -> Result<Receiver<ReadBlock>> {
self.range_with(start, end, pipeline::DEFAULT_PARSER_THREADS)
}
/// Like [`range`](Self::range) but with a configurable number of
/// parser threads. See [`after_with`](Self::after_with) for the
/// parser-count tradeoff.
pub fn range_with(
&self,
start: Height,
end: Height,
parser_threads: usize,
) -> Result<Receiver<ReadBlock>> {
let canonical = CanonicalRange::between(&self.client, start, end)?;
pipeline::spawn(self, canonical, parser_threads)
}
/// Binary-searches `blk_index_to_blk_path` for the first file
/// whose earliest block height is ≤ `target_start`, then backs
/// off a few files as a safety margin for blocks that were written
/// out of height order (see [`START_BLK_INDEX_BACKOFF`]).
fn find_start_blk_index(
&self,
target_start: Option<Height>,
@@ -411,7 +202,6 @@ impl ReaderInner {
};
let blk_indices: Vec<u16> = blk_index_to_blk_path.keys().copied().collect();
if blk_indices.is_empty() {
return Ok(0);
}
@@ -424,37 +214,33 @@ impl ReaderInner {
let mid = (left + right) / 2;
let blk_index = blk_indices[mid];
if let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) {
match self.get_first_block_height(blk_path, xor_bytes) {
Ok(height) => {
if height <= target_start {
best_start_idx = mid;
left = mid + 1;
} else {
if mid == 0 {
break;
}
right = mid - 1;
}
}
Err(_) => {
left = mid + 1;
}
}
} else {
let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) else {
break;
};
match self.first_block_height(blk_path, xor_bytes) {
Ok(height) if height <= target_start => {
best_start_idx = mid;
left = mid + 1;
}
Ok(_) => {
if mid == 0 {
break;
}
right = mid - 1;
}
Err(_) => {
left = mid + 1;
}
}
}
// buffer for worst-case scenarios where a block lands far behind
let final_idx = best_start_idx.saturating_sub(21);
let final_idx = best_start_idx.saturating_sub(START_BLK_INDEX_BACKOFF);
Ok(blk_indices.get(final_idx).copied().unwrap_or(0))
}
pub fn get_first_block_height(
pub fn first_block_height(
&self,
blk_path: &PathBuf,
blk_path: &Path,
xor_bytes: XORBytes,
) -> Result<Height> {
let mut file = File::open(blk_path)?;
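The binary search plus fixed backoff in `find_start_blk_index` can be sketched over a plain slice of per-file first heights (assumed non-decreasing; `BACKOFF` mirrors `START_BLK_INDEX_BACKOFF`):

```rust
/// Sketch of the start-file search: each entry is a blk file's first
/// block height. Find the last file whose first height is <= `target`,
/// then back off a fixed margin for blocks bitcoind wrote out of
/// height order.
const BACKOFF: usize = 21;

fn find_start_index(first_heights: &[u32], target: u32) -> usize {
    if first_heights.is_empty() {
        return 0;
    }
    let mut best = 0;
    let (mut left, mut right) = (0, first_heights.len() - 1);
    while left <= right {
        let mid = (left + right) / 2;
        if first_heights[mid] <= target {
            best = mid;
            left = mid + 1;
        } else {
            if mid == 0 {
                break;
            }
            right = mid - 1;
        }
    }
    // Safety margin: start a few files earlier than the exact hit.
    best.saturating_sub(BACKOFF)
}

fn main() {
    let heights: Vec<u32> = (0..100).map(|i| i * 1_000).collect();
    assert_eq!(find_start_index(&heights, 50_000), 29); // 50 - 21
    assert_eq!(find_start_index(&heights, 0), 0);
    assert_eq!(find_start_index(&[], 42), 0);
}
```

Starting 21 files early costs a little extra scanning but never skips a canonical block; the forward scan cheaply rejects everything before the window anyway.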

View File

@@ -0,0 +1,73 @@
//! Pure block parsing — XOR decoding, header and body decode.
//!
//! Split into a cheap header peek and a full body parse so the scan
//! loop can reject non-canonical blocks without copying them. No RPC,
//! no threading, no state.
use std::io::Cursor;
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable};
use brk_error::{Error, Result};
use brk_types::{BlkMetadata, Block, BlockHash, Height, ReadBlock};
use crate::{XORBytes, XORIndex, canonical::CanonicalRange};
const HEADER_LEN: usize = 80;
/// Returns the canonical offset of `bytes` if its header hashes to a
/// known canonical block, otherwise `None`. Does not allocate and does
/// not mutate `bytes`: the header is copied onto a stack buffer and
/// XOR-decoded there so an orphan short-circuits cleanly and a
/// canonical hit can still be cloned out intact.
pub fn peek_canonical_offset(
bytes: &[u8],
mut xor_state: XORIndex,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
) -> Option<u32> {
if bytes.len() < HEADER_LEN {
return None;
}
let mut header_buf = [0u8; HEADER_LEN];
header_buf.copy_from_slice(&bytes[..HEADER_LEN]);
xor_state.bytes(&mut header_buf, xor_bytes);
let header = Header::consensus_decode(&mut &header_buf[..]).ok()?;
canonical.offset_of(&BlockHash::from(header.block_hash()))
}
/// Full XOR-decode + parse for a block that has already been confirmed
/// canonical by `peek_canonical_offset`. Takes owned `bytes` so it can
/// mutate them in place and hand them to the resulting `ReadBlock`.
pub fn parse_canonical_body(
mut bytes: Vec<u8>,
metadata: BlkMetadata,
mut xor_state: XORIndex,
xor_bytes: XORBytes,
height: Height,
) -> Result<ReadBlock> {
if bytes.len() < HEADER_LEN {
return Err(Error::Internal("Block bytes shorter than header"));
}
xor_state.bytes(&mut bytes, xor_bytes);
let mut cursor = Cursor::new(bytes);
let header = Header::consensus_decode(&mut cursor)?;
let bitcoin_hash = header.block_hash();
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
let mut txdata = Vec::with_capacity(tx_count);
let mut tx_metadata = Vec::with_capacity(tx_count);
let mut tx_offsets = Vec::with_capacity(tx_count);
for _ in 0..tx_count {
let tx_start = cursor.position() as u32;
tx_offsets.push(tx_start);
let tx = Transaction::consensus_decode(&mut cursor)?;
let tx_len = cursor.position() as u32 - tx_start;
txdata.push(tx);
tx_metadata.push(BlkMetadata::new(metadata.position() + tx_start, tx_len));
}
let raw_bytes = cursor.into_inner();
let mut block = Block::from((height, bitcoin_hash, bitcoin::Block { header, txdata }));
block.set_raw_data(raw_bytes, tx_offsets);
Ok(ReadBlock::from((block, metadata, tx_metadata)))
}
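`CanonicalRange` is defined elsewhere in the crate; a plausible minimal shape — hypothetical field names — clarifies what `offset_of` buys: the canonical hashes are fetched once over RPC, after which every scanned header resolves to an offset (and thus a height) with a local map lookup instead of a per-block round-trip:

```rust
use std::collections::HashMap;

/// Hypothetical minimal shape of the canonical range: a start height
/// plus a hash -> offset map over the batch of hashes fetched up front.
struct CanonicalRange {
    start: u32,
    offsets: HashMap<[u8; 32], u32>,
}

impl CanonicalRange {
    fn new(start: u32, hashes: Vec<[u8; 32]>) -> Self {
        let offsets = hashes
            .into_iter()
            .enumerate()
            .map(|(offset, hash)| (hash, offset as u32))
            .collect();
        Self { start, offsets }
    }

    /// `Some(offset)` iff `hash` is canonical within the range; an
    /// orphan or out-of-range block short-circuits to `None` without
    /// any RPC call.
    fn offset_of(&self, hash: &[u8; 32]) -> Option<u32> {
        self.offsets.get(hash).copied()
    }

    fn height_of(&self, offset: u32) -> u32 {
        self.start + offset
    }
}

fn main() {
    let hashes = vec![[1u8; 32], [2u8; 32], [3u8; 32]];
    let range = CanonicalRange::new(840_000, hashes);
    assert_eq!(range.offset_of(&[2u8; 32]), Some(1));
    assert_eq!(range.height_of(1), 840_001);
    assert_eq!(range.offset_of(&[9u8; 32]), None); // orphan
}
```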

View File

@@ -0,0 +1,470 @@
//! The actual pipeline turning a blk-file scan into an ordered
//! `ReadBlock` stream. [`spawn`] picks between two strategies:
//!
//! * **[`pipeline_forward`]** — one reader thread walks blk files in
//! order, peeks each block's header against the pre-fetched
//! `CanonicalRange`, and ships canonical hits over an mpmc channel
//! to a scoped parser pool of `parser_threads` workers, which decode
//! bodies in parallel and serialise emission through a shared
//! [`ReorderState`] mutex. Used when the range is larger than
//! `TAIL_THRESHOLD`.
//! * **[`pipeline_tail`]** — single-threaded reverse scan of the
//! newest blk files in 5 MB chunks, buffering every canonical match
//! in offset-indexed slots and then emitting through [`ReorderState`]
//! in the same order. Used for `canonical.len() <= TAIL_THRESHOLD`,
//! where the channel + lock overhead of the forward pipeline would
//! dominate.
//!
//! Both pipelines route emission through [`ReorderState`], which
//! verifies `block.header.prev_blockhash` against the previously
//! emitted block's hash and aborts cleanly if the canonical-hash batch
//! that produced the stream was stitched across a mid-batch reorg.
//!
//! Canonical blocks can also arrive out of order across blk files
//! (bitcoind doesn't write in strict chain order during initial sync,
//! headers-first body fetch, or reindex), so the reorder buffer is
//! required even at `parser_threads = 1`.
use std::{
fs::{self, File},
io::{Read, Seek, SeekFrom},
ops::ControlFlow,
sync::atomic::{AtomicBool, Ordering},
thread,
};
use brk_error::{Error, Result};
use brk_types::{BlkMetadata, BlockHash, Height, ReadBlock};
use crossbeam::channel::{Receiver, Sender, bounded};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use tracing::{error, warn};
use crate::{
BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex,
canonical::CanonicalRange,
parse::{parse_canonical_body, peek_canonical_offset},
scan::scan_bytes,
};
const CHANNEL_CAPACITY: usize = 50;
const TAIL_CHUNK: usize = 5 * 1024 * 1024;
/// Up to this many canonical blocks → tail pipeline. Beyond → forward.
const TAIL_THRESHOLD: usize = 10;
/// Default parser-thread count for [`ReaderInner::after`]. The indexer
/// is CPU-bound on the consumer side, so 1 parser + 1 reader (= 2
/// threads total) leaves the rest of the cores for the indexer. Bench
/// tools that drain the channel cheaply can override via
/// [`ReaderInner::after_with`].
pub(crate) const DEFAULT_PARSER_THREADS: usize = 1;
// ─────────────────────────────────────────────────────────────────────────────
// Shared pipeline entry — called by `Reader::after_with` and `Reader::range_with`.
// ─────────────────────────────────────────────────────────────────────────────
/// Spawns the reader worker and (for non-tail ranges) a scoped parser
/// pool, and returns the consumer receiver. Shared backend for
/// `after_with` and `range_with`.
pub(crate) fn spawn(
reader: &ReaderInner,
canonical: CanonicalRange,
parser_threads: usize,
) -> Result<Receiver<ReadBlock>> {
let parser_threads = parser_threads.max(1);
if canonical.is_empty() {
return Ok(bounded(0).1);
}
let paths = BlkIndexToBlkPath::scan(reader.blocks_dir());
*reader.blk_index_to_blk_path.write() = paths.clone();
let (send, recv) = bounded(CHANNEL_CAPACITY);
let xor_bytes = reader.xor_bytes();
let use_tail = canonical.len() <= TAIL_THRESHOLD;
let first_blk_index = if use_tail {
0
} else {
reader
.find_start_blk_index(Some(canonical.start), &paths, xor_bytes)
.unwrap_or_default()
};
thread::spawn(move || {
let result = if use_tail {
pipeline_tail(&paths, xor_bytes, &canonical, &send)
} else {
pipeline_forward(
&paths,
first_blk_index,
xor_bytes,
&canonical,
&send,
parser_threads,
)
};
if let Err(e) = result {
error!("Reader canonical pipeline failed: {e}");
}
});
Ok(recv)
}
// ─────────────────────────────────────────────────────────────────────────────
// Forward pipeline — 1 reader + N parsers + shared in-order emission.
// ─────────────────────────────────────────────────────────────────────────────
/// A raw block the reader has already confirmed is on the canonical
/// chain, shipped to the parser pool for full decoding.
struct ScannedBlock {
metadata: BlkMetadata,
bytes: Vec<u8>,
xor_state: XORIndex,
canonical_offset: u32,
}
/// In-order emission buffer shared between the parser threads. Access
/// is serialised through a `parking_lot::Mutex`; at `parser_threads = 1`
/// the lock is always uncontended.
///
/// Also enforces **chain continuity**: before emitting each block it
/// checks that `block.header.prev_blockhash` matches the previously-
/// emitted block's hash. A mismatch means the canonical-hash batch
/// that produced this stream was stitched across a mid-batch reorg,
/// so we stop emitting cleanly and let the caller retry.
struct ReorderState {
next_offset: u32,
/// Ahead-of-line matches keyed by canonical offset; drained
/// contiguously each time `next_offset` advances. Bounded in
/// practice by parser-thread scheduling jitter.
pending: FxHashMap<u32, ReadBlock>,
send_to_consumer: Sender<ReadBlock>,
/// Hash of the last block successfully emitted, used to verify
/// continuity with the next one. `None` before the first emit.
last_emitted_hash: Option<BlockHash>,
/// Flipped when a continuity check fails.
chain_broken: bool,
}
impl ReorderState {
fn new(send_to_consumer: Sender<ReadBlock>) -> Self {
Self {
next_offset: 0,
pending: FxHashMap::default(),
send_to_consumer,
last_emitted_hash: None,
chain_broken: false,
}
}
/// Accepts a parsed canonical block; emits it and drains any
/// contiguous pending matches. Returns `false` once the pipeline
/// should stop — either the consumer dropped the receiver or a
/// chain-continuity check failed. Completion (all blocks emitted)
/// is checked by the caller via `next_offset`.
fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool {
use std::cmp::Ordering::*;
match offset.cmp(&self.next_offset) {
Equal => {
if !self.send_in_order(block) {
return false;
}
while let Some(next) = self.pending.remove(&self.next_offset) {
if !self.send_in_order(next) {
return false;
}
}
true
}
Greater => {
self.pending.insert(offset, block);
true
}
// Unreachable in practice: each canonical hash appears at
// exactly one offset and each block is parsed once.
Less => true,
}
}
/// Verifies `block.prev_blockhash` against the last emitted hash,
/// sends the block, and bumps `next_offset`. Returns `false` on
/// continuity failure or consumer drop.
fn send_in_order(&mut self, block: ReadBlock) -> bool {
if let Some(last) = &self.last_emitted_hash {
let prev = BlockHash::from(block.header.prev_blockhash);
if prev != *last {
warn!(
"canonical chain broken at offset {}: expected prev={} got {}",
self.next_offset, last, prev,
);
self.chain_broken = true;
return false;
}
}
let hash = block.hash().clone();
if self.send_to_consumer.send(block).is_err() {
return false;
}
self.last_emitted_hash = Some(hash);
self.next_offset += 1;
true
}
}
fn pipeline_forward(
paths: &BlkIndexToBlkPath,
first_blk_index: u16,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
send: &Sender<ReadBlock>,
parser_threads: usize,
) -> Result<()> {
let (parser_send, parser_recv) = bounded::<ScannedBlock>(CHANNEL_CAPACITY);
let reorder = Mutex::new(ReorderState::new(send.clone()));
let done = AtomicBool::new(false);
thread::scope(|scope| -> Result<()> {
for _ in 0..parser_threads {
let parser_recv = parser_recv.clone();
scope.spawn(|| parser_loop(parser_recv, &reorder, &done, canonical, xor_bytes));
}
// Every parser owns its own clone; ours would otherwise keep
// the channel "alive" and leak a dangling receiver.
drop(parser_recv);
let read_result = read_and_dispatch(
paths,
first_blk_index,
xor_bytes,
canonical,
&parser_send,
&done,
);
// Signal end-of-input to the parsers so they exit their `for`
// loops and the scope can join them.
drop(parser_send);
read_result
})?;
let state = reorder.lock();
if state.chain_broken {
return Err(Error::Internal(
"forward pipeline: canonical batch stitched across a reorg",
));
}
let pipeline_cancelled = done.load(Ordering::Relaxed);
let emitted = state.next_offset as usize;
if !pipeline_cancelled && emitted < canonical.len() {
return Err(Error::Internal(
"forward pipeline: blk files missing canonical blocks",
));
}
Ok(())
}
/// Full-body parse + in-order emit loop run by every scoped parser
/// worker in `pipeline_forward`. Drains `parser_recv` to exhaustion.
fn parser_loop(
parser_recv: Receiver<ScannedBlock>,
reorder: &Mutex<ReorderState>,
done: &AtomicBool,
canonical: &CanonicalRange,
xor_bytes: XORBytes,
) {
for ScannedBlock { metadata, bytes, xor_state, canonical_offset } in parser_recv {
if done.load(Ordering::Relaxed) {
continue;
}
let height = Height::from(*canonical.start + canonical_offset);
let block = match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height) {
Ok(block) => block,
Err(e) => {
warn!("parse_canonical_body failed: {e}");
continue;
}
};
let pipeline_finished = {
let mut state = reorder.lock();
!state.try_emit(canonical_offset, block)
|| state.next_offset as usize >= canonical.len()
};
if pipeline_finished {
done.store(true, Ordering::Relaxed);
}
}
}
/// Walk blk files from `first_blk_index`, scan each one, and ship
/// canonical blocks to the parser pool. Non-canonical blocks are
/// rejected via `peek_canonical_offset` *before* being cloned — the
cheap filter is what lets a sparse catch-up avoid allocating for the
/// ~99% of blocks outside the window.
fn read_and_dispatch(
paths: &BlkIndexToBlkPath,
first_blk_index: u16,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
parser_send: &Sender<ScannedBlock>,
done: &AtomicBool,
) -> Result<()> {
for (&blk_index, blk_path) in paths.range(first_blk_index..) {
if done.load(Ordering::Relaxed) {
return Ok(());
}
let mut bytes = fs::read(blk_path).map_err(|e| {
error!("Failed to read blk file {}: {e}", blk_path.display());
Error::Internal("Failed to read blk file")
})?;
let result = scan_bytes(
&mut bytes,
blk_index,
0,
xor_bytes,
|metadata, block_bytes, xor_state| {
if done.load(Ordering::Relaxed) {
return ControlFlow::Break(());
}
let Some(canonical_offset) =
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
else {
return ControlFlow::Continue(());
};
let scanned = ScannedBlock {
metadata,
bytes: block_bytes.to_vec(),
xor_state,
canonical_offset,
};
if parser_send.send(scanned).is_err() {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
},
);
if result.interrupted {
return Ok(());
}
}
Ok(())
}
// ─────────────────────────────────────────────────────────────────────────────
// Tail pipeline — reverse-scan the newest blk files in 5 MB chunks until
// every canonical hash has been matched, then emit them forward.
// ─────────────────────────────────────────────────────────────────────────────
/// Single-threaded tail-range pipeline for small `canonical.len()`.
/// Walks blk files in reverse-index order, reads each one in 5 MB
/// chunks from tail to head, and stuffs every canonical match into an
/// offset-indexed `slots` vec. Once every canonical block is matched,
/// emits them in order through [`ReorderState`] (which doubles as the
/// shared continuity checker). Bails on missing blocks or a chain
/// break just like [`pipeline_forward`].
fn pipeline_tail(
paths: &BlkIndexToBlkPath,
xor_bytes: XORBytes,
canonical: &CanonicalRange,
send: &Sender<ReadBlock>,
) -> Result<()> {
let mut slots: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
let mut remaining = canonical.len();
// Carries the bytes before a chunk's first magic into the next
// (earlier) chunk so blocks straddling the boundary survive.
let mut spillover: Vec<u8> = Vec::new();
'files: for (&blk_index, path) in paths.iter().rev() {
let mut file = File::open(path).map_err(|_| Error::Internal("Failed to open blk file"))?;
let file_len = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
if file_len == 0 {
continue;
}
let mut read_end = file_len;
spillover.clear();
while read_end > 0 && remaining > 0 {
let read_start = read_end.saturating_sub(TAIL_CHUNK);
let chunk_len = read_end - read_start;
read_end = read_start;
file.seek(SeekFrom::Start(read_start as u64))
.map_err(|_| Error::Internal("Failed to seek blk file"))?;
let mut buf = vec![0u8; chunk_len + spillover.len()];
file.read_exact(&mut buf[..chunk_len])
.map_err(|_| Error::Internal("Failed to read blk chunk"))?;
buf[chunk_len..].copy_from_slice(&spillover);
spillover.clear();
let result = scan_bytes(
&mut buf,
blk_index,
read_start,
xor_bytes,
|metadata, block_bytes, xor_state| {
let Some(offset) =
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
else {
return ControlFlow::Continue(());
};
if slots[offset as usize].is_some() {
return ControlFlow::Continue(());
}
let height = Height::from(*canonical.start + offset);
match parse_canonical_body(
block_bytes.to_vec(),
metadata,
xor_state,
xor_bytes,
height,
) {
Ok(block) => {
slots[offset as usize] = Some(block);
remaining -= 1;
}
Err(e) => warn!("parse_canonical_body failed in tail pipeline: {e}"),
}
if remaining == 0 {
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
},
);
if remaining == 0 {
break 'files;
}
if read_start > 0 {
spillover.extend_from_slice(&buf[..result.first_magic.unwrap_or(buf.len())]);
}
}
}
if remaining > 0 {
return Err(Error::Internal(
"tail pipeline: blk files missing canonical blocks",
));
}
// Emit in canonical order via the same `ReorderState` the forward
// pipeline uses, which verifies `prev_blockhash` continuity between
// adjacent blocks as a side effect of `try_emit`.
let mut reorder = ReorderState::new(send.clone());
for (offset, block) in slots.into_iter().flatten().enumerate() {
if !reorder.try_emit(offset as u32, block) {
break;
}
}
if reorder.chain_broken {
return Err(Error::Internal(
"tail pipeline: canonical batch stitched across a reorg",
));
}
Ok(())
}
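The offset-keyed reordering both pipelines route through can be isolated into a toy, stripped of the channel, the mutex, and the continuity check (hypothetical names):

```rust
use std::collections::HashMap;

/// Toy version of the reorder buffer: items arrive keyed by canonical
/// offset in arbitrary order; `push` emits the contiguous run starting
/// at `next` and parks everything ahead of line in `pending`.
struct Reorder<T> {
    next: u32,
    pending: HashMap<u32, T>,
}

impl<T> Reorder<T> {
    fn new() -> Self {
        Self { next: 0, pending: HashMap::new() }
    }

    /// Returns whatever became emittable, already in order.
    fn push(&mut self, offset: u32, item: T) -> Vec<T> {
        self.pending.insert(offset, item);
        let mut out = Vec::new();
        while let Some(item) = self.pending.remove(&self.next) {
            out.push(item);
            self.next += 1;
        }
        out
    }
}

fn main() {
    let mut reorder = Reorder::new();
    assert!(reorder.push(2, "c").is_empty()); // ahead of line: parked
    assert_eq!(reorder.push(0, "a"), vec!["a"]);
    assert_eq!(reorder.push(1, "b"), vec!["b", "c"]); // drains the park
}
```

The real `ReorderState` additionally checks `prev_blockhash` continuity on every emit, which is why a mid-batch reorg aborts the stream instead of silently stitching two chains together.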

View File

@@ -184,8 +184,6 @@ impl ClientInner {
/// range `start..=end`. Internally splits into JSON-RPC batches of
/// `BATCH_CHUNK` requests so a 1M-block reindex doesn't try to push
/// a 50 MB request body or hold every response in memory at once.
/// Each chunk is one HTTP round-trip — still drops the per-call
/// overhead that dominates a sequential `get_block_hash` loop.
///
/// Returns hashes in canonical order (`start`, `start+1`, …, `end`).
pub fn get_block_hashes_range(
@@ -214,9 +212,6 @@ impl ClientInner {
end: u64,
out: &mut Vec<bitcoin::BlockHash>,
) -> Result<()> {
// Build raw param strings up front so each `Request` can borrow
// them; `corepc_jsonrpc::Client::build_request` takes a borrowed
// `&RawValue`.
let params: Vec<Box<RawValue>> = (start..=end)
.map(|h| {
RawValue::from_string(format!("[{h}]")).map_err(|e| Error::Parse(e.to_string()))