diff --git a/Cargo.lock b/Cargo.lock index 8bfed405a..6190f7edf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -546,8 +546,8 @@ dependencies = [ "brk_vec", "color-eyre", "derive_deref", - "either", "fjall", + "libc", "log", "rayon", "serde", diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index 32cc7b581..7f5f19c30 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -21,8 +21,8 @@ brk_store = { workspace = true } brk_vec = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } -either = "1.15.0" fjall = { workspace = true } +libc = "0.2.174" log = { workspace = true } rayon = { workspace = true } serde = { workspace = true } diff --git a/crates/brk_computer/src/all.rs b/crates/brk_computer/src/all.rs index 902614870..d5c15f536 100644 --- a/crates/brk_computer/src/all.rs +++ b/crates/brk_computer/src/all.rs @@ -178,15 +178,15 @@ impl Vecs { exit, )?; - self.cointime.compute( - indexer, - &self.indexes, - &starting_indexes, - self.fetched.as_ref(), - &self.transactions, - &self.stateful, - exit, - )?; + // self.cointime.compute( + // indexer, + // &self.indexes, + // &starting_indexes, + // self.fetched.as_ref(), + // &self.transactions, + // &self.stateful, + // exit, + // )?; Ok(()) } diff --git a/crates/brk_computer/src/stateful/address_cohorts.rs b/crates/brk_computer/src/stateful/address_cohorts.rs index 0bde42eed..583ccb333 100644 --- a/crates/brk_computer/src/stateful/address_cohorts.rs +++ b/crates/brk_computer/src/stateful/address_cohorts.rs @@ -1,16 +1,17 @@ -use std::path::Path; +use std::{path::Path, time::Instant}; use brk_core::{ - AddressGroups, ByAmountRange, ByGreatEqualAmount, ByLowerThanAmount, GroupFilter, Height, - Result, Version, + AddressGroups, Bitcoin, ByAmountRange, ByGreatEqualAmount, ByLowerThanAmount, DateIndex, + Dollars, GroupFilter, Height, Result, Version, }; use brk_exit::Exit; -use brk_vec::{Computation, Format}; +use brk_indexer::Indexer; +use brk_vec::{AnyIterableVec, Computation, Format}; use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; use crate::{ - Indexes, fetched, indexes, + Indexes, fetched, indexes, market, stateful::{ address_cohort, r#trait::{CohortVecs, DynCohortVecs}, @@ -542,6 +543,55 @@ impl Vecs { }) } + pub fn compute_rest_part1( + &mut self, + indexer: &Indexer, + indexes: &indexes::Vecs, + fetched: Option<&fetched::Vecs>, + starting_indexes: &Indexes, + exit: &Exit, + ) -> color_eyre::Result<()> { + self.as_mut_vecs().into_par_iter().try_for_each(|(_, v)| { + v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) + }) + } + + #[allow(clippy::too_many_arguments)] + pub fn compute_rest_part2( + &mut self, + indexer: &Indexer, + indexes: &indexes::Vecs, + fetched: Option<&fetched::Vecs>, + starting_indexes: &Indexes, + market: &market::Vecs, + height_to_supply: &impl AnyIterableVec, + dateindex_to_supply: &impl AnyIterableVec, + height_to_realized_cap: Option<&impl AnyIterableVec>, + dateindex_to_realized_cap: Option<&impl AnyIterableVec>, + exit: &Exit, + ) -> color_eyre::Result<()> { + self.0 + .as_boxed_mut_vecs() + .into_iter() + .try_for_each(|mut v| { + unsafe { libc::sync() } + v.par_iter_mut().try_for_each(|(_, v)| { + v.compute_rest_part2( + indexer, + indexes, + fetched, + starting_indexes, + market, + height_to_supply, + dateindex_to_supply, + height_to_realized_cap, + dateindex_to_realized_cap, + exit, + ) + }) + }) + } + pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { self.as_mut_separate_vecs() .par_iter_mut() diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index 9d6c06068..0cf56e42b 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread}; +use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread, time::Instant}; use brk_core::{ AnyAddressDataIndexEnum, AnyAddressIndex, ByAddressType, ByAnyAddress, CheckedSub, DateIndex, @@ -13,7 +13,6 @@ use brk_vec::{ AnyCollectableVec, AnyIndexedVec, AnyVec, CollectableVec, Computation, EagerVec, Format, GenericStoredVec, IndexedVec, Mmap, StoredIndex, StoredVec, VecIterator, }; -use either::Either; use log::info; use rayon::prelude::*; @@ -567,37 +566,8 @@ impl Vecs { let dateindex_to_first_height = &indexes.dateindex_to_first_height; let dateindex_to_height_count = &indexes.dateindex_to_height_count; - let inputindex_to_outputindex_mmap = inputindex_to_outputindex.create_mmap()?; - let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?; - let outputindex_to_outputtype_mmap = outputindex_to_outputtype.create_mmap()?; - let outputindex_to_typeindex_mmap = outputindex_to_typeindex.create_mmap()?; - - let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter(); - let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter(); - let mut height_to_first_p2aaddressindex_iter = height_to_first_p2aaddressindex.into_iter(); - let mut height_to_first_p2pk33addressindex_iter = - height_to_first_p2pk33addressindex.into_iter(); - let mut height_to_first_p2pk65addressindex_iter = - height_to_first_p2pk65addressindex.into_iter(); - let mut height_to_first_p2pkhaddressindex_iter = - height_to_first_p2pkhaddressindex.into_iter(); - let mut height_to_first_p2shaddressindex_iter = - height_to_first_p2shaddressindex.into_iter(); - let mut height_to_first_p2traddressindex_iter = - height_to_first_p2traddressindex.into_iter(); - let mut height_to_first_p2wpkhaddressindex_iter = - height_to_first_p2wpkhaddressindex.into_iter(); - let mut height_to_first_p2wshaddressindex_iter = - height_to_first_p2wshaddressindex.into_iter(); - let mut height_to_output_count_iter = height_to_output_count.into_iter(); - let mut height_to_input_count_iter = height_to_input_count.into_iter(); let mut height_to_close_iter = height_to_close.as_ref().map(|v| v.into_iter()); - let mut height_to_unclaimed_rewards_iter = height_to_unclaimed_rewards.into_iter(); let mut height_to_timestamp_fixed_iter = height_to_timestamp_fixed.into_iter(); - let mut dateindex_to_close_iter = dateindex_to_close.as_ref().map(|v| v.into_iter()); - let mut height_to_date_fixed_iter = height_to_date_fixed.into_iter(); - let mut dateindex_to_first_height_iter = dateindex_to_first_height.into_iter(); - let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter(); let base_version = Version::ZERO + height_to_first_outputindex.version() @@ -777,7 +747,39 @@ impl Vecs { .try_for_each(|(_, v)| v.state.reset_price_to_amount())?; } - if starting_height < Height::from(height_to_date_fixed.len()) { + let last_height = indexer.vecs.height_to_blockhash.height(); + if starting_height <= last_height { + let inputindex_to_outputindex_mmap = inputindex_to_outputindex.create_mmap()?; + let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?; + let outputindex_to_outputtype_mmap = outputindex_to_outputtype.create_mmap()?; + let outputindex_to_typeindex_mmap = outputindex_to_typeindex.create_mmap()?; + + let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter(); + let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter(); + let mut height_to_first_p2aaddressindex_iter = + height_to_first_p2aaddressindex.into_iter(); + let mut height_to_first_p2pk33addressindex_iter = + height_to_first_p2pk33addressindex.into_iter(); + let mut height_to_first_p2pk65addressindex_iter = + height_to_first_p2pk65addressindex.into_iter(); + let mut height_to_first_p2pkhaddressindex_iter = + height_to_first_p2pkhaddressindex.into_iter(); + let mut height_to_first_p2shaddressindex_iter = + height_to_first_p2shaddressindex.into_iter(); + let mut height_to_first_p2traddressindex_iter = + height_to_first_p2traddressindex.into_iter(); + let mut height_to_first_p2wpkhaddressindex_iter = + height_to_first_p2wpkhaddressindex.into_iter(); + let mut height_to_first_p2wshaddressindex_iter = + height_to_first_p2wshaddressindex.into_iter(); + let mut height_to_output_count_iter = height_to_output_count.into_iter(); + let mut height_to_input_count_iter = height_to_input_count.into_iter(); + let mut height_to_unclaimed_rewards_iter = height_to_unclaimed_rewards.into_iter(); + let mut dateindex_to_close_iter = dateindex_to_close.as_ref().map(|v| v.into_iter()); + let mut height_to_date_fixed_iter = height_to_date_fixed.into_iter(); + let mut dateindex_to_first_height_iter = dateindex_to_first_height.into_iter(); + let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter(); + starting_indexes.update_from_height(starting_height, indexes); separate_utxo_vecs @@ -1265,18 +1267,11 @@ impl Vecs { info!("Computing overlapping..."); - thread::scope(|scope| { - scope.spawn(|| { - self.utxo_cohorts - .compute_overlapping_vecs(starting_indexes, exit) - .unwrap(); - }); - scope.spawn(|| { - self.address_cohorts - .compute_overlapping_vecs(starting_indexes, exit) - .unwrap(); - }); - }); + self.utxo_cohorts + .compute_overlapping_vecs(starting_indexes, exit)?; + + self.address_cohorts + .compute_overlapping_vecs(starting_indexes, exit)?; info!("Computing rest part 1..."); @@ -1318,12 +1313,30 @@ impl Vecs { }, )?; + self.indexes_to_unspendable_supply.compute_rest( + indexer, + indexes, + fetched, + starting_indexes, + exit, + Some(&self.height_to_unspendable_supply), + )?; + self.indexes_to_opreturn_supply.compute_rest( + indexer, + indexes, + fetched, + starting_indexes, + exit, + Some(&self.height_to_opreturn_supply), + )?; + self.addresstype_to_indexes_to_address_count.compute( indexes, starting_indexes, exit, &self.addresstype_to_height_to_address_count, )?; + self.addresstype_to_indexes_to_empty_address_count.compute( indexes, starting_indexes, @@ -1332,31 +1345,19 @@ impl Vecs { )?; self.utxo_cohorts - .as_mut_vecs() - .into_iter() - .map(|(_, v)| v) - .map(Either::Left) - .chain( - self.address_cohorts - .as_mut_vecs() - .into_iter() - .map(|(_, v)| v) - .map(Either::Right), - ) - .collect::>>() - .into_par_iter() - .try_for_each(|either| match either { - Either::Left(v) => { - v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) - } - Either::Right(v) => { - v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) - } - })?; + .compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)?; + + self.address_cohorts.compute_rest_part1( + indexer, + indexes, + fetched, + starting_indexes, + exit, + )?; info!("Computing rest part 2..."); - let height_to_supply = self + let height_to_supply = &self .utxo_cohorts .all .1 @@ -1383,64 +1384,34 @@ impl Vecs { let height_to_realized_cap_ref = height_to_realized_cap.as_ref(); let dateindex_to_realized_cap_ref = dateindex_to_realized_cap.as_ref(); - self.utxo_cohorts - .as_mut_vecs() - .into_iter() - .map(|(_, v)| v) - .map(Either::Left) - .chain( - self.address_cohorts - .as_mut_vecs() - .into_iter() - .map(|(_, v)| v) - .map(Either::Right), - ) - .collect::>>() - .into_par_iter() - .try_for_each(|either| match either { - Either::Left(v) => v.compute_rest_part2( - indexer, - indexes, - fetched, - starting_indexes, - market, - &height_to_supply, - dateindex_to_supply_ref, - height_to_realized_cap_ref, - dateindex_to_realized_cap_ref, - exit, - ), - Either::Right(v) => v.compute_rest_part2( - indexer, - indexes, - fetched, - starting_indexes, - market, - &height_to_supply, - dateindex_to_supply_ref, - height_to_realized_cap_ref, - dateindex_to_realized_cap_ref, - exit, - ), - })?; + self.utxo_cohorts.compute_rest_part2( + indexer, + indexes, + fetched, + starting_indexes, + market, + height_to_supply, + dateindex_to_supply_ref, + height_to_realized_cap_ref, + dateindex_to_realized_cap_ref, + exit, + )?; - self.indexes_to_unspendable_supply.compute_rest( + self.address_cohorts.compute_rest_part2( indexer, indexes, fetched, starting_indexes, + market, + height_to_supply, + dateindex_to_supply_ref, + height_to_realized_cap_ref, + dateindex_to_realized_cap_ref, exit, - Some(&self.height_to_unspendable_supply), - )?; - self.indexes_to_opreturn_supply.compute_rest( - indexer, - indexes, - fetched, - starting_indexes, - exit, - Some(&self.height_to_opreturn_supply), )?; + unsafe { libc::sync() } + exit.release(); Ok(()) diff --git a/crates/brk_computer/src/stateful/utxo_cohorts.rs b/crates/brk_computer/src/stateful/utxo_cohorts.rs index e73fdcbe1..8f25930e1 100644 --- a/crates/brk_computer/src/stateful/utxo_cohorts.rs +++ b/crates/brk_computer/src/stateful/utxo_cohorts.rs @@ -1,17 +1,18 @@ -use std::{collections::BTreeMap, ops::ControlFlow, path::Path}; +use std::{collections::BTreeMap, ops::ControlFlow, path::Path, time::Instant}; use brk_core::{ - ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge, ByMinAge, - BySpendableType, ByTerm, CheckedSub, Dollars, GroupFilter, HalvingEpoch, Height, Result, - Timestamp, UTXOGroups, Version, + Bitcoin, ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge, + ByMinAge, BySpendableType, ByTerm, CheckedSub, DateIndex, Dollars, GroupFilter, HalvingEpoch, + Height, Result, Timestamp, UTXOGroups, Version, }; use brk_exit::Exit; -use brk_vec::{Computation, Format, StoredIndex}; +use brk_indexer::Indexer; +use brk_vec::{AnyIterableVec, Computation, Format, StoredIndex}; use derive_deref::{Deref, DerefMut}; use rayon::prelude::*; use crate::{ - Indexes, fetched, indexes, + Indexes, fetched, indexes, market, stateful::r#trait::DynCohortVecs, states::{BlockState, Transacted}, }; @@ -1704,6 +1705,55 @@ impl Vecs { }) } + pub fn compute_rest_part1( + &mut self, + indexer: &Indexer, + indexes: &indexes::Vecs, + fetched: Option<&fetched::Vecs>, + starting_indexes: &Indexes, + exit: &Exit, + ) -> color_eyre::Result<()> { + self.as_mut_vecs().into_par_iter().try_for_each(|(_, v)| { + v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) + }) + } + + #[allow(clippy::too_many_arguments)] + pub fn compute_rest_part2( + &mut self, + indexer: &Indexer, + indexes: &indexes::Vecs, + fetched: Option<&fetched::Vecs>, + starting_indexes: &Indexes, + market: &market::Vecs, + height_to_supply: &impl AnyIterableVec, + dateindex_to_supply: &impl AnyIterableVec, + height_to_realized_cap: Option<&impl AnyIterableVec>, + dateindex_to_realized_cap: Option<&impl AnyIterableVec>, + exit: &Exit, + ) -> color_eyre::Result<()> { + self.0 + .as_boxed_mut_vecs() + .into_iter() + .try_for_each(|mut v| { + unsafe { libc::sync() } + v.par_iter_mut().try_for_each(|(_, v)| { + v.compute_rest_part2( + indexer, + indexes, + fetched, + starting_indexes, + market, + height_to_supply, + dateindex_to_supply, + height_to_realized_cap, + dateindex_to_realized_cap, + exit, + ) + }) + }) + } + pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { self.as_mut_separate_vecs() .par_iter_mut() diff --git a/crates/brk_core/src/groups/address.rs b/crates/brk_core/src/groups/address.rs index 75a193745..09896f12f 100644 --- a/crates/brk_core/src/groups/address.rs +++ b/crates/brk_core/src/groups/address.rs @@ -8,6 +8,14 @@ pub struct AddressGroups { } impl AddressGroups { + pub fn as_boxed_mut_vecs(&mut self) -> Vec> { + vec![ + Box::new(self.ge_amount.as_mut_vec()), + Box::new(self.amount_range.as_mut_vec()), + Box::new(self.lt_amount.as_mut_vec()), + ] + } + pub fn as_mut_vecs(&mut self) -> Vec<&mut T> { self.ge_amount .as_mut_vec() diff --git a/crates/brk_core/src/groups/utxo.rs b/crates/brk_core/src/groups/utxo.rs index 05bc77921..2d240dce2 100644 --- a/crates/brk_core/src/groups/utxo.rs +++ b/crates/brk_core/src/groups/utxo.rs @@ -18,6 +18,21 @@ pub struct UTXOGroups { } impl UTXOGroups { + pub fn as_boxed_mut_vecs(&mut self) -> Vec> { + vec![ + Box::new([&mut self.all]), + Box::new(self.term.as_mut_vec()), + Box::new(self.max_age.as_mut_vec()), + Box::new(self.min_age.as_mut_vec()), + Box::new(self.ge_amount.as_mut_vec()), + Box::new(self.age_range.as_mut_vec()), + Box::new(self.epoch.as_mut_vec()), + Box::new(self.amount_range.as_mut_vec()), + Box::new(self.lt_amount.as_mut_vec()), + Box::new(self._type.as_mut_vec()), + ] + } + pub fn as_mut_vecs(&mut self) -> Vec<&mut T> { [&mut self.all] .into_iter() diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index 040ac68d7..307be2f0d 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -204,7 +204,7 @@ where } #[inline] fn mut_holes(&mut self) -> &mut BTreeSet { - self.inner.mut_holes() + panic!("unsupported") } #[inline] fn updated(&self) -> &BTreeMap { @@ -212,7 +212,7 @@ where } #[inline] fn mut_updated(&mut self) -> &mut BTreeMap { - self.inner.mut_updated() + panic!("unsupported") } #[inline] @@ -220,22 +220,6 @@ where self.inner.path() } - fn delete(&mut self, _: I) { - panic!("unsupported") - } - - fn unchecked_delete(&mut self, _: I) { - panic!("unsupported") - } - - fn fill_first_hole_or_push(&mut self, _: T) -> Result { - panic!("unsupported") - } - - fn update(&mut self, _: I, _: T) -> Result<()> { - panic!("unsupported") - } - fn flush(&mut self) -> Result<()> { let file_opt = self.inner.write_header_if_needed()?;