indexer: perf + support fjall v3

This commit is contained in:
nym21
2025-10-18 18:27:59 +02:00
parent 6cce92af22
commit 71078b5bdd
34 changed files with 2635 additions and 1698 deletions

2
.gitignore vendored
View File

@@ -14,7 +14,7 @@ bridge/
_*
# Logs
.log
.log*
# Environment variables/configs
.env

1112
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -65,13 +65,18 @@ brk_structs = { version = "0.0.111", path = "crates/brk_structs" }
brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["derive"] }
brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" }
byteview = "=0.6.1"
# byteview = "~0.8.0"
derive_deref = "1.1.1"
fjall = "2.11.2"
fjall_v2 = { version = "2.11.2", package = "fjall" }
# fjall_v3 = { version = "=3.0.0-pre.0", package = "fjall" }
fjall_v3 = { path = "../fjall", package = "fjall" }
# fjall_v3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "bb15057500dce3115d7644d268b9deeaa895b431", package = "fjall" }
jiff = "0.2.15"
log = "0.4.28"
minreq = { version = "2.14.1", features = ["https", "serde_json"] }
parking_lot = "0.12.5"
rayon = "1.11.0"
rustc-hash = "2.1.1"
schemars = "1.0.4"
serde = "1.0.228"
serde_bytes = "0.11.19"

View File

@@ -12,7 +12,8 @@ build = "build.rs"
[dependencies]
log = { workspace = true }
notify = "8.2.0"
brk_rolldown = "0.2.3"
rolldown = "0.1.0"
# brk_rolldown = "0.2.3"
# brk_rolldown = { path = "../../../rolldown/crates/rolldown"}
sugar_path = "1.2.0"
tokio = { workspace = true }

View File

@@ -6,12 +6,12 @@ use std::{
sync::Arc,
};
use brk_rolldown::{
use log::error;
use notify::{EventKind, RecursiveMode, Watcher};
use rolldown::{
Bundler, BundlerOptions, InlineConstConfig, InlineConstMode, InlineConstOption,
OptimizationOption, RawMinifyOptions, SourceMapType,
};
use log::error;
use notify::{EventKind, RecursiveMode, Watcher};
use sugar_path::SugarPath;
use tokio::sync::Mutex;
@@ -170,8 +170,7 @@ pub async fn bundle(
.watch(&absolute_modules_path_clone, RecursiveMode::Recursive)
.unwrap();
let watcher =
brk_rolldown::Watcher::new(vec![Arc::new(Mutex::new(bundler))], None).unwrap();
let watcher = rolldown::Watcher::new(vec![Arc::new(Mutex::new(bundler))], None).unwrap();
watcher.start().await;
});

View File

@@ -12,7 +12,8 @@ build = "build.rs"
[dependencies]
bitcoin = { workspace = true }
bitcoincore-rpc = { workspace = true }
fjall = { workspace = true }
fjall_v2 = { workspace = true }
fjall_v3 = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
sonic-rs = { workspace = true }

View File

