From 2ad55bf558f003814cb9d3a1ac6f0a333c955908 Mon Sep 17 00:00:00 2001 From: nym21 Date: Tue, 4 Nov 2025 11:43:04 +0100 Subject: [PATCH] global: snapshot --- Cargo.lock | 132 ++++-- Cargo.toml | 15 +- crates/brk_computer/src/indexes.rs | 95 ++-- crates/brk_error/Cargo.toml | 2 +- crates/brk_error/src/lib.rs | 16 +- crates/brk_indexer/Cargo.toml | 3 +- crates/brk_indexer/examples/indexer_read.rs | 2 +- .../examples/indexer_read_speed.rs | 2 +- .../examples/indexer_single_vs_multi.rs | 124 +++--- crates/brk_indexer/src/indexes.rs | 2 +- crates/brk_indexer/src/lib.rs | 13 +- crates/brk_indexer/src/stores_redb.rs | 408 ++++++++++++++++++ crates/brk_indexer/src/stores_v2.rs | 120 +++--- crates/brk_indexer/src/stores_v3.rs | 298 +++++++------ crates/brk_logger/src/lib.rs | 2 +- crates/brk_store/Cargo.toml | 3 +- crates/brk_store/src/any.rs | 2 - crates/brk_store/src/{v2 => fjall_v2}/meta.rs | 0 crates/brk_store/src/{v2 => fjall_v2}/mod.rs | 18 +- crates/brk_store/src/{v3 => fjall_v3}/meta.rs | 22 +- crates/brk_store/src/fjall_v3/mod.rs | 289 +++++++++++++ crates/brk_store/src/lib.rs | 10 +- crates/brk_store/src/redb/meta.rs | 77 ++++ crates/brk_store/src/redb/mod.rs | 232 ++++++++++ crates/brk_store/src/v3/mod.rs | 256 ----------- crates/brk_types/Cargo.toml | 1 + crates/brk_types/src/addressbyteshash.rs | 40 ++ crates/brk_types/src/blockhashprefix.rs | 40 ++ crates/brk_types/src/height.rs | 40 ++ crates/brk_types/src/stored_string.rs | 36 +- crates/brk_types/src/txidprefix.rs | 40 ++ crates/brk_types/src/txindex.rs | 37 +- crates/brk_types/src/typeindex.rs | 34 +- crates/brk_types/src/typeindexandoutpoint.rs | 52 ++- crates/brk_types/src/typeindexandtxindex.rs | 40 ++ crates/brk_types/src/unit.rs | 36 ++ crates/brk_types/src/vout.rs | 34 ++ 37 files changed, 1903 insertions(+), 670 deletions(-) create mode 100644 crates/brk_indexer/src/stores_redb.rs rename crates/brk_store/src/{v2 => fjall_v2}/meta.rs (100%) rename crates/brk_store/src/{v2 => fjall_v2}/mod.rs (94%) rename crates/brk_store/src/{v3 => fjall_v3}/meta.rs (80%) create mode 100644 crates/brk_store/src/fjall_v3/mod.rs create mode 100644 crates/brk_store/src/redb/meta.rs create mode 100644 crates/brk_store/src/redb/mod.rs delete mode 100644 crates/brk_store/src/v3/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e3a412ae2..6e4e54bde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -675,7 +675,8 @@ version = "0.0.111" dependencies = [ "bitcoin", "bitcoincore-rpc", - "brk_fjall", + "brk_fjall 2.11.5", + "brk_fjall 3.0.0-pre.4", "jiff", "minreq", "sonic-rs", @@ -705,13 +706,27 @@ dependencies = [ "byteview 0.6.1", "dashmap", "log", - "lsm-tree", + "lsm-tree 2.10.4", "path-absolutize", "std-semaphore", "tempfile", "xxhash-rust", ] +[[package]] +name = "brk_fjall" +version = "3.0.0-pre.4" +dependencies = [ + "byteorder-lite", + "byteview 0.8.0", + "dashmap", + "log", + "lsm-tree 3.0.0-pre.4", + "std-semaphore", + "tempfile", + "xxhash-rust", +] + [[package]] name = "brk_grouper" version = "0.0.111" @@ -729,7 +744,8 @@ version = "0.0.111" dependencies = [ "bitcoin", "brk_error", - "brk_fjall", + "brk_fjall 2.11.5", + "brk_fjall 3.0.0-pre.4", "brk_grouper", "brk_iterator", "brk_logger", @@ -740,6 +756,7 @@ dependencies = [ "brk_types", "log", "rayon", + "redb", "rustc-hash", "vecdb", ] @@ -1269,13 +1286,15 @@ name = "brk_store" version = "0.0.111" dependencies = [ "brk_error", - "brk_fjall", + "brk_fjall 2.11.5", + "brk_fjall 3.0.0-pre.4", "brk_types", "byteview 0.6.1", "byteview 0.8.0", "candystore", "log", "parking_lot 0.12.5", + "redb", "rustc-hash", ] @@ -1317,12 +1336,13 @@ dependencies = [ "allocative", "bitcoin", "brk_error", - "byteview 0.6.1", + "byteview 0.8.0", "derive_deref", "itoa", "jiff", "num_enum", "rapidhash", + "redb", "ryu", "schemars", "serde", @@ -1389,6 +1409,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "byteorder-lite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" + [[package]] name = "bytes" version = "1.10.1" @@ -2836,6 +2862,26 @@ dependencies = [ "xxhash-rust", ] +[[package]] +name = "lsm-tree" +version = "3.0.0-pre.4" +dependencies = [ + "byteorder-lite", + "byteview 0.8.0", + "crossbeam-skiplist", + "enum_dispatch", + "interval-heap", + "log", + "lz4_flex", + "quick_cache", + "rustc-hash", + "self_cell", + "sfa", + "tempfile", + "varint-rs", + "xxhash-rust", +] + [[package]] name = "lz4_flex" version = "0.11.5" @@ -4060,6 +4106,19 @@ dependencies = [ "rustversion", ] +[[package]] +name = "rawdb" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6601495283897e6193cbd70d65e4e7420995c59c0de3de953f297c2fa8d9dc7" +dependencies = [ + "allocative", + "libc", + "memmap2", + "parking_lot 0.12.5", + "rayon", +] + [[package]] name = "rayon" version = "1.11.0" @@ -4080,6 +4139,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redb" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4296,9 +4364,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" +checksum = "1317c3bf3e7df961da95b0a56a172a02abead31276215a0497241a7624b487ce" dependencies = [ "chrono", "dyn-clone", @@ -4311,9 +4379,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +checksum = "5f760a6150d45dd66ec044983c124595ae76912e77ed0b44124cb3e415cce5d9" dependencies = [ "proc-macro2", "quote", @@ -4380,18 +4448,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" -[[package]] -name = "seqdb" -version = "0.2.17" -dependencies = [ - "allocative", - "libc", - "memmap2", - "parking_lot 0.12.5", - "rayon", - "zerocopy", -] - [[package]] name = "serde" version = "1.0.228" @@ -4502,6 +4558,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sfa" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84349eedbb0664e9febec9c385e8a6644d0496281965ed28b1b20a0108264a9" +dependencies = [ + "byteorder-lite", + "log", + "xxhash-rust", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4594,9 +4661,9 @@ dependencies = [ [[package]] name = "sonic-rs" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22540d56ba14521e4878ad436d498518c59698c39a89d5905c694932f0bf7134" +checksum = "4425ea8d66ec950e0a8f2ef52c766cc3d68d661d9a0845c353c40833179fd866" dependencies = [ "ahash", "bumpalo", @@ -4615,9 +4682,9 @@ dependencies = [ [[package]] name = "sonic-simd" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b421f7b6aa4a5de8f685aaf398dfaa828346ee639d2b1c1061ab43d40baa6223" +checksum = "5707edbfb34a40c9f2a55fa09a49101d9fec4e0cc171ce386086bd9616f34257" dependencies = [ "cfg-if", ] @@ -4905,9 +4972,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.16" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", @@ -5264,17 +5331,18 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" [[package]] name = "vecdb" -version = "0.2.17" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffa6180edb8fb521c9958d4d00a1cca1b5de396ee371f2f8e3ec54c21205eff" dependencies = [ "allocative", "ctrlc", "log", "parking_lot 0.12.5", "pco", - "seqdb", + "rawdb", "serde", "serde_derive", - "serde_json", "sonic-rs", "vecdb_derive", "zerocopy", @@ -5282,7 +5350,9 @@ dependencies = [ [[package]] name = "vecdb_derive" -version = "0.2.17" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dffd7af8e5579122f79e4a2df3f62f5f315ef0d593721f585384b61bc722e2" dependencies = [ "quote", "syn 2.0.108", diff --git a/Cargo.toml b/Cargo.toml index e8999bb98..6c60af4a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,30 +52,31 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" } brk_types = { version = "0.0.111", path = "crates/brk_types" } brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["derive"] } brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" } -byteview = "=0.6.1" -# byteview = "~0.8.0" +# byteview = "=0.6.1" +byteview = "~0.8.0" derive_deref = "1.1.1" fjall2 = { version = "2.11.5", package = "brk_fjall" } # fjall2 = { path = "../fjall2", package = "brk_fjall" } # fjall2 = { version = "2.11.2", package = "fjall" } # fjall3 = { version = "=3.0.0-pre.0", package = "fjall" } -# fjall3 = { path = "../fjall3", package = "fjall" } +fjall3 = { path = "../fjall3", package = "brk_fjall" } # fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "bb15057500dce3115d7644d268b9deeaa895b431", package = "fjall" } jiff = "0.2.15" log = "0.4.28" minreq = { version = "2.14.1", features = ["https", "serde_json"] } parking_lot = "0.12.5" rayon = "1.11.0" +redb = "3.1.0" rustc-hash = "2.1.1" -schemars = "1.0.4" +schemars = "1.0.5" serde = "1.0.228" serde_bytes = "0.11.19" serde_derive = "1.0.228" serde_json = { version = "1.0.145", features = ["float_roundtrip"] } -sonic-rs = "0.5.5" +sonic-rs = "0.5.6" tokio = { version = "1.48.0", features = ["rt-multi-thread"] } -vecdb = { path = "../seqdb/crates/vecdb", features = ["derive"] } -# vecdb = { version = "0.2.17", features = ["derive"] } +# vecdb = { path = "../seqdb/crates/vecdb", features = ["derive"] } +vecdb = { version = "0.3.1", features = ["derive"] } zerocopy = { version = "0.8.27", features = ["derive"] } [workspace.metadata.release] diff --git a/crates/brk_computer/src/indexes.rs b/crates/brk_computer/src/indexes.rs index 0ae11852a..b268c1126 100644 --- a/crates/brk_computer/src/indexes.rs +++ b/crates/brk_computer/src/indexes.rs @@ -605,12 +605,11 @@ impl Vecs { starting_dateindex }; - self.dateindex_to_first_height - .compute_inverse_more_to_less( - starting_indexes.height, - &self.height_to_dateindex, - exit, - )?; + self.dateindex_to_first_height.compute_coarser( + starting_indexes.height, + &self.height_to_dateindex, + exit, + )?; self.dateindex_to_dateindex.compute_from_index( starting_dateindex, @@ -648,8 +647,11 @@ impl Vecs { exit, )?; - self.weekindex_to_first_dateindex - .compute_inverse_more_to_less(starting_dateindex, &self.dateindex_to_weekindex, exit)?; + self.weekindex_to_first_dateindex.compute_coarser( + starting_dateindex, + &self.dateindex_to_weekindex, + exit, + )?; self.weekindex_to_weekindex.compute_from_index( starting_weekindex, @@ -681,12 +683,11 @@ impl Vecs { exit, )?; - self.difficultyepoch_to_first_height - .compute_inverse_more_to_less( - starting_indexes.height, - &self.height_to_difficultyepoch, - exit, - )?; + self.difficultyepoch_to_first_height.compute_coarser( + starting_indexes.height, + &self.height_to_difficultyepoch, + exit, + )?; self.difficultyepoch_to_difficultyepoch.compute_from_index( starting_difficultyepoch, @@ -719,12 +720,11 @@ impl Vecs { exit, )?; - self.monthindex_to_first_dateindex - .compute_inverse_more_to_less( - starting_dateindex, - &self.dateindex_to_monthindex, - exit, - )?; + self.monthindex_to_first_dateindex.compute_coarser( + starting_dateindex, + &self.dateindex_to_monthindex, + exit, + )?; self.monthindex_to_monthindex.compute_from_index( starting_monthindex, @@ -756,12 +756,11 @@ impl Vecs { exit, )?; - self.quarterindex_to_first_monthindex - .compute_inverse_more_to_less( - starting_monthindex, - &self.monthindex_to_quarterindex, - exit, - )?; + self.quarterindex_to_first_monthindex.compute_coarser( + starting_monthindex, + &self.monthindex_to_quarterindex, + exit, + )?; // let quarter_count = self.quarterindex_to_first_monthindex.len(); @@ -795,12 +794,11 @@ impl Vecs { exit, )?; - self.semesterindex_to_first_monthindex - .compute_inverse_more_to_less( - starting_monthindex, - &self.monthindex_to_semesterindex, - exit, - )?; + self.semesterindex_to_first_monthindex.compute_coarser( + starting_monthindex, + &self.monthindex_to_semesterindex, + exit, + )?; // let semester_count = self.semesterindex_to_first_monthindex.len(); @@ -834,12 +832,11 @@ impl Vecs { exit, )?; - self.yearindex_to_first_monthindex - .compute_inverse_more_to_less( - starting_monthindex, - &self.monthindex_to_yearindex, - exit, - )?; + self.yearindex_to_first_monthindex.compute_coarser( + starting_monthindex, + &self.monthindex_to_yearindex, + exit, + )?; self.yearindex_to_yearindex.compute_from_index( starting_yearindex, @@ -870,12 +867,11 @@ impl Vecs { exit, )?; - self.halvingepoch_to_first_height - .compute_inverse_more_to_less( - starting_indexes.height, - &self.height_to_halvingepoch, - exit, - )?; + self.halvingepoch_to_first_height.compute_coarser( + starting_indexes.height, + &self.height_to_halvingepoch, + exit, + )?; self.halvingepoch_to_halvingepoch.compute_from_index( starting_halvingepoch, @@ -899,12 +895,11 @@ impl Vecs { exit, )?; - self.decadeindex_to_first_yearindex - .compute_inverse_more_to_less( - starting_yearindex, - &self.yearindex_to_decadeindex, - exit, - )?; + self.decadeindex_to_first_yearindex.compute_coarser( + starting_yearindex, + &self.yearindex_to_decadeindex, + exit, + )?; self.decadeindex_to_decadeindex.compute_from_index( starting_decadeindex, diff --git a/crates/brk_error/Cargo.toml b/crates/brk_error/Cargo.toml index 5d186f85e..64ff8b853 100644 --- a/crates/brk_error/Cargo.toml +++ b/crates/brk_error/Cargo.toml @@ -13,7 +13,7 @@ build = "build.rs" bitcoin = { workspace = true } bitcoincore-rpc = { workspace = true } fjall2 = { workspace = true } -# fjall3 = { workspace = true } +fjall3 = { workspace = true } jiff = { workspace = true } minreq = { workspace = true } sonic-rs = { workspace = true } diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index e7cc2c0d8..10ad3735e 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -13,7 +13,7 @@ pub enum Error { BitcoinRPC(bitcoincore_rpc::Error), Jiff(jiff::Error), FjallV2(fjall2::Error), - // FjallV3(fjall3::Error), + FjallV3(fjall3::Error), VecDB(vecdb::Error), SeqDB(vecdb::SeqDBError), Minreq(minreq::Error), @@ -134,12 +134,12 @@ impl From for Error { } } -// impl From for Error { -// #[inline] -// fn from(value: fjall3::Error) -> Self { -// Self::FjallV3(value) -// } -// } +impl From for Error { + #[inline] + fn from(value: fjall3::Error) -> Self { + Self::FjallV3(value) + } +} impl From for Error { #[inline] @@ -179,7 +179,7 @@ impl fmt::Display for Error { Error::BitcoinHexToArrayError(error) => Display::fmt(&error, f), Error::BitcoinRPC(error) => Display::fmt(&error, f), Error::FjallV2(error) => Display::fmt(&error, f), - // Error::FjallV3(error) => Display::fmt(&error, f), + Error::FjallV3(error) => Display::fmt(&error, f), Error::IO(error) => Display::fmt(&error, f), Error::Jiff(error) => Display::fmt(&error, f), Error::Minreq(error) => Display::fmt(&error, f), diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index e408a04b6..50604e9e4 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -21,8 +21,9 @@ brk_store = { workspace = true } brk_types = { workspace = true } brk_traversable = { workspace = true } fjall2 = { workspace = true } -# fjall3 = { workspace = true } +fjall3 = { workspace = true } log = { workspace = true } rayon = { workspace = true } +redb = { workspace = true } rustc-hash = { workspace = true } vecdb = { workspace = true } diff --git a/crates/brk_indexer/examples/indexer_read.rs b/crates/brk_indexer/examples/indexer_read.rs index f3cb4d0a0..7241b28cb 100644 --- a/crates/brk_indexer/examples/indexer_read.rs +++ b/crates/brk_indexer/examples/indexer_read.rs @@ -14,7 +14,7 @@ fn main() -> Result<()> { let mut sum = Sats::ZERO; let mut count: usize = 0; - for value in indexer.vecs.txoutindex_to_value.clean_values().unwrap() { + for value in indexer.vecs.txoutindex_to_value.clean_iter().unwrap() { sum += value; count += 1; } diff --git a/crates/brk_indexer/examples/indexer_read_speed.rs b/crates/brk_indexer/examples/indexer_read_speed.rs index 32091ebae..1b8871eca 100644 --- a/crates/brk_indexer/examples/indexer_read_speed.rs +++ b/crates/brk_indexer/examples/indexer_read_speed.rs @@ -8,7 +8,7 @@ fn run_benchmark(indexer: &Indexer) -> (Sats, std::time::Duration, usize) { let mut sum = Sats::ZERO; let mut count = 0; - for value in indexer.vecs.txoutindex_to_value.clean_values().unwrap() { + for value in indexer.vecs.txoutindex_to_value.clean_iter().unwrap() { // for value in indexer.vecs.txoutindex_to_value.values() { sum += value; count += 1; diff --git a/crates/brk_indexer/examples/indexer_single_vs_multi.rs b/crates/brk_indexer/examples/indexer_single_vs_multi.rs index 8594536a8..183992bc8 100644 --- a/crates/brk_indexer/examples/indexer_single_vs_multi.rs +++ b/crates/brk_indexer/examples/indexer_single_vs_multi.rs @@ -1,4 +1,9 @@ -use std::{fs, path::Path, thread, time::Instant}; +use std::{ + fs, + path::Path, + thread, + time::{Duration, Instant}, +}; use brk_error::Result; use brk_indexer::Indexer; @@ -54,7 +59,7 @@ fn main() -> Result<()> { _ => vec![6, 4, 7, 1, 8, 5, 2], }; - let mut run_times = [std::time::Duration::ZERO; 7]; + let mut run_times = [Duration::ZERO; 7]; for &method in &order { match method { @@ -77,7 +82,7 @@ fn main() -> Result<()> { INPUTS_PER_BLOCK, OUTPUT_START_OFFSET, INPUT_START_OFFSET, - ); + )?; run_times[1] = time; } 4 => { @@ -88,7 +93,7 @@ fn main() -> Result<()> { INPUTS_PER_BLOCK, OUTPUT_START_OFFSET, INPUT_START_OFFSET, - ); + )?; run_times[2] = time; } 5 => { @@ -110,7 +115,7 @@ fn main() -> Result<()> { INPUTS_PER_BLOCK, OUTPUT_START_OFFSET, INPUT_START_OFFSET, - ); + )?; run_times[4] = time; } 7 => { @@ -177,7 +182,7 @@ fn main() -> Result<()> { ]; for (name, times) in &methods { - let avg = times.iter().sum::() / times.len() as u32; + let avg = times.iter().sum::() / times.len() as u32; let min = times.iter().min().unwrap(); let max = times.iter().max().unwrap(); @@ -193,7 +198,7 @@ fn main() -> Result<()> { let averages: Vec<_> = methods .iter() .map(|(name, times)| { - let avg = times.iter().sum::() / times.len() as u32; + let avg = times.iter().sum::() / times.len() as u32; (*name, avg) }) .collect(); @@ -221,7 +226,7 @@ fn run_method1( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Duration { let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader(); let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader(); @@ -287,7 +292,7 @@ fn run_method2( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Result { let start_time = Instant::now(); for block_idx in 0..num_blocks { @@ -298,23 +303,23 @@ fn run_method2( let values: Vec<_> = vecs .txoutindex_to_value - .iter_at(block_start) + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) .collect(); let output_types: Vec<_> = vecs .txoutindex_to_outputtype - .iter_at(block_start) + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) .collect(); let typeindexes: Vec<_> = vecs .txoutindex_to_typeindex - .iter_at(block_start) + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) .collect(); let _outputs: Vec<_> = (0..outputs_per_block) @@ -328,9 +333,9 @@ fn run_method2( let outpoints: Vec<_> = vecs .txinindex_to_outpoint - .iter_at(input_block_start) + .iter()? + .skip(input_block_start.to_usize()) .take(inputs_per_block) - .map(|(_, v)| v) .collect(); let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); @@ -360,7 +365,7 @@ fn run_method2( std::hint::black_box(input_sum); } - start_time.elapsed() + Ok(start_time.elapsed()) } fn run_method4( @@ -370,7 +375,7 @@ fn run_method4( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Result { let start_time = Instant::now(); for block_idx in 0..num_blocks { @@ -379,33 +384,40 @@ fn run_method4( (output_start_offset + (block_idx * outputs_per_block)) as u64, ); - let (values, output_types, typeindexes) = thread::scope(|s| { - let h1 = s.spawn(|| { - vecs.txoutindex_to_value - .iter_at(block_start) + let (values, output_types, typeindexes) = thread::scope(|s| -> Result<_> { + let h1 = s.spawn(|| -> Result<_> { + Ok(vecs + .txoutindex_to_value + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) - .collect::>() + .collect::>()) }); - let h2 = s.spawn(|| { - vecs.txoutindex_to_outputtype - .iter_at(block_start) + let h2 = s.spawn(|| -> Result<_> { + Ok(vecs + .txoutindex_to_outputtype + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) - .collect::>() + .collect::>()) }); - let h3 = s.spawn(|| { - vecs.txoutindex_to_typeindex - .iter_at(block_start) + let h3 = s.spawn(|| -> Result<_> { + Ok(vecs + .txoutindex_to_typeindex + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) - .collect::>() + .collect::>()) }); - (h1.join().unwrap(), h2.join().unwrap(), h3.join().unwrap()) - }); + Ok(( + h1.join().unwrap()?, + h2.join().unwrap()?, + h3.join().unwrap()?, + )) + })?; let _outputs: Vec<_> = (0..outputs_per_block) .into_par_iter() @@ -418,9 +430,9 @@ fn run_method4( let outpoints: Vec<_> = vecs .txinindex_to_outpoint - .iter_at(input_block_start) + .iter()? + .skip(input_block_start.to_usize()) .take(inputs_per_block) - .map(|(_, v)| v) .collect(); let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); @@ -450,7 +462,7 @@ fn run_method4( std::hint::black_box(input_sum); } - start_time.elapsed() + Ok(start_time.elapsed()) } fn run_method5( @@ -460,7 +472,7 @@ fn run_method5( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Duration { let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader(); let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader(); @@ -528,7 +540,7 @@ fn run_method6( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Result { let start_time = Instant::now(); for block_idx in 0..num_blocks { @@ -539,23 +551,23 @@ fn run_method6( let values: Vec<_> = vecs .txoutindex_to_value - .iter_at(block_start) + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) .collect(); let output_types: Vec<_> = vecs .txoutindex_to_outputtype - .iter_at(block_start) + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) .collect(); let typeindexes: Vec<_> = vecs .txoutindex_to_typeindex - .iter_at(block_start) + .iter()? + .skip(block_start.to_usize()) .take(outputs_per_block) - .map(|(_, v)| v) .collect(); // Read inputs sequentially @@ -564,9 +576,9 @@ fn run_method6( let outpoints: Vec<_> = vecs .txinindex_to_outpoint - .iter_at(input_block_start) + .iter()? + .skip(input_block_start.to_usize()) .take(inputs_per_block) - .map(|(_, v)| v) .collect(); let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); @@ -608,7 +620,7 @@ fn run_method6( std::hint::black_box(input_sum); } - start_time.elapsed() + Ok(start_time.elapsed()) } fn run_method7( @@ -618,7 +630,7 @@ fn run_method7( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Duration { // Create readers ONCE outside loop let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader(); @@ -683,7 +695,7 @@ fn run_method8( inputs_per_block: usize, output_start_offset: usize, input_start_offset: usize, -) -> std::time::Duration { +) -> Duration { let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader(); let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader(); @@ -758,8 +770,8 @@ fn run_method8( start_time.elapsed() } -fn calculate_stddev(times: &[std::time::Duration]) -> std::time::Duration { - let avg = times.iter().sum::().as_secs_f64() / times.len() as f64; +fn calculate_stddev(times: &[Duration]) -> Duration { + let avg = times.iter().sum::().as_secs_f64() / times.len() as f64; let variance = times .iter() .map(|t| { @@ -768,5 +780,5 @@ fn calculate_stddev(times: &[std::time::Duration]) -> std::time::Duration { }) .sum::() / times.len() as f64; - std::time::Duration::from_secs_f64(variance.sqrt()) + Duration::from_secs_f64(variance.sqrt()) } diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index c76bece15..1bd1b3bb5 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -233,6 +233,6 @@ where } else if h + 1_u32 == starting_height { Some(I::from(index_to_else.len())) } else { - height_to_index.iter().get_inner(starting_height) + height_to_index.iter().get(starting_height) } } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 2b75f2ff6..8926ed03f 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -15,14 +15,16 @@ use brk_types::{ use log::{error, info}; use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; -use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, VecIterator}; +use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, VecIteratorExtended}; mod indexes; -mod stores_v2; +mod stores_redb; +// mod stores_v2; // mod stores_v3; mod vecs; pub use indexes::*; -pub use stores_v2::*; +pub use stores_redb::*; +// pub use stores_v2::*; // pub use stores_v3::*; pub use vecs::*; @@ -80,9 +82,8 @@ impl Indexer { exit: &Exit, check_collisions: bool, ) -> Result { - let (starting_indexes, prev_hash) = if let Some(hash) = - VecIterator::last(self.vecs.height_to_blockhash.iter()).map(|(_, v)| v) - { + let last_blockhash = self.vecs.height_to_blockhash.iter()?.last(); + let (starting_indexes, prev_hash) = if let Some(hash) = last_blockhash { let (height, hash) = client.get_closest_valid_height(hash)?; let starting_indexes = Indexes::from((height.incremented(), &mut self.vecs, &self.stores)); diff --git a/crates/brk_indexer/src/stores_redb.rs b/crates/brk_indexer/src/stores_redb.rs new file mode 100644 index 000000000..14eb26cc0 --- /dev/null +++ b/crates/brk_indexer/src/stores_redb.rs @@ -0,0 +1,408 @@ +use std::{fs, path::Path, sync::Arc}; + +use brk_error::Result; +use brk_grouper::ByAddressType; +use brk_store::{AnyStore, StoreRedb as Store}; +use brk_types::{ + AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, + TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, + Vout, +}; +use rayon::prelude::*; +use redb::Database; +use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended}; + +use crate::Indexes; + +use super::Vecs; + +#[derive(Clone)] +pub struct Stores { + pub database: Arc, + + pub addressbyteshash_to_typeindex: Store, + pub blockhashprefix_to_height: Store, + pub height_to_coinbase_tag: Store, + pub txidprefix_to_txindex: Store, + pub addresstype_to_typeindex_and_txindex: ByAddressType>, + pub addresstype_to_typeindex_and_unspentoutpoint: + ByAddressType>, +} + +impl Stores { + pub fn forced_import(parent: &Path, version: Version) -> Result { + let pathbuf = parent.join("stores"); + let path = pathbuf.as_path(); + + fs::create_dir_all(&pathbuf)?; + + let database = Arc::new(match brk_store::open_redb_database(path) { + Ok(database) => database, + Err(_) => { + fs::remove_dir_all(path)?; + return Self::forced_import(path, version); + } + }); + + let database_ref = &database; + + let create_addressindex_and_txindex_store = |index| { + Store::import( + database_ref, + path, + &format!("a2t{}", index), + version, + Some(false), + ) + }; + + let create_addressindex_and_unspentoutpoint_store = + |index| Store::import(database_ref, path, &format!("a2u{}", index), version, None); + + Ok(Self { + database: database.clone(), + + height_to_coinbase_tag: Store::import(database_ref, path, "h2c", version, None)?, + addressbyteshash_to_typeindex: Store::import(database_ref, path, "a2t", version, None)?, + blockhashprefix_to_height: Store::import(database_ref, path, "b2h", version, None)?, + txidprefix_to_txindex: Store::import(database_ref, path, "t2t", version, None)?, + addresstype_to_typeindex_and_txindex: ByAddressType::new_with_index( + create_addressindex_and_txindex_store, + )?, + addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new_with_index( + create_addressindex_and_unspentoutpoint_store, + )?, + }) + } + + pub fn starting_height(&self) -> Height { + self.iter_any_store() + .map(|store| { + // let height = + store.height().map(Height::incremented).unwrap_or_default() + // dbg!((height, store.name())); + }) + .min() + .unwrap() + } + + pub fn commit(&mut self, height: Height) -> Result<()> { + [ + &mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore, + &mut self.blockhashprefix_to_height, + &mut self.height_to_coinbase_tag, + &mut self.txidprefix_to_txindex, + ] + // .into_iter() // Changed from par_iter_mut() + .into_par_iter() // Changed from par_iter_mut() + .chain( + self.addresstype_to_typeindex_and_txindex + // .iter_mut() + .par_iter_mut() + .map(|s| s as &mut dyn AnyStore), + ) + .chain( + self.addresstype_to_typeindex_and_unspentoutpoint + // .iter_mut() + .par_iter_mut() + .map(|s| s as &mut dyn AnyStore), + ) + .try_for_each(|store| store.commit(height))?; + + Ok(()) + // self.database + // .persist(PersistMode::SyncAll) + // .map_err(|e| e.into()) + } + + fn iter_any_store(&self) -> impl Iterator { + [ + &self.addressbyteshash_to_typeindex as &dyn AnyStore, + &self.blockhashprefix_to_height, + &self.height_to_coinbase_tag, + &self.txidprefix_to_txindex, + ] + .into_iter() + .chain( + self.addresstype_to_typeindex_and_txindex + .iter() + .map(|s| s as &dyn AnyStore), + ) + .chain( + self.addresstype_to_typeindex_and_unspentoutpoint + .iter() + .map(|s| s as &dyn AnyStore), + ) + } + + pub fn rollback_if_needed( + &mut self, + vecs: &mut Vecs, + starting_indexes: &Indexes, + ) -> Result<()> { + if self.addressbyteshash_to_typeindex.is_empty()? + && self.blockhashprefix_to_height.is_empty()? + && self.txidprefix_to_txindex.is_empty()? + && self.height_to_coinbase_tag.is_empty()? + && self + .addresstype_to_typeindex_and_txindex + .iter() + .map(|s| s.is_empty()) + .collect::>>()? + .into_iter() + .all(|empty| empty) + && self + .addresstype_to_typeindex_and_unspentoutpoint + .iter() + .map(|s| s.is_empty()) + .collect::>>()? + .into_iter() + .all(|empty| empty) + { + return Ok(()); + } + + if starting_indexes.height != Height::ZERO { + vecs.height_to_blockhash + .iter()? + .skip(starting_indexes.height.to_usize()) + .map(BlockHashPrefix::from) + .for_each(|prefix| { + self.blockhashprefix_to_height.remove(prefix); + }); + + (starting_indexes.height.to_usize()..vecs.height_to_blockhash.len()) + .map(Height::from) + .for_each(|h| { + self.height_to_coinbase_tag.remove(h); + }); + + if let Ok(mut index) = vecs + .height_to_first_p2pk65addressindex + .one_shot_read(starting_indexes.height) + { + let mut p2pk65addressindex_to_p2pk65bytes_iter = + vecs.p2pk65addressindex_to_p2pk65bytes.iter()?; + + while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2pk33addressindex + .one_shot_read(starting_indexes.height) + { + let mut p2pk33addressindex_to_p2pk33bytes_iter = + vecs.p2pk33addressindex_to_p2pk33bytes.iter()?; + + while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2pkhaddressindex + .one_shot_read(starting_indexes.height) + { + let mut p2pkhaddressindex_to_p2pkhbytes_iter = + vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?; + + while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2shaddressindex + .one_shot_read(starting_indexes.height) + { + let mut p2shaddressindex_to_p2shbytes_iter = + vecs.p2shaddressindex_to_p2shbytes.iter()?; + + while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2traddressindex + .one_shot_read(starting_indexes.height) + { + let mut p2traddressindex_to_p2trbytes_iter = + vecs.p2traddressindex_to_p2trbytes.iter()?; + + while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2wpkhaddressindex + .one_shot_read(starting_indexes.height) + { + let mut p2wpkhaddressindex_to_p2wpkhbytes_iter = + vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?; + + while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2wshaddressindex + .one_shot_read(starting_indexes.height) + { + let mut p2wshaddressindex_to_p2wshbytes_iter = + vecs.p2wshaddressindex_to_p2wshbytes.iter()?; + + while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2aaddressindex + .one_shot_read(starting_indexes.height) + { + let mut p2aaddressindex_to_p2abytes_iter = + vecs.p2aaddressindex_to_p2abytes.iter()?; + + while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from(&bytes); + self.addressbyteshash_to_typeindex.remove(hash); + index.increment(); + } + } + } else { + unreachable!(); + // self.blockhashprefix_to_height.reset()?; + // self.addressbyteshash_to_typeindex.reset()?; + } + + if starting_indexes.txindex != TxIndex::ZERO { + vecs.txindex_to_txid + .iter()? + .enumerate() + .skip(starting_indexes.txindex.to_usize()) + .for_each(|(txindex, txid)| { + let txindex = TxIndex::from(txindex); + + let txidprefix = TxidPrefix::from(&txid); + + // "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599" + let is_not_first_dup = txindex != TxIndex::new(142783) + || txidprefix != TxidPrefix::from([153, 133, 216, 41, 84, 225, 15, 34]); + + // "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468" + let is_not_second_dup = txindex != TxIndex::new(142841) + || txidprefix != TxidPrefix::from([104, 180, 95, 88, 182, 116, 233, 78]); + + if is_not_first_dup && is_not_second_dup { + self.txidprefix_to_txindex.remove(txidprefix); + } + }); + } else { + unreachable!(); + // self.txidprefix_to_txindex.reset()?; + } + + if starting_indexes.txoutindex != TxOutIndex::ZERO { + let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?; + let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; + vecs.txoutindex_to_outputtype + .iter()? + .enumerate() + .skip(starting_indexes.txoutindex.to_usize()) + .zip( + vecs.txoutindex_to_typeindex + .iter()? + .skip(starting_indexes.txoutindex.to_usize()), + ) + .filter(|((_, outputtype), _)| outputtype.is_address()) + .for_each(|((txoutindex, outputtype), typeindex)| { + let txindex = txoutindex_to_txindex_iter.unsafe_get_(txoutindex); + + let vout = Vout::from( + txoutindex.to_usize() + - txindex_to_first_txoutindex_iter + .unsafe_get(txindex) + .to_usize(), + ); + let outpoint = OutPoint::new(txindex, vout); + + self.addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .remove(TypeIndexAndOutPoint::from((typeindex, outpoint))); + }); + + // Add back outputs that were spent after the rollback point + let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; + let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?; + let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?; + vecs.txinindex_to_outpoint + .iter()? + .skip(starting_indexes.txinindex.to_usize()) + .for_each(|outpoint| { + if outpoint.is_coinbase() { + return; + } + + let txindex = outpoint.txindex(); + let vout = outpoint.vout(); + + // Calculate txoutindex from txindex and vout + let txoutindex = txindex_to_first_txoutindex_iter.unsafe_get(txindex) + vout; + + // Only process if this output was created before the rollback point + if txoutindex < starting_indexes.txoutindex { + let outputtype = txoutindex_to_outputtype_iter.unsafe_get(txoutindex); + + if outputtype.is_address() { + let typeindex = txoutindex_to_typeindex_iter.unsafe_get(txoutindex); + + self.addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .insert(TypeIndexAndOutPoint::from((typeindex, outpoint)), Unit); + } + } + }); + } else { + unreachable!(); + // self.addresstype_to_typeindex_and_txindex + // .iter_mut() + // .try_for_each(|s| s.reset())?; + // self.addresstype_to_typeindex_and_unspentoutpoint + // .iter_mut() + // .try_for_each(|s| s.reset())?; + } + + self.commit(starting_indexes.height.decremented().unwrap_or_default())?; + + Ok(()) + } +} diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index 67b4b56b4..f281374ce 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -2,7 +2,7 @@ use std::{fs, path::Path}; use brk_error::Result; use brk_grouper::ByAddressType; -use brk_store::{AnyStore, StoreV2 as Store}; +use brk_store::{AnyStore, StoreFjallV2 as Store}; use brk_types::{ AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, @@ -10,7 +10,7 @@ use brk_types::{ }; use fjall2::{PersistMode, TransactionalKeyspace}; use rayon::prelude::*; -use vecdb::{AnyVec, StoredIndex, VecIterator}; +use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended}; use crate::Indexes; @@ -160,10 +160,11 @@ impl Stores { if starting_indexes.height != Height::ZERO { vecs.height_to_blockhash - .iter_at(starting_indexes.height) - .for_each(|(_, v)| { - let blockhashprefix = BlockHashPrefix::from(v); - self.blockhashprefix_to_height.remove(blockhashprefix); + .iter()? + .skip(starting_indexes.height.to_usize()) + .map(BlockHashPrefix::from) + .for_each(|prefix| { + self.blockhashprefix_to_height.remove(prefix); }); (starting_indexes.height.to_usize()..vecs.height_to_blockhash.len()) @@ -172,13 +173,12 @@ impl Stores { self.height_to_coinbase_tag.remove(h); }); - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2pk65addressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2pk65addressindex_to_p2pk65bytes_iter = - vecs.p2pk65addressindex_to_p2pk65bytes.iter(); + vecs.p2pk65addressindex_to_p2pk65bytes.iter()?; while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -188,13 +188,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2pk33addressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2pk33addressindex_to_p2pk33bytes_iter = - vecs.p2pk33addressindex_to_p2pk33bytes.iter(); + vecs.p2pk33addressindex_to_p2pk33bytes.iter()?; while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -204,13 +203,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2pkhaddressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2pkhaddressindex_to_p2pkhbytes_iter = - vecs.p2pkhaddressindex_to_p2pkhbytes.iter(); + vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?; while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -220,13 +218,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2shaddressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2shaddressindex_to_p2shbytes_iter = - vecs.p2shaddressindex_to_p2shbytes.iter(); + vecs.p2shaddressindex_to_p2shbytes.iter()?; while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -236,13 +233,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2traddressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2traddressindex_to_p2trbytes_iter = - vecs.p2traddressindex_to_p2trbytes.iter(); + vecs.p2traddressindex_to_p2trbytes.iter()?; while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -252,13 +248,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2wpkhaddressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2wpkhaddressindex_to_p2wpkhbytes_iter = - vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter(); + vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?; while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -268,13 +263,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2wshaddressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { let mut p2wshaddressindex_to_p2wshbytes_iter = - vecs.p2wshaddressindex_to_p2wshbytes.iter(); + vecs.p2wshaddressindex_to_p2wshbytes.iter()?; while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -284,12 +278,12 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2aaddressindex - .iter() - .get(starting_indexes.height) + .one_shot_read(starting_indexes.height) { - let mut p2aaddressindex_to_p2abytes_iter = vecs.p2aaddressindex_to_p2abytes.iter(); + let mut p2aaddressindex_to_p2abytes_iter = + vecs.p2aaddressindex_to_p2abytes.iter()?; while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); @@ -306,8 +300,12 @@ impl Stores { if starting_indexes.txindex != TxIndex::ZERO { vecs.txindex_to_txid - .iter_at(starting_indexes.txindex) + .iter()? + .enumerate() + .skip(starting_indexes.txindex.to_usize()) .for_each(|(txindex, txid)| { + let txindex = TxIndex::from(txindex); + let txidprefix = TxidPrefix::from(&txid); // "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599" @@ -328,23 +326,25 @@ impl Stores { } if starting_indexes.txoutindex != TxOutIndex::ZERO { + let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?; + let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; vecs.txoutindex_to_outputtype - .iter_at(starting_indexes.txoutindex) + .iter()? + .enumerate() + .skip(starting_indexes.txoutindex.to_usize()) .zip( vecs.txoutindex_to_typeindex - .iter_at(starting_indexes.txoutindex), + .iter()? + .skip(starting_indexes.txoutindex.to_usize()), ) .filter(|((_, outputtype), _)| outputtype.is_address()) - .for_each(|((txoutindex, outputtype), (_, typeindex))| { - let txindex = vecs.txoutindex_to_txindex.iter().get(txoutindex).unwrap(); + .for_each(|((txoutindex, outputtype), typeindex)| { + let txindex = txoutindex_to_txindex_iter.unsafe_get_(txoutindex); let vout = Vout::from( txoutindex.to_usize() - - vecs - .txindex_to_first_txoutindex - .iter() - .get(txindex) - .unwrap() + - txindex_to_first_txoutindex_iter + .unsafe_get(txindex) .to_usize(), ); let outpoint = OutPoint::new(txindex, vout); @@ -356,9 +356,13 @@ impl Stores { }); // Add back outputs that were spent after the rollback point + let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; + let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?; + let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?; vecs.txinindex_to_outpoint - .iter_at(starting_indexes.txinindex) - .for_each(|(_, outpoint)| { + .iter()? + .skip(starting_indexes.txinindex.to_usize()) + .for_each(|outpoint| { if outpoint.is_coinbase() { return; } @@ -367,24 +371,14 @@ impl Stores { let vout = outpoint.vout(); // Calculate txoutindex from txindex and vout - let txoutindex = vecs - .txindex_to_first_txoutindex - .iter() - .get(txindex) - .unwrap() - + vout; + let txoutindex = txindex_to_first_txoutindex_iter.unsafe_get(txindex) + vout; // Only process if this output was created before the rollback point if txoutindex < starting_indexes.txoutindex { - let outputtype = vecs - .txoutindex_to_outputtype - .iter() - .get(txoutindex) - .unwrap(); + let outputtype = txoutindex_to_outputtype_iter.unsafe_get(txoutindex); if outputtype.is_address() { - let typeindex = - vecs.txoutindex_to_typeindex.iter().get(txoutindex).unwrap(); + let typeindex = txoutindex_to_typeindex_iter.unsafe_get(txoutindex); self.addresstype_to_typeindex_and_unspentoutpoint .get_mut(outputtype) diff --git a/crates/brk_indexer/src/stores_v3.rs b/crates/brk_indexer/src/stores_v3.rs index 144796374..4876dd13d 100644 --- a/crates/brk_indexer/src/stores_v3.rs +++ b/crates/brk_indexer/src/stores_v3.rs @@ -1,15 +1,17 @@ -use std::{borrow::Cow, fs, path::Path}; +use std::{fs, path::Path}; use brk_error::Result; use brk_grouper::ByAddressType; -use brk_store::{AnyStore, StoreV3 as Store}; +use brk_store::{AnyStore, StoreFjallV3 as Store}; use brk_types::{ - AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex, - TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, + AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, + TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, + Vout, }; use fjall3::{PersistMode, TxDatabase}; +use log::info; use rayon::prelude::*; -use vecdb::{AnyVec, StoredIndex, VecIterator}; +use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended}; use crate::Indexes; @@ -35,7 +37,7 @@ impl Stores { fs::create_dir_all(&pathbuf)?; - let database = match brk_store::open_database(path) { + let database = match brk_store::open_fjall2_database(path) { Ok(database) => database, Err(_) => { fs::remove_dir_all(path)?; @@ -45,61 +47,30 @@ impl Stores { let database_ref = &database; - let create_addressindex_and_txindex_store = |cohort| { + let create_addressindex_and_txindex_store = |index| { Store::import( database_ref, path, - &format!("{}addressindex_and_txindex", cohort), + &format!("a2t{}", index), version, Some(false), ) }; - let create_addressindex_and_unspentoutpoint_store = |cohort| { - Store::import( - database_ref, - path, - &format!("{}addressindex_and_unspentoutpoint", cohort), - version, - None, - ) - }; + let create_addressindex_and_unspentoutpoint_store = + |index| Store::import(database_ref, path, &format!("a2u{}", index), version, None); Ok(Self { database: database.clone(), - height_to_coinbase_tag: Store::import( - database_ref, - path, - "height_to_coinbase_tag", - version, - None, - )?, - addressbyteshash_to_typeindex: Store::import( - database_ref, - path, - "addressbyteshash_to_typeindex", - version, - None, - )?, - blockhashprefix_to_height: Store::import( - database_ref, - path, - "blockhashprefix_to_height", - version, - None, - )?, - txidprefix_to_txindex: Store::import( - database_ref, - path, - "txidprefix_to_txindex", - version, - None, - )?, - addresstype_to_typeindex_and_txindex: ByAddressType::new( + height_to_coinbase_tag: Store::import(database_ref, path, "h2c", version, None)?, + addressbyteshash_to_typeindex: Store::import(database_ref, path, "a2t", version, None)?, + blockhashprefix_to_height: Store::import(database_ref, path, "b2h", version, None)?, + txidprefix_to_txindex: Store::import(database_ref, path, "t2t", version, None)?, + addresstype_to_typeindex_and_txindex: ByAddressType::new_with_index( create_addressindex_and_txindex_store, )?, - addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new( + addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new_with_index( create_addressindex_and_unspentoutpoint_store, )?, }) @@ -117,6 +88,12 @@ impl Stores { } pub fn commit(&mut self, height: Height) -> Result<()> { + info!( + "database.write_buffer_size = {}", + self.database.write_buffer_size() + ); + info!("database.journal_count = {}", self.database.journal_count()); + [ &mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore, &mut self.blockhashprefix_to_height, @@ -136,11 +113,15 @@ impl Stores { ) .try_for_each(|store| store.commit(height))?; - Ok(()) + info!( + "database.write_buffer_size = {}", + self.database.write_buffer_size() + ); + info!("database.journal_count = {}", self.database.journal_count()); - // self.database - // .persist(PersistMode::SyncAll) - // .map_err(|e| e.into()) + self.database + .persist(PersistMode::SyncAll) + .map_err(|e| e.into()) } fn iter_any_store(&self) -> impl Iterator { @@ -192,10 +173,11 @@ impl Stores { if starting_indexes.height != Height::ZERO { vecs.height_to_blockhash - .iter_at(starting_indexes.height) - .for_each(|(_, v)| { - let blockhashprefix = BlockHashPrefix::from(v.into_owned()); - self.blockhashprefix_to_height.remove(blockhashprefix); + .iter()? + .skip(starting_indexes.height.to_usize()) + .map(BlockHashPrefix::from) + .for_each(|prefix| { + self.blockhashprefix_to_height.remove(prefix); }); (starting_indexes.height.to_usize()..vecs.height_to_blockhash.len()) @@ -204,19 +186,14 @@ impl Stores { self.height_to_coinbase_tag.remove(h); }); - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2pk65addressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2pk65addressindex_to_p2pk65bytes_iter = - vecs.p2pk65addressindex_to_p2pk65bytes.iter(); + vecs.p2pk65addressindex_to_p2pk65bytes.iter()?; - while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -224,19 +201,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2pk33addressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2pk33addressindex_to_p2pk33bytes_iter = - vecs.p2pk33addressindex_to_p2pk33bytes.iter(); + vecs.p2pk33addressindex_to_p2pk33bytes.iter()?; - while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -244,19 +216,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2pkhaddressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2pkhaddressindex_to_p2pkhbytes_iter = - vecs.p2pkhaddressindex_to_p2pkhbytes.iter(); + vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?; - while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -264,19 +231,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2shaddressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2shaddressindex_to_p2shbytes_iter = - vecs.p2shaddressindex_to_p2shbytes.iter(); + vecs.p2shaddressindex_to_p2shbytes.iter()?; - while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -284,19 +246,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2traddressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2traddressindex_to_p2trbytes_iter = - vecs.p2traddressindex_to_p2trbytes.iter(); + vecs.p2traddressindex_to_p2trbytes.iter()?; - while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -304,19 +261,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2wpkhaddressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2wpkhaddressindex_to_p2wpkhbytes_iter = - vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter(); + vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?; - while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -324,19 +276,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2wshaddressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { let mut p2wshaddressindex_to_p2wshbytes_iter = - vecs.p2wshaddressindex_to_p2wshbytes.iter(); + vecs.p2wshaddressindex_to_p2wshbytes.iter()?; - while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -344,18 +291,14 @@ impl Stores { } } - if let Some(mut index) = vecs + if let Ok(mut index) = vecs .height_to_first_p2aaddressindex - .iter() - .get(starting_indexes.height) - .map(Cow::into_owned) + .one_shot_read(starting_indexes.height) { - let mut p2aaddressindex_to_p2abytes_iter = vecs.p2aaddressindex_to_p2abytes.iter(); + let mut p2aaddressindex_to_p2abytes_iter = + vecs.p2aaddressindex_to_p2abytes.iter()?; - while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter - .get(index) - .map(Cow::into_owned) - { + while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); let hash = AddressBytesHash::from(&bytes); self.addressbyteshash_to_typeindex.remove(hash); @@ -363,15 +306,20 @@ impl Stores { } } } else { - self.blockhashprefix_to_height.reset()?; - self.addressbyteshash_to_typeindex.reset()?; + unreachable!(); + // self.blockhashprefix_to_height.reset()?; + // self.addressbyteshash_to_typeindex.reset()?; } if starting_indexes.txindex != TxIndex::ZERO { vecs.txindex_to_txid - .iter_at(starting_indexes.txindex) + .iter()? + .enumerate() + .skip(starting_indexes.txindex.to_usize()) .for_each(|(txindex, txid)| { - let txidprefix = TxidPrefix::from(&txid.into_owned()); + let txindex = TxIndex::from(txindex); + + let txidprefix = TxidPrefix::from(&txid); // "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599" let is_not_first_dup = txindex != TxIndex::new(142783) @@ -386,32 +334,80 @@ impl Stores { } }); } else { - self.txidprefix_to_txindex.reset()?; + unreachable!(); + // self.txidprefix_to_txindex.reset()?; } if starting_indexes.txoutindex != TxOutIndex::ZERO { - // todo!(); - // let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter(); - // vecs.txoutindex_to_outputtype - // .iter_at(starting_indexes.txoutindex) - // .filter(|(_, outputtype)| outputtype.is_address()) - // .for_each(|(txoutindex, outputtype)| { - // let outputtype = outputtype.into_owned(); + let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?; + let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; + vecs.txoutindex_to_outputtype + .iter()? + .enumerate() + .skip(starting_indexes.txoutindex.to_usize()) + .zip( + vecs.txoutindex_to_typeindex + .iter()? + .skip(starting_indexes.txoutindex.to_usize()), + ) + .filter(|((_, outputtype), _)| outputtype.is_address()) + .for_each(|((txoutindex, outputtype), typeindex)| { + let txindex = txoutindex_to_txindex_iter.unsafe_get_(txoutindex); - // let typeindex = txoutindex_to_typeindex_iter.unwrap_get_inner(txoutindex); + let vout = Vout::from( + txoutindex.to_usize() + - txindex_to_first_txoutindex_iter + .unsafe_get(txindex) + .to_usize(), + ); + let outpoint = OutPoint::new(txindex, vout); - // self.addresstype_to_typeindex_and_unspentoutpoint - // .get_mut(outputtype) - // .unwrap() - // .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex))); - // }); + self.addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .remove(TypeIndexAndOutPoint::from((typeindex, outpoint))); + }); + + // Add back outputs that were spent after the rollback point + let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?; + let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?; + let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?; + vecs.txinindex_to_outpoint + .iter()? + .skip(starting_indexes.txinindex.to_usize()) + .for_each(|outpoint| { + if outpoint.is_coinbase() { + return; + } + + let txindex = outpoint.txindex(); + let vout = outpoint.vout(); + + // Calculate txoutindex from txindex and vout + let txoutindex = txindex_to_first_txoutindex_iter.unsafe_get(txindex) + vout; + + // Only process if this output was created before the rollback point + if txoutindex < starting_indexes.txoutindex { + let outputtype = txoutindex_to_outputtype_iter.unsafe_get(txoutindex); + + if outputtype.is_address() { + let typeindex = txoutindex_to_typeindex_iter.unsafe_get(txoutindex); + + self.addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .insert(TypeIndexAndOutPoint::from((typeindex, outpoint)), Unit); + } + } + }); } else { - self.addresstype_to_typeindex_and_txindex - .iter_mut() - .try_for_each(|s| s.reset())?; - self.addresstype_to_typeindex_and_unspentoutpoint - .iter_mut() - .try_for_each(|s| s.reset())?; + unreachable!(); + // self.addresstype_to_typeindex_and_txindex + // .iter_mut() + // .try_for_each(|s| s.reset())?; + // self.addresstype_to_typeindex_and_unspentoutpoint + // .iter_mut() + // .try_for_each(|s| s.reset())?; } self.commit(starting_indexes.height.decremented().unwrap_or_default())?; diff --git a/crates/brk_logger/src/lib.rs b/crates/brk_logger/src/lib.rs index ba36d4096..d1e38e2db 100644 --- a/crates/brk_logger/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -23,7 +23,7 @@ pub fn init(path: Option<&Path>) -> io::Result<()> { }); Builder::from_env(Env::default().default_filter_or( - "info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm_tree=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off", + "debug,bitcoin=off,bitcoincore-rpc=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off", )) .format(move |buf, record| { let date_time = Timestamp::now() diff --git a/crates/brk_store/Cargo.toml b/crates/brk_store/Cargo.toml index 91beb32ad..558a763f8 100644 --- a/crates/brk_store/Cargo.toml +++ b/crates/brk_store/Cargo.toml @@ -18,7 +18,8 @@ byteview6 = { version = "=0.6.1", package = "byteview" } byteview8 = { version = "~0.8.0", package = "byteview" } candystore = "0.5.5" fjall2 = { workspace = true } -# fjall3 = { workspace = true } +fjall3 = { workspace = true } log = { workspace = true } parking_lot = { workspace = true } +redb = { workspace = true } rustc-hash = { workspace = true } diff --git a/crates/brk_store/src/any.rs b/crates/brk_store/src/any.rs index 0ab35ca3f..46adefe73 100644 --- a/crates/brk_store/src/any.rs +++ b/crates/brk_store/src/any.rs @@ -3,8 +3,6 @@ use brk_types::{Height, Version}; pub trait AnyStore: Send + Sync { fn commit(&mut self, height: Height) -> Result<()>; - fn persist(&self) -> Result<()>; - // fn reset(&mut self) -> Result<()>; fn name(&self) -> &'static str; fn height(&self) -> Option; fn has(&self, height: Height) -> bool; diff --git a/crates/brk_store/src/v2/meta.rs b/crates/brk_store/src/fjall_v2/meta.rs similarity index 100% rename from crates/brk_store/src/v2/meta.rs rename to crates/brk_store/src/fjall_v2/meta.rs diff --git a/crates/brk_store/src/v2/mod.rs b/crates/brk_store/src/fjall_v2/mod.rs similarity index 94% rename from crates/brk_store/src/v2/mod.rs rename to crates/brk_store/src/fjall_v2/mod.rs index 467d9e328..014d6ad65 100644 --- a/crates/brk_store/src/v2/mod.rs +++ b/crates/brk_store/src/fjall_v2/mod.rs @@ -4,8 +4,8 @@ use brk_error::Result; use brk_types::{Height, Version}; use byteview6::ByteView; use fjall2::{ - InnerItem, PartitionCreateOptions, PersistMode, TransactionalKeyspace, - TransactionalPartitionHandle, ValueType, + InnerItem, PartitionCreateOptions, TransactionalKeyspace, TransactionalPartitionHandle, + ValueType, }; use rustc_hash::{FxHashMap, FxHashSet}; @@ -16,7 +16,7 @@ mod meta; use meta::*; #[derive(Clone)] -pub struct StoreV2 { +pub struct StoreFjallV2 { meta: StoreMeta, name: &'static str, keyspace: TransactionalKeyspace, @@ -33,7 +33,7 @@ pub fn open_keyspace(path: &Path) -> fjall2::Result { .open_transactional() } -impl StoreV2 +impl StoreFjallV2 where K: Debug + Clone + From + Ord + Eq + Hash, V: Debug + Clone + From, @@ -163,7 +163,7 @@ where } } -impl AnyStore for StoreV2 +impl AnyStore for StoreFjallV2 where K: Debug + Clone + From + Ord + Eq + Hash, V: Debug + Clone + From, @@ -200,12 +200,6 @@ where Ok(()) } - fn persist(&self) -> Result<()> { - self.keyspace - .persist(PersistMode::SyncAll) - .map_err(|e| e.into()) - } - fn name(&self) -> &'static str { self.name } @@ -227,7 +221,7 @@ where } } -pub enum Item { +enum Item { Value { key: K, value: V }, Tomb(K), } diff --git a/crates/brk_store/src/v3/meta.rs b/crates/brk_store/src/fjall_v3/meta.rs similarity index 80% rename from crates/brk_store/src/v3/meta.rs rename to crates/brk_store/src/fjall_v3/meta.rs index 99a4beed8..2636f2952 100644 --- a/crates/brk_store/src/v3/meta.rs +++ b/crates/brk_store/src/fjall_v3/meta.rs @@ -5,7 +5,7 @@ use std::{ use brk_error::Result; use brk_types::Version; -use fjall3::{PersistMode, TxDatabase, TxKeyspace}; +use fjall3::{TxDatabase, TxKeyspace}; use super::Height; @@ -18,7 +18,7 @@ pub struct StoreMeta { impl StoreMeta { pub fn checked_open( - database: &TxDatabase, + _database: &TxDatabase, path: &Path, version: Version, open_partition_handle: F, @@ -28,18 +28,18 @@ impl StoreMeta { { fs::create_dir_all(path)?; - let mut partition = open_partition_handle()?; + let partition = open_partition_handle()?; if Version::try_from(Self::path_version_(path).as_path()) .is_ok_and(|prev_version| version != prev_version) { todo!(); - fs::remove_dir_all(path)?; - // Doesn't exist - // database.delete_partition(partition)?; - fs::create_dir(path)?; - database.persist(PersistMode::SyncAll)?; - partition = open_partition_handle()?; + // fs::remove_dir_all(path)?; + // // Doesn't exist + // // database.delete_partition(partition)?; + // fs::create_dir(path)?; + // database.persist(PersistMode::SyncAll)?; + // partition = open_partition_handle()?; } let slf = Self { @@ -62,10 +62,6 @@ impl StoreMeta { height.write(&self.path_height()) } - pub fn reset(&mut self) { - self.height.take(); - } - pub fn path(&self) -> &Path { &self.pathbuf } diff --git a/crates/brk_store/src/fjall_v3/mod.rs b/crates/brk_store/src/fjall_v3/mod.rs new file mode 100644 index 000000000..ad9a19aa1 --- /dev/null +++ b/crates/brk_store/src/fjall_v3/mod.rs @@ -0,0 +1,289 @@ +use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, mem, path::Path}; + +use brk_error::Result; +use brk_types::{Height, Version}; +use byteview8::ByteView; +use fjall3::{ + KeyspaceCreateOptions, TxDatabase, TxKeyspace, ValueType, + config::{BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry}, +}; + +mod meta; + +use meta::*; +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::any::AnyStore; + +#[derive(Clone)] +pub struct StoreFjallV3 { + meta: StoreMeta, + name: &'static str, + database: TxDatabase, + keyspace: TxKeyspace, + puts: FxHashMap, + dels: FxHashSet, +} + +const MAJOR_FJALL_VERSION: Version = Version::new(3); + +pub fn open_fjall2_database(path: &Path) -> fjall3::Result { + TxDatabase::builder(path.join("fjall")) + .max_write_buffer_size(32 * 1024 * 1024) + .cache_size(1024 * 1024 * 1024) + .open() +} + +impl StoreFjallV3 +where + K: Debug + Clone + From + Ord + Eq + Hash, + V: Debug + Clone + From, + ByteView: From + From, +{ + fn open_keyspace( + database: &TxDatabase, + name: &str, + bloom_filters: Option, + ) -> Result { + let mut options = KeyspaceCreateOptions::default() + .manual_journal_persist(true) + .max_memtable_size(8 * 1024 * 1024); + + if bloom_filters.is_some_and(|b| !b) { + options = options.filter_policy(FilterPolicy::disabled()); + } else { + options = options.filter_policy(FilterPolicy::all(FilterPolicyEntry::Bloom( + BloomConstructionPolicy::BitsPerKey(5.0), + ))); + } + + database.keyspace(name, options).map_err(|e| e.into()) + } + + pub fn import( + database: &TxDatabase, + path: &Path, + name: &str, + version: Version, + bloom_filters: Option, + ) -> Result { + fs::create_dir_all(path)?; + + let (meta, keyspace) = StoreMeta::checked_open( + database, + &path.join(format!("meta/{name}")), + MAJOR_FJALL_VERSION + version, + || { + Self::open_keyspace(database, name, bloom_filters).inspect_err(|e| { + eprintln!("{e}"); + eprintln!("Delete {path:?} and try again"); + }) + }, + )?; + + Ok(Self { + meta, + name: Box::leak(Box::new(name.to_string())), + database: database.clone(), + keyspace, + puts: FxHashMap::default(), + dels: FxHashSet::default(), + }) + } + + #[inline] + pub fn get<'a>(&'a self, key: &'a K) -> Result>> + where + ByteView: From<&'a K>, + { + if let Some(v) = self.puts.get(key) { + Ok(Some(Cow::Borrowed(v))) + // } else if let Some(slice) = self + // .database + // .read_tx() + // .get(&self.keyspace, ByteView::from(key))? + // { + } else if let Some(slice) = self.keyspace.get(ByteView::from(key))? { + Ok(Some(Cow::Owned(V::from(ByteView::from(slice))))) + } else { + Ok(None) + } + } + + #[inline] + pub fn is_empty(&self) -> Result { + self.database + .read_tx() + .is_empty(&self.keyspace) + .map_err(|e| e.into()) + } + + #[inline] + pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { + if self.needs(height) { + self.insert(key, value); + } + } + + #[inline] + pub fn insert(&mut self, key: K, value: V) { + let _ = self.dels.is_empty() || self.dels.remove(&key); + self.puts.insert(key, value); + } + + #[inline] + pub fn remove(&mut self, key: K) { + // Hot path: key was recently inserted + if self.puts.remove(&key).is_some() { + return; + } + + let newly_inserted = self.dels.insert(key); + debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); + } + + #[inline] + pub fn remove_if_needed(&mut self, key: K, height: Height) { + if self.needs(height) { + self.remove(key) + } + } + + #[inline] + fn has(&self, height: Height) -> bool { + self.meta.has(height) + } + + #[inline] + fn needs(&self, height: Height) -> bool { + self.meta.needs(height) + } +} + +impl AnyStore for StoreFjallV3 +where + K: Debug + Clone + From + Ord + Eq + Hash, + V: Debug + Clone + From, + ByteView: From + From, + Self: Send + Sync, +{ + fn commit(&mut self, height: Height) -> Result<()> { + if self.has(height) { + return Ok(()); + } + + self.meta.export(height)?; + + if self.puts.is_empty() && self.dels.is_empty() { + return Ok(()); + } + + let mut batch = self.database.inner().batch(); + let mut items = mem::take(&mut self.puts) + .into_iter() + .map(|(key, value)| Item::Value { key, value }) + .chain( + mem::take(&mut self.dels) + .into_iter() + .map(|key| Item::Tomb(key)), + ) + .collect::>(); + items.sort_unstable(); + batch.data = items + .into_iter() + .map(|i| i.fjall(&self.keyspace)) + .collect::>(); + batch.commit_keyspace(self.keyspace.inner())?; + + // let mut wtx = self.database.write_tx(); + + // let mut dels = self.dels.drain().collect::>(); + // dels.sort_unstable(); + // dels.into_iter() + // .for_each(|key| wtx.remove(&self.keyspace, ByteView::from(key))); + + // let mut puts = self.puts.drain().collect::>(); + // puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); + // puts.into_iter().for_each(|(key, value)| { + // wtx.insert(&self.keyspace, ByteView::from(key), ByteView::from(value)) + // }); + + // wtx.commit()?; + + Ok(()) + } + + fn name(&self) -> &'static str { + self.name + } + + fn height(&self) -> Option { + self.meta.height() + } + + fn has(&self, height: Height) -> bool { + self.has(height) + } + + fn needs(&self, height: Height) -> bool { + self.needs(height) + } + + fn version(&self) -> Version { + self.meta.version() + } +} + +enum Item { + Value { key: K, value: V }, + Tomb(K), +} + +impl Ord for Item { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.key().cmp(other.key()) + } +} + +impl PartialOrd for Item { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for Item { + fn eq(&self, other: &Self) -> bool { + self.key() == other.key() + } +} + +impl Eq for Item {} + +impl Item { + fn key(&self) -> &K { + match self { + Self::Value { key, .. } | Self::Tomb(key) => key, + } + } + + pub fn fjall(self, keyspace: &TxKeyspace) -> fjall3::Item + where + K: Into, + V: Into, + { + match self { + Item::Value { key, value } => fjall3::Item { + keyspace_id: keyspace.inner().id, + key: key.into().into(), + value: value.into().into(), + value_type: ValueType::Value, + }, + Item::Tomb(key) => fjall3::Item { + keyspace_id: keyspace.inner().id, + key: key.into().into(), + value: [].into(), + value_type: ValueType::WeakTombstone, + }, + } + } +} diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index e3f9977d2..2753d3bf4 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -1,9 +1,11 @@ #![doc = include_str!("../README.md")] mod any; -mod v2; -// mod v3; +mod fjall_v2; +mod fjall_v3; +mod redb; pub use any::*; -pub use v2::*; -// pub use v3::*; +pub use fjall_v2::*; +pub use fjall_v3::*; +pub use redb::*; diff --git a/crates/brk_store/src/redb/meta.rs b/crates/brk_store/src/redb/meta.rs new file mode 100644 index 000000000..52609f2a5 --- /dev/null +++ b/crates/brk_store/src/redb/meta.rs @@ -0,0 +1,77 @@ +use std::{ + fs, io, + path::{Path, PathBuf}, +}; + +use brk_error::Result; +use brk_types::Version; + +use super::Height; + +#[derive(Debug, Clone)] +pub struct StoreMeta { + pathbuf: PathBuf, + version: Version, + height: Option, +} + +impl StoreMeta { + pub fn checked_open(path: &Path, version: Version) -> Result { + fs::create_dir_all(path)?; + + if Version::try_from(Self::path_version_(path).as_path()) + .is_ok_and(|prev_version| version != prev_version) + { + todo!(); + } + + let slf = Self { + pathbuf: path.to_owned(), + version, + height: Height::try_from(Self::path_height_(path).as_path()).ok(), + }; + + slf.version.write(&slf.path_version())?; + + Ok(slf) + } + + pub fn version(&self) -> Version { + self.version + } + + pub fn export(&mut self, height: Height) -> io::Result<()> { + self.height = Some(height); + height.write(&self.path_height()) + } + + pub fn path(&self) -> &Path { + &self.pathbuf + } + + fn path_version(&self) -> PathBuf { + Self::path_version_(&self.pathbuf) + } + fn path_version_(path: &Path) -> PathBuf { + path.join("version") + } + + #[inline] + pub fn height(&self) -> Option { + self.height + } + #[inline] + pub fn needs(&self, height: Height) -> bool { + self.height.is_none_or(|self_height| height > self_height) + } + #[inline] + pub fn has(&self, height: Height) -> bool { + !self.needs(height) + } + fn path_height(&self) -> PathBuf { + Self::path_height_(&self.pathbuf) + } + fn path_height_(path: &Path) -> PathBuf { + path.join("height") + } +} diff --git a/crates/brk_store/src/redb/mod.rs b/crates/brk_store/src/redb/mod.rs new file mode 100644 index 000000000..65552d16f --- /dev/null +++ b/crates/brk_store/src/redb/mod.rs @@ -0,0 +1,232 @@ +use std::{ + borrow::{Borrow, Cow}, + fmt::Debug, + fs, + hash::Hash, + mem::{self, transmute}, + path::Path, + sync::Arc, +}; + +use brk_error::Result; +use brk_types::{Height, Version}; +use parking_lot::RwLock; +use redb::{ + Builder, Database, Durability, Key, ReadOnlyTable, ReadableDatabase, ReadableTableMetadata, + TableDefinition, Value, +}; + +mod meta; + +use meta::*; +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::any::AnyStore; + +#[derive(Clone)] +pub struct StoreRedb +where + K: Key + 'static, + V: Value + 'static, +{ + meta: StoreMeta, + name: &'static str, + db: Arc, + table: Arc>>>, + puts: FxHashMap, + dels: FxHashSet, +} + +const MAJOR_FJALL_VERSION: Version = Version::new(3); + +pub fn open_redb_database(path: &Path) -> redb::Result { + let db = Builder::new() + .set_cache_size(4 * 1024 * 1024 * 1024) + .create(path.join("store.redb")) + .unwrap(); + Ok(db) +} + +impl StoreRedb +where + K: Key + Ord + Eq + Hash + 'static, + V: Value + Clone + 'static, +{ + pub fn import( + db: &Arc, + path: &Path, + name: &str, + version: Version, + _bloom_filters: Option, + ) -> Result { + fs::create_dir_all(path)?; + + let meta = StoreMeta::checked_open( + &path.join(format!("meta/{name}")), + MAJOR_FJALL_VERSION + version, + )?; + + { + let mut wtx = db.begin_write().unwrap(); + wtx.set_durability(Durability::Immediate).unwrap(); + let definition: TableDefinition = TableDefinition::new(name); + let table = wtx.open_table(definition).unwrap(); + drop(table); + wtx.commit().unwrap(); + } + + let definition: TableDefinition = TableDefinition::new(name); + let table = db.begin_read().unwrap().open_table(definition).unwrap(); + + Ok(Self { + db: db.clone(), + meta, + name: Box::leak(Box::new(name.to_string())), + table: Arc::new(RwLock::new(Some(table))), + puts: FxHashMap::default(), + dels: FxHashSet::default(), + }) + } + + // In case my hack doesn't work: + // https://github.com/cberner/redb/issues/869 + #[inline] + pub fn get<'a>(&'a self, key: &'a K) -> Result>> + where + &'a K: Borrow>, + V: From>, + { + if let Some(v) = self.puts.get(key) { + Ok(Some(Cow::Borrowed(v))) + } else if let Some(value) = self.table.read().as_ref().unwrap().get(key).unwrap() { + let selftype: ::SelfType<'static> = unsafe { transmute(value.value()) }; + let owned: V = selftype.into(); + Ok(Some(Cow::Owned(owned))) + } else { + Ok(None) + } + } + + #[inline] + pub fn is_empty(&self) -> Result { + Ok(self.table.read().as_ref().unwrap().len().unwrap() == 0) + } + + #[inline] + pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { + if self.needs(height) { + self.insert(key, value); + } + } + + #[inline] + pub fn insert(&mut self, key: K, value: V) { + let _ = self.dels.is_empty() || self.dels.remove(&key); + self.puts.insert(key, value); + } + + #[inline] + pub fn remove(&mut self, key: K) { + // Hot path: key was recently inserted + if self.puts.remove(&key).is_some() { + return; + } + + let newly_inserted = self.dels.insert(key); + debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); + } + + #[inline] + pub fn remove_if_needed(&mut self, key: K, height: Height) { + if self.needs(height) { + self.remove(key) + } + } + + #[inline] + fn has(&self, height: Height) -> bool { + self.meta.has(height) + } + + #[inline] + fn needs(&self, height: Height) -> bool { + self.meta.needs(height) + } +} + +impl AnyStore for StoreRedb +where + // K: Debug + Clone + From + Ord + Eq + Hash, + // V: Debug + Clone + From, + K: Debug + Clone + Key + Ord + Eq + Hash + 'static + Borrow>, + V: Debug + Clone + Value + 'static + Borrow>, + // ByteView: From + From, + Self: Send + Sync, +{ + fn commit(&mut self, height: Height) -> Result<()> { + if self.has(height) { + return Ok(()); + } + + self.meta.export(height)?; + + if self.puts.is_empty() && self.dels.is_empty() { + return Ok(()); + } + + // let mut _rtx_lock = self._rtx.write(); + // drop(_rtx_lock.take()); + let mut table_lock = self.table.write(); + drop(table_lock.take()); + + let mut wtx = self.db.begin_write().unwrap(); + wtx.set_durability(Durability::Immediate).unwrap(); + + let definition: TableDefinition = TableDefinition::new(self.name); + let mut table = wtx.open_table(definition).unwrap(); + + mem::take(&mut self.puts) + .into_iter() + .for_each(|(key, value)| { + table.insert(key, value).unwrap(); + }); + + mem::take(&mut self.dels).into_iter().for_each(|key| { + table.remove(key).unwrap(); + }); + + drop(table); + + wtx.commit().unwrap(); + + table_lock.replace( + self.db + .begin_read() + .unwrap() + .open_table(definition) + .unwrap(), + ); + + Ok(()) + } + + fn name(&self) -> &'static str { + self.name + } + + fn height(&self) -> Option { + self.meta.height() + } + + fn has(&self, height: Height) -> bool { + self.has(height) + } + + fn needs(&self, height: Height) -> bool { + self.needs(height) + } + + fn version(&self) -> Version { + self.meta.version() + } +} diff --git a/crates/brk_store/src/v3/mod.rs b/crates/brk_store/src/v3/mod.rs deleted file mode 100644 index 1ac25bb1f..000000000 --- a/crates/brk_store/src/v3/mod.rs +++ /dev/null @@ -1,256 +0,0 @@ -use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path, sync::Arc}; - -use brk_error::Result; -use brk_types::{Height, Version}; -use byteview8::ByteView; -use fjall3::{KeyspaceCreateOptions, PersistMode, TxDatabase, TxKeyspace}; - -mod meta; - -use log::info; -use meta::*; -use parking_lot::RwLock; -use rustc_hash::{FxHashMap, FxHashSet}; - -use crate::any::AnyStore; - -#[derive(Clone)] -pub struct StoreV3 { - meta: StoreMeta, - name: &'static str, - database: TxDatabase, - keyspace: Arc>>, - puts: FxHashMap, - dels: FxHashSet, -} - -const MAJOR_FJALL_VERSION: Version = Version::new(3); - -pub fn open_database(path: &Path) -> fjall3::Result { - TxDatabase::builder(path.join("fjall")) - .cache_size(4 * 1024 * 1024 * 1024) - // .max_write_buffer_size(bytes) - .open() -} - -impl StoreV3 -where - K: Debug + Clone + From + Ord + Eq + Hash, - V: Debug + Clone + From, - ByteView: From + From, -{ - fn open_keyspace(database: &TxDatabase, name: &str) -> Result { - database - .keyspace( - name, - KeyspaceCreateOptions::default().max_memtable_size(8 * 1024 * 1024), // .manual_journal_persist(true), - ) - .map_err(|e| e.into()) - } - - pub fn import( - database: &TxDatabase, - path: &Path, - name: &str, - version: Version, - _bloom_filters: Option, - ) -> Result { - fs::create_dir_all(path)?; - - let (meta, keyspace) = StoreMeta::checked_open( - database, - &path.join(format!("meta/{name}")), - MAJOR_FJALL_VERSION + version, - || { - Self::open_keyspace(database, name).inspect_err(|e| { - eprintln!("{e}"); - eprintln!("Delete {path:?} and try again"); - }) - }, - )?; - - Ok(Self { - meta, - name: Box::leak(Box::new(name.to_string())), - database: database.clone(), - keyspace: Arc::new(RwLock::new(Some(keyspace))), - puts: FxHashMap::default(), - dels: FxHashSet::default(), - }) - } - - #[inline] - pub fn get<'a>(&'a self, key: &'a K) -> Result>> - where - ByteView: From<&'a K>, - { - if let Some(v) = self.puts.get(key) { - Ok(Some(Cow::Borrowed(v))) - } else if let Some(slice) = self - .database - .read_tx() - .get(self.keyspace.read().as_ref().unwrap(), ByteView::from(key))? - { - Ok(Some(Cow::Owned(V::from(ByteView::from(slice))))) - } else { - Ok(None) - } - } - - #[inline] - pub fn is_empty(&self) -> Result { - self.database - .read_tx() - .is_empty(self.keyspace.read().as_ref().unwrap()) - .map_err(|e| e.into()) - } - - // pub fn iter(&self) -> impl Iterator { - // let keyspace = self.keyspace.read().as_ref().unwrap(); - - // self.rtx - // .read() - // .as_ref() - // .unwrap() - // .iter(keyspace) - // .map(|res| res.into_inner().unwrap()) - // .map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v)))) - // } - - #[inline] - pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { - if self.needs(height) { - let _ = self.dels.is_empty() || self.dels.remove(&key); - self.puts.insert(key, value); - } - } - - #[inline] - pub fn remove(&mut self, key: K) { - // Hot path: key was recently inserted - if self.puts.remove(&key).is_some() { - return; - } - - let newly_inserted = self.dels.insert(key); - debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path()); - } - - #[inline] - pub fn remove_if_needed(&mut self, key: K, height: Height) { - if self.needs(height) { - self.remove(key) - } - } - - // pub fn retain_or_del(&mut self, retain: F) - // where - // F: Fn(&K, &mut V) -> bool, - // { - // self.puts.retain(|k, v| { - // let ret = retain(k, v); - // if !ret { - // self.dels.insert(k.clone()); - // } - // ret - // }); - // } - - #[inline] - fn has(&self, height: Height) -> bool { - self.meta.has(height) - } - - #[inline] - fn needs(&self, height: Height) -> bool { - self.meta.needs(height) - } -} - -impl AnyStore for StoreV3 -where - K: Debug + Clone + From + Ord + Eq + Hash, - V: Debug + Clone + From, - ByteView: From + From, - Self: Send + Sync, -{ - fn commit(&mut self, height: Height) -> Result<()> { - if self.has(height) { - return Ok(()); - } - - self.meta.export(height)?; - - if self.puts.is_empty() && self.dels.is_empty() { - return Ok(()); - } - - let mut wtx = self.database.write_tx(); - - let keyspace = self.keyspace.read(); - - let partition = keyspace.as_ref().unwrap(); - - let mut dels = self.dels.drain().collect::>(); - dels.sort_unstable(); - dels.into_iter() - .for_each(|key| wtx.remove(partition, ByteView::from(key))); - - let mut puts = self.puts.drain().collect::>(); - puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); - puts.into_iter().for_each(|(key, value)| { - wtx.insert(partition, ByteView::from(key), ByteView::from(value)) - }); - - wtx.commit()?; - - Ok(()) - } - - fn persist(&self) -> Result<()> { - self.database - .persist(PersistMode::SyncAll) - .map_err(|e| e.into()) - } - - fn name(&self) -> &'static str { - self.name - } - - // fn reset(&mut self) -> Result<()> { - // info!("Resetting {}...", self.name); - - // todo!(); - - // let mut opt = self.keyspace.write(); - - // let keyspace = opt.take().unwrap(); - - // // Doesn't exist yet - // // self.database.remove_keyspace(keyspace)?; - - // self.meta.reset(); - - // let keyspace = Self::open_keyspace(&self.database, self.name)?; - - // opt.replace(keyspace); - - // Ok(()) - // } - - fn height(&self) -> Option { - self.meta.height() - } - - fn has(&self, height: Height) -> bool { - self.has(height) - } - - fn needs(&self, height: Height) -> bool { - self.needs(height) - } - - fn version(&self) -> Version { - self.meta.version() - } -} diff --git a/crates/brk_types/Cargo.toml b/crates/brk_types/Cargo.toml index 9890df5f2..c08d44b10 100644 --- a/crates/brk_types/Cargo.toml +++ b/crates/brk_types/Cargo.toml @@ -19,6 +19,7 @@ itoa = "1.0.15" jiff = { workspace = true } num_enum = "0.7.5" rapidhash = "4.1.1" +redb = { workspace = true } ryu = "1.0.20" schemars = { workspace = true } serde = { workspace = true } diff --git a/crates/brk_types/src/addressbyteshash.rs b/crates/brk_types/src/addressbyteshash.rs index 6363f4327..d8e4acec4 100644 --- a/crates/brk_types/src/addressbyteshash.rs +++ b/crates/brk_types/src/addressbyteshash.rs @@ -1,5 +1,8 @@ +use std::{cmp::Ordering, mem}; + use byteview::ByteView; use derive_deref::Deref; +use redb::{Key, TypeName, Value}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use super::AddressBytes; @@ -48,3 +51,40 @@ impl From<&AddressBytesHash> for ByteView { Self::new(value.as_bytes()) } } + +impl Value for AddressBytesHash { + type SelfType<'a> = AddressBytesHash; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> AddressBytesHash + where + Self: 'a, + { + AddressBytesHash(u64::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("AddressBytesHash") + } +} + +impl Key for AddressBytesHash { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} diff --git a/crates/brk_types/src/blockhashprefix.rs b/crates/brk_types/src/blockhashprefix.rs index 583640785..8393243e2 100644 --- a/crates/brk_types/src/blockhashprefix.rs +++ b/crates/brk_types/src/blockhashprefix.rs @@ -1,5 +1,8 @@ +use std::{cmp::Ordering, mem}; + use byteview::ByteView; use derive_deref::Deref; +use redb::{Key, TypeName, Value}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use crate::copy_first_8bytes; @@ -57,3 +60,40 @@ impl From for ByteView { Self::from(&value) } } + +impl Value for BlockHashPrefix { + type SelfType<'a> = BlockHashPrefix; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> BlockHashPrefix + where + Self: 'a, + { + BlockHashPrefix(u64::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("BlockHashPrefix") + } +} + +impl Key for BlockHashPrefix { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} diff --git a/crates/brk_types/src/height.rs b/crates/brk_types/src/height.rs index 4b7c532a2..e34b050d2 100644 --- a/crates/brk_types/src/height.rs +++ b/crates/brk_types/src/height.rs @@ -1,11 +1,14 @@ use std::{ + cmp::Ordering, fmt::Debug, + mem, ops::{Add, AddAssign, Rem}, }; use allocative::Allocative; use byteview::ByteView; use derive_deref::Deref; +use redb::{Key, TypeName, Value}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use vecdb::{CheckedSub, PrintableIndex, Stamp, StoredCompressed}; @@ -280,3 +283,40 @@ impl std::fmt::Display for Height { f.write_str(str) } } + +impl Value for Height { + type SelfType<'a> = Height; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Height + where + Self: 'a, + { + Height(u32::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("Height") + } +} + +impl Key for Height { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} diff --git a/crates/brk_types/src/stored_string.rs b/crates/brk_types/src/stored_string.rs index eabea2b7a..78186334d 100644 --- a/crates/brk_types/src/stored_string.rs +++ b/crates/brk_types/src/stored_string.rs @@ -1,7 +1,8 @@ -use std::borrow::Cow; +use std::{borrow::Cow, str}; use byteview::ByteView; use derive_deref::Deref; +use redb::{TypeName, Value}; use serde::Serialize; use vecdb::PrintableIndex; @@ -67,3 +68,36 @@ impl PrintableIndex for StoredString { &["string"] } } + +impl Value for StoredString { + type SelfType<'a> + = StoredString + where + Self: 'a; + type AsBytes<'a> + = &'a str + where + Self: 'a; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> StoredString + where + Self: 'a, + { + StoredString(str::from_utf8(data).unwrap().to_string()) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a str + where + Self: 'b, + { + value.as_str() + } + + fn type_name() -> TypeName { + TypeName::new("StoredString") + } +} diff --git a/crates/brk_types/src/txidprefix.rs b/crates/brk_types/src/txidprefix.rs index 4029539d0..0a183e2d3 100644 --- a/crates/brk_types/src/txidprefix.rs +++ b/crates/brk_types/src/txidprefix.rs @@ -1,5 +1,8 @@ +use std::{cmp::Ordering, mem}; + use byteview::ByteView; use derive_deref::Deref; +use redb::{Key, TypeName, Value}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use crate::copy_first_8bytes; @@ -64,3 +67,40 @@ impl From<[u8; 8]> for TxidPrefix { Self(u64::from_ne_bytes(value)) } } + +impl Value for TxidPrefix { + type SelfType<'a> = TxidPrefix; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> TxidPrefix + where + Self: 'a, + { + TxidPrefix(u64::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("TxidPrefix") + } +} + +impl Key for TxidPrefix { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} diff --git a/crates/brk_types/src/txindex.rs b/crates/brk_types/src/txindex.rs index a467402c0..420eb8187 100644 --- a/crates/brk_types/src/txindex.rs +++ b/crates/brk_types/src/txindex.rs @@ -1,8 +1,12 @@ -use std::ops::{Add, AddAssign}; +use std::{ + mem, + ops::{Add, AddAssign}, +}; use allocative::Allocative; use byteview::ByteView; use derive_deref::{Deref, DerefMut}; +use redb::{TypeName, Value}; use schemars::JsonSchema; use serde::Serialize; use vecdb::{CheckedSub, PrintableIndex, StoredCompressed}; @@ -166,3 +170,34 @@ impl std::fmt::Display for TxIndex { f.write_str(str) } } + +impl Value for TxIndex { + type SelfType<'a> = TxIndex; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> TxIndex + where + Self: 'a, + { + TxIndex(u32::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("TxIndex") + } +} diff --git a/crates/brk_types/src/typeindex.rs b/crates/brk_types/src/typeindex.rs index 4efd72bdc..85ca6a3ee 100644 --- a/crates/brk_types/src/typeindex.rs +++ b/crates/brk_types/src/typeindex.rs @@ -1,6 +1,7 @@ -use std::ops::Add; +use std::{mem, ops::Add}; use byteview::ByteView; +use redb::{TypeName, Value}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use vecdb::{CheckedSub, StoredCompressed}; @@ -150,3 +151,34 @@ impl std::fmt::Display for TypeIndex { f.write_str(str) } } + +impl Value for TypeIndex { + type SelfType<'a> = TypeIndex; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> TypeIndex + where + Self: 'a, + { + TypeIndex(u32::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("TypeIndex") + } +} diff --git a/crates/brk_types/src/typeindexandoutpoint.rs b/crates/brk_types/src/typeindexandoutpoint.rs index 1232502bd..95366f963 100644 --- a/crates/brk_types/src/typeindexandoutpoint.rs +++ b/crates/brk_types/src/typeindexandoutpoint.rs @@ -1,6 +1,10 @@ -use std::hash::{Hash, Hasher}; +use std::{ + cmp::Ordering, + hash::{Hash, Hasher}, +}; use byteview::ByteView; +use redb::{Key, TypeName, Value}; use serde::Serialize; use zerocopy::IntoBytes; @@ -62,3 +66,49 @@ impl From<&TypeIndexAndOutPoint> for ByteView { ) } } + +impl Value for TypeIndexAndOutPoint { + type SelfType<'a> = TypeIndexAndOutPoint; + type AsBytes<'a> + = [u8; 10] + // 8 bytes (u64) + 2 bytes (u16) + where + Self: 'a; + + fn fixed_width() -> Option { + Some(10) // 8 + 2 + } + + fn from_bytes<'a>(data: &'a [u8]) -> TypeIndexAndOutPoint + where + Self: 'a, + { + TypeIndexAndOutPoint { + typeindexandtxindex: TypeIndexAndTxIndex::from_bytes(&data[0..8]), + vout: Vout::from_bytes(&data[8..10]), + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 10] + where + Self: 'a, + Self: 'b, + { + let mut bytes = [0u8; 10]; + bytes[0..8].copy_from_slice(&::as_bytes( + &value.typeindexandtxindex, + )); + bytes[8..10].copy_from_slice(&::as_bytes(&value.vout)); + bytes + } + + fn type_name() -> TypeName { + TypeName::new("TypeIndexAndOutPoint") + } +} + +impl Key for TypeIndexAndOutPoint { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} diff --git a/crates/brk_types/src/typeindexandtxindex.rs b/crates/brk_types/src/typeindexandtxindex.rs index 7ee0ae1b7..75734b496 100644 --- a/crates/brk_types/src/typeindexandtxindex.rs +++ b/crates/brk_types/src/typeindexandtxindex.rs @@ -1,4 +1,7 @@ +use std::{cmp::Ordering, mem}; + use byteview::ByteView; +use redb::{Key, TypeName, Value}; use serde::Serialize; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; @@ -78,3 +81,40 @@ impl From for u64 { value.0 } } + +impl Value for TypeIndexAndTxIndex { + type SelfType<'a> = TypeIndexAndTxIndex; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> TypeIndexAndTxIndex + where + Self: 'a, + { + TypeIndexAndTxIndex(u64::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("TypeIndexAndTxIndex") + } +} + +impl Key for TypeIndexAndTxIndex { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { + Self::from_bytes(data1).cmp(&Self::from_bytes(data2)) + } +} diff --git a/crates/brk_types/src/unit.rs b/crates/brk_types/src/unit.rs index 5bc52d041..ac5af42fc 100644 --- a/crates/brk_types/src/unit.rs +++ b/crates/brk_types/src/unit.rs @@ -1,4 +1,5 @@ use byteview::ByteView; +use redb::{TypeName, Value}; #[derive(Debug, Clone)] pub struct Unit; @@ -15,3 +16,38 @@ impl From for ByteView { Self::new(&[]) } } + +impl Value for Unit { + type SelfType<'a> + = Unit + where + Self: 'a; + type AsBytes<'a> + = &'a [u8] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(0) + } + + #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)] + fn from_bytes<'a>(_data: &'a [u8]) -> Unit + where + Self: 'a, + { + Unit + } + + #[allow(clippy::ignored_unit_patterns)] + fn as_bytes<'a, 'b: 'a>(_: &'a Self::SelfType<'b>) -> &'a [u8] + where + Self: 'b, + { + &[] + } + + fn type_name() -> TypeName { + TypeName::new("Unit") + } +} diff --git a/crates/brk_types/src/vout.rs b/crates/brk_types/src/vout.rs index 57a413aa7..0e797d877 100644 --- a/crates/brk_types/src/vout.rs +++ b/crates/brk_types/src/vout.rs @@ -1,5 +1,8 @@ +use std::mem; + use allocative::Allocative; use derive_deref::Deref; +use redb::{TypeName, Value}; use schemars::JsonSchema; use serde::Serialize; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; @@ -107,3 +110,34 @@ impl std::fmt::Display for Vout { self.0.fmt(f) } } + +impl Value for Vout { + type SelfType<'a> = Vout; + type AsBytes<'a> + = [u8; mem::size_of::()] + where + Self: 'a; + + fn fixed_width() -> Option { + Some(mem::size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Vout + where + Self: 'a, + { + Vout(u16::from_le_bytes(data.try_into().unwrap())) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::()] + where + Self: 'a, + Self: 'b, + { + value.0.to_le_bytes() + } + + fn type_name() -> TypeName { + TypeName::new("Vout") + } +}