mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snap
This commit is contained in:
@@ -1,7 +1,3 @@
|
||||
//! Helpers for picking where to start scanning: probe the first
|
||||
//! block of a file ([`first_block_height`]) and bisect the blk-index
|
||||
//! map for a target height ([`find_start_blk_index`]).
|
||||
|
||||
use std::{fs::File, io::Read, path::Path};
|
||||
|
||||
use bitcoin::{block::Header, consensus::Decodable};
|
||||
@@ -17,8 +13,6 @@ use crate::{
|
||||
|
||||
const PROBE_BUF_LEN: usize = 4096;
|
||||
|
||||
/// Decodes the first block in `blk_path` and resolves its height via
|
||||
/// RPC. One short read + one RPC.
|
||||
pub(crate) fn first_block_height(
|
||||
client: &Client,
|
||||
blk_path: &Path,
|
||||
@@ -32,15 +26,11 @@ pub(crate) fn first_block_height(
|
||||
let magic_end = find_magic(&buf[..n], &mut xor_i, xor_bytes)
|
||||
.ok_or_else(|| Error::NotFound("No magic bytes found".into()))?;
|
||||
|
||||
// Decode the 4-byte size + 80-byte header in one pass; the size
|
||||
// is discarded. Bounds-check first so a corrupt file whose only
|
||||
// magic-shaped bytes sit at the end of the probe doesn't index
|
||||
// past `n`.
|
||||
let header_end = magic_end + 4 + HEADER_LEN;
|
||||
if header_end > n {
|
||||
warn!(
|
||||
"first_block_height: {} has magic-shaped bytes at offset {} but \
|
||||
not enough room in the {}-byte probe to decode the header — \
|
||||
not enough room in the {}-byte probe to decode the header, \
|
||||
the file is probably corrupt",
|
||||
blk_path.display(),
|
||||
magic_end - 4,
|
||||
@@ -59,15 +49,11 @@ pub(crate) fn first_block_height(
|
||||
Ok(Height::new(height))
|
||||
}
|
||||
|
||||
/// Bisects the map for the file whose first block height is ≤
|
||||
/// `target_start`, then backs off [`OUT_OF_ORDER_FILE_BACKOFF`] files.
|
||||
/// Always returns a valid blk index — read errors mid-search log and
|
||||
/// fall through to the backoff (or to 0 if the map is empty).
|
||||
///
|
||||
/// On a transient read error we **break** rather than `left = mid + 1`:
|
||||
/// the height bound at `mid` is unknown, so any further bisection on
|
||||
/// that side could skip valid lower indices. Falling through to the
|
||||
/// backoff still gives a safe lower bound.
|
||||
/// Bisects for the file whose first block height is <= `target_start`,
|
||||
/// then backs off [`OUT_OF_ORDER_FILE_BACKOFF`] files. A transient
|
||||
/// read error mid-bisect breaks out rather than narrowing further:
|
||||
/// the bound at `mid` is unknown, so any further step could skip
|
||||
/// valid lower indices. The backoff still provides a safe lower bound.
|
||||
pub(crate) fn find_start_blk_index(
|
||||
client: &Client,
|
||||
target_start: Height,
|
||||
|
||||
@@ -1,38 +1,32 @@
|
||||
//! `CanonicalRange`: every canonical block hash in a height window,
|
||||
//! pre-fetched once via [`brk_rpc::Client::get_block_hashes_range`].
|
||||
//! Used as the authoritative "is this block on the main chain?"
|
||||
//! filter so the scan pipeline never needs a per-block RPC call.
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlockHash, Height};
|
||||
use rustc_hash::FxHashMap;
|
||||
|
||||
/// Keyed on the full 32-byte hash because a prefix collision would
|
||||
/// silently drop both blocks; the ~24 MB extra RAM is negligible
|
||||
/// against the 128 MB blk reads happening in parallel.
|
||||
/// Keyed on the full 32-byte hash: a prefix collision would
|
||||
/// silently drop both blocks.
|
||||
pub struct CanonicalRange {
|
||||
pub start: Height,
|
||||
anchor: Option<BlockHash>,
|
||||
by_hash: FxHashMap<BlockHash, u32>,
|
||||
}
|
||||
|
||||
impl CanonicalRange {
|
||||
/// Resolves canonical hashes for every height strictly after
|
||||
/// `anchor` up to `tip` inclusive. `anchor = None` starts at
|
||||
/// genesis.
|
||||
pub fn walk(client: &Client, anchor: Option<&BlockHash>, tip: Height) -> Result<Self> {
|
||||
let start = match anchor {
|
||||
Some(hash) => Height::from(client.get_block_header_info(hash)?.height + 1),
|
||||
None => Height::ZERO,
|
||||
};
|
||||
Self::between(client, start, tip)
|
||||
let mut range = Self::between(client, start, tip)?;
|
||||
range.anchor = anchor.cloned();
|
||||
Ok(range)
|
||||
}
|
||||
|
||||
/// Resolves canonical hashes for every height in `start..=end`.
|
||||
pub fn between(client: &Client, start: Height, end: Height) -> Result<Self> {
|
||||
if start > end {
|
||||
return Ok(Self {
|
||||
start,
|
||||
anchor: None,
|
||||
by_hash: FxHashMap::default(),
|
||||
});
|
||||
}
|
||||
@@ -42,7 +36,11 @@ impl CanonicalRange {
|
||||
.enumerate()
|
||||
.map(|(i, h)| (h, i as u32))
|
||||
.collect();
|
||||
Ok(Self { start, by_hash })
|
||||
Ok(Self {
|
||||
start,
|
||||
anchor: None,
|
||||
by_hash,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
@@ -53,9 +51,18 @@ impl CanonicalRange {
|
||||
self.by_hash.is_empty()
|
||||
}
|
||||
|
||||
/// Offset-from-`start` of `hash` iff it's on the canonical chain.
|
||||
#[inline]
|
||||
pub(crate) fn offset_of(&self, hash: &BlockHash) -> Option<u32> {
|
||||
self.by_hash.get(hash).copied()
|
||||
}
|
||||
|
||||
/// `prev_hash` must match the canonical hash at `offset - 1`, or
|
||||
/// the anchor when `offset == 0`.
|
||||
#[inline]
|
||||
pub(crate) fn verify_prev(&self, offset: u32, prev_hash: &BlockHash) -> bool {
|
||||
match offset {
|
||||
0 => self.anchor.as_ref().is_none_or(|a| a == prev_hash),
|
||||
_ => self.offset_of(prev_hash) == Some(offset - 1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,27 +30,19 @@ pub use canonical::CanonicalRange;
|
||||
pub use xor_bytes::*;
|
||||
pub use xor_index::*;
|
||||
|
||||
/// Files of out-of-order play to tolerate. bitcoind sometimes writes
|
||||
/// blocks slightly out of height order across files (initial sync,
|
||||
/// headers-first body fetch, reindex), so a single "out of bounds"
|
||||
/// signal isn't enough to declare failure. Used by the forward
|
||||
/// bisection backoff and the tail bailout streak.
|
||||
/// bitcoind writes blocks slightly out of height order across files
|
||||
/// during initial sync, headers-first body fetch, and reindex, so a
|
||||
/// single "out of bounds" signal isn't enough to declare failure.
|
||||
pub(crate) const OUT_OF_ORDER_FILE_BACKOFF: usize = 21;
|
||||
|
||||
const TARGET_NOFILE: u64 = 15_000;
|
||||
|
||||
/// Bitcoin Core blk-file reader. Cheap to clone (`Arc`-backed) and
|
||||
/// thread-safe: every method takes `&self` and the
|
||||
/// `Receiver<Result<ReadBlock>>` from the streaming API can be
|
||||
/// drained from any thread.
|
||||
/// thread-safe.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Reader(Arc<ReaderInner>);
|
||||
|
||||
impl Reader {
|
||||
/// Raises the per-process `NOFILE` limit so the file-handle cache
|
||||
/// can keep one open `File` per `blkNNNNN.dat`. For tests or
|
||||
/// embeddings that don't want the process-wide rlimit side
|
||||
/// effect, use [`Self::new_without_rlimit`].
|
||||
pub fn new(blocks_dir: PathBuf, client: &Client) -> Self {
|
||||
Self::raise_fd_limit();
|
||||
Self::new_without_rlimit(blocks_dir, client)
|
||||
@@ -65,13 +57,9 @@ impl Reader {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Called automatically by [`Self::new`]. Exposed so callers
|
||||
/// using [`Self::new_without_rlimit`] can opt in once.
|
||||
///
|
||||
/// Raises **only the soft limit**, clamped to the current hard
|
||||
/// limit — raising the hard limit requires `CAP_SYS_RESOURCE`
|
||||
/// and would fail (dropping the entire call) on containers and
|
||||
/// unprivileged macOS user processes.
|
||||
/// Raises only the soft limit, clamped to the current hard limit:
|
||||
/// raising the hard limit requires `CAP_SYS_RESOURCE` and would
|
||||
/// fail on containers and unprivileged macOS processes.
|
||||
pub fn raise_fd_limit() {
|
||||
let (soft, hard) = rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap_or((0, 0));
|
||||
let new_soft = soft.max(TARGET_NOFILE).min(hard);
|
||||
@@ -95,15 +83,10 @@ impl Reader {
|
||||
self.0.xor_bytes
|
||||
}
|
||||
|
||||
/// Decode the first block in `blk_path` and resolve its height
|
||||
/// via RPC. Exposed for inspection tools (see
|
||||
/// `examples/blk_heights.rs`).
|
||||
pub fn first_block_height(&self, blk_path: &Path, xor_bytes: XORBytes) -> Result<Height> {
|
||||
bisect::first_block_height(&self.0.client, blk_path, xor_bytes)
|
||||
}
|
||||
|
||||
/// `read_exact_at` so a short read becomes a hard error instead
|
||||
/// of silent corruption from the buffer's zero-init tail.
|
||||
pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result<Vec<u8>> {
|
||||
let file = self.0.open_blk(position.blk_index())?;
|
||||
let mut buffer = vec![0u8; size];
|
||||
@@ -112,8 +95,6 @@ impl Reader {
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
/// Streaming `Read` at `position`. Holds an `Arc<File>` so the
|
||||
/// cache lock isn't held across the I/O.
|
||||
pub fn reader_at(&self, position: BlkPosition) -> Result<BlkRead> {
|
||||
let file = self.0.open_blk(position.blk_index())?;
|
||||
Ok(BlkRead {
|
||||
@@ -124,16 +105,18 @@ impl Reader {
|
||||
})
|
||||
}
|
||||
|
||||
/// Streams every canonical block from genesis to the current
|
||||
/// chain tip.
|
||||
pub fn all(&self) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
self.after(None)
|
||||
}
|
||||
|
||||
/// Streams every canonical block strictly after `hash` (or from
|
||||
/// genesis when `None`) up to the current chain tip.
|
||||
pub fn after(&self, hash: Option<BlockHash>) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
self.after_with(hash, pipeline::DEFAULT_PARSER_THREADS)
|
||||
}
|
||||
|
||||
/// Like [`after`](Self::after) with a configurable parser-thread
|
||||
/// count. The default of 1 reader + 1 parser leaves the rest of
|
||||
/// the cores for the indexer; bench tools that drain the channel
|
||||
/// cheaply can override.
|
||||
pub fn after_with(
|
||||
&self,
|
||||
hash: Option<BlockHash>,
|
||||
@@ -141,7 +124,7 @@ impl Reader {
|
||||
) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
let tip = self.0.client.get_last_height()?;
|
||||
let canonical = CanonicalRange::walk(&self.0.client, hash.as_ref(), tip)?;
|
||||
pipeline::spawn(self.0.clone(), canonical, hash, parser_threads)
|
||||
pipeline::spawn(self.0.clone(), canonical, parser_threads)
|
||||
}
|
||||
|
||||
/// Inclusive height range `start..=end` in canonical order.
|
||||
@@ -162,20 +145,14 @@ impl Reader {
|
||||
)));
|
||||
}
|
||||
let canonical = CanonicalRange::between(&self.0.client, start, end)?;
|
||||
// No anchor: caller asked for "blocks at heights X..=Y", they
|
||||
// get whatever bitcoind says is canonical there.
|
||||
pipeline::spawn(self.0.clone(), canonical, None, parser_threads)
|
||||
pipeline::spawn(self.0.clone(), canonical, parser_threads)
|
||||
}
|
||||
}
|
||||
|
||||
/// `pub(crate)` so `pipeline` can capture it via `Arc<ReaderInner>`
|
||||
/// for spawned workers; everything else goes through `Reader`.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ReaderInner {
|
||||
/// Invalidated on every [`refresh_paths`](Self::refresh_paths) so
|
||||
/// a pruned/reindexed blk file can't keep serving stale bytes
|
||||
/// from a dead inode. `Arc<File>` lets us hand out cheap clones
|
||||
/// without holding the cache lock during I/O.
|
||||
/// Invalidated on every `refresh_paths` so a pruned or reindexed
|
||||
/// blk file can't keep serving stale bytes from a dead inode.
|
||||
blk_file_cache: RwLock<BTreeMap<u16, Arc<File>>>,
|
||||
pub(crate) xor_bytes: XORBytes,
|
||||
pub(crate) blocks_dir: PathBuf,
|
||||
@@ -183,20 +160,12 @@ pub(crate) struct ReaderInner {
|
||||
}
|
||||
|
||||
impl ReaderInner {
|
||||
/// Rescan the blocks directory and drop the file-handle cache in
|
||||
/// the same critical section. Old `Arc<File>`s already in flight
|
||||
/// stay valid until their last drop; new lookups go through the
|
||||
/// fresh inode.
|
||||
pub(crate) fn refresh_paths(&self) -> Result<BlkIndexToBlkPath> {
|
||||
let paths = BlkIndexToBlkPath::scan(&self.blocks_dir)?;
|
||||
self.blk_file_cache.write().clear();
|
||||
Ok(paths)
|
||||
}
|
||||
|
||||
/// The blk path is deterministic (`<blocks_dir>/blkNNNNN.dat`),
|
||||
/// so we don't need a directory scan to resolve it. Two threads
|
||||
/// racing on a missing entry will both call `File::open`; the
|
||||
/// loser's `Arc` is dropped via `or_insert`.
|
||||
fn open_blk(&self, blk_index: u16) -> Result<Arc<File>> {
|
||||
if let Some(file) = self.blk_file_cache.read().get(&blk_index).cloned() {
|
||||
return Ok(file);
|
||||
@@ -208,8 +177,6 @@ impl ReaderInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// Streaming reader at a position in a blk file. Holds an `Arc<File>`
|
||||
/// so it doesn't lock the file cache while the consumer is reading.
|
||||
pub struct BlkRead {
|
||||
file: Arc<File>,
|
||||
offset: u64,
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
//! Block parsing — XOR decoding, header peek, full body parse. Split
|
||||
//! so the scan loop can reject non-canonical blocks before copying.
|
||||
|
||||
use std::io::Cursor;
|
||||
|
||||
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable};
|
||||
@@ -11,10 +8,8 @@ use crate::{XORBytes, XORIndex, canonical::CanonicalRange};
|
||||
|
||||
pub(crate) const HEADER_LEN: usize = 80;
|
||||
|
||||
/// Cheap canonical-membership check. Decodes the header onto a stack
|
||||
/// buffer so `bytes` stays untouched (the parser later re-XORs the
|
||||
/// full block from the original phase). Returning the parsed header
|
||||
/// lets the body parse skip a second decode.
|
||||
/// Decodes the header onto a stack buffer so `bytes` stays untouched:
|
||||
/// the body parse later re-XORs the full block from the original phase.
|
||||
pub(crate) fn peek_canonical(
|
||||
bytes: &[u8],
|
||||
mut xor_state: XORIndex,
|
||||
@@ -32,8 +27,6 @@ pub(crate) fn peek_canonical(
|
||||
Some((offset, header))
|
||||
}
|
||||
|
||||
/// Full XOR-decode + body parse. Takes the previously-parsed `header`
|
||||
/// from `peek_canonical` so we don't re-parse it.
|
||||
pub(crate) fn parse_canonical_body(
|
||||
mut bytes: Vec<u8>,
|
||||
metadata: BlkMetadata,
|
||||
@@ -52,12 +45,10 @@ pub(crate) fn parse_canonical_body(
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
cursor.set_position(HEADER_LEN as u64);
|
||||
|
||||
// `consensus_decode_from_finite_reader` skips the `Take<R>` wrap
|
||||
// that `consensus_decode` applies to every nested field for
|
||||
// memory-safety — our cursor is already a bounded `Vec<u8>`, so
|
||||
// the extra wrapping is pure overhead. Per the crate docs it's
|
||||
// "marginally faster", but for a ~2000-tx block the per-field
|
||||
// compounding adds up.
|
||||
// `from_finite_reader` skips the `Take<R>` wrap that
|
||||
// `consensus_decode` applies to every nested field for memory
|
||||
// safety: our cursor is already a bounded `Vec<u8>`, so the
|
||||
// wrapping is pure overhead and compounds across ~2000 tx fields.
|
||||
let tx_count = VarInt::consensus_decode_from_finite_reader(&mut cursor)?.0 as usize;
|
||||
let mut txdata = Vec::with_capacity(tx_count);
|
||||
let mut tx_metadata = Vec::with_capacity(tx_count);
|
||||
|
||||
@@ -1,9 +1,3 @@
|
||||
//! Forward pipeline: 1 reader thread + N scoped parser threads.
|
||||
//! Reader walks blk files from a bisection lower bound, peeks each
|
||||
//! block's header against `CanonicalRange`, and ships hits to the
|
||||
//! parser pool. Parsers decode bodies in parallel and emit in-order
|
||||
//! through `ReorderState`.
|
||||
|
||||
use std::{fs, ops::ControlFlow, sync::OnceLock, thread};
|
||||
|
||||
use bitcoin::block::Header;
|
||||
@@ -21,8 +15,6 @@ use crate::{
|
||||
scan::scan_bytes,
|
||||
};
|
||||
|
||||
/// Reader→parser channel message. `header` was decoded during the
|
||||
/// peek and is reused so the parser doesn't redo it.
|
||||
struct ScannedBlock {
|
||||
metadata: BlkMetadata,
|
||||
bytes: Vec<u8>,
|
||||
@@ -31,9 +23,6 @@ struct ScannedBlock {
|
||||
header: Header,
|
||||
}
|
||||
|
||||
/// Single shared signal carrying both the cancel flag and (when set
|
||||
/// to `Failed`) the first parse error. `stop.get().is_some()` is the
|
||||
/// reader's cheap "should I stop" check.
|
||||
enum Stop {
|
||||
Done,
|
||||
Failed(Error),
|
||||
@@ -44,12 +33,11 @@ pub(super) fn pipeline_forward(
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
anchor: Option<BlockHash>,
|
||||
send: &Sender<Result<ReadBlock>>,
|
||||
parser_threads: usize,
|
||||
) -> Result<()> {
|
||||
let (parser_send, parser_recv) = bounded::<ScannedBlock>(CHANNEL_CAPACITY);
|
||||
let reorder = Mutex::new(ReorderState::new(send.clone(), anchor));
|
||||
let reorder = Mutex::new(ReorderState::new(send.clone()));
|
||||
let stop: OnceLock<Stop> = OnceLock::new();
|
||||
|
||||
thread::scope(|scope| {
|
||||
@@ -57,14 +45,10 @@ pub(super) fn pipeline_forward(
|
||||
let parser_recv = parser_recv.clone();
|
||||
scope.spawn(|| parser_loop(parser_recv, &reorder, &stop, canonical, xor_bytes));
|
||||
}
|
||||
// Every parser owns its own clone; ours would otherwise leak
|
||||
// a dangling receiver.
|
||||
drop(parser_recv);
|
||||
|
||||
let read_result =
|
||||
read_and_dispatch(paths, first_blk_index, xor_bytes, canonical, &parser_send, &stop);
|
||||
// End-of-input signal so parser `for` loops exit and the
|
||||
// scope can join.
|
||||
drop(parser_send);
|
||||
read_result
|
||||
})?;
|
||||
@@ -75,8 +59,6 @@ pub(super) fn pipeline_forward(
|
||||
reorder.into_inner().finalize(canonical.len())
|
||||
}
|
||||
|
||||
/// Per-thread parser body: drain `parser_recv`, decode each block,
|
||||
/// emit through `reorder`. Stops on `stop`.
|
||||
fn parser_loop(
|
||||
parser_recv: Receiver<ScannedBlock>,
|
||||
reorder: &Mutex<ReorderState>,
|
||||
@@ -116,9 +98,6 @@ fn parser_loop(
|
||||
}
|
||||
}
|
||||
|
||||
/// `peek_canonical` filters orphans **before** the block bytes are
|
||||
/// cloned, so a sparse catchup avoids allocating for the ~99% of
|
||||
/// blocks outside the window.
|
||||
fn read_and_dispatch(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
@@ -146,6 +125,14 @@ fn read_and_dispatch(
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
if !canonical
|
||||
.verify_prev(canonical_offset, &BlockHash::from(header.prev_blockhash))
|
||||
{
|
||||
let _ = stop.set(Stop::Failed(Error::Internal(
|
||||
"forward pipeline: canonical batch stitched across a reorg",
|
||||
)));
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
let scanned = ScannedBlock {
|
||||
metadata,
|
||||
bytes: block_bytes.to_vec(),
|
||||
|
||||
@@ -1,22 +1,8 @@
|
||||
//! Two-strategy block-streaming pipeline. [`spawn`] picks between:
|
||||
//!
|
||||
//! * **forward** — one reader thread walks blk files in order from a
|
||||
//! bisection lower bound; canonical hits ship to a parser pool that
|
||||
//! emits in-order through [`reorder::ReorderState`].
|
||||
//! * **tail** — single-threaded reverse scan of the newest blk files,
|
||||
//! buffering matches in offset slots, then emitting forward with
|
||||
//! an inline chain check.
|
||||
//!
|
||||
//! Both strategies verify `block.header.prev_blockhash` against the
|
||||
//! previously emitted block — and against the user-supplied `anchor`
|
||||
//! for the very first block — and propagate a final `Err` to the
|
||||
//! consumer on chain breaks, parse failures, or missing blocks.
|
||||
|
||||
use std::{sync::Arc, thread};
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlockHash, Height, ReadBlock};
|
||||
use brk_types::{Height, ReadBlock};
|
||||
use crossbeam::channel::{Receiver, bounded};
|
||||
|
||||
use crate::{
|
||||
@@ -30,16 +16,13 @@ mod tail;
|
||||
|
||||
pub(crate) const CHANNEL_CAPACITY: usize = 50;
|
||||
|
||||
/// If `canonical.start` lives within this many files of the chain
|
||||
/// tip, use the reverse-scan pipeline. The forward pipeline pays the
|
||||
/// bisection + 21-file backoff (~2.7 GB of reads) regardless of how
|
||||
/// few canonical blocks live in the window, so for any tip-clustered
|
||||
/// catchup the tail wins until the window grows past this many files.
|
||||
/// Forward pays the bisection + 21-file backoff (~2.7 GB of reads)
|
||||
/// regardless of how few canonical blocks live in the window, so
|
||||
/// tail wins for any catchup within this many files of the tip.
|
||||
const TAIL_DISTANCE_FILES: usize = 8;
|
||||
|
||||
/// The indexer is CPU-bound on the consumer side, so 1 reader + 1
|
||||
/// parser leaves the rest of the cores for it. Bench tools that
|
||||
/// drain the channel cheaply can override.
|
||||
/// parser leaves the rest of the cores for it.
|
||||
pub(crate) const DEFAULT_PARSER_THREADS: usize = 1;
|
||||
|
||||
enum Strategy {
|
||||
@@ -47,21 +30,11 @@ enum Strategy {
|
||||
Forward { first_blk_index: u16 },
|
||||
}
|
||||
|
||||
/// `anchor`, when supplied, is the hash the consumer expects to be
|
||||
/// the **parent** of the first emitted block. Seeded into the chain
|
||||
/// check so a stale `Reader::after` anchor (e.g. the tip of a
|
||||
/// reorged-out chain) cannot silently produce a stitched stream.
|
||||
/// `None` skips the check (genesis or `range`-style calls have no
|
||||
/// anchor to verify against).
|
||||
pub(crate) fn spawn(
|
||||
reader: Arc<ReaderInner>,
|
||||
canonical: CanonicalRange,
|
||||
anchor: Option<BlockHash>,
|
||||
parser_threads: usize,
|
||||
) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
// Cap at the parser channel capacity: beyond that, extra parsers
|
||||
// are idle (they all contend for the same buffered items) and
|
||||
// absurd inputs would otherwise OOM the scoped spawn.
|
||||
let parser_threads = parser_threads.clamp(1, CHANNEL_CAPACITY);
|
||||
|
||||
if canonical.is_empty() {
|
||||
@@ -77,20 +50,18 @@ pub(crate) fn spawn(
|
||||
thread::spawn(move || {
|
||||
let result = match strategy {
|
||||
Strategy::Tail => {
|
||||
tail::pipeline_tail(&reader.client, &paths, xor_bytes, &canonical, anchor, &send)
|
||||
tail::pipeline_tail(&reader.client, &paths, xor_bytes, &canonical, &send)
|
||||
}
|
||||
Strategy::Forward { first_blk_index } => forward::pipeline_forward(
|
||||
&paths,
|
||||
first_blk_index,
|
||||
xor_bytes,
|
||||
&canonical,
|
||||
anchor,
|
||||
&send,
|
||||
parser_threads,
|
||||
),
|
||||
};
|
||||
if let Err(e) = result {
|
||||
// No-op if the consumer already dropped the receiver.
|
||||
let _ = send.send(Err(e));
|
||||
}
|
||||
});
|
||||
@@ -98,11 +69,6 @@ pub(crate) fn spawn(
|
||||
Ok(recv)
|
||||
}
|
||||
|
||||
/// Tail iff one of the last `TAIL_DISTANCE_FILES` files starts at a
|
||||
/// height ≤ `canonical_start`; that file is where tail iteration
|
||||
/// would land. Otherwise bisect for the forward start. Genesis-rooted
|
||||
/// catchups skip the tail probes since no file's first block is ≤
|
||||
/// genesis.
|
||||
fn pick_strategy(
|
||||
client: &Client,
|
||||
paths: &BlkIndexToBlkPath,
|
||||
|
||||
@@ -1,55 +1,28 @@
|
||||
//! In-order emission buffer + chain-continuity check used by the
|
||||
//! forward pipeline. Parsers complete blocks out of order, so this
|
||||
//! parks ahead-of-line matches in `pending` until `next_offset`
|
||||
//! catches up.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use brk_error::{Error, Result};
|
||||
use brk_types::{BlockHash, ReadBlock};
|
||||
use brk_types::ReadBlock;
|
||||
use crossbeam::channel::Sender;
|
||||
use rustc_hash::FxHashMap;
|
||||
use tracing::warn;
|
||||
|
||||
/// Accessed by the parser pool under a `parking_lot::Mutex` owned by
|
||||
/// `pipeline_forward`; at `parser_threads = 1` the lock is always
|
||||
/// uncontended.
|
||||
pub(super) struct ReorderState {
|
||||
pub(super) next_offset: u32,
|
||||
pending: FxHashMap<u32, ReadBlock>,
|
||||
send_to_consumer: Sender<Result<ReadBlock>>,
|
||||
/// Seeded with the user-supplied anchor so the first emit is
|
||||
/// also verified against it.
|
||||
last_emitted_hash: Option<BlockHash>,
|
||||
/// A `prev_blockhash` mismatch fires this; converted into a
|
||||
/// final `Err` by `finalize`.
|
||||
chain_broken: bool,
|
||||
/// Distinguishes "consumer cancelled" from "ran out of work
|
||||
/// early" in the missing-blocks check inside `finalize`.
|
||||
consumer_dropped: bool,
|
||||
}
|
||||
|
||||
impl ReorderState {
|
||||
pub(super) fn new(send_to_consumer: Sender<Result<ReadBlock>>, anchor: Option<BlockHash>) -> Self {
|
||||
pub(super) fn new(send_to_consumer: Sender<Result<ReadBlock>>) -> Self {
|
||||
Self {
|
||||
next_offset: 0,
|
||||
pending: FxHashMap::default(),
|
||||
send_to_consumer,
|
||||
last_emitted_hash: anchor,
|
||||
chain_broken: false,
|
||||
consumer_dropped: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves the pipeline's exit state. Called by
|
||||
/// `pipeline_forward` after the read loop has finished and all
|
||||
/// parser threads have joined.
|
||||
pub(super) fn finalize(self, expected_count: usize) -> Result<()> {
|
||||
if self.chain_broken {
|
||||
return Err(Error::Internal(
|
||||
"forward pipeline: canonical batch stitched across a reorg",
|
||||
));
|
||||
}
|
||||
if !self.consumer_dropped && (self.next_offset as usize) < expected_count {
|
||||
return Err(Error::Internal(
|
||||
"forward pipeline: blk files missing canonical blocks",
|
||||
@@ -58,18 +31,14 @@ impl ReorderState {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits `block` if it's the next expected offset (and drains
|
||||
/// any contiguous pending matches), otherwise parks it. Returns
|
||||
/// `false` once the pipeline should stop (consumer drop or chain
|
||||
/// break).
|
||||
pub(super) fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool {
|
||||
match offset.cmp(&self.next_offset) {
|
||||
Ordering::Equal => {
|
||||
if !self.send_in_order(block) {
|
||||
if !self.send(block) {
|
||||
return false;
|
||||
}
|
||||
while let Some(next) = self.pending.remove(&self.next_offset) {
|
||||
if !self.send_in_order(next) {
|
||||
if !self.send(next) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -79,31 +48,15 @@ impl ReorderState {
|
||||
self.pending.insert(offset, block);
|
||||
true
|
||||
}
|
||||
// Each canonical hash appears at exactly one offset and
|
||||
// each block is parsed once, so this is unreachable in
|
||||
// practice.
|
||||
Ordering::Less => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn send_in_order(&mut self, block: ReadBlock) -> bool {
|
||||
if let Some(last) = &self.last_emitted_hash {
|
||||
let prev = BlockHash::from(block.header.prev_blockhash);
|
||||
if prev != *last {
|
||||
warn!(
|
||||
"canonical chain broken at offset {}: expected prev={} got {}",
|
||||
self.next_offset, last, prev,
|
||||
);
|
||||
self.chain_broken = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
let hash = block.hash().clone();
|
||||
fn send(&mut self, block: ReadBlock) -> bool {
|
||||
if self.send_to_consumer.send(Ok(block)).is_err() {
|
||||
self.consumer_dropped = true;
|
||||
return false;
|
||||
}
|
||||
self.last_emitted_hash = Some(hash);
|
||||
self.next_offset += 1;
|
||||
true
|
||||
}
|
||||
|
||||
@@ -1,18 +1,12 @@
|
||||
//! Tail pipeline: single-threaded reverse scan of the newest blk
|
||||
//! files, reading each file in `TAIL_CHUNK`-sized slices from tail
|
||||
//! to head so we only touch bytes covering the canonical window.
|
||||
//! Matches fill offset slots and are emitted forward with an inline
|
||||
//! chain check.
|
||||
|
||||
use std::{fs::File, ops::ControlFlow, os::unix::fs::FileExt};
|
||||
|
||||
use brk_error::{Error, Result};
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{Height, ReadBlock};
|
||||
use brk_types::{BlockHash, Height, ReadBlock};
|
||||
use crossbeam::channel::Sender;
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, BlockHash, OUT_OF_ORDER_FILE_BACKOFF, XORBytes, bisect,
|
||||
BlkIndexToBlkPath, OUT_OF_ORDER_FILE_BACKOFF, XORBytes, bisect,
|
||||
canonical::CanonicalRange,
|
||||
parse::{parse_canonical_body, peek_canonical},
|
||||
scan::scan_bytes,
|
||||
@@ -25,20 +19,14 @@ pub(super) fn pipeline_tail(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
anchor: Option<BlockHash>,
|
||||
send: &Sender<Result<ReadBlock>>,
|
||||
) -> Result<()> {
|
||||
let mut slots: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
|
||||
let mut remaining = canonical.len();
|
||||
let mut parse_failure: Option<Error> = None;
|
||||
// Bailout streak: gives up after OUT_OF_ORDER_FILE_BACKOFF
|
||||
// consecutive files below the canonical window so a permanent
|
||||
// miss doesn't scan the entire chain in reverse.
|
||||
let mut below_floor_streak: usize = 0;
|
||||
|
||||
'files: for (&blk_index, path) in paths.iter().rev() {
|
||||
// If this file's first block is below the lowest still-missing
|
||||
// canonical height, we've walked past the window.
|
||||
if let Some(missing_idx) = slots.iter().position(Option::is_none)
|
||||
&& let Ok(first_height) = bisect::first_block_height(client, path, xor_bytes)
|
||||
{
|
||||
@@ -61,12 +49,6 @@ pub(super) fn pipeline_tail(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Chunked reverse read. `end` is the file position we've
|
||||
// already covered (exclusive). Each iteration reads
|
||||
// [end - TAIL_CHUNK..end] and prepends it to any `spillover`
|
||||
// carried from the previous iteration — the pre-first-magic
|
||||
// bytes of that chunk, which must belong to a block that
|
||||
// started in this earlier region.
|
||||
let mut end = file_len;
|
||||
let mut spillover: Vec<u8> = Vec::new();
|
||||
|
||||
@@ -78,7 +60,6 @@ pub(super) fn pipeline_tail(
|
||||
buf[chunk_len..].copy_from_slice(&spillover);
|
||||
spillover.clear();
|
||||
|
||||
// `buf` now represents file bytes [start..start + buf.len()].
|
||||
let result = scan_bytes(
|
||||
&mut buf,
|
||||
blk_index,
|
||||
@@ -93,6 +74,14 @@ pub(super) fn pipeline_tail(
|
||||
if slots[offset as usize].is_some() {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
if !canonical
|
||||
.verify_prev(offset, &BlockHash::from(header.prev_blockhash))
|
||||
{
|
||||
parse_failure = Some(Error::Internal(
|
||||
"tail pipeline: canonical batch stitched across a reorg",
|
||||
));
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
let height = Height::from(*canonical.start + offset);
|
||||
match parse_canonical_body(
|
||||
block_bytes.to_vec(),
|
||||
@@ -126,9 +115,8 @@ pub(super) fn pipeline_tail(
|
||||
break 'files;
|
||||
}
|
||||
|
||||
// Carry pre-first-magic bytes into the next (earlier)
|
||||
// chunk so a block that straddled this chunk's start is
|
||||
// stitched back together.
|
||||
// Carry pre-first-magic bytes into the earlier chunk so a
|
||||
// block straddling the boundary is stitched back together.
|
||||
end = start;
|
||||
if end > 0 {
|
||||
let prefix_len = result.first_magic.unwrap_or(buf.len());
|
||||
@@ -143,22 +131,10 @@ pub(super) fn pipeline_tail(
|
||||
));
|
||||
}
|
||||
|
||||
// Inline chain check; ReorderState would be 130 lines of
|
||||
// machinery for the single-threaded path.
|
||||
let mut last_hash: Option<BlockHash> = anchor;
|
||||
for slot in slots {
|
||||
let block = slot.expect("tail pipeline left a slot empty after `remaining == 0`");
|
||||
if let Some(prev) = &last_hash {
|
||||
let actual_prev = BlockHash::from(block.header.prev_blockhash);
|
||||
if actual_prev != *prev {
|
||||
return Err(Error::Internal(
|
||||
"tail pipeline: canonical batch stitched across a reorg",
|
||||
));
|
||||
}
|
||||
}
|
||||
last_hash = Some(block.hash().clone());
|
||||
if send.send(Ok(block)).is_err() {
|
||||
return Ok(()); // consumer dropped — clean exit
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -6,9 +6,9 @@ use crate::{XORBytes, XORIndex, xor_bytes::XOR_LEN};
|
||||
|
||||
const MAGIC_BYTES: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9];
|
||||
|
||||
/// Returns the position **immediately after** the matched magic, or
|
||||
/// Returns the position immediately after the matched magic, or
|
||||
/// `None` if no match. Advances `xor_i` by the bytes consumed either
|
||||
/// way. First-byte fast-fail keeps the inner loop tight.
|
||||
/// way.
|
||||
pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes) -> Option<usize> {
|
||||
let len = bytes.len();
|
||||
if len < MAGIC_BYTES.len() {
|
||||
@@ -42,17 +42,13 @@ pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes
|
||||
None
|
||||
}
|
||||
|
||||
/// Position (relative to `buf`) of the first matched magic byte.
|
||||
/// Used by the chunked tail pipeline to carry pre-first-magic bytes
|
||||
/// into the next (earlier) chunk.
|
||||
pub(crate) struct ScanResult {
|
||||
pub first_magic: Option<usize>,
|
||||
}
|
||||
|
||||
/// Scans `buf` for blocks and calls `on_block` for each. `file_offset`
|
||||
/// is the absolute file position of `buf[0]` — used to seed the XOR
|
||||
/// phase and to report absolute `BlkPosition`s so the chunked tail
|
||||
/// pipeline can read mid-file slices.
|
||||
/// is the absolute file position of `buf[0]`, used to seed the XOR
|
||||
/// phase and to report absolute `BlkPosition`s.
|
||||
pub(crate) fn scan_bytes(
|
||||
buf: &mut [u8],
|
||||
blk_index: u16,
|
||||
|
||||
Reference in New Issue
Block a user