From f4edb695deeff2473c7b018c3983016833002208 Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 4 Dec 2025 23:11:21 +0100 Subject: [PATCH] indexer: fix bug --- crates/brk_indexer/src/indexes.rs | 2 +- crates/brk_indexer/src/lib.rs | 75 +++++------ crates/brk_indexer/src/stores_v2.rs | 187 ++++------------------------ crates/brk_indexer/src/stores_v3.rs | 6 + crates/brk_indexer/src/vecs.rs | 71 ++++++++++- 5 files changed, 130 insertions(+), 211 deletions(-) diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index 045b66de7..c0d278ab7 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -37,7 +37,7 @@ impl Indexes { OutputType::OpReturn => *self.opreturnindex, OutputType::P2A => *self.p2aaddressindex, OutputType::P2MS => *self.p2msoutputindex, - OutputType::P2PK33 => *self.p2pkhaddressindex, + OutputType::P2PK33 => *self.p2pk33addressindex, OutputType::P2PK65 => *self.p2pk65addressindex, OutputType::P2PKH => *self.p2pkhaddressindex, OutputType::P2SH => *self.p2shaddressindex, diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index a9e67b31c..dcdae9768 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -4,6 +4,7 @@ use std::{path::Path, str::FromStr, thread, time::Instant}; use bitcoin::{TxIn, TxOut}; use brk_error::{Error, Result}; +use brk_grouper::ByAddressType; use brk_iterator::Blocks; use brk_rpc::Client; use brk_store::AnyStore; @@ -32,6 +33,31 @@ const VERSION: Version = Version::new(23); const SNAPSHOT_BLOCK_RANGE: usize = 1_000; const COLLISIONS_CHECKED_UP_TO: Height = Height::new(0); +/// Known duplicate Bitcoin transactions (BIP30) +/// https://github.com/bitcoin/bips/blob/master/bip-0030.mediawiki +/// Each entry is (txid_str, txindex) - these are coinbase txs that were duplicated. +const DUPLICATE_TXID_STRS: [(&str, u32); 2] = [ + ("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", 142783), + ("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", 142841), +]; + +static DUPLICATE_TXIDS: std::sync::LazyLock<[Txid; 2]> = std::sync::LazyLock::new(|| { + [ + bitcoin::Txid::from_str(DUPLICATE_TXID_STRS[0].0).unwrap().into(), + bitcoin::Txid::from_str(DUPLICATE_TXID_STRS[1].0).unwrap().into(), + ] +}); + +static DUPLICATE_TXID_PREFIXES: std::sync::LazyLock<[(TxidPrefix, TxIndex); 2]> = + std::sync::LazyLock::new(|| { + DUPLICATE_TXID_STRS.map(|(s, txindex)| { + ( + TxidPrefix::from(&Txid::from(bitcoin::Txid::from_str(s).unwrap())), + TxIndex::new(txindex), + ) + }) + }); + #[derive(Clone)] pub struct Indexer { pub vecs: Vecs, @@ -119,7 +145,6 @@ impl Indexer { let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> { info!("Exporting..."); - // std::process::exit(0); let _lock = exit.lock(); let i = Instant::now(); stores.commit(height).unwrap(); @@ -136,8 +161,6 @@ impl Indexer { let stores = &mut self.stores; for block in blocks.after(prev_hash)? { - // let i_tot = Instant::now(); - let height = block.height(); let blockhash = block.hash(); @@ -182,7 +205,6 @@ impl Indexer { vecs.height_to_weight .push_if_needed(height, block.weight().into())?; - // let i = Instant::now(); let txs = block .txdata .par_iter() @@ -210,9 +232,7 @@ impl Indexer { )) }) .collect::>>()?; - // println!("txs = : {:?}", i.elapsed()); - // let i = Instant::now(); let txid_prefix_to_txindex = txs .iter() .map(|(txindex, _, _, prefix, _)| (*prefix, txindex)) @@ -301,9 +321,7 @@ impl Indexer { }) .collect::>>()?; drop(txid_prefix_to_txindex); - // println!("txinindex_and_txindata = : {:?}", i.elapsed()); - // let i = Instant::now(); let same_block_spent_outpoints: FxHashSet = txins .iter() .filter_map(|(_, input_source)| { @@ -317,9 +335,7 @@ impl Indexer { } }) .collect(); - // println!("same_block_spent_outpoints = : {:?}", i.elapsed()); - // let i = Instant::now(); let txouts = block .txdata .iter() @@ -476,15 +492,13 @@ impl Indexer { }, ) .collect::>>()?; - // println!("txouts = : {:?}", i.elapsed()); let outputs_len = txouts.len(); let inputs_len = txins.len(); let tx_len = block.txdata.len(); - // let i = Instant::now(); - let mut already_added_addresshash: FxHashMap = - FxHashMap::default(); + let mut already_added_addresshash: ByAddressType> = + ByAddressType::default(); let mut same_block_output_info: FxHashMap = FxHashMap::default(); for (txoutindex, txout, txindex, vout, outputtype, addressbytes_opt, typeindex_opt) in @@ -509,7 +523,7 @@ impl Indexer { ti } else if let Some((address_bytes, address_hash)) = addressbytes_opt { let addresstype = outputtype; - if let Some(&ti) = already_added_addresshash.get(&address_hash) { + if let Some(&ti) = already_added_addresshash.get_unwrap(addresstype).get(&address_hash) { ti } else { let ti = match addresstype { @@ -524,7 +538,7 @@ impl Indexer { _ => unreachable!(), }; - already_added_addresshash.insert(address_hash, ti); + already_added_addresshash.get_mut_unwrap(addresstype).insert(address_hash, ti); stores .addresstype_to_addresshash_to_addressindex .get_mut_unwrap(addresstype) @@ -596,12 +610,7 @@ impl Indexer { ); } } - // println!( - // "txouts.into_iter() = : {:?}", - // i.elapsed() - // ); - // let i = Instant::now(); for (txinindex, input_source) in txins { let (vin, txindex, outpoint, addresstype_addressindex_opt) = match input_source { InputSource::PreviousBlock(tuple) => tuple, @@ -650,9 +659,7 @@ impl Indexer { .get_mut_unwrap(addresstype) .remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height); } - // println!("txins.into_iter(): {:?}", i.elapsed()); - // let i = Instant::now(); if check_collisions { let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter(); for (txindex, _, _, _, prev_txindex_opt) in txs.iter() { @@ -676,20 +683,7 @@ impl Indexer { // If another Txid needs to be added to the list // We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated - let only_known_dup_txids = [ - bitcoin::Txid::from_str( - "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", - ) - .unwrap() - .into(), - bitcoin::Txid::from_str( - "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", - ) - .unwrap() - .into(), - ]; - - let is_dup = only_known_dup_txids.contains(&prev_txid); + let is_dup = DUPLICATE_TXIDS.contains(&prev_txid); if !is_dup { dbg!(height, txindex, prev_txid, prev_txindex); @@ -697,9 +691,7 @@ impl Indexer { } } } - // println!("txindex_to_tx_and_txid = : {:?}", i.elapsed()); - // let i = Instant::now(); for (txindex, tx, txid, txid_prefix, prev_txindex_opt) in txs { if prev_txindex_opt.is_none() { stores @@ -720,14 +712,11 @@ impl Indexer { vecs.txindex_to_is_explicitly_rbf .push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?; } - // println!("txindex_to_tx_and_txid.into_iter(): {:?}", i.elapsed()); indexes.txindex += TxIndex::from(tx_len); indexes.txinindex += TxInIndex::from(inputs_len); indexes.txoutindex += TxOutIndex::from(outputs_len); - // println!("full block: {:?}", i_tot.elapsed()); - if should_export(height, false) { drop(readers); export(stores, vecs, height)?; @@ -741,9 +730,7 @@ impl Indexer { export(stores, vecs, indexes.height)?; } - // let i = Instant::now(); self.vecs.compact()?; - // info!("Punched holes in db in {}s", i.elapsed().as_secs()); Ok(starting_indexes) } diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index 34a4c5e9d..a7d233371 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -1,16 +1,15 @@ -use std::{fs, path::Path, str::FromStr}; +use std::{fs, path::Path}; use brk_error::Result; use brk_grouper::ByAddressType; use brk_store::{AnyStore, Mode, StoreFjallV2 as Store, Type}; use brk_types::{ - AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, - OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, Txid, TxidPrefix, TypeIndex, Unit, - Version, Vout, + AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, + OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout, }; use fjall2::{CompressionType as Compression, PersistMode, TransactionalKeyspace}; use rayon::prelude::*; -use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, VecIndex, VecIterator}; +use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator}; use crate::Indexes; @@ -140,11 +139,7 @@ impl Stores { .values() .map(|s| s as &dyn AnyStore), ) - .map(|store| { - // let height = - store.height().map(Height::incremented).unwrap_or_default() - // dbg!((height, store.name())); - }) + .map(|store| store.height().map(Height::incremented).unwrap_or_default()) .min() .unwrap() } @@ -224,176 +219,48 @@ impl Stores { self.height_to_coinbase_tag.remove(h); }); - if let Ok(mut index) = vecs - .height_to_first_p2pk65addressindex - .read_once(starting_indexes.height) - { - let mut p2pk65addressindex_to_p2pk65bytes_iter = - vecs.p2pk65addressindex_to_p2pk65bytes.iter()?; - - while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); + // Remove address hashes for all address types starting from rollback height + for address_type in [ + OutputType::P2PK65, + OutputType::P2PK33, + OutputType::P2PKH, + OutputType::P2SH, + OutputType::P2WPKH, + OutputType::P2WSH, + OutputType::P2TR, + OutputType::P2A, + ] { + for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? { self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2PK65) + .get_mut_unwrap(address_type) .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2pk33addressindex - .read_once(starting_indexes.height) - { - let mut p2pk33addressindex_to_p2pk33bytes_iter = - vecs.p2pk33addressindex_to_p2pk33bytes.iter()?; - - while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2PK33) - .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2pkhaddressindex - .read_once(starting_indexes.height) - { - let mut p2pkhaddressindex_to_p2pkhbytes_iter = - vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?; - - while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2PKH) - .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2shaddressindex - .read_once(starting_indexes.height) - { - let mut p2shaddressindex_to_p2shbytes_iter = - vecs.p2shaddressindex_to_p2shbytes.iter()?; - - while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2SH) - .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2wpkhaddressindex - .read_once(starting_indexes.height) - { - let mut p2wpkhaddressindex_to_p2wpkhbytes_iter = - vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?; - - while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2WPKH) - .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2wshaddressindex - .read_once(starting_indexes.height) - { - let mut p2wshaddressindex_to_p2wshbytes_iter = - vecs.p2wshaddressindex_to_p2wshbytes.iter()?; - - while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2WSH) - .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2traddressindex - .read_once(starting_indexes.height) - { - let mut p2traddressindex_to_p2trbytes_iter = - vecs.p2traddressindex_to_p2trbytes.iter()?; - - while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2TR) - .remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2aaddressindex - .read_once(starting_indexes.height) - { - let mut p2aaddressindex_to_p2abytes_iter = - vecs.p2aaddressindex_to_p2abytes.iter()?; - - while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressHash::from(&bytes); - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(OutputType::P2A) - .remove(hash); - index.increment(); } } } else { unreachable!(); - // self.blockhashprefix_to_height.reset()?; - // self.addresshash_to_typeindex.reset()?; } if starting_indexes.txindex != TxIndex::ZERO { - let txidprefix_dup1 = TxidPrefix::from(Txid::from(bitcoin::Txid::from_str( - "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", - )?)); - let txidprefix_dup2 = TxidPrefix::from(Txid::from(bitcoin::Txid::from_str( - "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", - )?)); vecs.txindex_to_txid .iter()? .enumerate() .skip(starting_indexes.txindex.to_usize()) .for_each(|(txindex, txid)| { let txindex = TxIndex::from(txindex); - let txidprefix = TxidPrefix::from(&txid); - let is_not_first_dup = - txindex != TxIndex::new(142783) || txidprefix != txidprefix_dup1; + let is_known_dup = crate::DUPLICATE_TXID_PREFIXES + .iter() + .any(|(dup_prefix, dup_txindex)| { + txindex == *dup_txindex && txidprefix == *dup_prefix + }); - let is_not_second_dup = - txindex != TxIndex::new(142841) || txidprefix != txidprefix_dup2; - - if is_not_first_dup && is_not_second_dup { + if !is_known_dup { self.txidprefix_to_txindex.remove(txidprefix); } }); } else { unreachable!(); - // self.txidprefix_to_txindex.reset()?; } if starting_indexes.txoutindex != TxOutIndex::ZERO { @@ -467,12 +334,6 @@ impl Stores { }); } else { unreachable!(); - // self.addresstype_to_typeindex_and_txindex - // .iter_mut() - // .try_for_each(|s| s.reset())?; - // self.addresstype_to_typeindex_and_unspentoutpoint - // .iter_mut() - // .try_for_each(|s| s.reset())?; } self.commit(starting_indexes.height.decremented().unwrap_or_default())?; diff --git a/crates/brk_indexer/src/stores_v3.rs b/crates/brk_indexer/src/stores_v3.rs index 1e4b45334..7ddc14db7 100644 --- a/crates/brk_indexer/src/stores_v3.rs +++ b/crates/brk_indexer/src/stores_v3.rs @@ -1,3 +1,9 @@ +//! Experimental stores implementation using fjall3. +//! +//! This module is currently commented out in lib.rs and not in use. +//! It exists as a work-in-progress upgrade path from fjall2 (stores_v2) to fjall3. +//! Do not delete - intended for future activation once fjall3 is stable and tested. + use std::{fs, path::Path, time::Instant}; use brk_error::Result; diff --git a/crates/brk_indexer/src/vecs.rs b/crates/brk_indexer/src/vecs.rs index 8e78e4d75..3ef0e8961 100644 --- a/crates/brk_indexer/src/vecs.rs +++ b/crates/brk_indexer/src/vecs.rs @@ -3,8 +3,8 @@ use std::path::Path; use brk_error::Result; use brk_traversable::Traversable; use brk_types::{ - AddressBytes, BlockHash, EmptyOutputIndex, Height, OpReturnIndex, OutPoint, OutputType, - P2AAddressIndex, P2ABytes, P2MSOutputIndex, P2PK33AddressIndex, P2PK33Bytes, + AddressBytes, AddressHash, BlockHash, EmptyOutputIndex, Height, OpReturnIndex, OutPoint, + OutputType, P2AAddressIndex, P2ABytes, P2MSOutputIndex, P2PK33AddressIndex, P2PK33Bytes, P2PK65AddressIndex, P2PK65Bytes, P2PKHAddressIndex, P2PKHBytes, P2SHAddressIndex, P2SHBytes, P2TRAddressIndex, P2TRBytes, P2WPKHAddressIndex, P2WPKHBytes, P2WSHAddressIndex, P2WSHBytes, RawLockTime, Sats, StoredBool, StoredF64, StoredU32, StoredU64, Timestamp, TxInIndex, TxIndex, @@ -12,7 +12,8 @@ use brk_types::{ }; use rayon::prelude::*; use vecdb::{ - AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, PAGE_SIZE, PcoVec, Stamp, + AnyStoredVec, BytesVec, Database, GenericStoredVec, ImportableVec, TypedVecIterator, PAGE_SIZE, + PcoVec, Stamp, }; use crate::Indexes; @@ -366,6 +367,70 @@ impl Vecs { Ok(()) } + /// Iterate address hashes starting from a given height (for rollback). + /// Returns an iterator of AddressHash values for all addresses of the given type + /// that were added at or after the given height. + pub fn iter_address_hashes_from( + &self, + address_type: OutputType, + height: Height, + ) -> Result + '_>> { + macro_rules! make_iter { + ($height_vec:expr, $bytes_vec:expr) => {{ + match $height_vec.read_once(height) { + Ok(mut index) => { + let mut iter = $bytes_vec.iter()?; + Ok(Box::new(std::iter::from_fn(move || { + iter.get(index).map(|typedbytes| { + let bytes = AddressBytes::from(typedbytes); + index.increment(); + AddressHash::from(&bytes) + }) + })) as Box + '_>) + } + Err(_) => Ok(Box::new(std::iter::empty()) + as Box + '_>), + } + }}; + } + + match address_type { + OutputType::P2PK65 => make_iter!( + self.height_to_first_p2pk65addressindex, + self.p2pk65addressindex_to_p2pk65bytes + ), + OutputType::P2PK33 => make_iter!( + self.height_to_first_p2pk33addressindex, + self.p2pk33addressindex_to_p2pk33bytes + ), + OutputType::P2PKH => make_iter!( + self.height_to_first_p2pkhaddressindex, + self.p2pkhaddressindex_to_p2pkhbytes + ), + OutputType::P2SH => make_iter!( + self.height_to_first_p2shaddressindex, + self.p2shaddressindex_to_p2shbytes + ), + OutputType::P2WPKH => make_iter!( + self.height_to_first_p2wpkhaddressindex, + self.p2wpkhaddressindex_to_p2wpkhbytes + ), + OutputType::P2WSH => make_iter!( + self.height_to_first_p2wshaddressindex, + self.p2wshaddressindex_to_p2wshbytes + ), + OutputType::P2TR => make_iter!( + self.height_to_first_p2traddressindex, + self.p2traddressindex_to_p2trbytes + ), + OutputType::P2A => make_iter!( + self.height_to_first_p2aaddressindex, + self.p2aaddressindex_to_p2abytes + ), + _ => Ok(Box::new(std::iter::empty())), + } + } + fn iter_mut_any_stored_vec(&mut self) -> impl Iterator { [ &mut self.emptyoutputindex_to_txindex as &mut dyn AnyStoredVec,