global: fixes

This commit is contained in:
nym21
2025-12-06 21:35:19 +01:00
parent f280b03cab
commit f23907768f
30 changed files with 439 additions and 295 deletions

View File

@@ -173,30 +173,77 @@ impl Vecs {
}
macro_rules! computed_h {
($name:expr, $source:expr, $opts:expr) => {
ComputedVecsFromHeight::forced_import(&db, $name, $source, version + v0, indexes, $opts)?
ComputedVecsFromHeight::forced_import(
&db,
$name,
$source,
version + v0,
indexes,
$opts,
)?
};
($name:expr, $source:expr, $v:expr, $opts:expr) => {
ComputedVecsFromHeight::forced_import(&db, $name, $source, version + $v, indexes, $opts)?
ComputedVecsFromHeight::forced_import(
&db,
$name,
$source,
version + $v,
indexes,
$opts,
)?
};
}
macro_rules! computed_di {
($name:expr, $opts:expr) => {
ComputedVecsFromDateIndex::forced_import(&db, $name, Source::Compute, version + v0, indexes, $opts)?
ComputedVecsFromDateIndex::forced_import(
&db,
$name,
Source::Compute,
version + v0,
indexes,
$opts,
)?
};
($name:expr, $v:expr, $opts:expr) => {
ComputedVecsFromDateIndex::forced_import(&db, $name, Source::Compute, version + $v, indexes, $opts)?
ComputedVecsFromDateIndex::forced_import(
&db,
$name,
Source::Compute,
version + $v,
indexes,
$opts,
)?
};
}
macro_rules! computed_tx {
($name:expr, $source:expr, $opts:expr) => {
ComputedVecsFromTxindex::forced_import(&db, $name, $source, version + v0, indexes, $opts)?
ComputedVecsFromTxindex::forced_import(
&db,
$name,
$source,
version + v0,
indexes,
$opts,
)?
};
}
let last = || VecBuilderOptions::default().add_last();
let sum = || VecBuilderOptions::default().add_sum();
let sum_cum = || VecBuilderOptions::default().add_sum().add_cumulative();
let stats = || VecBuilderOptions::default().add_average().add_minmax().add_percentiles();
let full_stats = || VecBuilderOptions::default().add_average().add_minmax().add_percentiles().add_sum().add_cumulative();
let stats = || {
VecBuilderOptions::default()
.add_average()
.add_minmax()
.add_percentiles()
};
let full_stats = || {
VecBuilderOptions::default()
.add_average()
.add_minmax()
.add_percentiles()
.add_sum()
.add_cumulative()
};
let txinindex_to_value: EagerVec<PcoVec<TxInIndex, Sats>> = eager!("value");
@@ -299,7 +346,10 @@ impl Vecs {
yearindex_to_block_count_target,
decadeindex_to_block_count_target,
height_to_interval: eager!("interval"),
timeindexes_to_timestamp: computed_di!("timestamp", VecBuilderOptions::default().add_first()),
timeindexes_to_timestamp: computed_di!(
"timestamp",
VecBuilderOptions::default().add_first()
),
indexes_to_block_interval: computed_h!("block_interval", Source::None, stats()),
indexes_to_block_count: computed_h!("block_count", Source::Compute, sum_cum()),
indexes_to_1w_block_count: computed_di!("1w_block_count", last()),
@@ -328,7 +378,7 @@ impl Vecs {
indexes_to_tx_v3: computed_h!("tx_v3", Source::Compute, sum_cum()),
indexes_to_sent: ComputedValueVecsFromHeight::forced_import(
&db,
"sent",
"sent_sum",
Source::Compute,
version + Version::ZERO,
VecBuilderOptions::default().add_sum(),
@@ -400,36 +450,120 @@ impl Vecs {
indexes_to_p2wpkh_count: computed_h!("p2wpkh_count", Source::Compute, full_stats()),
indexes_to_p2wsh_count: computed_h!("p2wsh_count", Source::Compute, full_stats()),
indexes_to_opreturn_count: computed_h!("opreturn_count", Source::Compute, full_stats()),
indexes_to_unknownoutput_count: computed_h!("unknownoutput_count", Source::Compute, full_stats()),
indexes_to_emptyoutput_count: computed_h!("emptyoutput_count", Source::Compute, full_stats()),
indexes_to_unknownoutput_count: computed_h!(
"unknownoutput_count",
Source::Compute,
full_stats()
),
indexes_to_emptyoutput_count: computed_h!(
"emptyoutput_count",
Source::Compute,
full_stats()
),
indexes_to_exact_utxo_count: computed_h!("exact_utxo_count", Source::Compute, last()),
indexes_to_subsidy_usd_1y_sma: compute_dollars
.then(|| ComputedVecsFromDateIndex::forced_import(&db, "subsidy_usd_1y_sma", Source::Compute, version + v0, indexes, last()))
.then(|| {
ComputedVecsFromDateIndex::forced_import(
&db,
"subsidy_usd_1y_sma",
Source::Compute,
version + v0,
indexes,
last(),
)
})
.transpose()?,
indexes_to_puell_multiple: compute_dollars
.then(|| ComputedVecsFromDateIndex::forced_import(&db, "puell_multiple", Source::Compute, version + v0, indexes, last()))
.then(|| {
ComputedVecsFromDateIndex::forced_import(
&db,
"puell_multiple",
Source::Compute,
version + v0,
indexes,
last(),
)
})
.transpose()?,
indexes_to_hash_rate: computed_h!("hash_rate", Source::Compute, v5, last()),
indexes_to_hash_rate_1w_sma: computed_di!("hash_rate_1w_sma", last()),
indexes_to_hash_rate_1m_sma: computed_di!("hash_rate_1m_sma", last()),
indexes_to_hash_rate_2m_sma: computed_di!("hash_rate_2m_sma", last()),
indexes_to_hash_rate_1y_sma: computed_di!("hash_rate_1y_sma", last()),
indexes_to_difficulty_as_hash: computed_h!("difficulty_as_hash", Source::Compute, last()),
indexes_to_difficulty_adjustment: computed_h!("difficulty_adjustment", Source::Compute, sum()),
indexes_to_blocks_before_next_difficulty_adjustment: computed_h!("blocks_before_next_difficulty_adjustment", Source::Compute, v2, last()),
indexes_to_days_before_next_difficulty_adjustment: computed_h!("days_before_next_difficulty_adjustment", Source::Compute, v2, last()),
indexes_to_blocks_before_next_halving: computed_h!("blocks_before_next_halving", Source::Compute, v2, last()),
indexes_to_days_before_next_halving: computed_h!("days_before_next_halving", Source::Compute, v2, last()),
indexes_to_difficulty_as_hash: computed_h!(
"difficulty_as_hash",
Source::Compute,
last()
),
indexes_to_difficulty_adjustment: computed_h!(
"difficulty_adjustment",
Source::Compute,
sum()
),
indexes_to_blocks_before_next_difficulty_adjustment: computed_h!(
"blocks_before_next_difficulty_adjustment",
Source::Compute,
v2,
last()
),
indexes_to_days_before_next_difficulty_adjustment: computed_h!(
"days_before_next_difficulty_adjustment",
Source::Compute,
v2,
last()
),
indexes_to_blocks_before_next_halving: computed_h!(
"blocks_before_next_halving",
Source::Compute,
v2,
last()
),
indexes_to_days_before_next_halving: computed_h!(
"days_before_next_halving",
Source::Compute,
v2,
last()
),
indexes_to_hash_price_ths: computed_h!("hash_price_ths", Source::Compute, v4, last()),
indexes_to_hash_price_phs: computed_h!("hash_price_phs", Source::Compute, v4, last()),
indexes_to_hash_value_ths: computed_h!("hash_value_ths", Source::Compute, v4, last()),
indexes_to_hash_value_phs: computed_h!("hash_value_phs", Source::Compute, v4, last()),
indexes_to_hash_price_ths_min: computed_h!("hash_price_ths_min", Source::Compute, v4, last()),
indexes_to_hash_price_phs_min: computed_h!("hash_price_phs_min", Source::Compute, v4, last()),
indexes_to_hash_price_rebound: computed_h!("hash_price_rebound", Source::Compute, v4, last()),
indexes_to_hash_value_ths_min: computed_h!("hash_value_ths_min", Source::Compute, v4, last()),
indexes_to_hash_value_phs_min: computed_h!("hash_value_phs_min", Source::Compute, v4, last()),
indexes_to_hash_value_rebound: computed_h!("hash_value_rebound", Source::Compute, v4, last()),
indexes_to_hash_price_ths_min: computed_h!(
"hash_price_ths_min",
Source::Compute,
v4,
last()
),
indexes_to_hash_price_phs_min: computed_h!(
"hash_price_phs_min",
Source::Compute,
v4,
last()
),
indexes_to_hash_price_rebound: computed_h!(
"hash_price_rebound",
Source::Compute,
v4,
last()
),
indexes_to_hash_value_ths_min: computed_h!(
"hash_value_ths_min",
Source::Compute,
v4,
last()
),
indexes_to_hash_value_phs_min: computed_h!(
"hash_value_phs_min",
Source::Compute,
v4,
last()
),
indexes_to_hash_value_rebound: computed_h!(
"hash_value_rebound",
Source::Compute,
v4,
last()
),
indexes_to_inflation_rate: computed_di!("inflation_rate", last()),
indexes_to_annualized_volume: computed_di!("annualized_volume", last()),
indexes_to_annualized_volume_btc: computed_di!("annualized_volume_btc", last()),

View File

@@ -94,7 +94,7 @@ impl Vecs {
prev_timestamp = Some(v);
Ok(())
})?;
self.height_to_price_ohlc_in_cents.safe_flush(exit)?;
self.height_to_price_ohlc_in_cents.safe_write(exit)?;
let index = starting_indexes
.dateindex
@@ -130,7 +130,7 @@ impl Vecs {
Ok(())
})?;
self.dateindex_to_price_ohlc_in_cents.safe_flush(exit)?;
self.dateindex_to_price_ohlc_in_cents.safe_write(exit)?;
Ok(())
}

View File

@@ -238,7 +238,7 @@ where
Ok(())
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}
@@ -335,7 +335,7 @@ where
Ok(())
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}
@@ -460,7 +460,7 @@ where
Ok(())
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}
@@ -527,42 +527,42 @@ where
self.cumulative.u()
}
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
if let Some(first) = self.first.as_mut() {
first.safe_flush(exit)?;
first.safe_write(exit)?;
}
if let Some(last) = self.last.as_mut() {
last.safe_flush(exit)?;
last.safe_write(exit)?;
}
if let Some(min) = self.min.as_mut() {
min.safe_flush(exit)?;
min.safe_write(exit)?;
}
if let Some(max) = self.max.as_mut() {
max.safe_flush(exit)?;
max.safe_write(exit)?;
}
if let Some(median) = self.median.as_mut() {
median.safe_flush(exit)?;
median.safe_write(exit)?;
}
if let Some(average) = self.average.as_mut() {
average.safe_flush(exit)?;
average.safe_write(exit)?;
}
if let Some(sum) = self.sum.as_mut() {
sum.safe_flush(exit)?;
sum.safe_write(exit)?;
}
if let Some(cumulative) = self.cumulative.as_mut() {
cumulative.safe_flush(exit)?;
cumulative.safe_write(exit)?;
}
if let Some(pct90) = self.pct90.as_mut() {
pct90.safe_flush(exit)?;
pct90.safe_write(exit)?;
}
if let Some(pct75) = self.pct75.as_mut() {
pct75.safe_flush(exit)?;
pct75.safe_write(exit)?;
}
if let Some(pct25) = self.pct25.as_mut() {
pct25.safe_flush(exit)?;
pct25.safe_write(exit)?;
}
if let Some(pct10) = self.pct10.as_mut() {
pct10.safe_flush(exit)?;
pct10.safe_write(exit)?;
}
Ok(())

