diff --git a/crates/brk_monitor/src/block_builder/graph.rs b/crates/brk_monitor/src/block_builder/graph.rs new file mode 100644 index 000000000..d3c18e91a --- /dev/null +++ b/crates/brk_monitor/src/block_builder/graph.rs @@ -0,0 +1,96 @@ +use std::ops::{Index, IndexMut}; + +use brk_types::TxidPrefix; +use rustc_hash::FxHashMap; + +use super::tx_node::TxNode; +use crate::mempool::Entry; +use crate::types::{PoolIndex, TxIndex}; + +/// Type-safe wrapper around Vec that only allows PoolIndex access. +pub struct Graph(Vec); + +impl Graph { + #[inline] + pub fn len(&self) -> usize { + self.0.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl Index for Graph { + type Output = TxNode; + + #[inline] + fn index(&self, idx: PoolIndex) -> &Self::Output { + &self.0[idx.as_usize()] + } +} + +impl IndexMut for Graph { + #[inline] + fn index_mut(&mut self, idx: PoolIndex) -> &mut Self::Output { + &mut self.0[idx.as_usize()] + } +} + +/// Build a dependency graph from mempool entries. +pub fn build_graph(entries: &[Option]) -> Graph { + // Collect live entries with their indices + let live: Vec<(TxIndex, &Entry)> = entries + .iter() + .enumerate() + .filter_map(|(i, opt)| opt.as_ref().map(|e| (TxIndex::from(i), e))) + .collect(); + + if live.is_empty() { + return Graph(Vec::new()); + } + + // Map TxidPrefix -> PoolIndex for parent lookups + let prefix_to_pool: FxHashMap = live + .iter() + .enumerate() + .map(|(i, (_, entry))| (entry.txid_prefix(), PoolIndex::from(i))) + .collect(); + + // Build nodes with parent relationships + let mut nodes: Vec = live + .iter() + .enumerate() + .map(|(pool_idx, (tx_index, entry))| { + let pool_index = PoolIndex::from(pool_idx); + let mut node = TxNode::new( + *tx_index, + pool_index, + entry.fee, + entry.vsize, + entry.ancestor_fee, + entry.ancestor_vsize, + ); + + // Add in-mempool parents + for parent_prefix in &entry.depends { + if let Some(&parent_pool_idx) = prefix_to_pool.get(parent_prefix) { + node.parents.push(parent_pool_idx); + } + } + + node + }) + .collect(); + + // Build child relationships (reverse of parents) + for i in 0..nodes.len() { + let parents = nodes[i].parents.clone(); + for parent_idx in parents { + nodes[parent_idx.as_usize()].children.push(PoolIndex::from(i)); + } + } + + Graph(nodes) +} diff --git a/crates/brk_monitor/src/block_builder/heap_entry.rs b/crates/brk_monitor/src/block_builder/heap_entry.rs new file mode 100644 index 000000000..14b9adfc3 --- /dev/null +++ b/crates/brk_monitor/src/block_builder/heap_entry.rs @@ -0,0 +1,71 @@ +use brk_types::{Sats, VSize}; + +use super::tx_node::TxNode; +use crate::types::PoolIndex; + +/// Entry in the priority heap for transaction selection. +/// +/// Stores a snapshot of the score at insertion time. +/// The generation field detects stale entries after ancestor updates. +#[derive(Clone, Copy)] +pub struct HeapEntry { + pub pool_index: PoolIndex, + ancestor_fee: Sats, + ancestor_vsize: VSize, + pub generation: u32, +} + +impl HeapEntry { + pub fn new(node: &TxNode) -> Self { + Self { + pool_index: node.pool_index, + ancestor_fee: node.ancestor_fee, + ancestor_vsize: node.ancestor_vsize, + generation: node.generation, + } + } + + /// Returns true if this entry is outdated. + #[inline] + pub fn is_stale(&self, node: &TxNode) -> bool { + self.generation != node.generation + } + + /// Compare fee rates: self > other? + #[inline] + fn has_higher_fee_rate_than(&self, other: &Self) -> bool { + // Cross multiply to avoid division: + // fee_a/vsize_a > fee_b/vsize_b ⟺ fee_a * vsize_b > fee_b * vsize_a + let self_score = u64::from(self.ancestor_fee) as u128 * u64::from(other.ancestor_vsize) as u128; + let other_score = u64::from(other.ancestor_fee) as u128 * u64::from(self.ancestor_vsize) as u128; + self_score > other_score + } +} + +impl PartialEq for HeapEntry { + fn eq(&self, other: &Self) -> bool { + self.pool_index == other.pool_index + } +} + +impl Eq for HeapEntry {} + +impl PartialOrd for HeapEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for HeapEntry { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Higher fee rate = higher priority + if self.has_higher_fee_rate_than(other) { + std::cmp::Ordering::Greater + } else if other.has_higher_fee_rate_than(self) { + std::cmp::Ordering::Less + } else { + // Tiebreaker: lower index first (deterministic) + other.pool_index.cmp(&self.pool_index) + } + } +} diff --git a/crates/brk_monitor/src/block_builder/mod.rs b/crates/brk_monitor/src/block_builder/mod.rs new file mode 100644 index 000000000..e333b7890 --- /dev/null +++ b/crates/brk_monitor/src/block_builder/mod.rs @@ -0,0 +1,41 @@ +//! Builds projected blocks from mempool transactions. +//! +//! The algorithm: +//! 1. Build a dependency graph from mempool entries +//! 2. Select transactions using a heap (CPFP-aware) +//! 3. Group into atomic packages (parent + child stay together) +//! 4. Partition packages into blocks by fee rate + +mod graph; +mod heap_entry; +mod package; +mod partitioner; +mod selector; +mod tx_node; + +use crate::mempool::Entry; +use crate::types::SelectedTx; + +/// Target vsize per block (~1MB, derived from 4MW weight limit). +const BLOCK_VSIZE: u64 = 1_000_000; + +/// Number of projected blocks to build. +const NUM_BLOCKS: usize = 8; + +/// Build projected blocks from mempool entries. +/// +/// Returns transactions grouped by projected block, sorted by fee rate. +pub fn build_projected_blocks(entries: &[Option]) -> Vec> { + // Build dependency graph + let mut graph = graph::build_graph(entries); + + if graph.is_empty() { + return Vec::new(); + } + + // Select transactions into packages + let packages = selector::select_packages(&mut graph, NUM_BLOCKS); + + // Partition packages into blocks + partitioner::partition_into_blocks(packages, NUM_BLOCKS) +} diff --git a/crates/brk_monitor/src/block_builder/package.rs b/crates/brk_monitor/src/block_builder/package.rs new file mode 100644 index 000000000..c84939adb --- /dev/null +++ b/crates/brk_monitor/src/block_builder/package.rs @@ -0,0 +1,36 @@ +use brk_types::FeeRate; + +use crate::types::{SelectedTx, TxIndex}; + +/// A CPFP package - transactions that must be included together. +/// +/// When a child pays for its parent (CPFP), both must be in the same block. +/// The package fee rate is the combined rate of all transactions. +pub struct Package { + /// Transactions in topological order (parents before children) + pub txs: Vec, + + /// Combined vsize of all transactions + pub vsize: u64, + + /// Package fee rate + pub fee_rate: FeeRate, +} + +impl Package { + pub fn new(fee_rate: FeeRate) -> Self { + Self { + txs: Vec::new(), + vsize: 0, + fee_rate, + } + } + + pub fn add_tx(&mut self, tx_index: TxIndex, vsize: u64) { + self.txs.push(SelectedTx { + tx_index, + effective_fee_rate: self.fee_rate, + }); + self.vsize += vsize; + } +} diff --git a/crates/brk_monitor/src/block_builder/partitioner.rs b/crates/brk_monitor/src/block_builder/partitioner.rs new file mode 100644 index 000000000..bdc10739d --- /dev/null +++ b/crates/brk_monitor/src/block_builder/partitioner.rs @@ -0,0 +1,99 @@ +use super::package::Package; +use super::BLOCK_VSIZE; +use crate::types::SelectedTx; + +/// How many packages to look ahead when current doesn't fit. +const LOOK_AHEAD: usize = 100; + +/// Partition packages into blocks by fee rate. +/// +/// Packages are sorted by fee rate descending, then placed into blocks. +/// When a package doesn't fit, we look ahead for smaller packages that do. +/// Atomic packages are never split across blocks. +pub fn partition_into_blocks(mut packages: Vec, num_blocks: usize) -> Vec> { + packages.sort_unstable_by(|a, b| b.fee_rate.cmp(&a.fee_rate)); + + let mut blocks: Vec> = Vec::with_capacity(num_blocks); + let mut current_block: Vec = Vec::new(); + let mut current_vsize: u64 = 0; + let mut used = vec![false; packages.len()]; + + let mut idx = 0; + while idx < packages.len() && blocks.len() < num_blocks { + if used[idx] { + idx += 1; + continue; + } + + let remaining_space = BLOCK_VSIZE.saturating_sub(current_vsize); + let package = &packages[idx]; + + if package.vsize <= remaining_space { + current_block.extend(package.txs.iter().copied()); + current_vsize += package.vsize; + used[idx] = true; + idx += 1; + continue; + } + + // Package doesn't fit + if current_block.is_empty() { + // Empty block: add oversized package anyway + current_block.extend(package.txs.iter().copied()); + current_vsize += package.vsize; + used[idx] = true; + idx += 1; + continue; + } + + // Look ahead for a smaller package that fits + let found_smaller = try_fill_with_smaller( + &packages, + &mut used, + idx, + remaining_space, + &mut current_block, + &mut current_vsize, + ); + + if !found_smaller { + // No package fits, finalize current block + blocks.push(std::mem::take(&mut current_block)); + current_vsize = 0; + } + } + + if !current_block.is_empty() && blocks.len() < num_blocks { + blocks.push(current_block); + } + + blocks +} + +/// Try to find a smaller package in the look-ahead window that fits. +fn try_fill_with_smaller( + packages: &[Package], + used: &mut [bool], + start: usize, + remaining_space: u64, + block: &mut Vec, + block_vsize: &mut u64, +) -> bool { + let end = (start + LOOK_AHEAD).min(packages.len()); + + for idx in (start + 1)..end { + if used[idx] { + continue; + } + + let package = &packages[idx]; + if package.vsize <= remaining_space { + block.extend(package.txs.iter().copied()); + *block_vsize += package.vsize; + used[idx] = true; + return true; + } + } + + false +} diff --git a/crates/brk_monitor/src/block_builder/selector.rs b/crates/brk_monitor/src/block_builder/selector.rs new file mode 100644 index 000000000..c549750a5 --- /dev/null +++ b/crates/brk_monitor/src/block_builder/selector.rs @@ -0,0 +1,111 @@ +use std::collections::BinaryHeap; + +use brk_types::FeeRate; +use rustc_hash::FxHashSet; + +use super::graph::Graph; +use super::heap_entry::HeapEntry; +use super::package::Package; +use super::BLOCK_VSIZE; +use crate::types::PoolIndex; + +/// Select transactions from the graph and group into CPFP packages. +pub fn select_packages(graph: &mut Graph, num_blocks: usize) -> Vec { + let target_vsize = BLOCK_VSIZE * num_blocks as u64; + let mut total_vsize: u64 = 0; + let mut packages: Vec = Vec::new(); + + // Initialize heap with all transactions + let mut heap: BinaryHeap = (0..graph.len()) + .map(|i| HeapEntry::new(&graph[PoolIndex::from(i)])) + .collect(); + + while let Some(entry) = heap.pop() { + let node = &graph[entry.pool_index]; + + // Skip if already selected or entry is stale + if node.selected || entry.is_stale(node) { + continue; + } + + // Package fee rate at selection time + let package_rate = FeeRate::from((node.ancestor_fee, node.ancestor_vsize)); + + // Select this tx and all unselected ancestors (parents first) + let ancestors = select_with_ancestors(graph, entry.pool_index); + + let mut package = Package::new(package_rate); + for pool_idx in ancestors { + let vsize = u64::from(graph[pool_idx].vsize); + package.add_tx(graph[pool_idx].tx_index, vsize); + update_descendants(graph, pool_idx, &mut heap); + } + + total_vsize += package.vsize; + packages.push(package); + + if total_vsize >= target_vsize { + break; + } + } + + packages +} + +/// Select a tx and all its unselected ancestors in topological order. +fn select_with_ancestors(graph: &mut Graph, pool_idx: PoolIndex) -> Vec { + let mut result: Vec = Vec::new(); + let mut stack: Vec<(PoolIndex, bool)> = vec![(pool_idx, false)]; + + while let Some((idx, parents_done)) = stack.pop() { + if graph[idx].selected { + continue; + } + + if parents_done { + graph[idx].selected = true; + result.push(idx); + } else { + stack.push((idx, true)); + for &parent in &graph[idx].parents { + if !graph[parent].selected { + stack.push((parent, false)); + } + } + } + } + + result +} + +/// Update descendants' ancestor scores after selecting a tx. +fn update_descendants(graph: &mut Graph, selected_idx: PoolIndex, heap: &mut BinaryHeap) { + let selected_fee = graph[selected_idx].fee; + let selected_vsize = graph[selected_idx].vsize; + + // Track visited to avoid double-updates in diamond patterns + let mut visited: FxHashSet = FxHashSet::default(); + let mut stack: Vec = graph[selected_idx].children.to_vec(); + + while let Some(child_idx) = stack.pop() { + if !visited.insert(child_idx) { + continue; + } + + let child = &mut graph[child_idx]; + if child.selected { + continue; + } + + // Update ancestor totals + child.ancestor_fee -= selected_fee; + child.ancestor_vsize -= selected_vsize; + + // Increment generation and re-push to heap + child.generation += 1; + heap.push(HeapEntry::new(child)); + + // Continue to grandchildren + stack.extend(child.children.iter().copied()); + } +} diff --git a/crates/brk_monitor/src/block_builder/tx_node.rs b/crates/brk_monitor/src/block_builder/tx_node.rs new file mode 100644 index 000000000..28eb56056 --- /dev/null +++ b/crates/brk_monitor/src/block_builder/tx_node.rs @@ -0,0 +1,63 @@ +use brk_types::{Sats, VSize}; +use smallvec::SmallVec; + +use crate::types::{PoolIndex, TxIndex}; + +/// A transaction node in the dependency graph. +/// +/// Created fresh for each block building cycle, then discarded. +pub struct TxNode { + /// Index into mempool entries (for final output) + pub tx_index: TxIndex, + + /// Index in the graph pool + pub pool_index: PoolIndex, + + /// Transaction fee + pub fee: Sats, + + /// Transaction virtual size + pub vsize: VSize, + + /// Parent transactions (dependencies) + pub parents: SmallVec<[PoolIndex; 4]>, + + /// Child transactions (dependents) + pub children: SmallVec<[PoolIndex; 8]>, + + /// Cumulative fee (self + all ancestors) + pub ancestor_fee: Sats, + + /// Cumulative vsize (self + all ancestors) + pub ancestor_vsize: VSize, + + /// Whether this tx has been selected + pub selected: bool, + + /// Generation counter for heap staleness detection + pub generation: u32, +} + +impl TxNode { + pub fn new( + tx_index: TxIndex, + pool_index: PoolIndex, + fee: Sats, + vsize: VSize, + ancestor_fee: Sats, + ancestor_vsize: VSize, + ) -> Self { + Self { + tx_index, + pool_index, + fee, + vsize, + parents: SmallVec::new(), + children: SmallVec::new(), + ancestor_fee, + ancestor_vsize, + selected: false, + generation: 0, + } + } +} diff --git a/crates/brk_monitor/src/lib.rs b/crates/brk_monitor/src/lib.rs index 7c39c3954..de567c624 100644 --- a/crates/brk_monitor/src/lib.rs +++ b/crates/brk_monitor/src/lib.rs @@ -5,6 +5,10 @@ //! - Address mempool stats //! - CPFP-aware block building +mod block_builder; mod mempool; +mod projected_blocks; +mod types; -pub use mempool::{BlockStats, Mempool, MempoolInner, ProjectedSnapshot}; +pub use mempool::{Mempool, MempoolInner}; +pub use projected_blocks::{BlockStats, RecommendedFees, Snapshot}; diff --git a/crates/brk_monitor/src/mempool/addresses.rs b/crates/brk_monitor/src/mempool/addresses.rs new file mode 100644 index 000000000..b4886fd22 --- /dev/null +++ b/crates/brk_monitor/src/mempool/addresses.rs @@ -0,0 +1,70 @@ +use std::ops::Deref; + +use brk_types::{AddressBytes, AddressMempoolStats, Transaction, Txid}; +use rustc_hash::{FxHashMap, FxHashSet}; + +/// Per-address stats with associated transaction set. +pub type AddressStats = (AddressMempoolStats, FxHashSet); + +/// Tracks per-address mempool statistics. +#[derive(Default)] +pub struct AddressTracker(FxHashMap); + +impl Deref for AddressTracker { + type Target = FxHashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AddressTracker { + /// Add a transaction to address tracking. + pub fn add_tx(&mut self, tx: &Transaction, txid: &Txid) { + self.update(tx, txid, true); + } + + /// Remove a transaction from address tracking. + pub fn remove_tx(&mut self, tx: &Transaction, txid: &Txid) { + self.update(tx, txid, false); + } + + fn update(&mut self, tx: &Transaction, txid: &Txid, is_addition: bool) { + // Inputs: track sending + for txin in &tx.input { + let Some(prevout) = txin.prevout.as_ref() else { + continue; + }; + let Some(bytes) = prevout.address_bytes() else { + continue; + }; + + let (stats, txids) = self.0.entry(bytes).or_default(); + if is_addition { + txids.insert(txid.clone()); + stats.sending(prevout); + } else { + txids.remove(txid); + stats.sent(prevout); + } + stats.update_tx_count(txids.len() as u32); + } + + // Outputs: track receiving + for txout in &tx.output { + let Some(bytes) = txout.address_bytes() else { + continue; + }; + + let (stats, txids) = self.0.entry(bytes).or_default(); + if is_addition { + txids.insert(txid.clone()); + stats.receiving(txout); + } else { + txids.remove(txid); + stats.received(txout); + } + stats.update_tx_count(txids.len() as u32); + } + } +} diff --git a/crates/brk_monitor/src/mempool/block_builder/audit.rs b/crates/brk_monitor/src/mempool/block_builder/audit.rs deleted file mode 100644 index c90233e61..000000000 --- a/crates/brk_monitor/src/mempool/block_builder/audit.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::ops::{Index, IndexMut}; - -use brk_types::{Sats, VSize}; -use smallvec::SmallVec; - -use crate::mempool::{MempoolTxIndex, PoolIndex}; - -/// Compare ancestor fee rates using cross-multiplication (avoids f64 division). -/// Returns true if (fee_a / vsize_a) > (fee_b / vsize_b). -#[inline] -fn has_higher_fee_rate(fee_a: Sats, vsize_a: VSize, fee_b: Sats, vsize_b: VSize) -> bool { - // Cross multiply: fee_a/vsize_a > fee_b/vsize_b ⟺ fee_a * vsize_b > fee_b * vsize_a - let score_a = u64::from(fee_a) as u128 * u64::from(vsize_b) as u128; - let score_b = u64::from(fee_b) as u128 * u64::from(vsize_a) as u128; - score_a > score_b -} - -/// Type-safe wrapper around Vec that only allows PoolIndex access. -pub struct Pool(Vec); - -impl Pool { - pub fn new(txs: Vec) -> Self { - Self(txs) - } - - #[inline] - pub fn len(&self) -> usize { - self.0.len() - } -} - -impl Index for Pool { - type Output = AuditTx; - - #[inline] - fn index(&self, idx: PoolIndex) -> &Self::Output { - &self.0[idx.as_usize()] - } -} - -impl IndexMut for Pool { - #[inline] - fn index_mut(&mut self, idx: PoolIndex) -> &mut Self::Output { - &mut self.0[idx.as_usize()] - } -} - -/// Lightweight transaction for block building. -/// Created fresh each rebuild, discarded after. -pub struct AuditTx { - /// Original entries index (for final output) - pub entries_idx: MempoolTxIndex, - /// Pool index (for internal graph traversal) - pub pool_idx: PoolIndex, - pub fee: Sats, - pub vsize: VSize, - /// In-mempool parent pool indices - pub parents: SmallVec<[PoolIndex; 4]>, - /// In-mempool child pool indices - pub children: SmallVec<[PoolIndex; 8]>, - /// Cumulative fee (self + all ancestors) - pub ancestor_fee: Sats, - /// Cumulative vsize (self + all ancestors) - pub ancestor_vsize: VSize, - /// Already selected into a block - pub used: bool, - /// Generation counter for invalidating stale heap entries - pub generation: u32, -} - -impl AuditTx { - /// Create AuditTx with pre-computed ancestor values from Bitcoin Core. - pub fn new_with_ancestors( - entries_idx: MempoolTxIndex, - pool_idx: PoolIndex, - fee: Sats, - vsize: VSize, - ancestor_fee: Sats, - ancestor_vsize: VSize, - ) -> Self { - Self { - entries_idx, - pool_idx, - fee, - vsize, - parents: SmallVec::new(), - children: SmallVec::new(), - ancestor_fee, - ancestor_vsize, - used: false, - generation: 0, - } - } -} - -/// Priority queue entry. Stores snapshot of score at insertion time. -#[derive(Clone, Copy)] -pub struct TxPriority { - pub pool_idx: PoolIndex, - /// Score snapshot for heap ordering - ancestor_fee: Sats, - ancestor_vsize: VSize, - /// Generation at insertion (detects stale entries) - pub generation: u32, -} - -impl TxPriority { - pub fn new(tx: &AuditTx) -> Self { - Self { - pool_idx: tx.pool_idx, - ancestor_fee: tx.ancestor_fee, - ancestor_vsize: tx.ancestor_vsize, - generation: tx.generation, - } - } - - /// Check if this entry is stale (tx was updated since insertion). - #[inline] - pub fn is_stale(&self, tx: &AuditTx) -> bool { - self.generation != tx.generation - } - - #[inline] - fn has_higher_score_than(&self, other: &Self) -> bool { - has_higher_fee_rate( - self.ancestor_fee, - self.ancestor_vsize, - other.ancestor_fee, - other.ancestor_vsize, - ) - } -} - -impl PartialEq for TxPriority { - fn eq(&self, other: &Self) -> bool { - self.pool_idx == other.pool_idx - } -} - -impl Eq for TxPriority {} - -impl PartialOrd for TxPriority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for TxPriority { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Higher score = higher priority (for max-heap) - if self.has_higher_score_than(other) { - std::cmp::Ordering::Greater - } else if other.has_higher_score_than(self) { - std::cmp::Ordering::Less - } else { - // Tiebreaker: lower index first (deterministic) - other.pool_idx.cmp(&self.pool_idx) - } - } -} diff --git a/crates/brk_monitor/src/mempool/block_builder/build.rs b/crates/brk_monitor/src/mempool/block_builder/build.rs deleted file mode 100644 index 94e539933..000000000 --- a/crates/brk_monitor/src/mempool/block_builder/build.rs +++ /dev/null @@ -1,71 +0,0 @@ -use brk_types::TxidPrefix; -use rustc_hash::FxHashMap; - -use super::audit::{AuditTx, Pool}; -use super::selection::select_into_blocks; -use crate::mempool::{MempoolEntry, MempoolTxIndex, PoolIndex, SelectedTx}; - -/// Number of projected blocks to build -const NUM_PROJECTED_BLOCKS: usize = 8; - -/// Build projected blocks from mempool entries. -pub fn build_projected_blocks(entries: &[Option]) -> Vec> { - let live: Vec<(MempoolTxIndex, &MempoolEntry)> = entries - .iter() - .enumerate() - .filter_map(|(i, opt)| opt.as_ref().map(|e| (MempoolTxIndex::from(i), e))) - .collect(); - - if live.is_empty() { - return Vec::new(); - } - - let mut pool = Pool::new(build_audit_pool(&live)); - select_into_blocks(&mut pool, NUM_PROJECTED_BLOCKS) -} - -/// Build the AuditTx pool with parent/child relationships. -fn build_audit_pool(live: &[(MempoolTxIndex, &MempoolEntry)]) -> Vec { - // Map TxidPrefix -> pool index - let prefix_to_idx: FxHashMap = live - .iter() - .enumerate() - .map(|(i, (_, entry))| (entry.txid_prefix(), PoolIndex::from(i))) - .collect(); - - // Build pool with parent relationships - let mut pool: Vec = live - .iter() - .enumerate() - .map(|(pool_idx, (entries_idx, entry))| { - let pool_idx = PoolIndex::from(pool_idx); - let mut tx = AuditTx::new_with_ancestors( - *entries_idx, - pool_idx, - entry.fee, - entry.vsize, - entry.ancestor_fee, - entry.ancestor_vsize, - ); - - // Add in-mempool parents - for parent_prefix in &entry.depends { - if let Some(&parent_idx) = prefix_to_idx.get(parent_prefix) { - tx.parents.push(parent_idx); - } - } - - tx - }) - .collect(); - - // Build child relationships (reverse of parents) - for i in 0..pool.len() { - let parents = pool[i].parents.clone(); - for parent_idx in parents { - pool[parent_idx.as_usize()].children.push(PoolIndex::from(i)); - } - } - - pool -} diff --git a/crates/brk_monitor/src/mempool/block_builder/mod.rs b/crates/brk_monitor/src/mempool/block_builder/mod.rs deleted file mode 100644 index 2cf327216..000000000 --- a/crates/brk_monitor/src/mempool/block_builder/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod audit; -mod build; -mod selection; - -pub use build::build_projected_blocks; diff --git a/crates/brk_monitor/src/mempool/block_builder/selection.rs b/crates/brk_monitor/src/mempool/block_builder/selection.rs deleted file mode 100644 index 46ea7cf6b..000000000 --- a/crates/brk_monitor/src/mempool/block_builder/selection.rs +++ /dev/null @@ -1,271 +0,0 @@ -use std::collections::BinaryHeap; - -use brk_types::FeeRate; -use rustc_hash::FxHashSet; - -use super::audit::{Pool, TxPriority}; -use crate::mempool::{MempoolTxIndex, PoolIndex, SelectedTx}; - -/// Target vsize per block (~1MB, derived from 4MW weight limit) -const BLOCK_VSIZE_LIMIT: u64 = 1_000_000; - -/// How many packages to look ahead when current doesn't fit -const LOOK_AHEAD: usize = 100; - -/// A CPFP package - atomic unit that must be included together. -struct Package { - /// Transactions in topological order (parents before children) - txs: Vec<(MempoolTxIndex, FeeRate)>, - /// Combined vsize of all txs - vsize: u64, - /// Package fee rate (same for all txs) - fee_rate: FeeRate, -} - -/// Select transactions into projected blocks. -/// -/// Algorithm: -/// 1. Select txs via heap, grouping CPFP chains into atomic packages -/// 2. Sort packages by fee rate -/// 3. Partition into blocks (packages are atomic, never split) -pub fn select_into_blocks(pool: &mut Pool, num_blocks: usize) -> Vec> { - // Phase 1: Select and group into packages - let packages = select_packages(pool, num_blocks); - - // Phase 2: Partition packages into blocks - partition_into_blocks(packages, num_blocks) -} - -/// Select txs and group CPFP chains into atomic packages. -fn select_packages(pool: &mut Pool, num_blocks: usize) -> Vec { - let target_vsize = BLOCK_VSIZE_LIMIT * num_blocks as u64; - let mut total_vsize: u64 = 0; - let mut packages: Vec = Vec::new(); - - let mut heap: BinaryHeap = (0..pool.len()) - .map(|i| TxPriority::new(&pool[PoolIndex::from(i)])) - .collect(); - - while let Some(entry) = heap.pop() { - let tx = &pool[entry.pool_idx]; - - if tx.used || entry.is_stale(tx) { - continue; - } - - // Package rate at selection time - let package_rate = FeeRate::from((tx.ancestor_fee, tx.ancestor_vsize)); - - // Select this tx and all unselected ancestors (parents first) - let ancestors = select_with_ancestors(pool, entry.pool_idx); - - let mut package_vsize: u64 = 0; - let mut txs = Vec::with_capacity(ancestors.len()); - - for sel_idx in ancestors { - let vsize = u64::from(pool[sel_idx].vsize); - txs.push((pool[sel_idx].entries_idx, package_rate)); - package_vsize += vsize; - - update_descendants(pool, sel_idx, &mut heap); - } - - total_vsize += package_vsize; - packages.push(Package { - txs, - vsize: package_vsize, - fee_rate: package_rate, - }); - - if total_vsize >= target_vsize { - break; - } - } - - packages -} - -/// Sort packages by fee rate and partition into blocks. -fn partition_into_blocks(mut packages: Vec, num_blocks: usize) -> Vec> { - // Sort by fee rate descending - packages.sort_unstable_by(|a, b| b.fee_rate.cmp(&a.fee_rate)); - - // Debug: show top and bottom packages after sorting - log::info!("=== Top 10 packages after sorting ==="); - for (i, pkg) in packages.iter().take(10).enumerate() { - log::info!( - " #{}: rate={:.4} sat/vB, vsize={}, txs={}", - i, - f64::from(pkg.fee_rate), - pkg.vsize, - pkg.txs.len() - ); - } - log::info!("=== Bottom 10 packages after sorting ==="); - let start = packages.len().saturating_sub(10); - for (i, pkg) in packages.iter().skip(start).enumerate() { - log::info!( - " #{}: rate={:.4} sat/vB, vsize={}, txs={}", - start + i, - f64::from(pkg.fee_rate), - pkg.vsize, - pkg.txs.len() - ); - } - - let mut blocks: Vec> = Vec::with_capacity(num_blocks); - let mut current_block: Vec = Vec::new(); - let mut current_block_packages: Vec<(FeeRate, u64, usize)> = Vec::new(); // for debug - let mut current_vsize: u64 = 0; - let mut used = vec![false; packages.len()]; - - let mut i = 0; - while i < packages.len() && blocks.len() < num_blocks { - if used[i] { - i += 1; - continue; - } - - let remaining = BLOCK_VSIZE_LIMIT.saturating_sub(current_vsize); - - if packages[i].vsize <= remaining { - // Package fits in current block - current_block_packages.push((packages[i].fee_rate, packages[i].vsize, packages[i].txs.len())); - add_package_to_block(&packages[i], &mut current_block); - current_vsize += packages[i].vsize; - used[i] = true; - i += 1; - } else if current_block.is_empty() { - // Empty block - add package anyway (handles edge case) - current_block_packages.push((packages[i].fee_rate, packages[i].vsize, packages[i].txs.len())); - add_package_to_block(&packages[i], &mut current_block); - current_vsize += packages[i].vsize; - used[i] = true; - i += 1; - } else { - // Look ahead for a smaller package that fits - let mut found = false; - let look_ahead_end = (i + LOOK_AHEAD).min(packages.len()); - - for j in (i + 1)..look_ahead_end { - if used[j] || packages[j].vsize > remaining { - continue; - } - log::debug!( - "Look-ahead: adding pkg #{} (rate={:.4}) to block {} (min so far={:.4})", - j, - f64::from(packages[j].fee_rate), - blocks.len() + 1, - current_block_packages.iter().map(|(r, _, _)| f64::from(*r)).fold(f64::INFINITY, f64::min) - ); - current_block_packages.push((packages[j].fee_rate, packages[j].vsize, packages[j].txs.len())); - add_package_to_block(&packages[j], &mut current_block); - current_vsize += packages[j].vsize; - used[j] = true; - found = true; - break; - } - - if !found { - // No package fits, start new block - log_block_debug(blocks.len() + 1, ¤t_block_packages); - blocks.push(std::mem::take(&mut current_block)); - current_block_packages.clear(); - current_vsize = 0; - } - } - } - - if !current_block.is_empty() && blocks.len() < num_blocks { - log_block_debug(blocks.len() + 1, ¤t_block_packages); - blocks.push(current_block); - } - - blocks -} - -fn log_block_debug(block_num: usize, packages: &[(FeeRate, u64, usize)]) { - if packages.is_empty() { - return; - } - let mut sorted: Vec<_> = packages.to_vec(); - sorted.sort_by(|a, b| b.0.cmp(&a.0)); - - log::info!("=== Block {} - {} packages ===", block_num, packages.len()); - log::info!(" Top 5:"); - for (rate, vsize, txs) in sorted.iter().take(5) { - log::info!(" rate={:.4} sat/vB, vsize={}, txs={}", f64::from(*rate), vsize, txs); - } - log::info!(" Bottom 5:"); - let start = sorted.len().saturating_sub(5); - for (rate, vsize, txs) in sorted.iter().skip(start) { - log::info!(" rate={:.4} sat/vB, vsize={}, txs={}", f64::from(*rate), vsize, txs); - } -} - -fn add_package_to_block(package: &Package, block: &mut Vec) { - for (entries_idx, effective_fee_rate) in &package.txs { - block.push(SelectedTx { - entries_idx: *entries_idx, - effective_fee_rate: *effective_fee_rate, - }); - } -} - -/// Select a tx and all its unselected ancestors in topological order. -fn select_with_ancestors(pool: &mut Pool, pool_idx: PoolIndex) -> Vec { - let mut result: Vec = Vec::new(); - let mut stack: Vec<(PoolIndex, bool)> = vec![(pool_idx, false)]; - - while let Some((idx, parents_done)) = stack.pop() { - if pool[idx].used { - continue; - } - - if parents_done { - pool[idx].used = true; - result.push(idx); - } else { - stack.push((idx, true)); - for &parent in &pool[idx].parents { - if !pool[parent].used { - stack.push((parent, false)); - } - } - } - } - - result -} - -/// Update descendants' ancestor scores after selecting a tx. -fn update_descendants(pool: &mut Pool, selected_idx: PoolIndex, heap: &mut BinaryHeap) { - let selected_fee = pool[selected_idx].fee; - let selected_vsize = pool[selected_idx].vsize; - - // Track visited to avoid double-updates in diamond patterns - let mut visited: FxHashSet = FxHashSet::default(); - let mut stack: Vec = pool[selected_idx].children.to_vec(); - - while let Some(child_idx) = stack.pop() { - if !visited.insert(child_idx) { - continue; - } - - let child = &mut pool[child_idx]; - if child.used { - continue; - } - - // Update ancestor totals - child.ancestor_fee -= selected_fee; - child.ancestor_vsize -= selected_vsize; - - // Increment generation and re-push to heap - child.generation += 1; - heap.push(TxPriority::new(child)); - - // Continue to grandchildren - stack.extend(child.children.iter().copied()); - } -} diff --git a/crates/brk_monitor/src/mempool/entry.rs b/crates/brk_monitor/src/mempool/entry.rs index 5b3b3768c..ad525a370 100644 --- a/crates/brk_monitor/src/mempool/entry.rs +++ b/crates/brk_monitor/src/mempool/entry.rs @@ -5,7 +5,7 @@ use brk_types::{FeeRate, MempoolEntryInfo, Sats, Txid, TxidPrefix, VSize}; /// Stores only the data needed for fee estimation and block building. /// Ancestor values are pre-computed by Bitcoin Core (correctly handling shared ancestors). #[derive(Debug, Clone)] -pub struct MempoolEntry { +pub struct Entry { pub txid: Txid, pub fee: Sats, pub vsize: VSize, @@ -17,7 +17,7 @@ pub struct MempoolEntry { pub depends: Vec, } -impl MempoolEntry { +impl Entry { pub fn from_info(info: &MempoolEntryInfo) -> Self { Self { txid: info.txid.clone(), @@ -34,18 +34,16 @@ impl MempoolEntry { FeeRate::from((self.fee, self.vsize)) } - /// Ancestor fee rate (package rate for CPFP) + /// Ancestor fee rate (package rate for CPFP). #[inline] pub fn ancestor_fee_rate(&self) -> FeeRate { FeeRate::from((self.ancestor_fee, self.ancestor_vsize)) } - /// Effective fee rate for display - the rate that justified this tx's inclusion. - /// For CPFP parents, this is their ancestor_fee_rate (child paying for them). - /// For regular txs, this is their own fee_rate. + /// Effective fee rate for display. #[inline] pub fn effective_fee_rate(&self) -> FeeRate { - std::cmp::max(self.fee_rate(), self.ancestor_fee_rate()) + self.fee_rate().max(self.ancestor_fee_rate()) } #[inline] diff --git a/crates/brk_monitor/src/mempool/mod.rs b/crates/brk_monitor/src/mempool/mod.rs index cb670de27..acbd315ae 100644 --- a/crates/brk_monitor/src/mempool/mod.rs +++ b/crates/brk_monitor/src/mempool/mod.rs @@ -1,13 +1,6 @@ -mod block_builder; +mod addresses; mod entry; mod monitor; -mod projected_blocks; -mod types; -// Public API +pub use entry::Entry; pub use monitor::{Mempool, MempoolInner}; -pub use projected_blocks::{BlockStats, ProjectedSnapshot}; - -// Crate-internal (used by submodules) -pub(crate) use entry::MempoolEntry; -pub(crate) use types::{MempoolTxIndex, PoolIndex, SelectedTx}; diff --git a/crates/brk_monitor/src/mempool/monitor.rs b/crates/brk_monitor/src/mempool/monitor.rs index 6729d2af0..73fb4d0a4 100644 --- a/crates/brk_monitor/src/mempool/monitor.rs +++ b/crates/brk_monitor/src/mempool/monitor.rs @@ -9,35 +9,33 @@ use std::{ use brk_error::Result; use brk_rpc::Client; -use brk_types::{ - AddressBytes, AddressMempoolStats, MempoolEntryInfo, MempoolInfo, RecommendedFees, TxWithHex, - Txid, TxidPrefix, -}; +use brk_types::{MempoolEntryInfo, MempoolInfo, TxWithHex, Txid, TxidPrefix}; use derive_deref::Deref; use log::{error, info}; use parking_lot::{RwLock, RwLockReadGuard}; use rustc_hash::{FxHashMap, FxHashSet}; -use super::block_builder::build_projected_blocks; -use super::entry::MempoolEntry; -use super::projected_blocks::{BlockStats, ProjectedSnapshot}; -use super::types::MempoolTxIndex; +use super::addresses::AddressTracker; +use super::entry::Entry; +use crate::block_builder::build_projected_blocks; +use crate::projected_blocks::{BlockStats, RecommendedFees, Snapshot}; +use crate::types::TxIndex; -/// Max new txs to fetch full data for per update cycle (for address tracking) +/// Max new txs to fetch full data for per update cycle (for address tracking). const MAX_TX_FETCHES_PER_CYCLE: usize = 10_000; -/// Minimum interval between rebuilds (milliseconds) +/// Minimum interval between rebuilds (milliseconds). const MIN_REBUILD_INTERVAL_MS: u64 = 1000; /// Block building state - grouped for atomic locking. #[derive(Default)] struct BlockBuildingState { /// Slot-based entry storage - entries: Vec>, + entries: Vec>, /// TxidPrefix -> slot index - txid_prefix_to_idx: FxHashMap, + txid_prefix_to_idx: FxHashMap, /// Recycled slot indices - free_indices: Vec, + free_indices: Vec, } /// Mempool monitor. @@ -59,13 +57,13 @@ pub struct MempoolInner { // Mempool state info: RwLock, txs: RwLock>, - addresses: RwLock)>>, + addresses: RwLock, // Block building data (single lock for consistency) block_state: RwLock, // Projected blocks snapshot - snapshot: RwLock, + snapshot: RwLock, // Rate limiting dirty: AtomicBool, @@ -78,9 +76,9 @@ impl MempoolInner { client, info: RwLock::new(MempoolInfo::default()), txs: RwLock::new(FxHashMap::default()), - addresses: RwLock::new(FxHashMap::default()), + addresses: RwLock::new(AddressTracker::default()), block_state: RwLock::new(BlockBuildingState::default()), - snapshot: RwLock::new(ProjectedSnapshot::default()), + snapshot: RwLock::new(Snapshot::default()), dirty: AtomicBool::new(false), last_rebuild_ms: AtomicU64::new(0), } @@ -94,7 +92,7 @@ impl MempoolInner { self.snapshot.read().fees.clone() } - pub fn get_snapshot(&self) -> ProjectedSnapshot { + pub fn get_snapshot(&self) -> Snapshot { self.snapshot.read().clone() } @@ -106,9 +104,7 @@ impl MempoolInner { self.txs.read() } - pub fn get_addresses( - &self, - ) -> RwLockReadGuard<'_, FxHashMap)>> { + pub fn get_addresses(&self) -> RwLockReadGuard<'_, AddressTracker> { self.addresses.read() } @@ -124,15 +120,11 @@ impl MempoolInner { /// Sync with Bitcoin Core mempool and rebuild projections if needed. pub fn update(&self) -> Result<()> { - // Single RPC call gets all entries with fees let entries_info = self.client.get_raw_mempool_verbose()?; let current_txids: FxHashSet = entries_info.iter().map(|e| e.txid.clone()).collect(); - // Find new txids and fetch full tx data for address tracking let new_txs = self.fetch_new_txs(¤t_txids); - - // Apply changes using the entry info (has fees) and full txs (for addresses) let has_changes = self.apply_changes(&entries_info, new_txs); if has_changes { @@ -146,7 +138,6 @@ impl MempoolInner { /// Fetch full transaction data for new txids (needed for address tracking). fn fetch_new_txs(&self, current_txids: &FxHashSet) -> FxHashMap { - // Collect txids to fetch while holding read lock, then release it let txids_to_fetch: Vec = { let txs = self.txs.read(); current_txids @@ -157,7 +148,6 @@ impl MempoolInner { .collect() }; - // Make RPC calls without holding the lock txids_to_fetch .into_iter() .filter_map(|txid| { @@ -175,7 +165,6 @@ impl MempoolInner { entries_info: &[MempoolEntryInfo], new_txs: FxHashMap, ) -> bool { - // Build lookup map for current entries let current_entries: FxHashMap = entries_info .iter() .map(|e| (TxidPrefix::from(&e.txid), e)) @@ -200,9 +189,8 @@ impl MempoolInner { let tx = tx_with_hex.tx(); info.remove(tx); - Self::update_address_stats(tx, txid, &mut addresses, false); + addresses.remove_tx(tx, txid); - // Remove from slot-based storage if let Some(idx) = block_state.txid_prefix_to_idx.remove(&prefix) { if let Some(slot) = block_state.entries.get_mut(idx.as_usize()) { *slot = None; @@ -218,22 +206,20 @@ impl MempoolInner { let tx = tx_with_hex.tx(); let prefix = TxidPrefix::from(txid); - // Get the entry info (has fee from Bitcoin Core) let Some(entry_info) = current_entries.get(&prefix) else { continue; }; - let entry = MempoolEntry::from_info(entry_info); + let entry = Entry::from_info(entry_info); info.add(tx); - Self::update_address_stats(tx, txid, &mut addresses, true); + addresses.add_tx(tx, txid); - // Allocate slot let idx = if let Some(idx) = block_state.free_indices.pop() { block_state.entries[idx.as_usize()] = Some(entry); idx } else { - let idx = MempoolTxIndex::from(block_state.entries.len()); + let idx = TxIndex::from(block_state.entries.len()); block_state.entries.push(Some(entry)); idx }; @@ -261,13 +247,12 @@ impl MempoolInner { return; } - // Attempt to claim the rebuild (atomic compare-exchange) if self .last_rebuild_ms .compare_exchange(last, now_ms, Ordering::AcqRel, Ordering::Relaxed) .is_err() { - return; // Another thread is rebuilding + return; } self.dirty.store(false, Ordering::Release); @@ -282,48 +267,8 @@ impl MempoolInner { let block_state = self.block_state.read(); let blocks = build_projected_blocks(&block_state.entries); - let snapshot = ProjectedSnapshot::build(blocks, &block_state.entries); + let snapshot = Snapshot::build(blocks, &block_state.entries); *self.snapshot.write() = snapshot; } - - fn update_address_stats( - tx: &brk_types::Transaction, - txid: &Txid, - addresses: &mut FxHashMap)>, - is_addition: bool, - ) { - // Inputs: track sending - tx.input - .iter() - .flat_map(|txin| txin.prevout.as_ref()) - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - if is_addition { - set.insert(txid.clone()); - stats.sending(txout); - } else { - set.remove(txid); - stats.sent(txout); - } - stats.update_tx_count(set.len() as u32); - }); - - // Outputs: track receiving - tx.output - .iter() - .flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes))) - .for_each(|(txout, bytes)| { - let (stats, set) = addresses.entry(bytes).or_default(); - if is_addition { - set.insert(txid.clone()); - stats.receiving(txout); - } else { - set.remove(txid); - stats.received(txout); - } - stats.update_tx_count(set.len() as u32); - }); - } } diff --git a/crates/brk_monitor/src/mempool/projected_blocks.rs b/crates/brk_monitor/src/mempool/projected_blocks.rs deleted file mode 100644 index c6c11d268..000000000 --- a/crates/brk_monitor/src/mempool/projected_blocks.rs +++ /dev/null @@ -1,137 +0,0 @@ -use brk_types::{FeeRate, RecommendedFees, Sats, VSize}; - -use super::{MempoolEntry, MempoolTxIndex, SelectedTx}; - -/// Minimum fee rate for estimation (sat/vB) -const MIN_FEE_RATE: f64 = 1.0; - -/// Immutable snapshot of projected blocks. -/// Stores indices into live entries + pre-computed stats. -#[derive(Debug, Clone, Default)] -pub struct ProjectedSnapshot { - /// Block structure: indices into entries Vec - pub blocks: Vec>, - /// Pre-computed stats per block - pub block_stats: Vec, - /// Pre-computed fee recommendations - pub fees: RecommendedFees, -} - -/// Statistics for a single projected block. -#[derive(Debug, Clone, Default)] -pub struct BlockStats { - pub tx_count: u32, - pub total_vsize: VSize, - pub total_fee: Sats, - /// Fee rate percentiles: [0%, 10%, 25%, 50%, 75%, 90%, 100%] - /// - fee_range[0] = min, fee_range[3] = median, fee_range[6] = max - pub fee_range: [FeeRate; 7], -} - -impl BlockStats { - pub fn min_fee_rate(&self) -> FeeRate { - self.fee_range[0] - } - - pub fn median_fee_rate(&self) -> FeeRate { - self.fee_range[3] - } - - pub fn max_fee_rate(&self) -> FeeRate { - self.fee_range[6] - } -} - -impl ProjectedSnapshot { - /// Build snapshot from selected transactions (with effective fee rates) and entries. - pub fn build(blocks: Vec>, entries: &[Option]) -> Self { - let block_stats: Vec = blocks - .iter() - .map(|selected| compute_block_stats(selected, entries)) - .collect(); - - let fees = compute_recommended_fees(&block_stats); - - // Convert to just indices for storage - let blocks: Vec> = blocks - .into_iter() - .map(|selected| selected.into_iter().map(|s| s.entries_idx).collect()) - .collect(); - - Self { - blocks, - block_stats, - fees, - } - } -} - -/// Compute statistics for a single block using effective fee rates from selection time. -fn compute_block_stats(selected: &[SelectedTx], entries: &[Option]) -> BlockStats { - if selected.is_empty() { - return BlockStats::default(); - } - - let mut total_fee = Sats::default(); - let mut total_vsize = VSize::default(); - let mut fee_rates: Vec = Vec::with_capacity(selected.len()); - - for sel in selected { - if let Some(entry) = &entries[sel.entries_idx.as_usize()] { - total_fee += entry.fee; - total_vsize += entry.vsize; - // Use the effective fee rate captured at selection time - // This is the actual mining score that determined this tx's block placement - fee_rates.push(sel.effective_fee_rate); - } - } - - fee_rates.sort(); - - BlockStats { - tx_count: selected.len() as u32, - total_vsize, - total_fee, - fee_range: [ - percentile(&fee_rates, 0), - percentile(&fee_rates, 10), - percentile(&fee_rates, 25), - percentile(&fee_rates, 50), - percentile(&fee_rates, 75), - percentile(&fee_rates, 90), - percentile(&fee_rates, 100), - ], - } -} - -/// Get percentile value from sorted array. -fn percentile(sorted: &[FeeRate], p: usize) -> FeeRate { - if sorted.is_empty() { - return FeeRate::default(); - } - let idx = (p * (sorted.len() - 1)) / 100; - sorted[idx] -} - -/// Compute recommended fees from block stats (mempool.space style). -fn compute_recommended_fees(stats: &[BlockStats]) -> RecommendedFees { - RecommendedFees { - // High priority: median of block 1 - fastest_fee: median_fee_for_block(stats, 0), - // Medium priority: median of blocks 2-3 - half_hour_fee: median_fee_for_block(stats, 2), - // Low priority: median of blocks 4-6 - hour_fee: median_fee_for_block(stats, 5), - // No priority: median of later blocks - economy_fee: median_fee_for_block(stats, 7), - minimum_fee: FeeRate::from(MIN_FEE_RATE), - } -} - -/// Get the median fee rate for block N. -fn median_fee_for_block(stats: &[BlockStats], block_index: usize) -> FeeRate { - stats - .get(block_index) - .map(|s| s.median_fee_rate()) - .unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE)) -} diff --git a/crates/brk_monitor/src/mempool/types.rs b/crates/brk_monitor/src/mempool/types.rs deleted file mode 100644 index 051d6c8d2..000000000 --- a/crates/brk_monitor/src/mempool/types.rs +++ /dev/null @@ -1,47 +0,0 @@ -/// Index into the mempool entries Vec. -/// NOT the global TxIndex for confirmed transactions. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct MempoolTxIndex(pub(crate) u32); - -impl MempoolTxIndex { - #[inline] - pub fn as_usize(self) -> usize { - self.0 as usize - } -} - -impl From for MempoolTxIndex { - #[inline] - fn from(value: usize) -> Self { - Self(value as u32) - } -} - -/// Index into the temporary pool Vec used during block building. -/// Distinct from MempoolTxIndex to prevent mixing up index spaces. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct PoolIndex(u32); - -impl PoolIndex { - #[inline] - pub fn as_usize(self) -> usize { - self.0 as usize - } -} - -impl From for PoolIndex { - #[inline] - fn from(value: usize) -> Self { - Self(value as u32) - } -} - -/// A selected transaction with its effective mining score at selection time. -/// The effective_fee_rate is the ancestor score when this tx was selected, -/// which may differ from the original ancestor score (if ancestors were already mined). -#[derive(Debug, Clone, Copy)] -pub struct SelectedTx { - pub entries_idx: MempoolTxIndex, - /// Fee rate at selection time (ancestor_fee / ancestor_vsize) - pub effective_fee_rate: brk_types::FeeRate, -} diff --git a/crates/brk_monitor/src/projected_blocks/fees.rs b/crates/brk_monitor/src/projected_blocks/fees.rs new file mode 100644 index 000000000..8f0d00f10 --- /dev/null +++ b/crates/brk_monitor/src/projected_blocks/fees.rs @@ -0,0 +1,25 @@ +use brk_types::{FeeRate, RecommendedFees}; + +use super::stats::BlockStats; + +/// Minimum fee rate for estimation (sat/vB). +const MIN_FEE_RATE: f64 = 1.0; + +/// Compute recommended fees from block stats (mempool.space style). +pub fn compute_recommended_fees(stats: &[BlockStats]) -> RecommendedFees { + RecommendedFees { + fastest_fee: median_fee_for_block(stats, 0), + half_hour_fee: median_fee_for_block(stats, 2), + hour_fee: median_fee_for_block(stats, 5), + economy_fee: median_fee_for_block(stats, 7), + minimum_fee: FeeRate::from(MIN_FEE_RATE), + } +} + +/// Get the median fee rate for block N. +fn median_fee_for_block(stats: &[BlockStats], block_index: usize) -> FeeRate { + stats + .get(block_index) + .map(|s| s.median_fee_rate()) + .unwrap_or_else(|| FeeRate::from(MIN_FEE_RATE)) +} diff --git a/crates/brk_monitor/src/projected_blocks/mod.rs b/crates/brk_monitor/src/projected_blocks/mod.rs new file mode 100644 index 000000000..02ada3e5e --- /dev/null +++ b/crates/brk_monitor/src/projected_blocks/mod.rs @@ -0,0 +1,9 @@ +//! Projected block building and fee estimation. + +mod fees; +mod snapshot; +mod stats; + +pub use brk_types::RecommendedFees; +pub use snapshot::Snapshot; +pub use stats::BlockStats; diff --git a/crates/brk_monitor/src/projected_blocks/snapshot.rs b/crates/brk_monitor/src/projected_blocks/snapshot.rs new file mode 100644 index 000000000..56b127ce3 --- /dev/null +++ b/crates/brk_monitor/src/projected_blocks/snapshot.rs @@ -0,0 +1,41 @@ +use brk_types::RecommendedFees; + +use super::fees; +use super::stats::{self, BlockStats}; +use crate::mempool::Entry; +use crate::types::{SelectedTx, TxIndex}; + +/// Immutable snapshot of projected blocks. +#[derive(Debug, Clone, Default)] +pub struct Snapshot { + /// Block structure: indices into entries Vec + pub blocks: Vec>, + /// Pre-computed stats per block + pub block_stats: Vec, + /// Pre-computed fee recommendations + pub fees: RecommendedFees, +} + +impl Snapshot { + /// Build snapshot from selected transactions and entries. + pub fn build(blocks: Vec>, entries: &[Option]) -> Self { + let block_stats: Vec = blocks + .iter() + .map(|selected| stats::compute_block_stats(selected, entries)) + .collect(); + + let fees = fees::compute_recommended_fees(&block_stats); + + // Extract just the indices from selected transactions + let blocks = blocks + .into_iter() + .map(|selected| selected.into_iter().map(|s| s.tx_index).collect()) + .collect(); + + Self { + blocks, + block_stats, + fees, + } + } +} diff --git a/crates/brk_monitor/src/projected_blocks/stats.rs b/crates/brk_monitor/src/projected_blocks/stats.rs new file mode 100644 index 000000000..1495338e4 --- /dev/null +++ b/crates/brk_monitor/src/projected_blocks/stats.rs @@ -0,0 +1,73 @@ +use brk_types::{FeeRate, Sats, VSize}; + +use crate::mempool::Entry; +use crate::types::SelectedTx; + +/// Statistics for a single projected block. +#[derive(Debug, Clone, Default)] +pub struct BlockStats { + pub tx_count: u32, + pub total_vsize: VSize, + pub total_fee: Sats, + /// Fee rate percentiles: [0%, 10%, 25%, 50%, 75%, 90%, 100%] + pub fee_range: [FeeRate; 7], +} + +impl BlockStats { + pub fn min_fee_rate(&self) -> FeeRate { + self.fee_range[0] + } + + pub fn median_fee_rate(&self) -> FeeRate { + self.fee_range[3] + } + + pub fn max_fee_rate(&self) -> FeeRate { + self.fee_range[6] + } +} + +/// Compute statistics for a single block using effective fee rates from selection time. +pub fn compute_block_stats(selected: &[SelectedTx], entries: &[Option]) -> BlockStats { + if selected.is_empty() { + return BlockStats::default(); + } + + let mut total_fee = Sats::default(); + let mut total_vsize = VSize::default(); + let mut fee_rates: Vec = Vec::with_capacity(selected.len()); + + for sel in selected { + if let Some(entry) = &entries[sel.tx_index.as_usize()] { + total_fee += entry.fee; + total_vsize += entry.vsize; + fee_rates.push(sel.effective_fee_rate); + } + } + + fee_rates.sort(); + + BlockStats { + tx_count: selected.len() as u32, + total_vsize, + total_fee, + fee_range: [ + percentile(&fee_rates, 0), + percentile(&fee_rates, 10), + percentile(&fee_rates, 25), + percentile(&fee_rates, 50), + percentile(&fee_rates, 75), + percentile(&fee_rates, 90), + percentile(&fee_rates, 100), + ], + } +} + +/// Get percentile value from sorted array. +fn percentile(sorted: &[FeeRate], p: usize) -> FeeRate { + if sorted.is_empty() { + return FeeRate::default(); + } + let idx = (p * (sorted.len() - 1)) / 100; + sorted[idx] +} diff --git a/crates/brk_monitor/src/types/mod.rs b/crates/brk_monitor/src/types/mod.rs new file mode 100644 index 000000000..523902c9f --- /dev/null +++ b/crates/brk_monitor/src/types/mod.rs @@ -0,0 +1,7 @@ +mod pool_index; +mod selected_tx; +mod tx_index; + +pub use pool_index::PoolIndex; +pub use selected_tx::SelectedTx; +pub use tx_index::TxIndex; diff --git a/crates/brk_monitor/src/types/pool_index.rs b/crates/brk_monitor/src/types/pool_index.rs new file mode 100644 index 000000000..5c56f4fdf --- /dev/null +++ b/crates/brk_monitor/src/types/pool_index.rs @@ -0,0 +1,17 @@ +/// Index into the temporary pool used during block building. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct PoolIndex(u32); + +impl PoolIndex { + #[inline] + pub fn as_usize(self) -> usize { + self.0 as usize + } +} + +impl From for PoolIndex { + #[inline] + fn from(value: usize) -> Self { + Self(value as u32) + } +} diff --git a/crates/brk_monitor/src/types/selected_tx.rs b/crates/brk_monitor/src/types/selected_tx.rs new file mode 100644 index 000000000..29c335093 --- /dev/null +++ b/crates/brk_monitor/src/types/selected_tx.rs @@ -0,0 +1,12 @@ +use brk_types::FeeRate; + +use super::TxIndex; + +/// A transaction selected for a projected block. +#[derive(Debug, Clone, Copy)] +pub struct SelectedTx { + /// Index into mempool entries + pub tx_index: TxIndex, + /// Fee rate at selection time (includes CPFP) + pub effective_fee_rate: FeeRate, +} diff --git a/crates/brk_monitor/src/types/tx_index.rs b/crates/brk_monitor/src/types/tx_index.rs new file mode 100644 index 000000000..e4a22b3e8 --- /dev/null +++ b/crates/brk_monitor/src/types/tx_index.rs @@ -0,0 +1,17 @@ +/// Index into the mempool entries storage. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TxIndex(u32); + +impl TxIndex { + #[inline] + pub fn as_usize(self) -> usize { + self.0 as usize + } +} + +impl From for TxIndex { + #[inline] + fn from(value: usize) -> Self { + Self(value as u32) + } +}