vecs: part 7

This commit is contained in:
nym21
2025-07-23 23:55:13 +02:00
parent c4fc24c513
commit b10f5e3f67
31 changed files with 2074 additions and 221 deletions

View File

@@ -1,71 +0,0 @@
use std::{
fs,
io::{self, Read},
ops::{AddAssign, Deref, DerefMut},
path::Path,
};
use brk_core::{Error, Result};
use zerocopy::{FromBytes, IntoBytes};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
/// On-disk byte length, persisted as the raw native-endian bytes of a `usize`.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    FromBytes,
    IntoBytes,
    Immutable,
    KnownLayout,
)]
pub struct Length(usize);

impl Length {
    /// Persists the inner `usize` to `path` as its raw bytes.
    pub fn write(&self, path: &Path) -> Result<(), io::Error> {
        fs::write(path, self.as_bytes())
    }
}

impl From<usize> for Length {
    fn from(value: usize) -> Self {
        Self(value)
    }
}

impl Deref for Length {
    type Target = usize;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Length {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl TryFrom<&Path> for Length {
    type Error = Error;

    /// Reads a previously persisted `Length` back from `path`.
    ///
    /// A missing or unreadable file yields the default (zero) length; a file
    /// shorter than 8 bytes is an error.
    fn try_from(value: &Path) -> Result<Self, Self::Error> {
        // assumes a 64-bit `usize` — TODO confirm target width.
        // NOTE(review): `ref_from_bytes` also requires `usize` alignment;
        // a byte buffer is only 1-aligned, so this cast may fail — confirm.
        let mut buf = [0u8; 8];
        match fs::read(value) {
            Ok(bytes) => {
                bytes.as_slice().read_exact(&mut buf)?;
                Ok(*(Self::ref_from_bytes(&buf)?))
            }
            Err(_) => Ok(Self::default()),
        }
    }
}

impl AddAssign<usize> for Length {
    fn add_assign(&mut self, rhs: usize) {
        self.0 += rhs;
    }
}

View File

@@ -2,12 +2,10 @@ mod compressed_page_meta;
mod compressed_pages_meta;
mod format;
mod header;
// mod length;
mod unsafe_slice;
pub use compressed_page_meta::*;
pub use compressed_pages_meta::*;
pub use format::*;
pub use header::*;
// pub use length::*;
pub use unsafe_slice::*;

View File

@@ -18,6 +18,8 @@ log = { workspace = true }
memmap2 = "0.9.7"
parking_lot = {workspace = true}
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true }
zerocopy-derive = { workspace = true }
zstd = "0.13.3"

View File

@@ -8,38 +8,400 @@ fn main() -> Result<()> {
let file = File::open(Path::new("vecs"))?;
// file.set_min_len(PAGE_SIZE * 1_000_000)?;
let region1_i = file.create_region_if_needed("region1")?;
dbg!(region1_i);
assert!(file.get_region(region1_i).unwrap().read().len() == 0);
file.write_all(region1_i, &[0, 1, 2, 3, 4])?;
{
let opt = file.get_region(region1_i);
let region = opt.as_ref().unwrap().read();
assert!(region.start() == 0 && region.len() == 5 && region.reserved() == PAGE_SIZE);
let layout = file.layout();
assert!(layout.start_to_index().len() == 1);
assert!(layout.start_to_index().first_key_value() == Some((&0, &0)));
assert!(layout.start_to_hole().is_empty());
let regions = file.regions();
assert!(
regions
.get_region_index_from_id("region1")
.is_some_and(|i| i == region1_i)
);
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 0);
assert!(region.reserved() == PAGE_SIZE);
}
assert!(file.mmap.read()[0..10] == [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]);
file.write_all(region1_i, &[5, 6, 7, 8, 9])?;
assert!(file.mmap.read()[0..10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
file.write_all_at(region1_i, &[1, 2], 0)?;
assert!(file.mmap.read()[0..10] == [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]);
file.write_all_to_region(region1_i, &[0, 1, 2, 3, 4])?;
{
let opt = file.get_region(region1_i);
let region = opt.as_ref().unwrap().read();
dbg!(&region);
assert!(region.start() == 0 && region.len() == 10 && region.reserved() == PAGE_SIZE);
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 5);
assert!(region.reserved() == PAGE_SIZE);
assert!(file.mmap()[0..10] == [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]);
}
// file.set_min_len(PAGE_SIZE * 1_000_000)?;
file.write_all_to_region(region1_i, &[5, 6, 7, 8, 9])?;
{
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 10);
assert!(region.reserved() == PAGE_SIZE);
assert!(file.mmap()[0..10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
file.write_all_to_region_at(region1_i, &[1, 2], 0)?;
{
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 10);
assert!(region.reserved() == PAGE_SIZE);
assert!(file.mmap()[0..10] == [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]);
}
file.write_all_to_region_at(region1_i, &[10, 11, 12, 13, 14, 15, 16, 17, 18], 4)?;
{
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 13);
assert!(region.reserved() == PAGE_SIZE);
assert!(
file.mmap()[0..20]
== [
1, 2, 2, 3, 10, 11, 12, 13, 14, 15, 16, 17, 18, 0, 0, 0, 0, 0, 0, 0
]
);
}
file.write_all_to_region_at(region1_i, &[1], 18)?;
{
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 19);
assert!(region.reserved() == PAGE_SIZE);
assert!(
file.mmap()[0..20]
== [
1, 2, 2, 3, 10, 11, 12, 13, 14, 15, 16, 17, 18, 0, 0, 0, 0, 0, 1, 0
]
);
}
file.write_all_to_region_at(region1_i, &[1; 8000], 0)?;
{
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 8000);
assert!(region.reserved() == PAGE_SIZE * 2);
assert!(file.mmap()[0..8000] == [1; 8000]);
assert!(file.mmap()[8000..8001] == [0]);
}
println!("Disk usage - pre sync: {}", file.disk_usage());
file.sync_data()?;
println!("Disk usage - post sync: {}", file.disk_usage());
file.truncate_region(region1_i, 10)?;
{
let region = file.get_region(region1_i)?;
assert!(region.start() == 0);
assert!(region.len() == 10);
assert!(region.reserved() == PAGE_SIZE * 2);
// We only punch a hole in whole pages (4096 bytes)
// Thus the last byte of the page where there is still data wasn't overwritten when truncating
// And the first byte of the punched page was set to 0
assert!(file.mmap()[4095..=4096] == [1, 0]);
}
file.sync_data()?;
println!("Disk usage - post trunc: {}", file.disk_usage());
file.remove_region(region1_i)?;
println!("Disk usage - post remove: {}", file.disk_usage());
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 1);
assert!(index_to_region[0].is_none());
assert!(regions.id_to_index().is_empty());
let layout = file.layout();
assert!(layout.start_to_index().is_empty());
assert!(layout.start_to_hole().is_empty());
}
let region1_i = file.create_region_if_needed("region1")?;
let region2_i = file.create_region_if_needed("region2")?;
let region3_i = file.create_region_if_needed("region3")?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == 0);
assert!(region1.len() == 0);
assert!(region1.reserved() == PAGE_SIZE);
let region2 = file.get_region(region2_i)?;
assert!(region2.start() == PAGE_SIZE);
assert!(region2.len() == 0);
assert!(region2.reserved() == PAGE_SIZE);
let region3 = file.get_region(region3_i)?;
assert!(region3.start() == PAGE_SIZE * 2);
assert!(region3.len() == 0);
assert!(region3.reserved() == PAGE_SIZE);
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 3);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2") == Some(&1));
assert!(id_to_index.get("region3") == Some(&2));
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 3);
assert!(start_to_index.get(&0) == Some(&0));
assert!(start_to_index.get(&PAGE_SIZE) == Some(&1));
assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&2));
assert!(layout.start_to_hole().is_empty());
}
file.remove_region(region2_i)?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == 0);
assert!(region1.len() == 0);
assert!(region1.reserved() == PAGE_SIZE);
assert!(file.get_region(region2_i).is_err());
assert!(
index_to_region
.get(region2_i)
.is_some_and(|opt| opt.is_none())
);
let region3 = file.get_region(region3_i)?;
assert!(region3.start() == PAGE_SIZE * 2);
assert!(region3.len() == 0);
assert!(region3.reserved() == PAGE_SIZE);
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 2);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2").is_none());
assert!(id_to_index.get("region3") == Some(&2));
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 2);
assert!(start_to_index.get(&0) == Some(&region1_i));
assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i));
let start_to_hole = layout.start_to_hole();
assert!(start_to_hole.len() == 1);
assert!(start_to_hole.get(&PAGE_SIZE) == Some(&PAGE_SIZE));
drop(regions);
drop(layout);
assert!(file.remove_region(region2_i).is_ok_and(|o| o.is_none()));
}
let region2_i = file.create_region_if_needed("region2")?;
{
assert!(region2_i == 1)
}
file.remove_region(region2_i)?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == 0);
assert!(region1.len() == 0);
assert!(region1.reserved() == PAGE_SIZE);
assert!(file.get_region(region2_i).is_err());
assert!(
index_to_region
.get(region2_i)
.is_some_and(|opt| opt.is_none())
);
let region3 = file.get_region(region3_i)?;
assert!(region3.start() == PAGE_SIZE * 2);
assert!(region3.len() == 0);
assert!(region3.reserved() == PAGE_SIZE);
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 2);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2").is_none());
assert!(id_to_index.get("region3") == Some(&2));
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 2);
assert!(start_to_index.get(&0) == Some(&region1_i));
assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i));
let start_to_hole = layout.start_to_hole();
assert!(start_to_hole.len() == 1);
assert!(start_to_hole.get(&PAGE_SIZE) == Some(&PAGE_SIZE));
drop(regions);
drop(layout);
assert!(file.remove_region(region2_i).is_ok_and(|o| o.is_none()));
}
file.write_all_to_region_at(region1_i, &[1; 8000], 0)?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == 0);
assert!(region1.len() == 8000);
assert!(region1.reserved() == 2 * PAGE_SIZE);
assert!(file.get_region(region2_i).is_err());
assert!(
index_to_region
.get(region2_i)
.is_some_and(|opt| opt.is_none())
);
let region3 = file.get_region(region3_i)?;
assert!(region3.start() == PAGE_SIZE * 2);
assert!(region3.len() == 0);
assert!(region3.reserved() == PAGE_SIZE);
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 2);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2").is_none());
assert!(id_to_index.get("region3") == Some(&2));
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 2);
assert!(start_to_index.get(&0) == Some(&region1_i));
assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i));
let start_to_hole = layout.start_to_hole();
assert!(start_to_hole.is_empty());
}
let region2_i = file.create_region_if_needed("region2")?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == 0);
assert!(region1.len() == 8000);
assert!(region1.reserved() == 2 * PAGE_SIZE);
let region2 = file.get_region(region2_i)?;
assert!(region2.start() == PAGE_SIZE * 3);
assert!(region2.len() == 0);
assert!(region2.reserved() == PAGE_SIZE);
let region3 = file.get_region(region3_i)?;
assert!(region3.start() == PAGE_SIZE * 2);
assert!(region3.len() == 0);
assert!(region3.reserved() == PAGE_SIZE);
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 3);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2") == Some(&1));
assert!(id_to_index.get("region3") == Some(&2));
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 3);
assert!(start_to_index.get(&0) == Some(&region1_i));
assert!(start_to_index.get(&(PAGE_SIZE * 2)) == Some(&region3_i));
assert!(start_to_index.get(&(PAGE_SIZE * 3)) == Some(&region2_i));
let start_to_hole = layout.start_to_hole();
assert!(start_to_hole.is_empty());
}
file.remove_region(region3_i)?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == 0);
assert!(region1.len() == 8000);
assert!(region1.reserved() == 2 * PAGE_SIZE);
let region2 = file.get_region(region2_i)?;
assert!(region2.start() == PAGE_SIZE * 3);
assert!(region2.len() == 0);
assert!(region2.reserved() == PAGE_SIZE);
assert!(file.get_region(region3_i).is_err());
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 2);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2") == Some(&1));
assert!(id_to_index.get("region3").is_none());
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 2);
assert!(start_to_index.get(&0) == Some(&region1_i));
assert!(start_to_index.get(&(PAGE_SIZE * 3)) == Some(&region2_i));
let start_to_hole = layout.start_to_hole();
assert!(start_to_hole.get(&(PAGE_SIZE * 2)) == Some(&PAGE_SIZE));
}
file.write_all_to_region(region1_i, &[1; 8000])?;
{
let regions = file.regions();
let index_to_region = regions.index_to_region();
assert!(index_to_region.len() == 3);
let region1 = file.get_region(region1_i)?;
assert!(region1.start() == PAGE_SIZE * 4);
assert!(region1.len() == 16_000);
assert!(region1.reserved() == 4 * PAGE_SIZE);
let region2 = file.get_region(region2_i)?;
assert!(region2.start() == PAGE_SIZE * 3);
assert!(region2.len() == 0);
assert!(region2.reserved() == PAGE_SIZE);
assert!(file.get_region(region3_i).is_err());
let id_to_index = regions.id_to_index();
assert!(id_to_index.len() == 2);
assert!(id_to_index.get("region1") == Some(&0));
assert!(id_to_index.get("region2") == Some(&1));
assert!(id_to_index.get("region3").is_none());
let layout = file.layout();
let start_to_index = layout.start_to_index();
assert!(start_to_index.len() == 2);
assert!(start_to_index.get(&(PAGE_SIZE * 4)) == Some(&region1_i));
assert!(start_to_index.get(&(PAGE_SIZE * 3)) == Some(&region2_i));
let start_to_hole = layout.start_to_hole();
assert!(start_to_hole.get(&0) == Some(&(PAGE_SIZE * 3)));
}
file.write_all_to_region(region2_i, &[1; 6000])?;
let region4_i = file.create_region_if_needed("region4")?;
file.remove_region(region2_i);
file.remove_region(region4_i);
dbg!(file.regions());
dbg!(file.layout());
Ok(())
}

