From d31d47eb3271e39a05a274ee9425f4cce0bf9a62 Mon Sep 17 00:00:00 2001 From: nym21 Date: Sat, 5 Jul 2025 17:44:51 +0200 Subject: [PATCH] computer: store part 10 --- Cargo.lock | 2 + Cargo.toml | 4 + crates/brk/.gitignore | 2 + crates/brk/flamegraph.sh | 2 + crates/brk/samply.sh | 2 + crates/brk_computer/Cargo.toml | 1 + crates/brk_computer/src/lib.rs | 1 + crates/brk_computer/src/stores.rs | 37 +- .../src/vecs/stateful/address_cohort.rs | 47 +- .../src/vecs/stateful/address_cohorts.rs | 5 +- .../stateful/addresstype_to_typeindex_vec.rs | 7 +- crates/brk_computer/src/vecs/stateful/mod.rs | 452 ++++++++++-------- .../brk_computer/src/vecs/stateful/trait.rs | 38 +- .../src/vecs/stateful/utxo_cohort.rs | 47 +- .../src/vecs/stateful/utxo_cohorts.rs | 2 +- crates/brk_core/src/structs/outputindex.rs | 2 + crates/brk_core/src/structs/outputtype.rs | 4 + .../brk_core/src/structs/p2aaddressindex.rs | 5 + .../src/structs/p2pk33addressindex.rs | 5 + .../src/structs/p2pk65addressindex.rs | 5 + .../brk_core/src/structs/p2pkhaddressindex.rs | 5 + .../brk_core/src/structs/p2shaddressindex.rs | 5 + .../brk_core/src/structs/p2traddressindex.rs | 5 + .../src/structs/p2wpkhaddressindex.rs | 5 + .../brk_core/src/structs/p2wshaddressindex.rs | 5 + crates/brk_indexer/Cargo.toml | 1 + crates/brk_indexer/src/lib.rs | 3 + crates/brk_indexer/src/stores.rs | 182 ++++--- crates/brk_store/examples/main.rs | 7 +- crates/brk_store/src/lib.rs | 51 +- crates/brk_store/src/meta.rs | 4 +- crates/brk_store/src/trait.rs | 4 + crates/brk_vec/src/traits/collectable.rs | 4 + websites/default/index.html | 54 ++- .../packages/lightweight-charts/wrapper.js | 57 +-- websites/default/scripts/chart.js | 24 +- websites/default/scripts/explorer.js | 53 ++ websites/default/scripts/main.js | 125 +++-- websites/default/scripts/options.js | 118 +++-- 39 files changed, 896 insertions(+), 486 deletions(-) create mode 100644 crates/brk/.gitignore create mode 100755 crates/brk/flamegraph.sh create mode 100755 crates/brk/samply.sh create mode 100644 websites/default/scripts/explorer.js diff --git a/Cargo.lock b/Cargo.lock index 4188e9dee..2d36e23f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -512,6 +512,7 @@ dependencies = [ "brk_vec", "color-eyre", "derive_deref", + "fjall", "jiff", "log", "rayon", @@ -575,6 +576,7 @@ dependencies = [ "brk_store", "brk_vec", "color-eyre", + "fjall", "log", "rayon", ] diff --git a/Cargo.toml b/Cargo.toml index 5ac9eda8d..1ed08480e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,10 @@ lto = "fat" codegen-units = 1 panic = "abort" +[profile.profiling] +inherits = "release" +debug = true + [profile.dist] inherits = "release" diff --git a/crates/brk/.gitignore b/crates/brk/.gitignore new file mode 100644 index 000000000..996e0ebda --- /dev/null +++ b/crates/brk/.gitignore @@ -0,0 +1,2 @@ +profile.json.gz +flamegraph.svg diff --git a/crates/brk/flamegraph.sh b/crates/brk/flamegraph.sh new file mode 100755 index 000000000..1a00c11d7 --- /dev/null +++ b/crates/brk/flamegraph.sh @@ -0,0 +1,2 @@ +cargo build --profile profiling +flamegraph -- ../../target/profiling/brk diff --git a/crates/brk/samply.sh b/crates/brk/samply.sh new file mode 100755 index 000000000..f11387b4f --- /dev/null +++ b/crates/brk/samply.sh @@ -0,0 +1,2 @@ +cargo build --profile profiling +samply record ../../target/profiling/brk diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index f1cbffa6c..fde6a5e87 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -21,6 +21,7 @@ brk_store = { workspace = true } brk_vec = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } +fjall = { workspace = true } jiff = { workspace = true } log = { workspace = true } rayon = { workspace = true } diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index 9983524ce..c2e5b1cbc 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -53,6 +53,7 @@ impl Computer { // TODO: Give self.path, join inside import &outputs_dir.join("stores"), VERSION + Version::ZERO, + &indexer.stores.keyspace, )?, fetcher, }) diff --git a/crates/brk_computer/src/stores.rs b/crates/brk_computer/src/stores.rs index 3753fa362..a254ec656 100644 --- a/crates/brk_computer/src/stores.rs +++ b/crates/brk_computer/src/stores.rs @@ -6,6 +6,7 @@ use brk_core::{ P2WPKHAddressIndex, P2WSHAddressIndex, Result, TypeIndex, Version, }; use brk_store::{AnyStore, Store}; +use fjall::{PersistMode, TransactionalKeyspace}; use log::info; use crate::vecs::stateful::{AddressTypeToTypeIndexTree, WithAddressDataSource}; @@ -14,6 +15,8 @@ const VERSION: Version = Version::ZERO; #[derive(Clone)] pub struct Stores { + keyspace: TransactionalKeyspace, + pub p2aaddressindex_to_addressdata: Store, pub p2aaddressindex_to_emptyaddressdata: Store, pub p2pk33addressindex_to_addressdata: Store, @@ -33,7 +36,11 @@ pub struct Stores { } impl Stores { - pub fn import(path: &Path, version: Version) -> color_eyre::Result { + pub fn import( + path: &Path, + version: Version, + keyspace: &TransactionalKeyspace, + ) -> color_eyre::Result { let ( (p2aaddressindex_to_addressdata, p2aaddressindex_to_emptyaddressdata), (p2pk33addressindex_to_addressdata, p2pk33addressindex_to_emptyaddressdata), @@ -47,6 +54,7 @@ impl Stores { let p2a = scope.spawn(|| { ( Store::import( + keyspace, path, "p2aaddressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -54,6 +62,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2aaddressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -66,6 +75,7 @@ impl Stores { let p2pk33 = scope.spawn(|| { ( Store::import( + keyspace, path, "p2pk33addressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -73,6 +83,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2pk33addressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -85,6 +96,7 @@ impl Stores { let p2pk65 = scope.spawn(|| { ( Store::import( + keyspace, path, "p2pk65addressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -92,6 +104,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2pk65addressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -104,6 +117,7 @@ impl Stores { let p2pkh = scope.spawn(|| { ( Store::import( + keyspace, path, "p2pkhaddressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -111,6 +125,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2pkhaddressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -123,6 +138,7 @@ impl Stores { let p2sh = scope.spawn(|| { ( Store::import( + keyspace, path, "p2shaddressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -130,6 +146,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2shaddressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -142,6 +159,7 @@ impl Stores { let p2tr = scope.spawn(|| { ( Store::import( + keyspace, path, "p2traddressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -149,6 +167,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2traddressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -161,6 +180,7 @@ impl Stores { let p2wpkh = scope.spawn(|| { ( Store::import( + keyspace, path, "p2wpkhaddressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -168,6 +188,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2wpkhaddressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -180,6 +201,7 @@ impl Stores { let p2wsh = scope.spawn(|| { ( Store::import( + keyspace, path, "p2wshaddressindex_to_addressdata", version + VERSION + Version::ZERO, @@ -187,6 +209,7 @@ impl Stores { ) .unwrap(), Store::import( + keyspace, path, "p2wshaddressindex_to_emptyaddressdata", version + VERSION + Version::ZERO, @@ -209,6 +232,8 @@ impl Stores { }); Ok(Self { + keyspace: keyspace.clone(), + p2aaddressindex_to_addressdata, p2aaddressindex_to_emptyaddressdata, @@ -249,7 +274,11 @@ impl Stores { self.as_mut_slice() .into_iter() - .try_for_each(|store| store.reset()) + .try_for_each(|store| store.reset())?; + + self.keyspace + .persist(PersistMode::SyncAll) + .map_err(|e| e.into()) } pub fn get_addressdata( @@ -563,7 +592,9 @@ impl Stores { }); }); - Ok(()) + self.keyspace + .persist(PersistMode::SyncAll) + .map_err(|e| e.into()) } pub fn rotate_memtables(&self) { diff --git a/crates/brk_computer/src/vecs/stateful/address_cohort.rs b/crates/brk_computer/src/vecs/stateful/address_cohort.rs index aa1d31f15..0b64ede52 100644 --- a/crates/brk_computer/src/vecs/stateful/address_cohort.rs +++ b/crates/brk_computer/src/vecs/stateful/address_cohort.rs @@ -11,7 +11,10 @@ use crate::{ states::AddressCohortState, vecs::{ Indexes, fetched, indexes, market, - stateful::{common, r#trait::CohortVecs}, + stateful::{ + common, + r#trait::{CohortVecs, DynCohortVecs}, + }, }, }; @@ -28,9 +31,9 @@ pub struct Vecs { pub inner: common::Vecs, } -impl CohortVecs for Vecs { +impl Vecs { #[allow(clippy::too_many_arguments)] - fn forced_import( + pub fn forced_import( path: &Path, cohort_name: Option<&str>, computation: Computation, @@ -68,7 +71,9 @@ impl CohortVecs for Vecs { )?, }) } +} +impl DynCohortVecs for Vecs { fn starting_height(&self) -> Height { [ self.state.height().map_or(Height::MAX, |h| h.incremented()), @@ -146,6 +151,25 @@ impl CohortVecs for Vecs { .safe_flush_stateful_vecs(height, exit, &mut self.state.inner) } + #[allow(clippy::too_many_arguments)] + fn compute_rest_part1( + &mut self, + indexer: &Indexer, + indexes: &indexes::Vecs, + fetched: Option<&fetched::Vecs>, + starting_indexes: &Indexes, + exit: &Exit, + ) -> color_eyre::Result<()> { + self.inner + .compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) + } + + fn vecs(&self) -> Vec<&dyn AnyCollectableVec> { + [self.inner.vecs(), vec![&self.height_to_address_count]].concat() + } +} + +impl CohortVecs for Vecs { fn compute_from_stateful( &mut self, starting_indexes: &Indexes, @@ -168,19 +192,6 @@ impl CohortVecs for Vecs { ) } - #[allow(clippy::too_many_arguments)] - fn compute_rest_part1( - &mut self, - indexer: &Indexer, - indexes: &indexes::Vecs, - fetched: Option<&fetched::Vecs>, - starting_indexes: &Indexes, - exit: &Exit, - ) -> color_eyre::Result<()> { - self.inner - .compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) - } - #[allow(clippy::too_many_arguments)] fn compute_rest_part2( &mut self, @@ -208,10 +219,6 @@ impl CohortVecs for Vecs { exit, ) } - - fn vecs(&self) -> Vec<&dyn AnyCollectableVec> { - [self.inner.vecs(), vec![&self.height_to_address_count]].concat() - } } impl Deref for Vecs { diff --git a/crates/brk_computer/src/vecs/stateful/address_cohorts.rs b/crates/brk_computer/src/vecs/stateful/address_cohorts.rs index 5828d01b1..f73a3b30e 100644 --- a/crates/brk_computer/src/vecs/stateful/address_cohorts.rs +++ b/crates/brk_computer/src/vecs/stateful/address_cohorts.rs @@ -11,7 +11,10 @@ use rayon::prelude::*; use crate::vecs::{ Indexes, fetched, - stateful::{address_cohort, r#trait::CohortVecs}, + stateful::{ + address_cohort, + r#trait::{CohortVecs, DynCohortVecs}, + }, }; const VERSION: Version = Version::new(0); diff --git a/crates/brk_computer/src/vecs/stateful/addresstype_to_typeindex_vec.rs b/crates/brk_computer/src/vecs/stateful/addresstype_to_typeindex_vec.rs index b9537901a..84c807d74 100644 --- a/crates/brk_computer/src/vecs/stateful/addresstype_to_typeindex_vec.rs +++ b/crates/brk_computer/src/vecs/stateful/addresstype_to_typeindex_vec.rs @@ -1,14 +1,13 @@ use std::mem; -use brk_core::TypeIndex; use derive_deref::{Deref, DerefMut}; use super::GroupedByAddressType; #[derive(Debug, Default, Deref, DerefMut)] -pub struct AddressTypeToTypeIndexVec(GroupedByAddressType>); +pub struct AddressTypeToVec(GroupedByAddressType>); -impl AddressTypeToTypeIndexVec { +impl AddressTypeToVec { pub fn merge(&mut self, mut other: Self) { Self::merge_(&mut self.p2pk65, &mut other.p2pk65); Self::merge_(&mut self.p2pk33, &mut other.p2pk33); @@ -20,7 +19,7 @@ impl AddressTypeToTypeIndexVec { Self::merge_(&mut self.p2a, &mut other.p2a); } - fn merge_(own: &mut Vec<(TypeIndex, T)>, other: &mut Vec<(TypeIndex, T)>) { + fn merge_(own: &mut Vec, other: &mut Vec) { if own.len() >= other.len() { own.append(other); } else { diff --git a/crates/brk_computer/src/vecs/stateful/mod.rs b/crates/brk_computer/src/vecs/stateful/mod.rs index 58e4e27d5..5e27adccc 100644 --- a/crates/brk_computer/src/vecs/stateful/mod.rs +++ b/crates/brk_computer/src/vecs/stateful/mod.rs @@ -2,7 +2,7 @@ use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread}; use brk_core::{ AddressData, CheckedSub, DateIndex, Dollars, EmptyAddressData, GroupedByAddressType, Height, - InputIndex, OutputIndex, OutputType, Result, Sats, Version, + InputIndex, OutputIndex, OutputType, Result, Sats, TypeIndex, Version, }; use brk_exit::Exit; use brk_indexer::Indexer; @@ -20,7 +20,7 @@ use crate::{ market, stateful::{ addresstype_to_addresscount::AddressTypeToAddressCount, - addresstype_to_addresscount_vec::AddressTypeToAddressCountVec, + addresstype_to_addresscount_vec::AddressTypeToAddressCountVec, r#trait::DynCohortVecs, }, }, }; @@ -269,6 +269,14 @@ impl Vecs { ) -> color_eyre::Result<()> { let height_to_first_outputindex = &indexer.vecs.height_to_first_outputindex; let height_to_first_inputindex = &indexer.vecs.height_to_first_inputindex; + let height_to_first_p2aaddressindex = &indexer.vecs.height_to_first_p2aaddressindex; + let height_to_first_p2pk33addressindex = &indexer.vecs.height_to_first_p2pk33addressindex; + let height_to_first_p2pk65addressindex = &indexer.vecs.height_to_first_p2pk65addressindex; + let height_to_first_p2pkhaddressindex = &indexer.vecs.height_to_first_p2pkhaddressindex; + let height_to_first_p2shaddressindex = &indexer.vecs.height_to_first_p2shaddressindex; + let height_to_first_p2traddressindex = &indexer.vecs.height_to_first_p2traddressindex; + let height_to_first_p2wpkhaddressindex = &indexer.vecs.height_to_first_p2wpkhaddressindex; + let height_to_first_p2wshaddressindex = &indexer.vecs.height_to_first_p2wshaddressindex; let height_to_output_count = transactions.indexes_to_output_count.height.unwrap_sum(); let height_to_input_count = transactions.indexes_to_input_count.height.unwrap_sum(); let inputindex_to_outputindex = &indexer.vecs.inputindex_to_outputindex; @@ -300,11 +308,24 @@ impl Vecs { let outputindex_to_typeindex_mmap = outputindex_to_typeindex.mmap().load(); let outputindex_to_txindex_mmap = outputindex_to_txindex.mmap().load(); let txindex_to_height_mmap = txindex_to_height.mmap().load(); - let height_to_close_mmap = height_to_close.map(|v| v.mmap().load()); - let height_to_timestamp_fixed_mmap = height_to_timestamp_fixed.mmap().load(); let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter(); let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter(); + let mut height_to_first_p2aaddressindex_iter = height_to_first_p2aaddressindex.into_iter(); + let mut height_to_first_p2pk33addressindex_iter = + height_to_first_p2pk33addressindex.into_iter(); + let mut height_to_first_p2pk65addressindex_iter = + height_to_first_p2pk65addressindex.into_iter(); + let mut height_to_first_p2pkhaddressindex_iter = + height_to_first_p2pkhaddressindex.into_iter(); + let mut height_to_first_p2shaddressindex_iter = + height_to_first_p2shaddressindex.into_iter(); + let mut height_to_first_p2traddressindex_iter = + height_to_first_p2traddressindex.into_iter(); + let mut height_to_first_p2wpkhaddressindex_iter = + height_to_first_p2wpkhaddressindex.into_iter(); + let mut height_to_first_p2wshaddressindex_iter = + height_to_first_p2wshaddressindex.into_iter(); let mut height_to_output_count_iter = height_to_output_count.into_iter(); let mut height_to_input_count_iter = height_to_input_count.into_iter(); let mut height_to_close_iter = height_to_close.as_ref().map(|v| v.into_iter()); @@ -318,6 +339,14 @@ impl Vecs { let base_version = Version::ZERO + height_to_first_outputindex.version() + height_to_first_inputindex.version() + + height_to_first_p2aaddressindex.version() + + height_to_first_p2pk33addressindex.version() + + height_to_first_p2pk65addressindex.version() + + height_to_first_p2pkhaddressindex.version() + + height_to_first_p2shaddressindex.version() + + height_to_first_p2traddressindex.version() + + height_to_first_p2wpkhaddressindex.version() + + height_to_first_p2wshaddressindex.version() + height_to_timestamp_fixed.version() + height_to_output_count.version() + height_to_input_count.version() @@ -438,6 +467,11 @@ impl Vecs { .par_iter_mut() .for_each(|(_, v)| v.init(starting_height)); + let height_to_close_vec = + height_to_close.map(|height_to_close| height_to_close.collect().unwrap()); + + let height_to_timestamp_fixed_vec = height_to_timestamp_fixed.collect().unwrap(); + let mut unspendable_supply = if let Some(prev_height) = starting_height.decremented() { self.height_to_unspendable_supply .into_iter() @@ -498,6 +532,34 @@ impl Vecs { let output_count = height_to_output_count_iter.unwrap_get_inner(height); let input_count = height_to_input_count_iter.unwrap_get_inner(height); + let first_addressindexes: GroupedByAddressType = + GroupedByAddressType { + p2a: height_to_first_p2aaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2pk33: height_to_first_p2pk33addressindex_iter + .unwrap_get_inner(height) + .into(), + p2pk65: height_to_first_p2pk65addressindex_iter + .unwrap_get_inner(height) + .into(), + p2pkh: height_to_first_p2pkhaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2sh: height_to_first_p2shaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2tr: height_to_first_p2traddressindex_iter + .unwrap_get_inner(height) + .into(), + p2wpkh: height_to_first_p2wpkhaddressindex_iter + .unwrap_get_inner(height) + .into(), + p2wsh: height_to_first_p2wshaddressindex_iter + .unwrap_get_inner(height) + .into(), + }; + let ( (mut height_to_sent, addresstype_to_typedindex_to_sent_data), (mut received, addresstype_to_typedindex_to_received_data), @@ -532,31 +594,6 @@ impl Vecs { .unwrap() .into_owned(); - let typeindex = outputindex_to_typeindex - .get_or_read(outputindex, &outputindex_to_typeindex_mmap) - .unwrap() - .unwrap() - .into_owned(); - - if input_type.is_unspendable() { - unreachable!() - } - - let addressdata_opt = if input_type.is_address() - && !addresstype_to_typeindex_to_addressdata - .get(input_type) - .unwrap() - .contains_key(&typeindex) - && let Some(address_data) = - stores.get_addressdata(input_type, typeindex).unwrap() - { - Some(WithAddressDataSource::FromAddressDataStore( - address_data, - )) - } else { - None - }; - let input_txindex = outputindex_to_txindex .get_or_read(outputindex, &outputindex_to_txindex_mmap) .unwrap() @@ -569,22 +606,49 @@ impl Vecs { .unwrap() .into_owned(); - let prev_price = height_to_close.map(|m| { - *m.get_or_read( + if input_type.is_unspendable() { + unreachable!() + } else if input_type.is_not_address() { + return ( prev_height, - height_to_close_mmap.as_ref().unwrap(), + value, + input_type, + None ) - .unwrap() - .unwrap() - .into_owned() - }); + } - let prev_timestamp = height_to_timestamp_fixed - .get_or_read(prev_height, &height_to_timestamp_fixed_mmap) + let typeindex = outputindex_to_typeindex + .get_or_read(outputindex, &outputindex_to_typeindex_mmap) .unwrap() .unwrap() .into_owned(); + let addressdata_opt = if input_type.is_address() + && *first_addressindexes.get(input_type).unwrap() + > typeindex + && !addresstype_to_typeindex_to_addressdata + .get(input_type) + .unwrap() + .contains_key(&typeindex) + && let Some(address_data) = + stores.get_addressdata(input_type, typeindex).unwrap() + // Otherwise it was empty and got funds in the same block before sending them + { + Some(WithAddressDataSource::FromAddressDataStore( + address_data, + )) + } else { + None + }; + + let prev_price = height_to_close_vec + .as_ref() + .map(|v| **v.get(prev_height.unwrap_to_usize()).unwrap()); + + let prev_timestamp = *height_to_timestamp_fixed_vec + .get(prev_height.unwrap_to_usize()) + .unwrap(); + let blocks_old = height.unwrap_to_usize() - prev_height.unwrap_to_usize(); @@ -600,25 +664,27 @@ impl Vecs { prev_height, value, input_type, - typeindex, - addressdata_opt, - prev_price, - blocks_old, - days_old, - older_than_hour, + Some((typeindex, + addressdata_opt, + prev_price, + blocks_old, + days_old, + older_than_hour + )) ) }) .fold( || { ( BTreeMap::::default(), - AddressTypeToTypeIndexVec::<( + AddressTypeToVec::<( + TypeIndex, Sats, Option>, Option, usize, f64, - bool, + bool )>::default( ), ) @@ -628,25 +694,23 @@ impl Vecs { height, value, input_type, - typeindex, + address_data_opt + )| { + tree.entry(height).or_default().iterate(value, input_type); + if let Some((typeindex, addressdata_opt, prev_price, blocks_old, days_old, - older_than_hour, - )| { - tree.entry(height).or_default().iterate(value, input_type); - if let Some(vec) = vecs.get_mut(input_type) { - vec.push(( + older_than_hour)) = address_data_opt { + vecs.get_mut(input_type).unwrap().push(( typeindex, - ( - value, - addressdata_opt, - prev_price, - blocks_old, - days_old, - older_than_hour, - ), + value, + addressdata_opt, + prev_price, + blocks_old, + days_old, + older_than_hour, )); } (tree, vecs) @@ -656,15 +720,15 @@ impl Vecs { || { ( BTreeMap::::default(), - AddressTypeToTypeIndexVec::<( + AddressTypeToVec::<( + TypeIndex, Sats, Option>, Option, usize, f64, bool, - )>::default( - ), + )>::default(), ) }, |(first_tree, mut source_vecs), (second_tree, other_vecs)| { @@ -683,111 +747,119 @@ impl Vecs { ) }); - let received_handle = s.spawn(|| { - (first_outputindex..first_outputindex + *output_count) - .into_par_iter() - .map(OutputIndex::from) - .map(|outputindex| { - let value = outputindex_to_value - .get_or_read(outputindex, &outputindex_to_value_mmap) - .unwrap() - .unwrap() - .into_owned(); + // let received_handle = s.spawn(|| { + let received_output = (first_outputindex..first_outputindex + *output_count) + .into_par_iter() + .map(OutputIndex::from) + .map(|outputindex| { + let value = outputindex_to_value + .get_or_read(outputindex, &outputindex_to_value_mmap) + .unwrap() + .unwrap() + .into_owned(); - let output_type = outputindex_to_outputtype - .get_or_read(outputindex, &outputindex_to_outputtype_mmap) - .unwrap() - .unwrap() - .into_owned(); + let output_type = outputindex_to_outputtype + .get_or_read(outputindex, &outputindex_to_outputtype_mmap) + .unwrap() + .unwrap() + .into_owned(); - let typeindex = outputindex_to_typeindex - .get_or_read(outputindex, &outputindex_to_typeindex_mmap) - .unwrap() - .unwrap() - .into_owned(); + if output_type.is_not_address() { + return (value, output_type, None); + } - let addressdata_opt = if output_type.is_address() - && !addresstype_to_typeindex_to_addressdata - .get(output_type) + let typeindex = outputindex_to_typeindex + .get_or_read(outputindex, &outputindex_to_typeindex_mmap) + .unwrap() + .unwrap() + .into_owned(); + + let addressdata_opt = if *first_addressindexes.get(output_type).unwrap() + <= typeindex { + Some(WithAddressDataSource::New(AddressData::default())) + } else if !addresstype_to_typeindex_to_addressdata + .get(output_type) + .unwrap() + .contains_key(&typeindex) + && !addresstype_to_typeindex_to_emptyaddressdata + .get(output_type) + .unwrap() + .contains_key(&typeindex) { + Some( + if let Some(addressdata) = stores + .get_addressdata(output_type, typeindex) .unwrap() - .contains_key(&typeindex) - && !addresstype_to_typeindex_to_emptyaddressdata - .get(output_type) + { + WithAddressDataSource::FromAddressDataStore( + addressdata, + ) + } else if let Some(emptyaddressdata) = stores + .get_emptyaddressdata(output_type, typeindex) .unwrap() - .contains_key(&typeindex) - { - Some( - if let Some(addressdata) = stores - .get_addressdata(output_type, typeindex) - .unwrap() - { - WithAddressDataSource::FromAddressDataStore( - addressdata, - ) - } else if let Some(emptyaddressdata) = stores - .get_emptyaddressdata(output_type, typeindex) - .unwrap() - { - WithAddressDataSource::FromEmptyAddressDataStore( - emptyaddressdata.into(), - ) - } else { - WithAddressDataSource::New(AddressData::default()) - }, - ) - } else { - None - }; + { + WithAddressDataSource::FromEmptyAddressDataStore( + emptyaddressdata.into(), + ) + } else { + WithAddressDataSource::New(AddressData::default()) + }, + ) + } else { + None + }; - (value, output_type, typeindex, addressdata_opt) - }) - .fold( - || { - ( - Transacted::default(), - AddressTypeToTypeIndexVec::<( - Sats, - Option>, - )>::default( - ), - ) - }, - |(mut transacted, mut vecs), - ( - value, - output_type, - typeindex, - addressdata_opt, - )| { - transacted.iterate(value, output_type); - if let Some(vec) = vecs.get_mut(output_type) { - vec.push(( - typeindex, - (value, addressdata_opt), - )); - } - (transacted, vecs) - }, - ) - .reduce( - || { - ( - Transacted::default(), - AddressTypeToTypeIndexVec::<( - Sats, - Option>, - )>::default( - ), - ) - }, - |(transacted, mut vecs), (other_transacted, other_vecs)| { - vecs.merge(other_vecs); - (transacted + other_transacted, vecs) - }, - ) - }); + (value, output_type, Some((typeindex, addressdata_opt))) + }) + .fold( + || { + ( + Transacted::default(), + AddressTypeToVec::<( + TypeIndex, + Sats, + Option>, + )>::default( + ), + ) + }, + |(mut transacted, mut vecs), + ( + value, + output_type, + typeindex_with_addressdata_opt, + )| { + transacted.iterate(value, output_type); + if let Some(vec) = vecs.get_mut(output_type) { + let (typeindex, + addressdata_opt) = typeindex_with_addressdata_opt.unwrap(); + vec.push(( + typeindex, + value, + addressdata_opt, + )); + } + (transacted, vecs) + }, + ) + .reduce( + || { + ( + Transacted::default(), + AddressTypeToVec::<( + TypeIndex, + Sats, + Option>, + )>::default(), + ) + }, + |(transacted, mut vecs), (other_transacted, other_vecs)| { + vecs.merge(other_vecs); + (transacted + other_transacted, vecs) + }, + ); + // }); - (sent_handle.join().unwrap(), received_handle.join().unwrap()) + (sent_handle.join().unwrap(), received_output) }); if chain_state_starting_height > height { @@ -904,36 +976,22 @@ impl Vecs { .as_mut() .map(|v| is_date_last_height.then(|| *v.unwrap_get_inner(dateindex))); - thread::scope(|scope| { - scope.spawn(|| { - separate_utxo_vecs - .par_iter_mut() - .try_for_each(|(_, v)| { - v.compute_then_force_push_unrealized_states( - height, - price, - is_date_last_height.then_some(dateindex), - date_price, - exit, - ) - }) - .unwrap(); - }); - scope.spawn(|| { + separate_utxo_vecs + .into_par_iter() + .map(|(_, v)| v as &mut dyn DynCohortVecs).chain( separate_address_vecs - .par_iter_mut() - .try_for_each(|(_, v)| { - v.compute_then_force_push_unrealized_states( - height, - price, - is_date_last_height.then_some(dateindex), - date_price, - exit, - ) - }) - .unwrap(); - }); - }); + .into_par_iter() + .map(|(_, v)| v as &mut dyn DynCohortVecs) + ) + .try_for_each(|v| { + v.compute_then_force_push_unrealized_states( + height, + price, + is_date_last_height.then_some(dateindex), + date_price, + exit, + ) + })?; if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 { info!("Flushing..."); @@ -1159,7 +1217,7 @@ impl Vecs { } } -impl AddressTypeToTypeIndexVec<(Sats, Option>)> { +impl AddressTypeToVec<(TypeIndex, Sats, Option>)> { fn process_received( mut self, vecs: &mut address_cohorts::Vecs, @@ -1175,7 +1233,7 @@ impl AddressTypeToTypeIndexVec<(Sats, Option> ) { self.into_typed_vec().into_iter().for_each(|(_type, vec)| { vec.into_iter() - .for_each(|(type_index, (value, addressdata_opt))| { + .for_each(|(type_index, value, addressdata_opt)| { let mut is_new = false; let mut from_any_empty = false; @@ -1242,7 +1300,8 @@ impl AddressTypeToTypeIndexVec<(Sats, Option> } impl - AddressTypeToTypeIndexVec<( + AddressTypeToVec<( + TypeIndex, Sats, Option>, Option, @@ -1270,7 +1329,12 @@ impl vec.into_iter().try_for_each( |( type_index, - (value, addressdata_opt, prev_price, blocks_old, days_old, older_than_hour), + value, + addressdata_opt, + prev_price, + blocks_old, + days_old, + older_than_hour, )| { let typeindex_to_addressdata = addresstype_to_typeindex_to_addressdata .get_mut(_type) diff --git a/crates/brk_computer/src/vecs/stateful/trait.rs b/crates/brk_computer/src/vecs/stateful/trait.rs index c9614714f..db138ebed 100644 --- a/crates/brk_computer/src/vecs/stateful/trait.rs +++ b/crates/brk_computer/src/vecs/stateful/trait.rs @@ -1,25 +1,11 @@ -use std::path::Path; - use brk_core::{Bitcoin, DateIndex, Dollars, Height, Result, Version}; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyCollectableVec, AnyIterableVec, Computation, Format}; +use brk_vec::{AnyCollectableVec, AnyIterableVec}; use crate::vecs::{Indexes, fetched, indexes, market}; -pub trait CohortVecs: Sized { - #[allow(clippy::too_many_arguments)] - fn forced_import( - path: &Path, - cohort_name: Option<&str>, - computation: Computation, - format: Format, - version: Version, - fetched: Option<&fetched::Vecs>, - states_path: &Path, - compute_relative_to_all: bool, - ) -> color_eyre::Result; - +pub trait DynCohortVecs: Send + Sync { fn starting_height(&self) -> Height; fn init(&mut self, starting_height: Height); @@ -39,13 +25,6 @@ pub trait CohortVecs: Sized { fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>; - fn compute_from_stateful( - &mut self, - starting_indexes: &Indexes, - others: &[&Self], - exit: &Exit, - ) -> Result<()>; - #[allow(clippy::too_many_arguments)] fn compute_rest_part1( &mut self, @@ -56,6 +35,17 @@ pub trait CohortVecs: Sized { exit: &Exit, ) -> color_eyre::Result<()>; + fn vecs(&self) -> Vec<&dyn AnyCollectableVec>; +} + +pub trait CohortVecs: DynCohortVecs { + fn compute_from_stateful( + &mut self, + starting_indexes: &Indexes, + others: &[&Self], + exit: &Exit, + ) -> Result<()>; + #[allow(clippy::too_many_arguments)] fn compute_rest_part2( &mut self, @@ -70,6 +60,4 @@ pub trait CohortVecs: Sized { dateindex_to_realized_cap: Option<&impl AnyIterableVec>, exit: &Exit, ) -> color_eyre::Result<()>; - - fn vecs(&self) -> Vec<&dyn AnyCollectableVec>; } diff --git a/crates/brk_computer/src/vecs/stateful/utxo_cohort.rs b/crates/brk_computer/src/vecs/stateful/utxo_cohort.rs index 9ab8c9b82..1c72bc084 100644 --- a/crates/brk_computer/src/vecs/stateful/utxo_cohort.rs +++ b/crates/brk_computer/src/vecs/stateful/utxo_cohort.rs @@ -9,7 +9,10 @@ use crate::{ UTXOCohortState, vecs::{ Indexes, fetched, indexes, market, - stateful::{common, r#trait::CohortVecs}, + stateful::{ + common, + r#trait::{CohortVecs, DynCohortVecs}, + }, }, }; @@ -22,9 +25,9 @@ pub struct Vecs { inner: common::Vecs, } -impl CohortVecs for Vecs { +impl Vecs { #[allow(clippy::too_many_arguments)] - fn forced_import( + pub fn forced_import( path: &Path, cohort_name: Option<&str>, computation: Computation, @@ -56,7 +59,9 @@ impl CohortVecs for Vecs { )?, }) } +} +impl DynCohortVecs for Vecs { fn starting_height(&self) -> Height { [ self.state.height().map_or(Height::MAX, |h| h.incremented()), @@ -112,19 +117,6 @@ impl CohortVecs for Vecs { .safe_flush_stateful_vecs(height, exit, &mut self.state) } - fn compute_from_stateful( - &mut self, - starting_indexes: &Indexes, - others: &[&Self], - exit: &Exit, - ) -> Result<()> { - self.inner.compute_from_stateful( - starting_indexes, - &others.iter().map(|v| &v.inner).collect::>(), - exit, - ) - } - #[allow(clippy::too_many_arguments)] fn compute_rest_part1( &mut self, @@ -138,6 +130,25 @@ impl CohortVecs for Vecs { .compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) } + fn vecs(&self) -> Vec<&dyn AnyCollectableVec> { + self.inner.vecs() + } +} + +impl CohortVecs for Vecs { + fn compute_from_stateful( + &mut self, + starting_indexes: &Indexes, + others: &[&Self], + exit: &Exit, + ) -> Result<()> { + self.inner.compute_from_stateful( + starting_indexes, + &others.iter().map(|v| &v.inner).collect::>(), + exit, + ) + } + #[allow(clippy::too_many_arguments)] fn compute_rest_part2( &mut self, @@ -165,10 +176,6 @@ impl CohortVecs for Vecs { exit, ) } - - fn vecs(&self) -> Vec<&dyn AnyCollectableVec> { - self.inner.vecs() - } } impl Deref for Vecs { diff --git a/crates/brk_computer/src/vecs/stateful/utxo_cohorts.rs b/crates/brk_computer/src/vecs/stateful/utxo_cohorts.rs index 3744b9176..5ae916c57 100644 --- a/crates/brk_computer/src/vecs/stateful/utxo_cohorts.rs +++ b/crates/brk_computer/src/vecs/stateful/utxo_cohorts.rs @@ -13,7 +13,7 @@ use rayon::prelude::*; use crate::{ states::{BlockState, Transacted}, - vecs::{Indexes, fetched}, + vecs::{Indexes, fetched, stateful::r#trait::DynCohortVecs}, }; use super::{r#trait::CohortVecs, utxo_cohort}; diff --git a/crates/brk_core/src/structs/outputindex.rs b/crates/brk_core/src/structs/outputindex.rs index 6766ce675..80e1e90a9 100644 --- a/crates/brk_core/src/structs/outputindex.rs +++ b/crates/brk_core/src/structs/outputindex.rs @@ -28,6 +28,8 @@ use super::Vout; pub struct OutputIndex(u64); impl OutputIndex { + pub const ZERO: Self = Self(0); + pub const COINBASE: Self = Self(u64::MAX); pub fn incremented(self) -> Self { diff --git a/crates/brk_core/src/structs/outputtype.rs b/crates/brk_core/src/structs/outputtype.rs index c1e4b887e..4f1eab597 100644 --- a/crates/brk_core/src/structs/outputtype.rs +++ b/crates/brk_core/src/structs/outputtype.rs @@ -67,6 +67,10 @@ impl OutputType { } } + pub fn is_not_address(&self) -> bool { + !self.is_address() + } + pub fn is_unspendable(&self) -> bool { !self.is_spendable() } diff --git a/crates/brk_core/src/structs/p2aaddressindex.rs b/crates/brk_core/src/structs/p2aaddressindex.rs index 6276d91bc..6c98a44a3 100644 --- a/crates/brk_core/src/structs/p2aaddressindex.rs +++ b/crates/brk_core/src/structs/p2aaddressindex.rs @@ -31,6 +31,11 @@ impl From for P2AAddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2AAddressIndex) -> Self { + value.0 + } +} impl From for u32 { fn from(value: P2AAddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2pk33addressindex.rs b/crates/brk_core/src/structs/p2pk33addressindex.rs index c26426be6..c6f5aa7de 100644 --- a/crates/brk_core/src/structs/p2pk33addressindex.rs +++ b/crates/brk_core/src/structs/p2pk33addressindex.rs @@ -31,6 +31,11 @@ impl From for P2PK33AddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2PK33AddressIndex) -> Self { + value.0 + } +} impl From for u32 { fn from(value: P2PK33AddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2pk65addressindex.rs b/crates/brk_core/src/structs/p2pk65addressindex.rs index 1b912859c..8de0a1df0 100644 --- a/crates/brk_core/src/structs/p2pk65addressindex.rs +++ b/crates/brk_core/src/structs/p2pk65addressindex.rs @@ -31,6 +31,11 @@ impl From for P2PK65AddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2PK65AddressIndex) -> Self { + value.0 + } +} impl From for usize { fn from(value: P2PK65AddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2pkhaddressindex.rs b/crates/brk_core/src/structs/p2pkhaddressindex.rs index f275cc2ea..a481af106 100644 --- a/crates/brk_core/src/structs/p2pkhaddressindex.rs +++ b/crates/brk_core/src/structs/p2pkhaddressindex.rs @@ -31,6 +31,11 @@ impl From for P2PKHAddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2PKHAddressIndex) -> Self { + value.0 + } +} impl From for usize { fn from(value: P2PKHAddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2shaddressindex.rs b/crates/brk_core/src/structs/p2shaddressindex.rs index cfe2ce217..4ee432881 100644 --- a/crates/brk_core/src/structs/p2shaddressindex.rs +++ b/crates/brk_core/src/structs/p2shaddressindex.rs @@ -31,6 +31,11 @@ impl From for P2SHAddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2SHAddressIndex) -> Self { + value.0 + } +} impl From for u32 { fn from(value: P2SHAddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2traddressindex.rs b/crates/brk_core/src/structs/p2traddressindex.rs index 568c4c7c1..7f76cb713 100644 --- a/crates/brk_core/src/structs/p2traddressindex.rs +++ b/crates/brk_core/src/structs/p2traddressindex.rs @@ -31,6 +31,11 @@ impl From for P2TRAddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2TRAddressIndex) -> Self { + value.0 + } +} impl From for u32 { fn from(value: P2TRAddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2wpkhaddressindex.rs b/crates/brk_core/src/structs/p2wpkhaddressindex.rs index acaafed72..dbda26d63 100644 --- a/crates/brk_core/src/structs/p2wpkhaddressindex.rs +++ b/crates/brk_core/src/structs/p2wpkhaddressindex.rs @@ -31,6 +31,11 @@ impl From for P2WPKHAddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2WPKHAddressIndex) -> Self { + value.0 + } +} impl From for usize { fn from(value: P2WPKHAddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_core/src/structs/p2wshaddressindex.rs b/crates/brk_core/src/structs/p2wshaddressindex.rs index 389ad902f..3d92cd598 100644 --- a/crates/brk_core/src/structs/p2wshaddressindex.rs +++ b/crates/brk_core/src/structs/p2wshaddressindex.rs @@ -31,6 +31,11 @@ impl From for P2WSHAddressIndex { Self(value) } } +impl From for TypeIndex { + fn from(value: P2WSHAddressIndex) -> Self { + value.0 + } +} impl From for u32 { fn from(value: P2WSHAddressIndex) -> Self { Self::from(*value) diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index b7404a840..e03022a31 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -17,5 +17,6 @@ brk_parser = { workspace = true } brk_store = { workspace = true } brk_vec = { workspace = true } color-eyre = { workspace = true } +fjall = { workspace = true } log = { workspace = true } rayon = { workspace = true } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 553943a73..dc836326b 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -56,6 +56,9 @@ impl Indexer { let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc)) .unwrap_or_else(|_report| Indexes::default()); + // dbg!(starting_indexes); + // panic!(); + exit.block(); self.stores .rollback_if_needed(&mut self.vecs, &starting_indexes)?; diff --git a/crates/brk_indexer/src/stores.rs b/crates/brk_indexer/src/stores.rs index 50a4a2b74..a740c91b0 100644 --- a/crates/brk_indexer/src/stores.rs +++ b/crates/brk_indexer/src/stores.rs @@ -1,11 +1,12 @@ use std::{borrow::Cow, fs, path::Path, thread}; use brk_core::{ - AddressBytes, AddressBytesHash, BlockHashPrefix, GroupedByAddressType, Height, OutputType, - Result, TxIndex, TxidPrefix, TypeIndex, TypeIndexWithOutputindex, Unit, Version, + AddressBytes, AddressBytesHash, BlockHashPrefix, GroupedByAddressType, Height, OutputIndex, + OutputType, Result, TxIndex, TxidPrefix, TypeIndex, TypeIndexWithOutputindex, Unit, Version, }; use brk_store::{AnyStore, Store}; -use brk_vec::AnyIterableVec; +use brk_vec::{AnyIterableVec, VecIterator}; +use fjall::{PersistMode, TransactionalKeyspace}; use rayon::prelude::*; use crate::Indexes; @@ -14,6 +15,8 @@ use super::Vecs; #[derive(Clone)] pub struct Stores { + pub keyspace: TransactionalKeyspace, + pub addressbyteshash_to_typeindex: Store, pub blockhashprefix_to_height: Store, pub txidprefix_to_txindex: Store, @@ -27,9 +30,18 @@ impl Stores { pub fn forced_import(path: &Path, version: Version) -> color_eyre::Result { fs::create_dir_all(path)?; + let keyspace = match brk_store::open_keyspace(path) { + Ok(keyspace) => keyspace, + Err(_) => { + fs::remove_dir_all(path)?; + return Self::forced_import(path, version); + } + }; + thread::scope(|scope| { let addressbyteshash_to_typeindex = scope.spawn(|| { Store::import( + &keyspace, path, "addressbyteshash_to_typeindex", version + VERSION + Version::ZERO, @@ -38,6 +50,7 @@ impl Stores { }); let blockhashprefix_to_height = scope.spawn(|| { Store::import( + &keyspace, path, "blockhashprefix_to_height", version + VERSION + Version::ZERO, @@ -46,6 +59,7 @@ impl Stores { }); let txidprefix_to_txindex = scope.spawn(|| { Store::import( + &keyspace, path, "txidprefix_to_txindex", version + VERSION + Version::ZERO, @@ -54,6 +68,7 @@ impl Stores { }); let p2aaddressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2aaddressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -62,6 +77,7 @@ impl Stores { }); let p2pk33addressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2pk33addressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -70,6 +86,7 @@ impl Stores { }); let p2pk65addressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2pk65addressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -78,6 +95,7 @@ impl Stores { }); let p2pkhaddressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2pkhaddressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -86,6 +104,7 @@ impl Stores { }); let p2shaddressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2shaddressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -94,6 +113,7 @@ impl Stores { }); let p2traddressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2traddressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -102,6 +122,7 @@ impl Stores { }); let p2wpkhaddressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2wpkhaddressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -110,6 +131,7 @@ impl Stores { }); let p2wshaddressindex_with_outputindex = scope.spawn(|| { Store::import( + &keyspace, path, "p2wshaddressindex_with_outputindex", version + VERSION + Version::ZERO, @@ -118,6 +140,8 @@ impl Stores { }); Ok(Self { + keyspace: keyspace.clone(), + addressbyteshash_to_typeindex: addressbyteshash_to_typeindex.join().unwrap()?, blockhashprefix_to_height: blockhashprefix_to_height.join().unwrap()?, txidprefix_to_txindex: txidprefix_to_txindex.join().unwrap()?, @@ -135,6 +159,66 @@ impl Stores { }) } + pub fn starting_height(&self) -> Height { + self.as_slice() + .into_iter() + .map(|store| { + let height = store.height().map(Height::incremented).unwrap_or_default(); + // dbg!((height, store.name())); + height + }) + .min() + .unwrap() + } + + pub fn commit(&mut self, height: Height) -> Result<()> { + self.as_mut_slice() + .into_par_iter() + .try_for_each(|store| store.commit(height))?; + + self.keyspace + .persist(PersistMode::SyncAll) + .map_err(|e| e.into()) + } + + pub fn rotate_memtables(&self) { + self.as_slice() + .into_iter() + .for_each(|store| store.rotate_memtable()); + } + + fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 11] { + [ + &self.addressbyteshash_to_typeindex, + &self.blockhashprefix_to_height, + &self.txidprefix_to_txindex, + &self.addresstype_to_typeindex_with_outputindex.p2a, + &self.addresstype_to_typeindex_with_outputindex.p2pk33, + &self.addresstype_to_typeindex_with_outputindex.p2pk65, + &self.addresstype_to_typeindex_with_outputindex.p2pkh, + &self.addresstype_to_typeindex_with_outputindex.p2sh, + &self.addresstype_to_typeindex_with_outputindex.p2tr, + &self.addresstype_to_typeindex_with_outputindex.p2wpkh, + &self.addresstype_to_typeindex_with_outputindex.p2wsh, + ] + } + + fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 11] { + [ + &mut self.addressbyteshash_to_typeindex, + &mut self.blockhashprefix_to_height, + &mut self.txidprefix_to_txindex, + &mut self.addresstype_to_typeindex_with_outputindex.p2a, + &mut self.addresstype_to_typeindex_with_outputindex.p2pk33, + &mut self.addresstype_to_typeindex_with_outputindex.p2pk65, + &mut self.addresstype_to_typeindex_with_outputindex.p2pkh, + &mut self.addresstype_to_typeindex_with_outputindex.p2sh, + &mut self.addresstype_to_typeindex_with_outputindex.p2tr, + &mut self.addresstype_to_typeindex_with_outputindex.p2wpkh, + &mut self.addresstype_to_typeindex_with_outputindex.p2wsh, + ] + } + pub fn rollback_if_needed( &mut self, vecs: &mut Vecs, @@ -372,60 +456,48 @@ impl Stores { self.txidprefix_to_txindex.reset()?; } + if starting_indexes.outputindex != OutputIndex::ZERO { + let mut outputindex_to_typeindex_iter = vecs.outputindex_to_typeindex.into_iter(); + vecs.outputindex_to_outputtype + .iter_at(starting_indexes.outputindex) + .filter(|(_, outputtype)| outputtype.is_address()) + .for_each(|(outputindex, outputtype)| { + let outputtype = outputtype.into_owned(); + + let typeindex = outputindex_to_typeindex_iter.unwrap_get_inner(outputindex); + + self.addresstype_to_typeindex_with_outputindex + .get_mut(outputtype) + .unwrap() + .remove(TypeIndexWithOutputindex::from((typeindex, outputindex))); + }); + } else { + self.addresstype_to_typeindex_with_outputindex.p2a.reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2pk33 + .reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2pk65 + .reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2pkh + .reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2sh + .reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2tr + .reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2wpkh + .reset()?; + self.addresstype_to_typeindex_with_outputindex + .p2wsh + .reset()?; + } + self.commit(starting_indexes.height.decremented().unwrap_or_default())?; Ok(()) } - - pub fn starting_height(&self) -> Height { - self.as_slice() - .into_iter() - .map(|store| store.height().map(Height::incremented).unwrap_or_default()) - .min() - .unwrap() - } - - pub fn commit(&mut self, height: Height) -> Result<()> { - self.as_mut_slice() - .into_par_iter() - .try_for_each(|store| store.commit(height)) - } - - pub fn rotate_memtables(&self) { - self.as_slice() - .into_iter() - .for_each(|store| store.rotate_memtable()); - } - - fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 11] { - [ - &self.addressbyteshash_to_typeindex, - &self.blockhashprefix_to_height, - &self.txidprefix_to_txindex, - &self.addresstype_to_typeindex_with_outputindex.p2a, - &self.addresstype_to_typeindex_with_outputindex.p2pk33, - &self.addresstype_to_typeindex_with_outputindex.p2pk65, - &self.addresstype_to_typeindex_with_outputindex.p2pkh, - &self.addresstype_to_typeindex_with_outputindex.p2sh, - &self.addresstype_to_typeindex_with_outputindex.p2tr, - &self.addresstype_to_typeindex_with_outputindex.p2wpkh, - &self.addresstype_to_typeindex_with_outputindex.p2wsh, - ] - } - - fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 11] { - [ - &mut self.addressbyteshash_to_typeindex, - &mut self.blockhashprefix_to_height, - &mut self.txidprefix_to_txindex, - &mut self.addresstype_to_typeindex_with_outputindex.p2a, - &mut self.addresstype_to_typeindex_with_outputindex.p2pk33, - &mut self.addresstype_to_typeindex_with_outputindex.p2pk65, - &mut self.addresstype_to_typeindex_with_outputindex.p2pkh, - &mut self.addresstype_to_typeindex_with_outputindex.p2sh, - &mut self.addresstype_to_typeindex_with_outputindex.p2tr, - &mut self.addresstype_to_typeindex_with_outputindex.p2wpkh, - &mut self.addresstype_to_typeindex_with_outputindex.p2wsh, - ] - } } diff --git a/crates/brk_store/examples/main.rs b/crates/brk_store/examples/main.rs index 362cfc6cf..82400ecdd 100644 --- a/crates/brk_store/examples/main.rs +++ b/crates/brk_store/examples/main.rs @@ -6,11 +6,16 @@ use brk_store::{AnyStore, Store}; fn main() -> Result<()> { let p = Path::new("./examples/_fjall"); - let mut store: Store = brk_store::Store::import(p, "n", Version::ZERO, None)?; + let keyspace = brk_store::open_keyspace(p)?; + + let mut store: Store = + brk_store::Store::import(&keyspace, p, "n", Version::ZERO, None)?; store.insert_if_needed(Dollars::from(10.0), Sats::FIFTY_BTC, Height::ZERO); store.commit(Height::ZERO)?; + store.persist()?; + Ok(()) } diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index ff9d8e33a..fde1fd458 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -42,6 +42,12 @@ const DEFAULT_BLOOM_FILTER_BITS: Option = Some(5); // const CHECK_COLLISIONS: bool = true; const MAJOR_FJALL_VERSION: Version = Version::TWO; +pub fn open_keyspace(path: &Path) -> fjall::Result { + fjall::Config::new(path.join("fjall")) + .max_write_buffer_size(32 * 1024 * 1024) + .open_transactional() +} + impl<'a, K, V> Store where K: Debug + Clone + From + Ord + 'a, @@ -49,32 +55,20 @@ where ByteView: From + From<&'a K> + From, { pub fn import( - path_: &Path, + keyspace: &TransactionalKeyspace, + path: &Path, name: &str, version: Version, bloom_filter_bits: Option>, ) -> Result { - let path = path_.join(name); - - fs::create_dir_all(&path)?; - - let keyspace = match fjall::Config::new(path.join("fjall")) - .max_write_buffer_size(32 * 1024 * 1024) - .open_transactional() - { - Ok(keyspace) => keyspace, - Err(_) => { - fs::remove_dir_all(path)?; - return Self::import(path_, name, version, bloom_filter_bits); - } - }; + fs::create_dir_all(path)?; let (meta, partition) = StoreMeta::checked_open( - &keyspace, - &path.join("meta"), + keyspace, + &path.join(format!("meta/{name}")), MAJOR_FJALL_VERSION + version, || { - Self::open_partition_handle(&keyspace, bloom_filter_bits).inspect_err(|e| { + Self::open_partition_handle(keyspace, name, bloom_filter_bits).inspect_err(|e| { eprintln!("{e}"); eprintln!("Delete {path:?} and try again"); }) @@ -185,11 +179,12 @@ where fn open_partition_handle( keyspace: &TransactionalKeyspace, + name: &str, bloom_filter_bits: Option>, ) -> Result { keyspace .open_partition( - "partition", + name, PartitionCreateOptions::default() .bloom_filter_bits(bloom_filter_bits.unwrap_or(DEFAULT_BLOOM_FILTER_BITS)) .max_memtable_size(8 * 1024 * 1024) @@ -234,8 +229,6 @@ where wtx.commit()?; - self.keyspace.persist(PersistMode::SyncAll)?; - self.rtx = self.keyspace.read_tx(); Ok(()) @@ -250,6 +243,7 @@ where { fn commit(&mut self, height: Height) -> Result<()> { if self.puts.is_empty() && self.dels.is_empty() { + self.meta.export(height)?; return Ok(()); } @@ -259,6 +253,16 @@ where self.commit_(height, dels.into_iter(), puts.into_iter()) } + fn persist(&self) -> Result<()> { + self.keyspace + .persist(PersistMode::SyncAll) + .map_err(|e| e.into()) + } + + fn name(&self) -> &'static str { + self.name + } + fn reset(&mut self) -> Result<()> { info!("Resetting {}...", self.name); @@ -268,12 +272,11 @@ where self.meta.reset(); - let partition = Self::open_partition_handle(&self.keyspace, self.bloom_filter_bits)?; + let partition = + Self::open_partition_handle(&self.keyspace, self.name, self.bloom_filter_bits)?; self.partition.replace(partition); - self.keyspace.persist(PersistMode::SyncAll)?; - Ok(()) } diff --git a/crates/brk_store/src/meta.rs b/crates/brk_store/src/meta.rs index cb1a26af6..5597afc14 100644 --- a/crates/brk_store/src/meta.rs +++ b/crates/brk_store/src/meta.rs @@ -4,7 +4,7 @@ use std::{ }; use brk_core::{Result, Version}; -use fjall::{TransactionalKeyspace, TransactionalPartitionHandle}; +use fjall::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle}; use super::Height; @@ -39,7 +39,7 @@ impl StoreMeta { fs::remove_dir_all(path)?; fs::create_dir(path)?; keyspace.delete_partition(partition)?; - keyspace.persist(fjall::PersistMode::SyncAll)?; + keyspace.persist(PersistMode::SyncAll)?; partition = open_partition_handle()?; } diff --git a/crates/brk_store/src/trait.rs b/crates/brk_store/src/trait.rs index dbba8c90a..24c86e6fe 100644 --- a/crates/brk_store/src/trait.rs +++ b/crates/brk_store/src/trait.rs @@ -3,8 +3,12 @@ use brk_core::{Height, Result, Version}; pub trait AnyStore { fn commit(&mut self, height: Height) -> Result<()>; + fn persist(&self) -> Result<()>; + fn reset(&mut self) -> Result<()>; + fn name(&self) -> &'static str; + fn rotate_memtable(&self); fn height(&self) -> Option; diff --git a/crates/brk_vec/src/traits/collectable.rs b/crates/brk_vec/src/traits/collectable.rs index a9aa2cc7e..87d7b840b 100644 --- a/crates/brk_vec/src/traits/collectable.rs +++ b/crates/brk_vec/src/traits/collectable.rs @@ -10,6 +10,10 @@ where I: StoredIndex, T: StoredType, { + fn collect(&self) -> Result> { + self.collect_range(None, None) + } + fn collect_range(&self, from: Option, to: Option) -> Result> { let len = self.len(); let from = from.unwrap_or_default(); diff --git a/websites/default/index.html b/websites/default/index.html index 1d343a71d..bab3e5e87 100644 --- a/websites/default/index.html +++ b/websites/default/index.html @@ -36,8 +36,9 @@ line-height: 1.5; -webkit-text-size-adjust: 100%; tab-size: 4; - font-family: "Geist mono", ui-monospace, SFMono-Regular, Menlo, Monaco, - Consolas, "Liberation Mono", "Courier New", monospace; + font-family: + "Geist mono", ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, + "Liberation Mono", "Courier New", monospace; font-feature-settings: "ss03"; -webkit-tap-highlight-color: transparent; } @@ -80,9 +81,9 @@ kbd, samp, pre { - font-family: var(--default-mono-font-family), ui-monospace, - SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", - "Courier New", monospace; + font-family: + var(--default-mono-font-family), ui-monospace, SFMono-Regular, Menlo, + Monaco, Consolas, "Liberation Mono", "Courier New", monospace; font-feature-settings: var( --default-mono-font-feature-settings, normal @@ -273,6 +274,8 @@ --default-main-width: 25rem; --bottom-area: 69vh; + + --cube: 50px; } [data-resize] { @@ -1365,6 +1368,46 @@ } } } + + #explorer { + #chain { + display: flex; + flex-direction: column; + gap: calc(var(--cube) * 1.1); + padding: 1rem; + + .cube { + width: var(--cube); + height: var(--cube); + overflow: hidden; + + .face { + transform-origin: 0 0; + position: absolute; + width: var(--cube); + height: var(--cube); + } + .front { + background-color: var(--orange); + transform: rotate(-30deg) skewX(-30deg) + translate(calc(var(--cube) * 1.3), calc(var(--cube) * 1.725)) + scaleY(0.864); + } + .top { + background-color: var(--yellow); + transform: rotate(30deg) skew(-30deg) + translate(calc(var(--cube) * 0.99), calc(var(--cube) * -0.265)) + scaleY(0.864); + } + .side { + background-color: var(--amber); + transform: rotate(30deg) skewX(30deg) + translate(calc(var(--cube) * 0.3), calc(var(--cube) * 0.6)) + scaleY(0.864); + } + } + } + } @@ -1678,6 +1721,7 @@