computer: final fix for external disks crashing

This commit is contained in:
nym21
2025-07-18 16:29:53 +02:00
parent 537d98b41b
commit dd51f91cab
9 changed files with 234 additions and 156 deletions

View File

@@ -178,15 +178,15 @@ impl Vecs {
exit,
)?;
self.cointime.compute(
indexer,
&self.indexes,
&starting_indexes,
self.fetched.as_ref(),
&self.transactions,
&self.stateful,
exit,
)?;
// self.cointime.compute(
// indexer,
// &self.indexes,
// &starting_indexes,
// self.fetched.as_ref(),
// &self.transactions,
// &self.stateful,
// exit,
// )?;
Ok(())
}

View File

@@ -1,16 +1,17 @@
use std::path::Path;
use std::{path::Path, time::Instant};
use brk_core::{
AddressGroups, ByAmountRange, ByGreatEqualAmount, ByLowerThanAmount, GroupFilter, Height,
Result, Version,
AddressGroups, Bitcoin, ByAmountRange, ByGreatEqualAmount, ByLowerThanAmount, DateIndex,
Dollars, GroupFilter, Height, Result, Version,
};
use brk_exit::Exit;
use brk_vec::{Computation, Format};
use brk_indexer::Indexer;
use brk_vec::{AnyIterableVec, Computation, Format};
use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
use crate::{
Indexes, fetched, indexes,
Indexes, fetched, indexes, market,
stateful::{
address_cohort,
r#trait::{CohortVecs, DynCohortVecs},
@@ -542,6 +543,55 @@ impl Vecs {
})
}
pub fn compute_rest_part1(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
fetched: Option<&fetched::Vecs>,
starting_indexes: &Indexes,
exit: &Exit,
) -> color_eyre::Result<()> {
self.as_mut_vecs().into_par_iter().try_for_each(|(_, v)| {
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
})
}
#[allow(clippy::too_many_arguments)]
pub fn compute_rest_part2(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
fetched: Option<&fetched::Vecs>,
starting_indexes: &Indexes,
market: &market::Vecs,
height_to_supply: &impl AnyIterableVec<Height, Bitcoin>,
dateindex_to_supply: &impl AnyIterableVec<DateIndex, Bitcoin>,
height_to_realized_cap: Option<&impl AnyIterableVec<Height, Dollars>>,
dateindex_to_realized_cap: Option<&impl AnyIterableVec<DateIndex, Dollars>>,
exit: &Exit,
) -> color_eyre::Result<()> {
self.0
.as_boxed_mut_vecs()
.into_iter()
.try_for_each(|mut v| {
unsafe { libc::sync() }
v.par_iter_mut().try_for_each(|(_, v)| {
v.compute_rest_part2(
indexer,
indexes,
fetched,
starting_indexes,
market,
height_to_supply,
dateindex_to_supply,
height_to_realized_cap,
dateindex_to_realized_cap,
exit,
)
})
})
}
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.as_mut_separate_vecs()
.par_iter_mut()

View File

