diff --git a/Cargo.lock b/Cargo.lock index 027852e76..aeaa37ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -546,6 +546,7 @@ dependencies = [ "brk_vec", "color-eyre", "derive_deref", + "either", "fjall", "jiff", "log", diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index fde6a5e87..18c70b469 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -21,6 +21,7 @@ brk_store = { workspace = true } brk_vec = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } +either = "1.15.0" fjall = { workspace = true } jiff = { workspace = true } log = { workspace = true } diff --git a/crates/brk_computer/src/stores.rs b/crates/brk_computer/src/stores.rs index 024752e42..e887321b6 100644 --- a/crates/brk_computer/src/stores.rs +++ b/crates/brk_computer/src/stores.rs @@ -598,6 +598,7 @@ impl Stores { } pub fn rotate_memtables(&self) { + info!("Rotatin memtables..."); self.as_slice() .into_iter() .for_each(|store| store.rotate_memtable()); diff --git a/crates/brk_computer/src/vecs/stateful/mod.rs b/crates/brk_computer/src/vecs/stateful/mod.rs index 49c1516d3..cdb9ca83a 100644 --- a/crates/brk_computer/src/vecs/stateful/mod.rs +++ b/crates/brk_computer/src/vecs/stateful/mod.rs @@ -10,6 +10,7 @@ use brk_vec::{ AnyCollectableVec, AnyVec, CollectableVec, Computation, EagerVec, Format, GenericStoredVec, StoredIndex, StoredVec, UnsafeSlice, VecIterator, }; +use either::Either; use log::info; use rayon::prelude::*; @@ -1238,26 +1239,28 @@ impl Vecs { &self.addresstype_to_height_to_empty_address_count, )?; - thread::scope(|scope| { - scope.spawn(|| { - self.utxo_vecs - .as_mut_vecs() - .par_iter_mut() - .try_for_each(|(_, v)| { - v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) - }) - .unwrap(); - }); - scope.spawn(|| { + self.utxo_vecs + .as_mut_vecs() + .into_iter() + .map(|(_, v)| v) + .map(Either::Left) + .chain( self.address_vecs .as_mut_vecs() - .par_iter_mut() - .try_for_each(|(_, v)| { - v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) - }) - .unwrap(); - }); - }); + .into_iter() + .map(|(_, v)| v) + .map(Either::Right), + ) + .collect::>>() + .into_par_iter() + .try_for_each(|either| match either { + Either::Left(v) => { + v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) + } + Either::Right(v) => { + v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit) + } + })?; info!("Computing rest part 2..."); @@ -1278,49 +1281,57 @@ impl Vecs { .indexes_to_realized_cap .as_ref() .map(|v| v.dateindex.unwrap_last().clone()); + let dateindex_to_supply_ref = dateindex_to_supply.as_ref().unwrap(); + let height_to_realized_cap_ref = height_to_realized_cap.as_ref(); + let dateindex_to_realized_cap_ref = dateindex_to_realized_cap.as_ref(); - thread::scope(|scope| { - scope.spawn(|| { - self.utxo_vecs - .as_mut_vecs() - .par_iter_mut() - .try_for_each(|(_, v)| { - v.compute_rest_part2( - indexer, - indexes, - fetched, - starting_indexes, - market, - &height_to_supply, - dateindex_to_supply.as_ref().unwrap(), - height_to_realized_cap.as_ref(), - dateindex_to_realized_cap.as_ref(), - exit, - ) - }) - .unwrap(); - }); - scope.spawn(|| { + let vecs = self + .utxo_vecs + .as_mut_vecs() + .into_iter() + .map(|(_, v)| v) + .map(Either::Left) + .chain( self.address_vecs .as_mut_vecs() - .par_iter_mut() - .try_for_each(|(_, v)| { - v.compute_rest_part2( - indexer, - indexes, - fetched, - starting_indexes, - market, - &height_to_supply, - dateindex_to_supply.as_ref().unwrap(), - height_to_realized_cap.as_ref(), - dateindex_to_realized_cap.as_ref(), - exit, - ) - }) - .unwrap(); - }); - }); + .into_iter() + .map(|(_, v)| v) + .map(Either::Right), + ) + .collect::>>(); + + // Capped as external drives (even thunderbolt 4 SSDs) can be overwhelmed + let chunk_size = (vecs.len() as f64 / 4.0).ceil() as usize; + vecs.into_par_iter().chunks(chunk_size).try_for_each(|v| { + v.into_iter().try_for_each(|either| match either { + Either::Left(v) => v.compute_rest_part2( + indexer, + indexes, + fetched, + starting_indexes, + market, + &height_to_supply, + dateindex_to_supply_ref, + height_to_realized_cap_ref, + dateindex_to_realized_cap_ref, + exit, + ), + Either::Right(v) => v.compute_rest_part2( + indexer, + indexes, + fetched, + starting_indexes, + market, + &height_to_supply, + dateindex_to_supply_ref, + height_to_realized_cap_ref, + dateindex_to_realized_cap_ref, + exit, + ), + }) + })?; + + info!("Computing rest part 2 (others)..."); self.indexes_to_unspendable_supply.compute_rest( indexer, indexes,