vecs: part 4

This commit is contained in:
nym21
2025-07-22 17:36:34 +02:00
parent 10ae1911c3
commit e5ab4dafc0
6 changed files with 258 additions and 787 deletions

View File

@@ -87,12 +87,12 @@ impl PriceToAmount {
self.height = Some(height);
height.write(&self.path_height())?;
let config = config::standard();
let file = File::create(self.path_state()).inspect_err(|_| {
dbg!(self.path_state());
})?;
let mut writer = BufWriter::new(file);
encode_into_std_write(&self.state, &mut writer, config)?;
encode_into_std_write(&self.state, &mut writer, config::standard())?;
Ok(())
}

View File

@@ -1,641 +0,0 @@
use std::{path::Path, thread};
use brk_core::{
AddressData, ByAddressType, EmptyAddressData, Height, OutputType, P2AAddressIndex,
P2PK33AddressIndex, P2PK65AddressIndex, P2PKHAddressIndex, P2SHAddressIndex, P2TRAddressIndex,
P2WPKHAddressIndex, P2WSHAddressIndex, Result, TypeIndex, Version,
};
use brk_store::{AnyStore, Store};
use fjall::{PersistMode, TransactionalKeyspace};
use log::info;
use crate::stateful::{AddressTypeToTypeIndexTree, WithAddressDataSource};
const VERSION: Version = Version::ZERO;
/// On-disk stores of per-address data, two per supported address type:
/// one keyed store for addresses that currently hold funds
/// (`AddressData`) and one for addresses that have been emptied
/// (`EmptyAddressData`).
#[derive(Clone)]
pub struct Stores {
    // Kept so `commit`/`reset` can persist the whole keyspace after the
    // individual stores have been updated.
    keyspace: TransactionalKeyspace,
    pub p2aaddressindex_to_addressdata: Store<P2AAddressIndex, AddressData>,
    pub p2aaddressindex_to_emptyaddressdata: Store<P2AAddressIndex, EmptyAddressData>,
    pub p2pk33addressindex_to_addressdata: Store<P2PK33AddressIndex, AddressData>,
    pub p2pk33addressindex_to_emptyaddressdata: Store<P2PK33AddressIndex, EmptyAddressData>,
    pub p2pk65addressindex_to_addressdata: Store<P2PK65AddressIndex, AddressData>,
    pub p2pk65addressindex_to_emptyaddressdata: Store<P2PK65AddressIndex, EmptyAddressData>,
    pub p2pkhaddressindex_to_addressdata: Store<P2PKHAddressIndex, AddressData>,
    pub p2pkhaddressindex_to_emptyaddressdata: Store<P2PKHAddressIndex, EmptyAddressData>,
    pub p2shaddressindex_to_addressdata: Store<P2SHAddressIndex, AddressData>,
    pub p2shaddressindex_to_emptyaddressdata: Store<P2SHAddressIndex, EmptyAddressData>,
    pub p2traddressindex_to_addressdata: Store<P2TRAddressIndex, AddressData>,
    pub p2traddressindex_to_emptyaddressdata: Store<P2TRAddressIndex, EmptyAddressData>,
    pub p2wpkhaddressindex_to_addressdata: Store<P2WPKHAddressIndex, AddressData>,
    pub p2wpkhaddressindex_to_emptyaddressdata: Store<P2WPKHAddressIndex, EmptyAddressData>,
    pub p2wshaddressindex_to_addressdata: Store<P2WSHAddressIndex, AddressData>,
    pub p2wshaddressindex_to_emptyaddressdata: Store<P2WSHAddressIndex, EmptyAddressData>,
}
impl Stores {
pub fn import(
path: &Path,
version: Version,
keyspace: &TransactionalKeyspace,
) -> color_eyre::Result<Self> {
let (
(p2aaddressindex_to_addressdata, p2aaddressindex_to_emptyaddressdata),
(p2pk33addressindex_to_addressdata, p2pk33addressindex_to_emptyaddressdata),
(p2pk65addressindex_to_addressdata, p2pk65addressindex_to_emptyaddressdata),
(p2pkhaddressindex_to_addressdata, p2pkhaddressindex_to_emptyaddressdata),
(p2shaddressindex_to_addressdata, p2shaddressindex_to_emptyaddressdata),
(p2traddressindex_to_addressdata, p2traddressindex_to_emptyaddressdata),
(p2wpkhaddressindex_to_addressdata, p2wpkhaddressindex_to_emptyaddressdata),
(p2wshaddressindex_to_addressdata, p2wshaddressindex_to_emptyaddressdata),
) = thread::scope(|scope| {
let p2a = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2aaddressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2aaddressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2pk33 = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2pk33addressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2pk33addressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2pk65 = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2pk65addressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2pk65addressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2pkh = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2pkhaddressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2pkhaddressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2sh = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2shaddressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2shaddressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2tr = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2traddressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2traddressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2wpkh = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2wpkhaddressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2wpkhaddressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
let p2wsh = scope.spawn(|| {
(
Store::import(
keyspace,
path,
"p2wshaddressindex_to_addressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
Store::import(
keyspace,
path,
"p2wshaddressindex_to_emptyaddressdata",
version + VERSION + Version::ZERO,
None,
)
.unwrap(),
)
});
(
p2a.join().unwrap(),
p2pk33.join().unwrap(),
p2pk65.join().unwrap(),
p2pkh.join().unwrap(),
p2sh.join().unwrap(),
p2tr.join().unwrap(),
p2wpkh.join().unwrap(),
p2wsh.join().unwrap(),
)
});
Ok(Self {
keyspace: keyspace.clone(),
p2aaddressindex_to_addressdata,
p2aaddressindex_to_emptyaddressdata,
p2pk33addressindex_to_addressdata,
p2pk33addressindex_to_emptyaddressdata,
p2pk65addressindex_to_addressdata,
p2pk65addressindex_to_emptyaddressdata,
p2pkhaddressindex_to_addressdata,
p2pkhaddressindex_to_emptyaddressdata,
p2shaddressindex_to_addressdata,
p2shaddressindex_to_emptyaddressdata,
p2traddressindex_to_addressdata,
p2traddressindex_to_emptyaddressdata,
p2wpkhaddressindex_to_addressdata,
p2wpkhaddressindex_to_emptyaddressdata,
p2wshaddressindex_to_addressdata,
p2wshaddressindex_to_emptyaddressdata,
})
}
/// Height processing should resume from: each store reports the height
/// after the one it last persisted (or the default height when it has
/// none yet), and the minimum across all sixteen stores wins.
pub fn starting_height(&self) -> Height {
    let mut starting = None::<Height>;
    for store in self.as_slice() {
        let candidate = match store.height() {
            Some(height) => Height::incremented(height),
            None => Height::default(),
        };
        starting = Some(match starting {
            Some(current) if current <= candidate => current,
            _ => candidate,
        });
    }
    // The slice always holds 16 stores, so a minimum always exists.
    starting.unwrap()
}
/// Clears every store, then forces the keyspace to sync so the resets
/// actually reach disk.
pub fn reset(&mut self) -> Result<()> {
    info!("Resetting stores...");
    info!("> If it gets stuck here, stop the program and start it again");
    for store in self.as_mut_slice() {
        store.reset()?;
    }
    self.keyspace
        .persist(PersistMode::SyncAll)
        .map_err(Into::into)
}
/// Looks up the `AddressData` stored for `type_index` in the store that
/// matches `address_type`.
///
/// # Panics
///
/// Panics (`unreachable!`) for output types that have no address-data
/// store — callers are expected to pass addressable output types only.
pub fn get_addressdata(
    &self,
    address_type: OutputType,
    type_index: TypeIndex,
) -> Result<Option<AddressData>> {
    Ok(match address_type {
        // `get` presumably hands back a Cow-style guard over the store's
        // bytes; `into_owned` detaches it so the value can outlive the
        // lookup. TODO confirm against `Store::get`'s signature.
        OutputType::P2A => self
            .p2aaddressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2PK33 => self
            .p2pk33addressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2PK65 => self
            .p2pk65addressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2PKH => self
            .p2pkhaddressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2SH => self
            .p2shaddressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2TR => self
            .p2traddressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2WPKH => self
            .p2wpkhaddressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2WSH => self
            .p2wshaddressindex_to_addressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        // Non-addressable output types (e.g. op_return-like variants) are
        // a caller bug here.
        _ => unreachable!(),
    })
}
/// Looks up the `EmptyAddressData` stored for `type_index` in the store
/// that matches `address_type`. Mirrors [`Self::get_addressdata`] but
/// for addresses whose balance dropped to zero.
///
/// # Panics
///
/// Panics (`unreachable!`) for output types that have no store.
pub fn get_emptyaddressdata(
    &self,
    address_type: OutputType,
    type_index: TypeIndex,
) -> Result<Option<EmptyAddressData>> {
    Ok(match address_type {
        OutputType::P2A => self
            .p2aaddressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2PK33 => self
            .p2pk33addressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2PK65 => self
            .p2pk65addressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2PKH => self
            .p2pkhaddressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2SH => self
            .p2shaddressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2TR => self
            .p2traddressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2WPKH => self
            .p2wpkhaddressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        OutputType::P2WSH => self
            .p2wshaddressindex_to_emptyaddressdata
            .get(&type_index.into())?
            .map(|c| c.into_owned()),
        _ => unreachable!(),
    })
}
/// Persists one height's worth of address-data changes to every store,
/// then syncs the keyspace to disk.
///
/// For each address type, entries that migrated from the *other* store
/// (funded ↔ empty) are deleted from their origin store and upserted
/// into their new one. The sixteen commits run on scoped threads: first
/// all live-address stores, then all empty-address stores, and finally
/// a single `persist(SyncAll)` — the same ordering as before.
pub fn commit(
    &mut self,
    height: Height,
    addresstype_to_typeindex_to_addressdata: AddressTypeToTypeIndexTree<
        WithAddressDataSource<AddressData>,
    >,
    addresstype_to_typeindex_to_emptyaddressdata: AddressTypeToTypeIndexTree<
        WithAddressDataSource<EmptyAddressData>,
    >,
) -> Result<()> {
    let ByAddressType {
        p2pk65,
        p2pk33,
        p2pkh,
        p2sh,
        p2wpkh,
        p2wsh,
        p2tr,
        p2a,
    } = addresstype_to_typeindex_to_addressdata.unwrap();
    let ByAddressType {
        p2pk65: empty_p2pk65,
        p2pk33: empty_p2pk33,
        p2pkh: empty_p2pkh,
        p2sh: empty_p2sh,
        p2wpkh: empty_p2wpkh,
        p2wsh: empty_p2wsh,
        p2tr: empty_p2tr,
        p2a: empty_p2a,
    } = addresstype_to_typeindex_to_emptyaddressdata.unwrap();
    // One store commit: entries of `$removed` whose data source matches
    // `$pred` become deletions, entries of `$upserted` become upserts.
    // Writing `self.$store` keeps the 2021-edition disjoint closure
    // captures that let all commits in a scope borrow different fields of
    // `self` mutably at the same time. The sixteen spawn bodies were
    // identical apart from these four names, hence the macro.
    macro_rules! commit_store {
        ($scope:expr, $store:ident, $removed:ident, $pred:ident, $upserted:ident) => {
            $scope.spawn(|| {
                self.$store.commit_(
                    height,
                    $removed
                        .iter()
                        .filter(|(_, addressdata)| addressdata.$pred())
                        .map(|(typeindex, _)| (*typeindex).into()),
                    $upserted.iter().map(|(typeindex, addressdata)| {
                        ((*typeindex).into(), addressdata.deref().clone())
                    }),
                )
            })
        };
    }
    // Live stores: drop entries that were promoted out of the empty maps,
    // upsert the current funded address data.
    thread::scope(|s| {
        commit_store!(s, p2aaddressindex_to_addressdata, empty_p2a, is_from_addressdata, p2a);
        commit_store!(s, p2pk33addressindex_to_addressdata, empty_p2pk33, is_from_addressdata, p2pk33);
        commit_store!(s, p2pk65addressindex_to_addressdata, empty_p2pk65, is_from_addressdata, p2pk65);
        commit_store!(s, p2pkhaddressindex_to_addressdata, empty_p2pkh, is_from_addressdata, p2pkh);
        commit_store!(s, p2shaddressindex_to_addressdata, empty_p2sh, is_from_addressdata, p2sh);
        commit_store!(s, p2traddressindex_to_addressdata, empty_p2tr, is_from_addressdata, p2tr);
        commit_store!(s, p2wpkhaddressindex_to_addressdata, empty_p2wpkh, is_from_addressdata, p2wpkh);
        commit_store!(s, p2wshaddressindex_to_addressdata, empty_p2wsh, is_from_addressdata, p2wsh);
    });
    // Empty stores: the mirror image of the scope above.
    thread::scope(|s| {
        commit_store!(s, p2aaddressindex_to_emptyaddressdata, p2a, is_from_emptyaddressdata, empty_p2a);
        commit_store!(s, p2pk33addressindex_to_emptyaddressdata, p2pk33, is_from_emptyaddressdata, empty_p2pk33);
        commit_store!(s, p2pk65addressindex_to_emptyaddressdata, p2pk65, is_from_emptyaddressdata, empty_p2pk65);
        commit_store!(s, p2pkhaddressindex_to_emptyaddressdata, p2pkh, is_from_emptyaddressdata, empty_p2pkh);
        commit_store!(s, p2shaddressindex_to_emptyaddressdata, p2sh, is_from_emptyaddressdata, empty_p2sh);
        commit_store!(s, p2traddressindex_to_emptyaddressdata, p2tr, is_from_emptyaddressdata, empty_p2tr);
        commit_store!(s, p2wpkhaddressindex_to_emptyaddressdata, p2wpkh, is_from_emptyaddressdata, empty_p2wpkh);
        commit_store!(s, p2wshaddressindex_to_emptyaddressdata, p2wsh, is_from_emptyaddressdata, empty_p2wsh);
    });
    self.keyspace
        .persist(PersistMode::SyncAll)
        .map_err(|e| e.into())
}
/// All sixteen stores as shared trait objects, in a fixed order:
/// live store followed by empty store, per address type.
pub fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 16] {
    [
        &self.p2aaddressindex_to_addressdata,
        &self.p2aaddressindex_to_emptyaddressdata,
        &self.p2pk33addressindex_to_addressdata,
        &self.p2pk33addressindex_to_emptyaddressdata,
        &self.p2pk65addressindex_to_addressdata,
        &self.p2pk65addressindex_to_emptyaddressdata,
        &self.p2pkhaddressindex_to_addressdata,
        &self.p2pkhaddressindex_to_emptyaddressdata,
        &self.p2shaddressindex_to_addressdata,
        &self.p2shaddressindex_to_emptyaddressdata,
        &self.p2traddressindex_to_addressdata,
        &self.p2traddressindex_to_emptyaddressdata,
        &self.p2wpkhaddressindex_to_addressdata,
        &self.p2wpkhaddressindex_to_emptyaddressdata,
        &self.p2wshaddressindex_to_addressdata,
        &self.p2wshaddressindex_to_emptyaddressdata,
    ]
}
/// Mutable counterpart of [`Self::as_slice`]; must keep the same order.
fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 16] {
    [
        &mut self.p2aaddressindex_to_addressdata,
        &mut self.p2aaddressindex_to_emptyaddressdata,
        &mut self.p2pk33addressindex_to_addressdata,
        &mut self.p2pk33addressindex_to_emptyaddressdata,
        &mut self.p2pk65addressindex_to_addressdata,
        &mut self.p2pk65addressindex_to_emptyaddressdata,
        &mut self.p2pkhaddressindex_to_addressdata,
        &mut self.p2pkhaddressindex_to_emptyaddressdata,
        &mut self.p2shaddressindex_to_addressdata,
        &mut self.p2shaddressindex_to_emptyaddressdata,
        &mut self.p2traddressindex_to_addressdata,
        &mut self.p2traddressindex_to_emptyaddressdata,
        &mut self.p2wpkhaddressindex_to_addressdata,
        &mut self.p2wpkhaddressindex_to_emptyaddressdata,
        &mut self.p2wshaddressindex_to_addressdata,
        &mut self.p2wshaddressindex_to_emptyaddressdata,
    ]
}
}

