global: move addressindex_to_outputindex stores from computer to indexer

This commit is contained in:
nym21
2025-07-04 17:30:43 +02:00
parent 6d35c26b3f
commit 7f07b0daa7
47 changed files with 515 additions and 1391 deletions

View File

@@ -1,8 +1,8 @@
use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread};
use brk_core::{
AddressData, CheckedSub, DateIndex, Dollars, EmptyAddressData, Height, InputIndex, OutputIndex,
OutputType, Result, Sats, StoredUsize, Version,
AddressData, CheckedSub, DateIndex, Dollars, EmptyAddressData, GroupedByAddressType, Height,
InputIndex, OutputIndex, OutputType, Result, Sats, Version,
};
use brk_exit::Exit;
use brk_indexer::Indexer;
@@ -14,7 +14,15 @@ use log::info;
use rayon::prelude::*;
use crate::{
BlockState, GroupedByAddressType, SupplyState, Transacted, stores::Stores, vecs::market,
BlockState, SupplyState, Transacted,
stores::Stores,
vecs::{
market,
stateful::{
addresstype_to_addresscount::AddressTypeToAddressCount,
addresstype_to_addresscount_vec::AddressTypeToAddressCountVec,
},
},
};
use super::{
@@ -25,6 +33,8 @@ use super::{
mod address_cohort;
mod address_cohorts;
mod addresstype_to_addresscount;
mod addresstype_to_addresscount_vec;
mod addresstype_to_typeindex_tree;
mod addresstype_to_typeindex_vec;
mod common;
@@ -50,9 +60,8 @@ pub struct Vecs {
pub indexes_to_opreturn_supply: ComputedValueVecsFromHeight,
// pub height_to_address_count: EagerVec<Height, StoredUsize>,
// pub height_to_empty_address_count: EagerVec<Height, StoredUsize>,
pub addresstype_to_height_to_address_count: GroupedByAddressType<EagerVec<Height, StoredUsize>>,
pub addresstype_to_height_to_empty_address_count:
GroupedByAddressType<EagerVec<Height, StoredUsize>>,
pub addresstype_to_height_to_address_count: AddressTypeToAddressCountVec,
pub addresstype_to_height_to_empty_address_count: AddressTypeToAddressCountVec,
pub utxo_vecs: utxo_cohorts::Vecs,
pub address_vecs: address_cohorts::Vecs,
}
@@ -122,106 +131,110 @@ impl Vecs {
// version + VERSION + Version::ZERO,
// format,
// )?,
addresstype_to_height_to_address_count: GroupedByAddressType {
p2pk65: EagerVec::forced_import(
path,
"p2pk65_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pk33: EagerVec::forced_import(
path,
"p2pk33_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pkh: EagerVec::forced_import(
path,
"p2pkh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2sh: EagerVec::forced_import(
path,
"p2sh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wpkh: EagerVec::forced_import(
path,
"p2wpkh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wsh: EagerVec::forced_import(
path,
"p2wsh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2tr: EagerVec::forced_import(
path,
"p2tr_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2a: EagerVec::forced_import(
path,
"p2a_address_count",
version + VERSION + Version::ZERO,
format,
)?,
},
addresstype_to_height_to_empty_address_count: GroupedByAddressType {
p2pk65: EagerVec::forced_import(
path,
"p2pk65_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pk33: EagerVec::forced_import(
path,
"p2pk33_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pkh: EagerVec::forced_import(
path,
"p2pkh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2sh: EagerVec::forced_import(
path,
"p2sh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wpkh: EagerVec::forced_import(
path,
"p2wpkh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wsh: EagerVec::forced_import(
path,
"p2wsh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2tr: EagerVec::forced_import(
path,
"p2tr_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2a: EagerVec::forced_import(
path,
"p2a_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
},
addresstype_to_height_to_address_count: AddressTypeToAddressCountVec::from(
GroupedByAddressType {
p2pk65: EagerVec::forced_import(
path,
"p2pk65_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pk33: EagerVec::forced_import(
path,
"p2pk33_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pkh: EagerVec::forced_import(
path,
"p2pkh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2sh: EagerVec::forced_import(
path,
"p2sh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wpkh: EagerVec::forced_import(
path,
"p2wpkh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wsh: EagerVec::forced_import(
path,
"p2wsh_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2tr: EagerVec::forced_import(
path,
"p2tr_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2a: EagerVec::forced_import(
path,
"p2a_address_count",
version + VERSION + Version::ZERO,
format,
)?,
},
),
addresstype_to_height_to_empty_address_count: AddressTypeToAddressCountVec::from(
GroupedByAddressType {
p2pk65: EagerVec::forced_import(
path,
"p2pk65_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pk33: EagerVec::forced_import(
path,
"p2pk33_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2pkh: EagerVec::forced_import(
path,
"p2pkh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2sh: EagerVec::forced_import(
path,
"p2sh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wpkh: EagerVec::forced_import(
path,
"p2wpkh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2wsh: EagerVec::forced_import(
path,
"p2wsh_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2tr: EagerVec::forced_import(
path,
"p2tr_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
p2a: EagerVec::forced_import(
path,
"p2a_empty_address_count",
version + VERSION + Version::ZERO,
format,
)?,
},
),
utxo_vecs: utxo_cohorts::Vecs::forced_import(
path,
version,
@@ -439,21 +452,17 @@ impl Vecs {
} else {
Sats::ZERO
};
let mut addresstype_to_address_count = GroupedByAddressType::<usize>::from((
let mut addresstype_to_address_count = AddressTypeToAddressCount::from((
&self.addresstype_to_height_to_address_count,
starting_height,
));
let mut addresstype_to_empty_address_count = GroupedByAddressType::<usize>::from((
let mut addresstype_to_empty_address_count = AddressTypeToAddressCount::from((
&self.addresstype_to_height_to_empty_address_count,
starting_height,
));
let mut height = starting_height;
let mut addresstype_to_typeindex_to_sent_outputindex =
AddressTypeToTypeIndexVec::<OutputIndex>::default();
let mut addresstype_to_typeindex_to_received_outputindex =
AddressTypeToTypeIndexVec::<OutputIndex>::default();
let mut addresstype_to_typeindex_to_addressdata =
AddressTypeToTypeIndexTree::<WithAddressDataSource<AddressData>>::default();
let mut addresstype_to_typeindex_to_emptyaddressdata =
@@ -490,8 +499,8 @@ impl Vecs {
let input_count = height_to_input_count_iter.unwrap_get_inner(height);
let (
(mut height_to_sent, addresstype_to_typedindex_to_sent_outputindex, addresstype_to_typedindex_to_sent_data),
(mut received, addresstype_to_typedindex_to_received_outputindex, addresstype_to_typedindex_to_received_data),
(mut height_to_sent, addresstype_to_typedindex_to_sent_data),
(mut received, addresstype_to_typedindex_to_received_data),
) = thread::scope(|s| {
if chain_state_starting_height <= height {
s.spawn(|| {
@@ -592,7 +601,6 @@ impl Vecs {
value,
input_type,
typeindex,
outputindex,
addressdata_opt,
prev_price,
blocks_old,
@@ -604,7 +612,6 @@ impl Vecs {
|| {
(
BTreeMap::<Height, Transacted>::default(),
AddressTypeToTypeIndexVec::<OutputIndex>::default(),
AddressTypeToTypeIndexVec::<(
Sats,
Option<WithAddressDataSource<AddressData>>,
@@ -616,13 +623,12 @@ impl Vecs {
),
)
},
|(mut tree, mut vecs, mut vecs2),
|(mut tree, mut vecs),
(
height,
value,
input_type,
typeindex,
outputindex,
addressdata_opt,
prev_price,
blocks_old,
@@ -631,9 +637,6 @@ impl Vecs {
)| {
tree.entry(height).or_default().iterate(value, input_type);
if let Some(vec) = vecs.get_mut(input_type) {
vec.push((typeindex, outputindex));
}
if let Some(vec) = vecs2.get_mut(input_type) {
vec.push((
typeindex,
(
@@ -646,14 +649,13 @@ impl Vecs {
),
));
}
(tree, vecs, vecs2)
(tree, vecs)
},
)
.reduce(
|| {
(
BTreeMap::<Height, Transacted>::default(),
AddressTypeToTypeIndexVec::<OutputIndex>::default(),
AddressTypeToTypeIndexVec::<(
Sats,
Option<WithAddressDataSource<AddressData>>,
@@ -665,7 +667,7 @@ impl Vecs {
),
)
},
|(first_tree, mut source_vecs, mut source_vecs2), (second_tree, other_vecs, other_vecs2)| {
|(first_tree, mut source_vecs), (second_tree, other_vecs)| {
let (mut tree_source, tree_to_consume) =
if first_tree.len() > second_tree.len() {
(first_tree, second_tree)
@@ -676,8 +678,7 @@ impl Vecs {
*tree_source.entry(k).or_default() += v;
});
source_vecs.merge(other_vecs);
source_vecs2.merge(other_vecs2);
(tree_source, source_vecs, source_vecs2)
(tree_source, source_vecs)
},
)
});
@@ -738,13 +739,12 @@ impl Vecs {
None
};
(value, output_type, typeindex, outputindex, addressdata_opt)
(value, output_type, typeindex, addressdata_opt)
})
.fold(
|| {
(
Transacted::default(),
AddressTypeToTypeIndexVec::<OutputIndex>::default(),
AddressTypeToTypeIndexVec::<(
Sats,
Option<WithAddressDataSource<AddressData>>,
@@ -752,35 +752,27 @@ impl Vecs {
),
)
},
|(mut transacted, mut vecs, mut vecs2),
|(mut transacted, mut vecs),
(
value,
output_type,
typeindex,
outputindex,
addressdata_opt,
)| {
transacted.iterate(value, output_type);
if let Some(vec) = vecs.get_mut(output_type) {
vec.push((
typeindex,
outputindex,
));
}
if let Some(vec) = vecs2.get_mut(output_type) {
vec.push((
typeindex,
(value, addressdata_opt),
));
}
(transacted, vecs, vecs2)
(transacted, vecs)
},
)
.reduce(
|| {
(
Transacted::default(),
AddressTypeToTypeIndexVec::<OutputIndex>::default(),
AddressTypeToTypeIndexVec::<(
Sats,
Option<WithAddressDataSource<AddressData>>,
@@ -788,10 +780,9 @@ impl Vecs {
),
)
},
|(transacted, mut vecs, mut vecs2), (other_transacted, other_vecs, other_vecs2)| {
|(transacted, mut vecs), (other_transacted, other_vecs)| {
vecs.merge(other_vecs);
vecs2.merge(other_vecs2);
(transacted + other_transacted, vecs, vecs2)
(transacted + other_transacted, vecs)
},
)
});
@@ -829,12 +820,6 @@ impl Vecs {
};
thread::scope(|scope| -> Result<()> {
scope.spawn(|| addresstype_to_typeindex_to_sent_outputindex
.merge(addresstype_to_typedindex_to_sent_outputindex));
scope.spawn(|| addresstype_to_typeindex_to_received_outputindex
.merge(addresstype_to_typedindex_to_received_outputindex));
scope.spawn(|| {
// Push current block state before processing sends and receives
chain_state.push(BlockState {
@@ -956,8 +941,6 @@ impl Vecs {
self.flush_states(height, &chain_state, exit)?;
stores.commit(
height,
mem::take(&mut addresstype_to_typeindex_to_sent_outputindex),
mem::take(&mut addresstype_to_typeindex_to_received_outputindex),
mem::take(&mut addresstype_to_typeindex_to_addressdata),
mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata),
)?;
@@ -974,8 +957,6 @@ impl Vecs {
self.flush_states(height, &chain_state, exit)?;
stores.commit(
height,
mem::take(&mut addresstype_to_typeindex_to_sent_outputindex),
mem::take(&mut addresstype_to_typeindex_to_received_outputindex),
mem::take(&mut addresstype_to_typeindex_to_addressdata),
mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata),
)?;