server: snapshot

This commit is contained in:
nym21
2025-12-15 16:32:45 +01:00
parent 882a3525af
commit 825a4a77c0
100 changed files with 2677 additions and 3438 deletions

View File

@@ -1,31 +1,29 @@
use std::time::Duration;
use axum::{
Json,
body::Body,
extract::{Query, State},
http::{HeaderMap, StatusCode, Uri},
response::{IntoResponse, Response},
};
use brk_error::{Error, Result};
use brk_query::{Output, Params};
use brk_query::{MetricSelection, Output};
use brk_types::Format;
use quick_cache::sync::GuardResult;
use vecdb::Stamp;
use crate::{HeaderMapExtended, ResponseExtended};
use crate::{CacheStrategy, cache::CacheParams, extended::HeaderMapExtended};
use super::AppState;
/// Maximum allowed request weight in bytes (650KB)
const MAX_WEIGHT: usize = 65 * 10_000;
pub async fn handler(
uri: Uri,
headers: HeaderMap,
query: Query<Params>,
query: Query<MetricSelection>,
State(state): State<AppState>,
) -> Response {
match req_to_response_res(uri, headers, query, state) {
match req_to_response_res(uri, headers, query, state).await {
Ok(response) => response,
Err(error) => {
let mut response =
@@ -36,91 +34,64 @@ pub async fn handler(
}
}
fn req_to_response_res(
async fn req_to_response_res(
uri: Uri,
headers: HeaderMap,
Query(params): Query<Params>,
AppState {
query: interface,
cache,
..
}: AppState,
) -> Result<Response> {
todo!();
Query(params): Query<MetricSelection>,
AppState { query, cache, .. }: AppState,
) -> brk_error::Result<Response> {
let format = params.format();
let height = query.sync(|q| q.height());
let to = params.to();
// let vecs = interface.search(&params)?;
let cache_params = CacheParams::resolve(&CacheStrategy::height_with(format!("{to:?}")), || height.into());
// if vecs.is_empty() {
// return Ok(Json(vec![] as Vec<usize>).into_response());
// }
if cache_params.matches_etag(&headers) {
let mut response = (StatusCode::NOT_MODIFIED, "").into_response();
response.headers_mut().insert_cors();
return Ok(response);
}
// let from = params.from();
// let to = params.to();
// let format = params.format();
let cache_key = format!("{}{}{}", uri.path(), uri.query().unwrap_or(""), cache_params.etag_str());
let guard_res = cache.get_value_or_guard(&cache_key, Some(Duration::from_millis(50)));
// // TODO: From and to should be capped here
let mut response = if let GuardResult::Value(v) = guard_res {
Response::new(Body::from(v))
} else {
match query
.run(move |q| q.search_and_format_checked(params, MAX_WEIGHT))
.await?
{
Output::CSV(s) => {
if let GuardResult::Guard(g) = guard_res {
let _ = g.insert(s.clone().into());
}
s.into_response()
}
Output::Json(v) => {
let json = v.to_vec();
if let GuardResult::Guard(g) = guard_res {
let _ = g.insert(json.clone().into());
}
json.into_response()
}
}
};
// let weight = vecs
// .iter()
// .map(|(_, v)| v.range_weight(from, to))
// .sum::<usize>();
let headers = response.headers_mut();
headers.insert_cors();
if let Some(etag) = &cache_params.etag {
headers.insert_etag(etag);
}
headers.insert_cache_control(&cache_params.cache_control);
// if weight > MAX_WEIGHT {
// return Err(Error::String(format!(
// "Request is too heavy, max weight is {MAX_WEIGHT} bytes"
// )));
// }
match format {
Format::CSV => {
headers.insert_content_disposition_attachment();
headers.insert_content_type_text_csv()
}
Format::JSON => headers.insert_content_type_application_json(),
}
// // TODO: height should be from vec, but good enough for now
// let etag = vecs
// .first()
// .unwrap()
// .1
// .etag(Stamp::from(interface.get_height()), to);
// if headers.has_etag(etag) {
// return Ok(Response::new_not_modified());
// }
// let guard_res = cache.get_value_or_guard(
// &format!("{}{}{etag}", uri.path(), uri.query().unwrap_or("")),
// Some(Duration::from_millis(50)),
// );
// let mut response = if let GuardResult::Value(v) = guard_res {
// Response::new(Body::from(v))
// } else {
// match interface.format(vecs, &params.rest)? {
// Output::CSV(s) => {
// if let GuardResult::Guard(g) = guard_res {
// let _ = g.insert(s.clone().into());
// }
// s.into_response()
// }
// Output::Json(v) => {
// let json = v.to_vec();
// if let GuardResult::Guard(g) = guard_res {
// let _ = g.insert(json.clone().into());
// }
// json.into_response()
// }
// }
// };
// let headers = response.headers_mut();
// headers.insert_cors();
// headers.insert_etag(&etag);
// headers.insert_cache_control_must_revalidate();
// match format {
// Format::CSV => {
// headers.insert_content_disposition_attachment();
// headers.insert_content_type_text_csv()
// }
// Format::JSON => headers.insert_content_type_application_json(),
// }
// Ok(response)
Ok(response)
}

View File

@@ -1,18 +1,15 @@
use aide::axum::{ApiRouter, routing::get_with};
use axum::{
extract::{Path, Query, State},
http::{HeaderMap, StatusCode, Uri},
http::{HeaderMap, Uri},
response::{IntoResponse, Redirect, Response},
routing::get,
};
use brk_query::{PaginatedMetrics, PaginationParam, Params, ParamsDeprec, ParamsOpt};
use brk_query::{DataRangeFormat, MetricSelection, MetricSelectionLegacy, PaginatedMetrics, Pagination};
use brk_traversable::TreeNode;
use brk_types::{Index, IndexInfo, Limit, Metric, MetricCount, Metrics};
use crate::{
VERSION,
extended::{HeaderMapExtended, ResponseExtended, ResultExtended, TransformResponseExtended},
};
use crate::{CacheStrategy, extended::TransformResponseExtended};
use super::AppState;
@@ -34,11 +31,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
headers: HeaderMap,
State(state): State<AppState>
| {
let etag = VERSION;
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.metric_count().await, etag)
state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.metric_count())).await
},
|op| op
.metrics_tag()
@@ -55,11 +48,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
headers: HeaderMap,
State(state): State<AppState>
| {
let etag = VERSION;
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json( state.get_indexes().await, etag)
state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.get_indexes().to_vec())).await
},
|op| op
.metrics_tag()
@@ -77,13 +66,9 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
async |
headers: HeaderMap,
State(state): State<AppState>,
Query(pagination): Query<PaginationParam>
Query(pagination): Query<Pagination>
| {
let etag = VERSION;
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.get_metrics(pagination).await, etag)
state.cached_json(&headers, CacheStrategy::Static, move |q| Ok(q.get_metrics(pagination))).await
},
|op| op
.metrics_tag()
@@ -96,12 +81,8 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
.api_route(
"/api/metrics/catalog",
get_with(
async |headers: HeaderMap, State(state): State<AppState>| -> Response {
let etag = VERSION;
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.get_metrics_catalog().await, etag)
async |headers: HeaderMap, State(state): State<AppState>| {
state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.get_metrics_catalog().clone())).await
},
|op| op
.metrics_tag()
@@ -122,11 +103,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
Path(metric): Path<Metric>,
Query(limit): Query<Limit>
| {
let etag = VERSION;
if headers.has_etag(etag) {
return Response::new_not_modified();
}
state.match_metric(metric, limit).await.to_json_response(etag)
state.cached_json(&headers, CacheStrategy::Static, move |q| Ok(q.match_metric(&metric, limit))).await
},
|op| op
.metrics_tag()
@@ -144,20 +121,18 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
State(state): State<AppState>,
Path(metric): Path<Metric>
| {
let etag = VERSION;
if headers.has_etag(etag) {
return Response::new_not_modified();
}
if let Some(indexes) = state.metric_to_indexes(metric.clone()).await {
return Response::new_json(indexes, etag)
}
// REMOVE UNWRAP !!
let value = if let Some(first) = state.match_metric(metric.clone(), Limit::MIN).await.unwrap().first() {
format!("Could not find '{metric}', did you mean '{first}' ?")
} else {
format!("Could not find '{metric}'.")
};
Response::new_json_with(StatusCode::NOT_FOUND, value, etag)
state.cached_json(&headers, CacheStrategy::Static, move |q| {
if let Some(indexes) = q.metric_to_indexes(metric.clone()) {
return Ok(indexes.clone())
}
Err(brk_error::Error::String(
if let Some(first) = q.match_metric(&metric, Limit::MIN).first() {
format!("Could not find '{metric}', did you mean '{first}' ?")
} else {
format!("Could not find '{metric}'.")
}
))
}).await
},
|op| op
.metrics_tag()
@@ -173,7 +148,6 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
)
// WIP
.route("/api/metrics/bulk", get(data::handler))
// WIP
.route(
"/api/metric/{metric}/{index}",
get(
@@ -181,16 +155,15 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
headers: HeaderMap,
state: State<AppState>,
Path((metric, index)): Path<(Metric, Index)>,
Query(params_opt): Query<ParamsOpt>|
Query(range): Query<DataRangeFormat>|
-> Response {
todo!();
// data::handler(
// uri,
// headers,
// Query(Params::from(((index, metric), params_opt))),
// state,
// )
// .await
data::handler(
uri,
headers,
Query(MetricSelection::from((index, metric, range))),
state,
)
.await
},
),
)
@@ -202,7 +175,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
get(
async |uri: Uri,
headers: HeaderMap,
Query(params): Query<ParamsDeprec>,
Query(params): Query<MetricSelectionLegacy>,
state: State<AppState>|
-> Response {
data::handler(uri, headers, Query(params.into()), state).await
@@ -218,7 +191,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
async |uri: Uri,
headers: HeaderMap,
Path(variant): Path<String>,
Query(params_opt): Query<ParamsOpt>,
Query(range): Query<DataRangeFormat>,
state: State<AppState>|
-> Response {
let separator = "_to_";
@@ -230,10 +203,10 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
return format!("Index {ser_index} doesn't exist").into_response();
};
let params = Params::from((
let params = MetricSelection::from((
index,
Metrics::from(split.collect::<Vec<_>>().join(separator)),
params_opt,
range,
));
data::handler(uri, headers, Query(params), state).await
},