@@ -1,4 +1,4 @@
use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread};
use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread, time::Instant};
use brk_core::{
AnyAddressDataIndexEnum, AnyAddressIndex, ByAddressType, ByAnyAddress, CheckedSub, DateIndex,
@@ -13,7 +13,6 @@ use brk_vec::{
AnyCollectableVec, AnyIndexedVec, AnyVec, CollectableVec, Computation, EagerVec, Format,
GenericStoredVec, IndexedVec, Mmap, StoredIndex, StoredVec, VecIterator,
};
use either::Either;
use log::info;
use rayon::prelude::*;
@@ -567,37 +566,8 @@ impl Vecs {
let dateindex_to_first_height = &indexes.dateindex_to_first_height;
let dateindex_to_height_count = &indexes.dateindex_to_height_count;
let inputindex_to_outputindex_mmap = inputindex_to_outputindex.create_mmap()?;
let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?;
let outputindex_to_outputtype_mmap = outputindex_to_outputtype.create_mmap()?;
let outputindex_to_typeindex_mmap = outputindex_to_typeindex.create_mmap()?;
let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter();
let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter();
let mut height_to_first_p2aaddressindex_iter = height_to_first_p2aaddressindex.into_iter();
let mut height_to_first_p2pk33addressindex_iter =
height_to_first_p2pk33addressindex.into_iter();
let mut height_to_first_p2pk65addressindex_iter =
height_to_first_p2pk65addressindex.into_iter();
let mut height_to_first_p2pkhaddressindex_iter =
height_to_first_p2pkhaddressindex.into_iter();
let mut height_to_first_p2shaddressindex_iter =
height_to_first_p2shaddressindex.into_iter();
let mut height_to_first_p2traddressindex_iter =
height_to_first_p2traddressindex.into_iter();
let mut height_to_first_p2wpkhaddressindex_iter =
height_to_first_p2wpkhaddressindex.into_iter();
let mut height_to_first_p2wshaddressindex_iter =
height_to_first_p2wshaddressindex.into_iter();
let mut height_to_output_count_iter = height_to_output_count.into_iter();
let mut height_to_input_count_iter = height_to_input_count.into_iter();
let mut height_to_close_iter = height_to_close.as_ref().map(|v| v.into_iter());
let mut height_to_unclaimed_rewards_iter = height_to_unclaimed_rewards.into_iter();
let mut height_to_timestamp_fixed_iter = height_to_timestamp_fixed.into_iter();
let mut dateindex_to_close_iter = dateindex_to_close.as_ref().map(|v| v.into_iter());
let mut height_to_date_fixed_iter = height_to_date_fixed.into_iter();
let mut dateindex_to_first_height_iter = dateindex_to_first_height.into_iter();
let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter();
let base_version = Version::ZERO
+ height_to_first_outputindex.version()
@@ -777,7 +747,39 @@ impl Vecs {
.try_for_each(|(_, v)| v.state.reset_price_to_amount())?;
}
if starting_height < Height::from(height_to_date_fixed.len()) {
let last_height = indexer.vecs.height_to_blockhash.height();
if starting_height <= last_height {
let inputindex_to_outputindex_mmap = inputindex_to_outputindex.create_mmap()?;
let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?;
let outputindex_to_outputtype_mmap = outputindex_to_outputtype.create_mmap()?;
let outputindex_to_typeindex_mmap = outputindex_to_typeindex.create_mmap()?;
let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter();
let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter();
let mut height_to_first_p2aaddressindex_iter =
height_to_first_p2aaddressindex.into_iter();
let mut height_to_first_p2pk33addressindex_iter =
height_to_first_p2pk33addressindex.into_iter();
let mut height_to_first_p2pk65addressindex_iter =
height_to_first_p2pk65addressindex.into_iter();
let mut height_to_first_p2pkhaddressindex_iter =
height_to_first_p2pkhaddressindex.into_iter();
let mut height_to_first_p2shaddressindex_iter =
height_to_first_p2shaddressindex.into_iter();
let mut height_to_first_p2traddressindex_iter =
height_to_first_p2traddressindex.into_iter();
let mut height_to_first_p2wpkhaddressindex_iter =
height_to_first_p2wpkhaddressindex.into_iter();
let mut height_to_first_p2wshaddressindex_iter =
height_to_first_p2wshaddressindex.into_iter();
let mut height_to_output_count_iter = height_to_output_count.into_iter();
let mut height_to_input_count_iter = height_to_input_count.into_iter();
let mut height_to_unclaimed_rewards_iter = height_to_unclaimed_rewards.into_iter();
let mut dateindex_to_close_iter = dateindex_to_close.as_ref().map(|v| v.into_iter());
let mut height_to_date_fixed_iter = height_to_date_fixed.into_iter();
let mut dateindex_to_first_height_iter = dateindex_to_first_height.into_iter();
let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter();
starting_indexes.update_from_height(starting_height, indexes);
separate_utxo_vecs
@@ -1265,18 +1267,11 @@ impl Vecs {
info!("Computing overlapping...");
thread::scope(|scope| {
scope.spawn(|| {
self.utxo_cohorts
.compute_overlapping_vecs(starting_indexes, exit)
.unwrap();
});
scope.spawn(|| {
self.address_cohorts
.compute_overlapping_vecs(starting_indexes, exit)
.unwrap();
});
});
self.utxo_cohorts
.compute_overlapping_vecs(starting_indexes, exit)?;
self.address_cohorts
.compute_overlapping_vecs(starting_indexes, exit)?;
info!("Computing rest part 1...");
@@ -1318,12 +1313,30 @@ impl Vecs {
},
)?;
self.indexes_to_unspendable_supply.compute_rest(
indexer,
indexes,
fetched,
starting_indexes,
exit,
Some(&self.height_to_unspendable_supply),
)?;
self.indexes_to_opreturn_supply.compute_rest(
indexer,
indexes,
fetched,
starting_indexes,
exit,
Some(&self.height_to_opreturn_supply),
)?;
self.addresstype_to_indexes_to_address_count.compute(
indexes,
starting_indexes,
exit,
&self.addresstype_to_height_to_address_count,
)?;
self.addresstype_to_indexes_to_empty_address_count.compute(
indexes,
starting_indexes,
@@ -1332,31 +1345,19 @@ impl Vecs {
)?;
self.utxo_cohorts
.as_mut_vecs()
.into_iter()
.map(|(_, v)| v)
.map(Either::Left)
.chain(
self.address_cohorts
.as_mut_vecs()
.into_iter()
.map(|(_, v)| v)
.map(Either::Right),
)
.collect::<Vec<Either<&mut utxo_cohort::Vecs, &mut address_cohort::Vecs>>>()
.into_par_iter()
.try_for_each(|either| match either {
Either::Left(v) => {
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
}
Either::Right(v) => {
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
}
})?;
.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)?;
self.address_cohorts.compute_rest_part1(
indexer,
indexes,
fetched,
starting_indexes,
exit,
)?;
info!("Computing rest part 2...");
let height_to_supply = self
let height_to_supply = &self
.utxo_cohorts
.all
.1
@@ -1383,64 +1384,34 @@ impl Vecs {
let height_to_realized_cap_ref = height_to_realized_cap.as_ref();
let dateindex_to_realized_cap_ref = dateindex_to_realized_cap.as_ref();
self.utxo_cohorts
.as_mut_vecs()
.into_iter()
.map(|(_, v)| v)
.map(Either::Left)
.chain(
self.address_cohorts
.as_mut_vecs()
.into_iter()
.map(|(_, v)| v)
.map(Either::Right),
)
.collect::<Vec<Either<&mut utxo_cohort::Vecs, &mut address_cohort::Vecs>>>()
.into_par_iter()
.try_for_each(|either| match either {
Either::Left(v) => v.compute_rest_part2(
indexer,
indexes,
fetched,
starting_indexes,
market,
&height_to_supply,
dateindex_to_supply_ref,
height_to_realized_cap_ref,
dateindex_to_realized_cap_ref,
exit,
),
Either::Right(v) => v.compute_rest_part2(
indexer,
indexes,
fetched,
starting_indexes,
market,
&height_to_supply,
dateindex_to_supply_ref,
height_to_realized_cap_ref,
dateindex_to_realized_cap_ref,
exit,
),
})?;
self.utxo_cohorts.compute_rest_part2(
indexer,
indexes,
fetched,
starting_indexes,
market,
height_to_supply,
dateindex_to_supply_ref,
height_to_realized_cap_ref,
dateindex_to_realized_cap_ref,
exit,
)?;
self.indexes_to_unspendable_supply.compute_rest(
self.address_cohorts.compute_rest_part2(
indexer,
indexes,
fetched,
starting_indexes,
market,
height_to_supply,
dateindex_to_supply_ref,
height_to_realized_cap_ref,
dateindex_to_realized_cap_ref,
exit,
Some(&self.height_to_unspendable_supply),
)?;
self.indexes_to_opreturn_supply.compute_rest(
indexer,
indexes,
fetched,
starting_indexes,
exit,
Some(&self.height_to_opreturn_supply),
)?;
unsafe { libc::sync() }
exit.release();
Ok(())

View File

@@ -1,17 +1,18 @@
use std::{collections::BTreeMap, ops::ControlFlow, path::Path};
use std::{collections::BTreeMap, ops::ControlFlow, path::Path, time::Instant};
use brk_core::{
ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge, ByMinAge,
BySpendableType, ByTerm, CheckedSub, Dollars, GroupFilter, HalvingEpoch, Height, Result,
Timestamp, UTXOGroups, Version,
Bitcoin, ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge,
ByMinAge, BySpendableType, ByTerm, CheckedSub, DateIndex, Dollars, GroupFilter, HalvingEpoch,
Height, Result, Timestamp, UTXOGroups, Version,
};
use brk_exit::Exit;
use brk_vec::{Computation, Format, StoredIndex};
use brk_indexer::Indexer;
use brk_vec::{AnyIterableVec, Computation, Format, StoredIndex};
use derive_deref::{Deref, DerefMut};
use rayon::prelude::*;
use crate::{
Indexes, fetched, indexes,
Indexes, fetched, indexes, market,
stateful::r#trait::DynCohortVecs,
states::{BlockState, Transacted},
};
@@ -1704,6 +1705,55 @@ impl Vecs {
})
}
pub fn compute_rest_part1(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
fetched: Option<&fetched::Vecs>,
starting_indexes: &Indexes,
exit: &Exit,
) -> color_eyre::Result<()> {
self.as_mut_vecs().into_par_iter().try_for_each(|(_, v)| {
v.compute_rest_part1(indexer, indexes, fetched, starting_indexes, exit)
})
}
#[allow(clippy::too_many_arguments)]
pub fn compute_rest_part2(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
fetched: Option<&fetched::Vecs>,
starting_indexes: &Indexes,
market: &market::Vecs,
height_to_supply: &impl AnyIterableVec<Height, Bitcoin>,
dateindex_to_supply: &impl AnyIterableVec<DateIndex, Bitcoin>,
height_to_realized_cap: Option<&impl AnyIterableVec<Height, Dollars>>,
dateindex_to_realized_cap: Option<&impl AnyIterableVec<DateIndex, Dollars>>,
exit: &Exit,
) -> color_eyre::Result<()> {
self.0
.as_boxed_mut_vecs()
.into_iter()
.try_for_each(|mut v| {
unsafe { libc::sync() }
v.par_iter_mut().try_for_each(|(_, v)| {
v.compute_rest_part2(
indexer,
indexes,
fetched,
starting_indexes,
market,
height_to_supply,
dateindex_to_supply,
height_to_realized_cap,
dateindex_to_realized_cap,
exit,
)
})
})
}
pub fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.as_mut_separate_vecs()
.par_iter_mut()