computer: snapshot

This commit is contained in:
nym21
2025-12-18 18:13:48 +01:00
parent c5657b9c31
commit 08d17b4a09
23 changed files with 95 additions and 81 deletions

1
Cargo.lock generated
View File

@@ -5390,7 +5390,6 @@ dependencies = [
"parking_lot",
"pco",
"rawdb",
"rayon",
"serde",
"serde_json",
"thiserror 2.0.17",

View File

@@ -13,7 +13,7 @@ Single dependency to access any BRK component. Enable only what you need via fea
brk = { version = "0.x", features = ["query", "types"] }
```
```rust
```rust,ignore
use brk::query::Query;
use brk::types::Height;
```
@@ -41,16 +41,3 @@ use brk::types::Height;
| `store` | `brk_store` | Key-value storage |
| `traversable` | `brk_traversable` | Data traversal |
| `types` | `brk_types` | Domain types |
## Common Combinations
```toml
# Query-only (read computed data)
brk = { features = ["query", "types"] }
# Full indexing pipeline
brk = { features = ["indexer", "computer", "query"] }
# API server
brk = { features = ["server", "query", "mempool"] }
```

View File

@@ -16,7 +16,7 @@ Bundle and minify JavaScript modules using Rolldown, with file watching for deve
## Core API
```rust
```rust,ignore
// One-shot build
let dist = bundle(modules_path, websites_path, "src", false).await?;

View File

@@ -18,7 +18,7 @@ Compute 1000+ on-chain metrics from indexed blockchain data: supply breakdowns,
## Core API
```rust
```rust,ignore
let mut computer = Computer::forced_import(&outputs_path, &indexer, fetcher)?;
// Compute all metrics for new blocks

View File

@@ -82,10 +82,10 @@ impl PricePercentiles {
}
impl PricePercentiles {
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
pub fn write(&mut self) -> Result<()> {
for vec in self.vecs.iter_mut().flatten() {
if let Some(dateindex_vec) = vec.dateindex.as_mut() {
dateindex_vec.safe_write(exit)?;
dateindex_vec.write()?;
}
}
Ok(())

View File

@@ -5,7 +5,10 @@ use brk_grouper::ByAddressType;
use brk_traversable::Traversable;
use brk_types::{Height, StoredU64, Version};
use derive_deref::{Deref, DerefMut};
use vecdb::{Database, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec, TypedVecIterator};
use vecdb::{
AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec,
TypedVecIterator,
};
use crate::{
Indexes,
@@ -61,8 +64,19 @@ impl AddressTypeToHeightToAddressCount {
})?))
}
pub fn write(&mut self) -> Result<()> {
self.p2pk65.write()?;
self.p2pk33.write()?;
self.p2pkh.write()?;
self.p2sh.write()?;
self.p2wpkh.write()?;
self.p2wsh.write()?;
self.p2tr.write()?;
self.p2a.write()?;
Ok(())
}
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
use vecdb::AnyStoredVec;
self.p2pk65.safe_write(exit)?;
self.p2pk33.safe_write(exit)?;
self.p2pkh.safe_write(exit)?;

View File

@@ -224,9 +224,9 @@ impl DynCohortVecs for AddressCohortVecs {
Ok(())
}
fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.height_to_addr_count.safe_write(exit)?;
self.metrics.safe_write(exit)?;
fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
self.height_to_addr_count.write()?;
self.metrics.write()?;
if let Some(state) = self.state.as_mut() {
state.inner.commit(height)?;

View File

@@ -223,9 +223,9 @@ impl AddressCohorts {
}
/// Write stateful vectors for separate cohorts.
pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
pub fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_write_stateful_vecs(height, exit))
.try_for_each(|v| v.write_stateful_vecs(height))
}
/// Get minimum height from all separate cohorts' height-indexed vectors.

View File

@@ -35,7 +35,7 @@ pub trait DynCohortVecs: Send + Sync {
) -> Result<()>;
/// Write stateful vectors to disk.
fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>;
fn write_stateful_vecs(&mut self, height: Height) -> Result<()>;
/// First phase of post-processing computations.
#[allow(clippy::too_many_arguments)]

View File

