mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-05-14 09:38:36 -07:00
fjall: use a single keyspace for all stores + core: locktime -> rawlocktime
This commit is contained in:
@@ -24,7 +24,7 @@ fn main() -> color_eyre::Result<()> {
|
||||
|
||||
let outputs = Path::new("../../_outputs");
|
||||
|
||||
let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, false)?;
|
||||
let mut indexer = Indexer::new(outputs, false, false)?;
|
||||
|
||||
indexer.import_stores()?;
|
||||
indexer.import_vecs()?;
|
||||
|
||||
@@ -207,19 +207,20 @@ impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn starting_index<'a, I>(
|
||||
pub fn starting_index<'a, I, T>(
|
||||
height_to_index: &'a IndexedVec<Height, I>,
|
||||
index_to_height: &'a IndexedVec<I, Height>,
|
||||
index_to_else: &'a IndexedVec<I, T>,
|
||||
starting_height: Height,
|
||||
) -> Result<Option<Value<'a, I>>>
|
||||
where
|
||||
I: StoredType + StoredIndex + From<usize>,
|
||||
T: StoredType,
|
||||
{
|
||||
if height_to_index
|
||||
.height()
|
||||
.is_ok_and(|h| h + 1_u32 == starting_height)
|
||||
{
|
||||
Ok(Some(Value::Owned(I::from(index_to_height.len()))))
|
||||
Ok(Some(Value::Owned(I::from(index_to_else.len()))))
|
||||
} else {
|
||||
height_to_index.get(starting_height)
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use bitcoin::{Transaction, TxIn, TxOut};
|
||||
use brk_exit::Exit;
|
||||
use brk_vec::Compressed;
|
||||
use color_eyre::eyre::{ContextCompat, eyre};
|
||||
use fjall::TransactionalKeyspace;
|
||||
use log::info;
|
||||
use rayon::prelude::*;
|
||||
mod indexes;
|
||||
@@ -44,13 +45,13 @@ pub struct Indexer {
|
||||
|
||||
impl Indexer {
|
||||
pub fn new(
|
||||
indexes_dir: PathBuf,
|
||||
outputs_dir: &Path,
|
||||
compressed: bool,
|
||||
check_collisions: bool,
|
||||
) -> color_eyre::Result<Self> {
|
||||
setrlimit()?;
|
||||
Ok(Self {
|
||||
path: indexes_dir,
|
||||
path: outputs_dir.to_owned(),
|
||||
vecs: None,
|
||||
stores: None,
|
||||
compressed: Compressed::from(compressed),
|
||||
@@ -59,14 +60,17 @@ impl Indexer {
|
||||
}
|
||||
|
||||
pub fn import_vecs(&mut self) -> color_eyre::Result<()> {
|
||||
self.vecs = Some(Vecs::import(&self.path.join("vecs"), self.compressed)?);
|
||||
self.vecs = Some(Vecs::forced_import(
|
||||
&self.path.join("vecs/indexed"),
|
||||
self.compressed,
|
||||
)?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Do NOT import multiple times are things will break !!!
|
||||
/// Clone struct instead
|
||||
pub fn import_stores(&mut self) -> color_eyre::Result<()> {
|
||||
self.stores = Some(Stores::import(&self.path.join("stores"))?);
|
||||
self.stores = Some(Stores::forced_import(&self.path.join("stores"))?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -704,7 +708,7 @@ impl Indexer {
|
||||
vecs.txindex_to_txversion.push_if_needed(txindex, tx.version.into())?;
|
||||
vecs.txindex_to_txid.push_if_needed(txindex, txid)?;
|
||||
vecs.txindex_to_height.push_if_needed(txindex, height)?;
|
||||
vecs.txindex_to_locktime.push_if_needed(txindex, tx.lock_time.into())?;
|
||||
vecs.txindex_to_rawlocktime.push_if_needed(txindex, tx.lock_time.into())?;
|
||||
vecs.txindex_to_base_size.push_if_needed(txindex, tx.base_size())?;
|
||||
vecs.txindex_to_total_size.push_if_needed(txindex, tx.total_size())?;
|
||||
vecs.txindex_to_is_explicitly_rbf.push_if_needed(txindex, tx.is_explicitly_rbf())?;
|
||||
@@ -728,10 +732,6 @@ impl Indexer {
|
||||
Ok(starting_indexes)
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
|
||||
pub fn vecs(&self) -> &Vecs {
|
||||
self.vecs.as_ref().unwrap()
|
||||
}
|
||||
@@ -747,6 +747,10 @@ impl Indexer {
|
||||
pub fn mut_stores(&mut self) -> &mut Stores {
|
||||
self.stores.as_mut().unwrap()
|
||||
}
|
||||
|
||||
pub fn keyspace(&self) -> &TransactionalKeyspace {
|
||||
&self.stores().keyspace
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -10,7 +10,7 @@ use brk_core::Height;
|
||||
use brk_vec::{Value, Version};
|
||||
use byteview::ByteView;
|
||||
use fjall::{
|
||||
PartitionCreateOptions, PersistMode, ReadTransaction, Result, TransactionalKeyspace,
|
||||
PartitionCreateOptions, ReadTransaction, Result, TransactionalKeyspace,
|
||||
TransactionalPartitionHandle,
|
||||
};
|
||||
use zerocopy::{Immutable, IntoBytes};
|
||||
@@ -20,7 +20,7 @@ use super::StoreMeta;
|
||||
pub struct Store<Key, Value> {
|
||||
meta: StoreMeta,
|
||||
keyspace: TransactionalKeyspace,
|
||||
part: TransactionalPartitionHandle,
|
||||
partition: TransactionalPartitionHandle,
|
||||
rtx: ReadTransaction,
|
||||
puts: BTreeMap<Key, Value>,
|
||||
dels: BTreeSet<Key>,
|
||||
@@ -35,36 +35,31 @@ where
|
||||
V: Debug + Clone + Into<ByteView> + TryFrom<ByteView>,
|
||||
<V as TryFrom<ByteView>>::Error: error::Error + Send + Sync + 'static,
|
||||
{
|
||||
pub fn import(path: &Path, version: Version) -> color_eyre::Result<Self> {
|
||||
pub fn import(
|
||||
keyspace: TransactionalKeyspace,
|
||||
path: &Path,
|
||||
name: &str,
|
||||
version: Version,
|
||||
) -> color_eyre::Result<Self> {
|
||||
let version = MAJOR_FJALL_VERSION + version;
|
||||
|
||||
let meta = StoreMeta::checked_open(path, version)?;
|
||||
|
||||
let keyspace = match Self::open_keyspace(path) {
|
||||
Ok(keyspace) => keyspace,
|
||||
Err(e) => {
|
||||
dbg!(e);
|
||||
meta.reset()?;
|
||||
return Self::import(path, version);
|
||||
}
|
||||
};
|
||||
|
||||
let part = match Self::open_partition_handle(&keyspace) {
|
||||
Ok(part) => part,
|
||||
Err(e) => {
|
||||
dbg!(e);
|
||||
drop(keyspace);
|
||||
meta.reset()?;
|
||||
return Self::import(path, version);
|
||||
}
|
||||
};
|
||||
let (meta, partition) = StoreMeta::checked_open(
|
||||
&keyspace,
|
||||
&path.join(format!("meta/{name}")),
|
||||
version,
|
||||
|| {
|
||||
Self::open_partition_handle(&keyspace, name).inspect_err(|_| {
|
||||
eprintln!("Delete {path:?} and try again");
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
let rtx = keyspace.read_tx();
|
||||
|
||||
Ok(Self {
|
||||
meta,
|
||||
keyspace,
|
||||
part,
|
||||
partition,
|
||||
rtx,
|
||||
puts: BTreeMap::new(),
|
||||
dels: BTreeSet::new(),
|
||||
@@ -74,7 +69,7 @@ where
|
||||
pub fn get(&self, key: &K) -> color_eyre::Result<Option<Value<V>>> {
|
||||
if let Some(v) = self.puts.get(key) {
|
||||
Ok(Some(Value::Ref(v)))
|
||||
} else if let Some(slice) = self.rtx.get(&self.part, key.as_bytes())? {
|
||||
} else if let Some(slice) = self.rtx.get(&self.partition, key.as_bytes())? {
|
||||
Ok(Some(Value::Owned(V::try_from(slice.as_bytes().into())?)))
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -117,25 +112,25 @@ where
|
||||
|
||||
mem::take(&mut self.dels)
|
||||
.into_iter()
|
||||
.for_each(|key| wtx.remove(&self.part, key.as_bytes()));
|
||||
.for_each(|key| wtx.remove(&self.partition, key.as_bytes()));
|
||||
|
||||
mem::take(&mut self.puts)
|
||||
.into_iter()
|
||||
.for_each(|(key, value)| {
|
||||
if CHECK_COLLISISONS {
|
||||
#[allow(unused_must_use)]
|
||||
if let Ok(Some(value)) = wtx.get(&self.part, key.as_bytes()) {
|
||||
if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) {
|
||||
dbg!(
|
||||
&key,
|
||||
V::try_from(value.as_bytes().into()).unwrap(),
|
||||
&self.meta,
|
||||
self.rtx.get(&self.part, key.as_bytes())
|
||||
self.rtx.get(&self.partition, key.as_bytes())
|
||||
);
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
wtx.insert(
|
||||
&self.part,
|
||||
&self.partition,
|
||||
key.as_bytes(),
|
||||
&*ByteView::try_from(value).unwrap(),
|
||||
)
|
||||
@@ -143,15 +138,13 @@ where
|
||||
|
||||
wtx.commit()?;
|
||||
|
||||
self.keyspace.persist(PersistMode::SyncAll)?;
|
||||
|
||||
self.rtx = self.keyspace.read_tx();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn rotate_memtable(&self) {
|
||||
let _ = self.part.inner().rotate_memtable();
|
||||
let _ = self.partition.inner().rotate_memtable();
|
||||
}
|
||||
|
||||
pub fn height(&self) -> Option<Height> {
|
||||
@@ -172,17 +165,12 @@ where
|
||||
self.meta.needs(height)
|
||||
}
|
||||
|
||||
fn open_keyspace(path: &Path) -> Result<TransactionalKeyspace> {
|
||||
fjall::Config::new(path.join("fjall"))
|
||||
.max_write_buffer_size(32 * 1024 * 1024)
|
||||
.open_transactional()
|
||||
}
|
||||
|
||||
fn open_partition_handle(
|
||||
keyspace: &TransactionalKeyspace,
|
||||
name: &str,
|
||||
) -> Result<TransactionalPartitionHandle> {
|
||||
keyspace.open_partition(
|
||||
"partition",
|
||||
name,
|
||||
PartitionCreateOptions::default()
|
||||
.bloom_filter_bits(Some(5))
|
||||
.max_memtable_size(8 * 1024 * 1024)
|
||||
@@ -200,7 +188,7 @@ where
|
||||
Self {
|
||||
meta: self.meta.clone(),
|
||||
keyspace: self.keyspace.clone(),
|
||||
part: self.part.clone(),
|
||||
partition: self.partition.clone(),
|
||||
rtx: self.keyspace.read_tx(),
|
||||
puts: self.puts.clone(),
|
||||
dels: self.dels.clone(),
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use brk_vec::Version;
|
||||
use fjall::{TransactionalKeyspace, TransactionalPartitionHandle};
|
||||
use zerocopy::{FromBytes, IntoBytes};
|
||||
|
||||
use super::Height;
|
||||
@@ -17,14 +18,27 @@ pub struct StoreMeta {
|
||||
}
|
||||
|
||||
impl StoreMeta {
|
||||
pub fn checked_open(path: &Path, version: Version) -> color_eyre::Result<Self> {
|
||||
pub fn checked_open<F>(
|
||||
keyspace: &TransactionalKeyspace,
|
||||
path: &Path,
|
||||
version: Version,
|
||||
open_partition_handle: F,
|
||||
) -> color_eyre::Result<(Self, TransactionalPartitionHandle)>
|
||||
where
|
||||
F: Fn() -> fjall::Result<TransactionalPartitionHandle>,
|
||||
{
|
||||
fs::create_dir_all(path)?;
|
||||
|
||||
let is_same_version = Version::try_from(Self::path_version_(path).as_path())
|
||||
.is_ok_and(|prev_version| version == prev_version);
|
||||
|
||||
let mut partition = open_partition_handle()?;
|
||||
|
||||
if !is_same_version {
|
||||
Self::reset_(path)?;
|
||||
keyspace.delete_partition(partition)?;
|
||||
keyspace.persist(fjall::PersistMode::SyncAll)?;
|
||||
partition = open_partition_handle()?;
|
||||
}
|
||||
|
||||
let slf = Self {
|
||||
@@ -36,7 +50,7 @@ impl StoreMeta {
|
||||
|
||||
slf.version.write(&slf.path_version())?;
|
||||
|
||||
Ok(slf)
|
||||
Ok((slf, partition))
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::{path::Path, thread};
|
||||
use std::{fs, path::Path, thread};
|
||||
|
||||
use brk_core::{
|
||||
AddressHash, Addressbytes, Addressindex, Addresstype, BlockHashPrefix, Height, TxidPrefix,
|
||||
Txindex,
|
||||
};
|
||||
use brk_vec::{Value, Version};
|
||||
use fjall::{PersistMode, TransactionalKeyspace};
|
||||
|
||||
use crate::Indexes;
|
||||
|
||||
@@ -18,22 +19,52 @@ use super::Vecs;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Stores {
|
||||
pub keyspace: TransactionalKeyspace,
|
||||
pub addresshash_to_addressindex: Store<AddressHash, Addressindex>,
|
||||
pub blockhash_prefix_to_height: Store<BlockHashPrefix, Height>,
|
||||
pub txid_prefix_to_txindex: Store<TxidPrefix, Txindex>,
|
||||
}
|
||||
|
||||
impl Stores {
|
||||
pub fn import(path: &Path) -> color_eyre::Result<Self> {
|
||||
pub fn forced_import(path: &Path) -> color_eyre::Result<Self> {
|
||||
fs::create_dir_all(path)?;
|
||||
|
||||
let keyspace = match Self::open_keyspace(path) {
|
||||
Ok(keyspace) => keyspace,
|
||||
Err(_) => {
|
||||
fs::remove_dir_all(path)?;
|
||||
return Self::forced_import(path);
|
||||
}
|
||||
};
|
||||
|
||||
thread::scope(|scope| {
|
||||
let addresshash_to_addressindex = scope
|
||||
.spawn(|| Store::import(&path.join("addresshash_to_addressindex"), Version::ZERO));
|
||||
let blockhash_prefix_to_height = scope
|
||||
.spawn(|| Store::import(&path.join("blockhash_prefix_to_height"), Version::ZERO));
|
||||
let txid_prefix_to_txindex =
|
||||
scope.spawn(|| Store::import(&path.join("txid_prefix_to_txindex"), Version::ZERO));
|
||||
let addresshash_to_addressindex = scope.spawn(|| {
|
||||
Store::import(
|
||||
keyspace.clone(),
|
||||
path,
|
||||
"addresshash_to_addressindex",
|
||||
Version::ZERO,
|
||||
)
|
||||
});
|
||||
let blockhash_prefix_to_height = scope.spawn(|| {
|
||||
Store::import(
|
||||
keyspace.clone(),
|
||||
path,
|
||||
"blockhash_prefix_to_height",
|
||||
Version::ZERO,
|
||||
)
|
||||
});
|
||||
let txid_prefix_to_txindex = scope.spawn(|| {
|
||||
Store::import(
|
||||
keyspace.clone(),
|
||||
path,
|
||||
"txid_prefix_to_txindex",
|
||||
Version::ZERO,
|
||||
)
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
keyspace: keyspace.clone(),
|
||||
addresshash_to_addressindex: addresshash_to_addressindex.join().unwrap()?,
|
||||
blockhash_prefix_to_height: blockhash_prefix_to_height.join().unwrap()?,
|
||||
txid_prefix_to_txindex: txid_prefix_to_txindex.join().unwrap()?,
|
||||
@@ -204,7 +235,7 @@ impl Stores {
|
||||
}
|
||||
|
||||
pub fn commit(&mut self, height: Height) -> fjall::Result<()> {
|
||||
thread::scope(|scope| {
|
||||
thread::scope(|scope| -> fjall::Result<()> {
|
||||
let addresshash_to_addressindex_commit_handle =
|
||||
scope.spawn(|| self.addresshash_to_addressindex.commit(height));
|
||||
let blockhash_prefix_to_height_commit_handle =
|
||||
@@ -217,7 +248,9 @@ impl Stores {
|
||||
txid_prefix_to_txindex_commit_handle.join().unwrap()?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})?;
|
||||
|
||||
self.keyspace.persist(PersistMode::SyncAll)
|
||||
}
|
||||
|
||||
pub fn rotate_memtables(&self) {
|
||||
@@ -225,4 +258,10 @@ impl Stores {
|
||||
self.blockhash_prefix_to_height.rotate_memtable();
|
||||
self.txid_prefix_to_txindex.rotate_memtable();
|
||||
}
|
||||
|
||||
fn open_keyspace(path: &Path) -> fjall::Result<TransactionalKeyspace> {
|
||||
fjall::Config::new(path.join("fjall"))
|
||||
.max_write_buffer_size(32 * 1024 * 1024)
|
||||
.open_transactional()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,10 +2,11 @@ use std::{fs, path::Path};
|
||||
|
||||
use brk_core::{
|
||||
Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height,
|
||||
LockTime, Multisigindex, Opreturnindex, P2PK33AddressBytes, P2PK33index, P2PK65AddressBytes,
|
||||
P2PK65index, P2PKHAddressBytes, P2PKHindex, P2SHAddressBytes, P2SHindex, P2TRAddressBytes,
|
||||
P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats,
|
||||
StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight,
|
||||
Multisigindex, Opreturnindex, P2PK33AddressBytes, P2PK33index, P2PK65AddressBytes, P2PK65index,
|
||||
P2PKHAddressBytes, P2PKHindex, P2SHAddressBytes, P2SHindex, P2TRAddressBytes, P2TRindex,
|
||||
P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, RawLockTime,
|
||||
Sats, StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex,
|
||||
Weight,
|
||||
};
|
||||
use brk_vec::{AnyStoredVec, Compressed, Result, Version};
|
||||
use rayon::prelude::*;
|
||||
@@ -65,7 +66,7 @@ pub struct Vecs {
|
||||
pub txindex_to_first_txoutindex: IndexedVec<Txindex, Txoutindex>,
|
||||
pub txindex_to_height: IndexedVec<Txindex, Height>,
|
||||
pub txindex_to_is_explicitly_rbf: IndexedVec<Txindex, bool>,
|
||||
pub txindex_to_locktime: IndexedVec<Txindex, LockTime>,
|
||||
pub txindex_to_rawlocktime: IndexedVec<Txindex, RawLockTime>,
|
||||
pub txindex_to_total_size: IndexedVec<Txindex, usize>,
|
||||
pub txindex_to_txid: IndexedVec<Txindex, Txid>,
|
||||
pub txindex_to_txversion: IndexedVec<Txindex, TxVersion>,
|
||||
@@ -79,7 +80,7 @@ pub struct Vecs {
|
||||
}
|
||||
|
||||
impl Vecs {
|
||||
pub fn import(path: &Path, compressed: Compressed) -> color_eyre::Result<Self> {
|
||||
pub fn forced_import(path: &Path, compressed: Compressed) -> color_eyre::Result<Self> {
|
||||
fs::create_dir_all(path)?;
|
||||
|
||||
Ok(Self {
|
||||
@@ -253,7 +254,7 @@ impl Vecs {
|
||||
Version::ZERO,
|
||||
compressed,
|
||||
)?,
|
||||
txindex_to_locktime: IndexedVec::forced_import(
|
||||
txindex_to_rawlocktime: IndexedVec::forced_import(
|
||||
&path.join("txindex_to_locktime"),
|
||||
Version::ZERO,
|
||||
compressed,
|
||||
@@ -466,7 +467,7 @@ impl Vecs {
|
||||
.truncate_if_needed(txindex, saved_height)?;
|
||||
self.txindex_to_height
|
||||
.truncate_if_needed(txindex, saved_height)?;
|
||||
self.txindex_to_locktime
|
||||
self.txindex_to_rawlocktime
|
||||
.truncate_if_needed(txindex, saved_height)?;
|
||||
self.txindex_to_txid
|
||||
.truncate_if_needed(txindex, saved_height)?;
|
||||
@@ -644,7 +645,7 @@ impl Vecs {
|
||||
self.txindex_to_first_txinindex.any_vec(),
|
||||
self.txindex_to_first_txoutindex.any_vec(),
|
||||
self.txindex_to_height.any_vec(),
|
||||
self.txindex_to_locktime.any_vec(),
|
||||
self.txindex_to_rawlocktime.any_vec(),
|
||||
self.txindex_to_txid.any_vec(),
|
||||
self.txindex_to_base_size.any_vec(),
|
||||
self.txindex_to_total_size.any_vec(),
|
||||
@@ -706,7 +707,7 @@ impl Vecs {
|
||||
&mut self.txindex_to_first_txinindex,
|
||||
&mut self.txindex_to_first_txoutindex,
|
||||
&mut self.txindex_to_height,
|
||||
&mut self.txindex_to_locktime,
|
||||
&mut self.txindex_to_rawlocktime,
|
||||
&mut self.txindex_to_txid,
|
||||
&mut self.txindex_to_base_size,
|
||||
&mut self.txindex_to_total_size,
|
||||
|
||||
Reference in New Issue
Block a user