diff --git a/Cargo.lock b/Cargo.lock index d764e42e5..536fa3394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -322,13 +322,17 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" +[[package]] +name = "brk" +version = "0.0.0" + [[package]] name = "brk_cli" -version = "0.1.0" +version = "0.0.0" [[package]] name = "brk_computer" -version = "0.1.0" +version = "0.0.0" dependencies = [ "brk_fetcher", "brk_indexer", @@ -343,10 +347,10 @@ dependencies = [ [[package]] name = "brk_fetcher" -version = "0.1.0" +version = "0.0.0" dependencies = [ "brk_indexer", - "brk_printer", + "brk_logger", "color-eyre", "derive_deref", "jiff", @@ -360,11 +364,11 @@ dependencies = [ [[package]] name = "brk_indexer" -version = "0.1.0" +version = "0.0.0" dependencies = [ "bitcoin", + "brk_logger", "brk_parser", - "brk_printer", "color-eyre", "derive_deref", "fjall", @@ -380,9 +384,19 @@ dependencies = [ "zerocopy 0.8.20", ] +[[package]] +name = "brk_logger" +version = "0.0.0" +dependencies = [ + "color-eyre", + "env_logger", + "jiff", + "log", +] + [[package]] name = "brk_parser" -version = "0.2.3" +version = "0.0.0" dependencies = [ "bitcoin", "bitcoincore-rpc", @@ -395,24 +409,14 @@ dependencies = [ "zerocopy 0.8.20", ] -[[package]] -name = "brk_printer" -version = "0.1.0" -dependencies = [ - "color-eyre", - "env_logger", - "jiff", - "log", -] - [[package]] name = "brk_server" -version = "0.1.0" +version = "0.0.0" dependencies = [ "axum", "brk_computer", "brk_indexer", - "brk_printer", + "brk_logger", "color-eyre", "derive_deref", "jiff", @@ -1104,9 +1108,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.169" +version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" [[package]] name = "linux-raw-sys" @@ -1306,9 +1310,9 @@ checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" [[package]] name = "owo-colors" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb37767f6569cd834a413442455e0f066d0d522de8630436e2a1761d9726ba56" +checksum = "1036865bb9422d3300cf723f657c2851d0e9ab12567854b1f4eba3d77decf564" [[package]] name = "oxc" @@ -1335,7 +1339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e03e63fd113c068b82d07c9c614b0b146c08a3ac0a4dface3ea1d1a9d14d549e" dependencies = [ "cfg-if", - "owo-colors 4.1.0", + "owo-colors 4.2.0", "oxc-miette-derive", "textwrap", "thiserror", @@ -1909,9 +1913,9 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "ring" -version = "0.17.10" +version = "0.17.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34b5020fcdea098ef7d95e9f89ec15952123a4a039badd09fabebe9e963e839" +checksum = "da5349ae27d3887ca812fb375b45a4fbb36d8d12d2df394968cd86e35683fe73" dependencies = [ "cc", "cfg-if", diff --git a/Cargo.toml b/Cargo.toml index eff4598be..bb50967a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = ["crates/*"] resolver = "2" package.license = "MIT" package.edition = "2024" +package.version = "0.0.0" [workspace.dependencies] bitcoin = { version = "0.32.5", features = ["serde"] } @@ -10,7 +11,7 @@ brk_computer = { version = "0", path = "crates/brk_computer" } brk_fetcher = { version = "0", path = "crates/brk_fetcher" } brk_indexer = { version = "0", path = "crates/brk_indexer" } brk_parser = { version = "0", path = "crates/brk_parser", features = ["bytes"] } -brk_printer = { version = "0", path = "crates/brk_printer" } +brk_logger = { version = "0", path = "crates/brk_logger" } brk_server = { version = "0", path = "crates/brk_server" } color-eyre = "0.6.3" derive_deref = "1.1.1" diff --git a/crates/brk/Cargo.toml b/crates/brk/Cargo.toml new file mode 100644 index 000000000..fbcc31153 --- /dev/null +++ b/crates/brk/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "brk" +license.workspace = true +edition.workspace = true +version.workspace = true + +[dependencies] diff --git a/crates/brk/src/main.rs b/crates/brk/src/main.rs new file mode 100644 index 000000000..e7a11a969 --- /dev/null +++ b/crates/brk/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/crates/brk_cli/Cargo.toml b/crates/brk_cli/Cargo.toml index 4e38c18b1..640de0db8 100644 --- a/crates/brk_cli/Cargo.toml +++ b/crates/brk_cli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "brk_cli" description = "A command line interface to run berver" -version = "0.1.0" +version = { workspace = true } edition = { workspace = true } license = { workspace = true } diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index 760fe6fbf..733032898 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "brk_computer" description = "A Bitcoin dataset computer built on top of brk_indexer and brk_fetcher" -version = "0.1.0" +version = { workspace = true } edition = { workspace = true } license = { workspace = true } diff --git a/crates/brk_fetcher/Cargo.toml b/crates/brk_fetcher/Cargo.toml index 33a9595e8..b060434f5 100644 --- a/crates/brk_fetcher/Cargo.toml +++ b/crates/brk_fetcher/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "brk_fetcher" description = "A bitcoin price fetcher built on top of brk_indexer" -version = "0.1.0" +version = { workspace = true } edition = { workspace = true } license = { workspace = true } [dependencies] brk_indexer = { workspace = true } -brk_printer = { workspace = true } +brk_logger = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } jiff = { workspace = true } diff --git a/crates/brk_fetcher/src/main.rs b/crates/brk_fetcher/src/main.rs index 2c057cdae..fd589f72a 100644 --- a/crates/brk_fetcher/src/main.rs +++ b/crates/brk_fetcher/src/main.rs @@ -5,7 +5,7 @@ use serde_json::Value; fn main() -> color_eyre::Result<()> { color_eyre::install()?; - brk_printer::init_log(None); + brk_logger::init(None); dbg!(Binance::fetch_1d()?); // dbg!(Binance::fetch_1mn_prices()); diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index 547518d74..e6526f818 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -1,14 +1,14 @@ [package] name = "brk_indexer" description = "A bitcoin-core indexer built on top of brk_parser" -version = "0.1.0" +version = { workspace = true } edition = { workspace = true } license = { workspace = true } [dependencies] bitcoin = { workspace = true } brk_parser = { workspace = true } -brk_printer = { workspace = true } +brk_logger = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } fjall = { workspace = true } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 901226e81..4a3258135 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -26,7 +26,7 @@ const SNAPSHOT_BLOCK_RANGE: usize = 1000; pub struct Indexer { pub vecs: StorableVecs, - pub trees: Fjalls, + pub stores: Fjalls, } impl Indexer { @@ -40,9 +40,9 @@ impl Indexer { info!("Importing indexes..."); let vecs = StorableVecs::import(&indexes_dir.join("vecs"))?; - let trees = Fjalls::import(&indexes_dir.join("fjall"))?; + let stores = Fjalls::import(&indexes_dir.join("fjall"))?; - Ok(Self { vecs, trees }) + Ok(Self { vecs, stores }) } } @@ -52,33 +52,37 @@ impl Indexer { let check_collisions = true; - let starting_indexes = Indexes::try_from((&mut self.vecs, &self.trees, rpc)).unwrap_or_else(|_| { + let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc)).unwrap_or_else(|_| { let indexes = Indexes::default(); indexes.push_if_needed(&mut self.vecs).unwrap(); indexes }); exit.block(); - self.trees.rollback(&self.vecs, &starting_indexes)?; + self.stores.rollback(&self.vecs, &starting_indexes)?; self.vecs.rollback(&starting_indexes)?; exit.unblock(); let export = - |trees: &mut Fjalls, vecs: &mut StorableVecs, height: Height| -> color_eyre::Result<()> { + |stores: &mut Fjalls, vecs: &mut StorableVecs, height: Height| -> color_eyre::Result<()> { info!("Exporting..."); exit.block(); - trees.commit(height)?; + stores.commit(height)?; + info!("Exported stores"); vecs.flush(height)?; + info!("Exported vecs"); exit.unblock(); Ok(()) }; let vecs = &mut self.vecs; - let trees = &mut self.trees; + let stores = &mut self.stores; let mut idxs = starting_indexes; - brk_parser::new(bitcoin_dir, Some(idxs.height), None, rpc) + let parser = Parser::new(bitcoin_dir, rpc); + + parser.parse(Some(idxs.height), None) .iter() .try_for_each(|(height, block, blockhash)| -> color_eyre::Result<()> { info!("Indexing block {height}..."); @@ -88,7 +92,7 @@ impl Indexer { let blockhash = BlockHash::from(blockhash); let blockhash_prefix = BlockHashPrefix::from(&blockhash); - if trees + if stores .blockhash_prefix_to_height .get(&blockhash_prefix)? .is_some_and(|prev_height| *prev_height != height) @@ -97,7 +101,7 @@ impl Indexer { return Err(eyre!("Collision, expect prefix to need be set yet")); } - trees + stores .blockhash_prefix_to_height .insert_if_needed(blockhash_prefix, height, height); @@ -152,9 +156,9 @@ impl Indexer { let txid_prefix = TxidPrefix::from(&txid); let prev_txindex_opt = - if check_collisions && trees.txid_prefix_to_txindex.needs(height) { + if check_collisions && stores.txid_prefix_to_txindex.needs(height) { // Should only find collisions for two txids (duplicates), see below - trees.txid_prefix_to_txindex.get(&txid_prefix)?.map(|v| *v) + stores.txid_prefix_to_txindex.get(&txid_prefix)?.map(|v| *v) } else { None }; @@ -194,7 +198,7 @@ impl Indexer { return Ok((txinindex, InputSource::SameBlock((tx, txindex, txin, vin)))); } - let prev_txindex = if let Some(txindex) = trees + let prev_txindex = if let Some(txindex) = stores .txid_prefix_to_txindex .get(&TxidPrefix::from(&txid))? .map(|v| *v) @@ -272,7 +276,7 @@ impl Indexer { }); let addressindex_opt = addressbytes_res.as_ref().ok().and_then(|addressbytes| { - trees + stores .addresshash_to_addressindex .get(&AddressHash::from((addressbytes, addresstype))) .unwrap() @@ -304,7 +308,7 @@ impl Indexer { if (vecs.addressindex_to_addresstype.hasnt(addressindex)? && addresstype != prev_addresstype) - || (trees.addresshash_to_addressindex.needs(height) + || (stores.addresshash_to_addressindex.needs(height) && prev_addressbytes != addressbytes) { let txid = tx.compute_txid(); @@ -454,7 +458,7 @@ impl Indexer { already_added_addresshash .insert(addresshash, addressindex); - trees.addresshash_to_addressindex.insert_if_needed( + stores.addresshash_to_addressindex.insert_if_needed( addresshash, addressindex, height, @@ -541,7 +545,7 @@ impl Indexer { match prev_txindex_opt { None => { - trees + stores .txid_prefix_to_txindex .insert_if_needed(txid_prefix, txindex, height); } @@ -612,13 +616,13 @@ impl Indexer { let should_snapshot = height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 && !exit.blocked(); if should_snapshot { - export(trees, vecs, height)?; + export(stores, vecs, height)?; } Ok(()) })?; - export(trees, vecs, idxs.height)?; + export(stores, vecs, idxs.height)?; sleep(Duration::from_millis(100)); diff --git a/crates/brk_indexer/src/main.rs b/crates/brk_indexer/src/main.rs index fa7788c83..54b2b56f5 100644 --- a/crates/brk_indexer/src/main.rs +++ b/crates/brk_indexer/src/main.rs @@ -9,9 +9,9 @@ use storable_vec::CACHED_GETS; fn main() -> color_eyre::Result<()> { color_eyre::install()?; - brk_printer::init_log(None); + brk_logger::init(None); - let data_dir = Path::new("../../bitcoin"); + let data_dir = Path::new("../../../bitcoin"); let rpc = Box::leak(Box::new(rpc::Client::new( "http://localhost:8332", rpc::Auth::CookieFile(Path::new(data_dir).join(".cookie")), @@ -25,7 +25,7 @@ fn main() -> color_eyre::Result<()> { let i = std::time::Instant::now(); - let mut indexer: Indexer = Indexer::import(Path::new("../_outputs/indexes"))?; + let mut indexer: Indexer = Indexer::import(Path::new("../../_outputs/indexes"))?; indexer.index(data_dir, rpc, &exit)?; diff --git a/crates/brk_indexer/src/storage/fjalls/version.rs b/crates/brk_indexer/src/storage/fjalls/version.rs deleted file mode 100644 index 1fb2028ba..000000000 --- a/crates/brk_indexer/src/storage/fjalls/version.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::{fs, io, path::Path}; - -use derive_deref::Deref; -use fjall::Slice; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deref)] -pub struct Version(u32); - -impl Version { - pub fn write(&self, path: &Path) -> Result<(), io::Error> { - fs::write(path, self.to_ne_bytes()) - } -} - -impl From for Version { - fn from(value: u32) -> Self { - Self(value) - } -} - -impl TryFrom<&Path> for Version { - type Error = io::Error; - fn try_from(value: &Path) -> Result { - Self::try_from(&fs::read(value)?) - } -} -impl TryFrom for Version { - type Error = fjall::Error; - fn try_from(value: Slice) -> Result { - Self::from(&value) - } -} -impl TryFrom<&[u8]> for Version { - type Error = storable_vec::Error; - fn try_from(value: &[u8]) -> Result { - let mut buf: [u8; 4] = [0; 4]; - let buf_len = buf.len(); - if value.len() != buf_len { - panic!(); - } - value.iter().enumerate().for_each(|(i, r)| { - buf[i] = *r; - }); - Ok(Self(u32::from_ne_bytes(buf))) - } -} -impl From for Slice { - fn from(value: Version) -> Self { - Self::new(&value.to_ne_bytes()) - } -} diff --git a/crates/brk_indexer/src/storage/mod.rs b/crates/brk_indexer/src/storage/mod.rs index f25e0398a..15d757c6a 100644 --- a/crates/brk_indexer/src/storage/mod.rs +++ b/crates/brk_indexer/src/storage/mod.rs @@ -1,9 +1,5 @@ -// mod canopy; mod fjalls; -// mod sanakirja; mod storable_vecs; -// pub use canopy::*; pub use fjalls::*; -// pub use sanakirja::*; pub use storable_vecs::*; diff --git a/crates/brk_printer/Cargo.toml b/crates/brk_logger/Cargo.toml similarity index 81% rename from crates/brk_printer/Cargo.toml rename to crates/brk_logger/Cargo.toml index 5009a0d61..bbc78223a 100644 --- a/crates/brk_printer/Cargo.toml +++ b/crates/brk_logger/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "brk_printer" +name = "brk_logger" description = "A clean logger" -version = "0.1.0" +version = { workspace = true } edition = { workspace = true } license = { workspace = true } diff --git a/crates/brk_printer/src/lib.rs b/crates/brk_logger/src/lib.rs similarity index 96% rename from crates/brk_printer/src/lib.rs rename to crates/brk_logger/src/lib.rs index b56cdb27e..123503149 100644 --- a/crates/brk_printer/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -7,11 +7,11 @@ use std::{ use color_eyre::owo_colors::OwoColorize; use env_logger::{Builder, Env}; -use jiff::{tz, Timestamp}; +use jiff::{Timestamp, tz}; pub use log::{debug, error, info, trace, warn}; #[inline(always)] -pub fn init_log(path: Option<&Path>) { +pub fn init(path: Option<&Path>) { let file = path.map(|path| { let _ = fs::remove_file(path); OpenOptions::new().create(true).append(true).open(path).unwrap() diff --git a/crates/brk_printer/src/main.rs b/crates/brk_logger/src/main.rs similarity index 59% rename from crates/brk_printer/src/main.rs rename to crates/brk_logger/src/main.rs index 80e3c0af9..5414a6614 100644 --- a/crates/brk_printer/src/main.rs +++ b/crates/brk_logger/src/main.rs @@ -1,6 +1,6 @@ use log::info; fn main() { - brk_printer::init_log(None); + brk_logger::init(None); info!("test"); } diff --git a/crates/brk_parser/Cargo.toml b/crates/brk_parser/Cargo.toml index e80f382c3..fadde0dc1 100644 --- a/crates/brk_parser/Cargo.toml +++ b/crates/brk_parser/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "brk_parser" description = "A very fast Bitcoin block iterator built on top of bitcoin-rust" -version = "0.2.3" repository = "https://github.com/kibo-money/kibo/tree/main/src/crates/biter" keywords = ["bitcoin", "block", "iterator"] categories = ["cryptography::cryptocurrencies", "encoding"] +version = { workspace = true } edition = { workspace = true } license = { workspace = true } diff --git a/crates/brk_parser/src/blk_index_to_blk_recap.rs b/crates/brk_parser/src/blk_index_to_blk_recap.rs index 4d9735d2c..f95ef13ae 100644 --- a/crates/brk_parser/src/blk_index_to_blk_recap.rs +++ b/crates/brk_parser/src/blk_index_to_blk_recap.rs @@ -69,19 +69,19 @@ impl BlkIndexToBlkRecap { let height = start.unwrap(); - let mut start = 0; + let mut start = None; if let Some(found) = self.tree.iter().find(|(_, recap)| recap.max_height >= height) { - start = *found.0; + start = Some(*found.0); } if let Some(min_removed) = min_removed { - if start > min_removed { - start = min_removed; + if start.is_none_or(|start| start > min_removed) { + start = Some(min_removed); } } - start + start.unwrap() } pub fn export(&self) { diff --git a/crates/brk_parser/src/blk_metadata_and_block.rs b/crates/brk_parser/src/blk_metadata_and_block.rs deleted file mode 100644 index d65966b40..000000000 --- a/crates/brk_parser/src/blk_metadata_and_block.rs +++ /dev/null @@ -1,15 +0,0 @@ -use bitcoin::Block; - -use crate::BlkMetadata; - -#[derive(Debug)] -pub struct BlkIndexAndBlock { - pub blk_metadata: BlkMetadata, - pub block: Block, -} - -impl BlkIndexAndBlock { - pub fn new(blk_metadata: BlkMetadata, block: Block) -> Self { - Self { blk_metadata, block } - } -} diff --git a/crates/brk_parser/src/blk_recap.rs b/crates/brk_parser/src/blk_recap.rs index cd2fee31a..d7c8e8ca6 100644 --- a/crates/brk_parser/src/blk_recap.rs +++ b/crates/brk_parser/src/blk_recap.rs @@ -2,7 +2,7 @@ use std::path::Path; use serde::{Deserialize, Serialize}; -use crate::{path_to_modified_time, Height}; +use crate::{Height, path_to_modified_time}; #[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[repr(C)] @@ -13,9 +13,6 @@ pub struct BlkRecap { impl BlkRecap { pub fn has_different_modified_time(&self, blk_path: &Path) -> bool { - if self.modified_time != path_to_modified_time(blk_path) { - dbg!(self.modified_time, path_to_modified_time(blk_path)); - } self.modified_time != path_to_modified_time(blk_path) } } diff --git a/crates/brk_parser/src/block_state.rs b/crates/brk_parser/src/block_state.rs new file mode 100644 index 000000000..32e13ddea --- /dev/null +++ b/crates/brk_parser/src/block_state.rs @@ -0,0 +1,25 @@ +use bitcoin::{Block, consensus::Decodable, io::Cursor}; + +use crate::{XORBytes, XORIndex}; + +pub enum BlockState { + Raw(Vec), + Decoded(Block), +} + +impl BlockState { + pub fn decode(&mut self, xor_i: &mut XORIndex, xor_bytes: &XORBytes) { + let bytes = match self { + BlockState::Raw(bytes) => bytes, + _ => unreachable!(), + }; + + xor_i.bytes(bytes.as_mut_slice(), xor_bytes); + + let mut cursor = Cursor::new(bytes); + + let block = Block::consensus_decode(&mut cursor).unwrap(); + + *self = BlockState::Decoded(block); + } +} diff --git a/crates/brk_parser/src/height.rs b/crates/brk_parser/src/height.rs index 8df4c97f9..12071665e 100644 --- a/crates/brk_parser/src/height.rs +++ b/crates/brk_parser/src/height.rs @@ -108,10 +108,17 @@ impl AddAssign for Height { } } +impl Rem for Height { + type Output = Height; + fn rem(self, rhs: Height) -> Self::Output { + Self(self.0.rem(rhs.0)) + } +} + impl Rem for Height { type Output = Height; fn rem(self, rhs: usize) -> Self::Output { - Self(self.abs_diff(Height::from(rhs).0)) + Self(self.0.rem(Height::from(rhs).0)) } } diff --git a/crates/brk_parser/src/lib.rs b/crates/brk_parser/src/lib.rs index e1bf31489..1ad841f91 100644 --- a/crates/brk_parser/src/lib.rs +++ b/crates/brk_parser/src/lib.rs @@ -1,10 +1,13 @@ -use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::Path, thread}; - -use bitcoin::{ - Block, BlockHash, - consensus::{Decodable, ReadExt}, - io::{Cursor, Read}, +use std::{ + cmp::Ordering, + collections::BTreeMap, + fs::{self}, + ops::ControlFlow, + path::{Path, PathBuf}, + thread, }; + +use bitcoin::{Block, BlockHash}; use bitcoincore_rpc::RpcApi; use blk_index_to_blk_path::*; use blk_recap::BlkRecap; @@ -17,248 +20,237 @@ pub use bitcoincore_rpc as rpc; mod blk_index_to_blk_path; mod blk_index_to_blk_recap; mod blk_metadata; -mod blk_metadata_and_block; mod blk_recap; +mod block_state; mod error; mod height; mod utils; -mod xor; +mod xor_bytes; +mod xor_index; use blk_index_to_blk_recap::*; use blk_metadata::*; -use blk_metadata_and_block::*; +use block_state::*; pub use error::*; pub use height::*; use utils::*; -use xor::*; +use xor_bytes::*; +use xor_index::*; pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000; + const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217]; -const BOUND_CAP: usize = 100; +const BOUND_CAP: usize = 50; -/// -/// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`) -/// -/// For an example checkout `iterator/main.rs` -/// -pub fn new( - data_dir: &Path, - start: Option, - end: Option, +pub struct Parser { + data_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client, -) -> Receiver<(Height, Block, BlockHash)> { - let (send_block_reader, recv_block_reader) = bounded(5); - let (send_block_xor, recv_block_xor) = bounded(BOUND_CAP); - let (send_block, recv_block) = bounded(BOUND_CAP); - let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP); +} - let blk_index_to_blk_path = BlkIndexToBlkPath::scan(data_dir); +impl Parser { + pub fn new(data_dir: &Path, rpc: &'static bitcoincore_rpc::Client) -> Self { + Self { + data_dir: data_dir.to_owned(), + rpc, + } + } - let (mut blk_index_to_blk_recap, blk_index) = BlkIndexToBlkRecap::import(data_dir, &blk_index_to_blk_path, start); + /// + /// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`) + /// + /// For an example checkout `./main.rs` + /// + pub fn parse(&self, start: Option, end: Option) -> Receiver<(Height, Block, BlockHash)> { + let data_dir = self.data_dir.as_path(); + let rpc = self.rpc; - let xor = XOR::from(data_dir); + let (send_bytes, recv_bytes) = bounded(BOUND_CAP); + let (send_block, recv_block) = bounded(BOUND_CAP); + let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP); - thread::spawn(move || { - blk_index_to_blk_path - .range(blk_index..) - .try_for_each(move |(blk_index, blk_path)| { - let blk_index = *blk_index; + let blk_index_to_blk_path = BlkIndexToBlkPath::scan(data_dir); - let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path()); + let (mut blk_index_to_blk_recap, blk_index) = + BlkIndexToBlkRecap::import(data_dir, &blk_index_to_blk_path, start); - let blk_bytes = fs::read(blk_path).unwrap(); + let xor_bytes = XORBytes::from(data_dir); - let res = send_block_reader.send((blk_metadata, blk_bytes)); - if let Err(e) = res { - dbg!(e); - return ControlFlow::Break(()); - } + thread::spawn(move || { + let xor_bytes = xor_bytes; - ControlFlow::Continue(()) - }); - }); + blk_index_to_blk_path + .range(blk_index..) + .try_for_each(move |(blk_index, blk_path)| { + let mut xor_i = XORIndex::default(); - thread::spawn(move || { - recv_block_reader - .iter() - .try_for_each(|(blk_metadata, blk_bytes)| -> ControlFlow<(), _> { - let blk_bytes = xor.process(blk_bytes); + let blk_index = *blk_index; - let blk_bytes_len = blk_bytes.len() as u64; + let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path()); - let mut cursor = Cursor::new(blk_bytes); + let mut blk_bytes_ = fs::read(blk_path).unwrap(); + let blk_bytes = blk_bytes_.as_mut_slice(); + let blk_bytes_len = blk_bytes.len(); - let mut current_4bytes = [0; 4]; + let mut current_4bytes = [0; 4]; - 'parent: loop { - if cursor.position() == blk_bytes_len { - break; - } + let mut i = 0; - // Read until we find a valid suite of MAGIC_BYTES - loop { - current_4bytes.rotate_left(1); + 'parent: loop { + loop { + if i == blk_bytes_len { + break 'parent; + } - if let Ok(byte) = cursor.read_u8() { - current_4bytes[3] = byte; - } else { - break 'parent; + current_4bytes.rotate_left(1); + + current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes); + i += 1; + + if current_4bytes == MAGIC_BYTES { + break; + } } - if current_4bytes == MAGIC_BYTES { - break; + let len = + u32::from_le_bytes(xor_i.bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes).try_into().unwrap()) + as usize; + i += 4; + + let block_bytes = (blk_bytes[i..(i + len)]).to_vec(); + + if send_bytes + .send((blk_metadata, BlockState::Raw(block_bytes), xor_i)) + .is_err() + { + return ControlFlow::Break(()); } + + i += len; + xor_i.add_assign(len); } - let len = cursor.read_u32().unwrap(); + ControlFlow::Continue(()) + }); + }); - let mut bytes = vec![0u8; len as usize]; + thread::spawn(move || { + let xor_bytes = xor_bytes; - cursor.read_exact(&mut bytes).unwrap(); + let mut bulk = vec![]; - if send_block_xor.send((blk_metadata, BlockState::Raw(bytes))).is_err() { + let drain_and_send = |bulk: &mut Vec<_>| { + // Using a vec and sending after to not end up with stuck threads in par iter + bulk.par_iter_mut().for_each(|(_, block_state, xor_i)| { + BlockState::decode(block_state, xor_i, &xor_bytes); + }); + + bulk.drain(..).try_for_each(|(blk_metadata, block_state, _)| { + let block = match block_state { + BlockState::Decoded(block) => block, + _ => unreachable!(), + }; + + if send_block.send((blk_metadata, block)).is_err() { return ControlFlow::Break(()); } + + ControlFlow::Continue(()) + }) + }; + + recv_bytes.iter().try_for_each(|tuple| { + bulk.push(tuple); + + if bulk.len() < BOUND_CAP / 2 { + return ControlFlow::Continue(()); } - ControlFlow::Continue(()) - }); - }); - - thread::spawn(move || { - let mut bulk = vec![]; - - let drain_and_send = |bulk: &mut Vec<_>| { - // Using a vec and sending after to not end up with stuck threads in par iter - bulk.par_iter_mut().for_each(|(_, block_state)| { - BlockState::decode(block_state); + // Sending in bulk to not lock threads in standby + drain_and_send(&mut bulk) }); - bulk.drain(..).try_for_each(|(blk_metadata, block_state)| { - let block = match block_state { - BlockState::Decoded(block) => block, - _ => unreachable!(), - }; - - if send_block.send(BlkIndexAndBlock::new(blk_metadata, block)).is_err() { - return ControlFlow::Break(()); - } - - ControlFlow::Continue(()) - }) - }; - - recv_block_xor.iter().try_for_each(|tuple| { - bulk.push(tuple); - - if bulk.len() < BOUND_CAP / 2 { - return ControlFlow::Continue(()); - } - - // Sending in bulk to not lock threads in standby drain_and_send(&mut bulk) }); - drain_and_send(&mut bulk) - }); + thread::spawn(move || { + let mut current_height = start.unwrap_or_default(); - thread::spawn(move || { - let mut current_height = start.unwrap_or_default(); + let mut future_blocks = BTreeMap::default(); - let mut future_blocks = BTreeMap::default(); + recv_block + .iter() + .try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> { + let hash = block.block_hash(); + let header = rpc.get_block_header_info(&hash); - recv_block.iter().try_for_each(|tuple| -> ControlFlow<(), _> { - let blk_metadata = tuple.blk_metadata; - let block = tuple.block; - let hash = block.block_hash(); - let header = rpc.get_block_header_info(&hash); - - if header.is_err() { - return ControlFlow::Continue(()); - } - let header = header.unwrap(); - if header.confirmations <= 0 { - return ControlFlow::Continue(()); - } - - let height = Height::from(header.height); - - let len = blk_index_to_blk_recap.tree.len(); - if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 { - match (len as u16).cmp(&blk_metadata.index) { - Ordering::Equal => { - if len % 21 == 0 { - blk_index_to_blk_recap.export(); - } + if header.is_err() { + return ControlFlow::Continue(()); + } + let header = header.unwrap(); + if header.confirmations <= 0 { + return ControlFlow::Continue(()); } - Ordering::Less => panic!(), - Ordering::Greater => {} - } - blk_index_to_blk_recap - .tree - .entry(blk_metadata.index) - .and_modify(|recap| { - if recap.max_height < height { - recap.max_height = height; + let height = Height::from(header.height); + // println!("{height}"); + + let len = blk_index_to_blk_recap.tree.len(); + if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 { + match (len as u16).cmp(&blk_metadata.index) { + Ordering::Equal => { + if len % 21 == 0 { + blk_index_to_blk_recap.export(); + } + } + Ordering::Less => panic!(), + Ordering::Greater => {} } - }) - .or_insert(BlkRecap { - max_height: height, - modified_time: blk_metadata.modified_time, - }); - } - let mut opt = if current_height == height { - Some((block, hash)) - } else { - if start.is_none_or(|start| start <= height) && end.is_none_or(|end| end >= height) { - future_blocks.insert(height, (block, hash)); - } - None - }; + blk_index_to_blk_recap + .tree + .entry(blk_metadata.index) + .and_modify(|recap| { + if recap.max_height < height { + recap.max_height = height; + } + }) + .or_insert(BlkRecap { + max_height: height, + modified_time: blk_metadata.modified_time, + }); + } - while let Some((block, hash)) = opt.take().or_else(|| { - if !future_blocks.is_empty() { - future_blocks.remove(¤t_height) - } else { - None - } - }) { - send_height_block_hash.send((current_height, block, hash)).unwrap(); + let mut opt = if current_height == height { + Some((block, hash)) + } else { + if start.is_none_or(|start| start <= height) && end.is_none_or(|end| end >= height) { + future_blocks.insert(height, (block, hash)); + } + None + }; - if end == Some(current_height) { - return ControlFlow::Break(()); - } + while let Some((block, hash)) = opt.take().or_else(|| { + if !future_blocks.is_empty() { + future_blocks.remove(¤t_height) + } else { + None + } + }) { + send_height_block_hash.send((current_height, block, hash)).unwrap(); - current_height.increment(); - } + if end == Some(current_height) { + return ControlFlow::Break(()); + } - ControlFlow::Continue(()) + current_height.increment(); + } + + ControlFlow::Continue(()) + }); + + blk_index_to_blk_recap.export(); }); - blk_index_to_blk_recap.export(); - }); - - recv_height_block_hash -} - -enum BlockState { - Raw(Vec), - Decoded(Block), -} - -impl BlockState { - pub fn decode(&mut self) { - let bytes = match self { - BlockState::Raw(bytes) => bytes, - _ => unreachable!(), - }; - - let mut cursor = Cursor::new(bytes); - - let block = Block::consensus_decode(&mut cursor).unwrap(); - - *self = BlockState::Decoded(block); + recv_height_block_hash } } diff --git a/crates/brk_parser/src/main.rs b/crates/brk_parser/src/main.rs index 623e79bcd..3e820a0bd 100644 --- a/crates/brk_parser/src/main.rs +++ b/crates/brk_parser/src/main.rs @@ -1,11 +1,12 @@ use std::path::Path; use bitcoincore_rpc::{Auth, Client}; +use brk_parser::Parser; fn main() { let i = std::time::Instant::now(); - let data_dir = Path::new("../../bitcoin"); + let data_dir = Path::new("../../../bitcoin"); let rpc = Box::leak(Box::new( Client::new( "http://localhost:8332", @@ -17,11 +18,11 @@ fn main() { let start = None; let end = None; - brk_parser::new(data_dir, start, end, rpc) - .iter() - .for_each(|(height, _block, hash)| { - println!("{height}: {hash}"); - }); + let parser = Parser::new(data_dir, rpc); + + parser.parse(start, end).iter().for_each(|(height, _block, hash)| { + println!("{height}: {hash}"); + }); dbg!(i.elapsed()); } diff --git a/crates/brk_parser/src/xor.rs b/crates/brk_parser/src/xor.rs deleted file mode 100644 index 54636283b..000000000 --- a/crates/brk_parser/src/xor.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::{fs, path::Path}; - -const XOR_LEN: usize = 8; - -#[derive(Debug, PartialEq, Eq, Default)] -pub struct XOR([u8; XOR_LEN]); - -impl XOR { - pub fn process(&self, mut bytes: Vec) -> Vec { - if u64::from_ne_bytes(self.0) == 0 { - return bytes; - } - - let len = bytes.len(); - let mut bytes_index = 0; - let mut xor_index = 0; - - while bytes_index < len { - bytes[bytes_index] ^= self.0[xor_index]; - bytes_index += 1; - xor_index += 1; - if xor_index == XOR_LEN { - xor_index = 0; - } - } - - bytes - } -} - -impl From<&Path> for XOR { - fn from(value: &Path) -> Self { - Self( - fs::read(value.join("blocks/xor.dat")) - .unwrap_or(vec![0; 8]) - .try_into() - .unwrap(), - ) - } -} diff --git a/crates/brk_parser/src/xor_bytes.rs b/crates/brk_parser/src/xor_bytes.rs new file mode 100644 index 000000000..9b900f534 --- /dev/null +++ b/crates/brk_parser/src/xor_bytes.rs @@ -0,0 +1,19 @@ +use std::{fs, path::Path}; + +use derive_deref::Deref; + +pub const XOR_LEN: usize = 8; + +#[derive(Debug, Clone, Copy, Deref)] +pub struct XORBytes([u8; XOR_LEN]); + +impl From<&Path> for XORBytes { + fn from(value: &Path) -> Self { + Self( + fs::read(value.join("blocks/xor.dat")) + .unwrap_or(vec![0; 8]) + .try_into() + .unwrap(), + ) + } +} diff --git a/crates/brk_parser/src/xor_index.rs b/crates/brk_parser/src/xor_index.rs new file mode 100644 index 000000000..81b79498d --- /dev/null +++ b/crates/brk_parser/src/xor_index.rs @@ -0,0 +1,39 @@ +use crate::xor_bytes::{XOR_LEN, XORBytes}; + +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub struct XORIndex(usize); + +impl XORIndex { + pub fn bytes<'a>(&mut self, bytes: &'a mut [u8], xor_bytes: &XORBytes) -> &'a mut [u8] { + let len = bytes.len(); + let mut bytes_index = 0; + + while bytes_index < len { + bytes[bytes_index] ^= xor_bytes[self.0]; + self.increment(); + bytes_index += 1; + } + + bytes + } + + #[inline(always)] + pub fn byte(&mut self, mut byte: u8, xor_bytes: &XORBytes) -> u8 { + byte ^= xor_bytes[self.0]; + self.increment(); + byte + } + + #[inline(always)] + pub fn increment(&mut self) { + self.0 += 1; + if self.0 == XOR_LEN { + self.0 = 0; + } + } + + #[inline(always)] + pub fn add_assign(&mut self, i: usize) { + self.0 = (self.0 + i) % XOR_LEN; + } +} diff --git a/crates/brk_server/Cargo.toml b/crates/brk_server/Cargo.toml index 5806aeea3..e3bca264d 100644 --- a/crates/brk_server/Cargo.toml +++ b/crates/brk_server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "brk_server" description = "A Bitcoin data server built on top of bindexer, bricer and bomputer" -version = "0.1.0" +version = { workspace = true } edition = { workspace = true } license = { workspace = true } @@ -9,7 +9,7 @@ license = { workspace = true } axum = "0.8.1" brk_computer = { workspace = true } brk_indexer = { workspace = true } -brk_printer = { workspace = true } +brk_logger = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } jiff = { workspace = true } diff --git a/crates/brk_server/src/lib.rs b/crates/brk_server/src/lib.rs index e2aca251f..8cdc6423f 100644 --- a/crates/brk_server/src/lib.rs +++ b/crates/brk_server/src/lib.rs @@ -22,7 +22,7 @@ pub struct AppState { computer: &'static Computer, } -pub const WEBSITE_DEV_PATH: &str = "../websites/kibo.money/"; +pub const WEBSITE_DEV_PATH: &str = "../../websites/kibo.money/"; pub async fn main(indexer: Indexer, computer: Computer) -> color_eyre::Result<()> { let indexer = Box::leak(Box::new(indexer)); diff --git a/crates/brk_server/src/main.rs b/crates/brk_server/src/main.rs index fde668fdc..7d3e07b7d 100644 --- a/crates/brk_server/src/main.rs +++ b/crates/brk_server/src/main.rs @@ -8,9 +8,9 @@ use storable_vec::STATELESS; pub async fn main() -> color_eyre::Result<()> { color_eyre::install()?; - brk_printer::init_log(None); + brk_logger::init(None); - let path = Path::new("../_outputs"); + let path = Path::new("../../_outputs"); let indexer: Indexer = Indexer::import(&path.join("indexes"))?; let computer: Computer = Computer::import(&path.join("computed"))?;