global: snapshot

This commit is contained in:
nym21
2026-02-27 23:00:43 +01:00
parent d5ec291579
commit 85c7933ad6
41 changed files with 534 additions and 583 deletions

View File

@@ -14,12 +14,49 @@ impl Vecs {
starting_indexes: &ComputeIndexes,
exit: &Exit,
) -> Result<()> {
self.time.timestamp.compute_first(
starting_indexes,
&indexer.vecs.blocks.timestamp,
indexes,
exit,
)?;
{
let ts = &mut self.time.timestamp;
macro_rules! period {
($field:ident) => {
ts.$field.compute_transform(
starting_indexes.$field,
&indexes.$field.first_height,
|(idx, _, _)| (idx, idx.to_timestamp()),
exit,
)?;
};
}
period!(minute1);
period!(minute5);
period!(minute10);
period!(minute30);
period!(hour1);
period!(hour4);
period!(hour12);
period!(day1);
period!(day3);
period!(week1);
period!(month1);
period!(month3);
period!(month6);
period!(year1);
period!(year10);
ts.halvingepoch.compute_indirect(
starting_indexes.halvingepoch,
&indexes.halvingepoch.first_height,
&indexer.vecs.blocks.timestamp,
exit,
)?;
ts.difficultyepoch.compute_indirect(
starting_indexes.difficultyepoch,
&indexes.difficultyepoch.first_height,
&indexer.vecs.blocks.timestamp,
exit,
)?;
}
self.count
.compute(indexer, &self.time, starting_indexes, exit)?;
self.interval

View File

@@ -204,7 +204,7 @@ pub(crate) fn process_blocks(
debug!("AddressCache created, entering main loop");
// Cache for day1 lookups - same day1 repeats ~140 times per day
let mut cached_day1 = Day1::default();
let mut cached_day1: Option<Day1> = None;
let mut cached_date_first_height = Height::ZERO;
let mut cached_date_height_count = StoredU64::default();
@@ -428,12 +428,12 @@ pub(crate) fn process_blocks(
// avoiding redundant PcoVec page decompressions.
let date = height_to_date_vec[offset];
let day1 = Day1::try_from(date).unwrap();
let (date_first_height, date_height_count) = if day1 == cached_day1 {
let (date_first_height, date_height_count) = if cached_day1 == Some(day1) {
(cached_date_first_height, cached_date_height_count)
} else {
let fh: Height = day1_to_first_height.collect_one(day1).unwrap();
let hc = day1_to_height_count.collect_one(day1).unwrap();
cached_day1 = day1;
cached_day1 = Some(day1);
cached_date_first_height = fh;
cached_date_height_count = hc;
(fh, hc)

View File

@@ -13,7 +13,8 @@ use brk_types::{
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use vecdb::{
Database, EagerVec, Exit, ImportableVec, PcoVec, ReadableVec, Rw, StorageMode, VecIndex,
AnyVec, Database, EagerVec, Exit, ImportableVec, PcoVec, ReadableVec, Rw, StorageMode,
VecIndex, WritableVec,
};
use crate::{
@@ -59,7 +60,7 @@ where
macro_rules! period {
($idx:ident) => {
ImportableVec::forced_import(db, &format!("{name}_{}", stringify!($idx)), v)?
ImportableVec::forced_import(db, name, v)?
};
}
@@ -76,15 +77,10 @@ where
) -> Result<()> {
macro_rules! period {
($field:ident) => {
self.0.$field.compute_transform(
self.0.$field.compute_indirect(
starting_indexes.$field,
&indexes.$field.first_height,
|(idx, first_h, _)| {
let v = height_source
.collect_one(first_h)
.unwrap_or_else(|| T::from(0_usize));
(idx, v)
},
height_source,
exit,
)?;
};
@@ -122,25 +118,17 @@ where
let src_len = height_source.len();
macro_rules! period {
($field:ident) => {{
let fh = &indexes.$field.first_height;
self.0.$field.compute_transform(
($field:ident) => {
compute_period_extremum(
&mut self.0.$field,
starting_indexes.$field,
fh,
|(idx, first_h, _)| {
let end_h = Height::from(
fh.collect_one_at(idx.to_usize() + 1)
.map(|h: Height| h.to_usize())
.unwrap_or(src_len),
);
let v = height_source
.max(first_h, end_h)
.unwrap_or_else(|| T::from(0_usize));
(idx, v)
},
&indexes.$field.first_height,
height_source,
src_len,
T::max,
exit,
)?;
}};
};
}
period!(minute1);
@@ -175,25 +163,17 @@ where
let src_len = height_source.len();
macro_rules! period {
($field:ident) => {{
let fh = &indexes.$field.first_height;
self.0.$field.compute_transform(
($field:ident) => {
compute_period_extremum(
&mut self.0.$field,
starting_indexes.$field,
fh,
|(idx, first_h, _)| {
let end_h = Height::from(
fh.collect_one_at(idx.to_usize() + 1)
.map(|h: Height| h.to_usize())
.unwrap_or(src_len),
);
let v = height_source
.min(first_h, end_h)
.unwrap_or_else(|| T::from(0_usize));
(idx, v)
},
&indexes.$field.first_height,
height_source,
src_len,
T::min,
exit,
)?;
}};
};
}
period!(minute1);
@@ -217,3 +197,60 @@ where
Ok(())
}
}
/// Compute per-period extremum (max or min) of height_source values.
///
/// Each period's range is `[fh[i]..fh[i+1])` of height_source.
/// Uses a cursor on height_source so each page is decompressed at most once.
fn compute_period_extremum<I: VecIndex, T: ComputedVecValue + JsonSchema>(
out: &mut EagerVec<PcoVec<I, T>>,
starting_index: I,
fh: &impl ReadableVec<I, Height>,
height_source: &impl ReadableVec<Height, T>,
src_len: usize,
better: fn(T, T) -> T,
exit: &Exit,
) -> Result<()> {
out.validate_and_truncate(fh.version() + height_source.version(), starting_index)?;
let mut cursor = height_source.cursor();
Ok(out.repeat_until_complete(exit, |this| {
let skip = this.len();
let end = fh.len();
if skip >= end {
return Ok(());
}
let fh_batch: Vec<Height> = fh.collect_range_at(skip, (end + 1).min(fh.len()));
if cursor.position() < fh_batch[0].to_usize() {
cursor.advance(fh_batch[0].to_usize() - cursor.position());
}
for j in 0..(end - skip) {
let first_h = fh_batch[j].to_usize();
let end_h = fh_batch.get(j + 1).map_or(src_len, |h| h.to_usize());
if cursor.position() < first_h {
cursor.advance(first_h - cursor.position());
}
let range_len = end_h.saturating_sub(first_h);
let v = if range_len > 0 {
cursor
.fold(range_len, None, |acc, b| {
Some(match acc {
Some(a) => better(a, b),
None => b,
})
})
.unwrap_or_else(|| T::from(0_usize))
} else {
T::from(0_usize)
};
this.checked_push_at(skip + j, v)?;
}
Ok(())
})?)
}

View File

@@ -59,7 +59,7 @@ where
macro_rules! period {
($idx:ident) => {
LazyVecFrom1::transformed::<Transform>(
&format!("{name}_{}", stringify!($idx)),
name,
version,
source.$idx.read_only_boxed_clone(),
)

View File

@@ -65,10 +65,17 @@ impl Computer {
let i = Instant::now();
let (indexes, positions) = thread::scope(|s| -> Result<_> {
let positions_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(positions::Vecs::forced_import(&computed_path, VERSION)?))
Ok(Box::new(positions::Vecs::forced_import(
&computed_path,
VERSION,
)?))
})?;
let indexes = Box::new(indexes::Vecs::forced_import(&computed_path, VERSION, indexer)?);
let indexes = Box::new(indexes::Vecs::forced_import(
&computed_path,
VERSION,
indexer,
)?);
let positions = positions_handle.join().unwrap()?;
Ok((indexes, positions))
@@ -79,11 +86,19 @@ impl Computer {
let i = Instant::now();
let (inputs, outputs) = thread::scope(|s| -> Result<_> {
let inputs_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(inputs::Vecs::forced_import(&computed_path, VERSION, &indexes)?))
Ok(Box::new(inputs::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
})?;
let outputs_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(outputs::Vecs::forced_import(&computed_path, VERSION, &indexes)?))
Ok(Box::new(outputs::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
})?;
let inputs = inputs_handle.join().unwrap()?;
@@ -96,7 +111,11 @@ impl Computer {
let i = Instant::now();
let constants = Box::new(constants::Vecs::new(VERSION, &indexes));
// Price must be created before market since market's lazy vecs reference price
let prices = Box::new(prices::Vecs::forced_import(&computed_path, VERSION, &indexes)?);
let prices = Box::new(prices::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?);
info!("Imported price/constants in {:?}", i.elapsed());
let i = Instant::now();
@@ -104,12 +123,21 @@ impl Computer {
thread::scope(|s| -> Result<_> {
// Import blocks module (no longer needs prices)
let blocks_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(blocks::Vecs::forced_import(&computed_path, VERSION, indexer, &indexes)?))
Ok(Box::new(blocks::Vecs::forced_import(
&computed_path,
VERSION,
indexer,
&indexes,
)?))
})?;
// Import mining module (separate database)
let mining_handle = big_thread().spawn_scoped(s, || -> Result<_> {
Ok(Box::new(mining::Vecs::forced_import(&computed_path, VERSION, &indexes)?))
Ok(Box::new(mining::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?))
})?;
// Import transactions module
@@ -130,9 +158,11 @@ impl Computer {
)?))
})?;
let cointime = Box::new(
cointime::Vecs::forced_import(&computed_path, VERSION, &indexes)?
);
let cointime = Box::new(cointime::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?);
let blocks = blocks_handle.join().unwrap()?;
let mining = mining_handle.join().unwrap()?;
@@ -154,16 +184,21 @@ impl Computer {
// Threads inside
let i = Instant::now();
let distribution = Box::new(
distribution::Vecs::forced_import(&computed_path, VERSION, &indexes)?
);
let distribution = Box::new(distribution::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
)?);
info!("Imported distribution in {:?}", i.elapsed());
// Supply must be imported after distribution (references distribution's supply)
let i = Instant::now();
let supply = Box::new(
supply::Vecs::forced_import(&computed_path, VERSION, &indexes, &distribution)?
);
let supply = Box::new(supply::Vecs::forced_import(
&computed_path,
VERSION,
&indexes,
&distribution,
)?);
info!("Imported supply in {:?}", i.elapsed());
let i = Instant::now();
@@ -286,14 +321,25 @@ impl Computer {
// Inputs → scripts → outputs (sequential)
info!("Computing inputs...");
let i = Instant::now();
self.inputs
.compute(indexer, &self.indexes, &self.blocks, &starting_indexes, exit)?;
self.inputs.compute(
indexer,
&self.indexes,
&self.blocks,
&starting_indexes,
exit,
)?;
info!("Computed inputs in {:?}", i.elapsed());
info!("Computing scripts...");
let i = Instant::now();
self.scripts
.compute(indexer, &self.blocks, &self.outputs, &self.prices, &starting_indexes, exit)?;
self.scripts.compute(
indexer,
&self.blocks,
&self.outputs,
&self.prices,
&starting_indexes,
exit,
)?;
info!("Computed scripts in {:?}", i.elapsed());
info!("Computing outputs...");

View File

@@ -37,7 +37,6 @@ impl Vecs {
&self.split.high.cents,
&self.split.low.cents,
&self.split.close.cents,
indexes,
exit,
)?;

View File

@@ -10,18 +10,18 @@ use schemars::JsonSchema;
use serde::Serialize;
use vecdb::{
BytesVec, BytesVecValue, Database, EagerVec, Exit, Formattable, ImportableVec, LazyVecFrom1,
ReadableCloneableVec, ReadableVec, Rw, StorageMode, UnaryTransform,
ReadableCloneableVec, Rw, StorageMode, UnaryTransform,
};
use crate::{
ComputeIndexes, indexes, indexes_from,
ComputeIndexes, indexes_from,
internal::{ComputedHeightDerivedLast, EagerIndexes, Indexes},
};
// ── EagerOhlcIndexes ─────────────────────────────────────────────────
#[derive(Deref, DerefMut, Traversable)]
#[traversable(transparent)]
#[traversable(merge)]
pub struct OhlcVecs<T, M: StorageMode = Rw>(
#[allow(clippy::type_complexity)]
pub Indexes<
@@ -58,7 +58,7 @@ where
macro_rules! period {
($idx:ident) => {
ImportableVec::forced_import(db, &format!("{name}_{}", stringify!($idx)), v)?
ImportableVec::forced_import(db, name, v)?
};
}
@@ -67,7 +67,6 @@ where
}
impl OhlcVecs<OHLCCents> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn compute_from_split(
&mut self,
starting_indexes: &ComputeIndexes,
@@ -75,26 +74,24 @@ impl OhlcVecs<OHLCCents> {
high: &EagerIndexes<Cents>,
low: &EagerIndexes<Cents>,
close: &ComputedHeightDerivedLast<Cents>,
indexes: &indexes::Vecs,
exit: &Exit,
) -> Result<()> {
macro_rules! period {
($field:ident) => {
self.0.$field.compute_transform(
self.0.$field.compute_transform4(
starting_indexes.$field,
&indexes.$field.first_height,
|(idx, _first_h, _)| {
let o = open.$field.collect_one(idx).unwrap_or_default();
let h = high.$field.collect_one(idx).unwrap_or_default();
let l = low.$field.collect_one(idx).unwrap_or_default();
let c = close.$field.collect_one(idx).flatten().unwrap_or_default();
&open.$field,
&high.$field,
&low.$field,
&close.$field,
|(idx, o, h, l, c, _)| {
(
idx,
OHLCCents {
open: Open::new(o),
high: High::new(h),
low: Low::new(l),
close: Close::new(c),
close: Close::new(c.unwrap_or_default()),
},
)
},
@@ -105,14 +102,13 @@ impl OhlcVecs<OHLCCents> {
macro_rules! epoch {
($field:ident) => {
self.0.$field.compute_transform(
self.0.$field.compute_transform4(
starting_indexes.$field,
&indexes.$field.first_height,
|(idx, _first_h, _)| {
let o = open.$field.collect_one(idx).unwrap_or_default();
let h = high.$field.collect_one(idx).unwrap_or_default();
let l = low.$field.collect_one(idx).unwrap_or_default();
let c = close.$field.collect_one(idx).unwrap_or_default();
&open.$field,
&high.$field,
&low.$field,
&close.$field,
|(idx, o, h, l, c, _)| {
(
idx,
OHLCCents {
@@ -153,7 +149,7 @@ impl OhlcVecs<OHLCCents> {
// ── LazyOhlcIndexes ──────────────────────────────────────────────────
#[derive(Clone, Deref, DerefMut, Traversable)]
#[traversable(transparent)]
#[traversable(merge)]
pub struct LazyOhlcVecs<T, S>(
#[allow(clippy::type_complexity)]
pub Indexes<
@@ -193,7 +189,7 @@ where
macro_rules! period {
($idx:ident) => {
LazyVecFrom1::transformed::<Transform>(
&format!("{name}_{}", stringify!($idx)),
name,
version,
source.$idx.read_only_boxed_clone(),
)