@@ -12,7 +12,8 @@ pub enum Error {
IO(io::Error),
BitcoinRPC(bitcoincore_rpc::Error),
Jiff(jiff::Error),
Fjall(fjall::Error),
FjallV2(fjall_v2::Error),
FjallV3(fjall_v3::Error),
VecDB(vecdb::Error),
SeqDB(vecdb::SeqDBError),
Minreq(minreq::Error),
@@ -106,9 +107,15 @@ impl From<jiff::Error> for Error {
}
}
impl From<fjall::Error> for Error {
fn from(value: fjall::Error) -> Self {
Self::Fjall(value)
impl From<fjall_v3::Error> for Error {
fn from(value: fjall_v3::Error) -> Self {
Self::FjallV3(value)
}
}
impl From<fjall_v2::Error> for Error {
fn from(value: fjall_v2::Error) -> Self {
Self::FjallV2(value)
}
}
@@ -137,7 +144,8 @@ impl fmt::Display for Error {
Error::BitcoinBip34Error(error) => Display::fmt(&error, f),
Error::BitcoinFromScriptError(error) => Display::fmt(&error, f),
Error::BitcoinRPC(error) => Display::fmt(&error, f),
Error::Fjall(error) => Display::fmt(&error, f),
Error::FjallV2(error) => Display::fmt(&error, f),
Error::FjallV3(error) => Display::fmt(&error, f),
Error::IO(error) => Display::fmt(&error, f),
Error::Jiff(error) => Display::fmt(&error, f),
Error::Minreq(error) => Display::fmt(&error, f),

View File

@@ -14,3 +14,4 @@ brk_error = { workspace = true }
brk_structs = { workspace = true }
brk_traversable = { workspace = true }
vecdb = { workspace = true }
rayon = { workspace = true }

View File

@@ -3,6 +3,7 @@ use std::ops::{Add, AddAssign};
use brk_error::Result;
use brk_structs::OutputType;
use brk_traversable::{Traversable, TreeNode};
use rayon::prelude::*;
use vecdb::AnyCollectableVec;
use super::{Filter, Filtered};
@@ -63,6 +64,10 @@ impl<T> ByAddressType<T> {
}
}
/// Mutable access to the slot for `address_type`.
///
/// Panics (via `unwrap`) if `get_mut` has no slot for the given type,
/// i.e. for non-address output types.
pub fn get_mut_unwrap(&mut self, address_type: OutputType) -> &mut T {
    let slot = self.get_mut(address_type);
    slot.unwrap()
}
pub fn get_mut(&mut self, address_type: OutputType) -> Option<&mut T> {
match address_type {
OutputType::P2PK65 => Some(&mut self.p2pk65),
@@ -105,6 +110,40 @@ impl<T> ByAddressType<T> {
.into_iter()
}
/// Parallel iterator over shared references to all eight per-address-type
/// slots, in declaration order.
///
/// Takes `&self`: the previous signature demanded `&mut self` even though
/// the iterator only yields `&T`, which forced callers to hold an exclusive
/// borrow for read-only iteration (and broke the `iter()`/`iter_mut()`
/// naming convention). Relaxing the receiver is backward compatible.
pub fn par_iter(&self) -> impl ParallelIterator<Item = &T>
where
    T: Send + Sync,
{
    [
        &self.p2pk65,
        &self.p2pk33,
        &self.p2pkh,
        &self.p2sh,
        &self.p2wpkh,
        &self.p2wsh,
        &self.p2tr,
        &self.p2a,
    ]
    .into_par_iter()
}
/// Parallel iterator over exclusive references to all eight
/// per-address-type slots, in declaration order.
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
    T: Send + Sync,
{
    // Disjoint field borrows, split into two halves and chained back
    // together in the same order as the field declarations.
    let legacy = [
        &mut self.p2pk65,
        &mut self.p2pk33,
        &mut self.p2pkh,
        &mut self.p2sh,
    ];
    let segwit_and_later = [
        &mut self.p2wpkh,
        &mut self.p2wsh,
        &mut self.p2tr,
        &mut self.p2a,
    ];
    legacy
        .into_par_iter()
        .chain(segwit_and_later.into_par_iter())
}
pub fn iter_typed(&self) -> impl Iterator<Item = (OutputType, &T)> {
[
(OutputType::P2PK65, &self.p2pk65),

View File

@@ -19,7 +19,9 @@ brk_reader = { workspace = true }
brk_store = { workspace = true }
brk_structs = { workspace = true }
brk_traversable = { workspace = true }
fjall = { workspace = true }
fjall_v2 = { workspace = true }
fjall_v3 = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
rustc-hash = { workspace = true }
vecdb = { workspace = true }

File diff suppressed because it is too large Load Diff

View File

@@ -2,12 +2,12 @@ use std::{borrow::Cow, fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, Store};
use brk_store::{AnyStore, StoreV2 as Store};
use brk_structs::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutputType, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
};
use fjall::{PersistMode, TransactionalKeyspace};
use fjall_v2::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
@@ -61,7 +61,7 @@ impl Stores {
path,
&format!("{}addressindex_and_unspentoutpoint", cohort),
version,
Some(false),
None,
)
};
@@ -117,9 +117,24 @@ impl Stores {
}
pub fn commit(&mut self, height: Height) -> Result<()> {
self.iter_mut_any_store()
.par_bridge()
.try_for_each(|store| store.commit(height))?;
[
&mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
&mut self.blockhashprefix_to_height,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
]
.into_par_iter() // Changed from par_iter_mut()
.chain(
self.addresstype_to_typeindex_and_txindex
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_typeindex_and_unspentoutpoint
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.try_for_each(|store| store.commit(height))?;
self.keyspace
.persist(PersistMode::SyncAll)
@@ -146,26 +161,6 @@ impl Stores {
)
}
fn iter_mut_any_store(&mut self) -> impl Iterator<Item = &mut dyn AnyStore> {
[
&mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
&mut self.blockhashprefix_to_height,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_typeindex_and_txindex
.iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_typeindex_and_unspentoutpoint
.iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
@@ -221,7 +216,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK65));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -241,7 +236,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK33));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -261,7 +256,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PKH));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -281,7 +276,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2SH));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -301,7 +296,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2TR));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -321,7 +316,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WPKH));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -341,7 +336,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WSH));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -360,7 +355,7 @@ impl Stores {
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2A));
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
@@ -393,7 +388,7 @@ impl Stores {
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
todo!();
// todo!();
// let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter();
// vecs.txoutindex_to_outputtype
// .iter_at(starting_indexes.txoutindex)

View File

@@ -0,0 +1,419 @@
use std::{borrow::Cow, fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, StoreV3 as Store};
use brk_structs::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
};
use fjall_v3::{PersistMode, TxDatabase};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
use crate::Indexes;
use super::Vecs;
/// All fjall-v3 backed key/value stores used by the indexer, sharing one
/// transactional database handle.
#[derive(Clone)]
pub struct Stores {
    // Shared handle; `commit` persists it with `PersistMode::SyncAll`.
    pub database: TxDatabase,
    // Hash of address bytes -> per-type address index.
    pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
    // Block-hash prefix -> block height.
    pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
    // Block height -> coinbase tag string.
    pub height_to_coinbase_tag: Store<Height, StoredString>,
    // Txid prefix -> transaction index.
    pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
    // Per address type: (type index, tx index) membership set (Unit values).
    pub addresstype_to_typeindex_and_txindex: ByAddressType<Store<TypeIndexAndTxIndex, Unit>>,
    // Per address type: (type index, outpoint) set of unspent outputs.
    pub addresstype_to_typeindex_and_unspentoutpoint:
        ByAddressType<Store<TypeIndexAndOutPoint, Unit>>,
}
impl Stores {
/// Opens (or re-creates) every store under `parent/stores`.
///
/// If the database cannot be opened (e.g. corrupt or incompatible on-disk
/// state), the whole `stores` directory is wiped and the open is retried
/// exactly once; a second failure is propagated.
///
/// # Errors
/// Returns any I/O error from creating/removing the directory, any database
/// open error on the retry, and any `Store::import` error.
pub fn forced_import(parent: &Path, version: Version) -> Result<Self> {
    let pathbuf = parent.join("stores");
    let path = pathbuf.as_path();
    fs::create_dir_all(&pathbuf)?;
    let database = match brk_store::open_database(path) {
        Ok(database) => database,
        Err(_) => {
            // Wipe and retry once. The previous version recursed with
            // `path` (= `parent/stores`) instead of `parent`, which would
            // have rebuilt everything under `parent/stores/stores` and
            // recursed without bound on a persistent failure.
            fs::remove_dir_all(path)?;
            fs::create_dir_all(path)?;
            brk_store::open_database(path)?
        }
    };
    let database_ref = &database;
    // Factories for the per-address-type stores; `cohort` is prefixed to
    // the partition name.
    let create_addressindex_and_txindex_store = |cohort| {
        Store::import(
            database_ref,
            path,
            &format!("{}addressindex_and_txindex", cohort),
            version,
            Some(false),
        )
    };
    let create_addressindex_and_unspentoutpoint_store = |cohort| {
        Store::import(
            database_ref,
            path,
            &format!("{}addressindex_and_unspentoutpoint", cohort),
            version,
            None,
        )
    };
    Ok(Self {
        database: database.clone(),
        height_to_coinbase_tag: Store::import(
            database_ref,
            path,
            "height_to_coinbase_tag",
            version,
            None,
        )?,
        addressbyteshash_to_typeindex: Store::import(
            database_ref,
            path,
            "addressbyteshash_to_typeindex",
            version,
            None,
        )?,
        blockhashprefix_to_height: Store::import(
            database_ref,
            path,
            "blockhashprefix_to_height",
            version,
            None,
        )?,
        txidprefix_to_txindex: Store::import(
            database_ref,
            path,
            "txidprefix_to_txindex",
            version,
            None,
        )?,
        addresstype_to_typeindex_and_txindex: ByAddressType::new(
            create_addressindex_and_txindex_store,
        )?,
        addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new(
            create_addressindex_and_unspentoutpoint_store,
        )?,
    })
}
pub fn starting_height(&self) -> Height {
self.iter_any_store()
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.min()
.unwrap()
}
/// Commits all stores at `height` in parallel, then syncs the shared
/// database to disk.
///
/// # Errors
/// Returns the first store-commit error, or the database persist error.
pub fn commit(&mut self, height: Height) -> Result<()> {
    // The four fixed stores go into a plain array of trait objects;
    // the two per-address-type groups are chained on behind them so the
    // whole set commits under one rayon parallel iterator.
    [
        &mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
        &mut self.blockhashprefix_to_height,
        &mut self.height_to_coinbase_tag,
        &mut self.txidprefix_to_txindex,
    ]
    .into_par_iter()
    .chain(
        self.addresstype_to_typeindex_and_txindex
            .par_iter_mut()
            .map(|s| s as &mut dyn AnyStore),
    )
    .chain(
        self.addresstype_to_typeindex_and_unspentoutpoint
            .par_iter_mut()
            .map(|s| s as &mut dyn AnyStore),
    )
    .try_for_each(|store| store.commit(height))?;
    // Durability barrier: fsync everything after all commits landed.
    self.database
        .persist(PersistMode::SyncAll)
        .map_err(|e| e.into())
}
/// Sequential iterator over every store as `&dyn AnyStore`: the four fixed
/// stores first, then both per-address-type groups.
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
    let fixed: [&dyn AnyStore; 4] = [
        &self.addressbyteshash_to_typeindex,
        &self.blockhashprefix_to_height,
        &self.height_to_coinbase_tag,
        &self.txidprefix_to_txindex,
    ];
    let by_txindex = self
        .addresstype_to_typeindex_and_txindex
        .iter()
        .map(|s| s as &dyn AnyStore);
    let by_outpoint = self
        .addresstype_to_typeindex_and_unspentoutpoint
        .iter()
        .map(|s| s as &dyn AnyStore);
    fixed.into_iter().chain(by_txindex).chain(by_outpoint)
}
/// Rolls the stores back so they agree with `starting_indexes` before
/// indexing resumes, then commits at the height just below the restart
/// point.
///
/// Fast path: if every store is empty there is nothing to roll back.
/// Otherwise, for each index family either remove the entries at/after the
/// restart point (partial rollback) or reset the store entirely when the
/// restart point is zero.
pub fn rollback_if_needed(
    &mut self,
    vecs: &mut Vecs,
    starting_indexes: &Indexes,
) -> Result<()> {
    // Nothing persisted yet -> nothing to undo.
    if self.addressbyteshash_to_typeindex.is_empty()?
        && self.blockhashprefix_to_height.is_empty()?
        && self.txidprefix_to_txindex.is_empty()?
        && self.height_to_coinbase_tag.is_empty()?
        && self
            .addresstype_to_typeindex_and_txindex
            .iter()
            .map(|s| s.is_empty())
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .all(|empty| empty)
        && self
            .addresstype_to_typeindex_and_unspentoutpoint
            .iter()
            .map(|s| s.is_empty())
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .all(|empty| empty)
    {
        return Ok(());
    }
    if starting_indexes.height != Height::ZERO {
        // Drop block-hash-prefix entries for every block at/after the
        // restart height.
        vecs.height_to_blockhash
            .iter_at(starting_indexes.height)
            .for_each(|(_, v)| {
                let blockhashprefix = BlockHashPrefix::from(v.into_owned());
                self.blockhashprefix_to_height.remove(blockhashprefix);
            });
        // Drop coinbase tags for the same height range.
        (starting_indexes.height.unwrap_to_usize()..vecs.height_to_blockhash.len())
            .map(Height::from)
            .for_each(|h| {
                self.height_to_coinbase_tag.remove(h);
            });
        // For each address type: starting from the first address index
        // created at the restart height, walk forward through the stored
        // address bytes and remove their hash entries. The eight blocks
        // below are identical except for the address-type fields used.
        if let Some(mut index) = vecs
            .height_to_first_p2pk65addressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2pk65addressindex_to_p2pk65bytes_iter =
                vecs.p2pk65addressindex_to_p2pk65bytes.iter();
            while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2pk33addressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2pk33addressindex_to_p2pk33bytes_iter =
                vecs.p2pk33addressindex_to_p2pk33bytes.iter();
            while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2pkhaddressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2pkhaddressindex_to_p2pkhbytes_iter =
                vecs.p2pkhaddressindex_to_p2pkhbytes.iter();
            while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2shaddressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2shaddressindex_to_p2shbytes_iter =
                vecs.p2shaddressindex_to_p2shbytes.iter();
            while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2traddressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2traddressindex_to_p2trbytes_iter =
                vecs.p2traddressindex_to_p2trbytes.iter();
            while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2wpkhaddressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
                vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter();
            while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2wshaddressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2wshaddressindex_to_p2wshbytes_iter =
                vecs.p2wshaddressindex_to_p2wshbytes.iter();
            while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
        if let Some(mut index) = vecs
            .height_to_first_p2aaddressindex
            .iter()
            .get(starting_indexes.height)
            .map(Cow::into_owned)
        {
            let mut p2aaddressindex_to_p2abytes_iter = vecs.p2aaddressindex_to_p2abytes.iter();
            while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter
                .get(index)
                .map(Cow::into_owned)
            {
                let bytes = AddressBytes::from(typedbytes);
                let hash = AddressBytesHash::from(&bytes);
                self.addressbyteshash_to_typeindex.remove(hash);
                index.increment();
            }
        }
    } else {
        // Restarting from genesis: drop both stores wholesale.
        // NOTE(review): `height_to_coinbase_tag` is NOT reset in this
        // branch — confirm that is intentional.
        self.blockhashprefix_to_height.reset()?;
        self.addressbyteshash_to_typeindex.reset()?;
    }
    if starting_indexes.txindex != TxIndex::ZERO {
        // Remove txid-prefix entries at/after the restart tx index,
        // except the two historical duplicate coinbase txids (presumably
        // the BIP-30 duplicates — their earlier entries must survive;
        // TODO confirm).
        vecs.txindex_to_txid
            .iter_at(starting_indexes.txindex)
            .for_each(|(txindex, txid)| {
                let txidprefix = TxidPrefix::from(&txid.into_owned());
                // "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
                let is_not_first_dup = txindex != TxIndex::new(142783)
                    || txidprefix != TxidPrefix::from([153, 133, 216, 41, 84, 225, 15, 34]);
                // "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468"
                let is_not_second_dup = txindex != TxIndex::new(142841)
                    || txidprefix != TxidPrefix::from([104, 180, 95, 88, 182, 116, 233, 78]);
                if is_not_first_dup && is_not_second_dup {
                    self.txidprefix_to_txindex.remove(txidprefix);
                }
            });
    } else {
        self.txidprefix_to_txindex.reset()?;
    }
    if starting_indexes.txoutindex != TxOutIndex::ZERO {
        // NOTE(review): partial rollback of the per-address-type stores by
        // txout index is not implemented — this branch currently does
        // nothing. Confirm a mid-range restart here is actually safe.
        // todo!();
        // let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter();
        // vecs.txoutindex_to_outputtype
        //     .iter_at(starting_indexes.txoutindex)
        //     .filter(|(_, outputtype)| outputtype.is_address())
        //     .for_each(|(txoutindex, outputtype)| {
        //         let outputtype = outputtype.into_owned();
        //         let typeindex = txoutindex_to_typeindex_iter.unwrap_get_inner(txoutindex);
        //         self.addresstype_to_typeindex_and_unspentoutpoint
        //             .get_mut(outputtype)
        //             .unwrap()
        //             .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex)));
        //     });
    } else {
        self.addresstype_to_typeindex_and_txindex
            .iter_mut()
            .try_for_each(|s| s.reset())?;
        self.addresstype_to_typeindex_and_unspentoutpoint
            .iter_mut()
            .try_for_each(|s| s.reset())?;
    }
    // Persist the rolled-back state at the height just below the restart
    // point (or the default height when restarting from genesis).
    self.commit(starting_indexes.height.decremented().unwrap_or_default())?;
    Ok(())
}
}

