indexer: move txoutindex->txindex and txindex->height from computer

This commit is contained in:
nym21
2025-10-20 13:05:46 +02:00
parent baa7c9cc22
commit 9b230d23dd
6 changed files with 164 additions and 129 deletions
-31
View File
@@ -56,7 +56,6 @@ pub struct Vecs {
pub opreturnindex_to_opreturnindex:
LazyVecFrom1<OpReturnIndex, OpReturnIndex, OpReturnIndex, TxIndex>,
pub txoutindex_to_txoutindex: LazyVecFrom1<TxOutIndex, TxOutIndex, TxOutIndex, Sats>,
pub txoutindex_to_txindex: EagerVec<TxOutIndex, TxIndex>,
pub p2aaddressindex_to_p2aaddressindex:
LazyVecFrom1<P2AAddressIndex, P2AAddressIndex, P2AAddressIndex, P2ABytes>,
pub p2msoutputindex_to_p2msoutputindex:
@@ -81,7 +80,6 @@ pub struct Vecs {
pub semesterindex_to_first_monthindex: EagerVec<SemesterIndex, MonthIndex>,
pub semesterindex_to_monthindex_count: EagerVec<SemesterIndex, StoredU64>,
pub semesterindex_to_semesterindex: EagerVec<SemesterIndex, SemesterIndex>,
pub txindex_to_height: EagerVec<TxIndex, Height>,
pub txindex_to_input_count:
LazyVecFrom2<TxIndex, StoredU64, TxIndex, TxInIndex, TxInIndex, TxOutIndex>,
pub txindex_to_output_count:
@@ -411,11 +409,6 @@ impl Vecs {
"dateindex",
version + VERSION + Version::ZERO,
)?,
txindex_to_height: EagerVec::forced_import_compressed(
&db,
"height",
version + VERSION + Version::ZERO,
)?,
height_to_timestamp_fixed: EagerVec::forced_import_compressed(
&db,
"timestamp_fixed",
@@ -466,12 +459,6 @@ impl Vecs {
"yearindex_count",
version + VERSION + Version::ZERO,
)?,
txoutindex_to_txindex: EagerVec::forced_import_compressed(
&db,
"txindex",
version + VERSION + Version::ZERO,
)?,
db,
};
@@ -501,17 +488,6 @@ impl Vecs {
starting_indexes: brk_indexer::Indexes,
exit: &Exit,
) -> Result<Indexes> {
// ---
// TxOutIndex
// ---
self.txoutindex_to_txindex.compute_inverse_less_to_more(
starting_indexes.txindex,
&indexer.vecs.txindex_to_first_txoutindex,
&self.txindex_to_output_count,
exit,
)?;
// ---
// TxIndex
// ---
@@ -523,13 +499,6 @@ impl Vecs {
exit,
)?;
self.txindex_to_height.compute_inverse_less_to_more(
starting_indexes.height,
&indexer.vecs.height_to_first_txindex,
&self.height_to_txindex_count,
exit,
)?;
// ---
// Height
// ---
+34 -40
View File
@@ -197,29 +197,24 @@ impl Indexer {
))
})
.collect::<Result<Vec<_>>>()?;
// println!("txid_prefix_and_txid_and_... = : {:?}", i.elapsed());
// println!("txs = : {:?}", i.elapsed());
// let i = Instant::now();
let txid_prefix_to_txindex = txs
.iter()
.map(|(txindex, _, _, prefix, _)| (*prefix, txindex))
.collect::<FxHashMap<_, _>>();
// let i = Instant::now();
let inputs = block
let txins = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
})
.collect::<Vec<_>>();
// println!("inputs = : {:?}", i.elapsed());
// let i = Instant::now();
let txinindex_and_txindata = inputs
.flat_map(|(index, tx)| tx
.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
)
.collect::<Vec<_>>()
.into_par_iter()
.enumerate()
.map(|(block_txinindex, (block_txindex, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> {
@@ -292,25 +287,24 @@ impl Indexer {
Ok((txinindex, InputSource::PreviousBlock(tuple)))
})
.collect::<Result<Vec<_>>>()?;
drop(txid_prefix_to_txindex);
// println!("txinindex_and_txindata = : {:?}", i.elapsed());
// let i = Instant::now();
same_block_spent_outpoints.extend(txinindex_and_txindata.iter().filter_map(
|(_, input_source)| {
let InputSource::SameBlock((_, _, _, outpoint)) = input_source else {
return None;
};
if !outpoint.is_coinbase() {
Some(*outpoint)
} else {
None
}
},
));
same_block_spent_outpoints.extend(txins.iter().filter_map(|(_, input_source)| {
let InputSource::SameBlock((_, _, _, outpoint)) = input_source else {
return None;
};
if !outpoint.is_coinbase() {
Some(*outpoint)
} else {
None
}
}));
// println!("same_block_spent_outpoints = : {:?}", i.elapsed());
// let i = Instant::now();
let outputs = block
let txouts = block
.txdata
.iter()
.enumerate()
@@ -319,11 +313,7 @@ impl Indexer {
(TxIndex::from(index), Vout::from(vout), txout, tx)
})
})
.collect::<Vec<_>>();
// println!("outputs = : {:?}", i.elapsed());
// let i = Instant::now();
let txoutindex_to_txoutdata = outputs
.collect::<Vec<_>>()
.into_par_iter()
.enumerate()
.map(
@@ -464,14 +454,14 @@ impl Indexer {
},
)
.collect::<Result<Vec<_>>>()?;
// println!("txoutindex_to_txoutdata = : {:?}", i.elapsed());
// println!("txouts = : {:?}", i.elapsed());
let outputs_len = txoutindex_to_txoutdata.len();
let inputs_len = txinindex_and_txindata.len();
let outputs_len = txouts.len();
let inputs_len = txins.len();
let tx_len = block.txdata.len();
// let i = Instant::now();
txoutindex_to_txoutdata
txouts
.into_iter()
.try_for_each(|data| -> Result<()> {
let (
@@ -493,6 +483,9 @@ impl Indexer {
vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?;
vecs.txoutindex_to_txindex
.push_if_needed(txoutindex, txindex)?;
vecs.txoutindex_to_outputtype
.push_if_needed(txoutindex, outputtype)?;
@@ -608,12 +601,12 @@ impl Indexer {
Ok(())
})?;
// println!(
// "outpoint_to_outputtype_and_addressindex = : {:?}",
// "txouts.into_iter() = : {:?}",
// i.elapsed()
// );
// let i = Instant::now();
txinindex_and_txindata
txins
.into_iter()
.map(
#[allow(clippy::type_complexity)]
@@ -699,7 +692,7 @@ impl Indexer {
Ok(())
})?;
// println!("txinindex_and_txindata.into_iter(): {:?}", i.elapsed());
// println!("txins.into_iter(): {:?}", i.elapsed());
// let i = Instant::now();
if check_collisions {
@@ -762,6 +755,7 @@ impl Indexer {
.insert_if_needed(txid_prefix, txindex, height);
}
vecs.txindex_to_height.push_if_needed(txindex, height)?;
vecs.txindex_to_txversion
.push_if_needed(txindex, tx.version.into())?;
vecs.txindex_to_txid.push_if_needed(txindex, txid)?;
+85 -17
View File
@@ -4,10 +4,11 @@ use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, StoreV2 as Store};
use brk_structs::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
Vout,
};
use fjall2::{Keyspace, PersistMode, TransactionalKeyspace};
use fjall2::{Keyspace, PersistMode};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
@@ -17,7 +18,7 @@ use super::Vecs;
#[derive(Clone)]
pub struct Stores {
pub keyspace: TransactionalKeyspace,
pub keyspace: Keyspace,
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
@@ -390,21 +391,88 @@ impl Stores {
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
todo!();
// let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter();
// vecs.txoutindex_to_outputtype
// .iter_at(starting_indexes.txoutindex)
// .filter(|(_, outputtype)| outputtype.is_address())
// .for_each(|(txoutindex, outputtype)| {
// let outputtype = outputtype.into_owned();
vecs.txoutindex_to_outputtype
.iter_at(starting_indexes.txoutindex)
.zip(
vecs.txoutindex_to_typeindex
.iter_at(starting_indexes.txoutindex),
)
.filter(|((_, outputtype), _)| outputtype.is_address())
.for_each(|((txoutindex, outputtype), (_, typeindex))| {
let outputtype = outputtype.into_owned();
let typeindex = typeindex.into_owned();
// let typeindex = txoutindex_to_typeindex_iter.unwrap_get_inner(txoutindex);
let txindex = vecs
.txoutindex_to_txindex
.iter()
.get(txoutindex)
.unwrap()
.into_owned();
// self.addresstype_to_typeindex_and_unspentoutpoint
// .get_mut(outputtype)
// .unwrap()
// .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex)));
// });
let vout = Vout::from(
txoutindex.to_usize()
- vecs
.txindex_to_first_txoutindex
.iter()
.get(txindex)
.unwrap()
.into_owned()
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.remove(TypeIndexAndOutPoint::from((typeindex, outpoint)));
});
// Add back outputs that were spent after the rollback point
vecs.txinindex_to_outpoint
.iter_at(starting_indexes.txinindex)
.for_each(|(_, outpoint)| {
let outpoint = outpoint.into_owned();
if outpoint.is_coinbase() {
return;
}
let txindex = outpoint.txindex();
let vout = outpoint.vout();
// Calculate txoutindex from txindex and vout
let txoutindex = vecs
.txindex_to_first_txoutindex
.iter()
.get(txindex)
.unwrap()
.into_owned()
+ vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype = vecs
.txoutindex_to_outputtype
.iter()
.get(txoutindex)
.unwrap()
.into_owned();
if outputtype.is_address() {
let typeindex = vecs
.txoutindex_to_typeindex
.iter()
.get(txoutindex)
.unwrap()
.into_owned();
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.insert(TypeIndexAndOutPoint::from((typeindex, outpoint)), Unit);
}
}
});
} else {
unreachable!();
// self.addresstype_to_typeindex_and_txindex
+22 -16
View File
@@ -22,9 +22,7 @@ pub struct Vecs {
pub height_to_blockhash: RawVec<Height, BlockHash>,
pub height_to_difficulty: CompressedVec<Height, StoredF64>,
pub height_to_first_emptyoutputindex: CompressedVec<Height, EmptyOutputIndex>,
pub height_to_first_txinindex: CompressedVec<Height, TxInIndex>,
pub height_to_first_opreturnindex: CompressedVec<Height, OpReturnIndex>,
pub height_to_first_txoutindex: CompressedVec<Height, TxOutIndex>,
pub height_to_first_p2aaddressindex: CompressedVec<Height, P2AAddressIndex>,
pub height_to_first_p2msoutputindex: CompressedVec<Height, P2MSOutputIndex>,
pub height_to_first_p2pk33addressindex: CompressedVec<Height, P2PK33AddressIndex>,
@@ -35,16 +33,14 @@ pub struct Vecs {
pub height_to_first_p2wpkhaddressindex: CompressedVec<Height, P2WPKHAddressIndex>,
pub height_to_first_p2wshaddressindex: CompressedVec<Height, P2WSHAddressIndex>,
pub height_to_first_txindex: CompressedVec<Height, TxIndex>,
pub height_to_first_txinindex: CompressedVec<Height, TxInIndex>,
pub height_to_first_txoutindex: CompressedVec<Height, TxOutIndex>,
pub height_to_first_unknownoutputindex: CompressedVec<Height, UnknownOutputIndex>,
/// Doesn't guarantee continuity due to possible reorgs and more generally the nature of mining
pub height_to_timestamp: CompressedVec<Height, Timestamp>,
pub height_to_total_size: CompressedVec<Height, StoredU64>,
pub height_to_weight: CompressedVec<Height, Weight>,
pub txinindex_to_outpoint: RawVec<TxInIndex, OutPoint>,
pub opreturnindex_to_txindex: CompressedVec<OpReturnIndex, TxIndex>,
pub txoutindex_to_outputtype: RawVec<TxOutIndex, OutputType>,
pub txoutindex_to_typeindex: RawVec<TxOutIndex, TypeIndex>,
pub txoutindex_to_value: RawVec<TxOutIndex, Sats>,
pub p2aaddressindex_to_p2abytes: RawVec<P2AAddressIndex, P2ABytes>,
pub p2msoutputindex_to_txindex: CompressedVec<P2MSOutputIndex, TxIndex>,
pub p2pk33addressindex_to_p2pk33bytes: RawVec<P2PK33AddressIndex, P2PK33Bytes>,
@@ -57,11 +53,17 @@ pub struct Vecs {
pub txindex_to_base_size: CompressedVec<TxIndex, StoredU32>,
pub txindex_to_first_txinindex: CompressedVec<TxIndex, TxInIndex>,
pub txindex_to_first_txoutindex: CompressedVec<TxIndex, TxOutIndex>,
pub txindex_to_height: CompressedVec<TxIndex, Height>,
pub txindex_to_is_explicitly_rbf: CompressedVec<TxIndex, StoredBool>,
pub txindex_to_rawlocktime: CompressedVec<TxIndex, RawLockTime>,
pub txindex_to_total_size: CompressedVec<TxIndex, StoredU32>,
pub txindex_to_txid: RawVec<TxIndex, Txid>,
pub txindex_to_txversion: CompressedVec<TxIndex, TxVersion>,
pub txinindex_to_outpoint: RawVec<TxInIndex, OutPoint>,
pub txoutindex_to_outputtype: RawVec<TxOutIndex, OutputType>,
pub txoutindex_to_txindex: CompressedVec<TxOutIndex, TxIndex>,
pub txoutindex_to_typeindex: RawVec<TxOutIndex, TypeIndex>,
pub txoutindex_to_value: RawVec<TxOutIndex, Sats>,
pub unknownoutputindex_to_txindex: CompressedVec<UnknownOutputIndex, TxIndex>,
}
@@ -148,11 +150,7 @@ impl Vecs {
height_to_timestamp: CompressedVec::forced_import(&db, "timestamp", version)?,
height_to_total_size: CompressedVec::forced_import(&db, "total_size", version)?,
height_to_weight: CompressedVec::forced_import(&db, "weight", version)?,
txinindex_to_outpoint: RawVec::forced_import(&db, "outpoint", version)?,
opreturnindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?,
txoutindex_to_outputtype: RawVec::forced_import(&db, "outputtype", version)?,
txoutindex_to_typeindex: RawVec::forced_import(&db, "typeindex", version)?,
txoutindex_to_value: RawVec::forced_import(&db, "value", version)?,
p2aaddressindex_to_p2abytes: RawVec::forced_import(&db, "p2abytes", version)?,
p2msoutputindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?,
p2pk33addressindex_to_p2pk33bytes: RawVec::forced_import(&db, "p2pk33bytes", version)?,
@@ -163,6 +161,7 @@ impl Vecs {
p2wpkhaddressindex_to_p2wpkhbytes: RawVec::forced_import(&db, "p2wpkhbytes", version)?,
p2wshaddressindex_to_p2wshbytes: RawVec::forced_import(&db, "p2wshbytes", version)?,
txindex_to_base_size: CompressedVec::forced_import(&db, "base_size", version)?,
txindex_to_height: CompressedVec::forced_import(&db, "height", version)?,
txindex_to_first_txinindex: CompressedVec::forced_import(
&db,
"first_txinindex",
@@ -182,6 +181,11 @@ impl Vecs {
txindex_to_total_size: CompressedVec::forced_import(&db, "total_size", version)?,
txindex_to_txid: RawVec::forced_import(&db, "txid", version)?,
txindex_to_txversion: CompressedVec::forced_import(&db, "txversion", version)?,
txinindex_to_outpoint: RawVec::forced_import(&db, "outpoint", version)?,
txoutindex_to_outputtype: RawVec::forced_import(&db, "outputtype", version)?,
txoutindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?,
txoutindex_to_typeindex: RawVec::forced_import(&db, "typeindex", version)?,
txoutindex_to_value: RawVec::forced_import(&db, "value", version)?,
unknownoutputindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?,
db,
@@ -371,9 +375,7 @@ impl Vecs {
&mut self.height_to_blockhash,
&mut self.height_to_difficulty,
&mut self.height_to_first_emptyoutputindex,
&mut self.height_to_first_txinindex,
&mut self.height_to_first_opreturnindex,
&mut self.height_to_first_txoutindex,
&mut self.height_to_first_p2aaddressindex,
&mut self.height_to_first_p2msoutputindex,
&mut self.height_to_first_p2pk33addressindex,
@@ -384,15 +386,13 @@ impl Vecs {
&mut self.height_to_first_p2wpkhaddressindex,
&mut self.height_to_first_p2wshaddressindex,
&mut self.height_to_first_txindex,
&mut self.height_to_first_txinindex,
&mut self.height_to_first_txoutindex,
&mut self.height_to_first_unknownoutputindex,
&mut self.height_to_timestamp,
&mut self.height_to_total_size,
&mut self.height_to_weight,
&mut self.txinindex_to_outpoint,
&mut self.opreturnindex_to_txindex,
&mut self.txoutindex_to_outputtype,
&mut self.txoutindex_to_typeindex,
&mut self.txoutindex_to_value,
&mut self.p2aaddressindex_to_p2abytes,
&mut self.p2msoutputindex_to_txindex,
&mut self.p2pk33addressindex_to_p2pk33bytes,
@@ -405,11 +405,17 @@ impl Vecs {
&mut self.txindex_to_base_size,
&mut self.txindex_to_first_txinindex,
&mut self.txindex_to_first_txoutindex,
&mut self.txindex_to_height,
&mut self.txindex_to_is_explicitly_rbf,
&mut self.txindex_to_rawlocktime,
&mut self.txindex_to_total_size,
&mut self.txindex_to_txid,
&mut self.txindex_to_txversion,
&mut self.txinindex_to_outpoint,
&mut self.txoutindex_to_outputtype,
&mut self.txoutindex_to_txindex,
&mut self.txoutindex_to_typeindex,
&mut self.txoutindex_to_value,
&mut self.unknownoutputindex_to_txindex,
]
.into_iter()
+4 -4
View File
@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_structs::Version;
use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use fjall2::{Keyspace, PartitionHandle, PersistMode};
use super::Height;
@@ -18,13 +18,13 @@ pub struct StoreMeta {
impl StoreMeta {
pub fn checked_open<F>(
keyspace: &TransactionalKeyspace,
keyspace: &Keyspace,
path: &Path,
version: Version,
open_partition_handle: F,
) -> Result<(Self, TransactionalPartitionHandle)>
) -> Result<(Self, PartitionHandle)>
where
F: Fn() -> Result<TransactionalPartitionHandle>,
F: Fn() -> Result<PartitionHandle>,
{
fs::create_dir_all(path)?;
+19 -21
View File
@@ -3,10 +3,7 @@ use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview6::ByteView;
use fjall2::{
InnerItem, PartitionCreateOptions, PersistMode, TransactionalKeyspace,
TransactionalPartitionHandle,
};
use fjall2::{InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
@@ -19,18 +16,18 @@ use meta::*;
pub struct StoreV2<Key, Value> {
meta: StoreMeta,
name: &'static str,
keyspace: TransactionalKeyspace,
partition: TransactionalPartitionHandle,
keyspace: Keyspace,
partition: PartitionHandle,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::TWO;
pub fn open_keyspace(path: &Path) -> fjall2::Result<TransactionalKeyspace> {
pub fn open_keyspace(path: &Path) -> fjall2::Result<Keyspace> {
fjall2::Config::new(path.join("fjall"))
.max_write_buffer_size(32 * 1024 * 1024)
.open_transactional()
.open()
}
impl<K, V> StoreV2<K, V>
@@ -40,10 +37,10 @@ where
ByteView: From<K> + From<V>,
{
fn open_partition_handle(
keyspace: &TransactionalKeyspace,
keyspace: &Keyspace,
name: &str,
bloom_filters: Option<bool>,
) -> Result<TransactionalPartitionHandle> {
) -> Result<PartitionHandle> {
let mut options = PartitionCreateOptions::default()
.max_memtable_size(8 * 1024 * 1024)
.manual_journal_persist(true);
@@ -56,7 +53,7 @@ where
}
pub fn import(
keyspace: &TransactionalKeyspace,
keyspace: &Keyspace,
path: &Path,
name: &str,
version: Version,
@@ -101,16 +98,12 @@ where
}
pub fn is_empty(&self) -> Result<bool> {
self.keyspace
.read_tx()
.is_empty(&self.partition)
.map_err(|e| e.into())
self.partition.is_empty().map_err(|e| e.into())
}
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.keyspace
.read_tx()
.iter(&self.partition)
self.partition
.iter()
.map(|res| res.unwrap())
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
@@ -118,11 +111,16 @@ where
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
self.insert(key, value);
}
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn remove(&mut self, key: K) {
if self.puts.remove(&key).is_some() {
@@ -179,7 +177,7 @@ where
self.keyspace
.batch()
.commit_single_partition(self.partition.inner(), items)?;
.commit_single_partition(&self.partition, items)?;
Ok(())
}