View File

@@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use brk_core::Error;
use brk_core::Result;
use super::{PAGE_SIZE, Region, Regions};
use super::{Region, Regions};
#[derive(Debug)]
pub struct Layout {
@@ -19,7 +19,7 @@ impl From<&Regions> for Layout {
let mut prev_end = 0;
value
.as_array()
.index_to_region()
.iter()
.enumerate()
.flat_map(|(index, opt)| opt.as_ref().map(|region| (index, region)))
@@ -42,6 +42,14 @@ impl From<&Regions> for Layout {
}
impl Layout {
pub fn start_to_index(&self) -> &BTreeMap<u64, usize> {
&self.start_to_index
}
pub fn start_to_hole(&self) -> &BTreeMap<u64, u64> {
&self.start_to_hole
}
pub fn get_last_region(&self) -> Option<(u64, usize)> {
self.start_to_index
.last_key_value()
@@ -66,15 +74,15 @@ impl Layout {
// TODO: Other checks related to holes ?
}
pub fn move_region(&mut self, start: u64, index: usize, region: &Region) -> Result<()> {
pub fn move_region(&mut self, new_start: u64, index: usize, region: &Region) -> Result<()> {
self.remove_region(index, region)?;
self.insert_region(start, index);
self.insert_region(new_start, index);
Ok(())
}
pub fn remove_region(&mut self, index: usize, region: &Region) -> Result<()> {
let start = region.start();
let reserved = region.reserved();
let mut reserved = region.reserved();
if self
.start_to_index
@@ -86,14 +94,10 @@ impl Layout {
));
}
if self
.widen_hole_to_the_left_if_any(start + reserved, reserved)
.is_none()
&& let Some((&hole_start, gap)) = self.start_to_hole.range(..start).next_back()
&& hole_start + *gap == start
{
self.widen_hole_to_the_right_if_any(hole_start, reserved);
}
reserved += self
.start_to_hole
.remove(&(start + reserved))
.unwrap_or_default();
if self
.start_to_index
@@ -106,7 +110,16 @@ impl Layout {
.is_some_and(|&hole_start| hole_start > region_start)
})
{
// dbg!("Remove last hole");
self.start_to_hole.pop_last();
} else if let Some((&hole_start, gap)) = self.start_to_hole.range_mut(..start).next_back()
&& hole_start + *gap == start
{
// dbg!("Expand hole");
*gap += reserved;
} else {
// dbg!("Insert hole");
self.start_to_hole.insert(start, reserved);
}
Ok(())
@@ -138,42 +151,4 @@ impl Layout {
}
}
}
fn widen_hole_to_the_left_if_any(&mut self, start: u64, widen_by: u64) -> Option<u64> {
debug_assert!(start % PAGE_SIZE == 0);
if widen_by > start {
panic!("Hole too small")
}
let gap = self.start_to_hole.remove(&start)?;
debug_assert!(widen_by % PAGE_SIZE == 0);
let start = start - widen_by;
let gap = gap + widen_by;
if let Some((&prev_start, prev_gap)) = self.start_to_hole.range_mut(..start).next_back()
&& prev_start + *prev_gap == start
{
*prev_gap += gap;
} else {
debug_assert!(self.start_to_hole.insert(start, gap).is_none());
}
Some(start)
}
fn widen_hole_to_the_right_if_any(&mut self, start: u64, widen_by: u64) -> Option<u64> {
debug_assert!(start % PAGE_SIZE == 0);
let gap = self.start_to_hole.get_mut(&start)?;
debug_assert!(widen_by % PAGE_SIZE == 0);
*gap += widen_by;
let next_hole_start = start + *gap;
if let Some(next_gap) = self.start_to_hole.remove(&next_hole_start) {
*self.start_to_hole.get_mut(&start).unwrap() += next_gap;
}
Some(start)
}
}

View File

