diff --git a/Cargo.lock b/Cargo.lock index ecba31013..d0afeec89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5390,7 +5390,6 @@ dependencies = [ "parking_lot", "pco", "rawdb", - "rayon", "serde", "serde_json", "thiserror 2.0.17", diff --git a/crates/brk/README.md b/crates/brk/README.md index 547f174ea..336240bd9 100644 --- a/crates/brk/README.md +++ b/crates/brk/README.md @@ -13,7 +13,7 @@ Single dependency to access any BRK component. Enable only what you need via fea brk = { version = "0.x", features = ["query", "types"] } ``` -```rust +```rust,ignore use brk::query::Query; use brk::types::Height; ``` @@ -41,16 +41,3 @@ use brk::types::Height; | `store` | `brk_store` | Key-value storage | | `traversable` | `brk_traversable` | Data traversal | | `types` | `brk_types` | Domain types | - -## Common Combinations - -```toml -# Query-only (read computed data) -brk = { features = ["query", "types"] } - -# Full indexing pipeline -brk = { features = ["indexer", "computer", "query"] } - -# API server -brk = { features = ["server", "query", "mempool"] } -``` diff --git a/crates/brk_bundler/README.md b/crates/brk_bundler/README.md index 5de296be2..23f41bb0c 100644 --- a/crates/brk_bundler/README.md +++ b/crates/brk_bundler/README.md @@ -16,7 +16,7 @@ Bundle and minify JavaScript modules using Rolldown, with file watching for deve ## Core API -```rust +```rust,ignore // One-shot build let dist = bundle(modules_path, websites_path, "src", false).await?; diff --git a/crates/brk_computer/README.md b/crates/brk_computer/README.md index 15e980ebf..6d84b9f77 100644 --- a/crates/brk_computer/README.md +++ b/crates/brk_computer/README.md @@ -18,7 +18,7 @@ Compute 1000+ on-chain metrics from indexed blockchain data: supply breakdowns, ## Core API -```rust +```rust,ignore let mut computer = Computer::forced_import(&outputs_path, &indexer, fetcher)?; // Compute all metrics for new blocks diff --git a/crates/brk_computer/src/grouped/price_percentiles.rs b/crates/brk_computer/src/grouped/price_percentiles.rs index 2e052a4f4..0a6b2f4d3 100644 --- a/crates/brk_computer/src/grouped/price_percentiles.rs +++ b/crates/brk_computer/src/grouped/price_percentiles.rs @@ -82,10 +82,10 @@ impl PricePercentiles { } impl PricePercentiles { - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { + pub fn write(&mut self) -> Result<()> { for vec in self.vecs.iter_mut().flatten() { if let Some(dateindex_vec) = vec.dateindex.as_mut() { - dateindex_vec.safe_write(exit)?; + dateindex_vec.write()?; } } Ok(()) diff --git a/crates/brk_computer/src/stateful/address/address_count.rs b/crates/brk_computer/src/stateful/address/address_count.rs index 4cb7f291a..eeeb63616 100644 --- a/crates/brk_computer/src/stateful/address/address_count.rs +++ b/crates/brk_computer/src/stateful/address/address_count.rs @@ -5,7 +5,10 @@ use brk_grouper::ByAddressType; use brk_traversable::Traversable; use brk_types::{Height, StoredU64, Version}; use derive_deref::{Deref, DerefMut}; -use vecdb::{Database, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec, TypedVecIterator}; +use vecdb::{ + AnyStoredVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec, PcoVec, + TypedVecIterator, +}; use crate::{ Indexes, @@ -61,8 +64,19 @@ impl AddressTypeToHeightToAddressCount { })?)) } + pub fn write(&mut self) -> Result<()> { + self.p2pk65.write()?; + self.p2pk33.write()?; + self.p2pkh.write()?; + self.p2sh.write()?; + self.p2wpkh.write()?; + self.p2wsh.write()?; + self.p2tr.write()?; + self.p2a.write()?; + Ok(()) + } + pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - use vecdb::AnyStoredVec; self.p2pk65.safe_write(exit)?; self.p2pk33.safe_write(exit)?; self.p2pkh.safe_write(exit)?; diff --git a/crates/brk_computer/src/stateful/cohorts/address.rs b/crates/brk_computer/src/stateful/cohorts/address.rs index 639a24a25..1da0f4a22 100644 --- a/crates/brk_computer/src/stateful/cohorts/address.rs +++ b/crates/brk_computer/src/stateful/cohorts/address.rs @@ -224,9 +224,9 @@ impl DynCohortVecs for AddressCohortVecs { Ok(()) } - fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { - self.height_to_addr_count.safe_write(exit)?; - self.metrics.safe_write(exit)?; + fn write_stateful_vecs(&mut self, height: Height) -> Result<()> { + self.height_to_addr_count.write()?; + self.metrics.write()?; if let Some(state) = self.state.as_mut() { state.inner.commit(height)?; diff --git a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs index a89dbb3b3..3bf48ddde 100644 --- a/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs +++ b/crates/brk_computer/src/stateful/cohorts/address_cohorts.rs @@ -223,9 +223,9 @@ impl AddressCohorts { } /// Write stateful vectors for separate cohorts. - pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + pub fn write_stateful_vecs(&mut self, height: Height) -> Result<()> { self.par_iter_separate_mut() - .try_for_each(|v| v.safe_write_stateful_vecs(height, exit)) + .try_for_each(|v| v.write_stateful_vecs(height)) } /// Get minimum height from all separate cohorts' height-indexed vectors. diff --git a/crates/brk_computer/src/stateful/cohorts/traits.rs b/crates/brk_computer/src/stateful/cohorts/traits.rs index 81e8de277..6b1277567 100644 --- a/crates/brk_computer/src/stateful/cohorts/traits.rs +++ b/crates/brk_computer/src/stateful/cohorts/traits.rs @@ -35,7 +35,7 @@ pub trait DynCohortVecs: Send + Sync { ) -> Result<()>; /// Write stateful vectors to disk. - fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()>; + fn write_stateful_vecs(&mut self, height: Height) -> Result<()>; /// First phase of post-processing computations. #[allow(clippy::too_many_arguments)] diff --git a/crates/brk_computer/src/stateful/cohorts/utxo.rs b/crates/brk_computer/src/stateful/cohorts/utxo.rs index 7ebaa5092..4aed89d60 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo.rs @@ -189,8 +189,8 @@ impl DynCohortVecs for UTXOCohortVecs { Ok(()) } - fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { - self.metrics.safe_write(exit)?; + fn write_stateful_vecs(&mut self, height: Height) -> Result<()> { + self.metrics.write()?; if let Some(state) = self.state.as_mut() { state.commit(height)?; diff --git a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs index 992c15ccf..76f9f6114 100644 --- a/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs +++ b/crates/brk_computer/src/stateful/cohorts/utxo_cohorts/mod.rs @@ -350,15 +350,15 @@ impl UTXOCohorts { } /// Write stateful vectors for separate and aggregate cohorts. - pub fn safe_write_stateful_vecs(&mut self, height: Height, exit: &Exit) -> Result<()> { + pub fn write_stateful_vecs(&mut self, height: Height) -> Result<()> { // Flush separate cohorts (includes metrics + state) self.par_iter_separate_mut() - .try_for_each(|v| v.safe_write_stateful_vecs(height, exit))?; + .try_for_each(|v| v.write_stateful_vecs(height))?; // Write aggregate cohorts' metrics (including price_percentiles) // Note: aggregate cohorts no longer maintain price_to_amount state for v in self.0.iter_aggregate_mut() { - v.metrics.safe_write(exit)?; + v.metrics.write()?; } Ok(()) } diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index a07504a85..03ab134fe 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -473,8 +473,6 @@ pub fn process_blocks( && height != Height::ZERO && height.to_usize() % FLUSH_INTERVAL == 0 { - let _lock = exit.lock(); - // Drop readers to release mmap handles drop(vr); @@ -488,8 +486,10 @@ pub fn process_blocks( loaded_updates, )?; + let _lock = exit.lock(); + // Write to disk (pure I/O) - no changes saved for periodic flushes - write(vecs, height, chain_state, false, exit)?; + write(vecs, height, chain_state, false)?; // Recreate readers vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); @@ -512,7 +512,7 @@ pub fn process_blocks( )?; // Write to disk (pure I/O) - save changes for rollback - write(vecs, last_height, chain_state, true, exit)?; + write(vecs, last_height, chain_state, true)?; } Ok(()) diff --git a/crates/brk_computer/src/stateful/compute/write.rs b/crates/brk_computer/src/stateful/compute/write.rs index b0078ed92..f29dffe9b 100644 --- a/crates/brk_computer/src/stateful/compute/write.rs +++ b/crates/brk_computer/src/stateful/compute/write.rs @@ -4,9 +4,12 @@ //! - `process_address_updates`: applies cached address changes to storage //! - `flush`: writes all data to disk +use std::time::Instant; + use brk_error::Result; use brk_types::Height; -use vecdb::{AnyStoredVec, Exit, GenericStoredVec, Stamp}; +use log::info; +use vecdb::{AnyStoredVec, GenericStoredVec, Stamp}; use crate::stateful::{ Vecs, @@ -61,19 +64,19 @@ pub fn write( height: Height, chain_state: &[BlockState], with_changes: bool, - exit: &Exit, ) -> Result<()> { + info!("Writing to disk..."); + let i = Instant::now(); + // Flush cohort states (separate + aggregate) - vecs.utxo_cohorts.safe_write_stateful_vecs(height, exit)?; - vecs.address_cohorts - .safe_write_stateful_vecs(height, exit)?; + vecs.utxo_cohorts.write_stateful_vecs(height)?; + vecs.address_cohorts.write_stateful_vecs(height)?; // Flush height-indexed vectors - vecs.height_to_unspendable_supply.safe_write(exit)?; - vecs.height_to_opreturn_supply.safe_write(exit)?; - vecs.addresstype_to_height_to_addr_count.safe_write(exit)?; - vecs.addresstype_to_height_to_empty_addr_count - .safe_write(exit)?; + vecs.height_to_unspendable_supply.write()?; + vecs.height_to_opreturn_supply.write()?; + vecs.addresstype_to_height_to_addr_count.write()?; + vecs.addresstype_to_height_to_empty_addr_count.write()?; // Flush large vecs in parallel let stamp = Stamp::from(height); @@ -100,5 +103,7 @@ pub fn write( vecs.chain_state .stamped_write_maybe_with_changes(stamp, with_changes)?; + info!("Wrote in {:?}", i.elapsed()); + Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/activity.rs b/crates/brk_computer/src/stateful/metrics/activity.rs index 0aa30537a..7a533e17a 100644 --- a/crates/brk_computer/src/stateful/metrics/activity.rs +++ b/crates/brk_computer/src/stateful/metrics/activity.rs @@ -114,10 +114,10 @@ impl ActivityMetrics { } /// Write height-indexed vectors to disk. - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - self.height_to_sent.safe_write(exit)?; - self.height_to_satblocks_destroyed.safe_write(exit)?; - self.height_to_satdays_destroyed.safe_write(exit)?; + pub fn write(&mut self) -> Result<()> { + self.height_to_sent.write()?; + self.height_to_satblocks_destroyed.write()?; + self.height_to_satdays_destroyed.write()?; Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/mod.rs b/crates/brk_computer/src/stateful/metrics/mod.rs index b30a9eef2..00795d2b1 100644 --- a/crates/brk_computer/src/stateful/metrics/mod.rs +++ b/crates/brk_computer/src/stateful/metrics/mod.rs @@ -106,20 +106,20 @@ impl CohortMetrics { } /// Write height-indexed vectors to disk. - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - self.supply.safe_write(exit)?; - self.activity.safe_write(exit)?; + pub fn write(&mut self) -> Result<()> { + self.supply.write()?; + self.activity.write()?; if let Some(realized) = self.realized.as_mut() { - realized.safe_write(exit)?; + realized.write()?; } if let Some(unrealized) = self.unrealized.as_mut() { - unrealized.safe_write(exit)?; + unrealized.write()?; } if let Some(price_paid) = self.price_paid.as_mut() { - price_paid.safe_write(exit)?; + price_paid.write()?; } Ok(()) diff --git a/crates/brk_computer/src/stateful/metrics/price_paid.rs b/crates/brk_computer/src/stateful/metrics/price_paid.rs index afa6959c2..b3cf8f10e 100644 --- a/crates/brk_computer/src/stateful/metrics/price_paid.rs +++ b/crates/brk_computer/src/stateful/metrics/price_paid.rs @@ -112,11 +112,11 @@ impl PricePaidMetrics { } /// Write height-indexed vectors to disk. - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - self.height_to_min_price_paid.safe_write(exit)?; - self.height_to_max_price_paid.safe_write(exit)?; + pub fn write(&mut self) -> Result<()> { + self.height_to_min_price_paid.write()?; + self.height_to_max_price_paid.write()?; if let Some(price_percentiles) = self.price_percentiles.as_mut() { - price_percentiles.safe_write(exit)?; + price_percentiles.write()?; } Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/realized.rs b/crates/brk_computer/src/stateful/metrics/realized.rs index 4dd0fa676..2311bac5f 100644 --- a/crates/brk_computer/src/stateful/metrics/realized.rs +++ b/crates/brk_computer/src/stateful/metrics/realized.rs @@ -433,17 +433,17 @@ impl RealizedMetrics { } /// Write height-indexed vectors to disk. - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - self.height_to_realized_cap.safe_write(exit)?; - self.height_to_realized_profit.safe_write(exit)?; - self.height_to_realized_loss.safe_write(exit)?; - self.height_to_value_created.safe_write(exit)?; - self.height_to_value_destroyed.safe_write(exit)?; + pub fn write(&mut self) -> Result<()> { + self.height_to_realized_cap.write()?; + self.height_to_realized_profit.write()?; + self.height_to_realized_loss.write()?; + self.height_to_value_created.write()?; + self.height_to_value_destroyed.write()?; if let Some(v) = self.height_to_adjusted_value_created.as_mut() { - v.safe_write(exit)?; + v.write()?; } if let Some(v) = self.height_to_adjusted_value_destroyed.as_mut() { - v.safe_write(exit)?; + v.write()?; } Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/supply.rs b/crates/brk_computer/src/stateful/metrics/supply.rs index ddbcbf4d0..6ebe4ca6d 100644 --- a/crates/brk_computer/src/stateful/metrics/supply.rs +++ b/crates/brk_computer/src/stateful/metrics/supply.rs @@ -131,9 +131,9 @@ impl SupplyMetrics { } /// Write height-indexed vectors to disk. - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - self.height_to_supply.safe_write(exit)?; - self.height_to_utxo_count.safe_write(exit)?; + pub fn write(&mut self) -> Result<()> { + self.height_to_supply.write()?; + self.height_to_utxo_count.write()?; Ok(()) } diff --git a/crates/brk_computer/src/stateful/metrics/unrealized.rs b/crates/brk_computer/src/stateful/metrics/unrealized.rs index a6aadc245..2dd2a3bbb 100644 --- a/crates/brk_computer/src/stateful/metrics/unrealized.rs +++ b/crates/brk_computer/src/stateful/metrics/unrealized.rs @@ -219,15 +219,15 @@ impl UnrealizedMetrics { } /// Write height-indexed vectors to disk. - pub fn safe_write(&mut self, exit: &Exit) -> Result<()> { - self.height_to_supply_in_profit.safe_write(exit)?; - self.height_to_supply_in_loss.safe_write(exit)?; - self.height_to_unrealized_profit.safe_write(exit)?; - self.height_to_unrealized_loss.safe_write(exit)?; - self.dateindex_to_supply_in_profit.safe_write(exit)?; - self.dateindex_to_supply_in_loss.safe_write(exit)?; - self.dateindex_to_unrealized_profit.safe_write(exit)?; - self.dateindex_to_unrealized_loss.safe_write(exit)?; + pub fn write(&mut self) -> Result<()> { + self.height_to_supply_in_profit.write()?; + self.height_to_supply_in_loss.write()?; + self.height_to_unrealized_profit.write()?; + self.height_to_unrealized_loss.write()?; + self.dateindex_to_supply_in_profit.write()?; + self.dateindex_to_supply_in_loss.write()?; + self.dateindex_to_unrealized_profit.write()?; + self.dateindex_to_unrealized_loss.write()?; Ok(()) } diff --git a/crates/brk_fetcher/README.md b/crates/brk_fetcher/README.md index 15967f395..f3c0f24ad 100644 --- a/crates/brk_fetcher/README.md +++ b/crates/brk_fetcher/README.md @@ -16,7 +16,7 @@ Fetch OHLC (Open/High/Low/Close) price data from Binance, Kraken, or BRK's own A ## Core API -```rust +```rust,ignore let mut fetcher = Fetcher::import(true, Some(&hars_path))?; // Daily price diff --git a/crates/brk_indexer/README.md b/crates/brk_indexer/README.md index 0f4e01a92..26441c005 100644 --- a/crates/brk_indexer/README.md +++ b/crates/brk_indexer/README.md @@ -51,6 +51,15 @@ let blockhash = indexer.vecs.block.height_to_blockhash.get(height)?; 5. **Finalize**: Sequential store updates, UTXO set mutations 6. **Commit**: Periodic flush to disk +## Performance + +| Machine | Time | Disk | Peak Disk | Memory | Peak Memory | +|---------|------|------|-----------|--------|-------------| +| MBP M3 Pro (36GB, internal SSD) | 3.1h | 233 GB | 307 GB | 5.5 GB | 11 GB | +| Mac Mini M4 (16GB, external SSD) | 4.9h | 233 GB | 303 GB | 5.4 GB | 11 GB | + +Full benchmark data: [`/benches/brk_indexer`](/benches/brk_indexer) + ## Built On - `brk_iterator` for block iteration diff --git a/crates/brk_logger/examples/main.rs b/crates/brk_logger/examples/log.rs similarity index 100% rename from crates/brk_logger/examples/main.rs rename to crates/brk_logger/examples/log.rs diff --git a/crates/brk_store/examples/main.rs b/crates/brk_store/examples/store.rs similarity index 100% rename from crates/brk_store/examples/main.rs rename to crates/brk_store/examples/store.rs