diff --git a/crates/brk_indexer/examples/indexer.rs b/crates/brk_indexer/examples/indexer.rs index ac33d12b4..06ae1dc14 100644 --- a/crates/brk_indexer/examples/indexer.rs +++ b/crates/brk_indexer/examples/indexer.rs @@ -36,8 +36,6 @@ fn main() -> color_eyre::Result<()> { let blocks = Blocks::new(&client, &reader); debug!("Blocks created."); - fs::create_dir_all(&outputs_dir)?; - let mut indexer = Indexer::forced_import(&outputs_dir)?; debug!("Indexer imported."); diff --git a/crates/brk_indexer/examples/indexer_bench2.rs b/crates/brk_indexer/examples/indexer_bench2.rs index 14d496b5c..8bb438701 100644 --- a/crates/brk_indexer/examples/indexer_bench2.rs +++ b/crates/brk_indexer/examples/indexer_bench2.rs @@ -35,8 +35,6 @@ fn main() -> Result<()> { let blocks = Blocks::new(&client, &reader); - fs::create_dir_all(&outputs_dir)?; - let mut indexer = Indexer::forced_import(&outputs_dir)?; let mut bencher = diff --git a/crates/brk_indexer/examples/indexer_read.rs b/crates/brk_indexer/examples/indexer_read.rs index eeecf1e3c..ac3c04735 100644 --- a/crates/brk_indexer/examples/indexer_read.rs +++ b/crates/brk_indexer/examples/indexer_read.rs @@ -1,8 +1,8 @@ +use std::{fs, path::Path}; + use brk_error::Result; use brk_indexer::Indexer; use vecdb::ReadableVec; -// use brk_types::Sats; -use std::{fs, path::Path}; fn main() -> Result<()> { brk_logger::init(Some(Path::new(".log")))?; @@ -12,17 +12,7 @@ fn main() -> Result<()> { let indexer = Indexer::forced_import(&outputs_dir)?; - // let mut sum = Sats::ZERO; - // let mut count: usize = 0; - - // for value in indexer.vecs.txoutindex_to_value.clean_iter() { - // sum += value; - // count += 1; - // } - - // println!("sum = {sum}, count = {count}"); - - dbg!(indexer.vecs.outputs.value.collect_range(0, 200)); + println!("{:?}", indexer.vecs.outputs.value.collect_range(0, 200)); Ok(()) } diff --git a/crates/brk_indexer/examples/indexer_read_speed.rs b/crates/brk_indexer/examples/indexer_read_speed.rs index 78ed2897c..6157ea7c3 100644 --- a/crates/brk_indexer/examples/indexer_read_speed.rs +++ b/crates/brk_indexer/examples/indexer_read_speed.rs @@ -31,7 +31,7 @@ fn main() -> Result<()> { println!("Loading indexer from: {}", outputs_dir.display()); let indexer = Indexer::forced_import(&outputs_dir)?; - println!("✅ Indexer funded\n"); + println!("Indexer loaded.\n"); // Warmup run println!("🔥 Warmup run..."); diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index 84c8da079..c94fe99f4 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -1,12 +1,10 @@ use brk_error::Result; -use brk_types::Height; +use brk_types::{Height, Indexes}; use tracing::{debug, info}; use vecdb::{AnyStoredVec, WritableVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue}; use crate::{Stores, Vecs}; -pub use brk_types::Indexes; - /// Extension trait for Indexes with brk_indexer-specific functionality. pub trait IndexesExt { fn checked_push(&self, vecs: &mut Vecs) -> Result<()>; @@ -93,98 +91,85 @@ impl IndexesExt for Indexes { &vecs.scripts.first_emptyoutputindex, &vecs.scripts.empty_to_txindex, starting_height, - ) - .unwrap(); + )?; let p2msoutputindex = starting_index( &vecs.scripts.first_p2msoutputindex, &vecs.scripts.p2ms_to_txindex, starting_height, - ) - .unwrap(); + )?; let opreturnindex = starting_index( &vecs.scripts.first_opreturnindex, &vecs.scripts.opreturn_to_txindex, starting_height, - ) - .unwrap(); + )?; let p2pk33addressindex = starting_index( &vecs.addresses.first_p2pk33addressindex, &vecs.addresses.p2pk33bytes, starting_height, - ) - .unwrap(); + )?; let p2pk65addressindex = starting_index( &vecs.addresses.first_p2pk65addressindex, &vecs.addresses.p2pk65bytes, starting_height, - ) - .unwrap(); + )?; let p2pkhaddressindex = starting_index( &vecs.addresses.first_p2pkhaddressindex, &vecs.addresses.p2pkhbytes, starting_height, - ) - .unwrap(); + )?; let p2shaddressindex = starting_index( &vecs.addresses.first_p2shaddressindex, &vecs.addresses.p2shbytes, starting_height, - ) - .unwrap(); + )?; let p2traddressindex = starting_index( &vecs.addresses.first_p2traddressindex, &vecs.addresses.p2trbytes, starting_height, - ) - .unwrap(); + )?; let p2wpkhaddressindex = starting_index( &vecs.addresses.first_p2wpkhaddressindex, &vecs.addresses.p2wpkhbytes, starting_height, - ) - .unwrap(); + )?; let p2wshaddressindex = starting_index( &vecs.addresses.first_p2wshaddressindex, &vecs.addresses.p2wshbytes, starting_height, - ) - .unwrap(); + )?; let p2aaddressindex = starting_index( &vecs.addresses.first_p2aaddressindex, &vecs.addresses.p2abytes, starting_height, - ) - .unwrap(); + )?; let txindex = starting_index( &vecs.transactions.first_txindex, &vecs.transactions.txid, starting_height, - ) - .unwrap(); + )?; let txinindex = - starting_index(&vecs.inputs.first_txinindex, &vecs.inputs.outpoint, starting_height).unwrap(); + starting_index(&vecs.inputs.first_txinindex, &vecs.inputs.outpoint, starting_height)?; let txoutindex = - starting_index(&vecs.outputs.first_txoutindex, &vecs.outputs.value, starting_height).unwrap(); + starting_index(&vecs.outputs.first_txoutindex, &vecs.outputs.value, starting_height)?; let unknownoutputindex = starting_index( &vecs.scripts.first_unknownoutputindex, &vecs.scripts.unknown_to_txindex, starting_height, - ) - .unwrap(); + )?; Some(Indexes { emptyoutputindex, diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 7754db4f8..b11f83999 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -16,10 +16,12 @@ mod stores; mod vecs; use constants::*; -pub use indexes::*; -pub use processor::*; -pub use readers::*; -pub use stores::*; +use indexes::IndexesExt; +use processor::{BlockBuffers, BlockProcessor}; +use readers::Readers; + +pub use brk_types::Indexes; +pub use stores::Stores; pub use vecs::*; #[derive(Clone)] @@ -136,9 +138,8 @@ impl Indexer { let mut indexes = starting_indexes.clone(); debug!("Indexes cloned."); - let is_export_height = |height: Height| -> bool { - height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 - }; + let is_export_height = + |height: Height| -> bool { height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 }; let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> { info!("Exporting..."); diff --git a/crates/brk_indexer/src/processor/tx.rs b/crates/brk_indexer/src/processor/tx.rs index 8c84b1da6..3023a10a1 100644 --- a/crates/brk_indexer/src/processor/tx.rs +++ b/crates/brk_indexer/src/processor/tx.rs @@ -2,6 +2,7 @@ use brk_error::{Error, Result}; use brk_store::Store; use brk_types::{Height, StoredBool, TxIndex, Txid, TxidPrefix}; use rayon::prelude::*; +use tracing::error; use vecdb::{AnyVec, WritableVec, likely}; use crate::TxMetadataVecs; @@ -69,13 +70,17 @@ impl<'a> BlockProcessor<'a> { .get_pushed_or_read(prev_txindex, &self.readers.txid) .ok_or(Error::Internal("Missing txid for txindex")) .inspect_err(|_| { - dbg!(ct.txindex, len); + error!(txindex = ?ct.txindex, len, "Missing txid for txindex"); })?; let is_dup = DUPLICATE_TXIDS.contains(&prev_txid); if !is_dup { - dbg!(self.height, ct.txindex, prev_txid, prev_txindex); + error!( + height = ?self.height, txindex = ?ct.txindex, + ?prev_txid, ?prev_txindex, + "Unexpected TXID collision" + ); return Err(Error::Internal("Unexpected TXID collision")); } } diff --git a/crates/brk_indexer/src/processor/txin.rs b/crates/brk_indexer/src/processor/txin.rs index e49fbaeef..cb986dd17 100644 --- a/crates/brk_indexer/src/processor/txin.rs +++ b/crates/brk_indexer/src/processor/txin.rs @@ -7,6 +7,7 @@ use brk_types::{ }; use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; +use tracing::error; use vecdb::{PcoVec, WritableVec}; use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo}; @@ -80,7 +81,7 @@ impl<'a> BlockProcessor<'a> { let prev_txindex = match store_result { Some(txindex) if txindex < self.indexes.txindex => txindex, _ => { - tracing::error!( + error!( "UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}", txid, txid_prefix, store_result, self.indexes.txindex ); @@ -170,7 +171,7 @@ pub(super) fn finalize_inputs( .remove(&outpoint) .ok_or(Error::Internal("Same-block output not found")) .inspect_err(|_| { - dbg!(&same_block_output_info, outpoint); + error!(?outpoint, remaining = same_block_output_info.len(), "Same-block output not found"); })?; (vin, txindex, outpoint, info.outputtype, info.typeindex) } diff --git a/crates/brk_indexer/src/processor/txout.rs b/crates/brk_indexer/src/processor/txout.rs index 515a9b74f..e0421752a 100644 --- a/crates/brk_indexer/src/processor/txout.rs +++ b/crates/brk_indexer/src/processor/txout.rs @@ -6,6 +6,7 @@ use brk_types::{ Sats, TxIndex, TxOutIndex, TypeIndex, Unit, Vout, }; use rayon::prelude::*; +use tracing::error; use rustc_hash::{FxHashMap, FxHashSet}; use vecdb::{BytesVec, WritableVec}; @@ -24,7 +25,7 @@ impl<'a> BlockProcessor<'a> { let mut items = Vec::with_capacity(total_outputs); for (index, tx) in self.block.txdata.iter().enumerate() { for (vout, txout) in tx.output.iter().enumerate() { - items.push((TxIndex::from(index), Vout::from(vout), txout, tx)); + items.push((TxIndex::from(index), Vout::from(vout), txout)); } } @@ -32,7 +33,7 @@ impl<'a> BlockProcessor<'a> { .into_par_iter() .enumerate() .map( - |(block_txoutindex, (block_txindex, vout, txout, tx))| -> Result { + |(block_txoutindex, (block_txindex, vout, txout))| -> Result { let txindex = base_txindex + block_txindex; let txoutindex = base_txoutindex + TxOutIndex::from(block_txoutindex); @@ -67,7 +68,7 @@ impl<'a> BlockProcessor<'a> { }); if check_collisions && let Some(typeindex) = existing_typeindex { - let prev_addressbytes = self.vecs.get_addressbytes_by_type( + let prev_addressbytes = self.vecs.addresses.get_bytes_by_type( addresstype, typeindex, &self.readers.addressbytes, @@ -75,21 +76,12 @@ impl<'a> BlockProcessor<'a> { .ok_or(Error::Internal("Missing addressbytes"))?; if prev_addressbytes != address_bytes { - let txid = tx.compute_txid(); - dbg!( - height, - txid, - vout, - block_txindex, - addresstype, - prev_addressbytes, - &address_bytes, - &self.indexes, - typeindex, - txout, - AddressHash::from(&address_bytes), + error!( + ?height, ?vout, ?block_txindex, ?addresstype, + ?prev_addressbytes, ?address_bytes, ?typeindex, + "Address hash collision" ); - panic!() + return Err(Error::Internal("Address hash collision")); } } diff --git a/crates/brk_indexer/src/stores.rs b/crates/brk_indexer/src/stores.rs index 4b71ad1a2..49822f21d 100644 --- a/crates/brk_indexer/src/stores.rs +++ b/crates/brk_indexer/src/stores.rs @@ -33,6 +33,10 @@ pub struct Stores { impl Stores { pub fn forced_import(parent: &Path, version: Version) -> Result { + Self::forced_import_inner(parent, version, true) + } + + fn forced_import_inner(parent: &Path, version: Version, can_retry: bool) -> Result { let pathbuf = parent.join("stores"); let path = pathbuf.as_path(); @@ -40,10 +44,11 @@ impl Stores { let database = match brk_store::open_database(path) { Ok(database) => database, - Err(_) => { + Err(_) if can_retry => { fs::remove_dir_all(path)?; - return Self::forced_import(parent, version); + return Self::forced_import_inner(parent, version, false); } + Err(err) => return Err(err.into()), }; let database_ref = &database; @@ -194,7 +199,28 @@ impl Stores { vecs: &mut Vecs, starting_indexes: &Indexes, ) -> Result<()> { - if self.blockhashprefix_to_height.is_empty()? + if self.is_empty()? { + return Ok(()); + } + + debug_assert!(starting_indexes.height != Height::ZERO); + debug_assert!(starting_indexes.txindex != TxIndex::ZERO); + debug_assert!(starting_indexes.txoutindex != TxOutIndex::ZERO); + + self.rollback_block_metadata(vecs, starting_indexes)?; + self.rollback_txids(vecs, starting_indexes); + self.rollback_outputs_and_inputs(vecs, starting_indexes); + + let rollback_height = starting_indexes.height.decremented().unwrap_or_default(); + self.par_iter_any_mut() + .try_for_each(|store| store.export_meta(rollback_height))?; + self.commit(rollback_height)?; + + Ok(()) + } + + fn is_empty(&self) -> Result { + Ok(self.blockhashprefix_to_height.is_empty()? && self.txidprefix_to_txindex.is_empty()? && self.height_to_coinbase_tag.is_empty()? && self @@ -208,213 +234,154 @@ impl Stores { && self .addresstype_to_addressindex_and_unspentoutpoint .values() - .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? - { - return Ok(()); + .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?) + } + + fn rollback_block_metadata( + &mut self, + vecs: &mut Vecs, + starting_indexes: &Indexes, + ) -> Result<()> { + vecs.blocks.blockhash.for_each_range( + starting_indexes.height.to_usize(), + vecs.blocks.blockhash.len(), + |blockhash| { + self.blockhashprefix_to_height + .remove(BlockHashPrefix::from(blockhash)); + }, + ); + + (starting_indexes.height.to_usize()..vecs.blocks.blockhash.len()) + .map(Height::from) + .for_each(|h| { + self.height_to_coinbase_tag.remove(h); + }); + + for address_type in OutputType::ADDRESS_TYPES { + for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? { + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(address_type) + .remove(hash); + } } - if starting_indexes.height != Height::ZERO { - vecs.blocks.blockhash.for_each_range( - starting_indexes.height.to_usize(), - vecs.blocks.blockhash.len(), - |blockhash| { - let prefix = BlockHashPrefix::from(blockhash); - self.blockhashprefix_to_height.remove(prefix); - }, - ); - - (starting_indexes.height.to_usize()..vecs.blocks.blockhash.len()) - .map(Height::from) - .for_each(|h| { - self.height_to_coinbase_tag.remove(h); - }); - - // Remove address hashes for all address types starting from rollback height - // (each address only appears once in bytes vec, so no dedup needed) - for address_type in [ - OutputType::P2PK65, - OutputType::P2PK33, - OutputType::P2PKH, - OutputType::P2SH, - OutputType::P2WPKH, - OutputType::P2WSH, - OutputType::P2TR, - OutputType::P2A, - ] { - for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? { - self.addresstype_to_addresshash_to_addressindex - .get_mut_unwrap(address_type) - .remove(hash); - } - } - } else { - unreachable!(); - } - - if starting_indexes.txindex != TxIndex::ZERO { - let txid_vec_len = vecs.transactions.txid.len(); - let skip_count = starting_indexes.txindex.to_usize(); - let remove_count = txid_vec_len.saturating_sub(skip_count); - tracing::debug!( - "Rollback TXIDs: vec_len={}, skip={}, removing={}", - txid_vec_len, - skip_count, - remove_count - ); - - { - let start = starting_indexes.txindex.to_usize(); - let end = vecs.transactions.txid.len(); - let mut current_index = start; - vecs.transactions.txid.for_each_range(start, end, |txid| { - let txindex = TxIndex::from(current_index); - let txidprefix = TxidPrefix::from(&txid); - - let is_known_dup = - DUPLICATE_TXID_PREFIXES - .iter() - .any(|(dup_prefix, dup_txindex)| { - txindex == *dup_txindex && txidprefix == *dup_prefix - }); - - if !is_known_dup { - self.txidprefix_to_txindex.remove(txidprefix); - } - current_index += 1; - }); - } - - // Clear caches to prevent stale reads after rollback - self.txidprefix_to_txindex.clear_caches(); - } else { - unreachable!(); - } - - if starting_indexes.txoutindex != TxOutIndex::ZERO { - let txindex_to_first_txoutindex_reader = vecs.transactions.first_txoutindex.reader(); - let txoutindex_to_outputtype_reader = vecs.outputs.outputtype.reader(); - let txoutindex_to_typeindex_reader = vecs.outputs.typeindex.reader(); - - // Collect unique (addresstype, addressindex, txindex) to avoid double deletion - // when same address receives multiple outputs in same transaction - let mut addressindex_txindex_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> = - FxHashSet::default(); - - let rollback_start = starting_indexes.txoutindex.to_usize(); - let rollback_end = vecs.outputs.outputtype.len(); - - // Pre-collect PcoVec range to avoid per-element page decompression - let txindexes: Vec = - vecs.outputs.txindex.collect_range(rollback_start, rollback_end); - - for (i, txoutindex) in (rollback_start..rollback_end).enumerate() { - let outputtype = txoutindex_to_outputtype_reader.get(txoutindex); - if !outputtype.is_address() { - continue; - } - - let addresstype = outputtype; - let addressindex = txoutindex_to_typeindex_reader.get(txoutindex); - let txindex = txindexes[i]; - - addressindex_txindex_to_remove.insert((addresstype, addressindex, txindex)); - - let vout = Vout::from( - txoutindex - - txindex_to_first_txoutindex_reader - .get(txindex.to_usize()) - .to_usize(), - ); - let outpoint = OutPoint::new(txindex, vout); - - // OutPoints are unique per output, no dedup needed - self.addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .remove(AddressIndexOutPoint::from((addressindex, outpoint))); - } - - // Don't remove yet - merge with second loop's set first - - // Collect outputs that were spent after the rollback point - // We need to: 1) reset their spend status, 2) restore address stores - let start = starting_indexes.txinindex.to_usize(); - let end = vecs.inputs.outpoint.len(); - let outpoints: Vec = vecs.inputs.outpoint.collect_range(start, end); - let spending_txindexes: Vec = vecs.inputs.txindex.collect_range(start, end); - - let outputs_to_unspend: Vec<_> = outpoints - .into_iter() - .zip(spending_txindexes) - .filter_map(|(outpoint, spending_txindex)| { - if outpoint.is_coinbase() { - return None; - } - - let output_txindex = outpoint.txindex(); - let vout = outpoint.vout(); - - // Calculate txoutindex from output's txindex and vout - let txoutindex = - txindex_to_first_txoutindex_reader.get(output_txindex.to_usize()) + vout; - - // Only process if this output was created before the rollback point - if txoutindex < starting_indexes.txoutindex { - let outputtype = - txoutindex_to_outputtype_reader.get(txoutindex.to_usize()); - let typeindex = txoutindex_to_typeindex_reader.get(txoutindex.to_usize()); - - Some((outpoint, outputtype, typeindex, spending_txindex)) - } else { - None - } - }) - .collect(); - - // Now process the collected outputs (iterators dropped, can mutate vecs) - // Add spending tx entries to the same set (avoid double deletion when same tx - // both creates output to address A and spends output from address A) - for (outpoint, outputtype, typeindex, spending_txindex) in outputs_to_unspend { - // Restore address stores if this is an address output - if outputtype.is_address() { - let addresstype = outputtype; - let addressindex = typeindex; - - // Add to same set as first loop - addressindex_txindex_to_remove.insert(( - addresstype, - addressindex, - spending_txindex, - )); - - // OutPoints are unique, no dedup needed for insert - self.addresstype_to_addressindex_and_unspentoutpoint - .get_mut_unwrap(addresstype) - .insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit); - } - } - - // Now remove all deduplicated addressindex_txindex entries (from both loops) - for (addresstype, addressindex, txindex) in addressindex_txindex_to_remove { - self.addresstype_to_addressindex_and_txindex - .get_mut_unwrap(addresstype) - .remove(AddressIndexTxIndex::from((addressindex, txindex))); - } - } else { - unreachable!(); - } - - // Force-lower the height on all stores before committing. - // This is necessary because commit() only updates the height if needed, - // but during rollback we must lower it even if it's already higher. - let rollback_height = starting_indexes.height.decremented().unwrap_or_default(); - self.par_iter_any_mut() - .try_for_each(|store| store.export_meta(rollback_height))?; - - self.commit(rollback_height)?; - Ok(()) } + fn rollback_txids(&mut self, vecs: &mut Vecs, starting_indexes: &Indexes) { + let start = starting_indexes.txindex.to_usize(); + let end = vecs.transactions.txid.len(); + let mut current_index = start; + vecs.transactions.txid.for_each_range(start, end, |txid| { + let txindex = TxIndex::from(current_index); + let txidprefix = TxidPrefix::from(&txid); + + let is_known_dup = DUPLICATE_TXID_PREFIXES + .iter() + .any(|(dup_prefix, dup_txindex)| { + txindex == *dup_txindex && txidprefix == *dup_prefix + }); + + if !is_known_dup { + self.txidprefix_to_txindex.remove(txidprefix); + } + current_index += 1; + }); + + self.txidprefix_to_txindex.clear_caches(); + } + + fn rollback_outputs_and_inputs(&mut self, vecs: &mut Vecs, starting_indexes: &Indexes) { + let txindex_to_first_txoutindex_reader = vecs.transactions.first_txoutindex.reader(); + let txoutindex_to_outputtype_reader = vecs.outputs.outputtype.reader(); + let txoutindex_to_typeindex_reader = vecs.outputs.typeindex.reader(); + + let mut addressindex_txindex_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> = + FxHashSet::default(); + + let rollback_start = starting_indexes.txoutindex.to_usize(); + let rollback_end = vecs.outputs.outputtype.len(); + + let txindexes: Vec = + vecs.outputs.txindex.collect_range(rollback_start, rollback_end); + + for (i, txoutindex) in (rollback_start..rollback_end).enumerate() { + let outputtype = txoutindex_to_outputtype_reader.get(txoutindex); + if !outputtype.is_address() { + continue; + } + + let addresstype = outputtype; + let addressindex = txoutindex_to_typeindex_reader.get(txoutindex); + let txindex = txindexes[i]; + + addressindex_txindex_to_remove.insert((addresstype, addressindex, txindex)); + + let vout = Vout::from( + txoutindex + - txindex_to_first_txoutindex_reader + .get(txindex.to_usize()) + .to_usize(), + ); + let outpoint = OutPoint::new(txindex, vout); + + self.addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) + .remove(AddressIndexOutPoint::from((addressindex, outpoint))); + } + + let start = starting_indexes.txinindex.to_usize(); + let end = vecs.inputs.outpoint.len(); + let outpoints: Vec = vecs.inputs.outpoint.collect_range(start, end); + let spending_txindexes: Vec = vecs.inputs.txindex.collect_range(start, end); + + let outputs_to_unspend: Vec<_> = outpoints + .into_iter() + .zip(spending_txindexes) + .filter_map(|(outpoint, spending_txindex)| { + if outpoint.is_coinbase() { + return None; + } + + let output_txindex = outpoint.txindex(); + let vout = outpoint.vout(); + let txoutindex = + txindex_to_first_txoutindex_reader.get(output_txindex.to_usize()) + vout; + + if txoutindex < starting_indexes.txoutindex { + let outputtype = txoutindex_to_outputtype_reader.get(txoutindex.to_usize()); + let typeindex = txoutindex_to_typeindex_reader.get(txoutindex.to_usize()); + Some((outpoint, outputtype, typeindex, spending_txindex)) + } else { + None + } + }) + .collect(); + + for (outpoint, outputtype, typeindex, spending_txindex) in outputs_to_unspend { + if outputtype.is_address() { + let addresstype = outputtype; + let addressindex = typeindex; + + addressindex_txindex_to_remove.insert(( + addresstype, + addressindex, + spending_txindex, + )); + + self.addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) + .insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit); + } + } + + for (addresstype, addressindex, txindex) in addressindex_txindex_to_remove { + self.addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) + .remove(AddressIndexTxIndex::from((addressindex, txindex))); + } + } + pub fn reset(&mut self) -> Result<()> { info!("Resetting stores..."); diff --git a/crates/brk_indexer/src/vecs/addresses.rs b/crates/brk_indexer/src/vecs/addresses.rs index eecf0b95b..7b89d2237 100644 --- a/crates/brk_indexer/src/vecs/addresses.rs +++ b/crates/brk_indexer/src/vecs/addresses.rs @@ -12,7 +12,7 @@ use vecdb::{ Stamp, VecIndex, }; -use crate::AddressReaders; +use crate::readers::AddressReaders; use crate::parallel_import; #[derive(Clone, Traversable)] diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 46b4f060f..6d8597e4e 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -2,12 +2,10 @@ use std::path::Path; use brk_error::Result; use brk_traversable::Traversable; -use brk_types::{AddressBytes, AddressHash, Height, OutputType, TypeIndex, Version}; +use brk_types::{AddressHash, Height, OutputType, Version}; use rayon::prelude::*; use vecdb::{AnyStoredVec, Database, Stamp}; -use crate::AddressReaders; - const PAGE_SIZE: usize = 4096; use crate::parallel_import; @@ -47,46 +45,14 @@ impl Vecs { tracing::debug!("Setting min len..."); db.set_min_len(PAGE_SIZE * 50_000_000)?; - tracing::debug!("Importing sub-vecs in parallel..."); let (blocks, transactions, inputs, outputs, addresses, scripts) = parallel_import! { - blocks = { - tracing::debug!("Importing BlocksVecs..."); - let r = BlocksVecs::forced_import(&db, version); - tracing::debug!("BlocksVecs imported."); - r - }, - transactions = { - tracing::debug!("Importing TransactionsVecs..."); - let r = TransactionsVecs::forced_import(&db, version); - tracing::debug!("TransactionsVecs imported."); - r - }, - inputs = { - tracing::debug!("Importing InputsVecs..."); - let r = InputsVecs::forced_import(&db, version); - tracing::debug!("InputsVecs imported."); - r - }, - outputs = { - tracing::debug!("Importing OutputsVecs..."); - let r = OutputsVecs::forced_import(&db, version); - tracing::debug!("OutputsVecs imported."); - r - }, - addresses = { - tracing::debug!("Importing AddressesVecs..."); - let r = AddressesVecs::forced_import(&db, version); - tracing::debug!("AddressesVecs imported."); - r - }, - scripts = { - tracing::debug!("Importing ScriptsVecs..."); - let r = ScriptsVecs::forced_import(&db, version); - tracing::debug!("ScriptsVecs imported."); - r - }, + blocks = BlocksVecs::forced_import(&db, version), + transactions = TransactionsVecs::forced_import(&db, version), + inputs = InputsVecs::forced_import(&db, version), + outputs = OutputsVecs::forced_import(&db, version), + addresses = AddressesVecs::forced_import(&db, version), + scripts = ScriptsVecs::forced_import(&db, version), }; - tracing::debug!("Sub-vecs imported."); let this = Self { db, @@ -148,20 +114,6 @@ impl Vecs { Ok(()) } - pub fn get_addressbytes_by_type( - &self, - addresstype: OutputType, - typeindex: TypeIndex, - readers: &AddressReaders, - ) -> Option { - self.addresses - .get_bytes_by_type(addresstype, typeindex, readers) - } - - pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> { - self.addresses.push_bytes_if_needed(index, bytes) - } - pub fn flush(&mut self, height: Height) -> Result<()> { self.par_iter_mut_any_stored_vec() .try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?; diff --git a/crates/brk_oracle/Cargo.toml b/crates/brk_oracle/Cargo.toml index b202f8380..08d719575 100644 --- a/crates/brk_oracle/Cargo.toml +++ b/crates/brk_oracle/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true license.workspace = true homepage.workspace = true repository.workspace = true +exclude = ["examples/"] [dependencies] brk_types = { workspace = true } diff --git a/crates/brk_oracle/README.md b/crates/brk_oracle/README.md index 3345c138f..055762380 100644 --- a/crates/brk_oracle/README.md +++ b/crates/brk_oracle/README.md @@ -143,7 +143,7 @@ All parameters via `Config` with sensible defaults: ## Accuracy -Tested over 361,245 blocks (heights 575,000 to 936,244) against exchange OHLC data. Error is measured per block as distance from the oracle estimate to the exchange high/low range at that height. If the oracle falls within the range, the error is zero. +Tested over 361,245 blocks (heights 575,000 to 936,244, as of February 2026) against exchange OHLC data. Error is measured per block as distance from the oracle estimate to the exchange high/low range at that height. If the oracle falls within the range, the error is zero. ### Per-block diff --git a/crates/brk_oracle/examples/noise.rs b/crates/brk_oracle/examples/noise.rs new file mode 100644 index 000000000..6f06163dc --- /dev/null +++ b/crates/brk_oracle/examples/noise.rs @@ -0,0 +1,283 @@ +//! Diagnostic: sweep oracle start heights and clamp-top-N strategies. +//! +//! Run with: cargo run -p brk_oracle --example noise --release + +use std::path::PathBuf; +use std::time::Instant; + +use brk_indexer::Indexer; +use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, cents_to_bin, sats_to_bin}; +use brk_types::{Sats, TxIndex, TxOutIndex}; +use vecdb::{AnyVec, ReadableVec, VecIndex}; + +const BINS_5PCT: f64 = 4.24; +const BINS_10PCT: f64 = 8.28; +const BINS_20PCT: f64 = 15.84; +const BPD: f64 = 200.0; + +fn bins_to_pct(bins: f64) -> f64 { + (10.0_f64.powf(bins / BPD) - 1.0) * 100.0 +} + +fn price_seed_bin(start_height: usize) -> f64 { + let price: f64 = PRICES + .lines() + .nth(start_height - 1) + .expect("prices.txt too short") + .parse() + .expect("Failed to parse seed price"); + cents_to_bin(price * 100.0) +} + +/// Clamp the top N bins in `src` down to the (N+1)th highest value, writing into `dst`. +fn clamp_top_n(src: &[u32; NUM_BINS], dst: &mut [u32; NUM_BINS], n: usize) { + // Find the (n+1)th largest value. + // Collect non-zero counts, sort descending, take the (n+1)th. + let mut top: Vec = src.iter().copied().filter(|&v| v > 0).collect(); + top.sort_unstable_by(|a, b| b.cmp(a)); + let clamp_to = if top.len() > n { top[n] } else { 0 }; + + for (i, &v) in src.iter().enumerate() { + dst[i] = v.min(clamp_to.max(v.min(clamp_to))); + } +} + +fn main() { + let t0 = Instant::now(); + + let data_dir = std::env::var("BRK_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| { + let home = std::env::var("HOME").unwrap(); + PathBuf::from(home).join(".brk") + }); + + let indexer = Indexer::forced_import(&data_dir).expect("Failed to load indexer"); + let total_heights = indexer.vecs.blocks.timestamp.len(); + + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + + let height_ohlc: Vec<[f64; 4]> = serde_json::from_str( + &std::fs::read_to_string(format!("{manifest_dir}/examples/height_price_ohlc.json")) + .expect("Failed to read height_price_ohlc.json"), + ) + .expect("Failed to parse height OHLC"); + + let height_bands: Vec<(f64, f64)> = height_ohlc + .iter() + .map(|ohlc| { + let high = ohlc[1]; + let low = ohlc[2]; + if high > 0.0 && low > 0.0 { + (cents_to_bin(high * 100.0), cents_to_bin(low * 100.0)) + } else { + (0.0, 0.0) + } + }) + .collect(); + + // Start heights: 630k, 600k, 575k, then 570k down to 500k by 5k. + let mut start_heights: Vec = vec![630_000, 600_000, 575_000]; + let mut h = 570_000; + while h >= 500_000 { + start_heights.push(h); + h -= 5_000; + } + let lowest = *start_heights.iter().min().unwrap(); + + // Clamp-top-N values to test: 0 (no clamp), 2, 3, 5, 10. + let clamp_values: Vec = vec![0, 2, 3, 5, 10]; + + // Build per-block RAW histograms from the lowest start height. + eprintln!("Building histograms from height {}...", lowest); + + let total_txs = indexer.vecs.transactions.height.len(); + let total_outputs = indexer.vecs.outputs.value.len(); + + let first_txoutindex_reader = indexer.vecs.transactions.first_txoutindex.reader(); + let value_reader = indexer.vecs.outputs.value.reader(); + let outputtype_reader = indexer.vecs.outputs.outputtype.reader(); + + let config = Config::default(); + let total_blocks = total_heights - lowest; + + struct BlockData { + hist: Box<[u32; NUM_BINS]>, + high_bin: f64, + low_bin: f64, + } + + let mut blocks: Vec = Vec::with_capacity(total_blocks); + + for h in lowest..total_heights { + let first_txindex: TxIndex = indexer + .vecs + .transactions + .first_txindex + .collect_one(h) + .unwrap(); + let next_first_txindex: TxIndex = indexer + .vecs + .transactions + .first_txindex + .collect_one(h + 1) + .unwrap_or(TxIndex::from(total_txs)); + + let out_start = if first_txindex.to_usize() + 1 < next_first_txindex.to_usize() { + first_txoutindex_reader + .get(first_txindex.to_usize() + 1) + .to_usize() + } else { + indexer + .vecs + .outputs + .first_txoutindex + .collect_one(h + 1) + .unwrap_or(TxOutIndex::from(total_outputs)) + .to_usize() + }; + let out_end: usize = indexer + .vecs + .outputs + .first_txoutindex + .collect_one(h + 1) + .unwrap_or(TxOutIndex::from(total_outputs)) + .to_usize(); + + let mut hist = Box::new([0u32; NUM_BINS]); + for i in out_start..out_end { + let sats: Sats = value_reader.get(i); + let output_type = outputtype_reader.get(i); + if config.excluded_output_types.contains(&output_type) { + continue; + } + if *sats < config.min_sats { + continue; + } + if config.exclude_common_round_values && sats.is_common_round_value() { + continue; + } + if let Some(bin) = sats_to_bin(sats) { + hist[bin] += 1; + } + } + + let (high_bin, low_bin) = if h < height_bands.len() { + height_bands[h] + } else { + (0.0, 0.0) + }; + + blocks.push(BlockData { + hist, + high_bin, + low_bin, + }); + + if (h - lowest) % 50_000 == 0 { + eprint!( + "\r {}/{} ({:.0}%)", + h - lowest, + total_blocks, + (h - lowest) as f64 / total_blocks as f64 * 100.0 + ); + } + } + + eprintln!( + "\r {} blocks built in {:.1}s", + blocks.len(), + t0.elapsed().as_secs_f64() + ); + + // For each clamp value, run all start heights. + for &clamp_n in &clamp_values { + println!(); + let label = if clamp_n == 0 { + "no clamp".to_string() + } else { + format!("clamp top {}", clamp_n) + }; + println!("=== {} ===", label); + println!( + "{:>8} {:>8} {:>8} {:>8} {:>6} {:>6} {:>6} {:>8}", + "Start", "Blocks", "RMSE%", "Worst%", ">5%", ">10%", ">20%", "Worst@" + ); + println!("{}", "-".repeat(72)); + + for &start_height in &start_heights { + let mut oracle = Oracle::new(price_seed_bin(start_height), config.clone()); + let block_offset = start_height - lowest; + + let mut worst_err: f64 = 0.0; + let mut worst_height: usize = 0; + let mut gt_5: u64 = 0; + let mut gt_10: u64 = 0; + let mut gt_20: u64 = 0; + let mut total_sq_err: f64 = 0.0; + let mut total_measured: u64 = 0; + + let mut clamped_hist = [0u32; NUM_BINS]; + for (i, bd) in blocks[block_offset..].iter().enumerate() { + if clamp_n > 0 { + clamp_top_n(&bd.hist, &mut clamped_hist, clamp_n); + oracle.process_histogram(&clamped_hist); + } else { + oracle.process_histogram(&bd.hist); + } + + let height = start_height + i; + let ref_bin = oracle.ref_bin(); + + if bd.high_bin <= 0.0 || bd.low_bin <= 0.0 { + continue; + } + + let err = if ref_bin < bd.high_bin { + ref_bin - bd.high_bin + } else if ref_bin > bd.low_bin { + ref_bin - bd.low_bin + } else { + 0.0 + }; + + total_measured += 1; + total_sq_err += err * err; + let abs_err = err.abs(); + if abs_err > BINS_5PCT { + gt_5 += 1; + } + if abs_err > BINS_10PCT { + gt_10 += 1; + } + if abs_err > BINS_20PCT { + gt_20 += 1; + } + if abs_err > worst_err { + worst_err = abs_err; + worst_height = height; + } + } + + let rmse = if total_measured > 0 { + bins_to_pct((total_sq_err / total_measured as f64).sqrt()) + } else { + 0.0 + }; + + println!( + "{:>8} {:>8} {:>7.3}% {:>7.1}% {:>6} {:>6} {:>6} {}", + format!("{}k", start_height / 1000), + total_measured, + rmse, + bins_to_pct(worst_err), + gt_5, + gt_10, + gt_20, + worst_height, + ); + } + } + + println!("\nTotal time: {:.1}s", t0.elapsed().as_secs_f64()); +} diff --git a/crates/brk_oracle/src/lib.rs b/crates/brk_oracle/src/lib.rs index db7a81115..5e2374621 100644 --- a/crates/brk_oracle/src/lib.rs +++ b/crates/brk_oracle/src/lib.rs @@ -3,13 +3,13 @@ //! Detects round-dollar transaction patterns ($1, $5, $10, ... $10,000) in Bitcoin //! block outputs to derive the current price without any exchange data. -use brk_types::{Block, CentsUnsigned, Dollars, OutputType, Sats}; +use brk_types::{Block, Cents, Dollars, OutputType, Sats}; /// Pre-oracle dollar prices, one per line, heights 0..630_000. pub const PRICES: &str = include_str!("prices.txt"); /// First height where the oracle computes from on-chain data. -pub const START_HEIGHT: usize = 575_000; +pub const START_HEIGHT: usize = 550_000; pub const BINS_PER_DECADE: usize = 200; const MIN_LOG_BTC: i32 = -8; @@ -120,8 +120,16 @@ fn find_best_bin( // Parabolic sub-bin interpolation for fractional precision. let score_center = best_score; - let score_left = if best_bin > search_start { score(best_bin - 1) } else { score_center }; - let score_right = if best_bin + 1 < search_end { score(best_bin + 1) } else { score_center }; + let score_left = if best_bin > search_start { + score(best_bin - 1) + } else { + score_center + }; + let score_right = if best_bin + 1 < search_end { + score(best_bin + 1) + } else { + score_center + }; let denom = score_left - 2.0 * score_center + score_right; let sub_bin = if denom.abs() > 1e-10 { (0.5 * (score_left - score_right) / denom).clamp(-0.5, 0.5) @@ -207,7 +215,12 @@ impl Oracle { .iter() .skip(1) // skip coinbase .flat_map(|tx| &tx.output) - .map(|txout| (Sats::from(txout.value), OutputType::from(&txout.script_pubkey))), + .map(|txout| { + ( + Sats::from(txout.value), + OutputType::from(&txout.script_pubkey), + ) + }), ) } @@ -242,7 +255,7 @@ impl Oracle { self.ref_bin } - pub fn price_cents(&self) -> CentsUnsigned { + pub fn price_cents(&self) -> Cents { bin_to_cents(self.ref_bin).into() } @@ -291,13 +304,12 @@ impl Oracle { fn recompute_ema(&mut self) { self.ema.fill(0.0); for age in 0..self.filled { - let idx = - (self.cursor + self.config.window_size - 1 - age) % self.config.window_size; + let idx = (self.cursor + self.config.window_size - 1 - age) % self.config.window_size; let weight = self.weights[age]; let h = &self.histograms[idx]; - for bin in 0..NUM_BINS { + (0..NUM_BINS).for_each(|bin| { self.ema[bin] += weight * h[bin] as f64; - } + }); } } }