@@ -1,7 +1,8 @@
use std::{
fs::{self, OpenOptions},
io::Write,
os::unix::io::AsRawFd,
path::Path,
path::{Path, PathBuf},
sync::Arc,
};
@@ -16,19 +17,20 @@ mod region;
mod regions;
use layout::*;
use reader::*;
pub use reader::Reader;
use region::*;
use regions::*;
pub const PAGE_SIZE: u64 = 4096;
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
#[derive(Debug)]
pub struct File {
// TODO: Remove pub
pub regions: RwLock<Regions>,
pub layout: RwLock<Layout>,
pub file: RwLock<fs::File>,
pub mmap: RwLock<MmapMut>,
path: PathBuf,
regions: RwLock<Regions>,
layout: RwLock<Layout>,
file: RwLock<fs::File>,
mmap: RwLock<MmapMut>,
}
impl File {
@@ -43,11 +45,12 @@ impl File {
.create(true)
.write(true)
.truncate(false)
.open(path.join("data"))?;
.open(Self::data_path_(path))?;
let mmap = Self::mmap(&file)?;
let mmap = Self::create_mmap(&file)?;
Ok(Self {
path: path.to_owned(),
file: RwLock::new(file),
mmap: RwLock::new(mmap),
regions: RwLock::new(regions),
@@ -55,14 +58,18 @@ impl File {
})
}
pub fn file_len(&self) -> Result<u64> {
Ok(self.file.read().metadata()?.len())
}
pub fn set_min_len(&self, len: u64) -> Result<()> {
let len = Self::ceil_number_to_page_size_multiple(len);
if self.file.read().metadata()?.len() < len {
if self.file_len()? < len {
let mut mmap = self.mmap.write();
let file = self.file.write();
file.set_len(len)?;
*mmap = Self::mmap(&file)?;
*mmap = Self::create_mmap(&file)?;
Ok(())
} else {
Ok(())
@@ -77,7 +84,7 @@ impl File {
}
pub fn create_region_if_needed(&self, id: &str) -> Result<usize> {
if let Some(index) = self.regions.read().get_region_index_from_id(id.to_owned()) {
if let Some(index) = self.regions.read().get_region_index_from_id(id) {
return Ok(index);
}
let mut regions = self.regions.write();
@@ -106,45 +113,48 @@ impl File {
Ok(index)
}
pub fn get_region(&self, index: usize) -> Option<Arc<RwLock<Region>>> {
self.regions.read().get_region_from_index(index)
pub fn get_region(&self, index: usize) -> Result<RwLockReadGuard<'static, Region>> {
let regions = self.regions.read();
let region_opt = regions.get_region_from_index(index);
let region_arc = region_opt.ok_or(Error::Str("Unknown region"))?;
let region = region_arc.read();
let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute(region) };
Ok(region)
}
pub fn read<'a>(&'a self, index: usize) -> Result<Reader<'a>> {
pub fn read_region<'a>(&'a self, index: usize) -> Result<Reader<'a>> {
let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
let region: RwLockReadGuard<'static, Region> = unsafe {
std::mem::transmute(
self.regions
.read()
.get_region_from_index(index)
.ok_or(Error::Str("Unknown region"))?
.read(),
)
};
let region = self.get_region(index)?;
Ok(Reader::new(mmap, region))
}
#[inline]
pub fn write_all(&self, region: usize, data: &[u8]) -> Result<()> {
self.write_all_at_(region, data, None)
pub fn write_all_to_region(&self, region: usize, data: &[u8]) -> Result<()> {
self.write_all_to_region_at_(region, data, None)
}
#[inline]
pub fn write_all_at(&self, region: usize, data: &[u8], at: u64) -> Result<()> {
self.write_all_at_(region, data, Some(at))
pub fn write_all_to_region_at(&self, region: usize, data: &[u8], at: u64) -> Result<()> {
self.write_all_to_region_at_(region, data, Some(at))
}
fn write_all_at_(&self, region_index: usize, data: &[u8], at: Option<u64>) -> Result<()> {
fn write_all_to_region_at_(
&self,
region_index: usize,
data: &[u8],
at: Option<u64>,
) -> Result<()> {
let Some(region) = self.regions.read().get_region_from_index(region_index) else {
return Err(Error::Str("Unknown region"));
};
let region_lock = region.read();
let start = region_lock.start();
let reserved = region_lock.reserved();
let left = region_lock.left();
let len = region_lock.len();
let data_len = data.len() as u64;
drop(region_lock);
let new_len = at.map_or(len + data_len, |at| (at + data_len).max(len));
// dbg!(new_len);
let write_start = at.unwrap_or(start + len);
if at.is_some_and(|at| at < start || at >= start + reserved) {
@@ -152,42 +162,44 @@ impl File {
}
// Write to reserved space if possible
let at_left = at.map_or_else(|| left, |at| reserved - at);
if at_left >= data_len {
let len = reserved - at_left.min(left) + data_len;
dbg!(write_start);
if new_len <= reserved {
// dbg!("Write to reserved space");
// dbg!(write_start);
self.write(write_start, data);
let regions = self.regions.read();
let mut region_lock = region.write();
dbg!(len);
region_lock.set_len(len);
if len != new_len {
region_lock.set_len(new_len);
}
regions.write_to_mmap(&region_lock, region_index);
return Ok(());
}
let mut layout_lock = self.layout.write();
let new_len = len + data_len;
debug_assert!(new_len > reserved);
let mut new_reserved = reserved;
while new_len < new_reserved {
while new_len > new_reserved {
new_reserved *= 2;
}
debug_assert!(new_len <= new_reserved);
let added_reserve = new_reserved - reserved;
// If is last continue writing
if layout_lock.is_last_region(region_index) {
// dbg!("Append to file");
// dbg!(start, new_reserved, start + new_reserved);
self.set_min_len(start + new_reserved)?;
self.write(write_start, data);
let regions = self.regions.read();
let mut region_lock = region.write();
region_lock.set_len(new_len);
region_lock.set_reserved(new_reserved);
region_lock.set_len(new_len);
regions.write_to_mmap(&region_lock, region_index);
return Ok(());
}
@@ -196,6 +208,8 @@ impl File {
let hole_start = start + reserved;
let gap = layout_lock.get_hole(hole_start);
if gap.is_some_and(|gap| gap >= added_reserve) {
// dbg!("Expand to hole");
self.write(write_start, data);
layout_lock.remove_or_compress_hole_to_right(hole_start, added_reserve);
@@ -203,14 +217,16 @@ impl File {
let regions = self.regions.read();
let mut region_lock = region.write();
region_lock.set_len(new_len);
region_lock.set_reserved(new_reserved);
region_lock.set_len(new_len);
regions.write_to_mmap(&region_lock, region_index);
return Ok(());
}
// Find hole big enough to move the region
if let Some(hole_start) = layout_lock.find_smallest_adequate_hole(new_reserved) {
// dbg!("Move to hole");
self.write(
hole_start,
&self.mmap.read()[start as usize..(start + len) as usize],
@@ -224,8 +240,8 @@ impl File {
layout_lock.move_region(hole_start, region_index, &region_lock)?;
region_lock.set_start(hole_start);
region_lock.set_len(new_len);
region_lock.set_reserved(new_reserved);
region_lock.set_len(new_len);
regions.write_to_mmap(&region_lock, region_index);
drop(layout_lock);
@@ -236,6 +252,7 @@ impl File {
}
// Write at the end
// dbg!("Move and write at the end");
let regions = self.regions.read();
let mut region_lock = region.write();
let (last_region_start, last_region_index) = layout_lock.get_last_region().unwrap();
@@ -253,9 +270,13 @@ impl File {
);
self.write(new_start + len, data);
// dbg!(new_start, region_index, &region_lock, new_reserved, new_len);
layout_lock.move_region(new_start, region_index, &region_lock)?;
region_lock.set_start(new_start);
region_lock.set_len(new_len);
region_lock.set_reserved(new_reserved);
region_lock.set_len(new_len);
regions.write_to_mmap(&region_lock, region_index);
self.punch_hole(start, reserved)?;
@@ -279,7 +300,7 @@ impl File {
slice[start..end].copy_from_slice(data);
}
pub fn truncate(&self, index: usize, from: u64) -> Result<()> {
pub fn truncate_region(&self, index: usize, from: u64) -> Result<()> {
let Some(region) = self.regions.read().get_region_from_index(index) else {
return Err(Error::Str("Unknown region"));
};
@@ -306,20 +327,22 @@ impl File {
Ok(())
}
pub fn remove(&self, index: usize) -> Result<Option<Arc<RwLock<Region>>>> {
pub fn remove_region(&self, index: usize) -> Result<Option<Arc<RwLock<Region>>>> {
let mut regions = self.regions.write();
let mut layout = self.layout.write();
let Some(region) = regions.remove_region(index)? else {
return Ok(None);
};
// dbg!(&regions);
let region_ = region.write();
layout.remove_region(index, &region_)?;
self.punch_hole(region_.start(), region_.len())?;
// dbg!(layout);
self.punch_hole(region_.start(), region_.reserved())?;
drop(region_);
Ok(Some(region))
}
fn mmap(file: &fs::File) -> Result<MmapMut> {
fn create_mmap(file: &fs::File) -> Result<MmapMut> {
Ok(unsafe { MmapOptions::new().map_mut(file)? })
}
@@ -353,9 +376,55 @@ impl File {
Ok(())
}
pub fn regions(&self) -> RwLockReadGuard<'_, Regions> {
self.regions.read()
}
pub fn layout(&self) -> RwLockReadGuard<'_, Layout> {
self.layout.read()
}
pub fn mmap(&self) -> RwLockReadGuard<'_, MmapMut> {
self.mmap.read()
}
fn ceil_number_to_page_size_multiple(num: u64) -> u64 {
(num + PAGE_SIZE_MINUS_1) & !PAGE_SIZE_MINUS_1
}
fn data_path(&self) -> PathBuf {
Self::data_path_(&self.path)
}
fn data_path_(path: &Path) -> PathBuf {
path.join("data")
}
pub fn flush(&self) -> Result<()> {
self.file.write().flush().map_err(|e| e.into())
}
pub fn sync_data(&self) -> Result<()> {
self.file.read().sync_data().map_err(|e| e.into())
}
pub fn sync_all(&self) -> Result<()> {
self.file.read().sync_all().map_err(|e| e.into())
}
pub fn disk_usage(&self) -> String {
let path = self.data_path();
let output = std::process::Command::new("du")
.arg("-h")
.arg(&path)
.output()
.expect("Failed to run du");
String::from_utf8_lossy(&output.stdout)
.replace(path.to_str().unwrap(), " ")
.trim()
.to_string()
}
}
#[repr(C)]

View File

@@ -3,6 +3,7 @@ use parking_lot::RwLockReadGuard;
use super::Region;
#[derive(Debug)]
pub struct Reader<'a> {
mmap: RwLockReadGuard<'a, MmapMut>,
region: RwLockReadGuard<'static, Region>,

View File

@@ -7,23 +7,23 @@ use crate::PAGE_SIZE;
pub struct Region {
/// Must be multiple of 4096
start: u64,
length: u64,
/// Must be multiple of 4096
len: u64,
/// Must be multiple of 4096, greater or equal to len
reserved: u64,
}
pub const SIZE_OF_REGION: usize = size_of::<Region>();
impl Region {
pub fn new(start: u64, length: u64, reserved: u64) -> Self {
debug_assert!(reserved > 0);
pub fn new(start: u64, len: u64, reserved: u64) -> Self {
debug_assert!(start % PAGE_SIZE == 0);
debug_assert!(reserved >= PAGE_SIZE);
debug_assert!(reserved % PAGE_SIZE == 0);
debug_assert!(length <= reserved);
debug_assert!(len <= reserved);
Self {
start,
length,
len,
reserved,
}
}
@@ -38,11 +38,12 @@ impl Region {
}
pub fn len(&self) -> u64 {
self.length
self.len
}
pub fn set_len(&mut self, len: u64) {
self.length = len
debug_assert!(len <= self.reserved());
self.len = len
}
pub fn reserved(&self) -> u64 {
@@ -50,10 +51,14 @@ impl Region {
}
pub fn set_reserved(&mut self, reserved: u64) {
debug_assert!(self.len() <= reserved);
debug_assert!(reserved >= PAGE_SIZE);
debug_assert!(reserved % PAGE_SIZE == 0);
self.reserved = reserved;
}
pub fn left(&self) -> u64 {
self.reserved - self.length
self.reserved - self.len
}
}

View File

@@ -103,8 +103,12 @@ impl Regions {
let region = Region::new(start, 0, PAGE_SIZE);
self.index_to_region
.push(Some(Arc::new(RwLock::new(region.clone()))));
let region_arc = Some(Arc::new(RwLock::new(region.clone())));
if index < self.index_to_region.len() {
self.index_to_region[index] = region_arc
} else {
self.index_to_region.push(region_arc);
}
self.set_min_len(((index + 1) * SIZE_OF_REGION) as u64)?;
@@ -141,8 +145,8 @@ impl Regions {
self.index_to_region.get(index).cloned().flatten()
}
pub fn get_region_index_from_id(&self, id: String) -> Option<usize> {
self.id_to_index.get(&id).copied()
pub fn get_region_index_from_id(&self, id: &str) -> Option<usize> {
self.id_to_index.get(id).copied()
}
fn find_id_from_index(&self, index: usize) -> Option<&String> {
@@ -155,10 +159,14 @@ impl Regions {
)
}
pub fn as_array(&self) -> &[Option<Arc<RwLock<Region>>>] {
pub fn index_to_region(&self) -> &[Option<Arc<RwLock<Region>>>] {
&self.index_to_region
}
pub fn id_to_index(&self) -> &HashMap<String, usize> {
&self.id_to_index
}
pub fn write_to_mmap(&self, region: &Region, index: usize) {
let start = index * SIZE_OF_REGION;
let end = start + SIZE_OF_REGION;

View File

@@ -1,5 +1,10 @@
mod file;
mod traits;
mod variants;
pub use file::*;
pub use variants::*;
use file::*;
use traits::*;
use variants::*;
pub use file::File;
pub use variants::{RawVec, Stamp, StampedVec};

View File

@@ -0,0 +1,102 @@
use brk_core::{Height, Version};
use super::{BoxedVecIterator, StoredIndex, StoredType};
/// Resolves a possibly-negative index into a `usize` offset within `len`.
///
/// Non-negative `i` is capped at `len`; negative `i` counts back from the
/// end (`len + i`) and is floored at 0.
pub fn i64_to_usize(i: i64, len: usize) -> usize {
    if i.is_negative() {
        (len as i64 + i).max(0) as usize
    } else {
        (i as usize).min(len)
    }
}
/// Type-erased, thread-safe view over any stored vector.
pub trait AnyVec: Send + Sync {
    /// Format version of the stored data.
    fn version(&self) -> Version;

    /// Human-readable identifier of this vector.
    fn name(&self) -> &str;

    /// Number of stored elements.
    fn len(&self) -> usize;

    /// Whether the vector holds no elements.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Name of the index type, for diagnostics.
    fn index_type_to_string(&self) -> &'static str;

    /// Size in bytes of one stored value.
    fn value_type_to_size_of(&self) -> usize;

    /// Builds a cache-validator string from the effective end bound,
    /// the data version and the given height.
    fn etag(&self, height: Height, to: Option<i64>) -> String {
        let len = self.len();
        // Note: a non-negative `to` is used verbatim (not clamped to `len`),
        // unlike `i64_to_usize`; a negative `to` counts back from the end,
        // floored at 0.
        let end = match to {
            None => len,
            Some(to) if to.is_negative() => len.saturating_sub(to.unsigned_abs() as usize),
            Some(to) => to as usize,
        };
        format!("{}-{}-{}", end, u64::from(self.version()), u32::from(height))
    }

    /// Resolves a possibly-negative index against this vector's length.
    #[inline]
    fn i64_to_usize(&self, i: i64) -> usize {
        i64_to_usize(i, self.len())
    }
}
/// Iteration support for type-erased vectors.
pub trait AnyIterableVec<I, T>: AnyVec {
    /// Returns a boxed iterator positioned at the start.
    #[allow(clippy::wrong_self_convention)]
    fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T>
    where
        I: StoredIndex,
        T: StoredType + 'a;

    /// Alias for [`Self::boxed_iter`].
    fn iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T>
    where
        I: StoredIndex,
        T: StoredType + 'a,
    {
        self.boxed_iter()
    }

    /// Returns an iterator positioned at the typed index `i`.
    fn iter_at<'a>(&'a self, i: I) -> BoxedVecIterator<'a, I, T>
    where
        I: StoredIndex,
        T: StoredType + 'a,
    {
        let mut it = self.boxed_iter();
        it.set(i);
        it
    }

    /// Returns an iterator positioned at the raw offset `i`.
    fn iter_at_<'a>(&'a self, i: usize) -> BoxedVecIterator<'a, I, T>
    where
        I: StoredIndex,
        T: StoredType + 'a,
    {
        let mut it = self.boxed_iter();
        it.set_(i);
        it
    }
}
/// An [`AnyIterableVec`] that can also be cloned behind a `Box`.
///
/// `Clone` itself is not object-safe, so `boxed_clone` provides the
/// object-safe equivalent.
pub trait CloneableAnyIterableVec<I, T>: AnyIterableVec<I, T> {
/// Returns a boxed copy of `self`.
fn boxed_clone(&self) -> Box<dyn CloneableAnyIterableVec<I, T>>;
}
/// Blanket impl: every `'static + Clone` iterable vec is boxed-cloneable.
impl<I, T, U> CloneableAnyIterableVec<I, T> for U
where
U: 'static + AnyIterableVec<I, T> + Clone,
{
fn boxed_clone(&self) -> Box<dyn CloneableAnyIterableVec<I, T>> {
Box::new(self.clone())
}
}
/// Makes `Box<dyn CloneableAnyIterableVec<..>>` itself `Clone`,
/// delegating to `boxed_clone`.
impl<I, T> Clone for Box<dyn CloneableAnyIterableVec<I, T>> {
fn clone(&self) -> Self {
self.boxed_clone()
}
}
/// Convenience alias for a boxed, cloneable, type-erased iterable vector.
pub type BoxedAnyIterableVec<I, T> = Box<dyn CloneableAnyIterableVec<I, T>>;

View File

@@ -0,0 +1,87 @@
use brk_core::{Error, Result};
use crate::i64_to_usize;
use super::{AnyIterableVec, AnyVec, StoredIndex, StoredType};
pub trait CollectableVec<I, T>: AnyVec + AnyIterableVec<I, T>
where
Self: Clone,
I: StoredIndex,
T: StoredType,
{
fn collect(&self) -> Result<Vec<T>> {
self.collect_range(None, None)
}
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<T>> {
let len = self.len();
let from = from.unwrap_or_default();
let to = to.map_or(len, |to| to.min(len));
if from >= len || from >= to {
return Ok(vec![]);
}
Ok(self
.iter_at_(from)
.take(to - from)
.map(|(_, v)| v.into_owned())
.collect::<Vec<_>>())
}
#[inline]
fn i64_to_usize_(i: i64, len: usize) -> usize {
if i >= 0 {
(i as usize).min(len)
} else {
let v = len as i64 + i;
if v < 0 { 0 } else { v as usize }
}
}
fn collect_signed_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
let from = from.map(|i| self.i64_to_usize(i));
let to = to.map(|i| self.i64_to_usize(i));
self.collect_range(from, to)
}
#[inline]
fn collect_range_serde_json(
&self,
from: Option<usize>,
to: Option<usize>,
) -> Result<Vec<serde_json::Value>> {
self.collect_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).map_err(Error::from))
.collect::<Result<Vec<_>>>()
}
}
// Blanket impl: every cloneable iterable vec is collectable for free.
impl<I, T, V> CollectableVec<I, T> for V
where
    V: AnyVec + AnyIterableVec<I, T> + Clone,
    I: StoredIndex,
    T: StoredType,
{
}
/// Object-safe collection surface (e.g. for HTTP handlers that only hold
/// a `dyn` vec).
pub trait AnyCollectableVec: AnyVec {
    /// Collects `[from, to)` serialized to JSON values.
    fn collect_range_serde_json(
        &self,
        from: Option<usize>,
        to: Option<usize>,
    ) -> Result<Vec<serde_json::Value>>;
    /// Number of entries selected by the signed range.
    ///
    /// Computed arithmetically (`saturating_sub` handles an inverted range)
    /// instead of materializing and consuming a range iterator.
    fn range_count(&self, from: Option<i64>, to: Option<i64>) -> usize {
        let len = self.len();
        let from = from.map_or(0, |i| i64_to_usize(i, len));
        let to = to.map_or(len, |i| i64_to_usize(i, len));
        to.saturating_sub(from)
    }
    /// Approximate byte weight of the selected range.
    fn range_weight(&self, from: Option<i64>, to: Option<i64>) -> usize {
        self.range_count(from, to) * self.value_type_to_size_of()
    }
}

View File

@@ -0,0 +1,207 @@
use std::{
borrow::Cow,
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
};
use brk_core::{Error, Result};
use memmap2::Mmap;
use crate::{AnyVec, HEADER_OFFSET, Header};
use super::{StoredIndex, StoredType};
/// Core read/write overlay shared by stored vecs.
///
/// Reads are resolved in layers: in-memory `pushed` tail, then `updated`
/// overrides, then `holes` (deleted slots), and finally the mmapped file.
/// Writers accumulate changes in memory until `flush`.
pub trait GenericStoredVec<I, T>: Send + Sync
where
    Self: AnyVec,
    I: StoredIndex,
    T: StoredType,
{
    const SIZE_OF_T: usize = size_of::<T>();
    /// Reads `index` from the file, panicking on error or absence.
    #[inline]
    fn unwrap_read(&self, index: I, mmap: &Mmap) -> T {
        self.read(index, mmap).unwrap().unwrap()
    }
    #[inline]
    fn read(&self, index: I, mmap: &Mmap) -> Result<Option<T>> {
        self.read_(index.to_usize()?, mmap)
    }
    /// Raw read from the backing file, ignoring the in-memory overlay.
    fn read_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>>;
    #[inline]
    fn get_or_read(&self, index: I, mmap: &Mmap) -> Result<Option<Cow<T>>> {
        self.get_or_read_(index.to_usize()?, mmap)
    }
    /// Overlay-aware read: pushed tail, then updates, then holes, then file.
    #[inline]
    fn get_or_read_(&self, index: usize, mmap: &Mmap) -> Result<Option<Cow<T>>> {
        let stored_len = self.stored_len();
        if index >= stored_len {
            // Index falls in the not-yet-flushed tail.
            let pushed = self.pushed();
            let j = index - stored_len;
            if j >= pushed.len() {
                return Ok(None);
            }
            return Ok(pushed.get(j).map(Cow::Borrowed));
        }
        // Pending in-memory update shadows the stored value.
        let updated = self.updated();
        if !updated.is_empty()
            && let Some(updated) = updated.get(&index)
        {
            return Ok(Some(Cow::Borrowed(updated)));
        }
        // Deleted slot reads as absent.
        let holes = self.holes();
        if !holes.is_empty() && holes.contains(&index) {
            return Ok(None);
        }
        Ok(self.read_(index, mmap)?.map(Cow::Owned))
    }
    /// Stored + pushed length (holes still count as slots).
    #[inline]
    fn len_(&self) -> usize {
        self.stored_len() + self.pushed_len()
    }
    fn index_to_name(&self) -> String {
        format!("{}_to_{}", I::to_string(), self.name())
    }
    /// Number of entries persisted on disk.
    fn stored_len(&self) -> usize;
    /// Not-yet-flushed appended values.
    fn pushed(&self) -> &[T];
    #[inline]
    fn pushed_len(&self) -> usize {
        self.pushed().len()
    }
    fn mut_pushed(&mut self) -> &mut Vec<T>;
    #[inline]
    fn push(&mut self, value: T) {
        self.mut_pushed().push(value)
    }
    /// Updates `index` if it exists, appends if it is exactly `len`,
    /// errors otherwise.
    #[inline]
    fn update_or_push(&mut self, index: I, value: T) -> Result<()> {
        let len = self.len();
        match len.cmp(&index.to_usize()?) {
            Ordering::Less => {
                // NOTE(review): `dbg!` debug aid left in — consider removing
                // or downgrading to a log before release.
                dbg!(index, value, len, self.header());
                Err(Error::IndexTooHigh)
            }
            Ordering::Equal => {
                self.push(value);
                Ok(())
            }
            Ordering::Greater => self.update(index, value),
        }
    }
    /// Lowest hole if any, otherwise one past the end.
    fn get_first_empty_index(&self) -> I {
        self.holes()
            .first()
            .cloned()
            .unwrap_or_else(|| self.len_())
            .into()
    }
    /// Reuses the first hole if one exists, otherwise appends; returns the
    /// index that received `value`.
    #[inline]
    fn fill_first_hole_or_push(&mut self, value: T) -> Result<I> {
        Ok(
            if let Some(hole) = self.mut_holes().pop_first().map(I::from) {
                self.update(hole, value)?;
                hole
            } else {
                self.push(value);
                I::from(self.len() - 1)
            },
        )
    }
    /// Deleted slot indices (kept sorted).
    fn holes(&self) -> &BTreeSet<usize>;
    fn mut_holes(&mut self) -> &mut BTreeSet<usize>;
    /// Reads and deletes `index` in one step.
    fn take(&mut self, index: I, mmap: &Mmap) -> Result<Option<T>> {
        let opt = self.get_or_read(index, mmap)?.map(|v| v.into_owned());
        if opt.is_some() {
            self.unchecked_delete(index);
        }
        Ok(opt)
    }
    /// Bounds-checked delete; out-of-range indices are silently ignored.
    #[inline]
    fn delete(&mut self, index: I) {
        if index.unwrap_to_usize() < self.len() {
            self.unchecked_delete(index);
        }
    }
    /// Marks `index` as a hole and drops any pending update for it.
    #[inline]
    #[doc(hidden)]
    fn unchecked_delete(&mut self, index: I) {
        let uindex = index.unwrap_to_usize();
        let updated = self.mut_updated();
        if !updated.is_empty() {
            updated.remove(&uindex);
        }
        self.mut_holes().insert(uindex);
    }
    /// Pending in-memory overrides of stored entries.
    fn updated(&self) -> &BTreeMap<usize, T>;
    fn mut_updated(&mut self) -> &mut BTreeMap<usize, T>;
    /// Overwrites `index`: in the pushed tail directly, otherwise via the
    /// `updated` overlay (un-holing the slot if it was deleted).
    #[inline]
    fn update(&mut self, index: I, value: T) -> Result<()> {
        let uindex = index.unwrap_to_usize();
        let stored_len = self.stored_len();
        if uindex >= stored_len {
            if let Some(prev) = self.mut_pushed().get_mut(uindex - stored_len) {
                *prev = value;
                return Ok(());
            } else {
                return Err(Error::IndexTooHigh);
            }
        }
        let holes = self.mut_holes();
        if !holes.is_empty() {
            holes.remove(&index.unwrap_to_usize());
        }
        self.mut_updated().insert(index.unwrap_to_usize(), value);
        Ok(())
    }
    fn header(&self) -> &Header;
    fn mut_header(&mut self) -> &mut Header;
    fn reset(&mut self) -> Result<()>;
    /// Default reset: deletes the holes file and truncates the data file
    /// back to just the header.
    ///
    /// NOTE(review): `fs` is not in this file's visible imports, and
    /// `holes_path` / `open_file` / `file_truncate_and_write_all` are not
    /// declared on this trait — looks mid-refactor; confirm this compiles.
    #[inline]
    fn reset_(&mut self) -> Result<()> {
        let holes_path = self.holes_path();
        if fs::exists(&holes_path)? {
            fs::remove_file(&holes_path)?;
        }
        let mut file = self.open_file()?;
        self.file_truncate_and_write_all(&mut file, HEADER_OFFSET as u64, &[])
    }
    #[inline]
    fn is_pushed_empty(&self) -> bool {
        self.pushed_len() == 0
    }
    #[inline]
    fn has(&self, index: I) -> Result<bool> {
        Ok(self.has_(index.to_usize()?))
    }
    /// NOTE: does not account for holes — a deleted index still reports true.
    #[inline]
    fn has_(&self, index: usize) -> bool {
        index < self.len_()
    }
    /// Persists pushed/updated/holes state to disk.
    fn flush(&mut self) -> Result<()>;
    /// Truncates the vec so `index` becomes the first out-of-range slot.
    fn truncate_if_needed(&mut self, index: I) -> Result<()>;
}

View File

@@ -0,0 +1,67 @@
use std::{fmt::Debug, ops::Add};
use brk_core::{Error, Printable, Result};
use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
/// Index types usable as keys into a stored vec: zero-copy serializable,
/// orderable, and convertible to/from `usize`.
pub trait StoredIndex
where
    Self: Debug
        + Default
        + Copy
        + Clone
        + PartialEq
        + Eq
        + PartialOrd
        + Ord
        + TryInto<usize>
        + From<usize>
        + Add<usize, Output = Self>
        + TryFromBytes
        + IntoBytes
        + Immutable
        + KnownLayout
        + Send
        + Sync
        + Printable,
{
    /// Converts to `usize`, panicking on failure.
    fn unwrap_to_usize(self) -> usize;
    /// Fallible conversion to `usize`.
    fn to_usize(self) -> Result<usize>;
    /// Previous index, or `None` at zero.
    fn decremented(self) -> Option<Self>;
}
// Blanket impl: anything satisfying the bound set is a `StoredIndex`.
impl<I> StoredIndex for I
where
    I: Debug
        + Default
        + Copy
        + Clone
        + PartialEq
        + Eq
        + PartialOrd
        + Ord
        + TryInto<usize>
        + From<usize>
        + Add<usize, Output = Self>
        + TryFromBytes
        + IntoBytes
        + Immutable
        + KnownLayout
        + Send
        + Sync
        + Printable,
{
    #[inline]
    fn unwrap_to_usize(self) -> usize {
        self.to_usize().unwrap()
    }
    #[inline]
    fn to_usize(self) -> Result<usize> {
        // Map the (opaque) TryInto error to the crate's dedicated variant.
        self.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)
    }
    #[inline]
    fn decremented(self) -> Option<Self> {
        self.unwrap_to_usize().checked_sub(1).map(Self::from)
    }
}

View File

@@ -0,0 +1,105 @@
use std::{borrow::Cow, iter::Skip};
use brk_core::Printable;
use super::{StoredIndex, StoredType};
/// Low-level cursor interface underneath every vec iterator.
pub trait BaseVecIterator: Iterator {
    /// Mutable access to the cursor position.
    fn mut_index(&mut self) -> &mut usize;
    /// Moves the cursor to raw index `i`.
    #[inline]
    fn set_(&mut self, i: usize) {
        *self.mut_index() = i;
    }
    /// Moves the cursor to `i`, then yields that element.
    #[inline]
    fn next_at(&mut self, i: usize) -> Option<Self::Item> {
        self.set_(i);
        self.next()
    }
    /// Total number of elements in the underlying vec.
    fn len(&self) -> usize;
    /// Name of the underlying vec (for diagnostics).
    fn name(&self) -> &str;
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Shadows [`Iterator::skip`] so it stays callable through this trait;
    /// delegates to the real adapter (the previous body was a panicking
    /// `todo!("")` stub despite the signature already pinning the behavior).
    fn skip(self, n: usize) -> Skip<Self>
    where
        Self: Sized,
    {
        Iterator::skip(self, n)
    }
}
/// Typed iterator over `(index, value)` pairs of a stored vec.
pub trait VecIterator<'a>: BaseVecIterator<Item = (Self::I, Cow<'a, Self::T>)> {
    type I: StoredIndex;
    type T: StoredType + 'a;
    /// Moves the cursor to typed index `i`.
    #[inline]
    fn set(&mut self, i: Self::I) {
        self.set_(i.unwrap_to_usize())
    }
    /// Random access by raw index; advances the cursor past `i`.
    #[inline]
    fn get_(&mut self, i: usize) -> Option<Cow<'a, Self::T>> {
        self.next_at(i).map(|(_, v)| v)
    }
    /// Random access by typed index; advances the cursor past `i`.
    #[inline]
    fn get(&mut self, i: Self::I) -> Option<Cow<'a, Self::T>> {
        self.get_(i.unwrap_to_usize())
    }
    #[inline]
    fn unwrap_get_inner(&mut self, i: Self::I) -> Self::T {
        self.unwrap_get_inner_(i.unwrap_to_usize())
    }
    /// Owned value at `i`; panics (with a `dbg!` dump) when absent.
    #[inline]
    fn unwrap_get_inner_(&mut self, i: usize) -> Self::T {
        self.get_(i)
            .unwrap_or_else(|| {
                dbg!(self.name(), i, self.len(), Self::I::to_string());
                panic!("unwrap_get_inner_")
            })
            .into_owned()
    }
    /// Owned value at `i`, or `None` when absent.
    #[inline]
    fn get_inner(&mut self, i: Self::I) -> Option<Self::T> {
        self.get_(i.unwrap_to_usize()).map(|v| v.into_owned())
    }
    /// O(1) `last`: jumps straight to `len - 1` instead of draining.
    fn last(mut self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        let len = self.len();
        if len == 0 {
            return None;
        }
        let i = len - 1;
        self.set_(i);
        self.next()
    }
    fn index_type_to_string(&self) -> &'static str {
        Self::I::to_string()
    }
}
// Blanket impl: any base iterator yielding `(I, Cow<T>)` is a `VecIterator`.
impl<'a, I, T, Iter> VecIterator<'a> for Iter
where
    Iter: BaseVecIterator<Item = (I, Cow<'a, T>)>,
    I: StoredIndex,
    T: StoredType + 'a,
{
    type I = I;
    type T = T;
}
/// Owned, type-erased vec iterator.
pub type BoxedVecIterator<'a, I, T> =
    Box<dyn VecIterator<'a, I = I, T = T, Item = (I, Cow<'a, T>)> + 'a>;

View File

@@ -0,0 +1,13 @@
mod any;
mod collectable;
mod generic;
mod index;
mod iterator;
mod r#type;
pub use any::*;
pub use collectable::*;
pub use generic::*;
pub use index::*;
pub use iterator::*;
pub use r#type::*;

View File

@@ -0,0 +1,33 @@
use std::fmt::Debug;
use serde::Serialize;
use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
/// Value types storable in a vec: zero-copy (de)serializable, cloneable,
/// thread-safe, and JSON-serializable.
pub trait StoredType
where
    Self: Sized
        + Debug
        + Clone
        + TryFromBytes
        + IntoBytes
        + Immutable
        + KnownLayout
        + Send
        + Sync
        + Serialize,
{
}
// Blanket impl: the trait is a pure bound alias.
impl<T> StoredType for T where
    T: Sized
        + Debug
        + Clone
        + TryFromBytes
        + IntoBytes
        + Immutable
        + KnownLayout
        + Send
        + Sync
        + Serialize
{
}

View File

@@ -0,0 +1,18 @@
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
/// On-disk descriptor of one compressed page: where it starts in the data
/// file, its compressed byte length, and how many values it decodes to.
#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)]
pub struct CompressedPageMetadata {
    // Byte offset of the page in the data file.
    pub start: u64,
    // Compressed length in bytes.
    pub bytes_len: u32,
    // Number of values stored in the page once decompressed.
    pub values_len: u32,
}
impl CompressedPageMetadata {
    pub fn new(start: u64, bytes_len: u32, values_len: u32) -> Self {
        Self {
            start,
            bytes_len,
            values_len,
        }
    }
}

View File

@@ -0,0 +1,117 @@
use std::{
fs::{self, OpenOptions},
io::{self, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use brk_core::Result;
use rayon::prelude::*;
use zerocopy::{IntoBytes, TryFromBytes};
use super::{CompressedPageMetadata, UnsafeSlice};
/// In-memory list of per-page metadata, persisted to `path`; tracks the
/// first changed entry so `write` only rewrites the dirty tail.
#[derive(Debug, Clone)]
pub struct CompressedPagesMetadata {
    vec: Vec<CompressedPageMetadata>,
    // Index of the first entry that differs from the file, if any.
    change_at: Option<usize>,
    path: PathBuf,
}
impl CompressedPagesMetadata {
    const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>();
    /// Loads all page entries from `path`; a missing file yields an empty
    /// list. NOTE(review): panics if the file length is not a multiple of
    /// the entry size — corrupted files abort instead of erroring.
    pub fn read(path: &Path) -> Result<CompressedPagesMetadata> {
        let this = Self {
            vec: fs::read(path)
                .unwrap_or_default()
                .chunks(Self::PAGE_SIZE)
                .map(|bytes| {
                    if bytes.len() != Self::PAGE_SIZE {
                        panic!()
                    }
                    CompressedPageMetadata::try_read_from_bytes(bytes).unwrap()
                })
                .collect::<Vec<_>>(),
            path: path.to_owned(),
            change_at: None,
        };
        Ok(this)
    }
    /// Persists the dirty tail: truncates the file back to the first
    /// changed entry, then appends the re-serialized remainder.
    pub fn write(&mut self) -> io::Result<()> {
        if self.change_at.is_none() {
            return Ok(());
        }
        let change_at = self.change_at.take().unwrap();
        let len = (self.vec.len() - change_at) * Self::PAGE_SIZE;
        let mut bytes: Vec<u8> = vec![0; len];
        // Serialize entries in parallel into disjoint slots of one buffer.
        let unsafe_bytes = UnsafeSlice::new(&mut bytes);
        self.vec[change_at..]
            .par_iter()
            .enumerate()
            .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::PAGE_SIZE, v.as_bytes()));
        let mut file = OpenOptions::new()
            .read(true)
            .create(true)
            .truncate(false)
            .append(true)
            .open(&self.path)?;
        // Drop everything from the first dirty entry onward, then append
        // the rewritten tail (append mode writes at the new end).
        file.set_len((change_at * Self::PAGE_SIZE) as u64)?;
        file.seek(SeekFrom::End(0))?;
        file.write_all(&bytes)?;
        Ok(())
    }
    pub fn len(&self) -> usize {
        self.vec.len()
    }
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    pub fn get(&self, page_index: usize) -> Option<&CompressedPageMetadata> {
        self.vec.get(page_index)
    }
    pub fn last(&self) -> Option<&CompressedPageMetadata> {
        self.vec.last()
    }
    pub fn pop(&mut self) -> Option<CompressedPageMetadata> {
        self.vec.pop()
    }
    /// Appends `page`; panics unless `page_index` is exactly the next slot.
    pub fn push(&mut self, page_index: usize, page: CompressedPageMetadata) {
        if page_index != self.vec.len() {
            panic!();
        }
        self.set_changed_at(page_index);
        self.vec.push(page);
    }
    // Records the smallest dirty index seen so far.
    fn set_changed_at(&mut self, page_index: usize) {
        if self.change_at.is_none_or(|pi| pi > page_index) {
            self.change_at.replace(page_index);
        }
    }
    /// Drops entries from `page_index` onward, returning the first dropped
    /// entry (if it existed).
    pub fn truncate(&mut self, page_index: usize) -> Option<CompressedPageMetadata> {
        let page = self.get(page_index).cloned();
        self.vec.truncate(page_index);
        self.set_changed_at(page_index);
        page
    }
}

View File

@@ -1,3 +1,9 @@
mod compressed;
mod raw;
mod stamped;
mod stored;
pub use compressed::*;
pub use raw::*;
pub use stamped::*;
pub use stored::*;

View File

@@ -1,8 +0,0 @@
use std::sync::Arc;
use crate::File;
// Legacy (deleted) stub: a region handle into a shared `File`.
// Superseded by the generic `RawVec<I, T>` in the raw module.
pub struct RawVec {
    region: usize,
    file: Arc<File>,
}

View File

@@ -0,0 +1,195 @@
use std::{
fs::File,
io::{self, Seek, SeekFrom},
os::unix::fs::FileExt,
sync::Arc,
};
use brk_core::{Error, Result, Version};
use parking_lot::RwLock;
use zerocopy::{FromBytes, IntoBytes};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::Stamp;
use super::Format;
const HEADER_VERSION: Version = Version::ONE;
pub const HEADER_OFFSET: usize = size_of::<HeaderInner>();
/// Shared, lock-protected vec header plus a per-handle dirty flag.
#[derive(Debug, Clone)]
pub struct Header {
    inner: Arc<RwLock<HeaderInner>>,
    // NOTE(review): `modified` is per-handle while `inner` is shared via
    // `Arc` — a mutation through one clone is not visible as "modified" on
    // another clone. Confirm that is intended.
    modified: bool,
}
impl Header {
    /// Creates a fresh header and writes it at offset 0 of `file`.
    pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result<Self> {
        let inner = HeaderInner::create_and_write(file, vec_version, format)?;
        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
            modified: false,
        })
    }
    /// Reads the header from `file` and checks version/format compatibility.
    pub fn import_and_verify(
        file: &mut File,
        vec_version: Version,
        format: Format,
    ) -> Result<Self> {
        let inner = HeaderInner::import_and_verify(file, vec_version, format)?;
        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
            modified: false,
        })
    }
    /// Sets the stamp and marks this handle dirty.
    pub fn update_stamp(&mut self, stamp: Stamp) {
        self.modified = true;
        self.inner.write().stamp = stamp;
    }
    /// Sets the computed version and marks this handle dirty.
    pub fn update_computed_version(&mut self, computed_version: Version) {
        self.modified = true;
        self.inner.write().computed_version = computed_version;
    }
    /// Whether this handle has unwritten header changes.
    pub fn modified(&self) -> bool {
        self.modified
    }
    pub fn vec_version(&self) -> Version {
        self.inner.read().vec_version
    }
    pub fn computed_version(&self) -> Version {
        self.inner.read().computed_version
    }
    pub fn stamp(&self) -> Stamp {
        self.inner.read().stamp
    }
    /// Persists the header at offset 0 and clears the dirty flag.
    pub fn write(&mut self, file: &mut File) -> io::Result<()> {
        self.inner.read().write(file)?;
        self.modified = false;
        Ok(())
    }
    pub fn inner(&self) -> &Arc<RwLock<HeaderInner>> {
        &self.inner
    }
}
/// Fixed-layout on-disk header; lives at offset 0 of every vec file.
#[repr(C)]
#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)]
struct HeaderInner {
    pub header_version: Version,
    pub vec_version: Version,
    pub computed_version: Version,
    pub stamp: Stamp,
    // Stored as a u64 "bool" so endianness corruption is detectable.
    pub compressed: ZeroCopyBool,
}
impl HeaderInner {
    /// Builds a default header for `format`, writes it at offset 0, and
    /// leaves the file cursor at the end.
    pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result<Self> {
        let header = Self {
            header_version: HEADER_VERSION,
            vec_version,
            computed_version: Version::default(),
            stamp: Stamp::default(),
            compressed: ZeroCopyBool::from(format),
        };
        header.write(file)?;
        file.seek(SeekFrom::End(0))?;
        Ok(header)
    }
    /// Writes the raw header bytes at offset 0 (cursor position unaffected).
    pub fn write(&self, file: &mut File) -> io::Result<()> {
        file.write_all_at(self.as_bytes(), 0)
    }
    /// Reads the header and rejects files with a mismatching header/vec
    /// version, a byte-swapped bool (wrong endianness), or the wrong
    /// compression mode.
    pub fn import_and_verify(
        file: &mut File,
        vec_version: Version,
        format: Format,
    ) -> Result<Self> {
        let len = file.metadata()?.len();
        if len < HEADER_OFFSET as u64 {
            return Err(Error::WrongLength);
        }
        let mut buf = [0; HEADER_OFFSET];
        file.read_exact_at(&mut buf, 0)?;
        let header = HeaderInner::read_from_bytes(&buf)?;
        if header.header_version != HEADER_VERSION {
            return Err(Error::DifferentVersion {
                found: header.header_version,
                expected: HEADER_VERSION,
            });
        }
        if header.vec_version != vec_version {
            return Err(Error::DifferentVersion {
                found: header.vec_version,
                expected: vec_version,
            });
        }
        if header.compressed.is_broken() {
            return Err(Error::WrongEndian);
        }
        if (header.compressed.is_true() && format.is_raw())
            || (header.compressed.is_false() && format.is_compressed())
        {
            return Err(Error::DifferentCompressionMode);
        }
        Ok(header)
    }
}
/// A `u64`-backed boolean with a fixed, zero-copy layout.
///
/// Using a full word means a byte-swapped (wrong-endian) file produces a
/// value greater than 1, which [`Self::is_broken`] detects.
#[derive(
    Debug,
    Clone,
    Copy,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    FromBytes,
    IntoBytes,
    Immutable,
    KnownLayout,
)]
#[repr(C)]
pub struct ZeroCopyBool(u64);
impl ZeroCopyBool {
    pub const TRUE: Self = Self(1);
    pub const FALSE: Self = Self(0);
    /// Exactly the canonical `true` encoding.
    pub fn is_true(&self) -> bool {
        self.0 == 1
    }
    /// Exactly the canonical `false` encoding.
    pub fn is_false(&self) -> bool {
        self.0 == 0
    }
    /// Any value other than 0 or 1 — indicates corruption or wrong endian.
    pub fn is_broken(&self) -> bool {
        self.0 > 1
    }
}
impl From<Format> for ZeroCopyBool {
fn from(value: Format) -> Self {
if value.is_raw() {
Self::FALSE
} else {
Self::TRUE
}
}
}

View File

@@ -0,0 +1,448 @@
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fs, io,
marker::PhantomData,
mem,
os::unix::fs::FileExt,
sync::Arc,
};
use brk_core::{Error, Result, Version};
use memmap2::Mmap;
use rayon::prelude::*;
use zerocopy::IntoBytes;
use crate::{
AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec,
File, GenericStoredVec, StoredIndex, StoredType, file::Reader,
};
use super::Format;
mod header;
mod unsafe_slice;
pub use header::*;
pub use unsafe_slice::*;
const VERSION: Version = Version::ONE;
/// Uncompressed stored vec: fixed-size `T` records in a region of a shared
/// `File`, with in-memory pushed/updated/holes overlays until flushed.
#[derive(Debug)]
pub struct RawVec<I, T> {
    // Region handle within `file` that holds this vec's bytes.
    region: usize,
    file: Arc<File>,
    header: Header,
    name: &'static str,
    // Appended but not yet flushed values.
    pushed: Vec<T>,
    // Whether a holes file currently exists on disk.
    has_stored_holes: bool,
    // Deleted slot indices.
    holes: BTreeSet<usize>,
    // Pending in-place overwrites of stored slots.
    updated: BTreeMap<usize, T>,
    phantom: PhantomData<I>,
}
impl<I, T> RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Same as import but will reset the folder under certain errors, so be careful !
    ///
    /// NOTE(review): `path` and `parent` are not in scope in this body, and
    /// `Self::holes_path_` is not defined in view — this looks mid-refactor
    /// from the old per-file layout to the shared `File`/region API.
    pub fn forced_import(file: &Arc<File>, name: &str, mut version: Version) -> Result<Self> {
        version = version + VERSION;
        let res = Self::import(file, name, version);
        match res {
            // Any incompatibility with the stored data: wipe and retry.
            Err(Error::DifferentCompressionMode)
            | Err(Error::WrongEndian)
            | Err(Error::WrongLength)
            | Err(Error::DifferentVersion { .. }) => {
                fs::remove_file(path)?;
                let holes_path = Self::holes_path_(parent, name);
                if fs::exists(&holes_path)? {
                    fs::remove_file(holes_path)?;
                }
                Self::import(file, name, version)
            }
            _ => res,
        }
    }
    /// Opens (or creates) the vec's region/file, verifying the header and
    /// loading any persisted holes.
    ///
    /// NOTE(review): mixes the new region API (`create_region_if_needed`)
    /// with unresolved old-API names (`path`, `parent`, `Self::open_file_`,
    /// `Self::folder_`), and the `file` local shadowing ends up as an
    /// `Option<File>` assigned to an `Arc<File>` field — confirm against the
    /// finished refactor.
    pub fn import(file: &Arc<File>, name: &str, version: Version) -> Result<Self> {
        let region = file.create_region_if_needed(&format!("{name}_{}", I::to_string()))?;
        let (header, file) = match Self::open_file_(&path) {
            Ok(mut file) => {
                if file.metadata()?.len() == 0 {
                    (
                        Header::create_and_write(&mut file, version, Format::Raw)?,
                        Some(file),
                    )
                } else {
                    (
                        Header::import_and_verify(&mut file, version, Format::Raw)?,
                        Some(file),
                    )
                }
            }
            Err(e) => match e.kind() {
                io::ErrorKind::NotFound => {
                    fs::create_dir_all(Self::folder_(parent, name))?;
                    let mut file = Self::open_file_(&path)?;
                    let header = Header::create_and_write(&mut file, version, Format::Raw)?;
                    (header, None)
                }
                _ => {
                    return Err(e.into());
                }
            },
        };
        // Entry count = payload bytes after the header / record size.
        let stored_len = if let Some(file) = file {
            (file.metadata()?.len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T
        } else {
            0
        };
        // Holes are persisted as native-endian usizes, concatenated.
        let holes_path = Self::holes_path_(parent, name);
        let holes = if fs::exists(&holes_path)? {
            Some(
                fs::read(&holes_path)?
                    .chunks(size_of::<usize>())
                    .map(|b| -> Result<usize> {
                        Ok(usize::from_ne_bytes(brk_core::copy_first_8bytes(b)?))
                    })
                    .collect::<Result<BTreeSet<usize>>>()?,
            )
        } else {
            None
        };
        Ok(Self {
            file,
            region,
            header,
            // Leaks one small String per import to get a `&'static str` name.
            name: Box::leak(Box::new(name.to_string())),
            pushed: vec![],
            has_stored_holes: holes.is_some(),
            holes: holes.unwrap_or_default(),
            updated: BTreeMap::new(),
            phantom: PhantomData,
        })
    }
    #[inline]
    pub fn iter(&self) -> RawVecIterator<'_, I, T> {
        self.into_iter()
    }
    /// Iterator starting at typed index `i`.
    #[inline]
    pub fn iter_at(&self, i: I) -> RawVecIterator<'_, I, T> {
        self.iter_at_(i.unwrap_to_usize())
    }
    /// Iterator starting at raw index `i`.
    #[inline]
    pub fn iter_at_(&self, i: usize) -> RawVecIterator<'_, I, T> {
        let mut iter = self.into_iter();
        iter.set_(i);
        iter
    }
    /// Writes the header bytes to the region if a handle marked it dirty.
    pub fn write_header_if_needed(&mut self) -> Result<()> {
        if self.header.modified() {
            self.file.write_all_to_region_at(
                self.region,
                self.header.inner().read().as_bytes(),
                0,
            )?;
        }
        Ok(())
    }
}
impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Reads record `index` straight from the mmap, past the header.
    #[inline]
    fn read_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
        let index = index * Self::SIZE_OF_T + HEADER_OFFSET;
        let slice = &mmap[index..(index + Self::SIZE_OF_T)];
        T::try_read_from_bytes(slice)
            .map(|v| Some(v))
            .map_err(Error::from)
    }
    fn header(&self) -> &Header {
        &self.header
    }
    fn mut_header(&mut self) -> &mut Header {
        &mut self.header
    }
    /// NOTE(review): divides the whole region length by the record size
    /// without subtracting `HEADER_OFFSET`, while `read_` does offset past
    /// the header — confirm which layout the region actually uses.
    #[inline]
    fn stored_len(&self) -> usize {
        self.file.get_region(self.region).unwrap().len() as usize / Self::SIZE_OF_T
    }
    #[inline]
    fn pushed(&self) -> &[T] {
        self.pushed.as_slice()
    }
    #[inline]
    fn mut_pushed(&mut self) -> &mut Vec<T> {
        &mut self.pushed
    }
    #[inline]
    fn holes(&self) -> &BTreeSet<usize> {
        &self.holes
    }
    #[inline]
    fn mut_holes(&mut self) -> &mut BTreeSet<usize> {
        &mut self.holes
    }
    #[inline]
    fn updated(&self) -> &BTreeMap<usize, T> {
        &self.updated
    }
    #[inline]
    fn mut_updated(&mut self) -> &mut BTreeMap<usize, T> {
        &mut self.updated
    }
    /// Persists header, appended tail, in-place updates, and holes file.
    ///
    /// NOTE(review): `write_header_if_needed` returns `Result<()>`, so
    /// `file_opt` is `()` and the later `.unwrap_or(...)` cannot compile —
    /// mid-refactor; `file_write_all` / `open_file` are also not in view.
    fn flush(&mut self) -> Result<()> {
        let file_opt = self.write_header_if_needed()?;
        let pushed_len = self.pushed_len();
        let has_new_data = pushed_len != 0;
        let has_updated_data = !self.updated.is_empty();
        let has_holes = !self.holes.is_empty();
        // Holes existed on disk but were all filled: the file must be removed.
        let had_holes = self.has_stored_holes && !has_holes;
        if !has_new_data && !has_updated_data && !has_holes && !had_holes {
            return Ok(());
        }
        if has_new_data || has_updated_data {
            let mut file = file_opt.unwrap_or(self.open_file()?);
            if has_new_data {
                // Serialize the pushed tail in parallel into one buffer,
                // then append it with a single write.
                let bytes = {
                    let mut bytes: Vec<u8> = vec![0; pushed_len * Self::SIZE_OF_T];
                    let unsafe_bytes = UnsafeSlice::new(&mut bytes);
                    mem::take(&mut self.pushed)
                        .into_par_iter()
                        .enumerate()
                        .for_each(|(i, v)| {
                            unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())
                        });
                    bytes
                };
                self.file_write_all(&mut file, &bytes)?;
            }
            if has_updated_data {
                // In-place overwrites: one positioned write per updated slot.
                mem::take(&mut self.updated)
                    .into_iter()
                    .try_for_each(|(i, v)| -> Result<()> {
                        file.write_all_at(
                            v.as_bytes(),
                            ((i * Self::SIZE_OF_T) + HEADER_OFFSET) as u64,
                        )?;
                        Ok(())
                    })?;
            }
        }
        if has_holes || had_holes {
            let holes_path = self.holes_path();
            if has_holes {
                self.has_stored_holes = true;
                fs::write(
                    &holes_path,
                    self.holes
                        .iter()
                        .flat_map(|i| i.to_ne_bytes())
                        .collect::<Vec<_>>(),
                )?;
            } else if had_holes {
                self.has_stored_holes = false;
                let _ = fs::remove_file(&holes_path);
            }
        }
        Ok(())
    }
    /// Truncates the region so `index` becomes the first missing slot;
    /// `index == 0` resets the whole vec.
    fn truncate_if_needed(&mut self, index: I) -> Result<()> {
        let index = index.to_usize()?;
        if index >= self.stored_len() {
            return Ok(());
        }
        if index == 0 {
            self.reset()?;
            return Ok(());
        }
        let from = index * Self::SIZE_OF_T + HEADER_OFFSET;
        self.file.truncate_region(self.region, from as u64)
    }
    /// NOTE(review): `set_stored_len` is not defined anywhere in view.
    fn reset(&mut self) -> Result<()> {
        self.set_stored_len(0);
        self.reset_()
    }
}
// Metadata surface: straight delegations to the header / type parameters.
impl<I, T> AnyVec for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    #[inline]
    fn version(&self) -> Version {
        self.header.vec_version()
    }
    #[inline]
    fn name(&self) -> &str {
        self.name
    }
    /// Stored + pushed length.
    #[inline]
    fn len(&self) -> usize {
        self.len_()
    }
    #[inline]
    fn index_type_to_string(&self) -> &'static str {
        I::to_string()
    }
    #[inline]
    fn value_type_to_size_of(&self) -> usize {
        size_of::<T>()
    }
}
// Clones share the file/region/header but deliberately start with empty
// in-flight state (pushed/updated/holes); `has_stored_holes` is also reset.
// NOTE(review): a clone taken while holes exist on disk will not know about
// them — confirm clones are only used for read-side iteration.
impl<I, T> Clone for RawVec<I, T> {
    fn clone(&self) -> Self {
        Self {
            file: self.file.clone(),
            region: self.region,
            header: self.header.clone(),
            name: self.name,
            pushed: vec![],
            updated: BTreeMap::new(),
            has_stored_holes: false,
            holes: BTreeSet::new(),
            phantom: PhantomData,
        }
    }
}
/// Cursor over a [`RawVec`], holding a reader for the vec's region.
#[derive(Debug)]
pub struct RawVecIterator<'a, I, T> {
    vec: &'a RawVec<I, T>,
    reader: Reader<'a>,
    // Next raw index to yield.
    index: usize,
}
impl<I, T> BaseVecIterator for RawVecIterator<'_, I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Exposes the cursor so `set_`/`next_at` can reposition it.
    #[inline]
    fn mut_index(&mut self) -> &mut usize {
        &mut self.index
    }
    #[inline]
    fn len(&self) -> usize {
        self.vec.len()
    }
    #[inline]
    fn name(&self) -> &str {
        self.vec.name()
    }
}
impl<'a, I, T> Iterator for RawVecIterator<'a, I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type Item = (I, Cow<'a, T>);
    // Yields `(index, value)` and advances only on a hit, so `None` is
    // repeatable at the end.
    // NOTE(review): `self.mmap` does not exist — the struct's field is
    // `reader` (mid-refactor from mmap to the region Reader API).
    fn next(&mut self) -> Option<Self::Item> {
        let index = self.index;
        let opt = self
            .vec
            .get_or_read_(index, &self.mmap)
            .unwrap()
            .map(|v| (I::from(index), v));
        if opt.is_some() {
            self.index += 1;
        }
        opt
    }
}
impl<'a, I, T> IntoIterator for &'a RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type Item = (I, Cow<'a, T>);
    type IntoIter = RawVecIterator<'a, I, T>;
    /// Builds an iterator at index 0 with a fresh reader over the region.
    fn into_iter(self) -> Self::IntoIter {
        RawVecIterator {
            vec: self,
            reader: self.file.read_region(self.region).unwrap(),
            index: 0,
        }
    }
}
impl<I, T> AnyIterableVec<I, T> for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Type-erased iterator over this vec.
    fn boxed_iter<'a>(&'a self) -> BoxedVecIterator<'a, I, T>
    where
        T: 'a,
    {
        Box::new(self.into_iter())
    }
}
impl<I, T> AnyCollectableVec for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    // Bridges the object-safe trait to the generic blanket implementation.
    fn collect_range_serde_json(
        &self,
        from: Option<usize>,
        to: Option<usize>,
    ) -> Result<Vec<serde_json::Value>> {
        CollectableVec::collect_range_serde_json(self, from, to)
    }
}

