global: fixes

This commit is contained in:
nym21
2026-05-01 19:14:15 +02:00
parent 1068ad4e8f
commit 6f879a5551
36 changed files with 949 additions and 337 deletions

View File

@@ -3,7 +3,6 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_mempool::Mempool;
use brk_reader::Reader;
use brk_rpc::Client;
use tokio::task::spawn_blocking;
use crate::Query;
@@ -51,11 +50,8 @@ impl AsyncQuery {
f(&self.0)
}
#[inline]
pub fn inner(&self) -> &Query {
&self.0
}
pub fn client(&self) -> &Client {
self.0.client()
}
}

View File

@@ -1,5 +1,5 @@
use brk_error::{Error, OptionData, Result};
use brk_mempool::{EntryPool, TxEntry, TxGraveyard, TxRemoval, TxStore, TxTombstone};
use brk_mempool::{EntryPool, Mempool, TxEntry, TxGraveyard, TxRemoval, TxStore, TxTombstone};
use brk_types::{
CheckedSub, CpfpEntry, CpfpInfo, FeeRate, MempoolBlock, MempoolInfo, MempoolRecentTx,
OutputType, RbfResponse, RbfTx, RecommendedFees, ReplacementNode, Sats, Timestamp, Transaction,
@@ -10,6 +10,8 @@ use vecdb::{AnyVec, ReadableVec, VecIndex};
use crate::Query;
const RECENT_REPLACEMENTS_LIMIT: usize = 25;
impl Query {
pub fn mempool_info(&self) -> Result<MempoolInfo> {
let mempool = self.mempool().ok_or(Error::MempoolNotAvailable)?;
@@ -305,11 +307,7 @@ impl Query {
let replaces = (!replaces_vec.is_empty()).then_some(replaces_vec);
let replacements =
Self::build_rbf_node(&root_txid, None, &txs, &entries, &graveyard).map(|mut node| {
node.tx.full_rbf = Some(node.full_rbf);
node.interval = None;
node
});
self.build_rbf_node(&root_txid, None, mempool, &txs, &entries, &graveyard);
Ok(RbfResponse {
replacements,
@@ -336,9 +334,17 @@ impl Query {
/// Predecessors are always in the graveyard (that's where
/// `Removal::Replaced` lives), so the recursion only needs the
/// graveyard; the live pool is consulted for the root.
///
/// `rate` matches mempool.space's `tx.effectiveFeePerVsize`: live
/// txs get the live CPFP-cluster effective rate; mined txs get the
/// computer's stored same-block-cluster effective rate; never-mined
/// replaced predecessors have no recorded effective rate, so we
/// fall back to the simple `fee/vsize` snapshotted at burial.
fn build_rbf_node(
&self,
txid: &Txid,
successor_time: Option<Timestamp>,
mempool: &Mempool,
txs: &TxStore,
entries: &EntryPool,
graveyard: &TxGraveyard,
@@ -348,7 +354,14 @@ impl Query {
let replaces: Vec<ReplacementNode> = graveyard
.predecessors_of(txid)
.filter_map(|(pred_txid, _)| {
Self::build_rbf_node(pred_txid, Some(entry.first_seen), txs, entries, graveyard)
self.build_rbf_node(
pred_txid,
Some(entry.first_seen),
mempool,
txs,
entries,
graveyard,
)
})
.collect();
@@ -359,6 +372,24 @@ impl Query {
.map(|d| usize::from(d) as u32);
let value = Sats::from(tx.output.iter().map(|o| u64::from(o.value)).sum::<u64>());
let tx_index = self.resolve_tx_index(txid).ok();
let mined = tx_index.map(|_| true);
let rate = if txs.contains(txid) {
mempool
.cpfp_info(&TxidPrefix::from(txid))
.and_then(|info| info.effective_fee_per_vsize)
.unwrap_or_else(|| entry.fee_rate())
} else if let Some(idx) = tx_index {
self.computer()
.transactions
.fees
.effective_fee_rate
.tx_index
.collect_one(idx)
.unwrap_or_else(|| entry.fee_rate())
} else {
entry.fee_rate()
};
Some(ReplacementNode {
tx: RbfTx {
@@ -366,14 +397,16 @@ impl Query {
fee: entry.fee,
vsize: entry.vsize,
value,
rate: entry.fee_rate(),
rate,
time: entry.first_seen,
rbf: entry.rbf,
full_rbf: None,
full_rbf: Some(full_rbf),
mined,
},
time: entry.first_seen,
full_rbf,
interval,
mined,
replaces,
})
}
@@ -381,45 +414,39 @@ impl Query {
/// Recent RBF replacements across the whole mempool, matching
/// mempool.space's `GET /api/v1/replacements` and
/// `GET /api/v1/fullrbf/replacements`. Each entry is a complete
/// replacement tree rooted at the latest replacer; same shape as
/// `tx_rbf().replacements`. Sorted most-recent-first by root
/// `time`. When `full_rbf_only` is true, only trees with at least
/// one non-signaling predecessor are returned.
/// replacement tree rooted at the terminal replacer; same shape as
/// `tx_rbf().replacements`. Ordered by most-recent replacement
/// event first (matches mempool.space's reversed-`replacedBy`
/// iteration) and capped at 25 entries. When `full_rbf_only` is
/// true, only trees with at least one non-signaling predecessor
/// are returned.
pub fn recent_replacements(&self, full_rbf_only: bool) -> Result<Vec<ReplacementNode>> {
let mempool = self.mempool().ok_or(Error::MempoolNotAvailable)?;
let txs = mempool.txs();
let entries = mempool.entries();
let graveyard = mempool.graveyard();
// Collect every distinct tree-root replacer. A predecessor's
// `by` may itself have been replaced; walk forward through
// chained Replaced tombstones until reaching a tx that's no
// longer flagged as replaced (live, Vanished, or unknown).
let mut roots: FxHashSet<Txid> = FxHashSet::default();
for (_, by) in graveyard.replaced_iter() {
let mut root = by.clone();
while let Some(TxRemoval::Replaced { by: next }) =
graveyard.get(&root).map(TxTombstone::reason)
{
root = next.clone();
}
roots.insert(root);
}
let mut trees: Vec<ReplacementNode> = roots
.iter()
// A predecessor's `by` may itself be replaced; walk the chain
// forward to the terminal replacer for each tree, dedup so each
// tree is emitted once at its first (most recent) sighting.
let mut seen: FxHashSet<Txid> = FxHashSet::default();
Ok(graveyard
.replaced_iter_recent_first()
.filter_map(|(_, by)| {
let mut root = by.clone();
while let Some(TxRemoval::Replaced { by: next }) =
graveyard.get(&root).map(TxTombstone::reason)
{
root = next.clone();
}
seen.insert(root.clone()).then_some(root)
})
.filter_map(|root| {
Self::build_rbf_node(root, None, &txs, &entries, &graveyard).map(|mut node| {
node.tx.full_rbf = Some(node.full_rbf);
node.interval = None;
node
})
self.build_rbf_node(&root, None, mempool, &txs, &entries, &graveyard)
})
.filter(|node| !full_rbf_only || node.full_rbf)
.collect();
trees.sort_by(|a, b| b.time.cmp(&a.time));
Ok(trees)
.take(RECENT_REPLACEMENTS_LIMIT)
.collect())
}
pub fn transaction_times(&self, txids: &[Txid]) -> Result<Vec<u64>> {

View File

@@ -1,5 +1,7 @@
use brk_error::Result;
use brk_types::{Dollars, ExchangeRates, HistoricalPrice, HistoricalPriceEntry, Hour4, Timestamp};
use brk_types::{
Dollars, ExchangeRates, HistoricalPrice, HistoricalPriceEntry, Hour4, INDEX_EPOCH, Timestamp,
};
use vecdb::ReadableVec;
use crate::Query;
@@ -32,6 +34,9 @@ impl Query {
}
fn price_at(&self, target: Timestamp) -> Result<Vec<HistoricalPriceEntry>> {
if *target < INDEX_EPOCH {
return Ok(vec![]);
}
let h4 = Hour4::from_timestamp(target);
let cents = self.computer().prices.spot.cents.hour4.collect_one(h4);
Ok(vec![HistoricalPriceEntry {

View File

@@ -32,7 +32,7 @@ impl Query {
pub fn series_not_found_error(&self, series: &SeriesName) -> Error {
// Check if series exists but with different indexes
if let Some(indexes) = self.vecs().series_to_indexes(series.clone()) {
if let Some(indexes) = self.vecs().series_to_indexes(series) {
let supported = indexes
.iter()
.map(|i| format!("/api/series/{series}/{}", i.name()))
@@ -382,7 +382,7 @@ impl Query {
})
}
pub fn series_to_indexes(&self, series: SeriesName) -> Option<&Vec<Index>> {
pub fn series_to_indexes(&self, series: &SeriesName) -> Option<&Vec<Index>> {
self.vecs().series_to_indexes(series)
}

View File

@@ -1,9 +1,10 @@
#![doc = include_str!("../README.md")]
#![allow(clippy::module_inception)]
use std::sync::Arc;
use std::{path::Path, sync::Arc};
use brk_computer::Computer;
use brk_error::{OptionData, Result};
use brk_indexer::Indexer;
use brk_mempool::Mempool;
use brk_reader::Reader;
@@ -84,7 +85,7 @@ impl Query {
}
/// Build sync status with the given tip height
pub fn sync_status(&self, tip_height: Height) -> SyncStatus {
pub fn sync_status(&self, tip_height: Height) -> Result<SyncStatus> {
let indexed_height = self.indexed_height();
let computed_height = self.computed_height();
let blocks_behind = Height::from(tip_height.saturating_sub(*indexed_height));
@@ -94,16 +95,16 @@ impl Query {
.blocks
.timestamp
.collect_one(indexed_height)
.unwrap();
.data()?;
SyncStatus {
Ok(SyncStatus {
indexed_height,
computed_height,
tip_height,
blocks_behind,
last_indexed_at: last_indexed_at_unix.to_iso8601(),
last_indexed_at_unix,
}
})
}
#[inline]
@@ -117,7 +118,7 @@ impl Query {
}
#[inline]
pub fn blocks_dir(&self) -> &std::path::Path {
pub fn blocks_dir(&self) -> &Path {
self.0.reader.blocks_dir()
}

View File

@@ -8,7 +8,7 @@ use brk_types::{
};
use derive_more::{Deref, DerefMut};
use quickmatch::{QuickMatch, QuickMatchConfig};
use vecdb::AnyExportableVec;
use vecdb::{AnyExportableVec, Ro};
#[derive(Default)]
pub struct Vecs<'a> {
@@ -25,7 +25,7 @@ pub struct Vecs<'a> {
}
impl<'a> Vecs<'a> {
pub fn build(indexer: &'a Indexer<vecdb::Ro>, computer: &'a Computer<vecdb::Ro>) -> Self {
pub fn build(indexer: &'a Indexer<Ro>, computer: &'a Computer<Ro>) -> Self {
Self::build_from(
indexer.vecs.iter_any_visible(),
indexer.vecs.to_tree_node(),
@@ -57,24 +57,17 @@ impl<'a> Vecs<'a> {
let mut ids = this
.series_to_index_to_vec
.keys()
.cloned()
.copied()
.collect::<Vec<_>>();
let sort_ids = |ids: &mut Vec<&str>| {
ids.sort_unstable_by(|a, b| {
let len_cmp = a.len().cmp(&b.len());
if len_cmp == std::cmp::Ordering::Equal {
a.cmp(b)
} else {
len_cmp
}
})
ids.sort_unstable_by(|a, b| a.len().cmp(&b.len()).then_with(|| a.cmp(b)))
};
sort_ids(&mut ids);
this.series = ids;
this.counts.distinct_series = this.series_to_index_to_vec.keys().count();
this.counts.distinct_series = this.series_to_index_to_vec.len();
this.counts.total_endpoints = this
.index_to_series_to_vec
.values()
@@ -108,7 +101,7 @@ impl<'a> Vecs<'a> {
this.index_to_series = this
.index_to_series_to_vec
.iter()
.map(|(index, id_to_vec)| (*index, id_to_vec.keys().cloned().collect::<Vec<_>>()))
.map(|(index, id_to_vec)| (*index, id_to_vec.keys().copied().collect::<Vec<_>>()))
.collect();
this.index_to_series.values_mut().for_each(sort_ids);
this.catalog.replace(
@@ -121,7 +114,7 @@ impl<'a> Vecs<'a> {
.collect(),
)
.merge_branches()
.unwrap(),
.expect("indexed/computed catalog merge: same series leaf with incompatible schemas"),
);
this.matcher = Some(QuickMatch::new(&this.series));
@@ -144,17 +137,11 @@ impl<'a> Vecs<'a> {
"Duplicate series: {name} for index {index:?}"
);
let prev = self
.index_to_series_to_vec
self.index_to_series_to_vec
.entry(index)
.or_default()
.insert(name, vec);
assert!(
prev.is_none(),
"Duplicate series: {name} for index {index:?}"
);
// Track per-db counts
let is_lazy = vec.region_names().is_empty();
self.counts_by_db
.entry(db.to_string())
@@ -182,7 +169,7 @@ impl<'a> Vecs<'a> {
}
}
pub fn series_to_indexes(&self, series: SeriesName) -> Option<&Vec<Index>> {
pub fn series_to_indexes(&self, series: &SeriesName) -> Option<&Vec<Index>> {
self.series_to_indexes
.get(series.replace("-", "_").as_str())
}