global: fully replace fjall2 by fjall3

This commit is contained in:
nym21
2025-12-10 17:36:12 +01:00
parent 998db1beed
commit abde9ed162
19 changed files with 447 additions and 1299 deletions
Generated
+10 -110
View File
@@ -644,7 +644,6 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
"brk_fjall",
"fjall",
"jiff",
"minreq",
@@ -665,23 +664,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "brk_fjall"
version = "2.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da285ef974591f84284f6ab500dfd903c7a9bf8674e2c17f8b35a44a20726ee1"
dependencies = [
"byteorder",
"byteview 0.6.1",
"dashmap",
"log",
"lsm-tree 2.10.4",
"path-absolutize",
"std-semaphore",
"tempfile",
"xxhash-rust",
]
[[package]]
name = "brk_grouper"
version = "0.0.111"
@@ -700,7 +682,6 @@ dependencies = [
"bitcoin",
"brk_bencher",
"brk_error",
"brk_fjall",
"brk_grouper",
"brk_iterator",
"brk_logger",
@@ -1241,10 +1222,8 @@ name = "brk_store"
version = "0.0.111"
dependencies = [
"brk_error",
"brk_fjall",
"brk_types",
"byteview 0.6.1",
"byteview 0.9.1",
"byteview",
"fjall",
"rustc-hash",
]
@@ -1287,7 +1266,7 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"brk_error",
"byteview 0.9.1",
"byteview",
"derive_deref",
"itoa",
"jiff",
@@ -1356,12 +1335,6 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
[[package]]
name = "byteview"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5"
[[package]]
name = "byteview"
version = "0.9.1"
@@ -1943,12 +1916,6 @@ dependencies = [
"libloading",
]
[[package]]
name = "double-ended-peekable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
name = "dragonbox_ecma"
version = "0.0.5"
@@ -2113,14 +2080,16 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "fjall"
version = "3.0.0-rc.5"
version = "3.0.0-rc.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff63be4348f42ed3c0c50175785ff14d0a833915c6b499a31f91d0e3ec5fc337"
dependencies = [
"byteorder-lite",
"byteview 0.9.1",
"byteview",
"dashmap",
"flume",
"log",
"lsm-tree 3.0.0-rc.5",
"lsm-tree",
"lz4_flex 0.11.5",
"tempfile",
"xxhash-rust",
@@ -2404,12 +2373,6 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "guardian"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f"
[[package]]
name = "half"
version = "2.7.1"
@@ -2997,34 +2960,12 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "lsm-tree"
version = "2.10.4"
version = "3.0.0-rc.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15"
dependencies = [
"byteorder",
"crossbeam-skiplist",
"double-ended-peekable",
"enum_dispatch",
"guardian",
"interval-heap",
"log",
"lz4_flex 0.11.5",
"path-absolutize",
"quick_cache",
"rustc-hash",
"self_cell",
"tempfile",
"value-log",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "lsm-tree"
version = "3.0.0-rc.5"
checksum = "315d36f307af4d53f1030d6561de3fb6b914d5c242c353be101ddff91527c4b4"
dependencies = [
"byteorder-lite",
"byteview 0.9.1",
"byteview",
"crossbeam-skiplist",
"enum_dispatch",
"interval-heap",
@@ -3850,24 +3791,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "path-absolutize"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5"
dependencies = [
"path-dedot",
]
[[package]]
name = "path-dedot"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397"
dependencies = [
"once_cell",
]
[[package]]
name = "pathdiff"
version = "0.2.3"
@@ -4847,12 +4770,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "std-semaphore"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e"
[[package]]
name = "str_indices"
version = "0.4.4"
@@ -5430,23 +5347,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "value-log"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c"
dependencies = [
"byteorder",
"byteview 0.6.1",
"interval-heap",
"log",
"path-absolutize",
"rustc-hash",
"tempfile",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "value-trait"
version = "0.12.1"
+2 -4
View File
@@ -57,13 +57,11 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" }
brk_types = { version = "0.0.111", path = "crates/brk_types" }
brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["pco", "derive"] }
brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" }
# byteview = "=0.6.1"
byteview = "0.9.1"
color-eyre = "0.6.5"
derive_deref = "1.1.1"
fjall2 = { version = "2.11.8", package = "brk_fjall" }
# fjall3 = { version = "3.0.0-rc.5", package = "fjall" }
fjall3 = { path = "../fjall3", package = "fjall" }
fjall = "3.0.0-rc.6"
# fjall3 = { path = "../fjall3", package = "fjall" }
# fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "434979ef59d8fd2b36b91e6ff759a36d19a397ee", package = "fjall" }
jiff = "0.2.16"
log = "0.4.29"
+1 -2
View File
@@ -11,8 +11,7 @@ build = "build.rs"
[dependencies]
bitcoin = { workspace = true }
bitcoincore-rpc = { workspace = true }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
fjall = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
serde_json = { workspace = true }
+5 -14
View File
@@ -12,8 +12,7 @@ pub enum Error {
IO(io::Error),
BitcoinRPC(bitcoincore_rpc::Error),
Jiff(jiff::Error),
FjallV2(fjall2::Error),
FjallV3(fjall3::Error),
Fjall(fjall::Error),
VecDB(vecdb::Error),
RawDB(vecdb::RawDBError),
Minreq(minreq::Error),
@@ -142,17 +141,10 @@ impl From<jiff::Error> for Error {
}
}
impl From<fjall3::Error> for Error {
impl From<fjall::Error> for Error {
#[inline]
fn from(value: fjall3::Error) -> Self {
Self::FjallV3(value)
}
}
impl From<fjall2::Error> for Error {
#[inline]
fn from(value: fjall2::Error) -> Self {
Self::FjallV2(value)
fn from(value: fjall::Error) -> Self {
Self::Fjall(value)
}
}
@@ -172,8 +164,7 @@ impl fmt::Display for Error {
Error::BitcoinHexError(error) => Display::fmt(&error, f),
Error::BitcoinHexToArrayError(error) => Display::fmt(&error, f),
Error::BitcoinRPC(error) => Display::fmt(&error, f),
Error::FjallV2(error) => Display::fmt(&error, f),
Error::FjallV3(error) => Display::fmt(&error, f),
Error::Fjall(error) => Display::fmt(&error, f),
Error::IO(error) => Display::fmt(&error, f),
Error::Jiff(error) => Display::fmt(&error, f),
Error::Minreq(error) => Display::fmt(&error, f),
+1 -2
View File
@@ -20,8 +20,7 @@ brk_rpc = { workspace = true }
brk_store = { workspace = true }
brk_types = { workspace = true }
brk_traversable = { workspace = true }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
fjall = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
rustc-hash = { workspace = true }
+2 -4
View File
@@ -12,16 +12,14 @@ mod constants;
mod indexes;
mod processor;
mod readers;
// mod stores_v2;
mod stores_v3;
mod stores;
mod vecs;
use constants::*;
pub use indexes::*;
pub use processor::*;
pub use readers::*;
// pub use stores_v2::*;
pub use stores_v3::*;
pub use stores::*;
pub use vecs::*;
#[derive(Clone)]
+1 -1
View File
@@ -221,7 +221,7 @@ impl<'a> BlockProcessor<'a> {
{
txindex
} else {
return Err(Error::Str("Can't find txid = {txid}"));
return Err(Error::UnknownTxid);
};
let txoutindex = self
@@ -2,12 +2,12 @@ use std::{fs, path::Path, time::Instant};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, Kind3, Mode3, StoreFjallV3 as Store};
use brk_store::{AnyStore, Kind, Mode, Store};
use brk_types::{
AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint,
OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
};
use fjall3::{Database, PersistMode};
use fjall::{Database, PersistMode};
use log::info;
use rayon::prelude::*;
use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator};
@@ -36,7 +36,7 @@ impl Stores {
fs::create_dir_all(&pathbuf)?;
let database = match brk_store::open_fjall3_database(path) {
let database = match brk_store::open_database(path) {
Ok(database) => database,
Err(_) => {
fs::remove_dir_all(path)?;
@@ -47,14 +47,13 @@ impl Stores {
let database_ref = &database;
let create_addresshash_to_addressindex_store = |index| {
Store::import_cached(
Store::import(
database_ref,
path,
&format!("h2i{}", index),
version,
Mode3::PushOnly,
Kind3::Random,
10,
Mode::PushOnly,
Kind::Random,
)
};
@@ -64,8 +63,8 @@ impl Stores {
path,
&format!("a2t{}", index),
version,
Mode3::PushOnly,
Kind3::Vec,
Mode::PushOnly,
Kind::Vec,
)
};
@@ -75,8 +74,8 @@ impl Stores {
path,
&format!("a2u{}", index),
version,
Mode3::Any,
Kind3::Vec,
Mode::Any,
Kind::Vec,
)
};
@@ -88,8 +87,8 @@ impl Stores {
path,
"height_to_coinbase_tag",
version,
Mode3::PushOnly,
Kind3::Sequential,
Mode::PushOnly,
Kind::Sequential,
)?,
addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index(
create_addresshash_to_addressindex_store,
@@ -105,17 +104,17 @@ impl Stores {
path,
"blockhashprefix_to_height",
version,
Mode3::PushOnly,
Kind3::Random,
Mode::PushOnly,
Kind::Random,
)?,
txidprefix_to_txindex: Store::import_cached(
database_ref,
path,
"txidprefix_to_txindex",
version,
Mode3::PushOnly,
Kind3::Random,
10,
Mode::PushOnly,
Kind::Recent,
5,
)?,
})
}
@@ -170,7 +169,7 @@ impl Stores {
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
) // Changed from par_iter_mut()
.try_for_each(|store| store.commit_f3(height))?;
.try_for_each(|store| store.commit(height))?;
info!("Commits done in {:?}", i.elapsed());
let i = Instant::now();
-344
View File
@@ -1,344 +0,0 @@
use std::{fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, Mode, StoreFjallV2 as Store, Type};
use brk_types::{
AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, OutPoint,
OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
};
use fjall2::{CompressionType as Compression, PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, TypedVecIterator, VecIndex, VecIterator};
use crate::Indexes;
use super::Vecs;
#[derive(Clone)]
pub struct Stores {
pub keyspace: TransactionalKeyspace,
pub addresstype_to_addresshash_to_addressindex: ByAddressType<Store<AddressHash, TypeIndex>>,
pub addresstype_to_addressindex_and_txindex: ByAddressType<Store<AddressIndexTxIndex, Unit>>,
pub addresstype_to_addressindex_and_unspentoutpoint:
ByAddressType<Store<AddressIndexOutPoint, Unit>>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
}
impl Stores {
pub fn forced_import(parent: &Path, version: Version) -> Result<Self> {
let pathbuf = parent.join("stores");
let path = pathbuf.as_path();
fs::create_dir_all(&pathbuf)?;
let keyspace = match brk_store::open_keyspace(path) {
Ok(keyspace) => keyspace,
Err(_) => {
fs::remove_dir_all(path)?;
return Self::forced_import(path, version);
}
};
let keyspace_ref = &keyspace;
let create_addresshash_to_addressindex_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("h2i{}", index),
version,
Mode::UniquePushOnly(Type::Random),
Compression::Lz4,
)
};
let create_addressindex_to_txindex_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("a2t{}", index),
version,
Mode::VecLike,
Compression::Lz4,
)
};
let create_addressindex_to_unspentoutpoint_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("a2u{}", index),
version,
Mode::VecLike,
Compression::Lz4,
)
};
Ok(Self {
keyspace: keyspace.clone(),
height_to_coinbase_tag: Store::import(
keyspace_ref,
path,
"h2c",
version,
Mode::UniquePushOnly(Type::Sequential),
Compression::Lz4,
)?,
addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index(
create_addresshash_to_addressindex_store,
)?,
blockhashprefix_to_height: Store::import(
keyspace_ref,
path,
"b2h",
version,
Mode::UniquePushOnly(Type::Random),
Compression::Lz4,
)?,
txidprefix_to_txindex: Store::import(
keyspace_ref,
path,
"t2t",
version,
Mode::UniquePushOnly(Type::Random),
Compression::Lz4,
)?,
addresstype_to_addressindex_and_txindex: ByAddressType::new_with_index(
create_addressindex_to_txindex_store,
)?,
addresstype_to_addressindex_and_unspentoutpoint: ByAddressType::new_with_index(
create_addressindex_to_unspentoutpoint_store,
)?,
})
}
pub fn starting_height(&self) -> Height {
[
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.values()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.values()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.values()
.map(|s| s as &dyn AnyStore),
)
.map(|store| store.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
let tuples = [
&mut self.blockhashprefix_to_height as &mut dyn AnyStore,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
]
.into_par_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.par_values_mut()
.map(|s| s as &mut dyn AnyStore),
)
.map(|store| {
let items = store.take_all_f2();
store.export_meta_if_needed(height)?;
Ok((store.partition(), items))
})
.collect::<Result<Vec<_>>>()?;
self.keyspace.inner().batch().commit_partitions(tuples)?;
self.keyspace
.persist(PersistMode::SyncData)
.map_err(|e| e.into())
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
starting_indexes: &Indexes,
) -> Result<()> {
if self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()?
&& self.height_to_coinbase_tag.is_empty()?
&& self
.addresstype_to_addresshash_to_addressindex
.values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
&& self
.addresstype_to_addressindex_and_txindex
.values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
&& self
.addresstype_to_addressindex_and_unspentoutpoint
.values()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
{
return Ok(());
}
if starting_indexes.height != Height::ZERO {
vecs.height_to_blockhash
.iter()?
.skip(starting_indexes.height.to_usize())
.map(BlockHashPrefix::from)
.for_each(|prefix| {
self.blockhashprefix_to_height.remove(prefix);
});
(starting_indexes.height.to_usize()..vecs.height_to_blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
});
// Remove address hashes for all address types starting from rollback height
for address_type in [
OutputType::P2PK65,
OutputType::P2PK33,
OutputType::P2PKH,
OutputType::P2SH,
OutputType::P2WPKH,
OutputType::P2WSH,
OutputType::P2TR,
OutputType::P2A,
] {
for hash in vecs.iter_address_hashes_from(address_type, starting_indexes.height)? {
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(address_type)
.remove(hash);
}
}
} else {
unreachable!();
}
if starting_indexes.txindex != TxIndex::ZERO {
vecs.txindex_to_txid
.iter()?
.enumerate()
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
let is_known_dup =
crate::DUPLICATE_TXID_PREFIXES
.iter()
.any(|(dup_prefix, dup_txindex)| {
txindex == *dup_txindex && txidprefix == *dup_prefix
});
if !is_known_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
});
} else {
unreachable!();
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?;
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
vecs.txoutindex_to_outputtype
.iter()?
.enumerate()
.skip(starting_indexes.txoutindex.to_usize())
.zip(
vecs.txoutindex_to_typeindex
.iter()?
.skip(starting_indexes.txoutindex.to_usize()),
)
.filter(|((_, outputtype), _)| outputtype.is_address())
.for_each(|((txoutindex, addresstype), addressindex)| {
let txindex = txoutindex_to_txindex_iter.get_at_unwrap(txoutindex);
self.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.remove(AddressIndexTxIndex::from((addressindex, txindex)));
let vout = Vout::from(
txoutindex.to_usize()
- txindex_to_first_txoutindex_iter
.get_unwrap(txindex)
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
});
// Add back outputs that were spent after the rollback point
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?;
let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?;
vecs.txinindex_to_outpoint
.iter()?
.skip(starting_indexes.txinindex.to_usize())
.for_each(|outpoint| {
if outpoint.is_coinbase() {
return;
}
let txindex = outpoint.txindex();
let vout = outpoint.vout();
// Calculate txoutindex from txindex and vout
let txoutindex = txindex_to_first_txoutindex_iter.get_unwrap(txindex) + vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype = txoutindex_to_outputtype_iter.get_unwrap(txoutindex);
if outputtype.is_address() {
let addresstype = outputtype;
let addressindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex);
self.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.remove(AddressIndexTxIndex::from((addressindex, txindex)));
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit);
}
}
});
} else {
unreachable!();
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;
Ok(())
}
}
+2 -4
View File
@@ -13,8 +13,6 @@ build = "build.rs"
[dependencies]
brk_error = { workspace = true }
brk_types = { workspace = true }
byteview_f2 = { version = "=0.6.1", package = "byteview" }
byteview_f3 = { version = "0.9.1", package = "byteview" }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
byteview = { workspace = true }
fjall = { workspace = true }
rustc-hash = { workspace = true }
+3 -6
View File
@@ -1,5 +1,6 @@
use brk_error::Result;
use brk_types::{Height, Version};
use fjall::Keyspace;
pub trait AnyStore: Send + Sync {
fn name(&self) -> &'static str;
@@ -8,10 +9,6 @@ pub trait AnyStore: Send + Sync {
fn needs(&self, height: Height) -> bool;
fn version(&self) -> Version;
fn export_meta_if_needed(&mut self, height: Height) -> Result<()>;
fn keyspace(&self) -> &fjall3::Keyspace;
fn partition(&self) -> &fjall2::PartitionHandle;
fn take_all_f2(&mut self) -> Vec<fjall2::InnerItem>;
fn commit_f3(&mut self, height: Height) -> Result<()>;
// fn take_all_f3(&mut self) -> Vec<fjall3::InnerItem>;
// fn take_all_f3(&mut self) -> Box<dyn Iterator<Item = Item>>;
fn keyspace(&self) -> &Keyspace;
fn commit(&mut self, height: Height) -> Result<()>;
}
-92
View File
@@ -1,92 +0,0 @@
use std::{
fs, io,
path::{Path, PathBuf},
};
use brk_error::Result;
use brk_types::Version;
use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use super::Height;
#[derive(Debug, Clone)]
pub struct StoreMeta {
pathbuf: PathBuf,
version: Version,
height: Option<Height>,
}
impl StoreMeta {
pub fn checked_open<F>(
keyspace: &TransactionalKeyspace,
path: &Path,
version: Version,
open_partition_handle: F,
) -> Result<(Self, TransactionalPartitionHandle)>
where
F: Fn() -> Result<TransactionalPartitionHandle>,
{
fs::create_dir_all(path)?;
let mut partition = open_partition_handle()?;
if Version::try_from(Self::path_version_(path).as_path())
.is_ok_and(|prev_version| version != prev_version)
{
fs::remove_dir_all(path)?;
keyspace.delete_partition(partition)?;
keyspace.persist(PersistMode::SyncAll)?;
fs::create_dir(path)?;
partition = open_partition_handle()?;
}
let slf = Self {
pathbuf: path.to_owned(),
version,
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
};
slf.version.write(&slf.path_version())?;
Ok((slf, partition))
}
pub fn version(&self) -> Version {
self.version
}
pub fn export(&mut self, height: Height) -> io::Result<()> {
self.height = Some(height);
height.write(&self.path_height())
}
pub fn path(&self) -> &Path {
&self.pathbuf
}
fn path_version(&self) -> PathBuf {
Self::path_version_(&self.pathbuf)
}
fn path_version_(path: &Path) -> PathBuf {
path.join("version")
}
#[inline]
pub fn height(&self) -> Option<Height> {
self.height
}
#[inline]
pub fn needs(&self, height: Height) -> bool {
self.height.is_none_or(|self_height| height > self_height)
}
#[inline]
pub fn has(&self, height: Height) -> bool {
!self.needs(height)
}
fn path_height(&self) -> PathBuf {
Self::path_height_(&self.pathbuf)
}
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
}
}
-312
View File
@@ -1,312 +0,0 @@
use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, mem, path::Path};
use brk_error::Result;
use brk_types::{Height, Version};
use byteview_f2::ByteView;
use fjall2::{
CompressionType, InnerItem, PartitionCreateOptions, TransactionalKeyspace,
TransactionalPartitionHandle, ValueType,
};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
mod meta;
use meta::*;
#[derive(Clone)]
pub struct StoreFjallV2<Key, Value> {
meta: StoreMeta,
name: &'static str,
keyspace: TransactionalKeyspace,
partition: TransactionalPartitionHandle,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::TWO;
pub fn open_keyspace(path: &Path) -> fjall2::Result<TransactionalKeyspace> {
fjall2::Config::new(path.join("fjall"))
.manual_journal_persist(true)
.max_write_buffer_size(256 * 1_024 * 1_024)
.open_transactional()
}
impl<K, V> StoreFjallV2<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
{
fn open_partition_handle(
keyspace: &TransactionalKeyspace,
name: &str,
mode: Mode,
compression: CompressionType,
) -> Result<TransactionalPartitionHandle> {
let mut options = PartitionCreateOptions::default()
.compression(compression)
.manual_journal_persist(true);
if mode.is_unique_push_only() {
options = options.bloom_filter_bits(Some(7));
} else {
options = options
.max_memtable_size(8 * 1024 * 1024)
.bloom_filter_bits(None);
}
keyspace.open_partition(name, options).map_err(|e| e.into())
}
pub fn import(
keyspace: &TransactionalKeyspace,
path: &Path,
name: &str,
version: Version,
mode: Mode,
compression: CompressionType,
) -> Result<Self> {
fs::create_dir_all(path)?;
let (meta, partition) = StoreMeta::checked_open(
keyspace,
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_partition_handle(keyspace, name, mode, compression).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
},
)?;
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
keyspace: keyspace.clone(),
partition,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
})
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(slice) = self.partition.get(ByteView::from(key))? {
Ok(Some(Cow::Owned(V::from(ByteView::from(&*slice)))))
} else {
Ok(None)
}
}
pub fn is_empty(&self) -> Result<bool> {
self.keyspace
.read_tx()
.is_empty(&self.partition)
.map_err(|e| e.into())
}
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.keyspace
.read_tx()
.iter(&self.partition)
.map(|res| res.unwrap())
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
self.insert(key, value);
}
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn remove(&mut self, key: K) {
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
#[inline]
pub fn approximate_len(&self) -> usize {
self.partition.approximate_len()
}
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for StoreFjallV2<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash + 'static,
V: Debug + Clone + From<ByteView> + 'static,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn keyspace(&self) -> &fjall3::Keyspace {
panic!()
}
fn partition(&self) -> &fjall2::PartitionHandle {
self.partition.inner()
}
fn take_all_f2(&mut self) -> Vec<InnerItem> {
let mut items = mem::take(&mut self.puts)
.into_iter()
.map(|(key, value)| Item::Value { key, value })
.chain(
mem::take(&mut self.dels)
.into_iter()
.map(|key| Item::Tomb(key)),
)
.collect::<Vec<_>>();
items.sort_unstable();
items.into_iter().map(InnerItem::from).collect()
}
// fn take_all_f3(&mut self) -> Vec<fjall3::InnerItem> {
// panic!()
// }
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
if self.has(height) {
return Ok(());
}
self.meta.export(height)?;
Ok(())
}
fn name(&self) -> &'static str {
self.name
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
fn commit_f3(&mut self, _: Height) -> Result<()> {
Ok(())
}
}
enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
impl<K: Ord, V> Ord for Item<K, V> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
impl<K, V> Item<K, V> {
fn key(&self) -> &K {
match self {
Self::Value { key, .. } | Self::Tomb(key) => key,
}
}
}
impl<K, V> From<Item<K, V>> for InnerItem
where
K: Into<ByteView>,
V: Into<ByteView>,
{
#[inline]
fn from(value: Item<K, V>) -> Self {
match value {
Item::Value { key, value } => Self {
key: key.into().into(),
value: value.into().into(),
value_type: ValueType::Value,
},
Item::Tomb(key) => Self {
key: key.into().into(),
value: [].into(),
value_type: ValueType::Tombstone,
},
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum Mode {
VecLike,
UniquePushOnly(Type),
}
#[derive(Debug, Clone, Copy)]
pub enum Type {
Random,
Sequential,
}
impl Mode {
pub fn is_vec_like(&self) -> bool {
matches!(*self, Self::VecLike)
}
pub fn is_unique_push_only(&self) -> bool {
matches!(*self, Self::UniquePushOnly(_))
}
}
-380
View File
@@ -1,380 +0,0 @@
use std::{borrow::Cow, cmp::Ordering, fmt::Debug, fs, hash::Hash, mem, path::Path};
use brk_error::Result;
use brk_types::{Height, Version};
use byteview_f3::ByteView;
use fjall3::{Database, Keyspace, KeyspaceCreateOptions, config::*};
use rustc_hash::{FxHashMap, FxHashSet};
mod meta;
use meta::*;
use crate::any::AnyStore;
const MAJOR_FJALL_VERSION: Version = Version::new(3);
pub fn open_fjall3_database(path: &Path) -> fjall3::Result<Database> {
Database::builder(path.join("fjall"))
.cache_size(2 * 1024 * 1024 * 1024)
.open()
}
#[derive(Clone)]
pub struct StoreFjallV3<K, V> {
meta: StoreMeta,
name: &'static str,
keyspace: Keyspace,
puts: FxHashMap<K, V>,
dels: FxHashSet<K>,
caches: Vec<FxHashMap<K, V>>,
}
impl<K, V> StoreFjallV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
pub fn import(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode3,
kind: Kind3,
) -> Result<Self> {
Self::import_inner(db, path, name, version, mode, kind, 0)
}
pub fn import_cached(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode3,
kind: Kind3,
max_batches: u8,
) -> Result<Self> {
Self::import_inner(db, path, name, version, mode, kind, max_batches)
}
fn import_inner(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode3,
kind: Kind3,
max_batches: u8,
) -> Result<Self> {
fs::create_dir_all(path)?;
let (meta, keyspace) = StoreMeta::checked_open(
db,
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_keyspace(db, name, mode, kind).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
},
)?;
let mut caches = vec![];
for _ in 0..max_batches {
caches.push(FxHashMap::default());
}
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
keyspace,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
caches,
})
}
fn open_keyspace(
database: &Database,
name: &str,
_mode: Mode3,
kind: Kind3,
) -> Result<Keyspace> {
let mut options = KeyspaceCreateOptions::default()
.manual_journal_persist(true)
.expect_point_read_hits(true)
.filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true]))
.index_block_partitioning_policy(PartitioningPolicy::new([false, false, true]));
if kind.is_not_vec() {
options = options.filter_policy(FilterPolicy::new([
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)),
]));
} else {
options = options
.max_memtable_size(8 * 1024 * 1024)
.filter_policy(FilterPolicy::disabled());
}
if kind.is_sequential() {
options = options
.filter_block_partitioning_policy(PartitioningPolicy::all(true))
.index_block_partitioning_policy(PartitioningPolicy::all(true))
.filter_block_pinning_policy(PinningPolicy::all(false))
.index_block_pinning_policy(PinningPolicy::all(false));
}
database.keyspace(name, || options).map_err(|e| e.into())
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
return Ok(Some(Cow::Borrowed(v)));
}
for cache in &self.caches {
if let Some(v) = cache.get(key) {
return Ok(Some(Cow::Borrowed(v)));
}
}
if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
}
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
self.keyspace.is_empty().map_err(|e| e.into())
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
self.insert(key, value);
}
}
#[inline]
pub fn remove(&mut self, key: K) {
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.keyspace
.iter()
.map(|res| res.into_inner().unwrap())
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
pub fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
if !self.has(height) {
self.meta.export(height)?;
}
Ok(())
}
fn ingest<'a>(
keyspace: &Keyspace,
puts: impl Iterator<Item = (&'a K, &'a V)>,
dels: impl Iterator<Item = &'a K>,
) -> Result<()>
where
ByteView: From<&'a K> + From<&'a V>,
K: 'a,
V: 'a,
{
let mut items: Vec<Item<&'a K, &'a V>> = puts
.map(|(key, value)| Item::Value { key, value })
.chain(dels.map(Item::Tomb))
.collect();
items.sort_unstable();
let mut ingestion = keyspace.start_ingestion()?;
for item in items {
match item {
Item::Value { key, value } => {
ingestion.write(ByteView::from(key), ByteView::from(value))?;
}
Item::Tomb(key) => {
ingestion.write_tombstone(ByteView::from(key))?;
}
}
}
ingestion.finish()?;
Ok(())
}
}
impl<K, V> AnyStore for StoreFjallV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
for<'a> ByteView: From<K> + From<V> + From<&'a K> + From<&'a V>,
Self: Send + Sync,
{
fn keyspace(&self) -> &Keyspace {
&self.keyspace
}
fn take_all_f2(&mut self) -> Vec<fjall2::InnerItem> {
vec![]
}
fn partition(&self) -> &fjall2::PartitionHandle {
panic!()
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
self.export_meta_if_needed(height)
}
fn name(&self) -> &'static str {
self.name
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
fn commit_f3(&mut self, height: Height) -> Result<()> {
self.export_meta_if_needed(height)?;
let puts = mem::take(&mut self.puts);
let dels = mem::take(&mut self.dels);
if puts.is_empty() && dels.is_empty() {
return Ok(());
}
Self::ingest(&self.keyspace, puts.iter(), dels.iter())?;
if !self.caches.is_empty() {
self.caches.pop();
self.caches.insert(0, puts);
}
Ok(())
}
}
enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
impl<K, V> Item<K, V> {
#[inline]
fn key(&self) -> &K {
match self {
Self::Value { key, .. } | Self::Tomb(key) => key,
}
}
}
impl<K: Ord, V> Ord for Item<K, V> {
#[inline]
fn cmp(&self, other: &Self) -> Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
#[inline]
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
#[derive(Debug, Clone, Copy)]
pub enum Mode3 {
Any,
PushOnly,
}
impl Mode3 {
pub fn is_any(&self) -> bool {
matches!(*self, Self::Any)
}
pub fn is_push_only(&self) -> bool {
matches!(*self, Self::PushOnly)
}
}
#[derive(Debug, Clone, Copy)]
pub enum Kind3 {
Random,
Sequential,
Vec,
}
impl Kind3 {
pub fn is_sequential(&self) -> bool {
matches!(*self, Self::Sequential)
}
pub fn is_random(&self) -> bool {
matches!(*self, Self::Random)
}
pub fn is_not_vec(&self) -> bool {
!matches!(*self, Self::Vec)
}
}
+38
View File
@@ -0,0 +1,38 @@
use std::cmp::Ordering;
pub enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
impl<K, V> Item<K, V> {
#[inline]
fn key(&self) -> &K {
match self {
Self::Value { key, .. } | Self::Tomb(key) => key,
}
}
}
impl<K: Ord, V> Ord for Item<K, V> {
#[inline]
fn cmp(&self, other: &Self) -> Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
#[inline]
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
+25
View File
@@ -0,0 +1,25 @@
#[derive(Debug, Clone, Copy)]
pub enum Kind {
Recent,
Random,
Sequential,
Vec,
}
impl Kind {
pub fn is_sequential(&self) -> bool {
matches!(*self, Self::Sequential)
}
pub fn is_recent(&self) -> bool {
matches!(*self, Self::Recent)
}
pub fn is_random(&self) -> bool {
matches!(*self, Self::Random)
}
pub fn is_vec(&self) -> bool {
matches!(*self, Self::Vec)
}
}
+323 -4
View File
@@ -1,9 +1,328 @@
#![doc = include_str!("../README.md")]
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path};
use brk_error::Result;
use brk_types::{Height, Version};
use byteview::ByteView;
use fjall::{Database, Keyspace, KeyspaceCreateOptions, config::*};
use rustc_hash::{FxHashMap, FxHashSet};
mod any;
mod fjall_v2;
mod fjall_v3;
mod item;
mod kind;
mod meta;
mod mode;
pub use any::*;
pub use fjall_v2::*;
pub use fjall_v3::*;
pub use item::*;
pub use kind::*;
pub use meta::*;
pub use mode::*;
const MAJOR_FJALL_VERSION: Version = Version::new(3);
pub fn open_database(path: &Path) -> fjall::Result<Database> {
Database::builder(path.join("fjall"))
.cache_size(3 * 1024 * 1024 * 1024)
.open()
}
#[derive(Clone)]
pub struct Store<K, V> {
meta: StoreMeta,
name: &'static str,
keyspace: Keyspace,
puts: FxHashMap<K, V>,
dels: FxHashSet<K>,
caches: Vec<FxHashMap<K, V>>,
}
impl<K, V> Store<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
pub fn import(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode,
kind: Kind,
) -> Result<Self> {
Self::import_inner(db, path, name, version, mode, kind, 0)
}
pub fn import_cached(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode,
kind: Kind,
max_batches: u8,
) -> Result<Self> {
Self::import_inner(db, path, name, version, mode, kind, max_batches)
}
fn import_inner(
db: &Database,
path: &Path,
name: &str,
version: Version,
mode: Mode,
kind: Kind,
max_batches: u8,
) -> Result<Self> {
fs::create_dir_all(path)?;
let (meta, keyspace) = StoreMeta::checked_open(
db,
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_keyspace(db, name, mode, kind).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
},
)?;
let mut caches = vec![];
for _ in 0..max_batches {
caches.push(FxHashMap::default());
}
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
keyspace,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
caches,
})
}
fn open_keyspace(database: &Database, name: &str, _mode: Mode, kind: Kind) -> Result<Keyspace> {
let mut options = KeyspaceCreateOptions::default()
.manual_journal_persist(true)
.filter_block_partitioning_policy(PartitioningPolicy::new([false, false, true]))
.index_block_partitioning_policy(PartitioningPolicy::new([false, false, true]));
match kind {
Kind::Random => {
options = options
.filter_block_pinning_policy(PinningPolicy::new([true, true, true, false]))
.filter_policy(FilterPolicy::new([
FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
0.0001,
)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(9.0)),
]));
}
Kind::Recent => {
options = options
.expect_point_read_hits(true)
.filter_policy(FilterPolicy::new([
FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(
0.0001,
)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.001)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(8.0)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(7.0)),
]));
}
Kind::Sequential => {
options = options
.filter_block_partitioning_policy(PartitioningPolicy::all(true))
.index_block_partitioning_policy(PartitioningPolicy::all(true))
.filter_block_pinning_policy(PinningPolicy::all(false))
.index_block_pinning_policy(PinningPolicy::all(false));
}
Kind::Vec => {
options = options
.max_memtable_size(8 * 1024 * 1024)
.filter_policy(FilterPolicy::disabled())
.filter_block_pinning_policy(PinningPolicy::all(false))
.index_block_pinning_policy(PinningPolicy::all(false));
}
}
database.keyspace(name, || options).map_err(|e| e.into())
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
return Ok(Some(Cow::Borrowed(v)));
}
for cache in &self.caches {
if let Some(v) = cache.get(key) {
return Ok(Some(Cow::Borrowed(v)));
}
}
if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
}
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
self.keyspace.is_empty().map_err(|e| e.into())
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
self.insert(key, value);
}
}
#[inline]
pub fn remove(&mut self, key: K) {
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.keyspace
.iter()
.map(|res| res.into_inner().unwrap())
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
pub fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
if !self.has(height) {
self.meta.export(height)?;
}
Ok(())
}
fn ingest<'a>(
keyspace: &Keyspace,
puts: impl Iterator<Item = (&'a K, &'a V)>,
dels: impl Iterator<Item = &'a K>,
) -> Result<()>
where
ByteView: From<&'a K> + From<&'a V>,
K: 'a,
V: 'a,
{
let mut items: Vec<Item<&'a K, &'a V>> = puts
.map(|(key, value)| Item::Value { key, value })
.chain(dels.map(Item::Tomb))
.collect();
items.sort_unstable();
let mut ingestion = keyspace.start_ingestion()?;
for item in items {
match item {
Item::Value { key, value } => {
ingestion.write(ByteView::from(key), ByteView::from(value))?;
}
Item::Tomb(key) => {
ingestion.write_tombstone(ByteView::from(key))?;
}
}
}
ingestion.finish()?;
Ok(())
}
}
impl<K, V> AnyStore for Store<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
for<'a> ByteView: From<K> + From<V> + From<&'a K> + From<&'a V>,
Self: Send + Sync,
{
fn keyspace(&self) -> &Keyspace {
&self.keyspace
}
fn export_meta_if_needed(&mut self, height: Height) -> Result<()> {
self.export_meta_if_needed(height)
}
fn name(&self) -> &'static str {
self.name
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
fn commit(&mut self, height: Height) -> Result<()> {
self.export_meta_if_needed(height)?;
let puts = mem::take(&mut self.puts);
let dels = mem::take(&mut self.dels);
if puts.is_empty() && dels.is_empty() {
return Ok(());
}
Self::ingest(&self.keyspace, puts.iter(), dels.iter())?;
if !self.caches.is_empty() {
self.caches.pop();
self.caches.insert(0, puts);
}
Ok(())
}
}
@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_types::Version;
use fjall3::{Database, Keyspace};
use fjall::{Database, Keyspace};
use super::Height;
+15
View File
@@ -0,0 +1,15 @@
#[derive(Debug, Clone, Copy)]
pub enum Mode {
Any,
PushOnly,
}
impl Mode {
pub fn is_any(&self) -> bool {
matches!(*self, Self::Any)
}
pub fn is_push_only(&self) -> bool {
matches!(*self, Self::PushOnly)
}
}