diff --git a/crates/brk_computer/src/stateful_new/compute/block_loop.rs b/crates/brk_computer/src/stateful_new/compute/block_loop.rs index e3e14eabf..e27c550c7 100644 --- a/crates/brk_computer/src/stateful_new/compute/block_loop.rs +++ b/crates/brk_computer/src/stateful_new/compute/block_loop.rs @@ -8,12 +8,14 @@ //! 5. Push to height-indexed vectors //! 6. Periodically flush checkpoints -use std::thread; +use std::{mem, thread}; use brk_error::Result; use brk_grouper::ByAddressType; use brk_indexer::Indexer; -use brk_types::{DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TxInIndex, TxOutIndex, TypeIndex}; +use brk_types::{ + DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TxInIndex, TxOutIndex, TypeIndex, +}; use log::info; use rayon::prelude::*; use vecdb::{AnyStoredVec, Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex}; @@ -27,8 +29,9 @@ use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts}; use super::super::vecs::Vecs; use super::{ BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1, - BIP30_ORIGINAL_HEIGHT_2, FLUSH_INTERVAL, IndexerReaders, VecsReaders, build_txinindex_to_txindex, - build_txoutindex_to_txindex, flush::flush_checkpoint as flush_checkpoint_full, + BIP30_ORIGINAL_HEIGHT_2, FLUSH_INTERVAL, IndexerReaders, VecsReaders, + build_txinindex_to_txindex, build_txoutindex_to_txindex, + flush::flush_checkpoint as flush_checkpoint_full, }; use crate::stateful_new::address::AddressTypeToAddressCount; use crate::stateful_new::process::{ @@ -69,7 +72,12 @@ pub fn process_blocks( let height_to_tx_count = chain.indexes_to_tx_count.height.u(); let height_to_output_count = chain.indexes_to_output_count.height.unwrap_sum(); let height_to_input_count = chain.indexes_to_input_count.height.unwrap_sum(); - let height_to_unclaimed_rewards = chain.indexes_to_unclaimed_rewards.sats.height.as_ref().unwrap(); + let height_to_unclaimed_rewards = chain + .indexes_to_unclaimed_rewards + .sats + .height + .as_ref() + .unwrap(); // From indexes: let height_to_timestamp = &indexes.height_to_timestamp_fixed; @@ -114,43 +122,79 @@ pub fn process_blocks( let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data); // Create iterators for first address indexes per type - let mut first_p2a_iter = indexer.vecs.address.height_to_first_p2aaddressindex.into_iter(); - let mut first_p2pk33_iter = indexer.vecs.address.height_to_first_p2pk33addressindex.into_iter(); - let mut first_p2pk65_iter = indexer.vecs.address.height_to_first_p2pk65addressindex.into_iter(); - let mut first_p2pkh_iter = indexer.vecs.address.height_to_first_p2pkhaddressindex.into_iter(); - let mut first_p2sh_iter = indexer.vecs.address.height_to_first_p2shaddressindex.into_iter(); - let mut first_p2tr_iter = indexer.vecs.address.height_to_first_p2traddressindex.into_iter(); - let mut first_p2wpkh_iter = indexer.vecs.address.height_to_first_p2wpkhaddressindex.into_iter(); - let mut first_p2wsh_iter = indexer.vecs.address.height_to_first_p2wshaddressindex.into_iter(); + let mut first_p2a_iter = indexer + .vecs + .address + .height_to_first_p2aaddressindex + .into_iter(); + let mut first_p2pk33_iter = indexer + .vecs + .address + .height_to_first_p2pk33addressindex + .into_iter(); + let mut first_p2pk65_iter = indexer + .vecs + .address + .height_to_first_p2pk65addressindex + .into_iter(); + let mut first_p2pkh_iter = indexer + .vecs + .address + .height_to_first_p2pkhaddressindex + .into_iter(); + let mut first_p2sh_iter = indexer + .vecs + .address + .height_to_first_p2shaddressindex + .into_iter(); + let mut first_p2tr_iter = indexer + .vecs + .address + .height_to_first_p2traddressindex + .into_iter(); + let mut first_p2wpkh_iter = indexer + .vecs + .address + .height_to_first_p2wpkhaddressindex + .into_iter(); + let mut first_p2wsh_iter = indexer + .vecs + .address + .height_to_first_p2wshaddressindex + .into_iter(); // Track running totals - recover from previous height if resuming - let (mut unspendable_supply, mut opreturn_supply, mut addresstype_to_addr_count, mut addresstype_to_empty_addr_count) = - if starting_height > Height::ZERO { - let prev_height = starting_height.decremented().unwrap(); - ( - vecs.height_to_unspendable_supply - .into_iter() - .get_unwrap(prev_height), - vecs.height_to_opreturn_supply - .into_iter() - .get_unwrap(prev_height), - AddressTypeToAddressCount::from(( - &vecs.addresstype_to_height_to_addr_count, - starting_height, - )), - AddressTypeToAddressCount::from(( - &vecs.addresstype_to_height_to_empty_addr_count, - starting_height, - )), - ) - } else { - ( - Sats::ZERO, - Sats::ZERO, - AddressTypeToAddressCount::default(), - AddressTypeToAddressCount::default(), - ) - }; + let ( + mut unspendable_supply, + mut opreturn_supply, + mut addresstype_to_addr_count, + mut addresstype_to_empty_addr_count, + ) = if starting_height > Height::ZERO { + let prev_height = starting_height.decremented().unwrap(); + ( + vecs.height_to_unspendable_supply + .into_iter() + .get_unwrap(prev_height), + vecs.height_to_opreturn_supply + .into_iter() + .get_unwrap(prev_height), + AddressTypeToAddressCount::from(( + &vecs.addresstype_to_height_to_addr_count, + starting_height, + )), + AddressTypeToAddressCount::from(( + &vecs.addresstype_to_height_to_empty_addr_count, + starting_height, + )), + ) + } else { + ( + Sats::ZERO, + Sats::ZERO, + AddressTypeToAddressCount::default(), + AddressTypeToAddressCount::default(), + ) + }; // Persistent address data caches (accumulate across blocks, flushed at checkpoints) let mut loaded_cache: AddressTypeToTypeIndexMap = @@ -169,7 +213,9 @@ pub fn process_blocks( // Get block metadata let first_txindex = height_to_first_txindex_iter.get_unwrap(height); let tx_count = u64::from(height_to_tx_count_iter.get_unwrap(height)); - let first_txoutindex = height_to_first_txoutindex_iter.get_unwrap(height).to_usize(); + let first_txoutindex = height_to_first_txoutindex_iter + .get_unwrap(height) + .to_usize(); let output_count = u64::from(height_to_output_count_iter.get_unwrap(height)) as usize; let first_txinindex = height_to_first_txinindex_iter.get_unwrap(height).to_usize(); let input_count = u64::from(height_to_input_count_iter.get_unwrap(height)) as usize; @@ -260,8 +306,9 @@ pub fn process_blocks( loaded_cache.merge_mut(inputs_result.address_data); // Combine txindex_vecs from outputs and inputs, then update tx_count - let combined_txindex_vecs = - outputs_result.txindex_vecs.merge_vec(inputs_result.txindex_vecs); + let combined_txindex_vecs = outputs_result + .txindex_vecs + .merge_vec(inputs_result.txindex_vecs); update_tx_counts(&mut loaded_cache, &mut empty_cache, combined_txindex_vecs); let mut transacted = outputs_result.transacted; @@ -478,7 +525,8 @@ fn flush_checkpoint( // Flush cohort states vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?; - vecs.address_cohorts.safe_flush_stateful_vecs(height, exit)?; + vecs.address_cohorts + .safe_flush_stateful_vecs(height, exit)?; // Flush height-indexed vectors vecs.height_to_unspendable_supply.safe_write(exit)?; @@ -488,8 +536,8 @@ fn flush_checkpoint( .safe_flush(exit)?; // Process and flush address data updates - let empty_updates = std::mem::take(empty_cache); - let loaded_updates = std::mem::take(loaded_cache); + let empty_updates = mem::take(empty_cache); + let loaded_updates = mem::take(loaded_cache); flush_checkpoint_full( height, &mut vecs diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index 8288025a8..094385f7c 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -35,6 +35,7 @@ pub enum Error { InvalidAddress, InvalidNetwork, InvalidTxid, + MempoolNotAvailable, UnknownAddress, UnknownTxid, UnsupportedType(String), @@ -189,6 +190,7 @@ impl fmt::Display for Error { Error::InvalidTxid => write!(f, "The provided TXID appears to be invalid"), Error::InvalidNetwork => write!(f, "Invalid network"), Error::InvalidAddress => write!(f, "The provided address appears to be invalid"), + Error::MempoolNotAvailable => write!(f, "Mempool data is not available"), Error::UnknownAddress => write!( f, "Address not found in the blockchain (no transaction history)" diff --git a/crates/brk_monitor/src/lib.rs b/crates/brk_monitor/src/lib.rs index f3def8077..c5235fb26 100644 --- a/crates/brk_monitor/src/lib.rs +++ b/crates/brk_monitor/src/lib.rs @@ -1,25 +1,20 @@ -use std::{ - collections::BTreeMap, - sync::Arc, - thread, - time::Duration, -}; +use std::{sync::Arc, thread, time::Duration}; use brk_error::Result; use brk_rpc::Client; use brk_types::{ - AddressBytes, AddressMempoolStats, FeeRate, MempoolInfo, RecommendedFees, TxWithHex, Txid, - VSize, + AddressBytes, AddressMempoolStats, MempoolInfo, RecommendedFees, TxWithHex, Txid, }; use derive_deref::Deref; use log::error; use parking_lot::{RwLock, RwLockReadGuard}; use rustc_hash::{FxHashMap, FxHashSet}; -const MAX_FETCHES_PER_CYCLE: usize = 10_000; +mod mempool; -/// Target block vsize (1MB = 1_000_000 vbytes, but using 4MW weight / 4 = 1MW vbytes max) -const BLOCK_VSIZE_TARGET: u64 = 1_000_000; +use mempool::{ProjectedBlocks, TxGraph}; + +const MAX_FETCHES_PER_CYCLE: usize = 10_000; /// /// Mempool monitor @@ -39,8 +34,8 @@ pub struct MempoolInner { client: Client, info: RwLock, fees: RwLock, - /// Map of fee rate -> total vsize at that fee rate, used for fee estimation - fee_rates: RwLock>, + graph: RwLock, + projected_blocks: RwLock, txs: RwLock>, addresses: RwLock)>>, } @@ -51,7 +46,8 @@ impl MempoolInner { client, info: RwLock::new(MempoolInfo::default()), fees: RwLock::new(RecommendedFees::default()), - fee_rates: RwLock::new(BTreeMap::new()), + graph: RwLock::new(TxGraph::new()), + projected_blocks: RwLock::new(ProjectedBlocks::default()), txs: RwLock::new(FxHashMap::default()), addresses: RwLock::new(FxHashMap::default()), } @@ -65,6 +61,10 @@ impl MempoolInner { self.fees.read().clone() } + pub fn get_projected_blocks(&self) -> ProjectedBlocks { + self.projected_blocks.read().clone() + } + pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap> { self.txs.read() } @@ -86,89 +86,149 @@ impl MempoolInner { } pub fn update(&self) -> Result<()> { - let txids = self + let current_txids = self .client .get_raw_mempool()? .into_iter() .collect::>(); - let new_txs = { - let txs = self.txs.read(); - txids - .iter() - .filter(|txid| !txs.contains_key(*txid)) - .take(MAX_FETCHES_PER_CYCLE) - .cloned() - .collect::>() + let new_txs = self.fetch_new_txs(¤t_txids); + let has_changes = self.apply_changes(¤t_txids, &new_txs); + + if has_changes { + self.rebuild_projected_blocks(); } - .into_iter() - .filter_map(|txid| { - self.client - .get_mempool_transaction(&txid) - .ok() - .map(|tx| (txid, tx)) - }) - .collect::>(); - - let mut info = self.info.write(); - let mut txs = self.txs.write(); - let mut addresses = self.addresses.write(); - - txs.retain(|txid, tx_with_hex| { - if txids.contains(txid) { - return true; - } - let tx = tx_with_hex.tx(); - info.remove(tx); - - tx.input - .iter() - .flat_map(|txin| txin.prevout.as_ref()) - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.remove(txid); - stats.sent(txout); - stats.update_tx_count(set.len() as u32); - }); - tx.output - .iter() - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.remove(txid); - stats.received(txout); - stats.update_tx_count(set.len() as u32); - }); - false - }); - - new_txs.iter().for_each(|(txid, tx_with_hex)| { - let tx = tx_with_hex.tx(); - info.add(tx); - - tx.input - .iter() - .flat_map(|txin| txin.prevout.as_ref()) - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.insert(txid.clone()); - stats.sending(txout); - stats.update_tx_count(set.len() as u32); - }); - tx.output - .iter() - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.insert(txid.clone()); - stats.receiving(txout); - stats.update_tx_count(set.len() as u32); - }); - }); - txs.extend(new_txs); Ok(()) } + + /// Fetch transactions that are new to our mempool + fn fetch_new_txs(&self, current_txids: &FxHashSet) -> FxHashMap { + let txs = self.txs.read(); + current_txids + .iter() + .filter(|txid| !txs.contains_key(*txid)) + .take(MAX_FETCHES_PER_CYCLE) + .cloned() + .collect::>() + .into_iter() + .filter_map(|txid| { + self.client + .get_mempool_transaction(&txid) + .ok() + .map(|tx| (txid, tx)) + }) + .collect() + } + + /// Apply transaction additions and removals, returns true if there were changes + fn apply_changes( + &self, + current_txids: &FxHashSet, + new_txs: &FxHashMap, + ) -> bool { + let mut info = self.info.write(); + let mut graph = self.graph.write(); + let mut txs = self.txs.write(); + let mut addresses = self.addresses.write(); + + let mut had_removals = false; + let had_additions = !new_txs.is_empty(); + + // Remove transactions no longer in mempool + txs.retain(|txid, tx_with_hex| { + if current_txids.contains(txid) { + return true; + } + + had_removals = true; + let tx = tx_with_hex.tx(); + + info.remove(tx); + graph.remove(txid); + Self::update_address_stats_on_removal(tx, txid, &mut addresses); + + false + }); + + // Add new transactions + for (txid, tx_with_hex) in new_txs { + let tx = tx_with_hex.tx(); + + info.add(tx); + graph.insert(tx); + Self::update_address_stats_on_addition(tx, txid, &mut addresses); + } + txs.extend(new_txs.clone()); + + had_removals || had_additions + } + + /// Rebuild projected blocks and update recommended fees + fn rebuild_projected_blocks(&self) { + let graph = self.graph.read(); + let projected = ProjectedBlocks::build(&graph); + let fees = projected.recommended_fees(); + + *self.projected_blocks.write() = projected; + *self.fees.write() = fees; + } + + fn update_address_stats_on_removal( + tx: &brk_types::Transaction, + txid: &Txid, + addresses: &mut FxHashMap)>, + ) { + // Inputs: undo "sending" state + tx.input + .iter() + .flat_map(|txin| txin.prevout.as_ref()) + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.remove(txid); + stats.sent(txout); + stats.update_tx_count(set.len() as u32); + }); + + // Outputs: undo "receiving" state + tx.output + .iter() + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.remove(txid); + stats.received(txout); + stats.update_tx_count(set.len() as u32); + }); + } + + fn update_address_stats_on_addition( + tx: &brk_types::Transaction, + txid: &Txid, + addresses: &mut FxHashMap)>, + ) { + // Inputs: mark as "sending" + tx.input + .iter() + .flat_map(|txin| txin.prevout.as_ref()) + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.insert(txid.clone()); + stats.sending(txout); + stats.update_tx_count(set.len() as u32); + }); + + // Outputs: mark as "receiving" + tx.output + .iter() + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.insert(txid.clone()); + stats.receiving(txout); + stats.update_tx_count(set.len() as u32); + }); + } } diff --git a/crates/brk_monitor/src/mempool/entry.rs b/crates/brk_monitor/src/mempool/entry.rs index 852ab6c85..c8735518c 100644 --- a/crates/brk_monitor/src/mempool/entry.rs +++ b/crates/brk_monitor/src/mempool/entry.rs @@ -1,6 +1,9 @@ -use brk_types::{FeeRate, Outpoint, Sats, Transaction, Txid, VSize}; +use brk_types::{FeeRate, Sats, Transaction, Txid, VSize, Vout}; use rustc_hash::FxHashSet; +/// (txid, vout) tuple identifying an unspent output in the mempool +pub type MempoolOutpoint = (Txid, Vout); + /// A mempool transaction with its dependency metadata #[derive(Debug, Clone)] pub struct MempoolEntry { @@ -9,7 +12,7 @@ pub struct MempoolEntry { pub vsize: VSize, /// Outpoints this tx spends (inputs) - pub spends: Vec, + pub spends: Vec, /// Txids of unconfirmed ancestors (parents, grandparents, etc.) pub ancestors: FxHashSet, @@ -30,7 +33,7 @@ impl MempoolEntry { let spends = tx .input .iter() - .map(|txin| Outpoint::new(txin.txid.clone(), txin.vout)) + .map(|txin| (txin.txid.clone(), txin.vout)) .collect(); Self { diff --git a/crates/brk_monitor/src/mempool/graph.rs b/crates/brk_monitor/src/mempool/graph.rs index 9876cb518..93b5c968d 100644 --- a/crates/brk_monitor/src/mempool/graph.rs +++ b/crates/brk_monitor/src/mempool/graph.rs @@ -1,6 +1,7 @@ -use brk_types::{Outpoint, Sats, Transaction, Txid, VSize}; +use brk_types::{Sats, Transaction, Txid, VSize, Vout}; use rustc_hash::{FxHashMap, FxHashSet}; +use super::entry::MempoolOutpoint; use super::MempoolEntry; /// Transaction dependency graph for the mempool @@ -13,7 +14,7 @@ pub struct TxGraph { entries: FxHashMap, /// Maps outpoint -> txid that created it (for finding parents) - outpoint_to_tx: FxHashMap, + outpoint_to_tx: FxHashMap, /// Maps txid -> txids that spend its outputs (children) children: FxHashMap>, @@ -51,7 +52,7 @@ impl TxGraph { // Register this tx's outputs for (vout, _) in tx.output.iter().enumerate() { - let outpoint = Outpoint::new(entry.txid.clone(), vout as u32); + let outpoint = (entry.txid.clone(), Vout::from(vout as u32)); self.outpoint_to_tx.insert(outpoint, entry.txid.clone()); } @@ -97,7 +98,7 @@ impl TxGraph { } /// Find which inputs reference in-mempool transactions (parents) - fn find_parents(&self, spends: &[Outpoint]) -> Vec { + fn find_parents(&self, spends: &[MempoolOutpoint]) -> Vec { spends .iter() .filter_map(|outpoint| self.outpoint_to_tx.get(outpoint).cloned()) diff --git a/crates/brk_monitor/src/mempool/projected_blocks.rs b/crates/brk_monitor/src/mempool/projected_blocks.rs index 59dc4116c..1e705d5ac 100644 --- a/crates/brk_monitor/src/mempool/projected_blocks.rs +++ b/crates/brk_monitor/src/mempool/projected_blocks.rs @@ -1,3 +1,5 @@ +use std::mem; + use brk_types::{FeeRate, RecommendedFees, Sats, Txid, VSize}; use rustc_hash::FxHashSet; @@ -12,6 +14,9 @@ const BLOCK_VSIZE_TARGET: u64 = MAX_BLOCK_WEIGHT / 4; /// Number of projected blocks to build const NUM_PROJECTED_BLOCKS: usize = 8; +/// Minimum fee rate (no priority) +const MIN_FEE_RATE: f64 = 0.1; + /// A projected future block built from mempool transactions #[derive(Debug, Clone, Default)] pub struct ProjectedBlock { @@ -43,7 +48,14 @@ impl ProjectedBlocks { let mut sorted: Vec<_> = graph .entries() .iter() - .map(|(txid, entry)| (txid.clone(), entry.ancestor_fee_rate(), entry.vsize, entry.fee)) + .map(|(txid, entry)| { + ( + txid.clone(), + entry.ancestor_fee_rate(), + entry.vsize, + entry.fee, + ) + }) .collect(); sorted.sort_by(|a, b| b.1.cmp(&a.1)); @@ -62,19 +74,14 @@ impl ProjectedBlocks { // Would this tx fit in the current block? let new_vsize = current_block.total_vsize + vsize; - if u64::from(new_vsize) > BLOCK_VSIZE_TARGET { - // Finalize current block if it has transactions - if !current_block.txids.is_empty() { - Self::finalize_block(&mut current_block); - blocks.push(current_block); + if u64::from(new_vsize) > BLOCK_VSIZE_TARGET && !current_block.txids.is_empty() { + // Finalize and store current block + Self::finalize_block(&mut current_block); + blocks.push(mem::take(&mut current_block)); - if blocks.len() >= NUM_PROJECTED_BLOCKS { - break; - } + if blocks.len() >= NUM_PROJECTED_BLOCKS { + break; } - - // Start new block - current_block = ProjectedBlock::default(); } // Add to current block @@ -103,20 +110,19 @@ impl ProjectedBlocks { pub fn recommended_fees(&self) -> RecommendedFees { RecommendedFees { fastest_fee: self.fee_for_block(0), - half_hour_fee: self.fee_for_block(2), // ~3 blocks - hour_fee: self.fee_for_block(5), // ~6 blocks - economy_fee: self.fee_for_block(7), // ~12 blocks, but we only have 8 - minimum_fee: 1.0, + half_hour_fee: self.fee_for_block(2), // ~3 blocks + hour_fee: self.fee_for_block(5), // ~6 blocks + economy_fee: self.fee_for_block(7), // ~8 blocks + minimum_fee: FeeRate::from(MIN_FEE_RATE), } } /// Get the minimum fee rate needed to get into block N - fn fee_for_block(&self, block_index: usize) -> f64 { + fn fee_for_block(&self, block_index: usize) -> FeeRate { self.blocks .get(block_index) - .map(|b| f64::from(b.min_fee_rate)) - .unwrap_or(1.0) - .max(1.0) // Never recommend below 1 sat/vB + .map(|b| b.min_fee_rate) + .unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE)) } fn finalize_block(block: &mut ProjectedBlock) { diff --git a/crates/brk_query/src/async.rs b/crates/brk_query/src/async.rs index 95941c711..4f5da9930 100644 --- a/crates/brk_query/src/async.rs +++ b/crates/brk_query/src/async.rs @@ -7,7 +7,7 @@ use brk_monitor::Mempool; use brk_reader::Reader; use brk_types::{ Address, AddressStats, BlockInfo, BlockStatus, Height, Index, IndexInfo, Limit, MempoolInfo, - Metric, MetricCount, Transaction, TreeNode, TxStatus, Txid, TxidPath, Utxo, + Metric, MetricCount, RecommendedFees, Transaction, TreeNode, TxStatus, Txid, TxidPath, Utxo, }; use tokio::task::spawn_blocking; @@ -98,13 +98,15 @@ impl AsyncQuery { } pub async fn get_mempool_info(&self) -> Result { - let query = self.0.clone(); - spawn_blocking(move || query.get_mempool_info()).await? + self.0.get_mempool_info() } pub async fn get_mempool_txids(&self) -> Result> { - let query = self.0.clone(); - spawn_blocking(move || query.get_mempool_txids()).await? + self.0.get_mempool_txids() + } + + pub async fn get_recommended_fees(&self) -> Result { + self.0.get_recommended_fees() } pub async fn match_metric(&self, metric: Metric, limit: Limit) -> Result> { diff --git a/crates/brk_query/src/chain/mempool/fees.rs b/crates/brk_query/src/chain/mempool/fees.rs new file mode 100644 index 000000000..9b84b99ef --- /dev/null +++ b/crates/brk_query/src/chain/mempool/fees.rs @@ -0,0 +1,11 @@ +use brk_error::{Error, Result}; +use brk_types::RecommendedFees; + +use crate::Query; + +pub fn get_recommended_fees(query: &Query) -> Result { + query + .mempool() + .map(|mempool| mempool.get_fees()) + .ok_or(Error::MempoolNotAvailable) +} diff --git a/crates/brk_query/src/chain/mempool/mod.rs b/crates/brk_query/src/chain/mempool/mod.rs index 56eda64ec..4b2ff80e4 100644 --- a/crates/brk_query/src/chain/mempool/mod.rs +++ b/crates/brk_query/src/chain/mempool/mod.rs @@ -1,5 +1,7 @@ +mod fees; mod info; mod txids; +pub use fees::*; pub use info::*; pub use txids::*; diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index a9ccdef21..805952609 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -11,7 +11,7 @@ use brk_reader::Reader; use brk_traversable::TreeNode; use brk_types::{ Address, AddressStats, BlockInfo, BlockStatus, Format, Height, Index, IndexInfo, Limit, - MempoolInfo, Metric, MetricCount, Transaction, TxStatus, Txid, TxidPath, Utxo, + MempoolInfo, Metric, MetricCount, RecommendedFees, Transaction, TxStatus, Txid, TxidPath, Utxo, }; use vecdb::{AnyExportableVec, AnyStoredVec}; @@ -35,8 +35,8 @@ use crate::{ chain::{ get_address, get_address_txids, get_address_utxos, get_block_by_height, get_block_status_by_height, get_block_txids, get_blocks, get_height_by_hash, - get_mempool_info, get_mempool_txids, get_transaction, get_transaction_hex, - get_transaction_status, + get_mempool_info, get_mempool_txids, get_recommended_fees, get_transaction, + get_transaction_hex, get_transaction_status, }, vecs::{IndexToVec, MetricToVec}, }; @@ -136,6 +136,10 @@ impl Query { get_mempool_txids(self) } + pub fn get_recommended_fees(&self) -> Result { + get_recommended_fees(self) + } + pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&'static str> { self.vecs().matches(metric, limit) } diff --git a/crates/brk_server/src/api/mempool/mod.rs b/crates/brk_server/src/api/mempool/mod.rs index a6ec81913..4b4b1a054 100644 --- a/crates/brk_server/src/api/mempool/mod.rs +++ b/crates/brk_server/src/api/mempool/mod.rs @@ -5,7 +5,7 @@ use axum::{ response::{Redirect, Response}, routing::get, }; -use brk_types::{MempoolInfo, Txid}; +use brk_types::{MempoolInfo, RecommendedFees, Txid}; use crate::{ VERSION, @@ -62,5 +62,25 @@ impl MempoolRoutes for ApiRouter { }, ), ) + .api_route( + "/api/v1/fees/recommended", + get_with( + async |headers: HeaderMap, State(state): State| { + let etag = format!("{VERSION}-{}", state.get_height().await); + if headers.has_etag(&etag) { + return Response::new_not_modified(); + } + state.get_recommended_fees().await.to_json_response(&etag) + }, + |op| { + op.mempool_tag() + .summary("Recommended fees") + .description("Get recommended fee rates for different confirmation targets based on current mempool state.") + .ok_response::() + .not_modified() + .server_error() + }, + ), + ) } } diff --git a/crates/brk_types/src/metrics.rs b/crates/brk_types/src/metrics.rs index 9f0f97784..205108007 100644 --- a/crates/brk_types/src/metrics.rs +++ b/crates/brk_types/src/metrics.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::{fmt, mem}; use derive_deref::Deref; use schemars::JsonSchema; @@ -98,7 +98,7 @@ fn sanitize(dirty: impl Iterator) -> Vec { match c { ' ' | ',' | '+' => { if !current.is_empty() { - clean.push(std::mem::take(&mut current)); + clean.push(mem::take(&mut current)); } } '-' => current.push('_'),