diff --git a/crates/brk_vec/src/lib2.rs b/crates/brk_vec/src/lib2.rs deleted file mode 100644 index 6304d9804..000000000 --- a/crates/brk_vec/src/lib2.rs +++ /dev/null @@ -1,880 +0,0 @@ -#![doc = include_str!("../README.md")] -#![doc = "\n## Example\n\n```rust"] -#![doc = include_str!("../examples/main.rs")] -#![doc = "```"] - -use std::{ - fs::{self, File, OpenOptions}, - io::{self, Read, Seek, SeekFrom, Write}, - marker::PhantomData, - mem, - path::{Path, PathBuf}, - sync::{Arc, OnceLock}, -}; - -use arc_swap::ArcSwap; -pub use memmap2; -use memmap2::Mmap; -use rayon::prelude::*; -pub use zerocopy; -use zstd::DEFAULT_COMPRESSION_LEVEL; - -mod enums; -mod structs; -mod traits; - -pub use enums::*; -pub use structs::*; -pub use traits::*; - -use crate::{Compressed, CompressedPagesMetadata, Length, StoredIndex, StoredType, Version}; - -const ONE_KIB: usize = 1024; -pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; -const ONE_MIB: usize = ONE_KIB * ONE_KIB; -pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; - -#[allow(private_interfaces)] -#[derive(Debug)] -pub enum StorableVec { - Raw { - base: Base, - }, - Compressed { - base: Base, - decoded_page: Option<(usize, Box<[T]>)>, - decoded_pages: Option>>>, - // pages: Option>>>, - // page: Option<(usize, Values)>, - pages_meta: CompressedPagesMetadata, - }, -} - -impl StorableVec -where - I: StoredIndex, - T: StoredType, -{ - pub const SIZE_OF_T: usize = size_of::(); - pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; - pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; - - /// Same as import but will reset the folder under certain errors, so be careful ! - pub fn forced_import(path: &Path, version: Version, compressed: Compressed) -> Result { - let res = Self::import(path, version, compressed); - match res { - Err(Error::WrongEndian) - | Err(Error::DifferentCompressionMode) - | Err(Error::DifferentVersion { - found: _, - expected: _, - }) => { - fs::remove_dir_all(path)?; - Self::import(path, version, compressed) - } - _ => res, - } - } - - pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { - let base = Base::import(path, version, compressed)?; - - if *compressed { - let pages_meta = Self::read_pages_meta_(path)?; - - Ok(Self::Compressed { - base, - page: None, - pages: None, - pages_meta, - }) - } else { - Ok(Self::Raw { base }) - } - } - - fn read_pages_meta(&self) -> Result { - Self::read_pages_meta_(self.path()) - } - fn read_pages_meta_(path: &Path) -> Result { - CompressedPagesMetadata::read(Self::path_pages_meta_(path).as_path()) - } - - #[inline] - pub fn get(&mut self, index: I) -> Result> { - self.get_(index.to_usize()?) - } - #[inline] - pub fn get_(&mut self, index: usize) -> Result> { - match self.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.pushed().get(index)); - } - } - Err(Error::IndexTooHigh) => return Ok(None), - Err(Error::IndexTooLow) => {} - Err(error) => return Err(error), - } - - let page_index = Self::index_to_page_index(index); - - if self.page().is_none_or(|b| b.0 != page_index) { - let values = self.decode_page(page_index)?; - self.mut_page().replace((page_index, values)); - } - - self.page().unwrap().1.get(index) - } - - pub fn get_last(&mut self) -> Result> { - let len = self.len(); - if len == 0 { - return Ok(None); - } - self.get_(len - 1) - } - - pub fn read(&self, index: I) -> Result> { - self.read_(index.to_usize()?) - } - pub fn read_(&self, index: usize) -> Result> { - Ok(match self { - Self::Raw { .. } => { - let mut file = self.open_file()?; - let byte_index = Self::index_to_byte_index(index); - file.seek(SeekFrom::Start(byte_index))?; - let mut buf = vec![0; Self::SIZE_OF_T]; - file.read_exact(&mut buf)?; - T::try_ref_from_bytes(&buf[..]).ok().map(|v| v.to_owned()) - } - Self::Compressed { .. } => self - .decode_page(Self::index_to_page_index(index))? - .get(index)? - .cloned(), - }) - } - - pub fn iter(&mut self, f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - self.iter_from(I::default(), f) - } - - pub fn iter_from(&mut self, mut index: I, mut f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - if !self.is_pushed_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let stored_len = I::from(self.stored_len()); - - while index < stored_len { - let v = self.get(index)?.unwrap(); - f((index, v))?; - index = index + 1; - } - - Ok(()) - } - - pub fn iter_from_cloned(&mut self, mut index: I, mut f: F) -> Result<()> - where - F: FnMut((I, T, &mut Self)) -> Result<()>, - { - if !self.is_pushed_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let stored_len = I::from(self.stored_len()); - - while index < stored_len { - let v = self.get(index)?.unwrap().clone(); - f((index, v, self))?; - index = index + 1; - } - - Ok(()) - } - - pub fn collect_range(&self, from: Option, to: Option) -> Result> { - if !self.is_pushed_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let len = self - .base() - .read_stored_length() - .unwrap() - .to_usize() - .unwrap(); - - if len == 0 { - return Err(Error::IndexTooHigh); - } - - let from = from.map_or(0, |from| { - if from >= 0 { - from as usize - } else { - let from = len as i64 + from; - if from < 0 { 0 } else { from as usize } - } - }); - - let to = to.map_or(len - 1, |to| { - if to >= 0 { - to as usize - } else { - let max = len - 1; - let to = max as i64 + to; - if to > max as i64 { max } else { to as usize } - } - }); - - if from > to { - return Err(Error::RangeFromAfterTo(from, to)); - } - - let mut page: Option<(usize, Values)> = None; - - let values = (from..=to) - .flat_map(|index| { - let page_index = Self::index_to_page_index(index); - - if page.as_ref().is_none_or(|b| b.0 != page_index) { - let pages_meta = match self { - Self::Raw { .. } => None, - Self::Compressed { .. } => Some(self.read_pages_meta().unwrap()), - }; - - let values = Self::decode_page_( - len, - page_index, - &self.base().open_file().unwrap(), - pages_meta.as_ref(), - ) - .inspect_err(|_| { - dbg!(from, to); - }) - .unwrap(); - page.replace((page_index, values)); - } - - page.as_ref().unwrap().1.get(index).ok().flatten().cloned() - }) - .collect::>(); - - Ok(values) - } - - pub fn decode_page(&self, page_index: usize) -> Result> { - Self::decode_page_( - self.stored_len(), - page_index, - self.file(), - match self { - Self::Raw { .. } => None, - Self::Compressed { pages_meta, .. } => Some(pages_meta), - }, - ) - } - - fn decode_page_( - stored_len: usize, - page_index: usize, - file: &File, - compressed_pages_meta: Option<&CompressedPagesMetadata>, - ) -> Result> { - if Self::page_index_to_index(page_index) >= stored_len { - return Err(Error::IndexTooHigh); - } - - let (len, offset) = if let Some(pages_meta) = compressed_pages_meta { - if pages_meta.len() <= page_index { - return Err(Error::ExpectVecToHaveIndex); - } - let page = pages_meta.get(page_index).unwrap(); - (page.bytes_len as usize, page.start) - } else { - (Self::PAGE_SIZE, Self::page_index_to_byte_index(page_index)) - }; - - let mmap = unsafe { - memmap2::MmapOptions::new() - .len(len) - .offset(offset) - .map(file)? - }; - - let compressed = compressed_pages_meta.is_some(); - - if compressed { - let decoded = zstd::decode_all(&mmap[..]); - - if decoded.is_err() { - dbg!((len, offset, page_index, &mmap[..], &mmap.len(), &decoded)); - } - - Ok(Values::from( - decoded? - .chunks(Self::SIZE_OF_T) - .map(|slice| T::try_read_from_bytes(slice).unwrap()) - .collect::>() - .into_boxed_slice(), - )) - } else { - Ok(Values::from(mmap)) - } - } - - #[inline] - pub fn push(&mut self, value: T) { - self.mut_base().pushed.push(value) - } - - pub fn flush(&mut self) -> io::Result<()> { - let pushed_len = self.pushed_len(); - - if pushed_len == 0 { - return Ok(()); - } - - let stored_len = self.stored_len(); - - let bytes = match self { - Self::Compressed { base, pages_meta } => { - let (starting_page_index, values) = if *base.stored_len % Self::PER_PAGE != 0 { - if pages_meta.is_empty() { - unreachable!() - } - - let last_page_index = pages_meta.len() - 1; - - let values = if let Some(values) = base - .pages - .as_mut() - .and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take())) - { - values - } else if base - .page - .as_ref() - .is_some_and(|(page_index, _)| *page_index == last_page_index) - { - base.page.take().unwrap().1 - } else { - Self::decode_page_( - stored_len, - last_page_index, - &base.file, - Some(pages_meta), - ) - .inspect_err(|_| { - dbg!(last_page_index, &pages_meta); - }) - .unwrap() - }; - - let file_len = pages_meta.pop().unwrap().start; - - file_set_len(&mut base.file, file_len)?; - - (last_page_index, values) - } else { - (pages_meta.len(), Values::default()) - }; - - let compressed = Vec::from(values.as_arr()) - .into_par_iter() - .chain(mem::take(&mut base.pushed).into_par_iter()) - .chunks(Self::PER_PAGE) - .map(|chunk| (Self::compress_chunk(chunk.as_ref()), chunk.len())) - .collect::>(); - - compressed - .iter() - .enumerate() - .for_each(|(i, (compressed_bytes, values_len))| { - let page_index = starting_page_index + i; - - let start = if page_index != 0 { - let prev = pages_meta.get(page_index - 1).unwrap(); - prev.start + prev.bytes_len as u64 - } else { - 0 - }; - - let bytes_len = compressed_bytes.len() as u32; - let values_len = *values_len as u32; - - let page = CompressedPageMetadata::new(start, bytes_len, values_len); - - pages_meta.push(page_index, page); - }); - - pages_meta.write()?; - - compressed - .into_iter() - .flat_map(|(v, _)| v) - .collect::>() - } - Self::Raw { base } => { - let pushed = &mut base.pushed; - - let mut bytes: Vec = vec![0; pushed.len() * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - mem::take(pushed) - .into_par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); - - bytes - } - }; - - let file = self.mut_file(); - file.write_all(&bytes)?; - file.sync_all()?; - - self.reset_caches(); - - self.increase_stored_len(pushed_len); - - self.write_stored_length()?; - - Ok(()) - } - - pub fn truncate_if_needed(&mut self, index: I) -> Result<()> { - let index = index.to_usize()?; - - if index >= self.stored_len() { - return Ok(()); - } - - if index == 0 { - self.reset()?; - return Ok(()); - } - - let page_index = Self::index_to_page_index(index); - - let values = match self { - Self::Compressed { .. } => self.decode_page(page_index)?, - Self::Raw { .. } => Values::default(), - }; - - let (len, bytes) = match self { - Self::Compressed { pages_meta, .. } => { - let mut page = pages_meta.truncate(page_index).unwrap(); - - let len = page.start; - - let decoded_index = Self::index_to_decoded_index(index); - - let compressed = if decoded_index != 0 { - let chunk = &values.as_arr()[..decoded_index]; - - let compressed = Self::compress_chunk(chunk); - - page.values_len = chunk.len() as u32; - page.bytes_len = compressed.len() as u32; - - pages_meta.push(page_index, page); - - compressed - } else { - vec![].into_boxed_slice() - }; - - pages_meta.write()?; - - (len, compressed) - } - Self::Raw { .. } => { - // let value_at_index = self.open_then_read_(index).ok(); - - let len = Self::index_to_byte_index(index); - - (len, vec![].into_boxed_slice()) - } - }; - - let file = self.mut_file(); - - file_set_len(file, len)?; - - if !bytes.is_empty() { - file.write_all(&bytes)?; - } - - self.set_stored_len(index); - - self.write_stored_length()?; - - self.reset_caches(); - - Ok(()) - } - - fn compress_chunk(chunk: &[T]) -> Box<[u8]> { - if chunk.len() > Self::PER_PAGE { - panic!(); - } - - let mut bytes: Vec = vec![0; chunk.len() * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - chunk - .into_par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); - - zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL) - .unwrap() - .into_boxed_slice() - } - - pub fn enable_large_cache_if_possible(&mut self) { - self.mut_pages().replace(vec![]); - self.reset_large_cache(); - } - - pub fn disable_large_cache_if_possible(&mut self) { - self.mut_base().pages.take(); - } - - fn reset_large_cache(&mut self) { - let stored_len = self.stored_len(); - - if let Some(pages) = self.mut_pages().as_mut() { - pages.par_iter_mut().for_each(|lock| { - lock.take(); - }); - - let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize; - let len = Self::CACHE_LENGTH.min(len); - - if pages.len() != len { - pages.resize_with(len, Default::default); - } - } - } - - pub fn large_cache_len(&self) -> usize { - self.pages().map_or(0, |v| v.len()) - } - - fn reset_small_cache(&mut self) { - self.mut_base().page.take(); - } - - fn reset_caches(&mut self) { - self.reset_small_cache(); - self.reset_large_cache(); - } - - pub fn reset(&mut self) -> Result<()> { - self.mut_base().reset_file()?; - self.reset_stored_len(); - self.reset_caches(); - Ok(()) - } - - #[inline] - pub fn index_to_pushed_index(&self, index: usize) -> Result> { - let stored_len = self.stored_len(); - - if index >= stored_len { - let index = index - stored_len; - if index >= self.pushed_len() { - Err(Error::IndexTooHigh) - } else { - Ok(Some(index)) - } - } else { - Err(Error::IndexTooLow) - } - } - - #[inline] - fn index_to_byte_index(index: usize) -> u64 { - (index * Self::SIZE_OF_T) as u64 - } - - #[inline(always)] - fn index_to_page_index(index: usize) -> usize { - index / Self::PER_PAGE - } - - #[inline(always)] - fn page_index_to_index(page_index: usize) -> usize { - page_index * Self::PER_PAGE - } - - #[inline(always)] - fn page_index_to_byte_index(page_index: usize) -> u64 { - (page_index * Self::PAGE_SIZE) as u64 - } - - #[inline(always)] - fn index_to_decoded_index(index: usize) -> usize { - index % Self::PER_PAGE - } - - #[inline] - fn path_pages_meta_(path: &Path) -> PathBuf { - path.join("pages_meta") - } - - // #[inline] - // fn page(&self) -> Option<&(usize, Values)> { - // self.base().page.as_ref() - // } - - // #[inline] - // fn mut_page(&mut self) -> &mut Option<(usize, Values)> { - // &mut self.mut_base().page - // } - - // #[inline] - // pub fn pages(&self) -> Option<&Vec>>> { - // self.base().pages.as_ref() - // } - - // #[inline] - // fn mut_pages(&mut self) -> &mut Option>>> { - // &mut self.mut_base().pages - // } - - #[inline] - pub fn len(&self) -> usize { - self.stored_len() + self.pushed_len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - #[inline] - pub fn has(&self, index: I) -> Result { - Ok(self.has_(index.to_usize()?)) - } - #[inline] - fn has_(&self, index: usize) -> bool { - index < self.len() - } - - #[inline] - pub fn pushed(&self) -> &Vec { - &self.base().pushed - } - - #[inline] - pub fn pushed_len(&self) -> usize { - self.pushed().len() - } - - #[inline] - fn is_pushed_empty(&self) -> bool { - self.pushed_len() == 0 - } - - #[inline] - pub fn stored_len(&self) -> usize { - *self.base().stored_len - } - - #[inline] - fn set_stored_len(&mut self, len: usize) { - *self.mut_base().stored_len = len; - } - - fn increase_stored_len(&mut self, len: usize) { - *self.mut_base().stored_len += len; - } - - #[inline] - fn reset_stored_len(&mut self) { - self.set_stored_len(0); - } - - fn write_stored_length(&self) -> io::Result<()> { - self.base().write_stored_length() - } - - #[inline] - pub fn path(&self) -> &Path { - &self.base().pathbuf - } - - fn file(&self) -> &File { - &self.base().file - } - - fn mut_file(&mut self) -> &mut File { - &mut self.mut_base().file - } - - fn open_file(&self) -> io::Result { - self.base().open_file() - } - - pub fn file_name(&self) -> String { - self.path() - .file_name() - .unwrap() - .to_str() - .unwrap() - .to_owned() - } - - #[inline] - pub fn version(&self) -> Version { - self.base().version - } - - #[inline] - fn compressed(&self) -> Compressed { - self.base().compressed - } - - #[inline] - fn base(&self) -> &Base { - match self { - Self::Raw { base, .. } => base, - Self::Compressed { base, .. } => base, - } - } - - #[inline] - fn mut_base(&mut self) -> &mut Base { - match self { - Self::Raw { base, .. } => base, - Self::Compressed { base, .. } => base, - } - } - - pub fn index_type_to_string(&self) -> &str { - I::to_string() - } -} - -#[derive(Debug)] -struct Base { - pub version: Version, - pub pathbuf: PathBuf, - pub stored_len: Length, - pub compressed: Compressed, - pub mmap: ArcSwap, - // pub page: Option<(usize, Values)>, - // pub pages: Option>>>, - pub pushed: Vec, - pub file: File, - pub phantom: PhantomData, -} - -impl Base { - pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { - fs::create_dir_all(path)?; - - let version_path = Self::path_version_(path); - version.validate(version_path.as_ref())?; - version.write(version_path.as_ref())?; - - let compressed_path = Self::path_compressed_(path); - compressed.validate(compressed_path.as_ref())?; - compressed.write(compressed_path.as_ref())?; - - let stored_len = Length::try_from(Self::path_length_(path).as_path())?; - - let file = Self::open_file_(Self::path_vec_(path).as_path())?; - - Ok(Self { - mmap: ArcSwap::new(Arc::new(unsafe { Mmap::map(&file)? })), - version, - compressed, - pathbuf: path.to_owned(), - file, - stored_len, - pushed: vec![], - phantom: PhantomData, - }) - } - - fn open_file(&self) -> io::Result { - Self::open_file_(&self.path_vec()) - } - fn open_file_(path: &Path) -> io::Result { - OpenOptions::new() - .read(true) - .create(true) - .truncate(false) - .append(true) - .open(path) - } - - fn reset_file(&mut self) -> Result<()> { - file_set_len(&mut self.file, 0)?; - Ok(()) - } - - #[inline] - fn path_vec(&self) -> PathBuf { - Self::path_vec_(&self.pathbuf) - } - #[inline] - fn path_vec_(path: &Path) -> PathBuf { - path.join("vec") - } - - pub fn read_stored_length(&self) -> Result { - Length::try_from(self.path_length().as_path()) - } - fn write_stored_length(&self) -> io::Result<()> { - self.stored_len.write(&self.path_length()) - } - #[inline] - fn path_length(&self) -> PathBuf { - Self::path_length_(&self.pathbuf) - } - #[inline] - fn path_length_(path: &Path) -> PathBuf { - path.join("length") - } - - #[inline] - fn path_version_(path: &Path) -> PathBuf { - path.join("version") - } - - #[inline] - fn path_compressed_(path: &Path) -> PathBuf { - path.join("compressed") - } -} - -impl Clone for StorableVec -where - I: StoredIndex, - T: StoredType, -{ - fn clone(&self) -> Self { - Self::import(self.path(), self.version(), self.compressed()).unwrap() - } -} - -fn file_set_len(file: &mut File, len: u64) -> io::Result<()> { - file.set_len(len)?; - file.seek(SeekFrom::End(0))?; - Ok(()) -}