View File

@@ -0,0 +1,35 @@
use std::cell::UnsafeCell;
/// A shared view over a mutable slice that permits concurrent writes to
/// DISJOINT indices (used for parallel serialization into one buffer).
#[derive(Copy, Clone)]
pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell<T>]);
// SAFETY: sound only because callers must write disjoint indices (see the
// per-method contracts); `T: Send + Sync` lets values cross threads.
unsafe impl<T: Send + Sync> Send for UnsafeSlice<'_, T> {}
unsafe impl<T: Send + Sync> Sync for UnsafeSlice<'_, T> {}
impl<'a, T> UnsafeSlice<'a, T> {
    /// Wraps `slice`; the `&mut` borrow guarantees exclusive access for `'a`.
    pub fn new(slice: &'a mut [T]) -> Self {
        // `UnsafeCell<T>` is layout-compatible with `T`, so reinterpreting
        // the slice is sound.
        let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
        Self(unsafe { &*ptr })
    }
    /// SAFETY: It is UB if two threads write to the same index without
    /// synchronization.
    pub fn write(&self, i: usize, value: T) {
        unsafe {
            *self.0[i].get() = value;
        }
    }
    /// SAFETY: It is UB to dereference the returned pointer while any other
    /// thread reads or writes index `i` without synchronization.
    pub fn get(&self, i: usize) -> *mut T {
        self.0[i].get()
    }
    /// Copies `slice` into `start..start + slice.len()`. SAFETY: callers
    /// must ensure no other thread touches that index range concurrently.
    pub fn copy_slice(&self, start: usize, slice: &[T])
    where
        T: Copy,
    {
        slice.iter().enumerate().for_each(|(i, v)| {
            self.write(start + i, *v);
        });
    }
}

