mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snapshot
This commit is contained in:
@@ -4,7 +4,7 @@ use brk_types::{
|
||||
CheckedSub, FeeRate, HalvingEpoch, Height, ONE_DAY_IN_SEC_F64, Sats, StoredF32, StoredF64,
|
||||
StoredU32, StoredU64, Timestamp, TxOutIndex, TxVersion,
|
||||
};
|
||||
use vecdb::{Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex, unlikely};
|
||||
use vecdb::{Exit, IterableVec, TypedVecIterator, VecIndex, unlikely};
|
||||
|
||||
use crate::{grouped::ComputedVecsFromHeight, indexes, price, utils::OptionExt, Indexes};
|
||||
|
||||
@@ -275,39 +275,11 @@ impl Vecs {
|
||||
// TxInIndex
|
||||
// ---
|
||||
|
||||
let txindex_to_first_txoutindex = &indexer.vecs.tx.txindex_to_first_txoutindex;
|
||||
let txindex_to_first_txoutindex_reader = txindex_to_first_txoutindex.create_reader();
|
||||
let txoutindex_to_value = &indexer.vecs.txout.txoutindex_to_value;
|
||||
let txoutindex_to_value_reader = indexer.vecs.txout.txoutindex_to_value.create_reader();
|
||||
self.txinindex_to_value.compute_transform(
|
||||
starting_indexes.txinindex,
|
||||
&indexer.vecs.txin.txinindex_to_outpoint,
|
||||
|(txinindex, outpoint, ..)| {
|
||||
if unlikely(outpoint.is_coinbase()) {
|
||||
return (txinindex, Sats::MAX);
|
||||
}
|
||||
let txoutindex = txindex_to_first_txoutindex
|
||||
.read_unwrap(outpoint.txindex(), &txindex_to_first_txoutindex_reader)
|
||||
+ outpoint.vout();
|
||||
|
||||
let value = if unlikely(txoutindex == TxOutIndex::COINBASE) {
|
||||
unreachable!()
|
||||
} else {
|
||||
txoutindex_to_value
|
||||
.unchecked_read(txoutindex, &txoutindex_to_value_reader)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
(txinindex, value)
|
||||
},
|
||||
exit,
|
||||
)?;
|
||||
|
||||
self.txindex_to_input_value.compute_sum_from_indexes(
|
||||
starting_indexes.txindex,
|
||||
&indexer.vecs.tx.txindex_to_first_txinindex,
|
||||
&indexes.txindex_to_input_count,
|
||||
&self.txinindex_to_value,
|
||||
&indexer.vecs.txin.txinindex_to_value,
|
||||
exit,
|
||||
)?;
|
||||
|
||||
@@ -393,7 +365,8 @@ impl Vecs {
|
||||
let mut txindex_to_first_txoutindex_iter =
|
||||
indexer.vecs.tx.txindex_to_first_txoutindex.iter()?;
|
||||
let mut txindex_to_output_count_iter = indexes.txindex_to_output_count.iter();
|
||||
let mut txoutindex_to_value_iter = indexer.vecs.txout.txoutindex_to_value.iter()?;
|
||||
let mut txoutindex_to_txoutdata_iter =
|
||||
indexer.vecs.txout.txoutindex_to_txoutdata.iter()?;
|
||||
vec.compute_transform(
|
||||
starting_indexes.height,
|
||||
&indexer.vecs.tx.height_to_first_txindex,
|
||||
@@ -405,8 +378,9 @@ impl Vecs {
|
||||
let mut sats = Sats::ZERO;
|
||||
(first_txoutindex..first_txoutindex + usize::from(output_count)).for_each(
|
||||
|txoutindex| {
|
||||
sats += txoutindex_to_value_iter
|
||||
.get_unwrap(TxOutIndex::from(txoutindex));
|
||||
sats += txoutindex_to_txoutdata_iter
|
||||
.get_unwrap(TxOutIndex::from(txoutindex))
|
||||
.value;
|
||||
},
|
||||
);
|
||||
(height, sats)
|
||||
|
||||
@@ -126,9 +126,8 @@ impl Vecs {
|
||||
let mut txindex_to_first_txoutindex_iter =
|
||||
indexer.vecs.tx.txindex_to_first_txoutindex.iter()?;
|
||||
let mut txindex_to_output_count_iter = indexes.txindex_to_output_count.iter();
|
||||
let mut txoutindex_to_outputtype_iter =
|
||||
indexer.vecs.txout.txoutindex_to_outputtype.iter()?;
|
||||
let mut txoutindex_to_typeindex_iter = indexer.vecs.txout.txoutindex_to_typeindex.iter()?;
|
||||
let mut txoutindex_to_txoutdata_iter =
|
||||
indexer.vecs.txout.txoutindex_to_txoutdata.iter()?;
|
||||
let mut p2pk65addressindex_to_p2pk65bytes_iter = indexer
|
||||
.vecs
|
||||
.address
|
||||
@@ -181,8 +180,9 @@ impl Vecs {
|
||||
let pool = (*txoutindex..(*txoutindex + *outputcount))
|
||||
.map(TxOutIndex::from)
|
||||
.find_map(|txoutindex| {
|
||||
let outputtype = txoutindex_to_outputtype_iter.get_unwrap(txoutindex);
|
||||
let typeindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex);
|
||||
let txoutdata = txoutindex_to_txoutdata_iter.get_unwrap(txoutindex);
|
||||
let outputtype = txoutdata.outputtype;
|
||||
let typeindex = txoutdata.typeindex;
|
||||
|
||||
match outputtype {
|
||||
OutputType::P2PK65 => Some(AddressBytes::from(
|
||||
|
||||
@@ -24,8 +24,8 @@ use crate::{
|
||||
address::AddressTypeToAddressCount,
|
||||
compute::write::{process_address_updates, write},
|
||||
process::{
|
||||
AddressCache, InputsResult, build_txoutindex_to_height_map, process_inputs,
|
||||
process_outputs, process_received, process_sent,
|
||||
AddressCache, InputsResult, process_inputs, process_outputs, process_received,
|
||||
process_sent,
|
||||
},
|
||||
states::{BlockState, Transacted},
|
||||
},
|
||||
@@ -38,8 +38,8 @@ use super::{
|
||||
vecs::Vecs,
|
||||
},
|
||||
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
|
||||
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, IndexerReaders, TxInIterators,
|
||||
TxOutIterators, VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
|
||||
BIP30_ORIGINAL_HEIGHT_2, ComputeContext, FLUSH_INTERVAL, TxInIterators, TxOutIterators,
|
||||
VecsReaders, build_txinindex_to_txindex, build_txoutindex_to_txindex,
|
||||
};
|
||||
|
||||
/// Process all blocks from starting_height to last_height.
|
||||
@@ -124,15 +124,8 @@ pub fn process_blocks(
|
||||
let mut height_to_price_iter = height_to_price.map(|v| v.into_iter());
|
||||
let mut dateindex_to_price_iter = dateindex_to_price.map(|v| v.into_iter());
|
||||
|
||||
info!("Building txoutindex_to_height map...");
|
||||
|
||||
// Build txoutindex -> height map for input processing
|
||||
let txoutindex_to_height = build_txoutindex_to_height_map(height_to_first_txoutindex);
|
||||
|
||||
info!("Creating readers...");
|
||||
|
||||
// Create readers for parallel data access
|
||||
let ir = IndexerReaders::new(indexer);
|
||||
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
|
||||
|
||||
// Create reusable iterators for sequential txout/txin reads (16KB buffered)
|
||||
@@ -273,14 +266,14 @@ pub fn process_blocks(
|
||||
|
||||
// Collect output/input data using reusable iterators (16KB buffered reads)
|
||||
// Must be done before thread::scope since iterators aren't Send
|
||||
let (output_values, output_types, output_typeindexes) =
|
||||
txout_iters.collect_block_outputs(first_txoutindex, output_count);
|
||||
let txoutdata_vec = txout_iters.collect_block_outputs(first_txoutindex, output_count);
|
||||
|
||||
let input_outpoints = if input_count > 1 {
|
||||
txin_iters.collect_block_outpoints(first_txinindex + 1, input_count - 1)
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let (input_values, input_prev_heights, input_outputtypes, input_typeindexes) =
|
||||
if input_count > 1 {
|
||||
txin_iters.collect_block_inputs(first_txinindex + 1, input_count - 1)
|
||||
} else {
|
||||
(Vec::new(), Vec::new(), Vec::new(), Vec::new())
|
||||
};
|
||||
|
||||
// Process outputs and inputs in parallel with tick-tock
|
||||
let (outputs_result, inputs_result) = thread::scope(|scope| {
|
||||
@@ -293,11 +286,8 @@ pub fn process_blocks(
|
||||
let outputs_handle = scope.spawn(|| {
|
||||
// Process outputs (receive)
|
||||
process_outputs(
|
||||
output_count,
|
||||
&txoutindex_to_txindex,
|
||||
&output_values,
|
||||
&output_types,
|
||||
&output_typeindexes,
|
||||
&txoutdata_vec,
|
||||
&first_addressindexes,
|
||||
&cache,
|
||||
&vr,
|
||||
@@ -309,16 +299,12 @@ pub fn process_blocks(
|
||||
// Process inputs (send) - skip coinbase input
|
||||
let inputs_result = if input_count > 1 {
|
||||
process_inputs(
|
||||
first_txinindex + 1, // Skip coinbase
|
||||
input_count - 1,
|
||||
&txinindex_to_txindex[1..], // Skip coinbase
|
||||
&input_outpoints,
|
||||
&indexer.vecs.tx.txindex_to_first_txoutindex,
|
||||
&indexer.vecs.txout.txoutindex_to_value,
|
||||
&indexer.vecs.txout.txoutindex_to_outputtype,
|
||||
&indexer.vecs.txout.txoutindex_to_typeindex,
|
||||
&txoutindex_to_height,
|
||||
&ir,
|
||||
&input_values,
|
||||
&input_outputtypes,
|
||||
&input_typeindexes,
|
||||
&input_prev_heights,
|
||||
&first_addressindexes,
|
||||
&cache,
|
||||
&vr,
|
||||
@@ -331,7 +317,6 @@ pub fn process_blocks(
|
||||
sent_data: Default::default(),
|
||||
address_data: Default::default(),
|
||||
txindex_vecs: Default::default(),
|
||||
txoutindex_to_txinindex_updates: Default::default(),
|
||||
}
|
||||
};
|
||||
|
||||
@@ -426,12 +411,6 @@ pub fn process_blocks(
|
||||
vecs.utxo_cohorts.send(height_to_sent, chain_state);
|
||||
});
|
||||
|
||||
// Update txoutindex_to_txinindex
|
||||
vecs.update_txoutindex_to_txinindex(
|
||||
output_count,
|
||||
inputs_result.txoutindex_to_txinindex_updates,
|
||||
)?;
|
||||
|
||||
// Push to height-indexed vectors
|
||||
vecs.height_to_unspendable_supply
|
||||
.truncate_push(height, unspendable_supply)?;
|
||||
|
||||
@@ -17,7 +17,7 @@ mod write;
|
||||
pub use block_loop::process_blocks;
|
||||
pub use context::ComputeContext;
|
||||
pub use readers::{
|
||||
IndexerReaders, TxInIterators, TxOutIterators, VecsReaders, build_txinindex_to_txindex,
|
||||
TxInIterators, TxOutIterators, VecsReaders, build_txinindex_to_txindex,
|
||||
build_txoutindex_to_txindex,
|
||||
};
|
||||
pub use recover::{StartMode, determine_start_mode, recover_state, reset_state};
|
||||
|
||||
@@ -4,7 +4,9 @@
|
||||
|
||||
use brk_grouper::{ByAddressType, ByAnyAddress};
|
||||
use brk_indexer::Indexer;
|
||||
use brk_types::{OutPoint, OutputType, Sats, StoredU64, TxInIndex, TxIndex, TxOutIndex, TypeIndex};
|
||||
use brk_types::{
|
||||
Height, OutputType, Sats, StoredU64, TxInIndex, TxIndex, TxOutData, TxOutIndex, TypeIndex,
|
||||
};
|
||||
use vecdb::{
|
||||
BoxedVecIterator, BytesVecIterator, GenericStoredVec, PcodecVecIterator, Reader, VecIndex,
|
||||
VecIterator,
|
||||
@@ -12,45 +14,18 @@ use vecdb::{
|
||||
|
||||
use crate::stateful::address::{AddressesDataVecs, AnyAddressIndexesVecs};
|
||||
|
||||
/// Cached readers for indexer vectors.
|
||||
pub struct IndexerReaders {
|
||||
pub txindex_to_first_txoutindex: Reader,
|
||||
pub txoutindex_to_value: Reader,
|
||||
pub txoutindex_to_outputtype: Reader,
|
||||
pub txoutindex_to_typeindex: Reader,
|
||||
}
|
||||
|
||||
impl IndexerReaders {
|
||||
pub fn new(indexer: &Indexer) -> Self {
|
||||
Self {
|
||||
txindex_to_first_txoutindex: indexer
|
||||
.vecs
|
||||
.tx
|
||||
.txindex_to_first_txoutindex
|
||||
.create_reader(),
|
||||
txoutindex_to_value: indexer.vecs.txout.txoutindex_to_value.create_reader(),
|
||||
txoutindex_to_outputtype: indexer.vecs.txout.txoutindex_to_outputtype.create_reader(),
|
||||
txoutindex_to_typeindex: indexer.vecs.txout.txoutindex_to_typeindex.create_reader(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reusable iterators for txout vectors (16KB buffered reads).
|
||||
///
|
||||
/// Iterators are created once and re-positioned each block to avoid
|
||||
/// creating new file handles repeatedly.
|
||||
pub struct TxOutIterators<'a> {
|
||||
value_iter: BytesVecIterator<'a, TxOutIndex, Sats>,
|
||||
outputtype_iter: BytesVecIterator<'a, TxOutIndex, OutputType>,
|
||||
typeindex_iter: BytesVecIterator<'a, TxOutIndex, TypeIndex>,
|
||||
txoutdata_iter: BytesVecIterator<'a, TxOutIndex, TxOutData>,
|
||||
}
|
||||
|
||||
impl<'a> TxOutIterators<'a> {
|
||||
pub fn new(indexer: &'a Indexer) -> Self {
|
||||
Self {
|
||||
value_iter: indexer.vecs.txout.txoutindex_to_value.into_iter(),
|
||||
outputtype_iter: indexer.vecs.txout.txoutindex_to_outputtype.into_iter(),
|
||||
typeindex_iter: indexer.vecs.txout.txoutindex_to_typeindex.into_iter(),
|
||||
txoutdata_iter: indexer.vecs.txout.txoutindex_to_txoutdata.into_iter(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,43 +34,50 @@ impl<'a> TxOutIterators<'a> {
|
||||
&mut self,
|
||||
first_txoutindex: usize,
|
||||
output_count: usize,
|
||||
) -> (Vec<Sats>, Vec<OutputType>, Vec<TypeIndex>) {
|
||||
let mut values = Vec::with_capacity(output_count);
|
||||
let mut output_types = Vec::with_capacity(output_count);
|
||||
let mut type_indexes = Vec::with_capacity(output_count);
|
||||
|
||||
for i in first_txoutindex..first_txoutindex + output_count {
|
||||
values.push(self.value_iter.get_at_unwrap(i));
|
||||
output_types.push(self.outputtype_iter.get_at_unwrap(i));
|
||||
type_indexes.push(self.typeindex_iter.get_at_unwrap(i));
|
||||
}
|
||||
|
||||
(values, output_types, type_indexes)
|
||||
) -> Vec<TxOutData> {
|
||||
(first_txoutindex..first_txoutindex + output_count)
|
||||
.map(|i| self.txoutdata_iter.get_at_unwrap(i))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Reusable iterator for txin outpoints (PcoVec - avoids repeated page decompression).
|
||||
/// Reusable iterators for txin vectors (PcoVec - avoids repeated page decompression).
|
||||
pub struct TxInIterators<'a> {
|
||||
outpoint_iter: PcodecVecIterator<'a, TxInIndex, OutPoint>,
|
||||
value_iter: PcodecVecIterator<'a, TxInIndex, Sats>,
|
||||
prev_height_iter: PcodecVecIterator<'a, TxInIndex, Height>,
|
||||
outputtype_iter: PcodecVecIterator<'a, TxInIndex, OutputType>,
|
||||
typeindex_iter: PcodecVecIterator<'a, TxInIndex, TypeIndex>,
|
||||
}
|
||||
|
||||
impl<'a> TxInIterators<'a> {
|
||||
pub fn new(indexer: &'a Indexer) -> Self {
|
||||
Self {
|
||||
outpoint_iter: indexer.vecs.txin.txinindex_to_outpoint.into_iter(),
|
||||
value_iter: indexer.vecs.txin.txinindex_to_value.into_iter(),
|
||||
prev_height_iter: indexer.vecs.txin.txinindex_to_prev_height.into_iter(),
|
||||
outputtype_iter: indexer.vecs.txin.txinindex_to_outputtype.into_iter(),
|
||||
typeindex_iter: indexer.vecs.txin.txinindex_to_typeindex.into_iter(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Collect outpoints for a block range using buffered iteration.
|
||||
/// This avoids repeated PcoVec page decompression (~1000x speedup).
|
||||
pub fn collect_block_outpoints(
|
||||
/// Collect input data for a block range using buffered iteration.
|
||||
pub fn collect_block_inputs(
|
||||
&mut self,
|
||||
first_txinindex: usize,
|
||||
input_count: usize,
|
||||
) -> Vec<OutPoint> {
|
||||
(first_txinindex..first_txinindex + input_count)
|
||||
.map(|i| self.outpoint_iter.get_at_unwrap(i))
|
||||
.collect()
|
||||
) -> (Vec<Sats>, Vec<Height>, Vec<OutputType>, Vec<TypeIndex>) {
|
||||
let mut values = Vec::with_capacity(input_count);
|
||||
let mut prev_heights = Vec::with_capacity(input_count);
|
||||
let mut outputtypes = Vec::with_capacity(input_count);
|
||||
let mut typeindexes = Vec::with_capacity(input_count);
|
||||
|
||||
for i in first_txinindex..first_txinindex + input_count {
|
||||
values.push(self.value_iter.get_at_unwrap(i));
|
||||
prev_heights.push(self.prev_height_iter.get_at_unwrap(i));
|
||||
outputtypes.push(self.outputtype_iter.get_at_unwrap(i));
|
||||
typeindexes.push(self.typeindex_iter.get_at_unwrap(i));
|
||||
}
|
||||
|
||||
(values, prev_heights, outputtypes, typeindexes)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@ pub struct RecoveredState {
|
||||
pub fn recover_state(
|
||||
height: Height,
|
||||
chain_state_rollback: vecdb::Result<Stamp>,
|
||||
txoutindex_rollback: vecdb::Result<Stamp>,
|
||||
any_address_indexes: &mut AnyAddressIndexesVecs,
|
||||
addresses_data: &mut AddressesDataVecs,
|
||||
utxo_cohorts: &mut UTXOCohorts,
|
||||
@@ -42,7 +41,6 @@ pub fn recover_state(
|
||||
// Verify rollback consistency - all must agree on the same height
|
||||
let consistent_height = rollback_states(
|
||||
chain_state_rollback,
|
||||
txoutindex_rollback,
|
||||
address_indexes_rollback,
|
||||
address_data_rollback,
|
||||
);
|
||||
@@ -127,7 +125,6 @@ pub enum StartMode {
|
||||
/// otherwise returns Height::ZERO (need fresh start).
|
||||
fn rollback_states(
|
||||
chain_state_rollback: vecdb::Result<Stamp>,
|
||||
txoutindex_rollback: vecdb::Result<Stamp>,
|
||||
address_indexes_rollbacks: Result<Vec<Stamp>>,
|
||||
address_data_rollbacks: Result<[Stamp; 2]>,
|
||||
) -> Height {
|
||||
@@ -139,11 +136,6 @@ fn rollback_states(
|
||||
};
|
||||
heights.insert(Height::from(s).incremented());
|
||||
|
||||
let Ok(s) = txoutindex_rollback else {
|
||||
return Height::ZERO;
|
||||
};
|
||||
heights.insert(Height::from(s).incremented());
|
||||
|
||||
let Ok(stamps) = address_indexes_rollbacks else {
|
||||
return Height::ZERO;
|
||||
};
|
||||
|
||||
@@ -89,9 +89,6 @@ pub fn write(
|
||||
vecs.addresstype_to_height_to_empty_addr_count
|
||||
.par_iter_mut(),
|
||||
)
|
||||
.chain(rayon::iter::once(
|
||||
&mut vecs.txoutindex_to_txinindex as &mut dyn AnyStoredVec,
|
||||
))
|
||||
.chain(rayon::iter::once(
|
||||
&mut vecs.chain_state as &mut dyn AnyStoredVec,
|
||||
))
|
||||
|
||||
@@ -39,8 +39,3 @@ pub use address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexe
|
||||
|
||||
// Cohort re-exports
|
||||
pub use cohorts::{AddressCohorts, CohortVecs, DynCohortVecs, UTXOCohorts};
|
||||
|
||||
// Compute re-exports
|
||||
pub use compute::IndexerReaders;
|
||||
|
||||
// Metrics re-exports
|
||||
|
||||
@@ -1,24 +1,20 @@
|
||||
//! Parallel input processing.
|
||||
//!
|
||||
//! Processes a block's inputs (spent UTXOs) in parallel, building:
|
||||
//! - height_to_sent: map from creation height -> Transacted for sends
|
||||
//! - Address data for address cohort tracking (optional)
|
||||
|
||||
use brk_grouper::ByAddressType;
|
||||
use brk_types::{Height, OutPoint, OutputType, Sats, TxInIndex, TxIndex, TxOutIndex, TypeIndex};
|
||||
use brk_types::{Height, OutputType, Sats, TxIndex, TypeIndex};
|
||||
use rayon::prelude::*;
|
||||
use rustc_hash::FxHashMap;
|
||||
use vecdb::{BytesVec, GenericStoredVec};
|
||||
|
||||
use crate::stateful::address::{
|
||||
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
|
||||
use crate::stateful::{
|
||||
address::{AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs},
|
||||
compute::VecsReaders,
|
||||
states::Transacted,
|
||||
};
|
||||
use crate::stateful::compute::VecsReaders;
|
||||
use crate::stateful::states::Transacted;
|
||||
use crate::stateful::{IndexerReaders, process::RangeMap};
|
||||
|
||||
use super::super::address::HeightToAddressTypeToVec;
|
||||
use super::{load_uncached_address_data, AddressCache, LoadedAddressDataWithSource, TxIndexVec};
|
||||
use super::{
|
||||
super::address::HeightToAddressTypeToVec, AddressCache, LoadedAddressDataWithSource,
|
||||
TxIndexVec, load_uncached_address_data,
|
||||
};
|
||||
|
||||
/// Result of processing inputs for a block.
|
||||
pub struct InputsResult {
|
||||
@@ -30,8 +26,6 @@ pub struct InputsResult {
|
||||
pub address_data: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource>,
|
||||
/// Transaction indexes per address for tx_count tracking.
|
||||
pub txindex_vecs: AddressTypeToTypeIndexMap<TxIndexVec>,
|
||||
/// Updates to txoutindex_to_txinindex: (spent txoutindex, spending txinindex).
|
||||
pub txoutindex_to_txinindex_updates: Vec<(TxOutIndex, TxInIndex)>,
|
||||
}
|
||||
|
||||
/// Process inputs (spent UTXOs) for a block.
|
||||
@@ -49,52 +43,32 @@ pub struct InputsResult {
|
||||
/// expensive merge overhead from rayon's fold/reduce pattern.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn process_inputs(
|
||||
first_txinindex: usize,
|
||||
input_count: usize,
|
||||
txinindex_to_txindex: &[TxIndex],
|
||||
// Pre-collected outpoints (from reusable iterator with page caching)
|
||||
outpoints: &[OutPoint],
|
||||
txindex_to_first_txoutindex: &BytesVec<TxIndex, TxOutIndex>,
|
||||
txoutindex_to_value: &BytesVec<TxOutIndex, Sats>,
|
||||
txoutindex_to_outputtype: &BytesVec<TxOutIndex, OutputType>,
|
||||
txoutindex_to_typeindex: &BytesVec<TxOutIndex, TypeIndex>,
|
||||
txoutindex_to_height: &RangeMap<TxOutIndex, Height>,
|
||||
ir: &IndexerReaders,
|
||||
// Address lookup parameters
|
||||
txinindex_to_value: &[Sats],
|
||||
txinindex_to_outputtype: &[OutputType],
|
||||
txinindex_to_typeindex: &[TypeIndex],
|
||||
txinindex_to_prev_height: &[Height],
|
||||
first_addressindexes: &ByAddressType<TypeIndex>,
|
||||
cache: &AddressCache,
|
||||
vr: &VecsReaders,
|
||||
any_address_indexes: &AnyAddressIndexesVecs,
|
||||
addresses_data: &AddressesDataVecs,
|
||||
) -> InputsResult {
|
||||
// Parallel reads - collect all input data (outpoints already in memory)
|
||||
let items: Vec<_> = (0..input_count)
|
||||
.into_par_iter()
|
||||
.map(|local_idx| {
|
||||
let txinindex = TxInIndex::from(first_txinindex + local_idx);
|
||||
let txindex = txinindex_to_txindex[local_idx];
|
||||
|
||||
// Get outpoint from pre-collected vec and resolve to txoutindex
|
||||
let outpoint = outpoints[local_idx];
|
||||
let first_txoutindex = txindex_to_first_txoutindex
|
||||
.read_unwrap(outpoint.txindex(), &ir.txindex_to_first_txoutindex);
|
||||
let txoutindex = first_txoutindex + outpoint.vout();
|
||||
let prev_height = *txinindex_to_prev_height.get(local_idx).unwrap();
|
||||
let value = *txinindex_to_value.get(local_idx).unwrap();
|
||||
let input_type = *txinindex_to_outputtype.get(local_idx).unwrap();
|
||||
|
||||
// Get creation height
|
||||
let prev_height = *txoutindex_to_height.get(txoutindex).unwrap();
|
||||
|
||||
// Get value and type from the output being spent
|
||||
let value = txoutindex_to_value.read_unwrap(txoutindex, &ir.txoutindex_to_value);
|
||||
let input_type =
|
||||
txoutindex_to_outputtype.read_unwrap(txoutindex, &ir.txoutindex_to_outputtype);
|
||||
|
||||
// Non-address inputs don't need typeindex or address lookup
|
||||
if input_type.is_not_address() {
|
||||
return (txinindex, txoutindex, prev_height, value, input_type, None);
|
||||
return (prev_height, value, input_type, None);
|
||||
}
|
||||
|
||||
let typeindex =
|
||||
txoutindex_to_typeindex.read_unwrap(txoutindex, &ir.txoutindex_to_typeindex);
|
||||
let typeindex = *txinindex_to_typeindex.get(local_idx).unwrap();
|
||||
|
||||
// Look up address data
|
||||
let addr_data_opt = load_uncached_address_data(
|
||||
@@ -108,8 +82,6 @@ pub fn process_inputs(
|
||||
);
|
||||
|
||||
(
|
||||
txinindex,
|
||||
txoutindex,
|
||||
prev_height,
|
||||
value,
|
||||
input_type,
|
||||
@@ -131,16 +103,13 @@ pub fn process_inputs(
|
||||
AddressTypeToTypeIndexMap::<LoadedAddressDataWithSource>::with_capacity(estimated_per_type);
|
||||
let mut txindex_vecs =
|
||||
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);
|
||||
let mut txoutindex_to_txinindex_updates = Vec::with_capacity(input_count);
|
||||
|
||||
for (txinindex, txoutindex, prev_height, value, output_type, addr_info) in items {
|
||||
for (prev_height, value, output_type, addr_info) in items {
|
||||
height_to_sent
|
||||
.entry(prev_height)
|
||||
.or_default()
|
||||
.iterate(value, output_type);
|
||||
|
||||
txoutindex_to_txinindex_updates.push((txoutindex, txinindex));
|
||||
|
||||
if let Some((typeindex, txindex, value, addr_data_opt)) = addr_info {
|
||||
sent_data
|
||||
.entry(prev_height)
|
||||
@@ -167,7 +136,5 @@ pub fn process_inputs(
|
||||
sent_data,
|
||||
address_data,
|
||||
txindex_vecs,
|
||||
txoutindex_to_txinindex_updates,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ mod cache;
|
||||
mod inputs;
|
||||
mod lookup;
|
||||
mod outputs;
|
||||
mod range_map;
|
||||
mod received;
|
||||
mod sent;
|
||||
mod tx_counts;
|
||||
@@ -14,7 +13,6 @@ pub use cache::*;
|
||||
pub use inputs::*;
|
||||
pub use lookup::*;
|
||||
pub use outputs::*;
|
||||
pub use range_map::*;
|
||||
pub use received::*;
|
||||
pub use sent::*;
|
||||
pub use tx_counts::*;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
//! - Address data for address cohort tracking (optional)
|
||||
|
||||
use brk_grouper::ByAddressType;
|
||||
use brk_types::{OutputType, Sats, TxIndex, TypeIndex};
|
||||
use brk_types::{Sats, TxIndex, TxOutData, TypeIndex};
|
||||
|
||||
use crate::stateful::address::{
|
||||
AddressTypeToTypeIndexMap, AddressesDataVecs, AnyAddressIndexesVecs,
|
||||
@@ -37,19 +37,16 @@ pub struct OutputsResult {
|
||||
/// 4. Track address-specific data for address cohort processing
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn process_outputs(
|
||||
output_count: usize,
|
||||
txoutindex_to_txindex: &[TxIndex],
|
||||
// Pre-collected output data (from reusable iterators with 16KB buffered reads)
|
||||
values: &[Sats],
|
||||
output_types: &[OutputType],
|
||||
typeindexes: &[TypeIndex],
|
||||
// Address lookup parameters
|
||||
txoutdata_vec: &[TxOutData],
|
||||
first_addressindexes: &ByAddressType<TypeIndex>,
|
||||
cache: &AddressCache,
|
||||
vr: &VecsReaders,
|
||||
any_address_indexes: &AnyAddressIndexesVecs,
|
||||
addresses_data: &AddressesDataVecs,
|
||||
) -> OutputsResult {
|
||||
let output_count = txoutdata_vec.len();
|
||||
|
||||
// Pre-allocate result structures
|
||||
let estimated_per_type = (output_count / 8).max(8);
|
||||
let mut transacted = Transacted::default();
|
||||
@@ -60,10 +57,10 @@ pub fn process_outputs(
|
||||
AddressTypeToTypeIndexMap::<TxIndexVec>::with_capacity(estimated_per_type);
|
||||
|
||||
// Single pass: read from pre-collected vecs and accumulate
|
||||
for local_idx in 0..output_count {
|
||||
for (local_idx, txoutdata) in txoutdata_vec.iter().enumerate() {
|
||||
let txindex = txoutindex_to_txindex[local_idx];
|
||||
let value = values[local_idx];
|
||||
let output_type = output_types[local_idx];
|
||||
let value = txoutdata.value;
|
||||
let output_type = txoutdata.outputtype;
|
||||
|
||||
transacted.iterate(value, output_type);
|
||||
|
||||
@@ -71,7 +68,7 @@ pub fn process_outputs(
|
||||
continue;
|
||||
}
|
||||
|
||||
let typeindex = typeindexes[local_idx];
|
||||
let typeindex = txoutdata.typeindex;
|
||||
|
||||
received_data
|
||||
.get_mut(output_type)
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
//! Range-based lookup map.
|
||||
//!
|
||||
//! Maps ranges of indices to values for efficient reverse lookups.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use brk_types::{Height, TxOutIndex};
|
||||
use vecdb::{BytesVec, BytesVecValue, PcoVec, PcoVecValue, VecIndex};
|
||||
|
||||
/// Maps ranges of indices to their corresponding height.
|
||||
/// Used to efficiently look up which block a txoutindex belongs to.
|
||||
#[derive(Debug)]
|
||||
pub struct RangeMap<I, T>(BTreeMap<I, T>);
|
||||
|
||||
impl<I, T> RangeMap<I, T>
|
||||
where
|
||||
I: VecIndex,
|
||||
T: VecIndex,
|
||||
{
|
||||
/// Look up value for a key using range search.
|
||||
/// Returns the value associated with the largest key <= given key.
|
||||
#[inline]
|
||||
pub fn get(&self, key: I) -> Option<&T> {
|
||||
self.0.range(..=key).next_back().map(|(_, value)| value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, T> From<&BytesVec<I, T>> for RangeMap<T, I>
|
||||
where
|
||||
I: VecIndex,
|
||||
T: VecIndex + BytesVecValue,
|
||||
{
|
||||
#[inline]
|
||||
fn from(vec: &BytesVec<I, T>) -> Self {
|
||||
Self(
|
||||
vec.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, v)| (v, I::from(i)))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, T> From<&PcoVec<I, T>> for RangeMap<T, I>
|
||||
where
|
||||
I: VecIndex,
|
||||
T: VecIndex + PcoVecValue,
|
||||
{
|
||||
#[inline]
|
||||
fn from(vec: &PcoVec<I, T>) -> Self {
|
||||
Self(
|
||||
vec.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, v)| (v, I::from(i)))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a RangeMap from height_to_first_txoutindex for fast txoutindex -> height lookups.
|
||||
pub fn build_txoutindex_to_height_map(
|
||||
height_to_first_txoutindex: &PcoVec<Height, TxOutIndex>,
|
||||
) -> RangeMap<TxOutIndex, Height> {
|
||||
RangeMap::from(height_to_first_txoutindex)
|
||||
}
|
||||
@@ -7,11 +7,11 @@ use brk_indexer::Indexer;
|
||||
use brk_traversable::Traversable;
|
||||
use brk_types::{
|
||||
Dollars, EmptyAddressData, EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex,
|
||||
Sats, StoredU64, TxInIndex, TxOutIndex, Version,
|
||||
Sats, StoredU64, Version,
|
||||
};
|
||||
use log::info;
|
||||
use vecdb::{
|
||||
AnyStoredVec, AnyVec, BytesVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec,
|
||||
AnyVec, BytesVec, Database, EagerVec, Exit, GenericStoredVec, ImportableVec,
|
||||
IterableCloneableVec, LazyVecFrom1, PAGE_SIZE, PcoVec, Stamp, TypedVecIterator, VecIndex,
|
||||
};
|
||||
|
||||
@@ -47,7 +47,6 @@ pub struct Vecs {
|
||||
// States
|
||||
// ---
|
||||
pub chain_state: BytesVec<Height, SupplyState>,
|
||||
pub txoutindex_to_txinindex: BytesVec<TxOutIndex, TxInIndex>,
|
||||
pub any_address_indexes: AnyAddressIndexesVecs,
|
||||
pub addresses_data: AddressesDataVecs,
|
||||
pub utxo_cohorts: UTXOCohorts,
|
||||
@@ -126,10 +125,6 @@ impl Vecs {
|
||||
vecdb::ImportOptions::new(&db, "chain", v0)
|
||||
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
|
||||
)?,
|
||||
txoutindex_to_txinindex: BytesVec::forced_import_with(
|
||||
vecdb::ImportOptions::new(&db, "txinindex", v0)
|
||||
.with_saved_stamped_changes(SAVED_STAMPED_CHANGES),
|
||||
)?,
|
||||
|
||||
height_to_unspendable_supply: EagerVec::forced_import(&db, "unspendable_supply", v0)?,
|
||||
indexes_to_unspendable_supply: ComputedValueVecsFromHeight::forced_import(
|
||||
@@ -265,12 +260,13 @@ impl Vecs {
|
||||
let stateful_min = utxo_min
|
||||
.min(address_min)
|
||||
.min(Height::from(self.chain_state.len()))
|
||||
.min(Height::from(self.txoutindex_to_txinindex.stamp()).incremented())
|
||||
.min(self.any_address_indexes.min_stamped_height())
|
||||
.min(self.addresses_data.min_stamped_height())
|
||||
.min(Height::from(self.height_to_unspendable_supply.len()))
|
||||
.min(Height::from(self.height_to_opreturn_supply.len()))
|
||||
.min(Height::from(self.addresstype_to_height_to_addr_count.min_len()))
|
||||
.min(Height::from(
|
||||
self.addresstype_to_height_to_addr_count.min_len(),
|
||||
))
|
||||
.min(Height::from(
|
||||
self.addresstype_to_height_to_empty_addr_count.min_len(),
|
||||
));
|
||||
@@ -285,13 +281,11 @@ impl Vecs {
|
||||
|
||||
// Rollback BytesVec state and capture results for validation
|
||||
let chain_state_rollback = self.chain_state.rollback_before(stamp);
|
||||
let txoutindex_rollback = self.txoutindex_to_txinindex.rollback_before(stamp);
|
||||
|
||||
// Validate all rollbacks and imports are consistent
|
||||
let recovered = recover_state(
|
||||
height,
|
||||
chain_state_rollback,
|
||||
txoutindex_rollback,
|
||||
&mut self.any_address_indexes,
|
||||
&mut self.addresses_data,
|
||||
&mut self.utxo_cohorts,
|
||||
@@ -309,7 +303,6 @@ impl Vecs {
|
||||
// Fresh start: reset all state
|
||||
let (starting_height, mut chain_state) = if recovered_height.is_zero() {
|
||||
self.chain_state.reset()?;
|
||||
self.txoutindex_to_txinindex.reset()?;
|
||||
self.height_to_unspendable_supply.reset()?;
|
||||
self.height_to_opreturn_supply.reset()?;
|
||||
self.addresstype_to_height_to_addr_count.reset()?;
|
||||
@@ -505,24 +498,4 @@ impl Vecs {
|
||||
self.db.compact()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update txoutindex_to_txinindex for a block.
|
||||
///
|
||||
/// 1. Push UNSPENT for all new outputs in the block
|
||||
/// 2. Update spent outputs with their spending txinindex
|
||||
pub fn update_txoutindex_to_txinindex(
|
||||
&mut self,
|
||||
output_count: usize,
|
||||
updates: Vec<(TxOutIndex, TxInIndex)>,
|
||||
) -> Result<()> {
|
||||
// Push UNSPENT for all new outputs in this block
|
||||
for _ in 0..output_count {
|
||||
self.txoutindex_to_txinindex.push(TxInIndex::UNSPENT);
|
||||
}
|
||||
// Update spent outputs with their spending txinindex
|
||||
for (txoutindex, txinindex) in updates {
|
||||
self.txoutindex_to_txinindex.update(txoutindex, txinindex)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user