global: snapshot

This commit is contained in:
nym21
2025-12-07 19:13:41 +01:00
parent f23907768f
commit b88f0bab56
17 changed files with 388 additions and 377 deletions
+7 -6
View File
@@ -165,7 +165,7 @@ impl Stores {
self.addresstype_to_addressindex_and_unspentoutpoint
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
) // Changed from par_iter_mut()
)
.map(|store| {
let items = store.take_all_f2();
store.export_meta_if_needed(height)?;
@@ -249,11 +249,12 @@ impl Stores {
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
let is_known_dup = crate::DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
let is_known_dup =
crate::DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
+57 -219
View File
@@ -10,14 +10,13 @@ use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, Kind3, Mode3, StoreFjallV3 as Store};
use brk_types::{
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height,
OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version,
Vout,
AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint,
OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
};
use fjall3::{AbstractTree, Database, PersistMode};
use fjall3::{Database, PersistMode};
use log::info;
use rayon::prelude::*;
use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, VecIndex, VecIterator};
use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator};
use crate::Indexes;
@@ -130,24 +129,35 @@ impl Stores {
}
pub fn starting_height(&self) -> Height {
self.iter_any_store()
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.min()
.unwrap()
[
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.values()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.values()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.values()
.map(|s| s as &dyn AnyStore),
)
.map(|store| store.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
info!(
"self.db.config.cache.size = {}",
self.db.config.cache.size()
);
let i = Instant::now();
let tuples = [
[
&mut self.blockhashprefix_to_height as &mut dyn AnyStore,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
@@ -155,40 +165,21 @@ impl Stores {
.into_par_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.par_iter_mut()
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.par_iter_mut()
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.par_iter_mut()
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
) // Changed from par_iter_mut()
.map(|store| {
let items = store.take_all_f3();
store.export_meta_if_needed(height)?;
Ok((store.keyspace(), items))
})
.collect::<Result<Vec<_>>>()?;
info!("Store items collected in {:?}", i.elapsed());
let version_memtable_size_sum = tuples
.iter()
.map(|(keyspace, _)| keyspace.tree.version_memtable_size_sum())
.collect::<Vec<_>>();
// let sum = version_memtable_size_sum.iter().sum::<usize>();
println!(
"version_memtable_size_sum = {:?} = ",
version_memtable_size_sum
);
let i = Instant::now();
self.db.batch().commit_keyspaces(tuples)?;
info!("Batch done in {:?}", i.elapsed());
.try_for_each(|store| store.commit_f3(height))?;
info!("Commits done in {:?}", i.elapsed());
let i = Instant::now();
self.db.persist(PersistMode::SyncData)?;
@@ -202,30 +193,6 @@ impl Stores {
Ok(())
}
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
[
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.iter()
.map(|s| s as &dyn AnyStore),
)
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
@@ -236,15 +203,15 @@ impl Stores {
&& self.height_to_coinbase_tag.is_empty()?
&& self
.addresstype_to_addresshash_to_addressindex
.iter()
.values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
&& self
.addresstype_to_addressindex_and_txindex
.iter()
.values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
&& self
.addresstype_to_addressindex_and_unspentoutpoint
.iter()
.values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
{
return Ok(());
@@ -265,145 +232,25 @@ impl Stores {
self.height_to_coinbase_tag.remove(h);
});
if let Ok(mut index) = vecs
.height_to_first_p2pk65addressindex
.read_once(starting_indexes.height)
{
let mut p2pk65addressindex_to_p2pk65bytes_iter =
vecs.p2pk65addressindex_to_p2pk65bytes.iter()?;
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
// Remove address hashes for all address types starting from rollback height
for address_type in [
OutputType::P2PK65,
OutputType::P2PK33,
OutputType::P2PKH,
OutputType::P2SH,
OutputType::P2WPKH,
OutputType::P2WSH,
OutputType::P2TR,
OutputType::P2A,
] {
for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? {
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PK65)
.get_mut_unwrap(address_type)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2pk33addressindex
.read_once(starting_indexes.height)
{
let mut p2pk33addressindex_to_p2pk33bytes_iter =
vecs.p2pk33addressindex_to_p2pk33bytes.iter()?;
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PK33)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2pkhaddressindex
.read_once(starting_indexes.height)
{
let mut p2pkhaddressindex_to_p2pkhbytes_iter =
vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?;
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PKH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2shaddressindex
.read_once(starting_indexes.height)
{
let mut p2shaddressindex_to_p2shbytes_iter =
vecs.p2shaddressindex_to_p2shbytes.iter()?;
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2SH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2wpkhaddressindex
.read_once(starting_indexes.height)
{
let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?;
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2WPKH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2wshaddressindex
.read_once(starting_indexes.height)
{
let mut p2wshaddressindex_to_p2wshbytes_iter =
vecs.p2wshaddressindex_to_p2wshbytes.iter()?;
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2WSH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.read_once(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2TR)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2aaddressindex
.read_once(starting_indexes.height)
{
let mut p2aaddressindex_to_p2abytes_iter =
vecs.p2aaddressindex_to_p2abytes.iter()?;
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2A)
.remove(hash);
index.increment();
}
}
} else {
unreachable!();
// self.blockhashprefix_to_height.reset()?;
// self.addresshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {
@@ -413,24 +260,21 @@ impl Stores {
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
// "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
let is_not_first_dup = txindex != TxIndex::new(142783)
|| txidprefix != TxidPrefix::from([153, 133, 216, 41, 84, 225, 15, 34]);
let is_known_dup =
crate::DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
// "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468"
let is_not_second_dup = txindex != TxIndex::new(142841)
|| txidprefix != TxidPrefix::from([104, 180, 95, 88, 182, 116, 233, 78]);
if is_not_first_dup && is_not_second_dup {
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
});
} else {
unreachable!();
// self.txidprefix_to_txindex.reset()?;
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
@@ -504,12 +348,6 @@ impl Stores {
});
} else {
unreachable!();
// self.addresstype_to_typeindex_and_txindex
// .iter_mut()
// .try_for_each(|s| s.reset())?;
// self.addresstype_to_typeindex_and_unspentoutpoint
// .iter_mut()
// .try_for_each(|s| s.reset())?;
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;