storable_vec: add modes

This commit is contained in:
nym21
2025-02-04 20:56:48 +01:00
parent 42c996e16e
commit d11a1622f8
16 changed files with 696 additions and 495 deletions

View File

@@ -3,30 +3,55 @@ use std::{
io,
};
use crate::Version;
/// Crate-wide result alias: `Error` is the default error type.
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub enum Error {
    /// The on-disk version matches the expected one only after a byte swap,
    /// i.e. the data was written on a machine with the opposite endianness.
    WrongEndian,
    /// The on-disk version differs from the version the caller expects.
    DifferentVersion { found: Version, expected: Version },
    /// The requested page could not be found in the mmap cache vec.
    MmapsVecIsTooSmall,
    /// Wrapper around a low-level I/O failure.
    IO(io::Error),
    /// Wrapper around a (de)serialization failure from `unsafe_slice_serde`.
    UnsafeSliceSerde(unsafe_slice_serde::Error),
    /// Index is past the end of both the on-disk data and the pending pushes.
    IndexTooHigh,
    /// Index falls in the already-flushed (on-disk) region; also used internally
    /// as a control-flow signal when resolving an index against `pushed`.
    IndexTooLow,
    // NOTE(review): the two variants below are not constructed anywhere in this
    // chunk — confirm their exact semantics at the use sites.
    ExpectFileToHaveIndex,
    ExpectVecToHaveIndex,
    /// A key could not be converted into a `usize` index.
    FailedKeyTryIntoUsize,
    /// The operation requires all pending pushes to be flushed first.
    UnsupportedUnflushedState,
}
impl From<io::Error> for Error {
fn from(value: io::Error) -> Self {
Self::IO(value)
}
}
impl From<unsafe_slice_serde::Error> for Error {
fn from(value: unsafe_slice_serde::Error) -> Self {
Self::UnsafeSliceSerde(value)
}
}
impl fmt::Display for Error {
// This trait requires `fmt` with this exact signature.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::WrongEndian => write!(f, "Wrong endian"),
Error::DifferentVersion { found, expected } => {
write!(f, "Different version; found: {found:?}, expected: {expected:?}")
}
Error::MmapsVecIsTooSmall => write!(f, "Mmaps vec is too small"),
Error::IO(error) => Debug::fmt(&error, f),
Error::UnsafeSliceSerde(error) => Debug::fmt(&error, f),
Error::IndexTooHigh => write!(f, "Index too high"),
Error::IndexTooLow => write!(f, "Index too low"),
Error::ExpectFileToHaveIndex => write!(f, "Expect file to have index"),
Error::ExpectVecToHaveIndex => write!(f, "Expect vec to have index"),
Error::FailedKeyTryIntoUsize => write!(f, "Failed to convert key to usize"),
Error::UnsupportedUnflushedState => {
write!(f, "Unsupported unflush state, please flush before using this function")
}
}
}
}

View File

