global: datasets compression via zstd

This commit is contained in:
k
2024-08-05 00:44:46 +02:00
parent 9067c28d24
commit c646d6dc60
36 changed files with 544 additions and 249 deletions

View File

@@ -11,6 +11,7 @@ use crate::{
create_rpc,
databases::Databases,
datasets::{AllDatasets, ComputeData},
io::OUTPUTS_FOLDER_PATH,
states::{AddressCohortsDurableStates, States, UTXOCohortsDurableStates},
structs::{Date, DateData, MapKey},
utils::{generate_allocation_files, log, time},
@@ -22,8 +23,6 @@ pub fn iter_blocks(
rpc: &biter::bitcoincore_rpc::Client,
approx_block_count: usize,
) -> color_eyre::Result<()> {
let export_dir = "./target/outputs";
let should_insert = true;
let should_export = true;
let study_ram_usage = false;
@@ -56,7 +55,7 @@ pub fn iter_blocks(
let block_receiver = biter::new(
config.datadir.as_ref().unwrap(),
export_dir,
OUTPUTS_FOLDER_PATH,
Some(height.to_usize()),
None,
create_rpc(config).unwrap(),

View File

@@ -1,14 +1,16 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
fs, io,
ops::{Deref, DerefMut},
path::Path,
};
use crate::{
io::Binary,
structs::{Counter, Date, Height},
Serialization,
};
#[derive(Default, Debug, Encode, Decode, Allocative)]
@@ -73,7 +75,7 @@ impl Metadata {
}
}
#[derive(Default, Debug, Encode, Decode, Allocative)]
#[derive(Default, Debug, Encode, Decode, Serialize, Deserialize, Allocative)]
pub struct MetadataData {
pub serial: usize,
pub len: Counter,
@@ -88,17 +90,17 @@ impl MetadataData {
fn full_path(folder_path: &str) -> String {
let name = Self::name();
format!("{folder_path}/{name}.bin")
format!("{folder_path}/{name}")
}
pub fn import(path: &str) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
Binary::import(&Self::full_path(path))
Serialization::Binary.import(Path::new(&Self::full_path(path)))
}
pub fn export(&self, path: &str) -> color_eyre::Result<()> {
Binary::export(&Self::full_path(path), self)
Serialization::Binary.export(Path::new(&Self::full_path(path)), self)
}
pub fn reset(&mut self, path: &str) -> color_eyre::Result<(), io::Error> {

View File

@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, fs, ops::RangeInclusive};
use std::{collections::BTreeMap, fs, ops::RangeInclusive, path::Path};
use allocative::Allocative;
@@ -282,7 +282,7 @@ impl AllDatasets {
fs::create_dir_all(server_inputs_path)?;
Json::export(
&format!("{server_inputs_path}/disk_path_to_type.json"),
Path::new(&format!("{server_inputs_path}/disk_path_to_type.json")),
&path_to_type,
)?;

View File

@@ -1,15 +1,39 @@
use std::{
fmt::Debug,
fs::File,
io::{BufReader, BufWriter},
fs::{self, File},
io::{BufReader, BufWriter, Cursor},
path::Path,
};
use bincode::{config, decode_from_std_read, encode_into_std_write, Decode, Encode};
use bincode::{
config, decode_from_slice, decode_from_std_read, encode_into_std_write, Decode, Encode,
};
use zstd::decode_all;
const ZST_EXTENSION: &str = "zst";
pub const BIN_EXTENSION: &str = "bin";
pub const COMPRESSED_BIN_EXTENSION: &str = "bin.zst";
enum BinaryType {
Raw,
Compressed,
}
pub struct Binary;
impl Binary {
pub fn import<T>(path: &str) -> color_eyre::Result<T>
/// Decodes a value of type `T` from the binary file at `path`.
///
/// `Self::type_from_path` decides whether the file is raw or compressed,
/// and the matching private importer is then invoked.
///
/// # Panics
///
/// Panics (inside `Self::type_from_path`) if `path` has no extension or an
/// unsupported one.
pub fn import<T>(path: &Path) -> color_eyre::Result<T>
where
    T: Decode,
{
    let kind = Self::type_from_path(path);

    match kind {
        BinaryType::Raw => Self::import_raw(path),
        BinaryType::Compressed => Self::import_compressed(path),
    }
}
fn import_raw<T>(path: &Path) -> color_eyre::Result<T>
where
T: Decode,
{
@@ -24,7 +48,34 @@ impl Binary {
Ok(decoded)
}
pub fn export<T>(path: &str, value: &T) -> color_eyre::Result<()>
/// Reads the zstd-compressed file at `path` and decodes its content into a `T`.
///
/// The whole file is decompressed into memory first, then decoded with
/// bincode's standard configuration.
///
/// # Errors
///
/// Returns an error (instead of panicking) if the file cannot be opened, if
/// decompression fails, or if the decompressed bytes cannot be decoded as `T`.
fn import_compressed<T>(path: &Path) -> color_eyre::Result<T>
where
    T: Decode,
{
    let reader = BufReader::new(File::open(path)?);

    let decompressed = decode_all(reader)?;

    // `decode_from_slice` returns a tuple; only its first element, the decoded value, is needed.
    let (decoded, _) = decode_from_slice::<T, _>(&decompressed, config::standard())?;

    Ok(decoded)
}
/// Encodes `value` and writes it to the binary file at `path`.
///
/// `Self::type_from_path` decides whether the file is raw or compressed,
/// and the matching private exporter is then invoked.
///
/// # Panics
///
/// Panics (inside `Self::type_from_path`) if `path` has no extension or an
/// unsupported one.
pub fn export<T>(path: &Path, value: &T) -> color_eyre::Result<()>
where
    T: Debug + Encode,
{
    let kind = Self::type_from_path(path);

    match kind {
        BinaryType::Raw => Self::export_raw(path, value),
        BinaryType::Compressed => Self::export_compressed(path, value),
    }
}
fn export_raw<T>(path: &Path, value: &T) -> color_eyre::Result<()>
where
T: Debug + Encode,
{
@@ -40,4 +91,47 @@ impl Binary {
Ok(())
}
/// Encodes `value` with bincode, compresses the bytes with zstd and writes
/// the result to `path`.
///
/// # Errors
///
/// Returns an error (instead of panicking) if encoding, compression or the
/// final write to disk fails.
fn export_compressed<T>(path: &Path, value: &T) -> color_eyre::Result<()>
where
    T: Debug + Encode,
{
    let encoded = bincode::encode_to_vec(value, config::standard())?;

    // A level of 0 asks zstd to use its default compression level.
    let compressed = zstd::encode_all(Cursor::new(encoded), 0)?;

    fs::write(path, compressed)?;

    Ok(())
}
/// Tells whether `path` ends with one of the supported binary extensions
/// (`BIN_EXTENSION` or `COMPRESSED_BIN_EXTENSION`).
///
/// The extension must be preceded by a dot, so a name that merely ends with
/// the same letters (e.g. `cabin`) is rejected. A path that is not valid
/// UTF-8 yields `false` rather than a panic.
pub fn has_correct_extension(path: &Path) -> bool {
    let Some(path) = path.to_str() else {
        return false;
    };

    [BIN_EXTENSION, COMPRESSED_BIN_EXTENSION]
        .iter()
        .any(|extension| {
            path.strip_suffix(*extension)
                .is_some_and(|stem| stem.ends_with('.'))
        })
}
/// Classifies `path` as a raw or a compressed binary file from its extension.
///
/// A final extension equal to `ZST_EXTENSION` means compressed; any other
/// accepted extension means raw.
///
/// # Panics
///
/// Panics if `path` has no extension, or if `Self::has_correct_extension`
/// rejects it.
fn type_from_path(path: &Path) -> BinaryType {
    let Some(extension) = path.extension() else {
        panic!("Should have extension");
    };

    if !Self::has_correct_extension(path) {
        dbg!(path);
        panic!("Wrong extension")
    }

    if extension != ZST_EXTENSION {
        return BinaryType::Raw;
    }

    BinaryType::Compressed
}
}

View File

@@ -1,2 +1,2 @@
pub const INPUTS_FOLDER_PATH: &str = "./in";
pub const OUTPUTS_FOLDER_PATH: &str = "./target/outputs";
pub const OUTPUTS_FOLDER_PATH: &str = "./out";

View File

@@ -1,17 +1,25 @@
use std::{
fs::File,
io::{BufReader, BufWriter},
path::Path,
};
use serde::{de::DeserializeOwned, Serialize};
pub struct Json;
pub const JSON_EXTENSION: &str = "json";
pub const HAR_EXTENSION: &str = "har";
impl Json {
pub fn import<T>(path: &str) -> color_eyre::Result<T>
pub fn import<T>(path: &Path) -> color_eyre::Result<T>
where
T: DeserializeOwned,
{
if !Self::has_correct_extension(path) {
panic!("Wrong extension");
}
let file = File::open(path)?;
let reader = BufReader::new(file);
@@ -19,10 +27,15 @@ impl Json {
Ok(serde_json::from_reader(reader)?)
}
pub fn export<T>(path: &str, value: &T) -> color_eyre::Result<()>
pub fn export<T>(path: &Path, value: &T) -> color_eyre::Result<()>
where
T: Serialize,
{
if !Self::has_correct_extension(path) {
dbg!(path);
panic!("Wrong extension");
}
let file = File::create(path).unwrap_or_else(|_| {
dbg!(&path);
panic!("No such file or directory")
@@ -34,4 +47,10 @@ impl Json {
Ok(())
}
/// Tells whether `path` ends with one of the supported JSON extensions
/// (`JSON_EXTENSION` or `HAR_EXTENSION`).
///
/// The extension must be preceded by a dot, so a name that merely ends with
/// the same letters (e.g. `char`, which ends with `har`) is rejected. A path
/// that is not valid UTF-8 yields `false` rather than a panic.
#[inline(always)]
pub fn has_correct_extension(path: &Path) -> bool {
    let Some(path) = path.to_str() else {
        return false;
    };

    [JSON_EXTENSION, HAR_EXTENSION]
        .iter()
        .any(|extension| {
            path.strip_suffix(*extension)
                .is_some_and(|stem| stem.ends_with('.'))
        })
}
}

View File

@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, fs, path::Path};
use allocative::Allocative;
use bincode::{Decode, Encode};
@@ -6,6 +6,8 @@ use serde::{de::DeserializeOwned, Serialize};
use crate::io::{Binary, Json};
use super::{BIN_EXTENSION, COMPRESSED_BIN_EXTENSION, HAR_EXTENSION, JSON_EXTENSION};
#[derive(PartialEq, PartialOrd, Ord, Eq, Debug, Clone, Copy, Default, Allocative)]
pub enum Serialization {
#[default]
@@ -14,42 +16,105 @@ pub enum Serialization {
}
impl Serialization {
pub fn to_extension(&self) -> &str {
pub fn is_serializable(&self, path: &Path) -> bool {
let path = path.to_str().unwrap();
match self {
Self::Binary => "bin",
Self::Json => "json",
Self::Binary => {
path.ends_with(BIN_EXTENSION) || path.ends_with(COMPRESSED_BIN_EXTENSION)
}
Self::Json => path.ends_with(JSON_EXTENSION) || path.ends_with(HAR_EXTENSION),
}
}
pub fn from_extension(extension: &str) -> Self {
match extension {
"bin" => Self::Binary,
"json" => Self::Json,
_ => panic!("Extension \"{extension}\" isn't supported"),
/// Infers the serialization format from the suffix of `path`.
///
/// Binary suffixes are checked first, then JSON ones.
///
/// # Panics
///
/// Panics if `path` is not valid UTF-8, or if it ends with none of the
/// known extensions.
pub fn from_path(path: &Path) -> Self {
    let path = path.to_str().unwrap();
    let ends_with_any =
        |extensions: [&str; 2]| extensions.iter().any(|extension| path.ends_with(*extension));

    if ends_with_any([BIN_EXTENSION, COMPRESSED_BIN_EXTENSION]) {
        return Self::Binary;
    }

    if ends_with_any([JSON_EXTENSION, HAR_EXTENSION]) {
        return Self::Json;
    }

    panic!("Extension \"{path}\" isn't supported")
}
pub fn append_extension(&self, path: &str) -> String {
format!("{path}.{}", self.to_extension())
}
pub fn import<T>(&self, path: &str) -> color_eyre::Result<T>
pub fn import<T>(&self, path: &Path) -> color_eyre::Result<T>
where
T: Debug + DeserializeOwned + Decode,
{
match self {
Serialization::Binary => Binary::import(path),
Serialization::Json => Json::import(path),
Serialization::Binary => {
if self.is_serializable(path) {
Binary::import(path)
} else {
let path = path.to_str().unwrap();
let bin_path_str = format!("{path}.{BIN_EXTENSION}");
let bin_path = Path::new(&bin_path_str);
if bin_path.exists() {
return Binary::import(bin_path);
}
let compressed_bin_path_str = format!("{path}.{COMPRESSED_BIN_EXTENSION}");
let compressed_bin_path = Path::new(&compressed_bin_path_str);
if compressed_bin_path.exists() {
return Binary::import(compressed_bin_path);
}
panic!("Wrong path")
}
}
Serialization::Json => {
if self.is_serializable(path) {
Json::import(path)
} else {
let path = path.to_str().unwrap();
let json_path_str = format!("{path}.{JSON_EXTENSION}");
let json_path = Path::new(&json_path_str);
if json_path.exists() {
return Json::import(json_path);
}
panic!("Wrong path")
}
}
}
}
pub fn export<T>(&self, path: &str, value: &T) -> color_eyre::Result<()>
pub fn export<T>(&self, path: &Path, value: &T) -> color_eyre::Result<()>
where
T: Debug + Serialize + Encode,
{
match self {
Serialization::Binary => Binary::export(path, value),
Serialization::Json => Json::export(path, value),
Serialization::Binary => {
if self.is_serializable(path) {
Binary::export(path, value)
} else {
let path = path.to_str().unwrap();
let res = Binary::export(
Path::new(&format!("{}.{COMPRESSED_BIN_EXTENSION}", path,)),
value,
);
if res.is_ok() {
let _ = fs::remove_file(Path::new(&format!("{}.{BIN_EXTENSION}", path)));
}
res
}
}
Serialization::Json => {
if self.is_serializable(path) {
Json::export(path, value)
} else {
Json::export(
Path::new(&format!("{}.{JSON_EXTENSION}", path.to_str().unwrap())),
value,
)
}
}
}
}
}

View File

@@ -15,5 +15,5 @@ pub use crate::{
Config, Date, DateMap, Height, HeightMap, MapChunkId, SerializedBTreeMap, SerializedVec,
HEIGHT_MAP_CHUNK_SIZE,
},
utils::{create_rpc, log},
utils::{create_rpc, log, reset_logs},
};

View File

@@ -1,11 +1,13 @@
use std::{thread::sleep, time::Duration};
use biter::bitcoincore_rpc::RpcApi;
use parser::{create_rpc, iter_blocks, log, Config};
use parser::{create_rpc, iter_blocks, log, reset_logs, Config};
fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
reset_logs();
let config = Config::import();
let rpc = create_rpc(&config).unwrap();

View File

@@ -23,8 +23,7 @@ impl Binance {
let path_binance_har = Path::new(INPUTS_FOLDER_PATH).join("binance.har");
let json: BTreeMap<String, Value> =
Json::import(path_binance_har.to_str().unwrap()).unwrap_or_default();
let json: BTreeMap<String, Value> = Json::import(&path_binance_har).unwrap_or_default();
Ok(json
.get("log")

View File

@@ -1,13 +1,14 @@
use std::{fmt::Debug, fs, io};
use std::{fmt::Debug, fs, io, path::Path};
use bincode::{Decode, Encode};
use serde::{de::DeserializeOwned, Serialize};
use crate::io::{Binary, OUTPUTS_FOLDER_PATH};
use crate::{io::OUTPUTS_FOLDER_PATH, Serialization};
// https://github.com/djkoloski/rust_serialization_benchmark
pub trait AnyState
where
Self: Debug + Encode + Decode,
Self: Debug + Encode + Decode + Serialize + DeserializeOwned,
{
fn name<'a>() -> &'a str;
@@ -24,7 +25,7 @@ where
let folder_path = Self::folder_path();
format!("{folder_path}/{name}.bin")
format!("{folder_path}/{name}")
}
fn reset(&mut self) -> color_eyre::Result<(), io::Error> {
@@ -36,11 +37,11 @@ where
fn import() -> color_eyre::Result<Self> {
Self::create_dir_all()?;
Binary::import(&Self::full_path())
Serialization::Binary.import(Path::new(&Self::full_path()))
}
fn export(&self) -> color_eyre::Result<()> {
Binary::export(&Self::full_path(), self)
Serialization::Binary.export(Path::new(&Self::full_path()), self)
}
fn clear(&mut self);

View File

@@ -1,11 +1,12 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use crate::structs::Counter;
use super::AnyState;
#[derive(Default, Debug, Encode, Decode, Allocative)]
#[derive(Default, Debug, Encode, Decode, Serialize, Deserialize, Allocative)]
pub struct Counters {
pub op_return_addresses: Counter,
pub push_only_addresses: Counter,

View File

@@ -1,12 +1,13 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use derive_deref::{Deref, DerefMut};
use serde::{Deserialize, Serialize};
use crate::structs::{BlockData, BlockPath, DateData};
use super::AnyState;
#[derive(Default, Deref, DerefMut, Debug, Encode, Decode, Allocative)]
#[derive(Default, Deref, DerefMut, Debug, Serialize, Deserialize, Encode, Decode, Allocative)]
pub struct DateDataVec(Vec<DateData>);
impl DateDataVec {

View File

@@ -1,9 +1,10 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use super::{Amount, Height, Price};
#[derive(Debug, Encode, Decode, Allocative)]
#[derive(Debug, Serialize, Deserialize, Encode, Decode, Allocative)]
pub struct BlockData {
pub height: Height,
pub price: Price,

View File

@@ -1,8 +1,11 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use derive_deref::{Deref, DerefMut};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deref, DerefMut, Default, Clone, Copy, Encode, Decode, Allocative)]
#[derive(
Debug, Deref, DerefMut, Default, Clone, Copy, Encode, Decode, Serialize, Deserialize, Allocative,
)]
pub struct Counter(u32);
impl Counter {

View File

@@ -1,9 +1,10 @@
use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use super::{BlockData, BlockPath, Date};
#[derive(Debug, Encode, Decode, Allocative)]
#[derive(Debug, Serialize, Deserialize, Encode, Decode, Allocative)]
pub struct DateData {
pub date: Date,
pub blocks: Vec<BlockData>,

View File

@@ -1,3 +1,5 @@
use std::path::Path;
use allocative::Allocative;
use chrono::Datelike;
@@ -19,8 +21,18 @@ impl MapChunkId for DateMapChunkId {
self.0.to_string()
}
fn from_name(name: &str) -> Self {
Self(name.parse::<i32>().unwrap())
/// Builds a chunk id from the path of a chunk file.
///
/// The id is the part of the file name that precedes the first `.`,
/// parsed as an `i32`.
///
/// # Panics
///
/// Panics if `path` has no file name, if that name is not valid UTF-8, or
/// if its leading segment is not a valid `i32`.
fn from_path(path: &Path) -> Self {
    let file_name = path.file_name().unwrap().to_str().unwrap();

    // Everything from the first dot onward is ignored.
    let id = file_name.split('.').next().unwrap();

    Self(id.parse::<i32>().unwrap())
}
fn to_usize(self) -> usize {

View File

@@ -63,7 +63,7 @@ where
Self: Ord + Debug + Copy + Clone,
{
fn to_name(&self) -> String;
fn from_name(name: &str) -> Self;
fn from_path(path: &Path) -> Self;
fn to_usize(self) -> usize;
fn from_usize(id: usize) -> Self;
}
@@ -124,7 +124,7 @@ where
let path_last = {
if export_last {
Some(serialization.append_extension(&format!("{path}/last")))
Some(format!("{path}/last"))
} else {
None
}
@@ -188,22 +188,16 @@ where
fs::read_dir(path)
.unwrap()
.map(|entry| entry.unwrap().path())
.filter(|path| {
let extension = path.extension().unwrap().to_str().unwrap();
path.is_file() && extension == serialization.to_extension()
})
.filter(|path| serialization.is_serializable(path))
.map(|path| {
let chunk_id = ChunkId::from_name(path.file_stem().unwrap().to_str().unwrap());
let chunk_id = ChunkId::from_path(&path);
(chunk_id, path)
})
.collect()
}
fn import(&self, path: &Path) -> color_eyre::Result<Serialized> {
self.serialization
.import::<Serialized>(path.to_str().unwrap())
self.serialization.import::<Serialized>(path)
}
pub fn insert(&mut self, key: Key, value: Value) -> Value {
@@ -309,10 +303,7 @@ where
panic!();
});
let serialized = self
.serialization
.import::<Serialized>(path.to_str().unwrap())
.unwrap();
let serialized = self.serialization.import::<Serialized>(path).unwrap();
self.imported.insert(*chunk_id, serialized);
}
@@ -334,23 +325,18 @@ where
unreachable!()
}
let path = self.serialization.append_extension(&format!(
"{}/{}",
self.path_all,
chunk_id.to_name()
));
let serialized = self.imported.get(chunk_id).unwrap_or_else(|| {
dbg!(&self.path_all, chunk_id, &self.imported);
panic!();
});
self.serialization.export(&path, serialized)?;
let path = format!("{}/{}", self.path_all, chunk_id.to_name());
self.serialization.export(Path::new(&path), serialized)?;
if index == len - 1 {
if let Some(path_last) = self.path_last.as_ref() {
self.serialization
.export(path_last, serialized.last().unwrap())?;
.export(Path::new(path_last), serialized.last().unwrap())?;
}
}

View File

@@ -1,3 +1,5 @@
use std::path::Path;
use allocative::Allocative;
use derive_deref::{Deref, DerefMut};
@@ -26,9 +28,20 @@ impl MapChunkId for HeightMapChunkId {
format!("{start}..{end}")
}
fn from_name(name: &str) -> Self {
fn from_path(path: &Path) -> Self {
Self(Height::new(
name.split("..").next().unwrap().parse::<u32>().unwrap(),
path.file_name()
.unwrap()
.to_str()
.unwrap()
.split("..")
.next()
.unwrap()
.parse::<u32>()
.unwrap_or_else(|_| {
dbg!(path);
panic!()
}),
))
}

View File

@@ -2,11 +2,24 @@ use std::ops::{Add, AddAssign, Div, Mul, Sub, SubAssign};
use allocative::Allocative;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
use super::Amount;
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Encode, Decode, Allocative,
Debug,
Default,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Encode,
Decode,
Allocative,
)]
pub struct Price(u64);

View File

@@ -1,8 +1,17 @@
use std::{fs::OpenOptions, io::Write};
use std::{
fs::{self, OpenOptions},
io::Write,
};
use chrono::Local;
use color_eyre::owo_colors::OwoColorize;
const LOG_PATH: &str = "./.log";
/// Deletes the log file at `LOG_PATH`.
///
/// This is best-effort: any error from the removal is deliberately discarded.
pub fn reset_logs() {
    fs::remove_file(LOG_PATH).ok();
}
#[inline(always)]
pub fn log(str: &str) {
let date_time = format!("{}", Local::now().format("%Y-%m-%d %H:%M:%S -"));
@@ -13,7 +22,7 @@ pub fn log(str: &str) {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open("./parser.log")
.open(LOG_PATH)
.unwrap();
if let Err(e) = writeln!(file, "{} {}", date_time, line) {