global: adding support for safe lengths

This commit is contained in:
nym21
2026-05-06 15:33:07 +02:00
parent da7671744f
commit 086bfd9938
177 changed files with 2445 additions and 2049 deletions

View File

@@ -25,7 +25,6 @@ serde = { workspace = true }
tracing = { workspace = true }
rayon = { workspace = true }
rustc-hash = { workspace = true }
smallvec = { workspace = true }
vecdb = { workspace = true }
[dev-dependencies]

View File

@@ -50,14 +50,16 @@ fn main() -> Result<()> {
indexer.index(&reader, &client, &exit)?;
info!("Done in {:?}", i.elapsed());
sleep(Duration::from_secs(60));
// We want to benchmark the drop too
drop(indexer);
sleep(Duration::from_secs(10));
sleep(Duration::from_secs(60));
Mimalloc::collect();
sleep(Duration::from_secs(10));
sleep(Duration::from_secs(60));
Ok(())
}

View File

@@ -1,240 +0,0 @@
use brk_error::Result;
use brk_types::{Height, Indexes};
use tracing::{debug, info};
use vecdb::{AnyStoredVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue, WritableVec};
use crate::{Stores, Vecs};
/// Extension trait for Indexes with brk_indexer-specific functionality.
pub trait IndexesExt {
fn checked_push(&self, vecs: &mut Vecs) -> Result<()>;
fn from_vecs_and_stores(
required_height: Height,
vecs: &mut Vecs,
stores: &Stores,
) -> Option<Self>
where
Self: Sized;
}
impl IndexesExt for Indexes {
fn checked_push(&self, vecs: &mut Vecs) -> Result<()> {
let height = self.height;
vecs.transactions
.first_tx_index
.checked_push(height, self.tx_index)?;
vecs.inputs
.first_txin_index
.checked_push(height, self.txin_index)?;
vecs.outputs
.first_txout_index
.checked_push(height, self.txout_index)?;
vecs.scripts
.empty
.first_index
.checked_push(height, self.empty_output_index)?;
vecs.scripts
.p2ms
.first_index
.checked_push(height, self.p2ms_output_index)?;
vecs.scripts
.op_return
.first_index
.checked_push(height, self.op_return_index)?;
vecs.addrs
.p2a
.first_index
.checked_push(height, self.p2a_addr_index)?;
vecs.scripts
.unknown
.first_index
.checked_push(height, self.unknown_output_index)?;
vecs.addrs
.p2pk33
.first_index
.checked_push(height, self.p2pk33_addr_index)?;
vecs.addrs
.p2pk65
.first_index
.checked_push(height, self.p2pk65_addr_index)?;
vecs.addrs
.p2pkh
.first_index
.checked_push(height, self.p2pkh_addr_index)?;
vecs.addrs
.p2sh
.first_index
.checked_push(height, self.p2sh_addr_index)?;
vecs.addrs
.p2tr
.first_index
.checked_push(height, self.p2tr_addr_index)?;
vecs.addrs
.p2wpkh
.first_index
.checked_push(height, self.p2wpkh_addr_index)?;
vecs.addrs
.p2wsh
.first_index
.checked_push(height, self.p2wsh_addr_index)?;
Ok(())
}
fn from_vecs_and_stores(
required_height: Height,
vecs: &mut Vecs,
stores: &Stores,
) -> Option<Indexes> {
debug!("Creating indexes from vecs and stores...");
// Local data height: minimum of vecs and stores
let vecs_height = vecs.starting_height();
let stores_height = stores.starting_height();
let local_height = vecs_height.min(stores_height);
// Data inconsistency: local data behind required height
if local_height < required_height {
return None;
}
// Handle reorg: local data ahead of required height
let starting_height = if local_height > required_height {
info!(
"Reorg detected: rolling back from {} to {}",
local_height, required_height
);
required_height
} else {
local_height
};
let empty_output_index = starting_index(
&vecs.scripts.empty.first_index,
&vecs.scripts.empty.to_tx_index,
starting_height,
)?;
let p2ms_output_index = starting_index(
&vecs.scripts.p2ms.first_index,
&vecs.scripts.p2ms.to_tx_index,
starting_height,
)?;
let op_return_index = starting_index(
&vecs.scripts.op_return.first_index,
&vecs.scripts.op_return.to_tx_index,
starting_height,
)?;
let p2pk33_addr_index = starting_index(
&vecs.addrs.p2pk33.first_index,
&vecs.addrs.p2pk33.bytes,
starting_height,
)?;
let p2pk65_addr_index = starting_index(
&vecs.addrs.p2pk65.first_index,
&vecs.addrs.p2pk65.bytes,
starting_height,
)?;
let p2pkh_addr_index = starting_index(
&vecs.addrs.p2pkh.first_index,
&vecs.addrs.p2pkh.bytes,
starting_height,
)?;
let p2sh_addr_index = starting_index(
&vecs.addrs.p2sh.first_index,
&vecs.addrs.p2sh.bytes,
starting_height,
)?;
let p2tr_addr_index = starting_index(
&vecs.addrs.p2tr.first_index,
&vecs.addrs.p2tr.bytes,
starting_height,
)?;
let p2wpkh_addr_index = starting_index(
&vecs.addrs.p2wpkh.first_index,
&vecs.addrs.p2wpkh.bytes,
starting_height,
)?;
let p2wsh_addr_index = starting_index(
&vecs.addrs.p2wsh.first_index,
&vecs.addrs.p2wsh.bytes,
starting_height,
)?;
let p2a_addr_index = starting_index(
&vecs.addrs.p2a.first_index,
&vecs.addrs.p2a.bytes,
starting_height,
)?;
let tx_index = starting_index(
&vecs.transactions.first_tx_index,
&vecs.transactions.txid,
starting_height,
)?;
let txin_index = starting_index(
&vecs.inputs.first_txin_index,
&vecs.inputs.outpoint,
starting_height,
)?;
let txout_index = starting_index(
&vecs.outputs.first_txout_index,
&vecs.outputs.value,
starting_height,
)?;
let unknown_output_index = starting_index(
&vecs.scripts.unknown.first_index,
&vecs.scripts.unknown.to_tx_index,
starting_height,
)?;
Some(Indexes {
empty_output_index,
height: starting_height,
p2ms_output_index,
op_return_index,
p2pk33_addr_index,
p2pk65_addr_index,
p2pkh_addr_index,
p2sh_addr_index,
p2tr_addr_index,
p2wpkh_addr_index,
p2wsh_addr_index,
p2a_addr_index,
tx_index,
txin_index,
txout_index,
unknown_output_index,
})
}
}
pub fn starting_index<I, T>(
height_to_index: &PcoVec<Height, I>,
index_to_else: &impl ReadableVec<I, T>,
starting_height: Height,
) -> Option<I>
where
I: VecIndex + PcoVecValue + From<usize>,
T: VecValue,
{
let h = Height::from(height_to_index.stamp());
if h.is_zero() {
None
} else if h + 1_u32 == starting_height {
Some(I::from(index_to_else.len()))
} else {
height_to_index.collect_one(starting_height)
}
}

