global: snapshot

This commit is contained in:
nym21
2026-03-10 18:10:50 +01:00
parent db1dce0f3b
commit d50c6e0a73
54 changed files with 2398 additions and 3239 deletions

View File

@@ -12,7 +12,7 @@ use brk_types::{Format, MetricSelection, Output};
use crate::{
Result,
api::metrics::{CACHE_CONTROL, max_weight},
extended::HeaderMapExtended,
extended::{ContentEncoding, HeaderMapExtended},
};
use super::AppState;
@@ -38,15 +38,27 @@ pub async fn handler(
}
// Phase 2: Format (expensive, server-side cached)
let cache_key = format!("bulk-{}{}{}", uri.path(), uri.query().unwrap_or(""), etag);
let encoding = ContentEncoding::negotiate(&headers);
let cache_key = format!(
"bulk-{}{}{}-{}",
uri.path(),
uri.query().unwrap_or(""),
etag,
encoding.as_str()
);
let query = &state;
let bytes = state
.get_or_insert(&cache_key, async move {
let out = query.run(move |q| q.format(resolved)).await?;
Ok(match out.output {
Output::CSV(s) => Bytes::from(s),
Output::Json(v) => Bytes::from(v),
})
query
.run(move |q| {
let out = q.format(resolved)?;
let raw = match out.output {
Output::CSV(s) => Bytes::from(s),
Output::Json(v) => Bytes::from(v),
};
Ok(encoding.compress(raw))
})
.await
})
.await?;
@@ -54,6 +66,7 @@ pub async fn handler(
let h = response.headers_mut();
h.insert_etag(etag.as_str());
h.insert_cache_control(CACHE_CONTROL);
h.insert_content_encoding(encoding);
match format {
Format::CSV => {
h.insert_content_disposition_attachment(&csv_filename);

View File

@@ -12,7 +12,7 @@ use brk_types::{Format, MetricSelection, Output};
use crate::{
Result,
api::metrics::{CACHE_CONTROL, max_weight},
extended::HeaderMapExtended,
extended::{ContentEncoding, HeaderMapExtended},
};
use super::AppState;
@@ -38,15 +38,27 @@ pub async fn handler(
}
// Phase 2: Format (expensive, server-side cached)
let cache_key = format!("single-{}{}{}", uri.path(), uri.query().unwrap_or(""), etag);
let encoding = ContentEncoding::negotiate(&headers);
let cache_key = format!(
"single-{}{}{}-{}",
uri.path(),
uri.query().unwrap_or(""),
etag,
encoding.as_str()
);
let query = &state;
let bytes = state
.get_or_insert(&cache_key, async move {
let out = query.run(move |q| q.format(resolved)).await?;
Ok(match out.output {
Output::CSV(s) => Bytes::from(s),
Output::Json(v) => Bytes::from(v),
})
query
.run(move |q| {
let out = q.format(resolved)?;
let raw = match out.output {
Output::CSV(s) => Bytes::from(s),
Output::Json(v) => Bytes::from(v),
};
Ok(encoding.compress(raw))
})
.await
})
.await?;
@@ -54,6 +66,7 @@ pub async fn handler(
let h = response.headers_mut();
h.insert_etag(etag.as_str());
h.insert_cache_control(CACHE_CONTROL);
h.insert_content_encoding(encoding);
match format {
Format::CSV => {
h.insert_content_disposition_attachment(&csv_filename);

View File

@@ -12,7 +12,7 @@ use brk_types::{Format, MetricSelection, OutputLegacy};
use crate::{
Result,
api::metrics::{CACHE_CONTROL, max_weight},
extended::HeaderMapExtended,
extended::{ContentEncoding, HeaderMapExtended},
};
const SUNSET: &str = "2027-01-01T00:00:00Z";
@@ -40,15 +40,27 @@ pub async fn handler(
}
// Phase 2: Format (expensive, server-side cached)
let cache_key = format!("legacy-{}{}{}", uri.path(), uri.query().unwrap_or(""), etag);
let encoding = ContentEncoding::negotiate(&headers);
let cache_key = format!(
"legacy-{}{}{}-{}",
uri.path(),
uri.query().unwrap_or(""),
etag,
encoding.as_str()
);
let query = &state;
let bytes = state
.get_or_insert(&cache_key, async move {
let out = query.run(move |q| q.format_legacy(resolved)).await?;
Ok(match out.output {
OutputLegacy::CSV(s) => Bytes::from(s),
OutputLegacy::Json(v) => Bytes::from(v.to_vec()),
})
query
.run(move |q| {
let out = q.format_legacy(resolved)?;
let raw = match out.output {
OutputLegacy::CSV(s) => Bytes::from(s),
OutputLegacy::Json(v) => Bytes::from(v.to_vec()),
};
Ok(encoding.compress(raw))
})
.await
})
.await?;
@@ -56,6 +68,7 @@ pub async fn handler(
let h = response.headers_mut();
h.insert_etag(etag.as_str());
h.insert_cache_control(CACHE_CONTROL);
h.insert_content_encoding(encoding);
match format {
Format::CSV => {
h.insert_content_disposition_attachment(&csv_filename);

View File

@@ -0,0 +1,88 @@
use axum::{
body::Bytes,
http::{header, HeaderMap, HeaderValue},
};
/// HTTP content encoding for pre-compressed caching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentEncoding {
Brotli,
Gzip,
Zstd,
Identity,
}
impl ContentEncoding {
/// Negotiate the best encoding from the Accept-Encoding header.
/// Priority: br > zstd > gzip > identity.
pub fn negotiate(headers: &HeaderMap) -> Self {
let accept = match headers.get(header::ACCEPT_ENCODING) {
Some(v) => v,
None => return Self::Identity,
};
let s = match accept.to_str() {
Ok(s) => s,
Err(_) => return Self::Identity,
};
let mut best = Self::Identity;
for part in s.split(',') {
let name = part.split(';').next().unwrap_or("").trim();
match name {
"br" => return Self::Brotli,
"zstd" => best = Self::Zstd,
"gzip" if matches!(best, Self::Identity) => best = Self::Gzip,
_ => {}
}
}
best
}
/// Compress bytes with this encoding. Identity returns bytes unchanged.
pub fn compress(self, bytes: Bytes) -> Bytes {
match self {
Self::Identity => bytes,
Self::Brotli => {
use std::io::Write;
let mut output = Vec::with_capacity(bytes.len() / 2);
{
let mut writer = brotli::CompressorWriter::new(&mut output, 4096, 4, 22);
writer.write_all(&bytes).expect("brotli compression failed");
}
Bytes::from(output)
}
Self::Gzip => {
use flate2::write::GzEncoder;
use std::io::Write;
let mut encoder = GzEncoder::new(
Vec::with_capacity(bytes.len() / 2),
flate2::Compression::new(3),
);
encoder.write_all(&bytes).expect("gzip compression failed");
Bytes::from(encoder.finish().expect("gzip finish failed"))
}
Self::Zstd => Bytes::from(
zstd::encode_all(bytes.as_ref(), 3).expect("zstd compression failed"),
),
}
}
/// Wire name used for Content-Encoding header and cache key suffix.
#[inline]
pub fn as_str(self) -> &'static str {
match self {
Self::Brotli => "br",
Self::Gzip => "gzip",
Self::Zstd => "zstd",
Self::Identity => "identity",
}
}
#[inline]
pub(crate) fn header_value(self) -> Option<HeaderValue> {
match self {
Self::Identity => None,
_ => Some(HeaderValue::from_static(self.as_str())),
}
}
}

View File

@@ -3,6 +3,8 @@ use axum::http::{
header::{self, IF_NONE_MATCH},
};
use super::ContentEncoding;
pub trait HeaderMapExtended {
fn has_etag(&self, etag: &str) -> bool;
fn insert_etag(&mut self, etag: &str);
@@ -12,10 +14,10 @@ pub trait HeaderMapExtended {
fn insert_content_disposition_attachment(&mut self, filename: &str);
fn insert_content_encoding(&mut self, encoding: ContentEncoding);
fn insert_content_type_application_json(&mut self);
fn insert_content_type_text_csv(&mut self);
fn insert_content_type_text_plain(&mut self);
fn insert_content_type_octet_stream(&mut self);
fn insert_deprecation(&mut self, sunset: &'static str);
}
@@ -45,6 +47,12 @@ impl HeaderMapExtended for HeaderMap {
);
}
fn insert_content_encoding(&mut self, encoding: ContentEncoding) {
if let Some(value) = encoding.header_value() {
self.insert(header::CONTENT_ENCODING, value);
}
}
fn insert_content_type_application_json(&mut self) {
self.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
}
@@ -53,17 +61,6 @@ impl HeaderMapExtended for HeaderMap {
self.insert(header::CONTENT_TYPE, "text/csv".parse().unwrap());
}
fn insert_content_type_text_plain(&mut self) {
self.insert(header::CONTENT_TYPE, "text/plain".parse().unwrap());
}
fn insert_content_type_octet_stream(&mut self) {
self.insert(
header::CONTENT_TYPE,
"application/octet-stream".parse().unwrap(),
);
}
fn insert_deprecation(&mut self, sunset: &'static str) {
self.insert("Deprecation", "true".parse().unwrap());
self.insert("Sunset", sunset.parse().unwrap());

View File

@@ -1,9 +1,9 @@
mod encoding;
mod header_map;
mod response;
mod result;
mod transform_operation;
pub use encoding::*;
pub use header_map::*;
pub use response::*;
pub use result::*;
pub use transform_operation::*;

View File

@@ -13,22 +13,12 @@ where
Self: Sized,
{
fn new_not_modified() -> Self;
fn new_json<T>(value: T, etag: &str) -> Self
where
T: Serialize;
fn new_json_with<T>(status: StatusCode, value: T, etag: &str) -> Self
where
T: Serialize;
fn new_json_cached<T>(value: T, params: &CacheParams) -> Self
where
T: Serialize;
fn static_json<T>(headers: &HeaderMap, value: T) -> Self
where
T: Serialize;
fn new_text(value: &str, etag: &str) -> Self;
fn new_text_with(status: StatusCode, value: &str, etag: &str) -> Self;
fn new_bytes(value: Vec<u8>, etag: &str) -> Self;
fn new_bytes_with(status: StatusCode, value: Vec<u8>, etag: &str) -> Self;
}
impl ResponseExtended for Response<Body> {
@@ -38,55 +28,6 @@ impl ResponseExtended for Response<Body> {
response
}
fn new_json<T>(value: T, etag: &str) -> Self
where
T: Serialize,
{
Self::new_json_with(StatusCode::default(), value, etag)
}
fn new_json_with<T>(status: StatusCode, value: T, etag: &str) -> Self
where
T: Serialize,
{
let bytes = serde_json::to_vec(&value).unwrap();
let mut response = Response::builder().body(bytes.into()).unwrap();
*response.status_mut() = status;
let headers = response.headers_mut();
headers.insert_content_type_application_json();
headers.insert_cache_control_must_revalidate();
headers.insert_etag(etag);
response
}
fn new_text(value: &str, etag: &str) -> Self {
Self::new_text_with(StatusCode::default(), value, etag)
}
fn new_text_with(status: StatusCode, value: &str, etag: &str) -> Self {
let mut response = Response::builder().body(value.to_string().into()).unwrap();
*response.status_mut() = status;
let headers = response.headers_mut();
headers.insert_content_type_text_plain();
headers.insert_cache_control_must_revalidate();
headers.insert_etag(etag);
response
}
fn new_bytes(value: Vec<u8>, etag: &str) -> Self {
Self::new_bytes_with(StatusCode::default(), value, etag)
}
fn new_bytes_with(status: StatusCode, value: Vec<u8>, etag: &str) -> Self {
let mut response = Response::builder().body(value.into()).unwrap();
*response.status_mut() = status;
let headers = response.headers_mut();
headers.insert_content_type_octet_stream();
headers.insert_cache_control_must_revalidate();
headers.insert_etag(etag);
response
}
fn new_json_cached<T>(value: T, params: &CacheParams) -> Self
where
T: Serialize,

View File

@@ -1,49 +0,0 @@
use axum::response::Response;
use brk_error::Result;
use serde::Serialize;
use crate::{Error, extended::ResponseExtended};
pub trait ResultExtended<T> {
fn to_json_response(self, etag: &str) -> Response
where
T: Serialize;
fn to_text_response(self, etag: &str) -> Response
where
T: AsRef<str>;
fn to_bytes_response(self, etag: &str) -> Response
where
T: Into<Vec<u8>>;
}
impl<T> ResultExtended<T> for Result<T> {
fn to_json_response(self, etag: &str) -> Response
where
T: Serialize,
{
match self {
Ok(value) => Response::new_json(&value, etag),
Err(e) => Error::from(e).into_response_with_etag(etag),
}
}
fn to_text_response(self, etag: &str) -> Response
where
T: AsRef<str>,
{
match self {
Ok(value) => Response::new_text(value.as_ref(), etag),
Err(e) => Error::from(e).into_response_with_etag(etag),
}
}
fn to_bytes_response(self, etag: &str) -> Response
where
T: Into<Vec<u8>>,
{
match self {
Ok(value) => Response::new_bytes(value.into(), etag),
Err(e) => Error::from(e).into_response_with_etag(etag),
}
}
}

View File

@@ -9,7 +9,7 @@ use derive_more::Deref;
use axum::{
body::{Body, Bytes},
http::{HeaderMap, Response, Uri},
http::{HeaderMap, HeaderValue, Response, Uri, header},
};
use brk_query::AsyncQuery;
use brk_rpc::Client;
@@ -18,8 +18,8 @@ use quick_cache::sync::{Cache, GuardResult};
use serde::Serialize;
use crate::{
CacheParams, CacheStrategy, Website,
extended::{HeaderMapExtended, ResponseExtended, ResultExtended},
CacheParams, CacheStrategy, Error, Website,
extended::{ContentEncoding, HeaderMapExtended, ResponseExtended},
};
#[derive(Clone, Deref)]
@@ -40,6 +40,47 @@ impl AppState {
CacheStrategy::MempoolHash(hash)
}
/// Cached + pre-compressed response. Compression runs on the blocking thread.
async fn cached<F>(
&self,
headers: &HeaderMap,
strategy: CacheStrategy,
uri: &Uri,
content_type: &'static str,
f: F,
) -> Response<Body>
where
F: FnOnce(&brk_query::Query, ContentEncoding) -> brk_error::Result<Bytes> + Send + 'static,
{
let encoding = ContentEncoding::negotiate(headers);
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
if params.matches_etag(headers) {
return ResponseExtended::new_not_modified();
}
let full_key = format!("{}-{}-{}", uri, params.etag_str(), encoding.as_str());
let result = self
.get_or_insert(&full_key, async move {
self.run(move |q| f(q, encoding)).await
})
.await;
match result {
Ok(bytes) => {
let mut response = Response::new(Body::from(bytes));
let h = response.headers_mut();
h.insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
h.insert_cache_control(&params.cache_control);
h.insert_content_encoding(encoding);
if let Some(etag) = &params.etag {
h.insert_etag(etag);
}
response
}
Err(e) => Error::from(e).into_response_with_etag(params.etag_str()),
}
}
/// JSON response with HTTP + server-side caching
pub async fn cached_json<T, F>(
&self,
@@ -52,32 +93,11 @@ impl AppState {
T: Serialize + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
{
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
if params.matches_etag(headers) {
return ResponseExtended::new_not_modified();
}
let full_key = format!("{}-{}", uri, params.etag_str());
let result = self
.get_or_insert(&full_key, async move {
let value = self.run(f).await?;
Ok(serde_json::to_vec(&value).unwrap().into())
})
.await;
match result {
Ok(bytes) => {
let mut response = Response::new(Body::from(bytes));
let h = response.headers_mut();
h.insert_content_type_application_json();
h.insert_cache_control(&params.cache_control);
if let Some(etag) = &params.etag {
h.insert_etag(etag);
}
response
}
Err(e) => ResultExtended::<T>::to_json_response(Err(e), params.etag_str()),
}
self.cached(headers, strategy, uri, "application/json", move |q, enc| {
let value = f(q)?;
Ok(enc.compress(Bytes::from(serde_json::to_vec(&value).unwrap())))
})
.await
}
/// Text response with HTTP + server-side caching
@@ -92,32 +112,11 @@ impl AppState {
T: AsRef<str> + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
{
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
if params.matches_etag(headers) {
return ResponseExtended::new_not_modified();
}
let full_key = format!("{}-{}", uri, params.etag_str());
let result = self
.get_or_insert(&full_key, async move {
let value = self.run(f).await?;
Ok(Bytes::from(value.as_ref().to_owned()))
})
.await;
match result {
Ok(bytes) => {
let mut response = Response::new(Body::from(bytes));
let h = response.headers_mut();
h.insert_content_type_text_plain();
h.insert_cache_control(&params.cache_control);
if let Some(etag) = &params.etag {
h.insert_etag(etag);
}
response
}
Err(e) => ResultExtended::<T>::to_text_response(Err(e), params.etag_str()),
}
self.cached(headers, strategy, uri, "text/plain", move |q, enc| {
let value = f(q)?;
Ok(enc.compress(Bytes::from(value.as_ref().as_bytes().to_vec())))
})
.await
}
/// Binary response with HTTP + server-side caching
@@ -132,32 +131,17 @@ impl AppState {
T: Into<Vec<u8>> + Send + 'static,
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
{
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
if params.matches_etag(headers) {
return ResponseExtended::new_not_modified();
}
let full_key = format!("{}-{}", uri, params.etag_str());
let result = self
.get_or_insert(&full_key, async move {
let value = self.run(f).await?;
Ok(Bytes::from(value.into()))
})
.await;
match result {
Ok(bytes) => {
let mut response = Response::new(Body::from(bytes));
let h = response.headers_mut();
h.insert_content_type_octet_stream();
h.insert_cache_control(&params.cache_control);
if let Some(etag) = &params.etag {
h.insert_etag(etag);
}
response
}
Err(e) => ResultExtended::<T>::to_bytes_response(Err(e), params.etag_str()),
}
self.cached(
headers,
strategy,
uri,
"application/octet-stream",
move |q, enc| {
let value = f(q)?;
Ok(enc.compress(Bytes::from(value.into())))
},
)
.await
}
/// Check server-side cache, compute on miss