use std::{ cmp::Ordering, collections::BTreeMap, fs::{self}, ops::ControlFlow, path::{Path, PathBuf}, thread, }; use bitcoin::{Block, BlockHash}; use bitcoincore_rpc::RpcApi; use blk_index_to_blk_path::*; use blk_recap::BlkRecap; use brk_core::Height; use crossbeam::channel::{Receiver, bounded}; use rayon::prelude::*; pub use bitcoin; pub use bitcoincore_rpc as rpc; mod blk_index_to_blk_path; mod blk_index_to_blk_recap; mod blk_metadata; mod blk_recap; mod block_state; mod error; mod utils; mod xor_bytes; mod xor_index; use blk_index_to_blk_recap::*; use blk_metadata::*; use block_state::*; pub use error::*; use utils::*; use xor_bytes::*; use xor_index::*; pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000; const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217]; const BOUND_CAP: usize = 50; pub struct Parser { data_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client, } impl Parser { pub fn new(data_dir: &Path, rpc: &'static bitcoincore_rpc::Client) -> Self { Self { data_dir: data_dir.to_owned(), rpc, } } /// /// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`) /// /// For an example checkout `./main.rs` /// pub fn parse(&self, start: Option, end: Option) -> Receiver<(Height, Block, BlockHash)> { let data_dir = self.data_dir.as_path(); let rpc = self.rpc; let (send_bytes, recv_bytes) = bounded(BOUND_CAP); let (send_block, recv_block) = bounded(BOUND_CAP); let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP); let blk_index_to_blk_path = BlkIndexToBlkPath::scan(data_dir); let (mut blk_index_to_blk_recap, blk_index) = BlkIndexToBlkRecap::import(data_dir, &blk_index_to_blk_path, start); let xor_bytes = XORBytes::from(data_dir); thread::spawn(move || { let xor_bytes = xor_bytes; blk_index_to_blk_path .range(blk_index..) .try_for_each(move |(blk_index, blk_path)| { let mut xor_i = XORIndex::default(); let blk_index = *blk_index; let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path()); let mut blk_bytes_ = fs::read(blk_path).unwrap(); let blk_bytes = blk_bytes_.as_mut_slice(); let blk_bytes_len = blk_bytes.len(); let mut current_4bytes = [0; 4]; let mut i = 0; 'parent: loop { loop { if i == blk_bytes_len { break 'parent; } current_4bytes.rotate_left(1); current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes); i += 1; if current_4bytes == MAGIC_BYTES { break; } } let len = u32::from_le_bytes(xor_i.bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes).try_into().unwrap()) as usize; i += 4; let block_bytes = (blk_bytes[i..(i + len)]).to_vec(); if send_bytes .send((blk_metadata, BlockState::Raw(block_bytes), xor_i)) .is_err() { return ControlFlow::Break(()); } i += len; xor_i.add_assign(len); } ControlFlow::Continue(()) }); }); thread::spawn(move || { let xor_bytes = xor_bytes; let mut bulk = vec![]; let drain_and_send = |bulk: &mut Vec<_>| { // Using a vec and sending after to not end up with stuck threads in par iter bulk.par_iter_mut().for_each(|(_, block_state, xor_i)| { BlockState::decode(block_state, xor_i, &xor_bytes); }); bulk.drain(..).try_for_each(|(blk_metadata, block_state, _)| { let block = match block_state { BlockState::Decoded(block) => block, _ => unreachable!(), }; if send_block.send((blk_metadata, block)).is_err() { return ControlFlow::Break(()); } ControlFlow::Continue(()) }) }; recv_bytes.iter().try_for_each(|tuple| { bulk.push(tuple); if bulk.len() < BOUND_CAP / 2 { return ControlFlow::Continue(()); } // Sending in bulk to not lock threads in standby drain_and_send(&mut bulk) }); drain_and_send(&mut bulk) }); thread::spawn(move || { let mut current_height = start.unwrap_or_default(); let mut future_blocks = BTreeMap::default(); recv_block .iter() .try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> { let hash = block.block_hash(); let header = rpc.get_block_header_info(&hash); if header.is_err() { return ControlFlow::Continue(()); } let header = header.unwrap(); if header.confirmations <= 0 { return ControlFlow::Continue(()); } let height = Height::from(header.height); // println!("{height}"); let len = blk_index_to_blk_recap.tree.len(); if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 { match (len as u16).cmp(&blk_metadata.index) { Ordering::Equal => { if len % 21 == 0 { blk_index_to_blk_recap.export(); } } Ordering::Less => panic!(), Ordering::Greater => {} } blk_index_to_blk_recap .tree .entry(blk_metadata.index) .and_modify(|recap| { if recap.max_height < height { recap.max_height = height; } }) .or_insert(BlkRecap { max_height: height, modified_time: blk_metadata.modified_time, }); } let mut opt = if current_height == height { Some((block, hash)) } else { if start.is_none_or(|start| start <= height) && end.is_none_or(|end| end >= height) { future_blocks.insert(height, (block, hash)); } None }; while let Some((block, hash)) = opt.take().or_else(|| { if !future_blocks.is_empty() { future_blocks.remove(¤t_height) } else { None } }) { send_height_block_hash.send((current_height, block, hash)).unwrap(); if end == Some(current_height) { return ControlFlow::Break(()); } current_height.increment(); } ControlFlow::Continue(()) }); blk_index_to_blk_recap.export(); }); recv_height_block_hash } }