vec: rework part 4

This commit is contained in:
nym21
2025-04-10 15:55:26 +02:00
parent 0dd7e9359e
commit 139e93b2f0
16 changed files with 553 additions and 341 deletions
+1 -1
View File
@@ -65,7 +65,7 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> {
let starting_indexes = indexer.index(&parser, rpc, &exit)?;
computer.compute(&mut indexer, starting_indexes, &exit)?;
// computer.compute(&mut indexer, starting_indexes, &exit)?;
if let Some(delay) = config.delay() {
sleep(Duration::from_secs(delay))
+31 -27
View File
@@ -15,7 +15,7 @@ use brk_vec::{
const ONE_KIB: usize = 1024;
const ONE_MIB: usize = ONE_KIB * ONE_KIB;
const MAX_CACHE_SIZE: usize = 100 * ONE_MIB;
const MAX_CACHE_SIZE: usize = 210 * ONE_MIB;
#[derive(Debug)]
pub struct ComputedVec<I, T>
@@ -24,7 +24,7 @@ where
T: StoredType,
{
computed_version: Option<Version>,
vec: StoredVec<I, T>,
inner: StoredVec<I, T>,
}
impl<I, T> ComputedVec<I, T>
@@ -43,7 +43,7 @@ where
Ok(Self {
computed_version: None,
vec,
inner: vec,
})
}
@@ -52,7 +52,7 @@ where
return Ok(());
}
exit.block();
self.vec.truncate_if_needed(index)?;
self.inner.truncate_if_needed(index)?;
exit.release();
Ok(())
}
@@ -67,11 +67,11 @@ where
if ord == Ordering::Greater {
self.safe_truncate_if_needed(index, exit)?;
}
self.vec.push(value);
self.inner.push(value);
}
}
if self.vec.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
if self.inner.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
self.safe_flush(exit)
} else {
Ok(())
@@ -83,52 +83,56 @@ where
return Ok(());
}
exit.block();
self.vec.flush()?;
self.inner.flush()?;
exit.release();
Ok(())
}
fn version(&self) -> Version {
self.vec.version()
self.inner.version()
}
pub fn len(&self) -> usize {
self.vec.len()
self.inner.len()
}
pub fn vec(&self) -> &StoredVec<I, T> {
&self.vec
&self.inner
}
pub fn mut_vec(&mut self) -> &mut StoredVec<I, T> {
&mut self.vec
&mut self.inner
}
pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec {
&self.vec
&self.inner
}
pub fn mut_any_vec(&mut self) -> &mut dyn brk_vec::AnyStoredVec {
&mut self.vec
&mut self.inner
}
pub fn get(&mut self, index: I) -> Result<Option<Value<T>>> {
self.vec.get(index)
pub fn cached_get(&mut self, index: I) -> Result<Option<Value<T>>> {
self.inner.cached_get(index)
}
pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
self.vec.collect_range(from, to)
pub fn collect_inclusive_range(&self, from: I, to: I) -> Result<Vec<T>> {
self.inner.collect_inclusive_range(from, to)
}
pub fn path(&self) -> &Path {
self.inner.path()
}
#[inline]
fn path_computed_version(&self) -> PathBuf {
self.vec.path().join("computed_version")
self.inner.path().join("computed_version")
}
fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> {
let path = self.path_computed_version();
if version.validate(path.as_ref()).is_err() {
self.vec.reset()?;
self.inner.reset()?;
}
version.write(path.as_ref())?;
Ok(())
@@ -174,12 +178,12 @@ where
)?;
let index = max_from.min(
self.vec
.get_last()?
self.inner
.cached_get_last()?
.map_or_else(T::default, |v| v.into_inner()),
);
other.iter_from(index, |(v, i, ..)| {
if self.get(i).unwrap().is_none_or(|old_v| *old_v > v) {
if self.cached_get(i).unwrap().is_none_or(|old_v| *old_v > v) {
self.forced_push_at(i, v, exit)
} else {
Ok(())
@@ -207,7 +211,7 @@ where
let index = max_from.min(T::from(self.len()));
first_indexes.iter_from(index, |(value, first_index, ..)| {
let first_index = (first_index).to_usize()?;
let last_index = (last_indexes.get(value)?.unwrap()).to_usize()?;
let last_index = (last_indexes.cached_get(value)?.unwrap()).to_usize()?;
(first_index..last_index)
.try_for_each(|index| self.forced_push_at(I::from(index), value, exit))
})?;
@@ -268,7 +272,7 @@ where
let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = last_indexes.get(i)?.unwrap();
let last_index = last_indexes.cached_get(i)?.unwrap();
let count = (*last_index + 1_usize)
.checked_sub(first_index)
.unwrap_or_default();
@@ -298,7 +302,7 @@ where
self_to_other.iter_from(index, |(i, other, ..)| {
self.forced_push_at(
i,
T::from(other_to_self.get(other)?.unwrap().into_inner() == i),
T::from(other_to_self.cached_get(other)?.unwrap().into_inner() == i),
exit,
)
})?;
@@ -324,7 +328,7 @@ where
let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(index, first_index, ..)| {
let last_index = last_indexes.get(index)?.unwrap();
let last_index = last_indexes.cached_get(index)?.unwrap();
let count = *last_index + 1_usize - first_index;
self.forced_push_at(index, count.into(), exit)
})?;
@@ -341,7 +345,7 @@ where
fn clone(&self) -> Self {
Self {
computed_version: self.computed_version,
vec: self.vec.clone(),
inner: self.inner.clone(),
}
}
}
@@ -2,8 +2,7 @@ use std::path::Path;
use brk_exit::Exit;
use brk_vec::{
AnyStoredVec, Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec,
Version,
Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec, Version,
};
use crate::storage::vecs::base::ComputedVec;
@@ -133,7 +132,7 @@ where
.checked_sub(1)
.map_or(T::from(0_usize), |prev_i| {
total_vec
.get(I::from(prev_i))
.cached_get(I::from(prev_i))
.unwrap()
.map_or(T::from(0_usize), |v| v.into_inner())
});
@@ -164,21 +163,24 @@ where
let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = *last_indexes.get(i).unwrap().unwrap();
let last_index = *last_indexes.cached_get(i)?.unwrap();
if let Some(first) = self.first.as_mut() {
let v = source.get(first_index).unwrap().unwrap().into_inner();
let v = source.cached_get(first_index)?.unwrap().into_inner();
first.forced_push_at(index, v, exit)?;
}
if let Some(last) = self.last.as_mut() {
let v = source.get(last_index).unwrap().unwrap().into_inner();
let v = source
.cached_get(last_index)
.inspect_err(|_| {
dbg!(last.path(), last_index);
})?
.unwrap()
.into_inner();
last.forced_push_at(index, v, exit)?;
}
let first_index = first_index.to_usize()?;
let last_index = last_index.to_usize()?;
let needs_sum_or_total = self.sum.is_some() || self.total.is_some();
let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some();
let needs_sorted = self.max.is_some()
@@ -191,8 +193,7 @@ where
let needs_values = needs_sorted || needs_average_sum_or_total;
if needs_values {
let mut values =
source.collect_range(Some(first_index as i64), Some(last_index as i64))?;
let mut values = source.collect_inclusive_range(first_index, last_index)?;
if needs_sorted {
values.sort_unstable();
@@ -251,7 +252,7 @@ where
T::from(0_usize),
|prev_i| {
total_vec
.get(I::from(prev_i))
.cached_get(I::from(prev_i))
.unwrap()
.unwrap()
.to_owned()
@@ -298,14 +299,14 @@ where
let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = *last_indexes.get(i).unwrap().unwrap();
let last_index = *last_indexes.cached_get(i).unwrap().unwrap();
if let Some(first) = self.first.as_mut() {
let v = source
.first
.as_mut()
.unwrap()
.get(first_index)
.cached_get(first_index)
.unwrap()
.unwrap()
.into_inner();
@@ -317,16 +318,13 @@ where
.last
.as_mut()
.unwrap()
.get(last_index)
.cached_get(last_index)
.unwrap()
.unwrap()
.into_inner();
last.forced_push_at(index, v, exit)?;
}
let first_index = Some(first_index.to_usize()? as i64);
let last_index = Some(last_index.to_usize()? as i64);
let needs_sum_or_total = self.sum.is_some() || self.total.is_some();
let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some();
let needs_sorted = self.max.is_some() || self.min.is_some();
@@ -339,7 +337,7 @@ where
.max
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
values.sort_unstable();
max.forced_push_at(i, values.last().unwrap().clone(), exit)?;
}
@@ -349,7 +347,7 @@ where
.min
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
values.sort_unstable();
min.forced_push_at(i, values.first().unwrap().clone(), exit)?;
}
@@ -361,7 +359,7 @@ where
.average
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
let len = values.len() as f64;
let total = values
.into_iter()
@@ -378,7 +376,7 @@ where
.sum
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
let sum = values.into_iter().fold(T::from(0), |a, b| a + b);
if let Some(sum_vec) = self.sum.as_mut() {
@@ -390,7 +388,7 @@ where
T::from(0_usize),
|prev_i| {
total_vec
.get(I::from(prev_i))
.cached_get(I::from(prev_i))
.unwrap()
.unwrap()
.into_inner()
@@ -438,8 +436,8 @@ where
))
}
pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
let mut v: Vec<&dyn AnyStoredVec> = vec![];
pub fn any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
let mut v: Vec<&dyn brk_vec::AnyStoredVec> = vec![];
if let Some(first) = self.first.as_ref() {
v.push(first.any_vec());
+46 -17
View File
@@ -345,7 +345,7 @@ impl Vecs {
|(h, d, s, ..)| {
let d = h
.decremented()
.and_then(|h| s.get(h).ok())
.and_then(|h| s.cached_get(h).ok())
.flatten()
.map_or(d, |prev_d| {
let prev_d = *prev_d;
@@ -365,7 +365,7 @@ impl Vecs {
let starting_dateindex = self
.height_to_dateindex
.get(starting_indexes.height.decremented().unwrap_or_default())?
.cached_get(starting_indexes.height.decremented().unwrap_or_default())?
.map_or_else(Default::default, |v| v.into_inner());
self.height_to_dateindex.compute_transform(
@@ -377,7 +377,7 @@ impl Vecs {
let starting_dateindex = if let Some(dateindex) = self
.height_to_dateindex
.get(starting_indexes.height.decremented().unwrap_or_default())?
.cached_get(starting_indexes.height.decremented().unwrap_or_default())?
.map(|v| v.into_inner())
{
starting_dateindex.min(dateindex)
@@ -450,7 +450,7 @@ impl Vecs {
let starting_weekindex = self
.dateindex_to_weekindex
.get(starting_dateindex)?
.cached_get(starting_dateindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.dateindex_to_weekindex.compute_transform(
@@ -485,7 +485,12 @@ impl Vecs {
self.weekindex_to_timestamp.compute_transform(
starting_weekindex,
self.weekindex_to_first_dateindex.mut_vec(),
|(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()),
|(i, d, ..)| {
(
i,
*self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(),
)
},
exit,
)?;
@@ -493,7 +498,7 @@ impl Vecs {
let starting_monthindex = self
.dateindex_to_monthindex
.get(starting_dateindex)?
.cached_get(starting_dateindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.dateindex_to_monthindex.compute_transform(
@@ -530,7 +535,12 @@ impl Vecs {
self.monthindex_to_timestamp.compute_transform(
starting_monthindex,
self.monthindex_to_first_dateindex.mut_vec(),
|(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()),
|(i, d, ..)| {
(
i,
*self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(),
)
},
exit,
)?;
@@ -538,7 +548,7 @@ impl Vecs {
let starting_quarterindex = self
.monthindex_to_quarterindex
.get(starting_monthindex)?
.cached_get(starting_monthindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.monthindex_to_quarterindex.compute_transform(
@@ -575,7 +585,12 @@ impl Vecs {
self.quarterindex_to_timestamp.compute_transform(
starting_quarterindex,
self.quarterindex_to_first_monthindex.mut_vec(),
|(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()),
|(i, m, ..)| {
(
i,
*self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(),
)
},
exit,
)?;
@@ -583,7 +598,7 @@ impl Vecs {
let starting_yearindex = self
.monthindex_to_yearindex
.get(starting_monthindex)?
.cached_get(starting_monthindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.monthindex_to_yearindex.compute_transform(
@@ -620,7 +635,12 @@ impl Vecs {
self.yearindex_to_timestamp.compute_transform(
starting_yearindex,
self.yearindex_to_first_monthindex.mut_vec(),
|(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()),
|(i, m, ..)| {
(
i,
*self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(),
)
},
exit,
)?;
@@ -628,7 +648,7 @@ impl Vecs {
let starting_decadeindex = self
.yearindex_to_decadeindex
.get(starting_yearindex)?
.cached_get(starting_yearindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.yearindex_to_decadeindex.compute_transform(
@@ -663,7 +683,12 @@ impl Vecs {
self.decadeindex_to_timestamp.compute_transform(
starting_decadeindex,
self.decadeindex_to_first_yearindex.mut_vec(),
|(i, y, ..)| (i, *self.yearindex_to_timestamp.get(y).unwrap().unwrap()),
|(i, y, ..)| {
(
i,
*self.yearindex_to_timestamp.cached_get(y).unwrap().unwrap(),
)
},
exit,
)?;
@@ -671,7 +696,7 @@ impl Vecs {
let starting_difficultyepoch = self
.height_to_difficultyepoch
.get(starting_indexes.height)?
.cached_get(starting_indexes.height)?
.map_or_else(Default::default, |v| v.into_inner());
self.height_to_difficultyepoch.compute_transform(
@@ -709,7 +734,11 @@ impl Vecs {
|(i, h, ..)| {
(
i,
*indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(),
*indexer_vecs
.height_to_timestamp
.cached_get(h)
.unwrap()
.unwrap(),
)
},
exit,
@@ -719,7 +748,7 @@ impl Vecs {
let starting_halvingepoch = self
.height_to_halvingepoch
.get(starting_indexes.height)?
.cached_get(starting_indexes.height)?
.map_or_else(Default::default, |v| v.into_inner());
self.height_to_halvingepoch.compute_transform(
@@ -757,7 +786,7 @@ impl Vecs {
// |(i, h, ..)| {
// (
// i,
// *indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(),
// *indexer_vecs.height_to_timestamp.cached_get(h).unwrap().unwrap(),
// )
// },
// exit,
@@ -243,8 +243,9 @@ impl Vecs {
.get_height(
h,
t,
h.decremented()
.map(|prev_h| *height_to_timestamp.get(prev_h).unwrap().unwrap()),
h.decremented().map(|prev_h| {
*height_to_timestamp.cached_get(prev_h).unwrap().unwrap()
}),
)
.unwrap();
(h, ohlc)
@@ -470,7 +471,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -479,7 +480,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -488,7 +489,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -516,7 +517,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -525,7 +526,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -534,7 +535,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -562,7 +563,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -571,7 +572,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -580,7 +581,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -608,7 +609,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -617,7 +618,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -626,7 +627,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -654,7 +655,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -663,7 +664,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -672,7 +673,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -704,7 +705,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -713,7 +714,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -722,7 +723,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -182,8 +182,10 @@ impl Vecs {
|(txinindex, txoutindex, slf, other)| {
let value = if txoutindex == Txoutindex::COINBASE {
Sats::ZERO
} else if let Ok(Some(value)) =
indexer_vecs.txoutindex_to_value.mut_vec().get(txoutindex)
} else if let Ok(Some(value)) = indexer_vecs
.txoutindex_to_value
.mut_vec()
.cached_get(txoutindex)
{
*value
} else {
+1 -1
View File
@@ -24,7 +24,7 @@ fn main() -> color_eyre::Result<()> {
let outputs = Path::new("../../_outputs");
let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, false)?;
let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, true)?;
indexer.import_stores()?;
indexer.import_vecs()?;
+20 -50
View File
@@ -18,7 +18,7 @@ where
T: StoredType,
{
height: Option<Height>,
vec: StoredVec<I, T>,
inner: StoredVec<I, T>,
}
impl<I, T> IndexedVec<I, T>
@@ -31,76 +31,46 @@ where
version: Version,
compressed: Compressed,
) -> brk_vec::Result<Self> {
let mut vec = StoredVec::forced_import(path, version, compressed)?;
let mut inner = StoredVec::forced_import(path, version, compressed)?;
vec.enable_large_cache_if_needed();
inner.enable_large_cache_if_needed();
Ok(Self {
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
vec,
inner,
})
}
#[inline]
pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
self.get_(index.to_usize()?)
self.inner.get(index)
}
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
self.vec.get_(index)
// match self.vec.index_to_pushed_index(index) {
// Ok(index) => {
// if let Some(index) = index {
// return Ok(self.vec.pushed().get(index).map(|v| Value::Ref(v)));
// }
// }
// Err(Error::IndexTooHigh) => return Ok(None),
// Err(Error::IndexTooLow) => {}
// Err(error) => return Err(error),
// }
// let large_cache_len = self.vec.large_cache_len();
// if large_cache_len != 0 {
// let page_index = Self::index_to_page_index(index);
// let last_index = self.vec.stored_len() - 1;
// let max_page_index = Self::index_to_page_index(last_index);
// let min_page_index = (max_page_index + 1) - large_cache_len;
// if page_index >= min_page_index {
// self.vec
// .pages()
// .unwrap()
// .get(page_index - min_page_index)
// .ok_or(Error::MmapsVecIsTooSmall)?
// .get_or_init(|| self.vec.decode_page(page_index).unwrap())
// .get(index)
// }
// }
// Ok(self.vec.read_(index)?.map(|v| Value::Owned(v)))
pub fn cached_get(&mut self, index: I) -> Result<Option<Value<'_, T>>> {
self.inner.cached_get(index)
}
pub fn iter_from<F>(&mut self, index: I, f: F) -> Result<()>
where
F: FnMut((I, T, &mut dyn DynamicVec<I = I, T = T>)) -> Result<()>,
{
self.vec.iter_from(index, f)
self.inner.iter_from(index, f)
}
#[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
match self.vec.len().cmp(&index.to_usize()?) {
match self.inner.len().cmp(&index.to_usize()?) {
Ordering::Greater => {
// dbg!(len, index, &self.pathbuf);
// panic!();
Ok(())
}
Ordering::Equal => {
self.vec.push(value);
self.inner.push(value);
Ok(())
}
Ordering::Less => {
dbg!(index, value, self.vec.len(), self.path_height());
dbg!(index, value, self.inner.len(), self.path_height());
Err(Error::IndexTooHigh)
}
}
@@ -110,45 +80,45 @@ where
if self.height.is_none_or(|self_height| self_height != height) {
height.write(&self.path_height())?;
}
self.vec.truncate_if_needed(index)?;
self.inner.truncate_if_needed(index)?;
Ok(())
}
pub fn flush(&mut self, height: Height) -> Result<()> {
height.write(&self.path_height())?;
self.vec.flush()
self.inner.flush()
}
pub fn vec(&self) -> &StoredVec<I, T> {
&self.vec
&self.inner
}
pub fn mut_vec(&mut self) -> &mut StoredVec<I, T> {
&mut self.vec
&mut self.inner
}
pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec {
&self.vec
&self.inner
}
pub fn len(&self) -> usize {
self.vec.len()
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.vec.is_empty()
self.inner.is_empty()
}
#[inline]
pub fn hasnt(&self, index: I) -> Result<bool> {
self.vec.has(index).map(|b| !b)
self.inner.has(index).map(|b| !b)
}
pub fn height(&self) -> brk_core::Result<Height> {
Height::try_from(self.path_height().as_path())
}
fn path_height(&self) -> PathBuf {
Self::path_height_(self.vec.path())
Self::path_height_(self.inner.path())
}
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
-1
View File
@@ -374,7 +374,6 @@ impl Vecs {
pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> brk_vec::Result<()> {
let saved_height = starting_indexes.height.decremented().unwrap_or_default();
// Now we can cut everything that's out of date
let &Indexes {
addressindex,
height,
+3 -3
View File
@@ -6,7 +6,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = fs::remove_dir_all("./vec");
let version = Version::ZERO;
let compressed = Compressed::NO;
let compressed = Compressed::YES;
{
let mut vec: StoredVec<usize, u32> =
@@ -48,7 +48,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut vec: StoredVec<usize, u32> =
StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
// vec.enable_large_cache_if_possible();
vec.enable_large_cache_if_needed();
dbg!(vec.get(0)?);
dbg!(vec.get(20)?);
@@ -71,7 +71,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
})?;
dbg!(vec.collect_range(Some(-5), None)?);
dbg!(vec.collect_signed_range(Some(-5), None)?);
}
Ok(())
+36 -29
View File
@@ -63,10 +63,40 @@ where
type T = T;
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<T>>> {
fn get_stored_(&self, index: usize, guard: &Mmap) -> Result<Option<T>> {
match self {
StoredVec::Raw(v) => v.get_(index),
StoredVec::Compressed(v) => v.get_(index),
StoredVec::Raw(v) => v.get_stored_(index, guard),
StoredVec::Compressed(v) => v.get_stored_(index, guard),
}
}
#[inline]
fn cached_get_stored_(&mut self, index: usize, guard: &Mmap) -> Result<Option<T>> {
match self {
StoredVec::Raw(v) => v.cached_get_stored_(index, guard),
StoredVec::Compressed(v) => v.cached_get_stored_(index, guard),
}
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
match self {
StoredVec::Raw(v) => v.mmap(),
StoredVec::Compressed(v) => v.mmap(),
}
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
match self {
StoredVec::Raw(v) => v.guard(),
StoredVec::Compressed(v) => v.guard(),
}
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
match self {
StoredVec::Raw(v) => v.mut_guard(),
StoredVec::Compressed(v) => v.mut_guard(),
}
}
@@ -96,8 +126,8 @@ where
impl<I, T> GenericVec<I, T> for StoredVec<I, T>
where
I: StoredIndex + Send + Sync,
T: StoredType + Send + Sync,
I: StoredIndex,
T: StoredType,
{
fn iter_from<F>(&mut self, index: I, f: F) -> Result<()>
where
@@ -109,7 +139,7 @@ where
}
}
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>> {
match self {
StoredVec::Raw(v) => v.collect_range(from, to),
StoredVec::Compressed(v) => v.collect_range(from, to),
@@ -130,29 +160,6 @@ where
}
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
match self {
StoredVec::Raw(v) => v.mmap(),
StoredVec::Compressed(v) => v.mmap(),
}
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
match self {
StoredVec::Raw(v) => v.guard(),
StoredVec::Compressed(v) => v.guard(),
}
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
match self {
StoredVec::Raw(v) => v.mut_guard(),
StoredVec::Compressed(v) => v.mut_guard(),
}
}
#[inline]
fn path(&self) -> &Path {
match self {
@@ -22,8 +22,10 @@ impl CompressedPagesMetadata {
const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>();
pub fn read(path: &Path) -> Result<CompressedPagesMetadata> {
let path = path.join("pages_meta");
let slf = Self {
vec: fs::read(path)
vec: fs::read(&path)
.unwrap_or_default()
.chunks(Self::PAGE_SIZE)
.map(|bytes| {
+79 -2
View File
@@ -1,4 +1,9 @@
use crate::{Result, Value};
use std::sync::Arc;
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use crate::{Error, Result, Value};
use super::{StoredIndex, StoredType};
@@ -10,7 +15,28 @@ pub trait DynamicVec: Send + Sync {
fn get(&self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
}
fn get_(&self, index: usize) -> Result<Option<Value<Self::T>>>;
#[inline]
fn cached_get(&mut self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
}
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
Ok(self
.get_stored_(index.to_usize()?, self.guard().as_ref().unwrap())?
.map(Value::Owned))
}
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
fn get_last(&self) -> Result<Option<Value<Self::T>>> {
let len = self.len();
if len == 0 {
@@ -18,6 +44,33 @@ pub trait DynamicVec: Send + Sync {
}
self.get_(len - 1)
}
#[inline]
fn cached_get_(&mut self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let mmap = Arc::clone(self.guard().as_ref().unwrap());
Ok(self
.cached_get_stored_(index.to_usize()?, &mmap)?
.map(Value::Owned))
}
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
fn cached_get_last(&mut self) -> Result<Option<Value<Self::T>>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
self.cached_get_(len - 1)
}
#[inline]
fn len(&self) -> usize {
@@ -28,6 +81,15 @@ pub trait DynamicVec: Send + Sync {
self.len() == 0
}
fn mmap(&self) -> &ArcSwap<Mmap>;
#[inline]
fn new_guard(&self) -> Guard<Arc<Mmap>> {
self.mmap().load()
}
fn guard(&self) -> &Option<Guard<Arc<Mmap>>>;
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>>;
fn stored_len(&self) -> usize;
fn pushed(&self) -> &[Self::T];
@@ -40,4 +102,19 @@ pub trait DynamicVec: Send + Sync {
fn push(&mut self, value: Self::T) {
self.mut_pushed().push(value)
}
#[inline]
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
let stored_len = self.stored_len();
if index >= stored_len {
let index = index - stored_len;
if index >= self.pushed_len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Err(Error::IndexTooLow)
}
}
}
+40 -45
View File
@@ -5,7 +5,6 @@ use std::{
sync::Arc,
};
use arc_swap::{ArcSwap, Guard};
use axum::{
Json,
response::{IntoResponse, Response},
@@ -38,9 +37,13 @@ where
fn file_set_len(&mut self, len: u64) -> Result<()> {
let mut file = self.open_file()?;
Self::file_set_len_(&mut file, len)?;
self.update_mmap(file)
}
fn file_set_len_(file: &mut File, len: u64) -> Result<()> {
file.set_len(len)?;
file.seek(SeekFrom::End(0))?;
self.update_mmap(file)
Ok(())
}
fn file_write_all(&mut self, buf: &[u8]) -> Result<()> {
@@ -49,34 +52,31 @@ where
self.update_mmap(file)
}
fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> {
let mut file = self.open_file()?;
Self::file_set_len_(&mut file, len)?;
file.write_all(buf)?;
self.update_mmap(file)
}
#[inline]
fn reset(&mut self) -> Result<()> {
self.file_write_all(&[])?;
Ok(())
}
fn mmap(&self) -> &ArcSwap<Mmap>;
#[inline]
fn new_guard(&self) -> Guard<Arc<Mmap>> {
self.mmap().load()
}
fn guard(&self) -> &Option<Guard<Arc<Mmap>>>;
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>>;
fn new_mmap(file: File) -> Result<Arc<Mmap>> {
Ok(Arc::new(unsafe { Mmap::map(&file)? }))
}
fn update_mmap(&mut self, file: File) -> Result<()> {
file.sync_all()?;
let mmap = Self::new_mmap(file)?;
self.mmap().store(mmap);
if self.guard().is_some() {
let guard = self.new_guard();
self.mut_guard().replace(guard);
} else {
unreachable!()
unreachable!("This function shouldn't be called in a cloned instance")
}
Ok(())
}
@@ -86,22 +86,6 @@ where
self.pushed_len() == 0
}
#[inline]
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
let stored_len = self.stored_len();
if index >= stored_len {
let index = index - stored_len;
if index >= self.pushed_len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Err(Error::IndexTooLow)
}
}
#[inline]
fn has(&self, index: Self::I) -> Result<bool> {
Ok(self.has_(index.to_usize()?))
@@ -140,27 +124,33 @@ where
),
) -> Result<()>;
fn fix_i64(i: i64, len: usize, from: bool) -> usize {
fn flush(&mut self) -> Result<()>;
fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>;
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>>;
#[inline]
fn collect_inclusive_range(&self, from: I, to: I) -> Result<Vec<Self::T>> {
self.collect_range(Some(from.to_usize()?), Some(to.to_usize()? + 1))
}
#[inline]
fn i64_to_usize(i: i64, len: usize) -> usize {
if i >= 0 {
let v = i as usize;
if v < len {
v
} else if from {
len - 1
} else {
len
}
i as usize
} else {
let v = len as i64 + i;
if v < 0 { 0 } else { v as usize }
}
}
fn flush(&mut self) -> Result<()>;
fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>;
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>>;
fn collect_signed_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
let len = self.len();
let from = from.map(|i| Self::i64_to_usize(i, len));
let to = to.map(|i| Self::i64_to_usize(i, len));
self.collect_range(from, to)
}
#[inline]
fn collect_range_axum_json(
@@ -168,12 +158,12 @@ where
from: Option<i64>,
to: Option<i64>,
) -> Result<Json<Vec<Self::T>>> {
Ok(Json(self.collect_range(from, to)?))
Ok(Json(self.collect_signed_range(from, to)?))
}
#[inline]
fn collect_range_serde_json(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Value>> {
self.collect_range(from, to)?
self.collect_signed_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).map_err(Error::from))
.collect::<Result<Vec<_>>>()
@@ -200,6 +190,11 @@ where
path.join("version")
}
#[inline]
fn path_compressed_(path: &Path) -> PathBuf {
path.join("compressed")
}
#[inline]
fn file_name(&self) -> String {
self.path()
+194 -56
View File
@@ -1,5 +1,6 @@
use std::{
fs, mem,
fs::{self, File},
mem,
path::Path,
sync::{Arc, OnceLock},
};
@@ -11,7 +12,7 @@ use zstd::DEFAULT_COMPRESSION_LEVEL;
use crate::{
CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result,
StoredIndex, StoredType, UnsafeSlice, Value, Version,
StoredIndex, StoredType, UnsafeSlice, Version,
};
const ONE_KIB: usize = 1024;
@@ -54,6 +55,20 @@ where
}
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
let vec_exists = fs::exists(Self::path_vec_(path)).is_ok_and(|b| b);
let compressed_path = Self::path_compressed_(path);
let compressed_exists = fs::exists(&compressed_path).is_ok_and(|b| b);
if vec_exists && !compressed_exists {
return Err(Error::DifferentCompressionMode);
}
if !vec_exists && !compressed_exists {
File::create(&compressed_path)?;
}
Ok(Self {
inner: RawVec::import(path, version)?,
decoded_page: None,
@@ -62,13 +77,30 @@ where
})
}
pub fn decode_page(&self, page_index: usize) -> Result<Vec<T>> {
Self::decode_page_(
self.stored_len(),
page_index,
self.guard().as_ref().unwrap(),
&self.pages_meta.load(),
)
fn cached_get_stored__(
index: usize,
mmap: &Mmap,
stored_len: usize,
decoded_page: &mut Option<(usize, Vec<T>)>,
compressed_pages_meta: &CompressedPagesMetadata,
) -> Result<Option<T>> {
let page_index = Self::index_to_page_index(index);
if decoded_page.as_ref().is_none_or(|b| b.0 != page_index) {
let values = Self::decode_page_(stored_len, page_index, mmap, compressed_pages_meta)?;
decoded_page.replace((page_index, values));
}
Ok(decoded_page
.as_ref()
.unwrap()
.1
.get(index % Self::PER_PAGE)
.cloned())
}
fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result<Vec<T>> {
Self::decode_page_(self.stored_len(), page_index, mmap, &self.pages_meta.load())
}
fn decode_page_(
@@ -96,7 +128,7 @@ where
.collect::<Vec<_>>())
}
fn compress_page(chunk: &[T]) -> Box<[u8]> {
fn compress_page(chunk: &[T]) -> Vec<u8> {
if chunk.len() > Self::PER_PAGE {
panic!();
}
@@ -110,9 +142,7 @@ where
.enumerate()
.for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()));
zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL)
.unwrap()
.into_boxed_slice()
zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap()
}
pub fn enable_large_cache(&mut self) {
@@ -174,39 +204,70 @@ where
type T = T;
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(|v| Value::Ref(v)));
}
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let cached_start = self
.stored_len()
.checked_sub(Self::CACHE_LENGTH)
.unwrap_or_default();
let decoded_index = index % Self::PER_PAGE;
if index >= cached_start {
let trimmed_index = index - cached_start;
if let Some(decoded_pages) = self.decoded_pages.as_ref() {
let decoded_page = decoded_pages
.get(Self::index_to_page_index(trimmed_index))
.unwrap();
return Ok(decoded_page
.get_or_init(|| {
self.decode_page(Self::index_to_page_index(index), mmap)
.unwrap()
})
.get(decoded_index)
.cloned());
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let page_index = Self::index_to_page_index(index);
// if self.page().is_none_or(|b| b.0 != page_index) {
// let values = self.decode_page(page_index)?;
// self.mut_page().replace((page_index, values));
// }
Ok(self
.decode_page(page_index, mmap)?
.get(decoded_index)
.cloned())
}
#[inline]
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
Self::cached_get_stored__(
index,
mmap,
self.stored_len(),
&mut self.decoded_page,
&self.pages_meta.load(),
)
}
// self.page().unwrap().1.get(index)
//
todo!();
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
}
// let v = self.inner.guard().as_ref().map_or_else(
// || Self::guard_to_value(&self.new_guard(), index),
// |guard| Self::guard_to_value(guard, index),
// );
// Ok(Some(Value::Owned(v)))
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
self.inner.guard()
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
self.inner.mut_guard()
}
fn stored_len(&self) -> usize {
todo!()
let pages_meta = self.pages_meta.load();
if let Some(last) = pages_meta.last() {
(pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize
} else {
0
}
}
#[inline]
@@ -224,15 +285,61 @@ where
I: StoredIndex,
T: StoredType,
{
fn iter_from<F>(&mut self, _index: I, _f: F) -> Result<()>
fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()>
where
F: FnMut((I, T, &mut dyn DynamicVec<I = Self::I, T = Self::T>)) -> Result<()>,
{
todo!()
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
let start = index.to_usize()?;
let stored_len = self.stored_len();
if start >= stored_len {
return Ok(());
}
let mmap = self.mmap().load();
let pages_meta = self.pages_meta.load();
(start..stored_len).try_for_each(|index| {
let v = Self::cached_get_stored__(
index,
&mmap,
stored_len,
&mut self.decoded_page,
&pages_meta,
)?
.unwrap();
f((I::from(index), v, self as &mut dyn DynamicVec<I = I, T = T>))
})
}
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
todo!()
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>> {
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
let stored_len = self.stored_len();
let from = from.unwrap_or_default();
let to = to.map_or(stored_len, |i| i.min(stored_len));
if from >= stored_len {
return Ok(vec![]);
}
let mmap = self.mmap().load();
let pages_meta = self.pages_meta.load();
let mut decoded_page: Option<(usize, Vec<T>)> = None;
(from..to)
.map(|index| {
Self::cached_get_stored__(index, &mmap, stored_len, &mut decoded_page, &pages_meta)
.map(|opt| opt.unwrap())
})
.collect::<Result<Vec<_>>>()
}
fn flush(&mut self) -> Result<()> {
@@ -260,7 +367,7 @@ where
values = if let Some(values) = self
.decoded_pages
.as_mut()
.and_then(|big_cache| big_cache.last_mut().and_then(|lock| lock.take()))
.and_then(|v| v.last_mut().and_then(|lock| lock.take()))
{
values
} else if self
@@ -314,13 +421,13 @@ where
pages_meta.push(page_index, page);
});
pages_meta.write()?;
let buf = compressed
.into_iter()
.flat_map(|(v, _)| v)
.collect::<Vec<_>>();
pages_meta.write()?;
if let Some(truncate_at) = truncate_at {
self.file_set_len(truncate_at)?;
}
@@ -335,21 +442,52 @@ where
}
fn truncate_if_needed(&mut self, index: I) -> Result<()> {
todo!()
}
let index = index.to_usize()?;
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
}
if index >= self.stored_len() {
return Ok(());
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
self.inner.guard()
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
self.inner.mut_guard()
if index == 0 {
self.reset()?;
return Ok(());
}
let page_index = Self::index_to_page_index(index);
let guard = self.guard().as_ref().unwrap();
let mut pages_meta = (**self.pages_meta.load()).clone();
let values = self.decode_page(page_index, guard)?;
let mut buf = vec![];
let mut page = pages_meta.truncate(page_index).unwrap();
let len = page.start;
let decoded_index = index % Self::PER_PAGE;
if decoded_index != 0 {
let chunk = &values[..decoded_index];
buf = Self::compress_page(chunk);
page.values_len = chunk.len() as u32;
page.bytes_len = buf.len() as u32;
pages_meta.push(page_index, page);
}
pages_meta.write()?;
self.pages_meta.store(Arc::new(pages_meta));
self.file_truncate_and_write_all(len, &buf)?;
self.reset_caches();
Ok(())
}
#[inline]
+51 -61
View File
@@ -10,9 +10,7 @@ use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use rayon::prelude::*;
use crate::{
DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version,
};
use crate::{DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Version};
#[derive(Debug)]
pub struct RawVec<I, T> {
@@ -73,25 +71,30 @@ where
type T = T;
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(|v| Value::Ref(v)));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let guard = self.guard.as_ref().unwrap();
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let index = index * Self::SIZE_OF_T;
let slice = &guard[index..(index + Self::SIZE_OF_T)];
let slice = &mmap[index..(index + Self::SIZE_OF_T)];
Self::T::try_read_from_bytes(slice)
.map(|v| Some(v))
.map_err(Error::from)
}
#[inline]
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
self.get_stored_(index, mmap)
}
let v = Self::T::try_read_from_bytes(slice)?;
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
&self.mmap
}
Ok(Some(Value::Owned(v)))
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
&self.guard
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
&mut self.guard
}
#[inline]
@@ -126,21 +129,40 @@ where
return Err(Error::UnsupportedUnflushedState);
}
let start = index.to_usize()?;
let stored_len = self.stored_len();
if start >= stored_len {
return Ok(());
}
let guard = self.mmap.load();
let start = index.to_usize()? * Self::SIZE_OF_T;
(start..stored_len).try_for_each(|index| {
let v = self.get_stored_(index, &guard)?.unwrap();
f((I::from(index), v, self as &mut dyn DynamicVec<I = I, T = T>))
})
}
dbg!(self.path());
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<T>> {
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
guard[start..]
.chunks(Self::SIZE_OF_T)
.enumerate()
.try_for_each(|(i, chunk)| {
let v = T::try_read_from_bytes(chunk).unwrap();
f((I::from(i), v, self as &mut dyn DynamicVec<I = I, T = T>))
})?;
let stored_len = self.stored_len();
Ok(())
let from = from.unwrap_or_default();
let to = to.map_or(stored_len, |i| i.min(stored_len));
if from >= stored_len {
return Ok(vec![]);
}
let mmap = self.mmap.load();
(from..to)
.map(|index| self.get_stored_(index, &mmap).map(|opt| opt.unwrap()))
.collect::<Result<Vec<_>>>()
}
fn flush(&mut self) -> Result<()> {
@@ -189,38 +211,6 @@ where
Ok(())
}
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
let guard = self.mmap.load();
let len = guard.len() / Self::SIZE_OF_T;
if len == 0 {
return Ok(vec![]);
}
let from = from.map_or(0, |i| Self::fix_i64(i, len, true));
let to = to.map_or(len, |i| Self::fix_i64(i, len, false));
Ok(guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T]
.chunks(Self::SIZE_OF_T)
.map(|chunk| T::try_read_from_bytes(chunk).unwrap())
.collect::<Vec<_>>())
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
&self.mmap
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
&self.guard
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
&mut self.guard
}
#[inline]
fn path(&self) -> &Path {
self.pathbuf.as_path()