global: snapshot

This commit is contained in:
nym21
2025-11-11 17:41:12 +01:00
parent 2dcbd8df99
commit 81da73bc53
17 changed files with 811 additions and 427 deletions

3
.gitignore vendored
View File

@@ -29,3 +29,6 @@ flamegraph.svg
# Expand
expand.rs
# Benchmarks
[0-9]/

92
Cargo.lock generated
View File

@@ -211,7 +211,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -533,6 +533,8 @@ name = "brk_bencher"
version = "0.0.111"
dependencies = [
"brk_error",
"brk_logger",
"parking_lot",
]
[[package]]
@@ -814,7 +816,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1262,7 +1264,7 @@ version = "0.0.111"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1271,7 +1273,7 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"brk_error",
"byteview 0.6.1",
"byteview 0.8.0",
"derive_deref",
"itoa",
"jiff",
@@ -1442,7 +1444,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1754,7 +1756,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1765,7 +1767,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1799,7 +1801,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1830,7 +1832,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"unicode-xid",
]
@@ -1885,7 +1887,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1960,7 +1962,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2194,7 +2196,7 @@ checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2288,7 +2290,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2772,7 +2774,7 @@ checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3064,7 +3066,7 @@ checksum = "4568f25ccbd45ab5d5603dc34318c1ec56b117531781260002151b8530a9f931"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3189,7 +3191,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3272,9 +3274,9 @@ dependencies = [
[[package]]
name = "oxc-browserslist"
version = "2.1.2"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7612127f5fa1ef0f3e98905ddfc10a7fa1df98944b75d774bb3cb4b530663616"
checksum = "f978be538ca5e2a64326d24b7991dc658cc8495132833ae387212ab3b8abd70a"
dependencies = [
"bincode",
"flate2",
@@ -3309,7 +3311,7 @@ checksum = "003b4612827f6501183873fb0735da92157e3c7daa71c40921c7d2758fec2229"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3353,7 +3355,7 @@ dependencies = [
"phf",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3879,7 +3881,7 @@ dependencies = [
"phf_shared",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3908,7 +3910,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4080,7 +4082,7 @@ checksum = "7347867d0a7e1208d93b46767be83e2b8f978c3dad35f775ac8d8847551d6fe1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4283,7 +4285,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4371,7 +4373,7 @@ checksum = "bd83f5f173ff41e00337d97f6572e416d022ef8a19f371817259ae960324c482"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4541,7 +4543,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4646,7 +4648,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4657,7 +4659,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4923,7 +4925,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -4948,9 +4950,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.109"
version = "2.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f"
checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea"
dependencies = [
"proc-macro2",
"quote",
@@ -4971,7 +4973,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5033,7 +5035,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5044,7 +5046,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5123,7 +5125,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5269,7 +5271,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5359,7 +5361,7 @@ checksum = "ee6ff59666c9cbaec3533964505d39154dc4e0a56151fdea30a09ed0301f62e2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"termcolor",
]
@@ -5541,7 +5543,7 @@ name = "vecdb_derive"
version = "0.3.16"
dependencies = [
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5628,7 +5630,7 @@ dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"wasm-bindgen-shared",
]
@@ -5747,7 +5749,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5758,7 +5760,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6032,7 +6034,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"synstructure",
]
@@ -6053,7 +6055,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6073,7 +6075,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"synstructure",
]
@@ -6107,7 +6109,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]

View File

@@ -58,8 +58,8 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" }
brk_types = { version = "0.0.111", path = "crates/brk_types" }
brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["derive"] }
brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" }
byteview = "=0.6.1"
# byteview = "~0.8.0"
# byteview = "=0.6.1"
byteview = "~0.8.0"
derive_deref = "1.1.1"
# fjall2 = { version = "2.11.5", package = "brk_fjall" }
fjall2 = { path = "../fjall2", package = "brk_fjall" }

View File

@@ -11,3 +11,5 @@ build = "build.rs"
[dependencies]
brk_error = { workspace = true }
brk_logger = { workspace = true }
parking_lot = { workspace = true }

View File

@@ -0,0 +1,54 @@
use std::collections::HashMap;
use std::fs;
use std::io;
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
/// Tracks on-disk usage of a directory tree, caching per-file sizes keyed by
/// modification time so repeated scans only recompute changed files.
pub struct DiskMonitor {
    // path -> (bytes_used, mtime); rebuilt on every scan so entries for
    // files deleted between scans do not accumulate forever.
    cache: HashMap<PathBuf, (u64, SystemTime)>,
}

impl Default for DiskMonitor {
    fn default() -> Self {
        Self::new()
    }
}

impl DiskMonitor {
    /// Create a monitor with an empty cache.
    pub fn new() -> Self {
        Self {
            cache: HashMap::new(),
        }
    }

    /// Get disk usage in bytes (matches `du` and Finder).
    ///
    /// # Errors
    /// Returns any I/O error hit while reading directory entries or metadata.
    pub fn get_disk_usage(&mut self, path: &Path) -> io::Result<u64> {
        // Scan into a fresh cache, then swap it in: this prunes entries for
        // files that no longer exist, keeping memory bounded by the current
        // tree size instead of growing monotonically.
        let mut fresh = HashMap::with_capacity(self.cache.len());
        let total = Self::scan_recursive(&self.cache, &mut fresh, path)?;
        self.cache = fresh;
        Ok(total)
    }

    /// Recursively sum file sizes under `dir`, reusing `old` cache entries
    /// whose mtime is unchanged and recording every visited file in `fresh`.
    fn scan_recursive(
        old: &HashMap<PathBuf, (u64, SystemTime)>,
        fresh: &mut HashMap<PathBuf, (u64, SystemTime)>,
        dir: &Path,
    ) -> io::Result<u64> {
        let mut total = 0;
        for entry in fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            let metadata = entry.metadata()?;
            if metadata.is_file() {
                let mtime = metadata.modified()?;
                let bytes = match old.get(&path) {
                    // Unchanged mtime: reuse the cached size.
                    Some(&(cached_bytes, cached_mtime)) if cached_mtime == mtime => cached_bytes,
                    // New or modified file: allocated size (512-byte blocks),
                    // which is what `du` reports, not the logical length.
                    _ => metadata.blocks() * 512,
                };
                total += bytes;
                fresh.insert(path, (bytes, mtime));
            } else if metadata.is_dir() {
                total += Self::scan_recursive(old, fresh, &path)?;
            }
        }
        Ok(total)
    }
}

View File