@@ -21,20 +21,22 @@ pub use enums::*;
pub use structs::*;
pub use traits::*;
type Buffer = Vec<u8>;
/// Uses `Mmap` instead of `File`
///
/// Used in `/indexer`
const CACHED: u8 = 0;
pub const CACHED_GETS: u8 = 0;
/// Will use the same `File` for every read, so not thread safe
///
/// Used in `/computer`
const RAW_SYNC: u8 = 1;
pub const SINGLE_THREAD: u8 = 1;
/// Will spin up a new `File` for every read
///
/// Used in `/server`
const RAW_ASYNC: u8 = 2;
pub const ASYNC_READ_ONLY: u8 = 2;
///
/// A very small, fast, efficient and simple storable Vec
@@ -48,12 +50,16 @@ const RAW_ASYNC: u8 = 2;
/// If you don't call `.flush()` it just acts as a normal Vec
///
#[derive(Debug)]
pub struct StorableVec<I, T> {
pub struct StorableVec<I, T, const MODE: u8> {
pathbuf: PathBuf,
unsafe_file: File,
file: File,
/// **Number of values NOT number of bytes**
file_len: usize,
/// Only for SINGLE_THREAD
file_position: u64,
buf: Buffer,
/// Only for CACHED_GETS
cache: Vec<OnceLock<Box<Mmap>>>, // Boxed Mmap to reduce the size of the Lock (from 24 to 16)
buf: Vec<u8>,
disk_len: usize,
pushed: Vec<T>,
// updated: BTreeMap<usize, T>,
// inserted: BTreeMap<usize, T>,
@@ -69,7 +75,7 @@ const ONE_MB: usize = 1000 * 1024;
const MAX_CACHE_SIZE: usize = 100 * ONE_MB;
// const MAX_CACHE_SIZE: usize = 100 * ONE_MB;
impl<I, T> StorableVec<I, T>
impl<I, T, const MODE: u8> StorableVec<I, T, MODE>
where
I: StorableVecIndex,
T: StorableVecType,
@@ -80,24 +86,44 @@ where
pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T;
pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE;
pub fn import(path: &Path, version: Version) -> Result<Self, io::Error> {
/// Same as import but will remove the folder if the endian or the version is different, so be careful !
pub fn forced_import(path: &Path, version: Version) -> Result<Self> {
let res = Self::import(path, version);
match res {
Err(Error::WrongEndian) | Err(Error::DifferentVersion { found: _, expected: _ }) => {
fs::remove_dir_all(path)?;
Self::import(path, version)
}
_ => res,
}
}
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
let path_version = Self::path_version_(path);
let is_same_version =
Version::try_from(path_version.as_path()).is_ok_and(|prev_version| version == prev_version);
if !is_same_version {
fs::remove_dir_all(path)?;
if let Ok(prev_version) = Version::try_from(path_version.as_path()) {
if prev_version != version {
if prev_version.swap_bytes() == version {
return Err(Error::WrongEndian);
}
return Err(Error::DifferentVersion {
found: prev_version,
expected: version,
});
}
}
version.write(&path_version)?;
let unsafe_file = Self::open_file_(&Self::path_vec_(path))?;
let file = Self::open_file_(&Self::path_vec_(path))?;
let mut this = Self {
let mut slf = Self {
pathbuf: path.to_owned(),
disk_len: Self::disk_len(&unsafe_file)?,
unsafe_file,
buf: vec![0; Self::SIZE_OF_T],
file_position: 0,
file_len: Self::read_disk_len_(&file)?,
file,
buf: Self::create_buffer(),
cache: vec![],
pushed: vec![],
// updated: BTreeMap::new(),
@@ -108,35 +134,20 @@ where
// opened_mmaps: AtomicUsize::new(0),
};
// TODO: Only if write mode
this.reset_cache();
slf.reset_disk_related_state()?;
Ok(this)
Ok(slf)
}
pub fn disk_len(file: &File) -> io::Result<usize> {
Ok(Self::byte_index_to_index(file.metadata()?.len() as usize))
#[inline]
fn create_buffer() -> Buffer {
    // One zeroed slot of exactly SIZE_OF_T bytes, reused for single-value reads.
    vec![0; Self::SIZE_OF_T]
}
pub fn reset_cache(&mut self) {
// par_iter_mut ?
self.cache.iter_mut().for_each(|lock| {
lock.take();
});
let len = (self.disk_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
if self.cache.len() != len {
self.cache.resize_with(len, Default::default);
self.cache.shrink_to_fit();
}
fn open_file(&self) -> io::Result<File> {
Self::open_file_(&self.path_vec())
}
fn open_file(&self) -> Result<File, Error> {
Self::open_file_(&self.path_vec()).map_err(Error::IO)
}
fn open_file_(path: &Path) -> Result<File, io::Error> {
fn open_file_(path: &Path) -> io::Result<File> {
OpenOptions::new()
.read(true)
.create(true)
@@ -145,199 +156,104 @@ where
.open(path)
}
#[inline]
fn index_to_byte_range(index: usize) -> Range<usize> {
let index = Self::index_to_byte_index(index) % Self::PAGE_SIZE;
index..(index + Self::SIZE_OF_T)
// Number of values (NOT bytes) currently in this vec's backing file.
fn read_disk_len(&self) -> io::Result<usize> {
    Self::read_disk_len_(&self.file)
}
fn read_disk_len_(file: &File) -> io::Result<usize> {
    // File length in bytes divided by the per-value size.
    Ok(Self::byte_index_to_index(file.metadata()?.len() as usize))
}
#[inline]
fn index_to_byte_index(index: usize) -> usize {
index * Self::SIZE_OF_T
fn reset_disk_related_state(&mut self) -> io::Result<()> {
    // Re-open the backing file and re-derive everything that mirrors its on-disk
    // contents: the cached length, the seek-position tracker, and the mmap cache.
    self.file = self.open_file()?;
    self.file_len = self.read_disk_len()?;
    self.file_position = 0;
    self.reset_cache()
}
#[inline]
fn byte_index_to_index(byte_index: usize) -> usize {
byte_index / Self::SIZE_OF_T
}
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
if index >= self.disk_len {
let index = index - self.disk_len;
if index >= self.pushed.len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Ok(None)
}
}
#[inline]
pub fn cached_get(&self, index: I) -> Result<Option<Value<'_, T>>> {
self.cached_get_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?)
}
fn cached_get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed.get(index).map(|v| Value::Ref(v)));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(error) => return Err(error),
}
// if !self.updated.is_empty() {
// if let Some(v) = self.updated.get(&index) {
// return Ok(Some(v));
// }
// }
let page_index = index / Self::PER_PAGE;
let last_index = self.disk_len - 1;
let max_page_index = last_index / Self::PER_PAGE;
let min_page_index = (max_page_index + 1).checked_sub(self.cache.len()).unwrap_or_default();
// let min_open_page = self.min.load(AtomicOrdering::SeqCst);
// if self.min.load(AtomicOrdering::SeqCst) {
// self.min.set(value)
// }
if page_index >= min_page_index {
let mmap = &**self
.cache
.get(page_index - min_page_index)
.ok_or(Error::MmapsVecIsTooSmall)?
.get_or_init(|| {
Box::new(unsafe {
MmapOptions::new()
.len(Self::PAGE_SIZE)
.offset((page_index * Self::PAGE_SIZE) as u64)
.map(&self.unsafe_file)
.unwrap()
})
fn reset_cache(&mut self) -> io::Result<()> {
match MODE {
CACHED_GETS => {
// par_iter_mut ?
self.cache.iter_mut().for_each(|lock| {
lock.take();
});
let range = Self::index_to_byte_range(index);
let len = (self.file_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
let slice = &mmap[range];
if self.cache.len() != len {
self.cache.resize_with(len, Default::default);
self.cache.shrink_to_fit();
}
Ok(Some(Value::Ref(
T::unsafe_try_from_slice(slice).map_err(Error::UnsafeSliceSerde)?,
)))
} else {
let (mut file, mut buf) = self.prepare_to_read()?;
Self::seek_(&mut file, index)?;
let value = self.read_exact(&mut file, &mut buf)?;
Ok(Some(Value::Owned(value.to_owned())))
Ok(())
}
_ => Ok(()),
}
}
pub fn get_or_default(&self, index: I) -> Result<T>
where
T: Default + Clone,
{
Ok(self
.cached_get(index)?
.map(|v| (*v).clone())
.unwrap_or(Default::default()))
}
// #[inline]
// fn open_file_at_then_read(&self, index: I) -> Result<T> {
// self.open_file_at_then_read_(Self::i_to_usize(index)?)
// }
fn open_file_at_then_read(&self, index: usize) -> Result<T> {
// let (mut file, mut buf) = self.open_file_at(index)?;
let mut file = self.open_file()?;
let mut buf = Self::create_buffer();
pub fn seek(file: &mut File, index: I) -> Result<()> {
Self::seek_(file, index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?)
}
pub fn seek_(file: &mut File, index: usize) -> Result<()> {
let byte_index = Self::index_to_byte_index(index);
file.seek(SeekFrom::Start(byte_index as u64)).map_err(Error::IO)?;
Ok(())
}
Self::seek(&mut file, byte_index)?;
pub fn iter<F>(&self, f: F) -> Result<()>
where
F: FnMut((I, &T)) -> Result<()>,
{
self.iter_from(I::from(0_usize), f)
Ok(Self::read_exact(&mut file, &mut buf)?.to_owned())
}
pub fn prepare_to_read(&self) -> Result<(File, Vec<u8>)> {
let file = self.open_file()?;
let buf = vec![0; Self::SIZE_OF_T];
Ok((file, buf))
// #[inline]
// fn open_file_at(&self, index: I) -> Result<(File, Buffer)> {
// self.open_file_at_(Self::i_to_usize(index)?)
// }
// fn open_file_at(&self, index: usize) -> Result<(File, Buffer)> {
// let mut file = self.open_file()?;
// let buf = Self::create_buffer();
// let byte_index = Self::index_to_byte_index(index);
// Self::seek(&mut file, byte_index)?;
// Ok((file, buf))
// }
// #[inline]
// fn seek_if_needed_(file: &mut File, index: I) -> Result<u64> {
// Self::seek_if_needed__(file, Self::i_to_usize(index)?).map_err(Error::IO)
// }
// #[inline]
// fn seek_if_needed(file: &mut File, index: usize) -> io::Result<u64> {
// let byte_index = Self::index_to_byte_index(index);
// if file.stream_position()? != byte_index {
// Self::seek(file, byte_index)?;
// }
// Ok(byte_index)
// }
#[inline]
fn seek(file: &mut File, byte_index: u64) -> io::Result<u64> {
    // Absolute seek; returns the resulting position (callers use it to track
    // `file_position` in SINGLE_THREAD mode).
    file.seek(SeekFrom::Start(byte_index))
}
pub fn prepare_to_read_at(&self, index: I) -> Result<(File, Vec<u8>)> {
self.prepare_to_read_at_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?)
}
pub fn prepare_to_read_at_(&self, index: usize) -> Result<(File, Vec<u8>)> {
let (mut file, buf) = self.prepare_to_read()?;
Self::seek_(&mut file, index)?;
Ok((file, buf))
}
pub fn read_exact<'a>(&self, file: &'a mut File, buf: &'a mut [u8]) -> Result<&'a T> {
file.read_exact(buf).map_err(Error::IO)?;
let v = T::unsafe_try_from_slice(&buf[..]).map_err(Error::UnsafeSliceSerde)?;
fn read_exact<'a>(file: &'a mut File, buf: &'a mut [u8]) -> Result<&'a T> {
file.read_exact(buf)?;
let v = T::unsafe_try_from_slice(&buf[..])?;
Ok(v)
}
pub fn iter_from<F>(&self, index: I, mut f: F) -> Result<()>
where
F: FnMut((I, &T)) -> Result<()>,
{
let (mut file, mut buf) = self.prepare_to_read()?;
let disk_len = Self::disk_len(&file).map_err(Error::IO)?;
Self::seek(&mut file, index)?;
let mut i: usize = index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?;
while i < disk_len {
let v = self.read_exact(&mut file, &mut buf)?;
f((I::from(i), v))?;
i += 1;
}
i = 0;
while i < self.pushed_len() {
f((I::from(i + disk_len), self.pushed.get(i).as_ref().unwrap()))?;
i += 1;
}
Ok(())
}
#[allow(unused)]
pub fn first(&self) -> Result<Option<Value<'_, T>>> {
self.cached_get_(0)
}
#[allow(unused)]
pub fn last(&self) -> Result<Option<Value<'_, T>>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
self.cached_get_(len - 1)
}
pub fn push(&mut self, value: T) {
#[inline]
fn push_(&mut self, value: T) {
self.pushed.push(value)
}
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
self.push_if_needed_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?, value)
}
fn push_if_needed_(&mut self, index: usize, value: T) -> Result<()> {
let len = self.len();
match len.cmp(&index) {
#[inline]
fn push_if_needed_(&mut self, index: I, value: T) -> Result<()> {
match self.pushed_len().cmp(&Self::i_to_usize(index)?) {
Ordering::Greater => {
// dbg!(len, index, &self.pathbuf);
// panic!();
Ok(())
}
Ordering::Equal => {
self.push(value);
self.pushed.push(value);
Ok(())
}
Ordering::Less => {
@@ -345,7 +261,34 @@ where
Err(Error::IndexTooHigh)
}
}
// Self::push_to_vec_if_needed(&mut self.pushed, index, value)
}
// #[inline]
// fn push_if_needed__(&mut self, index: usize, value: T) -> Result<()> {
// Self::push_to_vec_if_needed_(&mut self.pushed, index, value)
// }
// #[inline]
// fn push_to_vec_if_needed(vec: &mut Vec<T>, index: I, value: T) -> Result<()> {
// Self::push_to_vec_if_needed_(vec, Self::i_to_usize(index)?, value)
// }
// fn push_to_vec_if_needed_(vec: &mut Vec<T>, index: usize, value: T) -> Result<()> {
// let len = vec.len();
// match len.cmp(&index) {
// Ordering::Greater => {
// // dbg!(len, index, &self.pathbuf);
// // panic!();
// Ok(())
// }
// Ordering::Equal => {
// vec.push(value);
// Ok(())
// }
// Ordering::Less => {
// dbg!(index, value);
// Err(Error::IndexTooHigh)
// }
// }
// }
// pub fn update(&mut self, index: I, value: T) -> Result<()> {
// self._update(index.into(), value)
@@ -387,77 +330,332 @@ where
// self.removed.insert(index);
// }
#[inline]
pub fn len(&self) -> usize {
self.disk_len + self.pushed_len()
self.file_len + self.pushed_len()
}
#[inline]
pub fn pushed_len(&self) -> usize {
    // Number of values pushed but not yet flushed to disk.
    self.pushed.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
    // True when there is nothing on disk and nothing pending.
    self.len() == 0
}
#[inline]
pub fn has(&self, index: I) -> Result<bool> {
Ok(self.has_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?))
Ok(self.has_(Self::i_to_usize(index)?))
}
#[inline]
fn has_(&self, index: usize) -> bool {
index < self.len()
}
#[inline]
pub fn hasnt(&self, index: I) -> Result<bool> {
Ok(self.hasnt_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?))
self.has(index).map(|b| !b)
}
#[inline]
fn hasnt_(&self, index: usize) -> bool {
!self.has_(index)
}
// pub fn flush(&mut self) -> io::Result<()>
// where
// T: Bytes,
// {
// self.flush_(|bytes, v| bytes.extend_from_slice(&v.to_bytes()))
// }
pub fn flush(&mut self) -> io::Result<()> {
// self.flush_(|bytes, v| bytes.extend_from_slice(v.unsafe_as_slice()))
// }
// fn flush_<F>(&mut self, mut extend: F) -> io::Result<()>
// where
// F: FnMut(&mut Vec<u8>, T),
// {
self.reset_cache();
self.reset_disk_related_state()?;
if self.pushed.is_empty() {
return Ok(());
}
self.disk_len += self.pushed.len();
self.file_len += self.pushed.len();
let mut bytes: Vec<u8> = vec![];
mem::take(&mut self.pushed)
.into_iter()
.for_each(|v| bytes.extend_from_slice(v.unsafe_as_slice()));
// .for_each(|v| extend(&mut bytes, v));
self.unsafe_file.write_all(&bytes)?;
self.file.write_all(&bytes)?;
Ok(())
}
#[inline]
fn i_to_usize(index: I) -> Result<usize> {
index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)
}
#[inline]
fn byte_index_to_index(byte_index: usize) -> usize {
    // Byte offset -> value index (each value occupies SIZE_OF_T bytes).
    byte_index / Self::SIZE_OF_T
}
#[inline]
fn index_to_byte_index(index: usize) -> u64 {
    // Value index -> absolute byte offset in the backing file.
    (index * Self::SIZE_OF_T) as u64
}
#[inline]
fn index_to_byte_range(index: usize) -> Range<usize> {
    // Byte range of the value *within its page* (offset taken modulo PAGE_SIZE),
    // used to slice into a page-sized mmap.
    let index = (Self::index_to_byte_index(index) as usize) % Self::PAGE_SIZE;
    index..(index + Self::SIZE_OF_T)
}
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
if index >= self.file_len {
let index = index - self.file_len;
if index >= self.pushed.len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Err(Error::IndexTooLow)
}
}
#[inline]
pub fn path(&self) -> &Path {
    // Directory holding this vec's "vec" and "version" files.
    &self.pathbuf
}
#[inline]
// Path of this instance's data file.
fn path_vec(&self) -> PathBuf {
    Self::path_vec_(&self.pathbuf)
}
#[inline]
// Data file location: `<dir>/vec`.
fn path_vec_(path: &Path) -> PathBuf {
    path.join("vec")
}
#[inline]
// Version marker file location: `<dir>/version`.
fn path_version_(path: &Path) -> PathBuf {
    path.join("version")
}
}
impl<I, T> StorableVec<I, T, CACHED_GETS>
where
    I: StorableVecIndex,
    T: StorableVecType,
{
    /// Reads the value at `index`.
    ///
    /// Lookup order: pending (unflushed) pushes first, then a page-sized mmap from
    /// the cache, and finally a direct file read for pages outside the cache window.
    #[inline]
    pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
        self.get_(Self::i_to_usize(index)?)
    }

    fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
        match self.index_to_pushed_index(index) {
            Ok(index) => {
                if let Some(index) = index {
                    return Ok(self.pushed.get(index).map(|v| Value::Ref(v)));
                }
            }
            // Past both the file and the pending pushes: absent, not an error.
            Err(Error::IndexTooHigh) => return Ok(None),
            // The value lives on disk: fall through to the mmap/file path below.
            Err(Error::IndexTooLow) => {}
            Err(error) => return Err(error),
        }
        // if !self.updated.is_empty() {
        //     if let Some(v) = self.updated.get(&index) {
        //         return Ok(Some(v));
        //     }
        // }
        // The cache covers the *last* `cache.len()` pages of the file.
        let page_index = index / Self::PER_PAGE;
        let last_index = self.file_len - 1;
        let max_page_index = last_index / Self::PER_PAGE;
        let min_page_index = (max_page_index + 1).checked_sub(self.cache.len()).unwrap_or_default();
        // let min_open_page = self.min.load(AtomicOrdering::SeqCst);
        // if self.min.load(AtomicOrdering::SeqCst) {
        //     self.min.set(value)
        // }
        if page_index >= min_page_index {
            // Lazily map the page on first access; later reads reuse the mmap.
            // NOTE(review): the mapping is `unsafe` — soundness assumes the backing
            // file is not truncated or mutated while mapped, and the `unwrap`
            // assumes the page-aligned offset/len are valid for this file. Verify.
            let mmap = &**self
                .cache
                .get(page_index - min_page_index)
                .ok_or(Error::MmapsVecIsTooSmall)?
                .get_or_init(|| {
                    Box::new(unsafe {
                        MmapOptions::new()
                            .len(Self::PAGE_SIZE)
                            .offset((page_index * Self::PAGE_SIZE) as u64)
                            .map(&self.file)
                            .unwrap()
                    })
                });
            let range = Self::index_to_byte_range(index);
            let slice = &mmap[range];
            return Ok(Some(Value::Ref(T::unsafe_try_from_slice(slice)?)));
        }
        // Page not in the cache window: open a fresh handle and read directly.
        Ok(Some(Value::Owned(self.open_file_at_then_read(index)?.to_owned())))
    }

    /// Like `get`, but returns `T::default()` when the index is absent.
    pub fn get_or_default(&self, index: I) -> Result<T>
    where
        T: Default + Clone,
    {
        Ok(self.get(index)?.map(|v| (*v).clone()).unwrap_or(Default::default()))
    }

    /// Appends to the in-memory buffer; nothing hits disk until `flush`.
    #[inline]
    pub fn push(&mut self, value: T) {
        self.push_(value)
    }

    /// Pushes only when `index` is exactly the next free slot (no-op when the
    /// index is already covered).
    #[inline]
    pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
        self.push_if_needed_(index, value)
    }
}
/// Pending-push threshold past which SINGLE_THREAD's `push_if_needed` auto-flushes.
const FLUSH_EVERY: usize = 10_000;
impl<I, T> StorableVec<I, T, SINGLE_THREAD>
where
    I: StorableVecIndex,
    T: StorableVecType,
{
    /// Reads the value at `index` through the single shared `File` handle.
    /// Takes `&mut self` because it moves the file cursor — not thread safe.
    pub fn get(&mut self, index: I) -> Result<&T> {
        self.get_(Self::i_to_usize(index)?)
    }

    fn get_(&mut self, index: usize) -> Result<&T> {
        let byte_index = Self::index_to_byte_index(index);
        // Skip the seek syscall when the cursor is already in place
        // (the common case for sequential reads).
        if self.file_position != byte_index {
            self.file_position = Self::seek(&mut self.file, byte_index)?;
        }
        let res = Self::read_exact(&mut self.file, &mut self.buf);
        // Only advance the tracked position on success.
        // NOTE(review): after a failed read the OS cursor may have advanced while
        // `file_position` did not, so a retry at the same index could skip the
        // needed re-seek — verify this is acceptable.
        if res.is_ok() {
            self.file_position += Self::SIZE_OF_T as u64;
        }
        res
    }

    /// Last readable value, or `None` when empty.
    /// NOTE(review): read errors are swallowed into `None` here (`.ok()`) —
    /// confirm that is intended.
    pub fn last(&mut self) -> Result<Option<&T>> {
        let len = self.len();
        if len == 0 {
            return Ok(None);
        }
        Ok(self.get_(len - 1).ok())
    }

    /// Appends to the in-memory buffer; flushed later.
    #[inline]
    pub fn push(&mut self, value: T) {
        self.push_(value)
    }

    /// Pushes only when `index` is the next free slot, auto-flushing once
    /// `FLUSH_EVERY` values are pending.
    #[inline]
    pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
        self.push_if_needed_(index, value)?;
        if self.pushed_len() >= FLUSH_EVERY {
            Ok(self.flush()?)
        } else {
            Ok(())
        }
    }

