diff --git a/crates/brk_computer/src/states/price_to_amount.rs b/crates/brk_computer/src/states/price_to_amount.rs index 493b76336..8d6d0b8bc 100644 --- a/crates/brk_computer/src/states/price_to_amount.rs +++ b/crates/brk_computer/src/states/price_to_amount.rs @@ -87,12 +87,12 @@ impl PriceToAmount { self.height = Some(height); height.write(&self.path_height())?; - let config = config::standard(); let file = File::create(self.path_state()).inspect_err(|_| { dbg!(self.path_state()); })?; let mut writer = BufWriter::new(file); - encode_into_std_write(&self.state, &mut writer, config)?; + encode_into_std_write(&self.state, &mut writer, config::standard())?; + Ok(()) } diff --git a/crates/brk_computer/src/stores.rs b/crates/brk_computer/src/stores.rs deleted file mode 100644 index 572a8d639..000000000 --- a/crates/brk_computer/src/stores.rs +++ /dev/null @@ -1,641 +0,0 @@ -use std::{path::Path, thread}; - -use brk_core::{ - AddressData, ByAddressType, EmptyAddressData, Height, OutputType, P2AAddressIndex, - P2PK33AddressIndex, P2PK65AddressIndex, P2PKHAddressIndex, P2SHAddressIndex, P2TRAddressIndex, - P2WPKHAddressIndex, P2WSHAddressIndex, Result, TypeIndex, Version, -}; -use brk_store::{AnyStore, Store}; -use fjall::{PersistMode, TransactionalKeyspace}; -use log::info; - -use crate::stateful::{AddressTypeToTypeIndexTree, WithAddressDataSource}; - -const VERSION: Version = Version::ZERO; - -#[derive(Clone)] -pub struct Stores { - keyspace: TransactionalKeyspace, - - pub p2aaddressindex_to_addressdata: Store, - pub p2aaddressindex_to_emptyaddressdata: Store, - pub p2pk33addressindex_to_addressdata: Store, - pub p2pk33addressindex_to_emptyaddressdata: Store, - pub p2pk65addressindex_to_addressdata: Store, - pub p2pk65addressindex_to_emptyaddressdata: Store, - pub p2pkhaddressindex_to_addressdata: Store, - pub p2pkhaddressindex_to_emptyaddressdata: Store, - pub p2shaddressindex_to_addressdata: Store, - pub p2shaddressindex_to_emptyaddressdata: Store, - pub p2traddressindex_to_addressdata: Store, - pub p2traddressindex_to_emptyaddressdata: Store, - pub p2wpkhaddressindex_to_addressdata: Store, - pub p2wpkhaddressindex_to_emptyaddressdata: Store, - pub p2wshaddressindex_to_addressdata: Store, - pub p2wshaddressindex_to_emptyaddressdata: Store, -} - -impl Stores { - pub fn import( - path: &Path, - version: Version, - keyspace: &TransactionalKeyspace, - ) -> color_eyre::Result { - let ( - (p2aaddressindex_to_addressdata, p2aaddressindex_to_emptyaddressdata), - (p2pk33addressindex_to_addressdata, p2pk33addressindex_to_emptyaddressdata), - (p2pk65addressindex_to_addressdata, p2pk65addressindex_to_emptyaddressdata), - (p2pkhaddressindex_to_addressdata, p2pkhaddressindex_to_emptyaddressdata), - (p2shaddressindex_to_addressdata, p2shaddressindex_to_emptyaddressdata), - (p2traddressindex_to_addressdata, p2traddressindex_to_emptyaddressdata), - (p2wpkhaddressindex_to_addressdata, p2wpkhaddressindex_to_emptyaddressdata), - (p2wshaddressindex_to_addressdata, p2wshaddressindex_to_emptyaddressdata), - ) = thread::scope(|scope| { - let p2a = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2aaddressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2aaddressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2pk33 = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2pk33addressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2pk33addressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2pk65 = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2pk65addressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2pk65addressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2pkh = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2pkhaddressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2pkhaddressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2sh = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2shaddressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2shaddressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2tr = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2traddressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2traddressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2wpkh = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2wpkhaddressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2wpkhaddressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - let p2wsh = scope.spawn(|| { - ( - Store::import( - keyspace, - path, - "p2wshaddressindex_to_addressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - Store::import( - keyspace, - path, - "p2wshaddressindex_to_emptyaddressdata", - version + VERSION + Version::ZERO, - None, - ) - .unwrap(), - ) - }); - - ( - p2a.join().unwrap(), - p2pk33.join().unwrap(), - p2pk65.join().unwrap(), - p2pkh.join().unwrap(), - p2sh.join().unwrap(), - p2tr.join().unwrap(), - p2wpkh.join().unwrap(), - p2wsh.join().unwrap(), - ) - }); - - Ok(Self { - keyspace: keyspace.clone(), - - p2aaddressindex_to_addressdata, - p2aaddressindex_to_emptyaddressdata, - - p2pk33addressindex_to_addressdata, - p2pk33addressindex_to_emptyaddressdata, - - p2pk65addressindex_to_addressdata, - p2pk65addressindex_to_emptyaddressdata, - - p2pkhaddressindex_to_addressdata, - p2pkhaddressindex_to_emptyaddressdata, - - p2shaddressindex_to_addressdata, - p2shaddressindex_to_emptyaddressdata, - - p2traddressindex_to_addressdata, - p2traddressindex_to_emptyaddressdata, - - p2wpkhaddressindex_to_addressdata, - p2wpkhaddressindex_to_emptyaddressdata, - - p2wshaddressindex_to_addressdata, - p2wshaddressindex_to_emptyaddressdata, - }) - } - - pub fn starting_height(&self) -> Height { - self.as_slice() - .into_iter() - .map(|store| store.height().map(Height::incremented).unwrap_or_default()) - .min() - .unwrap() - } - - pub fn reset(&mut self) -> Result<()> { - info!("Resetting stores..."); - info!("> If it gets stuck here, stop the program and start it again"); - - self.as_mut_slice() - .into_iter() - .try_for_each(|store| store.reset())?; - - self.keyspace - .persist(PersistMode::SyncAll) - .map_err(|e| e.into()) - } - - pub fn get_addressdata( - &self, - address_type: OutputType, - type_index: TypeIndex, - ) -> Result> { - Ok(match address_type { - OutputType::P2A => self - .p2aaddressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2PK33 => self - .p2pk33addressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2PK65 => self - .p2pk65addressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2PKH => self - .p2pkhaddressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2SH => self - .p2shaddressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2TR => self - .p2traddressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2WPKH => self - .p2wpkhaddressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2WSH => self - .p2wshaddressindex_to_addressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - _ => unreachable!(), - }) - } - - pub fn get_emptyaddressdata( - &self, - address_type: OutputType, - type_index: TypeIndex, - ) -> Result> { - Ok(match address_type { - OutputType::P2A => self - .p2aaddressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2PK33 => self - .p2pk33addressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2PK65 => self - .p2pk65addressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2PKH => self - .p2pkhaddressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2SH => self - .p2shaddressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2TR => self - .p2traddressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2WPKH => self - .p2wpkhaddressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - OutputType::P2WSH => self - .p2wshaddressindex_to_emptyaddressdata - .get(&type_index.into())? - .map(|c| c.into_owned()), - _ => unreachable!(), - }) - } - - pub fn commit( - &mut self, - height: Height, - addresstype_to_typeindex_to_addressdata: AddressTypeToTypeIndexTree< - WithAddressDataSource, - >, - addresstype_to_typeindex_to_emptyaddressdata: AddressTypeToTypeIndexTree< - WithAddressDataSource, - >, - ) -> Result<()> { - let ByAddressType { - p2pk65, - p2pk33, - p2pkh, - p2sh, - p2wpkh, - p2wsh, - p2tr, - p2a, - } = addresstype_to_typeindex_to_addressdata.unwrap(); - - let ByAddressType { - p2pk65: empty_p2pk65, - p2pk33: empty_p2pk33, - p2pkh: empty_p2pkh, - p2sh: empty_p2sh, - p2wpkh: empty_p2wpkh, - p2wsh: empty_p2wsh, - p2tr: empty_p2tr, - p2a: empty_p2a, - } = addresstype_to_typeindex_to_emptyaddressdata.unwrap(); - - thread::scope(|s| { - s.spawn(|| { - self.p2aaddressindex_to_addressdata.commit_( - height, - empty_p2a - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2a.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2pk33addressindex_to_addressdata.commit_( - height, - empty_p2pk33 - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2pk33.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2pk65addressindex_to_addressdata.commit_( - height, - empty_p2pk65 - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2pk65.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2pkhaddressindex_to_addressdata.commit_( - height, - empty_p2pkh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2pkh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2shaddressindex_to_addressdata.commit_( - height, - empty_p2sh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2sh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2traddressindex_to_addressdata.commit_( - height, - empty_p2tr - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2tr.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2wpkhaddressindex_to_addressdata.commit_( - height, - empty_p2wpkh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2wpkh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - s.spawn(|| { - self.p2wshaddressindex_to_addressdata.commit_( - height, - empty_p2wsh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_addressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - p2wsh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - }); - - thread::scope(|scope| { - scope.spawn(|| { - self.p2aaddressindex_to_emptyaddressdata.commit_( - height, - p2a.iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2a.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2pk33addressindex_to_emptyaddressdata.commit_( - height, - p2pk33 - .iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2pk33.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2pk65addressindex_to_emptyaddressdata.commit_( - height, - p2pk65 - .iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2pk65.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2pkhaddressindex_to_emptyaddressdata.commit_( - height, - p2pkh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2pkh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2shaddressindex_to_emptyaddressdata.commit_( - height, - p2sh.iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2sh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2traddressindex_to_emptyaddressdata.commit_( - height, - p2tr.iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2tr.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2wpkhaddressindex_to_emptyaddressdata.commit_( - height, - p2wpkh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2wpkh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - scope.spawn(|| { - self.p2wshaddressindex_to_emptyaddressdata.commit_( - height, - p2wsh - .iter() - .filter(|(_, addressdata)| addressdata.is_from_emptyaddressdata()) - .map(|(typeindex, _)| (*typeindex).into()), - empty_p2wsh.iter().map(|(typeindex, addressdata)| { - ((*typeindex).into(), addressdata.deref().clone()) - }), - ) - }); - }); - - self.keyspace - .persist(PersistMode::SyncAll) - .map_err(|e| e.into()) - } - - pub fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 16] { - [ - &self.p2aaddressindex_to_addressdata, - &self.p2aaddressindex_to_emptyaddressdata, - &self.p2pk33addressindex_to_addressdata, - &self.p2pk33addressindex_to_emptyaddressdata, - &self.p2pk65addressindex_to_addressdata, - &self.p2pk65addressindex_to_emptyaddressdata, - &self.p2pkhaddressindex_to_addressdata, - &self.p2pkhaddressindex_to_emptyaddressdata, - &self.p2shaddressindex_to_addressdata, - &self.p2shaddressindex_to_emptyaddressdata, - &self.p2traddressindex_to_addressdata, - &self.p2traddressindex_to_emptyaddressdata, - &self.p2wpkhaddressindex_to_addressdata, - &self.p2wpkhaddressindex_to_emptyaddressdata, - &self.p2wshaddressindex_to_addressdata, - &self.p2wshaddressindex_to_emptyaddressdata, - ] - } - - fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 16] { - [ - &mut self.p2aaddressindex_to_addressdata, - &mut self.p2aaddressindex_to_emptyaddressdata, - &mut self.p2pk33addressindex_to_addressdata, - &mut self.p2pk33addressindex_to_emptyaddressdata, - &mut self.p2pk65addressindex_to_addressdata, - &mut self.p2pk65addressindex_to_emptyaddressdata, - &mut self.p2pkhaddressindex_to_addressdata, - &mut self.p2pkhaddressindex_to_emptyaddressdata, - &mut self.p2shaddressindex_to_addressdata, - &mut self.p2shaddressindex_to_emptyaddressdata, - &mut self.p2traddressindex_to_addressdata, - &mut self.p2traddressindex_to_emptyaddressdata, - &mut self.p2wpkhaddressindex_to_addressdata, - &mut self.p2wpkhaddressindex_to_emptyaddressdata, - &mut self.p2wshaddressindex_to_addressdata, - &mut self.p2wshaddressindex_to_emptyaddressdata, - ] - } -} diff --git a/crates/brk_vecs/src/file/layout.rs b/crates/brk_vecs/src/file/layout.rs index 55ded5140..d3a78f215 100644 --- a/crates/brk_vecs/src/file/layout.rs +++ b/crates/brk_vecs/src/file/layout.rs @@ -1,95 +1,52 @@ use std::collections::BTreeMap; -use std::fs::OpenOptions; -use std::sync::Arc; -use std::{collections::HashMap, fs, io::BufReader, path::Path}; -use bincode::decode_from_std_read; -use bincode::{Decode, Encode, config}; +use brk_core::Error; use brk_core::Result; -use parking_lot::RwLock; -use crate::PAGE_SIZE; - -use super::Region; +use super::{PAGE_SIZE, Region, Regions}; #[derive(Debug)] pub struct Layout { - file: fs::File, - id_to_index: HashMap, start_to_index: BTreeMap, - index_to_region: Vec>>>, /// key: start, value: gap start_to_hole: BTreeMap, } -impl Layout { - pub fn open(path: &Path) -> Result { - let file = OpenOptions::new() - .read(true) - .create(true) - .write(true) - .truncate(false) - .open(path)?; +impl From<&Regions> for Layout { + fn from(value: &Regions) -> Self { + let mut start_to_index = BTreeMap::new(); + let mut start_to_hole = BTreeMap::new(); - Ok(if file.metadata()?.len() != 0 { - let config = config::standard(); + let mut prev_end = 0; - let mut reader = BufReader::new(&file); - let serialized: SerializedRegions = decode_from_std_read(&mut reader, config)?; - - let mut id_to_index = HashMap::new(); - let mut start_to_index = BTreeMap::new(); - let mut index_to_region = vec![]; - - serialized.0.into_iter().for_each(|(str, region)| { - let index = index_to_region.len(); - id_to_index.insert(str, index); - start_to_index.insert(region.start(), index); - index_to_region.push(Some(Arc::new(RwLock::new(region)))); + value + .as_array() + .into_iter() + .enumerate() + .flat_map(|(index, opt)| opt.as_ref().map(|region| (index, region))) + .for_each(|(index, region)| { + let region = region.read(); + let start = region.start(); + start_to_index.insert(start, index); + if prev_end != start { + start_to_hole.insert(prev_end, start - prev_end); + } + let reserved = region.reserved(); + prev_end = start + reserved; }); - Self { - file, - id_to_index, - start_to_index, - index_to_region, - start_to_hole: BTreeMap::new(), - } - } else { - Self { - file, - id_to_index: HashMap::new(), - index_to_region: Vec::new(), - start_to_index: BTreeMap::new(), - start_to_hole: BTreeMap::new(), - } - }) + Self { + start_to_index, + start_to_hole, + } } +} - pub fn get_region_from_index(&self, index: usize) -> Option>> { - self.index_to_region.get(index).cloned().flatten() - } - - pub fn get_region_index_from_id(&self, id: String) -> Option { - self.id_to_index.get(&id).copied() - } - - pub fn create_region_from_hole(&mut self, id: String) -> Option { - let index = self.index_to_region.len(); - - let start = self.find_smallest_adequate_hole(PAGE_SIZE)?; - - self.remove_or_compress_hole_to_right(start, PAGE_SIZE); - - self.id_to_index.insert(id, index); - self.start_to_index.insert(start, index); - - self.index_to_region - .push(Some(Arc::new(RwLock::new(Region::new( - start, PAGE_SIZE, PAGE_SIZE, - ))))); - - Some(index) +impl Layout { + pub fn get_last_region(&self) -> Option { + self.start_to_index + .last_key_value() + .map(|(_, index)| *index) } pub fn find_smallest_adequate_hole(&self, reserved: u64) -> Option { @@ -102,44 +59,23 @@ impl Layout { .map(|(_, s)| *s) } - pub fn push_region(&mut self, id: String) -> (usize, Region) { - let index = self.index_to_region.len(); - - self.id_to_index.insert(id, index); - - let start = self - .start_to_index - .last_key_value() - .map(|(_, index)| { - let region = self - .index_to_region - .get(*index) - .unwrap() - .as_ref() - .unwrap() - .read(); - region.start() + region.reserved() - }) - .unwrap_or_default(); - - let region = Region::new(start, PAGE_SIZE, PAGE_SIZE); - - self.index_to_region - .push(Some(Arc::new(RwLock::new(region.clone())))); - - (index, region) + pub fn insert_region(&mut self, start: u64, index: usize) { + assert!(self.start_to_index.insert(start, index).is_none()) } - pub fn remove_region(&mut self, index: usize) -> Option>> { - let region = self.index_to_region.get_mut(index).and_then(Option::take)?; + pub fn remove_region(&mut self, index: usize, region: &Region) -> Result<()> { + let start = region.start(); + let reserved = region.reserved(); - self.id_to_index - .remove(&self.find_id_from_index(index).unwrap().to_owned()); - self.start_to_index.remove(®ion.read().start()); - - let lock = region.read(); - let start = lock.start(); - let reserved = lock.reserved(); + if self + .start_to_index + .remove(&start) + .is_none_or(|index_| index != index_) + { + return Err(Error::Str( + "Something went wrong, indexes of removed region should be the same", + )); + } if self .widen_hole_to_the_left_if_any(start + reserved, reserved) @@ -150,9 +86,21 @@ impl Layout { self.widen_hole_to_the_right_if_any(hole_start, reserved); } - drop(lock); + if self + .start_to_index + .keys() + .last() + .is_none_or(|®ion_start| { + self.start_to_hole + .keys() + .last() + .is_some_and(|&hole_start| hole_start > region_start) + }) + { + self.start_to_hole.pop_last(); + } - Some(region) + Ok(()) } pub fn get_hole(&self, start: u64) -> Option { @@ -209,17 +157,4 @@ impl Layout { Some(start) } - - fn find_id_from_index(&self, index: usize) -> Option<&String> { - Some( - self.id_to_index - .iter() - .find(|(_, v)| **v == index) - .unwrap() - .0, - ) - } } - -#[derive(Debug, Encode, Decode)] -struct SerializedRegions(HashMap); diff --git a/crates/brk_vecs/src/file/mod.rs b/crates/brk_vecs/src/file/mod.rs index f6d0f9133..10f1b0ed2 100644 --- a/crates/brk_vecs/src/file/mod.rs +++ b/crates/brk_vecs/src/file/mod.rs @@ -23,6 +23,7 @@ use regions::*; pub const PAGE_SIZE: u64 = 4096; pub struct File { + regions: RwLock, layout: RwLock, file: RwLock, mmap: RwLock, @@ -32,20 +33,22 @@ impl File { pub fn open(path: &Path) -> Result { fs::create_dir_all(path)?; - let layout = Layout::open(&path.join("layout.dat"))?; + let regions = Regions::open(path)?; + let layout = Layout::from(®ions); let file = OpenOptions::new() .read(true) .create(true) .write(true) .truncate(false) - .open(path.join("data.dat"))?; + .open(path.join("data"))?; let mmap = Self::mmap(&file)?; Ok(Self { file: RwLock::new(file), mmap: RwLock::new(mmap), + regions: RwLock::new(regions), layout: RwLock::new(layout), }) } @@ -65,15 +68,32 @@ impl File { } pub fn get_or_create(&self, id: String) -> Result { - if let Some(index) = self.layout.read().get_region_index_from_id(id.clone()) { + if let Some(index) = self.regions.read().get_region_index_from_id(id.clone()) { return Ok(index); } + let mut regions = self.regions.write(); let mut layout = self.layout.write(); - if let Some(index) = layout.create_region_from_hole(id.clone()) { - return Ok(index); - } - let (index, region) = layout.push_region(id); - self.set_min_len(region.start() + region.reserved())?; + + let start = if let Some(start) = layout.find_smallest_adequate_hole(PAGE_SIZE) { + layout.remove_or_compress_hole_to_right(start, PAGE_SIZE); + start + } else { + let start = layout + .get_last_region() + .map(|index| { + let region_opt = regions.get_region_from_index(index); + let region = region_opt.as_ref().unwrap().read(); + region.start() + region.reserved() + }) + .unwrap_or_default(); + self.set_min_len(start + PAGE_SIZE)?; + start + }; + + let index = regions.create_region(id, start)?; + + layout.insert_region(start, index); + Ok(index) } @@ -81,7 +101,7 @@ impl File { let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read(); let region: RwLockReadGuard<'static, Region> = unsafe { std::mem::transmute( - self.layout + self.regions .read() .get_region_from_index(index) .ok_or(Error::Str("Unknown region"))? @@ -102,7 +122,7 @@ impl File { } fn write_all_at_(&mut self, region: usize, data: &[u8], at: Option) -> Result<()> { - let Some(region) = self.layout.read().get_region_from_index(region) else { + let Some(region) = self.regions.read().get_region_from_index(region) else { return Err(Error::Str("Unknown region")); }; let region_lock = region.read(); @@ -148,7 +168,7 @@ impl File { let reserved = reserved * 2; - // Find hole big enough to move the region or the next depending on which is smaller to if possible + // Find hole big enough to move the current region or the next region depending on which is smaller to if possible if let Some(hole_start) = layout_lock.find_smallest_adequate_hole(reserved) { layout_lock.remove_or_compress_hole_to_right(hole_start, reserved); // TODO: Before every drop of layout.write flush to disk @@ -187,14 +207,17 @@ impl File { let start = start as usize; let end = start + data_len; + if end > mmap.len() { + unreachable!("Trying to write beyond mmap") + } + let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) }; slice[start..end].copy_from_slice(data); } pub fn truncate(&self, index: usize, from: u64) -> Result<()> { - let layout = self.layout.read(); - let Some(region) = layout.get_region_from_index(index) else { + let Some(region) = self.regions.read().get_region_from_index(index) else { return Err(Error::Str("Unknown region")); }; let mut region_ = region.write(); @@ -216,11 +239,13 @@ impl File { } pub fn remove(&self, index: usize) -> Result>>> { + let mut regions = self.regions.write(); let mut layout = self.layout.write(); - let Some(region) = layout.remove_region(index) else { + let Some(region) = regions.remove_region(index)? else { return Ok(None); }; let region_ = region.write(); + layout.remove_region(index, ®ion_)?; self.punch_hole(region_.start(), region_.len())?; drop(region_); Ok(Some(region)) diff --git a/crates/brk_vecs/src/file/region.rs b/crates/brk_vecs/src/file/region.rs index 2ff002379..054eafac9 100644 --- a/crates/brk_vecs/src/file/region.rs +++ b/crates/brk_vecs/src/file/region.rs @@ -1,8 +1,9 @@ -use bincode::{Decode, Encode}; +use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; use crate::PAGE_SIZE; -#[derive(Debug, Clone, Encode, Decode)] +#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)] +#[repr(C)] pub struct Region { /// Must be multiple of 4096 start: u64, @@ -11,6 +12,8 @@ pub struct Region { reserved: u64, } +pub const SIZE_OF_REGION: usize = size_of::(); + impl Region { pub fn new(start: u64, length: u64, reserved: u64) -> Self { assert!(reserved > 0); diff --git a/crates/brk_vecs/src/file/regions.rs b/crates/brk_vecs/src/file/regions.rs index 50714ac5e..8e8a50169 100644 --- a/crates/brk_vecs/src/file/regions.rs +++ b/crates/brk_vecs/src/file/regions.rs @@ -1,14 +1,163 @@ use std::{ collections::HashMap, - fs, - sync::{Arc, RwLock}, + fs::{self, OpenOptions}, + io::{BufReader, BufWriter}, + path::Path, + sync::Arc, }; -use crate::file::region::Region; +use bincode::{decode_from_std_read, encode_into_std_write}; +use brk_core::{Error, Result}; +use memmap2::MmapMut; +use parking_lot::RwLock; +use zerocopy::{FromBytes, IntoBytes}; + +use crate::{ + PAGE_SIZE, + file::region::{Region, SIZE_OF_REGION}, +}; #[derive(Debug)] pub struct Regions { - file: fs::File, id_to_index: HashMap, + id_to_index_file: fs::File, index_to_region: Vec>>>, + index_to_region_file: fs::File, + index_to_region_mmap: MmapMut, +} + +impl Regions { + pub fn open(path: &Path) -> Result { + let path = path.join("regions"); + + let id_to_index_file = OpenOptions::new() + .read(true) + .create(true) + .write(true) + .truncate(false) + .open(path.join("id_to_index"))?; + + let mut reader = BufReader::new(&id_to_index_file); + let id_to_index: HashMap = + decode_from_std_read(&mut reader, bincode::config::standard())?; + + let index_to_region_file = OpenOptions::new() + .read(true) + .create(true) + .write(true) + .truncate(false) + .open(path.join("index_to_region"))?; + + let index_to_region_mmap = unsafe { MmapMut::map_mut(&index_to_region_file)? }; + + let mut index_to_region: Vec>>> = vec![]; + + id_to_index + .iter() + .try_for_each(|(_, &index)| -> Result<()> { + let start = index * SIZE_OF_REGION; + let end = start + SIZE_OF_REGION; + let region = Region::read_from_bytes(&index_to_region_mmap[start..end])?; + if index_to_region.len() < index + 1 { + index_to_region.resize_with(index + 1, Default::default); + } + index_to_region + .get_mut(index) + .unwrap() + .replace(Arc::new(RwLock::new(region))); + Ok(()) + })?; + + // TODO: Removes Nones from vec if needed, update map accordingly and save them + + Ok(Self { + id_to_index, + id_to_index_file, + index_to_region, + index_to_region_file, + index_to_region_mmap, + }) + } + + pub fn create_region(&mut self, id: String, start: u64) -> Result { + let index = self + .index_to_region + .iter() + .enumerate() + .find(|(_, opt)| opt.is_none()) + .map(|(index, _)| index) + .unwrap_or_else(|| self.index_to_region.len()); + + let region = Region::new(start, PAGE_SIZE, PAGE_SIZE); + + self.index_to_region + .push(Some(Arc::new(RwLock::new(region.clone())))); + + let end = index * SIZE_OF_REGION + SIZE_OF_REGION; + if self.index_to_region_mmap.len() < end { + self.index_to_region_file.set_len(end as u64); + self.index_to_region_mmap = unsafe { MmapMut::map_mut(&self.index_to_region_file)? }; + } + + self.write_to_mmap(®ion, index); + + if self.id_to_index.insert(id, index).is_some() { + return Err(Error::Str("Already exists")); + } + self.flush_id_to_index()?; + + Ok(index) + } + + pub fn remove_region(&mut self, index: usize) -> Result>>> { + let Some(region) = self.index_to_region.get_mut(index).and_then(Option::take) else { + return Ok(None); + }; + + self.id_to_index + .remove(&self.find_id_from_index(index).unwrap().to_owned()); + + self.flush_id_to_index()?; + + Ok(Some(region)) + } + + fn flush_id_to_index(&mut self) -> Result<()> { + let mut writer = BufWriter::new(&mut self.id_to_index_file); + encode_into_std_write(&self.id_to_index, &mut writer, bincode::config::standard())?; + Ok(()) + } + + pub fn get_region_from_index(&self, index: usize) -> Option>> { + self.index_to_region.get(index).cloned().flatten() + } + + pub fn get_region_index_from_id(&self, id: String) -> Option { + self.id_to_index.get(&id).copied() + } + + fn find_id_from_index(&self, index: usize) -> Option<&String> { + Some( + self.id_to_index + .iter() + .find(|(_, v)| **v == index) + .unwrap() + .0, + ) + } + + pub fn as_array(&self) -> &[Option>>] { + &self.index_to_region + } + + fn write_to_mmap(&self, region: &Region, index: usize) { + let start = index * SIZE_OF_REGION; + let end = start + SIZE_OF_REGION; + let mmap = &self.index_to_region_mmap; + if end > mmap.len() { + unreachable!("Trying to write beyond mmap") + } + let slice = unsafe { std::slice::from_raw_parts_mut(mmap.as_ptr() as *mut u8, mmap.len()) }; + slice[start..end].copy_from_slice(region.as_bytes()); + } }