diff --git a/Cargo.lock b/Cargo.lock index 60a489b5a..eab7848a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4192,9 +4192,9 @@ dependencies = [ [[package]] name = "rawdb" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abd96eb8d340052584b01120ce302e283b9176b278b5b5944a5f0fc00493f861" +checksum = "b50bc582b92390d3efc2dabb509f77c1d8078eaa07c2bb3528bd414c0d337425" dependencies = [ "libc", "log", @@ -5384,9 +5384,9 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" [[package]] name = "vecdb" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "371b5a52a650ac0661a56ceae92e8af88e7930448ae9d3bb29c68a71437dc5d4" +checksum = "09567d773f1f8fd20bfe9822f5c1d6c1bb148f1edc88fe2676ac93b692ad561f" dependencies = [ "ctrlc", "log", @@ -5404,9 +5404,9 @@ dependencies = [ [[package]] name = "vecdb_derive" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f035c1c36fae53aeed226bdda0009292294986ea0ed6a932535cacdbec08d73d" +checksum = "5297336eb5d686ea22acddd2368c1b97d131f035c6d9a736d61087a318546e41" dependencies = [ "quote", "syn 2.0.111", diff --git a/Cargo.toml b/Cargo.toml index bd0afb915..b80ef281c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ serde_derive = "1.0.228" serde_json = { version = "1.0.145", features = ["float_roundtrip"] } smallvec = "1.15.1" tokio = { version = "1.48.0", features = ["rt-multi-thread"] } -vecdb = { version = "0.4.1", features = ["derive", "serde_json", "pco"] } +vecdb = { version = "0.4.2", features = ["derive", "serde_json", "pco"] } # vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] } # vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] } diff --git a/crates/brk_computer/src/stateful/address/any_address_indexes.rs b/crates/brk_computer/src/stateful/address/any_address_indexes.rs index 24535dd78..271786f2b 100644 --- a/crates/brk_computer/src/stateful/address/any_address_indexes.rs +++ b/crates/brk_computer/src/stateful/address/any_address_indexes.rs @@ -76,9 +76,9 @@ macro_rules! define_any_address_indexes_vecs { Ok(()) } - /// Flush all address types with stamp. - pub fn flush(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> { - $(self.$field.stamped_flush_maybe_with_changes(stamp, with_changes)?;)* + /// Write all address types with stamp. + pub fn write(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> { + $(self.$field.stamped_write_maybe_with_changes(stamp, with_changes)?;)* Ok(()) } } diff --git a/crates/brk_computer/src/stateful/address/data.rs b/crates/brk_computer/src/stateful/address/data.rs index d99bbc68e..f8310d6e7 100644 --- a/crates/brk_computer/src/stateful/address/data.rs +++ b/crates/brk_computer/src/stateful/address/data.rs @@ -56,11 +56,11 @@ impl AddressesDataVecs { } /// Flush both loaded and empty data with stamp. - pub fn flush(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> { + pub fn write(&mut self, stamp: Stamp, with_changes: bool) -> Result<()> { self.loaded - .stamped_flush_maybe_with_changes(stamp, with_changes)?; + .stamped_write_maybe_with_changes(stamp, with_changes)?; self.empty - .stamped_flush_maybe_with_changes(stamp, with_changes)?; + .stamped_write_maybe_with_changes(stamp, with_changes)?; Ok(()) } } diff --git a/crates/brk_computer/src/stateful/compute/block_loop.rs b/crates/brk_computer/src/stateful/compute/block_loop.rs index 619f923b6..8c1d81d31 100644 --- a/crates/brk_computer/src/stateful/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful/compute/block_loop.rs @@ -21,7 +21,7 @@ use crate::{ chain, indexes, price, stateful::{ address::AddressTypeToAddressCount, - compute::flush::{flush, process_address_updates}, + compute::write::{process_address_updates, write}, process::{ AddressLookup, EmptyAddressDataWithSource, InputsResult, LoadedAddressDataWithSource, build_txoutindex_to_height_map, process_inputs, process_outputs, process_received, @@ -502,28 +502,30 @@ pub fn process_blocks( std::mem::take(&mut loaded_cache), )?; - // Flush to disk (pure I/O) - flush(vecs, height, chain_state, exit)?; + // Flush to disk (pure I/O) - no changes saved for periodic flushes + write(vecs, height, chain_state, false, exit)?; // Recreate readers after flush to pick up new data vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); } } - // Final flush - let _lock = exit.lock(); - drop(vr); + // Final flush - always save changes for rollback support + { + let _lock = exit.lock(); + drop(vr); - // Process address updates (mutations) - process_address_updates( - &mut vecs.addresses_data, - &mut vecs.any_address_indexes, - std::mem::take(&mut empty_cache), - std::mem::take(&mut loaded_cache), - )?; + // Process address updates (mutations) + process_address_updates( + &mut vecs.addresses_data, + &mut vecs.any_address_indexes, + std::mem::take(&mut empty_cache), + std::mem::take(&mut loaded_cache), + )?; - // Flush to disk (pure I/O) - flush(vecs, last_height, chain_state, exit)?; + // Flush to disk (pure I/O) - save changes for rollback + write(vecs, last_height, chain_state, true, exit)?; + } Ok(()) } diff --git a/crates/brk_computer/src/stateful/compute/mod.rs b/crates/brk_computer/src/stateful/compute/mod.rs index e8ce132f7..3d993109a 100644 --- a/crates/brk_computer/src/stateful/compute/mod.rs +++ b/crates/brk_computer/src/stateful/compute/mod.rs @@ -10,9 +10,9 @@ pub mod aggregates; mod block_loop; mod context; -mod flush; mod readers; mod recover; +mod write; pub use block_loop::process_blocks; pub use context::ComputeContext; diff --git a/crates/brk_computer/src/stateful/compute/flush.rs b/crates/brk_computer/src/stateful/compute/write.rs similarity index 64% rename from crates/brk_computer/src/stateful/compute/flush.rs rename to crates/brk_computer/src/stateful/compute/write.rs index e0f9dc554..694d68ae4 100644 --- a/crates/brk_computer/src/stateful/compute/flush.rs +++ b/crates/brk_computer/src/stateful/compute/write.rs @@ -75,21 +75,27 @@ pub fn process_address_updates( /// Writes all accumulated data: /// - Cohort stateful vectors /// - Height-indexed vectors -/// - Address indexes and data -/// - Transaction output index mappings +/// - Address indexes and data (parallel) +/// - Transaction output index mappings (parallel) /// - Chain state -pub fn flush( +/// +/// Set `with_changes=true` near chain tip to enable rollback support. +pub fn write( vecs: &mut Vecs, height: Height, chain_state: &[BlockState], + with_changes: bool, exit: &Exit, ) -> Result<()> { - info!("Flushing at height {}...", height); + let t0 = Instant::now(); // Flush cohort states (separate + aggregate) vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; + let t1 = Instant::now(); + vecs.address_cohorts .safe_flush_stateful_vecs(height, exit)?; + let t2 = Instant::now(); // Flush height-indexed vectors vecs.height_to_unspendable_supply.safe_write(exit)?; @@ -97,22 +103,57 @@ pub fn flush( vecs.addresstype_to_height_to_addr_count.safe_flush(exit)?; vecs.addresstype_to_height_to_empty_addr_count .safe_flush(exit)?; + let t3 = Instant::now(); - // Flush address data + // Flush large vecs in parallel let stamp = Stamp::from(height); - vecs.any_address_indexes.flush(stamp, true)?; - vecs.addresses_data.flush(stamp, true)?; + let any_address_indexes = &mut vecs.any_address_indexes; + let addresses_data = &mut vecs.addresses_data; + let txoutindex_to_txinindex = &mut vecs.txoutindex_to_txinindex; - // Flush txoutindex_to_txinindex - vecs.txoutindex_to_txinindex - .stamped_flush_with_changes(height.into())?; + let ((addr_result, addr_idx_time, addr_data_time), (txout_result, txout_time)) = rayon::join( + || { + let t0 = Instant::now(); + let r1 = any_address_indexes.write(stamp, with_changes); + let t1 = Instant::now(); + let r2 = addresses_data.write(stamp, with_changes); + let t2 = Instant::now(); + let r = r1.and(r2); + (r, t1 - t0, t2 - t1) + }, + || { + let t = Instant::now(); + let r = txoutindex_to_txinindex.stamped_write_maybe_with_changes(stamp, with_changes); + (r, t.elapsed()) + }, + ); + addr_result?; + txout_result?; + let t4 = Instant::now(); + info!( + " parallel breakdown: addr_idx={:?} addr_data={:?} txout={:?}", + addr_idx_time, addr_data_time, txout_time + ); // Sync in-memory chain_state to persisted and flush vecs.chain_state.truncate_if_needed(Height::ZERO)?; for block_state in chain_state { vecs.chain_state.push(block_state.supply.clone()); } - vecs.chain_state.stamped_flush_with_changes(height.into())?; + vecs.chain_state + .stamped_write_maybe_with_changes(stamp, with_changes)?; + let t5 = Instant::now(); + + info!( + "flush: utxo={:?} addr={:?} height={:?} parallel={:?} chain={:?} total={:?} (with_changes={})", + t1 - t0, + t2 - t1, + t3 - t2, + t4 - t3, + t5 - t4, + t5 - t0, + with_changes + ); Ok(()) } diff --git a/crates/brk_computer/src/stateful/process/empty_addresses.rs b/crates/brk_computer/src/stateful/process/empty_addresses.rs index f42cec802..b0e44c519 100644 --- a/crates/brk_computer/src/stateful/process/empty_addresses.rs +++ b/crates/brk_computer/src/stateful/process/empty_addresses.rs @@ -1,5 +1,5 @@ use brk_error::Result; -use brk_types::AnyAddressIndex; +use brk_types::{AnyAddressIndex, EmptyAddressData, EmptyAddressIndex, LoadedAddressIndex, OutputType, TypeIndex}; use super::EmptyAddressDataWithSource; use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; @@ -10,36 +10,56 @@ use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; /// - New empty address: push to empty storage /// - Updated empty address (was empty): update in place /// - Transition loaded -> empty: delete from loaded, push to empty +/// +/// Optimized to batch operations: deletes first (creates holes), then updates, then pushes. pub fn process_empty_addresses( addresses_data: &mut AddressesDataVecs, empty_updates: AddressTypeToTypeIndexMap, ) -> Result> { - let mut result = AddressTypeToTypeIndexMap::default(); + // Estimate capacity from input size + let total: usize = empty_updates.iter().map(|(_, m)| m.len()).sum(); - for (address_type, sorted) in empty_updates.into_sorted_iter() { - for (typeindex, source) in sorted { + // Collect operations by type (no sorting needed) + let mut updates: Vec<(EmptyAddressIndex, EmptyAddressData)> = Vec::with_capacity(total); + let mut deletes: Vec = Vec::with_capacity(total); + let mut pushes: Vec<(OutputType, TypeIndex, EmptyAddressData)> = Vec::with_capacity(total); + + for (address_type, items) in empty_updates.into_iter() { + for (typeindex, source) in items { match source { EmptyAddressDataWithSource::New(data) => { - let index = addresses_data.empty.fill_first_hole_or_push(data)?; - result - .get_mut(address_type) - .unwrap() - .insert(typeindex, AnyAddressIndex::from(index)); + pushes.push((address_type, typeindex, data)); } EmptyAddressDataWithSource::FromEmpty(index, data) => { - addresses_data.empty.update(index, data)?; + updates.push((index, data)); } EmptyAddressDataWithSource::FromLoaded(loaded_index, data) => { - addresses_data.loaded.delete(loaded_index); - let empty_index = addresses_data.empty.fill_first_hole_or_push(data)?; - result - .get_mut(address_type) - .unwrap() - .insert(typeindex, AnyAddressIndex::from(empty_index)); + deletes.push(loaded_index); + pushes.push((address_type, typeindex, data)); } } } } + // Phase 1: All deletes (creates holes in loaded vec) + for loaded_index in deletes { + addresses_data.loaded.delete(loaded_index); + } + + // Phase 2: All updates (in-place in empty vec) + for (index, data) in updates { + addresses_data.empty.update(index, data)?; + } + + // Phase 3: All pushes (fills holes in empty vec, then grows) + let mut result = AddressTypeToTypeIndexMap::default(); + for (address_type, typeindex, data) in pushes { + let index = addresses_data.empty.fill_first_hole_or_push(data)?; + result + .get_mut(address_type) + .unwrap() + .insert(typeindex, AnyAddressIndex::from(index)); + } + Ok(result) } diff --git a/crates/brk_computer/src/stateful/process/loaded_addresses.rs b/crates/brk_computer/src/stateful/process/loaded_addresses.rs index bcd45afa5..f527df890 100644 --- a/crates/brk_computer/src/stateful/process/loaded_addresses.rs +++ b/crates/brk_computer/src/stateful/process/loaded_addresses.rs @@ -1,5 +1,5 @@ use brk_error::Result; -use brk_types::AnyAddressIndex; +use brk_types::{AnyAddressIndex, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex, OutputType, TypeIndex}; use super::LoadedAddressDataWithSource; use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; @@ -10,36 +10,56 @@ use crate::stateful::{AddressTypeToTypeIndexMap, AddressesDataVecs}; /// - New loaded address: push to loaded storage /// - Updated loaded address (was loaded): update in place /// - Transition empty -> loaded: delete from empty, push to loaded +/// +/// Optimized to batch operations: deletes first (creates holes), then updates, then pushes. pub fn process_loaded_addresses( addresses_data: &mut AddressesDataVecs, loaded_updates: AddressTypeToTypeIndexMap, ) -> Result> { - let mut result = AddressTypeToTypeIndexMap::default(); + // Estimate capacity from input size + let total: usize = loaded_updates.iter().map(|(_, m)| m.len()).sum(); - for (address_type, sorted) in loaded_updates.into_sorted_iter() { - for (typeindex, source) in sorted { + // Collect operations by type (no sorting needed) + let mut updates: Vec<(LoadedAddressIndex, LoadedAddressData)> = Vec::with_capacity(total); + let mut deletes: Vec = Vec::with_capacity(total); + let mut pushes: Vec<(OutputType, TypeIndex, LoadedAddressData)> = Vec::with_capacity(total); + + for (address_type, items) in loaded_updates.into_iter() { + for (typeindex, source) in items { match source { LoadedAddressDataWithSource::New(data) => { - let index = addresses_data.loaded.fill_first_hole_or_push(data)?; - result - .get_mut(address_type) - .unwrap() - .insert(typeindex, AnyAddressIndex::from(index)); + pushes.push((address_type, typeindex, data)); } LoadedAddressDataWithSource::FromLoaded(index, data) => { - addresses_data.loaded.update(index, data)?; + updates.push((index, data)); } LoadedAddressDataWithSource::FromEmpty(empty_index, data) => { - addresses_data.empty.delete(empty_index); - let loaded_index = addresses_data.loaded.fill_first_hole_or_push(data)?; - result - .get_mut(address_type) - .unwrap() - .insert(typeindex, AnyAddressIndex::from(loaded_index)); + deletes.push(empty_index); + pushes.push((address_type, typeindex, data)); } } } } + // Phase 1: All deletes (creates holes in empty vec) + for empty_index in deletes { + addresses_data.empty.delete(empty_index); + } + + // Phase 2: All updates (in-place in loaded vec) + for (index, data) in updates { + addresses_data.loaded.update(index, data)?; + } + + // Phase 3: All pushes (fills holes in loaded vec, then grows) + let mut result = AddressTypeToTypeIndexMap::default(); + for (address_type, typeindex, data) in pushes { + let index = addresses_data.loaded.fill_first_hole_or_push(data)?; + result + .get_mut(address_type) + .unwrap() + .insert(typeindex, AnyAddressIndex::from(index)); + } + Ok(result) } diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 2d926c6af..beead49b7 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -4,7 +4,7 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::{AddressBytes, AddressHash, Height, OutputType, TypeIndex, Version}; use rayon::prelude::*; -use vecdb::{AnyStoredVec, Database, Reader, Stamp, PAGE_SIZE}; +use vecdb::{AnyStoredVec, Database, PAGE_SIZE, Reader, Stamp}; mod address; mod blocks; @@ -78,11 +78,8 @@ impl Vecs { self.txin .truncate(starting_indexes.height, starting_indexes.txinindex, stamp)?; - self.txout.truncate( - starting_indexes.height, - starting_indexes.txoutindex, - stamp, - )?; + self.txout + .truncate(starting_indexes.height, starting_indexes.txoutindex, stamp)?; self.address.truncate( starting_indexes.height, @@ -115,7 +112,8 @@ impl Vecs { typeindex: TypeIndex, reader: &Reader, ) -> Result> { - self.address.get_bytes_by_type(addresstype, typeindex, reader) + self.address + .get_bytes_by_type(addresstype, typeindex, reader) } pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> { @@ -125,7 +123,7 @@ impl Vecs { pub fn flush(&mut self, height: Height) -> Result<()> { self.iter_mut_any_stored_vec() .par_bridge() - .try_for_each(|vec| vec.stamped_flush(Stamp::from(height)))?; + .try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?; self.db.flush()?; Ok(()) }