vec: don't store mmap in struct anymore

This commit is contained in:
nym21
2025-07-13 11:50:34 +02:00
parent a98546f605
commit 2dd608dfed
13 changed files with 204 additions and 158 deletions

1
Cargo.lock generated
View File

@@ -548,7 +548,6 @@ dependencies = [
"derive_deref",
"either",
"fjall",
"jiff",
"log",
"rayon",
"serde",

View File

@@ -23,7 +23,6 @@ color-eyre = { workspace = true }
derive_deref = { workspace = true }
either = "1.15.0"
fjall = { workspace = true }
jiff = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }

View File

@@ -10,6 +10,7 @@ use brk_fetcher::Fetcher;
use brk_indexer::Indexer;
use brk_vec::{
AnyCollectableVec, AnyIterableVec, AnyVec, Computation, EagerVec, Format, StoredIndex,
VecIterator,
};
use crate::vecs::grouped::Source;
@@ -456,33 +457,34 @@ impl Vecs {
exit,
)?;
let mut prev = None;
self.dateindex_to_ohlc_in_cents.compute_transform(
starting_indexes.dateindex,
&indexes.dateindex_to_date,
|(di, d, this)| {
let get_prev = || {
this.get_or_read(di, &this.mmap().load())
.unwrap()
.unwrap()
.into_owned()
};
if prev.is_none() {
let i = di.unwrap_to_usize();
prev.replace(if i > 0 {
this.into_iter().unwrap_get_inner_(i - 1)
} else {
OHLCCents::default()
});
}
let mut ohlc = if di.unwrap_to_usize() + 100 >= this.len() {
fetcher.get_date(d).unwrap_or_else(|_| get_prev())
} else {
get_prev()
};
if let Some(prev) = di.decremented() {
let prev_open = *this
.get_or_read(prev, &this.mmap().load())
.unwrap()
.unwrap()
.close;
let ohlc = if di.unwrap_to_usize() + 100 >= this.len()
&& let Ok(mut ohlc) = fetcher.get_date(d)
{
let prev_open = *prev.as_ref().unwrap().close;
*ohlc.open = prev_open;
*ohlc.high = (*ohlc.high).max(prev_open);
*ohlc.low = (*ohlc.low).min(prev_open);
}
ohlc
} else {
prev.clone().unwrap()
};
prev.replace(ohlc.clone());
(di, ohlc)
},
exit,

View File

@@ -498,9 +498,9 @@ impl Vecs {
let dateindex_to_first_height = &indexes.dateindex_to_first_height;
let dateindex_to_height_count = &indexes.dateindex_to_height_count;
let outputindex_to_value_mmap = outputindex_to_value.mmap().load();
let outputindex_to_outputtype_mmap = outputindex_to_outputtype.mmap().load();
let outputindex_to_typeindex_mmap = outputindex_to_typeindex.mmap().load();
let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?;
let outputindex_to_outputtype_mmap = outputindex_to_outputtype.create_mmap()?;
let outputindex_to_typeindex_mmap = outputindex_to_typeindex.create_mmap()?;
let mut inputindex_to_outputindex_iter = inputindex_to_outputindex.into_iter();
let mut height_to_first_outputindex_iter = height_to_first_outputindex.into_iter();

View File

@@ -7,7 +7,7 @@ pub fn setrlimit() -> io::Result<()> {
rlimit::setrlimit(
Resource::NOFILE,
no_file_limit.0.max(250_000),
no_file_limit.0.max(10_000),
no_file_limit.1,
)?;

View File

@@ -15,7 +15,7 @@ use bitcoin::{Transaction, TxIn, TxOut};
use brk_exit::Exit;
use brk_parser::Parser;
use brk_store::AnyStore;
use brk_vec::{AnyVec, VecIterator};
use brk_vec::{AnyVec, Mmap, VecIterator};
use color_eyre::eyre::{ContextCompat, eyre};
use log::{error, info};
use rayon::prelude::*;
@@ -40,6 +40,7 @@ pub struct Indexer {
impl Indexer {
pub fn forced_import(outputs_dir: &Path) -> color_eyre::Result<Self> {
setrlimit()?;
Ok(Self {
vecs: Vecs::forced_import(&outputs_dir.join("vecs/indexed"), VERSION + Version::ZERO)?,
stores: Stores::forced_import(&outputs_dir.join("stores"), VERSION + Version::ZERO)?,
@@ -88,9 +89,9 @@ impl Indexer {
height: Height,
rem: bool,
exit: &Exit|
-> color_eyre::Result<()> {
-> color_eyre::Result<bool> {
if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() {
return Ok(());
return Ok(false);
}
info!("Exporting...");
@@ -98,15 +99,88 @@ impl Indexer {
stores.commit(height)?;
vecs.flush(height)?;
exit.release();
Ok(())
Ok(true)
};
let mut txindex_to_first_outputindex_mmap_opt = None;
let mut p2pk65addressindex_to_p2pk65bytes_mmap_opt = None;
let mut p2pk33addressindex_to_p2pk33bytes_mmap_opt = None;
let mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt = None;
let mut p2shaddressindex_to_p2shbytes_mmap_opt = None;
let mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt = None;
let mut p2wshaddressindex_to_p2wshbytes_mmap_opt = None;
let mut p2traddressindex_to_p2trbytes_mmap_opt = None;
let mut p2aaddressindex_to_p2abytes_mmap_opt = None;
let reset_mmaps_options =
|vecs: &mut Vecs,
txindex_to_first_outputindex_mmap_opt: &mut Option<Mmap>,
p2pk65addressindex_to_p2pk65bytes_mmap_opt: &mut Option<Mmap>,
p2pk33addressindex_to_p2pk33bytes_mmap_opt: &mut Option<Mmap>,
p2pkhaddressindex_to_p2pkhbytes_mmap_opt: &mut Option<Mmap>,
p2shaddressindex_to_p2shbytes_mmap_opt: &mut Option<Mmap>,
p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt: &mut Option<Mmap>,
p2wshaddressindex_to_p2wshbytes_mmap_opt: &mut Option<Mmap>,
p2traddressindex_to_p2trbytes_mmap_opt: &mut Option<Mmap>,
p2aaddressindex_to_p2abytes_mmap_opt: &mut Option<Mmap>| {
txindex_to_first_outputindex_mmap_opt
.replace(vecs.txindex_to_first_outputindex.create_mmap().unwrap());
p2pk65addressindex_to_p2pk65bytes_mmap_opt.replace(
vecs.p2pk65addressindex_to_p2pk65bytes
.create_mmap()
.unwrap(),
);
p2pk33addressindex_to_p2pk33bytes_mmap_opt.replace(
vecs.p2pk33addressindex_to_p2pk33bytes
.create_mmap()
.unwrap(),
);
p2pkhaddressindex_to_p2pkhbytes_mmap_opt
.replace(vecs.p2pkhaddressindex_to_p2pkhbytes.create_mmap().unwrap());
p2shaddressindex_to_p2shbytes_mmap_opt
.replace(vecs.p2shaddressindex_to_p2shbytes.create_mmap().unwrap());
p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt.replace(
vecs.p2wpkhaddressindex_to_p2wpkhbytes
.create_mmap()
.unwrap(),
);
p2wshaddressindex_to_p2wshbytes_mmap_opt
.replace(vecs.p2wshaddressindex_to_p2wshbytes.create_mmap().unwrap());
p2traddressindex_to_p2trbytes_mmap_opt
.replace(vecs.p2traddressindex_to_p2trbytes.create_mmap().unwrap());
p2aaddressindex_to_p2abytes_mmap_opt
.replace(vecs.p2aaddressindex_to_p2abytes.create_mmap().unwrap());
};
reset_mmaps_options(
vecs,
&mut txindex_to_first_outputindex_mmap_opt,
&mut p2pk65addressindex_to_p2pk65bytes_mmap_opt,
&mut p2pk33addressindex_to_p2pk33bytes_mmap_opt,
&mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt,
&mut p2shaddressindex_to_p2shbytes_mmap_opt,
&mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt,
&mut p2wshaddressindex_to_p2wshbytes_mmap_opt,
&mut p2traddressindex_to_p2trbytes_mmap_opt,
&mut p2aaddressindex_to_p2abytes_mmap_opt,
);
parser.parse(start, end).iter().try_for_each(
|(height, block, blockhash)| -> color_eyre::Result<()> {
info!("Indexing block {height}...");
idxs.height = height;
let txindex_to_first_outputindex_mmap = txindex_to_first_outputindex_mmap_opt.as_ref().unwrap();
let p2pk65addressindex_to_p2pk65bytes_mmap = p2pk65addressindex_to_p2pk65bytes_mmap_opt.as_ref().unwrap();
let p2pk33addressindex_to_p2pk33bytes_mmap = p2pk33addressindex_to_p2pk33bytes_mmap_opt.as_ref().unwrap();
let p2pkhaddressindex_to_p2pkhbytes_mmap = p2pkhaddressindex_to_p2pkhbytes_mmap_opt.as_ref().unwrap();
let p2shaddressindex_to_p2shbytes_mmap = p2shaddressindex_to_p2shbytes_mmap_opt.as_ref().unwrap();
let p2wpkhaddressindex_to_p2wpkhbytes_mmap = p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt.as_ref().unwrap();
let p2wshaddressindex_to_p2wshbytes_mmap = p2wshaddressindex_to_p2wshbytes_mmap_opt.as_ref().unwrap();
let p2traddressindex_to_p2trbytes_mmap = p2traddressindex_to_p2trbytes_mmap_opt.as_ref().unwrap();
let p2aaddressindex_to_p2abytes_mmap = p2aaddressindex_to_p2abytes_mmap_opt.as_ref().unwrap();
// Used to check rapidhash collisions
let check_collisions = check_collisions && height > Height::new(COLLISIONS_CHECKED_UP_TO);
@@ -166,9 +240,6 @@ impl Indexer {
});
let input_source_vec_handle = scope.spawn(|| {
let txindex_to_first_outputindex_mmap = vecs
.txindex_to_first_outputindex.mmap().load();
let inputs = block
.txdata
.iter()
@@ -211,7 +282,7 @@ impl Indexer {
let vout = Vout::from(outpoint.vout);
let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, &txindex_to_first_outputindex_mmap)?
let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_mmap)?
.context("Expect outputindex to not be none")
.inspect_err(|_| {
dbg!(outpoint.txid, prev_txindex, vout);
@@ -240,16 +311,6 @@ impl Indexer {
})
});
let p2pk65addressindex_to_p2pk65bytes_mmap = vecs
.p2pk65addressindex_to_p2pk65bytes.mmap().load();
let p2pk33addressindex_to_p2pk33bytes_mmap = vecs.p2pk33addressindex_to_p2pk33bytes.mmap().load();
let p2pkhaddressindex_to_p2pkhbytes_mmap = vecs.p2pkhaddressindex_to_p2pkhbytes.mmap().load();
let p2shaddressindex_to_p2shbytes_mmap = vecs.p2shaddressindex_to_p2shbytes.mmap().load();
let p2wpkhaddressindex_to_p2wpkhbytes_mmap = vecs.p2wpkhaddressindex_to_p2wpkhbytes.mmap().load();
let p2wshaddressindex_to_p2wshbytes_mmap = vecs.p2wshaddressindex_to_p2wshbytes.mmap().load();
let p2traddressindex_to_p2trbytes_mmap = vecs.p2traddressindex_to_p2trbytes.mmap().load();
let p2aaddressindex_to_p2abytes_mmap = vecs.p2aaddressindex_to_p2abytes.mmap().load();
let outputs = block
.txdata
.iter()
@@ -307,35 +368,35 @@ impl Indexer {
let prev_addressbytes_opt = match outputtype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_or_read(typeindex.into(), &p2pk65addressindex_to_p2pk65bytes_mmap)?
.get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PK33 => vecs
.p2pk33addressindex_to_p2pk33bytes
.get_or_read(typeindex.into(), &p2pk33addressindex_to_p2pk33bytes_mmap)?
.get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PKH => vecs
.p2pkhaddressindex_to_p2pkhbytes
.get_or_read(typeindex.into(), &p2pkhaddressindex_to_p2pkhbytes_mmap)?
.get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2SH => vecs
.p2shaddressindex_to_p2shbytes
.get_or_read(typeindex.into(), &p2shaddressindex_to_p2shbytes_mmap)?
.get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WPKH => vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.get_or_read(typeindex.into(), &p2wpkhaddressindex_to_p2wpkhbytes_mmap)?
.get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WSH => vecs
.p2wshaddressindex_to_p2wshbytes
.get_or_read(typeindex.into(), &p2wshaddressindex_to_p2wshbytes_mmap)?
.get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2TR => vecs
.p2traddressindex_to_p2trbytes
.get_or_read(typeindex.into(), &p2traddressindex_to_p2trbytes_mmap)?
.get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2A => vecs
.p2aaddressindex_to_p2abytes
.get_or_read(typeindex.into(), &p2aaddressindex_to_p2abytes_mmap)?
.get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::Empty | OutputType::OpReturn | OutputType::P2MS | OutputType::Unknown => {
unreachable!()
@@ -677,7 +738,22 @@ impl Indexer {
idxs.inputindex += InputIndex::from(inputs_len);
idxs.outputindex += OutputIndex::from(outputs_len);
export_if_needed(stores, vecs, height, false, exit)?;
let exported = export_if_needed(stores, vecs, height, false, exit)?;
if exported {
reset_mmaps_options(
vecs,
&mut txindex_to_first_outputindex_mmap_opt,
&mut p2pk65addressindex_to_p2pk65bytes_mmap_opt,
&mut p2pk33addressindex_to_p2pk33bytes_mmap_opt,
&mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt,
&mut p2shaddressindex_to_p2shbytes_mmap_opt,
&mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt,
&mut p2wshaddressindex_to_p2wshbytes_mmap_opt,
&mut p2traddressindex_to_p2trbytes_mmap_opt,
&mut p2aaddressindex_to_p2abytes_mmap_opt,
);
}
Ok(())
},

View File

@@ -1,13 +1,12 @@
use std::{
fs::File,
io::{self, Seek, SeekFrom},
io::{self, Read, Seek, SeekFrom},
os::unix::fs::FileExt,
sync::Arc,
};
use arc_swap::ArcSwap;
use brk_core::{Error, Height, Result, Version};
use memmap2::Mmap;
use zerocopy::{FromBytes, IntoBytes};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -31,8 +30,12 @@ impl Header {
})
}
pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result<Self> {
let inner = HeaderInner::import_and_verify(mmap, vec_version, format)?;
pub fn import_and_verify(
file: &mut File,
vec_version: Version,
format: Format,
) -> Result<Self> {
let inner = HeaderInner::import_and_verify(file, vec_version, format)?;
Ok(Self {
inner: Arc::new(ArcSwap::from_pointee(inner)),
modified: false,
@@ -109,13 +112,20 @@ impl HeaderInner {
file.write_all_at(self.as_bytes(), 0)
}
pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result<Self> {
if mmap.len() < HEADER_OFFSET {
pub fn import_and_verify(
file: &mut File,
vec_version: Version,
format: Format,
) -> Result<Self> {
if file.metadata()?.len() < HEADER_OFFSET as u64 {
return Err(Error::WrongLength);
}
// dbg!(mmap.len());
let header = HeaderInner::read_from_bytes(&mmap[..HEADER_OFFSET])?;
// dbg!(&header);
let mut buf = [0; HEADER_OFFSET];
file.read_exact(&mut buf)?;
let header = HeaderInner::read_from_bytes(&buf)?;
if header.header_version != HEADER_VERSION {
return Err(Error::DifferentVersion {
found: header.header_version,

View File

@@ -3,10 +3,8 @@ use std::{
fs::{File, OpenOptions},
io::{self, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::Arc,
};
use arc_swap::ArcSwap;
use brk_core::Result;
use memmap2::Mmap;
@@ -38,7 +36,7 @@ where
}
#[inline]
fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result<Option<Cow<T>>> {
let stored_len = self.stored_len_(mmap);
let stored_len = self.stored_len();
if index >= stored_len {
let pushed = self.pushed();
@@ -61,10 +59,7 @@ where
format!("{}_to_{}", I::to_string(), self.name())
}
fn mmap(&self) -> &ArcSwap<Mmap>;
fn stored_len(&self) -> usize;
fn stored_len_(&self, mmap: &Mmap) -> usize;
fn pushed(&self) -> &[T];
#[inline]
@@ -116,7 +111,7 @@ where
fn file_set_len(&mut self, file: &mut File, len: u64) -> Result<()> {
Self::file_set_len_(file, len)?;
self.update_mmap(file)
Ok(())
}
fn file_set_len_(file: &mut File, len: u64) -> Result<()> {
file.set_len(len)?;
@@ -127,7 +122,7 @@ where
fn file_write_all(&mut self, file: &mut File, buf: &[u8]) -> Result<()> {
file.write_all(buf)?;
file.flush()?;
self.update_mmap(file)
Ok(())
}
fn file_truncate_and_write_all(&mut self, file: &mut File, len: u64, buf: &[u8]) -> Result<()> {
@@ -143,14 +138,10 @@ where
self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[])
}
fn new_mmap(file: &File) -> Result<Arc<Mmap>> {
Ok(Arc::new(unsafe { Mmap::map(file)? }))
}
fn update_mmap(&mut self, file: &File) -> Result<()> {
let mmap = Self::new_mmap(file)?;
self.mmap().store(mmap);
Ok(())
#[inline]
fn create_mmap(&self) -> Result<Mmap> {
let file = self.open_file()?;
unsafe { Mmap::map(&file).map_err(|e| e.into()) }
}
#[inline]

View File

@@ -180,11 +180,6 @@ where
self.inner.mut_header()
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
}
fn parent(&self) -> &Path {
self.inner.parent()
}
@@ -193,10 +188,6 @@ where
fn stored_len(&self) -> usize {
Self::stored_len__(&self.pages_meta.load())
}
#[inline]
fn stored_len_(&self, _: &Mmap) -> usize {
self.stored_len()
}
#[inline]
fn pushed(&self) -> &[T] {
@@ -223,6 +214,8 @@ where
let stored_len = self.stored_len();
let mut file = file_opt.unwrap_or(self.open_file()?);
let mut pages_meta = (**self.pages_meta.load()).clone();
let mut starting_page_index = pages_meta.len();
@@ -236,16 +229,13 @@ where
let last_page_index = pages_meta.len() - 1;
values = Self::decode_page_(
stored_len,
last_page_index,
&self.mmap().load(),
&pages_meta,
)
.inspect_err(|_| {
dbg!(last_page_index, &pages_meta);
})
.unwrap();
let mmap = unsafe { Mmap::map(&file)? };
values = Self::decode_page_(stored_len, last_page_index, &mmap, &pages_meta)
.inspect_err(|_| {
dbg!(last_page_index, &pages_meta);
})
.unwrap();
truncate_at.replace(pages_meta.pop().unwrap().start);
starting_page_index = last_page_index;
@@ -287,8 +277,6 @@ where
pages_meta.write()?;
let mut file = file_opt.unwrap_or(self.open_file()?);
if let Some(truncate_at) = truncate_at {
self.file_set_len(&mut file, truncate_at)?;
}
@@ -324,7 +312,11 @@ where
let page_index = Self::index_to_page_index(index);
let values = self.decode_page(page_index, &self.mmap().load())?;
let mut file = self.open_file()?;
let mmap = unsafe { Mmap::map(&file)? };
let values = self.decode_page(page_index, &mmap)?;
let mut buf = vec![];
let mut page = pages_meta.truncate(page_index).unwrap();
@@ -348,8 +340,6 @@ where
self.pages_meta.store(Arc::new(pages_meta));
let mut file = self.open_file()?;
self.file_truncate_and_write_all(&mut file, len, &buf)?;
Ok(())
@@ -399,7 +389,7 @@ impl<I, T> Clone for CompressedVec<I, T> {
#[derive(Debug)]
pub struct CompressedVecIterator<'a, I, T> {
vec: &'a CompressedVec<I, T>,
guard: Guard<Arc<Mmap>>,
mmap: Mmap,
decoded_page: Option<(usize, Vec<T>)>,
// second_decoded_page?: Option<(usize, Vec<T>)>,
pages_meta: Guard<Arc<CompressedPagesMetadata>>,
@@ -445,7 +435,7 @@ where
type Item = (I, Cow<'a, T>);
fn next(&mut self) -> Option<Self::Item> {
let mmap = &self.guard;
let mmap = &self.mmap;
let i = self.index;
let stored_len = self.stored_len;
@@ -499,7 +489,7 @@ where
let stored_len = CompressedVec::<I, T>::stored_len__(&pages_meta);
CompressedVecIterator {
vec: self,
guard: self.mmap().load(),
mmap: self.create_mmap().unwrap(),
decoded_page: None,
pages_meta,
stored_len,

View File

@@ -8,7 +8,6 @@ use std::{
path::{Path, PathBuf},
};
use arc_swap::ArcSwap;
use brk_core::{
Bitcoin, CheckedSub, Close, Date, DateIndex, Dollars, Error, Result, Sats, StoredF32,
StoredUsize, Version,
@@ -108,10 +107,6 @@ where
self.0.get_or_read(index, mmap)
}
pub fn mmap(&self) -> &ArcSwap<Mmap> {
self.0.mmap()
}
pub fn inner_version(&self) -> Version {
self.0.version()
}

View File

@@ -1,6 +1,5 @@
use std::{borrow::Cow, cmp::Ordering, fmt::Debug, path::Path};
use arc_swap::ArcSwap;
use brk_core::{Error, Height, Result, Version};
use crate::{
@@ -78,8 +77,8 @@ where
self.0.header()
}
pub fn mmap(&self) -> &ArcSwap<Mmap> {
self.0.mmap()
pub fn create_mmap(&self) -> Result<Mmap> {
self.0.create_mmap()
}
#[inline]

View File

@@ -5,10 +5,12 @@ use std::{
marker::PhantomData,
mem,
path::{Path, PathBuf},
sync::Arc,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use arc_swap::{ArcSwap, Guard};
use brk_core::{Error, Result, Version};
use memmap2::Mmap;
use rayon::prelude::*;
@@ -25,10 +27,9 @@ pub struct RawVec<I, T> {
header: Header,
parent: PathBuf,
name: &'static str,
// Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
mmap: Arc<ArcSwap<Mmap>>,
pushed: Vec<T>,
phantom: PhantomData<I>,
stored_len: Arc<AtomicUsize>,
}
impl<I, T> RawVec<I, T>
@@ -55,16 +56,18 @@ where
pub fn import(parent: &Path, name: &str, version: Version) -> Result<Self> {
let path = Self::path_(parent, name);
let (mmap, header) = match Self::open_file_(&path) {
let (header, file) = match Self::open_file_(&path) {
Ok(mut file) => {
if file.metadata()?.len() == 0 {
let header = Header::create_and_write(&mut file, version, Format::Raw)?;
let mmap = Self::new_mmap(&file)?;
(mmap, header)
(
Header::create_and_write(&mut file, version, Format::Raw)?,
Some(file),
)
} else {
let mmap = Self::new_mmap(&file)?;
let header = Header::import_and_verify(&mmap, version, Format::Raw)?;
(mmap, header)
(
Header::import_and_verify(&mut file, version, Format::Raw)?,
None,
)
}
}
Err(e) => match e.kind() {
@@ -72,22 +75,25 @@ where
fs::create_dir_all(Self::folder_(parent, name))?;
let mut file = Self::open_file_(&path)?;
let header = Header::create_and_write(&mut file, version, Format::Raw)?;
let mmap = Self::new_mmap(&file)?;
(mmap, header)
(header, None)
}
_ => return Err(e.into()),
},
};
let mmap = Arc::new(ArcSwap::new(mmap));
let stored_len = if let Some(file) = file {
(file.metadata()?.len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T
} else {
0
};
Ok(Self {
mmap,
header,
name: Box::leak(Box::new(name.to_string())),
parent: parent.to_owned(),
pushed: vec![],
phantom: PhantomData,
stored_len: Arc::new(AtomicUsize::new(stored_len)),
})
}
@@ -141,18 +147,9 @@ where
&mut self.header
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
&self.mmap
}
#[inline]
fn stored_len(&self) -> usize {
self.stored_len_(&self.mmap.load())
}
#[inline]
fn stored_len_(&self, mmap: &Mmap) -> usize {
(mmap.len() - HEADER_OFFSET) / Self::SIZE_OF_T
self.stored_len.load(Ordering::SeqCst)
}
#[inline]
@@ -194,9 +191,10 @@ where
};
let mut file = file_opt.unwrap_or(self.open_file()?);
self.file_write_all(&mut file, &bytes)?;
self.stored_len.fetch_add(pushed_len, Ordering::SeqCst);
Ok(())
}
@@ -212,6 +210,8 @@ where
return Ok(());
}
self.stored_len.store(index, Ordering::SeqCst);
let len = index * Self::SIZE_OF_T + HEADER_OFFSET;
let mut file = self.open_file()?;
@@ -221,6 +221,7 @@ where
}
fn reset(&mut self) -> Result<()> {
self.stored_len.store(0, Ordering::SeqCst);
self.reset_()
}
}
@@ -262,9 +263,9 @@ impl<I, T> Clone for RawVec<I, T> {
header: self.header.clone(),
parent: self.parent.clone(),
name: self.name,
mmap: self.mmap.clone(),
pushed: vec![],
phantom: PhantomData,
stored_len: self.stored_len.clone(),
}
}
}
@@ -272,7 +273,7 @@ impl<I, T> Clone for RawVec<I, T> {
#[derive(Debug)]
pub struct RawVecIterator<'a, I, T> {
vec: &'a RawVec<I, T>,
guard: Guard<Arc<Mmap>>,
mmap: Mmap,
index: usize,
}
@@ -305,7 +306,7 @@ where
type Item = (I, Cow<'a, T>);
fn next(&mut self) -> Option<Self::Item> {
let mmap = &self.guard;
let mmap = &self.mmap;
let index = self.index;
let opt = self
@@ -333,7 +334,7 @@ where
fn into_iter(self) -> Self::IntoIter {
RawVecIterator {
vec: self,
guard: self.mmap.load(),
mmap: self.create_mmap().unwrap(),
index: 0,
}
}

View File

@@ -3,7 +3,6 @@ use std::{
path::{Path, PathBuf},
};
use arc_swap::ArcSwap;
use brk_core::{Result, Version};
use memmap2::Mmap;
@@ -75,14 +74,6 @@ where
}
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
match self {
StoredVec::Raw(v) => v.mmap(),
StoredVec::Compressed(v) => v.mmap(),
}
}
#[inline]
fn parent(&self) -> &Path {
match self {
@@ -98,13 +89,6 @@ where
StoredVec::Compressed(v) => v.stored_len(),
}
}
#[inline]
fn stored_len_(&self, mmap: &Mmap) -> usize {
match self {
StoredVec::Raw(v) => v.stored_len_(mmap),
StoredVec::Compressed(v) => v.stored_len_(mmap),
}
}
#[inline]
fn pushed(&self) -> &[T] {