indexer: improved rollback; global: snapshot

This commit is contained in:
nym21
2025-02-18 22:43:12 +01:00
parent a122333aaa
commit 15f2e05192
29 changed files with 708 additions and 523 deletions

View File

@@ -12,6 +12,7 @@ json = ["dep:serde", "dep:serde_json"]
[dependencies]
memmap2 = "0.9.5"
zerocopy = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
zerocopy = { workspace = true }

View File

@@ -12,6 +12,7 @@ use std::{
};
pub use memmap2;
use rayon::prelude::*;
pub use zerocopy;
mod enums;
@@ -71,9 +72,9 @@ pub struct StorableVec<I, T, const MODE: u8> {
/// In bytes
const MAX_PAGE_SIZE: usize = 4 * 4096;
// const ONE_MB: usize = 1000 * 1024;
const MAX_CACHE_SIZE: usize = usize::MAX;
// const MAX_CACHE_SIZE: usize = 100 * ONE_MB;
const ONE_MB: usize = 1000 * 1024;
// const MAX_CACHE_SIZE: usize = usize::MAX;
const MAX_CACHE_SIZE: usize = 100 * ONE_MB;
impl<I, T, const MODE: u8> StorableVec<I, T, MODE>
where
@@ -182,8 +183,7 @@ where
fn reset_cache(&mut self) -> io::Result<()> {
match MODE {
CACHED_GETS => {
// par_iter_mut ?
self.cache.iter_mut().for_each(|lock| {
self.cache.par_iter_mut().for_each(|lock| {
lock.take();
});
@@ -192,7 +192,7 @@ where
if self.cache.len() != len {
self.cache.resize_with(len, Default::default);
self.cache.shrink_to_fit();
// self.cache.shrink_to_fit();
}
Ok(())
@@ -270,10 +270,14 @@ where
return Ok(());
}
let mut bytes: Vec<u8> = Vec::with_capacity(self.pushed_len() * Self::SIZE_OF_T);
let mut bytes: Vec<u8> = vec![0; self.pushed_len() * Self::SIZE_OF_T];
let unsafe_bytes = UnsafeSlice::new(&mut bytes);
mem::take(&mut self.pushed)
.into_iter()
.for_each(|v| bytes.extend_from_slice(v.as_bytes()));
.into_par_iter()
.enumerate()
.for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()));
self.file.write_all(&bytes)?;
@@ -291,8 +295,7 @@ where
let value_at_index = self.open_file_at_then_read(index).ok();
self.file
.set_len(Self::index_to_byte_index(index.checked_sub(1).unwrap_or_default()))?;
self.file.set_len(Self::index_to_byte_index(index))?;
self.reset_disk_related_state()?;
@@ -429,6 +432,28 @@ where
Ok(self.get(index)?.map(|v| (*v).clone()).unwrap_or(Default::default()))
}
/// Visits every stored value starting at `index`, calling `f` with
/// `(index, value)` pairs: first the values persisted on disk, then the
/// values still buffered in `self.pushed` (not yet flushed).
///
/// Stops early and propagates the first error returned by `f`.
pub fn iter_from<F>(&self, mut index: I, mut f: F) -> Result<()>
where
    F: FnMut((I, Value<T>)) -> Result<()>,
{
    // Phase 1: on-disk values, from the caller's starting index up to
    // the current persisted length.
    let disk_len = I::from(Self::read_disk_len_(&self.file)?);
    while index < disk_len {
        // .unwrap(): index < disk_len, so the read is expected to succeed.
        f((index, self.get(index)?.unwrap()))?;
        index = index + 1;
    }
    // Phase 2: in-memory pushed values.
    // NOTE(review): this restarts from 0 regardless of the caller's
    // `index` — if the caller passed index > disk_len, pushed values
    // before (index - disk_len) are still visited. Confirm intended.
    let mut index = I::from(0);
    let pushed_len = I::from(self.pushed_len());
    let disk_len = Self::i_to_usize(disk_len)?;
    while index < pushed_len {
        // NOTE(review): `self.get(index)` is called with the
        // pushed-relative index while `f` receives the global index
        // (index + disk_len) — this relies on `get` resolving small
        // indices to the pushed buffer; verify against `get`'s contract.
        f(((index + disk_len), self.get(index)?.map(Value::from).unwrap()))?;
        index = index + 1;
    }
    Ok(())
}
#[inline]
pub fn push(&mut self, value: T) {
self.push_(value)

View File

@@ -1,3 +1,5 @@
// Submodules are re-exported flat so callers can use `UnsafeSlice` and
// the version types without naming the submodule path.
mod unsafe_slice;
mod version;
pub use unsafe_slice::*;
pub use version::*;

View File

@@ -0,0 +1,30 @@
use std::cell::UnsafeCell;
/// A shared view over a mutable slice that permits unsynchronized writes
/// to *disjoint* indices from multiple threads (e.g. rayon workers each
/// filling their own region of one output buffer).
///
/// Interior mutability comes from `UnsafeCell`; the view is `Copy` so it
/// can be captured by value inside parallel closures.
#[derive(Copy, Clone)]
pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell<T>]);

// SAFETY: sharing/sending the view is sound only because callers uphold
// the `write`/`copy_slice` contract of never touching the same index from
// two threads without synchronization; `T: Send + Sync` ensures the
// elements themselves may be accessed from any thread.
unsafe impl<T: Send + Sync> Send for UnsafeSlice<'_, T> {}
unsafe impl<T: Send + Sync> Sync for UnsafeSlice<'_, T> {}

impl<'a, T> UnsafeSlice<'a, T> {
    /// Wraps an exclusive slice borrow, re-typing it as a slice of cells.
    pub fn new(slice: &'a mut [T]) -> Self {
        let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
        // SAFETY: `UnsafeCell<T>` is `repr(transparent)` over `T`, so
        // `[T]` and `[UnsafeCell<T>]` share layout, and the incoming
        // `&mut` guarantees we start from the only live reference.
        Self(unsafe { &*ptr })
    }

    /// Writes `value` at index `i`. Panics if `i` is out of bounds.
    ///
    /// SAFETY: It is UB if two threads write to the same index without
    /// synchronization.
    pub fn write(&self, i: usize, value: T) {
        // SAFETY (in-block): `self.0[i]` is bounds-checked; exclusivity
        // of this index is the caller's obligation (see above).
        unsafe {
            *self.0[i].get() = value;
        }
    }

    /// Copies `slice` into the view starting at `start` as one bulk
    /// `memcpy` instead of one cell-write per element (this sits on the
    /// hot flush path). Panics if `start + slice.len()` exceeds the
    /// underlying length; a no-op for an empty `slice`.
    ///
    /// The same disjointness rule as `write` applies to the whole
    /// destination range.
    pub fn copy_slice(&self, start: usize, slice: &[T])
    where
        T: Copy,
    {
        // Check the whole destination range up front (the element-wise
        // version checked each index individually).
        let end = start
            .checked_add(slice.len())
            .expect("copy_slice: index overflow");
        assert!(end <= self.0.len(), "copy_slice: out of bounds");
        // SAFETY: the range [start, end) is in bounds (checked above);
        // `UnsafeCell<T>` is transparent over `T`, so the cast yields a
        // valid destination pointer; `T: Copy` makes a bitwise copy
        // valid; `slice` is a live shared borrow and therefore cannot
        // alias the exclusively-borrowed buffer behind the cells.
        unsafe {
            let dst = self.0.as_ptr().add(start) as *mut T;
            std::ptr::copy_nonoverlapping(slice.as_ptr(), dst, slice.len());
        }
    }
}