View File

@@ -330,7 +330,7 @@ impl ComputedVecsFromTxindex<Bitcoin> {
Ok(())
})?;
self.height.safe_flush(exit)?;
self.height.safe_write(exit)?;
self.compute_after_height(indexes, starting_indexes, exit)
}
@@ -455,7 +455,7 @@ impl ComputedVecsFromTxindex<Dollars> {
Ok(())
})?;
self.height.safe_flush(exit)?;
self.height.safe_write(exit)?;
self.compute_after_height(indexes, starting_indexes, exit)
}

View File

@@ -95,6 +95,15 @@ impl Flushable for PricePercentiles {
}
Ok(())
}
fn safe_write(&mut self, exit: &Exit) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {
if let Some(height_vec) = vec.height.as_mut() {
height_vec.safe_write(exit)?;
}
}
Ok(())
}
}
impl Traversable for PricePercentiles {

View File

@@ -57,12 +57,14 @@ impl Computer {
fetcher: Option<Fetcher>,
) -> Result<Self> {
info!("Importing computer...");
let import_start = Instant::now();
let computed_path = outputs_path.join("computed");
const STACK_SIZE: usize = 512 * 1024 * 1024;
let big_thread = || thread::Builder::new().stack_size(STACK_SIZE);
let i = Instant::now();
let (indexes, fetched, blks) = thread::scope(|s| -> Result<_> {
let fetched_handle = fetcher
.map(|fetcher| {
@@ -81,7 +83,9 @@ impl Computer {
Ok((indexes, fetched, blks))
})?;
info!("Imported indexes/fetched/blks in {:?}", i.elapsed());
let i = Instant::now();
let (price, constants, market) = thread::scope(|s| -> Result<_> {
let constants_handle = big_thread().spawn_scoped(s, || {
constants::Vecs::forced_import(&computed_path, VERSION, &indexes)
@@ -100,7 +104,9 @@ impl Computer {
Ok((price, constants, market))
})?;
info!("Imported price/constants/market in {:?}", i.elapsed());
let i = Instant::now();
let (chain, pools, cointime) = thread::scope(|s| -> Result<_> {
let chain_handle = big_thread().spawn_scoped(s, || {
chain::Vecs::forced_import(
@@ -124,10 +130,15 @@ impl Computer {
Ok((chain, pools, cointime))
})?;
info!("Imported chain/pools/cointime in {:?}", i.elapsed());
// Threads inside
let i = Instant::now();
let stateful =
stateful::Vecs::forced_import(&computed_path, VERSION, &indexes, price.as_ref())?;
info!("Imported stateful in {:?}", i.elapsed());
info!("Total import time: {:?}", import_start.elapsed());
Ok(Self {
constants,
@@ -150,6 +161,7 @@ impl Computer {
reader: &Reader,
exit: &Exit,
) -> Result<()> {
let compute_start = Instant::now();
info!("Computing indexes...");
let i = Instant::now();
let mut starting_indexes = self.indexes.compute(indexer, starting_indexes, exit)?;
@@ -163,12 +175,9 @@ impl Computer {
info!("Computing prices...");
let i = Instant::now();
self.price.um().compute(
&self.indexes,
&starting_indexes,
fetched,
exit,
)?;
self.price
.um()
.compute(&self.indexes, &starting_indexes, fetched, exit)?;
info!("Computed prices in {:?}", i.elapsed());
}
@@ -231,6 +240,7 @@ impl Computer {
info!("Computed pools in {:?}", i.elapsed());
info!("Computing stateful...");
let i = Instant::now();
self.stateful.compute(
indexer,
&self.indexes,
@@ -239,8 +249,10 @@ impl Computer {
&mut starting_indexes,
exit,
)?;
info!("Computed stateful in {:?}", i.elapsed());
info!("Computing cointime...");
let i = Instant::now();
self.cointime.compute(
&self.indexes,
&starting_indexes,
@@ -249,7 +261,9 @@ impl Computer {
&self.stateful,
exit,
)?;
info!("Computed cointime in {:?}", i.elapsed());
info!("Total compute time: {:?}", compute_start.elapsed());
Ok(())
}
}

View File

@@ -205,7 +205,7 @@ impl Vecs {
Ok(())
})?;
self.height_to_pool.safe_flush(exit)?;
self.height_to_pool.safe_write(exit)?;
Ok(())
}
}

View File

@@ -154,7 +154,7 @@ impl DynCohortVecs for Vecs {
}
fn safe_flush_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.height_to_addr_count.safe_flush(exit)?;
self.height_to_addr_count.safe_write(exit)?;
self.inner
.safe_flush_stateful_vecs(height, exit, &mut self.state.um().inner)

View File

@@ -1,4 +1,4 @@
use brk_error::Result;
use brk_error::{Error, Result};
use brk_traversable::Traversable;
use brk_types::{
AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData,
@@ -93,6 +93,48 @@ impl AnyAddressIndexesVecs {
}
}
pub fn get_anyaddressindex_once(
&self,
address_type: OutputType,
typeindex: TypeIndex,
) -> Result<AnyAddressIndex> {
match address_type {
OutputType::P2PK33 => self
.p2pk33
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2PK65 => self
.p2pk65
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2PKH => self
.p2pkh
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2SH => self
.p2sh
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2TR => self
.p2tr
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2WPKH => self
.p2wpkh
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2WSH => self
.p2wsh
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
OutputType::P2A => self
.p2a
.read_at_once(typeindex.into())
.map_err(|e| e.into()),
_ => Err(Error::UnsupportedType(address_type.to_string())),
}
}
pub fn update_or_push(
&mut self,
address_type: OutputType,

View File

@@ -135,40 +135,40 @@ impl Vecs {
exit: &Exit,
state: &mut CohortState,
) -> Result<()> {
self.height_to_supply.safe_flush(exit)?;
self.height_to_utxo_count.safe_flush(exit)?;
self.height_to_sent.safe_flush(exit)?;
self.height_to_satdays_destroyed.safe_flush(exit)?;
self.height_to_satblocks_destroyed.safe_flush(exit)?;
self.height_to_supply.safe_write(exit)?;
self.height_to_utxo_count.safe_write(exit)?;
self.height_to_sent.safe_write(exit)?;
self.height_to_satdays_destroyed.safe_write(exit)?;
self.height_to_satblocks_destroyed.safe_write(exit)?;
if let Some(height_to_realized_cap) = self.height_to_realized_cap.as_mut() {
height_to_realized_cap.safe_flush(exit)?;
self.height_to_realized_profit.um().safe_flush(exit)?;
self.height_to_realized_loss.um().safe_flush(exit)?;
self.height_to_value_created.um().safe_flush(exit)?;
self.height_to_value_destroyed.um().safe_flush(exit)?;
self.height_to_supply_in_profit.um().safe_flush(exit)?;
self.height_to_supply_in_loss.um().safe_flush(exit)?;
self.height_to_unrealized_profit.um().safe_flush(exit)?;
self.height_to_unrealized_loss.um().safe_flush(exit)?;
self.dateindex_to_supply_in_profit.um().safe_flush(exit)?;
self.dateindex_to_supply_in_loss.um().safe_flush(exit)?;
self.dateindex_to_unrealized_profit.um().safe_flush(exit)?;
self.dateindex_to_unrealized_loss.um().safe_flush(exit)?;
self.height_to_min_price_paid.um().safe_flush(exit)?;
self.height_to_max_price_paid.um().safe_flush(exit)?;
height_to_realized_cap.safe_write(exit)?;
self.height_to_realized_profit.um().safe_write(exit)?;
self.height_to_realized_loss.um().safe_write(exit)?;
self.height_to_value_created.um().safe_write(exit)?;
self.height_to_value_destroyed.um().safe_write(exit)?;
self.height_to_supply_in_profit.um().safe_write(exit)?;
self.height_to_supply_in_loss.um().safe_write(exit)?;
self.height_to_unrealized_profit.um().safe_write(exit)?;
self.height_to_unrealized_loss.um().safe_write(exit)?;
self.dateindex_to_supply_in_profit.um().safe_write(exit)?;
self.dateindex_to_supply_in_loss.um().safe_write(exit)?;
self.dateindex_to_unrealized_profit.um().safe_write(exit)?;
self.dateindex_to_unrealized_loss.um().safe_write(exit)?;
self.height_to_min_price_paid.um().safe_write(exit)?;
self.height_to_max_price_paid.um().safe_write(exit)?;
if self.height_to_adjusted_value_created.is_some() {
self.height_to_adjusted_value_created
.um()
.safe_flush(exit)?;
.safe_write(exit)?;
self.height_to_adjusted_value_destroyed
.um()
.safe_flush(exit)?;
.safe_write(exit)?;
}
// Uses Flushable trait - Option<T> impl handles None case
self.price_percentiles.safe_flush(exit)?;
self.price_percentiles.safe_write(exit)?;
}
state.commit(height)?;

View File

@@ -14,6 +14,9 @@ use vecdb::Exit;
pub trait Flushable {
/// Safely flush data to disk.
fn safe_flush(&mut self, exit: &Exit) -> Result<()>;
/// Write to mmap without fsync. Data visible to readers immediately but not durable.
fn safe_write(&mut self, exit: &Exit) -> Result<()>;
}
/// Trait for stateful components that track data indexed by height.
@@ -42,6 +45,13 @@ impl<T: Flushable> Flushable for Option<T> {
}
Ok(())
}
fn safe_write(&mut self, exit: &Exit) -> Result<()> {
if let Some(inner) = self.as_mut() {
inner.safe_write(exit)?;
}
Ok(())
}
}
/// Blanket implementation for Option<T> where T: HeightFlushable

View File

@@ -1473,8 +1473,8 @@ impl Vecs {
self.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
self.address_cohorts
.safe_flush_stateful_vecs(height, exit)?;
self.height_to_unspendable_supply.safe_flush(exit)?;
self.height_to_opreturn_supply.safe_flush(exit)?;
self.height_to_unspendable_supply.safe_write(exit)?;
self.height_to_opreturn_supply.safe_write(exit)?;
self.addresstype_to_height_to_addr_count
.values_mut()
.try_for_each(|v| v.safe_flush(exit))?;

View File

@@ -617,7 +617,7 @@ impl Vecs {
// Using traits ensures we can't forget to flush any field
self.0.par_iter_aggregate_mut().try_for_each(|v| {
v.price_to_amount.flush_at_height(height, exit)?;
v.inner.price_percentiles.safe_flush(exit)?;
v.inner.price_percentiles.safe_write(exit)?;
Ok(())
})
}

View File

@@ -74,7 +74,7 @@ impl ComputeDCAStackViaLen for EagerVec<PcoVec<DateIndex, Sats>> {
self.truncate_push_at(i, stack)
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}
@@ -118,7 +118,7 @@ impl ComputeDCAStackViaLen for EagerVec<PcoVec<DateIndex, Sats>> {
self.truncate_push_at(i, stack)
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}
@@ -176,7 +176,7 @@ impl ComputeDCAAveragePriceViaLen for EagerVec<PcoVec<DateIndex, Dollars>> {
self.truncate_push_at(i, avg_price)
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}
@@ -208,7 +208,7 @@ impl ComputeDCAAveragePriceViaLen for EagerVec<PcoVec<DateIndex, Dollars>> {
self.truncate_push_at(i, avg_price)
})?;
self.safe_flush(exit)?;
self.safe_write(exit)?;
Ok(())
}