From a122333aaa3da43f14e4dfee6af6ad7a13814433 Mon Sep 17 00:00:00 2001 From: nym21 Date: Sat, 15 Feb 2025 12:04:20 +0100 Subject: [PATCH] indexer: rm canopy+sanakirja + init rollback; svec: added truncate --- Cargo.lock | 26 +-- Cargo.toml | 2 +- indexer/src/storage/canopies/database.rs | 50 ----- indexer/src/storage/canopies/environment.rs | 20 -- indexer/src/storage/canopies/mod.rs | 9 - indexer/src/storage/canopies/transaction.rs | 19 -- indexer/src/storage/canopies/tree.rs | 84 -------- indexer/src/storage/sanakirjas/meta.rs | 81 -------- indexer/src/storage/sanakirjas/mod.rs | 174 ----------------- indexer/src/storage/sanakirjas/multi.rs | 103 ---------- indexer/src/storage/sanakirjas/unique.rs | 102 ---------- indexer/src/storage/storable_vecs/base.rs | 5 + indexer/src/storage/storable_vecs/mod.rs | 203 ++++++++++---------- indexer/src/structs/height.rs | 4 + pricer/src/lib.rs | 99 +++++----- pricer/src/main.rs | 8 +- storable_vec/src/lib.rs | 17 ++ 17 files changed, 195 insertions(+), 811 deletions(-) delete mode 100644 indexer/src/storage/canopies/database.rs delete mode 100644 indexer/src/storage/canopies/environment.rs delete mode 100644 indexer/src/storage/canopies/mod.rs delete mode 100644 indexer/src/storage/canopies/transaction.rs delete mode 100644 indexer/src/storage/canopies/tree.rs delete mode 100644 indexer/src/storage/sanakirjas/meta.rs delete mode 100644 indexer/src/storage/sanakirjas/mod.rs delete mode 100644 indexer/src/storage/sanakirjas/multi.rs delete mode 100644 indexer/src/storage/sanakirjas/unique.rs diff --git a/Cargo.lock b/Cargo.lock index 0c7f15b41..70148638b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,7 +288,7 @@ dependencies = [ "serde", "serde_bytes", "storable_vec", - "zerocopy 0.8.17", + "zerocopy 0.8.18", ] [[package]] @@ -400,7 +400,7 @@ dependencies = [ "fjall", "pricer", "storable_vec", - "zerocopy 0.8.17", + "zerocopy 0.8.18", ] [[package]] @@ -737,9 +737,9 @@ dependencies = [ [[package]] name = "equivalent" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "errno" @@ -2119,7 +2119,7 @@ dependencies = [ "serde", "serde_json", "storable_vec", - "zerocopy 0.8.17", + "zerocopy 0.8.18", ] [[package]] @@ -2587,9 +2587,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.2" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" [[package]] name = "smawk" @@ -2632,7 +2632,7 @@ dependencies = [ "memmap2", "serde", "serde_json", - "zerocopy 0.8.17", + "zerocopy 0.8.18", ] [[package]] @@ -3318,11 +3318,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.17" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713" +checksum = "79386d31a42a4996e3336b0919ddb90f81112af416270cff95b5f5af22b839c2" dependencies = [ - "zerocopy-derive 0.8.17", + "zerocopy-derive 0.8.18", ] [[package]] @@ -3338,9 +3338,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.17" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626" +checksum = "76331675d372f91bf8d17e13afbd5fe639200b73d01f0fc748bb059f9cca2db7" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 73e061e28..68e1b0483 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,4 @@ serde = { version = "1.0.217", features = ["derive"] } serde_json = { version = "1.0.138", features = ["float_roundtrip"] } server = { path = "server", package = "berver" } storable_vec = { path = "storable_vec", features = ["json"] } -zerocopy = { version = "0.8.17", features = ["derive"] } +zerocopy = { version = "0.8.18", features = ["derive"] } diff --git a/indexer/src/storage/canopies/database.rs b/indexer/src/storage/canopies/database.rs deleted file mode 100644 index fd871d742..000000000 --- a/indexer/src/storage/canopies/database.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::{ - ops::{Deref, DerefMut}, - time::Duration, -}; - -use canopydb::{Database as CanopyDatabase, DbOptions, Error, WriteTransaction}; - -use super::Environment; - -#[derive(Debug)] -pub struct Database { - db: CanopyDatabase, - // pub wtx: WriteTransaction, -} -impl Deref for Database { - type Target = CanopyDatabase; - fn deref(&self) -> &Self::Target { - &self.db - } -} -impl DerefMut for Database { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.db - } -} - -impl Database { - pub fn new(environment: &Environment, name: &str) -> color_eyre::Result { - let mut options = DbOptions::default(); - options.use_wal = false; - options.checkpoint_interval = Duration::from_secs(u64::MAX); - options.checkpoint_target_size = usize::MAX; - options.throttle_memory_limit = usize::MAX; - options.stall_memory_limit = usize::MAX; - options.write_txn_memory_limit = usize::MAX; - - let db = environment.get_or_create_database_with(name, options)?; - - Ok(Self { - // wtx: db.begin_write()?, - db, - }) - } - - pub fn flush(&mut self) -> Result<(), Error> { - // drop(blockhash_prefix_to_height_tree); - // blockhash_prefix_to_height_tx_opt.take().map(|tx| tx.commit()); - self.checkpoint() - } -} diff --git a/indexer/src/storage/canopies/environment.rs b/indexer/src/storage/canopies/environment.rs deleted file mode 100644 index 562e159d9..000000000 --- a/indexer/src/storage/canopies/environment.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::path::Path; - -use canopydb::{EnvOptions, Environment as CanopyEnvironment}; -use derive_deref::{Deref, DerefMut}; - -#[derive(Debug, Deref, DerefMut)] -pub struct Environment(CanopyEnvironment); - -impl Environment { - pub fn new(path: &Path) -> color_eyre::Result { - let mut options = EnvOptions::new(path); - // options.use_mmap = true; - options.disable_fsync = true; - options.wal_new_file_on_checkpoint = false; - options.wal_background_sync_interval = None; - options.wal_write_batch_memory_limit = usize::MAX; - - Ok(Self(CanopyEnvironment::with_options(options)?)) - } -} diff --git a/indexer/src/storage/canopies/mod.rs b/indexer/src/storage/canopies/mod.rs deleted file mode 100644 index 46cb95a00..000000000 --- a/indexer/src/storage/canopies/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod database; -mod environment; -// mod transaction; -mod tree; - -pub use database::*; -pub use environment::*; -// pub use transaction::*; -pub use tree::*; diff --git a/indexer/src/storage/canopies/transaction.rs b/indexer/src/storage/canopies/transaction.rs deleted file mode 100644 index d1a6ca13c..000000000 --- a/indexer/src/storage/canopies/transaction.rs +++ /dev/null @@ -1,19 +0,0 @@ -use canopydb::{Tree as CanopyTree, TreeOptions, WriteTransaction}; - -use super::{Database, Tree}; - -#[derive(Debug)] -pub struct Transaction<'a, K, V> { - tx: WriteTransaction, - tree: Tree<'a, K, V>, -} - -impl<'a, K, V> Transaction<'a, K, V> { - pub fn new(db: &Database) -> color_eyre::Result { - let tx = db.begin_write()?; - - let tree = Tree::new(&tx)?; - - Ok(Self { tx, tree }) - } -} diff --git a/indexer/src/storage/canopies/tree.rs b/indexer/src/storage/canopies/tree.rs deleted file mode 100644 index 7809835ea..000000000 --- a/indexer/src/storage/canopies/tree.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::{ - fmt::Debug, - marker::PhantomData, - ops::{Deref, DerefMut}, -}; - -use canopydb::{Tree as CanopyTree, TreeOptions, WriteTransaction}; -use color_eyre::eyre::eyre; - -#[derive(Debug)] -pub struct Tree<'a, K, V> { - tree: CanopyTree<'a>, - k: PhantomData, - v: PhantomData, -} -impl<'a, K, V> Deref for Tree<'a, K, V> { - type Target = CanopyTree<'a>; - fn deref(&self) -> &Self::Target { - &self.tree - } -} -impl<'a, K, V> DerefMut for Tree<'a, K, V> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.tree - } -} - -impl<'a, K, V> Tree<'a, K, V> -where - K: Debug + Sized, - V: Debug + Sized + Clone + Copy, -{ - const SIZE_OF_K: usize = size_of::(); - const SIZE_OF_V: usize = size_of::(); - - pub fn new(tx: &'a WriteTransaction) -> color_eyre::Result { - let mut options = TreeOptions::new(); - options.compress_overflow_values = None; - options.fixed_key_len = size_of::() as i8; - options.fixed_value_len = size_of::() as i8; - - Ok(Self { - tree: tx.get_or_create_tree_with(b"tree", options)?, - k: PhantomData, - v: PhantomData, - }) - } - - pub fn get(&self, key: &K) -> color_eyre::Result> { - let slice = self.tree.get(Self::key_as_slice(key))?; - - if slice.is_none() { - return Ok(None); - } - - let slice = slice.unwrap(); - - let (prefix, shorts, suffix) = unsafe { slice.align_to::() }; - - if !prefix.is_empty() || shorts.len() != 1 || !suffix.is_empty() { - dbg!(&key, &prefix, &shorts, &suffix); - return Err(eyre!("align_to issue")); - } - - Ok(Some(shorts[0])) - } - - pub fn insert(&mut self, key: &K, value: &V) -> Result<(), canopydb::Error> { - self.tree - .insert(Self::key_as_slice(key), Self::value_as_slice(value)) - } - - fn key_as_slice(key: &K) -> &[u8] { - let data: *const K = key; - let data: *const u8 = data as *const u8; - unsafe { std::slice::from_raw_parts(data, Self::SIZE_OF_K) } - } - - fn value_as_slice(value: &V) -> &[u8] { - let data: *const V = value; - let data: *const u8 = data as *const u8; - unsafe { std::slice::from_raw_parts(data, Self::SIZE_OF_V) } - } -} diff --git a/indexer/src/storage/sanakirjas/meta.rs b/indexer/src/storage/sanakirjas/meta.rs deleted file mode 100644 index b73ac3385..000000000 --- a/indexer/src/storage/sanakirjas/meta.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::{ - fs, io, - path::{Path, PathBuf}, -}; - -use snkrj::UnitDatabase; - -use super::{Height, Version}; - -pub struct StoreMeta { - pathbuf: PathBuf, - version: Version, - height: Option, - pub len: usize, -} - -impl StoreMeta { - pub fn checked_open(path: &Path, version: Version) -> Result { - fs::create_dir_all(path)?; - - let is_same_version = - Version::try_from(Self::path_version_(path).as_path()).is_ok_and(|prev_version| version == prev_version); - - if !is_same_version { - fs::remove_dir_all(path)?; - fs::create_dir(path)?; - } - - let this = Self { - pathbuf: path.to_owned(), - version, - height: Height::try_from(Self::path_height_(path).as_path()).ok(), - len: UnitDatabase::read_length_(path), - }; - - this.version.write(&this.path_version())?; - - Ok(this) - } - - #[allow(unused)] - pub fn len(&self) -> usize { - self.len - } - - pub fn export(mut self, height: Height) -> Result<(), io::Error> { - self.height = Some(height); - height.write(&self.path_height())?; - UnitDatabase::write_length_(&self.pathbuf, self.len) - } - - pub fn path_parts(&self) -> PathBuf { - Self::path_parts_(&self.pathbuf) - } - fn path_parts_(path: &Path) -> PathBuf { - path.join("parts") - } - - fn path_version(&self) -> PathBuf { - Self::path_version_(&self.pathbuf) - } - fn path_version_(path: &Path) -> PathBuf { - path.join("version") - } - - pub fn height(&self) -> Option<&Height> { - self.height.as_ref() - } - pub fn needs(&self, height: Height) -> bool { - self.height.is_none_or(|self_height| height > self_height) - } - pub fn has(&self, height: Height) -> bool { - !self.needs(height) - } - fn path_height(&self) -> PathBuf { - Self::path_height_(&self.pathbuf) - } - fn path_height_(path: &Path) -> PathBuf { - path.join("height") - } -} diff --git a/indexer/src/storage/sanakirjas/mod.rs b/indexer/src/storage/sanakirjas/mod.rs deleted file mode 100644 index 8a00fb17a..000000000 --- a/indexer/src/storage/sanakirjas/mod.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::{path::Path, thread}; - -use crate::{ - structs::Version, AddressbytesPrefix, Addressindex, BlockHashPrefix, Height, TxidPrefix, Txindex, Txoutindex, -}; - -mod meta; -mod multi; -mod unique; - -use meta::*; -use unique::*; - -pub struct Stores { - pub addressbytes_prefix_to_addressindex: StoreUnique, - pub blockhash_prefix_to_height: StoreUnique, - pub txid_prefix_to_txindex: StoreUnique, -} - -impl Stores { - pub fn open(path: &Path) -> color_eyre::Result { - Ok(Self { - addressbytes_prefix_to_addressindex: StoreUnique::open( - &path.join("addressbytes_prefix_to_addressindex"), - Version::from(1), - )?, - blockhash_prefix_to_height: StoreUnique::open(&path.join("blockhash_prefix_to_height"), Version::from(1))?, - txid_prefix_to_txindex: StoreUnique::open(&path.join("txid_prefix_to_txindex"), Version::from(1))?, - }) - } - - // pub fn rollback_from( - // &mut self, - // _wtx: &mut WriteTransaction, - // _height: Height, - // _exit: &Exit, - // ) -> color_eyre::Result<()> { - // panic!(); - // let mut txindex = None; - - // wtx.range(self.height_to_blockhash.data(), Slice::from(height)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (height_slice, slice_blockhash) = slice?; - // let blockhash = BlockHash::from_slice(&slice_blockhash)?; - - // wtx.remove(self.height_to_blockhash.data(), height_slice); - - // wtx.remove(self.blockhash_prefix_to_height.data(), blockhash.prefix()); - - // if txindex.is_none() { - // txindex.replace( - // wtx.get(self.height_to_first_txindex.data(), height_slice)? - // .context("for height to have first txindex")?, - // ); - // } - // wtx.remove(self.height_to_first_txindex.data(), height_slice); - // wtx.remove(self.height_to_last_txindex.data(), height_slice); - - // Ok(()) - // })?; - - // let txindex = txindex.context("txindex to not be none by now")?; - - // wtx.range(self.txindex_to_txid.data(), Slice::from(txindex)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (slice_txindex, slice_txid) = slice?; - // let txindex = Txindex::from(slice_txindex); - // let txid = Txid::from_slice(&slice_txid)?; - - // wtx.remove(self.txindex_to_txid.data(), Slice::from(txindex)); - // wtx.remove(self.txindex_to_height.data(), Slice::from(txindex)); - // wtx.remove(self.txid_prefix_to_txindex.data(), txid.prefix()); - - // Ok(()) - // })?; - - // let txoutindex = Txoutindex::from(txindex); - - // let mut addressindexes = BTreeSet::new(); - - // wtx.range(self.txoutindex_to_amount.data(), Slice::from(txoutindex)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (txoutindex_slice, _) = slice?; - - // wtx.remove(self.txoutindex_to_amount.data(), txoutindex_slice); - - // if let Some(addressindex_slice) = - // wtx.get(self.txoutindex_to_addressindex.data(), txoutindex_slice)? - // { - // wtx.remove(self.txoutindex_to_addressindex.data(), txoutindex_slice); - - // let addressindex = Addressindex::from(addressindex_slice); - // addressindexes.insert(addressindex); - - // let txoutindex = Txoutindex::from(txoutindex_slice); - // let addresstxoutindex = Addresstxoutindex::from((addressindex, txoutindex)); - - // wtx.remove( - // self.addressindex_to_txoutindexes.data(), - // Slice::from(addresstxoutindex), - // ); - // } - - // Ok(()) - // })?; - - // addressindexes - // .into_iter() - // .filter(|addressindex| { - // let is_empty = wtx - // .prefix( - // self.addressindex_to_txoutindexes.data(), - // Slice::from(*addressindex), - // ) - // .next() - // .is_none(); - // is_empty - // }) - // .try_for_each(|addressindex| -> color_eyre::Result<()> { - // let addressindex_slice = Slice::from(addressindex); - - // let addressbytes = Addressbytes::from( - // wtx.get( - // self.addressindex_to_addressbytes.data(), - // &addressindex_slice, - // )? - // .context("addressindex_to_address to have value")?, - // ); - // wtx.remove( - // self.addressbytes_prefix_to_addressindex.data(), - // addressbytes.prefix(), - // ); - // wtx.remove( - // self.addressindex_to_addressbytes.data(), - // &addressindex_slice, - // ); - // wtx.remove(self.addressindex_to_addresstype.data(), &addressindex_slice); - - // Ok(()) - // })?; - // - - // todo!("clear addresstxoutindexes_out") - // todo!("clear addresstxoutindexes_in") - // todo!("clear zero_txoutindexes") - // todo!("clear txindexvout_to_txoutindex") - - // Ok(()) - // } - - pub fn min_height(&self) -> Option { - [ - self.addressbytes_prefix_to_addressindex.height(), - self.blockhash_prefix_to_height.height(), - self.txid_prefix_to_txindex.height(), - ] - .into_iter() - .min() - .flatten() - .cloned() - } - - pub fn export(self, height: Height) -> Result<(), snkrj::Error> { - thread::scope(|scope| { - vec![ - scope.spawn(|| self.addressbytes_prefix_to_addressindex.export(height)), - scope.spawn(|| self.blockhash_prefix_to_height.export(height)), - scope.spawn(|| self.txid_prefix_to_txindex.export(height)), - ] - .into_iter() - .try_for_each(|handle| -> Result<(), snkrj::Error> { handle.join().unwrap() }) - }) - } -} diff --git a/indexer/src/storage/sanakirjas/multi.rs b/indexer/src/storage/sanakirjas/multi.rs deleted file mode 100644 index 4ac8f15b6..000000000 --- a/indexer/src/storage/sanakirjas/multi.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::{array, path::Path, sync::OnceLock}; - -use rayon::prelude::*; -use snkrj::{DatabaseKey, DatabaseMulti, DatabaseValue}; - -use super::{Height, StoreMeta, Version}; - -pub struct StoreMulti -where - K: DatabaseKey, - V: DatabaseValue, -{ - meta: StoreMeta, - pub parts: [OnceLock>>; 256], -} - -impl StoreMulti -where - K: DatabaseKey, - V: DatabaseValue, -{ - pub fn open(path: &Path, version: Version) -> Result { - let meta = StoreMeta::checked_open(path, version)?; - - Ok(Self { - meta, - parts: array::from_fn(|_| OnceLock::new()), - }) - } - - // pub fn len(&self) -> usize { - // self.meta.len() - // } - - fn get_or_init_store(&self, key: &K) -> &DatabaseMulti { - self.get_or_init_store_(key.as_ne_byte() as usize) - } - - fn get_or_init_store_(&self, storeindex: usize) -> &DatabaseMulti { - self.parts[storeindex] - .get_or_init(|| Box::new(DatabaseMulti::open(self.meta.path_parts().join(storeindex.to_string())).unwrap())) - } - - fn get_or_init_mut_store(&mut self, key: &K) -> &mut DatabaseMulti { - self.get_or_init_store(key); - - self.parts - .get_mut(key.as_ne_byte() as usize) - .unwrap() - .get_mut() - .unwrap() - } - - #[allow(unused)] - pub fn open_all(&self) { - (0..=(u8::MAX) as usize).for_each(|storeindex| { - self.get_or_init_store_(storeindex); - }); - } - - #[allow(unused)] - pub fn get(&self, key: &K) -> Result, snkrj::Error> { - self.get_or_init_store(key).get(key) - } - - pub fn insert(&mut self, key: K, value: V) -> Option { - self.meta.len += 1; - self.get_or_init_mut_store(&key).insert(key, value) - } - - pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { - if self.meta.needs(height) { - self.insert(key, value); - } - } - - pub fn export(self, height: Height) -> Result<(), snkrj::Error> { - if self.has(height) { - return Ok(()); - } - - self.meta.export(height)?; - - self.parts.into_par_iter().try_for_each(|s| { - if let Some(db) = s.into_inner() { - db.export() - } else { - Ok(()) - } - }) - } - - pub fn height(&self) -> Option<&Height> { - self.meta.height() - } - #[allow(unused)] - pub fn needs(&self, height: Height) -> bool { - self.meta.needs(height) - } - pub fn has(&self, height: Height) -> bool { - self.meta.has(height) - } -} diff --git a/indexer/src/storage/sanakirjas/unique.rs b/indexer/src/storage/sanakirjas/unique.rs deleted file mode 100644 index 6c958d6d1..000000000 --- a/indexer/src/storage/sanakirjas/unique.rs +++ /dev/null @@ -1,102 +0,0 @@ -use std::{array, path::Path, sync::OnceLock}; - -use rayon::prelude::*; -use snkrj::{DatabaseKey, DatabaseUnique, DatabaseValue}; - -use super::{Height, StoreMeta, Version}; - -pub struct StoreUnique -where - K: DatabaseKey, - V: DatabaseValue, -{ - meta: StoreMeta, - pub parts: [OnceLock>>; 256], -} - -impl StoreUnique -where - K: DatabaseKey, - V: DatabaseValue, -{ - pub fn open(path: &Path, version: Version) -> Result { - let meta = StoreMeta::checked_open(path, version)?; - - Ok(Self { - meta, - parts: array::from_fn(|_| OnceLock::new()), - }) - } - - // pub fn len(&self) -> usize { - // self.meta.len() - // } - - fn get_or_init_store(&self, key: &K) -> &DatabaseUnique { - self.get_or_init_store_(key.as_ne_byte() as usize) - } - - fn get_or_init_store_(&self, storeindex: usize) -> &DatabaseUnique { - self.parts[storeindex].get_or_init(|| { - Box::new(DatabaseUnique::open(self.meta.path_parts().join(storeindex.to_string())).unwrap()) - }) - } - - fn get_or_init_mut_store(&mut self, key: &K) -> &mut DatabaseUnique { - self.get_or_init_store(key); - - self.parts - .get_mut(key.as_ne_byte() as usize) - .unwrap() - .get_mut() - .unwrap() - } - - #[allow(unused)] - pub fn open_all(&self) { - (0..=(u8::MAX) as usize).for_each(|storeindex| { - self.get_or_init_store_(storeindex); - }); - } - - pub fn get(&self, key: &K) -> Result, snkrj::Error> { - self.get_or_init_store(key).get(key) - } - - pub fn insert(&mut self, key: K, value: V) -> Option { - self.meta.len += 1; - self.get_or_init_mut_store(&key).insert(key, value) - } - - pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { - if self.meta.needs(height) { - self.insert(key, value); - } - } - - pub fn export(self, height: Height) -> Result<(), snkrj::Error> { - if self.has(height) { - return Ok(()); - } - - self.meta.export(height)?; - - self.parts.into_par_iter().try_for_each(|s| { - if let Some(db) = s.into_inner() { - db.export() - } else { - Ok(()) - } - }) - } - - pub fn height(&self) -> Option<&Height> { - self.meta.height() - } - pub fn needs(&self, height: Height) -> bool { - self.meta.needs(height) - } - pub fn has(&self, height: Height) -> bool { - self.meta.has(height) - } -} diff --git a/indexer/src/storage/storable_vecs/base.rs b/indexer/src/storage/storable_vecs/base.rs index c05d21c5a..0f4f95ca9 100644 --- a/indexer/src/storage/storable_vecs/base.rs +++ b/indexer/src/storage/storable_vecs/base.rs @@ -34,6 +34,11 @@ where self.vec.flush() } + pub fn truncate_if_needed(&mut self, index: I, height: Height) -> storable_vec::Result> { + height.write(&self.path_height())?; + self.vec.truncate_if_needed(index) + } + pub fn height(&self) -> storable_vec::Result { Height::try_from(self.path_height().as_path()) } diff --git a/indexer/src/storage/storable_vecs/mod.rs b/indexer/src/storage/storable_vecs/mod.rs index 3e4c2c39a..7b063f867 100644 --- a/indexer/src/storage/storable_vecs/mod.rs +++ b/indexer/src/storage/storable_vecs/mod.rs @@ -1,4 +1,4 @@ -use std::{fs, io, path::Path}; +use std::{collections::BTreeMap, fs, io, path::Path}; use exit::Exit; use rayon::prelude::*; @@ -181,117 +181,114 @@ impl StorableVecs { } #[allow(unused)] - pub fn rollback_from(&mut self, _height: Height, _exit: &Exit) -> color_eyre::Result<()> { - panic!(); - // let mut txindex = None; + pub fn rollback_from(&mut self, height: Height, exit: &Exit) -> storable_vec::Result<()> { + let prev_height = height.decremented(); - // wtx.range(self.height_to_blockhash.data(), Slice::from(height)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (height_slice, slice_blockhash) = slice?; - // let blockhash = BlockHash::from_slice(&slice_blockhash)?; + let mut truncated_indexes: BTreeMap> = BTreeMap::new(); - // wtx.remove(self.height_to_blockhash.data(), height_slice); + let addressindex = self + .height_to_first_addressindex + .truncate_if_needed(height, prev_height)?; + let txindex = self.height_to_first_txindex.truncate_if_needed(height, prev_height)?; + let txinindex = self.height_to_first_txinindex.truncate_if_needed(height, prev_height)?; + let txoutindex = self + .height_to_first_txoutindex + .truncate_if_needed(height, prev_height)?; + let p2pk33index = self + .height_to_first_p2pk33index + .truncate_if_needed(height, prev_height)?; + let p2pk65index = self + .height_to_first_p2pk65index + .truncate_if_needed(height, prev_height)?; + let p2pkhindex = self + .height_to_first_p2pkhindex + .truncate_if_needed(height, prev_height)?; + let p2shindex = self.height_to_first_p2shindex.truncate_if_needed(height, prev_height)?; + let p2trindex = self.height_to_first_p2trindex.truncate_if_needed(height, prev_height)?; + let p2wpkhindex = self + .height_to_first_p2wpkhindex + .truncate_if_needed(height, prev_height)?; + let p2wshindex = self + .height_to_first_p2wshindex + .truncate_if_needed(height, prev_height)?; - // wtx.remove(self.blockhash_prefix_to_height.data(), blockhash.prefix()); + self.height_to_blockhash.truncate_if_needed(height, prev_height)?; + self.height_to_difficulty.truncate_if_needed(height, prev_height)?; + self.height_to_first_emptyindex + .truncate_if_needed(height, prev_height)?; + self.height_to_first_multisigindex + .truncate_if_needed(height, prev_height)?; + self.height_to_first_opreturnindex + .truncate_if_needed(height, prev_height)?; + self.height_to_first_pushonlyindex + .truncate_if_needed(height, prev_height)?; + self.height_to_first_unknownindex + .truncate_if_needed(height, prev_height)?; + self.height_to_size.truncate_if_needed(height, prev_height)?; + self.height_to_timestamp.truncate_if_needed(height, prev_height)?; + self.height_to_weight.truncate_if_needed(height, prev_height)?; - // if txindex.is_none() { - // txindex.replace( - // wtx.get(self.height_to_first_txindex.data(), height_slice)? - // .context("for height to have first txindex")?, - // ); - // } - // wtx.remove(self.height_to_first_txindex.data(), height_slice); - // wtx.remove(self.height_to_last_txindex.data(), height_slice); + if let Some(addressindex) = addressindex { + self.addressindex_to_addresstype + .truncate_if_needed(addressindex, prev_height)?; + self.addressindex_to_addresstypeindex + .truncate_if_needed(addressindex, prev_height)?; + self.addressindex_to_height + .truncate_if_needed(addressindex, prev_height)?; + } - // Ok(()) - // })?; + if let Some(p2pk33index) = p2pk33index { + self.p2pk33index_to_p2pk33addressbytes + .truncate_if_needed(p2pk33index, prev_height)?; + } + if let Some(p2pk65index) = p2pk65index { + self.p2pk65index_to_p2pk65addressbytes + .truncate_if_needed(p2pk65index, prev_height)?; + } + if let Some(p2pkhindex) = p2pkhindex { + self.p2pkhindex_to_p2pkhaddressbytes + .truncate_if_needed(p2pkhindex, prev_height)?; + } + if let Some(p2shindex) = p2shindex { + self.p2shindex_to_p2shaddressbytes + .truncate_if_needed(p2shindex, prev_height)?; + } + if let Some(p2trindex) = p2trindex { + self.p2trindex_to_p2traddressbytes + .truncate_if_needed(p2trindex, prev_height)?; + } + if let Some(p2wpkhindex) = p2wpkhindex { + self.p2wpkhindex_to_p2wpkhaddressbytes + .truncate_if_needed(p2wpkhindex, prev_height)?; + } + if let Some(p2wshindex) = p2wshindex { + self.p2wshindex_to_p2wshaddressbytes + .truncate_if_needed(p2wshindex, prev_height); + } - // let txindex = txindex.context("txindex to not be none by now")?; + if let Some(txindex) = txindex { + self.txindex_to_first_txinindex + .truncate_if_needed(txindex, prev_height)?; + self.txindex_to_first_txoutindex + .truncate_if_needed(txindex, prev_height)?; + self.txindex_to_height.truncate_if_needed(txindex, prev_height)?; + self.txindex_to_locktime.truncate_if_needed(txindex, prev_height)?; + self.txindex_to_txid.truncate_if_needed(txindex, prev_height)?; + self.txindex_to_txversion.truncate_if_needed(txindex, prev_height)?; + } - // wtx.range(self.txindex_to_txid.data(), Slice::from(txindex)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (slice_txindex, slice_txid) = slice?; - // let txindex = Txindex::from(slice_txindex); - // let txid = Txid::from_slice(&slice_txid)?; + if let Some(txinindex) = txinindex { + self.txinindex_to_txoutindex + .truncate_if_needed(txinindex, prev_height)?; + } - // wtx.remove(self.txindex_to_txid.data(), Slice::from(txindex)); - // wtx.remove(self.txindex_to_height.data(), Slice::from(txindex)); - // wtx.remove(self.txid_prefix_to_txindex.data(), txid.prefix()); + if let Some(txoutindex) = txoutindex { + self.txoutindex_to_addressindex + .truncate_if_needed(txoutindex, prev_height)?; + self.txoutindex_to_value.truncate_if_needed(txoutindex, prev_height)?; + } - // Ok(()) - // })?; - - // let txoutindex = Txoutindex::from(txindex); - - // let mut addressindexes = BTreeSet::new(); - - // wtx.range(self.txoutindex_to_amount.data(), Slice::from(txoutindex)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (txoutindex_slice, _) = slice?; - - // wtx.remove(self.txoutindex_to_amount.data(), txoutindex_slice); - - // if let Some(addressindex_slice) = - // wtx.get(self.txoutindex_to_addressindex.data(), txoutindex_slice)? - // { - // wtx.remove(self.txoutindex_to_addressindex.data(), txoutindex_slice); - - // let addressindex = Addressindex::from(addressindex_slice); - // addressindexes.insert(addressindex); - - // let txoutindex = Txoutindex::from(txoutindex_slice); - // let addresstxoutindex = Addresstxoutindex::from((addressindex, txoutindex)); - - // wtx.remove( - // self.addressindex_to_txoutindexes.data(), - // Slice::from(addresstxoutindex), - // ); - // } - - // Ok(()) - // })?; - - // addressindexes - // .into_iter() - // .filter(|addressindex| { - // let is_empty = wtx - // .prefix( - // self.addressindex_to_txoutindexes.data(), - // Slice::from(*addressindex), - // ) - // .next() - // .is_none(); - // is_empty - // }) - // .try_for_each(|addressindex| -> color_eyre::Result<()> { - // let addressindex_slice = Slice::from(addressindex); - - // let addressbytes = Addressbytes::from( - // wtx.get( - // self.addressindex_to_addressbytes.data(), - // &addressindex_slice, - // )? - // .context("addressindex_to_address to have value")?, - // ); - // wtx.remove( - // self.addressbytes_prefix_to_addressindex.data(), - // addressbytes.prefix(), - // ); - // wtx.remove( - // self.addressindex_to_addressbytes.data(), - // &addressindex_slice, - // ); - // wtx.remove(self.addressindex_to_addresstype.data(), &addressindex_slice); - - // Ok(()) - // })?; - // - - // todo!("clear addresstxoutindexes_out") - // todo!("clear addresstxoutindexes_in") - // todo!("clear zero_txoutindexes") - - // Ok(()) + Ok(()) } pub fn flush(&mut self, height: Height) -> io::Result<()> { diff --git a/indexer/src/structs/height.rs b/indexer/src/structs/height.rs index e09fe4d11..daaa11c15 100644 --- a/indexer/src/structs/height.rs +++ b/indexer/src/structs/height.rs @@ -33,6 +33,10 @@ impl Height { pub fn write(&self, path: &Path) -> Result<(), io::Error> { fs::write(path, self.as_bytes()) } + + pub fn decremented(&self) -> Self { + Self(self.0.checked_sub(1).unwrap_or_default()) + } } impl PartialEq for Height { diff --git a/pricer/src/lib.rs b/pricer/src/lib.rs index 6d5f0f617..60a2fcc63 100644 --- a/pricer/src/lib.rs +++ b/pricer/src/lib.rs @@ -120,18 +120,19 @@ impl Pricer { } fn get_date_ohlc(&mut self, date: Date) -> color_eyre::Result { - if self.ohlc.date.is_key_safe(date) { - Ok(self.ohlc.date.get_or_import(&date).unwrap().to_owned()) - } else { - let ohlc = self - .get_from_daily_kraken(&date) - .or_else(|_| self.get_from_daily_binance(&date)) - .or_else(|_| self.get_from_date_kibo(&date))?; + todo!(); + // if self.ohlc.date.is_key_safe(date) { + // Ok(self.ohlc.date.get_or_import(&date).unwrap().to_owned()) + // } else { + // let ohlc = self + // .get_from_daily_kraken(&date) + // .or_else(|_| self.get_from_daily_binance(&date)) + // .or_else(|_| self.get_from_date_kibo(&date))?; - self.ohlc.date.insert(date, ohlc); + // self.ohlc.date.insert(date, ohlc); - Ok(ohlc) - } + // Ok(ohlc) + // } } fn get_height_ohlc( @@ -140,51 +141,53 @@ impl Pricer { timestamp: Timestamp, previous_timestamp: Option, ) -> color_eyre::Result { - if let Some(ohlc) = self.ohlc.height.get_or_import(&height) { - return Ok(ohlc); - } + todo!(); - let timestamp = timestamp.to_floored_seconds(); + // if let Some(ohlc) = self.ohlc.height.get_or_import(&height) { + // return Ok(ohlc); + // } - if previous_timestamp.is_none() && !height.is_first() { - panic!("Shouldn't be possible"); - } + // let timestamp = timestamp.to_floored_seconds(); - let previous_timestamp = previous_timestamp.map(|t| t.to_floored_seconds()); + // if previous_timestamp.is_none() && !height.is_first() { + // panic!("Shouldn't be possible"); + // } - let ohlc = self - .get_from_1mn_kraken(timestamp, previous_timestamp) - .unwrap_or_else(|_| { - self.get_from_1mn_binance(timestamp, previous_timestamp) - .unwrap_or_else(|_| { - self.get_from_har_binance(timestamp, previous_timestamp, config) - .unwrap_or_else(|_| { - self.get_from_height_kibo(&height).unwrap_or_else(|_| { - let date = timestamp.to_date(); + // let previous_timestamp = previous_timestamp.map(|t| t.to_floored_seconds()); - panic!( - "Can't find the price for: height: {height} - date: {date} -1mn APIs are limited to the last 16 hours for Binance's and the last 10 hours for Kraken's -How to fix this: -1. Go to https://www.binance.com/en/trade/BTC_USDT?type=spot -2. Select 1mn interval -3. Open the inspector/dev tools -4. Go to the Network Tab -5. Filter URLs by 'uiKlines' -6. Go back to the chart and scroll until you pass the date mentioned few lines ago -7. Go back to the dev tools -8. Export to a har file (if there is no explicit button, click on the cog button) -9. Move the file to 'parser/imports/binance.har' -" - ) - }) - }) - }) - }); + // let ohlc = self + // .get_from_1mn_kraken(timestamp, previous_timestamp) + // .unwrap_or_else(|_| { + // self.get_from_1mn_binance(timestamp, previous_timestamp) + // .unwrap_or_else(|_| { + // self.get_from_har_binance(timestamp, previous_timestamp, config) + // .unwrap_or_else(|_| { + // self.get_from_height_kibo(&height).unwrap_or_else(|_| { + // let date = timestamp.to_date(); - // self.ohlc.height.insert(height, ohlc); + // panic!( + // "Can't find the price for: height: {height} - date: {date} + // 1mn APIs are limited to the last 16 hours for Binance's and the last 10 hours for Kraken's + // How to fix this: + // 1. Go to https://www.binance.com/en/trade/BTC_USDT?type=spot + // 2. Select 1mn interval + // 3. Open the inspector/dev tools + // 4. Go to the Network Tab + // 5. Filter URLs by 'uiKlines' + // 6. Go back to the chart and scroll until you pass the date mentioned few lines ago + // 7. Go back to the dev tools + // 8. Export to a har file (if there is no explicit button, click on the cog button) + // 9. Move the file to 'parser/imports/binance.har' + // " + // ) + // }) + // }) + // }) + // }); - Ok(ohlc) + // // self.ohlc.height.insert(height, ohlc); + + // Ok(ohlc) } fn find_height_ohlc( diff --git a/pricer/src/main.rs b/pricer/src/main.rs index dcfafe61a..9efd33647 100644 --- a/pricer/src/main.rs +++ b/pricer/src/main.rs @@ -8,12 +8,12 @@ fn main() -> color_eyre::Result<()> { logger::init_log(None); - dbg!(Binance::fetch_1d_prices()?); + // dbg!(Binance::fetch_1d_prices()?); // dbg!(Binance::fetch_1mn_prices()); - dbg!(Kraken::fetch_1d()?); + // dbg!(Kraken::fetch_1d()?); // dbg!(Kraken::fetch_1mn_prices()?); - dbg!(Kibo::fetch_date_prices(2025)?); - dbg!(Kibo::fetch_height_prices(Height::from(880_000_u32))?); + // dbg!(Kibo::fetch_date_prices(2025)?); + // dbg!(Kibo::fetch_height_prices(Height::from(880_000_u32))?); Ok(()) } diff --git a/storable_vec/src/lib.rs b/storable_vec/src/lib.rs index 790ed1082..40c0eb220 100644 --- a/storable_vec/src/lib.rs +++ b/storable_vec/src/lib.rs @@ -282,6 +282,23 @@ where Ok(()) } + pub fn truncate_if_needed(&mut self, index: I) -> Result> { + let index = Self::i_to_usize(index)?; + + if index >= self.file_len { + return Ok(None); + } + + let value_at_index = self.open_file_at_then_read(index).ok(); + + self.file + .set_len(Self::index_to_byte_index(index.checked_sub(1).unwrap_or_default()))?; + + self.reset_disk_related_state()?; + + Ok(value_at_index) + } + #[inline] fn i_to_usize(index: I) -> Result { index.try_into().map_err(|_| Error::FailedKeyTryIntoUsize)