global: snapshot + monitor: add addresses to mempool

This commit is contained in:
nym21
2025-10-14 17:36:16 +02:00
parent db0298ac1b
commit 5425085953
63 changed files with 707 additions and 330 deletions

View File

@@ -31,5 +31,7 @@ fn main() {
thread::sleep(Duration::from_secs(5));
let txs = mempool.get_txs();
println!("mempool_tx_count: {}", txs.len());
let addresses = mempool.get_addresses();
println!("mempool_address_count: {}", addresses.len());
}
}

View File

@@ -1,7 +1,8 @@
use std::{thread, time::Duration};
use bitcoin::{Transaction, Txid, consensus::encode};
use bitcoin::consensus::encode;
use bitcoincore_rpc::{Client, RpcApi};
use brk_structs::{AddressBytes, AddressMempoolStats, Transaction, Txid};
use log::error;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::{FxHashMap, FxHashSet};
@@ -11,6 +12,7 @@ const MAX_FETCHES_PER_CYCLE: usize = 10_000;
pub struct Mempool {
rpc: &'static Client,
txs: RwLock<FxHashMap<Txid, Transaction>>,
addresses: RwLock<FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>>,
}
impl Mempool {
@@ -18,6 +20,7 @@ impl Mempool {
Self {
rpc,
txs: RwLock::new(FxHashMap::default()),
addresses: RwLock::new(FxHashMap::default()),
}
}
@@ -25,6 +28,12 @@ impl Mempool {
self.txs.read()
}
pub fn get_addresses(
&self,
) -> RwLockReadGuard<'_, FxHashMap<AddressBytes, (AddressMempoolStats, FxHashSet<Txid>)>> {
self.addresses.read()
}
pub fn start(&self) {
loop {
if let Err(e) = self.update() {
@@ -34,35 +43,82 @@ impl Mempool {
}
}
fn update(&self) -> Result<(), Box<dyn std::error::Error>> {
pub fn update(&self) -> Result<(), Box<dyn std::error::Error>> {
let txids = self
.rpc
.get_raw_mempool()?
.into_iter()
.map(Txid::from)
.collect::<FxHashSet<_>>();
let missing_txids = {
let new_txs = {
let txs = self.txs.read();
txids
.iter()
.filter(|txid| !txs.contains_key(*txid))
.take(MAX_FETCHES_PER_CYCLE)
.cloned()
.collect::<Vec<_>>()
};
let new_txs = missing_txids
.into_iter()
.filter_map(|txid| {
self.rpc
.get_raw_transaction_hex(txid, None)
.ok()
.and_then(|hex| encode::deserialize_hex(&hex).ok())
.map(|tx| (*txid, tx))
})
.collect::<FxHashMap<_, _>>();
}
.into_iter()
.filter_map(|txid| {
self.rpc
.get_raw_transaction_hex(&bitcoin::Txid::from(&txid), None)
.ok()
.and_then(|hex| encode::deserialize_hex::<bitcoin::Transaction>(&hex).ok())
.map(|tx| Transaction::from_mempool(tx, self.rpc))
.map(|tx| (txid, tx))
})
.collect::<FxHashMap<_, _>>();
let mut txs = self.txs.write();
txs.retain(|txid, _| txids.contains(txid));
let mut addresses = self.addresses.write();
txs.retain(|txid, tx| {
if txids.contains(txid) {
return true;
}
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.sent(txout);
stats.update_tx_count(set.len() as u32);
});
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.remove(txid);
stats.received(txout);
stats.update_tx_count(set.len() as u32);
});
false
});
new_txs.iter().for_each(|(txid, tx)| {
tx.input
.iter()
.flat_map(|txin| txin.prevout.as_ref())
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.sending(txout);
stats.update_tx_count(set.len() as u32);
});
tx.output
.iter()
.flat_map(|txout| txout.address_bytes().map(|bytes| (txout, bytes)))
.for_each(|(txout, bytes)| {
let (stats, set) = addresses.entry(bytes).or_default();
set.insert(txid.clone());
stats.receiving(txout);
stats.update_tx_count(set.len() as u32);
});
});
txs.extend(new_txs);
Ok(())