From f280b03cabe78d49d2654a31e6ade87bf48fc40d Mon Sep 17 00:00:00 2001 From: nym21 Date: Sat, 6 Dec 2025 16:32:57 +0100 Subject: [PATCH] indexer: split --- crates/brk_indexer/src/indexes.rs | 17 + crates/brk_indexer/src/lib.rs | 639 ++------------------------ crates/brk_indexer/src/processor.rs | 688 ++++++++++++++++++++++++++++ crates/brk_indexer/src/readers.rs | 33 ++ crates/brk_indexer/src/vecs.rs | 59 ++- 5 files changed, 834 insertions(+), 602 deletions(-) create mode 100644 crates/brk_indexer/src/processor.rs create mode 100644 crates/brk_indexer/src/readers.rs diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index c0d278ab7..0decf389a 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -49,6 +49,23 @@ impl Indexes { } } + /// Increments the address index for the given address type and returns the previous value. + /// Only call this for address types (P2PK65, P2PK33, P2PKH, P2SH, P2WPKH, P2WSH, P2TR, P2A). + #[inline] + pub fn increment_address_index(&mut self, addresstype: OutputType) -> TypeIndex { + match addresstype { + OutputType::P2PK65 => self.p2pk65addressindex.copy_then_increment(), + OutputType::P2PK33 => self.p2pk33addressindex.copy_then_increment(), + OutputType::P2PKH => self.p2pkhaddressindex.copy_then_increment(), + OutputType::P2SH => self.p2shaddressindex.copy_then_increment(), + OutputType::P2WPKH => self.p2wpkhaddressindex.copy_then_increment(), + OutputType::P2WSH => self.p2wshaddressindex.copy_then_increment(), + OutputType::P2TR => self.p2traddressindex.copy_then_increment(), + OutputType::P2A => self.p2aaddressindex.copy_then_increment(), + _ => unreachable!(), + } + } + pub fn push_if_needed(&self, vecs: &mut Vecs) -> Result<()> { let height = self.height; vecs.height_to_first_txindex diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index dad72fd53..bbc39201e 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -2,29 +2,24 @@ use std::{path::Path, thread, time::Instant}; -use bitcoin::{TxIn, TxOut}; -use brk_error::{Error, Result}; -use brk_grouper::ByAddressType; +use brk_error::Result; use brk_iterator::Blocks; use brk_rpc::Client; -use brk_store::AnyStore; -use brk_types::{ - AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, - OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, Txid, - TxidPrefix, TypeIndex, Unit, Vin, Vout, -}; -use log::{debug, error, info}; -use rayon::prelude::*; -use rustc_hash::{FxHashMap, FxHashSet}; -use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, TypedVecIterator}; +use brk_types::Height; +use log::{debug, info}; +use vecdb::Exit; +mod constants; mod indexes; +mod processor; +mod readers; mod stores_v2; // mod stores_v3; -mod constants; mod vecs; use constants::*; pub use indexes::*; +pub use processor::*; +pub use readers::*; pub use stores_v2::*; // pub use stores_v3::*; pub use vecs::*; @@ -133,565 +128,59 @@ impl Indexer { for block in blocks.after(prev_hash)? { let height = block.height(); - let blockhash = block.hash(); info!("Indexing block {height}..."); indexes.height = height; // Used to check rapidhash collisions - let check_collisions = check_collisions && height > COLLISIONS_CHECKED_UP_TO; + let block_check_collisions = check_collisions && height > COLLISIONS_CHECKED_UP_TO; - let blockhash_prefix = BlockHashPrefix::from(blockhash); - - if stores - .blockhashprefix_to_height - .get(&blockhash_prefix)? - .is_some_and(|prev_height| *prev_height != height) - { - error!("BlockHash: {blockhash}"); - return Err(Error::Str("Collision, expect prefix to need be set yet")); - } - - indexes.push_if_needed(vecs)?; - - stores - .blockhashprefix_to_height - .insert_if_needed(blockhash_prefix, height, height); - - stores.height_to_coinbase_tag.insert_if_needed( + let mut processor = BlockProcessor { + block: &block, height, - block.coinbase_tag().into(), - height, - ); + check_collisions: block_check_collisions, + indexes: &mut indexes, + vecs, + stores, + readers: &readers, + }; - vecs.height_to_blockhash - .push_if_needed(height, blockhash.clone())?; - vecs.height_to_difficulty - .push_if_needed(height, block.header.difficulty_float().into())?; - vecs.height_to_timestamp - .push_if_needed(height, Timestamp::from(block.header.time))?; - vecs.height_to_total_size - .push_if_needed(height, block.total_size().into())?; - vecs.height_to_weight - .push_if_needed(height, block.weight().into())?; + // Phase 1: Process block metadata + processor.process_block_metadata()?; - let txs = block - .txdata - .par_iter() - .enumerate() - .map(|(index, tx)| { - // par_iter due to compute_txid being costly - let txid = Txid::from(tx.compute_txid()); + // Phase 2: Compute TXIDs in parallel + let txs = processor.compute_txids()?; - let txid_prefix = TxidPrefix::from(&txid); + // Phase 3: Process inputs in parallel + let txins = processor.process_inputs(&txs)?; - let prev_txindex_opt = - if check_collisions && stores.txidprefix_to_txindex.needs(height) { - // Should only find collisions for two txids (duplicates), see below - stores.txidprefix_to_txindex.get(&txid_prefix)?.map(|v| *v) - } else { - None - }; + // Phase 4: Collect same-block spent outpoints + let same_block_spent_outpoints = + BlockProcessor::collect_same_block_spent_outpoints(&txins); - Ok(( - indexes.txindex + TxIndex::from(index), - tx, - txid, - txid_prefix, - prev_txindex_opt, - )) - }) - .collect::>>()?; + // Phase 5: Process outputs in parallel + let txouts = processor.process_outputs()?; - let txid_prefix_to_txindex = txs - .iter() - .map(|(txindex, _, _, prefix, _)| (*prefix, txindex)) - .collect::>(); - let txins = block - .txdata - .iter() - .enumerate() - .flat_map(|(index, tx)| tx - .input - .iter() - .enumerate() - .map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx)) - ) - .collect::>() - .into_par_iter() - .enumerate() - .map(|(block_txinindex, (block_txindex, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> { - let txindex = indexes.txindex + block_txindex; - let txinindex = indexes.txinindex + TxInIndex::from(block_txinindex); - - if tx.is_coinbase() { - return Ok((txinindex, InputSource::SameBlock((txindex, txin, vin, OutPoint::COINBASE)))); - } - - let outpoint = txin.previous_output; - let txid = Txid::from(outpoint.txid); - let txid_prefix = TxidPrefix::from(&txid); - - let prev_txindex = if let Some(txindex) = stores - .txidprefix_to_txindex - .get(&txid_prefix)? - .map(|v| *v) - .and_then(|txindex| { - // Checking if not finding txindex from the future - (txindex < indexes.txindex).then_some(txindex) - }) { - txindex - } else { - let vout = Vout::from(outpoint.vout); - - let prev_txindex = **txid_prefix_to_txindex - .get(&txid_prefix) - .ok_or(Error::Str("txid should be in same block")).inspect_err(|_| { - dbg!(&txs); - // panic!(); - })?; - - let outpoint = OutPoint::new(prev_txindex, vout); - - return Ok((txinindex, InputSource::SameBlock((txindex, txin, vin, outpoint)))); - }; - - let vout = Vout::from(outpoint.vout); - - let txoutindex = vecs.txindex_to_first_txoutindex.get_pushed_or_read(prev_txindex, &readers.txindex_to_first_txoutindex)? - .ok_or(Error::Str("Expect txoutindex to not be none")) - .inspect_err(|_| { - dbg!(outpoint.txid, prev_txindex, vout); - })? - + vout; - - let outpoint = OutPoint::new(prev_txindex, vout); - - let outputtype = vecs.txoutindex_to_outputtype.get_pushed_or_read(txoutindex, &readers.txoutindex_to_outputtype)? - .ok_or(Error::Str("Expect outputtype to not be none"))?; - - let mut tuple = ( - vin, - txindex, - outpoint, - None - ); - - // Rare but happens - // https://mempool.space/tx/8ebe1df6ebf008f7ec42ccd022478c9afaec3ca0444322243b745aa2e317c272#flow=&vin=89 - if outputtype.is_address() { - let typeindex = vecs - .txoutindex_to_typeindex - .get_pushed_or_read(txoutindex, &readers.txoutindex_to_typeindex)? - .ok_or(Error::Str("Expect typeindex to not be none"))?; - tuple.3 = Some((outputtype, typeindex)); - } - - Ok((txinindex, InputSource::PreviousBlock(tuple))) - }) - .collect::>>()?; - drop(txid_prefix_to_txindex); - - let same_block_spent_outpoints: FxHashSet = txins - .iter() - .filter_map(|(_, input_source)| { - let InputSource::SameBlock((_, _, _, outpoint)) = input_source else { - return None; - }; - if !outpoint.is_coinbase() { - Some(*outpoint) - } else { - None - } - }) - .collect(); - - let txouts = block - .txdata - .iter() - .enumerate() - .flat_map(|(index, tx)| { - tx.output.iter().enumerate().map(move |(vout, txout)| { - (TxIndex::from(index), Vout::from(vout), txout, tx) - }) - }) - .collect::>() - .into_par_iter() - .enumerate() - .map( - #[allow(clippy::type_complexity)] - |(block_txoutindex, (block_txindex, vout, txout, tx))| -> Result<( - TxOutIndex, - &TxOut, - TxIndex, - Vout, - OutputType, - Option<(AddressBytes, AddressHash)>, - Option, - )> { - let txindex = indexes.txindex + block_txindex; - let txoutindex = indexes.txoutindex + TxOutIndex::from(block_txoutindex); - - let script = &txout.script_pubkey; - - let outputtype = OutputType::from(script); - - let mut tuple = (txoutindex, txout, txindex, vout, outputtype, None, None); - - if outputtype.is_not_address() { - return Ok(tuple); - } - - let addresstype = outputtype; - - let address_bytes = AddressBytes::try_from((script, addresstype)).unwrap(); - - let address_hash = AddressHash::from(&address_bytes); - - let typeindex_opt = stores - .addresstype_to_addresshash_to_addressindex - .get_unwrap(addresstype) - .get(&address_hash) - .unwrap() - .map(|v| *v) - // Checking if not in the future (in case we started before the last processed block) - .and_then(|typeindex_local| { - (typeindex_local < indexes.to_typeindex(addresstype)) - .then_some(typeindex_local) - }); - - tuple.5 = Some((address_bytes, address_hash)); - tuple.6 = typeindex_opt; - - if check_collisions && let Some(typeindex) = typeindex_opt { - // unreachable!(); - - let prev_addressbytes_opt = match addresstype { - OutputType::P2PK65 => vecs - .p2pk65addressindex_to_p2pk65bytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2pk65addressindex_to_p2pk65bytes, - )? - .map(AddressBytes::from), - OutputType::P2PK33 => vecs - .p2pk33addressindex_to_p2pk33bytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2pk33addressindex_to_p2pk33bytes, - )? - .map(AddressBytes::from), - OutputType::P2PKH => vecs - .p2pkhaddressindex_to_p2pkhbytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2pkhaddressindex_to_p2pkhbytes, - )? - .map(AddressBytes::from), - OutputType::P2SH => vecs - .p2shaddressindex_to_p2shbytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2shaddressindex_to_p2shbytes, - )? - .map(AddressBytes::from), - OutputType::P2WPKH => vecs - .p2wpkhaddressindex_to_p2wpkhbytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2wpkhaddressindex_to_p2wpkhbytes, - )? - .map(AddressBytes::from), - OutputType::P2WSH => vecs - .p2wshaddressindex_to_p2wshbytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2wshaddressindex_to_p2wshbytes, - )? - .map(AddressBytes::from), - OutputType::P2TR => vecs - .p2traddressindex_to_p2trbytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2traddressindex_to_p2trbytes, - )? - .map(AddressBytes::from), - OutputType::P2A => vecs - .p2aaddressindex_to_p2abytes - .get_pushed_or_read( - typeindex.into(), - &readers.p2aaddressindex_to_p2abytes, - )? - .map(AddressBytes::from), - _ => { - unreachable!() - } - }; - let prev_addressbytes = prev_addressbytes_opt - .as_ref() - .ok_or(Error::Str("Expect to have addressbytes"))?; - - let address_bytes = &tuple.5.as_ref().unwrap().0; - - if stores - .addresstype_to_addresshash_to_addressindex - .get_unwrap(addresstype) - .needs(height) - && prev_addressbytes != address_bytes - { - let txid = tx.compute_txid(); - dbg!( - height, - txid, - vout, - block_txindex, - addresstype, - prev_addressbytes, - address_bytes, - &indexes, - typeindex, - typeindex, - txout, - AddressHash::from(address_bytes), - ); - panic!() - } - } - - Ok(tuple) - }, - ) - .collect::>>()?; - - let outputs_len = txouts.len(); - let inputs_len = txins.len(); let tx_len = block.txdata.len(); + let inputs_len = txins.len(); + let outputs_len = txouts.len(); - let mut already_added_addresshash: ByAddressType> = - ByAddressType::default(); - let mut same_block_output_info: FxHashMap = - FxHashMap::default(); - for (txoutindex, txout, txindex, vout, outputtype, addressbytes_opt, typeindex_opt) in - txouts - { - let sats = Sats::from(txout.value); + // Phase 6: Finalize outputs sequentially + let mut same_block_output_info = + processor.finalize_outputs(txouts, &same_block_spent_outpoints)?; - if vout.is_zero() { - vecs.txindex_to_first_txoutindex - .push_if_needed(txindex, txoutindex)?; - } + // Phase 7: Finalize inputs sequentially + processor.finalize_inputs(txins, &mut same_block_output_info)?; - vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?; + // Phase 8: Check TXID collisions + processor.check_txid_collisions(&txs)?; - vecs.txoutindex_to_txindex - .push_if_needed(txoutindex, txindex)?; + // Phase 9: Store transaction metadata + processor.store_transaction_metadata(txs)?; - vecs.txoutindex_to_outputtype - .push_if_needed(txoutindex, outputtype)?; - - let typeindex = if let Some(ti) = typeindex_opt { - ti - } else if let Some((address_bytes, address_hash)) = addressbytes_opt { - let addresstype = outputtype; - if let Some(&ti) = already_added_addresshash - .get_unwrap(addresstype) - .get(&address_hash) - { - ti - } else { - let ti = match addresstype { - OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(), - OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(), - OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(), - OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(), - OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(), - OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(), - OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(), - OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(), - _ => unreachable!(), - }; - - already_added_addresshash - .get_mut_unwrap(addresstype) - .insert(address_hash, ti); - stores - .addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(addresstype) - .insert_if_needed(address_hash, ti, height); - vecs.push_bytes_if_needed(ti, address_bytes)?; - - ti - } - } else { - match outputtype { - OutputType::P2MS => { - vecs.p2msoutputindex_to_txindex - .push_if_needed(indexes.p2msoutputindex, txindex)?; - indexes.p2msoutputindex.copy_then_increment() - } - OutputType::OpReturn => { - vecs.opreturnindex_to_txindex - .push_if_needed(indexes.opreturnindex, txindex)?; - indexes.opreturnindex.copy_then_increment() - } - OutputType::Empty => { - vecs.emptyoutputindex_to_txindex - .push_if_needed(indexes.emptyoutputindex, txindex)?; - indexes.emptyoutputindex.copy_then_increment() - } - OutputType::Unknown => { - vecs.unknownoutputindex_to_txindex - .push_if_needed(indexes.unknownoutputindex, txindex)?; - indexes.unknownoutputindex.copy_then_increment() - } - _ => unreachable!(), - } - }; - - vecs.txoutindex_to_typeindex - .push_if_needed(txoutindex, typeindex)?; - - if outputtype.is_unspendable() { - continue; - } else if outputtype.is_address() { - let addresstype = outputtype; - let addressindex = typeindex; - - stores - .addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .insert_if_needed( - AddressIndexTxIndex::from((addressindex, txindex)), - Unit, - height, - ); - } - - let outpoint = OutPoint::new(txindex, vout); - - if same_block_spent_outpoints.contains(&outpoint) { - same_block_output_info.insert(outpoint, (outputtype, typeindex)); - } else if outputtype.is_address() { - let addresstype = outputtype; - let addressindex = typeindex; - - stores - .addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .insert_if_needed( - AddressIndexOutPoint::from((addressindex, outpoint)), - Unit, - height, - ); - } - } - - for (txinindex, input_source) in txins { - let (vin, txindex, outpoint, addresstype_addressindex_opt) = match input_source { - InputSource::PreviousBlock(tuple) => tuple, - InputSource::SameBlock((txindex, txin, vin, outpoint)) => { - let mut tuple = (vin, txindex, outpoint, None); - if outpoint.is_coinbase() { - tuple - } else { - let outputtype_typeindex = same_block_output_info - .remove(&outpoint) - .ok_or(Error::Str("should have found addressindex from same block")) - .inspect_err(|_| { - dbg!(&same_block_output_info, txin); - })?; - if outputtype_typeindex.0.is_address() { - tuple.3 = Some(outputtype_typeindex); - } - (tuple.0, tuple.1, tuple.2, tuple.3) - } - } - }; - - if vin.is_zero() { - vecs.txindex_to_first_txinindex - .push_if_needed(txindex, txinindex)?; - } - - vecs.txinindex_to_outpoint - .push_if_needed(txinindex, outpoint)?; - - let Some((addresstype, addressindex)) = addresstype_addressindex_opt else { - continue; - }; - - stores - .addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .insert_if_needed( - AddressIndexTxIndex::from((addressindex, txindex)), - Unit, - height, - ); - - stores - .addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height); - } - - if check_collisions { - let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter(); - for (txindex, _, _, _, prev_txindex_opt) in txs.iter() { - let Some(prev_txindex) = prev_txindex_opt else { - continue; - }; - - // In case if we start at an already parsed height - if txindex == prev_txindex { - continue; - } - - let len = vecs.txindex_to_txid.len(); - // Ok if `get` is not par as should happen only twice - let prev_txid = txindex_to_txid_iter - .get(*prev_txindex) - .ok_or(Error::Str("To have txid for txindex")) - .inspect_err(|_| { - dbg!(txindex, len); - })?; - - // If another Txid needs to be added to the list - // We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated - let is_dup = DUPLICATE_TXIDS.contains(&prev_txid); - - if !is_dup { - dbg!(height, txindex, prev_txid, prev_txindex); - return Err(Error::Str("Expect none")); - } - } - } - - for (txindex, tx, txid, txid_prefix, prev_txindex_opt) in txs { - if prev_txindex_opt.is_none() { - stores - .txidprefix_to_txindex - .insert_if_needed(txid_prefix, txindex, height); - } - - vecs.txindex_to_height.push_if_needed(txindex, height)?; - vecs.txindex_to_txversion - .push_if_needed(txindex, tx.version.into())?; - vecs.txindex_to_txid.push_if_needed(txindex, txid)?; - vecs.txindex_to_rawlocktime - .push_if_needed(txindex, tx.lock_time.into())?; - vecs.txindex_to_base_size - .push_if_needed(txindex, tx.base_size().into())?; - vecs.txindex_to_total_size - .push_if_needed(txindex, tx.total_size().into())?; - vecs.txindex_to_is_explicitly_rbf - .push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?; - } - - indexes.txindex += TxIndex::from(tx_len); - indexes.txinindex += TxInIndex::from(inputs_len); - indexes.txoutindex += TxOutIndex::from(outputs_len); + // Phase 10: Update indexes + processor.update_indexes(tx_len, inputs_len, outputs_len); if should_export(height, false) { drop(readers); @@ -711,47 +200,3 @@ impl Indexer { Ok(starting_indexes) } } - -#[derive(Debug)] -enum InputSource<'a> { - PreviousBlock((Vin, TxIndex, OutPoint, Option<(OutputType, TypeIndex)>)), - SameBlock((TxIndex, &'a TxIn, Vin, OutPoint)), -} - -struct Readers { - txindex_to_first_txoutindex: Reader, - txoutindex_to_outputtype: Reader, - txoutindex_to_typeindex: Reader, - p2pk65addressindex_to_p2pk65bytes: Reader, - p2pk33addressindex_to_p2pk33bytes: Reader, - p2pkhaddressindex_to_p2pkhbytes: Reader, - p2shaddressindex_to_p2shbytes: Reader, - p2wpkhaddressindex_to_p2wpkhbytes: Reader, - p2wshaddressindex_to_p2wshbytes: Reader, - p2traddressindex_to_p2trbytes: Reader, - p2aaddressindex_to_p2abytes: Reader, -} - -impl Readers { - fn new(vecs: &Vecs) -> Self { - Self { - txindex_to_first_txoutindex: vecs.txindex_to_first_txoutindex.create_reader(), - txoutindex_to_outputtype: vecs.txoutindex_to_outputtype.create_reader(), - txoutindex_to_typeindex: vecs.txoutindex_to_typeindex.create_reader(), - p2pk65addressindex_to_p2pk65bytes: vecs - .p2pk65addressindex_to_p2pk65bytes - .create_reader(), - p2pk33addressindex_to_p2pk33bytes: vecs - .p2pk33addressindex_to_p2pk33bytes - .create_reader(), - p2pkhaddressindex_to_p2pkhbytes: vecs.p2pkhaddressindex_to_p2pkhbytes.create_reader(), - p2shaddressindex_to_p2shbytes: vecs.p2shaddressindex_to_p2shbytes.create_reader(), - p2wpkhaddressindex_to_p2wpkhbytes: vecs - .p2wpkhaddressindex_to_p2wpkhbytes - .create_reader(), - p2wshaddressindex_to_p2wshbytes: vecs.p2wshaddressindex_to_p2wshbytes.create_reader(), - p2traddressindex_to_p2trbytes: vecs.p2traddressindex_to_p2trbytes.create_reader(), - p2aaddressindex_to_p2abytes: vecs.p2aaddressindex_to_p2abytes.create_reader(), - } - } -} diff --git a/crates/brk_indexer/src/processor.rs b/crates/brk_indexer/src/processor.rs new file mode 100644 index 000000000..67ce75eef --- /dev/null +++ b/crates/brk_indexer/src/processor.rs @@ -0,0 +1,688 @@ +use bitcoin::{Transaction, TxIn, TxOut}; +use brk_error::{Error, Result}; +use brk_grouper::ByAddressType; +use brk_store::AnyStore; +use brk_types::{ + AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, Block, BlockHashPrefix, + Height, OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, + Txid, TxidPrefix, TypeIndex, Unit, Vin, Vout, +}; +use log::error; +use rayon::prelude::*; +use rustc_hash::{FxHashMap, FxHashSet}; +use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator}; + +use crate::{Indexes, Readers, Stores, Vecs, constants::*}; + +/// Input source for tracking where an input came from. +#[derive(Debug)] +pub enum InputSource<'a> { + PreviousBlock { + vin: Vin, + txindex: TxIndex, + outpoint: OutPoint, + address_info: Option<(OutputType, TypeIndex)>, + }, + SameBlock { + txindex: TxIndex, + txin: &'a TxIn, + vin: Vin, + outpoint: OutPoint, + }, +} + +/// Processed output data from parallel output processing. +pub struct ProcessedOutput<'a> { + pub txoutindex: TxOutIndex, + pub txout: &'a TxOut, + pub txindex: TxIndex, + pub vout: Vout, + pub outputtype: OutputType, + pub address_info: Option<(AddressBytes, AddressHash)>, + pub existing_typeindex: Option, +} + +/// Computed transaction data from parallel TXID computation. +pub struct ComputedTx<'a> { + pub txindex: TxIndex, + pub tx: &'a Transaction, + pub txid: Txid, + pub txid_prefix: TxidPrefix, + pub prev_txindex_opt: Option, +} + +/// Processes a single block, extracting and storing all indexed data. +pub struct BlockProcessor<'a> { + pub block: &'a Block, + pub height: Height, + pub check_collisions: bool, + pub indexes: &'a mut Indexes, + pub vecs: &'a mut Vecs, + pub stores: &'a mut Stores, + pub readers: &'a Readers, +} + +impl<'a> BlockProcessor<'a> { + /// Process block metadata (blockhash, difficulty, timestamp, etc.) + pub fn process_block_metadata(&mut self) -> Result<()> { + let height = self.height; + let blockhash = self.block.hash(); + let blockhash_prefix = BlockHashPrefix::from(blockhash); + + // Check for blockhash prefix collision + if self + .stores + .blockhashprefix_to_height + .get(&blockhash_prefix)? + .is_some_and(|prev_height| *prev_height != height) + { + error!("BlockHash: {blockhash}"); + return Err(Error::Str("Collision, expect prefix to need be set yet")); + } + + self.indexes.push_if_needed(self.vecs)?; + + self.stores + .blockhashprefix_to_height + .insert_if_needed(blockhash_prefix, height, height); + + self.stores.height_to_coinbase_tag.insert_if_needed( + height, + self.block.coinbase_tag().into(), + height, + ); + + self.vecs + .height_to_blockhash + .push_if_needed(height, blockhash.clone())?; + self.vecs + .height_to_difficulty + .push_if_needed(height, self.block.header.difficulty_float().into())?; + self.vecs + .height_to_timestamp + .push_if_needed(height, Timestamp::from(self.block.header.time))?; + self.vecs + .height_to_total_size + .push_if_needed(height, self.block.total_size().into())?; + self.vecs + .height_to_weight + .push_if_needed(height, self.block.weight().into())?; + + Ok(()) + } + + /// Compute TXIDs in parallel (CPU-intensive operation). + pub fn compute_txids(&self) -> Result>> { + let should_check_collisions = + self.check_collisions && self.stores.txidprefix_to_txindex.needs(self.height); + let base_txindex = self.indexes.txindex; + + self.block + .txdata + .par_iter() + .enumerate() + .map(|(index, tx)| { + let txid = Txid::from(tx.compute_txid()); + let txid_prefix = TxidPrefix::from(&txid); + + let prev_txindex_opt = if should_check_collisions { + self.stores + .txidprefix_to_txindex + .get(&txid_prefix)? + .map(|v| *v) + } else { + None + }; + + Ok(ComputedTx { + txindex: base_txindex + TxIndex::from(index), + tx, + txid, + txid_prefix, + prev_txindex_opt, + }) + }) + .collect() + } + + /// Process inputs in parallel. + /// + /// Uses collect().into_par_iter() pattern because: + /// 1. The inner work (store lookups, vector reads) is expensive + /// 2. We want to parallelize across ALL inputs, not just per-transaction + /// 3. The intermediate allocation (~8KB per block) is negligible compared to parallelism gains + pub fn process_inputs<'c>( + &self, + txs: &[ComputedTx<'c>], + ) -> Result)>> { + let txid_prefix_to_txindex: FxHashMap<_, _> = + txs.iter().map(|ct| (ct.txid_prefix, &ct.txindex)).collect(); + + let base_txindex = self.indexes.txindex; + let base_txinindex = self.indexes.txinindex; + + let txins = self + .block + .txdata + .iter() + .enumerate() + .flat_map(|(index, tx)| { + tx.input + .iter() + .enumerate() + .map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx)) + }) + .collect::>() + .into_par_iter() + .enumerate() + .map( + |(block_txinindex, (block_txindex, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> { + let txindex = base_txindex + block_txindex; + let txinindex = base_txinindex + TxInIndex::from(block_txinindex); + + if tx.is_coinbase() { + return Ok(( + txinindex, + InputSource::SameBlock { + txindex, + txin, + vin, + outpoint: OutPoint::COINBASE, + }, + )); + } + + let outpoint = txin.previous_output; + let txid = Txid::from(outpoint.txid); + let txid_prefix = TxidPrefix::from(&txid); + + let prev_txindex = if let Some(txindex) = self + .stores + .txidprefix_to_txindex + .get(&txid_prefix)? + .map(|v| *v) + .and_then(|txindex| { + (txindex < self.indexes.txindex).then_some(txindex) + }) + { + txindex + } else { + let vout = Vout::from(outpoint.vout); + let prev_txindex = **txid_prefix_to_txindex + .get(&txid_prefix) + .ok_or(Error::Str("txid should be in same block"))?; + let outpoint = OutPoint::new(prev_txindex, vout); + + return Ok(( + txinindex, + InputSource::SameBlock { + txindex, + txin, + vin, + outpoint, + }, + )); + }; + + let vout = Vout::from(outpoint.vout); + let txoutindex = self + .vecs + .txindex_to_first_txoutindex + .get_pushed_or_read(prev_txindex, &self.readers.txindex_to_first_txoutindex)? + .ok_or(Error::Str("Expect txoutindex to not be none"))? + + vout; + + let outpoint = OutPoint::new(prev_txindex, vout); + let outputtype = self + .vecs + .txoutindex_to_outputtype + .get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_outputtype)? + .ok_or(Error::Str("Expect outputtype to not be none"))?; + + let address_info = if outputtype.is_address() { + let typeindex = self + .vecs + .txoutindex_to_typeindex + .get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_typeindex)? + .ok_or(Error::Str("Expect typeindex to not be none"))?; + Some((outputtype, typeindex)) + } else { + None + }; + + Ok(( + txinindex, + InputSource::PreviousBlock { + vin, + txindex, + outpoint, + address_info, + }, + )) + }, + ) + .collect::>>()?; + + Ok(txins) + } + + /// Collect same-block spent outpoints. + pub fn collect_same_block_spent_outpoints( + txins: &[(TxInIndex, InputSource)], + ) -> FxHashSet { + txins + .iter() + .filter_map(|(_, input_source)| { + let InputSource::SameBlock { outpoint, .. } = input_source else { + return None; + }; + if !outpoint.is_coinbase() { + Some(*outpoint) + } else { + None + } + }) + .collect() + } + + /// Process outputs in parallel. + pub fn process_outputs(&self) -> Result>> { + let height = self.height; + let check_collisions = self.check_collisions; + + let base_txindex = self.indexes.txindex; + let base_txoutindex = self.indexes.txoutindex; + + // Same pattern as inputs: collect then parallelize for maximum parallelism + self.block + .txdata + .iter() + .enumerate() + .flat_map(|(index, tx)| { + tx.output + .iter() + .enumerate() + .map(move |(vout, txout)| (TxIndex::from(index), Vout::from(vout), txout, tx)) + }) + .collect::>() + .into_par_iter() + .enumerate() + .map( + |(block_txoutindex, (block_txindex, vout, txout, tx))| -> Result { + let txindex = base_txindex + block_txindex; + let txoutindex = base_txoutindex + TxOutIndex::from(block_txoutindex); + + let script = &txout.script_pubkey; + let outputtype = OutputType::from(script); + + if outputtype.is_not_address() { + return Ok(ProcessedOutput { + txoutindex, + txout, + txindex, + vout, + outputtype, + address_info: None, + existing_typeindex: None, + }); + } + + let addresstype = outputtype; + let address_bytes = AddressBytes::try_from((script, addresstype)).unwrap(); + let address_hash = AddressHash::from(&address_bytes); + + let existing_typeindex = self + .stores + .addresstype_to_addresshash_to_addressindex + .get_unwrap(addresstype) + .get(&address_hash) + .unwrap() + .map(|v| *v) + .and_then(|typeindex_local| { + (typeindex_local < self.indexes.to_typeindex(addresstype)) + .then_some(typeindex_local) + }); + + if check_collisions && let Some(typeindex) = existing_typeindex { + let prev_addressbytes_opt = self.vecs.get_addressbytes_by_type( + addresstype, + typeindex, + self.readers.addressbytes.get_unwrap(addresstype), + )?; + let prev_addressbytes = prev_addressbytes_opt + .as_ref() + .ok_or(Error::Str("Expect to have addressbytes"))?; + + if self + .stores + .addresstype_to_addresshash_to_addressindex + .get_unwrap(addresstype) + .needs(height) + && prev_addressbytes != &address_bytes + { + let txid = tx.compute_txid(); + dbg!( + height, + txid, + vout, + block_txindex, + addresstype, + prev_addressbytes, + &address_bytes, + &self.indexes, + typeindex, + txout, + AddressHash::from(&address_bytes), + ); + panic!() + } + } + + Ok(ProcessedOutput { + txoutindex, + txout, + txindex, + vout, + outputtype, + address_info: Some((address_bytes, address_hash)), + existing_typeindex, + }) + }, + ) + .collect() + } + + /// Finalize outputs sequentially (stores addresses, tracks UTXOs). + pub fn finalize_outputs( + &mut self, + txouts: Vec, + same_block_spent_outpoints: &FxHashSet, + ) -> Result> { + let height = self.height; + let mut already_added_addresshash: ByAddressType> = + ByAddressType::default(); + // Pre-size based on the number of same-block spent outpoints + let mut same_block_output_info: FxHashMap = + FxHashMap::with_capacity_and_hasher( + same_block_spent_outpoints.len(), + Default::default(), + ); + + for ProcessedOutput { + txoutindex, + txout, + txindex, + vout, + outputtype, + address_info, + existing_typeindex, + } in txouts + { + let sats = Sats::from(txout.value); + + if vout.is_zero() { + self.vecs + .txindex_to_first_txoutindex + .push_if_needed(txindex, txoutindex)?; + } + + self.vecs + .txoutindex_to_value + .push_if_needed(txoutindex, sats)?; + self.vecs + .txoutindex_to_txindex + .push_if_needed(txoutindex, txindex)?; + self.vecs + .txoutindex_to_outputtype + .push_if_needed(txoutindex, outputtype)?; + + let typeindex = if let Some(ti) = existing_typeindex { + ti + } else if let Some((address_bytes, address_hash)) = address_info { + let addresstype = outputtype; + if let Some(&ti) = already_added_addresshash + .get_unwrap(addresstype) + .get(&address_hash) + { + ti + } else { + let ti = self.indexes.increment_address_index(addresstype); + + already_added_addresshash + .get_mut_unwrap(addresstype) + .insert(address_hash, ti); + self.stores + .addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(addresstype) + .insert_if_needed(address_hash, ti, height); + self.vecs.push_bytes_if_needed(ti, address_bytes)?; + + ti + } + } else { + match outputtype { + OutputType::P2MS => { + self.vecs + .p2msoutputindex_to_txindex + .push_if_needed(self.indexes.p2msoutputindex, txindex)?; + self.indexes.p2msoutputindex.copy_then_increment() + } + OutputType::OpReturn => { + self.vecs + .opreturnindex_to_txindex + .push_if_needed(self.indexes.opreturnindex, txindex)?; + self.indexes.opreturnindex.copy_then_increment() + } + OutputType::Empty => { + self.vecs + .emptyoutputindex_to_txindex + .push_if_needed(self.indexes.emptyoutputindex, txindex)?; + self.indexes.emptyoutputindex.copy_then_increment() + } + OutputType::Unknown => { + self.vecs + .unknownoutputindex_to_txindex + .push_if_needed(self.indexes.unknownoutputindex, txindex)?; + self.indexes.unknownoutputindex.copy_then_increment() + } + _ => unreachable!(), + } + }; + + self.vecs + .txoutindex_to_typeindex + .push_if_needed(txoutindex, typeindex)?; + + if outputtype.is_unspendable() { + continue; + } else if outputtype.is_address() { + let addresstype = outputtype; + let addressindex = typeindex; + + self.stores + .addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) + .insert_if_needed( + AddressIndexTxIndex::from((addressindex, txindex)), + Unit, + height, + ); + } + + let outpoint = OutPoint::new(txindex, vout); + + if same_block_spent_outpoints.contains(&outpoint) { + same_block_output_info.insert(outpoint, (outputtype, typeindex)); + } else if outputtype.is_address() { + let addresstype = outputtype; + let addressindex = typeindex; + + self.stores + .addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) + .insert_if_needed( + AddressIndexOutPoint::from((addressindex, outpoint)), + Unit, + height, + ); + } + } + + Ok(same_block_output_info) + } + + /// Finalize inputs sequentially (stores outpoints, updates address UTXOs). + pub fn finalize_inputs( + &mut self, + txins: Vec<(TxInIndex, InputSource)>, + same_block_output_info: &mut FxHashMap, + ) -> Result<()> { + let height = self.height; + + for (txinindex, input_source) in txins { + let (vin, txindex, outpoint, address_info) = match input_source { + InputSource::PreviousBlock { + vin, + txindex, + outpoint, + address_info, + } => (vin, txindex, outpoint, address_info), + InputSource::SameBlock { + txindex, + txin, + vin, + outpoint, + } => { + if outpoint.is_coinbase() { + (vin, txindex, outpoint, None) + } else { + let outputtype_typeindex = same_block_output_info + .remove(&outpoint) + .ok_or(Error::Str("should have found addressindex from same block")) + .inspect_err(|_| { + dbg!(&same_block_output_info, txin); + })?; + let address_info = if outputtype_typeindex.0.is_address() { + Some(outputtype_typeindex) + } else { + None + }; + (vin, txindex, outpoint, address_info) + } + } + }; + + if vin.is_zero() { + self.vecs + .txindex_to_first_txinindex + .push_if_needed(txindex, txinindex)?; + } + + self.vecs + .txinindex_to_outpoint + .push_if_needed(txinindex, outpoint)?; + + let Some((addresstype, addressindex)) = address_info else { + continue; + }; + + self.stores + .addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) + .insert_if_needed( + AddressIndexTxIndex::from((addressindex, txindex)), + Unit, + height, + ); + + self.stores + .addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) + .remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height); + } + + Ok(()) + } + + /// Check for TXID collisions (only for known duplicate TXIDs). + pub fn check_txid_collisions(&self, txs: &[ComputedTx]) -> Result<()> { + if !self.check_collisions { + return Ok(()); + } + + let mut txindex_to_txid_iter = self.vecs.txindex_to_txid.into_iter(); + for ct in txs.iter() { + let Some(prev_txindex) = ct.prev_txindex_opt else { + continue; + }; + + // In case if we start at an already parsed height + if ct.txindex == prev_txindex { + continue; + } + + let len = self.vecs.txindex_to_txid.len(); + let prev_txid = txindex_to_txid_iter + .get(prev_txindex) + .ok_or(Error::Str("To have txid for txindex")) + .inspect_err(|_| { + dbg!(ct.txindex, len); + })?; + + let is_dup = DUPLICATE_TXIDS.contains(&prev_txid); + + if !is_dup { + dbg!(self.height, ct.txindex, prev_txid, prev_txindex); + return Err(Error::Str("Expect none")); + } + } + + Ok(()) + } + + /// Store transaction metadata. + pub fn store_transaction_metadata(&mut self, txs: Vec) -> Result<()> { + let height = self.height; + + for ct in txs { + if ct.prev_txindex_opt.is_none() { + self.stores.txidprefix_to_txindex.insert_if_needed( + ct.txid_prefix, + ct.txindex, + height, + ); + } + + self.vecs + .txindex_to_height + .push_if_needed(ct.txindex, height)?; + self.vecs + .txindex_to_txversion + .push_if_needed(ct.txindex, ct.tx.version.into())?; + self.vecs + .txindex_to_txid + .push_if_needed(ct.txindex, ct.txid)?; + self.vecs + .txindex_to_rawlocktime + .push_if_needed(ct.txindex, ct.tx.lock_time.into())?; + self.vecs + .txindex_to_base_size + .push_if_needed(ct.txindex, ct.tx.base_size().into())?; + self.vecs + .txindex_to_total_size + .push_if_needed(ct.txindex, ct.tx.total_size().into())?; + self.vecs + .txindex_to_is_explicitly_rbf + .push_if_needed(ct.txindex, StoredBool::from(ct.tx.is_explicitly_rbf()))?; + } + + Ok(()) + } + + /// Update global indexes after processing a block. + pub fn update_indexes(&mut self, tx_count: usize, input_count: usize, output_count: usize) { + self.indexes.txindex += TxIndex::from(tx_count); + self.indexes.txinindex += TxInIndex::from(input_count); + self.indexes.txoutindex += TxOutIndex::from(output_count); + } +} diff --git a/crates/brk_indexer/src/readers.rs b/crates/brk_indexer/src/readers.rs new file mode 100644 index 000000000..8bd32177b --- /dev/null +++ b/crates/brk_indexer/src/readers.rs @@ -0,0 +1,33 @@ +use brk_grouper::ByAddressType; +use vecdb::{GenericStoredVec, Reader}; + +use crate::Vecs; + +/// Readers for vectors that need to be accessed during block processing. +/// These provide consistent snapshots for reading while the main vectors are being modified. +pub struct Readers { + pub txindex_to_first_txoutindex: Reader, + pub txoutindex_to_outputtype: Reader, + pub txoutindex_to_typeindex: Reader, + pub addressbytes: ByAddressType, +} + +impl Readers { + pub fn new(vecs: &Vecs) -> Self { + Self { + txindex_to_first_txoutindex: vecs.txindex_to_first_txoutindex.create_reader(), + txoutindex_to_outputtype: vecs.txoutindex_to_outputtype.create_reader(), + txoutindex_to_typeindex: vecs.txoutindex_to_typeindex.create_reader(), + addressbytes: ByAddressType { + p2pk65: vecs.p2pk65addressindex_to_p2pk65bytes.create_reader(), + p2pk33: vecs.p2pk33addressindex_to_p2pk33bytes.create_reader(), + p2pkh: vecs.p2pkhaddressindex_to_p2pkhbytes.create_reader(), + p2sh: vecs.p2shaddressindex_to_p2shbytes.create_reader(), + p2wpkh: vecs.p2wpkhaddressindex_to_p2wpkhbytes.create_reader(), + p2wsh: vecs.p2wshaddressindex_to_p2wshbytes.create_reader(), + p2tr: vecs.p2traddressindex_to_p2trbytes.create_reader(), + p2a: vecs.p2aaddressindex_to_p2abytes.create_reader(), + }, + } + } +} diff --git a/crates/brk_indexer/src/vecs.rs b/crates/brk_indexer/src/vecs.rs index 3ef0e8961..eabef5926 100644 --- a/crates/brk_indexer/src/vecs.rs +++ b/crates/brk_indexer/src/vecs.rs @@ -12,8 +12,8 @@ use brk_types::{ }; use rayon::prelude::*; use vecdb::{ - AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, TypedVecIterator, PAGE_SIZE, - PcoVec, Stamp, + AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PAGE_SIZE, PcoVec, Reader, + Stamp, TypedVecIterator, }; use crate::Indexes; @@ -314,6 +314,52 @@ impl Vecs { Ok(()) } + /// Get address bytes by output type, using the reader for the specific address type. + /// Returns None if the index doesn't exist yet. + pub fn get_addressbytes_by_type( + &self, + addresstype: OutputType, + typeindex: TypeIndex, + reader: &Reader, + ) -> Result> { + match addresstype { + OutputType::P2PK65 => self + .p2pk65addressindex_to_p2pk65bytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2PK33 => self + .p2pk33addressindex_to_p2pk33bytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2PKH => self + .p2pkhaddressindex_to_p2pkhbytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2SH => self + .p2shaddressindex_to_p2shbytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2WPKH => self + .p2wpkhaddressindex_to_p2wpkhbytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2WSH => self + .p2wshaddressindex_to_p2wshbytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2TR => self + .p2traddressindex_to_p2trbytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + OutputType::P2A => self + .p2aaddressindex_to_p2abytes + .get_pushed_or_read(typeindex.into(), reader) + .map(|opt| opt.map(AddressBytes::from)), + _ => unreachable!("get_addressbytes_by_type called with non-address type"), + } + .map_err(|e| e.into()) + } + pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> { match bytes { AddressBytes::P2PK65(bytes) => self @@ -386,10 +432,13 @@ impl Vecs { index.increment(); AddressHash::from(&bytes) }) - })) as Box + '_>) + })) + as Box + '_>) + } + Err(_) => { + Ok(Box::new(std::iter::empty()) + as Box + '_>) } - Err(_) => Ok(Box::new(std::iter::empty()) - as Box + '_>), } }}; }