indexer: split

This commit is contained in:
nym21
2025-12-06 16:32:57 +01:00
parent 554c0e565d
commit f280b03cab
5 changed files with 834 additions and 602 deletions
+17
View File
@@ -49,6 +49,23 @@ impl Indexes {
}
}
/// Increments the address index counter for `addresstype` and returns the value the
/// counter held before the increment.
///
/// Only call this for address types (P2PK65, P2PK33, P2PKH, P2SH, P2WPKH, P2WSH, P2TR, P2A).
///
/// # Panics
///
/// Panics if `addresstype` is any other output type, since no address counter exists
/// for it.
#[inline]
pub fn increment_address_index(&mut self, addresstype: OutputType) -> TypeIndex {
    match addresstype {
        OutputType::P2PK65 => self.p2pk65addressindex.copy_then_increment(),
        OutputType::P2PK33 => self.p2pk33addressindex.copy_then_increment(),
        OutputType::P2PKH => self.p2pkhaddressindex.copy_then_increment(),
        OutputType::P2SH => self.p2shaddressindex.copy_then_increment(),
        OutputType::P2WPKH => self.p2wpkhaddressindex.copy_then_increment(),
        OutputType::P2WSH => self.p2wshaddressindex.copy_then_increment(),
        OutputType::P2TR => self.p2traddressindex.copy_then_increment(),
        OutputType::P2A => self.p2aaddressindex.copy_then_increment(),
        // Caller contract violation: name the offending type so the panic is actionable.
        other => unreachable!(
            "increment_address_index called with non-address output type {other:?}"
        ),
    }
}
pub fn push_if_needed(&self, vecs: &mut Vecs) -> Result<()> {
let height = self.height;
vecs.height_to_first_txindex
+42 -597
View File
@@ -2,29 +2,24 @@
use std::{path::Path, thread, time::Instant};
use bitcoin::{TxIn, TxOut};
use brk_error::{Error, Result};
use brk_grouper::ByAddressType;
use brk_error::Result;
use brk_iterator::Blocks;
use brk_rpc::Client;
use brk_store::AnyStore;
use brk_types::{
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height,
OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, Txid,
TxidPrefix, TypeIndex, Unit, Vin, Vout,
};
use log::{debug, error, info};
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, TypedVecIterator};
use brk_types::Height;
use log::{debug, info};
use vecdb::Exit;
mod constants;
mod indexes;
mod processor;
mod readers;
mod stores_v2;
// mod stores_v3;
mod constants;
mod vecs;
use constants::*;
pub use indexes::*;
pub use processor::*;
pub use readers::*;
pub use stores_v2::*;
// pub use stores_v3::*;
pub use vecs::*;
@@ -133,565 +128,59 @@ impl Indexer {
for block in blocks.after(prev_hash)? {
let height = block.height();
let blockhash = block.hash();
info!("Indexing block {height}...");
indexes.height = height;
// Used to check rapidhash collisions
let check_collisions = check_collisions && height > COLLISIONS_CHECKED_UP_TO;
let block_check_collisions = check_collisions && height > COLLISIONS_CHECKED_UP_TO;
let blockhash_prefix = BlockHashPrefix::from(blockhash);
if stores
.blockhashprefix_to_height
.get(&blockhash_prefix)?
.is_some_and(|prev_height| *prev_height != height)
{
error!("BlockHash: {blockhash}");
return Err(Error::Str("Collision, expect prefix to need be set yet"));
}
indexes.push_if_needed(vecs)?;
stores
.blockhashprefix_to_height
.insert_if_needed(blockhash_prefix, height, height);
stores.height_to_coinbase_tag.insert_if_needed(
let mut processor = BlockProcessor {
block: &block,
height,
block.coinbase_tag().into(),
height,
);
check_collisions: block_check_collisions,
indexes: &mut indexes,
vecs,
stores,
readers: &readers,
};
vecs.height_to_blockhash
.push_if_needed(height, blockhash.clone())?;
vecs.height_to_difficulty
.push_if_needed(height, block.header.difficulty_float().into())?;
vecs.height_to_timestamp
.push_if_needed(height, Timestamp::from(block.header.time))?;
vecs.height_to_total_size
.push_if_needed(height, block.total_size().into())?;
vecs.height_to_weight
.push_if_needed(height, block.weight().into())?;
// Phase 1: Process block metadata
processor.process_block_metadata()?;
let txs = block
.txdata
.par_iter()
.enumerate()
.map(|(index, tx)| {
// par_iter due to compute_txid being costly
let txid = Txid::from(tx.compute_txid());
// Phase 2: Compute TXIDs in parallel
let txs = processor.compute_txids()?;
let txid_prefix = TxidPrefix::from(&txid);
// Phase 3: Process inputs in parallel
let txins = processor.process_inputs(&txs)?;
let prev_txindex_opt =
if check_collisions && stores.txidprefix_to_txindex.needs(height) {
// Should only find collisions for two txids (duplicates), see below
stores.txidprefix_to_txindex.get(&txid_prefix)?.map(|v| *v)
} else {
None
};
// Phase 4: Collect same-block spent outpoints
let same_block_spent_outpoints =
BlockProcessor::collect_same_block_spent_outpoints(&txins);
Ok((
indexes.txindex + TxIndex::from(index),
tx,
txid,
txid_prefix,
prev_txindex_opt,
))
})
.collect::<Result<Vec<_>>>()?;
// Phase 5: Process outputs in parallel
let txouts = processor.process_outputs()?;
let txid_prefix_to_txindex = txs
.iter()
.map(|(txindex, _, _, prefix, _)| (*prefix, txindex))
.collect::<FxHashMap<_, _>>();
let txins = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| tx
.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
)
.collect::<Vec<_>>()
.into_par_iter()
.enumerate()
.map(|(block_txinindex, (block_txindex, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> {
let txindex = indexes.txindex + block_txindex;
let txinindex = indexes.txinindex + TxInIndex::from(block_txinindex);
if tx.is_coinbase() {
return Ok((txinindex, InputSource::SameBlock((txindex, txin, vin, OutPoint::COINBASE))));
}
let outpoint = txin.previous_output;
let txid = Txid::from(outpoint.txid);
let txid_prefix = TxidPrefix::from(&txid);
let prev_txindex = if let Some(txindex) = stores
.txidprefix_to_txindex
.get(&txid_prefix)?
.map(|v| *v)
.and_then(|txindex| {
// Checking if not finding txindex from the future
(txindex < indexes.txindex).then_some(txindex)
}) {
txindex
} else {
let vout = Vout::from(outpoint.vout);
let prev_txindex = **txid_prefix_to_txindex
.get(&txid_prefix)
.ok_or(Error::Str("txid should be in same block")).inspect_err(|_| {
dbg!(&txs);
// panic!();
})?;
let outpoint = OutPoint::new(prev_txindex, vout);
return Ok((txinindex, InputSource::SameBlock((txindex, txin, vin, outpoint))));
};
let vout = Vout::from(outpoint.vout);
let txoutindex = vecs.txindex_to_first_txoutindex.get_pushed_or_read(prev_txindex, &readers.txindex_to_first_txoutindex)?
.ok_or(Error::Str("Expect txoutindex to not be none"))
.inspect_err(|_| {
dbg!(outpoint.txid, prev_txindex, vout);
})?
+ vout;
let outpoint = OutPoint::new(prev_txindex, vout);
let outputtype = vecs.txoutindex_to_outputtype.get_pushed_or_read(txoutindex, &readers.txoutindex_to_outputtype)?
.ok_or(Error::Str("Expect outputtype to not be none"))?;
let mut tuple = (
vin,
txindex,
outpoint,
None
);
// Rare but happens
// https://mempool.space/tx/8ebe1df6ebf008f7ec42ccd022478c9afaec3ca0444322243b745aa2e317c272#flow=&vin=89
if outputtype.is_address() {
let typeindex = vecs
.txoutindex_to_typeindex
.get_pushed_or_read(txoutindex, &readers.txoutindex_to_typeindex)?
.ok_or(Error::Str("Expect typeindex to not be none"))?;
tuple.3 = Some((outputtype, typeindex));
}
Ok((txinindex, InputSource::PreviousBlock(tuple)))
})
.collect::<Result<Vec<_>>>()?;
drop(txid_prefix_to_txindex);
let same_block_spent_outpoints: FxHashSet<OutPoint> = txins
.iter()
.filter_map(|(_, input_source)| {
let InputSource::SameBlock((_, _, _, outpoint)) = input_source else {
return None;
};
if !outpoint.is_coinbase() {
Some(*outpoint)
} else {
None
}
})
.collect();
let txouts = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.output.iter().enumerate().map(move |(vout, txout)| {
(TxIndex::from(index), Vout::from(vout), txout, tx)
})
})
.collect::<Vec<_>>()
.into_par_iter()
.enumerate()
.map(
#[allow(clippy::type_complexity)]
|(block_txoutindex, (block_txindex, vout, txout, tx))| -> Result<(
TxOutIndex,
&TxOut,
TxIndex,
Vout,
OutputType,
Option<(AddressBytes, AddressHash)>,
Option<TypeIndex>,
)> {
let txindex = indexes.txindex + block_txindex;
let txoutindex = indexes.txoutindex + TxOutIndex::from(block_txoutindex);
let script = &txout.script_pubkey;
let outputtype = OutputType::from(script);
let mut tuple = (txoutindex, txout, txindex, vout, outputtype, None, None);
if outputtype.is_not_address() {
return Ok(tuple);
}
let addresstype = outputtype;
let address_bytes = AddressBytes::try_from((script, addresstype)).unwrap();
let address_hash = AddressHash::from(&address_bytes);
let typeindex_opt = stores
.addresstype_to_addresshash_to_addressindex
.get_unwrap(addresstype)
.get(&address_hash)
.unwrap()
.map(|v| *v)
// Checking if not in the future (in case we started before the last processed block)
.and_then(|typeindex_local| {
(typeindex_local < indexes.to_typeindex(addresstype))
.then_some(typeindex_local)
});
tuple.5 = Some((address_bytes, address_hash));
tuple.6 = typeindex_opt;
if check_collisions && let Some(typeindex) = typeindex_opt {
// unreachable!();
let prev_addressbytes_opt = match addresstype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2pk65addressindex_to_p2pk65bytes,
)?
.map(AddressBytes::from),
OutputType::P2PK33 => vecs
.p2pk33addressindex_to_p2pk33bytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2pk33addressindex_to_p2pk33bytes,
)?
.map(AddressBytes::from),
OutputType::P2PKH => vecs
.p2pkhaddressindex_to_p2pkhbytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2pkhaddressindex_to_p2pkhbytes,
)?
.map(AddressBytes::from),
OutputType::P2SH => vecs
.p2shaddressindex_to_p2shbytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2shaddressindex_to_p2shbytes,
)?
.map(AddressBytes::from),
OutputType::P2WPKH => vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2wpkhaddressindex_to_p2wpkhbytes,
)?
.map(AddressBytes::from),
OutputType::P2WSH => vecs
.p2wshaddressindex_to_p2wshbytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2wshaddressindex_to_p2wshbytes,
)?
.map(AddressBytes::from),
OutputType::P2TR => vecs
.p2traddressindex_to_p2trbytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2traddressindex_to_p2trbytes,
)?
.map(AddressBytes::from),
OutputType::P2A => vecs
.p2aaddressindex_to_p2abytes
.get_pushed_or_read(
typeindex.into(),
&readers.p2aaddressindex_to_p2abytes,
)?
.map(AddressBytes::from),
_ => {
unreachable!()
}
};
let prev_addressbytes = prev_addressbytes_opt
.as_ref()
.ok_or(Error::Str("Expect to have addressbytes"))?;
let address_bytes = &tuple.5.as_ref().unwrap().0;
if stores
.addresstype_to_addresshash_to_addressindex
.get_unwrap(addresstype)
.needs(height)
&& prev_addressbytes != address_bytes
{
let txid = tx.compute_txid();
dbg!(
height,
txid,
vout,
block_txindex,
addresstype,
prev_addressbytes,
address_bytes,
&indexes,
typeindex,
typeindex,
txout,
AddressHash::from(address_bytes),
);
panic!()
}
}
Ok(tuple)
},
)
.collect::<Result<Vec<_>>>()?;
let outputs_len = txouts.len();
let inputs_len = txins.len();
let tx_len = block.txdata.len();
let inputs_len = txins.len();
let outputs_len = txouts.len();
let mut already_added_addresshash: ByAddressType<FxHashMap<AddressHash, TypeIndex>> =
ByAddressType::default();
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::default();
for (txoutindex, txout, txindex, vout, outputtype, addressbytes_opt, typeindex_opt) in
txouts
{
let sats = Sats::from(txout.value);
// Phase 6: Finalize outputs sequentially
let mut same_block_output_info =
processor.finalize_outputs(txouts, &same_block_spent_outpoints)?;
if vout.is_zero() {
vecs.txindex_to_first_txoutindex
.push_if_needed(txindex, txoutindex)?;
}
// Phase 7: Finalize inputs sequentially
processor.finalize_inputs(txins, &mut same_block_output_info)?;
vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?;
// Phase 8: Check TXID collisions
processor.check_txid_collisions(&txs)?;
vecs.txoutindex_to_txindex
.push_if_needed(txoutindex, txindex)?;
// Phase 9: Store transaction metadata
processor.store_transaction_metadata(txs)?;
vecs.txoutindex_to_outputtype
.push_if_needed(txoutindex, outputtype)?;
let typeindex = if let Some(ti) = typeindex_opt {
ti
} else if let Some((address_bytes, address_hash)) = addressbytes_opt {
let addresstype = outputtype;
if let Some(&ti) = already_added_addresshash
.get_unwrap(addresstype)
.get(&address_hash)
{
ti
} else {
let ti = match addresstype {
OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(),
OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(),
OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(),
OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(),
OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(),
OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(),
OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(),
OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(),
_ => unreachable!(),
};
already_added_addresshash
.get_mut_unwrap(addresstype)
.insert(address_hash, ti);
stores
.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(addresstype)
.insert_if_needed(address_hash, ti, height);
vecs.push_bytes_if_needed(ti, address_bytes)?;
ti
}
} else {
match outputtype {
OutputType::P2MS => {
vecs.p2msoutputindex_to_txindex
.push_if_needed(indexes.p2msoutputindex, txindex)?;
indexes.p2msoutputindex.copy_then_increment()
}
OutputType::OpReturn => {
vecs.opreturnindex_to_txindex
.push_if_needed(indexes.opreturnindex, txindex)?;
indexes.opreturnindex.copy_then_increment()
}
OutputType::Empty => {
vecs.emptyoutputindex_to_txindex
.push_if_needed(indexes.emptyoutputindex, txindex)?;
indexes.emptyoutputindex.copy_then_increment()
}
OutputType::Unknown => {
vecs.unknownoutputindex_to_txindex
.push_if_needed(indexes.unknownoutputindex, txindex)?;
indexes.unknownoutputindex.copy_then_increment()
}
_ => unreachable!(),
}
};
vecs.txoutindex_to_typeindex
.push_if_needed(txoutindex, typeindex)?;
if outputtype.is_unspendable() {
continue;
} else if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = typeindex;
stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
}
let outpoint = OutPoint::new(txindex, vout);
if same_block_spent_outpoints.contains(&outpoint) {
same_block_output_info.insert(outpoint, (outputtype, typeindex));
} else if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = typeindex;
stores
.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressIndexOutPoint::from((addressindex, outpoint)),
Unit,
height,
);
}
}
for (txinindex, input_source) in txins {
let (vin, txindex, outpoint, addresstype_addressindex_opt) = match input_source {
InputSource::PreviousBlock(tuple) => tuple,
InputSource::SameBlock((txindex, txin, vin, outpoint)) => {
let mut tuple = (vin, txindex, outpoint, None);
if outpoint.is_coinbase() {
tuple
} else {
let outputtype_typeindex = same_block_output_info
.remove(&outpoint)
.ok_or(Error::Str("should have found addressindex from same block"))
.inspect_err(|_| {
dbg!(&same_block_output_info, txin);
})?;
if outputtype_typeindex.0.is_address() {
tuple.3 = Some(outputtype_typeindex);
}
(tuple.0, tuple.1, tuple.2, tuple.3)
}
}
};
if vin.is_zero() {
vecs.txindex_to_first_txinindex
.push_if_needed(txindex, txinindex)?;
}
vecs.txinindex_to_outpoint
.push_if_needed(txinindex, outpoint)?;
let Some((addresstype, addressindex)) = addresstype_addressindex_opt else {
continue;
};
stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
stores
.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height);
}
if check_collisions {
let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter();
for (txindex, _, _, _, prev_txindex_opt) in txs.iter() {
let Some(prev_txindex) = prev_txindex_opt else {
continue;
};
// In case if we start at an already parsed height
if txindex == prev_txindex {
continue;
}
let len = vecs.txindex_to_txid.len();
// Ok if `get` is not par as should happen only twice
let prev_txid = txindex_to_txid_iter
.get(*prev_txindex)
.ok_or(Error::Str("To have txid for txindex"))
.inspect_err(|_| {
dbg!(txindex, len);
})?;
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let is_dup = DUPLICATE_TXIDS.contains(&prev_txid);
if !is_dup {
dbg!(height, txindex, prev_txid, prev_txindex);
return Err(Error::Str("Expect none"));
}
}
}
for (txindex, tx, txid, txid_prefix, prev_txindex_opt) in txs {
if prev_txindex_opt.is_none() {
stores
.txidprefix_to_txindex
.insert_if_needed(txid_prefix, txindex, height);
}
vecs.txindex_to_height.push_if_needed(txindex, height)?;
vecs.txindex_to_txversion
.push_if_needed(txindex, tx.version.into())?;
vecs.txindex_to_txid.push_if_needed(txindex, txid)?;
vecs.txindex_to_rawlocktime
.push_if_needed(txindex, tx.lock_time.into())?;
vecs.txindex_to_base_size
.push_if_needed(txindex, tx.base_size().into())?;
vecs.txindex_to_total_size
.push_if_needed(txindex, tx.total_size().into())?;
vecs.txindex_to_is_explicitly_rbf
.push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?;
}
indexes.txindex += TxIndex::from(tx_len);
indexes.txinindex += TxInIndex::from(inputs_len);
indexes.txoutindex += TxOutIndex::from(outputs_len);
// Phase 10: Update indexes
processor.update_indexes(tx_len, inputs_len, outputs_len);
if should_export(height, false) {
drop(readers);
@@ -711,47 +200,3 @@ impl Indexer {
Ok(starting_indexes)
}
}
/// Describes how a transaction input's previous output was resolved while indexing a block.
#[derive(Debug)]
enum InputSource<'a> {
// Payload: (vin, txindex of the spending tx, resolved outpoint,
// optional (output type, type index) set only when the spent output is an address).
PreviousBlock((Vin, TxIndex, OutPoint, Option<(OutputType, TypeIndex)>)),
// Payload: (txindex of the spending tx, borrowed raw input, vin, outpoint).
// Also used for coinbase inputs, where the outpoint is `OutPoint::COINBASE`.
SameBlock((TxIndex, &'a TxIn, Vin, OutPoint)),
}
/// Bundle of one `Reader` per stored vector that the indexing loop looks up
/// through `get_pushed_or_read`. Built by `Readers::new`.
struct Readers {
// Readers used while resolving inputs.
txindex_to_first_txoutindex: Reader,
txoutindex_to_outputtype: Reader,
txoutindex_to_typeindex: Reader,
// One reader per address type, used to fetch previously stored address bytes
// when checking for address hash collisions.
p2pk65addressindex_to_p2pk65bytes: Reader,
p2pk33addressindex_to_p2pk33bytes: Reader,
p2pkhaddressindex_to_p2pkhbytes: Reader,
p2shaddressindex_to_p2shbytes: Reader,
p2wpkhaddressindex_to_p2wpkhbytes: Reader,
p2wshaddressindex_to_p2wshbytes: Reader,
p2traddressindex_to_p2trbytes: Reader,
p2aaddressindex_to_p2abytes: Reader,
}
impl Readers {
/// Creates a reader for every vector listed in `Readers` by calling
/// `create_reader()` on the matching field of `vecs`.
fn new(vecs: &Vecs) -> Self {
Self {
txindex_to_first_txoutindex: vecs.txindex_to_first_txoutindex.create_reader(),
txoutindex_to_outputtype: vecs.txoutindex_to_outputtype.create_reader(),
txoutindex_to_typeindex: vecs.txoutindex_to_typeindex.create_reader(),
p2pk65addressindex_to_p2pk65bytes: vecs
.p2pk65addressindex_to_p2pk65bytes
.create_reader(),
p2pk33addressindex_to_p2pk33bytes: vecs
.p2pk33addressindex_to_p2pk33bytes
.create_reader(),
p2pkhaddressindex_to_p2pkhbytes: vecs.p2pkhaddressindex_to_p2pkhbytes.create_reader(),
p2shaddressindex_to_p2shbytes: vecs.p2shaddressindex_to_p2shbytes.create_reader(),
p2wpkhaddressindex_to_p2wpkhbytes: vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.create_reader(),
p2wshaddressindex_to_p2wshbytes: vecs.p2wshaddressindex_to_p2wshbytes.create_reader(),
p2traddressindex_to_p2trbytes: vecs.p2traddressindex_to_p2trbytes.create_reader(),
p2aaddressindex_to_p2abytes: vecs.p2aaddressindex_to_p2abytes.create_reader(),
}
}
}
+688
View File
@@ -0,0 +1,688 @@
use bitcoin::{Transaction, TxIn, TxOut};
use brk_error::{Error, Result};
use brk_grouper::ByAddressType;
use brk_store::AnyStore;
use brk_types::{
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, Block, BlockHashPrefix,
Height, OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex,
Txid, TxidPrefix, TypeIndex, Unit, Vin, Vout,
};
use log::error;
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator};
use crate::{Indexes, Readers, Stores, Vecs, constants::*};
/// Input source for tracking where an input came from.
///
/// Produced by `BlockProcessor::process_inputs` and consumed by
/// `BlockProcessor::finalize_inputs`.
#[derive(Debug)]
pub enum InputSource<'a> {
/// The spent transaction was found in `stores.txidprefix_to_txindex` with a txindex
/// lower than the first txindex of the block being processed.
PreviousBlock {
/// Position of the input inside its transaction.
vin: Vin,
/// Index of the spending transaction.
txindex: TxIndex,
/// Outpoint built from the spent transaction's txindex and the spent vout.
outpoint: OutPoint,
/// `(output type, type index)` of the spent output, set only when that
/// output type is an address type.
address_info: Option<(OutputType, TypeIndex)>,
},
/// Either a coinbase input (`outpoint` is `OutPoint::COINBASE`) or an input whose
/// spent transaction was resolved through the block-local txid-prefix map.
SameBlock {
/// Index of the spending transaction.
txindex: TxIndex,
/// Borrowed raw input; `finalize_inputs` only uses it for debug output on failure.
txin: &'a TxIn,
/// Position of the input inside its transaction.
vin: Vin,
/// Outpoint being spent (or `OutPoint::COINBASE`).
outpoint: OutPoint,
},
}
/// Processed output data from parallel output processing.
///
/// Produced by `BlockProcessor::process_outputs` and consumed by
/// `BlockProcessor::finalize_outputs`.
pub struct ProcessedOutput<'a> {
/// Index of this output: the block's starting txoutindex plus the output's
/// position among all outputs of the block.
pub txoutindex: TxOutIndex,
/// Borrowed raw output.
pub txout: &'a TxOut,
/// Index of the transaction that contains this output.
pub txindex: TxIndex,
/// Position of the output inside its transaction.
pub vout: Vout,
/// Type derived from the output's `script_pubkey`.
pub outputtype: OutputType,
/// Address bytes and their hash; `None` for non-address output types.
pub address_info: Option<(AddressBytes, AddressHash)>,
/// Type index already stored for this address hash, kept only when it is lower
/// than the current index for that address type; `None` otherwise.
pub existing_typeindex: Option<TypeIndex>,
}
/// Computed transaction data from parallel TXID computation.
///
/// Produced by `BlockProcessor::compute_txids`.
pub struct ComputedTx<'a> {
/// Index of this transaction: the block's starting txindex plus its position in the block.
pub txindex: TxIndex,
/// Borrowed transaction from the block.
pub tx: &'a Transaction,
/// Txid computed from `tx`.
pub txid: Txid,
/// Prefix derived from `txid`, used as the key of `stores.txidprefix_to_txindex`.
pub txid_prefix: TxidPrefix,
/// Txindex already stored under `txid_prefix`, if any. Only looked up when collision
/// checking is enabled and the store needs this height; `None` otherwise.
pub prev_txindex_opt: Option<TxIndex>,
}
/// Processes a single block, extracting and storing all indexed data.
///
/// Holds borrowed access to everything one block's processing needs; the methods
/// are meant to be called phase by phase by the indexing loop.
pub struct BlockProcessor<'a> {
/// Block being processed.
pub block: &'a Block,
/// Height of `block`.
pub height: Height,
/// Whether hash/prefix collision checks run for this block.
pub check_collisions: bool,
/// Running index counters (txindex, txinindex, txoutindex, per-type indexes).
pub indexes: &'a mut Indexes,
/// Stored vectors that receive the indexed data.
pub vecs: &'a mut Vecs,
/// Key-value stores (prefix lookups, address stores).
pub stores: &'a mut Stores,
/// Readers used for `get_pushed_or_read` lookups on `vecs`.
pub readers: &'a Readers,
}
impl<'a> BlockProcessor<'a> {
/// Process block metadata (blockhash, difficulty, timestamp, etc.)
pub fn process_block_metadata(&mut self) -> Result<()> {
let height = self.height;
let blockhash = self.block.hash();
let blockhash_prefix = BlockHashPrefix::from(blockhash);
// Check for blockhash prefix collision
if self
.stores
.blockhashprefix_to_height
.get(&blockhash_prefix)?
.is_some_and(|prev_height| *prev_height != height)
{
error!("BlockHash: {blockhash}");
return Err(Error::Str("Collision, expect prefix to need be set yet"));
}
self.indexes.push_if_needed(self.vecs)?;
self.stores
.blockhashprefix_to_height
.insert_if_needed(blockhash_prefix, height, height);
self.stores.height_to_coinbase_tag.insert_if_needed(
height,
self.block.coinbase_tag().into(),
height,
);
self.vecs
.height_to_blockhash
.push_if_needed(height, blockhash.clone())?;
self.vecs
.height_to_difficulty
.push_if_needed(height, self.block.header.difficulty_float().into())?;
self.vecs
.height_to_timestamp
.push_if_needed(height, Timestamp::from(self.block.header.time))?;
self.vecs
.height_to_total_size
.push_if_needed(height, self.block.total_size().into())?;
self.vecs
.height_to_weight
.push_if_needed(height, self.block.weight().into())?;
Ok(())
}
/// Computes the txid of every transaction in the block, in parallel.
///
/// Each entry carries the transaction's index (block starting txindex plus its
/// position), the txid, its prefix and, when collision checking applies, the
/// txindex already stored under that prefix.
///
/// # Errors
///
/// Returns the first store lookup error encountered.
pub fn compute_txids(&self) -> Result<Vec<ComputedTx<'a>>> {
    let first_txindex = self.indexes.txindex;

