parser: AnyDataset DX improvements

This commit is contained in:
k
2024-10-28 16:48:27 +01:00
parent 36ad0b3014
commit 48a8aad20e
43 changed files with 1688 additions and 1870 deletions

View File

@@ -2,6 +2,8 @@ use std::path::{Path, PathBuf};
use serde_json::Value;
use super::MapKind;
pub trait AnyMap {
fn path(&self) -> &Path;
fn path_last(&self) -> &Option<PathBuf>;
@@ -25,4 +27,6 @@ pub trait AnyMap {
fn post_export(&mut self);
fn delete_files(&self);
fn kind(&self) -> MapKind;
}

View File

@@ -7,9 +7,11 @@ use allocative::Allocative;
use crate::utils::{LossyFrom, TARGET_BLOCKS_PER_DAY};
use super::{AnyDateMap, AnyHeightMap, AnyMap, Date, DateMap, Height, HeightMap, MapValue};
use super::{
AnyDateMap, AnyHeightMap, AnyMap, Date, DateMap, Height, HeightMap, MapKind, MapValue,
};
#[derive(Default, Allocative)]
#[derive(Allocative)]
pub struct BiMap<Value>
where
Value: MapValue,
@@ -22,17 +24,17 @@ impl<Value> BiMap<Value>
where
Value: MapValue,
{
pub fn new_bin(version: u32, path: &str) -> Self {
pub fn new_bin(version: u32, kind: MapKind, path: &str) -> Self {
Self {
height: HeightMap::_new_bin(version, path, true),
date: DateMap::_new_bin(version, path, false),
height: HeightMap::_new_bin(version, kind, path, true),
date: DateMap::_new_bin(version, kind, path, false),
}
}
pub fn new_json(version: u32, path: &str) -> Self {
pub fn new_json(version: u32, kind: MapKind, path: &str) -> Self {
Self {
height: HeightMap::new_json(version, path, true),
date: DateMap::new_json(version, path, false),
height: HeightMap::new_json(version, kind, path, true),
date: DateMap::new_json(version, kind, path, false),
}
}
@@ -292,6 +294,10 @@ where
self.height.multi_insert_max(heights, &mut source.height);
self.date.multi_insert_max(dates, &mut source.date);
}
pub fn kind(&self) -> MapKind {
self.date.kind()
}
}
pub trait AnyBiMap {

View File

@@ -17,7 +17,7 @@ where
last_height: &mut DateMap<Height>,
) {
dates.iter().for_each(|date| {
self.insert(
self.insert_computed(
*date,
source
.get_or_import(&last_height.get_or_import(date).unwrap())
@@ -40,7 +40,7 @@ where
let last_height = last_height.get_or_import(date).unwrap();
let range = (*first_height)..=(*last_height);
self.insert(*date, height_map.sum_range(&range));
self.insert_computed(*date, height_map.sum_range(&range));
})
}
}
@@ -55,27 +55,6 @@ pub trait AnyDateMap: AnyMap {
fn as_any_mut_map(&mut self) -> &mut dyn AnyMap;
}
#[inline(always)]
pub fn date_map_vec_to_any_date_map_vec<T>(
vec: Vec<&DateMap<T>>,
) -> impl Iterator<Item = &(dyn AnyDateMap + Send + Sync)>
where
T: MapValue,
{
vec.into_iter()
.map(|map| map as &(dyn AnyDateMap + Send + Sync))
}
#[inline(always)]
pub fn date_map_vec_to_mut_any_date_map_vec<T>(
vec: Vec<&mut DateMap<T>>,
) -> impl Iterator<Item = &mut (dyn AnyDateMap)>
where
T: MapValue,
{
vec.into_iter().map(|map| map as &mut dyn AnyDateMap)
}
impl<T> AnyDateMap for DateMap<T>
where
T: MapValue,

View File

@@ -22,6 +22,12 @@ use crate::{
use super::{AnyMap, MapValue};
#[derive(Debug, Clone, Copy, Allocative, PartialEq, Eq)]
pub enum MapKind {
Inserted,
Computed,
}
pub trait MapKey<ChunkId>
where
Self: Sized + PartialOrd + Ord + Clone + Copy + Debug,
@@ -68,9 +74,10 @@ where
fn from_usize(id: usize) -> Self;
}
#[derive(Default, Debug, Allocative)]
#[derive(Debug, Allocative)]
pub struct GenericMap<Key, Value, ChunkId, Serialized> {
version: u32,
kind: MapKind,
path_all: PathBuf,
path_last: Option<PathBuf>,
@@ -93,20 +100,28 @@ where
Key: MapKey<ChunkId>,
Serialized: MapSerialized<Key, Value, ChunkId>,
{
pub fn new_bin(version: u32, path: &str) -> Self {
Self::new(version, path, Serialization::Binary, 1, true)
pub fn new_bin(version: u32, kind: MapKind, path: &str) -> Self {
Self::new(version, kind, path, Serialization::Binary, 1, true)
}
pub fn _new_bin(version: u32, path: &str, export_last: bool) -> Self {
Self::new(version, path, Serialization::Binary, 1, export_last)
pub fn _new_bin(version: u32, kind: MapKind, path: &str, export_last: bool) -> Self {
Self::new(version, kind, path, Serialization::Binary, 1, export_last)
}
pub fn new_json(version: u32, path: &str, export_last: bool) -> Self {
Self::new(version, path, Serialization::Json, usize::MAX, export_last)
pub fn new_json(version: u32, kind: MapKind, path: &str, export_last: bool) -> Self {
Self::new(
version,
kind,
path,
Serialization::Json,
usize::MAX,
export_last,
)
}
fn new(
version: u32,
kind: MapKind,
path: &str,
serialization: Serialization,
chunks_in_memory: usize,
@@ -132,6 +147,7 @@ where
let mut s = Self {
version,
kind,
path_all,
path_last,
@@ -202,6 +218,19 @@ where
}
pub fn insert(&mut self, key: Key, value: Value) -> Value {
self.checked_insert(key, value, MapKind::Inserted)
}
pub fn insert_computed(&mut self, key: Key, value: Value) -> Value {
self.checked_insert(key, value, MapKind::Computed)
}
fn checked_insert(&mut self, key: Key, value: Value, kind: MapKind) -> Value {
if self.kind != kind {
dbg!(&self.path());
panic!("Called at the wrong place");
}
if !self.is_key_safe(key) {
self.to_insert
.entry(key.to_chunk_id())
@@ -376,6 +405,10 @@ where
.iter()
.for_each(|(_, path)| fs::remove_file(path).unwrap())
}
fn kind(&self) -> MapKind {
self.kind
}
}
impl<Key, Value, ChunkId, Serialized> GenericMap<Key, Value, ChunkId, Serialized>
@@ -407,13 +440,13 @@ where
F: FnMut(&Key) -> Value,
{
keys.iter().for_each(|key| {
self.insert(*key, callback(key));
self.insert_computed(*key, callback(key));
});
}
pub fn multi_insert_const(&mut self, keys: &[Key], constant: Value) {
keys.iter().for_each(|key| {
self.insert(*key, constant);
self.insert_computed(*key, constant);
});
}
@@ -428,7 +461,7 @@ where
F: FnMut(SourceValue, &Key) -> Value,
{
keys.iter().for_each(|key| {
self.insert(*key, transform(source.get_or_import(key).unwrap(), key));
self.insert_computed(*key, transform(source.get_or_import(key).unwrap(), key));
});
}
@@ -452,7 +485,7 @@ where
keys.iter().for_each(|key| {
let value = transform((source.get_or_import(key).unwrap(), key, source, self));
self.insert(*key, value);
self.insert_computed(*key, value);
});
}
@@ -469,7 +502,7 @@ where
Value: LossyFrom<A> + LossyFrom<B> + Add<Output = Value>,
{
keys.iter().for_each(|key| {
self.insert(
self.insert_computed(
*key,
Value::lossy_from(added.get_or_import(key).unwrap())
+ Value::lossy_from(adder.get_or_import(key).unwrap()),
@@ -490,7 +523,7 @@ where
Value: LossyFrom<A> + LossyFrom<B> + Sub<Output = Value>,
{
keys.iter().for_each(|key| {
self.insert(
self.insert_computed(
*key,
Value::lossy_from(subtracted.get_or_import(key).unwrap())
- Value::lossy_from(subtracter.get_or_import(key).unwrap()),
@@ -511,7 +544,7 @@ where
Value: LossyFrom<A> + LossyFrom<B> + Mul<Output = Value>,
{
keys.iter().for_each(|key| {
self.insert(
self.insert_computed(
*key,
Value::lossy_from(multiplied.get_or_import(key).unwrap())
* Value::lossy_from(multiplier.get_or_import(key).unwrap()),
@@ -565,7 +598,7 @@ where
let multiplier = Value::from(if as_percentage { 100 } else { 1 });
keys.iter().for_each(|key| {
self.insert(
self.insert_computed(
*key,
Value::lossy_from(divided.get_or_import(key).unwrap())
/ Value::lossy_from(divider.get_or_import(key).unwrap())
@@ -634,7 +667,7 @@ where
previous_sum + Value::lossy_from(last_value) - Value::lossy_from(to_subtract),
);
self.insert(*key, sum.unwrap());
self.insert_computed(*key, sum.unwrap());
});
}
@@ -677,7 +710,7 @@ where
average.replace(((previous_average * (len - 1.0) + last_value) / len).into());
self.insert(*key, average.unwrap());
self.insert_computed(*key, average.unwrap());
});
}
@@ -695,7 +728,7 @@ where
let net_change = last_value - previous_value;
self.insert(*key, net_change);
self.insert_computed(*key, net_change);
});
}
@@ -718,7 +751,7 @@ where
let percentage_change = ((last_value / previous_value) - one) * hundred;
self.insert(*key, Value::lossy_from(percentage_change));
self.insert_computed(*key, Value::lossy_from(percentage_change));
});
}
@@ -753,7 +786,7 @@ where
keys.iter().cloned().try_for_each(|key| {
if key < min_percentile_key {
map_and_percentiles.iter_mut().for_each(|(map, _)| {
(*map).insert(key, nan);
(*map).insert_computed(key, nan);
});
return ControlFlow::Continue::<()>(());
}
@@ -818,11 +851,11 @@ where
let float_value = get_percentile::<OrderedFloat<f32>>(vec, *percentile).0;
(*map).insert(key, Value::lossy_from(float_value));
(*map).insert_computed(key, Value::lossy_from(float_value));
});
} else {
map_and_percentiles.iter_mut().for_each(|(map, _)| {
(*map).insert(key, nan);
(*map).insert_computed(key, nan);
});
}
@@ -854,7 +887,7 @@ where
previous_max.replace(last_value);
}
self.insert(*key, previous_max.unwrap());
self.insert_computed(*key, previous_max.unwrap());
});
}
@@ -888,7 +921,7 @@ where
previous_min.replace(last_value);
}
self.insert(*key, previous_min.unwrap_or_default());
self.insert_computed(*key, previous_min.unwrap_or_default());
});
}
}

View File

@@ -4,9 +4,7 @@ use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{de::DeserializeOwned, Serialize};
use crate::datasets::OHLC;
use super::{Date, Height, Timestamp};
use super::{Date, Height, Timestamp, OHLC};
pub trait MapValue:
Clone

View File

@@ -25,6 +25,7 @@ mod height_map;
mod height_map_chunk_id;
mod liquidity;
mod map_value;
mod ohlc;
mod partial_txout_data;
mod price;
mod sent_data;
@@ -61,6 +62,7 @@ pub use height_map::*;
pub use height_map_chunk_id::*;
pub use liquidity::*;
pub use map_value::*;
pub use ohlc::*;
pub use partial_txout_data::*;
pub use price::*;
pub use sent_data::*;

View File

@@ -0,0 +1,12 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Default, Deserialize, Serialize, Encode, Decode, Clone, Copy, Allocative)]
pub struct OHLC {
pub open: f32,
pub high: f32,
pub low: f32,
pub close: f32,
}