View File

@@ -70,9 +70,6 @@ impl Vecs {
let db = Database::open(&parent.join("vecs"))?;
db.set_min_len(PAGE_SIZE * 50_000_000)?;
let db_positions = Database::open(&parent.join("vecs/positions"))?;
db_positions.set_min_len(PAGE_SIZE * 1_000_000)?;
let this = Self {
emptyoutputindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?,
height_to_blockhash: RawVec::forced_import(&db, "blockhash", version)?,
@@ -368,56 +365,6 @@ impl Vecs {
Ok(())
}
// pub fn iter_any_collectable(&self) -> impl Iterator<Item = &dyn AnyCollectableVec> {
// [
// &self.emptyoutputindex_to_txindex as &dyn AnyCollectableVec,
// &self.height_to_blockhash,
// &self.height_to_difficulty,
// &self.height_to_first_emptyoutputindex,
// &self.height_to_first_txinindex,
// &self.height_to_first_opreturnindex,
// &self.height_to_first_txoutindex,
// &self.height_to_first_p2aaddressindex,
// &self.height_to_first_p2msoutputindex,
// &self.height_to_first_p2pk33addressindex,
// &self.height_to_first_p2pk65addressindex,
// &self.height_to_first_p2pkhaddressindex,
// &self.height_to_first_p2shaddressindex,
// &self.height_to_first_p2traddressindex,
// &self.height_to_first_p2wpkhaddressindex,
// &self.height_to_first_p2wshaddressindex,
// &self.height_to_first_txindex,
// &self.height_to_first_unknownoutputindex,
// &self.height_to_timestamp,
// &self.height_to_total_size,
// &self.height_to_weight,
// &self.txinindex_to_txoutindex,
// &self.opreturnindex_to_txindex,
// &self.txoutindex_to_outputtype,
// &self.txoutindex_to_typeindex,
// &self.txoutindex_to_value,
// &self.p2aaddressindex_to_p2abytes,
// &self.p2msoutputindex_to_txindex,
// &self.p2pk33addressindex_to_p2pk33bytes,
// &self.p2pk65addressindex_to_p2pk65bytes,
// &self.p2pkhaddressindex_to_p2pkhbytes,
// &self.p2shaddressindex_to_p2shbytes,
// &self.p2traddressindex_to_p2trbytes,
// &self.p2wpkhaddressindex_to_p2wpkhbytes,
// &self.p2wshaddressindex_to_p2wshbytes,
// &self.txindex_to_base_size,
// &self.txindex_to_first_txinindex,
// &self.txindex_to_first_txoutindex,
// &self.txindex_to_is_explicitly_rbf,
// &self.txindex_to_rawlocktime,
// &self.txindex_to_total_size,
// &self.txindex_to_txid,
// &self.txindex_to_txversion,
// &self.unknownoutputindex_to_txindex,
// ]
// .into_iter()
// }
fn iter_mut_any_stored_vec(&mut self) -> impl Iterator<Item = &mut dyn AnyStoredVec> {
[
&mut self.emptyoutputindex_to_txindex as &mut dyn AnyStoredVec,

View File

@@ -23,7 +23,7 @@ pub fn init(path: Option<&Path>) -> io::Result<()> {
});
Builder::from_env(Env::default().default_filter_or(
"info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm_tree=off,rolldown=off,brk_rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off",
"info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm_tree=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off",
))
.format(move |buf, record| {
let date_time = Timestamp::now()

View File

@@ -15,4 +15,4 @@ bitcoincore-rpc = { workspace = true }
brk_structs = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
rustc-hash = "2.1.1"
rustc-hash = { workspace = true }

View File

@@ -14,7 +14,10 @@ build = "build.rs"
[dependencies]
brk_error = { workspace = true }
brk_structs = { workspace = true }
byteview = { workspace = true }
fjall = { workspace = true }
byteview_v6 = { version = "=0.6.1", package = "byteview" }
byteview_v8 = { version = "~0.8.0", package = "byteview" }
fjall_v2 = { workspace = true }
fjall_v3 = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
rustc-hash = { workspace = true }

View File

@@ -5,7 +5,7 @@ use brk_error::Result;
fn main() -> Result<()> {
let p = Path::new("./examples/_fjall");
let _keyspace = brk_store::open_keyspace(p)?;
// let _keyspace = brk_store::open_keyspace(p)?;
// let mut store: Store<usize, usize> =
// brk_store::Store::import(&keyspace, p, "n", Version::ZERO, None)?;

View File

@@ -1,329 +1,9 @@
#![doc = include_str!("../README.md")]
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fmt::Debug,
fs, mem,
path::Path,
sync::Arc,
};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview::ByteView;
use fjall::{
PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace,
TransactionalPartitionHandle,
};
mod any;
mod meta;
mod v2;
mod v3;
pub use any::*;
use log::info;
use meta::*;
use parking_lot::RwLock;
#[derive(Clone)]
pub struct Store<Key, Value> {
meta: StoreMeta,
name: &'static str,
keyspace: TransactionalKeyspace,
partition: Arc<RwLock<Option<TransactionalPartitionHandle>>>,
rtx: Arc<RwLock<Option<ReadTransaction>>>,
puts: BTreeMap<Key, Value>,
dels: BTreeSet<Key>,
bloom_filters: Option<bool>,
}
// const CHECK_COLLISIONS: bool = true;
const MAJOR_FJALL_VERSION: Version = Version::TWO;
pub fn open_keyspace(path: &Path) -> fjall::Result<TransactionalKeyspace> {
fjall::Config::new(path.join("fjall"))
.max_write_buffer_size(32 * 1024 * 1024)
.open_transactional()
}
impl<K, V> Store<K, V>
where
K: Debug + Clone + From<ByteView> + Ord,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
{
pub fn import(
keyspace: &TransactionalKeyspace,
path: &Path,
name: &str,
version: Version,
bloom_filters: Option<bool>,
) -> Result<Self> {
fs::create_dir_all(path)?;
let (meta, partition) = StoreMeta::checked_open(
keyspace,
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_partition_handle(keyspace, name, bloom_filters).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
},
)?;
let rtx = keyspace.read_tx();
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
keyspace: keyspace.clone(),
partition: Arc::new(RwLock::new(Some(partition))),
rtx: Arc::new(RwLock::new(Some(rtx))),
puts: BTreeMap::new(),
dels: BTreeSet::new(),
bloom_filters,
})
}
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(slice) = self
.rtx
.read()
.as_ref()
.unwrap()
.get(self.partition.read().as_ref().unwrap(), ByteView::from(key))?
{
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
}
}
pub fn is_empty(&self) -> Result<bool> {
self.rtx
.read()
.as_ref()
.unwrap()
.is_empty(self.partition.read().as_ref().unwrap())
.map_err(|e| e.into())
}
// pub fn puts_first_key_value(&self) -> Option<(&K, &V)> {
// self.puts.first_key_value()
// }
// pub fn puts_last_key_value(&self) -> Option<(&K, &V)> {
// self.puts.last_key_value()
// }
// pub fn rtx_first_key_value(&self) -> Result<Option<(K, V)>> {
// Ok(self
// .rtx
// .first_key_value(&self.partition.load())?
// .map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v)))))
// }
// pub fn rtx_last_key_value(&self) -> Result<Option<(K, V)>> {
// Ok(self
// .rtx
// .last_key_value(&self.partition.load())?
// .map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v)))))
// }
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.rtx
.read()
.as_ref()
.unwrap()
.iter(self.partition.read().as_ref().unwrap())
.map(|res| res.unwrap())
.map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v))))
}
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
self.dels.remove(&key);
// unreachable!("Shouldn't reach this");
}
self.puts.insert(key, value);
}
}
pub fn remove(&mut self, key: K) {
// if self.is_empty()? {
// return Ok(());
// }
// if !self.puts.is_empty() {
// unreachable!("Shouldn't reach this");
// }
if (self.puts.is_empty() || self.puts.remove(&key).is_none()) && !self.dels.insert(key) {
dbg!(&self.meta.path());
unreachable!();
}
// Ok(())
}
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
// pub fn retain_or_del<F>(&mut self, retain: F)
// where
// F: Fn(&K, &mut V) -> bool,
// {
// self.puts.retain(|k, v| {
// let ret = retain(k, v);
// if !ret {
// self.dels.insert(k.clone());
// }
// ret
// });
// }
fn open_partition_handle(
keyspace: &TransactionalKeyspace,
name: &str,
bloom_filters: Option<bool>,
) -> Result<TransactionalPartitionHandle> {
let mut options = PartitionCreateOptions::default()
.max_memtable_size(8 * 1024 * 1024)
.manual_journal_persist(true);
if bloom_filters.is_some_and(|b| !b) {
options = options.bloom_filter_bits(None);
}
keyspace.open_partition(name, options).map_err(|e| e.into())
}
pub fn commit_(
&mut self,
height: Height,
remove: impl Iterator<Item = K>,
insert: impl Iterator<Item = (K, V)>,
) -> Result<()> {
if self.has(height) {
return Ok(());
}
self.meta.export(height)?;
let mut rtx = self.rtx.write();
let _ = rtx.take();
let mut wtx = self.keyspace.write_tx();
let partition = self.partition.read();
let partition = partition.as_ref().unwrap();
remove.for_each(|key| wtx.remove(partition, ByteView::from(key)));
insert.for_each(|(key, value)| {
// if CHECK_COLLISIONS {
// #[allow(unused_must_use)]
// if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) {
// dbg!(
// &key,
// V::try_from(value.as_bytes().into()).unwrap(),
// &self.meta,
// self.rtx.get(&self.partition, key.as_bytes())
// );
// unreachable!();
// }
// }
wtx.insert(partition, ByteView::from(key), ByteView::from(value))
});
wtx.commit()?;
rtx.replace(self.keyspace.read_tx());
Ok(())
}
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for Store<K, V>
where
K: Debug + Clone + From<ByteView> + Ord,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn commit(&mut self, height: Height) -> Result<()> {
if self.puts.is_empty() && self.dels.is_empty() {
self.meta.export(height)?;
return Ok(());
}
let dels = mem::take(&mut self.dels);
let puts = mem::take(&mut self.puts);
self.commit_(height, dels.into_iter(), puts.into_iter())
}
fn persist(&self) -> Result<()> {
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn name(&self) -> &'static str {
self.name
}
fn reset(&mut self) -> Result<()> {
info!("Resetting {}...", self.name);
let mut opt = self.partition.write();
let partition = opt.take().unwrap();
self.keyspace.delete_partition(partition)?;
self.meta.reset();
let partition = Self::open_partition_handle(&self.keyspace, self.name, self.bloom_filters)?;
opt.replace(partition);
Ok(())
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
}
pub use v2::*;
pub use v3::*;

