global: wip

This commit is contained in:
nym21
2025-06-01 14:37:19 +02:00
parent f976f672cf
commit cbcf603b63
30 changed files with 326 additions and 183 deletions

View File

@@ -9,8 +9,7 @@ license.workspace = true
repository.workspace = true
[dependencies]
arc-swap = { workspace = true }
brk_core = { workspace = true }
byteview = { workspace = true }
color-eyre = { workspace = true }
fjall = { workspace = true }
zerocopy = { workspace = true }

View File

@@ -0,0 +1 @@
# BRK Store

View File

@@ -0,0 +1,43 @@
use std::path::Path;
use brk_core::{Dollars, Height, Result, Sats, Version};
use brk_store::Store;
fn main() -> Result<()> {
let p = Path::new("./examples/_fjall");
let keyspace = brk_store::open_keyspace(p)?;
let mut store: Store<Dollars, Sats> =
brk_store::Store::import(&keyspace, p, "n", Version::ZERO, None)?;
store.copy_db_to_puts();
*store.puts_entry_or_default(&Dollars::from(10.0)) += Sats::ONE_BTC;
*store.puts_entry_or_default(&Dollars::from(1.0)) += Sats::ONE_BTC;
*store.puts_entry_or_default(&Dollars::ZERO) += Sats::ONE_BTC;
*store.puts_entry_or_default(&Dollars::ZERO) += Sats::ONE_BTC;
dbg!(store.tx_iter().collect::<Vec<_>>());
store.commit(Height::ZERO)?;
store.copy_db_to_puts();
dbg!(store.tx_iter().collect::<Vec<_>>());
*store.puts_entry_or_default(&Dollars::from(10.0)) += Sats::ONE_BTC;
*store.puts_entry_or_default(&Dollars::from(1.0)) += Sats::ONE_BTC;
*store.puts_entry_or_default(&Dollars::ZERO) += Sats::ONE_BTC;
*store.puts_entry_or_default(&Dollars::ZERO) += Sats::ONE_BTC;
dbg!(store.tx_iter().collect::<Vec<_>>());
store.commit(Height::from(1_u32))?;
store.copy_db_to_puts();
dbg!(store.tx_iter().collect::<Vec<_>>());
Ok(())
}

View File

