mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snapshot
This commit is contained in:
@@ -25,7 +25,7 @@ impl Vecs {
|
||||
let (r1, r2) = rayon::join(
|
||||
|| {
|
||||
self.supply
|
||||
.compute(starting_indexes, distribution, &self.activity, exit)
|
||||
.compute(starting_indexes, prices, distribution, &self.activity, exit)
|
||||
},
|
||||
|| {
|
||||
rayon::join(
|
||||
|
||||
@@ -4,12 +4,13 @@ use vecdb::Exit;
|
||||
|
||||
use super::super::activity;
|
||||
use super::Vecs;
|
||||
use crate::distribution;
|
||||
use crate::{distribution, prices};
|
||||
|
||||
impl Vecs {
|
||||
pub(crate) fn compute(
|
||||
&mut self,
|
||||
starting_indexes: &Indexes,
|
||||
prices: &prices::Vecs,
|
||||
distribution: &distribution::Vecs,
|
||||
activity: &activity::Vecs,
|
||||
exit: &Exit,
|
||||
@@ -37,6 +38,9 @@ impl Vecs {
|
||||
exit,
|
||||
)?;
|
||||
|
||||
self.vaulted.compute(prices, starting_indexes.height, exit)?;
|
||||
self.active.compute(prices, starting_indexes.height, exit)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,18 +221,26 @@ impl Vecs {
|
||||
starting_indexes: &Indexes,
|
||||
exit: &Exit,
|
||||
) -> Result<()> {
|
||||
self.tx_index.input_count.compute_count_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txin_index,
|
||||
&indexer.vecs.inputs.outpoint,
|
||||
exit,
|
||||
)?;
|
||||
self.tx_index.output_count.compute_count_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txout_index,
|
||||
&indexer.vecs.outputs.value,
|
||||
exit,
|
||||
)?;
|
||||
let (r1, r2) = rayon::join(
|
||||
|| {
|
||||
self.tx_index.input_count.compute_count_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txin_index,
|
||||
&indexer.vecs.inputs.outpoint,
|
||||
exit,
|
||||
)
|
||||
},
|
||||
|| {
|
||||
self.tx_index.output_count.compute_count_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txout_index,
|
||||
&indexer.vecs.outputs.value,
|
||||
exit,
|
||||
)
|
||||
},
|
||||
);
|
||||
r1?;
|
||||
r2?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_indexer::Indexer;
|
||||
use brk_types::{Indexes, Sats, TxIndex, TxOutIndex, Vout};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, VecIndex, WritableVec};
|
||||
|
||||
@@ -68,7 +69,7 @@ impl Vecs {
|
||||
});
|
||||
|
||||
// Sort 1: by tx_index (group by transaction for sequential first_txout_index reads)
|
||||
entries.sort_unstable_by_key(|e| e.tx_index);
|
||||
entries.par_sort_unstable_by_key(|e| e.tx_index);
|
||||
for entry in &mut entries {
|
||||
if entry.tx_index.is_coinbase() {
|
||||
break;
|
||||
@@ -78,7 +79,7 @@ impl Vecs {
|
||||
}
|
||||
|
||||
// Sort 2: by txout_index (sequential value reads)
|
||||
entries.sort_unstable_by_key(|e| e.txout_index);
|
||||
entries.par_sort_unstable_by_key(|e| e.txout_index);
|
||||
for entry in &mut entries {
|
||||
if entry.txout_index.is_coinbase() {
|
||||
break;
|
||||
@@ -105,8 +106,9 @@ impl Vecs {
|
||||
}
|
||||
|
||||
let _lock = exit.lock();
|
||||
self.txout_index.write()?;
|
||||
self.value.write()?;
|
||||
let (r1, r2) = rayon::join(|| self.txout_index.write(), || self.value.write());
|
||||
r1?;
|
||||
r2?;
|
||||
|
||||
if batch_end < target {
|
||||
info!("TxIns: {:.2}%", batch_end as f64 / target as f64 * 100.0);
|
||||
|
||||
@@ -2,8 +2,8 @@ use brk_error::Result;
|
||||
use brk_traversable::Traversable;
|
||||
use schemars::JsonSchema;
|
||||
use vecdb::{
|
||||
Database, EagerVec, Exit, ImportableVec, PcoVec, ReadableVec, Ro, Rw, StorageMode, StoredVec,
|
||||
VecIndex, VecValue, Version,
|
||||
CheckedSub, Database, EagerVec, Exit, ImportableVec, PcoVec, ReadableVec, Ro, Rw, StorageMode,
|
||||
StoredVec, VecIndex, VecValue, Version,
|
||||
};
|
||||
|
||||
use crate::internal::{
|
||||
@@ -81,13 +81,14 @@ impl<I: VecIndex, T: ComputedVecValue + JsonSchema> Distribution<I, T> {
|
||||
pub(crate) fn compute_from_nblocks<A>(
|
||||
&mut self,
|
||||
max_from: I,
|
||||
source: &impl ReadableVec<A, T>,
|
||||
source: &(impl ReadableVec<A, T> + Sized),
|
||||
first_indexes: &impl ReadableVec<I, A>,
|
||||
count_indexes: &impl ReadableVec<I, brk_types::StoredU64>,
|
||||
n_blocks: usize,
|
||||
exit: &Exit,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: CheckedSub,
|
||||
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
|
||||
{
|
||||
compute_aggregations_nblock_window(
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_types::{CheckedSub, StoredU64};
|
||||
use brk_types::{CheckedSub, StoredU64, get_percentile};
|
||||
use schemars::JsonSchema;
|
||||
use vecdb::{
|
||||
AnyStoredVec, AnyVec, EagerVec, Exit, PcoVec, ReadableVec, VecIndex, VecValue, WritableVec,
|
||||
};
|
||||
|
||||
use brk_types::get_percentile;
|
||||
|
||||
use crate::internal::ComputedVecValue;
|
||||
|
||||
fn validate_and_start<I: VecIndex, T: ComputedVecValue + JsonSchema>(
|
||||
@@ -98,12 +98,16 @@ where
|
||||
})*
|
||||
};
|
||||
}
|
||||
truncate_vec!(first, last, min, max, average, sum, cumulative, median, pct10, pct25, pct75, pct90);
|
||||
truncate_vec!(
|
||||
first, last, min, max, average, sum, cumulative, median, pct10, pct25, pct75, pct90
|
||||
);
|
||||
|
||||
let fi_len = first_indexes.len();
|
||||
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(start, fi_len);
|
||||
let count_indexes_batch: Vec<StoredU64> = count_indexes.collect_range_at(start, fi_len);
|
||||
|
||||
let mut values: Vec<T> = Vec::new();
|
||||
|
||||
first_indexes_batch
|
||||
.into_iter()
|
||||
.zip(count_indexes_batch)
|
||||
@@ -157,9 +161,10 @@ where
|
||||
max_vec.push(max_val.or(min_val).unwrap_or_else(|| T::from(0_usize)));
|
||||
}
|
||||
} else if needs_percentiles || needs_minmax {
|
||||
let mut values: Vec<T> = source.collect_range_at(
|
||||
source.collect_range_into_at(
|
||||
effective_first_index.to_usize(),
|
||||
effective_first_index.to_usize() + effective_count,
|
||||
&mut values,
|
||||
);
|
||||
|
||||
if values.is_empty() {
|
||||
@@ -224,17 +229,20 @@ where
|
||||
}
|
||||
}
|
||||
} else if needs_minmax {
|
||||
// Single pass for min + max + optional sum
|
||||
let (min_val, max_val, sum_val, len) = values.iter().copied().fold(
|
||||
(values[0], values[0], T::from(0_usize), 0_usize),
|
||||
|(mn, mx, s, c), v| (mn.min(v), mx.max(v), s + v, c + 1),
|
||||
);
|
||||
|
||||
if let Some(ref mut min_vec) = min {
|
||||
min_vec.push(*values.iter().min().unwrap());
|
||||
min_vec.push(min_val);
|
||||
}
|
||||
if let Some(ref mut max_vec) = max {
|
||||
max_vec.push(*values.iter().max().unwrap());
|
||||
max_vec.push(max_val);
|
||||
}
|
||||
|
||||
if needs_aggregates {
|
||||
let len = values.len();
|
||||
let sum_val = values.into_iter().fold(T::from(0), |a, b| a + b);
|
||||
|
||||
if let Some(ref mut average_vec) = average {
|
||||
average_vec.push(sum_val / len);
|
||||
}
|
||||
@@ -302,7 +310,7 @@ where
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn compute_aggregations_nblock_window<I, T, A>(
|
||||
max_from: I,
|
||||
source: &impl ReadableVec<A, T>,
|
||||
source: &(impl ReadableVec<A, T> + Sized),
|
||||
first_indexes: &impl ReadableVec<I, A>,
|
||||
count_indexes: &impl ReadableVec<I, StoredU64>,
|
||||
n_blocks: usize,
|
||||
@@ -318,7 +326,7 @@ pub(crate) fn compute_aggregations_nblock_window<I, T, A>(
|
||||
) -> Result<()>
|
||||
where
|
||||
I: VecIndex,
|
||||
T: ComputedVecValue + JsonSchema,
|
||||
T: ComputedVecValue + CheckedSub + JsonSchema,
|
||||
A: VecIndex + VecValue + CheckedSub<A>,
|
||||
{
|
||||
let combined_version = source.version() + first_indexes.version() + count_indexes.version();
|
||||
@@ -341,13 +349,11 @@ where
|
||||
let start = index.to_usize();
|
||||
let fi_len = first_indexes.len();
|
||||
|
||||
// Only fetch first_indexes from the earliest possible window start
|
||||
let batch_start = start.saturating_sub(n_blocks - 1);
|
||||
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(batch_start, fi_len);
|
||||
let count_indexes_batch: Vec<StoredU64> = count_indexes.collect_range_at(start, fi_len);
|
||||
let count_indexes_all: Vec<StoredU64> = count_indexes.collect_range_at(batch_start, fi_len);
|
||||
|
||||
let zero = T::from(0_usize);
|
||||
let mut values: Vec<T> = Vec::new();
|
||||
|
||||
for vec in [
|
||||
&mut *min,
|
||||
@@ -362,60 +368,116 @@ where
|
||||
vec.truncate_if_needed_at(start)?;
|
||||
}
|
||||
|
||||
count_indexes_batch
|
||||
.iter()
|
||||
.enumerate()
|
||||
.try_for_each(|(j, ci)| -> Result<()> {
|
||||
let idx = start + j;
|
||||
// Persistent sorted window: O(n) merge-insert for new block, O(n) merge-filter
|
||||
// for expired block. Avoids re-sorting every block. Cursor reads only the new
|
||||
// block (~1 page decompress vs original's ~4). Ring buffer caches per-block
|
||||
// sorted values + sums for O(1) expiry.
|
||||
// Peak memory: 2 × ~15k window elements + n_blocks × ~2500 cached ≈ 360 KB.
|
||||
let mut block_ring: VecDeque<(Vec<T>, T)> = VecDeque::with_capacity(n_blocks + 1);
|
||||
let mut cursor = source.cursor();
|
||||
let mut sorted_window: Vec<T> = Vec::new();
|
||||
let mut merge_buf: Vec<T> = Vec::new();
|
||||
let mut running_sum = T::from(0_usize);
|
||||
|
||||
// Window start: max(0, idx - n_blocks + 1)
|
||||
let window_start = idx.saturating_sub(n_blocks - 1);
|
||||
// Pre-fill initial window blocks [window_start_of_first..start)
|
||||
let window_start_of_first = start.saturating_sub(n_blocks - 1);
|
||||
for block_idx in window_start_of_first..start {
|
||||
let fi = first_indexes_batch[block_idx - batch_start].to_usize();
|
||||
let count = u64::from(count_indexes_all[block_idx - batch_start]) as usize;
|
||||
if cursor.position() < fi {
|
||||
cursor.advance(fi - cursor.position());
|
||||
}
|
||||
let mut bv = Vec::with_capacity(count);
|
||||
cursor.for_each(count, |v: T| bv.push(v));
|
||||
bv.sort_unstable();
|
||||
let block_sum = bv.iter().copied().fold(T::from(0), |a, b| a + b);
|
||||
running_sum += block_sum;
|
||||
sorted_window.extend_from_slice(&bv);
|
||||
block_ring.push_back((bv, block_sum));
|
||||
}
|
||||
// Initial sorted_window was built by extending individually sorted blocks —
|
||||
// stable sort detects these sorted runs and merges in O(n × log(k)) instead of O(n log n).
|
||||
sorted_window.sort();
|
||||
|
||||
// Last tx index (exclusive) of current block
|
||||
let count = u64::from(*ci) as usize;
|
||||
let fi = first_indexes_batch[idx - batch_start];
|
||||
let range_end_usize = fi.to_usize() + count;
|
||||
for j in 0..(fi_len - start) {
|
||||
let idx = start + j;
|
||||
|
||||
// First tx index of the window start block
|
||||
let range_start_usize = first_indexes_batch[window_start - batch_start].to_usize();
|
||||
// Read and sort new block's values
|
||||
let fi = first_indexes_batch[idx - batch_start].to_usize();
|
||||
let count = u64::from(count_indexes_all[idx - batch_start]) as usize;
|
||||
if cursor.position() < fi {
|
||||
cursor.advance(fi - cursor.position());
|
||||
}
|
||||
let mut new_block = Vec::with_capacity(count);
|
||||
cursor.for_each(count, |v: T| new_block.push(v));
|
||||
new_block.sort_unstable();
|
||||
let new_sum = new_block.iter().copied().fold(T::from(0), |a, b| a + b);
|
||||
running_sum += new_sum;
|
||||
|
||||
let effective_count = range_end_usize.saturating_sub(range_start_usize);
|
||||
|
||||
if effective_count == 0 {
|
||||
for vec in [
|
||||
&mut *min,
|
||||
&mut *max,
|
||||
&mut *average,
|
||||
&mut *median,
|
||||
&mut *pct10,
|
||||
&mut *pct25,
|
||||
&mut *pct75,
|
||||
&mut *pct90,
|
||||
] {
|
||||
vec.push(zero);
|
||||
}
|
||||
// Merge-insert new sorted block into sorted_window: O(n+m)
|
||||
merge_buf.clear();
|
||||
merge_buf.reserve(sorted_window.len() + new_block.len());
|
||||
let (mut si, mut ni) = (0, 0);
|
||||
while si < sorted_window.len() && ni < new_block.len() {
|
||||
if sorted_window[si] <= new_block[ni] {
|
||||
merge_buf.push(sorted_window[si]);
|
||||
si += 1;
|
||||
} else {
|
||||
source.collect_range_into_at(range_start_usize, range_end_usize, &mut values);
|
||||
|
||||
// Compute sum before sorting
|
||||
let len = values.len();
|
||||
let sum_val = values.iter().copied().fold(T::from(0), |a, b| a + b);
|
||||
let avg = sum_val / len;
|
||||
|
||||
values.sort_unstable();
|
||||
|
||||
max.push(*values.last().unwrap());
|
||||
pct90.push(get_percentile(&values, 0.90));
|
||||
pct75.push(get_percentile(&values, 0.75));
|
||||
median.push(get_percentile(&values, 0.50));
|
||||
pct25.push(get_percentile(&values, 0.25));
|
||||
pct10.push(get_percentile(&values, 0.10));
|
||||
min.push(*values.first().unwrap());
|
||||
average.push(avg);
|
||||
merge_buf.push(new_block[ni]);
|
||||
ni += 1;
|
||||
}
|
||||
}
|
||||
merge_buf.extend_from_slice(&sorted_window[si..]);
|
||||
merge_buf.extend_from_slice(&new_block[ni..]);
|
||||
std::mem::swap(&mut sorted_window, &mut merge_buf);
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
block_ring.push_back((new_block, new_sum));
|
||||
|
||||
// Expire oldest block: merge-filter its sorted values from sorted_window in O(n)
|
||||
if block_ring.len() > n_blocks {
|
||||
let (expired, expired_sum) = block_ring.pop_front().unwrap();
|
||||
running_sum = running_sum.checked_sub(expired_sum).unwrap();
|
||||
|
||||
merge_buf.clear();
|
||||
merge_buf.reserve(sorted_window.len());
|
||||
let mut ei = 0;
|
||||
for &v in &sorted_window {
|
||||
if ei < expired.len() && v == expired[ei] {
|
||||
ei += 1;
|
||||
} else {
|
||||
merge_buf.push(v);
|
||||
}
|
||||
}
|
||||
std::mem::swap(&mut sorted_window, &mut merge_buf);
|
||||
}
|
||||
|
||||
if sorted_window.is_empty() {
|
||||
for vec in [
|
||||
&mut *min,
|
||||
&mut *max,
|
||||
&mut *average,
|
||||
&mut *median,
|
||||
&mut *pct10,
|
||||
&mut *pct25,
|
||||
&mut *pct75,
|
||||
&mut *pct90,
|
||||
] {
|
||||
vec.push(zero);
|
||||
}
|
||||
} else {
|
||||
let len = sorted_window.len();
|
||||
let avg = running_sum / len;
|
||||
|
||||
max.push(*sorted_window.last().unwrap());
|
||||
pct90.push(get_percentile(&sorted_window, 0.90));
|
||||
pct75.push(get_percentile(&sorted_window, 0.75));
|
||||
median.push(get_percentile(&sorted_window, 0.50));
|
||||
pct25.push(get_percentile(&sorted_window, 0.25));
|
||||
pct10.push(get_percentile(&sorted_window, 0.10));
|
||||
min.push(*sorted_window.first().unwrap());
|
||||
average.push(avg);
|
||||
}
|
||||
}
|
||||
|
||||
let _lock = exit.lock();
|
||||
for vec in [min, max, average, median, pct10, pct25, pct75, pct90] {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::cmp::Ordering;
|
||||
|
||||
/// Sqrt-decomposed sorted structure for O(sqrt(n)) insert/remove/kth.
|
||||
///
|
||||
/// Maintains `blocks` sorted sub-arrays where each block is sorted and
|
||||
@@ -19,6 +21,17 @@ impl SortedBlocks {
|
||||
}
|
||||
}
|
||||
|
||||
/// Build from a pre-sorted slice in O(n) by chunking directly into blocks.
|
||||
fn from_sorted(sorted: &[f64], block_size: usize) -> Self {
|
||||
let total_len = sorted.len();
|
||||
let blocks: Vec<Vec<f64>> = sorted.chunks(block_size).map(|c| c.to_vec()).collect();
|
||||
Self {
|
||||
blocks,
|
||||
total_len,
|
||||
block_size,
|
||||
}
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.total_len
|
||||
}
|
||||
@@ -122,13 +135,17 @@ impl SlidingWindowSorted {
|
||||
}
|
||||
|
||||
/// Reconstruct state from historical data (the elements in [range_start..skip]).
|
||||
/// Uses O(n log n) sort + O(n) block construction instead of O(n√n) individual inserts.
|
||||
pub fn reconstruct(&mut self, partial_values: &[f64], range_start: usize, skip: usize) {
|
||||
self.prev_start = range_start;
|
||||
for idx in range_start..skip {
|
||||
let v = partial_values[idx - range_start];
|
||||
self.running_sum += v;
|
||||
self.sorted.insert(v);
|
||||
let slice = &partial_values[..skip - range_start];
|
||||
if slice.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut sorted_copy: Vec<f64> = slice.to_vec();
|
||||
self.running_sum = sorted_copy.iter().sum();
|
||||
sorted_copy.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
|
||||
self.sorted = SortedBlocks::from_sorted(&sorted_copy, self.sorted.block_size);
|
||||
}
|
||||
|
||||
/// Add a new value and remove all expired values up to `new_start`.
|
||||
|
||||
@@ -4,6 +4,7 @@ mod per_resolution;
|
||||
mod window_24h;
|
||||
mod windows;
|
||||
mod windows_from_1w;
|
||||
mod windows_to_1m;
|
||||
|
||||
pub use constant::*;
|
||||
pub use distribution_stats::*;
|
||||
@@ -11,3 +12,4 @@ pub use per_resolution::*;
|
||||
pub use window_24h::*;
|
||||
pub use windows::*;
|
||||
pub use windows_from_1w::*;
|
||||
pub use windows_to_1m::*;
|
||||
|
||||
31
crates/brk_computer/src/internal/containers/windows_to_1m.rs
Normal file
31
crates/brk_computer/src/internal/containers/windows_to_1m.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
use brk_traversable::Traversable;
|
||||
|
||||
#[derive(Clone, Traversable)]
|
||||
pub struct WindowsTo1m<A> {
|
||||
pub _24h: A,
|
||||
pub _1w: A,
|
||||
pub _1m: A,
|
||||
}
|
||||
|
||||
impl<A> WindowsTo1m<A> {
|
||||
pub const SUFFIXES: [&'static str; 3] = ["24h", "1w", "1m"];
|
||||
pub const DAYS: [usize; 3] = [1, 7, 30];
|
||||
|
||||
pub fn try_from_fn<E>(
|
||||
mut f: impl FnMut(&str) -> std::result::Result<A, E>,
|
||||
) -> std::result::Result<Self, E> {
|
||||
Ok(Self {
|
||||
_24h: f(Self::SUFFIXES[0])?,
|
||||
_1w: f(Self::SUFFIXES[1])?,
|
||||
_1m: f(Self::SUFFIXES[2])?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn as_array(&self) -> [&A; 3] {
|
||||
[&self._24h, &self._1w, &self._1m]
|
||||
}
|
||||
|
||||
pub fn as_mut_array(&mut self) -> [&mut A; 3] {
|
||||
[&mut self._24h, &mut self._1w, &mut self._1m]
|
||||
}
|
||||
}
|
||||
@@ -307,48 +307,12 @@ impl Computer {
|
||||
})?;
|
||||
|
||||
thread::scope(|scope| -> Result<()> {
|
||||
// Positions only needs indexer + starting_indexes — start immediately.
|
||||
let positions = scope.spawn(|| {
|
||||
timed("Computed positions", || {
|
||||
self.positions
|
||||
.compute(indexer, &starting_indexes, reader, exit)
|
||||
})
|
||||
});
|
||||
timed("Computed blocks", || {
|
||||
self.blocks
|
||||
.compute(indexer, &self.indexes, &starting_indexes, exit)
|
||||
})?;
|
||||
|
||||
// Prices and blocks are independent — parallelize.
|
||||
let (prices_result, blocks_result) = rayon::join(
|
||||
|| {
|
||||
timed("Computed prices", || {
|
||||
self.prices
|
||||
.compute(indexer, &self.indexes, &starting_indexes, exit)
|
||||
})
|
||||
},
|
||||
|| {
|
||||
timed("Computed blocks", || {
|
||||
self.blocks
|
||||
.compute(indexer, &self.indexes, &starting_indexes, exit)
|
||||
})
|
||||
},
|
||||
);
|
||||
prices_result?;
|
||||
blocks_result?;
|
||||
|
||||
// Market only needs indexes, prices, blocks — start it early
|
||||
// so it runs in the background alongside the rest of the pipeline.
|
||||
let market = scope.spawn(|| {
|
||||
timed("Computed market", || {
|
||||
self.market.compute(
|
||||
&self.indexes,
|
||||
&self.prices,
|
||||
&self.blocks,
|
||||
&starting_indexes,
|
||||
exit,
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
// inputs and scripts are independent — parallelize
|
||||
let (inputs_result, scripts_result) = rayon::join(
|
||||
let (inputs_result, prices_result) = rayon::join(
|
||||
|| {
|
||||
timed("Computed inputs", || {
|
||||
self.inputs.compute(
|
||||
@@ -361,19 +325,36 @@ impl Computer {
|
||||
})
|
||||
},
|
||||
|| {
|
||||
timed("Computed scripts", || {
|
||||
self.scripts.compute(
|
||||
indexer,
|
||||
&self.outputs,
|
||||
&self.prices,
|
||||
&starting_indexes,
|
||||
exit,
|
||||
)
|
||||
timed("Computed prices", || {
|
||||
self.prices
|
||||
.compute(indexer, &self.indexes, &starting_indexes, exit)
|
||||
})
|
||||
},
|
||||
);
|
||||
inputs_result?;
|
||||
scripts_result?;
|
||||
prices_result?;
|
||||
|
||||
let market = scope.spawn(|| {
|
||||
timed("Computed market", || {
|
||||
self.market.compute(
|
||||
&self.indexes,
|
||||
&self.prices,
|
||||
&self.blocks,
|
||||
&starting_indexes,
|
||||
exit,
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
timed("Computed scripts", || {
|
||||
self.scripts.compute(
|
||||
indexer,
|
||||
&self.outputs,
|
||||
&self.prices,
|
||||
&starting_indexes,
|
||||
exit,
|
||||
)
|
||||
})?;
|
||||
|
||||
timed("Computed outputs", || {
|
||||
self.outputs.compute(
|
||||
@@ -387,6 +368,13 @@ impl Computer {
|
||||
)
|
||||
})?;
|
||||
|
||||
let positions = scope.spawn(|| {
|
||||
timed("Computed positions", || {
|
||||
self.positions
|
||||
.compute(indexer, &starting_indexes, reader, exit)
|
||||
})
|
||||
});
|
||||
|
||||
timed("Computed transactions", || {
|
||||
self.transactions.compute(
|
||||
indexer,
|
||||
|
||||
@@ -78,7 +78,7 @@ impl Vecs {
|
||||
let di_usize = di.to_usize();
|
||||
let stack_sats = stack_data[h.to_usize() - start];
|
||||
let avg = if di_usize > first_price_di {
|
||||
let num_days = days.min(di_usize + 1).min(di_usize + 1 - first_price_di);
|
||||
let num_days = days.min(di_usize + 1 - first_price_di);
|
||||
Cents::from(DCA_AMOUNT * num_days / Bitcoin::from(stack_sats))
|
||||
} else {
|
||||
Cents::ZERO
|
||||
|
||||
@@ -6,9 +6,7 @@ use super::{
|
||||
super::{moving_average, range, returns},
|
||||
Vecs, macd, rsi,
|
||||
};
|
||||
use crate::{blocks, internal::RatioDollarsBp32, prices};
|
||||
|
||||
const TF_MULTIPLIERS: [usize; 4] = [1, 7, 30, 365];
|
||||
use crate::{blocks, internal::{RatioDollarsBp32, WindowsTo1m}, prices};
|
||||
|
||||
impl Vecs {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -47,7 +45,7 @@ impl Vecs {
|
||||
)?;
|
||||
|
||||
let daily_returns = &returns.periods._24h.ratio.height;
|
||||
for (rsi_chain, &m) in self.rsi.as_mut_array().into_iter().zip(&TF_MULTIPLIERS) {
|
||||
for (rsi_chain, &m) in self.rsi.as_mut_array().into_iter().zip(&WindowsTo1m::<()>::DAYS) {
|
||||
rsi::compute(
|
||||
rsi_chain,
|
||||
blocks,
|
||||
@@ -59,7 +57,7 @@ impl Vecs {
|
||||
)?;
|
||||
}
|
||||
|
||||
for (macd_chain, &m) in self.macd.as_mut_array().into_iter().zip(&TF_MULTIPLIERS) {
|
||||
for (macd_chain, &m) in self.macd.as_mut_array().into_iter().zip(&WindowsTo1m::<()>::DAYS) {
|
||||
macd::compute(
|
||||
macd_chain,
|
||||
blocks,
|
||||
|
||||
@@ -5,10 +5,10 @@ use vecdb::Database;
|
||||
use super::{MacdChain, RsiChain, Vecs};
|
||||
use crate::{
|
||||
indexes,
|
||||
internal::{PerBlock, PercentPerBlock, RatioPerBlock, Windows},
|
||||
internal::{PerBlock, PercentPerBlock, RatioPerBlock, WindowsTo1m},
|
||||
};
|
||||
|
||||
const VERSION: Version = Version::new(2);
|
||||
const VERSION: Version = Version::new(4);
|
||||
|
||||
impl RsiChain {
|
||||
fn forced_import(
|
||||
@@ -107,8 +107,8 @@ impl Vecs {
|
||||
let v = version + VERSION;
|
||||
|
||||
let rsi =
|
||||
Windows::try_from_fn(|tf| RsiChain::forced_import(db, tf, v + Version::TWO, indexes))?;
|
||||
let macd = Windows::try_from_fn(|tf| MacdChain::forced_import(db, tf, v, indexes))?;
|
||||
WindowsTo1m::try_from_fn(|tf| RsiChain::forced_import(db, tf, v + Version::TWO, indexes))?;
|
||||
let macd = WindowsTo1m::try_from_fn(|tf| MacdChain::forced_import(db, tf, v, indexes))?;
|
||||
|
||||
let stoch_k = PercentPerBlock::forced_import(db, "stoch_k", v, indexes)?;
|
||||
let stoch_d = PercentPerBlock::forced_import(db, "stoch_d", v, indexes)?;
|
||||
|
||||
@@ -31,7 +31,6 @@ pub(super) fn compute(
|
||||
.height
|
||||
.compute_rolling_ema(starting_indexes.height, ws_slow, close, exit)?;
|
||||
|
||||
// MACD line = ema_fast - ema_slow
|
||||
chain.line.height.compute_subtract(
|
||||
starting_indexes.height,
|
||||
&chain.ema_fast.height,
|
||||
@@ -39,7 +38,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Signal = EMA of MACD line
|
||||
chain.signal.height.compute_rolling_ema(
|
||||
starting_indexes.height,
|
||||
ws_signal,
|
||||
@@ -47,7 +45,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Histogram = line - signal
|
||||
chain.histogram.height.compute_subtract(
|
||||
starting_indexes.height,
|
||||
&chain.line.height,
|
||||
|
||||
@@ -17,7 +17,6 @@ pub(super) fn compute(
|
||||
let ws_rma = blocks.lookback.start_vec(rma_days);
|
||||
let ws_sma = blocks.lookback.start_vec(stoch_sma_days);
|
||||
|
||||
// Gains = max(return, 0)
|
||||
chain.gains.height.compute_transform(
|
||||
starting_indexes.height,
|
||||
returns_source,
|
||||
@@ -25,7 +24,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Losses = max(-return, 0)
|
||||
chain.losses.height.compute_transform(
|
||||
starting_indexes.height,
|
||||
returns_source,
|
||||
@@ -33,7 +31,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Average gain = RMA of gains
|
||||
chain.average_gain.height.compute_rolling_rma(
|
||||
starting_indexes.height,
|
||||
ws_rma,
|
||||
@@ -41,7 +38,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Average loss = RMA of losses
|
||||
chain.average_loss.height.compute_rolling_rma(
|
||||
starting_indexes.height,
|
||||
ws_rma,
|
||||
@@ -49,7 +45,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// RSI = avg_gain / (avg_gain + avg_loss), stored as ratio (0–1)
|
||||
chain.rsi.bps.height.compute_transform2(
|
||||
starting_indexes.height,
|
||||
&chain.average_gain.height,
|
||||
@@ -62,7 +57,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Rolling min/max of RSI over rma_days window
|
||||
chain.rsi_min.bps.height.compute_rolling_min_from_starts(
|
||||
starting_indexes.height,
|
||||
ws_rma,
|
||||
@@ -77,7 +71,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// StochRSI = (rsi - rsi_min) / (rsi_max - rsi_min), stored as ratio (0–1)
|
||||
chain.stoch_rsi.bps.height.compute_transform3(
|
||||
starting_indexes.height,
|
||||
&chain.rsi.bps.height,
|
||||
@@ -95,7 +88,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// StochRSI K = SMA of StochRSI
|
||||
chain.stoch_rsi_k.bps.height.compute_rolling_average(
|
||||
starting_indexes.height,
|
||||
ws_sma,
|
||||
@@ -103,7 +95,6 @@ pub(super) fn compute(
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// StochRSI D = SMA of K
|
||||
chain.stoch_rsi_d.bps.height.compute_rolling_average(
|
||||
starting_indexes.height,
|
||||
ws_sma,
|
||||
|
||||
@@ -2,7 +2,7 @@ use brk_traversable::Traversable;
|
||||
use brk_types::{BasisPoints16, BasisPoints32, StoredF32};
|
||||
use vecdb::{Rw, StorageMode};
|
||||
|
||||
use crate::internal::{PerBlock, PercentPerBlock, RatioPerBlock, Windows};
|
||||
use crate::internal::{PerBlock, PercentPerBlock, RatioPerBlock, WindowsTo1m};
|
||||
|
||||
#[derive(Traversable)]
|
||||
pub struct RsiChain<M: StorageMode = Rw> {
|
||||
@@ -29,12 +29,12 @@ pub struct MacdChain<M: StorageMode = Rw> {
|
||||
|
||||
#[derive(Traversable)]
|
||||
pub struct Vecs<M: StorageMode = Rw> {
|
||||
pub rsi: Windows<RsiChain<M>>,
|
||||
pub rsi: WindowsTo1m<RsiChain<M>>,
|
||||
|
||||
pub stoch_k: PercentPerBlock<BasisPoints16, M>,
|
||||
pub stoch_d: PercentPerBlock<BasisPoints16, M>,
|
||||
|
||||
pub pi_cycle: RatioPerBlock<BasisPoints32, M>,
|
||||
|
||||
pub macd: Windows<MacdChain<M>>,
|
||||
pub macd: WindowsTo1m<MacdChain<M>>,
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ impl Vecs {
|
||||
starting_indexes: &Indexes,
|
||||
exit: &Exit,
|
||||
) -> Result<()> {
|
||||
// count, versions, size are independent — parallelize
|
||||
let (r1, (r2, r3)) = rayon::join(
|
||||
|| self.count.compute(indexer, &blocks.lookback, starting_indexes, exit),
|
||||
|| {
|
||||
@@ -34,11 +33,9 @@ impl Vecs {
|
||||
r2?;
|
||||
r3?;
|
||||
|
||||
// Fees depends on size
|
||||
self.fees
|
||||
.compute(indexer, indexes, inputs, &self.size, starting_indexes, exit)?;
|
||||
.compute(indexer, indexes, &inputs.spent, &self.size, starting_indexes, exit)?;
|
||||
|
||||
// Volume depends on fees, counts, and blocks (lookback vecs, interval)
|
||||
self.volume.compute(
|
||||
indexer,
|
||||
indexes,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use brk_error::Result;
|
||||
use brk_indexer::Indexer;
|
||||
use brk_types::{FeeRate, Indexes, Sats};
|
||||
use vecdb::{Exit, unlikely};
|
||||
use vecdb::{AnyStoredVec, AnyVec, Exit, ReadableVec, VecIndex, WritableVec, unlikely};
|
||||
|
||||
use super::super::size;
|
||||
use super::Vecs;
|
||||
@@ -13,65 +13,105 @@ impl Vecs {
|
||||
&mut self,
|
||||
indexer: &Indexer,
|
||||
indexes: &indexes::Vecs,
|
||||
txins: &inputs::Vecs,
|
||||
spent: &inputs::SpentVecs,
|
||||
size_vecs: &size::Vecs,
|
||||
starting_indexes: &Indexes,
|
||||
exit: &Exit,
|
||||
) -> Result<()> {
|
||||
// input_value and output_value are independent — parallelize
|
||||
let (r1, r2) = rayon::join(
|
||||
|| {
|
||||
self.input_value.compute_sum_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txin_index,
|
||||
&indexes.tx_index.input_count,
|
||||
&txins.spent.value,
|
||||
exit,
|
||||
)
|
||||
},
|
||||
|| {
|
||||
self.output_value.compute_sum_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txout_index,
|
||||
&indexes.tx_index.output_count,
|
||||
&indexer.vecs.outputs.value,
|
||||
exit,
|
||||
)
|
||||
},
|
||||
);
|
||||
r1?;
|
||||
r2?;
|
||||
|
||||
self.fee.tx_index.compute_transform2(
|
||||
self.input_value.compute_sum_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&self.input_value,
|
||||
&self.output_value,
|
||||
|(i, input, output, ..)| {
|
||||
let fee = if unlikely(input.is_max()) {
|
||||
&indexer.vecs.transactions.first_txin_index,
|
||||
&indexes.tx_index.input_count,
|
||||
&spent.value,
|
||||
exit,
|
||||
)?;
|
||||
self.output_value.compute_sum_from_indexes(
|
||||
starting_indexes.tx_index,
|
||||
&indexer.vecs.transactions.first_txout_index,
|
||||
&indexes.tx_index.output_count,
|
||||
&indexer.vecs.outputs.value,
|
||||
exit,
|
||||
)?;
|
||||
|
||||
self.compute_fee_and_fee_rate(size_vecs, starting_indexes, exit)?;
|
||||
|
||||
let (r3, r4) = rayon::join(
|
||||
|| self.fee.derive_from_with_skip(indexer, indexes, starting_indexes, exit, 1),
|
||||
|| self.fee_rate.derive_from_with_skip(indexer, indexes, starting_indexes, exit, 1),
|
||||
);
|
||||
r3?;
|
||||
r4?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_fee_and_fee_rate(
|
||||
&mut self,
|
||||
size_vecs: &size::Vecs,
|
||||
starting_indexes: &Indexes,
|
||||
exit: &Exit,
|
||||
) -> Result<()> {
|
||||
let dep_version = self.input_value.version()
|
||||
+ self.output_value.version()
|
||||
+ size_vecs.vsize.tx_index.version();
|
||||
|
||||
self.fee
|
||||
.tx_index
|
||||
.validate_computed_version_or_reset(dep_version)?;
|
||||
self.fee_rate
|
||||
.tx_index
|
||||
.validate_computed_version_or_reset(dep_version)?;
|
||||
|
||||
let target = self
|
||||
.input_value
|
||||
.len()
|
||||
.min(self.output_value.len())
|
||||
.min(size_vecs.vsize.tx_index.len());
|
||||
let min = self
|
||||
.fee
|
||||
.tx_index
|
||||
.len()
|
||||
.min(self.fee_rate.tx_index.len())
|
||||
.min(starting_indexes.tx_index.to_usize());
|
||||
|
||||
if min >= target {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.fee.tx_index.truncate_if_needed(starting_indexes.tx_index)?;
|
||||
self.fee_rate.tx_index.truncate_if_needed(starting_indexes.tx_index)?;
|
||||
|
||||
loop {
|
||||
let skip = self.fee.tx_index.len();
|
||||
let end = self.fee.tx_index.batch_end(target);
|
||||
if skip >= end {
|
||||
break;
|
||||
}
|
||||
|
||||
let input_batch = self.input_value.collect_range_at(skip, end);
|
||||
let output_batch = self.output_value.collect_range_at(skip, end);
|
||||
let vsize_batch = size_vecs.vsize.tx_index.collect_range_at(skip, end);
|
||||
|
||||
for j in 0..input_batch.len() {
|
||||
let fee = if unlikely(input_batch[j].is_max()) {
|
||||
Sats::ZERO
|
||||
} else {
|
||||
input - output
|
||||
input_batch[j] - output_batch[j]
|
||||
};
|
||||
(i, fee)
|
||||
},
|
||||
exit,
|
||||
)?;
|
||||
self.fee.tx_index.push(fee);
|
||||
self.fee_rate
|
||||
.tx_index
|
||||
.push(FeeRate::from((fee, vsize_batch[j])));
|
||||
}
|
||||
|
||||
self.fee_rate.tx_index.compute_transform2(
|
||||
starting_indexes.tx_index,
|
||||
&self.fee.tx_index,
|
||||
&size_vecs.vsize.tx_index,
|
||||
|(tx_index, fee, vsize, ..)| (tx_index, FeeRate::from((fee, vsize))),
|
||||
exit,
|
||||
)?;
|
||||
|
||||
// Skip coinbase (first tx per block) since it has fee=0
|
||||
self.fee
|
||||
.derive_from_with_skip(indexer, indexes, starting_indexes, exit, 1)?;
|
||||
|
||||
// Skip coinbase (first tx per block) since it has no feerate
|
||||
self.fee_rate
|
||||
.derive_from_with_skip(indexer, indexes, starting_indexes, exit, 1)?;
|
||||
let _lock = exit.lock();
|
||||
let (r1, r2) = rayon::join(
|
||||
|| self.fee.tx_index.write(),
|
||||
|| self.fee_rate.tx_index.write(),
|
||||
);
|
||||
r1?;
|
||||
r2?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user