@@ -1,8 +1,7 @@
use std::{
fs,
io::Write,
io::{self, Write},
path::{Path, PathBuf},
process::Command,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
@@ -13,11 +12,24 @@ use std::{
use brk_error::Result;
pub struct Bencher {
mod disk;
mod memory;
mod progression;
use disk::*;
use memory::*;
use parking_lot::Mutex;
use progression::*;
#[derive(Clone)]
pub struct Bencher(Arc<BencherInner>);
struct BencherInner {
bench_dir: PathBuf,
monitored_path: PathBuf,
stop_flag: Arc<AtomicBool>,
monitor_thread: Option<JoinHandle<Result<()>>>,
monitor_thread: Mutex<Option<JoinHandle<Result<()>>>>,
progression: Arc<ProgressionMonitor>,
}
impl Bencher {
@@ -33,12 +45,23 @@ impl Bencher {
fs::create_dir_all(&bench_dir)?;
Ok(Self {
let progress_csv = bench_dir.join("progress.csv");
let progression = Arc::new(ProgressionMonitor::new(&progress_csv)?);
let progression_clone = progression.clone();
// Register hook with logger
brk_logger::register_hook(move |message| {
progression_clone.check_and_record(message);
})
.map_err(|e| io::Error::new(io::ErrorKind::AlreadyExists, e))?;
Ok(Self(Arc::new(BencherInner {
bench_dir,
monitored_path: monitored_path.to_path_buf(),
stop_flag: Arc::new(AtomicBool::new(false)),
monitor_thread: None,
})
progression,
monitor_thread: Mutex::new(None),
})))
}
/// Create a bencher using CARGO_MANIFEST_DIR to find workspace root
@@ -69,144 +92,38 @@ impl Bencher {
/// Start monitoring disk usage and memory footprint
pub fn start(&mut self) -> Result<()> {
if self.monitor_thread.is_some() {
if self.0.monitor_thread.lock().is_some() {
return Err("Bencher already started".into());
}
let stop_flag = self.stop_flag.clone();
let bench_dir = self.bench_dir.clone();
let monitored_path = self.monitored_path.clone();
let stop_flag = self.0.stop_flag.clone();
let bench_dir = self.0.bench_dir.clone();
let monitored_path = self.0.monitored_path.clone();
let handle =
thread::spawn(move || monitor_resources(&monitored_path, &bench_dir, stop_flag));
self.monitor_thread = Some(handle);
*self.0.monitor_thread.lock() = Some(handle);
Ok(())
}
/// Stop monitoring and wait for the thread to finish
pub fn stop(mut self) -> Result<()> {
self.stop_flag.store(true, Ordering::Relaxed);
pub fn stop(&self) -> Result<()> {
self.0.stop_flag.store(true, Ordering::Relaxed);
if let Some(handle) = self.monitor_thread.take() {
if let Some(handle) = self.0.monitor_thread.lock().take() {
handle.join().map_err(|_| "Monitor thread panicked")??;
}
self.0.progression.flush()?;
Ok(())
}
/// Get the benchmark output directory
pub fn bench_dir(&self) -> &Path {
&self.bench_dir
}
}
fn parse_size_to_mb(value_str: &str, unit: &str) -> Option<f64> {
let value: f64 = value_str.parse().ok()?;
match unit {
"MB" | "M" => Some(value),
"GB" | "G" => Some(value * 1024.0),
"KB" | "K" => Some(value / 1024.0),
"B" => Some(value / 1024.0 / 1024.0),
_ => None,
}
}
fn parse_du_output(size_str: &str) -> Option<f64> {
// Parse outputs like "524M", "287G", "4.0K"
let size_str = size_str.trim();
if let Some(unit_pos) = size_str.find(|c: char| c.is_alphabetic()) {
let (value_part, unit_part) = size_str.split_at(unit_pos);
parse_size_to_mb(value_part, unit_part)
} else {
// No unit means bytes
let value: f64 = size_str.parse().ok()?;
Some(value / 1024.0 / 1024.0)
}
}
fn parse_footprint_output(output: &str) -> Option<(f64, f64)> {
let mut phys_footprint = None;
let mut phys_footprint_peak = None;
for line in output.lines() {
if line.contains("phys_footprint:") && !line.contains("peak") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 {
phys_footprint = parse_size_to_mb(parts[1], parts[2]);
}
} else if line.contains("phys_footprint_peak:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 {
phys_footprint_peak = parse_size_to_mb(parts[1], parts[2]);
}
}
}
match (phys_footprint, phys_footprint_peak) {
(Some(f), Some(p)) => Some((f, p)),
_ => None,
}
}
#[cfg(target_os = "linux")]
fn get_memory_usage_linux(pid: u32) -> Result<(f64, f64)> {
// Read /proc/[pid]/status for memory information
let status_path = format!("/proc/{}/status", pid);
let status_content = fs::read_to_string(status_path)?;
let mut vm_rss = None;
let mut vm_hwm = None;
for line in status_content.lines() {
if line.starts_with("VmRSS:") {
// Current RSS in kB
if let Some(value_str) = line.split_whitespace().nth(1)
&& let Ok(kb) = value_str.parse::<f64>()
{
vm_rss = Some(kb / 1024.0); // Convert kB to MB
}
} else if line.starts_with("VmHWM:") {
// Peak RSS (High Water Mark) in kB
if let Some(value_str) = line.split_whitespace().nth(1)
&& let Ok(kb) = value_str.parse::<f64>()
{
vm_hwm = Some(kb / 1024.0); // Convert kB to MB
}
}
}
match (vm_rss, vm_hwm) {
(Some(rss), Some(hwm)) => Ok((rss, hwm)),
_ => Err("Failed to parse memory info from /proc/[pid]/status".into()),
}
}
#[cfg(target_os = "macos")]
fn get_memory_usage_macos(pid: u32) -> Result<(f64, f64)> {
let output = Command::new("footprint")
.args(["-p", &pid.to_string()])
.output()?;
let stdout = String::from_utf8(output.stdout).unwrap();
parse_footprint_output(&stdout).ok_or_else(|| "Failed to parse footprint output".into())
}
fn get_memory_usage(pid: u32) -> Result<(f64, f64)> {
#[cfg(target_os = "macos")]
{
get_memory_usage_macos(pid)
}
#[cfg(target_os = "linux")]
{
get_memory_usage_linux(pid)
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
{
Err("Unsupported platform for memory monitoring".into())
impl Drop for Bencher {
fn drop(&mut self) {
let _ = self.stop();
}
}
@@ -215,41 +132,43 @@ fn monitor_resources(
bench_dir: &Path,
stop_flag: Arc<AtomicBool>,
) -> Result<()> {
let disk_file = bench_dir.join("disk_usage.csv");
let memory_file = bench_dir.join("memory_footprint.csv");
let disk_file = bench_dir.join("disk.csv");
let memory_file = bench_dir.join("memory.csv");
let mut disk_writer = fs::File::create(disk_file)?;
let mut memory_writer = fs::File::create(memory_file)?;
writeln!(disk_writer, "timestamp_ms,disk_usage_mb")?;
writeln!(disk_writer, "timestamp_ms,disk_usage")?;
writeln!(
memory_writer,
"timestamp_ms,phys_footprint_mb,phys_footprint_peak_mb"
"timestamp_ms,phys_footprint,phys_footprint_peak"
)?;
let pid = std::process::id();
let start = Instant::now();
while !stop_flag.load(Ordering::Relaxed) {
let mut disk_monitor = DiskMonitor::new();
let memory_monitor = MemoryMonitor::new(pid);
'l: loop {
let elapsed_ms = start.elapsed().as_millis();
// Get disk usage
if let Ok(output) = Command::new("du")
.args(["-sh", monitored_path.to_str().unwrap()])
.output()
&& let Ok(stdout) = String::from_utf8(output.stdout)
&& let Some(size_str) = stdout.split_whitespace().next()
&& let Some(size_mb) = parse_du_output(size_str)
{
writeln!(disk_writer, "{},{}", elapsed_ms, size_mb)?;
if let Ok(bytes) = disk_monitor.get_disk_usage(monitored_path) {
writeln!(disk_writer, "{},{}", elapsed_ms, bytes)?;
}
// Get memory footprint (cross-platform)
if let Ok((footprint, peak)) = get_memory_usage(pid) {
if let Ok((footprint, peak)) = memory_monitor.get_memory_usage() {
writeln!(memory_writer, "{},{},{}", elapsed_ms, footprint, peak)?;
}
thread::sleep(Duration::from_secs(5));
// Best version
for _ in 0..50 {
// 50 * 100ms = 5 seconds
if stop_flag.load(Ordering::Relaxed) {
break 'l;
}
thread::sleep(Duration::from_millis(100));
}
}
Ok(())

View File

@@ -0,0 +1,127 @@
use std::io;
#[cfg(target_os = "linux")]
use std::fs;
#[cfg(target_os = "macos")]
use std::process::Command;
/// Samples current and peak physical memory usage of a single process.
pub struct MemoryMonitor {
    // PID of the monitored process.
    pid: u32,
}

impl MemoryMonitor {
    /// Create a monitor for the process identified by `pid`.
    pub fn new(pid: u32) -> Self {
        Self { pid }
    }

    /// Get memory usage in bytes.
    /// Returns (current_bytes, peak_bytes).
    ///
    /// # Errors
    /// Fails when the platform source (`/proc` on Linux, the `footprint`
    /// tool on macOS) cannot be read or parsed, or on other platforms.
    pub fn get_memory_usage(&self) -> io::Result<(u64, u64)> {
        #[cfg(target_os = "linux")]
        {
            self.get_memory_usage_linux()
        }
        #[cfg(target_os = "macos")]
        {
            self.get_memory_usage_macos()
        }
        // Fallback for all other targets. Without this arm the function body
        // is empty there and the crate fails to compile on those platforms.
        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        {
            Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "Unsupported platform for memory monitoring",
            ))
        }
    }

    /// Parse VmRSS (current) and VmHWM (peak) from `/proc/<pid>/status`.
    #[cfg(target_os = "linux")]
    fn get_memory_usage_linux(&self) -> io::Result<(u64, u64)> {
        let status_content = fs::read_to_string(format!("/proc/{}/status", self.pid))?;
        let mut vm_rss = None;
        let mut vm_hwm = None;
        for line in status_content.lines() {
            if line.starts_with("VmRSS:") {
                if let Some(value_str) = line.split_whitespace().nth(1) {
                    if let Ok(kb) = value_str.parse::<u64>() {
                        vm_rss = Some(kb * 1024); // KiB to bytes
                    }
                }
            } else if line.starts_with("VmHWM:") {
                if let Some(value_str) = line.split_whitespace().nth(1) {
                    if let Ok(kb) = value_str.parse::<u64>() {
                        vm_hwm = Some(kb * 1024); // KiB to bytes
                    }
                }
            }
        }
        match (vm_rss, vm_hwm) {
            (Some(rss), Some(hwm)) => Ok((rss, hwm)),
            _ => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Failed to parse memory info from /proc/[pid]/status",
            )),
        }
    }

    /// Shell out to the macOS `footprint` tool and parse its output.
    #[cfg(target_os = "macos")]
    fn get_memory_usage_macos(&self) -> io::Result<(u64, u64)> {
        let output = Command::new("footprint")
            .args(["-p", &self.pid.to_string()])
            .output()?;
        let stdout = String::from_utf8(output.stdout).map_err(|_| {
            io::Error::new(io::ErrorKind::InvalidData, "Invalid UTF-8 from footprint")
        })?;
        parse_footprint_output(&stdout).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "Failed to parse footprint output",
            )
        })
    }
}
/// Extract `(phys_footprint, phys_footprint_peak)` in bytes from the text
/// output of the macOS `footprint` tool.
///
/// Returns `None` unless BOTH lines are present and parsable. The `!peak`
/// guard is unnecessary here because `starts_with("phys_footprint:")` already
/// cannot match the peak line after `trim`.
#[cfg(target_os = "macos")]
fn parse_footprint_output(output: &str) -> Option<(u64, u64)> {
    let mut phys_footprint = None;
    let mut phys_footprint_peak = None;
    for line in output.lines() {
        let line = line.trim();
        if line.starts_with("phys_footprint:") {
            // Format: "phys_footprint: 7072 KB"
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 3 {
                // parts[0] = "phys_footprint:"
                // parts[1] = "7072"
                // parts[2] = "KB"
                phys_footprint = parse_size_to_bytes(parts[1], parts[2]);
            }
        } else if line.starts_with("phys_footprint_peak:") {
            // Format: "phys_footprint_peak: 15 MB"
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 3 {
                phys_footprint_peak = parse_size_to_bytes(parts[1], parts[2]);
            }
        }
    }
    // Both values or nothing: a partial sample would skew the CSV columns.
    match (phys_footprint, phys_footprint_peak) {
        (Some(f), Some(p)) => Some((f, p)),
        _ => None,
    }
}
/// Convert a `footprint`-style "value unit" pair (e.g. `"15"`, `"MB"`) into
/// bytes, treating the units as binary (KiB/MiB/GiB). Unknown units yield
/// `None`.
#[cfg(target_os = "macos")]
fn parse_size_to_bytes(value: &str, unit: &str) -> Option<u64> {
    let amount: f64 = value.parse().ok()?;
    let bytes_per_unit: f64 = match unit.to_uppercase().as_str() {
        "KB" => 1024.0,
        "MB" => 1048576.0,
        "GB" => 1073741824.0,
        _ => return None,
    };
    Some((amount * bytes_per_unit) as u64)
}

View File

@@ -0,0 +1,56 @@
use parking_lot::Mutex;
use std::{
fs,
io::{self, BufWriter, Write},
path::Path,
time::Instant,
};
/// Records indexing progression (block numbers scraped from log messages)
/// to a CSV file with timestamps relative to monitor creation.
pub struct ProgressionMonitor {
    // Buffered CSV writer, locked because the logger hook may fire from
    // multiple threads.
    csv_file: Mutex<BufWriter<fs::File>>,
    // Origin for the `timestamp_ms` column.
    start_time: Instant,
}

impl ProgressionMonitor {
    /// Create the monitor and write the CSV header to `csv_path`.
    pub fn new(csv_path: &Path) -> io::Result<Self> {
        let mut writer = BufWriter::new(fs::File::create(csv_path)?);
        writeln!(writer, "timestamp_ms,block_number")?;
        Ok(Self {
            csv_file: Mutex::new(writer),
            start_time: Instant::now(),
        })
    }

    /// Fast inline check and record
    #[inline]
    pub fn check_and_record(&self, message: &str) {
        // Cheap substring pre-filter before any parsing work.
        if !message.contains("block ") {
            return;
        }
        // Only every 10th block is recorded to keep the CSV small.
        let block_num = match parse_block_number(message) {
            Some(n) if n % 10 == 0 => n,
            _ => return,
        };
        let elapsed_ms = self.start_time.elapsed().as_millis();
        // Write failures are deliberately ignored: benchmarking must never
        // break the logging path.
        let mut writer = self.csv_file.lock();
        let _ = writeln!(writer, "{},{}", elapsed_ms, block_num);
    }

    /// Flush buffered rows to disk.
    pub fn flush(&self) -> io::Result<()> {
        self.csv_file.lock().flush()
    }
}
/// Pull the integer immediately following the first `"block "` in `message`.
/// Allocation-free; returns `None` when the marker is absent or not followed
/// by at least one ASCII digit.
#[inline]
fn parse_block_number(message: &str) -> Option<u64> {
    let tail = &message[message.find("block ")? + 6..];
    // ASCII digits are one byte each, so the char count is also a byte count.
    let digit_count = tail.chars().take_while(|c| c.is_ascii_digit()).count();
    tail[..digit_count].parse().ok()
}

View File

@@ -6,6 +6,26 @@ use std::{
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
const FONT: &str = "monospace";
// Shared axis styling applied by every chart: disables the mesh grid, sets
// the axis descriptions/formatters, and draws labels in the common font and
// colors. A macro rather than a fn so that the trailing `draw()?` expands in
// the caller and propagates errors through the caller's `Result`.
macro_rules! configure_chart_mesh {
    ($chart:expr, $x_desc:expr, $y_desc:expr, $y_formatter:expr) => {
        $chart
            .configure_mesh()
            .disable_mesh()
            .x_desc($x_desc)
            .y_desc($y_desc)
            .x_label_formatter(&|x| format!("{:.0}", x))
            .y_label_formatter(&$y_formatter)
            .x_labels(12)
            .y_labels(10)
            .x_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7)))
            .y_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7)))
            .axis_style(TEXT_COLOR.mix(0.3))
            .draw()?
    };
}
#[derive(Debug, Clone)]
struct DataPoint {
timestamp_ms: u64,
@@ -30,6 +50,9 @@ const CHART_COLORS: [RGBColor; 6] = [
RGBColor(255, 159, 64), // Orange
];
// Time window buffer in milliseconds (5 seconds)
const TIME_BUFFER_MS: u64 = 5000;
pub struct Visualizer {
workspace_root: PathBuf,
}
@@ -50,7 +73,6 @@ impl Visualizer {
Ok(Self { workspace_root })
}
/// Generate all charts for all crates in the benches directory
pub fn generate_all_charts(&self) -> Result<()> {
let benches_dir = self.workspace_root.join("benches");
@@ -58,7 +80,6 @@ impl Visualizer {
return Err("Benches directory does not exist".into());
}
// Iterate through each crate directory
for entry in fs::read_dir(&benches_dir)? {
let entry = entry?;
let path = entry.path();
@@ -77,11 +98,10 @@ impl Visualizer {
Ok(())
}
/// Generate charts for a specific crate
fn generate_crate_charts(&self, crate_path: &Path, crate_name: &str) -> Result<()> {
// Read all benchmark runs for this crate
let disk_runs = self.read_benchmark_runs(crate_path, "disk_usage.csv")?;
let memory_runs = self.read_benchmark_runs(crate_path, "memory_footprint.csv")?;
let disk_runs = self.read_benchmark_runs(crate_path, "disk.csv")?;
let memory_runs = self.read_benchmark_runs(crate_path, "memory.csv")?;
let progress_runs = self.read_benchmark_runs(crate_path, "progress.csv")?;
if !disk_runs.is_empty() {
self.generate_disk_chart(crate_path, crate_name, &disk_runs)?;
@@ -91,10 +111,13 @@ impl Visualizer {
self.generate_memory_chart(crate_path, crate_name, &memory_runs)?;
}
if !progress_runs.is_empty() {
self.generate_progress_chart(crate_path, crate_name, &progress_runs)?;
}
Ok(())
}
/// Read all benchmark runs from subdirectories
fn read_benchmark_runs(&self, crate_path: &Path, filename: &str) -> Result<Vec<BenchmarkRun>> {
let mut runs = Vec::new();
@@ -109,10 +132,15 @@ impl Visualizer {
.ok_or("Invalid run ID")?
.to_string();
// Skip directories that start with underscore
if run_id.starts_with('_') {
continue;
}
let csv_path = run_path.join(filename);
if csv_path.exists()
&& let Ok(data) = self.read_csv(&csv_path)
&& let Ok(data) = Self::read_csv(&csv_path, 2)
{
runs.push(BenchmarkRun { run_id, data });
}
@@ -122,18 +150,17 @@ impl Visualizer {
Ok(runs)
}
/// Read a CSV file and parse data points
fn read_csv(&self, path: &Path) -> Result<Vec<DataPoint>> {
fn read_csv(path: &Path, expected_columns: usize) -> Result<Vec<DataPoint>> {
let content = fs::read_to_string(path)?;
let mut data = Vec::new();
for (i, line) in content.lines().enumerate() {
if i == 0 {
continue;
} // Skip header
}
let parts: Vec<&str> = line.split(',').collect();
if parts.len() >= 2
if parts.len() >= expected_columns
&& let (Ok(timestamp_ms), Ok(value)) =
(parts[0].parse::<u64>(), parts[1].parse::<f64>())
{
@@ -147,107 +174,325 @@ impl Visualizer {
Ok(data)
}
/// Generate disk usage chart
// Helper methods
/// Scale a raw byte count into the largest binary unit (GiB/MiB/KiB) that
/// keeps the scaled value >= 1, returning the value and its unit label.
fn format_bytes(bytes: f64) -> (f64, &'static str) {
    // Largest unit first so the first match wins.
    const UNITS: [(f64, &'static str); 3] = [
        (1024.0 * 1024.0 * 1024.0, "GiB"),
        (1024.0 * 1024.0, "MiB"),
        (1024.0, "KiB"),
    ];
    for (divisor, label) in UNITS {
        if bytes >= divisor {
            return (bytes / divisor, label);
        }
    }
    (bytes, "bytes")
}
/// Scale a duration in seconds into hours/minutes/seconds, returning the
/// scaled value, a short unit suffix, and the full axis label.
/// A larger unit is chosen only once the value reaches 2 of that unit, so
/// the axis avoids fractional sub-1 readings.
fn format_time(time_s: f64) -> (f64, &'static str, &'static str) {
    if time_s >= 2.0 * 3600.0 {
        (time_s / 3600.0, "h", "Time (h)")
    } else if time_s >= 2.0 * 60.0 {
        (time_s / 60.0, "min", "Time (min)")
    } else {
        (time_s, "s", "Time (s)")
    }
}
/// Format an axis tick value, abbreviating thousands as "Nk" with just
/// enough decimal places to stay readable (0 for whole or >=100, 1 for
/// >=10, otherwise 2).
fn format_axis_number(value: f64) -> String {
    if value < 1000.0 {
        return format!("{:.0}", value);
    }
    let thousands = value / 1000.0;
    let decimals: usize = if thousands.fract() == 0.0 || thousands >= 100.0 {
        0
    } else if thousands >= 10.0 {
        1
    } else {
        2
    };
    format!("{:.*}k", decimals, thousands)
}
/// Final timestamp (ms) of the SHORTEST run — the common time window all
/// runs cover. Falls back to 1000 when no run has any data.
fn calculate_min_max_time(runs: &[BenchmarkRun]) -> u64 {
    let mut shortest: Option<u64> = None;
    for run in runs {
        if let Some(last) = run.data.iter().map(|d| d.timestamp_ms).max() {
            shortest = Some(match shortest {
                Some(current) => current.min(last),
                None => last,
            });
        }
    }
    shortest.unwrap_or(1000)
}
/// Largest data value across every run; 0.0 when there is no data.
fn calculate_max_value(runs: &[BenchmarkRun]) -> f64 {
    let mut max = 0.0_f64;
    for run in runs {
        for point in &run.data {
            max = max.max(point.value);
        }
    }
    max
}
/// Clone each run, keeping only the data points at or before `max_time_ms`,
/// so all series share the same x-axis window.
fn trim_runs_to_time_window(runs: &[BenchmarkRun], max_time_ms: u64) -> Vec<BenchmarkRun> {
    let mut trimmed = Vec::with_capacity(runs.len());
    for run in runs {
        let data: Vec<_> = run
            .data
            .iter()
            .filter(|d| d.timestamp_ms <= max_time_ms)
            .cloned()
            .collect();
        trimmed.push(BenchmarkRun {
            run_id: run.run_id.clone(),
            data,
        });
    }
    trimmed
}
/// Draw one `(x, y)` line series on `chart` with a 1px stroke and register
/// it in the legend under `label` (legend sample is a short line segment in
/// the same color).
///
/// The iterator must be `Clone` because plotters may traverse it more than
/// once while rendering.
fn draw_line_series<'a, DB: DrawingBackend, I>(
    chart: &mut ChartContext<
        'a,
        DB,
        Cartesian2d<
            plotters::coord::types::RangedCoordf64,
            plotters::coord::types::RangedCoordf64,
        >,
    >,
    data: I,
    label: &str,
    color: RGBColor,
) -> Result<()>
where
    I: Iterator<Item = (f64, f64)> + Clone,
    DB::ErrorType: 'static,
{
    chart
        .draw_series(LineSeries::new(data, color.stroke_width(1)))?
        .label(label)
        // `move` captures `color` by value so the legend closure is 'static.
        .legend(move |(x, y)| {
            PathElement::new(vec![(x, y), (x + 20, y)], color.stroke_width(1))
        });
    Ok(())
}
/// Render the legend box (series labels) in the chart's upper-left corner
/// using the shared font and background colors. Call after all series have
/// been drawn and labeled.
fn configure_series_labels<'a, DB: DrawingBackend + 'a>(
    chart: &mut ChartContext<
        'a,
        DB,
        Cartesian2d<
            plotters::coord::types::RangedCoordf64,
            plotters::coord::types::RangedCoordf64,
        >,
    >,
) -> Result<()>
where
    DB::ErrorType: 'static,
{
    chart
        .configure_series_labels()
        .position(SeriesLabelPosition::UpperLeft)
        .label_font((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.9)))
        // Near-opaque background so the legend stays readable over series.
        .background_style(BG_COLOR.mix(0.98))
        .border_style(BG_COLOR)
        .margin(10)
        .draw()?;
    Ok(())
}
// Chart generation methods
fn generate_disk_chart(
&self,
crate_path: &Path,
crate_name: &str,
runs: &[BenchmarkRun],
) -> Result<()> {
let output_path = crate_path.join("disk_usage_chart.svg");
let output_path = crate_path.join("disk_chart.svg");
let root = SVGBackend::new(&output_path, (1200, 700)).into_drawing_area();
root.fill(&BG_COLOR)?;
let max_time = runs
.iter()
.flat_map(|r| r.data.iter().map(|d| d.timestamp_ms))
.max()
.unwrap_or(1000);
// Calculate time window based on shortest run + buffer
let min_max_time_ms = Self::calculate_min_max_time(runs) + TIME_BUFFER_MS;
let max_time_s = (min_max_time_ms as f64) / 1000.0;
let max_value = runs
.iter()
.flat_map(|r| r.data.iter().map(|d| d.value))
.fold(0.0_f64, f64::max);
// Trim all runs to the same time window
let trimmed_runs = Self::trim_runs_to_time_window(runs, min_max_time_ms);
// Convert to seconds and GB
let max_time_s = (max_time as f64) / 1000.0;
let max_value_gb = max_value / 1024.0;
let max_value = Self::calculate_max_value(&trimmed_runs);
let (max_value_scaled, unit) = Self::format_bytes(max_value);
let scale_factor = max_value / max_value_scaled;
// Format time based on duration
let (max_time_scaled, _time_unit, time_label) = Self::format_time(max_time_s);
let mut chart = ChartBuilder::on(&root)
.caption(
format!("{} — Disk Usage", crate_name),
("SF Mono", 24).into_font().color(&TEXT_COLOR),
(FONT, 24).into_font().color(&TEXT_COLOR),
)
.margin(20)
.x_label_area_size(55)
.y_label_area_size(75)
.build_cartesian_2d(0.0..max_time_s * 1.05, 0.0..max_value_gb * 1.1)?;
.x_label_area_size(50)
.margin_left(50)
.right_y_label_area_size(75)
.build_cartesian_2d(0.0..max_time_scaled * 1.025, 0.0..max_value_scaled * 1.1)?;
chart
.configure_mesh()
.disable_mesh()
.x_desc("Time (s)")
.y_desc("Disk Usage (GB)")
.x_label_offset(10)
.y_label_offset(10)
.x_label_formatter(&|x| format!("{:.1}", x))
.y_label_formatter(&|y| format!("{:.2}", y))
.x_labels(8)
.y_labels(6)
.x_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7)))
.y_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7)))
.axis_style(TEXT_COLOR.mix(0.3))
.draw()?;
configure_chart_mesh!(
chart,
time_label,
format!("Disk Usage ({})", unit),
|y: &f64| format!("{:.2}", y)
);
for (idx, run) in runs.iter().enumerate() {
for (idx, run) in trimmed_runs.iter().enumerate() {
let color = CHART_COLORS[idx % CHART_COLORS.len()];
chart
.draw_series(LineSeries::new(
run.data
.iter()
.map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)),
color.stroke_width(2),
))?
.label(&run.run_id)
.legend(move |(x, y)| {
PathElement::new(vec![(x, y), (x + 20, y)], color.stroke_width(2))
});
let time_divisor = max_time_s / max_time_scaled;
Self::draw_line_series(
&mut chart,
run.data.iter().map(|d| {
(
d.timestamp_ms as f64 / 1000.0 / time_divisor,
d.value / scale_factor,
)
}),
&run.run_id,
color,
)?;
}
chart
.configure_series_labels()
.position(SeriesLabelPosition::UpperLeft)
.label_font(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.9)))
.background_style(BG_COLOR.mix(0.98))
.border_style(BG_COLOR)
.margin(10)
.draw()?;
Self::configure_series_labels(&mut chart)?;
root.present()?;
println!("Generated: {}", output_path.display());
Ok(())
}
/// Generate memory footprint chart
fn generate_memory_chart(
&self,
crate_path: &Path,
crate_name: &str,
runs: &[BenchmarkRun],
) -> Result<()> {
let output_path = crate_path.join("memory_footprint_chart.svg");
let output_path = crate_path.join("memory_chart.svg");
let root = SVGBackend::new(&output_path, (1200, 700)).into_drawing_area();
root.fill(&BG_COLOR)?;
// Calculate time window based on shortest run + buffer
let min_max_time_ms = Self::calculate_min_max_time(runs) + TIME_BUFFER_MS;
let max_time_s = (min_max_time_ms as f64) / 1000.0;
// Read memory CSV files which have 3 columns: timestamp, footprint, peak
let enhanced_runs = self.read_memory_data(crate_path, runs)?;
// Trim enhanced runs to the same time window
let trimmed_enhanced_runs: Vec<_> = enhanced_runs
.into_iter()
.map(|(run_id, footprint, peak)| {
let trimmed_footprint: Vec<_> = footprint
.into_iter()
.filter(|d| d.timestamp_ms <= min_max_time_ms)
.collect();
let trimmed_peak: Vec<_> = peak
.into_iter()
.filter(|d| d.timestamp_ms <= min_max_time_ms)
.collect();
(run_id, trimmed_footprint, trimmed_peak)
})
.collect();
let max_value = trimmed_enhanced_runs
.iter()
.flat_map(|(_, f, p)| f.iter().chain(p.iter()).map(|d| d.value))
.fold(0.0_f64, f64::max);
let (max_value_scaled, unit) = Self::format_bytes(max_value);
let scale_factor = max_value / max_value_scaled;
// Format time based on duration
let (max_time_scaled, _time_unit, time_label) = Self::format_time(max_time_s);
let mut chart = ChartBuilder::on(&root)
.caption(
format!("{} — Memory", crate_name),
(FONT, 24).into_font().color(&TEXT_COLOR),
)
.margin(20)
.x_label_area_size(50)
.margin_left(50)
.right_y_label_area_size(75)
.build_cartesian_2d(0.0..max_time_scaled * 1.025, 0.0..max_value_scaled * 1.1)?;
configure_chart_mesh!(
chart,
time_label,
format!("Memory ({})", unit),
|y: &f64| format!("{:.2}", y)
);
let time_divisor = max_time_s / max_time_scaled;
for (idx, (run_id, footprint_data, peak_data)) in trimmed_enhanced_runs.iter().enumerate() {
let color = CHART_COLORS[idx % CHART_COLORS.len()];
Self::draw_line_series(
&mut chart,
footprint_data.iter().map(|d| {
(
d.timestamp_ms as f64 / 1000.0 / time_divisor,
d.value / scale_factor,
)
}),
&format!("{} (current)", run_id),
color,
)?;
// Draw peak line (dashed) - inline to handle time_divisor
let dashed_color = color.mix(0.5);
chart
.draw_series(
peak_data
.iter()
.map(|d| {
(
d.timestamp_ms as f64 / 1000.0 / time_divisor,
d.value / scale_factor,
)
})
.zip(peak_data.iter().skip(1).map(|d| {
(
d.timestamp_ms as f64 / 1000.0 / time_divisor,
d.value / scale_factor,
)
}))
.enumerate()
.filter(|(i, _)| i % 2 == 0)
.map(|(_, (p1, p2))| {
PathElement::new(vec![p1, p2], dashed_color.stroke_width(2))
}),
)?
.label(format!("{} (peak)", run_id))
.legend(move |(x, y)| {
PathElement::new(
vec![(x, y), (x + 10, y), (x + 20, y)],
dashed_color.stroke_width(2),
)
});
}
Self::configure_series_labels(&mut chart)?;
root.present()?;
println!("Generated: {}", output_path.display());
Ok(())
}
#[allow(clippy::type_complexity)]
fn read_memory_data(
&self,
crate_path: &Path,
runs: &[BenchmarkRun],
) -> Result<Vec<(String, Vec<DataPoint>, Vec<DataPoint>)>> {
let mut enhanced_runs = Vec::new();
for run in runs {
// Re-read the CSV to get both footprint and peak values
let csv_path = crate_path.join(&run.run_id).join("memory_footprint.csv");
let csv_path = crate_path.join(&run.run_id).join("memory.csv");
if let Ok(content) = fs::read_to_string(&csv_path) {
let mut footprint_data = Vec::new();
let mut peak_data = Vec::new();
@@ -255,7 +500,7 @@ impl Visualizer {
for (i, line) in content.lines().enumerate() {
if i == 0 {
continue;
} // Skip header
}
let parts: Vec<&str> = line.split(',').collect();
if parts.len() >= 3
@@ -280,100 +525,71 @@ impl Visualizer {
}
}
let max_time = enhanced_runs
.iter()
.flat_map(|(_, f, p)| f.iter().chain(p.iter()).map(|d| d.timestamp_ms))
.max()
.unwrap_or(1000);
Ok(enhanced_runs)
}
let max_value = enhanced_runs
.iter()
.flat_map(|(_, f, p)| f.iter().chain(p.iter()).map(|d| d.value))
.fold(0.0_f64, f64::max);
fn generate_progress_chart(
&self,
crate_path: &Path,
crate_name: &str,
runs: &[BenchmarkRun],
) -> Result<()> {
let output_path = crate_path.join("progress_chart.svg");
let root = SVGBackend::new(&output_path, (1200, 700)).into_drawing_area();
root.fill(&BG_COLOR)?;
// Convert to seconds and GB
let max_time_s = (max_time as f64) / 1000.0;
let max_value_gb = max_value / 1024.0;
// Calculate time window based on shortest run + buffer
let min_max_time_ms = Self::calculate_min_max_time(runs) + TIME_BUFFER_MS;
let max_time_s = (min_max_time_ms as f64) / 1000.0;
// Trim all runs to the same time window
let trimmed_runs = Self::trim_runs_to_time_window(runs, min_max_time_ms);
let max_block = Self::calculate_max_value(&trimmed_runs);
// Format time based on duration
let (max_time_scaled, _time_unit, time_label) = Self::format_time(max_time_s);
let mut chart = ChartBuilder::on(&root)
.caption(
format!("{}Memory Footprint", crate_name),
("SF Mono", 24).into_font().color(&TEXT_COLOR),
format!("{}Progress", crate_name),
(FONT, 24).into_font().color(&TEXT_COLOR),
)
.margin(20)
.x_label_area_size(55)
.y_label_area_size(75)
.build_cartesian_2d(0.0..max_time_s * 1.05, 0.0..max_value_gb * 1.1)?;
.x_label_area_size(50)
.margin_left(50)
.right_y_label_area_size(75)
.build_cartesian_2d(0.0..max_time_scaled * 1.025, 0.0..max_block * 1.1)?;
chart
.configure_mesh()
.disable_mesh()
.x_desc("Time (s)")
.y_desc("Memory (GB)")
.x_label_offset(10)
.y_label_offset(10)
.x_label_formatter(&|x| format!("{:.1}", x))
.y_label_formatter(&|y| format!("{:.2}", y))
.x_labels(8)
.y_labels(6)
.x_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7)))
.y_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7)))
.x_desc(time_label)
.y_desc("Block Number")
.x_label_formatter(&|x| Self::format_axis_number(*x))
.y_label_formatter(&|y| Self::format_axis_number(*y))
.x_labels(12)
.y_labels(10)
.x_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7)))
.y_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7)))
.axis_style(TEXT_COLOR.mix(0.3))
.draw()?;
for (idx, (run_id, footprint_data, peak_data)) in enhanced_runs.iter().enumerate() {
let time_divisor = max_time_s / max_time_scaled;
for (idx, run) in trimmed_runs.iter().enumerate() {
let color = CHART_COLORS[idx % CHART_COLORS.len()];
// Draw footprint line (solid)
chart
.draw_series(LineSeries::new(
footprint_data
.iter()
.map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)),
color.stroke_width(2),
))?
.label(format!("{} (current)", run_id))
.legend(move |(x, y)| {
PathElement::new(vec![(x, y), (x + 20, y)], color.stroke_width(2))
});
// Draw peak line (dashed, slightly transparent)
let dashed_color = color.mix(0.5);
chart
.draw_series(
peak_data
.iter()
.map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0))
.zip(
peak_data
.iter()
.skip(1)
.map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)),
)
.enumerate()
.filter(|(i, _)| i % 2 == 0) // Create dashed effect
.map(|(_, (p1, p2))| {
PathElement::new(vec![p1, p2], dashed_color.stroke_width(2))
}),
)?
.label(format!("{} (peak)", run_id))
.legend(move |(x, y)| {
PathElement::new(
vec![(x, y), (x + 10, y), (x + 20, y)],
dashed_color.stroke_width(2),
)
});
Self::draw_line_series(
&mut chart,
run.data
.iter()
.map(|d| (d.timestamp_ms as f64 / 1000.0 / time_divisor, d.value)),
&run.run_id,
color,
)?;
}
chart
.configure_series_labels()
.position(SeriesLabelPosition::UpperLeft)
.label_font(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.9)))
.background_style(BG_COLOR.mix(0.98))
.border_style(BG_COLOR)
.margin(10)
.draw()?;
Self::configure_series_labels(&mut chart)?;
root.present()?;
println!("Generated: {}", output_path.display());
Ok(())

