diff --git a/Cargo.lock b/Cargo.lock index 9345e9f2a..7470cbbeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,6 +422,7 @@ dependencies = [ "brk_parser", "brk_vec", "color-eyre", + "fjall", "log", ] diff --git a/crates/brk_cli/src/query.rs b/crates/brk_cli/src/query.rs index 8e826e067..ac358bbf4 100644 --- a/crates/brk_cli/src/query.rs +++ b/crates/brk_cli/src/query.rs @@ -10,10 +10,10 @@ pub fn query(params: QueryParams) -> color_eyre::Result<()> { let compressed = config.compressed(); - let mut indexer = Indexer::new(config.indexeddir(), compressed, config.check_collisions())?; + let mut indexer = Indexer::new(&config.outputsdir(), compressed, config.check_collisions())?; indexer.import_vecs()?; - let mut computer = Computer::new(config.computeddir(), config.fetcher(), compressed); + let mut computer = Computer::new(&config.outputsdir(), config.fetcher(), compressed); computer.import_vecs()?; let query = Query::build(&indexer, &computer); diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs index 00ce5463d..d623b3111 100644 --- a/crates/brk_cli/src/run.rs +++ b/crates/brk_cli/src/run.rs @@ -28,12 +28,12 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> { let compressed = config.compressed(); - let mut indexer = Indexer::new(config.indexeddir(), compressed, config.check_collisions())?; + let mut indexer = Indexer::new(&config.outputsdir(), compressed, config.check_collisions())?; indexer.import_stores()?; indexer.import_vecs()?; - let mut computer = Computer::new(config.computeddir(), config.fetcher(), compressed); - computer.import_stores()?; + let mut computer = Computer::new(&config.outputsdir(), config.fetcher(), compressed); + computer.import_stores(&indexer)?; computer.import_vecs()?; tokio::runtime::Builder::new_multi_thread() @@ -357,18 +357,10 @@ impl RunConfig { .map_or_else(default_brk_path, |s| Self::fix_user_path(s.as_ref())) } - fn outputsdir(&self) -> PathBuf { + pub fn outputsdir(&self) -> PathBuf { self.brkdir().join("outputs") } - pub fn indexeddir(&self) -> PathBuf { - self.outputsdir().join("indexed") - } - - pub fn computeddir(&self) -> PathBuf { - self.outputsdir().join("computed") - } - pub fn harsdir(&self) -> PathBuf { self.outputsdir().join("hars") } diff --git a/crates/brk_computer/Cargo.toml b/crates/brk_computer/Cargo.toml index f34dbb718..5afc56a7d 100644 --- a/crates/brk_computer/Cargo.toml +++ b/crates/brk_computer/Cargo.toml @@ -15,4 +15,5 @@ brk_logger = { workspace = true } brk_parser = { workspace = true } brk_vec = { workspace = true } color-eyre = { workspace = true } +fjall = { workspace = true } log = { workspace = true } diff --git a/crates/brk_computer/examples/main.rs b/crates/brk_computer/examples/main.rs index b4b7688f2..a1961c788 100644 --- a/crates/brk_computer/examples/main.rs +++ b/crates/brk_computer/examples/main.rs @@ -26,14 +26,14 @@ pub fn main() -> color_eyre::Result<()> { let compressed = true; - let mut indexer = Indexer::new(outputs_dir.join("indexed"), compressed, true)?; + let mut indexer = Indexer::new(outputs_dir, compressed, true)?; indexer.import_stores()?; indexer.import_vecs()?; let fetcher = Fetcher::import(None)?; - let mut computer = Computer::new(outputs_dir.join("computed"), Some(fetcher), compressed); - computer.import_stores()?; + let mut computer = Computer::new(outputs_dir, Some(fetcher), compressed); + computer.import_stores(&indexer)?; computer.import_vecs()?; let starting_indexes = indexer.index(&parser, rpc, &exit)?; diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index cb01b03b5..4edf1fa00 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -26,9 +26,9 @@ pub struct Computer { } impl Computer { - pub fn new(computed_dir: PathBuf, fetcher: Option, compressed: bool) -> Self { + pub fn new(outputs_dir: &Path, fetcher: Option, compressed: bool) -> Self { Self { - path: computed_dir, + path: outputs_dir.to_owned(), fetcher, vecs: None, stores: None, @@ -38,7 +38,7 @@ impl Computer { pub fn import_vecs(&mut self) -> color_eyre::Result<()> { self.vecs = Some(Vecs::import( - &self.path.join("vecs"), + &self.path.join("vecs/computed"), self.fetcher.is_some(), self.compressed, )?); @@ -47,8 +47,11 @@ impl Computer { /// Do NOT import multiple times or things will break !!! /// Clone struct instead - pub fn import_stores(&mut self) -> color_eyre::Result<()> { - self.stores = Some(Stores::import(&self.path.join("stores"))?); + pub fn import_stores(&mut self, indexer: &Indexer) -> color_eyre::Result<()> { + self.stores = Some(Stores::import( + &self.path.join("stores"), + indexer.keyspace(), + )?); Ok(()) } } @@ -72,10 +75,6 @@ impl Computer { Ok(()) } - pub fn path(&self) -> &Path { - &self.path - } - pub fn vecs(&self) -> &Vecs { self.vecs.as_ref().unwrap() } diff --git a/crates/brk_computer/src/storage/stores.rs b/crates/brk_computer/src/storage/stores.rs index d7cc4adee..3ecae31cd 100644 --- a/crates/brk_computer/src/storage/stores.rs +++ b/crates/brk_computer/src/storage/stores.rs @@ -3,6 +3,7 @@ use std::path::Path; use brk_core::{AddressindexTxoutindex, Unit}; use brk_indexer::Store; use brk_vec::Version; +use fjall::TransactionalKeyspace; #[derive(Clone)] pub struct Stores { @@ -11,11 +12,19 @@ pub struct Stores { } impl Stores { - pub fn import(path: &Path) -> color_eyre::Result { - let address_to_utxos_received = - Store::import(&path.join("address_to_utxos_received"), Version::ZERO)?; - let address_to_utxos_spent = - Store::import(&path.join("address_to_utxos_spent"), Version::ZERO)?; + pub fn import(path: &Path, keyspace: &TransactionalKeyspace) -> color_eyre::Result { + let address_to_utxos_received = Store::import( + keyspace.clone(), + path, + "address_to_utxos_received", + Version::ZERO, + )?; + let address_to_utxos_spent = Store::import( + keyspace.clone(), + path, + "address_to_utxos_spent", + Version::ZERO, + )?; Ok(Self { address_to_utxos_received, diff --git a/crates/brk_computer/src/storage/vecs/marketprice.rs b/crates/brk_computer/src/storage/vecs/marketprice.rs index ff5f3ebb1..6530a566e 100644 --- a/crates/brk_computer/src/storage/vecs/marketprice.rs +++ b/crates/brk_computer/src/storage/vecs/marketprice.rs @@ -74,8 +74,7 @@ impl Vecs { let mut fetched_path = path.to_owned(); fetched_path.pop(); - fetched_path.pop(); - fetched_path = fetched_path.join("fetched/vecs"); + fetched_path = fetched_path.join("fetched"); Ok(Self { dateindex_to_ohlc_in_cents: EagerVec::forced_import( diff --git a/crates/brk_computer/src/storage/vecs/mod.rs b/crates/brk_computer/src/storage/vecs/mod.rs index 04a060007..cf8b8c316 100644 --- a/crates/brk_computer/src/storage/vecs/mod.rs +++ b/crates/brk_computer/src/storage/vecs/mod.rs @@ -12,7 +12,7 @@ pub mod marketprice; pub mod transactions; pub mod vec; -pub use indexes::*; +pub use indexes::Indexes; pub use vec::*; #[derive(Clone)] diff --git a/crates/brk_core/src/structs/height.rs b/crates/brk_core/src/structs/height.rs index 5edc88e66..3e5b3573d 100644 --- a/crates/brk_core/src/structs/height.rs +++ b/crates/brk_core/src/structs/height.rs @@ -181,12 +181,6 @@ impl From for Height { } } -impl From for bitcoin::locktime::absolute::Height { - fn from(value: Height) -> Self { - bitcoin::locktime::absolute::Height::from_consensus(value.0).unwrap() - } -} - impl TryFrom<&std::path::Path> for Height { type Error = crate::Error; fn try_from(value: &std::path::Path) -> Result { diff --git a/crates/brk_core/src/structs/locktime.rs b/crates/brk_core/src/structs/locktime.rs deleted file mode 100644 index 3c436eed2..000000000 --- a/crates/brk_core/src/structs/locktime.rs +++ /dev/null @@ -1,30 +0,0 @@ -use serde::Serialize; -use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; - -use super::{Height, Timestamp}; - -#[derive(Debug, Immutable, Clone, Copy, IntoBytes, KnownLayout, TryFromBytes, Serialize)] -#[repr(C)] -#[allow(warnings)] -pub enum LockTime { - Height(Height), - Timestamp(Timestamp), -} - -impl From for LockTime { - fn from(value: bitcoin::absolute::LockTime) -> Self { - match value { - bitcoin::absolute::LockTime::Blocks(h) => LockTime::Height(h.into()), - bitcoin::absolute::LockTime::Seconds(t) => LockTime::Timestamp(t.into()), - } - } -} - -impl From for bitcoin::absolute::LockTime { - fn from(value: LockTime) -> Self { - match value { - LockTime::Height(h) => bitcoin::absolute::LockTime::Blocks(h.into()), - LockTime::Timestamp(t) => bitcoin::absolute::LockTime::Seconds(t.into()), - } - } -} diff --git a/crates/brk_core/src/structs/mod.rs b/crates/brk_core/src/structs/mod.rs index a25923be8..b5723d8b1 100644 --- a/crates/brk_core/src/structs/mod.rs +++ b/crates/brk_core/src/structs/mod.rs @@ -15,10 +15,10 @@ mod dollars; mod feerate; mod halvingepoch; mod height; -mod locktime; mod monthindex; mod ohlc; mod quarterindex; +mod rawlocktime; mod sats; mod stored_u32; mod stored_u64; @@ -54,10 +54,10 @@ pub use dollars::*; pub use feerate::*; pub use halvingepoch::*; pub use height::*; -pub use locktime::*; pub use monthindex::*; pub use ohlc::*; pub use quarterindex::*; +pub use rawlocktime::*; pub use sats::*; pub use stored_u8::*; pub use stored_u32::*; diff --git a/crates/brk_core/src/structs/rawlocktime.rs b/crates/brk_core/src/structs/rawlocktime.rs new file mode 100644 index 000000000..d4263763f --- /dev/null +++ b/crates/brk_core/src/structs/rawlocktime.rs @@ -0,0 +1,29 @@ +use bitcoin::absolute::LockTime; +use serde::Serialize; +use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes}; + +#[derive(Debug, Immutable, Clone, Copy, IntoBytes, KnownLayout, TryFromBytes, Serialize)] +pub struct RawLockTime(u32); + +impl From for RawLockTime { + fn from(value: LockTime) -> Self { + Self(value.to_consensus_u32()) + } +} + +const CONSENSUS_DELIMITER: u32 = 500_000_000; + +impl From for LockTime { + fn from(value: RawLockTime) -> Self { + let value = value.0; + if value >= CONSENSUS_DELIMITER { + bitcoin::locktime::absolute::Height::from_consensus(value) + .unwrap() + .into() + } else { + bitcoin::locktime::absolute::Time::from_consensus(value) + .unwrap() + .into() + } + } +} diff --git a/crates/brk_core/src/structs/timestamp.rs b/crates/brk_core/src/structs/timestamp.rs index 419a4b2ca..ced075237 100644 --- a/crates/brk_core/src/structs/timestamp.rs +++ b/crates/brk_core/src/structs/timestamp.rs @@ -65,12 +65,6 @@ impl From for Timestamp { } } -impl From for bitcoin::locktime::absolute::Time { - fn from(value: Timestamp) -> Self { - bitcoin::locktime::absolute::Time::from_consensus(*value).unwrap() - } -} - impl From for Timestamp { fn from(value: usize) -> Self { Self(value as u32) diff --git a/crates/brk_indexer/examples/main.rs b/crates/brk_indexer/examples/main.rs index 849ccd39f..5a5ce1285 100644 --- a/crates/brk_indexer/examples/main.rs +++ b/crates/brk_indexer/examples/main.rs @@ -24,7 +24,7 @@ fn main() -> color_eyre::Result<()> { let outputs = Path::new("../../_outputs"); - let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), false, false)?; + let mut indexer = Indexer::new(outputs, false, false)?; indexer.import_stores()?; indexer.import_vecs()?; diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index aeb4b1b4b..1b3d1e315 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -207,19 +207,20 @@ impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes { } } -pub fn starting_index<'a, I>( +pub fn starting_index<'a, I, T>( height_to_index: &'a IndexedVec, - index_to_height: &'a IndexedVec, + index_to_else: &'a IndexedVec, starting_height: Height, ) -> Result>> where I: StoredType + StoredIndex + From, + T: StoredType, { if height_to_index .height() .is_ok_and(|h| h + 1_u32 == starting_height) { - Ok(Some(Value::Owned(I::from(index_to_height.len())))) + Ok(Some(Value::Owned(I::from(index_to_else.len())))) } else { height_to_index.get(starting_height) } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 7b41dae9d..affc2d72c 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -20,6 +20,7 @@ use bitcoin::{Transaction, TxIn, TxOut}; use brk_exit::Exit; use brk_vec::Compressed; use color_eyre::eyre::{ContextCompat, eyre}; +use fjall::TransactionalKeyspace; use log::info; use rayon::prelude::*; mod indexes; @@ -44,13 +45,13 @@ pub struct Indexer { impl Indexer { pub fn new( - indexes_dir: PathBuf, + outputs_dir: &Path, compressed: bool, check_collisions: bool, ) -> color_eyre::Result { setrlimit()?; Ok(Self { - path: indexes_dir, + path: outputs_dir.to_owned(), vecs: None, stores: None, compressed: Compressed::from(compressed), @@ -59,14 +60,17 @@ impl Indexer { } pub fn import_vecs(&mut self) -> color_eyre::Result<()> { - self.vecs = Some(Vecs::import(&self.path.join("vecs"), self.compressed)?); + self.vecs = Some(Vecs::forced_import( + &self.path.join("vecs/indexed"), + self.compressed, + )?); Ok(()) } /// Do NOT import multiple times are things will break !!! /// Clone struct instead pub fn import_stores(&mut self) -> color_eyre::Result<()> { - self.stores = Some(Stores::import(&self.path.join("stores"))?); + self.stores = Some(Stores::forced_import(&self.path.join("stores"))?); Ok(()) } @@ -704,7 +708,7 @@ impl Indexer { vecs.txindex_to_txversion.push_if_needed(txindex, tx.version.into())?; vecs.txindex_to_txid.push_if_needed(txindex, txid)?; vecs.txindex_to_height.push_if_needed(txindex, height)?; - vecs.txindex_to_locktime.push_if_needed(txindex, tx.lock_time.into())?; + vecs.txindex_to_rawlocktime.push_if_needed(txindex, tx.lock_time.into())?; vecs.txindex_to_base_size.push_if_needed(txindex, tx.base_size())?; vecs.txindex_to_total_size.push_if_needed(txindex, tx.total_size())?; vecs.txindex_to_is_explicitly_rbf.push_if_needed(txindex, tx.is_explicitly_rbf())?; @@ -728,10 +732,6 @@ impl Indexer { Ok(starting_indexes) } - pub fn path(&self) -> &Path { - &self.path - } - pub fn vecs(&self) -> &Vecs { self.vecs.as_ref().unwrap() } @@ -747,6 +747,10 @@ impl Indexer { pub fn mut_stores(&mut self) -> &mut Stores { self.stores.as_mut().unwrap() } + + pub fn keyspace(&self) -> &TransactionalKeyspace { + &self.stores().keyspace + } } #[derive(Debug)] diff --git a/crates/brk_indexer/src/stores/base.rs b/crates/brk_indexer/src/stores/base.rs index 5f61d9087..1bfbd4e2d 100644 --- a/crates/brk_indexer/src/stores/base.rs +++ b/crates/brk_indexer/src/stores/base.rs @@ -10,7 +10,7 @@ use brk_core::Height; use brk_vec::{Value, Version}; use byteview::ByteView; use fjall::{ - PartitionCreateOptions, PersistMode, ReadTransaction, Result, TransactionalKeyspace, + PartitionCreateOptions, ReadTransaction, Result, TransactionalKeyspace, TransactionalPartitionHandle, }; use zerocopy::{Immutable, IntoBytes}; @@ -20,7 +20,7 @@ use super::StoreMeta; pub struct Store { meta: StoreMeta, keyspace: TransactionalKeyspace, - part: TransactionalPartitionHandle, + partition: TransactionalPartitionHandle, rtx: ReadTransaction, puts: BTreeMap, dels: BTreeSet, @@ -35,36 +35,31 @@ where V: Debug + Clone + Into + TryFrom, >::Error: error::Error + Send + Sync + 'static, { - pub fn import(path: &Path, version: Version) -> color_eyre::Result { + pub fn import( + keyspace: TransactionalKeyspace, + path: &Path, + name: &str, + version: Version, + ) -> color_eyre::Result { let version = MAJOR_FJALL_VERSION + version; - let meta = StoreMeta::checked_open(path, version)?; - - let keyspace = match Self::open_keyspace(path) { - Ok(keyspace) => keyspace, - Err(e) => { - dbg!(e); - meta.reset()?; - return Self::import(path, version); - } - }; - - let part = match Self::open_partition_handle(&keyspace) { - Ok(part) => part, - Err(e) => { - dbg!(e); - drop(keyspace); - meta.reset()?; - return Self::import(path, version); - } - }; + let (meta, partition) = StoreMeta::checked_open( + &keyspace, + &path.join(format!("meta/{name}")), + version, + || { + Self::open_partition_handle(&keyspace, name).inspect_err(|_| { + eprintln!("Delete {path:?} and try again"); + }) + }, + )?; let rtx = keyspace.read_tx(); Ok(Self { meta, keyspace, - part, + partition, rtx, puts: BTreeMap::new(), dels: BTreeSet::new(), @@ -74,7 +69,7 @@ where pub fn get(&self, key: &K) -> color_eyre::Result>> { if let Some(v) = self.puts.get(key) { Ok(Some(Value::Ref(v))) - } else if let Some(slice) = self.rtx.get(&self.part, key.as_bytes())? { + } else if let Some(slice) = self.rtx.get(&self.partition, key.as_bytes())? { Ok(Some(Value::Owned(V::try_from(slice.as_bytes().into())?))) } else { Ok(None) @@ -117,25 +112,25 @@ where mem::take(&mut self.dels) .into_iter() - .for_each(|key| wtx.remove(&self.part, key.as_bytes())); + .for_each(|key| wtx.remove(&self.partition, key.as_bytes())); mem::take(&mut self.puts) .into_iter() .for_each(|(key, value)| { if CHECK_COLLISISONS { #[allow(unused_must_use)] - if let Ok(Some(value)) = wtx.get(&self.part, key.as_bytes()) { + if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) { dbg!( &key, V::try_from(value.as_bytes().into()).unwrap(), &self.meta, - self.rtx.get(&self.part, key.as_bytes()) + self.rtx.get(&self.partition, key.as_bytes()) ); unreachable!(); } } wtx.insert( - &self.part, + &self.partition, key.as_bytes(), &*ByteView::try_from(value).unwrap(), ) @@ -143,15 +138,13 @@ where wtx.commit()?; - self.keyspace.persist(PersistMode::SyncAll)?; - self.rtx = self.keyspace.read_tx(); Ok(()) } pub fn rotate_memtable(&self) { - let _ = self.part.inner().rotate_memtable(); + let _ = self.partition.inner().rotate_memtable(); } pub fn height(&self) -> Option { @@ -172,17 +165,12 @@ where self.meta.needs(height) } - fn open_keyspace(path: &Path) -> Result { - fjall::Config::new(path.join("fjall")) - .max_write_buffer_size(32 * 1024 * 1024) - .open_transactional() - } - fn open_partition_handle( keyspace: &TransactionalKeyspace, + name: &str, ) -> Result { keyspace.open_partition( - "partition", + name, PartitionCreateOptions::default() .bloom_filter_bits(Some(5)) .max_memtable_size(8 * 1024 * 1024) @@ -200,7 +188,7 @@ where Self { meta: self.meta.clone(), keyspace: self.keyspace.clone(), - part: self.part.clone(), + partition: self.partition.clone(), rtx: self.keyspace.read_tx(), puts: self.puts.clone(), dels: self.dels.clone(), diff --git a/crates/brk_indexer/src/stores/meta.rs b/crates/brk_indexer/src/stores/meta.rs index dc6b8b914..80967f4d1 100644 --- a/crates/brk_indexer/src/stores/meta.rs +++ b/crates/brk_indexer/src/stores/meta.rs @@ -4,6 +4,7 @@ use std::{ }; use brk_vec::Version; +use fjall::{TransactionalKeyspace, TransactionalPartitionHandle}; use zerocopy::{FromBytes, IntoBytes}; use super::Height; @@ -17,14 +18,27 @@ pub struct StoreMeta { } impl StoreMeta { - pub fn checked_open(path: &Path, version: Version) -> color_eyre::Result { + pub fn checked_open( + keyspace: &TransactionalKeyspace, + path: &Path, + version: Version, + open_partition_handle: F, + ) -> color_eyre::Result<(Self, TransactionalPartitionHandle)> + where + F: Fn() -> fjall::Result, + { fs::create_dir_all(path)?; let is_same_version = Version::try_from(Self::path_version_(path).as_path()) .is_ok_and(|prev_version| version == prev_version); + let mut partition = open_partition_handle()?; + if !is_same_version { Self::reset_(path)?; + keyspace.delete_partition(partition)?; + keyspace.persist(fjall::PersistMode::SyncAll)?; + partition = open_partition_handle()?; } let slf = Self { @@ -36,7 +50,7 @@ impl StoreMeta { slf.version.write(&slf.path_version())?; - Ok(slf) + Ok((slf, partition)) } pub fn len(&self) -> usize { diff --git a/crates/brk_indexer/src/stores/mod.rs b/crates/brk_indexer/src/stores/mod.rs index ecda0e5b6..8fc5ebf00 100644 --- a/crates/brk_indexer/src/stores/mod.rs +++ b/crates/brk_indexer/src/stores/mod.rs @@ -1,10 +1,11 @@ -use std::{path::Path, thread}; +use std::{fs, path::Path, thread}; use brk_core::{ AddressHash, Addressbytes, Addressindex, Addresstype, BlockHashPrefix, Height, TxidPrefix, Txindex, }; use brk_vec::{Value, Version}; +use fjall::{PersistMode, TransactionalKeyspace}; use crate::Indexes; @@ -18,22 +19,52 @@ use super::Vecs; #[derive(Clone)] pub struct Stores { + pub keyspace: TransactionalKeyspace, pub addresshash_to_addressindex: Store, pub blockhash_prefix_to_height: Store, pub txid_prefix_to_txindex: Store, } impl Stores { - pub fn import(path: &Path) -> color_eyre::Result { + pub fn forced_import(path: &Path) -> color_eyre::Result { + fs::create_dir_all(path)?; + + let keyspace = match Self::open_keyspace(path) { + Ok(keyspace) => keyspace, + Err(_) => { + fs::remove_dir_all(path)?; + return Self::forced_import(path); + } + }; + thread::scope(|scope| { - let addresshash_to_addressindex = scope - .spawn(|| Store::import(&path.join("addresshash_to_addressindex"), Version::ZERO)); - let blockhash_prefix_to_height = scope - .spawn(|| Store::import(&path.join("blockhash_prefix_to_height"), Version::ZERO)); - let txid_prefix_to_txindex = - scope.spawn(|| Store::import(&path.join("txid_prefix_to_txindex"), Version::ZERO)); + let addresshash_to_addressindex = scope.spawn(|| { + Store::import( + keyspace.clone(), + path, + "addresshash_to_addressindex", + Version::ZERO, + ) + }); + let blockhash_prefix_to_height = scope.spawn(|| { + Store::import( + keyspace.clone(), + path, + "blockhash_prefix_to_height", + Version::ZERO, + ) + }); + let txid_prefix_to_txindex = scope.spawn(|| { + Store::import( + keyspace.clone(), + path, + "txid_prefix_to_txindex", + Version::ZERO, + ) + }); Ok(Self { + keyspace: keyspace.clone(), addresshash_to_addressindex: addresshash_to_addressindex.join().unwrap()?, blockhash_prefix_to_height: blockhash_prefix_to_height.join().unwrap()?, txid_prefix_to_txindex: txid_prefix_to_txindex.join().unwrap()?, @@ -204,7 +235,7 @@ impl Stores { } pub fn commit(&mut self, height: Height) -> fjall::Result<()> { - thread::scope(|scope| { + thread::scope(|scope| -> fjall::Result<()> { let addresshash_to_addressindex_commit_handle = scope.spawn(|| self.addresshash_to_addressindex.commit(height)); let blockhash_prefix_to_height_commit_handle = @@ -217,7 +248,9 @@ impl Stores { txid_prefix_to_txindex_commit_handle.join().unwrap()?; Ok(()) - }) + })?; + + self.keyspace.persist(PersistMode::SyncAll) } pub fn rotate_memtables(&self) { @@ -225,4 +258,10 @@ impl Stores { self.blockhash_prefix_to_height.rotate_memtable(); self.txid_prefix_to_txindex.rotate_memtable(); } + + fn open_keyspace(path: &Path) -> fjall::Result { + fjall::Config::new(path.join("fjall")) + .max_write_buffer_size(32 * 1024 * 1024) + .open_transactional() + } } diff --git a/crates/brk_indexer/src/vecs/mod.rs b/crates/brk_indexer/src/vecs/mod.rs index a671bf0db..439d42d7e 100644 --- a/crates/brk_indexer/src/vecs/mod.rs +++ b/crates/brk_indexer/src/vecs/mod.rs @@ -2,10 +2,11 @@ use std::{fs, path::Path}; use brk_core::{ Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height, - LockTime, Multisigindex, Opreturnindex, P2PK33AddressBytes, P2PK33index, P2PK65AddressBytes, - P2PK65index, P2PKHAddressBytes, P2PKHindex, P2SHAddressBytes, P2SHindex, P2TRAddressBytes, - P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats, - StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight, + Multisigindex, Opreturnindex, P2PK33AddressBytes, P2PK33index, P2PK65AddressBytes, P2PK65index, + P2PKHAddressBytes, P2PKHindex, P2SHAddressBytes, P2SHindex, P2TRAddressBytes, P2TRindex, + P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, RawLockTime, + Sats, StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, + Weight, }; use brk_vec::{AnyStoredVec, Compressed, Result, Version}; use rayon::prelude::*; @@ -65,7 +66,7 @@ pub struct Vecs { pub txindex_to_first_txoutindex: IndexedVec, pub txindex_to_height: IndexedVec, pub txindex_to_is_explicitly_rbf: IndexedVec, - pub txindex_to_locktime: IndexedVec, + pub txindex_to_rawlocktime: IndexedVec, pub txindex_to_total_size: IndexedVec, pub txindex_to_txid: IndexedVec, pub txindex_to_txversion: IndexedVec, @@ -79,7 +80,7 @@ pub struct Vecs { } impl Vecs { - pub fn import(path: &Path, compressed: Compressed) -> color_eyre::Result { + pub fn forced_import(path: &Path, compressed: Compressed) -> color_eyre::Result { fs::create_dir_all(path)?; Ok(Self { @@ -253,7 +254,7 @@ impl Vecs { Version::ZERO, compressed, )?, - txindex_to_locktime: IndexedVec::forced_import( + txindex_to_rawlocktime: IndexedVec::forced_import( &path.join("txindex_to_locktime"), Version::ZERO, compressed, @@ -466,7 +467,7 @@ impl Vecs { .truncate_if_needed(txindex, saved_height)?; self.txindex_to_height .truncate_if_needed(txindex, saved_height)?; - self.txindex_to_locktime + self.txindex_to_rawlocktime .truncate_if_needed(txindex, saved_height)?; self.txindex_to_txid .truncate_if_needed(txindex, saved_height)?; @@ -644,7 +645,7 @@ impl Vecs { self.txindex_to_first_txinindex.any_vec(), self.txindex_to_first_txoutindex.any_vec(), self.txindex_to_height.any_vec(), - self.txindex_to_locktime.any_vec(), + self.txindex_to_rawlocktime.any_vec(), self.txindex_to_txid.any_vec(), self.txindex_to_base_size.any_vec(), self.txindex_to_total_size.any_vec(), @@ -706,7 +707,7 @@ impl Vecs { &mut self.txindex_to_first_txinindex, &mut self.txindex_to_first_txoutindex, &mut self.txindex_to_height, - &mut self.txindex_to_locktime, + &mut self.txindex_to_rawlocktime, &mut self.txindex_to_txid, &mut self.txindex_to_base_size, &mut self.txindex_to_total_size, diff --git a/crates/brk_query/examples/main.rs b/crates/brk_query/examples/main.rs index 19dafe051..a59fce8b5 100644 --- a/crates/brk_query/examples/main.rs +++ b/crates/brk_query/examples/main.rs @@ -11,10 +11,10 @@ pub fn main() -> color_eyre::Result<()> { let compressed = true; - let mut indexer = Indexer::new(outputs_dir.join("indexed"), compressed, true)?; + let mut indexer = Indexer::new(outputs_dir, compressed, true)?; indexer.import_vecs()?; - let mut computer = Computer::new(outputs_dir.join("computed"), None, compressed); + let mut computer = Computer::new(outputs_dir, None, compressed); computer.import_vecs()?; let query = Query::build(&indexer, &computer); diff --git a/crates/brk_server/examples/main.rs b/crates/brk_server/examples/main.rs index 76e260d74..0d2ac3426 100644 --- a/crates/brk_server/examples/main.rs +++ b/crates/brk_server/examples/main.rs @@ -32,14 +32,14 @@ pub fn main() -> color_eyre::Result<()> { let compressed = true; - let mut indexer = Indexer::new(outputs_dir.join("indexed"), compressed, true)?; + let mut indexer = Indexer::new(outputs_dir, compressed, true)?; indexer.import_stores()?; indexer.import_vecs()?; let fetcher = Some(Fetcher::import(None)?); - let mut computer = Computer::new(outputs_dir.join("computed"), fetcher, compressed); - computer.import_stores()?; + let mut computer = Computer::new(outputs_dir, fetcher, compressed); + computer.import_stores(&indexer)?; computer.import_vecs()?; tokio::runtime::Builder::new_multi_thread()