mempool: snapshot

This commit is contained in:
nym21
2025-12-13 16:26:29 +01:00
parent 9c8b9b1a3b
commit c59ac62e45
22 changed files with 1808 additions and 582 deletions

Cargo.lock generated

@@ -737,12 +737,14 @@ name = "brk_monitor"
 version = "0.0.111"
 dependencies = [
  "brk_error",
+ "brk_logger",
  "brk_rpc",
  "brk_types",
  "derive_deref",
  "log",
  "parking_lot",
  "rustc-hash",
+ "smallvec",
 ]

 [[package]]


@@ -74,6 +74,7 @@ serde = "1.0.228"
 serde_bytes = "0.11.19"
 serde_derive = "1.0.228"
 serde_json = { version = "1.0.145", features = ["float_roundtrip"] }
+smallvec = "1.15.1"
 tokio = { version = "1.48.0", features = ["rt-multi-thread"] }
 vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] }
 # vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] }


@@ -28,7 +28,7 @@ pco = "0.4.7"
 rayon = { workspace = true }
 rustc-hash = { workspace = true }
 serde = { workspace = true }
-smallvec = "1.15.1"
+smallvec = { workspace = true }
 vecdb = { workspace = true }

 [dev-dependencies]


@@ -94,11 +94,11 @@ fn write(
     dash: impl Display,
     args: impl Display,
 ) -> Result<(), std::io::Error> {
-    // writeln!(buf, "{date_time} {dash} {level} {args}")
+    writeln!(buf, "{date_time} {dash} {level} {args}")
     // Don't remove, used to know the target of unwanted logs
-    writeln!(
-        buf,
-        "{} {} {} {} {}",
-        date_time, _target, level, dash, args
-    )
+    // writeln!(
+    //     buf,
+    //     "{} {} {} {} {}",
+    //     date_time, _target, level, dash, args
+    // )
 }


@@ -16,3 +16,7 @@ derive_deref = { workspace = true }
 log = { workspace = true }
 parking_lot = { workspace = true }
 rustc-hash = { workspace = true }
+smallvec = { workspace = true }
+
+[dev-dependencies]
+brk_logger = { workspace = true }


@@ -0,0 +1,455 @@
# Mempool Projected Blocks - Design Document
## Goal
Efficiently maintain projected blocks for fee estimation without rebuilding everything on each mempool change.
## Core Idea
Instead of rebuilding all projected blocks on every insert/remove:
1. Insert new tx/package into the correct block (binary search by fee rate)
2. Cascade overflow: if the block now exceeds the ~1 MvB vsize target, move its lowest fee rate item to the next block
3. On remove: cascade up to fill the gap
## Data Structures
### MempoolPackage
```rust
enum MempoolPackage {
    /// Tx with no unconfirmed parents - can be mined independently
    Independent(MempoolEntry),
    /// Tx(s) bundled for CPFP - ancestors + paying descendant
    Bundle {
        /// The descendant tx that "pays for" the ancestors
        child: Txid,
        /// All txs in topological order (ancestors first, child last)
        entries: Vec<MempoolEntry>,
        /// Sum of all fees
        total_fee: Sats,
        /// Sum of all vsizes
        total_vsize: VSize,
    },
    /// Tx waiting for unconfirmed parent(s) that are in a different Bundle.
    /// Not placed in projected blocks until dependencies clear.
    Pending {
        entry: MempoolEntry,
        /// Parent txids this tx is waiting for
        waiting_for: FxHashSet<Txid>,
    },
}
```
### ProjectedBlock
```rust
struct ProjectedBlock {
    /// Packages sorted by fee rate (highest first)
    packages: Vec<MempoolPackage>,
    /// Current total vsize
    total_vsize: VSize,
    /// Running fee total
    total_fee: Sats,
}
```
## Insert Algorithm
### Case 1: No unconfirmed parents
```
tx A (10 sat/vB), no unconfirmed inputs
```
→ Create `Independent(A)`
→ Binary search blocks by fee rate, insert, cascade overflow
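The "binary search by fee rate" step can be sketched as follows (a minimal sketch with hypothetical names: `block_floors` holds each block's cheapest package rate, in descending block order; the real structures above track `Sats`/`VSize` rather than `f64`):

```rust
/// Find the projected block a new Independent tx should be inserted into.
/// `block_floors[i]` is the fee rate of block i's cheapest package, so the
/// slice is sorted descending. The result may equal `block_floors.len()`,
/// meaning a new trailing block must be created; `cascade_down` then handles
/// any overflow the insert causes.
fn find_insert_block(block_floors: &[f64], tx_rate: f64) -> usize {
    // First block whose floor the new tx's rate beats.
    block_floors.partition_point(|&floor| floor >= tx_rate)
}
```

For example, with floors `[50.0, 20.0, 5.0]` a 25 sat/vB tx lands in block 1, while a 1 sat/vB tx returns index 3 (a new block).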
### Case 2: Has unconfirmed parent(s), CPFP beneficial
```
tx P (2 sat/vB) → tx C (50 sat/vB)
Package rate = (P.fee + C.fee) / (P.vsize + C.vsize) = 26 sat/vB
```
C's effective rate is always the package rate: C cannot be mined without P, so the 26 sat/vB package rate determines when C is mined, even though it is lower than C's own 50 sat/vB.
Create `Bundle { child: C, entries: [P, C], rate: 26 }`
If P was already `Independent(P)` in a block:
1. Remove P from its block
2. Create Bundle
3. Insert Bundle at rate 26 sat/vB
### Case 3: Chain of unconfirmed txs
```
tx A (2 sat/vB) → tx B (3 sat/vB) → tx C (100 sat/vB)
```
C's package = A + B + C
`Bundle { child: C, entries: [A, B, C] }`
If A and B were already placed:
1. Remove A, B from their positions
2. Create Bundle with all three
3. Insert at package fee rate
### Case 4: Diamond dependency
```
tx A (5 sat/vB)
↙ ↘
tx B (2) tx C (2)
↘ ↙
tx D (100 sat/vB)
```
D's ancestors = {A, B, C}
`Bundle { child: D, entries: [A, B, C, D] }` (topological order)
### Case 5: Multiple children competing for same parent
```
tx P (2 sat/vB)
├→ tx C1 (50 sat/vB) → package rate 26 sat/vB
└→ tx C2 (100 sat/vB) → package rate 51 sat/vB
```
P can only be in ONE package (can only be mined once).
Choose the highest package fee rate: P + C2 at 51 sat/vB
`Bundle { child: C2, entries: [P, C2] }`
What happens to C1?
- C1 cannot be mined until P is mined
- C1 becomes `Pending { entry: C1, waiting_for: {P} }`
- C1 is NOT in projected blocks
When Bundle(P, C2) is mined:
- P leaves mempool
- C1's waiting_for becomes empty
- C1 converts to `Independent(C1)` at 50 sat/vB
- C1 gets inserted into projected blocks
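Choosing between competing children reduces to comparing package rates, which can be done without division via cross-multiplication. A minimal sketch (hypothetical helper; fees in sats, sizes in vB as `(fee, vsize)` pairs):

```rust
/// Pick the child that yields the highest package rate with the parent;
/// returns its index in `children`. Comparing (pf+fa)/(pv+va) against
/// (pf+fb)/(pv+vb) by cross-multiplication avoids float division.
fn best_child(parent_fee: u64, parent_vsize: u64, children: &[(u64, u64)]) -> Option<usize> {
    children
        .iter()
        .enumerate()
        .max_by(|(_, a), (_, b)| {
            let lhs = (parent_fee + a.0) as u128 * (parent_vsize + b.1) as u128;
            let rhs = (parent_fee + b.0) as u128 * (parent_vsize + a.1) as u128;
            lhs.cmp(&rhs)
        })
        .map(|(i, _)| i)
}
```

With P paying 200 sats over 100 vB (2 sat/vB), C1 = (5_000, 100) and C2 = (10_000, 100), the packages score 26 and 51 sat/vB respectively, so C2 wins, matching the case above.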
### Case 6: New tx improves existing Bundle
```
Existing: Bundle { child: C, entries: [P, C], rate: 26 }
New tx D spends C's output, D has very high fee
New package rate (P + C + D) = 40 sat/vB
```
Since 40 > 26:
1. Remove old Bundle from its block
2. Create new `Bundle { child: D, entries: [P, C, D] }`
3. Insert at new rate
### Case 7: New tx doesn't improve, becomes Pending
```
Existing: Bundle { child: C, entries: [P, C], rate: 26 }
New tx D spends C's output, D has low fee
New package rate (P + C + D) = 20 sat/vB
```
Since 20 < 26, D doesn't help:
`Pending { entry: D, waiting_for: {C} }`
When Bundle(P, C) is mined:
→ D converts to `Independent(D)`
## Remove Algorithm
### Case 1: Remove an `Independent`
```
Remove Independent(A) from block 3
```
1. Remove A from block 3
2. Block 3 now has space
3. Pull highest fee rate package from block 4 into block 3
4. Cascade: pull from block 5 to block 4, etc.
5. Stop when a block has no underflow or no more blocks
### Case 2: Remove the "child" (paying tx) of a Bundle
```
Bundle { child: C, entries: [P, C] } in block 2
C gets dropped/RBF'd (NOT confirmed - if confirmed, P would be too)
```
1. Remove Bundle from block 2
2. P has no more CPFP boost
3. P becomes `Independent(P)` at its own rate (2 sat/vB)
4. Insert P into appropriate block (probably much later)
5. Cascade to fill block 2's gap
### Case 3: Remove an ancestor from a Bundle (confirmation)
```
Bundle { child: C, entries: [P, C], rate: 26 } in block 2
P gets confirmed (separate from C? unusual but possible in reorg)
```
1. Remove Bundle from block 2
2. C no longer needs P
3. C becomes `Independent(C)` at 50 sat/vB
4. Insert C into earlier block (higher rate now)
5. Cascade as needed
### Case 4: Remove tx that has Pending descendants
```
Independent(P) in block 5
Pending { entry: C, waiting_for: {P} }
P gets confirmed
```
1. Remove P from block 5
2. Find all Pending txs waiting for P
3. For each: remove P from waiting_for
4. If waiting_for is empty: convert to Independent, insert into blocks
5. Cascade to fill block 5's gap
### Case 5: Remove middle of a chain (tx dropped/invalid)
```
Bundle { child: D, entries: [A, B, C, D] }
B gets dropped (double-spend, RBF, etc.)
```
B invalid means C and D are invalid too (missing input):
1. Remove entire Bundle
2. A becomes `Independent(A)` if still valid
3. C, D removed from mempool entirely
### Case 6: RBF replacement
```
Independent(A) in block 3
New tx A' replaces A (same inputs, higher fee)
```
1. Remove A (and any descendants - they're now invalid)
2. Insert A' as new Independent or Bundle
## Pending → Active Transitions
When a tx is removed (confirmed), check all `Pending` entries:
```rust
fn on_tx_removed(&mut self, txid: Txid) {
    let pending_to_update: Vec<_> = self
        .pending
        .iter()
        .filter(|(_, p)| p.waiting_for.contains(&txid))
        .map(|(id, _)| *id)
        .collect();
    for pending_txid in pending_to_update {
        let pending = self.pending.get_mut(&pending_txid).unwrap();
        pending.waiting_for.remove(&txid);
        if pending.waiting_for.is_empty() {
            // Convert to Independent and insert into projected blocks
            let pending = self.pending.remove(&pending_txid).unwrap();
            self.insert_independent(pending.entry);
        }
    }
}
```
## Cascade Algorithm
### Cascade Down (after insert causes overflow)
```rust
fn cascade_down(&mut self, starting_block: usize) {
    let mut block_idx = starting_block;
    while block_idx < self.blocks.len() {
        let block = &mut self.blocks[block_idx];
        if block.total_vsize <= BLOCK_VSIZE_TARGET {
            break; // No overflow
        }
        // Pop the lowest fee rate package; record its size/fee before moving it
        let overflow = block.packages.pop().unwrap();
        let (vsize, fee) = (overflow.vsize(), overflow.fee());
        block.total_vsize -= vsize;
        block.total_fee -= fee;
        // Push to next block, creating it if needed
        if block_idx + 1 >= self.blocks.len() {
            self.blocks.push(ProjectedBlock::new());
        }
        let next_block = &mut self.blocks[block_idx + 1];
        // Insert at the front (it's the highest fee rate in that block)
        next_block.packages.insert(0, overflow);
        next_block.total_vsize += vsize;
        next_block.total_fee += fee;
        block_idx += 1;
    }
}
```
### Cascade Up (after remove causes underflow)
```rust
fn cascade_up(&mut self, starting_block: usize) {
    let mut block_idx = starting_block;
    while block_idx < self.blocks.len() {
        if self.blocks[block_idx].total_vsize >= BLOCK_VSIZE_TARGET {
            break; // Block is full enough
        }
        if block_idx + 1 >= self.blocks.len() {
            break; // No more blocks to pull from
        }
        if self.blocks[block_idx + 1].packages.is_empty() {
            // Remove empty block
            self.blocks.remove(block_idx + 1);
            break;
        }
        // Pull the highest fee rate package from the next block;
        // record its size/fee before moving it
        let next_block = &mut self.blocks[block_idx + 1];
        let pulled = next_block.packages.remove(0);
        let (vsize, fee) = (pulled.vsize(), pulled.fee());
        next_block.total_vsize -= vsize;
        next_block.total_fee -= fee;
        // Add to current block
        let current_block = &mut self.blocks[block_idx];
        current_block.packages.push(pulled);
        current_block.total_vsize += vsize;
        current_block.total_fee += fee;
        block_idx += 1;
    }
    // Clean up empty trailing blocks
    while self.blocks.last().is_some_and(|b| b.packages.is_empty()) {
        self.blocks.pop();
    }
}
```
## Data Structure for Efficient Operations
Need to track:
1. `txid → MempoolPackage` - which package contains a tx
2. `txid → block_index` - which block a tx/package is in
3. `txid → Vec<Txid>` - descendants waiting for this tx (Pending)
4. Per-block: sorted packages by fee rate
```rust
struct MempoolState {
    /// All packages (Independent, Bundle, or Pending)
    packages: FxHashMap<Txid, MempoolPackage>,
    /// For Bundle: maps each txid in the bundle to the child txid (package key)
    tx_to_package: FxHashMap<Txid, Txid>,
    /// Maps package key (txid) to block index, None if Pending
    package_to_block: FxHashMap<Txid, Option<usize>>,
    /// Maps a txid to the set of Pending txids waiting for it
    waiting_on: FxHashMap<Txid, FxHashSet<Txid>>,
    /// The projected blocks
    blocks: Vec<ProjectedBlock>,
}
```
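The lookup path these maps imply can be sketched as a free function (a sketch only: std `HashMap` and a `u64` txid stand-in replace `FxHashMap`/`Txid`, and `block_of` is a hypothetical name):

```rust
use std::collections::HashMap;

type TxKey = u64; // stand-in for Txid in this sketch

/// Resolve a txid to its projected block: a tx inside a Bundle first resolves
/// to its package key (the child's txid), then the package resolves to a
/// block. `None` means the tx is Pending and not in any projected block.
fn block_of(
    tx_to_package: &HashMap<TxKey, TxKey>,
    package_to_block: &HashMap<TxKey, Option<usize>>,
    txid: TxKey,
) -> Option<usize> {
    let key = tx_to_package.get(&txid).copied().unwrap_or(txid);
    package_to_block.get(&key).copied().flatten()
}
```

Both members of a bundle resolve to the same block, while a Pending entry maps to `None`.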
## Open Questions
1. **Block fullness threshold**: Should we cascade when exactly at 1MB, or allow slight overflow?
2. **Minimum fee rate**: Should packages below the minimum relay fee be excluded?
3. **Maximum ancestors**: Bitcoin Core limits ancestor count (25). Should we?
4. **Memory bounds**: For huge mempools, should we limit projected blocks count?
## mempool.space Implementation Analysis
Source: [mempool/mempool rust/gbt/src](https://github.com/mempool/mempool/tree/master/rust/gbt/src)
### Key Files
- `gbt.rs` - Main block building algorithm
- `audit_transaction.rs` - Transaction wrapper with ancestor tracking
- `thread_transaction.rs` - Lightweight tx representation
### Their Approach
**Not incremental!** They rebuild from scratch but optimize heavily:
1. **Use numeric UIDs** instead of 32-byte txids - massive memory/hash savings
2. **Ancestor score** = (fee + ancestor_fees) / (weight + ancestor_weights)
3. **Two-source selection**:
- `mempool_stack`: Original sorted order
- `modified` priority queue: Txs whose scores changed due to parent selection
4. **Greedy selection loop**:
- Pick highest ancestor score from either source
- Include all ancestors first
- Update all descendants' scores (via `update_descendants()`)
- Move affected descendants to `modified` queue
### Why They Don't Do Incremental
The `update_descendants()` cascade is the key insight: when you select a tx, ALL its descendants need score recalculation because their ancestor set changed. This cascade can touch a huge portion of the mempool.
Their solution: rebuild fast rather than update incrementally.
- Use u32 UIDs (4 bytes vs 32 bytes)
- Pre-allocate with capacity 1,048,576
- Custom u32-based hasher
- Run in separate thread via `spawn_blocking()`
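The "custom u32-based hasher" idea can be illustrated like this (a sketch of the general technique, not their code: a u32 UID is already a usable hash value, so the hasher can be the identity function):

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// Identity hasher for u32 keys: skips byte-by-byte hashing entirely.
#[derive(Default)]
struct UidHasher(u64);

impl Hasher for UidHasher {
    fn finish(&self) -> u64 {
        self.0
    }
    fn write(&mut self, _bytes: &[u8]) {
        unreachable!("UidHasher only supports u32 keys");
    }
    // u32's Hash impl routes here, so `write` above is never called.
    fn write_u32(&mut self, n: u32) {
        self.0 = u64::from(n);
    }
}

type UidMap<V> = HashMap<u32, V, BuildHasherDefault<UidHasher>>;
```

`UidMap::default()` then behaves like a normal `HashMap` but pays no SipHash cost per lookup, which matters when the map is hit millions of times per rebuild.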
### Should We Follow Their Approach?
**Pros of their approach:**
- Simpler code, fewer edge cases
- Proven correct (Bitcoin Core algorithm port)
- Fast enough with optimizations (sub-100ms for full rebuild)
**Pros of incremental:**
- Lower latency for small changes
- Less CPU spike on each update
- Better for very high tx throughput
**Recommendation:** Start with their approach (rebuild with optimizations), measure performance. Only add incremental if needed.
### Simplified Algorithm (from mempool.space)
```
1. Build audit_pool: Map<uid, AuditTransaction>
2. set_relatives(): compute ancestors for each tx
3. Sort by ancestor_score descending
4. while mempool not empty:
a. Pick best from (stack, modified_queue)
b. If ancestors not yet selected, select them first
c. Add to current block
d. If block full, start new block
e. update_descendants(): recalc scores, add to modified_queue
5. Return blocks
```
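Step 4a (picking the best of the two sources) can be sketched as follows. This is a simplified illustration with hypothetical names: entries are `(rate, uid)` pairs with a fixed-point rate, whereas the real implementation compares ancestor fee/vsize pairs by cross-multiplication:

```rust
use std::collections::BinaryHeap;

/// Take the better head of the two sources: the pre-sorted stack (descending
/// by ancestor score) or the modified priority queue (txs rescored after a
/// parent was selected). Entries are (rate_millisats_per_vb, uid).
fn pick_best(
    stack: &mut std::iter::Peekable<std::vec::IntoIter<(u64, u32)>>,
    modified: &mut BinaryHeap<(u64, u32)>,
) -> Option<(u64, u32)> {
    match (stack.peek().copied(), modified.peek().copied()) {
        (Some(s), Some(m)) => {
            if m.0 > s.0 {
                modified.pop()
            } else {
                stack.next()
            }
        }
        (Some(_), None) => stack.next(),
        (None, Some(_)) => modified.pop(),
        (None, None) => None,
    }
}
```

Because both sources are consumed lazily, the loop always sees the current best candidate even as `update_descendants()` keeps pushing rescored txs into the modified queue.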
### Key Optimization: Ancestor Score
```rust
ancestor_score = (tx.fee + sum(ancestor.fee)) / (tx.weight + sum(ancestor.weight))
```
This single metric captures CPFP naturally - a high-fee child boosts its low-fee parents by being evaluated as a package.
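Applied to the Case 3 chain, with assumed sizes of 100 vB per tx (so A pays 200 sats at 2 sat/vB, B 300 sats, C 10,000 sats):

```rust
/// Ancestor score of a package: total fee over total vsize.
fn ancestor_score(fees_sats: &[u64], vsizes: &[u64]) -> f64 {
    let fee: u64 = fees_sats.iter().sum();
    let vsize: u64 = vsizes.iter().sum();
    fee as f64 / vsize as f64
}
```

`ancestor_score(&[200, 300, 10_000], &[100, 100, 100])` is 35.0, so the whole A → B → C chain is evaluated at 35 sat/vB, far above A's and B's own rates.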
## Revised Implementation Plan
Given mempool.space's approach works well, simplify our design:
### Phase 1: Fast Rebuild (MVP)
- Use u32 UIDs for txs
- Compute ancestor scores
- Greedy selection with modified queue
- Rebuild on each change (with spawn_blocking)
### Phase 2: Incremental (If Needed)
- Track which txs changed
- Only rebuild affected portions
- Cascade updates through dependency graph
### Phase 3: Further Optimizations
- Batch updates (coalesce rapid changes)
- Dirty flag + lazy rebuild
- Background thread continuous updates
## References
- Bitcoin Core's block assembly: `src/node/miner.cpp`
- [mempool.space Rust GBT](https://github.com/mempool/mempool/tree/master/rust/gbt/src)
- [mempool-blocks.ts](https://github.com/mempool/mempool/blob/master/backend/src/api/mempool-blocks.ts)


@@ -5,10 +5,9 @@ use brk_monitor::Mempool;
 use brk_rpc::{Auth, Client};
 fn main() -> Result<()> {
-    // Connect to Bitcoin Core
-    let bitcoin_dir = Client::default_bitcoin_path();
-    // let bitcoin_dir = Path::new("/Volumes/WD_BLACK/bitcoin");
+    brk_logger::init(None)?;
+    let bitcoin_dir = Client::default_bitcoin_path();
     let client = Client::new(
         Client::default_url(),
         Auth::CookieFile(bitcoin_dir.join(".cookie")),
@@ -16,19 +15,58 @@ fn main() -> Result<()> {
     let mempool = Mempool::new(&client);
-    // Start mempool sync in background thread
     let mempool_clone = mempool.clone();
     thread::spawn(move || {
         mempool_clone.start();
     });
-    // Access from main thread
+    // Poll and display stats every 5 seconds
     loop {
         thread::sleep(Duration::from_secs(5));
-        let txs = mempool.get_txs();
-        println!("mempool_tx_count: {}", txs.len());
-        let addresses = mempool.get_addresses();
-        println!("mempool_address_count: {}", addresses.len());
-    }
-    // Ok(())
+        // Basic mempool info
+        let info = mempool.get_info();
+        let block_stats = mempool.get_block_stats();
+        let total_fees: u64 = block_stats.iter().map(|s| u64::from(s.total_fee)).sum();
+        println!("\n=== Mempool Info ===");
+        println!("  Transactions: {}", info.count);
+        println!("  Total vsize: {} vB", info.vsize);
+        println!(
+            "  Total fees: {:.4} BTC",
+            total_fees as f64 / 100_000_000.0
+        );
+        // Fee recommendations (like mempool.space)
+        let fees = mempool.get_fees();
+        println!("\n=== Recommended Fees (sat/vB) ===");
+        println!("  No Priority     {:.4}", f64::from(fees.economy_fee));
+        println!("  Low Priority    {:.4}", f64::from(fees.hour_fee));
+        println!("  Medium Priority {:.4}", f64::from(fees.half_hour_fee));
+        println!("  High Priority   {:.4}", f64::from(fees.fastest_fee));
+        // Projected blocks (like mempool.space)
+        if !block_stats.is_empty() {
+            println!("\n=== Projected Blocks ===");
+            for (i, stats) in block_stats.iter().enumerate() {
+                let total_fee_btc = u64::from(stats.total_fee) as f64 / 100_000_000.0;
+                println!(
+                    "  Block {}: ~{:.4} sat/vB, {:.4}-{:.4} sat/vB, {:.3} BTC, {} txs",
+                    i + 1,
+                    f64::from(stats.median_fee_rate()),
+                    f64::from(stats.min_fee_rate()),
+                    f64::from(stats.max_fee_rate()),
+                    total_fee_btc,
+                    stats.tx_count,
+                );
+            }
+        }
+        // Address tracking stats
+        let addresses = mempool.get_addresses();
+        println!("\n=== Address Tracking ===");
+        println!("  Addresses with pending txs: {}", addresses.len());
+        println!("\n----------------------------------------");
+    }
 }


@@ -1,234 +1,10 @@
-use std::{sync::Arc, thread, time::Duration};
-use brk_error::Result;
-use brk_rpc::Client;
-use brk_types::{
-    AddressBytes, AddressMempoolStats, MempoolInfo, RecommendedFees, TxWithHex, Txid,
-};
-use derive_deref::Deref;
-use log::error;
-use parking_lot::{RwLock, RwLockReadGuard};
-use rustc_hash::{FxHashMap, FxHashSet};
+//! Bitcoin mempool monitor.
+//!
+//! Provides real-time mempool tracking with:
+//! - Fee estimation via projected blocks
+//! - Address mempool stats
+//! - CPFP-aware block building
 mod mempool;
-use mempool::{ProjectedBlocks, TxGraph};
+pub use mempool::{BlockStats, Mempool, MempoolInner, ProjectedSnapshot};
const MAX_FETCHES_PER_CYCLE: usize = 10_000;
///
/// Mempool monitor
///
/// Thread safe and free to clone
///
#[derive(Clone, Deref)]
pub struct Mempool(Arc<MempoolInner>);
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(MempoolInner::new(client.clone())))
}
}
pub struct MempoolInner {
client: Client,
info: RwLock<MempoolInfo>,
fees: RwLock<RecommendedFees>,
graph: RwLock<TxGraph>,
projected_blocks: RwLock<ProjectedBlocks>,
txs: RwLock<FxHashMap<Txid, TxWithHex>>,
addresses: RwLock<FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>>,
}
impl MempoolInner {
pub fn new(client: Client) -> Self {
Self {
client,
info: RwLock::new(MempoolInfo::default()),
fees: RwLock::new(RecommendedFees::default()),
graph: RwLock::new(TxGraph::new()),
projected_blocks: RwLock::new(ProjectedBlocks::default()),
txs: RwLock::new(FxHashMap::default()),
addresses: RwLock::new(FxHashMap::default()),
}
}
pub fn get_info(&self) -> MempoolInfo {
self.info.read().clone()
}
pub fn get_fees(&self) -> RecommendedFees {
self.fees.read().clone()
}
pub fn get_projected_blocks(&self) -> ProjectedBlocks {
self.projected_blocks.read().clone()
}
pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap<Txid, TxWithHex>> {
self.txs.read()
}
pub fn get_addresses(
&self,
) -> RwLockReadGuard<'_, FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>> {
self.addresses.read()
}
/// Start an infinite update loop with a 1 second interval
pub fn start(&self) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
thread::sleep(Duration::from_secs(1));
}
}
pub fn update(&self) -> Result<()> {
let current_txids = self
.client
.get_raw_mempool()?
.into_iter()
.collect::<FxHashSet<_>>();
let new_txs = self.fetch_new_txs(&current_txids);
let has_changes = self.apply_changes(&current_txids, &new_txs);
if has_changes {
self.rebuild_projected_blocks();
}
Ok(())
}
/// Fetch transactions that are new to our mempool
fn fetch_new_txs(&self, current_txids: &FxHashSet<Txid>) -> FxHashMap<Txid, TxWithHex> {
let txs = self.txs.read();
current_txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_FETCHES_PER_CYCLE)
.cloned()
.collect::<Vec<_>>()
.into_iter()
.filter_map(|txid| {
self.client
.get_mempool_transaction(&txid)
.ok()
.map(|tx| (txid, tx))
})
.collect()
}
/// Apply transaction additions and removals, returns true if there were changes
fn apply_changes(
&self,
current_txids: &FxHashSet<Txid>,
new_txs: &FxHashMap<Txid, TxWithHex>,
) -> bool {
let mut info = self.info.write();
let mut graph = self.graph.write();
let mut txs = self.txs.write();
let mut addresses = self.addresses.write();
let mut had_removals = false;
let had_additions = !new_txs.is_empty();
// Remove transactions no longer in mempool
txs.retain(|txid, tx_with_hex| {
if current_txids.contains(txid) {
return true;
}
had_removals = true;
let tx = tx_with_hex.tx();
info.remove(tx);
graph.remove(txid);
Self::update_address_stats_on_removal(tx, txid, &mut addresses);
false
});
// Add new transactions
for (txid, tx_with_hex) in new_txs {
let tx = tx_with_hex.tx();
info.add(tx);
graph.insert(tx);
Self::update_address_stats_on_addition(tx, txid, &mut addresses);
}
txs.extend(new_txs.clone());
had_removals || had_additions
}
/// Rebuild projected blocks and update recommended fees
fn rebuild_projected_blocks(&self) {
let graph = self.graph.read();
let projected = ProjectedBlocks::build(&graph);
let fees = projected.recommended_fees();
*self.projected_blocks.write() = projected;
*self.fees.write() = fees;
}
fn update_address_stats_on_removal(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: undo "sending" state
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.sent(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: undo "receiving" state
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.received(txout);
stats.update_tx_count(set.len() as u32);
});
}
fn update_address_stats_on_addition(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: mark as "sending"
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.sending(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: mark as "receiving"
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.receiving(txout);
stats.update_tx_count(set.len() as u32);
});
}
}


@@ -0,0 +1,164 @@
use std::ops::{Index, IndexMut};
use brk_types::{Sats, VSize};
use smallvec::SmallVec;
use crate::mempool::{MempoolTxIndex, PoolIndex};
/// Compare ancestor fee rates using cross-multiplication (avoids f64 division).
/// Returns true if (fee_a / vsize_a) > (fee_b / vsize_b).
#[inline]
fn has_higher_fee_rate(fee_a: Sats, vsize_a: VSize, fee_b: Sats, vsize_b: VSize) -> bool {
// Cross multiply: fee_a/vsize_a > fee_b/vsize_b ⟺ fee_a * vsize_b > fee_b * vsize_a
let score_a = u64::from(fee_a) as u128 * u64::from(vsize_b) as u128;
let score_b = u64::from(fee_b) as u128 * u64::from(vsize_a) as u128;
score_a > score_b
}
/// Type-safe wrapper around Vec<AuditTx> that only allows PoolIndex access.
pub struct Pool(Vec<AuditTx>);
impl Pool {
pub fn new(txs: Vec<AuditTx>) -> Self {
Self(txs)
}
#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
}
impl Index<PoolIndex> for Pool {
type Output = AuditTx;
#[inline]
fn index(&self, idx: PoolIndex) -> &Self::Output {
&self.0[idx.as_usize()]
}
}
impl IndexMut<PoolIndex> for Pool {
#[inline]
fn index_mut(&mut self, idx: PoolIndex) -> &mut Self::Output {
&mut self.0[idx.as_usize()]
}
}
/// Lightweight transaction for block building.
/// Created fresh each rebuild, discarded after.
pub struct AuditTx {
/// Original entries index (for final output)
pub entries_idx: MempoolTxIndex,
/// Pool index (for internal graph traversal)
pub pool_idx: PoolIndex,
pub fee: Sats,
pub vsize: VSize,
/// In-mempool parent pool indices
pub parents: SmallVec<[PoolIndex; 4]>,
/// In-mempool child pool indices
pub children: SmallVec<[PoolIndex; 8]>,
/// Cumulative fee (self + all ancestors)
pub ancestor_fee: Sats,
/// Cumulative vsize (self + all ancestors)
pub ancestor_vsize: VSize,
/// Already selected into a block
pub used: bool,
/// Already in modified priority queue
pub in_modified: bool,
}
impl AuditTx {
/// Create AuditTx with pre-computed ancestor values from Bitcoin Core.
pub fn new_with_ancestors(
entries_idx: MempoolTxIndex,
pool_idx: PoolIndex,
fee: Sats,
vsize: VSize,
ancestor_fee: Sats,
ancestor_vsize: VSize,
) -> Self {
Self {
entries_idx,
pool_idx,
fee,
vsize,
parents: SmallVec::new(),
children: SmallVec::new(),
ancestor_fee,
ancestor_vsize,
used: false,
in_modified: false,
}
}
#[inline]
pub fn has_higher_score_than(&self, other: &Self) -> bool {
has_higher_fee_rate(
self.ancestor_fee,
self.ancestor_vsize,
other.ancestor_fee,
other.ancestor_vsize,
)
}
}
/// Priority queue entry for the modified queue.
/// Stores a snapshot of ancestor values at time of insertion.
#[derive(Clone, Copy)]
pub struct TxPriority {
/// Pool index (for indexing into pool array)
pub pool_idx: PoolIndex,
/// Snapshot of ancestor fee at insertion time
pub ancestor_fee: Sats,
/// Snapshot of ancestor vsize at insertion time
pub ancestor_vsize: VSize,
}
impl TxPriority {
pub fn new(tx: &AuditTx) -> Self {
Self {
pool_idx: tx.pool_idx,
ancestor_fee: tx.ancestor_fee,
ancestor_vsize: tx.ancestor_vsize,
}
}
#[inline]
pub fn has_higher_score_than(&self, other: &Self) -> bool {
has_higher_fee_rate(
self.ancestor_fee,
self.ancestor_vsize,
other.ancestor_fee,
other.ancestor_vsize,
)
}
}
impl PartialEq for TxPriority {
fn eq(&self, other: &Self) -> bool {
self.pool_idx == other.pool_idx
}
}
impl Eq for TxPriority {}
impl PartialOrd for TxPriority {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for TxPriority {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Higher score = higher priority (for max-heap)
if self.has_higher_score_than(other) {
std::cmp::Ordering::Greater
} else if other.has_higher_score_than(self) {
std::cmp::Ordering::Less
} else {
// Tiebreaker: lower index first (deterministic)
other.pool_idx.cmp(&self.pool_idx)
}
}
}


@@ -0,0 +1,118 @@
use brk_types::TxidPrefix;
use rustc_hash::FxHashMap;
use super::audit::{AuditTx, Pool};
use super::selection::select_into_blocks;
use crate::mempool::{MempoolEntry, MempoolTxIndex, PoolIndex, SelectedTx};
/// Number of projected blocks to build
const NUM_PROJECTED_BLOCKS: usize = 8;
/// Estimated txs per block (for partial sort optimization)
const TXS_PER_BLOCK: usize = 4000;
/// Build projected blocks from mempool entries.
///
/// Returns SelectedTx (with effective fee rate) grouped by block, in mining priority order.
pub fn build_projected_blocks(entries: &[Option<MempoolEntry>]) -> Vec<Vec<SelectedTx>> {
// Collect live entries
let live: Vec<(MempoolTxIndex, &MempoolEntry)> = entries
.iter()
.enumerate()
.filter_map(|(i, opt)| opt.as_ref().map(|e| (MempoolTxIndex::from(i), e)))
.collect();
if live.is_empty() {
return Vec::new();
}
// Build AuditTx pool with pre-computed ancestor values from Bitcoin Core
let mut pool = Pool::new(build_audit_pool(&live));
// Sort by ancestor score (partial sort for efficiency)
let sorted = partial_sort_by_score(&pool);
// Run selection algorithm
select_into_blocks(&mut pool, sorted, NUM_PROJECTED_BLOCKS)
}
/// Build the AuditTx pool with parent/child relationships.
/// AuditTx.parents and .children store pool indices (for graph traversal).
/// AuditTx.entries_idx stores the original entries index (for final output).
/// Uses Bitcoin Core's pre-computed ancestor values (correct, no double-counting).
fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec<AuditTx> {
// Create mapping from TxidPrefix to pool index
let prefix_to_pool_idx: FxHashMap<TxidPrefix, PoolIndex> = live
.iter()
.enumerate()
.map(|(pool_idx, (_, entry))| (entry.txid_prefix(), PoolIndex::from(pool_idx)))
.collect();
// Build pool with parent relationships
// Use Bitcoin Core's pre-computed ancestor_fee and ancestor_vsize
let mut pool: Vec<AuditTx> = live
.iter()
.enumerate()
.map(|(pool_idx, (entries_idx, entry))| {
let pool_idx = PoolIndex::from(pool_idx);
let mut tx = AuditTx::new_with_ancestors(
*entries_idx,
pool_idx,
entry.fee,
entry.vsize,
entry.ancestor_fee,
entry.ancestor_vsize,
);
// Find in-mempool parents from depends list (provided by Bitcoin Core)
for parent_prefix in &entry.depends {
if let Some(&parent_pool_idx) = prefix_to_pool_idx.get(parent_prefix) {
tx.parents.push(parent_pool_idx);
}
}
tx
})
.collect();
// Build child relationships (reverse of parents)
for pool_idx in 0..pool.len() {
let parents = pool[pool_idx].parents.clone();
for parent_pool_idx in parents {
pool[parent_pool_idx.as_usize()].children.push(PoolIndex::from(pool_idx));
}
}
pool
}
/// Partial sort: only fully sort the top N txs needed for blocks.
/// Returns pool indices sorted by ancestor score.
fn partial_sort_by_score(pool: &Pool) -> Vec<PoolIndex> {
let mut indices: Vec<PoolIndex> = (0..pool.len()).map(PoolIndex::from).collect();
let needed = NUM_PROJECTED_BLOCKS * TXS_PER_BLOCK;
// Comparator: descending by score, then ascending by index (deterministic tiebreaker)
let cmp = |a: &PoolIndex, b: &PoolIndex| -> std::cmp::Ordering {
let tx_a = &pool[*a];
let tx_b = &pool[*b];
if tx_b.has_higher_score_than(tx_a) {
std::cmp::Ordering::Greater
} else if tx_a.has_higher_score_than(tx_b) {
std::cmp::Ordering::Less
} else {
a.cmp(b)
}
};
if indices.len() > needed {
// Partition: move top `needed` to front (unordered), then sort just those
indices.select_nth_unstable_by(needed, cmp);
indices[..needed].sort_unstable_by(cmp);
indices.truncate(needed);
} else {
indices.sort_unstable_by(cmp);
}
indices
}


@@ -0,0 +1,5 @@
mod audit;
mod build;
mod selection;
pub use build::build_projected_blocks;


@@ -0,0 +1,278 @@
use std::collections::BinaryHeap;
use brk_types::FeeRate;
use super::audit::{Pool, TxPriority};
use crate::mempool::{PoolIndex, SelectedTx};
/// Target vsize per block (~1MB, derived from 4MW weight limit)
const BLOCK_VSIZE_LIMIT: u64 = 1_000_000;
/// Select transactions into blocks using the two-source algorithm.
///
/// Takes pool indices (sorted by score), returns SelectedTx with effective fee rate at selection time.
pub fn select_into_blocks(
pool: &mut Pool,
sorted_pool_indices: Vec<PoolIndex>,
num_blocks: usize,
) -> Vec<Vec<SelectedTx>> {
let mut blocks: Vec<Vec<SelectedTx>> = Vec::with_capacity(num_blocks);
let mut current_block: Vec<SelectedTx> = Vec::new();
let mut current_vsize: u64 = 0;
let mut sorted_iter = sorted_pool_indices.into_iter().peekable();
let mut modified_queue: BinaryHeap<TxPriority> = BinaryHeap::new();
'outer: loop {
// Pick best candidate from either sorted list or modified queue
let best_pool_idx = pick_best_candidate(pool, &mut sorted_iter, &mut modified_queue);
let Some(pool_idx) = best_pool_idx else {
break;
};
// Skip if already used
if pool[pool_idx].used {
continue;
}
// Capture the package rate BEFORE selecting ancestors
// This is the rate that justified this tx (and its ancestors) for inclusion
let package_rate = {
let tx = &pool[pool_idx];
FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize))
};
// Select this tx and all its unselected ancestors
let selected = select_with_ancestors(pool, pool_idx);
for sel_pool_idx in selected {
let tx = &pool[sel_pool_idx];
let tx_vsize = u64::from(tx.vsize);
// Check if tx fits in current block
if current_vsize + tx_vsize > BLOCK_VSIZE_LIMIT && !current_block.is_empty() {
blocks.push(std::mem::take(&mut current_block));
current_vsize = 0;
if blocks.len() >= num_blocks {
// Early exit - we have enough blocks
break 'outer;
}
}
// Effective fee rate = the package rate at selection time.
// This is the mining score that determined which block this tx lands in.
// For CPFP, both parent and child get the same package rate (the child's score).
current_block.push(SelectedTx {
entries_idx: tx.entries_idx,
effective_fee_rate: package_rate,
});
current_vsize += tx_vsize;
// Update descendants' ancestor scores
update_descendants(pool, sel_pool_idx, &mut modified_queue);
}
}
// Don't forget the last block
if !current_block.is_empty() && blocks.len() < num_blocks {
blocks.push(current_block);
}
// Post-process: fix fee rate ordering violations between adjacent blocks.
// This handles cases where a tx's score improved after its target block was full.
fix_block_ordering(&mut blocks);
// Log how many txs were left unselected
let total_selected: usize = blocks.iter().map(|b| b.len()).sum();
log::debug!(
"Selected {} txs into {} blocks, modified_queue has {} remaining",
total_selected,
blocks.len(),
modified_queue.len()
);
blocks
}
/// Pick the best candidate from sorted list or modified queue.
/// Returns a pool index.
fn pick_best_candidate(
pool: &Pool,
sorted_iter: &mut std::iter::Peekable<std::vec::IntoIter<PoolIndex>>,
modified_queue: &mut BinaryHeap<TxPriority>,
) -> Option<PoolIndex> {
// Skip used txs in sorted iterator
while sorted_iter.peek().is_some_and(|&idx| pool[idx].used) {
sorted_iter.next();
}
// Skip used txs and stale entries in modified queue.
// A tx can be pushed multiple times as its score improves (when different ancestors are selected).
// For example: tx C depends on A and B. When A is selected, C is pushed with score 2.0.
// When B is selected, C is pushed again with score 4.0. The queue now has two entries for C.
// We skip the stale 2.0 entry and use the current 4.0 entry.
while let Some(p) = modified_queue.peek() {
let tx = &pool[p.pool_idx];
if tx.used {
modified_queue.pop();
continue;
}
// Check if this queue entry has outdated snapshot (a newer entry exists with better score)
if p.ancestor_fee != tx.ancestor_fee || p.ancestor_vsize != tx.ancestor_vsize {
modified_queue.pop();
continue;
}
break;
}
let sorted_best = sorted_iter.peek().map(|&idx| &pool[idx]);
let modified_best = modified_queue.peek().map(|p| &pool[p.pool_idx]);
match (sorted_best, modified_best) {
(None, None) => None,
(Some(_), None) => sorted_iter.next(),
(None, Some(_)) => {
let p = modified_queue.pop().unwrap();
Some(p.pool_idx)
}
(Some(sorted_tx), Some(modified_tx)) => {
// Compare CURRENT scores from pool (not stale snapshots)
if sorted_tx.has_higher_score_than(modified_tx) {
sorted_iter.next()
} else {
let p = modified_queue.pop().unwrap();
Some(p.pool_idx)
}
}
}
}
/// Select a tx and all its unselected ancestors (topological order).
/// Takes and returns pool indices.
fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec<PoolIndex> {
let mut to_select: Vec<PoolIndex> = Vec::new();
let mut stack = vec![pool_idx];
// DFS to find all unselected ancestors
while let Some(current) = stack.pop() {
let tx = &pool[current];
if tx.used {
continue;
}
// Push unselected parents onto stack (process parents first)
let has_unselected_parents = tx.parents.iter().any(|&p| !pool[p].used);
if has_unselected_parents {
stack.push(current); // Re-add self to process after parents
for &parent in &tx.parents {
if !pool[parent].used {
stack.push(parent);
}
}
} else {
// All parents selected, can select this one
if !pool[current].used {
pool[current].used = true;
to_select.push(current);
}
}
}
to_select
}
/// Fix fee rate ordering violations between blocks.
/// Swaps txs between adjacent blocks until Block N's min >= Block N+1's max.
fn fix_block_ordering(blocks: &mut [Vec<SelectedTx>]) {
// Iterate until no more swaps needed
let mut changed = true;
let mut iterations = 0;
const MAX_ITERATIONS: usize = 100; // Prevent infinite loops
while changed && iterations < MAX_ITERATIONS {
changed = false;
iterations += 1;
for i in 0..blocks.len().saturating_sub(1) {
// Find min in block i and max in block i+1
let Some(curr_min_idx) = blocks[i]
.iter()
.enumerate()
.min_by_key(|(_, s)| s.effective_fee_rate)
.map(|(idx, _)| idx)
else {
continue;
};
let Some(next_max_idx) = blocks[i + 1]
.iter()
.enumerate()
.max_by_key(|(_, s)| s.effective_fee_rate)
.map(|(idx, _)| idx)
else {
continue;
};
let curr_min = blocks[i][curr_min_idx].effective_fee_rate;
let next_max = blocks[i + 1][next_max_idx].effective_fee_rate;
// If violation exists, swap the two txs
if next_max > curr_min {
// Swap: move high-fee tx to earlier block, low-fee tx to later block
let high_tx = blocks[i + 1].swap_remove(next_max_idx);
let low_tx = blocks[i].swap_remove(curr_min_idx);
blocks[i].push(high_tx);
blocks[i + 1].push(low_tx);
changed = true;
}
}
}
if iterations >= MAX_ITERATIONS {
log::warn!("fix_block_ordering: reached max iterations, some violations may remain");
}
}
/// Update descendants' ancestor scores after selecting a tx.
/// Takes a pool index.
fn update_descendants(
pool: &mut Pool,
selected_pool_idx: PoolIndex,
modified_queue: &mut BinaryHeap<TxPriority>,
) {
let selected_fee = pool[selected_pool_idx].fee;
let selected_vsize = pool[selected_pool_idx].vsize;
// Track visited to avoid double-subtracting in diamond patterns
let mut visited = rustc_hash::FxHashSet::default();
// BFS through children (children are pool indices)
let mut stack: Vec<PoolIndex> = pool[selected_pool_idx].children.to_vec();
while let Some(child_idx) = stack.pop() {
// Skip if already visited (handles diamond patterns)
if !visited.insert(child_idx) {
continue;
}
if pool[child_idx].used {
// Already selected: nothing to subtract here, but keep walking so
// unselected grandchildren still account for this ancestor's removal
// (the selected tx is an ancestor of theirs too).
stack.extend(pool[child_idx].children.iter().copied());
continue;
}
let child = &mut pool[child_idx];
// Subtract selected tx from ancestor totals
child.ancestor_fee -= selected_fee;
child.ancestor_vsize -= selected_vsize;
// Always re-push to modified queue with updated score.
// This may create duplicates, but we handle that by checking
// if the tx is used or if the snapshot is stale when popping.
modified_queue.push(TxPriority::new(child));
child.in_modified = true;
// Continue to grandchildren
stack.extend(child.children.iter().copied());
}
}
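The duplicate entries in `modified_queue` are the classic lazy-deletion heap pattern: every score change pushes a fresh snapshot, and stale snapshots are discarded at pop time by comparing against the live value. A std-only sketch under that assumption, with `u64` scores and `usize` ids as illustrative stand-ins for `TxPriority`:

```rust
use std::collections::BinaryHeap;

/// Pop the id whose snapshot still matches its live score, skipping stale entries.
fn pop_current(heap: &mut BinaryHeap<(u64, usize)>, live: &[u64]) -> Option<usize> {
    while let Some(&(snap, id)) = heap.peek() {
        heap.pop();
        if snap == live[id] {
            return Some(id); // snapshot is current
        }
        // stale snapshot: a newer entry with the current score exists (or will)
    }
    None
}

fn main() {
    let mut heap = BinaryHeap::new();
    let mut live = vec![0u64; 3];
    // tx 2's score improves twice; both snapshots end up in the heap
    live[2] = 20;
    heap.push((20, 2));
    live[2] = 40;
    heap.push((40, 2));
    live[1] = 30;
    heap.push((30, 1));
    assert_eq!(pop_current(&mut heap, &live), Some(2)); // current (40, 2) wins
    assert_eq!(pop_current(&mut heap, &live), Some(1));
    assert_eq!(pop_current(&mut heap, &live), None); // stale (20, 2) discarded
}
```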

View File

@@ -1,62 +1,55 @@
use brk_types::{FeeRate, MempoolEntryInfo, Sats, Txid, TxidPrefix, VSize};
/// A mempool transaction entry.
///
/// Stores only the data needed for fee estimation and block building.
/// Ancestor values are pre-computed by Bitcoin Core (correctly handling shared ancestors).
#[derive(Debug, Clone)]
pub struct MempoolEntry {
pub txid: Txid,
pub fee: Sats,
pub vsize: VSize,
/// Pre-computed ancestor fee (self + all ancestors, no double-counting)
pub ancestor_fee: Sats,
/// Pre-computed ancestor vsize (self + all ancestors, no double-counting)
pub ancestor_vsize: VSize,
/// Parent txid prefixes (transactions this tx depends on)
pub depends: Vec<TxidPrefix>,
}
impl MempoolEntry {
pub fn from_info(info: &MempoolEntryInfo) -> Self {
Self {
txid: info.txid.clone(),
fee: info.fee,
vsize: VSize::from(info.vsize),
ancestor_fee: info.ancestor_fee,
ancestor_vsize: VSize::from(info.ancestor_size),
depends: info.depends.iter().map(TxidPrefix::from).collect(),
}
}
/// Individual fee rate (without ancestors)
#[inline]
pub fn fee_rate(&self) -> FeeRate {
FeeRate::from((self.fee, self.vsize))
}
/// Ancestor fee rate (package rate for CPFP)
#[inline]
pub fn ancestor_fee_rate(&self) -> FeeRate {
FeeRate::from((self.ancestor_fee, self.ancestor_vsize))
}
/// Effective fee rate for display - the rate that justified this tx's inclusion.
/// For CPFP parents, this is their ancestor_fee_rate (child paying for them).
/// For regular txs, this is their own fee_rate.
#[inline]
pub fn effective_fee_rate(&self) -> FeeRate {
std::cmp::max(self.fee_rate(), self.ancestor_fee_rate())
}
#[inline]
pub fn txid_prefix(&self) -> TxidPrefix {
TxidPrefix::from(&self.txid)
}
}

View File

@@ -1,175 +0,0 @@
use brk_types::{Sats, Transaction, Txid, VSize, Vout};
use rustc_hash::{FxHashMap, FxHashSet};
use super::entry::MempoolOutpoint;
use super::MempoolEntry;
/// Transaction dependency graph for the mempool
///
/// Tracks parent-child relationships and computes ancestor feerates
/// for proper CPFP (Child-Pays-For-Parent) handling.
#[derive(Debug, Default)]
pub struct TxGraph {
/// All mempool entries by txid
entries: FxHashMap<Txid, MempoolEntry>,
/// Maps outpoint -> txid that created it (for finding parents)
outpoint_to_tx: FxHashMap<MempoolOutpoint, Txid>,
/// Maps txid -> txids that spend its outputs (children)
children: FxHashMap<Txid, FxHashSet<Txid>>,
}
impl TxGraph {
pub fn new() -> Self {
Self::default()
}
pub fn entries(&self) -> &FxHashMap<Txid, MempoolEntry> {
&self.entries
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Add a transaction to the graph
pub fn insert(&mut self, tx: &Transaction) {
let mut entry = MempoolEntry::new(tx);
// Find in-mempool parents and build ancestor set
let parents = self.find_parents(&entry.spends);
entry.ancestors = self.compute_ancestors(&parents);
// Compute ancestor fee/vsize
let (ancestor_fee, ancestor_vsize) = self.sum_ancestors(&entry.ancestors);
entry.ancestor_fee = entry.fee + ancestor_fee;
entry.ancestor_vsize = entry.vsize + ancestor_vsize;
// Register this tx's outputs
for (vout, _) in tx.output.iter().enumerate() {
let outpoint = (entry.txid.clone(), Vout::from(vout as u32));
self.outpoint_to_tx.insert(outpoint, entry.txid.clone());
}
// Register as child of parents
for parent in &parents {
self.children
.entry(parent.clone())
.or_default()
.insert(entry.txid.clone());
}
self.entries.insert(entry.txid.clone(), entry);
}
/// Remove a transaction from the graph
pub fn remove(&mut self, txid: &Txid) -> Option<MempoolEntry> {
let entry = self.entries.remove(txid)?;
// Remove from outpoint index
// Note: We don't know the vout count, so we remove all entries pointing to this txid
self.outpoint_to_tx.retain(|_, tx| tx != txid);
// Remove from children index
self.children.remove(txid);
for children_set in self.children.values_mut() {
children_set.remove(txid);
}
// Update descendants' ancestor data
self.update_descendants_after_removal(txid, &entry);
Some(entry)
}
/// Check if a txid is in the mempool
pub fn contains(&self, txid: &Txid) -> bool {
self.entries.contains_key(txid)
}
/// Get all txids currently in the graph
pub fn txids(&self) -> impl Iterator<Item = &Txid> {
self.entries.keys()
}
/// Find which inputs reference in-mempool transactions (parents)
fn find_parents(&self, spends: &[MempoolOutpoint]) -> Vec<Txid> {
spends
.iter()
.filter_map(|outpoint| self.outpoint_to_tx.get(outpoint).cloned())
.collect()
}
/// Compute full ancestor set (transitive closure)
fn compute_ancestors(&self, parents: &[Txid]) -> FxHashSet<Txid> {
let mut ancestors = FxHashSet::default();
let mut stack: Vec<Txid> = parents.to_vec();
while let Some(txid) = stack.pop() {
if ancestors.insert(txid.clone()) {
if let Some(entry) = self.entries.get(&txid) {
stack.extend(entry.ancestors.iter().cloned());
}
}
}
ancestors
}
/// Sum fee and vsize of all ancestors
fn sum_ancestors(&self, ancestors: &FxHashSet<Txid>) -> (Sats, VSize) {
ancestors.iter().fold(
(Sats::default(), VSize::default()),
|(fee, vsize), txid| {
if let Some(entry) = self.entries.get(txid) {
(fee + entry.fee, vsize + entry.vsize)
} else {
(fee, vsize)
}
},
)
}
/// Update all descendants after removing a transaction
fn update_descendants_after_removal(&mut self, removed: &Txid, removed_entry: &MempoolEntry) {
// Find all descendants
let descendants = self.find_descendants(removed);
// Update each descendant's ancestor set and cumulative values
for desc_txid in descendants {
if let Some(desc) = self.entries.get_mut(&desc_txid) {
// Remove the removed tx from ancestors
desc.ancestors.remove(removed);
// Subtract the removed tx's contribution
desc.ancestor_fee = desc.ancestor_fee - removed_entry.fee;
desc.ancestor_vsize = desc.ancestor_vsize - removed_entry.vsize;
}
}
}
/// Find all descendants of a transaction (children, grandchildren, etc.)
fn find_descendants(&self, txid: &Txid) -> Vec<Txid> {
let mut descendants = Vec::new();
let mut stack = vec![txid.clone()];
let mut visited = FxHashSet::default();
while let Some(current) = stack.pop() {
if let Some(children) = self.children.get(&current) {
for child in children {
if visited.insert(child.clone()) {
descendants.push(child.clone());
stack.push(child.clone());
}
}
}
}
descendants
}
}

View File

@@ -1,7 +1,13 @@
mod block_builder;
mod entry;
mod monitor;
mod projected_blocks;
mod types;
// Public API
pub use monitor::{Mempool, MempoolInner};
pub use projected_blocks::{BlockStats, ProjectedSnapshot};
// Crate-internal (used by submodules)
pub(crate) use entry::MempoolEntry;
pub(crate) use types::{MempoolTxIndex, PoolIndex, SelectedTx};

View File

@@ -0,0 +1,341 @@
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread,
time::{Duration, Instant},
};
use brk_error::Result;
use brk_rpc::Client;
use brk_types::{
AddressBytes, AddressMempoolStats, MempoolEntryInfo, MempoolInfo, RecommendedFees, TxWithHex,
Txid, TxidPrefix,
};
use derive_deref::Deref;
use log::{error, info};
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::{FxHashMap, FxHashSet};
use super::block_builder::build_projected_blocks;
use super::entry::MempoolEntry;
use super::projected_blocks::{BlockStats, ProjectedSnapshot};
use super::types::MempoolTxIndex;
/// Max new txs to fetch full data for per update cycle (for address tracking)
const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000;
/// Minimum interval between rebuilds (milliseconds)
const MIN_REBUILD_INTERVAL_MS: u64 = 1000;
/// Block building state - grouped for atomic locking.
#[derive(Default)]
struct BlockBuildingState {
/// Slot-based entry storage
entries: Vec<Option<MempoolEntry>>,
/// TxidPrefix -> slot index
txid_prefix_to_idx: FxHashMap<TxidPrefix, MempoolTxIndex>,
/// Recycled slot indices
free_indices: Vec<MempoolTxIndex>,
}
/// Mempool monitor.
///
/// Thread-safe wrapper around `MempoolInner`. Free to clone.
#[derive(Clone, Deref)]
pub struct Mempool(Arc<MempoolInner>);
impl Mempool {
pub fn new(client: &Client) -> Self {
Self(Arc::new(MempoolInner::new(client.clone())))
}
}
/// Inner mempool state and logic.
pub struct MempoolInner {
client: Client,
// Mempool state
info: RwLock<MempoolInfo>,
txs: RwLock<FxHashMap<Txid, TxWithHex>>,
addresses: RwLock<FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>>,
// Block building data (single lock for consistency)
block_state: RwLock<BlockBuildingState>,
// Projected blocks snapshot
snapshot: RwLock<ProjectedSnapshot>,
// Rate limiting
dirty: AtomicBool,
last_rebuild_ms: AtomicU64,
}
impl MempoolInner {
pub fn new(client: Client) -> Self {
Self {
client,
info: RwLock::new(MempoolInfo::default()),
txs: RwLock::new(FxHashMap::default()),
addresses: RwLock::new(FxHashMap::default()),
block_state: RwLock::new(BlockBuildingState::default()),
snapshot: RwLock::new(ProjectedSnapshot::default()),
dirty: AtomicBool::new(false),
last_rebuild_ms: AtomicU64::new(0),
}
}
pub fn get_info(&self) -> MempoolInfo {
self.info.read().clone()
}
pub fn get_fees(&self) -> RecommendedFees {
self.snapshot.read().fees.clone()
}
pub fn get_snapshot(&self) -> ProjectedSnapshot {
self.snapshot.read().clone()
}
pub fn get_block_stats(&self) -> Vec<BlockStats> {
self.snapshot.read().block_stats.clone()
}
pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap<Txid, TxWithHex>> {
self.txs.read()
}
pub fn get_addresses(
&self,
) -> RwLockReadGuard<'_, FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>> {
self.addresses.read()
}
/// Start an infinite update loop with a 1 second interval.
pub fn start(&self) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
thread::sleep(Duration::from_secs(1));
}
}
/// Sync with Bitcoin Core mempool and rebuild projections if needed.
pub fn update(&self) -> Result<()> {
// Single RPC call gets all entries with fees
let entries_info = self.client.get_raw_mempool_verbose()?;
let current_txids: FxHashSet<Txid> = entries_info.iter().map(|e| e.txid.clone()).collect();
// Find new txids and fetch full tx data for address tracking
let new_txs = self.fetch_new_txs(&current_txids);
// Apply changes using the entry info (has fees) and full txs (for addresses)
let has_changes = self.apply_changes(&entries_info, &new_txs);
if has_changes {
self.dirty.store(true, Ordering::Release);
}
self.rebuild_if_needed();
Ok(())
}
/// Fetch full transaction data for new txids (needed for address tracking).
fn fetch_new_txs(&self, current_txids: &FxHashSet<Txid>) -> FxHashMap<Txid, TxWithHex> {
let txs = self.txs.read();
current_txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_TX_FETCHES_PER_CYCLE)
.cloned()
.collect::<Vec<_>>()
.into_iter()
.filter_map(|txid| {
self.client
.get_mempool_transaction(&txid)
.ok()
.map(|tx| (txid, tx))
})
.collect()
}
/// Apply transaction additions and removals. Returns true if there were changes.
fn apply_changes(
&self,
entries_info: &[MempoolEntryInfo],
new_txs: &FxHashMap<Txid, TxWithHex>,
) -> bool {
// Build lookup map for current entries
let current_entries: FxHashMap<TxidPrefix, &MempoolEntryInfo> = entries_info
.iter()
.map(|e| (TxidPrefix::from(&e.txid), e))
.collect();
let mut info = self.info.write();
let mut txs = self.txs.write();
let mut addresses = self.addresses.write();
let mut block_state = self.block_state.write();
let mut had_removals = false;
let had_additions = !new_txs.is_empty();
// Remove transactions no longer in mempool
txs.retain(|txid, tx_with_hex| {
let prefix = TxidPrefix::from(txid);
if current_entries.contains_key(&prefix) {
return true;
}
had_removals = true;
let tx = tx_with_hex.tx();
info.remove(tx);
Self::update_address_stats_on_removal(tx, txid, &mut addresses);
// Remove from slot-based storage
if let Some(idx) = block_state.txid_prefix_to_idx.remove(&prefix) {
if let Some(slot) = block_state.entries.get_mut(idx.as_usize()) {
*slot = None;
}
block_state.free_indices.push(idx);
}
false
});
// Add new transactions
for (txid, tx_with_hex) in new_txs {
let tx = tx_with_hex.tx();
let prefix = TxidPrefix::from(txid);
// Get the entry info (has fee from Bitcoin Core)
let Some(entry_info) = current_entries.get(&prefix) else {
continue;
};
let entry = MempoolEntry::from_info(entry_info);
info.add(tx);
Self::update_address_stats_on_addition(tx, txid, &mut addresses);
// Allocate slot
let idx = if let Some(idx) = block_state.free_indices.pop() {
block_state.entries[idx.as_usize()] = Some(entry);
idx
} else {
let idx = MempoolTxIndex::from(block_state.entries.len());
block_state.entries.push(Some(entry));
idx
};
block_state.txid_prefix_to_idx.insert(prefix, idx);
}
txs.extend(new_txs.clone());
had_removals || had_additions
}
/// Rebuild projected blocks if dirty and enough time has passed.
fn rebuild_if_needed(&self) {
if !self.dirty.load(Ordering::Acquire) {
return;
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let last = self.last_rebuild_ms.load(Ordering::Acquire);
if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS {
return;
}
// Attempt to claim the rebuild (atomic compare-exchange)
if self
.last_rebuild_ms
.compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
return; // Another thread is rebuilding
}
self.dirty.store(false, Ordering::Release);
let i = Instant::now();
self.rebuild_projected_blocks();
info!("mempool: rebuild_projected_blocks in {:?}", i.elapsed());
}
/// Rebuild projected blocks snapshot.
fn rebuild_projected_blocks(&self) {
let block_state = self.block_state.read();
let blocks = build_projected_blocks(&block_state.entries);
let snapshot = ProjectedSnapshot::build(blocks, &block_state.entries);
*self.snapshot.write() = snapshot;
}
fn update_address_stats_on_removal(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: undo "sending" state
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.sent(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: undo "receiving" state
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.received(txout);
stats.update_tx_count(set.len() as u32);
});
}
fn update_address_stats_on_addition(
tx: &brk_types::Transaction,
txid: &Txid,
addresses: &mut FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>,
) {
// Inputs: mark as "sending"
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.sending(txout);
stats.update_tx_count(set.len() as u32);
});
// Outputs: mark as "receiving"
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.receiving(txout);
stats.update_tx_count(set.len() as u32);
});
}
}
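`rebuild_if_needed` combines a dirty flag with a timestamp `compare_exchange` so that, among concurrently updating threads, at most one claims each rebuild window. A std-only sketch of just the claim step (function name is illustrative):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const MIN_REBUILD_INTERVAL_MS: u64 = 1000;

/// Returns true if this caller won the right to rebuild at `now_ms`.
fn try_claim_rebuild(last_rebuild_ms: &AtomicU64, now_ms: u64) -> bool {
    let last = last_rebuild_ms.load(Ordering::Acquire);
    if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS {
        return false; // rate-limited
    }
    // Only one thread can swap `last` -> `now_ms`; losers observe a changed value.
    last_rebuild_ms
        .compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
}

fn main() {
    let ts = AtomicU64::new(0);
    assert!(try_claim_rebuild(&ts, 1500)); // first claim wins
    assert!(!try_claim_rebuild(&ts, 1600)); // within interval: rate-limited
    assert!(try_claim_rebuild(&ts, 3000)); // interval elapsed: wins again
}
```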

View File

@@ -1,135 +1,137 @@
use brk_types::{FeeRate, RecommendedFees, Sats, VSize};
use super::{MempoolEntry, MempoolTxIndex, SelectedTx};
/// Minimum fee rate for estimation (sat/vB)
const MIN_FEE_RATE: f64 = 1.0;
/// Immutable snapshot of projected blocks.
/// Stores indices into live entries + pre-computed stats.
#[derive(Debug, Clone, Default)]
pub struct ProjectedSnapshot {
/// Block structure: indices into entries Vec
pub blocks: Vec<Vec<MempoolTxIndex>>,
/// Pre-computed stats per block
pub block_stats: Vec<BlockStats>,
/// Pre-computed fee recommendations
pub fees: RecommendedFees,
}
/// Statistics for a single projected block.
#[derive(Debug, Clone, Default)]
pub struct BlockStats {
pub tx_count: u32,
pub total_vsize: VSize,
pub total_fee: Sats,
/// Fee rate percentiles: [0%, 10%, 25%, 50%, 75%, 90%, 100%]
/// - fee_range[0] = min, fee_range[3] = median, fee_range[6] = max
pub fee_range: [FeeRate; 7],
}
impl BlockStats {
pub fn min_fee_rate(&self) -> FeeRate {
self.fee_range[0]
}
pub fn median_fee_rate(&self) -> FeeRate {
self.fee_range[3]
}
pub fn max_fee_rate(&self) -> FeeRate {
self.fee_range[6]
}
}
impl ProjectedSnapshot {
/// Build snapshot from selected transactions (with effective fee rates) and entries.
pub fn build(blocks: Vec<Vec<SelectedTx>>, entries: &[Option<MempoolEntry>]) -> Self {
let block_stats: Vec<BlockStats> = blocks
.iter()
.map(|selected| compute_block_stats(selected, entries))
.collect();
let fees = compute_recommended_fees(&block_stats);
// Convert to just indices for storage
let blocks: Vec<Vec<MempoolTxIndex>> = blocks
.into_iter()
.map(|selected| selected.into_iter().map(|s| s.entries_idx).collect())
.collect();
Self {
blocks,
block_stats,
fees,
}
}
}
/// Compute statistics for a single block using effective fee rates from selection time.
fn compute_block_stats(selected: &[SelectedTx], entries: &[Option<MempoolEntry>]) -> BlockStats {
if selected.is_empty() {
return BlockStats::default();
}
let mut total_fee = Sats::default();
let mut total_vsize = VSize::default();
let mut fee_rates: Vec<FeeRate> = Vec::with_capacity(selected.len());
for sel in selected {
if let Some(entry) = &entries[sel.entries_idx.as_usize()] {
total_fee += entry.fee;
total_vsize += entry.vsize;
// Use the effective fee rate captured at selection time
// This is the actual mining score that determined this tx's block placement
fee_rates.push(sel.effective_fee_rate);
}
}
fee_rates.sort();
BlockStats {
tx_count: selected.len() as u32,
total_vsize,
total_fee,
fee_range: [
percentile(&fee_rates, 0),
percentile(&fee_rates, 10),
percentile(&fee_rates, 25),
percentile(&fee_rates, 50),
percentile(&fee_rates, 75),
percentile(&fee_rates, 90),
percentile(&fee_rates, 100),
],
}
}
/// Get percentile value from sorted array.
fn percentile(sorted: &[FeeRate], p: usize) -> FeeRate {
if sorted.is_empty() {
return FeeRate::default();
}
let idx = (p * (sorted.len() - 1)) / 100;
sorted[idx]
}
/// Compute recommended fees from block stats (mempool.space style).
fn compute_recommended_fees(stats: &[BlockStats]) -> RecommendedFees {
RecommendedFees {
// High priority: median of block 1
fastest_fee: median_fee_for_block(stats, 0),
// Medium priority: median of blocks 2-3
half_hour_fee: median_fee_for_block(stats, 2),
// Low priority: median of blocks 4-6
hour_fee: median_fee_for_block(stats, 5),
// No priority: median of later blocks
economy_fee: median_fee_for_block(stats, 7),
minimum_fee: FeeRate::from(MIN_FEE_RATE),
}
}
/// Get the median fee rate for block N.
fn median_fee_for_block(stats: &[BlockStats], block_index: usize) -> FeeRate {
stats
.get(block_index)
.map(|s| s.median_fee_rate())
.unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE))
}
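The `fee_range` percentiles use nearest-rank indexing into the sorted effective fee rates. A std-only sketch of that indexing, with `u64` rates standing in for `FeeRate`:

```rust
/// Nearest-rank percentile over a sorted slice: idx = p * (len - 1) / 100.
fn percentile(sorted: &[u64], p: usize) -> u64 {
    if sorted.is_empty() {
        return 0;
    }
    sorted[(p * (sorted.len() - 1)) / 100]
}

fn main() {
    let mut rates = vec![7, 1, 3, 9, 5];
    rates.sort(); // [1, 3, 5, 7, 9]
    let range: Vec<u64> = [0, 10, 25, 50, 75, 90, 100]
        .iter()
        .map(|&p| percentile(&rates, p))
        .collect();
    // min .. median .. max; 75% and 90% both round down to index 3 here
    assert_eq!(range, vec![1, 1, 3, 5, 7, 7, 9]);
}
```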

View File

@@ -0,0 +1,47 @@
/// Index into the mempool entries Vec.
/// NOT the global TxIndex for confirmed transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MempoolTxIndex(pub(crate) u32);
impl MempoolTxIndex {
#[inline]
pub fn as_usize(self) -> usize {
self.0 as usize
}
}
impl From<usize> for MempoolTxIndex {
#[inline]
fn from(value: usize) -> Self {
Self(value as u32)
}
}
/// Index into the temporary pool Vec used during block building.
/// Distinct from MempoolTxIndex to prevent mixing up index spaces.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PoolIndex(u32);
impl PoolIndex {
#[inline]
pub fn as_usize(self) -> usize {
self.0 as usize
}
}
impl From<usize> for PoolIndex {
#[inline]
fn from(value: usize) -> Self {
Self(value as u32)
}
}
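The point of keeping `MempoolTxIndex` and `PoolIndex` as separate newtypes is that both wrap a `u32`, yet the compiler rejects using one where the other is expected. A standalone sketch (the `entry_vsize` helper is hypothetical, for illustration only):

```rust
// Sketch of the index-newtype pattern above: distinct wrapper types
// keep indices into different Vecs from being swapped by accident.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MempoolTxIndex(u32);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PoolIndex(u32);

// Hypothetical accessor that only accepts the mempool index space.
fn entry_vsize(entries: &[u64], idx: MempoolTxIndex) -> u64 {
    entries[idx.0 as usize]
}

fn main() {
    let entries = vec![250u64, 400, 141];
    assert_eq!(entry_vsize(&entries, MempoolTxIndex(2)), 141);
    // entry_vsize(&entries, PoolIndex(2)); // compile error: wrong index type
    println!("ok");
}
```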
/// A selected transaction with its effective mining score at selection time.
/// The effective_fee_rate is the ancestor score when this tx was selected,
/// which may differ from the original ancestor score (if ancestors were already mined).
#[derive(Debug, Clone, Copy)]
pub struct SelectedTx {
pub entries_idx: MempoolTxIndex,
/// Fee rate at selection time (ancestor_fee / ancestor_vsize)
pub effective_fee_rate: brk_types::FeeRate,
}

View File

@@ -9,7 +9,10 @@ use bitcoincore_rpc::{
{Client as CoreClient, Error as RpcError, RpcApi},
};
use brk_error::Result;
use brk_types::{
BlockHash, Height, MempoolEntryInfo, Sats, Transaction, TxIn, TxOut, TxStatus, TxWithHex, Txid,
Vout,
};
pub use bitcoincore_rpc::Auth;
@@ -196,6 +199,24 @@ impl Client {
.map_err(Into::into)
}
/// Get all mempool entries with their fee data in a single RPC call
pub fn get_raw_mempool_verbose(&self) -> Result<Vec<MempoolEntryInfo>> {
let result = self.call(|c| c.get_raw_mempool_verbose())?;
Ok(result
.into_iter()
.map(|(txid, entry)| MempoolEntryInfo {
txid: txid.into(),
vsize: entry.vsize,
weight: entry.weight.unwrap_or(entry.vsize * 4),
fee: Sats::from(entry.fees.base.to_sat()),
ancestor_count: entry.ancestor_count,
ancestor_size: entry.ancestor_size,
ancestor_fee: Sats::from(entry.fees.ancestor.to_sat()),
depends: entry.depends.into_iter().map(Txid::from).collect(),
})
.collect())
}
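The `entry.weight.unwrap_or(entry.vsize * 4)` fallback above covers Core versions where `weight` is absent from the verbose mempool entry. Since vsize is defined as `ceil(weight / 4)`, `vsize * 4` is an upper bound on the true weight, not an exact recovery. A minimal sketch of that fallback:

```rust
// Sketch of the weight fallback used in get_raw_mempool_verbose:
// use the reported weight when present, otherwise estimate it from
// vsize. Because vsize = ceil(weight / 4), vsize * 4 >= weight.
fn weight_or_estimate(weight: Option<u64>, vsize: u64) -> u64 {
    weight.unwrap_or(vsize * 4)
}

fn main() {
    assert_eq!(weight_or_estimate(Some(561), 141), 561); // exact when reported
    assert_eq!(weight_or_estimate(None, 141), 564); // upper-bound estimate
    println!("ok");
}
```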
pub fn get_raw_transaction<'a, T, H>(
&self,
txid: &'a T,

View File

@@ -43,6 +43,7 @@ mod limit;
mod loadedaddressdata;
mod loadedaddressindex;
mod metric;
mod mempoolentryinfo;
mod mempoolinfo;
mod metriccount;
mod metrics;
@@ -151,6 +152,7 @@ pub use indexinfo::*;
pub use limit::*;
pub use loadedaddressdata::*;
pub use loadedaddressindex::*;
pub use mempoolentryinfo::*;
pub use mempoolinfo::*;
pub use metric::*;
pub use metriccount::*;

View File

@@ -0,0 +1,15 @@
use crate::{Sats, Txid};
/// Mempool entry info from Bitcoin Core's getrawmempool verbose
#[derive(Debug, Clone)]
pub struct MempoolEntryInfo {
pub txid: Txid,
pub vsize: u64,
pub weight: u64,
pub fee: Sats,
pub ancestor_count: u64,
pub ancestor_size: u64,
pub ancestor_fee: Sats,
/// Parent txids in the mempool
pub depends: Vec<Txid>,
}

133
docs/mempool-api-status.md Normal file
View File

@@ -0,0 +1,133 @@
# Mempool.space API Compatibility - Implementation Status
Plan file: `/Users/k/.claude/plans/smooth-weaving-crayon.md`
## Completed Endpoints
| Endpoint | Path | Notes |
|----------|------|-------|
| GET Block | `/api/block/{hash}` | |
| GET Block Height | `/api/block-height/{height}` | Returns plain text hash |
| GET Block Status | `/api/block/{hash}/status` | |
| GET Block Txids | `/api/block/{hash}/txids` | |
| GET Blocks | `/api/blocks[/:start_height]` | Last 10 blocks |
| GET Transaction | `/api/tx/{txid}` | |
| GET Tx Status | `/api/tx/{txid}/status` | |
| GET Tx Hex | `/api/tx/{txid}/hex` | Returns plain text |
| GET Address | `/api/address/{address}` | |
| GET Address Txs | `/api/address/{address}/txs` | |
| GET Address UTXOs | `/api/address/{address}/utxo` | |
| GET Mempool Info | `/api/mempool/info` | |
| GET Mempool Txids | `/api/mempool/txids` | |
| GET Recommended Fees | `/api/v1/fees/recommended` | Basic impl, needs optimization |
## Remaining Endpoints
### Mempool/Fees (4)
| # | Endpoint | Path | Dependencies | Priority |
|---|----------|------|--------------|----------|
| 1 | Optimize projected blocks | - | CPFP/ancestor scores | HIGH |
| 2 | GET Mempool Blocks | `/api/v1/fees/mempool-blocks` | #1 | HIGH |
| 3 | GET Mempool Recent | `/api/mempool/recent` | | MED |
| 4 | GET RBF Replacements | `/api/v1/replacements` | RBF tracking in brk_monitor | LOW |
### Blocks (4)
| # | Endpoint | Path | Dependencies | Priority |
|---|----------|------|--------------|----------|
| 5 | GET Block Txs | `/api/block/{hash}/txs[/:start_index]` | | MED |
| 6 | GET Block Txid at Index | `/api/block/{hash}/txid/{index}` | | LOW |
| 7 | GET Block Raw | `/api/block/{hash}/raw` | brk_reader | LOW |
| 8 | GET Block by Timestamp | `/api/v1/mining/blocks/timestamp/{timestamp}` | Binary search | LOW |
### Addresses (3)
| # | Endpoint | Path | Dependencies | Priority |
|---|----------|------|--------------|----------|
| 9 | GET Address Txs Chain | `/api/address/{address}/txs/chain[/:after_txid]` | | MED |
| 10 | GET Address Txs Mempool | `/api/address/{address}/txs/mempool` | brk_monitor | MED |
| 11 | GET Validate Address | `/api/v1/validate-address/{address}` | | LOW |
### Transactions (4)
| # | Endpoint | Path | Dependencies | Priority |
|---|----------|------|--------------|----------|
| 12 | GET Tx Outspend | `/api/tx/{txid}/outspend/{vout}` | #27 txoutindex_to_txinindex | HIGH |
| 13 | GET Tx Outspends | `/api/tx/{txid}/outspends` | #27 | HIGH |
| 14 | GET Tx Merkle Proof | `/api/tx/{txid}/merkle-proof` | | LOW |
| 15 | POST Tx Broadcast | `/api/tx` | brk_rpc | MED |
### General (1)
| # | Endpoint | Path | Dependencies | Priority |
|---|----------|------|--------------|----------|
| 16 | GET Difficulty Adjustment | `/api/v1/difficulty-adjustment` | | MED |
### Mining (9)
| # | Endpoint | Path | Dependencies | Priority |
|---|----------|------|--------------|----------|
| 17 | GET Mining Pools | `/api/v1/mining/pools[/:timePeriod]` | #28 pool identification | LOW |
| 18 | GET Mining Pool | `/api/v1/mining/pool/{slug}` | #28 | LOW |
| 19 | GET Hashrate | `/api/v1/mining/hashrate[/:timePeriod]` | | MED |
| 20 | GET Difficulty Adjustments | `/api/v1/mining/difficulty-adjustments[/:interval]` | | LOW |
| 21 | GET Reward Stats | `/api/v1/mining/reward-stats/{blockCount}` | | LOW |
| 22 | GET Block Fees | `/api/v1/mining/blocks/fees/{timePeriod}` | | LOW |
| 23 | GET Block Rewards | `/api/v1/mining/blocks/rewards/{timePeriod}` | | LOW |
| 24 | GET Block Fee Rates | `/api/v1/mining/blocks/fee-rates/{timePeriod}` | | LOW |
| 25 | GET Block Sizes/Weights | `/api/v1/mining/blocks/sizes-weights/{timePeriod}` | | LOW |
### Infrastructure (3)
| # | Task | Location | Priority |
|---|------|----------|----------|
| 26 | Index txindex_to_sigop_cost | brk_indexer | MED |
| 27 | Add txoutindex_to_txinindex mapping | brk_computer/stateful | HIGH |
| 28 | Pool identification from coinbase | brk_computer | LOW |
## Priority Order
### Phase 1: Core Functionality (HIGH)
1. **#27** Add txoutindex_to_txinindex mapping (enables outspend lookups)
2. **#12** GET Tx Outspend
3. **#13** GET Tx Outspends
4. **#1** Optimize projected blocks (CPFP/ancestor scores)
5. **#2** GET Mempool Blocks
### Phase 2: Essential Features (MED)
6. **#15** POST Tx Broadcast
7. **#16** GET Difficulty Adjustment
8. **#5** GET Block Txs (paginated)
9. **#9** GET Address Txs Chain
10. **#10** GET Address Txs Mempool
11. **#19** GET Hashrate
12. **#26** Index txindex_to_sigop_cost
13. **#3** GET Mempool Recent
### Phase 3: Nice to Have (LOW)
14. **#6** GET Block Txid at Index
15. **#7** GET Block Raw
16. **#8** GET Block by Timestamp
17. **#11** GET Validate Address
18. **#14** GET Tx Merkle Proof
19. **#4** GET RBF Replacements
20. **#20** GET Difficulty Adjustments
21. **#21** GET Reward Stats
22. **#22-25** Mining block statistics
23. **#17-18** Mining pools (requires #28)
24. **#28** Pool identification
## Design Documents
- Mempool projected blocks: `crates/brk_monitor/src/mempool/DESIGN.md`
## Skipped Endpoints
| Endpoint | Path | Reason |
|----------|------|--------|
| GET Price | `/api/v1/prices` | External data source needed |
| GET Historical Price | `/api/v1/historical-price` | External data source needed |
| GET Full-RBF Replacements | `/api/v1/fullrbf/replacements` | Low priority |
| Lightning endpoints | | Requires separate Lightning indexing |
| Accelerator endpoints | | mempool.space-specific paid service |
| Accelerator endpoints | mempool.space-specific paid service |