global: snapshot

This commit is contained in:
k
2024-10-20 18:31:43 +02:00
parent ffa4871035
commit 5b9d599e83
47 changed files with 666 additions and 251 deletions

View File

@@ -23,4 +23,6 @@ pub trait AnyMap {
fn pre_export(&mut self);
fn export(&self) -> color_eyre::Result<()>;
fn post_export(&mut self);
fn delete_files(&self);
}

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::log;
#[derive(Parser, Debug, Clone, Default, Serialize, Deserialize)]
#[derive(Parser, Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[command(version, about, long_about = None)]
pub struct Config {
/// Bitcoin data directory path, saved
@@ -32,6 +32,10 @@ pub struct Config {
#[arg(long, value_name = "SECONDS")]
pub delay: Option<u64>,
/// Maximum ram you want the program to use in GB, default: 75% of total, not saved
#[arg(long, value_name = "GB")]
pub max_ram: Option<f64>,
/// Start a dry run, default: false, not saved
#[arg(long, value_name = "BOOL")]
dry_run: Option<bool>,
@@ -39,6 +43,10 @@ pub struct Config {
/// Record ram usage, default: false, not saved
#[arg(long, value_name = "BOOL")]
record_ram_usage: Option<bool>,
/// Recompute all computed datasets, default: false, not saved
#[arg(long, value_name = "BOOL")]
recompute_computed: Option<bool>,
}
impl Config {
@@ -50,40 +58,48 @@ impl Config {
toml::from_str(&contents).unwrap_or_default()
});
let config_args = Config::parse();
let mut config_args = Config::parse();
if let Some(datadir) = config_args.datadir {
if let Some(datadir) = config_args.datadir.take() {
config_saved.datadir = Some(datadir);
}
if let Some(rpcconnect) = config_args.rpcconnect {
if let Some(rpcconnect) = config_args.rpcconnect.take() {
config_saved.rpcconnect = Some(rpcconnect);
}
if let Some(rpcport) = config_args.rpcport {
if let Some(rpcport) = config_args.rpcport.take() {
config_saved.rpcport = Some(rpcport);
}
if let Some(rpcuser) = config_args.rpcuser {
if let Some(rpcuser) = config_args.rpcuser.take() {
config_saved.rpcuser = Some(rpcuser);
}
if let Some(rpcpassword) = config_args.rpcpassword {
if let Some(rpcpassword) = config_args.rpcpassword.take() {
config_saved.rpcpassword = Some(rpcpassword);
}
if let Some(delay) = config_args.delay {
if let Some(delay) = config_args.delay.take() {
config_saved.delay = Some(delay);
}
if let Some(max_ram) = config_args.max_ram.take() {
config_saved.max_ram = Some(max_ram);
}
// Done importing
let config = config_saved;
let mut config = config_saved;
config.check();
config.write()?;
config.dry_run = config_args.dry_run.take();
config.record_ram_usage = config_args.record_ram_usage.take();
config.recompute_computed = config_args.recompute_computed.take();
log("---");
log("Configuration:");
log(&format!("datadir: {:?}", config.datadir));
@@ -92,10 +108,20 @@ impl Config {
log(&format!("rpcuser: {:?}", config.rpcuser));
log(&format!("rpcpassword: {:?}", config.rpcpassword));
log(&format!("delay: {:?}", config.delay));
log(&format!("max_ram: {:?}", config.max_ram));
log(&format!("dry_run: {:?}", config.dry_run));
log(&format!("record_ram_usage: {:?}", config.record_ram_usage));
log(&format!(
"recompute_computed: {:?}",
config.recompute_computed
));
log("---");
if config_args != Config::default() {
dbg!(config_args);
panic!("Didn't consume the full config")
}
Ok(config)
}
@@ -132,4 +158,8 @@ impl Config {
pub fn record_ram_usage(&self) -> bool {
self.record_ram_usage.is_some_and(|b| b)
}
pub fn recompute_computed(&self) -> bool {
self.recompute_computed.is_some_and(|b| b)
}
}

View File

@@ -73,6 +73,10 @@ impl Date {
Ordering::Greater => unreachable!("0.0 but shouldn't be called"),
}
}
pub fn is_new_year(&self) -> bool {
self.day() == 1 && self.month() == 1
}
}
impl MapKey<DateMapChunkId> for Date {

View File

@@ -156,9 +156,7 @@ where
if serialized.version() == s.version {
s.imported.insert(chunk_start, serialized);
} else {
s.read_dir()
.iter()
.for_each(|(_, path)| fs::remove_file(path).unwrap())
s.delete_files();
}
}
});
@@ -204,12 +202,10 @@ where
}
pub fn insert(&mut self, key: Key, value: Value) -> Value {
if !self.is_key_safe(key) {
self.to_insert
.entry(key.to_chunk_id())
.or_default()
.insert(key.to_serialized_key(), value);
}
self.to_insert
.entry(key.to_chunk_id())
.or_default()
.insert(key.to_serialized_key(), value);
value
}
@@ -341,8 +337,10 @@ where
});
let path = self.path_all.join(chunk_id.to_name());
self.serialization.export(Path::new(&path), serialized)?;
// Export last
if index == len - 1 {
if let Some(path_last) = self.path_last.as_ref() {
self.serialization
@@ -370,6 +368,12 @@ where
self.to_insert.clear();
}
fn delete_files(&self) {
self.read_dir()
.iter()
.for_each(|(_, path)| fs::remove_file(path).unwrap())
}
}
impl<Key, Value, ChunkId, Serialized> GenericMap<Key, Value, ChunkId, Serialized>
@@ -379,14 +383,16 @@ where
Key: MapKey<ChunkId>,
Serialized: MapSerialized<Key, Value, ChunkId>,
{
pub fn sum_keys(&self, keys: &[Key]) -> Value
pub fn sum_keys(&mut self, keys: &[Key]) -> Value
where
Value: Sum,
{
keys.iter().flat_map(|key| self.get(key)).sum::<Value>()
keys.iter()
.map(|key| self.get_or_import(key).unwrap())
.sum::<Value>()
}
pub fn average_keys(&self, keys: &[Key]) -> f32
pub fn average_keys(&mut self, keys: &[Key]) -> f32
where
Value: Sum,
f32: LossyFrom<Value>,
@@ -654,7 +660,7 @@ where
let previous_average: f32 = average
.unwrap_or_else(|| {
key.checked_sub(1)
.and_then(|previous_average_key| self.get(&previous_average_key))
.and_then(|previous_average_key| self.get_or_import(&previous_average_key))
.unwrap_or_default()
})
.into();
@@ -832,7 +838,7 @@ where
keys.iter().for_each(|key| {
if previous_max.is_none() {
key.checked_sub(1)
.and_then(|previous_max_key| self.get(&previous_max_key))
.and_then(|previous_max_key| self.get_or_import(&previous_max_key))
.and_then(|v| previous_max.replace(v));
}
@@ -850,4 +856,38 @@ where
self.insert(*key, previous_max.unwrap());
});
}
pub fn multi_insert_min(&mut self, keys: &[Key], source: &mut Self, min_excluded: Value)
where
Value: Default + PartialOrd,
{
let mut previous_min = None;
keys.iter().for_each(|key| {
if previous_min.is_none() {
if let Some(value) = key
.checked_sub(1)
.and_then(|previous_min_key| self.get_or_import(&previous_min_key))
{
if value > min_excluded {
previous_min.replace(value);
}
}
}
let last_value = source.get_or_import(key).unwrap_or_else(|| {
dbg!(key);
panic!()
});
if last_value > min_excluded
&& (previous_min.is_none()
|| previous_min.is_some_and(|previous_min| previous_min > last_value))
{
previous_min.replace(last_value);
}
self.insert(*key, previous_min.unwrap_or_default());
});
}
}

View File

@@ -26,6 +26,7 @@ mod liquidity;
mod map_value;
mod partial_txout_data;
mod price;
mod ram;
mod sent_data;
mod serialized_btreemap;
mod serialized_vec;
@@ -61,6 +62,7 @@ pub use liquidity::*;
pub use map_value::*;
pub use partial_txout_data::*;
pub use price::*;
pub use ram::*;
pub use sent_data::*;
pub use serialized_btreemap::*;
pub use serialized_vec::*;

26
parser/src/structs/ram.rs Normal file
View File

@@ -0,0 +1,26 @@
use derive_deref::Deref;
use memory_stats::memory_stats;
use sysinfo::System;
use crate::structs::Config;
#[allow(clippy::upper_case_acronyms)]
#[derive(Deref)]
pub struct RAM(System);
impl RAM {
pub fn new() -> Self {
Self(System::new_all())
}
pub fn max_exceeded(&self, config: &Config) -> bool {
let ram_used = memory_stats().unwrap().physical_mem as f64;
if let Some(max_ram) = config.max_ram {
(ram_used / 1_000_000_000.0) > max_ram
} else {
let ram_total = self.total_memory() as f64;
ram_used / ram_total > 0.75
}
}
}