mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-25 23:29:58 -07:00
computer: store part 3
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
use std::{cmp::Ordering, collections::BTreeMap, path::Path, thread};
|
||||
use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread};
|
||||
|
||||
use brk_core::{
|
||||
DateIndex, GroupedByAddressType, Height, InputIndex, OutputIndex, OutputType, Result, Sats,
|
||||
StoredUsize, Version,
|
||||
AddressIndexToTypeIndedToOutputIndex, DateIndex, GroupedByAddressType, Height, InputIndex,
|
||||
OutputIndex, OutputType, Result, Sats, StoredUsize, Version,
|
||||
};
|
||||
use brk_exit::Exit;
|
||||
use brk_indexer::Indexer;
|
||||
@@ -200,6 +200,7 @@ impl Vecs {
|
||||
let height_to_timestamp_fixed = &indexes.height_to_timestamp_fixed;
|
||||
let outputindex_to_txindex = &indexes.outputindex_to_txindex;
|
||||
let outputindex_to_outputtype = &indexer.vecs.outputindex_to_outputtype;
|
||||
let outputindex_to_typeindex = &indexer.vecs.outputindex_to_typeindex;
|
||||
let height_to_unclaimed_rewards = transactions
|
||||
.indexes_to_unclaimed_rewards
|
||||
.sats
|
||||
@@ -219,6 +220,7 @@ impl Vecs {
|
||||
let inputindex_to_outputindex_mmap = inputindex_to_outputindex.mmap().load();
|
||||
let outputindex_to_value_mmap = outputindex_to_value.mmap().load();
|
||||
let outputindex_to_outputtype_mmap = outputindex_to_outputtype.mmap().load();
|
||||
let outputindex_to_typeindex_mmap = outputindex_to_typeindex.mmap().load();
|
||||
let outputindex_to_txindex_mmap = outputindex_to_txindex.mmap().load();
|
||||
let txindex_to_height_mmap = txindex_to_height.mmap().load();
|
||||
|
||||
@@ -226,9 +228,7 @@ impl Vecs {
|
||||
let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter();
|
||||
let mut height_to_output_count_iter = height_to_output_count.into_iter();
|
||||
let mut height_to_input_count_iter = height_to_input_count.into_iter();
|
||||
// let mut outputindex_to_value_iter_2 = outputindex_to_value.into_iter();
|
||||
let mut height_to_close_iter = height_to_close.as_ref().map(|v| v.into_iter());
|
||||
// let mut outputindex_to_outputtype_iter_2 = outputindex_to_outputtype.into_iter();
|
||||
let mut height_to_unclaimed_rewards_iter = height_to_unclaimed_rewards.into_iter();
|
||||
let mut height_to_timestamp_fixed_iter = height_to_timestamp_fixed.into_iter();
|
||||
let mut dateindex_to_close_iter = dateindex_to_close.as_ref().map(|v| v.into_iter());
|
||||
@@ -249,6 +249,7 @@ impl Vecs {
|
||||
+ txindex_to_height.version()
|
||||
+ outputindex_to_txindex.version()
|
||||
+ outputindex_to_outputtype.version()
|
||||
+ outputindex_to_typeindex.version()
|
||||
+ height_to_unclaimed_rewards.version()
|
||||
+ height_to_close
|
||||
.as_ref()
|
||||
@@ -258,7 +259,8 @@ impl Vecs {
|
||||
.map_or(Version::ZERO, |v| v.version())
|
||||
+ height_to_date_fixed.version()
|
||||
+ dateindex_to_first_height.version()
|
||||
+ dateindex_to_height_count.version();
|
||||
+ dateindex_to_height_count.version()
|
||||
+ stores.as_slice().into_iter().map(|s| s.version()).sum();
|
||||
|
||||
separate_utxo_vecs
|
||||
.par_iter_mut()
|
||||
@@ -281,6 +283,9 @@ impl Vecs {
|
||||
.min()
|
||||
.unwrap_or_default()
|
||||
.min(chain_state_starting_height)
|
||||
.min(stores.starting_height())
|
||||
.min(Height::from(self.height_to_unspendable_supply.len()))
|
||||
.min(Height::from(self.height_to_opreturn_supply.len()))
|
||||
.cmp(&chain_state_starting_height)
|
||||
{
|
||||
Ordering::Greater => unreachable!(),
|
||||
@@ -308,11 +313,7 @@ impl Vecs {
|
||||
Ordering::Less => Height::ZERO,
|
||||
};
|
||||
|
||||
let starting_height = starting_indexes
|
||||
.height
|
||||
.min(stateful_starting_height)
|
||||
.min(Height::from(self.height_to_unspendable_supply.len()))
|
||||
.min(Height::from(self.height_to_opreturn_supply.len()));
|
||||
let starting_height = starting_indexes.height.min(stateful_starting_height);
|
||||
|
||||
if starting_height.is_zero() {
|
||||
info!("Starting processing utxos from the start");
|
||||
@@ -321,6 +322,8 @@ impl Vecs {
|
||||
chain_state = vec![];
|
||||
chain_state_starting_height = Height::ZERO;
|
||||
|
||||
stores.reset()?;
|
||||
|
||||
separate_utxo_vecs
|
||||
.par_iter_mut()
|
||||
.try_for_each(|(_, v)| v.state.price_to_amount.reset())?;
|
||||
@@ -350,6 +353,11 @@ impl Vecs {
|
||||
|
||||
let mut height = starting_height;
|
||||
|
||||
let mut addressindex_to_typedindex_to_sent_outputindex =
|
||||
AddressIndexToTypeIndedToOutputIndex::default();
|
||||
let mut addressindex_to_typedindex_to_received_outputindex =
|
||||
AddressIndexToTypeIndedToOutputIndex::default();
|
||||
|
||||
(height.unwrap_to_usize()..height_to_date_fixed.len())
|
||||
.map(Height::from)
|
||||
.try_for_each(|_height| -> color_eyre::Result<()> {
|
||||
@@ -375,7 +383,7 @@ impl Vecs {
|
||||
let output_count = height_to_output_count_iter.unwrap_get_inner(height);
|
||||
let input_count = height_to_input_count_iter.unwrap_get_inner(height);
|
||||
|
||||
let (mut height_to_sent, mut received) = thread::scope(|s| {
|
||||
let ((mut height_to_sent, new_addressindex_to_typedindex_to_sent_outputindex), (mut received, new_addressindex_to_typedindex_to_received_outputindex)) = thread::scope(|s| {
|
||||
if chain_state_starting_height <= height {
|
||||
s.spawn(|| {
|
||||
self.utxos_vecs
|
||||
@@ -407,12 +415,18 @@ impl Vecs {
|
||||
.unwrap()
|
||||
.into_owned();
|
||||
|
||||
// dbg!(input_type);
|
||||
let typeindex = outputindex_to_typeindex
|
||||
.get_or_read(outputindex, &outputindex_to_typeindex_mmap)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.into_owned();
|
||||
|
||||
if input_type.is_unspendable() {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
// stores.
|
||||
|
||||
let input_txindex = outputindex_to_txindex
|
||||
.get_or_read(outputindex, &outputindex_to_txindex_mmap)
|
||||
.unwrap()
|
||||
@@ -425,25 +439,39 @@ impl Vecs {
|
||||
.unwrap()
|
||||
.into_owned();
|
||||
|
||||
(height, value, input_type)
|
||||
(height, value, input_type, typeindex, outputindex)
|
||||
})
|
||||
.fold(
|
||||
BTreeMap::<Height, Transacted>::default,
|
||||
|mut tree, (height, value, input_type)| {
|
||||
|| {
|
||||
(
|
||||
BTreeMap::<Height, Transacted>::default(),
|
||||
AddressIndexToTypeIndedToOutputIndex::default(),
|
||||
)
|
||||
},
|
||||
|(mut tree, mut vecs), (height, value, input_type, typeindex, outputindex)| {
|
||||
tree.entry(height).or_default().iterate(value, input_type);
|
||||
tree
|
||||
if let Some( vec) = vecs.get_mut(input_type) {
|
||||
vec.push((typeindex, outputindex));
|
||||
}
|
||||
(tree, vecs)
|
||||
},
|
||||
)
|
||||
.reduce(BTreeMap::<Height, Transacted>::default, |first, second| {
|
||||
let (mut source, to_consume) = if first.len() > second.len() {
|
||||
(first, second)
|
||||
.reduce( || {
|
||||
(
|
||||
BTreeMap::<Height, Transacted>::default(),
|
||||
AddressIndexToTypeIndedToOutputIndex::default(),
|
||||
)
|
||||
}, |(first_tree, mut source_vecs), (second_tree, other_vecs)| {
|
||||
let (mut tree_source, tree_to_consume) = if first_tree.len() > second_tree.len() {
|
||||
(first_tree, second_tree)
|
||||
} else {
|
||||
(second, first)
|
||||
(second_tree, first_tree)
|
||||
};
|
||||
to_consume.into_iter().for_each(|(k, v)| {
|
||||
*source.entry(k).or_default() += v;
|
||||
tree_to_consume.into_iter().for_each(|(k, v)| {
|
||||
*tree_source.entry(k).or_default() += v;
|
||||
});
|
||||
source
|
||||
source_vecs.merge(other_vecs);
|
||||
(tree_source, source_vecs)
|
||||
})
|
||||
});
|
||||
|
||||
@@ -464,21 +492,36 @@ impl Vecs {
|
||||
.unwrap()
|
||||
.into_owned();
|
||||
|
||||
(value, output_type)
|
||||
let typeindex = outputindex_to_typeindex
|
||||
.get_or_read(outputindex, &outputindex_to_typeindex_mmap)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.into_owned();
|
||||
|
||||
(value, output_type, typeindex, outputindex)
|
||||
})
|
||||
.fold(
|
||||
Transacted::default,
|
||||
|mut transacted, (value, output_type)| {
|
||||
|| (Transacted::default(), AddressIndexToTypeIndedToOutputIndex::default()),
|
||||
|(mut transacted, mut vecs), (value, output_type, typeindex, outputindex)| {
|
||||
transacted.iterate(value, output_type);
|
||||
transacted
|
||||
if let Some(vec) = vecs.get_mut(output_type) {
|
||||
vec.push((typeindex, outputindex));
|
||||
}
|
||||
(transacted, vecs)
|
||||
},
|
||||
)
|
||||
.reduce(Transacted::default, |acc, transacted| acc + transacted)
|
||||
.reduce(|| (Transacted::default(), AddressIndexToTypeIndedToOutputIndex::default()), |(transacted, mut vecs), (other_transacted, other_vecs)| {
|
||||
vecs.merge(other_vecs);
|
||||
(transacted + other_transacted, vecs)
|
||||
})
|
||||
});
|
||||
|
||||
(sent_handle.join().unwrap(), received_handle.join().unwrap())
|
||||
});
|
||||
|
||||
addressindex_to_typedindex_to_sent_outputindex.merge(new_addressindex_to_typedindex_to_sent_outputindex);
|
||||
addressindex_to_typedindex_to_received_outputindex.merge(new_addressindex_to_typedindex_to_received_outputindex);
|
||||
|
||||
unspendable_supply += received
|
||||
.by_type
|
||||
.unspendable
|
||||
@@ -564,10 +607,19 @@ impl Vecs {
|
||||
)
|
||||
})?;
|
||||
|
||||
if height != Height::ZERO && height.unwrap_to_usize() % 20_000 == 0 {
|
||||
if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
|
||||
info!("Flushing...");
|
||||
exit.block();
|
||||
self.flush_states(height, &chain_state, exit)?;
|
||||
self.flush_states(
|
||||
height,
|
||||
&chain_state,
|
||||
exit,
|
||||
)?;
|
||||
stores.commit(
|
||||
height,
|
||||
mem::take(&mut addressindex_to_typedindex_to_sent_outputindex),
|
||||
mem::take( &mut addressindex_to_typedindex_to_received_outputindex)
|
||||
)?;
|
||||
exit.release();
|
||||
}
|
||||
|
||||
@@ -579,6 +631,13 @@ impl Vecs {
|
||||
info!("Flushing...");
|
||||
|
||||
self.flush_states(height, &chain_state, exit)?;
|
||||
stores.commit(
|
||||
height,
|
||||
mem::take(&mut addressindex_to_typedindex_to_sent_outputindex),
|
||||
mem::take(&mut addressindex_to_typedindex_to_received_outputindex),
|
||||
)?;
|
||||
} else {
|
||||
exit.block();
|
||||
}
|
||||
|
||||
info!("Computing overlapping...");
|
||||
@@ -649,6 +708,8 @@ impl Vecs {
|
||||
Some(&self.height_to_opreturn_supply),
|
||||
)?;
|
||||
|
||||
stores.rotate_memtables();
|
||||
|
||||
exit.release();
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user