vec: single file with header

This commit is contained in:
nym21
2025-06-23 20:48:00 +02:00
parent c0f4ece17b
commit 589bb02411
40 changed files with 685 additions and 404 deletions

View File

@@ -21,10 +21,8 @@ impl CompressedPagesMetadata {
const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>();
pub fn read(path: &Path) -> Result<CompressedPagesMetadata> {
let path = path.join("pages_meta");
let slf = Self {
vec: fs::read(&path)
let this = Self {
vec: fs::read(path)
.unwrap_or_default()
.chunks(Self::PAGE_SIZE)
.map(|bytes| {
@@ -38,7 +36,7 @@ impl CompressedPagesMetadata {
change_at: None,
};
Ok(slf)
Ok(this)
}
pub fn write(&mut self) -> io::Result<()> {

View File

@@ -18,6 +18,10 @@ impl Format {
fs::write(path, self.as_bytes())
}
pub fn is_raw(&self) -> bool {
*self == Self::Raw
}
pub fn is_compressed(&self) -> bool {
*self == Self::Compressed
}

View File

@@ -0,0 +1,183 @@
use std::{
fs::File,
io::{self, Seek, SeekFrom},
os::unix::fs::FileExt,
sync::Arc,
};
use arc_swap::ArcSwap;
use brk_core::{Error, Height, Result, Version};
use memmap2::Mmap;
use zerocopy::{FromBytes, IntoBytes};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::Format;
const HEADER_VERSION: Version = Version::ONE;
pub const HEADER_OFFSET: usize = size_of::<HeaderInner>();
#[derive(Debug, Clone)]
pub struct Header {
inner: Arc<ArcSwap<HeaderInner>>,
modified: bool,
}
impl Header {
pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result<Self> {
let inner = HeaderInner::create_and_write(file, vec_version, format)?;
Ok(Self {
inner: Arc::new(ArcSwap::from_pointee(inner)),
modified: false,
})
}
pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result<Self> {
let inner = HeaderInner::import_and_verify(mmap, vec_version, format)?;
Ok(Self {
inner: Arc::new(ArcSwap::from_pointee(inner)),
modified: false,
})
}
pub fn update_height(&mut self, height: Height) {
self.modified = true;
self.inner.rcu(|header| {
let mut header = (**header).clone();
header.height = height;
header
});
}
pub fn update_computed_version(&mut self, computed_version: Version) {
self.modified = true;
self.inner.rcu(|header| {
let mut header = (**header).clone();
header.computed_version = computed_version;
header
});
}
pub fn vec_version(&self) -> Version {
self.inner.load().vec_version
}
pub fn computed_version(&self) -> Version {
self.inner.load().computed_version
}
pub fn height(&self) -> Height {
self.inner.load().height
}
pub fn write_if_needed(&mut self, file: &mut File) -> io::Result<()> {
if self.modified {
self.inner.load().write(file)?;
self.modified = false;
}
Ok(())
}
}
#[repr(C)]
#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)]
struct HeaderInner {
pub header_version: Version,
pub vec_version: Version,
pub computed_version: Version,
pub height: Height,
pub compressed: ZeroCopyBool,
}
impl HeaderInner {
pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result<Self> {
let header = Self {
header_version: HEADER_VERSION,
vec_version,
computed_version: Version::default(),
height: Height::default(),
compressed: ZeroCopyBool::from(format),
};
header.write(file)?;
// dbg!(file.bytes().map(|b| b.unwrap()).collect::<Vec<_>>());
file.seek(SeekFrom::End(0))?;
Ok(header)
}
pub fn write(&self, file: &mut File) -> io::Result<()> {
file.write_all_at(self.as_bytes(), 0)
}
pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result<Self> {
if mmap.len() < HEADER_OFFSET {
return Err(Error::WrongLength);
}
// dbg!(mmap.len());
let header = HeaderInner::read_from_bytes(&mmap[..HEADER_OFFSET])?;
// dbg!(&header);
if header.header_version != HEADER_VERSION {
return Err(Error::DifferentVersion {
found: header.header_version,
expected: HEADER_VERSION,
});
}
if header.vec_version != vec_version {
return Err(Error::DifferentVersion {
found: header.vec_version,
expected: vec_version,
});
}
if header.compressed.is_broken() {
return Err(Error::WrongEndian);
}
if (header.compressed.is_true() && format.is_raw())
|| (header.compressed.is_false() && format.is_compressed())
{
return Err(Error::DifferentCompressionMode);
}
Ok(header)
}
}
#[derive(
Debug,
Clone,
Copy,
Default,
PartialEq,
Eq,
PartialOrd,
Ord,
FromBytes,
IntoBytes,
Immutable,
KnownLayout,
)]
#[repr(C)]
pub struct ZeroCopyBool(u32);
impl ZeroCopyBool {
pub const TRUE: Self = Self(1);
pub const FALSE: Self = Self(0);
pub fn is_true(&self) -> bool {
*self == Self::TRUE
}
pub fn is_false(&self) -> bool {
*self == Self::FALSE
}
pub fn is_broken(&self) -> bool {
*self > Self::TRUE
}
}
impl From<Format> for ZeroCopyBool {
fn from(value: Format) -> Self {
if value.is_raw() {
Self::FALSE
} else {
Self::TRUE
}
}
}

View File

@@ -1,11 +1,13 @@
mod compressed_page_meta;
mod compressed_pages_meta;
mod format;
mod length;
mod header;
// mod length;
mod unsafe_slice;
pub use compressed_page_meta::*;
pub use compressed_pages_meta::*;
pub use format::*;
pub use length::*;
pub use header::*;
// pub use length::*;
pub use unsafe_slice::*;

View File

@@ -10,10 +10,13 @@ use arc_swap::ArcSwap;
use brk_core::{Result, Value};
use memmap2::Mmap;
use crate::{AnyVec, HEADER_OFFSET, Header};
use super::{StoredIndex, StoredType};
pub trait GenericStoredVec<I, T>: Send + Sync
where
Self: AnyVec,
I: StoredIndex,
T: StoredType,
{
@@ -50,6 +53,10 @@ where
self.stored_len() + self.pushed_len()
}
fn index_to_name(&self) -> String {
format!("{}_to_{}", I::to_string(), self.name())
}
fn mmap(&self) -> &ArcSwap<Mmap>;
fn stored_len(&self) -> usize;
@@ -66,26 +73,47 @@ where
self.mut_pushed().push(value)
}
fn path(&self) -> PathBuf;
fn header(&self) -> &Header;
fn mut_header(&mut self) -> &mut Header;
fn parent(&self) -> &Path;
fn folder(&self) -> PathBuf {
self.parent().join(self.name())
}
fn folder_(parent: &Path, name: &str) -> PathBuf {
parent.join(name)
}
fn path(&self) -> PathBuf {
Self::path_(self.parent(), self.name())
}
fn path_(parent: &Path, name: &str) -> PathBuf {
Self::folder_(parent, name).join(I::to_string())
}
// ---
fn open_file(&self) -> io::Result<File> {
Self::open_file_(&self.path_vec())
}
fn open_file_(path: &Path) -> io::Result<File> {
OpenOptions::new()
let mut file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.append(true)
.open(path)
.open(path)?;
file.seek(SeekFrom::End(0))?;
Ok(file)
}
fn file(&self) -> &File;
fn mut_file(&mut self) -> &mut File;
fn file_set_len(&mut self, len: u64) -> Result<()> {
let mut file = self.open_file()?;
Self::file_set_len_(&mut file, len)?;
self.update_mmap(file)
let file = self.mut_file();
Self::file_set_len_(file, len)?;
self.update_mmap()
}
fn file_set_len_(file: &mut File, len: u64) -> Result<()> {
file.set_len(len)?;
@@ -94,29 +122,31 @@ where
}
fn file_write_all(&mut self, buf: &[u8]) -> Result<()> {
let mut file = self.open_file()?;
let file = self.mut_file();
file.write_all(buf)?;
self.update_mmap(file)
self.update_mmap()
}
fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> {
let mut file = self.open_file()?;
Self::file_set_len_(&mut file, len)?;
let file = self.mut_file();
Self::file_set_len_(file, len)?;
file.write_all(buf)?;
self.update_mmap(file)
self.update_mmap()
}
fn reset(&mut self) -> Result<()>;
#[inline]
fn reset(&mut self) -> Result<()> {
self.file_truncate_and_write_all(0, &[])
fn reset_(&mut self) -> Result<()> {
self.file_truncate_and_write_all(HEADER_OFFSET as u64, &[])
}
fn new_mmap(file: File) -> Result<Arc<Mmap>> {
Ok(Arc::new(unsafe { Mmap::map(&file)? }))
fn new_mmap(file: &File) -> Result<Arc<Mmap>> {
Ok(Arc::new(unsafe { Mmap::map(file)? }))
}
fn update_mmap(&mut self, file: File) -> Result<()> {
let mmap = Self::new_mmap(file)?;
fn update_mmap(&mut self) -> Result<()> {
let mmap = Self::new_mmap(self.file())?;
self.mmap().store(mmap);
Ok(())
}
@@ -139,28 +169,9 @@ where
fn truncate_if_needed(&mut self, index: I) -> Result<()>;
#[inline]
fn path_vec(&self) -> PathBuf {
Self::path_vec_(&self.path())
}
#[inline]
fn path_vec_(path: &Path) -> PathBuf {
path.join("vec")
}
#[inline]
fn path_version_(path: &Path) -> PathBuf {
path.join("version")
}
#[inline]
fn path_compressed_(path: &Path) -> PathBuf {
path.join("compressed")
}
fn modified_time_(&self) -> Result<Duration> {
Ok(self
.path_vec()
.file()
.metadata()?
.modified()?
.duration_since(time::UNIX_EPOCH)?)

View File

@@ -14,8 +14,8 @@ use zstd::DEFAULT_COMPRESSION_LEVEL;
use crate::{
AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec,
CompressedPageMetadata, CompressedPagesMetadata, GenericStoredVec, RawVec, StoredIndex,
StoredType, UnsafeSlice,
CompressedPageMetadata, CompressedPagesMetadata, GenericStoredVec, HEADER_OFFSET, Header,
RawVec, StoredIndex, StoredType, UnsafeSlice,
};
const ONE_KIB: usize = 1024;
@@ -23,7 +23,7 @@ const ONE_MIB: usize = ONE_KIB * ONE_KIB;
pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB;
pub const MAX_PAGE_SIZE: usize = 64 * ONE_KIB;
const VERSION: Version = Version::ONE;
const VERSION: Version = Version::TWO;
#[derive(Debug)]
pub struct CompressedVec<I, T> {
@@ -41,47 +41,38 @@ where
pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE;
/// Same as import but will reset the folder under certain errors, so be careful !
pub fn forced_import(path: &Path, name: &str, mut version: Version) -> Result<Self> {
pub fn forced_import(parent: &Path, name: &str, mut version: Version) -> Result<Self> {
version = version + VERSION;
let res = Self::import(path, name, version);
let res = Self::import(parent, name, version);
match res {
Err(Error::WrongEndian)
| Err(Error::DifferentVersion { .. })
| Err(Error::DifferentCompressionMode) => {
fs::remove_dir_all(path)?;
Self::import(path, name, version)
Err(Error::DifferentCompressionMode)
| Err(Error::WrongEndian)
| Err(Error::WrongLength)
| Err(Error::DifferentVersion { .. }) => {
let path = Self::path_(parent, name);
fs::remove_file(path)?;
Self::import(parent, name, version)
}
_ => res,
}
}
pub fn import(path: &Path, name: &str, version: Version) -> Result<Self> {
pub fn import(parent: &Path, name: &str, version: Version) -> Result<Self> {
let inner = RawVec::import(parent, name, version)?;
let pages_meta = {
let path = path.join(name).join(I::to_string());
let vec_exists = fs::exists(Self::path_vec_(&path)).is_ok_and(|b| b);
let compressed_path = Self::path_compressed_(&path);
let compressed_exists = fs::exists(&compressed_path).is_ok_and(|b| b);
if vec_exists && !compressed_exists {
return Err(Error::DifferentCompressionMode);
let path = inner
.folder()
.join(format!("{}-pages-meta", I::to_string()));
if inner.is_empty() {
let _ = fs::remove_file(&path);
}
if !vec_exists && !compressed_exists {
fs::create_dir_all(&path)?;
File::create(&compressed_path)?;
}
Arc::new(ArcSwap::new(Arc::new(CompressedPagesMetadata::read(
&path,
)?)))
};
Ok(Self {
inner: RawVec::import(path, name, version)?,
pages_meta,
})
Ok(Self { inner, pages_meta })
}
fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result<Vec<T>> {
@@ -182,11 +173,31 @@ where
.cloned())
}
fn header(&self) -> &Header {
self.inner.header()
}
fn mut_header(&mut self) -> &mut Header {
self.inner.mut_header()
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
}
fn parent(&self) -> &Path {
self.inner.parent()
}
fn file(&self) -> &File {
self.inner.file()
}
fn mut_file(&mut self) -> &mut File {
self.inner.mut_file()
}
#[inline]
fn stored_len(&self) -> usize {
Self::stored_len__(&self.pages_meta.load())
@@ -211,6 +222,8 @@ where
}
fn flush(&mut self) -> Result<()> {
self.inner.write_header_if_needed()?;
let pushed_len = self.pushed_len();
if pushed_len == 0 {
@@ -266,11 +279,12 @@ where
} else {
0
};
let offsetted_start = start + HEADER_OFFSET as u64;
let bytes_len = compressed_bytes.len() as u32;
let values_len = *values_len as u32;
let page = CompressedPageMetadata::new(start, bytes_len, values_len);
let page = CompressedPageMetadata::new(offsetted_start, bytes_len, values_len);
pages_meta.push(page_index, page);
});
@@ -298,7 +312,7 @@ where
pages_meta.truncate(0);
pages_meta.write()?;
self.pages_meta.store(Arc::new(pages_meta));
self.file_truncate_and_write_all(0, &[])
self.reset_()
}
fn truncate_if_needed(&mut self, index: I) -> Result<()> {

View File

@@ -5,7 +5,6 @@ use std::{
fmt::Debug,
ops::{Add, Div, Mul},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
@@ -29,10 +28,8 @@ const MAX_CACHE_SIZE: usize = 210 * ONE_MIB;
const DCA_AMOUNT: Dollars = Dollars::mint(100.0);
#[derive(Debug, Clone)]
pub struct EagerVec<I, T> {
computed_version: Arc<ArcSwap<Option<Version>>>,
inner: StoredVec<I, T>,
}
pub struct EagerVec<I, T>(StoredVec<I, T>);
// computed_version: Arc<ArcSwap<Option<Version>>>,
impl<I, T> EagerVec<I, T>
where
@@ -47,12 +44,9 @@ where
version: Version,
format: Format,
) -> Result<Self> {
let inner = StoredVec::forced_import(path, value_name, version, format)?;
Ok(Self {
computed_version: Arc::new(ArcSwap::from_pointee(None)),
inner,
})
Ok(Self(StoredVec::forced_import(
path, value_name, version, format,
)?))
}
fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> {
@@ -63,7 +57,7 @@ where
if !blocked {
exit.block();
}
self.inner.truncate_if_needed(index)?;
self.0.truncate_if_needed(index)?;
if !blocked {
exit.release();
}
@@ -80,11 +74,11 @@ where
if ord == Ordering::Greater {
self.safe_truncate_if_needed(index, exit)?;
}
self.inner.push(value);
self.0.push(value);
}
}
if self.inner.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
if self.0.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
self.safe_flush(exit)
} else {
Ok(())
@@ -99,7 +93,7 @@ where
if !blocked {
exit.block();
}
self.inner.flush()?;
self.0.flush()?;
if !blocked {
exit.release();
}
@@ -107,33 +101,34 @@ where
}
pub fn path(&self) -> PathBuf {
self.inner.path()
self.0.path()
}
pub fn get_or_read(&self, index: I, mmap: &Mmap) -> Result<Option<Value<T>>> {
self.inner.get_or_read(index, mmap)
self.0.get_or_read(index, mmap)
}
pub fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
self.0.mmap()
}
pub fn inner_version(&self) -> Version {
self.inner.version()
self.0.version()
}
#[inline]
fn path_computed_version(&self) -> PathBuf {
self.inner.path().join("computed_version")
fn update_computed_version(&mut self, computed_version: Version) {
self.0
.mut_header()
.update_computed_version(computed_version);
}
pub fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> {
let path = self.path_computed_version();
if version.validate(path.as_ref()).is_err() {
self.inner.reset()?;
if version != self.0.header().computed_version() {
self.update_computed_version(version);
if !self.is_empty() {
self.0.reset()?;
}
}
version.write(path.as_ref())?;
self.computed_version.store(Arc::new(Some(version)));
if self.is_empty() {
info!(
@@ -157,9 +152,7 @@ where
where
F: FnMut(I) -> (I, T),
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + version,
)?;
self.validate_computed_version_or_reset_file(Version::ZERO + self.0.version() + version)?;
let index = max_from.min(I::from(self.len()));
(index.to_usize()?..to).try_for_each(|i| {
@@ -216,7 +209,7 @@ where
F: FnMut((A, B, &Self)) -> (I, T),
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + other.version(),
Version::ZERO + self.0.version() + other.version(),
)?;
let index = max_from.min(A::from(self.len()));
@@ -239,7 +232,7 @@ where
T: Add<Output = T>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + added.version() + adder.version(),
Version::ZERO + self.0.version() + added.version() + adder.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -265,7 +258,7 @@ where
T: CheckedSub,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + subtracted.version() + subtracter.version(),
Version::ZERO + self.0.version() + subtracted.version() + subtracter.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -294,7 +287,7 @@ where
T2: StoredType,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + source.version(),
Version::ZERO + self.0.version() + source.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -333,7 +326,7 @@ where
T: From<T4>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + multiplied.version() + multiplier.version(),
Version::ZERO + self.0.version() + multiplied.version() + multiplier.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -416,7 +409,7 @@ where
T: From<T5>,
{
self.validate_computed_version_or_reset_file(
Version::ONE + self.inner.version() + divided.version() + divider.version(),
Version::ONE + self.0.version() + divided.version() + divider.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -453,7 +446,7 @@ where
T: From<StoredF32>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + ath.version() + close.version(),
Version::ZERO + self.0.version() + ath.version() + close.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -483,12 +476,11 @@ where
T: StoredIndex,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + other.version(),
Version::ZERO + self.0.version() + other.version(),
)?;
let index = max_from.min(
VecIterator::last(self.inner.into_iter())
.map_or_else(T::default, |(_, v)| v.into_inner()),
VecIterator::last(self.0.into_iter()).map_or_else(T::default, |(_, v)| v.into_inner()),
);
let mut prev_i = None;
other.iter_at(index).try_for_each(|(v, i)| -> Result<()> {
@@ -518,10 +510,7 @@ where
T: StoredIndex,
{
self.validate_computed_version_or_reset_file(
Version::ZERO
+ self.inner.version()
+ first_indexes.version()
+ indexes_count.version(),
Version::ZERO + self.0.version() + first_indexes.version() + indexes_count.version(),
)?;
let mut indexes_count_iter = indexes_count.iter();
@@ -613,10 +602,7 @@ where
<T2 as TryInto<T>>::Error: error::Error + 'static,
{
self.validate_computed_version_or_reset_file(
Version::ZERO
+ self.inner.version()
+ first_indexes.version()
+ other_to_else.version(),
Version::ZERO + self.0.version() + first_indexes.version() + other_to_else.version(),
)?;
let mut other_iter = first_indexes.iter();
@@ -654,10 +640,7 @@ where
A: StoredIndex + StoredType,
{
self.validate_computed_version_or_reset_file(
Version::ZERO
+ self.inner.version()
+ self_to_other.version()
+ other_to_self.version(),
Version::ZERO + self.0.version() + self_to_other.version() + other_to_self.version(),
)?;
let mut other_to_self_iter = other_to_self.iter();
@@ -686,10 +669,7 @@ where
T2: StoredIndex + StoredType,
{
self.validate_computed_version_or_reset_file(
Version::ZERO
+ self.inner.version()
+ first_indexes.version()
+ indexes_count.version(),
Version::ZERO + self.0.version() + first_indexes.version() + indexes_count.version(),
)?;
let mut indexes_count_iter = indexes_count.iter();
@@ -721,7 +701,7 @@ where
T: From<usize> + Add<T, Output = T>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + others.iter().map(|v| v.version()).sum(),
Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(),
)?;
if others.is_empty() {
@@ -756,7 +736,7 @@ where
T: From<usize> + Add<T, Output = T> + Ord,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + others.iter().map(|v| v.version()).sum(),
Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(),
)?;
if others.is_empty() {
@@ -793,7 +773,7 @@ where
T: From<usize> + Add<T, Output = T> + Ord,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + others.iter().map(|v| v.version()).sum(),
Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(),
)?;
if others.is_empty() {
@@ -849,12 +829,13 @@ where
f32: From<T> + From<T2>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + source.version(),
Version::ONE + self.0.version() + source.version(),
)?;
let index = max_from.min(I::from(self.len()));
let mut prev = None;
let min_prev_i = min_i.unwrap_or_default().unwrap_to_usize();
let mut other_iter = source.iter();
source.iter_at(index).try_for_each(|(i, value)| {
let value = value.into_inner();
@@ -867,11 +848,21 @@ where
T::from(0.0)
});
}
let len = (i.unwrap_to_usize() - min_prev_i + 1).min(sma);
let sma = T::from(
(f32::from(prev.clone().unwrap()) * (len - 1) as f32 + f32::from(value))
/ len as f32,
);
let processed_values_count = i.unwrap_to_usize() - min_prev_i + 1;
let len = (processed_values_count).min(sma);
let value = f32::from(value);
let sma = T::from(if processed_values_count > sma {
let prev_sum = f32::from(prev.clone().unwrap()) * len as f32;
let value_to_subtract = f32::from(
other_iter.unwrap_get_inner_(i.unwrap_to_usize().checked_sub(sma).unwrap()),
);
(prev_sum - value_to_subtract + value) / len as f32
} else {
(f32::from(prev.clone().unwrap()) * (len - 1) as f32 + value) / len as f32
});
prev.replace(sma.clone());
self.forced_push_at(i, sma, exit)
@@ -897,7 +888,7 @@ where
T: From<f32>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + source.version(),
Version::ZERO + self.0.version() + source.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -928,7 +919,7 @@ where
T: CheckedSub + Default,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + source.version(),
Version::ZERO + self.0.version() + source.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -961,7 +952,7 @@ where
T: From<f32>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + source.version(),
Version::ZERO + self.0.version() + source.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -997,7 +988,7 @@ where
T: From<f32>,
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + percentage_returns.version(),
Version::ZERO + self.0.version() + percentage_returns.version(),
)?;
if days % 365 != 0 {
@@ -1056,7 +1047,7 @@ impl EagerVec<DateIndex, Sats> {
exit: &Exit,
) -> Result<()> {
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + closes.version(),
Version::ZERO + self.0.version() + closes.version(),
)?;
let mut other_iter = closes.iter();
@@ -1105,7 +1096,7 @@ impl EagerVec<DateIndex, Sats> {
exit: &Exit,
) -> Result<()> {
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + closes.version(),
Version::ZERO + self.0.version() + closes.version(),
)?;
let mut prev = None;
@@ -1146,7 +1137,7 @@ impl EagerVec<DateIndex, Dollars> {
exit: &Exit,
) -> Result<()> {
self.validate_computed_version_or_reset_file(
Version::ONE + self.inner.version() + stacks.version(),
Version::ONE + self.0.version() + stacks.version(),
)?;
let index = max_from.min(DateIndex::from(self.len()));
@@ -1177,7 +1168,7 @@ impl EagerVec<DateIndex, Dollars> {
exit: &Exit,
) -> Result<()> {
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + stacks.version(),
Version::ZERO + self.0.version() + stacks.version(),
)?;
let index = max_from.min(DateIndex::from(self.len()));
@@ -1209,7 +1200,7 @@ where
exit: &Exit,
) -> Result<()> {
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + sats.version(),
Version::ZERO + self.0.version() + sats.version(),
)?;
let index = max_from.min(I::from(self.len()));
@@ -1234,7 +1225,7 @@ where
exit: &Exit,
) -> Result<()> {
self.validate_computed_version_or_reset_file(
Version::ZERO + self.inner.version() + bitcoin.version(),
Version::ZERO + self.0.version() + bitcoin.version(),
)?;
let mut price_iter = price.iter();
@@ -1260,7 +1251,7 @@ where
// ) -> Result<()> {
// self.validate_computed_version_or_reset_file(
// Version::ZERO
// + self.inner.version()
// + self.0.version()
// + bitcoin.version()
// + i_to_height.version()
// + price.version(),
@@ -1289,7 +1280,7 @@ where
type IntoIter = StoredVecIterator<'a, I, T>;
fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
self.0.into_iter()
}
}
@@ -1300,28 +1291,22 @@ where
{
#[inline]
fn version(&self) -> Version {
self.computed_version
.load()
.or_else(|| {
dbg!(self.path());
None
})
.unwrap()
self.0.header().computed_version()
}
#[inline]
fn name(&self) -> &str {
self.inner.name()
self.0.name()
}
#[inline]
fn len(&self) -> usize {
self.inner.len()
self.0.len()
}
#[inline]
fn modified_time(&self) -> Result<Duration> {
self.inner.modified_time()
self.0.modified_time()
}
#[inline]
@@ -1345,7 +1330,7 @@ where
I: StoredIndex,
T: StoredType + 'a,
{
Box::new(self.inner.into_iter())
Box::new(self.0.into_iter())
}
}

View File

@@ -1,25 +1,17 @@
use std::{
cmp::Ordering,
fmt::Debug,
path::{Path, PathBuf},
time::Duration,
};
use std::{cmp::Ordering, fmt::Debug, path::Path, time::Duration};
use arc_swap::ArcSwap;
use brk_core::{Error, Height, Result, Value, Version};
use crate::{
AnyCollectableVec, AnyIterableVec, AnyVec, BoxedVecIterator, CollectableVec, Format,
GenericStoredVec, Mmap, StoredIndex, StoredType, StoredVec,
GenericStoredVec, Header, Mmap, StoredIndex, StoredType, StoredVec,
};
use super::StoredVecIterator;
#[derive(Debug, Clone)]
pub struct IndexedVec<I, T> {
height: Option<Height>,
inner: StoredVec<I, T>,
}
pub struct IndexedVec<I, T>(StoredVec<I, T>);
impl<I, T> IndexedVec<I, T>
where
@@ -28,26 +20,23 @@ where
{
pub fn forced_import(
path: &Path,
value_name: &str,
name: &str,
version: Version,
format: Format,
) -> Result<Self> {
let inner = StoredVec::forced_import(path, value_name, version, format)?;
Ok(Self {
height: Height::try_from(Self::path_height_(&inner.path()).as_path()).ok(),
inner,
})
Ok(Self(
StoredVec::forced_import(path, name, version, format).unwrap(),
))
}
#[inline]
pub fn get_or_read(&self, index: I, mmap: &Mmap) -> Result<Option<Value<T>>> {
self.inner.get_or_read(index, mmap)
self.0.get_or_read(index, mmap)
}
#[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
let len = self.inner.len();
let len = self.0.len();
match len.cmp(&index.to_usize()?) {
Ordering::Greater => {
// dbg!(len, index, &self.pathbuf);
@@ -55,46 +44,42 @@ where
Ok(())
}
Ordering::Equal => {
self.inner.push(value);
self.0.push(value);
Ok(())
}
Ordering::Less => {
dbg!(index, value, len, self.path_height());
dbg!(index, value, len, self.0.header());
Err(Error::IndexTooHigh)
}
}
}
fn update_height(&mut self, height: Height) {
self.0.mut_header().update_height(height);
}
pub fn truncate_if_needed(&mut self, index: I, height: Height) -> Result<()> {
if self.height.is_none_or(|self_height| self_height != height) {
height.write(&self.path_height())?;
}
self.inner.truncate_if_needed(index)?;
self.update_height(height);
self.0.truncate_if_needed(index)?;
Ok(())
}
pub fn flush(&mut self, height: Height) -> Result<()> {
height.write(&self.path_height())?;
self.inner.flush()
self.update_height(height);
self.0.flush()
}
pub fn header(&self) -> &Header {
self.0.header()
}
pub fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
self.0.mmap()
}
#[inline]
pub fn hasnt(&self, index: I) -> Result<bool> {
self.inner.has(index).map(|b| !b)
}
pub fn height(&self) -> brk_core::Result<Height> {
Height::try_from(self.path_height().as_path())
}
fn path_height(&self) -> PathBuf {
Self::path_height_(&self.inner.path())
}
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
self.0.has(index).map(|b| !b)
}
}
@@ -105,22 +90,22 @@ where
{
#[inline]
fn version(&self) -> Version {
self.inner.version()
self.0.version()
}
#[inline]
fn name(&self) -> &str {
self.inner.name()
self.0.name()
}
#[inline]
fn len(&self) -> usize {
self.inner.len()
self.0.len()
}
#[inline]
fn modified_time(&self) -> Result<Duration> {
self.inner.modified_time()
self.0.modified_time()
}
#[inline]
@@ -135,7 +120,7 @@ where
}
pub trait AnyIndexedVec: AnyVec {
fn height(&self) -> brk_core::Result<Height>;
fn height(&self) -> Height;
fn flush(&mut self, height: Height) -> Result<()>;
}
@@ -144,8 +129,8 @@ where
I: StoredIndex,
T: StoredType,
{
fn height(&self) -> brk_core::Result<Height> {
self.height()
fn height(&self) -> Height {
self.0.header().height()
}
fn flush(&mut self, height: Height) -> Result<()> {
@@ -162,7 +147,7 @@ where
type IntoIter = StoredVecIterator<'a, I, T>;
fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
self.0.into_iter()
}
}

View File

@@ -1,5 +1,6 @@
use std::{
fs,
fs::{self, File},
io,
marker::PhantomData,
mem,
path::{Path, PathBuf},
@@ -14,14 +15,17 @@ use rayon::prelude::*;
use crate::{
AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec,
GenericStoredVec, StoredIndex, StoredType, UnsafeSlice,
Format, GenericStoredVec, HEADER_OFFSET, Header, StoredIndex, StoredType, UnsafeSlice,
};
const VERSION: Version = Version::ONE;
#[derive(Debug)]
pub struct RawVec<I, T> {
version: Version,
header: Header,
parent: PathBuf,
name: String,
file: Option<File>,
// Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
mmap: Arc<ArcSwap<Mmap>>,
pushed: Vec<T>,
@@ -34,40 +38,58 @@ where
T: StoredType,
{
/// Same as import but will reset the folder under certain errors, so be careful !
pub fn forced_import(path: &Path, name: &str, version: Version) -> Result<Self> {
let res = Self::import(path, name, version);
pub fn forced_import(parent: &Path, name: &str, mut version: Version) -> Result<Self> {
version = version + VERSION;
let res = Self::import(parent, name, version);
match res {
Err(Error::WrongEndian) | Err(Error::DifferentVersion { .. }) => {
fs::remove_dir_all(path)?;
Self::import(path, name, version)
}
// Err(Error::DifferentCompressionMode)
// | Err(Error::WrongEndian)
// | Err(Error::WrongLength)
// | Err(Error::DifferentVersion { .. }) => {
// let path = Self::path_(parent, name);
// fs::remove_file(path)?;
// Self::import(parent, name, version)
// }
_ => res,
}
}
pub fn import(path: &Path, name: &str, version: Version) -> Result<Self> {
let (version, mmap) = {
let path = path.join(name).join(I::to_string());
fs::create_dir_all(&path)?;
let version_path = Self::path_version_(&path);
if !version.validate(version_path.as_ref())? {
version.write(version_path.as_ref())?;
pub fn import(parent: &Path, name: &str, version: Version) -> Result<Self> {
let path = Self::path_(parent, name);
let (file, mmap, header) = match Self::open_file_(&path) {
Ok(mut file) => {
if file.metadata()?.len() == 0 {
let header = Header::create_and_write(&mut file, version, Format::Raw)?;
let mmap = Self::new_mmap(&file)?;
(file, mmap, header)
} else {
let mmap = Self::new_mmap(&file)?;
// dbg!(&mmap[..]);
let header = Header::import_and_verify(&mmap, version, Format::Raw)?;
// dbg!((&header, name, I::to_string()));
(file, mmap, header)
}
}
let file = Self::open_file_(Self::path_vec_(&path).as_path())?;
let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?));
(version, mmap)
Err(e) => match e.kind() {
io::ErrorKind::NotFound => {
fs::create_dir_all(Self::folder_(parent, name))?;
let mut file = Self::open_file_(&path)?;
let header = Header::create_and_write(&mut file, version, Format::Raw)?;
let mmap = Self::new_mmap(&file)?;
(file, mmap, header)
}
_ => return Err(e.into()),
},
};
let mmap = Arc::new(ArcSwap::new(mmap));
Ok(Self {
mmap,
version,
header,
file: Some(file),
name: name.to_string(),
parent: path.to_owned(),
parent: parent.to_owned(),
pushed: vec![],
phantom: PhantomData,
})
@@ -89,6 +111,10 @@ where
iter.set_(i);
iter
}
pub fn write_header_if_needed(&mut self) -> io::Result<()> {
self.header.write_if_needed(self.file.as_mut().unwrap())
}
}
impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
@@ -98,25 +124,43 @@ where
{
#[inline]
fn read_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let index = index * Self::SIZE_OF_T;
let index = index * Self::SIZE_OF_T + HEADER_OFFSET;
let slice = &mmap[index..(index + Self::SIZE_OF_T)];
T::try_read_from_bytes(slice)
.map(|v| Some(v))
.map_err(Error::from)
}
fn header(&self) -> &Header {
&self.header
}
fn mut_header(&mut self) -> &mut Header {
&mut self.header
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
&self.mmap
}
#[inline]
fn file(&self) -> &File {
self.file.as_ref().unwrap()
}
#[inline]
fn mut_file(&mut self) -> &mut File {
self.file.as_mut().unwrap()
}
#[inline]
fn stored_len(&self) -> usize {
self.stored_len_(&self.mmap.load())
}
#[inline]
fn stored_len_(&self, mmap: &Mmap) -> usize {
mmap.len() / Self::SIZE_OF_T
(mmap.len() - HEADER_OFFSET) / Self::SIZE_OF_T
}
#[inline]
@@ -129,11 +173,13 @@ where
}
#[inline]
fn path(&self) -> PathBuf {
self.parent.join(self.name()).join(I::to_string())
fn parent(&self) -> &Path {
&self.parent
}
fn flush(&mut self) -> Result<()> {
self.write_header_if_needed()?;
let pushed_len = self.pushed_len();
if pushed_len == 0 {
@@ -172,12 +218,16 @@ where
return Ok(());
}
let len = index * Self::SIZE_OF_T;
let len = index * Self::SIZE_OF_T + HEADER_OFFSET;
self.file_set_len(len as u64)?;
Ok(())
}
fn reset(&mut self) -> Result<()> {
self.reset_()
}
}
impl<I, T> AnyVec for RawVec<I, T>
@@ -187,12 +237,12 @@ where
{
#[inline]
fn version(&self) -> Version {
self.version
self.header.vec_version()
}
#[inline]
fn name(&self) -> &str {
self.name.as_str()
&self.name
}
#[inline]
@@ -219,9 +269,10 @@ where
impl<I, T> Clone for RawVec<I, T> {
fn clone(&self) -> Self {
Self {
version: self.version,
header: self.header.clone(),
parent: self.parent.clone(),
name: self.name.clone(),
file: None,
mmap: self.mmap.clone(),
pushed: vec![],
phantom: PhantomData,

View File

@@ -1,4 +1,5 @@
use std::{
fs::File,
path::{Path, PathBuf},
time::Duration,
};
@@ -9,7 +10,7 @@ use memmap2::Mmap;
use crate::{
AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec,
Format, GenericStoredVec, StoredIndex, StoredType,
Format, GenericStoredVec, Header, StoredIndex, StoredType,
};
use super::{CompressedVec, CompressedVecIterator, RawVec, RawVecIterator};
@@ -31,8 +32,6 @@ where
version: Version,
format: Format,
) -> Result<Self> {
// let path = I::path(path, value_name);
if version == Version::ZERO {
dbg!(path, name);
panic!("Version must be at least 1, can't verify endianess otherwise");
@@ -61,6 +60,22 @@ where
}
}
#[inline]
fn header(&self) -> &Header {
match self {
StoredVec::Raw(v) => v.header(),
StoredVec::Compressed(v) => v.header(),
}
}
#[inline]
fn mut_header(&mut self) -> &mut Header {
match self {
StoredVec::Raw(v) => v.mut_header(),
StoredVec::Compressed(v) => v.mut_header(),
}
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
match self {
@@ -69,6 +84,30 @@ where
}
}
#[inline]
fn parent(&self) -> &Path {
match self {
StoredVec::Raw(v) => v.parent(),
StoredVec::Compressed(v) => v.parent(),
}
}
#[inline]
fn file(&self) -> &File {
match self {
StoredVec::Raw(v) => v.file(),
StoredVec::Compressed(v) => v.file(),
}
}
#[inline]
fn mut_file(&mut self) -> &mut File {
match self {
StoredVec::Raw(v) => v.mut_file(),
StoredVec::Compressed(v) => v.mut_file(),
}
}
#[inline]
fn stored_len(&self) -> usize {
match self {
@@ -120,6 +159,13 @@ where
StoredVec::Compressed(v) => v.truncate_if_needed(index),
}
}
fn reset(&mut self) -> Result<()> {
match self {
StoredVec::Raw(v) => v.reset(),
StoredVec::Compressed(v) => v.reset(),
}
}
}
impl<I, T> AnyVec for StoredVec<I, T>