diff --git a/crates/brk_reader/src/bisect.rs b/crates/brk_reader/src/bisect.rs index ceba50f63..ab4028954 100644 --- a/crates/brk_reader/src/bisect.rs +++ b/crates/brk_reader/src/bisect.rs @@ -1,7 +1,3 @@ -//! Helpers for picking where to start scanning: probe the first -//! block of a file ([`first_block_height`]) and bisect the blk-index -//! map for a target height ([`find_start_blk_index`]). - use std::{fs::File, io::Read, path::Path}; use bitcoin::{block::Header, consensus::Decodable}; @@ -17,8 +13,6 @@ use crate::{ const PROBE_BUF_LEN: usize = 4096; -/// Decodes the first block in `blk_path` and resolves its height via -/// RPC. One short read + one RPC. pub(crate) fn first_block_height( client: &Client, blk_path: &Path, @@ -32,15 +26,11 @@ pub(crate) fn first_block_height( let magic_end = find_magic(&buf[..n], &mut xor_i, xor_bytes) .ok_or_else(|| Error::NotFound("No magic bytes found".into()))?; - // Decode the 4-byte size + 80-byte header in one pass; the size - // is discarded. Bounds-check first so a corrupt file whose only - // magic-shaped bytes sit at the end of the probe doesn't index - // past `n`. let header_end = magic_end + 4 + HEADER_LEN; if header_end > n { warn!( "first_block_height: {} has magic-shaped bytes at offset {} but \ - not enough room in the {}-byte probe to decode the header — \ + not enough room in the {}-byte probe to decode the header, \ the file is probably corrupt", blk_path.display(), magic_end - 4, @@ -59,15 +49,11 @@ pub(crate) fn first_block_height( Ok(Height::new(height)) } -/// Bisects the map for the file whose first block height is ≤ -/// `target_start`, then backs off [`OUT_OF_ORDER_FILE_BACKOFF`] files. -/// Always returns a valid blk index — read errors mid-search log and -/// fall through to the backoff (or to 0 if the map is empty). -/// -/// On a transient read error we **break** rather than `left = mid + 1`: -/// the height bound at `mid` is unknown, so any further bisection on -/// that side could skip valid lower indices. Falling through to the -/// backoff still gives a safe lower bound. +/// Bisects for the file whose first block height is <= `target_start`, +/// then backs off [`OUT_OF_ORDER_FILE_BACKOFF`] files. A transient +/// read error mid-bisect breaks out rather than narrowing further: +/// the bound at `mid` is unknown, so any further step could skip +/// valid lower indices. The backoff still provides a safe lower bound. pub(crate) fn find_start_blk_index( client: &Client, target_start: Height, diff --git a/crates/brk_reader/src/canonical.rs b/crates/brk_reader/src/canonical.rs index 219408237..094f9b142 100644 --- a/crates/brk_reader/src/canonical.rs +++ b/crates/brk_reader/src/canonical.rs @@ -1,38 +1,32 @@ -//! `CanonicalRange`: every canonical block hash in a height window, -//! pre-fetched once via [`brk_rpc::Client::get_block_hashes_range`]. -//! Used as the authoritative "is this block on the main chain?" -//! filter so the scan pipeline never needs a per-block RPC call. - use brk_error::Result; use brk_rpc::Client; use brk_types::{BlockHash, Height}; use rustc_hash::FxHashMap; -/// Keyed on the full 32-byte hash because a prefix collision would -/// silently drop both blocks; the ~24 MB extra RAM is negligible -/// against the 128 MB blk reads happening in parallel. +/// Keyed on the full 32-byte hash: a prefix collision would +/// silently drop both blocks. pub struct CanonicalRange { pub start: Height, + anchor: Option, by_hash: FxHashMap, } impl CanonicalRange { - /// Resolves canonical hashes for every height strictly after - /// `anchor` up to `tip` inclusive. `anchor = None` starts at - /// genesis. pub fn walk(client: &Client, anchor: Option<&BlockHash>, tip: Height) -> Result { let start = match anchor { Some(hash) => Height::from(client.get_block_header_info(hash)?.height + 1), None => Height::ZERO, }; - Self::between(client, start, tip) + let mut range = Self::between(client, start, tip)?; + range.anchor = anchor.cloned(); + Ok(range) } - /// Resolves canonical hashes for every height in `start..=end`. pub fn between(client: &Client, start: Height, end: Height) -> Result { if start > end { return Ok(Self { start, + anchor: None, by_hash: FxHashMap::default(), }); } @@ -42,7 +36,11 @@ impl CanonicalRange { .enumerate() .map(|(i, h)| (h, i as u32)) .collect(); - Ok(Self { start, by_hash }) + Ok(Self { + start, + anchor: None, + by_hash, + }) } pub fn len(&self) -> usize { @@ -53,9 +51,18 @@ impl CanonicalRange { self.by_hash.is_empty() } - /// Offset-from-`start` of `hash` iff it's on the canonical chain. #[inline] pub(crate) fn offset_of(&self, hash: &BlockHash) -> Option { self.by_hash.get(hash).copied() } + + /// `prev_hash` must match the canonical hash at `offset - 1`, or + /// the anchor when `offset == 0`. + #[inline] + pub(crate) fn verify_prev(&self, offset: u32, prev_hash: &BlockHash) -> bool { + match offset { + 0 => self.anchor.as_ref().is_none_or(|a| a == prev_hash), + _ => self.offset_of(prev_hash) == Some(offset - 1), + } + } } diff --git a/crates/brk_reader/src/lib.rs b/crates/brk_reader/src/lib.rs index df4c22ddb..b6a5fe734 100644 --- a/crates/brk_reader/src/lib.rs +++ b/crates/brk_reader/src/lib.rs @@ -30,27 +30,19 @@ pub use canonical::CanonicalRange; pub use xor_bytes::*; pub use xor_index::*; -/// Files of out-of-order play to tolerate. bitcoind sometimes writes -/// blocks slightly out of height order across files (initial sync, -/// headers-first body fetch, reindex), so a single "out of bounds" -/// signal isn't enough to declare failure. Used by the forward -/// bisection backoff and the tail bailout streak. +/// bitcoind writes blocks slightly out of height order across files +/// during initial sync, headers-first body fetch, and reindex, so a +/// single "out of bounds" signal isn't enough to declare failure. pub(crate) const OUT_OF_ORDER_FILE_BACKOFF: usize = 21; const TARGET_NOFILE: u64 = 15_000; /// Bitcoin Core blk-file reader. Cheap to clone (`Arc`-backed) and -/// thread-safe: every method takes `&self` and the -/// `Receiver>` from the streaming API can be -/// drained from any thread. +/// thread-safe. #[derive(Debug, Clone)] pub struct Reader(Arc); impl Reader { - /// Raises the per-process `NOFILE` limit so the file-handle cache - /// can keep one open `File` per `blkNNNNN.dat`. For tests or - /// embeddings that don't want the process-wide rlimit side - /// effect, use [`Self::new_without_rlimit`]. pub fn new(blocks_dir: PathBuf, client: &Client) -> Self { Self::raise_fd_limit(); Self::new_without_rlimit(blocks_dir, client) @@ -65,13 +57,9 @@ impl Reader { })) } - /// Called automatically by [`Self::new`]. Exposed so callers - /// using [`Self::new_without_rlimit`] can opt in once. - /// - /// Raises **only the soft limit**, clamped to the current hard - /// limit — raising the hard limit requires `CAP_SYS_RESOURCE` - /// and would fail (dropping the entire call) on containers and - /// unprivileged macOS user processes. + /// Raises only the soft limit, clamped to the current hard limit: + /// raising the hard limit requires `CAP_SYS_RESOURCE` and would + /// fail on containers and unprivileged macOS processes. pub fn raise_fd_limit() { let (soft, hard) = rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap_or((0, 0)); let new_soft = soft.max(TARGET_NOFILE).min(hard); @@ -95,15 +83,10 @@ impl Reader { self.0.xor_bytes } - /// Decode the first block in `blk_path` and resolve its height - /// via RPC. Exposed for inspection tools (see - /// `examples/blk_heights.rs`). pub fn first_block_height(&self, blk_path: &Path, xor_bytes: XORBytes) -> Result { bisect::first_block_height(&self.0.client, blk_path, xor_bytes) } - /// `read_exact_at` so a short read becomes a hard error instead - /// of silent corruption from the buffer's zero-init tail. pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result> { let file = self.0.open_blk(position.blk_index())?; let mut buffer = vec![0u8; size]; @@ -112,8 +95,6 @@ impl Reader { Ok(buffer) } - /// Streaming `Read` at `position`. Holds an `Arc` so the - /// cache lock isn't held across the I/O. pub fn reader_at(&self, position: BlkPosition) -> Result { let file = self.0.open_blk(position.blk_index())?; Ok(BlkRead { @@ -124,16 +105,18 @@ impl Reader { }) } + /// Streams every canonical block from genesis to the current + /// chain tip. + pub fn all(&self) -> Result>> { + self.after(None) + } + /// Streams every canonical block strictly after `hash` (or from /// genesis when `None`) up to the current chain tip. pub fn after(&self, hash: Option) -> Result>> { self.after_with(hash, pipeline::DEFAULT_PARSER_THREADS) } - /// Like [`after`](Self::after) with a configurable parser-thread - /// count. The default of 1 reader + 1 parser leaves the rest of - /// the cores for the indexer; bench tools that drain the channel - /// cheaply can override. pub fn after_with( &self, hash: Option, @@ -141,7 +124,7 @@ impl Reader { ) -> Result>> { let tip = self.0.client.get_last_height()?; let canonical = CanonicalRange::walk(&self.0.client, hash.as_ref(), tip)?; - pipeline::spawn(self.0.clone(), canonical, hash, parser_threads) + pipeline::spawn(self.0.clone(), canonical, parser_threads) } /// Inclusive height range `start..=end` in canonical order. @@ -162,20 +145,14 @@ impl Reader { ))); } let canonical = CanonicalRange::between(&self.0.client, start, end)?; - // No anchor: caller asked for "blocks at heights X..=Y", they - // get whatever bitcoind says is canonical there. - pipeline::spawn(self.0.clone(), canonical, None, parser_threads) + pipeline::spawn(self.0.clone(), canonical, parser_threads) } } -/// `pub(crate)` so `pipeline` can capture it via `Arc` -/// for spawned workers; everything else goes through `Reader`. #[derive(Debug)] pub(crate) struct ReaderInner { - /// Invalidated on every [`refresh_paths`](Self::refresh_paths) so - /// a pruned/reindexed blk file can't keep serving stale bytes - /// from a dead inode. `Arc` lets us hand out cheap clones - /// without holding the cache lock during I/O. + /// Invalidated on every `refresh_paths` so a pruned or reindexed + /// blk file can't keep serving stale bytes from a dead inode. blk_file_cache: RwLock>>, pub(crate) xor_bytes: XORBytes, pub(crate) blocks_dir: PathBuf, @@ -183,20 +160,12 @@ pub(crate) struct ReaderInner { } impl ReaderInner { - /// Rescan the blocks directory and drop the file-handle cache in - /// the same critical section. Old `Arc`s already in flight - /// stay valid until their last drop; new lookups go through the - /// fresh inode. pub(crate) fn refresh_paths(&self) -> Result { let paths = BlkIndexToBlkPath::scan(&self.blocks_dir)?; self.blk_file_cache.write().clear(); Ok(paths) } - /// The blk path is deterministic (`/blkNNNNN.dat`), - /// so we don't need a directory scan to resolve it. Two threads - /// racing on a missing entry will both call `File::open`; the - /// loser's `Arc` is dropped via `or_insert`. fn open_blk(&self, blk_index: u16) -> Result> { if let Some(file) = self.blk_file_cache.read().get(&blk_index).cloned() { return Ok(file); @@ -208,8 +177,6 @@ impl ReaderInner { } } -/// Streaming reader at a position in a blk file. Holds an `Arc` -/// so it doesn't lock the file cache while the consumer is reading. pub struct BlkRead { file: Arc, offset: u64, diff --git a/crates/brk_reader/src/parse.rs b/crates/brk_reader/src/parse.rs index de593b9f3..4dd8cb783 100644 --- a/crates/brk_reader/src/parse.rs +++ b/crates/brk_reader/src/parse.rs @@ -1,6 +1,3 @@ -//! Block parsing — XOR decoding, header peek, full body parse. Split -//! so the scan loop can reject non-canonical blocks before copying. - use std::io::Cursor; use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable}; @@ -11,10 +8,8 @@ use crate::{XORBytes, XORIndex, canonical::CanonicalRange}; pub(crate) const HEADER_LEN: usize = 80; -/// Cheap canonical-membership check. Decodes the header onto a stack -/// buffer so `bytes` stays untouched (the parser later re-XORs the -/// full block from the original phase). Returning the parsed header -/// lets the body parse skip a second decode. +/// Decodes the header onto a stack buffer so `bytes` stays untouched: +/// the body parse later re-XORs the full block from the original phase. pub(crate) fn peek_canonical( bytes: &[u8], mut xor_state: XORIndex, @@ -32,8 +27,6 @@ pub(crate) fn peek_canonical( Some((offset, header)) } -/// Full XOR-decode + body parse. Takes the previously-parsed `header` -/// from `peek_canonical` so we don't re-parse it. pub(crate) fn parse_canonical_body( mut bytes: Vec, metadata: BlkMetadata, @@ -52,12 +45,10 @@ pub(crate) fn parse_canonical_body( let mut cursor = Cursor::new(bytes); cursor.set_position(HEADER_LEN as u64); - // `consensus_decode_from_finite_reader` skips the `Take` wrap - // that `consensus_decode` applies to every nested field for - // memory-safety — our cursor is already a bounded `Vec`, so - // the extra wrapping is pure overhead. Per the crate docs it's - // "marginally faster", but for a ~2000-tx block the per-field - // compounding adds up. + // `from_finite_reader` skips the `Take` wrap that + // `consensus_decode` applies to every nested field for memory + // safety: our cursor is already a bounded `Vec`, so the + // wrapping is pure overhead and compounds across ~2000 tx fields. let tx_count = VarInt::consensus_decode_from_finite_reader(&mut cursor)?.0 as usize; let mut txdata = Vec::with_capacity(tx_count); let mut tx_metadata = Vec::with_capacity(tx_count); diff --git a/crates/brk_reader/src/pipeline/forward.rs b/crates/brk_reader/src/pipeline/forward.rs index ae71ed603..147025399 100644 --- a/crates/brk_reader/src/pipeline/forward.rs +++ b/crates/brk_reader/src/pipeline/forward.rs @@ -1,9 +1,3 @@ -//! Forward pipeline: 1 reader thread + N scoped parser threads. -//! Reader walks blk files from a bisection lower bound, peeks each -//! block's header against `CanonicalRange`, and ships hits to the -//! parser pool. Parsers decode bodies in parallel and emit in-order -//! through `ReorderState`. - use std::{fs, ops::ControlFlow, sync::OnceLock, thread}; use bitcoin::block::Header; @@ -21,8 +15,6 @@ use crate::{ scan::scan_bytes, }; -/// Reader→parser channel message. `header` was decoded during the -/// peek and is reused so the parser doesn't redo it. struct ScannedBlock { metadata: BlkMetadata, bytes: Vec, @@ -31,9 +23,6 @@ struct ScannedBlock { header: Header, } -/// Single shared signal carrying both the cancel flag and (when set -/// to `Failed`) the first parse error. `stop.get().is_some()` is the -/// reader's cheap "should I stop" check. enum Stop { Done, Failed(Error), @@ -44,12 +33,11 @@ pub(super) fn pipeline_forward( first_blk_index: u16, xor_bytes: XORBytes, canonical: &CanonicalRange, - anchor: Option, send: &Sender>, parser_threads: usize, ) -> Result<()> { let (parser_send, parser_recv) = bounded::(CHANNEL_CAPACITY); - let reorder = Mutex::new(ReorderState::new(send.clone(), anchor)); + let reorder = Mutex::new(ReorderState::new(send.clone())); let stop: OnceLock = OnceLock::new(); thread::scope(|scope| { @@ -57,14 +45,10 @@ pub(super) fn pipeline_forward( let parser_recv = parser_recv.clone(); scope.spawn(|| parser_loop(parser_recv, &reorder, &stop, canonical, xor_bytes)); } - // Every parser owns its own clone; ours would otherwise leak - // a dangling receiver. drop(parser_recv); let read_result = read_and_dispatch(paths, first_blk_index, xor_bytes, canonical, &parser_send, &stop); - // End-of-input signal so parser `for` loops exit and the - // scope can join. drop(parser_send); read_result })?; @@ -75,8 +59,6 @@ pub(super) fn pipeline_forward( reorder.into_inner().finalize(canonical.len()) } -/// Per-thread parser body: drain `parser_recv`, decode each block, -/// emit through `reorder`. Stops on `stop`. fn parser_loop( parser_recv: Receiver, reorder: &Mutex, @@ -116,9 +98,6 @@ fn parser_loop( } } -/// `peek_canonical` filters orphans **before** the block bytes are -/// cloned, so a sparse catchup avoids allocating for the ~99% of -/// blocks outside the window. fn read_and_dispatch( paths: &BlkIndexToBlkPath, first_blk_index: u16, @@ -146,6 +125,14 @@ fn read_and_dispatch( else { return ControlFlow::Continue(()); }; + if !canonical + .verify_prev(canonical_offset, &BlockHash::from(header.prev_blockhash)) + { + let _ = stop.set(Stop::Failed(Error::Internal( + "forward pipeline: canonical batch stitched across a reorg", + ))); + return ControlFlow::Break(()); + } let scanned = ScannedBlock { metadata, bytes: block_bytes.to_vec(), diff --git a/crates/brk_reader/src/pipeline/mod.rs b/crates/brk_reader/src/pipeline/mod.rs index 5761e206c..2cc7ad727 100644 --- a/crates/brk_reader/src/pipeline/mod.rs +++ b/crates/brk_reader/src/pipeline/mod.rs @@ -1,22 +1,8 @@ -//! Two-strategy block-streaming pipeline. [`spawn`] picks between: -//! -//! * **forward** — one reader thread walks blk files in order from a -//! bisection lower bound; canonical hits ship to a parser pool that -//! emits in-order through [`reorder::ReorderState`]. -//! * **tail** — single-threaded reverse scan of the newest blk files, -//! buffering matches in offset slots, then emitting forward with -//! an inline chain check. -//! -//! Both strategies verify `block.header.prev_blockhash` against the -//! previously emitted block — and against the user-supplied `anchor` -//! for the very first block — and propagate a final `Err` to the -//! consumer on chain breaks, parse failures, or missing blocks. - use std::{sync::Arc, thread}; use brk_error::Result; use brk_rpc::Client; -use brk_types::{BlockHash, Height, ReadBlock}; +use brk_types::{Height, ReadBlock}; use crossbeam::channel::{Receiver, bounded}; use crate::{ @@ -30,16 +16,13 @@ mod tail; pub(crate) const CHANNEL_CAPACITY: usize = 50; -/// If `canonical.start` lives within this many files of the chain -/// tip, use the reverse-scan pipeline. The forward pipeline pays the -/// bisection + 21-file backoff (~2.7 GB of reads) regardless of how -/// few canonical blocks live in the window, so for any tip-clustered -/// catchup the tail wins until the window grows past this many files. +/// Forward pays the bisection + 21-file backoff (~2.7 GB of reads) +/// regardless of how few canonical blocks live in the window, so +/// tail wins for any catchup within this many files of the tip. const TAIL_DISTANCE_FILES: usize = 8; /// The indexer is CPU-bound on the consumer side, so 1 reader + 1 -/// parser leaves the rest of the cores for it. Bench tools that -/// drain the channel cheaply can override. +/// parser leaves the rest of the cores for it. pub(crate) const DEFAULT_PARSER_THREADS: usize = 1; enum Strategy { @@ -47,21 +30,11 @@ enum Strategy { Forward { first_blk_index: u16 }, } -/// `anchor`, when supplied, is the hash the consumer expects to be -/// the **parent** of the first emitted block. Seeded into the chain -/// check so a stale `Reader::after` anchor (e.g. the tip of a -/// reorged-out chain) cannot silently produce a stitched stream. -/// `None` skips the check (genesis or `range`-style calls have no -/// anchor to verify against). pub(crate) fn spawn( reader: Arc, canonical: CanonicalRange, - anchor: Option, parser_threads: usize, ) -> Result>> { - // Cap at the parser channel capacity: beyond that, extra parsers - // are idle (they all contend for the same buffered items) and - // absurd inputs would otherwise OOM the scoped spawn. let parser_threads = parser_threads.clamp(1, CHANNEL_CAPACITY); if canonical.is_empty() { @@ -77,20 +50,18 @@ pub(crate) fn spawn( thread::spawn(move || { let result = match strategy { Strategy::Tail => { - tail::pipeline_tail(&reader.client, &paths, xor_bytes, &canonical, anchor, &send) + tail::pipeline_tail(&reader.client, &paths, xor_bytes, &canonical, &send) } Strategy::Forward { first_blk_index } => forward::pipeline_forward( &paths, first_blk_index, xor_bytes, &canonical, - anchor, &send, parser_threads, ), }; if let Err(e) = result { - // No-op if the consumer already dropped the receiver. let _ = send.send(Err(e)); } }); @@ -98,11 +69,6 @@ pub(crate) fn spawn( Ok(recv) } -/// Tail iff one of the last `TAIL_DISTANCE_FILES` files starts at a -/// height ≤ `canonical_start`; that file is where tail iteration -/// would land. Otherwise bisect for the forward start. Genesis-rooted -/// catchups skip the tail probes since no file's first block is ≤ -/// genesis. fn pick_strategy( client: &Client, paths: &BlkIndexToBlkPath, diff --git a/crates/brk_reader/src/pipeline/reorder.rs b/crates/brk_reader/src/pipeline/reorder.rs index d55876823..20cc44432 100644 --- a/crates/brk_reader/src/pipeline/reorder.rs +++ b/crates/brk_reader/src/pipeline/reorder.rs @@ -1,55 +1,28 @@ -//! In-order emission buffer + chain-continuity check used by the -//! forward pipeline. Parsers complete blocks out of order, so this -//! parks ahead-of-line matches in `pending` until `next_offset` -//! catches up. - use std::cmp::Ordering; use brk_error::{Error, Result}; -use brk_types::{BlockHash, ReadBlock}; +use brk_types::ReadBlock; use crossbeam::channel::Sender; use rustc_hash::FxHashMap; -use tracing::warn; -/// Accessed by the parser pool under a `parking_lot::Mutex` owned by -/// `pipeline_forward`; at `parser_threads = 1` the lock is always -/// uncontended. pub(super) struct ReorderState { pub(super) next_offset: u32, pending: FxHashMap, send_to_consumer: Sender>, - /// Seeded with the user-supplied anchor so the first emit is - /// also verified against it. - last_emitted_hash: Option, - /// A `prev_blockhash` mismatch fires this; converted into a - /// final `Err` by `finalize`. - chain_broken: bool, - /// Distinguishes "consumer cancelled" from "ran out of work - /// early" in the missing-blocks check inside `finalize`. consumer_dropped: bool, } impl ReorderState { - pub(super) fn new(send_to_consumer: Sender>, anchor: Option) -> Self { + pub(super) fn new(send_to_consumer: Sender>) -> Self { Self { next_offset: 0, pending: FxHashMap::default(), send_to_consumer, - last_emitted_hash: anchor, - chain_broken: false, consumer_dropped: false, } } - /// Resolves the pipeline's exit state. Called by - /// `pipeline_forward` after the read loop has finished and all - /// parser threads have joined. pub(super) fn finalize(self, expected_count: usize) -> Result<()> { - if self.chain_broken { - return Err(Error::Internal( - "forward pipeline: canonical batch stitched across a reorg", - )); - } if !self.consumer_dropped && (self.next_offset as usize) < expected_count { return Err(Error::Internal( "forward pipeline: blk files missing canonical blocks", @@ -58,18 +31,14 @@ impl ReorderState { Ok(()) } - /// Emits `block` if it's the next expected offset (and drains - /// any contiguous pending matches), otherwise parks it. Returns - /// `false` once the pipeline should stop (consumer drop or chain - /// break). pub(super) fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool { match offset.cmp(&self.next_offset) { Ordering::Equal => { - if !self.send_in_order(block) { + if !self.send(block) { return false; } while let Some(next) = self.pending.remove(&self.next_offset) { - if !self.send_in_order(next) { + if !self.send(next) { return false; } } @@ -79,31 +48,15 @@ impl ReorderState { self.pending.insert(offset, block); true } - // Each canonical hash appears at exactly one offset and - // each block is parsed once, so this is unreachable in - // practice. Ordering::Less => true, } } - fn send_in_order(&mut self, block: ReadBlock) -> bool { - if let Some(last) = &self.last_emitted_hash { - let prev = BlockHash::from(block.header.prev_blockhash); - if prev != *last { - warn!( - "canonical chain broken at offset {}: expected prev={} got {}", - self.next_offset, last, prev, - ); - self.chain_broken = true; - return false; - } - } - let hash = block.hash().clone(); + fn send(&mut self, block: ReadBlock) -> bool { if self.send_to_consumer.send(Ok(block)).is_err() { self.consumer_dropped = true; return false; } - self.last_emitted_hash = Some(hash); self.next_offset += 1; true } diff --git a/crates/brk_reader/src/pipeline/tail.rs b/crates/brk_reader/src/pipeline/tail.rs index 05c29ca45..3174f53cc 100644 --- a/crates/brk_reader/src/pipeline/tail.rs +++ b/crates/brk_reader/src/pipeline/tail.rs @@ -1,18 +1,12 @@ -//! Tail pipeline: single-threaded reverse scan of the newest blk -//! files, reading each file in `TAIL_CHUNK`-sized slices from tail -//! to head so we only touch bytes covering the canonical window. -//! Matches fill offset slots and are emitted forward with an inline -//! chain check. - use std::{fs::File, ops::ControlFlow, os::unix::fs::FileExt}; use brk_error::{Error, Result}; use brk_rpc::Client; -use brk_types::{Height, ReadBlock}; +use brk_types::{BlockHash, Height, ReadBlock}; use crossbeam::channel::Sender; use crate::{ - BlkIndexToBlkPath, BlockHash, OUT_OF_ORDER_FILE_BACKOFF, XORBytes, bisect, + BlkIndexToBlkPath, OUT_OF_ORDER_FILE_BACKOFF, XORBytes, bisect, canonical::CanonicalRange, parse::{parse_canonical_body, peek_canonical}, scan::scan_bytes, @@ -25,20 +19,14 @@ pub(super) fn pipeline_tail( paths: &BlkIndexToBlkPath, xor_bytes: XORBytes, canonical: &CanonicalRange, - anchor: Option, send: &Sender>, ) -> Result<()> { let mut slots: Vec> = (0..canonical.len()).map(|_| None).collect(); let mut remaining = canonical.len(); let mut parse_failure: Option = None; - // Bailout streak: gives up after OUT_OF_ORDER_FILE_BACKOFF - // consecutive files below the canonical window so a permanent - // miss doesn't scan the entire chain in reverse. let mut below_floor_streak: usize = 0; 'files: for (&blk_index, path) in paths.iter().rev() { - // If this file's first block is below the lowest still-missing - // canonical height, we've walked past the window. if let Some(missing_idx) = slots.iter().position(Option::is_none) && let Ok(first_height) = bisect::first_block_height(client, path, xor_bytes) { @@ -61,12 +49,6 @@ pub(super) fn pipeline_tail( continue; } - // Chunked reverse read. `end` is the file position we've - // already covered (exclusive). Each iteration reads - // [end - TAIL_CHUNK..end] and prepends it to any `spillover` - // carried from the previous iteration — the pre-first-magic - // bytes of that chunk, which must belong to a block that - // started in this earlier region. let mut end = file_len; let mut spillover: Vec = Vec::new(); @@ -78,7 +60,6 @@ pub(super) fn pipeline_tail( buf[chunk_len..].copy_from_slice(&spillover); spillover.clear(); - // `buf` now represents file bytes [start..start + buf.len()]. let result = scan_bytes( &mut buf, blk_index, @@ -93,6 +74,14 @@ pub(super) fn pipeline_tail( if slots[offset as usize].is_some() { return ControlFlow::Continue(()); } + if !canonical + .verify_prev(offset, &BlockHash::from(header.prev_blockhash)) + { + parse_failure = Some(Error::Internal( + "tail pipeline: canonical batch stitched across a reorg", + )); + return ControlFlow::Break(()); + } let height = Height::from(*canonical.start + offset); match parse_canonical_body( block_bytes.to_vec(), @@ -126,9 +115,8 @@ pub(super) fn pipeline_tail( break 'files; } - // Carry pre-first-magic bytes into the next (earlier) - // chunk so a block that straddled this chunk's start is - // stitched back together. + // Carry pre-first-magic bytes into the earlier chunk so a + // block straddling the boundary is stitched back together. end = start; if end > 0 { let prefix_len = result.first_magic.unwrap_or(buf.len()); @@ -143,22 +131,10 @@ pub(super) fn pipeline_tail( )); } - // Inline chain check; ReorderState would be 130 lines of - // machinery for the single-threaded path. - let mut last_hash: Option = anchor; for slot in slots { let block = slot.expect("tail pipeline left a slot empty after `remaining == 0`"); - if let Some(prev) = &last_hash { - let actual_prev = BlockHash::from(block.header.prev_blockhash); - if actual_prev != *prev { - return Err(Error::Internal( - "tail pipeline: canonical batch stitched across a reorg", - )); - } - } - last_hash = Some(block.hash().clone()); if send.send(Ok(block)).is_err() { - return Ok(()); // consumer dropped — clean exit + return Ok(()); } } Ok(()) diff --git a/crates/brk_reader/src/scan.rs b/crates/brk_reader/src/scan.rs index 4bc397cd8..db969dd61 100644 --- a/crates/brk_reader/src/scan.rs +++ b/crates/brk_reader/src/scan.rs @@ -6,9 +6,9 @@ use crate::{XORBytes, XORIndex, xor_bytes::XOR_LEN}; const MAGIC_BYTES: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9]; -/// Returns the position **immediately after** the matched magic, or +/// Returns the position immediately after the matched magic, or /// `None` if no match. Advances `xor_i` by the bytes consumed either -/// way. First-byte fast-fail keeps the inner loop tight. +/// way. pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes) -> Option { let len = bytes.len(); if len < MAGIC_BYTES.len() { @@ -42,17 +42,13 @@ pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes None } -/// Position (relative to `buf`) of the first matched magic byte. -/// Used by the chunked tail pipeline to carry pre-first-magic bytes -/// into the next (earlier) chunk. pub(crate) struct ScanResult { pub first_magic: Option, } /// Scans `buf` for blocks and calls `on_block` for each. `file_offset` -/// is the absolute file position of `buf[0]` — used to seed the XOR -/// phase and to report absolute `BlkPosition`s so the chunked tail -/// pipeline can read mid-file slices. +/// is the absolute file position of `buf[0]`, used to seed the XOR +/// phase and to report absolute `BlkPosition`s. pub(crate) fn scan_bytes( buf: &mut [u8], blk_index: u16, diff --git a/website/scripts/options/network.js b/website/scripts/options/network.js index 916bdaaae..613fcf951 100644 --- a/website/scripts/options/network.js +++ b/website/scripts/options/network.js @@ -53,15 +53,15 @@ export function createNetworkSection() { { key: "p2pk65", name: "P2PK65", color: st.p2pk65, defaultActive: false }, ]); - // Non-addressable script types + // Non-addressable script types, reverse creation with catch-alls at tail const nonAddressableTypes = /** @type {const} */ ([ - { key: "p2ms", name: "P2MS", color: st.p2ms, defaultActive: false }, { key: "opReturn", name: "OP_RETURN", color: st.opReturn, defaultActive: true, }, + { key: "p2ms", name: "P2MS", color: st.p2ms, defaultActive: false }, { key: "empty", name: "Empty",