diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs index ab10f1b99..83589fb53 100644 --- a/crates/brk_cli/src/run.rs +++ b/crates/brk_cli/src/run.rs @@ -65,7 +65,7 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> { let starting_indexes = indexer.index(&parser, rpc, &exit)?; - computer.compute(&mut indexer, starting_indexes, &exit)?; + // computer.compute(&mut indexer, starting_indexes, &exit)?; if let Some(delay) = config.delay() { sleep(Duration::from_secs(delay)) diff --git a/crates/brk_computer/src/storage/vecs/base.rs b/crates/brk_computer/src/storage/vecs/base.rs index 933debe7c..d1b0158b8 100644 --- a/crates/brk_computer/src/storage/vecs/base.rs +++ b/crates/brk_computer/src/storage/vecs/base.rs @@ -15,7 +15,7 @@ use brk_vec::{ const ONE_KIB: usize = 1024; const ONE_MIB: usize = ONE_KIB * ONE_KIB; -const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; +const MAX_CACHE_SIZE: usize = 210 * ONE_MIB; #[derive(Debug)] pub struct ComputedVec @@ -24,7 +24,7 @@ where T: StoredType, { computed_version: Option, - vec: StoredVec, + inner: StoredVec, } impl ComputedVec @@ -43,7 +43,7 @@ where Ok(Self { computed_version: None, - vec, + inner: vec, }) } @@ -52,7 +52,7 @@ where return Ok(()); } exit.block(); - self.vec.truncate_if_needed(index)?; + self.inner.truncate_if_needed(index)?; exit.release(); Ok(()) } @@ -67,11 +67,11 @@ where if ord == Ordering::Greater { self.safe_truncate_if_needed(index, exit)?; } - self.vec.push(value); + self.inner.push(value); } } - if self.vec.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { + if self.inner.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { self.safe_flush(exit) } else { Ok(()) @@ -83,52 +83,56 @@ where return Ok(()); } exit.block(); - self.vec.flush()?; + self.inner.flush()?; exit.release(); Ok(()) } fn version(&self) -> Version { - self.vec.version() + self.inner.version() } pub fn len(&self) -> usize { - self.vec.len() + self.inner.len() } pub fn vec(&self) -> &StoredVec { - &self.vec + &self.inner } pub fn mut_vec(&mut self) -> &mut StoredVec { - &mut self.vec + &mut self.inner } pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec { - &self.vec + &self.inner } pub fn mut_any_vec(&mut self) -> &mut dyn brk_vec::AnyStoredVec { - &mut self.vec + &mut self.inner } - pub fn get(&mut self, index: I) -> Result>> { - self.vec.get(index) + pub fn cached_get(&mut self, index: I) -> Result>> { + self.inner.cached_get(index) } - pub fn collect_range(&self, from: Option, to: Option) -> Result> { - self.vec.collect_range(from, to) + pub fn collect_inclusive_range(&self, from: I, to: I) -> Result> { + self.inner.collect_inclusive_range(from, to) + } + + pub fn path(&self) -> &Path { + self.inner.path() } #[inline] fn path_computed_version(&self) -> PathBuf { - self.vec.path().join("computed_version") + self.inner.path().join("computed_version") } fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> { let path = self.path_computed_version(); if version.validate(path.as_ref()).is_err() { - self.vec.reset()?; + self.inner.reset()?; } version.write(path.as_ref())?; Ok(()) @@ -174,12 +178,12 @@ where )?; let index = max_from.min( - self.vec - .get_last()? + self.inner + .cached_get_last()? .map_or_else(T::default, |v| v.into_inner()), ); other.iter_from(index, |(v, i, ..)| { - if self.get(i).unwrap().is_none_or(|old_v| *old_v > v) { + if self.cached_get(i).unwrap().is_none_or(|old_v| *old_v > v) { self.forced_push_at(i, v, exit) } else { Ok(()) @@ -207,7 +211,7 @@ where let index = max_from.min(T::from(self.len())); first_indexes.iter_from(index, |(value, first_index, ..)| { let first_index = (first_index).to_usize()?; - let last_index = (last_indexes.get(value)?.unwrap()).to_usize()?; + let last_index = (last_indexes.cached_get(value)?.unwrap()).to_usize()?; (first_index..last_index) .try_for_each(|index| self.forced_push_at(I::from(index), value, exit)) })?; @@ -268,7 +272,7 @@ where let index = max_from.min(I::from(self.len())); first_indexes.iter_from(index, |(i, first_index, ..)| { - let last_index = last_indexes.get(i)?.unwrap(); + let last_index = last_indexes.cached_get(i)?.unwrap(); let count = (*last_index + 1_usize) .checked_sub(first_index) .unwrap_or_default(); @@ -298,7 +302,7 @@ where self_to_other.iter_from(index, |(i, other, ..)| { self.forced_push_at( i, - T::from(other_to_self.get(other)?.unwrap().into_inner() == i), + T::from(other_to_self.cached_get(other)?.unwrap().into_inner() == i), exit, ) })?; @@ -324,7 +328,7 @@ where let index = max_from.min(I::from(self.len())); first_indexes.iter_from(index, |(index, first_index, ..)| { - let last_index = last_indexes.get(index)?.unwrap(); + let last_index = last_indexes.cached_get(index)?.unwrap(); let count = *last_index + 1_usize - first_index; self.forced_push_at(index, count.into(), exit) })?; @@ -341,7 +345,7 @@ where fn clone(&self) -> Self { Self { computed_version: self.computed_version, - vec: self.vec.clone(), + inner: self.inner.clone(), } } } diff --git a/crates/brk_computer/src/storage/vecs/grouped/builder.rs b/crates/brk_computer/src/storage/vecs/grouped/builder.rs index cb951347c..5e2f82402 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/builder.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/builder.rs @@ -2,8 +2,7 @@ use std::path::Path; use brk_exit::Exit; use brk_vec::{ - AnyStoredVec, Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec, - Version, + Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec, Version, }; use crate::storage::vecs::base::ComputedVec; @@ -133,7 +132,7 @@ where .checked_sub(1) .map_or(T::from(0_usize), |prev_i| { total_vec - .get(I::from(prev_i)) + .cached_get(I::from(prev_i)) .unwrap() .map_or(T::from(0_usize), |v| v.into_inner()) }); @@ -164,21 +163,24 @@ where let index = self.starting_index(max_from); first_indexes.iter_from(index, |(i, first_index, ..)| { - let last_index = *last_indexes.get(i).unwrap().unwrap(); + let last_index = *last_indexes.cached_get(i)?.unwrap(); if let Some(first) = self.first.as_mut() { - let v = source.get(first_index).unwrap().unwrap().into_inner(); + let v = source.cached_get(first_index)?.unwrap().into_inner(); first.forced_push_at(index, v, exit)?; } if let Some(last) = self.last.as_mut() { - let v = source.get(last_index).unwrap().unwrap().into_inner(); + let v = source + .cached_get(last_index) + .inspect_err(|_| { + dbg!(last.path(), last_index); + })? + .unwrap() + .into_inner(); last.forced_push_at(index, v, exit)?; } - let first_index = first_index.to_usize()?; - let last_index = last_index.to_usize()?; - let needs_sum_or_total = self.sum.is_some() || self.total.is_some(); let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some(); let needs_sorted = self.max.is_some() @@ -191,8 +193,7 @@ where let needs_values = needs_sorted || needs_average_sum_or_total; if needs_values { - let mut values = - source.collect_range(Some(first_index as i64), Some(last_index as i64))?; + let mut values = source.collect_inclusive_range(first_index, last_index)?; if needs_sorted { values.sort_unstable(); @@ -251,7 +252,7 @@ where T::from(0_usize), |prev_i| { total_vec - .get(I::from(prev_i)) + .cached_get(I::from(prev_i)) .unwrap() .unwrap() .to_owned() @@ -298,14 +299,14 @@ where let index = self.starting_index(max_from); first_indexes.iter_from(index, |(i, first_index, ..)| { - let last_index = *last_indexes.get(i).unwrap().unwrap(); + let last_index = *last_indexes.cached_get(i).unwrap().unwrap(); if let Some(first) = self.first.as_mut() { let v = source .first .as_mut() .unwrap() - .get(first_index) + .cached_get(first_index) .unwrap() .unwrap() .into_inner(); @@ -317,16 +318,13 @@ where .last .as_mut() .unwrap() - .get(last_index) + .cached_get(last_index) .unwrap() .unwrap() .into_inner(); last.forced_push_at(index, v, exit)?; } - let first_index = Some(first_index.to_usize()? as i64); - let last_index = Some(last_index.to_usize()? as i64); - let needs_sum_or_total = self.sum.is_some() || self.total.is_some(); let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some(); let needs_sorted = self.max.is_some() || self.min.is_some(); @@ -339,7 +337,7 @@ where .max .as_ref() .unwrap() - .collect_range(first_index, last_index)?; + .collect_inclusive_range(first_index, last_index)?; values.sort_unstable(); max.forced_push_at(i, values.last().unwrap().clone(), exit)?; } @@ -349,7 +347,7 @@ where .min .as_ref() .unwrap() - .collect_range(first_index, last_index)?; + .collect_inclusive_range(first_index, last_index)?; values.sort_unstable(); min.forced_push_at(i, values.first().unwrap().clone(), exit)?; } @@ -361,7 +359,7 @@ where .average .as_ref() .unwrap() - .collect_range(first_index, last_index)?; + .collect_inclusive_range(first_index, last_index)?; let len = values.len() as f64; let total = values .into_iter() @@ -378,7 +376,7 @@ where .sum .as_ref() .unwrap() - .collect_range(first_index, last_index)?; + .collect_inclusive_range(first_index, last_index)?; let sum = values.into_iter().fold(T::from(0), |a, b| a + b); if let Some(sum_vec) = self.sum.as_mut() { @@ -390,7 +388,7 @@ where T::from(0_usize), |prev_i| { total_vec - .get(I::from(prev_i)) + .cached_get(I::from(prev_i)) .unwrap() .unwrap() .into_inner() @@ -438,8 +436,8 @@ where )) } - pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> { - let mut v: Vec<&dyn AnyStoredVec> = vec![]; + pub fn any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> { + let mut v: Vec<&dyn brk_vec::AnyStoredVec> = vec![]; if let Some(first) = self.first.as_ref() { v.push(first.any_vec()); diff --git a/crates/brk_computer/src/storage/vecs/indexes.rs b/crates/brk_computer/src/storage/vecs/indexes.rs index 7a068b97f..38c4984a4 100644 --- a/crates/brk_computer/src/storage/vecs/indexes.rs +++ b/crates/brk_computer/src/storage/vecs/indexes.rs @@ -345,7 +345,7 @@ impl Vecs { |(h, d, s, ..)| { let d = h .decremented() - .and_then(|h| s.get(h).ok()) + .and_then(|h| s.cached_get(h).ok()) .flatten() .map_or(d, |prev_d| { let prev_d = *prev_d; @@ -365,7 +365,7 @@ impl Vecs { let starting_dateindex = self .height_to_dateindex - .get(starting_indexes.height.decremented().unwrap_or_default())? + .cached_get(starting_indexes.height.decremented().unwrap_or_default())? .map_or_else(Default::default, |v| v.into_inner()); self.height_to_dateindex.compute_transform( @@ -377,7 +377,7 @@ impl Vecs { let starting_dateindex = if let Some(dateindex) = self .height_to_dateindex - .get(starting_indexes.height.decremented().unwrap_or_default())? + .cached_get(starting_indexes.height.decremented().unwrap_or_default())? .map(|v| v.into_inner()) { starting_dateindex.min(dateindex) @@ -450,7 +450,7 @@ impl Vecs { let starting_weekindex = self .dateindex_to_weekindex - .get(starting_dateindex)? + .cached_get(starting_dateindex)? .map_or_else(Default::default, |v| v.into_inner()); self.dateindex_to_weekindex.compute_transform( @@ -485,7 +485,12 @@ impl Vecs { self.weekindex_to_timestamp.compute_transform( starting_weekindex, self.weekindex_to_first_dateindex.mut_vec(), - |(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()), + |(i, d, ..)| { + ( + i, + *self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(), + ) + }, exit, )?; @@ -493,7 +498,7 @@ impl Vecs { let starting_monthindex = self .dateindex_to_monthindex - .get(starting_dateindex)? + .cached_get(starting_dateindex)? .map_or_else(Default::default, |v| v.into_inner()); self.dateindex_to_monthindex.compute_transform( @@ -530,7 +535,12 @@ impl Vecs { self.monthindex_to_timestamp.compute_transform( starting_monthindex, self.monthindex_to_first_dateindex.mut_vec(), - |(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()), + |(i, d, ..)| { + ( + i, + *self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(), + ) + }, exit, )?; @@ -538,7 +548,7 @@ impl Vecs { let starting_quarterindex = self .monthindex_to_quarterindex - .get(starting_monthindex)? + .cached_get(starting_monthindex)? .map_or_else(Default::default, |v| v.into_inner()); self.monthindex_to_quarterindex.compute_transform( @@ -575,7 +585,12 @@ impl Vecs { self.quarterindex_to_timestamp.compute_transform( starting_quarterindex, self.quarterindex_to_first_monthindex.mut_vec(), - |(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()), + |(i, m, ..)| { + ( + i, + *self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(), + ) + }, exit, )?; @@ -583,7 +598,7 @@ impl Vecs { let starting_yearindex = self .monthindex_to_yearindex - .get(starting_monthindex)? + .cached_get(starting_monthindex)? .map_or_else(Default::default, |v| v.into_inner()); self.monthindex_to_yearindex.compute_transform( @@ -620,7 +635,12 @@ impl Vecs { self.yearindex_to_timestamp.compute_transform( starting_yearindex, self.yearindex_to_first_monthindex.mut_vec(), - |(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()), + |(i, m, ..)| { + ( + i, + *self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(), + ) + }, exit, )?; @@ -628,7 +648,7 @@ impl Vecs { let starting_decadeindex = self .yearindex_to_decadeindex - .get(starting_yearindex)? + .cached_get(starting_yearindex)? .map_or_else(Default::default, |v| v.into_inner()); self.yearindex_to_decadeindex.compute_transform( @@ -663,7 +683,12 @@ impl Vecs { self.decadeindex_to_timestamp.compute_transform( starting_decadeindex, self.decadeindex_to_first_yearindex.mut_vec(), - |(i, y, ..)| (i, *self.yearindex_to_timestamp.get(y).unwrap().unwrap()), + |(i, y, ..)| { + ( + i, + *self.yearindex_to_timestamp.cached_get(y).unwrap().unwrap(), + ) + }, exit, )?; @@ -671,7 +696,7 @@ impl Vecs { let starting_difficultyepoch = self .height_to_difficultyepoch - .get(starting_indexes.height)? + .cached_get(starting_indexes.height)? .map_or_else(Default::default, |v| v.into_inner()); self.height_to_difficultyepoch.compute_transform( @@ -709,7 +734,11 @@ impl Vecs { |(i, h, ..)| { ( i, - *indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(), + *indexer_vecs + .height_to_timestamp + .cached_get(h) + .unwrap() + .unwrap(), ) }, exit, @@ -719,7 +748,7 @@ impl Vecs { let starting_halvingepoch = self .height_to_halvingepoch - .get(starting_indexes.height)? + .cached_get(starting_indexes.height)? .map_or_else(Default::default, |v| v.into_inner()); self.height_to_halvingepoch.compute_transform( @@ -757,7 +786,7 @@ impl Vecs { // |(i, h, ..)| { // ( // i, - // *indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(), + // *indexer_vecs.height_to_timestamp.cached_get(h).unwrap().unwrap(), // ) // }, // exit, diff --git a/crates/brk_computer/src/storage/vecs/marketprice.rs b/crates/brk_computer/src/storage/vecs/marketprice.rs index c0aa20e33..2f849aa7c 100644 --- a/crates/brk_computer/src/storage/vecs/marketprice.rs +++ b/crates/brk_computer/src/storage/vecs/marketprice.rs @@ -243,8 +243,9 @@ impl Vecs { .get_height( h, t, - h.decremented() - .map(|prev_h| *height_to_timestamp.get(prev_h).unwrap().unwrap()), + h.decremented().map(|prev_h| { + *height_to_timestamp.cached_get(prev_h).unwrap().unwrap() + }), ) .unwrap(); (h, ohlc) @@ -470,7 +471,7 @@ impl Vecs { .first .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), high: *self @@ -479,7 +480,7 @@ impl Vecs { .max .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), low: *self @@ -488,7 +489,7 @@ impl Vecs { .min .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), close, @@ -516,7 +517,7 @@ impl Vecs { .first .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), high: *self @@ -525,7 +526,7 @@ impl Vecs { .max .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), low: *self @@ -534,7 +535,7 @@ impl Vecs { .min .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), close, @@ -562,7 +563,7 @@ impl Vecs { .first .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), high: *self @@ -571,7 +572,7 @@ impl Vecs { .max .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), low: *self @@ -580,7 +581,7 @@ impl Vecs { .min .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), close, @@ -608,7 +609,7 @@ impl Vecs { .first .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), high: *self @@ -617,7 +618,7 @@ impl Vecs { .max .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), low: *self @@ -626,7 +627,7 @@ impl Vecs { .min .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), close, @@ -654,7 +655,7 @@ impl Vecs { .first .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), high: *self @@ -663,7 +664,7 @@ impl Vecs { .max .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), low: *self @@ -672,7 +673,7 @@ impl Vecs { .min .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), close, @@ -704,7 +705,7 @@ impl Vecs { .first .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), high: *self @@ -713,7 +714,7 @@ impl Vecs { .max .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), low: *self @@ -722,7 +723,7 @@ impl Vecs { .min .as_mut() .unwrap() - .get(i) + .cached_get(i) .unwrap() .unwrap(), close, diff --git a/crates/brk_computer/src/storage/vecs/transactions.rs b/crates/brk_computer/src/storage/vecs/transactions.rs index 1acd8762a..c39937cba 100644 --- a/crates/brk_computer/src/storage/vecs/transactions.rs +++ b/crates/brk_computer/src/storage/vecs/transactions.rs @@ -182,8 +182,10 @@ impl Vecs { |(txinindex, txoutindex, slf, other)| { let value = if txoutindex == Txoutindex::COINBASE { Sats::ZERO - } else if let Ok(Some(value)) = - indexer_vecs.txoutindex_to_value.mut_vec().get(txoutindex) + } else if let Ok(Some(value)) = indexer_vecs + .txoutindex_to_value + .mut_vec() + .cached_get(txoutindex) { *value } else { diff --git a/crates/brk_indexer/examples/main.rs b/crates/brk_indexer/examples/main.rs index 849ccd39f..a835ffe49 100644 --- a/crates/brk_indexer/examples/main.rs +++ b/crates/brk_indexer/examples/main.rs @@ -24,7 +24,7 @@ fn main() -> color_eyre::Result<()> { let outputs = Path::new("../../_outputs"); - let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, false)?; + let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, true)?; indexer.import_stores()?; indexer.import_vecs()?; diff --git a/crates/brk_indexer/src/vecs/base.rs b/crates/brk_indexer/src/vecs/base.rs index d340f263c..4c597fbbd 100644 --- a/crates/brk_indexer/src/vecs/base.rs +++ b/crates/brk_indexer/src/vecs/base.rs @@ -18,7 +18,7 @@ where T: StoredType, { height: Option, - vec: StoredVec, + inner: StoredVec, } impl IndexedVec @@ -31,76 +31,46 @@ where version: Version, compressed: Compressed, ) -> brk_vec::Result { - let mut vec = StoredVec::forced_import(path, version, compressed)?; + let mut inner = StoredVec::forced_import(path, version, compressed)?; - vec.enable_large_cache_if_needed(); + inner.enable_large_cache_if_needed(); Ok(Self { height: Height::try_from(Self::path_height_(path).as_path()).ok(), - vec, + inner, }) } #[inline] pub fn get(&self, index: I) -> Result>> { - self.get_(index.to_usize()?) + self.inner.get(index) } #[inline] - fn get_(&self, index: usize) -> Result>> { - self.vec.get_(index) - // match self.vec.index_to_pushed_index(index) { - // Ok(index) => { - // if let Some(index) = index { - // return Ok(self.vec.pushed().get(index).map(|v| Value::Ref(v))); - // } - // } - // Err(Error::IndexTooHigh) => return Ok(None), - // Err(Error::IndexTooLow) => {} - // Err(error) => return Err(error), - // } - - // let large_cache_len = self.vec.large_cache_len(); - // if large_cache_len != 0 { - // let page_index = Self::index_to_page_index(index); - // let last_index = self.vec.stored_len() - 1; - // let max_page_index = Self::index_to_page_index(last_index); - // let min_page_index = (max_page_index + 1) - large_cache_len; - - // if page_index >= min_page_index { - // self.vec - // .pages() - // .unwrap() - // .get(page_index - min_page_index) - // .ok_or(Error::MmapsVecIsTooSmall)? - // .get_or_init(|| self.vec.decode_page(page_index).unwrap()) - // .get(index) - // } - // } - - // Ok(self.vec.read_(index)?.map(|v| Value::Owned(v))) + pub fn cached_get(&mut self, index: I) -> Result>> { + self.inner.cached_get(index) } pub fn iter_from(&mut self, index: I, f: F) -> Result<()> where F: FnMut((I, T, &mut dyn DynamicVec)) -> Result<()>, { - self.vec.iter_from(index, f) + self.inner.iter_from(index, f) } #[inline] pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { - match self.vec.len().cmp(&index.to_usize()?) { + match self.inner.len().cmp(&index.to_usize()?) { Ordering::Greater => { // dbg!(len, index, &self.pathbuf); // panic!(); Ok(()) } Ordering::Equal => { - self.vec.push(value); + self.inner.push(value); Ok(()) } Ordering::Less => { - dbg!(index, value, self.vec.len(), self.path_height()); + dbg!(index, value, self.inner.len(), self.path_height()); Err(Error::IndexTooHigh) } } @@ -110,45 +80,45 @@ where if self.height.is_none_or(|self_height| self_height != height) { height.write(&self.path_height())?; } - self.vec.truncate_if_needed(index)?; + self.inner.truncate_if_needed(index)?; Ok(()) } pub fn flush(&mut self, height: Height) -> Result<()> { height.write(&self.path_height())?; - self.vec.flush() + self.inner.flush() } pub fn vec(&self) -> &StoredVec { - &self.vec + &self.inner } pub fn mut_vec(&mut self) -> &mut StoredVec { - &mut self.vec + &mut self.inner } pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec { - &self.vec + &self.inner } pub fn len(&self) -> usize { - self.vec.len() + self.inner.len() } pub fn is_empty(&self) -> bool { - self.vec.is_empty() + self.inner.is_empty() } #[inline] pub fn hasnt(&self, index: I) -> Result { - self.vec.has(index).map(|b| !b) + self.inner.has(index).map(|b| !b) } pub fn height(&self) -> brk_core::Result { Height::try_from(self.path_height().as_path()) } fn path_height(&self) -> PathBuf { - Self::path_height_(self.vec.path()) + Self::path_height_(self.inner.path()) } fn path_height_(path: &Path) -> PathBuf { path.join("height") diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index 1eac952ec..ab6754c61 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -374,7 +374,6 @@ impl Vecs { pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> brk_vec::Result<()> { let saved_height = starting_indexes.height.decremented().unwrap_or_default(); - // Now we can cut everything that's out of date let &Indexes { addressindex, height, diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs index 6062b07c8..c183a4546 100644 --- a/crates/brk_vec/examples/main.rs +++ b/crates/brk_vec/examples/main.rs @@ -6,7 +6,7 @@ fn main() -> Result<(), Box> { let _ = fs::remove_dir_all("./vec"); let version = Version::ZERO; - let compressed = Compressed::NO; + let compressed = Compressed::YES; { let mut vec: StoredVec = @@ -48,7 +48,7 @@ fn main() -> Result<(), Box> { let mut vec: StoredVec = StoredVec::forced_import(Path::new("./vec"), version, compressed)?; - // vec.enable_large_cache_if_possible(); + vec.enable_large_cache_if_needed(); dbg!(vec.get(0)?); dbg!(vec.get(20)?); @@ -71,7 +71,7 @@ fn main() -> Result<(), Box> { Ok(()) })?; - dbg!(vec.collect_range(Some(-5), None)?); + dbg!(vec.collect_signed_range(Some(-5), None)?); } Ok(()) diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index 565930d57..f7ec58872 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -63,10 +63,40 @@ where type T = T; #[inline] - fn get_(&self, index: usize) -> Result>> { + fn get_stored_(&self, index: usize, guard: &Mmap) -> Result> { match self { - StoredVec::Raw(v) => v.get_(index), - StoredVec::Compressed(v) => v.get_(index), + StoredVec::Raw(v) => v.get_stored_(index, guard), + StoredVec::Compressed(v) => v.get_stored_(index, guard), + } + } + #[inline] + fn cached_get_stored_(&mut self, index: usize, guard: &Mmap) -> Result> { + match self { + StoredVec::Raw(v) => v.cached_get_stored_(index, guard), + StoredVec::Compressed(v) => v.cached_get_stored_(index, guard), + } + } + + #[inline] + fn mmap(&self) -> &ArcSwap { + match self { + StoredVec::Raw(v) => v.mmap(), + StoredVec::Compressed(v) => v.mmap(), + } + } + + #[inline] + fn guard(&self) -> &Option>> { + match self { + StoredVec::Raw(v) => v.guard(), + StoredVec::Compressed(v) => v.guard(), + } + } + #[inline] + fn mut_guard(&mut self) -> &mut Option>> { + match self { + StoredVec::Raw(v) => v.mut_guard(), + StoredVec::Compressed(v) => v.mut_guard(), } } @@ -96,8 +126,8 @@ where impl GenericVec for StoredVec where - I: StoredIndex + Send + Sync, - T: StoredType + Send + Sync, + I: StoredIndex, + T: StoredType, { fn iter_from(&mut self, index: I, f: F) -> Result<()> where @@ -109,7 +139,7 @@ where } } - fn collect_range(&self, from: Option, to: Option) -> Result> { + fn collect_range(&self, from: Option, to: Option) -> Result> { match self { StoredVec::Raw(v) => v.collect_range(from, to), StoredVec::Compressed(v) => v.collect_range(from, to), @@ -130,29 +160,6 @@ where } } - #[inline] - fn mmap(&self) -> &ArcSwap { - match self { - StoredVec::Raw(v) => v.mmap(), - StoredVec::Compressed(v) => v.mmap(), - } - } - - #[inline] - fn guard(&self) -> &Option>> { - match self { - StoredVec::Raw(v) => v.guard(), - StoredVec::Compressed(v) => v.guard(), - } - } - #[inline] - fn mut_guard(&mut self) -> &mut Option>> { - match self { - StoredVec::Raw(v) => v.mut_guard(), - StoredVec::Compressed(v) => v.mut_guard(), - } - } - #[inline] fn path(&self) -> &Path { match self { diff --git a/crates/brk_vec/src/structs/compressed_pages_meta.rs b/crates/brk_vec/src/structs/compressed_pages_meta.rs index 05ab3b38f..b57b8f069 100644 --- a/crates/brk_vec/src/structs/compressed_pages_meta.rs +++ b/crates/brk_vec/src/structs/compressed_pages_meta.rs @@ -22,8 +22,10 @@ impl CompressedPagesMetadata { const PAGE_SIZE: usize = size_of::(); pub fn read(path: &Path) -> Result { + let path = path.join("pages_meta"); + let slf = Self { - vec: fs::read(path) + vec: fs::read(&path) .unwrap_or_default() .chunks(Self::PAGE_SIZE) .map(|bytes| { diff --git a/crates/brk_vec/src/traits/dynamic.rs b/crates/brk_vec/src/traits/dynamic.rs index 5359b83db..853739c61 100644 --- a/crates/brk_vec/src/traits/dynamic.rs +++ b/crates/brk_vec/src/traits/dynamic.rs @@ -1,4 +1,9 @@ -use crate::{Result, Value}; +use std::sync::Arc; + +use arc_swap::{ArcSwap, Guard}; +use memmap2::Mmap; + +use crate::{Error, Result, Value}; use super::{StoredIndex, StoredType}; @@ -10,7 +15,28 @@ pub trait DynamicVec: Send + Sync { fn get(&self, index: Self::I) -> Result>> { self.get_(index.to_usize()?) } - fn get_(&self, index: usize) -> Result>>; + #[inline] + fn cached_get(&mut self, index: Self::I) -> Result>> { + self.get_(index.to_usize()?) + } + #[inline] + fn get_(&self, index: usize) -> Result>> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed().get(index).map(Value::Ref)); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + Ok(self + .get_stored_(index.to_usize()?, self.guard().as_ref().unwrap())? + .map(Value::Owned)) + } + fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result>; fn get_last(&self) -> Result>> { let len = self.len(); if len == 0 { @@ -18,6 +44,33 @@ pub trait DynamicVec: Send + Sync { } self.get_(len - 1) } + #[inline] + fn cached_get_(&mut self, index: usize) -> Result>> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed().get(index).map(Value::Ref)); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + let mmap = Arc::clone(self.guard().as_ref().unwrap()); + + Ok(self + .cached_get_stored_(index.to_usize()?, &mmap)? + .map(Value::Owned)) + } + fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result>; + fn cached_get_last(&mut self) -> Result>> { + let len = self.len(); + if len == 0 { + return Ok(None); + } + self.cached_get_(len - 1) + } #[inline] fn len(&self) -> usize { @@ -28,6 +81,15 @@ pub trait DynamicVec: Send + Sync { self.len() == 0 } + fn mmap(&self) -> &ArcSwap; + + #[inline] + fn new_guard(&self) -> Guard> { + self.mmap().load() + } + fn guard(&self) -> &Option>>; + fn mut_guard(&mut self) -> &mut Option>>; + fn stored_len(&self) -> usize; fn pushed(&self) -> &[Self::T]; @@ -40,4 +102,19 @@ pub trait DynamicVec: Send + Sync { fn push(&mut self, value: Self::T) { self.mut_pushed().push(value) } + #[inline] + fn index_to_pushed_index(&self, index: usize) -> Result> { + let stored_len = self.stored_len(); + + if index >= stored_len { + let index = index - stored_len; + if index >= self.pushed_len() { + Err(Error::IndexTooHigh) + } else { + Ok(Some(index)) + } + } else { + Err(Error::IndexTooLow) + } + } } diff --git a/crates/brk_vec/src/traits/generic.rs b/crates/brk_vec/src/traits/generic.rs index 83282dd25..606d78e16 100644 --- a/crates/brk_vec/src/traits/generic.rs +++ b/crates/brk_vec/src/traits/generic.rs @@ -5,7 +5,6 @@ use std::{ sync::Arc, }; -use arc_swap::{ArcSwap, Guard}; use axum::{ Json, response::{IntoResponse, Response}, @@ -38,9 +37,13 @@ where fn file_set_len(&mut self, len: u64) -> Result<()> { let mut file = self.open_file()?; + Self::file_set_len_(&mut file, len)?; + self.update_mmap(file) + } + fn file_set_len_(file: &mut File, len: u64) -> Result<()> { file.set_len(len)?; file.seek(SeekFrom::End(0))?; - self.update_mmap(file) + Ok(()) } fn file_write_all(&mut self, buf: &[u8]) -> Result<()> { @@ -49,34 +52,31 @@ where self.update_mmap(file) } + fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> { + let mut file = self.open_file()?; + Self::file_set_len_(&mut file, len)?; + file.write_all(buf)?; + self.update_mmap(file) + } + #[inline] fn reset(&mut self) -> Result<()> { self.file_write_all(&[])?; Ok(()) } - fn mmap(&self) -> &ArcSwap; - - #[inline] - fn new_guard(&self) -> Guard> { - self.mmap().load() - } - fn guard(&self) -> &Option>>; - fn mut_guard(&mut self) -> &mut Option>>; - fn new_mmap(file: File) -> Result> { Ok(Arc::new(unsafe { Mmap::map(&file)? })) } fn update_mmap(&mut self, file: File) -> Result<()> { - file.sync_all()?; let mmap = Self::new_mmap(file)?; self.mmap().store(mmap); if self.guard().is_some() { let guard = self.new_guard(); self.mut_guard().replace(guard); } else { - unreachable!() + unreachable!("This function shouldn't be called in a cloned instance") } Ok(()) } @@ -86,22 +86,6 @@ where self.pushed_len() == 0 } - #[inline] - fn index_to_pushed_index(&self, index: usize) -> Result> { - let stored_len = self.stored_len(); - - if index >= stored_len { - let index = index - stored_len; - if index >= self.pushed_len() { - Err(Error::IndexTooHigh) - } else { - Ok(Some(index)) - } - } else { - Err(Error::IndexTooLow) - } - } - #[inline] fn has(&self, index: Self::I) -> Result { Ok(self.has_(index.to_usize()?)) @@ -140,27 +124,33 @@ where ), ) -> Result<()>; - fn fix_i64(i: i64, len: usize, from: bool) -> usize { + fn flush(&mut self) -> Result<()>; + + fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>; + + fn collect_range(&self, from: Option, to: Option) -> Result>; + + #[inline] + fn collect_inclusive_range(&self, from: I, to: I) -> Result> { + self.collect_range(Some(from.to_usize()?), Some(to.to_usize()? + 1)) + } + + #[inline] + fn i64_to_usize(i: i64, len: usize) -> usize { if i >= 0 { - let v = i as usize; - if v < len { - v - } else if from { - len - 1 - } else { - len - } + i as usize } else { let v = len as i64 + i; if v < 0 { 0 } else { v as usize } } } - fn flush(&mut self) -> Result<()>; - - fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>; - - fn collect_range(&self, from: Option, to: Option) -> Result>; + fn collect_signed_range(&self, from: Option, to: Option) -> Result> { + let len = self.len(); + let from = from.map(|i| Self::i64_to_usize(i, len)); + let to = to.map(|i| Self::i64_to_usize(i, len)); + self.collect_range(from, to) + } #[inline] fn collect_range_axum_json( @@ -168,12 +158,12 @@ where from: Option, to: Option, ) -> Result>> { - Ok(Json(self.collect_range(from, to)?)) + Ok(Json(self.collect_signed_range(from, to)?)) } #[inline] fn collect_range_serde_json(&self, from: Option, to: Option) -> Result> { - self.collect_range(from, to)? + self.collect_signed_range(from, to)? .into_iter() .map(|v| serde_json::to_value(v).map_err(Error::from)) .collect::>>() @@ -200,6 +190,11 @@ where path.join("version") } + #[inline] + fn path_compressed_(path: &Path) -> PathBuf { + path.join("compressed") + } + #[inline] fn file_name(&self) -> String { self.path() diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index f8e62bb43..24ead9dc1 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -1,5 +1,6 @@ use std::{ - fs, mem, + fs::{self, File}, + mem, path::Path, sync::{Arc, OnceLock}, }; @@ -11,7 +12,7 @@ use zstd::DEFAULT_COMPRESSION_LEVEL; use crate::{ CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result, - StoredIndex, StoredType, UnsafeSlice, Value, Version, + StoredIndex, StoredType, UnsafeSlice, Version, }; const ONE_KIB: usize = 1024; @@ -54,6 +55,20 @@ where } pub fn import(path: &Path, version: Version) -> Result { + fs::create_dir_all(path)?; + + let vec_exists = fs::exists(Self::path_vec_(path)).is_ok_and(|b| b); + let compressed_path = Self::path_compressed_(path); + let compressed_exists = fs::exists(&compressed_path).is_ok_and(|b| b); + + if vec_exists && !compressed_exists { + return Err(Error::DifferentCompressionMode); + } + + if !vec_exists && !compressed_exists { + File::create(&compressed_path)?; + } + Ok(Self { inner: RawVec::import(path, version)?, decoded_page: None, @@ -62,13 +77,30 @@ where }) } - pub fn decode_page(&self, page_index: usize) -> Result> { - Self::decode_page_( - self.stored_len(), - page_index, - self.guard().as_ref().unwrap(), - &self.pages_meta.load(), - ) + fn cached_get_stored__( + index: usize, + mmap: &Mmap, + stored_len: usize, + decoded_page: &mut Option<(usize, Vec)>, + compressed_pages_meta: &CompressedPagesMetadata, + ) -> Result> { + let page_index = Self::index_to_page_index(index); + + if decoded_page.as_ref().is_none_or(|b| b.0 != page_index) { + let values = Self::decode_page_(stored_len, page_index, mmap, compressed_pages_meta)?; + decoded_page.replace((page_index, values)); + } + + Ok(decoded_page + .as_ref() + .unwrap() + .1 + .get(index % Self::PER_PAGE) + .cloned()) + } + + fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result> { + Self::decode_page_(self.stored_len(), page_index, mmap, &self.pages_meta.load()) } fn decode_page_( @@ -96,7 +128,7 @@ where .collect::>()) } - fn compress_page(chunk: &[T]) -> Box<[u8]> { + fn compress_page(chunk: &[T]) -> Vec { if chunk.len() > Self::PER_PAGE { panic!(); } @@ -110,9 +142,7 @@ where .enumerate() .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); - zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL) - .unwrap() - .into_boxed_slice() + zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap() } pub fn enable_large_cache(&mut self) { @@ -174,39 +204,70 @@ where type T = T; #[inline] - fn get_(&self, index: usize) -> Result>> { - match self.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.pushed().get(index).map(|v| Value::Ref(v))); - } + fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result> { + let cached_start = self + .stored_len() + .checked_sub(Self::CACHE_LENGTH) + .unwrap_or_default(); + + let decoded_index = index % Self::PER_PAGE; + + if index >= cached_start { + let trimmed_index = index - cached_start; + if let Some(decoded_pages) = self.decoded_pages.as_ref() { + let decoded_page = decoded_pages + .get(Self::index_to_page_index(trimmed_index)) + .unwrap(); + + return Ok(decoded_page + .get_or_init(|| { + self.decode_page(Self::index_to_page_index(index), mmap) + .unwrap() + }) + .get(decoded_index) + .cloned()); } - Err(Error::IndexTooHigh) => return Ok(None), - Err(Error::IndexTooLow) => {} - Err(error) => return Err(error), } let page_index = Self::index_to_page_index(index); - // if self.page().is_none_or(|b| b.0 != page_index) { - // let values = self.decode_page(page_index)?; - // self.mut_page().replace((page_index, values)); - // } + Ok(self + .decode_page(page_index, mmap)? + .get(decoded_index) + .cloned()) + } + #[inline] + fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result> { + Self::cached_get_stored__( + index, + mmap, + self.stored_len(), + &mut self.decoded_page, + &self.pages_meta.load(), + ) + } - // self.page().unwrap().1.get(index) - // - todo!(); + #[inline] + fn mmap(&self) -> &ArcSwap { + self.inner.mmap() + } - // let v = self.inner.guard().as_ref().map_or_else( - // || Self::guard_to_value(&self.new_guard(), index), - // |guard| Self::guard_to_value(guard, index), - // ); - - // Ok(Some(Value::Owned(v))) + #[inline] + fn guard(&self) -> &Option>> { + self.inner.guard() + } + #[inline] + fn mut_guard(&mut self) -> &mut Option>> { + self.inner.mut_guard() } fn stored_len(&self) -> usize { - todo!() + let pages_meta = self.pages_meta.load(); + if let Some(last) = pages_meta.last() { + (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize + } else { + 0 + } } #[inline] @@ -224,15 +285,61 @@ where I: StoredIndex, T: StoredType, { - fn iter_from(&mut self, _index: I, _f: F) -> Result<()> + fn iter_from(&mut self, index: I, mut f: F) -> Result<()> where F: FnMut((I, T, &mut dyn DynamicVec)) -> Result<()>, { - todo!() + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let start = index.to_usize()?; + + let stored_len = self.stored_len(); + if start >= stored_len { + return Ok(()); + } + + let mmap = self.mmap().load(); + let pages_meta = self.pages_meta.load(); + + (start..stored_len).try_for_each(|index| { + let v = Self::cached_get_stored__( + index, + &mmap, + stored_len, + &mut self.decoded_page, + &pages_meta, + )? + .unwrap(); + f((I::from(index), v, self as &mut dyn DynamicVec)) + }) } - fn collect_range(&self, from: Option, to: Option) -> Result> { - todo!() + fn collect_range(&self, from: Option, to: Option) -> Result> { + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let stored_len = self.stored_len(); + + let from = from.unwrap_or_default(); + let to = to.map_or(stored_len, |i| i.min(stored_len)); + + if from >= stored_len { + return Ok(vec![]); + } + + let mmap = self.mmap().load(); + let pages_meta = self.pages_meta.load(); + let mut decoded_page: Option<(usize, Vec)> = None; + + (from..to) + .map(|index| { + Self::cached_get_stored__(index, &mmap, stored_len, &mut decoded_page, &pages_meta) + .map(|opt| opt.unwrap()) + }) + .collect::>>() } fn flush(&mut self) -> Result<()> { @@ -260,7 +367,7 @@ where values = if let Some(values) = self .decoded_pages .as_mut() - .and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take())) + .and_then(|v| v.last_mut().and_then(|lock| lock.take())) { values } else if self @@ -314,13 +421,13 @@ where pages_meta.push(page_index, page); }); - pages_meta.write()?; - let buf = compressed .into_iter() .flat_map(|(v, _)| v) .collect::>(); + pages_meta.write()?; + if let Some(truncate_at) = truncate_at { self.file_set_len(truncate_at)?; } @@ -335,21 +442,52 @@ where } fn truncate_if_needed(&mut self, index: I) -> Result<()> { - todo!() - } + let index = index.to_usize()?; - #[inline] - fn mmap(&self) -> &ArcSwap { - self.inner.mmap() - } + if index >= self.stored_len() { + return Ok(()); + } - #[inline] - fn guard(&self) -> &Option>> { - self.inner.guard() - } - #[inline] - fn mut_guard(&mut self) -> &mut Option>> { - self.inner.mut_guard() + if index == 0 { + self.reset()?; + return Ok(()); + } + + let page_index = Self::index_to_page_index(index); + + let guard = self.guard().as_ref().unwrap(); + + let mut pages_meta = (**self.pages_meta.load()).clone(); + + let values = self.decode_page(page_index, guard)?; + let mut buf = vec![]; + + let mut page = pages_meta.truncate(page_index).unwrap(); + + let len = page.start; + + let decoded_index = index % Self::PER_PAGE; + + if decoded_index != 0 { + let chunk = &values[..decoded_index]; + + buf = Self::compress_page(chunk); + + page.values_len = chunk.len() as u32; + page.bytes_len = buf.len() as u32; + + pages_meta.push(page_index, page); + } + + pages_meta.write()?; + + self.pages_meta.store(Arc::new(pages_meta)); + + self.file_truncate_and_write_all(len, &buf)?; + + self.reset_caches(); + + Ok(()) } #[inline] diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index 92154c1d6..d878460d2 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -10,9 +10,7 @@ use arc_swap::{ArcSwap, Guard}; use memmap2::Mmap; use rayon::prelude::*; -use crate::{ - DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version, -}; +use crate::{DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Version}; #[derive(Debug)] pub struct RawVec { @@ -73,25 +71,30 @@ where type T = T; #[inline] - fn get_(&self, index: usize) -> Result>> { - match self.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.pushed().get(index).map(|v| Value::Ref(v))); - } - } - Err(Error::IndexTooHigh) => return Ok(None), - Err(Error::IndexTooLow) => {} - Err(error) => return Err(error), - } - - let guard = self.guard.as_ref().unwrap(); + fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result> { let index = index * Self::SIZE_OF_T; - let slice = &guard[index..(index + Self::SIZE_OF_T)]; + let slice = &mmap[index..(index + Self::SIZE_OF_T)]; + Self::T::try_read_from_bytes(slice) + .map(|v| Some(v)) + .map_err(Error::from) + } + #[inline] + fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result> { + self.get_stored_(index, mmap) + } - let v = Self::T::try_read_from_bytes(slice)?; + #[inline] + fn mmap(&self) -> &ArcSwap { + &self.mmap + } - Ok(Some(Value::Owned(v))) + #[inline] + fn guard(&self) -> &Option>> { + &self.guard + } + #[inline] + fn mut_guard(&mut self) -> &mut Option>> { + &mut self.guard } #[inline] @@ -126,21 +129,40 @@ where return Err(Error::UnsupportedUnflushedState); } + let start = index.to_usize()?; + + let stored_len = self.stored_len(); + if start >= stored_len { + return Ok(()); + } + let guard = self.mmap.load(); - let start = index.to_usize()? * Self::SIZE_OF_T; + (start..stored_len).try_for_each(|index| { + let v = self.get_stored_(index, &guard)?.unwrap(); + f((I::from(index), v, self as &mut dyn DynamicVec)) + }) + } - dbg!(self.path()); + fn collect_range(&self, from: Option, to: Option) -> Result> { + if !self.is_pushed_empty() { + return Err(Error::UnsupportedUnflushedState); + } - guard[start..] - .chunks(Self::SIZE_OF_T) - .enumerate() - .try_for_each(|(i, chunk)| { - let v = T::try_read_from_bytes(chunk).unwrap(); - f((I::from(i), v, self as &mut dyn DynamicVec)) - })?; + let stored_len = self.stored_len(); - Ok(()) + let from = from.unwrap_or_default(); + let to = to.map_or(stored_len, |i| i.min(stored_len)); + + if from >= stored_len { + return Ok(vec![]); + } + + let mmap = self.mmap.load(); + + (from..to) + .map(|index| self.get_stored_(index, &mmap).map(|opt| opt.unwrap())) + .collect::>>() } fn flush(&mut self) -> Result<()> { @@ -189,38 +211,6 @@ where Ok(()) } - fn collect_range(&self, from: Option, to: Option) -> Result> { - let guard = self.mmap.load(); - - let len = guard.len() / Self::SIZE_OF_T; - - if len == 0 { - return Ok(vec![]); - } - - let from = from.map_or(0, |i| Self::fix_i64(i, len, true)); - let to = to.map_or(len, |i| Self::fix_i64(i, len, false)); - - Ok(guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T] - .chunks(Self::SIZE_OF_T) - .map(|chunk| T::try_read_from_bytes(chunk).unwrap()) - .collect::>()) - } - - #[inline] - fn mmap(&self) -> &ArcSwap { - &self.mmap - } - - #[inline] - fn guard(&self) -> &Option>> { - &self.guard - } - #[inline] - fn mut_guard(&mut self) -> &mut Option>> { - &mut self.guard - } - #[inline] fn path(&self) -> &Path { self.pathbuf.as_path()