mempool: general improvements

This commit is contained in:
nym21
2026-04-28 18:46:37 +02:00
parent 66494c081c
commit f1749472e7
95 changed files with 2545 additions and 2670 deletions
+11 -16
View File
@@ -2,8 +2,6 @@ use aide::axum::{ApiRouter, routing::get_with};
use axum::{
extract::{Path, Query, State},
http::{HeaderMap, Uri},
response::Redirect,
routing::get,
};
use brk_types::{AddrStats, AddrValidation, Transaction, Txid, Utxo, Version};
@@ -19,10 +17,7 @@ pub trait AddrRoutes {
impl AddrRoutes for ApiRouter<AppState> {
fn add_addr_routes(self) -> Self {
self
.route("/api/address", get(Redirect::temporary("/api/addresses")))
.route("/api/addresses", get(Redirect::temporary("/api#tag/addresses")))
.api_route(
self.api_route(
"/api/address/{address}",
get_with(async |
uri: Uri,
@@ -31,8 +26,8 @@ impl AddrRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
let strategy = state.addr_cache(Version::ONE, &path.addr, false);
state.cached_json(&headers, strategy, &uri, move |q| q.addr(path.addr)).await
let strategy = state.addr_strategy(Version::ONE, &path.addr, false);
state.respond_json(&headers, strategy, &uri, move |q| q.addr(path.addr)).await
}, |op| op
.id("get_address")
.addrs_tag()
@@ -54,8 +49,8 @@ impl AddrRoutes for ApiRouter<AppState> {
Query(params): Query<AddrTxidsParam>,
State(state): State<AppState>
| {
let strategy = state.addr_cache(Version::ONE, &path.addr, false);
state.cached_json(&headers, strategy, &uri, move |q| q.addr_txs(path.addr, params.after_txid, 50)).await
let strategy = state.addr_strategy(Version::ONE, &path.addr, false);
state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs(path.addr, params.after_txid, 50)).await
}, |op| op
.id("get_address_txs")
.addrs_tag()
@@ -77,8 +72,8 @@ impl AddrRoutes for ApiRouter<AppState> {
Query(params): Query<AddrTxidsParam>,
State(state): State<AppState>
| {
let strategy = state.addr_cache(Version::ONE, &path.addr, true);
state.cached_json(&headers, strategy, &uri, move |q| q.addr_txs(path.addr, params.after_txid, 25)).await
let strategy = state.addr_strategy(Version::ONE, &path.addr, true);
state.respond_json(&headers, strategy, &uri, move |q| q.addr_txs(path.addr, params.after_txid, 25)).await
}, |op| op
.id("get_address_confirmed_txs")
.addrs_tag()
@@ -101,7 +96,7 @@ impl AddrRoutes for ApiRouter<AppState> {
State(state): State<AppState>
| {
let hash = state.sync(|q| q.addr_mempool_hash(&path.addr));
state.cached_json(&headers, CacheStrategy::MempoolHash(hash), &uri, move |q| q.addr_mempool_txids(path.addr)).await
state.respond_json(&headers, CacheStrategy::MempoolHash(hash), &uri, move |q| q.addr_mempool_txids(path.addr)).await
}, |op| op
.id("get_address_mempool_txs")
.addrs_tag()
@@ -123,8 +118,8 @@ impl AddrRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
let strategy = state.addr_cache(Version::ONE, &path.addr, false);
state.cached_json(&headers, strategy, &uri, move |q| q.addr_utxos(path.addr)).await
let strategy = state.addr_strategy(Version::ONE, &path.addr, false);
state.respond_json(&headers, strategy, &uri, move |q| q.addr_utxos(path.addr)).await
}, |op| op
.id("get_address_utxos")
.addrs_tag()
@@ -146,7 +141,7 @@ impl AddrRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |_q| Ok(AddrValidation::from_addr(&path.addr))).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |_q| Ok(AddrValidation::from_addr(&path.addr))).await
}, |op| op
.id("validate_address")
.addrs_tag()
+25 -25
View File
@@ -29,8 +29,8 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashParam>,
_: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_json(&headers, strategy, &uri, move |q| q.block(&path.hash)).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_json(&headers, strategy, &uri, move |q| q.block(&path.hash)).await
},
|op| {
op.id("get_block")
@@ -51,8 +51,8 @@ impl BlockRoutes for ApiRouter<AppState> {
"/api/v1/block/{hash}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<BlockHashParam>, _: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_json(&headers, strategy, &uri, move |q| {
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_json(&headers, strategy, &uri, move |q| {
let height = q.height_by_hash(&path.hash)?;
q.block_by_height_v1(height)
}).await
@@ -74,8 +74,8 @@ impl BlockRoutes for ApiRouter<AppState> {
"/api/block/{hash}/header",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<BlockHashParam>, _: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_text(&headers, strategy, &uri, move |q| q.block_header_hex(&path.hash)).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_text(&headers, strategy, &uri, move |q| q.block_header_hex(&path.hash)).await
},
|op| {
op.id("get_block_header")
@@ -97,7 +97,7 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<HeightParam>,
_: Empty, State(state): State<AppState>| {
state.cached_text(&headers, state.height_cache(Version::ONE, path.height), &uri, move |q| q.block_hash_by_height(path.height).map(|h| h.to_string())).await
state.respond_text(&headers, state.height_strategy(Version::ONE, path.height), &uri, move |q| q.block_hash_by_height(path.height).map(|h| h.to_string())).await
},
|op| {
op.id("get_block_by_height")
@@ -121,7 +121,7 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<TimestampParam>,
_: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.timestamp_cache(Version::ONE, path.timestamp), &uri, move |q| q.block_by_timestamp(path.timestamp)).await
state.respond_json(&headers, state.timestamp_strategy(Version::ONE, path.timestamp), &uri, move |q| q.block_by_timestamp(path.timestamp)).await
},
|op| {
op.id("get_block_by_timestamp")
@@ -143,8 +143,8 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashParam>,
_: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_bytes(&headers, strategy, &uri, move |q| q.block_raw(&path.hash)).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_bytes(&headers, strategy, &uri, move |q| q.block_raw(&path.hash)).await
},
|op| {
op.id("get_block_raw")
@@ -168,7 +168,7 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashParam>,
_: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.block_status_cache(Version::ONE, &path.hash), &uri, move |q| q.block_status(&path.hash)).await
state.respond_json(&headers, state.block_status_strategy(Version::ONE, &path.hash), &uri, move |q| q.block_status(&path.hash)).await
},
|op| {
op.id("get_block_status")
@@ -189,7 +189,7 @@ impl BlockRoutes for ApiRouter<AppState> {
"/api/blocks/tip/height",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_text(&headers, CacheStrategy::Tip, &uri, |q| Ok(q.indexed_height().to_string())).await
state.respond_text(&headers, CacheStrategy::Tip, &uri, |q| Ok(q.indexed_height().to_string())).await
},
|op| {
op.id("get_block_tip_height")
@@ -206,7 +206,7 @@ impl BlockRoutes for ApiRouter<AppState> {
"/api/blocks/tip/hash",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_text(&headers, CacheStrategy::Tip, &uri, |q| Ok(q.tip_blockhash().to_string())).await
state.respond_text(&headers, CacheStrategy::Tip, &uri, |q| Ok(q.tip_blockhash().to_string())).await
},
|op| {
op.id("get_block_tip_hash")
@@ -226,8 +226,8 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashTxIndex>,
_: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_text(&headers, strategy, &uri, move |q| q.block_txid_at_index(&path.hash, path.index).map(|t| t.to_string())).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_text(&headers, strategy, &uri, move |q| q.block_txid_at_index(&path.hash, path.index).map(|t| t.to_string())).await
},
|op| {
op.id("get_block_txid")
@@ -251,8 +251,8 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashParam>,
_: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_json(&headers, strategy, &uri, move |q| q.block_txids(&path.hash)).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_json(&headers, strategy, &uri, move |q| q.block_txids(&path.hash)).await
},
|op| {
op.id("get_block_txids")
@@ -276,8 +276,8 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashParam>,
_: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_json(&headers, strategy, &uri, move |q| q.block_txs(&path.hash, TxIndex::default())).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_json(&headers, strategy, &uri, move |q| q.block_txs(&path.hash, TxIndex::default())).await
},
|op| {
op.id("get_block_txs")
@@ -302,8 +302,8 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<BlockHashStartIndex>,
_: Empty, State(state): State<AppState>| {
let strategy = state.block_cache(Version::ONE, &path.hash);
state.cached_json(&headers, strategy, &uri, move |q| q.block_txs(&path.hash, path.start_index)).await
let strategy = state.block_strategy(Version::ONE, &path.hash);
state.respond_json(&headers, strategy, &uri, move |q| q.block_txs(&path.hash, path.start_index)).await
},
|op| {
op.id("get_block_txs_from_index")
@@ -326,7 +326,7 @@ impl BlockRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.blocks(None))
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.blocks(None))
.await
},
|op| {
@@ -347,7 +347,7 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<HeightParam>,
_: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.height_cache(Version::ONE, path.height), &uri, move |q| q.blocks(Some(path.height))).await
state.respond_json(&headers, state.height_strategy(Version::ONE, path.height), &uri, move |q| q.blocks(Some(path.height))).await
},
|op| {
op.id("get_blocks_from_height")
@@ -368,7 +368,7 @@ impl BlockRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.blocks_v1(None))
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.blocks_v1(None))
.await
},
|op| {
@@ -389,7 +389,7 @@ impl BlockRoutes for ApiRouter<AppState> {
headers: HeaderMap,
Path(path): Path<HeightParam>,
_: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.height_cache(Version::ONE, path.height), &uri, move |q| q.blocks_v1(Some(path.height))).await
state.respond_json(&headers, state.height_strategy(Version::ONE, path.height), &uri, move |q| q.blocks_v1(Some(path.height))).await
},
|op| {
op.id("get_blocks_v1_from_height")
+3 -3
View File
@@ -18,7 +18,7 @@ impl FeesRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| {
.respond_json(&headers, state.mempool_strategy(), &uri, |q| {
q.mempool_blocks()
})
.await
@@ -39,7 +39,7 @@ impl FeesRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| {
.respond_json(&headers, state.mempool_strategy(), &uri, |q| {
q.recommended_fees()
})
.await
@@ -60,7 +60,7 @@ impl FeesRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| {
.respond_json(&headers, state.mempool_strategy(), &uri, |q| {
q.recommended_fees()
})
.await
+4 -4
View File
@@ -22,7 +22,7 @@ impl GeneralRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, |q| {
q.difficulty_adjustment()
})
.await
@@ -43,7 +43,7 @@ impl GeneralRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| {
.respond_json(&headers, state.mempool_strategy(), &uri, |q| {
Ok(Prices {
time: Timestamp::now(),
usd: q.live_price()?,
@@ -71,10 +71,10 @@ impl GeneralRoutes for ApiRouter<AppState> {
State(state): State<AppState>| {
let strategy = params
.timestamp
.map(|ts| state.timestamp_cache(Version::ONE, ts))
.map(|ts| state.timestamp_strategy(Version::ONE, ts))
.unwrap_or(CacheStrategy::Tip);
state
.cached_json(&headers, strategy, &uri, move |q| {
.respond_json(&headers, strategy, &uri, move |q| {
q.historical_price(params.timestamp)
})
.await
+4 -4
View File
@@ -18,7 +18,7 @@ impl MempoolRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| q.mempool_info())
.respond_json(&headers, state.mempool_strategy(), &uri, |q| q.mempool_info())
.await
},
|op| {
@@ -37,7 +37,7 @@ impl MempoolRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| q.mempool_txids())
.respond_json(&headers, state.mempool_strategy(), &uri, |q| q.mempool_txids())
.await
},
|op| {
@@ -56,7 +56,7 @@ impl MempoolRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| q.mempool_recent())
.respond_json(&headers, state.mempool_strategy(), &uri, |q| q.mempool_recent())
.await
},
|op| {
@@ -75,7 +75,7 @@ impl MempoolRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, state.mempool_cache(), &uri, |q| q.live_price())
.respond_json(&headers, state.mempool_strategy(), &uri, |q| q.live_price())
.await
},
|op| {
+15 -22
View File
@@ -1,8 +1,5 @@
use std::net::SocketAddr;
use aide::axum::{ApiRouter, routing::get_with};
use axum::{
Extension,
extract::{Path, Query, State},
http::{HeaderMap, Uri},
response::{IntoResponse, Response},
@@ -47,7 +44,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
"/api/metrics",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_catalog().clone())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_catalog().clone())).await
},
|op| op
.id("get_metrics_tree_deprecated")
@@ -71,7 +68,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_count())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_count())).await
},
|op| op
.id("get_metrics_count_deprecated")
@@ -95,7 +92,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.indexes().to_vec())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.indexes().to_vec())).await
},
|op| op
.id("get_indexes_deprecated")
@@ -119,7 +116,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Query(pagination): Query<Pagination>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.series_list(pagination))).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.series_list(pagination))).await
},
|op| op
.id("list_metrics_deprecated")
@@ -143,7 +140,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Query(query): Query<SearchQuery>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.search_series(&query))).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.search_series(&query))).await
},
|op| op
.id("search_metrics_deprecated")
@@ -162,8 +159,8 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
.api_route(
"/api/metrics/bulk",
get_with(
|uri: Uri, headers: HeaderMap, addr: Extension<SocketAddr>, query: Query<SeriesSelection>, state: State<AppState>| async move {
series_legacy::handler(uri, headers, addr, query, state)
|uri: Uri, headers: HeaderMap, query: Query<SeriesSelection>, state: State<AppState>| async move {
series_legacy::handler(uri, headers, query, state)
.await
.into_response()
},
@@ -192,7 +189,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<LegacySeriesParam>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |q| {
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |q| {
q.series_info(&path.metric).ok_or_else(|| q.series_not_found_error(&path.metric))
}).await
},
@@ -216,13 +213,12 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri,
headers: HeaderMap,
addr: Extension<SocketAddr>,
state: State<AppState>,
Path(path): Path<LegacySeriesWithIndex>,
Query(range): Query<DataRangeFormat>|
-> Response {
let params = SeriesSelection::from((path.index, path.metric, range));
series_legacy::handler(uri, headers, addr, Query(params), state)
series_legacy::handler(uri, headers, Query(params), state)
.await
.into_response()
},
@@ -246,13 +242,12 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri,
headers: HeaderMap,
addr: Extension<SocketAddr>,
state: State<AppState>,
Path(path): Path<LegacySeriesWithIndex>,
Query(range): Query<DataRangeFormat>|
-> Response {
let params = SeriesSelection::from((path.index, path.metric, range));
series_legacy::handler(uri, headers, addr, Query(params), state)
series_legacy::handler(uri, headers, Query(params), state)
.await
.into_response()
},
@@ -280,7 +275,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<LegacySeriesWithIndex>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.latest(&path.metric, path.index)
})
.await
@@ -307,7 +302,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<LegacySeriesWithIndex>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.len(&path.metric, path.index)
})
.await
@@ -334,7 +329,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<LegacySeriesWithIndex>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.version(&path.metric, path.index)
})
.await
@@ -358,7 +353,6 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri,
headers: HeaderMap,
addr: Extension<SocketAddr>,
Path(variant): Path<String>,
Query(range): Query<DataRangeFormat>,
state: State<AppState>|
@@ -379,7 +373,7 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
SeriesList::from(split.collect::<Vec<_>>().join(separator)),
range,
));
series_legacy::handler(uri, headers, addr, Query(params), state)
series_legacy::handler(uri, headers, Query(params), state)
.await
.into_response()
},
@@ -402,12 +396,11 @@ impl ApiMetricsLegacyRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri,
headers: HeaderMap,
addr: Extension<SocketAddr>,
Query(params): Query<SeriesSelectionLegacy>,
state: State<AppState>|
-> Response {
let params: SeriesSelection = params.into();
series_legacy::handler(uri, headers, addr, Query(params), state)
series_legacy::handler(uri, headers, Query(params), state)
.await
.into_response()
},
+18 -24
View File
@@ -2,8 +2,6 @@ use aide::axum::{ApiRouter, routing::get_with};
use axum::{
extract::{Path, State},
http::{HeaderMap, Uri},
response::Redirect,
routing::get,
};
use brk_types::{
BlockFeeRatesEntry, BlockFeesEntry, BlockInfoV1, BlockRewardsEntry, BlockSizesWeights,
@@ -23,16 +21,12 @@ pub trait MiningRoutes {
impl MiningRoutes for ApiRouter<AppState> {
fn add_mining_routes(self) -> Self {
self.route(
"/api/v1/mining",
get(Redirect::temporary("/api#tag/mining")),
)
.api_route(
self.api_route(
"/api/v1/mining/pools",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
// Pool list is compiled-in, only changes on deploy
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.all_pools())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.all_pools())).await
},
|op| {
op.id("get_pools")
@@ -49,7 +43,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/pools/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.mining_pools(path.time_period)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.mining_pools(path.time_period)).await
},
|op| {
op.id("get_pool_stats")
@@ -67,7 +61,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/pool/{slug}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<PoolSlugParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_detail(path.slug)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_detail(path.slug)).await
},
|op| {
op.id("get_pool")
@@ -85,7 +79,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/hashrate/pools",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, |q| q.pools_hashrate(None)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, |q| q.pools_hashrate(None)).await
},
|op| {
op.id("get_pools_hashrate")
@@ -102,7 +96,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/hashrate/pools/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pools_hashrate(Some(path.time_period))).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pools_hashrate(Some(path.time_period))).await
},
|op| {
op.id("get_pools_hashrate_by_period")
@@ -120,7 +114,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/pool/{slug}/hashrate",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<PoolSlugParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_hashrate(path.slug)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_hashrate(path.slug)).await
},
|op| {
op.id("get_pool_hashrate")
@@ -138,7 +132,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/pool/{slug}/blocks",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<PoolSlugParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_blocks(path.slug, None)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.pool_blocks(path.slug, None)).await
},
|op| {
op.id("get_pool_blocks")
@@ -156,7 +150,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/pool/{slug}/blocks/{height}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(PoolSlugAndHeightParam {slug, height}): Path<PoolSlugAndHeightParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.height_cache(Version::ONE, height), &uri, move |q| q.pool_blocks(slug, Some(height))).await
state.respond_json(&headers, state.height_strategy(Version::ONE, height), &uri, move |q| q.pool_blocks(slug, Some(height))).await
},
|op| {
op.id("get_pool_blocks_from")
@@ -174,7 +168,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/hashrate",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, |q| q.hashrate(None)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, |q| q.hashrate(None)).await
},
|op| {
op.id("get_hashrate")
@@ -191,7 +185,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/hashrate/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.hashrate(Some(path.time_period))).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.hashrate(Some(path.time_period))).await
},
|op| {
op.id("get_hashrate_by_period")
@@ -209,7 +203,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/difficulty-adjustments",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, |q| q.difficulty_adjustments(None)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, |q| q.difficulty_adjustments(None)).await
},
|op| {
op.id("get_difficulty_adjustments")
@@ -226,7 +220,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/difficulty-adjustments/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.difficulty_adjustments(Some(path.time_period))).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.difficulty_adjustments(Some(path.time_period))).await
},
|op| {
op.id("get_difficulty_adjustments_by_period")
@@ -244,7 +238,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/reward-stats/{block_count}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<BlockCountParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.reward_stats(path.block_count)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.reward_stats(path.block_count)).await
},
|op| {
op.id("get_reward_stats")
@@ -262,7 +256,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/blocks/fees/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_fees(path.time_period)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_fees(path.time_period)).await
},
|op| {
op.id("get_block_fees")
@@ -280,7 +274,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/blocks/rewards/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_rewards(path.time_period)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_rewards(path.time_period)).await
},
|op| {
op.id("get_block_rewards")
@@ -298,7 +292,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/blocks/fee-rates/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_fee_rates(path.time_period)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_fee_rates(path.time_period)).await
},
|op| {
op.id("get_block_fee_rates")
@@ -316,7 +310,7 @@ impl MiningRoutes for ApiRouter<AppState> {
"/api/v1/mining/blocks/sizes-weights/{time_period}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(path): Path<TimePeriodParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_sizes_weights(path.time_period)).await
state.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| q.block_sizes_weights(path.time_period)).await
},
|op| {
op.id("get_block_sizes_weights")
+7 -11
View File
@@ -1,9 +1,4 @@
use std::sync::Arc;
use aide::{
axum::{ApiRouter, routing::get_with},
openapi::OpenApi,
};
use aide::axum::{ApiRouter, routing::get_with};
use axum::{
Extension,
http::HeaderMap,
@@ -63,13 +58,14 @@ impl ApiRoutes for ApiRouter<AppState> {
.add_fees_routes()
.add_mempool_routes()
.add_tx_routes()
.route("/api/server", get(Redirect::temporary("/api#tag/server")))
.api_route(
"/openapi.json",
get_with(
async |headers: HeaderMap,
Extension(api): Extension<Arc<OpenApi>>|
-> Response { Response::static_json(&headers, &*api) },
Extension(api): Extension<OpenApiJson>|
-> Response {
Response::static_json_bytes(&headers, api.bytes())
},
|op| {
op.id("get_openapi")
.server_tag()
@@ -82,9 +78,9 @@ impl ApiRoutes for ApiRouter<AppState> {
"/api.json",
get_with(
async |headers: HeaderMap,
Extension(api): Extension<Arc<ApiJson>>|
Extension(api): Extension<ApiJson>|
-> Response {
Response::static_json(&headers, api.to_json())
Response::static_json_bytes(&headers, api.bytes())
},
|op| {
op.id("get_api")
+7 -6
View File
@@ -1,19 +1,20 @@
use aide::openapi::OpenApi;
use derive_more::Deref;
use axum::body::Bytes;
use serde_json::{Map, Value};
/// Compact OpenAPI spec optimized for LLM consumption.
#[derive(Deref)]
pub struct ApiJson(String);
/// Pre-serialized at startup, served as raw bytes per request.
#[derive(Clone)]
pub struct ApiJson(Bytes);
impl ApiJson {
pub fn new(openapi: &OpenApi) -> Self {
let json = serde_json::to_string(openapi).unwrap();
Self(compact_json(&json))
Self(Bytes::from(compact_json(&json)))
}
pub fn to_json(&self) -> serde_json::Value {
serde_json::from_str(&self.0).unwrap()
pub fn bytes(&self) -> Bytes {
self.0.clone()
}
}
+16
View File
@@ -0,0 +1,16 @@
use aide::openapi::OpenApi;
use axum::body::Bytes;
/// Full OpenAPI spec, pre-serialized at startup and served as raw bytes per request.
#[derive(Clone)]
pub struct OpenApiJson(Bytes);
impl OpenApiJson {
pub fn new(openapi: &OpenApi) -> Self {
Self(Bytes::from(serde_json::to_vec(openapi).unwrap()))
}
pub fn bytes(&self) -> Bytes {
self.0.clone()
}
}
+2
View File
@@ -11,8 +11,10 @@
//
mod compact;
mod full;
pub use compact::ApiJson;
pub use full::OpenApiJson;
use aide::openapi::{Contact, Info, License, OpenApi, Tag};
+17 -27
View File
@@ -4,11 +4,8 @@
//! a formatted body (single + raw + bulk + the legacy module's deprecated
//! handler in `series_legacy.rs`).
use std::net::SocketAddr;
use aide::axum::{ApiRouter, routing::get_with};
use axum::{
Extension,
body::Bytes,
extract::{Path, Query, State},
http::{HeaderMap, Uri},
@@ -31,17 +28,16 @@ use crate::{
/// Shared response pipeline for every series endpoint.
///
/// Resolves the query (which determines the cache key), then delegates to
/// [`AppState::cached_with_params`] for the etag short-circuit, server-side
/// [`AppState::respond_with_params`] for the etag short-circuit, server-side
/// cache lookup, body formatting, and header assembly.
pub(super) async fn serve(
state: AppState,
uri: Uri,
headers: HeaderMap,
addr: SocketAddr,
params: SeriesSelection,
to_bytes: impl FnOnce(&BrkQuery, ResolvedQuery) -> BrkResult<Bytes> + Send + 'static,
) -> Result<Response> {
let max_weight = state.max_weight_for(&addr);
let max_weight = state.max_weight;
let resolved = state.run(move |q| q.resolve(params, max_weight)).await?;
let format = resolved.format();
@@ -54,7 +50,7 @@ pub(super) async fn serve(
);
Ok(state
.cached_with_params(
.respond_with_params(
&headers,
&uri,
cache_params,
@@ -65,7 +61,7 @@ pub(super) async fn serve(
}
Format::JSON => h.insert_content_type_application_json(),
},
move |q, enc| Ok(enc.compress(to_bytes(q, resolved)?)),
move |q| to_bytes(q, resolved),
)
.await)
}
@@ -80,11 +76,10 @@ fn output_to_bytes(out: brk_types::SeriesOutput) -> BrkResult<Bytes> {
async fn data_handler(
uri: Uri,
headers: HeaderMap,
Extension(addr): Extension<SocketAddr>,
Query(params): Query<SeriesSelection>,
State(state): State<AppState>,
) -> Result<Response> {
serve(state, uri, headers, addr, params, |q, r| {
serve(state, uri, headers, params, |q, r| {
output_to_bytes(q.format(r)?)
})
.await
@@ -93,11 +88,10 @@ async fn data_handler(
async fn data_raw_handler(
uri: Uri,
headers: HeaderMap,
Extension(addr): Extension<SocketAddr>,
Query(params): Query<SeriesSelection>,
State(state): State<AppState>,
) -> Result<Response> {
serve(state, uri, headers, addr, params, |q, r| {
serve(state, uri, headers, params, |q, r| {
output_to_bytes(q.format_raw(r)?)
})
.await
@@ -113,7 +107,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
"/api/series",
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_catalog().clone())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_catalog().clone())).await
},
|op| op
.id("get_series_tree")
@@ -136,7 +130,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_count())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.series_count())).await
},
|op| op
.id("get_series_count")
@@ -156,7 +150,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.indexes().to_vec())).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| Ok(q.indexes().to_vec())).await
},
|op| op
.id("get_indexes")
@@ -178,7 +172,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Query(pagination): Query<Pagination>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.series_list(pagination))).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.series_list(pagination))).await
},
|op| op
.id("list_series")
@@ -198,7 +192,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Query(query): Query<SearchQuery>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.search_series(&query))).await
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |q| Ok(q.search_series(&query))).await
},
|op| op
.id("search_series")
@@ -220,7 +214,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<SeriesParam>
| {
state.cached_json(&headers, CacheStrategy::Deploy, &uri, move |q| {
state.respond_json(&headers, CacheStrategy::Deploy, &uri, move |q| {
q.series_info(&path.series).ok_or_else(|| q.series_not_found_error(&path.series))
}).await
},
@@ -242,7 +236,6 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri,
headers: HeaderMap,
addr: Extension<SocketAddr>,
state: State<AppState>,
Path(path): Path<SeriesNameWithIndex>,
Query(range): Query<DataRangeFormat>|
@@ -250,7 +243,6 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
data_handler(
uri,
headers,
addr,
Query(SeriesSelection::from((path.index, path.series, range))),
state,
)
@@ -276,7 +268,6 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri,
headers: HeaderMap,
addr: Extension<SocketAddr>,
state: State<AppState>,
Path(path): Path<SeriesNameWithIndex>,
Query(range): Query<DataRangeFormat>|
@@ -284,7 +275,6 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
data_raw_handler(
uri,
headers,
addr,
Query(SeriesSelection::from((path.index, path.series, range))),
state,
)
@@ -314,7 +304,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<SeriesNameWithIndex>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.latest(&path.series, path.index)
})
.await
@@ -340,7 +330,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<SeriesNameWithIndex>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.len(&path.series, path.index)
})
.await
@@ -364,7 +354,7 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(path): Path<SeriesNameWithIndex>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.version(&path.series, path.index)
})
.await
@@ -382,8 +372,8 @@ impl ApiSeriesRoutes for ApiRouter<AppState> {
.api_route(
"/api/series/bulk",
get_with(
|uri, headers, addr, query, state| async move {
data_handler(uri, headers, addr, query, state).await.into_response()
|uri, headers, query, state| async move {
data_handler(uri, headers, query, state).await.into_response()
},
|op| op
.id("get_series_bulk")
+6 -8
View File
@@ -5,11 +5,10 @@
//! in legacy mode (registered by metrics endpoints that emit the old format).
//! - `add_series_legacy_routes`: the deprecated `/api/series/cost-basis/*` URLs.
use std::{collections::BTreeMap, net::SocketAddr};
use std::collections::BTreeMap;
use aide::axum::{ApiRouter, routing::get_with};
use axum::{
Extension,
body::Bytes,
extract::{Path, Query, State},
http::{HeaderMap, StatusCode, Uri},
@@ -40,12 +39,11 @@ pub const SUNSET: &str = "2027-01-01T00:00:00Z";
pub async fn handler(
uri: Uri,
headers: HeaderMap,
Extension(addr): Extension<SocketAddr>,
Query(params): Query<SeriesSelection>,
State(state): State<AppState>,
) -> Result<Response> {
let mut response =
super::series::serve(state, uri, headers, addr, params, legacy_bytes).await?;
super::series::serve(state, uri, headers, params, legacy_bytes).await?;
if response.status() == StatusCode::OK {
response.headers_mut().insert_deprecation(SUNSET);
}
@@ -152,7 +150,7 @@ impl ApiSeriesLegacyRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| q.urpd_cohorts())
.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| q.urpd_cohorts())
.await
},
|op| {
@@ -179,7 +177,7 @@ impl ApiSeriesLegacyRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.urpd_dates(&params.cohort)
})
.await
@@ -208,9 +206,9 @@ impl ApiSeriesLegacyRoutes for ApiRouter<AppState> {
Path(params): Path<CostBasisParams>,
Query(query): Query<CostBasisQuery>,
State(state): State<AppState>| {
let strategy = state.date_cache(Version::ONE, params.date);
let strategy = state.date_strategy(Version::ONE, params.date);
state
.cached_json(&headers, strategy, &uri, move |q| {
.respond_json(&headers, strategy, &uri, move |q| {
cost_basis_formatted(
q,
&params.cohort,
+16 -6
View File
@@ -4,10 +4,15 @@ use aide::axum::{ApiRouter, routing::get_with};
use axum::{
extract::State,
http::{HeaderMap, Uri},
response::{IntoResponse, Response},
};
use brk_types::{DiskUsage, Health, SyncStatus};
use crate::{CacheStrategy, VERSION, extended::TransformResponseExtended, params::Empty};
use crate::{
CacheStrategy, VERSION,
extended::{HeaderMapExtended, TransformResponseExtended},
params::Empty,
};
use super::AppState;
@@ -20,7 +25,7 @@ impl ServerRoutes for ApiRouter<AppState> {
self.api_route(
"/health",
get_with(
async |_: Empty, State(state): State<AppState>| -> axum::Json<Health> {
async |_: Empty, State(state): State<AppState>| -> Response {
let uptime = state.started_instant.elapsed();
let started_at = state.started_at.to_string();
let sync = state
@@ -33,7 +38,7 @@ impl ServerRoutes for ApiRouter<AppState> {
})
.await
.expect("health sync task panicked");
axum::Json(Health {
let mut response = axum::Json(Health {
status: Cow::Borrowed("healthy"),
service: Cow::Borrowed("brk"),
version: Cow::Borrowed(VERSION),
@@ -42,6 +47,11 @@ impl ServerRoutes for ApiRouter<AppState> {
uptime_seconds: uptime.as_secs(),
sync,
})
.into_response();
let h = response.headers_mut();
h.insert_cache_control("no-store");
h.insert_cdn_cache_control("no-store");
response
},
|op| {
op.id("get_health")
@@ -57,7 +67,7 @@ impl ServerRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Deploy, &uri, |_| {
.respond_json(&headers, CacheStrategy::Deploy, &uri, |_| {
Ok(env!("CARGO_PKG_VERSION"))
})
.await
@@ -77,7 +87,7 @@ impl ServerRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
let tip_height = q.client().get_last_height()?;
Ok(q.sync_status(tip_height))
})
@@ -102,7 +112,7 @@ impl ServerRoutes for ApiRouter<AppState> {
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
let brk_path = state.data_path.clone();
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
let brk_bytes = dir_size(&brk_path)?;
let bitcoin_bytes = dir_size(q.blocks_dir())?;
Ok(DiskUsage::new(brk_bytes, bitcoin_bytes))
+12 -12
View File
@@ -28,7 +28,7 @@ impl TxRoutes for ApiRouter<AppState> {
"/api/tx-index/{index}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(param): Path<TxIndexParam>, _: Empty, State(state): State<AppState>| {
state.cached_text(&headers, CacheStrategy::Immutable(Version::ONE), &uri, move |q| q.txid_by_index(param.index).map(|t| t.to_string())).await
state.respond_text(&headers, CacheStrategy::Immutable(Version::ONE), &uri, move |q| q.txid_by_index(param.index).map(|t| t.to_string())).await
},
|op| op
.id("get_tx_by_index")
@@ -46,7 +46,7 @@ impl TxRoutes for ApiRouter<AppState> {
"/api/v1/cpfp/{txid}",
get_with(
async |uri: Uri, headers: HeaderMap, Path(param): Path<TxidParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.cpfp(&param.txid)).await
state.respond_json(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.cpfp(&param.txid)).await
},
|op| op
.id("get_cpfp")
@@ -64,7 +64,7 @@ impl TxRoutes for ApiRouter<AppState> {
"/api/v1/tx/{txid}/rbf",
get_with(
async |uri: Uri, headers: HeaderMap, Path(param): Path<TxidParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.mempool_cache(), &uri, move |q| q.tx_rbf(&param.txid)).await
state.respond_json(&headers, state.mempool_strategy(), &uri, move |q| q.tx_rbf(&param.txid)).await
},
|op| op
.id("get_tx_rbf")
@@ -88,7 +88,7 @@ impl TxRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.transaction(&param.txid)).await
state.respond_json(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.transaction(&param.txid)).await
},
|op| op
.id("get_tx")
@@ -114,7 +114,7 @@ impl TxRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_text(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.transaction_hex(&param.txid)).await
state.respond_text(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.transaction_hex(&param.txid)).await
},
|op| op
.id("get_tx_hex")
@@ -134,7 +134,7 @@ impl TxRoutes for ApiRouter<AppState> {
"/api/tx/{txid}/merkleblock-proof",
get_with(
async |uri: Uri, headers: HeaderMap, Path(param): Path<TxidParam>, _: Empty, State(state): State<AppState>| {
state.cached_text(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.merkleblock_proof(&param.txid)).await
state.respond_text(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.merkleblock_proof(&param.txid)).await
},
|op| op
.id("get_tx_merkleblock_proof")
@@ -152,7 +152,7 @@ impl TxRoutes for ApiRouter<AppState> {
"/api/tx/{txid}/merkle-proof",
get_with(
async |uri: Uri, headers: HeaderMap, Path(param): Path<TxidParam>, _: Empty, State(state): State<AppState>| {
state.cached_json(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.merkle_proof(&param.txid)).await
state.respond_json(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.merkle_proof(&param.txid)).await
},
|op| op
.id("get_tx_merkle_proof")
@@ -177,7 +177,7 @@ impl TxRoutes for ApiRouter<AppState> {
State(state): State<AppState>
| {
let v = Version::ONE;
state.cached_json_optimistic(&headers, CacheStrategy::Immutable(v), &uri, move |q| {
state.respond_json_optimistic(&headers, CacheStrategy::Immutable(v), &uri, move |q| {
let outspend = q.outspend(&path.txid, path.vout)?;
let strategy = if outspend.is_deeply_spent(q.height()) {
CacheStrategy::Immutable(v)
@@ -212,7 +212,7 @@ impl TxRoutes for ApiRouter<AppState> {
State(state): State<AppState>
| {
let v = Version::ONE;
state.cached_json_optimistic(&headers, CacheStrategy::Immutable(v), &uri, move |q| {
state.respond_json_optimistic(&headers, CacheStrategy::Immutable(v), &uri, move |q| {
let outspends = q.outspends(&param.txid)?;
let height = q.height();
let all_deep = outspends.iter().all(|o| o.is_deeply_spent(height));
@@ -238,7 +238,7 @@ impl TxRoutes for ApiRouter<AppState> {
"/api/tx/{txid}/raw",
get_with(
async |uri: Uri, headers: HeaderMap, Path(param): Path<TxidParam>, _: Empty, State(state): State<AppState>| {
state.cached_bytes(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.transaction_raw(&param.txid)).await
state.respond_bytes(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.transaction_raw(&param.txid)).await
},
|op| op
.id("get_tx_raw")
@@ -262,7 +262,7 @@ impl TxRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>
| {
state.cached_json(&headers, state.tx_cache(Version::ONE, &param.txid), &uri, move |q| q.transaction_status(&param.txid)).await
state.respond_json(&headers, state.tx_strategy(Version::ONE, &param.txid), &uri, move |q| q.transaction_status(&param.txid)).await
},
|op| op
.id("get_tx_status")
@@ -284,7 +284,7 @@ impl TxRoutes for ApiRouter<AppState> {
async |uri: Uri, headers: HeaderMap, State(state): State<AppState>| -> Result<Response> {
let params = TxidsParam::from_query(uri.query().unwrap_or(""))
.map_err(Error::bad_request)?;
Ok(state.cached_json(&headers, state.mempool_cache(), &uri, move |q| q.transaction_times(&params.txids)).await)
Ok(state.respond_json(&headers, state.mempool_strategy(), &uri, move |q| q.transaction_times(&params.txids)).await)
},
|op| op
.id("get_transaction_times")
+5 -5
View File
@@ -24,7 +24,7 @@ impl ApiUrpdRoutes for ApiRouter<AppState> {
get_with(
async |uri: Uri, headers: HeaderMap, _: Empty, State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Deploy, &uri, |q| q.urpd_cohorts())
.respond_json(&headers, CacheStrategy::Deploy, &uri, |q| q.urpd_cohorts())
.await
},
|op| {
@@ -50,7 +50,7 @@ impl ApiUrpdRoutes for ApiRouter<AppState> {
_: Empty,
State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.urpd_dates(&params.cohort)
})
.await
@@ -79,7 +79,7 @@ impl ApiUrpdRoutes for ApiRouter<AppState> {
Query(query): Query<UrpdQuery>,
State(state): State<AppState>| {
state
.cached_json(&headers, CacheStrategy::Tip, &uri, move |q| {
.respond_json(&headers, CacheStrategy::Tip, &uri, move |q| {
q.urpd_latest(&params.cohort, query.aggregation)
})
.await
@@ -108,9 +108,9 @@ impl ApiUrpdRoutes for ApiRouter<AppState> {
Path(params): Path<UrpdParams>,
Query(query): Query<UrpdQuery>,
State(state): State<AppState>| {
let strategy = state.date_cache(Version::ONE, params.date);
let strategy = state.date_strategy(Version::ONE, params.date);
state
.cached_json(&headers, strategy, &uri, move |q| {
.respond_json(&headers, strategy, &uri, move |q| {
q.urpd_at(&params.cohort, params.date, query.aggregation)
})
.await
+39
View File
@@ -125,3 +125,42 @@ impl CacheParams {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn v(n: u32) -> Version {
Version::new(n)
}
fn h(n: u64) -> BlockHashPrefix {
BlockHashPrefix::from(n)
}
#[test]
fn series_tail_uses_tip_hash() {
let p = CacheParams::series(v(3), 100, 100, h(0xabcd));
assert_eq!(p.etag.as_str(), "s3-abcd");
}
#[test]
fn series_historical_uses_total() {
let p = CacheParams::series(v(3), 100, 50, h(0xabcd));
assert_eq!(p.etag.as_str(), "s3-100");
}
#[test]
fn series_historical_ignores_tip_hash() {
let a = CacheParams::series(v(3), 100, 50, h(0xabcd));
let b = CacheParams::series(v(3), 100, 50, h(0xdead));
assert_eq!(a.etag.as_str(), b.etag.as_str());
}
#[test]
fn series_tail_changes_with_tip_hash() {
let a = CacheParams::series(v(3), 100, 100, h(0xabcd));
let b = CacheParams::series(v(3), 100, 100, h(0xdead));
assert_ne!(a.etag.as_str(), b.etag.as_str());
}
}
+3 -13
View File
@@ -4,15 +4,9 @@ use brk_website::Website;
use crate::cache::CdnCacheMode;
/// Default max series-query response weight for non-loopback clients.
/// `4 * 8 * 10_000` = 320 KB (4 vecs x 8 bytes x 10k rows).
pub const DEFAULT_MAX_WEIGHT: usize = 4 * 8 * 10_000;
/// Default max series-query response weight for loopback clients.
pub const DEFAULT_MAX_WEIGHT_LOCALHOST: usize = 50 * 1_000_000;
/// Default LRU capacity for the in-process response cache.
pub const DEFAULT_CACHE_SIZE: usize = 1_000;
/// Default max series-query response weight.
/// 50 MB - generous enough for any honest query, low enough to limit cache-buster leverage.
pub const DEFAULT_MAX_WEIGHT: usize = 50 * 1_000_000;
/// Server-wide configuration set at startup.
#[derive(Debug, Clone)]
@@ -21,8 +15,6 @@ pub struct ServerConfig {
pub website: Website,
pub cdn_cache_mode: CdnCacheMode,
pub max_weight: usize,
pub max_weight_localhost: usize,
pub cache_size: usize,
}
impl Default for ServerConfig {
@@ -32,8 +24,6 @@ impl Default for ServerConfig {
website: Website::default(),
cdn_cache_mode: CdnCacheMode::default(),
max_weight: DEFAULT_MAX_WEIGHT,
max_weight_localhost: DEFAULT_MAX_WEIGHT_LOCALHOST,
cache_size: DEFAULT_CACHE_SIZE,
}
}
}
@@ -1,98 +0,0 @@
use axum::{
body::Bytes,
http::{HeaderMap, HeaderValue, header},
};
/// HTTP content encoding for pre-compressed caching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentEncoding {
Brotli,
Gzip,
Zstd,
Identity,
}
impl ContentEncoding {
/// Negotiate the best encoding from the Accept-Encoding header.
/// Priority: zstd > br > gzip > identity.
/// zstd is preferred over brotli: ~3-5x faster compression at comparable ratios.
/// Respects q=0 (RFC 9110 §12.5.3): encodings explicitly rejected are never selected.
pub fn negotiate(headers: &HeaderMap) -> Self {
let accept = match headers.get(header::ACCEPT_ENCODING) {
Some(v) => v,
None => return Self::Identity,
};
let s = match accept.to_str() {
Ok(s) => s,
Err(_) => return Self::Identity,
};
let mut best = Self::Identity;
for part in s.split(',') {
let mut iter = part.split(';');
let name = iter.next().unwrap_or("").trim();
let rejected = iter.any(|p| {
let p = p.trim();
p == "q=0" || p == "q=0.0" || p == "q=0.00" || p == "q=0.000"
});
if rejected {
continue;
}
match name {
"zstd" => return Self::Zstd,
"br" => best = Self::Brotli,
"gzip" if matches!(best, Self::Identity) => best = Self::Gzip,
_ => {}
}
}
best
}
/// Compress bytes with this encoding. Identity returns bytes unchanged.
pub fn compress(self, bytes: Bytes) -> Bytes {
match self {
Self::Identity => bytes,
Self::Brotli => {
use std::io::Write;
let mut output = Vec::with_capacity(bytes.len() / 2);
{
let mut writer = brotli::CompressorWriter::new(&mut output, 4096, 4, 22);
writer.write_all(&bytes).expect("brotli compression failed");
}
Bytes::from(output)
}
Self::Gzip => {
use flate2::write::GzEncoder;
use std::io::Write;
let mut encoder = GzEncoder::new(
Vec::with_capacity(bytes.len() / 2),
flate2::Compression::new(3),
);
encoder.write_all(&bytes).expect("gzip compression failed");
Bytes::from(encoder.finish().expect("gzip finish failed"))
}
Self::Zstd => {
Bytes::from(zstd::encode_all(bytes.as_ref(), 3).expect("zstd compression failed"))
}
}
}
/// Wire name used for Content-Encoding header and cache key suffix.
#[inline]
pub fn as_str(self) -> &'static str {
match self {
Self::Brotli => "br",
Self::Gzip => "gzip",
Self::Zstd => "zstd",
Self::Identity => "identity",
}
}
#[inline]
pub(crate) fn header_value(self) -> Option<HeaderValue> {
match self {
Self::Identity => None,
_ => Some(HeaderValue::from_static(self.as_str())),
}
}
}
+42 -19
View File
@@ -3,8 +3,6 @@ use axum::http::{
header::{self, IF_NONE_MATCH},
};
use super::ContentEncoding;
pub trait HeaderMapExtended {
fn has_etag(&self, etag: &str) -> bool;
fn insert_etag(&mut self, etag: &str);
@@ -14,8 +12,6 @@ pub trait HeaderMapExtended {
fn insert_content_disposition_attachment(&mut self, filename: &str);
fn insert_content_encoding(&mut self, encoding: ContentEncoding);
fn insert_content_type_application_json(&mut self);
fn insert_content_type_text_csv(&mut self);
@@ -27,18 +23,17 @@ pub trait HeaderMapExtended {
impl HeaderMapExtended for HeaderMap {
fn has_etag(&self, etag: &str) -> bool {
self.get(IF_NONE_MATCH).is_some_and(|v| {
let s = v.as_bytes();
// Match both quoted and unquoted: "etag" or etag
s == etag.as_bytes()
|| (s.len() == etag.len() + 2
&& s[0] == b'"'
&& s[s.len() - 1] == b'"'
&& &s[1..s.len() - 1] == etag.as_bytes())
let raw = v.as_bytes();
let target = etag.as_bytes();
raw == b"*"
|| raw
.split(|&b| b == b',')
.any(|entry| normalize_etag(entry) == target)
})
}
fn insert_etag(&mut self, etag: &str) {
self.insert(header::ETAG, format!("\"{etag}\"").parse().unwrap());
self.insert(header::ETAG, format!("W/\"{etag}\"").parse().unwrap());
}
fn insert_cache_control(&mut self, value: &str) {
@@ -61,13 +56,6 @@ impl HeaderMapExtended for HeaderMap {
);
}
fn insert_content_encoding(&mut self, encoding: ContentEncoding) {
if let Some(value) = encoding.header_value() {
self.insert(header::CONTENT_ENCODING, value);
self.insert_vary_accept_encoding();
}
}
fn insert_content_type_application_json(&mut self) {
self.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
}
@@ -85,3 +73,38 @@ impl HeaderMapExtended for HeaderMap {
self.insert("Sunset", sunset.parse().unwrap());
}
}
fn normalize_etag(entry: &[u8]) -> &[u8] {
let s = entry.trim_ascii();
let s = s.strip_prefix(b"W/").unwrap_or(s);
s.strip_prefix(b"\"")
.and_then(|s| s.strip_suffix(b"\""))
.unwrap_or(s)
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::HeaderValue;
fn map(if_none_match: &str) -> HeaderMap {
let mut h = HeaderMap::new();
h.insert(IF_NONE_MATCH, HeaderValue::from_str(if_none_match).unwrap());
h
}
#[test]
fn matches_weak_strong_wildcard_and_list() {
assert!(map("W/\"s1-abc\"").has_etag("s1-abc"));
assert!(map("\"s1-abc\"").has_etag("s1-abc"));
assert!(map("*").has_etag("anything"));
assert!(map("W/\"a\", W/\"s1-abc\"").has_etag("s1-abc"));
assert!(map(" W/\"s1-abc\" ").has_etag("s1-abc"));
}
#[test]
fn rejects_mismatch_and_missing() {
assert!(!map("W/\"other\"").has_etag("s1-abc"));
assert!(!HeaderMap::new().has_etag("s1-abc"));
}
}
-2
View File
@@ -1,9 +1,7 @@
mod encoding;
mod header_map;
mod response;
mod transform_operation;
pub use encoding::*;
pub use header_map::*;
pub use response::*;
pub use transform_operation::*;
+8 -19
View File
@@ -1,30 +1,18 @@
use axum::{
body::Body,
body::{Body, Bytes},
http::{HeaderMap, Response, StatusCode, header},
response::IntoResponse,
};
use serde::Serialize;
use super::header_map::HeaderMapExtended;
use crate::cache::CacheParams;
fn new_json_cached<T: Serialize>(value: T, params: &CacheParams) -> Response<Body> {
let bytes = serde_json::to_vec(&value).unwrap();
let mut response = Response::builder().body(bytes.into()).unwrap();
let h = response.headers_mut();
h.insert_content_type_application_json();
params.apply_to(h);
response
}
pub trait ResponseExtended
where
Self: Sized,
{
fn new_not_modified(params: &CacheParams) -> Self;
fn static_json<T>(headers: &HeaderMap, value: T) -> Self
where
T: Serialize;
fn static_json_bytes(headers: &HeaderMap, bytes: Bytes) -> Self;
fn static_bytes(
headers: &HeaderMap,
bytes: &'static [u8],
@@ -40,15 +28,16 @@ impl ResponseExtended for Response<Body> {
response
}
fn static_json<T>(headers: &HeaderMap, value: T) -> Self
where
T: Serialize,
{
fn static_json_bytes(headers: &HeaderMap, bytes: Bytes) -> Self {
let params = CacheParams::deploy();
if params.matches_etag(headers) {
return Self::new_not_modified(&params);
}
new_json_cached(value, &params)
let mut response = Response::new(Body::from(bytes));
let h = response.headers_mut();
h.insert_content_type_application_json();
params.apply_to(h);
response
}
fn static_bytes(
+74 -57
View File
@@ -2,8 +2,6 @@
use std::{
any::Any,
net::SocketAddr,
sync::{Arc, atomic::AtomicU64},
time::{Duration, Instant},
};
@@ -16,7 +14,7 @@ use axum::{
body::Body,
http::{
Request, Response, StatusCode, Uri,
header::{CONTENT_TYPE, VARY},
header::{ALLOW, CONTENT_TYPE, VARY},
},
middleware::Next,
response::{IntoResponse, Redirect},
@@ -24,12 +22,15 @@ use axum::{
serve,
};
use brk_query::AsyncQuery;
use quick_cache::sync::Cache;
use tokio::net::TcpListener;
use tower_http::{
catch_panic::CatchPanicLayer, classify::ServerErrorsFailureClass,
compression::CompressionLayer, cors::CorsLayer, normalize_path::NormalizePathLayer,
timeout::TimeoutLayer, trace::TraceLayer,
catch_panic::CatchPanicLayer,
classify::ServerErrorsFailureClass,
compression::{CompressionLayer, CompressionLevel},
cors::CorsLayer,
normalize_path::NormalizePathLayer,
timeout::TimeoutLayer,
trace::TraceLayer,
};
use tower_layer::Layer;
use tracing::{error, info};
@@ -49,14 +50,27 @@ pub use brk_types::Port;
pub use brk_website::Website;
pub use cache::CdnCacheMode;
use cache::{CacheParams, CacheStrategy};
pub use config::{
DEFAULT_CACHE_SIZE, DEFAULT_MAX_WEIGHT, DEFAULT_MAX_WEIGHT_LOCALHOST, ServerConfig,
};
pub use config::{DEFAULT_MAX_WEIGHT, ServerConfig};
pub use error::{Error, Result};
use state::*;
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Cap for buffering an upstream error body before re-wrapping it as JSON.
/// Larger bodies are truncated; the bound only affects the message we surface.
const MAX_ERROR_BODY_BYTES: usize = 4096;
/// Per-request timeout. Hits return 504 Gateway Timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
/// Matches `application/json` and `application/...+json`, ignoring parameters
/// like `; charset=utf-8`. Used to skip JSON-error rewriting for already-JSON bodies.
fn is_json_content_type(s: &str) -> bool {
let mime = s.split(';').next().unwrap_or("").trim();
mime == "application/json"
|| (mime.starts_with("application/") && mime.ends_with("+json"))
}
pub struct Server(AppState);
impl Server {
@@ -67,12 +81,9 @@ impl Server {
query: query.clone(),
data_path: config.data_path,
website: config.website,
cache: Arc::new(Cache::new(config.cache_size)),
last_tip: Arc::new(AtomicU64::new(0)),
started_at: jiff::Timestamp::now(),
started_instant: Instant::now(),
max_weight: config.max_weight,
max_weight_localhost: config.max_weight_localhost,
})
}
@@ -82,26 +93,11 @@ impl Server {
#[cfg(feature = "bindgen")]
let vecs = state.query.inner().vecs();
let compression_layer = CompressionLayer::new().br(true).gzip(true).zstd(true);
let connect_info_layer = axum::middleware::from_fn(
async |connect_info: axum::extract::ConnectInfo<SocketAddr>,
mut request: Request<Body>,
next: Next|
-> Response<Body> {
let mut addr = connect_info.0;
// When behind a reverse proxy (e.g. cloudflared), the direct
// connection comes from loopback but the request is external.
// Mark it as non-loopback so it gets the stricter limit.
if addr.ip().is_loopback() && request.headers().contains_key("CF-Connecting-IP") {
addr.set_ip(std::net::Ipv4Addr::UNSPECIFIED.into());
}
request.extensions_mut().insert(addr);
next.run(request).await
},
);
let compression_layer = CompressionLayer::new()
.br(true)
.gzip(true)
.zstd(true)
.quality(CompressionLevel::Precise(3));
let response_time_layer = axum::middleware::from_fn(
async |request: Request<Body>, next: Next| -> Response<Body> {
@@ -127,16 +123,18 @@ impl Server {
if status.is_success()
|| status.is_redirection()
|| status.is_informational()
|| response.headers().get(CONTENT_TYPE).is_some_and(|v| {
let b = v.as_bytes();
b.starts_with(b"application/") && b.ends_with(b"json")
})
|| response
.headers()
.get(CONTENT_TYPE)
.is_some_and(|v| v.to_str().is_ok_and(is_json_content_type))
{
return response;
}
let (parts, body) = response.into_parts();
let bytes = axum::body::to_bytes(body, 4096).await.unwrap_or_default();
let bytes = axum::body::to_bytes(body, MAX_ERROR_BODY_BYTES)
.await
.unwrap_or_default();
let msg = String::from_utf8_lossy(&bytes);
let (code, msg) = match parts.status {
StatusCode::NOT_FOUND => (
@@ -172,6 +170,9 @@ impl Server {
let msg = msg.into_owned();
let mut response = Error::new(parts.status, code, msg).into_response();
response.extensions_mut().extend(parts.extensions);
if let Some(allow) = parts.headers.get(ALLOW) {
response.headers_mut().insert(ALLOW, allow.clone());
}
response
},
);
@@ -210,17 +211,9 @@ impl Server {
.merge(website_router)
.layer(response_time_layer)
.layer(trace_layer)
.layer(CatchPanicLayer::custom(|panic: Box<dyn Any + Send>| {
let msg = panic
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic");
Error::internal(msg).into_response()
}))
.layer(TimeoutLayer::with_status_code(
StatusCode::GATEWAY_TIMEOUT,
Duration::from_secs(5),
REQUEST_TIMEOUT,
))
.layer(json_error_layer)
.layer(compression_layer)
@@ -242,7 +235,14 @@ impl Server {
response
},
))
.layer(connect_info_layer);
.layer(CatchPanicLayer::custom(|panic: Box<dyn Any + Send>| {
let msg = panic
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic");
Error::internal(msg).into_response()
}));
let (listener, port) = match port {
Some(port) => {
@@ -292,20 +292,14 @@ impl Server {
}
}
let api_json = Arc::new(ApiJson::new(&openapi));
let router = router
.layer(Extension(Arc::new(openapi)))
.layer(Extension(api_json));
.layer(Extension(OpenApiJson::new(&openapi)))
.layer(Extension(ApiJson::new(&openapi)));
// NormalizePath must wrap the router (not be a layer) to run before route matching
let app = NormalizePathLayer::trim_trailing_slash().layer(router);
serve(
listener,
ServiceExt::<Request<Body>>::into_make_service_with_connect_info::<SocketAddr>(app),
)
.await?;
serve(listener, ServiceExt::<Request<Body>>::into_make_service(app)).await?;
Ok(())
}
@@ -330,3 +324,26 @@ pub fn generate_bindings(
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
brk_bindgen::generate_clients(vecs, &openapi_json, output_paths)
}
#[cfg(test)]
mod tests {
use super::is_json_content_type;
#[test]
fn json_content_type_matches() {
assert!(is_json_content_type("application/json"));
assert!(is_json_content_type("application/json; charset=utf-8"));
assert!(is_json_content_type(" application/json "));
assert!(is_json_content_type("application/problem+json"));
assert!(is_json_content_type("application/vnd.api+json; charset=utf-8"));
}
#[test]
fn json_content_type_rejects_non_json() {
assert!(!is_json_content_type("text/plain"));
assert!(!is_json_content_type("application/xml"));
assert!(!is_json_content_type("application/json+xml"));
assert!(!is_json_content_type(""));
assert!(!is_json_content_type("text/json"));
}
}
@@ -4,6 +4,8 @@ use schemars::JsonSchema;
use brk_types::Txid;
const MAX_TXIDS: usize = 250;
/// Query parameter for transaction-times endpoint.
#[derive(JsonSchema)]
pub struct TxidsParam {
@@ -24,6 +26,9 @@ impl TxidsParam {
format!("malformed query parameter `{pair}`, expected `txId[]=<txid>`")
})?;
if key == "txId[]" || key == "txId%5B%5D" {
if txids.len() == MAX_TXIDS {
return Err(format!("too many txids, max {MAX_TXIDS} per request"));
}
let txid = Txid::from_str(val).map_err(|e| format!("invalid txid `{val}`: {e}"))?;
txids.push(txid);
} else {
@@ -35,3 +40,31 @@ impl TxidsParam {
Ok(Self { txids })
}
}
#[cfg(test)]
mod tests {
use super::*;
const T1: &str = "0000000000000000000000000000000000000000000000000000000000000001";
const T2: &str = "0000000000000000000000000000000000000000000000000000000000000002";
#[test]
fn parses_empty_single_and_multi() {
assert!(TxidsParam::from_query("").unwrap().txids.is_empty());
assert_eq!(TxidsParam::from_query(&format!("txId[]={T1}")).unwrap().txids.len(), 1);
assert_eq!(
TxidsParam::from_query(&format!("txId%5B%5D={T1}&txId[]={T2}"))
.unwrap()
.txids
.len(),
2,
);
}
#[test]
fn rejects_unknown_key_and_invalid_txid() {
assert!(TxidsParam::from_query("foo=bar").is_err());
assert!(TxidsParam::from_query("txId[]=notahex").is_err());
assert!(TxidsParam::from_query("noequals").is_err());
}
}
+36 -101
View File
@@ -1,13 +1,4 @@
use std::{
future::Future,
net::SocketAddr,
path::PathBuf,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use std::{path::PathBuf, time::Instant};
use axum::{
body::{Body, Bytes},
@@ -20,14 +11,10 @@ use brk_types::{
};
use derive_more::Deref;
use jiff::Timestamp;
use quick_cache::sync::{Cache, GuardResult};
use serde::Serialize;
use vecdb::ReadableVec;
use crate::{
CacheParams, CacheStrategy, Error, Website,
extended::{ContentEncoding, HeaderMapExtended, ResponseExtended},
};
use crate::{CacheParams, CacheStrategy, Error, Website, extended::ResponseExtended};
#[derive(Clone, Deref)]
pub struct AppState {
@@ -35,29 +22,14 @@ pub struct AppState {
pub query: AsyncQuery,
pub data_path: PathBuf,
pub website: Website,
pub cache: Arc<Cache<String, Bytes>>,
pub last_tip: Arc<AtomicU64>,
pub started_at: Timestamp,
pub started_instant: Instant,
pub max_weight: usize,
pub max_weight_localhost: usize,
}
impl AppState {
/// Per-request series weight cap: loopback gets `max_weight_localhost`,
/// everyone else gets `max_weight`. The `connect_info_layer` rewrites the
/// peer to non-loopback when `CF-Connecting-IP` is present, so requests
/// proxied through a tunnel are billed at the external rate.
pub fn max_weight_for(&self, addr: &SocketAddr) -> usize {
if addr.ip().is_loopback() {
self.max_weight_localhost
} else {
self.max_weight
}
}
/// `Immutable` if height is >6 deep, `Tip` otherwise.
pub fn height_cache(&self, version: Version, height: Height) -> CacheStrategy {
pub fn height_strategy(&self, version: Version, height: Height) -> CacheStrategy {
let is_deep = self.sync(|q| (*q.height()).saturating_sub(*height) > 6);
if is_deep {
CacheStrategy::Immutable(version)
@@ -67,7 +39,7 @@ impl AppState {
}
/// `Immutable` if timestamp is >6 hours old (block definitely >6 deep), `Tip` otherwise.
pub fn timestamp_cache(&self, version: Version, timestamp: BrkTimestamp) -> CacheStrategy {
pub fn timestamp_strategy(&self, version: Version, timestamp: BrkTimestamp) -> CacheStrategy {
if (*BrkTimestamp::now()).saturating_sub(*timestamp) > 6 * ONE_HOUR_IN_SEC {
CacheStrategy::Immutable(version)
} else {
@@ -78,7 +50,7 @@ impl AppState {
/// `Immutable` if `date` is strictly before the indexed tip's date, `Tip` otherwise.
/// For per-date files that keep being rewritten while the tip is still within the
/// date's day, then settle once the tip crosses the day boundary.
pub fn date_cache(&self, version: Version, date: Date) -> CacheStrategy {
pub fn date_strategy(&self, version: Version, date: Date) -> CacheStrategy {
self.sync(|q| {
let height = q.indexed_height();
q.indexer()
@@ -101,7 +73,7 @@ impl AppState {
/// - Address has mempool txs → `MempoolHash(addr_specific_hash)`
/// - No mempool, has on-chain activity → `BlockBound(last_activity_block)`
/// - Unknown address → `Tip`
pub fn addr_cache(&self, version: Version, addr: &Addr, chain_only: bool) -> CacheStrategy {
pub fn addr_strategy(&self, version: Version, addr: &Addr, chain_only: bool) -> CacheStrategy {
self.sync(|q| {
if !chain_only {
let mempool_hash = q.addr_mempool_hash(addr);
@@ -123,7 +95,7 @@ impl AppState {
/// `Immutable` if the block is >6 deep (status stable), `Tip` otherwise.
/// For block status which changes when the next block arrives.
pub fn block_status_cache(&self, version: Version, hash: &BlockHash) -> CacheStrategy {
pub fn block_status_strategy(&self, version: Version, hash: &BlockHash) -> CacheStrategy {
self.sync(|q| {
q.height_by_hash(hash)
.map(|h| {
@@ -138,7 +110,7 @@ impl AppState {
}
/// `BlockBound` if the block exists (reorg-safe via block hash), `Tip` if not found.
pub fn block_cache(&self, version: Version, hash: &BlockHash) -> CacheStrategy {
pub fn block_strategy(&self, version: Version, hash: &BlockHash) -> CacheStrategy {
self.sync(|q| {
if q.height_by_hash(hash).is_ok() {
CacheStrategy::BlockBound(version, BlockHashPrefix::from(hash))
@@ -149,7 +121,7 @@ impl AppState {
}
/// Mempool → `MempoolHash`, confirmed → `BlockBound`, unknown → `Tip`.
pub fn tx_cache(&self, version: Version, txid: &Txid) -> CacheStrategy {
pub fn tx_strategy(&self, version: Version, txid: &Txid) -> CacheStrategy {
self.sync(|q| {
if let Some(mempool) = q.mempool()
&& mempool.txs().contains(txid)
@@ -165,58 +137,44 @@ impl AppState {
})
}
pub fn mempool_cache(&self) -> CacheStrategy {
pub fn mempool_strategy(&self) -> CacheStrategy {
let hash = self.sync(|q| q.mempool().map(|m| m.next_block_hash()).unwrap_or(0));
CacheStrategy::MempoolHash(hash)
}
/// Shared response pipeline: tip-clear, etag short-circuit, server-side
/// cache lookup, body computation on a blocking thread, header assembly.
/// Used by [`AppState::cached`] (strategy-driven) and the series endpoint
/// (which builds [`CacheParams`] directly from query resolution).
pub(crate) async fn cached_with_params<F>(
/// Shared response pipeline: etag short-circuit, body computation on the
/// query thread, header assembly. Used by [`AppState::respond`]
/// (strategy-driven) and the series endpoint (which builds [`CacheParams`]
/// directly from query resolution).
pub(crate) async fn respond_with_params<F>(
&self,
headers: &HeaderMap,
uri: &Uri,
_uri: &Uri,
params: CacheParams,
apply_content_headers: impl FnOnce(&mut HeaderMap),
f: F,
) -> Response<Body>
where
F: FnOnce(&brk_query::Query, ContentEncoding) -> brk_error::Result<Bytes> + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<Bytes> + Send + 'static,
{
let tip = self.sync(|q| q.tip_hash_prefix());
if self.last_tip.swap(*tip, Ordering::Relaxed) != *tip {
self.cache.clear();
}
if params.matches_etag(headers) {
return ResponseExtended::new_not_modified(&params);
}
let encoding = ContentEncoding::negotiate(headers);
let cache_key = format!("{}-{}-{}", uri, params.etag, encoding.as_str());
let result = self
.get_or_insert(&cache_key, async move {
self.run(move |q| f(q, encoding)).await
})
.await;
match result {
match self.run(f).await {
Ok(bytes) => {
let mut response = Response::new(Body::from(bytes));
let h = response.headers_mut();
apply_content_headers(h);
params.apply_to(h);
h.insert_content_encoding(encoding);
response
}
Err(e) => Error::from(e).into_response_with_etag(params.etag.clone()),
}
}
/// Strategy-driven cached response. Compression runs on the blocking thread.
async fn cached<F>(
/// Strategy-driven cached response.
async fn respond<F>(
&self,
headers: &HeaderMap,
strategy: CacheStrategy,
@@ -225,11 +183,11 @@ impl AppState {
f: F,
) -> Response<Body>
where
F: FnOnce(&brk_query::Query, ContentEncoding) -> brk_error::Result<Bytes> + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<Bytes> + Send + 'static,
{
let tip = self.sync(|q| q.tip_hash_prefix());
let params = CacheParams::resolve(&strategy, tip);
self.cached_with_params(
self.respond_with_params(
headers,
uri,
params,
@@ -242,7 +200,7 @@ impl AppState {
}
/// JSON response with HTTP + server-side caching
pub async fn cached_json<T, F>(
pub async fn respond_json<T, F>(
&self,
headers: &HeaderMap,
strategy: CacheStrategy,
@@ -253,9 +211,9 @@ impl AppState {
T: Serialize + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
{
self.cached(headers, strategy, uri, "application/json", move |q, enc| {
self.respond(headers, strategy, uri, "application/json", move |q| {
let value = f(q)?;
Ok(enc.compress(Bytes::from(serde_json::to_vec(&value).unwrap())))
Ok(Bytes::from(serde_json::to_vec(&value).unwrap()))
})
.await
}
@@ -268,7 +226,7 @@ impl AppState {
/// confirmed, `Tip` otherwise). Errors fall back to `Tip`. Use for
/// resources whose freshness category depends on the data itself
/// (outspends, threshold-based block status).
pub async fn cached_json_optimistic<T, F>(
pub async fn respond_json_optimistic<T, F>(
&self,
headers: &HeaderMap,
optimistic: CacheStrategy,
@@ -290,7 +248,7 @@ impl AppState {
Err(e) => (Err(e), CacheStrategy::Tip),
};
let params = CacheParams::resolve(&strategy, tip);
self.cached_with_params(
self.respond_with_params(
headers,
uri,
params,
@@ -300,16 +258,16 @@ impl AppState {
HeaderValue::from_static("application/json"),
);
},
move |_q, enc| {
move |_q| {
let value = value_result?;
Ok(enc.compress(Bytes::from(serde_json::to_vec(&value).unwrap())))
Ok(Bytes::from(serde_json::to_vec(&value).unwrap()))
},
)
.await
}
/// Text response with HTTP + server-side caching
pub async fn cached_text<T, F>(
pub async fn respond_text<T, F>(
&self,
headers: &HeaderMap,
strategy: CacheStrategy,
@@ -320,15 +278,15 @@ impl AppState {
T: AsRef<str> + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
{
self.cached(headers, strategy, uri, "text/plain", move |q, enc| {
self.respond(headers, strategy, uri, "text/plain", move |q| {
let value = f(q)?;
Ok(enc.compress(Bytes::from(value.as_ref().as_bytes().to_vec())))
Ok(Bytes::from(value.as_ref().as_bytes().to_vec()))
})
.await
}
/// Binary response with HTTP + server-side caching
pub async fn cached_bytes<T, F>(
pub async fn respond_bytes<T, F>(
&self,
headers: &HeaderMap,
strategy: CacheStrategy,
@@ -339,39 +297,16 @@ impl AppState {
T: Into<Vec<u8>> + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
{
self.cached(
self.respond(
headers,
strategy,
uri,
"application/octet-stream",
move |q, enc| {
move |q| {
let value = f(q)?;
Ok(enc.compress(Bytes::from(value.into())))
Ok(Bytes::from(value.into()))
},
)
.await
}
/// Check server-side cache, compute on miss
async fn get_or_insert(
&self,
cache_key: &str,
compute: impl Future<Output = brk_error::Result<Bytes>>,
) -> brk_error::Result<Bytes> {
let guard_res = self
.cache
.get_value_or_guard(cache_key, Some(Duration::from_millis(50)));
if let GuardResult::Value(bytes) = guard_res {
return Ok(bytes);
}
let bytes = compute.await?;
if let GuardResult::Guard(g) = guard_res {
let _ = g.insert(bytes.clone());
}
Ok(bytes)
}
}