View File

@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_structs::Version;
use fjall::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use fjall_v2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use super::Height;
@@ -28,19 +28,15 @@ impl StoreMeta {
{
fs::create_dir_all(path)?;
let read_version = Version::try_from(Self::path_version_(path).as_path());
let is_same_version = read_version
.as_ref()
.is_ok_and(|prev_version| &version == prev_version);
let mut partition = open_partition_handle()?;
if !is_same_version {
if Version::try_from(Self::path_version_(path).as_path())
.is_ok_and(|prev_version| version != prev_version)
{
fs::remove_dir_all(path)?;
fs::create_dir(path)?;
keyspace.delete_partition(partition)?;
keyspace.persist(PersistMode::SyncAll)?;
fs::create_dir(path)?;
partition = open_partition_handle()?;
}

View File

@@ -0,0 +1,281 @@
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path, sync::Arc};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview_v6::ByteView;
use fjall_v2::{
PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace,
TransactionalPartitionHandle,
};
mod meta;
use log::info;
use meta::*;
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
#[derive(Clone)]
pub struct StoreV2<Key, Value> {
// Height/version bookkeeping persisted next to the partition.
meta: StoreMeta,
// Leaked at construction so it can be handed out as `&'static str`.
name: &'static str,
keyspace: TransactionalKeyspace,
// `None` only transiently while the partition is dropped/re-created in `reset`.
partition: Arc<RwLock<Option<TransactionalPartitionHandle>>>,
// Read snapshot; replaced after every commit so reads see the latest data.
rtx: Arc<RwLock<Option<ReadTransaction>>>,
// Buffered, not-yet-committed inserts.
puts: FxHashMap<Key, Value>,
// Buffered, not-yet-committed deletions.
dels: FxHashSet<Key>,
// `Some(false)` disables bloom filters when (re)opening the partition.
bloom_filters: Option<bool>,
}
// const CHECK_COLLISIONS: bool = true;
const MAJOR_FJALL_VERSION: Version = Version::TWO;
/// Opens the fjall v2 transactional keyspace rooted at `path`/fjall.
pub fn open_keyspace(path: &Path) -> fjall_v2::Result<TransactionalKeyspace> {
fjall_v2::Config::new(path.join("fjall"))
// .cache_size(1024 * 1024 * 1024) // for tests only
// 32 MiB shared write buffer across all partitions.
.max_write_buffer_size(32 * 1024 * 1024)
.open_transactional()
}
impl<K, V> StoreV2<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
{
/// Opens (or creates) the named partition with this store's tuning:
/// small memtables and manual journal persistence (flushed via `persist`).
fn open_partition_handle(
    keyspace: &TransactionalKeyspace,
    name: &str,
    bloom_filters: Option<bool>,
) -> Result<TransactionalPartitionHandle> {
    let base = PartitionCreateOptions::default()
        // .max_memtable_size(64 * 1024 * 1024) // for tests only
        .max_memtable_size(8 * 1024 * 1024)
        .manual_journal_persist(true);
    // Bloom filters are only disabled on an explicit `Some(false)`.
    let options = if matches!(bloom_filters, Some(false)) {
        base.bloom_filter_bits(None)
    } else {
        base
    };
    Ok(keyspace.open_partition(name, options)?)
}
/// Opens (or creates) the persistent store under `path` for partition `name`.
///
/// Metadata is version-checked against `MAJOR_FJALL_VERSION + version` by
/// `StoreMeta::checked_open`, which wipes and re-creates the partition on a
/// mismatch. `bloom_filters: Some(false)` disables bloom filters.
pub fn import(
    keyspace: &TransactionalKeyspace,
    path: &Path,
    name: &str,
    version: Version,
    bloom_filters: Option<bool>,
) -> Result<Self> {
    fs::create_dir_all(path)?;
    let (meta, partition) = StoreMeta::checked_open(
        keyspace,
        &path.join(format!("meta/{name}")),
        MAJOR_FJALL_VERSION + version,
        || {
            Self::open_partition_handle(keyspace, name, bloom_filters).inspect_err(|e| {
                eprintln!("{e}");
                eprintln!("Delete {path:?} and try again");
            })
        },
    )?;
    let rtx = keyspace.read_tx();
    Ok(Self {
        meta,
        // Leak a boxed `str` (not a boxed `String`) to obtain the required
        // `&'static str`: this avoids the extra `String` indirection and any
        // leaked spare capacity. The leak itself is intentional — store
        // names live for the whole process.
        name: Box::leak(name.to_string().into_boxed_str()),
        keyspace: keyspace.clone(),
        partition: Arc::new(RwLock::new(Some(partition))),
        rtx: Arc::new(RwLock::new(Some(rtx))),
        puts: FxHashMap::default(),
        dels: FxHashSet::default(),
        bloom_filters,
    })
}
/// Looks `key` up, preferring a pending (uncommitted) insert over the disk
/// snapshot. Returns a borrow for pending values, an owned copy for disk hits.
/// NOTE(review): pending `dels` are not consulted here — a key removed but
/// not yet committed still resolves from disk; confirm callers rely on that.
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
    ByteView: From<&'a K>,
{
    if let Some(pending) = self.puts.get(key) {
        return Ok(Some(Cow::Borrowed(pending)));
    }
    let rtx = self.rtx.read();
    let partition = self.partition.read();
    let found = rtx
        .as_ref()
        .unwrap()
        .get(partition.as_ref().unwrap(), ByteView::from(key))?;
    Ok(found.map(|slice| Cow::Owned(V::from(ByteView::from(&*slice)))))
}
/// True when the committed partition holds no entries.
/// Buffered `puts`/`dels` are not taken into account.
pub fn is_empty(&self) -> Result<bool> {
    let rtx = self.rtx.read();
    let partition = self.partition.read();
    Ok(rtx.as_ref().unwrap().is_empty(partition.as_ref().unwrap())?)
}
/// Iterates over all committed key/value pairs in the partition.
///
/// Buffered `puts`/`dels` are NOT reflected — only the read snapshot is.
/// Panics if the underlying iterator yields an error (`res.unwrap()`).
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.rtx
.read()
.as_ref()
.unwrap()
.iter(self.partition.read().as_ref().unwrap())
.map(|res| res.unwrap())
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
/// Buffers an insert, but only when `height` is above the store's last
/// committed height (i.e. this block still needs processing).
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
// An insert cancels any pending deletion of the same key.
self.dels.remove(&key);
// unreachable!("Shouldn't reach this");
}
self.puts.insert(key, value);
}
}
/// Buffers a deletion for `key`.
///
/// A pending insert of the same key is dropped instead; otherwise the key is
/// added to `dels`. Deleting a key that is already pending deletion is a
/// broken invariant: the store path is dumped and the process panics.
/// NOTE(review): `dbg!` is debug output left in a panic path — consider a
/// proper panic message.
pub fn remove(&mut self, key: K) {
// if self.is_empty()? {
// return Ok(());
// }
// if !self.puts.is_empty() {
// unreachable!("Shouldn't reach this");
// }
if (self.puts.is_empty() || self.puts.remove(&key).is_none()) && !self.dels.insert(key) {
dbg!(&self.meta.path());
unreachable!();
}
// Ok(())
}
/// Buffers the deletion only when `height` still needs processing.
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
// pub fn retain_or_del<F>(&mut self, retain: F)
// where
// F: Fn(&K, &mut V) -> bool,
// {
// self.puts.retain(|k, v| {
// let ret = retain(k, v);
// if !ret {
// self.dels.insert(k.clone());
// }
// ret
// });
// }
/// True when `height` has already been committed.
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
/// True when `height` is above the last committed height.
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for StoreV2<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
/// Flushes buffered `dels`/`puts` to the partition in one transaction and
/// records `height` as committed. No-op when `height` was already committed.
fn commit(&mut self, height: Height) -> Result<()> {
    if self.has(height) {
        return Ok(());
    }
    if self.puts.is_empty() && self.dels.is_empty() {
        // No pending writes: only the height marker needs updating.
        self.meta.export(height)?;
        return Ok(());
    }
    // Drop the read snapshot while writing; a fresh one is installed below.
    let mut rtx = self.rtx.write();
    let _ = rtx.take();
    let mut wtx = self.keyspace.write_tx();
    let partition = self.partition.read();
    let partition = partition.as_ref().unwrap();
    // Deletions first, in sorted key order.
    let mut dels = self.dels.drain().collect::<Vec<_>>();
    dels.sort_unstable();
    dels.into_iter()
        .for_each(|key| wtx.remove(partition, ByteView::from(key)));
    // Then insertions, also sorted by key.
    let mut puts = self.puts.drain().collect::<Vec<_>>();
    puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
    puts.into_iter().for_each(|(key, value)| {
        wtx.insert(partition, ByteView::from(key), ByteView::from(value))
    });
    wtx.commit()?;
    // Record the height only AFTER the transaction committed: the original
    // exported the marker up front, so a failed commit could leave the
    // height file claiming data that was never written.
    self.meta.export(height)?;
    rtx.replace(self.keyspace.read_tx());
    Ok(())
}
/// Fsyncs journal + data to disk (SyncAll).
fn persist(&self) -> Result<()> {
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn name(&self) -> &'static str {
self.name
}
/// Drops and re-creates the partition, clearing the in-memory height.
fn reset(&mut self) -> Result<()> {
info!("Resetting {}...", self.name);
let mut opt = self.partition.write();
// Take the handle out of the slot so `delete_partition` can consume it.
let partition = opt.take().unwrap();
self.keyspace.delete_partition(partition)?;
self.meta.reset();
let partition = Self::open_partition_handle(&self.keyspace, self.name, self.bloom_filters)?;
opt.replace(partition);
Ok(())
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
// The two methods below delegate to the inherent `has`/`needs` (inherent
// methods take precedence over trait methods, so this is not recursion).
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
}

