mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: speed improvement
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -14,6 +14,7 @@ mod month1;
|
||||
mod month3;
|
||||
mod month6;
|
||||
pub mod timestamp;
|
||||
mod tx_heights;
|
||||
mod tx_index;
|
||||
mod txin_index;
|
||||
mod txout_index;
|
||||
@@ -50,6 +51,7 @@ pub use month1::Vecs as Month1Vecs;
|
||||
pub use month3::Vecs as Month3Vecs;
|
||||
pub use month6::Vecs as Month6Vecs;
|
||||
pub use timestamp::Timestamps;
|
||||
pub use tx_heights::TxHeights;
|
||||
pub use tx_index::Vecs as TxIndexVecs;
|
||||
pub use txin_index::Vecs as TxInIndexVecs;
|
||||
pub use txout_index::Vecs as TxOutIndexVecs;
|
||||
@@ -64,6 +66,8 @@ pub struct Vecs<M: StorageMode = Rw> {
|
||||
db: Database,
|
||||
#[traversable(skip)]
|
||||
pub cached_mappings: CachedMappings,
|
||||
#[traversable(skip)]
|
||||
pub tx_heights: TxHeights,
|
||||
pub addr: AddrVecs,
|
||||
pub height: HeightVecs<M>,
|
||||
pub epoch: EpochVecs<M>,
|
||||
@@ -143,6 +147,7 @@ impl Vecs {
|
||||
|
||||
let this = Self {
|
||||
cached_mappings,
|
||||
tx_heights: TxHeights::init(indexer),
|
||||
addr,
|
||||
height,
|
||||
epoch,
|
||||
@@ -179,6 +184,8 @@ impl Vecs {
|
||||
) -> Result<Indexes> {
|
||||
self.db.sync_bg_tasks()?;
|
||||
|
||||
self.tx_heights.update(indexer, starting_indexes.height);
|
||||
|
||||
// timestamp_monotonic must be computed first — other mappings read it
|
||||
self.timestamp
|
||||
.compute_monotonic(indexer, starting_indexes.height, exit)?;
|
||||
|
||||
61
crates/brk_computer/src/indexes/tx_heights.rs
Normal file
61
crates/brk_computer/src/indexes/tx_heights.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use brk_indexer::Indexer;
|
||||
use brk_types::{Height, RangeMap, TxIndex};
|
||||
use parking_lot::RwLock;
|
||||
use vecdb::{AnyVec, ReadableVec, VecIndex};
|
||||
|
||||
/// Reverse mapping from `TxIndex` → `Height` via binary search on block boundaries.
|
||||
///
|
||||
/// Built from `first_tx_index` (the first TxIndex in each block). A floor lookup
|
||||
/// on any TxIndex gives the block height that contains it.
|
||||
///
|
||||
/// Wrapped in `Arc<RwLock<>>` so the compute thread can extend it while
|
||||
/// query threads read concurrently — the inner `RangeMap` is purely in-memory
|
||||
/// and wouldn't stay current through mmap like PcoVec/BytesVec do.
|
||||
#[derive(Clone)]
|
||||
pub struct TxHeights(Arc<RwLock<RangeMap<TxIndex, Height>>>);
|
||||
|
||||
impl TxHeights {
|
||||
/// Build from the full `first_tx_index` vec at startup.
|
||||
pub fn init(indexer: &Indexer) -> Self {
|
||||
let len = indexer.vecs.transactions.first_tx_index.len();
|
||||
let entries: Vec<TxIndex> = if len > 0 {
|
||||
indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.first_tx_index
|
||||
.collect_range_at(0, len)
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
Self(Arc::new(RwLock::new(RangeMap::from(entries))))
|
||||
}
|
||||
|
||||
/// Extend with new blocks since last call. Truncates on reorg.
|
||||
pub fn update(&self, indexer: &Indexer, reorg_height: Height) {
|
||||
let mut inner = self.0.write();
|
||||
let reorg_len = reorg_height.to_usize();
|
||||
if inner.len() > reorg_len {
|
||||
inner.truncate(reorg_len);
|
||||
}
|
||||
let target_len = indexer.vecs.transactions.first_tx_index.len();
|
||||
let current_len = inner.len();
|
||||
if current_len < target_len {
|
||||
let new_entries: Vec<TxIndex> = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.first_tx_index
|
||||
.collect_range_at(current_len, target_len);
|
||||
for entry in new_entries {
|
||||
inner.push(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up the block height for a given tx_index.
|
||||
#[inline]
|
||||
pub fn get_shared(&self, tx_index: TxIndex) -> Option<Height> {
|
||||
self.0.read().get_shared(tx_index)
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,9 @@ use vecdb::{
|
||||
|
||||
pub mod major;
|
||||
pub mod minor;
|
||||
mod pool_heights;
|
||||
|
||||
pub use pool_heights::PoolHeights;
|
||||
|
||||
use crate::{
|
||||
blocks, indexes,
|
||||
@@ -30,6 +33,8 @@ pub struct Vecs<M: StorageMode = Rw> {
|
||||
pools: &'static Pools,
|
||||
|
||||
pub pool: M::Stored<BytesVec<Height, PoolSlug>>,
|
||||
#[traversable(skip)]
|
||||
pub pool_heights: PoolHeights,
|
||||
pub major: BTreeMap<PoolSlug, major::Vecs<M>>,
|
||||
pub minor: BTreeMap<PoolSlug, minor::Vecs<M>>,
|
||||
}
|
||||
@@ -63,8 +68,12 @@ impl Vecs {
|
||||
}
|
||||
}
|
||||
|
||||
let pool = BytesVec::forced_import(&db, "pool", version)?;
|
||||
let pool_heights = PoolHeights::build(&pool);
|
||||
|
||||
let this = Self {
|
||||
pool: BytesVec::forced_import(&db, "pool", version)?,
|
||||
pool,
|
||||
pool_heights,
|
||||
major: major_map,
|
||||
minor: minor_map,
|
||||
pools,
|
||||
@@ -149,8 +158,10 @@ impl Vecs {
|
||||
let mut output_count_cursor = indexes.tx_index.output_count.cursor();
|
||||
|
||||
self.pool.truncate_if_needed_at(min)?;
|
||||
self.pool_heights.truncate(min);
|
||||
|
||||
let len = indexer.vecs.blocks.coinbase_tag.len();
|
||||
let mut next_height = min;
|
||||
|
||||
indexer.vecs.blocks.coinbase_tag.try_for_each_range_at(
|
||||
min,
|
||||
@@ -186,6 +197,9 @@ impl Vecs {
|
||||
.unwrap_or(unknown);
|
||||
|
||||
self.pool.push(pool.slug);
|
||||
self.pool_heights.push(pool.slug, Height::from(next_height));
|
||||
next_height += 1;
|
||||
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
|
||||
39
crates/brk_computer/src/pools/pool_heights.rs
Normal file
39
crates/brk_computer/src/pools/pool_heights.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use brk_types::{Height, PoolSlug};
|
||||
use parking_lot::RwLock;
|
||||
use rustc_hash::FxHashMap;
|
||||
use vecdb::{AnyVec, BytesVec, VecIndex};
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct PoolHeights(Arc<RwLock<FxHashMap<PoolSlug, Vec<Height>>>>);
|
||||
|
||||
impl PoolHeights {
|
||||
pub fn build(pool: &BytesVec<Height, PoolSlug>) -> Self {
|
||||
let len = pool.len();
|
||||
let mut map: FxHashMap<PoolSlug, Vec<Height>> = FxHashMap::default();
|
||||
let reader = pool.reader();
|
||||
for h in 0..len {
|
||||
map.entry(reader.get(h))
|
||||
.or_default()
|
||||
.push(Height::from(h));
|
||||
}
|
||||
Self(Arc::new(RwLock::new(map)))
|
||||
}
|
||||
|
||||
pub fn truncate(&self, min: usize) {
|
||||
let mut cache = self.0.write();
|
||||
for heights in cache.values_mut() {
|
||||
let cut = heights.partition_point(|h| h.to_usize() < min);
|
||||
heights.truncate(cut);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&self, slug: PoolSlug, height: Height) {
|
||||
self.0.write().entry(slug).or_default().push(height);
|
||||
}
|
||||
|
||||
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, FxHashMap<PoolSlug, Vec<Height>>> {
|
||||
self.0.read()
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,21 @@ use thiserror::Error;
|
||||
|
||||
pub type Result<T, E = Error> = result::Result<T, E>;
|
||||
|
||||
/// Convert `Option<T>` → `Result<T>` without panicking.
|
||||
///
|
||||
/// Replaces `.unwrap()` in query paths so a missing value returns
|
||||
/// HTTP 500 instead of crashing the server (`panic = "abort"`).
|
||||
pub trait OptionData<T> {
|
||||
fn data(self) -> Result<T>;
|
||||
}
|
||||
|
||||
impl<T> OptionData<T> for Option<T> {
|
||||
#[inline]
|
||||
fn data(self) -> Result<T> {
|
||||
self.ok_or(Error::Internal("data unavailable"))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error(transparent)]
|
||||
|
||||
@@ -26,6 +26,7 @@ jiff = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
# quickmatch = { path = "../../../quickmatch" }
|
||||
quickmatch = "0.4.0"
|
||||
rustc-hash = { workspace = true }
|
||||
tokio = { workspace = true, optional = true }
|
||||
serde_json = { workspace = true }
|
||||
vecdb = { workspace = true }
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use bitcoin::{Network, PublicKey, ScriptBuf};
|
||||
use brk_error::{Error, Result};
|
||||
use brk_error::{Error, OptionData, Result};
|
||||
use brk_types::{
|
||||
Addr, AddrBytes, AddrChainStats, AddrHash, AddrIndexOutPoint, AddrIndexTxIndex, AddrStats,
|
||||
AnyAddrDataIndexEnum, BlockHash, Dollars, Height, OutputType, Timestamp, Transaction, TxIndex,
|
||||
@@ -136,7 +136,7 @@ impl Query {
|
||||
let store = stores
|
||||
.addr_type_to_addr_index_and_tx_index
|
||||
.get(output_type)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
if let Some(after_txid) = after_txid {
|
||||
let after_tx_index = stores
|
||||
@@ -177,7 +177,7 @@ impl Query {
|
||||
let store = stores
|
||||
.addr_type_to_addr_index_and_unspent_outpoint
|
||||
.get(output_type)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
let prefix = u32::from(type_index).to_be_bytes();
|
||||
|
||||
@@ -190,7 +190,7 @@ impl Query {
|
||||
let first_txout_index_reader = vecs.transactions.first_txout_index.reader();
|
||||
let value_reader = vecs.outputs.value.reader();
|
||||
let blockhash_reader = vecs.blocks.blockhash.reader();
|
||||
let mut height_cursor = vecs.transactions.height.cursor();
|
||||
let tx_heights = &self.computer().indexes.tx_heights;
|
||||
let mut block_ts_cursor = vecs.blocks.timestamp.cursor();
|
||||
|
||||
let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None;
|
||||
@@ -198,7 +198,7 @@ impl Query {
|
||||
|
||||
for (tx_index, vout) in outpoints {
|
||||
let txid = txid_reader.get(tx_index.to_usize());
|
||||
let height: Height = height_cursor.get(tx_index.to_usize()).unwrap();
|
||||
let height: Height = tx_heights.get_shared(tx_index).data()?;
|
||||
let first_txout_index = first_txout_index_reader.get(tx_index.to_usize());
|
||||
let value = value_reader.get(usize::from(first_txout_index + vout));
|
||||
|
||||
@@ -208,7 +208,7 @@ impl Query {
|
||||
(bh.clone(), bt)
|
||||
} else {
|
||||
let bh = blockhash_reader.get(height.to_usize());
|
||||
let bt = block_ts_cursor.get(height.to_usize()).unwrap();
|
||||
let bt = block_ts_cursor.get(height.to_usize()).data()?;
|
||||
cached_block = Some((height, bh.clone(), bt));
|
||||
(bh, bt)
|
||||
};
|
||||
@@ -261,7 +261,7 @@ impl Query {
|
||||
.stores
|
||||
.addr_type_to_addr_index_and_tx_index
|
||||
.get(output_type)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let prefix = u32::from(type_index).to_be_bytes();
|
||||
let last_tx_index = store
|
||||
.prefix(prefix)
|
||||
@@ -287,7 +287,7 @@ impl Query {
|
||||
let Ok(Some(type_index)) = stores
|
||||
.addr_type_to_addr_hash_to_addr_index
|
||||
.get(output_type)
|
||||
.unwrap()
|
||||
.data()?
|
||||
.get(&hash)
|
||||
.map(|opt| opt.map(|cow| cow.into_owned()))
|
||||
else {
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::io::Read;
|
||||
|
||||
use bitcoin::consensus::Decodable;
|
||||
use bitcoin::hex::DisplayHex;
|
||||
use brk_error::{Error, Result};
|
||||
use brk_error::{Error, OptionData, Result};
|
||||
use brk_types::{
|
||||
BlockExtras, BlockHash, BlockHashPrefix, BlockHeader, BlockInfo, BlockInfoV1, BlockPool,
|
||||
FeeRate, Height, PoolSlug, Sats, Timestamp, TxIndex, VSize, pools,
|
||||
@@ -443,7 +443,7 @@ impl Query {
|
||||
.blocks
|
||||
.position
|
||||
.collect_one(height)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let raw = self.reader().read_raw_bytes(position, HEADER_SIZE)?;
|
||||
bitcoin::block::Header::consensus_decode(&mut raw.as_slice())
|
||||
.map_err(|_| Error::Internal("Failed to decode block header"))
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use brk_error::{Error, Result};
|
||||
use brk_error::{Error, OptionData, Result};
|
||||
use brk_types::{BlockHash, Height};
|
||||
use vecdb::{AnyVec, ReadableVec};
|
||||
|
||||
@@ -19,8 +19,8 @@ impl Query {
|
||||
return Err(Error::OutOfRange("Block height out of range".into()));
|
||||
}
|
||||
|
||||
let position = indexer.vecs.blocks.position.collect_one(height).unwrap();
|
||||
let size = indexer.vecs.blocks.total.collect_one(height).unwrap();
|
||||
let position = indexer.vecs.blocks.position.collect_one(height).data()?;
|
||||
let size = indexer.vecs.blocks.total.collect_one(height).data()?;
|
||||
|
||||
reader.read_raw_bytes(position, *size as usize)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use brk_error::{Error, Result};
|
||||
use brk_error::{Error, OptionData, Result};
|
||||
use brk_types::{BlockTimestamp, Date, Day1, Height, Timestamp};
|
||||
use jiff::Timestamp as JiffTimestamp;
|
||||
use vecdb::ReadableVec;
|
||||
@@ -35,10 +35,10 @@ impl Query {
|
||||
|
||||
// Search forward from start to find the last block <= target timestamp
|
||||
let mut best_height = start;
|
||||
let mut best_ts = ts_cursor.get(start).unwrap();
|
||||
let mut best_ts = ts_cursor.get(start).data()?;
|
||||
|
||||
for h in (start + 1)..=max_height_usize {
|
||||
let block_ts = ts_cursor.get(h).unwrap();
|
||||
let block_ts = ts_cursor.get(h).data()?;
|
||||
if block_ts <= target {
|
||||
best_height = h;
|
||||
best_ts = block_ts;
|
||||
@@ -49,7 +49,7 @@ impl Query {
|
||||
|
||||
// Check one block before start in case we need to go backward
|
||||
if start > 0 && best_ts > target {
|
||||
let prev_ts = ts_cursor.get(start - 1).unwrap();
|
||||
let prev_ts = ts_cursor.get(start - 1).data()?;
|
||||
if prev_ts <= target {
|
||||
best_height = start - 1;
|
||||
best_ts = prev_ts;
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use std::io::Cursor;
|
||||
|
||||
use bitcoin::{consensus::Decodable, hex::DisplayHex};
|
||||
use brk_error::{Error, Result};
|
||||
use brk_error::{Error, OptionData, Result};
|
||||
use brk_types::{
|
||||
BlockHash, Height, OutputType, Sats, Timestamp, Transaction, TxIn, TxIndex, TxOut, TxStatus,
|
||||
Txid, Vout, Weight,
|
||||
BlkPosition, BlockHash, Height, OutPoint, OutputType, RawLockTime, Sats, StoredU32, Timestamp,
|
||||
Transaction, TxIn, TxInIndex, TxIndex, TxOut, TxOutIndex, TxStatus, Txid, Vout, Weight,
|
||||
};
|
||||
use rustc_hash::FxHashMap;
|
||||
use vecdb::{AnyVec, ReadableVec, VecIndex};
|
||||
|
||||
use super::BLOCK_TXS_PAGE_SIZE;
|
||||
@@ -64,6 +65,11 @@ impl Query {
|
||||
|
||||
/// Batch-read transactions at arbitrary indices.
|
||||
/// Reads in ascending index order for I/O locality, returns in caller's order.
|
||||
///
|
||||
/// Three-phase approach for optimal I/O:
|
||||
/// Phase 1 — Decode transactions & collect outpoints (sorted by tx_index)
|
||||
/// Phase 2 — Batch-read all prevout data (sorted by prev_tx_index, then txout_index)
|
||||
/// Phase 3 — Assemble Transaction objects from pre-fetched data
|
||||
pub fn transactions_by_indices(&self, indices: &[TxIndex]) -> Result<Vec<Transaction>> {
|
||||
if indices.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
@@ -78,37 +84,46 @@ impl Query {
|
||||
let indexer = self.indexer();
|
||||
let reader = self.reader();
|
||||
|
||||
// ── Phase 1: Decode all transactions, collect outpoints ─────────
|
||||
|
||||
let tx_heights = &self.computer().indexes.tx_heights;
|
||||
let mut txid_cursor = indexer.vecs.transactions.txid.cursor();
|
||||
let mut height_cursor = indexer.vecs.transactions.height.cursor();
|
||||
let mut locktime_cursor = indexer.vecs.transactions.raw_locktime.cursor();
|
||||
let mut total_size_cursor = indexer.vecs.transactions.total_size.cursor();
|
||||
let mut first_txin_cursor = indexer.vecs.transactions.first_txin_index.cursor();
|
||||
let mut position_cursor = indexer.vecs.transactions.position.cursor();
|
||||
|
||||
let txid_reader = indexer.vecs.transactions.txid.reader();
|
||||
let first_txout_index_reader = indexer.vecs.transactions.first_txout_index.reader();
|
||||
let value_reader = indexer.vecs.outputs.value.reader();
|
||||
let output_type_reader = indexer.vecs.outputs.output_type.reader();
|
||||
let type_index_reader = indexer.vecs.outputs.type_index.reader();
|
||||
let addr_readers = indexer.vecs.addrs.addr_readers();
|
||||
let blockhash_reader = indexer.vecs.blocks.blockhash.reader();
|
||||
let mut block_ts_cursor = indexer.vecs.blocks.timestamp.cursor();
|
||||
|
||||
struct DecodedTx {
|
||||
pos: usize,
|
||||
tx_index: TxIndex,
|
||||
txid: Txid,
|
||||
height: Height,
|
||||
lock_time: RawLockTime,
|
||||
total_size: StoredU32,
|
||||
block_hash: BlockHash,
|
||||
block_time: Timestamp,
|
||||
decoded: bitcoin::Transaction,
|
||||
first_txin_index: TxInIndex,
|
||||
outpoints: Vec<OutPoint>,
|
||||
}
|
||||
|
||||
let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None;
|
||||
let mut decoded_txs: Vec<DecodedTx> = Vec::with_capacity(len);
|
||||
let mut total_inputs: usize = 0;
|
||||
|
||||
// Read in sorted order, write directly to original position
|
||||
let mut txs: Vec<Option<Transaction>> = (0..len).map(|_| None).collect();
|
||||
|
||||
// Phase 1a: Read metadata + decode transactions (no outpoint reads yet)
|
||||
for &pos in &order {
|
||||
let tx_index = indices[pos];
|
||||
let idx = tx_index.to_usize();
|
||||
|
||||
let txid = txid_cursor.get(idx).unwrap();
|
||||
let height = height_cursor.get(idx).unwrap();
|
||||
let lock_time = locktime_cursor.get(idx).unwrap();
|
||||
let total_size = total_size_cursor.get(idx).unwrap();
|
||||
let first_txin_index = first_txin_cursor.get(idx).unwrap();
|
||||
let position = position_cursor.get(idx).unwrap();
|
||||
let txid: Txid = txid_cursor.get(idx).data()?;
|
||||
let height: Height = tx_heights.get_shared(tx_index).data()?;
|
||||
let lock_time: RawLockTime = locktime_cursor.get(idx).data()?;
|
||||
let total_size: StoredU32 = total_size_cursor.get(idx).data()?;
|
||||
let first_txin_index: TxInIndex = first_txin_cursor.get(idx).data()?;
|
||||
let position: BlkPosition = position_cursor.get(idx).data()?;
|
||||
|
||||
let (block_hash, block_time) = if let Some((h, ref bh, bt)) = cached_block
|
||||
&& h == height
|
||||
@@ -116,48 +131,126 @@ impl Query {
|
||||
(bh.clone(), bt)
|
||||
} else {
|
||||
let bh = blockhash_reader.get(height.to_usize());
|
||||
let bt = block_ts_cursor.get(height.to_usize()).unwrap();
|
||||
let bt = block_ts_cursor.get(height.to_usize()).data()?;
|
||||
cached_block = Some((height, bh.clone(), bt));
|
||||
(bh, bt)
|
||||
};
|
||||
|
||||
let buffer = reader.read_raw_bytes(position, *total_size as usize)?;
|
||||
let tx = bitcoin::Transaction::consensus_decode(&mut Cursor::new(buffer))
|
||||
let decoded = bitcoin::Transaction::consensus_decode(&mut Cursor::new(buffer))
|
||||
.map_err(|_| Error::Parse("Failed to decode transaction".into()))?;
|
||||
|
||||
let outpoints = indexer.vecs.inputs.outpoint.collect_range_at(
|
||||
usize::from(first_txin_index),
|
||||
usize::from(first_txin_index) + tx.input.len(),
|
||||
);
|
||||
total_inputs += decoded.input.len();
|
||||
|
||||
let input: Vec<TxIn> = tx
|
||||
decoded_txs.push(DecodedTx {
|
||||
pos,
|
||||
tx_index,
|
||||
txid,
|
||||
height,
|
||||
lock_time,
|
||||
total_size,
|
||||
block_hash,
|
||||
block_time,
|
||||
decoded,
|
||||
first_txin_index,
|
||||
outpoints: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
// Phase 1b: Batch-read outpoints via cursor (PcoVec — sequential
|
||||
// cursor avoids re-decompressing the same pages)
|
||||
let mut outpoint_cursor = indexer.vecs.inputs.outpoint.cursor();
|
||||
for dtx in &mut decoded_txs {
|
||||
let start = usize::from(dtx.first_txin_index);
|
||||
let count = dtx.decoded.input.len();
|
||||
let mut outpoints = Vec::with_capacity(count);
|
||||
for i in 0..count {
|
||||
outpoints.push(outpoint_cursor.get(start + i).data()?);
|
||||
}
|
||||
dtx.outpoints = outpoints;
|
||||
}
|
||||
|
||||
// ── Phase 2: Batch-read prevout data in sorted order ────────────
|
||||
|
||||
// Collect all non-coinbase outpoints, deduplicate, sort by tx_index
|
||||
let mut prevout_keys: Vec<OutPoint> = Vec::with_capacity(total_inputs);
|
||||
for dtx in &decoded_txs {
|
||||
for &op in &dtx.outpoints {
|
||||
if op.is_not_coinbase() {
|
||||
prevout_keys.push(op);
|
||||
}
|
||||
}
|
||||
}
|
||||
prevout_keys.sort_unstable();
|
||||
prevout_keys.dedup();
|
||||
|
||||
// Batch-read txid + first_txout_index sorted by prev_tx_index
|
||||
let txid_reader = indexer.vecs.transactions.txid.reader();
|
||||
let first_txout_index_reader = indexer.vecs.transactions.first_txout_index.reader();
|
||||
|
||||
struct PrevoutIntermediate {
|
||||
outpoint: OutPoint,
|
||||
txid: Txid,
|
||||
txout_index: TxOutIndex,
|
||||
}
|
||||
|
||||
let mut intermediates: Vec<PrevoutIntermediate> = Vec::with_capacity(prevout_keys.len());
|
||||
|
||||
for &op in &prevout_keys {
|
||||
let prev_tx_idx = op.tx_index().to_usize();
|
||||
let txid = txid_reader.get(prev_tx_idx);
|
||||
let first_txout = first_txout_index_reader.get(prev_tx_idx);
|
||||
let txout_index = first_txout + op.vout();
|
||||
intermediates.push(PrevoutIntermediate {
|
||||
outpoint: op,
|
||||
txid,
|
||||
txout_index,
|
||||
});
|
||||
}
|
||||
|
||||
// Re-sort by txout_index for sequential output data reads
|
||||
intermediates.sort_unstable_by_key(|i| i.txout_index);
|
||||
|
||||
let value_reader = indexer.vecs.outputs.value.reader();
|
||||
let output_type_reader = indexer.vecs.outputs.output_type.reader();
|
||||
let type_index_reader = indexer.vecs.outputs.type_index.reader();
|
||||
let addr_readers = indexer.vecs.addrs.addr_readers();
|
||||
|
||||
let mut prevout_map: FxHashMap<OutPoint, (Txid, TxOut)> =
|
||||
FxHashMap::with_capacity_and_hasher(intermediates.len(), Default::default());
|
||||
|
||||
for inter in &intermediates {
|
||||
let txout_idx = usize::from(inter.txout_index);
|
||||
let value: Sats = value_reader.get(txout_idx);
|
||||
let output_type: OutputType = output_type_reader.get(txout_idx);
|
||||
let type_index = type_index_reader.get(txout_idx);
|
||||
let script_pubkey = addr_readers.script_pubkey(output_type, type_index);
|
||||
prevout_map.insert(
|
||||
inter.outpoint,
|
||||
(inter.txid.clone(), TxOut::from((script_pubkey, value))),
|
||||
);
|
||||
}
|
||||
|
||||
// ── Phase 3: Assemble Transaction objects ───────────────────────
|
||||
|
||||
let mut txs: Vec<Option<Transaction>> = (0..len).map(|_| None).collect();
|
||||
|
||||
for dtx in decoded_txs {
|
||||
let input: Vec<TxIn> = dtx
|
||||
.decoded
|
||||
.input
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(j, txin)| {
|
||||
let outpoint = outpoints[j];
|
||||
let outpoint = dtx.outpoints[j];
|
||||
let is_coinbase = outpoint.is_coinbase();
|
||||
|
||||
let (prev_txid, prev_vout, prevout) = if is_coinbase {
|
||||
(Txid::COINBASE, Vout::MAX, None)
|
||||
} else {
|
||||
let prev_tx_index = outpoint.tx_index();
|
||||
let prev_vout = outpoint.vout();
|
||||
let prev_txid = txid_reader.get(prev_tx_index.to_usize());
|
||||
let prev_first_txout_index =
|
||||
first_txout_index_reader.get(prev_tx_index.to_usize());
|
||||
let prev_txout_index = prev_first_txout_index + prev_vout;
|
||||
let prev_value = value_reader.get(usize::from(prev_txout_index));
|
||||
let prev_output_type: OutputType =
|
||||
output_type_reader.get(usize::from(prev_txout_index));
|
||||
let prev_type_index = type_index_reader.get(usize::from(prev_txout_index));
|
||||
let script_pubkey =
|
||||
addr_readers.script_pubkey(prev_output_type, prev_type_index);
|
||||
(
|
||||
prev_txid,
|
||||
prev_vout,
|
||||
Some(TxOut::from((script_pubkey, prev_value))),
|
||||
)
|
||||
let (prev_txid, prev_txout) =
|
||||
prevout_map.get(&outpoint).data()?.clone();
|
||||
(prev_txid, outpoint.vout(), Some(prev_txout))
|
||||
};
|
||||
|
||||
let witness = txin
|
||||
@@ -166,7 +259,7 @@ impl Query {
|
||||
.map(|w| w.to_lower_hex_string())
|
||||
.collect();
|
||||
|
||||
TxIn {
|
||||
Ok(TxIn {
|
||||
txid: prev_txid,
|
||||
vout: prev_vout,
|
||||
prevout,
|
||||
@@ -177,29 +270,39 @@ impl Query {
|
||||
sequence: txin.sequence.0,
|
||||
inner_redeem_script_asm: (),
|
||||
inner_witness_script_asm: (),
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Result<_>>()?;
|
||||
|
||||
let weight = Weight::from(dtx.decoded.weight());
|
||||
|
||||
// O(n) sigop cost via FxHashMap instead of O(n²) linear scan
|
||||
let outpoint_to_idx: FxHashMap<bitcoin::OutPoint, usize> = dtx
|
||||
.decoded
|
||||
.input
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(j, txin)| (txin.previous_output, j))
|
||||
.collect();
|
||||
|
||||
let weight = Weight::from(tx.weight());
|
||||
let total_sigop_cost = tx.total_sigop_cost(|outpoint| {
|
||||
tx.input
|
||||
.iter()
|
||||
.position(|i| i.previous_output == *outpoint)
|
||||
.and_then(|j| input[j].prevout.as_ref())
|
||||
let total_sigop_cost = dtx.decoded.total_sigop_cost(|outpoint| {
|
||||
outpoint_to_idx
|
||||
.get(outpoint)
|
||||
.and_then(|&j| input[j].prevout.as_ref())
|
||||
.map(|p| bitcoin::TxOut {
|
||||
value: bitcoin::Amount::from_sat(u64::from(p.value)),
|
||||
script_pubkey: p.script_pubkey.clone(),
|
||||
})
|
||||
});
|
||||
let output: Vec<TxOut> = tx.output.into_iter().map(TxOut::from).collect();
|
||||
|
||||
let output: Vec<TxOut> = dtx.decoded.output.into_iter().map(TxOut::from).collect();
|
||||
|
||||
let mut transaction = Transaction {
|
||||
index: Some(tx_index),
|
||||
txid,
|
||||
version: tx.version.into(),
|
||||
lock_time,
|
||||
total_size: *total_size as usize,
|
||||
index: Some(dtx.tx_index),
|
||||
txid: dtx.txid,
|
||||
version: dtx.decoded.version.into(),
|
||||
lock_time: dtx.lock_time,
|
||||
total_size: *dtx.total_size as usize,
|
||||
weight,
|
||||
total_sigop_cost,
|
||||
fee: Sats::ZERO,
|
||||
@@ -207,14 +310,14 @@ impl Query {
|
||||
output,
|
||||
status: TxStatus {
|
||||
confirmed: true,
|
||||
block_height: Some(height),
|
||||
block_hash: Some(block_hash),
|
||||
block_time: Some(block_time),
|
||||
block_height: Some(dtx.height),
|
||||
block_hash: Some(dtx.block_hash),
|
||||
block_time: Some(dtx.block_time),
|
||||
},
|
||||
};
|
||||
|
||||
transaction.compute_fee();
|
||||
txs[pos] = Some(transaction);
|
||||
txs[dtx.pos] = Some(transaction);
|
||||
}
|
||||
|
||||
Ok(txs.into_iter().map(Option::unwrap).collect())
|
||||
@@ -231,7 +334,7 @@ impl Query {
|
||||
.transactions
|
||||
.first_tx_index
|
||||
.collect_one(height)
|
||||
.unwrap()
|
||||
.data()?
|
||||
.into();
|
||||
let next: usize = indexer
|
||||
.vecs
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_error::{OptionData, Result};
|
||||
use brk_types::{DifficultyAdjustment, Epoch, Height};
|
||||
use vecdb::ReadableVec;
|
||||
|
||||
@@ -25,7 +25,7 @@ impl Query {
|
||||
.height
|
||||
.epoch
|
||||
.collect_one(current_height)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let current_epoch_usize: usize = current_epoch.into();
|
||||
|
||||
// Get epoch start height
|
||||
@@ -34,7 +34,7 @@ impl Query {
|
||||
.epoch
|
||||
.first_height
|
||||
.collect_one(current_epoch)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let epoch_start_u32: u32 = epoch_start_height.into();
|
||||
|
||||
// Calculate epoch progress
|
||||
@@ -49,13 +49,13 @@ impl Query {
|
||||
.timestamp
|
||||
.epoch
|
||||
.collect_one(current_epoch)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let current_timestamp = indexer
|
||||
.vecs
|
||||
.blocks
|
||||
.timestamp
|
||||
.collect_one(current_height)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
// Calculate average block time in current epoch
|
||||
let elapsed_time = (*current_timestamp - *epoch_start_timestamp) as u64;
|
||||
@@ -92,20 +92,20 @@ impl Query {
|
||||
.epoch
|
||||
.first_height
|
||||
.collect_one(prev_epoch)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
let prev_difficulty = indexer
|
||||
.vecs
|
||||
.blocks
|
||||
.difficulty
|
||||
.collect_one(prev_epoch_start)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let curr_difficulty = indexer
|
||||
.vecs
|
||||
.blocks
|
||||
.difficulty
|
||||
.collect_one(epoch_start_height)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
let retarget = if *prev_difficulty > 0.0 {
|
||||
((*curr_difficulty / *prev_difficulty) - 1.0) * 100.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use brk_error::Result;
|
||||
use brk_error::{OptionData, Result};
|
||||
use brk_types::{DifficultyEntry, HashrateEntry, HashrateSummary, Height, TimePeriod};
|
||||
use vecdb::{ReadableOptionVec, ReadableVec, VecIndex};
|
||||
|
||||
@@ -17,7 +17,7 @@ impl Query {
|
||||
.blocks
|
||||
.difficulty
|
||||
.collect_one(current_height)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
// Get current hashrate
|
||||
let current_day1 = computer
|
||||
@@ -25,7 +25,7 @@ impl Query {
|
||||
.height
|
||||
.day1
|
||||
.collect_one(current_height)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
|
||||
let current_hashrate = *computer
|
||||
.mining
|
||||
@@ -49,7 +49,7 @@ impl Query {
|
||||
.height
|
||||
.day1
|
||||
.collect_one(Height::from(start))
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let end_day1 = current_day1;
|
||||
|
||||
// Sample at regular intervals to avoid too many data points
|
||||
|
||||
@@ -291,20 +291,25 @@ impl Query {
|
||||
let computer = self.computer();
|
||||
let max_height = self.height().to_usize();
|
||||
let start = start_height.map(|h| h.to_usize()).unwrap_or(max_height);
|
||||
|
||||
let reader = computer.pools.pool.reader();
|
||||
let end = start.min(reader.len().saturating_sub(1));
|
||||
let end = start.min(computer.pools.pool.len().saturating_sub(1));
|
||||
|
||||
const POOL_BLOCKS_LIMIT: usize = 100;
|
||||
let mut heights = Vec::with_capacity(POOL_BLOCKS_LIMIT);
|
||||
for h in (0..=end).rev() {
|
||||
if reader.get(h) == slug {
|
||||
heights.push(h);
|
||||
if heights.len() >= POOL_BLOCKS_LIMIT {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let heights: Vec<usize> = computer
|
||||
.pools
|
||||
.pool_heights
|
||||
.read()
|
||||
.get(&slug)
|
||||
.map(|pool_heights| {
|
||||
let pos = pool_heights.partition_point(|h| h.to_usize() <= end);
|
||||
let start = pos.saturating_sub(POOL_BLOCKS_LIMIT);
|
||||
pool_heights[start..pos]
|
||||
.iter()
|
||||
.rev()
|
||||
.map(|h| h.to_usize())
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
// Group consecutive descending heights into ranges for batch reads
|
||||
let mut blocks = Vec::with_capacity(heights.len());
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use bitcoin::hex::{DisplayHex, FromHex};
|
||||
use brk_error::{Error, Result};
|
||||
use brk_error::{Error, OptionData, Result};
|
||||
use brk_types::{
|
||||
BlockHash, Height, MerkleProof, Timestamp, Transaction, TxInIndex, TxIndex, TxOutIndex,
|
||||
TxOutspend, TxStatus, Txid, TxidPrefix, Vin, Vout,
|
||||
@@ -53,19 +53,19 @@ impl Query {
|
||||
};
|
||||
|
||||
// Get block info for status
|
||||
let height = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.height
|
||||
.collect_one(tx_index)
|
||||
.unwrap();
|
||||
let height = self
|
||||
.computer()
|
||||
.indexes
|
||||
.tx_heights
|
||||
.get_shared(tx_index)
|
||||
.data()?;
|
||||
let block_hash = indexer
|
||||
.vecs
|
||||
.blocks
|
||||
.blockhash
|
||||
.reader()
|
||||
.get(height.to_usize());
|
||||
let block_time = indexer.vecs.blocks.timestamp.collect_one(height).unwrap();
|
||||
let block_time = indexer.vecs.blocks.timestamp.collect_one(height).data()?;
|
||||
|
||||
Ok(TxStatus {
|
||||
confirmed: true,
|
||||
@@ -146,9 +146,9 @@ impl Query {
|
||||
let txid_reader = indexer.vecs.transactions.txid.reader();
|
||||
let blockhash_reader = indexer.vecs.blocks.blockhash.reader();
|
||||
|
||||
let tx_heights = &self.computer().indexes.tx_heights;
|
||||
let mut input_tx_cursor = indexer.vecs.inputs.tx_index.cursor();
|
||||
let mut first_txin_cursor = indexer.vecs.transactions.first_txin_index.cursor();
|
||||
let mut height_cursor = indexer.vecs.transactions.height.cursor();
|
||||
let mut block_ts_cursor = indexer.vecs.blocks.timestamp.cursor();
|
||||
|
||||
let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None;
|
||||
@@ -162,11 +162,11 @@ impl Query {
|
||||
continue;
|
||||
}
|
||||
|
||||
let spending_tx_index = input_tx_cursor.get(usize::from(txin_index)).unwrap();
|
||||
let spending_first_txin = first_txin_cursor.get(spending_tx_index.to_usize()).unwrap();
|
||||
let spending_tx_index = input_tx_cursor.get(usize::from(txin_index)).data()?;
|
||||
let spending_first_txin = first_txin_cursor.get(spending_tx_index.to_usize()).data()?;
|
||||
let vin = Vin::from(usize::from(txin_index) - usize::from(spending_first_txin));
|
||||
let spending_txid = txid_reader.get(spending_tx_index.to_usize());
|
||||
let spending_height = height_cursor.get(spending_tx_index.to_usize()).unwrap();
|
||||
let spending_height = tx_heights.get_shared(spending_tx_index).data()?;
|
||||
|
||||
let (block_hash, block_time) = if let Some((h, ref bh, bt)) = cached_block
|
||||
&& h == spending_height
|
||||
@@ -174,7 +174,7 @@ impl Query {
|
||||
(bh.clone(), bt)
|
||||
} else {
|
||||
let bh = blockhash_reader.get(spending_height.to_usize());
|
||||
let bt = block_ts_cursor.get(spending_height.to_usize()).unwrap();
|
||||
let bt = block_ts_cursor.get(spending_height.to_usize()).data()?;
|
||||
cached_block = Some((spending_height, bh.clone(), bt));
|
||||
(bh, bt)
|
||||
};
|
||||
@@ -238,19 +238,19 @@ impl Query {
|
||||
.inputs
|
||||
.tx_index
|
||||
.collect_one_at(usize::from(txin_index))
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let spending_first_txin = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.first_txin_index
|
||||
.collect_one(spending_tx_index)
|
||||
.unwrap();
|
||||
let spending_height = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.height
|
||||
.collect_one(spending_tx_index)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let spending_height = self
|
||||
.computer()
|
||||
.indexes
|
||||
.tx_heights
|
||||
.get_shared(spending_tx_index)
|
||||
.data()?;
|
||||
|
||||
Ok(TxOutspend {
|
||||
spent: true,
|
||||
@@ -282,7 +282,7 @@ impl Query {
|
||||
.blocks
|
||||
.timestamp
|
||||
.collect_one(spending_height)
|
||||
.unwrap(),
|
||||
.data()?,
|
||||
),
|
||||
}),
|
||||
})
|
||||
@@ -304,13 +304,13 @@ impl Query {
|
||||
.transactions
|
||||
.total_size
|
||||
.collect_one(tx_index)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
let position = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.position
|
||||
.collect_one(tx_index)
|
||||
.unwrap();
|
||||
.data()?;
|
||||
self.reader().read_raw_bytes(position, *total_size as usize)
|
||||
}
|
||||
|
||||
@@ -329,12 +329,12 @@ impl Query {
|
||||
.get(&prefix)?
|
||||
.map(|cow| cow.into_owned())
|
||||
.ok_or(Error::UnknownTxid)?;
|
||||
let height: Height = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.height
|
||||
.collect_one(tx_index)
|
||||
.unwrap();
|
||||
let height: Height = self
|
||||
.computer()
|
||||
.indexes
|
||||
.tx_heights
|
||||
.get_shared(tx_index)
|
||||
.data()?;
|
||||
Ok((tx_index, height))
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ impl ContentEncoding {
|
||||
/// Negotiate the best encoding from the Accept-Encoding header.
|
||||
/// Priority: zstd > br > gzip > identity.
|
||||
/// zstd is preferred over brotli: ~3-5x faster compression at comparable ratios.
|
||||
/// Respects q=0 (RFC 9110 §12.5.3): encodings explicitly rejected are never selected.
|
||||
pub fn negotiate(headers: &HeaderMap) -> Self {
|
||||
let accept = match headers.get(header::ACCEPT_ENCODING) {
|
||||
Some(v) => v,
|
||||
@@ -28,7 +29,15 @@ impl ContentEncoding {
|
||||
|
||||
let mut best = Self::Identity;
|
||||
for part in s.split(',') {
|
||||
let name = part.split(';').next().unwrap_or("").trim();
|
||||
let mut iter = part.split(';');
|
||||
let name = iter.next().unwrap_or("").trim();
|
||||
let rejected = iter.any(|p| {
|
||||
let p = p.trim();
|
||||
p == "q=0" || p == "q=0.0" || p == "q=0.00" || p == "q=0.000"
|
||||
});
|
||||
if rejected {
|
||||
continue;
|
||||
}
|
||||
match name {
|
||||
"zstd" => return Self::Zstd,
|
||||
"br" => best = Self::Brotli,
|
||||
|
||||
@@ -15,6 +15,7 @@ use vecdb::{Bytes, Formattable};
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Hash,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
JsonSchema,
|
||||
|
||||
@@ -132,6 +132,17 @@ impl<I: Ord + Copy + Default + Into<usize>, V: From<usize> + Copy + Default> Ran
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared (immutable) floor lookup — binary search only, no cache update.
|
||||
/// Use when you only have `&self` (e.g. read-only clones in the query layer).
|
||||
#[inline]
|
||||
pub fn get_shared(&self, index: I) -> Option<V> {
|
||||
if self.first_indexes.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let pos = self.first_indexes.partition_point(|&first| first <= index);
|
||||
if pos > 0 { Some(V::from(pos - 1)) } else { None }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn cache_slot(index: &I) -> usize {
|
||||
let v: usize = (*index).into();
|
||||
|
||||
Reference in New Issue
Block a user