global: reorg fixes + clients improved

This commit is contained in:
nym21
2026-01-28 23:35:51 +01:00
parent fecaf0f400
commit 6709ded66c
55 changed files with 4312 additions and 83 deletions

View File

@@ -58,9 +58,10 @@ macro_rules! define_any_address_indexes_vecs {
}
/// Get address index for a given type and typeindex.
/// Uses get_any_or_read_at_unwrap to check updated layer (needed after rollback).
pub fn get(&self, address_type: OutputType, typeindex: TypeIndex, reader: &Reader) -> AnyAddressIndex {
match address_type {
$(OutputType::$variant => self.$field.get_pushed_or_read_at_unwrap(typeindex.into(), reader),)*
$(OutputType::$variant => self.$field.get_any_or_read_at_unwrap(typeindex.into(), reader),)*
_ => unreachable!("Invalid address type: {:?}", address_type),
}
}

View File

@@ -112,16 +112,18 @@ pub fn load_uncached_address_data(
Some(match anyaddressindex.to_enum() {
AnyAddressDataIndexEnum::Loaded(loaded_index) => {
let reader = &vr.anyaddressindex_to_anyaddressdata.loaded;
// Use get_any_or_read_unwrap to check updated layer (needed after rollback)
let loaded_data = addresses_data
.loaded
.get_pushed_or_read_unwrap(loaded_index, reader);
.get_any_or_read_unwrap(loaded_index, reader);
WithAddressDataSource::FromLoaded(loaded_index, loaded_data)
}
AnyAddressDataIndexEnum::Empty(empty_index) => {
let reader = &vr.anyaddressindex_to_anyaddressdata.empty;
// Use get_any_or_read_unwrap to check updated layer (needed after rollback)
let empty_data = addresses_data
.empty
.get_pushed_or_read_unwrap(empty_index, reader);
.get_any_or_read_unwrap(empty_index, reader);
WithAddressDataSource::FromEmpty(empty_index, empty_data.into())
}
})

View File

@@ -2,6 +2,7 @@ use std::{cmp::Ordering, collections::BTreeSet};
use brk_error::Result;
use brk_types::Height;
use tracing::{debug, warn};
use vecdb::Stamp;
use super::super::{
@@ -44,27 +45,49 @@ pub fn recover_state(
// If rollbacks are inconsistent, start fresh
if consistent_height.is_zero() {
warn!("Rollback consistency check failed: inconsistent heights");
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
// Rollback can land at an earlier height (multi-block change file), which is fine.
// But if it lands AHEAD of target, that means rollback failed (missing change files).
if consistent_height > height {
warn!(
"Rollback failed: still at {} but target was {}, falling back to fresh start",
consistent_height, height
);
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
if consistent_height != height {
debug!(
"Rollback landed at {} instead of {}, will resume from there",
consistent_height, height
);
}
// Import UTXO cohort states - all must succeed
if !utxo_cohorts.import_separate_states(height) {
if !utxo_cohorts.import_separate_states(consistent_height) {
warn!("UTXO cohort state import failed at height {}", consistent_height);
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
// Import address cohort states - all must succeed
if !address_cohorts.import_separate_states(height) {
if !address_cohorts.import_separate_states(consistent_height) {
warn!("Address cohort state import failed at height {}", consistent_height);
return Ok(RecoveredState {
starting_height: Height::ZERO,
});
}
Ok(RecoveredState {
starting_height: height,
starting_height: consistent_height,
})
}
@@ -132,28 +155,38 @@ fn rollback_states(
// All rollbacks must succeed - any error means fresh start
let Ok(s) = chain_state_rollback else {
warn!("chain_state rollback failed: {:?}", chain_state_rollback);
return Height::ZERO;
};
heights.insert(Height::from(s).incremented());
let chain_height = Height::from(s).incremented();
debug!("chain_state rolled back to stamp {:?}, height {}", s, chain_height);
heights.insert(chain_height);
let Ok(stamps) = address_indexes_rollbacks else {
warn!("address_indexes rollback failed: {:?}", address_indexes_rollbacks);
return Height::ZERO;
};
for s in stamps {
heights.insert(Height::from(s).incremented());
for (i, s) in stamps.iter().enumerate() {
let h = Height::from(*s).incremented();
debug!("address_indexes[{}] rolled back to stamp {:?}, height {}", i, s, h);
heights.insert(h);
}
let Ok(stamps) = address_data_rollbacks else {
warn!("address_data rollback failed: {:?}", address_data_rollbacks);
return Height::ZERO;
};
for s in stamps {
heights.insert(Height::from(s).incremented());
for (i, s) in stamps.iter().enumerate() {
let h = Height::from(*s).incremented();
debug!("address_data[{}] rolled back to stamp {:?}, height {}", i, s, h);
heights.insert(h);
}
// All must agree on the same height
if heights.len() == 1 {
heights.pop_first().unwrap()
} else {
warn!("Rollback heights inconsistent: {:?}", heights);
Height::ZERO
}
}

View File

@@ -1,6 +1,6 @@
use std::ops::Bound;
use brk_types::{Dollars, Sats};
use brk_types::{CentsUnsigned, Dollars, Sats};
use vecdb::CheckedSub;
use super::price_to_amount::PriceToAmount;
@@ -11,6 +11,10 @@ pub struct UnrealizedState {
pub supply_in_loss: Sats,
pub unrealized_profit: Dollars,
pub unrealized_loss: Dollars,
/// Invested capital in profit: Σ(sats × price) where price <= spot
pub invested_capital_in_profit: Dollars,
/// Invested capital in loss: Σ(sats × price) where price > spot
pub invested_capital_in_loss: Dollars,
}
impl UnrealizedState {
@@ -19,6 +23,8 @@ impl UnrealizedState {
supply_in_loss: Sats::ZERO,
unrealized_profit: Dollars::NAN,
unrealized_loss: Dollars::NAN,
invested_capital_in_profit: Dollars::NAN,
invested_capital_in_loss: Dollars::NAN,
};
pub const ZERO: Self = Self {
@@ -26,6 +32,8 @@ impl UnrealizedState {
supply_in_loss: Sats::ZERO,
unrealized_profit: Dollars::ZERO,
unrealized_loss: Dollars::ZERO,
invested_capital_in_profit: Dollars::ZERO,
invested_capital_in_loss: Dollars::ZERO,
};
}
@@ -62,14 +70,17 @@ impl CachedUnrealizedState {
/// Update cached state when a receive happens.
/// Determines profit/loss classification relative to cached price.
pub fn on_receive(&mut self, purchase_price: Dollars, sats: Sats) {
let invested_capital = purchase_price * sats;
if purchase_price <= self.at_price {
self.state.supply_in_profit += sats;
self.state.invested_capital_in_profit += invested_capital;
if purchase_price < self.at_price {
let diff = self.at_price.checked_sub(purchase_price).unwrap();
self.state.unrealized_profit += diff * sats;
}
} else {
self.state.supply_in_loss += sats;
self.state.invested_capital_in_loss += invested_capital;
let diff = purchase_price.checked_sub(self.at_price).unwrap();
self.state.unrealized_loss += diff * sats;
}
@@ -77,9 +88,15 @@ impl CachedUnrealizedState {
/// Update cached state when a send happens from historical price.
pub fn on_send(&mut self, historical_price: Dollars, sats: Sats) {
let invested_capital = historical_price * sats;
if historical_price <= self.at_price {
// Was in profit
self.state.supply_in_profit -= sats;
self.state.invested_capital_in_profit = self
.state
.invested_capital_in_profit
.checked_sub(invested_capital)
.unwrap();
if historical_price < self.at_price {
let diff = self.at_price.checked_sub(historical_price).unwrap();
let profit_removed = diff * sats;
@@ -92,6 +109,11 @@ impl CachedUnrealizedState {
} else {
// Was in loss
self.state.supply_in_loss -= sats;
self.state.invested_capital_in_loss = self
.state
.invested_capital_in_loss
.checked_sub(invested_capital)
.unwrap();
let diff = historical_price.checked_sub(self.at_price).unwrap();
let loss_removed = diff * sats;
self.state.unrealized_loss = self
@@ -210,14 +232,17 @@ impl CachedUnrealizedState {
let mut state = UnrealizedState::ZERO;
for (price, &sats) in price_to_amount.iter() {
let invested_capital = price * sats;
if price <= current_price {
state.supply_in_profit += sats;
state.invested_capital_in_profit += invested_capital;
if price < current_price {
let diff = current_price.checked_sub(price).unwrap();
state.unrealized_profit += diff * sats;
}
} else {
state.supply_in_loss += sats;
state.invested_capital_in_loss += invested_capital;
let diff = price.checked_sub(current_price).unwrap();
state.unrealized_loss += diff * sats;
}