    // #[inline]
    // fn seek_if_needed(&mut self, index: I) -> Result<()> {
    //     if self.file_position == Self::index_to_byte_index(Self::i_to_usize(index)?) {
    //         return Ok(());
    //     }
    //     self.file_position = Self::seek_if_needed_(&mut self.file, index)?;
    //     Ok(())
    // }

    /// Visits every flushed value in order; requires a prior `flush`.
    pub fn iter<F>(&mut self, f: F) -> Result<()>
    where
        F: FnMut((I, &T)) -> Result<()>,
    {
        self.iter_from(I::default(), f)
    }

    /// Visits flushed values from `index` to the end of the file.
    /// Errors with `UnsupportedUnflushedState` when pushes are pending.
    pub fn iter_from<F>(&mut self, mut index: I, mut f: F) -> Result<()>
    where
        F: FnMut((I, &T)) -> Result<()>,
    {
        // let pushed_len = self.pushed_len();
        // self.seek_if_needed(index)?;
        if !self.pushed.is_empty() {
            return Err(Error::UnsupportedUnflushedState);
        }
        let disk_len = I::from(Self::read_disk_len_(&self.file)?);
        while index < disk_len {
            f((index, self.get(index)?))?;
            index = index + 1;
        }
        // i = 0;
        // while i < pushed_len {
        //     f((I::from(i + disk_len), self.pushed.get(i).as_ref().unwrap()))?;
        //     i += 1;
        // }
        Ok(())
    }

