parser: rework, made stateless

This commit is contained in:
nym21
2025-09-17 23:31:57 +02:00
parent 9524eafea1
commit cc5701ea62
22 changed files with 795 additions and 636 deletions
+49 -6
View File
@@ -1,16 +1,27 @@
use bitcoin::{Block, consensus::Decodable, io::Cursor};
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor};
use bitcoincore_rpc::RpcApi;
use brk_error::Result;
use brk_structs::{Block, BlockPosition, Height, ParsedBlock};
use crate::{XORBytes, XORIndex};
pub enum AnyBlock {
Raw(Vec<u8>),
Decoded(Block),
Decoded(ParsedBlock),
Skipped,
}
impl AnyBlock {
pub fn decode(&mut self, xor_i: &mut XORIndex, xor_bytes: &XORBytes) {
let bytes = match self {
pub fn decode(
self,
position: BlockPosition,
rpc: &'static bitcoincore_rpc::Client,
mut xor_i: XORIndex,
xor_bytes: &XORBytes,
start: Option<Height>,
end: Option<Height>,
) -> Result<Self> {
let mut bytes = match self {
AnyBlock::Raw(bytes) => bytes,
_ => unreachable!(),
};
@@ -19,8 +30,40 @@ impl AnyBlock {
let mut cursor = Cursor::new(bytes);
let block = Block::consensus_decode(&mut cursor).unwrap();
let header = Header::consensus_decode(&mut cursor)?;
*self = AnyBlock::Decoded(block);
let hash = header.block_hash();
let tx_count = VarInt::consensus_decode(&mut cursor)?.0;
let Ok(block_header_result) = rpc.get_block_header_info(&hash) else {
return Ok(Self::Skipped);
};
let height = Height::from(block_header_result.height);
if let Some(start) = start
&& start > height
{
return Ok(Self::Skipped);
} else if let Some(end) = end
&& end < height
{
return Ok(Self::Skipped);
} else if block_header_result.confirmations <= 0 {
return Ok(Self::Skipped);
}
let mut txdata = Vec::with_capacity(tx_count as usize);
for _ in 0..tx_count {
let tx = Transaction::consensus_decode(&mut cursor)?;
txdata.push(tx);
}
let block = bitcoin::Block { header, txdata };
let block = Block::from((height, hash, block));
let block = ParsedBlock::from((block, position));
Ok(Self::Decoded(block))
}
}
@@ -7,7 +7,7 @@ use std::{
use derive_deref::{Deref, DerefMut};
const BLK: &str = "blk";
const DAT: &str = ".dat";
const DOT_DAT: &str = ".dat";
/// Ordered map from a blk file's numeric index to that file's path.
///
/// The index is the `u16` parsed from the part of the file name between the
/// `blk` prefix and the `.dat` suffix. `Deref`/`DerefMut` expose the inner
/// `BTreeMap` directly.
#[derive(Debug, Deref, DerefMut)]
pub struct BlkIndexToBlkPath(BTreeMap<u16, PathBuf>);
@@ -24,7 +24,7 @@ impl BlkIndexToBlkPath {
if is_file {
let file_name = path.file_name().unwrap().to_str().unwrap();
file_name.starts_with(BLK) && file_name.ends_with(DAT)
file_name.starts_with(BLK) && file_name.ends_with(DOT_DAT)
} else {
false
}
@@ -32,7 +32,7 @@ impl BlkIndexToBlkPath {
.map(|path| {
let file_name = path.file_name().unwrap().to_str().unwrap();
let blk_index = file_name[BLK.len()..(file_name.len() - DAT.len())]
let blk_index = file_name[BLK.len()..(file_name.len() - DOT_DAT.len())]
.parse::<u16>()
.unwrap();
@@ -1,107 +0,0 @@
use std::{
collections::{BTreeMap, BTreeSet},
fs::File,
io::{BufReader, BufWriter},
path::{Path, PathBuf},
};
use crate::{BlkIndexToBlkPath, Height, blk_recap::BlkRecap};
/// On-disk cache of one [`BlkRecap`] per blk file, keyed by blk file index.
///
/// The cache is stored as JSON in `blk_index_to_blk_recap.json` inside the
/// outputs directory given to [`BlkIndexToBlkRecap::import`].
#[derive(Debug)]
pub struct BlkIndexToBlkRecap {
    /// Path of the JSON file the tree is read from and written to.
    pub path: PathBuf,
    /// Recaps keyed by blk file index.
    pub tree: BTreeMap<u16, BlkRecap>,
}

impl BlkIndexToBlkRecap {
    /// Loads the cache, drops stale entries and picks the blk index to start from.
    ///
    /// A missing or unparsable JSON file is treated as an empty cache.
    ///
    /// Returns the cache together with the blk file index from which parsing
    /// should start in order to reach `start`.
    pub fn import(
        outputs_dir: &Path,
        blk_index_to_blk_path: &BlkIndexToBlkPath,
        start: Option<Height>,
    ) -> (Self, u16) {
        let path = outputs_dir.join("blk_index_to_blk_recap.json");

        let tree: BTreeMap<u16, BlkRecap> = File::open(&path)
            .ok()
            .and_then(|file| serde_json::from_reader(BufReader::new(file)).ok())
            .unwrap_or_default();

        let mut slf = Self { path, tree };

        let min_removed = slf.clean_outdated(blk_index_to_blk_path);
        let blk_index = slf.get_start_recap(min_removed, start);

        (slf, blk_index)
    }

    /// Removes every recap that can no longer be trusted and returns the
    /// smallest removed blk index, if any.
    ///
    /// A recap is outdated when its blk file is absent from
    /// `blk_index_to_blk_path`, or when the file's modified time differs from
    /// the one stored in the recap.
    fn clean_outdated(&mut self, blk_index_to_blk_path: &BlkIndexToBlkPath) -> Option<u16> {
        let outdated: BTreeSet<u16> = self
            .tree
            .iter()
            .filter(|(blk_index, blk_recap)| {
                blk_index_to_blk_path
                    .get(*blk_index)
                    .is_none_or(|blk_path| blk_recap.has_different_modified_time(blk_path))
            })
            .map(|(blk_index, _)| *blk_index)
            .collect();

        for blk_index in &outdated {
            self.tree.remove(blk_index);
        }

        // The set is ordered, so its first element is the minimum.
        outdated.first().copied()
    }

    /// Returns the blk file index to start parsing from.
    ///
    /// - Without a `start` height, parsing begins at blk index `0`.
    /// - Otherwise the result is the lower of: the first recap whose
    ///   `max_height` reaches `start`, and `min_removed` (a removed recap must
    ///   be rebuilt, so parsing can't begin after it).
    /// - When neither exists, the last cached blk index is used, or `0` for an
    ///   empty cache.
    pub fn get_start_recap(&mut self, min_removed: Option<u16>, start: Option<Height>) -> u16 {
        let Some(height) = start else {
            return 0;
        };

        let first_reaching = self
            .tree
            .iter()
            .find(|(_, recap)| recap.max_height >= height)
            .map(|(blk_index, _)| *blk_index);

        let candidate = match (first_reaching, min_removed) {
            (Some(found), Some(removed)) => Some(found.min(removed)),
            (found, removed) => found.or(removed),
        };

        // Should only be none if asking for a too high start
        candidate.unwrap_or_else(|| self.tree.last_key_value().map_or(0, |(i, _)| *i))
    }

    /// Writes the tree to `self.path` as JSON.
    ///
    /// # Panics
    ///
    /// Panics when the file can't be created, when serialization fails, or
    /// when the buffered data can't be flushed to disk.
    pub fn export(&self) {
        let file = File::create(&self.path).unwrap_or_else(|e| {
            panic!("Cannot write file {}: {e}", self.path.display());
        });

        let mut writer = BufWriter::new(file);
        serde_json::to_writer(&mut writer, &self.tree).unwrap();
        // Dropping a `BufWriter` ignores flush errors; flush explicitly so a
        // truncated cache file is not left behind silently.
        std::io::Write::flush(&mut writer).unwrap();
    }
}
-18
View File
@@ -1,18 +0,0 @@
use std::path::Path;
use crate::path_to_modified_time;
/// Identity of a blk file: its index plus the modified time observed for it.
#[derive(Debug, Clone, Copy)]
pub struct BlkMetadata {
    /// Numeric index of the blk file.
    pub index: u16,
    /// Value returned by `path_to_modified_time` for the file when this
    /// metadata was created.
    pub modified_time: u64,
}

impl BlkMetadata {
    /// Builds the metadata for the blk file `index` located at `path`,
    /// reading the file's modified time from disk.
    pub fn new(index: u16, path: &Path) -> Self {
        let modified_time = path_to_modified_time(path);

        Self {
            index,
            modified_time,
        }
    }
}
-19
View File
@@ -1,19 +0,0 @@
use std::path::Path;
use brk_structs::Height;
use serde::{Deserialize, Serialize};
use crate::path_to_modified_time;
/// Summary of a blk file: the highest block height recorded for it and the
/// modified time the file had when the summary was taken.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[repr(C)]
pub struct BlkRecap {
    /// Highest block height recorded for the file.
    pub max_height: Height,
    /// Modified time of the file when this recap was recorded.
    pub modified_time: u64,
}

impl BlkRecap {
    /// Returns `true` when the file at `blk_path` currently reports a modified
    /// time other than the one stored in this recap.
    pub fn has_different_modified_time(&self, blk_path: &Path) -> bool {
        let recorded = self.modified_time;
        let current = path_to_modified_time(blk_path);
        recorded != current
    }
}
+167 -124
View File
@@ -1,28 +1,29 @@
#![doc = include_str!("../README.md")]
use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::PathBuf, thread};
use std::{
collections::BTreeMap,
fs::{self, File},
io::Read,
mem,
ops::ControlFlow,
path::PathBuf,
thread,
};
use bitcoin::BlockHash;
use bitcoin::{block::Header, consensus::Decodable};
use bitcoincore_rpc::RpcApi;
use blk_index_to_blk_path::*;
use blk_recap::BlkRecap;
use brk_structs::{Block, Height};
use brk_error::Result;
use brk_structs::{Block, BlockPosition, Height, ParsedBlock};
use crossbeam::channel::{Receiver, bounded};
use rayon::prelude::*;
mod any_block;
mod blk_index_to_blk_path;
mod blk_index_to_blk_recap;
mod blk_metadata;
mod blk_recap;
mod utils;
mod xor_bytes;
mod xor_index;
use any_block::*;
use blk_index_to_blk_recap::*;
use blk_metadata::*;
use utils::*;
use xor_bytes::*;
use xor_index::*;
@@ -33,41 +34,29 @@ const BOUND_CAP: usize = 50;
pub struct Parser {
blocks_dir: PathBuf,
outputs_dir: Option<PathBuf>,
rpc: &'static bitcoincore_rpc::Client,
}
impl Parser {
pub fn new(
blocks_dir: PathBuf,
outputs_dir: Option<PathBuf>,
rpc: &'static bitcoincore_rpc::Client,
) -> Self {
Self {
blocks_dir,
outputs_dir,
rpc,
}
pub fn new(blocks_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client) -> Self {
Self { blocks_dir, rpc }
}
// pub fn get(&self, height: Height) -> Block {
// self.parse(Some(height), Some(height))
// .iter()
// .next()
// .unwrap()
// .1
// }
/// Fetches the block at `height` from the RPC node.
///
/// Resolves the block hash for `height` first, then requests the block with
/// that hash and converts `(height, block)` into a [`Block`].
///
/// # Errors
///
/// Returns an error when either RPC call fails.
pub fn get(&self, height: Height) -> Result<Block> {
    let hash = self.rpc.get_block_hash(height.into())?;
    let block = self.rpc.get_block(&hash)?;
    Ok((height, block).into())
}
///
/// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`)
/// Returns a crossbeam channel receiver that receives `ParsedBlock`s from an **inclusive** height range (`start` and `end`)
///
/// For an example, check out `./main.rs`
///
pub fn parse(
&self,
start: Option<Height>,
end: Option<Height>,
) -> Receiver<(Height, bitcoin::Block, BlockHash)> {
pub fn parse(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ParsedBlock> {
let blocks_dir = self.blocks_dir.as_path();
let rpc = self.rpc;
@@ -77,25 +66,19 @@ impl Parser {
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(blocks_dir);
let (mut blk_index_to_blk_recap, blk_index) = BlkIndexToBlkRecap::import(
self.outputs_dir.as_ref().unwrap(),
&blk_index_to_blk_path,
start,
);
let xor_bytes = XORBytes::from(blocks_dir);
thread::spawn(move || {
let xor_bytes = xor_bytes;
let first_blk_index = self
.find_start_blk_index(start, &blk_index_to_blk_path, &xor_bytes)
.unwrap_or_default();
let _ = blk_index_to_blk_path.range(blk_index..).try_for_each(
thread::spawn(move || {
let _ = blk_index_to_blk_path.range(first_blk_index..).try_for_each(
move |(blk_index, blk_path)| {
let mut xor_i = XORIndex::default();
let blk_index = *blk_index;
let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path());
let mut blk_bytes_ = fs::read(blk_path).unwrap();
let blk_bytes = blk_bytes_.as_mut_slice();
let blk_bytes_len = blk_bytes.len();
@@ -120,18 +103,27 @@ impl Parser {
}
}
let len = u32::from_le_bytes(
let deser_len = u32::from_le_bytes(
xor_i
.bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes)
.try_into()
.unwrap(),
) as usize;
);
let len = deser_len as usize;
i += 4;
let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
if send_bytes
.send((blk_metadata, AnyBlock::Raw(block_bytes), xor_i))
.send((
BlockPosition {
blk_index,
offset: i,
len: deser_len,
},
AnyBlock::Raw(block_bytes),
xor_i,
))
.is_err()
{
return ControlFlow::Break(());
@@ -151,24 +143,20 @@ impl Parser {
let mut bulk = vec![];
let drain_and_send = |bulk: &mut Vec<_>| {
let drain_and_send = |bulk: &mut Vec<(BlockPosition, AnyBlock, XORIndex)>| {
// Using a vec and sending after to not end up with stuck threads in par iter
bulk.par_iter_mut().for_each(|(_, any_block, xor_i)| {
AnyBlock::decode(any_block, xor_i, &xor_bytes);
});
mem::take(bulk)
.into_par_iter()
.try_for_each(|(position, any_block, xor_i)| {
if let Ok(AnyBlock::Decoded(block)) =
any_block.decode(position, rpc, xor_i, &xor_bytes, start, end)
&& send_block.send(block).is_err()
{
return ControlFlow::Break(());
}
bulk.drain(..).try_for_each(|(blk_metadata, any_block, _)| {
let block = match any_block {
AnyBlock::Decoded(block) => block,
_ => unreachable!(),
};
if send_block.send((blk_metadata, block)).is_err() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
})
ControlFlow::Continue(())
})
};
recv_bytes.iter().try_for_each(|tuple| {
@@ -192,83 +180,138 @@ impl Parser {
let _ = recv_block
.iter()
.try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> {
let hash = block.block_hash();
let header = rpc.get_block_header_info(&hash);
if header.is_err() {
return ControlFlow::Continue(());
}
let header = header.unwrap();
if header.confirmations <= 0 {
return ControlFlow::Continue(());
}
let height = Height::from(header.height);
let len = blk_index_to_blk_recap.tree.len();
if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 {
match (len as u16).cmp(&blk_metadata.index) {
Ordering::Equal => {
if len % 21 == 0 {
blk_index_to_blk_recap.export();
}
}
Ordering::Less => panic!(),
Ordering::Greater => {}
}
blk_index_to_blk_recap
.tree
.entry(blk_metadata.index)
.and_modify(|recap| {
if recap.max_height < height {
recap.max_height = height;
}
})
.or_insert(BlkRecap {
max_height: height,
modified_time: blk_metadata.modified_time,
});
}
let mut opt = if current_height == height {
Some((block, hash))
.try_for_each(|block| -> ControlFlow<(), _> {
let mut opt = if current_height == block.height() {
Some(block)
} else {
if start.is_none_or(|start| start <= height)
&& end.is_none_or(|end| end >= height)
{
future_blocks.insert(height, (block, hash));
}
future_blocks.insert(block.height(), block);
None
};
while let Some((block, hash)) = opt.take().or_else(|| {
while let Some(block) = opt.take().or_else(|| {
if !future_blocks.is_empty() {
future_blocks.remove(&current_height)
} else {
None
}
}) {
if end.is_some_and(|end| end < current_height) {
return ControlFlow::Break(());
}
send_ordered.send((current_height, block, hash)).unwrap();
if end.is_some_and(|end| end == current_height) {
return ControlFlow::Break(());
}
send_ordered.send(block).unwrap();
current_height.increment();
}
ControlFlow::Continue(())
});
blk_index_to_blk_recap.export();
});
recv_ordered
}
fn find_start_blk_index(
&self,
target_start: Option<Height>,
blk_index_to_blk_path: &BlkIndexToBlkPath,
xor_bytes: &XORBytes,
) -> Result<u16> {
let Some(target_start) = target_start else {
return Ok(0);
};
// If start is a very recent block we only look back X blk file before the last
if let Ok(count) = self.rpc.get_block_count()
&& (count as u32).saturating_sub(*target_start) <= 3
{
return Ok(blk_index_to_blk_path
.keys()
.rev()
.nth(2)
.cloned()
.unwrap_or_default());
}
let blk_indices: Vec<u16> = blk_index_to_blk_path
.range(0..)
.map(|(idx, _)| *idx)
.collect();
if blk_indices.is_empty() {
return Ok(0);
}
let mut left = 0;
let mut right = blk_indices.len() - 1;
let mut best_start_idx = 0;
while left <= right {
let mid = (left + right) / 2;
let blk_index = blk_indices[mid];
if let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) {
match self.get_first_block_height(blk_path, xor_bytes) {
Ok(height) => {
if height <= target_start {
best_start_idx = mid;
if mid == usize::MAX {
break;
}
left = mid + 1;
} else {
if mid == 0 {
break;
}
right = mid - 1;
}
}
Err(_) => {
if mid == usize::MAX {
break;
}
left = mid + 1;
}
}
} else {
break;
}
}
// buffer for worst-case scenarios when a block as far behind
let final_idx = best_start_idx.saturating_sub(21);
Ok(blk_indices.get(final_idx).copied().unwrap_or(0))
}
/// Returns the chain height of the first block stored in the blk file at
/// `blk_path`.
///
/// The file is read through the XOR decoder until the first occurrence of
/// `MAGIC_BYTES`; the 4-byte length field that follows is skipped, the next
/// 80 bytes are decoded as a block header, and the RPC node is asked for the
/// height of that header's hash.
///
/// # Errors
///
/// Fails when the file can't be opened, when it ends before the magic bytes,
/// the length field or a full header were read, when the header doesn't
/// decode, or when the RPC node doesn't know the block hash.
fn get_first_block_height(&self, blk_path: &PathBuf, xor_bytes: &XORBytes) -> Result<Height> {
    // Buffered because the magic-byte scan below reads one byte at a time.
    let mut reader = std::io::BufReader::new(File::open(blk_path)?);
    let mut xor_i = XORIndex::default();

    // Sliding window over the last four decoded bytes.
    let mut window = [0u8; 4];
    let mut byte = [0u8; 1];

    loop {
        if reader.read_exact(&mut byte).is_err() {
            return Err("No magic bytes found".into());
        }

        window.rotate_left(1);
        window[3] = xor_i.byte(byte[0], xor_bytes);

        if window == MAGIC_BYTES {
            break;
        }
    }

    // The block length itself is not needed here. It is still passed through
    // `xor_i` as before — presumably so the XOR index stays aligned with the
    // file offset for the header bytes that follow.
    let mut size_bytes = [0u8; 4];
    reader.read_exact(&mut size_bytes)?;
    xor_i.bytes(&mut size_bytes, xor_bytes);

    let mut header_bytes = [0u8; 80];
    reader.read_exact(&mut header_bytes)?;
    xor_i.bytes(&mut header_bytes, xor_bytes);

    let header = Header::consensus_decode(&mut std::io::Cursor::new(&header_bytes))?;

    // Only the height is needed, so query the header rather than the full
    // verbose block.
    let height = self.rpc.get_block_header_info(&header.block_hash())?.height as u32;

    Ok(Height::new(height))
}
}
-11
View File
@@ -1,11 +0,0 @@
use std::{fs, path::Path, time::UNIX_EPOCH};
/// Returns the last-modified time of `path` in whole seconds since the UNIX
/// epoch.
///
/// # Panics
///
/// Panics when the metadata of `path` can't be read, when the platform does
/// not provide a modified time, or when that time lies before the UNIX epoch.
pub fn path_to_modified_time(path: &Path) -> u64 {
    let metadata = fs::metadata(path).unwrap();
    let modified = metadata.modified().unwrap();
    let since_epoch = modified.duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs()
}