    // The store is only consulted when collisions are checked and it needs this height.
    let lookup_previous =
        self.check_collisions && self.stores.txidprefix_to_txindex.needs(self.height);

    self.block
        .txdata
        .par_iter()
        .enumerate()
        .map(|(position, tx)| {
            let txid = Txid::from(tx.compute_txid());
            let txid_prefix = TxidPrefix::from(&txid);

            let prev_txindex_opt = if lookup_previous {
                let stored = self.stores.txidprefix_to_txindex.get(&txid_prefix)?;
                stored.map(|v| *v)
            } else {
                None
            };

            let txindex = first_txindex + TxIndex::from(position);

            Ok(ComputedTx {
                txindex,
                tx,
                txid,
                txid_prefix,
                prev_txindex_opt,
            })
        })
        .collect()
}
/// Process inputs in parallel.
///
/// Uses collect().into_par_iter() pattern because:
/// 1. The inner work (store lookups, vector reads) is expensive
/// 2. We want to parallelize across ALL inputs, not just per-transaction
/// 3. The intermediate allocation (~8KB per block) is negligible compared to parallelism gains
///
/// `txs` must be the computed transactions of this block; their txid prefixes are
/// used to resolve inputs that spend an output created in the same block.
///
/// Returns one `(TxInIndex, InputSource)` pair per input, in block order.
///
/// # Errors
///
/// Returns an error when a store or vector lookup fails, when a spent txid is found
/// neither in the store nor in this block, or when the spent output's first
/// txoutindex, output type or type index is missing.
pub fn process_inputs<'c>(
&self,
txs: &[ComputedTx<'c>],
) -> Result<Vec<(TxInIndex, InputSource<'a>)>> {
// Block-local lookup: txid prefix -> txindex, for outputs spent in the same block.
let txid_prefix_to_txindex: FxHashMap<_, _> =
txs.iter().map(|ct| (ct.txid_prefix, &ct.txindex)).collect();
let base_txindex = self.indexes.txindex;
let base_txinindex = self.indexes.txinindex;
let txins = self
.block
.txdata
.iter()
.enumerate()
// Flatten to one tuple per input: (tx position in block, vin, input, owning tx).
.flat_map(|(index, tx)| {
tx.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
})
.collect::<Vec<_>>()
.into_par_iter()
.enumerate()
.map(
|(block_txinindex, (block_txindex, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> {
let txindex = base_txindex + block_txindex;
let txinindex = base_txinindex + TxInIndex::from(block_txinindex);
// Coinbase inputs spend nothing: tag them with the coinbase outpoint.
if tx.is_coinbase() {
return Ok((
txinindex,
InputSource::SameBlock {
txindex,
txin,
vin,
outpoint: OutPoint::COINBASE,
},
));
}
let outpoint = txin.previous_output;
let txid = Txid::from(outpoint.txid);
let txid_prefix = TxidPrefix::from(&txid);
// A stored txindex only counts when it is lower than this block's first
// txindex; otherwise fall back to the block-local map.
let prev_txindex = if let Some(txindex) = self
.stores
.txidprefix_to_txindex
.get(&txid_prefix)?
.map(|v| *v)
.and_then(|txindex| {
(txindex < self.indexes.txindex).then_some(txindex)
})
{
txindex
} else {
let vout = Vout::from(outpoint.vout);
let prev_txindex = **txid_prefix_to_txindex
.get(&txid_prefix)
.ok_or(Error::Str("txid should be in same block"))?;
let outpoint = OutPoint::new(prev_txindex, vout);
return Ok((
txinindex,
InputSource::SameBlock {
txindex,
txin,
vin,
outpoint,
},
));
};
let vout = Vout::from(outpoint.vout);
// Index of the spent output: first txoutindex of the spent tx plus the vout.
let txoutindex = self
.vecs
.txindex_to_first_txoutindex
.get_pushed_or_read(prev_txindex, &self.readers.txindex_to_first_txoutindex)?
.ok_or(Error::Str("Expect txoutindex to not be none"))?
+ vout;
let outpoint = OutPoint::new(prev_txindex, vout);
let outputtype = self
.vecs
.txoutindex_to_outputtype
.get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_outputtype)?
.ok_or(Error::Str("Expect outputtype to not be none"))?;
// The type index is only read for address outputs.
let address_info = if outputtype.is_address() {
let typeindex = self
.vecs
.txoutindex_to_typeindex
.get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_typeindex)?
.ok_or(Error::Str("Expect typeindex to not be none"))?;
Some((outputtype, typeindex))
} else {
None
};
Ok((
txinindex,
InputSource::PreviousBlock {
vin,
txindex,
outpoint,
address_info,
},
))
},
)
.collect::<Result<Vec<_>>>()?;
Ok(txins)
}
/// Collect same-block spent outpoints.
pub fn collect_same_block_spent_outpoints(
txins: &[(TxInIndex, InputSource)],
) -> FxHashSet<OutPoint> {
txins
.iter()
.filter_map(|(_, input_source)| {
let InputSource::SameBlock { outpoint, .. } = input_source else {
return None;
};
if !outpoint.is_coinbase() {
Some(*outpoint)
} else {
None
}
})
.collect()
}
/// Process outputs in parallel.
///
/// For every output of the block this derives the output type from the script and,
/// for address types, the address bytes, their hash and any type index already
/// stored for that hash (kept only when lower than the current index for that
/// address type). Results are returned in block order.
///
/// # Errors
///
/// Returns an error when a store or vector lookup fails, or when collision checking
/// finds no stored address bytes for an existing type index.
///
/// # Panics
///
/// Panics when the script of an address-typed output cannot be converted to address
/// bytes, and when collision checking finds stored address bytes that differ from
/// the ones just computed (after dumping debug information).
pub fn process_outputs(&self) -> Result<Vec<ProcessedOutput<'a>>> {
    let height = self.height;
    let check_collisions = self.check_collisions;

    let base_txindex = self.indexes.txindex;
    let base_txoutindex = self.indexes.txoutindex;

    // Same pattern as inputs: collect then parallelize for maximum parallelism
    self.block
        .txdata
        .iter()
        .enumerate()
        .flat_map(|(index, tx)| {
            tx.output
                .iter()
                .enumerate()
                .map(move |(vout, txout)| (TxIndex::from(index), Vout::from(vout), txout, tx))
        })
        .collect::<Vec<_>>()
        .into_par_iter()
        .enumerate()
        .map(
            |(block_txoutindex, (block_txindex, vout, txout, tx))| -> Result<ProcessedOutput> {
                let txindex = base_txindex + block_txindex;
                let txoutindex = base_txoutindex + TxOutIndex::from(block_txoutindex);

                let script = &txout.script_pubkey;
                let outputtype = OutputType::from(script);

                // Non-address outputs need no address lookup.
                if outputtype.is_not_address() {
                    return Ok(ProcessedOutput {
                        txoutindex,
                        txout,
                        txindex,
                        vout,
                        outputtype,
                        address_info: None,
                        existing_typeindex: None,
                    });
                }

                let addresstype = outputtype;
                // NOTE(review): presumably infallible for address-typed scripts; panics otherwise.
                let address_bytes = AddressBytes::try_from((script, addresstype)).unwrap();
                let address_hash = AddressHash::from(&address_bytes);

                // Propagate store failures instead of panicking, like the other lookups.
                let existing_typeindex = self
                    .stores
                    .addresstype_to_addresshash_to_addressindex
                    .get_unwrap(addresstype)
                    .get(&address_hash)?
                    .map(|v| *v)
                    // Ignore indexes at or above the current one for this address type.
                    .and_then(|typeindex_local| {
                        (typeindex_local < self.indexes.to_typeindex(addresstype))
                            .then_some(typeindex_local)
                    });

                if check_collisions && let Some(typeindex) = existing_typeindex {
                    let prev_addressbytes_opt = self.vecs.get_addressbytes_by_type(
                        addresstype,
                        typeindex,
                        self.readers.addressbytes.get_unwrap(addresstype),
                    )?;
                    let prev_addressbytes = prev_addressbytes_opt
                        .as_ref()
                        .ok_or(Error::Str("Expect to have addressbytes"))?;

                    // Same hash but different bytes: a hash collision, treated as fatal.
                    if self
                        .stores
                        .addresstype_to_addresshash_to_addressindex
                        .get_unwrap(addresstype)
                        .needs(height)
                        && prev_addressbytes != &address_bytes
                    {
                        let txid = tx.compute_txid();
                        dbg!(
                            height,
                            txid,
                            vout,
                            block_txindex,
                            addresstype,
                            prev_addressbytes,
                            &address_bytes,
                            &self.indexes,
                            typeindex,
                            txout,
                            AddressHash::from(&address_bytes),
                        );
                        panic!()
                    }
                }

                Ok(ProcessedOutput {
                    txoutindex,
                    txout,
                    txindex,
                    vout,
                    outputtype,
                    address_info: Some((address_bytes, address_hash)),
                    existing_typeindex,
                })
            },
        )
        .collect()
}
/// Finalize outputs sequentially (stores addresses, tracks UTXOs).
///
/// Walks `txouts` in order, pushes the per-output vectors, assigns a type index to
/// each output and updates the address stores.
///
/// Returns a map from outpoint to `(output type, type index)` for the spendable
/// outputs listed in `same_block_spent_outpoints`, i.e. the ones spent inside this
/// same block.
///
/// # Errors
///
/// Returns an error when a vector push fails.
pub fn finalize_outputs(
&mut self,
txouts: Vec<ProcessedOutput>,
same_block_spent_outpoints: &FxHashSet<OutPoint>,
) -> Result<FxHashMap<OutPoint, (OutputType, TypeIndex)>> {
let height = self.height;
// Addresses first seen in this block, so repeated occurrences reuse one type index.
let mut already_added_addresshash: ByAddressType<FxHashMap<AddressHash, TypeIndex>> =
ByAddressType::default();
// Pre-size based on the number of same-block spent outpoints
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::with_capacity_and_hasher(
same_block_spent_outpoints.len(),
Default::default(),
);
for ProcessedOutput {
txoutindex,
txout,
txindex,
vout,
outputtype,
address_info,
existing_typeindex,
} in txouts
{
let sats = Sats::from(txout.value);
// The first output of a transaction marks that transaction's first txoutindex.
if vout.is_zero() {
self.vecs
.txindex_to_first_txoutindex
.push_if_needed(txindex, txoutindex)?;
}
self.vecs
.txoutindex_to_value
.push_if_needed(txoutindex, sats)?;
self.vecs
.txoutindex_to_txindex
.push_if_needed(txoutindex, txindex)?;
self.vecs
.txoutindex_to_outputtype
.push_if_needed(txoutindex, outputtype)?;
// Resolve the type index: already stored, already seen in this block,
// newly allocated address, or a non-address per-type counter.
let typeindex = if let Some(ti) = existing_typeindex {
ti
} else if let Some((address_bytes, address_hash)) = address_info {
let addresstype = outputtype;
if let Some(&ti) = already_added_addresshash
.get_unwrap(addresstype)
.get(&address_hash)
{
ti
} else {
// New address: allocate an index and record it in the block-local map,
// the hash store and the address bytes vector.
let ti = self.indexes.increment_address_index(addresstype);
already_added_addresshash
.get_mut_unwrap(addresstype)
.insert(address_hash, ti);
self.stores
.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(addresstype)
.insert_if_needed(address_hash, ti, height);
self.vecs.push_bytes_if_needed(ti, address_bytes)?;
ti
}
} else {
// Non-address outputs: record the owning txindex under the per-type
// counter, then advance that counter.
match outputtype {
OutputType::P2MS => {
self.vecs
.p2msoutputindex_to_txindex
.push_if_needed(self.indexes.p2msoutputindex, txindex)?;
self.indexes.p2msoutputindex.copy_then_increment()
}
OutputType::OpReturn => {
self.vecs
.opreturnindex_to_txindex
.push_if_needed(self.indexes.opreturnindex, txindex)?;
self.indexes.opreturnindex.copy_then_increment()
}
OutputType::Empty => {
self.vecs
.emptyoutputindex_to_txindex
.push_if_needed(self.indexes.emptyoutputindex, txindex)?;
self.indexes.emptyoutputindex.copy_then_increment()
}
OutputType::Unknown => {
self.vecs
.unknownoutputindex_to_txindex
.push_if_needed(self.indexes.unknownoutputindex, txindex)?;
self.indexes.unknownoutputindex.copy_then_increment()
}
_ => unreachable!(),
}
};
self.vecs
.txoutindex_to_typeindex
.push_if_needed(txoutindex, typeindex)?;
// Unspendable outputs take no part in address or UTXO tracking.
if outputtype.is_unspendable() {
continue;
} else if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = typeindex;
self.stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
}
let outpoint = OutPoint::new(txindex, vout);
// Outputs spent within this block are handed to the caller instead of being
// inserted into the unspent outpoint store.
if same_block_spent_outpoints.contains(&outpoint) {
same_block_output_info.insert(outpoint, (outputtype, typeindex));
} else if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = typeindex;
self.stores
.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressIndexOutPoint::from((addressindex, outpoint)),
Unit,
height,
);
}
}
Ok(same_block_output_info)
}
/// Writes the inputs of the block one by one: records each input's outpoint and,
/// for inputs that spend an address output, updates the address stores.
///
/// Non-coinbase same-block inputs take their `(output type, type index)` out of
/// `same_block_output_info`; a missing entry is an error.
pub fn finalize_inputs(
    &mut self,
    txins: Vec<(TxInIndex, InputSource)>,
    same_block_output_info: &mut FxHashMap<OutPoint, (OutputType, TypeIndex)>,
) -> Result<()> {
    let height = self.height;

    for (txinindex, input_source) in txins {
        let (vin, txindex, outpoint, address_info) = match input_source {
            InputSource::PreviousBlock {
                vin,
                txindex,
                outpoint,
                address_info,
            } => (vin, txindex, outpoint, address_info),
            // Coinbase inputs spend nothing, so there is no address to resolve.
            InputSource::SameBlock {
                txindex,
                vin,
                outpoint,
                ..
            } if outpoint.is_coinbase() => (vin, txindex, outpoint, None),
            InputSource::SameBlock {
                txindex,
                txin,
                vin,
                outpoint,
            } => {
                let (outputtype, typeindex) = same_block_output_info
                    .remove(&outpoint)
                    .ok_or(Error::Str("should have found addressindex from same block"))
                    .inspect_err(|_| {
                        dbg!(&same_block_output_info, txin);
                    })?;

                let address_info = outputtype
                    .is_address()
                    .then_some((outputtype, typeindex));

                (vin, txindex, outpoint, address_info)
            }
        };

        // The first input of a transaction marks that transaction's first txinindex.
        if vin.is_zero() {
            self.vecs
                .txindex_to_first_txinindex
                .push_if_needed(txindex, txinindex)?;
        }

        self.vecs
            .txinindex_to_outpoint
            .push_if_needed(txinindex, outpoint)?;

        if let Some((addresstype, addressindex)) = address_info {
            let by_txindex = AddressIndexTxIndex::from((addressindex, txindex));
            self.stores
                .addresstype_to_addressindex_and_txindex
                .get_mut_unwrap(addresstype)
                .insert_if_needed(by_txindex, Unit, height);

            // The spent outpoint is no longer unspent for this address.
            let spent = AddressIndexOutPoint::from((addressindex, outpoint));
            self.stores
                .addresstype_to_addressindex_and_unspentoutpoint
                .get_mut_unwrap(addresstype)
                .remove_if_needed(spent, height);
        }
    }

    Ok(())
}
/// Check for TXID collisions (only for known duplicate TXIDs).
pub fn check_txid_collisions(&self, txs: &[ComputedTx]) -> Result<()> {
if !self.check_collisions {
return Ok(());
}
let mut txindex_to_txid_iter = self.vecs.txindex_to_txid.into_iter();
for ct in txs.iter() {
let Some(prev_txindex) = ct.prev_txindex_opt else {
continue;
};
// In case if we start at an already parsed height
if ct.txindex == prev_txindex {
continue;
}
let len = self.vecs.txindex_to_txid.len();
let prev_txid = txindex_to_txid_iter
.get(prev_txindex)
.ok_or(Error::Str("To have txid for txindex"))
.inspect_err(|_| {
dbg!(ct.txindex, len);
})?;
let is_dup = DUPLICATE_TXIDS.contains(&prev_txid);
if !is_dup {
dbg!(self.height, ct.txindex, prev_txid, prev_txindex);
return Err(Error::Str("Expect none"));
}
}
Ok(())
}
/// Stores the per-transaction metadata of the block.
///
/// Transactions without a `prev_txindex_opt` get their TXID prefix inserted
/// into `txidprefix_to_txindex`. Every transaction then has its height,
/// version, TXID, raw lock time, base size, total size and explicit-RBF flag
/// pushed to the matching `txindex_to_*` vector.
///
/// # Errors
///
/// Returns the first error reported by a vector push.
pub fn store_transaction_metadata(&mut self, txs: Vec<ComputedTx>) -> Result<()> {
    let height = self.height;

    for ct in txs {
        let txindex = ct.txindex;

        if ct.prev_txindex_opt.is_none() {
            self.stores
                .txidprefix_to_txindex
                .insert_if_needed(ct.txid_prefix, txindex, height);
        }

        let vecs = &mut self.vecs;

        vecs.txindex_to_height.push_if_needed(txindex, height)?;
        vecs.txindex_to_txversion
            .push_if_needed(txindex, ct.tx.version.into())?;
        vecs.txindex_to_txid.push_if_needed(txindex, ct.txid)?;
        vecs.txindex_to_rawlocktime
            .push_if_needed(txindex, ct.tx.lock_time.into())?;
        vecs.txindex_to_base_size
            .push_if_needed(txindex, ct.tx.base_size().into())?;
        vecs.txindex_to_total_size
            .push_if_needed(txindex, ct.tx.total_size().into())?;
        vecs.txindex_to_is_explicitly_rbf
            .push_if_needed(txindex, StoredBool::from(ct.tx.is_explicitly_rbf()))?;
    }

    Ok(())
}
/// Advances the global transaction, input and output indexes by the number of
/// transactions, inputs and outputs handled in the block just processed.
pub fn update_indexes(&mut self, tx_count: usize, input_count: usize, output_count: usize) {
    let indexes = &mut self.indexes;

    indexes.txindex += TxIndex::from(tx_count);
    indexes.txinindex += TxInIndex::from(input_count);
    indexes.txoutindex += TxOutIndex::from(output_count);
}
}
+33
View File
@@ -0,0 +1,33 @@
use brk_grouper::ByAddressType;
use vecdb::{GenericStoredVec, Reader};
use crate::Vecs;
/// Readers for vectors that need to be accessed during block processing.
/// These provide consistent snapshots for reading while the main vectors are being modified.
///
/// Build one with `Readers::new`, which creates every reader from the matching
/// vector in `Vecs`.
pub struct Readers {
    /// Reader created from `Vecs::txindex_to_first_txoutindex`.
    pub txindex_to_first_txoutindex: Reader,
    /// Reader created from `Vecs::txoutindex_to_outputtype`.
    pub txoutindex_to_outputtype: Reader,
    /// Reader created from `Vecs::txoutindex_to_typeindex`.
    pub txoutindex_to_typeindex: Reader,
    /// One reader per address type, each created from that type's
    /// address-index-to-bytes vector in `Vecs`.
    pub addressbytes: ByAddressType<Reader>,
}
impl Readers {
    /// Creates a reader for each vector in `vecs` that block processing reads from:
    /// the three transaction/output lookups and one address-bytes vector per
    /// address type.
    pub fn new(vecs: &Vecs) -> Self {
        let txindex_to_first_txoutindex = vecs.txindex_to_first_txoutindex.create_reader();
        let txoutindex_to_outputtype = vecs.txoutindex_to_outputtype.create_reader();
        let txoutindex_to_typeindex = vecs.txoutindex_to_typeindex.create_reader();

        let addressbytes = ByAddressType {
            p2pk65: vecs.p2pk65addressindex_to_p2pk65bytes.create_reader(),
            p2pk33: vecs.p2pk33addressindex_to_p2pk33bytes.create_reader(),
            p2pkh: vecs.p2pkhaddressindex_to_p2pkhbytes.create_reader(),
            p2sh: vecs.p2shaddressindex_to_p2shbytes.create_reader(),
            p2wpkh: vecs.p2wpkhaddressindex_to_p2wpkhbytes.create_reader(),
            p2wsh: vecs.p2wshaddressindex_to_p2wshbytes.create_reader(),
            p2tr: vecs.p2traddressindex_to_p2trbytes.create_reader(),
            p2a: vecs.p2aaddressindex_to_p2abytes.create_reader(),
        };

        Self {
            txindex_to_first_txoutindex,
            txoutindex_to_outputtype,
            txoutindex_to_typeindex,
            addressbytes,
        }
    }
}
+54 -5
View File
@@ -12,8 +12,8 @@ use brk_types::{
};
use rayon::prelude::*;
use vecdb::{
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, TypedVecIterator, PAGE_SIZE,
PcoVec, Stamp,
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PAGE_SIZE, PcoVec, Reader,
Stamp, TypedVecIterator,
};
use crate::Indexes;
@@ -314,6 +314,52 @@ impl Vecs {
Ok(())
}
/// Looks up the address bytes stored at `typeindex` in the vector that belongs
/// to `addresstype`, reading through `reader` (which must be the reader of that
/// same address type).
///
/// Returns `Ok(None)` if the index doesn't exist yet.
///
/// # Errors
///
/// Propagates any error from the underlying vector lookup.
///
/// # Panics
///
/// Panics if `addresstype` is not one of the address output types
/// (P2PK65, P2PK33, P2PKH, P2SH, P2WPKH, P2WSH, P2TR, P2A).
pub fn get_addressbytes_by_type(
    &self,
    addresstype: OutputType,
    typeindex: TypeIndex,
    reader: &Reader,
) -> Result<Option<AddressBytes>> {
    Ok(match addresstype {
        OutputType::P2PK65 => self
            .p2pk65addressindex_to_p2pk65bytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2PK33 => self
            .p2pk33addressindex_to_p2pk33bytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2PKH => self
            .p2pkhaddressindex_to_p2pkhbytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2SH => self
            .p2shaddressindex_to_p2shbytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2WPKH => self
            .p2wpkhaddressindex_to_p2wpkhbytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2WSH => self
            .p2wshaddressindex_to_p2wshbytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2TR => self
            .p2traddressindex_to_p2trbytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        OutputType::P2A => self
            .p2aaddressindex_to_p2abytes
            .get_pushed_or_read(typeindex.into(), reader)?
            .map(AddressBytes::from),
        _ => unreachable!("get_addressbytes_by_type called with non-address type"),
    })
}
pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> {
match bytes {
AddressBytes::P2PK65(bytes) => self
@@ -386,10 +432,13 @@ impl Vecs {
index.increment();
AddressHash::from(&bytes)
})
})) as Box<dyn Iterator<Item = AddressHash> + '_>)
}))
as Box<dyn Iterator<Item = AddressHash> + '_>)
}
Err(_) => {
Ok(Box::new(std::iter::empty())
as Box<dyn Iterator<Item = AddressHash> + '_>)
}
Err(_) => Ok(Box::new(std::iter::empty())
as Box<dyn Iterator<Item = AddressHash> + '_>),
}
}};
}