computer: snapshot

This commit is contained in:
nym21
2026-02-27 10:54:36 +01:00
parent 72c17096ea
commit c75421f46e
44 changed files with 1079 additions and 722 deletions

View File

@@ -171,7 +171,7 @@ impl Vecs {
_30d: &self.height_1m_ago,
_1y: &self.height_1y_ago,
};
self.block_count.rolling.compute_rolling_sum(
self.block_count.sum.compute_rolling_sum(
starting_indexes.height,
&ws,
&self.block_count.height,

View File

@@ -128,7 +128,7 @@ pub(crate) fn process_blocks(
debug!("txindex_to_height RangeMap built");
// Create reusable iterators for sequential txout/txin reads (16KB buffered)
let txout_iters = TxOutReaders::new(indexer);
let mut txout_iters = TxOutReaders::new(indexer);
let mut txin_iters = TxInReaders::new(indexer, inputs, &mut txindex_to_height);
// Pre-collect first address indexes per type for the block range

View File

@@ -21,32 +21,40 @@ pub struct TxOutData {
pub typeindex: TypeIndex,
}
/// Readers for txout vectors. Uses collect_range for bulk reads.
/// Readers for txout vectors. Reuses internal buffers across blocks.
pub struct TxOutReaders<'a> {
indexer: &'a Indexer,
values_buf: Vec<Sats>,
outputtypes_buf: Vec<OutputType>,
typeindexes_buf: Vec<TypeIndex>,
}
impl<'a> TxOutReaders<'a> {
pub(crate) fn new(indexer: &'a Indexer) -> Self {
Self { indexer }
Self {
indexer,
values_buf: Vec::new(),
outputtypes_buf: Vec::new(),
typeindexes_buf: Vec::new(),
}
}
/// Collect output data for a block range using bulk reads.
/// Collect output data for a block range using bulk reads with buffer reuse.
pub(crate) fn collect_block_outputs(
&self,
&mut self,
first_txoutindex: usize,
output_count: usize,
) -> Vec<TxOutData> {
let end = first_txoutindex + output_count;
let values: Vec<Sats> = self.indexer.vecs.outputs.value.collect_range_at(first_txoutindex, end);
let outputtypes: Vec<OutputType> = self.indexer.vecs.outputs.outputtype.collect_range_at(first_txoutindex, end);
let typeindexes: Vec<TypeIndex> = self.indexer.vecs.outputs.typeindex.collect_range_at(first_txoutindex, end);
self.indexer.vecs.outputs.value.collect_range_into_at(first_txoutindex, end, &mut self.values_buf);
self.indexer.vecs.outputs.outputtype.collect_range_into_at(first_txoutindex, end, &mut self.outputtypes_buf);
self.indexer.vecs.outputs.typeindex.collect_range_into_at(first_txoutindex, end, &mut self.typeindexes_buf);
values
.into_iter()
.zip(outputtypes)
.zip(typeindexes)
.map(|((value, outputtype), typeindex)| TxOutData {
self.values_buf
.iter()
.zip(&self.outputtypes_buf)
.zip(&self.typeindexes_buf)
.map(|((&value, &outputtype), &typeindex)| TxOutData {
value,
outputtype,
typeindex,
@@ -55,11 +63,12 @@ impl<'a> TxOutReaders<'a> {
}
}
/// Readers for txin vectors. Uses collect_range for bulk reads.
/// Readers for txin vectors. Reuses outpoint buffer across blocks.
pub struct TxInReaders<'a> {
indexer: &'a Indexer,
txins: &'a inputs::Vecs,
txindex_to_height: &'a mut RangeMap<TxIndex, Height>,
outpoints_buf: Vec<OutPoint>,
}
impl<'a> TxInReaders<'a> {
@@ -72,11 +81,12 @@ impl<'a> TxInReaders<'a> {
indexer,
txins,
txindex_to_height,
outpoints_buf: Vec::new(),
}
}
/// Collect input data for a block range using bulk reads.
/// Computes prev_height on-the-fly from outpoint using RangeMap lookup.
/// Outpoint buffer is reused across blocks; returned vecs are fresh (caller-owned).
pub(crate) fn collect_block_inputs(
&mut self,
first_txinindex: usize,
@@ -85,11 +95,11 @@ impl<'a> TxInReaders<'a> {
) -> (Vec<Sats>, Vec<Height>, Vec<OutputType>, Vec<TypeIndex>) {
let end = first_txinindex + input_count;
let values: Vec<Sats> = self.txins.spent.value.collect_range_at(first_txinindex, end);
let outpoints: Vec<OutPoint> = self.indexer.vecs.inputs.outpoint.collect_range_at(first_txinindex, end);
self.indexer.vecs.inputs.outpoint.collect_range_into_at(first_txinindex, end, &mut self.outpoints_buf);
let outputtypes: Vec<OutputType> = self.indexer.vecs.inputs.outputtype.collect_range_at(first_txinindex, end);
let typeindexes: Vec<TypeIndex> = self.indexer.vecs.inputs.typeindex.collect_range_at(first_txinindex, end);
let prev_heights: Vec<Height> = outpoints
let prev_heights: Vec<Height> = self.outpoints_buf
.iter()
.map(|outpoint| {
if outpoint.is_coinbase() {

View File

@@ -11,8 +11,6 @@ use brk_types::{
use rustc_hash::FxHashMap;
use vecdb::Bytes;
use crate::utils::OptionExt;
use super::{CachedUnrealizedState, Percentiles, UnrealizedState};
/// Type alias for the price-to-sats map used in cost basis data.
@@ -97,17 +95,17 @@ impl CostBasisData {
pub(crate) fn iter(&self) -> impl Iterator<Item = (CentsCompact, &Sats)> {
self.assert_pending_empty();
self.state.u().base.map.iter().map(|(&k, v)| (k, v))
self.state.as_ref().unwrap().base.map.iter().map(|(&k, v)| (k, v))
}
pub(crate) fn is_empty(&self) -> bool {
self.pending.is_empty() && self.state.u().base.map.is_empty()
self.pending.is_empty() && self.state.as_ref().unwrap().base.map.is_empty()
}
pub(crate) fn first_key_value(&self) -> Option<(CentsCompact, &Sats)> {
self.assert_pending_empty();
self.state
.u()
.as_ref().unwrap()
.base
.map
.first_key_value()
@@ -117,7 +115,7 @@ impl CostBasisData {
pub(crate) fn last_key_value(&self) -> Option<(CentsCompact, &Sats)> {
self.assert_pending_empty();
self.state
.u()
.as_ref().unwrap()
.base
.map
.last_key_value()
@@ -127,13 +125,13 @@ impl CostBasisData {
/// Get the exact cap_raw value (not recomputed from map).
pub(crate) fn cap_raw(&self) -> CentsSats {
self.assert_pending_empty();
self.state.u().cap_raw
self.state.as_ref().unwrap().cap_raw
}
/// Get the exact investor_cap_raw value (not recomputed from map).
pub(crate) fn investor_cap_raw(&self) -> CentsSquaredSats {
self.assert_pending_empty();
self.state.u().investor_cap_raw
self.state.as_ref().unwrap().investor_cap_raw
}
/// Increment with pre-computed typed values.
@@ -181,7 +179,7 @@ impl CostBasisData {
self.percentiles_dirty = true;
}
for (cents, (inc, dec)) in self.pending.drain() {
let entry = self.state.um().base.map.entry(cents).or_default();
let entry = self.state.as_mut().unwrap().base.map.entry(cents).or_default();
*entry += inc;
if *entry < dec {
panic!(
@@ -198,12 +196,12 @@ impl CostBasisData {
}
*entry -= dec;
if *entry == Sats::ZERO {
self.state.um().base.map.remove(&cents);
self.state.as_mut().unwrap().base.map.remove(&cents);
}
}
// Apply raw values
let state = self.state.um();
let state = self.state.as_mut().unwrap();
state.cap_raw += self.pending_raw.cap_inc;
// Check for underflow before subtracting
@@ -271,7 +269,7 @@ impl CostBasisData {
);
}
let map = &self.state.u().base.map;
let map = &self.state.as_ref().unwrap().base.map;
let date_state =
date_price.map(|p| CachedUnrealizedState::compute_full_standalone(p.into(), map));
@@ -336,7 +334,7 @@ impl CostBasisData {
}
}
fs::write(self.path_state(height), self.state.u().serialize()?)?;
fs::write(self.path_state(height), self.state.as_ref().unwrap().serialize()?)?;
Ok(())
}

View File

@@ -48,19 +48,18 @@ impl Vecs {
while batch_start < target {
let batch_end = (batch_start + BATCH_SIZE).min(target);
let outpoints = indexer.vecs.inputs.outpoint.collect_range_at(batch_start, batch_end);
entries.clear();
for (j, outpoint) in outpoints.into_iter().enumerate() {
let txinindex = TxInIndex::from(batch_start + j);
let mut j = 0usize;
indexer.vecs.inputs.outpoint.for_each_range_at(batch_start, batch_end, |outpoint| {
entries.push(Entry {
txinindex,
txinindex: TxInIndex::from(batch_start + j),
txindex: outpoint.txindex(),
vout: outpoint.vout(),
txoutindex: TxOutIndex::COINBASE,
value: Sats::MAX,
});
}
j += 1;
});
// Coinbase entries (txindex MAX) sorted to end
entries.sort_unstable_by_key(|e| e.txindex);

View File

@@ -11,7 +11,7 @@ use vecdb::{
VecValue,
};
use crate::utils::get_percentile;
use brk_types::get_percentile;
use super::ComputedVecValue;
@@ -358,6 +358,7 @@ where
let window_starts_batch: Vec<I> = window_starts.collect_range_at(start, fi_len);
let zero = T::from(0_usize);
let mut values: Vec<T> = Vec::new();
first_indexes_batch
.iter()
@@ -389,8 +390,7 @@ where
vec.truncate_push_at(idx, zero)?;
}
} else {
let mut values: Vec<T> =
source.collect_range_at(range_start_usize, range_end_usize);
source.collect_range_into_at(range_start_usize, range_end_usize, &mut values);
// Compute sum before sorting
let len = values.len();

View File

@@ -15,3 +15,31 @@ pub struct DistributionStats<A, B = A, C = A, D = A, E = A, F = A, G = A, H = A>
pub p75: G,
pub p90: H,
}
impl<A> DistributionStats<A> {
    /// Apply a fallible operation to each of the 8 fields.
    ///
    /// Fields are visited in declaration order (average, min, max, p10, p25,
    /// median, p75, p90); the first error short-circuits the rest.
    pub fn try_for_each_mut(&mut self, mut f: impl FnMut(&mut A) -> brk_error::Result<()>) -> brk_error::Result<()> {
        let Self { average, min, max, p10, p25, median, p75, p90 } = self;
        for field in [average, min, max, p10, p25, median, p75, p90] {
            f(field)?;
        }
        Ok(())
    }

    /// Get minimum value by applying a function to each field.
    ///
    /// `f` is evaluated once per field, in declaration order.
    pub fn min_by(&self, mut f: impl FnMut(&A) -> usize) -> usize {
        let Self { average, min, max, p10, p25, median, p75, p90 } = self;
        [min, max, p10, p25, median, p75, p90]
            .into_iter()
            .fold(f(average), |acc, field| acc.min(f(field)))
    }
}

View File

@@ -6,6 +6,7 @@ mod indexes;
mod lazy_eager_indexes;
mod multi;
mod single;
pub(crate) mod sliding_window;
mod traits;
mod windows;

View File

@@ -24,7 +24,7 @@ where
{
pub height: M::Stored<EagerVec<PcoVec<Height, T>>>,
pub cumulative: ComputedFromHeightLast<T, M>,
pub rolling: RollingWindows<T, M>,
pub sum: RollingWindows<T, M>,
}
const VERSION: Version = Version::ZERO;
@@ -49,7 +49,7 @@ where
Ok(Self {
height,
cumulative,
rolling,
sum: rolling,
})
}
@@ -68,7 +68,7 @@ where
self.cumulative
.height
.compute_cumulative(max_from, &self.height, exit)?;
self.rolling
self.sum
.compute_rolling_sum(max_from, windows, &self.height, exit)?;
Ok(())
}

View File

@@ -6,8 +6,8 @@ use vecdb::{AnyStoredVec, AnyVec, Database, EagerVec, Exit, PcoVec, ReadableVec,
use crate::{
ComputeIndexes, blocks, indexes,
internal::{ComputedFromHeightStdDevExtended, Price},
utils::get_percentile,
};
use brk_types::get_percentile;
use super::super::ComputedFromHeightLast;

View File

@@ -7,7 +7,7 @@ use brk_types::{
};
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use vecdb::{LazyAggVec, ReadableBoxedVec, ReadableCloneableVec};
use vecdb::{LazyAggVec, ReadOnlyClone, ReadableBoxedVec, ReadableCloneableVec};
use crate::{
indexes, indexes_from,
@@ -41,6 +41,17 @@ pub struct ComputedHeightDerivedLast<T>(
where
T: ComputedVecValue + PartialOrd + JsonSchema;
/// Already read-only (no StorageMode); cloning is sufficient.
impl<T> ReadOnlyClone for ComputedHeightDerivedLast<T>
where
    T: ComputedVecValue + PartialOrd + JsonSchema,
{
    // The read-only view is the type itself: there is no StorageMode
    // parameter to strip, so a plain clone already yields a read-only copy.
    type ReadOnly = Self;
    fn read_only_clone(&self) -> Self {
        self.clone()
    }
}
const VERSION: Version = Version::ZERO;
impl<T> ComputedHeightDerivedLast<T>

View File

@@ -9,7 +9,9 @@ use brk_types::{
};
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use vecdb::{LazyVecFrom1, ReadableBoxedVec, ReadableCloneableVec, UnaryTransform, VecIndex, VecValue};
use vecdb::{
LazyVecFrom1, ReadableBoxedVec, ReadableCloneableVec, UnaryTransform, VecIndex, VecValue,
};
use crate::{
indexes, indexes_from,
@@ -108,7 +110,8 @@ where
where
S1T: NumericValue,
{
let derived = ComputedHeightDerivedLast::forced_import(name, height_source, version, indexes);
let derived =
ComputedHeightDerivedLast::forced_import(name, height_source, version, indexes);
Self::from_derived_computed::<F>(name, version, &derived)
}

View File

@@ -66,62 +66,33 @@ where
T: Copy + Ord + From<f64> + Default,
f64: From<T>,
{
// Single pass per window: all 8 stats extracted from one sorted vec
compute_rolling_distribution_from_starts(
max_from,
windows._24h,
source,
&mut self.0.average._24h.height,
&mut self.0.min._24h.height,
&mut self.0.max._24h.height,
&mut self.0.p10._24h.height,
&mut self.0.p25._24h.height,
&mut self.0.median._24h.height,
&mut self.0.p75._24h.height,
&mut self.0.p90._24h.height,
exit,
max_from, windows._24h, source,
&mut self.0.average._24h.height, &mut self.0.min._24h.height,
&mut self.0.max._24h.height, &mut self.0.p10._24h.height,
&mut self.0.p25._24h.height, &mut self.0.median._24h.height,
&mut self.0.p75._24h.height, &mut self.0.p90._24h.height, exit,
)?;
compute_rolling_distribution_from_starts(
max_from,
windows._7d,
source,
&mut self.0.average._7d.height,
&mut self.0.min._7d.height,
&mut self.0.max._7d.height,
&mut self.0.p10._7d.height,
&mut self.0.p25._7d.height,
&mut self.0.median._7d.height,
&mut self.0.p75._7d.height,
&mut self.0.p90._7d.height,
exit,
max_from, windows._7d, source,
&mut self.0.average._7d.height, &mut self.0.min._7d.height,
&mut self.0.max._7d.height, &mut self.0.p10._7d.height,
&mut self.0.p25._7d.height, &mut self.0.median._7d.height,
&mut self.0.p75._7d.height, &mut self.0.p90._7d.height, exit,
)?;
compute_rolling_distribution_from_starts(
max_from,
windows._30d,
source,
&mut self.0.average._30d.height,
&mut self.0.min._30d.height,
&mut self.0.max._30d.height,
&mut self.0.p10._30d.height,
&mut self.0.p25._30d.height,
&mut self.0.median._30d.height,
&mut self.0.p75._30d.height,
&mut self.0.p90._30d.height,
exit,
max_from, windows._30d, source,
&mut self.0.average._30d.height, &mut self.0.min._30d.height,
&mut self.0.max._30d.height, &mut self.0.p10._30d.height,
&mut self.0.p25._30d.height, &mut self.0.median._30d.height,
&mut self.0.p75._30d.height, &mut self.0.p90._30d.height, exit,
)?;
compute_rolling_distribution_from_starts(
max_from,
windows._1y,
source,
&mut self.0.average._1y.height,
&mut self.0.min._1y.height,
&mut self.0.max._1y.height,
&mut self.0.p10._1y.height,
&mut self.0.p25._1y.height,
&mut self.0.median._1y.height,
&mut self.0.p75._1y.height,
&mut self.0.p90._1y.height,
exit,
max_from, windows._1y, source,
&mut self.0.average._1y.height, &mut self.0.min._1y.height,
&mut self.0.max._1y.height, &mut self.0.p10._1y.height,
&mut self.0.p25._1y.height, &mut self.0.median._1y.height,
&mut self.0.p75._1y.height, &mut self.0.p90._1y.height, exit,
)?;
Ok(())

View File

@@ -72,18 +72,9 @@ impl StoredValueRollingWindows {
usd_source: &impl ReadableVec<Height, Dollars>,
exit: &Exit,
) -> Result<()> {
self.0
._24h
.compute_rolling_sum(max_from, windows._24h, sats_source, usd_source, exit)?;
self.0
._7d
.compute_rolling_sum(max_from, windows._7d, sats_source, usd_source, exit)?;
self.0
._30d
.compute_rolling_sum(max_from, windows._30d, sats_source, usd_source, exit)?;
self.0
._1y
.compute_rolling_sum(max_from, windows._1y, sats_source, usd_source, exit)?;
for (w, starts) in self.0.as_mut_array().into_iter().zip(windows.as_array()) {
w.compute_rolling_sum(max_from, starts, sats_source, usd_source, exit)?;
}
Ok(())
}
}

View File

@@ -26,6 +26,12 @@ pub struct WindowStarts<'a> {
pub _1y: &'a EagerVec<PcoVec<Height, Height>>,
}
impl<'a> WindowStarts<'a> {
    /// All four window-start vecs in fixed order (24h, 7d, 30d, 1y),
    /// matching the field order of `Windows::as_mut_array` so the two
    /// can be zipped for per-window iteration.
    pub fn as_array(&self) -> [&'a EagerVec<PcoVec<Height, Height>>; 4] {
        [self._24h, self._7d, self._30d, self._1y]
    }
}
/// 4 rolling window vecs (24h, 7d, 30d, 1y), each with height data + all 17 index views.
#[derive(Deref, DerefMut, Traversable)]
#[traversable(transparent)]
@@ -64,22 +70,9 @@ where
where
T: Default + SubAssign,
{
self.0
._24h
.height
.compute_rolling_sum(max_from, windows._24h, source, exit)?;
self.0
._7d
.height
.compute_rolling_sum(max_from, windows._7d, source, exit)?;
self.0
._30d
.height
.compute_rolling_sum(max_from, windows._30d, source, exit)?;
self.0
._1y
.height
.compute_rolling_sum(max_from, windows._1y, source, exit)?;
for (w, starts) in self.0.as_mut_array().into_iter().zip(windows.as_array()) {
w.height.compute_rolling_sum(max_from, starts, source, exit)?;
}
Ok(())
}
}

View File

@@ -1,6 +1,8 @@
mod block_count_target;
mod cents_to_dollars;
mod cents_to_sats;
mod ohlc_cents_to_dollars;
mod ohlc_cents_to_sats;
mod dollar_halve;
mod dollar_identity;
@@ -35,6 +37,8 @@ mod volatility_sqrt7;
pub use block_count_target::*;
pub use cents_to_dollars::*;
pub use cents_to_sats::*;
pub use ohlc_cents_to_dollars::*;
pub use ohlc_cents_to_sats::*;
pub use dollar_halve::*;
pub use dollar_identity::*;

View File

@@ -0,0 +1,11 @@
use brk_types::{OHLCCents, OHLCDollars};
use vecdb::UnaryTransform;
/// Transform an OHLC candle denominated in cents into one in dollars.
pub struct OhlcCentsToDollars;

impl UnaryTransform<OHLCCents, OHLCDollars> for OhlcCentsToDollars {
    #[inline(always)]
    fn apply(cents: OHLCCents) -> OHLCDollars {
        // Delegates entirely to the `From<OHLCCents>` impl on `OHLCDollars`.
        cents.into()
    }
}

View File

@@ -0,0 +1,19 @@
use brk_types::{Close, High, Low, OHLCCents, OHLCSats, Open};
use vecdb::UnaryTransform;
use super::CentsUnsignedToSats;
/// OHLCCents -> OHLCSats with high/low swapped (inverse price relationship).
pub struct OhlcCentsToSats;

impl UnaryTransform<OHLCCents, OHLCSats> for OhlcCentsToSats {
    #[inline(always)]
    fn apply(cents: OHLCCents) -> OHLCSats {
        // A sats-denominated price moves inversely to a cents-denominated
        // one, so the candle's high comes from the cents *low* and vice versa.
        let open = Open::new(CentsUnsignedToSats::apply(*cents.open));
        let high = High::new(CentsUnsignedToSats::apply(*cents.low));
        let low = Low::new(CentsUnsignedToSats::apply(*cents.high));
        let close = Close::new(CentsUnsignedToSats::apply(*cents.close));
        OHLCSats { open, high, low, close }
    }
}

View File

@@ -0,0 +1,188 @@
/// Sqrt-decomposed sorted structure for O(sqrt(n)) insert/remove/kth.
///
/// Maintains `blocks` sorted sub-arrays where each block is sorted and
/// the blocks are ordered (max of block[i] <= min of block[i+1]).
/// Total element count is tracked via `total_len`.
struct SortedBlocks {
    blocks: Vec<Vec<f64>>,
    total_len: usize,
    block_size: usize,
}

impl SortedBlocks {
    /// Target block size is sqrt(capacity), floored at 64 so tiny windows
    /// don't degenerate into many one-element blocks.
    fn new(capacity: usize) -> Self {
        Self {
            blocks: Vec::new(),
            total_len: 0,
            block_size: ((capacity as f64).sqrt() as usize).max(64),
        }
    }

    fn len(&self) -> usize {
        self.total_len
    }

    fn is_empty(&self) -> bool {
        self.total_len == 0
    }

    /// Insert a value in sorted order. O(sqrt(n)).
    fn insert(&mut self, value: f64) {
        self.total_len += 1;
        if self.blocks.is_empty() {
            self.blocks.push(vec![value]);
            return;
        }
        // Target block: the first whose max >= value, else the last one.
        let last_idx = self.blocks.len() - 1;
        let bi = self
            .blocks
            .iter()
            .position(|b| *b.last().unwrap() >= value)
            .unwrap_or(last_idx);
        let target = &mut self.blocks[bi];
        let at = target.partition_point(|&a| a < value);
        target.insert(at, value);
        // Keep blocks bounded: split any block that grows past 2x target size.
        if target.len() > 2 * self.block_size {
            let tail = target.split_off(target.len() / 2);
            self.blocks.insert(bi + 1, tail);
        }
    }

    /// Remove one occurrence of value. O(sqrt(n)). Returns whether it was found.
    fn remove(&mut self, value: f64) -> bool {
        for bi in 0..self.blocks.len() {
            let block = &mut self.blocks[bi];
            if block.is_empty() {
                continue;
            }
            // Blocks are ordered, so skip those whose max is below value.
            if *block.last().unwrap() < value {
                continue;
            }
            let at = block.partition_point(|&a| a < value);
            if at < block.len() && block[at] == value {
                block.remove(at);
                self.total_len -= 1;
                if block.is_empty() {
                    self.blocks.remove(bi);
                }
                return true;
            }
            // This is the only block that could contain `value`; it's absent.
            return false;
        }
        false
    }

    /// Get the k-th smallest element (0-indexed). O(sqrt(n)).
    fn kth(&self, k: usize) -> f64 {
        let mut remaining = k;
        for block in &self.blocks {
            if let Some(&v) = block.get(remaining) {
                return v;
            }
            remaining -= block.len();
        }
        unreachable!("kth out of bounds")
    }

    /// Smallest element. Panics when empty.
    fn first(&self) -> f64 {
        self.blocks[0][0]
    }

    /// Largest element. Panics when empty.
    fn last(&self) -> f64 {
        *self.blocks.last().unwrap().last().unwrap()
    }
}
/// Sorted sliding window for rolling distribution/median computations.
///
/// Uses sqrt-decomposition for O(sqrt(n)) insert/remove/kth instead of
/// O(n) memmoves with a flat sorted Vec.
pub(crate) struct SlidingWindowSorted {
    /// Window contents, kept globally sorted.
    sorted: SortedBlocks,
    /// Running sum of the window contents, for O(1) averages.
    running_sum: f64,
    /// Current window start (absolute index); everything below it was evicted.
    prev_start: usize,
}

impl SlidingWindowSorted {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            sorted: SortedBlocks::new(cap),
            running_sum: 0.0,
            prev_start: 0,
        }
    }

    /// Reconstruct state from historical data (the elements in [range_start..skip]).
    ///
    /// NOTE(review): state is accumulated, not reset — this assumes a freshly
    /// constructed window; confirm callers never reuse one here.
    pub fn reconstruct(&mut self, partial_values: &[f64], range_start: usize, skip: usize) {
        self.prev_start = range_start;
        for offset in 0..skip.saturating_sub(range_start) {
            let v = partial_values[offset];
            self.sorted.insert(v);
            self.running_sum += v;
        }
    }

    /// Add a new value and remove all expired values up to `new_start`.
    pub fn advance(
        &mut self,
        value: f64,
        new_start: usize,
        partial_values: &[f64],
        range_start: usize,
    ) {
        self.sorted.insert(value);
        self.running_sum += value;
        // Evict everything that fell out of the window.
        for expired_idx in self.prev_start..new_start {
            let expired = partial_values[expired_idx - range_start];
            self.sorted.remove(expired);
            self.running_sum -= expired;
        }
        self.prev_start = self.prev_start.max(new_start);
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.sorted.is_empty()
    }

    /// Mean of the window; 0.0 when empty.
    #[inline]
    pub fn average(&self) -> f64 {
        match self.sorted.len() {
            0 => 0.0,
            n => self.running_sum / n as f64,
        }
    }

    /// Smallest element; 0.0 when empty.
    #[inline]
    pub fn min(&self) -> f64 {
        if self.is_empty() { 0.0 } else { self.sorted.first() }
    }

    /// Largest element; 0.0 when empty.
    #[inline]
    pub fn max(&self) -> f64 {
        if self.is_empty() { 0.0 } else { self.sorted.last() }
    }

    /// Extract a percentile (0.0-1.0) using linear interpolation.
    #[inline]
    pub fn percentile(&self, p: f64) -> f64 {
        match self.sorted.len() {
            0 => 0.0,
            1 => self.sorted.kth(0),
            n => {
                let rank = p * (n - 1) as f64;
                let lo = rank.floor() as usize;
                let hi = rank.ceil() as usize;
                if lo == hi {
                    self.sorted.kth(lo)
                } else {
                    let w = rank - lo as f64;
                    self.sorted.kth(lo) * (1.0 - w) + self.sorted.kth(hi) * w
                }
            }
        }
    }
}

View File

@@ -15,3 +15,9 @@ pub struct Windows<A, B = A, C = A, D = A> {
#[traversable(rename = "1y")]
pub _1y: D,
}
impl<A> Windows<A> {
    /// Mutable references to all four windows in fixed order
    /// (24h, 7d, 30d, 1y), for zipped per-window iteration.
    pub fn as_mut_array(&mut self) -> [&mut A; 4] {
        let Self { _24h, _7d, _30d, _1y } = self;
        [_24h, _7d, _30d, _1y]
    }
}

View File

@@ -27,7 +27,6 @@ mod scripts;
mod supply;
mod traits;
mod transactions;
mod utils;
use indexes::ComputeIndexes;

View File

@@ -23,7 +23,7 @@ impl Vecs {
exit: &Exit,
) -> Result<()> {
let h2d = &indexes.height.day1;
let close = &prices.usd.close.day1;
let close = &prices.usd.split.close.day1;
let first_price_di = Day1::try_from(Date::new(2010, 7, 12))
.unwrap()

View File

@@ -16,10 +16,10 @@ pub(super) fn collect_returns(tf: &str, returns: &ReturnsVecs) -> Vec<f32> {
pub(super) fn collect_closes(tf: &str, prices: &prices::Vecs) -> Vec<Dollars> {
match tf {
"1d" => prices.usd.close.day1.collect_or_default(),
"1w" => prices.usd.close.week1.collect_or_default(),
"1m" => prices.usd.close.month1.collect_or_default(),
"1y" => prices.usd.close.year1.collect_or_default(),
"1d" => prices.usd.split.close.day1.collect_or_default(),
"1w" => prices.usd.split.close.week1.collect_or_default(),
"1m" => prices.usd.split.close.month1.collect_or_default(),
"1y" => prices.usd.split.close.year1.collect_or_default(),
_ => unreachable!(),
}
}

View File

@@ -42,7 +42,7 @@ impl Vecs {
}
let h2d = &indexes.height.day1;
let closes: Vec<Dollars> = prices.usd.close.day1.collect_or_default();
let closes: Vec<Dollars> = prices.usd.split.close.day1.collect_or_default();
for (ema, period) in [
(&mut self.price_1w_ema, 7),

View File

@@ -98,18 +98,15 @@ impl Vecs {
first_txinindex_data[batch_end_height.to_usize() + 1 - offset].to_usize()
};
// Collect and process txins
// Stream txins directly into pairs — avoids intermediate Vec allocation
pairs.clear();
let txoutindexes: Vec<TxOutIndex> = txinindex_to_txoutindex.collect_range_at(txin_start, txin_end);
for (j, txoutindex) in txoutindexes.into_iter().enumerate() {
let txinindex = TxInIndex::from(txin_start + j);
if txoutindex.is_coinbase() {
continue;
let mut j = txin_start;
txinindex_to_txoutindex.for_each_range_at(txin_start, txin_end, |txoutindex: TxOutIndex| {
if !txoutindex.is_coinbase() {
pairs.push((txoutindex, TxInIndex::from(j)));
}
pairs.push((txoutindex, txinindex));
}
j += 1;
});
pairs.sort_unstable_by_key(|(txoutindex, _)| *txoutindex);

View File

@@ -19,12 +19,24 @@ impl Vecs {
exit: &Exit,
) -> Result<()> {
self.compute_prices(indexer, starting_indexes, exit)?;
self.open
self.split
.open
.compute_first(starting_indexes, &self.price, indexes, exit)?;
self.high
self.split
.high
.compute_max(starting_indexes, &self.price, indexes, exit)?;
self.low
self.split
.low
.compute_min(starting_indexes, &self.price, indexes, exit)?;
self.ohlc.compute_from_split(
starting_indexes,
&self.split.open,
&self.split.high,
&self.split.low,
&self.split.close,
indexes,
exit,
)?;
Ok(())
}
@@ -132,6 +144,10 @@ impl Vecs {
// across blocks, so the cursor only advances forward.
let mut txout_cursor = indexer.vecs.transactions.first_txoutindex.cursor();
// Reusable buffers — avoid per-block allocation
let mut values: Vec<Sats> = Vec::new();
let mut output_types: Vec<OutputType> = Vec::new();
for (idx, _h) in range.enumerate() {
let first_txindex = first_txindexes[idx];
let next_first_txindex = first_txindexes
@@ -156,16 +172,8 @@ impl Vecs {
.unwrap_or(TxOutIndex::from(total_outputs))
.to_usize();
let values: Vec<Sats> = indexer
.vecs
.outputs
.value
.collect_range_at(out_start, out_end);
let output_types: Vec<OutputType> = indexer
.vecs
.outputs
.outputtype
.collect_range_at(out_start, out_end);
indexer.vecs.outputs.value.collect_range_into_at(out_start, out_end, &mut values);
indexer.vecs.outputs.outputtype.collect_range_into_at(out_start, out_end, &mut output_types);
let mut hist = [0u32; NUM_BINS];
for i in 0..values.len() {

View File

@@ -5,6 +5,7 @@ use vecdb::{Database, ImportableVec, PcoVec, ReadableCloneableVec};
use super::Vecs;
use crate::indexes;
use crate::internal::{ComputedHeightDerivedLast, EagerIndexes};
use crate::prices::{ohlcs::OhlcVecs, split::SplitOhlc};
impl Vecs {
pub(crate) fn forced_import(
@@ -16,23 +17,26 @@ impl Vecs {
let price = PcoVec::forced_import(db, "price_cents", version)?;
let open = EagerIndexes::forced_import(db, "price_cents_open", version)?;
let high = EagerIndexes::forced_import(db, "price_cents_high", version)?;
let low = EagerIndexes::forced_import(db, "price_cents_low", version)?;
let open = EagerIndexes::forced_import(db, "price_open_cents", version)?;
let high = EagerIndexes::forced_import(db, "price_high_cents", version)?;
let low = EagerIndexes::forced_import(db, "price_low_cents", version)?;
let close = ComputedHeightDerivedLast::forced_import(
"price_cents_close",
"price_close_cents",
price.read_only_boxed_clone(),
version,
indexes,
);
Ok(Self {
price,
let split = SplitOhlc {
open,
high,
low,
close,
})
};
let ohlc = OhlcVecs::forced_import(db, "price_ohlc_cents", version)?;
Ok(Self { split, ohlc, price })
}
}

View File

@@ -1,14 +1,19 @@
use brk_traversable::Traversable;
use brk_types::{Cents, Height};
use brk_types::{Cents, Height, OHLCCents};
use vecdb::{PcoVec, Rw, StorageMode};
use crate::internal::{ComputedHeightDerivedLast, EagerIndexes};
use crate::prices::{ohlcs::OhlcVecs, split::SplitOhlc};
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
#[allow(clippy::type_complexity)]
pub split: SplitOhlc<
EagerIndexes<Cents, M>,
EagerIndexes<Cents, M>,
EagerIndexes<Cents, M>,
ComputedHeightDerivedLast<Cents>,
>,
pub ohlc: OhlcVecs<OHLCCents, M>,
pub price: M::Stored<PcoVec<Height, Cents>>,
pub open: EagerIndexes<Cents, M>,
pub high: EagerIndexes<Cents, M>,
pub low: EagerIndexes<Cents, M>,
pub close: ComputedHeightDerivedLast<Cents>,
}

View File

@@ -1,4 +1,6 @@
mod compute;
pub(crate) mod ohlcs;
pub(crate) mod split;
pub mod cents;
pub mod sats;

View File

@@ -0,0 +1,205 @@
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{
Cents, Close, Day1, Day3, DifficultyEpoch, HalvingEpoch, High, Hour1, Hour4, Hour12, Low,
Minute1, Minute5, Minute10, Minute30, Month1, Month3, Month6, OHLCCents, Open, Version, Week1,
Year1, Year10,
};
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use serde::Serialize;
use vecdb::{
BytesVec, BytesVecValue, Database, EagerVec, Exit, Formattable, ImportableVec, LazyVecFrom1,
ReadableCloneableVec, ReadableVec, Rw, StorageMode, UnaryTransform,
};
use crate::{
ComputeIndexes, indexes, indexes_from,
internal::{ComputedHeightDerivedLast, EagerIndexes, Indexes},
};
// ── EagerOhlcIndexes ─────────────────────────────────────────────────
/// Eager OHLC storage: one stored `BytesVec<Index, T>` per index kind
/// (15 time granularities plus halving/difficulty epochs — 17 vecs total).
#[derive(Deref, DerefMut, Traversable)]
#[traversable(transparent)]
pub struct OhlcVecs<T, M: StorageMode = Rw>(
    #[allow(clippy::type_complexity)]
    pub Indexes<
        // Sub-daily time indexes.
        <M as StorageMode>::Stored<EagerVec<BytesVec<Minute1, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Minute5, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Minute10, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Minute30, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Hour1, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Hour4, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Hour12, T>>>,
        // Daily and coarser time indexes.
        <M as StorageMode>::Stored<EagerVec<BytesVec<Day1, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Day3, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Week1, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Month1, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Month3, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Month6, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Year1, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<Year10, T>>>,
        // Epoch indexes.
        <M as StorageMode>::Stored<EagerVec<BytesVec<HalvingEpoch, T>>>,
        <M as StorageMode>::Stored<EagerVec<BytesVec<DifficultyEpoch, T>>>,
    >,
)
where
    T: BytesVecValue + Formattable + Serialize + JsonSchema;
const EAGER_VERSION: Version = Version::ZERO;
impl<T> OhlcVecs<T>
where
    T: BytesVecValue + Formattable + Serialize + JsonSchema,
{
    /// Import (or create) every per-index vec in `db`, each named
    /// `<name>_<index>` (e.g. `<name>_day1`), at `version + EAGER_VERSION`.
    pub(crate) fn forced_import(db: &Database, name: &str, version: Version) -> Result<Self> {
        let v = version + EAGER_VERSION;
        // `indexes_from!` presumably expands `period!` once per index field
        // of the 17-field `Indexes` above — confirm against the macro def.
        macro_rules! period {
            ($idx:ident) => {
                ImportableVec::forced_import(db, &format!("{name}_{}", stringify!($idx)), v)?
            };
        }
        Ok(Self(indexes_from!(period)))
    }
}
impl OhlcVecs<OHLCCents> {
    /// Rebuild combined `OHLCCents` candles from the four split
    /// open/high/low/close sources, one index granularity at a time,
    /// starting each from its position in `starting_indexes`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn compute_from_split(
        &mut self,
        starting_indexes: &ComputeIndexes,
        open: &EagerIndexes<Cents>,
        high: &EagerIndexes<Cents>,
        low: &EagerIndexes<Cents>,
        close: &ComputedHeightDerivedLast<Cents>,
        indexes: &indexes::Vecs,
        exit: &Exit,
    ) -> Result<()> {
        // Time-granularity fields: `close.$field.collect_one` yields a nested
        // Option here, hence the `.flatten()`. Missing components fall back to
        // the type's default value.
        macro_rules! period {
            ($field:ident) => {
                self.0.$field.compute_transform(
                    starting_indexes.$field,
                    &indexes.$field.first_height,
                    |(idx, _first_h, _)| {
                        let o = open.$field.collect_one(idx).unwrap_or_default();
                        let h = high.$field.collect_one(idx).unwrap_or_default();
                        let l = low.$field.collect_one(idx).unwrap_or_default();
                        let c = close.$field.collect_one(idx).flatten().unwrap_or_default();
                        (
                            idx,
                            OHLCCents {
                                open: Open::new(o),
                                high: High::new(h),
                                low: Low::new(l),
                                close: Close::new(c),
                            },
                        )
                    },
                    exit,
                )?;
            };
        }
        // Epoch fields: identical to `period!` except the close lookup has no
        // `.flatten()` — NOTE(review): presumably epoch-indexed close returns
        // a single Option rather than a nested one; confirm.
        macro_rules! epoch {
            ($field:ident) => {
                self.0.$field.compute_transform(
                    starting_indexes.$field,
                    &indexes.$field.first_height,
                    |(idx, _first_h, _)| {
                        let o = open.$field.collect_one(idx).unwrap_or_default();
                        let h = high.$field.collect_one(idx).unwrap_or_default();
                        let l = low.$field.collect_one(idx).unwrap_or_default();
                        let c = close.$field.collect_one(idx).unwrap_or_default();
                        (
                            idx,
                            OHLCCents {
                                open: Open::new(o),
                                high: High::new(h),
                                low: Low::new(l),
                                close: Close::new(c),
                            },
                        )
                    },
                    exit,
                )?;
            };
        }
        period!(minute1);
        period!(minute5);
        period!(minute10);
        period!(minute30);
        period!(hour1);
        period!(hour4);
        period!(hour12);
        period!(day1);
        period!(day3);
        period!(week1);
        period!(month1);
        period!(month3);
        period!(month6);
        period!(year1);
        period!(year10);
        epoch!(halvingepoch);
        epoch!(difficultyepoch);
        Ok(())
    }
}
// ── LazyOhlcIndexes ──────────────────────────────────────────────────
/// Lazily-transformed OHLC vectors, one per period/epoch index.
///
/// Each field is a `LazyVecFrom1` view that converts source values of type
/// `S` into `T` on read, so no additional data is materialized. Constructed
/// via [`LazyOhlcVecs::from_eager_ohlc_indexes`] from an eager `OhlcVecs<S>`.
/// `#[traversable(transparent)]` forwards traversal straight to the inner
/// `Indexes` tuple.
#[derive(Clone, Deref, DerefMut, Traversable)]
#[traversable(transparent)]
pub struct LazyOhlcVecs<T, S>(
    #[allow(clippy::type_complexity)]
    pub Indexes<
        LazyVecFrom1<Minute1, T, Minute1, S>,
        LazyVecFrom1<Minute5, T, Minute5, S>,
        LazyVecFrom1<Minute10, T, Minute10, S>,
        LazyVecFrom1<Minute30, T, Minute30, S>,
        LazyVecFrom1<Hour1, T, Hour1, S>,
        LazyVecFrom1<Hour4, T, Hour4, S>,
        LazyVecFrom1<Hour12, T, Hour12, S>,
        LazyVecFrom1<Day1, T, Day1, S>,
        LazyVecFrom1<Day3, T, Day3, S>,
        LazyVecFrom1<Week1, T, Week1, S>,
        LazyVecFrom1<Month1, T, Month1, S>,
        LazyVecFrom1<Month3, T, Month3, S>,
        LazyVecFrom1<Month6, T, Month6, S>,
        LazyVecFrom1<Year1, T, Year1, S>,
        LazyVecFrom1<Year10, T, Year10, S>,
        LazyVecFrom1<HalvingEpoch, T, HalvingEpoch, S>,
        LazyVecFrom1<DifficultyEpoch, T, DifficultyEpoch, S>,
    >,
)
where
    T: BytesVecValue + Formattable + Serialize + JsonSchema,
    S: BytesVecValue;
impl<T, S> LazyOhlcVecs<T, S>
where
    T: BytesVecValue + Formattable + Serialize + JsonSchema,
    S: BytesVecValue + Formattable + Serialize + JsonSchema,
{
    /// Builds a lazy view over every index of `source`, applying `Transform`
    /// to each stored value on read. Per-index vec names are suffixed with
    /// the index identifier (e.g. `{name}_day1`).
    pub(crate) fn from_eager_ohlc_indexes<Transform: UnaryTransform<S, T>>(
        name: &str,
        version: Version,
        source: &OhlcVecs<S>,
    ) -> Self {
        macro_rules! lazy_view {
            ($idx:ident) => {{
                let vec_name = format!("{name}_{}", stringify!($idx));
                LazyVecFrom1::transformed::<Transform>(
                    &vec_name,
                    version,
                    source.$idx.read_only_boxed_clone(),
                )
            }};
        }
        Self(indexes_from!(lazy_view))
    }
}

View File

@@ -3,9 +3,10 @@ use vecdb::{LazyVecFrom1, ReadableCloneableVec};
use super::super::cents;
use super::Vecs;
use crate::prices::{ohlcs::LazyOhlcVecs, split::SplitOhlc};
use crate::{
indexes,
internal::{CentsUnsignedToSats, ComputedHeightDerivedLast, LazyEagerIndexes},
internal::{CentsUnsignedToSats, ComputedHeightDerivedLast, LazyEagerIndexes, OhlcCentsToSats},
};
impl Vecs {
@@ -21,26 +22,43 @@ impl Vecs {
);
// Sats are inversely related to cents (sats = 10B/cents), so high↔low are swapped
let open =
LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToSats>("price_sats_open", version, &cents.open);
let high =
LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToSats>("price_sats_high", version, &cents.low);
let low =
LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToSats>("price_sats_low", version, &cents.high);
let open = LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToSats>(
"price_open_sats",
version,
&cents.split.open,
);
let high = LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToSats>(
"price_high_sats",
version,
&cents.split.low,
);
let low = LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToSats>(
"price_low_sats",
version,
&cents.split.high,
);
let close = ComputedHeightDerivedLast::forced_import(
"price_sats_close",
"price_close_sats",
price.read_only_boxed_clone(),
version,
indexes,
);
Self {
price,
let split = SplitOhlc {
open,
high,
low,
close,
}
};
// OhlcCentsToSats handles the high↔low swap internally
let ohlc = LazyOhlcVecs::from_eager_ohlc_indexes::<OhlcCentsToSats>(
"price_ohlc_sats",
version,
&cents.ohlc,
);
Self { split, ohlc, price }
}
}

View File

@@ -1,14 +1,19 @@
use brk_traversable::Traversable;
use brk_types::{Cents, Height, Sats};
use brk_types::{Cents, Height, OHLCCents, OHLCSats, Sats};
use vecdb::LazyVecFrom1;
use crate::internal::{ComputedHeightDerivedLast, LazyEagerIndexes};
use crate::prices::{ohlcs::LazyOhlcVecs, split::SplitOhlc};
#[derive(Clone, Traversable)]
pub struct Vecs {
#[allow(clippy::type_complexity)]
pub split: SplitOhlc<
LazyEagerIndexes<Sats, Cents>,
LazyEagerIndexes<Sats, Cents>,
LazyEagerIndexes<Sats, Cents>,
ComputedHeightDerivedLast<Sats>,
>,
pub ohlc: LazyOhlcVecs<OHLCSats, OHLCCents>,
pub price: LazyVecFrom1<Height, Sats, Height, Cents>,
pub open: LazyEagerIndexes<Sats, Cents>,
pub high: LazyEagerIndexes<Sats, Cents>,
pub low: LazyEagerIndexes<Sats, Cents>,
pub close: ComputedHeightDerivedLast<Sats>,
}

View File

@@ -0,0 +1,9 @@
use brk_traversable::Traversable;
/// Generic open/high/low/close bundle.
///
/// Each leg carries its own type parameter so different vec kinds (eager,
/// lazy, height-derived) can be mixed in a single split.
#[derive(Clone, Traversable)]
pub struct SplitOhlc<O, H, L, C> {
    // Opening leg.
    pub open: O,
    // High leg.
    pub high: H,
    // Low leg.
    pub low: L,
    // Closing leg.
    pub close: C,
}

View File

@@ -3,9 +3,12 @@ use vecdb::{LazyVecFrom1, ReadableCloneableVec};
use super::super::cents;
use super::Vecs;
use crate::prices::{ohlcs::LazyOhlcVecs, split::SplitOhlc};
use crate::{
indexes,
internal::{CentsUnsignedToDollars, ComputedHeightDerivedLast, LazyEagerIndexes},
internal::{
CentsUnsignedToDollars, ComputedHeightDerivedLast, LazyEagerIndexes, OhlcCentsToDollars,
},
};
impl Vecs {
@@ -15,32 +18,48 @@ impl Vecs {
cents: &cents::Vecs,
) -> Self {
let price = LazyVecFrom1::transformed::<CentsUnsignedToDollars>(
"price_usd",
"price",
version,
cents.price.read_only_boxed_clone(),
);
// Dollars are monotonically increasing from cents, so open→open, high→high, low→low
let open =
LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToDollars>("price_usd_open", version, &cents.open);
let high =
LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToDollars>("price_usd_high", version, &cents.high);
let low =
LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToDollars>("price_usd_low", version, &cents.low);
let open = LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToDollars>(
"price_open",
version,
&cents.split.open,
);
let high = LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToDollars>(
"price_high",
version,
&cents.split.high,
);
let low = LazyEagerIndexes::from_eager_indexes::<CentsUnsignedToDollars>(
"price_low",
version,
&cents.split.low,
);
let close = ComputedHeightDerivedLast::forced_import(
"price_usd_close",
"price_close",
price.read_only_boxed_clone(),
version,
indexes,
);
Self {
price,
let split = SplitOhlc {
open,
high,
low,
close,
}
};
let ohlc = LazyOhlcVecs::from_eager_ohlc_indexes::<OhlcCentsToDollars>(
"price_ohlc",
version,
&cents.ohlc,
);
Self { split, ohlc, price }
}
}

View File

@@ -1,14 +1,19 @@
use brk_traversable::Traversable;
use brk_types::{Cents, Dollars, Height};
use brk_types::{Cents, Dollars, Height, OHLCCents, OHLCDollars};
use vecdb::LazyVecFrom1;
use crate::internal::{ComputedHeightDerivedLast, LazyEagerIndexes};
use crate::prices::{ohlcs::LazyOhlcVecs, split::SplitOhlc};
#[derive(Clone, Traversable)]
pub struct Vecs {
#[allow(clippy::type_complexity)]
pub split: SplitOhlc<
LazyEagerIndexes<Dollars, Cents>,
LazyEagerIndexes<Dollars, Cents>,
LazyEagerIndexes<Dollars, Cents>,
ComputedHeightDerivedLast<Dollars>,
>,
pub ohlc: LazyOhlcVecs<OHLCDollars, OHLCCents>,
pub price: LazyVecFrom1<Height, Dollars, Height, Cents>,
pub open: LazyEagerIndexes<Dollars, Cents>,
pub high: LazyEagerIndexes<Dollars, Cents>,
pub low: LazyEagerIndexes<Dollars, Cents>,
pub close: ComputedHeightDerivedLast<Dollars>,
}

View File

@@ -62,16 +62,17 @@ impl Vecs {
let out_start = first_txoutindex.to_usize();
let out_end = next_first_txoutindex.to_usize();
// Sum opreturn values — batch read both ranges for the block
let values = indexer.vecs.outputs.value.collect_range_at(out_start, out_end);
// Sum opreturn values — fold over both vecs without allocation
let opreturn_value = indexer.vecs.outputs.outputtype.fold_range_at(
out_start, out_end,
(Sats::ZERO, 0_usize),
|(mut sum, idx), ot| {
if ot == OutputType::OpReturn {
sum += values[idx];
}
(sum, idx + 1)
(Sats::ZERO, out_start),
|(sum, vi), ot| {
let new_sum = if ot == OutputType::OpReturn {
sum + indexer.vecs.outputs.value.collect_one_at(vi).unwrap()
} else {
sum
};
(new_sum, vi + 1)
},
).0;

View File

@@ -5,6 +5,77 @@ use vecdb::{
WritableVec,
};
use crate::internal::sliding_window::SlidingWindowSorted;
/// Unified rolling extremum (min or max) from window starts.
///
/// Uses a monotonic deque of `(index, value)` candidates: every element is
/// pushed and popped at most once, so the whole pass is amortized O(1) per
/// output element regardless of window size.
///
/// `should_replace` determines whether to evict the deque back:
/// - For min: `|back, new| *back >= *new`
/// - For max: `|back, new| *back <= *new`
pub fn compute_rolling_extremum_from_starts<I, T, A>(
    out: &mut EagerVec<PcoVec<I, T>>,
    max_from: I,
    window_starts: &impl ReadableVec<I, I>,
    values: &impl ReadableVec<I, A>,
    should_replace: fn(&A, &A) -> bool,
    exit: &Exit,
) -> Result<()>
where
    I: VecIndex,
    T: PcoVecValue + From<A>,
    A: VecValue + Ord,
{
    out.validate_and_truncate(window_starts.version() + values.version(), max_from)?;
    out.repeat_until_complete(exit, |this| {
        // `skip` outputs already exist from a previous run; resume after them.
        let skip = this.len();
        let mut deque: std::collections::VecDeque<(usize, A)> =
            std::collections::VecDeque::new();
        // Earliest value index still inside the window of the last completed
        // element — replaying from here rebuilds the deque state exactly.
        let start_offset = if skip > 0 {
            window_starts.collect_one_at(skip - 1).unwrap().to_usize()
        } else {
            0
        };
        let end = window_starts.len().min(values.len());
        // Bulk-read both ranges once instead of per-element lookups.
        let starts_batch = window_starts.collect_range_at(start_offset, end);
        let values_batch = values.collect_range_at(start_offset, end);
        for (j, (start, value)) in starts_batch.into_iter().zip(values_batch).enumerate() {
            let i = start_offset + j;
            let start_usize = start.to_usize();
            // Drop front candidates that fell out of the current window.
            while let Some(&(idx, _)) = deque.front() {
                if idx < start_usize {
                    deque.pop_front();
                } else {
                    break;
                }
            }
            // Drop back candidates dominated by the incoming value, keeping
            // the deque monotonic so the front is always the extremum.
            while let Some((_, back)) = deque.back() {
                if should_replace(back, &value) {
                    deque.pop_back();
                } else {
                    break;
                }
            }
            deque.push_back((i, value));
            // Indexes below `skip` are warm-up only: they rebuild deque state
            // without re-emitting already-written outputs.
            if i >= skip {
                let extremum = deque.front().unwrap().1.clone();
                this.checked_push_at(i, T::from(extremum))?;
                if this.batch_limit_reached() {
                    break;
                }
            }
        }
        Ok(())
    })?;
    Ok(())
}
pub trait ComputeRollingMinFromStarts<I: VecIndex, T> {
fn compute_rolling_min_from_starts<A>(
&mut self,
@@ -34,56 +105,14 @@ where
A: VecValue + Ord,
T: From<A>,
{
self.validate_computed_version_or_reset(window_starts.version() + values.version())?;
self.truncate_if_needed(max_from)?;
self.repeat_until_complete(exit, |this| {
let skip = this.len();
let mut deque: std::collections::VecDeque<(usize, A)> =
std::collections::VecDeque::new();
let start_offset = if skip > 0 {
window_starts.collect_one_at(skip - 1).unwrap().to_usize()
} else {
0
};
let end = window_starts.len().min(values.len());
let starts_batch = window_starts.collect_range_at(start_offset, end);
let values_batch = values.collect_range_at(start_offset, end);
for (j, (start, value)) in starts_batch.into_iter().zip(values_batch).enumerate() {
let i = start_offset + j;
let start_usize = start.to_usize();
while let Some(&(idx, _)) = deque.front() {
if idx < start_usize {
deque.pop_front();
} else {
break;
}
}
while let Some((_, back)) = deque.back() {
if *back >= value {
deque.pop_back();
} else {
break;
}
}
deque.push_back((i, value));
if i >= skip {
let min_val = deque.front().unwrap().1.clone();
this.checked_push_at(i, T::from(min_val))?;
if this.batch_limit_reached() {
break;
}
}
}
Ok(())
})?;
Ok(())
compute_rolling_extremum_from_starts(
self,
max_from,
window_starts,
values,
|back, new| *back >= *new,
exit,
)
}
}
@@ -116,56 +145,14 @@ where
A: VecValue + Ord,
T: From<A>,
{
self.validate_computed_version_or_reset(window_starts.version() + values.version())?;
self.truncate_if_needed(max_from)?;
self.repeat_until_complete(exit, |this| {
let skip = this.len();
let mut deque: std::collections::VecDeque<(usize, A)> =
std::collections::VecDeque::new();
let start_offset = if skip > 0 {
window_starts.collect_one_at(skip - 1).unwrap().to_usize()
} else {
0
};
let end = window_starts.len().min(values.len());
let starts_batch = window_starts.collect_range_at(start_offset, end);
let values_batch = values.collect_range_at(start_offset, end);
for (j, (start, value)) in starts_batch.into_iter().zip(values_batch).enumerate() {
let i = start_offset + j;
let start_usize = start.to_usize();
while let Some(&(idx, _)) = deque.front() {
if idx < start_usize {
deque.pop_front();
} else {
break;
}
}
while let Some((_, back)) = deque.back() {
if *back <= value {
deque.pop_back();
} else {
break;
}
}
deque.push_back((i, value));
if i >= skip {
let max_val = deque.front().unwrap().1.clone();
this.checked_push_at(i, T::from(max_val))?;
if this.batch_limit_reached() {
break;
}
}
}
Ok(())
})?;
Ok(())
compute_rolling_extremum_from_starts(
self,
max_from,
window_starts,
values,
|back, new| *back <= *new,
exit,
)
}
}
@@ -198,70 +185,47 @@ where
A: VecValue + Copy,
f64: From<A>,
{
self.validate_computed_version_or_reset(window_starts.version() + values.version())?;
self.truncate_if_needed(max_from)?;
self.validate_and_truncate(window_starts.version() + values.version(), max_from)?;
self.repeat_until_complete(exit, |this| {
let skip = this.len();
let end = window_starts.len().min(values.len());
// Only collect the range needed: from window start of previous
// element to end. For incremental (1 block) this is ~window_size
// instead of the full history.
let range_start = if skip > 0 {
window_starts.collect_one_at(skip - 1).unwrap().to_usize()
} else {
0
};
let partial_values: Vec<A> = values.collect_range_at(range_start, end);
let partial_values: Vec<f64> = values
.collect_range_at(range_start, end)
.into_iter()
.map(|a| f64::from(a))
.collect();
let mut sorted: Vec<f64> = Vec::new();
let mut prev_start_usize: usize = range_start;
let capacity = if skip > 0 && skip < end {
let first_start = window_starts.collect_one_at(skip).unwrap().to_usize();
(skip + 1).saturating_sub(first_start)
} else if !partial_values.is_empty() {
partial_values.len().min(1024)
} else {
0
};
let mut window = SlidingWindowSorted::with_capacity(capacity);
// Reconstruct state from historical data
if skip > 0 {
(range_start..skip).for_each(|idx| {
let v = f64::from(partial_values[idx - range_start]);
let pos = sorted
.binary_search_by(|a| {
a.partial_cmp(&v).unwrap_or(std::cmp::Ordering::Equal)
})
.unwrap_or_else(|x| x);
sorted.insert(pos, v);
});
window.reconstruct(&partial_values, range_start, skip);
}
let starts_batch = window_starts.collect_range_at(skip, end);
for (j, start) in starts_batch.into_iter().enumerate() {
let i = skip + j;
let v = f64::from(partial_values[i - range_start]);
let pos = sorted
.binary_search_by(|a| a.partial_cmp(&v).unwrap_or(std::cmp::Ordering::Equal))
.unwrap_or_else(|x| x);
sorted.insert(pos, v);
let v = partial_values[i - range_start];
let start_usize = start.to_usize();
while prev_start_usize < start_usize {
let old = f64::from(partial_values[prev_start_usize - range_start]);
if let Ok(pos) = sorted.binary_search_by(|a| {
a.partial_cmp(&old).unwrap_or(std::cmp::Ordering::Equal)
}) {
sorted.remove(pos);
}
prev_start_usize += 1;
}
let median = if sorted.is_empty() {
0.0
} else if sorted.len().is_multiple_of(2) {
let mid = sorted.len() / 2;
(sorted[mid - 1] + sorted[mid]) / 2.0
} else {
sorted[sorted.len() / 2]
};
window.advance(v, start_usize, &partial_values, range_start);
let median = window.percentile(0.50);
this.checked_push_at(i, T::from(median))?;
if this.batch_limit_reached() {
@@ -278,11 +242,6 @@ where
/// Compute all 8 rolling distribution stats (avg, min, max, p10, p25, median, p75, p90)
/// in a single sorted-vec pass per window.
///
/// Since the percentile pass already sorts data, min = sorted[0], max = sorted[last],
/// and average = running_sum / count — all extracted at negligible extra cost.
/// This replaces 3 separate passes (avg, min, max) + 1 percentile pass = 4 passes
/// with a single unified pass.
#[allow(clippy::too_many_arguments)]
pub fn compute_rolling_distribution_from_starts<I, T, A>(
max_from: I,
@@ -306,34 +265,12 @@ where
{
let version = window_starts.version() + values.version();
average_out.validate_computed_version_or_reset(version)?;
min_out.validate_computed_version_or_reset(version)?;
max_out.validate_computed_version_or_reset(version)?;
p10_out.validate_computed_version_or_reset(version)?;
p25_out.validate_computed_version_or_reset(version)?;
median_out.validate_computed_version_or_reset(version)?;
p75_out.validate_computed_version_or_reset(version)?;
p90_out.validate_computed_version_or_reset(version)?;
for v in [&mut *average_out, &mut *min_out, &mut *max_out, &mut *p10_out, &mut *p25_out, &mut *median_out, &mut *p75_out, &mut *p90_out] {
v.validate_and_truncate(version, max_from)?;
}
average_out.truncate_if_needed(max_from)?;
min_out.truncate_if_needed(max_from)?;
max_out.truncate_if_needed(max_from)?;
p10_out.truncate_if_needed(max_from)?;
p25_out.truncate_if_needed(max_from)?;
median_out.truncate_if_needed(max_from)?;
p75_out.truncate_if_needed(max_from)?;
p90_out.truncate_if_needed(max_from)?;
// All 8 vecs should be at the same length; use min to be safe
let skip = average_out
.len()
.min(min_out.len())
.min(max_out.len())
.min(p10_out.len())
.min(p25_out.len())
.min(median_out.len())
.min(p75_out.len())
.min(p90_out.len());
let skip = [average_out.len(), min_out.len(), max_out.len(), p10_out.len(), p25_out.len(), median_out.len(), p75_out.len(), p90_out.len()]
.into_iter().min().unwrap();
let end = window_starts.len().min(values.len());
if skip >= end {
@@ -345,113 +282,68 @@ where
} else {
0
};
let partial_values: Vec<A> = values.collect_range_at(range_start, end);
let partial_values: Vec<f64> = values
.collect_range_at(range_start, end)
.into_iter()
.map(|a| f64::from(a))
.collect();
let mut sorted: Vec<f64> = Vec::new();
let mut running_sum: f64 = 0.0;
let mut prev_start_usize: usize = range_start;
let capacity = if skip > 0 && skip < end {
let first_start = window_starts.collect_one_at(skip).unwrap().to_usize();
(skip + 1).saturating_sub(first_start)
} else if !partial_values.is_empty() {
partial_values.len().min(1024)
} else {
0
};
let mut window = SlidingWindowSorted::with_capacity(capacity);
// Reconstruct sorted state + running sum from historical data
if skip > 0 {
for idx in range_start..skip {
let v = f64::from(partial_values[idx - range_start]);
running_sum += v;
let pos = sorted
.binary_search_by(|a| a.partial_cmp(&v).unwrap_or(std::cmp::Ordering::Equal))
.unwrap_or_else(|x| x);
sorted.insert(pos, v);
}
window.reconstruct(&partial_values, range_start, skip);
}
let starts_batch = window_starts.collect_range_at(skip, end);
for (j, start) in starts_batch.into_iter().enumerate() {
let i = skip + j;
let v = f64::from(partial_values[i - range_start]);
running_sum += v;
let pos = sorted
.binary_search_by(|a| a.partial_cmp(&v).unwrap_or(std::cmp::Ordering::Equal))
.unwrap_or_else(|x| x);
sorted.insert(pos, v);
let v = partial_values[i - range_start];
let start_usize = start.to_usize();
while prev_start_usize < start_usize {
let old = f64::from(partial_values[prev_start_usize - range_start]);
running_sum -= old;
if let Ok(pos) = sorted
.binary_search_by(|a| a.partial_cmp(&old).unwrap_or(std::cmp::Ordering::Equal))
{
sorted.remove(pos);
}
prev_start_usize += 1;
}
window.advance(v, start_usize, &partial_values, range_start);
let len = sorted.len();
if len == 0 {
if window.is_empty() {
let zero = T::from(0.0);
average_out.checked_push_at(i, zero)?;
min_out.checked_push_at(i, zero)?;
max_out.checked_push_at(i, zero)?;
p10_out.checked_push_at(i, zero)?;
p25_out.checked_push_at(i, zero)?;
median_out.checked_push_at(i, zero)?;
p75_out.checked_push_at(i, zero)?;
p90_out.checked_push_at(i, zero)?;
for v in [&mut *average_out, &mut *min_out, &mut *max_out, &mut *p10_out, &mut *p25_out, &mut *median_out, &mut *p75_out, &mut *p90_out] {
v.checked_push_at(i, zero)?;
}
} else {
average_out.checked_push_at(i, T::from(running_sum / len as f64))?;
min_out.checked_push_at(i, T::from(sorted[0]))?;
max_out.checked_push_at(i, T::from(sorted[len - 1]))?;
p10_out.checked_push_at(i, T::from(percentile_of_sorted(&sorted, 0.10)))?;
p25_out.checked_push_at(i, T::from(percentile_of_sorted(&sorted, 0.25)))?;
median_out.checked_push_at(i, T::from(percentile_of_sorted(&sorted, 0.50)))?;
p75_out.checked_push_at(i, T::from(percentile_of_sorted(&sorted, 0.75)))?;
p90_out.checked_push_at(i, T::from(percentile_of_sorted(&sorted, 0.90)))?;
average_out.checked_push_at(i, T::from(window.average()))?;
min_out.checked_push_at(i, T::from(window.min()))?;
max_out.checked_push_at(i, T::from(window.max()))?;
p10_out.checked_push_at(i, T::from(window.percentile(0.10)))?;
p25_out.checked_push_at(i, T::from(window.percentile(0.25)))?;
median_out.checked_push_at(i, T::from(window.percentile(0.50)))?;
p75_out.checked_push_at(i, T::from(window.percentile(0.75)))?;
p90_out.checked_push_at(i, T::from(window.percentile(0.90)))?;
}
if average_out.batch_limit_reached() {
let _lock = exit.lock();
average_out.write()?;
min_out.write()?;
max_out.write()?;
p10_out.write()?;
p25_out.write()?;
median_out.write()?;
p75_out.write()?;
p90_out.write()?;
for v in [&mut *average_out, &mut *min_out, &mut *max_out, &mut *p10_out, &mut *p25_out, &mut *median_out, &mut *p75_out, &mut *p90_out] {
v.write()?;
}
}
}
// Final flush
let _lock = exit.lock();
average_out.write()?;
min_out.write()?;
max_out.write()?;
p10_out.write()?;
p25_out.write()?;
median_out.write()?;
p75_out.write()?;
p90_out.write()?;
for v in [average_out, min_out, max_out, p10_out, p25_out, median_out, p75_out, p90_out] {
v.write()?;
}
Ok(())
}
/// Extract a percentile (0.0-1.0) from a sorted slice using linear interpolation.
///
/// Returns `0.0` for an empty slice — consistent with the empty-window
/// convention of the rolling-distribution callers — instead of underflowing
/// `len - 1` (which would panic in debug builds and index out of bounds in
/// release builds).
fn percentile_of_sorted(sorted: &[f64], p: f64) -> f64 {
    let len = sorted.len();
    if len == 0 {
        return 0.0;
    }
    if len == 1 {
        return sorted[0];
    }
    // Fractional rank within [0, len - 1]; interpolate between the two
    // neighboring samples when the rank is not an integer.
    let rank = p * (len - 1) as f64;
    let lo = rank.floor() as usize;
    let hi = rank.ceil() as usize;
    if lo == hi {
        sorted[lo]
    } else {
        let frac = rank - lo as f64;
        sorted[lo] * (1.0 - frac) + sorted[hi] * frac
    }
}
pub trait ComputeDrawdown<I: VecIndex> {
fn compute_drawdown<C, A>(
&mut self,

View File

@@ -1,47 +0,0 @@
use std::ops::{Add, Div};
/// Extension trait for Option to provide shorter unwrap methods
pub trait OptionExt<T> {
    /// Shorthand for `.as_ref().unwrap()`
    ///
    /// # Panics
    /// Panics if the option is `None`.
    fn u(&self) -> &T;
    /// Shorthand for `.as_mut().unwrap()`
    ///
    /// # Panics
    /// Panics if the option is `None`.
    fn um(&mut self) -> &mut T;
}
impl<T> OptionExt<T> for Option<T> {
    #[inline]
    fn u(&self) -> &T {
        self.as_ref().unwrap()
    }
    #[inline]
    fn um(&mut self) -> &mut T {
        self.as_mut().unwrap()
    }
}
/// Pick the `percentile` (0.0-1.0) entry from an already-sorted slice.
///
/// When the fractional rank falls between two entries, the two neighbors are
/// averaged via `left / 2 + right / 2`.
///
/// NOTE(review): this is a midpoint average, not true linear interpolation —
/// the `Div<usize>` bound cannot express fractional weighting. Confirm the
/// midpoint behavior is intended before tightening.
///
/// # Panics
/// Panics if `sorted` is empty.
pub(crate) fn get_percentile<T>(sorted: &[T], percentile: f64) -> T
where
    T: Clone + Div<usize, Output = T> + Add<T, Output = T>,
{
    let len = sorted.len();
    if len == 0 {
        // Empty input is a caller bug; name it instead of a bare `panic!()`.
        panic!("get_percentile called on an empty slice");
    } else if len == 1 {
        sorted[0].clone()
    } else {
        // Fractional rank within [0, len - 1].
        let index = (len - 1) as f64 * percentile;
        let fract = index.fract();
        if fract != 0.0 {
            let left = sorted.get(index as usize).unwrap().clone();
            let right = sorted.get(index.ceil() as usize).unwrap().clone();
            left / 2 + right / 2
        } else {
            sorted.get(index as usize).unwrap().clone()
        }
    }
}