From 4352868410bee48af8c6302ff0942e00e374cbf7 Mon Sep 17 00:00:00 2001 From: nym21 Date: Wed, 18 Feb 2026 19:38:19 +0100 Subject: [PATCH] indexer: snapshot --- crates/brk_indexer/src/constants.rs | 2 +- crates/brk_indexer/src/lib.rs | 61 +++-- crates/brk_indexer/src/processor/metadata.rs | 35 ++- crates/brk_indexer/src/processor/mod.rs | 67 ++++- crates/brk_indexer/src/processor/tx.rs | 73 ++--- crates/brk_indexer/src/processor/txin.rs | 179 ++++++------ crates/brk_indexer/src/processor/txout.rs | 272 +++++++++---------- crates/brk_indexer/src/processor/types.rs | 12 + crates/brk_indexer/src/vecs/transactions.rs | 40 ++- 9 files changed, 413 insertions(+), 328 deletions(-) diff --git a/crates/brk_indexer/src/constants.rs b/crates/brk_indexer/src/constants.rs index 60ca8f6df..2293ee6ab 100644 --- a/crates/brk_indexer/src/constants.rs +++ b/crates/brk_indexer/src/constants.rs @@ -4,7 +4,7 @@ use brk_types::{TxIndex, Txid, TxidPrefix, Version}; // One version for all data sources // Increment on **change _OR_ addition** -pub const VERSION: Version = Version::new(24); +pub const VERSION: Version = Version::new(25); pub const SNAPSHOT_BLOCK_RANGE: usize = 1_000; /// Known duplicate Bitcoin transactions (BIP30) diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index a897064aa..7754db4f8 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -136,8 +136,8 @@ impl Indexer { let mut indexes = starting_indexes.clone(); debug!("Indexes cloned."); - let should_export = |height: Height, rem: bool| -> bool { - height != 0 && (height % SNAPSHOT_BLOCK_RANGE == 0) != rem + let is_export_height = |height: Height| -> bool { + height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 }; let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> { @@ -166,6 +166,7 @@ impl Indexer { }; let mut readers = Readers::new(&self.vecs); + let mut buffers = BlockBuffers::default(); let vecs = &mut self.vecs; let stores = &mut self.stores; @@ -187,48 +188,50 @@ impl Indexer { readers: &readers, }; - // Phase 1: Process block metadata + // 1. Process block metadata processor.process_block_metadata()?; - // Phase 2: Compute TXIDs in parallel + // 2. Compute TXIDs (parallel) let txs = processor.compute_txids()?; - // Phase 3+5: Process inputs and outputs in parallel - // They access different stores (txidprefix vs addresshash) and - // different vecs, so running concurrently hides latency of the - // shorter phase behind the longer one. + // 2.5 Push block size/weight reusing per-tx sizes from compute_txids + processor.push_block_size_and_weight(&txs)?; + + // 3. Process inputs and outputs (parallel) let (txins_result, txouts_result) = rayon::join( - || processor.process_inputs(&txs), + || processor.process_inputs(&txs, &mut buffers.txid_prefix_map), || processor.process_outputs(), ); let txins = txins_result?; let txouts = txouts_result?; - // Phase 4: Collect same-block spent outpoints - let same_block_spent_outpoints = - BlockProcessor::collect_same_block_spent_outpoints(&txins); + let tx_count = block.txdata.len(); + let input_count = txins.len(); + let output_count = txouts.len(); - let tx_len = block.txdata.len(); - let inputs_len = txins.len(); - let outputs_len = txouts.len(); + // 4. Collect same-block spent outpoints + BlockProcessor::collect_same_block_spent_outpoints( + &txins, + &mut buffers.same_block_spent, + ); - // Phase 6: Finalize outputs sequentially - let same_block_output_info = - processor.finalize_outputs(txouts, &same_block_spent_outpoints)?; - - // Phase 7: Finalize inputs sequentially - processor.finalize_inputs(txins, same_block_output_info)?; - - // Phase 8: Check TXID collisions + // 5. Check TXID collisions (BIP-30) processor.check_txid_collisions(&txs)?; - // Phase 9: Store transaction metadata - processor.store_transaction_metadata(txs)?; + // 6. Finalize outputs/inputs || store tx metadata (parallel) + processor.finalize_and_store_metadata( + txs, + txouts, + txins, + &buffers.same_block_spent, + &mut buffers.already_added_addresses, + &mut buffers.same_block_output_info, + )?; - // Phase 10: Update indexes - processor.update_indexes(tx_len, inputs_len, outputs_len); + // 7. Update indexes + processor.update_indexes(tx_count, input_count, output_count); - if should_export(height, false) { + if is_export_height(height) { drop(readers); export(stores, vecs, height)?; readers = Readers::new(vecs); @@ -237,7 +240,7 @@ impl Indexer { drop(readers); - if should_export(indexes.height, true) { + if !is_export_height(indexes.height) { export(stores, vecs, indexes.height)?; } diff --git a/crates/brk_indexer/src/processor/metadata.rs b/crates/brk_indexer/src/processor/metadata.rs index 44efa1f9a..61785c60a 100644 --- a/crates/brk_indexer/src/processor/metadata.rs +++ b/crates/brk_indexer/src/processor/metadata.rs @@ -3,7 +3,7 @@ use brk_types::{BlockHashPrefix, Timestamp}; use tracing::error; use vecdb::WritableVec; -use super::BlockProcessor; +use super::{BlockProcessor, ComputedTx}; use crate::IndexesExt; impl BlockProcessor<'_> { @@ -45,16 +45,31 @@ impl BlockProcessor<'_> { .blocks .timestamp .checked_push(height, Timestamp::from(self.block.header.time))?; - let (block_total_size, block_weight) = self.block.total_size_and_weight(); - self.vecs - .blocks - .total_size - .checked_push(height, block_total_size.into())?; - self.vecs - .blocks - .weight - .checked_push(height, block_weight.into())?; Ok(()) } + + /// Push block total_size and weight, reusing per-tx sizes already computed in ComputedTx. + /// This avoids redundant tx serialization (base_size + total_size were already computed). + pub fn push_block_size_and_weight(&mut self, txs: &[ComputedTx]) -> Result<()> { + let overhead = + bitcoin::block::Header::SIZE + bitcoin::VarInt::from(txs.len()).size(); + let mut total_size = overhead; + let mut weight_wu = overhead * 4; + for ct in txs { + let base = ct.base_size as usize; + let total = ct.total_size as usize; + total_size += total; + weight_wu += base * 3 + total; + } + self.vecs + .blocks + .total_size + .checked_push(self.height, total_size.into())?; + self.vecs + .blocks + .weight + .checked_push(self.height, weight_wu.into())?; + Ok(()) + } } diff --git a/crates/brk_indexer/src/processor/mod.rs b/crates/brk_indexer/src/processor/mod.rs index e698975ff..f9038090f 100644 --- a/crates/brk_indexer/src/processor/mod.rs +++ b/crates/brk_indexer/src/processor/mod.rs @@ -6,7 +6,10 @@ mod types; pub use types::*; -use brk_types::{Block, Height, TxInIndex, TxIndex, TxOutIndex}; +use brk_cohort::ByAddressType; +use brk_error::Result; +use brk_types::{AddressHash, Block, Height, OutPoint, TxInIndex, TxIndex, TxOutIndex, TypeIndex}; +use rustc_hash::{FxHashMap, FxHashSet}; use crate::{Indexes, Readers, Stores, Vecs}; @@ -28,4 +31,66 @@ impl BlockProcessor<'_> { self.indexes.txinindex += TxInIndex::from(input_count); self.indexes.txoutindex += TxOutIndex::from(output_count); } + + /// Finalizes outputs/inputs in parallel with storing tx metadata. + #[allow(clippy::too_many_arguments)] + pub fn finalize_and_store_metadata( + &mut self, + txs: Vec, + txouts: Vec, + txins: Vec<(TxInIndex, InputSource)>, + same_block_spent_outpoints: &FxHashSet, + already_added: &mut ByAddressType>, + same_block_info: &mut FxHashMap, + ) -> Result<()> { + let height = self.height; + let indexes = &mut *self.indexes; + + // Split transactions vecs: finalize needs first_txoutindex/first_txinindex, metadata needs the rest + let (first_txoutindex, first_txinindex, mut tx_metadata) = + self.vecs.transactions.split_for_finalize(); + + let outputs = &mut self.vecs.outputs; + let inputs = &mut self.vecs.inputs; + let addresses = &mut self.vecs.addresses; + let scripts = &mut self.vecs.scripts; + + let addr_hash_stores = &mut self.stores.addresstype_to_addresshash_to_addressindex; + let addr_txindex_stores = &mut self.stores.addresstype_to_addressindex_and_txindex; + let addr_outpoint_stores = + &mut self.stores.addresstype_to_addressindex_and_unspentoutpoint; + let txidprefix_store = &mut self.stores.txidprefix_to_txindex; + + let (finalize_result, metadata_result) = rayon::join( + || -> Result<()> { + txout::finalize_outputs( + indexes, + first_txoutindex, + outputs, + addresses, + scripts, + addr_hash_stores, + addr_txindex_stores, + addr_outpoint_stores, + txouts, + same_block_spent_outpoints, + already_added, + same_block_info, + )?; + txin::finalize_inputs( + first_txinindex, + inputs, + addr_txindex_stores, + addr_outpoint_stores, + txins, + same_block_info, + ) + }, + || tx::store_tx_metadata(height, txs, txidprefix_store, &mut tx_metadata), + ); + + finalize_result?; + metadata_result?; + Ok(()) + } } diff --git a/crates/brk_indexer/src/processor/tx.rs b/crates/brk_indexer/src/processor/tx.rs index 9dc18437b..1d9ab9120 100644 --- a/crates/brk_indexer/src/processor/tx.rs +++ b/crates/brk_indexer/src/processor/tx.rs @@ -1,8 +1,10 @@ use brk_error::{Error, Result}; -use brk_types::{StoredBool, TxIndex, Txid, TxidPrefix}; +use brk_store::Store; +use brk_types::{Height, StoredBool, TxIndex, Txid, TxidPrefix}; use rayon::prelude::*; use vecdb::{AnyVec, WritableVec, likely}; +use crate::TxMetadataVecs; use crate::constants::DUPLICATE_TXIDS; use super::{BlockProcessor, ComputedTx}; @@ -78,47 +80,30 @@ impl<'a> BlockProcessor<'a> { Ok(()) } - - pub fn store_transaction_metadata(&mut self, txs: Vec) -> Result<()> { - let height = self.height; - - for ct in txs { - if ct.prev_txindex_opt.is_none() { - self.stores - .txidprefix_to_txindex - .insert(ct.txid_prefix, ct.txindex); - } - - self.vecs - .transactions - .height - .checked_push(ct.txindex, height)?; - self.vecs - .transactions - .txversion - .checked_push(ct.txindex, ct.tx.version.into())?; - self.vecs - .transactions - .txid - .checked_push(ct.txindex, ct.txid)?; - self.vecs - .transactions - .rawlocktime - .checked_push(ct.txindex, ct.tx.lock_time.into())?; - self.vecs - .transactions - .base_size - .checked_push(ct.txindex, ct.base_size.into())?; - self.vecs - .transactions - .total_size - .checked_push(ct.txindex, ct.total_size.into())?; - self.vecs - .transactions - .is_explicitly_rbf - .checked_push(ct.txindex, StoredBool::from(ct.tx.is_explicitly_rbf()))?; - } - - Ok(()) - } +} + +pub(super) fn store_tx_metadata( + height: Height, + txs: Vec, + store: &mut Store, + md: &mut TxMetadataVecs<'_>, +) -> Result<()> { + for ct in txs { + if ct.prev_txindex_opt.is_none() { + store.insert(ct.txid_prefix, ct.txindex); + } + md.height.checked_push(ct.txindex, height)?; + md.txversion + .checked_push(ct.txindex, ct.tx.version.into())?; + md.txid.checked_push(ct.txindex, ct.txid)?; + md.rawlocktime + .checked_push(ct.txindex, ct.tx.lock_time.into())?; + md.base_size + .checked_push(ct.txindex, ct.base_size.into())?; + md.total_size + .checked_push(ct.txindex, ct.total_size.into())?; + md.is_explicitly_rbf + .checked_push(ct.txindex, StoredBool::from(ct.tx.is_explicitly_rbf()))?; + } + Ok(()) } diff --git a/crates/brk_indexer/src/processor/txin.rs b/crates/brk_indexer/src/processor/txin.rs index 59ba39887..e49fbaeef 100644 --- a/crates/brk_indexer/src/processor/txin.rs +++ b/crates/brk_indexer/src/processor/txin.rs @@ -1,21 +1,25 @@ +use brk_cohort::ByAddressType; use brk_error::{Error, Result}; +use brk_store::Store; use brk_types::{ AddressIndexOutPoint, AddressIndexTxIndex, OutPoint, OutputType, TxInIndex, TxIndex, Txid, TxidPrefix, TypeIndex, Unit, Vin, Vout, }; use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; -use vecdb::WritableVec; +use vecdb::{PcoVec, WritableVec}; use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo}; +use crate::InputsVecs; impl<'a> BlockProcessor<'a> { pub fn process_inputs( &self, txs: &[ComputedTx], + txid_prefix_to_txindex: &mut FxHashMap, ) -> Result> { - let txid_prefix_to_txindex: FxHashMap<_, _> = - txs.iter().map(|ct| (ct.txid_prefix, ct.txindex)).collect(); + txid_prefix_to_txindex.clear(); + txid_prefix_to_txindex.extend(txs.iter().map(|ct| (ct.txid_prefix, ct.txindex))); let base_txindex = self.indexes.txindex; let base_txinindex = self.indexes.txinindex; @@ -28,6 +32,8 @@ impl<'a> BlockProcessor<'a> { } } + let txid_prefix_to_txindex = &*txid_prefix_to_txindex; + let txins = items .into_par_iter() .enumerate() @@ -125,99 +131,78 @@ impl<'a> BlockProcessor<'a> { pub fn collect_same_block_spent_outpoints( txins: &[(TxInIndex, InputSource)], - ) -> FxHashSet { - txins - .iter() - .filter_map(|(_, input_source)| { - let InputSource::SameBlock { outpoint, .. } = input_source else { - return None; - }; - if !outpoint.is_coinbase() { - Some(*outpoint) - } else { - None - } - }) - .collect() - } - - pub fn finalize_inputs( - &mut self, - txins: Vec<(TxInIndex, InputSource)>, - mut same_block_output_info: FxHashMap, - ) -> Result<()> { - for (txinindex, input_source) in txins { - let (vin, txindex, outpoint, outputtype, typeindex) = match input_source { - InputSource::PreviousBlock { - vin, - txindex, - outpoint, - outputtype, - typeindex, - } => (vin, txindex, outpoint, outputtype, typeindex), - InputSource::SameBlock { - txindex, - vin, - outpoint, - } => { - if outpoint.is_coinbase() { - (vin, txindex, outpoint, OutputType::Unknown, TypeIndex::COINBASE) - } else { - let info = same_block_output_info - .remove(&outpoint) - .ok_or(Error::Internal("Same-block output not found")) - .inspect_err(|_| { - dbg!(&same_block_output_info, outpoint); - })?; - (vin, txindex, outpoint, info.outputtype, info.typeindex) - } - } - }; - - if vin.is_zero() { - self.vecs - .transactions - .first_txinindex - .checked_push(txindex, txinindex)?; - } - - self.vecs - .inputs - .txindex - .checked_push(txinindex, txindex)?; - self.vecs - .inputs - .outpoint - .checked_push(txinindex, outpoint)?; - self.vecs - .inputs - .outputtype - .checked_push(txinindex, outputtype)?; - self.vecs - .inputs - .typeindex - .checked_push(txinindex, typeindex)?; - - if !outputtype.is_address() { - continue; - } - let addresstype = outputtype; - let addressindex = typeindex; - - self.stores - .addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .insert( - AddressIndexTxIndex::from((addressindex, txindex)), - Unit, - ); - - self.stores - .addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .remove(AddressIndexOutPoint::from((addressindex, outpoint))); - } - - Ok(()) + out: &mut FxHashSet, + ) { + out.clear(); + out.extend(txins.iter().filter_map(|(_, input_source)| match input_source { + InputSource::SameBlock { outpoint, .. } if !outpoint.is_coinbase() => Some(*outpoint), + _ => None, + })); } } + +pub(super) fn finalize_inputs( + first_txinindex: &mut PcoVec, + inputs: &mut InputsVecs, + addr_txindex_stores: &mut ByAddressType>, + addr_outpoint_stores: &mut ByAddressType>, + txins: Vec<(TxInIndex, InputSource)>, + same_block_output_info: &mut FxHashMap, +) -> Result<()> { + for (txinindex, input_source) in txins { + let (vin, txindex, outpoint, outputtype, typeindex) = match input_source { + InputSource::PreviousBlock { + vin, + txindex, + outpoint, + outputtype, + typeindex, + } => (vin, txindex, outpoint, outputtype, typeindex), + InputSource::SameBlock { + txindex, + vin, + outpoint, + } => { + if outpoint.is_coinbase() { + (vin, txindex, outpoint, OutputType::Unknown, TypeIndex::COINBASE) + } else { + let info = same_block_output_info + .remove(&outpoint) + .ok_or(Error::Internal("Same-block output not found")) + .inspect_err(|_| { + dbg!(&same_block_output_info, outpoint); + })?; + (vin, txindex, outpoint, info.outputtype, info.typeindex) + } + } + }; + + if vin.is_zero() { + first_txinindex.checked_push(txindex, txinindex)?; + } + + inputs.txindex.checked_push(txinindex, txindex)?; + inputs.outpoint.checked_push(txinindex, outpoint)?; + inputs.outputtype.checked_push(txinindex, outputtype)?; + inputs.typeindex.checked_push(txinindex, typeindex)?; + + if !outputtype.is_address() { + continue; + } + let addresstype = outputtype; + let addressindex = typeindex; + + addr_txindex_stores + .get_mut_unwrap(addresstype) + .insert( + AddressIndexTxIndex::from((addressindex, txindex)), + Unit, + ); + + addr_outpoint_stores + .get_mut_unwrap(addresstype) + .remove(AddressIndexOutPoint::from((addressindex, outpoint))); + } + + Ok(()) +} diff --git a/crates/brk_indexer/src/processor/txout.rs b/crates/brk_indexer/src/processor/txout.rs index 0249585f5..e2f4cf820 100644 --- a/crates/brk_indexer/src/processor/txout.rs +++ b/crates/brk_indexer/src/processor/txout.rs @@ -1,14 +1,16 @@ use brk_cohort::ByAddressType; use brk_error::{Error, Result}; +use brk_store::Store; use brk_types::{ AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, OutPoint, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex, Unit, Vout, }; use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; -use vecdb::WritableVec; +use vecdb::{BytesVec, WritableVec}; use super::{BlockProcessor, ProcessedOutput, SameBlockOutputInfo}; +use crate::{AddressesVecs, Indexes, OutputsVecs, ScriptsVecs}; impl<'a> BlockProcessor<'a> { pub fn process_outputs(&self) -> Result>> { @@ -104,150 +106,138 @@ impl<'a> BlockProcessor<'a> { ) .collect() } +} - pub fn finalize_outputs( - &mut self, - txouts: Vec, - same_block_spent_outpoints: &FxHashSet, - ) -> Result> { - let mut already_added_addresshash: ByAddressType> = - ByAddressType::default(); - let mut same_block_output_info: FxHashMap = - FxHashMap::with_capacity_and_hasher( - same_block_spent_outpoints.len(), - Default::default(), - ); +#[allow(clippy::too_many_arguments)] +pub(super) fn finalize_outputs( + indexes: &mut Indexes, + first_txoutindex: &mut BytesVec, + outputs: &mut OutputsVecs, + addresses: &mut AddressesVecs, + scripts: &mut ScriptsVecs, + addr_hash_stores: &mut ByAddressType>, + addr_txindex_stores: &mut ByAddressType>, + addr_outpoint_stores: &mut ByAddressType>, + txouts: Vec, + same_block_spent_outpoints: &FxHashSet, + already_added_addresshash: &mut ByAddressType>, + same_block_output_info: &mut FxHashMap, +) -> Result<()> { + already_added_addresshash + .values_mut() + .for_each(|m| m.clear()); + same_block_output_info.clear(); - for ProcessedOutput { - txoutindex, - txout, - txindex, - vout, - outputtype, - address_info, - existing_typeindex, - } in txouts - { - let sats = Sats::from(txout.value); + for ProcessedOutput { + txoutindex, + txout, + txindex, + vout, + outputtype, + address_info, + existing_typeindex, + } in txouts + { + let sats = Sats::from(txout.value); - if vout.is_zero() { - self.vecs - .transactions - .first_txoutindex - .checked_push(txindex, txoutindex)?; - } - - self.vecs - .outputs - .txindex - .checked_push(txoutindex, txindex)?; - - let typeindex = if let Some(ti) = existing_typeindex { - ti - } else if let Some((address_bytes, address_hash)) = address_info { - let addresstype = outputtype; - if let Some(&ti) = already_added_addresshash - .get_unwrap(addresstype) - .get(&address_hash) - { - ti - } else { - let ti = self.indexes.increment_address_index(addresstype); - - already_added_addresshash - .get_mut_unwrap(addresstype) - .insert(address_hash, ti); - self.stores - .addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(addresstype) - .insert(address_hash, ti); - self.vecs.push_bytes_if_needed(ti, address_bytes)?; - - ti - } - } else { - match outputtype { - OutputType::P2MS => { - self.vecs - .scripts - .p2ms_to_txindex - .checked_push(self.indexes.p2msoutputindex, txindex)?; - self.indexes.p2msoutputindex.copy_then_increment() - } - OutputType::OpReturn => { - self.vecs - .scripts - .opreturn_to_txindex - .checked_push(self.indexes.opreturnindex, txindex)?; - self.indexes.opreturnindex.copy_then_increment() - } - OutputType::Empty => { - self.vecs - .scripts - .empty_to_txindex - .checked_push(self.indexes.emptyoutputindex, txindex)?; - self.indexes.emptyoutputindex.copy_then_increment() - } - OutputType::Unknown => { - self.vecs - .scripts - .unknown_to_txindex - .checked_push(self.indexes.unknownoutputindex, txindex)?; - self.indexes.unknownoutputindex.copy_then_increment() - } - _ => unreachable!(), - } - }; - - self.vecs.outputs.value.checked_push(txoutindex, sats)?; - self.vecs - .outputs - .outputtype - .checked_push(txoutindex, outputtype)?; - self.vecs - .outputs - .typeindex - .checked_push(txoutindex, typeindex)?; - - if outputtype.is_unspendable() { - continue; - } else if outputtype.is_address() { - let addresstype = outputtype; - let addressindex = typeindex; - - self.stores - .addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .insert( - AddressIndexTxIndex::from((addressindex, txindex)), - Unit, - ); - } - - let outpoint = OutPoint::new(txindex, vout); - - if same_block_spent_outpoints.contains(&outpoint) { - same_block_output_info.insert( - outpoint, - SameBlockOutputInfo { - outputtype, - typeindex, - }, - ); - } else if outputtype.is_address() { - let addresstype = outputtype; - let addressindex = typeindex; - - self.stores - .addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .insert( - AddressIndexOutPoint::from((addressindex, outpoint)), - Unit, - ); - } + if vout.is_zero() { + first_txoutindex.checked_push(txindex, txoutindex)?; } - Ok(same_block_output_info) + outputs.txindex.checked_push(txoutindex, txindex)?; + + let typeindex = if let Some(ti) = existing_typeindex { + ti + } else if let Some((address_bytes, address_hash)) = address_info { + let addresstype = outputtype; + if let Some(&ti) = already_added_addresshash + .get_unwrap(addresstype) + .get(&address_hash) + { + ti + } else { + let ti = indexes.increment_address_index(addresstype); + + already_added_addresshash + .get_mut_unwrap(addresstype) + .insert(address_hash, ti); + addr_hash_stores + .get_mut_unwrap(addresstype) + .insert(address_hash, ti); + addresses.push_bytes_if_needed(ti, address_bytes)?; + + ti + } + } else { + match outputtype { + OutputType::P2MS => { + scripts + .p2ms_to_txindex + .checked_push(indexes.p2msoutputindex, txindex)?; + indexes.p2msoutputindex.copy_then_increment() + } + OutputType::OpReturn => { + scripts + .opreturn_to_txindex + .checked_push(indexes.opreturnindex, txindex)?; + indexes.opreturnindex.copy_then_increment() + } + OutputType::Empty => { + scripts + .empty_to_txindex + .checked_push(indexes.emptyoutputindex, txindex)?; + indexes.emptyoutputindex.copy_then_increment() + } + OutputType::Unknown => { + scripts + .unknown_to_txindex + .checked_push(indexes.unknownoutputindex, txindex)?; + indexes.unknownoutputindex.copy_then_increment() + } + _ => unreachable!(), + } + }; + + outputs.value.checked_push(txoutindex, sats)?; + outputs.outputtype.checked_push(txoutindex, outputtype)?; + outputs.typeindex.checked_push(txoutindex, typeindex)?; + + if outputtype.is_unspendable() { + continue; + } else if outputtype.is_address() { + let addresstype = outputtype; + let addressindex = typeindex; + + addr_txindex_stores + .get_mut_unwrap(addresstype) + .insert( + AddressIndexTxIndex::from((addressindex, txindex)), + Unit, + ); + } + + let outpoint = OutPoint::new(txindex, vout); + + if same_block_spent_outpoints.contains(&outpoint) { + same_block_output_info.insert( + outpoint, + SameBlockOutputInfo { + outputtype, + typeindex, + }, + ); + } else if outputtype.is_address() { + let addresstype = outputtype; + let addressindex = typeindex; + + addr_outpoint_stores + .get_mut_unwrap(addresstype) + .insert( + AddressIndexOutPoint::from((addressindex, outpoint)), + Unit, + ); + } } + + Ok(()) } diff --git a/crates/brk_indexer/src/processor/types.rs b/crates/brk_indexer/src/processor/types.rs index b63673fb8..f309a4438 100644 --- a/crates/brk_indexer/src/processor/types.rs +++ b/crates/brk_indexer/src/processor/types.rs @@ -1,8 +1,10 @@ use bitcoin::{Transaction, TxOut}; +use brk_cohort::ByAddressType; use brk_types::{ AddressBytes, AddressHash, OutPoint, OutputType, TxIndex, TxOutIndex, Txid, TxidPrefix, TypeIndex, Vin, Vout, }; +use rustc_hash::{FxHashMap, FxHashSet}; #[derive(Debug)] pub enum InputSource { @@ -45,3 +47,13 @@ pub struct ComputedTx<'a> { pub base_size: u32, pub total_size: u32, } + +/// Reusable buffers cleared and refilled each block to avoid allocation churn. +#[derive(Default)] +pub struct BlockBuffers { + pub txid_prefix_map: FxHashMap, + pub same_block_spent: FxHashSet, + pub already_added_addresses: ByAddressType>, + pub same_block_output_info: FxHashMap, +} + diff --git a/crates/brk_indexer/src/vecs/transactions.rs b/crates/brk_indexer/src/vecs/transactions.rs index a3cb0d2c6..c777fac09 100644 --- a/crates/brk_indexer/src/vecs/transactions.rs +++ b/crates/brk_indexer/src/vecs/transactions.rs @@ -5,7 +5,7 @@ use brk_types::{ Version, }; use rayon::prelude::*; -use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp}; +use vecdb::{AnyStoredVec, BytesVec, Database, ImportableVec, PcoVec, Stamp, WritableVec}; use crate::parallel_import; @@ -23,7 +23,39 @@ pub struct TransactionsVecs { pub first_txoutindex: BytesVec, } +pub struct TxMetadataVecs<'a> { + pub height: &'a mut PcoVec, + pub txversion: &'a mut PcoVec, + pub txid: &'a mut BytesVec, + pub rawlocktime: &'a mut PcoVec, + pub base_size: &'a mut PcoVec, + pub total_size: &'a mut PcoVec, + pub is_explicitly_rbf: &'a mut PcoVec, +} + impl TransactionsVecs { + pub fn split_for_finalize( + &mut self, + ) -> ( + &mut BytesVec, + &mut PcoVec, + TxMetadataVecs<'_>, + ) { + ( + &mut self.first_txoutindex, + &mut self.first_txinindex, + TxMetadataVecs { + height: &mut self.height, + txversion: &mut self.txversion, + txid: &mut self.txid, + rawlocktime: &mut self.rawlocktime, + base_size: &mut self.base_size, + total_size: &mut self.total_size, + is_explicitly_rbf: &mut self.is_explicitly_rbf, + }, + ) + } + pub fn forced_import(db: &Database, version: Version) -> Result { let ( first_txindex, @@ -65,10 +97,8 @@ impl TransactionsVecs { pub fn truncate(&mut self, height: Height, txindex: TxIndex, stamp: Stamp) -> Result<()> { self.first_txindex .truncate_if_needed_with_stamp(height, stamp)?; - self.height - .truncate_if_needed_with_stamp(txindex, stamp)?; - self.txid - .truncate_if_needed_with_stamp(txindex, stamp)?; + self.height.truncate_if_needed_with_stamp(txindex, stamp)?; + self.txid.truncate_if_needed_with_stamp(txindex, stamp)?; self.txversion .truncate_if_needed_with_stamp(txindex, stamp)?; self.rawlocktime