diff --git a/.gitignore b/.gitignore index 32c7474d9..18b8eec54 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ _* profile.json.gz flamegraph.svg *.trace + +# AI +CLAUDE.md diff --git a/Cargo.lock b/Cargo.lock index 37db97173..dfc50c411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -583,6 +583,7 @@ dependencies = [ "brk_logger", "ctrlc", "log", + "parking_lot", ] [[package]] @@ -1052,6 +1053,44 @@ dependencies = [ "zstd", ] +[[package]] +name = "brk_vecs" +version = "0.0.81" +dependencies = [ + "arc-swap", + "bincode", + "brk_core", + "brk_exit", + "libc", + "log", + "memmap2", + "parking_lot", + "rayon", + "serde", + "serde_derive", + "serde_json", + "zerocopy", + "zerocopy-derive", + "zstd", +] + +[[package]] +name = "brk_vecs_gen" +version = "0.0.81" +dependencies = [ + "brk_core", + "env_logger", + "log", + "memmap2", + "rayon", + "serde", + "serde_json", + "thiserror 2.0.12", + "zerocopy", + "zerocopy-derive", + "zstd", +] + [[package]] name = "brotli" version = "8.0.1" @@ -3106,6 +3145,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.11" diff --git a/Cargo.toml b/Cargo.toml index 8fb9db165..cc818c9b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ brk_parser = { version = "0.0.81", path = "crates/brk_parser" } brk_server = { version = "0.0.81", path = "crates/brk_server" } brk_store = { version = "0.0.81", path = "crates/brk_store" } brk_vec = { version = "0.0.81", path = "crates/brk_vec" } +brk_vecs = { version = "0.0.81", path = "crates/brk_vecs" } byteview = "=0.6.1" clap = { version = "4.5.41", features = ["string"] } clap_derive = "4.5.41" @@ -51,6 +52,7 @@ jiff = "0.2.15" libc = "0.2.174" log = { version = "0.4.27" } minreq = { version = "2.14.0", features = ["https", "serde_json"] } +parking_lot = "0.12.4" rayon = "1.10.0" rmcp = { version = "0.3.0", features = [ "transport-worker", diff --git a/crates/brk_computer/src/stateful/mod.rs b/crates/brk_computer/src/stateful/mod.rs index efb18c109..f94a3f449 100644 --- a/crates/brk_computer/src/stateful/mod.rs +++ b/crates/brk_computer/src/stateful/mod.rs @@ -747,9 +747,10 @@ impl Vecs { separate_address_vecs .par_iter_mut() .try_for_each(|(_, v)| v.state.reset_price_to_amount())?; - } + }; let last_height = indexer.vecs.height_to_blockhash.height(); + if starting_height <= last_height { let inputindex_to_outputindex_mmap = inputindex_to_outputindex.create_mmap()?; let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?; @@ -1237,7 +1238,7 @@ impl Vecs { if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 { info!("Flushing..."); - exit.block(); + let _lock = exit.lock(); self.flush_states(height, &chain_state, mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), exit)?; @@ -1245,14 +1246,12 @@ impl Vecs { &mut addresstypeindex_to_anyaddressindex_mmap_opt, &mut anyaddressindex_to_anyaddressdata_mmap_opt, ); - - exit.release(); } Ok(()) })?; - exit.block(); + let _lock = exit.lock(); info!("Flushing..."); @@ -1263,8 +1262,6 @@ impl Vecs { mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), exit, )?; - } else { - exit.block(); } unsafe { libc::sync() } @@ -1420,8 +1417,6 @@ impl Vecs { unsafe { libc::sync() } - exit.release(); - Ok(()) } diff --git a/crates/brk_core/src/error.rs b/crates/brk_core/src/error.rs index 14964904d..106718f52 100644 --- a/crates/brk_core/src/error.rs +++ b/crates/brk_core/src/error.rs @@ -35,7 +35,8 @@ pub enum Error { WrongAddressType, UnindexableDate, - String(&'static str), + Str(&'static str), + String(String), } impl From for Error { @@ -134,6 +135,7 @@ impl fmt::Display for Error { "Date cannot be indexed, must be 2009-01-03, 2009-01-09 or greater" ), + Error::Str(s) => write!(f, "{s}"), Error::String(s) => write!(f, "{s}"), } } diff --git a/crates/brk_core/src/structs/loadedaddressdata.rs b/crates/brk_core/src/structs/loadedaddressdata.rs index ca33699d9..667932a56 100644 --- a/crates/brk_core/src/structs/loadedaddressdata.rs +++ b/crates/brk_core/src/structs/loadedaddressdata.rs @@ -44,7 +44,7 @@ impl LoadedAddressData { pub fn send(&mut self, amount: Sats, previous_price: Option) -> Result<()> { if self.amount() < amount { - return Err(Error::String("Previous_amount smaller than sent amount")); + return Err(Error::Str("Previous_amount smaller than sent amount")); } self.sent += amount; self.outputs_len -= 1; diff --git a/crates/brk_core/src/utils/bytes.rs b/crates/brk_core/src/utils/bytes.rs index 52ea86924..476058e49 100644 --- a/crates/brk_core/src/utils/bytes.rs +++ b/crates/brk_core/src/utils/bytes.rs @@ -5,7 +5,7 @@ pub fn copy_first_4bytes(slice: &[u8]) -> Result<[u8; 4]> { let mut buf: [u8; 4] = [0; 4]; let buf_len = buf.len(); if slice.len() < buf_len { - return Err(Error::String("Buffer is too small to convert to 8 bytes")); + return Err(Error::Str("Buffer is too small to convert to 8 bytes")); } slice.iter().take(buf_len).enumerate().for_each(|(i, r)| { buf[i] = *r; @@ -18,7 +18,7 @@ pub fn copy_first_8bytes(slice: &[u8]) -> Result<[u8; 8]> { let mut buf: [u8; 8] = [0; 8]; let buf_len = buf.len(); if slice.len() < buf_len { - return Err(Error::String("Buffer is too small to convert to 8 bytes")); + return Err(Error::Str("Buffer is too small to convert to 8 bytes")); } slice.iter().take(buf_len).enumerate().for_each(|(i, r)| { buf[i] = *r; diff --git a/crates/brk_exit/Cargo.toml b/crates/brk_exit/Cargo.toml index 2b363dde3..58677bac4 100644 --- a/crates/brk_exit/Cargo.toml +++ b/crates/brk_exit/Cargo.toml @@ -11,3 +11,4 @@ repository.workspace = true brk_logger = { workspace = true } ctrlc = { version = "3.4.7", features = ["termination"] } log = { workspace = true } +parking_lot = { workspace = true } diff --git a/crates/brk_exit/examples/main.rs b/crates/brk_exit/examples/main.rs index 4d911f89c..de192af1e 100644 --- a/crates/brk_exit/examples/main.rs +++ b/crates/brk_exit/examples/main.rs @@ -8,7 +8,7 @@ fn main() { brk_logger::init(Some(Path::new(".log"))); - exit.block(); + let lock = exit.lock(); let mut i = 0; while i < 21 { @@ -17,7 +17,7 @@ fn main() { i += 1; } - exit.release(); + drop(lock); let mut j = 0; while j < 10 { diff --git a/crates/brk_exit/src/lib.rs b/crates/brk_exit/src/lib.rs index d29339c9f..1776e20a0 100644 --- a/crates/brk_exit/src/lib.rs +++ b/crates/brk_exit/src/lib.rs @@ -3,69 +3,34 @@ #![doc = include_str!("../examples/main.rs")] #![doc = "```"] -use std::{ - process::exit, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, - thread::sleep, - time::Duration, -}; +use std::{process::exit, sync::Arc}; use log::info; +use parking_lot::{RwLock, RwLockReadGuard}; #[derive(Default, Clone)] -pub struct Exit { - blocking: Arc, - triggered: Arc, -} +pub struct Exit(Arc>); impl Exit { pub fn new() -> Self { - let s = Self { - triggered: Arc::new(AtomicBool::new(false)), - blocking: Arc::new(AtomicBool::new(false)), - }; + let arc = Arc::new(RwLock::new(())); - let triggered = s.triggered.clone(); - - let blocking = s.blocking.clone(); - let is_blocking = move || blocking.load(Ordering::SeqCst); + let copy = arc.clone(); ctrlc::set_handler(move || { - info!("Exitting..."); - - triggered.store(true, Ordering::SeqCst); - - if is_blocking() { + if copy.is_locked() { info!("Waiting to exit safely..."); - - while is_blocking() { - sleep(Duration::from_millis(50)); - } } - + let _lock = copy.write(); + info!("Exiting..."); exit(0); }) .expect("Error setting Ctrl-C handler"); - s + Self(arc) } - pub fn block(&self) { - self.blocking.store(true, Ordering::SeqCst); - } - - pub fn blocked(&self) -> bool { - self.blocking.load(Ordering::SeqCst) - } - - pub fn release(&self) { - self.blocking.store(false, Ordering::SeqCst); - } - - pub fn triggered(&self) -> bool { - self.triggered.load(Ordering::SeqCst) + pub fn lock(&self) -> RwLockReadGuard<'_, ()> { + self.0.read() } } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index bae40291d..a724c58ba 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -60,11 +60,11 @@ impl Indexer { // dbg!(starting_indexes); // panic!(); - exit.block(); + let lock = exit.lock(); self.stores .rollback_if_needed(&mut self.vecs, &starting_indexes)?; self.vecs.rollback_if_needed(&starting_indexes)?; - exit.release(); + drop(lock); let vecs = &mut self.vecs; let stores = &mut self.stores; @@ -90,15 +90,14 @@ impl Indexer { rem: bool, exit: &Exit| -> color_eyre::Result { - if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() { + if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem { return Ok(false); } info!("Exporting..."); - exit.block(); + let _lock = exit.lock(); stores.commit(height)?; vecs.flush(height)?; - exit.release(); Ok(true) }; diff --git a/crates/brk_vec/src/variants/eager.rs b/crates/brk_vec/src/variants/eager.rs index 7bd5d3a47..68adea914 100644 --- a/crates/brk_vec/src/variants/eager.rs +++ b/crates/brk_vec/src/variants/eager.rs @@ -49,17 +49,8 @@ where } fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> { - if exit.triggered() { - return Ok(()); - } - let blocked = exit.blocked(); - if !blocked { - exit.block(); - } + let _lock = exit.lock(); self.0.truncate_if_needed(index)?; - if !blocked { - exit.release(); - } Ok(()) } @@ -85,17 +76,8 @@ where } pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { - if exit.triggered() { - return Ok(()); - } - let blocked = exit.blocked(); - if !blocked { - exit.block(); - } + let _lock = exit.lock(); self.0.flush()?; - if !blocked { - exit.release(); - } Ok(()) } diff --git a/crates/brk_vecs/.gitignore b/crates/brk_vecs/.gitignore new file mode 100644 index 000000000..15176147d --- /dev/null +++ b/crates/brk_vecs/.gitignore @@ -0,0 +1 @@ +/vecs diff --git a/crates/brk_vecs/Cargo.toml b/crates/brk_vecs/Cargo.toml new file mode 100644 index 000000000..7b080d9af --- /dev/null +++ b/crates/brk_vecs/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "brk_vecs" +description = "A storeable vec" +keywords = ["vec", "disk", "data"] +categories = ["database"] +version.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +arc-swap = { workspace = true } +bincode = { workspace = true } +brk_core = { workspace = true } +brk_exit = { workspace = true } +libc = "0.2.174" +log = { workspace = true } +memmap2 = "0.9.7" +parking_lot = {workspace = true} +rayon = { workspace = true } +serde = { workspace = true } +serde_derive = { workspace = true } +serde_json = { workspace = true } +zerocopy = { workspace = true } +zerocopy-derive = { workspace = true } +zstd = "0.13.3" diff --git a/crates/brk_vecs/examples/main.rs b/crates/brk_vecs/examples/main.rs new file mode 100644 index 000000000..89bf6949a --- /dev/null +++ b/crates/brk_vecs/examples/main.rs @@ -0,0 +1,12 @@ +use std::path::Path; + +use brk_core::Result; +use brk_vecs::{File, PAGE_SIZE}; + +fn main() -> Result<()> { + let file = File::open(Path::new("vecs"))?; + + file.grow_if_needed(PAGE_SIZE * 1_000_000)?; + + Ok(()) +} diff --git a/crates/brk_vecs/src/file/layout.rs b/crates/brk_vecs/src/file/layout.rs new file mode 100644 index 000000000..1193ea8ec --- /dev/null +++ b/crates/brk_vecs/src/file/layout.rs @@ -0,0 +1,82 @@ +use std::fs::OpenOptions; +use std::sync::Arc; +use std::{collections::HashMap, fs, io::BufReader, path::Path}; + +use bincode::decode_from_std_read; +use bincode::{Decode, Encode, config}; +use brk_core::Result; +use parking_lot::{RwLock, RwLockReadGuard}; + +use crate::PAGE_SIZE; + +use super::Region; + +#[derive(Debug)] +pub struct Layout { + file: fs::File, + pub id_to_index: HashMap, + pub index_to_region: Vec>>, + // holes +} + +impl Layout { + pub fn open(path: &Path) -> Result { + let file = OpenOptions::new() + .read(true) + .create(true) + .write(true) + .truncate(false) + .open(path)?; + + Ok(if file.metadata()?.len() != 0 { + let config = config::standard(); + + let mut reader = BufReader::new(&file); + let serialized: SerializedRegions = decode_from_std_read(&mut reader, config)?; + + let mut id_to_index = HashMap::new(); + let mut index_to_region = vec![]; + + serialized.0.into_iter().for_each(|(str, region)| { + id_to_index.insert(str, index_to_region.len()); + index_to_region.push(Arc::new(RwLock::new(region))); + }); + + Self { + file, + id_to_index, + index_to_region, + } + } else { + Self { + file, + id_to_index: HashMap::new(), + index_to_region: Vec::new(), + } + }) + } + + pub fn get_or_create_region_from_id(&mut self, id: String) -> Result { + if let Some(v) = self.id_to_index.get(&id) { + return Ok(*v); + } + let index = self.create_region()?; + self.id_to_index.insert(id, index); + Ok(index) + } + + fn create_region(&mut self) -> Result { + let index = self.index_to_region.len(); + + let length = PAGE_SIZE; + + Ok(0) + } + + pub fn get(&self, region: usize) -> Option> { + self.index_to_region.get(region).map(|r| r.read()) + } +} + +#[derive(Debug, Encode, Decode)] +struct SerializedRegions(HashMap); diff --git a/crates/brk_vecs/src/file/mod.rs b/crates/brk_vecs/src/file/mod.rs new file mode 100644 index 000000000..988542506 --- /dev/null +++ b/crates/brk_vecs/src/file/mod.rs @@ -0,0 +1,128 @@ +use std::{ + fs::{self, OpenOptions}, + os::unix::io::AsRawFd, + path::Path, + sync::Arc, +}; + +use brk_core::{Error, Result}; +use libc::off_t; +use memmap2::{MmapMut, MmapOptions}; +use parking_lot::{RwLock, RwLockReadGuard}; + +mod layout; +mod reader; +mod region; + +use layout::*; +use region::*; + +use crate::file::reader::Reader; + +pub const PAGE_SIZE: usize = 4096; + +pub struct File { + layout: Arc>, + file: Arc>, + mmap: Arc>, +} + +impl File { + pub fn open(path: &Path) -> Result { + fs::create_dir_all(path)?; + + let layout = Layout::open(&path.join("layout.dat"))?; + + let file = OpenOptions::new() + .read(true) + .create(true) + .write(true) + .truncate(false) + .open(path.join("data.dat"))?; + + let mmap = Self::mmap(&file)?; + + Ok(Self { + file: Arc::new(RwLock::new(file)), + mmap: Arc::new(RwLock::new(mmap)), + layout: Arc::new(RwLock::new(layout)), + }) + } + + /// len % PAGE_SIZE == 0 + pub fn grow_if_needed(&self, len: usize) -> Result<()> { + assert!(len % PAGE_SIZE == 0); + let file = self.file.write(); + let len = len as u64; + if file.metadata()?.len() < len { + file.set_len(len)?; + self.remap_(&file) + } else { + Ok(()) + } + } + + pub fn get_or_create_region_from_id(&mut self, id: String) -> Result { + self.layout.write().get_or_create_region_from_id(id) + } + + pub fn create_reader<'a, 'b>(&'a self, region_id: usize) -> Result> + where + 'a: 'b, + { + let layout: RwLockReadGuard<'a, Layout> = self.layout.read(); + let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read(); + + let region: RwLockReadGuard<'b, Region> = + layout.get(region_id).ok_or(Error::Str("Unknown region"))?; + + Ok(Reader::new(mmap, layout, region)) + } + + fn remap(&self) -> Result<()> { + *self.mmap.write() = Self::mmap(&self.file.read())?; + Ok(()) + } + fn remap_(&self, file: &fs::File) -> Result<()> { + *self.mmap.write() = Self::mmap(file)?; + Ok(()) + } + fn mmap(file: &fs::File) -> Result { + Ok(unsafe { MmapOptions::new().map_mut(file)? }) + } + + pub fn delete() {} + + #[cfg(target_os = "macos")] + fn punch_hole(file: &fs::File, offset: u64, length: u64) -> Result<()> { + let fpunchhole = FPunchhole { + fp_flags: 0, + reserved: 0, + fp_offset: offset as libc::off_t, + fp_length: length as libc::off_t, + }; + + let result = unsafe { + libc::fcntl( + file.as_raw_fd(), + libc::F_PUNCHHOLE, + &fpunchhole as *const FPunchhole, + ) + }; + + if result == -1 { + let err = std::io::Error::last_os_error(); + return Err(Error::String(format!("Failed to punch hole: {err}"))); + } + + Ok(()) + } +} + +#[repr(C)] +struct FPunchhole { + fp_flags: u32, + reserved: u32, + fp_offset: off_t, + fp_length: off_t, +} diff --git a/crates/brk_vecs/src/file/reader.rs b/crates/brk_vecs/src/file/reader.rs new file mode 100644 index 000000000..8a2032463 --- /dev/null +++ b/crates/brk_vecs/src/file/reader.rs @@ -0,0 +1,44 @@ +use memmap2::MmapMut; +use parking_lot::RwLockReadGuard; + +use crate::file::layout::Layout; + +use super::Region; + +pub struct Reader<'a, 'b> +where + 'a: 'b, +{ + layout: RwLockReadGuard<'a, Layout>, + mmap: RwLockReadGuard<'a, MmapMut>, + region: RwLockReadGuard<'b, Region>, +} + +impl<'a, 'b> Reader<'a, 'b> +where + 'a: 'b, +{ + pub fn new( + mmap: RwLockReadGuard<'a, MmapMut>, + layout: RwLockReadGuard<'a, Layout>, + region: RwLockReadGuard<'b, Region>, + ) -> Self { + Self { + mmap, + layout, + region, + } + } + + pub fn read(&self, offset: usize, len: usize) -> &[u8] { + assert!(offset + len < self.region.length()); + + let start = self.region.start() + offset; + let end = start + len; + &self.mmap[start..end] + } + + pub fn region(&self) -> &Region { + &self.region + } +} diff --git a/crates/brk_vecs/src/file/region.rs b/crates/brk_vecs/src/file/region.rs new file mode 100644 index 000000000..cf8e1ff36 --- /dev/null +++ b/crates/brk_vecs/src/file/region.rs @@ -0,0 +1,79 @@ +// use std::sync::Arc; + +use bincode::{Decode, Encode}; +// use parking_lot::{RwLock, RwLockReadGuard}; + +use crate::PAGE_SIZE; + +// #[derive(Debug, Encode, Decode)] +#[derive(Debug, Encode, Decode)] +pub struct Region { + // Bad name + /// Must be multiple of 4096 + start: usize, + length: usize, + /// Must be multiple of 4096 + reserved: usize, + // lock: Arc>, + // variant: usize, // Raw or Compressed or something else ? to know if there is a header ? Since blocks 4096, storing headers individually would be dumb +} + +impl Region { + pub fn new(start: usize, length: usize, reserved: usize) -> Self { + assert!(reserved > 0); + assert!(start % PAGE_SIZE == 0); + assert!(reserved % PAGE_SIZE == 0); + assert!(length <= reserved); + + Self { + start, + length, + reserved, + // lock: Arc::new(RwLock::new(())), + } + } + + pub fn start(&self) -> usize { + self.start + } + + pub fn length(&self) -> usize { + self.length + } + + pub fn reserved(&self) -> usize { + self.reserved + } + + // pub fn lock(&self) -> RwLockReadGuard<'_, ()> { + // self.lock.read() + // } +} + +// #[derive(Debug, Encode, Decode)] +// pub struct RegionInner { +// start: usize, +// length: usize, +// reserved: usize, +// } + +// impl From for RegionInner { +// fn from(value: Region) -> Self { +// Self { +// start: value.start, +// length: value.length, +// reserved: value.reserved, +// } +// } +// } + +// impl From for Region { +// fn from(value: RegionInner) -> Self { +// Self { +// start: value.start, +// length: value.length, +// reserved: value.reserved, +// lock: Arc::new(RwLock::new(())), +// } +// } +// } diff --git a/crates/brk_vecs/src/lib.rs b/crates/brk_vecs/src/lib.rs new file mode 100644 index 000000000..6e89f3fb2 --- /dev/null +++ b/crates/brk_vecs/src/lib.rs @@ -0,0 +1,5 @@ +mod file; +mod variants; + +pub use file::*; +pub use variants::*; diff --git a/crates/brk_vecs/src/variants/mod.rs b/crates/brk_vecs/src/variants/mod.rs new file mode 100644 index 000000000..8e5473b96 --- /dev/null +++ b/crates/brk_vecs/src/variants/mod.rs @@ -0,0 +1,3 @@ +mod raw; + +pub use raw::*; diff --git a/crates/brk_vecs/src/variants/raw.rs b/crates/brk_vecs/src/variants/raw.rs new file mode 100644 index 000000000..00ab1f1ac --- /dev/null +++ b/crates/brk_vecs/src/variants/raw.rs @@ -0,0 +1,3 @@ +pub struct RawVec { + region: usize, +} diff --git a/crates/brk_vecs/src/variants/stamped.rs b/crates/brk_vecs/src/variants/stamped.rs new file mode 100644 index 000000000..6854e4181 --- /dev/null +++ b/crates/brk_vecs/src/variants/stamped.rs @@ -0,0 +1,3 @@ +pub struct Stamp(u64); + +pub struct StampedVec;