View File

@@ -1,4 +1,4 @@
use std::{env, fs, path::Path, time::Instant};
use std::{env, fs, io::Write, path::Path, time::Instant};
use brk_bencher::Bencher;
use brk_error::Result;
@@ -6,10 +6,11 @@ use brk_indexer::Indexer;
use brk_iterator::Blocks;
use brk_reader::Reader;
use brk_rpc::{Auth, Client};
use log::debug;
use vecdb::Exit;
fn main() -> Result<()> {
brk_logger::init(Some(Path::new(".log")))?;
brk_logger::init(None)?;
let bitcoin_dir = Client::default_bitcoin_path();
// let bitcoin_dir = Path::new("/Volumes/WD_BLACK1/bitcoin");
@@ -31,21 +32,24 @@ fn main() -> Result<()> {
let mut indexer = Indexer::forced_import(&outputs_dir)?;
let exit = Exit::new();
exit.set_ctrlc_handler();
let mut bencher =
Bencher::from_cargo_env(env!("CARGO_PKG_NAME"), &outputs_dir.join("indexed/stores"))?;
bencher.start()?;
let exit = Exit::new();
exit.set_ctrlc_handler();
let bencher_clone = bencher.clone();
exit.register_cleanup(move || {
let _ = bencher_clone.stop();
debug!("Bench stopped.");
});
let i = Instant::now();
indexer.checked_index(&blocks, &client, &exit)?;
dbg!(i.elapsed());
// We want to benchmark the drop too
drop(indexer);
// Stop and finalize
bencher.stop()?;
Ok(())
}

View File

@@ -17,13 +17,13 @@ use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, TypedVecIterator};
mod indexes;
mod stores_v2;
// mod stores_v3;
// mod stores_v2;
mod stores_v3;
mod vecs;
pub use indexes::*;
pub use stores_v2::*;
// pub use stores_v3::*;
// pub use stores_v2::*;
pub use stores_v3::*;
pub use vecs::*;
// One version for all data sources

