mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-04-24 06:39:58 -07:00
global: snap
This commit is contained in:
@@ -8364,8 +8364,8 @@ impl BrkClient {
|
||||
/// *[Mempool.space docs](https://mempool.space/docs/api/rest#get-block-raw)*
|
||||
///
|
||||
/// Endpoint: `GET /api/block/{hash}/raw`
|
||||
pub fn get_block_raw(&self, hash: BlockHash) -> Result<Vec<f64>> {
|
||||
self.base.get_json(&format!("/api/block/{hash}/raw"))
|
||||
pub fn get_block_raw(&self, hash: BlockHash) -> Result<String> {
|
||||
self.base.get_text(&format!("/api/block/{hash}/raw"))
|
||||
}
|
||||
|
||||
/// Block status
|
||||
@@ -8789,8 +8789,8 @@ impl BrkClient {
|
||||
/// *[Mempool.space docs](https://mempool.space/docs/api/rest#get-transaction-raw)*
|
||||
///
|
||||
/// Endpoint: `GET /api/tx/{txid}/raw`
|
||||
pub fn get_tx_raw(&self, txid: Txid) -> Result<Vec<f64>> {
|
||||
self.base.get_json(&format!("/api/tx/{txid}/raw"))
|
||||
pub fn get_tx_raw(&self, txid: Txid) -> Result<String> {
|
||||
self.base.get_text(&format!("/api/tx/{txid}/raw"))
|
||||
}
|
||||
|
||||
/// Transaction status
|
||||
|
||||
@@ -24,7 +24,6 @@ schemars = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
rlimit = "0.11.0"
|
||||
rustc-hash = { workspace = true }
|
||||
vecdb = { workspace = true }
|
||||
|
||||
|
||||
@@ -66,14 +66,6 @@ impl Indexer {
|
||||
}
|
||||
|
||||
fn forced_import_inner(outputs_dir: &Path, can_retry: bool) -> Result<Self> {
|
||||
info!("Increasing number of open files limit...");
|
||||
let no_file_limit = rlimit::getrlimit(rlimit::Resource::NOFILE)?;
|
||||
rlimit::setrlimit(
|
||||
rlimit::Resource::NOFILE,
|
||||
no_file_limit.0.max(10_000),
|
||||
no_file_limit.1,
|
||||
)?;
|
||||
|
||||
info!("Importing indexer...");
|
||||
|
||||
let indexed_path = outputs_dir.join("indexed");
|
||||
|
||||
@@ -97,10 +97,7 @@ impl Query {
|
||||
limit: usize,
|
||||
) -> Result<Vec<Transaction>> {
|
||||
let txindices = self.addr_txindices(&addr, after_txid, limit)?;
|
||||
txindices
|
||||
.into_iter()
|
||||
.map(|tx_index| self.transaction_by_index(tx_index))
|
||||
.collect()
|
||||
self.transactions_by_indices(&txindices)
|
||||
}
|
||||
|
||||
pub fn addr_txids(
|
||||
|
||||
@@ -25,7 +25,10 @@ impl Query {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let count = BLOCK_TXS_PAGE_SIZE.min(tx_count - start);
|
||||
self.transactions_by_range(first + start, count)
|
||||
let indices: Vec<TxIndex> = (first + start..first + start + count)
|
||||
.map(TxIndex::from)
|
||||
.collect();
|
||||
self.transactions_by_indices(&indices)
|
||||
}
|
||||
|
||||
pub fn block_txid_at_index(&self, hash: &BlockHash, index: TxIndex) -> Result<Txid> {
|
||||
@@ -33,48 +36,55 @@ impl Query {
|
||||
self.block_txid_at_index_by_height(height, index.into())
|
||||
}
|
||||
|
||||
// === Bulk transaction read ===
|
||||
// === Helper methods ===
|
||||
|
||||
/// Batch-read `count` consecutive transactions starting at raw index `start`.
|
||||
/// Block info is cached per unique height — free for same-block batches.
|
||||
pub fn transactions_by_range(&self, start: usize, count: usize) -> Result<Vec<Transaction>> {
|
||||
if count == 0 {
|
||||
pub(crate) fn block_txids_by_height(&self, height: Height) -> Result<Vec<Txid>> {
|
||||
let (first, tx_count) = self.block_tx_range(height)?;
|
||||
Ok(self
|
||||
.indexer()
|
||||
.vecs
|
||||
.transactions
|
||||
.txid
|
||||
.collect_range_at(first, first + tx_count))
|
||||
}
|
||||
|
||||
fn block_txid_at_index_by_height(&self, height: Height, index: usize) -> Result<Txid> {
|
||||
let (first, tx_count) = self.block_tx_range(height)?;
|
||||
if index >= tx_count {
|
||||
return Err(Error::OutOfRange("Transaction index out of range".into()));
|
||||
}
|
||||
Ok(self
|
||||
.indexer()
|
||||
.vecs
|
||||
.transactions
|
||||
.txid
|
||||
.reader()
|
||||
.get(first + index))
|
||||
}
|
||||
|
||||
/// Batch-read transactions at arbitrary indices.
|
||||
/// Reads in ascending index order for I/O locality, returns in caller's order.
|
||||
pub fn transactions_by_indices(&self, indices: &[TxIndex]) -> Result<Vec<Transaction>> {
|
||||
if indices.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let len = indices.len();
|
||||
|
||||
// Sort positions ascending for sequential I/O (O(n) when already sorted)
|
||||
let mut order: Vec<usize> = (0..len).collect();
|
||||
order.sort_unstable_by_key(|&i| indices[i]);
|
||||
|
||||
let indexer = self.indexer();
|
||||
let reader = self.reader();
|
||||
let end = start + count;
|
||||
|
||||
// 7 range reads instead of count * 7 point reads
|
||||
let txids: Vec<Txid> = indexer.vecs.transactions.txid.collect_range_at(start, end);
|
||||
let heights: Vec<Height> = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.height
|
||||
.collect_range_at(start, end);
|
||||
let lock_times = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.raw_locktime
|
||||
.collect_range_at(start, end);
|
||||
let total_sizes = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.total_size
|
||||
.collect_range_at(start, end);
|
||||
let first_txin_indices = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.first_txin_index
|
||||
.collect_range_at(start, end);
|
||||
let positions = indexer
|
||||
.vecs
|
||||
.transactions
|
||||
.position
|
||||
.collect_range_at(start, end);
|
||||
let mut txid_cursor = indexer.vecs.transactions.txid.cursor();
|
||||
let mut height_cursor = indexer.vecs.transactions.height.cursor();
|
||||
let mut locktime_cursor = indexer.vecs.transactions.raw_locktime.cursor();
|
||||
let mut total_size_cursor = indexer.vecs.transactions.total_size.cursor();
|
||||
let mut first_txin_cursor = indexer.vecs.transactions.first_txin_index.cursor();
|
||||
let mut position_cursor = indexer.vecs.transactions.position.cursor();
|
||||
|
||||
// Readers for prevout lookups (created once)
|
||||
let txid_reader = indexer.vecs.transactions.txid.reader();
|
||||
let first_txout_index_reader = indexer.vecs.transactions.first_txout_index.reader();
|
||||
let value_reader = indexer.vecs.outputs.value.reader();
|
||||
@@ -82,15 +92,22 @@ impl Query {
|
||||
let type_index_reader = indexer.vecs.outputs.type_index.reader();
|
||||
let addr_readers = indexer.vecs.addrs.addr_readers();
|
||||
|
||||
// Block info cache — for same-block batches, read once
|
||||
let mut cached_block: Option<(Height, BlockHash, Timestamp)> = None;
|
||||
|
||||
let mut txs = Vec::with_capacity(count);
|
||||
// Read in sorted order, write directly to original position
|
||||
let mut txs: Vec<Option<Transaction>> = (0..len).map(|_| None).collect();
|
||||
|
||||
for i in 0..count {
|
||||
let height = heights[i];
|
||||
for &pos in &order {
|
||||
let tx_index = indices[pos];
|
||||
let idx = tx_index.to_usize();
|
||||
|
||||
let txid = txid_cursor.get(idx).unwrap();
|
||||
let height = height_cursor.get(idx).unwrap();
|
||||
let lock_time = locktime_cursor.get(idx).unwrap();
|
||||
let total_size = total_size_cursor.get(idx).unwrap();
|
||||
let first_txin_index = first_txin_cursor.get(idx).unwrap();
|
||||
let position = position_cursor.get(idx).unwrap();
|
||||
|
||||
// Reuse block info if same height as previous tx
|
||||
let (block_hash, block_time) = if let Some((h, ref bh, bt)) = cached_block
|
||||
&& h == height
|
||||
{
|
||||
@@ -102,15 +119,13 @@ impl Query {
|
||||
(bh, bt)
|
||||
};
|
||||
|
||||
// Decode raw transaction from blk file
|
||||
let buffer = reader.read_raw_bytes(positions[i], *total_sizes[i] as usize)?;
|
||||
let buffer = reader.read_raw_bytes(position, *total_size as usize)?;
|
||||
let tx = bitcoin::Transaction::consensus_decode(&mut Cursor::new(buffer))
|
||||
.map_err(|_| Error::Parse("Failed to decode transaction".into()))?;
|
||||
|
||||
// Batch-read outpoints for this tx's inputs
|
||||
let outpoints = indexer.vecs.inputs.outpoint.collect_range_at(
|
||||
usize::from(first_txin_indices[i]),
|
||||
usize::from(first_txin_indices[i]) + tx.input.len(),
|
||||
usize::from(first_txin_index),
|
||||
usize::from(first_txin_index) + tx.input.len(),
|
||||
);
|
||||
|
||||
let input: Vec<TxIn> = tx
|
||||
@@ -178,11 +193,11 @@ impl Query {
|
||||
let output: Vec<TxOut> = tx.output.into_iter().map(TxOut::from).collect();
|
||||
|
||||
let mut transaction = Transaction {
|
||||
index: Some(TxIndex::from(start + i)),
|
||||
txid: txids[i].clone(),
|
||||
index: Some(tx_index),
|
||||
txid,
|
||||
version: tx.version.into(),
|
||||
lock_time: lock_times[i],
|
||||
total_size: *total_sizes[i] as usize,
|
||||
lock_time,
|
||||
total_size: *total_size as usize,
|
||||
weight,
|
||||
total_sigop_cost,
|
||||
fee: Sats::ZERO,
|
||||
@@ -197,36 +212,10 @@ impl Query {
|
||||
};
|
||||
|
||||
transaction.compute_fee();
|
||||
txs.push(transaction);
|
||||
txs[pos] = Some(transaction);
|
||||
}
|
||||
|
||||
Ok(txs)
|
||||
}
|
||||
|
||||
// === Helper methods ===
|
||||
|
||||
pub(crate) fn block_txids_by_height(&self, height: Height) -> Result<Vec<Txid>> {
|
||||
let (first, tx_count) = self.block_tx_range(height)?;
|
||||
Ok(self
|
||||
.indexer()
|
||||
.vecs
|
||||
.transactions
|
||||
.txid
|
||||
.collect_range_at(first, first + tx_count))
|
||||
}
|
||||
|
||||
fn block_txid_at_index_by_height(&self, height: Height, index: usize) -> Result<Txid> {
|
||||
let (first, tx_count) = self.block_tx_range(height)?;
|
||||
if index >= tx_count {
|
||||
return Err(Error::OutOfRange("Transaction index out of range".into()));
|
||||
}
|
||||
Ok(self
|
||||
.indexer()
|
||||
.vecs
|
||||
.transactions
|
||||
.txid
|
||||
.reader()
|
||||
.get(first + index))
|
||||
Ok(txs.into_iter().map(Option::unwrap).collect())
|
||||
}
|
||||
|
||||
/// Returns (first_tx_raw_index, tx_count) for a block at `height`.
|
||||
|
||||
@@ -278,7 +278,7 @@ impl Query {
|
||||
// === Helper methods ===
|
||||
|
||||
pub fn transaction_by_index(&self, tx_index: TxIndex) -> Result<Transaction> {
|
||||
self.transactions_by_range(tx_index.to_usize(), 1)?
|
||||
self.transactions_by_indices(&[tx_index])?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(Error::NotFound("Transaction not found".into()))
|
||||
|
||||
@@ -20,3 +20,4 @@ derive_more = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
rlimit = "0.11.0"
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::{
|
||||
fs::{self, File},
|
||||
io::{Read, Seek, SeekFrom},
|
||||
ops::ControlFlow,
|
||||
os::unix::fs::FileExt,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
thread,
|
||||
@@ -53,6 +54,7 @@ impl Reader {
|
||||
#[derive(Debug)]
|
||||
pub struct ReaderInner {
|
||||
blk_index_to_blk_path: Arc<RwLock<BlkIndexToBlkPath>>,
|
||||
blk_file_cache: RwLock<BTreeMap<u16, File>>,
|
||||
xor_bytes: XORBytes,
|
||||
blocks_dir: PathBuf,
|
||||
client: Client,
|
||||
@@ -60,11 +62,19 @@ pub struct ReaderInner {
|
||||
|
||||
impl ReaderInner {
|
||||
pub fn new(blocks_dir: PathBuf, client: Client) -> Self {
|
||||
let no_file_limit = rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap_or((0, 0));
|
||||
let _ = rlimit::setrlimit(
|
||||
rlimit::Resource::NOFILE,
|
||||
no_file_limit.0.max(15_000),
|
||||
no_file_limit.1,
|
||||
);
|
||||
|
||||
Self {
|
||||
xor_bytes: XORBytes::from(blocks_dir.as_path()),
|
||||
blk_index_to_blk_path: Arc::new(RwLock::new(BlkIndexToBlkPath::scan(
|
||||
blocks_dir.as_path(),
|
||||
))),
|
||||
blk_file_cache: RwLock::new(BTreeMap::new()),
|
||||
blocks_dir,
|
||||
client,
|
||||
}
|
||||
@@ -86,26 +96,43 @@ impl ReaderInner {
|
||||
self.xor_bytes
|
||||
}
|
||||
|
||||
/// Read raw bytes from a blk file at the given position with XOR decoding
|
||||
/// Read raw bytes from a blk file at the given position with XOR decoding.
|
||||
/// File handles are cached per blk_index; reads use pread (no seek, thread-safe).
|
||||
pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result<Vec<u8>> {
|
||||
let blk_index = position.blk_index();
|
||||
|
||||
{
|
||||
let cache = self.blk_file_cache.read();
|
||||
if let Some(file) = cache.get(&blk_index) {
|
||||
let mut buffer = vec![0u8; size];
|
||||
file.read_at(&mut buffer, position.offset() as u64)?;
|
||||
self.xor_decode(&mut buffer, position.offset());
|
||||
return Ok(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
// Cache miss: open file, insert, and read
|
||||
let blk_paths = self.blk_index_to_blk_path();
|
||||
let blk_path = blk_paths
|
||||
.get(&position.blk_index())
|
||||
.get(&blk_index)
|
||||
.ok_or(Error::NotFound("Blk file not found".into()))?;
|
||||
|
||||
let mut file = File::open(blk_path)?;
|
||||
file.seek(SeekFrom::Start(position.offset() as u64))?;
|
||||
let file = File::open(blk_path)?;
|
||||
|
||||
let mut buffer = vec![0u8; size];
|
||||
file.read_exact(&mut buffer)?;
|
||||
file.read_at(&mut buffer, position.offset() as u64)?;
|
||||
self.xor_decode(&mut buffer, position.offset());
|
||||
|
||||
let mut xori = XORIndex::default();
|
||||
xori.add_assign(position.offset() as usize);
|
||||
xori.bytes(&mut buffer, self.xor_bytes);
|
||||
self.blk_file_cache.write().entry(blk_index).or_insert(file);
|
||||
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
fn xor_decode(&self, buffer: &mut [u8], offset: u32) {
|
||||
let mut xori = XORIndex::default();
|
||||
xori.add_assign(offset as usize);
|
||||
xori.bytes(buffer, self.xor_bytes);
|
||||
}
|
||||
|
||||
/// Returns a receiver streaming `ReadBlock`s from `hash + 1` to the chain tip.
|
||||
/// If `hash` is `None`, starts from genesis.
|
||||
pub fn after(&self, hash: Option<BlockHash>) -> Result<Receiver<ReadBlock>> {
|
||||
|
||||
Reference in New Issue
Block a user