    /// Builds the inverse mapping (`self[other_value] = other_index`) from `other`,
    /// resuming after whatever was already computed, then flushes.
    pub fn compute_inverse_more_to_less(&mut self, other: &mut StorableVec<T, I, SINGLE_THREAD>) -> Result<()>
    where
        I: StorableVecType,
        T: StorableVecIndex,
    {
        // Resume from the last inverse value written so far (default at first run).
        let index = self.last()?.cloned().unwrap_or_default();
        other.iter_from(index, |(v, i)| self.push_if_needed(*i, v))?;
        Ok(self.flush()?)
    }

    /// For each value, fills `self[first..last]` with that value, where the range
    /// comes from the paired first/last index vecs, then flushes.
    pub fn compute_inverse_less_to_more(
        &mut self,
        first_indexes: &mut StorableVec<T, I, SINGLE_THREAD>,
        last_indexes: &mut StorableVec<T, I, SINGLE_THREAD>,
    ) -> Result<()>
    where
        I: StorableVecType,
        T: StorableVecIndex,
    {
        first_indexes.iter_from(T::from(self.len()), |(value, first_index)| {
            let first_index = Self::i_to_usize(*first_index)?;
            let last_index = Self::i_to_usize(*last_indexes.get(value)?)?;
            (first_index..last_index).try_for_each(|index| self.push_if_needed(I::from(index), value))
        })?;
        Ok(self.flush()?)
    }

