diff --git a/crates/brk_computer/src/vecs/mod.rs b/crates/brk_computer/src/vecs/mod.rs index 24683c3b3..9bb9ad0ad 100644 --- a/crates/brk_computer/src/vecs/mod.rs +++ b/crates/brk_computer/src/vecs/mod.rs @@ -1,4 +1,4 @@ -use std::{path::Path, thread}; +use std::path::Path; use brk_core::Version; use brk_exit::Exit; @@ -44,31 +44,22 @@ impl Vecs { computation: Computation, format: Format, ) -> color_eyre::Result { - let (indexes, fetched) = thread::scope(|s| { - let indexes_handle = s.spawn(|| { - indexes::Vecs::forced_import( - path, - version + VERSION + Version::ZERO, - indexer, - computation, - format, - ) - .unwrap() - }); + let indexes = indexes::Vecs::forced_import( + path, + version + VERSION + Version::ZERO, + indexer, + computation, + format, + )?; - let fetch_handle = s.spawn(|| { - fetch.then(|| { - fetched::Vecs::forced_import( - path, - version + VERSION + Version::ZERO, - computation, - format, - ) - .unwrap() - }) - }); - - (indexes_handle.join().unwrap(), fetch_handle.join().unwrap()) + let fetched = fetch.then(|| { + fetched::Vecs::forced_import( + path, + version + VERSION + Version::ZERO, + computation, + format, + ) + .unwrap() }); Ok(Self { diff --git a/crates/brk_core/src/utils/rlimit.rs b/crates/brk_core/src/utils/rlimit.rs index 1e35e9385..162a52574 100644 --- a/crates/brk_core/src/utils/rlimit.rs +++ b/crates/brk_core/src/utils/rlimit.rs @@ -7,7 +7,7 @@ pub fn setrlimit() -> io::Result<()> { rlimit::setrlimit( Resource::NOFILE, - no_file_limit.0.max(420_000), + no_file_limit.0.max(210_000), no_file_limit.1, )?; diff --git a/crates/brk_vec/src/structs/header.rs b/crates/brk_vec/src/structs/header.rs index bfe6972ab..7a66793aa 100644 --- a/crates/brk_vec/src/structs/header.rs +++ b/crates/brk_vec/src/structs/header.rs @@ -57,6 +57,10 @@ impl Header { }); } + pub fn modified(&self) -> bool { + self.modified + } + pub fn vec_version(&self) -> Version { self.inner.load().vec_version } @@ -69,11 +73,9 @@ impl Header { self.inner.load().height } - pub fn write_if_needed(&mut self, file: &mut File) -> io::Result<()> { - if self.modified { - self.inner.load().write(file)?; - self.modified = false; - } + pub fn write(&mut self, file: &mut File) -> io::Result<()> { + self.inner.load().write(file)?; + self.modified = false; Ok(()) } } diff --git a/crates/brk_vec/src/traits/generic.rs b/crates/brk_vec/src/traits/generic.rs index 5e7ae4679..38955e43a 100644 --- a/crates/brk_vec/src/traits/generic.rs +++ b/crates/brk_vec/src/traits/generic.rs @@ -95,6 +95,9 @@ where // --- + fn open_file(&self) -> io::Result { + Self::open_file_(&self.path()) + } fn open_file_(path: &Path) -> io::Result { let mut file = OpenOptions::new() .read(true) @@ -106,13 +109,9 @@ where Ok(file) } - fn file(&self) -> &File; - fn mut_file(&mut self) -> &mut File; - - fn file_set_len(&mut self, len: u64) -> Result<()> { - let file = self.mut_file(); + fn file_set_len(&mut self, file: &mut File, len: u64) -> Result<()> { Self::file_set_len_(file, len)?; - self.update_mmap() + self.update_mmap(file) } fn file_set_len_(file: &mut File, len: u64) -> Result<()> { file.set_len(len)?; @@ -120,32 +119,31 @@ where Ok(()) } - fn file_write_all(&mut self, buf: &[u8]) -> Result<()> { - let file = self.mut_file(); + fn file_write_all(&mut self, file: &mut File, buf: &[u8]) -> Result<()> { file.write_all(buf)?; - self.update_mmap() + self.update_mmap(file) } - fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> { - let file = self.mut_file(); + fn file_truncate_and_write_all(&mut self, file: &mut File, len: u64, buf: &[u8]) -> Result<()> { Self::file_set_len_(file, len)?; file.write_all(buf)?; - self.update_mmap() + self.update_mmap(file) } fn reset(&mut self) -> Result<()>; #[inline] fn reset_(&mut self) -> Result<()> { - self.file_truncate_and_write_all(HEADER_OFFSET as u64, &[]) + let mut file = self.open_file()?; + self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[]) } fn new_mmap(file: &File) -> Result> { Ok(Arc::new(unsafe { Mmap::map(file)? })) } - fn update_mmap(&mut self) -> Result<()> { - let mmap = Self::new_mmap(self.file())?; + fn update_mmap(&mut self, file: &File) -> Result<()> { + let mmap = Self::new_mmap(file)?; self.mmap().store(mmap); Ok(()) } diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index 98a9f366d..769e33bbb 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -1,6 +1,5 @@ use std::{ - fs::{self, File}, - mem, + fs, mem, path::{Path, PathBuf}, sync::Arc, }; @@ -189,14 +188,6 @@ where self.inner.parent() } - fn file(&self) -> &File { - self.inner.file() - } - - fn mut_file(&mut self) -> &mut File { - self.inner.mut_file() - } - #[inline] fn stored_len(&self) -> usize { Self::stored_len__(&self.pages_meta.load()) @@ -221,7 +212,7 @@ where } fn flush(&mut self) -> Result<()> { - self.inner.write_header_if_needed()?; + let file_opt = self.inner.write_header_if_needed()?; let pushed_len = self.pushed_len(); @@ -295,11 +286,13 @@ where pages_meta.write()?; + let mut file = file_opt.unwrap_or(self.open_file()?); + if let Some(truncate_at) = truncate_at { - self.file_set_len(truncate_at)?; + self.file_set_len(&mut file, truncate_at)?; } - self.file_write_all(&buf)?; + self.file_write_all(&mut file, &buf)?; self.pages_meta.store(Arc::new(pages_meta)); @@ -354,7 +347,9 @@ where self.pages_meta.store(Arc::new(pages_meta)); - self.file_truncate_and_write_all(len, &buf)?; + let mut file = self.open_file()?; + + self.file_truncate_and_write_all(&mut file, len, &buf)?; Ok(()) } diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index 4319c8c57..e984e99f9 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -24,7 +24,7 @@ pub struct RawVec { header: Header, parent: PathBuf, name: String, - file: Option, + // file: Option, // Consider Arc>> for dataraces when reorg ? mmap: Arc>, pushed: Vec, @@ -55,18 +55,18 @@ where pub fn import(parent: &Path, name: &str, version: Version) -> Result { let path = Self::path_(parent, name); - let (file, mmap, header) = match Self::open_file_(&path) { + let (mmap, header) = match Self::open_file_(&path) { Ok(mut file) => { if file.metadata()?.len() == 0 { let header = Header::create_and_write(&mut file, version, Format::Raw)?; let mmap = Self::new_mmap(&file)?; - (file, mmap, header) + (mmap, header) } else { let mmap = Self::new_mmap(&file)?; // dbg!(&mmap[..]); let header = Header::import_and_verify(&mmap, version, Format::Raw)?; // dbg!((&header, name, I::to_string())); - (file, mmap, header) + (mmap, header) } } Err(e) => match e.kind() { @@ -75,7 +75,7 @@ where let mut file = Self::open_file_(&path)?; let header = Header::create_and_write(&mut file, version, Format::Raw)?; let mmap = Self::new_mmap(&file)?; - (file, mmap, header) + (mmap, header) } _ => return Err(e.into()), }, @@ -86,7 +86,7 @@ where Ok(Self { mmap, header, - file: Some(file), + // file: Some(file), name: name.to_string(), parent: parent.to_owned(), pushed: vec![], @@ -111,8 +111,14 @@ where iter } - pub fn write_header_if_needed(&mut self) -> io::Result<()> { - self.header.write_if_needed(self.file.as_mut().unwrap()) + pub fn write_header_if_needed(&mut self) -> io::Result> { + if self.header.modified() { + let mut file = self.open_file()?; + self.header.write(&mut file)?; + Ok(Some(file)) + } else { + Ok(None) + } } } @@ -143,16 +149,6 @@ where &self.mmap } - #[inline] - fn file(&self) -> &File { - self.file.as_ref().unwrap() - } - - #[inline] - fn mut_file(&mut self) -> &mut File { - self.file.as_mut().unwrap() - } - #[inline] fn stored_len(&self) -> usize { self.stored_len_(&self.mmap.load()) @@ -177,7 +173,7 @@ where } fn flush(&mut self) -> Result<()> { - self.write_header_if_needed()?; + let file_opt = self.write_header_if_needed()?; let pushed_len = self.pushed_len(); @@ -200,7 +196,9 @@ where bytes }; - self.file_write_all(&bytes)?; + let mut file = file_opt.unwrap_or(self.open_file()?); + + self.file_write_all(&mut file, &bytes)?; Ok(()) } @@ -219,7 +217,8 @@ where let len = index * Self::SIZE_OF_T + HEADER_OFFSET; - self.file_set_len(len as u64)?; + let mut file = self.open_file()?; + self.file_set_len(&mut file, len as u64)?; Ok(()) } @@ -266,7 +265,6 @@ impl Clone for RawVec { header: self.header.clone(), parent: self.parent.clone(), name: self.name.clone(), - file: None, mmap: self.mmap.clone(), pushed: vec![], phantom: PhantomData, diff --git a/crates/brk_vec/src/variants/stored.rs b/crates/brk_vec/src/variants/stored.rs index a4cccb050..7283577ff 100644 --- a/crates/brk_vec/src/variants/stored.rs +++ b/crates/brk_vec/src/variants/stored.rs @@ -1,7 +1,4 @@ -use std::{ - fs::File, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use arc_swap::ArcSwap; use brk_core::{Result, Value, Version}; @@ -91,22 +88,6 @@ where } } - #[inline] - fn file(&self) -> &File { - match self { - StoredVec::Raw(v) => v.file(), - StoredVec::Compressed(v) => v.file(), - } - } - - #[inline] - fn mut_file(&mut self) -> &mut File { - match self { - StoredVec::Raw(v) => v.mut_file(), - StoredVec::Compressed(v) => v.mut_file(), - } - } - #[inline] fn stored_len(&self) -> usize { match self {