global: snapshot

This commit is contained in:
nym21
2026-02-19 19:19:35 +01:00
parent f559e4027e
commit 2128aab6ca
16 changed files with 534 additions and 349 deletions

View File

@@ -36,8 +36,6 @@ fn main() -> color_eyre::Result<()> {
let blocks = Blocks::new(&client, &reader); let blocks = Blocks::new(&client, &reader);
debug!("Blocks created."); debug!("Blocks created.");
fs::create_dir_all(&outputs_dir)?;
let mut indexer = Indexer::forced_import(&outputs_dir)?; let mut indexer = Indexer::forced_import(&outputs_dir)?;
debug!("Indexer imported."); debug!("Indexer imported.");

View File

@@ -35,8 +35,6 @@ fn main() -> Result<()> {
let blocks = Blocks::new(&client, &reader); let blocks = Blocks::new(&client, &reader);
fs::create_dir_all(&outputs_dir)?;
let mut indexer = Indexer::forced_import(&outputs_dir)?; let mut indexer = Indexer::forced_import(&outputs_dir)?;
let mut bencher = let mut bencher =

View File

@@ -1,8 +1,8 @@
use std::{fs, path::Path};
use brk_error::Result; use brk_error::Result;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use vecdb::ReadableVec; use vecdb::ReadableVec;
// use brk_types::Sats;
use std::{fs, path::Path};
fn main() -> Result<()> { fn main() -> Result<()> {
brk_logger::init(Some(Path::new(".log")))?; brk_logger::init(Some(Path::new(".log")))?;
@@ -12,17 +12,7 @@ fn main() -> Result<()> {
let indexer = Indexer::forced_import(&outputs_dir)?; let indexer = Indexer::forced_import(&outputs_dir)?;
// let mut sum = Sats::ZERO; println!("{:?}", indexer.vecs.outputs.value.collect_range(0, 200));
// let mut count: usize = 0;
// for value in indexer.vecs.txoutindex_to_value.clean_iter() {
// sum += value;
// count += 1;
// }
// println!("sum = {sum}, count = {count}");
dbg!(indexer.vecs.outputs.value.collect_range(0, 200));
Ok(()) Ok(())
} }

View File

@@ -31,7 +31,7 @@ fn main() -> Result<()> {
println!("Loading indexer from: {}", outputs_dir.display()); println!("Loading indexer from: {}", outputs_dir.display());
let indexer = Indexer::forced_import(&outputs_dir)?; let indexer = Indexer::forced_import(&outputs_dir)?;
println!("Indexer funded\n"); println!("Indexer loaded.\n");
// Warmup run // Warmup run
println!("🔥 Warmup run..."); println!("🔥 Warmup run...");

View File

@@ -1,12 +1,10 @@
use brk_error::Result; use brk_error::Result;
use brk_types::Height; use brk_types::{Height, Indexes};
use tracing::{debug, info}; use tracing::{debug, info};
use vecdb::{AnyStoredVec, WritableVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue}; use vecdb::{AnyStoredVec, WritableVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue};
use crate::{Stores, Vecs}; use crate::{Stores, Vecs};
pub use brk_types::Indexes;
/// Extension trait for Indexes with brk_indexer-specific functionality. /// Extension trait for Indexes with brk_indexer-specific functionality.
pub trait IndexesExt { pub trait IndexesExt {
fn checked_push(&self, vecs: &mut Vecs) -> Result<()>; fn checked_push(&self, vecs: &mut Vecs) -> Result<()>;
@@ -93,98 +91,85 @@ impl IndexesExt for Indexes {
&vecs.scripts.first_emptyoutputindex, &vecs.scripts.first_emptyoutputindex,
&vecs.scripts.empty_to_txindex, &vecs.scripts.empty_to_txindex,
starting_height, starting_height,
) )?;
.unwrap();
let p2msoutputindex = starting_index( let p2msoutputindex = starting_index(
&vecs.scripts.first_p2msoutputindex, &vecs.scripts.first_p2msoutputindex,
&vecs.scripts.p2ms_to_txindex, &vecs.scripts.p2ms_to_txindex,
starting_height, starting_height,
) )?;
.unwrap();
let opreturnindex = starting_index( let opreturnindex = starting_index(
&vecs.scripts.first_opreturnindex, &vecs.scripts.first_opreturnindex,
&vecs.scripts.opreturn_to_txindex, &vecs.scripts.opreturn_to_txindex,
starting_height, starting_height,
) )?;
.unwrap();
let p2pk33addressindex = starting_index( let p2pk33addressindex = starting_index(
&vecs.addresses.first_p2pk33addressindex, &vecs.addresses.first_p2pk33addressindex,
&vecs.addresses.p2pk33bytes, &vecs.addresses.p2pk33bytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2pk65addressindex = starting_index( let p2pk65addressindex = starting_index(
&vecs.addresses.first_p2pk65addressindex, &vecs.addresses.first_p2pk65addressindex,
&vecs.addresses.p2pk65bytes, &vecs.addresses.p2pk65bytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2pkhaddressindex = starting_index( let p2pkhaddressindex = starting_index(
&vecs.addresses.first_p2pkhaddressindex, &vecs.addresses.first_p2pkhaddressindex,
&vecs.addresses.p2pkhbytes, &vecs.addresses.p2pkhbytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2shaddressindex = starting_index( let p2shaddressindex = starting_index(
&vecs.addresses.first_p2shaddressindex, &vecs.addresses.first_p2shaddressindex,
&vecs.addresses.p2shbytes, &vecs.addresses.p2shbytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2traddressindex = starting_index( let p2traddressindex = starting_index(
&vecs.addresses.first_p2traddressindex, &vecs.addresses.first_p2traddressindex,
&vecs.addresses.p2trbytes, &vecs.addresses.p2trbytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2wpkhaddressindex = starting_index( let p2wpkhaddressindex = starting_index(
&vecs.addresses.first_p2wpkhaddressindex, &vecs.addresses.first_p2wpkhaddressindex,
&vecs.addresses.p2wpkhbytes, &vecs.addresses.p2wpkhbytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2wshaddressindex = starting_index( let p2wshaddressindex = starting_index(
&vecs.addresses.first_p2wshaddressindex, &vecs.addresses.first_p2wshaddressindex,
&vecs.addresses.p2wshbytes, &vecs.addresses.p2wshbytes,
starting_height, starting_height,
) )?;
.unwrap();
let p2aaddressindex = starting_index( let p2aaddressindex = starting_index(
&vecs.addresses.first_p2aaddressindex, &vecs.addresses.first_p2aaddressindex,
&vecs.addresses.p2abytes, &vecs.addresses.p2abytes,
starting_height, starting_height,
) )?;
.unwrap();
let txindex = starting_index( let txindex = starting_index(
&vecs.transactions.first_txindex, &vecs.transactions.first_txindex,
&vecs.transactions.txid, &vecs.transactions.txid,
starting_height, starting_height,
) )?;
.unwrap();
let txinindex = let txinindex =
starting_index(&vecs.inputs.first_txinindex, &vecs.inputs.outpoint, starting_height).unwrap(); starting_index(&vecs.inputs.first_txinindex, &vecs.inputs.outpoint, starting_height)?;
let txoutindex = let txoutindex =
starting_index(&vecs.outputs.first_txoutindex, &vecs.outputs.value, starting_height).unwrap(); starting_index(&vecs.outputs.first_txoutindex, &vecs.outputs.value, starting_height)?;
let unknownoutputindex = starting_index( let unknownoutputindex = starting_index(
&vecs.scripts.first_unknownoutputindex, &vecs.scripts.first_unknownoutputindex,
&vecs.scripts.unknown_to_txindex, &vecs.scripts.unknown_to_txindex,
starting_height, starting_height,
) )?;
.unwrap();
Some(Indexes { Some(Indexes {
emptyoutputindex, emptyoutputindex,

View File

@@ -16,10 +16,12 @@ mod stores;
mod vecs; mod vecs;
use constants::*; use constants::*;
pub use indexes::*; use indexes::IndexesExt;
pub use processor::*; use processor::{BlockBuffers, BlockProcessor};
pub use readers::*; use readers::Readers;
pub use stores::*;
pub use brk_types::Indexes;
pub use stores::Stores;
pub use vecs::*; pub use vecs::*;
#[derive(Clone)] #[derive(Clone)]
@@ -136,9 +138,8 @@ impl Indexer {
let mut indexes = starting_indexes.clone(); let mut indexes = starting_indexes.clone();
debug!("Indexes cloned."); debug!("Indexes cloned.");
let is_export_height = |height: Height| -> bool { let is_export_height =
height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 |height: Height| -> bool { height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 };
};
let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> { let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> {
info!("Exporting..."); info!("Exporting...");

View File

@@ -2,6 +2,7 @@ use brk_error::{Error, Result};
use brk_store::Store; use brk_store::Store;
use brk_types::{Height, StoredBool, TxIndex, Txid, TxidPrefix}; use brk_types::{Height, StoredBool, TxIndex, Txid, TxidPrefix};
use rayon::prelude::*; use rayon::prelude::*;
use tracing::error;
use vecdb::{AnyVec, WritableVec, likely}; use vecdb::{AnyVec, WritableVec, likely};
use crate::TxMetadataVecs; use crate::TxMetadataVecs;
@@ -69,13 +70,17 @@ impl<'a> BlockProcessor<'a> {
.get_pushed_or_read(prev_txindex, &self.readers.txid) .get_pushed_or_read(prev_txindex, &self.readers.txid)
.ok_or(Error::Internal("Missing txid for txindex")) .ok_or(Error::Internal("Missing txid for txindex"))
.inspect_err(|_| { .inspect_err(|_| {
dbg!(ct.txindex, len); error!(txindex = ?ct.txindex, len, "Missing txid for txindex");
})?; })?;
let is_dup = DUPLICATE_TXIDS.contains(&prev_txid); let is_dup = DUPLICATE_TXIDS.contains(&prev_txid);
if !is_dup { if !is_dup {
dbg!(self.height, ct.txindex, prev_txid, prev_txindex); error!(
height = ?self.height, txindex = ?ct.txindex,
?prev_txid, ?prev_txindex,
"Unexpected TXID collision"
);
return Err(Error::Internal("Unexpected TXID collision")); return Err(Error::Internal("Unexpected TXID collision"));
} }
} }

View File

@@ -7,6 +7,7 @@ use brk_types::{
}; };
use rayon::prelude::*; use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet}; use rustc_hash::{FxHashMap, FxHashSet};
use tracing::error;
use vecdb::{PcoVec, WritableVec}; use vecdb::{PcoVec, WritableVec};
use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo}; use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo};
@@ -80,7 +81,7 @@ impl<'a> BlockProcessor<'a> {
let prev_txindex = match store_result { let prev_txindex = match store_result {
Some(txindex) if txindex < self.indexes.txindex => txindex, Some(txindex) if txindex < self.indexes.txindex => txindex,
_ => { _ => {
tracing::error!( error!(
"UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}", "UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_txindex={:?}",
txid, txid_prefix, store_result, self.indexes.txindex txid, txid_prefix, store_result, self.indexes.txindex
); );
@@ -170,7 +171,7 @@ pub(super) fn finalize_inputs(
.remove(&outpoint) .remove(&outpoint)
.ok_or(Error::Internal("Same-block output not found")) .ok_or(Error::Internal("Same-block output not found"))
.inspect_err(|_| { .inspect_err(|_| {
dbg!(&same_block_output_info, outpoint); error!(?outpoint, remaining = same_block_output_info.len(), "Same-block output not found");
})?; })?;
(vin, txindex, outpoint, info.outputtype, info.typeindex) (vin, txindex, outpoint, info.outputtype, info.typeindex)
} }

View File

@@ -6,6 +6,7 @@ use brk_types::{
Sats, TxIndex, TxOutIndex, TypeIndex, Unit, Vout, Sats, TxIndex, TxOutIndex, TypeIndex, Unit, Vout,
}; };
use rayon::prelude::*; use rayon::prelude::*;
use tracing::error;
use rustc_hash::{FxHashMap, FxHashSet}; use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::{BytesVec, WritableVec}; use vecdb::{BytesVec, WritableVec};
@@ -24,7 +25,7 @@ impl<'a> BlockProcessor<'a> {
let mut items = Vec::with_capacity(total_outputs); let mut items = Vec::with_capacity(total_outputs);
for (index, tx) in self.block.txdata.iter().enumerate() { for (index, tx) in self.block.txdata.iter().enumerate() {
for (vout, txout) in tx.output.iter().enumerate() { for (vout, txout) in tx.output.iter().enumerate() {
items.push((TxIndex::from(index), Vout::from(vout), txout, tx)); items.push((TxIndex::from(index), Vout::from(vout), txout));
} }
} }
@@ -32,7 +33,7 @@ impl<'a> BlockProcessor<'a> {
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
.map( .map(
|(block_txoutindex, (block_txindex, vout, txout, tx))| -> Result<ProcessedOutput> { |(block_txoutindex, (block_txindex, vout, txout))| -> Result<ProcessedOutput> {
let txindex = base_txindex + block_txindex; let txindex = base_txindex + block_txindex;
let txoutindex = base_txoutindex + TxOutIndex::from(block_txoutindex); let txoutindex = base_txoutindex + TxOutIndex::from(block_txoutindex);
@@ -67,7 +68,7 @@ impl<'a> BlockProcessor<'a> {
}); });
if check_collisions && let Some(typeindex) = existing_typeindex { if check_collisions && let Some(typeindex) = existing_typeindex {
let prev_addressbytes = self.vecs.get_addressbytes_by_type( let prev_addressbytes = self.vecs.addresses.get_bytes_by_type(
addresstype, addresstype,
typeindex, typeindex,
&self.readers.addressbytes, &self.readers.addressbytes,
@@ -75,21 +76,12 @@ impl<'a> BlockProcessor<'a> {
.ok_or(Error::Internal("Missing addressbytes"))?; .ok_or(Error::Internal("Missing addressbytes"))?;
if prev_addressbytes != address_bytes { if prev_addressbytes != address_bytes {
let txid = tx.compute_txid(); error!(
dbg!( ?height, ?vout, ?block_txindex, ?addresstype,
height, ?prev_addressbytes, ?address_bytes, ?typeindex,
txid, "Address hash collision"
vout,
block_txindex,
addresstype,
prev_addressbytes,
&address_bytes,
&self.indexes,
typeindex,
txout,
AddressHash::from(&address_bytes),
); );
panic!() return Err(Error::Internal("Address hash collision"));
} }
} }

View File

@@ -33,6 +33,10 @@ pub struct Stores {
impl Stores { impl Stores {
pub fn forced_import(parent: &Path, version: Version) -> Result<Self> { pub fn forced_import(parent: &Path, version: Version) -> Result<Self> {
Self::forced_import_inner(parent, version, true)
}
fn forced_import_inner(parent: &Path, version: Version, can_retry: bool) -> Result<Self> {
let pathbuf = parent.join("stores"); let pathbuf = parent.join("stores");
let path = pathbuf.as_path(); let path = pathbuf.as_path();
@@ -40,10 +44,11 @@ impl Stores {
let database = match brk_store::open_database(path) { let database = match brk_store::open_database(path) {
Ok(database) => database, Ok(database) => database,
Err(_) => { Err(_) if can_retry => {
fs::remove_dir_all(path)?; fs::remove_dir_all(path)?;
return Self::forced_import(parent, version); return Self::forced_import_inner(parent, version, false);
} }
Err(err) => return Err(err.into()),
}; };
let database_ref = &database; let database_ref = &database;
@@ -194,7 +199,28 @@ impl Stores {
vecs: &mut Vecs, vecs: &mut Vecs,
starting_indexes: &Indexes, starting_indexes: &Indexes,
) -> Result<()> { ) -> Result<()> {
if self.blockhashprefix_to_height.is_empty()? if self.is_empty()? {
return Ok(());
}
debug_assert!(starting_indexes.height != Height::ZERO);
debug_assert!(starting_indexes.txindex != TxIndex::ZERO);
debug_assert!(starting_indexes.txoutindex != TxOutIndex::ZERO);
self.rollback_block_metadata(vecs, starting_indexes)?;
self.rollback_txids(vecs, starting_indexes);
self.rollback_outputs_and_inputs(vecs, starting_indexes);
let rollback_height = starting_indexes.height.decremented().unwrap_or_default();
self.par_iter_any_mut()
.try_for_each(|store| store.export_meta(rollback_height))?;
self.commit(rollback_height)?;
Ok(())
}
fn is_empty(&self) -> Result<bool> {
Ok(self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()? && self.txidprefix_to_txindex.is_empty()?
&& self.height_to_coinbase_tag.is_empty()? && self.height_to_coinbase_tag.is_empty()?
&& self && self
@@ -208,213 +234,154 @@ impl Stores {
&& self && self
.addresstype_to_addressindex_and_unspentoutpoint .addresstype_to_addressindex_and_unspentoutpoint
.values() .values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?)
{ }
return Ok(());
fn rollback_block_metadata(
&mut self,
vecs: &mut Vecs,
starting_indexes: &Indexes,
) -> Result<()> {
vecs.blocks.blockhash.for_each_range(
starting_indexes.height.to_usize(),
vecs.blocks.blockhash.len(),
|blockhash| {
self.blockhashprefix_to_height
.remove(BlockHashPrefix::from(blockhash));
},
);
(starting_indexes.height.to_usize()..vecs.blocks.blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
});
for address_type in OutputType::ADDRESS_TYPES {
for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? {
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(address_type)
.remove(hash);
}
} }
if starting_indexes.height != Height::ZERO {
vecs.blocks.blockhash.for_each_range(
starting_indexes.height.to_usize(),
vecs.blocks.blockhash.len(),
|blockhash| {
let prefix = BlockHashPrefix::from(blockhash);
self.blockhashprefix_to_height.remove(prefix);
},
);
(starting_indexes.height.to_usize()..vecs.blocks.blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
});
// Remove address hashes for all address types starting from rollback height
// (each address only appears once in bytes vec, so no dedup needed)
for address_type in [
OutputType::P2PK65,
OutputType::P2PK33,
OutputType::P2PKH,
OutputType::P2SH,
OutputType::P2WPKH,
OutputType::P2WSH,
OutputType::P2TR,
OutputType::P2A,
] {
for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? {
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(address_type)
.remove(hash);
}
}
} else {
unreachable!();
}
if starting_indexes.txindex != TxIndex::ZERO {
let txid_vec_len = vecs.transactions.txid.len();
let skip_count = starting_indexes.txindex.to_usize();
let remove_count = txid_vec_len.saturating_sub(skip_count);
tracing::debug!(
"Rollback TXIDs: vec_len={}, skip={}, removing={}",
txid_vec_len,
skip_count,
remove_count
);
{
let start = starting_indexes.txindex.to_usize();
let end = vecs.transactions.txid.len();
let mut current_index = start;
vecs.transactions.txid.for_each_range(start, end, |txid| {
let txindex = TxIndex::from(current_index);
let txidprefix = TxidPrefix::from(&txid);
let is_known_dup =
DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
current_index += 1;
});
}
// Clear caches to prevent stale reads after rollback
self.txidprefix_to_txindex.clear_caches();
} else {
unreachable!();
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
let txindex_to_first_txoutindex_reader = vecs.transactions.first_txoutindex.reader();
let txoutindex_to_outputtype_reader = vecs.outputs.outputtype.reader();
let txoutindex_to_typeindex_reader = vecs.outputs.typeindex.reader();
// Collect unique (addresstype, addressindex, txindex) to avoid double deletion
// when same address receives multiple outputs in same transaction
let mut addressindex_txindex_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> =
FxHashSet::default();
let rollback_start = starting_indexes.txoutindex.to_usize();
let rollback_end = vecs.outputs.outputtype.len();
// Pre-collect PcoVec range to avoid per-element page decompression
let txindexes: Vec<TxIndex> =
vecs.outputs.txindex.collect_range(rollback_start, rollback_end);
for (i, txoutindex) in (rollback_start..rollback_end).enumerate() {
let outputtype = txoutindex_to_outputtype_reader.get(txoutindex);
if !outputtype.is_address() {
continue;
}
let addresstype = outputtype;
let addressindex = txoutindex_to_typeindex_reader.get(txoutindex);
let txindex = txindexes[i];
addressindex_txindex_to_remove.insert((addresstype, addressindex, txindex));
let vout = Vout::from(
txoutindex
- txindex_to_first_txoutindex_reader
.get(txindex.to_usize())
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
// OutPoints are unique per output, no dedup needed
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
}
// Don't remove yet - merge with second loop's set first
// Collect outputs that were spent after the rollback point
// We need to: 1) reset their spend status, 2) restore address stores
let start = starting_indexes.txinindex.to_usize();
let end = vecs.inputs.outpoint.len();
let outpoints: Vec<OutPoint> = vecs.inputs.outpoint.collect_range(start, end);
let spending_txindexes: Vec<TxIndex> = vecs.inputs.txindex.collect_range(start, end);
let outputs_to_unspend: Vec<_> = outpoints
.into_iter()
.zip(spending_txindexes)
.filter_map(|(outpoint, spending_txindex)| {
if outpoint.is_coinbase() {
return None;
}
let output_txindex = outpoint.txindex();
let vout = outpoint.vout();
// Calculate txoutindex from output's txindex and vout
let txoutindex =
txindex_to_first_txoutindex_reader.get(output_txindex.to_usize()) + vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype =
txoutindex_to_outputtype_reader.get(txoutindex.to_usize());
let typeindex = txoutindex_to_typeindex_reader.get(txoutindex.to_usize());
Some((outpoint, outputtype, typeindex, spending_txindex))
} else {
None
}
})
.collect();
// Now process the collected outputs (iterators dropped, can mutate vecs)
// Add spending tx entries to the same set (avoid double deletion when same tx
// both creates output to address A and spends output from address A)
for (outpoint, outputtype, typeindex, spending_txindex) in outputs_to_unspend {
// Restore address stores if this is an address output
if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = typeindex;
// Add to same set as first loop
addressindex_txindex_to_remove.insert((
addresstype,
addressindex,
spending_txindex,
));
// OutPoints are unique, no dedup needed for insert
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit);
}
}
// Now remove all deduplicated addressindex_txindex entries (from both loops)
for (addresstype, addressindex, txindex) in addressindex_txindex_to_remove {
self.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.remove(AddressIndexTxIndex::from((addressindex, txindex)));
}
} else {
unreachable!();
}
// Force-lower the height on all stores before committing.
// This is necessary because commit() only updates the height if needed,
// but during rollback we must lower it even if it's already higher.
let rollback_height = starting_indexes.height.decremented().unwrap_or_default();
self.par_iter_any_mut()
.try_for_each(|store| store.export_meta(rollback_height))?;
self.commit(rollback_height)?;
Ok(()) Ok(())
} }
fn rollback_txids(&mut self, vecs: &mut Vecs, starting_indexes: &Indexes) {
let start = starting_indexes.txindex.to_usize();
let end = vecs.transactions.txid.len();
let mut current_index = start;
vecs.transactions.txid.for_each_range(start, end, |txid| {
let txindex = TxIndex::from(current_index);
let txidprefix = TxidPrefix::from(&txid);
let is_known_dup = DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
current_index += 1;
});
self.txidprefix_to_txindex.clear_caches();
}
fn rollback_outputs_and_inputs(&mut self, vecs: &mut Vecs, starting_indexes: &Indexes) {
let txindex_to_first_txoutindex_reader = vecs.transactions.first_txoutindex.reader();
let txoutindex_to_outputtype_reader = vecs.outputs.outputtype.reader();
let txoutindex_to_typeindex_reader = vecs.outputs.typeindex.reader();
let mut addressindex_txindex_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> =
FxHashSet::default();
let rollback_start = starting_indexes.txoutindex.to_usize();
let rollback_end = vecs.outputs.outputtype.len();
let txindexes: Vec<TxIndex> =
vecs.outputs.txindex.collect_range(rollback_start, rollback_end);
for (i, txoutindex) in (rollback_start..rollback_end).enumerate() {
let outputtype = txoutindex_to_outputtype_reader.get(txoutindex);
if !outputtype.is_address() {
continue;
}
let addresstype = outputtype;
let addressindex = txoutindex_to_typeindex_reader.get(txoutindex);
let txindex = txindexes[i];
addressindex_txindex_to_remove.insert((addresstype, addressindex, txindex));
let vout = Vout::from(
txoutindex
- txindex_to_first_txoutindex_reader
.get(txindex.to_usize())
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
}
let start = starting_indexes.txinindex.to_usize();
let end = vecs.inputs.outpoint.len();
let outpoints: Vec<OutPoint> = vecs.inputs.outpoint.collect_range(start, end);
let spending_txindexes: Vec<TxIndex> = vecs.inputs.txindex.collect_range(start, end);
let outputs_to_unspend: Vec<_> = outpoints
.into_iter()
.zip(spending_txindexes)
.filter_map(|(outpoint, spending_txindex)| {
if outpoint.is_coinbase() {
return None;
}
let output_txindex = outpoint.txindex();
let vout = outpoint.vout();
let txoutindex =
txindex_to_first_txoutindex_reader.get(output_txindex.to_usize()) + vout;
if txoutindex < starting_indexes.txoutindex {
let outputtype = txoutindex_to_outputtype_reader.get(txoutindex.to_usize());
let typeindex = txoutindex_to_typeindex_reader.get(txoutindex.to_usize());
Some((outpoint, outputtype, typeindex, spending_txindex))
} else {
None
}
})
.collect();
for (outpoint, outputtype, typeindex, spending_txindex) in outputs_to_unspend {
if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = typeindex;
addressindex_txindex_to_remove.insert((
addresstype,
addressindex,
spending_txindex,
));
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit);
}
}
for (addresstype, addressindex, txindex) in addressindex_txindex_to_remove {
self.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.remove(AddressIndexTxIndex::from((addressindex, txindex)));
}
}
pub fn reset(&mut self) -> Result<()> { pub fn reset(&mut self) -> Result<()> {
info!("Resetting stores..."); info!("Resetting stores...");

View File

@@ -12,7 +12,7 @@ use vecdb::{
Stamp, VecIndex, Stamp, VecIndex,
}; };
use crate::AddressReaders; use crate::readers::AddressReaders;
use crate::parallel_import; use crate::parallel_import;
#[derive(Clone, Traversable)] #[derive(Clone, Traversable)]

View File

@@ -2,12 +2,10 @@ use std::path::Path;
use brk_error::Result; use brk_error::Result;
use brk_traversable::Traversable; use brk_traversable::Traversable;
use brk_types::{AddressBytes, AddressHash, Height, OutputType, TypeIndex, Version}; use brk_types::{AddressHash, Height, OutputType, Version};
use rayon::prelude::*; use rayon::prelude::*;
use vecdb::{AnyStoredVec, Database, Stamp}; use vecdb::{AnyStoredVec, Database, Stamp};
use crate::AddressReaders;
const PAGE_SIZE: usize = 4096; const PAGE_SIZE: usize = 4096;
use crate::parallel_import; use crate::parallel_import;
@@ -47,46 +45,14 @@ impl Vecs {
tracing::debug!("Setting min len..."); tracing::debug!("Setting min len...");
db.set_min_len(PAGE_SIZE * 50_000_000)?; db.set_min_len(PAGE_SIZE * 50_000_000)?;
tracing::debug!("Importing sub-vecs in parallel...");
let (blocks, transactions, inputs, outputs, addresses, scripts) = parallel_import! { let (blocks, transactions, inputs, outputs, addresses, scripts) = parallel_import! {
blocks = { blocks = BlocksVecs::forced_import(&db, version),
tracing::debug!("Importing BlocksVecs..."); transactions = TransactionsVecs::forced_import(&db, version),
let r = BlocksVecs::forced_import(&db, version); inputs = InputsVecs::forced_import(&db, version),
tracing::debug!("BlocksVecs imported."); outputs = OutputsVecs::forced_import(&db, version),
r addresses = AddressesVecs::forced_import(&db, version),
}, scripts = ScriptsVecs::forced_import(&db, version),
transactions = {
tracing::debug!("Importing TransactionsVecs...");
let r = TransactionsVecs::forced_import(&db, version);
tracing::debug!("TransactionsVecs imported.");
r
},
inputs = {
tracing::debug!("Importing InputsVecs...");
let r = InputsVecs::forced_import(&db, version);
tracing::debug!("InputsVecs imported.");
r
},
outputs = {
tracing::debug!("Importing OutputsVecs...");
let r = OutputsVecs::forced_import(&db, version);
tracing::debug!("OutputsVecs imported.");
r
},
addresses = {
tracing::debug!("Importing AddressesVecs...");
let r = AddressesVecs::forced_import(&db, version);
tracing::debug!("AddressesVecs imported.");
r
},
scripts = {
tracing::debug!("Importing ScriptsVecs...");
let r = ScriptsVecs::forced_import(&db, version);
tracing::debug!("ScriptsVecs imported.");
r
},
}; };
tracing::debug!("Sub-vecs imported.");
let this = Self { let this = Self {
db, db,
@@ -148,20 +114,6 @@ impl Vecs {
Ok(()) Ok(())
} }
pub fn get_addressbytes_by_type(
&self,
addresstype: OutputType,
typeindex: TypeIndex,
readers: &AddressReaders,
) -> Option<AddressBytes> {
self.addresses
.get_bytes_by_type(addresstype, typeindex, readers)
}
pub fn push_bytes_if_needed(&mut self, index: TypeIndex, bytes: AddressBytes) -> Result<()> {
self.addresses.push_bytes_if_needed(index, bytes)
}
pub fn flush(&mut self, height: Height) -> Result<()> { pub fn flush(&mut self, height: Height) -> Result<()> {
self.par_iter_mut_any_stored_vec() self.par_iter_mut_any_stored_vec()
.try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?; .try_for_each(|vec| vec.stamped_write(Stamp::from(height)))?;

View File

@@ -6,6 +6,7 @@ edition.workspace = true
license.workspace = true license.workspace = true
homepage.workspace = true homepage.workspace = true
repository.workspace = true repository.workspace = true
exclude = ["examples/"]
[dependencies] [dependencies]
brk_types = { workspace = true } brk_types = { workspace = true }

View File

@@ -143,7 +143,7 @@ All parameters via `Config` with sensible defaults:
## Accuracy ## Accuracy
Tested over 361,245 blocks (heights 575,000 to 936,244) against exchange OHLC data. Error is measured per block as distance from the oracle estimate to the exchange high/low range at that height. If the oracle falls within the range, the error is zero. Tested over 361,245 blocks (heights 575,000 to 936,244, as of February 2026) against exchange OHLC data. Error is measured per block as distance from the oracle estimate to the exchange high/low range at that height. If the oracle falls within the range, the error is zero.
### Per-block ### Per-block

View File

@@ -0,0 +1,283 @@
//! Diagnostic: sweep oracle start heights and clamp-top-N strategies.
//!
//! Run with: cargo run -p brk_oracle --example noise --release
use std::path::PathBuf;
use std::time::Instant;
use brk_indexer::Indexer;
use brk_oracle::{Config, NUM_BINS, Oracle, PRICES, cents_to_bin, sats_to_bin};
use brk_types::{Sats, TxIndex, TxOutIndex};
use vecdb::{AnyVec, ReadableVec, VecIndex};
// Bin distances on the logarithmic scale used by `bins_to_pct`.
// Each `BINS_*PCT` value is approximately `BPD * log10(1 + pct / 100)`,
// so feeding it back through `bins_to_pct` yields roughly that percentage.

/// Bin distance equivalent to about a 5% difference (200 * log10(1.05) ≈ 4.24).
const BINS_5PCT: f64 = 4.24;
/// Bin distance equivalent to about a 10% difference (200 * log10(1.10) ≈ 8.28).
const BINS_10PCT: f64 = 8.28;
/// Bin distance equivalent to about a 20% difference (200 * log10(1.20) ≈ 15.84).
const BINS_20PCT: f64 = 15.84;
/// Number of bins spanning one factor of ten: `bins_to_pct` computes `10^(bins / BPD)`.
/// NOTE(review): presumably "bins per decade" and expected to match the oracle's
/// own binning resolution — confirm against `brk_oracle`.
const BPD: f64 = 200.0;
/// Converts a distance measured in bins into the equivalent percentage change.
///
/// `BPD` bins correspond to a tenfold change, so a distance of `bins` maps to a
/// multiplicative ratio of `10^(bins / BPD)`; the result is that ratio minus one,
/// expressed in percent. Negative distances yield negative percentages.
fn bins_to_pct(bins: f64) -> f64 {
    let ratio = 10.0_f64.powf(bins / BPD);
    (ratio - 1.0) * 100.0
}
/// Returns the bin used to seed the oracle when starting at `start_height`.
///
/// Reads line index `start_height - 1` of the embedded `PRICES` text, parses it
/// as a floating-point price, multiplies it by 100 and converts the result with
/// `cents_to_bin`.
/// NOTE(review): the `* 100.0` suggests the file stores dollar prices — confirm.
///
/// # Panics
///
/// Panics if `PRICES` has fewer than `start_height` lines or if the selected
/// line is not a valid `f64`.
fn price_seed_bin(start_height: usize) -> f64 {
    let seed_line = PRICES
        .lines()
        .nth(start_height - 1)
        .expect("prices.txt too short");
    let seed_price = seed_line
        .parse::<f64>()
        .expect("Failed to parse seed price");
    cents_to_bin(seed_price * 100.0)
}
/// Clamp the top N bins in `src` down to the (N+1)th highest value, writing into `dst`.
///
/// Only non-zero counts are ranked. Every bin whose count exceeds the
/// (N+1)th largest non-zero count is lowered to that count; all other bins are
/// copied unchanged.
///
/// Edge cases:
/// - `n == 0` ranks the maximum itself, so `dst` becomes an exact copy of `src`.
/// - If `src` has `n` or fewer non-zero bins, the clamp value is 0 and `dst`
///   is filled with zeros.
fn clamp_top_n(src: &[u32; NUM_BINS], dst: &mut [u32; NUM_BINS], n: usize) {
    let mut nonzero: Vec<u32> = src.iter().copied().filter(|&count| count > 0).collect();

    // Only the (n+1)th largest value is needed, so a partial selection in
    // descending order is enough; no full sort required.
    let clamp_to = if nonzero.len() > n {
        *nonzero.select_nth_unstable_by(n, |a, b| b.cmp(a)).1
    } else {
        0
    };

    for (out, &count) in dst.iter_mut().zip(src.iter()) {
        *out = count.min(clamp_to);
    }
}
/// Diagnostic entry point.
///
/// 1. Loads the indexer and the per-height reference OHLC file.
/// 2. Builds one raw output-value histogram per block, starting at the lowest
///    start height under test.
/// 3. For every clamp setting and every start height, replays those histograms
///    through a fresh `Oracle` and prints error statistics against the
///    reference high/low band.
///
/// Panics on any missing input (unset `HOME` when `BRK_DIR` is absent,
/// unreadable or unparsable JSON, indexer load failure).
fn main() {
let t0 = Instant::now();
// Data directory: `BRK_DIR` if set, otherwise `$HOME/.brk`.
let data_dir = std::env::var("BRK_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| {
let home = std::env::var("HOME").unwrap();
PathBuf::from(home).join(".brk")
});
let indexer = Indexer::forced_import(&data_dir).expect("Failed to load indexer");
// Number of indexed blocks, taken from the length of the block timestamp vec.
let total_heights = indexer.vecs.blocks.timestamp.len();
// Reference data: one `[f64; 4]` entry per height, read from this crate's `examples/` directory.
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let height_ohlc: Vec<[f64; 4]> = serde_json::from_str(
&std::fs::read_to_string(format!("{manifest_dir}/examples/height_price_ohlc.json"))
.expect("Failed to read height_price_ohlc.json"),
)
.expect("Failed to parse height OHLC");
// Convert each entry's high/low (indices 1 and 2) to bins.
// NOTE(review): the `* 100.0` presumably converts dollars to cents — confirm against the JSON's units.
let height_bands: Vec<(f64, f64)> = height_ohlc
.iter()
.map(|ohlc| {
let high = ohlc[1];
let low = ohlc[2];
if high > 0.0 && low > 0.0 {
(cents_to_bin(high * 100.0), cents_to_bin(low * 100.0))
} else {
// Sentinel for "no usable reference data"; such heights are skipped when measuring error.
(0.0, 0.0)
}
})
.collect();
// Start heights: 630k, 600k, 575k, then 570k down to 500k by 5k.
let mut start_heights: Vec<usize> = vec![630_000, 600_000, 575_000];
let mut h = 570_000;
while h >= 500_000 {
start_heights.push(h);
h -= 5_000;
}
// Histograms only need to be built once, from the earliest start height onward.
let lowest = *start_heights.iter().min().unwrap();
// Clamp-top-N values to test: 0 (no clamp), 2, 3, 5, 10.
let clamp_values: Vec<usize> = vec![0, 2, 3, 5, 10];
// Build per-block RAW histograms from the lowest start height.
eprintln!("Building histograms from height {}...", lowest);
let total_txs = indexer.vecs.transactions.height.len();
let total_outputs = indexer.vecs.outputs.value.len();
let first_txoutindex_reader = indexer.vecs.transactions.first_txoutindex.reader();
let value_reader = indexer.vecs.outputs.value.reader();
let outputtype_reader = indexer.vecs.outputs.outputtype.reader();
let config = Config::default();
let total_blocks = total_heights - lowest;
// Per-block data kept in memory for replay: the raw histogram plus the reference band in bins.
struct BlockData {
hist: Box<[u32; NUM_BINS]>,
high_bin: f64,
low_bin: f64,
}
let mut blocks: Vec<BlockData> = Vec::with_capacity(total_blocks);
for h in lowest..total_heights {
let first_txindex: TxIndex = indexer
.vecs
.transactions
.first_txindex
.collect_one(h)
.unwrap();
// First tx index of the next block; for the last block fall back to the total tx count.
let next_first_txindex: TxIndex = indexer
.vecs
.transactions
.first_txindex
.collect_one(h + 1)
.unwrap_or(TxIndex::from(total_txs));
// Start scanning at the first output of the block's second transaction, so the
// block's first transaction (presumably the coinbase) is skipped. When the block
// has only that one transaction, `out_start` equals `out_end` and nothing is scanned.
let out_start = if first_txindex.to_usize() + 1 < next_first_txindex.to_usize() {
first_txoutindex_reader
.get(first_txindex.to_usize() + 1)
.to_usize()
} else {
indexer
.vecs
.outputs
.first_txoutindex
.collect_one(h + 1)
.unwrap_or(TxOutIndex::from(total_outputs))
.to_usize()
};
// One past this block's last output: the next block's first output index,
// or the total output count for the last block.
let out_end: usize = indexer
.vecs
.outputs
.first_txoutindex
.collect_one(h + 1)
.unwrap_or(TxOutIndex::from(total_outputs))
.to_usize();
let mut hist = Box::new([0u32; NUM_BINS]);
for i in out_start..out_end {
let sats: Sats = value_reader.get(i);
let output_type = outputtype_reader.get(i);
// Apply the oracle config's output filters before binning.
if config.excluded_output_types.contains(&output_type) {
continue;
}
if *sats < config.min_sats {
continue;
}
if config.exclude_common_round_values && sats.is_common_round_value() {
continue;
}
// Values for which `sats_to_bin` returns `None` are dropped.
if let Some(bin) = sats_to_bin(sats) {
hist[bin] += 1;
}
}
// Heights beyond the end of the reference file get the "no data" band.
let (high_bin, low_bin) = if h < height_bands.len() {
height_bands[h]
} else {
(0.0, 0.0)
};
blocks.push(BlockData {
hist,
high_bin,
low_bin,
});
// Progress line every 50,000 blocks.
if (h - lowest) % 50_000 == 0 {
eprint!(
"\r {}/{} ({:.0}%)",
h - lowest,
total_blocks,
(h - lowest) as f64 / total_blocks as f64 * 100.0
);
}
}
eprintln!(
"\r {} blocks built in {:.1}s",
blocks.len(),
t0.elapsed().as_secs_f64()
);
// For each clamp value, run all start heights.
for &clamp_n in &clamp_values {
println!();
let label = if clamp_n == 0 {
"no clamp".to_string()
} else {
format!("clamp top {}", clamp_n)
};
println!("=== {} ===", label);
println!(
"{:>8} {:>8} {:>8} {:>8} {:>6} {:>6} {:>6} {:>8}",
"Start", "Blocks", "RMSE%", "Worst%", ">5%", ">10%", ">20%", "Worst@"
);
println!("{}", "-".repeat(72));
for &start_height in &start_heights {
// Fresh oracle per start height, seeded via `price_seed_bin`.
let mut oracle = Oracle::new(price_seed_bin(start_height), config.clone());
// `blocks[0]` corresponds to height `lowest`.
let block_offset = start_height - lowest;
let mut worst_err: f64 = 0.0;
let mut worst_height: usize = 0;
let mut gt_5: u64 = 0;
let mut gt_10: u64 = 0;
let mut gt_20: u64 = 0;
let mut total_sq_err: f64 = 0.0;
let mut total_measured: u64 = 0;
// Scratch buffer reused across blocks for the clamped histogram.
let mut clamped_hist = [0u32; NUM_BINS];
for (i, bd) in blocks[block_offset..].iter().enumerate() {
if clamp_n > 0 {
clamp_top_n(&bd.hist, &mut clamped_hist, clamp_n);
oracle.process_histogram(&clamped_hist);
} else {
oracle.process_histogram(&bd.hist);
}
let height = start_height + i;
let ref_bin = oracle.ref_bin();
// Heights without reference data are excluded from the statistics; the oracle
// has already consumed the block above, so its state still advances.
if bd.high_bin <= 0.0 || bd.low_bin <= 0.0 {
continue;
}
// Signed distance in bins from the estimate to the reference band; zero inside it.
// NOTE(review): this treats `high_bin` as the lower bound and `low_bin` as the upper
// bound, i.e. it assumes a higher price maps to a lower bin — confirm against `cents_to_bin`.
let err = if ref_bin < bd.high_bin {
ref_bin - bd.high_bin
} else if ref_bin > bd.low_bin {
ref_bin - bd.low_bin
} else {
0.0
};
total_measured += 1;
total_sq_err += err * err;
let abs_err = err.abs();
if abs_err > BINS_5PCT {
gt_5 += 1;
}
if abs_err > BINS_10PCT {
gt_10 += 1;
}
if abs_err > BINS_20PCT {
gt_20 += 1;
}
if abs_err > worst_err {
worst_err = abs_err;
worst_height = height;
}
}
// RMSE is computed in bins, then converted to a percentage for display.
let rmse = if total_measured > 0 {
bins_to_pct((total_sq_err / total_measured as f64).sqrt())
} else {
0.0
};
println!(
"{:>8} {:>8} {:>7.3}% {:>7.1}% {:>6} {:>6} {:>6} {}",
format!("{}k", start_height / 1000),
total_measured,
rmse,
bins_to_pct(worst_err),
gt_5,
gt_10,
gt_20,
worst_height,
);
}
}
println!("\nTotal time: {:.1}s", t0.elapsed().as_secs_f64());
}

View File

@@ -3,13 +3,13 @@
//! Detects round-dollar transaction patterns ($1, $5, $10, ... $10,000) in Bitcoin //! Detects round-dollar transaction patterns ($1, $5, $10, ... $10,000) in Bitcoin
//! block outputs to derive the current price without any exchange data. //! block outputs to derive the current price without any exchange data.
use brk_types::{Block, CentsUnsigned, Dollars, OutputType, Sats}; use brk_types::{Block, Cents, Dollars, OutputType, Sats};
/// Pre-oracle dollar prices, one per line, heights 0..630_000. /// Pre-oracle dollar prices, one per line, heights 0..630_000.
pub const PRICES: &str = include_str!("prices.txt"); pub const PRICES: &str = include_str!("prices.txt");
/// First height where the oracle computes from on-chain data. /// First height where the oracle computes from on-chain data.
pub const START_HEIGHT: usize = 575_000; pub const START_HEIGHT: usize = 550_000;
pub const BINS_PER_DECADE: usize = 200; pub const BINS_PER_DECADE: usize = 200;
const MIN_LOG_BTC: i32 = -8; const MIN_LOG_BTC: i32 = -8;
@@ -120,8 +120,16 @@ fn find_best_bin(
// Parabolic sub-bin interpolation for fractional precision. // Parabolic sub-bin interpolation for fractional precision.
let score_center = best_score; let score_center = best_score;
let score_left = if best_bin > search_start { score(best_bin - 1) } else { score_center }; let score_left = if best_bin > search_start {
let score_right = if best_bin + 1 < search_end { score(best_bin + 1) } else { score_center }; score(best_bin - 1)
} else {
score_center
};
let score_right = if best_bin + 1 < search_end {
score(best_bin + 1)
} else {
score_center
};
let denom = score_left - 2.0 * score_center + score_right; let denom = score_left - 2.0 * score_center + score_right;
let sub_bin = if denom.abs() > 1e-10 { let sub_bin = if denom.abs() > 1e-10 {
(0.5 * (score_left - score_right) / denom).clamp(-0.5, 0.5) (0.5 * (score_left - score_right) / denom).clamp(-0.5, 0.5)
@@ -207,7 +215,12 @@ impl Oracle {
.iter() .iter()
.skip(1) // skip coinbase .skip(1) // skip coinbase
.flat_map(|tx| &tx.output) .flat_map(|tx| &tx.output)
.map(|txout| (Sats::from(txout.value), OutputType::from(&txout.script_pubkey))), .map(|txout| {
(
Sats::from(txout.value),
OutputType::from(&txout.script_pubkey),
)
}),
) )
} }
@@ -242,7 +255,7 @@ impl Oracle {
self.ref_bin self.ref_bin
} }
pub fn price_cents(&self) -> CentsUnsigned { pub fn price_cents(&self) -> Cents {
bin_to_cents(self.ref_bin).into() bin_to_cents(self.ref_bin).into()
} }
@@ -291,13 +304,12 @@ impl Oracle {
fn recompute_ema(&mut self) { fn recompute_ema(&mut self) {
self.ema.fill(0.0); self.ema.fill(0.0);
for age in 0..self.filled { for age in 0..self.filled {
let idx = let idx = (self.cursor + self.config.window_size - 1 - age) % self.config.window_size;
(self.cursor + self.config.window_size - 1 - age) % self.config.window_size;
let weight = self.weights[age]; let weight = self.weights[age];
let h = &self.histograms[idx]; let h = &self.histograms[idx];
for bin in 0..NUM_BINS { (0..NUM_BINS).for_each(|bin| {
self.ema[bin] += weight * h[bin] as f64; self.ema[bin] += weight * h[bin] as f64;
} });
} }
} }
} }