global: snapshot

This commit is contained in:
nym21
2025-08-17 21:38:28 +02:00
parent 7d47bc8042
commit 05036c682f
51 changed files with 2551 additions and 6115 deletions

View File

@@ -23,7 +23,7 @@ const VERSION: Version = Version::ZERO;
#[derive(Clone)]
pub struct Vecs {
starting_height: Height,
starting_height: Option<Height>,
pub state: Option<AddressCohortState>,
@@ -50,7 +50,7 @@ impl Vecs {
let suffix = |s: &str| cohort_name.map_or(s.to_string(), |name| format!("{name}_{s}"));
Ok(Self {
starting_height: Height::ZERO,
starting_height: None,
state: states_path.map(|states_path| {
AddressCohortState::new(
states_path,
@@ -98,12 +98,16 @@ impl DynCohortVecs for Vecs {
.unwrap()
}
fn set_starting_height(&mut self, starting_height: Height) {
self.starting_height = Some(starting_height);
}
fn import_state_at(&mut self, starting_height: Height) -> Result<()> {
if starting_height > self.starting_height() {
unreachable!()
}
self.starting_height = starting_height;
self.set_starting_height(starting_height);
if let Some(prev_height) = starting_height.decremented() {
self.state.as_mut().unwrap().address_count = *self
@@ -113,7 +117,7 @@ impl DynCohortVecs for Vecs {
}
self.inner.import_state_at(
&mut self.starting_height,
self.starting_height.unwrap(),
&mut self.state.as_mut().unwrap().inner,
)
}
@@ -128,7 +132,7 @@ impl DynCohortVecs for Vecs {
}
fn forced_pushed_at(&mut self, height: Height, exit: &Exit) -> Result<()> {
if self.starting_height > height {
if self.starting_height.unwrap() > height {
return Ok(());
}

View File

@@ -1030,7 +1030,7 @@ impl Vecs {
pub fn import_state_at(
&mut self,
starting_height: &mut Height,
starting_height: Height,
state: &mut CohortState,
) -> Result<()> {
if let Some(prev_height) = starting_height.decremented() {

View File

@@ -13,7 +13,7 @@ use log::info;
use rayon::prelude::*;
use vecdb::{
AnyCollectableVec, AnyStoredVec, AnyVec, CollectableVec, Database, EagerVec, Exit, Format,
GenericStoredVec, PAGE_SIZE, RawVec, Reader, Stamp, StoredIndex, VecIterator,
GenericStoredVec, ImportOptions, PAGE_SIZE, RawVec, Reader, Stamp, StoredIndex, VecIterator,
};
use crate::{
@@ -47,8 +47,6 @@ pub struct Vecs {
// ---
// States
// ---
// Rollback: diff on stamped_flush + add rollback to stamp
pub chain_state: RawVec<Height, SupplyState>,
pub p2pk33addressindex_to_anyaddressindex: RawVec<P2PK33AddressIndex, AnyAddressIndex>,
pub p2pk65addressindex_to_anyaddressindex: RawVec<P2PK65AddressIndex, AnyAddressIndex>,
@@ -61,7 +59,6 @@ pub struct Vecs {
pub loadedaddressindex_to_loadedaddressdata: RawVec<LoadedAddressIndex, LoadedAddressData>,
pub emptyaddressindex_to_emptyaddressdata: RawVec<EmptyAddressIndex, EmptyAddressData>,
// Rollback: inner state: save price_to_amount_STAMP instead of price_to_amount
pub utxo_cohorts: utxo_cohorts::Vecs,
pub address_cohorts: address_cohorts::Vecs,
@@ -81,6 +78,8 @@ pub struct Vecs {
pub indexes_to_empty_address_count: ComputedVecsFromHeight<StoredU64>,
}
const SAVED_STAMPED_CHANGES: u16 = 10;
impl Vecs {
pub fn forced_import(
parent: &Path,
@@ -99,10 +98,9 @@ impl Vecs {
let chain_db = Database::open(&parent.join("chain"))?;
Ok(Self {
chain_state: RawVec::forced_import(
&chain_db,
"chain",
version + VERSION + Version::ZERO,
chain_state: RawVec::forced_import_with(
ImportOptions::new(&chain_db, "chain", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
height_to_unspendable_supply: EagerVec::forced_import_compressed(
@@ -390,56 +388,46 @@ impl Vecs {
states_path,
)?,
p2aaddressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2aaddressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2pk33addressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2pk33addressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2pk65addressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2pk65addressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2pkhaddressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2pkhaddressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2shaddressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2shaddressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2traddressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2traddressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2wpkhaddressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2wpkhaddressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
p2wshaddressindex_to_anyaddressindex: RawVec::forced_import(
&db,
"anyaddressindex",
version + VERSION + Version::ZERO,
p2wshaddressindex_to_anyaddressindex: RawVec::forced_import_with(
ImportOptions::new(&db, "anyaddressindex", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
loadedaddressindex_to_loadedaddressdata: RawVec::forced_import(
&db,
"loadedaddressdata",
version + VERSION + Version::ZERO,
loadedaddressindex_to_loadedaddressdata: RawVec::forced_import_with(
ImportOptions::new(&db, "loadedaddressdata", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
emptyaddressindex_to_emptyaddressdata: RawVec::forced_import(
&db,
"emptyaddressdata",
version + VERSION + Version::ZERO,
emptyaddressindex_to_emptyaddressdata: RawVec::forced_import_with(
ImportOptions::new(&db, "emptyaddressdata", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
db,
@@ -572,7 +560,6 @@ impl Vecs {
let mut chain_state: Vec<BlockState> = vec![];
let mut chain_state_starting_height = Height::from(self.chain_state.len());
let stateful_starting_height = match separate_utxo_vecs
.par_iter_mut()
.map(|(_, v)| v.starting_height())
@@ -659,18 +646,15 @@ impl Vecs {
self.loadedaddressindex_to_loadedaddressdata.reset()?;
self.emptyaddressindex_to_emptyaddressdata.reset()?;
info!("Resetting utxo price maps...");
separate_utxo_vecs
.par_iter_mut()
.flat_map(|(_, v)| v.state.as_mut())
.try_for_each(|state| state.reset_price_to_amount_if_needed())?;
info!("Resetting address price maps...");
separate_utxo_vecs.par_iter_mut().try_for_each(|(_, v)| {
v.set_starting_height(starting_height);
v.state.as_mut().unwrap().reset_price_to_amount_if_needed()
})?;
separate_address_vecs
.par_iter_mut()
.try_for_each(|(_, v)| {
v.set_starting_height(starting_height);
v.state.as_mut().unwrap().reset_price_to_amount_if_needed()
})?;
}
@@ -753,6 +737,13 @@ impl Vecs {
&mut anyaddressindex_to_anyaddressdata_reader_opt,
);
let last_height = Height::from(
height_to_date_fixed
.len()
.checked_sub(1)
.unwrap_or_default(),
);
(height.unwrap_to_usize()..height_to_date_fixed.len())
.map(Height::from)
.try_for_each(|_height| -> Result<()> {
@@ -1150,13 +1141,13 @@ impl Vecs {
)
})?;
if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
if height != last_height && height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
let _lock = exit.lock();
addresstypeindex_to_anyaddressindex_reader_opt.take();
anyaddressindex_to_anyaddressdata_reader_opt.take();
self.flush_states(height, &chain_state, mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), exit)?;
self.flush_states(height, &chain_state, mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), false, exit)?;
self.reset_readers_options(
&mut addresstypeindex_to_anyaddressindex_reader_opt,
@@ -1177,6 +1168,7 @@ impl Vecs {
&chain_state,
mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata),
mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata),
true,
exit,
)?;
}
@@ -1507,6 +1499,7 @@ impl Vecs {
mut addresstype_to_typeindex_to_emptyaddressdata: AddressTypeToTypeIndexTree<
WithAddressDataSource<EmptyAddressData>,
>,
with_changes: bool,
exit: &Exit,
) -> Result<()> {
info!("Flushing...");
@@ -1676,32 +1669,35 @@ impl Vecs {
})
})?;
let stamp = Stamp::from(height);
self.p2pk33addressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2pk65addressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2pkhaddressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2shaddressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2traddressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2wpkhaddressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2wshaddressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.p2aaddressindex_to_anyaddressindex
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.loadedaddressindex_to_loadedaddressdata
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.emptyaddressindex_to_emptyaddressdata
.stamped_flush(Stamp::from(height))?;
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
self.chain_state.truncate_if_needed(Height::ZERO)?;
chain_state.iter().for_each(|block_state| {
self.chain_state.push(block_state.supply.clone());
});
self.chain_state.flush()?;
self.chain_state
.stamped_flush_maybe_with_changes(stamp, with_changes)?;
Ok(())
}

View File

@@ -7,6 +7,7 @@ use crate::{Indexes, indexes, market, price};
pub trait DynCohortVecs: Send + Sync {
fn starting_height(&self) -> Height;
fn set_starting_height(&mut self, starting_height: Height);
fn import_state_at(&mut self, starting_height: Height) -> Result<()>;

View File

@@ -69,17 +69,19 @@ impl DynCohortVecs for Vecs {
self.inner.starting_height()
}
fn set_starting_height(&mut self, starting_height: Height) {
self.starting_height = Some(starting_height);
}
fn import_state_at(&mut self, starting_height: Height) -> Result<()> {
if starting_height > self.starting_height() {
unreachable!()
}
self.starting_height = Some(starting_height);
self.set_starting_height(starting_height);
self.inner.import_state_at(
self.starting_height.as_mut().unwrap(),
self.state.as_mut().unwrap(),
)
self.inner
.import_state_at(self.starting_height.unwrap(), self.state.as_mut().unwrap())
}
fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {

View File

@@ -234,12 +234,12 @@ impl Vecs {
.into_owned();
let range = first_index..first_index + count as usize;
range.into_iter().fold(Sats::ZERO, |total, outputindex| {
total
+ outputindex_to_value_iter
.next_at(outputindex)
.unwrap()
.1
.into_owned()
let v = outputindex_to_value_iter
.next_at(outputindex)
.unwrap()
.1
.into_owned();
total + v
})
})
},
@@ -704,10 +704,21 @@ impl Vecs {
// },
// )?;
self.txindex_to_fee.compute_subtract(
self.txindex_to_fee.compute_transform3(
starting_indexes.txindex,
&self.txindex_to_input_value,
&self.txindex_to_output_value,
&self.txindex_to_is_coinbase,
|(i, input, output, coinbase, ..)| {
(
i,
if coinbase.is_true() {
Sats::ZERO
} else {
input.checked_sub(output).unwrap()
},
)
},
exit,
)?;