From 8c3f5190167d82c7ef6e21336558b4152bc14265 Mon Sep 17 00:00:00 2001 From: nym21 Date: Sat, 22 Feb 2025 14:06:43 +0100 Subject: [PATCH] iterator: add xor support --- Cargo.lock | 10 +++--- Cargo.toml | 4 +-- iterator/src/blk_index_to_blk_recap.rs | 6 ++-- iterator/src/lib.rs | 48 ++++++++++++++++---------- iterator/src/main.rs | 2 +- iterator/src/xor.rs | 40 +++++++++++++++++++++ storable_vec/src/lib.rs | 2 +- 7 files changed, 82 insertions(+), 30 deletions(-) create mode 100644 iterator/src/xor.rs diff --git a/Cargo.lock b/Cargo.lock index 6222c5d05..e7ab13591 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -758,9 +758,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "fjall" -version = "2.6.4" +version = "2.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a7107ac1ad7c38b8a32c8bfe579bdb24bb1373b80c8e0b168f36b092647f917" +checksum = "33e0fd3f3a8cbaa2b179ccc690ea0d37282d24787c06eab0dfd9137e1c4d4699" dependencies = [ "byteorder", "byteview", @@ -780,7 +780,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", - "miniz_oxide 0.8.4", + "miniz_oxide 0.8.5", ] [[package]] @@ -1195,9 +1195,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3b1c9bd4fe1f0f8b387f6eb9eb3b4a1aa26185e5750efb9140301703f62cd1b" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" dependencies = [ "adler2", ] diff --git a/Cargo.toml b/Cargo.toml index c550b953e..ef5ae6f42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,14 +12,14 @@ members = [ ] resolver = "2" package.license = "MIT" -package.edition = "2021" +package.edition = "2024" [workspace.dependencies] bitcoin = { version = "0.32.5", features = ["serde"] } color-eyre = "0.6.3" computer = { version = "0", path = "computer", package = "bomputer" } derive_deref = "1.1.1" -fjall = "2.6.4" +fjall = "2.6.5" hodor = { version = "0", path = "hodor" } indexer = { version = "0", path = "indexer", package = "bindexer" } iterator = { version = "0", path = "iterator", package = "biterator", features = [ diff --git a/iterator/src/blk_index_to_blk_recap.rs b/iterator/src/blk_index_to_blk_recap.rs index df20ae43c..4d9735d2c 100644 --- a/iterator/src/blk_index_to_blk_recap.rs +++ b/iterator/src/blk_index_to_blk_recap.rs @@ -5,7 +5,7 @@ use std::{ path::{Path, PathBuf}, }; -use crate::{blk_recap::BlkRecap, BlkIndexToBlkPath, Height}; +use crate::{BlkIndexToBlkPath, Height, blk_recap::BlkRecap}; #[derive(Debug)] pub struct BlkIndexToBlkRecap { @@ -45,7 +45,7 @@ impl BlkIndexToBlkRecap { if let Some(blk_recap) = self.tree.get(blk_index) { if blk_recap.has_different_modified_time(blk_path) { self.tree.remove(blk_index).unwrap(); - if min_removed_blk_index.map_or(true, |_blk_index| *blk_index < _blk_index) { + if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) { min_removed_blk_index.replace(*blk_index); } } @@ -54,7 +54,7 @@ impl BlkIndexToBlkRecap { unprocessed_keys.into_iter().for_each(|blk_index| { self.tree.remove(&blk_index).unwrap(); - if min_removed_blk_index.map_or(true, |_blk_index| blk_index < _blk_index) { + if min_removed_blk_index.is_none_or(|_blk_index| blk_index < _blk_index) { min_removed_blk_index.replace(blk_index); } }); diff --git a/iterator/src/lib.rs b/iterator/src/lib.rs index ac303f0c2..e1bf31489 100644 --- a/iterator/src/lib.rs +++ b/iterator/src/lib.rs @@ -1,21 +1,14 @@ -use std::{ - cmp::Ordering, - collections::BTreeMap, - fs::{self}, - ops::ControlFlow, - path::Path, - thread, -}; +use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::Path, thread}; use bitcoin::{ + Block, BlockHash, consensus::{Decodable, ReadExt}, io::{Cursor, Read}, - Block, BlockHash, }; use bitcoincore_rpc::RpcApi; use blk_index_to_blk_path::*; use blk_recap::BlkRecap; -use crossbeam::channel::{bounded, Receiver}; +use crossbeam::channel::{Receiver, bounded}; use rayon::prelude::*; pub use bitcoin; @@ -29,6 +22,7 @@ mod blk_recap; mod error; mod height; mod utils; +mod xor; use blk_index_to_blk_recap::*; use blk_metadata::*; @@ -36,6 +30,7 @@ use blk_metadata_and_block::*; pub use error::*; pub use height::*; use utils::*; +use xor::*; pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000; const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217]; @@ -52,7 +47,8 @@ pub fn new( end: Option, rpc: &'static bitcoincore_rpc::Client, ) -> Receiver<(Height, Block, BlockHash)> { - let (send_block_reader, recv_block_reader) = bounded(BOUND_CAP); + let (send_block_reader, recv_block_reader) = bounded(5); + let (send_block_xor, recv_block_xor) = bounded(BOUND_CAP); let (send_block, recv_block) = bounded(BOUND_CAP); let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP); @@ -60,6 +56,8 @@ pub fn new( let (mut blk_index_to_blk_recap, blk_index) = BlkIndexToBlkRecap::import(data_dir, &blk_index_to_blk_path, start); + let xor = XOR::from(data_dir); + thread::spawn(move || { blk_index_to_blk_path .range(blk_index..) @@ -69,9 +67,26 @@ pub fn new( let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path()); let blk_bytes = fs::read(blk_path).unwrap(); + + let res = send_block_reader.send((blk_metadata, blk_bytes)); + if let Err(e) = res { + dbg!(e); + return ControlFlow::Break(()); + } + + ControlFlow::Continue(()) + }); + }); + + thread::spawn(move || { + recv_block_reader + .iter() + .try_for_each(|(blk_metadata, blk_bytes)| -> ControlFlow<(), _> { + let blk_bytes = xor.process(blk_bytes); + let blk_bytes_len = blk_bytes.len() as u64; - let mut cursor = Cursor::new(blk_bytes.as_slice()); + let mut cursor = Cursor::new(blk_bytes); let mut current_4bytes = [0; 4]; @@ -101,7 +116,7 @@ pub fn new( cursor.read_exact(&mut bytes).unwrap(); - if send_block_reader.send((blk_metadata, BlockState::Raw(bytes))).is_err() { + if send_block_xor.send((blk_metadata, BlockState::Raw(bytes))).is_err() { return ControlFlow::Break(()); } } @@ -133,7 +148,7 @@ pub fn new( }) }; - recv_block_reader.iter().try_for_each(|tuple| { + recv_block_xor.iter().try_for_each(|tuple| { bulk.push(tuple); if bulk.len() < BOUND_CAP / 2 { @@ -192,15 +207,12 @@ pub fn new( max_height: height, modified_time: blk_metadata.modified_time, }); - } else { - dbg!(blk_metadata.index, len); - panic!() } let mut opt = if current_height == height { Some((block, hash)) } else { - if start.map_or(true, |start| start <= height) && end.map_or(true, |end| end >= height) { + if start.is_none_or(|start| start <= height) && end.is_none_or(|end| end >= height) { future_blocks.insert(height, (block, hash)); } None diff --git a/iterator/src/main.rs b/iterator/src/main.rs index d055ff53b..4341a714d 100644 --- a/iterator/src/main.rs +++ b/iterator/src/main.rs @@ -15,7 +15,7 @@ fn main() { )); let start = None; - let end = None; //Some(200_000_u32.into()); + let end = None; biterator::new(data_dir, start, end, rpc) .iter() diff --git a/iterator/src/xor.rs b/iterator/src/xor.rs new file mode 100644 index 000000000..54636283b --- /dev/null +++ b/iterator/src/xor.rs @@ -0,0 +1,40 @@ +use std::{fs, path::Path}; + +const XOR_LEN: usize = 8; + +#[derive(Debug, PartialEq, Eq, Default)] +pub struct XOR([u8; XOR_LEN]); + +impl XOR { + pub fn process(&self, mut bytes: Vec) -> Vec { + if u64::from_ne_bytes(self.0) == 0 { + return bytes; + } + + let len = bytes.len(); + let mut bytes_index = 0; + let mut xor_index = 0; + + while bytes_index < len { + bytes[bytes_index] ^= self.0[xor_index]; + bytes_index += 1; + xor_index += 1; + if xor_index == XOR_LEN { + xor_index = 0; + } + } + + bytes + } +} + +impl From<&Path> for XOR { + fn from(value: &Path) -> Self { + Self( + fs::read(value.join("blocks/xor.dat")) + .unwrap_or(vec![0; 8]) + .try_into() + .unwrap(), + ) + } +} diff --git a/storable_vec/src/lib.rs b/storable_vec/src/lib.rs index 51f8da646..184a31939 100644 --- a/storable_vec/src/lib.rs +++ b/storable_vec/src/lib.rs @@ -447,7 +447,7 @@ where let pushed_len = I::from(self.pushed_len()); let disk_len = Self::i_to_usize(disk_len)?; while index < pushed_len { - f(((index + disk_len), self.get(index)?.map(Value::from).unwrap()))?; + f(((index + disk_len), self.get(index)?.unwrap()))?; index = index + 1; }