vecs: part 6

This commit is contained in:
nym21
2025-07-23 09:17:26 +02:00
parent 3ac9c2d95e
commit c4fc24c513
5 changed files with 80 additions and 33 deletions

4
Cargo.lock generated
View File

@@ -1057,7 +1057,6 @@ dependencies = [
name = "brk_vecs" name = "brk_vecs"
version = "0.0.81" version = "0.0.81"
dependencies = [ dependencies = [
"arc-swap",
"bincode", "bincode",
"brk_core", "brk_core",
"brk_exit", "brk_exit",
@@ -1066,9 +1065,6 @@ dependencies = [
"memmap2", "memmap2",
"parking_lot", "parking_lot",
"rayon", "rayon",
"serde",
"serde_derive",
"serde_json",
"zerocopy", "zerocopy",
"zerocopy-derive", "zerocopy-derive",
"zstd", "zstd",

View File

@@ -10,7 +10,6 @@ homepage.workspace = true
repository.workspace = true repository.workspace = true
[dependencies] [dependencies]
arc-swap = { workspace = true }
bincode = { workspace = true } bincode = { workspace = true }
brk_core = { workspace = true } brk_core = { workspace = true }
brk_exit = { workspace = true } brk_exit = { workspace = true }
@@ -19,9 +18,6 @@ log = { workspace = true }
memmap2 = "0.9.7" memmap2 = "0.9.7"
parking_lot = {workspace = true} parking_lot = {workspace = true}
rayon = { workspace = true } rayon = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true } zerocopy = { workspace = true }
zerocopy-derive = { workspace = true } zerocopy-derive = { workspace = true }
zstd = "0.13.3" zstd = "0.13.3"

View File

@@ -1,12 +1,45 @@
use std::path::Path; use std::{fs, path::Path};
use brk_core::Result; use brk_core::Result;
use brk_vecs::{File, PAGE_SIZE}; use brk_vecs::{File, PAGE_SIZE};
fn main() -> Result<()> { fn main() -> Result<()> {
let _ = fs::remove_dir_all("vecs");
let file = File::open(Path::new("vecs"))?; let file = File::open(Path::new("vecs"))?;
file.set_min_len(PAGE_SIZE * 1_000_000)?; let region1_i = file.create_region_if_needed("region1")?;
dbg!(region1_i);
assert!(file.get_region(region1_i).unwrap().read().len() == 0);
file.write_all(region1_i, &[0, 1, 2, 3, 4])?;
{
let opt = file.get_region(region1_i);
let region = opt.as_ref().unwrap().read();
assert!(region.start() == 0 && region.len() == 5 && region.reserved() == PAGE_SIZE);
}
assert!(file.mmap.read()[0..10] == [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]);
file.write_all(region1_i, &[5, 6, 7, 8, 9])?;
assert!(file.mmap.read()[0..10] == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
file.write_all_at(region1_i, &[1, 2], 0)?;
assert!(file.mmap.read()[0..10] == [1, 2, 2, 3, 4, 5, 6, 7, 8, 9]);
{
let opt = file.get_region(region1_i);
let region = opt.as_ref().unwrap().read();
dbg!(&region);
assert!(region.start() == 0 && region.len() == 10 && region.reserved() == PAGE_SIZE);
}
// file.set_min_len(PAGE_SIZE * 1_000_000)?;
Ok(()) Ok(())
} }

View File

@@ -24,10 +24,11 @@ pub const PAGE_SIZE: u64 = 4096;
pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1; pub const PAGE_SIZE_MINUS_1: u64 = PAGE_SIZE - 1;
pub struct File { pub struct File {
regions: RwLock<Regions>, // TODO: Remove pub
layout: RwLock<Layout>, pub regions: RwLock<Regions>,
file: RwLock<fs::File>, pub layout: RwLock<Layout>,
mmap: RwLock<MmapMut>, pub file: RwLock<fs::File>,
pub mmap: RwLock<MmapMut>,
} }
impl File { impl File {
@@ -75,8 +76,8 @@ impl File {
self.set_min_len(regions as u64 * PAGE_SIZE) self.set_min_len(regions as u64 * PAGE_SIZE)
} }
pub fn get_or_create(&self, id: String) -> Result<usize> { pub fn create_region_if_needed(&self, id: &str) -> Result<usize> {
if let Some(index) = self.regions.read().get_region_index_from_id(id.clone()) { if let Some(index) = self.regions.read().get_region_index_from_id(id.to_owned()) {
return Ok(index); return Ok(index);
} }
let mut regions = self.regions.write(); let mut regions = self.regions.write();
@@ -98,13 +99,17 @@ impl File {
start start
}; };
let index = regions.create_region(id, start)?; let index = regions.create_region(id.to_owned(), start)?;
layout.insert_region(start, index); layout.insert_region(start, index);
Ok(index) Ok(index)
} }
pub fn get_region(&self, index: usize) -> Option<Arc<RwLock<Region>>> {
self.regions.read().get_region_from_index(index)
}
pub fn read<'a>(&'a self, index: usize) -> Result<Reader<'a>> { pub fn read<'a>(&'a self, index: usize) -> Result<Reader<'a>> {
let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read(); let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
let region: RwLockReadGuard<'static, Region> = unsafe { let region: RwLockReadGuard<'static, Region> = unsafe {
@@ -120,16 +125,16 @@ impl File {
} }
#[inline] #[inline]
pub fn write_all(&mut self, region: usize, data: &[u8]) -> Result<()> { pub fn write_all(&self, region: usize, data: &[u8]) -> Result<()> {
self.write_all_at_(region, data, None) self.write_all_at_(region, data, None)
} }
#[inline] #[inline]
pub fn write_all_at(&mut self, region: usize, data: &[u8], at: u64) -> Result<()> { pub fn write_all_at(&self, region: usize, data: &[u8], at: u64) -> Result<()> {
self.write_all_at_(region, data, Some(at)) self.write_all_at_(region, data, Some(at))
} }
fn write_all_at_(&mut self, region_index: usize, data: &[u8], at: Option<u64>) -> Result<()> { fn write_all_at_(&self, region_index: usize, data: &[u8], at: Option<u64>) -> Result<()> {
let Some(region) = self.regions.read().get_region_from_index(region_index) else { let Some(region) = self.regions.read().get_region_from_index(region_index) else {
return Err(Error::Str("Unknown region")); return Err(Error::Str("Unknown region"));
}; };
@@ -138,21 +143,27 @@ impl File {
let reserved = region_lock.reserved(); let reserved = region_lock.reserved();
let left = region_lock.left(); let left = region_lock.left();
let len = region_lock.len(); let len = region_lock.len();
let end = start + len;
let data_len = data.len() as u64; let data_len = data.len() as u64;
drop(region_lock); drop(region_lock);
let new_left = at.map_or_else(|| left, |at| reserved - (at - start));
let new_len = reserved - new_left;
let write_start = at.unwrap_or(start + len); let write_start = at.unwrap_or(start + len);
if at.is_some_and(|at| at < start || at >= start + reserved) {
return Err(Error::Str("Invalid at parameter"));
}
// Write to reserved space if possible // Write to reserved space if possible
if new_left >= data_len { let at_left = at.map_or_else(|| left, |at| reserved - at);
if at_left >= data_len {
let len = reserved - at_left.min(left) + data_len;
dbg!(write_start);
self.write(write_start, data); self.write(write_start, data);
let regions = self.regions.read(); let regions = self.regions.read();
let mut region_lock = region.write(); let mut region_lock = region.write();
region_lock.set_len(new_len); dbg!(len);
region_lock.set_len(len);
regions.write_to_mmap(&region_lock, region_index); regions.write_to_mmap(&region_lock, region_index);
return Ok(()); return Ok(());
} }
@@ -200,7 +211,10 @@ impl File {
// Find hole big enough to move the region // Find hole big enough to move the region
if let Some(hole_start) = layout_lock.find_smallest_adequate_hole(new_reserved) { if let Some(hole_start) = layout_lock.find_smallest_adequate_hole(new_reserved) {
self.write(hole_start, &self.mmap.read()[start as usize..end as usize]); self.write(
hole_start,
&self.mmap.read()[start as usize..(start + len) as usize],
);
self.write(hole_start + len, data); self.write(hole_start + len, data);
let regions = self.regions.read(); let regions = self.regions.read();
@@ -233,7 +247,10 @@ impl File {
.reserved(); .reserved();
self.set_min_len(new_start + new_reserved)?; self.set_min_len(new_start + new_reserved)?;
self.write(new_start, &self.mmap.read()[start as usize..end as usize]); self.write(
new_start,
&self.mmap.read()[start as usize..(start + len) as usize],
);
self.write(new_start + len, data); self.write(new_start + len, data);
region_lock.set_start(new_start); region_lock.set_start(new_start);
@@ -271,7 +288,7 @@ impl File {
let len = region_.len(); let len = region_.len();
let reserved = region_.reserved(); let reserved = region_.reserved();
if from <= start { if from < start {
return Err(Error::Str("Truncating too much")); return Err(Error::Str("Truncating too much"));
} else if from >= len { } else if from >= len {
return Err(Error::Str("Not truncating enough")); return Err(Error::Str("Not truncating enough"));

View File

@@ -30,6 +30,8 @@ impl Regions {
pub fn open(path: &Path) -> Result<Self> { pub fn open(path: &Path) -> Result<Self> {
let path = path.join("regions"); let path = path.join("regions");
fs::create_dir_all(&path)?;
let id_to_index_file = OpenOptions::new() let id_to_index_file = OpenOptions::new()
.read(true) .read(true)
.create(true) .create(true)
@@ -37,9 +39,12 @@ impl Regions {
.truncate(false) .truncate(false)
.open(path.join("id_to_index"))?; .open(path.join("id_to_index"))?;
let mut reader = BufReader::new(&id_to_index_file); let mut id_to_index: HashMap<String, usize> = HashMap::new();
let id_to_index: HashMap<String, usize> =
decode_from_std_read(&mut reader, bincode::config::standard())?; if id_to_index_file.metadata()?.len() > 0 {
let mut reader = BufReader::new(&id_to_index_file);
id_to_index = decode_from_std_read(&mut reader, bincode::config::standard())?;
}
let index_to_region_file = OpenOptions::new() let index_to_region_file = OpenOptions::new()
.read(true) .read(true)
@@ -96,7 +101,7 @@ impl Regions {
.map(|(index, _)| index) .map(|(index, _)| index)
.unwrap_or_else(|| self.index_to_region.len()); .unwrap_or_else(|| self.index_to_region.len());
let region = Region::new(start, PAGE_SIZE, PAGE_SIZE); let region = Region::new(start, 0, PAGE_SIZE);
self.index_to_region self.index_to_region
.push(Some(Arc::new(RwLock::new(region.clone())))); .push(Some(Arc::new(RwLock::new(region.clone()))));