vec: caching only in iter

This commit is contained in:
nym21
2025-04-30 18:29:18 +02:00
parent 664b125ce2
commit 700352ec45
22 changed files with 411 additions and 573 deletions
+10 -35
View File
@@ -8,15 +8,12 @@ mod structs;
mod traits;
mod variants;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use std::path::{Path, PathBuf};
use arc_swap::{ArcSwap, Guard};
use arc_swap::ArcSwap;
use axum::response::Response;
pub use enums::*;
use memmap2::Mmap;
pub use memmap2::Mmap;
pub use structs::*;
pub use traits::*;
use variants::*;
@@ -46,13 +43,6 @@ where
}
}
pub fn enable_large_cache_if_needed(&mut self) {
match self {
StoredVec::Compressed(v) => v.enable_large_cache(),
Self::Raw(_) => {}
}
}
pub fn iter(&self) -> StoredVecIterator<'_, I, T> {
self.into_iter()
}
@@ -73,10 +63,10 @@ where
type T = T;
#[inline]
fn get_stored_(&self, index: usize, guard: &Mmap) -> Result<Option<T>> {
fn read_(&self, index: usize, guard: &Mmap) -> Result<Option<T>> {
match self {
StoredVec::Raw(v) => v.get_stored_(index, guard),
StoredVec::Compressed(v) => v.get_stored_(index, guard),
StoredVec::Raw(v) => v.read_(index, guard),
StoredVec::Compressed(v) => v.read_(index, guard),
}
}
@@ -88,21 +78,6 @@ where
}
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
match self {
StoredVec::Raw(v) => v.guard(),
StoredVec::Compressed(v) => v.guard(),
}
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
match self {
StoredVec::Raw(v) => v.mut_guard(),
StoredVec::Compressed(v) => v.mut_guard(),
}
}
#[inline]
fn stored_len(&self) -> usize {
match self {
@@ -250,21 +225,21 @@ where
{
#[inline]
pub fn unwrap_get_inner(&mut self, i: I) -> T {
self.get_(i.unwrap_to_usize()).unwrap().1.into_inner()
self.get_(i.unwrap_to_usize()).unwrap().into_inner()
}
#[inline]
pub fn get_inner(&mut self, i: I) -> Option<T> {
self.get_(i.unwrap_to_usize()).map(|(_, v)| v.into_inner())
self.get_(i.unwrap_to_usize()).map(|v| v.into_inner())
}
#[inline]
pub fn get(&mut self, i: I) -> Option<(I, Value<'_, T>)> {
pub fn get(&mut self, i: I) -> Option<Value<'_, T>> {
self.get_(i.unwrap_to_usize())
}
#[inline]
pub fn get_(&mut self, i: usize) -> Option<(I, Value<'_, T>)> {
pub fn get_(&mut self, i: usize) -> Option<Value<'_, T>> {
match self {
Self::Compressed(iter) => iter.get_(i),
Self::Raw(iter) => iter.get_(i),
+23 -44
View File
@@ -1,45 +1,42 @@
use std::{path::Path, sync::Arc};
use std::path::Path;
use arc_swap::{ArcSwap, Guard};
use arc_swap::ArcSwap;
use memmap2::Mmap;
use crate::{Error, Result, Value};
use crate::{Result, Value};
use super::{StoredIndex, StoredType};
pub trait DynamicVec: Send + Sync {
type I: StoredIndex;
type T: StoredType;
const SIZE_OF_T: usize = size_of::<Self::T>();
#[inline]
fn get(&self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
fn read(&self, index: Self::I, mmap: &Mmap) -> Result<Option<Self::T>> {
self.read_(index.to_usize()?, mmap)
}
fn read_(&self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
#[inline]
fn get_or_read(&self, index: Self::I, mmap: &Mmap) -> Result<Option<Value<Self::T>>> {
self.get_or_read_(index.to_usize()?, mmap)
}
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result<Option<Value<Self::T>>> {
let stored_len = mmap.len() / Self::SIZE_OF_T;
if index >= stored_len {
let pushed = self.pushed();
let j = index - stored_len;
if j >= pushed.len() {
return Ok(None);
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
Ok(pushed.get(j).map(Value::Ref))
} else {
Ok(self.read_(index, mmap)?.map(Value::Owned))
}
Ok(self
.get_stored_(index.to_usize()?, self.guard().as_ref().unwrap())?
.map(Value::Owned))
}
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
// fn last(&self) -> Result<Option<Value<Self::T>>> {
// let len = self.len();
// if len == 0 {
// return Ok(None);
// }
// self.get_(len - 1)
// }
#[inline]
fn len(&self) -> usize {
@@ -52,9 +49,6 @@ pub trait DynamicVec: Send + Sync {
fn mmap(&self) -> &ArcSwap<Mmap>;
fn guard(&self) -> &Option<Guard<Arc<Mmap>>>;
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>>;
fn stored_len(&self) -> usize;
fn pushed(&self) -> &[Self::T];
@@ -67,21 +61,6 @@ pub trait DynamicVec: Send + Sync {
fn push(&mut self, value: Self::T) {
self.mut_pushed().push(value)
}
#[inline]
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
let stored_len = self.stored_len();
if index >= stored_len {
let index = index - stored_len;
if index >= self.pushed_len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Err(Error::IndexTooLow)
}
}
fn path(&self) -> &Path;
}
-8
View File
@@ -20,8 +20,6 @@ where
I: StoredIndex,
T: StoredType,
{
const SIZE_OF_T: usize = size_of::<Self::T>();
fn open_file(&self) -> io::Result<File> {
Self::open_file_(&self.path_vec())
}
@@ -70,12 +68,6 @@ where
fn update_mmap(&mut self, file: File) -> Result<()> {
let mmap = Self::new_mmap(file)?;
self.mmap().store(mmap);
if self.guard().is_some() {
let guard = self.mmap().load();
self.mut_guard().replace(guard);
} else {
unreachable!("This function shouldn't be called in a cloned instance")
}
Ok(())
}
+43 -134
View File
@@ -2,7 +2,7 @@ use std::{
fs::{self, File},
mem,
path::Path,
sync::{Arc, OnceLock},
sync::Arc,
};
use arc_swap::{ArcSwap, Guard};
@@ -23,7 +23,6 @@ pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB;
#[derive(Debug)]
pub struct CompressedVec<I, T> {
inner: RawVec<I, T>,
decoded_pages: Option<Vec<OnceLock<Vec<T>>>>,
pages_meta: Arc<ArcSwap<CompressedPagesMetadata>>,
}
@@ -67,33 +66,10 @@ where
Ok(Self {
inner: RawVec::import(path, version)?,
decoded_pages: None,
pages_meta: Arc::new(ArcSwap::new(Arc::new(CompressedPagesMetadata::read(path)?))),
})
}
fn cached_get_stored__(
index: usize,
mmap: &Mmap,
stored_len: usize,
decoded_page: &mut Option<(usize, Vec<T>)>,
compressed_pages_meta: &CompressedPagesMetadata,
) -> Result<Option<T>> {
let page_index = Self::index_to_page_index(index);
if decoded_page.as_ref().is_none_or(|b| b.0 != page_index) {
let values = Self::decode_page_(stored_len, page_index, mmap, compressed_pages_meta)?;
decoded_page.replace((page_index, values));
}
Ok(decoded_page
.as_ref()
.unwrap()
.1
.get(index % Self::PER_PAGE)
.cloned())
}
fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result<Vec<T>> {
Self::decode_page_(self.stored_len(), page_index, mmap, &self.pages_meta.load())
}
@@ -140,40 +116,6 @@ where
zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap()
}
pub fn enable_large_cache(&mut self) {
self.decoded_pages.replace(vec![]);
self.reset_large_cache();
}
pub fn disable_large_cache(&mut self) {
self.decoded_pages.take();
}
fn reset_large_cache(&mut self) {
let stored_len = self.stored_len();
if let Some(pages) = self.decoded_pages.as_mut() {
pages.par_iter_mut().for_each(|lock| {
lock.take();
});
let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
if pages.len() != len {
pages.resize_with(len, Default::default);
}
}
}
pub fn large_cache_len(&self) -> usize {
self.decoded_pages.as_ref().map_or(0, |v| v.len())
}
fn reset_caches(&mut self) {
self.reset_large_cache();
}
#[inline(always)]
fn index_to_page_index(index: usize) -> usize {
index / Self::PER_PAGE
@@ -219,32 +161,9 @@ where
type T = T;
#[inline]
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let cached_start = self
.stored_len()
.checked_sub(Self::CACHE_LENGTH)
.unwrap_or_default();
let decoded_index = index % Self::PER_PAGE;
if index >= cached_start {
let trimmed_index = index - cached_start;
if let Some(decoded_pages) = self.decoded_pages.as_ref() {
let decoded_page = decoded_pages
.get(Self::index_to_page_index(trimmed_index))
.unwrap();
return Ok(decoded_page
.get_or_init(|| {
self.decode_page(Self::index_to_page_index(index), mmap)
.unwrap()
})
.get(decoded_index)
.cloned());
}
}
fn read_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let page_index = Self::index_to_page_index(index);
let decoded_index = index % Self::PER_PAGE;
Ok(self
.decode_page(page_index, mmap)?
@@ -258,14 +177,6 @@ where
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
self.inner.guard()
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
self.inner.mut_guard()
}
fn stored_len(&self) -> usize {
Self::stored_len_(&self.pages_meta.load())
}
@@ -328,24 +239,16 @@ where
let last_page_index = pages_meta.len() - 1;
values = if let Some(values) = self
.decoded_pages
.as_mut()
.and_then(|v| v.last_mut().and_then(|lock| lock.take()))
{
values
} else {
Self::decode_page_(
stored_len,
last_page_index,
self.guard().as_ref().unwrap(),
&pages_meta,
)
.inspect_err(|_| {
dbg!(last_page_index, &pages_meta);
})
.unwrap()
};
values = Self::decode_page_(
stored_len,
last_page_index,
&self.mmap().load(),
&pages_meta,
)
.inspect_err(|_| {
dbg!(last_page_index, &pages_meta);
})
.unwrap();
truncate_at.replace(pages_meta.pop().unwrap().start);
starting_page_index = last_page_index;
@@ -394,8 +297,6 @@ where
self.pages_meta.store(Arc::new(pages_meta));
self.reset_caches();
Ok(())
}
@@ -404,7 +305,6 @@ where
pages_meta.truncate(0);
pages_meta.write()?;
self.pages_meta.store(Arc::new(pages_meta));
self.reset_caches();
self.file_truncate_and_write_all(0, &[])
}
@@ -424,9 +324,7 @@ where
let page_index = Self::index_to_page_index(index);
let guard = self.guard().as_ref().unwrap();
let values = self.decode_page(page_index, guard)?;
let values = self.decode_page(page_index, &self.mmap().load())?;
let mut buf = vec![];
let mut page = pages_meta.truncate(page_index).unwrap();
@@ -452,8 +350,6 @@ where
self.file_truncate_and_write_all(len, &buf)?;
self.reset_caches();
Ok(())
}
@@ -471,7 +367,6 @@ where
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
decoded_pages: None,
pages_meta: self.pages_meta.clone(),
}
}
@@ -493,6 +388,9 @@ where
I: StoredIndex,
T: StoredType,
{
const SIZE_OF_T: usize = size_of::<T>();
const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
#[inline]
pub fn set(&mut self, i: I) -> &mut Self {
self.index = i.unwrap_to_usize();
@@ -505,14 +403,14 @@ where
}
#[inline]
pub fn get(&mut self, i: I) -> Option<(I, Value<'_, T>)> {
self.set(i).next()
pub fn get(&mut self, i: I) -> Option<Value<'_, T>> {
self.set(i).next().map(|(_, v)| v)
}
#[inline]
pub fn get_(&mut self, i: usize) -> Option<(I, Value<'_, T>)> {
pub fn get_(&mut self, i: usize) -> Option<Value<'_, T>> {
self.set_(i);
self.next()
self.next().map(|(_, v)| v)
}
}
@@ -537,15 +435,25 @@ where
.get(j)
.map(|v| (I::from(i), Value::Ref(v)))
} else {
CompressedVec::<I, T>::cached_get_stored__(
i,
mmap,
stored_len,
&mut self.decoded_page,
&self.pages_meta,
)
.unwrap()
.map(|v| (I::from(i), Value::Owned(v)))
let page_index = i / Self::PER_PAGE;
if self.decoded_page.as_ref().is_none_or(|b| b.0 != page_index) {
let values = CompressedVec::<I, T>::decode_page_(
stored_len,
page_index,
mmap,
&self.pages_meta,
)
.unwrap();
self.decoded_page.replace((page_index, values));
}
self.decoded_page
.as_ref()
.unwrap()
.1
.get(i % Self::PER_PAGE)
.map(|v| (I::from(i), Value::Owned(v.clone())))
};
self.index += 1;
@@ -562,8 +470,9 @@ where
if len == 0 {
return None;
}
self.get_(len - 1)
.map(|(i, v)| (i, Value::Owned(v.into_inner())))
let i = len - 1;
self.get_(i)
.map(|v| (I::from(i), Value::Owned(v.into_inner())))
}
}
+19 -45
View File
@@ -20,7 +20,6 @@ pub struct RawVec<I, T> {
pathbuf: PathBuf,
// Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
mmap: Arc<ArcSwap<Mmap>>,
guard: Option<Guard<Arc<Mmap>>>,
pushed: Vec<T>,
phantom: PhantomData<I>,
}
@@ -51,11 +50,9 @@ where
let file = Self::open_file_(Self::path_vec_(path).as_path())?;
let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?));
let guard = Some(mmap.load());
Ok(Self {
mmap,
guard,
version,
pathbuf: path.to_owned(),
pushed: vec![],
@@ -90,7 +87,7 @@ where
type T = T;
#[inline]
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
fn read_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let index = index * Self::SIZE_OF_T;
let slice = &mmap[index..(index + Self::SIZE_OF_T)];
Self::T::try_read_from_bytes(slice)
@@ -103,22 +100,9 @@ where
&self.mmap
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
&self.guard
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
&mut self.guard
}
#[inline]
fn stored_len(&self) -> usize {
if let Some(guard) = self.guard() {
guard.len() / Self::SIZE_OF_T
} else {
self.mmap.load().len() / Self::SIZE_OF_T
}
self.mmap.load().len() / Self::SIZE_OF_T
}
#[inline]
@@ -218,9 +202,7 @@ where
Self {
version: self.version,
pathbuf: self.pathbuf.clone(),
// Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
mmap: self.mmap.clone(),
guard: None,
pushed: vec![],
phantom: PhantomData,
}
@@ -239,8 +221,6 @@ where
I: StoredIndex,
T: StoredType,
{
const SIZE_OF_T: usize = size_of::<T>();
#[inline]
pub fn set(&mut self, i: I) -> &mut Self {
self.index = i.unwrap_to_usize();
@@ -253,14 +233,14 @@ where
}
#[inline]
pub fn get(&mut self, i: I) -> Option<(I, Value<'_, T>)> {
self.set(i).next()
pub fn get(&mut self, i: I) -> Option<Value<'_, T>> {
self.set(i).next().map(|(_, v)| v)
}
#[inline]
pub fn get_(&mut self, i: usize) -> Option<(I, Value<'_, T>)> {
pub fn get_(&mut self, i: usize) -> Option<Value<'_, T>> {
self.set_(i);
self.next()
self.next().map(|(_, v)| v)
}
}
@@ -272,28 +252,21 @@ where
type Item = (I, Value<'a, T>);
fn next(&mut self) -> Option<Self::Item> {
let mmap = &self.guard;
let vec = self.vec;
let i = self.index;
let index = self.index;
let stored_len = mmap.len() / Self::SIZE_OF_T;
let opt = self
.vec
.get_or_read_(index, mmap)
.unwrap()
.map(|v| (I::from(index), v));
let result = if i >= stored_len {
let j = i - stored_len;
if j >= vec.pushed_len() {
return None;
}
vec.pushed().get(j).map(|v| (I::from(i), Value::Ref(v)))
} else {
vec.get_stored_(i, mmap)
.unwrap()
.map(|v| (I::from(i), Value::Owned(v)))
};
if opt.is_some() {
self.index += 1;
}
self.index += 1;
result
opt
}
#[inline]
fn last(mut self) -> Option<Self::Item>
where
Self: Sized,
@@ -302,8 +275,9 @@ where
if len == 0 {
return None;
}
self.get_(len - 1)
.map(|(i, v)| (i, Value::Owned(v.into_inner())))
let i = len - 1;
self.get_(i)
.map(|v| (I::from(i), Value::Owned(v.into_inner())))
}
}