diff --git a/Cargo.lock b/Cargo.lock index fdd0b8591..5506bdf69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,9 +117,9 @@ checksum = "7330592adf847ee2e3513587b4db2db410a0d751378654e7e993d9adcbe5c795" [[package]] name = "async-compression" -version = "0.4.18" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" +checksum = "310c9bcae737a48ef5cdee3174184e6d548b292739ede61a1f955ef76a738861" dependencies = [ "brotli", "flate2", @@ -318,9 +318,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" [[package]] name = "brk" @@ -333,6 +333,7 @@ dependencies = [ "brk_indexer", "brk_logger", "brk_parser", + "brk_query", "brk_server", "brk_vec", ] @@ -340,6 +341,17 @@ dependencies = [ [[package]] name = "brk_cli" version = "0.0.2" +dependencies = [ + "brk_computer", + "brk_core", + "brk_exit", + "brk_indexer", + "brk_logger", + "brk_parser", + "brk_query", + "brk_server", + "clap", +] [[package]] name = "brk_computer" @@ -370,7 +382,7 @@ dependencies = [ "rlimit", "serde", "serde_bytes", - "zerocopy 0.8.20", + "zerocopy 0.8.21", ] [[package]] @@ -410,7 +422,7 @@ dependencies = [ "fjall", "log", "rayon", - "zerocopy 0.8.20", + "zerocopy 0.8.21", ] [[package]] @@ -434,7 +446,16 @@ dependencies = [ "rayon", "serde", "serde_json", - "zerocopy 0.8.20", + "zerocopy 0.8.21", +] + +[[package]] +name = "brk_query" +version = "0.0.2" +dependencies = [ + "brk_computer", + "brk_indexer", + "clap", ] [[package]] @@ -443,8 +464,10 @@ version = "0.0.2" dependencies = [ "axum", "brk_computer", + "brk_exit", "brk_indexer", "brk_logger", + "brk_parser", "brk_vec", "color-eyre", "derive_deref", @@ -466,7 +489,7 @@ dependencies = [ "rayon", "serde", "serde_json", - "zerocopy 0.8.20", + "zerocopy 0.8.21", ] [[package]] @@ -528,9 +551,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.15" +version = "1.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c736e259eea577f443d5c86c304f9f4ae0295c43f3ba05c21f1d66b5f06001af" +checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" dependencies = [ "jobserver", "libc", @@ -549,6 +572,46 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "clap" +version = "4.5.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.98", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + [[package]] name = "color-eyre" version = "0.6.3" @@ -934,6 +997,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hex-conservative" version = "0.2.1" @@ -2225,6 +2294,12 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d08889ec5408683408db66ad89e0e1f93dff55c73a4ccc71c427d5b277ee47e6" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "syn" version = "1.0.109" @@ -2646,11 +2721,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dde3bb8c68a8f3f1ed4ac9221aad6b10cece3e60a8e2ea54a6a2dec806d0084c" +checksum = "dcf01143b2dd5d134f11f545cf9f1431b13b749695cb33bcce051e7568f99478" dependencies = [ - "zerocopy-derive 0.8.20", + "zerocopy-derive 0.8.21", ] [[package]] @@ -2666,9 +2741,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eea57037071898bf96a6da35fd626f4f27e9cee3ead2a6c703cf09d472b2e700" +checksum = "712c8386f4f4299382c9abee219bee7084f78fb939d88b6840fcc1320d5f6da2" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index a502821a3..57867baa8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,11 @@ brk_fetcher = { version = "0", path = "crates/brk_fetcher" } brk_indexer = { version = "0", path = "crates/brk_indexer" } brk_logger = { version = "0", path = "crates/brk_logger" } brk_parser = { version = "0", path = "crates/brk_parser" } +brk_query = { version = "0", path = "crates/brk_query" } brk_server = { version = "0", path = "crates/brk_server" } -brk_vec = { version = "0", path = "crates/brk_vec", features = ["json"] } +brk_vec = { version = "0", path = "crates/brk_vec" } byteview = "0.5.4" +clap = { version = "4.5.31", features = ["derive"] } color-eyre = "0.6.3" derive_deref = "1.1.1" fjall = "2.6.7" @@ -29,4 +31,4 @@ minreq = { version = "2.13.2", features = ["https", "serde_json"] } rayon = "1.10.0" serde = { version = "1.0.218", features = ["derive"] } serde_json = { version = "1.0.139", features = ["float_roundtrip"] } -zerocopy = { version = "0.8.20", features = ["derive"] } +zerocopy = { version = "0.8.21", features = ["derive"] } diff --git a/README.md b/README.md index d22b78084..784c024fe 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,12 @@ A very fast Bitcoin Core block parser and iterator built on top of bitcoin-rust. > Status: ✅ +### `brk_query` + +A library that finds requested datasets. + +> Status: ⚠️ + ### `brk_server` A server that serves Bitcoin data and swappable front-ends, built on top of brk_indexer, brk_fetcher and brk_computer. @@ -117,6 +123,11 @@ Feel free to open an issue if you want to add another instance ## Setup +### Hardware + +- Last base model Mac mini +- External SSD + ### Requirements - At least 16 GB of RAM @@ -124,12 +135,8 @@ Feel free to open an issue if you want to add another instance - Recommended: Rated at 3 GB/s (Thunderbolt 4 speed) - A running instance of bitcoin-core - Example: `bitcoind -datadir="$HOME/.bitcoin" -blocksonly` -- Git - Unix based operating system (Mac OS or Linux) - Ubuntu users need to install `open-ssl` via `sudo apt install libssl-dev pkg-config` - - Mac OS: - - Disable Spotlight or exclude the `--kibodir` folder from it - - Don't use Time Machine or exclude the `--kibodir` folder (especially needed for local snapshots) ### Build diff --git a/crates/brk/Cargo.toml b/crates/brk/Cargo.toml index 64f9018ad..447fd88c3 100644 --- a/crates/brk/Cargo.toml +++ b/crates/brk/Cargo.toml @@ -16,6 +16,7 @@ full = [ "indexer", "logger", "parser", + "query", "server", "vec", ] @@ -26,6 +27,7 @@ fetcher = ["brk_fetcher"] indexer = ["brk_indexer"] logger = ["brk_logger"] parser = ["brk_parser"] +query = ["brk_query"] server = ["brk_server"] vec = ["brk_vec"] @@ -37,6 +39,7 @@ brk_fetcher = { workspace = true, optional = true } brk_indexer = { workspace = true, optional = true } brk_logger = { workspace = true, optional = true } brk_parser = { workspace = true, optional = true } +brk_query = { workspace = true, optional = true } brk_server = { workspace = true, optional = true } brk_vec = { workspace = true, optional = true } diff --git a/crates/brk/src/lib.rs b/crates/brk/src/lib.rs index b05a8caba..838b48b03 100644 --- a/crates/brk/src/lib.rs +++ b/crates/brk/src/lib.rs @@ -42,6 +42,12 @@ pub mod parser { pub use brk_parser::*; } +#[cfg(feature = "query")] +pub mod query { + #[doc(inline)] + pub use brk_query::*; +} + #[cfg(feature = "server")] pub mod server { #[doc(inline)] diff --git a/crates/brk_cli/Cargo.toml b/crates/brk_cli/Cargo.toml index 971174b83..679aaa109 100644 --- a/crates/brk_cli/Cargo.toml +++ b/crates/brk_cli/Cargo.toml @@ -7,3 +7,12 @@ license.workspace = true repository.workspace = true [dependencies] +brk_core = { workspace = true } +brk_computer = { workspace = true } +brk_exit = { workspace = true } +brk_indexer = { workspace = true } +brk_logger = { workspace = true } +brk_parser = { workspace = true } +brk_query = { workspace = true } +brk_server = { workspace = true } +clap = { workspace = true } diff --git a/crates/brk_cli/src/lib.rs b/crates/brk_cli/src/lib.rs deleted file mode 100644 index 4cd615a2c..000000000 --- a/crates/brk_cli/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -#![doc = include_str!("../README.md")] - diff --git a/crates/brk_cli/src/main.rs b/crates/brk_cli/src/main.rs index e7a11a969..18e4361da 100644 --- a/crates/brk_cli/src/main.rs +++ b/crates/brk_cli/src/main.rs @@ -1,3 +1,40 @@ -fn main() { - println!("Hello, world!"); +use clap::{Args, Parser, Subcommand}; + +#[derive(Parser)] +#[command(version, about)] +#[command(propagate_version = true)] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + Run(RunArgs), + Query(QueryArgs), +} + +#[derive(Args)] +struct RunArgs { + name: Option, +} + +#[derive(Args)] +struct QueryArgs { + name: Option, +} + +fn main() { + let cli = Cli::parse(); + + // You can check for the existence of subcommands, and if found use their + // matches just as you would the top level cmd + match &cli.command { + Commands::Run(name) => { + println!("'myapp add' was used, name is: {:?}", name.name); + } + Commands::Query(name) => { + println!("'myapp add' was used, name is: {:?}", name.name); + } + } } diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index 5ccec7054..84da65cce 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -1,27 +1,32 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::path::{Path, PathBuf}; use brk_exit::Exit; -use brk_indexer::Indexer; +use brk_indexer::{Indexer, Indexes}; pub use brk_parser::rpc; mod storage; use brk_core::Date; -use brk_vec::SINGLE_THREAD; use storage::{Stores, Vecs}; -pub struct Computer { +#[derive(Clone)] +pub struct Computer { path: PathBuf, - pub vecs: Vecs, + pub vecs: Vecs, pub stores: Stores, } -impl Computer { +impl Computer { pub fn import(computed_dir: &Path) -> color_eyre::Result { let vecs = Vecs::import(&computed_dir.join("vecs"))?; - let stores = Stores::import(&computed_dir.join("fjall"))?; + + let stores = Stores::import(&computed_dir.join("stores"))?; + Ok(Self { path: computed_dir.to_owned(), vecs, @@ -30,8 +35,8 @@ impl Computer { } } -impl Computer { - pub fn compute(&mut self, mut indexer: Indexer, exit: &Exit) -> color_eyre::Result<()> { +impl Computer { + pub fn compute(&mut self, indexer: &mut Indexer, starting_indexes: Indexes, exit: &Exit) -> color_eyre::Result<()> { let height_count = indexer.vecs.height_to_size.len(); let txindexes_count = indexer.vecs.txindex_to_txid.len(); let txinindexes_count = indexer.vecs.txinindex_to_txoutindex.len(); @@ -39,53 +44,61 @@ impl Computer { // TODO: Remove all outdated - self.vecs.txindex_to_last_txinindex.compute_last_index_from_first( - &mut indexer.vecs.txindex_to_first_txinindex, - txinindexes_count, - exit, - )?; + // self.vecs.txindex_to_last_txinindex.compute_last_index_from_first( + // starting_indexes.txindex, + // &mut indexer.vecs.txindex_to_first_txinindex, + // txinindexes_count, + // exit, + // )?; - self.vecs.txindex_to_inputs_count.compute_count_from_indexes( - &mut indexer.vecs.txindex_to_first_txinindex, - &mut self.vecs.txindex_to_last_txinindex, - exit, - )?; + // self.vecs.txindex_to_inputs_count.compute_count_from_indexes( + // starting_indexes.txindex, + // &mut indexer.vecs.txindex_to_first_txinindex, + // &mut self.vecs.txindex_to_last_txinindex, + // exit, + // )?; - self.vecs.txindex_to_last_txoutindex.compute_last_index_from_first( - &mut indexer.vecs.txindex_to_first_txoutindex, - txoutindexes_count, - exit, - )?; + // self.vecs.txindex_to_last_txoutindex.compute_last_index_from_first( + // starting_indexes.txindex, + // &mut indexer.vecs.txindex_to_first_txoutindex, + // txoutindexes_count, + // exit, + // )?; - self.vecs.txindex_to_outputs_count.compute_count_from_indexes( - &mut indexer.vecs.txindex_to_first_txoutindex, - &mut self.vecs.txindex_to_last_txoutindex, - exit, - )?; + // self.vecs.txindex_to_outputs_count.compute_count_from_indexes( + // starting_indexes.txindex, + // &mut indexer.vecs.txindex_to_first_txoutindex, + // &mut self.vecs.txindex_to_last_txoutindex, + // exit, + // )?; self.vecs.height_to_date.compute_transform( + starting_indexes.height, &mut indexer.vecs.height_to_timestamp, |timestamp| Date::from(*timestamp), exit, )?; - self.vecs.height_to_last_txindex.compute_last_index_from_first( - &mut indexer.vecs.height_to_first_txindex, - height_count, - exit, - )?; + // self.vecs.height_to_last_txindex.compute_last_index_from_first( + // starting_indexes.height, + // &mut indexer.vecs.height_to_first_txindex, + // height_count, + // exit, + // )?; - self.vecs.txindex_to_height.compute_inverse_less_to_more( - &mut indexer.vecs.height_to_first_txindex, - &mut self.vecs.height_to_last_txindex, - exit, - )?; + // self.vecs.txindex_to_height.compute_inverse_less_to_more( + // starting_indexes.height, + // &mut indexer.vecs.height_to_first_txindex, + // &mut self.vecs.height_to_last_txindex, + // exit, + // )?; - self.vecs.txindex_to_is_coinbase.compute_is_first_ordered( - &mut self.vecs.txindex_to_height, - &mut indexer.vecs.height_to_first_txindex, - exit, - )?; + // self.vecs.txindex_to_is_coinbase.compute_is_first_ordered( + // starting_indexes.txindex, + // &mut self.vecs.txindex_to_height, + // &mut indexer.vecs.height_to_first_txindex, + // exit, + // )?; // self.vecs.txindex_to_fee.compute_transform( // &mut self.vecs.txindex_to_height, @@ -96,9 +109,9 @@ impl Computer { // self.vecs.height_to_dateindex.compute(...) - self.vecs - .dateindex_to_first_height - .compute_inverse_more_to_less(&mut self.vecs.height_to_dateindex, exit)?; + // self.vecs + // .dateindex_to_first_height + // .compute_inverse_more_to_less(&mut self.vecs.height_to_dateindex, exit)?; // --- // Date to X diff --git a/crates/brk_computer/src/main.rs b/crates/brk_computer/src/main.rs index 11d1fc1f9..a6d18e2c6 100644 --- a/crates/brk_computer/src/main.rs +++ b/crates/brk_computer/src/main.rs @@ -7,7 +7,6 @@ use brk_parser::{ Parser, rpc::{self, RpcApi}, }; -use brk_vec::CACHED_GETS; use log::info; pub fn main() -> color_eyre::Result<()> { @@ -26,25 +25,26 @@ pub fn main() -> color_eyre::Result<()> { let outputs_dir = Path::new("../../_outputs"); - let indexer: Indexer = Indexer::import(&outputs_dir.join("indexes"))?; + let mut indexer = Indexer::import(&outputs_dir.join("indexed"))?; - // let mut computer = Computer::import(&outputs_dir.join("computed"))?; + let mut computer = Computer::import(&outputs_dir.join("computed"))?; - // loop { - // let block_count = rpc.get_block_count()?; + loop { + let block_count = rpc.get_block_count()?; - // info!("{block_count} blocks found."); + info!("{block_count} blocks found."); - // indexer.index(&parser, rpc, &exit)?; + let starting_indexes = indexer.index(&parser, rpc, &exit)?; - // computer.compute(indexer, &exit)?; + computer.compute(&mut indexer, starting_indexes, &exit)?; - // info!("Waiting for new blocks..."); + info!("Waiting for new blocks..."); - // while block_count == rpc.get_block_count()? { - // sleep(Duration::from_secs(1)) - // } - // } + while block_count == rpc.get_block_count()? { + sleep(Duration::from_secs(1)) + } + } + #[allow(unreachable_code)] Ok(()) } diff --git a/crates/brk_computer/src/storage/stores.rs b/crates/brk_computer/src/storage/stores.rs index 93d0fd977..a5b835a87 100644 --- a/crates/brk_computer/src/storage/stores.rs +++ b/crates/brk_computer/src/storage/stores.rs @@ -4,6 +4,7 @@ use brk_core::{AddressindexTxoutindex, Unit}; use brk_indexer::Store; use brk_vec::Version; +#[derive(Clone)] pub struct Stores { pub address_to_utxos_received: Store, pub address_to_utxos_spent: Store, diff --git a/crates/brk_computer/src/storage/vecs.rs b/crates/brk_computer/src/storage/vecs.rs index e8d330e87..6553b633c 100644 --- a/crates/brk_computer/src/storage/vecs.rs +++ b/crates/brk_computer/src/storage/vecs.rs @@ -4,59 +4,60 @@ use brk_core::{ Addressindex, Cents, Close, Date, Dateindex, Dollars, Feerate, Height, High, Low, Open, Sats, Timestamp, Txindex, Txinindex, Txoutindex, }; -use brk_vec::{StorableVec, Version}; +use brk_vec::{AnyStorableVec, StorableVec, Version}; // mod base; // use base::*; -pub struct Vecs { - pub dateindex_to_first_height: StorableVec, - // pub dateindex_to_last_height: StorableVec, - // pub height_to_block_interval: StorableVec, - pub dateindex_to_close_in_cents: StorableVec, MODE>, - pub dateindex_to_close_in_dollars: StorableVec, MODE>, - pub dateindex_to_high_in_cents: StorableVec, MODE>, - pub dateindex_to_high_in_dollars: StorableVec, MODE>, - pub dateindex_to_low_in_cents: StorableVec, MODE>, - pub dateindex_to_low_in_dollars: StorableVec, MODE>, - pub dateindex_to_open_in_cents: StorableVec, MODE>, - pub dateindex_to_open_in_dollars: StorableVec, MODE>, - pub height_to_close_in_cents: StorableVec, MODE>, - pub height_to_close_in_dollars: StorableVec, MODE>, - pub height_to_high_in_cents: StorableVec, MODE>, - pub height_to_high_in_dollars: StorableVec, MODE>, - pub height_to_low_in_cents: StorableVec, MODE>, - pub height_to_low_in_dollars: StorableVec, MODE>, - pub height_to_open_in_cents: StorableVec, MODE>, - pub height_to_open_in_dollars: StorableVec, MODE>, - pub height_to_date: StorableVec, - pub height_to_dateindex: StorableVec, - // pub height_to_fee: StorableVec, - // pub height_to_inputcount: StorableVec, - // pub height_to_last_addressindex: StorableVec, - pub height_to_last_txindex: StorableVec, - // pub height_to_last_txoutindex: StorableVec, - // pub height_to_maxfeerate: StorableVec, - // pub height_to_medianfeerate: StorableVec, - // pub height_to_minfeerate: StorableVec, - // pub height_to_outputcount: StorableVec, - // pub height_to_subsidy: StorableVec, - // pub height_to_totalfees: StorableVec, - // pub height_to_txcount: StorableVec, - pub txindex_to_fee: StorableVec, - pub txindex_to_height: StorableVec, - pub txindex_to_is_coinbase: StorableVec, - // pub txindex_to_feerate: StorableVec, - pub txindex_to_inputs_count: StorableVec, - pub txindex_to_inputs_sum: StorableVec, - pub txindex_to_last_txinindex: StorableVec, - pub txindex_to_last_txoutindex: StorableVec, - pub txindex_to_outputs_count: StorableVec, - pub txindex_to_outputs_sum: StorableVec, +#[derive(Clone)] +pub struct Vecs { + pub dateindex_to_first_height: StorableVec, + // pub dateindex_to_last_height: StorableVec, + // pub height_to_block_interval: StorableVec, + pub dateindex_to_close_in_cents: StorableVec>, + pub dateindex_to_close_in_dollars: StorableVec>, + pub dateindex_to_high_in_cents: StorableVec>, + pub dateindex_to_high_in_dollars: StorableVec>, + pub dateindex_to_low_in_cents: StorableVec>, + pub dateindex_to_low_in_dollars: StorableVec>, + pub dateindex_to_open_in_cents: StorableVec>, + pub dateindex_to_open_in_dollars: StorableVec>, + pub height_to_close_in_cents: StorableVec>, + pub height_to_close_in_dollars: StorableVec>, + pub height_to_high_in_cents: StorableVec>, + pub height_to_high_in_dollars: StorableVec>, + pub height_to_low_in_cents: StorableVec>, + pub height_to_low_in_dollars: StorableVec>, + pub height_to_open_in_cents: StorableVec>, + pub height_to_open_in_dollars: StorableVec>, + pub height_to_date: StorableVec, + pub height_to_dateindex: StorableVec, + // pub height_to_fee: StorableVec, + // pub height_to_inputcount: StorableVec, + // pub height_to_last_addressindex: StorableVec, + pub height_to_last_txindex: StorableVec, + // pub height_to_last_txoutindex: StorableVec, + // pub height_to_maxfeerate: StorableVec, + // pub height_to_medianfeerate: StorableVec, + // pub height_to_minfeerate: StorableVec, + // pub height_to_outputcount: StorableVec, + // pub height_to_subsidy: StorableVec, + // pub height_to_totalfees: StorableVec, + // pub height_to_txcount: StorableVec, + pub txindex_to_fee: StorableVec, + pub txindex_to_height: StorableVec, + pub txindex_to_is_coinbase: StorableVec, + // pub txindex_to_feerate: StorableVec, + pub txindex_to_inputs_count: StorableVec, + pub txindex_to_inputs_sum: StorableVec, + pub txindex_to_last_txinindex: StorableVec, + pub txindex_to_last_txoutindex: StorableVec, + pub txindex_to_outputs_count: StorableVec, + pub txindex_to_outputs_sum: StorableVec, } -impl Vecs { +impl Vecs { pub fn import(path: &Path) -> color_eyre::Result { fs::create_dir_all(path)?; @@ -148,45 +149,24 @@ impl Vecs { }) } - // pub fn as_slice(&self) -> [&dyn AnyComputedStorableVec; 1] { - // [ - // &self.dateindex_to_close_in_cents as &dyn AnyJsonStorableVec, - // &self.dateindex_to_close_in_dollars, - // &self.dateindex_to_high_in_cents, - // &self.dateindex_to_high_in_dollars, - // &self.dateindex_to_low_in_cents, - // &self.dateindex_to_low_in_dollars, - // &self.dateindex_to_open_in_cents, - // &self.dateindex_to_open_in_dollars, - // &self.height_to_close_in_cents, - // &self.height_to_close_in_dollars, - // &self.height_to_high_in_cents, - // &self.height_to_high_in_dollars, - // &self.height_to_low_in_cents, - // &self.height_to_low_in_dollars, - // &self.height_to_open_in_cents, - // &self.height_to_open_in_dollars, - // ] - // } - - // pub fn as_mut_slice(&mut self) -> [&mut dyn AnyComputedStorableVec; 1] { - // [ - // &mut self.dateindex_to_close_in_cents as &mut dyn AnyStorableVec, - // &mut self.dateindex_to_close_in_dollars, - // &mut self.dateindex_to_high_in_cents, - // &mut self.dateindex_to_high_in_dollars, - // &mut self.dateindex_to_low_in_cents, - // &mut self.dateindex_to_low_in_dollars, - // &mut self.dateindex_to_open_in_cents, - // &mut self.dateindex_to_open_in_dollars, - // &mut self.height_to_close_in_cents, - // &mut self.height_to_close_in_dollars, - // &mut self.height_to_high_in_cents, - // &mut self.height_to_high_in_dollars, - // &mut self.height_to_low_in_cents, - // &mut self.height_to_low_in_dollars, - // &mut self.height_to_open_in_cents, - // &mut self.height_to_open_in_dollars, - // ] - // } + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + vec![ + &self.height_to_date as &dyn AnyStorableVec, + // &self.dateindex_to_close_in_dollars, + // &self.dateindex_to_high_in_cents, + // &self.dateindex_to_high_in_dollars, + // &self.dateindex_to_low_in_cents, + // &self.dateindex_to_low_in_dollars, + // &self.dateindex_to_open_in_cents, + // &self.dateindex_to_open_in_dollars, + // &self.height_to_close_in_cents, + // &self.height_to_close_in_dollars, + // &self.height_to_high_in_cents, + // &self.height_to_high_in_dollars, + // &self.height_to_low_in_cents, + // &self.height_to_low_in_dollars, + // &self.height_to_open_in_cents, + // &self.height_to_open_in_dollars, + ] + } } diff --git a/crates/brk_computer/src/storage/vecs_/base.rs b/crates/brk_computer/src/storage/vecs_/base.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/brk_computer/src/storage/vecs_/mod.rs b/crates/brk_computer/src/storage/vecs_/mod.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/brk_core/src/structs/date.rs b/crates/brk_core/src/structs/date.rs index 50b606525..3a448b08b 100644 --- a/crates/brk_core/src/structs/date.rs +++ b/crates/brk_core/src/structs/date.rs @@ -1,9 +1,12 @@ use jiff::{Span, civil::Date as Date_, tz::TimeZone}; +use serde::Serialize; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use super::{Dateindex, Timestamp}; -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout, Serialize, +)] pub struct Date(u32); impl Date { diff --git a/crates/brk_core/src/structs/dateindex.rs b/crates/brk_core/src/structs/dateindex.rs index 45a243987..ed4e92042 100644 --- a/crates/brk_core/src/structs/dateindex.rs +++ b/crates/brk_core/src/structs/dateindex.rs @@ -1,5 +1,6 @@ use std::ops::Add; +use serde::Serialize; // use color_eyre::eyre::eyre; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; @@ -7,7 +8,9 @@ use crate::Error; use super::Date; -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[derive( + Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout, Serialize, +)] pub struct Dateindex(u16); impl From for usize { diff --git a/crates/brk_fetcher/src/lib.rs b/crates/brk_fetcher/src/lib.rs index cfd580a00..72aa1294a 100644 --- a/crates/brk_fetcher/src/lib.rs +++ b/crates/brk_fetcher/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::{collections::BTreeMap, fs, path::Path}; diff --git a/crates/brk_indexer/README.md b/crates/brk_indexer/README.md index 8274828cb..70e419474 100644 --- a/crates/brk_indexer/README.md +++ b/crates/brk_indexer/README.md @@ -14,7 +14,7 @@ Vecs are used sparingly instead of stores for multiple reasons: Storage wise, the expected overhead should be around 30% of the chain itself. -Peaks at 11-13 GB of RAM +Peaks at 5 GB of RAM ## Outputs @@ -27,3 +27,12 @@ Stores: `src/storage/stores/mod.rs` Rust: `src/main.rs` Python: `../python/parse.py` + +## Benchmark + +Indexing `0..885_835` took `11 hours 6 min 50 s` on a Macbook Pro M3 Pro with 36 GB of RAM + +`footprint` report: +- Peak memory: `5115 MB` +- Memory while waiting for a new block: `890 MB` +- Reclaimable memory: `6478 MB` diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index 44a5ae146..80f30af14 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -4,7 +4,6 @@ use brk_core::{ P2SHindex, P2TRindex, P2WPKHindex, P2WSHindex, Pushonlyindex, Txindex, Txinindex, Txoutindex, Unknownindex, }; use brk_parser::NUMBER_OF_UNSAFE_BLOCKS; -use brk_vec::CACHED_GETS; use color_eyre::eyre::ContextCompat; use crate::{Stores, Vecs}; @@ -31,7 +30,7 @@ pub struct Indexes { } impl Indexes { - pub fn push_if_needed(&self, vecs: &mut Vecs) -> brk_vec::Result<()> { + pub fn push_if_needed(&self, vecs: &mut Vecs) -> brk_vec::Result<()> { let height = self.height; vecs.height_to_first_txindex.push_if_needed(height, self.txindex)?; vecs.height_to_first_txinindex.push_if_needed(height, self.txinindex)?; @@ -64,7 +63,7 @@ impl Indexes { Ok(()) } - pub fn push_future_if_needed(&mut self, vecs: &mut Vecs) -> brk_vec::Result<()> { + pub fn push_future_if_needed(&mut self, vecs: &mut Vecs) -> brk_vec::Result<()> { self.height.increment(); self.push_if_needed(vecs)?; self.height.decrement(); @@ -72,9 +71,9 @@ impl Indexes { } } -impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes { +impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes { type Error = color_eyre::Report; - fn try_from((vecs, stores, rpc): (&mut Vecs, &Stores, &Client)) -> color_eyre::Result { + fn try_from((vecs, stores, rpc): (&mut Vecs, &Stores, &Client)) -> color_eyre::Result { // Height at which we wanna start: min last saved + 1 or 0 let starting_height = vecs.starting_height().min(stores.starting_height()); diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index b9e0e6f23..57740ca24 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::{ collections::BTreeMap, @@ -15,7 +18,6 @@ pub use brk_parser::*; use bitcoin::{Transaction, TxIn, TxOut}; use brk_exit::Exit; -use brk_vec::CACHED_GETS; use color_eyre::eyre::{ContextCompat, eyre}; use log::info; use rayon::prelude::*; @@ -29,12 +31,13 @@ pub use vecs::*; const SNAPSHOT_BLOCK_RANGE: usize = 1000; -pub struct Indexer { - pub vecs: Vecs, +#[derive(Clone)] +pub struct Indexer { + pub vecs: Vecs, pub stores: Stores, } -impl Indexer { +impl Indexer { pub fn import(indexes_dir: &Path) -> color_eyre::Result { setrlimit()?; @@ -42,13 +45,11 @@ impl Indexer { let vecs = Vecs::import(&indexes_dir.join("vecs"))?; - let stores = Stores::import(&indexes_dir.join("fjall"))?; + let stores = Stores::import(&indexes_dir.join("stores"))?; Ok(Self { vecs, stores }) } -} -impl Indexer { pub fn index(&mut self, parser: &Parser, rpc: &'static rpc::Client, exit: &Exit) -> color_eyre::Result { let check_collisions = true; @@ -63,35 +64,35 @@ impl Indexer { self.vecs.rollback_if_needed(&starting_indexes)?; exit.release(); - let export_if_needed = |stores: &mut Stores, - vecs: &mut Vecs, - height: Height, - rem: bool, - exit: &Exit| - -> color_eyre::Result<()> { - if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() { - return Ok(()); - } + let export_if_needed = + |stores: &mut Stores, vecs: &mut Vecs, height: Height, rem: bool, exit: &Exit| -> color_eyre::Result<()> { + if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() { + return Ok(()); + } - info!("Exporting..."); - exit.block(); - stores.commit(height)?; - vecs.flush(height)?; - exit.release(); - Ok(()) - }; + info!("Exporting..."); + exit.block(); + stores.commit(height)?; + vecs.flush(height)?; + exit.release(); + Ok(()) + }; let vecs = &mut self.vecs; let stores = &mut self.stores; - if starting_indexes.height > Height::try_from(rpc)? { + let mut idxs = starting_indexes.clone(); + + let start = Some(idxs.height); + let end = None; //Some(Height::new(400_000)); + + if starting_indexes.height > Height::try_from(rpc)? || end.is_some_and(|end| starting_indexes.height > end) { return Ok(starting_indexes); } info!("Started indexing..."); - let mut idxs = starting_indexes.clone(); - parser.parse(Some(idxs.height), None).iter().try_for_each( + parser.parse(start, None).iter().try_for_each( |(height, block, blockhash)| -> color_eyre::Result<()> { info!("Indexing block {height}..."); @@ -634,6 +635,8 @@ impl Indexer { export_if_needed(stores, vecs, idxs.height, true, exit)?; + stores.rotate_memtables(); + Ok(starting_indexes) } } diff --git a/crates/brk_indexer/src/main.rs b/crates/brk_indexer/src/main.rs index 1f6cacea5..6b49d9117 100644 --- a/crates/brk_indexer/src/main.rs +++ b/crates/brk_indexer/src/main.rs @@ -6,7 +6,6 @@ use brk_parser::{ Parser, rpc::{self}, }; -use brk_vec::CACHED_GETS; use log::info; fn main() -> color_eyre::Result<()> { @@ -23,13 +22,13 @@ fn main() -> color_eyre::Result<()> { let parser = Parser::new(data_dir, rpc); - let mut indexer: Indexer = Indexer::import(Path::new("../../_outputs/indexes"))?; - loop { let block_count = rpc.get_block_count()?; info!("{block_count} blocks found."); + let mut indexer = Indexer::import(Path::new("../../_outputs/indexed"))?; + indexer.index(&parser, rpc, &exit)?; info!("Waiting for new blocks..."); diff --git a/crates/brk_indexer/src/stores/base.rs b/crates/brk_indexer/src/stores/base.rs index e9de1f1f6..84ce10230 100644 --- a/crates/brk_indexer/src/stores/base.rs +++ b/crates/brk_indexer/src/stores/base.rs @@ -136,6 +136,10 @@ where Ok(()) } + pub fn rotate_memtable(&self) { + let _ = self.part.inner().rotate_memtable(); + } + pub fn height(&self) -> Option { self.meta.height() } @@ -155,7 +159,9 @@ where } fn open_keyspace(path: &Path) -> Result { - fjall::Config::new(path.join("fjall")).open_transactional() + fjall::Config::new(path.join("fjall")) + .max_write_buffer_size(32 * 1024 * 1024) + .open_transactional() } fn open_partition_handle(keyspace: &TransactionalKeyspace) -> Result { @@ -163,6 +169,7 @@ where "partition", PartitionCreateOptions::default() .bloom_filter_bits(Some(5)) + .max_memtable_size(8 * 1024 * 1024) .manual_journal_persist(true), ) } diff --git a/crates/brk_indexer/src/stores/mod.rs b/crates/brk_indexer/src/stores/mod.rs index 235744e70..653065117 100644 --- a/crates/brk_indexer/src/stores/mod.rs +++ b/crates/brk_indexer/src/stores/mod.rs @@ -1,7 +1,7 @@ use std::{path::Path, thread}; use brk_core::{AddressHash, Addressbytes, Addressindex, Addresstype, BlockHashPrefix, Height, TxidPrefix, Txindex}; -use brk_vec::{CACHED_GETS, Value, Version}; +use brk_vec::{Value, Version}; use crate::Indexes; @@ -38,14 +38,9 @@ impl Stores { }) } - pub fn rollback_if_needed( - &mut self, - vecs: &Vecs, - starting_indexes: &Indexes, - ) -> color_eyre::Result<()> { + pub fn rollback_if_needed(&mut self, vecs: &Vecs, starting_indexes: &Indexes) -> color_eyre::Result<()> { vecs.height_to_blockhash .iter_from(starting_indexes.height, |(_, blockhash)| { - let blockhash = blockhash.as_ref(); let blockhash_prefix = BlockHashPrefix::from(blockhash); self.blockhash_prefix_to_height.remove(blockhash_prefix); Ok(()) @@ -53,7 +48,6 @@ impl Stores { vecs.txindex_to_txid .iter_from(starting_indexes.txindex, |(_txindex, txid)| { - let txid = txid.as_ref(); let txid_prefix = TxidPrefix::from(txid); self.txid_prefix_to_txindex.remove(txid_prefix); Ok(()) @@ -173,4 +167,10 @@ impl Stores { Ok(()) }) } + + pub fn rotate_memtables(&self) { + self.addresshash_to_addressindex.rotate_memtable(); + self.blockhash_prefix_to_height.rotate_memtable(); + self.txid_prefix_to_txindex.rotate_memtable(); + } } diff --git a/crates/brk_indexer/src/vecs/base.rs b/crates/brk_indexer/src/vecs/base.rs index f205c014b..449ea4210 100644 --- a/crates/brk_indexer/src/vecs/base.rs +++ b/crates/brk_indexer/src/vecs/base.rs @@ -5,25 +5,29 @@ use std::{ path::{Path, PathBuf}, }; -use brk_vec::{CACHED_GETS, StoredIndex, StoredType, Version}; +use brk_vec::{StoredIndex, StoredType, Version}; use super::Height; #[derive(Debug)] -pub struct StorableVec { +pub struct StorableVec { height: Option, - vec: brk_vec::StorableVec, + vec: brk_vec::StorableVec, } -impl StorableVec +impl StorableVec where I: StoredIndex, T: StoredType, { pub fn import(path: &Path, version: Version) -> brk_vec::Result { + let mut vec = brk_vec::StorableVec::forced_import(path, version)?; + + vec.reset_mmaps()?; + Ok(Self { height: Height::try_from(Self::path_height_(path).as_path()).ok(), - vec: brk_vec::StorableVec::forced_import(path, version)?, + vec, }) } @@ -43,37 +47,44 @@ where fn path_height_(path: &Path) -> PathBuf { path.join("height") } -} -impl StorableVec -where - I: StoredIndex, - T: StoredType, -{ pub fn flush(&mut self, height: Height) -> io::Result<()> { height.write(&self.path_height())?; - self.vec.flush() + self.vec.flush()?; + self.vec.reset_mmaps() } } -impl Deref for StorableVec { - type Target = brk_vec::StorableVec; +impl Deref for StorableVec { + type Target = brk_vec::StorableVec; fn deref(&self) -> &Self::Target { &self.vec } } -impl DerefMut for StorableVec { +impl DerefMut for StorableVec { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.vec } } +impl Clone for StorableVec +where + I: StoredIndex, + T: StoredType, +{ + fn clone(&self) -> Self { + Self { + height: self.height, + vec: self.vec.clone(), + } + } +} -pub trait AnyStorableVec: Send + Sync { +pub trait AnyIndexedVec: Send + Sync { fn height(&self) -> brk_core::Result; fn flush(&mut self, height: Height) -> io::Result<()>; } -impl AnyStorableVec for StorableVec +impl AnyIndexedVec for StorableVec where I: StoredIndex, T: StoredType, diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 7dda25568..ab1aedcfd 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -6,7 +6,7 @@ use brk_core::{ P2SHAddressBytes, P2SHindex, P2TRAddressBytes, P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight, }; -use brk_vec::{AnyJsonStorableVec, CACHED_GETS, Version}; +use brk_vec::{AnyStorableVec, Version}; use rayon::prelude::*; use crate::Indexes; @@ -15,53 +15,54 @@ mod base; pub use base::*; -pub struct Vecs { - pub addressindex_to_addresstype: StorableVec, - pub addressindex_to_addresstypeindex: StorableVec, - pub addressindex_to_height: StorableVec, - pub height_to_blockhash: StorableVec, - pub height_to_difficulty: StorableVec, - pub height_to_first_addressindex: StorableVec, - pub height_to_first_emptyindex: StorableVec, - pub height_to_first_multisigindex: StorableVec, - pub height_to_first_opreturnindex: StorableVec, - pub height_to_first_pushonlyindex: StorableVec, - pub height_to_first_txindex: StorableVec, - pub height_to_first_txinindex: StorableVec, - pub height_to_first_txoutindex: StorableVec, - pub height_to_first_unknownindex: StorableVec, - pub height_to_first_p2pk33index: StorableVec, - pub height_to_first_p2pk65index: StorableVec, - pub height_to_first_p2pkhindex: StorableVec, - pub height_to_first_p2shindex: StorableVec, - pub height_to_first_p2trindex: StorableVec, - pub height_to_first_p2wpkhindex: StorableVec, - pub height_to_first_p2wshindex: StorableVec, - pub height_to_size: StorableVec, - pub height_to_timestamp: StorableVec, - pub height_to_weight: StorableVec, - pub p2pk33index_to_p2pk33addressbytes: StorableVec, - pub p2pk65index_to_p2pk65addressbytes: StorableVec, - pub p2pkhindex_to_p2pkhaddressbytes: StorableVec, - pub p2shindex_to_p2shaddressbytes: StorableVec, - pub p2trindex_to_p2traddressbytes: StorableVec, - pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec, - pub p2wshindex_to_p2wshaddressbytes: StorableVec, - pub txindex_to_first_txinindex: StorableVec, - pub txindex_to_first_txoutindex: StorableVec, - pub txindex_to_height: StorableVec, - pub txindex_to_locktime: StorableVec, - pub txindex_to_txid: StorableVec, - pub txindex_to_base_size: StorableVec, - pub txindex_to_total_size: StorableVec, - pub txindex_to_is_explicitly_rbf: StorableVec, - pub txindex_to_txversion: StorableVec, - pub txinindex_to_txoutindex: StorableVec, - pub txoutindex_to_addressindex: StorableVec, - pub txoutindex_to_value: StorableVec, +#[derive(Clone)] +pub struct Vecs { + pub addressindex_to_addresstype: StorableVec, + pub addressindex_to_addresstypeindex: StorableVec, + pub addressindex_to_height: StorableVec, + pub height_to_blockhash: StorableVec, + pub height_to_difficulty: StorableVec, + pub height_to_first_addressindex: StorableVec, + pub height_to_first_emptyindex: StorableVec, + pub height_to_first_multisigindex: StorableVec, + pub height_to_first_opreturnindex: StorableVec, + pub height_to_first_pushonlyindex: StorableVec, + pub height_to_first_txindex: StorableVec, + pub height_to_first_txinindex: StorableVec, + pub height_to_first_txoutindex: StorableVec, + pub height_to_first_unknownindex: StorableVec, + pub height_to_first_p2pk33index: StorableVec, + pub height_to_first_p2pk65index: StorableVec, + pub height_to_first_p2pkhindex: StorableVec, + pub height_to_first_p2shindex: StorableVec, + pub height_to_first_p2trindex: StorableVec, + pub height_to_first_p2wpkhindex: StorableVec, + pub height_to_first_p2wshindex: StorableVec, + pub height_to_size: StorableVec, + pub height_to_timestamp: StorableVec, + pub height_to_weight: StorableVec, + pub p2pk33index_to_p2pk33addressbytes: StorableVec, + pub p2pk65index_to_p2pk65addressbytes: StorableVec, + pub p2pkhindex_to_p2pkhaddressbytes: StorableVec, + pub p2shindex_to_p2shaddressbytes: StorableVec, + pub p2trindex_to_p2traddressbytes: StorableVec, + pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec, + pub p2wshindex_to_p2wshaddressbytes: StorableVec, + pub txindex_to_first_txinindex: StorableVec, + pub txindex_to_first_txoutindex: StorableVec, + pub txindex_to_height: StorableVec, + pub txindex_to_locktime: StorableVec, + pub txindex_to_txid: StorableVec, + pub txindex_to_base_size: StorableVec, + pub txindex_to_total_size: StorableVec, + pub txindex_to_is_explicitly_rbf: StorableVec, + pub txindex_to_txversion: StorableVec, + pub txinindex_to_txoutindex: StorableVec, + pub txoutindex_to_addressindex: StorableVec, + pub txoutindex_to_value: StorableVec, } -impl Vecs { +impl Vecs { pub fn import(path: &Path) -> color_eyre::Result { fs::create_dir_all(path)?; @@ -293,56 +294,6 @@ impl Vecs { Ok(()) } - pub fn as_any_json_vecs(&self) -> Vec<&dyn AnyJsonStorableVec> { - vec![ - &*self.addressindex_to_addresstype as &dyn AnyJsonStorableVec, - &*self.addressindex_to_addresstypeindex, - &*self.addressindex_to_height, - &*self.height_to_blockhash, - &*self.height_to_difficulty, - &*self.height_to_first_addressindex, - &*self.height_to_first_emptyindex, - &*self.height_to_first_multisigindex, - &*self.height_to_first_opreturnindex, - &*self.height_to_first_pushonlyindex, - &*self.height_to_first_txindex, - &*self.height_to_first_txinindex, - &*self.height_to_first_txoutindex, - &*self.height_to_first_unknownindex, - &*self.height_to_first_p2pk33index, - &*self.height_to_first_p2pk65index, - &*self.height_to_first_p2pkhindex, - &*self.height_to_first_p2shindex, - &*self.height_to_first_p2trindex, - &*self.height_to_first_p2wpkhindex, - &*self.height_to_first_p2wshindex, - &*self.height_to_size, - &*self.height_to_timestamp, - &*self.height_to_weight, - &*self.p2pk33index_to_p2pk33addressbytes, - &*self.p2pk65index_to_p2pk65addressbytes, - &*self.p2pkhindex_to_p2pkhaddressbytes, - &*self.p2shindex_to_p2shaddressbytes, - &*self.p2trindex_to_p2traddressbytes, - &*self.p2wpkhindex_to_p2wpkhaddressbytes, - &*self.p2wshindex_to_p2wshaddressbytes, - &*self.txindex_to_first_txinindex, - &*self.txindex_to_first_txoutindex, - &*self.txindex_to_height, - &*self.txindex_to_locktime, - &*self.txindex_to_txid, - &*self.txindex_to_base_size, - &*self.txindex_to_total_size, - &*self.txindex_to_is_explicitly_rbf, - &*self.txindex_to_txversion, - &*self.txinindex_to_txoutindex, - &*self.txoutindex_to_addressindex, - &*self.txoutindex_to_value, - ] - } -} - -impl Vecs { pub fn get_addressbytes( &self, addresstype: Addresstype, @@ -424,9 +375,57 @@ impl Vecs { .unwrap() } - fn as_mut_any_vecs(&mut self) -> Vec<&mut dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { vec![ - &mut self.addressindex_to_addresstype as &mut dyn AnyStorableVec, + &*self.addressindex_to_addresstype as &dyn AnyStorableVec, + &*self.addressindex_to_addresstypeindex, + &*self.addressindex_to_height, + &*self.height_to_blockhash, + &*self.height_to_difficulty, + &*self.height_to_first_addressindex, + &*self.height_to_first_emptyindex, + &*self.height_to_first_multisigindex, + &*self.height_to_first_opreturnindex, + &*self.height_to_first_pushonlyindex, + &*self.height_to_first_txindex, + &*self.height_to_first_txinindex, + &*self.height_to_first_txoutindex, + &*self.height_to_first_unknownindex, + &*self.height_to_first_p2pk33index, + &*self.height_to_first_p2pk65index, + &*self.height_to_first_p2pkhindex, + &*self.height_to_first_p2shindex, + &*self.height_to_first_p2trindex, + &*self.height_to_first_p2wpkhindex, + &*self.height_to_first_p2wshindex, + &*self.height_to_size, + &*self.height_to_timestamp, + &*self.height_to_weight, + &*self.p2pk33index_to_p2pk33addressbytes, + &*self.p2pk65index_to_p2pk65addressbytes, + &*self.p2pkhindex_to_p2pkhaddressbytes, + &*self.p2shindex_to_p2shaddressbytes, + &*self.p2trindex_to_p2traddressbytes, + &*self.p2wpkhindex_to_p2wpkhaddressbytes, + &*self.p2wshindex_to_p2wshaddressbytes, + &*self.txindex_to_first_txinindex, + &*self.txindex_to_first_txoutindex, + &*self.txindex_to_height, + &*self.txindex_to_locktime, + &*self.txindex_to_txid, + &*self.txindex_to_base_size, + &*self.txindex_to_total_size, + &*self.txindex_to_is_explicitly_rbf, + &*self.txindex_to_txversion, + &*self.txinindex_to_txoutindex, + &*self.txoutindex_to_addressindex, + &*self.txoutindex_to_value, + ] + } + + fn as_mut_any_vecs(&mut self) -> Vec<&mut dyn AnyIndexedVec> { + vec![ + &mut self.addressindex_to_addresstype as &mut dyn AnyIndexedVec, &mut self.addressindex_to_addresstypeindex, &mut self.addressindex_to_height, &mut self.height_to_blockhash, diff --git a/crates/brk_logger/src/lib.rs b/crates/brk_logger/src/lib.rs index 9bd9ef90f..634b19dca 100644 --- a/crates/brk_logger/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::{ fmt::Display, diff --git a/crates/brk_parser/src/lib.rs b/crates/brk_parser/src/lib.rs index eda497f84..a9b58cee1 100644 --- a/crates/brk_parser/src/lib.rs +++ b/crates/brk_parser/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::{ cmp::Ordering, diff --git a/crates/brk_query/Cargo.toml b/crates/brk_query/Cargo.toml new file mode 100644 index 000000000..d380e86fd --- /dev/null +++ b/crates/brk_query/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "brk_query" +description = "A library that finds requested datasets" +license.workspace = true +edition.workspace = true +version.workspace = true +repository.workspace = true + +[dependencies] +brk_computer = { workspace = true } +brk_indexer = { workspace = true } +clap = { workspace = true } diff --git a/crates/brk_query/README.md b/crates/brk_query/README.md new file mode 100644 index 000000000..e66becfcd --- /dev/null +++ b/crates/brk_query/README.md @@ -0,0 +1 @@ +# BRK Query diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs new file mode 100644 index 000000000..6c9eec308 --- /dev/null +++ b/crates/brk_query/src/lib.rs @@ -0,0 +1,4 @@ +#![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] diff --git a/crates/brk/src/main.rs b/crates/brk_query/src/main.rs similarity index 100% rename from crates/brk/src/main.rs rename to crates/brk_query/src/main.rs diff --git a/crates/brk_server/Cargo.toml b/crates/brk_server/Cargo.toml index a6c09ba3d..9d42eb6a9 100644 --- a/crates/brk_server/Cargo.toml +++ b/crates/brk_server/Cargo.toml @@ -9,8 +9,10 @@ repository.workspace = true [dependencies] axum = "0.8.1" brk_computer = { workspace = true } +brk_exit = { workspace = true } brk_indexer = { workspace = true } brk_logger = { workspace = true } +brk_parser = { workspace = true } brk_vec = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } diff --git a/crates/brk_server/src/api/vecs/tree.rs b/crates/brk_server/src/api/vecs/tree.rs index 3e08950bc..22c6c5b4d 100644 --- a/crates/brk_server/src/api/vecs/tree.rs +++ b/crates/brk_server/src/api/vecs/tree.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, fs, io}; -use brk_vec::AnyJsonStorableVec; +use brk_vec::AnyStorableVec; use derive_deref::{Deref, DerefMut}; use crate::WEBSITE_DEV_PATH; @@ -12,7 +12,7 @@ pub struct VecIdToIndexToVec(BTreeMap); impl VecIdToIndexToVec { // Not the most performant or type safe but only built once so that's okay - pub fn insert(&mut self, vec: &'static dyn AnyJsonStorableVec) { + pub fn insert(&mut self, vec: &'static dyn AnyStorableVec) { let file_name = vec.file_name(); let split = file_name.split("_to_").collect::>(); if split.len() != 2 { @@ -75,4 +75,4 @@ impl VecIdToIndexToVec { } #[derive(Default, Deref, DerefMut)] -pub struct IndexToVec(BTreeMap); +pub struct IndexToVec(BTreeMap); diff --git a/crates/brk_server/src/lib.rs b/crates/brk_server/src/lib.rs index a40e5492e..2b2cef39c 100644 --- a/crates/brk_server/src/lib.rs +++ b/crates/brk_server/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::time::Instant; @@ -6,7 +9,6 @@ use api::{ApiRoutes, VecIdToIndexToVec}; use axum::{Json, Router, http::StatusCode, routing::get, serve}; use brk_computer::Computer; use brk_indexer::Indexer; -use brk_vec::STATELESS; use color_eyre::owo_colors::OwoColorize; use files::FilesRoutes; use log::{error, info}; @@ -20,22 +22,19 @@ mod traits; #[derive(Clone)] pub struct AppState { vecs: &'static VecIdToIndexToVec, - indexer: &'static Indexer, - computer: &'static Computer, + indexer: &'static Indexer, + computer: &'static Computer, } pub const WEBSITE_DEV_PATH: &str = "../../websites/kibo.money/"; -pub async fn main(indexer: Indexer, computer: Computer) -> color_eyre::Result<()> { +pub async fn main(indexer: Indexer, computer: Computer) -> color_eyre::Result<()> { let indexer = Box::leak(Box::new(indexer)); let computer = Box::leak(Box::new(computer)); let vecs = Box::leak(Box::new(VecIdToIndexToVec::default())); - indexer - .vecs - .as_any_json_vecs() - .into_iter() - .for_each(|vec| vecs.insert(vec)); + indexer.vecs.as_any_vecs().into_iter().for_each(|vec| vecs.insert(vec)); + computer.vecs.as_any_vecs().into_iter().for_each(|vec| vecs.insert(vec)); vecs.generate_dts_file()?; diff --git a/crates/brk_server/src/main.rs b/crates/brk_server/src/main.rs index e21b3a6bf..c00d456fe 100644 --- a/crates/brk_server/src/main.rs +++ b/crates/brk_server/src/main.rs @@ -1,20 +1,62 @@ -use std::path::Path; +use std::{path::Path, thread::sleep, time::Duration}; use brk_computer::Computer; +use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::STATELESS; +use brk_parser::{ + Parser, + rpc::{self, RpcApi}, +}; +use log::info; -#[tokio::main] -pub async fn main() -> color_eyre::Result<()> { +pub fn main() -> color_eyre::Result<()> { color_eyre::install()?; - brk_logger::init(None); + brk_logger::init(Some(Path::new(".log"))); - let path = Path::new("../../_outputs"); - let indexer: Indexer = Indexer::import(&path.join("indexes"))?; - let computer: Computer = Computer::import(&path.join("computed"))?; + let data_dir = Path::new("../../../bitcoin"); + let rpc = Box::leak(Box::new(rpc::Client::new( + "http://localhost:8332", + rpc::Auth::CookieFile(Path::new(data_dir).join(".cookie")), + )?)); + let exit = Exit::new(); - brk_server::main(indexer, computer).await.unwrap(); + let parser = Parser::new(data_dir, rpc); - Ok(()) + let outputs_dir = Path::new("../../_outputs"); + + let mut indexer = Indexer::import(&outputs_dir.join("indexed"))?; + + let mut computer = Computer::import(&outputs_dir.join("computed"))?; + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(async { + let served_indexer = indexer.clone(); + let served_computer = computer.clone(); + + tokio::spawn(async move { + brk_server::main(served_indexer, served_computer).await.unwrap(); + }); + + loop { + let block_count = rpc.get_block_count()?; + + info!("{block_count} blocks found."); + + let starting_indexes = indexer.index(&parser, rpc, &exit)?; + + computer.compute(&mut indexer, starting_indexes, &exit)?; + + info!("Waiting for new blocks..."); + + while block_count == rpc.get_block_count()? { + sleep(Duration::from_secs(1)) + } + } + + #[allow(unreachable_code)] + Ok(()) + }) as color_eyre::Result<()> } diff --git a/crates/brk_vec/Cargo.toml b/crates/brk_vec/Cargo.toml index d84a753ea..890c1f7b2 100644 --- a/crates/brk_vec/Cargo.toml +++ b/crates/brk_vec/Cargo.toml @@ -8,13 +8,10 @@ edition.workspace = true license.workspace = true repository.workspace = true -[features] -json = ["serde", "serde_json"] - [dependencies] brk_exit = { workspace = true } memmap2 = "0.9.5" rayon = { workspace = true } -serde = { workspace = true, optional = true } -serde_json = { workspace = true, optional = true } +serde = { workspace = true } +serde_json = { workspace = true } zerocopy = { workspace = true } diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index 7ef221fa5..4fae959a7 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -1,4 +1,7 @@ #![doc = include_str!("../README.md")] +#![doc = "\n## Example\n\n```rust"] +#![doc = include_str!("main.rs")] +#![doc = "```"] use std::{ cmp::Ordering, @@ -26,23 +29,6 @@ pub use enums::*; pub use structs::*; pub use traits::*; -type Buffer = Vec; - -/// Uses `Mmap` instead of `File` -/// -/// Used in `/indexer` -pub const CACHED_GETS: u8 = 0; - -/// Will use the same `File` for every read, so not thread safe -/// -/// Used in `/computer` -pub const SINGLE_THREAD: u8 = 1; - -/// Will spin up a new `File` for every read -/// -/// Used in `/server` -pub const STATELESS: u8 = 2; - /// /// A very small, fast, efficient and simple storable Vec /// @@ -55,16 +41,15 @@ pub const STATELESS: u8 = 2; /// If you don't call `.flush()` it just acts as a normal Vec /// #[derive(Debug)] -pub struct StorableVec { +pub struct StorableVec { version: Version, pathbuf: PathBuf, file: File, /// **Number of values NOT number of bytes** file_len: usize, file_position: u64, - buf: Buffer, - /// Only for CACHED_GETS - cache: Vec>>, // Boxed Mmap to reduce the size of the Lock (from 24 to 16) + buf: Vec, + mmaps: Vec>>, // Boxed Mmap to reduce the size of the Lock (from 24 to 16) pushed: Vec, // updated: BTreeMap, // inserted: BTreeMap, @@ -76,11 +61,12 @@ pub struct StorableVec { /// In bytes const MAX_PAGE_SIZE: usize = 4 * 4096; -const ONE_MB: usize = 1000 * 1024; +const ONE_MB: usize = 1024 * 1024; // const MAX_CACHE_SIZE: usize = usize::MAX; const MAX_CACHE_SIZE: usize = 100 * ONE_MB; +const FLUSH_EVERY: usize = 10_000; -impl StorableVec +impl StorableVec where I: StoredIndex, T: StoredType, @@ -106,11 +92,9 @@ where pub fn import(path: &Path, version: Version) -> Result { fs::create_dir_all(path)?; - if MODE != STATELESS { - let path = Self::path_version_(path); - version.validate(path.as_ref())?; - version.write(path.as_ref())?; - } + let version_path = Self::path_version_(path); + version.validate(version_path.as_ref())?; + version.write(version_path.as_ref())?; let file = Self::open_file_(&Self::path_vec_(path))?; @@ -121,7 +105,7 @@ where file_len: Self::read_disk_len_(&file)?, file, buf: Self::create_buffer(), - cache: vec![], + mmaps: vec![], pushed: vec![], // updated: BTreeMap::new(), // inserted: BTreeMap::new(), @@ -131,13 +115,13 @@ where // opened_mmaps: AtomicUsize::new(0), }; - slf.reset_disk_related_state()?; + slf.reset_file_metadata()?; Ok(slf) } #[inline] - fn create_buffer() -> Buffer { + fn create_buffer() -> Vec { vec![0; Self::SIZE_OF_T] } @@ -153,9 +137,12 @@ where .open(path) } - fn open_file_at_then_read(&self, index: usize) -> Result { + pub fn open_then_read(&self, index: I) -> Result { + self.open_then_read_(Self::i_to_usize(index)?) + } + fn open_then_read_(&self, index: usize) -> Result { let mut file = self.open_file()?; - Self::seek(&mut file, Self::index_to_byte_index(index))?; + Self::seek_(&mut file, Self::index_to_byte_index(index))?; let mut buf = Self::create_buffer(); Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned()) } @@ -167,35 +154,33 @@ where Ok(Self::byte_index_to_index(file.metadata()?.len() as usize)) } - fn reset_disk_related_state(&mut self) -> io::Result<()> { + fn reset_file_metadata(&mut self) -> io::Result<()> { self.file_len = self.read_disk_len()?; - self.file_position = 0; - self.reset_cache() + self.file_position = self.file.seek(SeekFrom::Start(0))?; + Ok(()) } - fn reset_cache(&mut self) -> io::Result<()> { - match MODE { - CACHED_GETS => { - self.cache.par_iter_mut().for_each(|lock| { - lock.take(); - }); + pub fn reset_mmaps(&mut self) -> io::Result<()> { + self.mmaps.par_iter_mut().for_each(|lock| { + lock.take(); + }); - let len = (self.file_len as f64 / Self::PER_PAGE as f64).ceil() as usize; - let len = Self::CACHE_LENGTH.min(len); + let len = (self.file_len as f64 / Self::PER_PAGE as f64).ceil() as usize; + let len = Self::CACHE_LENGTH.min(len); - if self.cache.len() != len { - self.cache.resize_with(len, Default::default); - // self.cache.shrink_to_fit(); - } - - Ok(()) - } - _ => Ok(()), + if self.mmaps.len() != len { + self.mmaps.resize_with(len, Default::default); } + + Ok(()) } #[inline] - fn seek(file: &mut File, byte_index: u64) -> io::Result { + fn seek(&mut self, byte_index: u64) -> io::Result { + self.file.seek(SeekFrom::Start(byte_index)) + } + #[inline] + fn seek_(file: &mut File, byte_index: u64) -> io::Result { file.seek(SeekFrom::Start(byte_index)) } @@ -206,12 +191,168 @@ where } #[inline] - fn push_(&mut self, value: T) { + pub fn get(&self, index: I) -> Result>> { + self.get_(Self::i_to_usize(index)?) + } + fn get_(&self, index: usize) -> Result>> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed.get(index).map(|v| Value::Ref(v))); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + // if !self.updated.is_empty() { + // if let Some(v) = self.updated.get(&index) { + // return Ok(Some(v)); + // } + // } + + let page_index = index / Self::PER_PAGE; + let last_index = self.file_len - 1; + let max_page_index = last_index / Self::PER_PAGE; + let min_page_index = (max_page_index + 1) - self.mmaps.len(); + + // let min_open_page = self.min.load(AtomicOrdering::SeqCst); + + // if self.min.load(AtomicOrdering::SeqCst) { + // self.min.set(value) + // } + + if !self.mmaps.is_empty() && page_index >= min_page_index { + let mmap = &**self + .mmaps + .get(page_index - min_page_index) + .ok_or(Error::MmapsVecIsTooSmall)? + .get_or_init(|| { + Box::new(unsafe { + memmap2::MmapOptions::new() + .len(Self::PAGE_SIZE) + .offset((page_index * Self::PAGE_SIZE) as u64) + .map(&self.file) + .unwrap() + }) + }); + + let range = Self::index_to_byte_range(index); + let slice = &mmap[range]; + return Ok(Some(Value::Ref(T::try_ref_from_bytes(slice)?))); + } + + Ok(self.open_then_read_(index).map_or(None, |v| Some(Value::Owned(v)))) + } + + #[inline] + pub fn read(&mut self, index: I) -> Result> { + self.read_(Self::i_to_usize(index)?) + } + #[inline] + pub fn read_(&mut self, index: usize) -> Result> { + let byte_index = Self::index_to_byte_index(index); + if self.file_position != byte_index { + self.file_position = self.seek(Self::index_to_byte_index(index))?; + } + match Self::read_exact(&mut self.file, &mut self.buf) { + Ok(value) => { + self.file_position += Self::SIZE_OF_T as u64; + Ok(Some(value)) + } + Err(e) => Err(e), + } + } + + fn read_last(&mut self) -> Result> { + let len = self.len(); + if len == 0 { + return Ok(None); + } + self.read_(len - 1) + } + + pub fn iter(&self, f: F) -> Result<()> + where + F: FnMut((I, &T)) -> Result<()>, + { + self.iter_from(I::default(), f) + } + + pub fn iter_from(&self, mut index: I, mut f: F) -> Result<()> + where + F: FnMut((I, &T)) -> Result<()>, + { + let mut file = self.open_file()?; + + let disk_len = I::from(Self::read_disk_len_(&file)?); + let pushed_len = I::from(self.pushed_len()); + + Self::seek_(&mut file, Self::index_to_byte_index(Self::i_to_usize(index)?))?; + + let mut buf = Self::create_buffer(); + + while index < disk_len { + f((index, Self::read_exact(&mut file, &mut buf)?))?; + index = index + 1; + } + + let disk_len = Self::i_to_usize(disk_len)?; + let mut i = I::default(); + while i < pushed_len { + f((i + disk_len, self.pushed.get(Self::i_to_usize(i)?).as_ref().unwrap()))?; + i = i + 1; + } + + Ok(()) + } + + pub fn collect_range(&self, from: Option, to: Option) -> Result> { + if !self.pushed.is_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let mut file = self.open_file()?; + + let len = Self::read_disk_len_(&file)?; + + let from = from.map_or(0, |from| { + if from >= 0 { + from as usize + } else { + (len as i64 + from) as usize + } + }); + + let to = to.map_or(len, |to| { + if to >= 0 { + to as usize + } else { + (len as i64 + to) as usize + } + }); + + if from >= to { + return Err(Error::RangeFromAfterTo); + } + + Self::seek_(&mut file, Self::index_to_byte_index(from))?; + + let mut buf = Self::create_buffer(); + + Ok((from..to) + .map(|_| Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned()).unwrap()) + .collect::>()) + } + + #[inline] + pub fn push(&mut self, value: T) { self.pushed.push(value) } #[inline] - fn push_if_needed_(&mut self, index: I, value: T) -> Result<()> { + pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { match self.len().cmp(&Self::i_to_usize(index)?) { Ordering::Greater => { // dbg!(len, index, &self.pathbuf); @@ -229,6 +370,27 @@ where } } + #[inline] + fn push_and_flush_if_needed(&mut self, index: I, value: T, exit: &Exit) -> Result<()> { + match self.len().cmp(&Self::i_to_usize(index)?) { + Ordering::Less => { + return Err(Error::IndexTooHigh); + } + ord => { + if ord == Ordering::Greater { + self.safe_truncate_if_needed(index, exit)?; + } + self.pushed.push(value); + } + } + + if self.pushed_len() >= FLUSH_EVERY { + Ok(self.safe_flush(exit)?) + } else { + Ok(()) + } + } + #[inline] pub fn len(&self) -> usize { self.file_len + self.pushed_len() @@ -258,7 +420,7 @@ where self.has(index).map(|b| !b) } - fn _flush(&mut self) -> io::Result<()> { + pub fn flush(&mut self) -> io::Result<()> { if self.pushed.is_empty() { return Ok(()); } @@ -274,16 +436,35 @@ where self.file.write_all(&bytes)?; - self.reset_disk_related_state()?; + self.reset_file_metadata()?; Ok(()) } - fn reset(&mut self) -> Result<()> { + pub fn safe_flush(&mut self, exit: &Exit) -> io::Result<()> { + if exit.triggered() { + return Ok(()); + } + exit.block(); + self.flush()?; + exit.release(); + Ok(()) + } + + pub fn reset_file(&mut self) -> Result<()> { self.truncate_if_needed(I::from(0))?; Ok(()) } + fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> { + let path = self.path_computed_version(); + if version.validate(path.as_ref()).is_err() { + self.reset_file()?; + } + version.write(path.as_ref())?; + Ok(()) + } + pub fn truncate_if_needed(&mut self, index: I) -> Result> { let index = Self::i_to_usize(index)?; @@ -291,14 +472,23 @@ where return Ok(None); } - let value_at_index = self.open_file_at_then_read(index).ok(); + let value_at_index = self.open_then_read_(index).ok(); self.file.set_len(Self::index_to_byte_index(index))?; - self.reset_disk_related_state()?; + self.reset_file_metadata()?; Ok(value_at_index) } + pub fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> { + if exit.triggered() { + return Ok(()); + } + exit.block(); + self.truncate_if_needed(index)?; + exit.release(); + Ok(()) + } #[inline] fn i_to_usize(index: I) -> Result { @@ -357,228 +547,19 @@ where path.join("version") } - pub fn index_type_to_string(&self) -> &str { - std::any::type_name::() - } -} - -impl StorableVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - pub fn get(&self, index: I) -> Result>> { - self.get_(Self::i_to_usize(index)?) - } - fn get_(&self, index: usize) -> Result>> { - match self.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.pushed.get(index).map(|v| Value::Ref(v))); - } - } - Err(Error::IndexTooHigh) => return Ok(None), - Err(Error::IndexTooLow) => {} - Err(error) => return Err(error), - } - - // if !self.updated.is_empty() { - // if let Some(v) = self.updated.get(&index) { - // return Ok(Some(v)); - // } - // } - - let page_index = index / Self::PER_PAGE; - let last_index = self.file_len - 1; - let max_page_index = last_index / Self::PER_PAGE; - let min_page_index = (max_page_index + 1) - self.cache.len(); - - // let min_open_page = self.min.load(AtomicOrdering::SeqCst); - - // if self.min.load(AtomicOrdering::SeqCst) { - // self.min.set(value) - // } - - if page_index >= min_page_index { - let mmap = &**self - .cache - .get(page_index - min_page_index) - .ok_or(Error::MmapsVecIsTooSmall)? - .get_or_init(|| { - Box::new(unsafe { - memmap2::MmapOptions::new() - .len(Self::PAGE_SIZE) - .offset((page_index * Self::PAGE_SIZE) as u64) - .map(&self.file) - .unwrap() - }) - }); - - let range = Self::index_to_byte_range(index); - let slice = &mmap[range]; - return Ok(Some(Value::Ref(T::try_ref_from_bytes(slice)?))); - } - - Ok(Some(Value::Owned(self.open_file_at_then_read(index)?.to_owned()))) - } - - pub fn get_or_default(&self, index: I) -> Result - where - T: Default + Clone, - { - Ok(self.get(index)?.map(|v| (*v).clone()).unwrap_or(Default::default())) - } - - pub fn iter_from(&self, mut index: I, mut f: F) -> Result<()> - where - F: FnMut((I, Value)) -> Result<()>, - { - let disk_len = I::from(Self::read_disk_len_(&self.file)?); - - while index < disk_len { - f((index, self.get(index)?.unwrap()))?; - index = index + 1; - } - - let mut index = I::from(0); - let pushed_len = I::from(self.pushed_len()); - let disk_len = Self::i_to_usize(disk_len)?; - while index < pushed_len { - f(((index + disk_len), self.get(index)?.unwrap()))?; - index = index + 1; - } - - Ok(()) - } - - #[inline] - pub fn push(&mut self, value: T) { - self.push_(value) - } - - #[inline] - pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { - self.push_if_needed_(index, value) - } - - #[inline] - pub fn flush(&mut self) -> io::Result<()> { - self._flush() - } -} - -const FLUSH_EVERY: usize = 10_000; -impl StorableVec -where - I: StoredIndex, - T: StoredType, -{ - pub fn get(&mut self, index: I) -> Result<&T> { - self.get_(Self::i_to_usize(index)?) - } - fn get_(&mut self, index: usize) -> Result<&T> { - let byte_index = Self::index_to_byte_index(index); - if self.file_position != byte_index { - self.file_position = Self::seek(&mut self.file, byte_index)?; - } - let res = Self::read_exact(&mut self.file, &mut self.buf); - if res.is_ok() { - self.file_position += Self::SIZE_OF_T as u64; - } - res - } - - fn last(&mut self) -> Result> { - let len = self.len(); - if len == 0 { - return Ok(None); - } - Ok(self.get_(len - 1).ok()) - } - - // #[inline] - // fn push(&mut self, value: T) { - // self.push_(value) - // } - - #[inline] - fn blocked_push_if_needed(&mut self, index: I, value: T, exit: &Exit) -> Result<()> { - self.push_if_needed_(index, value)?; - - if self.pushed_len() >= FLUSH_EVERY { - Ok(self.blocked_flush(exit)?) - } else { - Ok(()) - } - } - - // pub fn iter(&mut self, f: F) -> Result<()> - // where - // F: FnMut((I, &T)) -> Result<()>, - // { - // self.iter_from(I::default(), f) - // } - - pub fn iter_from(&mut self, mut index: I, mut f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - // let pushed_len = self.pushed_len(); - - // self.seek_if_needed(index)?; - - if !self.pushed.is_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let disk_len = I::from(Self::read_disk_len_(&self.file)?); - - while index < disk_len { - f((index, self.get(index)?))?; - index = index + 1; - } - - // i = 0; - // while i < pushed_len { - // f((I::from(i + disk_len), self.pushed.get(i).as_ref().unwrap()))?; - // i += 1; - // } - - Ok(()) - } - #[inline] fn path_computed_version(&self) -> PathBuf { self.path().join("computed_version") } - // #[inline] - // fn path_computed_version_(path: &Path) -> PathBuf { - // path.join("computed_version") - // } - fn validate_or_reset(&mut self, version: Version) -> Result<()> { - let path = self.path_computed_version(); - if version.validate(path.as_ref()).is_err() { - self.reset()?; - } - version.write(path.as_ref())?; - Ok(()) - } - - fn blocked_flush(&mut self, exit: &Exit) -> io::Result<()> { - if exit.triggered() { - return Ok(()); - } - exit.block(); - self._flush()?; - exit.release(); - Ok(()) + pub fn index_type_to_string(&self) -> &str { + std::any::type_name::() } pub fn compute_transform( &mut self, - other: &mut StorableVec, + max_from: I, + other: &mut StorableVec, t: F, exit: &Exit, ) -> Result<()> @@ -586,78 +567,91 @@ where A: StoredType, F: Fn(&A) -> T, { - self.validate_or_reset(Version::from(0) + self.version + other.version)?; + self.validate_computed_version_or_reset_file(Version::from(0) + self.version + other.version)?; - other.iter_from(I::from(self.len()), |(i, a)| self.blocked_push_if_needed(i, t(a), exit))?; - Ok(self.blocked_flush(exit)?) + let index = max_from.min(I::from(self.len())); + other.iter_from(index, |(i, a)| self.push_and_flush_if_needed(i, t(a), exit))?; + + Ok(self.safe_flush(exit)?) } pub fn compute_inverse_more_to_less( &mut self, - other: &mut StorableVec, + max_from: T, + other: &mut StorableVec, exit: &Exit, ) -> Result<()> where - I: StoredType, + I: StoredType + StoredIndex, T: StoredIndex, { - self.validate_or_reset(Version::from(0) + self.version + other.version)?; + self.validate_computed_version_or_reset_file(Version::from(0) + self.version + other.version)?; - let index = self.last()?.cloned().unwrap_or_default(); - other.iter_from(index, |(v, i)| self.blocked_push_if_needed(*i, v, exit))?; - Ok(self.blocked_flush(exit)?) + let index = max_from.min(self.read_last()?.cloned().unwrap_or_default()); + other.iter_from(index, |(v, i)| self.push_and_flush_if_needed(*i, v, exit))?; + + Ok(self.safe_flush(exit)?) } pub fn compute_inverse_less_to_more( &mut self, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + max_from: T, + first_indexes: &mut StorableVec, + last_indexes: &mut StorableVec, exit: &Exit, ) -> Result<()> where I: StoredType, T: StoredIndex, { - self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?; + self.validate_computed_version_or_reset_file( + Version::from(0) + self.version + first_indexes.version + last_indexes.version, + )?; - first_indexes.iter_from(T::from(self.len()), |(value, first_index)| { + let index = max_from.min(T::from(self.len())); + first_indexes.iter_from(index, |(value, first_index)| { let first_index = Self::i_to_usize(*first_index)?; - let last_index = Self::i_to_usize(*last_indexes.get(value)?)?; - (first_index..last_index).try_for_each(|index| self.blocked_push_if_needed(I::from(index), value, exit)) + let last_index = Self::i_to_usize(*last_indexes.read(value)?.unwrap())?; + (first_index..last_index).try_for_each(|index| self.push_and_flush_if_needed(I::from(index), value, exit)) })?; - Ok(self.blocked_flush(exit)?) + + Ok(self.safe_flush(exit)?) } pub fn compute_last_index_from_first( &mut self, - first_indexes: &mut StorableVec, + max_from: I, + first_indexes: &mut StorableVec, final_len: usize, exit: &Exit, ) -> Result<()> where T: Copy + From + Sub + StoredIndex, { - self.validate_or_reset(Version::from(0) + self.version + first_indexes.version)?; + self.validate_computed_version_or_reset_file(Version::from(0) + self.version + first_indexes.version)?; + let index = max_from.min(I::from(self.len())); let one = T::from(1); let mut prev_index: Option = None; - first_indexes.iter_from(I::from(self.len()), |(i, v)| { + first_indexes.iter_from(index, |(i, v)| { if let Some(prev_index) = prev_index { - self.blocked_push_if_needed(prev_index, *v - one, exit)?; + self.push_and_flush_if_needed(prev_index, *v - one, exit)?; } prev_index.replace(i); Ok(()) })?; if let Some(prev_index) = prev_index { - self.blocked_push_if_needed(prev_index, T::from(final_len) - one, exit)?; + self.push_and_flush_if_needed(prev_index, T::from(final_len) - one, exit)?; } - Ok(self.blocked_flush(exit)?) + + Ok(self.safe_flush(exit)?) } pub fn compute_count_from_indexes( &mut self, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + max_from: I, + first_indexes: &mut StorableVec, + last_indexes: &mut StorableVec, exit: &Exit, ) -> Result<()> where @@ -665,20 +659,25 @@ where T2: StoredType + Copy + Add + Sub + TryInto, >::Error: error::Error + 'static, { - self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?; + self.validate_computed_version_or_reset_file( + Version::from(0) + self.version + first_indexes.version + last_indexes.version, + )?; - first_indexes.iter_from(I::from(self.len()), |(i, first_index)| { - let last_index = last_indexes.get(i)?; + let index = max_from.min(I::from(self.len())); + first_indexes.iter_from(index, |(i, first_index)| { + let last_index = last_indexes.read(i)?.unwrap(); let count = *last_index + 1_usize - *first_index; - self.blocked_push_if_needed(i, count.into(), exit) + self.push_and_flush_if_needed(i, count.into(), exit) })?; - Ok(self.blocked_flush(exit)?) + + Ok(self.safe_flush(exit)?) } pub fn compute_is_first_ordered( &mut self, - self_to_other: &mut StorableVec, - other_to_self: &mut StorableVec, + max_from: I, + self_to_other: &mut StorableVec, + other_to_self: &mut StorableVec, exit: &Exit, ) -> Result<()> where @@ -686,18 +685,23 @@ where T: From, A: StoredIndex + StoredType, { - self.validate_or_reset(Version::from(0) + self.version + self_to_other.version + other_to_self.version)?; + self.validate_computed_version_or_reset_file( + Version::from(0) + self.version + self_to_other.version + other_to_self.version, + )?; - self_to_other.iter_from(I::from(self.len()), |(i, other)| { - self.blocked_push_if_needed(i, T::from(other_to_self.get(*other)? == &i), exit) + let index = max_from.min(I::from(self.len())); + self_to_other.iter_from(index, |(i, other)| { + self.push_and_flush_if_needed(i, T::from(other_to_self.read(*other)?.unwrap() == &i), exit) })?; - Ok(self.blocked_flush(exit)?) + + Ok(self.safe_flush(exit)?) } pub fn compute_sum_from_indexes( &mut self, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + max_from: I, + first_indexes: &mut StorableVec, + last_indexes: &mut StorableVec, exit: &Exit, ) -> Result<()> where @@ -706,70 +710,22 @@ where >::Error: error::Error + 'static, F: Fn(&T2) -> T, { - self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?; + self.validate_computed_version_or_reset_file( + Version::from(0) + self.version + first_indexes.version + last_indexes.version, + )?; - first_indexes.iter_from(I::from(self.len()), |(i, first_index)| { - let last_index = last_indexes.get(i)?; + let index = max_from.min(I::from(self.len())); + first_indexes.iter_from(index, |(index, first_index)| { + let last_index = last_indexes.read(index)?.unwrap(); let count = *last_index + 1_usize - *first_index; - self.blocked_push_if_needed(i, count.into(), exit) + self.push_and_flush_if_needed(index, count.into(), exit) })?; - Ok(self.blocked_flush(exit)?) + + Ok(self.safe_flush(exit)?) } } -impl StorableVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - pub fn get(&self, index: I) -> Result> { - Ok(Some(self.open_file_at_then_read(Self::i_to_usize(index)?)?)) - } - - pub fn collect_range(&self, from: Option, to: Option) -> Result> { - if !self.pushed.is_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let mut file = self.open_file()?; - - let len = Self::read_disk_len_(&file)?; - - let from = from.map_or(0, |from| { - if from >= 0 { - from as usize - } else { - (len as i64 + from) as usize - } - }); - - let to = to.map_or(len, |to| { - if to >= 0 { - to as usize - } else { - (len as i64 + to) as usize - } - }); - - if from >= to { - return Err(Error::RangeFromAfterTo); - } - - Self::seek(&mut file, Self::index_to_byte_index(from))?; - - let mut buf = Self::create_buffer(); - - Ok((from..to) - .map(|_| Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned()).unwrap()) - .collect::>()) - } - - // Add iter iter_from iter_range collect.. - // + add memory cap -} - -impl Clone for StorableVec +impl Clone for StorableVec where I: StoredIndex, T: StoredType, diff --git a/crates/brk_vec/src/main.rs b/crates/brk_vec/src/main.rs index 2807bb612..5083360f9 100644 --- a/crates/brk_vec/src/main.rs +++ b/crates/brk_vec/src/main.rs @@ -1,11 +1,10 @@ use std::path::Path; -use brk_vec::{CACHED_GETS, SINGLE_THREAD, StorableVec, Version}; +use brk_vec::{StorableVec, Version}; fn main() -> Result<(), Box> { { - let mut vec: StorableVec = - StorableVec::forced_import(Path::new("./v"), Version::from(1))?; + let mut vec: StorableVec = StorableVec::forced_import(Path::new("./v"), Version::from(1))?; vec.push(0); vec.push(1); @@ -17,13 +16,12 @@ fn main() -> Result<(), Box> { } { - let mut vec: StorableVec = - StorableVec::forced_import(Path::new("./v"), Version::from(1))?; + let mut vec: StorableVec = StorableVec::forced_import(Path::new("./v"), Version::from(1))?; - dbg!(vec.get(0)?); // 0 - dbg!(vec.get(1)?); // 0 - dbg!(vec.get(2)?); // 0 - dbg!(vec.get(0)?); // 0 + dbg!(vec.read(0)?); // 0 + dbg!(vec.read(1)?); // 0 + dbg!(vec.read(2)?); // 0 + dbg!(vec.read(0)?); // 0 } Ok(()) diff --git a/crates/brk_vec/src/traits/any.rs b/crates/brk_vec/src/traits/any.rs index 9b449bbd0..d8324e30c 100644 --- a/crates/brk_vec/src/traits/any.rs +++ b/crates/brk_vec/src/traits/any.rs @@ -1,6 +1,6 @@ -use std::mem; +use std::io; -use crate::{Result, STATELESS, StorableVec}; +use crate::{Result, StorableVec}; use super::{StoredIndex, StoredType}; @@ -9,10 +9,11 @@ pub trait AnyStorableVec: Send + Sync { fn index_type_to_string(&self) -> &str; fn len(&self) -> usize; fn is_empty(&self) -> bool; - // fn flush(&mut self) -> io::Result<()>; + fn collect_range_values(&self, from: Option, to: Option) -> Result>; + fn flush(&mut self) -> io::Result<()>; } -impl AnyStorableVec for StorableVec +impl AnyStorableVec for StorableVec where I: StoredIndex, T: StoredType, @@ -33,33 +34,15 @@ where self.is_empty() } - // fn flush(&mut self) -> io::Result<()> { - // self.flush() - // } -} + fn flush(&mut self) -> io::Result<()> { + self.flush() + } -#[cfg(feature = "json")] -pub trait AnyJsonStorableVec: AnyStorableVec { - fn collect_range_values(&self, from: Option, to: Option) -> Result>; -} - -#[cfg(feature = "json")] -impl AnyJsonStorableVec for StorableVec -where - I: StoredIndex, - T: StoredType + serde::Serialize, -{ fn collect_range_values(&self, from: Option, to: Option) -> Result> { - if MODE == STATELESS { - Ok( - unsafe { mem::transmute::<&StorableVec, &StorableVec>(self) } - .collect_range(from, to)? - .into_iter() - .map(|v| serde_json::to_value(v).unwrap()) - .collect::>(), - ) - } else { - todo!("todo ?") - } + Ok(self + .collect_range(from, to)? + .into_iter() + .map(|v| serde_json::to_value(v).unwrap()) + .collect::>()) } } diff --git a/crates/brk_vec/src/traits/stored_type.rs b/crates/brk_vec/src/traits/stored_type.rs index f0a7b9e42..2ae350140 100644 --- a/crates/brk_vec/src/traits/stored_type.rs +++ b/crates/brk_vec/src/traits/stored_type.rs @@ -1,13 +1,14 @@ use std::fmt::Debug; +use serde::Serialize; use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; pub trait StoredType where - Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync, + Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize, { } impl StoredType for T where - T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize { }