mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snap
This commit is contained in:
52
crates/brk_reader/examples/last_n_bench.rs
Normal file
52
crates/brk_reader/examples/last_n_bench.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
//! Times `Reader::after` for a handful of tail-clustered catchup
|
||||
//! sizes. `N ≤ ~1024` lands in the tail strategy (chunked reverse
|
||||
//! reader); `N = 10_000` falls through to the forward strategy since
|
||||
//! it's past the 8-newest-files window.
|
||||
//!
|
||||
//! Run with:
|
||||
//! cargo run --release -p brk_reader --example last_n_bench
|
||||
//!
|
||||
//! Requires a running bitcoind with a cookie file at the default path.
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_reader::Reader;
|
||||
use brk_rpc::{Auth, Client};
|
||||
use brk_types::Height;
|
||||
|
||||
const SCENARIOS: &[u32] = &[1, 10, 100, 1_000, 10_000];
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let bitcoin_dir = Client::default_bitcoin_path();
|
||||
let client = Client::new(
|
||||
Client::default_url(),
|
||||
Auth::CookieFile(bitcoin_dir.join(".cookie")),
|
||||
)?;
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
|
||||
let tip = client.get_last_height()?;
|
||||
println!("Tip: {tip}");
|
||||
println!();
|
||||
println!("{:>6} {:>14} {:>10}", "blocks", "elapsed", "blk/s");
|
||||
println!("{}", "-".repeat(36));
|
||||
|
||||
for &n in SCENARIOS {
|
||||
let anchor_height = Height::from(tip.saturating_sub(n));
|
||||
let anchor_hash = client.get_block_hash(*anchor_height as u64)?;
|
||||
let anchor = Some(anchor_hash);
|
||||
|
||||
let start = Instant::now();
|
||||
let mut count = 0usize;
|
||||
for block in reader.after(anchor)? {
|
||||
let _ = block?;
|
||||
count += 1;
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
let blk_per_s = count as f64 / elapsed.as_secs_f64().max(f64::EPSILON);
|
||||
println!("{n:>6} {elapsed:>14?} {blk_per_s:>10.0}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -53,7 +53,7 @@ pub(crate) fn first_block_height(
|
||||
}
|
||||
xor_i.bytes(&mut buf[magic_end..header_end], xor_bytes);
|
||||
|
||||
let header = Header::consensus_decode(&mut &buf[magic_end + 4..header_end])?;
|
||||
let header = Header::consensus_decode_from_finite_reader(&mut &buf[magic_end + 4..header_end])?;
|
||||
let height = client.get_block_info(&header.block_hash())?.height as u32;
|
||||
|
||||
Ok(Height::new(height))
|
||||
|
||||
@@ -27,7 +27,7 @@ pub(crate) fn peek_canonical(
|
||||
let mut header_buf = [0u8; HEADER_LEN];
|
||||
header_buf.copy_from_slice(&bytes[..HEADER_LEN]);
|
||||
xor_state.bytes(&mut header_buf, xor_bytes);
|
||||
let header = Header::consensus_decode(&mut &header_buf[..]).ok()?;
|
||||
let header = Header::consensus_decode_from_finite_reader(&mut &header_buf[..]).ok()?;
|
||||
let offset = canonical.offset_of(&BlockHash::from(header.block_hash()))?;
|
||||
Some((offset, header))
|
||||
}
|
||||
@@ -52,14 +52,20 @@ pub(crate) fn parse_canonical_body(
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
cursor.set_position(HEADER_LEN as u64);
|
||||
|
||||
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
|
||||
// `consensus_decode_from_finite_reader` skips the `Take<R>` wrap
|
||||
// that `consensus_decode` applies to every nested field for
|
||||
// memory-safety — our cursor is already a bounded `Vec<u8>`, so
|
||||
// the extra wrapping is pure overhead. Per the crate docs it's
|
||||
// "marginally faster", but for a ~2000-tx block the per-field
|
||||
// compounding adds up.
|
||||
let tx_count = VarInt::consensus_decode_from_finite_reader(&mut cursor)?.0 as usize;
|
||||
let mut txdata = Vec::with_capacity(tx_count);
|
||||
let mut tx_metadata = Vec::with_capacity(tx_count);
|
||||
let mut tx_offsets = Vec::with_capacity(tx_count);
|
||||
for _ in 0..tx_count {
|
||||
let tx_start = cursor.position() as u32;
|
||||
tx_offsets.push(tx_start);
|
||||
let tx = Transaction::consensus_decode(&mut cursor)?;
|
||||
let tx = Transaction::consensus_decode_from_finite_reader(&mut cursor)?;
|
||||
let tx_len = cursor.position() as u32 - tx_start;
|
||||
txdata.push(tx);
|
||||
tx_metadata.push(BlkMetadata::new(metadata.position() + tx_start, tx_len));
|
||||
|
||||
@@ -135,6 +135,7 @@ fn read_and_dispatch(
|
||||
scan_bytes(
|
||||
&mut bytes,
|
||||
blk_index,
|
||||
0,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_state| {
|
||||
if stop.get().is_some() {
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
//! Tail pipeline: single-threaded reverse scan of the newest blk
|
||||
//! files until every canonical hash is matched, then forward-emit
|
||||
//! with an inline chain check. Avoids the forward pipeline's
|
||||
//! bisection + out-of-order backoff (~2.7 GB of reads) for any
|
||||
//! tip-clustered catchup.
|
||||
//! files, reading each file in `TAIL_CHUNK`-sized slices from tail
|
||||
//! to head so we only touch bytes covering the canonical window.
|
||||
//! Matches fill offset slots and are emitted forward with an inline
|
||||
//! chain check.
|
||||
|
||||
use std::{fs, ops::ControlFlow};
|
||||
use std::{fs::File, ops::ControlFlow, os::unix::fs::FileExt};
|
||||
|
||||
use brk_error::{Error, Result};
|
||||
use brk_rpc::Client;
|
||||
@@ -18,6 +18,8 @@ use crate::{
|
||||
scan::scan_bytes,
|
||||
};
|
||||
|
||||
const TAIL_CHUNK: usize = 8 * 1024 * 1024;
|
||||
|
||||
pub(super) fn pipeline_tail(
|
||||
client: &Client,
|
||||
paths: &BlkIndexToBlkPath,
|
||||
@@ -34,7 +36,7 @@ pub(super) fn pipeline_tail(
|
||||
// miss doesn't scan the entire chain in reverse.
|
||||
let mut below_floor_streak: usize = 0;
|
||||
|
||||
for (&blk_index, path) in paths.iter().rev() {
|
||||
'files: for (&blk_index, path) in paths.iter().rev() {
|
||||
// If this file's first block is below the lowest still-missing
|
||||
// canonical height, we've walked past the window.
|
||||
if let Some(missing_idx) = slots.iter().position(Option::is_none)
|
||||
@@ -53,51 +55,85 @@ pub(super) fn pipeline_tail(
|
||||
}
|
||||
}
|
||||
|
||||
let mut bytes = fs::read(path)?;
|
||||
scan_bytes(
|
||||
&mut bytes,
|
||||
blk_index,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_state| {
|
||||
let Some((offset, header)) =
|
||||
peek_canonical(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
if slots[offset as usize].is_some() {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
let height = Height::from(*canonical.start + offset);
|
||||
match parse_canonical_body(
|
||||
block_bytes.to_vec(),
|
||||
metadata,
|
||||
xor_state,
|
||||
xor_bytes,
|
||||
height,
|
||||
header,
|
||||
) {
|
||||
Ok(block) => {
|
||||
slots[offset as usize] = Some(block);
|
||||
remaining -= 1;
|
||||
}
|
||||
Err(e) => {
|
||||
parse_failure = Some(e);
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
if remaining == 0 {
|
||||
ControlFlow::Break(())
|
||||
} else {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
if let Some(e) = parse_failure {
|
||||
return Err(e);
|
||||
let file = File::open(path)?;
|
||||
let file_len = file.metadata()?.len() as usize;
|
||||
if file_len == 0 {
|
||||
continue;
|
||||
}
|
||||
if remaining == 0 {
|
||||
break;
|
||||
|
||||
// Chunked reverse read. `end` is the file position we've
|
||||
// already covered (exclusive). Each iteration reads
|
||||
// [end - TAIL_CHUNK..end] and prepends it to any `spillover`
|
||||
// carried from the previous iteration — the pre-first-magic
|
||||
// bytes of that chunk, which must belong to a block that
|
||||
// started in this earlier region.
|
||||
let mut end = file_len;
|
||||
let mut spillover: Vec<u8> = Vec::new();
|
||||
|
||||
while end > 0 && remaining > 0 {
|
||||
let start = end.saturating_sub(TAIL_CHUNK);
|
||||
let chunk_len = end - start;
|
||||
let mut buf = vec![0u8; chunk_len + spillover.len()];
|
||||
file.read_exact_at(&mut buf[..chunk_len], start as u64)?;
|
||||
buf[chunk_len..].copy_from_slice(&spillover);
|
||||
spillover.clear();
|
||||
|
||||
// `buf` now represents file bytes [start..start + buf.len()].
|
||||
let result = scan_bytes(
|
||||
&mut buf,
|
||||
blk_index,
|
||||
start,
|
||||
xor_bytes,
|
||||
|metadata, block_bytes, xor_state| {
|
||||
let Some((offset, header)) =
|
||||
peek_canonical(block_bytes, xor_state, xor_bytes, canonical)
|
||||
else {
|
||||
return ControlFlow::Continue(());
|
||||
};
|
||||
if slots[offset as usize].is_some() {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
let height = Height::from(*canonical.start + offset);
|
||||
match parse_canonical_body(
|
||||
block_bytes.to_vec(),
|
||||
metadata,
|
||||
xor_state,
|
||||
xor_bytes,
|
||||
height,
|
||||
header,
|
||||
) {
|
||||
Ok(block) => {
|
||||
slots[offset as usize] = Some(block);
|
||||
remaining -= 1;
|
||||
}
|
||||
Err(e) => {
|
||||
parse_failure = Some(e);
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
if remaining == 0 {
|
||||
ControlFlow::Break(())
|
||||
} else {
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
if let Some(e) = parse_failure {
|
||||
return Err(e);
|
||||
}
|
||||
if remaining == 0 {
|
||||
break 'files;
|
||||
}
|
||||
|
||||
// Carry pre-first-magic bytes into the next (earlier)
|
||||
// chunk so a block that straddled this chunk's start is
|
||||
// stitched back together.
|
||||
end = start;
|
||||
if end > 0 {
|
||||
let prefix_len = result.first_magic.unwrap_or(buf.len());
|
||||
spillover.extend_from_slice(&buf[..prefix_len]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ const MAGIC_BYTES: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9];
|
||||
|
||||
/// Returns the position **immediately after** the matched magic, or
|
||||
/// `None` if no match. Advances `xor_i` by the bytes consumed either
|
||||
/// way.
|
||||
/// way. First-byte fast-fail keeps the inner loop tight.
|
||||
pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes) -> Option<usize> {
|
||||
let len = bytes.len();
|
||||
if len < MAGIC_BYTES.len() {
|
||||
@@ -42,36 +42,51 @@ pub(crate) fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes
|
||||
None
|
||||
}
|
||||
|
||||
/// Scans `buf` (the full contents of one blk file) for blocks,
|
||||
/// calling `on_block` for each. The block bytes are passed as a
|
||||
/// mutable borrow so the callback can clone (to ship to a parser
|
||||
/// thread) or process in place (to peek the header).
|
||||
/// Position (relative to `buf`) of the first matched magic byte.
/// Used by the chunked tail pipeline to carry pre-first-magic bytes
/// into the next (earlier) chunk.
pub(crate) struct ScanResult {
    // `None` when the scanned buffer contained no magic bytes at all;
    // callers treat that as "the whole chunk is spillover".
    pub first_magic: Option<usize>,
}
|
||||
|
||||
/// Scans `buf` for blocks and calls `on_block` for each. `file_offset`
|
||||
/// is the absolute file position of `buf[0]` — used to seed the XOR
|
||||
/// phase and to report absolute `BlkPosition`s so the chunked tail
|
||||
/// pipeline can read mid-file slices.
|
||||
pub(crate) fn scan_bytes(
|
||||
buf: &mut [u8],
|
||||
blk_index: u16,
|
||||
file_offset: usize,
|
||||
xor_bytes: XORBytes,
|
||||
mut on_block: impl FnMut(BlkMetadata, &mut [u8], XORIndex) -> ControlFlow<()>,
|
||||
) {
|
||||
let mut xor_i = XORIndex::default();
|
||||
) -> ScanResult {
|
||||
let mut xor_i = XORIndex::at_offset(file_offset);
|
||||
let mut first_magic: Option<usize> = None;
|
||||
let mut i = 0;
|
||||
|
||||
while let Some(off) = find_magic(&buf[i..], &mut xor_i, xor_bytes) {
|
||||
first_magic.get_or_insert(i + off - MAGIC_BYTES.len());
|
||||
i += off;
|
||||
if i + 4 > buf.len() {
|
||||
return;
|
||||
break;
|
||||
}
|
||||
let mut size_bytes = [buf[i], buf[i + 1], buf[i + 2], buf[i + 3]];
|
||||
xor_i.bytes(&mut size_bytes, xor_bytes);
|
||||
let len = u32::from_le_bytes(size_bytes) as usize;
|
||||
i += 4;
|
||||
if i + len > buf.len() {
|
||||
return;
|
||||
break;
|
||||
}
|
||||
let metadata = BlkMetadata::new(BlkPosition::new(blk_index, i as u32), len as u32);
|
||||
let metadata = BlkMetadata::new(
|
||||
BlkPosition::new(blk_index, (file_offset + i) as u32),
|
||||
len as u32,
|
||||
);
|
||||
if on_block(metadata, &mut buf[i..i + len], xor_i).is_break() {
|
||||
return;
|
||||
break;
|
||||
}
|
||||
i += len;
|
||||
xor_i.add_assign(len);
|
||||
}
|
||||
|
||||
ScanResult { first_magic }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user