diff --git a/bindex/src/lib.rs b/bindex/src/lib.rs index 0d8cdbac4..5a4bbb159 100644 --- a/bindex/src/lib.rs +++ b/bindex/src/lib.rs @@ -12,7 +12,6 @@ pub use biter::*; use biter::bitcoin::{Transaction, TxIn, TxOut, Txid}; use color_eyre::eyre::{eyre, ContextCompat}; use exit::Exit; -use fjall::{PersistMode, ReadTransaction, TransactionalKeyspace}; use rayon::prelude::*; mod storage; @@ -36,10 +35,10 @@ pub struct Indexer { impl Indexer { pub fn import(indexes_dir: &Path) -> color_eyre::Result { - let vecs = StorableVecs::import(&indexes_dir.join("vecs"))?; - let parts = Fjalls::import(&indexes_dir.join("fjall"))?; - - Ok(Self { vecs, parts }) + Ok(Self { + vecs: StorableVecs::import(&indexes_dir.join("vecs"))?, + parts: Fjalls::import(&indexes_dir.join("fjall"))?, + }) } pub fn index(&mut self, bitcoin_dir: &Path, rpc: rpc::Client, exit: &Exit) -> color_eyre::Result<()> { @@ -93,7 +92,7 @@ impl Indexer { // // let mut stores_opt = Some(stores); // let mut rtx_opt = Some(rtx); - biter::new(bitcoin_dir, Some(height.into()), None, rpc) + biter::new(bitcoin_dir, Some(height.into()), Some(400_000), rpc) .iter() .try_for_each(|(_height, block, blockhash)| -> color_eyre::Result<()> { println!("Processing block {_height}..."); @@ -674,6 +673,7 @@ enum InputSource<'a> { SameBlock((&'a Transaction, Txindex, &'a TxIn, Vin)), } +#[allow(unused)] fn pause() { let mut stdin = std::io::stdin(); let mut stdout = std::io::stdout(); diff --git a/bindex/src/storage/fjalls/base.rs b/bindex/src/storage/fjalls/base.rs index 43385992c..ca69556f1 100644 --- a/bindex/src/storage/fjalls/base.rs +++ b/bindex/src/storage/fjalls/base.rs @@ -4,6 +4,7 @@ use fjall::{ PartitionCreateOptions, PersistMode, ReadTransaction, Result, Slice, TransactionalKeyspace, TransactionalPartitionHandle, }; +use storable_vec::UnsafeSizedSerDe; use crate::structs::{Height, Version}; @@ -24,48 +25,38 @@ where { pub fn import(path: &Path, version: Version) -> color_eyre::Result { let meta = Meta::checked_open(path, version)?; - - let keyspace = fjall::Config::new(path.join("fjall")).open_transactional()?; - let handle = keyspace.open_partition( - "partition", - PartitionCreateOptions::default().manual_journal_persist(true), - )?; + let keyspace = if let Ok(keyspace) = Self::open_keyspace(path) { + keyspace + } else { + meta.reset()?; + return Self::import(path, version); + }; + let part = if let Ok(part) = Self::open_partition_handle(&keyspace) { + part + } else { + drop(keyspace); + meta.reset()?; + return Self::import(path, version); + }; let rtx = keyspace.read_tx(); Ok(Self { meta, keyspace, - part: handle, + part, rtx, puts: BTreeMap::new(), }) } - pub fn len(&self) -> usize { - self.meta.len() + self.puts.len() - } - - pub fn has(&self, height: Height) -> bool { - self.height().is_some_and(|self_height| self_height >= &height) - } - pub fn needs(&self, height: Height) -> bool { - !self.has(height) - } - - pub fn get<'a>(&self, key: &'a Key) -> color_eyre::Result> + pub fn get(&self, key: &Key) -> color_eyre::Result> where - fjall::Slice: std::convert::From<&'a Key>, - >::Error: std::error::Error + Send + Sync, - >::Error: 'static, + >::Error: std::error::Error + Send + Sync + 'static, { if let Some(v) = self.puts.get(key) { - return Ok(Some(v.clone())); - } - - if let Some(slice) = self.rtx.get(&self.part, Slice::from(key))? { - let v_res = Value::try_from(slice); - let v = v_res?; - Ok(Some(v)) + Ok(Some(v.clone())) + } else if let Some(slice) = self.rtx.get(&self.part, key.unsafe_as_slice())? { + Ok(Some(Value::try_from(slice)?)) } else { Ok(None) } @@ -82,11 +73,13 @@ where return Ok(()); } + self.meta.export(self.len(), height)?; + let mut wtx = self.keyspace.write_tx(); mem::take(&mut self.puts) .into_iter() .for_each(|(key, value)| wtx.insert(&self.part, key, value)); - self.meta.export(self.len(), height)?; + wtx.commit()?; self.keyspace.persist(PersistMode::SyncAll)?; @@ -99,4 +92,26 @@ where pub fn height(&self) -> Option<&Height> { self.meta.height() } + + pub fn len(&self) -> usize { + self.meta.len() + self.puts.len() + } + + pub fn has(&self, height: Height) -> bool { + self.meta.has(height) + } + pub fn needs(&self, height: Height) -> bool { + self.meta.needs(height) + } + + fn open_keyspace(path: &Path) -> Result { + fjall::Config::new(path.join("fjall")).open_transactional() + } + + fn open_partition_handle(keyspace: &TransactionalKeyspace) -> Result { + keyspace.open_partition( + "partition", + PartitionCreateOptions::default().manual_journal_persist(true), + ) + } } diff --git a/bindex/src/storage/fjalls/meta.rs b/bindex/src/storage/fjalls/meta.rs index 3c04e38e2..2431a0fe4 100644 --- a/bindex/src/storage/fjalls/meta.rs +++ b/bindex/src/storage/fjalls/meta.rs @@ -22,8 +22,7 @@ impl Meta { Version::try_from(Self::path_version_(path).as_path()).is_ok_and(|prev_version| version == prev_version); if !is_same_version { - fs::remove_dir_all(path)?; - fs::create_dir(path)?; + Self::reset_(path)?; } let this = Self { @@ -49,6 +48,14 @@ impl Meta { height.write(&self.path_height()) } + pub fn reset(&self) -> io::Result<()> { + Self::reset_(self.pathbuf.as_path()) + } + fn reset_(path: &Path) -> io::Result<()> { + fs::remove_dir_all(path)?; + fs::create_dir(path) + } + fn path_version(&self) -> PathBuf { Self::path_version_(&self.pathbuf) } @@ -72,9 +79,9 @@ impl Meta { path.join("height") } - fn read_length(&self) -> color_eyre::Result { - Self::read_length_(&self.pathbuf) - } + // fn read_length(&self) -> color_eyre::Result { + // Self::read_length_(&self.pathbuf) + // } fn read_length_(path: &Path) -> color_eyre::Result { Ok(fs::read(Self::path_length(path)) .map(|v| usize::unsafe_try_from_slice(v.as_slice()).cloned().unwrap_or_default())