indexer: fix bug

This commit is contained in:
nym21
2025-12-04 23:11:21 +01:00
parent dc2fa233ab
commit f4edb695de
5 changed files with 130 additions and 211 deletions

View File

@@ -37,7 +37,7 @@ impl Indexes {
OutputType::OpReturn => *self.opreturnindex,
OutputType::P2A => *self.p2aaddressindex,
OutputType::P2MS => *self.p2msoutputindex,
OutputType::P2PK33 => *self.p2pkhaddressindex,
OutputType::P2PK33 => *self.p2pk33addressindex,
OutputType::P2PK65 => *self.p2pk65addressindex,
OutputType::P2PKH => *self.p2pkhaddressindex,
OutputType::P2SH => *self.p2shaddressindex,

View File

@@ -4,6 +4,7 @@ use std::{path::Path, str::FromStr, thread, time::Instant};
use bitcoin::{TxIn, TxOut};
use brk_error::{Error, Result};
use brk_grouper::ByAddressType;
use brk_iterator::Blocks;
use brk_rpc::Client;
use brk_store::AnyStore;
@@ -32,6 +33,31 @@ const VERSION: Version = Version::new(23);
const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
const COLLISIONS_CHECKED_UP_TO: Height = Height::new(0);
/// Known duplicate Bitcoin transactions (BIP30)
/// https://github.com/bitcoin/bips/blob/master/bip-0030.mediawiki
/// Each entry is (txid_str, txindex) - these are coinbase txs that were duplicated.
const DUPLICATE_TXID_STRS: [(&str, u32); 2] = [
("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", 142783),
("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", 142841),
];
static DUPLICATE_TXIDS: std::sync::LazyLock<[Txid; 2]> = std::sync::LazyLock::new(|| {
[
bitcoin::Txid::from_str(DUPLICATE_TXID_STRS[0].0).unwrap().into(),
bitcoin::Txid::from_str(DUPLICATE_TXID_STRS[1].0).unwrap().into(),
]
});
static DUPLICATE_TXID_PREFIXES: std::sync::LazyLock<[(TxidPrefix, TxIndex); 2]> =
std::sync::LazyLock::new(|| {
DUPLICATE_TXID_STRS.map(|(s, txindex)| {
(
TxidPrefix::from(&Txid::from(bitcoin::Txid::from_str(s).unwrap())),
TxIndex::new(txindex),
)
})
});
#[derive(Clone)]
pub struct Indexer {
pub vecs: Vecs,
@@ -119,7 +145,6 @@ impl Indexer {
let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> {
info!("Exporting...");
// std::process::exit(0);
let _lock = exit.lock();
let i = Instant::now();
stores.commit(height).unwrap();
@@ -136,8 +161,6 @@ impl Indexer {
let stores = &mut self.stores;
for block in blocks.after(prev_hash)? {
// let i_tot = Instant::now();
let height = block.height();
let blockhash = block.hash();
@@ -182,7 +205,6 @@ impl Indexer {
vecs.height_to_weight
.push_if_needed(height, block.weight().into())?;
// let i = Instant::now();
let txs = block
.txdata
.par_iter()
@@ -210,9 +232,7 @@ impl Indexer {
))
})
.collect::<Result<Vec<_>>>()?;
// println!("txs = : {:?}", i.elapsed());
// let i = Instant::now();
let txid_prefix_to_txindex = txs
.iter()
.map(|(txindex, _, _, prefix, _)| (*prefix, txindex))
@@ -301,9 +321,7 @@ impl Indexer {
})
.collect::<Result<Vec<_>>>()?;
drop(txid_prefix_to_txindex);
// println!("txinindex_and_txindata = : {:?}", i.elapsed());
// let i = Instant::now();
let same_block_spent_outpoints: FxHashSet<OutPoint> = txins
.iter()
.filter_map(|(_, input_source)| {
@@ -317,9 +335,7 @@ impl Indexer {
}
})
.collect();
// println!("same_block_spent_outpoints = : {:?}", i.elapsed());
// let i = Instant::now();
let txouts = block
.txdata
.iter()
@@ -476,15 +492,13 @@ impl Indexer {
},
)
.collect::<Result<Vec<_>>>()?;
// println!("txouts = : {:?}", i.elapsed());
let outputs_len = txouts.len();
let inputs_len = txins.len();
let tx_len = block.txdata.len();
// let i = Instant::now();
let mut already_added_addresshash: FxHashMap<AddressHash, TypeIndex> =
FxHashMap::default();
let mut already_added_addresshash: ByAddressType<FxHashMap<AddressHash, TypeIndex>> =
ByAddressType::default();
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::default();
for (txoutindex, txout, txindex, vout, outputtype, addressbytes_opt, typeindex_opt) in
@@ -509,7 +523,7 @@ impl Indexer {
ti
} else if let Some((address_bytes, address_hash)) = addressbytes_opt {
let addresstype = outputtype;
if let Some(&ti) = already_added_addresshash.get(&address_hash) {
if let Some(&ti) = already_added_addresshash.get_unwrap(addresstype).get(&address_hash) {
ti
} else {
let ti = match addresstype {
@@ -524,7 +538,7 @@ impl Indexer {
_ => unreachable!(),
};
already_added_addresshash.insert(address_hash, ti);
already_added_addresshash.get_mut_unwrap(addresstype).insert(address_hash, ti);
stores
.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(addresstype)
@@ -596,12 +610,7 @@ impl Indexer {
);
}
}
// println!(
// "txouts.into_iter() = : {:?}",
// i.elapsed()
// );
// let i = Instant::now();
for (txinindex, input_source) in txins {
let (vin, txindex, outpoint, addresstype_addressindex_opt) = match input_source {
InputSource::PreviousBlock(tuple) => tuple,
@@ -650,9 +659,7 @@ impl Indexer {
.get_mut_unwrap(addresstype)
.remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height);
}
// println!("txins.into_iter(): {:?}", i.elapsed());
// let i = Instant::now();
if check_collisions {
let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter();
for (txindex, _, _, _, prev_txindex_opt) in txs.iter() {
@@ -676,20 +683,7 @@ impl Indexer {
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let only_known_dup_txids = [
bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)
.unwrap()
.into(),
bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)
.unwrap()
.into(),
];
let is_dup = only_known_dup_txids.contains(&prev_txid);
let is_dup = DUPLICATE_TXIDS.contains(&prev_txid);
if !is_dup {
dbg!(height, txindex, prev_txid, prev_txindex);
@@ -697,9 +691,7 @@ impl Indexer {
}
}
}
// println!("txindex_to_tx_and_txid = : {:?}", i.elapsed());
// let i = Instant::now();
for (txindex, tx, txid, txid_prefix, prev_txindex_opt) in txs {
if prev_txindex_opt.is_none() {
stores
@@ -720,14 +712,11 @@ impl Indexer {
vecs.txindex_to_is_explicitly_rbf
.push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?;
}
// println!("txindex_to_tx_and_txid.into_iter(): {:?}", i.elapsed());
indexes.txindex += TxIndex::from(tx_len);
indexes.txinindex += TxInIndex::from(inputs_len);
indexes.txoutindex += TxOutIndex::from(outputs_len);
// println!("full block: {:?}", i_tot.elapsed());
if should_export(height, false) {
drop(readers);
export(stores, vecs, height)?;
@@ -741,9 +730,7 @@ impl Indexer {
export(stores, vecs, indexes.height)?;
}
// let i = Instant::now();
self.vecs.compact()?;
// info!("Punched holes in db in {}s", i.elapsed().as_secs());
Ok(starting_indexes)
}

View File

@@ -1,16 +1,15 @@
use std::{fs, path::Path, str::FromStr};
use std::{fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, Mode, StoreFjallV2 as Store, Type};
use brk_types::{
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height,
OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, Txid, TxidPrefix, TypeIndex, Unit,
Version, Vout,
AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint,
OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
};
use fjall2::{CompressionType as Compression, PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, VecIndex, VecIterator};
use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator};
use crate::Indexes;
@@ -140,11 +139,7 @@ impl Stores {
.values()
.map(|s| s as &dyn AnyStore),
)
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.map(|store| store.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
@@ -224,176 +219,48 @@ impl Stores {
self.height_to_coinbase_tag.remove(h);
});
if let Ok(mut index) = vecs
.height_to_first_p2pk65addressindex
.read_once(starting_indexes.height)
{
let mut p2pk65addressindex_to_p2pk65bytes_iter =
vecs.p2pk65addressindex_to_p2pk65bytes.iter()?;
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
// Remove address hashes for all address types starting from rollback height
for address_type in [
OutputType::P2PK65,
OutputType::P2PK33,
OutputType::P2PKH,
OutputType::P2SH,
OutputType::P2WPKH,
OutputType::P2WSH,
OutputType::P2TR,
OutputType::P2A,
] {
for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? {
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PK65)
.get_mut_unwrap(address_type)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2pk33addressindex
.read_once(starting_indexes.height)
{
let mut p2pk33addressindex_to_p2pk33bytes_iter =
vecs.p2pk33addressindex_to_p2pk33bytes.iter()?;
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PK33)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2pkhaddressindex
.read_once(starting_indexes.height)
{
let mut p2pkhaddressindex_to_p2pkhbytes_iter =
vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?;
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PKH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2shaddressindex
.read_once(starting_indexes.height)
{
let mut p2shaddressindex_to_p2shbytes_iter =
vecs.p2shaddressindex_to_p2shbytes.iter()?;
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2SH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2wpkhaddressindex
.read_once(starting_indexes.height)
{
let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?;
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2WPKH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2wshaddressindex
.read_once(starting_indexes.height)
{
let mut p2wshaddressindex_to_p2wshbytes_iter =
vecs.p2wshaddressindex_to_p2wshbytes.iter()?;
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2WSH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.read_once(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2TR)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2aaddressindex
.read_once(starting_indexes.height)
{
let mut p2aaddressindex_to_p2abytes_iter =
vecs.p2aaddressindex_to_p2abytes.iter()?;
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2A)
.remove(hash);
index.increment();
}
}
} else {
unreachable!();
// self.blockhashprefix_to_height.reset()?;
// self.addresshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {
let txidprefix_dup1 = TxidPrefix::from(Txid::from(bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)?));
let txidprefix_dup2 = TxidPrefix::from(Txid::from(bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)?));
vecs.txindex_to_txid
.iter()?
.enumerate()
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
let is_not_first_dup =
txindex != TxIndex::new(142783) || txidprefix != txidprefix_dup1;
let is_known_dup = crate::DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
let is_not_second_dup =
txindex != TxIndex::new(142841) || txidprefix != txidprefix_dup2;
if is_not_first_dup && is_not_second_dup {
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
});
} else {
unreachable!();
// self.txidprefix_to_txindex.reset()?;
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
@@ -467,12 +334,6 @@ impl Stores {
});
} else {
unreachable!();
// self.addresstype_to_typeindex_and_txindex
// .iter_mut()
// .try_for_each(|s| s.reset())?;
// self.addresstype_to_typeindex_and_unspentoutpoint
// .iter_mut()
// .try_for_each(|s| s.reset())?;
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;

View File

@@ -1,3 +1,9 @@
//! Experimental stores implementation using fjall3.
//!
//! This module is currently commented out in lib.rs and not in use.
//! It exists as a work-in-progress upgrade path from fjall2 (stores_v2) to fjall3.
//! Do not delete - intended for future activation once fjall3 is stable and tested.
use std::{fs, path::Path, time::Instant};
use brk_error::Result;

View File

@@ -3,8 +3,8 @@ use std::path::Path;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{
AddressBytes, BlockHash, EmptyOutputIndex, Height, OpReturnIndex, OutPoint, OutputType,
P2AAddressIndex, P2ABytes, P2MSOutputIndex, P2PK33AddressIndex, P2PK33Bytes,
AddressBytes, AddressHash, BlockHash, EmptyOutputIndex, Height, OpReturnIndex, OutPoint,
OutputType, P2AAddressIndex, P2ABytes, P2MSOutputIndex, P2PK33AddressIndex, P2PK33Bytes,
P2PK65AddressIndex, P2PK65Bytes, P2PKHAddressIndex, P2PKHBytes, P2SHAddressIndex, P2SHBytes,
P2TRAddressIndex, P2TRBytes, P2WPKHAddressIndex, P2WPKHBytes, P2WSHAddressIndex, P2WSHBytes,
RawLockTime, Sats, StoredBool, StoredF64, StoredU32, StoredU64, Timestamp, TxInIndex, TxIndex,
@@ -12,7 +12,8 @@ use brk_types::{
};
use rayon::prelude::*;
use vecdb::{
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PAGE_SIZE, PcoVec, Stamp,
AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, TypedVecIterator, PAGE_SIZE,
PcoVec, Stamp,
};
use crate::Indexes;
@@ -366,6 +367,70 @@ impl Vecs {
Ok(())
}
/// Iterate address hashes starting from a given height (for rollback).
/// Returns an iterator of AddressHash values for all addresses of the given type
/// that were added at or after the given height.
pub fn iter_address_hashes_from(
&self,
address_type: OutputType,
height: Height,
) -> Result<Box<dyn Iterator<Item = AddressHash> + '_>> {
macro_rules! make_iter {
($height_vec:expr, $bytes_vec:expr) => {{
match $height_vec.read_once(height) {
Ok(mut index) => {
let mut iter = $bytes_vec.iter()?;
Ok(Box::new(std::iter::from_fn(move || {
iter.get(index).map(|typedbytes| {
let bytes = AddressBytes::from(typedbytes);
index.increment();
AddressHash::from(&bytes)
})
})) as Box<dyn Iterator<Item = AddressHash> + '_>)
}
Err(_) => Ok(Box::new(std::iter::empty())
as Box<dyn Iterator<Item = AddressHash> + '_>),
}
}};
}
match address_type {
OutputType::P2PK65 => make_iter!(
self.height_to_first_p2pk65addressindex,
self.p2pk65addressindex_to_p2pk65bytes
),
OutputType::P2PK33 => make_iter!(
self.height_to_first_p2pk33addressindex,
self.p2pk33addressindex_to_p2pk33bytes
),
OutputType::P2PKH => make_iter!(
self.height_to_first_p2pkhaddressindex,
self.p2pkhaddressindex_to_p2pkhbytes
),
OutputType::P2SH => make_iter!(
self.height_to_first_p2shaddressindex,
self.p2shaddressindex_to_p2shbytes
),
OutputType::P2WPKH => make_iter!(
self.height_to_first_p2wpkhaddressindex,
self.p2wpkhaddressindex_to_p2wpkhbytes
),
OutputType::P2WSH => make_iter!(
self.height_to_first_p2wshaddressindex,
self.p2wshaddressindex_to_p2wshbytes
),
OutputType::P2TR => make_iter!(
self.height_to_first_p2traddressindex,
self.p2traddressindex_to_p2trbytes
),
OutputType::P2A => make_iter!(
self.height_to_first_p2aaddressindex,
self.p2aaddressindex_to_p2abytes
),
_ => Ok(Box::new(std::iter::empty())),
}
}
fn iter_mut_any_stored_vec(&mut self) -> impl Iterator<Item = &mut dyn AnyStoredVec> {
[
&mut self.emptyoutputindex_to_txindex as &mut dyn AnyStoredVec,