View File

@@ -0,0 +1,254 @@
use brk_error::Result;
use brk_types::{
EmptyOutputIndex, Height, OpReturnIndex, OutputType, P2AAddrIndex, P2MSOutputIndex,
P2PK33AddrIndex, P2PK65AddrIndex, P2PKHAddrIndex, P2SHAddrIndex, P2TRAddrIndex,
P2WPKHAddrIndex, P2WSHAddrIndex, TxInIndex, TxIndex, TxOutIndex, TypeIndex, UnknownOutputIndex,
};
use tracing::info;
use vecdb::{AnyStoredVec, PcoVec, PcoVecValue, ReadableVec, VecIndex, VecValue, WritableVec};
use crate::{Stores, Vecs};
/// Pipeline-wide length/count snapshot. Lengths semantics:
/// `bound.f = N` means positions `0..N` are fully written; readers
/// reject `pos >= bound.f`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Lengths {
pub empty_output_index: EmptyOutputIndex,
pub height: Height,
pub op_return_index: OpReturnIndex,
pub p2ms_output_index: P2MSOutputIndex,
pub p2pk33_addr_index: P2PK33AddrIndex,
pub p2pk65_addr_index: P2PK65AddrIndex,
pub p2pkh_addr_index: P2PKHAddrIndex,
pub p2sh_addr_index: P2SHAddrIndex,
pub p2tr_addr_index: P2TRAddrIndex,
pub p2wpkh_addr_index: P2WPKHAddrIndex,
pub p2wsh_addr_index: P2WSHAddrIndex,
pub p2a_addr_index: P2AAddrIndex,
pub tx_index: TxIndex,
pub txin_index: TxInIndex,
pub txout_index: TxOutIndex,
pub unknown_output_index: UnknownOutputIndex,
}
impl Lengths {
pub fn to_type_index(&self, output_type: OutputType) -> TypeIndex {
match output_type {
OutputType::Empty => *self.empty_output_index,
OutputType::OpReturn => *self.op_return_index,
OutputType::P2A => *self.p2a_addr_index,
OutputType::P2MS => *self.p2ms_output_index,
OutputType::P2PK33 => *self.p2pk33_addr_index,
OutputType::P2PK65 => *self.p2pk65_addr_index,
OutputType::P2PKH => *self.p2pkh_addr_index,
OutputType::P2SH => *self.p2sh_addr_index,
OutputType::P2TR => *self.p2tr_addr_index,
OutputType::P2WPKH => *self.p2wpkh_addr_index,
OutputType::P2WSH => *self.p2wsh_addr_index,
OutputType::Unknown => *self.unknown_output_index,
}
}
/// Bump per-block totals after processing a block.
pub fn add_block(&mut self, tx_count: usize, input_count: usize, output_count: usize) {
self.tx_index += TxIndex::from(tx_count);
self.txin_index += TxInIndex::from(input_count);
self.txout_index += TxOutIndex::from(output_count);
}
/// Increments the address index for the given address type and returns the previous value.
/// Only call this for address types (P2PK65, P2PK33, P2PKH, P2SH, P2WPKH, P2WSH, P2TR, P2A).
#[inline]
pub fn increment_addr_index(&mut self, addr_type: OutputType) -> TypeIndex {
match addr_type {
OutputType::P2PK65 => self.p2pk65_addr_index.copy_then_increment(),
OutputType::P2PK33 => self.p2pk33_addr_index.copy_then_increment(),
OutputType::P2PKH => self.p2pkh_addr_index.copy_then_increment(),
OutputType::P2SH => self.p2sh_addr_index.copy_then_increment(),
OutputType::P2WPKH => self.p2wpkh_addr_index.copy_then_increment(),
OutputType::P2WSH => self.p2wsh_addr_index.copy_then_increment(),
OutputType::P2TR => self.p2tr_addr_index.copy_then_increment(),
OutputType::P2A => self.p2a_addr_index.copy_then_increment(),
_ => unreachable!(),
}
}
pub fn checked_push(&self, vecs: &mut Vecs) -> Result<()> {
let height = self.height;
vecs.transactions
.first_tx_index
.checked_push(height, self.tx_index)?;
vecs.inputs
.first_txin_index
.checked_push(height, self.txin_index)?;
vecs.outputs
.first_txout_index
.checked_push(height, self.txout_index)?;
vecs.scripts
.empty
.first_index
.checked_push(height, self.empty_output_index)?;
vecs.scripts
.p2ms
.first_index
.checked_push(height, self.p2ms_output_index)?;
vecs.scripts
.op_return
.first_index
.checked_push(height, self.op_return_index)?;
vecs.addrs
.p2a
.first_index
.checked_push(height, self.p2a_addr_index)?;
vecs.scripts
.unknown
.first_index
.checked_push(height, self.unknown_output_index)?;
vecs.addrs
.p2pk33
.first_index
.checked_push(height, self.p2pk33_addr_index)?;
vecs.addrs
.p2pk65
.first_index
.checked_push(height, self.p2pk65_addr_index)?;
vecs.addrs
.p2pkh
.first_index
.checked_push(height, self.p2pkh_addr_index)?;
vecs.addrs
.p2sh
.first_index
.checked_push(height, self.p2sh_addr_index)?;
vecs.addrs
.p2tr
.first_index
.checked_push(height, self.p2tr_addr_index)?;
vecs.addrs
.p2wpkh
.first_index
.checked_push(height, self.p2wpkh_addr_index)?;
vecs.addrs
.p2wsh
.first_index
.checked_push(height, self.p2wsh_addr_index)?;
Ok(())
}
/// Read current local lengths. `None` pre-genesis.
pub fn from_local(vecs: &mut Vecs, stores: &Stores) -> Option<Self> {
let height = vecs.next_height().min(stores.next_height());
Self::collect_at(height, vecs)
}
/// Read lengths to resume at `required_height`. Reorg-aware:
/// - if local is ahead, clamp down to `required_height`;
/// - if local is behind, return `None` (caller must full-reset).
pub fn resume_at(required_height: Height, vecs: &mut Vecs, stores: &Stores) -> Option<Self> {
let local = vecs.next_height().min(stores.next_height());
if local < required_height {
return None;
}
let height = if local > required_height {
info!(
"Reorg detected: rolling back from {} to {}",
local, required_height
);
required_height
} else {
local
};
Self::collect_at(height, vecs)
}
fn collect_at(height: Height, vecs: &mut Vecs) -> Option<Self> {
Some(Self {
empty_output_index: next_index(
&vecs.scripts.empty.first_index,
&vecs.scripts.empty.to_tx_index,
height,
)?,
height,
p2ms_output_index: next_index(
&vecs.scripts.p2ms.first_index,
&vecs.scripts.p2ms.to_tx_index,
height,
)?,
op_return_index: next_index(
&vecs.scripts.op_return.first_index,
&vecs.scripts.op_return.to_tx_index,
height,
)?,
p2pk33_addr_index: next_index(
&vecs.addrs.p2pk33.first_index,
&vecs.addrs.p2pk33.bytes,
height,
)?,
p2pk65_addr_index: next_index(
&vecs.addrs.p2pk65.first_index,
&vecs.addrs.p2pk65.bytes,
height,
)?,
p2pkh_addr_index: next_index(
&vecs.addrs.p2pkh.first_index,
&vecs.addrs.p2pkh.bytes,
height,
)?,
p2sh_addr_index: next_index(
&vecs.addrs.p2sh.first_index,
&vecs.addrs.p2sh.bytes,
height,
)?,
p2tr_addr_index: next_index(
&vecs.addrs.p2tr.first_index,
&vecs.addrs.p2tr.bytes,
height,
)?,
p2wpkh_addr_index: next_index(
&vecs.addrs.p2wpkh.first_index,
&vecs.addrs.p2wpkh.bytes,
height,
)?,
p2wsh_addr_index: next_index(
&vecs.addrs.p2wsh.first_index,
&vecs.addrs.p2wsh.bytes,
height,
)?,
p2a_addr_index: next_index(&vecs.addrs.p2a.first_index, &vecs.addrs.p2a.bytes, height)?,
tx_index: next_index(
&vecs.transactions.first_tx_index,
&vecs.transactions.txid,
height,
)?,
txin_index: next_index(&vecs.inputs.first_txin_index, &vecs.inputs.outpoint, height)?,
txout_index: next_index(&vecs.outputs.first_txout_index, &vecs.outputs.value, height)?,
unknown_output_index: next_index(
&vecs.scripts.unknown.first_index,
&vecs.scripts.unknown.to_tx_index,
height,
)?,
})
}
}
/// Per-type next-to-write counter at `next_height`. `None` pre-genesis.
fn next_index<I, T>(
height_to_index: &PcoVec<Height, I>,
index_to_else: &impl ReadableVec<I, T>,
next_height: Height,
) -> Option<I>
where
I: VecIndex + PcoVecValue + From<usize>,
T: VecValue,
{
let h = Height::from(height_to_index.stamp());
if h.is_zero() {
None
} else if h + 1_u32 == next_height {
Some(I::from(index_to_else.len()))
} else {
height_to_index.collect_one(next_height)
}
}