View File

@@ -0,0 +1,95 @@
use std::{
fs, io,
path::{Path, PathBuf},
};
use brk_error::Result;
use brk_structs::Version;
use fjall_v3::{PersistMode, TxDatabase, TxKeyspace};
use super::Height;
#[derive(Debug, Clone)]
pub struct StoreMeta {
// Directory holding the `version` and `height` marker files.
pathbuf: PathBuf,
// Store format version (callers pass MAJOR_FJALL_VERSION + per-store version).
version: Version,
// Last committed height; `None` on first open or after `reset`.
height: Option<Height>,
}
impl StoreMeta {
/// Opens the store's metadata together with its keyspace, intending to wipe
/// and re-create both when the on-disk version differs from `version`.
///
/// NOTE(review): the version-mismatch branch starts with `todo!()`, so the
/// wipe path below it is currently unreachable — fjall v3 does not yet
/// expose keyspace deletion (see the commented-out `delete_partition` call).
/// A v3 store with a stale version will panic here instead of migrating.
pub fn checked_open<F>(
database: &TxDatabase,
path: &Path,
version: Version,
open_partition_handle: F,
) -> Result<(Self, TxKeyspace)>
where
F: Fn() -> Result<TxKeyspace>,
{
fs::create_dir_all(path)?;
let mut partition = open_partition_handle()?;
if Version::try_from(Self::path_version_(path).as_path())
.is_ok_and(|prev_version| version != prev_version)
{
todo!();
fs::remove_dir_all(path)?;
// Doesn't exist
// database.delete_partition(partition)?;
fs::create_dir(path)?;
database.persist(PersistMode::SyncAll)?;
partition = open_partition_handle()?;
}
let slf = Self {
pathbuf: path.to_owned(),
version,
// Restore the last committed height from the marker file, if present.
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
};
// Persist the (possibly new) version marker.
slf.version.write(&slf.path_version())?;
Ok((slf, partition))
}
pub fn version(&self) -> Version {
self.version
}
/// Records `height` as committed and writes the `height` marker file.
pub fn export(&mut self, height: Height) -> io::Result<()> {
self.height = Some(height);
height.write(&self.path_height())
}
/// Forgets the committed height (in-memory only; the marker file is untouched).
pub fn reset(&mut self) {
self.height.take();
}
pub fn path(&self) -> &Path {
&self.pathbuf
}
fn path_version(&self) -> PathBuf {
Self::path_version_(&self.pathbuf)
}
fn path_version_(path: &Path) -> PathBuf {
path.join("version")
}
pub fn height(&self) -> Option<Height> {
self.height
}
/// True when `height` has not been committed yet: either no height is
/// recorded, or `height` is strictly above the recorded one.
pub fn needs(&self, height: Height) -> bool {
self.height.is_none_or(|self_height| height > self_height)
}
pub fn has(&self, height: Height) -> bool {
!self.needs(height)
}
fn path_height(&self) -> PathBuf {
Self::path_height_(&self.pathbuf)
}
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
}
}