View File

@@ -119,14 +119,34 @@ impl Stores {
}
pub fn starting_height(&self) -> Height {
self.iter_any_store()
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.min()
.unwrap()
[
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.iter()
.map(|s| s as &dyn AnyStore),
)
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
@@ -158,38 +178,13 @@ impl Stores {
})
.collect::<Result<Vec<_>>>()?;
let batch = self.keyspace.inner().batch();
batch.commit_partitions(tuples)?;
self.keyspace.inner().batch().commit_partitions(tuples)?;
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
[
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.iter()
.map(|s| s as &dyn AnyStore),
)
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,

View File

@@ -8,7 +8,7 @@ use brk_types::{
OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version,
Vout,
};
use fjall3::{Database, PersistMode, WriteBatch};
use fjall3::{Database, PersistMode};
use rayon::prelude::*;
use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, VecIndex, VecIterator};
@@ -158,7 +158,7 @@ impl Stores {
}
pub fn commit(&mut self, height: Height) -> Result<()> {
let items = [
let tuples = [
&mut self.blockhashprefix_to_height as &mut dyn AnyStore,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
@@ -182,16 +182,11 @@ impl Stores {
.map(|store| {
let items = store.take_all_f3();
store.export_meta_if_needed(height)?;
Ok(items)
Ok((store.keyspace(), items))
})
.collect::<Result<Vec<_>>>()?;
let capacity = items.iter().map(|v| v.len()).sum();
let mut batch = WriteBatch::with_capacity(self.database.clone(), capacity);
items.into_iter().for_each(|items| {
batch.ingest(items);
});
batch.commit()?;
self.database.batch().commit_partitions(tuples)?;
self.database
.persist(PersistMode::SyncAll)
@@ -203,9 +198,6 @@ impl Stores {
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
// &self.addresshash_to_typeindex,
// &self.addresstype_to_addressindex_and_txindex,
// &self.addresstype_to_addressindex_and_unspentoutpoint,
]
.into_iter()
.chain(
@@ -233,11 +225,6 @@ impl Stores {
if self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()?
&& self.height_to_coinbase_tag.is_empty()?
// && self.addresshash_to_typeindex.is_empty()?
// && self.addresstype_to_addressindex_and_txindex.is_empty()?
// && self
// .addresstype_to_addressindex_and_unspentoutpoint
// .is_empty()?
&& self
.addresstype_to_addresshash_to_addressindex
.iter()

View File

@@ -3,7 +3,7 @@
use std::{
fmt::Display,
fs::{self, OpenOptions},
io::{self, Write},
io::{self, BufWriter, Write},
path::Path,
sync::OnceLock,
};
@@ -13,19 +13,23 @@ use jiff::{Timestamp, tz};
pub use owo_colors::OwoColorize;
use parking_lot::Mutex;
static LOG_FILE: OnceLock<Mutex<fs::File>> = OnceLock::new();
// Type alias for the hook function
type LogHook = Box<dyn Fn(&str) + Send + Sync>;
static LOG_HOOK: OnceLock<LogHook> = OnceLock::new();
static LOG_FILE: OnceLock<Mutex<BufWriter<fs::File>>> = OnceLock::new();
#[inline]
pub fn init(path: Option<&Path>) -> io::Result<()> {
if let Some(path) = path {
let _ = fs::remove_file(path);
let file = OpenOptions::new().create(true).append(true).open(path)?;
LOG_FILE.set(Mutex::new(file)).ok();
LOG_FILE.set(Mutex::new(BufWriter::new(file))).ok();
}
Builder::from_env(Env::default().default_filter_or(
"info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm-tree=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off",
// "debug,fjall=trace,bitcoin=off,bitcoincore-rpc=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off",
"debug,bitcoin=off,bitcoincore-rpc=off,fjall=off,brk_fjall=off,lsm-tree=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off",
// "debug,fjall=trace,bitcoin=off,bitcoincore-rpc=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off",
))
.format(move |buf, record| {
let date_time = Timestamp::now()
@@ -38,6 +42,10 @@ pub fn init(path: Option<&Path>) -> io::Result<()> {
let dash = "-";
let args = record.args();
if let Some(hook) = LOG_HOOK.get() {
hook(&args.to_string());
}
if let Some(file) = LOG_FILE.get() {
let _ = write(&mut *file.lock(), &date_time, target, &level, dash, args);
}
@@ -67,6 +75,17 @@ pub fn init(path: Option<&Path>) -> io::Result<()> {
Ok(())
}
/// Register a hook that gets called for every log message
/// Can only be called once
pub fn register_hook<F>(hook: F) -> Result<(), &'static str>
where
F: Fn(&str) + Send + Sync + 'static,
{
LOG_HOOK
.set(Box::new(hook))
.map_err(|_| "Hook already registered")
}
fn write(
mut buf: impl Write,
date_time: impl Display,

View File

@@ -1,7 +1,5 @@
use brk_error::Result;
use brk_types::{Height, Version};
use fjall2::{InnerItem, PartitionHandle};
use fjall3::Item;
pub trait AnyStore: Send + Sync {
fn name(&self) -> &'static str;
@@ -10,8 +8,9 @@ pub trait AnyStore: Send + Sync {
fn needs(&self, height: Height) -> bool;
fn version(&self) -> Version;
fn export_meta_if_needed(&mut self, height: Height) -> Result<()>;
fn partition(&self) -> &PartitionHandle;
fn take_all_f2(&mut self) -> Vec<InnerItem>;
fn take_all_f3(&mut self) -> Vec<Item>;
fn keyspace(&self) -> &fjall3::Keyspace;
fn partition(&self) -> &fjall2::PartitionHandle;
fn take_all_f2(&mut self) -> Vec<fjall2::InnerItem>;
fn take_all_f3(&mut self) -> Vec<fjall3::InnerItem>;
// fn take_all_f3(&mut self) -> Box<dyn Iterator<Item = Item>>;
}

View File

@@ -174,6 +174,10 @@ where
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn keyspace(&self) -> &fjall3::Keyspace {
panic!()
}
fn partition(&self) -> &fjall2::PartitionHandle {
self.partition.inner()
}
@@ -192,7 +196,7 @@ where
items.into_iter().map(InnerItem::from).collect()
}
fn take_all_f3(&mut self) -> Vec<fjall3::Item> {
fn take_all_f3(&mut self) -> Vec<fjall3::InnerItem> {
panic!()
}

View File

@@ -175,6 +175,10 @@ where
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn keyspace(&self) -> &fjall3::Keyspace {
&self.keyspace
}
fn take_all_f2(&mut self) -> Vec<fjall2::InnerItem> {
vec![]
}
@@ -183,7 +187,7 @@ where
panic!()
}
fn take_all_f3(&mut self) -> Vec<fjall3::Item> {
fn take_all_f3(&mut self) -> Vec<fjall3::InnerItem> {
let mut items = mem::take(&mut self.puts)
.into_iter()
.map(|(key, value)| Item::Value { key, value })
@@ -194,10 +198,7 @@ where
)
.collect::<Vec<_>>();
items.sort_unstable();
items
.into_iter()
.map(|v| v.fjalled(&self.keyspace))
.collect()
items.into_iter().map(|v| v.fjalled()).collect()
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
@@ -261,22 +262,18 @@ impl<K, V> Item<K, V> {
}
}
pub fn fjalled(self, keyspace: &Keyspace) -> fjall3::Item
pub fn fjalled(self) -> fjall3::InnerItem
where
K: Into<ByteView>,
V: Into<ByteView>,
{
let keyspace_id = keyspace.id;
// let keyspace_id = keyspace.inner().id;
match self {
Item::Value { key, value } => fjall3::Item {
keyspace_id,
Item::Value { key, value } => fjall3::InnerItem {
key: key.into().into(),
value: value.into().into(),
value_type: ValueType::Value,
},
Item::Tomb(key) => fjall3::Item {
keyspace_id,
Item::Tomb(key) => fjall3::InnerItem {
key: key.into().into(),
value: [].into(),
value_type: ValueType::Tombstone,