From 9b230d23ddb8aa9b944b1b880b91661cf0e64851 Mon Sep 17 00:00:00 2001 From: nym21 Date: Mon, 20 Oct 2025 13:05:46 +0200 Subject: [PATCH] indexer: move txoutindex->txindex and txindex->height from computer --- crates/brk_computer/src/indexes.rs | 31 --------- crates/brk_indexer/src/lib.rs | 74 ++++++++++---------- crates/brk_indexer/src/stores_v2.rs | 102 +++++++++++++++++++++++----- crates/brk_indexer/src/vecs.rs | 38 ++++++----- crates/brk_store/src/v2/meta.rs | 8 +-- crates/brk_store/src/v2/mod.rs | 40 ++++++----- 6 files changed, 164 insertions(+), 129 deletions(-) diff --git a/crates/brk_computer/src/indexes.rs b/crates/brk_computer/src/indexes.rs index 94e8069b7..5226f0ddb 100644 --- a/crates/brk_computer/src/indexes.rs +++ b/crates/brk_computer/src/indexes.rs @@ -56,7 +56,6 @@ pub struct Vecs { pub opreturnindex_to_opreturnindex: LazyVecFrom1, pub txoutindex_to_txoutindex: LazyVecFrom1, - pub txoutindex_to_txindex: EagerVec, pub p2aaddressindex_to_p2aaddressindex: LazyVecFrom1, pub p2msoutputindex_to_p2msoutputindex: @@ -81,7 +80,6 @@ pub struct Vecs { pub semesterindex_to_first_monthindex: EagerVec, pub semesterindex_to_monthindex_count: EagerVec, pub semesterindex_to_semesterindex: EagerVec, - pub txindex_to_height: EagerVec, pub txindex_to_input_count: LazyVecFrom2, pub txindex_to_output_count: @@ -411,11 +409,6 @@ impl Vecs { "dateindex", version + VERSION + Version::ZERO, )?, - txindex_to_height: EagerVec::forced_import_compressed( - &db, - "height", - version + VERSION + Version::ZERO, - )?, height_to_timestamp_fixed: EagerVec::forced_import_compressed( &db, "timestamp_fixed", @@ -466,12 +459,6 @@ impl Vecs { "yearindex_count", version + VERSION + Version::ZERO, )?, - txoutindex_to_txindex: EagerVec::forced_import_compressed( - &db, - "txindex", - version + VERSION + Version::ZERO, - )?, - db, }; @@ -501,17 +488,6 @@ impl Vecs { starting_indexes: brk_indexer::Indexes, exit: &Exit, ) -> Result { - // --- - // TxOutIndex - // --- - - self.txoutindex_to_txindex.compute_inverse_less_to_more( - starting_indexes.txindex, - &indexer.vecs.txindex_to_first_txoutindex, - &self.txindex_to_output_count, - exit, - )?; - // --- // TxIndex // --- @@ -523,13 +499,6 @@ impl Vecs { exit, )?; - self.txindex_to_height.compute_inverse_less_to_more( - starting_indexes.height, - &indexer.vecs.height_to_first_txindex, - &self.height_to_txindex_count, - exit, - )?; - // --- // Height // --- diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index b74a43094..4a9a39799 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -197,29 +197,24 @@ impl Indexer { )) }) .collect::>>()?; - // println!("txid_prefix_and_txid_and_... = : {:?}", i.elapsed()); + // println!("txs = : {:?}", i.elapsed()); + // let i = Instant::now(); let txid_prefix_to_txindex = txs .iter() .map(|(txindex, _, _, prefix, _)| (*prefix, txindex)) .collect::>(); - - // let i = Instant::now(); - let inputs = block + let txins = block .txdata .iter() .enumerate() - .flat_map(|(index, tx)| { - tx.input - .iter() - .enumerate() - .map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx)) - }) - .collect::>(); - // println!("inputs = : {:?}", i.elapsed()); - - // let i = Instant::now(); - let txinindex_and_txindata = inputs + .flat_map(|(index, tx)| tx + .input + .iter() + .enumerate() + .map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx)) + ) + .collect::>() .into_par_iter() .enumerate() .map(|(block_txinindex, (block_txindex, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> { @@ -292,25 +287,24 @@ impl Indexer { Ok((txinindex, InputSource::PreviousBlock(tuple))) }) .collect::>>()?; + drop(txid_prefix_to_txindex); // println!("txinindex_and_txindata = : {:?}", i.elapsed()); // let i = Instant::now(); - same_block_spent_outpoints.extend(txinindex_and_txindata.iter().filter_map( - |(_, input_source)| { - let InputSource::SameBlock((_, _, _, outpoint)) = input_source else { - return None; - }; - if !outpoint.is_coinbase() { - Some(*outpoint) - } else { - None - } - }, - )); + same_block_spent_outpoints.extend(txins.iter().filter_map(|(_, input_source)| { + let InputSource::SameBlock((_, _, _, outpoint)) = input_source else { + return None; + }; + if !outpoint.is_coinbase() { + Some(*outpoint) + } else { + None + } + })); // println!("same_block_spent_outpoints = : {:?}", i.elapsed()); // let i = Instant::now(); - let outputs = block + let txouts = block .txdata .iter() .enumerate() @@ -319,11 +313,7 @@ impl Indexer { (TxIndex::from(index), Vout::from(vout), txout, tx) }) }) - .collect::>(); - // println!("outputs = : {:?}", i.elapsed()); - - // let i = Instant::now(); - let txoutindex_to_txoutdata = outputs + .collect::>() .into_par_iter() .enumerate() .map( @@ -464,14 +454,14 @@ impl Indexer { }, ) .collect::>>()?; - // println!("txoutindex_to_txoutdata = : {:?}", i.elapsed()); + // println!("txouts = : {:?}", i.elapsed()); - let outputs_len = txoutindex_to_txoutdata.len(); - let inputs_len = txinindex_and_txindata.len(); + let outputs_len = txouts.len(); + let inputs_len = txins.len(); let tx_len = block.txdata.len(); // let i = Instant::now(); - txoutindex_to_txoutdata + txouts .into_iter() .try_for_each(|data| -> Result<()> { let ( @@ -493,6 +483,9 @@ impl Indexer { vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?; + vecs.txoutindex_to_txindex + .push_if_needed(txoutindex, txindex)?; + vecs.txoutindex_to_outputtype .push_if_needed(txoutindex, outputtype)?; @@ -608,12 +601,12 @@ impl Indexer { Ok(()) })?; // println!( - // "outpoint_to_outputtype_and_addressindex = : {:?}", + // "txouts.into_iter() = : {:?}", // i.elapsed() // ); // let i = Instant::now(); - txinindex_and_txindata + txins .into_iter() .map( #[allow(clippy::type_complexity)] @@ -699,7 +692,7 @@ impl Indexer { Ok(()) })?; - // println!("txinindex_and_txindata.into_iter(): {:?}", i.elapsed()); + // println!("txins.into_iter(): {:?}", i.elapsed()); // let i = Instant::now(); if check_collisions { @@ -762,6 +755,7 @@ impl Indexer { .insert_if_needed(txid_prefix, txindex, height); } + vecs.txindex_to_height.push_if_needed(txindex, height)?; vecs.txindex_to_txversion .push_if_needed(txindex, tx.version.into())?; vecs.txindex_to_txid.push_if_needed(txindex, txid)?; diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index 8e3829312..c7f922f6c 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -4,10 +4,11 @@ use brk_error::Result; use brk_grouper::ByAddressType; use brk_store::{AnyStore, StoreV2 as Store}; use brk_structs::{ - AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex, - TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, + AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, + TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, + Vout, }; -use fjall2::{Keyspace, PersistMode, TransactionalKeyspace}; +use fjall2::{Keyspace, PersistMode}; use rayon::prelude::*; use vecdb::{AnyVec, StoredIndex, VecIterator}; @@ -17,7 +18,7 @@ use super::Vecs; #[derive(Clone)] pub struct Stores { - pub keyspace: TransactionalKeyspace, + pub keyspace: Keyspace, pub addressbyteshash_to_typeindex: Store, pub blockhashprefix_to_height: Store, @@ -390,21 +391,88 @@ impl Stores { } if starting_indexes.txoutindex != TxOutIndex::ZERO { - todo!(); - // let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter(); - // vecs.txoutindex_to_outputtype - // .iter_at(starting_indexes.txoutindex) - // .filter(|(_, outputtype)| outputtype.is_address()) - // .for_each(|(txoutindex, outputtype)| { - // let outputtype = outputtype.into_owned(); + vecs.txoutindex_to_outputtype + .iter_at(starting_indexes.txoutindex) + .zip( + vecs.txoutindex_to_typeindex + .iter_at(starting_indexes.txoutindex), + ) + .filter(|((_, outputtype), _)| outputtype.is_address()) + .for_each(|((txoutindex, outputtype), (_, typeindex))| { + let outputtype = outputtype.into_owned(); + let typeindex = typeindex.into_owned(); - // let typeindex = txoutindex_to_typeindex_iter.unwrap_get_inner(txoutindex); + let txindex = vecs + .txoutindex_to_txindex + .iter() + .get(txoutindex) + .unwrap() + .into_owned(); - // self.addresstype_to_typeindex_and_unspentoutpoint - // .get_mut(outputtype) - // .unwrap() - // .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex))); - // }); + let vout = Vout::from( + txoutindex.to_usize() + - vecs + .txindex_to_first_txoutindex + .iter() + .get(txindex) + .unwrap() + .into_owned() + .to_usize(), + ); + let outpoint = OutPoint::new(txindex, vout); + + self.addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .remove(TypeIndexAndOutPoint::from((typeindex, outpoint))); + }); + + // Add back outputs that were spent after the rollback point + vecs.txinindex_to_outpoint + .iter_at(starting_indexes.txinindex) + .for_each(|(_, outpoint)| { + let outpoint = outpoint.into_owned(); + + if outpoint.is_coinbase() { + return; + } + + let txindex = outpoint.txindex(); + let vout = outpoint.vout(); + + // Calculate txoutindex from txindex and vout + let txoutindex = vecs + .txindex_to_first_txoutindex + .iter() + .get(txindex) + .unwrap() + .into_owned() + + vout; + + // Only process if this output was created before the rollback point + if txoutindex < starting_indexes.txoutindex { + let outputtype = vecs + .txoutindex_to_outputtype + .iter() + .get(txoutindex) + .unwrap() + .into_owned(); + + if outputtype.is_address() { + let typeindex = vecs + .txoutindex_to_typeindex + .iter() + .get(txoutindex) + .unwrap() + .into_owned(); + + self.addresstype_to_typeindex_and_unspentoutpoint + .get_mut(outputtype) + .unwrap() + .insert(TypeIndexAndOutPoint::from((typeindex, outpoint)), Unit); + } + } + }); } else { unreachable!(); // self.addresstype_to_typeindex_and_txindex diff --git a/crates/brk_indexer/src/vecs.rs b/crates/brk_indexer/src/vecs.rs index c3e9413b8..24429f989 100644 --- a/crates/brk_indexer/src/vecs.rs +++ b/crates/brk_indexer/src/vecs.rs @@ -22,9 +22,7 @@ pub struct Vecs { pub height_to_blockhash: RawVec, pub height_to_difficulty: CompressedVec, pub height_to_first_emptyoutputindex: CompressedVec, - pub height_to_first_txinindex: CompressedVec, pub height_to_first_opreturnindex: CompressedVec, - pub height_to_first_txoutindex: CompressedVec, pub height_to_first_p2aaddressindex: CompressedVec, pub height_to_first_p2msoutputindex: CompressedVec, pub height_to_first_p2pk33addressindex: CompressedVec, @@ -35,16 +33,14 @@ pub struct Vecs { pub height_to_first_p2wpkhaddressindex: CompressedVec, pub height_to_first_p2wshaddressindex: CompressedVec, pub height_to_first_txindex: CompressedVec, + pub height_to_first_txinindex: CompressedVec, + pub height_to_first_txoutindex: CompressedVec, pub height_to_first_unknownoutputindex: CompressedVec, /// Doesn't guarantee continuity due to possible reorgs and more generally the nature of mining pub height_to_timestamp: CompressedVec, pub height_to_total_size: CompressedVec, pub height_to_weight: CompressedVec, - pub txinindex_to_outpoint: RawVec, pub opreturnindex_to_txindex: CompressedVec, - pub txoutindex_to_outputtype: RawVec, - pub txoutindex_to_typeindex: RawVec, - pub txoutindex_to_value: RawVec, pub p2aaddressindex_to_p2abytes: RawVec, pub p2msoutputindex_to_txindex: CompressedVec, pub p2pk33addressindex_to_p2pk33bytes: RawVec, @@ -57,11 +53,17 @@ pub struct Vecs { pub txindex_to_base_size: CompressedVec, pub txindex_to_first_txinindex: CompressedVec, pub txindex_to_first_txoutindex: CompressedVec, + pub txindex_to_height: CompressedVec, pub txindex_to_is_explicitly_rbf: CompressedVec, pub txindex_to_rawlocktime: CompressedVec, pub txindex_to_total_size: CompressedVec, pub txindex_to_txid: RawVec, pub txindex_to_txversion: CompressedVec, + pub txinindex_to_outpoint: RawVec, + pub txoutindex_to_outputtype: RawVec, + pub txoutindex_to_txindex: CompressedVec, + pub txoutindex_to_typeindex: RawVec, + pub txoutindex_to_value: RawVec, pub unknownoutputindex_to_txindex: CompressedVec, } @@ -148,11 +150,7 @@ impl Vecs { height_to_timestamp: CompressedVec::forced_import(&db, "timestamp", version)?, height_to_total_size: CompressedVec::forced_import(&db, "total_size", version)?, height_to_weight: CompressedVec::forced_import(&db, "weight", version)?, - txinindex_to_outpoint: RawVec::forced_import(&db, "outpoint", version)?, opreturnindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?, - txoutindex_to_outputtype: RawVec::forced_import(&db, "outputtype", version)?, - txoutindex_to_typeindex: RawVec::forced_import(&db, "typeindex", version)?, - txoutindex_to_value: RawVec::forced_import(&db, "value", version)?, p2aaddressindex_to_p2abytes: RawVec::forced_import(&db, "p2abytes", version)?, p2msoutputindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?, p2pk33addressindex_to_p2pk33bytes: RawVec::forced_import(&db, "p2pk33bytes", version)?, @@ -163,6 +161,7 @@ impl Vecs { p2wpkhaddressindex_to_p2wpkhbytes: RawVec::forced_import(&db, "p2wpkhbytes", version)?, p2wshaddressindex_to_p2wshbytes: RawVec::forced_import(&db, "p2wshbytes", version)?, txindex_to_base_size: CompressedVec::forced_import(&db, "base_size", version)?, + txindex_to_height: CompressedVec::forced_import(&db, "height", version)?, txindex_to_first_txinindex: CompressedVec::forced_import( &db, "first_txinindex", @@ -182,6 +181,11 @@ impl Vecs { txindex_to_total_size: CompressedVec::forced_import(&db, "total_size", version)?, txindex_to_txid: RawVec::forced_import(&db, "txid", version)?, txindex_to_txversion: CompressedVec::forced_import(&db, "txversion", version)?, + txinindex_to_outpoint: RawVec::forced_import(&db, "outpoint", version)?, + txoutindex_to_outputtype: RawVec::forced_import(&db, "outputtype", version)?, + txoutindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?, + txoutindex_to_typeindex: RawVec::forced_import(&db, "typeindex", version)?, + txoutindex_to_value: RawVec::forced_import(&db, "value", version)?, unknownoutputindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?, db, @@ -371,9 +375,7 @@ impl Vecs { &mut self.height_to_blockhash, &mut self.height_to_difficulty, &mut self.height_to_first_emptyoutputindex, - &mut self.height_to_first_txinindex, &mut self.height_to_first_opreturnindex, - &mut self.height_to_first_txoutindex, &mut self.height_to_first_p2aaddressindex, &mut self.height_to_first_p2msoutputindex, &mut self.height_to_first_p2pk33addressindex, @@ -384,15 +386,13 @@ impl Vecs { &mut self.height_to_first_p2wpkhaddressindex, &mut self.height_to_first_p2wshaddressindex, &mut self.height_to_first_txindex, + &mut self.height_to_first_txinindex, + &mut self.height_to_first_txoutindex, &mut self.height_to_first_unknownoutputindex, &mut self.height_to_timestamp, &mut self.height_to_total_size, &mut self.height_to_weight, - &mut self.txinindex_to_outpoint, &mut self.opreturnindex_to_txindex, - &mut self.txoutindex_to_outputtype, - &mut self.txoutindex_to_typeindex, - &mut self.txoutindex_to_value, &mut self.p2aaddressindex_to_p2abytes, &mut self.p2msoutputindex_to_txindex, &mut self.p2pk33addressindex_to_p2pk33bytes, @@ -405,11 +405,17 @@ impl Vecs { &mut self.txindex_to_base_size, &mut self.txindex_to_first_txinindex, &mut self.txindex_to_first_txoutindex, + &mut self.txindex_to_height, &mut self.txindex_to_is_explicitly_rbf, &mut self.txindex_to_rawlocktime, &mut self.txindex_to_total_size, &mut self.txindex_to_txid, &mut self.txindex_to_txversion, + &mut self.txinindex_to_outpoint, + &mut self.txoutindex_to_outputtype, + &mut self.txoutindex_to_txindex, + &mut self.txoutindex_to_typeindex, + &mut self.txoutindex_to_value, &mut self.unknownoutputindex_to_txindex, ] .into_iter() diff --git a/crates/brk_store/src/v2/meta.rs b/crates/brk_store/src/v2/meta.rs index 9276bb035..e0e0b483b 100644 --- a/crates/brk_store/src/v2/meta.rs +++ b/crates/brk_store/src/v2/meta.rs @@ -5,7 +5,7 @@ use std::{ use brk_error::Result; use brk_structs::Version; -use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle}; +use fjall2::{Keyspace, PartitionHandle, PersistMode}; use super::Height; @@ -18,13 +18,13 @@ pub struct StoreMeta { impl StoreMeta { pub fn checked_open( - keyspace: &TransactionalKeyspace, + keyspace: &Keyspace, path: &Path, version: Version, open_partition_handle: F, - ) -> Result<(Self, TransactionalPartitionHandle)> + ) -> Result<(Self, PartitionHandle)> where - F: Fn() -> Result, + F: Fn() -> Result, { fs::create_dir_all(path)?; diff --git a/crates/brk_store/src/v2/mod.rs b/crates/brk_store/src/v2/mod.rs index 9671b15d0..a22ef020d 100644 --- a/crates/brk_store/src/v2/mod.rs +++ b/crates/brk_store/src/v2/mod.rs @@ -3,10 +3,7 @@ use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path}; use brk_error::Result; use brk_structs::{Height, Version}; use byteview6::ByteView; -use fjall2::{ - InnerItem, PartitionCreateOptions, PersistMode, TransactionalKeyspace, - TransactionalPartitionHandle, -}; +use fjall2::{InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode}; use rustc_hash::{FxHashMap, FxHashSet}; use crate::any::AnyStore; @@ -19,18 +16,18 @@ use meta::*; pub struct StoreV2 { meta: StoreMeta, name: &'static str, - keyspace: TransactionalKeyspace, - partition: TransactionalPartitionHandle, + keyspace: Keyspace, + partition: PartitionHandle, puts: FxHashMap, dels: FxHashSet, } const MAJOR_FJALL_VERSION: Version = Version::TWO; -pub fn open_keyspace(path: &Path) -> fjall2::Result { +pub fn open_keyspace(path: &Path) -> fjall2::Result { fjall2::Config::new(path.join("fjall")) .max_write_buffer_size(32 * 1024 * 1024) - .open_transactional() + .open() } impl StoreV2 @@ -40,10 +37,10 @@ where ByteView: From + From, { fn open_partition_handle( - keyspace: &TransactionalKeyspace, + keyspace: &Keyspace, name: &str, bloom_filters: Option, - ) -> Result { + ) -> Result { let mut options = PartitionCreateOptions::default() .max_memtable_size(8 * 1024 * 1024) .manual_journal_persist(true); @@ -56,7 +53,7 @@ where } pub fn import( - keyspace: &TransactionalKeyspace, + keyspace: &Keyspace, path: &Path, name: &str, version: Version, @@ -101,16 +98,12 @@ where } pub fn is_empty(&self) -> Result { - self.keyspace - .read_tx() - .is_empty(&self.partition) - .map_err(|e| e.into()) + self.partition.is_empty().map_err(|e| e.into()) } pub fn iter(&self) -> impl Iterator { - self.keyspace - .read_tx() - .iter(&self.partition) + self.partition + .iter() .map(|res| res.unwrap()) .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } @@ -118,11 +111,16 @@ where #[inline] pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { if self.needs(height) { - let _ = self.dels.is_empty() || self.dels.remove(&key); - self.puts.insert(key, value); + self.insert(key, value); } } + #[inline] + pub fn insert(&mut self, key: K, value: V) { + let _ = self.dels.is_empty() || self.dels.remove(&key); + self.puts.insert(key, value); + } + #[inline] pub fn remove(&mut self, key: K) { if self.puts.remove(&key).is_some() { @@ -179,7 +177,7 @@ where self.keyspace .batch() - .commit_single_partition(self.partition.inner(), items)?; + .commit_single_partition(&self.partition, items)?; Ok(()) }