View File

@@ -0,0 +1,272 @@
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path, sync::Arc};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview_v8::ByteView;
use fjall_v3::{KeyspaceCreateOptions, PersistMode, ReadTransaction, TxDatabase, TxKeyspace};
mod meta;
use log::info;
use meta::*;
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
#[derive(Clone)]
pub struct StoreV3<Key, Value> {
// Height/version bookkeeping persisted next to the keyspace.
meta: StoreMeta,
// Leaked at construction so it can be handed out as `&'static str`.
name: &'static str,
database: TxDatabase,
// `None` only transiently while the keyspace is swapped out in `reset`.
keyspace: Arc<RwLock<Option<TxKeyspace>>>,
// Read snapshot; replaced after every commit so reads see the latest data.
rtx: Arc<RwLock<Option<ReadTransaction>>>,
// Buffered, not-yet-committed inserts.
puts: FxHashMap<Key, Value>,
// Buffered, not-yet-committed deletions.
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::new(3);
/// Opens the fjall v3 transactional database rooted at `path`/fjall.
pub fn open_database(path: &Path) -> fjall_v3::Result<TxDatabase> {
TxDatabase::builder(path.join("fjall"))
// 4 GiB cache.
.cache_size(4 * 1024 * 1024 * 1024)
.open()
}
impl<K, V> StoreV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
{
/// Opens (or creates) the named keyspace; journal persistence is driven
/// manually through `persist()`.
fn open_keyspace(database: &TxDatabase, name: &str) -> Result<TxKeyspace> {
    let options = KeyspaceCreateOptions::default().manual_journal_persist(true);
    Ok(database.keyspace(name, options)?)
}
/// Opens (or creates) the store's keyspace inside `database` under `path`/`name`.
///
/// Metadata is version-checked against `MAJOR_FJALL_VERSION + version` by
/// `StoreMeta::checked_open`. `_bloom_filters` is accepted only for
/// signature parity with the v2 store and is currently unused.
pub fn import(
    database: &TxDatabase,
    path: &Path,
    name: &str,
    version: Version,
    _bloom_filters: Option<bool>,
) -> Result<Self> {
    fs::create_dir_all(path)?;
    let (meta, keyspace) = StoreMeta::checked_open(
        database,
        &path.join(format!("meta/{name}")),
        MAJOR_FJALL_VERSION + version,
        || {
            Self::open_keyspace(database, name).inspect_err(|e| {
                eprintln!("{e}");
                eprintln!("Delete {path:?} and try again");
            })
        },
    )?;
    let rtx = database.read_tx();
    Ok(Self {
        meta,
        // Leak a boxed `str` (not a boxed `String`) for the `&'static str`
        // name: no extra `String` indirection and no leaked spare capacity.
        // The leak is intentional — store names live for the whole process.
        name: Box::leak(name.to_string().into_boxed_str()),
        database: database.clone(),
        keyspace: Arc::new(RwLock::new(Some(keyspace))),
        rtx: Arc::new(RwLock::new(Some(rtx))),
        puts: FxHashMap::default(),
        dels: FxHashSet::default(),
    })
}
/// Looks `key` up, preferring a pending (uncommitted) insert over the disk
/// snapshot. Returns a borrow for pending values, an owned copy for disk hits.
/// NOTE(review): pending `dels` are not consulted here — a key removed but
/// not yet committed still resolves from disk; confirm callers rely on that.
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
    ByteView: From<&'a K>,
{
    if let Some(pending) = self.puts.get(key) {
        return Ok(Some(Cow::Borrowed(pending)));
    }
    let rtx = self.rtx.read();
    let keyspace = self.keyspace.read();
    let found = rtx
        .as_ref()
        .unwrap()
        .get(keyspace.as_ref().unwrap(), ByteView::from(key))?;
    Ok(found.map(|slice| Cow::Owned(V::from(ByteView::from(slice)))))
}
/// True when the committed keyspace holds no entries.
/// Buffered `puts`/`dels` are not taken into account.
pub fn is_empty(&self) -> Result<bool> {
    let rtx = self.rtx.read();
    let keyspace = self.keyspace.read();
    Ok(rtx.as_ref().unwrap().is_empty(keyspace.as_ref().unwrap())?)
}
// pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
// let keyspace = self.keyspace.read().as_ref().unwrap();
// self.rtx
// .read()
// .as_ref()
// .unwrap()
// .iter(keyspace)
// .map(|res| res.into_inner().unwrap())
// .map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v))))
// }
/// Buffers an insert, but only when `height` is above the store's last
/// committed height (i.e. this block still needs processing).
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
// An insert cancels any pending deletion of the same key.
self.dels.remove(&key);
// unreachable!("Shouldn't reach this");
}
self.puts.insert(key, value);
}
}
/// Buffers a deletion for `key`.
///
/// A pending insert of the same key is dropped instead; otherwise the key is
/// added to `dels`. Deleting a key that is already pending deletion is a
/// broken invariant: the store path is dumped and the process panics.
/// NOTE(review): `dbg!` is debug output left in a panic path — consider a
/// proper panic message.
pub fn remove(&mut self, key: K) {
// if self.is_empty()? {
// return Ok(());
// }
// if !self.puts.is_empty() {
// unreachable!("Shouldn't reach this");
// }
if (self.puts.is_empty() || self.puts.remove(&key).is_none()) && !self.dels.insert(key) {
dbg!(&self.meta.path());
unreachable!();
}
// Ok(())
}
/// Buffers the deletion only when `height` still needs processing.
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
// pub fn retain_or_del<F>(&mut self, retain: F)
// where
// F: Fn(&K, &mut V) -> bool,
// {
// self.puts.retain(|k, v| {
// let ret = retain(k, v);
// if !ret {
// self.dels.insert(k.clone());
// }
// ret
// });
// }
/// True when `height` has already been committed.
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
/// True when `height` is above the last committed height.
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for StoreV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
/// Flushes buffered `dels`/`puts` to the keyspace in one transaction and
/// records `height` as committed. No-op when `height` was already committed.
fn commit(&mut self, height: Height) -> Result<()> {
    if self.has(height) {
        return Ok(());
    }
    if self.puts.is_empty() && self.dels.is_empty() {
        // No pending writes: only the height marker needs updating.
        self.meta.export(height)?;
        return Ok(());
    }
    // Drop the read snapshot while writing; a fresh one is installed below.
    let mut rtx = self.rtx.write();
    drop(rtx.take());
    let mut wtx = self.database.write_tx();
    let keyspace = self.keyspace.read();
    let partition = keyspace.as_ref().unwrap();
    // Deletions first, in sorted key order.
    let mut dels = self.dels.drain().collect::<Vec<_>>();
    dels.sort_unstable();
    dels.into_iter()
        .for_each(|key| wtx.remove(partition, ByteView::from(key)));
    // Then insertions, also sorted by key.
    let mut puts = self.puts.drain().collect::<Vec<_>>();
    puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
    puts.into_iter().for_each(|(key, value)| {
        wtx.insert(partition, ByteView::from(key), ByteView::from(value))
    });
    wtx.commit()?;
    // Record the height only AFTER the transaction committed: the original
    // exported the marker up front, so a failed commit could leave the
    // height file claiming data that was never written.
    self.meta.export(height)?;
    rtx.replace(self.database.read_tx());
    Ok(())
}
/// Fsyncs journal + data to disk (SyncAll).
fn persist(&self) -> Result<()> {
self.database
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn name(&self) -> &'static str {
self.name
}
/// NOTE(review): `reset` is not implemented for the v3 backend yet — it
/// panics via `todo!()` because fjall v3 has no keyspace-removal API (see
/// the commented-out `remove_keyspace` call); everything after the
/// `todo!()` is currently unreachable.
fn reset(&mut self) -> Result<()> {
info!("Resetting {}...", self.name);
todo!();
let mut opt = self.keyspace.write();
let keyspace = opt.take().unwrap();
// Doesn't exist yet
// self.database.remove_keyspace(keyspace)?;
self.meta.reset();
let keyspace = Self::open_keyspace(&self.database, self.name)?;
opt.replace(keyspace);
Ok(())
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
// The two methods below delegate to the inherent `has`/`needs` (inherent
// methods take precedence over trait methods, so this is not recursion).
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
}

