parser: fixed hanging + global: snapshot

This commit is contained in:
nym21
2025-02-23 21:53:39 +01:00
parent 19cf34f9d4
commit 8acbcc548c
30 changed files with 372 additions and 383 deletions

View File

@@ -1,10 +1,13 @@
use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::Path, thread};
use bitcoin::{
Block, BlockHash,
consensus::{Decodable, ReadExt},
io::{Cursor, Read},
use std::{
cmp::Ordering,
collections::BTreeMap,
fs::{self},
ops::ControlFlow,
path::{Path, PathBuf},
thread,
};
use bitcoin::{Block, BlockHash};
use bitcoincore_rpc::RpcApi;
use blk_index_to_blk_path::*;
use blk_recap::BlkRecap;
@@ -17,248 +20,237 @@ pub use bitcoincore_rpc as rpc;
mod blk_index_to_blk_path;
mod blk_index_to_blk_recap;
mod blk_metadata;
mod blk_metadata_and_block;
mod blk_recap;
mod block_state;
mod error;
mod height;
mod utils;
mod xor;
mod xor_bytes;
mod xor_index;
use blk_index_to_blk_recap::*;
use blk_metadata::*;
use blk_metadata_and_block::*;
use block_state::*;
pub use error::*;
pub use height::*;
use utils::*;
use xor::*;
use xor_bytes::*;
use xor_index::*;
pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000;
const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217];
const BOUND_CAP: usize = 100;
const BOUND_CAP: usize = 50;
///
/// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`)
///
/// For an example checkout `iterator/main.rs`
///
pub fn new(
data_dir: &Path,
start: Option<Height>,
end: Option<Height>,
pub struct Parser {
data_dir: PathBuf,
rpc: &'static bitcoincore_rpc::Client,
) -> Receiver<(Height, Block, BlockHash)> {
let (send_block_reader, recv_block_reader) = bounded(5);
let (send_block_xor, recv_block_xor) = bounded(BOUND_CAP);
let (send_block, recv_block) = bounded(BOUND_CAP);
let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP);
}
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(data_dir);
impl Parser {
pub fn new(data_dir: &Path, rpc: &'static bitcoincore_rpc::Client) -> Self {
Self {
data_dir: data_dir.to_owned(),
rpc,
}
}
let (mut blk_index_to_blk_recap, blk_index) = BlkIndexToBlkRecap::import(data_dir, &blk_index_to_blk_path, start);
///
/// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`)
///
/// For an example checkout `./main.rs`
///
pub fn parse(&self, start: Option<Height>, end: Option<Height>) -> Receiver<(Height, Block, BlockHash)> {
let data_dir = self.data_dir.as_path();
let rpc = self.rpc;
let xor = XOR::from(data_dir);
let (send_bytes, recv_bytes) = bounded(BOUND_CAP);
let (send_block, recv_block) = bounded(BOUND_CAP);
let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP);
thread::spawn(move || {
blk_index_to_blk_path
.range(blk_index..)
.try_for_each(move |(blk_index, blk_path)| {
let blk_index = *blk_index;
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(data_dir);
let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path());
let (mut blk_index_to_blk_recap, blk_index) =
BlkIndexToBlkRecap::import(data_dir, &blk_index_to_blk_path, start);
let blk_bytes = fs::read(blk_path).unwrap();
let xor_bytes = XORBytes::from(data_dir);
let res = send_block_reader.send((blk_metadata, blk_bytes));
if let Err(e) = res {
dbg!(e);
return ControlFlow::Break(());
}
thread::spawn(move || {
let xor_bytes = xor_bytes;
ControlFlow::Continue(())
});
});
blk_index_to_blk_path
.range(blk_index..)
.try_for_each(move |(blk_index, blk_path)| {
let mut xor_i = XORIndex::default();
thread::spawn(move || {
recv_block_reader
.iter()
.try_for_each(|(blk_metadata, blk_bytes)| -> ControlFlow<(), _> {
let blk_bytes = xor.process(blk_bytes);
let blk_index = *blk_index;
let blk_bytes_len = blk_bytes.len() as u64;
let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path());
let mut cursor = Cursor::new(blk_bytes);
let mut blk_bytes_ = fs::read(blk_path).unwrap();
let blk_bytes = blk_bytes_.as_mut_slice();
let blk_bytes_len = blk_bytes.len();
let mut current_4bytes = [0; 4];
let mut current_4bytes = [0; 4];
'parent: loop {
if cursor.position() == blk_bytes_len {
break;
}
let mut i = 0;
// Read until we find a valid suite of MAGIC_BYTES
loop {
current_4bytes.rotate_left(1);
'parent: loop {
loop {
if i == blk_bytes_len {
break 'parent;
}
if let Ok(byte) = cursor.read_u8() {
current_4bytes[3] = byte;
} else {
break 'parent;
current_4bytes.rotate_left(1);
current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes);
i += 1;
if current_4bytes == MAGIC_BYTES {
break;
}
}
if current_4bytes == MAGIC_BYTES {
break;
let len =
u32::from_le_bytes(xor_i.bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes).try_into().unwrap())
as usize;
i += 4;
let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
if send_bytes
.send((blk_metadata, BlockState::Raw(block_bytes), xor_i))
.is_err()
{
return ControlFlow::Break(());
}
i += len;
xor_i.add_assign(len);
}
let len = cursor.read_u32().unwrap();
ControlFlow::Continue(())
});
});
let mut bytes = vec![0u8; len as usize];
thread::spawn(move || {
let xor_bytes = xor_bytes;
cursor.read_exact(&mut bytes).unwrap();
let mut bulk = vec![];
if send_block_xor.send((blk_metadata, BlockState::Raw(bytes))).is_err() {
let drain_and_send = |bulk: &mut Vec<_>| {
// Using a vec and sending after to not end up with stuck threads in par iter
bulk.par_iter_mut().for_each(|(_, block_state, xor_i)| {
BlockState::decode(block_state, xor_i, &xor_bytes);
});
bulk.drain(..).try_for_each(|(blk_metadata, block_state, _)| {
let block = match block_state {
BlockState::Decoded(block) => block,
_ => unreachable!(),
};
if send_block.send((blk_metadata, block)).is_err() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
})
};
recv_bytes.iter().try_for_each(|tuple| {
bulk.push(tuple);
if bulk.len() < BOUND_CAP / 2 {
return ControlFlow::Continue(());
}
ControlFlow::Continue(())
});
});
thread::spawn(move || {
let mut bulk = vec![];
let drain_and_send = |bulk: &mut Vec<_>| {
// Using a vec and sending after to not end up with stuck threads in par iter
bulk.par_iter_mut().for_each(|(_, block_state)| {
BlockState::decode(block_state);
// Sending in bulk to not lock threads in standby
drain_and_send(&mut bulk)
});
bulk.drain(..).try_for_each(|(blk_metadata, block_state)| {
let block = match block_state {
BlockState::Decoded(block) => block,
_ => unreachable!(),
};
if send_block.send(BlkIndexAndBlock::new(blk_metadata, block)).is_err() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
})
};
recv_block_xor.iter().try_for_each(|tuple| {
bulk.push(tuple);
if bulk.len() < BOUND_CAP / 2 {
return ControlFlow::Continue(());
}
// Sending in bulk to not lock threads in standby
drain_and_send(&mut bulk)
});
drain_and_send(&mut bulk)
});
thread::spawn(move || {
let mut current_height = start.unwrap_or_default();
thread::spawn(move || {
let mut current_height = start.unwrap_or_default();
let mut future_blocks = BTreeMap::default();
let mut future_blocks = BTreeMap::default();
recv_block
.iter()
.try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> {
let hash = block.block_hash();
let header = rpc.get_block_header_info(&hash);
recv_block.iter().try_for_each(|tuple| -> ControlFlow<(), _> {
let blk_metadata = tuple.blk_metadata;
let block = tuple.block;
let hash = block.block_hash();
let header = rpc.get_block_header_info(&hash);
if header.is_err() {
return ControlFlow::Continue(());
}
let header = header.unwrap();
if header.confirmations <= 0 {
return ControlFlow::Continue(());
}
let height = Height::from(header.height);
let len = blk_index_to_blk_recap.tree.len();
if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 {
match (len as u16).cmp(&blk_metadata.index) {
Ordering::Equal => {
if len % 21 == 0 {
blk_index_to_blk_recap.export();
}
if header.is_err() {
return ControlFlow::Continue(());
}
let header = header.unwrap();
if header.confirmations <= 0 {
return ControlFlow::Continue(());
}
Ordering::Less => panic!(),
Ordering::Greater => {}
}
blk_index_to_blk_recap
.tree
.entry(blk_metadata.index)
.and_modify(|recap| {
if recap.max_height < height {
recap.max_height = height;
let height = Height::from(header.height);
// println!("{height}");
let len = blk_index_to_blk_recap.tree.len();
if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 {
match (len as u16).cmp(&blk_metadata.index) {
Ordering::Equal => {
if len % 21 == 0 {
blk_index_to_blk_recap.export();
}
}
Ordering::Less => panic!(),
Ordering::Greater => {}
}
})
.or_insert(BlkRecap {
max_height: height,
modified_time: blk_metadata.modified_time,
});
}
let mut opt = if current_height == height {
Some((block, hash))
} else {
if start.is_none_or(|start| start <= height) && end.is_none_or(|end| end >= height) {
future_blocks.insert(height, (block, hash));
}
None
};
blk_index_to_blk_recap
.tree
.entry(blk_metadata.index)
.and_modify(|recap| {
if recap.max_height < height {
recap.max_height = height;
}
})
.or_insert(BlkRecap {
max_height: height,
modified_time: blk_metadata.modified_time,
});
}
while let Some((block, hash)) = opt.take().or_else(|| {
if !future_blocks.is_empty() {
future_blocks.remove(&current_height)
} else {
None
}
}) {
send_height_block_hash.send((current_height, block, hash)).unwrap();
let mut opt = if current_height == height {
Some((block, hash))
} else {
if start.is_none_or(|start| start <= height) && end.is_none_or(|end| end >= height) {
future_blocks.insert(height, (block, hash));
}
None
};
if end == Some(current_height) {
return ControlFlow::Break(());
}
while let Some((block, hash)) = opt.take().or_else(|| {
if !future_blocks.is_empty() {
future_blocks.remove(&current_height)
} else {
None
}
}) {
send_height_block_hash.send((current_height, block, hash)).unwrap();
current_height.increment();
}
if end == Some(current_height) {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
current_height.increment();
}
ControlFlow::Continue(())
});
blk_index_to_blk_recap.export();
});
blk_index_to_blk_recap.export();
});
recv_height_block_hash
}
enum BlockState {
Raw(Vec<u8>),
Decoded(Block),
}
impl BlockState {
pub fn decode(&mut self) {
let bytes = match self {
BlockState::Raw(bytes) => bytes,
_ => unreachable!(),
};
let mut cursor = Cursor::new(bytes);
let block = Block::consensus_decode(&mut cursor).unwrap();
*self = BlockState::Decoded(block);
recv_height_block_hash
}
}