computer: stateful: maybe got rollback to work, tbd

This commit is contained in:
nym21
2025-08-19 23:34:05 +02:00
parent 05036c682f
commit da1ff2cacc
20 changed files with 267 additions and 139 deletions

View File

@@ -26,6 +26,7 @@ mod utils;
use indexes::Indexes;
pub use states::PriceToAmount;
use states::*;
#[derive(Clone)]
@@ -79,7 +80,6 @@ impl Computer {
Format::Compressed,
&indexes,
price.as_ref(),
&computed_path.join("states"),
)?,
transactions: transactions::Vecs::forced_import(
&computed_path,

View File

@@ -88,26 +88,26 @@ impl Vecs {
}
impl DynCohortVecs for Vecs {
fn starting_height(&self) -> Height {
fn min_height_vecs_len(&self) -> usize {
[
self.height_to_address_count.len().into(),
self.inner.starting_height(),
self.height_to_address_count.len(),
self.inner.min_height_vecs_len(),
]
.into_iter()
.min()
.unwrap()
}
fn set_starting_height(&mut self, starting_height: Height) {
self.starting_height = Some(starting_height);
fn reset_state_starting_height(&mut self) {
self.starting_height = Some(Height::ZERO);
}
fn import_state_at(&mut self, starting_height: Height) -> Result<()> {
if starting_height > self.starting_height() {
unreachable!()
}
fn import_state(&mut self, starting_height: Height) -> Result<Height> {
let starting_height = self
.inner
.import_state(starting_height, &mut self.state.as_mut().unwrap().inner)?;
self.set_starting_height(starting_height);
self.starting_height = Some(starting_height);
if let Some(prev_height) = starting_height.decremented() {
self.state.as_mut().unwrap().address_count = *self
@@ -116,10 +116,7 @@ impl DynCohortVecs for Vecs {
.unwrap_get_inner(prev_height);
}
self.inner.import_state_at(
self.starting_height.unwrap(),
&mut self.state.as_mut().unwrap().inner,
)
Ok(starting_height)
}
fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {

View File

@@ -973,7 +973,7 @@ impl Vecs {
})
}
pub fn starting_height(&self) -> Height {
pub fn min_height_vecs_len(&self) -> usize {
[
self.height_to_supply.len(),
self.height_to_utxo_count.len(),
@@ -1023,17 +1023,20 @@ impl Vecs {
self.height_to_satblocks_destroyed.len(),
]
.into_iter()
.map(Height::from)
.min()
.unwrap()
}
pub fn import_state_at(
pub fn import_state(
&mut self,
starting_height: Height,
state: &mut CohortState,
) -> Result<()> {
if let Some(prev_height) = starting_height.decremented() {
) -> Result<Height> {
if let Some(mut prev_height) = starting_height.decremented() {
if self.height_to_realized_cap.as_mut().is_some() {
prev_height = state.import_at_or_before(prev_height)?;
}
state.supply.value = self
.height_to_supply
.into_iter()
@@ -1047,10 +1050,9 @@ impl Vecs {
state.realized.as_mut().unwrap().cap = height_to_realized_cap
.into_iter()
.unwrap_get_inner(prev_height);
state.import_at(prev_height)?;
}
Ok(())
Ok(prev_height.incremented())
} else {
Err(Error::Str("Unset"))
}

View File

@@ -1,4 +1,10 @@
use std::{cmp::Ordering, collections::BTreeMap, mem, path::Path, thread};
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
mem,
path::Path,
thread,
};
use brk_error::Result;
use brk_indexer::Indexer;
@@ -87,19 +93,19 @@ impl Vecs {
format: Format,
indexes: &indexes::Vecs,
price: Option<&price::Vecs>,
states_path: &Path,
) -> Result<Self> {
let db = Database::open(&parent.join("stateful"))?;
let db_path = parent.join("stateful");
let states_path = db_path.join("states");
let db = Database::open(&db_path)?;
db.set_min_len(PAGE_SIZE * 20_000_000)?;
db.set_min_regions(50_000)?;
let compute_dollars = price.is_some();
let chain_db = Database::open(&parent.join("chain"))?;
Ok(Self {
chain_state: RawVec::forced_import_with(
ImportOptions::new(&chain_db, "chain", version + VERSION + Version::ZERO)
ImportOptions::new(&db, "chain", version + VERSION + Version::ZERO)
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
)?,
@@ -377,7 +383,7 @@ impl Vecs {
format,
indexes,
price,
states_path,
&states_path,
)?,
address_cohorts: address_cohorts::Vecs::forced_import(
&db,
@@ -385,7 +391,7 @@ impl Vecs {
format,
indexes,
price,
states_path,
&states_path,
)?,
p2aaddressindex_to_anyaddressindex: RawVec::forced_import_with(
@@ -558,17 +564,16 @@ impl Vecs {
base_version + self.height_to_opreturn_supply.inner_version(),
)?;
let mut chain_state: Vec<BlockState> = vec![];
let mut chain_state_starting_height = Height::from(self.chain_state.len());
let stateful_starting_height = match separate_utxo_vecs
.par_iter_mut()
.map(|(_, v)| v.starting_height())
.map(|(_, v)| Height::from(v.min_height_vecs_len()))
.min()
.unwrap_or_default()
.min(
separate_address_vecs
.par_iter_mut()
.map(|(_, v)| v.starting_height())
.map(|(_, v)| Height::from(v.min_height_vecs_len()))
.min()
.unwrap_or_default(),
)
@@ -588,7 +593,82 @@ impl Vecs {
.cmp(&chain_state_starting_height)
{
Ordering::Greater => unreachable!(),
Ordering::Equal => {
Ordering::Equal => chain_state_starting_height,
Ordering::Less => Height::ZERO,
};
// dbg!(stateful_starting_height);
// let stateful_starting_height = stateful_starting_height
// .checked_sub(Height::new(1))
// .unwrap_or_default();
// dbg!(stateful_starting_height);
let starting_height = starting_indexes.height.min(stateful_starting_height);
// dbg!(starting_height);
let last_height = Height::from(indexer.vecs.height_to_blockhash.stamp());
// dbg!(last_height);
if starting_height <= last_height {
// dbg!(starting_height);
let starting_height = if starting_height.is_not_zero() {
let mut set = separate_utxo_vecs
.iter_mut()
.map(|(_, v)| v.import_state(starting_height).unwrap_or_default())
.collect::<BTreeSet<Height>>();
if set.len() == 1 {
set.pop_first().unwrap()
} else {
Height::ZERO
}
} else {
Height::ZERO
};
// dbg!(starting_height);
let starting_height = if starting_height.is_not_zero()
&& separate_address_vecs
.iter_mut()
.map(|(_, v)| v.import_state(starting_height).unwrap_or_default())
.chain(
[
self.chain_state.rollback_before(starting_height.into())?,
self.p2pk33addressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2pk65addressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2pkhaddressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2shaddressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2traddressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2wpkhaddressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2wshaddressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.p2aaddressindex_to_anyaddressindex
.rollback_before(starting_height.into())?,
self.loadedaddressindex_to_loadedaddressdata
.rollback_before(starting_height.into())?,
self.emptyaddressindex_to_emptyaddressdata
.rollback_before(starting_height.into())?,
]
.into_iter()
.map(Height::from)
.map(Height::incremented),
)
.all(|h| h == starting_height)
{
starting_height
} else {
Height::ZERO
};
// dbg!(starting_height);
// std::process::exit(0);
let mut chain_state: Vec<BlockState>;
if starting_height.is_not_zero() {
chain_state = self
.chain_state
.collect_range(None, None)?
@@ -607,33 +687,10 @@ impl Vecs {
}
})
.collect::<Vec<_>>();
chain_state_starting_height
}
Ordering::Less => Height::ZERO,
};
let starting_height = starting_indexes.height.min(stateful_starting_height);
let last_height = Height::from(indexer.vecs.height_to_blockhash.stamp());
if starting_height <= last_height {
let starting_height = if separate_utxo_vecs
.par_iter_mut()
.try_for_each(|(_, v)| v.import_state_at(starting_height))
.is_err()
|| separate_address_vecs
.par_iter_mut()
.try_for_each(|(_, v)| v.import_state_at(starting_height))
.is_err()
{
Height::ZERO
} else {
starting_height
};
if starting_height.is_zero() {
info!("Starting processing utxos from the start");
chain_state = vec![];
chain_state_starting_height = Height::ZERO;
self.p2pk33addressindex_to_anyaddressindex.reset()?;
self.p2pk65addressindex_to_anyaddressindex.reset()?;
@@ -647,18 +704,20 @@ impl Vecs {
self.emptyaddressindex_to_emptyaddressdata.reset()?;
separate_utxo_vecs.par_iter_mut().try_for_each(|(_, v)| {
v.set_starting_height(starting_height);
v.reset_state_starting_height();
v.state.as_mut().unwrap().reset_price_to_amount_if_needed()
})?;
separate_address_vecs
.par_iter_mut()
.try_for_each(|(_, v)| {
v.set_starting_height(starting_height);
v.reset_state_starting_height();
v.state.as_mut().unwrap().reset_price_to_amount_if_needed()
})?;
}
chain_state_starting_height = starting_height;
starting_indexes.update_from_height(starting_height, indexes);
let inputindex_to_outputindex_reader = inputindex_to_outputindex.create_reader();
@@ -971,7 +1030,7 @@ impl Vecs {
}
height_to_addresstype_to_typedindex_to_data
.entry(height)
.entry(prev_height)
.or_default()
.get_mut(output_type)
.unwrap()

View File

@@ -6,10 +6,10 @@ use vecdb::{AnyCollectableVec, AnyIterableVec, Exit};
use crate::{Indexes, indexes, market, price};
pub trait DynCohortVecs: Send + Sync {
fn starting_height(&self) -> Height;
fn set_starting_height(&mut self, starting_height: Height);
fn min_height_vecs_len(&self) -> usize;
fn reset_state_starting_height(&mut self);
fn import_state_at(&mut self, starting_height: Height) -> Result<()>;
fn import_state(&mut self, starting_height: Height) -> Result<Height>;
fn validate_computed_versions(&mut self, base_version: Version) -> Result<()>;

View File

@@ -15,7 +15,7 @@ use crate::{
#[derive(Clone)]
pub struct Vecs {
starting_height: Option<Height>,
state_starting_height: Option<Height>,
pub state: Option<UTXOCohortState>,
@@ -39,7 +39,7 @@ impl Vecs {
let compute_dollars = price.is_some();
Ok(Self {
starting_height: None,
state_starting_height: None,
state: states_path.map(|states_path| {
UTXOCohortState::new(
@@ -65,23 +65,22 @@ impl Vecs {
}
impl DynCohortVecs for Vecs {
fn starting_height(&self) -> Height {
self.inner.starting_height()
fn min_height_vecs_len(&self) -> usize {
self.inner.min_height_vecs_len()
}
fn set_starting_height(&mut self, starting_height: Height) {
self.starting_height = Some(starting_height);
fn reset_state_starting_height(&mut self) {
self.state_starting_height = Some(Height::ZERO);
}
fn import_state_at(&mut self, starting_height: Height) -> Result<()> {
if starting_height > self.starting_height() {
unreachable!()
}
fn import_state(&mut self, starting_height: Height) -> Result<Height> {
let starting_height = self
.inner
.import_state(starting_height, self.state.as_mut().unwrap())?;
self.set_starting_height(starting_height);
self.state_starting_height = Some(starting_height);
self.inner
.import_state_at(self.starting_height.unwrap(), self.state.as_mut().unwrap())
Ok(starting_height)
}
fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
@@ -89,7 +88,7 @@ impl DynCohortVecs for Vecs {
}
fn forced_pushed_at(&mut self, height: Height, exit: &Exit) -> Result<()> {
if self.starting_height.unwrap() > height {
if self.state_starting_height.unwrap() > height {
return Ok(());
}

View File

@@ -1,13 +1,17 @@
use std::ops::{Add, AddAssign, SubAssign};
use brk_structs::{Dollars, Timestamp};
use serde::Serialize;
use super::SupplyState;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct BlockState {
#[serde(flatten)]
pub supply: SupplyState,
#[serde(skip)]
pub price: Option<Dollars>,
#[serde(skip)]
pub timestamp: Timestamp,
}

View File

@@ -27,11 +27,12 @@ impl CohortState {
}
}
pub fn import_at(&mut self, height: Height) -> Result<()> {
pub fn import_at_or_before(&mut self, height: Height) -> Result<Height> {
if let Some(price_to_amount) = self.price_to_amount.as_mut() {
price_to_amount.import_at(height)?;
price_to_amount.import_at_or_before(height)
} else {
Ok(height)
}
Ok(())
}
pub fn reset_price_to_amount_if_needed(&mut self) -> Result<()> {

View File

@@ -4,7 +4,7 @@ use std::{
path::{Path, PathBuf},
};
use brk_error::Result;
use brk_error::{Error, Result};
use brk_structs::{Dollars, Height, Sats};
use derive_deref::{Deref, DerefMut};
use pco::standalone::{simple_decompress, simpler_compress};
@@ -30,9 +30,14 @@ impl PriceToAmount {
}
}
pub fn import_at(&mut self, height: Height) -> Result<()> {
self.state = Some(State::deserialize(&fs::read(self.path_state(height))?)?);
Ok(())
pub fn import_at_or_before(&mut self, height: Height) -> Result<Height> {
let files = self.read_dir(None)?;
let (&height, path) = files
.range(..=height)
.next_back()
.ok_or(Error::Str("Not found"))?;
self.state = Some(State::deserialize(&fs::read(path)?)?);
Ok(height)
}
pub fn iter(&self) -> impl Iterator<Item = (&Dollars, &Sats)> {
@@ -77,27 +82,28 @@ impl PriceToAmount {
Ok(())
}
pub fn flush(&mut self, height: Height) -> Result<()> {
let files: BTreeMap<Height, PathBuf> = fs::read_dir(&self.pathbuf)?
fn read_dir(&self, keep_only_before: Option<Height>) -> Result<BTreeMap<Height, PathBuf>> {
Ok(fs::read_dir(&self.pathbuf)?
.filter_map(|entry| {
let path = entry.ok()?.path();
let name = path.file_name()?.to_str()?;
if let Some(height_str) = name.strip_prefix(STATE_AT_) {
if let Ok(h) = height_str.parse::<u64>().map(Height::from) {
if h < height {
Some((h, path))
} else {
let _ = fs::remove_file(path);
None
}
let height_str = name.strip_prefix(STATE_AT_).unwrap_or(name);
if let Ok(h) = height_str.parse::<u32>().map(Height::from) {
if keep_only_before.is_none_or(|height| h < height) {
Some((h, path))
} else {
let _ = fs::remove_file(path);
None
}
} else {
None
}
})
.collect();
.collect::<BTreeMap<Height, PathBuf>>())
}
pub fn flush(&mut self, height: Height) -> Result<()> {
let files = self.read_dir(Some(height))?;
for (_, path) in files
.iter()
@@ -118,7 +124,7 @@ impl PriceToAmount {
Self::path_state_(&self.pathbuf, height)
}
fn path_state_(path: &Path, height: Height) -> PathBuf {
path.join(format!("{STATE_AT_}{}", height))
path.join(u32::from(height).to_string())
}
}
@@ -130,6 +136,7 @@ const COMPRESSION_LEVEL: usize = 4;
impl State {
fn serialize(&self) -> vecdb::Result<Vec<u8>> {
let keys: Vec<f64> = self.keys().cloned().map(f64::from).collect();
let values: Vec<u64> = self.values().cloned().map(u64::from).collect();
let compressed_keys = simpler_compress(&keys, COMPRESSION_LEVEL)?;