mempool: init

This commit is contained in:
nym21
2025-10-12 17:55:21 +02:00
parent 5f87594ead
commit 7bfca87caf
22 changed files with 317 additions and 125 deletions

18
Cargo.lock generated
View File

@@ -750,6 +750,18 @@ dependencies = [
"serde_json",
]
[[package]]
name = "brk_mempool"
version = "0.0.111"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
"brk_structs",
"log",
"parking_lot 0.12.5",
"rustc-hash",
]
[[package]]
name = "brk_parser"
version = "0.0.111"
@@ -4214,8 +4226,6 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
[[package]]
name = "seqdb"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1f2a11472b8979fde0e6c181fef50e36633491ff154ee19f481d8135fb9dd6a"
dependencies = [
"allocative",
"libc",
@@ -5084,8 +5094,6 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vecdb"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4bc53cae8d59e2201d5553b351b124feeed236b6bd31645b6dda8f8dcf95e9a"
dependencies = [
"allocative",
"ctrlc",
@@ -5104,8 +5112,6 @@ dependencies = [
[[package]]
name = "vecdb_derive"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0e16806c42dff3018dfeec2920f701975dd35ff7f6d31f3dde3ea7d91860960"
dependencies = [
"quote",
"syn 2.0.106",

View File

@@ -57,6 +57,7 @@ brk_indexer = { version = "0.0.111", path = "crates/brk_indexer" }
brk_interface = { version = "0.0.111", path = "crates/brk_interface" }
brk_logger = { version = "0.0.111", path = "crates/brk_logger" }
brk_mcp = { version = "0.0.111", path = "crates/brk_mcp" }
brk_mempool = { version = "0.0.111", path = "crates/brk_mempool" }
brk_parser = { version = "0.0.111", path = "crates/brk_parser" }
brk_server = { version = "0.0.111", path = "crates/brk_server" }
brk_store = { version = "0.0.111", path = "crates/brk_store" }
@@ -78,8 +79,8 @@ serde_derive = "1.0.228"
serde_json = { version = "1.0.145", features = ["float_roundtrip"] }
sonic-rs = "0.5.5"
tokio = { version = "1.47.1", features = ["rt-multi-thread"] }
# vecdb = { path = "../seqdb/crates/vecdb", features = ["derive"] }
vecdb = { version = "0.2.17", features = ["derive"] }
vecdb = { path = "../seqdb/crates/vecdb", features = ["derive"] }
# vecdb = { version = "0.2.17", features = ["derive"] }
zerocopy = { version = "0.8.27", features = ["derive"] }
[workspace.metadata.release]

View File

@@ -386,7 +386,7 @@ impl ComputedRatioVecsFromDateIndex {
let mut sorted = self.ratio.dateindex.as_ref().unwrap().collect_range(
Some(min_ratio_date.unwrap_to_usize()),
Some(starting_dateindex.unwrap_to_usize()),
)?;
);
sorted.sort_unstable();

View File

@@ -479,7 +479,7 @@ impl ComputedStandardDeviationVecsFromDateIndex {
let mut sorted = source.collect_range(
Some(min_date.unwrap_to_usize()),
Some(starting_dateindex.unwrap_to_usize()),
)?;
);
sorted.sort_unstable();

View File

@@ -737,7 +737,7 @@ impl Vecs {
if starting_height.is_not_zero() {
chain_state = self
.chain_state
.collect_range(None, None)?
.collect_range(None, None)
.into_iter()
.enumerate()
.map(|(height, supply)| {
@@ -822,10 +822,10 @@ impl Vecs {
let mut dateindex_to_first_height_iter = dateindex_to_first_height.into_iter();
let mut dateindex_to_height_count_iter = dateindex_to_height_count.into_iter();
let height_to_price_close_vec = height_to_price_close
.map(|height_to_price_close| height_to_price_close.collect().unwrap());
let height_to_price_close_vec =
height_to_price_close.map(|height_to_price_close| height_to_price_close.collect());
let height_to_timestamp_fixed_vec = height_to_timestamp_fixed.collect().unwrap();
let height_to_timestamp_fixed_vec = height_to_timestamp_fixed.collect();
let outputindex_range_to_height = RangeMap::from(height_to_first_outputindex);
let mut unspendable_supply = if let Some(prev_height) = starting_height.decremented() {

View File

@@ -30,11 +30,6 @@ use crate::{
vecs::{IndexToVec, MetricToVec},
};
// pub fn cached_errors() -> &'static Cache<String, String> {
// static CACHE: OnceLock<Cache<String, String>> = OnceLock::new();
// CACHE.get_or_init(|| Cache::new(1000))
// }
#[allow(dead_code)]
pub struct Interface<'a> {
vecs: Vecs<'a>,
@@ -153,8 +148,8 @@ impl<'a> Interface<'a> {
.collect::<Vec<_>>();
let mut values = metrics
.iter()
.map(|(_, vec)| Ok(vec.collect_range_string(from, to)?))
.collect::<Result<Vec<_>>>()?;
.map(|(_, vec)| vec.collect_range_string(from, to))
.collect::<Vec<_>>();
if values.is_empty() {
return Ok(Output::CSV(headers.join(",")));
@@ -195,10 +190,8 @@ impl<'a> Interface<'a> {
Format::JSON => {
let mut values = metrics
.iter()
.map(|(_, vec)| -> Result<Vec<u8>> {
Ok(vec.collect_range_json_bytes(from, to)?)
})
.collect::<Result<Vec<_>>>()?;
.map(|(_, vec)| vec.collect_range_json_bytes(from, to))
.collect::<Vec<_>>();
if values.is_empty() {
return Ok(Output::default(format));

View File

@@ -0,0 +1,18 @@
[package]
name = "brk_mempool"
description = "A Bitcoin mempool reader"
version.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true
build = "build.rs"
[dependencies]
bitcoin = { version = "0.32.7", features = ["serde"] }
bitcoincore-rpc = "0.19.0"
brk_structs = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
rustc-hash = "2.1.1"

View File

@@ -0,0 +1 @@
# brk_mempool

View File

@@ -0,0 +1,8 @@
fn main() {
let profile = std::env::var("PROFILE").unwrap_or_default();
if profile == "release" {
println!("cargo:rustc-flag=-C");
println!("cargo:rustc-flag=target-cpu=native");
}
}

View File

@@ -0,0 +1,71 @@
use std::{thread, time::Duration};
use bitcoin::{Transaction, consensus::encode};
use bitcoincore_rpc::{Client, RpcApi};
use log::error;
use parking_lot::{RwLock, RwLockReadGuard};
use rustc_hash::FxHashMap;
const MAX_FETCHES_PER_CYCLE: usize = 10_000;
pub struct Mempool {
rpc: &'static Client,
txs: RwLock<FxHashMap<String, Transaction>>,
}
impl Mempool {
pub fn new(rpc: &'static Client) -> Self {
Self {
rpc,
txs: RwLock::new(FxHashMap::default()),
}
}
pub fn get_txs(&self) -> RwLockReadGuard<'_, FxHashMap<String, Transaction>> {
self.txs.read()
}
pub fn start(&self) {
loop {
if let Err(e) = self.update() {
error!("Error updating mempool: {}", e);
}
thread::sleep(Duration::from_secs(1));
}
}
fn update(&self) -> Result<(), Box<dyn std::error::Error>> {
let current_txids = self.rpc.get_raw_mempool()?;
let current_set: std::collections::HashSet<String> =
current_txids.iter().map(|t| t.to_string()).collect();
// Fetch new transactions
let mut new_txs = FxHashMap::default();
let mut fetched = 0;
for txid in current_txids {
if fetched >= MAX_FETCHES_PER_CYCLE {
break;
}
let txid_str = txid.to_string();
if !self.txs.read().contains_key(&txid_str)
&& let Ok(hex) = self.rpc.get_raw_transaction_hex(&txid, None)
{
let tx: Transaction = encode::deserialize_hex(&hex)?;
new_txs.insert(txid_str, tx);
fetched += 1;
}
}
{
let mut mempool = self.txs.write();
mempool.retain(|txid, _| current_set.contains(txid));
mempool.extend(new_txs);
}
Ok(())
}
}

View File

@@ -0,0 +1,35 @@
use std::{path::Path, sync::Arc, thread, time::Duration};
use brk_mempool::Mempool;
fn main() {
// Connect to Bitcoin Core
let bitcoin_dir = Path::new(&std::env::var("HOME").unwrap())
.join("Library")
.join("Application Support")
.join("Bitcoin");
// let bitcoin_dir = Path::new("/Volumes/WD_BLACK/bitcoin");
let rpc = Box::leak(Box::new(
bitcoincore_rpc::Client::new(
"http://localhost:8332",
bitcoincore_rpc::Auth::CookieFile(bitcoin_dir.join(".cookie")),
)
.unwrap(),
));
let mempool = Arc::new(Mempool::new(rpc));
// Spawn monitoring thread
let mempool_clone = Arc::clone(&mempool);
thread::spawn(move || {
mempool_clone.start();
});
// Access from main thread
loop {
thread::sleep(Duration::from_secs(5));
let txs = mempool.get_txs();
println!("mempool_tx_count: {}", txs.len());
}
}

View File

@@ -2,7 +2,8 @@ use aide::axum::{ApiRouter, routing::get_with};
use axum::{
extract::{Path, State},
http::HeaderMap,
response::Response,
response::{Redirect, Response},
routing::get,
};
use brk_structs::{AddressInfo, AddressPath};
@@ -13,14 +14,17 @@ use crate::{
use super::AppState;
pub trait AddressesRoutes {
pub trait AddressRoutes {
fn add_addresses_routes(self) -> Self;
}
impl AddressesRoutes for ApiRouter<AppState> {
impl AddressRoutes for ApiRouter<AppState> {
fn add_addresses_routes(self) -> Self {
self.api_route(
"/api/chain/address/{address}",
self
.route("/api/address", get(Redirect::temporary("/api/addresses")))
.route("/api/addresses", get(Redirect::temporary("/api#tag/addresses")))
.api_route(
"/api/address/{address}",
get_with(async |
headers: HeaderMap,
Path(address): Path<AddressPath>,
@@ -35,14 +39,14 @@ impl AddressesRoutes for ApiRouter<AppState> {
Err((status, message)) => Response::new_json_with(status, &message, &etag)
}
}, |op| op
.tag("Chain")
.addresses_tag()
.summary("Address information")
.description("Retrieve comprehensive information about a Bitcoin address including balance, transaction history, UTXOs, and estimated investment metrics. Supports all standard Bitcoin address types (P2PKH, P2SH, P2WPKH, P2WSH, P2TR, etc.).")
.with_ok_response::<AddressInfo, _>(|res| res)
.with_not_modified()
.with_bad_request()
.with_not_found()
.with_server_error()
.ok_response::<AddressInfo>()
.not_modified()
.bad_request()
.not_found()
.server_error()
),
)
}

View File

@@ -1,21 +0,0 @@
use aide::axum::ApiRouter;
use axum::{response::Redirect, routing::get};
use crate::api::chain::{addresses::AddressesRoutes, transactions::TransactionsRoutes};
use super::AppState;
mod addresses;
mod transactions;
pub trait ChainRoutes {
fn add_chain_routes(self) -> Self;
}
impl ChainRoutes for ApiRouter<AppState> {
fn add_chain_routes(self) -> Self {
self.route("/api/chain", get(Redirect::temporary("/api#tag/chain")))
.add_addresses_routes()
.add_transactions_routes()
}
}

View File

@@ -76,10 +76,7 @@ fn req_to_response_res(
// .1
// .etag(Stamp::from(interface.get_height()), to);
// if headers
// .get_if_none_match()
// .is_some_and(|prev_etag| etag == prev_etag)
// {
// if headers.has_etag(etag) {
// return Ok(Response::new_not_modified());
// }

View File

@@ -25,6 +25,7 @@ pub trait ApiMetricsRoutes {
impl ApiMetricsRoutes for ApiRouter<AppState> {
fn add_metrics_routes(self) -> Self {
self
.route("/api/metric", get(Redirect::temporary("/api/metrics")))
.route("/api/metrics", get(Redirect::temporary("/api#tag/metrics")))
.api_route(
"/api/metrics/count",
@@ -39,11 +40,12 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
}
Response::new_json(state.metric_count(), etag)
},
|op| op.tag("Metrics")
|op| op
.metrics_tag()
.summary("Metric count")
.description("Current metric count")
.with_ok_response::<Vec<MetricCount>, _>(|res| res)
.with_not_modified(),
.ok_response::<Vec<MetricCount>>()
.not_modified(),
),
)
.api_route(
@@ -59,13 +61,14 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
}
Response::new_json(state.get_indexes(), etag)
},
|op| op.tag("Metrics")
|op| op
.metrics_tag()
.summary("List available indexes")
.description(
"Returns all available indexes with their accepted query aliases. Use any alias when querying metrics."
)
.with_ok_response::<Vec<IndexInfo>, _>(|res| res)
.with_not_modified(),
.ok_response::<Vec<IndexInfo>>()
.not_modified(),
),
)
.api_route(
@@ -82,11 +85,12 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
}
Response::new_json(state.get_metrics(pagination), etag)
},
|op| op.tag("Metrics")
|op| op
.metrics_tag()
.summary("Metrics list")
.description("Paginated list of available metrics")
.with_ok_response::<PaginatedMetrics, _>(|res| res)
.with_not_modified(),
.ok_response::<PaginatedMetrics>()
.not_modified(),
),
)
.api_route(
@@ -99,13 +103,14 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
}
Response::new_json(state.get_metrics_catalog(), etag)
},
|op| op.tag("Metrics")
|op| op
.metrics_tag()
.summary("Metrics catalog")
.description(
"Returns the complete hierarchical catalog of available metrics organized as a tree structure. Metrics are grouped by categories and subcategories. Best viewed in an interactive JSON viewer (e.g., Firefox's built-in JSON viewer) for easy navigation of the nested structure."
)
.with_ok_response::<TreeNode, _>(|res| res)
.with_not_modified(),
.ok_response::<TreeNode>()
.not_modified(),
),
)
.api_route(
@@ -122,15 +127,16 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
}
Response::new_json(state.match_metric(query), etag)
},
|op| op.tag("Metrics")
|op| op
.metrics_tag()
.summary("Search metrics")
.description("Fuzzy search for metrics by name. Supports partial matches and typos.")
.with_ok_response::<Vec<String>, _>(|res| res)
.with_not_modified(),
.ok_response::<Vec<String>>()
.not_modified(),
),
)
.api_route(
"/api/metrics/{metric}",
"/api/metric/{metric}",
get_with(
async |
headers: HeaderMap,
@@ -154,36 +160,38 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
};
Response::new_json_with(StatusCode::NOT_FOUND, value, etag)
},
|op| op.tag("Metrics")
|op| op
.metrics_tag()
.summary("Get supported indexes for a metric")
.description(
"Returns the list of indexes are supported by the specified metric. \
For example, `realized_price` might be available on dateindex, weekindex, and monthindex."
)
.with_ok_response::<Vec<Index>, _>(|res| res)
.with_not_modified()
.with_not_found(),
.ok_response::<Vec<Index>>()
.not_modified()
.not_found(),
),
)
// WIP
.route("/api/metrics/bulk", get(data::handler))
// WIP
.route(
"/api/metrics/{metric}/{index}",
"/api/metric/{metric}/{index}",
get(
async |uri: Uri,
headers: HeaderMap,
state: State<AppState>,
Path((metric, index)): Path<(String, Index)>,
Path((metric, index)): Path<(MetricPath, Index)>,
Query(params_opt): Query<ParamsOpt>|
-> Response {
data::handler(
uri,
headers,
Query(Params::from(((index, metric), params_opt))),
state,
)
.await
todo!();
// data::handler(
// uri,
// headers,
// Query(Params::from(((index, metric), params_opt))),
// state,
// )
// .await
},
),
)

View File

View File

@@ -14,15 +14,16 @@ use brk_structs::Health;
use crate::{
VERSION,
api::{chain::ChainRoutes, metrics::ApiMetricsRoutes},
api::{addresses::AddressRoutes, metrics::ApiMetricsRoutes, transactions::TxRoutes},
extended::{HeaderMapExtended, ResponseExtended, TransformResponseExtended},
};
use super::AppState;
mod chain;
mod addresses;
mod metrics;
mod openapi;
mod transactions;
pub use openapi::*;
@@ -32,7 +33,8 @@ pub trait ApiRoutes {
impl ApiRoutes for ApiRouter<AppState> {
fn add_api_routes(self) -> Self {
self.add_chain_routes()
self.add_addresses_routes()
.add_tx_routes()
.add_metrics_routes()
.route("/api/server", get(Redirect::temporary("/api#tag/server")))
.api_route(
@@ -40,10 +42,10 @@ impl ApiRoutes for ApiRouter<AppState> {
get_with(
async || -> Json<&'static str> { Json(VERSION) },
|op| {
op.tag("Server")
op.server_tag()
.summary("API version")
.description("Returns the current version of the API server")
.with_ok_response::<String, _>(|res| res)
.ok_response::<String>()
},
),
)
@@ -58,10 +60,10 @@ impl ApiRoutes for ApiRouter<AppState> {
})
},
|op| {
op.tag("Server")
op.server_tag()
.summary("Health check")
.description("Returns the health status of the API server")
.with_ok_response::<Health, _>(|res| res)
.ok_response::<Health>()
},
),
)

View File

@@ -18,8 +18,7 @@ pub fn create_openapi() -> OpenApi {
let info = Info {
title: "Bitcoin Research Kit".to_string(),
description: Some(
"API for querying Bitcoin blockchain data including addresses, transactions, and chain statistics. This API provides low-level access to indexed blockchain data with advanced analytics capabilities.\n\n\
⚠️ **Early Development**: This API is in very early stages of development. Breaking changes may occur without notice. For a more stable experience, [self-host](/install) or use the [hosting service](/service)."
"API for querying Bitcoin blockchain data including addresses, transactions, and chain statistics. This API provides low-level access to indexed blockchain data with advanced analytics capabilities."
.to_string(),
),
version: format!("v{VERSION}"),
@@ -28,9 +27,17 @@ pub fn create_openapi() -> OpenApi {
let tags = vec![
Tag {
name: "Chain".to_string(),
name: "Addresses".to_string(),
description: Some(
"Explore Bitcoin blockchain data: addresses, transactions, blocks, balances, and UTXOs."
"Explore Bitcoin addresses."
.to_string()
),
..Default::default()
},
Tag {
name: "Blocks".to_string(),
description: Some(
"Explore Bitcoin blocks."
.to_string()
),
..Default::default()
@@ -44,6 +51,14 @@ pub fn create_openapi() -> OpenApi {
),
..Default::default()
},
Tag {
name: "Mining".to_string(),
description: Some(
"Explore mining related endpoints."
.to_string()
),
..Default::default()
},
Tag {
name: "Server".to_string(),
description: Some(
@@ -51,7 +66,15 @@ pub fn create_openapi() -> OpenApi {
.to_string()
),
..Default::default()
},
},
Tag {
name: "Transactions".to_string(),
description: Some(
"Explore Bitcoin transactions."
.to_string()
),
..Default::default()
},
];
OpenApi {

View File

@@ -2,7 +2,8 @@ use aide::axum::{ApiRouter, routing::get_with};
use axum::{
extract::{Path, State},
http::HeaderMap,
response::Response,
response::{Redirect, Response},
routing::get,
};
use brk_structs::{TransactionInfo, TxidPath};
@@ -13,14 +14,17 @@ use crate::{
use super::AppState;
pub trait TransactionsRoutes {
fn add_transactions_routes(self) -> Self;
pub trait TxRoutes {
fn add_tx_routes(self) -> Self;
}
impl TransactionsRoutes for ApiRouter<AppState> {
fn add_transactions_routes(self) -> Self {
self.api_route(
"/api/chain/tx/{txid}",
impl TxRoutes for ApiRouter<AppState> {
fn add_tx_routes(self) -> Self {
self
.route("/api/tx", get(Redirect::temporary("/api/transactions")))
.route("/api/transactions", get(Redirect::temporary("/api#tag/transactions")))
.api_route(
"/api/tx/{txid}",
get_with(
async |
headers: HeaderMap,
@@ -37,16 +41,16 @@ impl TransactionsRoutes for ApiRouter<AppState> {
}
},
|op| op
.tag("Chain")
.transactions_tag()
.summary("Transaction information")
.description(
"Retrieve complete transaction data by transaction ID (txid). Returns the full transaction details including inputs, outputs, and metadata. The transaction data is read directly from the blockchain data files.",
)
.with_ok_response::<TransactionInfo, _>(|res| res)
.with_not_modified()
.with_bad_request()
.with_not_found()
.with_server_error(),
.ok_response::<TransactionInfo>()
.not_modified()
.bad_request()
.not_found()
.server_error(),
),
)
}

View File

@@ -3,23 +3,65 @@ use axum::Json;
use schemars::JsonSchema;
pub trait TransformResponseExtended<'t> {
fn addresses_tag(self) -> Self;
fn blocks_tag(self) -> Self;
fn metrics_tag(self) -> Self;
fn mining_tag(self) -> Self;
fn server_tag(self) -> Self;
fn transactions_tag(self) -> Self;
/// 200
fn with_ok_response<R, F>(self, f: F) -> Self
fn ok_response<R>(self) -> Self
where
R: JsonSchema;
/// 200
fn ok_response_with<R, F>(self, f: F) -> Self
where
R: JsonSchema,
F: FnOnce(TransformResponse<'_, R>) -> TransformResponse<'_, R>;
/// 400
fn with_bad_request(self) -> Self;
fn bad_request(self) -> Self;
/// 404
fn with_not_found(self) -> Self;
fn not_found(self) -> Self;
/// 304
fn with_not_modified(self) -> Self;
fn not_modified(self) -> Self;
/// 500
fn with_server_error(self) -> Self;
fn server_error(self) -> Self;
}
impl<'t> TransformResponseExtended<'t> for TransformOperation<'t> {
fn with_ok_response<R, F>(self, f: F) -> Self
fn addresses_tag(self) -> Self {
self.tag("Addresses")
}
fn blocks_tag(self) -> Self {
self.tag("Blocks")
}
fn metrics_tag(self) -> Self {
self.tag("Metrics")
}
fn mining_tag(self) -> Self {
self.tag("Mining")
}
fn server_tag(self) -> Self {
self.tag("Server")
}
fn transactions_tag(self) -> Self {
self.tag("Transactions")
}
fn ok_response<R>(self) -> Self
where
R: JsonSchema,
{
self.ok_response_with(|r: TransformResponse<'_, R>| r)
}
fn ok_response_with<R, F>(self, f: F) -> Self
where
R: JsonSchema,
F: FnOnce(TransformResponse<'_, R>) -> TransformResponse<'_, R>,
@@ -27,23 +69,23 @@ impl<'t> TransformResponseExtended<'t> for TransformOperation<'t> {
self.response_with::<200, Json<R>, _>(|res| f(res.description("Successful response")))
}
fn with_bad_request(self) -> Self {
fn bad_request(self) -> Self {
self.response_with::<400, Json<String>, _>(|res| {
res.description("Invalid request parameters")
})
}
fn with_not_found(self) -> Self {
fn not_found(self) -> Self {
self.response_with::<404, Json<String>, _>(|res| res.description("Resource not found"))
}
fn with_not_modified(self) -> Self {
fn not_modified(self) -> Self {
self.response_with::<304, (), _>(|res| {
res.description("Not modified - content unchanged since last request")
})
}
fn with_server_error(self) -> Self {
fn server_error(self) -> Self {
self.response_with::<500, Json<String>, _>(|res| res.description("Internal server error"))
}
}