global: snapshot

This commit is contained in:
nym21
2026-03-05 16:11:25 +01:00
parent 6f2a87be4f
commit eedb8d22c1
61 changed files with 2035 additions and 2757 deletions

View File

@@ -1,5 +1,3 @@
use std::thread;
use brk_cohort::ByAddressType;
use brk_error::Result;
use brk_indexer::Indexer;
@@ -95,6 +93,15 @@ pub(crate) fn process_blocks(
let height_to_timestamp_collected = &cached_timestamps[start_usize..end_usize];
let height_to_price_collected = &cached_prices[start_usize..end_usize];
// Pre-compute day boundaries to avoid per-block division in the hot loop
let is_last_of_day: Vec<bool> = (start_usize..end_usize)
.map(|h| {
h == end_usize - 1
|| *cached_timestamps[h] / ONE_DAY_IN_SEC
!= *cached_timestamps[h + 1] / ONE_DAY_IN_SEC
})
.collect();
debug!("creating VecsReaders");
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
debug!("VecsReaders created");
@@ -246,14 +253,11 @@ pub(crate) fn process_blocks(
p2wsh: TypeIndex::from(first_p2wsh_vec[offset].to_usize()),
};
// Reset per-block values for all separate cohorts
reset_block_values(&mut vecs.utxo_cohorts, &mut vecs.address_cohorts);
// Reset per-block activity counts
activity_counts.reset();
// Collect output/input data using reusable iterators (16KB buffered reads)
// Must be done before thread::scope since iterators aren't Send
// Must be done before rayon::join since iterators aren't Send
let txoutdata_vec = txout_iters.collect_block_outputs(first_txoutindex, output_count);
let (input_values, input_prev_heights, input_outputtypes, input_typeindexes) =
@@ -263,55 +267,54 @@ pub(crate) fn process_blocks(
(&[][..], &[][..], &[][..], &[][..])
};
// Process outputs and inputs in parallel with tick-tock
let (outputs_result, inputs_result) = thread::scope(|scope| -> Result<_> {
// Tick-tock age transitions in background
scope.spawn(|| {
// Process outputs, inputs, and tick-tock in parallel via rayon::join
let (_, oi_result) = rayon::join(
|| {
vecs.utxo_cohorts
.tick_tock_next_block(chain_state, timestamp);
});
let outputs_handle = scope.spawn(|| {
// Process outputs (receive)
process_outputs(
txoutindex_to_txindex,
txoutdata_vec,
&first_addressindexes,
&cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
)
});
// Process inputs (send) - skip coinbase input
let inputs_result = if input_count > 1 {
process_inputs(
input_count - 1,
&txinindex_to_txindex[1..], // Skip coinbase
input_values,
input_outputtypes,
input_typeindexes,
input_prev_heights,
&first_addressindexes,
&cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
)?
} else {
InputsResult {
height_to_sent: Default::default(),
sent_data: Default::default(),
address_data: Default::default(),
txindex_vecs: Default::default(),
}
};
let outputs_result = outputs_handle.join().unwrap()?;
Ok((outputs_result, inputs_result))
})?;
},
|| -> Result<_> {
let (outputs_result, inputs_result) = rayon::join(
|| {
process_outputs(
txoutindex_to_txindex,
txoutdata_vec,
&first_addressindexes,
&cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
)
},
|| -> Result<_> {
if input_count > 1 {
process_inputs(
input_count - 1,
&txinindex_to_txindex[1..],
input_values,
input_outputtypes,
input_typeindexes,
input_prev_heights,
&first_addressindexes,
&cache,
&vr,
&vecs.any_address_indexes,
&vecs.addresses_data,
)
} else {
Ok(InputsResult {
height_to_sent: Default::default(),
sent_data: Default::default(),
address_data: Default::default(),
txindex_vecs: Default::default(),
})
}
},
);
Ok((outputs_result?, inputs_result?))
},
);
let (outputs_result, inputs_result) = oi_result?;
// Merge new address data into current cache
cache.merge_funded(outputs_result.address_data);
@@ -363,11 +366,20 @@ pub(crate) fn process_blocks(
}
// Process UTXO cohorts and Address cohorts in parallel
// - Main thread: UTXO cohorts receive/send
// - Spawned thread: Address cohorts process_received/process_sent
thread::scope(|scope| {
// Spawn address cohort processing in background thread
scope.spawn(|| {
let (_, addr_result) = rayon::join(
|| {
// UTXO cohorts receive/send
vecs.utxo_cohorts
.receive(transacted, height, timestamp, block_price);
if let Some(min_h) =
vecs.utxo_cohorts
.send(height_to_sent, chain_state, ctx.price_range_max)
{
min_supply_modified =
Some(min_supply_modified.map_or(min_h, |cur| cur.min(min_h)));
}
},
|| -> Result<()> {
let mut lookup = cache.as_lookup();
// Process received outputs (addresses receiving funds)
@@ -382,7 +394,6 @@ pub(crate) fn process_blocks(
);
// Process sent inputs (addresses sending funds)
// Uses separate price/timestamp vecs to avoid borrowing chain_state
process_sent(
inputs_result.sent_data,
&mut vecs.address_cohorts,
@@ -399,19 +410,9 @@ pub(crate) fn process_blocks(
timestamp,
&mut seen_senders,
)
.unwrap();
});
// Main thread: Update UTXO cohorts
vecs.utxo_cohorts
.receive(transacted, height, timestamp, block_price);
if let Some(min_h) =
vecs.utxo_cohorts
.send(height_to_sent, chain_state, ctx.price_range_max)
{
min_supply_modified = Some(min_supply_modified.map_or(min_h, |cur| cur.min(min_h)));
}
});
},
);
addr_result?;
// Push to height-indexed vectors
vecs.addr_count
@@ -424,9 +425,7 @@ pub(crate) fn process_blocks(
vecs.address_activity
.truncate_push_height(height, &activity_counts)?;
let h = height.to_usize();
let is_last_of_day = height == last_height
|| *cached_timestamps[h] / ONE_DAY_IN_SEC != *cached_timestamps[h + 1] / ONE_DAY_IN_SEC;
let is_last_of_day = is_last_of_day[offset];
let date_opt = is_last_of_day.then(|| Date::from(timestamp));
push_cohort_states(
@@ -434,11 +433,11 @@ pub(crate) fn process_blocks(
&mut vecs.address_cohorts,
height,
block_price,
date_opt.is_some(),
)?;
vecs.utxo_cohorts.truncate_push_aggregate_percentiles(
height,
block_price,
date_opt,
&vecs.states_path,
)?;
@@ -494,36 +493,42 @@ pub(crate) fn process_blocks(
Ok(())
}
/// Reset per-block values for all separate cohorts.
fn reset_block_values(utxo_cohorts: &mut UTXOCohorts, address_cohorts: &mut AddressCohorts) {
utxo_cohorts
.iter_separate_mut()
.for_each(|v| v.reset_single_iteration_values());
address_cohorts
.iter_separate_mut()
.for_each(|v| v.reset_single_iteration_values());
}
/// Push cohort states to height-indexed vectors.
/// Push cohort states to height-indexed vectors, then reset per-block values.
fn push_cohort_states(
utxo_cohorts: &mut UTXOCohorts,
address_cohorts: &mut AddressCohorts,
height: Height,
height_price: Cents,
is_day_boundary: bool,
) -> Result<()> {
let (r1, r2) = rayon::join(
|| {
utxo_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price)
})
utxo_cohorts
.par_iter_separate_mut()
.try_for_each(|v| -> Result<()> {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(
height,
height_price,
is_day_boundary,
)?;
v.reset_single_iteration_values();
Ok(())
})
},
|| {
address_cohorts.par_iter_separate_mut().try_for_each(|v| {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(height, height_price)
})
address_cohorts
.par_iter_separate_mut()
.try_for_each(|v| -> Result<()> {
v.truncate_push(height)?;
v.compute_then_truncate_push_unrealized_states(
height,
height_price,
is_day_boundary,
)?;
v.reset_single_iteration_values();
Ok(())
})
},
);
r1?;