computer: store part 10

This commit is contained in:
nym21
2025-07-05 17:44:51 +02:00
parent 5fe984c39d
commit d31d47eb32
39 changed files with 896 additions and 486 deletions

View File

@@ -2,7 +2,7 @@ use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread};
use brk_core::{
AddressData, CheckedSub, DateIndex, Dollars, EmptyAddressData, GroupedByAddressType, Height,
InputIndex, OutputIndex, OutputType, Result, Sats, Version,
InputIndex, OutputIndex, OutputType, Result, Sats, TypeIndex, Version,
};
use brk_exit::Exit;
use brk_indexer::Indexer;
@@ -20,7 +20,7 @@ use crate::{
market,
stateful::{
addresstype_to_addresscount::AddressTypeToAddressCount,
addresstype_to_addresscount_vec::AddressTypeToAddressCountVec,
addresstype_to_addresscount_vec::AddressTypeToAddressCountVec, r#trait::DynCohortVecs,
},
},
};
@@ -269,6 +269,14 @@ impl Vecs {
) -> color_eyre::Result<()> {
let height_to_first_outputindex = &indexer.vecs.height_to_first_outputindex;
let height_to_first_inputindex = &indexer.vecs.height_to_first_inputindex;
let height_to_first_p2aaddressindex = &indexer.vecs.height_to_first_p2aaddressindex;
let height_to_first_p2pk33addressindex = &indexer.vecs.height_to_first_p2pk33addressindex;
let height_to_first_p2pk65addressindex = &indexer.vecs.height_to_first_p2pk65addressindex;
let height_to_first_p2pkhaddressindex = &indexer.vecs.height_to_first_p2pkhaddressindex;
let height_to_first_p2shaddressindex = &indexer.vecs.height_to_first_p2shaddressindex;
let height_to_first_p2traddressindex = &indexer.vecs.height_to_first_p2traddressindex;
let height_to_first_p2wpkhaddressindex = &indexer.vecs.height_to_first_p2wpkhaddressindex;
let height_to_first_p2wshaddressindex = &indexer.vecs.height_to_first_p2wshaddressindex;
let height_to_output_count = transactions.indexes_to_output_count.height.unwrap_sum();
let height_to_input_count = transactions.indexes_to_input_count.height.unwrap_sum();
let inputindex_to_outputindex = &indexer.vecs.inputindex_to_outputindex;
@@ -300,11 +308,24 @@ impl Vecs {
let outputindex_to_typeindex_mmap = outputindex_to_typeindex.mmap().load();
let outputindex_to_txindex_mmap = outputindex_to_txindex.mmap().load();
let txindex_to_height_mmap = txindex_to_height.mmap().load();
let height_to_close_mmap = height_to_close.map(|v| v.mmap().load());
let height_to_timestamp_fixed_mmap = height_to_timestamp_fixed.mmap().load();
let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter();
let mut height_to_first_inputindex_iter = height_to_first_inputindex.into_iter();
let mut height_to_first_p2aaddressindex_iter = height_to_first_p2aaddressindex.into_iter();
let mut height_to_first_p2pk33addressindex_iter =
height_to_first_p2pk33addressindex.into_iter();
let mut height_to_first_p2pk65addressindex_iter =
height_to_first_p2pk65addressindex.into_iter();
let mut height_to_first_p2pkhaddressindex_iter =
height_to_first_p2pkhaddressindex.into_iter();
let mut height_to_first_p2shaddressindex_iter =
height_to_first_p2shaddressindex.into_iter();
let mut height_to_first_p2traddressindex_iter =
height_to_first_p2traddressindex.into_iter();
let mut height_to_first_p2wpkhaddressindex_iter =
height_to_first_p2wpkhaddressindex.into_iter();
let mut height_to_first_p2wshaddressindex_iter =
height_to_first_p2wshaddressindex.into_iter();
let mut height_to_output_count_iter = height_to_output_count.into_iter();
let mut height_to_input_count_iter = height_to_input_count.into_iter();
let mut height_to_close_iter = height_to_close.as_ref().map(|v| v.into_iter());
@@ -318,6 +339,14 @@ impl Vecs {
let base_version = Version::ZERO
+ height_to_first_outputindex.version()
+ height_to_first_inputindex.version()
+ height_to_first_p2aaddressindex.version()
+ height_to_first_p2pk33addressindex.version()
+ height_to_first_p2pk65addressindex.version()
+ height_to_first_p2pkhaddressindex.version()
+ height_to_first_p2shaddressindex.version()
+ height_to_first_p2traddressindex.version()
+ height_to_first_p2wpkhaddressindex.version()
+ height_to_first_p2wshaddressindex.version()
+ height_to_timestamp_fixed.version()
+ height_to_output_count.version()
+ height_to_input_count.version()
@@ -438,6 +467,11 @@ impl Vecs {
.par_iter_mut()
.for_each(|(_, v)| v.init(starting_height));
let height_to_close_vec =
height_to_close.map(|height_to_close| height_to_close.collect().unwrap());
let height_to_timestamp_fixed_vec = height_to_timestamp_fixed.collect().unwrap();
let mut unspendable_supply = if let Some(prev_height) = starting_height.decremented() {
self.height_to_unspendable_supply
.into_iter()
@@ -498,6 +532,34 @@ impl Vecs {
let output_count = height_to_output_count_iter.unwrap_get_inner(height);
let input_count = height_to_input_count_iter.unwrap_get_inner(height);
let first_addressindexes: GroupedByAddressType<TypeIndex> =
GroupedByAddressType {
p2a: height_to_first_p2aaddressindex_iter
.unwrap_get_inner(height)
.into(),
p2pk33: height_to_first_p2pk33addressindex_iter
.unwrap_get_inner(height)
.into(),
p2pk65: height_to_first_p2pk65addressindex_iter
.unwrap_get_inner(height)
.into(),
p2pkh: height_to_first_p2pkhaddressindex_iter
.unwrap_get_inner(height)
.into(),
p2sh: height_to_first_p2shaddressindex_iter
.unwrap_get_inner(height)
.into(),
p2tr: height_to_first_p2traddressindex_iter
.unwrap_get_inner(height)
.into(),
p2wpkh: height_to_first_p2wpkhaddressindex_iter
.unwrap_get_inner(height)
.into(),
p2wsh: height_to_first_p2wshaddressindex_iter
.unwrap_get_inner(height)
.into(),
};
let (
(mut height_to_sent, addresstype_to_typedindex_to_sent_data),
(mut received, addresstype_to_typedindex_to_received_data),
@@ -532,31 +594,6 @@ impl Vecs {
.unwrap()
.into_owned();
let typeindex = outputindex_to_typeindex
.get_or_read(outputindex, &outputindex_to_typeindex_mmap)
.unwrap()
.unwrap()
.into_owned();
if input_type.is_unspendable() {
unreachable!()
}
let addressdata_opt = if input_type.is_address()
&& !addresstype_to_typeindex_to_addressdata
.get(input_type)
.unwrap()
.contains_key(&typeindex)
&& let Some(address_data) =
stores.get_addressdata(input_type, typeindex).unwrap()
{
Some(WithAddressDataSource::FromAddressDataStore(
address_data,
))
} else {
None
};
let input_txindex = outputindex_to_txindex
.get_or_read(outputindex, &outputindex_to_txindex_mmap)
.unwrap()
@@ -569,22 +606,49 @@ impl Vecs {
.unwrap()
.into_owned();
let prev_price = height_to_close.map(|m| {
*m.get_or_read(
if input_type.is_unspendable() {
unreachable!()
} else if input_type.is_not_address() {
return (
prev_height,
height_to_close_mmap.as_ref().unwrap(),
value,
input_type,
None
)
.unwrap()
.unwrap()
.into_owned()
});
}
let prev_timestamp = height_to_timestamp_fixed
.get_or_read(prev_height, &height_to_timestamp_fixed_mmap)
let typeindex = outputindex_to_typeindex
.get_or_read(outputindex, &outputindex_to_typeindex_mmap)
.unwrap()
.unwrap()
.into_owned();
let addressdata_opt = if input_type.is_address()
&& *first_addressindexes.get(input_type).unwrap()
> typeindex
&& !addresstype_to_typeindex_to_addressdata
.get(input_type)
.unwrap()
.contains_key(&typeindex)
&& let Some(address_data) =
stores.get_addressdata(input_type, typeindex).unwrap()
// Otherwise it was empty and got funds in the same block before sending them
{
Some(WithAddressDataSource::FromAddressDataStore(
address_data,
))
} else {
None
};
let prev_price = height_to_close_vec
.as_ref()
.map(|v| **v.get(prev_height.unwrap_to_usize()).unwrap());
let prev_timestamp = *height_to_timestamp_fixed_vec
.get(prev_height.unwrap_to_usize())
.unwrap();
let blocks_old =
height.unwrap_to_usize() - prev_height.unwrap_to_usize();
@@ -600,25 +664,27 @@ impl Vecs {
prev_height,
value,
input_type,
typeindex,
addressdata_opt,
prev_price,
blocks_old,
days_old,
older_than_hour,
Some((typeindex,
addressdata_opt,
prev_price,
blocks_old,
days_old,
older_than_hour
))
)
})
.fold(
|| {
(
BTreeMap::<Height, Transacted>::default(),
AddressTypeToTypeIndexVec::<(
AddressTypeToVec::<(
TypeIndex,
Sats,
Option<WithAddressDataSource<AddressData>>,
Option<Dollars>,
usize,
f64,
bool,
bool
)>::default(
),
)
@@ -628,25 +694,23 @@ impl Vecs {
height,
value,
input_type,
typeindex,
address_data_opt
)| {
tree.entry(height).or_default().iterate(value, input_type);
if let Some((typeindex,
addressdata_opt,
prev_price,
blocks_old,
days_old,
older_than_hour,
)| {
tree.entry(height).or_default().iterate(value, input_type);
if let Some(vec) = vecs.get_mut(input_type) {
vec.push((
older_than_hour)) = address_data_opt {
vecs.get_mut(input_type).unwrap().push((
typeindex,
(
value,
addressdata_opt,
prev_price,
blocks_old,
days_old,
older_than_hour,
),
value,
addressdata_opt,
prev_price,
blocks_old,
days_old,
older_than_hour,
));
}
(tree, vecs)
@@ -656,15 +720,15 @@ impl Vecs {
|| {
(
BTreeMap::<Height, Transacted>::default(),
AddressTypeToTypeIndexVec::<(
AddressTypeToVec::<(
TypeIndex,
Sats,
Option<WithAddressDataSource<AddressData>>,
Option<Dollars>,
usize,
f64,
bool,
)>::default(
),
)>::default(),
)
},
|(first_tree, mut source_vecs), (second_tree, other_vecs)| {
@@ -683,111 +747,119 @@ impl Vecs {
)
});
let received_handle = s.spawn(|| {
(first_outputindex..first_outputindex + *output_count)
.into_par_iter()
.map(OutputIndex::from)
.map(|outputindex| {
let value = outputindex_to_value
.get_or_read(outputindex, &outputindex_to_value_mmap)
.unwrap()
.unwrap()
.into_owned();
// let received_handle = s.spawn(|| {
let received_output = (first_outputindex..first_outputindex + *output_count)
.into_par_iter()
.map(OutputIndex::from)
.map(|outputindex| {
let value = outputindex_to_value
.get_or_read(outputindex, &outputindex_to_value_mmap)
.unwrap()
.unwrap()
.into_owned();
let output_type = outputindex_to_outputtype
.get_or_read(outputindex, &outputindex_to_outputtype_mmap)
.unwrap()
.unwrap()
.into_owned();
let output_type = outputindex_to_outputtype
.get_or_read(outputindex, &outputindex_to_outputtype_mmap)
.unwrap()
.unwrap()
.into_owned();
let typeindex = outputindex_to_typeindex
.get_or_read(outputindex, &outputindex_to_typeindex_mmap)
.unwrap()
.unwrap()
.into_owned();
if output_type.is_not_address() {
return (value, output_type, None);
}
let addressdata_opt = if output_type.is_address()
&& !addresstype_to_typeindex_to_addressdata
.get(output_type)
let typeindex = outputindex_to_typeindex
.get_or_read(outputindex, &outputindex_to_typeindex_mmap)
.unwrap()
.unwrap()
.into_owned();
let addressdata_opt = if *first_addressindexes.get(output_type).unwrap()
<= typeindex {
Some(WithAddressDataSource::New(AddressData::default()))
} else if !addresstype_to_typeindex_to_addressdata
.get(output_type)
.unwrap()
.contains_key(&typeindex)
&& !addresstype_to_typeindex_to_emptyaddressdata
.get(output_type)
.unwrap()
.contains_key(&typeindex) {
Some(
if let Some(addressdata) = stores
.get_addressdata(output_type, typeindex)
.unwrap()
.contains_key(&typeindex)
&& !addresstype_to_typeindex_to_emptyaddressdata
.get(output_type)
{
WithAddressDataSource::FromAddressDataStore(
addressdata,
)
} else if let Some(emptyaddressdata) = stores
.get_emptyaddressdata(output_type, typeindex)
.unwrap()
.contains_key(&typeindex)
{
Some(
if let Some(addressdata) = stores
.get_addressdata(output_type, typeindex)
.unwrap()
{
WithAddressDataSource::FromAddressDataStore(
addressdata,
)
} else if let Some(emptyaddressdata) = stores
.get_emptyaddressdata(output_type, typeindex)
.unwrap()
{
WithAddressDataSource::FromEmptyAddressDataStore(
emptyaddressdata.into(),
)
} else {
WithAddressDataSource::New(AddressData::default())
},
)
} else {
None
};
{
WithAddressDataSource::FromEmptyAddressDataStore(
emptyaddressdata.into(),
)
} else {
WithAddressDataSource::New(AddressData::default())
},
)
} else {
None
};
(value, output_type, typeindex, addressdata_opt)
})
.fold(
|| {
(
Transacted::default(),
AddressTypeToTypeIndexVec::<(
Sats,
Option<WithAddressDataSource<AddressData>>,
)>::default(
),
)
},
|(mut transacted, mut vecs),
(
value,
output_type,
typeindex,
addressdata_opt,
)| {
transacted.iterate(value, output_type);
if let Some(vec) = vecs.get_mut(output_type) {
vec.push((
typeindex,
(value, addressdata_opt),
));
}
(transacted, vecs)
},
)
.reduce(
|| {
(
Transacted::default(),
AddressTypeToTypeIndexVec::<(
Sats,
Option<WithAddressDataSource<AddressData>>,
)>::default(
),
)
},
|(transacted, mut vecs), (other_transacted, other_vecs)| {
vecs.merge(other_vecs);
(transacted + other_transacted, vecs)
},
)
});
(value, output_type, Some((typeindex, addressdata_opt)))
})
.fold(
|| {
(
Transacted::default(),
AddressTypeToVec::<(
TypeIndex,
Sats,
Option<WithAddressDataSource<AddressData>>,
)>::default(
),
)
},
|(mut transacted, mut vecs),
(
value,
output_type,
typeindex_with_addressdata_opt,
)| {
transacted.iterate(value, output_type);
if let Some(vec) = vecs.get_mut(output_type) {
let (typeindex,
addressdata_opt) = typeindex_with_addressdata_opt.unwrap();
vec.push((
typeindex,
value,
addressdata_opt,
));
}
(transacted, vecs)
},
)
.reduce(
|| {
(
Transacted::default(),
AddressTypeToVec::<(
TypeIndex,
Sats,
Option<WithAddressDataSource<AddressData>>,
)>::default(),
)
},
|(transacted, mut vecs), (other_transacted, other_vecs)| {
vecs.merge(other_vecs);
(transacted + other_transacted, vecs)
},
);
// });
(sent_handle.join().unwrap(), received_handle.join().unwrap())
(sent_handle.join().unwrap(), received_output)
});
if chain_state_starting_height > height {
@@ -904,36 +976,22 @@ impl Vecs {
.as_mut()
.map(|v| is_date_last_height.then(|| *v.unwrap_get_inner(dateindex)));
thread::scope(|scope| {
scope.spawn(|| {
separate_utxo_vecs
.par_iter_mut()
.try_for_each(|(_, v)| {
v.compute_then_force_push_unrealized_states(
height,
price,
is_date_last_height.then_some(dateindex),
date_price,
exit,
)
})
.unwrap();
});
scope.spawn(|| {
separate_utxo_vecs
.into_par_iter()
.map(|(_, v)| v as &mut dyn DynCohortVecs).chain(
separate_address_vecs
.par_iter_mut()
.try_for_each(|(_, v)| {
v.compute_then_force_push_unrealized_states(
height,
price,
is_date_last_height.then_some(dateindex),
date_price,
exit,
)
})
.unwrap();
});
});
.into_par_iter()
.map(|(_, v)| v as &mut dyn DynCohortVecs)
)
.try_for_each(|v| {
v.compute_then_force_push_unrealized_states(
height,
price,
is_date_last_height.then_some(dateindex),
date_price,
exit,
)
})?;
if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
info!("Flushing...");
@@ -1159,7 +1217,7 @@ impl Vecs {
}
}
impl AddressTypeToTypeIndexVec<(Sats, Option<WithAddressDataSource<AddressData>>)> {
impl AddressTypeToVec<(TypeIndex, Sats, Option<WithAddressDataSource<AddressData>>)> {
fn process_received(
mut self,
vecs: &mut address_cohorts::Vecs,
@@ -1175,7 +1233,7 @@ impl AddressTypeToTypeIndexVec<(Sats, Option<WithAddressDataSource<AddressData>>
) {
self.into_typed_vec().into_iter().for_each(|(_type, vec)| {
vec.into_iter()
.for_each(|(type_index, (value, addressdata_opt))| {
.for_each(|(type_index, value, addressdata_opt)| {
let mut is_new = false;
let mut from_any_empty = false;
@@ -1242,7 +1300,8 @@ impl AddressTypeToTypeIndexVec<(Sats, Option<WithAddressDataSource<AddressData>>
}
impl
AddressTypeToTypeIndexVec<(
AddressTypeToVec<(
TypeIndex,
Sats,
Option<WithAddressDataSource<AddressData>>,
Option<Dollars>,
@@ -1270,7 +1329,12 @@ impl
vec.into_iter().try_for_each(
|(
type_index,
(value, addressdata_opt, prev_price, blocks_old, days_old, older_than_hour),
value,
addressdata_opt,
prev_price,
blocks_old,
days_old,
older_than_hour,
)| {
let typeindex_to_addressdata = addresstype_to_typeindex_to_addressdata
.get_mut(_type)