View File

@@ -1,3 +0,0 @@
// Legacy (deleted) stubs — superseded by the `stamped` module's versions.
pub struct Stamp(u64);
pub struct StampedVec;

View File

@@ -0,0 +1,5 @@
mod stamp;
pub use stamp::*;
// NOTE(review): empty marker type — the stamped vec implementation is
// presumably still to come in a later commit.
pub struct StampedVec;

View File

@@ -0,0 +1,4 @@
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
/// Zero-copy `u64` stamp persisted in each vec header (see `Header`).
#[derive(Debug, Default, Clone, Copy, FromBytes, IntoBytes, Immutable, KnownLayout)]
pub struct Stamp(u64);

View File

@@ -0,0 +1,63 @@
use std::{fs, io, path::Path};
use brk_core::{Error, Result};
use serde::{Deserialize, Serialize};
/// On-disk storage format of a vec; persisted as a single marker byte.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Format {
    Compressed,
    #[default]
    Raw,
}
impl Format {
pub fn write(&self, path: &Path) -> Result<(), io::Error> {
fs::write(path, self.as_bytes())
}
pub fn is_raw(&self) -> bool {
*self == Self::Raw
}
pub fn is_compressed(&self) -> bool {
*self == Self::Compressed
}
fn as_bytes(&self) -> Vec<u8> {
if self.is_compressed() {
vec![1]
} else {
vec![0]
}
}
fn from_bytes(bytes: &[u8]) -> Self {
if bytes.len() != 1 {
panic!();
}
if bytes[0] == 1 {
Self::Compressed
} else if bytes[0] == 0 {
Self::Raw
} else {
panic!()
}
}
pub fn validate(&self, path: &Path) -> Result<()> {
if let Ok(prev_compressed) = Format::try_from(path) {
if prev_compressed != *self {
return Err(Error::DifferentCompressionMode);
}
}
Ok(())
}
}
// Loads the persisted format marker; fails if the file cannot be read and
// panics (via `from_bytes`) if its contents are not a valid marker byte.
impl TryFrom<&Path> for Format {
    type Error = Error;
    fn try_from(value: &Path) -> Result<Self, Self::Error> {
        Ok(Self::from_bytes(&fs::read(value)?))
    }
}

View File

@@ -0,0 +1,3 @@
mod format;
pub use format::*;