mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-07-01 14:29:01 -07:00
global: snapshot
This commit is contained in:
@@ -1,15 +1,15 @@
|
||||
use std::{path::PathBuf, sync::Arc, time::Instant};
|
||||
use std::{future::Future, path::PathBuf, sync::Arc, time::{Duration, Instant}};
|
||||
|
||||
use derive_more::Deref;
|
||||
|
||||
use axum::{
|
||||
body::{Body, Bytes},
|
||||
http::{HeaderMap, Response},
|
||||
http::{HeaderMap, Response, Uri},
|
||||
};
|
||||
use brk_query::AsyncQuery;
|
||||
use brk_rpc::Client;
|
||||
use jiff::Timestamp;
|
||||
use quick_cache::sync::Cache;
|
||||
use quick_cache::sync::{Cache, GuardResult};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
@@ -35,33 +35,12 @@ impl AppState {
|
||||
CacheStrategy::MempoolHash(hash)
|
||||
}
|
||||
|
||||
/// JSON response with caching
|
||||
/// JSON response with HTTP + server-side caching
|
||||
pub async fn cached_json<T, F>(
|
||||
&self,
|
||||
headers: &HeaderMap,
|
||||
strategy: CacheStrategy,
|
||||
f: F,
|
||||
) -> Response<Body>
|
||||
where
|
||||
T: Serialize + Send + 'static,
|
||||
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
|
||||
{
|
||||
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
|
||||
if params.matches_etag(headers) {
|
||||
return ResponseExtended::new_not_modified();
|
||||
}
|
||||
match self.run(f).await {
|
||||
Ok(value) => ResponseExtended::new_json_cached(&value, ¶ms),
|
||||
Err(e) => ResultExtended::<T>::to_json_response(Err(e), params.etag_str()),
|
||||
}
|
||||
}
|
||||
|
||||
/// JSON response with HTTP caching + server-side cache
|
||||
pub async fn server_cached_json<T, F>(
|
||||
&self,
|
||||
headers: &HeaderMap,
|
||||
strategy: CacheStrategy,
|
||||
cache_prefix: &str,
|
||||
uri: &Uri,
|
||||
f: F,
|
||||
) -> Response<Body>
|
||||
where
|
||||
@@ -73,9 +52,9 @@ impl AppState {
|
||||
return ResponseExtended::new_not_modified();
|
||||
}
|
||||
|
||||
let cache_key = format!("{cache_prefix}-{}", params.etag_str());
|
||||
let full_key = format!("{}-{}", uri, params.etag_str());
|
||||
let result = self
|
||||
.get_or_insert(&cache_key, async move {
|
||||
.get_or_insert(&full_key, async move {
|
||||
let value = self.run(f).await?;
|
||||
Ok(serde_json::to_vec(&value).unwrap().into())
|
||||
})
|
||||
@@ -96,18 +75,95 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Text response with HTTP + server-side caching
|
||||
pub async fn cached_text<T, F>(
|
||||
&self,
|
||||
headers: &HeaderMap,
|
||||
strategy: CacheStrategy,
|
||||
uri: &Uri,
|
||||
f: F,
|
||||
) -> Response<Body>
|
||||
where
|
||||
T: AsRef<str> + Send + 'static,
|
||||
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
|
||||
{
|
||||
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
|
||||
if params.matches_etag(headers) {
|
||||
return ResponseExtended::new_not_modified();
|
||||
}
|
||||
|
||||
let full_key = format!("{}-{}", uri, params.etag_str());
|
||||
let result = self
|
||||
.get_or_insert(&full_key, async move {
|
||||
let value = self.run(f).await?;
|
||||
Ok(Bytes::from(value.as_ref().to_owned()))
|
||||
})
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(bytes) => {
|
||||
let mut response = Response::new(Body::from(bytes));
|
||||
let h = response.headers_mut();
|
||||
h.insert_content_type_text_plain();
|
||||
h.insert_cache_control(¶ms.cache_control);
|
||||
if let Some(etag) = ¶ms.etag {
|
||||
h.insert_etag(etag);
|
||||
}
|
||||
response
|
||||
}
|
||||
Err(e) => ResultExtended::<T>::to_text_response(Err(e), params.etag_str()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Binary response with HTTP + server-side caching
|
||||
pub async fn cached_bytes<T, F>(
|
||||
&self,
|
||||
headers: &HeaderMap,
|
||||
strategy: CacheStrategy,
|
||||
uri: &Uri,
|
||||
f: F,
|
||||
) -> Response<Body>
|
||||
where
|
||||
T: Into<Vec<u8>> + Send + 'static,
|
||||
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
|
||||
{
|
||||
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
|
||||
if params.matches_etag(headers) {
|
||||
return ResponseExtended::new_not_modified();
|
||||
}
|
||||
|
||||
let full_key = format!("{}-{}", uri, params.etag_str());
|
||||
let result = self
|
||||
.get_or_insert(&full_key, async move {
|
||||
let value = self.run(f).await?;
|
||||
Ok(Bytes::from(value.into()))
|
||||
})
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(bytes) => {
|
||||
let mut response = Response::new(Body::from(bytes));
|
||||
let h = response.headers_mut();
|
||||
h.insert_content_type_octet_stream();
|
||||
h.insert_cache_control(¶ms.cache_control);
|
||||
if let Some(etag) = ¶ms.etag {
|
||||
h.insert_etag(etag);
|
||||
}
|
||||
response
|
||||
}
|
||||
Err(e) => ResultExtended::<T>::to_bytes_response(Err(e), params.etag_str()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check server-side cache, compute on miss
|
||||
pub async fn get_or_insert(
|
||||
&self,
|
||||
cache_key: &str,
|
||||
compute: impl std::future::Future<Output = brk_error::Result<Bytes>>,
|
||||
compute: impl Future<Output = brk_error::Result<Bytes>>,
|
||||
) -> brk_error::Result<Bytes> {
|
||||
use quick_cache::sync::GuardResult;
|
||||
|
||||
let guard_res = self.cache.get_value_or_guard(
|
||||
cache_key,
|
||||
Some(std::time::Duration::from_millis(50)),
|
||||
);
|
||||
let guard_res = self
|
||||
.cache
|
||||
.get_value_or_guard(cache_key, Some(Duration::from_millis(50)));
|
||||
|
||||
if let GuardResult::Value(bytes) = guard_res {
|
||||
return Ok(bytes);
|
||||
@@ -121,46 +177,4 @@ impl AppState {
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
/// Text response with caching
|
||||
pub async fn cached_text<T, F>(
|
||||
&self,
|
||||
headers: &HeaderMap,
|
||||
strategy: CacheStrategy,
|
||||
f: F,
|
||||
) -> Response<Body>
|
||||
where
|
||||
T: AsRef<str> + Send + 'static,
|
||||
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
|
||||
{
|
||||
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
|
||||
if params.matches_etag(headers) {
|
||||
return ResponseExtended::new_not_modified();
|
||||
}
|
||||
match self.run(f).await {
|
||||
Ok(value) => ResponseExtended::new_text_cached(value.as_ref(), ¶ms),
|
||||
Err(e) => ResultExtended::<T>::to_text_response(Err(e), params.etag_str()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Binary response with caching
|
||||
pub async fn cached_bytes<T, F>(
|
||||
&self,
|
||||
headers: &HeaderMap,
|
||||
strategy: CacheStrategy,
|
||||
f: F,
|
||||
) -> Response<Body>
|
||||
where
|
||||
T: Into<Vec<u8>> + Send + 'static,
|
||||
F: FnOnce(&brk_query::Query) -> brk_error::Result<T> + Send + 'static,
|
||||
{
|
||||
let params = CacheParams::resolve(&strategy, || self.sync(|q| q.height().into()));
|
||||
if params.matches_etag(headers) {
|
||||
return ResponseExtended::new_not_modified();
|
||||
}
|
||||
match self.run(f).await {
|
||||
Ok(value) => ResponseExtended::new_bytes_cached(value.into(), ¶ms),
|
||||
Err(e) => ResultExtended::<T>::to_bytes_response(Err(e), params.etag_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user