mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snapshot
This commit is contained in:
@@ -17,7 +17,7 @@ brk_rpc = { workspace = true, features = ["corepc"] }
|
||||
brk_types = { workspace = true }
|
||||
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
|
||||
derive_more = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
rlimit = "0.11.0"
|
||||
rustc-hash = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -21,22 +21,28 @@ let client = Client::new(
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
|
||||
// Everything from genesis to the current tip
|
||||
for block in reader.after(None)?.iter() {
|
||||
for block in reader.after(None)? {
|
||||
let block = block?;
|
||||
println!("{}: {}", block.height(), block.hash());
|
||||
}
|
||||
|
||||
// Everything strictly after a known hash (typical sync / catchup pattern)
|
||||
for block in reader.after(Some(last_known_hash))?.iter() {
|
||||
for block in reader.after(Some(last_known_hash))? {
|
||||
let block = block?;
|
||||
// ...
|
||||
}
|
||||
|
||||
// A specific inclusive height range
|
||||
for block in reader.range(Height::new(800_000), Height::new(850_000))?.iter() {
|
||||
for block in reader.range(Height::new(800_000), Height::new(850_000))? {
|
||||
let block = block?;
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
`Reader` is thread-safe and cheap to clone (Arc-backed).
|
||||
`Reader` is thread-safe and cheap to clone (Arc-backed). Each item is
|
||||
a `Result<ReadBlock>` so mid-stream failures (chain breaks, parse
|
||||
errors, missing canonical blocks) reach the consumer as a final
|
||||
`Err` instead of being silently dropped.
|
||||
|
||||
## What You Get
|
||||
|
||||
@@ -54,11 +60,19 @@ Each `ReadBlock` gives you access to:
|
||||
|
||||
## How It Works
|
||||
|
||||
Two-stage pipeline, one reader thread plus `N` parser threads
|
||||
(default `N = 1`, configurable via `after_with` / `range_with`):
|
||||
Two strategies, picked per call:
|
||||
|
||||
* **forward** — one reader thread walks blk files in order from a
|
||||
bisection lower bound, ships canonical hits to a parser pool of `N`
|
||||
threads (default `N = 1`, configurable via `after_with` /
|
||||
`range_with`), which decode bodies in parallel and emit in-order.
|
||||
* **tail** — single-threaded reverse scan of the newest blk files,
|
||||
used when the requested range sits within ~8 files of the chain
|
||||
tip. Avoids the forward pipeline's bisection + 21-file backoff
|
||||
(~2.7 GB of reads) for tip-clustered catchups.
|
||||
|
||||
```text
|
||||
canonical chain ──► Reader thread ──► Parser pool ──► Receiver<ReadBlock>
|
||||
canonical chain ──► Reader thread ──► Parser pool ──► Receiver<Result<ReadBlock>>
|
||||
(pre-fetched walks blk files, N workers in canonical order
|
||||
hashes via RPC) peeks headers, decode bodies
|
||||
ships hits
|
||||
@@ -67,15 +81,17 @@ canonical chain ──► Reader thread ──► Parser pool ──► Receiver
|
||||
1. **`CanonicalRange`** asks bitcoind once, up front, for the canonical
|
||||
block hash at every height in the target window — one batched
|
||||
JSON-RPC call, no per-block RPC chatter.
|
||||
2. **Reader thread** walks blk files in order, scans each for block
|
||||
magic, and for every block found hashes its 80-byte header and
|
||||
looks the hash up in the canonical map. Orphans short-circuit
|
||||
before the block bytes are cloned.
|
||||
3. **Parser pool** (scoped threads) fully decodes canonical bodies in
|
||||
parallel and serialises output through an in-order reorder buffer.
|
||||
The consumer always receives blocks in canonical-height order.
|
||||
2. **Reader thread** walks blk files, scans each for block magic, and
|
||||
for every block found hashes its 80-byte header and looks the hash
|
||||
up in the canonical map. Orphans short-circuit before the block
|
||||
bytes are cloned.
|
||||
3. **Parser pool** (scoped threads, forward pipeline only) fully
|
||||
decodes canonical bodies in parallel and serialises output through
|
||||
an in-order reorder buffer that also verifies `prev_blockhash`
|
||||
against the previously-emitted block — and against the user-
|
||||
supplied anchor for the very first block.
|
||||
|
||||
Orphans can never be mistaken for canonical blocks, and a missing
|
||||
canonical block produces a hard error instead of a silent drop. See
|
||||
`src/pipeline.rs` for the orchestration and `src/canonical.rs` for the
|
||||
filter map.
|
||||
canonical block produces a final `Err` to the consumer instead of a
|
||||
silent drop. See `src/pipeline/` for the orchestration and
|
||||
`src/canonical.rs` for the filter map.
|
||||
|
||||
@@ -24,7 +24,9 @@ use std::time::{Duration, Instant};
|
||||
use brk_error::Result;
|
||||
use brk_reader::{Reader, Receiver};
|
||||
use brk_rpc::{Auth, Client};
|
||||
use brk_types::{BlockHash, Height, ReadBlock};
|
||||
use brk_types::{Height, ReadBlock};
|
||||
|
||||
type BlockStream = Receiver<Result<ReadBlock>>;
|
||||
|
||||
const SCENARIOS: &[usize] = &[5, 10, 100, 1_000, 10_000];
|
||||
const REPEATS: usize = 3;
|
||||
@@ -51,7 +53,7 @@ fn main() -> Result<()> {
|
||||
for &n in SCENARIOS {
|
||||
let anchor_height = Height::from(tip.saturating_sub(n as u32));
|
||||
let anchor_hash = client.get_block_hash(*anchor_height as u64)?;
|
||||
let anchor = Some(BlockHash::from(anchor_hash));
|
||||
let anchor = Some(anchor_hash);
|
||||
|
||||
let mut first: Option<RunStats> = None;
|
||||
for &p in PARSER_COUNTS {
|
||||
@@ -113,7 +115,7 @@ struct RunStats {
|
||||
|
||||
fn bench<F>(repeats: usize, mut f: F) -> Result<RunStats>
|
||||
where
|
||||
F: FnMut() -> Result<Receiver<ReadBlock>>,
|
||||
F: FnMut() -> Result<BlockStream>,
|
||||
{
|
||||
let mut best = Duration::MAX;
|
||||
let mut total = Duration::ZERO;
|
||||
@@ -124,6 +126,7 @@ where
|
||||
let recv = f()?;
|
||||
let mut n = 0;
|
||||
for block in recv.iter() {
|
||||
let block = block?;
|
||||
std::hint::black_box(block.height());
|
||||
n += 1;
|
||||
}
|
||||
@@ -175,12 +178,13 @@ struct FullRun {
|
||||
|
||||
fn run_once<F>(mut f: F) -> Result<FullRun>
|
||||
where
|
||||
F: FnMut() -> Result<Receiver<ReadBlock>>,
|
||||
F: FnMut() -> Result<BlockStream>,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let recv = f()?;
|
||||
let mut count = 0;
|
||||
for block in recv.iter() {
|
||||
let block = block?;
|
||||
std::hint::black_box(block.height());
|
||||
count += 1;
|
||||
}
|
||||
@@ -195,12 +199,13 @@ where
|
||||
/// the channel, which unblocks and unwinds the reader's spawned worker.
|
||||
fn run_bounded<F>(limit: usize, mut f: F) -> Result<FullRun>
|
||||
where
|
||||
F: FnMut() -> Result<Receiver<ReadBlock>>,
|
||||
F: FnMut() -> Result<BlockStream>,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let recv = f()?;
|
||||
let mut count = 0;
|
||||
for block in recv.iter().take(limit) {
|
||||
let block = block?;
|
||||
std::hint::black_box(block.height());
|
||||
count += 1;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use brk_error::Result;
|
||||
use brk_reader::Reader;
|
||||
use brk_reader::{BlkIndexToBlkPath, Reader};
|
||||
use brk_rpc::{Auth, Client};
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
let xor_bytes = reader.xor_bytes();
|
||||
let blk_map = reader.blk_index_to_blk_path();
|
||||
let blk_map = BlkIndexToBlkPath::scan(reader.blocks_dir())?;
|
||||
|
||||
let mut prev_height: Option<u32> = None;
|
||||
let mut max_drop: u32 = 0;
|
||||
|
||||
@@ -15,6 +15,7 @@ fn main() -> Result<()> {
|
||||
// Stream all blocks from genesis to the current tip.
|
||||
let i = std::time::Instant::now();
|
||||
for block in reader.after(None)?.iter() {
|
||||
let block = block?;
|
||||
println!("{}: {}", block.height(), block.hash());
|
||||
}
|
||||
println!("Full read: {:?}", i.elapsed());
|
||||
|
||||
@@ -20,6 +20,7 @@ fn main() -> Result<()> {
|
||||
let i = std::time::Instant::now();
|
||||
|
||||
if let Some(block) = reader.range(height, height)?.iter().next() {
|
||||
let block = block?;
|
||||
println!(
|
||||
"height={} hash={} txs={} coinbase=\"{:?}\" ({:?})",
|
||||
block.height(),
|
||||
|
||||
109
crates/brk_reader/src/bisect.rs
Normal file
109
crates/brk_reader/src/bisect.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
//! Helpers for picking where to start scanning: probe the first
|
||||
//! block of a file ([`first_block_height`]) and bisect the blk-index
|
||||
//! map for a target height ([`find_start_blk_index`]).
|
||||
|
||||
use std::{fs::File, io::Read, path::Path};
|
||||
|
||||
use bitcoin::{block::Header, consensus::Decodable};
|
||||
use brk_error::{Error, Result};
|
||||
use brk_rpc::Client;
|
||||
use brk_types::Height;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, OUT_OF_ORDER_FILE_BACKOFF, XORBytes, XORIndex,
|
||||
parse::HEADER_LEN, scan::find_magic,
|
||||
};
|
||||
|
||||
const PROBE_BUF_LEN: usize = 4096;
|
||||
|
||||
/// Decodes the first block in `blk_path` and resolves its height via
|
||||
/// RPC. One short read + one RPC.
|
||||
pub(crate) fn first_block_height(
|
||||
client: &Client,
|
||||
blk_path: &Path,
|
||||
xor_bytes: XORBytes,
|
||||
) -> Result<Height> {
|
||||
let mut file = File::open(blk_path)?;
|
||||
let mut buf = [0u8; PROBE_BUF_LEN];
|
||||
let n = file.read(&mut buf)?;
|
||||
|
||||
let mut xor_i = XORIndex::default();
|
||||
let magic_end = find_magic(&buf[..n], &mut xor_i, xor_bytes)
|
||||
.ok_or_else(|| Error::NotFound("No magic bytes found".into()))?;
|
||||
|
||||
// Decode the 4-byte size + 80-byte header in one pass; the size
|
||||
// is discarded. Bounds-check first so a corrupt file whose only
|
||||
// magic-shaped bytes sit at the end of the probe doesn't index
|
||||
// past `n`.
|
||||
let header_end = magic_end + 4 + HEADER_LEN;
|
||||
if header_end > n {
|
||||
warn!(
|
||||
"first_block_height: {} has magic-shaped bytes at offset {} but \
|
||||
not enough room in the {}-byte probe to decode the header — \
|
||||
the file is probably corrupt",
|
||||
blk_path.display(),
|
||||
magic_end - 4,
|
||||
PROBE_BUF_LEN,
|
||||
);
|
||||
return Err(Error::Parse(format!(
|
||||
"blk file probe truncated before header at {}",
|
||||
blk_path.display()
|
||||
)));
|
||||
}
|
||||
xor_i.bytes(&mut buf[magic_end..header_end], xor_bytes);
|
||||
|
||||
let header = Header::consensus_decode(&mut &buf[magic_end + 4..header_end])?;
|
||||
let height = client.get_block_info(&header.block_hash())?.height as u32;
|
||||
|
||||
Ok(Height::new(height))
|
||||
}
|
||||
|
||||
/// Bisects the map for the file whose first block height is ≤
|
||||
/// `target_start`, then backs off [`OUT_OF_ORDER_FILE_BACKOFF`] files.
|
||||
/// Always returns a valid blk index — read errors mid-search log and
|
||||
/// fall through to the backoff (or to 0 if the map is empty).
|
||||
///
|
||||
/// On a transient read error we **break** rather than `left = mid + 1`:
|
||||
/// the height bound at `mid` is unknown, so any further bisection on
|
||||
/// that side could skip valid lower indices. Falling through to the
|
||||
/// backoff still gives a safe lower bound.
|
||||
pub(crate) fn find_start_blk_index(
|
||||
client: &Client,
|
||||
target_start: Height,
|
||||
paths: &BlkIndexToBlkPath,
|
||||
xor_bytes: XORBytes,
|
||||
) -> u16 {
|
||||
let entries: Vec<(u16, &Path)> = paths.iter().map(|(&i, p)| (i, p.as_path())).collect();
|
||||
if entries.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let mut left = 0;
|
||||
let mut right = entries.len() - 1;
|
||||
let mut best_start_idx = 0;
|
||||
|
||||
while left <= right {
|
||||
let mid = (left + right) / 2;
|
||||
let (blk_index, blk_path) = entries[mid];
|
||||
match first_block_height(client, blk_path, xor_bytes) {
|
||||
Ok(height) if height <= target_start => {
|
||||
best_start_idx = mid;
|
||||
left = mid + 1;
|
||||
}
|
||||
Ok(_) => {
|
||||
if mid == 0 {
|
||||
break;
|
||||
}
|
||||
right = mid - 1;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("find_start_blk_index: read error at blk{blk_index:05}.dat: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let final_idx = best_start_idx.saturating_sub(OUT_OF_ORDER_FILE_BACKOFF);
|
||||
entries[final_idx].0
|
||||
}
|
||||
@@ -4,29 +4,49 @@ use std::{
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use brk_error::{Error, Result};
|
||||
use derive_more::Deref;
|
||||
|
||||
const BLK: &str = "blk";
|
||||
const DOT_DAT: &str = ".dat";
|
||||
|
||||
#[derive(Debug, Clone, Deref, DerefMut)]
|
||||
#[derive(Debug, Default, Clone, Deref)]
|
||||
pub struct BlkIndexToBlkPath(BTreeMap<u16, PathBuf>);
|
||||
|
||||
impl BlkIndexToBlkPath {
|
||||
pub fn scan(blocks_dir: &Path) -> Self {
|
||||
Self(
|
||||
fs::read_dir(blocks_dir)
|
||||
.unwrap()
|
||||
.filter_map(|entry| {
|
||||
let path = entry.unwrap().path();
|
||||
let file_name = path.file_name()?.to_str()?;
|
||||
/// Collects every `blkNNNNN.dat` in `blocks_dir`. Unrelated
|
||||
/// entries (`xor.dat`, `rev*.dat`, `index/`, …) are skipped
|
||||
/// silently; anything that **looks** like a blk file but fails to
|
||||
/// parse or isn't a regular file is a hard error, since silently
|
||||
/// dropping one would leave an undetectable hole in the chain.
|
||||
pub fn scan(blocks_dir: &Path) -> Result<Self> {
|
||||
let mut map = BTreeMap::new();
|
||||
|
||||
let index_str = file_name.strip_prefix(BLK)?.strip_suffix(DOT_DAT)?;
|
||||
let blk_index = index_str.parse::<u16>().ok()?;
|
||||
for entry in fs::read_dir(blocks_dir)? {
|
||||
let path = entry?.path();
|
||||
|
||||
path.is_file().then_some((blk_index, path))
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
let Some(file_name) = path.file_name().and_then(|n| n.to_str()) else {
|
||||
continue;
|
||||
};
|
||||
let Some(index_str) = file_name.strip_prefix(BLK).and_then(|s| s.strip_suffix(DOT_DAT))
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let blk_index = index_str
|
||||
.parse::<u16>()
|
||||
.map_err(|_| Error::Parse(format!("Malformed blk file name: {file_name}")))?;
|
||||
|
||||
if !path.is_file() {
|
||||
return Err(Error::Parse(format!(
|
||||
"blk entry is not a regular file: {}",
|
||||
path.display()
|
||||
)));
|
||||
}
|
||||
|
||||
map.insert(blk_index, path);
|
||||
}
|
||||
|
||||
Ok(Self(map))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,36 +1,28 @@
|
||||
//! `CanonicalRange`: a pre-fetched map from canonical block hash to
|
||||
//! offset-from-`start`. The reader uses this as the authoritative
|
||||
//! filter for "is this block on the main chain?".
|
||||
//!
|
||||
//! Every canonical hash in the target height window is fetched from
|
||||
//! bitcoind up front via [`get_block_hashes_range`], so the scan
|
||||
//! pipeline never needs a per-block RPC call (which is what caused the
|
||||
//! original silent-drop reorg bug).
|
||||
//!
|
||||
//! [`get_block_hashes_range`]: brk_rpc::Client::get_block_hashes_range
|
||||
//! `CanonicalRange`: every canonical block hash in a height window,
|
||||
//! pre-fetched once via [`brk_rpc::Client::get_block_hashes_range`].
|
||||
//! Used as the authoritative "is this block on the main chain?"
|
||||
//! filter so the scan pipeline never needs a per-block RPC call.
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlockHash, BlockHashPrefix, Height};
|
||||
use brk_types::{BlockHash, Height};
|
||||
use rustc_hash::FxHashMap;
|
||||
|
||||
/// Every canonical block hash in a contiguous height window, resolved
|
||||
/// from bitcoind once up front. `hashes[i]` is the canonical hash at
|
||||
/// height `start + i`. Lookups by hash go through `by_prefix` (8-byte
|
||||
/// key, same scheme as `brk_store`) and verify the full hash on hit.
|
||||
/// Keyed on the full 32-byte hash because a prefix collision would
|
||||
/// silently drop both blocks; the ~24 MB extra RAM is negligible
|
||||
/// against the 128 MB blk reads happening in parallel.
|
||||
pub struct CanonicalRange {
|
||||
pub start: Height,
|
||||
hashes: Vec<BlockHash>,
|
||||
by_prefix: FxHashMap<BlockHashPrefix, u32>,
|
||||
by_hash: FxHashMap<BlockHash, u32>,
|
||||
}
|
||||
|
||||
impl CanonicalRange {
|
||||
/// Resolves canonical hashes for every height strictly after
|
||||
/// `anchor` up to `tip` inclusive. `anchor = None` starts at
|
||||
/// genesis.
|
||||
pub fn walk(client: &Client, anchor: Option<BlockHash>, tip: Height) -> Result<Self> {
|
||||
pub fn walk(client: &Client, anchor: Option<&BlockHash>, tip: Height) -> Result<Self> {
|
||||
let start = match anchor {
|
||||
Some(hash) => Height::from(client.get_block_header_info(&hash)?.height + 1),
|
||||
Some(hash) => Height::from(client.get_block_header_info(hash)?.height + 1),
|
||||
None => Height::ZERO,
|
||||
};
|
||||
Self::between(client, start, tip)
|
||||
@@ -41,43 +33,29 @@ impl CanonicalRange {
|
||||
if start > end {
|
||||
return Ok(Self {
|
||||
start,
|
||||
hashes: Vec::new(),
|
||||
by_prefix: FxHashMap::default(),
|
||||
by_hash: FxHashMap::default(),
|
||||
});
|
||||
}
|
||||
|
||||
let hashes = client.get_block_hashes_range(*start, *end)?;
|
||||
let mut by_prefix =
|
||||
FxHashMap::with_capacity_and_hasher(hashes.len(), Default::default());
|
||||
by_prefix.extend(
|
||||
hashes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, h)| (BlockHashPrefix::from(h), i as u32)),
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
start,
|
||||
hashes,
|
||||
by_prefix,
|
||||
})
|
||||
let by_hash = client
|
||||
.get_block_hashes_range(*start, *end)?
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, h)| (h, i as u32))
|
||||
.collect();
|
||||
Ok(Self { start, by_hash })
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.hashes.len()
|
||||
self.by_hash.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.hashes.is_empty()
|
||||
self.by_hash.is_empty()
|
||||
}
|
||||
|
||||
/// Returns the offset-from-`start` of `hash` iff it matches the
|
||||
/// canonical chain in this range. A prefix hit is verified against
|
||||
/// the full hash so prefix collisions from orphaned blocks are
|
||||
/// rejected.
|
||||
/// Offset-from-`start` of `hash` iff it's on the canonical chain.
|
||||
#[inline]
|
||||
pub(crate) fn offset_of(&self, hash: &BlockHash) -> Option<u32> {
|
||||
let offset = *self.by_prefix.get(&BlockHashPrefix::from(hash))?;
|
||||
(self.hashes[offset as usize] == *hash).then_some(offset)
|
||||
self.by_hash.get(hash).copied()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,15 +9,14 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use bitcoin::{block::Header, consensus::Decodable};
|
||||
use blk_index_to_blk_path::*;
|
||||
use brk_error::{Error, Result};
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlkPosition, BlockHash, Height, ReadBlock};
|
||||
pub use crossbeam::channel::Receiver;
|
||||
use derive_more::Deref;
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use parking_lot::RwLock;
|
||||
use tracing::warn;
|
||||
|
||||
mod bisect;
|
||||
mod blk_index_to_blk_path;
|
||||
mod canonical;
|
||||
mod parse;
|
||||
@@ -26,259 +25,201 @@ mod scan;
|
||||
mod xor_bytes;
|
||||
mod xor_index;
|
||||
|
||||
pub use blk_index_to_blk_path::BlkIndexToBlkPath;
|
||||
pub use canonical::CanonicalRange;
|
||||
use scan::*;
|
||||
pub use xor_bytes::*;
|
||||
pub use xor_index::*;
|
||||
|
||||
/// How many blk files to step back from the binary-search hit in
|
||||
/// [`ReaderInner::find_start_blk_index`]. Guards against blocks that
|
||||
/// bitcoind wrote to the "current" file slightly out of height order
|
||||
/// (e.g. the tail of a reorg landing in an earlier file index than
|
||||
/// its successors).
|
||||
const START_BLK_INDEX_BACKOFF: usize = 21;
|
||||
/// Files of out-of-order play to tolerate. bitcoind sometimes writes
|
||||
/// blocks slightly out of height order across files (initial sync,
|
||||
/// headers-first body fetch, reindex), so a single "out of bounds"
|
||||
/// signal isn't enough to declare failure. Used by the forward
|
||||
/// bisection backoff and the tail bailout streak.
|
||||
pub(crate) const OUT_OF_ORDER_FILE_BACKOFF: usize = 21;
|
||||
|
||||
/// Handle to a Bitcoin Core blk-file reader.
|
||||
///
|
||||
/// Cheap to clone (`Arc`-backed) and thread-safe: all streaming
|
||||
/// methods take `&self` and the returned `Receiver<ReadBlock>` can be
|
||||
const TARGET_NOFILE: u64 = 15_000;
|
||||
|
||||
/// Bitcoin Core blk-file reader. Cheap to clone (`Arc`-backed) and
|
||||
/// thread-safe: every method takes `&self` and the
|
||||
/// `Receiver<Result<ReadBlock>>` from the streaming API can be
|
||||
/// drained from any thread.
|
||||
#[derive(Debug, Clone, Deref)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Reader(Arc<ReaderInner>);
|
||||
|
||||
impl Reader {
|
||||
/// Raises the per-process `NOFILE` limit so the file-handle cache
|
||||
/// can keep one open `File` per `blkNNNNN.dat`. For tests or
|
||||
/// embeddings that don't want the process-wide rlimit side
|
||||
/// effect, use [`Self::new_without_rlimit`].
|
||||
pub fn new(blocks_dir: PathBuf, client: &Client) -> Self {
|
||||
Self(Arc::new(ReaderInner::new(blocks_dir, client.clone())))
|
||||
Self::raise_fd_limit();
|
||||
Self::new_without_rlimit(blocks_dir, client)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReaderInner {
|
||||
blk_index_to_blk_path: Arc<RwLock<BlkIndexToBlkPath>>,
|
||||
blk_file_cache: RwLock<BTreeMap<u16, File>>,
|
||||
xor_bytes: XORBytes,
|
||||
blocks_dir: PathBuf,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl ReaderInner {
|
||||
pub fn new(blocks_dir: PathBuf, client: Client) -> Self {
|
||||
let no_file_limit = rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap_or((0, 0));
|
||||
let _ = rlimit::setrlimit(
|
||||
rlimit::Resource::NOFILE,
|
||||
no_file_limit.0.max(15_000),
|
||||
no_file_limit.1,
|
||||
);
|
||||
|
||||
Self {
|
||||
pub fn new_without_rlimit(blocks_dir: PathBuf, client: &Client) -> Self {
|
||||
Self(Arc::new(ReaderInner {
|
||||
xor_bytes: XORBytes::from(blocks_dir.as_path()),
|
||||
blk_index_to_blk_path: Arc::new(RwLock::new(BlkIndexToBlkPath::scan(
|
||||
blocks_dir.as_path(),
|
||||
))),
|
||||
blk_file_cache: RwLock::new(BTreeMap::new()),
|
||||
blocks_dir,
|
||||
client,
|
||||
client: client.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Called automatically by [`Self::new`]. Exposed so callers
|
||||
/// using [`Self::new_without_rlimit`] can opt in once.
|
||||
///
|
||||
/// Raises **only the soft limit**, clamped to the current hard
|
||||
/// limit — raising the hard limit requires `CAP_SYS_RESOURCE`
|
||||
/// and would fail (dropping the entire call) on containers and
|
||||
/// unprivileged macOS user processes.
|
||||
pub fn raise_fd_limit() {
|
||||
let (soft, hard) = rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap_or((0, 0));
|
||||
let new_soft = soft.max(TARGET_NOFILE).min(hard);
|
||||
if new_soft > soft
|
||||
&& let Err(e) = rlimit::setrlimit(rlimit::Resource::NOFILE, new_soft, hard)
|
||||
{
|
||||
warn!("failed to raise NOFILE rlimit: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn client(&self) -> &Client {
|
||||
&self.client
|
||||
&self.0.client
|
||||
}
|
||||
|
||||
pub fn blocks_dir(&self) -> &Path {
|
||||
&self.blocks_dir
|
||||
}
|
||||
|
||||
pub fn blk_index_to_blk_path(&self) -> RwLockReadGuard<'_, BlkIndexToBlkPath> {
|
||||
self.blk_index_to_blk_path.read()
|
||||
&self.0.blocks_dir
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn xor_bytes(&self) -> XORBytes {
|
||||
self.xor_bytes
|
||||
self.0.xor_bytes
|
||||
}
|
||||
|
||||
/// Ensure the blk file for `blk_index` is in the file handle cache.
|
||||
fn ensure_blk_cached(&self, blk_index: u16) -> Result<()> {
|
||||
if self.blk_file_cache.read().contains_key(&blk_index) {
|
||||
return Ok(());
|
||||
}
|
||||
let blk_paths = self.blk_index_to_blk_path();
|
||||
let blk_path = blk_paths
|
||||
.get(&blk_index)
|
||||
.ok_or(Error::NotFound("Blk file not found".into()))?;
|
||||
let file = File::open(blk_path)?;
|
||||
self.blk_file_cache.write().entry(blk_index).or_insert(file);
|
||||
Ok(())
|
||||
/// Decode the first block in `blk_path` and resolve its height
|
||||
/// via RPC. Exposed for inspection tools (see
|
||||
/// `examples/blk_heights.rs`).
|
||||
pub fn first_block_height(&self, blk_path: &Path, xor_bytes: XORBytes) -> Result<Height> {
|
||||
bisect::first_block_height(&self.0.client, blk_path, xor_bytes)
|
||||
}
|
||||
|
||||
/// Read raw bytes from a blk file at the given position with XOR decoding.
|
||||
/// `read_exact_at` so a short read becomes a hard error instead
|
||||
/// of silent corruption from the buffer's zero-init tail.
|
||||
pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result<Vec<u8>> {
|
||||
self.ensure_blk_cached(position.blk_index())?;
|
||||
|
||||
let cache = self.blk_file_cache.read();
|
||||
let file = cache.get(&position.blk_index()).unwrap();
|
||||
let file = self.0.open_blk(position.blk_index())?;
|
||||
let mut buffer = vec![0u8; size];
|
||||
file.read_at(&mut buffer, position.offset() as u64)?;
|
||||
XORIndex::decode_at(&mut buffer, position.offset() as usize, self.xor_bytes);
|
||||
file.read_exact_at(&mut buffer, position.offset() as u64)?;
|
||||
XORIndex::decode_at(&mut buffer, position.offset() as usize, self.0.xor_bytes);
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
/// Returns a `Read` impl positioned at `position` in the blk file.
|
||||
/// Reads only the bytes requested — no upfront allocation.
|
||||
pub fn reader_at(&self, position: BlkPosition) -> Result<BlkRead<'_>> {
|
||||
self.ensure_blk_cached(position.blk_index())?;
|
||||
|
||||
let mut xor_index = XORIndex::default();
|
||||
xor_index.add_assign(position.offset() as usize);
|
||||
|
||||
/// Streaming `Read` at `position`. Holds an `Arc<File>` so the
|
||||
/// cache lock isn't held across the I/O.
|
||||
pub fn reader_at(&self, position: BlkPosition) -> Result<BlkRead> {
|
||||
let file = self.0.open_blk(position.blk_index())?;
|
||||
Ok(BlkRead {
|
||||
cache: self.blk_file_cache.read(),
|
||||
blk_index: position.blk_index(),
|
||||
file,
|
||||
offset: position.offset() as u64,
|
||||
xor_index,
|
||||
xor_bytes: self.xor_bytes,
|
||||
xor_index: XORIndex::at_offset(position.offset() as usize),
|
||||
xor_bytes: self.0.xor_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────
|
||||
// Public streaming API — all calls delegate to `pipeline::spawn`.
|
||||
// ─────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Streams every canonical block strictly after `hash` (or from
|
||||
/// genesis when `None`) up to the current chain tip, in canonical
|
||||
/// order. Uses the default parser-thread count; see
|
||||
/// [`after_with`](Self::after_with) to override.
|
||||
pub fn after(&self, hash: Option<BlockHash>) -> Result<Receiver<ReadBlock>> {
|
||||
/// genesis when `None`) up to the current chain tip.
|
||||
pub fn after(&self, hash: Option<BlockHash>) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
self.after_with(hash, pipeline::DEFAULT_PARSER_THREADS)
|
||||
}
|
||||
|
||||
/// Like [`after`](Self::after) but with a configurable number of
|
||||
/// parser threads. `parser_threads = 1` is the minimal-thread
|
||||
/// default (1 reader + 1 parser, uncontended mutex). Higher values
|
||||
/// trade extra cores for throughput on dense ranges where the
|
||||
/// parser is the bottleneck.
|
||||
/// Like [`after`](Self::after) with a configurable parser-thread
|
||||
/// count. The default of 1 reader + 1 parser leaves the rest of
|
||||
/// the cores for the indexer; bench tools that drain the channel
|
||||
/// cheaply can override.
|
||||
pub fn after_with(
|
||||
&self,
|
||||
hash: Option<BlockHash>,
|
||||
parser_threads: usize,
|
||||
) -> Result<Receiver<ReadBlock>> {
|
||||
let tip = self.client.get_last_height()?;
|
||||
let canonical = CanonicalRange::walk(&self.client, hash, tip)?;
|
||||
pipeline::spawn(self, canonical, parser_threads)
|
||||
) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
let tip = self.0.client.get_last_height()?;
|
||||
let canonical = CanonicalRange::walk(&self.0.client, hash.as_ref(), tip)?;
|
||||
pipeline::spawn(self.0.clone(), canonical, hash, parser_threads)
|
||||
}
|
||||
|
||||
/// Streams every canonical block in the inclusive height range
|
||||
/// `start..=end` in canonical order, via the same pipeline as
|
||||
/// [`after`](Self::after).
|
||||
pub fn range(&self, start: Height, end: Height) -> Result<Receiver<ReadBlock>> {
|
||||
/// Inclusive height range `start..=end` in canonical order.
|
||||
pub fn range(&self, start: Height, end: Height) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
self.range_with(start, end, pipeline::DEFAULT_PARSER_THREADS)
|
||||
}
|
||||
|
||||
/// Like [`range`](Self::range) but with a configurable number of
|
||||
/// parser threads. See [`after_with`](Self::after_with) for the
|
||||
/// parser-count tradeoff.
|
||||
pub fn range_with(
|
||||
&self,
|
||||
start: Height,
|
||||
end: Height,
|
||||
parser_threads: usize,
|
||||
) -> Result<Receiver<ReadBlock>> {
|
||||
let canonical = CanonicalRange::between(&self.client, start, end)?;
|
||||
pipeline::spawn(self, canonical, parser_threads)
|
||||
}
|
||||
|
||||
/// Binary-searches `blk_index_to_blk_path` for the first file
|
||||
/// whose earliest block height is ≤ `target_start`, then backs
|
||||
/// off a few files as a safety margin for blocks that were written
|
||||
/// out of height order (see [`START_BLK_INDEX_BACKOFF`]).
|
||||
fn find_start_blk_index(
|
||||
&self,
|
||||
target_start: Option<Height>,
|
||||
blk_index_to_blk_path: &BlkIndexToBlkPath,
|
||||
xor_bytes: XORBytes,
|
||||
) -> Result<u16> {
|
||||
let Some(target_start) = target_start else {
|
||||
return Ok(0);
|
||||
};
|
||||
|
||||
let blk_indices: Vec<u16> = blk_index_to_blk_path.keys().copied().collect();
|
||||
if blk_indices.is_empty() {
|
||||
return Ok(0);
|
||||
) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
let tip = self.0.client.get_last_height()?;
|
||||
if end > tip {
|
||||
return Err(Error::OutOfRange(format!(
|
||||
"range end {end} is past current tip {tip}"
|
||||
)));
|
||||
}
|
||||
|
||||
let mut left = 0;
|
||||
let mut right = blk_indices.len() - 1;
|
||||
let mut best_start_idx = 0;
|
||||
|
||||
while left <= right {
|
||||
let mid = (left + right) / 2;
|
||||
let blk_index = blk_indices[mid];
|
||||
|
||||
let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) else {
|
||||
break;
|
||||
};
|
||||
match self.first_block_height(blk_path, xor_bytes) {
|
||||
Ok(height) if height <= target_start => {
|
||||
best_start_idx = mid;
|
||||
left = mid + 1;
|
||||
}
|
||||
Ok(_) => {
|
||||
if mid == 0 {
|
||||
break;
|
||||
}
|
||||
right = mid - 1;
|
||||
}
|
||||
Err(_) => {
|
||||
left = mid + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let final_idx = best_start_idx.saturating_sub(START_BLK_INDEX_BACKOFF);
|
||||
Ok(blk_indices.get(final_idx).copied().unwrap_or(0))
|
||||
}
|
||||
|
||||
pub fn first_block_height(
|
||||
&self,
|
||||
blk_path: &Path,
|
||||
xor_bytes: XORBytes,
|
||||
) -> Result<Height> {
|
||||
let mut file = File::open(blk_path)?;
|
||||
let mut buf = [0u8; 4096];
|
||||
let n = file.read(&mut buf)?;
|
||||
|
||||
let mut xor_i = XORIndex::default();
|
||||
let magic_end = find_magic(&buf[..n], &mut xor_i, xor_bytes)
|
||||
.ok_or_else(|| Error::NotFound("No magic bytes found".into()))?;
|
||||
|
||||
let size_end = magic_end + 4;
|
||||
xor_i.bytes(&mut buf[magic_end..size_end], xor_bytes);
|
||||
|
||||
let header_end = size_end + 80;
|
||||
xor_i.bytes(&mut buf[size_end..header_end], xor_bytes);
|
||||
|
||||
let header =
|
||||
Header::consensus_decode(&mut std::io::Cursor::new(&buf[size_end..header_end]))?;
|
||||
|
||||
let height = self.client.get_block_info(&header.block_hash())?.height as u32;
|
||||
|
||||
Ok(Height::new(height))
|
||||
let canonical = CanonicalRange::between(&self.0.client, start, end)?;
|
||||
// No anchor: caller asked for "blocks at heights X..=Y", they
|
||||
// get whatever bitcoind says is canonical there.
|
||||
pipeline::spawn(self.0.clone(), canonical, None, parser_threads)
|
||||
}
|
||||
}
|
||||
|
||||
/// Streaming reader at a position in a blk file. Reads via pread + XOR on demand.
|
||||
pub struct BlkRead<'a> {
|
||||
cache: RwLockReadGuard<'a, BTreeMap<u16, File>>,
|
||||
blk_index: u16,
|
||||
/// `pub(crate)` so `pipeline` can capture it via `Arc<ReaderInner>`
|
||||
/// for spawned workers; everything else goes through `Reader`.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ReaderInner {
|
||||
/// Invalidated on every [`refresh_paths`](Self::refresh_paths) so
|
||||
/// a pruned/reindexed blk file can't keep serving stale bytes
|
||||
/// from a dead inode. `Arc<File>` lets us hand out cheap clones
|
||||
/// without holding the cache lock during I/O.
|
||||
blk_file_cache: RwLock<BTreeMap<u16, Arc<File>>>,
|
||||
pub(crate) xor_bytes: XORBytes,
|
||||
pub(crate) blocks_dir: PathBuf,
|
||||
pub(crate) client: Client,
|
||||
}
|
||||
|
||||
impl ReaderInner {
|
||||
/// Rescan the blocks directory and drop the file-handle cache in
|
||||
/// the same critical section. Old `Arc<File>`s already in flight
|
||||
/// stay valid until their last drop; new lookups go through the
|
||||
/// fresh inode.
|
||||
pub(crate) fn refresh_paths(&self) -> Result<BlkIndexToBlkPath> {
|
||||
let paths = BlkIndexToBlkPath::scan(&self.blocks_dir)?;
|
||||
self.blk_file_cache.write().clear();
|
||||
Ok(paths)
|
||||
}
|
||||
|
||||
/// The blk path is deterministic (`<blocks_dir>/blkNNNNN.dat`),
|
||||
/// so we don't need a directory scan to resolve it. Two threads
|
||||
/// racing on a missing entry will both call `File::open`; the
|
||||
/// loser's `Arc` is dropped via `or_insert`.
|
||||
fn open_blk(&self, blk_index: u16) -> Result<Arc<File>> {
|
||||
if let Some(file) = self.blk_file_cache.read().get(&blk_index).cloned() {
|
||||
return Ok(file);
|
||||
}
|
||||
let path = self.blocks_dir.join(format!("blk{blk_index:05}.dat"));
|
||||
let file = Arc::new(File::open(&path)?);
|
||||
let mut cache = self.blk_file_cache.write();
|
||||
Ok(cache.entry(blk_index).or_insert(file).clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Streaming reader at a position in a blk file. Holds an `Arc<File>`
|
||||
/// so it doesn't lock the file cache while the consumer is reading.
|
||||
pub struct BlkRead {
|
||||
file: Arc<File>,
|
||||
offset: u64,
|
||||
xor_index: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
}
|
||||
|
||||
impl Read for BlkRead<'_> {
|
||||
impl Read for BlkRead {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let file = self.cache.get(&self.blk_index).unwrap();
|
||||
let n = file.read_at(buf, self.offset)?;
|
||||
let n = self.file.read_at(buf, self.offset)?;
|
||||
self.xor_index.bytes(&mut buf[..n], self.xor_bytes);
|
||||
self.offset += n as u64;
|
||||
Ok(n)
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
//! Pure block parsing — XOR decoding, header and body decode.
|
||||
//!
|
||||
//! Split into a cheap header peek and a full body parse so the scan
|
||||
//! loop can reject non-canonical blocks without copying them. No RPC,
|
||||
//! no threading, no state.
|
||||
//! Block parsing — XOR decoding, header peek, full body parse. Split
|
||||
//! so the scan loop can reject non-canonical blocks before copying.
|
||||
|
||||
use std::io::Cursor;
|
||||
|
||||
@@ -12,19 +9,18 @@ use brk_types::{BlkMetadata, Block, BlockHash, Height, ReadBlock};
|
||||
|
||||
use crate::{XORBytes, XORIndex, canonical::CanonicalRange};
|
||||
|
||||
const HEADER_LEN: usize = 80;
|
||||
pub(crate) const HEADER_LEN: usize = 80;
|
||||
|
||||
/// Returns the canonical offset of `bytes` if its header hashes to a
|
||||
/// known canonical block, otherwise `None`. Does not allocate and does
|
||||
/// not mutate `bytes`: the header is copied onto a stack buffer and
|
||||
/// XOR-decoded there so an orphan short-circuits cleanly and a
|
||||
/// canonical hit can still be cloned out intact.
|
||||
pub fn peek_canonical_offset(
|
||||
/// Cheap canonical-membership check. Decodes the header onto a stack
|
||||
/// buffer so `bytes` stays untouched (the parser later re-XORs the
|
||||
/// full block from the original phase). Returning the parsed header
|
||||
/// lets the body parse skip a second decode.
|
||||
pub(crate) fn peek_canonical(
|
||||
bytes: &[u8],
|
||||
mut xor_state: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
) -> Option<u32> {
|
||||
) -> Option<(u32, Header)> {
|
||||
if bytes.len() < HEADER_LEN {
|
||||
return None;
|
||||
}
|
||||
@@ -32,27 +28,30 @@ pub fn peek_canonical_offset(
|
||||
header_buf.copy_from_slice(&bytes[..HEADER_LEN]);
|
||||
xor_state.bytes(&mut header_buf, xor_bytes);
|
||||
let header = Header::consensus_decode(&mut &header_buf[..]).ok()?;
|
||||
canonical.offset_of(&BlockHash::from(header.block_hash()))
|
||||
let offset = canonical.offset_of(&BlockHash::from(header.block_hash()))?;
|
||||
Some((offset, header))
|
||||
}
|
||||
|
||||
/// Full XOR-decode + parse for a block that has already been confirmed
|
||||
/// canonical by `peek_canonical_offset`. Takes owned `bytes` so it can
|
||||
/// mutate them in place and hand them to the resulting `ReadBlock`.
|
||||
pub fn parse_canonical_body(
|
||||
/// Full XOR-decode + body parse. Takes the previously-parsed `header`
|
||||
/// from `peek_canonical` so we don't re-parse it.
|
||||
pub(crate) fn parse_canonical_body(
|
||||
mut bytes: Vec<u8>,
|
||||
metadata: BlkMetadata,
|
||||
mut xor_state: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
height: Height,
|
||||
header: Header,
|
||||
) -> Result<ReadBlock> {
|
||||
if bytes.len() < HEADER_LEN {
|
||||
return Err(Error::Internal("Block bytes shorter than header"));
|
||||
}
|
||||
|
||||
xor_state.bytes(&mut bytes, xor_bytes);
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
let header = Header::consensus_decode(&mut cursor)?;
|
||||
let bitcoin_hash = header.block_hash();
|
||||
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
cursor.set_position(HEADER_LEN as u64);
|
||||
|
||||
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
|
||||
let mut txdata = Vec::with_capacity(tx_count);
|
||||
let mut tx_metadata = Vec::with_capacity(tx_count);
|
||||
|
||||
@@ -1,470 +0,0 @@
|
||||
//! The actual pipeline turning a blk-file scan into an ordered
|
||||
//! `ReadBlock` stream. [`spawn`] picks between two strategies:
|
||||
//!
|
||||
//! * **[`pipeline_forward`]** — one reader thread walks blk files in
|
||||
//! order, peeks each block's header against the pre-fetched
|
||||
//! `CanonicalRange`, and ships canonical hits over an mpmc channel
|
||||
//! to a scoped parser pool of `parser_threads` workers, which decode
|
||||
//! bodies in parallel and serialise emission through a shared
|
||||
//! [`ReorderState`] mutex. Used when the range is larger than
|
||||
//! `TAIL_THRESHOLD`.
|
||||
//! * **[`pipeline_tail`]** — single-threaded reverse scan of the
|
||||
//! newest blk files in 5 MB chunks, buffering every canonical match
|
||||
//! in offset-indexed slots and then emitting through [`ReorderState`]
|
||||
//! in the same order. Used for `canonical.len() <= TAIL_THRESHOLD`,
|
||||
//! where the channel + lock overhead of the forward pipeline would
|
||||
//! dominate.
|
||||
//!
|
||||
//! Both pipelines route emission through [`ReorderState`], which
|
||||
//! verifies `block.header.prev_blockhash` against the previously
|
||||
//! emitted block's hash and aborts cleanly if the canonical-hash batch
|
||||
//! that produced the stream was stitched across a mid-batch reorg.
|
||||
//!
|
||||
//! Canonical blocks can also arrive out of order across blk files
|
||||
//! (bitcoind doesn't write in strict chain order during initial sync,
|
||||
//! headers-first body fetch, or reindex), so the reorder buffer is
|
||||
//! required even at `parser_threads = 1`.
|
||||
|
||||
use std::{
|
||||
fs::{self, File},
|
||||
io::{Read, Seek, SeekFrom},
|
||||
ops::ControlFlow,
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
thread,
|
||||
};
|
||||
|
||||
use brk_error::{Error, Result};
|
||||
use brk_types::{BlkMetadata, BlockHash, Height, ReadBlock};
|
||||
use crossbeam::channel::{Receiver, Sender, bounded};
|
||||
use parking_lot::Mutex;
|
||||
use rustc_hash::FxHashMap;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, ReaderInner, XORBytes, XORIndex,
|
||||
canonical::CanonicalRange,
|
||||
parse::{parse_canonical_body, peek_canonical_offset},
|
||||
scan::scan_bytes,
|
||||
};
|
||||
|
||||
const CHANNEL_CAPACITY: usize = 50;
|
||||
const TAIL_CHUNK: usize = 5 * 1024 * 1024;
|
||||
/// Up to this many canonical blocks → tail pipeline. Beyond → forward.
|
||||
const TAIL_THRESHOLD: usize = 10;
|
||||
|
||||
/// Default parser-thread count for [`ReaderInner::after`]. The indexer
|
||||
/// is CPU-bound on the consumer side, so 1 parser + 1 reader (= 2
|
||||
/// threads total) leaves the rest of the cores for the indexer. Bench
|
||||
/// tools that drain the channel cheaply can override via
|
||||
/// [`ReaderInner::after_with`].
|
||||
pub(crate) const DEFAULT_PARSER_THREADS: usize = 1;
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Shared pipeline entry — called by `Reader::after_with` and `Reader::range_with`.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Spawns the reader worker and (for non-tail ranges) a scoped parser
|
||||
/// pool, and returns the consumer receiver. Shared backend for
|
||||
/// `after_with` and `range_with`.
|
||||
pub(crate) fn spawn(
|
||||
reader: &ReaderInner,
|
||||
canonical: CanonicalRange,
|
||||
parser_threads: usize,
|
||||
) -> Result<Receiver<ReadBlock>> {
|
||||
let parser_threads = parser_threads.max(1);
|
||||
|
||||
if canonical.is_empty() {
|
||||
return Ok(bounded(0).1);
|
||||
}
|
||||
|
||||
let paths = BlkIndexToBlkPath::scan(reader.blocks_dir());
|
||||
*reader.blk_index_to_blk_path.write() = paths.clone();
|
||||
|
||||
let (send, recv) = bounded(CHANNEL_CAPACITY);
|
||||
let xor_bytes = reader.xor_bytes();
|
||||
let use_tail = canonical.len() <= TAIL_THRESHOLD;
|
||||
let first_blk_index = if use_tail {
|
||||
0
|
||||
} else {
|
||||
reader
|
||||
.find_start_blk_index(Some(canonical.start), &paths, xor_bytes)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
thread::spawn(move || {
|
||||
let result = if use_tail {
|
||||
pipeline_tail(&paths, xor_bytes, &canonical, &send)
|
||||
} else {
|
||||
pipeline_forward(
|
||||
&paths,
|
||||
first_blk_index,
|
||||
xor_bytes,
|
||||
&canonical,
|
||||
&send,
|
||||
parser_threads,
|
||||
)
|
||||
};
|
||||
if let Err(e) = result {
|
||||
error!("Reader canonical pipeline failed: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(recv)
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Forward pipeline — 1 reader + N parsers + shared in-order emission.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/// A raw block the reader has already confirmed is on the canonical
|
||||
/// chain, shipped to the parser pool for full decoding.
|
||||
struct ScannedBlock {
|
||||
metadata: BlkMetadata,
|
||||
bytes: Vec<u8>,
|
||||
xor_state: XORIndex,
|
||||
canonical_offset: u32,
|
||||
}
|
||||
|
||||
/// In-order emission buffer shared between the parser threads. Access
|
||||
/// is serialised through a `parking_lot::Mutex`; at `parser_threads = 1`
|
||||
/// the lock is always uncontended.
|
||||
///
|
||||
/// Also enforces **chain continuity**: before emitting each block it
|
||||
/// checks that `block.header.prev_blockhash` matches the previously-
|
||||
/// emitted block's hash. A mismatch means the canonical-hash batch
|
||||
/// that produced this stream was stitched across a mid-batch reorg,
|
||||
/// so we stop emitting cleanly and let the caller retry.
|
||||
struct ReorderState {
|
||||
next_offset: u32,
|
||||
/// Ahead-of-line matches keyed by canonical offset; drained
|
||||
/// contiguously each time `next_offset` advances. Bounded in
|
||||
/// practice by parser-thread scheduling jitter.
|
||||
pending: FxHashMap<u32, ReadBlock>,
|
||||
send_to_consumer: Sender<ReadBlock>,
|
||||
/// Hash of the last block successfully emitted, used to verify
|
||||
/// continuity with the next one. `None` before the first emit.
|
||||
last_emitted_hash: Option<BlockHash>,
|
||||
/// Flipped when a continuity check fails.
|
||||
chain_broken: bool,
|
||||
}
|
||||
|
||||
impl ReorderState {
|
||||
fn new(send_to_consumer: Sender<ReadBlock>) -> Self {
|
||||
Self {
|
||||
next_offset: 0,
|
||||
pending: FxHashMap::default(),
|
||||
send_to_consumer,
|
||||
last_emitted_hash: None,
|
||||
chain_broken: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts a parsed canonical block; emits it and drains any
|
||||
/// contiguous pending matches. Returns `false` once the pipeline
|
||||
/// should stop — either the consumer dropped the receiver or a
|
||||
/// chain-continuity check failed. Completion (all blocks emitted)
|
||||
/// is checked by the caller via `next_offset`.
|
||||
fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool {
|
||||
use std::cmp::Ordering::*;
|
||||
match offset.cmp(&self.next_offset) {
|
||||
Equal => {
|
||||
if !self.send_in_order(block) {
|
||||
return false;
|
||||
}
|
||||
while let Some(next) = self.pending.remove(&self.next_offset) {
|
||||
if !self.send_in_order(next) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
Greater => {
|
||||
self.pending.insert(offset, block);
|
||||
true
|
||||
}
|
||||
// Unreachable in practice: each canonical hash appears at
|
||||
// exactly one offset and each block is parsed once.
|
||||
Less => true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Verifies `block.prev_blockhash` against the last emitted hash,
|
||||
/// sends the block, and bumps `next_offset`. Returns `false` on
|
||||
/// continuity failure or consumer drop.
|
||||
fn send_in_order(&mut self, block: ReadBlock) -> bool {
|
||||
if let Some(last) = &self.last_emitted_hash {
|
||||
let prev = BlockHash::from(block.header.prev_blockhash);
|
||||
if prev != *last {
|
||||
warn!(
|
||||
"canonical chain broken at offset {}: expected prev={} got {}",
|
||||
self.next_offset, last, prev,
|
||||
);
|
||||
self.chain_broken = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
let hash = block.hash().clone();
|
||||
if self.send_to_consumer.send(block).is_err() {
|
||||
return false;
|
||||
}
|
||||
self.last_emitted_hash = Some(hash);
|
||||
self.next_offset += 1;
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn pipeline_forward(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
send: &Sender<ReadBlock>,
|
||||
parser_threads: usize,
|
||||
) -> Result<()> {
|
||||
let (parser_send, parser_recv) = bounded::<ScannedBlock>(CHANNEL_CAPACITY);
|
||||
let reorder = Mutex::new(ReorderState::new(send.clone()));
|
||||
let done = AtomicBool::new(false);
|
||||
|
||||
thread::scope(|scope| -> Result<()> {
|
||||
for _ in 0..parser_threads {
|
||||
let parser_recv = parser_recv.clone();
|
||||
scope.spawn(|| parser_loop(parser_recv, &reorder, &done, canonical, xor_bytes));
|
||||
}
|
||||
// Every parser owns its own clone; ours would otherwise keep
|
||||
// the channel "alive" and leak a dangling receiver.
|
||||
drop(parser_recv);
|
||||
|
||||
let read_result = read_and_dispatch(
|
||||
paths,
|
||||
first_blk_index,
|
||||
xor_bytes,
|
||||
canonical,
|
||||
&parser_send,
|
||||
&done,
|
||||
);
|
||||
// Signal end-of-input to the parsers so they exit their `for`
|
||||
// loops and the scope can join them.
|
||||
drop(parser_send);
|
||||
read_result
|
||||
})?;
|
||||
|
||||
let state = reorder.lock();
|
||||
if state.chain_broken {
|
||||
return Err(Error::Internal(
|
||||
"forward pipeline: canonical batch stitched across a reorg",
|
||||
));
|
||||
}
|
||||
let pipeline_cancelled = done.load(Ordering::Relaxed);
|
||||
let emitted = state.next_offset as usize;
|
||||
if !pipeline_cancelled && emitted < canonical.len() {
|
||||
return Err(Error::Internal(
|
||||
"forward pipeline: blk files missing canonical blocks",
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Full-body parse + in-order emit loop run by every scoped parser
|
||||
/// worker in `pipeline_forward`. Drains `parser_recv` to exhaustion.
|
||||
fn parser_loop(
|
||||
parser_recv: Receiver<ScannedBlock>,
|
||||
reorder: &Mutex<ReorderState>,
|
||||
done: &AtomicBool,
|
||||
canonical: &CanonicalRange,
|
||||
xor_bytes: XORBytes,
|
||||
) {
|
||||
for ScannedBlock { metadata, bytes, xor_state, canonical_offset } in parser_recv {
|
||||
if done.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
let height = Height::from(*canonical.start + canonical_offset);
|
||||
let block = match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
warn!("parse_canonical_body failed: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let pipeline_finished = {
|
||||
let mut state = reorder.lock();
|
||||
!state.try_emit(canonical_offset, block)
|
||||
|| state.next_offset as usize >= canonical.len()
|
||||
};
|
||||
if pipeline_finished {
|
||||
done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk blk files from `first_blk_index`, scan each one, and ship
|
||||
/// canonical blocks to the parser pool. Non-canonical blocks are
|
||||
/// rejected via `peek_canonical_offset` *before* being cloned — the
|
||||
/// cheap filter is what lets a sparse catchup avoid allocating for the
|
||||
/// ~99% of blocks outside the window.
|
||||
fn read_and_dispatch(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
parser_send: &Sender<ScannedBlock>,
|
||||
done: &AtomicBool,
|
||||
) -> Result<()> {
|
||||
for (&blk_index, blk_path) in paths.range(first_blk_index..) {
|
||||
if done.load(Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut bytes = fs::read(blk_path).map_err(|e| {
|
||||
error!("Failed to read blk file {}: {e}", blk_path.display());
|
||||
Error::Internal("Failed to read blk file")
|
||||
})?;
|
||||
|
||||
let result = scan_bytes(
|
||||
&mut bytes,
|
||||
blk_index,
|
||||
0,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_state| {
|
||||
if done.load(Ordering::Relaxed) {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
let Some(canonical_offset) =
|
||||
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
let scanned = ScannedBlock {
|
||||
metadata,
|
||||
bytes: block_bytes.to_vec(),
|
||||
xor_state,
|
||||
canonical_offset,
|
||||
};
|
||||
if parser_send.send(scanned).is_err() {
|
||||
ControlFlow::Break(())
|
||||
} else {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
if result.interrupted {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Tail pipeline — reverse-scan the newest blk files in 5 MB chunks until
|
||||
// every canonical hash has been matched, then emit them forward.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Single-threaded tail-range pipeline for small `canonical.len()`.
|
||||
/// Walks blk files in reverse-index order, reads each one in 5 MB
|
||||
/// chunks from tail to head, and stuffs every canonical match into an
|
||||
/// offset-indexed `slots` vec. Once every canonical block is matched,
|
||||
/// emits them in order through [`ReorderState`] (which doubles as the
|
||||
/// shared continuity checker). Bails on missing blocks or a chain
|
||||
/// break just like [`pipeline_forward`].
|
||||
fn pipeline_tail(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
send: &Sender<ReadBlock>,
|
||||
) -> Result<()> {
|
||||
let mut slots: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
|
||||
let mut remaining = canonical.len();
|
||||
// Carries the bytes before a chunk's first magic into the next
|
||||
// (earlier) chunk so blocks straddling the boundary survive.
|
||||
let mut spillover: Vec<u8> = Vec::new();
|
||||
|
||||
'files: for (&blk_index, path) in paths.iter().rev() {
|
||||
let mut file = File::open(path).map_err(|_| Error::Internal("Failed to open blk file"))?;
|
||||
let file_len = file.metadata().map(|m| m.len() as usize).unwrap_or(0);
|
||||
if file_len == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut read_end = file_len;
|
||||
spillover.clear();
|
||||
|
||||
while read_end > 0 && remaining > 0 {
|
||||
let read_start = read_end.saturating_sub(TAIL_CHUNK);
|
||||
let chunk_len = read_end - read_start;
|
||||
read_end = read_start;
|
||||
|
||||
file.seek(SeekFrom::Start(read_start as u64))
|
||||
.map_err(|_| Error::Internal("Failed to seek blk file"))?;
|
||||
let mut buf = vec![0u8; chunk_len + spillover.len()];
|
||||
file.read_exact(&mut buf[..chunk_len])
|
||||
.map_err(|_| Error::Internal("Failed to read blk chunk"))?;
|
||||
buf[chunk_len..].copy_from_slice(&spillover);
|
||||
spillover.clear();
|
||||
|
||||
let result = scan_bytes(
|
||||
&mut buf,
|
||||
blk_index,
|
||||
read_start,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_state| {
|
||||
let Some(offset) =
|
||||
peek_canonical_offset(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
if slots[offset as usize].is_some() {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
let height = Height::from(*canonical.start + offset);
|
||||
match parse_canonical_body(
|
||||
block_bytes.to_vec(),
|
||||
metadata,
|
||||
xor_state,
|
||||
xor_bytes,
|
||||
height,
|
||||
) {
|
||||
Ok(block) => {
|
||||
slots[offset as usize] = Some(block);
|
||||
remaining -= 1;
|
||||
}
|
||||
Err(e) => warn!("parse_canonical_body failed in tail pipeline: {e}"),
|
||||
}
|
||||
if remaining == 0 {
|
||||
ControlFlow::Break(())
|
||||
} else {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
if remaining == 0 {
|
||||
break 'files;
|
||||
}
|
||||
if read_start > 0 {
|
||||
spillover.extend_from_slice(&buf[..result.first_magic.unwrap_or(buf.len())]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if remaining > 0 {
|
||||
return Err(Error::Internal(
|
||||
"tail pipeline: blk files missing canonical blocks",
|
||||
));
|
||||
}
|
||||
|
||||
// Emit in canonical order via the same `ReorderState` the forward
|
||||
// pipeline uses, which verifies `prev_blockhash` continuity between
|
||||
// adjacent blocks as a side effect of `try_emit`.
|
||||
let mut reorder = ReorderState::new(send.clone());
|
||||
for (offset, block) in slots.into_iter().flatten().enumerate() {
|
||||
if !reorder.try_emit(offset as u32, block) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if reorder.chain_broken {
|
||||
return Err(Error::Internal(
|
||||
"tail pipeline: canonical batch stitched across a reorg",
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
164
crates/brk_reader/src/pipeline/forward.rs
Normal file
164
crates/brk_reader/src/pipeline/forward.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
//! Forward pipeline: 1 reader thread + N scoped parser threads.
|
||||
//! Reader walks blk files from a bisection lower bound, peeks each
|
||||
//! block's header against `CanonicalRange`, and ships hits to the
|
||||
//! parser pool. Parsers decode bodies in parallel and emit in-order
|
||||
//! through `ReorderState`.
|
||||
|
||||
use std::{fs, ops::ControlFlow, sync::OnceLock, thread};
|
||||
|
||||
use bitcoin::block::Header;
|
||||
use brk_error::{Error, Result};
|
||||
use brk_types::{BlkMetadata, Height, ReadBlock};
|
||||
use crossbeam::channel::{Receiver, Sender, bounded};
|
||||
use parking_lot::Mutex;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, BlockHash, XORBytes, XORIndex,
|
||||
canonical::CanonicalRange,
|
||||
parse::{parse_canonical_body, peek_canonical},
|
||||
pipeline::{CHANNEL_CAPACITY, reorder::ReorderState},
|
||||
scan::scan_bytes,
|
||||
};
|
||||
|
||||
/// Reader→parser channel message. `header` was decoded during the
|
||||
/// peek and is reused so the parser doesn't redo it.
|
||||
struct ScannedBlock {
|
||||
metadata: BlkMetadata,
|
||||
bytes: Vec<u8>,
|
||||
xor_state: XORIndex,
|
||||
canonical_offset: u32,
|
||||
header: Header,
|
||||
}
|
||||
|
||||
/// Single shared signal carrying both the cancel flag and (when set
|
||||
/// to `Failed`) the first parse error. `stop.get().is_some()` is the
|
||||
/// reader's cheap "should I stop" check.
|
||||
enum Stop {
|
||||
Done,
|
||||
Failed(Error),
|
||||
}
|
||||
|
||||
pub(super) fn pipeline_forward(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
anchor: Option<BlockHash>,
|
||||
send: &Sender<Result<ReadBlock>>,
|
||||
parser_threads: usize,
|
||||
) -> Result<()> {
|
||||
let (parser_send, parser_recv) = bounded::<ScannedBlock>(CHANNEL_CAPACITY);
|
||||
let reorder = Mutex::new(ReorderState::new(send.clone(), anchor));
|
||||
let stop: OnceLock<Stop> = OnceLock::new();
|
||||
|
||||
thread::scope(|scope| {
|
||||
for _ in 0..parser_threads {
|
||||
let parser_recv = parser_recv.clone();
|
||||
scope.spawn(|| parser_loop(parser_recv, &reorder, &stop, canonical, xor_bytes));
|
||||
}
|
||||
// Every parser owns its own clone; ours would otherwise leak
|
||||
// a dangling receiver.
|
||||
drop(parser_recv);
|
||||
|
||||
let read_result =
|
||||
read_and_dispatch(paths, first_blk_index, xor_bytes, canonical, &parser_send, &stop);
|
||||
// End-of-input signal so parser `for` loops exit and the
|
||||
// scope can join.
|
||||
drop(parser_send);
|
||||
read_result
|
||||
})?;
|
||||
|
||||
if let Some(Stop::Failed(e)) = stop.into_inner() {
|
||||
return Err(e);
|
||||
}
|
||||
reorder.into_inner().finalize(canonical.len())
|
||||
}
|
||||
|
||||
/// Per-thread parser body: drain `parser_recv`, decode each block,
|
||||
/// emit through `reorder`. Stops on `stop`.
|
||||
fn parser_loop(
|
||||
parser_recv: Receiver<ScannedBlock>,
|
||||
reorder: &Mutex<ReorderState>,
|
||||
stop: &OnceLock<Stop>,
|
||||
canonical: &CanonicalRange,
|
||||
xor_bytes: XORBytes,
|
||||
) {
|
||||
for ScannedBlock {
|
||||
metadata,
|
||||
bytes,
|
||||
xor_state,
|
||||
canonical_offset,
|
||||
header,
|
||||
} in parser_recv
|
||||
{
|
||||
if stop.get().is_some() {
|
||||
continue;
|
||||
}
|
||||
let height = Height::from(*canonical.start + canonical_offset);
|
||||
let block =
|
||||
match parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height, header) {
|
||||
Ok(block) => block,
|
||||
Err(e) => {
|
||||
error!("parse_canonical_body failed at height {height}: {e}");
|
||||
let _ = stop.set(Stop::Failed(e));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let pipeline_finished = {
|
||||
let mut state = reorder.lock();
|
||||
!state.try_emit(canonical_offset, block)
|
||||
|| state.next_offset as usize >= canonical.len()
|
||||
};
|
||||
if pipeline_finished {
|
||||
let _ = stop.set(Stop::Done);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `peek_canonical` filters orphans **before** the block bytes are
|
||||
/// cloned, so a sparse catchup avoids allocating for the ~99% of
|
||||
/// blocks outside the window.
|
||||
fn read_and_dispatch(
|
||||
paths: &BlkIndexToBlkPath,
|
||||
first_blk_index: u16,
|
||||
xor_bytes: XORBytes,
|
||||
canonical: &CanonicalRange,
|
||||
parser_send: &Sender<ScannedBlock>,
|
||||
stop: &OnceLock<Stop>,
|
||||
) -> Result<()> {
|
||||
for (&blk_index, blk_path) in paths.range(first_blk_index..) {
|
||||
if stop.get().is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut bytes = fs::read(blk_path)?;
|
||||
scan_bytes(
|
||||
&mut bytes,
|
||||
blk_index,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_state| {
|
||||
if stop.get().is_some() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
let Some((canonical_offset, header)) =
|
||||
peek_canonical(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
let scanned = ScannedBlock {
|
||||
metadata,
|
||||
bytes: block_bytes.to_vec(),
|
||||
xor_state,
|
||||
canonical_offset,
|
||||
header,
|
||||
};
|
||||
if parser_send.send(scanned).is_err() {
|
||||
ControlFlow::Break(())
|
||||
} else {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
127
crates/brk_reader/src/pipeline/mod.rs
Normal file
127
crates/brk_reader/src/pipeline/mod.rs
Normal file
@@ -0,0 +1,127 @@
|
||||
//! Two-strategy block-streaming pipeline. [`spawn`] picks between:
|
||||
//!
|
||||
//! * **forward** — one reader thread walks blk files in order from a
|
||||
//! bisection lower bound; canonical hits ship to a parser pool that
|
||||
//! emits in-order through [`reorder::ReorderState`].
|
||||
//! * **tail** — single-threaded reverse scan of the newest blk files,
|
||||
//! buffering matches in offset slots, then emitting forward with
|
||||
//! an inline chain check.
|
||||
//!
|
||||
//! Both strategies verify `block.header.prev_blockhash` against the
|
||||
//! previously emitted block — and against the user-supplied `anchor`
|
||||
//! for the very first block — and propagate a final `Err` to the
|
||||
//! consumer on chain breaks, parse failures, or missing blocks.
|
||||
|
||||
use std::{sync::Arc, thread};
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlockHash, Height, ReadBlock};
|
||||
use crossbeam::channel::{Receiver, bounded};
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, ReaderInner, XORBytes, bisect,
|
||||
canonical::CanonicalRange,
|
||||
};
|
||||
|
||||
mod forward;
|
||||
mod reorder;
|
||||
mod tail;
|
||||
|
||||
pub(crate) const CHANNEL_CAPACITY: usize = 50;
|
||||
|
||||
/// If `canonical.start` lives within this many files of the chain
|
||||
/// tip, use the reverse-scan pipeline. The forward pipeline pays the
|
||||
/// bisection + 21-file backoff (~2.7 GB of reads) regardless of how
|
||||
/// few canonical blocks live in the window, so for any tip-clustered
|
||||
/// catchup the tail wins until the window grows past this many files.
|
||||
const TAIL_DISTANCE_FILES: usize = 8;
|
||||
|
||||
/// The indexer is CPU-bound on the consumer side, so 1 reader + 1
|
||||
/// parser leaves the rest of the cores for it. Bench tools that
|
||||
/// drain the channel cheaply can override.
|
||||
pub(crate) const DEFAULT_PARSER_THREADS: usize = 1;
|
||||
|
||||
enum Strategy {
|
||||
Tail,
|
||||
Forward { first_blk_index: u16 },
|
||||
}
|
||||
|
||||
/// `anchor`, when supplied, is the hash the consumer expects to be
|
||||
/// the **parent** of the first emitted block. Seeded into the chain
|
||||
/// check so a stale `Reader::after` anchor (e.g. the tip of a
|
||||
/// reorged-out chain) cannot silently produce a stitched stream.
|
||||
/// `None` skips the check (genesis or `range`-style calls have no
|
||||
/// anchor to verify against).
|
||||
pub(crate) fn spawn(
|
||||
reader: Arc<ReaderInner>,
|
||||
canonical: CanonicalRange,
|
||||
anchor: Option<BlockHash>,
|
||||
parser_threads: usize,
|
||||
) -> Result<Receiver<Result<ReadBlock>>> {
|
||||
// Cap at the parser channel capacity: beyond that, extra parsers
|
||||
// are idle (they all contend for the same buffered items) and
|
||||
// absurd inputs would otherwise OOM the scoped spawn.
|
||||
let parser_threads = parser_threads.clamp(1, CHANNEL_CAPACITY);
|
||||
|
||||
if canonical.is_empty() {
|
||||
return Ok(bounded(0).1);
|
||||
}
|
||||
|
||||
let paths = reader.refresh_paths()?;
|
||||
let xor_bytes = reader.xor_bytes;
|
||||
let strategy = pick_strategy(&reader.client, &paths, xor_bytes, canonical.start);
|
||||
|
||||
let (send, recv) = bounded(CHANNEL_CAPACITY);
|
||||
|
||||
thread::spawn(move || {
|
||||
let result = match strategy {
|
||||
Strategy::Tail => {
|
||||
tail::pipeline_tail(&reader.client, &paths, xor_bytes, &canonical, anchor, &send)
|
||||
}
|
||||
Strategy::Forward { first_blk_index } => forward::pipeline_forward(
|
||||
&paths,
|
||||
first_blk_index,
|
||||
xor_bytes,
|
||||
&canonical,
|
||||
anchor,
|
||||
&send,
|
||||
parser_threads,
|
||||
),
|
||||
};
|
||||
if let Err(e) = result {
|
||||
// No-op if the consumer already dropped the receiver.
|
||||
let _ = send.send(Err(e));
|
||||
}
|
||||
});
|
||||
|
||||
Ok(recv)
|
||||
}
|
||||
|
||||
/// Tail iff one of the last `TAIL_DISTANCE_FILES` files starts at a
|
||||
/// height ≤ `canonical_start`; that file is where tail iteration
|
||||
/// would land. Otherwise bisect for the forward start. Genesis-rooted
|
||||
/// catchups skip the tail probes since no file's first block is ≤
|
||||
/// genesis.
|
||||
fn pick_strategy(
|
||||
client: &Client,
|
||||
paths: &BlkIndexToBlkPath,
|
||||
xor_bytes: XORBytes,
|
||||
canonical_start: Height,
|
||||
) -> Strategy {
|
||||
if canonical_start != Height::ZERO
|
||||
&& paths
|
||||
.iter()
|
||||
.rev()
|
||||
.take(TAIL_DISTANCE_FILES)
|
||||
.any(|(_, path)| {
|
||||
bisect::first_block_height(client, path, xor_bytes)
|
||||
.is_ok_and(|h| h <= canonical_start)
|
||||
})
|
||||
{
|
||||
return Strategy::Tail;
|
||||
}
|
||||
Strategy::Forward {
|
||||
first_blk_index: bisect::find_start_blk_index(client, canonical_start, paths, xor_bytes),
|
||||
}
|
||||
}
|
||||
110
crates/brk_reader/src/pipeline/reorder.rs
Normal file
110
crates/brk_reader/src/pipeline/reorder.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
//! In-order emission buffer + chain-continuity check used by the
|
||||
//! forward pipeline. Parsers complete blocks out of order, so this
|
||||
//! parks ahead-of-line matches in `pending` until `next_offset`
|
||||
//! catches up.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use brk_error::{Error, Result};
|
||||
use brk_types::{BlockHash, ReadBlock};
|
||||
use crossbeam::channel::Sender;
|
||||
use rustc_hash::FxHashMap;
|
||||
use tracing::warn;
|
||||
|
||||
/// Accessed by the parser pool under a `parking_lot::Mutex` owned by
/// `pipeline_forward`; at `parser_threads = 1` the lock is always
/// uncontended.
pub(super) struct ReorderState {
    /// Next in-order offset to emit; also the count of blocks already
    /// sent (see `finalize`).
    pub(super) next_offset: u32,
    /// Ahead-of-line matches parked until `next_offset` catches up.
    pending: FxHashMap<u32, ReadBlock>,
    send_to_consumer: Sender<Result<ReadBlock>>,
    /// Seeded with the user-supplied anchor so the first emit is
    /// also verified against it.
    last_emitted_hash: Option<BlockHash>,
    /// A `prev_blockhash` mismatch fires this; converted into a
    /// final `Err` by `finalize`.
    chain_broken: bool,
    /// Distinguishes "consumer cancelled" from "ran out of work
    /// early" in the missing-blocks check inside `finalize`.
    consumer_dropped: bool,
}
|
||||
|
||||
impl ReorderState {
    /// Empty buffer; `anchor`, when present, is the hash the first
    /// emitted block's `prev_blockhash` must equal.
    pub(super) fn new(send_to_consumer: Sender<Result<ReadBlock>>, anchor: Option<BlockHash>) -> Self {
        Self {
            next_offset: 0,
            pending: FxHashMap::default(),
            send_to_consumer,
            last_emitted_hash: anchor,
            chain_broken: false,
            consumer_dropped: false,
        }
    }

    /// Resolves the pipeline's exit state. Called by
    /// `pipeline_forward` after the read loop has finished and all
    /// parser threads have joined.
    ///
    /// A chain break outranks the missing-blocks check; a consumer
    /// drop suppresses the latter (stopping early is expected then).
    pub(super) fn finalize(self, expected_count: usize) -> Result<()> {
        if self.chain_broken {
            return Err(Error::Internal(
                "forward pipeline: canonical batch stitched across a reorg",
            ));
        }
        // `next_offset` counts blocks actually emitted in order.
        if !self.consumer_dropped && (self.next_offset as usize) < expected_count {
            return Err(Error::Internal(
                "forward pipeline: blk files missing canonical blocks",
            ));
        }
        Ok(())
    }

    /// Emits `block` if it's the next expected offset (and drains
    /// any contiguous pending matches), otherwise parks it. Returns
    /// `false` once the pipeline should stop (consumer drop or chain
    /// break).
    pub(super) fn try_emit(&mut self, offset: u32, block: ReadBlock) -> bool {
        match offset.cmp(&self.next_offset) {
            Ordering::Equal => {
                if !self.send_in_order(block) {
                    return false;
                }
                // `send_in_order` bumped `next_offset`; keep draining
                // while the next slot is already buffered.
                while let Some(next) = self.pending.remove(&self.next_offset) {
                    if !self.send_in_order(next) {
                        return false;
                    }
                }
                true
            }
            Ordering::Greater => {
                self.pending.insert(offset, block);
                true
            }
            // Each canonical hash appears at exactly one offset and
            // each block is parsed once, so this is unreachable in
            // practice.
            Ordering::Less => true,
        }
    }

    /// Chain-checks `block` against the last emitted hash, sends it,
    /// and advances the cursor. Returns `false` on chain break or
    /// consumer drop.
    fn send_in_order(&mut self, block: ReadBlock) -> bool {
        if let Some(last) = &self.last_emitted_hash {
            let prev = BlockHash::from(block.header.prev_blockhash);
            if prev != *last {
                warn!(
                    "canonical chain broken at offset {}: expected prev={} got {}",
                    self.next_offset, last, prev,
                );
                self.chain_broken = true;
                return false;
            }
        }
        // Clone before `send` moves the block out.
        let hash = block.hash().clone();
        if self.send_to_consumer.send(Ok(block)).is_err() {
            self.consumer_dropped = true;
            return false;
        }
        self.last_emitted_hash = Some(hash);
        self.next_offset += 1;
        true
    }
}
|
||||
129
crates/brk_reader/src/pipeline/tail.rs
Normal file
129
crates/brk_reader/src/pipeline/tail.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
//! Tail pipeline: single-threaded reverse scan of the newest blk
|
||||
//! files until every canonical hash is matched, then forward-emit
|
||||
//! with an inline chain check. Avoids the forward pipeline's
|
||||
//! bisection + out-of-order backoff (~2.7 GB of reads) for any
|
||||
//! tip-clustered catchup.
|
||||
|
||||
use std::{fs, ops::ControlFlow};
|
||||
|
||||
use brk_error::{Error, Result};
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{Height, ReadBlock};
|
||||
use crossbeam::channel::Sender;
|
||||
|
||||
use crate::{
|
||||
BlkIndexToBlkPath, BlockHash, OUT_OF_ORDER_FILE_BACKOFF, XORBytes, bisect,
|
||||
canonical::CanonicalRange,
|
||||
parse::{parse_canonical_body, peek_canonical},
|
||||
scan::scan_bytes,
|
||||
};
|
||||
|
||||
/// Reverse-scans the newest blk files, buffering each canonical match
/// in its offset slot, then forward-emits the slots with an inline
/// chain check against `anchor` / the previous block.
pub(super) fn pipeline_tail(
    client: &Client,
    paths: &BlkIndexToBlkPath,
    xor_bytes: XORBytes,
    canonical: &CanonicalRange,
    anchor: Option<BlockHash>,
    send: &Sender<Result<ReadBlock>>,
) -> Result<()> {
    // One slot per canonical height, filled as matches are found.
    let mut slots: Vec<Option<ReadBlock>> = (0..canonical.len()).map(|_| None).collect();
    let mut remaining = canonical.len();
    // First parse error hit inside the scan callback, surfaced after
    // the scan returns.
    let mut parse_failure: Option<Error> = None;
    // Bailout streak: gives up after OUT_OF_ORDER_FILE_BACKOFF
    // consecutive files below the canonical window so a permanent
    // miss doesn't scan the entire chain in reverse.
    let mut below_floor_streak: usize = 0;

    for (&blk_index, path) in paths.iter().rev() {
        // If this file's first block is below the lowest still-missing
        // canonical height, we've walked past the window.
        if let Some(missing_idx) = slots.iter().position(Option::is_none)
            && let Ok(first_height) = bisect::first_block_height(client, path, xor_bytes)
        {
            let lowest_missing = Height::from(*canonical.start + missing_idx as u32);
            if first_height < lowest_missing {
                below_floor_streak += 1;
                if below_floor_streak >= OUT_OF_ORDER_FILE_BACKOFF {
                    return Err(Error::Internal(
                        "tail pipeline: walked past the canonical window without finding all blocks",
                    ));
                }
            } else {
                below_floor_streak = 0;
            }
        }

        let mut bytes = fs::read(path)?;
        scan_bytes(
            &mut bytes,
            blk_index,
            xor_bytes,
            |metadata, block_bytes, xor_state| {
                // Cheap header peek: skip blocks outside the canonical
                // window without a full parse.
                let Some((offset, header)) =
                    peek_canonical(block_bytes, xor_state, xor_bytes, canonical)
                else {
                    return ControlFlow::Continue(());
                };
                // Already matched in a newer file; keep the first hit.
                if slots[offset as usize].is_some() {
                    return ControlFlow::Continue(());
                }
                let height = Height::from(*canonical.start + offset);
                match parse_canonical_body(
                    block_bytes.to_vec(),
                    metadata,
                    xor_state,
                    xor_bytes,
                    height,
                    header,
                ) {
                    Ok(block) => {
                        slots[offset as usize] = Some(block);
                        remaining -= 1;
                    }
                    Err(e) => {
                        // Park the error; `ControlFlow` can't carry it.
                        parse_failure = Some(e);
                        return ControlFlow::Break(());
                    }
                }
                if remaining == 0 {
                    ControlFlow::Break(())
                } else {
                    ControlFlow::Continue(())
                }
            },
        );

        // The `Some` arm moves out of `parse_failure` but diverges, so
        // the next iteration may still capture it.
        if let Some(e) = parse_failure {
            return Err(e);
        }
        if remaining == 0 {
            break;
        }
    }

    if remaining > 0 {
        return Err(Error::Internal(
            "tail pipeline: blk files missing canonical blocks",
        ));
    }

    // Inline chain check; ReorderState would be 130 lines of
    // machinery for the single-threaded path.
    let mut last_hash: Option<BlockHash> = anchor;
    for slot in slots {
        let block = slot.expect("tail pipeline left a slot empty after `remaining == 0`");
        if let Some(prev) = &last_hash {
            let actual_prev = BlockHash::from(block.header.prev_blockhash);
            if actual_prev != *prev {
                return Err(Error::Internal(
                    "tail pipeline: canonical batch stitched across a reorg",
                ));
            }
        }
        last_hash = Some(block.hash().clone());
        if send.send(Ok(block)).is_err() {
            return Ok(()); // consumer dropped — clean exit
        }
    }
    Ok(())
}
|
||||
@@ -2,75 +2,76 @@ use std::ops::ControlFlow;
|
||||
|
||||
use brk_types::{BlkMetadata, BlkPosition};
|
||||
|
||||
use crate::{XORBytes, XORIndex};
|
||||
use crate::{XORBytes, XORIndex, xor_bytes::XOR_LEN};
|
||||
|
||||
/// Mainnet block-record preamble (`0xF9BEB4D9`).
const MAGIC_BYTES: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9];

/// Returns the position **immediately after** the matched magic, or
/// `None` if no match. Advances `xor_i` by the bytes consumed either
/// way.
pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes) -> Option<usize> {
    let len = bytes.len();
    if len < MAGIC_BYTES.len() {
        // Too short to ever contain the magic; consume everything.
        xor_i.add_assign(len);
        return None;
    }

    let xb = *xor_bytes;
    let mut phase = xor_i.phase();
    let mut i = 0;
    let stop = len - MAGIC_BYTES.len();

    while i <= stop {
        // Cheap first-byte test before decoding the remaining three.
        if bytes[i] ^ xb[phase] == MAGIC_BYTES[0] {
            let p1 = (phase + 1) & (XOR_LEN - 1);
            let p2 = (phase + 2) & (XOR_LEN - 1);
            let p3 = (phase + 3) & (XOR_LEN - 1);
            if bytes[i + 1] ^ xb[p1] == MAGIC_BYTES[1]
                && bytes[i + 2] ^ xb[p2] == MAGIC_BYTES[2]
                && bytes[i + 3] ^ xb[p3] == MAGIC_BYTES[3]
            {
                // Consumed through the end of the magic.
                xor_i.set_phase(phase + MAGIC_BYTES.len());
                return Some(i + MAGIC_BYTES.len());
            }
        }
        phase = (phase + 1) & (XOR_LEN - 1);
        i += 1;
    }

    // No match: the <4-byte tail still counts as consumed.
    xor_i.set_phase(phase + (len - i));
    None
}
|
||||
|
||||
pub struct ScanResult {
|
||||
pub first_magic: Option<usize>,
|
||||
pub interrupted: bool,
|
||||
}
|
||||
|
||||
/// Scans `buf` (the full contents of one blk file) for blocks,
/// calling `on_block` for each. The block bytes are passed as a
/// mutable borrow so the callback can clone (to ship to a parser
/// thread) or process in place (to peek the header).
pub(crate) fn scan_bytes(
    buf: &mut [u8],
    blk_index: u16,
    xor_bytes: XORBytes,
    mut on_block: impl FnMut(BlkMetadata, &mut [u8], XORIndex) -> ControlFlow<()>,
) {
    let mut xor_i = XORIndex::default();
    let mut i = 0;

    while let Some(off) = find_magic(&buf[i..], &mut xor_i, xor_bytes) {
        // `off` is relative to the slice we handed `find_magic`.
        i += off;
        if i + 4 > buf.len() {
            // Truncated length field at EOF.
            return;
        }
        // Decode the 4-byte little-endian length on a stack copy so
        // the buffer itself stays XOR-encoded for the callback.
        let mut size_bytes = [buf[i], buf[i + 1], buf[i + 2], buf[i + 3]];
        xor_i.bytes(&mut size_bytes, xor_bytes);
        let len = u32::from_le_bytes(size_bytes) as usize;
        i += 4;
        if i + len > buf.len() {
            // Truncated block body at EOF.
            return;
        }
        let metadata = BlkMetadata::new(BlkPosition::new(blk_index, i as u32), len as u32);
        if on_block(metadata, &mut buf[i..i + len], xor_i).is_break() {
            return;
        }
        i += len;
        // The callback got a copy of the index; advance ours past the
        // body it consumed.
        xor_i.add_assign(len);
    }
}
|
||||
|
||||
@@ -7,20 +7,30 @@ pub const XOR_LEN: usize = 8;
|
||||
/// 8-byte XOR mask loaded from `xor.dat` (identity when absent).
#[derive(Debug, Clone, Copy, Deref, PartialEq, Eq)]
pub struct XORBytes([u8; XOR_LEN]);

impl XORBytes {
    /// All-zero mask: nodes without `xor.dat` need no decode.
    #[inline]
    pub fn is_identity(self) -> bool {
        self.0 == [0u8; XOR_LEN]
    }
}

impl From<[u8; XOR_LEN]> for XORBytes {
    #[inline]
    fn from(value: [u8; XOR_LEN]) -> Self {
        Self(value)
    }
}
|
||||
|
||||
impl From<&Path> for XORBytes {
|
||||
/// Loads `<blocks_dir>/xor.dat`. Falls back to the identity mask
|
||||
/// if missing, unreadable, or the wrong length.
|
||||
#[inline]
|
||||
fn from(value: &Path) -> Self {
|
||||
Self(
|
||||
fs::read(value.join("xor.dat"))
|
||||
.unwrap_or(vec![0; 8])
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
)
|
||||
let mask = fs::read(value.join("xor.dat"))
|
||||
.ok()
|
||||
.and_then(|v| <[u8; XOR_LEN]>::try_from(v).ok())
|
||||
.unwrap_or([0; XOR_LEN]);
|
||||
Self(mask)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,44 +4,67 @@ use crate::xor_bytes::{XOR_LEN, XORBytes};
|
||||
// Wraps the current phase (byte position modulo the mask length).
pub struct XORIndex(usize);

impl XORIndex {
    /// Phase-aligned `XORIndex` for a buffer that conceptually starts
    /// at `offset` in the blk file.
    #[inline]
    pub fn at_offset(offset: usize) -> Self {
        Self(offset & (XOR_LEN - 1))
    }

    /// Current position within the mask.
    #[inline]
    pub(crate) fn phase(self) -> usize {
        self.0
    }

    /// Overwrites the phase, wrapping into the mask length.
    #[inline]
    pub(crate) fn set_phase(&mut self, phase: usize) {
        self.0 = phase & (XOR_LEN - 1);
    }

    /// Advances the phase by `i` bytes (XOR_LEN is a power of two, so
    /// masking replaces the modulo).
    #[inline]
    pub fn add_assign(&mut self, i: usize) {
        self.0 = (self.0 + i) & (XOR_LEN - 1);
    }

    /// XOR-decode `bytes` in place, advancing the phase. Aligned 8-byte
    /// chunks XOR against the full mask in one go (auto-vectorised by
    /// LLVM); only the head/tail straddling alignment are scalar.
    pub fn bytes<'a>(&mut self, bytes: &'a mut [u8], xor_bytes: XORBytes) -> &'a mut [u8] {
        if xor_bytes.is_identity() {
            // Identity mask: nothing to decode, phase can stay put.
            return bytes;
        }
        let xb = *xor_bytes;
        let mut phase = self.0;
        let len = bytes.len();
        let mut i = 0;

        // Scalar head until the phase wraps to 0 (mask-aligned).
        while phase != 0 && i < len {
            bytes[i] ^= xb[phase];
            phase = (phase + 1) & (XOR_LEN - 1);
            i += 1;
        }

        // Mask-aligned body: whole-mask XOR per chunk.
        let body_len = (len - i) & !(XOR_LEN - 1);
        for chunk in bytes[i..i + body_len].chunks_exact_mut(XOR_LEN) {
            for (b, m) in chunk.iter_mut().zip(xb) {
                *b ^= m;
            }
        }
        i += body_len;

        // Scalar tail; phase is 0 entering here whenever the body ran.
        while i < len {
            bytes[i] ^= xb[phase];
            phase = (phase + 1) & (XOR_LEN - 1);
            i += 1;
        }

        self.0 = phase;
        bytes
    }

    /// XOR-decode `buffer` as if it lived at `offset` in the blk file.
    #[inline]
    pub fn decode_at(buffer: &mut [u8], offset: usize, xor_bytes: XORBytes) {
        Self::at_offset(offset).bytes(buffer, xor_bytes);
    }
}
|
||||
|
||||
Reference in New Issue
Block a user