diff --git a/crates/brk_query/src/impl/addr.rs b/crates/brk_query/src/impl/addr.rs index 6713c8b3c..dc4369bcf 100644 --- a/crates/brk_query/src/impl/addr.rs +++ b/crates/brk_query/src/impl/addr.rs @@ -4,8 +4,8 @@ use bitcoin::{Network, PublicKey, ScriptBuf}; use brk_error::{Error, Result}; use brk_types::{ Addr, AddrBytes, AddrChainStats, AddrHash, AddrIndexOutPoint, AddrIndexTxIndex, AddrStats, - AnyAddrDataIndexEnum, Height, OutputType, Transaction, TxIndex, TxStatus, Txid, TypeIndex, - Unit, Utxo, Vout, + AnyAddrDataIndexEnum, BlockHash, Height, OutputType, Timestamp, Transaction, TxIndex, TxStatus, + Txid, TypeIndex, Unit, Utxo, Vout, }; use vecdb::{ReadableVec, VecIndex}; @@ -131,33 +131,33 @@ impl Query { .get(output_type) .unwrap(); - let prefix = u32::from(type_index).to_be_bytes(); - - let after_tx_index = if let Some(after_txid) = after_txid { - let tx_index = stores + if let Some(after_txid) = after_txid { + let after_tx_index = stores .txid_prefix_to_tx_index .get(&after_txid.into()) .map_err(|_| Error::UnknownTxid)? .ok_or(Error::UnknownTxid)? .into_owned(); - Some(tx_index) - } else { - None - }; - Ok(store - .prefix(prefix) - .rev() - .filter(|(key, _): &(AddrIndexTxIndex, Unit)| { - if let Some(after) = after_tx_index { - key.tx_index() < after - } else { - true - } - }) - .take(limit) - .map(|(key, _)| key.tx_index()) - .collect()) + // Seek directly to after_tx_index and iterate backward — O(limit) + let min = AddrIndexTxIndex::min_for_addr(type_index); + let bound = AddrIndexTxIndex::from((type_index, after_tx_index)); + Ok(store + .range(min..bound) + .rev() + .take(limit) + .map(|(key, _): (AddrIndexTxIndex, Unit)| key.tx_index()) + .collect()) + } else { + // No pagination — scan from end of prefix + let prefix = u32::from(type_index).to_be_bytes(); + Ok(store + .prefix(prefix) + .rev() + .take(limit) + .map(|(key, _): (AddrIndexTxIndex, Unit)| key.tx_index()) + .collect()) + } } pub fn addr_utxos(&self, addr: Addr) -> Result> { @@ -186,30 +186,38 @@ impl Query { let mut height_cursor = vecs.transactions.height.cursor(); let mut block_ts_cursor = vecs.blocks.timestamp.cursor(); - let utxos: Vec = outpoints - .into_iter() - .map(|(tx_index, vout)| { - let txid = txid_reader.get(tx_index.to_usize()); - let height = height_cursor.get(tx_index.to_usize()).unwrap(); - let first_txout_index = first_txout_index_reader.get(tx_index.to_usize()); - let txout_index = first_txout_index + vout; - let value = value_reader.get(usize::from(txout_index)); - let block_hash = blockhash_reader.get(usize::from(height)); - let block_time = block_ts_cursor.get(height.to_usize()).unwrap(); + let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None; + let mut utxos = Vec::with_capacity(outpoints.len()); - Utxo { - txid, - vout, - status: TxStatus { - confirmed: true, - block_height: Some(height), - block_hash: Some(block_hash), - block_time: Some(block_time), - }, - value, - } - }) - .collect(); + for (tx_index, vout) in outpoints { + let txid = txid_reader.get(tx_index.to_usize()); + let height: Height = height_cursor.get(tx_index.to_usize()).unwrap(); + let first_txout_index = first_txout_index_reader.get(tx_index.to_usize()); + let value = value_reader.get(usize::from(first_txout_index + vout)); + + let (block_hash, block_time) = if let Some((h, ref bh, bt)) = cached_block + && h == height + { + (bh.clone(), bt) + } else { + let bh = blockhash_reader.get(height.to_usize()); + let bt = block_ts_cursor.get(height.to_usize()).unwrap(); + cached_block = Some((height, bh.clone(), bt)); + (bh, bt) + }; + + utxos.push(Utxo { + txid, + vout, + status: TxStatus { + confirmed: true, + block_height: Some(height), + block_hash: Some(block_hash), + block_time: Some(block_time), + }, + value, + }); + } Ok(utxos) } diff --git a/crates/brk_query/src/impl/block/info.rs b/crates/brk_query/src/impl/block/info.rs index 36ba89773..c9c77d1d1 100644 --- a/crates/brk_query/src/impl/block/info.rs +++ b/crates/brk_query/src/impl/block/info.rs @@ -288,7 +288,6 @@ impl Query { let varint_len = Self::compact_size_len(tx_count); let coinbase_offset = HEADER_SIZE as u32 + varint_len; let coinbase_pos = positions[i] + coinbase_offset; - let coinbase_read_len = size as usize - coinbase_offset as usize; let ( coinbase_raw, @@ -297,7 +296,7 @@ impl Query { coinbase_signature, coinbase_signature_ascii, scriptsig_bytes, - ) = Self::parse_coinbase_tx(reader, coinbase_pos, coinbase_read_len); + ) = Self::parse_coinbase_tx(reader, coinbase_pos); let miner_names = if pool_slug == PoolSlug::Ocean { Self::parse_datum_miner_names(&scriptsig_bytes) @@ -514,15 +513,14 @@ impl Query { fn parse_coinbase_tx( reader: &Reader, position: BlkPosition, - len: usize, ) -> (String, Option, Vec, String, String, Vec) { let empty = (String::new(), None, vec![], String::new(), String::new(), vec![]); - let raw_bytes = match reader.read_raw_bytes(position, len) { - Ok(bytes) => bytes, + let blk_reader = match reader.reader_at(position) { + Ok(r) => r, Err(_) => return empty, }; - let tx = match bitcoin::Transaction::consensus_decode(&mut raw_bytes.as_slice()) { + let tx = match bitcoin::Transaction::consensus_decode(&mut bitcoin::io::FromStd::new(blk_reader)) { Ok(tx) => tx, Err(_) => return empty, }; diff --git a/crates/brk_query/src/impl/block/timestamp.rs b/crates/brk_query/src/impl/block/timestamp.rs index 4c1224196..8f53da9af 100644 --- a/crates/brk_query/src/impl/block/timestamp.rs +++ b/crates/brk_query/src/impl/block/timestamp.rs @@ -31,14 +31,14 @@ impl Query { let start: usize = usize::from(first_height_of_day).min(max_height_usize); - let timestamps = &indexer.vecs.blocks.timestamp; + let mut ts_cursor = indexer.vecs.blocks.timestamp.cursor(); // Search forward from start to find the last block <= target timestamp let mut best_height = start; - let mut best_ts = timestamps.collect_one_at(start).unwrap(); + let mut best_ts = ts_cursor.get(start).unwrap(); for h in (start + 1)..=max_height_usize { - let block_ts = timestamps.collect_one_at(h).unwrap(); + let block_ts = ts_cursor.get(h).unwrap(); if block_ts <= target { best_height = h; best_ts = block_ts; @@ -49,7 +49,7 @@ impl Query { // Check one block before start in case we need to go backward if start > 0 && best_ts > target { - let prev_ts = timestamps.collect_one_at(start - 1).unwrap(); + let prev_ts = ts_cursor.get(start - 1).unwrap(); if prev_ts <= target { best_height = start - 1; best_ts = prev_ts; diff --git a/crates/brk_query/src/impl/block/txs.rs b/crates/brk_query/src/impl/block/txs.rs index ad127cfa4..75b87b26b 100644 --- a/crates/brk_query/src/impl/block/txs.rs +++ b/crates/brk_query/src/impl/block/txs.rs @@ -91,6 +91,8 @@ impl Query { let output_type_reader = indexer.vecs.outputs.output_type.reader(); let type_index_reader = indexer.vecs.outputs.type_index.reader(); let addr_readers = indexer.vecs.addrs.addr_readers(); + let blockhash_reader = indexer.vecs.blocks.blockhash.reader(); + let mut block_ts_cursor = indexer.vecs.blocks.timestamp.cursor(); let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None; @@ -113,8 +115,8 @@ impl Query { { (bh.clone(), bt) } else { - let bh = indexer.vecs.blocks.blockhash.read_once(height)?; - let bt = indexer.vecs.blocks.timestamp.collect_one(height).unwrap(); + let bh = blockhash_reader.get(height.to_usize()); + let bt = block_ts_cursor.get(height.to_usize()).unwrap(); cached_block = Some((height, bh.clone(), bt)); (bh, bt) }; diff --git a/crates/brk_query/src/impl/mining/epochs.rs b/crates/brk_query/src/impl/mining/epochs.rs index e53ee84a4..4d61a43a2 100644 --- a/crates/brk_query/src/impl/mining/epochs.rs +++ b/crates/brk_query/src/impl/mining/epochs.rs @@ -1,5 +1,5 @@ use brk_computer::Computer; -use brk_types::{DifficultyAdjustmentEntry, Epoch, Height}; +use brk_types::{DifficultyAdjustmentEntry, Height}; use vecdb::{ReadableVec, Ro, VecIndex}; /// Iterate over difficulty epochs within a height range. @@ -21,25 +21,24 @@ pub fn iter_difficulty_epochs( .collect_one(Height::from(end_height)) .unwrap_or_default(); - let epoch_to_height = &computer.indexes.epoch.first_height; - let epoch_to_timestamp = &computer.indexes.timestamp.epoch; - let epoch_to_difficulty = &computer.blocks.difficulty.value.epoch; + let mut height_cursor = computer.indexes.epoch.first_height.cursor(); + let mut timestamp_cursor = computer.indexes.timestamp.epoch.cursor(); + let mut difficulty_cursor = computer.blocks.difficulty.value.epoch.cursor(); let mut results = Vec::with_capacity(end_epoch.to_usize() - start_epoch.to_usize() + 1); let mut prev_difficulty: Option = None; for epoch_usize in start_epoch.to_usize()..=end_epoch.to_usize() { - let epoch = Epoch::from(epoch_usize); - let epoch_height = epoch_to_height.collect_one(epoch).unwrap_or_default(); + let epoch_height = height_cursor.get(epoch_usize).unwrap_or_default(); // Skip epochs before our start height but track difficulty if epoch_height.to_usize() < start_height { - prev_difficulty = epoch_to_difficulty.collect_one(epoch).map(|d| *d); + prev_difficulty = difficulty_cursor.get(epoch_usize).map(|d| *d); continue; } - let epoch_timestamp = epoch_to_timestamp.collect_one(epoch).unwrap_or_default(); - let epoch_difficulty = *epoch_to_difficulty.collect_one(epoch).unwrap_or_default(); + let epoch_timestamp = timestamp_cursor.get(epoch_usize).unwrap_or_default(); + let epoch_difficulty = *difficulty_cursor.get(epoch_usize).unwrap_or_default(); let change_percent = match prev_difficulty { Some(prev) if prev > 0.0 => epoch_difficulty / prev, diff --git a/crates/brk_query/src/impl/mining/hashrate.rs b/crates/brk_query/src/impl/mining/hashrate.rs index 79a545616..723d5f5d6 100644 --- a/crates/brk_query/src/impl/mining/hashrate.rs +++ b/crates/brk_query/src/impl/mining/hashrate.rs @@ -1,5 +1,5 @@ use brk_error::Result; -use brk_types::{Day1, DifficultyEntry, HashrateEntry, HashrateSummary, Height, TimePeriod}; +use brk_types::{DifficultyEntry, HashrateEntry, HashrateSummary, Height, TimePeriod}; use vecdb::{ReadableOptionVec, ReadableVec, VecIndex}; use super::epochs::iter_difficulty_epochs; @@ -56,16 +56,15 @@ impl Query { let total_days = end_day1.to_usize().saturating_sub(start_day1.to_usize()) + 1; let step = (total_days / 200).max(1); // Max ~200 data points - let hashrate_vec = &computer.mining.hashrate.rate.base.day1; - let timestamp_vec = &computer.indexes.timestamp.day1; + let mut hr_cursor = computer.mining.hashrate.rate.base.day1.cursor(); + let mut ts_cursor = computer.indexes.timestamp.day1.cursor(); let mut hashrates = Vec::with_capacity(total_days / step + 1); let mut di = start_day1.to_usize(); while di <= end_day1.to_usize() { - let day1 = Day1::from(di); - if let (Some(hr), Some(timestamp)) = ( - hashrate_vec.collect_one_flat(day1), - timestamp_vec.collect_one(day1), + if let (Some(Some(hr)), Some(timestamp)) = ( + hr_cursor.get(di), + ts_cursor.get(di), ) { hashrates.push(HashrateEntry { timestamp, diff --git a/crates/brk_query/src/impl/tx.rs b/crates/brk_query/src/impl/tx.rs index 8ba2e530c..1d7c5cf4f 100644 --- a/crates/brk_query/src/impl/tx.rs +++ b/crates/brk_query/src/impl/tx.rs @@ -53,13 +53,8 @@ impl Query { }; // Get block info for status - let height = indexer - .vecs - .transactions - .height - .collect_one(tx_index) - .unwrap(); - let block_hash = indexer.vecs.blocks.blockhash.read_once(height)?; + let height = indexer.vecs.transactions.height.collect_one(tx_index).unwrap(); + let block_hash = indexer.vecs.blocks.blockhash.reader().get(height.to_usize()); let block_time = indexer.vecs.blocks.timestamp.collect_one(height).unwrap(); Ok(TxStatus { @@ -136,14 +131,13 @@ impl Query { let indexer = self.indexer(); let txin_index_reader = self.computer().outputs.spent.txin_index.reader(); let txid_reader = indexer.vecs.transactions.txid.reader(); + let blockhash_reader = indexer.vecs.blocks.blockhash.reader(); - // Cursors buffer chunks so nearby indices share decompression let mut input_tx_cursor = indexer.vecs.inputs.tx_index.cursor(); let mut first_txin_cursor = indexer.vecs.transactions.first_txin_index.cursor(); let mut height_cursor = indexer.vecs.transactions.height.cursor(); let mut block_ts_cursor = indexer.vecs.blocks.timestamp.cursor(); - // Spending txs in the same block share block hash/time let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None; let mut outspends = Vec::with_capacity(output_count); @@ -167,7 +161,7 @@ impl Query { { (bh.clone(), bt) } else { - let bh = indexer.vecs.blocks.blockhash.read_once(spending_height)?; + let bh = blockhash_reader.get(spending_height.to_usize()); let bt = block_ts_cursor.get(spending_height.to_usize()).unwrap(); cached_block = Some((spending_height, bh.clone(), bt)); (bh, bt) @@ -262,7 +256,14 @@ impl Query { status: Some(TxStatus { confirmed: true, block_height: Some(spending_height), - block_hash: Some(indexer.vecs.blocks.blockhash.read_once(spending_height)?), + block_hash: Some( + indexer + .vecs + .blocks + .blockhash + .reader() + .get(spending_height.to_usize()), + ), block_time: Some( indexer .vecs diff --git a/crates/brk_reader/src/lib.rs b/crates/brk_reader/src/lib.rs index 032b398c7..346c177b0 100644 --- a/crates/brk_reader/src/lib.rs +++ b/crates/brk_reader/src/lib.rs @@ -96,41 +96,47 @@ impl ReaderInner { self.xor_bytes } - /// Read raw bytes from a blk file at the given position with XOR decoding. - /// File handles are cached per blk_index; reads use pread (no seek, thread-safe). - pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result> { - let blk_index = position.blk_index(); - - { - let cache = self.blk_file_cache.read(); - if let Some(file) = cache.get(&blk_index) { - let mut buffer = vec![0u8; size]; - file.read_at(&mut buffer, position.offset() as u64)?; - self.xor_decode(&mut buffer, position.offset()); - return Ok(buffer); - } + /// Ensure the blk file for `blk_index` is in the file handle cache. + fn ensure_blk_cached(&self, blk_index: u16) -> Result<()> { + if self.blk_file_cache.read().contains_key(&blk_index) { + return Ok(()); } - - // Cache miss: open file, insert, and read let blk_paths = self.blk_index_to_blk_path(); let blk_path = blk_paths .get(&blk_index) .ok_or(Error::NotFound("Blk file not found".into()))?; let file = File::open(blk_path)?; + self.blk_file_cache.write().entry(blk_index).or_insert(file); + Ok(()) + } + /// Read raw bytes from a blk file at the given position with XOR decoding. + pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result> { + self.ensure_blk_cached(position.blk_index())?; + + let cache = self.blk_file_cache.read(); + let file = cache.get(&position.blk_index()).unwrap(); let mut buffer = vec![0u8; size]; file.read_at(&mut buffer, position.offset() as u64)?; - self.xor_decode(&mut buffer, position.offset()); - - self.blk_file_cache.write().entry(blk_index).or_insert(file); - + XORIndex::decode_at(&mut buffer, position.offset() as usize, self.xor_bytes); Ok(buffer) } - fn xor_decode(&self, buffer: &mut [u8], offset: u32) { - let mut xori = XORIndex::default(); - xori.add_assign(offset as usize); - xori.bytes(buffer, self.xor_bytes); + /// Returns a `Read` impl positioned at `position` in the blk file. + /// Reads only the bytes requested — no upfront allocation. + pub fn reader_at(&self, position: BlkPosition) -> Result> { + self.ensure_blk_cached(position.blk_index())?; + + let mut xor_index = XORIndex::default(); + xor_index.add_assign(position.offset() as usize); + + Ok(BlkRead { + cache: self.blk_file_cache.read(), + blk_index: position.blk_index(), + offset: position.offset() as u64, + xor_index, + xor_bytes: self.xor_bytes, + }) } /// Returns a receiver streaming `ReadBlock`s from `hash + 1` to the chain tip. @@ -468,3 +474,22 @@ impl ReaderInner { Ok(Height::new(height)) } } + +/// Streaming reader at a position in a blk file. Reads via pread + XOR on demand. +pub struct BlkRead<'a> { + cache: RwLockReadGuard<'a, BTreeMap>, + blk_index: u16, + offset: u64, + xor_index: XORIndex, + xor_bytes: XORBytes, +} + +impl Read for BlkRead<'_> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let file = self.cache.get(&self.blk_index).unwrap(); + let n = file.read_at(buf, self.offset)?; + self.xor_index.bytes(&mut buf[..n], self.xor_bytes); + self.offset += n as u64; + Ok(n) + } +} diff --git a/crates/brk_reader/src/xor_index.rs b/crates/brk_reader/src/xor_index.rs index 05d314b8f..33c6d1664 100644 --- a/crates/brk_reader/src/xor_index.rs +++ b/crates/brk_reader/src/xor_index.rs @@ -36,4 +36,12 @@ impl XORIndex { pub fn add_assign(&mut self, i: usize) { self.0 = (self.0 + i) % XOR_LEN; } + + /// XOR-decode `buffer` starting at `offset`. + #[inline] + pub fn decode_at(buffer: &mut [u8], offset: usize, xor_bytes: XORBytes) { + let mut xori = Self::default(); + xori.add_assign(offset); + xori.bytes(buffer, xor_bytes); + } } diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index 7cfe0e79c..32a08e517 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../README.md")] -use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path}; +use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, ops::Range, path::Path}; use brk_error::Result; use brk_types::{Height, Version}; @@ -246,6 +246,19 @@ where .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } + #[inline] + pub fn range>( + &self, + range: Range, + ) -> impl DoubleEndedIterator + '_ { + let start: ByteView = range.start.into(); + let end: ByteView = range.end.into(); + self.keyspace + .range(start..end) + .map(|res| res.into_inner().unwrap()) + .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) + } + pub fn approximate_len(&self) -> usize { self.keyspace.approximate_len() }