    /// Element-wise map: pushes `t(other[i])` for every not-yet-computed `i`, then flushes.
    pub fn compute_transform<A, F>(&mut self, other: &mut StorableVec<I, A, SINGLE_THREAD>, t: F) -> Result<()>
    where
        A: StorableVecType,
        F: Fn(&A) -> T,
    {
        other.iter_from(I::from(self.len()), |(i, a)| self.push_if_needed(i, t(a)))?;
        Ok(self.flush()?)
    }
}
impl<I, T> StorableVec<I, T, ASYNC_READ_ONLY>
where
    I: StorableVecIndex,
    T: StorableVecType,
{
    /// Reads the value at `index`, opening a dedicated file handle for this read
    /// so concurrent readers never share cursor state.
    #[inline]
    pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
        let index = Self::i_to_usize(index)?;
        self.get_(index)
    }

    #[inline]
    fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
        let value = self.open_file_at_then_read(index)?;
        Ok(Some(Value::Owned(value.to_owned())))
    }

    // Add iter iter_from iter_range collect..
    // + add memory cap
}

View File

@@ -1,24 +1,25 @@
use std::path::Path;
use storable_vec::{StorableVec, Version};
use storable_vec::{StorableVec, Version, CACHED_GETS};
fn main() -> Result<(), Box<dyn std::error::Error>> {
{
let mut vec: StorableVec<usize, u32> = StorableVec::import(Path::new("./v"), Version::from(1))?;
let mut vec: StorableVec<usize, u32, CACHED_GETS> =
StorableVec::forced_import(Path::new("./v"), Version::from(1))?;
vec.push(0);
vec.push(1);
vec.push(2);
dbg!(vec.cached_get(0)?); // Some(0)
dbg!(vec.cached_get(21)?); // None
dbg!(vec.get(0)?); // Some(0)
dbg!(vec.get(21)?); // None
vec.flush()?;
}
{
let vec: StorableVec<usize, u32> = StorableVec::import(Path::new("./v"), Version::from(1))?;
let vec: StorableVec<usize, u32, CACHED_GETS> = StorableVec::forced_import(Path::new("./v"), Version::from(1))?;
dbg!(vec.cached_get(0)?); // 0
dbg!(vec.get(0)?); // 0
}
Ok(())

View File

@@ -4,12 +4,20 @@ use std::{
path::Path,
};
use unsafe_slice_serde::UnsafeSliceSerde;
use crate::Error;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Version(u32);
impl Version {
pub fn write(&self, path: &Path) -> Result<(), io::Error> {
fs::write(path, self.0.to_le_bytes())
fs::write(path, self.0.unsafe_as_slice())
}
pub fn swap_bytes(self) -> Self {
Self(self.0.swap_bytes())
}
}
@@ -20,10 +28,10 @@ impl From<u32> for Version {
}
impl TryFrom<&Path> for Version {
type Error = io::Error;
type Error = Error;
fn try_from(value: &Path) -> Result<Self, Self::Error> {
let mut buf = [0; 4];
fs::read(value)?.as_slice().read_exact(&mut buf)?;
Ok(Self(u32::from_le_bytes(buf)))
Ok(*(Self::unsafe_try_from_slice(&buf)?))
}
}

View File

@@ -7,10 +7,10 @@ use super::{StorableVecIndex, StorableVecType};
pub trait AnyStorableVec {
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn unsafe_flush(&mut self) -> io::Result<()>;
fn flush(&mut self) -> io::Result<()>;
}
impl<I, T> AnyStorableVec for StorableVec<I, T>
impl<I, T, const MODE: u8> AnyStorableVec for StorableVec<I, T, MODE>
where
I: StorableVecIndex,
T: StorableVecType,
@@ -23,7 +23,7 @@ where
self.is_empty()
}
fn unsafe_flush(&mut self) -> io::Result<()> {
fn flush(&mut self) -> io::Result<()> {
self.flush()
}
}

View File

@@ -2,10 +2,10 @@ use std::{fmt::Debug, ops::Add};
pub trait StorableVecIndex
where
Self: Debug + Default + Copy + Clone + TryInto<usize> + From<usize> + Add<usize, Output = Self>,
Self: Debug + Default + Copy + Clone + PartialOrd + Ord + TryInto<usize> + From<usize> + Add<usize, Output = Self>,
{
}
impl<I> StorableVecIndex for I where
I: Debug + Default + Copy + Clone + TryInto<usize> + From<usize> + Add<usize, Output = Self>
I: Debug + Default + Copy + Clone + PartialOrd + Ord + TryInto<usize> + From<usize> + Add<usize, Output = Self>
{
}