diff --git a/Cargo.lock b/Cargo.lock index 3395a206b..8c2ef27cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -653,6 +653,7 @@ dependencies = [ "log", "pco", "rayon", + "rustc-hash", "serde", "vecdb", "zerocopy", @@ -798,6 +799,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "tokio", "vecdb", ] @@ -1261,6 +1263,7 @@ dependencies = [ "brk_types", "byteview 0.6.1", "byteview 0.8.0", + "candystore", "log", "parking_lot 0.12.5", "rustc-hash", @@ -1350,6 +1353,26 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "bytemuck" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -1374,6 +1397,26 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e6b0e42e210b794e14b152c6fe1a55831e30ef4a0f5dc39d73d714fb5f1906c" +[[package]] +name = "candystore" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "304e90a814421e59b7e986f06d6d9041ecddeb28ce8bcc0a20bbf0091f22d0f0" +dependencies = [ + "anyhow", + "bytemuck", + "crossbeam-channel", + "databuf", + "fslock", + "libc", + "memmap", + "parking_lot 0.12.5", + "rand 0.9.2", + "simd-itertools", + "siphasher", + "uuid", +] + [[package]] name = "castaway" version = "0.2.4" @@ -1761,6 +1804,34 @@ dependencies = [ "parking_lot_core 0.9.12", ] +[[package]] +name = "databuf" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1ad1d99bee317a8dac0b7cd86896c5a5f24307009292985dabbf3e412c8b9d" +dependencies = [ + 
"databuf-derive", +] + +[[package]] +name = "databuf-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04040c9fc8fcb4084222a26c99faf5b3014772a6115e076b7a50fe49bf25d0ea" +dependencies = [ + "databuf_derive_impl", +] + +[[package]] +name = "databuf_derive_impl" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf656eb071fe87d23716f933788a35a8ad6baa6fdbf66a67a261dbd3f9dc81a" +dependencies = [ + "quote2", + "syn 2.0.108", +] + [[package]] name = "deranged" version = "0.5.4" @@ -2067,6 +2138,16 @@ dependencies = [ "libc", ] +[[package]] +name = "fslock" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "futures" version = "0.3.31" @@ -2777,6 +2858,16 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "memmap2" version = "0.9.9" @@ -2827,6 +2918,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "multiversion" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edb7f0ff51249dfda9ab96b5823695e15a052dc15074c9dbf3d118afaf2c201" +dependencies = [ + "multiversion-macros", + "target-features", +] + +[[package]] +name = "multiversion-macros" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b093064383341eb3271f42e381cb8f10a01459478446953953c75d24bd339fc0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", + 
"target-features", +] + [[package]] name = "munge" version = "0.4.7" @@ -3824,6 +3937,23 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "quote2" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "970573b86f7e5795c8c6c50c56ef602368593f0687188da27fd489a59e253630" +dependencies = [ + "proc-macro2", + "quote", + "quote2-macros", +] + +[[package]] +name = "quote2-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b89c37b2d870a28629ad20da669bb0e7d7214878d0d5111b304aa466e1977" + [[package]] name = "r-efi" version = "5.3.0" @@ -4412,6 +4542,15 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "simd-itertools" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a037ed5ba0cb7102a5b720453b642c5b2cf39960edd2ceace91af8ec3743082a" +dependencies = [ + "multiversion", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -4603,6 +4742,12 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "target-features" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5" + [[package]] name = "tempfile" version = "3.23.0" diff --git a/Cargo.toml b/Cargo.toml index a13e46e65..e8999bb98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ brk_error = { version = "0.0.111", path = "crates/brk_error" } brk_fetcher = { version = "0.0.111", path = "crates/brk_fetcher" } brk_grouper = { version = "0.0.111", path = "crates/brk_grouper" } brk_indexer = { version = "0.0.111", path = "crates/brk_indexer" } -brk_query = { version = "0.0.111", path = "crates/brk_query" } +brk_query = { version = "0.0.111", path = "crates/brk_query", features = ["tokio"] } brk_iterator = { version = "0.0.111", 
path = "crates/brk_iterator" } brk_logger = { version = "0.0.111", path = "crates/brk_logger" } brk_mcp = { version = "0.0.111", path = "crates/brk_mcp" } diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index e1fc1d784..e1d8f8b07 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -27,6 +27,7 @@ derive_deref = { workspace = true } log = { workspace = true } pco = "0.4.7" rayon = { workspace = true } +rustc-hash = { workspace = true } serde = { workspace = true } vecdb = { workspace = true } zerocopy = { workspace = true } diff --git a/crates/brk_computer/src/chain.rs b/crates/brk_computer/src/chain.rs index 8a347f66d..a0dc50af5 100644 --- a/crates/brk_computer/src/chain.rs +++ b/crates/brk_computer/src/chain.rs @@ -168,7 +168,7 @@ impl Vecs { |(txinindex, txoutindex)| { let txoutindex = txoutindex.into_owned(); if txoutindex == TxOutIndex::COINBASE { - Sats::ZERO + Sats::MAX } else if let Some((_, value)) = txoutindex_to_value_iter.next_at(txoutindex.to_usize()) { @@ -1456,15 +1456,14 @@ impl Vecs { // }, // )?; - self.txindex_to_fee.compute_transform3( + self.txindex_to_fee.compute_transform2( starting_indexes.txindex, &self.txindex_to_input_value, &self.txindex_to_output_value, - &self.txindex_to_is_coinbase, - |(i, input, output, coinbase, ..)| { + |(i, input, output, ..)| { ( i, - if coinbase.is_true() { + if input.is_max() { Sats::ZERO } else { input.checked_sub(output).unwrap() diff --git a/crates/brk_computer/src/indexes.rs b/crates/brk_computer/src/indexes.rs index 9e5da70bb..b962eb18d 100644 --- a/crates/brk_computer/src/indexes.rs +++ b/crates/brk_computer/src/indexes.rs @@ -120,6 +120,9 @@ impl Vecs { .next_at(index.to_usize()) .map(|(_, outpoint)| { let outpoint = outpoint.into_owned(); + if outpoint.is_coinbase() { + return TxOutIndex::COINBASE; + } txindex_to_first_txoutindex_iter .next_at(outpoint.txindex().to_usize()) .unwrap() diff --git a/crates/brk_computer/src/lib.rs 
b/crates/brk_computer/src/lib.rs index b0d4ed0cf..21d30ef0e 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -59,12 +59,20 @@ impl Computer { let computed_path = outputs_path.join("computed"); - let (indexes, fetched, blks) = thread::scope(|s| -> Result<_> { - let fetched_handle = fetcher.map(|fetcher| { - s.spawn(move || fetched::Vecs::forced_import(outputs_path, fetcher, VERSION)) - }); + const STACK_SIZE: usize = 512 * 1024 * 1024; + let big_thread = || thread::Builder::new().stack_size(STACK_SIZE); - let blks_handle = s.spawn(|| blks::Vecs::forced_import(&computed_path, VERSION)); + let (indexes, fetched, blks) = thread::scope(|s| -> Result<_> { + let fetched_handle = fetcher + .map(|fetcher| { + big_thread().spawn_scoped(s, move || { + fetched::Vecs::forced_import(outputs_path, fetcher, VERSION) + }) + }) + .transpose()?; + + let blks_handle = big_thread() + .spawn_scoped(s, || blks::Vecs::forced_import(&computed_path, VERSION))?; let indexes = indexes::Vecs::forced_import(&computed_path, VERSION, indexer)?; let fetched = fetched_handle.map(|h| h.join().unwrap()).transpose()?; @@ -74,11 +82,13 @@ impl Computer { })?; let (price, constants, market) = thread::scope(|s| -> Result<_> { - let constants_handle = - s.spawn(|| constants::Vecs::forced_import(&computed_path, VERSION, &indexes)); + let constants_handle = big_thread().spawn_scoped(s, || { + constants::Vecs::forced_import(&computed_path, VERSION, &indexes) + })?; - let market_handle = - s.spawn(|| market::Vecs::forced_import(&computed_path, VERSION, &indexes)); + let market_handle = big_thread().spawn_scoped(s, || { + market::Vecs::forced_import(&computed_path, VERSION, &indexes) + })?; let price = fetched .is_some() @@ -91,7 +101,7 @@ impl Computer { })?; let (chain, pools, cointime) = thread::scope(|s| -> Result<_> { - let chain_handle = s.spawn(|| { + let chain_handle = big_thread().spawn_scoped(s, || { chain::Vecs::forced_import( &computed_path, VERSION, @@ -99,11 
+109,11 @@ impl Computer { &indexes, price.as_ref(), ) - }); + })?; - let pools_handle = s.spawn(|| { + let pools_handle = big_thread().spawn_scoped(s, || { pools::Vecs::forced_import(&computed_path, VERSION, &indexes, price.as_ref()) - }); + })?; let cointime = cointime::Vecs::forced_import(&computed_path, VERSION, &indexes, price.as_ref())?; diff --git a/crates/brk_computer/src/stateful/addresstype/mod.rs b/crates/brk_computer/src/stateful/addresstype/mod.rs index ddb86896b..8b2fc4845 100644 --- a/crates/brk_computer/src/stateful/addresstype/mod.rs +++ b/crates/brk_computer/src/stateful/addresstype/mod.rs @@ -2,12 +2,12 @@ mod addresscount; mod height_to_addresscount; mod height_to_vec; mod indexes_to_addresscount; -mod typeindex_tree; +mod typeindex_map; mod vec; pub use addresscount::*; pub use height_to_addresscount::*; pub use height_to_vec::*; pub use indexes_to_addresscount::*; -pub use typeindex_tree::*; +pub use typeindex_map::*; pub use vec::*; diff --git a/crates/brk_computer/src/stateful/addresstype/typeindex_map.rs b/crates/brk_computer/src/stateful/addresstype/typeindex_map.rs new file mode 100644 index 000000000..5a2f382a3 --- /dev/null +++ b/crates/brk_computer/src/stateful/addresstype/typeindex_map.rs @@ -0,0 +1,68 @@ +use std::mem; + +use brk_grouper::ByAddressType; +use brk_types::{OutputType, TypeIndex}; +use derive_deref::{Deref, DerefMut}; +use rustc_hash::FxHashMap; + +#[derive(Debug, Deref, DerefMut)] +pub struct AddressTypeToTypeIndexMap(ByAddressType>); + +impl AddressTypeToTypeIndexMap { + pub fn merge(mut self, mut other: Self) -> Self { + Self::merge_(&mut self.p2pk65, &mut other.p2pk65); + Self::merge_(&mut self.p2pk33, &mut other.p2pk33); + Self::merge_(&mut self.p2pkh, &mut other.p2pkh); + Self::merge_(&mut self.p2sh, &mut other.p2sh); + Self::merge_(&mut self.p2wpkh, &mut other.p2wpkh); + Self::merge_(&mut self.p2wsh, &mut other.p2wsh); + Self::merge_(&mut self.p2tr, &mut other.p2tr); + Self::merge_(&mut self.p2a, &mut other.p2a); 
+ self + } + + fn merge_(own: &mut FxHashMap, other: &mut FxHashMap) { + if own.len() < other.len() { + mem::swap(own, other); + } + own.extend(other.drain()); + } + + // pub fn get_for_type(&self, address_type: OutputType, typeindex: &TypeIndex) -> Option<&T> { + // self.get(address_type).unwrap().get(typeindex) + // } + + pub fn insert_for_type(&mut self, address_type: OutputType, typeindex: TypeIndex, value: T) { + self.get_mut(address_type).unwrap().insert(typeindex, value); + } + + pub fn remove_for_type(&mut self, address_type: OutputType, typeindex: &TypeIndex) -> T { + self.get_mut(address_type) + .unwrap() + .remove(typeindex) + .unwrap() + } + + pub fn into_sorted_iter(self) -> impl Iterator)> { + self.0.into_iter_typed().map(|(output_type, map)| { + let mut sorted: Vec<_> = map.into_iter().collect(); + sorted.sort_unstable_by_key(|(typeindex, _)| *typeindex); + (output_type, sorted) + }) + } +} + +impl Default for AddressTypeToTypeIndexMap { + fn default() -> Self { + Self(ByAddressType { + p2pk65: FxHashMap::default(), + p2pk33: FxHashMap::default(), + p2pkh: FxHashMap::default(), + p2sh: FxHashMap::default(), + p2wpkh: FxHashMap::default(), + p2wsh: FxHashMap::default(), + p2tr: FxHashMap::default(), + p2a: FxHashMap::default(), + }) + } +} diff --git a/crates/brk_computer/src/stateful/addresstype/typeindex_tree.rs b/crates/brk_computer/src/stateful/addresstype/typeindex_tree.rs deleted file mode 100644 index f0021ccf8..000000000 --- a/crates/brk_computer/src/stateful/addresstype/typeindex_tree.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::{collections::BTreeMap, mem}; - -use brk_grouper::ByAddressType; -use brk_types::TypeIndex; -use derive_deref::{Deref, DerefMut}; - -#[derive(Debug, Deref, DerefMut)] -pub struct AddressTypeToTypeIndexTree(ByAddressType>); - -impl AddressTypeToTypeIndexTree { - pub fn merge(mut self, mut other: Self) -> Self { - Self::merge_(&mut self.p2pk65, &mut other.p2pk65); - Self::merge_(&mut self.p2pk33, &mut other.p2pk33); - 
Self::merge_(&mut self.p2pkh, &mut other.p2pkh); - Self::merge_(&mut self.p2sh, &mut other.p2sh); - Self::merge_(&mut self.p2wpkh, &mut other.p2wpkh); - Self::merge_(&mut self.p2wsh, &mut other.p2wsh); - Self::merge_(&mut self.p2tr, &mut other.p2tr); - Self::merge_(&mut self.p2a, &mut other.p2a); - self - } - - fn merge_(own: &mut BTreeMap, other: &mut BTreeMap) { - if own.len() >= other.len() { - own.append(other); - } else { - other.append(own); - mem::swap(own, other); - } - } - - pub fn unwrap(self) -> ByAddressType> { - self.0 - } -} - -impl Default for AddressTypeToTypeIndexTree { - fn default() -> Self { - Self(ByAddressType { - p2pk65: BTreeMap::default(), - p2pk33: BTreeMap::default(), - p2pkh: BTreeMap::default(), - p2sh: BTreeMap::default(), - p2wpkh: BTreeMap::default(), - p2wsh: BTreeMap::default(), - p2tr: BTreeMap::default(), - p2a: BTreeMap::default(), - }) - } -} diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index b6b7661ce..301eb5b23 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -1,10 +1,4 @@ -use std::{ - cmp::Ordering, - collections::{BTreeMap, BTreeSet}, - mem, - path::Path, - thread, -}; +use std::{cmp::Ordering, collections::BTreeSet, mem, path::Path, thread}; use brk_error::Result; use brk_grouper::{ByAddressType, ByAnyAddress, Filtered}; @@ -19,6 +13,7 @@ use brk_types::{ }; use log::info; use rayon::prelude::*; +use rustc_hash::FxHashMap; use vecdb::{ AnyCloneableIterableVec, AnyStoredVec, AnyVec, CollectableVec, Database, EagerVec, Exit, Format, GenericStoredVec, ImportOptions, LazyVecFrom1, PAGE_SIZE, RawVec, Reader, Stamp, @@ -59,17 +54,8 @@ pub struct Vecs { // States // --- pub chain_state: RawVec, - pub p2pk33addressindex_to_anyaddressindex: RawVec, - pub p2pk65addressindex_to_anyaddressindex: RawVec, - pub p2pkhaddressindex_to_anyaddressindex: RawVec, - pub p2shaddressindex_to_anyaddressindex: RawVec, - pub 
p2traddressindex_to_anyaddressindex: RawVec, - pub p2wpkhaddressindex_to_anyaddressindex: RawVec, - pub p2wshaddressindex_to_anyaddressindex: RawVec, - pub p2aaddressindex_to_anyaddressindex: RawVec, - pub loadedaddressindex_to_loadedaddressdata: RawVec, - pub emptyaddressindex_to_emptyaddressdata: RawVec, - + pub any_address_indexes: AnyAddressIndexes, + pub addresses_data: AddressesData, pub utxo_cohorts: utxo_cohorts::Vecs, pub address_cohorts: address_cohorts::Vecs, @@ -453,41 +439,44 @@ impl Vecs { &states_path, )?, - p2aaddressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - p2pk33addressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - p2pk65addressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - p2pkhaddressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - p2shaddressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - p2traddressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - p2wpkhaddressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - 
p2wshaddressindex_to_anyaddressindex: RawVec::forced_import_with( - ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) - .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), - )?, - - loadedaddressindex_to_loadedaddressdata, - emptyaddressindex_to_emptyaddressdata, + any_address_indexes: AnyAddressIndexes { + p2a: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2pk33: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2pk65: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2pkh: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2sh: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2tr: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2wpkh: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + p2wsh: RawVec::forced_import_with( + ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO) + .with_saved_stamped_changes(SAVED_STAMPED_CHANGES), + )?, + }, + addresses_data: AddressesData { + loaded: loadedaddressindex_to_loadedaddressdata, + empty: emptyaddressindex_to_emptyaddressdata, + }, loadedaddressindex_to_loadedaddressindex, emptyaddressindex_to_emptyaddressindex, @@ -634,16 +623,8 @@ 
impl Vecs { .unwrap_or_default(), ) .min(chain_state_starting_height) - .min(Height::from(self.p2pk33addressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2pk65addressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2pkhaddressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2shaddressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2traddressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2wpkhaddressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2wshaddressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.p2aaddressindex_to_anyaddressindex.stamp()).incremented()) - .min(Height::from(self.loadedaddressindex_to_loadedaddressdata.stamp()).incremented()) - .min(Height::from(self.emptyaddressindex_to_emptyaddressdata.stamp()).incremented()) + .min(self.any_address_indexes.min_stamped_height()) + .min(self.addresses_data.min_stamped_height()) .min(Height::from(self.height_to_unspendable_supply.len())) .min(Height::from(self.height_to_opreturn_supply.len())) .cmp(&chain_state_starting_height) @@ -666,40 +647,21 @@ impl Vecs { if starting_height <= last_height { // info!("starting_height = {starting_height}"); + let stamp = starting_height.into(); let starting_height = if starting_height.is_not_zero() { - let mut set = [ - self.chain_state.rollback_before(starting_height.into())?, - self.p2pk33addressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.p2pk65addressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.p2pkhaddressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.p2shaddressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.p2traddressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.p2wpkhaddressindex_to_anyaddressindex - 
.rollback_before(starting_height.into())?, - self.p2wshaddressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.p2aaddressindex_to_anyaddressindex - .rollback_before(starting_height.into())?, - self.loadedaddressindex_to_loadedaddressdata - .rollback_before(starting_height.into())?, - self.emptyaddressindex_to_emptyaddressdata - .rollback_before(starting_height.into())?, - ] - .into_iter() - // .enumerate() - // .map(|(i, s)| { - // let h = Height::from(s).incremented(); - // // dbg!((i, s, h)); - // h - // }) - .map(Height::from) - .map(Height::incremented) - .collect::>(); + let mut set = [self.chain_state.rollback_before(stamp)?] + .into_iter() + .chain(self.any_address_indexes.rollback_before(stamp)?) + .chain(self.addresses_data.rollback_before(stamp)?) + // .enumerate() + // .map(|(i, s)| { + // let h = Height::from(s).incremented(); + // // dbg!((i, s, h)); + // h + // }) + .map(Height::from) + .map(Height::incremented) + .collect::>(); if set.len() == 1 { set.pop_first().unwrap() @@ -762,16 +724,8 @@ impl Vecs { chain_state = vec![]; - self.p2pk33addressindex_to_anyaddressindex.reset()?; - self.p2pk65addressindex_to_anyaddressindex.reset()?; - self.p2pkhaddressindex_to_anyaddressindex.reset()?; - self.p2shaddressindex_to_anyaddressindex.reset()?; - self.p2traddressindex_to_anyaddressindex.reset()?; - self.p2wpkhaddressindex_to_anyaddressindex.reset()?; - self.p2wshaddressindex_to_anyaddressindex.reset()?; - self.p2aaddressindex_to_anyaddressindex.reset()?; - self.loadedaddressindex_to_loadedaddressdata.reset()?; - self.emptyaddressindex_to_emptyaddressdata.reset()?; + self.any_address_indexes.reset()?; + self.addresses_data.reset()?; separate_utxo_vecs .par_iter_mut() @@ -792,11 +746,7 @@ impl Vecs { starting_indexes.update_from_height(starting_height, indexes); - let txinindex_to_outpoint_reader = txinindex_to_outpoint.create_reader(); - let txindex_to_first_txoutindex_reader = txindex_to_first_txoutindex.create_reader(); - let 
txoutindex_to_value_reader = txoutindex_to_value.create_reader(); - let txoutindex_to_outputtype_reader = txoutindex_to_outputtype.create_reader(); - let txoutindex_to_typeindex_reader = txoutindex_to_typeindex.create_reader(); + let ir = IndexerReaders::new(indexer); let mut height_to_first_txoutindex_iter = height_to_first_txoutindex.into_iter(); let mut height_to_first_txinindex_iter = height_to_first_txinindex.into_iter(); @@ -857,18 +807,11 @@ impl Vecs { let mut height = starting_height; let mut addresstype_to_typeindex_to_loadedaddressdata = - AddressTypeToTypeIndexTree::>::default(); + AddressTypeToTypeIndexMap::>::default(); let mut addresstype_to_typeindex_to_emptyaddressdata = - AddressTypeToTypeIndexTree::>::default(); - let mut addresstypeindex_to_anyaddressindex_reader_opt = - ByAddressType::>::default(); - let mut anyaddressindex_to_anyaddressdata_reader_opt = - ByAnyAddress::>::default(); + AddressTypeToTypeIndexMap::>::default(); - self.reset_readers_options( - &mut addresstypeindex_to_anyaddressindex_reader_opt, - &mut anyaddressindex_to_anyaddressdata_reader_opt, - ); + let mut vr = VecsReaders::new(self); let last_height = Height::from( height_to_date_fixed @@ -877,170 +820,172 @@ impl Vecs { .unwrap_or_default(), ); - (height.to_usize()..height_to_date_fixed.len()) - .map(Height::from) - .try_for_each(|_height| -> Result<()> { - height = _height; + for _height in (height.to_usize()..height_to_date_fixed.len()).map(Height::from) { + height = _height; - info!("Processing chain at {height}..."); + info!("Processing chain at {height}..."); - self.utxo_cohorts - .iter_separate_mut() - .for_each(|Filtered(_, v)| v.state.as_mut().unwrap().reset_single_iteration_values()); + self.utxo_cohorts + .iter_separate_mut() + .for_each(|Filtered(_, v)| { + v.state.as_mut().unwrap().reset_single_iteration_values() + }); - self.address_cohorts - .iter_separate_mut() - .for_each(|Filtered(_, v)| v.state.as_mut().unwrap().reset_single_iteration_values()); + 
self.address_cohorts + .iter_separate_mut() + .for_each(|Filtered(_, v)| { + v.state.as_mut().unwrap().reset_single_iteration_values() + }); - let timestamp = height_to_timestamp_fixed_iter.unwrap_get_inner(height); - let price = height_to_price_close_iter - .as_mut() - .map(|i| *i.unwrap_get_inner(height)); - let first_txoutindex = height_to_first_txoutindex_iter + let timestamp = height_to_timestamp_fixed_iter.unwrap_get_inner(height); + let price = height_to_price_close_iter + .as_mut() + .map(|i| *i.unwrap_get_inner(height)); + let first_txoutindex = height_to_first_txoutindex_iter + .unwrap_get_inner(height) + .to_usize(); + let first_txinindex = height_to_first_txinindex_iter + .unwrap_get_inner(height) + .to_usize(); + let output_count = height_to_output_count_iter.unwrap_get_inner(height); + let input_count = height_to_input_count_iter.unwrap_get_inner(height); + + let first_addressindexes: ByAddressType = ByAddressType { + p2a: height_to_first_p2aaddressindex_iter .unwrap_get_inner(height) - .to_usize(); - let first_txinindex = height_to_first_txinindex_iter + .into(), + p2pk33: height_to_first_p2pk33addressindex_iter .unwrap_get_inner(height) - .to_usize(); - let output_count = height_to_output_count_iter.unwrap_get_inner(height); - let input_count = height_to_input_count_iter.unwrap_get_inner(height); + .into(), + p2pk65: height_to_first_p2pk65addressindex_iter + .unwrap_get_inner(height) + .into(), + p2pkh: height_to_first_p2pkhaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2sh: height_to_first_p2shaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2tr: height_to_first_p2traddressindex_iter + .unwrap_get_inner(height) + .into(), + p2wpkh: height_to_first_p2wpkhaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2wsh: height_to_first_p2wshaddressindex_iter + .unwrap_get_inner(height) + .into(), + }; - let first_addressindexes: ByAddressType = ByAddressType { - p2a: height_to_first_p2aaddressindex_iter - 
.unwrap_get_inner(height) - .into(), - p2pk33: height_to_first_p2pk33addressindex_iter - .unwrap_get_inner(height) - .into(), - p2pk65: height_to_first_p2pk65addressindex_iter - .unwrap_get_inner(height) - .into(), - p2pkh: height_to_first_p2pkhaddressindex_iter - .unwrap_get_inner(height) - .into(), - p2sh: height_to_first_p2shaddressindex_iter - .unwrap_get_inner(height) - .into(), - p2tr: height_to_first_p2traddressindex_iter - .unwrap_get_inner(height) - .into(), - p2wpkh: height_to_first_p2wpkhaddressindex_iter - .unwrap_get_inner(height) - .into(), - p2wsh: height_to_first_p2wshaddressindex_iter - .unwrap_get_inner(height) - .into(), - }; + let ( + mut transacted, + addresstype_to_typedindex_to_received_data, + mut height_to_sent, + addresstype_to_typedindex_to_sent_data, + mut stored_or_new_addresstype_to_typeindex_to_addressdatawithsource, + ) = thread::scope(|scope| { + scope.spawn(|| { + self.utxo_cohorts + .tick_tock_next_block(&chain_state, timestamp); + }); - let ( - mut transacted, - addresstype_to_typedindex_to_received_data, - mut height_to_sent, - addresstype_to_typedindex_to_sent_data, - mut stored_or_new_addresstype_to_typeindex_to_addressdatawithsource, - ) = thread::scope(|scope| { - scope.spawn(|| { - self.utxo_cohorts - .tick_tock_next_block(&chain_state, timestamp); - }); + let (transacted, addresstype_to_typedindex_to_received_data, receiving_addresstype_to_typeindex_to_addressdatawithsource) = (first_txoutindex..first_txoutindex + usize::from(output_count)) + .into_par_iter() + .map(|i| { + let txoutindex = TxOutIndex::from(i); - let (transacted, addresstype_to_typedindex_to_received_data, receiving_addresstype_to_typeindex_to_addressdatawithsource) = (first_txoutindex..first_txoutindex + usize::from(output_count)) - .into_par_iter() - .map(TxOutIndex::from) - .map(|txoutindex| { - let value = txoutindex_to_value - .unwrap_read(txoutindex, &txoutindex_to_value_reader); + let value = txoutindex_to_value + .unwrap_read(txoutindex, 
&ir.txoutindex_to_value); - let output_type = txoutindex_to_outputtype - .unwrap_read(txoutindex, &txoutindex_to_outputtype_reader); + let output_type = txoutindex_to_outputtype + .unwrap_read(txoutindex, &ir.txoutindex_to_outputtype); - if output_type.is_not_address() { - return (value, output_type, None); + if output_type.is_not_address() { + return (value, output_type, None); + } + + let typeindex = txoutindex_to_typeindex + .unwrap_read(txoutindex, &ir.txoutindex_to_typeindex); + + let addressdata_opt = Self::get_addressdatawithsource( + output_type, + typeindex, + &first_addressindexes, + &addresstype_to_typeindex_to_loadedaddressdata, + &addresstype_to_typeindex_to_emptyaddressdata, + &vr, + &self.any_address_indexes, + &self.addresses_data, + ); + + (value, output_type, Some((typeindex, addressdata_opt))) + }).fold( + || { + ( + Transacted::default(), + AddressTypeToVec::<(TypeIndex, Sats)>::default(), + AddressTypeToTypeIndexMap::default() + ) + }, + |(mut transacted, mut addresstype_to_typedindex_to_data, mut addresstype_to_typeindex_to_addressdatawithsource), + ( + value, + output_type, + typeindex_with_addressdata_opt, + )| { + transacted.iterate(value, output_type); + + if let Some((typeindex, addressdata_opt)) = typeindex_with_addressdata_opt { + if let Some(addressdata) = addressdata_opt + { + addresstype_to_typeindex_to_addressdatawithsource + .insert_for_type(output_type, typeindex, addressdata); } - let typeindex = txoutindex_to_typeindex - .unwrap_read(txoutindex, &txoutindex_to_typeindex_reader); + addresstype_to_typedindex_to_data + .get_mut(output_type) + .unwrap() + .push((typeindex, value)); + } - let addressdata_opt = Self::get_addressdatawithsource( - output_type, - typeindex, - &first_addressindexes, - &addresstype_to_typeindex_to_loadedaddressdata, - &addresstype_to_typeindex_to_emptyaddressdata, - &addresstypeindex_to_anyaddressindex_reader_opt, - &anyaddressindex_to_anyaddressdata_reader_opt, - 
&self.p2pk33addressindex_to_anyaddressindex, - &self.p2pk65addressindex_to_anyaddressindex, - &self.p2pkhaddressindex_to_anyaddressindex, - &self.p2shaddressindex_to_anyaddressindex, - &self.p2traddressindex_to_anyaddressindex, - &self.p2wpkhaddressindex_to_anyaddressindex, - &self.p2wshaddressindex_to_anyaddressindex, - &self.p2aaddressindex_to_anyaddressindex, - &self.loadedaddressindex_to_loadedaddressdata, - &self.emptyaddressindex_to_emptyaddressdata, - ); - - (value, output_type, Some((typeindex, addressdata_opt))) - }).fold( + (transacted, addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource) + }).reduce( || { ( Transacted::default(), AddressTypeToVec::<(TypeIndex, Sats)>::default(), - AddressTypeToTypeIndexTree::default() + AddressTypeToTypeIndexMap::default() ) }, - |(mut transacted, mut addresstype_to_typedindex_to_data, mut addresstype_to_typeindex_to_addressdatawithsource), - ( - value, - output_type, - typeindex_with_addressdata_opt, - )| { - transacted.iterate(value, output_type); + |(transacted, addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource), (transacted2, addresstype_to_typedindex_to_data2, addresstype_to_typeindex_to_addressdatawithsource2)| { + (transacted + transacted2, addresstype_to_typedindex_to_data.merge(addresstype_to_typedindex_to_data2), addresstype_to_typeindex_to_addressdatawithsource.merge(addresstype_to_typeindex_to_addressdatawithsource2)) + }, + ); - if let Some((typeindex, addressdata_opt)) = typeindex_with_addressdata_opt { - if let Some(addressdata) = addressdata_opt - { - addresstype_to_typeindex_to_addressdatawithsource - .get_mut(output_type) - .unwrap() - .insert(typeindex, addressdata); - } - - addresstype_to_typedindex_to_data - .get_mut(output_type) - .unwrap() - .push((typeindex, value)); - } - - (transacted, addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource) - }).reduce( - || { - ( - Transacted::default(), - 
AddressTypeToVec::<(TypeIndex, Sats)>::default(), - AddressTypeToTypeIndexTree::default() - ) - }, - |(transacted, addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource), (transacted2, addresstype_to_typedindex_to_data2, addresstype_to_typeindex_to_addressdatawithsource2)| { - (transacted + transacted2, addresstype_to_typedindex_to_data.merge(addresstype_to_typedindex_to_data2), addresstype_to_typeindex_to_addressdatawithsource.merge(addresstype_to_typeindex_to_addressdatawithsource2)) - }, - ); - - // Skip coinbase - let (height_to_sent, addresstype_to_typedindex_to_sent_data, sending_addresstype_to_typeindex_to_addressdatawithsource) = (first_txinindex + 1..first_txinindex + usize::from(input_count)) + // Skip coinbase + let ( + height_to_sent, + addresstype_to_typedindex_to_sent_data, + sending_addresstype_to_typeindex_to_addressdatawithsource, + ) = + (first_txinindex + 1..first_txinindex + usize::from(input_count)) .into_par_iter() - .map(TxInIndex::from) - .map(|txinindex| { - let outpoint = txinindex_to_outpoint.unwrap_read(txinindex, &txinindex_to_outpoint_reader); + .map(|i| { + let txinindex = TxInIndex::from(i); - let txoutindex = txindex_to_first_txoutindex.unwrap_read(outpoint.txindex(), &txindex_to_first_txoutindex_reader) + outpoint.vout(); + let outpoint = txinindex_to_outpoint + .unwrap_read(txinindex, &ir.txinindex_to_outpoint); + + let txoutindex = txindex_to_first_txoutindex.unwrap_read( + outpoint.txindex(), + &ir.txindex_to_first_txoutindex, + ) + outpoint.vout(); let value = txoutindex_to_value - .unwrap_read(txoutindex, &txoutindex_to_value_reader); + .unwrap_read(txoutindex, &ir.txoutindex_to_value); let input_type = txoutindex_to_outputtype - .unwrap_read(txoutindex, &txoutindex_to_outputtype_reader); + .unwrap_read(txoutindex, &ir.txoutindex_to_outputtype); let prev_height = *txoutindex_range_to_height.get(txoutindex).unwrap(); @@ -1050,7 +995,7 @@ impl Vecs { } let typeindex = txoutindex_to_typeindex - 
.unwrap_read(txoutindex, &txoutindex_to_typeindex_reader); + .unwrap_read(txoutindex, &ir.txoutindex_to_typeindex); let addressdata_opt = Self::get_addressdatawithsource( input_type, @@ -1058,242 +1003,289 @@ impl Vecs { &first_addressindexes, &addresstype_to_typeindex_to_loadedaddressdata, &addresstype_to_typeindex_to_emptyaddressdata, - &addresstypeindex_to_anyaddressindex_reader_opt, - &anyaddressindex_to_anyaddressdata_reader_opt, - &self.p2pk33addressindex_to_anyaddressindex, - &self.p2pk65addressindex_to_anyaddressindex, - &self.p2pkhaddressindex_to_anyaddressindex, - &self.p2shaddressindex_to_anyaddressindex, - &self.p2traddressindex_to_anyaddressindex, - &self.p2wpkhaddressindex_to_anyaddressindex, - &self.p2wshaddressindex_to_anyaddressindex, - &self.p2aaddressindex_to_anyaddressindex, - &self.loadedaddressindex_to_loadedaddressdata, - &self.emptyaddressindex_to_emptyaddressdata, + &vr, + &self.any_address_indexes, + &self.addresses_data, ); - (prev_height, value, input_type, Some((typeindex, addressdata_opt))) - }).fold( - || { ( - BTreeMap::::default(), - HeightToAddressTypeToVec::<(TypeIndex, Sats)>::default(), - AddressTypeToTypeIndexTree::default() + prev_height, + value, + input_type, + Some((typeindex, addressdata_opt)), ) - }, - |(mut height_to_transacted, mut height_to_addresstype_to_typedindex_to_data, mut addresstype_to_typeindex_to_addressdatawithsource), - ( - prev_height, - value, - output_type, - typeindex_with_addressdata_opt, - )| { - height_to_transacted - .entry(prev_height) - .or_default() - .iterate(value, output_type); - - if let Some((typeindex, addressdata_opt)) = typeindex_with_addressdata_opt { - if let Some(addressdata) = addressdata_opt - { - addresstype_to_typeindex_to_addressdatawithsource - .get_mut(output_type) - .unwrap() - .insert(typeindex, addressdata); - } - - height_to_addresstype_to_typedindex_to_data - .entry(prev_height) - .or_default() - .get_mut(output_type) - .unwrap() - .push((typeindex, value)); - } - - 
(height_to_transacted, height_to_addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource) - }).reduce( + }) + .fold( || { ( - BTreeMap::::default(), + FxHashMap::::default(), HeightToAddressTypeToVec::<(TypeIndex, Sats)>::default(), - AddressTypeToTypeIndexTree::default() + AddressTypeToTypeIndexMap::default(), ) }, - |(height_to_transacted, addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource), (height_to_transacted2, addresstype_to_typedindex_to_data2, addresstype_to_typeindex_to_addressdatawithsource2)| { - let (mut height_to_transacted, height_to_transacted_consumed) = if height_to_transacted.len() > height_to_transacted2.len() { - (height_to_transacted, height_to_transacted2) - } else { - (height_to_transacted2, height_to_transacted) - }; - height_to_transacted_consumed.into_iter().for_each(|(k, v)| { - *height_to_transacted.entry(k).or_default() += v; - }); + |( + mut height_to_transacted, + mut height_to_addresstype_to_typedindex_to_data, + mut addresstype_to_typeindex_to_addressdatawithsource, + ), + ( + prev_height, + value, + output_type, + typeindex_with_addressdata_opt, + )| { + height_to_transacted + .entry(prev_height) + .or_default() + .iterate(value, output_type); - let (mut addresstype_to_typedindex_to_data, addresstype_to_typedindex_to_data_consumed) = if addresstype_to_typedindex_to_data.len() > addresstype_to_typedindex_to_data2.len() { - (addresstype_to_typedindex_to_data, addresstype_to_typedindex_to_data2) - } else { - (addresstype_to_typedindex_to_data2, addresstype_to_typedindex_to_data) - }; - addresstype_to_typedindex_to_data_consumed.0.into_iter().for_each(|(k, v)| { - addresstype_to_typedindex_to_data.entry(k).or_default().merge_mut(v); - }); + if let Some((typeindex, addressdata_opt)) = + typeindex_with_addressdata_opt + { + if let Some(addressdata) = addressdata_opt { + addresstype_to_typeindex_to_addressdatawithsource + .insert_for_type( + output_type, + typeindex, + 
addressdata, + ); + } - (height_to_transacted, addresstype_to_typedindex_to_data, addresstype_to_typeindex_to_addressdatawithsource.merge(addresstype_to_typeindex_to_addressdatawithsource2)) + height_to_addresstype_to_typedindex_to_data + .entry(prev_height) + .or_default() + .get_mut(output_type) + .unwrap() + .push((typeindex, value)); + } + + ( + height_to_transacted, + height_to_addresstype_to_typedindex_to_data, + addresstype_to_typeindex_to_addressdatawithsource, + ) + }, + ) + .reduce( + || { + ( + FxHashMap::::default(), + HeightToAddressTypeToVec::<(TypeIndex, Sats)>::default(), + AddressTypeToTypeIndexMap::default(), + ) + }, + |( + height_to_transacted, + addresstype_to_typedindex_to_data, + addresstype_to_typeindex_to_addressdatawithsource, + ), + ( + height_to_transacted2, + addresstype_to_typedindex_to_data2, + addresstype_to_typeindex_to_addressdatawithsource2, + )| { + let (mut height_to_transacted, height_to_transacted_consumed) = + if height_to_transacted.len() > height_to_transacted2.len() + { + (height_to_transacted, height_to_transacted2) + } else { + (height_to_transacted2, height_to_transacted) + }; + height_to_transacted_consumed + .into_iter() + .for_each(|(k, v)| { + *height_to_transacted.entry(k).or_default() += v; + }); + + let ( + mut addresstype_to_typedindex_to_data, + addresstype_to_typedindex_to_data_consumed, + ) = if addresstype_to_typedindex_to_data.len() + > addresstype_to_typedindex_to_data2.len() + { + ( + addresstype_to_typedindex_to_data, + addresstype_to_typedindex_to_data2, + ) + } else { + ( + addresstype_to_typedindex_to_data2, + addresstype_to_typedindex_to_data, + ) + }; + addresstype_to_typedindex_to_data_consumed + .0 + .into_iter() + .for_each(|(k, v)| { + addresstype_to_typedindex_to_data + .entry(k) + .or_default() + .merge_mut(v); + }); + + ( + height_to_transacted, + addresstype_to_typedindex_to_data, + addresstype_to_typeindex_to_addressdatawithsource.merge( + addresstype_to_typeindex_to_addressdatawithsource2, 
+ ), + ) }, ); - let addresstype_to_typeindex_to_addressdatawithsource = - receiving_addresstype_to_typeindex_to_addressdatawithsource - .merge(sending_addresstype_to_typeindex_to_addressdatawithsource); + let addresstype_to_typeindex_to_addressdatawithsource = + receiving_addresstype_to_typeindex_to_addressdatawithsource + .merge(sending_addresstype_to_typeindex_to_addressdatawithsource); - ( - transacted, - addresstype_to_typedindex_to_received_data, - height_to_sent, - addresstype_to_typedindex_to_sent_data, - addresstype_to_typeindex_to_addressdatawithsource, - ) - }); + ( + transacted, + addresstype_to_typedindex_to_received_data, + height_to_sent, + addresstype_to_typedindex_to_sent_data, + addresstype_to_typeindex_to_addressdatawithsource, + ) + }); - thread::scope(|scope| { - scope.spawn(|| { - addresstype_to_typedindex_to_received_data.process_received( + thread::scope(|scope| { + scope.spawn(|| { + addresstype_to_typedindex_to_received_data.process_received( + &mut self.address_cohorts, + &mut addresstype_to_typeindex_to_loadedaddressdata, + &mut addresstype_to_typeindex_to_emptyaddressdata, + price, + &mut addresstype_to_addr_count, + &mut addresstype_to_empty_addr_count, + &mut stored_or_new_addresstype_to_typeindex_to_addressdatawithsource, + ); + + addresstype_to_typedindex_to_sent_data + .process_sent( &mut self.address_cohorts, &mut addresstype_to_typeindex_to_loadedaddressdata, &mut addresstype_to_typeindex_to_emptyaddressdata, price, &mut addresstype_to_addr_count, &mut addresstype_to_empty_addr_count, + height_to_price_close_vec.as_ref(), + &height_to_timestamp_fixed_vec, + height, + timestamp, &mut stored_or_new_addresstype_to_typeindex_to_addressdatawithsource, - ); - - addresstype_to_typedindex_to_sent_data - .process_sent( - &mut self.address_cohorts, - &mut addresstype_to_typeindex_to_loadedaddressdata, - &mut addresstype_to_typeindex_to_emptyaddressdata, - price, - &mut addresstype_to_addr_count, - &mut addresstype_to_empty_addr_count, - 
height_to_price_close_vec.as_ref(), - &height_to_timestamp_fixed_vec, - height, - timestamp, - &mut stored_or_new_addresstype_to_typeindex_to_addressdatawithsource, - ) - .unwrap(); - }); - - if chain_state_starting_height > height { - dbg!(chain_state_starting_height, height); - panic!("temp, just making sure") - } - - unspendable_supply += transacted - .by_type - .unspendable - .as_vec() - .into_iter() - .map(|state| state.value) - .sum::() - + height_to_unclaimed_rewards_iter.unwrap_get_inner(height); - - opreturn_supply += transacted.by_type.unspendable.opreturn.value; - - if height == Height::new(0) { - transacted = Transacted::default(); - unspendable_supply += Sats::FIFTY_BTC; - } else if height == Height::new(91_842) || height == Height::new(91_880) { - // Need to destroy invalid coinbases due to duplicate txids - if height == Height::new(91_842) { - height_to_sent.entry(Height::new(91_812)).or_default() - } else { - height_to_sent.entry(Height::new(91_722)).or_default() - } - .iterate(Sats::FIFTY_BTC, OutputType::P2PK65); - }; - - // Push current block state before processing sends and receives - chain_state.push(BlockState { - supply: transacted.spendable_supply.clone(), - price, - timestamp, - }); - - self.utxo_cohorts.receive(transacted, height, price); - - self.utxo_cohorts.send(height_to_sent, &mut chain_state); + ) + .unwrap(); }); - self.height_to_unspendable_supply.forced_push_at( - height, - unspendable_supply, - exit, - )?; - - self.height_to_opreturn_supply - .forced_push_at(height, opreturn_supply, exit)?; - - self.addresstype_to_height_to_addr_count.forced_push_at( - height, - &addresstype_to_addr_count, - exit, - )?; - - self.addresstype_to_height_to_empty_addr_count - .forced_push_at(height, &addresstype_to_empty_addr_count, exit)?; - - let date = height_to_date_fixed_iter.unwrap_get_inner(height); - let dateindex = DateIndex::try_from(date).unwrap(); - let date_first_height = - dateindex_to_first_height_iter.unwrap_get_inner(dateindex); - 
let date_height_count = - dateindex_to_height_count_iter.unwrap_get_inner(dateindex); - let is_date_last_height = date_first_height - + Height::from(date_height_count).decremented().unwrap() - == height; - let date_price = dateindex_to_price_close_iter - .as_mut() - .map(|v| is_date_last_height.then(|| *v.unwrap_get_inner(dateindex))); - - let dateindex = is_date_last_height.then_some(dateindex); - - self.utxo_cohorts.iter_separate_mut().par_bridge() - .map(|Filtered(_, v)| v as &mut dyn DynCohortVecs) - .chain( - self.address_cohorts.iter_separate_mut() - .par_bridge() - .map(|Filtered(_, v)| v as &mut dyn DynCohortVecs), - ) - .try_for_each(|v| { - v.forced_pushed_at(height, exit)?; - v.compute_then_force_push_unrealized_states( - height, price, dateindex, date_price, exit, - ) - })?; - - if height != last_height && height != Height::ZERO && height.to_usize() % 10_000 == 0 { - let _lock = exit.lock(); - - addresstypeindex_to_anyaddressindex_reader_opt.take(); - anyaddressindex_to_anyaddressdata_reader_opt.take(); - - self.flush_states(height, &chain_state, mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), false, exit)?; - - self.reset_readers_options( - &mut addresstypeindex_to_anyaddressindex_reader_opt, - &mut anyaddressindex_to_anyaddressdata_reader_opt, - ); + if chain_state_starting_height > height { + dbg!(chain_state_starting_height, height); + panic!("temp, just making sure") } - Ok(()) - })?; + unspendable_supply += transacted + .by_type + .unspendable + .as_vec() + .into_iter() + .map(|state| state.value) + .sum::() + + height_to_unclaimed_rewards_iter.unwrap_get_inner(height); + + opreturn_supply += transacted.by_type.unspendable.opreturn.value; + + if height == Height::new(0) { + transacted = Transacted::default(); + unspendable_supply += Sats::FIFTY_BTC; + } else if height == Height::new(91_842) || height == Height::new(91_880) { + // Need to destroy invalid coinbases due to 
duplicate txids + if height == Height::new(91_842) { + height_to_sent.entry(Height::new(91_812)).or_default() + } else { + height_to_sent.entry(Height::new(91_722)).or_default() + } + .iterate(Sats::FIFTY_BTC, OutputType::P2PK65); + }; + + // Push current block state before processing sends and receives + chain_state.push(BlockState { + supply: transacted.spendable_supply.clone(), + price, + timestamp, + }); + + self.utxo_cohorts.receive(transacted, height, price); + + self.utxo_cohorts.send(height_to_sent, &mut chain_state); + }); + + self.height_to_unspendable_supply.forced_push_at( + height, + unspendable_supply, + exit, + )?; + + self.height_to_opreturn_supply + .forced_push_at(height, opreturn_supply, exit)?; + + self.addresstype_to_height_to_addr_count.forced_push_at( + height, + &addresstype_to_addr_count, + exit, + )?; + + self.addresstype_to_height_to_empty_addr_count + .forced_push_at(height, &addresstype_to_empty_addr_count, exit)?; + + let date = height_to_date_fixed_iter.unwrap_get_inner(height); + let dateindex = DateIndex::try_from(date).unwrap(); + let date_first_height = dateindex_to_first_height_iter.unwrap_get_inner(dateindex); + let date_height_count = dateindex_to_height_count_iter.unwrap_get_inner(dateindex); + let is_date_last_height = date_first_height + + Height::from(date_height_count).decremented().unwrap() + == height; + let date_price = dateindex_to_price_close_iter + .as_mut() + .map(|v| is_date_last_height.then(|| *v.unwrap_get_inner(dateindex))); + + let dateindex = is_date_last_height.then_some(dateindex); + + self.utxo_cohorts + .par_iter_separate_mut() + .map(|Filtered(_, v)| v as &mut dyn DynCohortVecs) + .chain( + self.address_cohorts + .par_iter_separate_mut() + .map(|Filtered(_, v)| v as &mut dyn DynCohortVecs), + ) + .try_for_each(|v| { + v.forced_pushed_at(height, exit)?; + v.compute_then_force_push_unrealized_states( + height, price, dateindex, date_price, exit, + ) + })?; + + if height != last_height + && height != 
Height::ZERO + && height.to_usize() % 10_000 == 0 + { + let _lock = exit.lock(); + + drop(vr); + + self.flush_states( + height, + &chain_state, + mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), + mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), + false, + exit, + )?; + + vr = VecsReaders::new(self); + } + } + + drop(vr); let _lock = exit.lock(); - - addresstypeindex_to_anyaddressindex_reader_opt.take(); - anyaddressindex_to_anyaddressdata_reader_opt.take(); - self.flush_states( height, &chain_state, @@ -1468,24 +1460,15 @@ impl Vecs { address_type: OutputType, typeindex: TypeIndex, first_addressindexes: &ByAddressType, - addresstype_to_typeindex_to_loadedaddressdata: &AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_loadedaddressdata: &AddressTypeToTypeIndexMap< WithAddressDataSource, >, - addresstype_to_typeindex_to_emptyaddressdata: &AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_emptyaddressdata: &AddressTypeToTypeIndexMap< WithAddressDataSource, >, - addresstypeindex_to_anyaddressindex_reader_opt: &ByAddressType>, - anyaddressindex_to_anyaddressdata_reader_opt: &ByAnyAddress>, - p2pk33addressindex_to_anyaddressindex: &RawVec, - p2pk65addressindex_to_anyaddressindex: &RawVec, - p2pkhaddressindex_to_anyaddressindex: &RawVec, - p2shaddressindex_to_anyaddressindex: &RawVec, - p2traddressindex_to_anyaddressindex: &RawVec, - p2wpkhaddressindex_to_anyaddressindex: &RawVec, - p2wshaddressindex_to_anyaddressindex: &RawVec, - p2aaddressindex_to_anyaddressindex: &RawVec, - loadedaddressindex_to_loadedaddressdata: &RawVec, - emptyaddressindex_to_emptyaddressdata: &RawVec, + vr: &VecsReaders, + any_address_indexes: &AnyAddressIndexes, + addresses_data: &AddressesData, ) -> Option> { if *first_addressindexes.get(address_type).unwrap() <= typeindex { return Some(WithAddressDataSource::New(LoadedAddressData::default())); @@ -1503,51 +1486,18 @@ impl Vecs { return None; } - let reader = 
addresstypeindex_to_anyaddressindex_reader_opt - .get_unwrap(address_type) - .as_ref() - .unwrap(); + let reader = vr.get_anyaddressindex_reader(address_type); - let anyaddressindex = match address_type { - OutputType::P2PK33 => { - p2pk33addressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2PK65 => { - p2pk65addressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2PKH => { - p2pkhaddressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2SH => { - p2shaddressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2TR => { - p2traddressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2WPKH => { - p2wpkhaddressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2WSH => { - p2wshaddressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - OutputType::P2A => { - p2aaddressindex_to_anyaddressindex.get_any_or_read(typeindex.into(), reader) - } - _ => unreachable!(), - } - .unwrap() - .unwrap() - .into_owned(); + let anyaddressindex = + any_address_indexes.get_anyaddressindex(address_type, typeindex, reader); Some(match anyaddressindex.to_enum() { AnyAddressDataIndexEnum::Loaded(loadedaddressindex) => { - let mmap = anyaddressindex_to_anyaddressdata_reader_opt - .loaded - .as_ref() - .unwrap(); + let reader = &vr.anyaddressindex_to_anyaddressdata.loaded; - let loadedaddressdata = loadedaddressindex_to_loadedaddressdata - .get_any_or_read(loadedaddressindex, mmap) + let loadedaddressdata = addresses_data + .loaded + .get_any_or_read(loadedaddressindex, reader) .unwrap() .unwrap() .into_owned(); @@ -1558,13 +1508,11 @@ impl Vecs { )) } AnyAddressDataIndexEnum::Empty(emtpyaddressindex) => { - let mmap = anyaddressindex_to_anyaddressdata_reader_opt - .empty - .as_ref() - .unwrap(); + let reader = &vr.anyaddressindex_to_anyaddressdata.empty; - let 
emptyaddressdata = emptyaddressindex_to_emptyaddressdata - .get_any_or_read(emtpyaddressindex, mmap) + let emptyaddressdata = addresses_data + .empty + .get_any_or_read(emtpyaddressindex, reader) .unwrap() .unwrap() .into_owned(); @@ -1577,71 +1525,14 @@ impl Vecs { }) } - fn reset_readers_options( - &self, - addresstypeindex_to_anyaddressindex_reader_opt: &mut ByAddressType>, - anyaddressindex_to_anyaddressdata_reader_opt: &mut ByAnyAddress>, - ) { - addresstypeindex_to_anyaddressindex_reader_opt - .p2pk65 - .replace( - self.p2pk65addressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt - .p2pk33 - .replace( - self.p2pk33addressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt - .p2pkh - .replace( - self.p2pkhaddressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt.p2sh.replace( - self.p2shaddressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt - .p2wpkh - .replace( - self.p2wpkhaddressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt - .p2wsh - .replace( - self.p2wshaddressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt.p2tr.replace( - self.p2traddressindex_to_anyaddressindex - .create_static_reader(), - ); - addresstypeindex_to_anyaddressindex_reader_opt.p2a.replace( - self.p2aaddressindex_to_anyaddressindex - .create_static_reader(), - ); - anyaddressindex_to_anyaddressdata_reader_opt.loaded.replace( - self.loadedaddressindex_to_loadedaddressdata - .create_static_reader(), - ); - anyaddressindex_to_anyaddressdata_reader_opt.empty.replace( - self.emptyaddressindex_to_emptyaddressdata - .create_static_reader(), - ); - } - fn flush_states( &mut self, height: Height, chain_state: &[BlockState], - 
addresstype_to_typeindex_to_loadedaddressdata: AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_loadedaddressdata: AddressTypeToTypeIndexMap< WithAddressDataSource, >, - addresstype_to_typeindex_to_emptyaddressdata: AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_emptyaddressdata: AddressTypeToTypeIndexMap< WithAddressDataSource, >, with_changes: bool, @@ -1650,12 +1541,10 @@ impl Vecs { info!("Flushing..."); self.utxo_cohorts - .iter_separate_mut() - .par_bridge() + .par_iter_separate_mut() .try_for_each(|Filtered(_, v)| v.safe_flush_stateful_vecs(height, exit))?; self.address_cohorts - .iter_separate_mut() - .par_bridge() + .par_iter_separate_mut() .try_for_each(|Filtered(_, v)| v.safe_flush_stateful_vecs(height, exit))?; self.height_to_unspendable_supply.safe_flush(exit)?; self.height_to_opreturn_supply.safe_flush(exit)?; @@ -1667,172 +1556,119 @@ impl Vecs { .try_for_each(|v| v.safe_flush(exit))?; let mut addresstype_to_typeindex_to_new_or_updated_anyaddressindex = - AddressTypeToTypeIndexTree::default(); + AddressTypeToTypeIndexMap::default(); - addresstype_to_typeindex_to_emptyaddressdata - .unwrap() - .into_iter_typed() - .try_for_each(|(_type, tree)| -> Result<()> { - tree.into_iter().try_for_each( - |(typeindex, emptyaddressdata_with_source)| -> Result<()> { - match emptyaddressdata_with_source { - WithAddressDataSource::New(emptyaddressdata) => { - let emptyaddressindex = self - .emptyaddressindex_to_emptyaddressdata - .fill_first_hole_or_push(emptyaddressdata)?; + for (address_type, sorted) in + addresstype_to_typeindex_to_emptyaddressdata.into_sorted_iter() + { + for (typeindex, emptyaddressdata_with_source) in sorted.into_iter() { + match emptyaddressdata_with_source { + WithAddressDataSource::New(emptyaddressdata) => { + let emptyaddressindex = self + .addresses_data + .empty + .fill_first_hole_or_push(emptyaddressdata)?; - let anyaddressindex = AnyAddressIndex::from(emptyaddressindex); + let anyaddressindex = 
AnyAddressIndex::from(emptyaddressindex); - addresstype_to_typeindex_to_new_or_updated_anyaddressindex - .get_mut(_type) - .unwrap() - .insert(typeindex, anyaddressindex); + addresstype_to_typeindex_to_new_or_updated_anyaddressindex + .get_mut(address_type) + .unwrap() + .insert(typeindex, anyaddressindex); + } + WithAddressDataSource::FromEmptyAddressDataVec(( + emptyaddressindex, + emptyaddressdata, + )) => self + .addresses_data + .empty + .update(emptyaddressindex, emptyaddressdata)?, + WithAddressDataSource::FromLoadedAddressDataVec(( + loadedaddressindex, + emptyaddressdata, + )) => { + self.addresses_data.loaded.delete(loadedaddressindex); - Ok(()) - } - WithAddressDataSource::FromEmptyAddressDataVec(( - emptyaddressindex, - emptyaddressdata, - )) => self - .emptyaddressindex_to_emptyaddressdata - .update(emptyaddressindex, emptyaddressdata) - .map_err(|e| e.into()), - WithAddressDataSource::FromLoadedAddressDataVec(( - loadedaddressindex, - emptyaddressdata, - )) => { - self.loadedaddressindex_to_loadedaddressdata - .delete(loadedaddressindex); + let emptyaddressindex = self + .addresses_data + .empty + .fill_first_hole_or_push(emptyaddressdata)?; - let emptyaddressindex = self - .emptyaddressindex_to_emptyaddressdata - .fill_first_hole_or_push(emptyaddressdata)?; + let anyaddressindex = emptyaddressindex.into(); - let anyaddressindex = emptyaddressindex.into(); + addresstype_to_typeindex_to_new_or_updated_anyaddressindex + .get_mut(address_type) + .unwrap() + .insert(typeindex, anyaddressindex); + } + } + } + } - addresstype_to_typeindex_to_new_or_updated_anyaddressindex - .get_mut(_type) - .unwrap() - .insert(typeindex, anyaddressindex); + for (address_type, sorted) in + addresstype_to_typeindex_to_loadedaddressdata.into_sorted_iter() + { + for (typeindex, loadedaddressdata_with_source) in sorted.into_iter() { + match loadedaddressdata_with_source { + WithAddressDataSource::New(loadedaddressdata) => { + let loadedaddressindex = self + .addresses_data + 
.loaded + .fill_first_hole_or_push(loadedaddressdata)?; - Ok(()) - } - } - }, - ) - })?; + let anyaddressindex = AnyAddressIndex::from(loadedaddressindex); - addresstype_to_typeindex_to_loadedaddressdata - .unwrap() - .into_iter_typed() - .try_for_each(|(_type, tree)| -> Result<()> { - tree.into_iter().try_for_each( - |(typeindex, loadedaddressdata_with_source)| -> Result<()> { - match loadedaddressdata_with_source { - WithAddressDataSource::New(loadedaddressdata) => { - let loadedaddressindex = self - .loadedaddressindex_to_loadedaddressdata - .fill_first_hole_or_push(loadedaddressdata)?; + addresstype_to_typeindex_to_new_or_updated_anyaddressindex + .get_mut(address_type) + .unwrap() + .insert(typeindex, anyaddressindex); + } + WithAddressDataSource::FromLoadedAddressDataVec(( + loadedaddressindex, + loadedaddressdata, + )) => self + .addresses_data + .loaded + .update(loadedaddressindex, loadedaddressdata)?, + WithAddressDataSource::FromEmptyAddressDataVec(( + emptyaddressindex, + loadedaddressdata, + )) => { + self.addresses_data.empty.delete(emptyaddressindex); - let anyaddressindex = AnyAddressIndex::from(loadedaddressindex); + let loadedaddressindex = self + .addresses_data + .loaded + .fill_first_hole_or_push(loadedaddressdata)?; - addresstype_to_typeindex_to_new_or_updated_anyaddressindex - .get_mut(_type) - .unwrap() - .insert(typeindex, anyaddressindex); + let anyaddressindex = loadedaddressindex.into(); - Ok(()) - } - WithAddressDataSource::FromLoadedAddressDataVec(( - loadedaddressindex, - loadedaddressdata, - )) => self - .loadedaddressindex_to_loadedaddressdata - .update(loadedaddressindex, loadedaddressdata) - .map_err(|e| e.into()), - WithAddressDataSource::FromEmptyAddressDataVec(( - emptyaddressindex, - loadedaddressdata, - )) => { - self.emptyaddressindex_to_emptyaddressdata - .delete(emptyaddressindex); + addresstype_to_typeindex_to_new_or_updated_anyaddressindex + .get_mut(address_type) + .unwrap() + .insert(typeindex, anyaddressindex); + } + 
} + } + } - let loadedaddressindex = self - .loadedaddressindex_to_loadedaddressdata - .fill_first_hole_or_push(loadedaddressdata)?; - - let anyaddressindex = loadedaddressindex.into(); - - addresstype_to_typeindex_to_new_or_updated_anyaddressindex - .get_mut(_type) - .unwrap() - .insert(typeindex, anyaddressindex); - - Ok(()) - } - } - }, - ) - })?; - - addresstype_to_typeindex_to_new_or_updated_anyaddressindex - .unwrap() - .into_iter_typed() - .try_for_each(|(_type, tree)| -> Result<()> { - tree.into_iter() - .try_for_each(|(typeindex, anyaddressindex)| -> Result<()> { - match _type { - OutputType::P2PK33 => self - .p2pk33addressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2PK65 => self - .p2pk65addressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2PKH => self - .p2pkhaddressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2SH => self - .p2shaddressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2TR => self - .p2traddressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2WPKH => self - .p2wpkhaddressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2WSH => self - .p2wshaddressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - OutputType::P2A => self - .p2aaddressindex_to_anyaddressindex - .update_or_push(typeindex.into(), anyaddressindex), - _ => unreachable!(), - }?; - Ok(()) - }) - })?; + for (address_type, sorted) in + addresstype_to_typeindex_to_new_or_updated_anyaddressindex.into_sorted_iter() + { + for (typeindex, anyaddressindex) in sorted { + self.any_address_indexes.update_or_push( + address_type, + typeindex, + anyaddressindex, + )?; + } + } let stamp = Stamp::from(height); - self.p2pk33addressindex_to_anyaddressindex + self.any_address_indexes 
.stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2pk65addressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2pkhaddressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2shaddressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2traddressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2wpkhaddressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2wshaddressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.p2aaddressindex_to_anyaddressindex - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.loadedaddressindex_to_loadedaddressdata - .stamped_flush_maybe_with_changes(stamp, with_changes)?; - self.emptyaddressindex_to_emptyaddressdata + self.addresses_data .stamped_flush_maybe_with_changes(stamp, with_changes)?; self.chain_state.truncate_if_needed(Height::ZERO)?; @@ -1851,16 +1687,16 @@ impl AddressTypeToVec<(TypeIndex, Sats)> { fn process_received( self, vecs: &mut address_cohorts::Vecs, - addresstype_to_typeindex_to_loadedaddressdata: &mut AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_loadedaddressdata: &mut AddressTypeToTypeIndexMap< WithAddressDataSource, >, - addresstype_to_typeindex_to_emptyaddressdata: &mut AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_emptyaddressdata: &mut AddressTypeToTypeIndexMap< WithAddressDataSource, >, price: Option, addresstype_to_addr_count: &mut ByAddressType, addresstype_to_empty_addr_count: &mut ByAddressType, - stored_or_new_addresstype_to_typeindex_to_addressdatawithsource: &mut AddressTypeToTypeIndexTree< + stored_or_new_addresstype_to_typeindex_to_addressdatawithsource: &mut AddressTypeToTypeIndexMap< WithAddressDataSource, >, ) { @@ -1885,10 +1721,7 @@ impl AddressTypeToVec<(TypeIndex, Sats)> { .unwrap_or_else(|| { let 
addressdata = stored_or_new_addresstype_to_typeindex_to_addressdatawithsource - .get_mut(_type) - .unwrap() - .remove(&type_index) - .unwrap(); + .remove_for_type(_type, &type_index); is_new = addressdata.is_new(); from_any_empty = addressdata.is_from_emptyaddressdata(); addressdata @@ -1951,10 +1784,10 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { fn process_sent( self, vecs: &mut address_cohorts::Vecs, - addresstype_to_typeindex_to_loadedaddressdata: &mut AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_loadedaddressdata: &mut AddressTypeToTypeIndexMap< WithAddressDataSource, >, - addresstype_to_typeindex_to_emptyaddressdata: &mut AddressTypeToTypeIndexTree< + addresstype_to_typeindex_to_emptyaddressdata: &mut AddressTypeToTypeIndexMap< WithAddressDataSource, >, price: Option, @@ -1964,7 +1797,7 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { height_to_timestamp_fixed_vec: &[Timestamp], height: Height, timestamp: Timestamp, - stored_or_new_addresstype_to_typeindex_to_addressdatawithsource: &mut AddressTypeToTypeIndexTree< + stored_or_new_addresstype_to_typeindex_to_addressdatawithsource: &mut AddressTypeToTypeIndexMap< WithAddressDataSource, >, ) -> Result<()> { @@ -1989,18 +1822,13 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { v.unwrap().into_iter_typed().try_for_each(|(_type, vec)| { vec.into_iter().try_for_each(|(type_index, value)| { let typeindex_to_loadedaddressdata = - addresstype_to_typeindex_to_loadedaddressdata - .get_mut(_type) - .unwrap(); + addresstype_to_typeindex_to_loadedaddressdata.get_mut_unwrap(_type); let addressdata_withsource = typeindex_to_loadedaddressdata .entry(type_index) .or_insert_with(|| { stored_or_new_addresstype_to_typeindex_to_addressdatawithsource - .get_mut(_type) - .unwrap() - .remove(&type_index) - .unwrap() + .remove_for_type(_type, &type_index) }); let addressdata = addressdata_withsource.deref_mut(); @@ -2073,3 +1901,206 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> { }) } } + 
+#[derive(Clone, Traversable)] +pub struct AnyAddressIndexes { + pub p2pk33: RawVec, + pub p2pk65: RawVec, + pub p2pkh: RawVec, + pub p2sh: RawVec, + pub p2tr: RawVec, + pub p2wpkh: RawVec, + pub p2wsh: RawVec, + pub p2a: RawVec, +} + +impl AnyAddressIndexes { + fn min_stamped_height(&self) -> Height { + Height::from(self.p2pk33.stamp()) + .incremented() + .min(Height::from(self.p2pk65.stamp()).incremented()) + .min(Height::from(self.p2pkh.stamp()).incremented()) + .min(Height::from(self.p2sh.stamp()).incremented()) + .min(Height::from(self.p2tr.stamp()).incremented()) + .min(Height::from(self.p2wpkh.stamp()).incremented()) + .min(Height::from(self.p2wsh.stamp()).incremented()) + .min(Height::from(self.p2a.stamp()).incremented()) + } + + fn rollback_before(&mut self, stamp: Stamp) -> Result<[Stamp; 8]> { + Ok([ + self.p2pk33.rollback_before(stamp)?, + self.p2pk65.rollback_before(stamp)?, + self.p2pkh.rollback_before(stamp)?, + self.p2sh.rollback_before(stamp)?, + self.p2tr.rollback_before(stamp)?, + self.p2wpkh.rollback_before(stamp)?, + self.p2wsh.rollback_before(stamp)?, + self.p2a.rollback_before(stamp)?, + ]) + } + + fn reset(&mut self) -> Result<()> { + self.p2pk33.reset()?; + self.p2pk65.reset()?; + self.p2pkh.reset()?; + self.p2sh.reset()?; + self.p2tr.reset()?; + self.p2wpkh.reset()?; + self.p2wsh.reset()?; + self.p2a.reset()?; + Ok(()) + } + + fn get_anyaddressindex( + &self, + address_type: OutputType, + typeindex: TypeIndex, + reader: &Reader<'static>, + ) -> AnyAddressIndex { + let result = match address_type { + OutputType::P2PK33 => self.p2pk33.get_any_or_read(typeindex.into(), reader), + OutputType::P2PK65 => self.p2pk65.get_any_or_read(typeindex.into(), reader), + OutputType::P2PKH => self.p2pkh.get_any_or_read(typeindex.into(), reader), + OutputType::P2SH => self.p2sh.get_any_or_read(typeindex.into(), reader), + OutputType::P2TR => self.p2tr.get_any_or_read(typeindex.into(), reader), + OutputType::P2WPKH => 
self.p2wpkh.get_any_or_read(typeindex.into(), reader), + OutputType::P2WSH => self.p2wsh.get_any_or_read(typeindex.into(), reader), + OutputType::P2A => self.p2a.get_any_or_read(typeindex.into(), reader), + _ => unreachable!(), + }; + result.unwrap().unwrap().into_owned() + } + + fn update_or_push( + &mut self, + address_type: OutputType, + typeindex: TypeIndex, + anyaddressindex: AnyAddressIndex, + ) -> Result<()> { + (match address_type { + OutputType::P2PK33 => self + .p2pk33 + .update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2PK65 => self + .p2pk65 + .update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2PKH => self.p2pkh.update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2SH => self.p2sh.update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2TR => self.p2tr.update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2WPKH => self + .p2wpkh + .update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2WSH => self.p2wsh.update_or_push(typeindex.into(), anyaddressindex), + OutputType::P2A => self.p2a.update_or_push(typeindex.into(), anyaddressindex), + _ => unreachable!(), + })?; + Ok(()) + } + + fn stamped_flush_maybe_with_changes(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> { + self.p2pk33 + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2pk65 + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2pkh + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2sh + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2tr + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2wpkh + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2wsh + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.p2a + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + Ok(()) + } +} + +#[derive(Clone, Traversable)] +pub struct AddressesData { + pub loaded: RawVec, + pub empty: RawVec, +} + +impl 
AddressesData { + fn min_stamped_height(&self) -> Height { + Height::from(self.loaded.stamp()) + .incremented() + .min(Height::from(self.empty.stamp()).incremented()) + } + + fn rollback_before(&mut self, stamp: Stamp) -> Result<[Stamp; 2]> { + Ok([ + self.loaded.rollback_before(stamp)?, + self.empty.rollback_before(stamp)?, + ]) + } + + fn reset(&mut self) -> Result<()> { + self.loaded.reset()?; + self.empty.reset()?; + Ok(()) + } + + fn stamped_flush_maybe_with_changes(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> { + self.loaded + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + self.empty + .stamped_flush_maybe_with_changes(stamp, with_changes)?; + Ok(()) + } +} + +struct IndexerReaders<'a> { + txinindex_to_outpoint: Reader<'a>, + txindex_to_first_txoutindex: Reader<'a>, + txoutindex_to_value: Reader<'a>, + txoutindex_to_outputtype: Reader<'a>, + txoutindex_to_typeindex: Reader<'a>, +} + +impl<'a> IndexerReaders<'a> { + fn new(indexer: &'a Indexer) -> Self { + Self { + txinindex_to_outpoint: indexer.vecs.txinindex_to_outpoint.create_reader(), + txindex_to_first_txoutindex: indexer.vecs.txindex_to_first_txoutindex.create_reader(), + txoutindex_to_value: indexer.vecs.txoutindex_to_value.create_reader(), + txoutindex_to_outputtype: indexer.vecs.txoutindex_to_outputtype.create_reader(), + txoutindex_to_typeindex: indexer.vecs.txoutindex_to_typeindex.create_reader(), + } + } +} + +struct VecsReaders { + addresstypeindex_to_anyaddressindex: ByAddressType>, + anyaddressindex_to_anyaddressdata: ByAnyAddress>, +} + +impl VecsReaders { + fn new(vecs: &Vecs) -> Self { + Self { + addresstypeindex_to_anyaddressindex: ByAddressType { + p2pk33: vecs.any_address_indexes.p2pk33.create_static_reader(), + p2pk65: vecs.any_address_indexes.p2pk65.create_static_reader(), + p2pkh: vecs.any_address_indexes.p2pkh.create_static_reader(), + p2sh: vecs.any_address_indexes.p2sh.create_static_reader(), + p2tr: vecs.any_address_indexes.p2tr.create_static_reader(), + 
p2wpkh: vecs.any_address_indexes.p2wpkh.create_static_reader(), + p2wsh: vecs.any_address_indexes.p2wsh.create_static_reader(), + p2a: vecs.any_address_indexes.p2a.create_static_reader(), + }, + anyaddressindex_to_anyaddressdata: ByAnyAddress { + loaded: vecs.addresses_data.loaded.create_static_reader(), + empty: vecs.addresses_data.empty.create_static_reader(), + }, + } + } + + fn get_anyaddressindex_reader(&self, address_type: OutputType) -> &Reader<'static> { + self.addresstypeindex_to_anyaddressindex + .get_unwrap(address_type) + } +} diff --git a/crates/brk_computer/src/stateful/utxo_cohorts.rs b/crates/brk_computer/src/stateful/utxo_cohorts.rs index cdcfb69e0..4645d19a9 100644 --- a/crates/brk_computer/src/stateful/utxo_cohorts.rs +++ b/crates/brk_computer/src/stateful/utxo_cohorts.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, ops::ControlFlow, path::Path}; +use std::{ops::ControlFlow, path::Path}; use brk_error::Result; use brk_grouper::{ @@ -10,6 +10,7 @@ use brk_types::{ Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, Timestamp, Version, }; use derive_deref::{Deref, DerefMut}; +use rustc_hash::FxHashMap; use vecdb::{AnyIterableVec, Database, Exit, Format, StoredIndex}; use crate::{ @@ -1498,7 +1499,7 @@ impl Vecs { pub fn send( &mut self, - height_to_sent: BTreeMap, + height_to_sent: FxHashMap, chain_state: &mut [BlockState], ) { let mut time_based_vecs = self diff --git a/crates/brk_grouper/src/address.rs b/crates/brk_grouper/src/address.rs index 5b8145300..192c47029 100644 --- a/crates/brk_grouper/src/address.rs +++ b/crates/brk_grouper/src/address.rs @@ -1,4 +1,5 @@ use brk_traversable::Traversable; +use rayon::prelude::*; use crate::Filtered; @@ -23,6 +24,13 @@ impl AddressGroups { self.amount_range.iter_mut() } + pub fn par_iter_separate_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + self.amount_range.par_iter_mut() + } + pub fn iter_overlapping_mut(&mut self) -> impl Iterator { 
self.lt_amount.iter_mut().chain(self.ge_amount.iter_mut()) } diff --git a/crates/brk_grouper/src/by_address_type.rs b/crates/brk_grouper/src/by_address_type.rs index 3cd04321e..b17f1856f 100644 --- a/crates/brk_grouper/src/by_address_type.rs +++ b/crates/brk_grouper/src/by_address_type.rs @@ -62,10 +62,12 @@ impl ByAddressType { }) } + #[inline] pub fn get_unwrap(&self, address_type: OutputType) -> &T { self.get(address_type).unwrap() } + #[inline] pub fn get(&self, address_type: OutputType) -> Option<&T> { match address_type { OutputType::P2PK65 => Some(&self.p2pk65), @@ -80,10 +82,12 @@ impl ByAddressType { } } + #[inline] pub fn get_mut_unwrap(&mut self, address_type: OutputType) -> &mut T { self.get_mut(address_type).unwrap() } + #[inline] pub fn get_mut(&mut self, address_type: OutputType) -> Option<&mut T> { match address_type { OutputType::P2PK65 => Some(&mut self.p2pk65), @@ -98,6 +102,7 @@ impl ByAddressType { } } + #[inline] pub fn iter(&self) -> impl Iterator { [ &self.p2pk65, @@ -112,6 +117,7 @@ impl ByAddressType { .into_iter() } + #[inline] pub fn iter_mut(&mut self) -> impl Iterator { [ &mut self.p2pk65, @@ -126,6 +132,7 @@ impl ByAddressType { .into_iter() } + #[inline] pub fn par_iter(&mut self) -> impl ParallelIterator where T: Send + Sync, @@ -143,6 +150,7 @@ impl ByAddressType { .into_par_iter() } + #[inline] pub fn par_iter_mut(&mut self) -> impl ParallelIterator where T: Send + Sync, @@ -160,6 +168,7 @@ impl ByAddressType { .into_par_iter() } + #[inline] pub fn iter_typed(&self) -> impl Iterator { [ (OutputType::P2PK65, &self.p2pk65), @@ -174,6 +183,7 @@ impl ByAddressType { .into_iter() } + #[inline] pub fn into_iter_typed(self) -> impl Iterator { [ (OutputType::P2PK65, self.p2pk65), @@ -188,6 +198,7 @@ impl ByAddressType { .into_iter() } + #[inline] pub fn iter_typed_mut(&mut self) -> impl Iterator { [ (OutputType::P2PK65, &mut self.p2pk65), diff --git a/crates/brk_grouper/src/by_age_range.rs b/crates/brk_grouper/src/by_age_range.rs index 
d3b1fb069..28fca5f30 100644 --- a/crates/brk_grouper/src/by_age_range.rs +++ b/crates/brk_grouper/src/by_age_range.rs @@ -1,4 +1,5 @@ use brk_traversable::Traversable; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use crate::Filtered; @@ -107,6 +108,35 @@ impl ByAgeRange { ] .into_iter() } + + pub fn par_iter_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + [ + &mut self.up_to_1d, + &mut self._1d_to_1w, + &mut self._1w_to_1m, + &mut self._1m_to_2m, + &mut self._2m_to_3m, + &mut self._3m_to_4m, + &mut self._4m_to_5m, + &mut self._5m_to_6m, + &mut self._6m_to_1y, + &mut self._1y_to_2y, + &mut self._2y_to_3y, + &mut self._3y_to_4y, + &mut self._4y_to_5y, + &mut self._5y_to_6y, + &mut self._6y_to_7y, + &mut self._7y_to_8y, + &mut self._8y_to_10y, + &mut self._10y_to_12y, + &mut self._12y_to_15y, + &mut self.from_15y, + ] + .into_par_iter() + } } impl ByAgeRange> { diff --git a/crates/brk_grouper/src/by_amount_range.rs b/crates/brk_grouper/src/by_amount_range.rs index 6edc51ef5..00c007022 100644 --- a/crates/brk_grouper/src/by_amount_range.rs +++ b/crates/brk_grouper/src/by_amount_range.rs @@ -2,6 +2,7 @@ use std::ops::{Add, AddAssign}; use brk_traversable::Traversable; use brk_types::Sats; +use rayon::prelude::*; use super::{Filter, Filtered}; @@ -201,6 +202,30 @@ impl ByAmountRange { ] .into_iter() } + + pub fn par_iter_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + [ + &mut self._0sats, + &mut self._1sat_to_10sats, + &mut self._10sats_to_100sats, + &mut self._100sats_to_1k_sats, + &mut self._1k_sats_to_10k_sats, + &mut self._10k_sats_to_100k_sats, + &mut self._100k_sats_to_1m_sats, + &mut self._1m_sats_to_10m_sats, + &mut self._10m_sats_to_1btc, + &mut self._1btc_to_10btc, + &mut self._10btc_to_100btc, + &mut self._100btc_to_1k_btc, + &mut self._1k_btc_to_10k_btc, + &mut self._10k_btc_to_100k_btc, + &mut self._100k_btc_or_more, + ] + .into_par_iter() + } } impl ByAmountRange> { diff --git 
a/crates/brk_grouper/src/by_epoch.rs b/crates/brk_grouper/src/by_epoch.rs index 090feba5d..3e3cadcbe 100644 --- a/crates/brk_grouper/src/by_epoch.rs +++ b/crates/brk_grouper/src/by_epoch.rs @@ -1,5 +1,6 @@ use brk_traversable::Traversable; use brk_types::{HalvingEpoch, Height}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::{Filter, Filtered}; @@ -36,6 +37,20 @@ impl ByEpoch { .into_iter() } + pub fn par_iter_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + [ + &mut self._0, + &mut self._1, + &mut self._2, + &mut self._3, + &mut self._4, + ] + .into_par_iter() + } + pub fn mut_vec_from_height(&mut self, height: Height) -> &mut T { let epoch = HalvingEpoch::from(height); if epoch == HalvingEpoch::new(0) { diff --git a/crates/brk_grouper/src/by_spendable_type.rs b/crates/brk_grouper/src/by_spendable_type.rs index 156ea0859..7a211c406 100644 --- a/crates/brk_grouper/src/by_spendable_type.rs +++ b/crates/brk_grouper/src/by_spendable_type.rs @@ -2,6 +2,7 @@ use std::ops::{Add, AddAssign}; use brk_traversable::Traversable; use brk_types::OutputType; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use super::{Filter, Filtered}; @@ -55,6 +56,26 @@ impl BySpendableType { .into_iter() } + pub fn par_iter_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + [ + &mut self.p2pk65, + &mut self.p2pk33, + &mut self.p2pkh, + &mut self.p2ms, + &mut self.p2sh, + &mut self.p2wpkh, + &mut self.p2wsh, + &mut self.p2tr, + &mut self.p2a, + &mut self.unknown, + &mut self.empty, + ] + .into_par_iter() + } + pub fn iter_typed(&self) -> impl Iterator { [ (OutputType::P2PK65, &self.p2pk65), diff --git a/crates/brk_grouper/src/utxo.rs b/crates/brk_grouper/src/utxo.rs index 0684f0575..d13208b6b 100644 --- a/crates/brk_grouper/src/utxo.rs +++ b/crates/brk_grouper/src/utxo.rs @@ -1,4 +1,5 @@ use brk_traversable::Traversable; +use rayon::prelude::*; use crate::{ ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, 
ByLowerThanAmount, ByMaxAge, ByMinAge, @@ -42,6 +43,17 @@ impl UTXOGroups { .chain(self._type.iter_mut()) } + pub fn par_iter_separate_mut(&mut self) -> impl ParallelIterator + where + T: Send + Sync, + { + self.age_range + .par_iter_mut() + .chain(self.epoch.par_iter_mut()) + .chain(self.amount_range.par_iter_mut()) + .chain(self._type.par_iter_mut()) + } + pub fn iter_overlapping_mut(&mut self) -> impl Iterator { [&mut self.all] .into_iter() diff --git a/crates/brk_indexer/BENCH.md b/crates/brk_indexer/BENCH.md new file mode 100644 index 000000000..ee193329c --- /dev/null +++ b/crates/brk_indexer/BENCH.md @@ -0,0 +1,15 @@ +## Bench + +MBP M3 Pro + +0..920750 +Time: 13h 6mn + +RAM: +Peak: ~12GB +After finish: ~8GB +After restart ~6.5GB + +storage: 230GB + +mode: checked diff --git a/crates/brk_indexer/examples/indexer_single_vs_multi.rs b/crates/brk_indexer/examples/indexer_single_vs_multi.rs new file mode 100644 index 000000000..84107ea9f --- /dev/null +++ b/crates/brk_indexer/examples/indexer_single_vs_multi.rs @@ -0,0 +1,772 @@ +use std::{fs, path::Path, thread, time::Instant}; + +use brk_error::Result; +use brk_indexer::Indexer; +use brk_types::TxInIndex; +use rayon::prelude::*; +use vecdb::{GenericStoredVec, StoredIndex}; + +fn main() -> Result<()> { + brk_logger::init(Some(Path::new(".log")))?; + + let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk"); + fs::create_dir_all(&outputs_dir)?; + + let indexer = Indexer::forced_import(&outputs_dir)?; + let vecs = indexer.vecs; + + let output_len = vecs.txoutindex_to_value.len_(); + let input_len = vecs.txinindex_to_outpoint.len_(); + dbg!(output_len, input_len); + + // Simulate processing blocks + const NUM_BLOCKS: usize = 10_000; + const OUTPUTS_PER_BLOCK: usize = 5_000; + const INPUTS_PER_BLOCK: usize = 5_000; + const OUTPUT_START_OFFSET: usize = 2_000_000_000; + const INPUT_START_OFFSET: usize = 2_000_000_000; + const NUM_RUNS: usize = 3; + + println!( + "\n=== Running {} iterations of {} 
blocks ===", + NUM_RUNS, NUM_BLOCKS + ); + println!(" {} outputs per block", OUTPUTS_PER_BLOCK); + println!(" {} inputs per block\n", INPUTS_PER_BLOCK); + + // Store all run times + let mut method1_times = Vec::new(); + let mut method2_times = Vec::new(); + let mut method4_times = Vec::new(); + let mut method5_times = Vec::new(); + let mut method6_times = Vec::new(); + let mut method7_times = Vec::new(); + let mut method8_times = Vec::new(); + + for run in 0..NUM_RUNS { + println!("--- Run {}/{} ---", run + 1, NUM_RUNS); + + // Randomize order for this run + let order = match run % 4 { + 0 => vec![1, 2, 4, 5, 6, 7, 8], + 1 => vec![8, 7, 6, 5, 4, 2, 1], + 2 => vec![2, 5, 8, 1, 7, 4, 6], + _ => vec![6, 4, 7, 1, 8, 5, 2], + }; + + let mut run_times = [std::time::Duration::ZERO; 7]; + + for &method in &order { + match method { + 1 => { + let time = run_method1( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[0] = time; + } + 2 => { + let time = run_method2( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[1] = time; + } + 4 => { + let time = run_method4( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[2] = time; + } + 5 => { + let time = run_method5( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[3] = time; + } + 6 => { + let time = run_method6( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[4] = time; + } + 7 => { + let time = run_method7( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[5] = time; + } + 8 => { + let time = run_method8( + &vecs, + NUM_BLOCKS, + OUTPUTS_PER_BLOCK, + INPUTS_PER_BLOCK, + 
OUTPUT_START_OFFSET, + INPUT_START_OFFSET, + ); + run_times[6] = time; + } + _ => unreachable!(), + } + } + + method1_times.push(run_times[0]); + method2_times.push(run_times[1]); + method4_times.push(run_times[2]); + method5_times.push(run_times[3]); + method6_times.push(run_times[4]); + method7_times.push(run_times[5]); + method8_times.push(run_times[6]); + + println!(" Method 1: {:?}", run_times[0]); + println!(" Method 2: {:?}", run_times[1]); + println!(" Method 4: {:?}", run_times[2]); + println!(" Method 5: {:?}", run_times[3]); + println!(" Method 6: {:?}", run_times[4]); + println!(" Method 7: {:?}", run_times[5]); + println!(" Method 8: {:?}", run_times[6]); + println!(); + } + + // Calculate statistics + println!("\n=== Statistics over {} runs ===\n", NUM_RUNS); + + let methods = vec![ + ("Method 1 (Parallel Interleaved)", &method1_times), + ( + "Method 2 (Sequential Read + Parallel Process)", + &method2_times, + ), + ( + "Method 4 (Parallel Sequential Reads + Parallel Process)", + &method4_times, + ), + ("Method 5 (Chunked Parallel)", &method5_times), + ("Method 6 (Prefetch)", &method6_times), + ("Method 7 (Reuse Readers)", &method7_times), + ("Method 8 (Bulk Processing)", &method8_times), + ]; + + for (name, times) in &methods { + let avg = times.iter().sum::() / times.len() as u32; + let min = times.iter().min().unwrap(); + let max = times.iter().max().unwrap(); + + println!("{}:", name); + println!(" Average: {:?}", avg); + println!(" Min: {:?}", min); + println!(" Max: {:?}", max); + println!(" Std dev: {:?}", calculate_stddev(times)); + println!(); + } + + // Find overall winner based on average + let averages: Vec<_> = methods + .iter() + .map(|(name, times)| { + let avg = times.iter().sum::() / times.len() as u32; + (*name, avg) + }) + .collect(); + + let fastest = averages.iter().min_by_key(|(_, t)| t).unwrap(); + println!( + "=== Winner (by average): {} - {:?} ===\n", + fastest.0, fastest.1 + ); + + for (name, time) in &averages { + if time != 
&fastest.1 { + let diff = time.as_secs_f64() / fastest.1.as_secs_f64(); + println!("{} is {:.2}x slower", name, diff); + } + } + + Ok(()) +} + +fn run_method1( + vecs: &brk_indexer::Vecs, + num_blocks: usize, + outputs_per_block: usize, + inputs_per_block: usize, + output_start_offset: usize, + input_start_offset: usize, +) -> std::time::Duration { + let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); + let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader(); + let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader(); + let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader(); + let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); + + let start_time = Instant::now(); + + for block_idx in 0..num_blocks { + // Process outputs + let block_start = output_start_offset + (block_idx * outputs_per_block); + + let _outputs: Vec<_> = (block_start..(block_start + outputs_per_block)) + .into_par_iter() + .map(|i| { + ( + vecs.txoutindex_to_value + .unwrap_read_(i, &txoutindex_to_value_reader), + vecs.txoutindex_to_outputtype + .unwrap_read_(i, &txoutindex_to_outputtype_reader), + vecs.txoutindex_to_typeindex + .unwrap_read_(i, &txoutindex_to_typeindex_reader), + ) + }) + .collect(); + + // Process inputs + let input_block_start = input_start_offset + (block_idx * inputs_per_block); + + let input_sum: u64 = (input_block_start..(input_block_start + inputs_per_block)) + .into_par_iter() + .filter_map(|i| { + let outpoint = vecs + .txinindex_to_outpoint + .unwrap_read_(i, &txinindex_to_outpoint_reader); + + if outpoint.is_coinbase() { + return None; + } + + let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_( + outpoint.txindex().to_usize(), + &txindex_to_first_txoutindex_reader, + ); + let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout()); + let value = vecs + .txoutindex_to_value + .unwrap_read_(txoutindex, 
&txoutindex_to_value_reader); + Some(u64::from(value)) + }) + .sum(); + + std::hint::black_box(input_sum); + } + + start_time.elapsed() +} + +fn run_method2( + vecs: &brk_indexer::Vecs, + num_blocks: usize, + outputs_per_block: usize, + inputs_per_block: usize, + output_start_offset: usize, + input_start_offset: usize, +) -> std::time::Duration { + let start_time = Instant::now(); + + for block_idx in 0..num_blocks { + // Process outputs + let block_start = brk_types::TxOutIndex::new( + (output_start_offset + (block_idx * outputs_per_block)) as u64, + ); + + let values: Vec<_> = vecs + .txoutindex_to_value + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let output_types: Vec<_> = vecs + .txoutindex_to_outputtype + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let typeindexes: Vec<_> = vecs + .txoutindex_to_typeindex + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let _outputs: Vec<_> = (0..outputs_per_block) + .into_par_iter() + .map(|i| (values[i], output_types[i], typeindexes[i])) + .collect(); + + // Process inputs + let input_block_start = + TxInIndex::new((input_start_offset + (block_idx * inputs_per_block)) as u64); + + let outpoints: Vec<_> = vecs + .txinindex_to_outpoint + .iter_at(input_block_start) + .take(inputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); + let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); + + let input_sum: u64 = (0..outpoints.len()) + .into_par_iter() + .filter_map(|i| { + let outpoint = outpoints[i]; + + if outpoint.is_coinbase() { + return None; + } + + let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_( + outpoint.txindex().to_usize(), + &txindex_to_first_txoutindex_reader, + ); + let txoutindex = first_txoutindex.to_usize() + 
usize::from(outpoint.vout()); + let value = vecs + .txoutindex_to_value + .unwrap_read_(txoutindex, &txoutindex_to_value_reader); + Some(u64::from(value)) + }) + .sum(); + + std::hint::black_box(input_sum); + } + + start_time.elapsed() +} + +fn run_method4( + vecs: &brk_indexer::Vecs, + num_blocks: usize, + outputs_per_block: usize, + inputs_per_block: usize, + output_start_offset: usize, + input_start_offset: usize, +) -> std::time::Duration { + let start_time = Instant::now(); + + for block_idx in 0..num_blocks { + // Process outputs with parallel reads + let block_start = brk_types::TxOutIndex::new( + (output_start_offset + (block_idx * outputs_per_block)) as u64, + ); + + let (values, output_types, typeindexes) = thread::scope(|s| { + let h1 = s.spawn(|| { + vecs.txoutindex_to_value + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect::>() + }); + + let h2 = s.spawn(|| { + vecs.txoutindex_to_outputtype + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect::>() + }); + + let h3 = s.spawn(|| { + vecs.txoutindex_to_typeindex + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect::>() + }); + + (h1.join().unwrap(), h2.join().unwrap(), h3.join().unwrap()) + }); + + let _outputs: Vec<_> = (0..outputs_per_block) + .into_par_iter() + .map(|i| (values[i], output_types[i], typeindexes[i])) + .collect(); + + // Process inputs + let input_block_start = + TxInIndex::new((input_start_offset + (block_idx * inputs_per_block)) as u64); + + let outpoints: Vec<_> = vecs + .txinindex_to_outpoint + .iter_at(input_block_start) + .take(inputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); + let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); + + let input_sum: u64 = (0..outpoints.len()) + .into_par_iter() + .filter_map(|i| { + let outpoint 
= outpoints[i]; + + if outpoint.is_coinbase() { + return None; + } + + let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_( + outpoint.txindex().to_usize(), + &txindex_to_first_txoutindex_reader, + ); + let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout()); + let value = vecs + .txoutindex_to_value + .unwrap_read_(txoutindex, &txoutindex_to_value_reader); + Some(u64::from(value)) + }) + .sum(); + + std::hint::black_box(input_sum); + } + + start_time.elapsed() +} + +fn run_method5( + vecs: &brk_indexer::Vecs, + num_blocks: usize, + outputs_per_block: usize, + inputs_per_block: usize, + output_start_offset: usize, + input_start_offset: usize, +) -> std::time::Duration { + let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); + let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader(); + let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader(); + let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader(); + let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); + + let start_time = Instant::now(); + + for block_idx in 0..num_blocks { + // Process outputs with larger chunks + let block_start = output_start_offset + (block_idx * outputs_per_block); + + let _outputs: Vec<_> = (block_start..(block_start + outputs_per_block)) + .into_par_iter() + .with_min_len(500) // Larger chunks + .map(|i| { + ( + vecs.txoutindex_to_value + .unwrap_read_(i, &txoutindex_to_value_reader), + vecs.txoutindex_to_outputtype + .unwrap_read_(i, &txoutindex_to_outputtype_reader), + vecs.txoutindex_to_typeindex + .unwrap_read_(i, &txoutindex_to_typeindex_reader), + ) + }) + .collect(); + + // Process inputs with larger chunks + let input_block_start = input_start_offset + (block_idx * inputs_per_block); + + let input_sum: u64 = (input_block_start..(input_block_start + inputs_per_block)) + .into_par_iter() + .with_min_len(500) // 
Larger chunks + .filter_map(|i| { + let outpoint = vecs + .txinindex_to_outpoint + .unwrap_read_(i, &txinindex_to_outpoint_reader); + + if outpoint.is_coinbase() { + return None; + } + + let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_( + outpoint.txindex().to_usize(), + &txindex_to_first_txoutindex_reader, + ); + let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout()); + let value = vecs + .txoutindex_to_value + .unwrap_read_(txoutindex, &txoutindex_to_value_reader); + Some(u64::from(value)) + }) + .sum(); + + std::hint::black_box(input_sum); + } + + start_time.elapsed() +} + +fn run_method6( + vecs: &brk_indexer::Vecs, + num_blocks: usize, + outputs_per_block: usize, + inputs_per_block: usize, + output_start_offset: usize, + input_start_offset: usize, +) -> std::time::Duration { + let start_time = Instant::now(); + + for block_idx in 0..num_blocks { + // Read outputs sequentially + let block_start = brk_types::TxOutIndex::new( + (output_start_offset + (block_idx * outputs_per_block)) as u64, + ); + + let values: Vec<_> = vecs + .txoutindex_to_value + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let output_types: Vec<_> = vecs + .txoutindex_to_outputtype + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let typeindexes: Vec<_> = vecs + .txoutindex_to_typeindex + .iter_at(block_start) + .take(outputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + // Read inputs sequentially + let input_block_start = + TxInIndex::new((input_start_offset + (block_idx * inputs_per_block)) as u64); + + let outpoints: Vec<_> = vecs + .txinindex_to_outpoint + .iter_at(input_block_start) + .take(inputs_per_block) + .map(|(_, v)| v.into_owned()) + .collect(); + + let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader(); + let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader(); + + 
// Prefetch all first_txoutindexes in parallel
+        let first_txoutindexes: Vec<Option<_>> = outpoints
+            .par_iter()
+            .map(|op| {
+                if op.is_coinbase() {
+                    return None;
+                }
+                Some(
+                    vecs.txindex_to_first_txoutindex
+                        .unwrap_read_(op.txindex().to_usize(), &txindex_to_first_txoutindex_reader),
+                )
+            })
+            .collect();
+
+        // Then read values in parallel
+        let input_sum: u64 = outpoints
+            .par_iter()
+            .zip(first_txoutindexes.par_iter())
+            .filter_map(|(op, first_opt)| {
+                let first_txoutindex = first_opt.as_ref()?;
+                let txoutindex = first_txoutindex.to_usize() + usize::from(op.vout());
+                let value = vecs
+                    .txoutindex_to_value
+                    .unwrap_read_(txoutindex, &txoutindex_to_value_reader);
+                Some(u64::from(value))
+            })
+            .sum();
+
+        let _outputs: Vec<_> = (0..outputs_per_block)
+            .into_par_iter()
+            .map(|i| (values[i], output_types[i], typeindexes[i]))
+            .collect();
+
+        std::hint::black_box(input_sum);
+    }
+
+    start_time.elapsed()
+}
+
+fn run_method7(
+    vecs: &brk_indexer::Vecs,
+    num_blocks: usize,
+    outputs_per_block: usize,
+    inputs_per_block: usize,
+    output_start_offset: usize,
+    input_start_offset: usize,
+) -> std::time::Duration {
+    // Create readers ONCE outside loop
+    let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
+    let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
+    let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
+    let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader();
+    let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
+
+    let start_time = Instant::now();
+
+    for block_idx in 0..num_blocks {
+        let block_start = output_start_offset + (block_idx * outputs_per_block);
+
+        let _outputs: Vec<_> = (block_start..(block_start + outputs_per_block))
+            .into_par_iter()
+            .map(|i| {
+                (
+                    vecs.txoutindex_to_value
+                        .unwrap_read_(i, &txoutindex_to_value_reader),
+                    vecs.txoutindex_to_outputtype
+                        .unwrap_read_(i,
&txoutindex_to_outputtype_reader),
+                    vecs.txoutindex_to_typeindex
+                        .unwrap_read_(i, &txoutindex_to_typeindex_reader),
+                )
+            })
+            .collect();
+
+        let input_block_start = input_start_offset + (block_idx * inputs_per_block);
+
+        let input_sum: u64 = (input_block_start..(input_block_start + inputs_per_block))
+            .into_par_iter()
+            .filter_map(|i| {
+                let outpoint = vecs
+                    .txinindex_to_outpoint
+                    .unwrap_read_(i, &txinindex_to_outpoint_reader);
+
+                if outpoint.is_coinbase() {
+                    return None;
+                }
+
+                let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
+                    outpoint.txindex().to_usize(),
+                    &txindex_to_first_txoutindex_reader,
+                );
+                let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
+                let value = vecs
+                    .txoutindex_to_value
+                    .unwrap_read_(txoutindex, &txoutindex_to_value_reader);
+                Some(u64::from(value))
+            })
+            .sum();
+
+        std::hint::black_box(input_sum);
+    }
+
+    start_time.elapsed()
+}
+
+fn run_method8(
+    vecs: &brk_indexer::Vecs,
+    num_blocks: usize,
+    outputs_per_block: usize,
+    inputs_per_block: usize,
+    output_start_offset: usize,
+    input_start_offset: usize,
+) -> std::time::Duration {
+    let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
+    let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
+    let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
+    let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader();
+    let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
+
+    const BULK_SIZE: usize = 64;
+
+    let start_time = Instant::now();
+
+    for block_idx in 0..num_blocks {
+        let block_start = output_start_offset + (block_idx * outputs_per_block);
+
+        // Process outputs in bulk chunks
+        let _outputs: Vec<_> = (0..outputs_per_block)
+            .collect::<Vec<_>>()
+            .par_chunks(BULK_SIZE)
+            .flat_map(|chunk| {
+                chunk
+                    .iter()
+                    .map(|&offset| {
+                        let i = block_start + offset;
+                        (
+                            vecs.txoutindex_to_value
+                                .unwrap_read_(i, &txoutindex_to_value_reader),
+                            vecs.txoutindex_to_outputtype
+                                .unwrap_read_(i, &txoutindex_to_outputtype_reader),
+                            vecs.txoutindex_to_typeindex
+                                .unwrap_read_(i, &txoutindex_to_typeindex_reader),
+                        )
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+
+        // Process inputs in bulk chunks
+        let input_block_start = input_start_offset + (block_idx * inputs_per_block);
+
+        let input_sum: u64 = (0..inputs_per_block)
+            .collect::<Vec<_>>()
+            .par_chunks(BULK_SIZE)
+            .flat_map(|chunk| {
+                chunk
+                    .iter()
+                    .filter_map(|&offset| {
+                        let i = input_block_start + offset;
+                        let outpoint = vecs
+                            .txinindex_to_outpoint
+                            .unwrap_read_(i, &txinindex_to_outpoint_reader);
+
+                        if outpoint.is_coinbase() {
+                            return None;
+                        }
+
+                        let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
+                            outpoint.txindex().to_usize(),
+                            &txindex_to_first_txoutindex_reader,
+                        );
+                        let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
+                        let value = vecs
+                            .txoutindex_to_value
+                            .unwrap_read_(txoutindex, &txoutindex_to_value_reader);
+                        Some(u64::from(value))
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .sum();
+
+        std::hint::black_box(input_sum);
+    }
+
+    start_time.elapsed()
+}
+
+fn calculate_stddev(times: &[std::time::Duration]) -> std::time::Duration {
+    let avg = times.iter().sum::<std::time::Duration>().as_secs_f64() / times.len() as f64;
+    let variance = times
+        .iter()
+        .map(|t| {
+            let diff = t.as_secs_f64() - avg;
+            diff * diff
+        })
+        .sum::<f64>()
+        / times.len() as f64;
+    std::time::Duration::from_secs_f64(variance.sqrt())
+}
diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs
index ae23698ac..c3e71fbeb 100644
--- a/crates/brk_indexer/src/lib.rs
+++ b/crates/brk_indexer/src/lib.rs
@@ -28,9 +28,9 @@ pub use vecs::*;
 // One version for all data sources
 // Increment on **change _OR_ addition**
-const VERSION: Version = Version::new(22);
+const VERSION: Version = Version::new(23);
 const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
-const COLLISIONS_CHECKED_UP_TO: Height =
Height::new(0); +const COLLISIONS_CHECKED_UP_TO: Height = Height::new(920_000); #[derive(Clone)] pub struct Indexer { @@ -474,94 +474,49 @@ impl Indexer { FxHashMap::default(); let mut same_block_output_info: FxHashMap = FxHashMap::default(); - txouts - .into_iter() - .try_for_each(|data| -> Result<()> { - let ( - txoutindex, - txout, - txindex, - vout, - outputtype, - addressbytes_opt, - typeindex_opt, - ) = data; + for (txoutindex, txout, txindex, vout, outputtype, addressbytes_opt, typeindex_opt) in + txouts + { + let sats = Sats::from(txout.value); - let sats = Sats::from(txout.value); + if vout.is_zero() { + vecs.txindex_to_first_txoutindex + .push_if_needed(txindex, txoutindex)?; + } - if vout.is_zero() { - vecs.txindex_to_first_txoutindex - .push_if_needed(txindex, txoutindex)?; - } + vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?; - vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?; + vecs.txoutindex_to_txindex + .push_if_needed(txoutindex, txindex)?; - vecs.txoutindex_to_txindex - .push_if_needed(txoutindex, txindex)?; + vecs.txoutindex_to_outputtype + .push_if_needed(txoutindex, outputtype)?; - vecs.txoutindex_to_outputtype - .push_if_needed(txoutindex, outputtype)?; - - let typeindex = if let Some(ti) = typeindex_opt { + let typeindex = if let Some(ti) = typeindex_opt { + ti + } else if let Some((address_bytes, address_hash)) = addressbytes_opt { + if let Some(&ti) = already_added_addressbyteshash.get(&address_hash) { ti - } else if let Some((address_bytes, address_hash)) = addressbytes_opt { - if let Some(&ti) = already_added_addressbyteshash.get(&address_hash) { - ti - } else { - let ti = match outputtype { - OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(), - OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(), - OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(), - OutputType::P2MS => { - vecs.p2msoutputindex_to_txindex - .push_if_needed(indexes.p2msoutputindex, 
txindex)?; - indexes.p2msoutputindex.copy_then_increment() - } - OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(), - OutputType::OpReturn => { - vecs.opreturnindex_to_txindex - .push_if_needed(indexes.opreturnindex, txindex)?; - indexes.opreturnindex.copy_then_increment() - } - OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(), - OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(), - OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(), - OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(), - OutputType::Empty => { - vecs.emptyoutputindex_to_txindex - .push_if_needed(indexes.emptyoutputindex, txindex)?; - indexes.emptyoutputindex.copy_then_increment() - } - OutputType::Unknown => { - vecs.unknownoutputindex_to_txindex - .push_if_needed(indexes.unknownoutputindex, txindex)?; - indexes.unknownoutputindex.copy_then_increment() - } - _ => unreachable!(), - }; - - already_added_addressbyteshash.insert(address_hash, ti); - stores.addressbyteshash_to_typeindex.insert_if_needed( - address_hash, - ti, - height, - ); - vecs.push_bytes_if_needed(ti, address_bytes)?; - - ti - } } else { - match outputtype { + let ti = match outputtype { + OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(), + OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(), + OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(), OutputType::P2MS => { vecs.p2msoutputindex_to_txindex .push_if_needed(indexes.p2msoutputindex, txindex)?; indexes.p2msoutputindex.copy_then_increment() } + OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(), OutputType::OpReturn => { vecs.opreturnindex_to_txindex .push_if_needed(indexes.opreturnindex, txindex)?; indexes.opreturnindex.copy_then_increment() } + OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(), + OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(), + OutputType::P2TR => 
indexes.p2traddressindex.copy_then_increment(), + OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(), OutputType::Empty => { vecs.emptyoutputindex_to_txindex .push_if_needed(indexes.emptyoutputindex, txindex)?; @@ -573,217 +528,203 @@ impl Indexer { indexes.unknownoutputindex.copy_then_increment() } _ => unreachable!(), - } - }; + }; - vecs.txoutindex_to_typeindex - .push_if_needed(txoutindex, typeindex)?; + already_added_addressbyteshash.insert(address_hash, ti); + stores.addressbyteshash_to_typeindex.insert_if_needed( + address_hash, + ti, + height, + ); + vecs.push_bytes_if_needed(ti, address_bytes)?; - if outputtype.is_unspendable() { - return Ok(()); - } else if outputtype.is_address() { - stores - .addresstype_to_typeindex_and_txindex - .get_mut(outputtype) - .unwrap() - .insert_if_needed( - TypeIndexAndTxIndex::from((typeindex, txindex)), - Unit, - height, - ); + ti } - - let outpoint = OutPoint::new(txindex, vout); - - if !same_block_spent_outpoints.contains(&outpoint) { - if outputtype.is_address() { - stores - .addresstype_to_typeindex_and_unspentoutpoint - .get_mut(outputtype) - .unwrap() - .insert_if_needed( - TypeIndexAndOutPoint::from((typeindex, outpoint)), - Unit, - height, - ); + } else { + match outputtype { + OutputType::P2MS => { + vecs.p2msoutputindex_to_txindex + .push_if_needed(indexes.p2msoutputindex, txindex)?; + indexes.p2msoutputindex.copy_then_increment() } - } else { - same_block_output_info.insert(outpoint, (outputtype, typeindex)); + OutputType::OpReturn => { + vecs.opreturnindex_to_txindex + .push_if_needed(indexes.opreturnindex, txindex)?; + indexes.opreturnindex.copy_then_increment() + } + OutputType::Empty => { + vecs.emptyoutputindex_to_txindex + .push_if_needed(indexes.emptyoutputindex, txindex)?; + indexes.emptyoutputindex.copy_then_increment() + } + OutputType::Unknown => { + vecs.unknownoutputindex_to_txindex + .push_if_needed(indexes.unknownoutputindex, txindex)?; + 
indexes.unknownoutputindex.copy_then_increment() + } + _ => unreachable!(), } + }; - Ok(()) - })?; + vecs.txoutindex_to_typeindex + .push_if_needed(txoutindex, typeindex)?; + + if outputtype.is_unspendable() { + continue; + } else if outputtype.is_address() { + stores + .addresstype_to_typeindex_and_txindex + .get_mut(outputtype) + .unwrap() + .insert_if_needed( + TypeIndexAndTxIndex::from((typeindex, txindex)), + Unit, + height, + ); + } + + let outpoint = OutPoint::new(txindex, vout); + + if same_block_spent_outpoints.contains(&outpoint) { + same_block_output_info.insert(outpoint, (outputtype, typeindex)); + } else if outputtype.is_address() { + stores + .addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .insert_if_needed( + TypeIndexAndOutPoint::from((typeindex, outpoint)), + Unit, + height, + ); + } + } // println!( // "txouts.into_iter() = : {:?}", // i.elapsed() // ); // let i = Instant::now(); - txins - .into_iter() - .map( - #[allow(clippy::type_complexity)] - |(txinindex, input_source)| -> Result<( - TxInIndex, - Vin, - TxIndex, - OutPoint, - Option<(OutputType, TypeIndex)>, - )> { - if let InputSource::PreviousBlock(( - vin, - txindex, - outpoint, - outputtype_typeindex_opt, - )) = input_source - { - return Ok(( - txinindex, - vin, - txindex, - outpoint, - outputtype_typeindex_opt, - )); - } - - let InputSource::SameBlock((txindex, txin, vin, outpoint)) = input_source - else { - unreachable!() - }; - - let mut tuple = (txinindex, vin, txindex, outpoint, None); - + for (txinindex, input_source) in txins { + let (vin, txindex, outpoint, outputtype_typeindex_opt) = match input_source { + InputSource::PreviousBlock(tuple) => tuple, + InputSource::SameBlock((txindex, txin, vin, outpoint)) => { + let mut tuple = (vin, txindex, outpoint, None); if outpoint.is_coinbase() { - return Ok(tuple); + tuple + } else { + let outputtype_typeindex = same_block_output_info + .remove(&outpoint) + .ok_or(Error::Str("should have found 
addressindex from same block")) + .inspect_err(|_| { + dbg!(&same_block_output_info, txin); + })?; + if outputtype_typeindex.0.is_address() { + tuple.3 = Some(outputtype_typeindex); + } + (tuple.0, tuple.1, tuple.2, tuple.3) } - - let outputtype_typeindex = same_block_output_info - .remove(&outpoint) - .ok_or(Error::Str("should have found addressindex from same block")) - .inspect_err(|_| { - dbg!(&same_block_output_info, txin); - })?; - - if outputtype_typeindex.0.is_address() { - tuple.4 = Some(outputtype_typeindex); - } - - Ok(tuple) - }, - ) - .try_for_each(|res| -> Result<()> { - let (txinindex, vin, txindex, outpoint, outputtype_typeindex_opt) = res?; - - if vin.is_zero() { - vecs.txindex_to_first_txinindex - .push_if_needed(txindex, txinindex)?; } + }; - vecs.txinindex_to_outpoint - .push_if_needed(txinindex, outpoint)?; + if vin.is_zero() { + vecs.txindex_to_first_txinindex + .push_if_needed(txindex, txinindex)?; + } - let Some((outputtype, typeindex)) = outputtype_typeindex_opt else { - return Ok(()); - }; + vecs.txinindex_to_outpoint + .push_if_needed(txinindex, outpoint)?; - stores - .addresstype_to_typeindex_and_txindex - .get_mut_unwrap(outputtype) - .insert_if_needed( - TypeIndexAndTxIndex::from((typeindex, txindex)), - Unit, - height, - ); + let Some((outputtype, typeindex)) = outputtype_typeindex_opt else { + continue; + }; - stores - .addresstype_to_typeindex_and_unspentoutpoint - .get_mut_unwrap(outputtype) - .remove_if_needed( - TypeIndexAndOutPoint::from((typeindex, outpoint)), - height, - ); + stores + .addresstype_to_typeindex_and_txindex + .get_mut_unwrap(outputtype) + .insert_if_needed( + TypeIndexAndTxIndex::from((typeindex, txindex)), + Unit, + height, + ); - Ok(()) - })?; + stores + .addresstype_to_typeindex_and_unspentoutpoint + .get_mut_unwrap(outputtype) + .remove_if_needed(TypeIndexAndOutPoint::from((typeindex, outpoint)), height); + } // println!("txins.into_iter(): {:?}", i.elapsed()); // let i = Instant::now(); if check_collisions 
{ let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter(); - txs.iter() - .try_for_each(|(txindex, _, _, _, prev_txindex_opt)| -> Result<()> { - let Some(prev_txindex) = prev_txindex_opt else { - return Ok(()); - }; + for (txindex, _, _, _, prev_txindex_opt) in txs.iter() { + let Some(prev_txindex) = prev_txindex_opt else { + continue; + }; - // In case if we start at an already parsed height - if txindex == prev_txindex { - return Ok(()); - } + // In case if we start at an already parsed height + if txindex == prev_txindex { + continue; + } - let len = vecs.txindex_to_txid.len(); - // Ok if `get` is not par as should happen only twice - let prev_txid = txindex_to_txid_iter - .get(*prev_txindex) - .ok_or(Error::Str("To have txid for txindex")) - .inspect_err(|_| { - dbg!(txindex, len); - })?; + let len = vecs.txindex_to_txid.len(); + // Ok if `get` is not par as should happen only twice + let prev_txid = txindex_to_txid_iter + .get(*prev_txindex) + .ok_or(Error::Str("To have txid for txindex")) + .inspect_err(|_| { + dbg!(txindex, len); + })?; - let prev_txid = prev_txid.as_ref(); + let prev_txid = prev_txid.as_ref(); - // If another Txid needs to be added to the list - // We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated - let only_known_dup_txids = [ - bitcoin::Txid::from_str( - "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", - ) - .unwrap() - .into(), - bitcoin::Txid::from_str( - "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", - ) - .unwrap() - .into(), - ]; + // If another Txid needs to be added to the list + // We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated + let only_known_dup_txids = [ + bitcoin::Txid::from_str( + "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", + ) + .unwrap() + .into(), + bitcoin::Txid::from_str( + "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", + ) + .unwrap() + 
.into(), + ]; - let is_dup = only_known_dup_txids.contains(prev_txid); + let is_dup = only_known_dup_txids.contains(prev_txid); - if !is_dup { - dbg!(height, txindex, prev_txid, prev_txindex); - return Err(Error::Str("Expect none")); - } - - Ok(()) - })?; + if !is_dup { + dbg!(height, txindex, prev_txid, prev_txindex); + return Err(Error::Str("Expect none")); + } + } } // println!("txindex_to_tx_and_txid = : {:?}", i.elapsed()); // let i = Instant::now(); - txs.into_iter().try_for_each( - |(txindex, tx, txid, txid_prefix, prev_txindex_opt)| -> Result<()> { - if prev_txindex_opt.is_none() { - stores - .txidprefix_to_txindex - .insert_if_needed(txid_prefix, txindex, height); - } + for (txindex, tx, txid, txid_prefix, prev_txindex_opt) in txs { + if prev_txindex_opt.is_none() { + stores + .txidprefix_to_txindex + .insert_if_needed(txid_prefix, txindex, height); + } - vecs.txindex_to_height.push_if_needed(txindex, height)?; - vecs.txindex_to_txversion - .push_if_needed(txindex, tx.version.into())?; - vecs.txindex_to_txid.push_if_needed(txindex, txid)?; - vecs.txindex_to_rawlocktime - .push_if_needed(txindex, tx.lock_time.into())?; - vecs.txindex_to_base_size - .push_if_needed(txindex, tx.base_size().into())?; - vecs.txindex_to_total_size - .push_if_needed(txindex, tx.total_size().into())?; - vecs.txindex_to_is_explicitly_rbf - .push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?; - - Ok(()) - }, - )?; + vecs.txindex_to_height.push_if_needed(txindex, height)?; + vecs.txindex_to_txversion + .push_if_needed(txindex, tx.version.into())?; + vecs.txindex_to_txid.push_if_needed(txindex, txid)?; + vecs.txindex_to_rawlocktime + .push_if_needed(txindex, tx.lock_time.into())?; + vecs.txindex_to_base_size + .push_if_needed(txindex, tx.base_size().into())?; + vecs.txindex_to_total_size + .push_if_needed(txindex, tx.total_size().into())?; + vecs.txindex_to_is_explicitly_rbf + .push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?; + } // 
println!("txindex_to_tx_and_txid.into_iter(): {:?}", i.elapsed()); indexes.txindex += TxIndex::from(tx_len); diff --git a/crates/brk_indexer/src/vecs.rs b/crates/brk_indexer/src/vecs.rs index f9c54b429..10c1df7b6 100644 --- a/crates/brk_indexer/src/vecs.rs +++ b/crates/brk_indexer/src/vecs.rs @@ -59,7 +59,7 @@ pub struct Vecs { pub txindex_to_total_size: CompressedVec, pub txindex_to_txid: RawVec, pub txindex_to_txversion: CompressedVec, - pub txinindex_to_outpoint: RawVec, + pub txinindex_to_outpoint: CompressedVec, pub txoutindex_to_outputtype: RawVec, pub txoutindex_to_txindex: CompressedVec, pub txoutindex_to_typeindex: RawVec, @@ -181,7 +181,7 @@ impl Vecs { txindex_to_total_size: CompressedVec::forced_import(&db, "total_size", version)?, txindex_to_txid: RawVec::forced_import(&db, "txid", version)?, txindex_to_txversion: CompressedVec::forced_import(&db, "txversion", version)?, - txinindex_to_outpoint: RawVec::forced_import(&db, "outpoint", version)?, + txinindex_to_outpoint: CompressedVec::forced_import(&db, "outpoint", version)?, txoutindex_to_outputtype: RawVec::forced_import(&db, "outputtype", version)?, txoutindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?, txoutindex_to_typeindex: RawVec::forced_import(&db, "typeindex", version)?, diff --git a/crates/brk_mcp/src/lib.rs b/crates/brk_mcp/src/lib.rs index f59e9d28f..ae453ca7c 100644 --- a/crates/brk_mcp/src/lib.rs +++ b/crates/brk_mcp/src/lib.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../README.md")] -use brk_query::{PaginatedIndexParam, PaginationParam, Params, Query}; +use brk_query::{AsyncQuery, PaginatedIndexParam, PaginationParam, Params}; use brk_rmcp::{ ErrorData as McpError, RoleServer, ServerHandler, handler::server::{router::tool::ToolRouter, wrapper::Parameters}, @@ -16,7 +16,7 @@ pub mod route; #[derive(Clone)] pub struct MCP { - query: Query, + query: AsyncQuery, tool_router: ToolRouter, } @@ -24,7 +24,7 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); #[tool_router] 
impl MCP { - pub fn new(query: &Query) -> Self { + pub fn new(query: &AsyncQuery) -> Self { Self { query: query.clone(), tool_router: Self::tool_router(), diff --git a/crates/brk_mcp/src/route.rs b/crates/brk_mcp/src/route.rs index 9d4f87dc5..347c98ef2 100644 --- a/crates/brk_mcp/src/route.rs +++ b/crates/brk_mcp/src/route.rs @@ -1,5 +1,5 @@ use aide::axum::ApiRouter; -use brk_query::Query; +use brk_query::AsyncQuery; use brk_rmcp::transport::{ StreamableHttpServerConfig, streamable_http_server::{StreamableHttpService, session::local::LocalSessionManager}, @@ -10,14 +10,14 @@ use log::info; use crate::MCP; pub trait MCPRoutes { - fn add_mcp_routes(self, query: &Query, mcp: bool) -> Self; + fn add_mcp_routes(self, query: &AsyncQuery, mcp: bool) -> Self; } impl MCPRoutes for ApiRouter where T: Clone + Send + Sync + 'static, { - fn add_mcp_routes(self, query: &Query, mcp: bool) -> Self { + fn add_mcp_routes(self, query: &AsyncQuery, mcp: bool) -> Self { if !mcp { return self; } diff --git a/crates/brk_query/Cargo.toml b/crates/brk_query/Cargo.toml index 464084171..14a19c9dc 100644 --- a/crates/brk_query/Cargo.toml +++ b/crates/brk_query/Cargo.toml @@ -9,6 +9,9 @@ repository.workspace = true rust-version.workspace = true build = "build.rs" +[features] +tokio = ["dep:tokio"] + [dependencies] bitcoin = { workspace = true } brk_computer = { workspace = true } @@ -24,4 +27,5 @@ quickmatch = "0.1.8" schemars = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tokio = { workspace = true, optional = true } vecdb = { workspace = true } diff --git a/crates/brk_query/src/async.rs b/crates/brk_query/src/async.rs new file mode 100644 index 000000000..d6b1036af --- /dev/null +++ b/crates/brk_query/src/async.rs @@ -0,0 +1,18 @@ +// Should be async +// anything related to IO should use +// +// Sync function +// fn get(db: &CandyStore, key: &str) -> Option> { +// db.get(key).ok().flatten() +// } + +use crate::Query; + +// // Async function +// async 
fn get_async(db: Arc, key: String) -> Option> { +// tokio::task::spawn_blocking(move || { +// db.get(&key).ok().flatten() +// }).await.ok()? +// } +#[derive(Clone)] +pub struct AsyncQuery(Query); diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index 781776d0e..ecc588574 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -13,6 +13,7 @@ use brk_types::{ }; use vecdb::{AnyCollectableVec, AnyStoredVec}; +mod r#async; mod chain; mod deser; mod output; @@ -20,6 +21,7 @@ mod pagination; mod params; mod vecs; +pub use r#async::*; pub use output::{Output, Value}; pub use pagination::{PaginatedIndexParam, PaginatedMetrics, PaginationParam}; pub use params::{Params, ParamsDeprec, ParamsOpt}; diff --git a/crates/brk_server/src/lib.rs b/crates/brk_server/src/lib.rs index f41b1ab6a..6a424506c 100644 --- a/crates/brk_server/src/lib.rs +++ b/crates/brk_server/src/lib.rs @@ -19,7 +19,7 @@ use axum::{ use brk_error::Result; use brk_logger::OwoColorize; use brk_mcp::route::MCPRoutes; -use brk_query::Query; +use brk_query::AsyncQuery; use files::FilesRoutes; use log::{error, info}; use quick_cache::sync::Cache; @@ -31,19 +31,18 @@ mod api; mod extended; mod files; +use api::*; use extended::*; -use crate::api::create_openapi; - #[derive(Clone)] pub struct AppState { - query: Query, + query: AsyncQuery, path: Option, cache: Arc>, } impl Deref for AppState { - type Target = Query; + type Target = AsyncQuery; fn deref(&self) -> &Self::Target { &self.query } @@ -54,7 +53,7 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION"); pub struct Server(AppState); impl Server { - pub fn new(query: &Query, files_path: Option) -> Self { + pub fn new(query: &AsyncQuery, files_path: Option) -> Self { Self(AppState { query: query.clone(), path: files_path, diff --git a/crates/brk_store/Cargo.toml b/crates/brk_store/Cargo.toml index c6399c612..91beb32ad 100644 --- a/crates/brk_store/Cargo.toml +++ b/crates/brk_store/Cargo.toml @@ -16,6 +16,7 @@ 
brk_error = { workspace = true } brk_types = { workspace = true } byteview6 = { version = "=0.6.1", package = "byteview" } byteview8 = { version = "~0.8.0", package = "byteview" } +candystore = "0.5.5" fjall2 = { workspace = true } # fjall3 = { workspace = true } log = { workspace = true } diff --git a/crates/brk_types/src/outpoint.rs b/crates/brk_types/src/outpoint.rs index ef6caadbf..d77739f02 100644 --- a/crates/brk_types/src/outpoint.rs +++ b/crates/brk_types/src/outpoint.rs @@ -1,6 +1,7 @@ use allocative::Allocative; use schemars::JsonSchema; use serde::Serialize; +use vecdb::StoredCompressed; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use crate::{TxIndex, Vout}; @@ -22,6 +23,7 @@ use crate::{TxIndex, Vout}; Allocative, JsonSchema, Hash, + StoredCompressed, )] pub struct OutPoint(u64); diff --git a/crates/brk_types/src/sats.rs b/crates/brk_types/src/sats.rs index 1ced689c0..c5f3925bb 100644 --- a/crates/brk_types/src/sats.rs +++ b/crates/brk_types/src/sats.rs @@ -71,6 +71,10 @@ impl Sats { pub fn is_not_zero(&self) -> bool { *self != Self::ZERO } + + pub fn is_max(&self) -> bool { + *self == Self::MAX + } } impl Add for Sats { diff --git a/crates/brk_types/src/txinindex.rs b/crates/brk_types/src/txinindex.rs index edf8f90da..61d522630 100644 --- a/crates/brk_types/src/txinindex.rs +++ b/crates/brk_types/src/txinindex.rs @@ -30,6 +30,10 @@ use super::Vin; pub struct TxInIndex(u64); impl TxInIndex { + pub fn new(index: u64) -> Self { + Self(index) + } + pub fn incremented(self) -> Self { Self(*self + 1) } diff --git a/crates/brk_types/src/txoutindex.rs b/crates/brk_types/src/txoutindex.rs index a76d6e2cc..fdf815baa 100644 --- a/crates/brk_types/src/txoutindex.rs +++ b/crates/brk_types/src/txoutindex.rs @@ -36,6 +36,10 @@ impl TxOutIndex { pub const COINBASE: Self = Self(u64::MAX); + pub fn new(index: u64) -> Self { + Self(index) + } + pub fn incremented(self) -> Self { Self(*self + 1) } diff --git a/crates/brk_types/src/vout.rs 
b/crates/brk_types/src/vout.rs index 90a487d38..7f35e0a38 100644 --- a/crates/brk_types/src/vout.rs +++ b/crates/brk_types/src/vout.rs @@ -83,6 +83,12 @@ impl From for u64 { } } +impl From for usize { + fn from(value: Vout) -> Self { + value.0 as usize + } +} + impl From<&[u8]> for Vout { fn from(value: &[u8]) -> Self { Self(u16::from_be_bytes(copy_first_2bytes(value).unwrap()))