View File

@@ -33,6 +33,26 @@ impl AddressBytes {
AddressBytes::P2A(bytes) => &bytes[..],
}
}
/// 64-bit hash of the address payload with the variant tag mixed into the
/// first byte, so identical payloads of different script types hash apart.
/// NOTE(review): the hash bytes come from `to_le_bytes` but are read back
/// with `from_ne_bytes` — identical on little-endian targets, but the value
/// is endianness-dependent; confirm it is never persisted across
/// architectures.
pub fn hash(&self) -> u64 {
let mut slice = rapidhash::v3::rapidhash_v3(self.as_slice()).to_le_bytes();
slice[0] = slice[0].wrapping_add(self.index());
u64::from_ne_bytes(slice)
}
/// Stable per-variant tag mixed into `hash`.
fn index(&self) -> u8 {
// DO NOT CHANGE !!!
match self {
AddressBytes::P2PK65(_) => 0,
AddressBytes::P2PK33(_) => 1,
AddressBytes::P2PKH(_) => 2,
AddressBytes::P2SH(_) => 3,
AddressBytes::P2WPKH(_) => 4,
AddressBytes::P2WSH(_) => 5,
AddressBytes::P2TR(_) => 6,
AddressBytes::P2A(_) => 7,
}
}
}
impl fmt::Display for AddressBytes {

View File

@@ -2,7 +2,7 @@ use byteview::ByteView;
use derive_deref::Deref;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use super::{AddressBytes, OutputType};
use super::AddressBytes;
#[derive(
Debug,
@@ -17,20 +17,13 @@ use super::{AddressBytes, OutputType};
Immutable,
IntoBytes,
KnownLayout,
Hash,
)]
pub struct AddressBytesHash([u8; 8]);
pub struct AddressBytesHash(u64);
impl From<(&AddressBytes, OutputType)> for AddressBytesHash {
fn from((address_bytes, outputtype): (&AddressBytes, OutputType)) -> Self {
let mut slice = rapidhash::v3::rapidhash_v3(address_bytes.as_slice()).to_le_bytes();
slice[0] = slice[0].wrapping_add(outputtype as u8);
Self(slice)
}
}
impl From<[u8; 8]> for AddressBytesHash {
fn from(value: [u8; 8]) -> Self {
Self(value)
impl From<&AddressBytes> for AddressBytesHash {
fn from(address_bytes: &AddressBytes) -> Self {
Self(address_bytes.hash())
}
}
@@ -40,14 +33,14 @@ impl From<ByteView> for AddressBytesHash {
}
}
impl From<&AddressBytesHash> for ByteView {
fn from(value: &AddressBytesHash) -> Self {
Self::new(value.as_bytes())
}
}
impl From<AddressBytesHash> for ByteView {
fn from(value: AddressBytesHash) -> Self {
Self::from(&value)
}
}
impl From<&AddressBytesHash> for ByteView {
fn from(value: &AddressBytesHash) -> Self {
Self::new(value.as_bytes())
}
}

View File

@@ -19,8 +19,9 @@ use super::BlockHash;
Immutable,
IntoBytes,
KnownLayout,
Hash,
)]
pub struct BlockHashPrefix([u8; 8]);
pub struct BlockHashPrefix(u64);
impl From<BlockHash> for BlockHashPrefix {
fn from(value: BlockHash) -> Self {
@@ -30,7 +31,7 @@ impl From<BlockHash> for BlockHashPrefix {
impl From<&BlockHash> for BlockHashPrefix {
fn from(value: &BlockHash) -> Self {
Self(copy_first_8bytes(&value[..]).unwrap())
Self(u64::from_ne_bytes(copy_first_8bytes(&value[..]).unwrap()))
}
}

View File

@@ -36,6 +36,7 @@ use super::StoredU64;
StoredCompressed,
Allocative,
JsonSchema,
Hash,
)]
pub struct Height(u32);
@@ -240,12 +241,6 @@ impl From<ByteView> for Height {
impl From<Height> for ByteView {
fn from(value: Height) -> Self {
Self::from(&value)
}
}
impl From<&Height> for ByteView {
fn from(value: &Height) -> Self {
Self::new(&value.0.to_be_bytes())
}
}

View File

@@ -21,59 +21,34 @@ use crate::{TxIndex, Vout};
Serialize,
Allocative,
JsonSchema,
Hash,
)]
pub struct OutPoint {
txindex: TxIndex,
vout: Vout,
_padding: u16,
}
pub struct OutPoint(pub u64);
impl OutPoint {
pub const COINBASE: Self = Self {
txindex: TxIndex::COINBASE,
vout: Vout::MAX,
_padding: 0,
};
pub const COINBASE: Self = Self(u64::MAX);
pub fn new(txindex: TxIndex, vout: Vout) -> Self {
Self {
txindex,
vout,
_padding: 0,
}
let txindex_bits = u64::from(txindex) << 32;
let vout_bits = u64::from(vout);
Self(txindex_bits | vout_bits)
}
pub fn txindex(&self) -> TxIndex {
self.txindex
pub fn txindex(self) -> TxIndex {
TxIndex::from((self.0 >> 32) as u32)
}
pub fn vout(&self) -> Vout {
self.vout
pub fn vout(self) -> Vout {
Vout::from(self.0 as u32)
}
pub fn is_coinbase(self) -> bool {
self == Self::COINBASE
}
pub fn to_be_bytes(&self) -> [u8; 6] {
let txindex = self.txindex.to_be_bytes();
let vout = self.vout.to_be_bytes();
[
txindex[0], txindex[1], txindex[2], txindex[3], vout[0], vout[1],
]
}
}
impl From<&[u8]> for OutPoint {
fn from(value: &[u8]) -> Self {
let txindex = TxIndex::from(&value[4..8]);
let vout = Vout::from(&value[8..10]);
Self::new(txindex, vout)
}
}
impl std::fmt::Display for OutPoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "txindex: {}, vout: {}", self.txindex, self.vout)
write!(f, "txindex: {}, vout: {}", self.txindex(), self.vout())
}
}

