diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs index 5e393c309..d09790366 100644 --- a/crates/brk_cli/src/run.rs +++ b/crates/brk_cli/src/run.rs @@ -272,7 +272,7 @@ impl RunConfig { } fn read(path: &Path) -> Self { - fs::read_to_string(path).map_or(RunConfig::default(), |contents| { + fs::read_to_string(path).map_or_else(RunConfig::default, |contents| { toml::from_str(&contents).unwrap_or_default() }) } diff --git a/crates/brk_computer/src/storage/vecs/base.rs b/crates/brk_computer/src/storage/vecs/base.rs index 896afeb3f..61b34ace0 100644 --- a/crates/brk_computer/src/storage/vecs/base.rs +++ b/crates/brk_computer/src/storage/vecs/base.rs @@ -10,12 +10,20 @@ use std::{ use brk_core::CheckedSub; use brk_exit::Exit; use brk_vec::{ - AnyStoredVec, Compressed, Error, MAX_CACHE_SIZE, Result, StoredIndex, StoredType, StoredVec, + Compressed, DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, StoredVec, Value, Version, }; +const ONE_KIB: usize = 1024; +const ONE_MIB: usize = ONE_KIB * ONE_KIB; +const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; + #[derive(Debug)] -pub struct ComputedVec { +pub struct ComputedVec +where + I: StoredIndex, + T: StoredType, +{ computed_version: Option, vec: StoredVec, } @@ -71,7 +79,7 @@ where } } - pub fn safe_flush(&mut self, exit: &Exit) -> io::Result<()> { + pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> { if exit.triggered() { return Ok(()); } @@ -97,21 +105,21 @@ where &mut self.vec } - pub fn any_vec(&self) -> &dyn AnyStoredVec { + pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec { &self.vec } - pub fn mut_any_vec(&mut self) -> &mut dyn AnyStoredVec { + pub fn mut_any_vec(&mut self) -> &mut dyn brk_vec::AnyStoredVec { &mut self.vec } - pub fn get(&mut self, index: I) -> Result> { + pub fn get(&mut self, index: I) -> Result>> { self.vec.get(index) } - pub fn collect_range(&self, from: Option, to: Option) -> Result> { - self.vec.collect_range(from, to) - } + // pub fn collect_range(&self, from: Option, to: Option) -> Result> { + // self.vec.collect_range(from, to) + // } #[inline] fn path_computed_version(&self) -> PathBuf { @@ -137,14 +145,14 @@ where where A: StoredIndex, B: StoredType, - F: FnMut((A, B, &mut Self, &mut StoredVec)) -> (I, T), + F: FnMut((A, B, &mut Self, &mut dyn DynamicVec)) -> (I, T), { self.validate_computed_version_or_reset_file( Version::ZERO + self.version() + other.version(), )?; let index = max_from.min(A::from(self.len())); - other.iter_from_cloned(index, |(a, b, other)| { + other.iter_from(index, |(a, b, other)| { let (i, v) = t((a, b, self, other)); self.forced_push_at(i, v, exit) })?; @@ -166,9 +174,12 @@ where Version::ZERO + self.version() + other.version(), )?; - let index = max_from.min(self.vec.get_last()?.cloned().unwrap_or_default()); + let index = max_from.min( + self.vec + .get_last()? + .map_or_else(T::default, |v| v.into_inner()), + ); other.iter_from(index, |(v, i, ..)| { - let i = *i; if self.get(i).unwrap().is_none_or(|old_v| *old_v > v) { self.forced_push_at(i, v, exit) } else { @@ -260,7 +271,7 @@ where first_indexes.iter_from(index, |(i, first_index, ..)| { let last_index = last_indexes.get(i)?.unwrap(); let count = (*last_index + 1_usize) - .checked_sub(*first_index) + .checked_sub(first_index) .unwrap_or_default(); self.forced_push_at(i, count.into(), exit) })?; @@ -286,7 +297,11 @@ where let index = max_from.min(I::from(self.len())); self_to_other.iter_from(index, |(i, other, ..)| { - self.forced_push_at(i, T::from(other_to_self.get(*other)?.unwrap() == &i), exit) + self.forced_push_at( + i, + T::from(other_to_self.get(other)?.unwrap().into_inner() == i), + exit, + ) })?; Ok(self.safe_flush(exit)?) @@ -311,7 +326,7 @@ where let index = max_from.min(I::from(self.len())); first_indexes.iter_from(index, |(index, first_index, ..)| { let last_index = last_indexes.get(index)?.unwrap(); - let count = *last_index + 1_usize - *first_index; + let count = *last_index + 1_usize - first_index; self.forced_push_at(index, count.into(), exit) })?; diff --git a/crates/brk_computer/src/storage/vecs/blocks.rs b/crates/brk_computer/src/storage/vecs/blocks.rs index 2b086ad67..94c0d97d6 100644 --- a/crates/brk_computer/src/storage/vecs/blocks.rs +++ b/crates/brk_computer/src/storage/vecs/blocks.rs @@ -4,7 +4,7 @@ use brk_core::{CheckedSub, StoredU32, StoredU64, StoredUsize, Timestamp, Weight} use brk_exit::Exit; use brk_indexer::Indexer; use brk_parser::bitcoin; -use brk_vec::{AnyStoredVec, Compressed, Version}; +use brk_vec::{Compressed, DynamicVec, Version}; use super::{ Indexes, @@ -156,7 +156,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> { [ self.indexes_to_block_interval.any_vecs(), self.indexes_to_block_count.any_vecs(), diff --git a/crates/brk_computer/src/storage/vecs/grouped/builder.rs b/crates/brk_computer/src/storage/vecs/grouped/builder.rs index 43cf98a53..4f615b0d0 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/builder.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/builder.rs @@ -1,7 +1,10 @@ use std::path::Path; use brk_exit::Exit; -use brk_vec::{AnyStoredVec, Compressed, Result, StoredIndex, StoredType, StoredVec, Version}; +use brk_vec::{ + AnyStoredVec, Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec, + Version, +}; use crate::storage::vecs::base::ComputedVec; @@ -123,7 +126,7 @@ where let total_vec = self.total.as_mut().unwrap(); - source.iter_from(index, |(i, v)| { + source.iter_from(index, |(i, v, ..)| { let prev = i .to_usize() .unwrap() @@ -132,8 +135,7 @@ where total_vec .get(I::from(prev_i)) .unwrap() - .unwrap_or(&T::from(0_usize)) - .to_owned() + .map_or(T::from(0_usize), |v| v.into_inner()) }); let value = v.clone() + prev; total_vec.forced_push_at(i, value, exit)?; @@ -161,18 +163,18 @@ where { let index = self.starting_index(max_from); - first_indexes.iter_from(index, |(i, first_index)| { - let first_index = *first_index; + first_indexes.iter_from(index, |(i, first_index, ..)| { + let first_index = first_index; let last_index = *last_indexes.get(i).unwrap().unwrap(); if let Some(first) = self.first.as_mut() { - let v = source.get(first_index).unwrap().unwrap(); - first.forced_push_at(index, v.clone(), exit)?; + let v = source.get(first_index).unwrap().unwrap().into_inner(); + first.forced_push_at(index, v, exit)?; } if let Some(last) = self.last.as_mut() { - let v = source.get(last_index).unwrap().unwrap(); - last.forced_push_at(index, v.clone(), exit)?; + let v = source.get(last_index).unwrap().unwrap().into_inner(); + last.forced_push_at(index, v, exit)?; } let first_index = first_index.to_usize()?; @@ -249,7 +251,12 @@ where let prev = i.to_usize().unwrap().checked_sub(1).map_or( T::from(0_usize), |prev_i| { - total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned() + total_vec + .get(I::from(prev_i)) + .unwrap() + .unwrap() + .to_owned() + .into_inner() }, ); total_vec.forced_push_at(i, prev + sum, exit)?; @@ -302,8 +309,8 @@ where .unwrap() .get(first_index) .unwrap() - .cloned() - .unwrap(); + .unwrap() + .into_inner(); first.forced_push_at(index, v, exit)?; } diff --git a/crates/brk_computer/src/storage/vecs/indexes.rs b/crates/brk_computer/src/storage/vecs/indexes.rs index 5536289ee..7a068b97f 100644 --- a/crates/brk_computer/src/storage/vecs/indexes.rs +++ b/crates/brk_computer/src/storage/vecs/indexes.rs @@ -6,7 +6,7 @@ use brk_core::{ }; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStoredVec, Compressed, Version}; +use brk_vec::{Compressed, Version}; use super::ComputedVec; @@ -366,8 +366,7 @@ impl Vecs { let starting_dateindex = self .height_to_dateindex .get(starting_indexes.height.decremented().unwrap_or_default())? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.height_to_dateindex.compute_transform( starting_indexes.height, @@ -379,7 +378,7 @@ impl Vecs { let starting_dateindex = if let Some(dateindex) = self .height_to_dateindex .get(starting_indexes.height.decremented().unwrap_or_default())? - .copied() + .map(|v| v.into_inner()) { starting_dateindex.min(dateindex) } else { @@ -452,8 +451,7 @@ impl Vecs { let starting_weekindex = self .dateindex_to_weekindex .get(starting_dateindex)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.dateindex_to_weekindex.compute_transform( starting_dateindex, @@ -496,8 +494,7 @@ impl Vecs { let starting_monthindex = self .dateindex_to_monthindex .get(starting_dateindex)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.dateindex_to_monthindex.compute_transform( starting_dateindex, @@ -542,8 +539,7 @@ impl Vecs { let starting_quarterindex = self .monthindex_to_quarterindex .get(starting_monthindex)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.monthindex_to_quarterindex.compute_transform( starting_monthindex, @@ -588,8 +584,7 @@ impl Vecs { let starting_yearindex = self .monthindex_to_yearindex .get(starting_monthindex)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.monthindex_to_yearindex.compute_transform( starting_monthindex, @@ -634,8 +629,7 @@ impl Vecs { let starting_decadeindex = self .yearindex_to_decadeindex .get(starting_yearindex)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.yearindex_to_decadeindex.compute_transform( starting_yearindex, @@ -678,8 +672,7 @@ impl Vecs { let starting_difficultyepoch = self .height_to_difficultyepoch .get(starting_indexes.height)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.height_to_difficultyepoch.compute_transform( starting_indexes.height, @@ -727,8 +720,7 @@ impl Vecs { let starting_halvingepoch = self .height_to_halvingepoch .get(starting_indexes.height)? - .copied() - .unwrap_or_default(); + .map_or_else(Default::default, |v| v.into_inner()); self.height_to_halvingepoch.compute_transform( starting_indexes.height, @@ -784,7 +776,7 @@ impl Vecs { }) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> { vec![ self.dateindex_to_date.any_vec(), self.dateindex_to_dateindex.any_vec(), diff --git a/crates/brk_computer/src/storage/vecs/marketprice.rs b/crates/brk_computer/src/storage/vecs/marketprice.rs index fa058a976..959b5ae6f 100644 --- a/crates/brk_computer/src/storage/vecs/marketprice.rs +++ b/crates/brk_computer/src/storage/vecs/marketprice.rs @@ -7,7 +7,7 @@ use brk_core::{ use brk_exit::Exit; use brk_fetcher::Fetcher; use brk_indexer::Indexer; -use brk_vec::{AnyStoredVec, Compressed, Version}; +use brk_vec::{Compressed, DynamicVec, Version}; use super::{ ComputedVec, Indexes, @@ -765,7 +765,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> { vec![ vec![ self.dateindex_to_close_in_cents.any_vec(), diff --git a/crates/brk_computer/src/storage/vecs/transactions.rs b/crates/brk_computer/src/storage/vecs/transactions.rs index d820862e8..1acd8762a 100644 --- a/crates/brk_computer/src/storage/vecs/transactions.rs +++ b/crates/brk_computer/src/storage/vecs/transactions.rs @@ -3,7 +3,7 @@ use std::{fs, path::Path}; use brk_core::{Sats, StoredU64, Txindex, Txinindex, Txoutindex}; use brk_exit::Exit; use brk_indexer::Indexer; -use brk_vec::{AnyStoredVec, Compressed, Version}; +use brk_vec::{Compressed, DynamicVec, Version}; use super::{ ComputedVec, Indexes, @@ -203,7 +203,7 @@ impl Vecs { Ok(()) } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> { [ vec![ self.txindex_to_is_coinbase.any_vec(), diff --git a/crates/brk_indexer/examples/main.rs b/crates/brk_indexer/examples/main.rs index 864845afa..849ccd39f 100644 --- a/crates/brk_indexer/examples/main.rs +++ b/crates/brk_indexer/examples/main.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::{path::Path, time::Instant}; use brk_core::default_bitcoin_path; use brk_exit::Exit; @@ -8,6 +8,8 @@ use brk_parser::{Parser, rpc}; fn main() -> color_eyre::Result<()> { color_eyre::install()?; + let i = Instant::now(); + brk_logger::init(Some(Path::new(".log"))); let bitcoin_dir = default_bitcoin_path(); @@ -22,12 +24,14 @@ fn main() -> color_eyre::Result<()> { let outputs = Path::new("../../_outputs"); - let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), true, true)?; + let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, false)?; indexer.import_stores()?; indexer.import_vecs()?; indexer.index(&parser, rpc, &exit)?; + dbg!(i.elapsed()); + Ok(()) } diff --git a/crates/brk_indexer/src/vecs/base.rs b/crates/brk_indexer/src/vecs/base.rs index e3f902ea9..daf87217f 100644 --- a/crates/brk_indexer/src/vecs/base.rs +++ b/crates/brk_indexer/src/vecs/base.rs @@ -6,14 +6,18 @@ use std::{ }; use brk_vec::{ - AnyVec, Compressed, Error, MAX_CACHE_SIZE, MAX_PAGE_SIZE, Result, StoredIndex, StoredType, - StoredVec, Value, Version, + Compressed, DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, StoredVec, Value, + Version, }; use super::Height; -#[derive(Debug)] -pub struct IndexedVec { +#[derive(Debug, Clone)] +pub struct IndexedVec +where + I: StoredIndex, + T: StoredType, +{ height: Option, vec: StoredVec, } @@ -23,11 +27,6 @@ where I: StoredIndex, T: StoredType, { - pub const SIZE_OF_T: usize = size_of::(); - pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; - pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; - pub fn forced_import( path: &Path, version: Version, @@ -35,7 +34,7 @@ where ) -> brk_vec::Result { let mut vec = StoredVec::forced_import(path, version, compressed)?; - vec.enable_large_cache_if_possible(); + vec.enable_large_cache_if_needed(); Ok(Self { height: Height::try_from(Self::path_height_(path).as_path()).ok(), @@ -47,51 +46,48 @@ where pub fn get(&self, index: I) -> Result>> { self.get_(index.to_usize()?) } + #[inline] fn get_(&self, index: usize) -> Result>> { - match self.vec.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.vec.pushed().get(index).map(|v| Value::Ref(v))); - } - } - Err(Error::IndexTooHigh) => return Ok(None), - Err(Error::IndexTooLow) => {} - Err(error) => return Err(error), - } + self.vec.get_(index) + // match self.vec.index_to_pushed_index(index) { + // Ok(index) => { + // if let Some(index) = index { + // return Ok(self.vec.pushed().get(index).map(|v| Value::Ref(v))); + // } + // } + // Err(Error::IndexTooHigh) => return Ok(None), + // Err(Error::IndexTooLow) => {} + // Err(error) => return Err(error), + // } - let large_cache_len = self.vec.large_cache_len(); - if large_cache_len != 0 { - let page_index = Self::index_to_page_index(index); - let last_index = self.vec.stored_len() - 1; - let max_page_index = Self::index_to_page_index(last_index); - let min_page_index = (max_page_index + 1) - large_cache_len; + // let large_cache_len = self.vec.large_cache_len(); + // if large_cache_len != 0 { + // let page_index = Self::index_to_page_index(index); + // let last_index = self.vec.stored_len() - 1; + // let max_page_index = Self::index_to_page_index(last_index); + // let min_page_index = (max_page_index + 1) - large_cache_len; - if page_index >= min_page_index { - self.vec - .pages() - .unwrap() - .get(page_index - min_page_index) - .ok_or(Error::MmapsVecIsTooSmall)? - .get_or_init(|| self.vec.decode_page(page_index).unwrap()) - .get(index) - } - } + // if page_index >= min_page_index { + // self.vec + // .pages() + // .unwrap() + // .get(page_index - min_page_index) + // .ok_or(Error::MmapsVecIsTooSmall)? + // .get_or_init(|| self.vec.decode_page(page_index).unwrap()) + // .get(index) + // } + // } - Ok(self.vec.read_(index)?.map(|v| Value::Owned(v))) + // Ok(self.vec.read_(index)?.map(|v| Value::Owned(v))) } pub fn iter_from(&mut self, index: I, f: F) -> Result<()> where - F: FnMut((I, &T)) -> Result<()>, + F: FnMut((I, T, &mut dyn DynamicVec)) -> Result<()>, { self.vec.iter_from(index, f) } - #[inline(always)] - fn index_to_page_index(index: usize) -> usize { - index / Self::PER_PAGE - } - #[inline] pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { match self.vec.len().cmp(&index.to_usize()?) { @@ -119,7 +115,7 @@ where Ok(()) } - pub fn flush(&mut self, height: Height) -> io::Result<()> { + pub fn flush(&mut self, height: Height) -> Result<()> { height.write(&self.path_height())?; self.vec.flush() } @@ -132,7 +128,7 @@ where &mut self.vec } - pub fn any_vec(&self) -> &dyn AnyVec { + pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec { &self.vec } @@ -160,22 +156,9 @@ where } } -impl Clone for IndexedVec -where - I: StoredIndex, - T: StoredType, -{ - fn clone(&self) -> Self { - Self { - height: self.height, - vec: self.vec.clone(), - } - } -} - pub trait AnyIndexedVec: Send + Sync { fn height(&self) -> brk_core::Result; - fn flush(&mut self, height: Height) -> io::Result<()>; + fn flush(&mut self, height: Height) -> Result<()>; } impl AnyIndexedVec for IndexedVec @@ -187,7 +170,7 @@ where self.height() } - fn flush(&mut self, height: Height) -> io::Result<()> { + fn flush(&mut self, height: Height) -> Result<()> { self.flush(height) } } diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 28e3c04b3..1eac952ec 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -1,4 +1,4 @@ -use std::{fs, io, path::Path}; +use std::{fs, path::Path}; use brk_core::{ Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height, @@ -7,7 +7,7 @@ use brk_core::{ P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats, StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight, }; -use brk_vec::{AnyVec, Compressed, Version}; +use brk_vec::{AnyStoredVec, Compressed, Result, Version}; use rayon::prelude::*; use crate::Indexes; @@ -595,7 +595,7 @@ impl Vecs { } } - pub fn flush(&mut self, height: Height) -> io::Result<()> { + pub fn flush(&mut self, height: Height) -> Result<()> { self.as_mut_any_vecs() .into_par_iter() .try_for_each(|vec| vec.flush(height)) @@ -609,7 +609,7 @@ impl Vecs { .unwrap() } - pub fn as_any_vecs(&self) -> Vec<&dyn AnyVec> { + pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> { vec![ self.addressindex_to_addresstype.any_vec(), self.addressindex_to_addresstypeindex.any_vec(), diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs index 0b26bb7ba..6062b07c8 100644 --- a/crates/brk_vec/examples/main.rs +++ b/crates/brk_vec/examples/main.rs @@ -1,12 +1,16 @@ use std::{fs, path::Path}; -use brk_vec::{AnyVec, RawVec, Version}; +use brk_vec::{Compressed, DynamicVec, GenericVec, StoredVec, Version}; fn main() -> Result<(), Box> { let _ = fs::remove_dir_all("./vec"); + let version = Version::ZERO; + let compressed = Compressed::NO; + { - let mut vec: RawVec = RawVec::forced_import(Path::new("./vec"), Version::ZERO)?; + let mut vec: StoredVec = + StoredVec::forced_import(Path::new("./vec"), version, compressed)?; (0..21_u32).for_each(|v| { vec.push(v); @@ -19,7 +23,8 @@ fn main() -> Result<(), Box> { } { - let mut vec: RawVec = RawVec::forced_import(Path::new("./vec"), Version::ZERO)?; + let mut vec: StoredVec = + StoredVec::forced_import(Path::new("./vec"), version, compressed)?; dbg!(vec.get(0)?); dbg!(vec.get(0)?); @@ -40,7 +45,8 @@ fn main() -> Result<(), Box> { } { - let mut vec: RawVec = RawVec::forced_import(Path::new("./vec"), Version::ZERO)?; + let mut vec: StoredVec = + StoredVec::forced_import(Path::new("./vec"), version, compressed)?; // vec.enable_large_cache_if_possible(); diff --git a/crates/brk_vec/src/enums/mod.rs b/crates/brk_vec/src/enums/mod.rs index acf1f9efb..9198ca8e0 100644 --- a/crates/brk_vec/src/enums/mod.rs +++ b/crates/brk_vec/src/enums/mod.rs @@ -1,7 +1,5 @@ mod error; mod value; -mod values; pub use error::*; pub use value::*; -pub use values::*; diff --git a/crates/brk_vec/src/enums/values.rs b/crates/brk_vec/src/enums/values.rs deleted file mode 100644 index 51194e180..000000000 --- a/crates/brk_vec/src/enums/values.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::ops::Range; - -use memmap2::Mmap; -use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; - -const ONE_KIB: usize = 1024; -pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; -const ONE_MIB: usize = ONE_KIB * ONE_KIB; -pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; - -use super::Result; - -#[derive(Debug)] -pub enum Values { - Owned(Box<[T]>), - Ref(Box), -} - -impl Values { - const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - const SIZE_OF_T: usize = size_of::(); - - pub fn get(&self, index: usize) -> Result> - where - T: TryFromBytes + IntoBytes + Immutable + KnownLayout, - { - let index = Self::index_to_decoded_index(index); - - Ok(match self { - Self::Owned(a) => a.get(index), - Self::Ref(m) => { - let range = Self::index_to_byte_range(index); - let source = &m[range]; - Some(T::try_ref_from_bytes(source)?) - } - }) - } - - pub fn as_arr(&self) -> &[T] { - match self { - Self::Owned(a) => a, - Self::Ref(_) => unreachable!(), - } - } - - pub fn as_mmap(&self) -> &Mmap { - match self { - Self::Owned(_) => unreachable!(), - Self::Ref(m) => m, - } - } - - #[inline] - fn index_to_byte_range(index: usize) -> Range { - let index = Self::index_to_byte_index(index) as usize; - index..(index + Self::SIZE_OF_T) - } - - #[inline] - fn index_to_byte_index(index: usize) -> u64 { - (index * Self::SIZE_OF_T) as u64 - } - - #[inline(always)] - fn index_to_decoded_index(index: usize) -> usize { - index % Self::PER_PAGE - } -} - -impl From> for Values { - fn from(value: Box<[T]>) -> Self { - Self::Owned(value) - } -} - -impl From for Values { - fn from(value: Mmap) -> Self { - Self::Ref(Box::new(value)) - } -} - -impl Default for Values { - fn default() -> Self { - Self::Owned(vec![].into_boxed_slice()) - } -} diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index d41cc87d8..13711a91a 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -8,18 +8,25 @@ mod structs; mod traits; mod variants; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; use arc_swap::{ArcSwap, Guard}; -use axum::Json; +use axum::{Json, response::Response}; pub use enums::*; use memmap2::Mmap; pub use structs::*; pub use traits::*; -pub use variants::*; +use variants::*; -#[derive(Debug)] -pub enum StoredVec { +#[derive(Debug, Clone)] +pub enum StoredVec +where + I: StoredIndex, + T: StoredType, +{ Raw(RawVec), Compressed(CompressedVec), } @@ -38,30 +45,68 @@ where Ok(Self::Raw(RawVec::forced_import(path, version)?)) } } + + pub fn enable_large_cache_if_needed(&mut self) { + match self { + StoredVec::Compressed(v) => v.enable_large_cache(), + Self::Raw(_) => {} + } + } } -impl AnyVec for StoredVec +impl DynamicVec for StoredVec where I: StoredIndex, T: StoredType, { + type I = I; + type T = T; + #[inline] - fn get_(&mut self, index: usize) -> Result>> { + fn get_(&self, index: usize) -> Result>> { match self { StoredVec::Raw(v) => v.get_(index), StoredVec::Compressed(v) => v.get_(index), } } - fn iter_from(&mut self, index: I, mut f: F) -> Result<()> + #[inline] + fn stored_len(&self) -> usize { + match self { + StoredVec::Raw(v) => v.stored_len(), + StoredVec::Compressed(v) => v.stored_len(), + } + } + + #[inline] + fn pushed(&self) -> &[T] { + match self { + StoredVec::Raw(v) => v.pushed(), + StoredVec::Compressed(v) => v.pushed(), + } + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + match self { + StoredVec::Raw(v) => v.mut_pushed(), + StoredVec::Compressed(v) => v.mut_pushed(), + } + } +} + +impl GenericVec for StoredVec +where + I: StoredIndex + Send + Sync, + T: StoredType + Send + Sync, +{ + fn iter_from(&mut self, index: I, f: F) -> Result<()> where - F: FnMut((I, T, &mut Self)) -> Result<()>, + F: FnMut((I, T, &mut dyn DynamicVec)) -> Result<()>, { - todo!(); - // match self { - // StoredVec::Raw(v) => v.iter_from(index, |(i, t, inner)| f((i, t, self))), - // StoredVec::Compressed(v) => v.iter_from(index, |(i, t, inner)| f((i, t, self))), - // } + match self { + StoredVec::Raw(v) => v.iter_from(index, f), + StoredVec::Compressed(v) => v.iter_from(index, f), + } } fn collect_range(&self, from: Option, to: Option) -> Result>> { @@ -108,21 +153,6 @@ where } } - #[inline] - fn pushed(&self) -> &[T] { - match self { - StoredVec::Raw(v) => v.pushed(), - StoredVec::Compressed(v) => v.pushed(), - } - } - #[inline] - fn mut_pushed(&mut self) -> &mut Vec { - match self { - StoredVec::Raw(v) => v.mut_pushed(), - StoredVec::Compressed(v) => v.mut_pushed(), - } - } - #[inline] fn path(&self) -> &Path { match self { @@ -139,3 +169,51 @@ where } } } + +pub trait AnyStoredVec: Send + Sync { + fn file_name(&self) -> String; + fn index_type_to_string(&self) -> &str; + fn len(&self) -> usize; + fn is_empty(&self) -> bool; + fn flush(&mut self) -> Result<()>; + fn collect_range_response(&self, from: Option, to: Option) -> Result; + fn path_vec(&self) -> PathBuf; +} + +impl AnyStoredVec for StoredVec +where + I: StoredIndex, + T: StoredType, +{ + #[inline] + fn len(&self) -> usize { + DynamicVec::len(self) + } + + #[inline] + fn is_empty(&self) -> bool { + DynamicVec::is_empty(self) + } + + #[inline] + fn index_type_to_string(&self) -> &str { + GenericVec::index_type_to_string(self) + } + + fn flush(&mut self) -> Result<()> { + GenericVec::flush(self) + } + + fn collect_range_response(&self, from: Option, to: Option) -> Result { + GenericVec::collect_range_response(self, from, to) + } + + #[inline] + fn path_vec(&self) -> PathBuf { + GenericVec::path_vec(self) + } + + fn file_name(&self) -> String { + GenericVec::file_name(self) + } +} diff --git a/crates/brk_vec/src/lib2.rs b/crates/brk_vec/src/lib2.rs index 85e493a3e..6304d9804 100644 --- a/crates/brk_vec/src/lib2.rs +++ b/crates/brk_vec/src/lib2.rs @@ -27,6 +27,8 @@ pub use enums::*; pub use structs::*; pub use traits::*; +use crate::{Compressed, CompressedPagesMetadata, Length, StoredIndex, StoredType, Version}; + const ONE_KIB: usize = 1024; pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; const ONE_MIB: usize = ONE_KIB * ONE_KIB; diff --git a/crates/brk_vec/src/traits/dynamic.rs b/crates/brk_vec/src/traits/dynamic.rs new file mode 100644 index 000000000..5359b83db --- /dev/null +++ b/crates/brk_vec/src/traits/dynamic.rs @@ -0,0 +1,43 @@ +use crate::{Result, Value}; + +use super::{StoredIndex, StoredType}; + +pub trait DynamicVec: Send + Sync { + type I: StoredIndex; + type T: StoredType; + + #[inline] + fn get(&self, index: Self::I) -> Result>> { + self.get_(index.to_usize()?) + } + fn get_(&self, index: usize) -> Result>>; + fn get_last(&self) -> Result>> { + let len = self.len(); + if len == 0 { + return Ok(None); + } + self.get_(len - 1) + } + + #[inline] + fn len(&self) -> usize { + self.stored_len() + self.pushed_len() + } + #[inline] + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn stored_len(&self) -> usize; + + fn pushed(&self) -> &[Self::T]; + #[inline] + fn pushed_len(&self) -> usize { + self.pushed().len() + } + fn mut_pushed(&mut self) -> &mut Vec; + #[inline] + fn push(&mut self, value: Self::T) { + self.mut_pushed().push(value) + } +} diff --git a/crates/brk_vec/src/traits/any.rs b/crates/brk_vec/src/traits/generic.rs similarity index 65% rename from crates/brk_vec/src/traits/any.rs rename to crates/brk_vec/src/traits/generic.rs index a2c2dbc5c..5402da62f 100644 --- a/crates/brk_vec/src/traits/any.rs +++ b/crates/brk_vec/src/traits/generic.rs @@ -1,7 +1,3 @@ -// use std::{io, path::PathBuf}; - -// use crate::{Result}; - use std::{ fs::{File, OpenOptions}, io::{self, Seek, SeekFrom, Write}, @@ -16,17 +12,16 @@ use axum::{ }; use memmap2::Mmap; -use crate::{Error, Result, Value, Version}; +use crate::{Error, Result, Version}; -use super::{StoredIndex, StoredType}; +use super::{DynamicVec, StoredIndex, StoredType}; -pub trait AnyVec: Send + Sync +pub trait GenericVec: DynamicVec where - I: StoredIndex + Sized, + I: StoredIndex, T: StoredType, - Self: Sized, { - const SIZE_OF_T: usize = size_of::(); + const SIZE_OF_T: usize = size_of::(); fn open_file(&self) -> io::Result { Self::open_file_(&self.path_vec()) @@ -67,15 +62,6 @@ where } fn guard(&self) -> &Option>>; fn mut_guard(&mut self) -> &mut Option>>; - #[inline] - fn guard_to_value(guard: &Guard>, index: usize) -> T { - let index = index * Self::SIZE_OF_T; - let slice = &guard[index..(index + Self::SIZE_OF_T)]; - - let v = T::try_ref_from_bytes(slice).unwrap(); - - v.clone() - } fn new_mmap(file: File) -> Result> { Ok(Arc::new(unsafe { Mmap::map(&file)? })) @@ -94,39 +80,6 @@ where Ok(()) } - #[inline] - fn get(&mut self, index: I) -> Result>> { - self.get_(index.to_usize()?) - } - fn get_(&mut self, index: usize) -> Result>>; - fn get_last(&mut self) -> Result>> { - let len = self.len(); - if len == 0 { - return Ok(None); - } - self.get_(len - 1) - } - - #[inline] - fn stored_len(&self) -> usize { - if let Some(guard) = self.guard() { - guard.len() / Self::SIZE_OF_T - } else { - self.new_guard().len() / Self::SIZE_OF_T - } - } - - fn pushed(&self) -> &[T]; - #[inline] - fn pushed_len(&self) -> usize { - self.pushed().len() - } - fn mut_pushed(&mut self) -> &mut Vec; - #[inline] - fn push(&mut self, value: T) { - self.mut_pushed().push(value) - } - #[inline] fn is_pushed_empty(&self) -> bool { self.pushed_len() == 0 @@ -149,12 +102,7 @@ where } #[inline] - fn len(&self) -> usize { - self.stored_len() + self.pushed_len() - } - - #[inline] - fn has(&self, index: I) -> Result { + fn has(&self, index: Self::I) -> Result { Ok(self.has_(index.to_usize()?)) } #[inline] @@ -162,27 +110,34 @@ where index < self.len() } - #[inline] - fn is_empty(&self) -> bool { - self.len() == 0 - } - #[inline] fn index_type_to_string(&self) -> &str { - I::to_string() + Self::I::to_string() } #[inline] fn iter(&mut self, f: F) -> Result<()> where - F: FnMut((I, T, &mut Self)) -> Result<()>, + F: FnMut( + ( + Self::I, + Self::T, + &mut dyn DynamicVec, + ), + ) -> Result<()>, { - self.iter_from(I::default(), f) + self.iter_from(Self::I::default(), f) } - fn iter_from(&mut self, index: I, f: F) -> Result<()> + fn iter_from(&mut self, index: Self::I, f: F) -> Result<()> where - F: FnMut((I, T, &mut Self)) -> Result<()>; + F: FnMut( + ( + Self::I, + Self::T, + &mut dyn DynamicVec, + ), + ) -> Result<()>; fn fix_i64(i: i64, len: usize, from: bool) -> usize { if i >= 0 { @@ -202,9 +157,9 @@ where fn flush(&mut self) -> Result<()>; - fn truncate_if_needed(&mut self, index: I) -> Result<()>; + fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>; - fn collect_range(&self, from: Option, to: Option) -> Result>>; + fn collect_range(&self, from: Option, to: Option) -> Result>>; fn collect_range_response(&self, from: Option, to: Option) -> Result { Ok(self.collect_range(from, to)?.into_response()) @@ -236,12 +191,4 @@ where } fn version(&self) -> Version; - - fn any(&self) -> &Self { - self - } - - fn mut_any(&mut self) -> &mut Self { - self - } } diff --git a/crates/brk_vec/src/traits/mod.rs b/crates/brk_vec/src/traits/mod.rs index 44d292241..7f210ba99 100644 --- a/crates/brk_vec/src/traits/mod.rs +++ b/crates/brk_vec/src/traits/mod.rs @@ -1,7 +1,9 @@ -mod any; +mod dynamic; +mod generic; mod stored_index; mod stored_type; -pub use any::*; +pub use dynamic::*; +pub use generic::*; pub use stored_index::*; pub use stored_type::*; diff --git a/crates/brk_vec/src/traits/stored_type.rs b/crates/brk_vec/src/traits/stored_type.rs index 2ae350140..bb9dd1495 100644 --- a/crates/brk_vec/src/traits/stored_type.rs +++ b/crates/brk_vec/src/traits/stored_type.rs @@ -5,10 +5,28 @@ use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; pub trait StoredType where - Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize, + Self: Sized + + Debug + + Clone + + TryFromBytes + + IntoBytes + + Immutable + + KnownLayout + + Send + + Sync + + Serialize, { } impl StoredType for T where - T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize + T: Sized + + Debug + + Clone + + TryFromBytes + + IntoBytes + + Immutable + + KnownLayout + + Send + + Sync + + Serialize { } diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index 6297bfdfe..a2edb5708 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -1,5 +1,5 @@ use std::{ - fs, + fs, mem, path::Path, sync::{Arc, OnceLock}, }; @@ -7,17 +7,25 @@ use std::{ use arc_swap::{ArcSwap, Guard}; use axum::Json; use memmap2::Mmap; +use rayon::prelude::*; +use zstd::DEFAULT_COMPRESSION_LEVEL; use crate::{ - AnyVec, CompressedPagesMetadata, Error, RawVec, Result, StoredIndex, StoredType, Value, Version, + CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result, + StoredIndex, StoredType, UnsafeSlice, Value, Version, }; +const ONE_KIB: usize = 1024; +const ONE_MIB: usize = ONE_KIB * ONE_KIB; +pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; +pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; + #[derive(Debug)] pub struct CompressedVec { inner: RawVec, - decoded_page: Option<(usize, Box<[T]>)>, - pages_meta: CompressedPagesMetadata, - decoded_pages: Option>>>, + decoded_page: Option<(usize, Vec)>, + decoded_pages: Option>>>, + pages_meta: Arc>, // pages: Option>>>, // page: Option<(usize, Values)>, // length: Length @@ -28,6 +36,10 @@ where I: StoredIndex, T: StoredType, { + pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; + pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; + pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; + /// Same as import but will reset the folder under certain errors, so be careful ! pub fn forced_import(path: &Path, version: Version) -> Result { let res = Self::import(path, version); @@ -47,39 +59,284 @@ where inner: RawVec::import(path, version)?, decoded_page: None, decoded_pages: None, - pages_meta: CompressedPagesMetadata::read(path)?, + pages_meta: Arc::new(ArcSwap::new(Arc::new(CompressedPagesMetadata::read(path)?))), }) } + + pub fn decode_page(&self, page_index: usize) -> Result> { + Self::decode_page_( + self.stored_len(), + page_index, + self.guard().as_ref().unwrap(), + &self.pages_meta.load(), + ) + } + + fn decode_page_( + stored_len: usize, + page_index: usize, + mmap: &Mmap, + compressed_pages_meta: &CompressedPagesMetadata, + ) -> Result> { + if Self::page_index_to_index(page_index) >= stored_len { + return Err(Error::IndexTooHigh); + } else if compressed_pages_meta.len() <= page_index { + return Err(Error::ExpectVecToHaveIndex); + } + + let page = compressed_pages_meta.get(page_index).unwrap(); + let len = page.bytes_len as usize; + let offset = page.start as usize; + + Ok(zstd::decode_all(&mmap[offset..offset + len]) + .inspect_err(|_| { + dbg!((len, offset, page_index, &mmap[..], &mmap.len())); + })? + .chunks(Self::SIZE_OF_T) + .map(|slice| T::try_read_from_bytes(slice).unwrap()) + .collect::>()) + } + + fn compress_page(chunk: &[T]) -> Box<[u8]> { + if chunk.len() > Self::PER_PAGE { + panic!(); + } + + let mut bytes: Vec = vec![0; chunk.len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + + chunk + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); + + zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL) + .unwrap() + .into_boxed_slice() + } + + pub fn enable_large_cache(&mut self) { + self.decoded_pages.replace(vec![]); + self.reset_large_cache(); + } + + pub fn disable_large_cache(&mut self) { + self.decoded_pages.take(); + } + + fn reset_large_cache(&mut self) { + let stored_len = self.stored_len(); + + if let Some(pages) = self.decoded_pages.as_mut() { + pages.par_iter_mut().for_each(|lock| { + lock.take(); + }); + + let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize; + let len = Self::CACHE_LENGTH.min(len); + + if pages.len() != len { + pages.resize_with(len, Default::default); + } + } + } + + pub fn large_cache_len(&self) -> usize { + self.decoded_pages.as_ref().map_or(0, |v| v.len()) + } + + fn reset_small_cache(&mut self) { + self.decoded_page.take(); + } + + fn reset_caches(&mut self) { + self.reset_small_cache(); + self.reset_large_cache(); + } + + #[inline(always)] + fn index_to_page_index(index: usize) -> usize { + index / Self::PER_PAGE + } + + #[inline(always)] + fn page_index_to_index(page_index: usize) -> usize { + page_index * Self::PER_PAGE + } } -impl AnyVec for CompressedVec +impl DynamicVec for CompressedVec where I: StoredIndex, T: StoredType, { + type I = I; + type T = T; + #[inline] - fn get_(&mut self, index: usize) -> Result>> { - self.inner.get_(index) + fn get_(&self, index: usize) -> Result>> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed().get(index).map(|v| Value::Ref(v))); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + let page_index = Self::index_to_page_index(index); + + // if self.page().is_none_or(|b| b.0 != page_index) { + // let values = self.decode_page(page_index)?; + // self.mut_page().replace((page_index, values)); + // } + + // self.page().unwrap().1.get(index) + // + todo!(); + + // let v = self.inner.guard().as_ref().map_or_else( + // || Self::guard_to_value(&self.new_guard(), index), + // |guard| Self::guard_to_value(guard, index), + // ); + + // Ok(Some(Value::Owned(v))) } + fn stored_len(&self) -> usize { + todo!() + } + + #[inline] + fn pushed(&self) -> &[T] { + self.inner.pushed() + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + self.inner.mut_pushed() + } +} + +impl GenericVec for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ fn iter_from(&mut self, _index: I, _f: F) -> Result<()> where - F: FnMut((I, T, &mut Self)) -> Result<()>, + F: FnMut((I, T, &mut dyn DynamicVec)) -> Result<()>, { todo!() - // self.inner.iter_from(index, f) } fn collect_range(&self, from: Option, to: Option) -> Result>> { - self.inner.collect_range(from, to) + todo!() } fn flush(&mut self) -> Result<()> { - self.inner.flush() + let pushed_len = self.pushed_len(); + + if pushed_len == 0 { + return Ok(()); + } + + let stored_len = self.stored_len(); + + let mut pages_meta = (**self.pages_meta.load()).clone(); + + let mut starting_page_index = pages_meta.len(); + let mut values = vec![]; + let mut truncate_at = None; + + if self.stored_len() % Self::PER_PAGE != 0 { + if pages_meta.is_empty() { + unreachable!() + } + + let last_page_index = pages_meta.len() - 1; + + values = if let Some(values) = self + .decoded_pages + .as_mut() + .and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take())) + { + values + } else if self + .decoded_page + .as_ref() + .is_some_and(|(page_index, _)| *page_index == last_page_index) + { + self.decoded_page.take().unwrap().1 + } else { + Self::decode_page_( + stored_len, + last_page_index, + self.guard().as_ref().unwrap(), + &pages_meta, + ) + .inspect_err(|_| { + dbg!(last_page_index, &pages_meta); + }) + .unwrap() + }; + + truncate_at.replace(pages_meta.pop().unwrap().start); + starting_page_index = last_page_index; + } + + let compressed = values + .into_par_iter() + .chain(mem::take(self.mut_pushed()).into_par_iter()) + .chunks(Self::PER_PAGE) + .map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len())) + .collect::>(); + + compressed + .iter() + .enumerate() + .for_each(|(i, (compressed_bytes, values_len))| { + let page_index = starting_page_index + i; + + let start = if page_index != 0 { + let prev = pages_meta.get(page_index - 1).unwrap(); + prev.start + prev.bytes_len as u64 + } else { + 0 + }; + + let bytes_len = compressed_bytes.len() as u32; + let values_len = *values_len as u32; + + let page = CompressedPageMetadata::new(start, bytes_len, values_len); + + pages_meta.push(page_index, page); + }); + + pages_meta.write()?; + + let buf = compressed + .into_iter() + .flat_map(|(v, _)| v) + .collect::>(); + + if let Some(truncate_at) = truncate_at { + self.file_set_len(truncate_at)?; + } + + self.file_write_all(&buf)?; + + self.pages_meta.store(Arc::new(pages_meta)); + + self.reset_caches(); + + Ok(()) } fn truncate_if_needed(&mut self, index: I) -> Result<()> { - self.inner.truncate_if_needed(index) + todo!() } #[inline] @@ -96,15 +353,6 @@ where self.inner.mut_guard() } - #[inline] - fn pushed(&self) -> &[T] { - self.inner.pushed() - } - #[inline] - fn mut_pushed(&mut self) -> &mut Vec { - self.inner.mut_pushed() - } - #[inline] fn path(&self) -> &Path { self.inner.path() @@ -115,3 +363,18 @@ where self.inner.version() } } + +impl Clone for CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + decoded_page: None, + decoded_pages: None, + pages_meta: self.pages_meta.clone(), + } + } +} diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index c1cc77a04..687f40e9a 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -11,7 +11,9 @@ use axum::Json; use memmap2::Mmap; use rayon::prelude::*; -use crate::{AnyVec, Error, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version}; +use crate::{ + DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version, +}; #[derive(Debug)] pub struct RawVec { @@ -63,13 +65,16 @@ where } } -impl AnyVec for RawVec +impl DynamicVec for RawVec where I: StoredIndex, T: StoredType, { + type I = I; + type T = T; + #[inline] - fn get_(&mut self, index: usize) -> Result>> { + fn get_(&self, index: usize) -> Result>> { match self.index_to_pushed_index(index) { Ok(index) => { if let Some(index) = index { @@ -81,18 +86,42 @@ where Err(error) => return Err(error), } - let v = if let Some(guard) = self.guard.as_ref() { - Self::guard_to_value(guard, index) - } else { - Self::guard_to_value(&self.new_guard(), index) - }; + let guard = self.guard.as_ref().unwrap(); + let index = index * Self::SIZE_OF_T; + let slice = &guard[index..(index + Self::SIZE_OF_T)]; + + let v = Self::T::try_read_from_bytes(slice)?; Ok(Some(Value::Owned(v))) } + #[inline] + fn stored_len(&self) -> usize { + if let Some(guard) = self.guard() { + guard.len() / Self::SIZE_OF_T + } else { + self.new_guard().len() / Self::SIZE_OF_T + } + } + + #[inline] + fn pushed(&self) -> &[T] { + self.pushed.as_slice() + } + #[inline] + fn mut_pushed(&mut self) -> &mut Vec { + &mut self.pushed + } +} + +impl GenericVec for RawVec +where + I: StoredIndex + Send + Sync, + T: StoredType + Send + Sync, +{ fn iter_from(&mut self, index: I, mut f: F) -> Result<()> where - F: FnMut((I, T, &mut Self)) -> Result<()>, + F: FnMut((I, T, &mut dyn DynamicVec)) -> Result<()>, { if !self.is_pushed_empty() { return Err(Error::UnsupportedUnflushedState); @@ -107,7 +136,7 @@ where .enumerate() .try_for_each(|(i, chunk)| { let v = T::try_read_from_bytes(chunk).unwrap(); - f((I::from(i), v, self)) + f((I::from(i), v, self as &mut dyn DynamicVec)) })?; Ok(()) @@ -168,8 +197,8 @@ where return Ok(Json(vec![])); } - let from = from.map_or(0, |i| Self::fix_i64(i, len, true)) * Self::SIZE_OF_T; - let to = to.map_or(len, |i| Self::fix_i64(i, len, false)) * Self::SIZE_OF_T; + let from = from.map_or(0, |i| Self::fix_i64(i, len, true)); + let to = to.map_or(len, |i| Self::fix_i64(i, len, false)); Ok(Json( guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T] @@ -193,15 +222,6 @@ where &mut self.guard } - #[inline] - fn pushed(&self) -> &[T] { - self.pushed.as_slice() - } - #[inline] - fn mut_pushed(&mut self) -> &mut Vec { - &mut self.pushed - } - #[inline] fn path(&self) -> &Path { self.pathbuf.as_path() @@ -212,3 +232,21 @@ where self.version } } + +impl Clone for RawVec +where + I: StoredIndex, + T: StoredType, +{ + fn clone(&self) -> Self { + Self { + version: self.version, + pathbuf: self.pathbuf.clone(), + // Consider Arc>> for dataraces when reorg ? + mmap: self.mmap.clone(), + guard: None, + pushed: vec![], + phantom: PhantomData, + } + } +}