mempool: snapshot

This commit is contained in:
nym21
2025-12-13 11:10:11 +01:00
parent 158b0254ed
commit 9c8b9b1a3b
12 changed files with 333 additions and 174 deletions

View File

@@ -8,12 +8,14 @@
//! 5. Push to height-indexed vectors
//! 6. Periodically flush checkpoints
use std::thread;
use std::{mem, thread};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_indexer::Indexer;
use brk_types::{DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TxInIndex, TxOutIndex, TypeIndex};
use brk_types::{
DateIndex, Dollars, Height, OutputType, Sats, Timestamp, TxInIndex, TxOutIndex, TypeIndex,
};
use log::info;
use rayon::prelude::*;
use vecdb::{AnyStoredVec, Exit, GenericStoredVec, IterableVec, TypedVecIterator, VecIndex};
@@ -27,8 +29,9 @@ use super::super::cohorts::{AddressCohorts, DynCohortVecs, UTXOCohorts};
use super::super::vecs::Vecs;
use super::{
BIP30_DUPLICATE_HEIGHT_1, BIP30_DUPLICATE_HEIGHT_2, BIP30_ORIGINAL_HEIGHT_1,
BIP30_ORIGINAL_HEIGHT_2, FLUSH_INTERVAL, IndexerReaders, VecsReaders, build_txinindex_to_txindex,
build_txoutindex_to_txindex, flush::flush_checkpoint as flush_checkpoint_full,
BIP30_ORIGINAL_HEIGHT_2, FLUSH_INTERVAL, IndexerReaders, VecsReaders,
build_txinindex_to_txindex, build_txoutindex_to_txindex,
flush::flush_checkpoint as flush_checkpoint_full,
};
use crate::stateful_new::address::AddressTypeToAddressCount;
use crate::stateful_new::process::{
@@ -69,7 +72,12 @@ pub fn process_blocks(
let height_to_tx_count = chain.indexes_to_tx_count.height.u();
let height_to_output_count = chain.indexes_to_output_count.height.unwrap_sum();
let height_to_input_count = chain.indexes_to_input_count.height.unwrap_sum();
let height_to_unclaimed_rewards = chain.indexes_to_unclaimed_rewards.sats.height.as_ref().unwrap();
let height_to_unclaimed_rewards = chain
.indexes_to_unclaimed_rewards
.sats
.height
.as_ref()
.unwrap();
// From indexes:
let height_to_timestamp = &indexes.height_to_timestamp_fixed;
@@ -114,43 +122,79 @@ pub fn process_blocks(
let mut vr = VecsReaders::new(&vecs.any_address_indexes, &vecs.addresses_data);
// Create iterators for first address indexes per type
let mut first_p2a_iter = indexer.vecs.address.height_to_first_p2aaddressindex.into_iter();
let mut first_p2pk33_iter = indexer.vecs.address.height_to_first_p2pk33addressindex.into_iter();
let mut first_p2pk65_iter = indexer.vecs.address.height_to_first_p2pk65addressindex.into_iter();
let mut first_p2pkh_iter = indexer.vecs.address.height_to_first_p2pkhaddressindex.into_iter();
let mut first_p2sh_iter = indexer.vecs.address.height_to_first_p2shaddressindex.into_iter();
let mut first_p2tr_iter = indexer.vecs.address.height_to_first_p2traddressindex.into_iter();
let mut first_p2wpkh_iter = indexer.vecs.address.height_to_first_p2wpkhaddressindex.into_iter();
let mut first_p2wsh_iter = indexer.vecs.address.height_to_first_p2wshaddressindex.into_iter();
let mut first_p2a_iter = indexer
.vecs
.address
.height_to_first_p2aaddressindex
.into_iter();
let mut first_p2pk33_iter = indexer
.vecs
.address
.height_to_first_p2pk33addressindex
.into_iter();
let mut first_p2pk65_iter = indexer
.vecs
.address
.height_to_first_p2pk65addressindex
.into_iter();
let mut first_p2pkh_iter = indexer
.vecs
.address
.height_to_first_p2pkhaddressindex
.into_iter();
let mut first_p2sh_iter = indexer
.vecs
.address
.height_to_first_p2shaddressindex
.into_iter();
let mut first_p2tr_iter = indexer
.vecs
.address
.height_to_first_p2traddressindex
.into_iter();
let mut first_p2wpkh_iter = indexer
.vecs
.address
.height_to_first_p2wpkhaddressindex
.into_iter();
let mut first_p2wsh_iter = indexer
.vecs
.address
.height_to_first_p2wshaddressindex
.into_iter();
// Track running totals - recover from previous height if resuming
let (mut unspendable_supply, mut opreturn_supply, mut addresstype_to_addr_count, mut addresstype_to_empty_addr_count) =
if starting_height > Height::ZERO {
let prev_height = starting_height.decremented().unwrap();
(
vecs.height_to_unspendable_supply
.into_iter()
.get_unwrap(prev_height),
vecs.height_to_opreturn_supply
.into_iter()
.get_unwrap(prev_height),
AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_addr_count,
starting_height,
)),
AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_empty_addr_count,
starting_height,
)),
)
} else {
(
Sats::ZERO,
Sats::ZERO,
AddressTypeToAddressCount::default(),
AddressTypeToAddressCount::default(),
)
};
let (
mut unspendable_supply,
mut opreturn_supply,
mut addresstype_to_addr_count,
mut addresstype_to_empty_addr_count,
) = if starting_height > Height::ZERO {
let prev_height = starting_height.decremented().unwrap();
(
vecs.height_to_unspendable_supply
.into_iter()
.get_unwrap(prev_height),
vecs.height_to_opreturn_supply
.into_iter()
.get_unwrap(prev_height),
AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_addr_count,
starting_height,
)),
AddressTypeToAddressCount::from((
&vecs.addresstype_to_height_to_empty_addr_count,
starting_height,
)),
)
} else {
(
Sats::ZERO,
Sats::ZERO,
AddressTypeToAddressCount::default(),
AddressTypeToAddressCount::default(),
)
};
// Persistent address data caches (accumulate across blocks, flushed at checkpoints)
let mut loaded_cache: AddressTypeToTypeIndexMap<LoadedAddressDataWithSource> =
@@ -169,7 +213,9 @@ pub fn process_blocks(
// Get block metadata
let first_txindex = height_to_first_txindex_iter.get_unwrap(height);
let tx_count = u64::from(height_to_tx_count_iter.get_unwrap(height));
let first_txoutindex = height_to_first_txoutindex_iter.get_unwrap(height).to_usize();
let first_txoutindex = height_to_first_txoutindex_iter
.get_unwrap(height)
.to_usize();
let output_count = u64::from(height_to_output_count_iter.get_unwrap(height)) as usize;
let first_txinindex = height_to_first_txinindex_iter.get_unwrap(height).to_usize();
let input_count = u64::from(height_to_input_count_iter.get_unwrap(height)) as usize;
@@ -260,8 +306,9 @@ pub fn process_blocks(
loaded_cache.merge_mut(inputs_result.address_data);
// Combine txindex_vecs from outputs and inputs, then update tx_count
let combined_txindex_vecs =
outputs_result.txindex_vecs.merge_vec(inputs_result.txindex_vecs);
let combined_txindex_vecs = outputs_result
.txindex_vecs
.merge_vec(inputs_result.txindex_vecs);
update_tx_counts(&mut loaded_cache, &mut empty_cache, combined_txindex_vecs);
let mut transacted = outputs_result.transacted;
@@ -478,7 +525,8 @@ fn flush_checkpoint(
// Flush cohort states
vecs.utxo_cohorts.safe_flush_stateful_vecs(height, exit)?;
vecs.address_cohorts.safe_flush_stateful_vecs(height, exit)?;
vecs.address_cohorts
.safe_flush_stateful_vecs(height, exit)?;
// Flush height-indexed vectors
vecs.height_to_unspendable_supply.safe_write(exit)?;
@@ -488,8 +536,8 @@ fn flush_checkpoint(
.safe_flush(exit)?;
// Process and flush address data updates
let empty_updates = std::mem::take(empty_cache);
let loaded_updates = std::mem::take(loaded_cache);
let empty_updates = mem::take(empty_cache);
let loaded_updates = mem::take(loaded_cache);
flush_checkpoint_full(
height,
&mut vecs

View File

@@ -35,6 +35,7 @@ pub enum Error {
InvalidAddress,
InvalidNetwork,
InvalidTxid,
MempoolNotAvailable,
UnknownAddress,
UnknownTxid,
UnsupportedType(String),
@@ -189,6 +190,7 @@ impl fmt::Display for Error {
Error::InvalidTxid => write!(f, "The provided TXID appears to be invalid"),
Error::InvalidNetwork => write!(f, "Invalid network"),
Error::InvalidAddress => write!(f, "The provided address appears to be invalid"),
Error::MempoolNotAvailable => write!(f, "Mempool data is not available"),
Error::UnknownAddress => write!(
f,
"Address not found in the blockchain (no transaction history)"

View File

@@ -1,25 +1,20 @@
use std::{
collections::BTreeMap,
sync::Arc,
thread,
time::Duration,
};
use std::{sync::Arc, thread, time::Duration};
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{
AddressBytes, AddressMempoolStats, FeeRate, MempoolInfo, RecommendedFees, TxWithHex, Txid,
VSize,
AddressBytes, AddressMempoolStats, MempoolInfo, RecommendedFees, TxWithHex, Txid,
};
use derive_deref::Deref;
use log::error;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::{FxHashMap, FxHashSet};
const MAX_FETCHES_PER_CYCLE: usize = 10_000;
mod mempool;
/// Target block vsize (1MB = 1_000_000 vbytes, but using 4MW weight / 4 = 1MW vbytes max)
const BLOCK_VSIZE_TARGET: u64 = 1_000_000;
use mempool::{ProjectedBlocks, TxGraph};
const MAX_FETCHES_PER_CYCLE: usize = 10_000;
///
/// Mempool monitor
@@ -39,8 +34,8 @@ pub struct MempoolInner {
client: Client,
info: RwLock<MempoolInfo>,
fees: RwLock<RecommendedFees>,
/// Map of fee rate -> total vsize at that fee rate, used for fee estimation
fee_rates: RwLock<BTreeMap<FeeRate, VSize>>,
graph: RwLock<TxGraph>,
projected_blocks: RwLock<ProjectedBlocks>,
txs: RwLock<FxHashMap<Txid, TxWithHex>>,
addresses: RwLock<FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>>,
}
@@ -51,7 +46,8 @@ impl MempoolInner {
client,
info: RwLock::new(MempoolInfo::default()),
fees: RwLock::new(RecommendedFees::default()),
fee_rates: RwLock::new(BTreeMap::new()),
graph: RwLock::new(TxGraph::new()),
projected_blocks: RwLock::new(ProjectedBlocks::default()),
txs: RwLock::new(FxHashMap::default()),
addresses: RwLock::new(FxHashMap::default()),
}
@@ -65,6 +61,10 @@ impl MempoolInner {
self.fees.read().clone()
}
pub fn get_projected_blocks(&self) -> ProjectedBlocks {
self.projected_blocks.read().clone()
}
pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap<Txid, TxWithHex>> {
self.txs.read()
}
@@ -86,89 +86,149 @@ impl MempoolInner {
}
pub fn update(&self) -> Result<()> {
let txids = self
let current_txids = self
.client
.get_raw_mempool()?
.into_iter()
.collect::<FxHashSet<_>>();
let new_txs = {
let txs = self.txs.read();
txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_FETCHES_PER_CYCLE)
.cloned()
.collect::<Vec<_>>()
let new_txs = self.fetch_new_txs(&current_txids);
let has_changes = self.apply_changes(&current_txids, &new_txs);
if has_changes {
self.rebuild_projected_blocks();
}
.into_iter()
.filter_map(|txid| {
self.client
.get_mempool_transaction(&txid)
.ok()
.map(|tx| (txid, tx))
})
.collect::<FxHashMap<_, _>>();
let mut info = self.info.write();
let mut txs = self.txs.write();
let mut addresses = self.addresses.write();
txs.retain(|txid, tx_with_hex| {
if txids.contains(txid) {
return true;
}
let tx = tx_with_hex.tx();
info.remove(tx);
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.sent(txout);
stats.update_tx_count(set.len() as u32);
});
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.received(txout);
stats.update_tx_count(set.len() as u32);
});
false
});
new_txs.iter().for_each(|(txid, tx_with_hex)| {
let tx = tx_with_hex.tx();
info.add(tx);
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.sending(txout);
stats.update_tx_count(set.len() as u32);
});
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.receiving(txout);
stats.update_tx_count(set.len() as u32);
});
});
txs.extend(new_txs);
Ok(())
}
/// Fetch transactions that are new to our mempool
fn fetch_new_txs(&self, current_txids: &FxHashSet<Txid>) -> FxHashMap<Txid, TxWithHex> {
let txs = self.txs.read();
current_txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_FETCHES_PER_CYCLE)
.cloned()
.collect::<Vec<_>>()
.into_iter()
.filter_map(|txid| {
self.client
.get_mempool_transaction(&txid)
.ok()
.map(|tx| (txid, tx))
})
.collect()
}
/// Apply transaction additions and removals, returns true if there were changes
fn apply_changes(
&self,
current_txids: &FxHashSet<Txid>,
new_txs: &FxHashMap<Txid, TxWithHex>,
) -> bool {
let mut info = self.info.write();
let mut graph = self.graph.write();
let mut txs = self.txs.write();
let mut addresses = self.addresses.write();
let mut had_removals = false;
let had_additions = !new_txs.is_empty();
// Remove transactions no longer in mempool
txs.retain(|txid, tx_with_hex| {
if current_txids.contains(txid) {
return true;
}
had_removals = true;
let tx = tx_with_hex.tx();
info.remove(tx);
graph.remove(txid);
Self::update_address_stats_on_removal(tx, txid, &mut addresses);
false
});
// Add new transactions
for (txid, tx_with_hex) in new_txs {
let tx = tx_with_hex.tx();
info.add(tx);
graph.insert(tx);
Self::update_address_stats_on_addition(tx, txid, &mut addresses);
}
txs.extend(new_txs.clone());
had_removals || had_additions
}
/// Rebuild projected blocks and update recommended fees
fn rebuild_projected_blocks(&self) {
let graph = self.graph.read();
let projected = ProjectedBlocks::build(&graph);
let fees = projected.recommended_fees();
*self.projected_blocks.write() = projected;
*self.fees.write() = fees;
}
fn update_address_stats_on_removal(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: undo "sending" state
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.sent(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: undo "receiving" state
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.received(txout);
stats.update_tx_count(set.len() as u32);
});
}
fn update_address_stats_on_addition(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: mark as "sending"
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.sending(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: mark as "receiving"
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.receiving(txout);
stats.update_tx_count(set.len() as u32);
});
}
}

