diff --git a/Cargo.lock b/Cargo.lock index 2ca871a44..cbd1033fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1072,23 +1072,6 @@ dependencies = [ "zstd", ] -[[package]] -name = "brk_vecs_gen" -version = "0.0.81" -dependencies = [ - "brk_core", - "env_logger", - "log", - "memmap2", - "rayon", - "serde", - "serde_json", - "thiserror 2.0.12", - "zerocopy", - "zerocopy-derive", - "zstd", -] - [[package]] name = "brotli" version = "8.0.1" diff --git a/crates/brk_vecs/.gitignore b/crates/brk_vecs/.gitignore index 15176147d..75c17868d 100644 --- a/crates/brk_vecs/.gitignore +++ b/crates/brk_vecs/.gitignore @@ -1 +1,2 @@ /vecs +/raw diff --git a/crates/brk_vecs/examples/main.rs b/crates/brk_vecs/examples/file.rs similarity index 76% rename from crates/brk_vecs/examples/main.rs rename to crates/brk_vecs/examples/file.rs index 7e35cd4c6..cef7d2b92 100644 --- a/crates/brk_vecs/examples/main.rs +++ b/crates/brk_vecs/examples/file.rs @@ -8,9 +8,13 @@ fn main() -> Result<()> { let file = File::open(Path::new("vecs"))?; - // file.set_min_len(PAGE_SIZE * 1_000_000)?; + let file_min_len = PAGE_SIZE * 1_000_000; + let min_regions = 20_000; - let region1_i = file.create_region_if_needed("region1")?; + file.set_min_len(file_min_len)?; + file.set_min_regions(min_regions)?; + + let (region1_i, region1) = file.create_region_if_needed("region1")?; { let layout = file.layout(); @@ -25,16 +29,16 @@ fn main() -> Result<()> { .is_some_and(|i| i == region1_i) ); - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 0); assert!(region.reserved() == PAGE_SIZE); } - file.write_all_to_region(region1_i, &[0, 1, 2, 3, 4])?; + file.write_all_to_region(region1_i.into(), &[0, 1, 2, 3, 4])?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 5); assert!(region.reserved() == PAGE_SIZE); @@ -42,10 +46,10 @@ fn 
main() -> Result<()> { assert!(file.mmap()[0..10] == [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]); } - file.write_all_to_region(region1_i, &[5, 6, 7, 8, 9])?; + file.write_all_to_region(region1_i.into(), &[5, 6, 7, 8, 9])?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 10); assert!(region.reserved() == PAGE_SIZE); @@ -53,10 +57,10 @@ fn main() -> Result<()> { assert!(file.mmap()[0..10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); } - file.write_all_to_region_at(region1_i, &[1, 2], 0)?; + file.write_all_to_region_at(region1_i.into(), &[1, 2], 0)?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 10); assert!(region.reserved() == PAGE_SIZE); @@ -64,10 +68,10 @@ fn main() -> Result<()> { assert!(file.mmap()[0..10] == [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]); } - file.write_all_to_region_at(region1_i, &[10, 11, 12, 13, 14, 15, 16, 17, 18], 4)?; + file.write_all_to_region_at(region1_i.into(), &[10, 11, 12, 13, 14, 15, 16, 17, 18], 4)?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 13); assert!(region.reserved() == PAGE_SIZE); @@ -80,10 +84,10 @@ fn main() -> Result<()> { ); } - file.write_all_to_region_at(region1_i, &[1], 18)?; + file.write_all_to_region_at(region1_i.into(), &[1], 18)?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 19); assert!(region.reserved() == PAGE_SIZE); @@ -96,10 +100,10 @@ fn main() -> Result<()> { ); } - file.write_all_to_region_at(region1_i, &[1; 8000], 0)?; + file.write_all_to_region_at(region1_i.into(), &[1; 8000], 0)?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); 
assert!(region.len() == 8000); assert!(region.reserved() == PAGE_SIZE * 2); @@ -109,13 +113,13 @@ fn main() -> Result<()> { } println!("Disk usage - pre sync: {}", file.disk_usage()); - file.sync_data()?; + file.flush()?; println!("Disk usage - post sync: {}", file.disk_usage()); - file.truncate_region(region1_i, 10)?; + file.truncate_region(region1_i.into(), 10)?; { - let region = file.get_region(region1_i)?; + let region = file.get_region(region1_i.into())?; assert!(region.start() == 0); assert!(region.len() == 10); assert!(region.reserved() == PAGE_SIZE * 2); @@ -125,10 +129,12 @@ fn main() -> Result<()> { assert!(file.mmap()[4095..=4096] == [1, 0]); } - file.sync_data()?; + file.flush()?; println!("Disk usage - post trunc: {}", file.disk_usage()); - file.remove_region(region1_i)?; + file.remove_region(region1_i.into())?; + + file.flush()?; println!("Disk usage - post remove: {}", file.disk_usage()); @@ -144,23 +150,23 @@ fn main() -> Result<()> { assert!(layout.start_to_hole().is_empty()); } - let region1_i = file.create_region_if_needed("region1")?; - let region2_i = file.create_region_if_needed("region2")?; - let region3_i = file.create_region_if_needed("region3")?; + let (region1_i, region1) = file.create_region_if_needed("region1")?; + let (region2_i, region2) = file.create_region_if_needed("region2")?; + let (region3_i, region3) = file.create_region_if_needed("region3")?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == 0); assert!(region1.len() == 0); assert!(region1.reserved() == PAGE_SIZE); - let region2 = file.get_region(region2_i)?; + let region2 = file.get_region(region2_i.into())?; assert!(region2.start() == PAGE_SIZE); assert!(region2.len() == 0); assert!(region2.reserved() == PAGE_SIZE); - let region3 = file.get_region(region3_i)?; + let region3 = 
file.get_region(region3_i.into())?; assert!(region3.start() == PAGE_SIZE * 2); assert!(region3.len() == 0); assert!(region3.reserved() == PAGE_SIZE); @@ -179,23 +185,23 @@ fn main() -> Result<()> { assert!(layout.start_to_hole().is_empty()); } - file.remove_region(region2_i)?; + file.remove_region(region2_i.into())?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == 0); assert!(region1.len() == 0); assert!(region1.reserved() == PAGE_SIZE); - assert!(file.get_region(region2_i).is_err()); + assert!(file.get_region(region2_i.into()).is_err()); assert!( index_to_region .get(region2_i) .is_some_and(|opt| opt.is_none()) ); - let region3 = file.get_region(region3_i)?; + let region3 = file.get_region(region3_i.into())?; assert!(region3.start() == PAGE_SIZE * 2); assert!(region3.len() == 0); assert!(region3.reserved() == PAGE_SIZE); @@ -216,32 +222,35 @@ fn main() -> Result<()> { drop(regions); drop(layout); - assert!(file.remove_region(region2_i).is_ok_and(|o| o.is_none())); + assert!( + file.remove_region(region2_i.into()) + .is_ok_and(|o| o.is_none()) + ); } - let region2_i = file.create_region_if_needed("region2")?; + let (region2_i, region2) = file.create_region_if_needed("region2")?; { assert!(region2_i == 1) } - file.remove_region(region2_i)?; + file.remove_region(region2_i.into())?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == 0); assert!(region1.len() == 0); assert!(region1.reserved() == PAGE_SIZE); - assert!(file.get_region(region2_i).is_err()); + assert!(file.get_region(region2_i.into()).is_err()); assert!( index_to_region .get(region2_i) .is_some_and(|opt| opt.is_none()) ); - let 
region3 = file.get_region(region3_i)?; + let region3 = file.get_region(region3_i.into())?; assert!(region3.start() == PAGE_SIZE * 2); assert!(region3.len() == 0); assert!(region3.reserved() == PAGE_SIZE); @@ -262,26 +271,29 @@ fn main() -> Result<()> { drop(regions); drop(layout); - assert!(file.remove_region(region2_i).is_ok_and(|o| o.is_none())); + assert!( + file.remove_region(region2_i.into()) + .is_ok_and(|o| o.is_none()) + ); } - file.write_all_to_region_at(region1_i, &[1; 8000], 0)?; + file.write_all_to_region_at(region1_i.into(), &[1; 8000], 0)?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == 0); assert!(region1.len() == 8000); assert!(region1.reserved() == 2 * PAGE_SIZE); - assert!(file.get_region(region2_i).is_err()); + assert!(file.get_region(region2_i.into()).is_err()); assert!( index_to_region .get(region2_i) .is_some_and(|opt| opt.is_none()) ); - let region3 = file.get_region(region3_i)?; + let region3 = file.get_region(region3_i.into())?; assert!(region3.start() == PAGE_SIZE * 2); assert!(region3.len() == 0); assert!(region3.reserved() == PAGE_SIZE); @@ -300,21 +312,21 @@ fn main() -> Result<()> { assert!(start_to_hole.is_empty()); } - let region2_i = file.create_region_if_needed("region2")?; + let (region2_i, region2) = file.create_region_if_needed("region2")?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == 0); assert!(region1.len() == 8000); assert!(region1.reserved() == 2 * PAGE_SIZE); - let region2 = file.get_region(region2_i)?; + let region2 = file.get_region(region2_i.into())?; assert!(region2.start() == PAGE_SIZE * 3); assert!(region2.len() == 0); 
assert!(region2.reserved() == PAGE_SIZE); - let region3 = file.get_region(region3_i)?; + let region3 = file.get_region(region3_i.into())?; assert!(region3.start() == PAGE_SIZE * 2); assert!(region3.len() == 0); assert!(region3.reserved() == PAGE_SIZE); @@ -334,21 +346,21 @@ fn main() -> Result<()> { assert!(start_to_hole.is_empty()); } - file.remove_region(region3_i)?; + file.remove_region(region3_i.into())?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == 0); assert!(region1.len() == 8000); assert!(region1.reserved() == 2 * PAGE_SIZE); - let region2 = file.get_region(region2_i)?; + let region2 = file.get_region(region2_i.into())?; assert!(region2.start() == PAGE_SIZE * 3); assert!(region2.len() == 0); assert!(region2.reserved() == PAGE_SIZE); - assert!(file.get_region(region3_i).is_err()); + assert!(file.get_region(region3_i.into()).is_err()); let id_to_index = regions.id_to_index(); assert!(id_to_index.len() == 2); assert!(id_to_index.get("region1") == Some(&0)); @@ -364,21 +376,21 @@ fn main() -> Result<()> { assert!(start_to_hole.get(&(PAGE_SIZE * 2)) == Some(&PAGE_SIZE)); } - file.write_all_to_region(region1_i, &[1; 8000])?; + file.write_all_to_region(region1_i.into(), &[1; 8000])?; { let regions = file.regions(); let index_to_region = regions.index_to_region(); assert!(index_to_region.len() == 3); - let region1 = file.get_region(region1_i)?; + let region1 = file.get_region(region1_i.into())?; assert!(region1.start() == PAGE_SIZE * 4); assert!(region1.len() == 16_000); assert!(region1.reserved() == 4 * PAGE_SIZE); - let region2 = file.get_region(region2_i)?; + let region2 = file.get_region(region2_i.into())?; assert!(region2.start() == PAGE_SIZE * 3); assert!(region2.len() == 0); assert!(region2.reserved() == PAGE_SIZE); - assert!(file.get_region(region3_i).is_err()); + 
assert!(file.get_region(region3_i.into()).is_err()); let id_to_index = regions.id_to_index(); assert!(id_to_index.len() == 2); assert!(id_to_index.get("region1") == Some(&0)); @@ -394,11 +406,11 @@ fn main() -> Result<()> { assert!(start_to_hole.get(&0) == Some(&(PAGE_SIZE * 3))); } - file.write_all_to_region(region2_i, &[1; 6000])?; + file.write_all_to_region(region2_i.into(), &[1; 6000])?; - let region4_i = file.create_region_if_needed("region4")?; - file.remove_region(region2_i); - file.remove_region(region4_i); + let (region4_i, region4) = file.create_region_if_needed("region4")?; + file.remove_region(region2_i.into()); + file.remove_region(region4_i.into()); dbg!(file.regions()); dbg!(file.layout()); diff --git a/crates/brk_vecs/examples/raw.rs b/crates/brk_vecs/examples/raw.rs new file mode 100644 index 000000000..8445b8288 --- /dev/null +++ b/crates/brk_vecs/examples/raw.rs @@ -0,0 +1,151 @@ +use std::{fs, path::Path, sync::Arc}; + +use brk_core::{DateIndex, Version}; +use brk_vecs::{AnyVec, CollectableVec, File, GenericStoredVec, RawVec, Stamp, VecIterator}; + +type I = DateIndex; +#[allow(clippy::upper_case_acronyms)] +type VEC = RawVec; + +fn main() -> Result<(), Box> { + let _ = fs::remove_dir_all("raw"); + + let version = Version::TWO; + // let format = Format::Raw; + // + let file = Arc::new(File::open(Path::new("raw"))?); + + { + let mut vec: VEC = RawVec::forced_import(&file, "vec", version)?; + + (0..21_u32).for_each(|v| { + vec.push(v); + }); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0.into())); + dbg!(iter.get(1.into())); + dbg!(iter.get(2.into())); + dbg!(iter.get(20.into())); + dbg!(iter.get(21.into())); + drop(iter); + + vec.flush()?; + + dbg!(vec.header()); + } + + { + let mut vec: VEC = RawVec::forced_import(&file, "vec", version)?; + + vec.mut_header().update_stamp(Stamp::new(100)); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0.into())); + dbg!(iter.get(1.into())); + dbg!(iter.get(2.into())); + dbg!(iter.get(3.into())); + 
dbg!(iter.get(4.into())); + dbg!(iter.get(5.into())); + dbg!(iter.get(20.into())); + dbg!(iter.get(20.into())); + dbg!(iter.get(0.into())); + drop(iter); + + vec.push(21); + vec.push(22); + + let mut iter = vec.into_iter(); + dbg!(iter.get(20.into())); + dbg!(iter.get(21.into())); + dbg!(iter.get(22.into())); + dbg!(iter.get(23.into())); + drop(iter); + + vec.flush()?; + } + + { + let mut vec: VEC = RawVec::forced_import(&file, "vec", version)?; + + let mut iter = vec.into_iter(); + dbg!(iter.get(0.into())); + dbg!(iter.get(20.into())); + dbg!(iter.get(21.into())); + dbg!(iter.get(22.into())); + drop(iter); + + vec.truncate_if_needed(14.into())?; + + let mut iter = vec.into_iter(); + dbg!(iter.get(0.into())); + dbg!(iter.get(5.into())); + dbg!(iter.get(20.into())); + drop(iter); + + dbg!(vec.collect_signed_range(Some(-5), None)?); + + vec.push(vec.len() as u32); + dbg!(VecIterator::last(vec.into_iter())); + + dbg!(vec.into_iter().collect::>()); + } + + { + let mut vec: VEC = RawVec::forced_import(&file, "vec", version)?; + + vec.reset()?; + + dbg!(vec.header(), vec.pushed_len(), vec.stored_len(), vec.len()); + + (0..21_u32).for_each(|v| { + vec.push(v); + }); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0.into())); + dbg!(iter.get(20.into())); + dbg!(iter.get(21.into())); + drop(iter); + + let reader = vec.create_static_reader(); + dbg!(vec.take(10.into(), &reader)?); + dbg!(vec.get_or_read(10.into(), &reader)?); + dbg!(vec.holes()); + drop(reader); + + vec.flush()?; + dbg!(vec.holes()); + } + + { + let mut vec: VEC = RawVec::forced_import(&file, "vec", version)?; + + dbg!(vec.holes()); + + let reader = vec.create_static_reader(); + dbg!(vec.get_or_read(10.into(), &reader)?); + drop(reader); + + vec.update(10.into(), 10)?; + vec.update(0.into(), 10)?; + + let reader = vec.create_static_reader(); + dbg!( + vec.holes(), + vec.get_or_read(0.into(), &reader)?, + vec.get_or_read(10.into(), &reader)? 
+ ); + drop(reader); + + vec.flush()?; + } + + { + let vec: VEC = RawVec::forced_import(&file, "vec", version)?; + + dbg!(vec.collect()?); + } + + Ok(()) +} diff --git a/crates/brk_vecs/src/file/identifier.rs b/crates/brk_vecs/src/file/identifier.rs new file mode 100644 index 000000000..d081fb81a --- /dev/null +++ b/crates/brk_vecs/src/file/identifier.rs @@ -0,0 +1,23 @@ +#[derive(Debug, Clone)] +pub enum Identifier { + Number(usize), + String(String), +} + +impl<'a> From<&'a str> for Identifier { + fn from(value: &'a str) -> Self { + Self::String(value.to_owned()) + } +} + +impl From for Identifier { + fn from(value: String) -> Self { + Self::String(value) + } +} + +impl From for Identifier { + fn from(value: usize) -> Self { + Self::Number(value) + } +} diff --git a/crates/brk_vecs/src/file/layout.rs b/crates/brk_vecs/src/file/layout.rs index c630f78aa..848697907 100644 --- a/crates/brk_vecs/src/file/layout.rs +++ b/crates/brk_vecs/src/file/layout.rs @@ -110,15 +110,12 @@ impl Layout { .is_some_and(|&hole_start| hole_start > region_start) }) { - // dbg!("Remove last hole"); self.start_to_hole.pop_last(); } else if let Some((&hole_start, gap)) = self.start_to_hole.range_mut(..start).next_back() && hole_start + *gap == start { - // dbg!("Expand hole"); *gap += reserved; } else { - // dbg!("Insert hole"); self.start_to_hole.insert(start, reserved); } diff --git a/crates/brk_vecs/src/file/mod.rs b/crates/brk_vecs/src/file/mod.rs index 851eb2ff2..5f0792bdd 100644 --- a/crates/brk_vecs/src/file/mod.rs +++ b/crates/brk_vecs/src/file/mod.rs @@ -1,6 +1,5 @@ use std::{ fs::{self, OpenOptions}, - io::Write, os::unix::io::AsRawFd, path::{Path, PathBuf}, sync::Arc, @@ -11,14 +10,16 @@ use libc::off_t; use memmap2::{MmapMut, MmapOptions}; use parking_lot::{RwLock, RwLockReadGuard}; +mod identifier; mod layout; mod reader; mod region; mod regions; +pub use identifier::*; use layout::*; -pub use reader::Reader; -use region::*; +pub use reader::*; +pub use region::*; use 
regions::*; pub const PAGE_SIZE: u64 = 4096; @@ -65,7 +66,8 @@ impl File { pub fn set_min_len(&self, len: u64) -> Result<()> { let len = Self::ceil_number_to_page_size_multiple(len); - if self.file_len()? < len { + let file_len = self.file_len()?; + if file_len < len { let mut mmap = self.mmap.write(); let file = self.file.write(); file.set_len(len)?; @@ -83,10 +85,13 @@ impl File { self.set_min_len(regions as u64 * PAGE_SIZE) } - pub fn create_region_if_needed(&self, id: &str) -> Result { - if let Some(index) = self.regions.read().get_region_index_from_id(id) { - return Ok(index); + pub fn create_region_if_needed(&self, id: &str) -> Result<(usize, Arc>)> { + let regions = self.regions.read(); + if let Some(index) = regions.get_region_index_from_id(id) { + return Ok((index, regions.get_region_from_index(index).unwrap())); } + drop(regions); + let mut regions = self.regions.write(); let mut layout = self.layout.write(); @@ -102,51 +107,64 @@ impl File { region.start() + region.reserved() }) .unwrap_or_default(); - self.set_min_len(start + PAGE_SIZE)?; + + let len = start + PAGE_SIZE; + + self.set_min_len(len)?; + start }; - let index = regions.create_region(id.to_owned(), start)?; + let (index, region) = regions.create_region(id.to_owned(), start)?; layout.insert_region(start, index); - Ok(index) + Ok((index, region)) } - pub fn get_region(&self, index: usize) -> Result> { + pub fn get_region(&self, identifier: Identifier) -> Result> { let regions = self.regions.read(); - let region_opt = regions.get_region_from_index(index); + let region_opt = regions.get_region(identifier); let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?; let region = region_arc.read(); let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) }; Ok(region) } - pub fn read_region<'a>(&'a self, index: usize) -> Result> { + pub fn create_region_reader<'a>(&'a self, identifier: Identifier) -> Result> { let mmap: RwLockReadGuard<'a, MmapMut> = 
self.mmap.read(); - let region = self.get_region(index)?; + let region = self.get_region(identifier)?; Ok(Reader::new(mmap, region)) } #[inline] - pub fn write_all_to_region(&self, region: usize, data: &[u8]) -> Result<()> { - self.write_all_to_region_at_(region, data, None) + pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> { + self.write_all_to_region_at_(identifier, data, None) } #[inline] - pub fn write_all_to_region_at(&self, region: usize, data: &[u8], at: u64) -> Result<()> { - self.write_all_to_region_at_(region, data, Some(at)) + pub fn write_all_to_region_at( + &self, + identifier: Identifier, + data: &[u8], + at: u64, + ) -> Result<()> { + self.write_all_to_region_at_(identifier, data, Some(at)) } fn write_all_to_region_at_( &self, - region_index: usize, + identifier: Identifier, data: &[u8], at: Option, ) -> Result<()> { - let Some(region) = self.regions.read().get_region_from_index(region_index) else { + let regions = self.regions.read(); + let Some(region) = regions.get_region(identifier.clone()) else { return Err(Error::Str("Unknown region")); }; + let region_index = regions.identifier_to_index(identifier).unwrap(); + drop(regions); + let region_lock = region.read(); let start = region_lock.start(); let reserved = region_lock.reserved(); @@ -300,8 +318,9 @@ impl File { slice[start..end].copy_from_slice(data); } - pub fn truncate_region(&self, index: usize, from: u64) -> Result<()> { - let Some(region) = self.regions.read().get_region_from_index(index) else { + /// From relative to start + pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> { + let Some(region) = self.regions.read().get_region(identifier) else { return Err(Error::Str("Unknown region")); }; let mut region_ = region.write(); @@ -309,16 +328,18 @@ impl File { let len = region_.len(); let reserved = region_.reserved(); - if from < start { - return Err(Error::Str("Truncating too much")); - } else if from >= len { - return 
Err(Error::Str("Not truncating enough")); + // dbg!(from, start); + + if from == len { + return Ok(()); + } else if from > len { + return Err(Error::Str("Truncating further than length")); } region_.set_len(from); let end = start + reserved; - let start = Self::ceil_number_to_page_size_multiple(from); + let start = Self::ceil_number_to_page_size_multiple(start + from); if start > end { unreachable!("Should not be possible"); } else if start < end { @@ -327,18 +348,27 @@ impl File { Ok(()) } - pub fn remove_region(&self, index: usize) -> Result>>> { + pub fn remove_region(&self, identifier: Identifier) -> Result>>> { let mut regions = self.regions.write(); + let mut layout = self.layout.write(); - let Some(region) = regions.remove_region(index)? else { + + let index_opt = regions.identifier_to_index(identifier.clone()); + + let Some(region) = regions.remove_region(identifier)? else { return Ok(None); }; - // dbg!(&regions); + + let index = index_opt.unwrap(); + let region_ = region.write(); + layout.remove_region(index, &region_)?; - // dbg!(layout); + self.punch_hole(region_.start(), region_.reserved())?; + drop(region_); + Ok(Some(region)) } @@ -400,7 +430,7 @@ impl File { } pub fn flush(&self) -> Result<()> { - self.file.write().flush().map_err(|e| e.into()) + self.mmap.write().flush().map_err(|e| e.into()) } pub fn sync_data(&self) -> Result<()> { diff --git a/crates/brk_vecs/src/file/reader.rs b/crates/brk_vecs/src/file/reader.rs index 372e1b551..88c151a7f 100644 --- a/crates/brk_vecs/src/file/reader.rs +++ b/crates/brk_vecs/src/file/reader.rs @@ -18,12 +18,16 @@ impl<'a> Reader<'a> { } pub fn read(&self, offset: u64, len: u64) -> &[u8] { - debug_assert!(offset + len < self.region.len()); + debug_assert!(offset + len <= self.region.len()); let start = self.region.start() + offset; let end = start + len; &self.mmap[start as usize..end as usize] } + pub fn read_all(&self) -> &[u8] { + self.read(0, self.region().len()) + } + pub fn region(&self) -> &Region { 
&self.region } diff --git a/crates/brk_vecs/src/file/region.rs b/crates/brk_vecs/src/file/region.rs index 0bff03d92..3803074be 100644 --- a/crates/brk_vecs/src/file/region.rs +++ b/crates/brk_vecs/src/file/region.rs @@ -1,6 +1,8 @@ +use memmap2::MmapMut; +use parking_lot::RwLockReadGuard; use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; -use crate::PAGE_SIZE; +use super::{File, PAGE_SIZE, Reader}; #[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)] #[repr(C)] @@ -62,3 +64,16 @@ impl Region { self.reserved - self.len } } + +pub trait RegionReader { + fn create_reader(self, file: &File) -> Reader<'_>; +} + +impl<'a> RegionReader for RwLockReadGuard<'a, Region> { + fn create_reader(self, file: &File) -> Reader<'static> { + let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(self) }; + let mmap: RwLockReadGuard<'static, MmapMut> = + unsafe { std::mem::transmute(file.mmap.read()) }; + Reader::new(mmap, region) + } +} diff --git a/crates/brk_vecs/src/file/regions.rs b/crates/brk_vecs/src/file/regions.rs index e33414311..0a3a5cfd2 100644 --- a/crates/brk_vecs/src/file/regions.rs +++ b/crates/brk_vecs/src/file/regions.rs @@ -12,9 +12,9 @@ use memmap2::MmapMut; use parking_lot::RwLock; use zerocopy::{FromBytes, IntoBytes}; -use crate::{ - PAGE_SIZE, - file::region::{Region, SIZE_OF_REGION}, +use super::{ + Identifier, PAGE_SIZE, + region::{Region, SIZE_OF_REGION}, }; #[derive(Debug)] @@ -92,7 +92,11 @@ impl Regions { Ok(()) } - pub fn create_region(&mut self, id: String, start: u64) -> Result { + pub fn create_region( + &mut self, + id: String, + start: u64, + ) -> Result<(usize, Arc>)> { let index = self .index_to_region .iter() @@ -103,36 +107,25 @@ impl Regions { let region = Region::new(start, 0, PAGE_SIZE); - let region_arc = Some(Arc::new(RwLock::new(region.clone()))); - if index < self.index_to_region.len() { - self.index_to_region[index] = region_arc - } else { - self.index_to_region.push(region_arc); - } 
- self.set_min_len(((index + 1) * SIZE_OF_REGION) as u64)?; self.write_to_mmap(&region, index); + let region_arc = Arc::new(RwLock::new(region)); + + let region_opt = Some(region_arc.clone()); + if index < self.index_to_region.len() { + self.index_to_region[index] = region_opt + } else { + self.index_to_region.push(region_opt); + } + if self.id_to_index.insert(id, index).is_some() { return Err(Error::Str("Already exists")); } self.flush_id_to_index()?; - Ok(index) - } - - pub fn remove_region(&mut self, index: usize) -> Result>>> { - let Some(region) = self.index_to_region.get_mut(index).and_then(Option::take) else { - return Ok(None); - }; - - self.id_to_index - .remove(&self.find_id_from_index(index).unwrap().to_owned()); - - self.flush_id_to_index()?; - - Ok(Some(region)) + Ok((index, region_arc)) } fn flush_id_to_index(&mut self) -> Result<()> { @@ -141,10 +134,26 @@ impl Regions { Ok(()) } + #[inline] + pub fn get_region(&self, identifier: Identifier) -> Option>> { + match identifier { + Identifier::Number(index) => self.get_region_from_index(index), + Identifier::String(id) => self.get_region_from_id(&id), + } + } + + #[inline] pub fn get_region_from_index(&self, index: usize) -> Option>> { self.index_to_region.get(index).cloned().flatten() } + #[inline] + pub fn get_region_from_id(&self, id: &str) -> Option>> { + self.get_region_index_from_id(id) + .and_then(|index| self.get_region_from_index(index)) + } + + #[inline] pub fn get_region_index_from_id(&self, id: &str) -> Option { self.id_to_index.get(id).copied() } @@ -159,22 +168,65 @@ impl Regions { ) } + #[inline] pub fn index_to_region(&self) -> &[Option>>] { &self.index_to_region } + #[inline] pub fn id_to_index(&self) -> &HashMap { &self.id_to_index } + #[inline] + pub fn identifier_to_index(&self, identifier: Identifier) -> Option { + match identifier { + Identifier::Number(index) => Some(index), + Identifier::String(id) => self.get_region_index_from_id(&id), + } + } + + pub fn remove_region(&mut self, 
identifier: Identifier) -> Result>>> { + match identifier { + Identifier::Number(index) => self.remove_region_from_index(index), + Identifier::String(id) => self.remove_region_from_id(&id), + } + } + + pub fn remove_region_from_id(&mut self, id: &str) -> Result>>> { + let Some(index) = self.get_region_index_from_id(id) else { + return Ok(None); + }; + self.remove_region_from_index(index) + } + + pub fn remove_region_from_index( + &mut self, + index: usize, + ) -> Result>>> { + let Some(region) = self.index_to_region.get_mut(index).and_then(Option::take) else { + return Ok(None); + }; + + self.id_to_index + .remove(&self.find_id_from_index(index).unwrap().to_owned()); + + self.flush_id_to_index()?; + + Ok(Some(region)) + } + pub fn write_to_mmap(&self, region: &Region, index: usize) { let start = index * SIZE_OF_REGION; let end = start + SIZE_OF_REGION; let mmap = &self.index_to_region_mmap; + if end > mmap.len() { unreachable!("Trying to write beyond mmap") } + let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) }; + slice[start..end].copy_from_slice(region.as_bytes()); } } diff --git a/crates/brk_vecs/src/lib.rs b/crates/brk_vecs/src/lib.rs index 429d3e99d..3fd098fcf 100644 --- a/crates/brk_vecs/src/lib.rs +++ b/crates/brk_vecs/src/lib.rs @@ -3,8 +3,8 @@ mod traits; mod variants; use file::*; -use traits::*; use variants::*; -pub use file::File; +pub use file::{File, PAGE_SIZE}; +pub use traits::*; pub use variants::{RawVec, Stamp, StampedVec}; diff --git a/crates/brk_vecs/src/traits/generic.rs b/crates/brk_vecs/src/traits/generic.rs index 9c880af11..52c26dcce 100644 --- a/crates/brk_vecs/src/traits/generic.rs +++ b/crates/brk_vecs/src/traits/generic.rs @@ -5,9 +5,8 @@ use std::{ }; use brk_core::{Error, Result}; -use memmap2::Mmap; -use crate::{AnyVec, HEADER_OFFSET, Header}; +use crate::{AnyVec, File, HEADER_OFFSET, Header, file::Reader}; use super::{StoredIndex, StoredType}; @@ -20,21 +19,21 @@ where const SIZE_OF_T: usize 
= size_of::(); #[inline] - fn unwrap_read(&self, index: I, mmap: &Mmap) -> T { - self.read(index, mmap).unwrap().unwrap() + fn unwrap_read(&self, index: I, reader: &Reader<'_>) -> T { + self.read(index, reader).unwrap().unwrap() } #[inline] - fn read(&self, index: I, mmap: &Mmap) -> Result> { - self.read_(index.to_usize()?, mmap) + fn read(&self, index: I, reader: &Reader<'_>) -> Result> { + self.read_(index.to_usize()?, reader) } - fn read_(&self, index: usize, mmap: &Mmap) -> Result>; + fn read_(&self, index: usize, reader: &Reader<'_>) -> Result>; #[inline] - fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { - self.get_or_read_(index.to_usize()?, mmap) + fn get_or_read(&self, index: I, reader: &Reader<'_>) -> Result>> { + self.get_or_read_(index.to_usize()?, reader) } #[inline] - fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result>> { + fn get_or_read_(&self, index: usize, reader: &Reader<'_>) -> Result>> { let stored_len = self.stored_len(); if index >= stored_len { @@ -58,7 +57,7 @@ where return Ok(None); } - Ok(self.read_(index, mmap)?.map(Cow::Owned)) + Ok(self.read_(index, reader)?.map(Cow::Owned)) } #[inline] @@ -66,10 +65,6 @@ where self.stored_len() + self.pushed_len() } - fn index_to_name(&self) -> String { - format!("{}_to_{}", I::to_string(), self.name()) - } - fn stored_len(&self) -> usize; fn pushed(&self) -> &[T]; @@ -122,8 +117,8 @@ where fn holes(&self) -> &BTreeSet; fn mut_holes(&mut self) -> &mut BTreeSet; - fn take(&mut self, index: I, mmap: &Mmap) -> Result> { - let opt = self.get_or_read(index, mmap)?.map(|v| v.into_owned()); + fn take(&mut self, index: I, reader: &Reader<'_>) -> Result> { + let opt = self.get_or_read(index, reader)?.map(|v| v.into_owned()); if opt.is_some() { self.unchecked_delete(index); } @@ -179,12 +174,9 @@ where #[inline] fn reset_(&mut self) -> Result<()> { - let holes_path = self.holes_path(); - if fs::exists(&holes_path)? 
{ - fs::remove_file(&holes_path)?; - } - let mut file = self.open_file()?; - self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[]) + self.file().remove_region(self.holes_region_name().into())?; + self.file() + .truncate_region(self.region_index().into(), HEADER_OFFSET as u64) } #[inline] @@ -201,7 +193,49 @@ where index < self.len_() } + fn file(&self) -> &File; + + fn region_index(&self) -> usize; + + /// Be careful with deadlocks + /// + /// You'll want to drop the reader before mutable ops + fn create_reader(&self) -> Reader<'_> { + self.create_static_reader() + } + + /// Be careful with deadlocks + /// + /// You'll want to drop the reader before mutable ops + fn create_static_reader(&self) -> Reader<'static> { + unsafe { + std::mem::transmute( + self.file() + .create_region_reader(self.region_index().into()) + .unwrap(), + ) + } + } + fn flush(&mut self) -> Result<()>; fn truncate_if_needed(&mut self, index: I) -> Result<()>; + + fn index_to_name(&self) -> String { + format!("{}_to_{}", I::to_string(), self.name()) + } + + fn vec_region_name(&self) -> String { + Self::vec_region_name_(self.name()) + } + fn vec_region_name_(name: &str) -> String { + format!("{name}_{}", I::to_string()) + } + + fn holes_region_name(&self) -> String { + Self::holes_region_name_(self.name()) + } + fn holes_region_name_(name: &str) -> String { + format!("{}_holes", Self::vec_region_name_(name)) + } } diff --git a/crates/brk_vecs/src/variants/compressed/mod.rs b/crates/brk_vecs/src/variants/compressed/mod.rs index e69de29bb..794dcb29e 100644 --- a/crates/brk_vecs/src/variants/compressed/mod.rs +++ b/crates/brk_vecs/src/variants/compressed/mod.rs @@ -0,0 +1,556 @@ +use std::{ + borrow::Cow, + collections::{BTreeMap, BTreeSet}, + fs, mem, + sync::Arc, +}; + +use brk_core::{Error, Result, Version}; +use memmap2::Mmap; +use parking_lot::{RwLock, RwLockReadGuard}; +use rayon::prelude::*; +use zstd::DEFAULT_COMPRESSION_LEVEL; + +use crate::{ + AnyCollectableVec, 
AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, + File, GenericStoredVec, HEADER_OFFSET, Header, RawVec, Reader, StoredIndex, StoredType, + UnsafeSlice, +}; + +mod compressed_page_meta; +mod compressed_pages_meta; + +use compressed_page_meta::*; +use compressed_pages_meta::*; + +const ONE_KIB: usize = 1024; +const ONE_MIB: usize = ONE_KIB * ONE_KIB; +pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; +pub const MAX_PAGE_SIZE: usize = 64 * ONE_KIB; + +const VERSION: Version = Version::TWO; + +#[derive(Debug)] +pub struct CompressedVec { + inner: RawVec, + pages_meta: Arc>, +} + +impl CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; + pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; + pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; + + /// Same as import but will reset the vec under certain errors, so be careful ! + pub fn forced_import(file: &Arc, name: &str, mut version: Version) -> Result { + version = version + VERSION; + let res = Self::import(file, name, version); + match res { + Err(Error::DifferentCompressionMode) + | Err(Error::WrongEndian) + | Err(Error::WrongLength) + | Err(Error::DifferentVersion { .. }) => { + todo!(); + + // let path = Self::path_(file, name); + // fs::remove_file(path)?; + // Self::import(file, name, version) + } + _ => res, + } + } + + #[allow(unreachable_code, unused_variables)] + pub fn import(file: &Arc, name: &str, version: Version) -> Result { + // let mut inner = RawVec::import(file, name, version)?; + + todo!(); + + // let pages_meta = { + // let path = inner + // .folder() + // .join(format!("{}-pages-meta", I::to_string())); + // if inner.is_empty() { + // let _ = fs::remove_file(&path); + // } + // CompressedPagesMetadata::read(&path)? 
+ // }; + + // inner.set_stored_len(if let Some(last) = pages_meta.last() { + // (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize + // } else { + // 0 + // }); + + // Ok(Self { + // inner, + // pages_meta: Arc::new(RwLock::new(pages_meta)), + // }) + } + + fn decode_page(&self, page_index: usize, reader: &Reader<'_>) -> Result> { + Self::decode_page_( + self.stored_len(), + page_index, + reader, + &self.pages_meta.read(), + ) + } + + fn decode_page_( + stored_len: usize, + page_index: usize, + reader: &Reader<'_>, + compressed_pages_meta: &CompressedPagesMetadata, + ) -> Result> { + if Self::page_index_to_index(page_index) >= stored_len { + return Err(Error::IndexTooHigh); + } else if compressed_pages_meta.len() <= page_index { + return Err(Error::ExpectVecToHaveIndex); + } + + let page = compressed_pages_meta.get(page_index).unwrap(); + let len = page.bytes_len as usize; + let offset = page.start as usize; + + let slice = reader.read(offset as u64, (offset + len) as u64); + + Ok(zstd::decode_all(slice) + .inspect_err(|_| { + dbg!((len, offset, page_index, slice)); + })? 
+ .chunks(Self::SIZE_OF_T) + .map(|slice| T::try_read_from_bytes(slice).unwrap()) + .collect::>()) + } + + fn compress_page(chunk: &[T]) -> Vec { + if chunk.len() > Self::PER_PAGE { + panic!(); + } + + let mut bytes: Vec = vec![0; chunk.len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + chunk + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); + + zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap() + } + + #[inline] + fn index_to_page_index(index: usize) -> usize { + index / Self::PER_PAGE + } + + #[inline] + fn page_index_to_index(page_index: usize) -> usize { + page_index * Self::PER_PAGE + } + + #[inline] + pub fn iter(&self) -> CompressedVecIterator<'_, I, T> { + self.into_iter() + } + + #[inline] + pub fn iter_at(&self, i: I) -> CompressedVecIterator<'_, I, T> { + self.iter_at_(i.unwrap_to_usize()) + } + + #[inline] + pub fn iter_at_(&self, i: usize) -> CompressedVecIterator<'_, I, T> { + let mut iter = self.into_iter(); + iter.set_(i); + iter + } +} + +impl GenericStoredVec for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn read_(&self, index: usize, reader: &Reader<'_>) -> Result> { + let page_index = Self::index_to_page_index(index); + let decoded_index = index % Self::PER_PAGE; + + Ok(self + .decode_page(page_index, reader)? 
+ .get(decoded_index) + .cloned()) + } + + fn file(&self) -> &File { + self.inner.file() + } + + fn region_index(&self) -> usize { + self.inner.region_index() + } + + fn header(&self) -> &Header { + self.inner.header() + } + + fn mut_header(&mut self) -> &mut Header { + self.inner.mut_header() + } + + #[inline] + fn stored_len(&self) -> usize { + self.inner.stored_len() + } + + #[inline] + fn pushed(&self) -> &[T] { + self.inner.pushed() + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + self.inner.mut_pushed() + } + #[inline] + fn holes(&self) -> &BTreeSet { + self.inner.holes() + } + #[inline] + fn mut_holes(&mut self) -> &mut BTreeSet { + panic!("unsupported") + } + #[inline] + fn updated(&self) -> &BTreeMap { + self.inner.updated() + } + #[inline] + fn mut_updated(&mut self) -> &mut BTreeMap { + panic!("unsupported") + } + + fn flush(&mut self) -> Result<()> { + todo!(); + + // let file_opt = self.inner.write_header_if_needed()?; + + // let pushed_len = self.pushed_len(); + + // if pushed_len == 0 { + // return Ok(()); + // } + + // let stored_len = self.stored_len(); + + // let mut file = file_opt.unwrap_or(self.open_file()?); + + let mut pages_meta = self.pages_meta.read(); + + // let mut starting_page_index = pages_meta.len(); + // let mut values = vec![]; + // let mut truncate_at = None; + + // if self.stored_len() % Self::PER_PAGE != 0 { + // if pages_meta.is_empty() { + // unreachable!() + // } + + // let last_page_index = pages_meta.len() - 1; + + // let mmap = unsafe { Mmap::map(&file)? 
}; + + // values = Self::decode_page_(stored_len, last_page_index, &mmap, &pages_meta) + // .inspect_err(|_| { + // dbg!(last_page_index, &pages_meta); + // }) + // .unwrap(); + + // truncate_at.replace(pages_meta.pop().unwrap().start); + // starting_page_index = last_page_index; + // } + + // let compressed = values + // .into_par_iter() + // .chain(mem::take(self.mut_pushed()).into_par_iter()) + // .chunks(Self::PER_PAGE) + // .map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len())) + // .collect::>(); + + // compressed + // .iter() + // .enumerate() + // .for_each(|(i, (compressed_bytes, values_len))| { + // let page_index = starting_page_index + i; + + // let start = if page_index != 0 { + // let prev = pages_meta.get(page_index - 1).unwrap(); + // prev.start + prev.bytes_len as u64 + // } else { + // 0 + // }; + // let offsetted_start = start + HEADER_OFFSET as u64; + + // let bytes_len = compressed_bytes.len() as u32; + // let values_len = *values_len as u32; + + // let page = CompressedPageMetadata::new(offsetted_start, bytes_len, values_len); + + // pages_meta.push(page_index, page); + // }); + + // let buf = compressed + // .into_iter() + // .flat_map(|(v, _)| v) + // .collect::>(); + + pages_meta.write()?; + + // if let Some(truncate_at) = truncate_at { + // self.file_set_len(&mut file, truncate_at)?; + // } + + // self.file_write_all(&mut file, &buf)?; + + // self.pages_meta.store(Arc::new(pages_meta)); + + Ok(()) + } + + fn reset(&mut self) -> Result<()> { + // let mut pages_meta = (**self.pages_meta.load()).clone(); + // pages_meta.truncate(0); + // pages_meta.write()?; + // self.pages_meta.store(Arc::new(pages_meta)); + self.reset_() + } + + fn truncate_if_needed(&mut self, index: I) -> Result<()> { + let index = index.to_usize()?; + + if index >= self.stored_len() { + return Ok(()); + } + + if index == 0 { + self.reset()?; + return Ok(()); + } + + let mut pages_meta = self.pages_meta.write(); + + let page_index = 
Self::index_to_page_index(index); + + let reader = self.create_static_reader(); + let values = self.decode_page(page_index, &reader)?; + drop(reader); + + let mut buf = vec![]; + + let mut page = pages_meta.truncate(page_index).unwrap(); + + let len = page.start; + + let decoded_index = index % Self::PER_PAGE; + + if decoded_index != 0 { + let chunk = &values[..decoded_index]; + + buf = Self::compress_page(chunk); + + page.values_len = chunk.len() as u32; + page.bytes_len = buf.len() as u32; + + pages_meta.push(page_index, page); + } + + pages_meta.write()?; + + // self.file_truncate_and_write_all(&mut file, len, &buf)?; + + Ok(()) + } +} + +impl AnyVec for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn version(&self) -> Version { + self.inner.version() + } + + #[inline] + fn name(&self) -> &str { + self.inner.name() + } + + #[inline] + fn len(&self) -> usize { + self.len_() + } + + #[inline] + fn index_type_to_string(&self) -> &'static str { + I::to_string() + } + + #[inline] + fn value_type_to_size_of(&self) -> usize { + size_of::() + } +} + +impl Clone for CompressedVec { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + pages_meta: self.pages_meta.clone(), + } + } +} + +#[derive(Debug)] +pub struct CompressedVecIterator<'a, I, T> { + vec: &'a CompressedVec, + reader: Reader<'a>, + decoded_page: Option<(usize, Vec)>, + pages_meta: RwLockReadGuard<'a, CompressedPagesMetadata>, + stored_len: usize, + index: usize, +} + +impl CompressedVecIterator<'_, I, T> +where + I: StoredIndex, + T: StoredType, +{ + const SIZE_OF_T: usize = size_of::(); + const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; +} + +impl BaseVecIterator for CompressedVecIterator<'_, I, T> +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn mut_index(&mut self) -> &mut usize { + &mut self.index + } + + #[inline] + fn len(&self) -> usize { + self.vec.len() + } + + #[inline] + fn name(&self) -> &str { + self.vec.name() + } +} + +impl<'a, I, 
T> Iterator for CompressedVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + + fn next(&mut self) -> Option { + let i = self.index; + let stored_len = self.stored_len; + + let result = if i >= stored_len { + let j = i - stored_len; + if j >= self.vec.pushed_len() { + return None; + } + self.vec + .pushed() + .get(j) + .map(|v| (I::from(i), Cow::Borrowed(v))) + } else { + let page_index = i / Self::PER_PAGE; + + if self.decoded_page.as_ref().is_none_or(|b| b.0 != page_index) { + let values = CompressedVec::::decode_page_( + stored_len, + page_index, + &self.reader, + &self.pages_meta, + ) + .unwrap(); + self.decoded_page.replace((page_index, values)); + } + + self.decoded_page + .as_ref() + .unwrap() + .1 + .get(i % Self::PER_PAGE) + .map(|v| (I::from(i), Cow::Owned(v.clone()))) + }; + + self.index += 1; + + result + } +} + +impl<'a, I, T> IntoIterator for &'a CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + type IntoIter = CompressedVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + let pages_meta = self.pages_meta.read(); + let stored_len = self.stored_len(); + + CompressedVecIterator { + vec: self, + reader: self.create_static_reader(), + decoded_page: None, + pages_meta, + index: 0, + stored_len, + } + } +} + +impl AnyIterableVec for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> + where + T: 'a, + { + Box::new(self.into_iter()) + } +} + +impl AnyCollectableVec for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result> { + CollectableVec::collect_range_serde_json(self, from, to) + } +} diff --git a/crates/brk_vecs/src/variants/raw/header.rs b/crates/brk_vecs/src/variants/raw/header.rs index 12e969384..f8f1d9d31 100644 --- a/crates/brk_vecs/src/variants/raw/header.rs +++ 
b/crates/brk_vecs/src/variants/raw/header.rs @@ -1,16 +1,11 @@ -use std::{ - fs::File, - io::{self, Seek, SeekFrom}, - os::unix::fs::FileExt, - sync::Arc, -}; +use std::sync::Arc; use brk_core::{Error, Result, Version}; use parking_lot::RwLock; use zerocopy::{FromBytes, IntoBytes}; use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; -use crate::Stamp; +use crate::{File, Stamp}; use super::Format; @@ -24,8 +19,13 @@ pub struct Header { } impl Header { - pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { - let inner = HeaderInner::create_and_write(file, vec_version, format)?; + pub fn create_and_write( + file: &File, + region_index: usize, + vec_version: Version, + format: Format, + ) -> Result { + let inner = HeaderInner::create_and_write(file, region_index, vec_version, format)?; Ok(Self { inner: Arc::new(RwLock::new(inner)), modified: false, @@ -33,11 +33,14 @@ impl Header { } pub fn import_and_verify( - file: &mut File, + file: &File, + region_index: usize, + region_len: u64, vec_version: Version, format: Format, ) -> Result { - let inner = HeaderInner::import_and_verify(file, vec_version, format)?; + let inner = + HeaderInner::import_and_verify(file, region_index, region_len, vec_version, format)?; Ok(Self { inner: Arc::new(RwLock::new(inner)), modified: false, @@ -70,15 +73,11 @@ impl Header { self.inner.read().stamp } - pub fn write(&mut self, file: &mut File) -> io::Result<()> { - self.inner.read().write(file)?; + pub fn write(&mut self, file: &File, region_index: usize) -> Result<()> { + self.inner.read().write(file, region_index)?; self.modified = false; Ok(()) } - - pub fn inner(&self) -> &Arc> { - &self.inner - } } #[repr(C)] @@ -92,7 +91,12 @@ struct HeaderInner { } impl HeaderInner { - pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { + pub fn create_and_write( + file: &File, + region_index: usize, + vec_version: Version, + format: Format, + ) -> Result { 
let header = Self { header_version: HEADER_VERSION, vec_version, @@ -100,30 +104,30 @@ impl HeaderInner { stamp: Stamp::default(), compressed: ZeroCopyBool::from(format), }; - header.write(file)?; - file.seek(SeekFrom::End(0))?; + header.write(file, region_index)?; Ok(header) } - pub fn write(&self, file: &mut File) -> io::Result<()> { - file.write_all_at(self.as_bytes(), 0) + pub fn write(&self, file: &File, region_index: usize) -> Result<()> { + file.write_all_to_region_at(region_index.into(), self.as_bytes(), 0) } pub fn import_and_verify( - file: &mut File, + file: &File, + region_index: usize, + region_len: u64, vec_version: Version, format: Format, ) -> Result { - let len = file.metadata()?.len(); + let len = region_len; if len < HEADER_OFFSET as u64 { return Err(Error::WrongLength); } - let mut buf = [0; HEADER_OFFSET]; - file.read_exact_at(&mut buf, 0)?; - - let header = HeaderInner::read_from_bytes(&buf)?; + let reader = file.create_region_reader(region_index.into())?; + let slice = reader.read(0, HEADER_OFFSET as u64); + let header = HeaderInner::read_from_bytes(slice)?; if header.header_version != HEADER_VERSION { return Err(Error::DifferentVersion { diff --git a/crates/brk_vecs/src/variants/raw/mod.rs b/crates/brk_vecs/src/variants/raw/mod.rs index f3a636325..bebdd995a 100644 --- a/crates/brk_vecs/src/variants/raw/mod.rs +++ b/crates/brk_vecs/src/variants/raw/mod.rs @@ -1,21 +1,18 @@ use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet}, - fs, io, marker::PhantomData, mem, - os::unix::fs::FileExt, sync::Arc, }; use brk_core::{Error, Result, Version}; -use memmap2::Mmap; use rayon::prelude::*; -use zerocopy::IntoBytes; use crate::{ AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - File, GenericStoredVec, StoredIndex, StoredType, file::Reader, + File, GenericStoredVec, StoredIndex, StoredType, + file::{Reader, RegionReader}, }; use super::Format; @@ -30,8 +27,8 @@ const VERSION: Version = Version::ONE; 
#[derive(Debug)] pub struct RawVec { - region: usize, file: Arc, + region_index: usize, header: Header, name: &'static str, @@ -47,7 +44,7 @@ where I: StoredIndex, T: StoredType, { - /// Same as import but will reset the folder under certain errors, so be careful ! + /// Same as import but will reset the vec under certain errors, so be careful ! pub fn forced_import(file: &Arc, name: &str, mut version: Version) -> Result { version = version + VERSION; let res = Self::import(file, name, version); @@ -56,11 +53,8 @@ where | Err(Error::WrongEndian) | Err(Error::WrongLength) | Err(Error::DifferentVersion { .. }) => { - fs::remove_file(path)?; - let holes_path = Self::holes_path_(parent, name); - if fs::exists(&holes_path)? { - fs::remove_file(holes_path)?; - } + let _ = file.remove_region(Self::vec_region_name_(name).into()); + let _ = file.remove_region(Self::holes_region_name_(name).into()); Self::import(file, name, version) } _ => res, @@ -68,45 +62,33 @@ where } pub fn import(file: &Arc, name: &str, version: Version) -> Result { - let region = file.create_region_if_needed(&format!("{name}_{}", I::to_string()))?; + let (region_index, region) = file.create_region_if_needed(&Self::vec_region_name_(name))?; - let (header, file) = match Self::open_file_(&path) { - Ok(mut file) => { - if file.metadata()?.len() == 0 { - ( - Header::create_and_write(&mut file, version, Format::Raw)?, - Some(file), - ) - } else { - ( - Header::import_and_verify(&mut file, version, Format::Raw)?, - Some(file), - ) - } - } - Err(e) => match e.kind() { - io::ErrorKind::NotFound => { - fs::create_dir_all(Self::folder_(parent, name))?; - let mut file = Self::open_file_(&path)?; - let header = Header::create_and_write(&mut file, version, Format::Raw)?; - (header, None) - } - _ => { - return Err(e.into()); - } - }, - }; + let region_len = region.read().len() as usize; + if region_len > 0 + && (region_len < HEADER_OFFSET || (region_len - HEADER_OFFSET) % Self::SIZE_OF_T != 0) + { + 
dbg!(region_len); + return Err(Error::Str("Region was saved incorrectly")); + } - let stored_len = if let Some(file) = file { - (file.metadata()?.len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T + let header = if region_len == 0 { + Header::create_and_write(file, region_index, version, Format::Raw)? } else { - 0 + Header::import_and_verify( + file, + region_index, + region.read().len(), + version, + Format::Raw, + )? }; - let holes_path = Self::holes_path_(parent, name); - let holes = if fs::exists(&holes_path)? { + let holes = if let Ok(holes) = file.get_region(Self::holes_region_name_(name).into()) { Some( - fs::read(&holes_path)? + holes + .create_reader(file) + .read_all() .chunks(size_of::()) .map(|b| -> Result { Ok(usize::from_ne_bytes(brk_core::copy_first_8bytes(b)?)) @@ -118,8 +100,8 @@ where }; Ok(Self { - file, - region, + file: file.clone(), + region_index, header, name: Box::leak(Box::new(name.to_string())), pushed: vec![], @@ -149,11 +131,7 @@ where pub fn write_header_if_needed(&mut self) -> Result<()> { if self.header.modified() { - self.file.write_all_to_region_at( - self.region, - self.header.inner().read().as_bytes(), - 0, - )?; + self.header.write(&self.file, self.region_index)?; } Ok(()) } @@ -165,9 +143,11 @@ where T: StoredType, { #[inline] - fn read_(&self, index: usize, mmap: &Mmap) -> Result> { - let index = index * Self::SIZE_OF_T + HEADER_OFFSET; - let slice = &mmap[index..(index + Self::SIZE_OF_T)]; + fn read_(&self, index: usize, reader: &Reader<'_>) -> Result> { + let slice = reader.read( + (index * Self::SIZE_OF_T + HEADER_OFFSET) as u64, + (Self::SIZE_OF_T) as u64, + ); T::try_read_from_bytes(slice) .map(|v| Some(v)) .map_err(Error::from) @@ -183,7 +163,13 @@ where #[inline] fn stored_len(&self) -> usize { - self.file.get_region(self.region).unwrap().len() as usize / Self::SIZE_OF_T + (self + .file + .get_region(self.region_index.into()) + .unwrap() + .len() as usize + - HEADER_OFFSET) + / Self::SIZE_OF_T } #[inline] @@ -214,7 
+200,7 @@ where } fn flush(&mut self) -> Result<()> { - let file_opt = self.write_header_if_needed()?; + self.write_header_if_needed()?; let pushed_len = self.pushed_len(); @@ -228,7 +214,7 @@ where } if has_new_data || has_updated_data { - let mut file = file_opt.unwrap_or(self.open_file()?); + let file = &self.file; if has_new_data { let bytes = { @@ -246,36 +232,37 @@ where bytes }; - self.file_write_all(&mut file, &bytes)?; + file.write_all_to_region(self.region_index.into(), &bytes)?; } if has_updated_data { mem::take(&mut self.updated) .into_iter() .try_for_each(|(i, v)| -> Result<()> { - file.write_all_at( - v.as_bytes(), - ((i * Self::SIZE_OF_T) + HEADER_OFFSET) as u64, - )?; + let bytes = v.as_bytes(); + let at = ((i * Self::SIZE_OF_T) + HEADER_OFFSET) as u64; + file.write_all_to_region_at(self.region_index.into(), bytes, at)?; Ok(()) })?; } } if has_holes || had_holes { - let holes_path = self.holes_path(); if has_holes { self.has_stored_holes = true; - fs::write( - &holes_path, - self.holes - .iter() - .flat_map(|i| i.to_ne_bytes()) - .collect::>(), - )?; + let (holes_index, _) = self + .file + .create_region_if_needed(&self.holes_region_name())?; + self.file.truncate_region(holes_index.into(), 0)?; + let bytes = self + .holes + .iter() + .flat_map(|i| i.to_ne_bytes()) + .collect::>(); + self.file.write_all_to_region(holes_index.into(), &bytes)?; } else if had_holes { self.has_stored_holes = false; - let _ = fs::remove_file(&holes_path); + let _ = self.file.remove_region(self.holes_region_name().into()); } } @@ -295,13 +282,21 @@ where } let from = index * Self::SIZE_OF_T + HEADER_OFFSET; - self.file.truncate_region(self.region, from as u64) + self.file + .truncate_region(self.region_index.into(), from as u64) } fn reset(&mut self) -> Result<()> { - self.set_stored_len(0); self.reset_() } + + fn file(&self) -> &File { + &self.file + } + + fn region_index(&self) -> usize { + self.region_index + } } impl AnyVec for RawVec @@ -339,7 +334,7 @@ impl Clone for 
RawVec { fn clone(&self) -> Self { Self { file: self.file.clone(), - region: self.region, + region_index: self.region_index, header: self.header.clone(), name: self.name, pushed: vec![], @@ -391,7 +386,7 @@ where let opt = self .vec - .get_or_read_(index, &self.mmap) + .get_or_read_(index, &self.reader) .unwrap() .map(|v| (I::from(index), v)); @@ -414,7 +409,7 @@ where fn into_iter(self) -> Self::IntoIter { RawVecIterator { vec: self, - reader: self.file.read_region(self.region).unwrap(), + reader: self.create_static_reader(), index: 0, } } diff --git a/crates/brk_vecs/src/variants/stamped/mod.rs b/crates/brk_vecs/src/variants/stamped/mod.rs index cf947ac58..6da32ae9d 100644 --- a/crates/brk_vecs/src/variants/stamped/mod.rs +++ b/crates/brk_vecs/src/variants/stamped/mod.rs @@ -1,5 +1,224 @@ +use std::{borrow::Cow, cmp::Ordering, fmt::Debug, sync::Arc}; + +use brk_core::{Error, Result, Version}; + +use crate::{ + AnyCollectableVec, AnyIterableVec, AnyVec, BoxedVecIterator, CollectableVec, File, Format, + GenericStoredVec, Header, StoredIndex, StoredType, StoredVec, file::Reader, +}; + +use super::StoredVecIterator; + mod stamp; pub use stamp::*; -pub struct StampedVec; +#[derive(Debug, Clone)] +pub struct StampedVec(StoredVec); + +impl StampedVec +where + I: StoredIndex, + T: StoredType, +{ + pub fn forced_import( + file: &Arc, + name: &str, + version: Version, + format: Format, + ) -> Result { + Ok(Self( + StoredVec::forced_import(file, name, version, format).unwrap(), + )) + } + + #[inline] + pub fn unwrap_read(&self, index: I, reader: &Reader<'_>) -> T { + self.0.unwrap_read(index, reader) + } + + #[inline] + pub fn get_or_read(&self, index: I, reader: &Reader<'_>) -> Result>> { + self.0.get_or_read(index, reader) + } + + #[inline] + pub fn update_or_push(&mut self, index: I, value: T) -> Result<()> { + self.0.update_or_push(index, value) + } + + #[inline] + pub fn checked_push(&mut self, index: I, value: T) -> Result<()> { + let len = self.0.len(); + match 
len.cmp(&index.to_usize()?) { + Ordering::Greater => { + dbg!(index, value, len, self.0.header()); + Err(Error::IndexTooLow) + } + Ordering::Equal => { + self.0.push(value); + Ok(()) + } + Ordering::Less => { + dbg!(index, value, len, self.0.header()); + Err(Error::IndexTooHigh) + } + } + } + + #[inline] + pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { + let len = self.0.len(); + match len.cmp(&index.to_usize()?) { + Ordering::Greater => { + // dbg!(len, index, &self.pathbuf); + // panic!(); + Ok(()) + } + Ordering::Equal => { + self.0.push(value); + Ok(()) + } + Ordering::Less => { + dbg!(index, value, len, self.0.header()); + Err(Error::IndexTooHigh) + } + } + } + + #[inline] + pub fn fill_first_hole_or_push(&mut self, value: T) -> Result { + self.0.fill_first_hole_or_push(value) + } + + pub fn update(&mut self, index: I, value: T) -> Result<()> { + self.0.update(index, value) + } + + pub fn take(&mut self, index: I, reader: &Reader<'_>) -> Result> { + self.0.take(index, reader) + } + + pub fn delete(&mut self, index: I) { + self.0.delete(index) + } + + fn update_stamp(&mut self, stamp: Stamp) { + self.0.mut_header().update_stamp(stamp); + } + + pub fn reset(&mut self) -> Result<()> { + self.update_stamp(Stamp::default()); + self.0.reset() + } + + pub fn truncate_if_needed(&mut self, index: I, stamp: Stamp) -> Result<()> { + self.update_stamp(stamp); + self.0.truncate_if_needed(index)?; + Ok(()) + } + + pub fn flush(&mut self, stamp: Stamp) -> Result<()> { + self.update_stamp(stamp); + self.0.flush() + } + + pub fn header(&self) -> &Header { + self.0.header() + } + + #[inline] + pub fn hasnt(&self, index: I) -> Result { + self.0.has(index).map(|b| !b) + } +} + +impl AnyVec for StampedVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn version(&self) -> Version { + self.0.version() + } + + #[inline] + fn name(&self) -> &str { + self.0.name() + } + + #[inline] + fn len(&self) -> usize { + self.0.len() + } + + #[inline] + fn 
index_type_to_string(&self) -> &'static str { + I::to_string() + } + + #[inline] + fn value_type_to_size_of(&self) -> usize { + size_of::() + } +} + +pub trait AnyStampedVec: AnyVec { + fn stamp(&self) -> Stamp; + fn flush(&mut self, stamp: Stamp) -> Result<()>; +} + +impl AnyStampedVec for StampedVec +where + I: StoredIndex, + T: StoredType, +{ + fn stamp(&self) -> Stamp { + self.0.header().stamp() + } + + fn flush(&mut self, stamp: Stamp) -> Result<()> { + self.flush(stamp) + } +} + +impl<'a, I, T> IntoIterator for &'a StampedVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + type IntoIter = StoredVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl AnyIterableVec for StampedVec +where + I: StoredIndex, + T: StoredType, +{ + fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> + where + T: 'a, + { + Box::new(self.into_iter()) + } +} + +impl AnyCollectableVec for StampedVec +where + I: StoredIndex, + T: StoredType, +{ + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result> { + CollectableVec::collect_range_serde_json(self, from, to) + } +} diff --git a/crates/brk_vecs/src/variants/stamped/stamp.rs b/crates/brk_vecs/src/variants/stamped/stamp.rs index 794963c98..2a9a3fc88 100644 --- a/crates/brk_vecs/src/variants/stamped/stamp.rs +++ b/crates/brk_vecs/src/variants/stamped/stamp.rs @@ -2,3 +2,9 @@ use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; #[derive(Debug, Default, Clone, Copy, FromBytes, IntoBytes, Immutable, KnownLayout)] pub struct Stamp(u64); + +impl Stamp { + pub fn new(stamp: u64) -> Self { + Self(stamp) + } +} diff --git a/crates/brk_vecs/src/variants/stored/format.rs b/crates/brk_vecs/src/variants/stored/format.rs index b5816653e..b3f7bc47a 100644 --- a/crates/brk_vecs/src/variants/stored/format.rs +++ b/crates/brk_vecs/src/variants/stored/format.rs @@ -3,7 +3,7 @@ use std::{fs, io, path::Path}; use brk_core::{Error, 
Result}; use serde::{Deserialize, Serialize}; -#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum Format { Compressed, #[default] diff --git a/crates/brk_vecs/src/variants/stored/mod.rs b/crates/brk_vecs/src/variants/stored/mod.rs index af5f01f09..d6428e651 100644 --- a/crates/brk_vecs/src/variants/stored/mod.rs +++ b/crates/brk_vecs/src/variants/stored/mod.rs @@ -1,3 +1,298 @@ +use std::{ + borrow::Cow, + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +use brk_core::{Result, Version}; + +use crate::{ + AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, + File, GenericStoredVec, Header, StoredIndex, StoredType, file::Reader, +}; + +use super::{CompressedVec, CompressedVecIterator, RawVec, RawVecIterator}; + mod format; pub use format::*; + +#[derive(Debug, Clone)] +pub enum StoredVec { + Raw(RawVec), + Compressed(CompressedVec), +} + +impl StoredVec +where + I: StoredIndex, + T: StoredType, +{ + pub fn forced_import( + file: &Arc, + name: &str, + version: Version, + format: Format, + ) -> Result { + if version == Version::ZERO { + dbg!(file, name); + panic!("Version must be at least 1, can't verify endianess otherwise"); + } + + if format.is_compressed() { + Ok(Self::Compressed(CompressedVec::forced_import( + file, name, version, + )?)) + } else { + Ok(Self::Raw(RawVec::forced_import(file, name, version)?)) + } + } +} + +impl GenericStoredVec for StoredVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn read_(&self, index: usize, reader: &Reader<'_>) -> Result> { + match self { + StoredVec::Raw(v) => v.read_(index, reader), + StoredVec::Compressed(v) => v.read_(index, reader), + } + } + + #[inline] + fn header(&self) -> &Header { + match self { + StoredVec::Raw(v) => v.header(), + StoredVec::Compressed(v) => v.header(), + } + } + + #[inline] + fn file(&self) -> &File { + 
match self { + StoredVec::Raw(v) => v.file(), + StoredVec::Compressed(v) => v.file(), + } + } + + #[inline] + fn region_index(&self) -> usize { + match self { + StoredVec::Raw(v) => v.region_index(), + StoredVec::Compressed(v) => v.region_index(), + } + } + + #[inline] + fn mut_header(&mut self) -> &mut Header { + match self { + StoredVec::Raw(v) => v.mut_header(), + StoredVec::Compressed(v) => v.mut_header(), + } + } + + #[inline] + fn stored_len(&self) -> usize { + match self { + StoredVec::Raw(v) => v.stored_len(), + StoredVec::Compressed(v) => v.stored_len(), + } + } + + #[inline] + fn pushed(&self) -> &[T] { + match self { + StoredVec::Raw(v) => v.pushed(), + StoredVec::Compressed(v) => v.pushed(), + } + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + match self { + StoredVec::Raw(v) => v.mut_pushed(), + StoredVec::Compressed(v) => v.mut_pushed(), + } + } + + #[inline] + fn holes(&self) -> &BTreeSet { + match self { + StoredVec::Raw(v) => v.holes(), + StoredVec::Compressed(v) => v.holes(), + } + } + #[inline] + fn mut_holes(&mut self) -> &mut BTreeSet { + match self { + StoredVec::Raw(v) => v.mut_holes(), + StoredVec::Compressed(v) => v.mut_holes(), + } + } + + #[inline] + fn updated(&self) -> &BTreeMap { + match self { + StoredVec::Raw(v) => v.updated(), + StoredVec::Compressed(v) => v.updated(), + } + } + #[inline] + fn mut_updated(&mut self) -> &mut BTreeMap { + match self { + StoredVec::Raw(v) => v.mut_updated(), + StoredVec::Compressed(v) => v.mut_updated(), + } + } + + fn flush(&mut self) -> Result<()> { + match self { + StoredVec::Raw(v) => v.flush(), + StoredVec::Compressed(v) => v.flush(), + } + } + + fn truncate_if_needed(&mut self, index: I) -> Result<()> { + match self { + StoredVec::Raw(v) => v.truncate_if_needed(index), + StoredVec::Compressed(v) => v.truncate_if_needed(index), + } + } + + fn reset(&mut self) -> Result<()> { + match self { + StoredVec::Raw(v) => v.reset(), + StoredVec::Compressed(v) => v.reset(), + } + } +} + +impl 
AnyVec for StoredVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn version(&self) -> Version { + match self { + StoredVec::Raw(v) => v.version(), + StoredVec::Compressed(v) => v.version(), + } + } + + #[inline] + fn index_type_to_string(&self) -> &'static str { + I::to_string() + } + + #[inline] + fn len(&self) -> usize { + self.pushed_len() + self.stored_len() + } + + fn name(&self) -> &str { + match self { + StoredVec::Raw(v) => v.name(), + StoredVec::Compressed(v) => v.name(), + } + } + + #[inline] + fn value_type_to_size_of(&self) -> usize { + size_of::() + } +} + +#[derive(Debug)] +pub enum StoredVecIterator<'a, I, T> { + Raw(RawVecIterator<'a, I, T>), + Compressed(CompressedVecIterator<'a, I, T>), +} + +impl<'a, I, T> Iterator for StoredVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + fn next(&mut self) -> Option { + match self { + Self::Compressed(i) => i.next(), + Self::Raw(i) => i.next(), + } + } +} + +impl BaseVecIterator for StoredVecIterator<'_, I, T> +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn mut_index(&mut self) -> &mut usize { + match self { + Self::Compressed(iter) => iter.mut_index(), + Self::Raw(iter) => iter.mut_index(), + } + } + + fn len(&self) -> usize { + match self { + Self::Compressed(i) => i.len(), + Self::Raw(i) => i.len(), + } + } + + #[inline] + fn name(&self) -> &str { + match self { + Self::Compressed(i) => i.name(), + Self::Raw(i) => i.name(), + } + } +} + +impl<'a, I, T> IntoIterator for &'a StoredVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Cow<'a, T>); + type IntoIter = StoredVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + match self { + StoredVec::Compressed(v) => StoredVecIterator::Compressed(v.into_iter()), + StoredVec::Raw(v) => StoredVecIterator::Raw(v.into_iter()), + } + } +} + +impl AnyIterableVec for StoredVec +where + I: StoredIndex, + T: StoredType, +{ + fn boxed_iter<'a>(&'a self) -> 
BoxedVecIterator<'a, I, T> + where + T: 'a, + { + Box::new(self.into_iter()) + } +} + +impl AnyCollectableVec for StoredVec +where + I: StoredIndex, + T: StoredType, +{ + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result> { + CollectableVec::collect_range_serde_json(self, from, to) + } +}