mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
indexer: snapshot
This commit is contained in:
@@ -4,7 +4,7 @@ use brk_types::{TxIndex, Txid, TxidPrefix, Version};
|
||||
|
||||
// One version for all data sources
|
||||
// Increment on **change _OR_ addition**
|
||||
pub const VERSION: Version = Version::new(24);
|
||||
pub const VERSION: Version = Version::new(25);
|
||||
pub const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
|
||||
|
||||
/// Known duplicate Bitcoin transactions (BIP30)
|
||||
|
||||
@@ -136,8 +136,8 @@ impl Indexer {
|
||||
let mut indexes = starting_indexes.clone();
|
||||
debug!("Indexes cloned.");
|
||||
|
||||
let should_export = |height: Height, rem: bool| -> bool {
|
||||
height != 0 && (height % SNAPSHOT_BLOCK_RANGE == 0) != rem
|
||||
let is_export_height = |height: Height| -> bool {
|
||||
height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0
|
||||
};
|
||||
|
||||
let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> {
|
||||
@@ -166,6 +166,7 @@ impl Indexer {
|
||||
};
|
||||
|
||||
let mut readers = Readers::new(&self.vecs);
|
||||
let mut buffers = BlockBuffers::default();
|
||||
|
||||
let vecs = &mut self.vecs;
|
||||
let stores = &mut self.stores;
|
||||
@@ -187,48 +188,50 @@ impl Indexer {
|
||||
readers: &readers,
|
||||
};
|
||||
|
||||
// Phase 1: Process block metadata
|
||||
// 1. Process block metadata
|
||||
processor.process_block_metadata()?;
|
||||
|
||||
// Phase 2: Compute TXIDs in parallel
|
||||
// 2. Compute TXIDs (parallel)
|
||||
let txs = processor.compute_txids()?;
|
||||
|
||||
// Phase 3+5: Process inputs and outputs in parallel
|
||||
// They access different stores (txidprefix vs addresshash) and
|
||||
// different vecs, so running concurrently hides latency of the
|
||||
// shorter phase behind the longer one.
|
||||
// 2.5 Push block size/weight reusing per-tx sizes from compute_txids
|
||||
processor.push_block_size_and_weight(&txs)?;
|
||||
|
||||
// 3. Process inputs and outputs (parallel)
|
||||
let (txins_result, txouts_result) = rayon::join(
|
||||
|| processor.process_inputs(&txs),
|
||||
|| processor.process_inputs(&txs, &mut buffers.txid_prefix_map),
|
||||
|| processor.process_outputs(),
|
||||
);
|
||||
let txins = txins_result?;
|
||||
let txouts = txouts_result?;
|
||||
|
||||
// Phase 4: Collect same-block spent outpoints
|
||||
let same_block_spent_outpoints =
|
||||
BlockProcessor::collect_same_block_spent_outpoints(&txins);
|
||||
let tx_count = block.txdata.len();
|
||||
let input_count = txins.len();
|
||||
let output_count = txouts.len();
|
||||
|
||||
let tx_len = block.txdata.len();
|
||||
let inputs_len = txins.len();
|
||||
let outputs_len = txouts.len();
|
||||
// 4. Collect same-block spent outpoints
|
||||
BlockProcessor::collect_same_block_spent_outpoints(
|
||||
&txins,
|
||||
&mut buffers.same_block_spent,
|
||||
);
|
||||
|
||||
// Phase 6: Finalize outputs sequentially
|
||||
let same_block_output_info =
|
||||
processor.finalize_outputs(txouts, &same_block_spent_outpoints)?;
|
||||
|
||||
// Phase 7: Finalize inputs sequentially
|
||||
processor.finalize_inputs(txins, same_block_output_info)?;
|
||||
|
||||
// Phase 8: Check TXID collisions
|
||||
// 5. Check TXID collisions (BIP-30)
|
||||
processor.check_txid_collisions(&txs)?;
|
||||
|
||||
// Phase 9: Store transaction metadata
|
||||
processor.store_transaction_metadata(txs)?;
|
||||
// 6. Finalize outputs/inputs || store tx metadata (parallel)
|
||||
processor.finalize_and_store_metadata(
|
||||
txs,
|
||||
txouts,
|
||||
txins,
|
||||
&buffers.same_block_spent,
|
||||
&mut buffers.already_added_addresses,
|
||||
&mut buffers.same_block_output_info,
|
||||
)?;
|
||||
|
||||
// Phase 10: Update indexes
|
||||
processor.update_indexes(tx_len, inputs_len, outputs_len);
|
||||
// 7. Update indexes
|
||||
processor.update_indexes(tx_count, input_count, output_count);
|
||||
|
||||
if should_export(height, false) {
|
||||
if is_export_height(height) {
|
||||
drop(readers);
|
||||
export(stores, vecs, height)?;
|
||||
readers = Readers::new(vecs);
|
||||
@@ -237,7 +240,7 @@ impl Indexer {
|
||||
|
||||
drop(readers);
|
||||
|
||||
if should_export(indexes.height, true) {
|
||||
if !is_export_height(indexes.height) {
|
||||
export(stores, vecs, indexes.height)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ use brk_types::{BlockHashPrefix, Timestamp};
|
||||
use tracing::error;
|
||||
use vecdb::WritableVec;
|
||||
|
||||
use super::BlockProcessor;
|
||||
use super::{BlockProcessor, ComputedTx};
|
||||
use crate::IndexesExt;
|
||||
|
||||
impl BlockProcessor<'_> {
|
||||
@@ -45,16 +45,31 @@ impl BlockProcessor<'_> {
|
||||
.blocks
|
||||
.timestamp
|
||||
.checked_push(height, Timestamp::from(self.block.header.time))?;
|
||||
let (block_total_size, block_weight) = self.block.total_size_and_weight();
|
||||
self.vecs
|
||||
.blocks
|
||||
.total_size
|
||||
.checked_push(height, block_total_size.into())?;
|
||||
self.vecs
|
||||
.blocks
|
||||
.weight
|
||||
.checked_push(height, block_weight.into())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Push block total_size and weight, reusing per-tx sizes already computed in ComputedTx.
|
||||
/// This avoids redundant tx serialization (base_size + total_size were already computed).
|
||||
pub fn push_block_size_and_weight(&mut self, txs: &[ComputedTx]) -> Result<()> {
|
||||
let overhead =
|
||||
bitcoin::block::Header::SIZE + bitcoin::VarInt::from(txs.len()).size();
|
||||
let mut total_size = overhead;
|
||||
let mut weight_wu = overhead * 4;
|
||||
for ct in txs {
|
||||
let base = ct.base_size as usize;
|
||||
let total = ct.total_size as usize;
|
||||
total_size += total;
|
||||
weight_wu += base * 3 + total;
|
||||
}
|
||||
self.vecs
|
||||
.blocks
|
||||
.total_size
|
||||
.checked_push(self.height, total_size.into())?;
|
||||
self.vecs
|
||||
.blocks
|
||||
.weight
|
||||
.checked_push(self.height, weight_wu.into())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,10 @@ mod types;
|
||||
|
||||
pub use types::*;
|
||||
|
||||
use brk_types::{Block, Height, TxInIndex, TxIndex, TxOutIndex};
|
||||
use brk_cohort::ByAddressType;
|
||||
use brk_error::Result;
|
||||
use brk_types::{AddressHash, Block, Height, OutPoint, TxInIndex, TxIndex, TxOutIndex, TypeIndex};
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
|
||||
use crate::{Indexes, Readers, Stores, Vecs};
|
||||
|
||||
@@ -28,4 +31,66 @@ impl BlockProcessor<'_> {
|
||||
self.indexes.txinindex += TxInIndex::from(input_count);
|
||||
self.indexes.txoutindex += TxOutIndex::from(output_count);
|
||||
}
|
||||
|
||||
/// Finalizes outputs/inputs in parallel with storing tx metadata.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn finalize_and_store_metadata(
|
||||
&mut self,
|
||||
txs: Vec<ComputedTx>,
|
||||
txouts: Vec<ProcessedOutput>,
|
||||
txins: Vec<(TxInIndex, InputSource)>,
|
||||
same_block_spent_outpoints: &FxHashSet<OutPoint>,
|
||||
already_added: &mut ByAddressType<FxHashMap<AddressHash, TypeIndex>>,
|
||||
same_block_info: &mut FxHashMap<OutPoint, SameBlockOutputInfo>,
|
||||
) -> Result<()> {
|
||||
let height = self.height;
|
||||
let indexes = &mut *self.indexes;
|
||||
|
||||
// Split transactions vecs: finalize needs first_txoutindex/first_txinindex, metadata needs the rest
|
||||
let (first_txoutindex, first_txinindex, mut tx_metadata) =
|
||||
self.vecs.transactions.split_for_finalize();
|
||||
|
||||
let outputs = &mut self.vecs.outputs;
|
||||
let inputs = &mut self.vecs.inputs;
|
||||
let addresses = &mut self.vecs.addresses;
|
||||
let scripts = &mut self.vecs.scripts;
|
||||
|
||||
let addr_hash_stores = &mut self.stores.addresstype_to_addresshash_to_addressindex;
|
||||
let addr_txindex_stores = &mut self.stores.addresstype_to_addressindex_and_txindex;
|
||||
let addr_outpoint_stores =
|
||||
&mut self.stores.addresstype_to_addressindex_and_unspentoutpoint;
|
||||
let txidprefix_store = &mut self.stores.txidprefix_to_txindex;
|
||||
|
||||
let (finalize_result, metadata_result) = rayon::join(
|
||||
|| -> Result<()> {
|
||||
txout::finalize_outputs(
|
||||
indexes,
|
||||
first_txoutindex,
|
||||
outputs,
|
||||
addresses,
|
||||
scripts,
|
||||
addr_hash_stores,
|
||||
addr_txindex_stores,
|
||||
addr_outpoint_stores,
|
||||
txouts,
|
||||
same_block_spent_outpoints,
|
||||
already_added,
|
||||
same_block_info,
|
||||
)?;
|
||||
txin::finalize_inputs(
|
||||
first_txinindex,
|
||||
inputs,
|
||||
addr_txindex_stores,
|
||||
addr_outpoint_stores,
|
||||
txins,
|
||||
same_block_info,
|
||||
)
|
||||
},
|
||||
|| tx::store_tx_metadata(height, txs, txidprefix_store, &mut tx_metadata),
|
||||
);
|
||||
|
||||
finalize_result?;
|
||||
metadata_result?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use brk_error::{Error, Result};
|
||||
use brk_types::{StoredBool, TxIndex, Txid, TxidPrefix};
|
||||
use brk_store::Store;
|
||||
use brk_types::{Height, StoredBool, TxIndex, Txid, TxidPrefix};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyVec, WritableVec, likely};
|
||||
|
||||
use crate::TxMetadataVecs;
|
||||
use crate::constants::DUPLICATE_TXIDS;
|
||||
|
||||
use super::{BlockProcessor, ComputedTx};
|
||||
@@ -78,47 +80,30 @@ impl<'a> BlockProcessor<'a> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn store_transaction_metadata(&mut self, txs: Vec<ComputedTx>) -> Result<()> {
|
||||
let height = self.height;
|
||||
|
||||
for ct in txs {
|
||||
if ct.prev_txindex_opt.is_none() {
|
||||
self.stores
|
||||
.txidprefix_to_txindex
|
||||
.insert(ct.txid_prefix, ct.txindex);
|
||||
}
|
||||
|
||||
self.vecs
|
||||
.transactions
|
||||
.height
|
||||
.checked_push(ct.txindex, height)?;
|
||||
self.vecs
|
||||
.transactions
|
||||
.txversion
|
||||
.checked_push(ct.txindex, ct.tx.version.into())?;
|
||||
self.vecs
|
||||
.transactions
|
||||
.txid
|
||||
.checked_push(ct.txindex, ct.txid)?;
|
||||
self.vecs
|
||||
.transactions
|
||||
.rawlocktime
|
||||
.checked_push(ct.txindex, ct.tx.lock_time.into())?;
|
||||
self.vecs
|
||||
.transactions
|
||||
.base_size
|
||||
.checked_push(ct.txindex, ct.base_size.into())?;
|
||||
self.vecs
|
||||
.transactions
|
||||
.total_size
|
||||
.checked_push(ct.txindex, ct.total_size.into())?;
|
||||
self.vecs
|
||||
.transactions
|
||||
.is_explicitly_rbf
|
||||
.checked_push(ct.txindex, StoredBool::from(ct.tx.is_explicitly_rbf()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn store_tx_metadata(
|
||||
height: Height,
|
||||
txs: Vec<ComputedTx>,
|
||||
store: &mut Store<TxidPrefix, TxIndex>,
|
||||
md: &mut TxMetadataVecs<'_>,
|
||||
) -> Result<()> {
|
||||
for ct in txs {
|
||||
if ct.prev_txindex_opt.is_none() {
|
||||
store.insert(ct.txid_prefix, ct.txindex);
|
||||
}
|
||||
md.height.checked_push(ct.txindex, height)?;
|
||||
md.txversion
|
||||
.checked_push(ct.txindex, ct.tx.version.into())?;
|
||||
md.txid.checked_push(ct.txindex, ct.txid)?;
|
||||
md.rawlocktime
|
||||
.checked_push(ct.txindex, ct.tx.lock_time.into())?;
|
||||
md.base_size
|
||||
.checked_push(ct.txindex, ct.base_size.into())?;
|
||||
md.total_size
|
||||
.checked_push(ct.txindex, ct.total_size.into())?;
|
||||
md.is_explicitly_rbf
|
||||
.checked_push(ct.txindex, StoredBool::from(ct.tx.is_explicitly_rbf()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,21 +1,25 @@
|
||||
use brk_cohort::ByAddressType;
|
||||
use brk_error::{Error, Result};
|
||||
use brk_store::Store;
|
||||
use brk_types::{
|
||||
AddressIndexOutPoint, AddressIndexTxIndex, OutPoint, OutputType, TxInIndex, TxIndex, Txid,
|
||||
TxidPrefix, TypeIndex, Unit, Vin, Vout,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
use vecdb::WritableVec;
|
||||
use vecdb::{PcoVec, WritableVec};
|
||||
|
||||
use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo};
|
||||
use crate::InputsVecs;
|
||||
|
||||
impl<'a> BlockProcessor<'a> {
|
||||
pub fn process_inputs(
|
||||
&self,
|
||||
txs: &[ComputedTx],
|
||||
txid_prefix_to_txindex: &mut FxHashMap<TxidPrefix, TxIndex>,
|
||||
) -> Result<Vec<(TxInIndex, InputSource)>> {
|
||||
let txid_prefix_to_txindex: FxHashMap<_, _> =
|
||||
txs.iter().map(|ct| (ct.txid_prefix, ct.txindex)).collect();
|
||||
txid_prefix_to_txindex.clear();
|
||||
txid_prefix_to_txindex.extend(txs.iter().map(|ct| (ct.txid_prefix, ct.txindex)));
|
||||
|
||||
let base_txindex = self.indexes.txindex;
|
||||
let base_txinindex = self.indexes.txinindex;
|
||||
@@ -28,6 +32,8 @@ impl<'a> BlockProcessor<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
let txid_prefix_to_txindex = &*txid_prefix_to_txindex;
|
||||
|
||||
let txins = items
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
@@ -125,99 +131,78 @@ impl<'a> BlockProcessor<'a> {
|
||||
|
||||
pub fn collect_same_block_spent_outpoints(
|
||||
txins: &[(TxInIndex, InputSource)],
|
||||
) -> FxHashSet<OutPoint> {
|
||||
txins
|
||||
.iter()
|
||||
.filter_map(|(_, input_source)| {
|
||||
let InputSource::SameBlock { outpoint, .. } = input_source else {
|
||||
return None;
|
||||
};
|
||||
if !outpoint.is_coinbase() {
|
||||
Some(*outpoint)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn finalize_inputs(
|
||||
&mut self,
|
||||
txins: Vec<(TxInIndex, InputSource)>,
|
||||
mut same_block_output_info: FxHashMap<OutPoint, SameBlockOutputInfo>,
|
||||
) -> Result<()> {
|
||||
for (txinindex, input_source) in txins {
|
||||
let (vin, txindex, outpoint, outputtype, typeindex) = match input_source {
|
||||
InputSource::PreviousBlock {
|
||||
vin,
|
||||
txindex,
|
||||
outpoint,
|
||||
outputtype,
|
||||
typeindex,
|
||||
} => (vin, txindex, outpoint, outputtype, typeindex),
|
||||
InputSource::SameBlock {
|
||||
txindex,
|
||||
vin,
|
||||
outpoint,
|
||||
} => {
|
||||
if outpoint.is_coinbase() {
|
||||
(vin, txindex, outpoint, OutputType::Unknown, TypeIndex::COINBASE)
|
||||
} else {
|
||||
let info = same_block_output_info
|
||||
.remove(&outpoint)
|
||||
.ok_or(Error::Internal("Same-block output not found"))
|
||||
.inspect_err(|_| {
|
||||
dbg!(&same_block_output_info, outpoint);
|
||||
})?;
|
||||
(vin, txindex, outpoint, info.outputtype, info.typeindex)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if vin.is_zero() {
|
||||
self.vecs
|
||||
.transactions
|
||||
.first_txinindex
|
||||
.checked_push(txindex, txinindex)?;
|
||||
}
|
||||
|
||||
self.vecs
|
||||
.inputs
|
||||
.txindex
|
||||
.checked_push(txinindex, txindex)?;
|
||||
self.vecs
|
||||
.inputs
|
||||
.outpoint
|
||||
.checked_push(txinindex, outpoint)?;
|
||||
self.vecs
|
||||
.inputs
|
||||
.outputtype
|
||||
.checked_push(txinindex, outputtype)?;
|
||||
self.vecs
|
||||
.inputs
|
||||
.typeindex
|
||||
.checked_push(txinindex, typeindex)?;
|
||||
|
||||
if !outputtype.is_address() {
|
||||
continue;
|
||||
}
|
||||
let addresstype = outputtype;
|
||||
let addressindex = typeindex;
|
||||
|
||||
self.stores
|
||||
.addresstype_to_addressindex_and_txindex
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(
|
||||
AddressIndexTxIndex::from((addressindex, txindex)),
|
||||
Unit,
|
||||
);
|
||||
|
||||
self.stores
|
||||
.addresstype_to_addressindex_and_unspentoutpoint
|
||||
.get_mut_unwrap(addresstype)
|
||||
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
out: &mut FxHashSet<OutPoint>,
|
||||
) {
|
||||
out.clear();
|
||||
out.extend(txins.iter().filter_map(|(_, input_source)| match input_source {
|
||||
InputSource::SameBlock { outpoint, .. } if !outpoint.is_coinbase() => Some(*outpoint),
|
||||
_ => None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn finalize_inputs(
|
||||
first_txinindex: &mut PcoVec<TxIndex, TxInIndex>,
|
||||
inputs: &mut InputsVecs,
|
||||
addr_txindex_stores: &mut ByAddressType<Store<AddressIndexTxIndex, Unit>>,
|
||||
addr_outpoint_stores: &mut ByAddressType<Store<AddressIndexOutPoint, Unit>>,
|
||||
txins: Vec<(TxInIndex, InputSource)>,
|
||||
same_block_output_info: &mut FxHashMap<OutPoint, SameBlockOutputInfo>,
|
||||
) -> Result<()> {
|
||||
for (txinindex, input_source) in txins {
|
||||
let (vin, txindex, outpoint, outputtype, typeindex) = match input_source {
|
||||
InputSource::PreviousBlock {
|
||||
vin,
|
||||
txindex,
|
||||
outpoint,
|
||||
outputtype,
|
||||
typeindex,
|
||||
} => (vin, txindex, outpoint, outputtype, typeindex),
|
||||
InputSource::SameBlock {
|
||||
txindex,
|
||||
vin,
|
||||
outpoint,
|
||||
} => {
|
||||
if outpoint.is_coinbase() {
|
||||
(vin, txindex, outpoint, OutputType::Unknown, TypeIndex::COINBASE)
|
||||
} else {
|
||||
let info = same_block_output_info
|
||||
.remove(&outpoint)
|
||||
.ok_or(Error::Internal("Same-block output not found"))
|
||||
.inspect_err(|_| {
|
||||
dbg!(&same_block_output_info, outpoint);
|
||||
})?;
|
||||
(vin, txindex, outpoint, info.outputtype, info.typeindex)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if vin.is_zero() {
|
||||
first_txinindex.checked_push(txindex, txinindex)?;
|
||||
}
|
||||
|
||||
inputs.txindex.checked_push(txinindex, txindex)?;
|
||||
inputs.outpoint.checked_push(txinindex, outpoint)?;
|
||||
inputs.outputtype.checked_push(txinindex, outputtype)?;
|
||||
inputs.typeindex.checked_push(txinindex, typeindex)?;
|
||||
|
||||
if !outputtype.is_address() {
|
||||
continue;
|
||||
}
|
||||
let addresstype = outputtype;
|
||||
let addressindex = typeindex;
|
||||
|
||||
addr_txindex_stores
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(
|
||||
AddressIndexTxIndex::from((addressindex, txindex)),
|
||||
Unit,
|
||||
);
|
||||
|
||||
addr_outpoint_stores
|
||||
.get_mut_unwrap(addresstype)
|
||||
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
use brk_cohort::ByAddressType;
|
||||
use brk_error::{Error, Result};
|
||||
use brk_store::Store;
|
||||
use brk_types::{
|
||||
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, OutPoint, OutputType,
|
||||
Sats, TxIndex, TxOutIndex, TypeIndex, Unit, Vout,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
use vecdb::WritableVec;
|
||||
use vecdb::{BytesVec, WritableVec};
|
||||
|
||||
use super::{BlockProcessor, ProcessedOutput, SameBlockOutputInfo};
|
||||
use crate::{AddressesVecs, Indexes, OutputsVecs, ScriptsVecs};
|
||||
|
||||
impl<'a> BlockProcessor<'a> {
|
||||
pub fn process_outputs(&self) -> Result<Vec<ProcessedOutput<'a>>> {
|
||||
@@ -104,150 +106,138 @@ impl<'a> BlockProcessor<'a> {
|
||||
)
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalize_outputs(
|
||||
&mut self,
|
||||
txouts: Vec<ProcessedOutput>,
|
||||
same_block_spent_outpoints: &FxHashSet<OutPoint>,
|
||||
) -> Result<FxHashMap<OutPoint, SameBlockOutputInfo>> {
|
||||
let mut already_added_addresshash: ByAddressType<FxHashMap<AddressHash, TypeIndex>> =
|
||||
ByAddressType::default();
|
||||
let mut same_block_output_info: FxHashMap<OutPoint, SameBlockOutputInfo> =
|
||||
FxHashMap::with_capacity_and_hasher(
|
||||
same_block_spent_outpoints.len(),
|
||||
Default::default(),
|
||||
);
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn finalize_outputs(
|
||||
indexes: &mut Indexes,
|
||||
first_txoutindex: &mut BytesVec<TxIndex, TxOutIndex>,
|
||||
outputs: &mut OutputsVecs,
|
||||
addresses: &mut AddressesVecs,
|
||||
scripts: &mut ScriptsVecs,
|
||||
addr_hash_stores: &mut ByAddressType<Store<AddressHash, TypeIndex>>,
|
||||
addr_txindex_stores: &mut ByAddressType<Store<AddressIndexTxIndex, Unit>>,
|
||||
addr_outpoint_stores: &mut ByAddressType<Store<AddressIndexOutPoint, Unit>>,
|
||||
txouts: Vec<ProcessedOutput>,
|
||||
same_block_spent_outpoints: &FxHashSet<OutPoint>,
|
||||
already_added_addresshash: &mut ByAddressType<FxHashMap<AddressHash, TypeIndex>>,
|
||||
same_block_output_info: &mut FxHashMap<OutPoint, SameBlockOutputInfo>,
|
||||
) -> Result<()> {
|
||||
already_added_addresshash
|
||||
.values_mut()
|
||||
.for_each(|m| m.clear());
|
||||
same_block_output_info.clear();
|
||||
|
||||
for ProcessedOutput {
|
||||
txoutindex,
|
||||
txout,
|
||||
txindex,
|
||||
vout,
|
||||
outputtype,
|
||||
address_info,
|
||||
existing_typeindex,
|
||||
} in txouts
|
||||
{
|
||||
let sats = Sats::from(txout.value);
|
||||
for ProcessedOutput {
|
||||
txoutindex,
|
||||
txout,
|
||||
txindex,
|
||||
vout,
|
||||
outputtype,
|
||||
address_info,
|
||||
existing_typeindex,
|
||||
} in txouts
|
||||
{
|
||||
let sats = Sats::from(txout.value);
|
||||
|
||||
if vout.is_zero() {
|
||||
self.vecs
|
||||
.transactions
|
||||
.first_txoutindex
|
||||
.checked_push(txindex, txoutindex)?;
|
||||
}
|
||||
|
||||
self.vecs
|
||||
.outputs
|
||||
.txindex
|
||||
.checked_push(txoutindex, txindex)?;
|
||||
|
||||
let typeindex = if let Some(ti) = existing_typeindex {
|
||||
ti
|
||||
} else if let Some((address_bytes, address_hash)) = address_info {
|
||||
let addresstype = outputtype;
|
||||
if let Some(&ti) = already_added_addresshash
|
||||
.get_unwrap(addresstype)
|
||||
.get(&address_hash)
|
||||
{
|
||||
ti
|
||||
} else {
|
||||
let ti = self.indexes.increment_address_index(addresstype);
|
||||
|
||||
already_added_addresshash
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(address_hash, ti);
|
||||
self.stores
|
||||
.addresstype_to_addresshash_to_addressindex
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(address_hash, ti);
|
||||
self.vecs.push_bytes_if_needed(ti, address_bytes)?;
|
||||
|
||||
ti
|
||||
}
|
||||
} else {
|
||||
match outputtype {
|
||||
OutputType::P2MS => {
|
||||
self.vecs
|
||||
.scripts
|
||||
.p2ms_to_txindex
|
||||
.checked_push(self.indexes.p2msoutputindex, txindex)?;
|
||||
self.indexes.p2msoutputindex.copy_then_increment()
|
||||
}
|
||||
OutputType::OpReturn => {
|
||||
self.vecs
|
||||
.scripts
|
||||
.opreturn_to_txindex
|
||||
.checked_push(self.indexes.opreturnindex, txindex)?;
|
||||
self.indexes.opreturnindex.copy_then_increment()
|
||||
}
|
||||
OutputType::Empty => {
|
||||
self.vecs
|
||||
.scripts
|
||||
.empty_to_txindex
|
||||
.checked_push(self.indexes.emptyoutputindex, txindex)?;
|
||||
self.indexes.emptyoutputindex.copy_then_increment()
|
||||
}
|
||||
OutputType::Unknown => {
|
||||
self.vecs
|
||||
.scripts
|
||||
.unknown_to_txindex
|
||||
.checked_push(self.indexes.unknownoutputindex, txindex)?;
|
||||
self.indexes.unknownoutputindex.copy_then_increment()
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
};
|
||||
|
||||
self.vecs.outputs.value.checked_push(txoutindex, sats)?;
|
||||
self.vecs
|
||||
.outputs
|
||||
.outputtype
|
||||
.checked_push(txoutindex, outputtype)?;
|
||||
self.vecs
|
||||
.outputs
|
||||
.typeindex
|
||||
.checked_push(txoutindex, typeindex)?;
|
||||
|
||||
if outputtype.is_unspendable() {
|
||||
continue;
|
||||
} else if outputtype.is_address() {
|
||||
let addresstype = outputtype;
|
||||
let addressindex = typeindex;
|
||||
|
||||
self.stores
|
||||
.addresstype_to_addressindex_and_txindex
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(
|
||||
AddressIndexTxIndex::from((addressindex, txindex)),
|
||||
Unit,
|
||||
);
|
||||
}
|
||||
|
||||
let outpoint = OutPoint::new(txindex, vout);
|
||||
|
||||
if same_block_spent_outpoints.contains(&outpoint) {
|
||||
same_block_output_info.insert(
|
||||
outpoint,
|
||||
SameBlockOutputInfo {
|
||||
outputtype,
|
||||
typeindex,
|
||||
},
|
||||
);
|
||||
} else if outputtype.is_address() {
|
||||
let addresstype = outputtype;
|
||||
let addressindex = typeindex;
|
||||
|
||||
self.stores
|
||||
.addresstype_to_addressindex_and_unspentoutpoint
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(
|
||||
AddressIndexOutPoint::from((addressindex, outpoint)),
|
||||
Unit,
|
||||
);
|
||||
}
|
||||
if vout.is_zero() {
|
||||
first_txoutindex.checked_push(txindex, txoutindex)?;
|
||||
}
|
||||
|
||||
Ok(same_block_output_info)
|
||||
outputs.txindex.checked_push(txoutindex, txindex)?;
|
||||
|
||||
let typeindex = if let Some(ti) = existing_typeindex {
|
||||
ti
|
||||
} else if let Some((address_bytes, address_hash)) = address_info {
|
||||
let addresstype = outputtype;
|
||||
if let Some(&ti) = already_added_addresshash
|
||||
.get_unwrap(addresstype)
|
||||
.get(&address_hash)
|
||||
{
|
||||
ti
|
||||
} else {
|
||||
let ti = indexes.increment_address_index(addresstype);
|
||||
|
||||
already_added_addresshash
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(address_hash, ti);
|
||||
addr_hash_stores
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(address_hash, ti);
|
||||
addresses.push_bytes_if_needed(ti, address_bytes)?;
|
||||
|
||||
ti
|
||||
}
|
||||
} else {
|
||||
match outputtype {
|
||||
OutputType::P2MS => {
|
||||
scripts
|
||||
.p2ms_to_txindex
|
||||
.checked_push(indexes.p2msoutputindex, txindex)?;
|
||||
indexes.p2msoutputindex.copy_then_increment()
|
||||
}
|
||||
OutputType::OpReturn => {
|
||||
scripts
|
||||
.opreturn_to_txindex
|
||||
.checked_push(indexes.opreturnindex, txindex)?;
|
||||
indexes.opreturnindex.copy_then_increment()
|
||||
}
|
||||
OutputType::Empty => {
|
||||
scripts
|
||||
.empty_to_txindex
|
||||
.checked_push(indexes.emptyoutputindex, txindex)?;
|
||||
indexes.emptyoutputindex.copy_then_increment()
|
||||
}
|
||||
OutputType::Unknown => {
|
||||
scripts
|
||||
.unknown_to_txindex
|
||||
.checked_push(indexes.unknownoutputindex, txindex)?;
|
||||
indexes.unknownoutputindex.copy_then_increment()
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
};
|
||||
|
||||
outputs.value.checked_push(txoutindex, sats)?;
|
||||
outputs.outputtype.checked_push(txoutindex, outputtype)?;
|
||||
outputs.typeindex.checked_push(txoutindex, typeindex)?;
|
||||
|
||||
if outputtype.is_unspendable() {
|
||||
continue;
|
||||
} else if outputtype.is_address() {
|
||||
let addresstype = outputtype;
|
||||
let addressindex = typeindex;
|
||||
|
||||
addr_txindex_stores
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(
|
||||
AddressIndexTxIndex::from((addressindex, txindex)),
|
||||
Unit,
|
||||
);
|
||||
}
|
||||
|
||||
let outpoint = OutPoint::new(txindex, vout);
|
||||
|
||||
if same_block_spent_outpoints.contains(&outpoint) {
|
||||
same_block_output_info.insert(
|
||||
outpoint,
|
||||
SameBlockOutputInfo {
|
||||
outputtype,
|
||||
typeindex,
|
||||
},
|
||||
);
|
||||
} else if outputtype.is_address() {
|
||||
let addresstype = outputtype;
|
||||
let addressindex = typeindex;
|
||||
|
||||
addr_outpoint_stores
|
||||
.get_mut_unwrap(addresstype)
|
||||
.insert(
|
||||
AddressIndexOutPoint::from((addressindex, outpoint)),
|
||||
Unit,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use bitcoin::{Transaction, TxOut};
|
||||
use brk_cohort::ByAddressType;
|
||||
use brk_types::{
|
||||
AddressBytes, AddressHash, OutPoint, OutputType, TxIndex, TxOutIndex, Txid, TxidPrefix,
|
||||
TypeIndex, Vin, Vout,
|
||||
};
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum InputSource {
|
||||
@@ -45,3 +47,13 @@ pub struct ComputedTx<'a> {
|
||||
pub base_size: u32,
|
||||
pub total_size: u32,
|
||||
}
|
||||
|
||||
/// Reusable buffers cleared and refilled each block to avoid allocation churn.
|
||||
#[derive(Default)]
|
||||
pub struct BlockBuffers {
|
||||
pub txid_prefix_map: FxHashMap<TxidPrefix, TxIndex>,
|
||||
pub same_block_spent: FxHashSet<OutPoint>,
|
||||
pub already_added_addresses: ByAddressType<FxHashMap<AddressHash, TypeIndex>>,
|
||||
pub same_block_output_info: FxHashMap<OutPoint, SameBlockOutputInfo>,
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ use brk_types::{
|
||||
Version,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp};
|
||||
use vecdb::{AnyStoredVec, BytesVec, Database, ImportableVec, PcoVec, Stamp, WritableVec};
|
||||
|
||||
use crate::parallel_import;
|
||||
|
||||
@@ -23,7 +23,39 @@ pub struct TransactionsVecs {
|
||||
pub first_txoutindex: BytesVec<TxIndex, TxOutIndex>,
|
||||
}
|
||||
|
||||
pub struct TxMetadataVecs<'a> {
|
||||
pub height: &'a mut PcoVec<TxIndex, Height>,
|
||||
pub txversion: &'a mut PcoVec<TxIndex, TxVersion>,
|
||||
pub txid: &'a mut BytesVec<TxIndex, Txid>,
|
||||
pub rawlocktime: &'a mut PcoVec<TxIndex, RawLockTime>,
|
||||
pub base_size: &'a mut PcoVec<TxIndex, StoredU32>,
|
||||
pub total_size: &'a mut PcoVec<TxIndex, StoredU32>,
|
||||
pub is_explicitly_rbf: &'a mut PcoVec<TxIndex, StoredBool>,
|
||||
}
|
||||
|
||||
impl TransactionsVecs {
|
||||
pub fn split_for_finalize(
|
||||
&mut self,
|
||||
) -> (
|
||||
&mut BytesVec<TxIndex, TxOutIndex>,
|
||||
&mut PcoVec<TxIndex, TxInIndex>,
|
||||
TxMetadataVecs<'_>,
|
||||
) {
|
||||
(
|
||||
&mut self.first_txoutindex,
|
||||
&mut self.first_txinindex,
|
||||
TxMetadataVecs {
|
||||
height: &mut self.height,
|
||||
txversion: &mut self.txversion,
|
||||
txid: &mut self.txid,
|
||||
rawlocktime: &mut self.rawlocktime,
|
||||
base_size: &mut self.base_size,
|
||||
total_size: &mut self.total_size,
|
||||
is_explicitly_rbf: &mut self.is_explicitly_rbf,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn forced_import(db: &Database, version: Version) -> Result<Self> {
|
||||
let (
|
||||
first_txindex,
|
||||
@@ -65,10 +97,8 @@ impl TransactionsVecs {
|
||||
pub fn truncate(&mut self, height: Height, txindex: TxIndex, stamp: Stamp) -> Result<()> {
|
||||
self.first_txindex
|
||||
.truncate_if_needed_with_stamp(height, stamp)?;
|
||||
self.height
|
||||
.truncate_if_needed_with_stamp(txindex, stamp)?;
|
||||
self.txid
|
||||
.truncate_if_needed_with_stamp(txindex, stamp)?;
|
||||
self.height.truncate_if_needed_with_stamp(txindex, stamp)?;
|
||||
self.txid.truncate_if_needed_with_stamp(txindex, stamp)?;
|
||||
self.txversion
|
||||
.truncate_if_needed_with_stamp(txindex, stamp)?;
|
||||
self.rawlocktime
|
||||
|
||||
Reference in New Issue
Block a user