mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-06-30 22:09:00 -07:00
computer: add positions
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor};
|
||||
use bitcoincore_rpc::RpcApi;
|
||||
use brk_error::Result;
|
||||
use brk_structs::{Block, BlockPosition, Height, ParsedBlock};
|
||||
use brk_structs::{BlkPosition, Block, Height, ParsedBlock};
|
||||
|
||||
use crate::{XORBytes, XORIndex};
|
||||
|
||||
@@ -14,10 +14,10 @@ pub enum AnyBlock {
|
||||
impl AnyBlock {
|
||||
pub fn decode(
|
||||
self,
|
||||
position: BlockPosition,
|
||||
position: BlkPosition,
|
||||
rpc: &'static bitcoincore_rpc::Client,
|
||||
mut xor_i: XORIndex,
|
||||
xor_bytes: &XORBytes,
|
||||
xor_bytes: XORBytes,
|
||||
start: Option<Height>,
|
||||
end: Option<Height>,
|
||||
) -> Result<Self> {
|
||||
@@ -55,14 +55,17 @@ impl AnyBlock {
|
||||
}
|
||||
|
||||
let mut txdata = Vec::with_capacity(tx_count as usize);
|
||||
let mut tx_positions = Vec::with_capacity(tx_count as usize);
|
||||
for _ in 0..tx_count {
|
||||
let tx_position = BlkPosition::new(position.blk_index(), cursor.position() as u32);
|
||||
tx_positions.push(tx_position);
|
||||
let tx = Transaction::consensus_decode(&mut cursor)?;
|
||||
txdata.push(tx);
|
||||
}
|
||||
|
||||
let block = bitcoin::Block { header, txdata };
|
||||
let block = Block::from((height, hash, block));
|
||||
let block = ParsedBlock::from((block, position));
|
||||
let block = ParsedBlock::from((block, position, tx_positions));
|
||||
|
||||
Ok(Self::Decoded(block))
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use derive_deref::{Deref, DerefMut};
|
||||
const BLK: &str = "blk";
|
||||
const DOT_DAT: &str = ".dat";
|
||||
|
||||
#[derive(Debug, Deref, DerefMut)]
|
||||
#[derive(Debug, Clone, Deref, DerefMut)]
|
||||
pub struct BlkIndexToBlkPath(BTreeMap<u16, PathBuf>);
|
||||
|
||||
impl BlkIndexToBlkPath {
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{
|
||||
mem,
|
||||
ops::ControlFlow,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
thread,
|
||||
};
|
||||
|
||||
@@ -14,8 +15,9 @@ use bitcoin::{block::Header, consensus::Decodable};
|
||||
use bitcoincore_rpc::RpcApi;
|
||||
use blk_index_to_blk_path::*;
|
||||
use brk_error::Result;
|
||||
use brk_structs::{Block, BlockPosition, Height, ParsedBlock};
|
||||
use brk_structs::{BlkPosition, Block, Height, ParsedBlock};
|
||||
use crossbeam::channel::{Receiver, bounded};
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use rayon::prelude::*;
|
||||
|
||||
mod any_block;
|
||||
@@ -24,22 +26,40 @@ mod xor_bytes;
|
||||
mod xor_index;
|
||||
|
||||
use any_block::*;
|
||||
use xor_bytes::*;
|
||||
use xor_index::*;
|
||||
pub use xor_bytes::*;
|
||||
pub use xor_index::*;
|
||||
|
||||
pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 100;
|
||||
|
||||
const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217];
|
||||
const BOUND_CAP: usize = 50;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Parser {
|
||||
blk_index_to_blk_path: Arc<RwLock<BlkIndexToBlkPath>>,
|
||||
xor_bytes: XORBytes,
|
||||
blocks_dir: PathBuf,
|
||||
rpc: &'static bitcoincore_rpc::Client,
|
||||
}
|
||||
|
||||
impl Parser {
|
||||
pub fn new(blocks_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client) -> Self {
|
||||
Self { blocks_dir, rpc }
|
||||
Self {
|
||||
xor_bytes: XORBytes::from(blocks_dir.as_path()),
|
||||
blk_index_to_blk_path: Arc::new(RwLock::new(BlkIndexToBlkPath::scan(
|
||||
blocks_dir.as_path(),
|
||||
))),
|
||||
blocks_dir,
|
||||
rpc,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn blk_index_to_blk_path(&self) -> RwLockReadGuard<'_, BlkIndexToBlkPath> {
|
||||
self.blk_index_to_blk_path.read()
|
||||
}
|
||||
|
||||
pub fn xor_bytes(&self) -> XORBytes {
|
||||
self.xor_bytes
|
||||
}
|
||||
|
||||
pub fn get(&self, height: Height) -> Result<Block> {
|
||||
@@ -57,19 +77,19 @@ impl Parser {
|
||||
/// For an example checkout `./main.rs`
|
||||
///
|
||||
pub fn parse(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ParsedBlock> {
|
||||
let blocks_dir = self.blocks_dir.as_path();
|
||||
let rpc = self.rpc;
|
||||
|
||||
let (send_bytes, recv_bytes) = bounded(BOUND_CAP);
|
||||
let (send_block, recv_block) = bounded(BOUND_CAP);
|
||||
let (send_ordered, recv_ordered) = bounded(BOUND_CAP);
|
||||
|
||||
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(blocks_dir);
|
||||
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(&self.blocks_dir);
|
||||
*self.blk_index_to_blk_path.write() = blk_index_to_blk_path.clone();
|
||||
|
||||
let xor_bytes = XORBytes::from(blocks_dir);
|
||||
let xor_bytes = self.xor_bytes;
|
||||
|
||||
let first_blk_index = self
|
||||
.find_start_blk_index(start, &blk_index_to_blk_path, &xor_bytes)
|
||||
.find_start_blk_index(start, &blk_index_to_blk_path, xor_bytes)
|
||||
.unwrap_or_default();
|
||||
|
||||
thread::spawn(move || {
|
||||
@@ -95,7 +115,7 @@ impl Parser {
|
||||
|
||||
current_4bytes.rotate_left(1);
|
||||
|
||||
current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes);
|
||||
current_4bytes[3] = xor_i.byte(blk_bytes[i], xor_bytes);
|
||||
i += 1;
|
||||
|
||||
if current_4bytes == MAGIC_BYTES {
|
||||
@@ -103,27 +123,20 @@ impl Parser {
|
||||
}
|
||||
}
|
||||
|
||||
let deser_len = u32::from_le_bytes(
|
||||
let position = BlkPosition::new(blk_index, i as u32);
|
||||
|
||||
let len = u32::from_le_bytes(
|
||||
xor_i
|
||||
.bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes)
|
||||
.bytes(&mut blk_bytes[i..(i + 4)], xor_bytes)
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
);
|
||||
let len = deser_len as usize;
|
||||
) as usize;
|
||||
i += 4;
|
||||
|
||||
let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
|
||||
|
||||
if send_bytes
|
||||
.send((
|
||||
BlockPosition {
|
||||
blk_index,
|
||||
offset: i,
|
||||
len: deser_len,
|
||||
},
|
||||
AnyBlock::Raw(block_bytes),
|
||||
xor_i,
|
||||
))
|
||||
.send((position, AnyBlock::Raw(block_bytes), xor_i))
|
||||
.is_err()
|
||||
{
|
||||
return ControlFlow::Break(());
|
||||
@@ -143,13 +156,13 @@ impl Parser {
|
||||
|
||||
let mut bulk = vec![];
|
||||
|
||||
let drain_and_send = |bulk: &mut Vec<(BlockPosition, AnyBlock, XORIndex)>| {
|
||||
let drain_and_send = |bulk: &mut Vec<(BlkPosition, AnyBlock, XORIndex)>| {
|
||||
// Using a vec and sending after to not end up with stuck threads in par iter
|
||||
mem::take(bulk)
|
||||
.into_par_iter()
|
||||
.try_for_each(|(position, any_block, xor_i)| {
|
||||
if let Ok(AnyBlock::Decoded(block)) =
|
||||
any_block.decode(position, rpc, xor_i, &xor_bytes, start, end)
|
||||
any_block.decode(position, rpc, xor_i, xor_bytes, start, end)
|
||||
&& send_block.send(block).is_err()
|
||||
{
|
||||
return ControlFlow::Break(());
|
||||
@@ -211,7 +224,7 @@ impl Parser {
|
||||
&self,
|
||||
target_start: Option<Height>,
|
||||
blk_index_to_blk_path: &BlkIndexToBlkPath,
|
||||
xor_bytes: &XORBytes,
|
||||
xor_bytes: XORBytes,
|
||||
) -> Result<u16> {
|
||||
let Some(target_start) = target_start else {
|
||||
return Ok(0);
|
||||
@@ -280,7 +293,7 @@ impl Parser {
|
||||
Ok(blk_indices.get(final_idx).copied().unwrap_or(0))
|
||||
}
|
||||
|
||||
fn get_first_block_height(&self, blk_path: &PathBuf, xor_bytes: &XORBytes) -> Result<Height> {
|
||||
fn get_first_block_height(&self, blk_path: &PathBuf, xor_bytes: XORBytes) -> Result<Height> {
|
||||
let mut file = File::open(blk_path)?;
|
||||
let mut xor_i = XORIndex::default();
|
||||
let mut current_4bytes = [0; 4];
|
||||
@@ -314,4 +327,8 @@ impl Parser {
|
||||
|
||||
Ok(Height::new(height))
|
||||
}
|
||||
|
||||
pub fn static_clone(&self) -> &'static Self {
|
||||
Box::leak(Box::new(self.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::xor_bytes::{XOR_LEN, XORBytes};
|
||||
pub struct XORIndex(usize);
|
||||
|
||||
impl XORIndex {
|
||||
pub fn bytes<'a>(&mut self, bytes: &'a mut [u8], xor_bytes: &XORBytes) -> &'a mut [u8] {
|
||||
pub fn bytes<'a>(&mut self, bytes: &'a mut [u8], xor_bytes: XORBytes) -> &'a mut [u8] {
|
||||
let len = bytes.len();
|
||||
let mut bytes_index = 0;
|
||||
|
||||
@@ -18,7 +18,7 @@ impl XORIndex {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn byte(&mut self, mut byte: u8, xor_bytes: &XORBytes) -> u8 {
|
||||
pub fn byte(&mut self, mut byte: u8, xor_bytes: XORBytes) -> u8 {
|
||||
byte ^= xor_bytes[self.0];
|
||||
self.increment();
|
||||
byte
|
||||
|
||||
Reference in New Issue
Block a user