From 138ca80c106abe5a6c341cfffcff2f9784db1bf7 Mon Sep 17 00:00:00 2001 From: nym21 Date: Wed, 5 Feb 2025 00:23:58 +0100 Subject: [PATCH] global: snapshot --- computer/src/lib.rs | 18 +- computer/src/storage/storable_vecs/base.rs | 94 +--------- indexer/src/storage/storable_vecs/base.rs | 2 +- storable_vec/src/lib.rs | 191 ++++++++------------- storable_vec/src/main.rs | 8 +- 5 files changed, 88 insertions(+), 225 deletions(-) diff --git a/computer/src/lib.rs b/computer/src/lib.rs index 10a889e6b..0d4a788d8 100644 --- a/computer/src/lib.rs +++ b/computer/src/lib.rs @@ -13,8 +13,8 @@ use structs::*; pub struct Computer { outputs_dir: PathBuf, - vecs: StorableVecs, - trees: Fjalls, + pub vecs: StorableVecs, + pub trees: Fjalls, } impl Computer { @@ -53,20 +53,20 @@ impl Computer { self.vecs .txindex_to_last_txinindex - .compute_last_index_from_first(&indexer.vecs.txindex_to_first_txinindex, txinindexes_count)?; + .compute_last_index_from_first(&mut indexer.vecs.txindex_to_first_txinindex, txinindexes_count)?; self.vecs.txindex_to_inputcount.compute_count_from_indexes( - &indexer.vecs.txindex_to_first_txinindex, - &self.vecs.txindex_to_last_txinindex, + &mut indexer.vecs.txindex_to_first_txinindex, + &mut self.vecs.txindex_to_last_txinindex, )?; self.vecs .txindex_to_last_txoutindex - .compute_last_index_from_first(&indexer.vecs.txindex_to_first_txoutindex, txoutindexes_count)?; + .compute_last_index_from_first(&mut indexer.vecs.txindex_to_first_txoutindex, txoutindexes_count)?; self.vecs.txindex_to_outputcount.compute_count_from_indexes( - &indexer.vecs.txindex_to_first_txoutindex, - &self.vecs.txindex_to_last_txoutindex, + &mut indexer.vecs.txindex_to_first_txoutindex, + &mut self.vecs.txindex_to_last_txoutindex, )?; self.vecs @@ -75,7 +75,7 @@ impl Computer { self.vecs .height_to_last_txindex - .compute_last_index_from_first(&indexer.vecs.height_to_first_txindex, height_count)?; + .compute_last_index_from_first(&mut indexer.vecs.height_to_first_txindex, height_count)?; self.vecs.txindex_to_height.compute_inverse_less_to_more( &mut indexer.vecs.height_to_first_txindex, diff --git a/computer/src/storage/storable_vecs/base.rs b/computer/src/storage/storable_vecs/base.rs index a3cb4f25e..11cbbcee3 100644 --- a/computer/src/storage/storable_vecs/base.rs +++ b/computer/src/storage/storable_vecs/base.rs @@ -1,19 +1,11 @@ -use std::{ - error, - fmt::Debug, - io, - ops::{Add, Sub}, - path::Path, -}; +use std::{fmt::Debug, path::Path}; use derive_deref::{Deref, DerefMut}; -use storable_vec::{StorableVecIndex, StorableVecType, Version, SINGLE_THREAD}; +use storable_vec::{StorableVecIndex, StorableVecType, Version}; #[derive(Debug, Deref, DerefMut)] pub struct StorableVec(storable_vec::StorableVec); -const FLUSH_EVERY: usize = 10_000; - impl StorableVec where I: StorableVecIndex, @@ -23,85 +15,3 @@ where Ok(Self(storable_vec::StorableVec::forced_import(path, version)?)) } } - -impl StorableVec -where - I: StorableVecIndex, - T: StorableVecType, -{ - fn flush_vec_if_needed(&mut self) -> io::Result<()> { - if self.pushed_len() == FLUSH_EVERY { - self.flush() - } else { - Ok(()) - } - } - - pub fn compute_is_first_ordered( - &mut self, - self_to_other: &storable_vec::StorableVec, - other_to_self: &storable_vec::StorableVec, - ) -> storable_vec::Result<()> - where - A: StorableVecIndex + StorableVecType, - { - // let mut prev_a_opt = None; - // self_to_other.iter_from(I::from(self.len()), |(i, a)| { - // if prev_a_opt.is_none() { - // prev_a_opt.replace(a); - // self.push_if_needed(i, other_to_self.read_at(a) == i); - // } else { - // let prev_a = prev_a_opt.unwrap(); - // if a != prev_a - // } - // other_to_self.seek_read(a); - // self.push_if_needed(i, t(a)); - // Ok(()) - // }) - Ok(()) - } - - pub fn compute_last_index_from_first( - &mut self, - first_index_vec: &storable_vec::StorableVec, - final_len: usize, - ) -> color_eyre::Result<()> - where - T: Copy + From + Sub + StorableVecIndex, - { - let mut prev_index: Option = None; - first_index_vec.iter_from(I::from(self.len()), |(i, v)| { - if let Some(prev_index) = prev_index { - self.push_if_needed(prev_index, *v - T::from(1))?; - } - prev_index.replace(i); - Ok(self.flush_vec_if_needed()?) - })?; - if let Some(prev_index) = prev_index { - self.push_if_needed(prev_index, T::from(final_len) - T::from(1))?; - } - self.flush()?; - Ok(()) - } - - pub fn compute_count_from_indexes( - &mut self, - first_indexes: &storable_vec::StorableVec, - last_indexes: &storable_vec::StorableVec, - ) -> color_eyre::Result<()> - where - T: From, - T2: StorableVecType + Copy + Add + Sub + TryInto, - >::Error: error::Error + Send + Sync + 'static, - { - let (mut file_last, mut buf_last) = last_indexes.prepare_to_read_at_(self.len())?; - first_indexes.iter_from(I::from(self.len()), |(i, first_index)| { - let last_index = last_indexes.read_exact(&mut file_last, &mut buf_last)?; - let count = *last_index + 1_usize - *first_index; - self.push_if_needed(i, count.into())?; - Ok(self.flush_vec_if_needed()?) - })?; - self.flush()?; - Ok(()) - } -} diff --git a/indexer/src/storage/storable_vecs/base.rs b/indexer/src/storage/storable_vecs/base.rs index 99b6adfa8..5f32bdc26 100644 --- a/indexer/src/storage/storable_vecs/base.rs +++ b/indexer/src/storage/storable_vecs/base.rs @@ -5,7 +5,7 @@ use std::{ path::{Path, PathBuf}, }; -use storable_vec::{StorableVecIndex, StorableVecType, Version, CACHED_GETS}; +use storable_vec::{StorableVecIndex, StorableVecType, Version}; use super::Height; diff --git a/storable_vec/src/lib.rs b/storable_vec/src/lib.rs index 95da53292..cbaf540ae 100644 --- a/storable_vec/src/lib.rs +++ b/storable_vec/src/lib.rs @@ -1,11 +1,12 @@ use std::{ cmp::Ordering, + error, fmt::Debug, fs::{self, File, OpenOptions}, io::{self, Read, Seek, SeekFrom, Write}, marker::PhantomData, mem, - ops::Range, + ops::{Add, Range, Sub}, path::{Path, PathBuf}, sync::OnceLock, }; @@ -192,47 +193,18 @@ where } } - // #[inline] - // fn open_file_at_then_read(&self, index: I) -> Result { - // self.open_file_at_then_read_(Self::i_to_usize(index)?) - // } fn open_file_at_then_read(&self, index: usize) -> Result { - // let (mut file, mut buf) = self.open_file_at(index)?; let mut file = self.open_file()?; let mut buf = Self::create_buffer(); - - let byte_index = Self::index_to_byte_index(index); - Self::seek(&mut file, byte_index)?; - - Ok(Self::read_exact(&mut file, &mut buf)?.to_owned()) + Self::seek(&mut file, Self::index_to_byte_index(index))?; + Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned()) } - // #[inline] - // fn open_file_at(&self, index: I) -> Result<(File, Buffer)> { - // self.open_file_at_(Self::i_to_usize(index)?) - // } - // fn open_file_at(&self, index: usize) -> Result<(File, Buffer)> { - // let mut file = self.open_file()?; - // let buf = Self::create_buffer(); - // let byte_index = Self::index_to_byte_index(index); - // Self::seek(&mut file, byte_index)?; - // Ok((file, buf)) - // } - // #[inline] - // fn seek_if_needed_(file: &mut File, index: I) -> Result { - // Self::seek_if_needed__(file, Self::i_to_usize(index)?).map_err(Error::IO) - // } - // #[inline] - // fn seek_if_needed(file: &mut File, index: usize) -> io::Result { - // let byte_index = Self::index_to_byte_index(index); - // if file.stream_position()? != byte_index { - // Self::seek(file, byte_index)?; - // } - // Ok(byte_index) - // } + #[inline] fn seek(file: &mut File, byte_index: u64) -> io::Result { file.seek(SeekFrom::Start(byte_index)) } + fn read_exact<'a>(file: &'a mut File, buf: &'a mut [u8]) -> Result<&'a T> { file.read_exact(buf)?; let v = T::unsafe_try_from_slice(&buf[..])?; @@ -261,74 +233,7 @@ where Err(Error::IndexTooHigh) } } - // Self::push_to_vec_if_needed(&mut self.pushed, index, value) } - // #[inline] - // fn push_if_needed__(&mut self, index: usize, value: T) -> Result<()> { - // Self::push_to_vec_if_needed_(&mut self.pushed, index, value) - // } - // #[inline] - // fn push_to_vec_if_needed(vec: &mut Vec, index: I, value: T) -> Result<()> { - // Self::push_to_vec_if_needed_(vec, Self::i_to_usize(index)?, value) - // } - // fn push_to_vec_if_needed_(vec: &mut Vec, index: usize, value: T) -> Result<()> { - // let len = vec.len(); - // match len.cmp(&index) { - // Ordering::Greater => { - // // dbg!(len, index, &self.pathbuf); - // // panic!(); - // Ok(()) - // } - // Ordering::Equal => { - // vec.push(value); - // Ok(()) - // } - // Ordering::Less => { - // dbg!(index, value); - // Err(Error::IndexTooHigh) - // } - // } - // } - - // pub fn update(&mut self, index: I, value: T) -> Result<()> { - // self._update(index.into(), value) - // } - // pub fn update_(&mut self, index: usize, value: T) -> Result<()> { - // if let Some(index) = self.index_to_pushed_index(index) { - // self.pushed[index] = value; - // } else { - // self.updated.insert(index, value); - // } - // Ok(()) - // } - - // pub fn fetch_update(&mut self, index: I, value: T) -> Result - // where - // T: Clone, - // { - // self._fetch_update(index.into(), value) - // } - // pub fn fetch_update_(&mut self, index: usize, value: T) -> Result - // where - // T: Clone, - // { - // let prev_opt = self.updated.insert(index, value); - // if let Some(prev) = prev_opt { - // Ok(prev) - // } else { - // Ok(self - // ._get(index)? - // .ok_or(Error::ExpectFileToHaveIndex)? - // .clone()) - // } - // } - - // pub fn remove(&mut self, index: I) { - // self._remove(index.into()) - // } - // pub fn remove_(&mut self, index: usize) { - // self.removed.insert(index); - // } #[inline] pub fn len(&self) -> usize { @@ -358,10 +263,6 @@ where pub fn hasnt(&self, index: I) -> Result { self.has(index).map(|b| !b) } - #[inline] - fn hasnt_(&self, index: usize) -> bool { - !self.has_(index) - } pub fn flush(&mut self) -> io::Result<()> { self.reset_disk_related_state()?; @@ -528,6 +429,7 @@ where fn get_(&mut self, index: usize) -> Result<&T> { let byte_index = Self::index_to_byte_index(index); if self.file_position != byte_index { + println!("seek"); self.file_position = Self::seek(&mut self.file, byte_index)?; } let res = Self::read_exact(&mut self.file, &mut self.buf); @@ -561,15 +463,6 @@ where } } - // #[inline] - // fn seek_if_needed(&mut self, index: I) -> Result<()> { - // if self.file_position == Self::index_to_byte_index(Self::i_to_usize(index)?) { - // return Ok(()); - // } - // self.file_position = Self::seek_if_needed_(&mut self.file, index)?; - // Ok(()) - // } - pub fn iter(&mut self, f: F) -> Result<()> where F: FnMut((I, &T)) -> Result<()>, @@ -605,6 +498,15 @@ where Ok(()) } + pub fn compute_transform(&mut self, other: &mut StorableVec, t: F) -> Result<()> + where + A: StorableVecType, + F: Fn(&A) -> T, + { + other.iter_from(I::from(self.len()), |(i, a)| self.push_if_needed(i, t(a)))?; + Ok(self.flush()?) + } + pub fn compute_inverse_more_to_less(&mut self, other: &mut StorableVec) -> Result<()> where I: StorableVecType, @@ -632,12 +534,59 @@ where Ok(self.flush()?) } - pub fn compute_transform(&mut self, other: &mut StorableVec, t: F) -> Result<()> + pub fn compute_last_index_from_first( + &mut self, + first_index_vec: &mut StorableVec, + final_len: usize, + ) -> Result<()> where - A: StorableVecType, - F: Fn(&A) -> T, + T: Copy + From + Sub + StorableVecIndex, { - other.iter_from(I::from(self.len()), |(i, a)| self.push_if_needed(i, t(a)))?; + let one = T::from(1); + let mut prev_index: Option = None; + first_index_vec.iter_from(I::from(self.len()), |(i, v)| { + if let Some(prev_index) = prev_index { + self.push_if_needed(prev_index, *v - one)?; + } + prev_index.replace(i); + Ok(()) + })?; + if let Some(prev_index) = prev_index { + self.push_if_needed(prev_index, T::from(final_len) - one)?; + } + Ok(self.flush()?) + } + + pub fn compute_count_from_indexes( + &mut self, + first_indexes: &mut StorableVec, + last_indexes: &mut StorableVec, + ) -> Result<()> + where + T: From, + T2: StorableVecType + Copy + Add + Sub + TryInto, + >::Error: error::Error + Send + Sync + 'static, + { + first_indexes.iter_from(I::from(self.len()), |(i, first_index)| { + let last_index = last_indexes.get(i)?; + let count = *last_index + 1_usize - *first_index; + self.push_if_needed(i, count.into()) + })?; + Ok(self.flush()?) + } + + pub fn compute_is_first_ordered( + &mut self, + self_to_other: &mut StorableVec, + other_to_self: &mut StorableVec, + ) -> Result<()> + where + A: StorableVecIndex + StorableVecType, + T: From, + { + self_to_other.iter_from(I::from(self.len()), |(i, other)| { + self.push_if_needed(i, T::from(other_to_self.get(*other)? == &i)) + })?; Ok(self.flush()?) } } @@ -648,12 +597,12 @@ where T: StorableVecType, { #[inline] - pub fn get(&self, index: I) -> Result>> { + pub fn get(&self, index: I) -> Result> { self.get_(Self::i_to_usize(index)?) } #[inline] - fn get_(&self, index: usize) -> Result>> { - Ok(Some(Value::Owned(self.open_file_at_then_read(index)?.to_owned()))) + fn get_(&self, index: usize) -> Result> { + Ok(Some(self.open_file_at_then_read(index)?.to_owned())) } // Add iter iter_from iter_range collect.. diff --git a/storable_vec/src/main.rs b/storable_vec/src/main.rs index 08e5b9612..9c55fea26 100644 --- a/storable_vec/src/main.rs +++ b/storable_vec/src/main.rs @@ -1,6 +1,6 @@ use std::path::Path; -use storable_vec::{StorableVec, Version, CACHED_GETS}; +use storable_vec::{StorableVec, Version, ASYNC_READ_ONLY, CACHED_GETS, SINGLE_THREAD}; fn main() -> Result<(), Box> { { @@ -17,9 +17,13 @@ fn main() -> Result<(), Box> { } { - let vec: StorableVec = StorableVec::forced_import(Path::new("./v"), Version::from(1))?; + let mut vec: StorableVec = + StorableVec::forced_import(Path::new("./v"), Version::from(1))?; dbg!(vec.get(0)?); // 0 + dbg!(vec.get(1)?); // 0 + dbg!(vec.get(2)?); // 0 + dbg!(vec.get(0)?); // 0 } Ok(())