@@ -189,8 +189,8 @@ impl DynCohortVecs for UTXOCohortVecs {
Ok(())
}
fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
self.metrics.safe_write(exit)?;
fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
self.metrics.write()?;
if let Some(state) = self.state.as_mut() {
state.commit(height)?;

View File

@@ -350,15 +350,15 @@ impl UTXOCohorts {
}
/// Write stateful vectors for separate and aggregate cohorts.
pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> {
pub fn write_stateful_vecs(&mut self, height: Height) -> Result<()> {
// Flush separate cohorts (includes metrics + state)
self.par_iter_separate_mut()
.try_for_each(|v| v.safe_write_stateful_vecs(height, exit))?;
.try_for_each(|v| v.write_stateful_vecs(height))?;
// Write aggregate cohorts' metrics (including price_percentiles)
// Note: aggregate cohorts no longer maintain price_to_amount state
for v in self.0.iter_aggregate_mut() {
v.metrics.safe_write(exit)?;
v.metrics.write()?;
}
Ok(())
}

View File

@@ -473,8 +473,6 @@ pub fn process_blocks(
&& height != Height::ZERO
&& height.to_usize() % FLUSH_INTERVAL == 0
{
let _lock = exit.lock();
// Drop readers to release mmap handles
drop(vr);
@@ -488,8 +486,10 @@ pub fn process_blocks(
loaded_updates,
)?;
let _lock = exit.lock();
// Write to disk (pure I/O) - no changes saved for periodic flushes
write(vecs, height, chain_state, false, exit)?;
write(vecs, height, chain_state, false)?;
// Recreate readers
vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
@@ -512,7 +512,7 @@ pub fn process_blocks(
)?;
// Write to disk (pure I/O) - save changes for rollback
write(vecs, last_height, chain_state, true, exit)?;
write(vecs, last_height, chain_state, true)?;
}
Ok(())

View File

@@ -4,9 +4,12 @@
//! - `process_address_updates`: applies cached address changes to storage
//! - `flush`: writes all data to disk
use std::time::Instant;
use brk_error::Result;
use brk_types::Height;
use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp};
use log::info;
use vecdb::{AnyStoredVec, GenericStoredVec, Stamp};
use crate::stateful::{
Vecs,
@@ -61,19 +64,19 @@ pub fn write(
height: Height,
chain_state: &[BlockState],
with_changes: bool,
exit: &Exit,
) -> Result<()> {
info!("Writing to disk...");
let i = Instant::now();
// Flush cohort states (separate + aggregate)
vecs.utxo_cohorts.safe_write_stateful_vecs(height, exit)?;
vecs.address_cohorts
.safe_write_stateful_vecs(height, exit)?;
vecs.utxo_cohorts.write_stateful_vecs(height)?;
vecs.address_cohorts.write_stateful_vecs(height)?;
// Flush height-indexed vectors
vecs.height_to_unspendable_supply.safe_write(exit)?;
vecs.height_to_opreturn_supply.safe_write(exit)?;
vecs.addresstype_to_height_to_addr_count.safe_write(exit)?;
vecs.addresstype_to_height_to_empty_addr_count
.safe_write(exit)?;
vecs.height_to_unspendable_supply.write()?;
vecs.height_to_opreturn_supply.write()?;
vecs.addresstype_to_height_to_addr_count.write()?;
vecs.addresstype_to_height_to_empty_addr_count.write()?;
// Flush large vecs in parallel
let stamp = Stamp::from(height);
@@ -100,5 +103,7 @@ pub fn write(
vecs.chain_state
.stamped_write_maybe_with_changes(stamp, with_changes)?;
info!("Wrote in {:?}", i.elapsed());
Ok(())
}

View File

@@ -114,10 +114,10 @@ impl ActivityMetrics {
}
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_sent.safe_write(exit)?;
self.height_to_satblocks_destroyed.safe_write(exit)?;
self.height_to_satdays_destroyed.safe_write(exit)?;
pub fn write(&mut self) -> Result<()> {
self.height_to_sent.write()?;
self.height_to_satblocks_destroyed.write()?;
self.height_to_satdays_destroyed.write()?;
Ok(())
}

View File

@@ -106,20 +106,20 @@ impl CohortMetrics {
}
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.supply.safe_write(exit)?;
self.activity.safe_write(exit)?;
pub fn write(&mut self) -> Result<()> {
self.supply.write()?;
self.activity.write()?;
if let Some(realized) = self.realized.as_mut() {
realized.safe_write(exit)?;
realized.write()?;
}
if let Some(unrealized) = self.unrealized.as_mut() {
unrealized.safe_write(exit)?;
unrealized.write()?;
}
if let Some(price_paid) = self.price_paid.as_mut() {
price_paid.safe_write(exit)?;
price_paid.write()?;
}
Ok(())

View File

@@ -112,11 +112,11 @@ impl PricePaidMetrics {
}
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_min_price_paid.safe_write(exit)?;
self.height_to_max_price_paid.safe_write(exit)?;
pub fn write(&mut self) -> Result<()> {
self.height_to_min_price_paid.write()?;
self.height_to_max_price_paid.write()?;
if let Some(price_percentiles) = self.price_percentiles.as_mut() {
price_percentiles.safe_write(exit)?;
price_percentiles.write()?;
}
Ok(())
}

View File

@@ -433,17 +433,17 @@ impl RealizedMetrics {
}
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_realized_cap.safe_write(exit)?;
self.height_to_realized_profit.safe_write(exit)?;
self.height_to_realized_loss.safe_write(exit)?;
self.height_to_value_created.safe_write(exit)?;
self.height_to_value_destroyed.safe_write(exit)?;
pub fn write(&mut self) -> Result<()> {
self.height_to_realized_cap.write()?;
self.height_to_realized_profit.write()?;
self.height_to_realized_loss.write()?;
self.height_to_value_created.write()?;
self.height_to_value_destroyed.write()?;
if let Some(v) = self.height_to_adjusted_value_created.as_mut() {
v.safe_write(exit)?;
v.write()?;
}
if let Some(v) = self.height_to_adjusted_value_destroyed.as_mut() {
v.safe_write(exit)?;
v.write()?;
}
Ok(())
}

View File

@@ -131,9 +131,9 @@ impl SupplyMetrics {
}
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_supply.safe_write(exit)?;
self.height_to_utxo_count.safe_write(exit)?;
pub fn write(&mut self) -> Result<()> {
self.height_to_supply.write()?;
self.height_to_utxo_count.write()?;
Ok(())
}

View File

@@ -219,15 +219,15 @@ impl UnrealizedMetrics {
}
/// Write height-indexed vectors to disk.
pub fn safe_write(&mut self, exit: &Exit) -> Result<()> {
self.height_to_supply_in_profit.safe_write(exit)?;
self.height_to_supply_in_loss.safe_write(exit)?;
self.height_to_unrealized_profit.safe_write(exit)?;
self.height_to_unrealized_loss.safe_write(exit)?;
self.dateindex_to_supply_in_profit.safe_write(exit)?;
self.dateindex_to_supply_in_loss.safe_write(exit)?;
self.dateindex_to_unrealized_profit.safe_write(exit)?;
self.dateindex_to_unrealized_loss.safe_write(exit)?;
pub fn write(&mut self) -> Result<()> {
self.height_to_supply_in_profit.write()?;
self.height_to_supply_in_loss.write()?;
self.height_to_unrealized_profit.write()?;
self.height_to_unrealized_loss.write()?;
self.dateindex_to_supply_in_profit.write()?;
self.dateindex_to_supply_in_loss.write()?;
self.dateindex_to_unrealized_profit.write()?;
self.dateindex_to_unrealized_loss.write()?;
Ok(())
}

View File

@@ -16,7 +16,7 @@ Fetch OHLC (Open/High/Low/Close) price data from Binance, Kraken, or BRK's own A
## Core API
```rust
```rust,ignore
let mut fetcher = Fetcher::import(true, Some(&hars_path))?;
// Daily price

View File

@@ -51,6 +51,15 @@ let blockhash = indexer.vecs.block.height_to_blockhash.get(height)?;
5. **Finalize**: Sequential store updates, UTXO set mutations
6. **Commit**: Periodic flush to disk
## Performance
| Machine | Time | Disk | Peak Disk | Memory | Peak Memory |
|---------|------|------|-----------|--------|-------------|
| MBP M3 Pro (36GB, internal SSD) | 3.1h | 233 GB | 307 GB | 5.5 GB | 11 GB |
| Mac Mini M4 (16GB, external SSD) | 4.9h | 233 GB | 303 GB | 5.4 GB | 11 GB |
Full benchmark data: [`/benches/brk_indexer`](/benches/brk_indexer)
## Built On
- `brk_iterator` for block iteration