mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snapshot
This commit is contained in:
@@ -12,7 +12,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let indexer = Indexer::forced_import(&outputs_dir)?;
|
||||
|
||||
println!("{:?}", indexer.vecs.outputs.value.collect_range(0, 200));
|
||||
println!("{:?}", indexer.vecs.outputs.value.collect_range_at(0, 200));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -207,6 +207,6 @@ where
|
||||
} else if h + 1_u32 == starting_height {
|
||||
Some(I::from(index_to_else.len()))
|
||||
} else {
|
||||
height_to_index.collect_one(starting_height.to_usize())
|
||||
height_to_index.collect_one(starting_height)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,7 +242,7 @@ impl Stores {
|
||||
vecs: &mut Vecs,
|
||||
starting_indexes: &Indexes,
|
||||
) -> Result<()> {
|
||||
vecs.blocks.blockhash.for_each_range(
|
||||
vecs.blocks.blockhash.for_each_range_at(
|
||||
starting_indexes.height.to_usize(),
|
||||
vecs.blocks.blockhash.len(),
|
||||
|blockhash| {
|
||||
@@ -272,7 +272,7 @@ impl Stores {
|
||||
let start = starting_indexes.txindex.to_usize();
|
||||
let end = vecs.transactions.txid.len();
|
||||
let mut current_index = start;
|
||||
vecs.transactions.txid.for_each_range(start, end, |txid| {
|
||||
vecs.transactions.txid.for_each_range_at(start, end, |txid| {
|
||||
let txindex = TxIndex::from(current_index);
|
||||
let txidprefix = TxidPrefix::from(&txid);
|
||||
|
||||
@@ -303,7 +303,7 @@ impl Stores {
|
||||
let rollback_end = vecs.outputs.outputtype.len();
|
||||
|
||||
let txindexes: Vec<TxIndex> =
|
||||
vecs.outputs.txindex.collect_range(rollback_start, rollback_end);
|
||||
vecs.outputs.txindex.collect_range_at(rollback_start, rollback_end);
|
||||
|
||||
for (i, txoutindex) in (rollback_start..rollback_end).enumerate() {
|
||||
let outputtype = txoutindex_to_outputtype_reader.get(txoutindex);
|
||||
@@ -332,8 +332,8 @@ impl Stores {
|
||||
|
||||
let start = starting_indexes.txinindex.to_usize();
|
||||
let end = vecs.inputs.outpoint.len();
|
||||
let outpoints: Vec<OutPoint> = vecs.inputs.outpoint.collect_range(start, end);
|
||||
let spending_txindexes: Vec<TxIndex> = vecs.inputs.txindex.collect_range(start, end);
|
||||
let outpoints: Vec<OutPoint> = vecs.inputs.outpoint.collect_range_at(start, end);
|
||||
let spending_txindexes: Vec<TxIndex> = vecs.inputs.txindex.collect_range_at(start, end);
|
||||
|
||||
let outputs_to_unspend: Vec<_> = outpoints
|
||||
.into_iter()
|
||||
|
||||
@@ -250,8 +250,7 @@ impl AddressesVecs {
|
||||
) -> Result<Box<dyn Iterator<Item = AddressHash> + '_>> {
|
||||
macro_rules! make_iter {
|
||||
($height_vec:expr, $bytes_vec:expr) => {{
|
||||
let h = height.to_usize();
|
||||
match $height_vec.collect_one(h) {
|
||||
match $height_vec.collect_one(height) {
|
||||
Some(mut index) => {
|
||||
let reader = $bytes_vec.reader();
|
||||
Ok(Box::new(std::iter::from_fn(move || {
|
||||
|
||||
@@ -8,11 +8,12 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
exclude = ["examples/"]
|
||||
|
||||
[dependencies]
|
||||
bitcoin = { workspace = true }
|
||||
brk_error = { workspace = true }
|
||||
brk_rpc = { workspace = true }
|
||||
brk_rpc = { workspace = true, features = ["corepc"] }
|
||||
brk_types = { workspace = true }
|
||||
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
|
||||
derive_more = { workspace = true }
|
||||
|
||||
@@ -1,46 +1,59 @@
|
||||
# brk_reader
|
||||
|
||||
High-performance Bitcoin block reader from raw blk files.
|
||||
Streams Bitcoin blocks from Bitcoin Core's raw `blk*.dat` files in chain order.
|
||||
|
||||
## What It Enables
|
||||
## Requirements
|
||||
|
||||
Stream blocks directly from Bitcoin Core's `blk*.dat` files with parallel parsing, automatic XOR decoding, and chain-order delivery. Much faster than RPC for full-chain scans.
|
||||
A running Bitcoin Core node with RPC access. The reader needs:
|
||||
- The `blocks/` directory (for `blk*.dat` files)
|
||||
- RPC connection (to resolve block heights and filter orphan blocks)
|
||||
|
||||
## Key Features
|
||||
|
||||
- **Direct blk file access**: Bypasses RPC overhead entirely
|
||||
- **XOR decoding**: Handles Bitcoin Core's obfuscated block storage
|
||||
- **Parallel parsing**: Multi-threaded block deserialization
|
||||
- **Chain ordering**: Reorders out-of-sequence blocks before delivery
|
||||
- **Smart start finding**: Binary search to locate starting height across blk files
|
||||
- **Reorg detection**: Stops iteration on chain discontinuity
|
||||
|
||||
## Core API
|
||||
## Quick Start
|
||||
|
||||
```rust,ignore
|
||||
let reader = Reader::new(blocks_dir, &rpc_client);
|
||||
let bitcoin_dir = Client::default_bitcoin_path();
|
||||
let client = Client::new(
|
||||
Client::default_url(),
|
||||
Auth::CookieFile(bitcoin_dir.join(".cookie")),
|
||||
)?;
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
|
||||
// Stream blocks from height 800,000 to 850,000
|
||||
let receiver = reader.read(Some(Height::new(800_000)), Some(Height::new(850_000)));
|
||||
// Stream the entire chain
|
||||
for block in reader.read(None, None) {
|
||||
println!("{}: {}", block.height(), block.hash());
|
||||
}
|
||||
|
||||
for block in receiver {
|
||||
// Process block in chain order
|
||||
// Or a specific range (inclusive)
|
||||
for block in reader.read(Some(Height::new(800_000)), Some(Height::new(850_000))) {
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
## Architecture
|
||||
## What You Get
|
||||
|
||||
1. **File scanner**: Maps `blk*.dat` files to indices
|
||||
2. **Byte reader**: Streams raw bytes, finds magic bytes, segments blocks
|
||||
3. **Parser pool**: Parallel deserialization with rayon
|
||||
4. **Orderer**: Buffers and emits blocks in height order
|
||||
Each `ReadBlock` gives you access to:
|
||||
|
||||
## Performance
|
||||
| Field | Description |
|
||||
|-------|-------------|
|
||||
| `block.height()` | Block height |
|
||||
| `block.hash()` | Block hash |
|
||||
| `block.header` | Block header (timestamp, nonce, difficulty, ...) |
|
||||
| `block.txdata` | All transactions |
|
||||
| `block.coinbase_tag()` | Miner's coinbase tag |
|
||||
| `block.metadata()` | Position in the blk file |
|
||||
| `block.tx_metadata()` | Per-transaction blk file positions |
|
||||
|
||||
The parallel pipeline can saturate disk I/O while parsing on multiple cores. For recent blocks, falls back to RPC for lower latency.
|
||||
`Reader` is thread-safe and cheap to clone (Arc-backed).
|
||||
|
||||
## Built On
|
||||
## How It Works
|
||||
|
||||
- `brk_error` for error handling
|
||||
- `brk_rpc` for RPC client (height lookups, recent blocks)
|
||||
- `brk_types` for `Height`, `BlockHash`, `BlkPosition`, `BlkMetadata`
|
||||
Three-stage pipeline connected by bounded channels:
|
||||
|
||||
```text
|
||||
blk*.dat ──► File Reader ──► Parser Pool ──► Orderer ──► Receiver<ReadBlock>
|
||||
1 thread up to 4 1 thread
|
||||
```
|
||||
|
||||
1. **File reader** binary-searches to the starting blk file, scans for magic bytes, segments raw blocks
|
||||
2. **Parser pool** XOR-decodes and deserializes blocks in parallel, skips out-of-range blocks via header timestamp, filters orphans via RPC
|
||||
3. **Orderer** buffers out-of-order arrivals, validates `prev_blockhash` continuity, emits blocks sequentially
|
||||
|
||||
46
crates/brk_reader/examples/blk_heights.rs
Normal file
46
crates/brk_reader/examples/blk_heights.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
use brk_error::Result;
|
||||
use brk_reader::Reader;
|
||||
use brk_rpc::{Auth, Client};
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let bitcoin_dir = Client::default_bitcoin_path();
|
||||
let client = Client::new(
|
||||
Client::default_url(),
|
||||
Auth::CookieFile(bitcoin_dir.join(".cookie")),
|
||||
)?;
|
||||
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
let xor_bytes = reader.xor_bytes();
|
||||
let blk_map = reader.blk_index_to_blk_path();
|
||||
|
||||
let mut prev_height: Option<u32> = None;
|
||||
let mut max_drop: u32 = 0;
|
||||
let mut max_drop_at: u16 = 0;
|
||||
|
||||
for (&blk_index, blk_path) in blk_map.iter() {
|
||||
match reader.get_first_block_height(blk_path, xor_bytes) {
|
||||
Ok(height) => {
|
||||
let h = u32::from(height);
|
||||
let drop = prev_height.map(|p| p.saturating_sub(h)).unwrap_or(0);
|
||||
if drop > max_drop {
|
||||
max_drop = drop;
|
||||
max_drop_at = blk_index;
|
||||
}
|
||||
let blocks_per_file = prev_height.map(|p| h.saturating_sub(p));
|
||||
println!(
|
||||
"blk{blk_index:05}.dat first_height={h:>7} gap={:>5} drop={drop}",
|
||||
blocks_per_file.map(|b| b.to_string()).unwrap_or("-".into()),
|
||||
);
|
||||
prev_height = Some(h);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("blk{blk_index:05}.dat ERROR: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("\nMax backwards drop: {max_drop} blocks (at blk{max_drop_at:05}.dat)");
|
||||
println!("Files scanned: {}", blk_map.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2,10 +2,7 @@ use brk_error::Result;
|
||||
use brk_reader::Reader;
|
||||
use brk_rpc::{Auth, Client};
|
||||
|
||||
#[allow(clippy::needless_doctest_main)]
|
||||
fn main() -> Result<()> {
|
||||
let i = std::time::Instant::now();
|
||||
|
||||
let bitcoin_dir = Client::default_bitcoin_path();
|
||||
|
||||
let client = Client::new(
|
||||
@@ -13,32 +10,14 @@ fn main() -> Result<()> {
|
||||
Auth::CookieFile(bitcoin_dir.join(".cookie")),
|
||||
)?;
|
||||
|
||||
let blocks_dir = bitcoin_dir.join("blocks");
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
|
||||
let reader = Reader::new(blocks_dir, &client);
|
||||
|
||||
let start = None;
|
||||
// let start = Some(916037_u32.into());
|
||||
let end = None;
|
||||
reader.read(start, end).iter().for_each(|block| {
|
||||
// Stream all blocks
|
||||
let i = std::time::Instant::now();
|
||||
for block in reader.read(None, None) {
|
||||
println!("{}: {}", block.height(), block.hash());
|
||||
});
|
||||
|
||||
// let v = diff.iter().rev().take(10).collect::<Vec<_>>();
|
||||
|
||||
// let block_0 = parser.get(Height::new(0))?;
|
||||
// dbg!("{}", block_0.coinbase_tag());
|
||||
|
||||
// let block_158251 = parser.get(Height::new(158251))?;
|
||||
// dbg!("{}", block_158251.coinbase_tag());
|
||||
|
||||
// let block_173195 = parser.get(Height::new(173195))?;
|
||||
// dbg!("{}", block_173195.coinbase_tag());
|
||||
|
||||
// let block_840_000 = parser.get(Height::new(840_004))?;
|
||||
// dbg!("{}", block_840_000.coinbase_tag());
|
||||
|
||||
dbg!(i.elapsed());
|
||||
}
|
||||
println!("Full read: {:?}", i.elapsed());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
35
crates/brk_reader/examples/reader_single.rs
Normal file
35
crates/brk_reader/examples/reader_single.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use brk_error::Result;
|
||||
use brk_reader::Reader;
|
||||
use brk_rpc::{Auth, Client};
|
||||
use brk_types::Height;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let bitcoin_dir = Client::default_bitcoin_path();
|
||||
|
||||
let client = Client::new(
|
||||
Client::default_url(),
|
||||
Auth::CookieFile(bitcoin_dir.join(".cookie")),
|
||||
)?;
|
||||
|
||||
let reader = Reader::new(bitcoin_dir.join("blocks"), &client);
|
||||
|
||||
let heights = [0, 100_000, 158_251, 173_195, 840_000];
|
||||
|
||||
for &h in &heights {
|
||||
let height = Height::new(h);
|
||||
let i = std::time::Instant::now();
|
||||
|
||||
if let Some(block) = reader.read(Some(height), Some(height)).iter().next() {
|
||||
println!(
|
||||
"height={} hash={} txs={} coinbase=\"{}\" ({:?})",
|
||||
block.height(),
|
||||
block.hash(),
|
||||
block.txdata.len(),
|
||||
block.coinbase_tag(),
|
||||
i.elapsed(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor};
|
||||
use brk_error::Result;
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlkMetadata, Block, Height, ReadBlock};
|
||||
|
||||
use crate::{XORBytes, XORIndex};
|
||||
|
||||
pub enum AnyBlock {
|
||||
Raw(Vec<u8>),
|
||||
Decoded(ReadBlock),
|
||||
Skipped,
|
||||
}
|
||||
|
||||
impl AnyBlock {
|
||||
pub fn decode(
|
||||
self,
|
||||
metadata: BlkMetadata,
|
||||
client: &Client,
|
||||
mut xor_i: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
start: Option<Height>,
|
||||
end: Option<Height>,
|
||||
) -> Result<Self> {
|
||||
let mut bytes = match self {
|
||||
AnyBlock::Raw(bytes) => bytes,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
xor_i.bytes(bytes.as_mut_slice(), xor_bytes);
|
||||
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
|
||||
let header = Header::consensus_decode(&mut cursor)?;
|
||||
|
||||
let hash = header.block_hash();
|
||||
|
||||
let tx_count = VarInt::consensus_decode(&mut cursor)?.0;
|
||||
|
||||
let Ok(block_header_result) = client.get_block_header_info(&hash) else {
|
||||
return Ok(Self::Skipped);
|
||||
};
|
||||
|
||||
let height = Height::from(block_header_result.height);
|
||||
|
||||
if let Some(start) = start
|
||||
&& start > height
|
||||
{
|
||||
return Ok(Self::Skipped);
|
||||
} else if let Some(end) = end
|
||||
&& end < height
|
||||
{
|
||||
return Ok(Self::Skipped);
|
||||
} else if block_header_result.confirmations <= 0 {
|
||||
return Ok(Self::Skipped);
|
||||
}
|
||||
|
||||
let mut txdata = Vec::with_capacity(tx_count as usize);
|
||||
let mut tx_metadata = Vec::with_capacity(tx_count as usize);
|
||||
for _ in 0..tx_count {
|
||||
let offset = cursor.position() as u32;
|
||||
let position = metadata.position() + offset;
|
||||
let tx = Transaction::consensus_decode(&mut cursor)?;
|
||||
txdata.push(tx);
|
||||
let len = cursor.position() as u32 - offset;
|
||||
tx_metadata.push(BlkMetadata::new(position, len));
|
||||
}
|
||||
|
||||
let block = bitcoin::Block { header, txdata };
|
||||
let block = Block::from((height, hash, block));
|
||||
let block = ReadBlock::from((block, metadata, tx_metadata));
|
||||
|
||||
Ok(Self::Decoded(block))
|
||||
}
|
||||
}
|
||||
@@ -17,28 +17,16 @@ impl BlkIndexToBlkPath {
|
||||
Self(
|
||||
fs::read_dir(blocks_dir)
|
||||
.unwrap()
|
||||
.map(|entry| entry.unwrap().path())
|
||||
.filter(|path| {
|
||||
let is_file = path.is_file();
|
||||
.filter_map(|entry| {
|
||||
let path = entry.unwrap().path();
|
||||
let file_name = path.file_name()?.to_str()?;
|
||||
|
||||
if is_file {
|
||||
let file_name = path.file_name().unwrap().to_str().unwrap();
|
||||
let index_str = file_name.strip_prefix(BLK)?.strip_suffix(DOT_DAT)?;
|
||||
let blk_index = index_str.parse::<u16>().ok()?;
|
||||
|
||||
file_name.starts_with(BLK) && file_name.ends_with(DOT_DAT)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
path.is_file().then_some((blk_index, path))
|
||||
})
|
||||
.map(|path| {
|
||||
let file_name = path.file_name().unwrap().to_str().unwrap();
|
||||
|
||||
let blk_index = file_name[BLK.len()..(file_name.len() - DOT_DAT.len())]
|
||||
.parse::<u16>()
|
||||
.unwrap();
|
||||
|
||||
(blk_index, path)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>(),
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
73
crates/brk_reader/src/decode.rs
Normal file
73
crates/brk_reader/src/decode.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor};
|
||||
use brk_error::Result;
|
||||
use brk_rpc::Client;
|
||||
use brk_types::{BlkMetadata, Block, Height, ReadBlock};
|
||||
|
||||
use crate::{XORBytes, XORIndex};
|
||||
|
||||
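/// Slack applied to the header-timestamp pre-filter, because block timestamps are not monotonic in height (`3 * 3600`, i.e. three hours if timestamps are in seconds).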
|
||||
const TIMESTAMP_MARGIN: u32 = 3 * 3600;
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn decode_block(
|
||||
mut bytes: Vec<u8>,
|
||||
metadata: BlkMetadata,
|
||||
client: &Client,
|
||||
mut xor_i: XORIndex,
|
||||
xor_bytes: XORBytes,
|
||||
start: Option<Height>,
|
||||
end: Option<Height>,
|
||||
start_time: u32,
|
||||
end_time: u32,
|
||||
) -> Result<Option<ReadBlock>> {
|
||||
xor_i.bytes(bytes.as_mut_slice(), xor_bytes);
|
||||
|
||||
let mut cursor = Cursor::new(bytes);
|
||||
|
||||
let header = Header::consensus_decode(&mut cursor)?;
|
||||
|
||||
// Skip blocks clearly outside the target range using header timestamp
|
||||
if header.time < start_time.saturating_sub(TIMESTAMP_MARGIN)
|
||||
|| (end_time > 0 && header.time > end_time.saturating_add(TIMESTAMP_MARGIN))
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let hash = header.block_hash();
|
||||
|
||||
let Ok(block_header_result) = client.get_block_header_info(&hash) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let height = Height::from(block_header_result.height);
|
||||
|
||||
if start.is_some_and(|s| s > height) || end.is_some_and(|e| e < height) {
|
||||
return Ok(None);
|
||||
}
|
||||
if block_header_result.confirmations <= 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let tx_count = VarInt::consensus_decode(&mut cursor)?.0 as usize;
|
||||
|
||||
let mut txdata = Vec::with_capacity(tx_count);
|
||||
let mut tx_metadata = Vec::with_capacity(tx_count);
|
||||
let mut tx_offsets = Vec::with_capacity(tx_count);
|
||||
for _ in 0..tx_count {
|
||||
let offset = cursor.position() as u32;
|
||||
tx_offsets.push(offset);
|
||||
let position = metadata.position() + offset;
|
||||
let tx = Transaction::consensus_decode(&mut cursor)?;
|
||||
txdata.push(tx);
|
||||
let len = cursor.position() as u32 - offset;
|
||||
tx_metadata.push(BlkMetadata::new(position, len));
|
||||
}
|
||||
|
||||
let block_bytes = cursor.into_inner();
|
||||
|
||||
let block = bitcoin::Block { header, txdata };
|
||||
let mut block = Block::from((height, hash, block));
|
||||
block.set_raw_data(block_bytes, tx_offsets);
|
||||
|
||||
Ok(Some(ReadBlock::from((block, metadata, tx_metadata))))
|
||||
}
|
||||
@@ -4,12 +4,10 @@ use std::{
|
||||
collections::BTreeMap,
|
||||
fs::{self, File},
|
||||
io::{Read, Seek, SeekFrom},
|
||||
mem,
|
||||
ops::ControlFlow,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use bitcoin::{block::Header, consensus::Decodable};
|
||||
@@ -24,18 +22,30 @@ use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use rayon::prelude::*;
|
||||
use tracing::error;
|
||||
|
||||
mod any_block;
|
||||
mod blk_index_to_blk_path;
|
||||
mod decode;
|
||||
mod xor_bytes;
|
||||
mod xor_index;
|
||||
|
||||
use any_block::*;
|
||||
use decode::*;
|
||||
pub use xor_bytes::*;
|
||||
pub use xor_index::*;
|
||||
|
||||
const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217];
|
||||
const BOUND_CAP: usize = 50;
|
||||
|
||||
fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes) -> Option<usize> {
|
||||
let mut window = [0u8; 4];
|
||||
for (i, &b) in bytes.iter().enumerate() {
|
||||
window.rotate_left(1);
|
||||
window[3] = xor_i.byte(b, xor_bytes);
|
||||
if window == MAGIC_BYTES {
|
||||
return Some(i + 1);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
///
|
||||
/// Bitcoin BLK file reader
|
||||
///
|
||||
@@ -107,11 +117,9 @@ impl ReaderInner {
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
/// Returns a crossbeam channel receiver that streams `ReadBlock`s in chain order.
|
||||
///
|
||||
/// Returns a crossbeam channel receiver that receives `Block` from an **inclusive** range (`start` and `end`)
|
||||
///
|
||||
/// For an example checkout `./main.rs`
|
||||
///
|
||||
/// Both `start` and `end` are inclusive. `None` means unbounded.
|
||||
pub fn read(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ReadBlock> {
|
||||
let client = self.client.clone();
|
||||
|
||||
@@ -128,6 +136,18 @@ impl ReaderInner {
|
||||
.find_start_blk_index(start, &blk_index_to_blk_path, xor_bytes)
|
||||
.unwrap_or_default();
|
||||
|
||||
let get_block_time = |h: Height| -> u32 {
|
||||
self.client
|
||||
.get_block_hash(*h as u64)
|
||||
.ok()
|
||||
.and_then(|hash| self.client.get_block_header(&hash).ok())
|
||||
.map(|h| h.time)
|
||||
.unwrap_or(0)
|
||||
};
|
||||
|
||||
let start_time = start.filter(|h| **h > 0).map(&get_block_time).unwrap_or(0);
|
||||
let end_time = end.map(&get_block_time).unwrap_or(0);
|
||||
|
||||
thread::spawn(move || {
|
||||
let _ = blk_index_to_blk_path.range(first_blk_index..).try_for_each(
|
||||
move |(blk_index, blk_path)| {
|
||||
@@ -135,29 +155,19 @@ impl ReaderInner {
|
||||
|
||||
let blk_index = *blk_index;
|
||||
|
||||
let mut blk_bytes_ = fs::read(blk_path).unwrap();
|
||||
let Ok(mut blk_bytes_) = fs::read(blk_path) else {
|
||||
error!("Failed to read blk file: {}", blk_path.display());
|
||||
return ControlFlow::Break(());
|
||||
};
|
||||
let blk_bytes = blk_bytes_.as_mut_slice();
|
||||
let blk_bytes_len = blk_bytes.len();
|
||||
|
||||
let mut current_4bytes = [0; 4];
|
||||
|
||||
let mut i = 0;
|
||||
|
||||
'parent: loop {
|
||||
loop {
|
||||
if i == blk_bytes_len {
|
||||
break 'parent;
|
||||
}
|
||||
|
||||
current_4bytes.rotate_left(1);
|
||||
|
||||
current_4bytes[3] = xor_i.byte(blk_bytes[i], xor_bytes);
|
||||
i += 1;
|
||||
|
||||
if current_4bytes == MAGIC_BYTES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let Some(offset) = find_magic(&blk_bytes[i..], &mut xor_i, xor_bytes)
|
||||
else {
|
||||
break;
|
||||
};
|
||||
i += offset;
|
||||
|
||||
let len = u32::from_le_bytes(
|
||||
xor_i
|
||||
@@ -172,10 +182,7 @@ impl ReaderInner {
|
||||
|
||||
let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
|
||||
|
||||
if send_bytes
|
||||
.send((metadata, AnyBlock::Raw(block_bytes), xor_i))
|
||||
.is_err()
|
||||
{
|
||||
if send_bytes.send((metadata, block_bytes, xor_i)).is_err() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
|
||||
@@ -189,47 +196,27 @@ impl ReaderInner {
|
||||
});
|
||||
|
||||
thread::spawn(move || {
|
||||
let xor_bytes = xor_bytes;
|
||||
|
||||
let mut bulk = vec![];
|
||||
|
||||
// Private pool to prevent collision with the global pool
|
||||
// Without it there can be hanging
|
||||
let parser_pool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(thread::available_parallelism().unwrap().get() / 2)
|
||||
.num_threads(4.min(thread::available_parallelism().unwrap().get() / 2))
|
||||
.build()
|
||||
.expect("Failed to create parser thread pool");
|
||||
|
||||
let drain_and_send = |bulk: &mut Vec<(BlkMetadata, AnyBlock, XORIndex)>| {
|
||||
parser_pool.install(|| {
|
||||
mem::take(bulk)
|
||||
.into_par_iter()
|
||||
.try_for_each(|(metdata, any_block, xor_i)| {
|
||||
if let Ok(AnyBlock::Decoded(block)) =
|
||||
any_block.decode(metdata, &client, xor_i, xor_bytes, start, end)
|
||||
parser_pool.install(|| {
|
||||
let _ =
|
||||
recv_bytes
|
||||
.into_iter()
|
||||
.par_bridge()
|
||||
.try_for_each(|(metadata, bytes, xor_i)| {
|
||||
if let Ok(Some(block)) =
|
||||
decode_block(bytes, metadata, &client, xor_i, xor_bytes, start, end, start_time, end_time)
|
||||
&& send_block.send(block).is_err()
|
||||
{
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
ControlFlow::Continue(())
|
||||
})
|
||||
})
|
||||
};
|
||||
|
||||
recv_bytes.iter().try_for_each(|tuple| {
|
||||
bulk.push(tuple);
|
||||
|
||||
if bulk.len() < BOUND_CAP / 2 {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
while send_block.len() >= bulk.len() {
|
||||
thread::sleep(Duration::from_micros(100));
|
||||
}
|
||||
drain_and_send(&mut bulk)
|
||||
})?;
|
||||
|
||||
drain_and_send(&mut bulk)
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
thread::spawn(move || {
|
||||
@@ -272,7 +259,7 @@ impl ReaderInner {
|
||||
|
||||
current_height.increment();
|
||||
|
||||
if end.is_some_and(|end| end == current_height) {
|
||||
if end.is_some_and(|end| current_height > end) {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
@@ -302,14 +289,11 @@ impl ReaderInner {
|
||||
.keys()
|
||||
.rev()
|
||||
.nth(2)
|
||||
.cloned()
|
||||
.copied()
|
||||
.unwrap_or_default());
|
||||
}
|
||||
|
||||
let blk_indices: Vec<u16> = blk_index_to_blk_path
|
||||
.range(0..)
|
||||
.map(|(idx, _)| *idx)
|
||||
.collect();
|
||||
let blk_indices: Vec<u16> = blk_index_to_blk_path.keys().copied().collect();
|
||||
|
||||
if blk_indices.is_empty() {
|
||||
return Ok(0);
|
||||
@@ -328,9 +312,6 @@ impl ReaderInner {
|
||||
Ok(height) => {
|
||||
if height <= target_start {
|
||||
best_start_idx = mid;
|
||||
if mid == usize::MAX {
|
||||
break;
|
||||
}
|
||||
left = mid + 1;
|
||||
} else {
|
||||
if mid == 0 {
|
||||
@@ -340,9 +321,6 @@ impl ReaderInner {
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
if mid == usize::MAX {
|
||||
break;
|
||||
}
|
||||
left = mid + 1;
|
||||
}
|
||||
}
|
||||
@@ -357,35 +335,23 @@ impl ReaderInner {
|
||||
Ok(blk_indices.get(final_idx).copied().unwrap_or(0))
|
||||
}
|
||||
|
||||
fn get_first_block_height(&self, blk_path: &PathBuf, xor_bytes: XORBytes) -> Result<Height> {
|
||||
pub fn get_first_block_height(&self, blk_path: &PathBuf, xor_bytes: XORBytes) -> Result<Height> {
|
||||
let mut file = File::open(blk_path)?;
|
||||
let mut buf = [0u8; 4096];
|
||||
let n = file.read(&mut buf)?;
|
||||
|
||||
let mut xor_i = XORIndex::default();
|
||||
let mut current_4bytes = [0; 4];
|
||||
let mut byte_buffer = [0u8; 1];
|
||||
let magic_end = find_magic(&buf[..n], &mut xor_i, xor_bytes)
|
||||
.ok_or_else(|| Error::NotFound("No magic bytes found".into()))?;
|
||||
|
||||
loop {
|
||||
if file.read_exact(&mut byte_buffer).is_err() {
|
||||
return Err(Error::NotFound("No magic bytes found".into()));
|
||||
}
|
||||
let size_end = magic_end + 4;
|
||||
xor_i.bytes(&mut buf[magic_end..size_end], xor_bytes);
|
||||
|
||||
current_4bytes.rotate_left(1);
|
||||
current_4bytes[3] = xor_i.byte(byte_buffer[0], xor_bytes);
|
||||
let header_end = size_end + 80;
|
||||
xor_i.bytes(&mut buf[size_end..header_end], xor_bytes);
|
||||
|
||||
if current_4bytes == MAGIC_BYTES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let mut size_bytes = [0u8; 4];
|
||||
file.read_exact(&mut size_bytes)?;
|
||||
let _block_size =
|
||||
u32::from_le_bytes(xor_i.bytes(&mut size_bytes, xor_bytes).try_into().unwrap());
|
||||
|
||||
let mut header_bytes = [0u8; 80];
|
||||
file.read_exact(&mut header_bytes)?;
|
||||
xor_i.bytes(&mut header_bytes, xor_bytes);
|
||||
|
||||
let header = Header::consensus_decode(&mut std::io::Cursor::new(&header_bytes))?;
|
||||
let header =
|
||||
Header::consensus_decode(&mut std::io::Cursor::new(&buf[size_end..header_end]))?;
|
||||
|
||||
let height = self.client.get_block_info(&header.block_hash())?.height as u32;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user