diff --git a/Cargo.lock b/Cargo.lock index 283c91343..2ca871a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1065,6 +1065,8 @@ dependencies = [ "memmap2", "parking_lot", "rayon", + "serde", + "serde_json", "zerocopy", "zerocopy-derive", "zstd", diff --git a/crates/brk_vec/src/structs/length.rs b/crates/brk_vec/src/structs/length.rs deleted file mode 100644 index e32d87e00..000000000 --- a/crates/brk_vec/src/structs/length.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::{ - fs, - io::{self, Read}, - ops::{AddAssign, Deref, DerefMut}, - path::Path, -}; - -use brk_core::{Error, Result}; -use zerocopy::{FromBytes, IntoBytes}; -use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; - -#[derive( - Debug, - Default, - Clone, - Copy, - PartialEq, - Eq, - PartialOrd, - Ord, - FromBytes, - IntoBytes, - Immutable, - KnownLayout, -)] -pub struct Length(usize); - -impl Length { - pub fn write(&self, path: &Path) -> Result<(), io::Error> { - fs::write(path, self.as_bytes()) - } -} - -impl From for Length { - fn from(value: usize) -> Self { - Self(value) - } -} - -impl Deref for Length { - type Target = usize; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for Length { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl TryFrom<&Path> for Length { - type Error = Error; - fn try_from(value: &Path) -> Result { - let mut buf = [0; 8]; - if let Ok(bytes) = fs::read(value) { - bytes.as_slice().read_exact(&mut buf)?; - Ok(*(Self::ref_from_bytes(&buf)?)) - } else { - Ok(Self::default()) - } - } -} - -impl AddAssign for Length { - fn add_assign(&mut self, rhs: usize) { - self.0 += rhs; - } -} diff --git a/crates/brk_vec/src/structs/mod.rs b/crates/brk_vec/src/structs/mod.rs index 265e9eaad..237b1d4ba 100644 --- a/crates/brk_vec/src/structs/mod.rs +++ b/crates/brk_vec/src/structs/mod.rs @@ -2,12 +2,10 @@ mod compressed_page_meta; mod compressed_pages_meta; mod format; mod header; -// mod length; mod unsafe_slice; pub use 
compressed_page_meta::*; pub use compressed_pages_meta::*; pub use format::*; pub use header::*; -// pub use length::*; pub use unsafe_slice::*; diff --git a/crates/brk_vecs/Cargo.toml b/crates/brk_vecs/Cargo.toml index 14bdb1941..33bd0cbc1 100644 --- a/crates/brk_vecs/Cargo.toml +++ b/crates/brk_vecs/Cargo.toml @@ -18,6 +18,8 @@ log = { workspace = true } memmap2 = "0.9.7" parking_lot = {workspace = true} rayon = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } zerocopy = { workspace = true } zerocopy-derive = { workspace = true } zstd = "0.13.3" diff --git a/crates/brk_vecs/examples/main.rs b/crates/brk_vecs/examples/main.rs index 632d6802d..7e35cd4c6 100644 --- a/crates/brk_vecs/examples/main.rs +++ b/crates/brk_vecs/examples/main.rs @@ -8,38 +8,400 @@ fn main() -> Result<()> { let file = File::open(Path::new("vecs"))?; + // file.set_min_len(PAGE_SIZE * 1_000_000)?; + let region1_i = file.create_region_if_needed("region1")?; - dbg!(region1_i); - - assert!(file.get_region(region1_i).unwrap().read().len() == 0); - - file.write_all(region1_i, &[0, 1, 2, 3, 4])?; - { - let opt = file.get_region(region1_i); - let region = opt.as_ref().unwrap().read(); - assert!(region.start() == 0 && region.len() == 5 && region.reserved() == PAGE_SIZE); + let layout = file.layout(); + assert!(layout.start_to_index().len() == 1); + assert!(layout.start_to_index().first_key_value() == Some((&0, &0))); + assert!(layout.start_to_hole().is_empty()); + + let regions = file.regions(); + assert!( + regions + .get_region_index_from_id("region1") + .is_some_and(|i| i == region1_i) + ); + + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 0); + assert!(region.reserved() == PAGE_SIZE); } - assert!(file.mmap.read()[0..10] == [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]); - - file.write_all(region1_i, &[5, 6, 7, 8, 9])?; - - assert!(file.mmap.read()[0..10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); - - file.write_all_at(region1_i, 
&[1, 2], 0)?; - - assert!(file.mmap.read()[0..10] == [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]); + file.write_all_to_region(region1_i, &[0, 1, 2, 3, 4])?; { - let opt = file.get_region(region1_i); - let region = opt.as_ref().unwrap().read(); - dbg!(&region); - assert!(region.start() == 0 && region.len() == 10 && region.reserved() == PAGE_SIZE); + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 5); + assert!(region.reserved() == PAGE_SIZE); + + assert!(file.mmap()[0..10] == [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]); } - // file.set_min_len(PAGE_SIZE * 1_000_000)?; + file.write_all_to_region(region1_i, &[5, 6, 7, 8, 9])?; + + { + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 10); + assert!(region.reserved() == PAGE_SIZE); + + assert!(file.mmap()[0..10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + file.write_all_to_region_at(region1_i, &[1, 2], 0)?; + + { + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 10); + assert!(region.reserved() == PAGE_SIZE); + + assert!(file.mmap()[0..10] == [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + file.write_all_to_region_at(region1_i, &[10, 11, 12, 13, 14, 15, 16, 17, 18], 4)?; + + { + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 13); + assert!(region.reserved() == PAGE_SIZE); + + assert!( + file.mmap()[0..20] + == [ + 1, 2, 2, 3, 10, 11, 12, 13, 14, 15, 16, 17, 18, 0, 0, 0, 0, 0, 0, 0 + ] + ); + } + + file.write_all_to_region_at(region1_i, &[1], 18)?; + + { + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 19); + assert!(region.reserved() == PAGE_SIZE); + + assert!( + file.mmap()[0..20] + == [ + 1, 2, 2, 3, 10, 11, 12, 13, 14, 15, 16, 17, 18, 0, 0, 0, 0, 0, 1, 0 + ] + ); + } + + file.write_all_to_region_at(region1_i, &[1; 8000], 0)?; + + { + let region = file.get_region(region1_i)?; + 
assert!(region.start() == 0); + assert!(region.len() == 8000); + assert!(region.reserved() == PAGE_SIZE * 2); + + assert!(file.mmap()[0..8000] == [1; 8000]); + assert!(file.mmap()[8000..8001] == [0]); + } + + println!("Disk usage - pre sync: {}", file.disk_usage()); + file.sync_data()?; + println!("Disk usage - post sync: {}", file.disk_usage()); + + file.truncate_region(region1_i, 10)?; + + { + let region = file.get_region(region1_i)?; + assert!(region.start() == 0); + assert!(region.len() == 10); + assert!(region.reserved() == PAGE_SIZE * 2); + // We only punch a hole in whole pages (4096 bytes) + // Thus the last byte of the page where there is still data wasn't overwritten when truncating + // And the first byte of the punched page was set to 0 + assert!(file.mmap()[4095..=4096] == [1, 0]); + } + + file.sync_data()?; + println!("Disk usage - post trunc: {}", file.disk_usage()); + + file.remove_region(region1_i)?; + + println!("Disk usage - post remove: {}", file.disk_usage()); + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 1); + assert!(index_to_region[0].is_none()); + assert!(regions.id_to_index().is_empty()); + + let layout = file.layout(); + assert!(layout.start_to_index().is_empty()); + assert!(layout.start_to_hole().is_empty()); + } + + let region1_i = file.create_region_if_needed("region1")?; + let region2_i = file.create_region_if_needed("region2")?; + let region3_i = file.create_region_if_needed("region3")?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == 0); + assert!(region1.len() == 0); + assert!(region1.reserved() == PAGE_SIZE); + let region2 = file.get_region(region2_i)?; + assert!(region2.start() == PAGE_SIZE); + assert!(region2.len() == 0); + assert!(region2.reserved() == PAGE_SIZE); + let region3 = 
file.get_region(region3_i)?; + assert!(region3.start() == PAGE_SIZE * 2); + assert!(region3.len() == 0); + assert!(region3.reserved() == PAGE_SIZE); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 3); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2") == Some(&1)); + assert!(id_to_index.get("region3") == Some(&2)); + + let layout = file.layout(); + let start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 3); + assert!(start_to_index.get(&0) == Some(&0)); + assert!(start_to_index.get(&PAGE_SIZE) == Some(&1)); + assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&2)); + assert!(layout.start_to_hole().is_empty()); + } + + file.remove_region(region2_i)?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == 0); + assert!(region1.len() == 0); + assert!(region1.reserved() == PAGE_SIZE); + assert!(file.get_region(region2_i).is_err()); + assert!( + index_to_region + .get(region2_i) + .is_some_and(|opt| opt.is_none()) + ); + let region3 = file.get_region(region3_i)?; + assert!(region3.start() == PAGE_SIZE * 2); + assert!(region3.len() == 0); + assert!(region3.reserved() == PAGE_SIZE); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 2); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2").is_none()); + assert!(id_to_index.get("region3") == Some(&2)); + + let layout = file.layout(); + let start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 2); + assert!(start_to_index.get(&0) == Some(&region1_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i)); + let start_to_hole = layout.start_to_hole(); + assert!(start_to_hole.len() == 1); + assert!(start_to_hole.get(&PAGE_SIZE) == Some(&PAGE_SIZE)); + + drop(regions); + drop(layout); + 
assert!(file.remove_region(region2_i).is_ok_and(|o| o.is_none())); + } + + let region2_i = file.create_region_if_needed("region2")?; + + { + assert!(region2_i == 1) + } + + file.remove_region(region2_i)?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == 0); + assert!(region1.len() == 0); + assert!(region1.reserved() == PAGE_SIZE); + assert!(file.get_region(region2_i).is_err()); + assert!( + index_to_region + .get(region2_i) + .is_some_and(|opt| opt.is_none()) + ); + let region3 = file.get_region(region3_i)?; + assert!(region3.start() == PAGE_SIZE * 2); + assert!(region3.len() == 0); + assert!(region3.reserved() == PAGE_SIZE); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 2); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2").is_none()); + assert!(id_to_index.get("region3") == Some(&2)); + + let layout = file.layout(); + let start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 2); + assert!(start_to_index.get(&0) == Some(&region1_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i)); + let start_to_hole = layout.start_to_hole(); + assert!(start_to_hole.len() == 1); + assert!(start_to_hole.get(&PAGE_SIZE) == Some(&PAGE_SIZE)); + + drop(regions); + drop(layout); + assert!(file.remove_region(region2_i).is_ok_and(|o| o.is_none())); + } + + file.write_all_to_region_at(region1_i, &[1; 8000], 0)?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == 0); + assert!(region1.len() == 8000); + assert!(region1.reserved() == 2 * PAGE_SIZE); + assert!(file.get_region(region2_i).is_err()); + assert!( + index_to_region + .get(region2_i) + .is_some_and(|opt| opt.is_none()) + ); + let 
region3 = file.get_region(region3_i)?; + assert!(region3.start() == PAGE_SIZE * 2); + assert!(region3.len() == 0); + assert!(region3.reserved() == PAGE_SIZE); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 2); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2").is_none()); + assert!(id_to_index.get("region3") == Some(&2)); + + let layout = file.layout(); + let start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 2); + assert!(start_to_index.get(&0) == Some(&region1_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i)); + let start_to_hole = layout.start_to_hole(); + assert!(start_to_hole.is_empty()); + } + + let region2_i = file.create_region_if_needed("region2")?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == 0); + assert!(region1.len() == 8000); + assert!(region1.reserved() == 2 * PAGE_SIZE); + let region2 = file.get_region(region2_i)?; + assert!(region2.start() == PAGE_SIZE * 3); + assert!(region2.len() == 0); + assert!(region2.reserved() == PAGE_SIZE); + let region3 = file.get_region(region3_i)?; + assert!(region3.start() == PAGE_SIZE * 2); + assert!(region3.len() == 0); + assert!(region3.reserved() == PAGE_SIZE); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 3); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2") == Some(&1)); + assert!(id_to_index.get("region3") == Some(&2)); + + let layout = file.layout(); + let start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 3); + assert!(start_to_index.get(&0) == Some(&region1_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 3)) == Some(&region2_i)); + let start_to_hole = layout.start_to_hole(); + 
assert!(start_to_hole.is_empty()); + } + + file.remove_region(region3_i)?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == 0); + assert!(region1.len() == 8000); + assert!(region1.reserved() == 2 * PAGE_SIZE); + let region2 = file.get_region(region2_i)?; + assert!(region2.start() == PAGE_SIZE * 3); + assert!(region2.len() == 0); + assert!(region2.reserved() == PAGE_SIZE); + assert!(file.get_region(region3_i).is_err()); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 2); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2") == Some(&1)); + assert!(id_to_index.get("region3").is_none()); + + let layout = file.layout(); + let start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 2); + assert!(start_to_index.get(&0) == Some(&region1_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 3)) == Some(&region2_i)); + let start_to_hole = layout.start_to_hole(); + assert!(start_to_hole.get(&(PAGE_SIZE * 2)) == Some(&PAGE_SIZE)); + } + + file.write_all_to_region(region1_i, &[1; 8000])?; + + { + let regions = file.regions(); + let index_to_region = regions.index_to_region(); + assert!(index_to_region.len() == 3); + let region1 = file.get_region(region1_i)?; + assert!(region1.start() == PAGE_SIZE * 4); + assert!(region1.len() == 16_000); + assert!(region1.reserved() == 4 * PAGE_SIZE); + let region2 = file.get_region(region2_i)?; + assert!(region2.start() == PAGE_SIZE * 3); + assert!(region2.len() == 0); + assert!(region2.reserved() == PAGE_SIZE); + assert!(file.get_region(region3_i).is_err()); + let id_to_index = regions.id_to_index(); + assert!(id_to_index.len() == 2); + assert!(id_to_index.get("region1") == Some(&0)); + assert!(id_to_index.get("region2") == Some(&1)); + assert!(id_to_index.get("region3").is_none()); + + let layout = file.layout(); + let 
start_to_index = layout.start_to_index(); + assert!(start_to_index.len() == 2); + assert!(start_to_index.get(&(PAGE_SIZE * 4)) == Some(&region1_i)); + assert!(start_to_index.get(&(PAGE_SIZE * 3)) == Some(&region2_i)); + let start_to_hole = layout.start_to_hole(); + assert!(start_to_hole.get(&0) == Some(&(PAGE_SIZE * 3))); + } + + file.write_all_to_region(region2_i, &[1; 6000])?; + + let region4_i = file.create_region_if_needed("region4")?; + file.remove_region(region2_i); + file.remove_region(region4_i); + + dbg!(file.regions()); + dbg!(file.layout()); Ok(()) } diff --git a/crates/brk_vecs/src/file/layout.rs b/crates/brk_vecs/src/file/layout.rs index cfc30b4bd..c630f78aa 100644 --- a/crates/brk_vecs/src/file/layout.rs +++ b/crates/brk_vecs/src/file/layout.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use brk_core::Error; use brk_core::Result; -use super::{PAGE_SIZE, Region, Regions}; +use super::{Region, Regions}; #[derive(Debug)] pub struct Layout { @@ -19,7 +19,7 @@ impl From<&Regions> for Layout { let mut prev_end = 0; value - .as_array() + .index_to_region() .iter() .enumerate() .flat_map(|(index, opt)| opt.as_ref().map(|region| (index, region))) @@ -42,6 +42,14 @@ impl From<&Regions> for Layout { } impl Layout { + pub fn start_to_index(&self) -> &BTreeMap { + &self.start_to_index + } + + pub fn start_to_hole(&self) -> &BTreeMap { + &self.start_to_hole + } + pub fn get_last_region(&self) -> Option<(u64, usize)> { self.start_to_index .last_key_value() @@ -66,15 +74,15 @@ impl Layout { // TODO: Other checks related to holes ? 
} - pub fn move_region(&mut self, start: u64, index: usize, region: &Region) -> Result<()> { + pub fn move_region(&mut self, new_start: u64, index: usize, region: &Region) -> Result<()> { self.remove_region(index, region)?; - self.insert_region(start, index); + self.insert_region(new_start, index); Ok(()) } pub fn remove_region(&mut self, index: usize, region: &Region) -> Result<()> { let start = region.start(); - let reserved = region.reserved(); + let mut reserved = region.reserved(); if self .start_to_index @@ -86,14 +94,10 @@ impl Layout { )); } - if self - .widen_hole_to_the_left_if_any(start + reserved, reserved) - .is_none() - && let Some((&hole_start, gap)) = self.start_to_hole.range(..start).next_back() - && hole_start + *gap == start - { - self.widen_hole_to_the_right_if_any(hole_start, reserved); - } + reserved += self + .start_to_hole + .remove(&(start + reserved)) + .unwrap_or_default(); if self .start_to_index @@ -106,7 +110,16 @@ impl Layout { .is_some_and(|&hole_start| hole_start > region_start) }) { + // dbg!("Remove last hole"); self.start_to_hole.pop_last(); + } else if let Some((&hole_start, gap)) = self.start_to_hole.range_mut(..start).next_back() + && hole_start + *gap == start + { + // dbg!("Expand hole"); + *gap += reserved; + } else { + // dbg!("Insert hole"); + self.start_to_hole.insert(start, reserved); } Ok(()) @@ -138,42 +151,4 @@ impl Layout { } } } - - fn widen_hole_to_the_left_if_any(&mut self, start: u64, widen_by: u64) -> Option { - debug_assert!(start % PAGE_SIZE == 0); - - if widen_by > start { - panic!("Hole too small") - } - - let gap = self.start_to_hole.remove(&start)?; - debug_assert!(widen_by % PAGE_SIZE == 0); - let start = start - widen_by; - let gap = gap + widen_by; - - if let Some((&prev_start, prev_gap)) = self.start_to_hole.range_mut(..start).next_back() - && prev_start + *prev_gap == start - { - *prev_gap += gap; - } else { - debug_assert!(self.start_to_hole.insert(start, gap).is_none()); - } - - Some(start) - } - - 
fn widen_hole_to_the_right_if_any(&mut self, start: u64, widen_by: u64) -> Option { - debug_assert!(start % PAGE_SIZE == 0); - - let gap = self.start_to_hole.get_mut(&start)?; - debug_assert!(widen_by % PAGE_SIZE == 0); - *gap += widen_by; - - let next_hole_start = start + *gap; - if let Some(next_gap) = self.start_to_hole.remove(&next_hole_start) { - *self.start_to_hole.get_mut(&start).unwrap() += next_gap; - } - - Some(start) - } } diff --git a/crates/brk_vecs/src/file/mod.rs b/crates/brk_vecs/src/file/mod.rs index e6ea769bf..851eb2ff2 100644 --- a/crates/brk_vecs/src/file/mod.rs +++ b/crates/brk_vecs/src/file/mod.rs @@ -1,7 +1,8 @@ use std::{ fs::{self, OpenOptions}, + io::Write, os::unix::io::AsRawFd, - path::Path, + path::{Path, PathBuf}, sync::Arc, }; @@ -16,19 +17,20 @@ mod region; mod regions; use layout::*; -use reader::*; +pub use reader::Reader; use region::*; use regions::*; pub const PAGE_SIZE: u64 = 4096; pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1; +#[derive(Debug)] pub struct File { - // TODO: Remove pub - pub regions: RwLock, - pub layout: RwLock, - pub file: RwLock, - pub mmap: RwLock, + path: PathBuf, + regions: RwLock, + layout: RwLock, + file: RwLock, + mmap: RwLock, } impl File { @@ -43,11 +45,12 @@ impl File { .create(true) .write(true) .truncate(false) - .open(path.join("data"))?; + .open(Self::data_path_(path))?; - let mmap = Self::mmap(&file)?; + let mmap = Self::create_mmap(&file)?; Ok(Self { + path: path.to_owned(), file: RwLock::new(file), mmap: RwLock::new(mmap), regions: RwLock::new(regions), @@ -55,14 +58,18 @@ impl File { }) } + pub fn file_len(&self) -> Result { + Ok(self.file.read().metadata()?.len()) + } + pub fn set_min_len(&self, len: u64) -> Result<()> { let len = Self::ceil_number_to_page_size_multiple(len); - if self.file.read().metadata()?.len() < len { + if self.file_len()? 
< len { let mut mmap = self.mmap.write(); let file = self.file.write(); file.set_len(len)?; - *mmap = Self::mmap(&file)?; + *mmap = Self::create_mmap(&file)?; Ok(()) } else { Ok(()) @@ -77,7 +84,7 @@ impl File { } pub fn create_region_if_needed(&self, id: &str) -> Result { - if let Some(index) = self.regions.read().get_region_index_from_id(id.to_owned()) { + if let Some(index) = self.regions.read().get_region_index_from_id(id) { return Ok(index); } let mut regions = self.regions.write(); @@ -106,45 +113,48 @@ impl File { Ok(index) } - pub fn get_region(&self, index: usize) -> Option>> { - self.regions.read().get_region_from_index(index) + pub fn get_region(&self, index: usize) -> Result> { + let regions = self.regions.read(); + let region_opt = regions.get_region_from_index(index); + let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?; + let region = region_arc.read(); + let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) }; + Ok(region) } - pub fn read<'a>(&'a self, index: usize) -> Result> { + pub fn read_region<'a>(&'a self, index: usize) -> Result> { let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read(); - let region: RwLockReadGuard<'static, Region> = unsafe { - std::mem::transmute( - self.regions - .read() - .get_region_from_index(index) - .ok_or(Error::Str("Unknown region"))? 
- .read(), - ) - }; + let region = self.get_region(index)?; Ok(Reader::new(mmap, region)) } #[inline] - pub fn write_all(&self, region: usize, data: &[u8]) -> Result<()> { - self.write_all_at_(region, data, None) + pub fn write_all_to_region(&self, region: usize, data: &[u8]) -> Result<()> { + self.write_all_to_region_at_(region, data, None) } #[inline] - pub fn write_all_at(&self, region: usize, data: &[u8], at: u64) -> Result<()> { - self.write_all_at_(region, data, Some(at)) + pub fn write_all_to_region_at(&self, region: usize, data: &[u8], at: u64) -> Result<()> { + self.write_all_to_region_at_(region, data, Some(at)) } - fn write_all_at_(&self, region_index: usize, data: &[u8], at: Option) -> Result<()> { + fn write_all_to_region_at_( + &self, + region_index: usize, + data: &[u8], + at: Option, + ) -> Result<()> { let Some(region) = self.regions.read().get_region_from_index(region_index) else { return Err(Error::Str("Unknown region")); }; let region_lock = region.read(); let start = region_lock.start(); let reserved = region_lock.reserved(); - let left = region_lock.left(); let len = region_lock.len(); let data_len = data.len() as u64; drop(region_lock); + let new_len = at.map_or(len + data_len, |at| (at + data_len).max(len)); + // dbg!(new_len); let write_start = at.unwrap_or(start + len); if at.is_some_and(|at| at < start || at >= start + reserved) { @@ -152,42 +162,44 @@ impl File { } // Write to reserved space if possible - let at_left = at.map_or_else(|| left, |at| reserved - at); - if at_left >= data_len { - let len = reserved - at_left.min(left) + data_len; - - dbg!(write_start); + if new_len <= reserved { + // dbg!("Write to reserved space"); + // dbg!(write_start); self.write(write_start, data); let regions = self.regions.read(); let mut region_lock = region.write(); - dbg!(len); - region_lock.set_len(len); + if len != new_len { + region_lock.set_len(new_len); + } regions.write_to_mmap(&region_lock, region_index); return Ok(()); } let mut layout_lock = 
self.layout.write(); - let new_len = len + data_len; debug_assert!(new_len > reserved); let mut new_reserved = reserved; - while new_len < new_reserved { + while new_len > new_reserved { new_reserved *= 2; } + debug_assert!(new_len <= new_reserved); let added_reserve = new_reserved - reserved; // If is last continue writing if layout_lock.is_last_region(region_index) { + // dbg!("Append to file"); + // dbg!(start, new_reserved, start + new_reserved); + self.set_min_len(start + new_reserved)?; self.write(write_start, data); let regions = self.regions.read(); let mut region_lock = region.write(); - region_lock.set_len(new_len); region_lock.set_reserved(new_reserved); + region_lock.set_len(new_len); regions.write_to_mmap(&region_lock, region_index); return Ok(()); } @@ -196,6 +208,8 @@ impl File { let hole_start = start + reserved; let gap = layout_lock.get_hole(hole_start); if gap.is_some_and(|gap| gap >= added_reserve) { + // dbg!("Expand to hole"); + self.write(write_start, data); layout_lock.remove_or_compress_hole_to_right(hole_start, added_reserve); @@ -203,14 +217,16 @@ impl File { let regions = self.regions.read(); let mut region_lock = region.write(); - region_lock.set_len(new_len); region_lock.set_reserved(new_reserved); + region_lock.set_len(new_len); regions.write_to_mmap(&region_lock, region_index); return Ok(()); } // Find hole big enough to move the region if let Some(hole_start) = layout_lock.find_smallest_adequate_hole(new_reserved) { + // dbg!("Move to hole"); + self.write( hole_start, &self.mmap.read()[start as usize..(start + len) as usize], @@ -224,8 +240,8 @@ impl File { layout_lock.move_region(hole_start, region_index, &region_lock)?; region_lock.set_start(hole_start); - region_lock.set_len(new_len); region_lock.set_reserved(new_reserved); + region_lock.set_len(new_len); regions.write_to_mmap(&region_lock, region_index); drop(layout_lock); @@ -236,6 +252,7 @@ impl File { } // Write at the end + // dbg!("Move and write at the end"); let regions = 
self.regions.read(); let mut region_lock = region.write(); let (last_region_start, last_region_index) = layout_lock.get_last_region().unwrap(); @@ -253,9 +270,13 @@ impl File { ); self.write(new_start + len, data); + // dbg!(new_start, region_index, &region_lock, new_reserved, new_len); + + layout_lock.move_region(new_start, region_index, &region_lock)?; + region_lock.set_start(new_start); - region_lock.set_len(new_len); region_lock.set_reserved(new_reserved); + region_lock.set_len(new_len); regions.write_to_mmap(&region_lock, region_index); self.punch_hole(start, reserved)?; @@ -279,7 +300,7 @@ impl File { slice[start..end].copy_from_slice(data); } - pub fn truncate(&self, index: usize, from: u64) -> Result<()> { + pub fn truncate_region(&self, index: usize, from: u64) -> Result<()> { let Some(region) = self.regions.read().get_region_from_index(index) else { return Err(Error::Str("Unknown region")); }; @@ -306,20 +327,22 @@ impl File { Ok(()) } - pub fn remove(&self, index: usize) -> Result>>> { + pub fn remove_region(&self, index: usize) -> Result>>> { let mut regions = self.regions.write(); let mut layout = self.layout.write(); let Some(region) = regions.remove_region(index)? else { return Ok(None); }; + // dbg!(&regions); let region_ = region.write(); layout.remove_region(index, &region_)?; - self.punch_hole(region_.start(), region_.len())?; + // dbg!(layout); + self.punch_hole(region_.start(), region_.reserved())?; drop(region_); Ok(Some(region)) } - fn mmap(file: &fs::File) -> Result { + fn create_mmap(file: &fs::File) -> Result { Ok(unsafe { MmapOptions::new().map_mut(file)? 
}) } @@ -353,9 +376,55 @@ impl File { Ok(()) } + pub fn regions(&self) -> RwLockReadGuard<'_, Regions> { + self.regions.read() + } + + pub fn layout(&self) -> RwLockReadGuard<'_, Layout> { + self.layout.read() + } + + pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> { + self.mmap.read() + } + fn ceil_number_to_page_size_multiple(num: u64) -> u64 { (num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1 } + + fn data_path(&self) -> PathBuf { + Self::data_path_(&self.path) + } + fn data_path_(path: &Path) -> PathBuf { + path.join("data") + } + + pub fn flush(&self) -> Result<()> { + self.file.write().flush().map_err(|e| e.into()) + } + + pub fn sync_data(&self) -> Result<()> { + self.file.read().sync_data().map_err(|e| e.into()) + } + + pub fn sync_all(&self) -> Result<()> { + self.file.read().sync_all().map_err(|e| e.into()) + } + + pub fn disk_usage(&self) -> String { + let path = self.data_path(); + + let output = std::process::Command::new("du") + .arg("-h") + .arg(&path) + .output() + .expect("Failed to run du"); + + String::from_utf8_lossy(&output.stdout) + .replace(path.to_str().unwrap(), " ") + .trim() + .to_string() + } } #[repr(C)] diff --git a/crates/brk_vecs/src/file/reader.rs b/crates/brk_vecs/src/file/reader.rs index a203ebf3d..372e1b551 100644 --- a/crates/brk_vecs/src/file/reader.rs +++ b/crates/brk_vecs/src/file/reader.rs @@ -3,6 +3,7 @@ use parking_lot::RwLockReadGuard; use super::Region; +#[derive(Debug)] pub struct Reader<'a> { mmap: RwLockReadGuard<'a, MmapMut>, region: RwLockReadGuard<'static, Region>, diff --git a/crates/brk_vecs/src/file/region.rs b/crates/brk_vecs/src/file/region.rs index 253a343a1..0bff03d92 100644 --- a/crates/brk_vecs/src/file/region.rs +++ b/crates/brk_vecs/src/file/region.rs @@ -7,23 +7,23 @@ use crate::PAGE_SIZE; pub struct Region { /// Must be multiple of 4096 start: u64, - length: u64, - /// Must be multiple of 4096 + len: u64, + /// Must be multiple of 4096, greater or equal to len reserved: u64, } pub const SIZE_OF_REGION: 
usize = size_of::(); impl Region { - pub fn new(start: u64, length: u64, reserved: u64) -> Self { - debug_assert!(reserved > 0); + pub fn new(start: u64, len: u64, reserved: u64) -> Self { debug_assert!(start % PAGE_SIZE == 0); + debug_assert!(reserved >= PAGE_SIZE); debug_assert!(reserved % PAGE_SIZE == 0); - debug_assert!(length <= reserved); + debug_assert!(len <= reserved); Self { start, - length, + len, reserved, } } @@ -38,11 +38,12 @@ impl Region { } pub fn len(&self) -> u64 { - self.length + self.len } pub fn set_len(&mut self, len: u64) { - self.length = len + debug_assert!(len <= self.reserved()); + self.len = len } pub fn reserved(&self) -> u64 { @@ -50,10 +51,14 @@ impl Region { } pub fn set_reserved(&mut self, reserved: u64) { + debug_assert!(self.len() <= reserved); + debug_assert!(reserved >= PAGE_SIZE); + debug_assert!(reserved % PAGE_SIZE == 0); + self.reserved = reserved; } pub fn left(&self) -> u64 { - self.reserved - self.length + self.reserved - self.len } } diff --git a/crates/brk_vecs/src/file/regions.rs b/crates/brk_vecs/src/file/regions.rs index 1b8ac50da..e33414311 100644 --- a/crates/brk_vecs/src/file/regions.rs +++ b/crates/brk_vecs/src/file/regions.rs @@ -103,8 +103,12 @@ impl Regions { let region = Region::new(start, 0, PAGE_SIZE); - self.index_to_region - .push(Some(Arc::new(RwLock::new(region.clone())))); + let region_arc = Some(Arc::new(RwLock::new(region.clone()))); + if index < self.index_to_region.len() { + self.index_to_region[index] = region_arc + } else { + self.index_to_region.push(region_arc); + } self.set_min_len(((index + 1) * SIZE_OF_REGION) as u64)?; @@ -141,8 +145,8 @@ impl Regions { self.index_to_region.get(index).cloned().flatten() } - pub fn get_region_index_from_id(&self, id: String) -> Option { - self.id_to_index.get(&id).copied() + pub fn get_region_index_from_id(&self, id: &str) -> Option { + self.id_to_index.get(id).copied() } fn find_id_from_index(&self, index: usize) -> Option<&String> { @@ -155,10 +159,14 @@ 
impl Regions { ) } - pub fn as_array(&self) -> &[Option>>] { + pub fn index_to_region(&self) -> &[Option>>] { &self.index_to_region } + pub fn id_to_index(&self) -> &HashMap { + &self.id_to_index + } + pub fn write_to_mmap(&self, region: &Region, index: usize) { let start = index * SIZE_OF_REGION; let end = start + SIZE_OF_REGION; diff --git a/crates/brk_vecs/src/lib.rs b/crates/brk_vecs/src/lib.rs index 6e89f3fb2..429d3e99d 100644 --- a/crates/brk_vecs/src/lib.rs +++ b/crates/brk_vecs/src/lib.rs @@ -1,5 +1,10 @@ mod file; +mod traits; mod variants; -pub use file::*; -pub use variants::*; +use file::*; +use traits::*; +use variants::*; + +pub use file::File; +pub use variants::{RawVec, Stamp, StampedVec}; diff --git a/crates/brk_vecs/src/traits/any.rs b/crates/brk_vecs/src/traits/any.rs new file mode 100644 index 000000000..84c516788 --- /dev/null +++ b/crates/brk_vecs/src/traits/any.rs @@ -0,0 +1,102 @@ +use brk_core::{Height, Version}; + +use super::{BoxedVecIterator, StoredIndex, StoredType}; + +pub fn i64_to_usize(i: i64, len: usize) -> usize { + if i >= 0 { + (i as usize).min(len) + } else { + let v = len as i64 + i; + if v < 0 { 0 } else { v as usize } + } +} + +pub trait AnyVec: Send + Sync { + fn version(&self) -> Version; + fn name(&self) -> &str; + fn len(&self) -> usize; + fn is_empty(&self) -> bool { + self.len() == 0 + } + fn index_type_to_string(&self) -> &'static str; + fn value_type_to_size_of(&self) -> usize; + fn etag(&self, height: Height, to: Option) -> String { + let len = self.len(); + format!( + "{}-{}-{}", + to.map_or(len, |to| { + if to.is_negative() { + len.checked_sub(to.unsigned_abs() as usize) + .unwrap_or_default() + } else { + to as usize + } + }), + u64::from(self.version()), + u32::from(height), + ) + } + + #[inline] + fn i64_to_usize(&self, i: i64) -> usize { + let len = self.len(); + i64_to_usize(i, len) + } +} + +pub trait AnyIterableVec: AnyVec { + #[allow(clippy::wrong_self_convention)] + fn boxed_iter<'a>(&'a self) -> 
BoxedVecIterator<'a, I, T> + where + I: StoredIndex, + T: StoredType + 'a; + + fn iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> + where + I: StoredIndex, + T: StoredType + 'a, + { + self.boxed_iter() + } + + fn iter_at<'a>(&'a self, i: I) -> BoxedVecIterator<'a, I, T> + where + I: StoredIndex, + T: StoredType + 'a, + { + let mut iter = self.boxed_iter(); + iter.set(i); + iter + } + + fn iter_at_<'a>(&'a self, i: usize) -> BoxedVecIterator<'a, I, T> + where + I: StoredIndex, + T: StoredType + 'a, + { + let mut iter = self.boxed_iter(); + iter.set_(i); + iter + } +} + +pub trait CloneableAnyIterableVec: AnyIterableVec { + fn boxed_clone(&self) -> Box>; +} + +impl CloneableAnyIterableVec for U +where + U: 'static + AnyIterableVec + Clone, +{ + fn boxed_clone(&self) -> Box> { + Box::new(self.clone()) + } +} + +impl Clone for Box> { + fn clone(&self) -> Self { + self.boxed_clone() + } +} + +pub type BoxedAnyIterableVec = Box>; diff --git a/crates/brk_vecs/src/traits/collectable.rs b/crates/brk_vecs/src/traits/collectable.rs new file mode 100644 index 000000000..87d7b840b --- /dev/null +++ b/crates/brk_vecs/src/traits/collectable.rs @@ -0,0 +1,87 @@ +use brk_core::{Error, Result}; + +use crate::i64_to_usize; + +use super::{AnyIterableVec, AnyVec, StoredIndex, StoredType}; + +pub trait CollectableVec: AnyVec + AnyIterableVec +where + Self: Clone, + I: StoredIndex, + T: StoredType, +{ + fn collect(&self) -> Result> { + self.collect_range(None, None) + } + + fn collect_range(&self, from: Option, to: Option) -> Result> { + let len = self.len(); + let from = from.unwrap_or_default(); + let to = to.map_or(len, |to| to.min(len)); + + if from >= len || from >= to { + return Ok(vec![]); + } + + Ok(self + .iter_at_(from) + .take(to - from) + .map(|(_, v)| v.into_owned()) + .collect::>()) + } + + #[inline] + fn i64_to_usize_(i: i64, len: usize) -> usize { + if i >= 0 { + (i as usize).min(len) + } else { + let v = len as i64 + i; + if v < 0 { 0 } else { v as usize } + } + } + + 
fn collect_signed_range(&self, from: Option, to: Option) -> Result> { + let from = from.map(|i| self.i64_to_usize(i)); + let to = to.map(|i| self.i64_to_usize(i)); + self.collect_range(from, to) + } + + #[inline] + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result> { + self.collect_range(from, to)? + .into_iter() + .map(|v| serde_json::to_value(v).map_err(Error::from)) + .collect::>>() + } +} + +impl CollectableVec for V +where + V: AnyVec + AnyIterableVec + Clone, + I: StoredIndex, + T: StoredType, +{ +} + +pub trait AnyCollectableVec: AnyVec { + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result>; + + fn range_count(&self, from: Option, to: Option) -> usize { + let len = self.len(); + let from = from.map(|i| i64_to_usize(i, len)); + let to = to.map(|i| i64_to_usize(i, len)); + (from.unwrap_or_default()..to.unwrap_or(len)).count() + } + + fn range_weight(&self, from: Option, to: Option) -> usize { + self.range_count(from, to) * self.value_type_to_size_of() + } +} diff --git a/crates/brk_vecs/src/traits/generic.rs b/crates/brk_vecs/src/traits/generic.rs new file mode 100644 index 000000000..9c880af11 --- /dev/null +++ b/crates/brk_vecs/src/traits/generic.rs @@ -0,0 +1,207 @@ +use std::{ + borrow::Cow, + cmp::Ordering, + collections::{BTreeMap, BTreeSet}, +}; + +use brk_core::{Error, Result}; +use memmap2::Mmap; + +use crate::{AnyVec, HEADER_OFFSET, Header}; + +use super::{StoredIndex, StoredType}; + +pub trait GenericStoredVec: Send + Sync +where + Self: AnyVec, + I: StoredIndex, + T: StoredType, +{ + const SIZE_OF_T: usize = size_of::(); + + #[inline] + fn unwrap_read(&self, index: I, mmap: &Mmap) -> T { + self.read(index, mmap).unwrap().unwrap() + } + #[inline] + fn read(&self, index: I, mmap: &Mmap) -> Result> { + self.read_(index.to_usize()?, mmap) + } + fn read_(&self, index: usize, mmap: &Mmap) -> Result>; + + #[inline] + fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { + 
self.get_or_read_(index.to_usize()?, mmap) + } + #[inline] + fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result>> { + let stored_len = self.stored_len(); + + if index >= stored_len { + let pushed = self.pushed(); + let j = index - stored_len; + if j >= pushed.len() { + return Ok(None); + } + return Ok(pushed.get(j).map(Cow::Borrowed)); + } + + let updated = self.updated(); + if !updated.is_empty() + && let Some(updated) = updated.get(&index) + { + return Ok(Some(Cow::Borrowed(updated))); + } + + let holes = self.holes(); + if !holes.is_empty() && holes.contains(&index) { + return Ok(None); + } + + Ok(self.read_(index, mmap)?.map(Cow::Owned)) + } + + #[inline] + fn len_(&self) -> usize { + self.stored_len() + self.pushed_len() + } + + fn index_to_name(&self) -> String { + format!("{}_to_{}", I::to_string(), self.name()) + } + + fn stored_len(&self) -> usize; + + fn pushed(&self) -> &[T]; + #[inline] + fn pushed_len(&self) -> usize { + self.pushed().len() + } + fn mut_pushed(&mut self) -> &mut Vec; + #[inline] + fn push(&mut self, value: T) { + self.mut_pushed().push(value) + } + + #[inline] + fn update_or_push(&mut self, index: I, value: T) -> Result<()> { + let len = self.len(); + match len.cmp(&index.to_usize()?) 
{ + Ordering::Less => { + dbg!(index, value, len, self.header()); + Err(Error::IndexTooHigh) + } + Ordering::Equal => { + self.push(value); + Ok(()) + } + Ordering::Greater => self.update(index, value), + } + } + + fn get_first_empty_index(&self) -> I { + self.holes() + .first() + .cloned() + .unwrap_or_else(|| self.len_()) + .into() + } + + #[inline] + fn fill_first_hole_or_push(&mut self, value: T) -> Result { + Ok( + if let Some(hole) = self.mut_holes().pop_first().map(I::from) { + self.update(hole, value)?; + hole + } else { + self.push(value); + I::from(self.len() - 1) + }, + ) + } + + fn holes(&self) -> &BTreeSet; + fn mut_holes(&mut self) -> &mut BTreeSet; + fn take(&mut self, index: I, mmap: &Mmap) -> Result> { + let opt = self.get_or_read(index, mmap)?.map(|v| v.into_owned()); + if opt.is_some() { + self.unchecked_delete(index); + } + Ok(opt) + } + #[inline] + fn delete(&mut self, index: I) { + if index.unwrap_to_usize() < self.len() { + self.unchecked_delete(index); + } + } + #[inline] + #[doc(hidden)] + fn unchecked_delete(&mut self, index: I) { + let uindex = index.unwrap_to_usize(); + let updated = self.mut_updated(); + if !updated.is_empty() { + updated.remove(&uindex); + } + self.mut_holes().insert(uindex); + } + + fn updated(&self) -> &BTreeMap; + fn mut_updated(&mut self) -> &mut BTreeMap; + #[inline] + fn update(&mut self, index: I, value: T) -> Result<()> { + let uindex = index.unwrap_to_usize(); + let stored_len = self.stored_len(); + + if uindex >= stored_len { + if let Some(prev) = self.mut_pushed().get_mut(uindex - stored_len) { + *prev = value; + return Ok(()); + } else { + return Err(Error::IndexTooHigh); + } + } + + let holes = self.mut_holes(); + if !holes.is_empty() { + holes.remove(&index.unwrap_to_usize()); + } + + self.mut_updated().insert(index.unwrap_to_usize(), value); + + Ok(()) + } + + fn header(&self) -> &Header; + fn mut_header(&mut self) -> &mut Header; + + fn reset(&mut self) -> Result<()>; + + #[inline] + fn reset_(&mut 
self) -> Result<()> { + let holes_path = self.holes_path(); + if fs::exists(&holes_path)? { + fs::remove_file(&holes_path)?; + } + let mut file = self.open_file()?; + self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[]) + } + + #[inline] + fn is_pushed_empty(&self) -> bool { + self.pushed_len() == 0 + } + + #[inline] + fn has(&self, index: I) -> Result { + Ok(self.has_(index.to_usize()?)) + } + #[inline] + fn has_(&self, index: usize) -> bool { + index < self.len_() + } + + fn flush(&mut self) -> Result<()>; + + fn truncate_if_needed(&mut self, index: I) -> Result<()>; +} diff --git a/crates/brk_vecs/src/traits/index.rs b/crates/brk_vecs/src/traits/index.rs new file mode 100644 index 000000000..723d28b45 --- /dev/null +++ b/crates/brk_vecs/src/traits/index.rs @@ -0,0 +1,67 @@ +use std::{fmt::Debug, ops::Add}; + +use brk_core::{Error, Printable, Result}; +use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; + +pub trait StoredIndex +where + Self: Debug + + Default + + Copy + + Clone + + PartialEq + + Eq + + PartialOrd + + Ord + + TryInto + + From + + Add + + TryFromBytes + + IntoBytes + + Immutable + + KnownLayout + + Send + + Sync + + Printable, +{ + fn unwrap_to_usize(self) -> usize; + fn to_usize(self) -> Result; + fn decremented(self) -> Option; +} + +impl StoredIndex for I +where + I: Debug + + Default + + Copy + + Clone + + PartialEq + + Eq + + PartialOrd + + Ord + + TryInto + + From + + Add + + TryFromBytes + + IntoBytes + + Immutable + + KnownLayout + + Send + + Sync + + Printable, +{ + #[inline] + fn unwrap_to_usize(self) -> usize { + self.to_usize().unwrap() + } + + #[inline] + fn to_usize(self) -> Result { + self.try_into().map_err(|_| Error::FailedKeyTryIntoUsize) + } + + #[inline] + fn decremented(self) -> Option { + self.unwrap_to_usize().checked_sub(1).map(Self::from) + } +} diff --git a/crates/brk_vecs/src/traits/iterator.rs b/crates/brk_vecs/src/traits/iterator.rs new file mode 100644 index 000000000..097e513a7 --- 
/dev/null +++ b/crates/brk_vecs/src/traits/iterator.rs @@ -0,0 +1,105 @@ +use std::{borrow::Cow, iter::Skip}; + +use brk_core::Printable; + +use super::{StoredIndex, StoredType}; + +pub trait BaseVecIterator: Iterator { + fn mut_index(&mut self) -> &mut usize; + + #[inline] + fn set_(&mut self, i: usize) { + *self.mut_index() = i; + } + + #[inline] + fn next_at(&mut self, i: usize) -> Option { + self.set_(i); + self.next() + } + + fn len(&self) -> usize; + + fn name(&self) -> &str; + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn skip(self, _: usize) -> Skip + where + Self: Sized, + { + todo!("") + } +} + +pub trait VecIterator<'a>: BaseVecIterator)> { + type I: StoredIndex; + type T: StoredType + 'a; + + #[inline] + fn set(&mut self, i: Self::I) { + self.set_(i.unwrap_to_usize()) + } + + #[inline] + fn get_(&mut self, i: usize) -> Option> { + self.next_at(i).map(|(_, v)| v) + } + + #[inline] + fn get(&mut self, i: Self::I) -> Option> { + self.get_(i.unwrap_to_usize()) + } + + #[inline] + fn unwrap_get_inner(&mut self, i: Self::I) -> Self::T { + self.unwrap_get_inner_(i.unwrap_to_usize()) + } + + #[inline] + fn unwrap_get_inner_(&mut self, i: usize) -> Self::T { + self.get_(i) + .unwrap_or_else(|| { + dbg!(self.name(), i, self.len(), Self::I::to_string()); + panic!("unwrap_get_inner_") + }) + .into_owned() + } + + #[inline] + fn get_inner(&mut self, i: Self::I) -> Option { + self.get_(i.unwrap_to_usize()).map(|v| v.into_owned()) + } + + fn last(mut self) -> Option + where + Self: Sized, + { + let len = self.len(); + if len == 0 { + return None; + } + let i = len - 1; + self.set_(i); + self.next() + } + + fn index_type_to_string(&self) -> &'static str { + Self::I::to_string() + } +} + +impl<'a, I, T, Iter> VecIterator<'a> for Iter +where + Iter: BaseVecIterator)>, + I: StoredIndex, + T: StoredType + 'a, +{ + type I = I; + type T = T; +} + +pub type BoxedVecIterator<'a, I, T> = + Box)> + 'a>; diff --git a/crates/brk_vecs/src/traits/mod.rs 
b/crates/brk_vecs/src/traits/mod.rs new file mode 100644 index 000000000..d07c2a598 --- /dev/null +++ b/crates/brk_vecs/src/traits/mod.rs @@ -0,0 +1,13 @@ +mod any; +mod collectable; +mod generic; +mod index; +mod iterator; +mod r#type; + +pub use any::*; +pub use collectable::*; +pub use generic::*; +pub use index::*; +pub use iterator::*; +pub use r#type::*; diff --git a/crates/brk_vecs/src/traits/type.rs b/crates/brk_vecs/src/traits/type.rs new file mode 100644 index 000000000..3acb2f7bf --- /dev/null +++ b/crates/brk_vecs/src/traits/type.rs @@ -0,0 +1,33 @@ +use std::fmt::Debug; + +use serde::Serialize; +use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; + +pub trait StoredType +where + Self: Sized + + Debug + + Clone + + TryFromBytes + + IntoBytes + + Immutable + + KnownLayout + + Send + + Sync + + Serialize, +{ +} + +impl StoredType for T where + T: Sized + + Debug + + Clone + + TryFromBytes + + IntoBytes + + Immutable + + KnownLayout + + Send + + Sync + + Serialize +{ +} diff --git a/crates/brk_vecs/src/variants/compressed/compressed_page_meta.rs b/crates/brk_vecs/src/variants/compressed/compressed_page_meta.rs new file mode 100644 index 000000000..a0b18cf7e --- /dev/null +++ b/crates/brk_vecs/src/variants/compressed/compressed_page_meta.rs @@ -0,0 +1,18 @@ +use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)] +pub struct CompressedPageMetadata { + pub start: u64, + pub bytes_len: u32, + pub values_len: u32, +} + +impl CompressedPageMetadata { + pub fn new(start: u64, bytes_len: u32, values_len: u32) -> Self { + Self { + start, + bytes_len, + values_len, + } + } +} diff --git a/crates/brk_vecs/src/variants/compressed/compressed_pages_meta.rs b/crates/brk_vecs/src/variants/compressed/compressed_pages_meta.rs new file mode 100644 index 000000000..6d170e064 --- /dev/null +++ b/crates/brk_vecs/src/variants/compressed/compressed_pages_meta.rs @@ -0,0 +1,117 @@ 
+use std::{ + fs::{self, OpenOptions}, + io::{self, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, +}; + +use brk_core::Result; +use rayon::prelude::*; +use zerocopy::{IntoBytes, TryFromBytes}; + +use super::{CompressedPageMetadata, UnsafeSlice}; + +#[derive(Debug, Clone)] +pub struct CompressedPagesMetadata { + vec: Vec, + change_at: Option, + path: PathBuf, +} + +impl CompressedPagesMetadata { + const PAGE_SIZE: usize = size_of::(); + + pub fn read(path: &Path) -> Result { + let this = Self { + vec: fs::read(path) + .unwrap_or_default() + .chunks(Self::PAGE_SIZE) + .map(|bytes| { + if bytes.len() != Self::PAGE_SIZE { + panic!() + } + CompressedPageMetadata::try_read_from_bytes(bytes).unwrap() + }) + .collect::>(), + path: path.to_owned(), + change_at: None, + }; + + Ok(this) + } + + pub fn write(&mut self) -> io::Result<()> { + if self.change_at.is_none() { + return Ok(()); + } + + let change_at = self.change_at.take().unwrap(); + + let len = (self.vec.len() - change_at) * Self::PAGE_SIZE; + + let mut bytes: Vec = vec![0; len]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + self.vec[change_at..] 
+ .par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::PAGE_SIZE, v.as_bytes())); + + let mut file = OpenOptions::new() + .read(true) + .create(true) + .truncate(false) + .append(true) + .open(&self.path)?; + + file.set_len((change_at * Self::PAGE_SIZE) as u64)?; + file.seek(SeekFrom::End(0))?; + + file.write_all(&bytes)?; + + Ok(()) + } + + pub fn len(&self) -> usize { + self.vec.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn get(&self, page_index: usize) -> Option<&CompressedPageMetadata> { + self.vec.get(page_index) + } + + pub fn last(&self) -> Option<&CompressedPageMetadata> { + self.vec.last() + } + + pub fn pop(&mut self) -> Option { + self.vec.pop() + } + + pub fn push(&mut self, page_index: usize, page: CompressedPageMetadata) { + if page_index != self.vec.len() { + panic!(); + } + + self.set_changed_at(page_index); + + self.vec.push(page); + } + + fn set_changed_at(&mut self, page_index: usize) { + if self.change_at.is_none_or(|pi| pi > page_index) { + self.change_at.replace(page_index); + } + } + + pub fn truncate(&mut self, page_index: usize) -> Option { + let page = self.get(page_index).cloned(); + self.vec.truncate(page_index); + self.set_changed_at(page_index); + page + } +} diff --git a/crates/brk_vecs/src/variants/compressed/mod.rs b/crates/brk_vecs/src/variants/compressed/mod.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/brk_vecs/src/variants/mod.rs b/crates/brk_vecs/src/variants/mod.rs index 8e5473b96..95f3f0e74 100644 --- a/crates/brk_vecs/src/variants/mod.rs +++ b/crates/brk_vecs/src/variants/mod.rs @@ -1,3 +1,9 @@ +mod compressed; mod raw; +mod stamped; +mod stored; +pub use compressed::*; pub use raw::*; +pub use stamped::*; +pub use stored::*; diff --git a/crates/brk_vecs/src/variants/raw.rs b/crates/brk_vecs/src/variants/raw.rs deleted file mode 100644 index 172eb1c8d..000000000 --- a/crates/brk_vecs/src/variants/raw.rs +++ /dev/null @@ -1,8 +0,0 @@ -use 
std::sync::Arc; - -use crate::File; - -pub struct RawVec { - region: usize, - file: Arc, -} diff --git a/crates/brk_vecs/src/variants/raw/header.rs b/crates/brk_vecs/src/variants/raw/header.rs new file mode 100644 index 000000000..12e969384 --- /dev/null +++ b/crates/brk_vecs/src/variants/raw/header.rs @@ -0,0 +1,195 @@ +use std::{ + fs::File, + io::{self, Seek, SeekFrom}, + os::unix::fs::FileExt, + sync::Arc, +}; + +use brk_core::{Error, Result, Version}; +use parking_lot::RwLock; +use zerocopy::{FromBytes, IntoBytes}; +use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use crate::Stamp; + +use super::Format; + +const HEADER_VERSION: Version = Version::ONE; +pub const HEADER_OFFSET: usize = size_of::(); + +#[derive(Debug, Clone)] +pub struct Header { + inner: Arc>, + modified: bool, +} + +impl Header { + pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { + let inner = HeaderInner::create_and_write(file, vec_version, format)?; + Ok(Self { + inner: Arc::new(RwLock::new(inner)), + modified: false, + }) + } + + pub fn import_and_verify( + file: &mut File, + vec_version: Version, + format: Format, + ) -> Result { + let inner = HeaderInner::import_and_verify(file, vec_version, format)?; + Ok(Self { + inner: Arc::new(RwLock::new(inner)), + modified: false, + }) + } + + pub fn update_stamp(&mut self, stamp: Stamp) { + self.modified = true; + self.inner.write().stamp = stamp; + } + + pub fn update_computed_version(&mut self, computed_version: Version) { + self.modified = true; + self.inner.write().computed_version = computed_version; + } + + pub fn modified(&self) -> bool { + self.modified + } + + pub fn vec_version(&self) -> Version { + self.inner.read().vec_version + } + + pub fn computed_version(&self) -> Version { + self.inner.read().computed_version + } + + pub fn stamp(&self) -> Stamp { + self.inner.read().stamp + } + + pub fn write(&mut self, file: &mut File) -> io::Result<()> { + 
self.inner.read().write(file)?; + self.modified = false; + Ok(()) + } + + pub fn inner(&self) -> &Arc> { + &self.inner + } +} + +#[repr(C)] +#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)] +struct HeaderInner { + pub header_version: Version, + pub vec_version: Version, + pub computed_version: Version, + pub stamp: Stamp, + pub compressed: ZeroCopyBool, +} + +impl HeaderInner { + pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { + let header = Self { + header_version: HEADER_VERSION, + vec_version, + computed_version: Version::default(), + stamp: Stamp::default(), + compressed: ZeroCopyBool::from(format), + }; + header.write(file)?; + file.seek(SeekFrom::End(0))?; + Ok(header) + } + + pub fn write(&self, file: &mut File) -> io::Result<()> { + file.write_all_at(self.as_bytes(), 0) + } + + pub fn import_and_verify( + file: &mut File, + vec_version: Version, + format: Format, + ) -> Result { + let len = file.metadata()?.len(); + + if len < HEADER_OFFSET as u64 { + return Err(Error::WrongLength); + } + + let mut buf = [0; HEADER_OFFSET]; + file.read_exact_at(&mut buf, 0)?; + + let header = HeaderInner::read_from_bytes(&buf)?; + + if header.header_version != HEADER_VERSION { + return Err(Error::DifferentVersion { + found: header.header_version, + expected: HEADER_VERSION, + }); + } + if header.vec_version != vec_version { + return Err(Error::DifferentVersion { + found: header.vec_version, + expected: vec_version, + }); + } + if header.compressed.is_broken() { + return Err(Error::WrongEndian); + } + if (header.compressed.is_true() && format.is_raw()) + || (header.compressed.is_false() && format.is_compressed()) + { + return Err(Error::DifferentCompressionMode); + } + + Ok(header) + } +} + +#[derive( + Debug, + Clone, + Copy, + Default, + PartialEq, + Eq, + PartialOrd, + Ord, + FromBytes, + IntoBytes, + Immutable, + KnownLayout, +)] +#[repr(C)] +pub struct ZeroCopyBool(u64); + +impl ZeroCopyBool { + pub const 
TRUE: Self = Self(1); + pub const FALSE: Self = Self(0); + + pub fn is_true(&self) -> bool { + *self == Self::TRUE + } + + pub fn is_false(&self) -> bool { + *self == Self::FALSE + } + + pub fn is_broken(&self) -> bool { + *self > Self::TRUE + } +} + +impl From for ZeroCopyBool { + fn from(value: Format) -> Self { + if value.is_raw() { + Self::FALSE + } else { + Self::TRUE + } + } +} diff --git a/crates/brk_vecs/src/variants/raw/mod.rs b/crates/brk_vecs/src/variants/raw/mod.rs new file mode 100644 index 000000000..f3a636325 --- /dev/null +++ b/crates/brk_vecs/src/variants/raw/mod.rs @@ -0,0 +1,448 @@ +use std::{ + borrow::Cow, + collections::{BTreeMap, BTreeSet}, + fs, io, + marker::PhantomData, + mem, + os::unix::fs::FileExt, + sync::Arc, +}; + +use brk_core::{Error, Result, Version}; +use memmap2::Mmap; +use rayon::prelude::*; +use zerocopy::IntoBytes; + +use crate::{ + AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, + File, GenericStoredVec, StoredIndex, StoredType, file::Reader, +}; + +use super::Format; + +mod header; +mod unsafe_slice; + +pub use header::*; +pub use unsafe_slice::*; + +const VERSION: Version = Version::ONE; + +#[derive(Debug)] +pub struct RawVec { + region: usize, + file: Arc, + + header: Header, + name: &'static str, + pushed: Vec, + has_stored_holes: bool, + holes: BTreeSet, + updated: BTreeMap, + phantom: PhantomData, +} + +impl RawVec +where + I: StoredIndex, + T: StoredType, +{ + /// Same as import but will reset the folder under certain errors, so be careful ! + pub fn forced_import(file: &Arc, name: &str, mut version: Version) -> Result { + version = version + VERSION; + let res = Self::import(file, name, version); + match res { + Err(Error::DifferentCompressionMode) + | Err(Error::WrongEndian) + | Err(Error::WrongLength) + | Err(Error::DifferentVersion { .. }) => { + fs::remove_file(path)?; + let holes_path = Self::holes_path_(parent, name); + if fs::exists(&holes_path)? 
{ + fs::remove_file(holes_path)?; + } + Self::import(file, name, version) + } + _ => res, + } + } + + pub fn import(file: &Arc, name: &str, version: Version) -> Result { + let region = file.create_region_if_needed(&format!("{name}_{}", I::to_string()))?; + + let (header, file) = match Self::open_file_(&path) { + Ok(mut file) => { + if file.metadata()?.len() == 0 { + ( + Header::create_and_write(&mut file, version, Format::Raw)?, + Some(file), + ) + } else { + ( + Header::import_and_verify(&mut file, version, Format::Raw)?, + Some(file), + ) + } + } + Err(e) => match e.kind() { + io::ErrorKind::NotFound => { + fs::create_dir_all(Self::folder_(parent, name))?; + let mut file = Self::open_file_(&path)?; + let header = Header::create_and_write(&mut file, version, Format::Raw)?; + (header, None) + } + _ => { + return Err(e.into()); + } + }, + }; + + let stored_len = if let Some(file) = file { + (file.metadata()?.len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T + } else { + 0 + }; + + let holes_path = Self::holes_path_(parent, name); + let holes = if fs::exists(&holes_path)? { + Some( + fs::read(&holes_path)? 
+ .chunks(size_of::()) + .map(|b| -> Result { + Ok(usize::from_ne_bytes(brk_core::copy_first_8bytes(b)?)) + }) + .collect::>>()?, + ) + } else { + None + }; + + Ok(Self { + file, + region, + header, + name: Box::leak(Box::new(name.to_string())), + pushed: vec![], + has_stored_holes: holes.is_some(), + holes: holes.unwrap_or_default(), + updated: BTreeMap::new(), + phantom: PhantomData, + }) + } + + #[inline] + pub fn iter(&self) -> RawVecIterator<'_, I, T> { + self.into_iter() + } + + #[inline] + pub fn iter_at(&self, i: I) -> RawVecIterator<'_, I, T> { + self.iter_at_(i.unwrap_to_usize()) + } + + #[inline] + pub fn iter_at_(&self, i: usize) -> RawVecIterator<'_, I, T> { + let mut iter = self.into_iter(); + iter.set_(i); + iter + } + + pub fn write_header_if_needed(&mut self) -> Result<()> { + if self.header.modified() { + self.file.write_all_to_region_at( + self.region, + self.header.inner().read().as_bytes(), + 0, + )?; + } + Ok(()) + } +} + +impl GenericStoredVec for RawVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn read_(&self, index: usize, mmap: &Mmap) -> Result> { + let index = index * Self::SIZE_OF_T + HEADER_OFFSET; + let slice = &mmap[index..(index + Self::SIZE_OF_T)]; + T::try_read_from_bytes(slice) + .map(|v| Some(v)) + .map_err(Error::from) + } + + fn header(&self) -> &Header { + &self.header + } + + fn mut_header(&mut self) -> &mut Header { + &mut self.header + } + + #[inline] + fn stored_len(&self) -> usize { + self.file.get_region(self.region).unwrap().len() as usize / Self::SIZE_OF_T + } + + #[inline] + fn pushed(&self) -> &[T] { + self.pushed.as_slice() + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + &mut self.pushed + } + + #[inline] + fn holes(&self) -> &BTreeSet { + &self.holes + } + #[inline] + fn mut_holes(&mut self) -> &mut BTreeSet { + &mut self.holes + } + + #[inline] + fn updated(&self) -> &BTreeMap { + &self.updated + } + #[inline] + fn mut_updated(&mut self) -> &mut BTreeMap { + &mut self.updated + } + + fn 
flush(&mut self) -> Result<()> { + let file_opt = self.write_header_if_needed()?; + + let pushed_len = self.pushed_len(); + + let has_new_data = pushed_len != 0; + let has_updated_data = !self.updated.is_empty(); + let has_holes = !self.holes.is_empty(); + let had_holes = self.has_stored_holes && !has_holes; + + if !has_new_data && !has_updated_data && !has_holes && !had_holes { + return Ok(()); + } + + if has_new_data || has_updated_data { + let mut file = file_opt.unwrap_or(self.open_file()?); + + if has_new_data { + let bytes = { + let mut bytes: Vec = vec![0; pushed_len * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + mem::take(&mut self.pushed) + .into_par_iter() + .enumerate() + .for_each(|(i, v)| { + unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()) + }); + + bytes + }; + + self.file_write_all(&mut file, &bytes)?; + } + + if has_updated_data { + mem::take(&mut self.updated) + .into_iter() + .try_for_each(|(i, v)| -> Result<()> { + file.write_all_at( + v.as_bytes(), + ((i * Self::SIZE_OF_T) + HEADER_OFFSET) as u64, + )?; + Ok(()) + })?; + } + } + + if has_holes || had_holes { + let holes_path = self.holes_path(); + if has_holes { + self.has_stored_holes = true; + fs::write( + &holes_path, + self.holes + .iter() + .flat_map(|i| i.to_ne_bytes()) + .collect::>(), + )?; + } else if had_holes { + self.has_stored_holes = false; + let _ = fs::remove_file(&holes_path); + } + } + + Ok(()) + } + + fn truncate_if_needed(&mut self, index: I) -> Result<()> { + let index = index.to_usize()?; + + if index >= self.stored_len() { + return Ok(()); + } + + if index == 0 { + self.reset()?; + return Ok(()); + } + + let from = index * Self::SIZE_OF_T + HEADER_OFFSET; + self.file.truncate_region(self.region, from as u64) + } + + fn reset(&mut self) -> Result<()> { + self.set_stored_len(0); + self.reset_() + } +} + +impl AnyVec for RawVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn version(&self) -> Version { + 
self.header.vec_version() + } + + #[inline] + fn name(&self) -> &str { + self.name + } + + #[inline] + fn len(&self) -> usize { + self.len_() + } + + #[inline] + fn index_type_to_string(&self) -> &'static str { + I::to_string() + } + + #[inline] + fn value_type_to_size_of(&self) -> usize { + size_of::() + } +} + +impl Clone for RawVec { + fn clone(&self) -> Self { + Self { + file: self.file.clone(), + region: self.region, + header: self.header.clone(), + name: self.name, + pushed: vec![], + updated: BTreeMap::new(), + has_stored_holes: false, + holes: BTreeSet::new(), + phantom: PhantomData, + } + } +} + +#[derive(Debug)] +pub struct RawVecIterator<'a, I, T> { + vec: &'a RawVec, + reader: Reader<'a>, + index: usize, +} + +impl BaseVecIterator for RawVecIterator<'_, I, T> +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn mut_index(&mut self) -> &mut usize { + &mut self.index + } + + #[inline] + fn len(&self) -> usize { + self.vec.len() + } + + #[inline] + fn name(&self) -> &str { + self.vec.name() + } +} + +impl<'a, I, T> Iterator for RawVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + + fn next(&mut self) -> Option { + let index = self.index; + + let opt = self + .vec + .get_or_read_(index, &self.mmap) + .unwrap() + .map(|v| (I::from(index), v)); + + if opt.is_some() { + self.index += 1; + } + + opt + } +} + +impl<'a, I, T> IntoIterator for &'a RawVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + type IntoIter = RawVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + RawVecIterator { + vec: self, + reader: self.file.read_region(self.region).unwrap(), + index: 0, + } + } +} + +impl AnyIterableVec for RawVec +where + I: StoredIndex, + T: StoredType, +{ + fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> + where + T: 'a, + { + Box::new(self.into_iter()) + } +} + +impl AnyCollectableVec for RawVec +where + I: StoredIndex, + T: StoredType, +{ + fn 
collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result> { + CollectableVec::collect_range_serde_json(self, from, to) + } +} diff --git a/crates/brk_vecs/src/variants/raw/unsafe_slice.rs b/crates/brk_vecs/src/variants/raw/unsafe_slice.rs new file mode 100644 index 000000000..37bdeb6f4 --- /dev/null +++ b/crates/brk_vecs/src/variants/raw/unsafe_slice.rs @@ -0,0 +1,35 @@ +use std::cell::UnsafeCell; + +#[derive(Copy, Clone)] +pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell]); +unsafe impl Send for UnsafeSlice<'_, T> {} +unsafe impl Sync for UnsafeSlice<'_, T> {} + +impl<'a, T> UnsafeSlice<'a, T> { + pub fn new(slice: &'a mut [T]) -> Self { + let ptr = slice as *mut [T] as *const [UnsafeCell]; + Self(unsafe { &*ptr }) + } + + /// SAFETY: It is UB if two threads write to the same index without + /// synchronization. + pub fn write(&self, i: usize, value: T) { + unsafe { + *self.0[i].get() = value; + } + } + + /// SAFETY: It is UB + pub fn get(&self, i: usize) -> *mut T { + self.0[i].get() + } + + pub fn copy_slice(&self, start: usize, slice: &[T]) + where + T: Copy, + { + slice.iter().enumerate().for_each(|(i, v)| { + self.write(start + i, *v); + }); + } +} diff --git a/crates/brk_vecs/src/variants/stamped.rs b/crates/brk_vecs/src/variants/stamped.rs deleted file mode 100644 index 6854e4181..000000000 --- a/crates/brk_vecs/src/variants/stamped.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub struct Stamp(u64); - -pub struct StampedVec; diff --git a/crates/brk_vecs/src/variants/stamped/mod.rs b/crates/brk_vecs/src/variants/stamped/mod.rs new file mode 100644 index 000000000..cf947ac58 --- /dev/null +++ b/crates/brk_vecs/src/variants/stamped/mod.rs @@ -0,0 +1,5 @@ +mod stamp; + +pub use stamp::*; + +pub struct StampedVec; diff --git a/crates/brk_vecs/src/variants/stamped/stamp.rs b/crates/brk_vecs/src/variants/stamped/stamp.rs new file mode 100644 index 000000000..794963c98 --- /dev/null +++ b/crates/brk_vecs/src/variants/stamped/stamp.rs @@ -0,0 +1,4 @@ +use 
zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +#[derive(Debug, Default, Clone, Copy, FromBytes, IntoBytes, Immutable, KnownLayout)] +pub struct Stamp(u64); diff --git a/crates/brk_vecs/src/variants/stored/format.rs b/crates/brk_vecs/src/variants/stored/format.rs new file mode 100644 index 000000000..b5816653e --- /dev/null +++ b/crates/brk_vecs/src/variants/stored/format.rs @@ -0,0 +1,63 @@ +use std::{fs, io, path::Path}; + +use brk_core::{Error, Result}; +use serde::{Deserialize, Serialize}; + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum Format { + Compressed, + #[default] + Raw, +} + +impl Format { + pub fn write(&self, path: &Path) -> Result<(), io::Error> { + fs::write(path, self.as_bytes()) + } + + pub fn is_raw(&self) -> bool { + *self == Self::Raw + } + + pub fn is_compressed(&self) -> bool { + *self == Self::Compressed + } + + fn as_bytes(&self) -> Vec { + if self.is_compressed() { + vec![1] + } else { + vec![0] + } + } + + fn from_bytes(bytes: &[u8]) -> Self { + if bytes.len() != 1 { + panic!(); + } + if bytes[0] == 1 { + Self::Compressed + } else if bytes[0] == 0 { + Self::Raw + } else { + panic!() + } + } + + pub fn validate(&self, path: &Path) -> Result<()> { + if let Ok(prev_compressed) = Format::try_from(path) { + if prev_compressed != *self { + return Err(Error::DifferentCompressionMode); + } + } + + Ok(()) + } +} + +impl TryFrom<&Path> for Format { + type Error = Error; + fn try_from(value: &Path) -> Result { + Ok(Self::from_bytes(&fs::read(value)?)) + } +} diff --git a/crates/brk_vecs/src/variants/stored/mod.rs b/crates/brk_vecs/src/variants/stored/mod.rs new file mode 100644 index 000000000..af5f01f09 --- /dev/null +++ b/crates/brk_vecs/src/variants/stored/mod.rs @@ -0,0 +1,3 @@ +mod format; + +pub use format::*;