global: snapshot

This commit is contained in:
nym21
2025-12-08 17:05:38 +01:00
parent f4a1384dc4
commit 60a38b4108
19 changed files with 262 additions and 114 deletions
+55 -15
View File
@@ -108,11 +108,15 @@ impl Vecs {
let version = parent_version + VERSION;
macro_rules! eager {
($name:expr) => { EagerVec::forced_import(&db, $name, version)? };
($name:expr) => {
EagerVec::forced_import(&db, $name, version)?
};
}
macro_rules! lazy {
($name:expr, $source:expr) => {
LazyVecFrom1::init($name, version, $source.boxed_clone(), |index, _| Some(index))
LazyVecFrom1::init($name, version, $source.boxed_clone(), |index, _| {
Some(index)
})
};
}
@@ -120,18 +124,54 @@ impl Vecs {
txinindex_to_txoutindex: eager!("txoutindex"),
txoutindex_to_txoutindex: lazy!("txoutindex", indexer.vecs.txoutindex_to_value),
txinindex_to_txinindex: lazy!("txinindex", indexer.vecs.txinindex_to_outpoint),
p2pk33addressindex_to_p2pk33addressindex: lazy!("p2pk33addressindex", indexer.vecs.p2pk33addressindex_to_p2pk33bytes),
p2pk65addressindex_to_p2pk65addressindex: lazy!("p2pk65addressindex", indexer.vecs.p2pk65addressindex_to_p2pk65bytes),
p2pkhaddressindex_to_p2pkhaddressindex: lazy!("p2pkhaddressindex", indexer.vecs.p2pkhaddressindex_to_p2pkhbytes),
p2shaddressindex_to_p2shaddressindex: lazy!("p2shaddressindex", indexer.vecs.p2shaddressindex_to_p2shbytes),
p2traddressindex_to_p2traddressindex: lazy!("p2traddressindex", indexer.vecs.p2traddressindex_to_p2trbytes),
p2wpkhaddressindex_to_p2wpkhaddressindex: lazy!("p2wpkhaddressindex", indexer.vecs.p2wpkhaddressindex_to_p2wpkhbytes),
p2wshaddressindex_to_p2wshaddressindex: lazy!("p2wshaddressindex", indexer.vecs.p2wshaddressindex_to_p2wshbytes),
p2aaddressindex_to_p2aaddressindex: lazy!("p2aaddressindex", indexer.vecs.p2aaddressindex_to_p2abytes),
p2msoutputindex_to_p2msoutputindex: lazy!("p2msoutputindex", indexer.vecs.p2msoutputindex_to_txindex),
emptyoutputindex_to_emptyoutputindex: lazy!("emptyoutputindex", indexer.vecs.emptyoutputindex_to_txindex),
unknownoutputindex_to_unknownoutputindex: lazy!("unknownoutputindex", indexer.vecs.unknownoutputindex_to_txindex),
opreturnindex_to_opreturnindex: lazy!("opreturnindex", indexer.vecs.opreturnindex_to_txindex),
p2pk33addressindex_to_p2pk33addressindex: lazy!(
"p2pk33addressindex",
indexer.vecs.p2pk33addressindex_to_p2pk33bytes
),
p2pk65addressindex_to_p2pk65addressindex: lazy!(
"p2pk65addressindex",
indexer.vecs.p2pk65addressindex_to_p2pk65bytes
),
p2pkhaddressindex_to_p2pkhaddressindex: lazy!(
"p2pkhaddressindex",
indexer.vecs.p2pkhaddressindex_to_p2pkhbytes
),
p2shaddressindex_to_p2shaddressindex: lazy!(
"p2shaddressindex",
indexer.vecs.p2shaddressindex_to_p2shbytes
),
p2traddressindex_to_p2traddressindex: lazy!(
"p2traddressindex",
indexer.vecs.p2traddressindex_to_p2trbytes
),
p2wpkhaddressindex_to_p2wpkhaddressindex: lazy!(
"p2wpkhaddressindex",
indexer.vecs.p2wpkhaddressindex_to_p2wpkhbytes
),
p2wshaddressindex_to_p2wshaddressindex: lazy!(
"p2wshaddressindex",
indexer.vecs.p2wshaddressindex_to_p2wshbytes
),
p2aaddressindex_to_p2aaddressindex: lazy!(
"p2aaddressindex",
indexer.vecs.p2aaddressindex_to_p2abytes
),
p2msoutputindex_to_p2msoutputindex: lazy!(
"p2msoutputindex",
indexer.vecs.p2msoutputindex_to_txindex
),
emptyoutputindex_to_emptyoutputindex: lazy!(
"emptyoutputindex",
indexer.vecs.emptyoutputindex_to_txindex
),
unknownoutputindex_to_unknownoutputindex: lazy!(
"unknownoutputindex",
indexer.vecs.unknownoutputindex_to_txindex
),
opreturnindex_to_opreturnindex: lazy!(
"opreturnindex",
indexer.vecs.opreturnindex_to_txindex
),
txindex_to_txindex: lazy!("txindex", indexer.vecs.txindex_to_txid),
txindex_to_input_count: eager!("input_count"),
txindex_to_output_count: eager!("output_count"),
@@ -653,7 +693,7 @@ impl Vecs {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Indexes {
indexes: brk_indexer::Indexes,
pub dateindex: DateIndex,
+40 -23
View File
@@ -56,6 +56,14 @@ impl Computer {
indexer: &Indexer,
fetcher: Option<Fetcher>,
) -> Result<Self> {
info!("Increasing number of open files...");
let no_file_limit = rlimit::getrlimit(rlimit::Resource::NOFILE)?;
rlimit::setrlimit(
rlimit::Resource::NOFILE,
no_file_limit.0.max(10_000),
no_file_limit.1,
)?;
info!("Importing computer...");
let import_start = Instant::now();
@@ -181,7 +189,7 @@ impl Computer {
info!("Computed prices in {:?}", i.elapsed());
}
std::thread::scope(|scope| -> Result<()> {
thread::scope(|scope| -> Result<()> {
let blks = scope.spawn(|| -> Result<()> {
info!("Computing BLKs metadata...");
let i = Instant::now();
@@ -227,29 +235,38 @@ impl Computer {
Ok(())
})?;
info!("Computing pools...");
let i = Instant::now();
self.pools.compute(
indexer,
&self.indexes,
&starting_indexes,
&self.chain,
self.price.as_ref(),
exit,
)?;
info!("Computed pools in {:?}", i.elapsed());
let starting_indexes_clone = starting_indexes.clone();
thread::scope(|scope| -> Result<()> {
let pools = scope.spawn(|| -> Result<()> {
info!("Computing pools...");
let i = Instant::now();
self.pools.compute(
indexer,
&self.indexes,
&starting_indexes_clone,
&self.chain,
self.price.as_ref(),
exit,
)?;
info!("Computed pools in {:?}", i.elapsed());
Ok(())
});
info!("Computing stateful...");
let i = Instant::now();
self.stateful.compute(
indexer,
&self.indexes,
&self.chain,
self.price.as_ref(),
&mut starting_indexes,
exit,
)?;
info!("Computed stateful in {:?}", i.elapsed());
info!("Computing stateful...");
let i = Instant::now();
self.stateful.compute(
indexer,
&self.indexes,
&self.chain,
self.price.as_ref(),
&mut starting_indexes,
exit,
)?;
info!("Computed stateful in {:?}", i.elapsed());
pools.join().unwrap()?;
Ok(())
})?;
info!("Computing cointime...");
let i = Instant::now();
@@ -57,7 +57,7 @@ impl Vecs {
[
self.0
.ge_amount
.iter_mut()
.par_iter_mut()
.map(|vecs| {
let filter = vecs.filter().clone();
(
@@ -71,7 +71,7 @@ impl Vecs {
.collect::<Vec<_>>(),
self.0
.lt_amount
.iter_mut()
.par_iter_mut()
.map(|vecs| {
let filter = vecs.filter().clone();
(
@@ -98,7 +98,7 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.iter_mut()
self.par_iter_mut()
.try_for_each(|v| v.compute_rest_part1(indexes, price, starting_indexes, exit))
}
@@ -116,7 +116,7 @@ impl Vecs {
dateindex_to_realized_cap: Option<&impl IterableVec<DateIndex, Dollars>>,
exit: &Exit,
) -> Result<()> {
self.0.iter_mut().try_for_each(|v| {
self.0.par_iter_mut().try_for_each(|v| {
v.compute_rest_part2(
indexes,
price,
@@ -511,8 +511,8 @@ impl Vecs {
let by_size_range = &self.0.amount_range;
[(&mut self.0.all, by_date_range.iter().collect::<Vec<_>>())]
.into_iter()
.chain(self.0.min_age.iter_mut().map(|vecs| {
.into_par_iter()
.chain(self.0.min_age.par_iter_mut().map(|vecs| {
let filter = vecs.filter().clone();
(
vecs,
@@ -522,7 +522,7 @@ impl Vecs {
.collect::<Vec<_>>(),
)
}))
.chain(self.0.max_age.iter_mut().map(|vecs| {
.chain(self.0.max_age.par_iter_mut().map(|vecs| {
let filter = vecs.filter().clone();
(
vecs,
@@ -532,7 +532,7 @@ impl Vecs {
.collect::<Vec<_>>(),
)
}))
.chain(self.0.term.iter_mut().map(|vecs| {
.chain(self.0.term.par_iter_mut().map(|vecs| {
let filter = vecs.filter().clone();
(
vecs,
@@ -542,7 +542,7 @@ impl Vecs {
.collect::<Vec<_>>(),
)
}))
.chain(self.0.ge_amount.iter_mut().map(|vecs| {
.chain(self.0.ge_amount.par_iter_mut().map(|vecs| {
let filter = vecs.filter().clone();
(
vecs,
@@ -552,7 +552,7 @@ impl Vecs {
.collect::<Vec<_>>(),
)
}))
.chain(self.0.lt_amount.iter_mut().map(|vecs| {
.chain(self.0.lt_amount.par_iter_mut().map(|vecs| {
let filter = vecs.filter().clone();
(
vecs,
@@ -574,7 +574,7 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.iter_mut()
self.par_iter_mut()
.try_for_each(|v| v.compute_rest_part1(indexes, price, starting_indexes, exit))
}
@@ -592,7 +592,7 @@ impl Vecs {
dateindex_to_realized_cap: Option<&impl IterableVec<DateIndex, Dollars>>,
exit: &Exit,
) -> Result<()> {
self.iter_mut().try_for_each(|v| {
self.par_iter_mut().try_for_each(|v| {
v.compute_rest_part2(
indexes,
price,
@@ -624,9 +624,9 @@ impl Vecs {
/// Reset aggregate cohorts' price_to_amount when starting from scratch
pub fn reset_aggregate_price_to_amount(&mut self) -> Result<()> {
self.0.iter_aggregate_mut().try_for_each(|v| {
v.price_to_amount.reset()
})
self.0
.iter_aggregate_mut()
.try_for_each(|v| v.price_to_amount.reset())
}
/// Import aggregate cohorts' price_to_amount from disk when resuming from a checkpoint.