mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-25 15:19:58 -07:00
computer: fix par compute_rest_part2 crashing external drives
This commit is contained in:
@@ -598,6 +598,7 @@ impl Stores {
|
||||
}
|
||||
|
||||
pub fn rotate_memtables(&self) {
|
||||
info!("Rotatin memtables...");
|
||||
self.as_slice()
|
||||
.into_iter()
|
||||
.for_each(|store| store.rotate_memtable());
|
||||
|
||||
@@ -10,6 +10,7 @@ use brk_vec::{
|
||||
AnyCollectableVec, AnyVec, CollectableVec, Computation, EagerVec, Format, GenericStoredVec,
|
||||
StoredIndex, StoredVec, UnsafeSlice, VecIterator,
|
||||
};
|
||||
use either::Either;
|
||||
use log::info;
|
||||
use rayon::prelude::*;
|
||||
|
||||
@@ -1238,26 +1239,28 @@ impl Vecs {
|
||||
&self.addresstype_to_height_to_empty_address_count,
|
||||
)?;
|
||||
|
||||
thread::scope(|scope| {
|
||||
scope.spawn(|| {
|
||||
self.utxo_vecs
|
||||
.as_mut_vecs()
|
||||
.par_iter_mut()
|
||||
.try_for_each(|(_, v)| {
|
||||
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
scope.spawn(|| {
|
||||
self.utxo_vecs
|
||||
.as_mut_vecs()
|
||||
.into_iter()
|
||||
.map(|(_, v)| v)
|
||||
.map(Either::Left)
|
||||
.chain(
|
||||
self.address_vecs
|
||||
.as_mut_vecs()
|
||||
.par_iter_mut()
|
||||
.try_for_each(|(_, v)| {
|
||||
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
});
|
||||
.into_iter()
|
||||
.map(|(_, v)| v)
|
||||
.map(Either::Right),
|
||||
)
|
||||
.collect::<Vec<Either<&mut utxo_cohort::Vecs, &mut address_cohort::Vecs>>>()
|
||||
.into_par_iter()
|
||||
.try_for_each(|either| match either {
|
||||
Either::Left(v) => {
|
||||
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
|
||||
}
|
||||
Either::Right(v) => {
|
||||
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
|
||||
}
|
||||
})?;
|
||||
|
||||
info!("Computing rest part 2...");
|
||||
|
||||
@@ -1278,49 +1281,57 @@ impl Vecs {
|
||||
.indexes_to_realized_cap
|
||||
.as_ref()
|
||||
.map(|v| v.dateindex.unwrap_last().clone());
|
||||
let dateindex_to_supply_ref = dateindex_to_supply.as_ref().unwrap();
|
||||
let height_to_realized_cap_ref = height_to_realized_cap.as_ref();
|
||||
let dateindex_to_realized_cap_ref = dateindex_to_realized_cap.as_ref();
|
||||
|
||||
thread::scope(|scope| {
|
||||
scope.spawn(|| {
|
||||
self.utxo_vecs
|
||||
.as_mut_vecs()
|
||||
.par_iter_mut()
|
||||
.try_for_each(|(_, v)| {
|
||||
v.compute_rest_part2(
|
||||
indexer,
|
||||
indexes,
|
||||
fetched,
|
||||
starting_indexes,
|
||||
market,
|
||||
&height_to_supply,
|
||||
dateindex_to_supply.as_ref().unwrap(),
|
||||
height_to_realized_cap.as_ref(),
|
||||
dateindex_to_realized_cap.as_ref(),
|
||||
exit,
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
scope.spawn(|| {
|
||||
let vecs = self
|
||||
.utxo_vecs
|
||||
.as_mut_vecs()
|
||||
.into_iter()
|
||||
.map(|(_, v)| v)
|
||||
.map(Either::Left)
|
||||
.chain(
|
||||
self.address_vecs
|
||||
.as_mut_vecs()
|
||||
.par_iter_mut()
|
||||
.try_for_each(|(_, v)| {
|
||||
v.compute_rest_part2(
|
||||
indexer,
|
||||
indexes,
|
||||
fetched,
|
||||
starting_indexes,
|
||||
market,
|
||||
&height_to_supply,
|
||||
dateindex_to_supply.as_ref().unwrap(),
|
||||
height_to_realized_cap.as_ref(),
|
||||
dateindex_to_realized_cap.as_ref(),
|
||||
exit,
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
});
|
||||
.into_iter()
|
||||
.map(|(_, v)| v)
|
||||
.map(Either::Right),
|
||||
)
|
||||
.collect::<Vec<Either<&mut utxo_cohort::Vecs, &mut address_cohort::Vecs>>>();
|
||||
|
||||
// Capped as external drives (even thunderbolt 4 SSDs) can be overwhelmed
|
||||
let chunk_size = (vecs.len() as f64 / 4.0).ceil() as usize;
|
||||
vecs.into_par_iter().chunks(chunk_size).try_for_each(|v| {
|
||||
v.into_iter().try_for_each(|either| match either {
|
||||
Either::Left(v) => v.compute_rest_part2(
|
||||
indexer,
|
||||
indexes,
|
||||
fetched,
|
||||
starting_indexes,
|
||||
market,
|
||||
&height_to_supply,
|
||||
dateindex_to_supply_ref,
|
||||
height_to_realized_cap_ref,
|
||||
dateindex_to_realized_cap_ref,
|
||||
exit,
|
||||
),
|
||||
Either::Right(v) => v.compute_rest_part2(
|
||||
indexer,
|
||||
indexes,
|
||||
fetched,
|
||||
starting_indexes,
|
||||
market,
|
||||
&height_to_supply,
|
||||
dateindex_to_supply_ref,
|
||||
height_to_realized_cap_ref,
|
||||
dateindex_to_realized_cap_ref,
|
||||
exit,
|
||||
),
|
||||
})
|
||||
})?;
|
||||
|
||||
info!("Computing rest part 2 (others)...");
|
||||
self.indexes_to_unspendable_supply.compute_rest(
|
||||
indexer,
|
||||
indexes,
|
||||
|
||||
Reference in New Issue
Block a user