From dc2e847f587f1c97526760427d896de66676c48e Mon Sep 17 00:00:00 2001 From: nym21 Date: Sun, 9 Nov 2025 11:25:13 +0100 Subject: [PATCH] global: snapshot --- Cargo.lock | 1 + crates/brk_computer/examples/pools.rs | 2 +- crates/brk_computer/src/fetched.rs | 2 +- .../src/grouped/value_from_txindex.rs | 5 +- crates/brk_computer/src/indexes.rs | 4 +- crates/brk_error/Cargo.toml | 1 + crates/brk_error/src/lib.rs | 9 + crates/brk_grouper/src/by_address_type.rs | 8 +- crates/brk_indexer/README.md | 8 +- crates/brk_indexer/src/lib.rs | 96 +++--- crates/brk_indexer/src/stores_v2.rs | 286 ++++++++++++------ crates/brk_indexer/src/stores_v3.rs | 52 ++-- crates/brk_mcp/src/lib.rs | 19 +- crates/brk_query/src/async.rs | 140 ++++++++- crates/brk_query/src/chain/addresses.rs | 89 +++--- crates/brk_query/src/chain/transactions.rs | 18 +- crates/brk_query/src/lib.rs | 8 +- crates/brk_query/src/vecs.rs | 4 +- crates/brk_server/src/api/addresses/mod.rs | 7 +- crates/brk_server/src/api/metrics/mod.rs | 14 +- crates/brk_server/src/api/transactions/mod.rs | 7 +- crates/brk_server/src/extended/result.rs | 17 +- crates/brk_store/src/fjall_v2/mod.rs | 16 +- crates/brk_store/src/fjall_v3/mod.rs | 2 +- crates/brk_types/src/addressbytes.rs | 18 +- .../{addressbyteshash.rs => addresshash.rs} | 14 +- crates/brk_types/src/addressindexoutpoint.rs | 2 +- .../src/addresstypeaddressindexoutpoint.rs | 73 ----- .../src/addresstypeaddressindextxindex.rs | 88 ------ crates/brk_types/src/lib.rs | 8 +- 30 files changed, 521 insertions(+), 497 deletions(-) rename crates/brk_types/src/{addressbyteshash.rs => addresshash.rs} (69%) delete mode 100644 crates/brk_types/src/addresstypeaddressindexoutpoint.rs delete mode 100644 crates/brk_types/src/addresstypeaddressindextxindex.rs diff --git a/Cargo.lock b/Cargo.lock index c96f1c23e..5de293fde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -679,6 +679,7 @@ dependencies = [ "jiff", "minreq", "sonic-rs", + "tokio", "vecdb", "zerocopy", ] diff --git a/crates/brk_computer/examples/pools.rs b/crates/brk_computer/examples/pools.rs index bda6b719d..46a77b9e0 100644 --- a/crates/brk_computer/examples/pools.rs +++ b/crates/brk_computer/examples/pools.rs @@ -5,7 +5,7 @@ use brk_error::Result; use brk_fetcher::Fetcher; use brk_indexer::Indexer; use brk_types::{Address, AddressBytes, OutputType, TxOutIndex, pools}; -use vecdb::{AnyIterableVec, Exit, VecIterator, VecIteratorExtended}; +use vecdb::{AnyIterableVec, Exit, VecIteratorExtended}; fn main() -> Result<()> { brk_logger::init(Some(Path::new(".log")))?; diff --git a/crates/brk_computer/src/fetched.rs b/crates/brk_computer/src/fetched.rs index ec087788c..ac9fa8ff5 100644 --- a/crates/brk_computer/src/fetched.rs +++ b/crates/brk_computer/src/fetched.rs @@ -81,7 +81,7 @@ impl Vecs { .enumerate() .try_for_each(|(i, v)| -> Result<()> { self.height_to_price_ohlc_in_cents.forced_push_at( - i.into(), + i, self.fetcher .get_height( i.into(), diff --git a/crates/brk_computer/src/grouped/value_from_txindex.rs b/crates/brk_computer/src/grouped/value_from_txindex.rs index 96c4791cc..0038e2b43 100644 --- a/crates/brk_computer/src/grouped/value_from_txindex.rs +++ b/crates/brk_computer/src/grouped/value_from_txindex.rs @@ -57,10 +57,7 @@ impl ComputedValueVecsFromTxindex { &name_btc, version + VERSION, source_vec.map_or_else(|| sats.txindex.as_ref().unwrap().boxed_clone(), |s| s), - |txindex: TxIndex, iter| { - iter.get_at(txindex.to_usize()) - .map(|sats| Bitcoin::from(sats)) - }, + |txindex: TxIndex, iter| iter.get_at(txindex.to_usize()).map(Bitcoin::from), ); let bitcoin = ComputedVecsFromTxindex::forced_import( diff --git a/crates/brk_computer/src/indexes.rs b/crates/brk_computer/src/indexes.rs index 1a3cdb7e8..333278ed2 100644 --- a/crates/brk_computer/src/indexes.rs +++ b/crates/brk_computer/src/indexes.rs @@ -163,7 +163,7 @@ impl Vecs { let start = usize::from(start); let end = txindex_to_first_txinindex_iter .get_at(txindex + 1) - .map(|v| usize::from(v)) + .map(usize::from) .unwrap_or_else(|| txinindex_to_txoutindex_iter.len()); StoredU64::from((start..end).count()) }) @@ -183,7 +183,7 @@ impl Vecs { let start = usize::from(start); let end = txindex_to_first_txoutindex_iter .get_at(txindex + 1) - .map(|v| usize::from(v)) + .map(usize::from) .unwrap_or_else(|| txoutindex_to_value_iter.len()); StoredU64::from((start..end).count()) }) diff --git a/crates/brk_error/Cargo.toml b/crates/brk_error/Cargo.toml index 64ff8b853..461bf09be 100644 --- a/crates/brk_error/Cargo.toml +++ b/crates/brk_error/Cargo.toml @@ -17,5 +17,6 @@ fjall3 = { workspace = true } jiff = { workspace = true } minreq = { workspace = true } sonic-rs = { workspace = true } +tokio = { workspace = true } vecdb = { workspace = true } zerocopy = { workspace = true } diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index 22c620e89..03402d2c8 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -24,6 +24,7 @@ pub enum Error { BitcoinFromScriptError(bitcoin::address::FromScriptError), BitcoinHexToArrayError(bitcoin::hex::HexToArrayError), SonicRS(sonic_rs::Error), + TokioJoin(tokio::task::JoinError), ZeroCopyError, Vecs(vecdb::Error), @@ -92,6 +93,13 @@ impl From for Error { } } +impl From for Error { + #[inline] + fn from(error: tokio::task::JoinError) -> Self { + Self::TokioJoin(error) + } +} + impl From for Error { #[inline] fn from(value: io::Error) -> Self { @@ -186,6 +194,7 @@ impl fmt::Display for Error { Error::RawDB(error) => Display::fmt(&error, f), Error::SonicRS(error) => Display::fmt(&error, f), Error::SystemTimeError(error) => Display::fmt(&error, f), + Error::TokioJoin(error) => Display::fmt(&error, f), Error::VecDB(error) => Display::fmt(&error, f), Error::Vecs(error) => Display::fmt(&error, f), Error::ZeroCopyError => write!(f, "ZeroCopy error"), diff --git a/crates/brk_grouper/src/by_address_type.rs b/crates/brk_grouper/src/by_address_type.rs index 726a8f82d..adb84c1dc 100644 --- a/crates/brk_grouper/src/by_address_type.rs +++ b/crates/brk_grouper/src/by_address_type.rs @@ -63,8 +63,8 @@ impl ByAddressType { } #[inline] - pub fn get_unwrap(&self, address_type: OutputType) -> &T { - self.get(address_type).unwrap() + pub fn get_unwrap(&self, addresstype: OutputType) -> &T { + self.get(addresstype).unwrap() } #[inline] @@ -83,8 +83,8 @@ impl ByAddressType { } #[inline] - pub fn get_mut_unwrap(&mut self, address_type: OutputType) -> &mut T { - self.get_mut(address_type).unwrap() + pub fn get_mut_unwrap(&mut self, addresstype: OutputType) -> &mut T { + self.get_mut(addresstype).unwrap() } #[inline] diff --git a/crates/brk_indexer/README.md b/crates/brk_indexer/README.md index ed4c3e4fb..a94a3a849 100644 --- a/crates/brk_indexer/README.md +++ b/crates/brk_indexer/README.md @@ -88,7 +88,7 @@ Main indexing function processing blocks from parser with collision detection. **Key-Value Stores:** -- `addressbyteshash_to_typeindex`: Address hash to internal index mapping +- `addresshash_to_typeindex`: Address hash to internal index mapping - `blockhashprefix_to_height`: Block hash prefix to height lookup - `txidprefix_to_txindex`: Transaction ID prefix to internal index - `addresstype_to_typeindex_with_txoutindex`: Address type to output mappings @@ -137,7 +137,7 @@ println!("Total addresses: {}", final_indexes.total_address_count()); ```rust use brk_indexer::Indexer; -use brk_types::{Height, TxidPrefix, AddressBytesHash}; +use brk_types::{Height, TxidPrefix, AddressHash}; let indexer = Indexer::forced_import("./blockchain_index")?; @@ -154,8 +154,8 @@ if let Some(tx_index) = indexer.stores.txidprefix_to_txindex.get(&txid_prefix)? } // Query address information -let address_hash = AddressBytesHash::from(/* address bytes */); -if let Some(type_index) = indexer.stores.addressbyteshash_to_typeindex.get(&address_hash)? { +let address_hash = AddressHash::from(/* address bytes */); +if let Some(type_index) = indexer.stores.addresshash_to_typeindex.get(&address_hash)? { println!("Address type index: {}", type_index); } ``` diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 93788699a..fd5ce7696 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -8,10 +8,9 @@ use brk_iterator::Blocks; use brk_rpc::Client; use brk_store::AnyStore; use brk_types::{ - AddressBytes, AddressBytesHash, AddressTypeAddressIndexOutPoint, - AddressTypeAddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, OutputType, Sats, - StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, Txid, TxidPrefix, TypeIndex, Unit, - Version, Vin, Vout, + AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, + OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, Txid, + TxidPrefix, TypeIndex, Unit, Version, Vin, Vout, }; use log::{error, info}; use rayon::prelude::*; @@ -333,7 +332,7 @@ impl Indexer { TxIndex, Vout, OutputType, - Option<(AddressBytes, AddressBytesHash)>, + Option<(AddressBytes, AddressHash)>, Option, )> { let txindex = indexes.txindex + block_txindex; @@ -349,18 +348,21 @@ impl Indexer { return Ok(tuple); } - let address_bytes = AddressBytes::try_from((script, outputtype)).unwrap(); + let addresstype = outputtype; - let address_hash = AddressBytesHash::from(&address_bytes); + let address_bytes = AddressBytes::try_from((script, addresstype)).unwrap(); + + let address_hash = AddressHash::from(&address_bytes); let typeindex_opt = stores - .addressbyteshash_to_typeindex + .addresstype_to_addresshash_to_addressindex + .get_unwrap(addresstype) .get(&address_hash) .unwrap() .map(|v| *v) // Checking if not in the future (in case we started before the last processed block) .and_then(|typeindex_local| { - (typeindex_local < indexes.to_typeindex(outputtype)) + (typeindex_local < indexes.to_typeindex(addresstype)) .then_some(typeindex_local) }); @@ -370,7 +372,7 @@ impl Indexer { if check_collisions && let Some(typeindex) = typeindex_opt { // unreachable!(); - let prev_addressbytes_opt = match outputtype { + let prev_addressbytes_opt = match addresstype { OutputType::P2PK65 => vecs .p2pk65addressindex_to_p2pk65bytes .get_pushed_or_read( @@ -437,7 +439,10 @@ impl Indexer { let address_bytes = &tuple.5.as_ref().unwrap().0; - if stores.addressbyteshash_to_typeindex.needs(height) + if stores + .addresstype_to_addresshash_to_addressindex + .get_unwrap(addresstype) + .needs(height) && prev_addressbytes != address_bytes { let txid = tx.compute_txid(); @@ -446,14 +451,14 @@ impl Indexer { txid, vout, block_txindex, - outputtype, + addresstype, prev_addressbytes, address_bytes, &indexes, typeindex, typeindex, txout, - AddressBytesHash::from(address_bytes), + AddressHash::from(address_bytes), ); panic!() } @@ -470,7 +475,7 @@ impl Indexer { let tx_len = block.txdata.len(); // let i = Instant::now(); - let mut already_added_addressbyteshash: FxHashMap = + let mut already_added_addresshash: FxHashMap = FxHashMap::default(); let mut same_block_output_info: FxHashMap = FxHashMap::default(); @@ -495,47 +500,27 @@ impl Indexer { let typeindex = if let Some(ti) = typeindex_opt { ti } else if let Some((address_bytes, address_hash)) = addressbytes_opt { - if let Some(&ti) = already_added_addressbyteshash.get(&address_hash) { + let addresstype = outputtype; + if let Some(&ti) = already_added_addresshash.get(&address_hash) { ti } else { - let ti = match outputtype { + let ti = match addresstype { OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(), OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(), OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(), - OutputType::P2MS => { - vecs.p2msoutputindex_to_txindex - .push_if_needed(indexes.p2msoutputindex, txindex)?; - indexes.p2msoutputindex.copy_then_increment() - } OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(), - OutputType::OpReturn => { - vecs.opreturnindex_to_txindex - .push_if_needed(indexes.opreturnindex, txindex)?; - indexes.opreturnindex.copy_then_increment() - } OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(), OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(), OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(), OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(), - OutputType::Empty => { - vecs.emptyoutputindex_to_txindex - .push_if_needed(indexes.emptyoutputindex, txindex)?; - indexes.emptyoutputindex.copy_then_increment() - } - OutputType::Unknown => { - vecs.unknownoutputindex_to_txindex - .push_if_needed(indexes.unknownoutputindex, txindex)?; - indexes.unknownoutputindex.copy_then_increment() - } _ => unreachable!(), }; - already_added_addressbyteshash.insert(address_hash, ti); - stores.addressbyteshash_to_typeindex.insert_if_needed( - address_hash, - ti, - height, - ); + already_added_addresshash.insert(address_hash, ti); + stores + .addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(addresstype) + .insert_if_needed(address_hash, ti, height); vecs.push_bytes_if_needed(ti, address_bytes)?; ti @@ -577,12 +562,9 @@ impl Indexer { stores .addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) .insert_if_needed( - AddressTypeAddressIndexTxIndex::from(( - addresstype, - addressindex, - txindex, - )), + AddressIndexTxIndex::from((addressindex, txindex)), Unit, height, ); @@ -598,12 +580,9 @@ impl Indexer { stores .addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) .insert_if_needed( - AddressTypeAddressIndexOutPoint::from(( - addresstype, - addressindex, - outpoint, - )), + AddressIndexOutPoint::from((addressindex, outpoint)), Unit, height, ); @@ -651,22 +630,17 @@ impl Indexer { stores .addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) .insert_if_needed( - AddressTypeAddressIndexTxIndex::from((addresstype, addressindex, txindex)), + AddressIndexTxIndex::from((addressindex, txindex)), Unit, height, ); stores .addresstype_to_addressindex_and_unspentoutpoint - .remove_if_needed( - AddressTypeAddressIndexOutPoint::from(( - addresstype, - addressindex, - outpoint, - )), - height, - ); + .get_mut_unwrap(addresstype) + .remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height); } // println!("txins.into_iter(): {:?}", i.elapsed()); diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index db987677b..e52035321 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -1,13 +1,14 @@ use std::{fs, path::Path}; use brk_error::Result; +use brk_grouper::ByAddressType; use brk_store::{AnyStore, Mode, StoreFjallV2 as Store, Type}; use brk_types::{ - AddressBytes, AddressBytesHash, AddressTypeAddressIndexOutPoint, - AddressTypeAddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, - TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout, + AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height, + OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, + Vout, }; -use fjall2::{PersistMode, TransactionalKeyspace}; +use fjall2::{CompressionType as Compression, PersistMode, TransactionalKeyspace}; use rayon::prelude::*; use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended}; @@ -19,13 +20,17 @@ use super::Vecs; pub struct Stores { pub keyspace: TransactionalKeyspace, - pub addressbyteshash_to_typeindex: Store, + // pub addresshash_to_typeindex: Store, + pub addresstype_to_addresshash_to_addressindex: ByAddressType>, + // pub addresstype_to_addressindex_and_txindex: Store, + pub addresstype_to_addressindex_and_txindex: ByAddressType>, + // pub addresstype_to_addressindex_and_unspentoutpoint: + // Store, + pub addresstype_to_addressindex_and_unspentoutpoint: + ByAddressType>, pub blockhashprefix_to_height: Store, pub height_to_coinbase_tag: Store, pub txidprefix_to_txindex: Store, - pub addresstype_to_addressindex_and_txindex: Store, - pub addresstype_to_addressindex_and_unspentoutpoint: - Store, } impl Stores { @@ -45,6 +50,39 @@ impl Stores { let keyspace_ref = &keyspace; + let create_addresshash_to_addressindex_store = |index| { + Store::import( + keyspace_ref, + path, + &format!("h2i{}", index), + version, + Mode::UniquePushOnly(Type::Random), + Compression::Lz4, + ) + }; + + let create_addressindex_to_txindex_store = |index| { + Store::import( + keyspace_ref, + path, + &format!("a2t{}", index), + version, + Mode::VecLike, + Compression::Lz4, + ) + }; + + let create_addressindex_to_unspentoutpoint_store = |index| { + Store::import( + keyspace_ref, + path, + &format!("a2u{}", index), + version, + Mode::VecLike, + Compression::Lz4, + ) + }; + Ok(Self { keyspace: keyspace.clone(), @@ -54,13 +92,18 @@ impl Stores { "h2c", version, Mode::UniquePushOnly(Type::Sequential), + Compression::Lz4, )?, - addressbyteshash_to_typeindex: Store::import( - keyspace_ref, - path, - "a2t", - version, - Mode::UniquePushOnly(Type::Random), + // addresshash_to_typeindex: Store::import( + // keyspace_ref, + // path, + // "a2t", + // version, + // Mode::UniquePushOnly(Type::Random), + // Compression::Lz4, + // )?, + addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index( + create_addresshash_to_addressindex_store, )?, blockhashprefix_to_height: Store::import( keyspace_ref, @@ -68,6 +111,7 @@ impl Stores { "b2h", version, Mode::UniquePushOnly(Type::Random), + Compression::Lz4, )?, txidprefix_to_txindex: Store::import( keyspace_ref, @@ -75,20 +119,29 @@ impl Stores { "t2t", version, Mode::UniquePushOnly(Type::Random), + Compression::Lz4, )?, - addresstype_to_addressindex_and_txindex: Store::import( - keyspace_ref, - path, - "aat", - version, - Mode::VecLike, + // addresstype_to_addressindex_and_txindex: Store::import( + // keyspace_ref, + // path, + // "aat", + // version, + // Mode::VecLike, + // Compression::Lz4, + // )?, + addresstype_to_addressindex_and_txindex: ByAddressType::new_with_index( + create_addressindex_to_txindex_store, )?, - addresstype_to_addressindex_and_unspentoutpoint: Store::import( - keyspace_ref, - path, - "aau", - version, - Mode::VecLike, + // addresstype_to_addressindex_and_unspentoutpoint: Store::import( + // keyspace_ref, + // path, + // "aau", + // version, + // Mode::VecLike, + // Compression::Lz4, + // )?, + addresstype_to_addressindex_and_unspentoutpoint: ByAddressType::new_with_index( + create_addressindex_to_unspentoutpoint_store, )?, }) } @@ -106,14 +159,29 @@ impl Stores { pub fn commit(&mut self, height: Height) -> Result<()> { [ - &mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore, - &mut self.blockhashprefix_to_height, + &mut self.blockhashprefix_to_height as &mut dyn AnyStore, &mut self.height_to_coinbase_tag, &mut self.txidprefix_to_txindex, - &mut self.addresstype_to_addressindex_and_txindex, - &mut self.addresstype_to_addressindex_and_unspentoutpoint, + // &mut self.addresshash_to_typeindex + // &mut self.addresstype_to_addressindex_and_txindex, + // &mut self.addresstype_to_addressindex_and_unspentoutpoint, ] - .into_par_iter() // Changed from par_iter_mut() + .into_par_iter() + .chain( + self.addresstype_to_addresshash_to_addressindex + .par_iter_mut() + .map(|s| s as &mut dyn AnyStore), + ) + .chain( + self.addresstype_to_addressindex_and_txindex + .par_iter_mut() + .map(|s| s as &mut dyn AnyStore), + ) + .chain( + self.addresstype_to_addressindex_and_unspentoutpoint + .par_iter_mut() + .map(|s| s as &mut dyn AnyStore), + ) // Changed from par_iter_mut() .try_for_each(|store| store.commit(height))?; self.keyspace @@ -123,14 +191,29 @@ impl Stores { fn iter_any_store(&self) -> impl Iterator { [ - &self.addressbyteshash_to_typeindex as &dyn AnyStore, - &self.blockhashprefix_to_height, + &self.blockhashprefix_to_height as &dyn AnyStore, &self.height_to_coinbase_tag, &self.txidprefix_to_txindex, - &self.addresstype_to_addressindex_and_txindex, - &self.addresstype_to_addressindex_and_unspentoutpoint, + // &self.addresshash_to_typeindex, + // &self.addresstype_to_addressindex_and_txindex, + // &self.addresstype_to_addressindex_and_unspentoutpoint, ] .into_iter() + .chain( + self.addresstype_to_addresshash_to_addressindex + .iter() + .map(|s| s as &dyn AnyStore), + ) + .chain( + self.addresstype_to_addressindex_and_txindex + .iter() + .map(|s| s as &dyn AnyStore), + ) + .chain( + self.addresstype_to_addressindex_and_unspentoutpoint + .iter() + .map(|s| s as &dyn AnyStore), + ) } pub fn rollback_if_needed( @@ -138,14 +221,26 @@ impl Stores { vecs: &mut Vecs, starting_indexes: &Indexes, ) -> Result<()> { - if self.addressbyteshash_to_typeindex.is_empty()? - && self.blockhashprefix_to_height.is_empty()? + if self.blockhashprefix_to_height.is_empty()? && self.txidprefix_to_txindex.is_empty()? && self.height_to_coinbase_tag.is_empty()? - && self.addresstype_to_addressindex_and_txindex.is_empty()? + // && self.addresshash_to_typeindex.is_empty()? + // && self.addresstype_to_addressindex_and_txindex.is_empty()? + // && self + // .addresstype_to_addressindex_and_unspentoutpoint + // .is_empty()? + && self + .addresstype_to_addresshash_to_addressindex + .iter() + .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? + && self + .addresstype_to_addressindex_and_txindex + .iter() + .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? && self .addresstype_to_addressindex_and_unspentoutpoint - .is_empty()? + .iter() + .try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))? { return Ok(()); } @@ -174,8 +269,10 @@ impl Stores { while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2PK65) + .remove(hash); index.increment(); } } @@ -189,8 +286,10 @@ impl Stores { while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2PK33) + .remove(hash); index.increment(); } } @@ -204,8 +303,10 @@ impl Stores { while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2PKH) + .remove(hash); index.increment(); } } @@ -219,23 +320,10 @@ impl Stores { while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); - index.increment(); - } - } - - if let Ok(mut index) = vecs - .height_to_first_p2traddressindex - .read_once(starting_indexes.height) - { - let mut p2traddressindex_to_p2trbytes_iter = - vecs.p2traddressindex_to_p2trbytes.iter()?; - - while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2SH) + .remove(hash); index.increment(); } } @@ -249,8 +337,10 @@ impl Stores { while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2WPKH) + .remove(hash); index.increment(); } } @@ -264,8 +354,27 @@ impl Stores { while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2WSH) + .remove(hash); + index.increment(); + } + } + + if let Ok(mut index) = vecs + .height_to_first_p2traddressindex + .read_once(starting_indexes.height) + { + let mut p2traddressindex_to_p2trbytes_iter = + vecs.p2traddressindex_to_p2trbytes.iter()?; + + while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2TR) + .remove(hash); index.increment(); } } @@ -279,15 +388,17 @@ impl Stores { while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresstype_to_addresshash_to_addressindex + .get_mut_unwrap(OutputType::P2A) + .remove(hash); index.increment(); } } } else { unreachable!(); // self.blockhashprefix_to_height.reset()?; - // self.addressbyteshash_to_typeindex.reset()?; + // self.addresshash_to_typeindex.reset()?; } if starting_indexes.txindex != TxIndex::ZERO { @@ -333,9 +444,9 @@ impl Stores { .for_each(|((txoutindex, addresstype), addressindex)| { let txindex = txoutindex_to_txindex_iter.get_at_unwrap(txoutindex); - self.addresstype_to_addressindex_and_txindex.remove( - AddressTypeAddressIndexTxIndex::from((addresstype, addressindex, txindex)), - ); + self.addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) + .remove(AddressIndexTxIndex::from((addressindex, txindex))); let vout = Vout::from( txoutindex.to_usize() @@ -345,13 +456,9 @@ impl Stores { ); let outpoint = OutPoint::new(txindex, vout); - self.addresstype_to_addressindex_and_unspentoutpoint.remove( - AddressTypeAddressIndexOutPoint::from(( - addresstype, - addressindex, - outpoint, - )), - ); + self.addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) + .remove(AddressIndexOutPoint::from((addressindex, outpoint))); }); // Add back outputs that were spent after the rollback point @@ -380,22 +487,13 @@ impl Stores { let addresstype = outputtype; let addressindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex); - self.addresstype_to_addressindex_and_txindex.remove( - AddressTypeAddressIndexTxIndex::from(( - addresstype, - addressindex, - txindex, - )), - ); + self.addresstype_to_addressindex_and_txindex + .get_mut_unwrap(addresstype) + .remove(AddressIndexTxIndex::from((addressindex, txindex))); - self.addresstype_to_addressindex_and_unspentoutpoint.insert( - AddressTypeAddressIndexOutPoint::from(( - addresstype, - addressindex, - outpoint, - )), - Unit, - ); + self.addresstype_to_addressindex_and_unspentoutpoint + .get_mut_unwrap(addresstype) + .insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit); } } }); diff --git a/crates/brk_indexer/src/stores_v3.rs b/crates/brk_indexer/src/stores_v3.rs index f983a0786..0c301ae78 100644 --- a/crates/brk_indexer/src/stores_v3.rs +++ b/crates/brk_indexer/src/stores_v3.rs @@ -3,9 +3,9 @@ use std::{fs, path::Path}; use brk_error::Result; use brk_store::{AnyStore, Kind3, Mode3, StoreFjallV3 as Store}; use brk_types::{ - AddressBytes, AddressBytesHash, AddressTypeAddressIndexOutPoint, - AddressTypeAddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, - TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout, + AddressBytes, AddressHash, AddressTypeAddressIndexOutPoint, AddressTypeAddressIndexTxIndex, + BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, + Unit, Version, Vout, }; use fjall3::{Database, PersistMode}; use rayon::prelude::*; @@ -19,7 +19,7 @@ use super::Vecs; pub struct Stores { pub database: Database, - pub addressbyteshash_to_typeindex: Store, + pub addresshash_to_typeindex: Store, pub blockhashprefix_to_height: Store, pub height_to_coinbase_tag: Store, pub txidprefix_to_txindex: Store, @@ -56,10 +56,10 @@ impl Stores { Mode3::PushOnly, Kind3::Sequential, )?, - addressbyteshash_to_typeindex: Store::import( + addresshash_to_typeindex: Store::import( database_ref, path, - "addressbyteshash_to_typeindex", + "addresshash_to_typeindex", version, Mode3::PushOnly, Kind3::Random, @@ -112,7 +112,7 @@ impl Stores { pub fn commit(&mut self, height: Height) -> Result<()> { [ - &mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore, + &mut self.addresshash_to_typeindex as &mut dyn AnyStore, &mut self.blockhashprefix_to_height, &mut self.height_to_coinbase_tag, &mut self.txidprefix_to_txindex, @@ -129,7 +129,7 @@ impl Stores { fn iter_any_store(&self) -> impl Iterator { [ - &self.addressbyteshash_to_typeindex as &dyn AnyStore, + &self.addresshash_to_typeindex as &dyn AnyStore, &self.blockhashprefix_to_height, &self.height_to_coinbase_tag, &self.txidprefix_to_txindex, @@ -144,7 +144,7 @@ impl Stores { vecs: &mut Vecs, starting_indexes: &Indexes, ) -> Result<()> { - if self.addressbyteshash_to_typeindex.is_empty()? + if self.addresshash_to_typeindex.is_empty()? && self.blockhashprefix_to_height.is_empty()? && self.txidprefix_to_txindex.is_empty()? && self.height_to_coinbase_tag.is_empty()? @@ -180,8 +180,8 @@ impl Stores { while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -195,8 +195,8 @@ impl Stores { while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -210,8 +210,8 @@ impl Stores { while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -225,8 +225,8 @@ impl Stores { while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -240,8 +240,8 @@ impl Stores { while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -255,8 +255,8 @@ impl Stores { while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -270,8 +270,8 @@ impl Stores { while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } @@ -285,15 +285,15 @@ impl Stores { while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) { let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from(&bytes); - self.addressbyteshash_to_typeindex.remove(hash); + let hash = AddressHash::from(&bytes); + self.addresshash_to_typeindex.remove(hash); index.increment(); } } } else { unreachable!(); // self.blockhashprefix_to_height.reset()?; - // self.addressbyteshash_to_typeindex.reset()?; + // self.addresshash_to_typeindex.reset()?; } if starting_indexes.txindex != TxIndex::ZERO { diff --git a/crates/brk_mcp/src/lib.rs b/crates/brk_mcp/src/lib.rs index ae453ca7c..601367a34 100644 --- a/crates/brk_mcp/src/lib.rs +++ b/crates/brk_mcp/src/lib.rs @@ -37,7 +37,7 @@ Get the count of unique metrics. async fn get_metric_count(&self) -> Result { info!("mcp: distinct_metric_count"); Ok(CallToolResult::success(vec![ - Content::json(self.query.distinct_metric_count()).unwrap(), + Content::json(self.query.distinct_metric_count().await).unwrap(), ])) } @@ -47,7 +47,7 @@ Get the count of all metrics. (distinct metrics multiplied by the number of inde async fn get_vec_count(&self) -> Result { info!("mcp: total_metric_count"); Ok(CallToolResult::success(vec![ - Content::json(self.query.total_metric_count()).unwrap(), + Content::json(self.query.total_metric_count().await).unwrap(), ])) } @@ -57,7 +57,7 @@ Get the list of all existing indexes and their accepted variants. async fn get_indexes(&self) -> Result { info!("mcp: get_indexes"); Ok(CallToolResult::success(vec![ - Content::json(self.query.get_indexes()).unwrap(), + Content::json(self.query.get_indexes().await).unwrap(), ])) } @@ -72,7 +72,7 @@ If the `page` param is omitted, it will default to the first page. ) -> Result { info!("mcp: get_metrics"); Ok(CallToolResult::success(vec![ - Content::json(self.query.get_metrics(pagination)).unwrap(), + Content::json(self.query.get_metrics(pagination).await).unwrap(), ])) } @@ -87,7 +87,7 @@ If the `page` param is omitted, it will default to the first page. ) -> Result { info!("mcp: get_index_to_vecids"); Ok(CallToolResult::success(vec![ - Content::json(self.query.get_index_to_vecids(paginated_index)).unwrap(), + Content::json(self.query.get_index_to_vecids(paginated_index).await).unwrap(), ])) } @@ -101,7 +101,7 @@ The list will be empty if the vec id isn't correct. ) -> Result { info!("mcp: get_vecid_to_indexes"); Ok(CallToolResult::success(vec![ - Content::json(self.query.metric_to_indexes(param.id)).unwrap(), + Content::json(self.query.metric_to_indexes(param.id).await).unwrap(), ])) } @@ -112,10 +112,13 @@ The response's format will depend on the given parameters, it will be: - A list: If requested only one vec and the given range returns multiple values (for example: `from=-1000&count=100` or `from=-444&to=-333`) - A matrix: When multiple vecs are requested, even if they each return one value. ")] - fn get_vecs(&self, Parameters(params): Parameters) -> Result { + async fn get_vecs( + &self, + Parameters(params): Parameters, + ) -> Result { info!("mcp: get_vecs"); Ok(CallToolResult::success(vec![Content::text( - match self.query.search_and_format(params) { + match self.query.search_and_format(params).await { Ok(output) => output.to_string(), Err(e) => format!("Error:\n{e}"), }, diff --git a/crates/brk_query/src/async.rs b/crates/brk_query/src/async.rs index d6b1036af..df65faf72 100644 --- a/crates/brk_query/src/async.rs +++ b/crates/brk_query/src/async.rs @@ -1,18 +1,130 @@ -// Should be async -// anything related to IO should use -// -// Sync function -// fn get(db: &CandyStore, key: &str) -> Option> { -// db.get(key).ok().flatten() -// } +use std::collections::BTreeMap; -use crate::Query; +use brk_computer::Computer; +use brk_error::Result; +use brk_indexer::Indexer; +use brk_reader::Reader; +use brk_types::{ + Address, AddressStats, Height, Index, IndexInfo, Limit, Metric, MetricCount, Transaction, + TreeNode, TxidPath, +}; +#[cfg(feature = "tokio")] +use tokio::task::spawn_blocking; + +use crate::{ + Output, PaginatedIndexParam, PaginatedMetrics, PaginationParam, Params, ParamsOpt, Query, + vecs::{IndexToVec, MetricToVec, Vecs}, +}; -// // Async function -// async fn get_async(db: Arc, key: String) -> Option> { -// tokio::task::spawn_blocking(move || { -// db.get(&key).ok().flatten() -// }).await.ok()? -// } #[derive(Clone)] +#[cfg(feature = "tokio")] pub struct AsyncQuery(Query); + +impl AsyncQuery { + pub async fn build(reader: &Reader, indexer: &Indexer, computer: &Computer) -> Self { + Self(Query::build(reader, indexer, computer)) + } + + pub async fn get_height(&self) -> Height { + self.0.get_height() + } + + pub async fn get_address(&self, address: Address) -> Result { + let query = self.0.clone(); + spawn_blocking(move || query.get_address(address)).await? + } + + pub async fn get_transaction(&self, txid: TxidPath) -> Result { + let query = self.0.clone(); + spawn_blocking(move || query.get_transaction(txid)).await? + } + + pub async fn match_metric(&self, metric: Metric, limit: Limit) -> Result> { + let query = self.0.clone(); + spawn_blocking(move || Ok(query.match_metric(&metric, limit))).await? + } + + // pub async fn search_metric_with_index( + // &self, + // metric: &str, + // index: Index, + // // params: &Params, + // ) -> Result> { + // let query = self.0.clone(); + // spawn_blocking(move || query.search_metric_with_index(metric, index)).await? + // } + + // pub async fn format( + // &self, + // metrics: Vec<(String, &&dyn AnyCollectableVec)>, + // params: &ParamsOpt, + // ) -> Result { + // let query = self.0.clone(); + // spawn_blocking(move || query.format(metrics, params)).await? + // } + + pub async fn search_and_format(&self, params: Params) -> Result { + let query = self.0.clone(); + spawn_blocking(move || query.search_and_format(params)).await? + } + + pub async fn metric_to_index_to_vec(&self) -> &BTreeMap<&str, IndexToVec<'_>> { + self.0.metric_to_index_to_vec() + } + + pub async fn index_to_metric_to_vec(&self) -> &BTreeMap> { + self.0.index_to_metric_to_vec() + } + + pub async fn metric_count(&self) -> MetricCount { + self.0.metric_count() + } + + pub async fn distinct_metric_count(&self) -> usize { + self.0.distinct_metric_count() + } + + pub async fn total_metric_count(&self) -> usize { + self.0.total_metric_count() + } + + pub async fn get_indexes(&self) -> &[IndexInfo] { + self.0.get_indexes() + } + + pub async fn get_metrics(&self, pagination: PaginationParam) -> PaginatedMetrics { + self.0.get_metrics(pagination) + } + + pub async fn get_metrics_catalog(&self) -> &TreeNode { + self.0.get_metrics_catalog() + } + + pub async fn get_index_to_vecids(&self, paginated_index: PaginatedIndexParam) -> Vec<&str> { + self.0.get_index_to_vecids(paginated_index) + } + + pub async fn metric_to_indexes(&self, metric: String) -> Option<&Vec> { + self.0.metric_to_indexes(metric) + } + + #[inline] + pub async fn reader(&self) -> &Reader { + self.0.reader() + } + + #[inline] + pub async fn indexer(&self) -> &Indexer { + self.0.indexer() + } + + #[inline] + pub async fn computer(&self) -> &Computer { + self.0.computer() + } + + #[inline] + pub async fn vecs(&self) -> &'static Vecs<'static> { + self.0.vecs() + } +} diff --git a/crates/brk_query/src/chain/addresses.rs b/crates/brk_query/src/chain/addresses.rs index 03d145368..ca6d569c5 100644 --- a/crates/brk_query/src/chain/addresses.rs +++ b/crates/brk_query/src/chain/addresses.rs @@ -3,10 +3,10 @@ use std::str::FromStr; use bitcoin::{Network, PublicKey, ScriptBuf}; use brk_error::{Error, Result}; use brk_types::{ - Address, AddressBytes, AddressBytesHash, AddressChainStats, AddressMempoolStats, AddressStats, + Address, AddressBytes, AddressChainStats, AddressHash, AddressMempoolStats, AddressStats, AnyAddressDataIndexEnum, OutputType, }; -use vecdb::{AnyIterableVec, VecIterator}; +use vecdb::{AnyIterableVec, VecIteratorExtended}; use crate::Query; @@ -27,14 +27,17 @@ pub fn get_address(Address { address }: Address, query: &Query) -> Result Result stateful - .p2pk33addressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2pk33 + .iter()? + .get_unwrap(type_index.into()), OutputType::P2PK65 => stateful - .p2pk65addressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2pk65 + .iter()? + .get_unwrap(type_index.into()), OutputType::P2PKH => stateful - .p2pkhaddressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2pkh + .iter()? + .get_unwrap(type_index.into()), OutputType::P2SH => stateful - .p2shaddressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2sh + .iter()? + .get_unwrap(type_index.into()), OutputType::P2TR => stateful - .p2traddressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2tr + .iter()? + .get_unwrap(type_index.into()), OutputType::P2WPKH => stateful - .p2wpkhaddressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2wpkh + .iter()? + .get_unwrap(type_index.into()), OutputType::P2WSH => stateful - .p2wshaddressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2wsh + .iter()? + .get_unwrap(type_index.into()), OutputType::P2A => stateful - .p2aaddressindex_to_anyaddressindex - .iter() - .unwrap_get_inner(type_index.into()), + .any_address_indexes + .p2a + .iter()? + .get_unwrap(type_index.into()), t => { return Err(Error::UnsupportedType(t.to_string())); } }; let address_data = match any_address_index.to_enum() { - AnyAddressDataIndexEnum::Loaded(index) => stateful - .loadedaddressindex_to_loadedaddressdata - .iter() - .unwrap_get_inner(index), + AnyAddressDataIndexEnum::Loaded(index) => { + stateful.addresses_data.loaded.iter()?.get_unwrap(index) + } AnyAddressDataIndexEnum::Empty(index) => stateful - .emptyaddressindex_to_emptyaddressdata - .iter() - .unwrap_get_inner(index) + .addresses_data + .empty + .iter()? + .get_unwrap(index) .into(), }; diff --git a/crates/brk_query/src/chain/transactions.rs b/crates/brk_query/src/chain/transactions.rs index cb6b7fd09..60a778d20 100644 --- a/crates/brk_query/src/chain/transactions.rs +++ b/crates/brk_query/src/chain/transactions.rs @@ -8,11 +8,11 @@ use bitcoin::consensus::Decodable; use brk_error::{Error, Result}; use brk_reader::XORIndex; use brk_types::{Transaction, Txid, TxidPath, TxidPrefix}; -use vecdb::VecIterator; +use vecdb::VecIteratorExtended; use crate::Query; -pub fn get_transaction_info(TxidPath { txid }: TxidPath, query: &Query) -> Result { +pub fn get_transaction(TxidPath { txid }: TxidPath, query: &Query) -> Result { let Ok(txid) = bitcoin::Txid::from_str(&txid) else { return Err(Error::InvalidTxid); }; @@ -29,21 +29,13 @@ pub fn get_transaction_info(TxidPath { txid }: TxidPath, query: &Query) -> Resul return Err(Error::UnknownTxid); }; - let txid = indexer.vecs.txindex_to_txid.iter().unwrap_get_inner(index); + let txid = indexer.vecs.txindex_to_txid.iter()?.get_unwrap(index); let reader = query.reader(); let computer = query.computer(); - let position = computer - .blks - .txindex_to_position - .iter() - .unwrap_get_inner(index); - let len = indexer - .vecs - .txindex_to_total_size - .iter() - .unwrap_get_inner(index); + let position = computer.blks.txindex_to_position.iter()?.get_unwrap(index); + let len = indexer.vecs.txindex_to_total_size.iter()?.get_unwrap(index); let blk_index_to_blk_path = reader.blk_index_to_blk_path(); diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index ecc588574..9b8209961 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -28,7 +28,7 @@ pub use params::{Params, ParamsDeprec, ParamsOpt}; use vecs::Vecs; use crate::{ - chain::{get_address, get_transaction_info}, + chain::{get_address, get_transaction}, vecs::{IndexToVec, MetricToVec}, }; @@ -64,11 +64,11 @@ impl Query { get_address(address, self) } - pub fn get_transaction_info(&self, txid: TxidPath) -> Result { - get_transaction_info(txid, self) + pub fn get_transaction(&self, txid: TxidPath) -> Result { + get_transaction(txid, self) } - pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&str> { + pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&'static str> { self.vecs().matches(metric, limit) } diff --git a/crates/brk_query/src/vecs.rs b/crates/brk_query/src/vecs.rs index 049a16847..93783c751 100644 --- a/crates/brk_query/src/vecs.rs +++ b/crates/brk_query/src/vecs.rs @@ -89,9 +89,7 @@ impl<'a> Vecs<'a> { .iter() .map(|(index, id_to_vec)| (*index, id_to_vec.keys().cloned().collect::>())) .collect(); - this.index_to_metrics - .values_mut() - .for_each(|ids| sort_ids(ids)); + this.index_to_metrics.values_mut().for_each(sort_ids); this.catalog.replace( TreeNode::Branch( [ diff --git a/crates/brk_server/src/api/addresses/mod.rs b/crates/brk_server/src/api/addresses/mod.rs index 17fbde5f5..1c7023f0c 100644 --- a/crates/brk_server/src/api/addresses/mod.rs +++ b/crates/brk_server/src/api/addresses/mod.rs @@ -30,14 +30,11 @@ impl AddressRoutes for ApiRouter { Path(address): Path
, State(state): State | { - let etag = format!("{VERSION}-{}", state.get_height()); + let etag = format!("{VERSION}-{}", state.get_height().await); if headers.has_etag(&etag) { return Response::new_not_modified(); } - match state.get_address(address).with_status() { - Ok(value) => Response::new_json(&value, &etag), - Err((status, message)) => Response::new_json_with(status, &message, &etag) - } + state.get_address(address).await.to_json_response(&etag) }, |op| op .addresses_tag() .summary("Address information") diff --git a/crates/brk_server/src/api/metrics/mod.rs b/crates/brk_server/src/api/metrics/mod.rs index 860b93848..1f5200da8 100644 --- a/crates/brk_server/src/api/metrics/mod.rs +++ b/crates/brk_server/src/api/metrics/mod.rs @@ -11,7 +11,7 @@ use brk_types::{Index, IndexInfo, Limit, Metric, MetricCount, Metrics}; use crate::{ VERSION, - extended::{HeaderMapExtended, ResponseExtended, TransformResponseExtended}, + extended::{HeaderMapExtended, ResponseExtended, ResultExtended, TransformResponseExtended}, }; use super::AppState; @@ -38,7 +38,7 @@ impl ApiMetricsRoutes for ApiRouter { if headers.has_etag(etag) { return Response::new_not_modified(); } - Response::new_json(state.metric_count(), etag) + Response::new_json(state.metric_count().await, etag) }, |op| op .metrics_tag() @@ -59,7 +59,7 @@ impl ApiMetricsRoutes for ApiRouter { if headers.has_etag(etag) { return Response::new_not_modified(); } - Response::new_json(state.get_indexes(), etag) + state.get_indexes().await.to_json_response(etag) }, |op| op .metrics_tag() @@ -83,7 +83,7 @@ impl ApiMetricsRoutes for ApiRouter { if headers.has_etag(etag) { return Response::new_not_modified(); } - Response::new_json(state.get_metrics(pagination), etag) + Response::new_json(state.get_metrics(pagination).await, etag) }, |op| op .metrics_tag() @@ -101,7 +101,7 @@ impl ApiMetricsRoutes for ApiRouter { if headers.has_etag(etag) { return Response::new_not_modified(); } - Response::new_json(state.get_metrics_catalog(), etag) + Response::new_json(state.get_metrics_catalog().await, etag) }, |op| op .metrics_tag() @@ -126,7 +126,7 @@ impl ApiMetricsRoutes for ApiRouter { if headers.has_etag(etag) { return Response::new_not_modified(); } - Response::new_json(state.match_metric(&metric, limit), etag) + state.match_metric(metric, limit).await.to_json_response(etag) }, |op| op .metrics_tag() @@ -151,7 +151,7 @@ impl ApiMetricsRoutes for ApiRouter { if let Some(indexes) = state.metric_to_indexes(metric.clone()) { return Response::new_json(indexes, etag) } - let value = if let Some(first) = state.match_metric(&metric, Limit::MIN).first() { + let value = if let Some(first) = state.match_metric(metric, Limit::MIN).await?.first() { format!("Could not find '{metric}', did you mean '{first}' ?") } else { format!("Could not find '{metric}'.") diff --git a/crates/brk_server/src/api/transactions/mod.rs b/crates/brk_server/src/api/transactions/mod.rs index 765fd241b..3b600d9ae 100644 --- a/crates/brk_server/src/api/transactions/mod.rs +++ b/crates/brk_server/src/api/transactions/mod.rs @@ -31,14 +31,11 @@ impl TxRoutes for ApiRouter { Path(txid): Path, State(state): State | { - let etag = format!("{VERSION}-{}", state.get_height()); + let etag = format!("{VERSION}-{}", state.get_height().await); if headers.has_etag(&etag) { return Response::new_not_modified(); } - match state.get_transaction_info(txid).with_status() { - Ok(value) => Response::new_json(&value, &etag), - Err((status, message)) => Response::new_json_with(status, &message, &etag) - } + state.get_transaction(txid).await.to_json_response(&etag) }, |op| op .transactions_tag() diff --git a/crates/brk_server/src/extended/result.rs b/crates/brk_server/src/extended/result.rs index 9cb606e6c..7b46a940d 100644 --- a/crates/brk_server/src/extended/result.rs +++ b/crates/brk_server/src/extended/result.rs @@ -1,8 +1,13 @@ -use axum::http::StatusCode; +use axum::{http::StatusCode, response::Response}; use brk_error::{Error, Result}; +use crate::extended::ResponseExtended; + pub trait ResultExtended { fn with_status(self) -> Result; + fn to_json_response(self, etag: &str) -> Response + where + T: sonic_rs::Serialize; } impl ResultExtended for Result { @@ -21,4 +26,14 @@ impl ResultExtended for Result { ) }) } + + fn to_json_response(self, etag: &str) -> Response + where + T: sonic_rs::Serialize, + { + match self.with_status() { + Ok(value) => Response::new_json(&value, etag), + Err((status, message)) => Response::new_json_with(status, &message, etag), + } + } } diff --git a/crates/brk_store/src/fjall_v2/mod.rs b/crates/brk_store/src/fjall_v2/mod.rs index edb9235d5..13b5a7df5 100644 --- a/crates/brk_store/src/fjall_v2/mod.rs +++ b/crates/brk_store/src/fjall_v2/mod.rs @@ -4,8 +4,8 @@ use brk_error::Result; use brk_types::{Height, Version}; use byteview6::ByteView; use fjall2::{ - InnerItem, PartitionCreateOptions, TransactionalKeyspace, TransactionalPartitionHandle, - ValueType, + CompressionType, InnerItem, PartitionCreateOptions, TransactionalKeyspace, + TransactionalPartitionHandle, ValueType, }; use rustc_hash::{FxHashMap, FxHashSet}; @@ -23,7 +23,6 @@ pub struct StoreFjallV2 { partition: TransactionalPartitionHandle, puts: FxHashMap, dels: FxHashSet, - mode: Mode, } const MAJOR_FJALL_VERSION: Version = Version::TWO; @@ -44,8 +43,11 @@ where keyspace: &TransactionalKeyspace, name: &str, mode: Mode, + compression: CompressionType, ) -> Result { - let mut options = PartitionCreateOptions::default().manual_journal_persist(true); + let mut options = PartitionCreateOptions::default() + .compression(compression) + .manual_journal_persist(true); if mode.is_unique_push_only() { options = options.bloom_filter_bits(Some(7)); @@ -64,6 +66,7 @@ where name: &str, version: Version, mode: Mode, + compression: CompressionType, ) -> Result { fs::create_dir_all(path)?; @@ -72,7 +75,7 @@ where &path.join(format!("meta/{name}")), MAJOR_FJALL_VERSION + version, || { - Self::open_partition_handle(keyspace, name, mode).inspect_err(|e| { + Self::open_partition_handle(keyspace, name, mode, compression).inspect_err(|e| { eprintln!("{e}"); eprintln!("Delete {path:?} and try again"); }) @@ -86,7 +89,6 @@ where partition, puts: FxHashMap::default(), dels: FxHashSet::default(), - mode, }) } @@ -285,7 +287,7 @@ where Item::Tomb(key) => Self { key: key.into().into(), value: [].into(), - value_type: ValueType::WeakTombstone, + value_type: ValueType::Tombstone, }, } } diff --git a/crates/brk_store/src/fjall_v3/mod.rs b/crates/brk_store/src/fjall_v3/mod.rs index 8c15066c0..b39422ca4 100644 --- a/crates/brk_store/src/fjall_v3/mod.rs +++ b/crates/brk_store/src/fjall_v3/mod.rs @@ -308,7 +308,7 @@ impl Item { // keyspace_id, // key: key.into().into(), // value: [].into(), - // value_type: ValueType::WeakTombstone, + // value_type: ValueType::Tombstone, // }, // } // } diff --git a/crates/brk_types/src/addressbytes.rs b/crates/brk_types/src/addressbytes.rs index 3d0c89d9b..aedbea463 100644 --- a/crates/brk_types/src/addressbytes.rs +++ b/crates/brk_types/src/addressbytes.rs @@ -35,23 +35,7 @@ impl AddressBytes { } pub fn hash(&self) -> u64 { - let mut slice = rapidhash::v3::rapidhash_v3(self.as_slice()).to_le_bytes(); - slice[0] = slice[0].wrapping_add(self.index()); - u64::from_ne_bytes(slice) - } - - fn index(&self) -> u8 { - // DO NOT CHANGE !!! - match self { - AddressBytes::P2PK65(_) => 0, - AddressBytes::P2PK33(_) => 1, - AddressBytes::P2PKH(_) => 2, - AddressBytes::P2SH(_) => 3, - AddressBytes::P2WPKH(_) => 4, - AddressBytes::P2WSH(_) => 5, - AddressBytes::P2TR(_) => 6, - AddressBytes::P2A(_) => 7, - } + rapidhash::v3::rapidhash_v3(self.as_slice()).to_le() } } diff --git a/crates/brk_types/src/addressbyteshash.rs b/crates/brk_types/src/addresshash.rs similarity index 69% rename from crates/brk_types/src/addressbyteshash.rs rename to crates/brk_types/src/addresshash.rs index ac62876cf..e110db30c 100644 --- a/crates/brk_types/src/addressbyteshash.rs +++ b/crates/brk_types/src/addresshash.rs @@ -21,32 +21,32 @@ use super::AddressBytes; KnownLayout, Hash, )] -pub struct AddressBytesHash(u64); +pub struct AddressHash(u64); -impl From<&AddressBytes> for AddressBytesHash { +impl From<&AddressBytes> for AddressHash { #[inline] fn from(address_bytes: &AddressBytes) -> Self { Self(address_bytes.hash()) } } -impl From for AddressBytesHash { +impl From for AddressHash { #[inline] fn from(value: ByteView) -> Self { Self(u64::from_be_bytes(copy_first_8bytes(&value).unwrap())) } } -impl From for ByteView { +impl From for ByteView { #[inline] - fn from(value: AddressBytesHash) -> Self { + fn from(value: AddressHash) -> Self { Self::from(&value) } } -impl From<&AddressBytesHash> for ByteView { +impl From<&AddressHash> for ByteView { #[inline] - fn from(value: &AddressBytesHash) -> Self { + fn from(value: &AddressHash) -> Self { Self::new(&value.0.to_be_bytes()) } } diff --git a/crates/brk_types/src/addressindexoutpoint.rs b/crates/brk_types/src/addressindexoutpoint.rs index e04e6e8ef..e8eb84de6 100644 --- a/crates/brk_types/src/addressindexoutpoint.rs +++ b/crates/brk_types/src/addressindexoutpoint.rs @@ -17,7 +17,7 @@ pub struct AddressIndexOutPoint { impl Hash for AddressIndexOutPoint { fn hash(&self, state: &mut H) { - let mut buf = [0u8; 11]; + let mut buf = [0u8; 10]; buf[0..8].copy_from_slice(self.addressindextxindex.as_bytes()); buf[8..].copy_from_slice(self.vout.as_bytes()); state.write(&buf); diff --git a/crates/brk_types/src/addresstypeaddressindexoutpoint.rs b/crates/brk_types/src/addresstypeaddressindexoutpoint.rs deleted file mode 100644 index 9ca703bc0..000000000 --- a/crates/brk_types/src/addresstypeaddressindexoutpoint.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::hash::{Hash, Hasher}; - -use byteview::ByteView; -use serde::Serialize; -use zerocopy::IntoBytes; - -use crate::{AddressTypeAddressIndexTxIndex, OutputType, Vout}; - -use super::{OutPoint, TypeIndex}; - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)] -#[repr(C)] -pub struct AddressTypeAddressIndexOutPoint { - addresstypeaddressindextxindex: AddressTypeAddressIndexTxIndex, // (u8; u64) - vout: Vout, // u16 -} - -impl Hash for AddressTypeAddressIndexOutPoint { - fn hash(&self, state: &mut H) { - let mut buf = [0u8; 11]; - buf[..1].copy_from_slice(self.addresstypeaddressindextxindex.addresstype().as_bytes()); - buf[1..9].copy_from_slice( - self.addresstypeaddressindextxindex - .addressindextxindex() - .as_bytes(), - ); - buf[9..].copy_from_slice(self.vout.as_bytes()); - state.write(&buf); - } -} - -impl From<(OutputType, TypeIndex, OutPoint)> for AddressTypeAddressIndexOutPoint { - #[inline] - fn from((addresstype, addressindex, outpoint): (OutputType, TypeIndex, OutPoint)) -> Self { - Self { - addresstypeaddressindextxindex: AddressTypeAddressIndexTxIndex::from(( - addresstype, - addressindex, - outpoint.txindex(), - )), - vout: outpoint.vout(), - } - } -} - -impl From for AddressTypeAddressIndexOutPoint { - #[inline] - fn from(value: ByteView) -> Self { - Self { - addresstypeaddressindextxindex: AddressTypeAddressIndexTxIndex::from(&value[0..9]), - vout: Vout::from(&value[9..]), - } - } -} - -impl From for ByteView { - #[inline] - fn from(value: AddressTypeAddressIndexOutPoint) -> Self { - ByteView::from(&value) - } -} -impl From<&AddressTypeAddressIndexOutPoint> for ByteView { - #[inline] - fn from(value: &AddressTypeAddressIndexOutPoint) -> Self { - ByteView::from( - [ - &ByteView::from(value.addresstypeaddressindextxindex), - value.vout.to_be_bytes().as_slice(), - ] - .concat(), - ) - } -} diff --git a/crates/brk_types/src/addresstypeaddressindextxindex.rs b/crates/brk_types/src/addresstypeaddressindextxindex.rs deleted file mode 100644 index 5b06e2514..000000000 --- a/crates/brk_types/src/addresstypeaddressindextxindex.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::hash::{Hash, Hasher}; - -use byteview::ByteView; -use serde::Serialize; -use zerocopy::IntoBytes; - -use crate::OutputType; - -use super::{TxIndex, TypeIndex}; - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)] -pub struct AddressTypeAddressIndexTxIndex { - addresstype: OutputType, - addressindextxindex: u64, -} - -impl Hash for AddressTypeAddressIndexTxIndex { - fn hash(&self, state: &mut H) { - let mut buf = [0u8; 9]; - buf[..1].copy_from_slice(self.addresstype.as_bytes()); - buf[1..].copy_from_slice(self.addressindextxindex.as_bytes()); - state.write(&buf); - } -} - -impl AddressTypeAddressIndexTxIndex { - pub fn addresstype(&self) -> OutputType { - self.addresstype - } - - pub fn addressindex(&self) -> u32 { - (self.addressindextxindex >> 32) as u32 - } - - pub fn txindex(&self) -> u32 { - self.addressindextxindex as u32 - } - - pub fn addressindextxindex(&self) -> u64 { - self.addressindextxindex - } -} - -impl From<(OutputType, TypeIndex, TxIndex)> for AddressTypeAddressIndexTxIndex { - #[inline] - fn from((addresstype, addressindex, txindex): (OutputType, TypeIndex, TxIndex)) -> Self { - Self { - addresstype, - addressindextxindex: (u64::from(addressindex) << 32) | u64::from(txindex), - } - } -} - -impl From for AddressTypeAddressIndexTxIndex { - #[inline] - fn from(value: ByteView) -> Self { - Self::from(&*value) - } -} - -impl From<&[u8]> for AddressTypeAddressIndexTxIndex { - #[inline] - fn from(value: &[u8]) -> Self { - let addresstype = OutputType::from(&value[0..1]); - let addressindex = TypeIndex::from(&value[1..5]); - let txindex = TxIndex::from(&value[5..9]); - Self::from((addresstype, addressindex, txindex)) - } -} - -impl From for ByteView { - #[inline] - fn from(value: AddressTypeAddressIndexTxIndex) -> Self { - ByteView::from(&value) - } -} -impl From<&AddressTypeAddressIndexTxIndex> for ByteView { - #[inline] - fn from(value: &AddressTypeAddressIndexTxIndex) -> Self { - ByteView::from( - [ - value.addresstype.as_bytes(), - value.addressindextxindex.to_be_bytes().as_slice(), - ] - .concat(), - ) - } -} diff --git a/crates/brk_types/src/lib.rs b/crates/brk_types/src/lib.rs index ff08143b7..d41cdfd5a 100644 --- a/crates/brk_types/src/lib.rs +++ b/crates/brk_types/src/lib.rs @@ -6,14 +6,12 @@ use brk_error::{Error, Result}; mod address; mod addressbytes; -mod addressbyteshash; mod addresschainstats; +mod addresshash; mod addressindexoutpoint; mod addressindextxindex; mod addressmempoolstats; mod addressstats; -mod addresstypeaddressindexoutpoint; -mod addresstypeaddressindextxindex; mod anyaddressindex; mod bitcoin; mod blkmetadata; @@ -106,14 +104,12 @@ mod yearindex; pub use address::*; pub use addressbytes::*; -pub use addressbyteshash::*; pub use addresschainstats::*; +pub use addresshash::*; pub use addressindexoutpoint::*; pub use addressindextxindex::*; pub use addressmempoolstats::*; pub use addressstats::*; -pub use addresstypeaddressindexoutpoint::*; -pub use addresstypeaddressindextxindex::*; pub use anyaddressindex::*; pub use bitcoin::*; pub use blkmetadata::*;