diff --git a/Cargo.lock b/Cargo.lock index 209de4467..36335637b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,6 +138,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.7.6" @@ -548,6 +554,8 @@ dependencies = [ name = "brk_vec" version = "0.0.19" dependencies = [ + "arc-swap", + "axum", "memmap2", "rayon", "serde", diff --git a/Cargo.toml b/Cargo.toml index 8f5bd1171..4e7a1251c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ panic = "abort" inherits = "release" [workspace.dependencies] +axum = "0.8.3" bitcoin = { version = "0.32.5", features = ["serde"] } bitcoincore-rpc = "0.19.0" brk_cli = { version = "0", path = "crates/brk_cli" } diff --git a/crates/brk_computer/src/storage/vecs/base.rs b/crates/brk_computer/src/storage/vecs/base.rs index c7f35c968..896afeb3f 100644 --- a/crates/brk_computer/src/storage/vecs/base.rs +++ b/crates/brk_computer/src/storage/vecs/base.rs @@ -10,14 +10,14 @@ use std::{ use brk_core::CheckedSub; use brk_exit::Exit; use brk_vec::{ - AnyStorableVec, Compressed, Error, MAX_CACHE_SIZE, Result, StorableVec, StoredIndex, - StoredType, Version, + AnyStoredVec, Compressed, Error, MAX_CACHE_SIZE, Result, StoredIndex, StoredType, StoredVec, + Version, }; #[derive(Debug)] pub struct ComputedVec { computed_version: Option, - vec: StorableVec, + vec: StoredVec, } impl ComputedVec @@ -32,7 +32,7 @@ where version: Version, compressed: Compressed, ) -> brk_vec::Result { - let vec = StorableVec::forced_import(path, version, compressed)?; + let vec = StoredVec::forced_import(path, version, compressed)?; Ok(Self { computed_version: None, @@ -89,19 +89,19 @@ where self.vec.len() } - pub fn vec(&self) -> &StorableVec { + pub fn vec(&self) -> &StoredVec { &self.vec } - pub fn mut_vec(&mut self) -> &mut 
StorableVec { + pub fn mut_vec(&mut self) -> &mut StoredVec { &mut self.vec } - pub fn any_vec(&self) -> &dyn AnyStorableVec { + pub fn any_vec(&self) -> &dyn AnyStoredVec { &self.vec } - pub fn mut_any_vec(&mut self) -> &mut dyn AnyStorableVec { + pub fn mut_any_vec(&mut self) -> &mut dyn AnyStoredVec { &mut self.vec } @@ -130,14 +130,14 @@ where pub fn compute_transform( &mut self, max_from: A, - other: &mut StorableVec, + other: &mut StoredVec, mut t: F, exit: &Exit, ) -> Result<()> where A: StoredIndex, B: StoredType, - F: FnMut((A, B, &mut Self, &mut StorableVec)) -> (I, T), + F: FnMut((A, B, &mut Self, &mut StoredVec)) -> (I, T), { self.validate_computed_version_or_reset_file( Version::ZERO + self.version() + other.version(), @@ -155,7 +155,7 @@ where pub fn compute_inverse_more_to_less( &mut self, max_from: T, - other: &mut StorableVec, + other: &mut StoredVec, exit: &Exit, ) -> Result<()> where @@ -182,8 +182,8 @@ where pub fn compute_inverse_less_to_more( &mut self, max_from: T, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + first_indexes: &mut StoredVec, + last_indexes: &mut StoredVec, exit: &Exit, ) -> Result<()> where @@ -208,7 +208,7 @@ where pub fn compute_last_index_from_first( &mut self, max_from: I, - first_indexes: &mut StorableVec, + first_indexes: &mut StoredVec, final_len: usize, exit: &Exit, ) -> Result<()> @@ -243,8 +243,8 @@ where pub fn compute_count_from_indexes( &mut self, max_from: I, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + first_indexes: &mut StoredVec, + last_indexes: &mut StoredVec, exit: &Exit, ) -> Result<()> where @@ -271,8 +271,8 @@ where pub fn compute_is_first_ordered( &mut self, max_from: I, - self_to_other: &mut StorableVec, - other_to_self: &mut StorableVec, + self_to_other: &mut StoredVec, + other_to_self: &mut StoredVec, exit: &Exit, ) -> Result<()> where @@ -295,8 +295,8 @@ where pub fn compute_sum_from_indexes( &mut self, max_from: I, - first_indexes: &mut 
StorableVec, - last_indexes: &mut StorableVec, + first_indexes: &mut StoredVec, + last_indexes: &mut StoredVec, exit: &Exit, ) -> Result<()> where diff --git a/crates/brk_computer/src/storage/vecs/blocks.rs b/crates/brk_computer/src/storage/vecs/blocks.rs index 4faea83fe..2b086ad67 100644 --- a/crates/brk_computer/src/storage/vecs/blocks.rs +++ b/crates/brk_computer/src/storage/vecs/blocks.rs @@ -4,7 +4,7 @@ use brk_core::{CheckedSub, StoredU32, StoredU64, StoredUsize, Timestamp, Weight} use brk_exit::Exit; use brk_indexer::Indexer; use brk_parser::bitcoin; -use brk_vec::{AnyStorableVec, Compressed, Version}; +use brk_vec::{AnyStoredVec, Compressed, Version}; use super::{ Indexes, @@ -156,7 +156,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ self.indexes_to_block_interval.any_vecs(), self.indexes_to_block_count.any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/grouped/builder.rs b/crates/brk_computer/src/storage/vecs/grouped/builder.rs index f6a0fd1bf..43cf98a53 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/builder.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/builder.rs @@ -1,7 +1,7 @@ use std::path::Path; use brk_exit::Exit; -use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, StoredIndex, StoredType, Version}; +use brk_vec::{AnyStoredVec, Compressed, Result, StoredIndex, StoredType, StoredVec, Version}; use crate::storage::vecs::base::ComputedVec; @@ -114,12 +114,7 @@ where Ok(s) } - pub fn extend( - &mut self, - max_from: I, - source: &mut StorableVec, - exit: &Exit, - ) -> Result<()> { + pub fn extend(&mut self, max_from: I, source: &mut StoredVec, exit: &Exit) -> Result<()> { if self.total.is_none() { return Ok(()); }; @@ -154,9 +149,9 @@ where pub fn compute( &mut self, max_from: I, - source: &mut StorableVec, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + source: &mut StoredVec, + first_indexes: 
&mut StoredVec, + last_indexes: &mut StoredVec, exit: &Exit, ) -> Result<()> where @@ -276,8 +271,8 @@ where &mut self, max_from: I, source: &mut ComputedVecBuilder, - first_indexes: &mut StorableVec, - last_indexes: &mut StorableVec, + first_indexes: &mut StoredVec, + last_indexes: &mut StoredVec, exit: &Exit, ) -> Result<()> where @@ -434,8 +429,8 @@ where )) } - pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { - let mut v: Vec<&dyn AnyStorableVec> = vec![]; + pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> { + let mut v: Vec<&dyn AnyStoredVec> = vec![]; if let Some(first) = self.first.as_ref() { v.push(first.any_vec()); diff --git a/crates/brk_computer/src/storage/vecs/grouped/from_dateindex.rs b/crates/brk_computer/src/storage/vecs/grouped/from_dateindex.rs index 5b8566465..3bfaa121f 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/from_dateindex.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/from_dateindex.rs @@ -3,7 +3,7 @@ use std::path::Path; use brk_core::{Dateindex, Decadeindex, Monthindex, Quarterindex, Weekindex, Yearindex}; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Result, Version}; +use brk_vec::{AnyStoredVec, Compressed, Result, Version}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; @@ -126,7 +126,7 @@ where Ok(()) } - pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ vec![self.dateindex.any_vec()], self.dateindex_extra.any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/grouped/from_height.rs b/crates/brk_computer/src/storage/vecs/grouped/from_height.rs index 945d1083d..a1e1662c0 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/from_height.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/from_height.rs @@ -5,7 +5,7 @@ use brk_core::{ }; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, Version}; +use brk_vec::{AnyStoredVec, 
Compressed, Result, StoredVec, Version}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; @@ -102,7 +102,7 @@ where indexes: &mut indexes::Vecs, starting_indexes: &Indexes, exit: &Exit, - height: Option<&mut StorableVec>, + height: Option<&mut StoredVec>, ) -> color_eyre::Result<()> { let height = height.unwrap_or_else(|| self.height.as_mut().unwrap().mut_vec()); @@ -168,7 +168,7 @@ where Ok(()) } - pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ self.height.as_ref().map_or(vec![], |v| vec![v.any_vec()]), self.height_extra.any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/grouped/from_height_strict.rs b/crates/brk_computer/src/storage/vecs/grouped/from_height_strict.rs index 5c260d77e..6519a04cc 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/from_height_strict.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/from_height_strict.rs @@ -3,7 +3,7 @@ use std::path::Path; use brk_core::{Difficultyepoch, Height}; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Result, Version}; +use brk_vec::{AnyStoredVec, Compressed, Result, Version}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; @@ -84,7 +84,7 @@ where Ok(()) } - pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ vec![self.height.any_vec()], self.height_extra.any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/grouped/from_txindex.rs b/crates/brk_computer/src/storage/vecs/grouped/from_txindex.rs index 0bdbd1eac..f94c4150d 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/from_txindex.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/from_txindex.rs @@ -6,7 +6,7 @@ use brk_core::{ }; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, Version}; +use brk_vec::{AnyStoredVec, Compressed, Result, StoredVec, Version}; use 
crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; @@ -115,7 +115,7 @@ where indexes: &mut indexes::Vecs, starting_indexes: &Indexes, exit: &Exit, - txindex: Option<&mut StorableVec>, + txindex: Option<&mut StoredVec>, ) -> color_eyre::Result<()> { let txindex = txindex.unwrap_or_else(|| self.txindex.as_mut().unwrap().mut_vec()); @@ -189,7 +189,7 @@ where Ok(()) } - pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ self.txindex.as_ref().map_or(vec![], |v| vec![v.any_vec()]), self.txindex_extra.any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/indexes.rs b/crates/brk_computer/src/storage/vecs/indexes.rs index e1961beed..5536289ee 100644 --- a/crates/brk_computer/src/storage/vecs/indexes.rs +++ b/crates/brk_computer/src/storage/vecs/indexes.rs @@ -6,7 +6,7 @@ use brk_core::{ }; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Version}; +use brk_vec::{AnyStoredVec, Compressed, Version}; use super::ComputedVec; @@ -784,7 +784,7 @@ impl Vecs { }) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { vec![ self.dateindex_to_date.any_vec(), self.dateindex_to_dateindex.any_vec(), diff --git a/crates/brk_computer/src/storage/vecs/marketprice.rs b/crates/brk_computer/src/storage/vecs/marketprice.rs index c3946299e..fa058a976 100644 --- a/crates/brk_computer/src/storage/vecs/marketprice.rs +++ b/crates/brk_computer/src/storage/vecs/marketprice.rs @@ -7,7 +7,7 @@ use brk_core::{ use brk_exit::Exit; use brk_fetcher::Fetcher; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Version}; +use brk_vec::{AnyStoredVec, Compressed, Version}; use super::{ ComputedVec, Indexes, @@ -765,7 +765,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { vec![ vec![ self.dateindex_to_close_in_cents.any_vec(), diff 
--git a/crates/brk_computer/src/storage/vecs/mod.rs b/crates/brk_computer/src/storage/vecs/mod.rs index ce8a1d064..a5c133f88 100644 --- a/crates/brk_computer/src/storage/vecs/mod.rs +++ b/crates/brk_computer/src/storage/vecs/mod.rs @@ -3,7 +3,7 @@ use std::{fs, path::Path}; use brk_exit::Exit; use brk_fetcher::Fetcher; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed}; +use brk_vec::{AnyStoredVec, Compressed}; mod base; mod blocks; @@ -63,7 +63,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ self.indexes.as_any_vecs(), self.blocks.as_any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/transactions.rs b/crates/brk_computer/src/storage/vecs/transactions.rs index 063112dec..d820862e8 100644 --- a/crates/brk_computer/src/storage/vecs/transactions.rs +++ b/crates/brk_computer/src/storage/vecs/transactions.rs @@ -3,7 +3,7 @@ use std::{fs, path::Path}; use brk_core::{Sats, StoredU64, Txindex, Txinindex, Txoutindex}; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStorableVec, Compressed, Version}; +use brk_vec::{AnyStoredVec, Compressed, Version}; use super::{ ComputedVec, Indexes, @@ -203,7 +203,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { [ vec![ self.txindex_to_is_coinbase.any_vec(), diff --git a/crates/brk_indexer/src/vecs/base.rs b/crates/brk_indexer/src/vecs/base.rs index 7199ed0c9..e3f902ea9 100644 --- a/crates/brk_indexer/src/vecs/base.rs +++ b/crates/brk_indexer/src/vecs/base.rs @@ -6,8 +6,8 @@ use std::{ }; use brk_vec::{ - AnyStorableVec, Compressed, Error, MAX_CACHE_SIZE, MAX_PAGE_SIZE, Result, StorableVec, - StoredIndex, StoredType, Value, Version, + AnyVec, Compressed, Error, MAX_CACHE_SIZE, MAX_PAGE_SIZE, Result, StoredIndex, StoredType, + StoredVec, Value, Version, }; use super::Height; @@ -15,7 +15,7 @@ use super::Height; 
#[derive(Debug)] pub struct IndexedVec { height: Option, - vec: StorableVec, + vec: StoredVec, } impl IndexedVec @@ -33,9 +33,9 @@ where version: Version, compressed: Compressed, ) -> brk_vec::Result { - let mut vec = StorableVec::forced_import(path, version, compressed)?; + let mut vec = StoredVec::forced_import(path, version, compressed)?; - vec.enable_large_cache(); + vec.enable_large_cache_if_possible(); Ok(Self { height: Height::try_from(Self::path_height_(path).as_path()).ok(), @@ -67,15 +67,13 @@ where let min_page_index = (max_page_index + 1) - large_cache_len; if page_index >= min_page_index { - let values = self - .vec + self.vec .pages() .unwrap() .get(page_index - min_page_index) .ok_or(Error::MmapsVecIsTooSmall)? - .get_or_init(|| self.vec.decode_page(page_index).unwrap()); - - return Ok(values.get(index)?.map(|v| Value::Ref(v))); + .get_or_init(|| self.vec.decode_page(page_index).unwrap()) + .get(index) } } @@ -126,15 +124,15 @@ where self.vec.flush() } - pub fn vec(&self) -> &StorableVec { + pub fn vec(&self) -> &StoredVec { &self.vec } - pub fn mut_vec(&mut self) -> &mut StorableVec { + pub fn mut_vec(&mut self) -> &mut StoredVec { &mut self.vec } - pub fn any_vec(&self) -> &dyn AnyStorableVec { + pub fn any_vec(&self) -> &dyn AnyVec { &self.vec } diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 8c2bf4199..28e3c04b3 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -7,7 +7,7 @@ use brk_core::{ P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats, StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight, }; -use brk_vec::{AnyStorableVec, Compressed, Version}; +use brk_vec::{AnyVec, Compressed, Version}; use rayon::prelude::*; use crate::Indexes; @@ -609,7 +609,7 @@ impl Vecs { .unwrap() } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn 
AnyVec> { vec![ self.addressindex_to_addresstype.any_vec(), self.addressindex_to_addresstypeindex.any_vec(), diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index a08e7396d..c3b731712 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -5,7 +5,7 @@ use brk_computer::Computer; use brk_indexer::Indexer; -use brk_vec::AnyStorableVec; +use brk_vec::AnyStoredVec; use tabled::settings::Style; mod format; @@ -51,7 +51,7 @@ impl<'a> Query<'a> { } } - pub fn search(&self, index: Index, ids: &[&str]) -> Vec<(String, &&dyn AnyStorableVec)> { + pub fn search(&self, index: Index, ids: &[&str]) -> Vec<(String, &&dyn AnyStoredVec)> { let tuples = ids .iter() .flat_map(|s| { @@ -86,7 +86,7 @@ impl<'a> Query<'a> { pub fn format( &self, - vecs: Vec<(String, &&dyn AnyStorableVec)>, + vecs: Vec<(String, &&dyn AnyStoredVec)>, from: Option, to: Option, format: Option, diff --git a/crates/brk_query/src/vec_trees.rs b/crates/brk_query/src/vec_trees.rs index 2cc22828e..b5088947a 100644 --- a/crates/brk_query/src/vec_trees.rs +++ b/crates/brk_query/src/vec_trees.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use brk_vec::AnyStorableVec; +use brk_vec::AnyStoredVec; use derive_deref::{Deref, DerefMut}; use super::index::Index; @@ -13,7 +13,7 @@ pub struct VecTrees<'a> { impl<'a> VecTrees<'a> { // Not the most performant or type safe but only built once so that's okay - pub fn insert(&mut self, vec: &'a dyn AnyStorableVec) { + pub fn insert(&mut self, vec: &'a dyn AnyStoredVec) { let file_name = vec.file_name(); let split = file_name.split("_to_").collect::>(); if split.len() != 2 { @@ -88,7 +88,7 @@ impl<'a> VecTrees<'a> { } #[derive(Default, Deref, DerefMut)] -pub struct IndexToVec<'a>(BTreeMap); +pub struct IndexToVec<'a>(BTreeMap); #[derive(Default, Deref, DerefMut)] -pub struct IdToVec<'a>(BTreeMap); +pub struct IdToVec<'a>(BTreeMap); diff --git a/crates/brk_server/Cargo.toml b/crates/brk_server/Cargo.toml index fa6384ba7..8dc6bb29d 
100644 --- a/crates/brk_server/Cargo.toml +++ b/crates/brk_server/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true repository.workspace = true [dependencies] -axum = "0.8.3" +axum = { workspace = true } brk_computer = { workspace = true } brk_exit = { workspace = true } brk_fetcher = { workspace = true } diff --git a/crates/brk_vec/Cargo.toml b/crates/brk_vec/Cargo.toml index 1fd531032..656e2751a 100644 --- a/crates/brk_vec/Cargo.toml +++ b/crates/brk_vec/Cargo.toml @@ -9,6 +9,8 @@ license.workspace = true repository.workspace = true [dependencies] +axum = { workspace = true } +arc-swap = "1.7.1" memmap2 = "0.9.5" rayon = { workspace = true } serde = { workspace = true } diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs index bc17f3549..0b26bb7ba 100644 --- a/crates/brk_vec/examples/main.rs +++ b/crates/brk_vec/examples/main.rs @@ -1,13 +1,12 @@ use std::{fs, path::Path}; -use brk_vec::{Compressed, StorableVec, Version}; +use brk_vec::{AnyVec, RawVec, Version}; fn main() -> Result<(), Box> { let _ = fs::remove_dir_all("./vec"); { - let mut vec: StorableVec = - StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?; + let mut vec: RawVec = RawVec::forced_import(Path::new("./vec"), Version::ZERO)?; (0..21_u32).for_each(|v| { vec.push(v); @@ -20,8 +19,7 @@ fn main() -> Result<(), Box> { } { - let mut vec: StorableVec = - StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?; + let mut vec: RawVec = RawVec::forced_import(Path::new("./vec"), Version::ZERO)?; dbg!(vec.get(0)?); dbg!(vec.get(0)?); @@ -42,10 +40,9 @@ fn main() -> Result<(), Box> { } { - let mut vec: StorableVec = - StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?; + let mut vec: RawVec = RawVec::forced_import(Path::new("./vec"), Version::ZERO)?; - vec.enable_large_cache(); + // vec.enable_large_cache_if_possible(); dbg!(vec.get(0)?); dbg!(vec.get(20)?); @@ -58,12 +55,12 @@ fn main() -> 
Result<(), Box> { dbg!(vec.get(5)?); dbg!(vec.get(20)?); - vec.iter(|(_, v)| { + vec.iter(|(_, v, ..)| { dbg!(v); Ok(()) })?; - vec.iter_from(5, |(_, v)| { + vec.iter_from(5, |(_, v, ..)| { dbg!(v); Ok(()) })?; diff --git a/crates/brk_vec/src/enums/error.rs b/crates/brk_vec/src/enums/error.rs index 8729e5076..f1a117467 100644 --- a/crates/brk_vec/src/enums/error.rs +++ b/crates/brk_vec/src/enums/error.rs @@ -15,6 +15,7 @@ pub enum Error { IO(io::Error), ZeroCopyError, IndexTooHigh, + EmptyVec, IndexTooLow, ExpectFileToHaveIndex, ExpectVecToHaveIndex, @@ -68,6 +69,7 @@ impl fmt::Display for Error { Error::ZeroCopyError => write!(f, "Zero copy convert error"), Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"), Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"), + Error::EmptyVec => write!(f, "The Vec is empty, maybe wait for a bit"), } } } diff --git a/crates/brk_vec/src/enums/values.rs b/crates/brk_vec/src/enums/values.rs index dd3d2bed2..51194e180 100644 --- a/crates/brk_vec/src/enums/values.rs +++ b/crates/brk_vec/src/enums/values.rs @@ -3,7 +3,10 @@ use std::ops::Range; use memmap2::Mmap; use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; -use crate::MAX_PAGE_SIZE; +const ONE_KIB: usize = 1024; +pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; +const ONE_MIB: usize = ONE_KIB * ONE_KIB; +pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; use super::Result; diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index 40ef32dee..d41cc87d8 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -3,863 +3,139 @@ #![doc = include_str!("../examples/main.rs")] #![doc = "```"] -use std::{ - fs::{self, File, OpenOptions}, - io::{self, Read, Seek, SeekFrom, Write}, - marker::PhantomData, - mem, - path::{Path, PathBuf}, - sync::OnceLock, -}; - -pub use memmap2; -use rayon::prelude::*; -pub use zerocopy; -use zstd::DEFAULT_COMPRESSION_LEVEL; - mod enums; mod structs; mod 
traits; +mod variants; +use std::{path::Path, sync::Arc}; + +use arc_swap::{ArcSwap, Guard}; +use axum::Json; pub use enums::*; +use memmap2::Mmap; pub use structs::*; pub use traits::*; +pub use variants::*; -const ONE_KIB: usize = 1024; -pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; -const ONE_MIB: usize = ONE_KIB * ONE_KIB; -pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; - -#[allow(private_interfaces)] #[derive(Debug)] -pub enum StorableVec { - Raw { - base: Base, - }, - Compressed { - base: Base, - pages_meta: CompressedPagesMetadata, - }, +pub enum StoredVec { + Raw(RawVec), + Compressed(CompressedVec), } -impl StorableVec +impl StoredVec where I: StoredIndex, T: StoredType, { - pub const SIZE_OF_T: usize = size_of::(); - pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; - pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; - - /// Same as import but will reset the folder under certain errors, so be careful ! 
pub fn forced_import(path: &Path, version: Version, compressed: Compressed) -> Result { - let res = Self::import(path, version, compressed); - match res { - Err(Error::WrongEndian) - | Err(Error::DifferentCompressionMode) - | Err(Error::DifferentVersion { - found: _, - expected: _, - }) => { - fs::remove_dir_all(path)?; - Self::import(path, version, compressed) - } - _ => res, - } - } - - pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { - let base = Base::import(path, version, compressed)?; - if *compressed { - let pages_meta = Self::read_pages_meta_(path)?; - - Ok(Self::Compressed { base, pages_meta }) + Ok(Self::Compressed(CompressedVec::forced_import( + path, version, + )?)) } else { - Ok(Self::Raw { base }) + Ok(Self::Raw(RawVec::forced_import(path, version)?)) } } +} - fn read_pages_meta(&self) -> Result { - Self::read_pages_meta_(self.path()) - } - fn read_pages_meta_(path: &Path) -> Result { - CompressedPagesMetadata::read(Self::path_pages_meta_(path).as_path()) - } - +impl AnyVec for StoredVec +where + I: StoredIndex, + T: StoredType, +{ #[inline] - pub fn get(&mut self, index: I) -> Result> { - self.get_(index.to_usize()?) 
- } - #[inline] - pub fn get_(&mut self, index: usize) -> Result> { - match self.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.pushed().get(index)); - } - } - Err(Error::IndexTooHigh) => return Ok(None), - Err(Error::IndexTooLow) => {} - Err(error) => return Err(error), + fn get_(&mut self, index: usize) -> Result>> { + match self { + StoredVec::Raw(v) => v.get_(index), + StoredVec::Compressed(v) => v.get_(index), } - - let page_index = Self::index_to_page_index(index); - - if self.page().is_none_or(|b| b.0 != page_index) { - let values = self.decode_page(page_index)?; - self.mut_page().replace((page_index, values)); - } - - self.page().unwrap().1.get(index) } - pub fn get_last(&mut self) -> Result> { - let len = self.len(); - if len == 0 { - return Ok(None); - } - self.get_(len - 1) - } - - pub fn read(&self, index: I) -> Result> { - self.read_(index.to_usize()?) - } - pub fn read_(&self, index: usize) -> Result> { - Ok(match self { - Self::Raw { .. } => { - let mut file = self.open_file()?; - let byte_index = Self::index_to_byte_index(index); - file.seek(SeekFrom::Start(byte_index))?; - let mut buf = vec![0; Self::SIZE_OF_T]; - file.read_exact(&mut buf)?; - T::try_ref_from_bytes(&buf[..]).ok().map(|v| v.to_owned()) - } - Self::Compressed { .. } => self - .decode_page(Self::index_to_page_index(index))? - .get(index)? 
- .cloned(), - }) - } - - pub fn iter(&mut self, f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - self.iter_from(I::default(), f) - } - - pub fn iter_from(&mut self, mut index: I, mut f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - if !self.is_pushed_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let stored_len = I::from(self.stored_len()); - - while index < stored_len { - let v = self.get(index)?.unwrap(); - f((index, v))?; - index = index + 1; - } - - Ok(()) - } - - pub fn iter_from_cloned(&mut self, mut index: I, mut f: F) -> Result<()> + fn iter_from(&mut self, index: I, mut f: F) -> Result<()> where F: FnMut((I, T, &mut Self)) -> Result<()>, { - if !self.is_pushed_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let stored_len = I::from(self.stored_len()); - - while index < stored_len { - let v = self.get(index)?.unwrap().clone(); - f((index, v, self))?; - index = index + 1; - } - - Ok(()) + todo!(); + // match self { + // StoredVec::Raw(v) => v.iter_from(index, |(i, t, inner)| f((i, t, self))), + // StoredVec::Compressed(v) => v.iter_from(index, |(i, t, inner)| f((i, t, self))), + // } } - pub fn collect_range(&self, from: Option, to: Option) -> Result> { - if !self.is_pushed_empty() { - return Err(Error::UnsupportedUnflushedState); - } - - let len = self - .base() - .read_stored_length() - .unwrap() - .to_usize() - .unwrap(); - - if len == 0 { - return Err(Error::IndexTooHigh); - } - - let from = from.map_or(0, |from| { - if from >= 0 { - from as usize - } else { - let from = len as i64 + from; - if from < 0 { 0 } else { from as usize } - } - }); - - let to = to.map_or(len - 1, |to| { - if to >= 0 { - to as usize - } else { - let max = len - 1; - let to = max as i64 + to; - if to > max as i64 { max } else { to as usize } - } - }); - - if from > to { - return Err(Error::RangeFromAfterTo(from, to)); - } - - let mut page: Option<(usize, Values)> = None; - - let values = (from..=to) - 
.flat_map(|index| { - let page_index = Self::index_to_page_index(index); - - if page.as_ref().is_none_or(|b| b.0 != page_index) { - let pages_meta = match self { - Self::Raw { .. } => None, - Self::Compressed { .. } => Some(self.read_pages_meta().unwrap()), - }; - - let values = Self::decode_page_( - len, - page_index, - &self.base().open_file().unwrap(), - pages_meta.as_ref(), - ) - .inspect_err(|_| { - dbg!(from, to); - }) - .unwrap(); - page.replace((page_index, values)); - } - - page.as_ref().unwrap().1.get(index).ok().flatten().cloned() - }) - .collect::>(); - - Ok(values) - } - - pub fn decode_page(&self, page_index: usize) -> Result> { - Self::decode_page_( - self.stored_len(), - page_index, - self.file(), - match self { - Self::Raw { .. } => None, - Self::Compressed { pages_meta, .. } => Some(pages_meta), - }, - ) - } - - fn decode_page_( - stored_len: usize, - page_index: usize, - file: &File, - compressed_pages_meta: Option<&CompressedPagesMetadata>, - ) -> Result> { - if Self::page_index_to_index(page_index) >= stored_len { - return Err(Error::IndexTooHigh); - } - - let (len, offset) = if let Some(pages_meta) = compressed_pages_meta { - if pages_meta.len() <= page_index { - return Err(Error::ExpectVecToHaveIndex); - } - let page = pages_meta.get(page_index).unwrap(); - (page.bytes_len as usize, page.start) - } else { - (Self::PAGE_SIZE, Self::page_index_to_byte_index(page_index)) - }; - - let mmap = unsafe { - memmap2::MmapOptions::new() - .len(len) - .offset(offset) - .map(file)? - }; - - let compressed = compressed_pages_meta.is_some(); - - if compressed { - let decoded = zstd::decode_all(&mmap[..]); - - if decoded.is_err() { - dbg!((len, offset, page_index, &mmap[..], &mmap.len(), &decoded)); - } - - Ok(Values::from( - decoded? 
- .chunks(Self::SIZE_OF_T) - .map(|slice| T::try_read_from_bytes(slice).unwrap()) - .collect::>() - .into_boxed_slice(), - )) - } else { - Ok(Values::from(mmap)) - } - } - - #[inline] - pub fn push(&mut self, value: T) { - self.mut_base().pushed.push(value) - } - - pub fn flush(&mut self) -> io::Result<()> { - let pushed_len = self.pushed_len(); - - if pushed_len == 0 { - return Ok(()); - } - - let stored_len = self.stored_len(); - - let bytes = match self { - Self::Compressed { base, pages_meta } => { - let (starting_page_index, values) = if *base.stored_len % Self::PER_PAGE != 0 { - if pages_meta.is_empty() { - unreachable!() - } - - let last_page_index = pages_meta.len() - 1; - - let values = if let Some(values) = base - .pages - .as_mut() - .and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take())) - { - values - } else if base - .page - .as_ref() - .is_some_and(|(page_index, _)| *page_index == last_page_index) - { - base.page.take().unwrap().1 - } else { - Self::decode_page_( - stored_len, - last_page_index, - &base.file, - Some(pages_meta), - ) - .inspect_err(|_| { - dbg!(last_page_index, &pages_meta); - }) - .unwrap() - }; - - let file_len = pages_meta.pop().unwrap().start; - - file_set_len(&mut base.file, file_len)?; - - (last_page_index, values) - } else { - (pages_meta.len(), Values::default()) - }; - - let compressed = Vec::from(values.as_arr()) - .into_par_iter() - .chain(mem::take(&mut base.pushed).into_par_iter()) - .chunks(Self::PER_PAGE) - .map(|chunk| (Self::compress_chunk(chunk.as_ref()), chunk.len())) - .collect::>(); - - compressed - .iter() - .enumerate() - .for_each(|(i, (compressed_bytes, values_len))| { - let page_index = starting_page_index + i; - - let start = if page_index != 0 { - let prev = pages_meta.get(page_index - 1).unwrap(); - prev.start + prev.bytes_len as u64 - } else { - 0 - }; - - let bytes_len = compressed_bytes.len() as u32; - let values_len = *values_len as u32; - - let page = 
CompressedPageMetadata::new(start, bytes_len, values_len); - - pages_meta.push(page_index, page); - }); - - pages_meta.write()?; - - compressed - .into_iter() - .flat_map(|(v, _)| v) - .collect::>() - } - Self::Raw { base } => { - let pushed = &mut base.pushed; - - let mut bytes: Vec = vec![0; pushed.len() * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - mem::take(pushed) - .into_par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); - - bytes - } - }; - - let file = self.mut_file(); - file.write_all(&bytes)?; - file.sync_all()?; - - self.reset_caches(); - - self.increase_stored_len(pushed_len); - - self.write_stored_length()?; - - Ok(()) - } - - pub fn truncate_if_needed(&mut self, index: I) -> Result<()> { - let index = index.to_usize()?; - - if index >= self.stored_len() { - return Ok(()); - } - - if index == 0 { - self.reset()?; - return Ok(()); - } - - let page_index = Self::index_to_page_index(index); - - let values = match self { - Self::Compressed { .. } => self.decode_page(page_index)?, - Self::Raw { .. } => Values::default(), - }; - - let (len, bytes) = match self { - Self::Compressed { pages_meta, .. } => { - let mut page = pages_meta.truncate(page_index).unwrap(); - - let len = page.start; - - let decoded_index = Self::index_to_decoded_index(index); - - let compressed = if decoded_index != 0 { - let chunk = &values.as_arr()[..decoded_index]; - - let compressed = Self::compress_chunk(chunk); - - page.values_len = chunk.len() as u32; - page.bytes_len = compressed.len() as u32; - - pages_meta.push(page_index, page); - - compressed - } else { - vec![].into_boxed_slice() - }; - - pages_meta.write()?; - - (len, compressed) - } - Self::Raw { .. 
} => { - // let value_at_index = self.open_then_read_(index).ok(); - - let len = Self::index_to_byte_index(index); - - (len, vec![].into_boxed_slice()) - } - }; - - let file = self.mut_file(); - - file_set_len(file, len)?; - - if !bytes.is_empty() { - file.write_all(&bytes)?; - } - - self.set_stored_len(index); - - self.write_stored_length()?; - - self.reset_caches(); - - Ok(()) - } - - fn compress_chunk(chunk: &[T]) -> Box<[u8]> { - if chunk.len() > Self::PER_PAGE { - panic!(); - } - - let mut bytes: Vec = vec![0; chunk.len() * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - chunk - .into_par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); - - zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL) - .unwrap() - .into_boxed_slice() - } - - pub fn enable_large_cache(&mut self) { - self.mut_pages().replace(vec![]); - self.reset_large_cache(); - } - - pub fn disable_large_cache(&mut self) { - self.mut_base().pages.take(); - } - - fn reset_large_cache(&mut self) { - let stored_len = self.stored_len(); - - if let Some(pages) = self.mut_pages().as_mut() { - pages.par_iter_mut().for_each(|lock| { - lock.take(); - }); - - let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize; - let len = Self::CACHE_LENGTH.min(len); - - if pages.len() != len { - pages.resize_with(len, Default::default); - } - } - } - - pub fn large_cache_len(&self) -> usize { - self.pages().map_or(0, |v| v.len()) - } - - fn reset_small_cache(&mut self) { - self.mut_base().page.take(); - } - - fn reset_caches(&mut self) { - self.reset_small_cache(); - self.reset_large_cache(); - } - - pub fn reset(&mut self) -> Result<()> { - self.mut_base().reset_file()?; - self.reset_stored_len(); - self.reset_caches(); - Ok(()) - } - - #[inline] - pub fn index_to_pushed_index(&self, index: usize) -> Result> { - let stored_len = self.stored_len(); - - if index >= stored_len { - let index = index - stored_len; - if 
index >= self.pushed_len() { - Err(Error::IndexTooHigh) - } else { - Ok(Some(index)) - } - } else { - Err(Error::IndexTooLow) - } - } - - #[inline] - fn index_to_byte_index(index: usize) -> u64 { - (index * Self::SIZE_OF_T) as u64 - } - - #[inline(always)] - fn index_to_page_index(index: usize) -> usize { - index / Self::PER_PAGE - } - - #[inline(always)] - fn page_index_to_index(page_index: usize) -> usize { - page_index * Self::PER_PAGE - } - - #[inline(always)] - fn page_index_to_byte_index(page_index: usize) -> u64 { - (page_index * Self::PAGE_SIZE) as u64 - } - - #[inline(always)] - fn index_to_decoded_index(index: usize) -> usize { - index % Self::PER_PAGE - } - - #[inline] - fn path_pages_meta_(path: &Path) -> PathBuf { - path.join("pages_meta") - } - - #[inline] - fn page(&self) -> Option<&(usize, Values)> { - self.base().page.as_ref() - } - - #[inline] - fn mut_page(&mut self) -> &mut Option<(usize, Values)> { - &mut self.mut_base().page - } - - #[inline] - pub fn pages(&self) -> Option<&Vec>>> { - self.base().pages.as_ref() - } - - #[inline] - fn mut_pages(&mut self) -> &mut Option>>> { - &mut self.mut_base().pages - } - - #[inline] - pub fn len(&self) -> usize { - self.stored_len() + self.pushed_len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - #[inline] - pub fn has(&self, index: I) -> Result { - Ok(self.has_(index.to_usize()?)) - } - #[inline] - fn has_(&self, index: usize) -> bool { - index < self.len() - } - - #[inline] - pub fn pushed(&self) -> &Vec { - &self.base().pushed - } - - #[inline] - pub fn pushed_len(&self) -> usize { - self.pushed().len() - } - - #[inline] - fn is_pushed_empty(&self) -> bool { - self.pushed_len() == 0 - } - - #[inline] - pub fn stored_len(&self) -> usize { - *self.base().stored_len - } - - #[inline] - fn set_stored_len(&mut self, len: usize) { - *self.mut_base().stored_len = len; - } - - fn increase_stored_len(&mut self, len: usize) { - *self.mut_base().stored_len += len; - } - - #[inline] - fn 
reset_stored_len(&mut self) { - self.set_stored_len(0); - } - - fn write_stored_length(&self) -> io::Result<()> { - self.base().write_stored_length() - } - - #[inline] - pub fn path(&self) -> &Path { - &self.base().pathbuf - } - - fn file(&self) -> &File { - &self.base().file - } - - fn mut_file(&mut self) -> &mut File { - &mut self.mut_base().file - } - - fn open_file(&self) -> io::Result { - self.base().open_file() - } - - pub fn file_name(&self) -> String { - self.path() - .file_name() - .unwrap() - .to_str() - .unwrap() - .to_owned() - } - - #[inline] - pub fn version(&self) -> Version { - self.base().version - } - - #[inline] - fn compressed(&self) -> Compressed { - self.base().compressed - } - - #[inline] - fn base(&self) -> &Base { + fn collect_range(&self, from: Option, to: Option) -> Result>> { match self { - Self::Raw { base, .. } => base, - Self::Compressed { base, .. } => base, + StoredVec::Raw(v) => v.collect_range(from, to), + StoredVec::Compressed(v) => v.collect_range(from, to), } } - #[inline] - fn mut_base(&mut self) -> &mut Base { + fn flush(&mut self) -> Result<()> { match self { - Self::Raw { base, .. } => base, - Self::Compressed { base, .. 
} => base, + StoredVec::Raw(v) => v.flush(), + StoredVec::Compressed(v) => v.flush(), } } - pub fn index_type_to_string(&self) -> &str { - I::to_string() + fn truncate_if_needed(&mut self, index: I) -> Result<()> { + match self { + StoredVec::Raw(v) => v.truncate_if_needed(index), + StoredVec::Compressed(v) => v.truncate_if_needed(index), + } + } + + #[inline] + fn mmap(&self) -> &ArcSwap { + match self { + StoredVec::Raw(v) => v.mmap(), + StoredVec::Compressed(v) => v.mmap(), + } + } + + #[inline] + fn guard(&self) -> &Option>> { + match self { + StoredVec::Raw(v) => v.guard(), + StoredVec::Compressed(v) => v.guard(), + } + } + #[inline] + fn mut_guard(&mut self) -> &mut Option>> { + match self { + StoredVec::Raw(v) => v.mut_guard(), + StoredVec::Compressed(v) => v.mut_guard(), + } + } + + #[inline] + fn pushed(&self) -> &[T] { + match self { + StoredVec::Raw(v) => v.pushed(), + StoredVec::Compressed(v) => v.pushed(), + } + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + match self { + StoredVec::Raw(v) => v.mut_pushed(), + StoredVec::Compressed(v) => v.mut_pushed(), + } + } + + #[inline] + fn path(&self) -> &Path { + match self { + StoredVec::Raw(v) => v.path(), + StoredVec::Compressed(v) => v.path(), + } + } + + #[inline] + fn version(&self) -> Version { + match self { + StoredVec::Raw(v) => v.version(), + StoredVec::Compressed(v) => v.version(), + } } } - -#[derive(Debug)] -struct Base { - pub version: Version, - pub pathbuf: PathBuf, - pub stored_len: Length, - pub compressed: Compressed, - pub page: Option<(usize, Values)>, - pub pages: Option>>>, - pub pushed: Vec, - pub file: File, - pub phantom: PhantomData, -} - -impl Base { - pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { - fs::create_dir_all(path)?; - - let version_path = Self::path_version_(path); - version.validate(version_path.as_ref())?; - version.write(version_path.as_ref())?; - - let compressed_path = Self::path_compressed_(path); - 
compressed.validate(compressed_path.as_ref())?; - compressed.write(compressed_path.as_ref())?; - - let stored_len = Length::try_from(Self::path_length_(path).as_path())?; - - Ok(Self { - version, - compressed, - pathbuf: path.to_owned(), - file: Self::open_file_(Self::path_vec_(path).as_path())?, - stored_len, - page: None, - pages: None, - pushed: vec![], - phantom: PhantomData, - }) - } - - fn open_file(&self) -> io::Result { - Self::open_file_(&self.path_vec()) - } - fn open_file_(path: &Path) -> io::Result { - OpenOptions::new() - .read(true) - .create(true) - .truncate(false) - .append(true) - .open(path) - } - - fn reset_file(&mut self) -> Result<()> { - file_set_len(&mut self.file, 0)?; - Ok(()) - } - - #[inline] - fn path_vec(&self) -> PathBuf { - Self::path_vec_(&self.pathbuf) - } - #[inline] - fn path_vec_(path: &Path) -> PathBuf { - path.join("vec") - } - - pub fn read_stored_length(&self) -> Result { - Length::try_from(self.path_length().as_path()) - } - fn write_stored_length(&self) -> io::Result<()> { - self.stored_len.write(&self.path_length()) - } - #[inline] - fn path_length(&self) -> PathBuf { - Self::path_length_(&self.pathbuf) - } - #[inline] - fn path_length_(path: &Path) -> PathBuf { - path.join("length") - } - - #[inline] - fn path_version_(path: &Path) -> PathBuf { - path.join("version") - } - - #[inline] - fn path_compressed_(path: &Path) -> PathBuf { - path.join("compressed") - } -} - -impl Clone for StorableVec -where - I: StoredIndex, - T: StoredType, -{ - fn clone(&self) -> Self { - Self::import(self.path(), self.version(), self.compressed()).unwrap() - } -} - -fn file_set_len(file: &mut File, len: u64) -> io::Result<()> { - file.set_len(len)?; - file.seek(SeekFrom::End(0))?; - Ok(()) -} diff --git a/crates/brk_vec/src/lib2.rs b/crates/brk_vec/src/lib2.rs new file mode 100644 index 000000000..85e493a3e --- /dev/null +++ b/crates/brk_vec/src/lib2.rs @@ -0,0 +1,878 @@ +#![doc = include_str!("../README.md")] +#![doc = "\n## 
Example\n\n```rust"] +#![doc = include_str!("../examples/main.rs")] +#![doc = "```"] + +use std::{ + fs::{self, File, OpenOptions}, + io::{self, Read, Seek, SeekFrom, Write}, + marker::PhantomData, + mem, + path::{Path, PathBuf}, + sync::{Arc, OnceLock}, +}; + +use arc_swap::ArcSwap; +pub use memmap2; +use memmap2::Mmap; +use rayon::prelude::*; +pub use zerocopy; +use zstd::DEFAULT_COMPRESSION_LEVEL; + +mod enums; +mod structs; +mod traits; + +pub use enums::*; +pub use structs::*; +pub use traits::*; + +const ONE_KIB: usize = 1024; +pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; +const ONE_MIB: usize = ONE_KIB * ONE_KIB; +pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; + +#[allow(private_interfaces)] +#[derive(Debug)] +pub enum StorableVec { + Raw { + base: Base, + }, + Compressed { + base: Base, + decoded_page: Option<(usize, Box<[T]>)>, + decoded_pages: Option>>>, + // pages: Option>>>, + // page: Option<(usize, Values)>, + pages_meta: CompressedPagesMetadata, + }, +} + +impl StorableVec +where + I: StoredIndex, + T: StoredType, +{ + pub const SIZE_OF_T: usize = size_of::(); + pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; + pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; + pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; + + /// Same as import but will reset the folder under certain errors, so be careful ! 
+ pub fn forced_import(path: &Path, version: Version, compressed: Compressed) -> Result { + let res = Self::import(path, version, compressed); + match res { + Err(Error::WrongEndian) + | Err(Error::DifferentCompressionMode) + | Err(Error::DifferentVersion { + found: _, + expected: _, + }) => { + fs::remove_dir_all(path)?; + Self::import(path, version, compressed) + } + _ => res, + } + } + + pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { + let base = Base::import(path, version, compressed)?; + + if *compressed { + let pages_meta = Self::read_pages_meta_(path)?; + + Ok(Self::Compressed { + base, + page: None, + pages: None, + pages_meta, + }) + } else { + Ok(Self::Raw { base }) + } + } + + fn read_pages_meta(&self) -> Result { + Self::read_pages_meta_(self.path()) + } + fn read_pages_meta_(path: &Path) -> Result { + CompressedPagesMetadata::read(Self::path_pages_meta_(path).as_path()) + } + + #[inline] + pub fn get(&mut self, index: I) -> Result> { + self.get_(index.to_usize()?) + } + #[inline] + pub fn get_(&mut self, index: usize) -> Result> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed().get(index)); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + let page_index = Self::index_to_page_index(index); + + if self.page().is_none_or(|b| b.0 != page_index) { + let values = self.decode_page(page_index)?; + self.mut_page().replace((page_index, values)); + } + + self.page().unwrap().1.get(index) + } + + pub fn get_last(&mut self) -> Result> { + let len = self.len(); + if len == 0 { + return Ok(None); + } + self.get_(len - 1) + } + + pub fn read(&self, index: I) -> Result> { + self.read_(index.to_usize()?) + } + pub fn read_(&self, index: usize) -> Result> { + Ok(match self { + Self::Raw { .. 
} => { + let mut file = self.open_file()?; + let byte_index = Self::index_to_byte_index(index); + file.seek(SeekFrom::Start(byte_index))?; + let mut buf = vec![0; Self::SIZE_OF_T]; + file.read_exact(&mut buf)?; + T::try_ref_from_bytes(&buf[..]).ok().map(|v| v.to_owned()) + } + Self::Compressed { .. } => self + .decode_page(Self::index_to_page_index(index))? + .get(index)? + .cloned(), + }) + } + + pub fn iter(&mut self, f: F) -> Result<()> + where + F: FnMut((I, &T)) -> Result<()>, + { + self.iter_from(I::default(), f) + } + + pub fn iter_from(&mut self, mut index: I, mut f: F) -> Result<()> + where + F: FnMut((I, &T)) -> Result<()>, + { + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let stored_len = I::from(self.stored_len()); + + while index < stored_len { + let v = self.get(index)?.unwrap(); + f((index, v))?; + index = index + 1; + } + + Ok(()) + } + + pub fn iter_from_cloned(&mut self, mut index: I, mut f: F) -> Result<()> + where + F: FnMut((I, T, &mut Self)) -> Result<()>, + { + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let stored_len = I::from(self.stored_len()); + + while index < stored_len { + let v = self.get(index)?.unwrap().clone(); + f((index, v, self))?; + index = index + 1; + } + + Ok(()) + } + + pub fn collect_range(&self, from: Option, to: Option) -> Result> { + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let len = self + .base() + .read_stored_length() + .unwrap() + .to_usize() + .unwrap(); + + if len == 0 { + return Err(Error::IndexTooHigh); + } + + let from = from.map_or(0, |from| { + if from >= 0 { + from as usize + } else { + let from = len as i64 + from; + if from < 0 { 0 } else { from as usize } + } + }); + + let to = to.map_or(len - 1, |to| { + if to >= 0 { + to as usize + } else { + let max = len - 1; + let to = max as i64 + to; + if to > max as i64 { max } else { to as usize } + } + }); + + if from > to { + return 
Err(Error::RangeFromAfterTo(from, to)); + } + + let mut page: Option<(usize, Values)> = None; + + let values = (from..=to) + .flat_map(|index| { + let page_index = Self::index_to_page_index(index); + + if page.as_ref().is_none_or(|b| b.0 != page_index) { + let pages_meta = match self { + Self::Raw { .. } => None, + Self::Compressed { .. } => Some(self.read_pages_meta().unwrap()), + }; + + let values = Self::decode_page_( + len, + page_index, + &self.base().open_file().unwrap(), + pages_meta.as_ref(), + ) + .inspect_err(|_| { + dbg!(from, to); + }) + .unwrap(); + page.replace((page_index, values)); + } + + page.as_ref().unwrap().1.get(index).ok().flatten().cloned() + }) + .collect::>(); + + Ok(values) + } + + pub fn decode_page(&self, page_index: usize) -> Result> { + Self::decode_page_( + self.stored_len(), + page_index, + self.file(), + match self { + Self::Raw { .. } => None, + Self::Compressed { pages_meta, .. } => Some(pages_meta), + }, + ) + } + + fn decode_page_( + stored_len: usize, + page_index: usize, + file: &File, + compressed_pages_meta: Option<&CompressedPagesMetadata>, + ) -> Result> { + if Self::page_index_to_index(page_index) >= stored_len { + return Err(Error::IndexTooHigh); + } + + let (len, offset) = if let Some(pages_meta) = compressed_pages_meta { + if pages_meta.len() <= page_index { + return Err(Error::ExpectVecToHaveIndex); + } + let page = pages_meta.get(page_index).unwrap(); + (page.bytes_len as usize, page.start) + } else { + (Self::PAGE_SIZE, Self::page_index_to_byte_index(page_index)) + }; + + let mmap = unsafe { + memmap2::MmapOptions::new() + .len(len) + .offset(offset) + .map(file)? + }; + + let compressed = compressed_pages_meta.is_some(); + + if compressed { + let decoded = zstd::decode_all(&mmap[..]); + + if decoded.is_err() { + dbg!((len, offset, page_index, &mmap[..], &mmap.len(), &decoded)); + } + + Ok(Values::from( + decoded? 
+ .chunks(Self::SIZE_OF_T) + .map(|slice| T::try_read_from_bytes(slice).unwrap()) + .collect::>() + .into_boxed_slice(), + )) + } else { + Ok(Values::from(mmap)) + } + } + + #[inline] + pub fn push(&mut self, value: T) { + self.mut_base().pushed.push(value) + } + + pub fn flush(&mut self) -> io::Result<()> { + let pushed_len = self.pushed_len(); + + if pushed_len == 0 { + return Ok(()); + } + + let stored_len = self.stored_len(); + + let bytes = match self { + Self::Compressed { base, pages_meta } => { + let (starting_page_index, values) = if *base.stored_len % Self::PER_PAGE != 0 { + if pages_meta.is_empty() { + unreachable!() + } + + let last_page_index = pages_meta.len() - 1; + + let values = if let Some(values) = base + .pages + .as_mut() + .and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take())) + { + values + } else if base + .page + .as_ref() + .is_some_and(|(page_index, _)| *page_index == last_page_index) + { + base.page.take().unwrap().1 + } else { + Self::decode_page_( + stored_len, + last_page_index, + &base.file, + Some(pages_meta), + ) + .inspect_err(|_| { + dbg!(last_page_index, &pages_meta); + }) + .unwrap() + }; + + let file_len = pages_meta.pop().unwrap().start; + + file_set_len(&mut base.file, file_len)?; + + (last_page_index, values) + } else { + (pages_meta.len(), Values::default()) + }; + + let compressed = Vec::from(values.as_arr()) + .into_par_iter() + .chain(mem::take(&mut base.pushed).into_par_iter()) + .chunks(Self::PER_PAGE) + .map(|chunk| (Self::compress_chunk(chunk.as_ref()), chunk.len())) + .collect::>(); + + compressed + .iter() + .enumerate() + .for_each(|(i, (compressed_bytes, values_len))| { + let page_index = starting_page_index + i; + + let start = if page_index != 0 { + let prev = pages_meta.get(page_index - 1).unwrap(); + prev.start + prev.bytes_len as u64 + } else { + 0 + }; + + let bytes_len = compressed_bytes.len() as u32; + let values_len = *values_len as u32; + + let page = 
CompressedPageMetadata::new(start, bytes_len, values_len); + + pages_meta.push(page_index, page); + }); + + pages_meta.write()?; + + compressed + .into_iter() + .flat_map(|(v, _)| v) + .collect::>() + } + Self::Raw { base } => { + let pushed = &mut base.pushed; + + let mut bytes: Vec = vec![0; pushed.len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + mem::take(pushed) + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); + + bytes + } + }; + + let file = self.mut_file(); + file.write_all(&bytes)?; + file.sync_all()?; + + self.reset_caches(); + + self.increase_stored_len(pushed_len); + + self.write_stored_length()?; + + Ok(()) + } + + pub fn truncate_if_needed(&mut self, index: I) -> Result<()> { + let index = index.to_usize()?; + + if index >= self.stored_len() { + return Ok(()); + } + + if index == 0 { + self.reset()?; + return Ok(()); + } + + let page_index = Self::index_to_page_index(index); + + let values = match self { + Self::Compressed { .. } => self.decode_page(page_index)?, + Self::Raw { .. } => Values::default(), + }; + + let (len, bytes) = match self { + Self::Compressed { pages_meta, .. } => { + let mut page = pages_meta.truncate(page_index).unwrap(); + + let len = page.start; + + let decoded_index = Self::index_to_decoded_index(index); + + let compressed = if decoded_index != 0 { + let chunk = &values.as_arr()[..decoded_index]; + + let compressed = Self::compress_chunk(chunk); + + page.values_len = chunk.len() as u32; + page.bytes_len = compressed.len() as u32; + + pages_meta.push(page_index, page); + + compressed + } else { + vec![].into_boxed_slice() + }; + + pages_meta.write()?; + + (len, compressed) + } + Self::Raw { .. 
} => { + // let value_at_index = self.open_then_read_(index).ok(); + + let len = Self::index_to_byte_index(index); + + (len, vec![].into_boxed_slice()) + } + }; + + let file = self.mut_file(); + + file_set_len(file, len)?; + + if !bytes.is_empty() { + file.write_all(&bytes)?; + } + + self.set_stored_len(index); + + self.write_stored_length()?; + + self.reset_caches(); + + Ok(()) + } + + fn compress_chunk(chunk: &[T]) -> Box<[u8]> { + if chunk.len() > Self::PER_PAGE { + panic!(); + } + + let mut bytes: Vec = vec![0; chunk.len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + chunk + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); + + zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL) + .unwrap() + .into_boxed_slice() + } + + pub fn enable_large_cache_if_possible(&mut self) { + self.mut_pages().replace(vec![]); + self.reset_large_cache(); + } + + pub fn disable_large_cache_if_possible(&mut self) { + self.mut_base().pages.take(); + } + + fn reset_large_cache(&mut self) { + let stored_len = self.stored_len(); + + if let Some(pages) = self.mut_pages().as_mut() { + pages.par_iter_mut().for_each(|lock| { + lock.take(); + }); + + let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize; + let len = Self::CACHE_LENGTH.min(len); + + if pages.len() != len { + pages.resize_with(len, Default::default); + } + } + } + + pub fn large_cache_len(&self) -> usize { + self.pages().map_or(0, |v| v.len()) + } + + fn reset_small_cache(&mut self) { + self.mut_base().page.take(); + } + + fn reset_caches(&mut self) { + self.reset_small_cache(); + self.reset_large_cache(); + } + + pub fn reset(&mut self) -> Result<()> { + self.mut_base().reset_file()?; + self.reset_stored_len(); + self.reset_caches(); + Ok(()) + } + + #[inline] + pub fn index_to_pushed_index(&self, index: usize) -> Result> { + let stored_len = self.stored_len(); + + if index >= stored_len { + let index = 
index - stored_len; + if index >= self.pushed_len() { + Err(Error::IndexTooHigh) + } else { + Ok(Some(index)) + } + } else { + Err(Error::IndexTooLow) + } + } + + #[inline] + fn index_to_byte_index(index: usize) -> u64 { + (index * Self::SIZE_OF_T) as u64 + } + + #[inline(always)] + fn index_to_page_index(index: usize) -> usize { + index / Self::PER_PAGE + } + + #[inline(always)] + fn page_index_to_index(page_index: usize) -> usize { + page_index * Self::PER_PAGE + } + + #[inline(always)] + fn page_index_to_byte_index(page_index: usize) -> u64 { + (page_index * Self::PAGE_SIZE) as u64 + } + + #[inline(always)] + fn index_to_decoded_index(index: usize) -> usize { + index % Self::PER_PAGE + } + + #[inline] + fn path_pages_meta_(path: &Path) -> PathBuf { + path.join("pages_meta") + } + + // #[inline] + // fn page(&self) -> Option<&(usize, Values)> { + // self.base().page.as_ref() + // } + + // #[inline] + // fn mut_page(&mut self) -> &mut Option<(usize, Values)> { + // &mut self.mut_base().page + // } + + // #[inline] + // pub fn pages(&self) -> Option<&Vec>>> { + // self.base().pages.as_ref() + // } + + // #[inline] + // fn mut_pages(&mut self) -> &mut Option>>> { + // &mut self.mut_base().pages + // } + + #[inline] + pub fn len(&self) -> usize { + self.stored_len() + self.pushed_len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[inline] + pub fn has(&self, index: I) -> Result { + Ok(self.has_(index.to_usize()?)) + } + #[inline] + fn has_(&self, index: usize) -> bool { + index < self.len() + } + + #[inline] + pub fn pushed(&self) -> &Vec { + &self.base().pushed + } + + #[inline] + pub fn pushed_len(&self) -> usize { + self.pushed().len() + } + + #[inline] + fn is_pushed_empty(&self) -> bool { + self.pushed_len() == 0 + } + + #[inline] + pub fn stored_len(&self) -> usize { + *self.base().stored_len + } + + #[inline] + fn set_stored_len(&mut self, len: usize) { + *self.mut_base().stored_len = len; + } + + fn increase_stored_len(&mut self, len: 
usize) { + *self.mut_base().stored_len += len; + } + + #[inline] + fn reset_stored_len(&mut self) { + self.set_stored_len(0); + } + + fn write_stored_length(&self) -> io::Result<()> { + self.base().write_stored_length() + } + + #[inline] + pub fn path(&self) -> &Path { + &self.base().pathbuf + } + + fn file(&self) -> &File { + &self.base().file + } + + fn mut_file(&mut self) -> &mut File { + &mut self.mut_base().file + } + + fn open_file(&self) -> io::Result { + self.base().open_file() + } + + pub fn file_name(&self) -> String { + self.path() + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_owned() + } + + #[inline] + pub fn version(&self) -> Version { + self.base().version + } + + #[inline] + fn compressed(&self) -> Compressed { + self.base().compressed + } + + #[inline] + fn base(&self) -> &Base { + match self { + Self::Raw { base, .. } => base, + Self::Compressed { base, .. } => base, + } + } + + #[inline] + fn mut_base(&mut self) -> &mut Base { + match self { + Self::Raw { base, .. } => base, + Self::Compressed { base, .. 
} => base, + } + } + + pub fn index_type_to_string(&self) -> &str { + I::to_string() + } +} + +#[derive(Debug)] +struct Base { + pub version: Version, + pub pathbuf: PathBuf, + pub stored_len: Length, + pub compressed: Compressed, + pub mmap: ArcSwap, + // pub page: Option<(usize, Values)>, + // pub pages: Option>>>, + pub pushed: Vec, + pub file: File, + pub phantom: PhantomData, +} + +impl Base { + pub fn import(path: &Path, version: Version, compressed: Compressed) -> Result { + fs::create_dir_all(path)?; + + let version_path = Self::path_version_(path); + version.validate(version_path.as_ref())?; + version.write(version_path.as_ref())?; + + let compressed_path = Self::path_compressed_(path); + compressed.validate(compressed_path.as_ref())?; + compressed.write(compressed_path.as_ref())?; + + let stored_len = Length::try_from(Self::path_length_(path).as_path())?; + + let file = Self::open_file_(Self::path_vec_(path).as_path())?; + + Ok(Self { + mmap: ArcSwap::new(Arc::new(unsafe { Mmap::map(&file)? 
})), + version, + compressed, + pathbuf: path.to_owned(), + file, + stored_len, + pushed: vec![], + phantom: PhantomData, + }) + } + + fn open_file(&self) -> io::Result { + Self::open_file_(&self.path_vec()) + } + fn open_file_(path: &Path) -> io::Result { + OpenOptions::new() + .read(true) + .create(true) + .truncate(false) + .append(true) + .open(path) + } + + fn reset_file(&mut self) -> Result<()> { + file_set_len(&mut self.file, 0)?; + Ok(()) + } + + #[inline] + fn path_vec(&self) -> PathBuf { + Self::path_vec_(&self.pathbuf) + } + #[inline] + fn path_vec_(path: &Path) -> PathBuf { + path.join("vec") + } + + pub fn read_stored_length(&self) -> Result { + Length::try_from(self.path_length().as_path()) + } + fn write_stored_length(&self) -> io::Result<()> { + self.stored_len.write(&self.path_length()) + } + #[inline] + fn path_length(&self) -> PathBuf { + Self::path_length_(&self.pathbuf) + } + #[inline] + fn path_length_(path: &Path) -> PathBuf { + path.join("length") + } + + #[inline] + fn path_version_(path: &Path) -> PathBuf { + path.join("version") + } + + #[inline] + fn path_compressed_(path: &Path) -> PathBuf { + path.join("compressed") + } +} + +impl Clone for StorableVec +where + I: StoredIndex, + T: StoredType, +{ + fn clone(&self) -> Self { + Self::import(self.path(), self.version(), self.compressed()).unwrap() + } +} + +fn file_set_len(file: &mut File, len: u64) -> io::Result<()> { + file.set_len(len)?; + file.seek(SeekFrom::End(0))?; + Ok(()) +} diff --git a/crates/brk_vec/src/traits/any.rs b/crates/brk_vec/src/traits/any.rs index 1a7ccd843..a2c2dbc5c 100644 --- a/crates/brk_vec/src/traits/any.rs +++ b/crates/brk_vec/src/traits/any.rs @@ -1,61 +1,247 @@ -use std::{io, path::PathBuf}; +// use std::{io, path::PathBuf}; -use crate::{Result, StorableVec}; +// use crate::{Result}; + +use std::{ + fs::{File, OpenOptions}, + io::{self, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use arc_swap::{ArcSwap, Guard}; +use axum::{ + Json, + 
response::{IntoResponse, Response}, +}; +use memmap2::Mmap; + +use crate::{Error, Result, Value, Version}; use super::{StoredIndex, StoredType}; -pub trait AnyStorableVec: Send + Sync { - fn file_name(&self) -> String; - fn index_type_to_string(&self) -> &str; - fn len(&self) -> usize; - fn is_empty(&self) -> bool; - fn collect_range_values( - &self, - from: Option, - to: Option, - ) -> Result>; - fn flush(&mut self) -> io::Result<()>; - fn path_vec(&self) -> PathBuf; -} - -impl AnyStorableVec for StorableVec +pub trait AnyVec: Send + Sync where - I: StoredIndex, + I: StoredIndex + Sized, T: StoredType, + Self: Sized, { - fn file_name(&self) -> String { - self.file_name() + const SIZE_OF_T: usize = size_of::(); + + fn open_file(&self) -> io::Result { + Self::open_file_(&self.path_vec()) + } + fn open_file_(path: &Path) -> io::Result { + OpenOptions::new() + .read(true) + .create(true) + .truncate(false) + .append(true) + .open(path) } - fn index_type_to_string(&self) -> &str { - self.index_type_to_string() + fn file_set_len(&mut self, len: u64) -> Result<()> { + let mut file = self.open_file()?; + file.set_len(len)?; + file.seek(SeekFrom::End(0))?; + self.update_mmap(file) } + fn file_write_all(&mut self, buf: &[u8]) -> Result<()> { + let mut file = self.open_file()?; + file.write_all(buf)?; + self.update_mmap(file) + } + + #[inline] + fn reset(&mut self) -> Result<()> { + self.file_write_all(&[])?; + Ok(()) + } + + fn mmap(&self) -> &ArcSwap; + + #[inline] + fn new_guard(&self) -> Guard> { + self.mmap().load() + } + fn guard(&self) -> &Option>>; + fn mut_guard(&mut self) -> &mut Option>>; + #[inline] + fn guard_to_value(guard: &Guard>, index: usize) -> T { + let index = index * Self::SIZE_OF_T; + let slice = &guard[index..(index + Self::SIZE_OF_T)]; + + let v = T::try_ref_from_bytes(slice).unwrap(); + + v.clone() + } + + fn new_mmap(file: File) -> Result> { + Ok(Arc::new(unsafe { Mmap::map(&file)? 
})) + } + + fn update_mmap(&mut self, file: File) -> Result<()> { + file.sync_all()?; + let mmap = Self::new_mmap(file)?; + self.mmap().store(mmap); + if self.guard().is_some() { + let guard = self.new_guard(); + self.mut_guard().replace(guard); + } else { + unreachable!() + } + Ok(()) + } + + #[inline] + fn get(&mut self, index: I) -> Result>> { + self.get_(index.to_usize()?) + } + fn get_(&mut self, index: usize) -> Result>>; + fn get_last(&mut self) -> Result>> { + let len = self.len(); + if len == 0 { + return Ok(None); + } + self.get_(len - 1) + } + + #[inline] + fn stored_len(&self) -> usize { + if let Some(guard) = self.guard() { + guard.len() / Self::SIZE_OF_T + } else { + self.new_guard().len() / Self::SIZE_OF_T + } + } + + fn pushed(&self) -> &[T]; + #[inline] + fn pushed_len(&self) -> usize { + self.pushed().len() + } + fn mut_pushed(&mut self) -> &mut Vec; + #[inline] + fn push(&mut self, value: T) { + self.mut_pushed().push(value) + } + + #[inline] + fn is_pushed_empty(&self) -> bool { + self.pushed_len() == 0 + } + + #[inline] + fn index_to_pushed_index(&self, index: usize) -> Result> { + let stored_len = self.stored_len(); + + if index >= stored_len { + let index = index - stored_len; + if index >= self.pushed_len() { + Err(Error::IndexTooHigh) + } else { + Ok(Some(index)) + } + } else { + Err(Error::IndexTooLow) + } + } + + #[inline] fn len(&self) -> usize { - self.len() + self.stored_len() + self.pushed_len() } + #[inline] + fn has(&self, index: I) -> Result { + Ok(self.has_(index.to_usize()?)) + } + #[inline] + fn has_(&self, index: usize) -> bool { + index < self.len() + } + + #[inline] fn is_empty(&self) -> bool { - self.is_empty() + self.len() == 0 } - fn flush(&mut self) -> io::Result<()> { - self.flush() + #[inline] + fn index_type_to_string(&self) -> &str { + I::to_string() } - fn collect_range_values( - &self, - from: Option, - to: Option, - ) -> Result> { - Ok(self - .collect_range(from, to)? 
- .into_iter() - .map(|v| serde_json::to_value(v).unwrap()) - .collect::>()) + #[inline] + fn iter(&mut self, f: F) -> Result<()> + where + F: FnMut((I, T, &mut Self)) -> Result<()>, + { + self.iter_from(I::default(), f) } + fn iter_from(&mut self, index: I, f: F) -> Result<()> + where + F: FnMut((I, T, &mut Self)) -> Result<()>; + + fn fix_i64(i: i64, len: usize, from: bool) -> usize { + if i >= 0 { + let v = i as usize; + if v < len { + v + } else if from { + len - 1 + } else { + len + } + } else { + let v = len as i64 + i; + if v < 0 { 0 } else { v as usize } + } + } + + fn flush(&mut self) -> Result<()>; + + fn truncate_if_needed(&mut self, index: I) -> Result<()>; + + fn collect_range(&self, from: Option, to: Option) -> Result>>; + + fn collect_range_response(&self, from: Option, to: Option) -> Result { + Ok(self.collect_range(from, to)?.into_response()) + } + + fn path(&self) -> &Path; + + #[inline] fn path_vec(&self) -> PathBuf { - self.base().path_vec() + Self::path_vec_(self.path()) + } + #[inline] + fn path_vec_(path: &Path) -> PathBuf { + path.join("vec") + } + + #[inline] + fn path_version_(path: &Path) -> PathBuf { + path.join("version") + } + + fn file_name(&self) -> String { + self.path() + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_owned() + } + + fn version(&self) -> Version; + + fn any(&self) -> &Self { + self + } + + fn mut_any(&mut self) -> &mut Self { + self } } diff --git a/crates/brk_vec/src/traits/mod.rs b/crates/brk_vec/src/traits/mod.rs index c12f4f424..44d292241 100644 --- a/crates/brk_vec/src/traits/mod.rs +++ b/crates/brk_vec/src/traits/mod.rs @@ -1,5 +1,4 @@ mod any; -// mod bytes; mod stored_index; mod stored_type; diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs new file mode 100644 index 000000000..6297bfdfe --- /dev/null +++ b/crates/brk_vec/src/variants/compressed.rs @@ -0,0 +1,117 @@ +use std::{ + fs, + path::Path, + sync::{Arc, OnceLock}, +}; + +use 
arc_swap::{ArcSwap, Guard}; +use axum::Json; +use memmap2::Mmap; + +use crate::{ + AnyVec, CompressedPagesMetadata, Error, RawVec, Result, StoredIndex, StoredType, Value, Version, +}; + +#[derive(Debug)] +pub struct CompressedVec { + inner: RawVec, + decoded_page: Option<(usize, Box<[T]>)>, + pages_meta: CompressedPagesMetadata, + decoded_pages: Option>>>, + // pages: Option>>>, + // page: Option<(usize, Values)>, + // length: Length +} + +impl CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + /// Same as import but will reset the folder under certain errors, so be careful ! + pub fn forced_import(path: &Path, version: Version) -> Result { + let res = Self::import(path, version); + match res { + Err(Error::WrongEndian) + | Err(Error::DifferentVersion { .. }) + | Err(Error::DifferentCompressionMode) => { + fs::remove_dir_all(path)?; + Self::import(path, version) + } + _ => res, + } + } + + pub fn import(path: &Path, version: Version) -> Result { + Ok(Self { + inner: RawVec::import(path, version)?, + decoded_page: None, + decoded_pages: None, + pages_meta: CompressedPagesMetadata::read(path)?, + }) + } +} + +impl AnyVec for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn get_(&mut self, index: usize) -> Result>> { + self.inner.get_(index) + } + + fn iter_from(&mut self, _index: I, _f: F) -> Result<()> + where + F: FnMut((I, T, &mut Self)) -> Result<()>, + { + todo!() + // self.inner.iter_from(index, f) + } + + fn collect_range(&self, from: Option, to: Option) -> Result>> { + self.inner.collect_range(from, to) + } + + fn flush(&mut self) -> Result<()> { + self.inner.flush() + } + + fn truncate_if_needed(&mut self, index: I) -> Result<()> { + self.inner.truncate_if_needed(index) + } + + #[inline] + fn mmap(&self) -> &ArcSwap { + self.inner.mmap() + } + + #[inline] + fn guard(&self) -> &Option>> { + self.inner.guard() + } + #[inline] + fn mut_guard(&mut self) -> &mut Option>> { + self.inner.mut_guard() + } + + #[inline] + fn 
pushed(&self) -> &[T] { + self.inner.pushed() + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + self.inner.mut_pushed() + } + + #[inline] + fn path(&self) -> &Path { + self.inner.path() + } + + #[inline] + fn version(&self) -> Version { + self.inner.version() + } +} diff --git a/crates/brk_vec/src/variants/mod.rs b/crates/brk_vec/src/variants/mod.rs new file mode 100644 index 000000000..325b8f96c --- /dev/null +++ b/crates/brk_vec/src/variants/mod.rs @@ -0,0 +1,5 @@ +mod compressed; +mod raw; + +pub use compressed::*; +pub use raw::*; diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs new file mode 100644 index 000000000..c1cc77a04 --- /dev/null +++ b/crates/brk_vec/src/variants/raw.rs @@ -0,0 +1,214 @@ +use std::{ + fs, + marker::PhantomData, + mem, + path::{Path, PathBuf}, + sync::Arc, +}; + +use arc_swap::{ArcSwap, Guard}; +use axum::Json; +use memmap2::Mmap; +use rayon::prelude::*; + +use crate::{AnyVec, Error, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version}; + +#[derive(Debug)] +pub struct RawVec { + version: Version, + pathbuf: PathBuf, + // Consider Arc>> for dataraces when reorg ? + mmap: Arc>, + guard: Option>>, + pushed: Vec, + phantom: PhantomData, +} + +impl RawVec +where + I: StoredIndex, + T: StoredType, +{ + /// Same as import but will reset the folder under certain errors, so be careful ! + pub fn forced_import(path: &Path, version: Version) -> Result { + let res = Self::import(path, version); + match res { + Err(Error::WrongEndian) | Err(Error::DifferentVersion { .. 
}) => { + fs::remove_dir_all(path)?; + Self::import(path, version) + } + _ => res, + } + } + + pub fn import(path: &Path, version: Version) -> Result { + fs::create_dir_all(path)?; + + let version_path = Self::path_version_(path); + version.validate(version_path.as_ref())?; + version.write(version_path.as_ref())?; + + let file = Self::open_file_(Self::path_vec_(path).as_path())?; + let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?)); + let guard = Some(mmap.load()); + + Ok(Self { + mmap, + guard, + version, + pathbuf: path.to_owned(), + pushed: vec![], + phantom: PhantomData, + }) + } +} + +impl AnyVec for RawVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn get_(&mut self, index: usize) -> Result>> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed().get(index).map(|v| Value::Ref(v))); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + let v = if let Some(guard) = self.guard.as_ref() { + Self::guard_to_value(guard, index) + } else { + Self::guard_to_value(&self.new_guard(), index) + }; + + Ok(Some(Value::Owned(v))) + } + + fn iter_from(&mut self, index: I, mut f: F) -> Result<()> + where + F: FnMut((I, T, &mut Self)) -> Result<()>, + { + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let guard = self.mmap.load(); + + let start = index.to_usize()? * Self::SIZE_OF_T; + + guard[start..] 
+ .chunks(Self::SIZE_OF_T) + .enumerate() + .try_for_each(|(i, chunk)| { + let v = T::try_read_from_bytes(chunk).unwrap(); + f((I::from(start / Self::SIZE_OF_T + i), v, self)) + })?; + + Ok(()) + } + + fn flush(&mut self) -> Result<()> { + let pushed_len = self.pushed_len(); + + if pushed_len == 0 { + return Ok(()); + } + + let bytes = { + let pushed = &mut self.pushed; + + let mut bytes: Vec = vec![0; pushed.len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + mem::take(pushed) + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); + + bytes + }; + + self.file_write_all(&bytes)?; + + Ok(()) + } + + fn truncate_if_needed(&mut self, index: I) -> Result<()> { + let index = index.to_usize()?; + + if index >= self.stored_len() { + return Ok(()); + } + + if index == 0 { + self.reset()?; + return Ok(()); + } + + let len = index * Self::SIZE_OF_T; + + self.file_set_len(len as u64)?; + + Ok(()) + } + + fn collect_range(&self, from: Option, to: Option) -> Result>> { + let guard = self.mmap.load(); + + let len = guard.len() / Self::SIZE_OF_T; + + if len == 0 { + return Ok(Json(vec![])); + } + + let from = from.map_or(0, |i| Self::fix_i64(i, len, true)) * Self::SIZE_OF_T; + let to = to.map_or(len, |i| Self::fix_i64(i, len, false)) * Self::SIZE_OF_T; + + Ok(Json( + guard[from..to] + .chunks(Self::SIZE_OF_T) + .map(|chunk| T::try_read_from_bytes(chunk).unwrap()) + .collect::>(), + )) + } + + #[inline] + fn mmap(&self) -> &ArcSwap { + &self.mmap + } + + #[inline] + fn guard(&self) -> &Option>> { + &self.guard + } + #[inline] + fn mut_guard(&mut self) -> &mut Option>> { + &mut self.guard + } + + #[inline] + fn pushed(&self) -> &[T] { + self.pushed.as_slice() + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + &mut self.pushed + } + + #[inline] + fn path(&self) -> &Path { + self.pathbuf.as_path() + } + + #[inline] + fn version(&self) -> Version { + self.version + } +}