global: snapshot

This commit is contained in:
nym21
2025-06-06 16:08:20 +02:00
parent a11bf5523b
commit cc0f9c42df
12 changed files with 162 additions and 119 deletions

View File

@@ -31,6 +31,7 @@ pub struct Store<Key, Value> {
puts: BTreeMap<Key, Value>,
dels: BTreeSet<Key>,
bloom_filter_bits: Option<Option<u8>>,
override_partition: bool,
}
/// Use default if will read
@@ -80,6 +81,7 @@ where
puts: BTreeMap::new(),
dels: BTreeSet::new(),
bloom_filter_bits,
override_partition: false,
})
}
@@ -119,9 +121,20 @@ where
where
V: Default,
{
if !self.dels.is_empty() {
self.dels.remove(key);
}
self.puts.entry(key.clone()).or_default()
}
pub fn puts_remove(&mut self, key: &K) -> Option<V> {
self.puts.remove(key)
}
pub fn dels_insert(&mut self, key: K) -> bool {
self.dels.insert(key)
}
pub fn tx_iter(&self) -> impl Iterator<Item = (K, V)> {
self.rtx
.iter(&self.partition.load())
@@ -142,7 +155,9 @@ where
}
pub fn copy_db_to_puts(&mut self) {
self.override_partition = true;
self.append_puts(self.tx_iter().collect());
self.meta.reset_len();
}
// pub fn unordered_clone_iter(&self) -> impl Iterator<Item = (K, V)> {
@@ -180,18 +195,18 @@ where
}
}
pub fn retain_or_del<F>(&mut self, retain: F)
where
F: Fn(&K, &mut V) -> bool,
{
self.puts.retain(|k, v| {
let ret = retain(k, v);
if !ret {
self.dels.insert(k.clone());
}
ret
});
}
// pub fn retain_or_del<F>(&mut self, retain: F)
// where
// F: Fn(&K, &mut V) -> bool,
// {
// self.puts.retain(|k, v| {
// let ret = retain(k, v);
// if !ret {
// self.dels.insert(k.clone());
// }
// ret
// });
// }
pub fn commit(&mut self, height: Height) -> Result<()> {
if self.has(height) && self.puts.is_empty() && self.dels.is_empty() {
@@ -200,6 +215,10 @@ where
self.meta.export(self.len(), height)?;
if self.override_partition {
self.reset_partition()?;
}
let mut wtx = self.keyspace.write_tx();
let partition = &self.partition.load();
@@ -242,7 +261,18 @@ where
}
pub fn len(&self) -> usize {
self.meta.len() + self.puts.len() - self.dels.len()
let len = self.meta.len() + self.puts.len() - self.dels.len();
if len > 18440000000000000000 {
dbg!((
len,
self.meta.path(),
self.meta.len(),
self.puts.len(),
&self.dels,
));
unreachable!()
}
len
}
pub fn is_empty(&self) -> bool {
self.len() == 0
@@ -308,6 +338,7 @@ where
puts: self.puts.clone(),
dels: self.dels.clone(),
bloom_filter_bits: self.bloom_filter_bits,
override_partition: self.override_partition,
}
}
}

View File

@@ -36,18 +36,28 @@ impl StoreMeta {
let mut partition = open_partition_handle()?;
let mut did_reset = false;
if !is_same_version {
did_reset = true;
Self::reset_(path)?;
keyspace.delete_partition(partition)?;
keyspace.persist(fjall::PersistMode::SyncAll)?;
partition = open_partition_handle()?;
}
let len = Self::read_length_(path);
if did_reset && len != 0 {
dbg!(&path);
unreachable!();
}
let slf = Self {
pathbuf: path.to_owned(),
version,
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
len: Self::read_length_(path),
len,
};
slf.version.write(&slf.path_version())?;
@@ -58,6 +68,10 @@ impl StoreMeta {
pub fn len(&self) -> usize {
self.len
}
pub fn reset_len(&mut self) {
self.len = 0
}
// pub fn is_empty(&self) -> bool {
// self.len() == 0
// }
@@ -110,7 +124,7 @@ impl StoreMeta {
fn read_length_(path: &Path) -> usize {
fs::read(Self::path_length(path))
.map(|v| usize::from_ne_bytes(copy_first_8bytes(v.as_slice()).unwrap_or_default()))
.map(|v| usize::from_ne_bytes(copy_first_8bytes(v.as_slice()).unwrap()))
.unwrap_or_default()
}
fn write_length(&self) -> io::Result<()> {