server: add support for dataset by timestamp

This commit is contained in:
nym21
2024-12-27 12:28:27 +01:00
parent 2b017ac6b5
commit 481f5c0a97
19 changed files with 286 additions and 121 deletions

View File

@@ -72,7 +72,7 @@ pub fn iter_blocks(
if let Some((_current_block_height, current_block, _current_block_hash)) =
current_block_opt
{
let timestamp = Timestamp::wrap(current_block.header.time);
let timestamp = Timestamp::from(current_block.header.time);
let current_block_date = timestamp.to_date();
let current_block_height: Height = height + blocks_loop_i;
@@ -83,7 +83,7 @@ pub fn iter_blocks(
}
next_date_opt = next_block_opt.as_ref().map(|(_, next_block, _)| {
Timestamp::wrap(next_block.header.time).to_date()
Timestamp::from(next_block.header.time).to_date()
});
// Always run for the first block of the loop

View File

@@ -61,7 +61,7 @@ pub fn parse(
) {
// log(&format!("{height}"));
let timestamp = Timestamp::wrap(block.header.time);
let timestamp = Timestamp::from(block.header.time);
// If false, expect that the code is flawless
// or create a 0 value txid database

View File

@@ -171,7 +171,7 @@ impl Binance {
// [timestamp, open, high, low, close, volume, ...]
let array = value.as_array().unwrap();
let date = Timestamp::wrap(
let date = Timestamp::from(
(array.first().unwrap().as_u64().unwrap() / 1_000) as u32,
)
.to_date();

View File

@@ -91,7 +91,7 @@ impl Kraken {
.map(|value| {
let array = value.as_array().unwrap();
let date = Timestamp::wrap(array.first().unwrap().as_u64().unwrap() as u32)
let date = Timestamp::from(array.first().unwrap().as_u64().unwrap() as u32)
.to_date();
let get_f32 = |index: usize| {

View File

@@ -1,6 +1,7 @@
use std::{fmt::Debug, path::PathBuf, time::Instant};
use axum::{
body::Body,
extract::{Path, Query, State},
http::HeaderMap,
response::{IntoResponse, Response},
@@ -14,15 +15,19 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::{
server::{
api::{
structs::{ChunkMetadata, DatasetRange, DatasetRangeChunk, Extension, Kind, Route},
structs::{
ChunkMetadata, DatasetRange, DatasetRangeChunk, Extension, Kind, Route, Routes,
},
API_URL_PREFIX,
},
header_map::HeaderMapUtils,
log_result, AppState,
header_map::{HeaderMapExtended, Modified},
log_result,
response::ResponseExtended,
AppState,
},
structs::{
Date, GenericMap, Height, MapChunkId, MapKey, MapSerialized, MapValue, SerializedDateMap,
SerializedVec, OHLC,
Date, GenericMap, Height, HeightMapChunkId, MapChunkId, MapKey, MapSerialized, MapValue,
SerializedBTreeMap, SerializedDateMap, SerializedTimeMap, SerializedVec, Timestamp, OHLC,
},
};
@@ -86,19 +91,17 @@ fn result_handler(
let type_name = route.type_name.as_str();
Ok(match type_name {
"u8" => typed_handler::<u8>(headers, id, ext, query, route)?,
"u16" => typed_handler::<u16>(headers, id, ext, query, route)?,
"u32" => typed_handler::<u32>(headers, id, ext, query, route)?,
"u64" => typed_handler::<u64>(headers, id, ext, query, route)?,
"usize" => typed_handler::<usize>(headers, id, ext, query, route)?,
"f32" => typed_handler::<f32>(headers, id, ext, query, route)?,
"f64" => typed_handler::<f64>(headers, id, ext, query, route)?,
"OHLC" => typed_handler::<OHLC>(headers, id, ext, query, route)?,
"Date" => typed_handler::<Date>(headers, id, ext, query, route)?,
"Height" => typed_handler::<Height>(headers, id, ext, query, route)?,
// "Value" => {
// value_to_response::<serde_json::Value>(Json::import(&route.file_path)?, extension)
// }
"u8" => typed_handler::<u8>(headers, id, ext, query, route, &routes)?,
"u16" => typed_handler::<u16>(headers, id, ext, query, route, &routes)?,
"u32" => typed_handler::<u32>(headers, id, ext, query, route, &routes)?,
"u64" => typed_handler::<u64>(headers, id, ext, query, route, &routes)?,
"usize" => typed_handler::<usize>(headers, id, ext, query, route, &routes)?,
"f32" => typed_handler::<f32>(headers, id, ext, query, route, &routes)?,
"f64" => typed_handler::<f64>(headers, id, ext, query, route, &routes)?,
"OHLC" => typed_handler::<OHLC>(headers, id, ext, query, route, &routes)?,
"Date" => typed_handler::<Date>(headers, id, ext, query, route, &routes)?,
"Height" => typed_handler::<Height>(headers, id, ext, query, route, &routes)?,
"Timestamp" => typed_handler::<Timestamp>(headers, id, ext, query, route, &routes)?,
_ => panic!("Incompatible type: {type_name}"),
})
}
@@ -109,6 +112,7 @@ fn typed_handler<T>(
ext: Option<Extension>,
query: &Query<DatasetParams>,
route: &Route,
routes: &Routes,
) -> color_eyre::Result<Response>
where
T: Serialize + Debug + DeserializeOwned + Decode + MapValue,
@@ -121,26 +125,97 @@ where
let range = DatasetRange::try_from(query)?;
let (mut response, date_modified) = match kind {
Kind::Date => map_to_response::<Date, T, _, SerializedDateMap<T>>(
id, headers, route, &ext, range, query,
),
Kind::Height => map_to_response::<Height, T, _, SerializedVec<T>>(
id, headers, route, &ext, range, query,
),
Kind::Last => {
let last_value: T = route.serialization.import(&route.path.join("last"))?;
return Ok(axum::response::Json(last_value).into_response());
}
}?;
Kind::Date => match read_serialized::<Date, T, _, SerializedDateMap<T>>(
id, &headers, route, &range, query,
)? {
ReadSerialized::DatasetAndDate((dataset, date, chunk_meta)) => {
(serialized_to_response(dataset, id, chunk_meta, ext), date)
}
ReadSerialized::NotModified => return Ok(Response::new_not_modified()),
ReadSerialized::_Phantom(_) => unreachable!(),
},
Kind::Height => match read_serialized::<Height, T, _, SerializedVec<T>>(
id, &headers, route, &range, query,
)? {
ReadSerialized::DatasetAndDate((dataset, date, chunk_meta)) => (
serialized_to_response::<Height, T, _, SerializedVec<T>>(
dataset, id, chunk_meta, ext,
),
date,
),
ReadSerialized::NotModified => return Ok(Response::new_not_modified()),
ReadSerialized::_Phantom(_) => unreachable!(),
},
Kind::Timestamp => {
let (dataset, date, chunk_meta) = match read_serialized::<Height, T, _, SerializedVec<T>>(
id, &headers, route, &range, query,
)? {
ReadSerialized::DatasetAndDate(tuple) => tuple,
ReadSerialized::NotModified => return Ok(Response::new_not_modified()),
ReadSerialized::_Phantom(_) => unreachable!(),
};
let (timestamp_dataset, _, _) =
match read_serialized::<Height, Timestamp, _, SerializedVec<Timestamp>>(
"timestamp",
&headers,
routes.get("timestamp").unwrap(),
&range,
query,
)? {
ReadSerialized::DatasetAndDate(tuple) => tuple,
ReadSerialized::NotModified => return Ok(Response::new_not_modified()),
ReadSerialized::_Phantom(_) => unreachable!(),
};
let mut serialized_timemap: SerializedTimeMap<T> = SerializedBTreeMap::default();
dataset
.map
.into_iter()
.enumerate()
.for_each(|(index, value)| {
serialized_timemap.map.insert(
timestamp_dataset
.get_index(index)
.cloned()
.unwrap_or(Timestamp::now()),
value,
);
});
(
serialized_to_response::<Timestamp, T, HeightMapChunkId, SerializedTimeMap<T>>(
serialized_timemap,
id,
chunk_meta,
ext,
),
date,
)
// let m = read_serialized::<Height, T, _, SerializedVec<T>>(
// id, &headers, route, &range, query,
// )?;
// let t = read_serialized::<Height, Timestamp, _, SerializedVec<Timestamp>>(
// "timestamp",
// &headers,
// routes.get("timestamp").unwrap(),
// &range,
// query,
// );
// t
}
};
let status_ok = response.status() == StatusCode::OK;
let headers = response.headers_mut();
headers.insert_cors();
if status_ok {
headers.insert_last_modified(date_modified);
}
headers.insert_last_modified(date_modified);
match ext {
Some(extension) => {
@@ -156,14 +231,51 @@ where
Ok(response)
}
fn map_to_response<Key, Value, ChunkId, Serialized>(
fn serialized_to_response<Key, Value, ChunkId, Serialized>(
dataset: Serialized,
id: &str,
headers: HeaderMap,
chunk_meta: Option<ChunkMetadata>,
ext: Option<Extension>,
) -> Response<Body>
where
Key: MapKey<ChunkId>,
Value: MapValue,
ChunkId: MapChunkId,
Serialized: MapSerialized<Key, Value, ChunkId>,
{
if ext == Some(Extension::CSV) {
dataset.to_csv(id).into_response()
} else if let Some(chunk) = chunk_meta {
axum::response::Json(SerializedMapChunk {
chunk,
map: dataset.map(),
version: dataset.version(),
})
.into_response()
} else {
axum::response::Json(dataset).into_response()
}
}
// Outcome of `read_serialized`: either the deserialized dataset together with
// its file modification date and optional chunk metadata, or a signal that the
// client's `If-Modified-Since` header matched and no body needs to be sent.
enum ReadSerialized<Key, Value, ChunkId, Serialized>
where
Key: MapKey<ChunkId>,
Value: MapValue,
ChunkId: MapChunkId,
Serialized: MapSerialized<Key, Value, ChunkId>,
{
// Dataset read successfully: (payload, last-modified date, chunk metadata
// when a specific chunk was requested).
DatasetAndDate((Serialized, DateTime<Utc>, Option<ChunkMetadata>)),
// File unchanged since the client's `If-Modified-Since` date; the caller
// should answer with `Response::new_not_modified()` (304).
NotModified,
// Appears never to be constructed; it only ties the otherwise-unused
// `Key`, `Value` and `ChunkId` type parameters to the enum.
// NOTE(review): `std::marker::PhantomData<(Key, Value, ChunkId)>` would
// express this without implying owned values — consider switching.
_Phantom((Key, Value, ChunkId)),
}
fn read_serialized<Key, Value, ChunkId, Serialized>(
id: &str,
headers: &HeaderMap,
route: &Route,
ext: &Option<Extension>,
range: DatasetRange,
range: &DatasetRange,
query: &Query<DatasetParams>,
) -> color_eyre::Result<(Response, DateTime<Utc>)>
) -> color_eyre::Result<ReadSerialized<Key, Value, ChunkId, Serialized>>
where
Key: MapKey<ChunkId>,
Value: MapValue,
@@ -188,7 +300,7 @@ where
.context("Last tuple of dataset directory")?
.0
}
DatasetRangeChunk::Chunk(chunk) => ChunkId::from_usize(chunk),
DatasetRangeChunk::Chunk(chunk) => ChunkId::from_usize(*chunk),
};
let chunk_path = datasets.get(&chunk_id);
@@ -197,12 +309,11 @@ where
}
let chunk_path = chunk_path.unwrap();
let (date, response) = headers.check_if_modified_since(chunk_path)?;
date_modified = date;
if let Some(response) = response {
return Ok((response, date_modified));
let (modified, date) = headers.check_if_modified_since(chunk_path)?;
if modified == Modified::NotModifiedSince {
return Ok(ReadSerialized::NotModified);
}
date_modified = date;
let to_url = |chunk: Option<ChunkId>| {
chunk.and_then(|chunk| {
@@ -237,30 +348,21 @@ where
})
.context("Expect to find newest file")?;
let (date, response) = headers.check_if_modified_since(newest_file)?;
date_modified = date;
if let Some(response) = response {
return Ok((response, date_modified));
let (modified, date) = headers.check_if_modified_since(newest_file)?;
if modified == Modified::NotModifiedSince {
return Ok(ReadSerialized::NotModified);
}
date_modified = date;
Serialized::import_all(&folder_path, serialization)
};
let response = if *ext == Some(Extension::CSV) {
dataset.to_csv(id).into_response()
} else if let Some(chunk) = chunk_meta {
axum::response::Json(SerializedMapChunk {
chunk,
map: dataset.map(),
version: dataset.version(),
})
.into_response()
} else {
axum::response::Json(dataset).into_response()
};
Ok((response, date_modified))
Ok(ReadSerialized::DatasetAndDate((
dataset,
date_modified,
chunk_meta,
)))
}
#[derive(Serialize)]

View File

@@ -1,6 +1,6 @@
use std::path::Path;
#[derive(PartialEq, Eq)]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum Extension {
#[allow(clippy::upper_case_acronyms)]
CSV,

View File

@@ -9,6 +9,7 @@ use crate::structs::{AnyMap, Date, Height, MapKey};
// How a dataset route is keyed/served. Parsed from a single letter in the
// route name ('d', 'h', 't', 'l' — see `TryFrom<&String> for Kind` below).
pub enum Kind {
// Keyed by `Date` ('d').
Date,
// Keyed by block `Height` ('h').
Height,
// Height-keyed data re-keyed by block `Timestamp` ('t'); offered for every
// map whose key is `Height`.
Timestamp,
// Serves only the most recent value ('l').
Last,
}
@@ -25,6 +26,7 @@ impl TryFrom<&String> for Kind {
{
'd' => Self::Date,
'h' => Self::Height,
't' => Self::Timestamp,
'l' => Self::Last,
_ => return Err(eyre!("Bad kind")),
},
@@ -40,6 +42,7 @@ impl From<&(dyn AnyMap + Send + Sync)> for BTreeSet<Kind> {
}
if map.key_name() == Height::map_name() {
s.insert(Kind::Height);
s.insert(Kind::Timestamp);
}
if map.last_value().is_some() {
s.insert(Kind::Last);
@@ -47,13 +50,3 @@ impl From<&(dyn AnyMap + Send + Sync)> for BTreeSet<Kind> {
s
}
}
// impl std::fmt::Display for Kind {
// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// match *self {
// Self::Date => write!(f, "date"),
// Self::Height => write!(f, "height"),
// Self::Last => write!(f, "last"),
// }
// }
// }

View File

@@ -1,21 +1,20 @@
use std::path::Path;
use axum::{
body::Body,
http::{header, HeaderMap, Response},
response::IntoResponse,
};
use axum::http::{header, HeaderMap};
use chrono::{DateTime, Timelike, Utc};
use log::info;
use reqwest::{
header::{HOST, IF_MODIFIED_SINCE},
StatusCode,
};
use reqwest::header::{HOST, IF_MODIFIED_SINCE};
const STALE_IF_ERROR: u64 = 30_000_000; // 1 Year ish
const MODIFIED_SINCE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";
pub trait HeaderMapUtils {
// Result of comparing a file's modification time against the request's
// `If-Modified-Since` header (see `check_if_modified_since`).
// NOTE(review): the comparison is strict equality of the two dates, not
// "modified at or before" — confirm that is the intended caching contract.
#[derive(PartialEq, Eq)]
pub enum Modified {
// File changed (or the request carried no `If-Modified-Since` header).
ModifiedSince,
// File mtime equals the `If-Modified-Since` date; a 304 is appropriate.
NotModifiedSince,
}
pub trait HeaderMapExtended {
fn get_scheme(&self) -> &str;
fn get_host(&self) -> &str;
fn check_if_host_is_any_local(&self) -> bool;
@@ -25,10 +24,8 @@ pub trait HeaderMapUtils {
fn insert_cors(&mut self);
fn get_if_modified_since(&self) -> Option<DateTime<Utc>>;
fn check_if_modified_since(
&self,
path: &Path,
) -> color_eyre::Result<(DateTime<Utc>, Option<Response<Body>>)>;
fn check_if_modified_since(&self, path: &Path)
-> color_eyre::Result<(Modified, DateTime<Utc>)>;
fn insert_cache_control_immutable(&mut self);
#[allow(unused)]
@@ -52,7 +49,7 @@ pub trait HeaderMapUtils {
fn insert_content_type_font_woff2(&mut self);
}
impl HeaderMapUtils for HeaderMap {
impl HeaderMapExtended for HeaderMap {
fn get_scheme(&self) -> &str {
if self.check_if_host_is_any_local() {
"http"
@@ -114,22 +111,18 @@ impl HeaderMapUtils for HeaderMap {
fn check_if_modified_since(
&self,
path: &Path,
) -> color_eyre::Result<(DateTime<Utc>, Option<Response<Body>>)> {
) -> color_eyre::Result<(Modified, DateTime<Utc>)> {
let time = path.metadata()?.modified()?;
let date: DateTime<Utc> = time.into();
let date = date.with_nanosecond(0).unwrap();
let mut response_opt = None;
if let Some(if_modified_since) = self.get_if_modified_since() {
if if_modified_since == date {
let mut response = (StatusCode::NOT_MODIFIED, "").into_response();
let headers = response.headers_mut();
headers.insert_cors();
response_opt.replace(response);
return Ok((Modified::NotModifiedSince, date));
}
}
Ok((date, response_opt))
Ok((Modified::ModifiedSince, date))
}
fn get_if_modified_since(&self) -> Option<DateTime<Utc>> {

View File

@@ -13,6 +13,7 @@ use crate::structs::Config;
pub mod api;
mod header_map;
mod response;
mod website;
#[derive(Clone)]

20
src/server/response.rs Normal file
View File

@@ -0,0 +1,20 @@
use axum::{body::Body, http::Response, response::IntoResponse};
use reqwest::StatusCode;
use super::header_map::HeaderMapExtended;
// Convenience constructors for HTTP `Response` values used by the server.
pub trait ResponseExtended
where
Self: Sized,
{
// Builds a ready-to-send `304 Not Modified` response (CORS headers included).
fn new_not_modified() -> Self;
}
impl ResponseExtended for Response<Body> {
    /// Builds an empty `304 Not Modified` response with CORS headers applied,
    /// ready to be returned as-is to the client.
    fn new_not_modified() -> Response<Body> {
        let mut not_modified = (StatusCode::NOT_MODIFIED, "").into_response();
        not_modified.headers_mut().insert_cors();
        not_modified
    }
}

View File

@@ -13,7 +13,11 @@ use axum::{
use log::{error, info};
use reqwest::StatusCode;
use crate::server::{header_map::HeaderMapUtils, log_result};
use crate::server::{
header_map::{HeaderMapExtended, Modified},
log_result,
response::ResponseExtended,
};
use super::minify_js;
@@ -80,10 +84,9 @@ fn path_to_response(headers: &HeaderMap, path: &Path) -> Response {
}
fn _path_to_response(headers: &HeaderMap, path: &Path) -> color_eyre::Result<Response> {
let (date, response) = headers.check_if_modified_since(path)?;
if let Some(response) = response {
return Ok(response);
let (modified, date) = headers.check_if_modified_since(path)?;
if modified == Modified::NotModifiedSince {
return Ok(Response::new_not_modified());
}
let mut response;

View File

@@ -77,6 +77,7 @@ pub struct Config {
impl Config {
pub const DATASET_DIR_NAME: &str = "datasets";
pub const DATABASES_DIR_NAME: &str = "databases";
pub fn import() -> color_eyre::Result<Self> {
let path = Self::path_dot_kibo();
@@ -285,7 +286,7 @@ impl Config {
}
pub fn path_datasets(&self) -> MapPath {
MapPath::from(self.path_kibodir().join("datasets"))
MapPath::from(self.path_kibodir().join(Self::DATASET_DIR_NAME))
}
pub fn path_datasets_last_values(&self) -> MapPath {
@@ -297,7 +298,7 @@ impl Config {
}
pub fn path_databases(&self) -> PathBuf {
self.path_kibodir().join(Self::DATASET_DIR_NAME)
self.path_kibodir().join(Self::DATABASES_DIR_NAME)
}
pub fn path_states(&self) -> PathBuf {

View File

@@ -49,7 +49,7 @@ impl Date {
}
pub fn to_timestamp(self) -> Timestamp {
Timestamp::wrap(NaiveDateTime::from(*self).and_utc().timestamp() as u32)
Timestamp::from(NaiveDateTime::from(*self).and_utc().timestamp() as u32)
}
/// Returns value between 0.0 and 1.0 depending on its completion

View File

@@ -322,8 +322,8 @@ where
fn id(&self, config: &Config) -> String {
let path_to_string = |p: &Path| p.to_str().unwrap().to_owned();
path_to_string(self.path_parent())
.replace(&path_to_string(&config.path_kibodir()), "")
.replace(&format!("/{}/", Config::DATASET_DIR_NAME), "")
.replace(&format!("{}/", path_to_string(&config.path_kibodir())), "")
.replace(&format!("{}/", Config::DATASET_DIR_NAME), "")
.replace("/", "-")
}

View File

@@ -6,9 +6,10 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::io::Serialization;
use super::{Date, DateMap, MapChunkId, MapKey, MapSerialized, MapValue};
use super::{Date, DateMap, MapChunkId, MapKey, MapSerialized, MapValue, Timestamp};
pub type SerializedDateMap<T> = SerializedBTreeMap<Date, T>;
pub type SerializedTimeMap<T> = SerializedBTreeMap<Timestamp, T>;
#[derive(Debug, Default, Serialize, Deserialize, Encode, Decode, Allocative)]
pub struct SerializedBTreeMap<Key, Value>

View File

@@ -14,6 +14,12 @@ pub struct SerializedVec<Value> {
pub map: Vec<Value>,
}
impl<Value> SerializedVec<Value> {
    /// Returns a reference to the value stored at `index`, or `None` when the
    /// index lies past the end of the underlying vector.
    pub fn get_index(&self, index: usize) -> Option<&Value> {
        self.map.as_slice().get(index)
    }
}
impl<Key, Value, ChunkId> MapSerialized<Key, Value, ChunkId> for SerializedVec<Value>
where
Self: Debug + Serialize + DeserializeOwned + Encode + Decode,

View File

@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use crate::utils::{ONE_DAY_IN_S, ONE_HOUR_IN_S};
use super::Date;
use super::{Date, HeightMapChunkId, MapKey};
#[derive(
Debug,
@@ -32,10 +32,6 @@ pub struct Timestamp(u32);
impl Timestamp {
pub const ZERO: Self = Self(0);
pub fn wrap(timestamp: u32) -> Self {
Self(timestamp)
}
pub fn now() -> Self {
Self(chrono::offset::Utc::now().timestamp() as u32)
}
@@ -51,7 +47,7 @@ impl Timestamp {
pub fn to_floored_seconds(self) -> Self {
let date_time = Utc.timestamp_opt(i64::from(self.0), 0).unwrap();
Self::wrap(
Self::from(
NaiveDateTime::new(
date_time.date_naive(),
NaiveTime::from_hms_opt(date_time.hour(), date_time.minute(), 0).unwrap(),
@@ -70,7 +66,7 @@ impl Timestamp {
}
pub fn older_by_1h_plus_than(&self, younger: Self) -> bool {
younger.checked_sub(**self).unwrap_or_default() > ONE_HOUR_IN_S as u32
(*younger).checked_sub(**self).unwrap_or_default() > ONE_HOUR_IN_S as u32
}
}
@@ -78,7 +74,7 @@ impl Sub for Timestamp {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
Self::wrap(self.0 - rhs.0)
Self::from(self.0 - rhs.0)
}
}
@@ -87,3 +83,55 @@ impl fmt::Display for Timestamp {
write!(f, "{}", **self)
}
}
impl From<u32> for Timestamp {
fn from(value: u32) -> Self {
Self(value)
}
}
// Partial `MapKey` implementation letting `Timestamp` act as a serialized map
// key (see `SerializedTimeMap`). Only the serialization/presentation methods
// are implemented; the chunk-navigation methods panic.
// NOTE(review): presumably timestamp-keyed maps are only built on the fly
// from height-keyed data (see the `Kind::Timestamp` handler) and are never
// chunked by `Timestamp` itself — confirm before any new call site relies on
// one of the `unreachable!()` methods.
impl MapKey<HeightMapChunkId> for Timestamp {
fn to_chunk_id(&self) -> HeightMapChunkId {
unreachable!();
}
fn to_first_unsafe(&self) -> Option<Self> {
unreachable!();
}
fn to_serialized_key(&self) -> Self {
unreachable!();
}
fn is_out_of_bounds(&self) -> bool {
unreachable!();
}
fn is_first(&self) -> bool {
unreachable!();
}
fn checked_sub(&self, _: usize) -> Option<Self> {
unreachable!();
}
fn min_percentile_key() -> Self {
unreachable!();
}
// Yields one `Timestamp` per second, inclusive of both endpoints.
fn iter_up_to(&self, other: &Self) -> impl Iterator<Item = Self> {
(**self..=**other).map(Timestamp::from)
}
fn map_name<'a>() -> &'a str {
"timestamp"
}
fn to_usize(&self) -> usize {
(**self) as usize
}
// NOTE(review): `t as u32` silently truncates values above `u32::MAX` on
// 64-bit platforms — fine for Unix timestamps until 2106, but worth a
// `u32::try_from` if inputs are ever untrusted.
fn from_usize(t: usize) -> Self {
Self(t as u32)
}
}

View File

@@ -5,10 +5,6 @@ pub fn retry<T>(
sleep_in_s: u64,
retries: usize,
) -> color_eyre::Result<T> {
if retries < 1 {
unreachable!()
}
let mut i = 0;
loop {