diff --git a/Cargo.lock b/Cargo.lock index 04d5b1965..1346970a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,7 +548,6 @@ dependencies = [ "derive_deref", "either", "fjall", - "jiff", "log", "rayon", "serde", diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index 18c70b469..32cc7b581 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -23,7 +23,6 @@ color-eyre = { workspace = true } derive_deref = { workspace = true } either = "1.15.0" fjall = { workspace = true } -jiff = { workspace = true } log = { workspace = true } rayon = { workspace = true } serde = { workspace = true } diff --git a/crates/brk_computer/src/vecs/fetched.rs b/crates/brk_computer/src/vecs/fetched.rs index 26fd71c93..8ae500870 100644 --- a/crates/brk_computer/src/vecs/fetched.rs +++ b/crates/brk_computer/src/vecs/fetched.rs @@ -10,6 +10,7 @@ use brk_fetcher::Fetcher; use brk_indexer::Indexer; use brk_vec::{ AnyCollectableVec, AnyIterableVec, AnyVec, Computation, EagerVec, Format, StoredIndex, + VecIterator, }; use crate::vecs::grouped::Source; @@ -456,33 +457,34 @@ impl Vecs { exit, )?; + let mut prev = None; self.dateindex_to_ohlc_in_cents.compute_transform( starting_indexes.dateindex, &indexes.dateindex_to_date, |(di, d, this)| { - let get_prev = || { - this.get_or_read(di, &this.mmap().load()) - .unwrap() - .unwrap() - .into_owned() - }; + if prev.is_none() { + let i = di.unwrap_to_usize(); + prev.replace(if i > 0 { + this.into_iter().unwrap_get_inner_(i - 1) + } else { + OHLCCents::default() + }); + } - let mut ohlc = if di.unwrap_to_usize() + 100 >= this.len() { - fetcher.get_date(d).unwrap_or_else(|_| get_prev()) - } else { - get_prev() - }; - - if let Some(prev) = di.decremented() { - let prev_open = *this - .get_or_read(prev, &this.mmap().load()) - .unwrap() - .unwrap() - .close; + let ohlc = if di.unwrap_to_usize() + 100 >= this.len() + && let Ok(mut ohlc) = fetcher.get_date(d) + { + let prev_open = *prev.as_ref().unwrap().close; *ohlc.open = prev_open; *ohlc.high = (*ohlc.high).max(prev_open); *ohlc.low = (*ohlc.low).min(prev_open); - } + ohlc + } else { + prev.clone().unwrap() + }; + + prev.replace(ohlc.clone()); + (di, ohlc) }, exit, diff --git a/crates/brk_computer/src/vecs/stateful/mod.rs b/crates/brk_computer/src/vecs/stateful/mod.rs index 6a0f32f75..bfc34b12d 100644 --- a/crates/brk_computer/src/vecs/stateful/mod.rs +++ b/crates/brk_computer/src/vecs/stateful/mod.rs @@ -498,9 +498,9 @@ impl Vecs { let dateindex_to_first_height = &indexes.dateindex_to_first_height; let dateindex_to_height_count = &indexes.dateindex_to_height_count; - let outputindex_to_value_mmap = outputindex_to_value.mmap().load(); - let outputindex_to_outputtype_mmap = outputindex_to_outputtype.mmap().load(); - let outputindex_to_typeindex_mmap = outputindex_to_typeindex.mmap().load(); + let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?; + let outputindex_to_outputtype_mmap = outputindex_to_outputtype.create_mmap()?; + let outputindex_to_typeindex_mmap = outputindex_to_typeindex.create_mmap()?; let mut inputindex_to_outputindex_iter = inputindex_to_outputindex.into_iter(); let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter(); diff --git a/crates/brk_core/src/utils/rlimit.rs b/crates/brk_core/src/utils/rlimit.rs index a84b8a2a6..f725c7b96 100644 --- a/crates/brk_core/src/utils/rlimit.rs +++ b/crates/brk_core/src/utils/rlimit.rs @@ -7,7 +7,7 @@ pub fn setrlimit() -> io::Result<()> { rlimit::setrlimit( Resource::NOFILE, - no_file_limit.0.max(250_000), + no_file_limit.0.max(10_000), no_file_limit.1, )?; diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 51db9c4a4..a38605d5b 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -15,7 +15,7 @@ use bitcoin::{Transaction, TxIn, TxOut}; use brk_exit::Exit; use brk_parser::Parser; use brk_store::AnyStore; -use brk_vec::{AnyVec, VecIterator}; +use brk_vec::{AnyVec, Mmap, VecIterator}; use color_eyre::eyre::{ContextCompat, eyre}; use log::{error, info}; use rayon::prelude::*; @@ -40,6 +40,7 @@ pub struct Indexer { impl Indexer { pub fn forced_import(outputs_dir: &Path) -> color_eyre::Result { setrlimit()?; + Ok(Self { vecs: Vecs::forced_import(&outputs_dir.join("vecs/indexed"), VERSION + Version::ZERO)?, stores: Stores::forced_import(&outputs_dir.join("stores"), VERSION + Version::ZERO)?, @@ -88,9 +89,9 @@ impl Indexer { height: Height, rem: bool, exit: &Exit| - -> color_eyre::Result<()> { + -> color_eyre::Result { if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() { - return Ok(()); + return Ok(false); } info!("Exporting..."); @@ -98,15 +99,88 @@ impl Indexer { stores.commit(height)?; vecs.flush(height)?; exit.release(); - Ok(()) + Ok(true) }; + let mut txindex_to_first_outputindex_mmap_opt = None; + let mut p2pk65addressindex_to_p2pk65bytes_mmap_opt = None; + let mut p2pk33addressindex_to_p2pk33bytes_mmap_opt = None; + let mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt = None; + let mut p2shaddressindex_to_p2shbytes_mmap_opt = None; + let mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt = None; + let mut p2wshaddressindex_to_p2wshbytes_mmap_opt = None; + let mut p2traddressindex_to_p2trbytes_mmap_opt = None; + let mut p2aaddressindex_to_p2abytes_mmap_opt = None; + + let reset_mmaps_options = + |vecs: &mut Vecs, + txindex_to_first_outputindex_mmap_opt: &mut Option, + p2pk65addressindex_to_p2pk65bytes_mmap_opt: &mut Option, + p2pk33addressindex_to_p2pk33bytes_mmap_opt: &mut Option, + p2pkhaddressindex_to_p2pkhbytes_mmap_opt: &mut Option, + p2shaddressindex_to_p2shbytes_mmap_opt: &mut Option, + p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt: &mut Option, + p2wshaddressindex_to_p2wshbytes_mmap_opt: &mut Option, + p2traddressindex_to_p2trbytes_mmap_opt: &mut Option, + p2aaddressindex_to_p2abytes_mmap_opt: &mut Option| { + txindex_to_first_outputindex_mmap_opt + .replace(vecs.txindex_to_first_outputindex.create_mmap().unwrap()); + p2pk65addressindex_to_p2pk65bytes_mmap_opt.replace( + vecs.p2pk65addressindex_to_p2pk65bytes + .create_mmap() + .unwrap(), + ); + p2pk33addressindex_to_p2pk33bytes_mmap_opt.replace( + vecs.p2pk33addressindex_to_p2pk33bytes + .create_mmap() + .unwrap(), + ); + p2pkhaddressindex_to_p2pkhbytes_mmap_opt + .replace(vecs.p2pkhaddressindex_to_p2pkhbytes.create_mmap().unwrap()); + p2shaddressindex_to_p2shbytes_mmap_opt + .replace(vecs.p2shaddressindex_to_p2shbytes.create_mmap().unwrap()); + p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt.replace( + vecs.p2wpkhaddressindex_to_p2wpkhbytes + .create_mmap() + .unwrap(), + ); + p2wshaddressindex_to_p2wshbytes_mmap_opt + .replace(vecs.p2wshaddressindex_to_p2wshbytes.create_mmap().unwrap()); + p2traddressindex_to_p2trbytes_mmap_opt + .replace(vecs.p2traddressindex_to_p2trbytes.create_mmap().unwrap()); + p2aaddressindex_to_p2abytes_mmap_opt + .replace(vecs.p2aaddressindex_to_p2abytes.create_mmap().unwrap()); + }; + + reset_mmaps_options( + vecs, + &mut txindex_to_first_outputindex_mmap_opt, + &mut p2pk65addressindex_to_p2pk65bytes_mmap_opt, + &mut p2pk33addressindex_to_p2pk33bytes_mmap_opt, + &mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt, + &mut p2shaddressindex_to_p2shbytes_mmap_opt, + &mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt, + &mut p2wshaddressindex_to_p2wshbytes_mmap_opt, + &mut p2traddressindex_to_p2trbytes_mmap_opt, + &mut p2aaddressindex_to_p2abytes_mmap_opt, + ); + parser.parse(start, end).iter().try_for_each( |(height, block, blockhash)| -> color_eyre::Result<()> { info!("Indexing block {height}..."); idxs.height = height; + let txindex_to_first_outputindex_mmap = txindex_to_first_outputindex_mmap_opt.as_ref().unwrap(); + let p2pk65addressindex_to_p2pk65bytes_mmap = p2pk65addressindex_to_p2pk65bytes_mmap_opt.as_ref().unwrap(); + let p2pk33addressindex_to_p2pk33bytes_mmap = p2pk33addressindex_to_p2pk33bytes_mmap_opt.as_ref().unwrap(); + let p2pkhaddressindex_to_p2pkhbytes_mmap = p2pkhaddressindex_to_p2pkhbytes_mmap_opt.as_ref().unwrap(); + let p2shaddressindex_to_p2shbytes_mmap = p2shaddressindex_to_p2shbytes_mmap_opt.as_ref().unwrap(); + let p2wpkhaddressindex_to_p2wpkhbytes_mmap = p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt.as_ref().unwrap(); + let p2wshaddressindex_to_p2wshbytes_mmap = p2wshaddressindex_to_p2wshbytes_mmap_opt.as_ref().unwrap(); + let p2traddressindex_to_p2trbytes_mmap = p2traddressindex_to_p2trbytes_mmap_opt.as_ref().unwrap(); + let p2aaddressindex_to_p2abytes_mmap = p2aaddressindex_to_p2abytes_mmap_opt.as_ref().unwrap(); + // Used to check rapidhash collisions let check_collisions = check_collisions && height > Height::new(COLLISIONS_CHECKED_UP_TO); @@ -166,9 +240,6 @@ impl Indexer { }); let input_source_vec_handle = scope.spawn(|| { - let txindex_to_first_outputindex_mmap = vecs - .txindex_to_first_outputindex.mmap().load(); - let inputs = block .txdata .iter() @@ -211,7 +282,7 @@ impl Indexer { let vout = Vout::from(outpoint.vout); - let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, &txindex_to_first_outputindex_mmap)? + let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_mmap)? .context("Expect outputindex to not be none") .inspect_err(|_| { dbg!(outpoint.txid, prev_txindex, vout); @@ -240,16 +311,6 @@ impl Indexer { }) }); - let p2pk65addressindex_to_p2pk65bytes_mmap = vecs - .p2pk65addressindex_to_p2pk65bytes.mmap().load(); - let p2pk33addressindex_to_p2pk33bytes_mmap = vecs.p2pk33addressindex_to_p2pk33bytes.mmap().load(); - let p2pkhaddressindex_to_p2pkhbytes_mmap = vecs.p2pkhaddressindex_to_p2pkhbytes.mmap().load(); - let p2shaddressindex_to_p2shbytes_mmap = vecs.p2shaddressindex_to_p2shbytes.mmap().load(); - let p2wpkhaddressindex_to_p2wpkhbytes_mmap = vecs.p2wpkhaddressindex_to_p2wpkhbytes.mmap().load(); - let p2wshaddressindex_to_p2wshbytes_mmap = vecs.p2wshaddressindex_to_p2wshbytes.mmap().load(); - let p2traddressindex_to_p2trbytes_mmap = vecs.p2traddressindex_to_p2trbytes.mmap().load(); - let p2aaddressindex_to_p2abytes_mmap = vecs.p2aaddressindex_to_p2abytes.mmap().load(); - let outputs = block .txdata .iter() @@ -307,35 +368,35 @@ impl Indexer { let prev_addressbytes_opt = match outputtype { OutputType::P2PK65 => vecs .p2pk65addressindex_to_p2pk65bytes - .get_or_read(typeindex.into(), &p2pk65addressindex_to_p2pk65bytes_mmap)? + .get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2PK33 => vecs .p2pk33addressindex_to_p2pk33bytes - .get_or_read(typeindex.into(), &p2pk33addressindex_to_p2pk33bytes_mmap)? + .get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2PKH => vecs .p2pkhaddressindex_to_p2pkhbytes - .get_or_read(typeindex.into(), &p2pkhaddressindex_to_p2pkhbytes_mmap)? + .get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2SH => vecs .p2shaddressindex_to_p2shbytes - .get_or_read(typeindex.into(), &p2shaddressindex_to_p2shbytes_mmap)? + .get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2WPKH => vecs .p2wpkhaddressindex_to_p2wpkhbytes - .get_or_read(typeindex.into(), &p2wpkhaddressindex_to_p2wpkhbytes_mmap)? + .get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2WSH => vecs .p2wshaddressindex_to_p2wshbytes - .get_or_read(typeindex.into(), &p2wshaddressindex_to_p2wshbytes_mmap)? + .get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2TR => vecs .p2traddressindex_to_p2trbytes - .get_or_read(typeindex.into(), &p2traddressindex_to_p2trbytes_mmap)? + .get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2A => vecs .p2aaddressindex_to_p2abytes - .get_or_read(typeindex.into(), &p2aaddressindex_to_p2abytes_mmap)? + .get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_mmap)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::Empty | OutputType::OpReturn | OutputType::P2MS | OutputType::Unknown => { unreachable!() @@ -677,7 +738,22 @@ impl Indexer { idxs.inputindex += InputIndex::from(inputs_len); idxs.outputindex += OutputIndex::from(outputs_len); - export_if_needed(stores, vecs, height, false, exit)?; + let exported = export_if_needed(stores, vecs, height, false, exit)?; + + if exported { + reset_mmaps_options( + vecs, + &mut txindex_to_first_outputindex_mmap_opt, + &mut p2pk65addressindex_to_p2pk65bytes_mmap_opt, + &mut p2pk33addressindex_to_p2pk33bytes_mmap_opt, + &mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt, + &mut p2shaddressindex_to_p2shbytes_mmap_opt, + &mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt, + &mut p2wshaddressindex_to_p2wshbytes_mmap_opt, + &mut p2traddressindex_to_p2trbytes_mmap_opt, + &mut p2aaddressindex_to_p2abytes_mmap_opt, + ); + } Ok(()) }, diff --git a/crates/brk_vec/src/structs/header.rs b/crates/brk_vec/src/structs/header.rs index 7a66793aa..b50254c57 100644 --- a/crates/brk_vec/src/structs/header.rs +++ b/crates/brk_vec/src/structs/header.rs @@ -1,13 +1,12 @@ use std::{ fs::File, - io::{self, Seek, SeekFrom}, + io::{self, Read, Seek, SeekFrom}, os::unix::fs::FileExt, sync::Arc, }; use arc_swap::ArcSwap; use brk_core::{Error, Height, Result, Version}; -use memmap2::Mmap; use zerocopy::{FromBytes, IntoBytes}; use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; @@ -31,8 +30,12 @@ impl Header { }) } - pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result { - let inner = HeaderInner::import_and_verify(mmap, vec_version, format)?; + pub fn import_and_verify( + file: &mut File, + vec_version: Version, + format: Format, + ) -> Result { + let inner = HeaderInner::import_and_verify(file, vec_version, format)?; Ok(Self { inner: Arc::new(ArcSwap::from_pointee(inner)), modified: false, @@ -109,13 +112,20 @@ impl HeaderInner { file.write_all_at(self.as_bytes(), 0) } - pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result { - if mmap.len() < HEADER_OFFSET { + pub fn import_and_verify( + file: &mut File, + vec_version: Version, + format: Format, + ) -> Result { + if file.metadata()?.len() < HEADER_OFFSET as u64 { return Err(Error::WrongLength); } - // dbg!(mmap.len()); - let header = HeaderInner::read_from_bytes(&mmap[..HEADER_OFFSET])?; - // dbg!(&header); + + let mut buf = [0; HEADER_OFFSET]; + file.read_exact(&mut buf)?; + + let header = HeaderInner::read_from_bytes(&buf)?; + if header.header_version != HEADER_VERSION { return Err(Error::DifferentVersion { found: header.header_version, diff --git a/crates/brk_vec/src/traits/generic.rs b/crates/brk_vec/src/traits/generic.rs index 576740c89..4b0ffd2cf 100644 --- a/crates/brk_vec/src/traits/generic.rs +++ b/crates/brk_vec/src/traits/generic.rs @@ -3,10 +3,8 @@ use std::{ fs::{File, OpenOptions}, io::{self, Seek, SeekFrom, Write}, path::{Path, PathBuf}, - sync::Arc, }; -use arc_swap::ArcSwap; use brk_core::Result; use memmap2::Mmap; @@ -38,7 +36,7 @@ where } #[inline] fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result>> { - let stored_len = self.stored_len_(mmap); + let stored_len = self.stored_len(); if index >= stored_len { let pushed = self.pushed(); @@ -61,10 +59,7 @@ where format!("{}_to_{}", I::to_string(), self.name()) } - fn mmap(&self) -> &ArcSwap; - fn stored_len(&self) -> usize; - fn stored_len_(&self, mmap: &Mmap) -> usize; fn pushed(&self) -> &[T]; #[inline] @@ -116,7 +111,7 @@ where fn file_set_len(&mut self, file: &mut File, len: u64) -> Result<()> { Self::file_set_len_(file, len)?; - self.update_mmap(file) + Ok(()) } fn file_set_len_(file: &mut File, len: u64) -> Result<()> { file.set_len(len)?; @@ -127,7 +122,7 @@ where fn file_write_all(&mut self, file: &mut File, buf: &[u8]) -> Result<()> { file.write_all(buf)?; file.flush()?; - self.update_mmap(file) + Ok(()) } fn file_truncate_and_write_all(&mut self, file: &mut File, len: u64, buf: &[u8]) -> Result<()> { @@ -143,14 +138,10 @@ where self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[]) } - fn new_mmap(file: &File) -> Result> { - Ok(Arc::new(unsafe { Mmap::map(file)? })) - } - - fn update_mmap(&mut self, file: &File) -> Result<()> { - let mmap = Self::new_mmap(file)?; - self.mmap().store(mmap); - Ok(()) + #[inline] + fn create_mmap(&self) -> Result { + let file = self.open_file()?; + unsafe { Mmap::map(&file).map_err(|e| e.into()) } } #[inline] diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index ec3e52fc2..5ced0c22c 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -180,11 +180,6 @@ where self.inner.mut_header() } - #[inline] - fn mmap(&self) -> &ArcSwap { - self.inner.mmap() - } - fn parent(&self) -> &Path { self.inner.parent() } @@ -193,10 +188,6 @@ where fn stored_len(&self) -> usize { Self::stored_len__(&self.pages_meta.load()) } - #[inline] - fn stored_len_(&self, _: &Mmap) -> usize { - self.stored_len() - } #[inline] fn pushed(&self) -> &[T] { @@ -223,6 +214,8 @@ where let stored_len = self.stored_len(); + let mut file = file_opt.unwrap_or(self.open_file()?); + let mut pages_meta = (**self.pages_meta.load()).clone(); let mut starting_page_index = pages_meta.len(); @@ -236,16 +229,13 @@ where let last_page_index = pages_meta.len() - 1; - values = Self::decode_page_( - stored_len, - last_page_index, - &self.mmap().load(), - &pages_meta, - ) - .inspect_err(|_| { - dbg!(last_page_index, &pages_meta); - }) - .unwrap(); + let mmap = unsafe { Mmap::map(&file)? }; + + values = Self::decode_page_(stored_len, last_page_index, &mmap, &pages_meta) + .inspect_err(|_| { + dbg!(last_page_index, &pages_meta); + }) + .unwrap(); truncate_at.replace(pages_meta.pop().unwrap().start); starting_page_index = last_page_index; @@ -287,8 +277,6 @@ where pages_meta.write()?; - let mut file = file_opt.unwrap_or(self.open_file()?); - if let Some(truncate_at) = truncate_at { self.file_set_len(&mut file, truncate_at)?; } @@ -324,7 +312,11 @@ where let page_index = Self::index_to_page_index(index); - let values = self.decode_page(page_index, &self.mmap().load())?; + let mut file = self.open_file()?; + + let mmap = unsafe { Mmap::map(&file)? }; + + let values = self.decode_page(page_index, &mmap)?; let mut buf = vec![]; let mut page = pages_meta.truncate(page_index).unwrap(); @@ -348,8 +340,6 @@ where self.pages_meta.store(Arc::new(pages_meta)); - let mut file = self.open_file()?; - self.file_truncate_and_write_all(&mut file, len, &buf)?; Ok(()) @@ -399,7 +389,7 @@ impl Clone for CompressedVec { #[derive(Debug)] pub struct CompressedVecIterator<'a, I, T> { vec: &'a CompressedVec, - guard: Guard>, + mmap: Mmap, decoded_page: Option<(usize, Vec)>, // second_decoded_page?: Option<(usize, Vec)>, pages_meta: Guard>, @@ -445,7 +435,7 @@ where type Item = (I, Cow<'a, T>); fn next(&mut self) -> Option { - let mmap = &self.guard; + let mmap = &self.mmap; let i = self.index; let stored_len = self.stored_len; @@ -499,7 +489,7 @@ where let stored_len = CompressedVec::::stored_len__(&pages_meta); CompressedVecIterator { vec: self, - guard: self.mmap().load(), + mmap: self.create_mmap().unwrap(), decoded_page: None, pages_meta, stored_len, diff --git a/crates/brk_vec/src/variants/eager.rs b/crates/brk_vec/src/variants/eager.rs index eec317497..7bd5d3a47 100644 --- a/crates/brk_vec/src/variants/eager.rs +++ b/crates/brk_vec/src/variants/eager.rs @@ -8,7 +8,6 @@ use std::{ path::{Path, PathBuf}, }; -use arc_swap::ArcSwap; use brk_core::{ Bitcoin, CheckedSub, Close, Date, DateIndex, Dollars, Error, Result, Sats, StoredF32, StoredUsize, Version, @@ -108,10 +107,6 @@ where self.0.get_or_read(index, mmap) } - pub fn mmap(&self) -> &ArcSwap { - self.0.mmap() - } - pub fn inner_version(&self) -> Version { self.0.version() } diff --git a/crates/brk_vec/src/variants/indexed.rs b/crates/brk_vec/src/variants/indexed.rs index 32aa0d007..6ab8c836b 100644 --- a/crates/brk_vec/src/variants/indexed.rs +++ b/crates/brk_vec/src/variants/indexed.rs @@ -1,6 +1,5 @@ use std::{borrow::Cow, cmp::Ordering, fmt::Debug, path::Path}; -use arc_swap::ArcSwap; use brk_core::{Error, Height, Result, Version}; use crate::{ @@ -78,8 +77,8 @@ where self.0.header() } - pub fn mmap(&self) -> &ArcSwap { - self.0.mmap() + pub fn create_mmap(&self) -> Result { + self.0.create_mmap() } #[inline] diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index 94a400a9e..7f3aaace8 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -5,10 +5,12 @@ use std::{ marker::PhantomData, mem, path::{Path, PathBuf}, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, }; -use arc_swap::{ArcSwap, Guard}; use brk_core::{Error, Result, Version}; use memmap2::Mmap; use rayon::prelude::*; @@ -25,10 +27,9 @@ pub struct RawVec { header: Header, parent: PathBuf, name: &'static str, - // Consider Arc>> for dataraces when reorg ? - mmap: Arc>, pushed: Vec, phantom: PhantomData, + stored_len: Arc, } impl RawVec @@ -55,16 +56,18 @@ where pub fn import(parent: &Path, name: &str, version: Version) -> Result { let path = Self::path_(parent, name); - let (mmap, header) = match Self::open_file_(&path) { + let (header, file) = match Self::open_file_(&path) { Ok(mut file) => { if file.metadata()?.len() == 0 { - let header = Header::create_and_write(&mut file, version, Format::Raw)?; - let mmap = Self::new_mmap(&file)?; - (mmap, header) + ( + Header::create_and_write(&mut file, version, Format::Raw)?, + Some(file), + ) } else { - let mmap = Self::new_mmap(&file)?; - let header = Header::import_and_verify(&mmap, version, Format::Raw)?; - (mmap, header) + ( + Header::import_and_verify(&mut file, version, Format::Raw)?, + None, + ) } } Err(e) => match e.kind() { @@ -72,22 +75,25 @@ where fs::create_dir_all(Self::folder_(parent, name))?; let mut file = Self::open_file_(&path)?; let header = Header::create_and_write(&mut file, version, Format::Raw)?; - let mmap = Self::new_mmap(&file)?; - (mmap, header) + (header, None) } _ => return Err(e.into()), }, }; - let mmap = Arc::new(ArcSwap::new(mmap)); + let stored_len = if let Some(file) = file { + (file.metadata()?.len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T + } else { + 0 + }; Ok(Self { - mmap, header, name: Box::leak(Box::new(name.to_string())), parent: parent.to_owned(), pushed: vec![], phantom: PhantomData, + stored_len: Arc::new(AtomicUsize::new(stored_len)), }) } @@ -141,18 +147,9 @@ where &mut self.header } - #[inline] - fn mmap(&self) -> &ArcSwap { - &self.mmap - } - #[inline] fn stored_len(&self) -> usize { - self.stored_len_(&self.mmap.load()) - } - #[inline] - fn stored_len_(&self, mmap: &Mmap) -> usize { - (mmap.len() - HEADER_OFFSET) / Self::SIZE_OF_T + self.stored_len.load(Ordering::SeqCst) } #[inline] @@ -194,9 +191,10 @@ where }; let mut file = file_opt.unwrap_or(self.open_file()?); - self.file_write_all(&mut file, &bytes)?; + self.stored_len.fetch_add(pushed_len, Ordering::SeqCst); + Ok(()) } @@ -212,6 +210,8 @@ where return Ok(()); } + self.stored_len.store(index, Ordering::SeqCst); + let len = index * Self::SIZE_OF_T + HEADER_OFFSET; let mut file = self.open_file()?; @@ -221,6 +221,7 @@ where } fn reset(&mut self) -> Result<()> { + self.stored_len.store(0, Ordering::SeqCst); self.reset_() } } @@ -262,9 +263,9 @@ impl Clone for RawVec { header: self.header.clone(), parent: self.parent.clone(), name: self.name, - mmap: self.mmap.clone(), pushed: vec![], phantom: PhantomData, + stored_len: self.stored_len.clone(), } } } @@ -272,7 +273,7 @@ impl Clone for RawVec { #[derive(Debug)] pub struct RawVecIterator<'a, I, T> { vec: &'a RawVec, - guard: Guard>, + mmap: Mmap, index: usize, } @@ -305,7 +306,7 @@ where type Item = (I, Cow<'a, T>); fn next(&mut self) -> Option { - let mmap = &self.guard; + let mmap = &self.mmap; let index = self.index; let opt = self @@ -333,7 +334,7 @@ where fn into_iter(self) -> Self::IntoIter { RawVecIterator { vec: self, - guard: self.mmap.load(), + mmap: self.create_mmap().unwrap(), index: 0, } } diff --git a/crates/brk_vec/src/variants/stored.rs b/crates/brk_vec/src/variants/stored.rs index ffd6d06f6..20f81283f 100644 --- a/crates/brk_vec/src/variants/stored.rs +++ b/crates/brk_vec/src/variants/stored.rs @@ -3,7 +3,6 @@ use std::{ path::{Path, PathBuf}, }; -use arc_swap::ArcSwap; use brk_core::{Result, Version}; use memmap2::Mmap; @@ -75,14 +74,6 @@ where } } - #[inline] - fn mmap(&self) -> &ArcSwap { - match self { - StoredVec::Raw(v) => v.mmap(), - StoredVec::Compressed(v) => v.mmap(), - } - } - #[inline] fn parent(&self) -> &Path { match self { @@ -98,13 +89,6 @@ where StoredVec::Compressed(v) => v.stored_len(), } } - #[inline] - fn stored_len_(&self, mmap: &Mmap) -> usize { - match self { - StoredVec::Raw(v) => v.stored_len_(mmap), - StoredVec::Compressed(v) => v.stored_len_(mmap), - } - } #[inline] fn pushed(&self) -> &[T] {