parser: fix utxo panic after soft reset

This commit is contained in:
k
2024-07-18 12:27:24 +02:00
parent 4d23fdef61
commit 1f9d1542f1
9 changed files with 135 additions and 99 deletions

View File

@@ -11,7 +11,7 @@ use crate::{
bitcoin::{check_if_height_safe, BitcoinDB, NUMBER_OF_UNSAFE_BLOCKS},
databases::Databases,
datasets::{AllDatasets, ComputeData},
states::States,
states::{AddressCohortsDurableStates, States, UTXOCohortsDurableStates},
structs::{DateData, WNaiveDate},
utils::{generate_allocation_files, log, time},
};
@@ -33,8 +33,7 @@ pub fn iter_blocks(bitcoin_db: &BitcoinDB, block_count: usize) -> color_eyre::Re
log("Imported databases");
let mut states =
States::import(&mut databases.address_index_to_address_data, &datasets).unwrap_or_default();
let mut states = States::import().unwrap_or_default();
log("Imported states");
@@ -74,6 +73,26 @@ pub fn iter_blocks(bitcoin_db: &BitcoinDB, block_count: usize) -> color_eyre::Re
let current_block_date = WNaiveDate::from_timestamp(timestamp);
let current_block_height = height + blocks_loop_i;
if states.address_cohorts_durable_states.is_none()
&& datasets
.address
.needs_durable_states(current_block_height, current_block_date)
{
states.address_cohorts_durable_states =
Some(AddressCohortsDurableStates::init(
&mut databases.address_index_to_address_data,
));
}
if states.utxo_cohorts_durable_states.is_none()
&& datasets
.utxo
.needs_durable_states(current_block_height, current_block_date)
{
states.utxo_cohorts_durable_states =
Some(UTXOCohortsDurableStates::init(&states.date_data_vec));
}
let next_block_date = next_block_opt
.as_ref()
.map(|next_block| WNaiveDate::from_timestamp(next_block.header.time));

View File

