diff --git a/Cargo.lock b/Cargo.lock index dbf164757..4ecff97e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,12 +737,14 @@ name = "brk_monitor" version = "0.0.111" dependencies = [ "brk_error", + "brk_logger", "brk_rpc", "brk_types", "derive_deref", "log", "parking_lot", "rustc-hash", + "smallvec", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2a96d3e5b..2cdc79f69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ serde = "1.0.228" serde_bytes = "0.11.19" serde_derive = "1.0.228" serde_json = { version = "1.0.145", features = ["float_roundtrip"] } +smallvec = "1.15.1" tokio = { version = "1.48.0", features = ["rt-multi-thread"] } vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] } # vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] } diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index 519699d42..e52b26b04 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -28,7 +28,7 @@ pco = "0.4.7" rayon = { workspace = true } rustc-hash = { workspace = true } serde = { workspace = true } -smallvec = "1.15.1" +smallvec = { workspace = true } vecdb = { workspace = true } [dev-dependencies] diff --git a/crates/brk_logger/src/lib.rs b/crates/brk_logger/src/lib.rs index 1a8d9af08..72a5446e5 100644 --- a/crates/brk_logger/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -94,11 +94,11 @@ fn write( dash: impl Display, args: impl Display, ) -> Result<(), std::io::Error> { - // writeln!(buf, "{date_time} {dash} {level} {args}") + writeln!(buf, "{date_time} {dash} {level} {args}") // Don't remove, used to know the target of unwanted logs - writeln!( - buf, - "{} {} {} {} {}", - date_time, _target, level, dash, args - ) + // writeln!( + // buf, + // "{} {} {} {} {}", + // date_time, _target, level, dash, args + // ) } diff --git a/crates/brk_monitor/Cargo.toml b/crates/brk_monitor/Cargo.toml index 709607cc9..b2bece04c 100644 
--- a/crates/brk_monitor/Cargo.toml +++ b/crates/brk_monitor/Cargo.toml @@ -16,3 +16,7 @@ derive_deref = { workspace = true } log = { workspace = true } parking_lot = { workspace = true } rustc-hash = { workspace = true } +smallvec = { workspace = true } + +[dev-dependencies] +brk_logger = { workspace = true } diff --git a/crates/brk_monitor/DESIGN.md b/crates/brk_monitor/DESIGN.md new file mode 100644 index 000000000..8a14da9e4 --- /dev/null +++ b/crates/brk_monitor/DESIGN.md @@ -0,0 +1,455 @@ +# Mempool Projected Blocks - Design Document + +## Goal + +Efficiently maintain projected blocks for fee estimation without rebuilding everything on each mempool change. + +## Core Idea + +Instead of rebuilding all projected blocks on every insert/remove: +1. Insert new tx/package into the correct block (binary search by fee rate) +2. Cascade overflow: if block > 1MB, move lowest fee rate item to next block +3. On remove: cascade up to fill the gap + +## Data Structures + +### MempoolPackage + +```rust +enum MempoolPackage { + /// Tx with no unconfirmed parents - can be mined independently + Independent(MempoolEntry), + + /// Tx(s) bundled for CPFP - ancestors + paying descendant + Bundle { + /// The descendant tx that "pays for" the ancestors + child: Txid, + /// All txs in topological order (ancestors first, child last) + entries: Vec, + /// Sum of all fees + total_fee: Sats, + /// Sum of all vsizes + total_vsize: VSize, + }, + + /// Tx waiting for unconfirmed parent(s) that are in a different Bundle + /// Not placed in projected blocks until dependencies clear + Pending { + entry: MempoolEntry, + /// Parent txids this tx is waiting for + waiting_for: FxHashSet, + }, +} +``` + +### ProjectedBlock + +```rust +struct ProjectedBlock { + /// Packages sorted by fee rate (highest first) + packages: Vec, + /// Current total vsize + total_vsize: VSize, + /// Running fee total + total_fee: Sats, +} +``` + +## Insert Algorithm + +### Case 1: No unconfirmed parents +``` +tx A (10 
sat/vB), no unconfirmed inputs +``` +→ Create `Independent(A)` +→ Binary search blocks by fee rate, insert, cascade overflow + +### Case 2: Has unconfirmed parent(s), CPFP beneficial +``` +tx P (2 sat/vB) → tx C (50 sat/vB) +Package rate = (P.fee + C.fee) / (P.vsize + C.vsize) = 26 sat/vB +26 > max(2, 50)? No, 26 < 50 +``` +Note: the package rate does not need to beat the child's own rate for bundling to apply. +Since C cannot be mined without P, C's effective mining rate is always the package rate. + +So C's effective rate = package rate (26 sat/vB here), regardless of C's standalone 50 sat/vB. + +So: Create `Bundle { child: C, entries: [P, C], rate: 26 }` + +If P was already `Independent(P)` in a block: +1. Remove P from its block +2. Create Bundle +3. Insert Bundle at rate 26 sat/vB + +### Case 3: Chain of unconfirmed txs +``` +tx A (2 sat/vB) → tx B (3 sat/vB) → tx C (100 sat/vB) +``` +C's package = A + B + C +→ `Bundle { child: C, entries: [A, B, C] }` + +If A and B were already placed: +1. Remove A, B from their positions +2. Create Bundle with all three +3. Insert at package fee rate + +### Case 4: Diamond dependency +``` + tx A (5 sat/vB) + ↙ ↘ +tx B (2) tx C (2) + ↘ ↙ + tx D (100 sat/vB) +``` +D's ancestors = {A, B, C} +→ `Bundle { child: D, entries: [A, B, C, D] }` (topological order) + +### Case 5: Multiple children competing for same parent +``` +tx P (2 sat/vB) + ├→ tx C1 (50 sat/vB) → package rate 26 sat/vB + └→ tx C2 (100 sat/vB) → package rate 51 sat/vB +``` +P can only be in ONE package (can only be mined once). + +Choose the highest package fee rate: P + C2 at 51 sat/vB +→ `Bundle { child: C2, entries: [P, C2] }` + +What happens to C1?
+- C1 cannot be mined until P is mined +- C1 becomes `Pending { entry: C1, waiting_for: {P} }` +- C1 is NOT in projected blocks + +When Bundle(P, C2) is mined: +- P leaves mempool +- C1's waiting_for becomes empty +- C1 converts to `Independent(C1)` at 50 sat/vB +- C1 gets inserted into projected blocks + +### Case 6: New tx improves existing Bundle +``` +Existing: Bundle { child: C, entries: [P, C], rate: 26 } +New tx D spends C's output, D has very high fee +New package rate (P + C + D) = 40 sat/vB +``` +Since 40 > 26: +1. Remove old Bundle from its block +2. Create new `Bundle { child: D, entries: [P, C, D] }` +3. Insert at new rate + +### Case 7: New tx doesn't improve, becomes Pending +``` +Existing: Bundle { child: C, entries: [P, C], rate: 26 } +New tx D spends C's output, D has low fee +New package rate (P + C + D) = 20 sat/vB +``` +Since 20 < 26, D doesn't help: +→ `Pending { entry: D, waiting_for: {C} }` + +When Bundle(P, C) is mined: +→ D converts to `Independent(D)` + +## Remove Algorithm + +### Case 1: Remove an `Independent` +``` +Remove Independent(A) from block 3 +``` +1. Remove A from block 3 +2. Block 3 now has space +3. Pull highest fee rate package from block 4 into block 3 +4. Cascade: pull from block 5 to block 4, etc. +5. Stop when a block has no underflow or no more blocks + +### Case 2: Remove the "child" (paying tx) of a Bundle +``` +Bundle { child: C, entries: [P, C] } in block 2 +C gets dropped/RBF'd (NOT confirmed - if confirmed, P would be too) +``` +1. Remove Bundle from block 2 +2. P has no more CPFP boost +3. P becomes `Independent(P)` at its own rate (2 sat/vB) +4. Insert P into appropriate block (probably much later) +5. Cascade to fill block 2's gap + +### Case 3: Remove an ancestor from a Bundle (confirmation) +``` +Bundle { child: C, entries: [P, C], rate: 26 } in block 2 +P gets confirmed (separate from C? unusual but possible in reorg) +``` +1. Remove Bundle from block 2 +2. C no longer needs P +3. 
C becomes `Independent(C)` at 50 sat/vB +4. Insert C into earlier block (higher rate now) +5. Cascade as needed + +### Case 4: Remove tx that has Pending descendants +``` +Independent(P) in block 5 +Pending { entry: C, waiting_for: {P} } +P gets confirmed +``` +1. Remove P from block 5 +2. Find all Pending txs waiting for P +3. For each: remove P from waiting_for +4. If waiting_for is empty: convert to Independent, insert into blocks +5. Cascade to fill block 5's gap + +### Case 5: Remove middle of a chain (tx dropped/invalid) +``` +Bundle { child: D, entries: [A, B, C, D] } +B gets dropped (double-spend, RBF, etc.) +``` +B invalid means C and D are invalid too (missing input): +1. Remove entire Bundle +2. A becomes `Independent(A)` if still valid +3. C, D removed from mempool entirely + +### Case 6: RBF replacement +``` +Independent(A) in block 3 +New tx A' replaces A (same inputs, higher fee) +``` +1. Remove A (and any descendants - they're now invalid) +2. Insert A' as new Independent or Bundle + +## Pending → Active Transitions + +When a tx is removed (confirmed), check all `Pending` entries: + +```rust +fn on_tx_removed(&mut self, txid: Txid) { + let pending_to_update: Vec<_> = self.pending + .iter() + .filter(|(_, p)| p.waiting_for.contains(&txid)) + .map(|(id, _)| *id) + .collect(); + + for pending_txid in pending_to_update { + let pending = self.pending.get_mut(&pending_txid).unwrap(); + pending.waiting_for.remove(&txid); + + if pending.waiting_for.is_empty() { + // Convert to Independent and insert + let entry = self.pending.remove(&pending_txid).unwrap(); + self.insert_independent(entry.entry); + } + } +} +``` + +## Cascade Algorithm + +### Cascade Down (after insert causes overflow) + +```rust +fn cascade_down(&mut self, starting_block: usize) { + let mut block_idx = starting_block; + + while block_idx < self.blocks.len() { + let block = &mut self.blocks[block_idx]; + + if block.total_vsize <= BLOCK_VSIZE_TARGET { + break; // No overflow + } + + // Pop 
lowest fee rate package + let overflow = block.packages.pop().unwrap(); + block.total_vsize -= overflow.vsize(); + block.total_fee -= overflow.fee(); + + // Push to next block + if block_idx + 1 >= self.blocks.len() { + self.blocks.push(ProjectedBlock::new()); + } + + let next_block = &mut self.blocks[block_idx + 1]; + // Insert at beginning (it's the highest fee rate in this block) + next_block.packages.insert(0, overflow); + next_block.total_vsize += overflow.vsize(); + next_block.total_fee += overflow.fee(); + + block_idx += 1; + } +} +``` + +### Cascade Up (after remove causes underflow) + +```rust +fn cascade_up(&mut self, starting_block: usize) { + let mut block_idx = starting_block; + + while block_idx < self.blocks.len() { + let current_vsize = self.blocks[block_idx].total_vsize; + + if current_vsize >= BLOCK_VSIZE_TARGET { + break; // Block is full enough + } + + if block_idx + 1 >= self.blocks.len() { + break; // No more blocks to pull from + } + + let next_block = &mut self.blocks[block_idx + 1]; + if next_block.packages.is_empty() { + // Remove empty block + self.blocks.remove(block_idx + 1); + break; + } + + // Pull highest fee rate package from next block + let pulled = next_block.packages.remove(0); + next_block.total_vsize -= pulled.vsize(); + next_block.total_fee -= pulled.fee(); + + // Add to current block + let current_block = &mut self.blocks[block_idx]; + current_block.packages.push(pulled); + current_block.total_vsize += pulled.vsize(); + current_block.total_fee += pulled.fee(); + + block_idx += 1; + } + + // Clean up empty trailing blocks + while self.blocks.last().map(|b| b.packages.is_empty()).unwrap_or(false) { + self.blocks.pop(); + } +} +``` + +## Data Structure for Efficient Operations + +Need to track: +1. `txid → MempoolPackage` - which package contains a tx +2. `txid → block_index` - which block a tx/package is in +3. `txid → Vec` - descendants waiting for this tx (Pending) +4. 
Per-block: sorted packages by fee rate + +```rust +struct MempoolState { + /// All packages (Independent, Bundle, or Pending) + packages: FxHashMap, + + /// For Bundle: maps each txid in bundle to the child txid (package key) + tx_to_package: FxHashMap, + + /// Maps package key (txid) to block index, None if Pending + package_to_block: FxHashMap>, + + /// Maps txid to list of Pending txids waiting for it + waiting_on: FxHashMap>, + + /// The projected blocks + blocks: Vec, +} +``` + +## Open Questions + +1. **Block fullness threshold**: Should we cascade when exactly at 1MB, or allow slight overflow? + +2. **Minimum fee rate**: Packages below minimum relay fee should be excluded? + +3. **Maximum ancestors**: Bitcoin Core limits ancestor count (25). Should we? + +4. **Memory bounds**: For huge mempools, should we limit projected blocks count? + +## mempool.space Implementation Analysis + +Source: [mempool/mempool rust/gbt/src](https://github.com/mempool/mempool/tree/master/rust/gbt/src) + +### Key Files +- `gbt.rs` - Main block building algorithm +- `audit_transaction.rs` - Transaction wrapper with ancestor tracking +- `thread_transaction.rs` - Lightweight tx representation + +### Their Approach + +**Not incremental!** They rebuild from scratch but optimize heavily: + +1. **Use numeric UIDs** instead of 32-byte txids - massive memory/hash savings +2. **Ancestor score** = (fee + ancestor_fees) / (weight + ancestor_weights) +3. **Two-source selection**: + - `mempool_stack`: Original sorted order + - `modified` priority queue: Txs whose scores changed due to parent selection +4. 
**Greedy selection loop**: + - Pick highest ancestor score from either source + - Include all ancestors first + - Update all descendants' scores (via `update_descendants()`) + - Move affected descendants to `modified` queue + +### Why They Don't Do Incremental + +The `update_descendants()` cascade is the key insight: when you select a tx, ALL its descendants need score recalculation because their ancestor set changed. This cascade can touch a huge portion of the mempool. + +Their solution: rebuild fast rather than update incrementally. +- Use u32 UIDs (4 bytes vs 32 bytes) +- Pre-allocate with capacity 1,048,576 +- Custom u32-based hasher +- Run in separate thread via `spawn_blocking()` + +### Should We Follow Their Approach? + +**Pros of their approach:** +- Simpler code, fewer edge cases +- Proven correct (Bitcoin Core algorithm port) +- Fast enough with optimizations (sub-100ms for full rebuild) + +**Pros of incremental:** +- Lower latency for small changes +- Less CPU spike on each update +- Better for very high tx throughput + +**Recommendation:** Start with their approach (rebuild with optimizations), measure performance. Only add incremental if needed. + +### Simplified Algorithm (from mempool.space) + +``` +1. Build audit_pool: Map +2. set_relatives(): compute ancestors for each tx +3. Sort by ancestor_score descending +4. while mempool not empty: + a. Pick best from (stack, modified_queue) + b. If ancestors not yet selected, select them first + c. Add to current block + d. If block full, start new block + e. update_descendants(): recalc scores, add to modified_queue +5. Return blocks +``` + +### Key Optimization: Ancestor Score + +```rust +ancestor_score = (tx.fee + sum(ancestor.fee)) / (tx.weight + sum(ancestor.weight)) +``` + +This single metric captures CPFP naturally - a high-fee child boosts its low-fee parents by being evaluated as a package. 
+ +## Revised Implementation Plan + +Given mempool.space's approach works well, simplify our design: + +### Phase 1: Fast Rebuild (MVP) +- Use u32 UIDs for txs +- Compute ancestor scores +- Greedy selection with modified queue +- Rebuild on each change (with spawn_blocking) + +### Phase 2: Incremental (If Needed) +- Track which txs changed +- Only rebuild affected portions +- Cascade updates through dependency graph + +### Phase 3: Further Optimizations +- Batch updates (coalesce rapid changes) +- Dirty flag + lazy rebuild +- Background thread continuous updates + +## References + +- Bitcoin Core's block assembly: `src/node/miner.cpp` +- [mempool.space Rust GBT](https://github.com/mempool/mempool/tree/master/rust/gbt/src) +- [mempool-blocks.ts](https://github.com/mempool/mempool/blob/master/backend/src/api/mempool-blocks.ts) diff --git a/crates/brk_monitor/examples/mempool.rs b/crates/brk_monitor/examples/mempool.rs index 4eb3abe48..64eac9915 100644 --- a/crates/brk_monitor/examples/mempool.rs +++ b/crates/brk_monitor/examples/mempool.rs @@ -5,10 +5,9 @@ use brk_monitor::Mempool; use brk_rpc::{Auth, Client}; fn main() -> Result<()> { - // Connect to Bitcoin Core - let bitcoin_dir = Client::default_bitcoin_path(); - // let bitcoin_dir = Path::new("/Volumes/WD_BLACK/bitcoin"); + brk_logger::init(None)?; + let bitcoin_dir = Client::default_bitcoin_path(); let client = Client::new( Client::default_url(), Auth::CookieFile(bitcoin_dir.join(".cookie")), @@ -16,19 +15,58 @@ fn main() -> Result<()> { let mempool = Mempool::new(&client); + // Start mempool sync in background thread let mempool_clone = mempool.clone(); thread::spawn(move || { mempool_clone.start(); }); - // Access from main thread + // Poll and display stats every 5 seconds loop { thread::sleep(Duration::from_secs(5)); - let txs = mempool.get_txs(); - println!("mempool_tx_count: {}", txs.len()); - let addresses = mempool.get_addresses(); - println!("mempool_address_count: {}", addresses.len()); - } - // 
Ok(()) + // Basic mempool info + let info = mempool.get_info(); + let block_stats = mempool.get_block_stats(); + let total_fees: u64 = block_stats.iter().map(|s| u64::from(s.total_fee)).sum(); + println!("\n=== Mempool Info ==="); + println!(" Transactions: {}", info.count); + println!(" Total vsize: {} vB", info.vsize); + println!( + " Total fees: {:.4} BTC", + total_fees as f64 / 100_000_000.0 + ); + + // Fee recommendations (like mempool.space) + let fees = mempool.get_fees(); + println!("\n=== Recommended Fees (sat/vB) ==="); + println!(" No Priority {:.4}", f64::from(fees.economy_fee)); + println!(" Low Priority {:.4}", f64::from(fees.hour_fee)); + println!(" Medium Priority {:.4}", f64::from(fees.half_hour_fee)); + println!(" High Priority {:.4}", f64::from(fees.fastest_fee)); + + // Projected blocks (like mempool.space) + if !block_stats.is_empty() { + println!("\n=== Projected Blocks ==="); + for (i, stats) in block_stats.iter().enumerate() { + let total_fee_btc = u64::from(stats.total_fee) as f64 / 100_000_000.0; + println!( + " Block {}: ~{:.4} sat/vB, {:.4}-{:.4} sat/vB, {:.3} BTC, {} txs", + i + 1, + f64::from(stats.median_fee_rate()), + f64::from(stats.min_fee_rate()), + f64::from(stats.max_fee_rate()), + total_fee_btc, + stats.tx_count, + ); + } + } + + // Address tracking stats + let addresses = mempool.get_addresses(); + println!("\n=== Address Tracking ==="); + println!(" Addresses with pending txs: {}", addresses.len()); + + println!("\n----------------------------------------"); + } } diff --git a/crates/brk_monitor/src/lib.rs b/crates/brk_monitor/src/lib.rs index c5235fb26..7c39c3954 100644 --- a/crates/brk_monitor/src/lib.rs +++ b/crates/brk_monitor/src/lib.rs @@ -1,234 +1,10 @@ -use std::{sync::Arc, thread, time::Duration}; - -use brk_error::Result; -use brk_rpc::Client; -use brk_types::{ - AddressBytes, AddressMempoolStats, MempoolInfo, RecommendedFees, TxWithHex, Txid, -}; -use derive_deref::Deref; -use log::error; -use parking_lot::{RwLock, 
RwLockReadGuard}; -use rustc_hash::{FxHashMap, FxHashSet}; +//! Bitcoin mempool monitor. +//! +//! Provides real-time mempool tracking with: +//! - Fee estimation via projected blocks +//! - Address mempool stats +//! - CPFP-aware block building mod mempool; -use mempool::{ProjectedBlocks, TxGraph}; - -const MAX_FETCHES_PER_CYCLE: usize = 10_000; - -/// -/// Mempool monitor -/// -/// Thread safe and free to clone -/// -#[derive(Clone, Deref)] -pub struct Mempool(Arc); - -impl Mempool { - pub fn new(client: &Client) -> Self { - Self(Arc::new(MempoolInner::new(client.clone()))) - } -} - -pub struct MempoolInner { - client: Client, - info: RwLock, - fees: RwLock, - graph: RwLock, - projected_blocks: RwLock, - txs: RwLock>, - addresses: RwLock)>>, -} - -impl MempoolInner { - pub fn new(client: Client) -> Self { - Self { - client, - info: RwLock::new(MempoolInfo::default()), - fees: RwLock::new(RecommendedFees::default()), - graph: RwLock::new(TxGraph::new()), - projected_blocks: RwLock::new(ProjectedBlocks::default()), - txs: RwLock::new(FxHashMap::default()), - addresses: RwLock::new(FxHashMap::default()), - } - } - - pub fn get_info(&self) -> MempoolInfo { - self.info.read().clone() - } - - pub fn get_fees(&self) -> RecommendedFees { - self.fees.read().clone() - } - - pub fn get_projected_blocks(&self) -> ProjectedBlocks { - self.projected_blocks.read().clone() - } - - pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap> { - self.txs.read() - } - - pub fn get_addresses( - &self, - ) -> RwLockReadGuard<'_, FxHashMap)>> { - self.addresses.read() - } - - /// Start an infinite update loop with a 1 second interval - pub fn start(&self) { - loop { - if let Err(e) = self.update() { - error!("Error updating mempool: {}", e); - } - thread::sleep(Duration::from_secs(1)); - } - } - - pub fn update(&self) -> Result<()> { - let current_txids = self - .client - .get_raw_mempool()? 
- .into_iter() - .collect::>(); - - let new_txs = self.fetch_new_txs(¤t_txids); - let has_changes = self.apply_changes(¤t_txids, &new_txs); - - if has_changes { - self.rebuild_projected_blocks(); - } - - Ok(()) - } - - /// Fetch transactions that are new to our mempool - fn fetch_new_txs(&self, current_txids: &FxHashSet) -> FxHashMap { - let txs = self.txs.read(); - current_txids - .iter() - .filter(|txid| !txs.contains_key(*txid)) - .take(MAX_FETCHES_PER_CYCLE) - .cloned() - .collect::>() - .into_iter() - .filter_map(|txid| { - self.client - .get_mempool_transaction(&txid) - .ok() - .map(|tx| (txid, tx)) - }) - .collect() - } - - /// Apply transaction additions and removals, returns true if there were changes - fn apply_changes( - &self, - current_txids: &FxHashSet, - new_txs: &FxHashMap, - ) -> bool { - let mut info = self.info.write(); - let mut graph = self.graph.write(); - let mut txs = self.txs.write(); - let mut addresses = self.addresses.write(); - - let mut had_removals = false; - let had_additions = !new_txs.is_empty(); - - // Remove transactions no longer in mempool - txs.retain(|txid, tx_with_hex| { - if current_txids.contains(txid) { - return true; - } - - had_removals = true; - let tx = tx_with_hex.tx(); - - info.remove(tx); - graph.remove(txid); - Self::update_address_stats_on_removal(tx, txid, &mut addresses); - - false - }); - - // Add new transactions - for (txid, tx_with_hex) in new_txs { - let tx = tx_with_hex.tx(); - - info.add(tx); - graph.insert(tx); - Self::update_address_stats_on_addition(tx, txid, &mut addresses); - } - txs.extend(new_txs.clone()); - - had_removals || had_additions - } - - /// Rebuild projected blocks and update recommended fees - fn rebuild_projected_blocks(&self) { - let graph = self.graph.read(); - let projected = ProjectedBlocks::build(&graph); - let fees = projected.recommended_fees(); - - *self.projected_blocks.write() = projected; - *self.fees.write() = fees; - } - - fn update_address_stats_on_removal( - tx: 
&brk_types::Transaction, - txid: &Txid, - addresses: &mut FxHashMap)>, - ) { - // Inputs: undo "sending" state - tx.input - .iter() - .flat_map(|txin| txin.prevout.as_ref()) - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.remove(txid); - stats.sent(txout); - stats.update_tx_count(set.len() as u32); - }); - - // Outputs: undo "receiving" state - tx.output - .iter() - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.remove(txid); - stats.received(txout); - stats.update_tx_count(set.len() as u32); - }); - } - - fn update_address_stats_on_addition( - tx: &brk_types::Transaction, - txid: &Txid, - addresses: &mut FxHashMap)>, - ) { - // Inputs: mark as "sending" - tx.input - .iter() - .flat_map(|txin| txin.prevout.as_ref()) - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.insert(txid.clone()); - stats.sending(txout); - stats.update_tx_count(set.len() as u32); - }); - - // Outputs: mark as "receiving" - tx.output - .iter() - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - set.insert(txid.clone()); - stats.receiving(txout); - stats.update_tx_count(set.len() as u32); - }); - } -} +pub use mempool::{BlockStats, Mempool, MempoolInner, ProjectedSnapshot}; diff --git a/crates/brk_monitor/src/mempool/block_builder/audit.rs b/crates/brk_monitor/src/mempool/block_builder/audit.rs new file mode 100644 index 000000000..13ace27d4 --- /dev/null +++ b/crates/brk_monitor/src/mempool/block_builder/audit.rs @@ -0,0 +1,164 @@ +use std::ops::{Index, IndexMut}; + +use brk_types::{Sats, VSize}; +use smallvec::SmallVec; + +use 
crate::mempool::{MempoolTxIndex, PoolIndex}; + +/// Compare ancestor fee rates using cross-multiplication (avoids f64 division). +/// Returns true if (fee_a / vsize_a) > (fee_b / vsize_b). +#[inline] +fn has_higher_fee_rate(fee_a: Sats, vsize_a: VSize, fee_b: Sats, vsize_b: VSize) -> bool { + // Cross multiply: fee_a/vsize_a > fee_b/vsize_b ⟺ fee_a * vsize_b > fee_b * vsize_a + let score_a = u64::from(fee_a) as u128 * u64::from(vsize_b) as u128; + let score_b = u64::from(fee_b) as u128 * u64::from(vsize_a) as u128; + score_a > score_b +} + +/// Type-safe wrapper around Vec that only allows PoolIndex access. +pub struct Pool(Vec); + +impl Pool { + pub fn new(txs: Vec) -> Self { + Self(txs) + } + + #[inline] + pub fn len(&self) -> usize { + self.0.len() + } +} + +impl Index for Pool { + type Output = AuditTx; + + #[inline] + fn index(&self, idx: PoolIndex) -> &Self::Output { + &self.0[idx.as_usize()] + } +} + +impl IndexMut for Pool { + #[inline] + fn index_mut(&mut self, idx: PoolIndex) -> &mut Self::Output { + &mut self.0[idx.as_usize()] + } +} + +/// Lightweight transaction for block building. +/// Created fresh each rebuild, discarded after. +pub struct AuditTx { + /// Original entries index (for final output) + pub entries_idx: MempoolTxIndex, + /// Pool index (for internal graph traversal) + pub pool_idx: PoolIndex, + pub fee: Sats, + pub vsize: VSize, + /// In-mempool parent pool indices + pub parents: SmallVec<[PoolIndex; 4]>, + /// In-mempool child pool indices + pub children: SmallVec<[PoolIndex; 8]>, + /// Cumulative fee (self + all ancestors) + pub ancestor_fee: Sats, + /// Cumulative vsize (self + all ancestors) + pub ancestor_vsize: VSize, + /// Already selected into a block + pub used: bool, + /// Already in modified priority queue + pub in_modified: bool, +} + +impl AuditTx { + /// Create AuditTx with pre-computed ancestor values from Bitcoin Core. 
+ pub fn new_with_ancestors( + entries_idx: MempoolTxIndex, + pool_idx: PoolIndex, + fee: Sats, + vsize: VSize, + ancestor_fee: Sats, + ancestor_vsize: VSize, + ) -> Self { + Self { + entries_idx, + pool_idx, + fee, + vsize, + parents: SmallVec::new(), + children: SmallVec::new(), + ancestor_fee, + ancestor_vsize, + used: false, + in_modified: false, + } + } + + #[inline] + pub fn has_higher_score_than(&self, other: &Self) -> bool { + has_higher_fee_rate( + self.ancestor_fee, + self.ancestor_vsize, + other.ancestor_fee, + other.ancestor_vsize, + ) + } +} + +/// Priority queue entry for the modified queue. +/// Stores a snapshot of ancestor values at time of insertion. +#[derive(Clone, Copy)] +pub struct TxPriority { + /// Pool index (for indexing into pool array) + pub pool_idx: PoolIndex, + /// Snapshot of ancestor fee at insertion time + pub ancestor_fee: Sats, + /// Snapshot of ancestor vsize at insertion time + pub ancestor_vsize: VSize, +} + +impl TxPriority { + pub fn new(tx: &AuditTx) -> Self { + Self { + pool_idx: tx.pool_idx, + ancestor_fee: tx.ancestor_fee, + ancestor_vsize: tx.ancestor_vsize, + } + } + + #[inline] + pub fn has_higher_score_than(&self, other: &Self) -> bool { + has_higher_fee_rate( + self.ancestor_fee, + self.ancestor_vsize, + other.ancestor_fee, + other.ancestor_vsize, + ) + } +} + +impl PartialEq for TxPriority { + fn eq(&self, other: &Self) -> bool { + self.pool_idx == other.pool_idx + } +} + +impl Eq for TxPriority {} + +impl PartialOrd for TxPriority { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TxPriority { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Higher score = higher priority (for max-heap) + if self.has_higher_score_than(other) { + std::cmp::Ordering::Greater + } else if other.has_higher_score_than(self) { + std::cmp::Ordering::Less + } else { + // Tiebreaker: lower index first (deterministic) + other.pool_idx.cmp(&self.pool_idx) + } + } +} diff --git 
a/crates/brk_monitor/src/mempool/block_builder/build.rs b/crates/brk_monitor/src/mempool/block_builder/build.rs new file mode 100644 index 000000000..0a1e7989f --- /dev/null +++ b/crates/brk_monitor/src/mempool/block_builder/build.rs @@ -0,0 +1,118 @@ +use brk_types::TxidPrefix; +use rustc_hash::FxHashMap; + +use super::audit::{AuditTx, Pool}; +use super::selection::select_into_blocks; +use crate::mempool::{MempoolEntry, MempoolTxIndex, PoolIndex, SelectedTx}; + +/// Number of projected blocks to build +const NUM_PROJECTED_BLOCKS: usize = 8; + +/// Estimated txs per block (for partial sort optimization) +const TXS_PER_BLOCK: usize = 4000; + +/// Build projected blocks from mempool entries. +/// +/// Returns SelectedTx (with effective fee rate) grouped by block, in mining priority order. +pub fn build_projected_blocks(entries: &[Option]) -> Vec> { + // Collect live entries + let live: Vec<(MempoolTxIndex, &MempoolEntry)> = entries + .iter() + .enumerate() + .filter_map(|(i, opt)| opt.as_ref().map(|e| (MempoolTxIndex::from(i), e))) + .collect(); + + if live.is_empty() { + return Vec::new(); + } + + // Build AuditTx pool with pre-computed ancestor values from Bitcoin Core + let mut pool = Pool::new(build_audit_pool(&live)); + + // Sort by ancestor score (partial sort for efficiency) + let sorted = partial_sort_by_score(&pool); + + // Run selection algorithm + select_into_blocks(&mut pool, sorted, NUM_PROJECTED_BLOCKS) +} + +/// Build the AuditTx pool with parent/child relationships. +/// AuditTx.parents and .children store pool indices (for graph traversal). +/// AuditTx.entries_idx stores the original entries index (for final output). +/// Uses Bitcoin Core's pre-computed ancestor values (correct, no double-counting). 
+fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec { + // Create mapping from TxidPrefix to pool index + let prefix_to_pool_idx: FxHashMap = live + .iter() + .enumerate() + .map(|(pool_idx, (_, entry))| (entry.txid_prefix(), PoolIndex::from(pool_idx))) + .collect(); + + // Build pool with parent relationships + // Use Bitcoin Core's pre-computed ancestor_fee and ancestor_vsize + let mut pool: Vec = live + .iter() + .enumerate() + .map(|(pool_idx, (entries_idx, entry))| { + let pool_idx = PoolIndex::from(pool_idx); + let mut tx = AuditTx::new_with_ancestors( + *entries_idx, + pool_idx, + entry.fee, + entry.vsize, + entry.ancestor_fee, + entry.ancestor_vsize, + ); + + // Find in-mempool parents from depends list (provided by Bitcoin Core) + for parent_prefix in &entry.depends { + if let Some(&parent_pool_idx) = prefix_to_pool_idx.get(parent_prefix) { + tx.parents.push(parent_pool_idx); + } + } + + tx + }) + .collect(); + + // Build child relationships (reverse of parents) + for pool_idx in 0..pool.len() { + let parents = pool[pool_idx].parents.clone(); + for parent_pool_idx in parents { + pool[parent_pool_idx.as_usize()].children.push(PoolIndex::from(pool_idx)); + } + } + + pool +} + +/// Partial sort: only fully sort the top N txs needed for blocks. +/// Returns pool indices sorted by ancestor score. 
+fn partial_sort_by_score(pool: &Pool) -> Vec { + let mut indices: Vec = (0..pool.len()).map(PoolIndex::from).collect(); + let needed = NUM_PROJECTED_BLOCKS * TXS_PER_BLOCK; + + // Comparator: descending by score, then ascending by index (deterministic tiebreaker) + let cmp = |a: &PoolIndex, b: &PoolIndex| -> std::cmp::Ordering { + let tx_a = &pool[*a]; + let tx_b = &pool[*b]; + if tx_b.has_higher_score_than(tx_a) { + std::cmp::Ordering::Greater + } else if tx_a.has_higher_score_than(tx_b) { + std::cmp::Ordering::Less + } else { + a.cmp(b) + } + }; + + if indices.len() > needed { + // Partition: move top `needed` to front (unordered), then sort just those + indices.select_nth_unstable_by(needed, cmp); + indices[..needed].sort_unstable_by(cmp); + indices.truncate(needed); + } else { + indices.sort_unstable_by(cmp); + } + + indices +} diff --git a/crates/brk_monitor/src/mempool/block_builder/mod.rs b/crates/brk_monitor/src/mempool/block_builder/mod.rs new file mode 100644 index 000000000..2cf327216 --- /dev/null +++ b/crates/brk_monitor/src/mempool/block_builder/mod.rs @@ -0,0 +1,5 @@ +mod audit; +mod build; +mod selection; + +pub use build::build_projected_blocks; diff --git a/crates/brk_monitor/src/mempool/block_builder/selection.rs b/crates/brk_monitor/src/mempool/block_builder/selection.rs new file mode 100644 index 000000000..88afc0902 --- /dev/null +++ b/crates/brk_monitor/src/mempool/block_builder/selection.rs @@ -0,0 +1,278 @@ +use std::collections::BinaryHeap; + +use brk_types::FeeRate; + +use super::audit::{Pool, TxPriority}; +use crate::mempool::{PoolIndex, SelectedTx}; + +/// Target vsize per block (~1MB, derived from 4MW weight limit) +const BLOCK_VSIZE_LIMIT: u64 = 1_000_000; + +/// Select transactions into blocks using the two-source algorithm. +/// +/// Takes pool indices (sorted by score), returns SelectedTx with effective fee rate at selection time. 
+pub fn select_into_blocks( + pool: &mut Pool, + sorted_pool_indices: Vec, + num_blocks: usize, +) -> Vec> { + let mut blocks: Vec> = Vec::with_capacity(num_blocks); + let mut current_block: Vec = Vec::new(); + let mut current_vsize: u64 = 0; + + let mut sorted_iter = sorted_pool_indices.into_iter().peekable(); + let mut modified_queue: BinaryHeap = BinaryHeap::new(); + + 'outer: loop { + // Pick best candidate from either sorted list or modified queue + let best_pool_idx = pick_best_candidate(pool, &mut sorted_iter, &mut modified_queue); + let Some(pool_idx) = best_pool_idx else { + break; + }; + + // Skip if already used + if pool[pool_idx].used { + continue; + } + + // Capture the package rate BEFORE selecting ancestors + // This is the rate that justified this tx (and its ancestors) for inclusion + let package_rate = { + let tx = &pool[pool_idx]; + FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize)) + }; + + // Select this tx and all its unselected ancestors + let selected = select_with_ancestors(pool, pool_idx); + + for sel_pool_idx in selected { + let tx = &pool[sel_pool_idx]; + let tx_vsize = u64::from(tx.vsize); + + // Check if tx fits in current block + if current_vsize + tx_vsize > BLOCK_VSIZE_LIMIT && !current_block.is_empty() { + blocks.push(std::mem::take(&mut current_block)); + current_vsize = 0; + + if blocks.len() >= num_blocks { + // Early exit - we have enough blocks + break 'outer; + } + } + + // Effective fee rate = the package rate at selection time. + // This is the mining score that determined which block this tx lands in. + // For CPFP, both parent and child get the same package rate (the child's score). 
+ current_block.push(SelectedTx { + entries_idx: tx.entries_idx, + effective_fee_rate: package_rate, + }); + current_vsize += tx_vsize; + + // Update descendants' ancestor scores + update_descendants(pool, sel_pool_idx, &mut modified_queue); + } + } + + // Don't forget the last block + if !current_block.is_empty() && blocks.len() < num_blocks { + blocks.push(current_block); + } + + // Post-process: fix fee rate ordering violations between adjacent blocks. + // This handles cases where a tx's score improved after its target block was full. + fix_block_ordering(&mut blocks); + + // Log how many txs were left unselected + let total_selected: usize = blocks.iter().map(|b| b.len()).sum(); + log::debug!( + "Selected {} txs into {} blocks, modified_queue has {} remaining", + total_selected, + blocks.len(), + modified_queue.len() + ); + + blocks +} + +/// Pick the best candidate from sorted list or modified queue. +/// Returns a pool index. +fn pick_best_candidate( + pool: &Pool, + sorted_iter: &mut std::iter::Peekable>, + modified_queue: &mut BinaryHeap, +) -> Option { + // Skip used txs in sorted iterator + while sorted_iter.peek().is_some_and(|&idx| pool[idx].used) { + sorted_iter.next(); + } + + // Skip used txs and stale entries in modified queue. + // A tx can be pushed multiple times as its score improves (when different ancestors are selected). + // For example: tx C depends on A and B. When A is selected, C is pushed with score 2.0. + // When B is selected, C is pushed again with score 4.0. The queue now has two entries for C. + // We skip the stale 2.0 entry and use the current 4.0 entry. 
+ while let Some(p) = modified_queue.peek() { + let tx = &pool[p.pool_idx]; + if tx.used { + modified_queue.pop(); + continue; + } + // Check if this queue entry has outdated snapshot (a newer entry exists with better score) + if p.ancestor_fee != tx.ancestor_fee || p.ancestor_vsize != tx.ancestor_vsize { + modified_queue.pop(); + continue; + } + break; + } + + let sorted_best = sorted_iter.peek().map(|&idx| &pool[idx]); + let modified_best = modified_queue.peek().map(|p| &pool[p.pool_idx]); + + match (sorted_best, modified_best) { + (None, None) => None, + (Some(_), None) => sorted_iter.next(), + (None, Some(_)) => { + let p = modified_queue.pop().unwrap(); + Some(p.pool_idx) + } + (Some(sorted_tx), Some(modified_tx)) => { + // Compare CURRENT scores from pool (not stale snapshots) + if sorted_tx.has_higher_score_than(modified_tx) { + sorted_iter.next() + } else { + let p = modified_queue.pop().unwrap(); + Some(p.pool_idx) + } + } + } +} + +/// Select a tx and all its unselected ancestors (topological order). +/// Takes and returns pool indices. +fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec { + let mut to_select: Vec = Vec::new(); + let mut stack = vec![pool_idx]; + + // DFS to find all unselected ancestors + while let Some(current) = stack.pop() { + let tx = &pool[current]; + if tx.used { + continue; + } + + // Push unselected parents onto stack (process parents first) + let has_unselected_parents = tx.parents.iter().any(|&p| !pool[p].used); + if has_unselected_parents { + stack.push(current); // Re-add self to process after parents + for &parent in &tx.parents { + if !pool[parent].used { + stack.push(parent); + } + } + } else { + // All parents selected, can select this one + if !pool[current].used { + pool[current].used = true; + to_select.push(current); + } + } + } + + to_select +} + +/// Fix fee rate ordering violations between blocks. +/// Swaps txs between adjacent blocks until Block N's min >= Block N+1's max. 
+fn fix_block_ordering(blocks: &mut [Vec]) { + // Iterate until no more swaps needed + let mut changed = true; + let mut iterations = 0; + const MAX_ITERATIONS: usize = 100; // Prevent infinite loops + + while changed && iterations < MAX_ITERATIONS { + changed = false; + iterations += 1; + + for i in 0..blocks.len().saturating_sub(1) { + // Find min in block i and max in block i+1 + let Some(curr_min_idx) = blocks[i] + .iter() + .enumerate() + .min_by_key(|(_, s)| s.effective_fee_rate) + .map(|(idx, _)| idx) + else { + continue; + }; + + let Some(next_max_idx) = blocks[i + 1] + .iter() + .enumerate() + .max_by_key(|(_, s)| s.effective_fee_rate) + .map(|(idx, _)| idx) + else { + continue; + }; + + let curr_min = blocks[i][curr_min_idx].effective_fee_rate; + let next_max = blocks[i + 1][next_max_idx].effective_fee_rate; + + // If violation exists, swap the two txs + if next_max > curr_min { + // Swap: move high-fee tx to earlier block, low-fee tx to later block + let high_tx = blocks[i + 1].swap_remove(next_max_idx); + let low_tx = blocks[i].swap_remove(curr_min_idx); + blocks[i].push(high_tx); + blocks[i + 1].push(low_tx); + changed = true; + } + } + } + + if iterations >= MAX_ITERATIONS { + log::warn!("fix_block_ordering: reached max iterations, some violations may remain"); + } +} + +/// Update descendants' ancestor scores after selecting a tx. +/// Takes a pool index. 
+fn update_descendants( + pool: &mut Pool, + selected_pool_idx: PoolIndex, + modified_queue: &mut BinaryHeap, +) { + let selected_fee = pool[selected_pool_idx].fee; + let selected_vsize = pool[selected_pool_idx].vsize; + + // Track visited to avoid double-subtracting in diamond patterns + let mut visited = rustc_hash::FxHashSet::default(); + + // BFS through children (children are pool indices) + let mut stack: Vec = pool[selected_pool_idx].children.to_vec(); + + while let Some(child_idx) = stack.pop() { + // Skip if already visited (handles diamond patterns) + if !visited.insert(child_idx) { + continue; + } + + let child = &mut pool[child_idx]; + + if child.used { + continue; + } + + // Subtract selected tx from ancestor totals + child.ancestor_fee -= selected_fee; + child.ancestor_vsize -= selected_vsize; + + // Always re-push to modified queue with updated score. + // This may create duplicates, but we handle that by checking + // if the tx is used or if the snapshot is stale when popping. + modified_queue.push(TxPriority::new(child)); + child.in_modified = true; + + // Continue to grandchildren + stack.extend(child.children.iter().copied()); + } +} diff --git a/crates/brk_monitor/src/mempool/entry.rs b/crates/brk_monitor/src/mempool/entry.rs index c8735518c..5b3b3768c 100644 --- a/crates/brk_monitor/src/mempool/entry.rs +++ b/crates/brk_monitor/src/mempool/entry.rs @@ -1,62 +1,55 @@ -use brk_types::{FeeRate, Sats, Transaction, Txid, VSize, Vout}; -use rustc_hash::FxHashSet; +use brk_types::{FeeRate, MempoolEntryInfo, Sats, Txid, TxidPrefix, VSize}; -/// (txid, vout) tuple identifying an unspent output in the mempool -pub type MempoolOutpoint = (Txid, Vout); - -/// A mempool transaction with its dependency metadata +/// A mempool transaction entry. +/// +/// Stores only the data needed for fee estimation and block building. +/// Ancestor values are pre-computed by Bitcoin Core (correctly handling shared ancestors). 
#[derive(Debug, Clone)] pub struct MempoolEntry { pub txid: Txid, pub fee: Sats, pub vsize: VSize, - - /// Outpoints this tx spends (inputs) - pub spends: Vec, - - /// Txids of unconfirmed ancestors (parents, grandparents, etc.) - pub ancestors: FxHashSet, - - /// Cumulative fee of this tx + all ancestors + /// Pre-computed ancestor fee (self + all ancestors, no double-counting) pub ancestor_fee: Sats, - - /// Cumulative vsize of this tx + all ancestors + /// Pre-computed ancestor vsize (self + all ancestors, no double-counting) pub ancestor_vsize: VSize, + /// Parent txid prefixes (transactions this tx depends on) + pub depends: Vec, } impl MempoolEntry { - pub fn new(tx: &Transaction) -> Self { - let txid = tx.txid.clone(); - let fee = tx.fee; - let vsize = tx.vsize(); - - let spends = tx - .input - .iter() - .map(|txin| (txin.txid.clone(), txin.vout)) - .collect(); - + pub fn from_info(info: &MempoolEntryInfo) -> Self { Self { - txid, - fee, - vsize, - spends, - ancestors: FxHashSet::default(), - ancestor_fee: fee, - ancestor_vsize: vsize, + txid: info.txid.clone(), + fee: info.fee, + vsize: VSize::from(info.vsize), + ancestor_fee: info.ancestor_fee, + ancestor_vsize: VSize::from(info.ancestor_size), + depends: info.depends.iter().map(TxidPrefix::from).collect(), } } - /// Individual fee rate (without ancestors) #[inline] pub fn fee_rate(&self) -> FeeRate { FeeRate::from((self.fee, self.vsize)) } - /// Ancestor fee rate (fee + ancestors_fee) / (vsize + ancestors_vsize) - /// This is the effective mining priority + /// Ancestor fee rate (package rate for CPFP) #[inline] pub fn ancestor_fee_rate(&self) -> FeeRate { FeeRate::from((self.ancestor_fee, self.ancestor_vsize)) } + + /// Effective fee rate for display - the rate that justified this tx's inclusion. + /// For CPFP parents, this is their ancestor_fee_rate (child paying for them). + /// For regular txs, this is their own fee_rate. 
+ #[inline] + pub fn effective_fee_rate(&self) -> FeeRate { + std::cmp::max(self.fee_rate(), self.ancestor_fee_rate()) + } + + #[inline] + pub fn txid_prefix(&self) -> TxidPrefix { + TxidPrefix::from(&self.txid) + } } diff --git a/crates/brk_monitor/src/mempool/graph.rs b/crates/brk_monitor/src/mempool/graph.rs deleted file mode 100644 index 93b5c968d..000000000 --- a/crates/brk_monitor/src/mempool/graph.rs +++ /dev/null @@ -1,175 +0,0 @@ -use brk_types::{Sats, Transaction, Txid, VSize, Vout}; -use rustc_hash::{FxHashMap, FxHashSet}; - -use super::entry::MempoolOutpoint; -use super::MempoolEntry; - -/// Transaction dependency graph for the mempool -/// -/// Tracks parent-child relationships and computes ancestor feerates -/// for proper CPFP (Child-Pays-For-Parent) handling. -#[derive(Debug, Default)] -pub struct TxGraph { - /// All mempool entries by txid - entries: FxHashMap, - - /// Maps outpoint -> txid that created it (for finding parents) - outpoint_to_tx: FxHashMap, - - /// Maps txid -> txids that spend its outputs (children) - children: FxHashMap>, -} - -impl TxGraph { - pub fn new() -> Self { - Self::default() - } - - pub fn entries(&self) -> &FxHashMap { - &self.entries - } - - pub fn len(&self) -> usize { - self.entries.len() - } - - pub fn is_empty(&self) -> bool { - self.entries.is_empty() - } - - /// Add a transaction to the graph - pub fn insert(&mut self, tx: &Transaction) { - let mut entry = MempoolEntry::new(tx); - - // Find in-mempool parents and build ancestor set - let parents = self.find_parents(&entry.spends); - entry.ancestors = self.compute_ancestors(&parents); - - // Compute ancestor fee/vsize - let (ancestor_fee, ancestor_vsize) = self.sum_ancestors(&entry.ancestors); - entry.ancestor_fee = entry.fee + ancestor_fee; - entry.ancestor_vsize = entry.vsize + ancestor_vsize; - - // Register this tx's outputs - for (vout, _) in tx.output.iter().enumerate() { - let outpoint = (entry.txid.clone(), Vout::from(vout as u32)); - 
self.outpoint_to_tx.insert(outpoint, entry.txid.clone()); - } - - // Register as child of parents - for parent in &parents { - self.children - .entry(parent.clone()) - .or_default() - .insert(entry.txid.clone()); - } - - self.entries.insert(entry.txid.clone(), entry); - } - - /// Remove a transaction from the graph - pub fn remove(&mut self, txid: &Txid) -> Option { - let entry = self.entries.remove(txid)?; - - // Remove from outpoint index - // Note: We don't know the vout count, so we remove all entries pointing to this txid - self.outpoint_to_tx.retain(|_, tx| tx != txid); - - // Remove from children index - self.children.remove(txid); - for children_set in self.children.values_mut() { - children_set.remove(txid); - } - - // Update descendants' ancestor data - self.update_descendants_after_removal(txid, &entry); - - Some(entry) - } - - /// Check if a txid is in the mempool - pub fn contains(&self, txid: &Txid) -> bool { - self.entries.contains_key(txid) - } - - /// Get all txids currently in the graph - pub fn txids(&self) -> impl Iterator { - self.entries.keys() - } - - /// Find which inputs reference in-mempool transactions (parents) - fn find_parents(&self, spends: &[MempoolOutpoint]) -> Vec { - spends - .iter() - .filter_map(|outpoint| self.outpoint_to_tx.get(outpoint).cloned()) - .collect() - } - - /// Compute full ancestor set (transitive closure) - fn compute_ancestors(&self, parents: &[Txid]) -> FxHashSet { - let mut ancestors = FxHashSet::default(); - let mut stack: Vec = parents.to_vec(); - - while let Some(txid) = stack.pop() { - if ancestors.insert(txid.clone()) { - if let Some(entry) = self.entries.get(&txid) { - stack.extend(entry.ancestors.iter().cloned()); - } - } - } - - ancestors - } - - /// Sum fee and vsize of all ancestors - fn sum_ancestors(&self, ancestors: &FxHashSet) -> (Sats, VSize) { - ancestors.iter().fold( - (Sats::default(), VSize::default()), - |(fee, vsize), txid| { - if let Some(entry) = self.entries.get(txid) { - (fee + 
entry.fee, vsize + entry.vsize) - } else { - (fee, vsize) - } - }, - ) - } - - /// Update all descendants after removing a transaction - fn update_descendants_after_removal(&mut self, removed: &Txid, removed_entry: &MempoolEntry) { - // Find all descendants - let descendants = self.find_descendants(removed); - - // Update each descendant's ancestor set and cumulative values - for desc_txid in descendants { - if let Some(desc) = self.entries.get_mut(&desc_txid) { - // Remove the removed tx from ancestors - desc.ancestors.remove(removed); - - // Subtract the removed tx's contribution - desc.ancestor_fee = desc.ancestor_fee - removed_entry.fee; - desc.ancestor_vsize = desc.ancestor_vsize - removed_entry.vsize; - } - } - } - - /// Find all descendants of a transaction (children, grandchildren, etc.) - fn find_descendants(&self, txid: &Txid) -> Vec { - let mut descendants = Vec::new(); - let mut stack = vec![txid.clone()]; - let mut visited = FxHashSet::default(); - - while let Some(current) = stack.pop() { - if let Some(children) = self.children.get(¤t) { - for child in children { - if visited.insert(child.clone()) { - descendants.push(child.clone()); - stack.push(child.clone()); - } - } - } - } - - descendants - } -} diff --git a/crates/brk_monitor/src/mempool/mod.rs b/crates/brk_monitor/src/mempool/mod.rs index a892c128e..cb670de27 100644 --- a/crates/brk_monitor/src/mempool/mod.rs +++ b/crates/brk_monitor/src/mempool/mod.rs @@ -1,7 +1,13 @@ +mod block_builder; mod entry; -mod graph; +mod monitor; mod projected_blocks; +mod types; -pub use entry::MempoolEntry; -pub use graph::TxGraph; -pub use projected_blocks::ProjectedBlocks; +// Public API +pub use monitor::{Mempool, MempoolInner}; +pub use projected_blocks::{BlockStats, ProjectedSnapshot}; + +// Crate-internal (used by submodules) +pub(crate) use entry::MempoolEntry; +pub(crate) use types::{MempoolTxIndex, PoolIndex, SelectedTx}; diff --git a/crates/brk_monitor/src/mempool/monitor.rs 
b/crates/brk_monitor/src/mempool/monitor.rs new file mode 100644 index 000000000..8e7083bef --- /dev/null +++ b/crates/brk_monitor/src/mempool/monitor.rs @@ -0,0 +1,341 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, + thread, + time::{Duration, Instant}, +}; + +use brk_error::Result; +use brk_rpc::Client; +use brk_types::{ + AddressBytes, AddressMempoolStats, MempoolEntryInfo, MempoolInfo, RecommendedFees, TxWithHex, + Txid, TxidPrefix, +}; +use derive_deref::Deref; +use log::{error, info}; +use parking_lot::{RwLock, RwLockReadGuard}; +use rustc_hash::{FxHashMap, FxHashSet}; + +use super::block_builder::build_projected_blocks; +use super::entry::MempoolEntry; +use super::projected_blocks::{BlockStats, ProjectedSnapshot}; +use super::types::MempoolTxIndex; + +/// Max new txs to fetch full data for per update cycle (for address tracking) +const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; + +/// Minimum interval between rebuilds (milliseconds) +const MIN_REBUILD_INTERVAL_MS: u64 = 1000; + +/// Block building state - grouped for atomic locking. +#[derive(Default)] +struct BlockBuildingState { + /// Slot-based entry storage + entries: Vec>, + /// TxidPrefix -> slot index + txid_prefix_to_idx: FxHashMap, + /// Recycled slot indices + free_indices: Vec, +} + +/// Mempool monitor. +/// +/// Thread-safe wrapper around `MempoolInner`. Free to clone. +#[derive(Clone, Deref)] +pub struct Mempool(Arc); + +impl Mempool { + pub fn new(client: &Client) -> Self { + Self(Arc::new(MempoolInner::new(client.clone()))) + } +} + +/// Inner mempool state and logic. 
+pub struct MempoolInner { + client: Client, + + // Mempool state + info: RwLock, + txs: RwLock>, + addresses: RwLock)>>, + + // Block building data (single lock for consistency) + block_state: RwLock, + + // Projected blocks snapshot + snapshot: RwLock, + + // Rate limiting + dirty: AtomicBool, + last_rebuild_ms: AtomicU64, +} + +impl MempoolInner { + pub fn new(client: Client) -> Self { + Self { + client, + info: RwLock::new(MempoolInfo::default()), + txs: RwLock::new(FxHashMap::default()), + addresses: RwLock::new(FxHashMap::default()), + block_state: RwLock::new(BlockBuildingState::default()), + snapshot: RwLock::new(ProjectedSnapshot::default()), + dirty: AtomicBool::new(false), + last_rebuild_ms: AtomicU64::new(0), + } + } + + pub fn get_info(&self) -> MempoolInfo { + self.info.read().clone() + } + + pub fn get_fees(&self) -> RecommendedFees { + self.snapshot.read().fees.clone() + } + + pub fn get_snapshot(&self) -> ProjectedSnapshot { + self.snapshot.read().clone() + } + + pub fn get_block_stats(&self) -> Vec { + self.snapshot.read().block_stats.clone() + } + + pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap> { + self.txs.read() + } + + pub fn get_addresses( + &self, + ) -> RwLockReadGuard<'_, FxHashMap)>> { + self.addresses.read() + } + + /// Start an infinite update loop with a 1 second interval. + pub fn start(&self) { + loop { + if let Err(e) = self.update() { + error!("Error updating mempool: {}", e); + } + thread::sleep(Duration::from_secs(1)); + } + } + + /// Sync with Bitcoin Core mempool and rebuild projections if needed. 
+ pub fn update(&self) -> Result<()> { + // Single RPC call gets all entries with fees + let entries_info = self.client.get_raw_mempool_verbose()?; + + let current_txids: FxHashSet = entries_info.iter().map(|e| e.txid.clone()).collect(); + + // Find new txids and fetch full tx data for address tracking + let new_txs = self.fetch_new_txs(¤t_txids); + + // Apply changes using the entry info (has fees) and full txs (for addresses) + let has_changes = self.apply_changes(&entries_info, &new_txs); + + if has_changes { + self.dirty.store(true, Ordering::Release); + } + + self.rebuild_if_needed(); + + Ok(()) + } + + /// Fetch full transaction data for new txids (needed for address tracking). + fn fetch_new_txs(&self, current_txids: &FxHashSet) -> FxHashMap { + let txs = self.txs.read(); + current_txids + .iter() + .filter(|txid| !txs.contains_key(*txid)) + .take(MAX_TX_FETCHES_PER_CYCLE) + .cloned() + .collect::>() + .into_iter() + .filter_map(|txid| { + self.client + .get_mempool_transaction(&txid) + .ok() + .map(|tx| (txid, tx)) + }) + .collect() + } + + /// Apply transaction additions and removals. Returns true if there were changes. 
+ fn apply_changes( + &self, + entries_info: &[MempoolEntryInfo], + new_txs: &FxHashMap, + ) -> bool { + // Build lookup map for current entries + let current_entries: FxHashMap = entries_info + .iter() + .map(|e| (TxidPrefix::from(&e.txid), e)) + .collect(); + + let mut info = self.info.write(); + let mut txs = self.txs.write(); + let mut addresses = self.addresses.write(); + let mut block_state = self.block_state.write(); + + let mut had_removals = false; + let had_additions = !new_txs.is_empty(); + + // Remove transactions no longer in mempool + txs.retain(|txid, tx_with_hex| { + let prefix = TxidPrefix::from(txid); + if current_entries.contains_key(&prefix) { + return true; + } + + had_removals = true; + let tx = tx_with_hex.tx(); + + info.remove(tx); + Self::update_address_stats_on_removal(tx, txid, &mut addresses); + + // Remove from slot-based storage + if let Some(idx) = block_state.txid_prefix_to_idx.remove(&prefix) { + if let Some(slot) = block_state.entries.get_mut(idx.as_usize()) { + *slot = None; + } + block_state.free_indices.push(idx); + } + + false + }); + + // Add new transactions + for (txid, tx_with_hex) in new_txs { + let tx = tx_with_hex.tx(); + let prefix = TxidPrefix::from(txid); + + // Get the entry info (has fee from Bitcoin Core) + let Some(entry_info) = current_entries.get(&prefix) else { + continue; + }; + + let entry = MempoolEntry::from_info(entry_info); + + info.add(tx); + Self::update_address_stats_on_addition(tx, txid, &mut addresses); + + // Allocate slot + let idx = if let Some(idx) = block_state.free_indices.pop() { + block_state.entries[idx.as_usize()] = Some(entry); + idx + } else { + let idx = MempoolTxIndex::from(block_state.entries.len()); + block_state.entries.push(Some(entry)); + idx + }; + + block_state.txid_prefix_to_idx.insert(prefix, idx); + } + txs.extend(new_txs.clone()); + + had_removals || had_additions + } + + /// Rebuild projected blocks if dirty and enough time has passed. 
+ fn rebuild_if_needed(&self) { + if !self.dirty.load(Ordering::Acquire) { + return; + } + + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + + let last = self.last_rebuild_ms.load(Ordering::Acquire); + if now_ms.saturating_sub(last) < MIN_REBUILD_INTERVAL_MS { + return; + } + + // Attempt to claim the rebuild (atomic compare-exchange) + if self + .last_rebuild_ms + .compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed) + .is_err() + { + return; // Another thread is rebuilding + } + + self.dirty.store(false, Ordering::Release); + + let i = Instant::now(); + self.rebuild_projected_blocks(); + info!("mempool: rebuild_projected_blocks in {:?}", i.elapsed()); + } + + /// Rebuild projected blocks snapshot. + fn rebuild_projected_blocks(&self) { + let block_state = self.block_state.read(); + + let blocks = build_projected_blocks(&block_state.entries); + let snapshot = ProjectedSnapshot::build(blocks, &block_state.entries); + + *self.snapshot.write() = snapshot; + } + + fn update_address_stats_on_removal( + tx: &brk_types::Transaction, + txid: &Txid, + addresses: &mut FxHashMap)>, + ) { + // Inputs: undo "sending" state + tx.input + .iter() + .flat_map(|txin| txin.prevout.as_ref()) + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.remove(txid); + stats.sent(txout); + stats.update_tx_count(set.len() as u32); + }); + + // Outputs: undo "receiving" state + tx.output + .iter() + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.remove(txid); + stats.received(txout); + stats.update_tx_count(set.len() as u32); + }); + } + + fn update_address_stats_on_addition( + tx: &brk_types::Transaction, + txid: &Txid, + addresses: &mut FxHashMap)>, + ) { + // Inputs: 
mark as "sending" + tx.input + .iter() + .flat_map(|txin| txin.prevout.as_ref()) + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.insert(txid.clone()); + stats.sending(txout); + stats.update_tx_count(set.len() as u32); + }); + + // Outputs: mark as "receiving" + tx.output + .iter() + .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) + .for_each(|(txout, bytes)| { + let (stats, set) = addresses.entry(bytes).or_default(); + set.insert(txid.clone()); + stats.receiving(txout); + stats.update_tx_count(set.len() as u32); + }); + } +} diff --git a/crates/brk_monitor/src/mempool/projected_blocks.rs b/crates/brk_monitor/src/mempool/projected_blocks.rs index 1e705d5ac..c6c11d268 100644 --- a/crates/brk_monitor/src/mempool/projected_blocks.rs +++ b/crates/brk_monitor/src/mempool/projected_blocks.rs @@ -1,135 +1,137 @@ -use std::mem; +use brk_types::{FeeRate, RecommendedFees, Sats, VSize}; -use brk_types::{FeeRate, RecommendedFees, Sats, Txid, VSize}; -use rustc_hash::FxHashSet; +use super::{MempoolEntry, MempoolTxIndex, SelectedTx}; -use super::TxGraph; +/// Minimum fee rate for estimation (sat/vB) +const MIN_FEE_RATE: f64 = 1.0; -/// Maximum block weight in weight units (4 million) -const MAX_BLOCK_WEIGHT: u64 = 4_000_000; - -/// Target block vsize (weight / 4) -const BLOCK_VSIZE_TARGET: u64 = MAX_BLOCK_WEIGHT / 4; - -/// Number of projected blocks to build -const NUM_PROJECTED_BLOCKS: usize = 8; - -/// Minimum fee rate (no priority) -const MIN_FEE_RATE: f64 = 0.1; - -/// A projected future block built from mempool transactions +/// Immutable snapshot of projected blocks. +/// Stores indices into live entries + pre-computed stats. 
#[derive(Debug, Clone, Default)] -pub struct ProjectedBlock { - pub txids: Vec, +pub struct ProjectedSnapshot { + /// Block structure: indices into entries Vec + pub blocks: Vec>, + /// Pre-computed stats per block + pub block_stats: Vec, + /// Pre-computed fee recommendations + pub fees: RecommendedFees, +} + +/// Statistics for a single projected block. +#[derive(Debug, Clone, Default)] +pub struct BlockStats { + pub tx_count: u32, pub total_vsize: VSize, pub total_fee: Sats, - pub min_fee_rate: FeeRate, - pub max_fee_rate: FeeRate, - pub median_fee_rate: FeeRate, + /// Fee rate percentiles: [0%, 10%, 25%, 50%, 75%, 90%, 100%] + /// - fee_range[0] = min, fee_range[3] = median, fee_range[6] = max + pub fee_range: [FeeRate; 7], } -/// Projected mempool blocks for fee estimation -#[derive(Debug, Clone, Default)] -pub struct ProjectedBlocks { - pub blocks: Vec, +impl BlockStats { + pub fn min_fee_rate(&self) -> FeeRate { + self.fee_range[0] + } + + pub fn median_fee_rate(&self) -> FeeRate { + self.fee_range[3] + } + + pub fn max_fee_rate(&self) -> FeeRate { + self.fee_range[6] + } } -impl ProjectedBlocks { - /// Build projected blocks from a transaction graph - /// - /// Simulates how miners would construct blocks by selecting - /// transactions with highest ancestor fee rates first. - pub fn build(graph: &TxGraph) -> Self { - if graph.is_empty() { - return Self::default(); - } - - // Collect entries sorted by ancestor fee rate (descending) - let mut sorted: Vec<_> = graph - .entries() +impl ProjectedSnapshot { + /// Build snapshot from selected transactions (with effective fee rates) and entries. 
+ pub fn build(blocks: Vec>, entries: &[Option]) -> Self { + let block_stats: Vec = blocks .iter() - .map(|(txid, entry)| { - ( - txid.clone(), - entry.ancestor_fee_rate(), - entry.vsize, - entry.fee, - ) - }) + .map(|selected| compute_block_stats(selected, entries)) .collect(); - sorted.sort_by(|a, b| b.1.cmp(&a.1)); + let fees = compute_recommended_fees(&block_stats); - // Build blocks greedily - let mut blocks = Vec::with_capacity(NUM_PROJECTED_BLOCKS); - let mut current_block = ProjectedBlock::default(); - let mut included: FxHashSet = FxHashSet::default(); + // Convert to just indices for storage + let blocks: Vec> = blocks + .into_iter() + .map(|selected| selected.into_iter().map(|s| s.entries_idx).collect()) + .collect(); - for (txid, fee_rate, vsize, fee) in sorted { - // Skip if already included (as part of ancestor package) - if included.contains(&txid) { - continue; - } - - // Would this tx fit in the current block? - let new_vsize = current_block.total_vsize + vsize; - - if u64::from(new_vsize) > BLOCK_VSIZE_TARGET && !current_block.txids.is_empty() { - // Finalize and store current block - Self::finalize_block(&mut current_block); - blocks.push(mem::take(&mut current_block)); - - if blocks.len() >= NUM_PROJECTED_BLOCKS { - break; - } - } - - // Add to current block - current_block.txids.push(txid.clone()); - current_block.total_vsize += vsize; - current_block.total_fee += fee; - included.insert(txid); - - // Track fee rate bounds - if current_block.max_fee_rate == FeeRate::default() { - current_block.max_fee_rate = fee_rate; - } - current_block.min_fee_rate = fee_rate; + Self { + blocks, + block_stats, + fees, } - - // Don't forget the last block - if !current_block.txids.is_empty() && blocks.len() < NUM_PROJECTED_BLOCKS { - Self::finalize_block(&mut current_block); - blocks.push(current_block); - } - - Self { blocks } - } - - /// Compute recommended fees from projected blocks - pub fn recommended_fees(&self) -> RecommendedFees { - RecommendedFees { - 
fastest_fee: self.fee_for_block(0), - half_hour_fee: self.fee_for_block(2), // ~3 blocks - hour_fee: self.fee_for_block(5), // ~6 blocks - economy_fee: self.fee_for_block(7), // ~8 blocks - minimum_fee: FeeRate::from(MIN_FEE_RATE), - } - } - - /// Get the minimum fee rate needed to get into block N - fn fee_for_block(&self, block_index: usize) -> FeeRate { - self.blocks - .get(block_index) - .map(|b| b.min_fee_rate) - .unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE)) - } - - fn finalize_block(block: &mut ProjectedBlock) { - // Compute median fee rate from min/max as approximation - // (true median would require storing all fee rates) - let min = f64::from(block.min_fee_rate); - let max = f64::from(block.max_fee_rate); - block.median_fee_rate = FeeRate::from((min + max) / 2.0); } } + +/// Compute statistics for a single block using effective fee rates from selection time. +fn compute_block_stats(selected: &[SelectedTx], entries: &[Option]) -> BlockStats { + if selected.is_empty() { + return BlockStats::default(); + } + + let mut total_fee = Sats::default(); + let mut total_vsize = VSize::default(); + let mut fee_rates: Vec = Vec::with_capacity(selected.len()); + + for sel in selected { + if let Some(entry) = &entries[sel.entries_idx.as_usize()] { + total_fee += entry.fee; + total_vsize += entry.vsize; + // Use the effective fee rate captured at selection time + // This is the actual mining score that determined this tx's block placement + fee_rates.push(sel.effective_fee_rate); + } + } + + fee_rates.sort(); + + BlockStats { + tx_count: selected.len() as u32, + total_vsize, + total_fee, + fee_range: [ + percentile(&fee_rates, 0), + percentile(&fee_rates, 10), + percentile(&fee_rates, 25), + percentile(&fee_rates, 50), + percentile(&fee_rates, 75), + percentile(&fee_rates, 90), + percentile(&fee_rates, 100), + ], + } +} + +/// Get percentile value from sorted array. 
+fn percentile(sorted: &[FeeRate], p: usize) -> FeeRate { + if sorted.is_empty() { + return FeeRate::default(); + } + let idx = (p * (sorted.len() - 1)) / 100; + sorted[idx] +} + +/// Compute recommended fees from block stats (mempool.space style). +fn compute_recommended_fees(stats: &[BlockStats]) -> RecommendedFees { + RecommendedFees { + // High priority: median of block 1 + fastest_fee: median_fee_for_block(stats, 0), + // Medium priority: median of blocks 2-3 + half_hour_fee: median_fee_for_block(stats, 2), + // Low priority: median of blocks 4-6 + hour_fee: median_fee_for_block(stats, 5), + // No priority: median of later blocks + economy_fee: median_fee_for_block(stats, 7), + minimum_fee: FeeRate::from(MIN_FEE_RATE), + } +} + +/// Get the median fee rate for block N. +fn median_fee_for_block(stats: &[BlockStats], block_index: usize) -> FeeRate { + stats + .get(block_index) + .map(|s| s.median_fee_rate()) + .unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE)) +} diff --git a/crates/brk_monitor/src/mempool/types.rs b/crates/brk_monitor/src/mempool/types.rs new file mode 100644 index 000000000..051d6c8d2 --- /dev/null +++ b/crates/brk_monitor/src/mempool/types.rs @@ -0,0 +1,47 @@ +/// Index into the mempool entries Vec. +/// NOT the global TxIndex for confirmed transactions. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct MempoolTxIndex(pub(crate) u32); + +impl MempoolTxIndex { + #[inline] + pub fn as_usize(self) -> usize { + self.0 as usize + } +} + +impl From for MempoolTxIndex { + #[inline] + fn from(value: usize) -> Self { + Self(value as u32) + } +} + +/// Index into the temporary pool Vec used during block building. +/// Distinct from MempoolTxIndex to prevent mixing up index spaces. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct PoolIndex(u32); + +impl PoolIndex { + #[inline] + pub fn as_usize(self) -> usize { + self.0 as usize + } +} + +impl From for PoolIndex { + #[inline] + fn from(value: usize) -> Self { + Self(value as u32) + } +} + +/// A selected transaction with its effective mining score at selection time. +/// The effective_fee_rate is the ancestor score when this tx was selected, +/// which may differ from the original ancestor score (if ancestors were already mined). +#[derive(Debug, Clone, Copy)] +pub struct SelectedTx { + pub entries_idx: MempoolTxIndex, + /// Fee rate at selection time (ancestor_fee / ancestor_vsize) + pub effective_fee_rate: brk_types::FeeRate, +} diff --git a/crates/brk_rpc/src/lib.rs b/crates/brk_rpc/src/lib.rs index ef2d5bc59..752927b3e 100644 --- a/crates/brk_rpc/src/lib.rs +++ b/crates/brk_rpc/src/lib.rs @@ -9,7 +9,10 @@ use bitcoincore_rpc::{ {Client as CoreClient, Error as RpcError, RpcApi}, }; use brk_error::Result; -use brk_types::{BlockHash, Height, Sats, Transaction, TxIn, TxOut, TxStatus, TxWithHex, Txid, Vout}; +use brk_types::{ + BlockHash, Height, MempoolEntryInfo, Sats, Transaction, TxIn, TxOut, TxStatus, TxWithHex, Txid, + Vout, +}; pub use bitcoincore_rpc::Auth; @@ -196,6 +199,24 @@ impl Client { .map_err(Into::into) } + /// Get all mempool entries with their fee data in a single RPC call + pub fn get_raw_mempool_verbose(&self) -> Result> { + let result = self.call(|c| c.get_raw_mempool_verbose())?; + Ok(result + .into_iter() + .map(|(txid, entry)| MempoolEntryInfo { + txid: txid.into(), + vsize: entry.vsize, + weight: entry.weight.unwrap_or(entry.vsize * 4), + fee: Sats::from(entry.fees.base.to_sat()), + ancestor_count: entry.ancestor_count, + ancestor_size: entry.ancestor_size, + ancestor_fee: Sats::from(entry.fees.ancestor.to_sat()), + depends: entry.depends.into_iter().map(Txid::from).collect(), + }) + .collect()) + } + pub fn get_raw_transaction<'a, T, 
H>( &self, txid: &'a T, diff --git a/crates/brk_types/src/lib.rs b/crates/brk_types/src/lib.rs index 9b2df6f23..97adefddc 100644 --- a/crates/brk_types/src/lib.rs +++ b/crates/brk_types/src/lib.rs @@ -43,6 +43,7 @@ mod limit; mod loadedaddressdata; mod loadedaddressindex; mod metric; +mod mempoolentryinfo; mod mempoolinfo; mod metriccount; mod metrics; @@ -151,6 +152,7 @@ pub use indexinfo::*; pub use limit::*; pub use loadedaddressdata::*; pub use loadedaddressindex::*; +pub use mempoolentryinfo::*; pub use mempoolinfo::*; pub use metric::*; pub use metriccount::*; diff --git a/crates/brk_types/src/mempoolentryinfo.rs b/crates/brk_types/src/mempoolentryinfo.rs new file mode 100644 index 000000000..017178b2f --- /dev/null +++ b/crates/brk_types/src/mempoolentryinfo.rs @@ -0,0 +1,15 @@ +use crate::{Sats, Txid}; + +/// Mempool entry info from Bitcoin Core's getrawmempool verbose +#[derive(Debug, Clone)] +pub struct MempoolEntryInfo { + pub txid: Txid, + pub vsize: u64, + pub weight: u64, + pub fee: Sats, + pub ancestor_count: u64, + pub ancestor_size: u64, + pub ancestor_fee: Sats, + /// Parent txids in the mempool + pub depends: Vec, +} diff --git a/docs/mempool-api-status.md b/docs/mempool-api-status.md new file mode 100644 index 000000000..c068c8425 --- /dev/null +++ b/docs/mempool-api-status.md @@ -0,0 +1,133 @@ +# Mempool.space API Compatibility - Implementation Status + +Plan file: `/Users/k/.claude/plans/smooth-weaving-crayon.md` + +## Completed Endpoints + +| Endpoint | Path | Notes | +|----------|------|-------| +| GET Block | `/api/block/{hash}` | | +| GET Block Height | `/api/block-height/{height}` | Returns plain text hash | +| GET Block Status | `/api/block/{hash}/status` | | +| GET Block Txids | `/api/block/{hash}/txids` | | +| GET Blocks | `/api/blocks[/:start_height]` | Last 10 blocks | +| GET Transaction | `/api/tx/{txid}` | | +| GET Tx Status | `/api/tx/{txid}/status` | | +| GET Tx Hex | `/api/tx/{txid}/hex` | Returns plain text | +| GET Address | 
`/api/address/{address}` | | +| GET Address Txs | `/api/address/{address}/txs` | | +| GET Address UTXOs | `/api/address/{address}/utxo` | | +| GET Mempool Info | `/api/mempool/info` | | +| GET Mempool Txids | `/api/mempool/txids` | | +| GET Recommended Fees | `/api/v1/fees/recommended` | Basic impl, needs optimization | + +## Remaining Endpoints + +### Mempool/Fees (4) + +| # | Endpoint | Path | Dependencies | Priority | +|---|----------|------|--------------|----------| +| 1 | Optimize projected blocks | - | CPFP/ancestor scores | HIGH | +| 2 | GET Mempool Blocks | `/api/v1/fees/mempool-blocks` | #1 | HIGH | +| 3 | GET Mempool Recent | `/api/mempool/recent` | | MED | +| 4 | GET RBF Replacements | `/api/v1/replacements` | RBF tracking in brk_monitor | LOW | + +### Blocks (4) + +| # | Endpoint | Path | Dependencies | Priority | +|---|----------|------|--------------|----------| +| 5 | GET Block Txs | `/api/block/{hash}/txs[/:start_index]` | | MED | +| 6 | GET Block Txid at Index | `/api/block/{hash}/txid/{index}` | | LOW | +| 7 | GET Block Raw | `/api/block/{hash}/raw` | brk_reader | LOW | +| 8 | GET Block by Timestamp | `/api/v1/mining/blocks/timestamp/{timestamp}` | Binary search | LOW | + +### Addresses (3) + +| # | Endpoint | Path | Dependencies | Priority | +|---|----------|------|--------------|----------| +| 9 | GET Address Txs Chain | `/api/address/{address}/txs/chain[/:after_txid]` | | MED | +| 10 | GET Address Txs Mempool | `/api/address/{address}/txs/mempool` | brk_monitor | MED | +| 11 | GET Validate Address | `/api/v1/validate-address/{address}` | | LOW | + +### Transactions (4) + +| # | Endpoint | Path | Dependencies | Priority | +|---|----------|------|--------------|----------| +| 12 | GET Tx Outspend | `/api/tx/{txid}/outspend/{vout}` | #27 txoutindex_to_txinindex | HIGH | +| 13 | GET Tx Outspends | `/api/tx/{txid}/outspends` | #27 | HIGH | +| 14 | GET Tx Merkle Proof | `/api/tx/{txid}/merkle-proof` | | LOW | +| 15 | POST Tx Broadcast | `/api/tx` | 
brk_rpc | MED | + +### General (1) + +| # | Endpoint | Path | Dependencies | Priority | +|---|----------|------|--------------|----------| +| 16 | GET Difficulty Adjustment | `/api/v1/difficulty-adjustment` | | MED | + +### Mining (9) + +| # | Endpoint | Path | Dependencies | Priority | +|---|----------|------|--------------|----------| +| 17 | GET Mining Pools | `/api/v1/mining/pools[/:timePeriod]` | #28 pool identification | LOW | +| 18 | GET Mining Pool | `/api/v1/mining/pool/{slug}` | #28 | LOW | +| 19 | GET Hashrate | `/api/v1/mining/hashrate[/:timePeriod]` | | MED | +| 20 | GET Difficulty Adjustments | `/api/v1/mining/difficulty-adjustments[/:interval]` | | LOW | +| 21 | GET Reward Stats | `/api/v1/mining/reward-stats/{blockCount}` | | LOW | +| 22 | GET Block Fees | `/api/v1/mining/blocks/fees/{timePeriod}` | | LOW | +| 23 | GET Block Rewards | `/api/v1/mining/blocks/rewards/{timePeriod}` | | LOW | +| 24 | GET Block Fee Rates | `/api/v1/mining/blocks/fee-rates/{timePeriod}` | | LOW | +| 25 | GET Block Sizes/Weights | `/api/v1/mining/blocks/sizes-weights/{timePeriod}` | | LOW | + +### Infrastructure (3) + +| # | Task | Location | Priority | +|---|------|----------|----------| +| 26 | Index txindex_to_sigop_cost | brk_indexer | MED | +| 27 | Add txoutindex_to_txinindex mapping | brk_computer/stateful | HIGH | +| 28 | Pool identification from coinbase | brk_computer | LOW | + +## Priority Order + +### Phase 1: Core Functionality (HIGH) +1. **#27** Add txoutindex_to_txinindex mapping (enables outspend lookups) +2. **#12** GET Tx Outspend +3. **#13** GET Tx Outspends +4. **#1** Optimize projected blocks (CPFP/ancestor scores) +5. **#2** GET Mempool Blocks + +### Phase 2: Essential Features (MED) +6. **#15** POST Tx Broadcast +7. **#16** GET Difficulty Adjustment +8. **#5** GET Block Txs (paginated) +9. **#9** GET Address Txs Chain +10. **#10** GET Address Txs Mempool +11. **#19** GET Hashrate +12. **#26** Index txindex_to_sigop_cost +13. 
**#3** GET Mempool Recent + +### Phase 3: Nice to Have (LOW) +14. **#6** GET Block Txid at Index +15. **#7** GET Block Raw +16. **#8** GET Block by Timestamp +17. **#11** GET Validate Address +18. **#14** GET Tx Merkle Proof +19. **#4** GET RBF Replacements +20. **#20** GET Difficulty Adjustments +21. **#21** GET Reward Stats +22. **#22-25** Mining block statistics +23. **#17-18** Mining pools (requires #28) +24. **#28** Pool identification + +## Design Documents + +- Mempool projected blocks: `crates/brk_monitor/src/mempool/DESIGN.md` + +## Skipped Endpoints + +| Endpoint | Reason | +|----------|--------| +| GET Price | `/api/v1/prices` | External data source needed | +| GET Historical Price | `/api/v1/historical-price` | External data source needed | +| GET Full-RBF Replacements | `/api/v1/fullrbf/replacements` | Low priority | +| Lightning endpoints | Requires separate Lightning indexing | +| Accelerator endpoints | mempool.space-specific paid service |