indexer: updated

This commit is contained in:
nym21
2026-02-18 16:01:44 +01:00
parent 2f9dd47cc2
commit f04b548f8c
19 changed files with 189 additions and 205 deletions

View File

@@ -6,6 +6,7 @@ edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
exclude = ["examples/"]
[dependencies]
bitcoin = { workspace = true }
@@ -14,7 +15,7 @@ brk_cohort = { workspace = true }
brk_iterator = { workspace = true }
brk_logger = { workspace = true }
brk_reader = { workspace = true }
brk_rpc = { workspace = true }
brk_rpc = { workspace = true, features = ["corepc"] }
brk_store = { workspace = true }
brk_types = { workspace = true }
brk_traversable = { workspace = true }

View File

@@ -1,5 +1,6 @@
use brk_error::Result;
use brk_indexer::Indexer;
use vecdb::ReadableVec;
// use brk_types::Sats;
use std::{fs, path::Path};
@@ -14,25 +15,14 @@ fn main() -> Result<()> {
// let mut sum = Sats::ZERO;
// let mut count: usize = 0;
// for value in indexer.vecs.txoutindex_to_value.clean_iter()? {
// for value in indexer.vecs.txoutindex_to_value.clean_iter() {
// sum += value;
// count += 1;
// }
// println!("sum = {sum}, count = {count}");
dbg!(
indexer
.vecs
.outputs
.value
.iter()?
.enumerate()
.take(200)
// .filter(|(_, op)| !op.is_coinbase())
// .map(|(i, op)| (i, op.txindex(), op.vout()))
.collect::<Vec<_>>()
);
dbg!(indexer.vecs.outputs.value.collect_range(0, 200));
Ok(())
}

View File

@@ -3,16 +3,17 @@ use std::{fs, path::Path, time::Instant};
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::Sats;
use vecdb::ReadableVec;
fn run_benchmark(indexer: &Indexer) -> (Sats, std::time::Duration, usize) {
let start = Instant::now();
let mut sum = Sats::ZERO;
let mut count = 0;
for value in indexer.vecs.outputs.value.clean_iter().unwrap() {
indexer.vecs.outputs.value.for_each(|value| {
sum += value;
count += 1;
}
});
let duration = start.elapsed();
(sum, duration, count)

View File

@@ -1,7 +1,7 @@
use brk_error::Result;
use brk_types::Height;
use tracing::{debug, info};
use vecdb::{GenericStoredVec, IterableStoredVec, IterableVec, VecIndex, VecValue};
use vecdb::{AnyStoredVec, WritableVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue};
use crate::{Stores, Vecs};
@@ -208,12 +208,12 @@ impl IndexesExt for Indexes {
}
pub fn starting_index<I, T>(
height_to_index: &impl IterableStoredVec<Height, I>,
index_to_else: &impl IterableVec<I, T>,
height_to_index: &PcoVec<Height, I>,
index_to_else: &impl ReadableVec<I, T>,
starting_height: Height,
) -> Option<I>
where
I: VecValue + VecIndex + From<usize>,
I: VecIndex + PcoVecValue + From<usize>,
T: VecValue,
{
let h = Height::from(height_to_index.stamp());
@@ -222,6 +222,6 @@ where
} else if h + 1_u32 == starting_height {
Some(I::from(index_to_else.len()))
} else {
height_to_index.iter().get(starting_height)
height_to_index.collect_one(starting_height.to_usize())
}
}

View File