View File

@@ -1,95 +1,52 @@
use std::collections::BTreeMap;
use std::fs::OpenOptions;
use std::sync::Arc;
use std::{collections::HashMap, fs, io::BufReader, path::Path};
use bincode::decode_from_std_read;
use bincode::{Decode, Encode, config};
use brk_core::Error;
use brk_core::Result;
use parking_lot::RwLock;
use crate::PAGE_SIZE;
use super::Region;
use super::{PAGE_SIZE, Region, Regions};
#[derive(Debug)]
pub struct Layout {
file: fs::File,
id_to_index: HashMap<String, usize>,
start_to_index: BTreeMap<u64, usize>,
index_to_region: Vec<Option<Arc<RwLock<Region>>>>,
/// key: start, value: gap
start_to_hole: BTreeMap<u64, u64>,
}
impl Layout {
pub fn open(path: &Path) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.open(path)?;
impl From<&Regions> for Layout {
fn from(value: &Regions) -> Self {
let mut start_to_index = BTreeMap::new();
let mut start_to_hole = BTreeMap::new();
Ok(if file.metadata()?.len() != 0 {
let config = config::standard();
let mut prev_end = 0;
let mut reader = BufReader::new(&file);
let serialized: SerializedRegions = decode_from_std_read(&mut reader, config)?;
let mut id_to_index = HashMap::new();
let mut start_to_index = BTreeMap::new();
let mut index_to_region = vec![];
serialized.0.into_iter().for_each(|(str, region)| {
let index = index_to_region.len();
id_to_index.insert(str, index);
start_to_index.insert(region.start(), index);
index_to_region.push(Some(Arc::new(RwLock::new(region))));
value
.as_array()
.into_iter()
.enumerate()
.flat_map(|(index, opt)| opt.as_ref().map(|region| (index, region)))
.for_each(|(index, region)| {
let region = region.read();
let start = region.start();
start_to_index.insert(start, index);
if prev_end != start {
start_to_hole.insert(prev_end, start - prev_end);
}
let reserved = region.reserved();
prev_end = start + reserved;
});
Self {
file,
id_to_index,
start_to_index,
index_to_region,
start_to_hole: BTreeMap::new(),
}
} else {
Self {
file,
id_to_index: HashMap::new(),
index_to_region: Vec::new(),
start_to_index: BTreeMap::new(),
start_to_hole: BTreeMap::new(),
}
})
Self {
start_to_index,
start_to_hole,
}
}
}
pub fn get_region_from_index(&self, index: usize) -> Option<Arc<RwLock<Region>>> {
self.index_to_region.get(index).cloned().flatten()
}
pub fn get_region_index_from_id(&self, id: String) -> Option<usize> {
self.id_to_index.get(&id).copied()
}
pub fn create_region_from_hole(&mut self, id: String) -> Option<usize> {
let index = self.index_to_region.len();
let start = self.find_smallest_adequate_hole(PAGE_SIZE)?;
self.remove_or_compress_hole_to_right(start, PAGE_SIZE);
self.id_to_index.insert(id, index);
self.start_to_index.insert(start, index);
self.index_to_region
.push(Some(Arc::new(RwLock::new(Region::new(
start, PAGE_SIZE, PAGE_SIZE,
)))));
Some(index)
impl Layout {
pub fn get_last_region(&self) -> Option<usize> {
self.start_to_index
.last_key_value()
.map(|(_, index)| *index)
}
pub fn find_smallest_adequate_hole(&self, reserved: u64) -> Option<u64> {
@@ -102,44 +59,23 @@ impl Layout {
.map(|(_, s)| *s)
}
pub fn push_region(&mut self, id: String) -> (usize, Region) {
let index = self.index_to_region.len();
self.id_to_index.insert(id, index);
let start = self
.start_to_index
.last_key_value()
.map(|(_, index)| {
let region = self
.index_to_region
.get(*index)
.unwrap()
.as_ref()
.unwrap()
.read();
region.start() + region.reserved()
})
.unwrap_or_default();
let region = Region::new(start, PAGE_SIZE, PAGE_SIZE);
self.index_to_region
.push(Some(Arc::new(RwLock::new(region.clone()))));
(index, region)
pub fn insert_region(&mut self, start: u64, index: usize) {
assert!(self.start_to_index.insert(start, index).is_none())
}
pub fn remove_region(&mut self, index: usize) -> Option<Arc<RwLock<Region>>> {
let region = self.index_to_region.get_mut(index).and_then(Option::take)?;
pub fn remove_region(&mut self, index: usize, region: &Region) -> Result<()> {
let start = region.start();
let reserved = region.reserved();
self.id_to_index
.remove(&self.find_id_from_index(index).unwrap().to_owned());
self.start_to_index.remove(&region.read().start());
let lock = region.read();
let start = lock.start();
let reserved = lock.reserved();
if self
.start_to_index
.remove(&start)
.is_none_or(|index_| index != index_)
{
return Err(Error::Str(
"Something went wrong, indexes of removed region should be the same",
));
}
if self
.widen_hole_to_the_left_if_any(start + reserved, reserved)
@@ -150,9 +86,21 @@ impl Layout {
self.widen_hole_to_the_right_if_any(hole_start, reserved);
}
drop(lock);
if self
.start_to_index
.keys()
.last()
.is_none_or(|&region_start| {
self.start_to_hole
.keys()
.last()
.is_some_and(|&hole_start| hole_start > region_start)
})
{
self.start_to_hole.pop_last();
}
Some(region)
Ok(())
}
pub fn get_hole(&self, start: u64) -> Option<u64> {
@@ -209,17 +157,4 @@ impl Layout {
Some(start)
}
fn find_id_from_index(&self, index: usize) -> Option<&String> {
Some(
self.id_to_index
.iter()
.find(|(_, v)| **v == index)
.unwrap()
.0,
)
}
}
#[derive(Debug, Encode, Decode)]
struct SerializedRegions(HashMap<String, Region>);

View File

@@ -23,6 +23,7 @@ use regions::*;
pub const PAGE_SIZE: u64 = 4096;
pub struct File {
regions: RwLock<Regions>,
layout: RwLock<Layout>,
file: RwLock<fs::File>,
mmap: RwLock<MmapMut>,
@@ -32,20 +33,22 @@ impl File {
pub fn open(path: &Path) -> Result<Self> {
fs::create_dir_all(path)?;
let layout = Layout::open(&path.join("layout.dat"))?;
let regions = Regions::open(path)?;
let layout = Layout::from(&regions);
let file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.open(path.join("data.dat"))?;
.open(path.join("data"))?;
let mmap = Self::mmap(&file)?;
Ok(Self {
file: RwLock::new(file),
mmap: RwLock::new(mmap),
regions: RwLock::new(regions),
layout: RwLock::new(layout),
})
}
@@ -65,15 +68,32 @@ impl File {
}
pub fn get_or_create(&self, id: String) -> Result<usize> {
if let Some(index) = self.layout.read().get_region_index_from_id(id.clone()) {
if let Some(index) = self.regions.read().get_region_index_from_id(id.clone()) {
return Ok(index);
}
let mut regions = self.regions.write();
let mut layout = self.layout.write();
if let Some(index) = layout.create_region_from_hole(id.clone()) {
return Ok(index);
}
let (index, region) = layout.push_region(id);
self.set_min_len(region.start() + region.reserved())?;
let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) {
layout.remove_or_compress_hole_to_right(start, PAGE_SIZE);
start
} else {
let start = layout
.get_last_region()
.map(|index| {
let region_opt = regions.get_region_from_index(index);
let region = region_opt.as_ref().unwrap().read();
region.start() + region.reserved()
})
.unwrap_or_default();
self.set_min_len(start + PAGE_SIZE)?;
start
};
let index = regions.create_region(id, start)?;
layout.insert_region(start, index);
Ok(index)
}
@@ -81,7 +101,7 @@ impl File {
let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
let region: RwLockReadGuard<'static, Region> = unsafe {
std::mem::transmute(
self.layout
self.regions
.read()
.get_region_from_index(index)
.ok_or(Error::Str("Unknown region"))?
@@ -102,7 +122,7 @@ impl File {
}
fn write_all_at_(&mut self, region: usize, data: &[u8], at: Option<u64>) -> Result<()> {
let Some(region) = self.layout.read().get_region_from_index(region) else {
let Some(region) = self.regions.read().get_region_from_index(region) else {
return Err(Error::Str("Unknown region"));
};
let region_lock = region.read();
@@ -148,7 +168,7 @@ impl File {
let reserved = reserved * 2;
// Find hole big enough to move the region or the next depending on which is smaller to if possible
// Find hole big enough to move the current region or the next region depending on which is smaller to if possible
if let Some(hole_start) = layout_lock.find_smallest_adequate_hole(reserved) {
layout_lock.remove_or_compress_hole_to_right(hole_start, reserved);
// TODO: Before every drop of layout.write flush to disk
@@ -187,14 +207,17 @@ impl File {
let start = start as usize;
let end = start + data_len;
if end > mmap.len() {
unreachable!("Trying to write beyond mmap")
}
let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) };
slice[start..end].copy_from_slice(data);
}
pub fn truncate(&self, index: usize, from: u64) -> Result<()> {
let layout = self.layout.read();
let Some(region) = layout.get_region_from_index(index) else {
let Some(region) = self.regions.read().get_region_from_index(index) else {
return Err(Error::Str("Unknown region"));
};
let mut region_ = region.write();
@@ -216,11 +239,13 @@ impl File {
}
pub fn remove(&self, index: usize) -> Result<Option<Arc<RwLock<Region>>>> {
let mut regions = self.regions.write();
let mut layout = self.layout.write();
let Some(region) = layout.remove_region(index) else {
let Some(region) = regions.remove_region(index)? else {
return Ok(None);
};
let region_ = region.write();
layout.remove_region(index, &region_)?;
self.punch_hole(region_.start(), region_.len())?;
drop(region_);
Ok(Some(region))

View File

@@ -1,8 +1,9 @@
use bincode::{Decode, Encode};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::PAGE_SIZE;
#[derive(Debug, Clone, Encode, Decode)]
#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)]
#[repr(C)]
pub struct Region {
/// Must be multiple of 4096
start: u64,
@@ -11,6 +12,8 @@ pub struct Region {
reserved: u64,
}
pub const SIZE_OF_REGION: usize = size_of::<Region>();
impl Region {
pub fn new(start: u64, length: u64, reserved: u64) -> Self {
assert!(reserved > 0);

View File

@@ -1,14 +1,163 @@
use std::{
collections::HashMap,
fs,
sync::{Arc, RwLock},
fs::{self, OpenOptions},
io::{BufReader, BufWriter},
path::Path,
sync::Arc,
};
use crate::file::region::Region;
use bincode::{decode_from_std_read, encode_into_std_write};
use brk_core::{Error, Result};
use memmap2::MmapMut;
use parking_lot::RwLock;
use zerocopy::{FromBytes, IntoBytes};
use crate::{
PAGE_SIZE,
file::region::{Region, SIZE_OF_REGION},
};
#[derive(Debug)]
pub struct Regions {
file: fs::File,
id_to_index: HashMap<String, usize>,
id_to_index_file: fs::File,
index_to_region: Vec<Option<Arc<RwLock<Region>>>>,
index_to_region_file: fs::File,
index_to_region_mmap: MmapMut,
}
impl Regions {
pub fn open(path: &Path) -> Result<Self> {
let path = path.join("regions");
let id_to_index_file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.open(path.join("id_to_index"))?;
let mut reader = BufReader::new(&id_to_index_file);
let id_to_index: HashMap<String, usize> =
decode_from_std_read(&mut reader, bincode::config::standard())?;
let index_to_region_file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.open(path.join("index_to_region"))?;
let index_to_region_mmap = unsafe { MmapMut::map_mut(&index_to_region_file)? };
let mut index_to_region: Vec<Option<Arc<RwLock<Region>>>> = vec![];
id_to_index
.iter()
.try_for_each(|(_, &index)| -> Result<()> {
let start = index * SIZE_OF_REGION;
let end = start + SIZE_OF_REGION;
let region = Region::read_from_bytes(&index_to_region_mmap[start..end])?;
if index_to_region.len() < index + 1 {
index_to_region.resize_with(index + 1, Default::default);
}
index_to_region
.get_mut(index)
.unwrap()
.replace(Arc::new(RwLock::new(region)));
Ok(())
})?;
// TODO: Removes Nones from vec if needed, update map accordingly and save them
Ok(Self {
id_to_index,
id_to_index_file,
index_to_region,
index_to_region_file,
index_to_region_mmap,
})
}
/// Registers a new one-page region starting at `start`, reusing the
/// first freed slot in `index_to_region` when one exists, and persists
/// both the region record and the id map.
///
/// # Errors
///
/// Returns `Error::Str("Already exists")` if `id` is already registered,
/// without mutating any state.
pub fn create_region(&mut self, id: String, start: u64) -> Result<usize> {
    // Check first so a duplicate id cannot leave half-applied state
    // behind (the old code only errored after writing the mmap record).
    if self.id_to_index.contains_key(&id) {
        return Err(Error::Str("Already exists"));
    }
    // Reuse a freed (None) slot when possible, otherwise append.
    let index = self
        .index_to_region
        .iter()
        .position(Option::is_none)
        .unwrap_or(self.index_to_region.len());
    let region = Region::new(start, PAGE_SIZE, PAGE_SIZE);
    let slot = Some(Arc::new(RwLock::new(region.clone())));
    if index == self.index_to_region.len() {
        self.index_to_region.push(slot);
    } else {
        // BUG FIX: the region must land in the reused slot itself; the
        // old unconditional `push` left the hole as `None` while the
        // returned index pointed at it.
        self.index_to_region[index] = slot;
    }
    // Grow the backing file (and remap) when this index does not fit yet;
    // `set_len` failures must not be ignored or the write below faults.
    let end = index * SIZE_OF_REGION + SIZE_OF_REGION;
    if self.index_to_region_mmap.len() < end {
        self.index_to_region_file.set_len(end as u64)?;
        self.index_to_region_mmap = unsafe { MmapMut::map_mut(&self.index_to_region_file)? };
    }
    self.write_to_mmap(&region, index);
    self.id_to_index.insert(id, index);
    self.flush_id_to_index()?;
    Ok(index)
}
/// Frees the slot at `index`, removes its id mapping, and persists the
/// updated id map. Returns the removed region, or `None` when the slot
/// was already empty.
pub fn remove_region(&mut self, index: usize) -> Result<Option<Arc<RwLock<Region>>>> {
    let Some(region) = self.index_to_region.get_mut(index).and_then(Option::take) else {
        return Ok(None);
    };
    // Reverse lookup of the id mapped to this index. Tolerate a missing
    // mapping instead of unwrapping, so a half-registered region can
    // still be removed without panicking.
    let id = self
        .id_to_index
        .iter()
        .find_map(|(id, &i)| (i == index).then(|| id.clone()));
    if let Some(id) = id {
        self.id_to_index.remove(&id);
    }
    self.flush_id_to_index()?;
    Ok(Some(region))
}
/// Rewrites the whole serialized id map to `id_to_index_file`.
///
/// The file is truncated and rewound first: the handle's cursor moves on
/// every read/write, and without truncation a shorter encoding would
/// leave stale trailing bytes that corrupt the next decode on open.
fn flush_id_to_index(&mut self) -> Result<()> {
    use std::io::{Seek, SeekFrom, Write};
    self.id_to_index_file.set_len(0)?;
    self.id_to_index_file.seek(SeekFrom::Start(0))?;
    let mut writer = BufWriter::new(&mut self.id_to_index_file);
    encode_into_std_write(&self.id_to_index, &mut writer, bincode::config::standard())?;
    // Flush explicitly: BufWriter's Drop silently swallows write errors.
    writer.flush()?;
    Ok(())
}
/// Cheap (`Arc`) handle to the region at `index`, or `None` when the
/// index is out of range or the slot has been freed.
pub fn get_region_from_index(&self, index: usize) -> Option<Arc<RwLock<Region>>> {
    self.index_to_region.get(index).cloned().flatten()
}
/// Index of the region registered under `id`, if any.
/// NOTE(review): takes `id` by value though only a borrow is needed —
/// kept as-is because callers pass owned clones.
pub fn get_region_index_from_id(&self, id: String) -> Option<usize> {
    self.id_to_index.get(&id).copied()
}
/// Reverse lookup in `id_to_index`: the id mapped to `index`, or `None`
/// when nothing maps to it. The previous version `unwrap()`ed the
/// search, panicking instead of ever returning `None` and defeating the
/// `Option` return type; callers that relied on the panic still get it
/// from their own `unwrap()`.
fn find_id_from_index(&self, index: usize) -> Option<&String> {
    self.id_to_index
        .iter()
        .find(|(_, v)| **v == index)
        .map(|(id, _)| id)
}
/// Read-only view of the index-ordered region slots (`None` = freed).
pub fn as_array(&self) -> &[Option<Arc<RwLock<Region>>>] {
    &self.index_to_region
}
/// Serializes `region` into its fixed-size record slot inside the
/// index→region mmap.
///
/// Takes `&mut self` so the write can go through `MmapMut`'s safe
/// `DerefMut<Target = [u8]>` instead of the previous `unsafe` cast of a
/// shared reference to a mutable slice (mutation through `&self` is
/// unsound aliasing). The function is private and its only caller
/// already holds `&mut self`.
///
/// # Panics
///
/// Panics if the record would fall beyond the mapped length — callers
/// must grow the file and remap first.
fn write_to_mmap(&mut self, region: &Region, index: usize) {
    let start = index * SIZE_OF_REGION;
    let end = start + SIZE_OF_REGION;
    assert!(
        end <= self.index_to_region_mmap.len(),
        "Trying to write beyond mmap"
    );
    self.index_to_region_mmap[start..end].copy_from_slice(region.as_bytes());
}
}