diff --git a/Cargo.toml b/Cargo.toml index d8e9bfe38..1d166cb17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,4 @@ rayon = "1.10.0" storable_vec = { path = "storable_vec" } struct_iterable = { path = "struct_iterable" } unsafe_slice_serde = { path = "unsafe_slice_serde" } +zerocopy = "0.8.15" diff --git a/computer/src/lib.rs b/computer/src/lib.rs index 124ed8369..10a889e6b 100644 --- a/computer/src/lib.rs +++ b/computer/src/lib.rs @@ -7,16 +7,17 @@ use exit::Exit; mod storage; mod structs; +use storable_vec::{CACHED_GETS, SINGLE_THREAD}; use storage::{Fjalls, StorableVecs}; use structs::*; -pub struct Computer { +pub struct Computer { outputs_dir: PathBuf, - vecs: StorableVecs, + vecs: StorableVecs, trees: Fjalls, } -impl Computer { +impl Computer { pub fn import(outputs_dir: &Path) -> color_eyre::Result { let outputs_dir = outputs_dir.to_owned(); let computed_dir = outputs_dir.join("computed"); @@ -29,56 +30,63 @@ impl Computer { }) } - pub fn compute(&mut self, bitcoin_dir: &Path, rpc: rpc::Client, exit: &Exit) -> color_eyre::Result<()> { - let mut indexer = Indexer::import(&self.outputs_dir.join("indexes"))?; + fn open_indexer(&self) -> color_eyre::Result> { + Indexer::import(&self.outputs_dir.join("indexes")) + } +} +impl Computer { + pub fn compute(&mut self, bitcoin_dir: &Path, rpc: rpc::Client, exit: &Exit) -> color_eyre::Result<()> { if false { + let mut indexer: Indexer = self.open_indexer()?; indexer.index(bitcoin_dir, rpc, exit)?; } - let height_count = indexer.vecs().height_to_size.len(); - let txindexes_count = indexer.vecs().txindex_to_txid.len(); - let txinindexes_count = indexer.vecs().txinindex_to_txoutindex.len(); - let txoutindexes_count = indexer.vecs().txoutindex_to_addressindex.len(); + let mut indexer: Indexer = self.open_indexer()?; + + let height_count = indexer.vecs.height_to_size.len(); + let txindexes_count = indexer.vecs.txindex_to_txid.len(); + let txinindexes_count = indexer.vecs.txinindex_to_txoutindex.len(); + let txoutindexes_count = indexer.vecs.txoutindex_to_addressindex.len(); // TODO: Remove all outdated self.vecs .txindex_to_last_txinindex - .compute_last_index_from_first(&indexer.vecs().txindex_to_first_txinindex, txinindexes_count)?; + .compute_last_index_from_first(&indexer.vecs.txindex_to_first_txinindex, txinindexes_count)?; self.vecs.txindex_to_inputcount.compute_count_from_indexes( - &indexer.vecs().txindex_to_first_txinindex, + &indexer.vecs.txindex_to_first_txinindex, &self.vecs.txindex_to_last_txinindex, )?; self.vecs .txindex_to_last_txoutindex - .compute_last_index_from_first(&indexer.vecs().txindex_to_first_txoutindex, txoutindexes_count)?; + .compute_last_index_from_first(&indexer.vecs.txindex_to_first_txoutindex, txoutindexes_count)?; self.vecs.txindex_to_outputcount.compute_count_from_indexes( - &indexer.vecs().txindex_to_first_txoutindex, + &indexer.vecs.txindex_to_first_txoutindex, &self.vecs.txindex_to_last_txoutindex, )?; self.vecs .height_to_date - .compute_transform(&indexer.vecs().height_to_timestamp, |timestamp| Date::from(timestamp))?; + .compute_transform(&mut indexer.vecs.height_to_timestamp, |timestamp| Date::from(timestamp))?; self.vecs .height_to_last_txindex - .compute_last_index_from_first(&indexer.vecs().height_to_first_txindex, height_count)?; + .compute_last_index_from_first(&indexer.vecs.height_to_first_txindex, height_count)?; self.vecs.txindex_to_height.compute_inverse_less_to_more( - &indexer.vecs().height_to_first_txindex, - &self.vecs.height_to_last_txindex, + &mut indexer.vecs.height_to_first_txindex, + &mut self.vecs.height_to_last_txindex, )?; let date_count = self.vecs.height_to_date.len(); self.vecs .date_to_first_height - .compute_inverse_more_to_less(&self.vecs.height_to_date)?; + .compute_inverse_more_to_less(&mut self.vecs.height_to_date)?; // --- // Date to X diff --git a/computer/src/main.rs b/computer/src/main.rs index 92623c1ed..d6c4fd584 100644 --- a/computer/src/main.rs +++ b/computer/src/main.rs @@ -3,6 +3,7 @@ use std::path::Path; use biter::rpc; use bomputer::Computer; use exit::Exit; +use storable_vec::SINGLE_THREAD; mod structs; @@ -18,7 +19,7 @@ pub fn main() -> color_eyre::Result<()> { let i = std::time::Instant::now(); - let mut computer = Computer::import(Path::new("../_outputs"))?; + let mut computer: Computer = Computer::import(Path::new("../_outputs"))?; computer.compute(data_dir, rpc, &exit)?; diff --git a/computer/src/storage/storable_vecs/base.rs b/computer/src/storage/storable_vecs/base.rs index b19e8196a..a3cb4f25e 100644 --- a/computer/src/storage/storable_vecs/base.rs +++ b/computer/src/storage/storable_vecs/base.rs @@ -7,22 +7,28 @@ use std::{ }; use derive_deref::{Deref, DerefMut}; -use storable_vec::{StorableVecIndex, StorableVecType, Version}; +use storable_vec::{StorableVecIndex, StorableVecType, Version, SINGLE_THREAD}; #[derive(Debug, Deref, DerefMut)] -pub struct StorableVec(storable_vec::StorableVec); +pub struct StorableVec(storable_vec::StorableVec); const FLUSH_EVERY: usize = 10_000; -impl StorableVec +impl StorableVec where I: StorableVecIndex, T: StorableVecType, { - pub fn import(path: &Path, version: Version) -> io::Result { - Ok(Self(storable_vec::StorableVec::import(path, version)?)) + pub fn import(path: &Path, version: Version) -> storable_vec::Result { + Ok(Self(storable_vec::StorableVec::forced_import(path, version)?)) } +} +impl StorableVec +where + I: StorableVecIndex, + T: StorableVecType, +{ fn flush_vec_if_needed(&mut self) -> io::Result<()> { if self.pushed_len() == FLUSH_EVERY { self.flush() @@ -31,53 +37,10 @@ where } } - pub fn compute_inverse_more_to_less(&mut self, other: &storable_vec::StorableVec) -> storable_vec::Result<()> - where - I: StorableVecType, - T: StorableVecIndex, - { - other.iter_from(self.last()?.map(|v| *v).unwrap_or_default(), |(v, i)| { - self.push_if_needed(*i, v) - }) - } - - pub fn compute_inverse_less_to_more( - &mut self, - first_indexes: &storable_vec::StorableVec, - last_indexes: &storable_vec::StorableVec, - ) -> color_eyre::Result<()> - where - I: StorableVecType, - T: StorableVecIndex, - { - let (mut file_last, mut buf_last) = last_indexes.prepare_to_read_at_(self.len())?; - first_indexes.iter_from(T::from(self.len()), |(value, first_index)| { - let first_index: usize = (*first_index) - .try_into() - .map_err(|_| storable_vec::Error::FailedKeyTryIntoUsize)?; - let last_index = last_indexes.read_exact(&mut file_last, &mut buf_last)?; - let last_index: usize = (*last_index) - .try_into() - .map_err(|_| storable_vec::Error::FailedKeyTryIntoUsize)?; - (first_index..last_index).try_for_each(|index| self.push_if_needed(I::from(index), value))?; - Ok(()) - })?; - self.flush()?; - Ok(()) - } - - pub fn compute_transform(&mut self, other: &storable_vec::StorableVec, t: F) -> storable_vec::Result<()> - where - A: StorableVecType, - F: Fn(&A) -> T, - { - other.iter_from(I::from(self.len()), |(i, a)| self.push_if_needed(i, t(a))) - } - pub fn compute_is_first_ordered( &mut self, - self_to_other: &storable_vec::StorableVec, - other_to_self: &storable_vec::StorableVec, + self_to_other: &storable_vec::StorableVec, + other_to_self: &storable_vec::StorableVec, ) -> storable_vec::Result<()> where A: StorableVecIndex + StorableVecType, @@ -100,7 +63,7 @@ where pub fn compute_last_index_from_first( &mut self, - first_index_vec: &storable_vec::StorableVec, + first_index_vec: &storable_vec::StorableVec, final_len: usize, ) -> color_eyre::Result<()> where @@ -112,7 +75,7 @@ where self.push_if_needed(prev_index, *v - T::from(1))?; } prev_index.replace(i); - self.flush_vec_if_needed().map_err(storable_vec::Error::IO) + Ok(self.flush_vec_if_needed()?) })?; if let Some(prev_index) = prev_index { self.push_if_needed(prev_index, T::from(final_len) - T::from(1))?; @@ -123,8 +86,8 @@ where pub fn compute_count_from_indexes( &mut self, - first_indexes: &storable_vec::StorableVec, - last_indexes: &storable_vec::StorableVec, + first_indexes: &storable_vec::StorableVec, + last_indexes: &storable_vec::StorableVec, ) -> color_eyre::Result<()> where T: From, @@ -136,7 +99,7 @@ where let last_index = last_indexes.read_exact(&mut file_last, &mut buf_last)?; let count = *last_index + 1_usize - *first_index; self.push_if_needed(i, count.into())?; - self.flush_vec_if_needed().map_err(storable_vec::Error::IO) + Ok(self.flush_vec_if_needed()?) })?; self.flush()?; Ok(()) diff --git a/computer/src/storage/storable_vecs/mod.rs b/computer/src/storage/storable_vecs/mod.rs index c2becc932..a63fdbc2c 100644 --- a/computer/src/storage/storable_vecs/mod.rs +++ b/computer/src/storage/storable_vecs/mod.rs @@ -9,33 +9,33 @@ mod base; use base::*; -pub struct StorableVecs { - pub date_to_first_height: StorableVec, - // pub height_to_block_interval: StorableVec, - pub height_to_date: StorableVec, - // pub height_to_fee: StorableVec, - // pub height_to_inputcount: StorableVec, - // pub height_to_last_addressindex: StorableVec, - pub height_to_last_txindex: StorableVec, - // pub height_to_last_txoutindex: StorableVec, - // pub height_to_maxfeerate: StorableVec, - // pub height_to_medianfeerate: StorableVec, - // pub height_to_minfeerate: StorableVec, - // pub height_to_outputcount: StorableVec, - // pub height_to_subsidy: StorableVec, - // pub height_to_totalfees: StorableVec, - // pub height_to_txcount: StorableVec, - pub txindex_to_fee: StorableVec, - pub txindex_to_height: StorableVec, - pub txindex_to_is_coinbase: StorableVec, - // pub txindex_to_feerate: StorableVec, - pub txindex_to_inputcount: StorableVec, - pub txindex_to_last_txinindex: StorableVec, - pub txindex_to_last_txoutindex: StorableVec, - pub txindex_to_outputcount: StorableVec, +pub struct StorableVecs { + pub date_to_first_height: StorableVec, + // pub height_to_block_interval: StorableVec, + pub height_to_date: StorableVec, + // pub height_to_fee: StorableVec, + // pub height_to_inputcount: StorableVec, + // pub height_to_last_addressindex: StorableVec, + pub height_to_last_txindex: StorableVec, + // pub height_to_last_txoutindex: StorableVec, + // pub height_to_maxfeerate: StorableVec, + // pub height_to_medianfeerate: StorableVec, + // pub height_to_minfeerate: StorableVec, + // pub height_to_outputcount: StorableVec, + // pub height_to_subsidy: StorableVec, + // pub height_to_totalfees: StorableVec, + // pub height_to_txcount: StorableVec, + pub txindex_to_fee: StorableVec, + pub txindex_to_height: StorableVec, + pub txindex_to_is_coinbase: StorableVec, + // pub txindex_to_feerate: StorableVec, + pub txindex_to_inputcount: StorableVec, + pub txindex_to_last_txinindex: StorableVec, + pub txindex_to_last_txoutindex: StorableVec, + pub txindex_to_outputcount: StorableVec, } -impl StorableVecs { +impl StorableVecs { pub fn import(path: &Path) -> color_eyre::Result { fs::create_dir_all(path)?; diff --git a/computer/src/structs/date.rs b/computer/src/structs/date.rs index fd6937b0f..9d965662b 100644 --- a/computer/src/structs/date.rs +++ b/computer/src/structs/date.rs @@ -5,9 +5,15 @@ use color_eyre::eyre::eyre; use derive_deref::Deref; use jiff::{civil::Date as _Date, tz::TimeZone, Span}; -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deref)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deref)] pub struct Date(_Date); +impl Default for Date { + fn default() -> Self { + Self::INDEX_ZERO + } +} + impl Date { const INDEX_ZERO: Self = Self(_Date::constant(2009, 1, 3)); const INDEX_ONE: Self = Self(_Date::constant(2009, 1, 9)); diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 2256ebf1b..511feade3 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -13,6 +13,7 @@ use bitcoin::{Transaction, TxIn, TxOut, Txid}; use color_eyre::eyre::{eyre, ContextCompat}; use exit::Exit; use rayon::prelude::*; +use storable_vec::CACHED_GETS; mod storage; mod structs; @@ -29,26 +30,20 @@ pub use structs::{ const UNSAFE_BLOCKS: u32 = 100; const SNAPSHOT_BLOCK_RANGE: usize = 1000; -pub struct Indexer { - vecs: StorableVecs, - trees: Fjalls, +pub struct Indexer { + pub vecs: StorableVecs, + pub trees: Fjalls, } -impl Indexer { +impl Indexer { pub fn import(indexes_dir: &Path) -> color_eyre::Result { let vecs = StorableVecs::import(&indexes_dir.join("vecs"))?; let trees = Fjalls::import(&indexes_dir.join("fjall"))?; Ok(Self { vecs, trees }) } +} - pub fn vecs(&self) -> &StorableVecs { - &self.vecs - } - - pub fn trees(&self) -> &Fjalls { - &self.trees - } - +impl Indexer { pub fn index(&mut self, bitcoin_dir: &Path, rpc: rpc::Client, exit: &Exit) -> color_eyre::Result<()> { let check_collisions = true; @@ -80,22 +75,23 @@ impl Indexer { let mut p2wpkhindex_global = vecs.height_to_first_p2wpkhindex.get_or_default(height)?; let mut p2wshindex_global = vecs.height_to_first_p2wshindex.get_or_default(height)?; - let export = |trees: &mut Fjalls, vecs: &mut StorableVecs, height: Height| -> color_eyre::Result<()> { - println!("Exporting..."); + let export = + |trees: &mut Fjalls, vecs: &mut StorableVecs, height: Height| -> color_eyre::Result<()> { + println!("Exporting..."); - exit.block(); + exit.block(); + + thread::scope(|scope| -> color_eyre::Result<()> { + let vecs_handle = scope.spawn(|| vecs.flush(height)); + trees.commit(height)?; + vecs_handle.join().unwrap()?; + Ok(()) + })?; + + exit.unblock(); - thread::scope(|scope| -> color_eyre::Result<()> { - let vecs_handle = scope.spawn(|| vecs.flush(height)); - trees.commit(height)?; - vecs_handle.join().unwrap()?; Ok(()) - })?; - - exit.unblock(); - - Ok(()) - }; + }; biter::new(bitcoin_dir, Some(height.into()), None, rpc) .iter() @@ -104,7 +100,7 @@ impl Indexer { height = Height::from(_height); - if let Some(saved_blockhash) = vecs.height_to_blockhash.cached_get(height)? { + if let Some(saved_blockhash) = vecs.height_to_blockhash.get(height)? { if &blockhash != saved_blockhash.as_ref() { todo!("Rollback not implemented"); // trees.rollback_from(&mut rtx, height, &exit)?; @@ -260,7 +256,7 @@ impl Indexer { let txoutindex = *vecs .txindex_to_first_txoutindex - .cached_get(prev_txindex)? + .get(prev_txindex)? .context("Expect txoutindex to not be none") .inspect_err(|_| { dbg!(outpoint.txid, prev_txindex, vout); @@ -336,12 +332,12 @@ impl Indexer { let prev_addresstype = *vecs .addressindex_to_addresstype - .cached_get(addressindex)? + .get(addressindex)? .context("Expect to have address type")?; let addresstypeindex = *vecs .addressindex_to_addresstypeindex - .cached_get(addressindex)? + .get(addressindex)? .context("Expect to have address type index")?; // Good first time // Wrong after rerun @@ -605,7 +601,7 @@ impl Indexer { // Ok if `get` is not par as should happen only twice let prev_txid = vecs .txindex_to_txid - .cached_get(prev_txindex)? + .get(prev_txindex)? .context("To have txid for txindex") .inspect_err(|_| { dbg!(txindex, txid, len); @@ -630,7 +626,7 @@ impl Indexer { if !is_dup { let prev_height = - vecs.txindex_to_height.cached_get(prev_txindex)?.expect("To have height"); + vecs.txindex_to_height.get(prev_txindex)?.expect("To have height"); dbg!(height, txid, txindex, prev_height, prev_txid, prev_txindex); return Err(eyre!("Expect none")); } diff --git a/indexer/src/main.rs b/indexer/src/main.rs index d51f8bac7..4cb954e3f 100644 --- a/indexer/src/main.rs +++ b/indexer/src/main.rs @@ -3,6 +3,7 @@ use std::path::Path; use bindex::Indexer; use biter::rpc; use exit::Exit; +use storable_vec::CACHED_GETS; fn main() -> color_eyre::Result<()> { color_eyre::install()?; @@ -16,7 +17,7 @@ fn main() -> color_eyre::Result<()> { let i = std::time::Instant::now(); - let mut indexer = Indexer::import(Path::new("../_outputs/indexes"))?; + let mut indexer: Indexer = Indexer::import(Path::new("../_outputs/indexes"))?; indexer.index(data_dir, rpc, &exit)?; diff --git a/indexer/src/storage/storable_vecs/base.rs b/indexer/src/storage/storable_vecs/base.rs index 8d1030ab7..99b6adfa8 100644 --- a/indexer/src/storage/storable_vecs/base.rs +++ b/indexer/src/storage/storable_vecs/base.rs @@ -5,25 +5,25 @@ use std::{ path::{Path, PathBuf}, }; -use storable_vec::{StorableVecIndex, StorableVecType, Version}; +use storable_vec::{StorableVecIndex, StorableVecType, Version, CACHED_GETS}; use super::Height; #[derive(Debug)] -pub struct StorableVec { +pub struct StorableVec { height: Option, - vec: storable_vec::StorableVec, + vec: storable_vec::StorableVec, } -impl StorableVec +impl StorableVec where I: StorableVecIndex, T: StorableVecType, { - pub fn import(path: &Path, version: Version) -> io::Result { + pub fn import(path: &Path, version: Version) -> storable_vec::Result { Ok(Self { height: Height::try_from(Self::path_height_(path).as_path()).ok(), - vec: storable_vec::StorableVec::import(path, version)?, + vec: storable_vec::StorableVec::forced_import(path, version)?, }) } @@ -53,13 +53,13 @@ where } } -impl Deref for StorableVec { - type Target = storable_vec::StorableVec; +impl Deref for StorableVec { + type Target = storable_vec::StorableVec; fn deref(&self) -> &Self::Target { &self.vec } } -impl DerefMut for StorableVec { +impl DerefMut for StorableVec { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.vec } @@ -68,10 +68,9 @@ impl DerefMut for StorableVec { pub trait AnyStorableVec { fn height(&self) -> color_eyre::Result; fn flush(&mut self, height: Height) -> io::Result<()>; - fn reset_cache(&mut self); } -impl AnyStorableVec for StorableVec +impl AnyStorableVec for StorableVec where I: StorableVecIndex, T: StorableVecType, @@ -80,10 +79,6 @@ where self.height() } - fn reset_cache(&mut self) { - self.vec.reset_cache() - } - fn flush(&mut self, height: Height) -> io::Result<()> { self.flush(height) } diff --git a/indexer/src/storage/storable_vecs/mod.rs b/indexer/src/storage/storable_vecs/mod.rs index 15aa0c749..4c43f3e8f 100644 --- a/indexer/src/storage/storable_vecs/mod.rs +++ b/indexer/src/storage/storable_vecs/mod.rs @@ -3,7 +3,7 @@ use std::{fs, io, path::Path}; use biter::bitcoin::{self, transaction, BlockHash, Txid, Weight}; use exit::Exit; use rayon::prelude::*; -use storable_vec::Version; +use storable_vec::{Version, CACHED_GETS}; use crate::structs::{ Addressbytes, Addressindex, Addresstype, Addresstypeindex, Amount, Height, P2PK33AddressBytes, P2PK65AddressBytes, @@ -15,52 +15,52 @@ mod base; pub use base::*; -pub struct StorableVecs { - pub addressindex_to_addresstype: StorableVec, - pub addressindex_to_addresstypeindex: StorableVec, - pub addressindex_to_height: StorableVec, - pub height_to_blockhash: StorableVec, - pub height_to_difficulty: StorableVec, - pub height_to_first_addressindex: StorableVec, - pub height_to_first_emptyindex: StorableVec, - pub height_to_first_multisigindex: StorableVec, - pub height_to_first_opreturnindex: StorableVec, - pub height_to_first_pushonlyindex: StorableVec, - pub height_to_first_txindex: StorableVec, - pub height_to_first_txinindex: StorableVec, - pub height_to_first_txoutindex: StorableVec, - pub height_to_first_unknownindex: StorableVec, - pub height_to_first_p2pk33index: StorableVec, - pub height_to_first_p2pk65index: StorableVec, - pub height_to_first_p2pkhindex: StorableVec, - pub height_to_first_p2shindex: StorableVec, - pub height_to_first_p2trindex: StorableVec, - pub height_to_first_p2wpkhindex: StorableVec, - pub height_to_first_p2wshindex: StorableVec, - pub height_to_size: StorableVec, - pub height_to_timestamp: StorableVec, - pub height_to_weight: StorableVec, - pub p2pk33index_to_p2pk33addressbytes: StorableVec, - pub p2pk65index_to_p2pk65addressbytes: StorableVec, - pub p2pkhindex_to_p2pkhaddressbytes: StorableVec, - pub p2shindex_to_p2shaddressbytes: StorableVec, - pub p2trindex_to_p2traddressbytes: StorableVec, - pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec, - pub p2wshindex_to_p2wshaddressbytes: StorableVec, - pub txindex_to_first_txinindex: StorableVec, - pub txindex_to_first_txoutindex: StorableVec, - pub txindex_to_height: StorableVec, - pub txindex_to_locktime: StorableVec, - pub txindex_to_txid: StorableVec, - pub txindex_to_txversion: StorableVec, - pub txinindex_to_txoutindex: StorableVec, - pub txoutindex_to_addressindex: StorableVec, - pub txoutindex_to_amount: StorableVec, +pub struct StorableVecs { + pub addressindex_to_addresstype: StorableVec, + pub addressindex_to_addresstypeindex: StorableVec, + pub addressindex_to_height: StorableVec, + pub height_to_blockhash: StorableVec, + pub height_to_difficulty: StorableVec, + pub height_to_first_addressindex: StorableVec, + pub height_to_first_emptyindex: StorableVec, + pub height_to_first_multisigindex: StorableVec, + pub height_to_first_opreturnindex: StorableVec, + pub height_to_first_pushonlyindex: StorableVec, + pub height_to_first_txindex: StorableVec, + pub height_to_first_txinindex: StorableVec, + pub height_to_first_txoutindex: StorableVec, + pub height_to_first_unknownindex: StorableVec, + pub height_to_first_p2pk33index: StorableVec, + pub height_to_first_p2pk65index: StorableVec, + pub height_to_first_p2pkhindex: StorableVec, + pub height_to_first_p2shindex: StorableVec, + pub height_to_first_p2trindex: StorableVec, + pub height_to_first_p2wpkhindex: StorableVec, + pub height_to_first_p2wshindex: StorableVec, + pub height_to_size: StorableVec, + pub height_to_timestamp: StorableVec, + pub height_to_weight: StorableVec, + pub p2pk33index_to_p2pk33addressbytes: StorableVec, + pub p2pk65index_to_p2pk65addressbytes: StorableVec, + pub p2pkhindex_to_p2pkhaddressbytes: StorableVec, + pub p2shindex_to_p2shaddressbytes: StorableVec, + pub p2trindex_to_p2traddressbytes: StorableVec, + pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec, + pub p2wshindex_to_p2wshaddressbytes: StorableVec, + pub txindex_to_first_txinindex: StorableVec, + pub txindex_to_first_txoutindex: StorableVec, + pub txindex_to_height: StorableVec, + pub txindex_to_locktime: StorableVec, + pub txindex_to_txid: StorableVec, + pub txindex_to_txversion: StorableVec, + pub txinindex_to_txoutindex: StorableVec, + pub txoutindex_to_addressindex: StorableVec, + pub txoutindex_to_amount: StorableVec, } // const UNSAFE_BLOCKS: usize = 100; -impl StorableVecs { +impl StorableVecs { pub fn import(path: &Path) -> color_eyre::Result { fs::create_dir_all(path)?; @@ -180,67 +180,6 @@ impl StorableVecs { }) } - pub fn push_addressbytes_if_needed( - &mut self, - index: Addresstypeindex, - addressbytes: Addressbytes, - ) -> storable_vec::Result<()> { - match addressbytes { - Addressbytes::P2PK65(bytes) => self.p2pk65index_to_p2pk65addressbytes.push_if_needed(index, bytes), - Addressbytes::P2PK33(bytes) => self.p2pk33index_to_p2pk33addressbytes.push_if_needed(index, bytes), - Addressbytes::P2PKH(bytes) => self.p2pkhindex_to_p2pkhaddressbytes.push_if_needed(index, bytes), - Addressbytes::P2SH(bytes) => self.p2shindex_to_p2shaddressbytes.push_if_needed(index, bytes), - Addressbytes::P2WPKH(bytes) => self.p2wpkhindex_to_p2wpkhaddressbytes.push_if_needed(index, bytes), - Addressbytes::P2WSH(bytes) => self.p2wshindex_to_p2wshaddressbytes.push_if_needed(index, bytes), - Addressbytes::P2TR(bytes) => self.p2trindex_to_p2traddressbytes.push_if_needed(index, bytes), - } - } - - pub fn get_addressbytes( - &self, - addresstype: Addresstype, - addresstypeindex: Addresstypeindex, - ) -> storable_vec::Result> { - Ok(match addresstype { - Addresstype::P2PK65 => self - .p2pk65index_to_p2pk65addressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - Addresstype::P2PK33 => self - .p2pk33index_to_p2pk33addressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - Addresstype::P2PKH => self - .p2pkhindex_to_p2pkhaddressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - Addresstype::P2SH => self - .p2shindex_to_p2shaddressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - Addresstype::P2WPKH => self - .p2wpkhindex_to_p2wpkhaddressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - Addresstype::P2WSH => self - .p2wshindex_to_p2wshaddressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - Addresstype::P2TR => self - .p2trindex_to_p2traddressbytes - .cached_get(addresstypeindex)? - // .map(|v| Addressbytes::from(v.clone())), - .map(|v| Addressbytes::from(v.into_inner())), - _ => unreachable!(), - }) - } - #[allow(unused)] pub fn rollback_from(&mut self, _height: Height, _exit: &Exit) -> color_eyre::Result<()> { panic!(); @@ -355,10 +294,6 @@ impl StorableVecs { // Ok(()) } - pub fn reset_cache(&mut self) { - self.as_mut_slice().into_par_iter().for_each(|vec| vec.reset_cache()) - } - pub fn flush(&mut self, height: Height) -> io::Result<()> { self.as_mut_slice() .into_par_iter() @@ -463,3 +398,66 @@ impl StorableVecs { ] } } + +impl StorableVecs { + pub fn get_addressbytes( + &self, + addresstype: Addresstype, + addresstypeindex: Addresstypeindex, + ) -> storable_vec::Result> { + Ok(match addresstype { + Addresstype::P2PK65 => self + .p2pk65index_to_p2pk65addressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + Addresstype::P2PK33 => self + .p2pk33index_to_p2pk33addressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + Addresstype::P2PKH => self + .p2pkhindex_to_p2pkhaddressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + Addresstype::P2SH => self + .p2shindex_to_p2shaddressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + Addresstype::P2WPKH => self + .p2wpkhindex_to_p2wpkhaddressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + Addresstype::P2WSH => self + .p2wshindex_to_p2wshaddressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + Addresstype::P2TR => self + .p2trindex_to_p2traddressbytes + .get(addresstypeindex)? + // .map(|v| Addressbytes::from(v.clone())), + .map(|v| Addressbytes::from(v.into_inner())), + _ => unreachable!(), + }) + } + + pub fn push_addressbytes_if_needed( + &mut self, + index: Addresstypeindex, + addressbytes: Addressbytes, + ) -> storable_vec::Result<()> { + match addressbytes { + Addressbytes::P2PK65(bytes) => self.p2pk65index_to_p2pk65addressbytes.push_if_needed(index, bytes), + Addressbytes::P2PK33(bytes) => self.p2pk33index_to_p2pk33addressbytes.push_if_needed(index, bytes), + Addressbytes::P2PKH(bytes) => self.p2pkhindex_to_p2pkhaddressbytes.push_if_needed(index, bytes), + Addressbytes::P2SH(bytes) => self.p2shindex_to_p2shaddressbytes.push_if_needed(index, bytes), + Addressbytes::P2WPKH(bytes) => self.p2wpkhindex_to_p2wpkhaddressbytes.push_if_needed(index, bytes), + Addressbytes::P2WSH(bytes) => self.p2wshindex_to_p2wshaddressbytes.push_if_needed(index, bytes), + Addressbytes::P2TR(bytes) => self.p2trindex_to_p2traddressbytes.push_if_needed(index, bytes), + } + } +} diff --git a/storable_vec/src/enums/error.rs b/storable_vec/src/enums/error.rs index 63f503b52..0905fd97c 100644 --- a/storable_vec/src/enums/error.rs +++ b/storable_vec/src/enums/error.rs @@ -3,30 +3,55 @@ use std::{ io, }; +use crate::Version; + pub type Result = std::result::Result; #[derive(Debug)] pub enum Error { + WrongEndian, + DifferentVersion { found: Version, expected: Version }, MmapsVecIsTooSmall, IO(io::Error), UnsafeSliceSerde(unsafe_slice_serde::Error), IndexTooHigh, + IndexTooLow, ExpectFileToHaveIndex, ExpectVecToHaveIndex, FailedKeyTryIntoUsize, + UnsupportedUnflushedState, +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::IO(value) + } +} + +impl From for Error { + fn from(value: unsafe_slice_serde::Error) -> Self { + Self::UnsafeSliceSerde(value) + } } impl fmt::Display for Error { - // This trait requires `fmt` with this exact signature. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + Error::WrongEndian => write!(f, "Wrong endian"), + Error::DifferentVersion { found, expected } => { + write!(f, "Different version; found: {found:?}, expected: {expected:?}") + } Error::MmapsVecIsTooSmall => write!(f, "Mmaps vec is too small"), Error::IO(error) => Debug::fmt(&error, f), Error::UnsafeSliceSerde(error) => Debug::fmt(&error, f), Error::IndexTooHigh => write!(f, "Index too high"), + Error::IndexTooLow => write!(f, "Index too low"), Error::ExpectFileToHaveIndex => write!(f, "Expect file to have index"), Error::ExpectVecToHaveIndex => write!(f, "Expect vec to have index"), Error::FailedKeyTryIntoUsize => write!(f, "Failed to convert key to usize"), + Error::UnsupportedUnflushedState => { + write!(f, "Unsupported unflush state, please flush before using this function") + } } } } diff --git a/storable_vec/src/lib.rs b/storable_vec/src/lib.rs index 5613cb6a8..95da53292 100644 --- a/storable_vec/src/lib.rs +++ b/storable_vec/src/lib.rs @@ -21,20 +21,22 @@ pub use enums::*; pub use structs::*; pub use traits::*; +type Buffer = Vec; + /// Uses `Mmap` instead of `File` /// /// Used in `/indexer` -const CACHED: u8 = 0; +pub const CACHED_GETS: u8 = 0; /// Will use the same `File` for every read, so not thread safe /// /// Used in `/computer` -const RAW_SYNC: u8 = 1; +pub const SINGLE_THREAD: u8 = 1; /// Will spin up a new `File` for every read /// /// Used in `/server` -const RAW_ASYNC: u8 = 2; +pub const ASYNC_READ_ONLY: u8 = 2; /// /// A very small, fast, efficient and simple storable Vec @@ -48,12 +50,16 @@ const RAW_ASYNC: u8 = 2; /// If you don't call `.flush()` it just acts as a normal Vec /// #[derive(Debug)] -pub struct StorableVec { +pub struct StorableVec { pathbuf: PathBuf, - unsafe_file: File, + file: File, + /// **Number of values NOT number of bytes** + file_len: usize, + /// Only for SINGLE_THREAD + file_position: u64, + buf: Buffer, + /// Only for CACHED_GETS cache: Vec>>, // Boxed Mmap to reduce the size of the Lock (from 24 to 16) - buf: Vec, - disk_len: usize, pushed: Vec, // updated: BTreeMap, // inserted: BTreeMap, @@ -69,7 +75,7 @@ const ONE_MB: usize = 1000 * 1024; const MAX_CACHE_SIZE: usize = 100 * ONE_MB; // const MAX_CACHE_SIZE: usize = 100 * ONE_MB; -impl StorableVec +impl StorableVec where I: StorableVecIndex, T: StorableVecType, @@ -80,24 +86,44 @@ where pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; - pub fn import(path: &Path, version: Version) -> Result { + /// Same as import but will remove the folder if the endian or the version is different, so be careful ! + pub fn forced_import(path: &Path, version: Version) -> Result { + let res = Self::import(path, version); + match res { + Err(Error::WrongEndian) | Err(Error::DifferentVersion { found: _, expected: _ }) => { + fs::remove_dir_all(path)?; + Self::import(path, version) + } + _ => res, + } + } + + pub fn import(path: &Path, version: Version) -> Result { fs::create_dir_all(path)?; let path_version = Self::path_version_(path); - let is_same_version = - Version::try_from(path_version.as_path()).is_ok_and(|prev_version| version == prev_version); - if !is_same_version { - fs::remove_dir_all(path)?; + + if let Ok(prev_version) = Version::try_from(path_version.as_path()) { + if prev_version != version { + if prev_version.swap_bytes() == version { + return Err(Error::WrongEndian); + } + return Err(Error::DifferentVersion { + found: prev_version, + expected: version, + }); + } } version.write(&path_version)?; - let unsafe_file = Self::open_file_(&Self::path_vec_(path))?; + let file = Self::open_file_(&Self::path_vec_(path))?; - let mut this = Self { + let mut slf = Self { pathbuf: path.to_owned(), - disk_len: Self::disk_len(&unsafe_file)?, - unsafe_file, - buf: vec![0; Self::SIZE_OF_T], + file_position: 0, + file_len: Self::read_disk_len_(&file)?, + file, + buf: Self::create_buffer(), cache: vec![], pushed: vec![], // updated: BTreeMap::new(), @@ -108,35 +134,20 @@ where // opened_mmaps: AtomicUsize::new(0), }; - // TODO: Only if write mode - this.reset_cache(); + slf.reset_disk_related_state()?; - Ok(this) + Ok(slf) } - pub fn disk_len(file: &File) -> io::Result { - Ok(Self::byte_index_to_index(file.metadata()?.len() as usize)) + #[inline] + fn create_buffer() -> Buffer { + vec![0; Self::SIZE_OF_T] } - pub fn reset_cache(&mut self) { - // par_iter_mut ? - self.cache.iter_mut().for_each(|lock| { - lock.take(); - }); - - let len = (self.disk_len as f64 / Self::PER_PAGE as f64).ceil() as usize; - let len = Self::CACHE_LENGTH.min(len); - - if self.cache.len() != len { - self.cache.resize_with(len, Default::default); - self.cache.shrink_to_fit(); - } + fn open_file(&self) -> io::Result { + Self::open_file_(&self.path_vec()) } - - fn open_file(&self) -> Result { - Self::open_file_(&self.path_vec()).map_err(Error::IO) - } - fn open_file_(path: &Path) -> Result { + fn open_file_(path: &Path) -> io::Result { OpenOptions::new() .read(true) .create(true) @@ -145,199 +156,104 @@ where .open(path) } - #[inline] - fn index_to_byte_range(index: usize) -> Range { - let index = Self::index_to_byte_index(index) % Self::PAGE_SIZE; - index..(index + Self::SIZE_OF_T) + fn read_disk_len(&self) -> io::Result { + Self::read_disk_len_(&self.file) + } + fn read_disk_len_(file: &File) -> io::Result { + Ok(Self::byte_index_to_index(file.metadata()?.len() as usize)) } - #[inline] - fn index_to_byte_index(index: usize) -> usize { - index * Self::SIZE_OF_T + fn reset_disk_related_state(&mut self) -> io::Result<()> { + self.file = self.open_file()?; + self.file_len = self.read_disk_len()?; + self.file_position = 0; + self.reset_cache() } - #[inline] - fn byte_index_to_index(byte_index: usize) -> usize { - byte_index / Self::SIZE_OF_T - } - - fn index_to_pushed_index(&self, index: usize) -> Result> { - if index >= self.disk_len { - let index = index - self.disk_len; - if index >= self.pushed.len() { - Err(Error::IndexTooHigh) - } else { - Ok(Some(index)) - } - } else { - Ok(None) - } - } - - #[inline] - pub fn cached_get(&self, index: I) -> Result>> { - self.cached_get_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?) - } - fn cached_get_(&self, index: usize) -> Result>> { - match self.index_to_pushed_index(index) { - Ok(index) => { - if let Some(index) = index { - return Ok(self.pushed.get(index).map(|v| Value::Ref(v))); - } - } - Err(Error::IndexTooHigh) => return Ok(None), - Err(error) => return Err(error), - } - - // if !self.updated.is_empty() { - // if let Some(v) = self.updated.get(&index) { - // return Ok(Some(v)); - // } - // } - - let page_index = index / Self::PER_PAGE; - let last_index = self.disk_len - 1; - let max_page_index = last_index / Self::PER_PAGE; - let min_page_index = (max_page_index + 1).checked_sub(self.cache.len()).unwrap_or_default(); - - // let min_open_page = self.min.load(AtomicOrdering::SeqCst); - - // if self.min.load(AtomicOrdering::SeqCst) { - // self.min.set(value) - // } - - if page_index >= min_page_index { - let mmap = &**self - .cache - .get(page_index - min_page_index) - .ok_or(Error::MmapsVecIsTooSmall)? - .get_or_init(|| { - Box::new(unsafe { - MmapOptions::new() - .len(Self::PAGE_SIZE) - .offset((page_index * Self::PAGE_SIZE) as u64) - .map(&self.unsafe_file) - .unwrap() - }) + fn reset_cache(&mut self) -> io::Result<()> { + match MODE { + CACHED_GETS => { + // par_iter_mut ? + self.cache.iter_mut().for_each(|lock| { + lock.take(); }); - let range = Self::index_to_byte_range(index); + let len = (self.file_len as f64 / Self::PER_PAGE as f64).ceil() as usize; + let len = Self::CACHE_LENGTH.min(len); - let slice = &mmap[range]; + if self.cache.len() != len { + self.cache.resize_with(len, Default::default); + self.cache.shrink_to_fit(); + } - Ok(Some(Value::Ref( - T::unsafe_try_from_slice(slice).map_err(Error::UnsafeSliceSerde)?, - ))) - } else { - let (mut file, mut buf) = self.prepare_to_read()?; - Self::seek_(&mut file, index)?; - let value = self.read_exact(&mut file, &mut buf)?; - Ok(Some(Value::Owned(value.to_owned()))) + Ok(()) + } + _ => Ok(()), } } - pub fn get_or_default(&self, index: I) -> Result - where - T: Default + Clone, - { - Ok(self - .cached_get(index)? - .map(|v| (*v).clone()) - .unwrap_or(Default::default())) - } + // #[inline] + // fn open_file_at_then_read(&self, index: I) -> Result { + // self.open_file_at_then_read_(Self::i_to_usize(index)?) + // } + fn open_file_at_then_read(&self, index: usize) -> Result { + // let (mut file, mut buf) = self.open_file_at(index)?; + let mut file = self.open_file()?; + let mut buf = Self::create_buffer(); - pub fn seek(file: &mut File, index: I) -> Result<()> { - Self::seek_(file, index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?) - } - - pub fn seek_(file: &mut File, index: usize) -> Result<()> { let byte_index = Self::index_to_byte_index(index); - file.seek(SeekFrom::Start(byte_index as u64)).map_err(Error::IO)?; - Ok(()) - } + Self::seek(&mut file, byte_index)?; - pub fn iter(&self, f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - self.iter_from(I::from(0_usize), f) + Ok(Self::read_exact(&mut file, &mut buf)?.to_owned()) } - - pub fn prepare_to_read(&self) -> Result<(File, Vec)> { - let file = self.open_file()?; - let buf = vec![0; Self::SIZE_OF_T]; - Ok((file, buf)) + // #[inline] + // fn open_file_at(&self, index: I) -> Result<(File, Buffer)> { + // self.open_file_at_(Self::i_to_usize(index)?) + // } + // fn open_file_at(&self, index: usize) -> Result<(File, Buffer)> { + // let mut file = self.open_file()?; + // let buf = Self::create_buffer(); + // let byte_index = Self::index_to_byte_index(index); + // Self::seek(&mut file, byte_index)?; + // Ok((file, buf)) + // } + // #[inline] + // fn seek_if_needed_(file: &mut File, index: I) -> Result { + // Self::seek_if_needed__(file, Self::i_to_usize(index)?).map_err(Error::IO) + // } + // #[inline] + // fn seek_if_needed(file: &mut File, index: usize) -> io::Result { + // let byte_index = Self::index_to_byte_index(index); + // if file.stream_position()? != byte_index { + // Self::seek(file, byte_index)?; + // } + // Ok(byte_index) + // } + #[inline] + fn seek(file: &mut File, byte_index: u64) -> io::Result { + file.seek(SeekFrom::Start(byte_index)) } - - pub fn prepare_to_read_at(&self, index: I) -> Result<(File, Vec)> { - self.prepare_to_read_at_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?) - } - pub fn prepare_to_read_at_(&self, index: usize) -> Result<(File, Vec)> { - let (mut file, buf) = self.prepare_to_read()?; - Self::seek_(&mut file, index)?; - Ok((file, buf)) - } - - pub fn read_exact<'a>(&self, file: &'a mut File, buf: &'a mut [u8]) -> Result<&'a T> { - file.read_exact(buf).map_err(Error::IO)?; - let v = T::unsafe_try_from_slice(&buf[..]).map_err(Error::UnsafeSliceSerde)?; + fn read_exact<'a>(file: &'a mut File, buf: &'a mut [u8]) -> Result<&'a T> { + file.read_exact(buf)?; + let v = T::unsafe_try_from_slice(&buf[..])?; Ok(v) } - pub fn iter_from(&self, index: I, mut f: F) -> Result<()> - where - F: FnMut((I, &T)) -> Result<()>, - { - let (mut file, mut buf) = self.prepare_to_read()?; - let disk_len = Self::disk_len(&file).map_err(Error::IO)?; - Self::seek(&mut file, index)?; - - let mut i: usize = index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?; - while i < disk_len { - let v = self.read_exact(&mut file, &mut buf)?; - f((I::from(i), v))?; - i += 1; - } - i = 0; - while i < self.pushed_len() { - f((I::from(i + disk_len), self.pushed.get(i).as_ref().unwrap()))?; - i += 1; - } - - Ok(()) - } - - #[allow(unused)] - pub fn first(&self) -> Result>> { - self.cached_get_(0) - } - - #[allow(unused)] - pub fn last(&self) -> Result>> { - let len = self.len(); - if len == 0 { - return Ok(None); - } - self.cached_get_(len - 1) - } - - pub fn push(&mut self, value: T) { + #[inline] + fn push_(&mut self, value: T) { self.pushed.push(value) } - pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { - self.push_if_needed_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?, value) - } - fn push_if_needed_(&mut self, index: usize, value: T) -> Result<()> { - let len = self.len(); - match len.cmp(&index) { + #[inline] + fn push_if_needed_(&mut self, index: I, value: T) -> Result<()> { + match self.pushed_len().cmp(&Self::i_to_usize(index)?) { Ordering::Greater => { // dbg!(len, index, &self.pathbuf); // panic!(); Ok(()) } Ordering::Equal => { - self.push(value); + self.pushed.push(value); Ok(()) } Ordering::Less => { @@ -345,7 +261,34 @@ where Err(Error::IndexTooHigh) } } + // Self::push_to_vec_if_needed(&mut self.pushed, index, value) } + // #[inline] + // fn push_if_needed__(&mut self, index: usize, value: T) -> Result<()> { + // Self::push_to_vec_if_needed_(&mut self.pushed, index, value) + // } + // #[inline] + // fn push_to_vec_if_needed(vec: &mut Vec, index: I, value: T) -> Result<()> { + // Self::push_to_vec_if_needed_(vec, Self::i_to_usize(index)?, value) + // } + // fn push_to_vec_if_needed_(vec: &mut Vec, index: usize, value: T) -> Result<()> { + // let len = vec.len(); + // match len.cmp(&index) { + // Ordering::Greater => { + // // dbg!(len, index, &self.pathbuf); + // // panic!(); + // Ok(()) + // } + // Ordering::Equal => { + // vec.push(value); + // Ok(()) + // } + // Ordering::Less => { + // dbg!(index, value); + // Err(Error::IndexTooHigh) + // } + // } + // } // pub fn update(&mut self, index: I, value: T) -> Result<()> { // self._update(index.into(), value) @@ -387,77 +330,332 @@ where // self.removed.insert(index); // } + #[inline] pub fn len(&self) -> usize { - self.disk_len + self.pushed_len() + self.file_len + self.pushed_len() } + #[inline] pub fn pushed_len(&self) -> usize { self.pushed.len() } + #[inline] pub fn is_empty(&self) -> bool { self.len() == 0 } + #[inline] pub fn has(&self, index: I) -> Result { - Ok(self.has_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?)) + Ok(self.has_(Self::i_to_usize(index)?)) } + #[inline] fn has_(&self, index: usize) -> bool { index < self.len() } + #[inline] pub fn hasnt(&self, index: I) -> Result { - Ok(self.hasnt_(index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)?)) + self.has(index).map(|b| !b) } + #[inline] fn hasnt_(&self, index: usize) -> bool { !self.has_(index) } - // pub fn flush(&mut self) -> io::Result<()> - // where - // T: Bytes, - // { - // self.flush_(|bytes, v| bytes.extend_from_slice(&v.to_bytes())) - // } pub fn flush(&mut self) -> io::Result<()> { - // self.flush_(|bytes, v| bytes.extend_from_slice(v.unsafe_as_slice())) - // } - // fn flush_(&mut self, mut extend: F) -> io::Result<()> - // where - // F: FnMut(&mut Vec, T), - // { - self.reset_cache(); + self.reset_disk_related_state()?; if self.pushed.is_empty() { return Ok(()); } - self.disk_len += self.pushed.len(); + self.file_len += self.pushed.len(); let mut bytes: Vec = vec![]; mem::take(&mut self.pushed) .into_iter() .for_each(|v| bytes.extend_from_slice(v.unsafe_as_slice())); - // .for_each(|v| extend(&mut bytes, v)); - self.unsafe_file.write_all(&bytes)?; + self.file.write_all(&bytes)?; Ok(()) } + #[inline] + fn i_to_usize(index: I) -> Result { + index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize) + } + + #[inline] + fn byte_index_to_index(byte_index: usize) -> usize { + byte_index / Self::SIZE_OF_T + } + + #[inline] + fn index_to_byte_index(index: usize) -> u64 { + (index * Self::SIZE_OF_T) as u64 + } + + #[inline] + fn index_to_byte_range(index: usize) -> Range { + let index = (Self::index_to_byte_index(index) as usize) % Self::PAGE_SIZE; + index..(index + Self::SIZE_OF_T) + } + + fn index_to_pushed_index(&self, index: usize) -> Result> { + if index >= self.file_len { + let index = index - self.file_len; + if index >= self.pushed.len() { + Err(Error::IndexTooHigh) + } else { + Ok(Some(index)) + } + } else { + Err(Error::IndexTooLow) + } + } + + #[inline] pub fn path(&self) -> &Path { &self.pathbuf } + #[inline] fn path_vec(&self) -> PathBuf { Self::path_vec_(&self.pathbuf) } + #[inline] fn path_vec_(path: &Path) -> PathBuf { path.join("vec") } + #[inline] fn path_version_(path: &Path) -> PathBuf { path.join("version") } } + +impl StorableVec +where + I: StorableVecIndex, + T: StorableVecType, +{ + #[inline] + pub fn get(&self, index: I) -> Result>> { + self.get_(Self::i_to_usize(index)?) + } + fn get_(&self, index: usize) -> Result>> { + match self.index_to_pushed_index(index) { + Ok(index) => { + if let Some(index) = index { + return Ok(self.pushed.get(index).map(|v| Value::Ref(v))); + } + } + Err(Error::IndexTooHigh) => return Ok(None), + Err(Error::IndexTooLow) => {} + Err(error) => return Err(error), + } + + // if !self.updated.is_empty() { + // if let Some(v) = self.updated.get(&index) { + // return Ok(Some(v)); + // } + // } + + let page_index = index / Self::PER_PAGE; + let last_index = self.file_len - 1; + let max_page_index = last_index / Self::PER_PAGE; + let min_page_index = (max_page_index + 1).checked_sub(self.cache.len()).unwrap_or_default(); + + // let min_open_page = self.min.load(AtomicOrdering::SeqCst); + + // if self.min.load(AtomicOrdering::SeqCst) { + // self.min.set(value) + // } + + if page_index >= min_page_index { + let mmap = &**self + .cache + .get(page_index - min_page_index) + .ok_or(Error::MmapsVecIsTooSmall)? + .get_or_init(|| { + Box::new(unsafe { + MmapOptions::new() + .len(Self::PAGE_SIZE) + .offset((page_index * Self::PAGE_SIZE) as u64) + .map(&self.file) + .unwrap() + }) + }); + + let range = Self::index_to_byte_range(index); + let slice = &mmap[range]; + return Ok(Some(Value::Ref(T::unsafe_try_from_slice(slice)?))); + } + + Ok(Some(Value::Owned(self.open_file_at_then_read(index)?.to_owned()))) + } + + pub fn get_or_default(&self, index: I) -> Result + where + T: Default + Clone, + { + Ok(self.get(index)?.map(|v| (*v).clone()).unwrap_or(Default::default())) + } + + #[inline] + pub fn push(&mut self, value: T) { + self.push_(value) + } + + #[inline] + pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { + self.push_if_needed_(index, value) + } +} + +const FLUSH_EVERY: usize = 10_000; +impl StorableVec +where + I: StorableVecIndex, + T: StorableVecType, +{ + pub fn get(&mut self, index: I) -> Result<&T> { + self.get_(Self::i_to_usize(index)?) + } + fn get_(&mut self, index: usize) -> Result<&T> { + let byte_index = Self::index_to_byte_index(index); + if self.file_position != byte_index { + self.file_position = Self::seek(&mut self.file, byte_index)?; + } + let res = Self::read_exact(&mut self.file, &mut self.buf); + if res.is_ok() { + self.file_position += Self::SIZE_OF_T as u64; + } + res + } + + pub fn last(&mut self) -> Result> { + let len = self.len(); + if len == 0 { + return Ok(None); + } + Ok(self.get_(len - 1).ok()) + } + + #[inline] + pub fn push(&mut self, value: T) { + self.push_(value) + } + + #[inline] + pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { + self.push_if_needed_(index, value)?; + + if self.pushed_len() >= FLUSH_EVERY { + Ok(self.flush()?) + } else { + Ok(()) + } + } + + // #[inline] + // fn seek_if_needed(&mut self, index: I) -> Result<()> { + // if self.file_position == Self::index_to_byte_index(Self::i_to_usize(index)?) { + // return Ok(()); + // } + // self.file_position = Self::seek_if_needed_(&mut self.file, index)?; + // Ok(()) + // } + + pub fn iter(&mut self, f: F) -> Result<()> + where + F: FnMut((I, &T)) -> Result<()>, + { + self.iter_from(I::default(), f) + } + + pub fn iter_from(&mut self, mut index: I, mut f: F) -> Result<()> + where + F: FnMut((I, &T)) -> Result<()>, + { + // let pushed_len = self.pushed_len(); + + // self.seek_if_needed(index)?; + + if !self.pushed.is_empty() { + return Err(Error::UnsupportedUnflushedState); + } + + let disk_len = I::from(Self::read_disk_len_(&self.file)?); + + while index < disk_len { + f((index, self.get(index)?))?; + index = index + 1; + } + + // i = 0; + // while i < pushed_len { + // f((I::from(i + disk_len), self.pushed.get(i).as_ref().unwrap()))?; + // i += 1; + // } + + Ok(()) + } + + pub fn compute_inverse_more_to_less(&mut self, other: &mut StorableVec) -> Result<()> + where + I: StorableVecType, + T: StorableVecIndex, + { + let index = self.last()?.cloned().unwrap_or_default(); + other.iter_from(index, |(v, i)| self.push_if_needed(*i, v))?; + Ok(self.flush()?) + } + + pub fn compute_inverse_less_to_more( + &mut self, + first_indexes: &mut StorableVec, + last_indexes: &mut StorableVec, + ) -> Result<()> + where + I: StorableVecType, + T: StorableVecIndex, + { + first_indexes.iter_from(T::from(self.len()), |(value, first_index)| { + let first_index = Self::i_to_usize(*first_index)?; + let last_index = Self::i_to_usize(*last_indexes.get(value)?)?; + (first_index..last_index).try_for_each(|index| self.push_if_needed(I::from(index), value)) + })?; + Ok(self.flush()?) + } + + pub fn compute_transform(&mut self, other: &mut StorableVec, t: F) -> Result<()> + where + A: StorableVecType, + F: Fn(&A) -> T, + { + other.iter_from(I::from(self.len()), |(i, a)| self.push_if_needed(i, t(a)))?; + Ok(self.flush()?) + } +} + +impl StorableVec +where + I: StorableVecIndex, + T: StorableVecType, +{ + #[inline] + pub fn get(&self, index: I) -> Result>> { + self.get_(Self::i_to_usize(index)?) + } + #[inline] + fn get_(&self, index: usize) -> Result>> { + Ok(Some(Value::Owned(self.open_file_at_then_read(index)?.to_owned()))) + } + + // Add iter iter_from iter_range collect.. + // + add memory cap +} diff --git a/storable_vec/src/main.rs b/storable_vec/src/main.rs index 8a32b4ed2..08e5b9612 100644 --- a/storable_vec/src/main.rs +++ b/storable_vec/src/main.rs @@ -1,24 +1,25 @@ use std::path::Path; -use storable_vec::{StorableVec, Version}; +use storable_vec::{StorableVec, Version, CACHED_GETS}; fn main() -> Result<(), Box> { { - let mut vec: StorableVec = StorableVec::import(Path::new("./v"), Version::from(1))?; + let mut vec: StorableVec = + StorableVec::forced_import(Path::new("./v"), Version::from(1))?; vec.push(0); vec.push(1); vec.push(2); - dbg!(vec.cached_get(0)?); // Some(0) - dbg!(vec.cached_get(21)?); // None + dbg!(vec.get(0)?); // Some(0) + dbg!(vec.get(21)?); // None vec.flush()?; } { - let vec: StorableVec = StorableVec::import(Path::new("./v"), Version::from(1))?; + let vec: StorableVec = StorableVec::forced_import(Path::new("./v"), Version::from(1))?; - dbg!(vec.cached_get(0)?); // 0 + dbg!(vec.get(0)?); // 0 } Ok(()) diff --git a/storable_vec/src/structs/version.rs b/storable_vec/src/structs/version.rs index e3c22c552..67af8da9f 100644 --- a/storable_vec/src/structs/version.rs +++ b/storable_vec/src/structs/version.rs @@ -4,12 +4,20 @@ use std::{ path::Path, }; +use unsafe_slice_serde::UnsafeSliceSerde; + +use crate::Error; + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct Version(u32); impl Version { pub fn write(&self, path: &Path) -> Result<(), io::Error> { - fs::write(path, self.0.to_le_bytes()) + fs::write(path, self.0.unsafe_as_slice()) + } + + pub fn swap_bytes(self) -> Self { + Self(self.0.swap_bytes()) } } @@ -20,10 +28,10 @@ impl From for Version { } impl TryFrom<&Path> for Version { - type Error = io::Error; + type Error = Error; fn try_from(value: &Path) -> Result { let mut buf = [0; 4]; fs::read(value)?.as_slice().read_exact(&mut buf)?; - Ok(Self(u32::from_le_bytes(buf))) + Ok(*(Self::unsafe_try_from_slice(&buf)?)) } } diff --git a/storable_vec/src/traits/any.rs b/storable_vec/src/traits/any.rs index 22ff38e9c..8dcf72a82 100644 --- a/storable_vec/src/traits/any.rs +++ b/storable_vec/src/traits/any.rs @@ -7,10 +7,10 @@ use super::{StorableVecIndex, StorableVecType}; pub trait AnyStorableVec { fn len(&self) -> usize; fn is_empty(&self) -> bool; - fn unsafe_flush(&mut self) -> io::Result<()>; + fn flush(&mut self) -> io::Result<()>; } -impl AnyStorableVec for StorableVec +impl AnyStorableVec for StorableVec where I: StorableVecIndex, T: StorableVecType, @@ -23,7 +23,7 @@ where self.is_empty() } - fn unsafe_flush(&mut self) -> io::Result<()> { + fn flush(&mut self) -> io::Result<()> { self.flush() } } diff --git a/storable_vec/src/traits/index.rs b/storable_vec/src/traits/index.rs index fff90ada1..6ff29dcca 100644 --- a/storable_vec/src/traits/index.rs +++ b/storable_vec/src/traits/index.rs @@ -2,10 +2,10 @@ use std::{fmt::Debug, ops::Add}; pub trait StorableVecIndex where - Self: Debug + Default + Copy + Clone + TryInto + From + Add, + Self: Debug + Default + Copy + Clone + PartialOrd + Ord + TryInto + From + Add, { } impl StorableVecIndex for I where - I: Debug + Default + Copy + Clone + TryInto + From + Add + I: Debug + Default + Copy + Clone + PartialOrd + Ord + TryInto + From + Add { }