mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: big snapshot part 2
This commit is contained in:
@@ -2,12 +2,15 @@
|
||||
//! versus `Reader::after_canonical` (1 reader + N parser threads + canonical
|
||||
//! hash filter).
|
||||
//!
|
||||
//! Two phases:
|
||||
//! Three phases:
|
||||
//!
|
||||
//! 1. **Tail scenarios** — pick an anchor `N` blocks below the chain tip
|
||||
//! and run each implementation `REPEATS` times. Exercises the tail
|
||||
//! (≤10) and forward (>10) code paths under realistic catchup ranges.
|
||||
//! 2. **Full reindex** — anchor=`None` (genesis to tip), one run per
|
||||
//! 2. **Partial reindex** — anchor=`None` but stop after
|
||||
//! `PARTIAL_LIMIT` blocks. Exercises the early-chain blk files
|
||||
//! where blocks are small and dense-parsing isn't the bottleneck.
|
||||
//! 3. **Full reindex** — anchor=`None` (genesis to tip), one run per
|
||||
//! config. Exercises every blk file once and shows steady-state
|
||||
//! throughput on the densest possible workload.
|
||||
//!
|
||||
@@ -25,6 +28,7 @@ use brk_types::{BlockHash, Height, ReadBlock};
|
||||
|
||||
const SCENARIOS: &[usize] = &[5, 10, 100, 1_000, 10_000];
|
||||
const REPEATS: usize = 3;
|
||||
const PARTIAL_LIMIT: usize = 400_000;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let bitcoin_dir = Client::default_bitcoin_path();
|
||||
@@ -68,6 +72,26 @@ fn main() -> Result<()> {
|
||||
println!();
|
||||
}
|
||||
|
||||
println!();
|
||||
println!("Partial reindex (genesis → {PARTIAL_LIMIT} blocks), one run per config:");
|
||||
println!(
|
||||
"{:>10} {:>16} {:>12} {:>10}",
|
||||
"blocks", "impl", "elapsed", "blk/s"
|
||||
);
|
||||
println!("{}", "-".repeat(54));
|
||||
|
||||
let after_partial = run_bounded(PARTIAL_LIMIT, || reader.after(None))?;
|
||||
print_full_row("after", &after_partial);
|
||||
let p1_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical(None))?;
|
||||
print_full_row("canonical[p=1]", &p1_partial);
|
||||
sanity_check_full(&after_partial, &p1_partial);
|
||||
let p4_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical_with(None, 4))?;
|
||||
print_full_row("canonical[p=4]", &p4_partial);
|
||||
sanity_check_full(&after_partial, &p4_partial);
|
||||
let p16_partial = run_bounded(PARTIAL_LIMIT, || reader.after_canonical_with(None, 16))?;
|
||||
print_full_row("canonical[p=16]", &p16_partial);
|
||||
sanity_check_full(&after_partial, &p16_partial);
|
||||
|
||||
println!();
|
||||
println!("Full reindex (genesis → tip), one run per config:");
|
||||
println!(
|
||||
@@ -176,6 +200,27 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Runs the pipeline starting from genesis but stops consuming once
|
||||
/// `limit` blocks have been received. Dropping the receiver then closes
|
||||
/// the channel, which unblocks and unwinds the reader's spawned worker.
|
||||
fn run_bounded<F>(limit: usize, mut f: F) -> Result<FullRun>
|
||||
where
|
||||
F: FnMut() -> Result<Receiver<ReadBlock>>,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let recv = f()?;
|
||||
let mut count = 0;
|
||||
for block in recv.iter().take(limit) {
|
||||
std::hint::black_box(block.height());
|
||||
count += 1;
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
// Explicit drop so the reader worker sees the channel close before
|
||||
// the next bench config spins up another one.
|
||||
drop(recv);
|
||||
Ok(FullRun { elapsed, count })
|
||||
}
|
||||
|
||||
fn print_full_row(label: &str, run: &FullRun) {
|
||||
let blk_per_s = if run.elapsed.is_zero() {
|
||||
0.0
|
||||
|
||||
@@ -1,32 +1,44 @@
|
||||
//! Canonical-hash pipeline for `Reader::after`.
|
||||
//!
|
||||
//! Three pieces, each with one job:
|
||||
//! Bitcoin Core stores accepted blocks in append-only `blk*.dat` files
|
||||
//! under the data dir, XOR-encoded with a per-datadir key. A "blk
|
||||
//! file" contains every block the node ever accepted — including
|
||||
//! blocks that were later orphaned by a reorg — in acceptance order,
|
||||
//! not height order. This module turns "give me every block after
|
||||
//! `hash` up to the tip" into an ordered `ReadBlock` stream drawn from
|
||||
//! those files while skipping orphans.
|
||||
//!
|
||||
//! * **`CanonicalRange::walk`** is the only place bitcoind is consulted
|
||||
//! about the main chain. It batch-fetches every canonical hash in the
|
||||
//! target window once, up front, via `getblockhash` JSON-RPC batching.
|
||||
//! * **`parse_canonical_block`** is a pure function of raw blk bytes.
|
||||
//! It XOR-decodes only the 80-byte header, looks the hash up in the
|
||||
//! pre-fetched `CanonicalRange`, and short-circuits orphans before
|
||||
//! touching the (expensive) transaction body. No RPC, no `confirmations`
|
||||
//! filter, no chain logic.
|
||||
//! * **`pipeline_forward` / `pipeline_tail`** wire the scan loop to a
|
||||
//! parser pool. The forward pipeline runs 1 reader + N parser threads
|
||||
//! (default `N = 1`, configurable via `after_canonical_with`); the
|
||||
//! tail pipeline (≤10 blocks) stays inline on a single thread because
|
||||
//! channel/lock overhead would dominate.
|
||||
//! How it works:
|
||||
//!
|
||||
//! Coexists with the original `read`/`read_rev`/`after` so the two can be
|
||||
//! A/B-tested from the indexer.
|
||||
//! 1. [`CanonicalRange::walk`] asks bitcoind once, up front, for the
|
||||
//! canonical block hash at every height in the target window. This
|
||||
//! is one batched JSON-RPC request — no per-block RPC overhead.
|
||||
//! 2. The reader walks blk files in order and scans each one for the
|
||||
//! block magic prefix. For every block found,
|
||||
//! [`peek_canonical_offset`] hashes the 80-byte header and looks
|
||||
//! the hash up in the canonical map. Orphans short-circuit here,
|
||||
//! before any bytes are cloned.
|
||||
//! 3. Canonical hits are cloned into [`ScannedBlock`]s and shipped
|
||||
//! over a channel to a small pool of parser workers, which run
|
||||
//! [`parse_canonical_body`] to fully decode the block.
|
||||
//! 4. Parsers serialise their output through [`ReorderState`] so that
|
||||
//! the consumer receives blocks in canonical-height order even if
|
||||
//! the blk files emitted them out of order.
|
||||
//!
|
||||
//! Ranges of at most `TAIL_THRESHOLD` blocks take a specialised
|
||||
//! [`pipeline_tail`] path that reverse-scans the newest blk files in
|
||||
//! 5 MB chunks — cheaper than walking forward from genesis for a
|
||||
//! handful of tip blocks.
|
||||
//!
|
||||
//! Public entry points: [`ReaderInner::after_canonical`] and
|
||||
//! [`ReaderInner::after_canonical_with`]. Coexists with the original
|
||||
//! `read` / `read_rev` / `after` so the two can be A/B-tested.
|
||||
|
||||
use std::{
|
||||
fs::{self, File},
|
||||
io::{Cursor, Read, Seek, SeekFrom},
|
||||
ops::ControlFlow,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
thread,
|
||||
};
|
||||
|
||||
@@ -39,10 +51,7 @@ use parking_lot::Mutex;
|
||||
use rustc_hash::FxHashMap;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex,
|
||||
scan::{ScanResult, scan_bytes},
|
||||
};
|
||||
use crate::{BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex, scan::scan_bytes};
|
||||
|
||||
const BOUND_CAP: usize = 50;
|
||||
const TAIL_CHUNK: usize = 5 * 1024 * 1024;
|
||||
@@ -58,64 +67,50 @@ const DEFAULT_PARSER_THREADS: usize = 1;
|
||||
// CanonicalRange — the only RPC-aware piece in this file.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Forward-ordered canonical hashes for `start..=end`, resolved once up front.
|
||||
///
|
||||
/// `hashes[i]` is the canonical block hash at height `start + i`.
|
||||
/// `by_prefix` maps the 8-byte `BlockHashPrefix` of every canonical hash to
|
||||
/// its offset — same prefix-keyed scheme brk already uses in `stores`.
|
||||
/// Lookups verify the full hash via `hashes[offset]`, neutralising the
|
||||
/// (astronomically small) prefix collision risk at zero extra cost.
|
||||
/// Every canonical block hash in a contiguous height window, resolved
|
||||
/// from bitcoind once up front. `hashes[i]` is the canonical hash at
|
||||
/// height `start + i`. Lookups by hash go through `by_prefix` (8-byte
|
||||
/// key, same scheme as `brk_store`) and verify the full hash on hit.
|
||||
pub struct CanonicalRange {
|
||||
pub start: Height,
|
||||
pub end: Height,
|
||||
hashes: Vec<BlockHash>,
|
||||
by_prefix: FxHashMap<BlockHashPrefix, u32>,
|
||||
}
|
||||
|
||||
impl CanonicalRange {
|
||||
/// Resolves canonical hashes for every height strictly after `anchor`
|
||||
/// up to `tip` inclusive. If `anchor` is `None`, starts at genesis.
|
||||
///
|
||||
/// Uses `get_block_hash(h)` which is a deterministic height → canonical
|
||||
/// hash lookup — no race window against in-progress reorgs.
|
||||
/// up to `tip` inclusive. `anchor = None` starts at genesis.
|
||||
pub fn walk(client: &Client, anchor: Option<BlockHash>, tip: Height) -> Result<Self> {
|
||||
let start = match anchor {
|
||||
Some(hash) => {
|
||||
let info = client.get_block_header_info(&hash)?;
|
||||
Height::from(info.height + 1)
|
||||
}
|
||||
Some(hash) => Height::from(client.get_block_header_info(&hash)?.height + 1),
|
||||
None => Height::ZERO,
|
||||
};
|
||||
|
||||
if start > tip {
|
||||
return Ok(Self::empty(start));
|
||||
return Ok(Self {
|
||||
start,
|
||||
hashes: Vec::new(),
|
||||
by_prefix: FxHashMap::default(),
|
||||
});
|
||||
}
|
||||
|
||||
let len = (*tip - *start + 1) as usize;
|
||||
let hashes = client.get_block_hashes_range(*start, *tip)?;
|
||||
|
||||
let mut by_prefix = FxHashMap::with_capacity_and_hasher(len, Default::default());
|
||||
for (offset, hash) in hashes.iter().enumerate() {
|
||||
by_prefix.insert(BlockHashPrefix::from(hash), offset as u32);
|
||||
}
|
||||
let mut by_prefix =
|
||||
FxHashMap::with_capacity_and_hasher(hashes.len(), Default::default());
|
||||
by_prefix.extend(
|
||||
hashes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, h)| (BlockHashPrefix::from(h), i as u32)),
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
start,
|
||||
end: tip,
|
||||
hashes,
|
||||
by_prefix,
|
||||
})
|
||||
}
|
||||
|
||||
fn empty(start: Height) -> Self {
|
||||
Self {
|
||||
start,
|
||||
end: start,
|
||||
hashes: Vec::new(),
|
||||
by_prefix: FxHashMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.hashes.len()
|
||||
}
|
||||
@@ -124,10 +119,10 @@ impl CanonicalRange {
|
||||
self.hashes.is_empty()
|
||||
}
|
||||
|
||||
/// Returns the offset-from-start of `hash` iff it matches a canonical
|
||||
/// block in this range. A prefix hit is verified against the stored
|
||||
/// full hash to rule out the (vanishing) chance of prefix collisions
|
||||
/// from unrelated orphans in blk files.
|
||||
/// Returns the offset-from-`start` of `hash` iff it matches the
|
||||
/// canonical chain in this range. A prefix hit is verified against
|
||||
/// the full hash so prefix collisions from orphaned blocks are
|
||||
/// rejected.
|
||||
#[inline]
|
||||
fn offset_of(&self, hash: &BlockHash) -> Option<u32> {
|
||||
let offset = *self.by_prefix.get(&BlockHashPrefix::from(hash))?;
|
||||
@@ -136,64 +131,63 @@ impl CanonicalRange {
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Pure block parser — no client, no confirmations, no Ok(None) on RPC errors.
|
||||
// Block parsing — cheap header peek first, full body parse only on a hit.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
const HEADER_LEN: usize = 80;
|
||||
|
||||
/// XOR-decode just the 80-byte header, compute the block hash, look it
|
||||
/// up in `canonical`, and only proceed to parse the body and transactions
|
||||
/// when the block is on the canonical chain. Returning early before the
|
||||
/// body decode is what lets a single parser thread keep up with the
|
||||
/// 4-thread `read()` pool on sparse ranges.
|
||||
///
|
||||
/// Returns `Ok(None)` for orphans / out-of-range blocks. Deterministic —
|
||||
/// never touches RPC.
|
||||
fn parse_canonical_block(
|
||||
mut bytes: Vec<u8>,
|
||||
metadata: BlkMetadata,
|
||||
mut xor_i: XORIndex,
|
||||
/// Returns the canonical offset of `bytes` if its header hashes to a
|
||||
/// known canonical block, otherwise `None`. Does not allocate and does
|
||||
/// not mutate `bytes`: the header is copied onto a stack buffer and
|
||||
/// XOR-decoded there so an orphan short-circuits cleanly and a
|
||||
/// canonical hit can still be cloned out intact.
|
||||
fn peek_canonical_offset(
|
||||
bytes: &[u8],
|
||||
mut xor_state: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
) -> Result<Option<(u32, ReadBlock)>> {
|
||||
) -> Option<u32> {
|
||||
if bytes.len() < HEADER_LEN {
|
||||
return Err(Error::Internal("Block bytes shorter than header"));
|
||||
return None;
|
||||
}
|
||||
let mut header_buf = [0u8; HEADER_LEN];
|
||||
header_buf.copy_from_slice(&bytes[..HEADER_LEN]);
|
||||
xor_state.bytes(&mut header_buf, xor_bytes);
|
||||
let header = Header::consensus_decode(&mut &header_buf[..]).ok()?;
|
||||
canonical.offset_of(&BlockHash::from(header.block_hash()))
|
||||
}
|
||||
|
||||
// Decode just the header and look the hash up before paying for the
|
||||
// body. `xor_i` advances `HEADER_LEN` here so it stays in lock-step
|
||||
// with the decoded prefix.
|
||||
xor_i.bytes(&mut bytes[..HEADER_LEN], xor_bytes);
|
||||
let header = Header::consensus_decode(&mut &bytes[..HEADER_LEN])?;
|
||||
let bitcoin_hash = header.block_hash();
|
||||
|
||||
let Some(offset) = canonical.offset_of(&BlockHash::from(bitcoin_hash)) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Canonical: XOR-decode the body and parse transactions.
|
||||
xor_i.bytes(&mut bytes[HEADER_LEN..], xor_bytes);
|
||||
/// Full XOR-decode + parse for a block that has already been confirmed
|
||||
/// canonical by `peek_canonical_offset`. Takes owned `bytes` so it can
|
||||
/// mutate them in place and hand them to the resulting `ReadBlock`.
|
||||
fn parse_canonical_body(
|
||||
mut bytes: Vec<u8>,
|
||||
metadata: BlkMetadata,
|
||||
mut xor_state: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
height: Height,
|
||||
) -> Result<ReadBlock> {
|
||||
xor_state.bytes(&mut bytes, xor_bytes);
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
cursor.set_position(HEADER_LEN as u64);
|
||||
let header = Header::consensus_decode(&mut cursor)?;
|
||||
let bitcoin_hash = header.block_hash();
|
||||
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
|
||||
let mut txdata = Vec::with_capacity(tx_count);
|
||||
let mut tx_metadata = Vec::with_capacity(tx_count);
|
||||
let mut tx_offsets = Vec::with_capacity(tx_count);
|
||||
for _ in 0..tx_count {
|
||||
let off = cursor.position() as u32;
|
||||
tx_offsets.push(off);
|
||||
let position = metadata.position() + off;
|
||||
let tx_start = cursor.position() as u32;
|
||||
tx_offsets.push(tx_start);
|
||||
let tx = Transaction::consensus_decode(&mut cursor)?;
|
||||
let tx_len = cursor.position() as u32 - tx_start;
|
||||
txdata.push(tx);
|
||||
let len = cursor.position() as u32 - off;
|
||||
tx_metadata.push(BlkMetadata::new(position, len));
|
||||
tx_metadata.push(BlkMetadata::new(metadata.position() + tx_start, tx_len));
|
||||
}
|
||||
|
||||
let raw_bytes = cursor.into_inner();
|
||||
let height = Height::from(*canonical.start + offset);
|
||||
let mut block = Block::from((height, bitcoin_hash, bitcoin::Block { header, txdata }));
|
||||
block.set_raw_data(raw_bytes, tx_offsets);
|
||||
Ok(Some((offset, ReadBlock::from((block, metadata, tx_metadata)))))
|
||||
Ok(ReadBlock::from((block, metadata, tx_metadata)))
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -201,21 +195,19 @@ fn parse_canonical_block(
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
impl ReaderInner {
|
||||
/// Stream every canonical block strictly after `hash` (or from
|
||||
/// genesis if `None`) up to the current chain tip, in canonical
|
||||
/// order, via the canonical-hash pipeline.
|
||||
///
|
||||
/// Uses the default parser-thread count (`1`); see
|
||||
/// `after_canonical_with` to override.
|
||||
/// Streams every canonical block strictly after `hash` (or from
|
||||
/// genesis when `None`) up to the current chain tip, in canonical
|
||||
/// order. Uses the default parser-thread count; see
|
||||
/// [`after_canonical_with`](Self::after_canonical_with) to override.
|
||||
pub fn after_canonical(&self, hash: Option<BlockHash>) -> Result<Receiver<ReadBlock>> {
|
||||
self.after_canonical_with(hash, DEFAULT_PARSER_THREADS)
|
||||
}
|
||||
|
||||
/// Same as `after_canonical` but with a configurable number of parser
|
||||
/// threads. `parser_threads = 1` is the minimal-thread default
|
||||
/// (1 reader + 1 parser, uncontended mutex hot path). Higher values
|
||||
/// trade extra cores for throughput on dense ranges where the parser
|
||||
/// is the bottleneck.
|
||||
/// Like [`after_canonical`](Self::after_canonical) but with a
|
||||
/// configurable number of parser threads. `parser_threads = 1` is
|
||||
/// the minimal-thread default (1 reader + 1 parser, uncontended
|
||||
/// mutex). Higher values trade extra cores for throughput on dense
|
||||
/// ranges where the parser is the bottleneck.
|
||||
pub fn after_canonical_with(
|
||||
&self,
|
||||
hash: Option<BlockHash>,
|
||||
@@ -223,43 +215,42 @@ impl ReaderInner {
|
||||
) -> Result<Receiver<ReadBlock>> {
|
||||
let parser_threads = parser_threads.max(1);
|
||||
let tip = self.client.get_last_height()?;
|
||||
let canonical = Arc::new(CanonicalRange::walk(&self.client, hash, tip)?);
|
||||
let canonical = CanonicalRange::walk(&self.client, hash, tip)?;
|
||||
|
||||
if canonical.is_empty() {
|
||||
return Ok(bounded(0).1);
|
||||
}
|
||||
|
||||
// Refresh the blk path cache once, on the caller's thread, so the
|
||||
// worker thread below has a stable view.
|
||||
let paths = BlkIndexToBlkPath::scan(&self.blocks_dir);
|
||||
*self.blk_index_to_blk_path.write() = paths.clone();
|
||||
|
||||
let (send, recv) = bounded(BOUND_CAP);
|
||||
let xor_bytes = self.xor_bytes;
|
||||
|
||||
if canonical.len() <= TAIL_THRESHOLD {
|
||||
thread::spawn(move || {
|
||||
if let Err(e) = pipeline_tail(&paths, xor_bytes, &canonical, &send) {
|
||||
error!("after_canonical tail pipeline failed: {e}");
|
||||
}
|
||||
});
|
||||
let use_tail = canonical.len() <= TAIL_THRESHOLD;
|
||||
let first_blk_index = if use_tail {
|
||||
0
|
||||
} else {
|
||||
let first_blk_index = self
|
||||
.find_start_blk_index(Some(canonical.start), &paths, xor_bytes)
|
||||
.unwrap_or_default();
|
||||
thread::spawn(move || {
|
||||
if let Err(e) = pipeline_forward(
|
||||
self.find_start_blk_index(Some(canonical.start), &paths, xor_bytes)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
thread::spawn(move || {
|
||||
let result = if use_tail {
|
||||
pipeline_tail(&paths, xor_bytes, &canonical, &send)
|
||||
} else {
|
||||
pipeline_forward(
|
||||
&paths,
|
||||
first_blk_index,
|
||||
xor_bytes,
|
||||
canonical,
|
||||
&canonical,
|
||||
&send,
|
||||
parser_threads,
|
||||
) {
|
||||
error!("after_canonical forward pipeline failed: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
)
|
||||
};
|
||||
if let Err(e) = result {
|
||||
error!("after_canonical pipeline failed: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(recv)
|
||||
}
|
||||
@@ -269,36 +260,40 @@ impl ReaderInner {
|
||||
// Forward pipeline — 1 reader + N parsers + shared in-order emission.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Item shipped from the reader thread to the parser pool: raw block
|
||||
/// bytes, blk-file metadata, and the XOR state at the byte the bytes
|
||||
/// start at.
|
||||
type ScannedItem = (BlkMetadata, Vec<u8>, XORIndex);
|
||||
/// A raw block the reader has already confirmed is on the canonical
|
||||
/// chain, shipped to the parser pool for full decoding.
///
/// Built in `read_and_dispatch` only after `peek_canonical_offset`
/// returned `Some`, and destructured in `parser_loop`, which passes the
/// fields on to `parse_canonical_body`.
|
||||
struct ScannedBlock {
|
||||
/// Blk-file metadata handed over by the scan callback; its
/// `position()` is the base the parser adds each transaction's start
/// offset to when building per-transaction metadata.
metadata: BlkMetadata,
|
||||
/// Owned copy (`to_vec`) of the block's bytes as scanned. The header
/// peek leaves them unmodified; `parse_canonical_body` XOR-decodes
/// them in place before parsing.
bytes: Vec<u8>,
|
||||
/// XOR state supplied by the scan callback alongside `bytes`, used by
/// the parser to decode them.
xor_state: XORIndex,
|
||||
/// Offset of this block from `CanonicalRange::start`, as returned by
/// `peek_canonical_offset`. The parser derives the height as
/// `start + canonical_offset` and uses the offset as the reorder key.
canonical_offset: u32,
|
||||
}
|
||||
|
||||
/// Shared in-order emission buffer used by N parser threads. The mutex
|
||||
/// is uncontended at `parser_threads = 1` (still acquired, never blocks).
|
||||
/// In-order emission buffer shared between the parser threads. Access
|
||||
/// is serialised through a `parking_lot::Mutex`; at `parser_threads = 1`
|
||||
/// the lock is always uncontended.
|
||||
struct ReorderState {
|
||||
next_offset: u32,
|
||||
target_len: u32,
|
||||
/// Ahead-of-line matches keyed by canonical offset; drained
|
||||
/// contiguously each time `next_offset` advances.
|
||||
/// contiguously each time `next_offset` advances. Bounded in
|
||||
/// practice by parser-thread scheduling jitter — see module doc.
|
||||
pending: FxHashMap<u32, ReadBlock>,
|
||||
send_to_consumer: Sender<ReadBlock>,
|
||||
}
|
||||
|
||||
impl ReorderState {
|
||||
fn new(send_to_consumer: Sender<ReadBlock>, target_len: u32) -> Self {
|
||||
fn new(send_to_consumer: Sender<ReadBlock>) -> Self {
|
||||
Self {
|
||||
next_offset: 0,
|
||||
target_len,
|
||||
pending: FxHashMap::default(),
|
||||
send_to_consumer,
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a parsed canonical block. Returns `false` once the pipeline
|
||||
/// is done — either the consumer dropped the receiver, every canonical
|
||||
/// block has been emitted, or a parser somehow produced a duplicate
|
||||
/// offset — so the caller should stop processing and exit.
|
||||
/// Accepts a parsed canonical block; emits it and drains any
|
||||
/// contiguous pending matches. Returns `false` iff the consumer
|
||||
/// dropped the receiver — a pure liveness signal. Completion is
|
||||
/// checked by the caller via `next_offset`.
|
||||
fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool {
|
||||
use std::cmp::Ordering::*;
|
||||
match offset.cmp(&self.next_offset) {
|
||||
@@ -307,80 +302,83 @@ impl ReorderState {
|
||||
return false;
|
||||
}
|
||||
self.next_offset += 1;
|
||||
while let Some(b) = self.pending.remove(&self.next_offset) {
|
||||
if self.send_to_consumer.send(b).is_err() {
|
||||
while let Some(next) = self.pending.remove(&self.next_offset) {
|
||||
if self.send_to_consumer.send(next).is_err() {
|
||||
return false;
|
||||
}
|
||||
self.next_offset += 1;
|
||||
}
|
||||
self.next_offset < self.target_len
|
||||
true
|
||||
}
|
||||
Greater => {
|
||||
self.pending.insert(offset, block);
|
||||
true
|
||||
}
|
||||
// Each canonical hash appears at exactly one offset, and
|
||||
// each block is parsed once, so a parser should never
|
||||
// produce an offset below `next_offset`. Treat as done.
|
||||
Less => false,
|
||||
// Unreachable in practice: each canonical hash appears at
|
||||
// exactly one offset and each block is parsed once.
|
||||
Less => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Two-stage pipeline:
|
||||
/// Forward pipeline: the reader (this thread) scans blk files and
|
||||
/// ships canonical hits to a scoped parser pool via `parser_send`;
|
||||
/// parsers decode bodies and serialise emission through `reorder`.
|
||||
/// Scoped threads let every parser borrow `canonical`, `reorder`, and
|
||||
/// `done` directly — no `Arc` required.
|
||||
///
|
||||
/// 1. **Reader (this thread)** — walks blk files from `first_blk_index`,
|
||||
/// `fs::read`s each one, runs `scan_bytes` to locate every block, and
|
||||
/// ships `ScannedItem`s over an mpmc channel to the parser pool.
|
||||
/// 2. **Parser pool** — `parser_threads` workers draining the same
|
||||
/// channel. Each worker runs `parse_canonical_block` (header first,
|
||||
/// body only on canonical match) and acquires the shared `ReorderState`
|
||||
/// mutex to insert into the in-order emission buffer.
|
||||
///
|
||||
/// Canonical blocks can arrive out of order across blk files (bitcoind
|
||||
/// doesn't write in strict chain order during initial sync, headers-first
|
||||
/// body fetch, or reindex), so the reorder buffer is required even with
|
||||
/// a single parser thread.
|
||||
/// A reorder buffer is required even at `parser_threads = 1` because
|
||||
/// canonical blocks can arrive out of order across blk files (bitcoind
|
||||
/// doesn't write in strict chain order during initial sync, headers-
|
||||
/// first body fetch, or reindex).
|
||||
fn pipeline_forward(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: Arc<CanonicalRange>,
|
||||
canonical: &CanonicalRange,
|
||||
send: &Sender<ReadBlock>,
|
||||
parser_threads: usize,
|
||||
) -> Result<()> {
|
||||
let (parser_send, parser_recv) = bounded::<ScannedItem>(BOUND_CAP);
|
||||
let reorder = Arc::new(Mutex::new(ReorderState::new(
|
||||
send.clone(),
|
||||
canonical.len() as u32,
|
||||
)));
|
||||
// Set when the pipeline is finished (consumer dropped or all canonical
|
||||
// blocks emitted) so parsers can short-circuit instead of burning CPU
|
||||
// on doomed work while the reader drains the queue.
|
||||
let done = Arc::new(AtomicBool::new(false));
|
||||
let (parser_send, parser_recv) = bounded::<ScannedBlock>(BOUND_CAP);
|
||||
let reorder = Mutex::new(ReorderState::new(send.clone()));
|
||||
let target_canonical_count = canonical.len() as u32;
|
||||
let done = AtomicBool::new(false);
|
||||
|
||||
let parsers = spawn_parser_pool(
|
||||
parser_threads,
|
||||
&parser_recv,
|
||||
&reorder,
|
||||
&done,
|
||||
&canonical,
|
||||
xor_bytes,
|
||||
);
|
||||
drop(parser_recv); // parsers own clones; this would otherwise keep the channel open
|
||||
thread::scope(|scope| -> Result<()> {
|
||||
for _ in 0..parser_threads {
|
||||
let parser_recv = parser_recv.clone();
|
||||
scope.spawn(|| {
|
||||
parser_loop(
|
||||
parser_recv,
|
||||
&reorder,
|
||||
&done,
|
||||
canonical,
|
||||
xor_bytes,
|
||||
target_canonical_count,
|
||||
)
|
||||
});
|
||||
}
|
||||
// Every parser owns its own clone; ours would otherwise keep
|
||||
// the channel "alive" and leak a dangling receiver.
|
||||
drop(parser_recv);
|
||||
|
||||
let read_result = read_and_dispatch(paths, first_blk_index, xor_bytes, &parser_send, &done);
|
||||
drop(parser_send); // signal end-of-input to parsers
|
||||
let read_result = read_and_dispatch(
|
||||
paths,
|
||||
first_blk_index,
|
||||
xor_bytes,
|
||||
canonical,
|
||||
&parser_send,
|
||||
&done,
|
||||
);
|
||||
// Signal end-of-input to the parsers so they exit their `for`
|
||||
// loops and the scope can join them.
|
||||
drop(parser_send);
|
||||
read_result
|
||||
})?;
|
||||
|
||||
for parser in parsers {
|
||||
parser
|
||||
.join()
|
||||
.map_err(|_| Error::Internal("parser thread panicked"))??;
|
||||
}
|
||||
read_result?;
|
||||
|
||||
let state = reorder.lock();
|
||||
if (state.next_offset as usize) < canonical.len() && !done.load(Ordering::Relaxed) {
|
||||
let pipeline_cancelled = done.load(Ordering::Relaxed);
|
||||
let emitted = reorder.lock().next_offset as usize;
|
||||
if !pipeline_cancelled && emitted < canonical.len() {
|
||||
return Err(Error::Internal(
|
||||
"after_canonical forward pipeline: blk files missing canonical blocks",
|
||||
));
|
||||
@@ -388,58 +386,50 @@ fn pipeline_forward(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn `n` parser threads that drain `parser_recv`, parse each scanned
|
||||
/// item via `parse_canonical_block`, and emit canonical matches to
|
||||
/// `reorder`. Parsers exit when the channel closes or `done` is set.
|
||||
fn spawn_parser_pool(
|
||||
n: usize,
|
||||
parser_recv: &Receiver<ScannedItem>,
|
||||
reorder: &Arc<Mutex<ReorderState>>,
|
||||
done: &Arc<AtomicBool>,
|
||||
canonical: &Arc<CanonicalRange>,
|
||||
/// Full-body parse + in-order emit loop run by every scoped parser
|
||||
/// worker in `pipeline_forward`. Drains `parser_recv` to exhaustion.
|
||||
fn parser_loop(
|
||||
parser_recv: Receiver<ScannedBlock>,
|
||||
reorder: &Mutex<ReorderState>,
|
||||
done: &AtomicBool,
|
||||
canonical: &CanonicalRange,
|
||||
xor_bytes: XORBytes,
|
||||
) -> Vec<thread::JoinHandle<Result<()>>> {
|
||||
(0..n)
|
||||
.map(|_| {
|
||||
let parser_recv = parser_recv.clone();
|
||||
let reorder = reorder.clone();
|
||||
let done = done.clone();
|
||||
let canonical = canonical.clone();
|
||||
thread::spawn(move || -> Result<()> {
|
||||
for (metadata, bytes, xor_i) in parser_recv {
|
||||
if done.load(Ordering::Relaxed) {
|
||||
continue; // drain quietly
|
||||
}
|
||||
|
||||
let (offset, block) = match parse_canonical_block(
|
||||
bytes, metadata, xor_i, xor_bytes, &canonical,
|
||||
) {
|
||||
Ok(Some(item)) => item,
|
||||
Ok(None) => continue, // orphan / out of range
|
||||
Err(e) => {
|
||||
warn!("parse_canonical_block failed: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if !reorder.lock().try_emit(offset, block) {
|
||||
done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
target_canonical_count: u32,
|
||||
) {
|
||||
for ScannedBlock { metadata, bytes, xor_state, canonical_offset } in parser_recv {
|
||||
if done.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
let height = Height::from(*canonical.start + canonical_offset);
|
||||
let block = match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
warn!("parse_canonical_body failed: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let pipeline_finished = {
|
||||
let mut state = reorder.lock();
|
||||
!state.try_emit(canonical_offset, block)
|
||||
|| state.next_offset >= target_canonical_count
|
||||
};
|
||||
if pipeline_finished {
|
||||
done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk blk files from `first_blk_index`, scan each one, and ship every
|
||||
/// raw block found to the parser pool. Stops early if `done` flips or
|
||||
/// the parser channel closes.
|
||||
/// Walk blk files from `first_blk_index`, scan each one, and ship
|
||||
/// canonical blocks to the parser pool. Non-canonical blocks are
|
||||
/// rejected via `peek_canonical_offset` *before* being cloned — the
|
||||
/// cheap filter is what lets a sparse catchup avoid allocating for the
|
||||
/// ~99% of blocks outside the window.
|
||||
fn read_and_dispatch(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
parser_send: &Sender<ScannedItem>,
|
||||
canonical: &CanonicalRange,
|
||||
parser_send: &Sender<ScannedBlock>,
|
||||
done: &AtomicBool,
|
||||
) -> Result<()> {
|
||||
for (&blk_index, blk_path) in paths.range(first_blk_index..) {
|
||||
@@ -457,10 +447,22 @@ fn read_and_dispatch(
|
||||
blk_index,
|
||||
0,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_i| {
|
||||
if done.load(Ordering::Relaxed)
|
||||
|| parser_send.send((metadata, block_bytes, xor_i)).is_err()
|
||||
{
|
||||
|metadata, block_bytes, xor_state| {
|
||||
if done.load(Ordering::Relaxed) {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
let Some(canonical_offset) =
|
||||
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
let scanned = ScannedBlock {
|
||||
metadata,
|
||||
bytes: block_bytes.to_vec(),
|
||||
xor_state,
|
||||
canonical_offset,
|
||||
};
|
||||
if parser_send.send(scanned).is_err() {
|
||||
ControlFlow::Break(())
|
||||
} else {
|
||||
ControlFlow::Continue(())
|
||||
@@ -476,65 +478,72 @@ fn read_and_dispatch(
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Tail pipeline — reverse 5MB chunks of the last blk files until we've
|
||||
// collected every canonical hash, then emit forward.
|
||||
// Tail pipeline — reverse-scan the newest blk files in 5 MB chunks until
|
||||
// every canonical hash has been matched, then emit them forward.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
fn pipeline_tail(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &Arc<CanonicalRange>,
|
||||
canonical: &CanonicalRange,
|
||||
send: &Sender<ReadBlock>,
|
||||
) -> Result<()> {
|
||||
// Collected matches, keyed by canonical offset. Tail ranges are ≤10 so
|
||||
// a Vec<Option<_>> is the simplest representation.
|
||||
let mut collected: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
|
||||
let mut slots: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
|
||||
let mut remaining = canonical.len();
|
||||
// Carries the bytes before a chunk's first magic into the next
|
||||
// (earlier) chunk so blocks straddling the boundary survive.
|
||||
let mut spillover: Vec<u8> = Vec::new();
|
||||
|
||||
'files: for (&blk_index, path) in paths.iter().rev() {
|
||||
let file_len = fs::metadata(path).map(|m| m.len() as usize).unwrap_or(0);
|
||||
let mut file = File::open(path).map_err(|_| Error::Internal("Failed to open blk file"))?;
|
||||
let file_len = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
|
||||
if file_len == 0 {
|
||||
continue;
|
||||
}
|
||||
let Ok(mut file) = File::open(path) else {
|
||||
return Err(Error::Internal("Failed to open blk file"));
|
||||
};
|
||||
|
||||
let mut read_end = file_len;
|
||||
let mut head: Vec<u8> = Vec::new();
|
||||
spillover.clear();
|
||||
|
||||
while read_end > 0 && remaining > 0 {
|
||||
let read_start = read_end.saturating_sub(TAIL_CHUNK);
|
||||
let chunk_len = read_end - read_start;
|
||||
read_end = read_start;
|
||||
|
||||
if file.seek(SeekFrom::Start(read_start as u64)).is_err() {
|
||||
return Err(Error::Internal("Failed to seek blk file"));
|
||||
}
|
||||
let mut buf = vec![0u8; chunk_len + head.len()];
|
||||
if file.read_exact(&mut buf[..chunk_len]).is_err() {
|
||||
return Err(Error::Internal("Failed to read blk chunk"));
|
||||
}
|
||||
buf[chunk_len..].copy_from_slice(&head);
|
||||
head.clear();
|
||||
file.seek(SeekFrom::Start(read_start as u64))
|
||||
.map_err(|_| Error::Internal("Failed to seek blk file"))?;
|
||||
let mut buf = vec![0u8; chunk_len + spillover.len()];
|
||||
file.read_exact(&mut buf[..chunk_len])
|
||||
.map_err(|_| Error::Internal("Failed to read blk chunk"))?;
|
||||
buf[chunk_len..].copy_from_slice(&spillover);
|
||||
spillover.clear();
|
||||
|
||||
let result: ScanResult = scan_bytes(
|
||||
let result = scan_bytes(
|
||||
&mut buf,
|
||||
blk_index,
|
||||
read_start,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_i| {
|
||||
match parse_canonical_block(block_bytes, metadata, xor_i, xor_bytes, canonical)
|
||||
{
|
||||
Ok(Some((offset, block))) => {
|
||||
let slot = &mut collected[offset as usize];
|
||||
if slot.is_none() {
|
||||
*slot = Some(block);
|
||||
remaining -= 1;
|
||||
}
|
||||
|metadata, block_bytes, xor_state| {
|
||||
let Some(offset) =
|
||||
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
if slots[offset as usize].is_some() {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
let height = Height::from(*canonical.start + offset);
|
||||
match parse_canonical_body(
|
||||
block_bytes.to_vec(),
|
||||
metadata,
|
||||
xor_state,
|
||||
xor_bytes,
|
||||
height,
|
||||
) {
|
||||
Ok(block) => {
|
||||
slots[offset as usize] = Some(block);
|
||||
remaining -= 1;
|
||||
}
|
||||
Ok(None) => {} // orphan / out of range
|
||||
Err(e) => warn!("parse_canonical_block failed in tail pipeline: {e}"),
|
||||
Err(e) => warn!("parse_canonical_body failed in tail pipeline: {e}"),
|
||||
}
|
||||
if remaining == 0 {
|
||||
ControlFlow::Break(())
|
||||
@@ -547,9 +556,8 @@ fn pipeline_tail(
|
||||
if remaining == 0 {
|
||||
break 'files;
|
||||
}
|
||||
|
||||
if read_start > 0 {
|
||||
head = buf[..result.first_magic.unwrap_or(buf.len())].to_vec();
|
||||
spillover.extend_from_slice(&buf[..result.first_magic.unwrap_or(buf.len())]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -560,9 +568,7 @@ fn pipeline_tail(
|
||||
));
|
||||
}
|
||||
|
||||
// `remaining == 0` above guarantees every slot is `Some`; `flatten`
|
||||
// is just the natural way to write the emit loop.
|
||||
for block in collected.into_iter().flatten() {
|
||||
for block in slots.into_iter().flatten() {
|
||||
if send.send(block).is_err() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -221,7 +221,8 @@ impl ReaderInner {
|
||||
0,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_i| {
|
||||
if send_bytes.send((metadata, block_bytes, xor_i)).is_err() {
|
||||
// Send owned bytes to the rayon parser pool.
|
||||
if send_bytes.send((metadata, block_bytes.to_vec(), xor_i)).is_err() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
ControlFlow::Continue(())
|
||||
@@ -371,8 +372,10 @@ impl ReaderInner {
|
||||
read_start,
|
||||
xor_bytes,
|
||||
|metadata, bytes, xor_i| {
|
||||
// `decode_block` needs owned bytes — it XOR-
|
||||
// decodes in place before parsing.
|
||||
if let Ok(Some(block)) = decode_block(
|
||||
bytes, metadata, &client, xor_i, xor_bytes, start, end, 0, 0,
|
||||
bytes.to_vec(), metadata, &client, xor_i, xor_bytes, start, end, 0, 0,
|
||||
) {
|
||||
blocks.push(block);
|
||||
}
|
||||
|
||||
@@ -23,14 +23,17 @@ pub struct ScanResult {
|
||||
pub interrupted: bool,
|
||||
}
|
||||
|
||||
/// Scans `buf` for blocks. `file_offset` is the absolute position of `buf[0]` in the file.
|
||||
/// Calls `on_block` for each complete block found.
|
||||
/// Scans `buf` for blocks. `file_offset` is the absolute position of
|
||||
/// `buf[0]` in the file. Calls `on_block` for each complete block found,
|
||||
/// passing the block's raw bytes as a mutable borrow of the buffer — the
|
||||
/// caller decides whether to clone them (e.g. to ship owned data to a
|
||||
/// parser thread) or process them in place (e.g. cheap header peek).
|
||||
pub fn scan_bytes(
|
||||
buf: &mut [u8],
|
||||
blk_index: u16,
|
||||
file_offset: usize,
|
||||
xor_bytes: XORBytes,
|
||||
mut on_block: impl FnMut(BlkMetadata, Vec<u8>, XORIndex) -> ControlFlow<()>,
|
||||
mut on_block: impl FnMut(BlkMetadata, &mut [u8], XORIndex) -> ControlFlow<()>,
|
||||
) -> ScanResult {
|
||||
let mut xor_i = XORIndex::default();
|
||||
xor_i.add_assign(file_offset);
|
||||
@@ -56,7 +59,7 @@ pub fn scan_bytes(
|
||||
}
|
||||
let position = BlkPosition::new(blk_index, (file_offset + i) as u32);
|
||||
let metadata = BlkMetadata::new(position, len as u32);
|
||||
if on_block(metadata, buf[i..i + len].to_vec(), xor_i).is_break() {
|
||||
if on_block(metadata, &mut buf[i..i + len], xor_i).is_break() {
|
||||
return ScanResult {
|
||||
first_magic,
|
||||
interrupted: true,
|
||||
|
||||
Reference in New Issue
Block a user