View File

@@ -4,7 +4,7 @@ use std::{
fs,
path::{Path, PathBuf},
sync::Arc,
thread::{self, sleep},
thread,
time::{Duration, Instant},
};
@@ -19,18 +19,19 @@ use vecdb::{
Exit, RawDBError, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode, WritableVec, unlikely,
};
mod constants;
mod indexes;
mod lengths;
mod processor;
mod readers;
mod safe_lengths;
mod stores;
mod vecs;
use constants::*;
use indexes::IndexesExt;
use processor::{BlockBuffers, BlockProcessor};
use readers::Readers;
pub use brk_types::Indexes;
pub use lengths::Lengths;
pub use safe_lengths::SafeLengths;
pub use stores::Stores;
pub use vecs::*;
@@ -39,32 +40,27 @@ pub struct Indexer<M: StorageMode = Rw> {
pub vecs: Vecs<M>,
pub stores: Stores,
tip_blockhash: Arc<RwLock<BlockHash>>,
safe_lengths: SafeLengths,
}
impl<M: StorageMode> Indexer<M> {
pub fn tip_blockhash(&self) -> BlockHash {
*self.tip_blockhash.read()
}
}
impl Indexer<Ro> {
/// Last height whose data is durably indexed, derived from the
/// `blockhash` vec's stamp.
pub fn indexed_height(&self) -> Height {
Height::from(self.vecs.blocks.blockhash.inner.stamp())
/// Pipeline-safe `Lengths` snapshot shared with `Query`. Writers
/// advance and lower this internally; readers clamp non-series
/// answers against this loaded snapshot.
pub fn safe_lengths(&self) -> Lengths {
self.safe_lengths.load()
}
}
impl ReadOnlyClone for Indexer {
type ReadOnly = Indexer<Ro>;
fn read_only_clone(&self) -> Indexer<Ro> {
Indexer {
path: self.path.clone(),
vecs: self.vecs.read_only_clone(),
stores: self.stores.clone(),
tip_blockhash: self.tip_blockhash.clone(),
}
impl Indexer<Ro> {
/// Live indexer stamp for diagnostics. For data reads use
/// [`crate::SafeLengths::load`] (via `Query::height`).
pub fn indexed_height(&self) -> Height {
Height::from(self.vecs.blocks.blockhash.inner.stamp())
}
}
@@ -94,6 +90,7 @@ impl Indexer {
vecs,
stores,
tip_blockhash: Arc::new(RwLock::new(tip_blockhash)),
safe_lengths: SafeLengths::new(),
})
};
@@ -119,6 +116,8 @@ impl Indexer {
/// record that gets replayed on every recovery), this cleanly recreates.
fn full_reset(&mut self) -> Result<()> {
info!("Full reset...");
self.safe_lengths.reset();
*self.tip_blockhash.write() = BlockHash::default();
self.vecs.reset()?;
let stores_path = self.path.join("stores");
fs::remove_dir_all(&stores_path).ok();
@@ -126,41 +125,21 @@ impl Indexer {
Ok(())
}
pub fn index(&mut self, reader: &Reader, client: &Client, exit: &Exit) -> Result<Indexes> {
pub fn index(&mut self, reader: &Reader, client: &Client, exit: &Exit) -> Result<()> {
self.index_(reader, client, exit, false)
}
pub fn checked_index(
&mut self,
reader: &Reader,
client: &Client,
exit: &Exit,
) -> Result<Indexes> {
pub fn checked_index(&mut self, reader: &Reader, client: &Client, exit: &Exit) -> Result<()> {
self.index_(reader, client, exit, true)
}
fn check_xor_bytes(&mut self, reader: &Reader) -> Result<()> {
let current = reader.xor_bytes();
let cached = XORBytes::from(self.path.as_path());
if cached == current {
return Ok(());
}
self.full_reset()?;
fs::write(self.path.join("xor.dat"), *current)?;
Ok(())
}
fn index_(
&mut self,
reader: &Reader,
client: &Client,
exit: &Exit,
check_collisions: bool,
) -> Result<Indexes> {
) -> Result<()> {
self.vecs.db.sync_bg_tasks()?;
self.check_xor_bytes(reader)?;
@@ -176,42 +155,40 @@ impl Indexer {
// .collect_one_at(self.vecs.blocks.blockhash.len() - 2);
debug!("Last block hash found.");
let (starting_indexes, prev_hash) = if let Some(hash) = last_blockhash {
let (starting_lengths, prev_hash) = if let Some(hash) = last_blockhash {
let (height, hash) = client.get_closest_valid_height(hash)?;
match Indexes::from_vecs_and_stores(height.incremented(), &mut self.vecs, &self.stores)
{
Some(starting_indexes) => {
if starting_indexes.height > client.get_last_height()? {
match Lengths::resume_at(height.incremented(), &mut self.vecs, &self.stores) {
Some(starting_lengths) => {
if starting_lengths.height > client.get_last_height()? {
info!("Up to date, nothing to index.");
return Ok(starting_indexes);
return Ok(());
}
(starting_indexes, Some(hash))
(starting_lengths, Some(hash))
}
None => {
info!("Data inconsistency detected, resetting indexer...");
self.full_reset()?;
(Indexes::default(), None)
(Lengths::default(), None)
}
}
} else {
(Indexes::default(), None)
(Lengths::default(), None)
};
debug!("Starting indexes set.");
debug!("Starting lengths set.");
let lock = exit.lock();
self.safe_lengths.lower_before(&starting_lengths);
self.stores
.rollback_if_needed(&mut self.vecs, &starting_indexes)?;
.rollback_if_needed(&mut self.vecs, &starting_lengths)?;
debug!("Rollback stores done.");
self.vecs.rollback_if_needed(&starting_indexes)?;
self.vecs.rollback_if_needed(&starting_lengths)?;
debug!("Rollback vecs done.");
if let Some(hash) = prev_hash.as_ref() {
*self.tip_blockhash.write() = *hash;
}
drop(lock);
// Cloned because we want to return starting indexes for the computer
let mut indexes = starting_indexes.clone();
debug!("Indexes cloned.");
let mut lengths = starting_lengths;
let is_export_height =
|height: Height| -> bool { height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 };
@@ -268,7 +245,7 @@ impl Indexer {
debug!("Indexing block {height}...");
}
indexes.height = height;
lengths.height = height;
vecs.blocks.position.push(block.metadata().position());
block.tx_metadata().iter().for_each(|m| {
@@ -279,7 +256,7 @@ impl Indexer {
block: &block,
height,
check_collisions,
indexes: &mut indexes,
lengths: &mut lengths,
vecs,
stores,
readers: &readers,
@@ -309,7 +286,7 @@ impl Indexer {
processor.check_txid_collisions(&txs)?;
let sigops = processor.compute_sigops(&txins);
let sigops = processor.compute_sigops(&txins, &txouts);
processor.finalize_and_store_metadata(
txs,
@@ -321,16 +298,14 @@ impl Indexer {
&mut buffers.same_block_output_info,
)?;
processor.update_indexes(tx_count, input_count, output_count);
processor
.lengths
.add_block(tx_count, input_count, output_count);
if is_export_height(height) {
drop(readers);
export(stores, vecs, height)?;
readers = Readers::new(vecs);
if height == Height::new(500_000) {
break;
}
}
*self.tip_blockhash.write() = block.block_hash().into();
@@ -339,14 +314,14 @@ impl Indexer {
drop(readers);
let lock = exit.lock();
let tasks = self.stores.take_all_pending_ingests(indexes.height)?;
self.vecs.stamped_write(indexes.height)?;
let tasks = self.stores.take_all_pending_ingests(lengths.height)?;
self.vecs.stamped_write(lengths.height)?;
let fjall_db = self.stores.db.clone();
self.vecs.db.run_bg(move |db| {
let _lock = lock;
sleep(Duration::from_secs(5));
db.bg_sleep(Duration::from_secs(3));
info!("Exporting...");
let i = Instant::now();
@@ -371,6 +346,45 @@ impl Indexer {
Ok(())
});
Ok(starting_indexes)
Ok(())
}
fn check_xor_bytes(&mut self, reader: &Reader) -> Result<()> {
let current = reader.xor_bytes();
let cached = XORBytes::from(self.path.as_path());
if cached == current {
return Ok(());
}
self.full_reset()?;
fs::write(self.path.join("xor.dat"), *current)?;
Ok(())
}
/// Publish disk state as the new safe-lengths snapshot. Drains pending
/// bg ingest first so stores are queryable at the new bound.
pub fn advance_safe_lengths(&mut self) -> Result<()> {
self.vecs.db.sync_bg_tasks()?;
if let Some(lengths) = Lengths::from_local(&mut self.vecs, &self.stores) {
self.safe_lengths.advance(lengths);
}
Ok(())
}
}
impl ReadOnlyClone for Indexer {
type ReadOnly = Indexer<Ro>;
fn read_only_clone(&self) -> Indexer<Ro> {
Indexer {
path: self.path.clone(),
vecs: self.vecs.read_only_clone(),
stores: self.stores.clone(),
tip_blockhash: self.tip_blockhash.clone(),
safe_lengths: self.safe_lengths.clone(),
}
}
}

View File

@@ -4,7 +4,6 @@ use tracing::error;
use vecdb::WritableVec;
use super::{BlockProcessor, ComputedTx};
use crate::IndexesExt;
impl BlockProcessor<'_> {
pub fn process_block_metadata(&mut self) -> Result<()> {
@@ -22,7 +21,7 @@ impl BlockProcessor<'_> {
return Err(Error::Internal("BlockHash prefix collision"));
}
self.indexes.checked_push(self.vecs)?;
self.lengths.checked_push(self.vecs)?;
self.stores
.blockhash_prefix_to_height

View File

@@ -9,32 +9,23 @@ pub use types::*;
use brk_cohort::ByAddrType;
use brk_error::Result;
use brk_types::{
AddrHash, Block, Height, OutPoint, SigOps, TxInIndex, TxIndex, TxOutIndex, TypeIndex,
};
use brk_types::{AddrHash, Block, Height, OutPoint, SigOps, TxInIndex, TypeIndex};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::{Indexes, Readers, Stores, Vecs};
use crate::{Lengths, Readers, Stores, Vecs};
/// Processes a single block, extracting and storing all indexed data.
pub struct BlockProcessor<'a> {
pub block: &'a Block,
pub height: Height,
pub check_collisions: bool,
pub indexes: &'a mut Indexes,
pub lengths: &'a mut Lengths,
pub vecs: &'a mut Vecs,
pub stores: &'a mut Stores,
pub readers: &'a Readers,
}
impl BlockProcessor<'_> {
/// Update global indexes after processing a block.
pub fn update_indexes(&mut self, tx_count: usize, input_count: usize, output_count: usize) {
self.indexes.tx_index += TxIndex::from(tx_count);
self.indexes.txin_index += TxInIndex::from(input_count);
self.indexes.txout_index += TxOutIndex::from(output_count);
}
/// Finalizes outputs/inputs in parallel with storing tx metadata.
#[allow(clippy::too_many_arguments)]
pub fn finalize_and_store_metadata(
@@ -47,7 +38,7 @@ impl BlockProcessor<'_> {
already_added: &mut ByAddrType<FxHashMap<AddrHash, TypeIndex>>,
same_block_info: &mut FxHashMap<OutPoint, SameBlockOutputInfo>,
) -> Result<()> {
let indexes = &mut *self.indexes;
let lengths = &mut *self.lengths;
// Split transactions vecs: finalize needs first_txout_index/first_txin_index, metadata needs the rest
let (first_txout_index, first_txin_index, mut tx_metadata) =
@@ -66,7 +57,7 @@ impl BlockProcessor<'_> {
let (finalize_result, metadata_result) = rayon::join(
|| -> Result<()> {
txout::finalize_outputs(
indexes,
lengths,
first_txout_index,
outputs,
addrs,

View File

@@ -1,24 +1,37 @@
use bitcoin::{Script, script::Instruction};
use brk_types::{OutputType, SigOps, TxInIndex};
use rayon::prelude::*;
use smallvec::SmallVec;
use super::{BlockProcessor, InputSource};
use super::{BlockProcessor, InputSource, ProcessedOutput};
impl BlockProcessor<'_> {
/// BIP-141 sigop cost per tx in the block. Uses each input's prevout
/// `OutputType` (already resolved by `process_inputs` for the
/// previous-block case, looked up from `block.txdata` for the
/// same-block case) to feed canonical-shaped synthetic prevouts into
/// `bitcoin::Transaction::total_sigop_cost`.
pub fn compute_sigops(&self, txins: &[(TxInIndex, InputSource)]) -> Vec<SigOps> {
/// BIP-141 sigop cost per tx in the block. Mirrors
/// `bitcoin::Transaction::total_sigop_cost` but dispatches on each
/// input's prevout `OutputType` and each output's `OutputType`
/// (already resolved by `process_inputs`/`process_outputs`) instead
/// of round-tripping through bitcoin's closure API with
/// synthetic-prevout `ScriptBuf` allocations. The legacy-sigop walk
/// is short-circuited by `OutputType` for every script with a
/// canonical shape (~99% of outputs and ~95% of inputs on mainnet);
/// only `OpReturn`/`Unknown` outputs and non-segwit/non-P2SH inputs
/// fall back to a real script walk.
pub fn compute_sigops(
&self,
txins: &[(TxInIndex, InputSource)],
txouts: &[ProcessedOutput<'_>],
) -> Vec<SigOps> {
let txdata = &self.block.txdata;
let base_tx_index = u32::from(self.indexes.tx_index);
let base_tx_index = u32::from(self.lengths.tx_index);
let mut tx_input_offsets = Vec::with_capacity(txdata.len());
let mut offset = 0usize;
let mut tx_output_offsets = Vec::with_capacity(txdata.len());
let mut input_offset = 0usize;
let mut output_offset = 0usize;
for tx in txdata {
tx_input_offsets.push(offset);
offset += tx.input.len();
tx_input_offsets.push(input_offset);
input_offset += tx.input.len();
tx_output_offsets.push(output_offset);
output_offset += tx.output.len();
}
txdata
@@ -28,31 +41,144 @@ impl BlockProcessor<'_> {
if tx.is_coinbase() {
return SigOps::ZERO;
}
let start = tx_input_offsets[i];
let tx_inputs = &txins[start..start + tx.input.len()];
let in_start = tx_input_offsets[i];
let tx_inputs = &txins[in_start..in_start + tx.input.len()];
let out_start = tx_output_offsets[i];
let tx_outputs = &txouts[out_start..out_start + tx.output.len()];
let kinds: SmallVec<[(bitcoin::OutPoint, OutputType); 4]> = tx
.input
.iter()
.zip(tx_inputs.iter())
.map(|(txin, (_, source))| {
let kind = match source {
InputSource::PreviousBlock { output_type, .. } => *output_type,
InputSource::SameBlock { outpoint, .. } => {
let local =
(u32::from(outpoint.tx_index()) - base_tx_index) as usize;
let vout = u32::from(outpoint.vout()) as usize;
OutputType::from(&txdata[local].output[vout].script_pubkey)
let mut legacy: usize = 0;
let mut redeem: usize = 0;
let mut witness: usize = 0;
for (input, (_, source)) in tx.input.iter().zip(tx_inputs.iter()) {
let prev_kind = match source {
InputSource::PreviousBlock { output_type, .. } => *output_type,
InputSource::SameBlock { outpoint, .. } => {
let local =
(u32::from(outpoint.tx_index()) - base_tx_index) as usize;
let vout = u32::from(outpoint.vout()) as usize;
txouts[tx_output_offsets[local] + vout].output_type
}
};
// Single match per input: legacy script_sig sigops AND the
// redeem/witness contribution. Consensus enforces a
// push-only or empty script_sig in the four cases below
// (BIP-16 for P2SH from block 173805 onwards; BIP-141 /
// BIP-341 for segwit/taproot from activation), so legacy
// sigops are guaranteed 0 there. Everything else falls
// through to a real `count_sigops_legacy` walk.
match prev_kind {
OutputType::P2SH => {
// Faithful to bitcoin's count_p2sh_sigops + the
// nested-segwit branch of count_witness_sigops in
// a single script walk: redeem sigops use
// last_pushdata (no push-only check), wrapped
// witness sigops require both push-only and
// last_pushdata.
let (last_push, is_push_only) =
last_push_and_push_only(&input.script_sig);
let Some(redeem_bytes) = last_push else {
continue;
};
let rs = Script::from_bytes(redeem_bytes);
redeem = redeem.saturating_add(rs.count_sigops());
if !is_push_only {
continue;
}
};
(txin.previous_output, kind)
})
.collect();
if rs.is_p2wpkh() {
witness = witness.saturating_add(1);
} else if rs.is_p2wsh()
&& let Some(last) = input.witness.last()
{
witness = witness
.saturating_add(Script::from_bytes(last).count_sigops());
}
}
OutputType::P2WPKH => {
witness = witness.saturating_add(1);
}
OutputType::P2WSH => {
if let Some(last) = input.witness.last() {
witness = witness
.saturating_add(Script::from_bytes(last).count_sigops());
}
}
OutputType::P2TR => {}
_ => {
legacy = legacy
.saturating_add(input.script_sig.count_sigops_legacy());
}
}
}
SigOps::of_bitcoin_tx_with_kinds(tx, |op| {
kinds.iter().find(|(o, _)| o == op).map(|(_, k)| *k)
})
for processed in tx_outputs {
legacy = legacy.saturating_add(legacy_sigops_for_output(
processed.output_type,
&processed.txout.script_pubkey,
));
}
SigOps::from(
legacy
.saturating_mul(4)
.saturating_add(redeem.saturating_mul(4))
.saturating_add(witness),
)
})
.collect()
}
}
/// Legacy sigop count of a script_pubkey, dispatched on `OutputType`.
/// Every variant except `OpReturn` and `Unknown` has a canonical shape
/// recognised by `OutputType::from`'s exact byte-pattern matchers, so
/// the legacy sigop count is fixed: P2PKH and P2PK both end in a
/// single OP_CHECKSIG (1), P2MS contains one OP_CHECKMULTISIG counted
/// as 20 in legacy mode, and P2SH/P2WPKH/P2WSH/P2TR/P2A/Empty contain
/// no CHECKSIG-class opcodes outside their pushdata. `OpReturn`
/// payloads can include 0xac/0xae bytes outside a push, and `Unknown`
/// can be anything, so both fall back to a real script walk.
#[inline]
fn legacy_sigops_for_output(output_type: OutputType, script_pubkey: &Script) -> usize {
match output_type {
OutputType::P2PKH | OutputType::P2PK33 | OutputType::P2PK65 => 1,
OutputType::P2MS => 20,
OutputType::P2SH
| OutputType::P2WPKH
| OutputType::P2WSH
| OutputType::P2TR
| OutputType::P2A
| OutputType::Empty => 0,
OutputType::OpReturn | OutputType::Unknown => script_pubkey.count_sigops_legacy(),
}
}
/// Single-pass equivalent of bitcoin's private `last_pushdata()` plus the
/// public `is_push_only()`: returns the bytes of the script's last
/// `Instruction::PushBytes` (only when it is the *last* instruction)
/// alongside whether every instruction was a push (per Core,
/// `OP_RESERVED` and `OP_PUSHNUM_1..16` count as pushes too).
fn last_push_and_push_only(script: &Script) -> (Option<&[u8]>, bool) {
let mut last: Option<&[u8]> = None;
let mut push_only = true;
for inst in script.instructions() {
match inst {
Ok(Instruction::PushBytes(b)) => {
last = Some(b.as_bytes());
}
Ok(Instruction::Op(op)) => {
last = None;
if op.to_u8() > 0x60 {
push_only = false;
}
}
Err(_) => {
last = None;
push_only = false;
break;
}
}
}
(last, push_only)
}

View File

@@ -13,7 +13,7 @@ use super::{BlockProcessor, ComputedTx};
impl<'a> BlockProcessor<'a> {
pub fn compute_txids(&self) -> Result<Vec<ComputedTx<'a>>> {
let will_check_collisions = self.check_collisions;
let base_tx_index = self.indexes.tx_index;
let base_tx_index = self.lengths.tx_index;
self.block
.txdata

View File

@@ -22,8 +22,8 @@ impl<'a> BlockProcessor<'a> {
txid_prefix_to_tx_index.clear();
txid_prefix_to_tx_index.extend(txs.iter().map(|ct| (ct.txid_prefix, ct.tx_index)));
let base_tx_index = self.indexes.tx_index;
let base_txin_index = self.indexes.txin_index;
let base_tx_index = self.lengths.tx_index;
let base_txin_index = self.lengths.txin_index;
let total_inputs: usize = self.block.txdata.iter().map(|tx| tx.input.len()).sum();
let mut items = Vec::with_capacity(total_inputs);
@@ -79,11 +79,11 @@ impl<'a> BlockProcessor<'a> {
.map(|v| *v);
let prev_tx_index = match store_result {
Some(tx_index) if tx_index < self.indexes.tx_index => tx_index,
Some(tx_index) if tx_index < self.lengths.tx_index => tx_index,
_ => {
error!(
"UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_tx_index={:?}",
txid, txid_prefix, store_result, self.indexes.tx_index
txid, txid_prefix, store_result, self.lengths.tx_index
);
return Err(Error::UnknownTxid);
}

View File

@@ -11,15 +11,15 @@ use tracing::error;
use vecdb::{BytesVec, WritableVec};
use super::{BlockProcessor, ProcessedOutput, SameBlockOutputInfo};
use crate::{AddrsVecs, Indexes, OutputsVecs, ScriptsVecs};
use crate::{AddrsVecs, Lengths, OutputsVecs, ScriptsVecs};
impl<'a> BlockProcessor<'a> {
pub fn process_outputs(&self) -> Result<Vec<ProcessedOutput<'a>>> {
let height = self.height;
let check_collisions = self.check_collisions;
let base_tx_index = self.indexes.tx_index;
let base_txout_index = self.indexes.txout_index;
let base_tx_index = self.lengths.tx_index;
let base_txout_index = self.lengths.txout_index;
let total_outputs: usize = self.block.txdata.iter().map(|tx| tx.output.len()).sum();
let mut items = Vec::with_capacity(total_outputs);
@@ -63,7 +63,7 @@ impl<'a> BlockProcessor<'a> {
.get(&addr_hash)?
.map(|v| *v)
.and_then(|type_index_local| {
(type_index_local < self.indexes.to_type_index(addr_type))
(type_index_local < self.lengths.to_type_index(addr_type))
.then_some(type_index_local)
});
@@ -106,7 +106,7 @@ impl<'a> BlockProcessor<'a> {
#[allow(clippy::too_many_arguments)]
pub(super) fn finalize_outputs(
indexes: &mut Indexes,
lengths: &mut Lengths,
first_txout_index: &mut BytesVec<TxIndex, TxOutIndex>,
outputs: &mut OutputsVecs,
addrs: &mut AddrsVecs,
@@ -150,7 +150,7 @@ pub(super) fn finalize_outputs(
{
ti
} else {
let ti = indexes.increment_addr_index(addr_type);
let ti = lengths.increment_addr_index(addr_type);
already_added_addr_hash
.get_mut_unwrap(addr_type)
@@ -168,29 +168,29 @@ pub(super) fn finalize_outputs(
scripts
.p2ms
.to_tx_index
.checked_push(indexes.p2ms_output_index, tx_index)?;
indexes.p2ms_output_index.copy_then_increment()
.checked_push(lengths.p2ms_output_index, tx_index)?;
lengths.p2ms_output_index.copy_then_increment()
}
OutputType::OpReturn => {
scripts
.op_return
.to_tx_index
.checked_push(indexes.op_return_index, tx_index)?;
indexes.op_return_index.copy_then_increment()
.checked_push(lengths.op_return_index, tx_index)?;
lengths.op_return_index.copy_then_increment()
}
OutputType::Empty => {
scripts
.empty
.to_tx_index
.checked_push(indexes.empty_output_index, tx_index)?;
indexes.empty_output_index.copy_then_increment()
.checked_push(lengths.empty_output_index, tx_index)?;
lengths.empty_output_index.copy_then_increment()
}
OutputType::Unknown => {
scripts
.unknown
.to_tx_index
.checked_push(indexes.unknown_output_index, tx_index)?;
indexes.unknown_output_index.copy_then_increment()
.checked_push(lengths.unknown_output_index, tx_index)?;
lengths.unknown_output_index.copy_then_increment()
}
_ => unreachable!(),
}

View File

@@ -0,0 +1,112 @@
use std::sync::Arc;
use parking_lot::RwLock;
use crate::lengths::Lengths;
/// Pipeline-wide safe-read snapshot. All fields are lengths/counts
/// (next-to-write totals): `bound.f = N` means positions `0..N` are
/// fully written; readers reject `pos >= bound.f`. Covers vecs only:
/// reorg store rewrites can briefly tear in-flight reads.
#[derive(Clone, Default)]
pub struct SafeLengths(Arc<RwLock<Lengths>>);
impl SafeLengths {
pub fn new() -> Self {
Self::default()
}
pub fn load(&self) -> Lengths {
self.0.read().clone()
}
pub fn reset(&self) {
*self.0.write() = Lengths::default();
}
pub fn advance(&self, next: Lengths) {
let mut g = self.0.write();
debug_assert!(
next.height >= g.height
&& next.tx_index >= g.tx_index
&& next.txin_index >= g.txin_index
&& next.txout_index >= g.txout_index
&& next.empty_output_index >= g.empty_output_index
&& next.op_return_index >= g.op_return_index
&& next.p2ms_output_index >= g.p2ms_output_index
&& next.p2pk33_addr_index >= g.p2pk33_addr_index
&& next.p2pk65_addr_index >= g.p2pk65_addr_index
&& next.p2pkh_addr_index >= g.p2pkh_addr_index
&& next.p2sh_addr_index >= g.p2sh_addr_index
&& next.p2tr_addr_index >= g.p2tr_addr_index
&& next.p2wpkh_addr_index >= g.p2wpkh_addr_index
&& next.p2wsh_addr_index >= g.p2wsh_addr_index
&& next.p2a_addr_index >= g.p2a_addr_index
&& next.unknown_output_index >= g.unknown_output_index,
"advance: per-field regression"
);
*g = next;
}
/// Drop each field to at most `starting`. Must be called BEFORE
/// any rewrite at positions `>= starting`.
pub fn lower_before(&self, starting: &Lengths) {
let mut g = self.0.write();
g.height = g.height.min(starting.height);
g.tx_index = g.tx_index.min(starting.tx_index);
g.txin_index = g.txin_index.min(starting.txin_index);
g.txout_index = g.txout_index.min(starting.txout_index);
g.empty_output_index = g.empty_output_index.min(starting.empty_output_index);
g.op_return_index = g.op_return_index.min(starting.op_return_index);
g.p2ms_output_index = g.p2ms_output_index.min(starting.p2ms_output_index);
g.p2pk33_addr_index = g.p2pk33_addr_index.min(starting.p2pk33_addr_index);
g.p2pk65_addr_index = g.p2pk65_addr_index.min(starting.p2pk65_addr_index);
g.p2pkh_addr_index = g.p2pkh_addr_index.min(starting.p2pkh_addr_index);
g.p2sh_addr_index = g.p2sh_addr_index.min(starting.p2sh_addr_index);
g.p2tr_addr_index = g.p2tr_addr_index.min(starting.p2tr_addr_index);
g.p2wpkh_addr_index = g.p2wpkh_addr_index.min(starting.p2wpkh_addr_index);
g.p2wsh_addr_index = g.p2wsh_addr_index.min(starting.p2wsh_addr_index);
g.p2a_addr_index = g.p2a_addr_index.min(starting.p2a_addr_index);
g.unknown_output_index = g.unknown_output_index.min(starting.unknown_output_index);
}
}
#[cfg(test)]
mod tests {
use brk_types::{
EmptyOutputIndex, Height, OpReturnIndex, P2AAddrIndex, P2MSOutputIndex, P2PK33AddrIndex,
P2PK65AddrIndex, P2PKHAddrIndex, P2SHAddrIndex, P2TRAddrIndex, P2WPKHAddrIndex,
P2WSHAddrIndex, TxInIndex, TxIndex, TxOutIndex, UnknownOutputIndex,
};
use super::*;
#[test]
fn lower_before_clamps_every_field() {
let sentinel = u32::MAX as usize;
let max = Lengths {
empty_output_index: EmptyOutputIndex::from(sentinel),
height: Height::from(sentinel),
op_return_index: OpReturnIndex::from(sentinel),
p2ms_output_index: P2MSOutputIndex::from(sentinel),
p2pk33_addr_index: P2PK33AddrIndex::from(sentinel),
p2pk65_addr_index: P2PK65AddrIndex::from(sentinel),
p2pkh_addr_index: P2PKHAddrIndex::from(sentinel),
p2sh_addr_index: P2SHAddrIndex::from(sentinel),
p2tr_addr_index: P2TRAddrIndex::from(sentinel),
p2wpkh_addr_index: P2WPKHAddrIndex::from(sentinel),
p2wsh_addr_index: P2WSHAddrIndex::from(sentinel),
p2a_addr_index: P2AAddrIndex::from(sentinel),
tx_index: TxIndex::from(sentinel),
txin_index: TxInIndex::from(sentinel),
txout_index: TxOutIndex::from(sentinel),
unknown_output_index: UnknownOutputIndex::from(sentinel),
};
let safe = SafeLengths::new();
safe.advance(max);
safe.lower_before(&Lengths::default());
assert_eq!(safe.load(), Lengths::default());
}
}

View File

@@ -14,7 +14,7 @@ use rayon::prelude::*;
use tracing::{debug, info};
use vecdb::{AnyVec, ReadableVec, VecIndex};
use crate::{Indexes, constants::DUPLICATE_TXID_PREFIXES};
use crate::{Lengths, constants::DUPLICATE_TXID_PREFIXES};
use super::Vecs;
@@ -119,7 +119,7 @@ impl Stores {
Ok(stores)
}
pub fn starting_height(&self) -> Height {
pub fn next_height(&self) -> Height {
self.iter_any()
.map(|store| store.height().map(Height::incremented).unwrap_or_default())
.min()
@@ -220,24 +220,26 @@ impl Stores {
Ok(tasks)
}
/// Rewrites reverse-key entries below the lowered bound. In-flight
/// readers may briefly see torn state.
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
starting_indexes: &Indexes,
starting_lengths: &Lengths,
) -> Result<()> {
if self.is_empty()? {
return Ok(());
}
debug_assert!(starting_indexes.height != Height::ZERO);
debug_assert!(starting_indexes.tx_index != TxIndex::ZERO);
debug_assert!(starting_indexes.txout_index != TxOutIndex::ZERO);
debug_assert!(starting_lengths.height != Height::ZERO);
debug_assert!(starting_lengths.tx_index != TxIndex::ZERO);
debug_assert!(starting_lengths.txout_index != TxOutIndex::ZERO);
self.rollback_block_metadata(vecs, starting_indexes)?;
self.rollback_txids(vecs, starting_indexes);
self.rollback_outputs_and_inputs(vecs, starting_indexes);
self.rollback_block_metadata(vecs, starting_lengths)?;
self.rollback_txids(vecs, starting_lengths);
self.rollback_outputs_and_inputs(vecs, starting_lengths);
let rollback_height = starting_indexes.height.decremented().unwrap_or_default();
let rollback_height = starting_lengths.height.decremented().unwrap_or_default();
self.par_iter_any_mut()
.try_for_each(|store| store.export_meta(rollback_height))?;
self.commit(rollback_height)?;
@@ -265,10 +267,10 @@ impl Stores {
fn rollback_block_metadata(
&mut self,
vecs: &mut Vecs,
starting_indexes: &Indexes,
starting_lengths: &Lengths,
) -> Result<()> {
vecs.blocks.blockhash.for_each_range_at(
starting_indexes.height.to_usize(),
starting_lengths.height.to_usize(),
vecs.blocks.blockhash.len(),
|blockhash| {
self.blockhash_prefix_to_height
@@ -277,7 +279,7 @@ impl Stores {
);
for addr_type in OutputType::ADDR_TYPES {
for hash in vecs.iter_addr_hashes_from(addr_type, starting_indexes.height)? {
for hash in vecs.iter_addr_hashes_from(addr_type, starting_lengths.height)? {
self.addr_type_to_addr_hash_to_addr_index
.get_mut_unwrap(addr_type)
.remove(hash);
@@ -287,8 +289,8 @@ impl Stores {
Ok(())
}
fn rollback_txids(&mut self, vecs: &mut Vecs, starting_indexes: &Indexes) {
let start = starting_indexes.tx_index.to_usize();
fn rollback_txids(&mut self, vecs: &mut Vecs, starting_lengths: &Lengths) {
let start = starting_lengths.tx_index.to_usize();
let end = vecs.transactions.txid.len();
let mut current_index = start;
vecs.transactions
@@ -313,7 +315,7 @@ impl Stores {
self.txid_prefix_to_tx_index.clear_caches();
}
fn rollback_outputs_and_inputs(&mut self, vecs: &mut Vecs, starting_indexes: &Indexes) {
fn rollback_outputs_and_inputs(&mut self, vecs: &mut Vecs, starting_lengths: &Lengths) {
let tx_index_to_first_txout_index_reader = vecs.transactions.first_txout_index.reader();
let txout_index_to_output_type_reader = vecs.outputs.output_type.reader();
let txout_index_to_type_index_reader = vecs.outputs.type_index.reader();
@@ -321,7 +323,7 @@ impl Stores {
let mut addr_index_tx_index_to_remove: FxHashSet<(OutputType, TypeIndex, TxIndex)> =
FxHashSet::default();
let rollback_start = starting_indexes.txout_index.to_usize();
let rollback_start = starting_lengths.txout_index.to_usize();
let rollback_end = vecs.outputs.output_type.len();
let tx_indexes: Vec<TxIndex> = vecs
@@ -354,7 +356,7 @@ impl Stores {
.remove(AddrIndexOutPoint::from((addr_index, outpoint)));
}
let start = starting_indexes.txin_index.to_usize();
let start = starting_lengths.txin_index.to_usize();
let end = vecs.inputs.outpoint.len();
let outpoints: Vec<OutPoint> = vecs.inputs.outpoint.collect_range_at(start, end);
let spending_tx_indexes: Vec<TxIndex> = vecs.inputs.tx_index.collect_range_at(start, end);
@@ -372,7 +374,7 @@ impl Stores {
let txout_index =
tx_index_to_first_txout_index_reader.get(output_tx_index.to_usize()) + vout;
if txout_index < starting_indexes.txout_index {
if txout_index < starting_lengths.txout_index {
let output_type = txout_index_to_output_type_reader.get(txout_index.to_usize());
let type_index = txout_index_to_type_index_reader.get(txout_index.to_usize());
Some((outpoint, output_type, type_index, spending_tx_index))

View File

@@ -25,7 +25,7 @@ pub use outputs::*;
pub use scripts::*;
pub use transactions::*;
use crate::Indexes;
use crate::Lengths;
#[derive(Traversable)]
pub struct Vecs<M: StorageMode = Rw> {
@@ -80,40 +80,40 @@ impl Vecs {
Ok(this)
}
pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> Result<()> {
let saved_height = starting_indexes.height.decremented().unwrap_or_default();
pub fn rollback_if_needed(&mut self, starting_lengths: &Lengths) -> Result<()> {
let saved_height = starting_lengths.height.decremented().unwrap_or_default();
let stamp = Stamp::from(u64::from(saved_height));
self.blocks.truncate(starting_indexes.height, stamp)?;
self.blocks.truncate(starting_lengths.height, stamp)?;
self.transactions
.truncate(starting_indexes.height, starting_indexes.tx_index, stamp)?;
.truncate(starting_lengths.height, starting_lengths.tx_index, stamp)?;
self.inputs
.truncate(starting_indexes.height, starting_indexes.txin_index, stamp)?;
.truncate(starting_lengths.height, starting_lengths.txin_index, stamp)?;
self.outputs
.truncate(starting_indexes.height, starting_indexes.txout_index, stamp)?;
.truncate(starting_lengths.height, starting_lengths.txout_index, stamp)?;
self.addrs.truncate(
starting_indexes.height,
starting_indexes.p2pk65_addr_index,
starting_indexes.p2pk33_addr_index,
starting_indexes.p2pkh_addr_index,
starting_indexes.p2sh_addr_index,
starting_indexes.p2wpkh_addr_index,
starting_indexes.p2wsh_addr_index,
starting_indexes.p2tr_addr_index,
starting_indexes.p2a_addr_index,
starting_lengths.height,
starting_lengths.p2pk65_addr_index,
starting_lengths.p2pk33_addr_index,
starting_lengths.p2pkh_addr_index,
starting_lengths.p2sh_addr_index,
starting_lengths.p2wpkh_addr_index,
starting_lengths.p2wsh_addr_index,
starting_lengths.p2tr_addr_index,
starting_lengths.p2a_addr_index,
stamp,
)?;
self.scripts.truncate(
starting_indexes.height,
starting_indexes.empty_output_index,
starting_indexes.op_return_index,
starting_indexes.p2ms_output_index,
starting_indexes.unknown_output_index,
starting_lengths.height,
starting_lengths.empty_output_index,
starting_lengths.op_return_index,
starting_lengths.p2ms_output_index,
starting_lengths.unknown_output_index,
stamp,
)?;
@@ -126,7 +126,7 @@ impl Vecs {
Ok(())
}
pub fn starting_height(&mut self) -> Height {
pub fn next_height(&mut self) -> Height {
self.par_iter_mut_any_stored_vec()
.map(|vec| {
let h = Height::from(vec.stamp());