@@ -570,11 +570,15 @@ pub fn parse(
states.date_data_vec.get_block_data(block_path).unwrap();
if block_data.height != height as u32 {
states.utxo_cohorts_durable_states.subtract_moved(
block_data,
sent_data,
previous_last_block_data,
);
states
.utxo_cohorts_durable_states
.as_mut()
.unwrap()
.subtract_moved(
block_data,
sent_data,
previous_last_block_data,
);
}
});
}
@@ -590,17 +594,24 @@ pub fn parse(
.iter()
.flat_map(|date_data| &date_data.blocks)
.for_each(|block_data| {
states.utxo_cohorts_durable_states.udpate_age_if_needed(
block_data,
last_block_data,
previous_last_block_data,
);
states
.utxo_cohorts_durable_states
.as_mut()
.unwrap()
.udpate_age_if_needed(
block_data,
last_block_data,
previous_last_block_data,
);
});
}
if datasets.utxo.needs_one_shot_states(height, date) {
utxo_cohorts_one_shot_states =
states.utxo_cohorts_durable_states.compute_one_shot_states(
utxo_cohorts_one_shot_states = states
.utxo_cohorts_durable_states
.as_ref()
.unwrap()
.compute_one_shot_states(
block_price,
if is_date_last_block {
Some(date_price)
@@ -646,6 +657,8 @@ pub fn parse(
states
.address_cohorts_durable_states
.as_mut()
.unwrap()
.iterate(address_realized_data, current_address_data)
.unwrap_or_else(|report| {
dbg!(report.to_string(), address_index);
@@ -686,6 +699,8 @@ pub fn parse(
address_cohorts_one_shot_states.replace(
states
.address_cohorts_durable_states
.as_ref()
.unwrap()
.compute_one_shot_states(
block_price,
if is_date_last_block {

View File

@@ -88,19 +88,19 @@ impl CohortDataset {
.any(|sub| sub.price_paid.needs_insert(height, date))
}
fn needs_insert_realized(&self, height: usize, date: WNaiveDate) -> bool {
pub fn needs_insert_realized(&self, height: usize, date: WNaiveDate) -> bool {
self.sub_datasets_vec()
.iter()
.any(|sub| sub.realized.needs_insert(height, date))
}
fn needs_insert_unrealized(&self, height: usize, date: WNaiveDate) -> bool {
pub fn needs_insert_unrealized(&self, height: usize, date: WNaiveDate) -> bool {
self.sub_datasets_vec()
.iter()
.any(|sub| sub.unrealized.needs_insert(height, date))
}
fn needs_insert_input(&self, height: usize, date: WNaiveDate) -> bool {
pub fn needs_insert_input(&self, height: usize, date: WNaiveDate) -> bool {
self.sub_datasets_vec()
.iter()
.any(|sub| sub.input.needs_insert(height, date))
@@ -112,7 +112,7 @@ impl CohortDataset {
// .any(|sub| sub.output.needs_insert(height, date))
// }
fn insert_realized_data(&mut self, insert_data: &InsertData) {
pub fn insert_realized_data(&mut self, insert_data: &InsertData) {
let split_realized_state = insert_data
.address_cohorts_realized_states
.as_ref()
@@ -141,6 +141,8 @@ impl CohortDataset {
let address_count = insert_data
.states
.address_cohorts_durable_states
.as_ref()
.unwrap()
.get(&self.split)
.unwrap()
.address_count;
@@ -377,6 +379,8 @@ impl CohortDataset {
let liquidity_split_processed_address_state = insert_data
.states
.address_cohorts_durable_states
.as_ref()
.unwrap()
.get(&self.split);
if liquidity_split_processed_address_state.is_none() {

View File

@@ -6,7 +6,7 @@ use allocative::Allocative;
use itertools::Itertools;
use rayon::prelude::*;
use crate::{states::SplitByAddressCohort, structs::BiMap};
use crate::{states::SplitByAddressCohort, structs::BiMap, WNaiveDate};
use self::{all_metadata::AllAddressesMetadataDataset, cohort::CohortDataset};
@@ -59,53 +59,73 @@ impl AddressDatasets {
.for_each(|(cohort, _)| cohort.insert(insert_data))
}
// pub fn needs_insert_utxo(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.utxo.needs_insert(height, date))
pub fn needs_durable_states(&self, height: usize, date: WNaiveDate) -> bool {
let needs_insert_utxo = self.needs_insert_utxo(height, date);
let needs_insert_capitalization = self.needs_insert_capitalization(height, date);
let needs_insert_supply = self.needs_insert_supply(height, date);
let needs_one_shot_states = self.needs_one_shot_states(height, date);
needs_insert_utxo
|| needs_insert_capitalization
|| needs_insert_supply
|| needs_one_shot_states
}
pub fn needs_one_shot_states(&self, height: usize, date: WNaiveDate) -> bool {
self.needs_insert_price_paid(height, date) || self.needs_insert_unrealized(height, date)
}
// pub fn needs_sent_states(&self, height: usize, date: WNaiveDate) -> bool {
// self.needs_insert_input(height, date) || self.needs_insert_realized(height, date)
// }
// pub fn needs_insert_capitalization(&self, height: usize, date: WNaiveDate) -> bool {
pub fn needs_insert_utxo(&self, height: usize, date: WNaiveDate) -> bool {
self.cohorts
.as_vec()
.iter()
.any(|(dataset, _)| dataset.needs_insert_utxo(height, date))
}
pub fn needs_insert_capitalization(&self, height: usize, date: WNaiveDate) -> bool {
self.cohorts
.as_vec()
.iter()
.any(|(dataset, _)| dataset.needs_insert_capitalization(height, date))
}
pub fn needs_insert_supply(&self, height: usize, date: WNaiveDate) -> bool {
self.cohorts
.as_vec()
.iter()
.any(|(dataset, _)| dataset.needs_insert_supply(height, date))
}
pub fn needs_insert_price_paid(&self, height: usize, date: WNaiveDate) -> bool {
self.cohorts
.as_vec()
.iter()
.any(|(dataset, _)| dataset.needs_insert_price_paid(height, date))
}
// pub fn needs_insert_realized(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.capitalization.needs_insert(height, date))
// .any(|(dataset, _)| dataset.needs_insert_realized(height, date))
// }
// pub fn needs_insert_supply(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.supply.needs_insert(height, date))
// }
pub fn needs_insert_unrealized(&self, height: usize, date: WNaiveDate) -> bool {
self.cohorts
.as_vec()
.iter()
.any(|(dataset, _)| dataset.needs_insert_unrealized(height, date))
}
// pub fn needs_insert_price_paid(&self, height: usize, date: WNaiveDate) -> bool {
// pub fn needs_insert_input(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.price_paid.needs_insert(height, date))
// }
// fn needs_insert_realized(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.realized.needs_insert(height, date))
// }
// fn needs_insert_unrealized(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.unrealized.needs_insert(height, date))
// }
// fn needs_insert_input(&self, height: usize, date: WNaiveDate) -> bool {
// self.cohorts
// .as_vec()
// .iter()
// .any(|(dataset, _)| dataset.input.needs_insert(height, date))
// .any(|(dataset, _)| dataset.needs_insert_input(height, date))
// }
pub fn compute(

View File

@@ -48,6 +48,8 @@ impl UTXODataset {
insert_data,
&states
.utxo_cohorts_durable_states
.as_ref()
.unwrap()
.get(&self.id)
.durable_states
.supply_state,
@@ -59,6 +61,8 @@ impl UTXODataset {
insert_data,
&states
.utxo_cohorts_durable_states
.as_ref()
.unwrap()
.get(&self.id)
.durable_states
.utxo_state,
@@ -70,6 +74,8 @@ impl UTXODataset {
insert_data,
&states
.utxo_cohorts_durable_states
.as_ref()
.unwrap()
.get(&self.id)
.durable_states
.capitalization_state,

View File

@@ -12,7 +12,7 @@ pub struct AddressCohortDurableStates {
pub price_to_split_amount: PriceToValue<SplitByLiquidity<WAmount>>,
}
const ONE_THIRD: f64 = 0.33333333333;
const ONE_THIRD: f64 = 1.0 / 3.0;
// TODO: Clean that mess, move to a generic liquidity split and somehow support rest for non floats
impl AddressCohortDurableStates {

View File

@@ -118,7 +118,7 @@ impl AddressCohortsDurableStates {
}
pub fn compute_one_shot_states(
&mut self,
&self,
block_price: Price,
date_price: Option<Price>,
) -> AddressCohortsOneShotStates {

View File

@@ -20,7 +20,10 @@ impl UTXOCohortsDurableStates {
let mut s = Self::default();
if let Some(last_date_data) = date_data_vec.last() {
let last_block_data = last_date_data.blocks.last().unwrap();
let last_block_data = last_date_data.blocks.last().unwrap_or_else(|| {
dbg!(&last_date_data);
panic!()
});
date_data_vec.iter().for_each(|date_data| {
let year = date_data.date.year() as u32;
@@ -137,7 +140,7 @@ impl UTXOCohortsDurableStates {
}
pub fn compute_one_shot_states(
&mut self,
&self,
block_price: Price,
date_price: Option<Price>,
) -> UTXOCohortsOneShotStates {

View File

@@ -12,60 +12,29 @@ pub use cohorts_states::*;
use counters::*;
use date_data_vec::*;
use crate::{databases::AddressIndexToAddressData, datasets::AllDatasets, utils::log};
use crate::utils::log;
#[derive(Default, Allocative)]
pub struct States {
pub address_counters: Counters,
pub date_data_vec: DateDataVec,
pub address_cohorts_durable_states: AddressCohortsDurableStates,
pub utxo_cohorts_durable_states: UTXOCohortsDurableStates,
pub address_cohorts_durable_states: Option<AddressCohortsDurableStates>,
pub utxo_cohorts_durable_states: Option<UTXOCohortsDurableStates>,
}
impl States {
pub fn import(
address_index_to_address_data: &mut AddressIndexToAddressData,
datasets: &AllDatasets,
) -> color_eyre::Result<Self> {
pub fn import() -> color_eyre::Result<Self> {
let date_data_vec_handle = thread::spawn(DateDataVec::import);
let address_counters = Counters::import()?;
let date_data_vec = date_data_vec_handle.join().unwrap()?;
// TODO:
// For both address and utxo check if any of these datasets have a None min
// If so use default state otherwise init
// unrealized
// price_paid
// capitalization
// supply
// utxo
let mut address_cohorts_durable_states = AddressCohortsDurableStates::default();
let mut utxo_cohorts_durable_states = UTXOCohortsDurableStates::default();
if let Some(first_date_data) = date_data_vec.first() {
if let Some(first_block_data) = first_date_data.blocks.first() {
let first_height = first_block_data.height as usize;
let first_date = first_date_data.date;
// TODO: Do the same for addresses
address_cohorts_durable_states =
AddressCohortsDurableStates::init(address_index_to_address_data);
if !datasets.utxo.needs_durable_states(first_height, first_date) {
utxo_cohorts_durable_states = UTXOCohortsDurableStates::init(&date_data_vec);
}
}
}
Ok(Self {
address_cohorts_durable_states,
address_cohorts_durable_states: None,
address_counters,
date_data_vec,
utxo_cohorts_durable_states,
utxo_cohorts_durable_states: None,
})
}
@@ -74,13 +43,13 @@ impl States {
let _ = self.date_data_vec.reset();
self.utxo_cohorts_durable_states = UTXOCohortsDurableStates::default();
self.utxo_cohorts_durable_states = None;
// TODO: Check that they are ONLY computed in an `if include_addresses`
if include_addresses {
let _ = self.address_counters.reset();
self.address_cohorts_durable_states = AddressCohortsDurableStates::default();
self.address_cohorts_durable_states = None;
}
}