diff --git a/Cargo.lock b/Cargo.lock index 4fabac1e7..3ca72ee8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -492,7 +492,7 @@ dependencies = [ "brk_parser", "brk_server", "brk_store", - "brk_vec", + "brk_vecs", ] [[package]] @@ -542,11 +542,9 @@ dependencies = [ "brk_indexer", "brk_logger", "brk_parser", - "brk_store", "brk_vecs", "color-eyre", "derive_deref", - "fjall", "libc", "log", "rayon", @@ -612,7 +610,6 @@ dependencies = [ "brk_vecs", "color-eyre", "fjall", - "libc", "log", "rayon", ] @@ -3384,9 +3381,9 @@ dependencies = [ [[package]] name = "quick_cache" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b450dad8382b1b95061d5ca1eb792081fb082adf48c678791fe917509596d5f" +checksum = "8565e62e02af316570d4b492f17af1481d6c07cea60f4e7edd71700da5052ba9" dependencies = [ "equivalent", "hashbrown 0.15.4", diff --git a/Cargo.toml b/Cargo.toml index d794495e6..4541a34ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,6 @@ brk_mcp = { version = "0.0.81", path = "crates/brk_mcp" } brk_parser = { version = "0.0.81", path = "crates/brk_parser" } brk_server = { version = "0.0.81", path = "crates/brk_server" } brk_store = { version = "0.0.81", path = "crates/brk_store" } -brk_vec = { version = "0.0.81", path = "crates/brk_vec" } brk_vecs = { version = "0.0.81", path = "crates/brk_vecs" } byteview = "=0.6.1" clap = { version = "4.5.41", features = ["string"] } diff --git a/crates/brk/Cargo.toml b/crates/brk/Cargo.toml index 4d872f4b9..3c1f56510 100644 --- a/crates/brk/Cargo.toml +++ b/crates/brk/Cargo.toml @@ -22,7 +22,7 @@ full = [ "interface", "server", "store", - "vec", + "vecs", ] bundler = ["brk_bundler"] core = ["brk_core"] @@ -36,7 +36,7 @@ parser = ["brk_parser"] interface = ["brk_interface"] server = ["brk_server"] store = ["brk_store"] -vec = ["brk_vec"] +vecs = ["brk_vecs"] [dependencies] brk_bundler = { workspace = true, optional = true } @@ -52,7 +52,7 @@ brk_parser = { workspace = true, optional = true } brk_interface = { workspace = true, optional = true } brk_server = { workspace = true, optional = true } brk_store = { workspace = true, optional = true } -brk_vec = { workspace = true, optional = true } +brk_vecs = { workspace = true, optional = true } [package.metadata.docs.rs] all-features = true diff --git a/crates/brk/src/lib.rs b/crates/brk/src/lib.rs index f568a5ba6..1b7f87a0a 100644 --- a/crates/brk/src/lib.rs +++ b/crates/brk/src/lib.rs @@ -51,6 +51,6 @@ pub use brk_server as server; #[doc(inline)] pub use brk_store as store; -#[cfg(feature = "vec")] +#[cfg(feature = "vecs")] #[doc(inline)] -pub use brk_vec as vec; +pub use brk_vecs as vecs; diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index 1b91586c0..c5f74b692 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -17,11 +17,9 @@ brk_fetcher = { workspace = true } brk_indexer = { workspace = true } brk_logger = { workspace = true } brk_parser = { workspace = true } -brk_store = { workspace = true } brk_vecs = { workspace = true } color-eyre = { workspace = true } derive_deref = { workspace = true } -fjall = { workspace = true } libc = { workspace = true } log = { workspace = true } rayon = { workspace = true } diff --git a/crates/brk_indexer/Cargo.toml b/crates/brk_indexer/Cargo.toml index c77e9ee7c..d3e1aadf3 100644 --- a/crates/brk_indexer/Cargo.toml +++ b/crates/brk_indexer/Cargo.toml @@ -18,6 +18,5 @@ brk_store = { workspace = true } brk_vecs = { workspace = true } color-eyre = { workspace = true } fjall = { workspace = true } -libc = { workspace = true } log = { workspace = true } rayon = { workspace = true } diff --git a/crates/brk_vec/.gitignore b/crates/brk_vec/.gitignore deleted file mode 100644 index d1d84f03b..000000000 --- a/crates/brk_vec/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/vec -_lib.rs diff --git a/crates/brk_vec/Cargo.lock b/crates/brk_vec/Cargo.lock deleted file mode 100644 index 86ab81ef7..000000000 --- a/crates/brk_vec/Cargo.lock +++ /dev/null @@ -1,25 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -version = 4 - -[[package]] -name = "libc" -version = "0.2.169" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" - -[[package]] -name = "memmap2" -version = "0.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" -dependencies = [ - "libc", -] - -[[package]] -name = "storable_vec" -version = "0.1.2" -dependencies = [ - "memmap2", -] diff --git a/crates/brk_vec/Cargo.toml b/crates/brk_vec/Cargo.toml deleted file mode 100644 index d8bbfb936..000000000 --- a/crates/brk_vec/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "brk_vec" -description = "A storeable vec" -keywords = ["vec", "disk", "data"] -categories = ["database"] -version.workspace = true -edition.workspace = true -license.workspace = true -homepage.workspace = true -repository.workspace = true - -[dependencies] -arc-swap = { workspace = true } -brk_core = { workspace = true } -brk_exit = { workspace = true } -clap = { workspace = true } -clap_derive = { workspace = true } -log = { workspace = true } -memmap2 = "0.9.7" -rayon = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -zerocopy = { workspace = true } -zerocopy-derive = { workspace = true } -zstd = "0.13.3" - -[package.metadata.cargo-machete] -ignored = ["clap"] diff --git a/crates/brk_vec/README.md b/crates/brk_vec/README.md deleted file mode 100644 index 5b48b5283..000000000 --- a/crates/brk_vec/README.md +++ /dev/null @@ -1,38 +0,0 @@ -# BRK Vec - -

- - GitHub Repo stars - - - License - - - Version - - - Documentation - - Size - - Dependency status - - - Discord - - - Nostr - - - Bluesky - - - X - -

- -A `Vec` (an array) that is stored on disk and thus which can be much larger than the available RAM. - -Compared to a key/value store, the data stored is raw byte interpretation of the Vec's values without any overhead which is very efficient. Additionally it uses close to no RAM when caching isn't active and up to 100 MB when it is. - -Compression is also available and built on top [`zstd`](https://crates.io/crates/zstd) to save even more space (from 0 to 75%). The tradeoff being slower reading speeds, especially random reading speeds. This is due to the data being stored in compressed pages of 16 KB, which means that if you to read even one value in that page you have to uncompress the whole page. diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs deleted file mode 100644 index 7a79de218..000000000 --- a/crates/brk_vec/examples/main.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::{fs, path::Path}; - -use brk_core::{DateIndex, Height, Version}; -use brk_vec::{AnyVec, CollectableVec, Format, GenericStoredVec, StoredVec, VecIterator}; - -type I = DateIndex; -#[allow(clippy::upper_case_acronyms)] -type VEC = StoredVec; - -fn main() -> Result<(), Box> { - let _ = fs::remove_dir_all("./vec"); - - let version = Version::TWO; - let format = Format::Raw; - - { - let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - - (0..21_u32).for_each(|v| { - vec.push(v); - }); - - let mut iter = vec.into_iter(); - dbg!(iter.get(0.into())); - dbg!(iter.get(1.into())); - dbg!(iter.get(2.into())); - dbg!(iter.get(20.into())); - dbg!(iter.get(21.into())); - - vec.flush()?; - - // dbg!(vec.header()); - } - - { - let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - - vec.mut_header().update_height(Height::new(100)); - - let mut iter = vec.into_iter(); - dbg!(iter.get(0.into())); - dbg!(iter.get(1.into())); - dbg!(iter.get(2.into())); - dbg!(iter.get(3.into())); - dbg!(iter.get(4.into())); - dbg!(iter.get(5.into())); - dbg!(iter.get(20.into())); - dbg!(iter.get(20.into())); - dbg!(iter.get(0.into())); - - vec.push(21); - vec.push(22); - - let mut iter = vec.into_iter(); - - dbg!(iter.get(20.into())); - dbg!(iter.get(21.into())); - dbg!(iter.get(22.into())); - dbg!(iter.get(23.into())); - - vec.flush()?; - } - - { - let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - let mut iter = vec.into_iter(); - - dbg!(iter.get(0.into())); - dbg!(iter.get(20.into())); - dbg!(iter.get(21.into())); - dbg!(iter.get(22.into())); - - vec.truncate_if_needed(14.into())?; - - let mut iter = vec.into_iter(); - - iter.get(0.into()); - iter.get(5.into()); - dbg!(iter.get(20.into())); - - dbg!(vec.collect_signed_range(Some(-5), None)?); - - vec.push(vec.len() as u32); - dbg!(VecIterator::last(vec.into_iter())); - - dbg!(vec.into_iter().collect::>()); - } - - { - let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - - vec.reset()?; - - dbg!(vec.header(), vec.pushed_len(), vec.stored_len(), vec.len()); - - (0..21_u32).for_each(|v| { - vec.push(v); - }); - - let mut iter = vec.into_iter(); - dbg!(iter.get(0.into())); - dbg!(iter.get(20.into())); - dbg!(iter.get(21.into())); - - let mmap = vec.create_mmap()?; - dbg!(vec.take(10.into(), &mmap)?); - dbg!(vec.get_or_read(10.into(), &mmap)?); - dbg!(vec.holes()); - vec.flush()?; - dbg!(vec.holes()); - } - - { - let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - - let mmap = vec.create_mmap()?; - - dbg!(vec.holes()); - dbg!(vec.get_or_read(10.into(), &mmap)?); - - vec.update(10.into(), 10)?; - vec.update(0.into(), 10)?; - dbg!( - vec.holes(), - vec.get_or_read(0.into(), &mmap)?, - vec.get_or_read(10.into(), &mmap)? - ); - - vec.flush()?; - } - - { - let vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - - dbg!(vec.collect()?); - } - - Ok(()) -} diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs deleted file mode 100644 index a3f1fe79b..000000000 --- a/crates/brk_vec/src/lib.rs +++ /dev/null @@ -1,13 +0,0 @@ -#![doc = include_str!("../README.md")] -#![doc = "\n## Example\n\n```rust"] -#![doc = include_str!("../examples/main.rs")] -#![doc = "```"] - -mod structs; -mod traits; -mod variants; - -pub use memmap2::Mmap; -pub use structs::*; -pub use traits::*; -pub use variants::*; diff --git a/crates/brk_vec/src/structs/compressed_page_meta.rs b/crates/brk_vec/src/structs/compressed_page_meta.rs deleted file mode 100644 index a0b18cf7e..000000000 --- a/crates/brk_vec/src/structs/compressed_page_meta.rs +++ /dev/null @@ -1,18 +0,0 @@ -use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; - -#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)] -pub struct CompressedPageMetadata { - pub start: u64, - pub bytes_len: u32, - pub values_len: u32, -} - -impl CompressedPageMetadata { - pub fn new(start: u64, bytes_len: u32, values_len: u32) -> Self { - Self { - start, - bytes_len, - values_len, - } - } -} diff --git a/crates/brk_vec/src/structs/compressed_pages_meta.rs b/crates/brk_vec/src/structs/compressed_pages_meta.rs deleted file mode 100644 index 6d170e064..000000000 --- a/crates/brk_vec/src/structs/compressed_pages_meta.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::{ - fs::{self, OpenOptions}, - io::{self, Seek, SeekFrom, Write}, - path::{Path, PathBuf}, -}; - -use brk_core::Result; -use rayon::prelude::*; -use zerocopy::{IntoBytes, TryFromBytes}; - -use super::{CompressedPageMetadata, UnsafeSlice}; - -#[derive(Debug, Clone)] -pub struct CompressedPagesMetadata { - vec: Vec, - change_at: Option, - path: PathBuf, -} - -impl CompressedPagesMetadata { - const PAGE_SIZE: usize = size_of::(); - - pub fn read(path: &Path) -> Result { - let this = Self { - vec: fs::read(path) - .unwrap_or_default() - .chunks(Self::PAGE_SIZE) - .map(|bytes| { - if bytes.len() != Self::PAGE_SIZE { - panic!() - } - CompressedPageMetadata::try_read_from_bytes(bytes).unwrap() - }) - .collect::>(), - path: path.to_owned(), - change_at: None, - }; - - Ok(this) - } - - pub fn write(&mut self) -> io::Result<()> { - if self.change_at.is_none() { - return Ok(()); - } - - let change_at = self.change_at.take().unwrap(); - - let len = (self.vec.len() - change_at) * Self::PAGE_SIZE; - - let mut bytes: Vec = vec![0; len]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - self.vec[change_at..] - .par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::PAGE_SIZE, v.as_bytes())); - - let mut file = OpenOptions::new() - .read(true) - .create(true) - .truncate(false) - .append(true) - .open(&self.path)?; - - file.set_len((change_at * Self::PAGE_SIZE) as u64)?; - file.seek(SeekFrom::End(0))?; - - file.write_all(&bytes)?; - - Ok(()) - } - - pub fn len(&self) -> usize { - self.vec.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn get(&self, page_index: usize) -> Option<&CompressedPageMetadata> { - self.vec.get(page_index) - } - - pub fn last(&self) -> Option<&CompressedPageMetadata> { - self.vec.last() - } - - pub fn pop(&mut self) -> Option { - self.vec.pop() - } - - pub fn push(&mut self, page_index: usize, page: CompressedPageMetadata) { - if page_index != self.vec.len() { - panic!(); - } - - self.set_changed_at(page_index); - - self.vec.push(page); - } - - fn set_changed_at(&mut self, page_index: usize) { - if self.change_at.is_none_or(|pi| pi > page_index) { - self.change_at.replace(page_index); - } - } - - pub fn truncate(&mut self, page_index: usize) -> Option { - let page = self.get(page_index).cloned(); - self.vec.truncate(page_index); - self.set_changed_at(page_index); - page - } -} diff --git a/crates/brk_vec/src/structs/format.rs b/crates/brk_vec/src/structs/format.rs deleted file mode 100644 index ab5e5452c..000000000 --- a/crates/brk_vec/src/structs/format.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::{fs, io, path::Path}; - -use brk_core::{Error, Result}; -use clap_derive::ValueEnum; -use serde::{Deserialize, Serialize}; - -#[derive( - Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, ValueEnum, -)] -pub enum Format { - Compressed, - #[default] - Raw, -} - -impl Format { - pub fn write(&self, path: &Path) -> Result<(), io::Error> { - fs::write(path, self.as_bytes()) - } - - pub fn is_raw(&self) -> bool { - *self == Self::Raw - } - - pub fn is_compressed(&self) -> bool { - *self == Self::Compressed - } - - fn as_bytes(&self) -> Vec { - if self.is_compressed() { - vec![1] - } else { - vec![0] - } - } - - fn from_bytes(bytes: &[u8]) -> Self { - if bytes.len() != 1 { - panic!(); - } - if bytes[0] == 1 { - Self::Compressed - } else if bytes[0] == 0 { - Self::Raw - } else { - panic!() - } - } - - pub fn validate(&self, path: &Path) -> Result<()> { - if let Ok(prev_compressed) = Format::try_from(path) { - if prev_compressed != *self { - return Err(Error::DifferentCompressionMode); - } - } - - Ok(()) - } -} - -impl TryFrom<&Path> for Format { - type Error = Error; - fn try_from(value: &Path) -> Result { - Ok(Self::from_bytes(&fs::read(value)?)) - } -} diff --git a/crates/brk_vec/src/structs/header.rs b/crates/brk_vec/src/structs/header.rs deleted file mode 100644 index 30bd0c936..000000000 --- a/crates/brk_vec/src/structs/header.rs +++ /dev/null @@ -1,197 +0,0 @@ -use std::{ - fs::File, - io::{self, Seek, SeekFrom}, - os::unix::fs::FileExt, - sync::Arc, -}; - -use arc_swap::ArcSwap; -use brk_core::{Error, Height, Result, Version}; -use zerocopy::{FromBytes, IntoBytes}; -use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; - -use crate::Format; - -const HEADER_VERSION: Version = Version::ONE; -pub const HEADER_OFFSET: usize = size_of::(); - -#[derive(Debug, Clone)] -pub struct Header { - inner: Arc>, - modified: bool, -} - -impl Header { - pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { - let inner = HeaderInner::create_and_write(file, vec_version, format)?; - Ok(Self { - inner: Arc::new(ArcSwap::from_pointee(inner)), - modified: false, - }) - } - - pub fn import_and_verify( - file: &mut File, - vec_version: Version, - format: Format, - ) -> Result { - let inner = HeaderInner::import_and_verify(file, vec_version, format)?; - Ok(Self { - inner: Arc::new(ArcSwap::from_pointee(inner)), - modified: false, - }) - } - - pub fn update_height(&mut self, height: Height) { - self.modified = true; - self.inner.rcu(|header| { - let mut header = (**header).clone(); - header.height = height; - header - }); - } - - pub fn update_computed_version(&mut self, computed_version: Version) { - self.modified = true; - self.inner.rcu(|header| { - let mut header = (**header).clone(); - header.computed_version = computed_version; - header - }); - } - - pub fn modified(&self) -> bool { - self.modified - } - - pub fn vec_version(&self) -> Version { - self.inner.load().vec_version - } - - pub fn computed_version(&self) -> Version { - self.inner.load().computed_version - } - - pub fn height(&self) -> Height { - self.inner.load().height - } - - pub fn write(&mut self, file: &mut File) -> io::Result<()> { - self.inner.load().write(file)?; - self.modified = false; - Ok(()) - } -} - -#[repr(C)] -#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)] -struct HeaderInner { - pub header_version: Version, - pub vec_version: Version, - pub computed_version: Version, - pub height: Height, - pub compressed: ZeroCopyBool, -} - -impl HeaderInner { - pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { - let header = Self { - header_version: HEADER_VERSION, - vec_version, - computed_version: Version::default(), - height: Height::default(), - compressed: ZeroCopyBool::from(format), - }; - header.write(file)?; - file.seek(SeekFrom::End(0))?; - Ok(header) - } - - pub fn write(&self, file: &mut File) -> io::Result<()> { - file.write_all_at(self.as_bytes(), 0) - } - - pub fn import_and_verify( - file: &mut File, - vec_version: Version, - format: Format, - ) -> Result { - let len = file.metadata()?.len(); - - if len < HEADER_OFFSET as u64 { - return Err(Error::WrongLength); - } - - let mut buf = [0; HEADER_OFFSET]; - file.read_exact_at(&mut buf, 0)?; - - let header = HeaderInner::read_from_bytes(&buf)?; - - if header.header_version != HEADER_VERSION { - return Err(Error::DifferentVersion { - found: header.header_version, - expected: HEADER_VERSION, - }); - } - if header.vec_version != vec_version { - return Err(Error::DifferentVersion { - found: header.vec_version, - expected: vec_version, - }); - } - if header.compressed.is_broken() { - return Err(Error::WrongEndian); - } - if (header.compressed.is_true() && format.is_raw()) - || (header.compressed.is_false() && format.is_compressed()) - { - return Err(Error::DifferentCompressionMode); - } - - Ok(header) - } -} - -#[derive( - Debug, - Clone, - Copy, - Default, - PartialEq, - Eq, - PartialOrd, - Ord, - FromBytes, - IntoBytes, - Immutable, - KnownLayout, -)] -#[repr(C)] -pub struct ZeroCopyBool(u32); - -impl ZeroCopyBool { - pub const TRUE: Self = Self(1); - pub const FALSE: Self = Self(0); - - pub fn is_true(&self) -> bool { - *self == Self::TRUE - } - - pub fn is_false(&self) -> bool { - *self == Self::FALSE - } - - pub fn is_broken(&self) -> bool { - *self > Self::TRUE - } -} - -impl From for ZeroCopyBool { - fn from(value: Format) -> Self { - if value.is_raw() { - Self::FALSE - } else { - Self::TRUE - } - } -} diff --git a/crates/brk_vec/src/structs/mod.rs b/crates/brk_vec/src/structs/mod.rs deleted file mode 100644 index 237b1d4ba..000000000 --- a/crates/brk_vec/src/structs/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -mod compressed_page_meta; -mod compressed_pages_meta; -mod format; -mod header; -mod unsafe_slice; - -pub use compressed_page_meta::*; -pub use compressed_pages_meta::*; -pub use format::*; -pub use header::*; -pub use unsafe_slice::*; diff --git a/crates/brk_vec/src/structs/unsafe_slice.rs b/crates/brk_vec/src/structs/unsafe_slice.rs deleted file mode 100644 index 37bdeb6f4..000000000 --- a/crates/brk_vec/src/structs/unsafe_slice.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::cell::UnsafeCell; - -#[derive(Copy, Clone)] -pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell]); -unsafe impl Send for UnsafeSlice<'_, T> {} -unsafe impl Sync for UnsafeSlice<'_, T> {} - -impl<'a, T> UnsafeSlice<'a, T> { - pub fn new(slice: &'a mut [T]) -> Self { - let ptr = slice as *mut [T] as *const [UnsafeCell]; - Self(unsafe { &*ptr }) - } - - /// SAFETY: It is UB if two threads write to the same index without - /// synchronization. - pub fn write(&self, i: usize, value: T) { - unsafe { - *self.0[i].get() = value; - } - } - - /// SAFETY: It is UB - pub fn get(&self, i: usize) -> *mut T { - self.0[i].get() - } - - pub fn copy_slice(&self, start: usize, slice: &[T]) - where - T: Copy, - { - slice.iter().enumerate().for_each(|(i, v)| { - self.write(start + i, *v); - }); - } -} diff --git a/crates/brk_vec/src/traits/any.rs b/crates/brk_vec/src/traits/any.rs deleted file mode 100644 index 84c516788..000000000 --- a/crates/brk_vec/src/traits/any.rs +++ /dev/null @@ -1,102 +0,0 @@ -use brk_core::{Height, Version}; - -use super::{BoxedVecIterator, StoredIndex, StoredType}; - -pub fn i64_to_usize(i: i64, len: usize) -> usize { - if i >= 0 { - (i as usize).min(len) - } else { - let v = len as i64 + i; - if v < 0 { 0 } else { v as usize } - } -} - -pub trait AnyVec: Send + Sync { - fn version(&self) -> Version; - fn name(&self) -> &str; - fn len(&self) -> usize; - fn is_empty(&self) -> bool { - self.len() == 0 - } - fn index_type_to_string(&self) -> &'static str; - fn value_type_to_size_of(&self) -> usize; - fn etag(&self, height: Height, to: Option) -> String { - let len = self.len(); - format!( - "{}-{}-{}", - to.map_or(len, |to| { - if to.is_negative() { - len.checked_sub(to.unsigned_abs() as usize) - .unwrap_or_default() - } else { - to as usize - } - }), - u64::from(self.version()), - u32::from(height), - ) - } - - #[inline] - fn i64_to_usize(&self, i: i64) -> usize { - let len = self.len(); - i64_to_usize(i, len) - } -} - -pub trait AnyIterableVec: AnyVec { - #[allow(clippy::wrong_self_convention)] - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - I: StoredIndex, - T: StoredType + 'a; - - fn iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - I: StoredIndex, - T: StoredType + 'a, - { - self.boxed_iter() - } - - fn iter_at<'a>(&'a self, i: I) -> BoxedVecIterator<'a, I, T> - where - I: StoredIndex, - T: StoredType + 'a, - { - let mut iter = self.boxed_iter(); - iter.set(i); - iter - } - - fn iter_at_<'a>(&'a self, i: usize) -> BoxedVecIterator<'a, I, T> - where - I: StoredIndex, - T: StoredType + 'a, - { - let mut iter = self.boxed_iter(); - iter.set_(i); - iter - } -} - -pub trait CloneableAnyIterableVec: AnyIterableVec { - fn boxed_clone(&self) -> Box>; -} - -impl CloneableAnyIterableVec for U -where - U: 'static + AnyIterableVec + Clone, -{ - fn boxed_clone(&self) -> Box> { - Box::new(self.clone()) - } -} - -impl Clone for Box> { - fn clone(&self) -> Self { - self.boxed_clone() - } -} - -pub type BoxedAnyIterableVec = Box>; diff --git a/crates/brk_vec/src/traits/collectable.rs b/crates/brk_vec/src/traits/collectable.rs deleted file mode 100644 index 87d7b840b..000000000 --- a/crates/brk_vec/src/traits/collectable.rs +++ /dev/null @@ -1,87 +0,0 @@ -use brk_core::{Error, Result}; - -use crate::i64_to_usize; - -use super::{AnyIterableVec, AnyVec, StoredIndex, StoredType}; - -pub trait CollectableVec: AnyVec + AnyIterableVec -where - Self: Clone, - I: StoredIndex, - T: StoredType, -{ - fn collect(&self) -> Result> { - self.collect_range(None, None) - } - - fn collect_range(&self, from: Option, to: Option) -> Result> { - let len = self.len(); - let from = from.unwrap_or_default(); - let to = to.map_or(len, |to| to.min(len)); - - if from >= len || from >= to { - return Ok(vec![]); - } - - Ok(self - .iter_at_(from) - .take(to - from) - .map(|(_, v)| v.into_owned()) - .collect::>()) - } - - #[inline] - fn i64_to_usize_(i: i64, len: usize) -> usize { - if i >= 0 { - (i as usize).min(len) - } else { - let v = len as i64 + i; - if v < 0 { 0 } else { v as usize } - } - } - - fn collect_signed_range(&self, from: Option, to: Option) -> Result> { - let from = from.map(|i| self.i64_to_usize(i)); - let to = to.map(|i| self.i64_to_usize(i)); - self.collect_range(from, to) - } - - #[inline] - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - self.collect_range(from, to)? - .into_iter() - .map(|v| serde_json::to_value(v).map_err(Error::from)) - .collect::>>() - } -} - -impl CollectableVec for V -where - V: AnyVec + AnyIterableVec + Clone, - I: StoredIndex, - T: StoredType, -{ -} - -pub trait AnyCollectableVec: AnyVec { - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result>; - - fn range_count(&self, from: Option, to: Option) -> usize { - let len = self.len(); - let from = from.map(|i| i64_to_usize(i, len)); - let to = to.map(|i| i64_to_usize(i, len)); - (from.unwrap_or_default()..to.unwrap_or(len)).count() - } - - fn range_weight(&self, from: Option, to: Option) -> usize { - self.range_count(from, to) * self.value_type_to_size_of() - } -} diff --git a/crates/brk_vec/src/traits/generic.rs b/crates/brk_vec/src/traits/generic.rs deleted file mode 100644 index 75e157db8..000000000 --- a/crates/brk_vec/src/traits/generic.rs +++ /dev/null @@ -1,281 +0,0 @@ -use std::{ - borrow::Cow, - cmp::Ordering, - collections::{BTreeMap, BTreeSet}, - fs::{self, File, OpenOptions}, - io::{self, Seek, SeekFrom, Write}, - path::{Path, PathBuf}, -}; - -use brk_core::{Error, Result}; -use memmap2::Mmap; - -use crate::{AnyVec, HEADER_OFFSET, Header}; - -use super::{StoredIndex, StoredType}; - -pub trait GenericStoredVec: Send + Sync -where - Self: AnyVec, - I: StoredIndex, - T: StoredType, -{ - const SIZE_OF_T: usize = size_of::(); - - #[inline] - fn unwrap_read(&self, index: I, mmap: &Mmap) -> T { - self.read(index, mmap).unwrap().unwrap() - } - #[inline] - fn read(&self, index: I, mmap: &Mmap) -> Result> { - self.read_(index.to_usize()?, mmap) - } - fn read_(&self, index: usize, mmap: &Mmap) -> Result>; - - #[inline] - fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { - self.get_or_read_(index.to_usize()?, mmap) - } - #[inline] - fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result>> { - let stored_len = self.stored_len(); - - if index >= stored_len { - let pushed = self.pushed(); - let j = index - stored_len; - if j >= pushed.len() { - return Ok(None); - } - return Ok(pushed.get(j).map(Cow::Borrowed)); - } - - let updated = self.updated(); - if !updated.is_empty() - && let Some(updated) = updated.get(&index) - { - return Ok(Some(Cow::Borrowed(updated))); - } - - let holes = self.holes(); - if !holes.is_empty() && holes.contains(&index) { - return Ok(None); - } - - Ok(self.read_(index, mmap)?.map(Cow::Owned)) - } - - #[inline] - fn len_(&self) -> usize { - self.stored_len() + self.pushed_len() - } - - fn index_to_name(&self) -> String { - format!("{}_to_{}", I::to_string(), self.name()) - } - - fn stored_len(&self) -> usize; - - fn pushed(&self) -> &[T]; - #[inline] - fn pushed_len(&self) -> usize { - self.pushed().len() - } - fn mut_pushed(&mut self) -> &mut Vec; - #[inline] - fn push(&mut self, value: T) { - self.mut_pushed().push(value) - } - - #[inline] - fn update_or_push(&mut self, index: I, value: T) -> Result<()> { - let len = self.len(); - match len.cmp(&index.to_usize()?) { - Ordering::Less => { - dbg!(index, value, len, self.header()); - Err(Error::IndexTooHigh) - } - Ordering::Equal => { - self.push(value); - Ok(()) - } - Ordering::Greater => self.update(index, value), - } - } - - fn get_first_empty_index(&self) -> I { - self.holes() - .first() - .cloned() - .unwrap_or_else(|| self.len_()) - .into() - } - - #[inline] - fn fill_first_hole_or_push(&mut self, value: T) -> Result { - Ok( - if let Some(hole) = self.mut_holes().pop_first().map(I::from) { - self.update(hole, value)?; - hole - } else { - self.push(value); - I::from(self.len() - 1) - }, - ) - } - - fn holes(&self) -> &BTreeSet; - fn mut_holes(&mut self) -> &mut BTreeSet; - fn take(&mut self, index: I, mmap: &Mmap) -> Result> { - let opt = self.get_or_read(index, mmap)?.map(|v| v.into_owned()); - if opt.is_some() { - self.unchecked_delete(index); - } - Ok(opt) - } - #[inline] - fn delete(&mut self, index: I) { - if index.unwrap_to_usize() < self.len() { - self.unchecked_delete(index); - } - } - #[inline] - #[doc(hidden)] - fn unchecked_delete(&mut self, index: I) { - let uindex = index.unwrap_to_usize(); - let updated = self.mut_updated(); - if !updated.is_empty() { - updated.remove(&uindex); - } - self.mut_holes().insert(uindex); - } - - fn updated(&self) -> &BTreeMap; - fn mut_updated(&mut self) -> &mut BTreeMap; - #[inline] - fn update(&mut self, index: I, value: T) -> Result<()> { - let uindex = index.unwrap_to_usize(); - let stored_len = self.stored_len(); - - if uindex >= stored_len { - if let Some(prev) = self.mut_pushed().get_mut(uindex - stored_len) { - *prev = value; - return Ok(()); - } else { - return Err(Error::IndexTooHigh); - } - } - - let holes = self.mut_holes(); - if !holes.is_empty() { - holes.remove(&index.unwrap_to_usize()); - } - - self.mut_updated().insert(index.unwrap_to_usize(), value); - - Ok(()) - } - - fn header(&self) -> &Header; - fn mut_header(&mut self) -> &mut Header; - - fn parent(&self) -> &Path; - - fn folder(&self) -> PathBuf { - self.parent().join(self.name()) - } - - fn folder_(parent: &Path, name: &str) -> PathBuf { - parent.join(name) - } - - #[inline] - fn path(&self) -> PathBuf { - Self::path_(self.parent(), self.name()) - } - #[inline] - fn path_(parent: &Path, name: &str) -> PathBuf { - Self::folder_(parent, name).join(I::to_string()) - } - - #[inline] - fn holes_path(&self) -> PathBuf { - Self::holes_path_(self.parent(), self.name()) - } - #[inline] - fn holes_path_(parent: &Path, name: &str) -> PathBuf { - Self::folder_(parent, name).join(format!("{}_holes", I::to_string())) - } - - // --- - - fn open_file(&self) -> io::Result { - Self::open_file_(&self.path()) - } - fn open_file_(path: &Path) -> io::Result { - let mut file = OpenOptions::new() - .read(true) - .create(true) - .write(true) - .truncate(false) - .open(path)?; - file.seek(SeekFrom::End(0))?; - Ok(file) - } - - fn file_set_len(&mut self, file: &mut File, len: u64) -> Result<()> { - Self::file_set_len_(file, len)?; - Ok(()) - } - fn file_set_len_(file: &mut File, len: u64) -> Result<()> { - file.set_len(len)?; - file.seek(SeekFrom::End(0))?; - Ok(()) - } - - fn file_write_all(&mut self, file: &mut File, buf: &[u8]) -> Result<()> { - file.write_all(buf)?; - file.flush()?; - Ok(()) - } - - fn file_truncate_and_write_all(&mut self, file: &mut File, len: u64, buf: &[u8]) -> Result<()> { - Self::file_set_len_(file, len)?; - self.file_write_all(file, buf) - } - - fn reset(&mut self) -> Result<()>; - - #[inline] - fn reset_(&mut self) -> Result<()> { - let holes_path = self.holes_path(); - if fs::exists(&holes_path)? { - fs::remove_file(&holes_path)?; - } - let mut file = self.open_file()?; - self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[]) - } - - #[inline] - fn create_mmap(&self) -> Result { - let file = self.open_file()?; - unsafe { Mmap::map(&file).map_err(|e| e.into()) } - } - - #[inline] - fn is_pushed_empty(&self) -> bool { - self.pushed_len() == 0 - } - - #[inline] - fn has(&self, index: I) -> Result { - Ok(self.has_(index.to_usize()?)) - } - #[inline] - fn has_(&self, index: usize) -> bool { - index < self.len_() - } - - fn flush(&mut self) -> Result<()>; - - fn truncate_if_needed(&mut self, index: I) -> Result<()>; -} diff --git a/crates/brk_vec/src/traits/index.rs b/crates/brk_vec/src/traits/index.rs deleted file mode 100644 index 723d28b45..000000000 --- a/crates/brk_vec/src/traits/index.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::{fmt::Debug, ops::Add}; - -use brk_core::{Error, Printable, Result}; -use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; - -pub trait StoredIndex -where - Self: Debug - + Default - + Copy - + Clone - + PartialEq - + Eq - + PartialOrd - + Ord - + TryInto - + From - + Add - + TryFromBytes - + IntoBytes - + Immutable - + KnownLayout - + Send - + Sync - + Printable, -{ - fn unwrap_to_usize(self) -> usize; - fn to_usize(self) -> Result; - fn decremented(self) -> Option; -} - -impl StoredIndex for I -where - I: Debug - + Default - + Copy - + Clone - + PartialEq - + Eq - + PartialOrd - + Ord - + TryInto - + From - + Add - + TryFromBytes - + IntoBytes - + Immutable - + KnownLayout - + Send - + Sync - + Printable, -{ - #[inline] - fn unwrap_to_usize(self) -> usize { - self.to_usize().unwrap() - } - - #[inline] - fn to_usize(self) -> Result { - self.try_into().map_err(|_| Error::FailedKeyTryIntoUsize) - } - - #[inline] - fn decremented(self) -> Option { - self.unwrap_to_usize().checked_sub(1).map(Self::from) - } -} diff --git a/crates/brk_vec/src/traits/iterator.rs b/crates/brk_vec/src/traits/iterator.rs deleted file mode 100644 index 097e513a7..000000000 --- a/crates/brk_vec/src/traits/iterator.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::{borrow::Cow, iter::Skip}; - -use brk_core::Printable; - -use super::{StoredIndex, StoredType}; - -pub trait BaseVecIterator: Iterator { - fn mut_index(&mut self) -> &mut usize; - - #[inline] - fn set_(&mut self, i: usize) { - *self.mut_index() = i; - } - - #[inline] - fn next_at(&mut self, i: usize) -> Option { - self.set_(i); - self.next() - } - - fn len(&self) -> usize; - - fn name(&self) -> &str; - - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn skip(self, _: usize) -> Skip - where - Self: Sized, - { - todo!("") - } -} - -pub trait VecIterator<'a>: BaseVecIterator)> { - type I: StoredIndex; - type T: StoredType + 'a; - - #[inline] - fn set(&mut self, i: Self::I) { - self.set_(i.unwrap_to_usize()) - } - - #[inline] - fn get_(&mut self, i: usize) -> Option> { - self.next_at(i).map(|(_, v)| v) - } - - #[inline] - fn get(&mut self, i: Self::I) -> Option> { - self.get_(i.unwrap_to_usize()) - } - - #[inline] - fn unwrap_get_inner(&mut self, i: Self::I) -> Self::T { - self.unwrap_get_inner_(i.unwrap_to_usize()) - } - - #[inline] - fn unwrap_get_inner_(&mut self, i: usize) -> Self::T { - self.get_(i) - .unwrap_or_else(|| { - dbg!(self.name(), i, self.len(), Self::I::to_string()); - panic!("unwrap_get_inner_") - }) - .into_owned() - } - - #[inline] - fn get_inner(&mut self, i: Self::I) -> Option { - self.get_(i.unwrap_to_usize()).map(|v| v.into_owned()) - } - - fn last(mut self) -> Option - where - Self: Sized, - { - let len = self.len(); - if len == 0 { - return None; - } - let i = len - 1; - self.set_(i); - self.next() - } - - fn index_type_to_string(&self) -> &'static str { - Self::I::to_string() - } -} - -impl<'a, I, T, Iter> VecIterator<'a> for Iter -where - Iter: BaseVecIterator)>, - I: StoredIndex, - T: StoredType + 'a, -{ - type I = I; - type T = T; -} - -pub type BoxedVecIterator<'a, I, T> = - Box)> + 'a>; diff --git a/crates/brk_vec/src/traits/mod.rs b/crates/brk_vec/src/traits/mod.rs deleted file mode 100644 index d07c2a598..000000000 --- a/crates/brk_vec/src/traits/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod any; -mod collectable; -mod generic; -mod index; -mod iterator; -mod r#type; - -pub use any::*; -pub use collectable::*; -pub use generic::*; -pub use index::*; -pub use iterator::*; -pub use r#type::*; diff --git a/crates/brk_vec/src/traits/type.rs b/crates/brk_vec/src/traits/type.rs deleted file mode 100644 index 3acb2f7bf..000000000 --- a/crates/brk_vec/src/traits/type.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::fmt::Debug; - -use serde::Serialize; -use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; - -pub trait StoredType -where - Self: Sized - + Debug - + Clone - + TryFromBytes - + IntoBytes - + Immutable - + KnownLayout - + Send - + Sync - + Serialize, -{ -} - -impl StoredType for T where - T: Sized - + Debug - + Clone - + TryFromBytes - + IntoBytes - + Immutable - + KnownLayout - + Send - + Sync - + Serialize -{ -} diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs deleted file mode 100644 index 307be2f0d..000000000 --- a/crates/brk_vec/src/variants/compressed.rs +++ /dev/null @@ -1,544 +0,0 @@ -use std::{ - borrow::Cow, - collections::{BTreeMap, BTreeSet}, - fs, mem, - path::{Path, PathBuf}, - sync::Arc, -}; - -use arc_swap::{ArcSwap, Guard}; -use brk_core::{Error, Result, Version}; -use memmap2::Mmap; -use rayon::prelude::*; -use zstd::DEFAULT_COMPRESSION_LEVEL; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - CompressedPageMetadata, CompressedPagesMetadata, GenericStoredVec, HEADER_OFFSET, Header, - RawVec, StoredIndex, StoredType, UnsafeSlice, -}; - -const ONE_KIB: usize = 1024; -const ONE_MIB: usize = ONE_KIB * ONE_KIB; -pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; -pub const MAX_PAGE_SIZE: usize = 64 * ONE_KIB; - -const VERSION: Version = Version::TWO; - -#[derive(Debug)] -pub struct CompressedVec { - inner: RawVec, - pages_meta: Arc>, -} - -impl CompressedVec -where - I: StoredIndex, - T: StoredType, -{ - pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; - pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; - - /// Same as import but will reset the folder under certain errors, so be careful ! - pub fn forced_import(parent: &Path, name: &str, mut version: Version) -> Result { - version = version + VERSION; - let res = Self::import(parent, name, version); - match res { - Err(Error::DifferentCompressionMode) - | Err(Error::WrongEndian) - | Err(Error::WrongLength) - | Err(Error::DifferentVersion { .. }) => { - let path = Self::path_(parent, name); - fs::remove_file(path)?; - Self::import(parent, name, version) - } - _ => res, - } - } - - #[allow(unreachable_code, unused_variables)] - pub fn import(parent: &Path, name: &str, version: Version) -> Result { - let mut inner = RawVec::import(parent, name, version)?; - - let pages_meta = { - let path = inner - .folder() - .join(format!("{}-pages-meta", I::to_string())); - if inner.is_empty() { - let _ = fs::remove_file(&path); - } - CompressedPagesMetadata::read(&path)? - }; - - inner.set_stored_len(if let Some(last) = pages_meta.last() { - (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize - } else { - 0 - }); - - Ok(Self { - inner, - pages_meta: Arc::new(ArcSwap::new(Arc::new(pages_meta))), - }) - } - - fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result> { - Self::decode_page_(self.stored_len(), page_index, mmap, &self.pages_meta.load()) - } - - fn decode_page_( - stored_len: usize, - page_index: usize, - mmap: &Mmap, - compressed_pages_meta: &CompressedPagesMetadata, - ) -> Result> { - if Self::page_index_to_index(page_index) >= stored_len { - return Err(Error::IndexTooHigh); - } else if compressed_pages_meta.len() <= page_index { - return Err(Error::ExpectVecToHaveIndex); - } - - let page = compressed_pages_meta.get(page_index).unwrap(); - let len = page.bytes_len as usize; - let offset = page.start as usize; - - Ok(zstd::decode_all(&mmap[offset..offset + len]) - .inspect_err(|_| { - dbg!((len, offset, page_index, &mmap[..], &mmap.len())); - })? - .chunks(Self::SIZE_OF_T) - .map(|slice| T::try_read_from_bytes(slice).unwrap()) - .collect::>()) - } - - fn compress_page(chunk: &[T]) -> Vec { - if chunk.len() > Self::PER_PAGE { - panic!(); - } - - let mut bytes: Vec = vec![0; chunk.len() * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - chunk - .into_par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); - - zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap() - } - - #[inline] - fn index_to_page_index(index: usize) -> usize { - index / Self::PER_PAGE - } - - #[inline] - fn page_index_to_index(page_index: usize) -> usize { - page_index * Self::PER_PAGE - } - - #[inline] - pub fn iter(&self) -> CompressedVecIterator<'_, I, T> { - self.into_iter() - } - - #[inline] - pub fn iter_at(&self, i: I) -> CompressedVecIterator<'_, I, T> { - self.iter_at_(i.unwrap_to_usize()) - } - - #[inline] - pub fn iter_at_(&self, i: usize) -> CompressedVecIterator<'_, I, T> { - let mut iter = self.into_iter(); - iter.set_(i); - iter - } -} - -impl GenericStoredVec for CompressedVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn read_(&self, index: usize, mmap: &Mmap) -> Result> { - let page_index = Self::index_to_page_index(index); - let decoded_index = index % Self::PER_PAGE; - - Ok(self - .decode_page(page_index, mmap)? - .get(decoded_index) - .cloned()) - } - - fn header(&self) -> &Header { - self.inner.header() - } - - fn mut_header(&mut self) -> &mut Header { - self.inner.mut_header() - } - - fn parent(&self) -> &Path { - self.inner.parent() - } - - #[inline] - fn stored_len(&self) -> usize { - self.inner.stored_len() - } - - #[inline] - fn pushed(&self) -> &[T] { - self.inner.pushed() - } - #[inline] - fn mut_pushed(&mut self) -> &mut Vec { - self.inner.mut_pushed() - } - #[inline] - fn holes(&self) -> &BTreeSet { - self.inner.holes() - } - #[inline] - fn mut_holes(&mut self) -> &mut BTreeSet { - panic!("unsupported") - } - #[inline] - fn updated(&self) -> &BTreeMap { - self.inner.updated() - } - #[inline] - fn mut_updated(&mut self) -> &mut BTreeMap { - panic!("unsupported") - } - - #[inline] - fn path(&self) -> PathBuf { - self.inner.path() - } - - fn flush(&mut self) -> Result<()> { - let file_opt = self.inner.write_header_if_needed()?; - - let pushed_len = self.pushed_len(); - - if pushed_len == 0 { - return Ok(()); - } - - let stored_len = self.stored_len(); - - let mut file = file_opt.unwrap_or(self.open_file()?); - - let mut pages_meta = (**self.pages_meta.load()).clone(); - - let mut starting_page_index = pages_meta.len(); - let mut values = vec![]; - let mut truncate_at = None; - - if self.stored_len() % Self::PER_PAGE != 0 { - if pages_meta.is_empty() { - unreachable!() - } - - let last_page_index = pages_meta.len() - 1; - - let mmap = unsafe { Mmap::map(&file)? }; - - values = Self::decode_page_(stored_len, last_page_index, &mmap, &pages_meta) - .inspect_err(|_| { - dbg!(last_page_index, &pages_meta); - }) - .unwrap(); - - truncate_at.replace(pages_meta.pop().unwrap().start); - starting_page_index = last_page_index; - } - - let compressed = values - .into_par_iter() - .chain(mem::take(self.mut_pushed()).into_par_iter()) - .chunks(Self::PER_PAGE) - .map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len())) - .collect::>(); - - compressed - .iter() - .enumerate() - .for_each(|(i, (compressed_bytes, values_len))| { - let page_index = starting_page_index + i; - - let start = if page_index != 0 { - let prev = pages_meta.get(page_index - 1).unwrap(); - prev.start + prev.bytes_len as u64 - } else { - 0 - }; - let offsetted_start = start + HEADER_OFFSET as u64; - - let bytes_len = compressed_bytes.len() as u32; - let values_len = *values_len as u32; - - let page = CompressedPageMetadata::new(offsetted_start, bytes_len, values_len); - - pages_meta.push(page_index, page); - }); - - let buf = compressed - .into_iter() - .flat_map(|(v, _)| v) - .collect::>(); - - pages_meta.write()?; - - if let Some(truncate_at) = truncate_at { - self.file_set_len(&mut file, truncate_at)?; - } - - self.file_write_all(&mut file, &buf)?; - - self.pages_meta.store(Arc::new(pages_meta)); - - Ok(()) - } - - fn reset(&mut self) -> Result<()> { - let mut pages_meta = (**self.pages_meta.load()).clone(); - pages_meta.truncate(0); - pages_meta.write()?; - self.pages_meta.store(Arc::new(pages_meta)); - self.reset_() - } - - fn truncate_if_needed(&mut self, index: I) -> Result<()> { - let index = index.to_usize()?; - - if index >= self.stored_len() { - return Ok(()); - } - - if index == 0 { - self.reset()?; - return Ok(()); - } - - let mut pages_meta = (**self.pages_meta.load()).clone(); - - let page_index = Self::index_to_page_index(index); - - let mut file = self.open_file()?; - - let mmap = unsafe { Mmap::map(&file)? }; - - let values = self.decode_page(page_index, &mmap)?; - let mut buf = vec![]; - - let mut page = pages_meta.truncate(page_index).unwrap(); - - let len = page.start; - - let decoded_index = index % Self::PER_PAGE; - - if decoded_index != 0 { - let chunk = &values[..decoded_index]; - - buf = Self::compress_page(chunk); - - page.values_len = chunk.len() as u32; - page.bytes_len = buf.len() as u32; - - pages_meta.push(page_index, page); - } - - pages_meta.write()?; - - self.pages_meta.store(Arc::new(pages_meta)); - - self.file_truncate_and_write_all(&mut file, len, &buf)?; - - Ok(()) - } -} - -impl AnyVec for CompressedVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn version(&self) -> Version { - self.inner.version() - } - - #[inline] - fn name(&self) -> &str { - self.inner.name() - } - - #[inline] - fn len(&self) -> usize { - self.len_() - } - - #[inline] - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -impl Clone for CompressedVec { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - pages_meta: self.pages_meta.clone(), - } - } -} - -#[derive(Debug)] -pub struct CompressedVecIterator<'a, I, T> { - vec: &'a CompressedVec, - mmap: Mmap, - decoded_page: Option<(usize, Vec)>, - // second_decoded_page?: Option<(usize, Vec)>, - pages_meta: Guard>, - stored_len: usize, - index: usize, -} - -impl CompressedVecIterator<'_, I, T> -where - I: StoredIndex, - T: StoredType, -{ - const SIZE_OF_T: usize = size_of::(); - const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; -} - -impl BaseVecIterator for CompressedVecIterator<'_, I, T> -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - &mut self.index - } - - #[inline] - fn len(&self) -> usize { - self.vec.len() - } - - #[inline] - fn name(&self) -> &str { - self.vec.name() - } -} - -impl<'a, I, T> Iterator for CompressedVecIterator<'a, I, T> -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - - fn next(&mut self) -> Option { - let mmap = &self.mmap; - let i = self.index; - let stored_len = self.stored_len; - - let result = if i >= stored_len { - let j = i - stored_len; - if j >= self.vec.pushed_len() { - return None; - } - self.vec - .pushed() - .get(j) - .map(|v| (I::from(i), Cow::Borrowed(v))) - } else { - let page_index = i / Self::PER_PAGE; - - if self.decoded_page.as_ref().is_none_or(|b| b.0 != page_index) { - let values = CompressedVec::::decode_page_( - stored_len, - page_index, - mmap, - &self.pages_meta, - ) - .unwrap(); - self.decoded_page.replace((page_index, values)); - } - - self.decoded_page - .as_ref() - .unwrap() - .1 - .get(i % Self::PER_PAGE) - .map(|v| (I::from(i), Cow::Owned(v.clone()))) - }; - - self.index += 1; - - result - } -} - -impl<'a, I, T> IntoIterator for &'a CompressedVec -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = CompressedVecIterator<'a, I, T>; - - fn into_iter(self) -> Self::IntoIter { - let pages_meta = self.pages_meta.load(); - let stored_len = self.stored_len(); - - CompressedVecIterator { - vec: self, - mmap: self.create_mmap().unwrap(), - decoded_page: None, - pages_meta, - index: 0, - stored_len, - } - } -} - -impl AnyIterableVec for CompressedVec -where - I: StoredIndex, - T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec for CompressedVec -where - I: StoredIndex, - T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/computed.rs b/crates/brk_vec/src/variants/computed.rs deleted file mode 100644 index 53cc45ff2..000000000 --- a/crates/brk_vec/src/variants/computed.rs +++ /dev/null @@ -1,405 +0,0 @@ -use std::{borrow::Cow, fs, path::Path}; - -use brk_exit::Exit; -use clap_derive::ValueEnum; -use serde::{Deserialize, Serialize}; - -use brk_core::{Result, StoredPhantom, Version}; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedAnyIterableVec, - BoxedVecIterator, CollectableVec, Format, StoredIndex, StoredType, -}; - -use super::{ - ComputeFrom1, ComputeFrom2, ComputeFrom3, EagerVec, LazyVecFrom1, LazyVecFrom1Iterator, - LazyVecFrom2, LazyVecFrom2Iterator, LazyVecFrom3, LazyVecFrom3Iterator, StoredVecIterator, -}; - -#[derive( - Default, Debug, PartialEq, PartialOrd, Ord, Eq, Clone, Copy, Serialize, Deserialize, ValueEnum, -)] -pub enum Computation { - Eager, - #[default] - Lazy, -} - -impl Computation { - pub fn eager(&self) -> bool { - *self == Self::Eager - } - - pub fn lazy(&self) -> bool { - *self == Self::Lazy - } -} - -#[derive(Clone)] -pub enum Dependencies -where - S1T: Clone, - S2T: Clone, - S3T: Clone, -{ - From1(BoxedAnyIterableVec, ComputeFrom1), - From2( - (BoxedAnyIterableVec, BoxedAnyIterableVec), - ComputeFrom2, - ), - From3( - ( - BoxedAnyIterableVec, - BoxedAnyIterableVec, - BoxedAnyIterableVec, - ), - ComputeFrom3, - ), -} - -pub type ComputedVecFrom1 = - ComputedVec; -pub type ComputedVecFrom2 = - ComputedVec; -pub type ComputedVecFrom3 = - ComputedVec; - -#[derive(Clone)] -pub enum ComputedVec -where - S1T: Clone, - S2T: Clone, - S3T: Clone, -{ - Eager { - vec: EagerVec, - deps: Dependencies, - }, - LazyFrom1(LazyVecFrom1), - LazyFrom2(LazyVecFrom2), - LazyFrom3(LazyVecFrom3), -} - -impl ComputedVec -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - pub fn forced_import_or_init_from_1( - computation: Computation, - path: &Path, - name: &str, - version: Version, - format: Format, - source: BoxedAnyIterableVec, - compute: ComputeFrom1, - ) -> Result { - Ok(match computation { - Computation::Eager => Self::Eager { - vec: EagerVec::forced_import(path, name, version, format)?, - deps: Dependencies::From1(source, compute), - }, - Computation::Lazy => { - let _ = fs::remove_dir_all(path.join(name).join(I::to_string())); - Self::LazyFrom1(LazyVecFrom1::init(name, version, source, compute)) - } - }) - } - - #[allow(clippy::too_many_arguments)] - pub fn forced_import_or_init_from_2( - computation: Computation, - path: &Path, - name: &str, - version: Version, - format: Format, - source1: BoxedAnyIterableVec, - source2: BoxedAnyIterableVec, - compute: ComputeFrom2, - ) -> Result { - Ok(match computation { - Computation::Eager => Self::Eager { - vec: EagerVec::forced_import(path, name, version, format)?, - deps: Dependencies::From2((source1, source2), compute), - }, - Computation::Lazy => { - let _ = fs::remove_dir_all(path.join(name).join(I::to_string())); - Self::LazyFrom2(LazyVecFrom2::init(name, version, source1, source2, compute)) - } - }) - } - - #[allow(clippy::too_many_arguments)] - pub fn forced_import_or_init_from_3( - computation: Computation, - path: &Path, - name: &str, - version: Version, - format: Format, - source1: BoxedAnyIterableVec, - source2: BoxedAnyIterableVec, - source3: BoxedAnyIterableVec, - compute: ComputeFrom3, - ) -> Result { - Ok(match computation { - Computation::Eager => Self::Eager { - vec: EagerVec::forced_import(path, name, version, format)?, - deps: Dependencies::From3((source1, source2, source3), compute), - }, - Computation::Lazy => { - let _ = fs::remove_dir_all(path.join(name).join(I::to_string())); - Self::LazyFrom3(LazyVecFrom3::init( - name, version, source1, source2, source3, compute, - )) - } - }) - } - - pub fn compute_if_necessary( - &mut self, - max_from: I, - len_source: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> { - let (vec, dependencies) = if let ComputedVec::Eager { - vec, - deps: dependencies, - } = self - { - (vec, dependencies) - } else { - return Ok(()); - }; - - let len = len_source.len(); - - match dependencies { - Dependencies::From1(source, compute) => { - let version = source.version(); - let mut iter = source.iter(); - let t = |i: I| compute(i, &mut *iter).map(|v| (i, v)).unwrap(); - vec.compute_to(max_from, len, version, t, exit) - } - Dependencies::From2((source1, source2), compute) => { - let version = source1.version() + source2.version(); - let mut iter1 = source1.iter(); - let mut iter2 = source2.iter(); - let t = |i: I| { - compute(i, &mut *iter1, &mut *iter2) - .map(|v| (i, v)) - .unwrap() - }; - vec.compute_to(max_from, len, version, t, exit) - } - Dependencies::From3((source1, source2, source3), compute) => { - let version = source1.version() + source2.version() + source3.version(); - let mut iter1 = source1.iter(); - let mut iter2 = source2.iter(); - let mut iter3 = source3.iter(); - let t = |i: I| { - compute(i, &mut *iter1, &mut *iter2, &mut *iter3) - .map(|v| (i, v)) - .unwrap() - }; - vec.compute_to(max_from, len, version, t, exit) - } - } - } -} - -impl AnyVec for ComputedVec -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - fn version(&self) -> Version { - match self { - ComputedVec::Eager { vec, .. } => vec.version(), - ComputedVec::LazyFrom1(v) => v.version(), - ComputedVec::LazyFrom2(v) => v.version(), - ComputedVec::LazyFrom3(v) => v.version(), - } - } - - fn name(&self) -> &str { - match self { - ComputedVec::Eager { vec, .. } => vec.name(), - ComputedVec::LazyFrom1(v) => v.name(), - ComputedVec::LazyFrom2(v) => v.name(), - ComputedVec::LazyFrom3(v) => v.name(), - } - } - - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - fn len(&self) -> usize { - match self { - ComputedVec::Eager { vec, .. } => vec.len(), - ComputedVec::LazyFrom1(v) => v.len(), - ComputedVec::LazyFrom2(v) => v.len(), - ComputedVec::LazyFrom3(v) => v.len(), - } - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -pub enum ComputedVecIterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> -where - S1T: Clone, - S2T: Clone, - S3T: Clone, -{ - Eager(StoredVecIterator<'a, I, T>), - LazyFrom1(LazyVecFrom1Iterator<'a, I, T, S1I, S1T>), - LazyFrom2(LazyVecFrom2Iterator<'a, I, T, S1I, S1T, S2I, S2T>), - LazyFrom3(LazyVecFrom3Iterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T>), -} - -impl<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> Iterator - for ComputedVecIterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - type Item = (I, Cow<'a, T>); - fn next(&mut self) -> Option { - match self { - Self::Eager(i) => i.next(), - Self::LazyFrom1(i) => i.next(), - Self::LazyFrom2(i) => i.next(), - Self::LazyFrom3(i) => i.next(), - } - } -} - -impl BaseVecIterator - for ComputedVecIterator<'_, I, T, S1I, S1T, S2I, S2T, S3I, S3T> -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - match self { - Self::Eager(i) => i.mut_index(), - Self::LazyFrom1(i) => i.mut_index(), - Self::LazyFrom2(i) => i.mut_index(), - Self::LazyFrom3(i) => i.mut_index(), - } - } - - fn len(&self) -> usize { - match self { - Self::Eager(i) => i.len(), - Self::LazyFrom1(i) => i.len(), - Self::LazyFrom2(i) => i.len(), - Self::LazyFrom3(i) => i.len(), - } - } - - #[inline] - fn name(&self) -> &str { - match self { - Self::Eager(i) => i.name(), - Self::LazyFrom1(i) => i.name(), - Self::LazyFrom2(i) => i.name(), - Self::LazyFrom3(i) => i.name(), - } - } -} - -impl<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> IntoIterator - for &'a ComputedVec -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = ComputedVecIterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T>; - - fn into_iter(self) -> Self::IntoIter { - match self { - ComputedVec::Eager { vec, .. } => ComputedVecIterator::Eager(vec.into_iter()), - ComputedVec::LazyFrom1(v) => ComputedVecIterator::LazyFrom1(v.into_iter()), - ComputedVec::LazyFrom2(v) => ComputedVecIterator::LazyFrom2(v.into_iter()), - ComputedVec::LazyFrom3(v) => ComputedVecIterator::LazyFrom3(v.into_iter()), - } - } -} - -impl AnyIterableVec - for ComputedVec -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec - for ComputedVec -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/eager.rs b/crates/brk_vec/src/variants/eager.rs deleted file mode 100644 index 68adea914..000000000 --- a/crates/brk_vec/src/variants/eager.rs +++ /dev/null @@ -1,1321 +0,0 @@ -use core::error; -use std::{ - borrow::Cow, - cmp::Ordering, - f32, - fmt::Debug, - ops::{Add, Div, Mul}, - path::{Path, PathBuf}, -}; - -use brk_core::{ - Bitcoin, CheckedSub, Close, Date, DateIndex, Dollars, Error, Result, Sats, StoredF32, - StoredUsize, Version, -}; -use brk_exit::Exit; -use log::info; -use memmap2::Mmap; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BoxedVecIterator, CollectableVec, Format, - GenericStoredVec, StoredIndex, StoredType, StoredVec, StoredVecIterator, VecIterator, -}; - -const ONE_KIB: usize = 1024; -const ONE_MIB: usize = ONE_KIB * ONE_KIB; -const MAX_CACHE_SIZE: usize = 256 * ONE_MIB; -const DCA_AMOUNT: Dollars = Dollars::mint(100.0); - -#[derive(Debug, Clone)] -pub struct EagerVec(StoredVec); -// computed_version: Arc>>, - -impl EagerVec -where - I: StoredIndex, - T: StoredType, -{ - const SIZE_OF: usize = size_of::(); - - pub fn forced_import( - path: &Path, - value_name: &str, - version: Version, - format: Format, - ) -> Result { - Ok(Self(StoredVec::forced_import( - path, value_name, version, format, - )?)) - } - - fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> { - let _lock = exit.lock(); - self.0.truncate_if_needed(index)?; - Ok(()) - } - - #[inline] - pub fn forced_push_at(&mut self, index: I, value: T, exit: &Exit) -> Result<()> { - match self.len().cmp(&index.to_usize()?) { - Ordering::Less => { - return Err(Error::IndexTooHigh); - } - ord => { - if ord == Ordering::Greater { - self.safe_truncate_if_needed(index, exit)?; - } - self.0.push(value); - } - } - - if self.0.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { - self.safe_flush(exit) - } else { - Ok(()) - } - } - - pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { - let _lock = exit.lock(); - self.0.flush()?; - Ok(()) - } - - pub fn path(&self) -> PathBuf { - self.0.path() - } - - pub fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { - self.0.get_or_read(index, mmap) - } - - pub fn inner_version(&self) -> Version { - self.0.version() - } - - fn update_computed_version(&mut self, computed_version: Version) { - self.0 - .mut_header() - .update_computed_version(computed_version); - } - - pub fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> { - if version != self.0.header().computed_version() { - self.update_computed_version(version); - if !self.is_empty() { - self.0.reset()?; - } - } - - if self.is_empty() { - info!( - "Computing {}_to_{}...", - self.index_type_to_string(), - self.name() - ) - } - - Ok(()) - } - - pub fn compute_to( - &mut self, - max_from: I, - to: usize, - version: Version, - mut t: F, - exit: &Exit, - ) -> Result<()> - where - F: FnMut(I) -> (I, T), - { - self.validate_computed_version_or_reset_file(Version::ZERO + self.0.version() + version)?; - - let index = max_from.min(I::from(self.len())); - (index.to_usize()?..to).try_for_each(|i| { - let (i, v) = t(I::from(i)); - self.forced_push_at(i, v, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_range( - &mut self, - max_from: I, - other: &impl AnyIterableVec, - t: F, - exit: &Exit, - ) -> Result<()> - where - A: StoredType, - F: FnMut(I) -> (I, T), - { - self.compute_to(max_from, other.len(), other.version(), t, exit) - } - - pub fn compute_from_index( - &mut self, - max_from: I, - other: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: From, - T2: StoredType, - { - self.compute_to( - max_from, - other.len(), - other.version(), - |i| (i, T::from(i)), - exit, - ) - } - - pub fn compute_transform( - &mut self, - max_from: A, - other: &impl AnyIterableVec, - mut t: F, - exit: &Exit, - ) -> Result<()> - where - A: StoredIndex, - B: StoredType, - F: FnMut((A, B, &Self)) -> (I, T), - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + other.version(), - )?; - - let index = max_from.min(A::from(self.len())); - other.iter_at(index).try_for_each(|(a, b)| { - let (i, v) = t((a, b.into_owned(), self)); - self.forced_push_at(i, v, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_add( - &mut self, - max_from: I, - added: &impl AnyIterableVec, - adder: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: Add, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + added.version() + adder.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut adder_iter = adder.iter(); - - added.iter_at(index).try_for_each(|(i, v)| { - let v = v.into_owned() + adder_iter.unwrap_get_inner(i); - - self.forced_push_at(i, v, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_subtract( - &mut self, - max_from: I, - subtracted: &impl AnyIterableVec, - subtracter: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: CheckedSub, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + subtracted.version() + subtracter.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut subtracter_iter = subtracter.iter(); - - subtracted.iter_at(index).try_for_each(|(i, v)| { - let v = v - .into_owned() - .checked_sub(subtracter_iter.unwrap_get_inner(i)) - .unwrap(); - - self.forced_push_at(i, v, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_max( - &mut self, - max_from: I, - source: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: From + Ord, - T2: StoredType, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + source.version(), - )?; - - let index = max_from.min(I::from(self.len())); - - let mut prev = None; - - source.iter_at(index).try_for_each(|(i, v)| { - if prev.is_none() { - let i = i.unwrap_to_usize(); - prev.replace(if i > 0 { - self.into_iter().unwrap_get_inner_(i - 1) - } else { - T::from(source.iter().unwrap_get_inner_(0)) - }); - } - let max = prev.clone().unwrap().max(T::from(v.into_owned())); - prev.replace(max.clone()); - - self.forced_push_at(i, max, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_multiply( - &mut self, - max_from: I, - multiplied: &impl AnyIterableVec, - multiplier: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T2: StoredType + Mul, - T3: StoredType, - T4: StoredType, - T: From, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + multiplied.version() + multiplier.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut multiplier_iter = multiplier.iter(); - - multiplied.iter_at(index).try_for_each(|(i, v)| { - let v = v.into_owned() * multiplier_iter.unwrap_get_inner(i); - - self.forced_push_at(i, v.into(), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_divide( - &mut self, - max_from: I, - divided: &impl AnyIterableVec, - divider: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T2: StoredType + Mul, - T3: StoredType, - T4: Div + From, - T5: CheckedSub, - T: From, - { - self.compute_divide_(max_from, divided, divider, exit, false, false) - } - - pub fn compute_percentage( - &mut self, - max_from: I, - divided: &impl AnyIterableVec, - divider: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T2: StoredType + Mul, - T3: StoredType, - T4: Div + From, - T5: CheckedSub, - T: From, - { - self.compute_divide_(max_from, divided, divider, exit, true, false) - } - - pub fn compute_percentage_difference( - &mut self, - max_from: I, - divided: &impl AnyIterableVec, - divider: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T2: StoredType + Mul, - T3: StoredType, - T4: Div + From, - T5: CheckedSub, - T: From, - { - self.compute_divide_(max_from, divided, divider, exit, true, true) - } - - pub fn compute_divide_( - &mut self, - max_from: I, - divided: &impl AnyIterableVec, - divider: &impl AnyIterableVec, - exit: &Exit, - as_percentage: bool, - as_difference: bool, - ) -> Result<()> - where - T2: StoredType + Mul, - T3: StoredType, - T4: Div + From, - T5: CheckedSub, - T: From, - { - self.validate_computed_version_or_reset_file( - Version::ONE + self.0.version() + divided.version() + divider.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let multiplier = if as_percentage { 100 } else { 1 }; - - let mut divider_iter = divider.iter(); - divided.iter_at(index).try_for_each(|(i, divided)| { - let divided = divided.into_owned(); - let divider = divider_iter.unwrap_get_inner(i); - - let v = if as_percentage { - divided * multiplier - } else { - T4::from(divided) - }; - let mut v = v / divider; - if as_difference { - v = v.checked_sub(multiplier).unwrap(); - } - self.forced_push_at(i, T::from(v), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_drawdown( - &mut self, - max_from: I, - close: &impl AnyIterableVec>, - ath: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: From, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + ath.version() + close.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut close_iter = close.iter(); - ath.iter_at(index).try_for_each(|(i, ath)| { - let ath = ath.into_owned(); - if ath == Dollars::ZERO { - self.forced_push_at(i, T::from(StoredF32::default()), exit) - } else { - let close = *close_iter.unwrap_get_inner(i); - let drawdown = StoredF32::from((*ath - *close) / *ath * -100.0); - self.forced_push_at(i, T::from(drawdown), exit) - } - })?; - - self.safe_flush(exit) - } - - pub fn compute_inverse_more_to_less( - &mut self, - max_from: T, - other: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - I: StoredType + StoredIndex, - T: StoredIndex, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + other.version(), - )?; - - let index = max_from.min( - VecIterator::last(self.0.into_iter()).map_or_else(T::default, |(_, v)| v.into_owned()), - ); - let mut prev_i = None; - other.iter_at(index).try_for_each(|(v, i)| -> Result<()> { - let i = i.into_owned(); - if prev_i.is_some_and(|prev_i| prev_i == i) { - return Ok(()); - } - if self.iter().get_inner(i).is_none_or(|old_v| old_v > v) { - self.forced_push_at(i, v, exit)?; - } - prev_i.replace(i); - Ok(()) - })?; - - self.safe_flush(exit) - } - - pub fn compute_inverse_less_to_more( - &mut self, - max_from: T, - first_indexes: &impl AnyIterableVec, - indexes_count: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - I: StoredType, - T: StoredIndex, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + first_indexes.version() + indexes_count.version(), - )?; - - let mut indexes_count_iter = indexes_count.iter(); - - let index = max_from.min(T::from(self.len())); - first_indexes - .iter_at(index) - .try_for_each(|(value, first_index)| { - let first_index = (first_index).to_usize()?; - let count = *indexes_count_iter.unwrap_get_inner(value); - (first_index..first_index + count) - .try_for_each(|index| self.forced_push_at(I::from(index), value, exit)) - })?; - - self.safe_flush(exit) - } - - pub fn compute_count_from_indexes( - &mut self, - max_from: I, - first_indexes: &impl AnyIterableVec, - other_to_else: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: From, - T2: StoredType - + StoredIndex - + Copy - + Add - + CheckedSub - + TryInto - + Default, - >::Error: error::Error + 'static, - T3: StoredType, - { - let opt: Option bool>> = None; - self.compute_filtered_count_from_indexes_(max_from, first_indexes, other_to_else, opt, exit) - } - - pub fn compute_filtered_count_from_indexes( - &mut self, - max_from: I, - first_indexes: &impl AnyIterableVec, - other_to_else: &impl AnyIterableVec, - filter: F, - exit: &Exit, - ) -> Result<()> - where - T: From, - T2: StoredType - + StoredIndex - + Copy - + Add - + CheckedSub - + TryInto - + Default, - >::Error: error::Error + 'static, - T3: StoredType, - F: FnMut(T2) -> bool, - { - self.compute_filtered_count_from_indexes_( - max_from, - first_indexes, - other_to_else, - Some(Box::new(filter)), - exit, - ) - } - - fn compute_filtered_count_from_indexes_( - &mut self, - max_from: I, - first_indexes: &impl AnyIterableVec, - other_to_else: &impl AnyIterableVec, - mut filter: Option bool + '_>>, - exit: &Exit, - ) -> Result<()> - where - T: From, - T2: StoredType - + StoredIndex - + Copy - + Add - + CheckedSub - + TryInto - + Default, - T3: StoredType, - >::Error: error::Error + 'static, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + first_indexes.version() + other_to_else.version(), - )?; - - let mut other_iter = first_indexes.iter(); - let index = max_from.min(I::from(self.len())); - first_indexes - .iter_at(index) - .try_for_each(|(i, first_index)| { - let end = other_iter - .get_inner(i + 1) - .map(|v| v.unwrap_to_usize()) - .unwrap_or_else(|| other_to_else.len()); - - let range = first_index.unwrap_to_usize()..end; - let count = if let Some(filter) = filter.as_mut() { - range.into_iter().filter(|i| filter(T2::from(*i))).count() - } else { - range.count() - }; - self.forced_push_at(i, T::from(T2::from(count)), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_is_first_ordered( - &mut self, - max_from: I, - self_to_other: &impl AnyIterableVec, - other_to_self: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - I: StoredType, - T: From, - A: StoredIndex + StoredType, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + self_to_other.version() + other_to_self.version(), - )?; - - let mut other_to_self_iter = other_to_self.iter(); - let index = max_from.min(I::from(self.len())); - self_to_other.iter_at(index).try_for_each(|(i, other)| { - self.forced_push_at( - i, - T::from(other_to_self_iter.unwrap_get_inner(other.into_owned()) == i), - exit, - ) - })?; - - self.safe_flush(exit) - } - - pub fn compute_sum_from_indexes( - &mut self, - max_from: I, - first_indexes: &impl AnyIterableVec, - indexes_count: &impl AnyIterableVec, - source: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: From + Add, - T2: StoredIndex + StoredType, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + first_indexes.version() + indexes_count.version(), - )?; - - let mut indexes_count_iter = indexes_count.iter(); - let mut source_iter = source.iter(); - let index = max_from.min(I::from(self.len())); - first_indexes - .iter_at(index) - .try_for_each(|(i, first_index)| { - let count = *indexes_count_iter.unwrap_get_inner(i); - let first_index = first_index.unwrap_to_usize(); - let range = first_index..first_index + count; - let mut sum = T::from(0_usize); - range.into_iter().for_each(|i| { - sum = sum.clone() + source_iter.unwrap_get_inner(T2::from(i)); - }); - self.forced_push_at(i, sum, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_sum_of_others( - &mut self, - max_from: I, - others: &[&impl AnyIterableVec], - exit: &Exit, - ) -> Result<()> - where - T: From + Add, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(), - )?; - - if others.is_empty() { - unreachable!("others should've length of 1 at least"); - } - - let mut others_iter = others[1..].iter().map(|v| v.iter()).collect::>(); - - let index = max_from.min(I::from(self.len())); - others - .first() - .unwrap() - .iter_at(index) - .try_for_each(|(i, v)| { - let mut sum = v.into_owned(); - others_iter.iter_mut().for_each(|iter| { - sum = sum.clone() + iter.unwrap_get_inner(i); - }); - self.forced_push_at(i, sum, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_min_of_others( - &mut self, - max_from: I, - others: &[&impl AnyIterableVec], - exit: &Exit, - ) -> Result<()> - where - T: From + Add + Ord, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(), - )?; - - if others.is_empty() { - unreachable!("others should've length of 1 at least"); - } - - let mut others_iter = others[1..].iter().map(|v| v.iter()).collect::>(); - - let index = max_from.min(I::from(self.len())); - others - .first() - .unwrap() - .iter_at(index) - .try_for_each(|(i, v)| { - let min = v.into_owned(); - let min = others_iter - .iter_mut() - .map(|iter| iter.unwrap_get_inner(i)) - .min() - .map_or(min.clone(), |min2| min.min(min2)); - self.forced_push_at(i, min, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_max_of_others( - &mut self, - max_from: I, - others: &[&impl AnyIterableVec], - exit: &Exit, - ) -> Result<()> - where - T: From + Add + Ord, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(), - )?; - - if others.is_empty() { - unreachable!("others should've length of 1 at least"); - } - - let mut others_iter = others[1..].iter().map(|v| v.iter()).collect::>(); - - let index = max_from.min(I::from(self.len())); - others - .first() - .unwrap() - .iter_at(index) - .try_for_each(|(i, v)| { - let max = v.into_owned(); - let max = others_iter - .iter_mut() - .map(|iter| iter.unwrap_get_inner(i)) - .max() - .map_or(max.clone(), |max2| max.max(max2)); - self.forced_push_at(i, max, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_sma( - &mut self, - max_from: I, - source: &impl AnyIterableVec, - sma: usize, - exit: &Exit, - ) -> Result<()> - where - T: Add + From + Div + From, - T2: StoredType, - f32: From + From, - { - self.compute_sma_(max_from, source, sma, exit, None) - } - - pub fn compute_sma_( - &mut self, - max_from: I, - source: &impl AnyIterableVec, - sma: usize, - exit: &Exit, - min_i: Option, - ) -> Result<()> - where - T: Add + From + Div + From, - T2: StoredType, - f32: From + From, - { - self.validate_computed_version_or_reset_file( - Version::ONE + self.0.version() + source.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut prev = None; - let min_prev_i = min_i.unwrap_or_default().unwrap_to_usize(); - let mut other_iter = source.iter(); - source.iter_at(index).try_for_each(|(i, value)| { - let value = value.into_owned(); - - if min_i.is_none() || min_i.is_some_and(|min_i| min_i <= i) { - if prev.is_none() { - let i = i.unwrap_to_usize(); - prev.replace(if i > min_prev_i { - self.into_iter().unwrap_get_inner_(i - 1) - } else { - T::from(0.0) - }); - } - - let processed_values_count = i.unwrap_to_usize() - min_prev_i + 1; - let len = (processed_values_count).min(sma); - - let value = f32::from(value); - - let sma = T::from(if processed_values_count > sma { - let prev_sum = f32::from(prev.clone().unwrap()) * len as f32; - let value_to_subtract = f32::from( - other_iter.unwrap_get_inner_(i.unwrap_to_usize().checked_sub(sma).unwrap()), - ); - (prev_sum - value_to_subtract + value) / len as f32 - } else { - (f32::from(prev.clone().unwrap()) * (len - 1) as f32 + value) / len as f32 - }); - - prev.replace(sma.clone()); - self.forced_push_at(i, sma, exit) - } else { - self.forced_push_at(i, T::from(f32::NAN), exit) - } - })?; - - self.safe_flush(exit) - } - - pub fn compute_previous_value( - &mut self, - max_from: I, - source: &impl AnyIterableVec, - len: usize, - exit: &Exit, - ) -> Result<()> - where - I: CheckedSub, - T2: StoredType + Default, - f32: From, - T: From, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + source.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut source_iter = source.iter(); - (index.to_usize()?..source.len()).try_for_each(|i| { - let i = I::from(i); - - let previous_value = i - .checked_sub(I::from(len)) - .map(|prev_i| f32::from(source_iter.unwrap_get_inner(prev_i))) - .unwrap_or(f32::NAN); - - self.forced_push_at(i, T::from(previous_value), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_change( - &mut self, - max_from: I, - source: &impl AnyIterableVec, - len: usize, - exit: &Exit, - ) -> Result<()> - where - I: CheckedSub, - T: CheckedSub + Default, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + source.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut source_iter = source.iter(); - source.iter_at(index).try_for_each(|(i, current)| { - let current = current.into_owned(); - - let prev = i - .checked_sub(I::from(len)) - .map(|prev_i| source_iter.unwrap_get_inner(prev_i)) - .unwrap_or_default(); - - self.forced_push_at(i, current.checked_sub(prev).unwrap(), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_percentage_change( - &mut self, - max_from: I, - source: &impl AnyIterableVec, - len: usize, - exit: &Exit, - ) -> Result<()> - where - I: CheckedSub, - T2: StoredType + Default, - f32: From, - T: From, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + source.version(), - )?; - - let index = max_from.min(I::from(self.len())); - let mut source_iter = source.iter(); - source.iter_at(index).try_for_each(|(i, b)| { - let previous_value = f32::from( - i.checked_sub(I::from(len)) - .map(|prev_i| source_iter.unwrap_get_inner(prev_i)) - .unwrap_or_default(), - ); - - let last_value = f32::from(b.into_owned()); - - let percentage_change = ((last_value / previous_value) - 1.0) * 100.0; - - self.forced_push_at(i, T::from(percentage_change), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_cagr( - &mut self, - max_from: I, - percentage_returns: &impl AnyIterableVec, - days: usize, - exit: &Exit, - ) -> Result<()> - where - I: CheckedSub, - T2: StoredType + Default, - f32: From, - T: From, - { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + percentage_returns.version(), - )?; - - if days % 365 != 0 { - panic!("bad days"); - } - - let years = days / 365; - let index = max_from.min(I::from(self.len())); - percentage_returns - .iter_at(index) - .try_for_each(|(i, percentage)| { - let percentage = percentage.into_owned(); - - let cagr = (((f32::from(percentage) / 100.0 + 1.0).powf(1.0 / years as f32)) - 1.0) - * 100.0; - - self.forced_push_at(i, T::from(cagr), exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_zscore( - &mut self, - max_from: I, - ratio: &impl AnyIterableVec, - sma: &impl AnyIterableVec, - sd: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> - where - T: From, - { - let mut sma_iter = sma.iter(); - let mut sd_iter = sd.iter(); - - self.compute_transform( - max_from, - ratio, - |(i, ratio, ..)| { - let sma = sma_iter.unwrap_get_inner(i); - let sd = sd_iter.unwrap_get_inner(i); - (i, T::from((ratio - sma) / sd)) - }, - exit, - ) - } -} - -impl EagerVec { - pub fn compute_dca_stack_via_len( - &mut self, - max_from: DateIndex, - closes: &impl AnyIterableVec>, - len: usize, - exit: &Exit, - ) -> Result<()> { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + closes.version(), - )?; - - let mut other_iter = closes.iter(); - let mut prev = None; - - let index = max_from.min(DateIndex::from(self.len())); - closes.iter_at(index).try_for_each(|(i, closes)| { - let price = *closes.into_owned(); - let i_usize = i.unwrap_to_usize(); - if prev.is_none() { - if i_usize == 0 { - prev.replace(Sats::ZERO); - } else { - prev.replace(self.into_iter().unwrap_get_inner_(i_usize - 1)); - } - } - - let mut stack = Sats::ZERO; - - if price != Dollars::ZERO { - stack = prev.unwrap() + Sats::from(Bitcoin::from(DCA_AMOUNT / price)); - - if i_usize >= len { - let prev_price = *other_iter.unwrap_get_inner_(i_usize - len); - if prev_price != Dollars::ZERO { - stack = stack - .checked_sub(Sats::from(Bitcoin::from(DCA_AMOUNT / prev_price))) - .unwrap(); - } - } - } - - prev.replace(stack); - - self.forced_push_at(i, stack, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_dca_stack_via_from( - &mut self, - max_from: DateIndex, - closes: &impl AnyIterableVec>, - from: DateIndex, - exit: &Exit, - ) -> Result<()> { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + closes.version(), - )?; - - let mut prev = None; - - let index = max_from.min(DateIndex::from(self.len())); - closes.iter_at(index).try_for_each(|(i, closes)| { - let price = *closes.into_owned(); - let i_usize = i.unwrap_to_usize(); - if prev.is_none() { - if i_usize == 0 { - prev.replace(Sats::ZERO); - } else { - prev.replace(self.into_iter().unwrap_get_inner_(i_usize - 1)); - } - } - - let mut stack = Sats::ZERO; - - if price != Dollars::ZERO && i >= from { - stack = prev.unwrap() + Sats::from(Bitcoin::from(DCA_AMOUNT / price)); - } - - prev.replace(stack); - - self.forced_push_at(i, stack, exit) - })?; - - self.safe_flush(exit) - } -} - -impl EagerVec { - pub fn compute_dca_avg_price_via_len( - &mut self, - max_from: DateIndex, - stacks: &impl AnyIterableVec, - len: usize, - exit: &Exit, - ) -> Result<()> { - self.validate_computed_version_or_reset_file( - Version::ONE + self.0.version() + stacks.version(), - )?; - - let index = max_from.min(DateIndex::from(self.len())); - - let first_price_date = DateIndex::try_from(Date::new(2010, 7, 12)).unwrap(); - - stacks.iter_at(index).try_for_each(|(i, stack)| { - let stack = stack.into_owned(); - let mut avg_price = Dollars::from(f64::NAN); - if i > first_price_date { - avg_price = DCA_AMOUNT - * len - .min(i.unwrap_to_usize() + 1) - .min(i.checked_sub(first_price_date).unwrap().unwrap_to_usize() + 1) - / Bitcoin::from(stack); - } - self.forced_push_at(i, avg_price, exit) - })?; - - self.safe_flush(exit) - } - - pub fn compute_dca_avg_price_via_from( - &mut self, - max_from: DateIndex, - stacks: &impl AnyIterableVec, - from: DateIndex, - exit: &Exit, - ) -> Result<()> { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + stacks.version(), - )?; - - let index = max_from.min(DateIndex::from(self.len())); - - let from_usize = from.unwrap_to_usize(); - - stacks.iter_at(index).try_for_each(|(i, stack)| { - let stack = stack.into_owned(); - let mut avg_price = Dollars::from(f64::NAN); - if i >= from { - avg_price = - DCA_AMOUNT * (i.unwrap_to_usize() + 1 - from_usize) / Bitcoin::from(stack); - } - self.forced_push_at(i, avg_price, exit) - })?; - - self.safe_flush(exit) - } -} - -impl EagerVec -where - I: StoredIndex, -{ - pub fn compute_from_sats( - &mut self, - max_from: I, - sats: &impl AnyIterableVec, - exit: &Exit, - ) -> Result<()> { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + sats.version(), - )?; - - let index = max_from.min(I::from(self.len())); - sats.iter_at(index).try_for_each(|(i, sats)| { - let (i, v) = (i, Bitcoin::from(sats.into_owned())); - self.forced_push_at(i, v, exit) - })?; - - self.safe_flush(exit) - } -} - -impl EagerVec -where - I: StoredIndex, -{ - pub fn compute_from_bitcoin( - &mut self, - max_from: I, - bitcoin: &impl AnyIterableVec, - price: &impl AnyIterableVec>, - exit: &Exit, - ) -> Result<()> { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.0.version() + bitcoin.version(), - )?; - - let mut price_iter = price.iter(); - let index = max_from.min(I::from(self.len())); - bitcoin.iter_at(index).try_for_each(|(i, bitcoin)| { - let dollars = price_iter.unwrap_get_inner(i); - let (i, v) = (i, *dollars * bitcoin.into_owned()); - self.forced_push_at(i, v, exit) - })?; - - self.safe_flush(exit) - } -} - -// impl EagerVec { -// pub fn compute_txindex_from_bitcoin( -// &mut self, -// max_from: TxIndex, -// bitcoin: &impl AnyIterableVec, -// i_to_height: &impl AnyIterableVec, -// price: &impl AnyIterableVec>, -// exit: &Exit, -// ) -> Result<()> { -// self.validate_computed_version_or_reset_file( -// Version::ZERO -// + self.0.version() -// + bitcoin.version() -// + i_to_height.version() -// + price.version(), -// )?; - -// let mut i_to_height_iter = i_to_height.iter(); -// let mut price_iter = price.iter(); -// let index = max_from.min(TxIndex::from(self.len())); -// bitcoin.iter_at(index).try_for_each(|(i, bitcoin, ..)| { -// let height = i_to_height_iter.unwrap_get_inner(i); -// let dollars = price_iter.unwrap_get_inner(height); -// let (i, v) = (i, *dollars * bitcoin.into_owned()); -// self.forced_push_at(i, v, exit) -// })?; - -// self.safe_flush(exit) -// } -// } - -impl<'a, I, T> IntoIterator for &'a EagerVec -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = StoredVecIterator<'a, I, T>; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl AnyVec for EagerVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn version(&self) -> Version { - self.0.header().computed_version() - } - - #[inline] - fn name(&self) -> &str { - self.0.name() - } - - #[inline] - fn len(&self) -> usize { - self.0.len() - } - - #[inline] - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -impl AnyIterableVec for EagerVec -where - I: StoredIndex, - T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - I: StoredIndex, - T: StoredType + 'a, - { - Box::new(self.0.into_iter()) - } -} - -impl AnyCollectableVec for EagerVec -where - I: StoredIndex, - T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/indexed.rs b/crates/brk_vec/src/variants/indexed.rs deleted file mode 100644 index 142dad4e3..000000000 --- a/crates/brk_vec/src/variants/indexed.rs +++ /dev/null @@ -1,224 +0,0 @@ -use std::{borrow::Cow, cmp::Ordering, fmt::Debug, path::Path}; - -use brk_core::{Error, Height, Result, Version}; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BoxedVecIterator, CollectableVec, Format, - GenericStoredVec, Header, Mmap, StoredIndex, StoredType, StoredVec, -}; - -use super::StoredVecIterator; - -#[derive(Debug, Clone)] -pub struct IndexedVec(StoredVec); - -impl IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - pub fn forced_import( - path: &Path, - name: &str, - version: Version, - format: Format, - ) -> Result { - Ok(Self( - StoredVec::forced_import(path, name, version, format).unwrap(), - )) - } - - #[inline] - pub fn unwrap_read(&self, index: I, mmap: &Mmap) -> T { - self.0.unwrap_read(index, mmap) - } - - #[inline] - pub fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { - self.0.get_or_read(index, mmap) - } - - #[inline] - pub fn update_or_push(&mut self, index: I, value: T) -> Result<()> { - self.0.update_or_push(index, value) - } - - #[inline] - pub fn checked_push(&mut self, index: I, value: T) -> Result<()> { - let len = self.0.len(); - match len.cmp(&index.to_usize()?) { - Ordering::Greater => { - dbg!(index, value, len, self.0.header()); - Err(Error::IndexTooLow) - } - Ordering::Equal => { - self.0.push(value); - Ok(()) - } - Ordering::Less => { - dbg!(index, value, len, self.0.header()); - Err(Error::IndexTooHigh) - } - } - } - - #[inline] - pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { - let len = self.0.len(); - match len.cmp(&index.to_usize()?) { - Ordering::Greater => { - // dbg!(len, index, &self.pathbuf); - // panic!(); - Ok(()) - } - Ordering::Equal => { - self.0.push(value); - Ok(()) - } - Ordering::Less => { - dbg!(index, value, len, self.0.header()); - Err(Error::IndexTooHigh) - } - } - } - - #[inline] - pub fn fill_first_hole_or_push(&mut self, value: T) -> Result { - self.0.fill_first_hole_or_push(value) - } - - pub fn update(&mut self, index: I, value: T) -> Result<()> { - self.0.update(index, value) - } - - pub fn take(&mut self, index: I, mmap: &Mmap) -> Result> { - self.0.take(index, mmap) - } - - pub fn delete(&mut self, index: I) { - self.0.delete(index) - } - - fn update_height(&mut self, height: Height) { - self.0.mut_header().update_height(height); - } - - pub fn reset(&mut self) -> Result<()> { - self.update_height(Height::ZERO); - self.0.reset() - } - - pub fn truncate_if_needed(&mut self, index: I, height: Height) -> Result<()> { - self.update_height(height); - self.0.truncate_if_needed(index)?; - Ok(()) - } - - pub fn flush(&mut self, height: Height) -> Result<()> { - self.update_height(height); - self.0.flush() - } - - pub fn header(&self) -> &Header { - self.0.header() - } - - pub fn create_mmap(&self) -> Result { - self.0.create_mmap() - } - - #[inline] - pub fn hasnt(&self, index: I) -> Result { - self.0.has(index).map(|b| !b) - } -} - -impl AnyVec for IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn version(&self) -> Version { - self.0.version() - } - - #[inline] - fn name(&self) -> &str { - self.0.name() - } - - #[inline] - fn len(&self) -> usize { - self.0.len() - } - - #[inline] - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -pub trait AnyIndexedVec: AnyVec { - fn height(&self) -> Height; - fn flush(&mut self, height: Height) -> Result<()>; -} - -impl AnyIndexedVec for IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - fn height(&self) -> Height { - self.0.header().height() - } - - fn flush(&mut self, height: Height) -> Result<()> { - self.flush(height) - } -} - -impl<'a, I, T> IntoIterator for &'a IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = StoredVecIterator<'a, I, T>; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl AnyIterableVec for IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec for IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/lazy1.rs b/crates/brk_vec/src/variants/lazy1.rs deleted file mode 100644 index 568192ca5..000000000 --- a/crates/brk_vec/src/variants/lazy1.rs +++ /dev/null @@ -1,185 +0,0 @@ -use std::borrow::Cow; - -use brk_core::{Result, Version}; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedAnyIterableVec, - BoxedVecIterator, CollectableVec, StoredIndex, StoredType, -}; - -pub type ComputeFrom1 = - for<'a> fn(I, &mut dyn BaseVecIterator)>) -> Option; - -#[derive(Clone)] -pub struct LazyVecFrom1 -where - S1T: Clone, -{ - name: String, - version: Version, - source: BoxedAnyIterableVec, - compute: ComputeFrom1, -} - -impl LazyVecFrom1 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, -{ - pub fn init( - name: &str, - version: Version, - source: BoxedAnyIterableVec, - compute: ComputeFrom1, - ) -> Self { - if I::to_string() != S1I::to_string() { - unreachable!() - } - - Self { - name: name.to_string(), - version, - source, - compute, - } - } - - fn version(&self) -> Version { - self.version - } -} - -pub struct LazyVecFrom1Iterator<'a, I, T, S1I, S1T> -where - S1T: Clone, -{ - lazy: &'a LazyVecFrom1, - source: BoxedVecIterator<'a, S1I, S1T>, - index: usize, -} - -impl<'a, I, T, S1I, S1T> Iterator for LazyVecFrom1Iterator<'a, I, T, S1I, S1T> -where - I: StoredIndex, - T: StoredType + 'a, - S1I: StoredIndex, - S1T: StoredType, -{ - type Item = (I, Cow<'a, T>); - - fn next(&mut self) -> Option { - if self.index >= self.len() { - return None; - } - let index = I::from(self.index); - let opt = (self.lazy.compute)(index, &mut *self.source).map(|v| (index, Cow::Owned(v))); - if opt.is_some() { - self.index += 1; - } - opt - } -} - -impl BaseVecIterator for LazyVecFrom1Iterator<'_, I, T, S1I, S1T> -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - &mut self.index - } - - #[inline] - fn len(&self) -> usize { - self.source.len() - } - - #[inline] - fn name(&self) -> &str { - self.source.name() - } -} - -impl<'a, I, T, S1I, S1T> IntoIterator for &'a LazyVecFrom1 -where - I: StoredIndex, - T: StoredType + 'a, - S1I: StoredIndex, - S1T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = LazyVecFrom1Iterator<'a, I, T, S1I, S1T>; - - fn into_iter(self) -> Self::IntoIter { - LazyVecFrom1Iterator { - lazy: self, - source: self.source.iter(), - index: 0, - } - } -} - -impl AnyVec for LazyVecFrom1 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, -{ - fn version(&self) -> Version { - self.version() - } - - fn name(&self) -> &str { - self.name.as_str() - } - - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - fn len(&self) -> usize { - self.source.len() - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -impl AnyIterableVec for LazyVecFrom1 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec for LazyVecFrom1 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/lazy2.rs b/crates/brk_vec/src/variants/lazy2.rs deleted file mode 100644 index 0093f6454..000000000 --- a/crates/brk_vec/src/variants/lazy2.rs +++ /dev/null @@ -1,236 +0,0 @@ -use std::borrow::Cow; - -use brk_core::{Result, Version}; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedAnyIterableVec, - BoxedVecIterator, CollectableVec, StoredIndex, StoredType, -}; - -pub type ComputeFrom2 = for<'a> fn( - I, - &mut dyn BaseVecIterator)>, - &mut dyn BaseVecIterator)>, -) -> Option; - -#[derive(Clone)] -pub struct LazyVecFrom2 -where - S1T: Clone, - S2T: Clone, -{ - name: String, - version: Version, - source1: BoxedAnyIterableVec, - source2: BoxedAnyIterableVec, - compute: ComputeFrom2, -} - -impl LazyVecFrom2 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - pub fn init( - name: &str, - version: Version, - source1: BoxedAnyIterableVec, - source2: BoxedAnyIterableVec, - compute: ComputeFrom2, - ) -> Self { - if ([ - source1.index_type_to_string(), - source2.index_type_to_string(), - ]) - .into_iter() - .filter(|t| *t == I::to_string()) - .count() - == 0 - { - panic!("At least one should have same index"); - } - - Self { - name: name.to_string(), - version, - source1, - source2, - compute, - } - } - - fn version(&self) -> Version { - self.version - } -} - -pub struct LazyVecFrom2Iterator<'a, I, T, S1I, S1T, S2I, S2T> -where - S1T: Clone, - S2T: Clone, -{ - lazy: &'a LazyVecFrom2, - source1: BoxedVecIterator<'a, S1I, S1T>, - source2: BoxedVecIterator<'a, S2I, S2T>, - index: usize, -} - -impl<'a, I, T, S1I, S1T, S2I, S2T> Iterator for LazyVecFrom2Iterator<'a, I, T, S1I, S1T, S2I, S2T> -where - I: StoredIndex, - T: StoredType + 'a, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - type Item = (I, Cow<'a, T>); - - fn next(&mut self) -> Option { - let index = I::from(self.index); - let opt = (self.lazy.compute)(index, &mut *self.source1, &mut *self.source2) - .map(|v| (index, Cow::Owned(v))); - if opt.is_some() { - self.index += 1; - } - opt - } -} - -impl BaseVecIterator - for LazyVecFrom2Iterator<'_, I, T, S1I, S1T, S2I, S2T> -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - &mut self.index - } - - #[inline] - fn len(&self) -> usize { - let len1 = if self.source1.index_type_to_string() == I::to_string() { - self.source1.len() - } else { - usize::MAX - }; - let len2 = if self.source2.index_type_to_string() == I::to_string() { - self.source2.len() - } else { - usize::MAX - }; - len1.min(len2) - } - - #[inline] - fn name(&self) -> &str { - self.source1.name() - } -} - -impl<'a, I, T, S1I, S1T, S2I, S2T> IntoIterator for &'a LazyVecFrom2 -where - I: StoredIndex, - T: StoredType + 'a, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = LazyVecFrom2Iterator<'a, I, T, S1I, S1T, S2I, S2T>; - - fn into_iter(self) -> Self::IntoIter { - LazyVecFrom2Iterator { - lazy: self, - source1: self.source1.iter(), - source2: self.source2.iter(), - index: 0, - } - } -} - -impl AnyVec for LazyVecFrom2 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - fn version(&self) -> Version { - self.version() - } - - fn name(&self) -> &str { - self.name.as_str() - } - - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - fn len(&self) -> usize { - let len1 = if self.source1.index_type_to_string() == I::to_string() { - self.source1.len() - } else { - usize::MAX - }; - let len2 = if self.source2.index_type_to_string() == I::to_string() { - self.source2.len() - } else { - usize::MAX - }; - len1.min(len2) - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -impl AnyIterableVec for LazyVecFrom2 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec for LazyVecFrom2 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/lazy3.rs b/crates/brk_vec/src/variants/lazy3.rs deleted file mode 100644 index b94ad165b..000000000 --- a/crates/brk_vec/src/variants/lazy3.rs +++ /dev/null @@ -1,278 +0,0 @@ -use std::borrow::Cow; - -use brk_core::{Result, Version}; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedAnyIterableVec, - BoxedVecIterator, CollectableVec, StoredIndex, StoredType, -}; - -pub type ComputeFrom3 = for<'a> fn( - I, - &mut dyn BaseVecIterator)>, - &mut dyn BaseVecIterator)>, - &mut dyn BaseVecIterator)>, -) -> Option; - -#[derive(Clone)] -pub struct LazyVecFrom3 -where - S1T: Clone, - S2T: Clone, - S3T: Clone, -{ - name: String, - version: Version, - source1: BoxedAnyIterableVec, - source2: BoxedAnyIterableVec, - source3: BoxedAnyIterableVec, - compute: ComputeFrom3, -} - -impl LazyVecFrom3 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - pub fn init( - name: &str, - version: Version, - source1: BoxedAnyIterableVec, - source2: BoxedAnyIterableVec, - source3: BoxedAnyIterableVec, - compute: ComputeFrom3, - ) -> Self { - if ([ - source1.index_type_to_string(), - source2.index_type_to_string(), - source3.index_type_to_string(), - ]) - .into_iter() - .filter(|t| *t == I::to_string()) - .count() - == 0 - { - panic!("At least one should have same index"); - } - - Self { - name: name.to_string(), - version, - source1, - source2, - source3, - compute, - } - } - - fn version(&self) -> Version { - self.version - } -} - -pub struct LazyVecFrom3Iterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> -where - S1T: Clone, - S2T: Clone, - S3T: Clone, -{ - lazy: &'a LazyVecFrom3, - source1: BoxedVecIterator<'a, S1I, S1T>, - source2: BoxedVecIterator<'a, S2I, S2T>, - source3: BoxedVecIterator<'a, S3I, S3T>, - index: usize, -} - -impl<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> Iterator - for LazyVecFrom3Iterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> -where - I: StoredIndex, - T: StoredType + 'a, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - type Item = (I, Cow<'a, T>); - - fn next(&mut self) -> Option { - let index = I::from(self.index); - let opt = (self.lazy.compute)( - index, - &mut *self.source1, - &mut *self.source2, - &mut *self.source3, - ) - .map(|v| (index, Cow::Owned(v))); - if opt.is_some() { - self.index += 1; - } - opt - } -} - -impl BaseVecIterator - for LazyVecFrom3Iterator<'_, I, T, S1I, S1T, S2I, S2T, S3I, S3T> -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - &mut self.index - } - - #[inline] - fn len(&self) -> usize { - let len1 = if self.source1.index_type_to_string() == I::to_string() { - self.source1.len() - } else { - usize::MAX - }; - let len2 = if self.source2.index_type_to_string() == I::to_string() { - self.source2.len() - } else { - usize::MAX - }; - let len3 = if self.source3.index_type_to_string() == I::to_string() { - self.source3.len() - } else { - usize::MAX - }; - len1.min(len2).min(len3) - } - - #[inline] - fn name(&self) -> &str { - self.source1.name() - } -} - -impl<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T> IntoIterator - for &'a LazyVecFrom3 -where - I: StoredIndex, - T: StoredType + 'a, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = LazyVecFrom3Iterator<'a, I, T, S1I, S1T, S2I, S2T, S3I, S3T>; - - fn into_iter(self) -> Self::IntoIter { - LazyVecFrom3Iterator { - lazy: self, - source1: self.source1.iter(), - source2: self.source2.iter(), - source3: self.source3.iter(), - index: 0, - } - } -} - -impl AnyVec for LazyVecFrom3 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - fn version(&self) -> Version { - self.version() - } - - fn name(&self) -> &str { - self.name.as_str() - } - - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - fn len(&self) -> usize { - let len1 = if self.source1.index_type_to_string() == I::to_string() { - self.source1.len() - } else { - usize::MAX - }; - let len2 = if self.source2.index_type_to_string() == I::to_string() { - self.source2.len() - } else { - usize::MAX - }; - let len3 = if self.source3.index_type_to_string() == I::to_string() { - self.source3.len() - } else { - usize::MAX - }; - len1.min(len2).min(len3) - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -impl AnyIterableVec - for LazyVecFrom3 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec - for LazyVecFrom3 -where - I: StoredIndex, - T: StoredType, - S1I: StoredIndex, - S1T: StoredType, - S2I: StoredIndex, - S2T: StoredType, - S3I: StoredIndex, - S3T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/mod.rs b/crates/brk_vec/src/variants/mod.rs deleted file mode 100644 index 191a28472..000000000 --- a/crates/brk_vec/src/variants/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -mod compressed; -mod computed; -mod eager; -mod indexed; -mod lazy1; -mod lazy2; -mod lazy3; -mod raw; -mod stored; - -pub use compressed::*; -pub use computed::*; -pub use eager::*; -pub use indexed::*; -pub use lazy1::*; -pub use lazy2::*; -pub use lazy3::*; -pub use raw::*; -pub use stored::*; diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs deleted file mode 100644 index ae0cc39cb..000000000 --- a/crates/brk_vec/src/variants/raw.rs +++ /dev/null @@ -1,471 +0,0 @@ -use std::{ - borrow::Cow, - collections::{BTreeMap, BTreeSet}, - fs::{self, File}, - io, - marker::PhantomData, - mem, - os::unix::fs::FileExt, - path::{Path, PathBuf}, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, -}; - -use brk_core::{Error, Result, Version}; -use memmap2::Mmap; -use rayon::prelude::*; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - Format, GenericStoredVec, HEADER_OFFSET, Header, StoredIndex, StoredType, UnsafeSlice, -}; - -const VERSION: Version = Version::ONE; - -#[derive(Debug)] -pub struct RawVec { - // --- Needed for &, TODO: Weak copy ? - header: Header, - parent: PathBuf, - name: &'static str, - shared_stored_len: Arc, - // --- Needed for &mut - pushed: Vec, - has_stored_holes: bool, - holes: BTreeSet, - updated: BTreeMap, - local_stored_len: Option, - phantom: PhantomData, -} - -impl RawVec -where - I: StoredIndex, - T: StoredType, -{ - /// Same as import but will reset the folder under certain errors, so be careful ! - pub fn forced_import(parent: &Path, name: &str, mut version: Version) -> Result { - version = version + VERSION; - let res = Self::import(parent, name, version); - match res { - Err(Error::DifferentCompressionMode) - | Err(Error::WrongEndian) - | Err(Error::WrongLength) - | Err(Error::DifferentVersion { .. }) => { - let path = Self::path_(parent, name); - fs::remove_file(path)?; - let holes_path = Self::holes_path_(parent, name); - if fs::exists(&holes_path)? { - fs::remove_file(holes_path)?; - } - Self::import(parent, name, version) - } - _ => res, - } - } - - pub fn import(parent: &Path, name: &str, version: Version) -> Result { - let path = Self::path_(parent, name); - let (header, file) = match Self::open_file_(&path) { - Ok(mut file) => { - if file.metadata()?.len() == 0 { - ( - Header::create_and_write(&mut file, version, Format::Raw)?, - Some(file), - ) - } else { - ( - Header::import_and_verify(&mut file, version, Format::Raw)?, - Some(file), - ) - } - } - Err(e) => match e.kind() { - io::ErrorKind::NotFound => { - fs::create_dir_all(Self::folder_(parent, name))?; - let mut file = Self::open_file_(&path)?; - let header = Header::create_and_write(&mut file, version, Format::Raw)?; - (header, None) - } - _ => { - return Err(e.into()); - } - }, - }; - - let stored_len = if let Some(file) = file { - (file.metadata()?.len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T - } else { - 0 - }; - - let holes_path = Self::holes_path_(parent, name); - let holes = if fs::exists(&holes_path)? { - Some( - fs::read(&holes_path)? - .chunks(size_of::()) - .map(|b| -> Result { - Ok(usize::from_ne_bytes(brk_core::copy_first_8bytes(b)?)) - }) - .collect::>>()?, - ) - } else { - None - }; - - Ok(Self { - header, - name: Box::leak(Box::new(name.to_string())), - parent: parent.to_owned(), - pushed: vec![], - has_stored_holes: holes.is_some(), - holes: holes.unwrap_or_default(), - updated: BTreeMap::new(), - local_stored_len: Some(stored_len), - shared_stored_len: Arc::new(AtomicUsize::new(stored_len)), - phantom: PhantomData, - }) - } - - #[doc(hidden)] - pub fn set_stored_len(&mut self, len: usize) { - self.local_stored_len.replace(len); - self.shared_stored_len.store(len, Ordering::Relaxed); - } - - #[inline] - pub fn iter(&self) -> RawVecIterator<'_, I, T> { - self.into_iter() - } - - #[inline] - pub fn iter_at(&self, i: I) -> RawVecIterator<'_, I, T> { - self.iter_at_(i.unwrap_to_usize()) - } - - #[inline] - pub fn iter_at_(&self, i: usize) -> RawVecIterator<'_, I, T> { - let mut iter = self.into_iter(); - iter.set_(i); - iter - } - - pub fn write_header_if_needed(&mut self) -> io::Result> { - if self.header.modified() { - let mut file = self.open_file()?; - self.header.write(&mut file)?; - Ok(Some(file)) - } else { - Ok(None) - } - } -} - -impl GenericStoredVec for RawVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn read_(&self, index: usize, mmap: &Mmap) -> Result> { - let index = index * Self::SIZE_OF_T + HEADER_OFFSET; - let slice = &mmap[index..(index + Self::SIZE_OF_T)]; - T::try_read_from_bytes(slice) - .map(|v| Some(v)) - .map_err(Error::from) - } - - fn header(&self) -> &Header { - &self.header - } - - fn mut_header(&mut self) -> &mut Header { - &mut self.header - } - - #[inline] - fn stored_len(&self) -> usize { - self.local_stored_len - .unwrap_or_else(|| self.shared_stored_len.load(Ordering::Relaxed)) - } - - #[inline] - fn pushed(&self) -> &[T] { - self.pushed.as_slice() - } - #[inline] - fn mut_pushed(&mut self) -> &mut Vec { - &mut self.pushed - } - - #[inline] - fn holes(&self) -> &BTreeSet { - &self.holes - } - #[inline] - fn mut_holes(&mut self) -> &mut BTreeSet { - &mut self.holes - } - - #[inline] - fn updated(&self) -> &BTreeMap { - &self.updated - } - #[inline] - fn mut_updated(&mut self) -> &mut BTreeMap { - &mut self.updated - } - - #[inline] - fn parent(&self) -> &Path { - &self.parent - } - - fn flush(&mut self) -> Result<()> { - let file_opt = self.write_header_if_needed()?; - - let pushed_len = self.pushed_len(); - - let has_new_data = pushed_len != 0; - let has_updated_data = !self.updated.is_empty(); - let has_holes = !self.holes.is_empty(); - let had_holes = self.has_stored_holes && !has_holes; - - if !has_new_data && !has_updated_data && !has_holes && !had_holes { - return Ok(()); - } - - if has_new_data || has_updated_data { - let mut file = file_opt.unwrap_or(self.open_file()?); - - if has_new_data { - let bytes = { - let mut bytes: Vec = vec![0; pushed_len * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - mem::take(&mut self.pushed) - .into_par_iter() - .enumerate() - .for_each(|(i, v)| { - unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()) - }); - - bytes - }; - - self.file_write_all(&mut file, &bytes)?; - - if let Some(local_stored_len) = self.local_stored_len.as_mut() { - *local_stored_len += pushed_len; - } - self.shared_stored_len - .fetch_add(pushed_len, Ordering::Relaxed); - } - - if has_updated_data { - mem::take(&mut self.updated) - .into_iter() - .try_for_each(|(i, v)| -> Result<()> { - file.write_all_at( - v.as_bytes(), - ((i * Self::SIZE_OF_T) + HEADER_OFFSET) as u64, - )?; - Ok(()) - })?; - } - } - - if has_holes || had_holes { - let holes_path = self.holes_path(); - if has_holes { - self.has_stored_holes = true; - fs::write( - &holes_path, - self.holes - .iter() - .flat_map(|i| i.to_ne_bytes()) - .collect::>(), - )?; - } else if had_holes { - self.has_stored_holes = false; - let _ = fs::remove_file(&holes_path); - } - } - - Ok(()) - } - - fn truncate_if_needed(&mut self, index: I) -> Result<()> { - let index = index.to_usize()?; - - if index >= self.stored_len() { - return Ok(()); - } - - if index == 0 { - self.reset()?; - return Ok(()); - } - - self.set_stored_len(index); - - let len = index * Self::SIZE_OF_T + HEADER_OFFSET; - - let mut file = self.open_file()?; - self.file_set_len(&mut file, len as u64)?; - - Ok(()) - } - - fn reset(&mut self) -> Result<()> { - self.set_stored_len(0); - self.reset_() - } -} - -impl AnyVec for RawVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn version(&self) -> Version { - self.header.vec_version() - } - - #[inline] - fn name(&self) -> &str { - self.name - } - - #[inline] - fn len(&self) -> usize { - self.len_() - } - - #[inline] - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -impl Clone for RawVec { - fn clone(&self) -> Self { - Self { - header: self.header.clone(), - parent: self.parent.clone(), - name: self.name, - pushed: vec![], - updated: BTreeMap::new(), - has_stored_holes: false, - holes: BTreeSet::new(), - local_stored_len: None, - shared_stored_len: self.shared_stored_len.clone(), - phantom: PhantomData, - } - } -} - -#[derive(Debug)] -pub struct RawVecIterator<'a, I, T> { - vec: &'a RawVec, - mmap: Mmap, - index: usize, -} - -impl BaseVecIterator for RawVecIterator<'_, I, T> -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - &mut self.index - } - - #[inline] - fn len(&self) -> usize { - self.vec.len() - } - - #[inline] - fn name(&self) -> &str { - self.vec.name() - } -} - -impl<'a, I, T> Iterator for RawVecIterator<'a, I, T> -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - - fn next(&mut self) -> Option { - let index = self.index; - - let opt = self - .vec - .get_or_read_(index, &self.mmap) - .unwrap() - .map(|v| (I::from(index), v)); - - if opt.is_some() { - self.index += 1; - } - - opt - } -} - -impl<'a, I, T> IntoIterator for &'a RawVec -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = RawVecIterator<'a, I, T>; - - fn into_iter(self) -> Self::IntoIter { - RawVecIterator { - vec: self, - mmap: self.create_mmap().unwrap(), - index: 0, - } - } -} - -impl AnyIterableVec for RawVec -where - I: StoredIndex, - T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec for RawVec -where - I: StoredIndex, - T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vec/src/variants/stored.rs b/crates/brk_vec/src/variants/stored.rs deleted file mode 100644 index 765ca9dcb..000000000 --- a/crates/brk_vec/src/variants/stored.rs +++ /dev/null @@ -1,295 +0,0 @@ -use std::{ - borrow::Cow, - collections::{BTreeMap, BTreeSet}, - path::{Path, PathBuf}, -}; - -use brk_core::{Result, Version}; -use memmap2::Mmap; - -use crate::{ - AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - Format, GenericStoredVec, Header, StoredIndex, StoredType, -}; - -use super::{CompressedVec, CompressedVecIterator, RawVec, RawVecIterator}; - -#[derive(Debug, Clone)] -pub enum StoredVec { - Raw(RawVec), - Compressed(CompressedVec), -} - -impl StoredVec -where - I: StoredIndex, - T: StoredType, -{ - pub fn forced_import( - path: &Path, - name: &str, - version: Version, - format: Format, - ) -> Result { - if version == Version::ZERO { - dbg!(path, name); - panic!("Version must be at least 1, can't verify endianess otherwise"); - } - - if format.is_compressed() { - Ok(Self::Compressed(CompressedVec::forced_import( - path, name, version, - )?)) - } else { - Ok(Self::Raw(RawVec::forced_import(path, name, version)?)) - } - } -} - -impl GenericStoredVec for StoredVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn read_(&self, index: usize, guard: &Mmap) -> Result> { - match self { - StoredVec::Raw(v) => v.read_(index, guard), - StoredVec::Compressed(v) => v.read_(index, guard), - } - } - - #[inline] - fn header(&self) -> &Header { - match self { - StoredVec::Raw(v) => v.header(), - StoredVec::Compressed(v) => v.header(), - } - } - - #[inline] - fn mut_header(&mut self) -> &mut Header { - match self { - StoredVec::Raw(v) => v.mut_header(), - StoredVec::Compressed(v) => v.mut_header(), - } - } - - #[inline] - fn parent(&self) -> &Path { - match self { - StoredVec::Raw(v) => v.parent(), - StoredVec::Compressed(v) => v.parent(), - } - } - - #[inline] - fn stored_len(&self) -> usize { - match self { - StoredVec::Raw(v) => v.stored_len(), - StoredVec::Compressed(v) => v.stored_len(), - } - } - - #[inline] - fn pushed(&self) -> &[T] { - match self { - StoredVec::Raw(v) => v.pushed(), - StoredVec::Compressed(v) => v.pushed(), - } - } - #[inline] - fn mut_pushed(&mut self) -> &mut Vec { - match self { - StoredVec::Raw(v) => v.mut_pushed(), - StoredVec::Compressed(v) => v.mut_pushed(), - } - } - - #[inline] - fn holes(&self) -> &BTreeSet { - match self { - StoredVec::Raw(v) => v.holes(), - StoredVec::Compressed(v) => v.holes(), - } - } - #[inline] - fn mut_holes(&mut self) -> &mut BTreeSet { - match self { - StoredVec::Raw(v) => v.mut_holes(), - StoredVec::Compressed(v) => v.mut_holes(), - } - } - - #[inline] - fn updated(&self) -> &BTreeMap { - match self { - StoredVec::Raw(v) => v.updated(), - StoredVec::Compressed(v) => v.updated(), - } - } - #[inline] - fn mut_updated(&mut self) -> &mut BTreeMap { - match self { - StoredVec::Raw(v) => v.mut_updated(), - StoredVec::Compressed(v) => v.mut_updated(), - } - } - - #[inline] - fn path(&self) -> PathBuf { - match self { - StoredVec::Raw(v) => v.path(), - StoredVec::Compressed(v) => v.path(), - } - } - - fn flush(&mut self) -> Result<()> { - match self { - StoredVec::Raw(v) => v.flush(), - StoredVec::Compressed(v) => v.flush(), - } - } - - fn truncate_if_needed(&mut self, index: I) -> Result<()> { - match self { - StoredVec::Raw(v) => v.truncate_if_needed(index), - StoredVec::Compressed(v) => v.truncate_if_needed(index), - } - } - - fn reset(&mut self) -> Result<()> { - match self { - StoredVec::Raw(v) => v.reset(), - StoredVec::Compressed(v) => v.reset(), - } - } -} - -impl AnyVec for StoredVec -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn version(&self) -> Version { - match self { - StoredVec::Raw(v) => v.version(), - StoredVec::Compressed(v) => v.version(), - } - } - - #[inline] - fn index_type_to_string(&self) -> &'static str { - I::to_string() - } - - #[inline] - fn len(&self) -> usize { - self.pushed_len() + self.stored_len() - } - - fn name(&self) -> &str { - match self { - StoredVec::Raw(v) => v.name(), - StoredVec::Compressed(v) => v.name(), - } - } - - #[inline] - fn value_type_to_size_of(&self) -> usize { - size_of::() - } -} - -#[derive(Debug)] -pub enum StoredVecIterator<'a, I, T> { - Raw(RawVecIterator<'a, I, T>), - Compressed(CompressedVecIterator<'a, I, T>), -} - -impl<'a, I, T> Iterator for StoredVecIterator<'a, I, T> -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - fn next(&mut self) -> Option { - match self { - Self::Compressed(i) => i.next(), - Self::Raw(i) => i.next(), - } - } -} - -impl BaseVecIterator for StoredVecIterator<'_, I, T> -where - I: StoredIndex, - T: StoredType, -{ - #[inline] - fn mut_index(&mut self) -> &mut usize { - match self { - Self::Compressed(iter) => iter.mut_index(), - Self::Raw(iter) => iter.mut_index(), - } - } - - fn len(&self) -> usize { - match self { - Self::Compressed(i) => i.len(), - Self::Raw(i) => i.len(), - } - } - - #[inline] - fn name(&self) -> &str { - match self { - Self::Compressed(i) => i.name(), - Self::Raw(i) => i.name(), - } - } -} - -impl<'a, I, T> IntoIterator for &'a StoredVec -where - I: StoredIndex, - T: StoredType, -{ - type Item = (I, Cow<'a, T>); - type IntoIter = StoredVecIterator<'a, I, T>; - - fn into_iter(self) -> Self::IntoIter { - match self { - StoredVec::Compressed(v) => StoredVecIterator::Compressed(v.into_iter()), - StoredVec::Raw(v) => StoredVecIterator::Raw(v.into_iter()), - } - } -} - -impl AnyIterableVec for StoredVec -where - I: StoredIndex, - T: StoredType, -{ - fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T> - where - T: 'a, - { - Box::new(self.into_iter()) - } -} - -impl AnyCollectableVec for StoredVec -where - I: StoredIndex, - T: StoredType, -{ - fn collect_range_serde_json( - &self, - from: Option, - to: Option, - ) -> Result> { - CollectableVec::collect_range_serde_json(self, from, to) - } -} diff --git a/crates/brk_vecs/Cargo.toml b/crates/brk_vecs/Cargo.toml index 040dbed6b..96abcc4af 100644 --- a/crates/brk_vecs/Cargo.toml +++ b/crates/brk_vecs/Cargo.toml @@ -24,3 +24,6 @@ serde_json = { workspace = true } zerocopy = { workspace = true } zerocopy-derive = { workspace = true } zstd = "0.13.3" + +[package.metadata.cargo-machete] +ignored = ["clap"]