View File

@@ -1,6 +1,9 @@
use brk_types::{FeeRate, Outpoint, Sats, Transaction, Txid, VSize};
use brk_types::{FeeRate, Sats, Transaction, Txid, VSize, Vout};
use rustc_hash::FxHashSet;
/// (txid, vout) tuple identifying an unspent output in the mempool
pub type MempoolOutpoint = (Txid, Vout);
/// A mempool transaction with its dependency metadata
#[derive(Debug, Clone)]
pub struct MempoolEntry {
@@ -9,7 +12,7 @@ pub struct MempoolEntry {
pub vsize: VSize,
/// Outpoints this tx spends (inputs)
pub spends: Vec<Outpoint>,
pub spends: Vec<MempoolOutpoint>,
/// Txids of unconfirmed ancestors (parents, grandparents, etc.)
pub ancestors: FxHashSet<Txid>,
@@ -30,7 +33,7 @@ impl MempoolEntry {
let spends = tx
.input
.iter()
.map(|txin| Outpoint::new(txin.txid.clone(), txin.vout))
.map(|txin| (txin.txid.clone(), txin.vout))
.collect();
Self {

View File

@@ -1,6 +1,7 @@
use brk_types::{Outpoint, Sats, Transaction, Txid, VSize};
use brk_types::{Sats, Transaction, Txid, VSize, Vout};
use rustc_hash::{FxHashMap, FxHashSet};
use super::entry::MempoolOutpoint;
use super::MempoolEntry;
/// Transaction dependency graph for the mempool
@@ -13,7 +14,7 @@ pub struct TxGraph {
entries: FxHashMap<Txid, MempoolEntry>,
/// Maps outpoint -> txid that created it (for finding parents)
outpoint_to_tx: FxHashMap<Outpoint, Txid>,
outpoint_to_tx: FxHashMap<MempoolOutpoint, Txid>,
/// Maps txid -> txids that spend its outputs (children)
children: FxHashMap<Txid, FxHashSet<Txid>>,
@@ -51,7 +52,7 @@ impl TxGraph {
// Register this tx's outputs
for (vout, _) in tx.output.iter().enumerate() {
let outpoint = Outpoint::new(entry.txid.clone(), vout as u32);
let outpoint = (entry.txid.clone(), Vout::from(vout as u32));
self.outpoint_to_tx.insert(outpoint, entry.txid.clone());
}
@@ -97,7 +98,7 @@ impl TxGraph {
}
/// Find which inputs reference in-mempool transactions (parents)
fn find_parents(&self, spends: &[Outpoint]) -> Vec<Txid> {
fn find_parents(&self, spends: &[MempoolOutpoint]) -> Vec<Txid> {
spends
.iter()
.filter_map(|outpoint| self.outpoint_to_tx.get(outpoint).cloned())

View File

@@ -1,3 +1,5 @@
use std::mem;
use brk_types::{FeeRate, RecommendedFees, Sats, Txid, VSize};
use rustc_hash::FxHashSet;
@@ -12,6 +14,9 @@ const BLOCK_VSIZE_TARGET: u64 = MAX_BLOCK_WEIGHT / 4;
/// Number of projected blocks to build
const NUM_PROJECTED_BLOCKS: usize = 8;
/// Minimum fee rate (no priority)
const MIN_FEE_RATE: f64 = 0.1;
/// A projected future block built from mempool transactions
#[derive(Debug, Clone, Default)]
pub struct ProjectedBlock {
@@ -43,7 +48,14 @@ impl ProjectedBlocks {
let mut sorted: Vec<_> = graph
.entries()
.iter()
.map(|(txid, entry)| (txid.clone(), entry.ancestor_fee_rate(), entry.vsize, entry.fee))
.map(|(txid, entry)| {
(
txid.clone(),
entry.ancestor_fee_rate(),
entry.vsize,
entry.fee,
)
})
.collect();
sorted.sort_by(|a, b| b.1.cmp(&a.1));
@@ -62,19 +74,14 @@ impl ProjectedBlocks {
// Would this tx fit in the current block?
let new_vsize = current_block.total_vsize + vsize;
if u64::from(new_vsize) > BLOCK_VSIZE_TARGET {
// Finalize current block if it has transactions
if !current_block.txids.is_empty() {
Self::finalize_block(&mut current_block);
blocks.push(current_block);
if u64::from(new_vsize) > BLOCK_VSIZE_TARGET && !current_block.txids.is_empty() {
// Finalize and store current block
Self::finalize_block(&mut current_block);
blocks.push(mem::take(&mut current_block));
if blocks.len() >= NUM_PROJECTED_BLOCKS {
break;
}
if blocks.len() >= NUM_PROJECTED_BLOCKS {
break;
}
// Start new block
current_block = ProjectedBlock::default();
}
// Add to current block
@@ -103,20 +110,19 @@ impl ProjectedBlocks {
pub fn recommended_fees(&self) -> RecommendedFees {
RecommendedFees {
fastest_fee: self.fee_for_block(0),
half_hour_fee: self.fee_for_block(2), // ~3 blocks
hour_fee: self.fee_for_block(5), // ~6 blocks
economy_fee: self.fee_for_block(7), // ~12 blocks, but we only have 8
minimum_fee: 1.0,
half_hour_fee: self.fee_for_block(2), // ~3 blocks
hour_fee: self.fee_for_block(5), // ~6 blocks
economy_fee: self.fee_for_block(7), // ~8 blocks
minimum_fee: FeeRate::from(MIN_FEE_RATE),
}
}
/// Get the minimum fee rate needed to get into block N
fn fee_for_block(&self, block_index: usize) -> f64 {
fn fee_for_block(&self, block_index: usize) -> FeeRate {
self.blocks
.get(block_index)
.map(|b| f64::from(b.min_fee_rate))
.unwrap_or(1.0)
.max(1.0) // Never recommend below 1 sat/vB
.map(|b| b.min_fee_rate)
.unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE))
}
fn finalize_block(block: &mut ProjectedBlock) {

View File

@@ -7,7 +7,7 @@ use brk_monitor::Mempool;
use brk_reader::Reader;
use brk_types::{
Address, AddressStats, BlockInfo, BlockStatus, Height, Index, IndexInfo, Limit, MempoolInfo,
Metric, MetricCount, Transaction, TreeNode, TxStatus, Txid, TxidPath, Utxo,
Metric, MetricCount, RecommendedFees, Transaction, TreeNode, TxStatus, Txid, TxidPath, Utxo,
};
use tokio::task::spawn_blocking;
@@ -98,13 +98,15 @@ impl AsyncQuery {
}
pub async fn get_mempool_info(&self) -> Result<MempoolInfo> {
let query = self.0.clone();
spawn_blocking(move || query.get_mempool_info()).await?
self.0.get_mempool_info()
}
pub async fn get_mempool_txids(&self) -> Result<Vec<Txid>> {
let query = self.0.clone();
spawn_blocking(move || query.get_mempool_txids()).await?
self.0.get_mempool_txids()
}
pub async fn get_recommended_fees(&self) -> Result<RecommendedFees> {
self.0.get_recommended_fees()
}
pub async fn match_metric(&self, metric: Metric, limit: Limit) -> Result<Vec<&'static str>> {

View File

@@ -0,0 +1,11 @@
use brk_error::{Error, Result};
use brk_types::RecommendedFees;
use crate::Query;
pub fn get_recommended_fees(query: &Query) -> Result<RecommendedFees> {
query
.mempool()
.map(|mempool| mempool.get_fees())
.ok_or(Error::MempoolNotAvailable)
}

View File

@@ -1,5 +1,7 @@
mod fees;
mod info;
mod txids;
pub use fees::*;
pub use info::*;
pub use txids::*;

View File

@@ -11,7 +11,7 @@ use brk_reader::Reader;
use brk_traversable::TreeNode;
use brk_types::{
Address, AddressStats, BlockInfo, BlockStatus, Format, Height, Index, IndexInfo, Limit,
MempoolInfo, Metric, MetricCount, Transaction, TxStatus, Txid, TxidPath, Utxo,
MempoolInfo, Metric, MetricCount, RecommendedFees, Transaction, TxStatus, Txid, TxidPath, Utxo,
};
use vecdb::{AnyExportableVec, AnyStoredVec};
@@ -35,8 +35,8 @@ use crate::{
chain::{
get_address, get_address_txids, get_address_utxos, get_block_by_height,
get_block_status_by_height, get_block_txids, get_blocks, get_height_by_hash,
get_mempool_info, get_mempool_txids, get_transaction, get_transaction_hex,
get_transaction_status,
get_mempool_info, get_mempool_txids, get_recommended_fees, get_transaction,
get_transaction_hex, get_transaction_status,
},
vecs::{IndexToVec, MetricToVec},
};
@@ -136,6 +136,10 @@ impl Query {
get_mempool_txids(self)
}
pub fn get_recommended_fees(&self) -> Result<RecommendedFees> {
get_recommended_fees(self)
}
pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&'static str> {
self.vecs().matches(metric, limit)
}

View File

@@ -5,7 +5,7 @@ use axum::{
response::{Redirect, Response},
routing::get,
};
use brk_types::{MempoolInfo, Txid};
use brk_types::{MempoolInfo, RecommendedFees, Txid};
use crate::{
VERSION,
@@ -62,5 +62,25 @@ impl MempoolRoutes for ApiRouter<AppState> {
},
),
)
.api_route(
"/api/v1/fees/recommended",
get_with(
async |headers: HeaderMap, State(state): State<AppState>| {
let etag = format!("{VERSION}-{}", state.get_height().await);
if headers.has_etag(&etag) {
return Response::new_not_modified();
}
state.get_recommended_fees().await.to_json_response(&etag)
},
|op| {
op.mempool_tag()
.summary("Recommended fees")
.description("Get recommended fee rates for different confirmation targets based on current mempool state.")
.ok_response::<RecommendedFees>()
.not_modified()
.server_error()
},
),
)
}
}

View File

@@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, mem};
use derive_deref::Deref;
use schemars::JsonSchema;
@@ -98,7 +98,7 @@ fn sanitize(dirty: impl Iterator<Item = String>) -> Vec<String> {
match c {
' ' | ',' | '+' => {
if !current.is_empty() {
clean.push(std::mem::take(&mut current));
clean.push(mem::take(&mut current));
}
}
'-' => current.push('_'),