mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-06-12 16:03:31 -07:00
store: faster everything
This commit is contained in:
@@ -7,7 +7,7 @@ use brk_structs::{
|
||||
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
|
||||
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
|
||||
};
|
||||
use fjall2::{PersistMode, TransactionalKeyspace};
|
||||
use fjall2::{Keyspace, PersistMode};
|
||||
use rayon::prelude::*;
|
||||
use vecdb::{AnyVec, StoredIndex, VecIterator};
|
||||
|
||||
@@ -17,7 +17,7 @@ use super::Vecs;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Stores {
|
||||
pub keyspace: TransactionalKeyspace,
|
||||
pub keyspace: Keyspace,
|
||||
|
||||
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
|
||||
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
|
||||
@@ -361,8 +361,9 @@ impl Stores {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.blockhashprefix_to_height.reset()?;
|
||||
self.addressbyteshash_to_typeindex.reset()?;
|
||||
unreachable!();
|
||||
// self.blockhashprefix_to_height.reset()?;
|
||||
// self.addressbyteshash_to_typeindex.reset()?;
|
||||
}
|
||||
|
||||
if starting_indexes.txindex != TxIndex::ZERO {
|
||||
@@ -384,11 +385,12 @@ impl Stores {
|
||||
}
|
||||
});
|
||||
} else {
|
||||
self.txidprefix_to_txindex.reset()?;
|
||||
unreachable!();
|
||||
// self.txidprefix_to_txindex.reset()?;
|
||||
}
|
||||
|
||||
if starting_indexes.txoutindex != TxOutIndex::ZERO {
|
||||
// todo!();
|
||||
todo!();
|
||||
// let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter();
|
||||
// vecs.txoutindex_to_outputtype
|
||||
// .iter_at(starting_indexes.txoutindex)
|
||||
@@ -404,12 +406,13 @@ impl Stores {
|
||||
// .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex)));
|
||||
// });
|
||||
} else {
|
||||
self.addresstype_to_typeindex_and_txindex
|
||||
.iter_mut()
|
||||
.try_for_each(|s| s.reset())?;
|
||||
self.addresstype_to_typeindex_and_unspentoutpoint
|
||||
.iter_mut()
|
||||
.try_for_each(|s| s.reset())?;
|
||||
unreachable!();
|
||||
// self.addresstype_to_typeindex_and_txindex
|
||||
// .iter_mut()
|
||||
// .try_for_each(|s| s.reset())?;
|
||||
// self.addresstype_to_typeindex_and_unspentoutpoint
|
||||
// .iter_mut()
|
||||
// .try_for_each(|s| s.reset())?;
|
||||
}
|
||||
|
||||
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::path::Path;
|
||||
// use std::path::Path;
|
||||
|
||||
use brk_error::Result;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let p = Path::new("./examples/_fjall");
|
||||
// let p = Path::new("./examples/_fjall");
|
||||
|
||||
// let _keyspace = brk_store::open_keyspace(p)?;
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use brk_structs::{Height, Version};
|
||||
pub trait AnyStore: Send + Sync {
|
||||
fn commit(&mut self, height: Height) -> Result<()>;
|
||||
fn persist(&self) -> Result<()>;
|
||||
fn reset(&mut self) -> Result<()>;
|
||||
// fn reset(&mut self) -> Result<()>;
|
||||
fn name(&self) -> &'static str;
|
||||
fn height(&self) -> Option<Height>;
|
||||
fn has(&self, height: Height) -> bool;
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
mod any;
|
||||
mod v2;
|
||||
mod v3;
|
||||
// mod v3;
|
||||
|
||||
pub use any::*;
|
||||
pub use v2::*;
|
||||
pub use v3::*;
|
||||
// pub use v3::*;
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::{
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_structs::Version;
|
||||
use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
|
||||
use fjall2::{Keyspace, PartitionHandle, PersistMode};
|
||||
|
||||
use super::Height;
|
||||
|
||||
@@ -18,13 +18,13 @@ pub struct StoreMeta {
|
||||
|
||||
impl StoreMeta {
|
||||
pub fn checked_open<F>(
|
||||
keyspace: &TransactionalKeyspace,
|
||||
keyspace: &Keyspace,
|
||||
path: &Path,
|
||||
version: Version,
|
||||
open_partition_handle: F,
|
||||
) -> Result<(Self, TransactionalPartitionHandle)>
|
||||
) -> Result<(Self, PartitionHandle)>
|
||||
where
|
||||
F: Fn() -> Result<TransactionalPartitionHandle>,
|
||||
F: Fn() -> Result<PartitionHandle>,
|
||||
{
|
||||
fs::create_dir_all(path)?;
|
||||
|
||||
@@ -60,10 +60,6 @@ impl StoreMeta {
|
||||
height.write(&self.path_height())
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.height.take();
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.pathbuf
|
||||
}
|
||||
|
||||
@@ -1,42 +1,35 @@
|
||||
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path, sync::Arc};
|
||||
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path};
|
||||
|
||||
use brk_error::Result;
|
||||
use brk_structs::{Height, Version};
|
||||
use byteview6::ByteView;
|
||||
use fjall2::{
|
||||
PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace,
|
||||
TransactionalPartitionHandle,
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use fjall2::{InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode};
|
||||
use rustc_hash::{FxHashMap, FxHashSet};
|
||||
|
||||
use crate::any::AnyStore;
|
||||
|
||||
mod meta;
|
||||
|
||||
use log::info;
|
||||
use meta::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StoreV2<Key, Value> {
|
||||
meta: StoreMeta,
|
||||
name: &'static str,
|
||||
keyspace: TransactionalKeyspace,
|
||||
partition: Arc<RwLock<Option<TransactionalPartitionHandle>>>,
|
||||
rtx: Arc<RwLock<Option<ReadTransaction>>>,
|
||||
keyspace: Keyspace,
|
||||
// no, make this faster
|
||||
// no ! remove it altogether, reset too
|
||||
partition: PartitionHandle,
|
||||
puts: FxHashMap<Key, Value>,
|
||||
dels: FxHashSet<Key>,
|
||||
bloom_filters: Option<bool>,
|
||||
}
|
||||
|
||||
// const CHECK_COLLISIONS: bool = true;
|
||||
const MAJOR_FJALL_VERSION: Version = Version::TWO;
|
||||
|
||||
pub fn open_keyspace(path: &Path) -> fjall2::Result<TransactionalKeyspace> {
|
||||
pub fn open_keyspace(path: &Path) -> fjall2::Result<Keyspace> {
|
||||
fjall2::Config::new(path.join("fjall"))
|
||||
// .cache_size(1024 * 1024 * 1024) // for tests only
|
||||
.max_write_buffer_size(32 * 1024 * 1024)
|
||||
.open_transactional()
|
||||
.open()
|
||||
}
|
||||
|
||||
impl<K, V> StoreV2<K, V>
|
||||
@@ -46,12 +39,11 @@ where
|
||||
ByteView: From<K> + From<V>,
|
||||
{
|
||||
fn open_partition_handle(
|
||||
keyspace: &TransactionalKeyspace,
|
||||
keyspace: &Keyspace,
|
||||
name: &str,
|
||||
bloom_filters: Option<bool>,
|
||||
) -> Result<TransactionalPartitionHandle> {
|
||||
) -> Result<PartitionHandle> {
|
||||
let mut options = PartitionCreateOptions::default()
|
||||
// .max_memtable_size(64 * 1024 * 1024) // for tests only
|
||||
.max_memtable_size(8 * 1024 * 1024)
|
||||
.manual_journal_persist(true);
|
||||
|
||||
@@ -63,7 +55,7 @@ where
|
||||
}
|
||||
|
||||
pub fn import(
|
||||
keyspace: &TransactionalKeyspace,
|
||||
keyspace: &Keyspace,
|
||||
path: &Path,
|
||||
name: &str,
|
||||
version: Version,
|
||||
@@ -83,17 +75,13 @@ where
|
||||
},
|
||||
)?;
|
||||
|
||||
let rtx = keyspace.read_tx();
|
||||
|
||||
Ok(Self {
|
||||
meta,
|
||||
name: Box::leak(Box::new(name.to_string())),
|
||||
keyspace: keyspace.clone(),
|
||||
partition: Arc::new(RwLock::new(Some(partition))),
|
||||
rtx: Arc::new(RwLock::new(Some(rtx))),
|
||||
partition,
|
||||
puts: FxHashMap::default(),
|
||||
dels: FxHashSet::default(),
|
||||
bloom_filters,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -104,13 +92,7 @@ where
|
||||
{
|
||||
if let Some(v) = self.puts.get(key) {
|
||||
Ok(Some(Cow::Borrowed(v)))
|
||||
} else if let Some(slice) = self
|
||||
.rtx
|
||||
.read()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get(self.partition.read().as_ref().unwrap(), ByteView::from(key))?
|
||||
{
|
||||
} else if let Some(slice) = self.partition.get(ByteView::from(key))? {
|
||||
Ok(Some(Cow::Owned(V::from(ByteView::from(&*slice)))))
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -118,20 +100,12 @@ where
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> Result<bool> {
|
||||
self.rtx
|
||||
.read()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.is_empty(self.partition.read().as_ref().unwrap())
|
||||
.map_err(|e| e.into())
|
||||
self.partition.is_empty().map_err(|e| e.into())
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
|
||||
self.rtx
|
||||
.read()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.iter(self.partition.read().as_ref().unwrap())
|
||||
self.partition
|
||||
.iter()
|
||||
.map(|res| res.unwrap())
|
||||
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
|
||||
}
|
||||
@@ -146,7 +120,6 @@ where
|
||||
|
||||
#[inline]
|
||||
pub fn remove(&mut self, key: K) {
|
||||
// Hot path: key was recently inserted
|
||||
if self.puts.remove(&key).is_some() {
|
||||
return;
|
||||
}
|
||||
@@ -162,19 +135,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// pub fn retain_or_del<F>(&mut self, retain: F)
|
||||
// where
|
||||
// F: Fn(&K, &mut V) -> bool,
|
||||
// {
|
||||
// self.puts.retain(|k, v| {
|
||||
// let ret = retain(k, v);
|
||||
// if !ret {
|
||||
// self.dels.insert(k.clone());
|
||||
// }
|
||||
// ret
|
||||
// });
|
||||
// }
|
||||
|
||||
#[inline]
|
||||
fn has(&self, height: Height) -> bool {
|
||||
self.meta.has(height)
|
||||
@@ -204,27 +164,17 @@ where
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut rtx = self.rtx.write();
|
||||
let _ = rtx.take();
|
||||
let mut items = self
|
||||
.puts
|
||||
.drain()
|
||||
.map(|(key, value)| InnerItem::Value { key, value })
|
||||
.chain(self.dels.drain().map(|key| InnerItem::WeakTombstone(key)))
|
||||
.collect::<Vec<_>>();
|
||||
items.sort_unstable();
|
||||
|
||||
let mut wtx = self.keyspace.write_tx();
|
||||
|
||||
let partition = self.partition.read();
|
||||
|
||||
let partition = partition.as_ref().unwrap();
|
||||
|
||||
wtx.remove_batch(partition, self.dels.drain().map(ByteView::from));
|
||||
|
||||
wtx.insert_batch(
|
||||
partition,
|
||||
self.puts
|
||||
.drain()
|
||||
.map(|(k, v)| (ByteView::from(k), ByteView::from(v))),
|
||||
);
|
||||
|
||||
wtx.commit()?;
|
||||
|
||||
rtx.replace(self.keyspace.read_tx());
|
||||
self.keyspace
|
||||
.batch()
|
||||
.commit_single_partition(&self.partition, items)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -239,24 +189,6 @@ where
|
||||
self.name
|
||||
}
|
||||
|
||||
fn reset(&mut self) -> Result<()> {
|
||||
info!("Resetting {}...", self.name);
|
||||
|
||||
let mut opt = self.partition.write();
|
||||
|
||||
let partition = opt.take().unwrap();
|
||||
|
||||
self.keyspace.delete_partition(partition)?;
|
||||
|
||||
self.meta.reset();
|
||||
|
||||
let partition = Self::open_partition_handle(&self.keyspace, self.name, self.bloom_filters)?;
|
||||
|
||||
opt.replace(partition);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn height(&self) -> Option<Height> {
|
||||
self.meta.height()
|
||||
}
|
||||
|
||||
@@ -217,26 +217,26 @@ where
|
||||
self.name
|
||||
}
|
||||
|
||||
fn reset(&mut self) -> Result<()> {
|
||||
info!("Resetting {}...", self.name);
|
||||
// fn reset(&mut self) -> Result<()> {
|
||||
// info!("Resetting {}...", self.name);
|
||||
|
||||
todo!();
|
||||
// todo!();
|
||||
|
||||
let mut opt = self.keyspace.write();
|
||||
// let mut opt = self.keyspace.write();
|
||||
|
||||
let keyspace = opt.take().unwrap();
|
||||
// let keyspace = opt.take().unwrap();
|
||||
|
||||
// Doesn't exist yet
|
||||
// self.database.remove_keyspace(keyspace)?;
|
||||
// // Doesn't exist yet
|
||||
// // self.database.remove_keyspace(keyspace)?;
|
||||
|
||||
self.meta.reset();
|
||||
// self.meta.reset();
|
||||
|
||||
let keyspace = Self::open_keyspace(&self.database, self.name)?;
|
||||
// let keyspace = Self::open_keyspace(&self.database, self.name)?;
|
||||
|
||||
opt.replace(keyspace);
|
||||
// opt.replace(keyspace);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
fn height(&self) -> Option<Height> {
|
||||
self.meta.height()
|
||||
|
||||
Reference in New Issue
Block a user