@@ -7,7 +7,7 @@ use brk_iterator::Blocks;
use brk_rpc::Client;
use brk_types::Height;
use tracing::{debug, info};
use vecdb::Exit;
use vecdb::{Exit, ReadableVec};
mod constants;
mod indexes;
mod processor;
@@ -97,7 +97,7 @@ impl Indexer {
) -> Result<Indexes> {
debug!("Starting indexing...");
let last_blockhash = self.vecs.blocks.blockhash.iter()?.last();
let last_blockhash = self.vecs.blocks.blockhash.collect_last();
debug!("Last block hash found.");
let (starting_indexes, prev_hash) = if let Some(hash) = last_blockhash {
@@ -193,16 +193,21 @@ impl Indexer {
// Phase 2: Compute TXIDs in parallel
let txs = processor.compute_txids()?;
// Phase 3: Process inputs in parallel
let txins = processor.process_inputs(&txs)?;
// Phase 3+5: Process inputs and outputs in parallel
// They access different stores (txidprefix vs addresshash) and
// different vecs, so running concurrently hides latency of the
// shorter phase behind the longer one.
let (txins_result, txouts_result) = rayon::join(
|| processor.process_inputs(&txs),
|| processor.process_outputs(),
);
let txins = txins_result?;
let txouts = txouts_result?;
// Phase 4: Collect same-block spent outpoints
let same_block_spent_outpoints =
BlockProcessor::collect_same_block_spent_outpoints(&txins);
// Phase 5: Process outputs in parallel
let txouts = processor.process_outputs()?;
let tx_len = block.txdata.len();
let inputs_len = txins.len();
let outputs_len = txouts.len();

View File

@@ -1,7 +1,7 @@
use brk_error::{Error, Result};
use brk_types::{BlockHashPrefix, Timestamp};
use tracing::error;
use vecdb::GenericStoredVec;
use vecdb::WritableVec;
use super::BlockProcessor;
use crate::IndexesExt;
@@ -26,12 +26,11 @@ impl BlockProcessor<'_> {
self.stores
.blockhashprefix_to_height
.insert_if_needed(blockhash_prefix, height, height);
.insert(blockhash_prefix, height);
self.stores.height_to_coinbase_tag.insert_if_needed(
self.stores.height_to_coinbase_tag.insert(
height,
self.block.coinbase_tag().into(),
height,
);
self.vecs
@@ -46,14 +45,15 @@ impl BlockProcessor<'_> {
.blocks
.timestamp
.checked_push(height, Timestamp::from(self.block.header.time))?;
let (block_total_size, block_weight) = self.block.total_size_and_weight();
self.vecs
.blocks
.total_size
.checked_push(height, self.block.total_size().into())?;
.checked_push(height, block_total_size.into())?;
self.vecs
.blocks
.weight
.checked_push(height, self.block.weight().into())?;
.checked_push(height, block_weight.into())?;
Ok(())
}

View File

@@ -1,7 +1,7 @@
use brk_error::{Error, Result};
use brk_types::{StoredBool, TxIndex, Txid, TxidPrefix};
use rayon::prelude::*;
use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, likely};
use vecdb::{AnyVec, WritableVec, likely};
use crate::constants::DUPLICATE_TXIDS;
@@ -9,8 +9,7 @@ use super::{BlockProcessor, ComputedTx};
impl<'a> BlockProcessor<'a> {
pub fn compute_txids(&self) -> Result<Vec<ComputedTx<'a>>> {
let will_check_collisions =
self.check_collisions && self.stores.txidprefix_to_txindex.needs(self.height);
let will_check_collisions = self.check_collisions;
let base_txindex = self.indexes.txindex;
self.block
@@ -36,6 +35,8 @@ impl<'a> BlockProcessor<'a> {
txid,
txid_prefix,
prev_txindex_opt,
base_size: tx.base_size() as u32,
total_size: tx.total_size() as u32,
})
})
.collect()
@@ -47,7 +48,6 @@ impl<'a> BlockProcessor<'a> {
return Ok(());
}
let mut txindex_to_txid_iter = self.vecs.transactions.txid.into_iter();
for ct in txs.iter() {
let Some(prev_txindex) = ct.prev_txindex_opt else {
continue;
@@ -58,8 +58,11 @@ impl<'a> BlockProcessor<'a> {
}
let len = self.vecs.transactions.txid.len();
let prev_txid = txindex_to_txid_iter
.get(prev_txindex)
let prev_txid = self
.vecs
.transactions
.txid
.get_pushed_or_read(prev_txindex, &self.readers.txid)
.ok_or(Error::Internal("Missing txid for txindex"))
.inspect_err(|_| {
dbg!(ct.txindex, len);
@@ -81,11 +84,9 @@ impl<'a> BlockProcessor<'a> {
for ct in txs {
if ct.prev_txindex_opt.is_none() {
self.stores.txidprefix_to_txindex.insert_if_needed(
ct.txid_prefix,
ct.txindex,
height,
);
self.stores
.txidprefix_to_txindex
.insert(ct.txid_prefix, ct.txindex);
}
self.vecs
@@ -107,11 +108,11 @@ impl<'a> BlockProcessor<'a> {
self.vecs
.transactions
.base_size
.checked_push(ct.txindex, ct.tx.base_size().into())?;
.checked_push(ct.txindex, ct.base_size.into())?;
self.vecs
.transactions
.total_size
.checked_push(ct.txindex, ct.tx.total_size().into())?;
.checked_push(ct.txindex, ct.total_size.into())?;
self.vecs
.transactions
.is_explicitly_rbf

View File

@@ -5,33 +5,30 @@ use brk_types::{
};
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::GenericStoredVec;
use vecdb::WritableVec;
use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo};
impl<'a> BlockProcessor<'a> {
pub fn process_inputs<'c>(
pub fn process_inputs(
&self,
txs: &[ComputedTx<'c>],
) -> Result<Vec<(TxInIndex, InputSource<'a>)>> {
txs: &[ComputedTx],
) -> Result<Vec<(TxInIndex, InputSource)>> {
let txid_prefix_to_txindex: FxHashMap<_, _> =
txs.iter().map(|ct| (ct.txid_prefix, &ct.txindex)).collect();
txs.iter().map(|ct| (ct.txid_prefix, ct.txindex)).collect();
let base_txindex = self.indexes.txindex;
let base_txinindex = self.indexes.txinindex;
let txins = self
.block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
})
.collect::<Vec<_>>()
let total_inputs: usize = self.block.txdata.iter().map(|tx| tx.input.len()).sum();
let mut items = Vec::with_capacity(total_inputs);
for (index, tx) in self.block.txdata.iter().enumerate() {
for (vin, txin) in tx.input.iter().enumerate() {
items.push((TxIndex::from(index), Vin::from(vin), txin, tx));
}
}
let txins = items
.into_par_iter()
.enumerate()
.map(
@@ -44,7 +41,6 @@ impl<'a> BlockProcessor<'a> {
txinindex,
InputSource::SameBlock {
txindex,
txin,
vin,
outpoint: OutPoint::COINBASE,
},
@@ -56,44 +52,41 @@ impl<'a> BlockProcessor<'a> {
let txid_prefix = TxidPrefix::from(&txid);
let vout = Vout::from(outpoint.vout);
if let Some(&&same_block_txindex) = txid_prefix_to_txindex
if let Some(&same_block_txindex) = txid_prefix_to_txindex
.get(&txid_prefix) {
let outpoint = OutPoint::new(same_block_txindex, vout);
return Ok((
txinindex,
InputSource::SameBlock {
txindex,
txin,
vin,
outpoint,
},
));
}
let prev_txindex = if let Some(txindex) = self
let store_result = self
.stores
.txidprefix_to_txindex
.get(&txid_prefix)?
.map(|v| *v)
.and_then(|txindex| {
(txindex < self.indexes.txindex).then_some(txindex)
})
{
txindex
} else {
let store_result = self.stores.txidprefix_to_txindex.get(&txid_prefix)?;
tracing::error!(
"UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}",
txid, txid_prefix, store_result, self.indexes.txindex
);
return Err(Error::UnknownTxid);
.map(|v| *v);
let prev_txindex = match store_result {
Some(txindex) if txindex < self.indexes.txindex => txindex,
_ => {
tracing::error!(
"UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}",
txid, txid_prefix, store_result, self.indexes.txindex
);
return Err(Error::UnknownTxid);
}
};
let txoutindex = self
.vecs
.transactions
.first_txoutindex
.get_pushed_or_read(prev_txindex, &self.readers.txindex_to_first_txoutindex)?
.get_pushed_or_read(prev_txindex, &self.readers.txindex_to_first_txoutindex)
.ok_or(Error::Internal("Missing txoutindex"))?
+ vout;
@@ -103,14 +96,14 @@ impl<'a> BlockProcessor<'a> {
.vecs
.outputs
.outputtype
.get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_outputtype)?
.get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_outputtype)
.ok_or(Error::Internal("Missing outputtype"))?;
let typeindex = self
.vecs
.outputs
.typeindex
.get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_typeindex)?
.get_pushed_or_read(txoutindex, &self.readers.txoutindex_to_typeindex)
.ok_or(Error::Internal("Missing typeindex"))?;
Ok((
@@ -153,8 +146,6 @@ impl<'a> BlockProcessor<'a> {
txins: Vec<(TxInIndex, InputSource)>,
mut same_block_output_info: FxHashMap<OutPoint, SameBlockOutputInfo>,
) -> Result<()> {
let height = self.height;
for (txinindex, input_source) in txins {
let (vin, txindex, outpoint, outputtype, typeindex) = match input_source {
InputSource::PreviousBlock {
@@ -166,7 +157,6 @@ impl<'a> BlockProcessor<'a> {
} => (vin, txindex, outpoint, outputtype, typeindex),
InputSource::SameBlock {
txindex,
txin,
vin,
outpoint,
} => {
@@ -177,7 +167,7 @@ impl<'a> BlockProcessor<'a> {
.remove(&outpoint)
.ok_or(Error::Internal("Same-block output not found"))
.inspect_err(|_| {
dbg!(&same_block_output_info, txin);
dbg!(&same_block_output_info, outpoint);
})?;
(vin, txindex, outpoint, info.outputtype, info.typeindex)
}
@@ -217,16 +207,15 @@ impl<'a> BlockProcessor<'a> {
self.stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
.insert(
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
self.stores
.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height);
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
}
Ok(())

View File

@@ -6,7 +6,7 @@ use brk_types::{
};
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::GenericStoredVec;
use vecdb::WritableVec;
use super::{BlockProcessor, ProcessedOutput, SameBlockOutputInfo};
@@ -18,17 +18,15 @@ impl<'a> BlockProcessor<'a> {
let base_txindex = self.indexes.txindex;
let base_txoutindex = self.indexes.txoutindex;
self.block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.output
.iter()
.enumerate()
.map(move |(vout, txout)| (TxIndex::from(index), Vout::from(vout), txout, tx))
})
.collect::<Vec<_>>()
let total_outputs: usize = self.block.txdata.iter().map(|tx| tx.output.len()).sum();
let mut items = Vec::with_capacity(total_outputs);
for (index, tx) in self.block.txdata.iter().enumerate() {
for (vout, txout) in tx.output.iter().enumerate() {
items.push((TxIndex::from(index), Vout::from(vout), txout, tx));
}
}
items
.into_par_iter()
.enumerate()
.map(
@@ -59,8 +57,7 @@ impl<'a> BlockProcessor<'a> {
.stores
.addresstype_to_addresshash_to_addressindex
.get_unwrap(addresstype)
.get(&address_hash)
.unwrap()
.get(&address_hash)?
.map(|v| *v)
.and_then(|typeindex_local| {
(typeindex_local < self.indexes.to_typeindex(addresstype))
@@ -68,22 +65,14 @@ impl<'a> BlockProcessor<'a> {
});
if check_collisions && let Some(typeindex) = existing_typeindex {
let prev_addressbytes_opt = self.vecs.get_addressbytes_by_type(
let prev_addressbytes = self.vecs.get_addressbytes_by_type(
addresstype,
typeindex,
self.readers.addressbytes.get_unwrap(addresstype),
)?;
let prev_addressbytes = prev_addressbytes_opt
.as_ref()
.ok_or(Error::Internal("Missing addressbytes"))?;
)
.ok_or(Error::Internal("Missing addressbytes"))?;
if self
.stores
.addresstype_to_addresshash_to_addressindex
.get_unwrap(addresstype)
.needs(height)
&& prev_addressbytes != &address_bytes
{
if prev_addressbytes != address_bytes {
let txid = tx.compute_txid();
dbg!(
height,
@@ -121,7 +110,6 @@ impl<'a> BlockProcessor<'a> {
txouts: Vec<ProcessedOutput>,
same_block_spent_outpoints: &FxHashSet<OutPoint>,
) -> Result<FxHashMap<OutPoint, SameBlockOutputInfo>> {
let height = self.height;
let mut already_added_addresshash: ByAddressType<FxHashMap<AddressHash, TypeIndex>> =
ByAddressType::default();
let mut same_block_output_info: FxHashMap<OutPoint, SameBlockOutputInfo> =
@@ -172,7 +160,7 @@ impl<'a> BlockProcessor<'a> {
self.stores
.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(addresstype)
.insert_if_needed(address_hash, ti, height);
.insert(address_hash, ti);
self.vecs.push_bytes_if_needed(ti, address_bytes)?;
ti
@@ -230,10 +218,9 @@ impl<'a> BlockProcessor<'a> {
self.stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
.insert(
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
}
@@ -254,10 +241,9 @@ impl<'a> BlockProcessor<'a> {
self.stores
.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert_if_needed(
.insert(
AddressIndexOutPoint::from((addressindex, outpoint)),
Unit,
height,
);
}
}

View File

@@ -1,11 +1,11 @@
use bitcoin::{Transaction, TxIn, TxOut};
use bitcoin::{Transaction, TxOut};
use brk_types::{
AddressBytes, AddressHash, OutPoint, OutputType, TxIndex, TxOutIndex, Txid, TxidPrefix,
TypeIndex, Vin, Vout,
};
#[derive(Debug)]
pub enum InputSource<'a> {
pub enum InputSource {
PreviousBlock {
vin: Vin,
txindex: TxIndex,
@@ -15,7 +15,6 @@ pub enum InputSource<'a> {
},
SameBlock {
txindex: TxIndex,
txin: &'a TxIn,
vin: Vin,
outpoint: OutPoint,
},
@@ -43,4 +42,6 @@ pub struct ComputedTx<'a> {
pub txid: Txid,
pub txid_prefix: TxidPrefix,
pub prev_txindex_opt: Option<TxIndex>,
pub base_size: u32,
pub total_size: u32,
}

View File

@@ -1,11 +1,12 @@
use brk_cohort::ByAddressType;
use vecdb::{GenericStoredVec, Reader};
use vecdb::Reader;
use crate::Vecs;
/// Readers for vectors that need to be accessed during block processing.
/// These provide consistent snapshots for reading while the main vectors are being modified.
pub struct Readers {
pub txid: Reader,
pub txindex_to_first_txoutindex: Reader,
pub txoutindex_to_outputtype: Reader,
pub txoutindex_to_typeindex: Reader,
@@ -15,6 +16,7 @@ pub struct Readers {
impl Readers {
pub fn new(vecs: &Vecs) -> Self {
Self {
txid: vecs.transactions.txid.create_reader(),
txindex_to_first_txoutindex: vecs.transactions.first_txoutindex.create_reader(),
txoutindex_to_outputtype: vecs.outputs.outputtype.create_reader(),
txoutindex_to_typeindex: vecs.outputs.typeindex.create_reader(),

View File

@@ -12,7 +12,7 @@ use brk_types::{
use fjall::{Database, PersistMode};
use rayon::prelude::*;
use tracing::info;
use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator};
use vecdb::{AnyVec, ReadableVec, VecIndex};
use crate::{Indexes, constants::DUPLICATE_TXID_PREFIXES};
@@ -214,14 +214,14 @@ impl Stores {
}
if starting_indexes.height != Height::ZERO {
vecs.blocks
.blockhash
.iter()?
.skip(starting_indexes.height.to_usize())
.map(BlockHashPrefix::from)
.for_each(|prefix| {
vecs.blocks.blockhash.for_each_range(
starting_indexes.height.to_usize(),
vecs.blocks.blockhash.len(),
|blockhash| {
let prefix = BlockHashPrefix::from(blockhash);
self.blockhashprefix_to_height.remove(prefix);
});
},
);
(starting_indexes.height.to_usize()..vecs.blocks.blockhash.len())
.map(Height::from)
@@ -257,16 +257,17 @@ impl Stores {
let remove_count = txid_vec_len.saturating_sub(skip_count);
tracing::debug!(
"Rollback TXIDs: vec_len={}, skip={}, removing={}",
txid_vec_len, skip_count, remove_count
txid_vec_len,
skip_count,
remove_count
);
vecs.transactions
.txid
.iter()?
.enumerate()
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txindex = TxIndex::from(txindex);
{
let start = starting_indexes.txindex.to_usize();
let end = vecs.transactions.txid.len();
let mut current_index = start;
vecs.transactions.txid.for_each_range(start, end, |txid| {
let txindex = TxIndex::from(current_index);
let txidprefix = TxidPrefix::from(&txid);
let is_known_dup =
@@ -279,7 +280,9 @@ impl Stores {
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
current_index += 1;
});
}
// Clear caches to prevent stale reads after rollback
self.txidprefix_to_txindex.clear_caches();
@@ -288,35 +291,38 @@ impl Stores {
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
let mut txoutindex_to_txindex_iter = vecs.outputs.txindex.iter()?;
let mut txindex_to_first_txoutindex_iter =
vecs.transactions.first_txoutindex.iter()?;
let mut txoutindex_to_outputtype_iter = vecs.outputs.outputtype.iter()?;
let mut txoutindex_to_typeindex_iter = vecs.outputs.typeindex.iter()?;
let txindex_to_first_txoutindex_reader = vecs.transactions.first_txoutindex.reader();
let txoutindex_to_outputtype_reader = vecs.outputs.outputtype.reader();
let txoutindex_to_typeindex_reader = vecs.outputs.typeindex.reader();
// Collect unique (addresstype, addressindex, txindex) to avoid double deletion
// when same address receives multiple outputs in same transaction
let mut addressindex_txindex_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> =
FxHashSet::default();
for txoutindex in
starting_indexes.txoutindex.to_usize()..vecs.outputs.outputtype.len()
{
let outputtype = txoutindex_to_outputtype_iter.get_at_unwrap(txoutindex);
let rollback_start = starting_indexes.txoutindex.to_usize();
let rollback_end = vecs.outputs.outputtype.len();
// Pre-collect PcoVec range to avoid per-element page decompression
let txindexes: Vec<TxIndex> =
vecs.outputs.txindex.collect_range(rollback_start, rollback_end);
for (i, txoutindex) in (rollback_start..rollback_end).enumerate() {
let outputtype = txoutindex_to_outputtype_reader.get(txoutindex);
if !outputtype.is_address() {
continue;
}
let addresstype = outputtype;
let addressindex = txoutindex_to_typeindex_iter.get_at_unwrap(txoutindex);
let txindex = txoutindex_to_txindex_iter.get_at_unwrap(txoutindex);
let addressindex = txoutindex_to_typeindex_reader.get(txoutindex);
let txindex = txindexes[i];
addressindex_txindex_to_remove.insert((addresstype, addressindex, txindex));
let vout = Vout::from(
txoutindex
- txindex_to_first_txoutindex_iter
.get_unwrap(txindex)
- txindex_to_first_txoutindex_reader
.get(txindex.to_usize())
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
@@ -331,19 +337,15 @@ impl Stores {
// Collect outputs that were spent after the rollback point
// We need to: 1) reset their spend status, 2) restore address stores
let mut txindex_to_first_txoutindex_iter =
vecs.transactions.first_txoutindex.iter()?;
let mut txoutindex_to_outputtype_iter = vecs.outputs.outputtype.iter()?;
let mut txoutindex_to_typeindex_iter = vecs.outputs.typeindex.iter()?;
let mut txinindex_to_txindex_iter = vecs.inputs.txindex.iter()?;
let start = starting_indexes.txinindex.to_usize();
let end = vecs.inputs.outpoint.len();
let outpoints: Vec<OutPoint> = vecs.inputs.outpoint.collect_range(start, end);
let spending_txindexes: Vec<TxIndex> = vecs.inputs.txindex.collect_range(start, end);
let outputs_to_unspend: Vec<_> = vecs
.inputs
.outpoint
.iter()?
.enumerate()
.skip(starting_indexes.txinindex.to_usize())
.filter_map(|(txinindex, outpoint): (usize, OutPoint)| {
let outputs_to_unspend: Vec<_> = outpoints
.into_iter()
.zip(spending_txindexes)
.filter_map(|(outpoint, spending_txindex)| {
if outpoint.is_coinbase() {
return None;
}
@@ -353,13 +355,13 @@ impl Stores {
// Calculate txoutindex from output's txindex and vout
let txoutindex =
txindex_to_first_txoutindex_iter.get_unwrap(output_txindex) + vout;
txindex_to_first_txoutindex_reader.get(output_txindex.to_usize()) + vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype = txoutindex_to_outputtype_iter.get_unwrap(txoutindex);
let typeindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex);
let spending_txindex = txinindex_to_txindex_iter.get_at_unwrap(txinindex);
let outputtype =
txoutindex_to_outputtype_reader.get(txoutindex.to_usize());
let typeindex = txoutindex_to_typeindex_reader.get(txoutindex.to_usize());
Some((outpoint, outputtype, typeindex, spending_txindex))
} else {
@@ -378,7 +380,11 @@ impl Stores {
let addressindex = typeindex;
// Add to same set as first loop
addressindex_txindex_to_remove.insert((addresstype, addressindex, spending_txindex));
addressindex_txindex_to_remove.insert((
addresstype,
addressindex,
spending_txindex,
));
// OutPoints are unique, no dedup needed for insert
self.addresstype_to_addressindex_and_unspentoutpoint

View File

@@ -8,8 +8,8 @@ use brk_types::{
};
use rayon::prelude::*;
use vecdb::{
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Reader, Stamp,
TypedVecIterator,
AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Reader, ReadableVec,
Stamp, VecIndex,
};
use crate::parallel_import;
@@ -171,71 +171,70 @@ impl AddressesVecs {
addresstype: OutputType,
typeindex: TypeIndex,
reader: &Reader,
) -> Result<Option<AddressBytes>> {
) -> Option<AddressBytes> {
match addresstype {
OutputType::P2PK65 => self
.p2pk65bytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2PK33 => self
.p2pk33bytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2PKH => self
.p2pkhbytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2SH => self
.p2shbytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2WPKH => self
.p2wpkhbytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2WSH => self
.p2wshbytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2TR => self
.p2trbytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
OutputType::P2A => self
.p2abytes
.get_pushed_or_read(typeindex.into(), reader)
.map(|opt| opt.map(AddressBytes::from)),
.map(AddressBytes::from),
_ => unreachable!("get_bytes_by_type called with non-address type"),
}
.map_err(|e| e.into())
}
pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> {
match bytes {
AddressBytes::P2PK65(bytes) => self
.p2pk65bytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2PK33(bytes) => self
.p2pk33bytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2PKH(bytes) => self
.p2pkhbytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2SH(bytes) => self
.p2shbytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2WPKH(bytes) => self
.p2wpkhbytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2WSH(bytes) => self
.p2wshbytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2TR(bytes) => self
.p2trbytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
AddressBytes::P2A(bytes) => self
.p2abytes
.checked_push(index.into(), *bytes)?,
.checked_push(index.into(), bytes)?,
};
Ok(())
}
@@ -250,11 +249,12 @@ impl AddressesVecs {
) -> Result<Box<dyn Iterator<Item = AddressHash> + '_>> {
macro_rules! make_iter {
($height_vec:expr, $bytes_vec:expr) => {{
match $height_vec.read_once(height) {
Ok(mut index) => {
let mut iter = $bytes_vec.iter()?;
let h = height.to_usize();
match $height_vec.collect_one(h) {
Some(mut index) => {
let reader = $bytes_vec.reader();
Ok(Box::new(std::iter::from_fn(move || {
iter.get(index).map(|typedbytes| {
reader.try_get(index.to_usize()).map(|typedbytes| {
let bytes = AddressBytes::from(typedbytes);
index.increment();
AddressHash::from(&bytes)
@@ -262,7 +262,7 @@ impl AddressesVecs {
}))
as Box<dyn Iterator<Item = AddressHash> + '_>)
}
Err(_) => {
None => {
Ok(Box::new(std::iter::empty())
as Box<dyn Iterator<Item = AddressHash> + '_>)
}

View File

@@ -2,7 +2,7 @@ use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{BlockHash, Height, StoredF64, StoredU64, Timestamp, Version, Weight};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp};
use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp};
use crate::parallel_import;

View File

@@ -2,7 +2,7 @@ use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, OutPoint, OutputType, TxInIndex, TxIndex, TypeIndex, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp};
use vecdb::{AnyStoredVec, Database, WritableVec, ImportableVec, PcoVec, Stamp};
use crate::parallel_import;

View File

@@ -4,7 +4,9 @@ use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{AddressBytes, AddressHash, Height, OutputType, TypeIndex, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, PAGE_SIZE, Reader, Stamp};
use vecdb::{AnyStoredVec, Database, Reader, Stamp};
const PAGE_SIZE: usize = 4096;
use crate::parallel_import;
@@ -149,7 +151,7 @@ impl Vecs {
addresstype: OutputType,
typeindex: TypeIndex,
reader: &Reader,
) -> Result<Option<AddressBytes>> {
) -> Option<AddressBytes> {
self.addresses
.get_bytes_by_type(addresstype, typeindex, reader)
}

View File

@@ -2,7 +2,7 @@ use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, OutputType, Sats, TxIndex, TxOutIndex, TypeIndex, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp};
use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp};
use crate::parallel_import;

View File

@@ -4,7 +4,7 @@ use brk_types::{
EmptyOutputIndex, Height, OpReturnIndex, P2MSOutputIndex, TxIndex, UnknownOutputIndex, Version,
};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp};
use vecdb::{AnyStoredVec, Database, WritableVec, ImportableVec, PcoVec, Stamp};
use crate::parallel_import;

View File

@@ -5,7 +5,7 @@ use brk_types::{
Version,
};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PcoVec, Stamp};
use vecdb::{AnyStoredVec, BytesVec, Database, WritableVec, ImportableVec, PcoVec, Stamp};
use crate::parallel_import;