@@ -1,18 +1,23 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("../examples/main.rs")]
#![doc = "```"]
use std::{
collections::{BTreeMap, BTreeSet},
error,
fmt::Debug,
mem,
path::Path,
sync::Arc,
};
use arc_swap::ArcSwap;
use brk_core::{Height, Result, Value, Version};
use byteview::ByteView;
use fjall::{
PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace,
TransactionalPartitionHandle,
};
use zerocopy::{Immutable, IntoBytes};
mod meta;
use meta::*;
@@ -21,7 +26,7 @@ pub struct Store<Key, Value> {
meta: StoreMeta,
name: String,
keyspace: TransactionalKeyspace,
partition: TransactionalPartitionHandle,
partition: Arc<ArcSwap<TransactionalPartitionHandle>>,
rtx: ReadTransaction,
puts: BTreeMap<Key, Value>,
dels: BTreeSet<Key>,
@@ -30,15 +35,20 @@ pub struct Store<Key, Value> {
/// Use default if will read
const DEFAULT_BLOOM_FILTER_BITS: Option<u8> = Some(5);
const CHECK_COLLISISONS: bool = true;
// const CHECK_COLLISISONS: bool = true;
const MAJOR_FJALL_VERSION: Version = Version::TWO;
impl<K, V> Store<K, V>
pub fn open_keyspace(path: &Path) -> fjall::Result<TransactionalKeyspace> {
fjall::Config::new(path.join("fjall"))
.max_write_buffer_size(32 * 1024 * 1024)
.open_transactional()
}
impl<'a, K, V> Store<K, V>
where
K: Debug + Clone + Into<ByteView> + TryFrom<ByteView> + Ord + Immutable + IntoBytes,
V: Debug + Clone + Into<ByteView> + TryFrom<ByteView>,
<K as TryFrom<ByteView>>::Error: error::Error + Send + Sync + 'static,
<V as TryFrom<ByteView>>::Error: error::Error + Send + Sync + 'static,
K: Debug + Clone + From<ByteView> + Ord + 'a,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<&'a K> + From<V>,
{
pub fn import(
keyspace: &TransactionalKeyspace,
@@ -65,7 +75,7 @@ where
meta,
name: name.to_owned(),
keyspace: keyspace.clone(),
partition,
partition: Arc::new(ArcSwap::from_pointee(partition)),
rtx,
puts: BTreeMap::new(),
dels: BTreeSet::new(),
@@ -73,39 +83,70 @@ where
})
}
pub fn get(&self, key: &K) -> color_eyre::Result<Option<Value<V>>> {
pub fn get(&self, key: &'a K) -> Result<Option<Value<V>>> {
if let Some(v) = self.puts.get(key) {
Ok(Some(Value::Ref(v)))
} else if let Some(slice) = self.rtx.get(&self.partition, key.as_bytes())? {
Ok(Some(Value::Owned(V::try_from(slice.as_bytes().into())?)))
} else if let Some(slice) = self.rtx.get(&self.partition.load(), ByteView::from(key))? {
Ok(Some(Value::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
}
}
pub fn get_mut_or_default(&mut self, key: &K) -> &mut V
pub fn first_key_value(&self) -> Result<Option<(K, V)>> {
Ok(self
.rtx
.first_key_value(&self.partition.load())?
.map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v)))))
}
pub fn last_key_value(&self) -> Result<Option<(K, V)>> {
Ok(self
.rtx
.last_key_value(&self.partition.load())?
.map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v)))))
}
pub fn puts_entry_or_default(&mut self, key: &'a K) -> &mut V
where
V: Default,
{
self.puts.entry(key.clone()).or_insert_with(|| {
if let Some(slice) = self.rtx.get(&self.partition, key.as_bytes()).unwrap() {
V::try_from(slice.as_bytes().into()).unwrap()
} else {
V::default()
}
})
self.puts.entry(key.clone()).or_default()
}
pub fn unordered_clone_iter(&self) -> impl Iterator<Item = (K, V)> {
pub fn tx_iter(&self) -> impl Iterator<Item = (K, V)> {
self.rtx
.iter(&self.partition)
.iter(&self.partition.load())
.map(|res| res.unwrap())
.map(|(k, v)| (K::try_from(ByteView::from(k)).unwrap(), v))
.filter(|(k, _)| !self.puts.contains_key(k) && !self.dels.contains(k))
.map(|(k, v)| (k, V::try_from(ByteView::from(v)).unwrap()))
.chain(self.puts.iter().map(|(k, v)| (k.clone(), v.clone())))
.map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v))))
}
pub fn puts_iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.puts.iter()
}
pub fn clone_puts(&self) -> BTreeMap<K, V> {
self.puts.clone()
}
pub fn append_puts(&mut self, mut other: BTreeMap<K, V>) {
self.puts.append(&mut other);
}
pub fn copy_db_to_puts(&mut self) {
self.append_puts(self.tx_iter().collect());
}
// pub fn unordered_clone_iter(&self) -> impl Iterator<Item = (K, V)> {
// self.rtx
// .keys(&self.partition.load())
// .map(|res| res.unwrap())
// .map(|k| K::from(ByteView::from(k)))
// .filter(|k| !self.puts.contains_key(k) && !self.dels.contains(k))
// .map(|k| (k, self.rtx.get(partition, key) V::from(ByteView::from(v))))
// .chain(self.puts.iter().map(|(k, v)| (k.clone(), v.clone())))
// }
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
@@ -153,30 +194,28 @@ where
let mut wtx = self.keyspace.write_tx();
let partition = &self.partition.load();
mem::take(&mut self.dels)
.into_iter()
.for_each(|key| wtx.remove(&self.partition, key.as_bytes()));
.for_each(|key| wtx.remove(partition, ByteView::from(key)));
mem::take(&mut self.puts)
.into_iter()
.for_each(|(key, value)| {
if CHECK_COLLISISONS {
#[allow(unused_must_use)]
if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) {
dbg!(
&key,
V::try_from(value.as_bytes().into()).unwrap(),
&self.meta,
self.rtx.get(&self.partition, key.as_bytes())
);
unreachable!();
}
}
wtx.insert(
&self.partition,
key.as_bytes(),
&*ByteView::try_from(value).unwrap(),
)
// if CHECK_COLLISISONS {
// #[allow(unused_must_use)]
// if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) {
// dbg!(
// &key,
// V::try_from(value.as_bytes().into()).unwrap(),
// &self.meta,
// self.rtx.get(&self.partition, key.as_bytes())
// );
// unreachable!();
// }
// }
wtx.insert(partition, ByteView::from(key), ByteView::from(value))
});
wtx.commit()?;
@@ -187,7 +226,7 @@ where
}
pub fn rotate_memtable(&self) {
let _ = self.partition.inner().rotate_memtable();
let _ = self.partition.load().inner().rotate_memtable();
}
pub fn height(&self) -> Option<Height> {
@@ -225,10 +264,23 @@ where
}
pub fn reset_partition(&mut self) -> Result<()> {
self.keyspace.delete_partition(self.partition.clone())?;
let partition = Arc::try_unwrap(self.partition.swap(unsafe {
#[allow(clippy::uninit_assumed_init, invalid_value)]
mem::MaybeUninit::uninit().assume_init()
}))
.ok()
.unwrap();
self.keyspace.delete_partition(partition)?;
self.keyspace.persist(PersistMode::SyncAll)?;
self.partition =
Self::open_partition_handle(&self.keyspace, &self.name, self.bloom_filter_bits)?;
self.partition.store(Arc::new(Self::open_partition_handle(
&self.keyspace,
&self.name,
self.bloom_filter_bits,
)?));
Ok(())
}
}

View File

@@ -3,9 +3,8 @@ use std::{
path::{Path, PathBuf},
};
use brk_core::{Result, Version};
use brk_core::{Result, Version, copy_first_8bytes};
use fjall::{TransactionalKeyspace, TransactionalPartitionHandle};
use zerocopy::{FromBytes, IntoBytes};
use super::Height;
@@ -48,7 +47,7 @@ impl StoreMeta {
pathbuf: path.to_owned(),
version,
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
len: Self::read_length_(path)?,
len: Self::read_length_(path),
};
slf.version.write(&slf.path_version())?;
@@ -109,16 +108,16 @@ impl StoreMeta {
path.join("height")
}
fn read_length_(path: &Path) -> Result<usize> {
Ok(fs::read(Self::path_length(path))
.map(|v| usize::read_from_bytes(v.as_slice()).unwrap_or_default())
.unwrap_or_default())
fn read_length_(path: &Path) -> usize {
fs::read(Self::path_length(path))
.map(|v| usize::from_ne_bytes(copy_first_8bytes(v.as_slice()).unwrap_or_default()))
.unwrap_or_default()
}
fn write_length(&self) -> io::Result<()> {
Self::write_length_(&self.pathbuf, self.len)
}
fn write_length_(path: &Path, len: usize) -> io::Result<()> {
fs::write(Self::path_length(path), len.as_bytes())
fs::write(Self::path_length(path), len.to_ne_bytes())
}
fn path_length(path: &Path) -> PathBuf {
path.join("length")