diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index 9651fa938..1ff8e3285 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true license.workspace = true homepage.workspace = true repository.workspace = true +exclude = ["examples/"] [dependencies] bitcoin = { workspace = true } @@ -14,7 +15,7 @@ brk_cohort = { workspace = true } brk_iterator = { workspace = true } brk_logger = { workspace = true } brk_reader = { workspace = true } -brk_rpc = { workspace = true } +brk_rpc = { workspace = true, features = ["corepc"] } brk_store = { workspace = true } brk_types = { workspace = true } brk_traversable = { workspace = true } diff --git a/crates/brk_indexer/examples/indexer_read.rs b/crates/brk_indexer/examples/indexer_read.rs index f73e9f651..eeecf1e3c 100644 --- a/crates/brk_indexer/examples/indexer_read.rs +++ b/crates/brk_indexer/examples/indexer_read.rs @@ -1,5 +1,6 @@ use brk_error::Result; use brk_indexer::Indexer; +use vecdb::ReadableVec; // use brk_types::Sats; use std::{fs, path::Path}; @@ -14,25 +15,14 @@ fn main() -> Result<()> { // let mut sum = Sats::ZERO; // let mut count: usize = 0; - // for value in indexer.vecs.txoutindex_to_value.clean_iter()? { + // for value in indexer.vecs.txoutindex_to_value.clean_iter() { // sum += value; // count += 1; // } // println!("sum = {sum}, count = {count}"); - dbg!( - indexer - .vecs - .outputs - .value - .iter()? - .enumerate() - .take(200) - // .filter(|(_, op)| !op.is_coinbase()) - // .map(|(i, op)| (i, op.txindex(), op.vout())) - .collect::>() - ); + dbg!(indexer.vecs.outputs.value.collect_range(0, 200)); Ok(()) } diff --git a/crates/brk_indexer/examples/indexer_read_speed.rs b/crates/brk_indexer/examples/indexer_read_speed.rs index c2ab372db..78ed2897c 100644 --- a/crates/brk_indexer/examples/indexer_read_speed.rs +++ b/crates/brk_indexer/examples/indexer_read_speed.rs @@ -3,16 +3,17 @@ use std::{fs, path::Path, time::Instant}; use brk_error::Result; use brk_indexer::Indexer; use brk_types::Sats; +use vecdb::ReadableVec; fn run_benchmark(indexer: &Indexer) -> (Sats, std::time::Duration, usize) { let start = Instant::now(); let mut sum = Sats::ZERO; let mut count = 0; - for value in indexer.vecs.outputs.value.clean_iter().unwrap() { + indexer.vecs.outputs.value.for_each(|value| { sum += value; count += 1; - } + }); let duration = start.elapsed(); (sum, duration, count) diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index a0059699e..84c8da079 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -1,7 +1,7 @@ use brk_error::Result; use brk_types::Height; use tracing::{debug, info}; -use vecdb::{GenericStoredVec, IterableStoredVec, IterableVec, VecIndex, VecValue}; +use vecdb::{AnyStoredVec, WritableVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue}; use crate::{Stores, Vecs}; @@ -208,12 +208,12 @@ impl IndexesExt for Indexes { } pub fn starting_index( - height_to_index: &impl IterableStoredVec, - index_to_else: &impl IterableVec, + height_to_index: &PcoVec, + index_to_else: &impl ReadableVec, starting_height: Height, ) -> Option where - I: VecValue + VecIndex + From, + I: VecIndex + PcoVecValue + From, T: VecValue, { let h = Height::from(height_to_index.stamp()); @@ -222,6 +222,6 @@ where } else if h + 1_u32 == starting_height { Some(I::from(index_to_else.len())) } else { - height_to_index.iter().get(starting_height) + height_to_index.collect_one(starting_height.to_usize()) } } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 206e293ae..a897064aa 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -7,7 +7,7 @@ use brk_iterator::Blocks; use brk_rpc::Client; use brk_types::Height; use tracing::{debug, info}; -use vecdb::Exit; +use vecdb::{Exit, ReadableVec}; mod constants; mod indexes; mod processor; @@ -97,7 +97,7 @@ impl Indexer { ) -> Result { debug!("Starting indexing..."); - let last_blockhash = self.vecs.blocks.blockhash.iter()?.last(); + let last_blockhash = self.vecs.blocks.blockhash.collect_last(); debug!("Last block hash found."); let (starting_indexes, prev_hash) = if let Some(hash) = last_blockhash { @@ -193,16 +193,21 @@ impl Indexer { // Phase 2: Compute TXIDs in parallel let txs = processor.compute_txids()?; - // Phase 3: Process inputs in parallel - let txins = processor.process_inputs(&txs)?; + // Phase 3+5: Process inputs and outputs in parallel + // They access different stores (txidprefix vs addresshash) and + // different vecs, so running concurrently hides latency of the + // shorter phase behind the longer one. + let (txins_result, txouts_result) = rayon::join( + || processor.process_inputs(&txs), + || processor.process_outputs(), + ); + let txins = txins_result?; + let txouts = txouts_result?; // Phase 4: Collect same-block spent outpoints let same_block_spent_outpoints = BlockProcessor::collect_same_block_spent_outpoints(&txins); - // Phase 5: Process outputs in parallel - let txouts = processor.process_outputs()?; - let tx_len = block.txdata.len(); let inputs_len = txins.len(); let outputs_len = txouts.len(); diff --git a/crates/brk_indexer/src/processor/metadata.rs b/crates/brk_indexer/src/processor/metadata.rs index 4f32e7f0d..44efa1f9a 100644 --- a/crates/brk_indexer/src/processor/metadata.rs +++ b/crates/brk_indexer/src/processor/metadata.rs @@ -1,7 +1,7 @@ use brk_error::{Error, Result}; use brk_types::{BlockHashPrefix, Timestamp}; use tracing::error; -use vecdb::GenericStoredVec; +use vecdb::WritableVec; use super::BlockProcessor; use crate::IndexesExt; @@ -26,12 +26,11 @@ impl BlockProcessor<'_> { self.stores .blockhashprefix_to_height - .insert_if_needed(blockhash_prefix, height, height); + .insert(blockhash_prefix, height); - self.stores.height_to_coinbase_tag.insert_if_needed( + self.stores.height_to_coinbase_tag.insert( height, self.block.coinbase_tag().into(), - height, ); self.vecs @@ -46,14 +45,15 @@ impl BlockProcessor<'_> { .blocks .timestamp .checked_push(height, Timestamp::from(self.block.header.time))?; + let (block_total_size, block_weight) = self.block.total_size_and_weight(); self.vecs .blocks .total_size - .checked_push(height, self.block.total_size().into())?; + .checked_push(height, block_total_size.into())?; self.vecs .blocks .weight - .checked_push(height, self.block.weight().into())?; + .checked_push(height, block_weight.into())?; Ok(()) } diff --git a/crates/brk_indexer/src/processor/tx.rs b/crates/brk_indexer/src/processor/tx.rs index 4ec0e446c..9dc18437b 100644 --- a/crates/brk_indexer/src/processor/tx.rs +++ b/crates/brk_indexer/src/processor/tx.rs @@ -1,7 +1,7 @@ use brk_error::{Error, Result}; use brk_types::{StoredBool, TxIndex, Txid, TxidPrefix}; use rayon::prelude::*; -use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, likely}; +use vecdb::{AnyVec, WritableVec, likely}; use crate::constants::DUPLICATE_TXIDS; @@ -9,8 +9,7 @@ use super::{BlockProcessor, ComputedTx}; impl<'a> BlockProcessor<'a> { pub fn compute_txids(&self) -> Result>> { - let will_check_collisions = - self.check_collisions && self.stores.txidprefix_to_txindex.needs(self.height); + let will_check_collisions = self.check_collisions; let base_txindex = self.indexes.txindex; self.block @@ -36,6 +35,8 @@ impl<'a> BlockProcessor<'a> { txid, txid_prefix, prev_txindex_opt, + base_size: tx.base_size() as u32, + total_size: tx.total_size() as u32, }) }) .collect() @@ -47,7 +48,6 @@ impl<'a> BlockProcessor<'a> { return Ok(()); } - let mut txindex_to_txid_iter = self.vecs.transactions.txid.into_iter(); for ct in txs.iter() { let Some(prev_txindex) = ct.prev_txindex_opt else { continue; @@ -58,8 +58,11 @@ impl<'a> BlockProcessor<'a> { } let len = self.vecs.transactions.txid.len(); - let prev_txid = txindex_to_txid_iter - .get(prev_txindex) + let prev_txid = self + .vecs + .transactions + .txid + .get_pushed_or_read(prev_txindex, &self.readers.txid) .ok_or(Error::Internal("Missing txid for txindex")) .inspect_err(|_| { dbg!(ct.txindex, len); @@ -81,11 +84,9 @@ impl<'a> BlockProcessor<'a> { for ct in txs { if ct.prev_txindex_opt.is_none() { - self.stores.txidprefix_to_txindex.insert_if_needed( - ct.txid_prefix, - ct.txindex, - height, - ); + self.stores + .txidprefix_to_txindex + .insert(ct.txid_prefix, ct.txindex); } self.vecs @@ -107,11 +108,11 @@ impl<'a> BlockProcessor<'a> { self.vecs .transactions .base_size - .checked_push(ct.txindex, ct.tx.base_size().into())?; + .checked_push(ct.txindex, ct.base_size.into())?; self.vecs .transactions .total_size - .checked_push(ct.txindex, ct.tx.total_size().into())?; + .checked_push(ct.txindex, ct.total_size.into())?; self.vecs .transactions .is_explicitly_rbf diff --git a/crates/brk_indexer/src/processor/txin.rs b/crates/brk_indexer/src/processor/txin.rs index 4700ba788..59ba39887 100644 --- a/crates/brk_indexer/src/processor/txin.rs +++ b/crates/brk_indexer/src/processor/txin.rs @@ -5,33 +5,30 @@ use brk_types::{ }; use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; -use vecdb::GenericStoredVec; +use vecdb::WritableVec; use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo}; impl<'a> BlockProcessor<'a> { - pub fn process_inputs<'c>( + pub fn process_inputs( &self, - txs: &[ComputedTx<'c>], - ) -> Result)>> { + txs: &[ComputedTx], + ) -> Result> { let txid_prefix_to_txindex: FxHashMap<_, _> = - txs.iter().map(|ct| (ct.txid_prefix, &ct.txindex)).collect(); + txs.iter().map(|ct| (ct.txid_prefix, ct.txindex)).collect(); let base_txindex = self.indexes.txindex; let base_txinindex = self.indexes.txinindex; - let txins = self - .block - .txdata - .iter() - .enumerate() - .flat_map(|(index, tx)| { - tx.input - .iter() - .enumerate() - .map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx)) - }) - .collect::>() + let total_inputs: usize = self.block.txdata.iter().map(|tx| tx.input.len()).sum(); + let mut items = Vec::with_capacity(total_inputs); + for (index, tx) in self.block.txdata.iter().enumerate() { + for (vin, txin) in tx.input.iter().enumerate() { + items.push((TxIndex::from(index), Vin::from(vin), txin, tx)); + } + } + + let txins = items .into_par_iter() .enumerate() .map( @@ -44,7 +41,6 @@ impl<'a> BlockProcessor<'a> { txinindex, InputSource::SameBlock { txindex, - txin, vin, outpoint: OutPoint::COINBASE, }, @@ -56,44 +52,41 @@ impl<'a> BlockProcessor<'a> { let txid_prefix = TxidPrefix::from(&txid); let vout = Vout::from(outpoint.vout); - if let Some(&&same_block_txindex) = txid_prefix_to_txindex + if let Some(&same_block_txindex) = txid_prefix_to_txindex .get(&txid_prefix) { let outpoint = OutPoint::new(same_block_txindex, vout); return Ok(( txinindex, InputSource::SameBlock { txindex, - txin, vin, outpoint, }, )); } - let prev_txindex = if let Some(txindex) = self + let store_result = self .stores .txidprefix_to_txindex .get(&txid_prefix)? - .map(|v| *v) - .and_then(|txindex| { - (txindex < self.indexes.txindex).then_some(txindex) - }) - { - txindex - } else { - let store_result = self.stores.txidprefix_to_txindex.get(&txid_prefix)?; - tracing::error!( - "UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}", - txid, txid_prefix, store_result, self.indexes.txindex - ); - return Err(Error::UnknownTxid); + .map(|v| *v); + + let prev_txindex = match store_result { + Some(txindex) if txindex < self.indexes.txindex => txindex, + _ => { + tracing::error!( + "UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}", + txid, txid_prefix, store_result, self.indexes.txindex + ); + return Err(Error::UnknownTxid); + } }; let txoutindex = self .vecs .transactions .first_txoutindex - .get_pushed_or_read(prev_txindex, &self.readers.txindex_to_first_txoutindex)? + .get_pushed_or_read(prev_txindex, &self.readers.txindex_to_first_txoutindex) .ok_or(Error::Internal("Missing txoutindex"))? + vout; @@ -103,14 +96,14 @@ impl<'a> BlockProcessor<'a> { .vecs .outputs .outputtype - .get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_outputtype)? + .get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_outputtype) .ok_or(Error::Internal("Missing outputtype"))?; let typeindex = self .vecs .outputs .typeindex - .get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_typeindex)? + .get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_typeindex) .ok_or(Error::Internal("Missing typeindex"))?; Ok(( @@ -153,8 +146,6 @@ impl<'a> BlockProcessor<'a> { txins: Vec<(TxInIndex, InputSource)>, mut same_block_output_info: FxHashMap, ) -> Result<()> { - let height = self.height; - for (txinindex, input_source) in txins { let (vin, txindex, outpoint, outputtype, typeindex) = match input_source { InputSource::PreviousBlock { @@ -166,7 +157,6 @@ impl<'a> BlockProcessor<'a> { } => (vin, txindex, outpoint, outputtype, typeindex), InputSource::SameBlock { txindex, - txin, vin, outpoint, } => { @@ -177,7 +167,7 @@ impl<'a> BlockProcessor<'a> { .remove(&outpoint) .ok_or(Error::Internal("Same-block output not found")) .inspect_err(|_| { - dbg!(&same_block_output_info, txin); + dbg!(&same_block_output_info, outpoint); })?; (vin, txindex, outpoint, info.outputtype, info.typeindex) } @@ -217,16 +207,15 @@ impl<'a> BlockProcessor<'a> { self.stores .addresstype_to_addressindex_and_txindex .get_mut_unwrap(addresstype) - .insert_if_needed( + .insert( AddressIndexTxIndex::from((addressindex, txindex)), Unit, - height, ); self.stores .addresstype_to_addressindex_and_unspentoutpoint .get_mut_unwrap(addresstype) - .remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height); + .remove(AddressIndexOutPoint::from((addressindex, outpoint))); } Ok(()) diff --git a/crates/brk_indexer/src/processor/txout.rs b/crates/brk_indexer/src/processor/txout.rs index 72f4883cc..0249585f5 100644 --- a/crates/brk_indexer/src/processor/txout.rs +++ b/crates/brk_indexer/src/processor/txout.rs @@ -6,7 +6,7 @@ use brk_types::{ }; use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; -use vecdb::GenericStoredVec; +use vecdb::WritableVec; use super::{BlockProcessor, ProcessedOutput, SameBlockOutputInfo}; @@ -18,17 +18,15 @@ impl<'a> BlockProcessor<'a> { let base_txindex = self.indexes.txindex; let base_txoutindex = self.indexes.txoutindex; - self.block - .txdata - .iter() - .enumerate() - .flat_map(|(index, tx)| { - tx.output - .iter() - .enumerate() - .map(move |(vout, txout)| (TxIndex::from(index), Vout::from(vout), txout, tx)) - }) - .collect::>() + let total_outputs: usize = self.block.txdata.iter().map(|tx| tx.output.len()).sum(); + let mut items = Vec::with_capacity(total_outputs); + for (index, tx) in self.block.txdata.iter().enumerate() { + for (vout, txout) in tx.output.iter().enumerate() { + items.push((TxIndex::from(index), Vout::from(vout), txout, tx)); + } + } + + items .into_par_iter() .enumerate() .map( @@ -59,8 +57,7 @@ impl<'a> BlockProcessor<'a> { .stores .addresstype_to_addresshash_to_addressindex .get_unwrap(addresstype) - .get(&address_hash) - .unwrap() + .get(&address_hash)? .map(|v| *v) .and_then(|typeindex_local| { (typeindex_local < self.indexes.to_typeindex(addresstype)) @@ -68,22 +65,14 @@ impl<'a> BlockProcessor<'a> { }); if check_collisions && let Some(typeindex) = existing_typeindex { - let prev_addressbytes_opt = self.vecs.get_addressbytes_by_type( + let prev_addressbytes = self.vecs.get_addressbytes_by_type( addresstype, typeindex, self.readers.addressbytes.get_unwrap(addresstype), - )?; - let prev_addressbytes = prev_addressbytes_opt - .as_ref() - .ok_or(Error::Internal("Missing addressbytes"))?; + ) + .ok_or(Error::Internal("Missing addressbytes"))?; - if self - .stores - .addresstype_to_addresshash_to_addressindex - .get_unwrap(addresstype) - .needs(height) - && prev_addressbytes != &address_bytes - { + if prev_addressbytes != address_bytes { let txid = tx.compute_txid(); dbg!( height, @@ -121,7 +110,6 @@ impl<'a> BlockProcessor<'a> { txouts: Vec, same_block_spent_outpoints: &FxHashSet, ) -> Result> { - let height = self.height; let mut already_added_addresshash: ByAddressType> = ByAddressType::default(); let mut same_block_output_info: FxHashMap = @@ -172,7 +160,7 @@ impl<'a> BlockProcessor<'a> { self.stores .addresstype_to_addresshash_to_addressindex .get_mut_unwrap(addresstype) - .insert_if_needed(address_hash, ti, height); + .insert(address_hash, ti); self.vecs.push_bytes_if_needed(ti, address_bytes)?; ti @@ -230,10 +218,9 @@ impl<'a> BlockProcessor<'a> { self.stores .addresstype_to_addressindex_and_txindex .get_mut_unwrap(addresstype) - .insert_if_needed( + .insert( AddressIndexTxIndex::from((addressindex, txindex)), Unit, - height, ); } @@ -254,10 +241,9 @@ impl<'a> BlockProcessor<'a> { self.stores .addresstype_to_addressindex_and_unspentoutpoint .get_mut_unwrap(addresstype) - .insert_if_needed( + .insert( AddressIndexOutPoint::from((addressindex, outpoint)), Unit, - height, ); } } diff --git a/crates/brk_indexer/src/processor/types.rs b/crates/brk_indexer/src/processor/types.rs index 242ec9a03..b63673fb8 100644 --- a/crates/brk_indexer/src/processor/types.rs +++ b/crates/brk_indexer/src/processor/types.rs @@ -1,11 +1,11 @@ -use bitcoin::{Transaction, TxIn, TxOut}; +use bitcoin::{Transaction, TxOut}; use brk_types::{ AddressBytes, AddressHash, OutPoint, OutputType, TxIndex, TxOutIndex, Txid, TxidPrefix, TypeIndex, Vin, Vout, }; #[derive(Debug)] -pub enum InputSource<'a> { +pub enum InputSource { PreviousBlock { vin: Vin, txindex: TxIndex, @@ -15,7 +15,6 @@ pub enum InputSource<'a> { }, SameBlock { txindex: TxIndex, - txin: &'a TxIn, vin: Vin, outpoint: OutPoint, }, @@ -43,4 +42,6 @@ pub struct ComputedTx<'a> { pub txid: Txid, pub txid_prefix: TxidPrefix, pub prev_txindex_opt: Option, + pub base_size: u32, + pub total_size: u32, } diff --git a/crates/brk_indexer/src/readers.rs b/crates/brk_indexer/src/readers.rs index 6f6716111..5e22bd022 100644 --- a/crates/brk_indexer/src/readers.rs +++ b/crates/brk_indexer/src/readers.rs @@ -1,11 +1,12 @@ use brk_cohort::ByAddressType; -use vecdb::{GenericStoredVec, Reader}; +use vecdb::Reader; use crate::Vecs; /// Readers for vectors that need to be accessed during block processing. /// These provide consistent snapshots for reading while the main vectors are being modified. pub struct Readers { + pub txid: Reader, pub txindex_to_first_txoutindex: Reader, pub txoutindex_to_outputtype: Reader, pub txoutindex_to_typeindex: Reader, @@ -15,6 +16,7 @@ pub struct Readers { impl Readers { pub fn new(vecs: &Vecs) -> Self { Self { + txid: vecs.transactions.txid.create_reader(), txindex_to_first_txoutindex: vecs.transactions.first_txoutindex.create_reader(), txoutindex_to_outputtype: vecs.outputs.outputtype.create_reader(), txoutindex_to_typeindex: vecs.outputs.typeindex.create_reader(), diff --git a/crates/brk_indexer/src/stores.rs b/crates/brk_indexer/src/stores.rs index 7bddff020..4b71ad1a2 100644 --- a/crates/brk_indexer/src/stores.rs +++ b/crates/brk_indexer/src/stores.rs @@ -12,7 +12,7 @@ use brk_types::{ use fjall::{Database, PersistMode}; use rayon::prelude::*; use tracing::info; -use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator}; +use vecdb::{AnyVec, ReadableVec, VecIndex}; use crate::{Indexes, constants::DUPLICATE_TXID_PREFIXES}; @@ -214,14 +214,14 @@ impl Stores { } if starting_indexes.height != Height::ZERO { - vecs.blocks - .blockhash - .iter()? - .skip(starting_indexes.height.to_usize()) - .map(BlockHashPrefix::from) - .for_each(|prefix| { + vecs.blocks.blockhash.for_each_range( + starting_indexes.height.to_usize(), + vecs.blocks.blockhash.len(), + |blockhash| { + let prefix = BlockHashPrefix::from(blockhash); self.blockhashprefix_to_height.remove(prefix); - }); + }, + ); (starting_indexes.height.to_usize()..vecs.blocks.blockhash.len()) .map(Height::from) @@ -257,16 +257,17 @@ impl Stores { let remove_count = txid_vec_len.saturating_sub(skip_count); tracing::debug!( "Rollback TXIDs: vec_len={}, skip={}, removing={}", - txid_vec_len, skip_count, remove_count + txid_vec_len, + skip_count, + remove_count ); - vecs.transactions - .txid - .iter()? - .enumerate() - .skip(starting_indexes.txindex.to_usize()) - .for_each(|(txindex, txid)| { - let txindex = TxIndex::from(txindex); + { + let start = starting_indexes.txindex.to_usize(); + let end = vecs.transactions.txid.len(); + let mut current_index = start; + vecs.transactions.txid.for_each_range(start, end, |txid| { + let txindex = TxIndex::from(current_index); let txidprefix = TxidPrefix::from(&txid); let is_known_dup = @@ -279,7 +280,9 @@ impl Stores { if !is_known_dup { self.txidprefix_to_txindex.remove(txidprefix); } + current_index += 1; }); + } // Clear caches to prevent stale reads after rollback self.txidprefix_to_txindex.clear_caches(); @@ -288,35 +291,38 @@ impl Stores { } if starting_indexes.txoutindex != TxOutIndex::ZERO { - let mut txoutindex_to_txindex_iter = vecs.outputs.txindex.iter()?; - let mut txindex_to_first_txoutindex_iter = - vecs.transactions.first_txoutindex.iter()?; - let mut txoutindex_to_outputtype_iter = vecs.outputs.outputtype.iter()?; - let mut txoutindex_to_typeindex_iter = vecs.outputs.typeindex.iter()?; + let txindex_to_first_txoutindex_reader = vecs.transactions.first_txoutindex.reader(); + let txoutindex_to_outputtype_reader = vecs.outputs.outputtype.reader(); + let txoutindex_to_typeindex_reader = vecs.outputs.typeindex.reader(); // Collect unique (addresstype, addressindex, txindex) to avoid double deletion // when same address receives multiple outputs in same transaction let mut addressindex_txindex_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> = FxHashSet::default(); - for txoutindex in - starting_indexes.txoutindex.to_usize()..vecs.outputs.outputtype.len() - { - let outputtype = txoutindex_to_outputtype_iter.get_at_unwrap(txoutindex); + let rollback_start = starting_indexes.txoutindex.to_usize(); + let rollback_end = vecs.outputs.outputtype.len(); + + // Pre-collect PcoVec range to avoid per-element page decompression + let txindexes: Vec = + vecs.outputs.txindex.collect_range(rollback_start, rollback_end); + + for (i, txoutindex) in (rollback_start..rollback_end).enumerate() { + let outputtype = txoutindex_to_outputtype_reader.get(txoutindex); if !outputtype.is_address() { continue; } let addresstype = outputtype; - let addressindex = txoutindex_to_typeindex_iter.get_at_unwrap(txoutindex); - let txindex = txoutindex_to_txindex_iter.get_at_unwrap(txoutindex); + let addressindex = txoutindex_to_typeindex_reader.get(txoutindex); + let txindex = txindexes[i]; addressindex_txindex_to_remove.insert((addresstype, addressindex, txindex)); let vout = Vout::from( txoutindex - - txindex_to_first_txoutindex_iter - .get_unwrap(txindex) + - txindex_to_first_txoutindex_reader + .get(txindex.to_usize()) .to_usize(), ); let outpoint = OutPoint::new(txindex, vout); @@ -331,19 +337,15 @@ impl Stores { // Collect outputs that were spent after the rollback point // We need to: 1) reset their spend status, 2) restore address stores - let mut txindex_to_first_txoutindex_iter = - vecs.transactions.first_txoutindex.iter()?; - let mut txoutindex_to_outputtype_iter = vecs.outputs.outputtype.iter()?; - let mut txoutindex_to_typeindex_iter = vecs.outputs.typeindex.iter()?; - let mut txinindex_to_txindex_iter = vecs.inputs.txindex.iter()?; + let start = starting_indexes.txinindex.to_usize(); + let end = vecs.inputs.outpoint.len(); + let outpoints: Vec = vecs.inputs.outpoint.collect_range(start, end); + let spending_txindexes: Vec = vecs.inputs.txindex.collect_range(start, end); - let outputs_to_unspend: Vec<_> = vecs - .inputs - .outpoint - .iter()? - .enumerate() - .skip(starting_indexes.txinindex.to_usize()) - .filter_map(|(txinindex, outpoint): (usize, OutPoint)| { + let outputs_to_unspend: Vec<_> = outpoints + .into_iter() + .zip(spending_txindexes) + .filter_map(|(outpoint, spending_txindex)| { if outpoint.is_coinbase() { return None; } @@ -353,13 +355,13 @@ impl Stores { // Calculate txoutindex from output's txindex and vout let txoutindex = - txindex_to_first_txoutindex_iter.get_unwrap(output_txindex) + vout; + txindex_to_first_txoutindex_reader.get(output_txindex.to_usize()) + vout; // Only process if this output was created before the rollback point if txoutindex < starting_indexes.txoutindex { - let outputtype = txoutindex_to_outputtype_iter.get_unwrap(txoutindex); - let typeindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex); - let spending_txindex = txinindex_to_txindex_iter.get_at_unwrap(txinindex); + let outputtype = + txoutindex_to_outputtype_reader.get(txoutindex.to_usize()); + let typeindex = txoutindex_to_typeindex_reader.get(txoutindex.to_usize()); Some((outpoint, outputtype, typeindex, spending_txindex)) } else { @@ -378,7 +380,11 @@ impl Stores { let addressindex = typeindex; // Add to same set as first loop - addressindex_txindex_to_remove.insert((addresstype, addressindex, spending_txindex)); + addressindex_txindex_to_remove.insert(( + addresstype, + addressindex, + spending_txindex, + )); // OutPoints are unique, no dedup needed for insert self.addresstype_to_addressindex_and_unspentoutpoint diff --git a/crates/brk_indexer/src/vecs/addresses.rs b/crates/brk_indexer/src/vecs/addresses.rs index 051ee6a6b..002f35485 100644 --- a/crates/brk_indexer/src/vecs/addresses.rs +++ b/crates/brk_indexer/src/vecs/addresses.rs @@ -8,8 +8,8 @@ use brk_types::{ }; use rayon::prelude::*; use vecdb::{ - AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Reader, Stamp, - TypedVecIterator, + AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Reader, ReadableVec, + Stamp, VecIndex, }; use crate::parallel_import; @@ -171,71 +171,70 @@ impl AddressesVecs { addresstype: OutputType, typeindex: TypeIndex, reader: &Reader, - ) -> Result> { + ) -> Option { match addresstype { OutputType::P2PK65 => self .p2pk65bytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2PK33 => self .p2pk33bytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2PKH => self .p2pkhbytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2SH => self .p2shbytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2WPKH => self .p2wpkhbytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2WSH => self .p2wshbytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2TR => self .p2trbytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), OutputType::P2A => self .p2abytes .get_pushed_or_read(typeindex.into(), reader) - .map(|opt| opt.map(AddressBytes::from)), + .map(AddressBytes::from), _ => unreachable!("get_bytes_by_type called with non-address type"), } - .map_err(|e| e.into()) } pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> { match bytes { AddressBytes::P2PK65(bytes) => self .p2pk65bytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2PK33(bytes) => self .p2pk33bytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2PKH(bytes) => self .p2pkhbytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2SH(bytes) => self .p2shbytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2WPKH(bytes) => self .p2wpkhbytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2WSH(bytes) => self .p2wshbytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2TR(bytes) => self .p2trbytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, AddressBytes::P2A(bytes) => self .p2abytes - .checked_push(index.into(), *bytes)?, + .checked_push(index.into(), bytes)?, }; Ok(()) } @@ -250,11 +249,12 @@ impl AddressesVecs { ) -> Result + '_>> { macro_rules! make_iter { ($height_vec:expr, $bytes_vec:expr) => {{ - match $height_vec.read_once(height) { - Ok(mut index) => { - let mut iter = $bytes_vec.iter()?; + let h = height.to_usize(); + match $height_vec.collect_one(h) { + Some(mut index) => { + let reader = $bytes_vec.reader(); Ok(Box::new(std::iter::from_fn(move || { - iter.get(index).map(|typedbytes| { + reader.try_get(index.to_usize()).map(|typedbytes| { let bytes = AddressBytes::from(typedbytes); index.increment(); AddressHash::from(&bytes) @@ -262,7 +262,7 @@ impl AddressesVecs { })) as Box + '_>) } - Err(_) => { + None => { Ok(Box::new(std::iter::empty()) as Box + '_>) } diff --git a/crates/brk_indexer/src/vecs/blocks.rs b/crates/brk_indexer/src/vecs/blocks.rs index a34973531..58acaa601 100644 --- a/crates/brk_indexer/src/vecs/blocks.rs +++ b/crates/brk_indexer/src/vecs/blocks.rs @@ -2,7 +2,7 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::{BlockHash, Height, StoredF64, StoredU64, Timestamp, Version, Weight}; use rayon::prelude::*; -use vecdb::{AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp}; +use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp}; use crate::parallel_import; diff --git a/crates/brk_indexer/src/vecs/inputs.rs b/crates/brk_indexer/src/vecs/inputs.rs index 8dce93426..515face8c 100644 --- a/crates/brk_indexer/src/vecs/inputs.rs +++ b/crates/brk_indexer/src/vecs/inputs.rs @@ -2,7 +2,7 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::{Height, OutPoint, OutputType, TxInIndex, TxIndex, TypeIndex, Version}; use rayon::prelude::*; -use vecdb::{AnyStoredVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp}; +use vecdb::{AnyStoredVec, Database, WritableVec, ImportableVec, PcoVec, Stamp}; use crate::parallel_import; diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 60eadf739..295e16126 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -4,7 +4,9 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::{AddressBytes, AddressHash, Height, OutputType, TypeIndex, Version}; use rayon::prelude::*; -use vecdb::{AnyStoredVec, Database, PAGE_SIZE, Reader, Stamp}; +use vecdb::{AnyStoredVec, Database, Reader, Stamp}; + +const PAGE_SIZE: usize = 4096; use crate::parallel_import; @@ -149,7 +151,7 @@ impl Vecs { addresstype: OutputType, typeindex: TypeIndex, reader: &Reader, - ) -> Result> { + ) -> Option { self.addresses .get_bytes_by_type(addresstype, typeindex, reader) } diff --git a/crates/brk_indexer/src/vecs/outputs.rs b/crates/brk_indexer/src/vecs/outputs.rs index 783d6b65b..3b32b4354 100644 --- a/crates/brk_indexer/src/vecs/outputs.rs +++ b/crates/brk_indexer/src/vecs/outputs.rs @@ -2,7 +2,7 @@ use brk_error::Result; use brk_traversable::Traversable; use brk_types::{Height, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex, Version}; use rayon::prelude::*; -use vecdb::{AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp}; +use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp}; use crate::parallel_import; diff --git a/crates/brk_indexer/src/vecs/scripts.rs b/crates/brk_indexer/src/vecs/scripts.rs index 0b63adbb1..b525709a2 100644 --- a/crates/brk_indexer/src/vecs/scripts.rs +++ b/crates/brk_indexer/src/vecs/scripts.rs @@ -4,7 +4,7 @@ use brk_types::{ EmptyOutputIndex, Height, OpReturnIndex, P2MSOutputIndex, TxIndex, UnknownOutputIndex, Version, }; use rayon::prelude::*; -use vecdb::{AnyStoredVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp}; +use vecdb::{AnyStoredVec, Database, WritableVec, ImportableVec, PcoVec, Stamp}; use crate::parallel_import; diff --git a/crates/brk_indexer/src/vecs/transactions.rs b/crates/brk_indexer/src/vecs/transactions.rs index 83530fd40..a3cb0d2c6 100644 --- a/crates/brk_indexer/src/vecs/transactions.rs +++ b/crates/brk_indexer/src/vecs/transactions.rs @@ -5,7 +5,7 @@ use brk_types::{ Version, }; use rayon::prelude::*; -use vecdb::{AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp}; +use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp}; use crate::parallel_import;