diff --git a/.gitignore b/.gitignore index 08c1e99aa..524853c18 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ flamegraph.svg # Expand expand.rs + +# Benchmarks +[0-9]/ diff --git a/Cargo.lock b/Cargo.lock index dc77b0c9a..7ce679ba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,7 +211,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -533,6 +533,8 @@ name = "brk_bencher" version = "0.0.111" dependencies = [ "brk_error", + "brk_logger", + "parking_lot", ] [[package]] @@ -814,7 +816,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1262,7 +1264,7 @@ version = "0.0.111" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1271,7 +1273,7 @@ version = "0.0.111" dependencies = [ "bitcoin", "brk_error", - "byteview 0.6.1", + "byteview 0.8.0", "derive_deref", "itoa", "jiff", @@ -1442,7 +1444,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1754,7 +1756,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1765,7 +1767,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1799,7 +1801,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1830,7 +1832,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "unicode-xid", ] @@ -1885,7 +1887,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1960,7 +1962,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2194,7 +2196,7 @@ checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2288,7 +2290,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2772,7 +2774,7 @@ checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3064,7 +3066,7 @@ checksum = "4568f25ccbd45ab5d5603dc34318c1ec56b117531781260002151b8530a9f931" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3189,7 +3191,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3272,9 +3274,9 @@ dependencies = [ [[package]] name = "oxc-browserslist" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7612127f5fa1ef0f3e98905ddfc10a7fa1df98944b75d774bb3cb4b530663616" +checksum = "f978be538ca5e2a64326d24b7991dc658cc8495132833ae387212ab3b8abd70a" dependencies = [ "bincode", "flate2", @@ -3309,7 +3311,7 @@ checksum = "003b4612827f6501183873fb0735da92157e3c7daa71c40921c7d2758fec2229" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3353,7 +3355,7 @@ dependencies = [ "phf", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3879,7 +3881,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3908,7 +3910,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4080,7 +4082,7 @@ checksum = "7347867d0a7e1208d93b46767be83e2b8f978c3dad35f775ac8d8847551d6fe1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4283,7 +4285,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4371,7 +4373,7 @@ checksum = "bd83f5f173ff41e00337d97f6572e416d022ef8a19f371817259ae960324c482" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4541,7 +4543,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4646,7 +4648,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4657,7 +4659,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4923,7 +4925,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -4948,9 +4950,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.109" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -4971,7 +4973,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5033,7 +5035,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5044,7 +5046,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5123,7 +5125,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5269,7 +5271,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5359,7 +5361,7 @@ checksum = "ee6ff59666c9cbaec3533964505d39154dc4e0a56151fdea30a09ed0301f62e2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "termcolor", ] @@ -5541,7 +5543,7 @@ name = "vecdb_derive" version = "0.3.16" dependencies = [ "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5628,7 +5630,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "wasm-bindgen-shared", ] @@ -5747,7 +5749,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5758,7 +5760,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6032,7 +6034,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure", ] @@ -6053,7 +6055,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6073,7 +6075,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure", ] @@ -6107,7 +6109,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ec5ca2ca5..7887533d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,8 +58,8 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" } brk_types = { version = "0.0.111", path = "crates/brk_types" } brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["derive"] } brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" } -byteview = "=0.6.1" -# byteview = "~0.8.0" +# byteview = "=0.6.1" +byteview = "~0.8.0" derive_deref = "1.1.1" # fjall2 = { version = "2.11.5", package = "brk_fjall" } fjall2 = { path = "../fjall2", package = "brk_fjall" } diff --git a/crates/brk_bencher/Cargo.toml b/crates/brk_bencher/Cargo.toml index 9515c2d57..87b3ec885 100644 --- a/crates/brk_bencher/Cargo.toml +++ b/crates/brk_bencher/Cargo.toml @@ -11,3 +11,5 @@ build = "build.rs" [dependencies] brk_error = { workspace = true } +brk_logger = { workspace = true } +parking_lot = { workspace = true } diff --git a/crates/brk_bencher/src/disk.rs b/crates/brk_bencher/src/disk.rs new file mode 100644 index 000000000..d930e6ba0 --- /dev/null +++ b/crates/brk_bencher/src/disk.rs @@ -0,0 +1,54 @@ +use std::collections::HashMap; +use std::fs; +use std::io; +use std::os::unix::fs::MetadataExt; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +pub struct DiskMonitor { + cache: HashMap, // path -> (bytes_used, mtime) +} + +impl DiskMonitor { + pub fn new() -> Self { + Self { + cache: HashMap::new(), + } + } + + /// Get disk usage in bytes (matches `du` and Finder) + pub fn get_disk_usage(&mut self, path: &Path) -> io::Result { + self.scan_recursive(path) + } + + fn scan_recursive(&mut self, path: &Path) -> io::Result { + let mut total = 0; + + for entry in fs::read_dir(path)? { + let entry = entry?; + let path = entry.path(); + let metadata = entry.metadata()?; + + if metadata.is_file() { + let mtime = metadata.modified()?; + + // Check cache: if mtime unchanged, use cached value + if let Some((cached_bytes, cached_mtime)) = self.cache.get(&path) + && *cached_mtime == mtime + { + total += cached_bytes; + continue; + } + + // File is new or modified - get actual disk usage + let bytes = metadata.blocks() * 512; + self.cache.insert(path, (bytes, mtime)); + total += bytes; + } else if metadata.is_dir() { + total += self.scan_recursive(&path)?; + } + } + + Ok(total) + } +} diff --git a/crates/brk_bencher/src/lib.rs b/crates/brk_bencher/src/lib.rs index 77ffd7344..beadace0e 100644 --- a/crates/brk_bencher/src/lib.rs +++ b/crates/brk_bencher/src/lib.rs @@ -1,8 +1,7 @@ use std::{ fs, - io::Write, + io::{self, Write}, path::{Path, PathBuf}, - process::Command, sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -13,11 +12,24 @@ use std::{ use brk_error::Result; -pub struct Bencher { +mod disk; +mod memory; +mod progression; + +use disk::*; +use memory::*; +use parking_lot::Mutex; +use progression::*; + +#[derive(Clone)] +pub struct Bencher(Arc); + +struct BencherInner { bench_dir: PathBuf, monitored_path: PathBuf, stop_flag: Arc, - monitor_thread: Option>>, + monitor_thread: Mutex>>>, + progression: Arc, } impl Bencher { @@ -33,12 +45,23 @@ impl Bencher { fs::create_dir_all(&bench_dir)?; - Ok(Self { + let progress_csv = bench_dir.join("progress.csv"); + let progression = Arc::new(ProgressionMonitor::new(&progress_csv)?); + let progression_clone = progression.clone(); + + // Register hook with logger + brk_logger::register_hook(move |message| { + progression_clone.check_and_record(message); + }) + .map_err(|e| io::Error::new(io::ErrorKind::AlreadyExists, e))?; + + Ok(Self(Arc::new(BencherInner { bench_dir, monitored_path: monitored_path.to_path_buf(), stop_flag: Arc::new(AtomicBool::new(false)), - monitor_thread: None, - }) + progression, + monitor_thread: Mutex::new(None), + }))) } /// Create a bencher using CARGO_MANIFEST_DIR to find workspace root @@ -69,144 +92,38 @@ impl Bencher { /// Start monitoring disk usage and memory footprint pub fn start(&mut self) -> Result<()> { - if self.monitor_thread.is_some() { + if self.0.monitor_thread.lock().is_some() { return Err("Bencher already started".into()); } - let stop_flag = self.stop_flag.clone(); - let bench_dir = self.bench_dir.clone(); - let monitored_path = self.monitored_path.clone(); + let stop_flag = self.0.stop_flag.clone(); + let bench_dir = self.0.bench_dir.clone(); + let monitored_path = self.0.monitored_path.clone(); let handle = thread::spawn(move || monitor_resources(&monitored_path, &bench_dir, stop_flag)); - self.monitor_thread = Some(handle); + *self.0.monitor_thread.lock() = Some(handle); Ok(()) } /// Stop monitoring and wait for the thread to finish - pub fn stop(mut self) -> Result<()> { - self.stop_flag.store(true, Ordering::Relaxed); + pub fn stop(&self) -> Result<()> { + self.0.stop_flag.store(true, Ordering::Relaxed); - if let Some(handle) = self.monitor_thread.take() { + if let Some(handle) = self.0.monitor_thread.lock().take() { handle.join().map_err(|_| "Monitor thread panicked")??; } + self.0.progression.flush()?; + Ok(()) } - - /// Get the benchmark output directory - pub fn bench_dir(&self) -> &Path { - &self.bench_dir - } } -fn parse_size_to_mb(value_str: &str, unit: &str) -> Option { - let value: f64 = value_str.parse().ok()?; - match unit { - "MB" | "M" => Some(value), - "GB" | "G" => Some(value * 1024.0), - "KB" | "K" => Some(value / 1024.0), - "B" => Some(value / 1024.0 / 1024.0), - _ => None, - } -} - -fn parse_du_output(size_str: &str) -> Option { - // Parse outputs like "524M", "287G", "4.0K" - let size_str = size_str.trim(); - - if let Some(unit_pos) = size_str.find(|c: char| c.is_alphabetic()) { - let (value_part, unit_part) = size_str.split_at(unit_pos); - parse_size_to_mb(value_part, unit_part) - } else { - // No unit means bytes - let value: f64 = size_str.parse().ok()?; - Some(value / 1024.0 / 1024.0) - } -} - -fn parse_footprint_output(output: &str) -> Option<(f64, f64)> { - let mut phys_footprint = None; - let mut phys_footprint_peak = None; - - for line in output.lines() { - if line.contains("phys_footprint:") && !line.contains("peak") { - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 3 { - phys_footprint = parse_size_to_mb(parts[1], parts[2]); - } - } else if line.contains("phys_footprint_peak:") { - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 3 { - phys_footprint_peak = parse_size_to_mb(parts[1], parts[2]); - } - } - } - - match (phys_footprint, phys_footprint_peak) { - (Some(f), Some(p)) => Some((f, p)), - _ => None, - } -} - -#[cfg(target_os = "linux")] -fn get_memory_usage_linux(pid: u32) -> Result<(f64, f64)> { - // Read /proc/[pid]/status for memory information - let status_path = format!("/proc/{}/status", pid); - let status_content = fs::read_to_string(status_path)?; - - let mut vm_rss = None; - let mut vm_hwm = None; - - for line in status_content.lines() { - if line.starts_with("VmRSS:") { - // Current RSS in kB - if let Some(value_str) = line.split_whitespace().nth(1) - && let Ok(kb) = value_str.parse::() - { - vm_rss = Some(kb / 1024.0); // Convert kB to MB - } - } else if line.starts_with("VmHWM:") { - // Peak RSS (High Water Mark) in kB - if let Some(value_str) = line.split_whitespace().nth(1) - && let Ok(kb) = value_str.parse::() - { - vm_hwm = Some(kb / 1024.0); // Convert kB to MB - } - } - } - - match (vm_rss, vm_hwm) { - (Some(rss), Some(hwm)) => Ok((rss, hwm)), - _ => Err("Failed to parse memory info from /proc/[pid]/status".into()), - } -} - -#[cfg(target_os = "macos")] -fn get_memory_usage_macos(pid: u32) -> Result<(f64, f64)> { - let output = Command::new("footprint") - .args(["-p", &pid.to_string()]) - .output()?; - - let stdout = String::from_utf8(output.stdout).unwrap(); - parse_footprint_output(&stdout).ok_or_else(|| "Failed to parse footprint output".into()) -} - -fn get_memory_usage(pid: u32) -> Result<(f64, f64)> { - #[cfg(target_os = "macos")] - { - get_memory_usage_macos(pid) - } - - #[cfg(target_os = "linux")] - { - get_memory_usage_linux(pid) - } - - #[cfg(not(any(target_os = "macos", target_os = "linux")))] - { - Err("Unsupported platform for memory monitoring".into()) +impl Drop for Bencher { + fn drop(&mut self) { + let _ = self.stop(); } } @@ -215,41 +132,43 @@ fn monitor_resources( bench_dir: &Path, stop_flag: Arc, ) -> Result<()> { - let disk_file = bench_dir.join("disk_usage.csv"); - let memory_file = bench_dir.join("memory_footprint.csv"); + let disk_file = bench_dir.join("disk.csv"); + let memory_file = bench_dir.join("memory.csv"); let mut disk_writer = fs::File::create(disk_file)?; let mut memory_writer = fs::File::create(memory_file)?; - writeln!(disk_writer, "timestamp_ms,disk_usage_mb")?; + writeln!(disk_writer, "timestamp_ms,disk_usage")?; writeln!( memory_writer, - "timestamp_ms,phys_footprint_mb,phys_footprint_peak_mb" + "timestamp_ms,phys_footprint,phys_footprint_peak" )?; let pid = std::process::id(); let start = Instant::now(); - while !stop_flag.load(Ordering::Relaxed) { + let mut disk_monitor = DiskMonitor::new(); + let memory_monitor = MemoryMonitor::new(pid); + + 'l: loop { let elapsed_ms = start.elapsed().as_millis(); - // Get disk usage - if let Ok(output) = Command::new("du") - .args(["-sh", monitored_path.to_str().unwrap()]) - .output() - && let Ok(stdout) = String::from_utf8(output.stdout) - && let Some(size_str) = stdout.split_whitespace().next() - && let Some(size_mb) = parse_du_output(size_str) - { - writeln!(disk_writer, "{},{}", elapsed_ms, size_mb)?; + if let Ok(bytes) = disk_monitor.get_disk_usage(monitored_path) { + writeln!(disk_writer, "{},{}", elapsed_ms, bytes)?; } - // Get memory footprint (cross-platform) - if let Ok((footprint, peak)) = get_memory_usage(pid) { + if let Ok((footprint, peak)) = memory_monitor.get_memory_usage() { writeln!(memory_writer, "{},{},{}", elapsed_ms, footprint, peak)?; } - thread::sleep(Duration::from_secs(5)); + // Best version + for _ in 0..50 { + // 50 * 100ms = 5 seconds + if stop_flag.load(Ordering::Relaxed) { + break 'l; + } + thread::sleep(Duration::from_millis(100)); + } } Ok(()) diff --git a/crates/brk_bencher/src/memory.rs b/crates/brk_bencher/src/memory.rs new file mode 100644 index 000000000..58489f051 --- /dev/null +++ b/crates/brk_bencher/src/memory.rs @@ -0,0 +1,127 @@ +use std::io; + +#[cfg(target_os = "linux")] +use std::fs; + +#[cfg(target_os = "macos")] +use std::process::Command; + +pub struct MemoryMonitor { + pid: u32, +} + +impl MemoryMonitor { + pub fn new(pid: u32) -> Self { + Self { pid } + } + + /// Get memory usage in bytes + /// Returns (current_bytes, peak_bytes) + pub fn get_memory_usage(&self) -> io::Result<(u64, u64)> { + #[cfg(target_os = "linux")] + { + self.get_memory_usage_linux() + } + + #[cfg(target_os = "macos")] + { + self.get_memory_usage_macos() + } + } + + #[cfg(target_os = "linux")] + fn get_memory_usage_linux(&self) -> io::Result<(u64, u64)> { + let status_content = fs::read_to_string(format!("/proc/{}/status", self.pid))?; + + let mut vm_rss = None; + let mut vm_hwm = None; + + for line in status_content.lines() { + if line.starts_with("VmRSS:") { + if let Some(value_str) = line.split_whitespace().nth(1) { + if let Ok(kb) = value_str.parse::() { + vm_rss = Some(kb * 1024); // KiB to bytes + } + } + } else if line.starts_with("VmHWM:") { + if let Some(value_str) = line.split_whitespace().nth(1) { + if let Ok(kb) = value_str.parse::() { + vm_hwm = Some(kb * 1024); // KiB to bytes + } + } + } + } + + match (vm_rss, vm_hwm) { + (Some(rss), Some(hwm)) => Ok((rss, hwm)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidData, + "Failed to parse memory info from /proc/[pid]/status", + )), + } + } + + #[cfg(target_os = "macos")] + fn get_memory_usage_macos(&self) -> io::Result<(u64, u64)> { + let output = Command::new("footprint") + .args(["-p", &self.pid.to_string()]) + .output()?; + + let stdout = String::from_utf8(output.stdout).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid UTF-8 from footprint") + })?; + + parse_footprint_output(&stdout).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "Failed to parse footprint output", + ) + }) + } +} + +#[cfg(target_os = "macos")] +fn parse_footprint_output(output: &str) -> Option<(u64, u64)> { + let mut phys_footprint = None; + let mut phys_footprint_peak = None; + + for line in output.lines() { + let line = line.trim(); + + if line.starts_with("phys_footprint:") { + // Format: "phys_footprint: 7072 KB" + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 3 { + // parts[0] = "phys_footprint:" + // parts[1] = "7072" + // parts[2] = "KB" + phys_footprint = parse_size_to_bytes(parts[1], parts[2]); + } + } else if line.starts_with("phys_footprint_peak:") { + // Format: "phys_footprint_peak: 15 MB" + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 3 { + phys_footprint_peak = parse_size_to_bytes(parts[1], parts[2]); + } + } + } + + match (phys_footprint, phys_footprint_peak) { + (Some(f), Some(p)) => Some((f, p)), + _ => None, + } +} + +#[cfg(target_os = "macos")] +fn parse_size_to_bytes(value: &str, unit: &str) -> Option { + let value: f64 = value.parse().ok()?; + + let multiplier = match unit.to_uppercase().as_str() { + "KB" => 1024.0, // KiB to bytes + "MB" => 1024.0 * 1024.0, // MiB to bytes + "GB" => 1024.0 * 1024.0 * 1024.0, // GiB to bytes + _ => return None, + }; + + Some((value * multiplier) as u64) +} diff --git a/crates/brk_bencher/src/progression.rs b/crates/brk_bencher/src/progression.rs new file mode 100644 index 000000000..9643b548b --- /dev/null +++ b/crates/brk_bencher/src/progression.rs @@ -0,0 +1,56 @@ +use parking_lot::Mutex; +use std::{ + fs, + io::{self, BufWriter, Write}, + path::Path, + time::Instant, +}; + +pub struct ProgressionMonitor { + csv_file: Mutex>, + start_time: Instant, +} + +impl ProgressionMonitor { + pub fn new(csv_path: &Path) -> io::Result { + let mut csv_file = BufWriter::new(fs::File::create(csv_path)?); + writeln!(csv_file, "timestamp_ms,block_number")?; + + Ok(Self { + csv_file: Mutex::new(csv_file), + start_time: Instant::now(), + }) + } + + /// Fast inline check and record + #[inline] + pub fn check_and_record(&self, message: &str) { + if !message.contains("block ") { + return; + } + + if let Some(block_num) = parse_block_number(message) + && block_num % 10 == 0 + { + let elapsed_ms = self.start_time.elapsed().as_millis(); + let mut writer = self.csv_file.lock(); + let _ = writeln!(writer, "{},{}", elapsed_ms, block_num); + } + } + + pub fn flush(&self) -> io::Result<()> { + self.csv_file.lock().flush() + } +} + +#[inline] +fn parse_block_number(message: &str) -> Option { + let start = message.find("block ")?; + let after_block = &message[start + 6..]; + + let end = after_block + .find(|c: char| !c.is_ascii_digit()) + .unwrap_or(after_block.len()); + + after_block[..end].parse::().ok() +} diff --git a/crates/brk_bencher_visualizer/src/lib.rs b/crates/brk_bencher_visualizer/src/lib.rs index 4efb89a9a..8673956f2 100644 --- a/crates/brk_bencher_visualizer/src/lib.rs +++ b/crates/brk_bencher_visualizer/src/lib.rs @@ -6,6 +6,26 @@ use std::{ type Result = std::result::Result>; +const FONT: &str = "monospace"; + +macro_rules! configure_chart_mesh { + ($chart:expr, $x_desc:expr, $y_desc:expr, $y_formatter:expr) => { + $chart + .configure_mesh() + .disable_mesh() + .x_desc($x_desc) + .y_desc($y_desc) + .x_label_formatter(&|x| format!("{:.0}", x)) + .y_label_formatter(&$y_formatter) + .x_labels(12) + .y_labels(10) + .x_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7))) + .y_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7))) + .axis_style(TEXT_COLOR.mix(0.3)) + .draw()? + }; +} + #[derive(Debug, Clone)] struct DataPoint { timestamp_ms: u64, @@ -30,6 +50,9 @@ const CHART_COLORS: [RGBColor; 6] = [ RGBColor(255, 159, 64), // Orange ]; +// Time window buffer in milliseconds (5 seconds) +const TIME_BUFFER_MS: u64 = 5000; + pub struct Visualizer { workspace_root: PathBuf, } @@ -50,7 +73,6 @@ impl Visualizer { Ok(Self { workspace_root }) } - /// Generate all charts for all crates in the benches directory pub fn generate_all_charts(&self) -> Result<()> { let benches_dir = self.workspace_root.join("benches"); @@ -58,7 +80,6 @@ impl Visualizer { return Err("Benches directory does not exist".into()); } - // Iterate through each crate directory for entry in fs::read_dir(&benches_dir)? { let entry = entry?; let path = entry.path(); @@ -77,11 +98,10 @@ impl Visualizer { Ok(()) } - /// Generate charts for a specific crate fn generate_crate_charts(&self, crate_path: &Path, crate_name: &str) -> Result<()> { - // Read all benchmark runs for this crate - let disk_runs = self.read_benchmark_runs(crate_path, "disk_usage.csv")?; - let memory_runs = self.read_benchmark_runs(crate_path, "memory_footprint.csv")?; + let disk_runs = self.read_benchmark_runs(crate_path, "disk.csv")?; + let memory_runs = self.read_benchmark_runs(crate_path, "memory.csv")?; + let progress_runs = self.read_benchmark_runs(crate_path, "progress.csv")?; if !disk_runs.is_empty() { self.generate_disk_chart(crate_path, crate_name, &disk_runs)?; @@ -91,10 +111,13 @@ impl Visualizer { self.generate_memory_chart(crate_path, crate_name, &memory_runs)?; } + if !progress_runs.is_empty() { + self.generate_progress_chart(crate_path, crate_name, &progress_runs)?; + } + Ok(()) } - /// Read all benchmark runs from subdirectories fn read_benchmark_runs(&self, crate_path: &Path, filename: &str) -> Result> { let mut runs = Vec::new(); @@ -109,10 +132,15 @@ impl Visualizer { .ok_or("Invalid run ID")? .to_string(); + // Skip directories that start with underscore + if run_id.starts_with('_') { + continue; + } + let csv_path = run_path.join(filename); if csv_path.exists() - && let Ok(data) = self.read_csv(&csv_path) + && let Ok(data) = Self::read_csv(&csv_path, 2) { runs.push(BenchmarkRun { run_id, data }); } @@ -122,18 +150,17 @@ impl Visualizer { Ok(runs) } - /// Read a CSV file and parse data points - fn read_csv(&self, path: &Path) -> Result> { + fn read_csv(path: &Path, expected_columns: usize) -> Result> { let content = fs::read_to_string(path)?; let mut data = Vec::new(); for (i, line) in content.lines().enumerate() { if i == 0 { continue; - } // Skip header + } let parts: Vec<&str> = line.split(',').collect(); - if parts.len() >= 2 + if parts.len() >= expected_columns && let (Ok(timestamp_ms), Ok(value)) = (parts[0].parse::(), parts[1].parse::()) { @@ -147,107 +174,325 @@ impl Visualizer { Ok(data) } - /// Generate disk usage chart + // Helper methods + + fn format_bytes(bytes: f64) -> (f64, &'static str) { + const KIB: f64 = 1024.0; + const MIB: f64 = 1024.0 * 1024.0; + const GIB: f64 = 1024.0 * 1024.0 * 1024.0; + + if bytes >= GIB { + (bytes / GIB, "GiB") + } else if bytes >= MIB { + (bytes / MIB, "MiB") + } else if bytes >= KIB { + (bytes / KIB, "KiB") + } else { + (bytes, "bytes") + } + } + + fn format_time(time_s: f64) -> (f64, &'static str, &'static str) { + const MINUTE: f64 = 60.0; + const HOUR: f64 = 3600.0; + + // Only use larger units if the value would be >= 2 (to avoid decimals) + if time_s >= HOUR * 2.0 { + (time_s / HOUR, "h", "Time (h)") + } else if time_s >= MINUTE * 2.0 { + (time_s / MINUTE, "min", "Time (min)") + } else { + (time_s, "s", "Time (s)") + } + } + + fn format_axis_number(value: f64) -> String { + if value >= 1000.0 { + let k_value = value / 1000.0; + // Show decimals only if needed + if k_value.fract() == 0.0 || k_value >= 100.0 { + format!("{:.0}k", k_value) + } else if k_value >= 10.0 { + format!("{:.1}k", k_value) + } else { + format!("{:.2}k", k_value) + } + } else { + format!("{:.0}", value) + } + } + + fn calculate_min_max_time(runs: &[BenchmarkRun]) -> u64 { + runs.iter() + .filter_map(|r| r.data.iter().map(|d| d.timestamp_ms).max()) + .min() + .unwrap_or(1000) + } + + fn calculate_max_value(runs: &[BenchmarkRun]) -> f64 { + runs.iter() + .flat_map(|r| r.data.iter().map(|d| d.value)) + .fold(0.0_f64, f64::max) + } + + fn trim_runs_to_time_window(runs: &[BenchmarkRun], max_time_ms: u64) -> Vec { + runs.iter() + .map(|run| BenchmarkRun { + run_id: run.run_id.clone(), + data: run + .data + .iter() + .filter(|d| d.timestamp_ms <= max_time_ms) + .cloned() + .collect(), + }) + .collect() + } + + fn draw_line_series<'a, DB: DrawingBackend, I>( + chart: &mut ChartContext< + 'a, + DB, + Cartesian2d< + plotters::coord::types::RangedCoordf64, + plotters::coord::types::RangedCoordf64, + >, + >, + data: I, + label: &str, + color: RGBColor, + ) -> Result<()> + where + I: Iterator + Clone, + DB::ErrorType: 'static, + { + chart + .draw_series(LineSeries::new(data, color.stroke_width(1)))? + .label(label) + .legend(move |(x, y)| { + PathElement::new(vec![(x, y), (x + 20, y)], color.stroke_width(1)) + }); + Ok(()) + } + + fn configure_series_labels<'a, DB: DrawingBackend + 'a>( + chart: &mut ChartContext< + 'a, + DB, + Cartesian2d< + plotters::coord::types::RangedCoordf64, + plotters::coord::types::RangedCoordf64, + >, + >, + ) -> Result<()> + where + DB::ErrorType: 'static, + { + chart + .configure_series_labels() + .position(SeriesLabelPosition::UpperLeft) + .label_font((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.9))) + .background_style(BG_COLOR.mix(0.98)) + .border_style(BG_COLOR) + .margin(10) + .draw()?; + Ok(()) + } + + // Chart generation methods + fn generate_disk_chart( &self, crate_path: &Path, crate_name: &str, runs: &[BenchmarkRun], ) -> Result<()> { - let output_path = crate_path.join("disk_usage_chart.svg"); - + let output_path = crate_path.join("disk_chart.svg"); let root = SVGBackend::new(&output_path, (1200, 700)).into_drawing_area(); root.fill(&BG_COLOR)?; - let max_time = runs - .iter() - .flat_map(|r| r.data.iter().map(|d| d.timestamp_ms)) - .max() - .unwrap_or(1000); + // Calculate time window based on shortest run + buffer + let min_max_time_ms = Self::calculate_min_max_time(runs) + TIME_BUFFER_MS; + let max_time_s = (min_max_time_ms as f64) / 1000.0; - let max_value = runs - .iter() - .flat_map(|r| r.data.iter().map(|d| d.value)) - .fold(0.0_f64, f64::max); + // Trim all runs to the same time window + let trimmed_runs = Self::trim_runs_to_time_window(runs, min_max_time_ms); - // Convert to seconds and GB - let max_time_s = (max_time as f64) / 1000.0; - let max_value_gb = max_value / 1024.0; + let max_value = Self::calculate_max_value(&trimmed_runs); + let (max_value_scaled, unit) = Self::format_bytes(max_value); + let scale_factor = max_value / max_value_scaled; + + // Format time based on duration + let (max_time_scaled, _time_unit, time_label) = Self::format_time(max_time_s); let mut chart = ChartBuilder::on(&root) .caption( format!("{} — Disk Usage", crate_name), - ("SF Mono", 24).into_font().color(&TEXT_COLOR), + (FONT, 24).into_font().color(&TEXT_COLOR), ) .margin(20) - .x_label_area_size(55) - .y_label_area_size(75) - .build_cartesian_2d(0.0..max_time_s * 1.05, 0.0..max_value_gb * 1.1)?; + .x_label_area_size(50) + .margin_left(50) + .right_y_label_area_size(75) + .build_cartesian_2d(0.0..max_time_scaled * 1.025, 0.0..max_value_scaled * 1.1)?; - chart - .configure_mesh() - .disable_mesh() - .x_desc("Time (s)") - .y_desc("Disk Usage (GB)") - .x_label_offset(10) - .y_label_offset(10) - .x_label_formatter(&|x| format!("{:.1}", x)) - .y_label_formatter(&|y| format!("{:.2}", y)) - .x_labels(8) - .y_labels(6) - .x_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7))) - .y_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7))) - .axis_style(TEXT_COLOR.mix(0.3)) - .draw()?; + configure_chart_mesh!( + chart, + time_label, + format!("Disk Usage ({})", unit), + |y: &f64| format!("{:.2}", y) + ); - for (idx, run) in runs.iter().enumerate() { + for (idx, run) in trimmed_runs.iter().enumerate() { let color = CHART_COLORS[idx % CHART_COLORS.len()]; - - chart - .draw_series(LineSeries::new( - run.data - .iter() - .map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)), - color.stroke_width(2), - ))? - .label(&run.run_id) - .legend(move |(x, y)| { - PathElement::new(vec![(x, y), (x + 20, y)], color.stroke_width(2)) - }); + let time_divisor = max_time_s / max_time_scaled; + Self::draw_line_series( + &mut chart, + run.data.iter().map(|d| { + ( + d.timestamp_ms as f64 / 1000.0 / time_divisor, + d.value / scale_factor, + ) + }), + &run.run_id, + color, + )?; } - chart - .configure_series_labels() - .position(SeriesLabelPosition::UpperLeft) - .label_font(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.9))) - .background_style(BG_COLOR.mix(0.98)) - .border_style(BG_COLOR) - .margin(10) - .draw()?; - + Self::configure_series_labels(&mut chart)?; root.present()?; println!("Generated: {}", output_path.display()); Ok(()) } - /// Generate memory footprint chart fn generate_memory_chart( &self, crate_path: &Path, crate_name: &str, runs: &[BenchmarkRun], ) -> Result<()> { - let output_path = crate_path.join("memory_footprint_chart.svg"); - + let output_path = crate_path.join("memory_chart.svg"); let root = SVGBackend::new(&output_path, (1200, 700)).into_drawing_area(); root.fill(&BG_COLOR)?; + // Calculate time window based on shortest run + buffer + let min_max_time_ms = Self::calculate_min_max_time(runs) + TIME_BUFFER_MS; + let max_time_s = (min_max_time_ms as f64) / 1000.0; + // Read memory CSV files which have 3 columns: timestamp, footprint, peak + let enhanced_runs = self.read_memory_data(crate_path, runs)?; + + // Trim enhanced runs to the same time window + let trimmed_enhanced_runs: Vec<_> = enhanced_runs + .into_iter() + .map(|(run_id, footprint, peak)| { + let trimmed_footprint: Vec<_> = footprint + .into_iter() + .filter(|d| d.timestamp_ms <= min_max_time_ms) + .collect(); + let trimmed_peak: Vec<_> = peak + .into_iter() + .filter(|d| d.timestamp_ms <= min_max_time_ms) + .collect(); + (run_id, trimmed_footprint, trimmed_peak) + }) + .collect(); + + let max_value = trimmed_enhanced_runs + .iter() + .flat_map(|(_, f, p)| f.iter().chain(p.iter()).map(|d| d.value)) + .fold(0.0_f64, f64::max); + + let (max_value_scaled, unit) = Self::format_bytes(max_value); + let scale_factor = max_value / max_value_scaled; + + // Format time based on duration + let (max_time_scaled, _time_unit, time_label) = Self::format_time(max_time_s); + + let mut chart = ChartBuilder::on(&root) + .caption( + format!("{} — Memory", crate_name), + (FONT, 24).into_font().color(&TEXT_COLOR), + ) + .margin(20) + .x_label_area_size(50) + .margin_left(50) + .right_y_label_area_size(75) + .build_cartesian_2d(0.0..max_time_scaled * 1.025, 0.0..max_value_scaled * 1.1)?; + + configure_chart_mesh!( + chart, + time_label, + format!("Memory ({})", unit), + |y: &f64| format!("{:.2}", y) + ); + + let time_divisor = max_time_s / max_time_scaled; + + for (idx, (run_id, footprint_data, peak_data)) in trimmed_enhanced_runs.iter().enumerate() { + let color = CHART_COLORS[idx % CHART_COLORS.len()]; + + Self::draw_line_series( + &mut chart, + footprint_data.iter().map(|d| { + ( + d.timestamp_ms as f64 / 1000.0 / time_divisor, + d.value / scale_factor, + ) + }), + &format!("{} (current)", run_id), + color, + )?; + + // Draw peak line (dashed) - inline to handle time_divisor + let dashed_color = color.mix(0.5); + chart + .draw_series( + peak_data + .iter() + .map(|d| { + ( + d.timestamp_ms as f64 / 1000.0 / time_divisor, + d.value / scale_factor, + ) + }) + .zip(peak_data.iter().skip(1).map(|d| { + ( + d.timestamp_ms as f64 / 1000.0 / time_divisor, + d.value / scale_factor, + ) + })) + .enumerate() + .filter(|(i, _)| i % 2 == 0) + .map(|(_, (p1, p2))| { + PathElement::new(vec![p1, p2], dashed_color.stroke_width(2)) + }), + )? + .label(format!("{} (peak)", run_id)) + .legend(move |(x, y)| { + PathElement::new( + vec![(x, y), (x + 10, y), (x + 20, y)], + dashed_color.stroke_width(2), + ) + }); + } + + Self::configure_series_labels(&mut chart)?; + root.present()?; + println!("Generated: {}", output_path.display()); + Ok(()) + } + + #[allow(clippy::type_complexity)] + fn read_memory_data( + &self, + crate_path: &Path, + runs: &[BenchmarkRun], + ) -> Result, Vec)>> { let mut enhanced_runs = Vec::new(); for run in runs { - // Re-read the CSV to get both footprint and peak values - let csv_path = crate_path.join(&run.run_id).join("memory_footprint.csv"); + let csv_path = crate_path.join(&run.run_id).join("memory.csv"); if let Ok(content) = fs::read_to_string(&csv_path) { let mut footprint_data = Vec::new(); let mut peak_data = Vec::new(); @@ -255,7 +500,7 @@ impl Visualizer { for (i, line) in content.lines().enumerate() { if i == 0 { continue; - } // Skip header + } let parts: Vec<&str> = line.split(',').collect(); if parts.len() >= 3 @@ -280,100 +525,71 @@ impl Visualizer { } } - let max_time = enhanced_runs - .iter() - .flat_map(|(_, f, p)| f.iter().chain(p.iter()).map(|d| d.timestamp_ms)) - .max() - .unwrap_or(1000); + Ok(enhanced_runs) + } - let max_value = enhanced_runs - .iter() - .flat_map(|(_, f, p)| f.iter().chain(p.iter()).map(|d| d.value)) - .fold(0.0_f64, f64::max); + fn generate_progress_chart( + &self, + crate_path: &Path, + crate_name: &str, + runs: &[BenchmarkRun], + ) -> Result<()> { + let output_path = crate_path.join("progress_chart.svg"); + let root = SVGBackend::new(&output_path, (1200, 700)).into_drawing_area(); + root.fill(&BG_COLOR)?; - // Convert to seconds and GB - let max_time_s = (max_time as f64) / 1000.0; - let max_value_gb = max_value / 1024.0; + // Calculate time window based on shortest run + buffer + let min_max_time_ms = Self::calculate_min_max_time(runs) + TIME_BUFFER_MS; + let max_time_s = (min_max_time_ms as f64) / 1000.0; + + // Trim all runs to the same time window + let trimmed_runs = Self::trim_runs_to_time_window(runs, min_max_time_ms); + + let max_block = Self::calculate_max_value(&trimmed_runs); + + // Format time based on duration + let (max_time_scaled, _time_unit, time_label) = Self::format_time(max_time_s); let mut chart = ChartBuilder::on(&root) .caption( - format!("{} — Memory Footprint", crate_name), - ("SF Mono", 24).into_font().color(&TEXT_COLOR), + format!("{} — Progress", crate_name), + (FONT, 24).into_font().color(&TEXT_COLOR), ) .margin(20) - .x_label_area_size(55) - .y_label_area_size(75) - .build_cartesian_2d(0.0..max_time_s * 1.05, 0.0..max_value_gb * 1.1)?; + .x_label_area_size(50) + .margin_left(50) + .right_y_label_area_size(75) + .build_cartesian_2d(0.0..max_time_scaled * 1.025, 0.0..max_block * 1.1)?; chart .configure_mesh() .disable_mesh() - .x_desc("Time (s)") - .y_desc("Memory (GB)") - .x_label_offset(10) - .y_label_offset(10) - .x_label_formatter(&|x| format!("{:.1}", x)) - .y_label_formatter(&|y| format!("{:.2}", y)) - .x_labels(8) - .y_labels(6) - .x_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7))) - .y_label_style(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.7))) + .x_desc(time_label) + .y_desc("Block Number") + .x_label_formatter(&|x| Self::format_axis_number(*x)) + .y_label_formatter(&|y| Self::format_axis_number(*y)) + .x_labels(12) + .y_labels(10) + .x_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7))) + .y_label_style((FONT, 16).into_font().color(&TEXT_COLOR.mix(0.7))) .axis_style(TEXT_COLOR.mix(0.3)) .draw()?; - for (idx, (run_id, footprint_data, peak_data)) in enhanced_runs.iter().enumerate() { + let time_divisor = max_time_s / max_time_scaled; + + for (idx, run) in trimmed_runs.iter().enumerate() { let color = CHART_COLORS[idx % CHART_COLORS.len()]; - - // Draw footprint line (solid) - chart - .draw_series(LineSeries::new( - footprint_data - .iter() - .map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)), - color.stroke_width(2), - ))? - .label(format!("{} (current)", run_id)) - .legend(move |(x, y)| { - PathElement::new(vec![(x, y), (x + 20, y)], color.stroke_width(2)) - }); - - // Draw peak line (dashed, slightly transparent) - let dashed_color = color.mix(0.5); - chart - .draw_series( - peak_data - .iter() - .map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)) - .zip( - peak_data - .iter() - .skip(1) - .map(|d| (d.timestamp_ms as f64 / 1000.0, d.value / 1024.0)), - ) - .enumerate() - .filter(|(i, _)| i % 2 == 0) // Create dashed effect - .map(|(_, (p1, p2))| { - PathElement::new(vec![p1, p2], dashed_color.stroke_width(2)) - }), - )? - .label(format!("{} (peak)", run_id)) - .legend(move |(x, y)| { - PathElement::new( - vec![(x, y), (x + 10, y), (x + 20, y)], - dashed_color.stroke_width(2), - ) - }); + Self::draw_line_series( + &mut chart, + run.data + .iter() + .map(|d| (d.timestamp_ms as f64 / 1000.0 / time_divisor, d.value)), + &run.run_id, + color, + )?; } - chart - .configure_series_labels() - .position(SeriesLabelPosition::UpperLeft) - .label_font(("SF Mono", 16).into_font().color(&TEXT_COLOR.mix(0.9))) - .background_style(BG_COLOR.mix(0.98)) - .border_style(BG_COLOR) - .margin(10) - .draw()?; - + Self::configure_series_labels(&mut chart)?; root.present()?; println!("Generated: {}", output_path.display()); Ok(()) diff --git a/crates/brk_indexer/examples/indexer_bench.rs b/crates/brk_indexer/examples/indexer_bench.rs index 5f9e288a3..8ae95ce12 100644 --- a/crates/brk_indexer/examples/indexer_bench.rs +++ b/crates/brk_indexer/examples/indexer_bench.rs @@ -1,4 +1,4 @@ -use std::{env, fs, path::Path, time::Instant}; +use std::{env, fs, io::Write, path::Path, time::Instant}; use brk_bencher::Bencher; use brk_error::Result; @@ -6,10 +6,11 @@ use brk_indexer::Indexer; use brk_iterator::Blocks; use brk_reader::Reader; use brk_rpc::{Auth, Client}; +use log::debug; use vecdb::Exit; fn main() -> Result<()> { - brk_logger::init(Some(Path::new(".log")))?; + brk_logger::init(None)?; let bitcoin_dir = Client::default_bitcoin_path(); // let bitcoin_dir = Path::new("/Volumes/WD_BLACK1/bitcoin"); @@ -31,21 +32,24 @@ fn main() -> Result<()> { let mut indexer = Indexer::forced_import(&outputs_dir)?; - let exit = Exit::new(); - exit.set_ctrlc_handler(); - let mut bencher = Bencher::from_cargo_env(env!("CARGO_PKG_NAME"), &outputs_dir.join("indexed/stores"))?; bencher.start()?; + let exit = Exit::new(); + exit.set_ctrlc_handler(); + let bencher_clone = bencher.clone(); + exit.register_cleanup(move || { + let _ = bencher_clone.stop(); + debug!("Bench stopped."); + }); + let i = Instant::now(); indexer.checked_index(&blocks, &client, &exit)?; dbg!(i.elapsed()); + // We want to benchmark the drop too drop(indexer); - // Stop and finalize - bencher.stop()?; - Ok(()) } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index bcf26cc0f..b31483320 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -17,13 +17,13 @@ use rayon::prelude::*; use rustc_hash::{FxHashMap, FxHashSet}; use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, TypedVecIterator}; mod indexes; -mod stores_v2; -// mod stores_v3; +// mod stores_v2; +mod stores_v3; mod vecs; pub use indexes::*; -pub use stores_v2::*; -// pub use stores_v3::*; +// pub use stores_v2::*; +pub use stores_v3::*; pub use vecs::*; // One version for all data sources diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index 9deda2125..f8b71fa6c 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -119,14 +119,34 @@ impl Stores { } pub fn starting_height(&self) -> Height { - self.iter_any_store() - .map(|store| { - // let height = - store.height().map(Height::incremented).unwrap_or_default() - // dbg!((height, store.name())); - }) - .min() - .unwrap() + [ + &self.blockhashprefix_to_height as &dyn AnyStore, + &self.height_to_coinbase_tag, + &self.txidprefix_to_txindex, + ] + .into_iter() + .chain( + self.addresstype_to_addresshash_to_addressindex + .iter() + .map(|s| s as &dyn AnyStore), + ) + .chain( + self.addresstype_to_addressindex_and_txindex + .iter() + .map(|s| s as &dyn AnyStore), + ) + .chain( + self.addresstype_to_addressindex_and_unspentoutpoint + .iter() + .map(|s| s as &dyn AnyStore), + ) + .map(|store| { + // let height = + store.height().map(Height::incremented).unwrap_or_default() + // dbg!((height, store.name())); + }) + .min() + .unwrap() } pub fn commit(&mut self, height: Height) -> Result<()> { @@ -158,38 +178,13 @@ impl Stores { }) .collect::>>()?; - let batch = self.keyspace.inner().batch(); - batch.commit_partitions(tuples)?; + self.keyspace.inner().batch().commit_partitions(tuples)?; self.keyspace .persist(PersistMode::SyncAll) .map_err(|e| e.into()) } - fn iter_any_store(&self) -> impl Iterator { - [ - &self.blockhashprefix_to_height as &dyn AnyStore, - &self.height_to_coinbase_tag, - &self.txidprefix_to_txindex, - ] - .into_iter() - .chain( - self.addresstype_to_addresshash_to_addressindex - .iter() - .map(|s| s as &dyn AnyStore), - ) - .chain( - self.addresstype_to_addressindex_and_txindex - .iter() - .map(|s| s as &dyn AnyStore), - ) - .chain( - self.addresstype_to_addressindex_and_unspentoutpoint - .iter() - .map(|s| s as &dyn AnyStore), - ) - } - pub fn rollback_if_needed( &mut self, vecs: &mut Vecs, diff --git a/crates/brk_indexer/src/stores_v3.rs b/crates/brk_indexer/src/stores_v3.rs index 6f63594b1..c28dc09f9 100644 --- a/crates/brk_indexer/src/stores_v3.rs +++ b/crates/brk_indexer/src/stores_v3.rs @@ -8,7 +8,7 @@ use brk_types::{ OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout, }; -use fjall3::{Database, PersistMode, WriteBatch}; +use fjall3::{Database, PersistMode}; use rayon::prelude::*; use vecdb::{AnyVec, GenericStoredVec, TypedVecIterator, VecIndex, VecIterator}; @@ -158,7 +158,7 @@ impl Stores { } pub fn commit(&mut self, height: Height) -> Result<()> { - let items = [ + let tuples = [ &mut self.blockhashprefix_to_height as &mut dyn AnyStore, &mut self.height_to_coinbase_tag, &mut self.txidprefix_to_txindex, @@ -182,16 +182,11 @@ impl Stores { .map(|store| { let items = store.take_all_f3(); store.export_meta_if_needed(height)?; - Ok(items) + Ok((store.keyspace(), items)) }) .collect::>>()?; - let capacity = items.iter().map(|v| v.len()).sum(); - let mut batch = WriteBatch::with_capacity(self.database.clone(), capacity); - items.into_iter().for_each(|items| { - batch.ingest(items); - }); - batch.commit()?; + self.database.batch().commit_partitions(tuples)?; self.database .persist(PersistMode::SyncAll) @@ -203,9 +198,6 @@ impl Stores { &self.blockhashprefix_to_height as &dyn AnyStore, &self.height_to_coinbase_tag, &self.txidprefix_to_txindex, - // &self.addresshash_to_typeindex, - // &self.addresstype_to_addressindex_and_txindex, - // &self.addresstype_to_addressindex_and_unspentoutpoint, ] .into_iter() .chain( @@ -233,11 +225,6 @@ impl Stores { if self.blockhashprefix_to_height.is_empty()? && self.txidprefix_to_txindex.is_empty()? && self.height_to_coinbase_tag.is_empty()? - // && self.addresshash_to_typeindex.is_empty()? - // && self.addresstype_to_addressindex_and_txindex.is_empty()? - // && self - // .addresstype_to_addressindex_and_unspentoutpoint - // .is_empty()? && self .addresstype_to_addresshash_to_addressindex .iter() diff --git a/crates/brk_logger/src/lib.rs b/crates/brk_logger/src/lib.rs index c1ced63d4..cafce99ab 100644 --- a/crates/brk_logger/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -3,7 +3,7 @@ use std::{ fmt::Display, fs::{self, OpenOptions}, - io::{self, Write}, + io::{self, BufWriter, Write}, path::Path, sync::OnceLock, }; @@ -13,19 +13,23 @@ use jiff::{Timestamp, tz}; pub use owo_colors::OwoColorize; use parking_lot::Mutex; -static LOG_FILE: OnceLock> = OnceLock::new(); +// Type alias for the hook function +type LogHook = Box; + +static LOG_HOOK: OnceLock = OnceLock::new(); +static LOG_FILE: OnceLock>> = OnceLock::new(); #[inline] pub fn init(path: Option<&Path>) -> io::Result<()> { if let Some(path) = path { let _ = fs::remove_file(path); let file = OpenOptions::new().create(true).append(true).open(path)?; - LOG_FILE.set(Mutex::new(file)).ok(); + LOG_FILE.set(Mutex::new(BufWriter::new(file))).ok(); } Builder::from_env(Env::default().default_filter_or( - "info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm-tree=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off", - // "debug,fjall=trace,bitcoin=off,bitcoincore-rpc=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off", + "debug,bitcoin=off,bitcoincore-rpc=off,fjall=off,brk_fjall=off,lsm-tree=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off", + // "debug,fjall=trace,bitcoin=off,bitcoincore-rpc=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off", )) .format(move |buf, record| { let date_time = Timestamp::now() @@ -38,6 +42,10 @@ pub fn init(path: Option<&Path>) -> io::Result<()> { let dash = "-"; let args = record.args(); + if let Some(hook) = LOG_HOOK.get() { + hook(&args.to_string()); + } + if let Some(file) = LOG_FILE.get() { let _ = write(&mut *file.lock(), &date_time, target, &level, dash, args); } @@ -67,6 +75,17 @@ pub fn init(path: Option<&Path>) -> io::Result<()> { Ok(()) } +/// Register a hook that gets called for every log message +/// Can only be called once +pub fn register_hook(hook: F) -> Result<(), &'static str> +where + F: Fn(&str) + Send + Sync + 'static, +{ + LOG_HOOK + .set(Box::new(hook)) + .map_err(|_| "Hook already registered") +} + fn write( mut buf: impl Write, date_time: impl Display, diff --git a/crates/brk_store/src/any.rs b/crates/brk_store/src/any.rs index 74eb2b245..a43312bb6 100644 --- a/crates/brk_store/src/any.rs +++ b/crates/brk_store/src/any.rs @@ -1,7 +1,5 @@ use brk_error::Result; use brk_types::{Height, Version}; -use fjall2::{InnerItem, PartitionHandle}; -use fjall3::Item; pub trait AnyStore: Send + Sync { fn name(&self) -> &'static str; @@ -10,8 +8,9 @@ pub trait AnyStore: Send + Sync { fn needs(&self, height: Height) -> bool; fn version(&self) -> Version; fn export_meta_if_needed(&mut self, height: Height) -> Result<()>; - fn partition(&self) -> &PartitionHandle; - fn take_all_f2(&mut self) -> Vec; - fn take_all_f3(&mut self) -> Vec; + fn keyspace(&self) -> &fjall3::Keyspace; + fn partition(&self) -> &fjall2::PartitionHandle; + fn take_all_f2(&mut self) -> Vec; + fn take_all_f3(&mut self) -> Vec; // fn take_all_f3(&mut self) -> Box>; } diff --git a/crates/brk_store/src/fjall_v2/mod.rs b/crates/brk_store/src/fjall_v2/mod.rs index 76365a2ea..b39a1fb2a 100644 --- a/crates/brk_store/src/fjall_v2/mod.rs +++ b/crates/brk_store/src/fjall_v2/mod.rs @@ -174,6 +174,10 @@ where ByteView: From + From, Self: Send + Sync, { + fn keyspace(&self) -> &fjall3::Keyspace { + panic!() + } + fn partition(&self) -> &fjall2::PartitionHandle { self.partition.inner() } @@ -192,7 +196,7 @@ where items.into_iter().map(InnerItem::from).collect() } - fn take_all_f3(&mut self) -> Vec { + fn take_all_f3(&mut self) -> Vec { panic!() } diff --git a/crates/brk_store/src/fjall_v3/mod.rs b/crates/brk_store/src/fjall_v3/mod.rs index e021fd0eb..4e4d33427 100644 --- a/crates/brk_store/src/fjall_v3/mod.rs +++ b/crates/brk_store/src/fjall_v3/mod.rs @@ -175,6 +175,10 @@ where ByteView: From + From, Self: Send + Sync, { + fn keyspace(&self) -> &fjall3::Keyspace { + &self.keyspace + } + fn take_all_f2(&mut self) -> Vec { vec![] } @@ -183,7 +187,7 @@ where panic!() } - fn take_all_f3(&mut self) -> Vec { + fn take_all_f3(&mut self) -> Vec { let mut items = mem::take(&mut self.puts) .into_iter() .map(|(key, value)| Item::Value { key, value }) @@ -194,10 +198,7 @@ where ) .collect::>(); items.sort_unstable(); - items - .into_iter() - .map(|v| v.fjalled(&self.keyspace)) - .collect() + items.into_iter().map(|v| v.fjalled()).collect() } fn export_meta_if_needed(&mut self, height: Height) -> Result<()> { @@ -261,22 +262,18 @@ impl Item { } } - pub fn fjalled(self, keyspace: &Keyspace) -> fjall3::Item + pub fn fjalled(self) -> fjall3::InnerItem where K: Into, V: Into, { - let keyspace_id = keyspace.id; - // let keyspace_id = keyspace.inner().id; match self { - Item::Value { key, value } => fjall3::Item { - keyspace_id, + Item::Value { key, value } => fjall3::InnerItem { key: key.into().into(), value: value.into().into(), value_type: ValueType::Value, }, - Item::Tomb(key) => fjall3::Item { - keyspace_id, + Item::Tomb(key) => fjall3::InnerItem { key: key.into().into(), value: [].into(), value_type: ValueType::Tombstone,