global: snapshot

This commit is contained in:
nym21
2025-11-09 11:25:13 +01:00
parent e77fe0253e
commit dc2e847f58
30 changed files with 521 additions and 497 deletions

1
Cargo.lock generated
View File

@@ -679,6 +679,7 @@ dependencies = [
"jiff",
"minreq",
"sonic-rs",
"tokio",
"vecdb",
"zerocopy",
]

View File

@@ -5,7 +5,7 @@ use brk_error::Result;
use brk_fetcher::Fetcher;
use brk_indexer::Indexer;
use brk_types::{Address, AddressBytes, OutputType, TxOutIndex, pools};
use vecdb::{AnyIterableVec, Exit, VecIterator, VecIteratorExtended};
use vecdb::{AnyIterableVec, Exit, VecIteratorExtended};
fn main() -> Result<()> {
brk_logger::init(Some(Path::new(".log")))?;

View File

@@ -81,7 +81,7 @@ impl Vecs {
.enumerate()
.try_for_each(|(i, v)| -> Result<()> {
self.height_to_price_ohlc_in_cents.forced_push_at(
i.into(),
i,
self.fetcher
.get_height(
i.into(),

View File

@@ -57,10 +57,7 @@ impl ComputedValueVecsFromTxindex {
&name_btc,
version + VERSION,
source_vec.map_or_else(|| sats.txindex.as_ref().unwrap().boxed_clone(), |s| s),
|txindex: TxIndex, iter| {
iter.get_at(txindex.to_usize())
.map(|sats| Bitcoin::from(sats))
},
|txindex: TxIndex, iter| iter.get_at(txindex.to_usize()).map(Bitcoin::from),
);
let bitcoin = ComputedVecsFromTxindex::forced_import(

View File

@@ -163,7 +163,7 @@ impl Vecs {
let start = usize::from(start);
let end = txindex_to_first_txinindex_iter
.get_at(txindex + 1)
.map(|v| usize::from(v))
.map(usize::from)
.unwrap_or_else(|| txinindex_to_txoutindex_iter.len());
StoredU64::from((start..end).count())
})
@@ -183,7 +183,7 @@ impl Vecs {
let start = usize::from(start);
let end = txindex_to_first_txoutindex_iter
.get_at(txindex + 1)
.map(|v| usize::from(v))
.map(usize::from)
.unwrap_or_else(|| txoutindex_to_value_iter.len());
StoredU64::from((start..end).count())
})

View File

@@ -17,5 +17,6 @@ fjall3 = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
sonic-rs = { workspace = true }
tokio = { workspace = true }
vecdb = { workspace = true }
zerocopy = { workspace = true }

View File

@@ -24,6 +24,7 @@ pub enum Error {
BitcoinFromScriptError(bitcoin::address::FromScriptError),
BitcoinHexToArrayError(bitcoin::hex::HexToArrayError),
SonicRS(sonic_rs::Error),
TokioJoin(tokio::task::JoinError),
ZeroCopyError,
Vecs(vecdb::Error),
@@ -92,6 +93,13 @@ impl From<sonic_rs::Error> for Error {
}
}
impl From<tokio::task::JoinError> for Error {
#[inline]
fn from(error: tokio::task::JoinError) -> Self {
Self::TokioJoin(error)
}
}
impl From<io::Error> for Error {
#[inline]
fn from(value: io::Error) -> Self {
@@ -186,6 +194,7 @@ impl fmt::Display for Error {
Error::RawDB(error) => Display::fmt(&error, f),
Error::SonicRS(error) => Display::fmt(&error, f),
Error::SystemTimeError(error) => Display::fmt(&error, f),
Error::TokioJoin(error) => Display::fmt(&error, f),
Error::VecDB(error) => Display::fmt(&error, f),
Error::Vecs(error) => Display::fmt(&error, f),
Error::ZeroCopyError => write!(f, "ZeroCopy error"),

View File

@@ -63,8 +63,8 @@ impl<T> ByAddressType<T> {
}
#[inline]
pub fn get_unwrap(&self, address_type: OutputType) -> &T {
self.get(address_type).unwrap()
pub fn get_unwrap(&self, addresstype: OutputType) -> &T {
self.get(addresstype).unwrap()
}
#[inline]
@@ -83,8 +83,8 @@ impl<T> ByAddressType<T> {
}
#[inline]
pub fn get_mut_unwrap(&mut self, address_type: OutputType) -> &mut T {
self.get_mut(address_type).unwrap()
pub fn get_mut_unwrap(&mut self, addresstype: OutputType) -> &mut T {
self.get_mut(addresstype).unwrap()
}
#[inline]

View File

@@ -88,7 +88,7 @@ Main indexing function processing blocks from parser with collision detection.
**Key-Value Stores:**
- `addressbyteshash_to_typeindex`: Address hash to internal index mapping
- `addresshash_to_typeindex`: Address hash to internal index mapping
- `blockhashprefix_to_height`: Block hash prefix to height lookup
- `txidprefix_to_txindex`: Transaction ID prefix to internal index
- `addresstype_to_typeindex_with_txoutindex`: Address type to output mappings
@@ -137,7 +137,7 @@ println!("Total addresses: {}", final_indexes.total_address_count());
```rust
use brk_indexer::Indexer;
use brk_types::{Height, TxidPrefix, AddressBytesHash};
use brk_types::{Height, TxidPrefix, AddressHash};
let indexer = Indexer::forced_import("./blockchain_index")?;
@@ -154,8 +154,8 @@ if let Some(tx_index) = indexer.stores.txidprefix_to_txindex.get(&txid_prefix)?
}
// Query address information
let address_hash = AddressBytesHash::from(/* address bytes */);
if let Some(type_index) = indexer.stores.addressbyteshash_to_typeindex.get(&address_hash)? {
let address_hash = AddressHash::from(/* address bytes */);
if let Some(type_index) = indexer.stores.addresshash_to_typeindex.get(&address_hash)? {
println!("Address type index: {}", type_index);
}
```

View File

@@ -8,10 +8,9 @@ use brk_iterator::Blocks;
use brk_rpc::Client;
use brk_store::AnyStore;
use brk_types::{
AddressBytes, AddressBytesHash, AddressTypeAddressIndexOutPoint,
AddressTypeAddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, OutputType, Sats,
StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, Txid, TxidPrefix, TypeIndex, Unit,
Version, Vin, Vout,
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height,
OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, Txid,
TxidPrefix, TypeIndex, Unit, Version, Vin, Vout,
};
use log::{error, info};
use rayon::prelude::*;
@@ -333,7 +332,7 @@ impl Indexer {
TxIndex,
Vout,
OutputType,
Option<(AddressBytes, AddressBytesHash)>,
Option<(AddressBytes, AddressHash)>,
Option<TypeIndex>,
)> {
let txindex = indexes.txindex + block_txindex;
@@ -349,18 +348,21 @@ impl Indexer {
return Ok(tuple);
}
let address_bytes = AddressBytes::try_from((script, outputtype)).unwrap();
let addresstype = outputtype;
let address_hash = AddressBytesHash::from(&address_bytes);
let address_bytes = AddressBytes::try_from((script, addresstype)).unwrap();
let address_hash = AddressHash::from(&address_bytes);
let typeindex_opt = stores
.addressbyteshash_to_typeindex
.addresstype_to_addresshash_to_addressindex
.get_unwrap(addresstype)
.get(&address_hash)
.unwrap()
.map(|v| *v)
// Checking if not in the future (in case we started before the last processed block)
.and_then(|typeindex_local| {
(typeindex_local < indexes.to_typeindex(outputtype))
(typeindex_local < indexes.to_typeindex(addresstype))
.then_some(typeindex_local)
});
@@ -370,7 +372,7 @@ impl Indexer {
if check_collisions && let Some(typeindex) = typeindex_opt {
// unreachable!();
let prev_addressbytes_opt = match outputtype {
let prev_addressbytes_opt = match addresstype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_pushed_or_read(
@@ -437,7 +439,10 @@ impl Indexer {
let address_bytes = &tuple.5.as_ref().unwrap().0;
if stores.addressbyteshash_to_typeindex.needs(height)
if stores
.addresstype_to_addresshash_to_addressindex
.get_unwrap(addresstype)
.needs(height)
&& prev_addressbytes != address_bytes
{
let txid = tx.compute_txid();
@@ -446,14 +451,14 @@ impl Indexer {
txid,
vout,
block_txindex,
outputtype,
addresstype,
prev_addressbytes,
address_bytes,
&indexes,
typeindex,
typeindex,
txout,
AddressBytesHash::from(address_bytes),
AddressHash::from(address_bytes),
);
panic!()
}
@@ -470,7 +475,7 @@ impl Indexer {
let tx_len = block.txdata.len();
// let i = Instant::now();
let mut already_added_addressbyteshash: FxHashMap<AddressBytesHash, TypeIndex> =
let mut already_added_addresshash: FxHashMap<AddressHash, TypeIndex> =
FxHashMap::default();
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::default();
@@ -495,47 +500,27 @@ impl Indexer {
let typeindex = if let Some(ti) = typeindex_opt {
ti
} else if let Some((address_bytes, address_hash)) = addressbytes_opt {
if let Some(&ti) = already_added_addressbyteshash.get(&address_hash) {
let addresstype = outputtype;
if let Some(&ti) = already_added_addresshash.get(&address_hash) {
ti
} else {
let ti = match outputtype {
let ti = match addresstype {
OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(),
OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(),
OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(),
OutputType::P2MS => {
vecs.p2msoutputindex_to_txindex
.push_if_needed(indexes.p2msoutputindex, txindex)?;
indexes.p2msoutputindex.copy_then_increment()
}
OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(),
OutputType::OpReturn => {
vecs.opreturnindex_to_txindex
.push_if_needed(indexes.opreturnindex, txindex)?;
indexes.opreturnindex.copy_then_increment()
}
OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(),
OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(),
OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(),
OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(),
OutputType::Empty => {
vecs.emptyoutputindex_to_txindex
.push_if_needed(indexes.emptyoutputindex, txindex)?;
indexes.emptyoutputindex.copy_then_increment()
}
OutputType::Unknown => {
vecs.unknownoutputindex_to_txindex
.push_if_needed(indexes.unknownoutputindex, txindex)?;
indexes.unknownoutputindex.copy_then_increment()
}
_ => unreachable!(),
};
already_added_addressbyteshash.insert(address_hash, ti);
stores.addressbyteshash_to_typeindex.insert_if_needed(
address_hash,
ti,
height,
);
already_added_addresshash.insert(address_hash, ti);
stores
.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(addresstype)
.insert_if_needed(address_hash, ti, height);
vecs.push_bytes_if_needed(ti, address_bytes)?;
ti
@@ -577,12 +562,9 @@ impl Indexer {
stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressTypeAddressIndexTxIndex::from((
addresstype,
addressindex,
txindex,
)),
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
@@ -598,12 +580,9 @@ impl Indexer {
stores
.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressTypeAddressIndexOutPoint::from((
addresstype,
addressindex,
outpoint,
)),
AddressIndexOutPoint::from((addressindex, outpoint)),
Unit,
height,
);
@@ -651,22 +630,17 @@ impl Indexer {
stores
.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.insert_if_needed(
AddressTypeAddressIndexTxIndex::from((addresstype, addressindex, txindex)),
AddressIndexTxIndex::from((addressindex, txindex)),
Unit,
height,
);
stores
.addresstype_to_addressindex_and_unspentoutpoint
.remove_if_needed(
AddressTypeAddressIndexOutPoint::from((
addresstype,
addressindex,
outpoint,
)),
height,
);
.get_mut_unwrap(addresstype)
.remove_if_needed(AddressIndexOutPoint::from((addressindex, outpoint)), height);
}
// println!("txins.into_iter(): {:?}", i.elapsed());

View File

@@ -1,13 +1,14 @@
use std::{fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, Mode, StoreFjallV2 as Store, Type};
use brk_types::{
AddressBytes, AddressBytesHash, AddressTypeAddressIndexOutPoint,
AddressTypeAddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, BlockHashPrefix, Height,
OutPoint, OutputType, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex, Unit, Version,
Vout,
};
use fjall2::{PersistMode, TransactionalKeyspace};
use fjall2::{CompressionType as Compression, PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended};
@@ -19,13 +20,17 @@ use super::Vecs;
pub struct Stores {
pub keyspace: TransactionalKeyspace,
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
// pub addresshash_to_typeindex: Store<AddressHash, TypeIndex>,
pub addresstype_to_addresshash_to_addressindex: ByAddressType<Store<AddressHash, TypeIndex>>,
// pub addresstype_to_addressindex_and_txindex: Store<AddressTypeAddressIndexTxIndex, Unit>,
pub addresstype_to_addressindex_and_txindex: ByAddressType<Store<AddressIndexTxIndex, Unit>>,
// pub addresstype_to_addressindex_and_unspentoutpoint:
// Store<AddressTypeAddressIndexOutPoint, Unit>,
pub addresstype_to_addressindex_and_unspentoutpoint:
ByAddressType<Store<AddressIndexOutPoint, Unit>>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
pub addresstype_to_addressindex_and_txindex: Store<AddressTypeAddressIndexTxIndex, Unit>,
pub addresstype_to_addressindex_and_unspentoutpoint:
Store<AddressTypeAddressIndexOutPoint, Unit>,
}
impl Stores {
@@ -45,6 +50,39 @@ impl Stores {
let keyspace_ref = &keyspace;
let create_addresshash_to_addressindex_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("h2i{}", index),
version,
Mode::UniquePushOnly(Type::Random),
Compression::Lz4,
)
};
let create_addressindex_to_txindex_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("a2t{}", index),
version,
Mode::VecLike,
Compression::Lz4,
)
};
let create_addressindex_to_unspentoutpoint_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("a2u{}", index),
version,
Mode::VecLike,
Compression::Lz4,
)
};
Ok(Self {
keyspace: keyspace.clone(),
@@ -54,13 +92,18 @@ impl Stores {
"h2c",
version,
Mode::UniquePushOnly(Type::Sequential),
Compression::Lz4,
)?,
addressbyteshash_to_typeindex: Store::import(
keyspace_ref,
path,
"a2t",
version,
Mode::UniquePushOnly(Type::Random),
// addresshash_to_typeindex: Store::import(
// keyspace_ref,
// path,
// "a2t",
// version,
// Mode::UniquePushOnly(Type::Random),
// Compression::Lz4,
// )?,
addresstype_to_addresshash_to_addressindex: ByAddressType::new_with_index(
create_addresshash_to_addressindex_store,
)?,
blockhashprefix_to_height: Store::import(
keyspace_ref,
@@ -68,6 +111,7 @@ impl Stores {
"b2h",
version,
Mode::UniquePushOnly(Type::Random),
Compression::Lz4,
)?,
txidprefix_to_txindex: Store::import(
keyspace_ref,
@@ -75,20 +119,29 @@ impl Stores {
"t2t",
version,
Mode::UniquePushOnly(Type::Random),
Compression::Lz4,
)?,
addresstype_to_addressindex_and_txindex: Store::import(
keyspace_ref,
path,
"aat",
version,
Mode::VecLike,
// addresstype_to_addressindex_and_txindex: Store::import(
// keyspace_ref,
// path,
// "aat",
// version,
// Mode::VecLike,
// Compression::Lz4,
// )?,
addresstype_to_addressindex_and_txindex: ByAddressType::new_with_index(
create_addressindex_to_txindex_store,
)?,
addresstype_to_addressindex_and_unspentoutpoint: Store::import(
keyspace_ref,
path,
"aau",
version,
Mode::VecLike,
// addresstype_to_addressindex_and_unspentoutpoint: Store::import(
// keyspace_ref,
// path,
// "aau",
// version,
// Mode::VecLike,
// Compression::Lz4,
// )?,
addresstype_to_addressindex_and_unspentoutpoint: ByAddressType::new_with_index(
create_addressindex_to_unspentoutpoint_store,
)?,
})
}
@@ -106,14 +159,29 @@ impl Stores {
pub fn commit(&mut self, height: Height) -> Result<()> {
[
&mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
&mut self.blockhashprefix_to_height,
&mut self.blockhashprefix_to_height as &mut dyn AnyStore,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
&mut self.addresstype_to_addressindex_and_txindex,
&mut self.addresstype_to_addressindex_and_unspentoutpoint,
// &mut self.addresshash_to_typeindex
// &mut self.addresstype_to_addressindex_and_txindex,
// &mut self.addresstype_to_addressindex_and_unspentoutpoint,
]
.into_par_iter() // Changed from par_iter_mut()
.into_par_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
) // Changed from par_iter_mut()
.try_for_each(|store| store.commit(height))?;
self.keyspace
@@ -123,14 +191,29 @@ impl Stores {
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
[
&self.addressbyteshash_to_typeindex as &dyn AnyStore,
&self.blockhashprefix_to_height,
&self.blockhashprefix_to_height as &dyn AnyStore,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
&self.addresstype_to_addressindex_and_txindex,
&self.addresstype_to_addressindex_and_unspentoutpoint,
// &self.addresshash_to_typeindex,
// &self.addresstype_to_addressindex_and_txindex,
// &self.addresstype_to_addressindex_and_unspentoutpoint,
]
.into_iter()
.chain(
self.addresstype_to_addresshash_to_addressindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_txindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_addressindex_and_unspentoutpoint
.iter()
.map(|s| s as &dyn AnyStore),
)
}
pub fn rollback_if_needed(
@@ -138,14 +221,26 @@ impl Stores {
vecs: &mut Vecs,
starting_indexes: &Indexes,
) -> Result<()> {
if self.addressbyteshash_to_typeindex.is_empty()?
&& self.blockhashprefix_to_height.is_empty()?
if self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()?
&& self.height_to_coinbase_tag.is_empty()?
&& self.addresstype_to_addressindex_and_txindex.is_empty()?
// && self.addresshash_to_typeindex.is_empty()?
// && self.addresstype_to_addressindex_and_txindex.is_empty()?
// && self
// .addresstype_to_addressindex_and_unspentoutpoint
// .is_empty()?
&& self
.addresstype_to_addresshash_to_addressindex
.iter()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
&& self
.addresstype_to_addressindex_and_txindex
.iter()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
&& self
.addresstype_to_addressindex_and_unspentoutpoint
.is_empty()?
.iter()
.try_fold(true, |acc, s| s.is_empty().map(|empty| acc && empty))?
{
return Ok(());
}
@@ -174,8 +269,10 @@ impl Stores {
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PK65)
.remove(hash);
index.increment();
}
}
@@ -189,8 +286,10 @@ impl Stores {
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PK33)
.remove(hash);
index.increment();
}
}
@@ -204,8 +303,10 @@ impl Stores {
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2PKH)
.remove(hash);
index.increment();
}
}
@@ -219,23 +320,10 @@ impl Stores {
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.read_once(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2SH)
.remove(hash);
index.increment();
}
}
@@ -249,8 +337,10 @@ impl Stores {
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2WPKH)
.remove(hash);
index.increment();
}
}
@@ -264,8 +354,27 @@ impl Stores {
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2WSH)
.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.read_once(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2TR)
.remove(hash);
index.increment();
}
}
@@ -279,15 +388,17 @@ impl Stores {
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresstype_to_addresshash_to_addressindex
.get_mut_unwrap(OutputType::P2A)
.remove(hash);
index.increment();
}
}
} else {
unreachable!();
// self.blockhashprefix_to_height.reset()?;
// self.addressbyteshash_to_typeindex.reset()?;
// self.addresshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {
@@ -333,9 +444,9 @@ impl Stores {
.for_each(|((txoutindex, addresstype), addressindex)| {
let txindex = txoutindex_to_txindex_iter.get_at_unwrap(txoutindex);
self.addresstype_to_addressindex_and_txindex.remove(
AddressTypeAddressIndexTxIndex::from((addresstype, addressindex, txindex)),
);
self.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.remove(AddressIndexTxIndex::from((addressindex, txindex)));
let vout = Vout::from(
txoutindex.to_usize()
@@ -345,13 +456,9 @@ impl Stores {
);
let outpoint = OutPoint::new(txindex, vout);
self.addresstype_to_addressindex_and_unspentoutpoint.remove(
AddressTypeAddressIndexOutPoint::from((
addresstype,
addressindex,
outpoint,
)),
);
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.remove(AddressIndexOutPoint::from((addressindex, outpoint)));
});
// Add back outputs that were spent after the rollback point
@@ -380,22 +487,13 @@ impl Stores {
let addresstype = outputtype;
let addressindex = txoutindex_to_typeindex_iter.get_unwrap(txoutindex);
self.addresstype_to_addressindex_and_txindex.remove(
AddressTypeAddressIndexTxIndex::from((
addresstype,
addressindex,
txindex,
)),
);
self.addresstype_to_addressindex_and_txindex
.get_mut_unwrap(addresstype)
.remove(AddressIndexTxIndex::from((addressindex, txindex)));
self.addresstype_to_addressindex_and_unspentoutpoint.insert(
AddressTypeAddressIndexOutPoint::from((
addresstype,
addressindex,
outpoint,
)),
Unit,
);
self.addresstype_to_addressindex_and_unspentoutpoint
.get_mut_unwrap(addresstype)
.insert(AddressIndexOutPoint::from((addressindex, outpoint)), Unit);
}
}
});

View File

@@ -3,9 +3,9 @@ use std::{fs, path::Path};
use brk_error::Result;
use brk_store::{AnyStore, Kind3, Mode3, StoreFjallV3 as Store};
use brk_types::{
AddressBytes, AddressBytesHash, AddressTypeAddressIndexOutPoint,
AddressTypeAddressIndexTxIndex, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, Unit, Version, Vout,
AddressBytes, AddressHash, AddressTypeAddressIndexOutPoint, AddressTypeAddressIndexTxIndex,
BlockHashPrefix, Height, OutPoint, StoredString, TxIndex, TxOutIndex, TxidPrefix, TypeIndex,
Unit, Version, Vout,
};
use fjall3::{Database, PersistMode};
use rayon::prelude::*;
@@ -19,7 +19,7 @@ use super::Vecs;
pub struct Stores {
pub database: Database,
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
pub addresshash_to_typeindex: Store<AddressHash, TypeIndex>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
@@ -56,10 +56,10 @@ impl Stores {
Mode3::PushOnly,
Kind3::Sequential,
)?,
addressbyteshash_to_typeindex: Store::import(
addresshash_to_typeindex: Store::import(
database_ref,
path,
"addressbyteshash_to_typeindex",
"addresshash_to_typeindex",
version,
Mode3::PushOnly,
Kind3::Random,
@@ -112,7 +112,7 @@ impl Stores {
pub fn commit(&mut self, height: Height) -> Result<()> {
[
&mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
&mut self.addresshash_to_typeindex as &mut dyn AnyStore,
&mut self.blockhashprefix_to_height,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
@@ -129,7 +129,7 @@ impl Stores {
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
[
&self.addressbyteshash_to_typeindex as &dyn AnyStore,
&self.addresshash_to_typeindex as &dyn AnyStore,
&self.blockhashprefix_to_height,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
@@ -144,7 +144,7 @@ impl Stores {
vecs: &mut Vecs,
starting_indexes: &Indexes,
) -> Result<()> {
if self.addressbyteshash_to_typeindex.is_empty()?
if self.addresshash_to_typeindex.is_empty()?
&& self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()?
&& self.height_to_coinbase_tag.is_empty()?
@@ -180,8 +180,8 @@ impl Stores {
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -195,8 +195,8 @@ impl Stores {
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -210,8 +210,8 @@ impl Stores {
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -225,8 +225,8 @@ impl Stores {
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -240,8 +240,8 @@ impl Stores {
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -255,8 +255,8 @@ impl Stores {
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -270,8 +270,8 @@ impl Stores {
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
@@ -285,15 +285,15 @@ impl Stores {
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
let hash = AddressHash::from(&bytes);
self.addresshash_to_typeindex.remove(hash);
index.increment();
}
}
} else {
unreachable!();
// self.blockhashprefix_to_height.reset()?;
// self.addressbyteshash_to_typeindex.reset()?;
// self.addresshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {

View File

@@ -37,7 +37,7 @@ Get the count of unique metrics.
async fn get_metric_count(&self) -> Result<CallToolResult, McpError> {
info!("mcp: distinct_metric_count");
Ok(CallToolResult::success(vec![
Content::json(self.query.distinct_metric_count()).unwrap(),
Content::json(self.query.distinct_metric_count().await).unwrap(),
]))
}
@@ -47,7 +47,7 @@ Get the count of all metrics. (distinct metrics multiplied by the number of inde
async fn get_vec_count(&self) -> Result<CallToolResult, McpError> {
info!("mcp: total_metric_count");
Ok(CallToolResult::success(vec![
Content::json(self.query.total_metric_count()).unwrap(),
Content::json(self.query.total_metric_count().await).unwrap(),
]))
}
@@ -57,7 +57,7 @@ Get the list of all existing indexes and their accepted variants.
async fn get_indexes(&self) -> Result<CallToolResult, McpError> {
info!("mcp: get_indexes");
Ok(CallToolResult::success(vec![
Content::json(self.query.get_indexes()).unwrap(),
Content::json(self.query.get_indexes().await).unwrap(),
]))
}
@@ -72,7 +72,7 @@ If the `page` param is omitted, it will default to the first page.
) -> Result<CallToolResult, McpError> {
info!("mcp: get_metrics");
Ok(CallToolResult::success(vec![
Content::json(self.query.get_metrics(pagination)).unwrap(),
Content::json(self.query.get_metrics(pagination).await).unwrap(),
]))
}
@@ -87,7 +87,7 @@ If the `page` param is omitted, it will default to the first page.
) -> Result<CallToolResult, McpError> {
info!("mcp: get_index_to_vecids");
Ok(CallToolResult::success(vec![
Content::json(self.query.get_index_to_vecids(paginated_index)).unwrap(),
Content::json(self.query.get_index_to_vecids(paginated_index).await).unwrap(),
]))
}
@@ -101,7 +101,7 @@ The list will be empty if the vec id isn't correct.
) -> Result<CallToolResult, McpError> {
info!("mcp: get_vecid_to_indexes");
Ok(CallToolResult::success(vec![
Content::json(self.query.metric_to_indexes(param.id)).unwrap(),
Content::json(self.query.metric_to_indexes(param.id).await).unwrap(),
]))
}
@@ -112,10 +112,13 @@ The response's format will depend on the given parameters, it will be:
- A list: If requested only one vec and the given range returns multiple values (for example: `from=-1000&count=100` or `from=-444&to=-333`)
- A matrix: When multiple vecs are requested, even if they each return one value.
")]
fn get_vecs(&self, Parameters(params): Parameters<Params>) -> Result<CallToolResult, McpError> {
async fn get_vecs(
&self,
Parameters(params): Parameters<Params>,
) -> Result<CallToolResult, McpError> {
info!("mcp: get_vecs");
Ok(CallToolResult::success(vec![Content::text(
match self.query.search_and_format(params) {
match self.query.search_and_format(params).await {
Ok(output) => output.to_string(),
Err(e) => format!("Error:\n{e}"),
},

View File

@@ -1,18 +1,130 @@
// Should be async
// anything related to IO should use
//
// Sync function
// fn get(db: &CandyStore, key: &str) -> Option<Vec<u8>> {
// db.get(key).ok().flatten()
// }
use std::collections::BTreeMap;
use crate::Query;
use brk_computer::Computer;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_reader::Reader;
use brk_types::{
Address, AddressStats, Height, Index, IndexInfo, Limit, Metric, MetricCount, Transaction,
TreeNode, TxidPath,
};
#[cfg(feature = "tokio")]
use tokio::task::spawn_blocking;
use crate::{
Output, PaginatedIndexParam, PaginatedMetrics, PaginationParam, Params, ParamsOpt, Query,
vecs::{IndexToVec, MetricToVec, Vecs},
};
// // Async function
// async fn get_async(db: Arc<CandyStore>, key: String) -> Option<Vec<u8>> {
// tokio::task::spawn_blocking(move || {
// db.get(&key).ok().flatten()
// }).await.ok()?
// }
#[derive(Clone)]
#[cfg(feature = "tokio")]
pub struct AsyncQuery(Query);
impl AsyncQuery {
pub async fn build(reader: &Reader, indexer: &Indexer, computer: &Computer) -> Self {
Self(Query::build(reader, indexer, computer))
}
pub async fn get_height(&self) -> Height {
self.0.get_height()
}
pub async fn get_address(&self, address: Address) -> Result<AddressStats> {
let query = self.0.clone();
spawn_blocking(move || query.get_address(address)).await?
}
pub async fn get_transaction(&self, txid: TxidPath) -> Result<Transaction> {
let query = self.0.clone();
spawn_blocking(move || query.get_transaction(txid)).await?
}
pub async fn match_metric(&self, metric: Metric, limit: Limit) -> Result<Vec<&'static str>> {
let query = self.0.clone();
spawn_blocking(move || Ok(query.match_metric(&metric, limit))).await?
}
// pub async fn search_metric_with_index(
// &self,
// metric: &str,
// index: Index,
// // params: &Params,
// ) -> Result<Vec<(String, &&dyn AnyCollectableVec)>> {
// let query = self.0.clone();
// spawn_blocking(move || query.search_metric_with_index(metric, index)).await?
// }
// pub async fn format(
// &self,
// metrics: Vec<(String, &&dyn AnyCollectableVec)>,
// params: &ParamsOpt,
// ) -> Result<Output> {
// let query = self.0.clone();
// spawn_blocking(move || query.format(metrics, params)).await?
// }
pub async fn search_and_format(&self, params: Params) -> Result<Output> {
let query = self.0.clone();
spawn_blocking(move || query.search_and_format(params)).await?
}
pub async fn metric_to_index_to_vec(&self) -> &BTreeMap<&str, IndexToVec<'_>> {
self.0.metric_to_index_to_vec()
}
pub async fn index_to_metric_to_vec(&self) -> &BTreeMap<Index, MetricToVec<'_>> {
self.0.index_to_metric_to_vec()
}
pub async fn metric_count(&self) -> MetricCount {
self.0.metric_count()
}
pub async fn distinct_metric_count(&self) -> usize {
self.0.distinct_metric_count()
}
pub async fn total_metric_count(&self) -> usize {
self.0.total_metric_count()
}
pub async fn get_indexes(&self) -> &[IndexInfo] {
self.0.get_indexes()
}
pub async fn get_metrics(&self, pagination: PaginationParam) -> PaginatedMetrics {
self.0.get_metrics(pagination)
}
pub async fn get_metrics_catalog(&self) -> &TreeNode {
self.0.get_metrics_catalog()
}
pub async fn get_index_to_vecids(&self, paginated_index: PaginatedIndexParam) -> Vec<&str> {
self.0.get_index_to_vecids(paginated_index)
}
pub async fn metric_to_indexes(&self, metric: String) -> Option<&Vec<Index>> {
self.0.metric_to_indexes(metric)
}
#[inline]
pub async fn reader(&self) -> &Reader {
self.0.reader()
}
#[inline]
pub async fn indexer(&self) -> &Indexer {
self.0.indexer()
}
#[inline]
pub async fn computer(&self) -> &Computer {
self.0.computer()
}
#[inline]
pub async fn vecs(&self) -> &'static Vecs<'static> {
self.0.vecs()
}
}

View File

@@ -3,10 +3,10 @@ use std::str::FromStr;
use bitcoin::{Network, PublicKey, ScriptBuf};
use brk_error::{Error, Result};
use brk_types::{
Address, AddressBytes, AddressBytesHash, AddressChainStats, AddressMempoolStats, AddressStats,
Address, AddressBytes, AddressChainStats, AddressHash, AddressMempoolStats, AddressStats,
AnyAddressDataIndexEnum, OutputType,
};
use vecdb::{AnyIterableVec, VecIterator};
use vecdb::{AnyIterableVec, VecIteratorExtended};
use crate::Query;
@@ -27,14 +27,17 @@ pub fn get_address(Address { address }: Address, query: &Query) -> Result<Addres
return Err(Error::InvalidAddress);
};
let type_ = OutputType::from(&script);
let Ok(bytes) = AddressBytes::try_from((&script, type_)) else {
let outputtype = OutputType::from(&script);
let Ok(bytes) = AddressBytes::try_from((&script, outputtype)) else {
return Err(Error::Str("Failed to convert the address to bytes"));
};
let hash = AddressBytesHash::from(&bytes);
let addresstype = outputtype;
let hash = AddressHash::from(&bytes);
let Ok(Some(type_index)) = stores
.addressbyteshash_to_typeindex
.addresstype_to_addresshash_to_addressindex
.get(addresstype)
.unwrap()
.get(&hash)
.map(|opt| opt.map(|cow| cow.into_owned()))
else {
@@ -50,57 +53,63 @@ pub fn get_address(Address { address }: Address, query: &Query) -> Result<Addres
.iter()
.last()
.unwrap()
.1
.into_owned()
});
let any_address_index = match type_ {
let any_address_index = match outputtype {
OutputType::P2PK33 => stateful
.p2pk33addressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2pk33
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2PK65 => stateful
.p2pk65addressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2pk65
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2PKH => stateful
.p2pkhaddressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2pkh
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2SH => stateful
.p2shaddressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2sh
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2TR => stateful
.p2traddressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2tr
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2WPKH => stateful
.p2wpkhaddressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2wpkh
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2WSH => stateful
.p2wshaddressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2wsh
.iter()?
.get_unwrap(type_index.into()),
OutputType::P2A => stateful
.p2aaddressindex_to_anyaddressindex
.iter()
.unwrap_get_inner(type_index.into()),
.any_address_indexes
.p2a
.iter()?
.get_unwrap(type_index.into()),
t => {
return Err(Error::UnsupportedType(t.to_string()));
}
};
let address_data = match any_address_index.to_enum() {
AnyAddressDataIndexEnum::Loaded(index) => stateful
.loadedaddressindex_to_loadedaddressdata
.iter()
.unwrap_get_inner(index),
AnyAddressDataIndexEnum::Loaded(index) => {
stateful.addresses_data.loaded.iter()?.get_unwrap(index)
}
AnyAddressDataIndexEnum::Empty(index) => stateful
.emptyaddressindex_to_emptyaddressdata
.iter()
.unwrap_get_inner(index)
.addresses_data
.empty
.iter()?
.get_unwrap(index)
.into(),
};

View File

@@ -8,11 +8,11 @@ use bitcoin::consensus::Decodable;
use brk_error::{Error, Result};
use brk_reader::XORIndex;
use brk_types::{Transaction, Txid, TxidPath, TxidPrefix};
use vecdb::VecIterator;
use vecdb::VecIteratorExtended;
use crate::Query;
pub fn get_transaction_info(TxidPath { txid }: TxidPath, query: &Query) -> Result<Transaction> {
pub fn get_transaction(TxidPath { txid }: TxidPath, query: &Query) -> Result<Transaction> {
let Ok(txid) = bitcoin::Txid::from_str(&txid) else {
return Err(Error::InvalidTxid);
};
@@ -29,21 +29,13 @@ pub fn get_transaction_info(TxidPath { txid }: TxidPath, query: &Query) -> Resul
return Err(Error::UnknownTxid);
};
let txid = indexer.vecs.txindex_to_txid.iter().unwrap_get_inner(index);
let txid = indexer.vecs.txindex_to_txid.iter()?.get_unwrap(index);
let reader = query.reader();
let computer = query.computer();
let position = computer
.blks
.txindex_to_position
.iter()
.unwrap_get_inner(index);
let len = indexer
.vecs
.txindex_to_total_size
.iter()
.unwrap_get_inner(index);
let position = computer.blks.txindex_to_position.iter()?.get_unwrap(index);
let len = indexer.vecs.txindex_to_total_size.iter()?.get_unwrap(index);
let blk_index_to_blk_path = reader.blk_index_to_blk_path();

View File

@@ -28,7 +28,7 @@ pub use params::{Params, ParamsDeprec, ParamsOpt};
use vecs::Vecs;
use crate::{
chain::{get_address, get_transaction_info},
chain::{get_address, get_transaction},
vecs::{IndexToVec, MetricToVec},
};
@@ -64,11 +64,11 @@ impl Query {
get_address(address, self)
}
pub fn get_transaction_info(&self, txid: TxidPath) -> Result<Transaction> {
get_transaction_info(txid, self)
pub fn get_transaction(&self, txid: TxidPath) -> Result<Transaction> {
get_transaction(txid, self)
}
pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&str> {
pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&'static str> {
self.vecs().matches(metric, limit)
}

View File

@@ -89,9 +89,7 @@ impl<'a> Vecs<'a> {
.iter()
.map(|(index, id_to_vec)| (*index, id_to_vec.keys().cloned().collect::<Vec<_>>()))
.collect();
this.index_to_metrics
.values_mut()
.for_each(|ids| sort_ids(ids));
this.index_to_metrics.values_mut().for_each(sort_ids);
this.catalog.replace(
TreeNode::Branch(
[

View File

@@ -30,14 +30,11 @@ impl AddressRoutes for ApiRouter<AppState> {
Path(address): Path<Address>,
State(state): State<AppState>
| {
let etag = format!("{VERSION}-{}", state.get_height());
let etag = format!("{VERSION}-{}", state.get_height().await);
if headers.has_etag(&etag) {
return Response::new_not_modified();
}
match state.get_address(address).with_status() {
Ok(value) => Response::new_json(&value, &etag),
Err((status, message)) => Response::new_json_with(status, &message, &etag)
}
state.get_address(address).await.to_json_response(&etag)
}, |op| op
.addresses_tag()
.summary("Address information")

View File

@@ -11,7 +11,7 @@ use brk_types::{Index, IndexInfo, Limit, Metric, MetricCount, Metrics};
use crate::{
VERSION,
extended::{HeaderMapExtended, ResponseExtended, TransformResponseExtended},
extended::{HeaderMapExtended, ResponseExtended, ResultExtended, TransformResponseExtended},
};
use super::AppState;
@@ -38,7 +38,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.metric_count(), etag)
Response::new_json(state.metric_count().await, etag)
},
|op| op
.metrics_tag()
@@ -59,7 +59,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.get_indexes(), etag)
state.get_indexes().await.to_json_response(etag)
},
|op| op
.metrics_tag()
@@ -83,7 +83,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.get_metrics(pagination), etag)
Response::new_json(state.get_metrics(pagination).await, etag)
},
|op| op
.metrics_tag()
@@ -101,7 +101,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.get_metrics_catalog(), etag)
Response::new_json(state.get_metrics_catalog().await, etag)
},
|op| op
.metrics_tag()
@@ -126,7 +126,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
if headers.has_etag(etag) {
return Response::new_not_modified();
}
Response::new_json(state.match_metric(&metric, limit), etag)
state.match_metric(metric, limit).await.to_json_response(etag)
},
|op| op
.metrics_tag()
@@ -151,7 +151,7 @@ impl ApiMetricsRoutes for ApiRouter<AppState> {
if let Some(indexes) = state.metric_to_indexes(metric.clone()) {
return Response::new_json(indexes, etag)
}
let value = if let Some(first) = state.match_metric(&metric, Limit::MIN).first() {
let value = if let Some(first) = state.match_metric(metric, Limit::MIN).await?.first() {
format!("Could not find '{metric}', did you mean '{first}' ?")
} else {
format!("Could not find '{metric}'.")

View File

@@ -31,14 +31,11 @@ impl TxRoutes for ApiRouter<AppState> {
Path(txid): Path<TxidPath>,
State(state): State<AppState>
| {
let etag = format!("{VERSION}-{}", state.get_height());
let etag = format!("{VERSION}-{}", state.get_height().await);
if headers.has_etag(&etag) {
return Response::new_not_modified();
}
match state.get_transaction_info(txid).with_status() {
Ok(value) => Response::new_json(&value, &etag),
Err((status, message)) => Response::new_json_with(status, &message, &etag)
}
state.get_transaction(txid).await.to_json_response(&etag)
},
|op| op
.transactions_tag()

View File

@@ -1,8 +1,13 @@
use axum::http::StatusCode;
use axum::{http::StatusCode, response::Response};
use brk_error::{Error, Result};
use crate::extended::ResponseExtended;
pub trait ResultExtended<T> {
fn with_status(self) -> Result<T, (StatusCode, String)>;
fn to_json_response(self, etag: &str) -> Response
where
T: sonic_rs::Serialize;
}
impl<T> ResultExtended<T> for Result<T> {
@@ -21,4 +26,14 @@ impl<T> ResultExtended<T> for Result<T> {
)
})
}
fn to_json_response(self, etag: &str) -> Response
where
T: sonic_rs::Serialize,
{
match self.with_status() {
Ok(value) => Response::new_json(&value, etag),
Err((status, message)) => Response::new_json_with(status, &message, etag),
}
}
}

View File

@@ -4,8 +4,8 @@ use brk_error::Result;
use brk_types::{Height, Version};
use byteview6::ByteView;
use fjall2::{
InnerItem, PartitionCreateOptions, TransactionalKeyspace, TransactionalPartitionHandle,
ValueType,
CompressionType, InnerItem, PartitionCreateOptions, TransactionalKeyspace,
TransactionalPartitionHandle, ValueType,
};
use rustc_hash::{FxHashMap, FxHashSet};
@@ -23,7 +23,6 @@ pub struct StoreFjallV2<Key, Value> {
partition: TransactionalPartitionHandle,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
mode: Mode,
}
const MAJOR_FJALL_VERSION: Version = Version::TWO;
@@ -44,8 +43,11 @@ where
keyspace: &TransactionalKeyspace,
name: &str,
mode: Mode,
compression: CompressionType,
) -> Result<TransactionalPartitionHandle> {
let mut options = PartitionCreateOptions::default().manual_journal_persist(true);
let mut options = PartitionCreateOptions::default()
.compression(compression)
.manual_journal_persist(true);
if mode.is_unique_push_only() {
options = options.bloom_filter_bits(Some(7));
@@ -64,6 +66,7 @@ where
name: &str,
version: Version,
mode: Mode,
compression: CompressionType,
) -> Result<Self> {
fs::create_dir_all(path)?;
@@ -72,7 +75,7 @@ where
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_partition_handle(keyspace, name, mode).inspect_err(|e| {
Self::open_partition_handle(keyspace, name, mode, compression).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
@@ -86,7 +89,6 @@ where
partition,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
mode,
})
}
@@ -285,7 +287,7 @@ where
Item::Tomb(key) => Self {
key: key.into().into(),
value: [].into(),
value_type: ValueType::WeakTombstone,
value_type: ValueType::Tombstone,
},
}
}

View File

@@ -308,7 +308,7 @@ impl<K, V> Item<K, V> {
// keyspace_id,
// key: key.into().into(),
// value: [].into(),
// value_type: ValueType::WeakTombstone,
// value_type: ValueType::Tombstone,
// },
// }
// }

View File

@@ -35,23 +35,7 @@ impl AddressBytes {
}
pub fn hash(&self) -> u64 {
let mut slice = rapidhash::v3::rapidhash_v3(self.as_slice()).to_le_bytes();
slice[0] = slice[0].wrapping_add(self.index());
u64::from_ne_bytes(slice)
}
fn index(&self) -> u8 {
// DO NOT CHANGE !!!
match self {
AddressBytes::P2PK65(_) => 0,
AddressBytes::P2PK33(_) => 1,
AddressBytes::P2PKH(_) => 2,
AddressBytes::P2SH(_) => 3,
AddressBytes::P2WPKH(_) => 4,
AddressBytes::P2WSH(_) => 5,
AddressBytes::P2TR(_) => 6,
AddressBytes::P2A(_) => 7,
}
rapidhash::v3::rapidhash_v3(self.as_slice()).to_le()
}
}

View File

@@ -21,32 +21,32 @@ use super::AddressBytes;
KnownLayout,
Hash,
)]
pub struct AddressBytesHash(u64);
pub struct AddressHash(u64);
impl From<&AddressBytes> for AddressBytesHash {
impl From<&AddressBytes> for AddressHash {
#[inline]
fn from(address_bytes: &AddressBytes) -> Self {
Self(address_bytes.hash())
}
}
impl From<ByteView> for AddressBytesHash {
impl From<ByteView> for AddressHash {
#[inline]
fn from(value: ByteView) -> Self {
Self(u64::from_be_bytes(copy_first_8bytes(&value).unwrap()))
}
}
impl From<AddressBytesHash> for ByteView {
impl From<AddressHash> for ByteView {
#[inline]
fn from(value: AddressBytesHash) -> Self {
fn from(value: AddressHash) -> Self {
Self::from(&value)
}
}
impl From<&AddressBytesHash> for ByteView {
impl From<&AddressHash> for ByteView {
#[inline]
fn from(value: &AddressBytesHash) -> Self {
fn from(value: &AddressHash) -> Self {
Self::new(&value.0.to_be_bytes())
}
}

View File

@@ -17,7 +17,7 @@ pub struct AddressIndexOutPoint {
impl Hash for AddressIndexOutPoint {
fn hash<H: Hasher>(&self, state: &mut H) {
let mut buf = [0u8; 11];
let mut buf = [0u8; 10];
buf[0..8].copy_from_slice(self.addressindextxindex.as_bytes());
buf[8..].copy_from_slice(self.vout.as_bytes());
state.write(&buf);

View File

@@ -1,73 +0,0 @@
use std::hash::{Hash, Hasher};
use byteview::ByteView;
use serde::Serialize;
use zerocopy::IntoBytes;
use crate::{AddressTypeAddressIndexTxIndex, OutputType, Vout};
use super::{OutPoint, TypeIndex};
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
#[repr(C)]
pub struct AddressTypeAddressIndexOutPoint {
addresstypeaddressindextxindex: AddressTypeAddressIndexTxIndex, // (u8; u64)
vout: Vout, // u16
}
impl Hash for AddressTypeAddressIndexOutPoint {
fn hash<H: Hasher>(&self, state: &mut H) {
let mut buf = [0u8; 11];
buf[..1].copy_from_slice(self.addresstypeaddressindextxindex.addresstype().as_bytes());
buf[1..9].copy_from_slice(
self.addresstypeaddressindextxindex
.addressindextxindex()
.as_bytes(),
);
buf[9..].copy_from_slice(self.vout.as_bytes());
state.write(&buf);
}
}
impl From<(OutputType, TypeIndex, OutPoint)> for AddressTypeAddressIndexOutPoint {
#[inline]
fn from((addresstype, addressindex, outpoint): (OutputType, TypeIndex, OutPoint)) -> Self {
Self {
addresstypeaddressindextxindex: AddressTypeAddressIndexTxIndex::from((
addresstype,
addressindex,
outpoint.txindex(),
)),
vout: outpoint.vout(),
}
}
}
impl From<ByteView> for AddressTypeAddressIndexOutPoint {
#[inline]
fn from(value: ByteView) -> Self {
Self {
addresstypeaddressindextxindex: AddressTypeAddressIndexTxIndex::from(&value[0..9]),
vout: Vout::from(&value[9..]),
}
}
}
impl From<AddressTypeAddressIndexOutPoint> for ByteView {
#[inline]
fn from(value: AddressTypeAddressIndexOutPoint) -> Self {
ByteView::from(&value)
}
}
impl From<&AddressTypeAddressIndexOutPoint> for ByteView {
#[inline]
fn from(value: &AddressTypeAddressIndexOutPoint) -> Self {
ByteView::from(
[
&ByteView::from(value.addresstypeaddressindextxindex),
value.vout.to_be_bytes().as_slice(),
]
.concat(),
)
}
}

View File

@@ -1,88 +0,0 @@
use std::hash::{Hash, Hasher};
use byteview::ByteView;
use serde::Serialize;
use zerocopy::IntoBytes;
use crate::OutputType;
use super::{TxIndex, TypeIndex};
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
pub struct AddressTypeAddressIndexTxIndex {
addresstype: OutputType,
addressindextxindex: u64,
}
impl Hash for AddressTypeAddressIndexTxIndex {
fn hash<H: Hasher>(&self, state: &mut H) {
let mut buf = [0u8; 9];
buf[..1].copy_from_slice(self.addresstype.as_bytes());
buf[1..].copy_from_slice(self.addressindextxindex.as_bytes());
state.write(&buf);
}
}
impl AddressTypeAddressIndexTxIndex {
pub fn addresstype(&self) -> OutputType {
self.addresstype
}
pub fn addressindex(&self) -> u32 {
(self.addressindextxindex >> 32) as u32
}
pub fn txindex(&self) -> u32 {
self.addressindextxindex as u32
}
pub fn addressindextxindex(&self) -> u64 {
self.addressindextxindex
}
}
impl From<(OutputType, TypeIndex, TxIndex)> for AddressTypeAddressIndexTxIndex {
#[inline]
fn from((addresstype, addressindex, txindex): (OutputType, TypeIndex, TxIndex)) -> Self {
Self {
addresstype,
addressindextxindex: (u64::from(addressindex) << 32) | u64::from(txindex),
}
}
}
impl From<ByteView> for AddressTypeAddressIndexTxIndex {
#[inline]
fn from(value: ByteView) -> Self {
Self::from(&*value)
}
}
impl From<&[u8]> for AddressTypeAddressIndexTxIndex {
#[inline]
fn from(value: &[u8]) -> Self {
let addresstype = OutputType::from(&value[0..1]);
let addressindex = TypeIndex::from(&value[1..5]);
let txindex = TxIndex::from(&value[5..9]);
Self::from((addresstype, addressindex, txindex))
}
}
impl From<AddressTypeAddressIndexTxIndex> for ByteView {
#[inline]
fn from(value: AddressTypeAddressIndexTxIndex) -> Self {
ByteView::from(&value)
}
}
impl From<&AddressTypeAddressIndexTxIndex> for ByteView {
#[inline]
fn from(value: &AddressTypeAddressIndexTxIndex) -> Self {
ByteView::from(
[
value.addresstype.as_bytes(),
value.addressindextxindex.to_be_bytes().as_slice(),
]
.concat(),
)
}
}

View File

@@ -6,14 +6,12 @@ use brk_error::{Error, Result};
mod address;
mod addressbytes;
mod addressbyteshash;
mod addresschainstats;
mod addresshash;
mod addressindexoutpoint;
mod addressindextxindex;
mod addressmempoolstats;
mod addressstats;
mod addresstypeaddressindexoutpoint;
mod addresstypeaddressindextxindex;
mod anyaddressindex;
mod bitcoin;
mod blkmetadata;
@@ -106,14 +104,12 @@ mod yearindex;
pub use address::*;
pub use addressbytes::*;
pub use addressbyteshash::*;
pub use addresschainstats::*;
pub use addresshash::*;
pub use addressindexoutpoint::*;
pub use addressindextxindex::*;
pub use addressmempoolstats::*;
pub use addressstats::*;
pub use addresstypeaddressindexoutpoint::*;
pub use addresstypeaddressindextxindex::*;
pub use anyaddressindex::*;
pub use bitcoin::*;
pub use blkmetadata::*;