View File

@@ -19,8 +19,9 @@ use super::Txid;
Immutable,
IntoBytes,
KnownLayout,
Hash,
)]
pub struct TxidPrefix([u8; 8]);
pub struct TxidPrefix(u64);
impl From<Txid> for TxidPrefix {
fn from(value: Txid) -> Self {
@@ -30,7 +31,7 @@ impl From<Txid> for TxidPrefix {
impl From<&Txid> for TxidPrefix {
fn from(value: &Txid) -> Self {
Self(copy_first_8bytes(&value[..]).unwrap())
Self(u64::from_ne_bytes(copy_first_8bytes(&value[..]).unwrap()))
}
}
@@ -54,6 +55,6 @@ impl From<TxidPrefix> for ByteView {
impl From<[u8; 8]> for TxidPrefix {
fn from(value: [u8; 8]) -> Self {
Self(value)
Self(u64::from_ne_bytes(value))
}
}

View File

@@ -31,6 +31,7 @@ use super::StoredU32;
StoredCompressed,
Allocative,
JsonSchema,
Hash,
)]
pub struct TxIndex(u32);
@@ -49,6 +50,10 @@ impl TxIndex {
pub fn to_be_bytes(&self) -> [u8; 4] {
self.0.to_be_bytes()
}
pub fn to_ne_bytes(&self) -> [u8; 4] {
self.0.to_ne_bytes()
}
}
impl Add<TxIndex> for TxIndex {

View File

@@ -26,6 +26,7 @@ use crate::copy_first_4bytes;
Deserialize,
StoredCompressed,
JsonSchema,
Hash,
)]
pub struct TypeIndex(u32);
@@ -47,6 +48,14 @@ impl TypeIndex {
self.increment();
i
}
pub fn to_be_bytes(&self) -> [u8; 4] {
self.0.to_be_bytes()
}
pub fn to_ne_bytes(&self) -> [u8; 4] {
self.0.to_ne_bytes()
}
}
impl From<u32> for TypeIndex {

View File

@@ -1,19 +1,34 @@
use std::hash::{Hash, Hasher};
use byteview::ByteView;
use serde::Serialize;
use zerocopy::IntoBytes;
use crate::{TypeIndexAndTxIndex, Vout};
use super::{OutPoint, TypeIndex};
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize)]
#[repr(C)]
pub struct TypeIndexAndOutPoint {
typeindex: TypeIndex,
outpoint: OutPoint,
typeindexandtxindex: TypeIndexAndTxIndex, // u64
vout: Vout, // u16
}
// Manual `Hash`: pack both fields into one 10-byte buffer (8 bytes of
// `typeindexandtxindex`, 2 bytes of `vout`) so the hasher gets a single
// contiguous `write` instead of two field-by-field calls.
impl Hash for TypeIndexAndOutPoint {
fn hash<H: Hasher>(&self, state: &mut H) {
let mut buf = [0u8; 10];
buf[..8].copy_from_slice(self.typeindexandtxindex.as_bytes());
buf[8..].copy_from_slice(self.vout.as_bytes());
state.write(&buf);
}
}
impl From<(TypeIndex, OutPoint)> for TypeIndexAndOutPoint {
fn from(value: (TypeIndex, OutPoint)) -> Self {
Self {
typeindex: value.0,
outpoint: value.1,
typeindexandtxindex: TypeIndexAndTxIndex::from((value.0, value.1.txindex())),
vout: value.1.vout(),
}
}
}
@@ -21,8 +36,8 @@ impl From<(TypeIndex, OutPoint)> for TypeIndexAndOutPoint {
impl From<ByteView> for TypeIndexAndOutPoint {
fn from(value: ByteView) -> Self {
Self {
typeindex: TypeIndex::from(&value[0..4]),
outpoint: OutPoint::from(&value[4..]),
typeindexandtxindex: TypeIndexAndTxIndex::from(&value[0..8]),
vout: Vout::from(&value[8..]),
}
}
}
@@ -36,8 +51,8 @@ impl From<&TypeIndexAndOutPoint> for ByteView {
fn from(value: &TypeIndexAndOutPoint) -> Self {
ByteView::from(
[
u32::from(value.typeindex).to_be_bytes().as_slice(),
value.outpoint.to_be_bytes().as_slice(),
value.typeindexandtxindex.to_be_bytes().as_slice(),
value.vout.to_be_bytes().as_slice(),
]
.concat(),
)

View File

@@ -1,28 +1,58 @@
use byteview::ByteView;
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use super::{TxIndex, TypeIndex};
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Default, Serialize)]
pub struct TypeIndexAndTxIndex {
typeindex: TypeIndex,
txindex: TxIndex,
#[derive(
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Clone,
Copy,
Default,
Serialize,
Hash,
FromBytes,
Immutable,
IntoBytes,
KnownLayout,
)]
pub struct TypeIndexAndTxIndex(u64);
impl TypeIndexAndTxIndex {
/// Upper 32 bits of the packed value: the type index.
pub fn typeindex(&self) -> u32 {
(self.0 >> 32) as u32
}
/// Lower 32 bits of the packed value: the transaction index.
pub fn txindex(&self) -> u32 {
self.0 as u32
}
/// Big-endian byte representation (used for order-preserving store keys).
pub fn to_be_bytes(&self) -> [u8; 8] {
self.0.to_be_bytes()
}
}
impl From<(TypeIndex, TxIndex)> for TypeIndexAndTxIndex {
fn from(value: (TypeIndex, TxIndex)) -> Self {
Self {
typeindex: value.0,
txindex: value.1,
}
fn from((typeindex, txindex): (TypeIndex, TxIndex)) -> Self {
Self((u64::from(typeindex) << 32) | u64::from(txindex))
}
}
impl From<ByteView> for TypeIndexAndTxIndex {
fn from(value: ByteView) -> Self {
Self::from(&*value)
}
}
impl From<&[u8]> for TypeIndexAndTxIndex {
fn from(value: &[u8]) -> Self {
let typeindex = TypeIndex::from(&value[0..4]);
let txindex = TxIndex::from(&value[4..8]);
Self { typeindex, txindex }
Self::from((typeindex, txindex))
}
}
@@ -33,12 +63,12 @@ impl From<TypeIndexAndTxIndex> for ByteView {
}
impl From<&TypeIndexAndTxIndex> for ByteView {
fn from(value: &TypeIndexAndTxIndex) -> Self {
ByteView::from(
[
u32::from(value.typeindex).to_be_bytes().as_slice(),
u32::from(value.txindex).to_be_bytes().as_slice(),
]
.concat(),
)
ByteView::from(value.0.to_be_bytes().as_slice())
}
}
impl From<TypeIndexAndTxIndex> for u64 {
fn from(value: TypeIndexAndTxIndex) -> Self {
value.0
}
}

View File

@@ -24,6 +24,7 @@ use crate::copy_first_2bytes;
Serialize,
Allocative,
JsonSchema,
Hash,
)]
pub struct Vout(u16);
@@ -38,6 +39,10 @@ impl Vout {
pub fn to_be_bytes(&self) -> [u8; 2] {
self.0.to_be_bytes()
}
pub fn to_ne_bytes(&self) -> [u8; 2] {
self.0.to_ne_bytes()
}
}
const U16_MAX_AS_U32: u32 = u16::MAX as u32;
@@ -60,6 +65,18 @@ impl From<usize> for Vout {
}
}
impl From<Vout> for u16 {
fn from(value: Vout) -> Self {
value.0
}
}
impl From<Vout> for u32 {
fn from(value: Vout) -> Self {
value.0 as u32
}
}
impl From<Vout> for u64 {
fn from(value: Vout) -> Self {
value.0 as u64