global: snapshot

This commit is contained in:
nym21
2025-10-26 22:30:41 +01:00
parent 7d01e9e91e
commit 82e59d409e
34 changed files with 2192 additions and 1090 deletions
Generated
+145
View File
@@ -653,6 +653,7 @@ dependencies = [
"log",
"pco",
"rayon",
"rustc-hash",
"serde",
"vecdb",
"zerocopy",
@@ -798,6 +799,7 @@ dependencies = [
"schemars",
"serde",
"serde_json",
"tokio",
"vecdb",
]
@@ -1261,6 +1263,7 @@ dependencies = [
"brk_types",
"byteview 0.6.1",
"byteview 0.8.0",
"candystore",
"log",
"parking_lot 0.12.5",
"rustc-hash",
@@ -1350,6 +1353,26 @@ dependencies = [
"allocator-api2",
]
[[package]]
name = "bytemuck"
version = "1.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4"
dependencies = [
"bytemuck_derive",
]
[[package]]
name = "bytemuck_derive"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
]
[[package]]
name = "byteorder"
version = "1.5.0"
@@ -1374,6 +1397,26 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e6b0e42e210b794e14b152c6fe1a55831e30ef4a0f5dc39d73d714fb5f1906c"
[[package]]
name = "candystore"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "304e90a814421e59b7e986f06d6d9041ecddeb28ce8bcc0a20bbf0091f22d0f0"
dependencies = [
"anyhow",
"bytemuck",
"crossbeam-channel",
"databuf",
"fslock",
"libc",
"memmap",
"parking_lot 0.12.5",
"rand 0.9.2",
"simd-itertools",
"siphasher",
"uuid",
]
[[package]]
name = "castaway"
version = "0.2.4"
@@ -1761,6 +1804,34 @@ dependencies = [
"parking_lot_core 0.9.12",
]
[[package]]
name = "databuf"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1ad1d99bee317a8dac0b7cd86896c5a5f24307009292985dabbf3e412c8b9d"
dependencies = [
"databuf-derive",
]
[[package]]
name = "databuf-derive"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04040c9fc8fcb4084222a26c99faf5b3014772a6115e076b7a50fe49bf25d0ea"
dependencies = [
"databuf_derive_impl",
]
[[package]]
name = "databuf_derive_impl"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf656eb071fe87d23716f933788a35a8ad6baa6fdbf66a67a261dbd3f9dc81a"
dependencies = [
"quote2",
"syn 2.0.108",
]
[[package]]
name = "deranged"
version = "0.5.4"
@@ -2067,6 +2138,16 @@ dependencies = [
"libc",
]
[[package]]
name = "fslock"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "futures"
version = "0.3.31"
@@ -2777,6 +2858,16 @@ version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "memmap2"
version = "0.9.9"
@@ -2827,6 +2918,28 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "multiversion"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edb7f0ff51249dfda9ab96b5823695e15a052dc15074c9dbf3d118afaf2c201"
dependencies = [
"multiversion-macros",
"target-features",
]
[[package]]
name = "multiversion-macros"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b093064383341eb3271f42e381cb8f10a01459478446953953c75d24bd339fc0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.108",
"target-features",
]
[[package]]
name = "munge"
version = "0.4.7"
@@ -3824,6 +3937,23 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "quote2"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "970573b86f7e5795c8c6c50c56ef602368593f0687188da27fd489a59e253630"
dependencies = [
"proc-macro2",
"quote",
"quote2-macros",
]
[[package]]
name = "quote2-macros"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f4b89c37b2d870a28629ad20da669bb0e7d7214878d0d5111b304aa466e1977"
[[package]]
name = "r-efi"
version = "5.3.0"
@@ -4412,6 +4542,15 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]]
name = "simd-itertools"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a037ed5ba0cb7102a5b720453b642c5b2cf39960edd2ceace91af8ec3743082a"
dependencies = [
"multiversion",
]
[[package]]
name = "simdutf8"
version = "0.1.5"
@@ -4603,6 +4742,12 @@ dependencies = [
"syn 2.0.108",
]
[[package]]
name = "target-features"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5"
[[package]]
name = "tempfile"
version = "3.23.0"
+1 -1
View File
@@ -40,7 +40,7 @@ brk_error = { version = "0.0.111", path = "crates/brk_error" }
brk_fetcher = { version = "0.0.111", path = "crates/brk_fetcher" }
brk_grouper = { version = "0.0.111", path = "crates/brk_grouper" }
brk_indexer = { version = "0.0.111", path = "crates/brk_indexer" }
brk_query = { version = "0.0.111", path = "crates/brk_query" }
brk_query = { version = "0.0.111", path = "crates/brk_query", features = ["tokio"] }
brk_iterator = { version = "0.0.111", path = "crates/brk_iterator" }
brk_logger = { version = "0.0.111", path = "crates/brk_logger" }
brk_mcp = { version = "0.0.111", path = "crates/brk_mcp" }
+1
View File
@@ -27,6 +27,7 @@ derive_deref = { workspace = true }
log = { workspace = true }
pco = "0.4.7"
rayon = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
vecdb = { workspace = true }
zerocopy = { workspace = true }
+4 -5
View File
@@ -168,7 +168,7 @@ impl Vecs {
|(txinindex, txoutindex)| {
let txoutindex = txoutindex.into_owned();
if txoutindex == TxOutIndex::COINBASE {
Sats::ZERO
Sats::MAX
} else if let Some((_, value)) =
txoutindex_to_value_iter.next_at(txoutindex.to_usize())
{
@@ -1456,15 +1456,14 @@ impl Vecs {
// },
// )?;
self.txindex_to_fee.compute_transform3(
self.txindex_to_fee.compute_transform2(
starting_indexes.txindex,
&self.txindex_to_input_value,
&self.txindex_to_output_value,
&self.txindex_to_is_coinbase,
|(i, input, output, coinbase, ..)| {
|(i, input, output, ..)| {
(
i,
if coinbase.is_true() {
if input.is_max() {
Sats::ZERO
} else {
input.checked_sub(output).unwrap()
+3
View File
@@ -120,6 +120,9 @@ impl Vecs {
.next_at(index.to_usize())
.map(|(_, outpoint)| {
let outpoint = outpoint.into_owned();
if outpoint.is_coinbase() {
return TxOutIndex::COINBASE;
}
txindex_to_first_txoutindex_iter
.next_at(outpoint.txindex().to_usize())
.unwrap()
+23 -13
View File
@@ -59,12 +59,20 @@ impl Computer {
let computed_path = outputs_path.join("computed");
let (indexes, fetched, blks) = thread::scope(|s| -> Result<_> {
let fetched_handle = fetcher.map(|fetcher| {
s.spawn(move || fetched::Vecs::forced_import(outputs_path, fetcher, VERSION))
});
const STACK_SIZE: usize = 512 * 1024 * 1024;
let big_thread = || thread::Builder::new().stack_size(STACK_SIZE);
let blks_handle = s.spawn(|| blks::Vecs::forced_import(&computed_path, VERSION));
let (indexes, fetched, blks) = thread::scope(|s| -> Result<_> {
let fetched_handle = fetcher
.map(|fetcher| {
big_thread().spawn_scoped(s, move || {
fetched::Vecs::forced_import(outputs_path, fetcher, VERSION)
})
})
.transpose()?;
let blks_handle = big_thread()
.spawn_scoped(s, || blks::Vecs::forced_import(&computed_path, VERSION))?;
let indexes = indexes::Vecs::forced_import(&computed_path, VERSION, indexer)?;
let fetched = fetched_handle.map(|h| h.join().unwrap()).transpose()?;
@@ -74,11 +82,13 @@ impl Computer {
})?;
let (price, constants, market) = thread::scope(|s| -> Result<_> {
let constants_handle =
s.spawn(|| constants::Vecs::forced_import(&computed_path, VERSION, &indexes));
let constants_handle = big_thread().spawn_scoped(s, || {
constants::Vecs::forced_import(&computed_path, VERSION, &indexes)
})?;
let market_handle =
s.spawn(|| market::Vecs::forced_import(&computed_path, VERSION, &indexes));
let market_handle = big_thread().spawn_scoped(s, || {
market::Vecs::forced_import(&computed_path, VERSION, &indexes)
})?;
let price = fetched
.is_some()
@@ -91,7 +101,7 @@ impl Computer {
})?;
let (chain, pools, cointime) = thread::scope(|s| -> Result<_> {
let chain_handle = s.spawn(|| {
let chain_handle = big_thread().spawn_scoped(s, || {
chain::Vecs::forced_import(
&computed_path,
VERSION,
@@ -99,11 +109,11 @@ impl Computer {
&indexes,
price.as_ref(),
)
});
})?;
let pools_handle = s.spawn(|| {
let pools_handle = big_thread().spawn_scoped(s, || {
pools::Vecs::forced_import(&computed_path, VERSION, &indexes, price.as_ref())
});
})?;
let cointime =
cointime::Vecs::forced_import(&computed_path, VERSION, &indexes, price.as_ref())?;
@@ -2,12 +2,12 @@ mod addresscount;
mod height_to_addresscount;
mod height_to_vec;
mod indexes_to_addresscount;
mod typeindex_tree;
mod typeindex_map;
mod vec;
pub use addresscount::*;
pub use height_to_addresscount::*;
pub use height_to_vec::*;
pub use indexes_to_addresscount::*;
pub use typeindex_tree::*;
pub use typeindex_map::*;
pub use vec::*;
@@ -0,0 +1,68 @@
use std::mem;
use brk_grouper::ByAddressType;
use brk_types::{OutputType, TypeIndex};
use derive_deref::{Deref, DerefMut};
use rustc_hash::FxHashMap;
#[derive(Debug, Deref, DerefMut)]
pub struct AddressTypeToTypeIndexMap<T>(ByAddressType<FxHashMap<TypeIndex, T>>);
impl<T> AddressTypeToTypeIndexMap<T> {
pub fn merge(mut self, mut other: Self) -> Self {
Self::merge_(&mut self.p2pk65, &mut other.p2pk65);
Self::merge_(&mut self.p2pk33, &mut other.p2pk33);
Self::merge_(&mut self.p2pkh, &mut other.p2pkh);
Self::merge_(&mut self.p2sh, &mut other.p2sh);
Self::merge_(&mut self.p2wpkh, &mut other.p2wpkh);
Self::merge_(&mut self.p2wsh, &mut other.p2wsh);
Self::merge_(&mut self.p2tr, &mut other.p2tr);
Self::merge_(&mut self.p2a, &mut other.p2a);
self
}
fn merge_(own: &mut FxHashMap<TypeIndex, T>, other: &mut FxHashMap<TypeIndex, T>) {
if own.len() < other.len() {
mem::swap(own, other);
}
own.extend(other.drain());
}
// pub fn get_for_type(&self, address_type: OutputType, typeindex: &TypeIndex) -> Option<&T> {
// self.get(address_type).unwrap().get(typeindex)
// }
pub fn insert_for_type(&mut self, address_type: OutputType, typeindex: TypeIndex, value: T) {
self.get_mut(address_type).unwrap().insert(typeindex, value);
}
pub fn remove_for_type(&mut self, address_type: OutputType, typeindex: &TypeIndex) -> T {
self.get_mut(address_type)
.unwrap()
.remove(typeindex)
.unwrap()
}
pub fn into_sorted_iter(self) -> impl Iterator<Item = (OutputType, Vec<(TypeIndex, T)>)> {
self.0.into_iter_typed().map(|(output_type, map)| {
let mut sorted: Vec<_> = map.into_iter().collect();
sorted.sort_unstable_by_key(|(typeindex, _)| *typeindex);
(output_type, sorted)
})
}
}
impl<T> Default for AddressTypeToTypeIndexMap<T> {
fn default() -> Self {
Self(ByAddressType {
p2pk65: FxHashMap::default(),
p2pk33: FxHashMap::default(),
p2pkh: FxHashMap::default(),
p2sh: FxHashMap::default(),
p2wpkh: FxHashMap::default(),
p2wsh: FxHashMap::default(),
p2tr: FxHashMap::default(),
p2a: FxHashMap::default(),
})
}
}
@@ -1,50 +0,0 @@
use std::{collections::BTreeMap, mem};
use brk_grouper::ByAddressType;
use brk_types::TypeIndex;
use derive_deref::{Deref, DerefMut};
#[derive(Debug, Deref, DerefMut)]
pub struct AddressTypeToTypeIndexTree<T>(ByAddressType<BTreeMap<TypeIndex, T>>);
impl<T> AddressTypeToTypeIndexTree<T> {
pub fn merge(mut self, mut other: Self) -> Self {
Self::merge_(&mut self.p2pk65, &mut other.p2pk65);
Self::merge_(&mut self.p2pk33, &mut other.p2pk33);
Self::merge_(&mut self.p2pkh, &mut other.p2pkh);
Self::merge_(&mut self.p2sh, &mut other.p2sh);
Self::merge_(&mut self.p2wpkh, &mut other.p2wpkh);
Self::merge_(&mut self.p2wsh, &mut other.p2wsh);
Self::merge_(&mut self.p2tr, &mut other.p2tr);
Self::merge_(&mut self.p2a, &mut other.p2a);
self
}
fn merge_(own: &mut BTreeMap<TypeIndex, T>, other: &mut BTreeMap<TypeIndex, T>) {
if own.len() >= other.len() {
own.append(other);
} else {
other.append(own);
mem::swap(own, other);
}
}
pub fn unwrap(self) -> ByAddressType<BTreeMap<TypeIndex, T>> {
self.0
}
}
impl<T> Default for AddressTypeToTypeIndexTree<T> {
fn default() -> Self {
Self(ByAddressType {
p2pk65: BTreeMap::default(),
p2pk33: BTreeMap::default(),
p2pkh: BTreeMap::default(),
p2sh: BTreeMap::default(),
p2wpkh: BTreeMap::default(),
p2wsh: BTreeMap::default(),
p2tr: BTreeMap::default(),
p2a: BTreeMap::default(),
})
}
}
File diff suppressed because it is too large Load Diff
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, ops::ControlFlow, path::Path};
use std::{ops::ControlFlow, path::Path};
use brk_error::Result;
use brk_grouper::{
@@ -10,6 +10,7 @@ use brk_types::{
Bitcoin, CheckedSub, DateIndex, Dollars, HalvingEpoch, Height, Timestamp, Version,
};
use derive_deref::{Deref, DerefMut};
use rustc_hash::FxHashMap;
use vecdb::{AnyIterableVec, Database, Exit, Format, StoredIndex};
use crate::{
@@ -1498,7 +1499,7 @@ impl Vecs {
pub fn send(
&mut self,
height_to_sent: BTreeMap<Height, Transacted>,
height_to_sent: FxHashMap<Height, Transacted>,
chain_state: &mut [BlockState],
) {
let mut time_based_vecs = self
+8
View File
@@ -1,4 +1,5 @@
use brk_traversable::Traversable;
use rayon::prelude::*;
use crate::Filtered;
@@ -23,6 +24,13 @@ impl<T> AddressGroups<T> {
self.amount_range.iter_mut()
}
pub fn par_iter_separate_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
{
self.amount_range.par_iter_mut()
}
pub fn iter_overlapping_mut(&mut self) -> impl Iterator<Item = &mut T> {
self.lt_amount.iter_mut().chain(self.ge_amount.iter_mut())
}
+11
View File
@@ -62,10 +62,12 @@ impl<T> ByAddressType<T> {
})
}
#[inline]
pub fn get_unwrap(&self, address_type: OutputType) -> &T {
self.get(address_type).unwrap()
}
#[inline]
pub fn get(&self, address_type: OutputType) -> Option<&T> {
match address_type {
OutputType::P2PK65 => Some(&self.p2pk65),
@@ -80,10 +82,12 @@ impl<T> ByAddressType<T> {
}
}
#[inline]
pub fn get_mut_unwrap(&mut self, address_type: OutputType) -> &mut T {
self.get_mut(address_type).unwrap()
}
#[inline]
pub fn get_mut(&mut self, address_type: OutputType) -> Option<&mut T> {
match address_type {
OutputType::P2PK65 => Some(&mut self.p2pk65),
@@ -98,6 +102,7 @@ impl<T> ByAddressType<T> {
}
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &T> {
[
&self.p2pk65,
@@ -112,6 +117,7 @@ impl<T> ByAddressType<T> {
.into_iter()
}
#[inline]
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
[
&mut self.p2pk65,
@@ -126,6 +132,7 @@ impl<T> ByAddressType<T> {
.into_iter()
}
#[inline]
pub fn par_iter(&mut self) -> impl ParallelIterator<Item = &T>
where
T: Send + Sync,
@@ -143,6 +150,7 @@ impl<T> ByAddressType<T> {
.into_par_iter()
}
#[inline]
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
@@ -160,6 +168,7 @@ impl<T> ByAddressType<T> {
.into_par_iter()
}
#[inline]
pub fn iter_typed(&self) -> impl Iterator<Item = (OutputType, &T)> {
[
(OutputType::P2PK65, &self.p2pk65),
@@ -174,6 +183,7 @@ impl<T> ByAddressType<T> {
.into_iter()
}
#[inline]
pub fn into_iter_typed(self) -> impl Iterator<Item = (OutputType, T)> {
[
(OutputType::P2PK65, self.p2pk65),
@@ -188,6 +198,7 @@ impl<T> ByAddressType<T> {
.into_iter()
}
#[inline]
pub fn iter_typed_mut(&mut self) -> impl Iterator<Item = (OutputType, &mut T)> {
[
(OutputType::P2PK65, &mut self.p2pk65),
+30
View File
@@ -1,4 +1,5 @@
use brk_traversable::Traversable;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use crate::Filtered;
@@ -107,6 +108,35 @@ impl<T> ByAgeRange<T> {
]
.into_iter()
}
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
{
[
&mut self.up_to_1d,
&mut self._1d_to_1w,
&mut self._1w_to_1m,
&mut self._1m_to_2m,
&mut self._2m_to_3m,
&mut self._3m_to_4m,
&mut self._4m_to_5m,
&mut self._5m_to_6m,
&mut self._6m_to_1y,
&mut self._1y_to_2y,
&mut self._2y_to_3y,
&mut self._3y_to_4y,
&mut self._4y_to_5y,
&mut self._5y_to_6y,
&mut self._6y_to_7y,
&mut self._7y_to_8y,
&mut self._8y_to_10y,
&mut self._10y_to_12y,
&mut self._12y_to_15y,
&mut self.from_15y,
]
.into_par_iter()
}
}
impl<T> ByAgeRange<Filtered<T>> {
+25
View File
@@ -2,6 +2,7 @@ use std::ops::{Add, AddAssign};
use brk_traversable::Traversable;
use brk_types::Sats;
use rayon::prelude::*;
use super::{Filter, Filtered};
@@ -201,6 +202,30 @@ impl<T> ByAmountRange<T> {
]
.into_iter()
}
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
{
[
&mut self._0sats,
&mut self._1sat_to_10sats,
&mut self._10sats_to_100sats,
&mut self._100sats_to_1k_sats,
&mut self._1k_sats_to_10k_sats,
&mut self._10k_sats_to_100k_sats,
&mut self._100k_sats_to_1m_sats,
&mut self._1m_sats_to_10m_sats,
&mut self._10m_sats_to_1btc,
&mut self._1btc_to_10btc,
&mut self._10btc_to_100btc,
&mut self._100btc_to_1k_btc,
&mut self._1k_btc_to_10k_btc,
&mut self._10k_btc_to_100k_btc,
&mut self._100k_btc_or_more,
]
.into_par_iter()
}
}
impl<T> ByAmountRange<Filtered<T>> {
+15
View File
@@ -1,5 +1,6 @@
use brk_traversable::Traversable;
use brk_types::{HalvingEpoch, Height};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use super::{Filter, Filtered};
@@ -36,6 +37,20 @@ impl<T> ByEpoch<T> {
.into_iter()
}
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
{
[
&mut self._0,
&mut self._1,
&mut self._2,
&mut self._3,
&mut self._4,
]
.into_par_iter()
}
pub fn mut_vec_from_height(&mut self, height: Height) -> &mut T {
let epoch = HalvingEpoch::from(height);
if epoch == HalvingEpoch::new(0) {
@@ -2,6 +2,7 @@ use std::ops::{Add, AddAssign};
use brk_traversable::Traversable;
use brk_types::OutputType;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use super::{Filter, Filtered};
@@ -55,6 +56,26 @@ impl<T> BySpendableType<T> {
.into_iter()
}
pub fn par_iter_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
{
[
&mut self.p2pk65,
&mut self.p2pk33,
&mut self.p2pkh,
&mut self.p2ms,
&mut self.p2sh,
&mut self.p2wpkh,
&mut self.p2wsh,
&mut self.p2tr,
&mut self.p2a,
&mut self.unknown,
&mut self.empty,
]
.into_par_iter()
}
pub fn iter_typed(&self) -> impl Iterator<Item = (OutputType, &T)> {
[
(OutputType::P2PK65, &self.p2pk65),
+12
View File
@@ -1,4 +1,5 @@
use brk_traversable::Traversable;
use rayon::prelude::*;
use crate::{
ByAgeRange, ByAmountRange, ByEpoch, ByGreatEqualAmount, ByLowerThanAmount, ByMaxAge, ByMinAge,
@@ -42,6 +43,17 @@ impl<T> UTXOGroups<T> {
.chain(self._type.iter_mut())
}
pub fn par_iter_separate_mut(&mut self) -> impl ParallelIterator<Item = &mut T>
where
T: Send + Sync,
{
self.age_range
.par_iter_mut()
.chain(self.epoch.par_iter_mut())
.chain(self.amount_range.par_iter_mut())
.chain(self._type.par_iter_mut())
}
pub fn iter_overlapping_mut(&mut self) -> impl Iterator<Item = &mut T> {
[&mut self.all]
.into_iter()
+15
View File
@@ -0,0 +1,15 @@
## Bench
MBP M3 Pro
0..920750
Time: 13h 6mn
RAM:
Peak: ~12GB
After finish: ~8GB
After restart ~6.5GB
storage: 230GB
mode: checked
@@ -0,0 +1,772 @@
use std::{fs, path::Path, thread, time::Instant};
use brk_error::Result;
use brk_indexer::Indexer;
use brk_types::TxInIndex;
use rayon::prelude::*;
use vecdb::{GenericStoredVec, StoredIndex};
fn main() -> Result<()> {
brk_logger::init(Some(Path::new(".log")))?;
let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk");
fs::create_dir_all(&outputs_dir)?;
let indexer = Indexer::forced_import(&outputs_dir)?;
let vecs = indexer.vecs;
let output_len = vecs.txoutindex_to_value.len_();
let input_len = vecs.txinindex_to_outpoint.len_();
dbg!(output_len, input_len);
// Simulate processing blocks
const NUM_BLOCKS: usize = 10_000;
const OUTPUTS_PER_BLOCK: usize = 5_000;
const INPUTS_PER_BLOCK: usize = 5_000;
const OUTPUT_START_OFFSET: usize = 2_000_000_000;
const INPUT_START_OFFSET: usize = 2_000_000_000;
const NUM_RUNS: usize = 3;
println!(
"\n=== Running {} iterations of {} blocks ===",
NUM_RUNS, NUM_BLOCKS
);
println!(" {} outputs per block", OUTPUTS_PER_BLOCK);
println!(" {} inputs per block\n", INPUTS_PER_BLOCK);
// Store all run times
let mut method1_times = Vec::new();
let mut method2_times = Vec::new();
let mut method4_times = Vec::new();
let mut method5_times = Vec::new();
let mut method6_times = Vec::new();
let mut method7_times = Vec::new();
let mut method8_times = Vec::new();
for run in 0..NUM_RUNS {
println!("--- Run {}/{} ---", run + 1, NUM_RUNS);
// Randomize order for this run
let order = match run % 4 {
0 => vec![1, 2, 4, 5, 6, 7, 8],
1 => vec![8, 7, 6, 5, 4, 2, 1],
2 => vec![2, 5, 8, 1, 7, 4, 6],
_ => vec![6, 4, 7, 1, 8, 5, 2],
};
let mut run_times = [std::time::Duration::ZERO; 7];
for &method in &order {
match method {
1 => {
let time = run_method1(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[0] = time;
}
2 => {
let time = run_method2(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[1] = time;
}
4 => {
let time = run_method4(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[2] = time;
}
5 => {
let time = run_method5(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[3] = time;
}
6 => {
let time = run_method6(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[4] = time;
}
7 => {
let time = run_method7(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[5] = time;
}
8 => {
let time = run_method8(
&vecs,
NUM_BLOCKS,
OUTPUTS_PER_BLOCK,
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
run_times[6] = time;
}
_ => unreachable!(),
}
}
method1_times.push(run_times[0]);
method2_times.push(run_times[1]);
method4_times.push(run_times[2]);
method5_times.push(run_times[3]);
method6_times.push(run_times[4]);
method7_times.push(run_times[5]);
method8_times.push(run_times[6]);
println!(" Method 1: {:?}", run_times[0]);
println!(" Method 2: {:?}", run_times[1]);
println!(" Method 4: {:?}", run_times[2]);
println!(" Method 5: {:?}", run_times[3]);
println!(" Method 6: {:?}", run_times[4]);
println!(" Method 7: {:?}", run_times[5]);
println!(" Method 8: {:?}", run_times[6]);
println!();
}
// Calculate statistics
println!("\n=== Statistics over {} runs ===\n", NUM_RUNS);
let methods = vec![
("Method 1 (Parallel Interleaved)", &method1_times),
(
"Method 2 (Sequential Read + Parallel Process)",
&method2_times,
),
(
"Method 4 (Parallel Sequential Reads + Parallel Process)",
&method4_times,
),
("Method 5 (Chunked Parallel)", &method5_times),
("Method 6 (Prefetch)", &method6_times),
("Method 7 (Reuse Readers)", &method7_times),
("Method 8 (Bulk Processing)", &method8_times),
];
for (name, times) in &methods {
let avg = times.iter().sum::<std::time::Duration>() / times.len() as u32;
let min = times.iter().min().unwrap();
let max = times.iter().max().unwrap();
println!("{}:", name);
println!(" Average: {:?}", avg);
println!(" Min: {:?}", min);
println!(" Max: {:?}", max);
println!(" Std dev: {:?}", calculate_stddev(times));
println!();
}
// Find overall winner based on average
let averages: Vec<_> = methods
.iter()
.map(|(name, times)| {
let avg = times.iter().sum::<std::time::Duration>() / times.len() as u32;
(*name, avg)
})
.collect();
let fastest = averages.iter().min_by_key(|(_, t)| t).unwrap();
println!(
"=== Winner (by average): {} - {:?} ===\n",
fastest.0, fastest.1
);
for (name, time) in &averages {
if time != &fastest.1 {
let diff = time.as_secs_f64() / fastest.1.as_secs_f64();
println!("{} is {:.2}x slower", name, diff);
}
}
Ok(())
}
fn run_method1(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
let start_time = Instant::now();
for block_idx in 0..num_blocks {
// Process outputs
let block_start = output_start_offset + (block_idx * outputs_per_block);
let _outputs: Vec<_> = (block_start..(block_start + outputs_per_block))
.into_par_iter()
.map(|i| {
(
vecs.txoutindex_to_value
.unwrap_read_(i, &txoutindex_to_value_reader),
vecs.txoutindex_to_outputtype
.unwrap_read_(i, &txoutindex_to_outputtype_reader),
vecs.txoutindex_to_typeindex
.unwrap_read_(i, &txoutindex_to_typeindex_reader),
)
})
.collect();
// Process inputs
let input_block_start = input_start_offset + (block_idx * inputs_per_block);
let input_sum: u64 = (input_block_start..(input_block_start + inputs_per_block))
.into_par_iter()
.filter_map(|i| {
let outpoint = vecs
.txinindex_to_outpoint
.unwrap_read_(i, &txinindex_to_outpoint_reader);
if outpoint.is_coinbase() {
return None;
}
let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
outpoint.txindex().to_usize(),
&txindex_to_first_txoutindex_reader,
);
let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.sum();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn run_method2(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
let start_time = Instant::now();
for block_idx in 0..num_blocks {
// Process outputs
let block_start = brk_types::TxOutIndex::new(
(output_start_offset + (block_idx * outputs_per_block)) as u64,
);
let values: Vec<_> = vecs
.txoutindex_to_value
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let output_types: Vec<_> = vecs
.txoutindex_to_outputtype
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let typeindexes: Vec<_> = vecs
.txoutindex_to_typeindex
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let _outputs: Vec<_> = (0..outputs_per_block)
.into_par_iter()
.map(|i| (values[i], output_types[i], typeindexes[i]))
.collect();
// Process inputs
let input_block_start =
TxInIndex::new((input_start_offset + (block_idx * inputs_per_block)) as u64);
let outpoints: Vec<_> = vecs
.txinindex_to_outpoint
.iter_at(input_block_start)
.take(inputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let input_sum: u64 = (0..outpoints.len())
.into_par_iter()
.filter_map(|i| {
let outpoint = outpoints[i];
if outpoint.is_coinbase() {
return None;
}
let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
outpoint.txindex().to_usize(),
&txindex_to_first_txoutindex_reader,
);
let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.sum();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn run_method4(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
let start_time = Instant::now();
for block_idx in 0..num_blocks {
// Process outputs with parallel reads
let block_start = brk_types::TxOutIndex::new(
(output_start_offset + (block_idx * outputs_per_block)) as u64,
);
let (values, output_types, typeindexes) = thread::scope(|s| {
let h1 = s.spawn(|| {
vecs.txoutindex_to_value
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect::<Vec<_>>()
});
let h2 = s.spawn(|| {
vecs.txoutindex_to_outputtype
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect::<Vec<_>>()
});
let h3 = s.spawn(|| {
vecs.txoutindex_to_typeindex
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect::<Vec<_>>()
});
(h1.join().unwrap(), h2.join().unwrap(), h3.join().unwrap())
});
let _outputs: Vec<_> = (0..outputs_per_block)
.into_par_iter()
.map(|i| (values[i], output_types[i], typeindexes[i]))
.collect();
// Process inputs
let input_block_start =
TxInIndex::new((input_start_offset + (block_idx * inputs_per_block)) as u64);
let outpoints: Vec<_> = vecs
.txinindex_to_outpoint
.iter_at(input_block_start)
.take(inputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let input_sum: u64 = (0..outpoints.len())
.into_par_iter()
.filter_map(|i| {
let outpoint = outpoints[i];
if outpoint.is_coinbase() {
return None;
}
let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
outpoint.txindex().to_usize(),
&txindex_to_first_txoutindex_reader,
);
let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.sum();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn run_method5(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
let start_time = Instant::now();
for block_idx in 0..num_blocks {
// Process outputs with larger chunks
let block_start = output_start_offset + (block_idx * outputs_per_block);
let _outputs: Vec<_> = (block_start..(block_start + outputs_per_block))
.into_par_iter()
.with_min_len(500) // Larger chunks
.map(|i| {
(
vecs.txoutindex_to_value
.unwrap_read_(i, &txoutindex_to_value_reader),
vecs.txoutindex_to_outputtype
.unwrap_read_(i, &txoutindex_to_outputtype_reader),
vecs.txoutindex_to_typeindex
.unwrap_read_(i, &txoutindex_to_typeindex_reader),
)
})
.collect();
// Process inputs with larger chunks
let input_block_start = input_start_offset + (block_idx * inputs_per_block);
let input_sum: u64 = (input_block_start..(input_block_start + inputs_per_block))
.into_par_iter()
.with_min_len(500) // Larger chunks
.filter_map(|i| {
let outpoint = vecs
.txinindex_to_outpoint
.unwrap_read_(i, &txinindex_to_outpoint_reader);
if outpoint.is_coinbase() {
return None;
}
let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
outpoint.txindex().to_usize(),
&txindex_to_first_txoutindex_reader,
);
let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.sum();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn run_method6(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
let start_time = Instant::now();
for block_idx in 0..num_blocks {
// Read outputs sequentially
let block_start = brk_types::TxOutIndex::new(
(output_start_offset + (block_idx * outputs_per_block)) as u64,
);
let values: Vec<_> = vecs
.txoutindex_to_value
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let output_types: Vec<_> = vecs
.txoutindex_to_outputtype
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let typeindexes: Vec<_> = vecs
.txoutindex_to_typeindex
.iter_at(block_start)
.take(outputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
// Read inputs sequentially
let input_block_start =
TxInIndex::new((input_start_offset + (block_idx * inputs_per_block)) as u64);
let outpoints: Vec<_> = vecs
.txinindex_to_outpoint
.iter_at(input_block_start)
.take(inputs_per_block)
.map(|(_, v)| v.into_owned())
.collect();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
// Prefetch all first_txoutindexes in parallel
let first_txoutindexes: Vec<Option<_>> = outpoints
.par_iter()
.map(|op| {
if op.is_coinbase() {
return None;
}
Some(
vecs.txindex_to_first_txoutindex
.unwrap_read_(op.txindex().to_usize(), &txindex_to_first_txoutindex_reader),
)
})
.collect();
// Then read values in parallel
let input_sum: u64 = outpoints
.par_iter()
.zip(first_txoutindexes.par_iter())
.filter_map(|(op, first_opt)| {
let first_txoutindex = first_opt.as_ref()?;
let txoutindex = first_txoutindex.to_usize() + usize::from(op.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.sum();
let _outputs: Vec<_> = (0..outputs_per_block)
.into_par_iter()
.map(|i| (values[i], output_types[i], typeindexes[i]))
.collect();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn run_method7(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
// Create readers ONCE outside loop
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
let start_time = Instant::now();
for block_idx in 0..num_blocks {
let block_start = output_start_offset + (block_idx * outputs_per_block);
let _outputs: Vec<_> = (block_start..(block_start + outputs_per_block))
.into_par_iter()
.map(|i| {
(
vecs.txoutindex_to_value
.unwrap_read_(i, &txoutindex_to_value_reader),
vecs.txoutindex_to_outputtype
.unwrap_read_(i, &txoutindex_to_outputtype_reader),
vecs.txoutindex_to_typeindex
.unwrap_read_(i, &txoutindex_to_typeindex_reader),
)
})
.collect();
let input_block_start = input_start_offset + (block_idx * inputs_per_block);
let input_sum: u64 = (input_block_start..(input_block_start + inputs_per_block))
.into_par_iter()
.filter_map(|i| {
let outpoint = vecs
.txinindex_to_outpoint
.unwrap_read_(i, &txinindex_to_outpoint_reader);
if outpoint.is_coinbase() {
return None;
}
let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
outpoint.txindex().to_usize(),
&txindex_to_first_txoutindex_reader,
);
let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.sum();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn run_method8(
vecs: &brk_indexer::Vecs,
num_blocks: usize,
outputs_per_block: usize,
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
let txinindex_to_outpoint_reader = vecs.txinindex_to_outpoint.create_reader();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
const BULK_SIZE: usize = 64;
let start_time = Instant::now();
for block_idx in 0..num_blocks {
let block_start = output_start_offset + (block_idx * outputs_per_block);
// Process outputs in bulk chunks
let _outputs: Vec<_> = (0..outputs_per_block)
.collect::<Vec<_>>()
.par_chunks(BULK_SIZE)
.flat_map(|chunk| {
chunk
.iter()
.map(|&offset| {
let i = block_start + offset;
(
vecs.txoutindex_to_value
.unwrap_read_(i, &txoutindex_to_value_reader),
vecs.txoutindex_to_outputtype
.unwrap_read_(i, &txoutindex_to_outputtype_reader),
vecs.txoutindex_to_typeindex
.unwrap_read_(i, &txoutindex_to_typeindex_reader),
)
})
.collect::<Vec<_>>()
})
.collect();
// Process inputs in bulk chunks
let input_block_start = input_start_offset + (block_idx * inputs_per_block);
let input_sum: u64 = (0..inputs_per_block)
.collect::<Vec<_>>()
.par_chunks(BULK_SIZE)
.flat_map(|chunk| {
chunk
.iter()
.filter_map(|&offset| {
let i = input_block_start + offset;
let outpoint = vecs
.txinindex_to_outpoint
.unwrap_read_(i, &txinindex_to_outpoint_reader);
if outpoint.is_coinbase() {
return None;
}
let first_txoutindex = vecs.txindex_to_first_txoutindex.unwrap_read_(
outpoint.txindex().to_usize(),
&txindex_to_first_txoutindex_reader,
);
let txoutindex = first_txoutindex.to_usize() + usize::from(outpoint.vout());
let value = vecs
.txoutindex_to_value
.unwrap_read_(txoutindex, &txoutindex_to_value_reader);
Some(u64::from(value))
})
.collect::<Vec<_>>()
})
.sum();
std::hint::black_box(input_sum);
}
start_time.elapsed()
}
fn calculate_stddev(times: &[std::time::Duration]) -> std::time::Duration {
let avg = times.iter().sum::<std::time::Duration>().as_secs_f64() / times.len() as f64;
let variance = times
.iter()
.map(|t| {
let diff = t.as_secs_f64() - avg;
diff * diff
})
.sum::<f64>()
/ times.len() as f64;
std::time::Duration::from_secs_f64(variance.sqrt())
}
+189 -248
View File
@@ -28,9 +28,9 @@ pub use vecs::*;
// One version for all data sources
// Increment on **change _OR_ addition**
const VERSION: Version = Version::new(22);
const VERSION: Version = Version::new(23);
const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
const COLLISIONS_CHECKED_UP_TO: Height = Height::new(0);
const COLLISIONS_CHECKED_UP_TO: Height = Height::new(920_000);
#[derive(Clone)]
pub struct Indexer {
@@ -474,94 +474,49 @@ impl Indexer {
FxHashMap::default();
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::default();
txouts
.into_iter()
.try_for_each(|data| -> Result<()> {
let (
txoutindex,
txout,
txindex,
vout,
outputtype,
addressbytes_opt,
typeindex_opt,
) = data;
for (txoutindex, txout, txindex, vout, outputtype, addressbytes_opt, typeindex_opt) in
txouts
{
let sats = Sats::from(txout.value);
let sats = Sats::from(txout.value);
if vout.is_zero() {
vecs.txindex_to_first_txoutindex
.push_if_needed(txindex, txoutindex)?;
}
if vout.is_zero() {
vecs.txindex_to_first_txoutindex
.push_if_needed(txindex, txoutindex)?;
}
vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?;
vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?;
vecs.txoutindex_to_txindex
.push_if_needed(txoutindex, txindex)?;
vecs.txoutindex_to_txindex
.push_if_needed(txoutindex, txindex)?;
vecs.txoutindex_to_outputtype
.push_if_needed(txoutindex, outputtype)?;
vecs.txoutindex_to_outputtype
.push_if_needed(txoutindex, outputtype)?;
let typeindex = if let Some(ti) = typeindex_opt {
let typeindex = if let Some(ti) = typeindex_opt {
ti
} else if let Some((address_bytes, address_hash)) = addressbytes_opt {
if let Some(&ti) = already_added_addressbyteshash.get(&address_hash) {
ti
} else if let Some((address_bytes, address_hash)) = addressbytes_opt {
if let Some(&ti) = already_added_addressbyteshash.get(&address_hash) {
ti
} else {
let ti = match outputtype {
OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(),
OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(),
OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(),
OutputType::P2MS => {
vecs.p2msoutputindex_to_txindex
.push_if_needed(indexes.p2msoutputindex, txindex)?;
indexes.p2msoutputindex.copy_then_increment()
}
OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(),
OutputType::OpReturn => {
vecs.opreturnindex_to_txindex
.push_if_needed(indexes.opreturnindex, txindex)?;
indexes.opreturnindex.copy_then_increment()
}
OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(),
OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(),
OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(),
OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(),
OutputType::Empty => {
vecs.emptyoutputindex_to_txindex
.push_if_needed(indexes.emptyoutputindex, txindex)?;
indexes.emptyoutputindex.copy_then_increment()
}
OutputType::Unknown => {
vecs.unknownoutputindex_to_txindex
.push_if_needed(indexes.unknownoutputindex, txindex)?;
indexes.unknownoutputindex.copy_then_increment()
}
_ => unreachable!(),
};
already_added_addressbyteshash.insert(address_hash, ti);
stores.addressbyteshash_to_typeindex.insert_if_needed(
address_hash,
ti,
height,
);
vecs.push_bytes_if_needed(ti, address_bytes)?;
ti
}
} else {
match outputtype {
let ti = match outputtype {
OutputType::P2PK65 => indexes.p2pk65addressindex.copy_then_increment(),
OutputType::P2PK33 => indexes.p2pk33addressindex.copy_then_increment(),
OutputType::P2PKH => indexes.p2pkhaddressindex.copy_then_increment(),
OutputType::P2MS => {
vecs.p2msoutputindex_to_txindex
.push_if_needed(indexes.p2msoutputindex, txindex)?;
indexes.p2msoutputindex.copy_then_increment()
}
OutputType::P2SH => indexes.p2shaddressindex.copy_then_increment(),
OutputType::OpReturn => {
vecs.opreturnindex_to_txindex
.push_if_needed(indexes.opreturnindex, txindex)?;
indexes.opreturnindex.copy_then_increment()
}
OutputType::P2WPKH => indexes.p2wpkhaddressindex.copy_then_increment(),
OutputType::P2WSH => indexes.p2wshaddressindex.copy_then_increment(),
OutputType::P2TR => indexes.p2traddressindex.copy_then_increment(),
OutputType::P2A => indexes.p2aaddressindex.copy_then_increment(),
OutputType::Empty => {
vecs.emptyoutputindex_to_txindex
.push_if_needed(indexes.emptyoutputindex, txindex)?;
@@ -573,217 +528,203 @@ impl Indexer {
indexes.unknownoutputindex.copy_then_increment()
}
_ => unreachable!(),
}
};
};
vecs.txoutindex_to_typeindex
.push_if_needed(txoutindex, typeindex)?;
already_added_addressbyteshash.insert(address_hash, ti);
stores.addressbyteshash_to_typeindex.insert_if_needed(
address_hash,
ti,
height,
);
vecs.push_bytes_if_needed(ti, address_bytes)?;
if outputtype.is_unspendable() {
return Ok(());
} else if outputtype.is_address() {
stores
.addresstype_to_typeindex_and_txindex
.get_mut(outputtype)
.unwrap()
.insert_if_needed(
TypeIndexAndTxIndex::from((typeindex, txindex)),
Unit,
height,
);
ti
}
let outpoint = OutPoint::new(txindex, vout);
if !same_block_spent_outpoints.contains(&outpoint) {
if outputtype.is_address() {
stores
.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.insert_if_needed(
TypeIndexAndOutPoint::from((typeindex, outpoint)),
Unit,
height,
);
} else {
match outputtype {
OutputType::P2MS => {
vecs.p2msoutputindex_to_txindex
.push_if_needed(indexes.p2msoutputindex, txindex)?;
indexes.p2msoutputindex.copy_then_increment()
}
} else {
same_block_output_info.insert(outpoint, (outputtype, typeindex));
OutputType::OpReturn => {
vecs.opreturnindex_to_txindex
.push_if_needed(indexes.opreturnindex, txindex)?;
indexes.opreturnindex.copy_then_increment()
}
OutputType::Empty => {
vecs.emptyoutputindex_to_txindex
.push_if_needed(indexes.emptyoutputindex, txindex)?;
indexes.emptyoutputindex.copy_then_increment()
}
OutputType::Unknown => {
vecs.unknownoutputindex_to_txindex
.push_if_needed(indexes.unknownoutputindex, txindex)?;
indexes.unknownoutputindex.copy_then_increment()
}
_ => unreachable!(),
}
};
Ok(())
})?;
vecs.txoutindex_to_typeindex
.push_if_needed(txoutindex, typeindex)?;
if outputtype.is_unspendable() {
continue;
} else if outputtype.is_address() {
stores
.addresstype_to_typeindex_and_txindex
.get_mut(outputtype)
.unwrap()
.insert_if_needed(
TypeIndexAndTxIndex::from((typeindex, txindex)),
Unit,
height,
);
}
let outpoint = OutPoint::new(txindex, vout);
if same_block_spent_outpoints.contains(&outpoint) {
same_block_output_info.insert(outpoint, (outputtype, typeindex));
} else if outputtype.is_address() {
stores
.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.insert_if_needed(
TypeIndexAndOutPoint::from((typeindex, outpoint)),
Unit,
height,
);
}
}
// println!(
// "txouts.into_iter() = : {:?}",
// i.elapsed()
// );
// let i = Instant::now();
txins
.into_iter()
.map(
#[allow(clippy::type_complexity)]
|(txinindex, input_source)| -> Result<(
TxInIndex,
Vin,
TxIndex,
OutPoint,
Option<(OutputType, TypeIndex)>,
)> {
if let InputSource::PreviousBlock((
vin,
txindex,
outpoint,
outputtype_typeindex_opt,
)) = input_source
{
return Ok((
txinindex,
vin,
txindex,
outpoint,
outputtype_typeindex_opt,
));
}
let InputSource::SameBlock((txindex, txin, vin, outpoint)) = input_source
else {
unreachable!()
};
let mut tuple = (txinindex, vin, txindex, outpoint, None);
for (txinindex, input_source) in txins {
let (vin, txindex, outpoint, outputtype_typeindex_opt) = match input_source {
InputSource::PreviousBlock(tuple) => tuple,
InputSource::SameBlock((txindex, txin, vin, outpoint)) => {
let mut tuple = (vin, txindex, outpoint, None);
if outpoint.is_coinbase() {
return Ok(tuple);
tuple
} else {
let outputtype_typeindex = same_block_output_info
.remove(&outpoint)
.ok_or(Error::Str("should have found addressindex from same block"))
.inspect_err(|_| {
dbg!(&same_block_output_info, txin);
})?;
if outputtype_typeindex.0.is_address() {
tuple.3 = Some(outputtype_typeindex);
}
(tuple.0, tuple.1, tuple.2, tuple.3)
}
let outputtype_typeindex = same_block_output_info
.remove(&outpoint)
.ok_or(Error::Str("should have found addressindex from same block"))
.inspect_err(|_| {
dbg!(&same_block_output_info, txin);
})?;
if outputtype_typeindex.0.is_address() {
tuple.4 = Some(outputtype_typeindex);
}
Ok(tuple)
},
)
.try_for_each(|res| -> Result<()> {
let (txinindex, vin, txindex, outpoint, outputtype_typeindex_opt) = res?;
if vin.is_zero() {
vecs.txindex_to_first_txinindex
.push_if_needed(txindex, txinindex)?;
}
};
vecs.txinindex_to_outpoint
.push_if_needed(txinindex, outpoint)?;
if vin.is_zero() {
vecs.txindex_to_first_txinindex
.push_if_needed(txindex, txinindex)?;
}
let Some((outputtype, typeindex)) = outputtype_typeindex_opt else {
return Ok(());
};
vecs.txinindex_to_outpoint
.push_if_needed(txinindex, outpoint)?;
stores
.addresstype_to_typeindex_and_txindex
.get_mut_unwrap(outputtype)
.insert_if_needed(
TypeIndexAndTxIndex::from((typeindex, txindex)),
Unit,
height,
);
let Some((outputtype, typeindex)) = outputtype_typeindex_opt else {
continue;
};
stores
.addresstype_to_typeindex_and_unspentoutpoint
.get_mut_unwrap(outputtype)
.remove_if_needed(
TypeIndexAndOutPoint::from((typeindex, outpoint)),
height,
);
stores
.addresstype_to_typeindex_and_txindex
.get_mut_unwrap(outputtype)
.insert_if_needed(
TypeIndexAndTxIndex::from((typeindex, txindex)),
Unit,
height,
);
Ok(())
})?;
stores
.addresstype_to_typeindex_and_unspentoutpoint
.get_mut_unwrap(outputtype)
.remove_if_needed(TypeIndexAndOutPoint::from((typeindex, outpoint)), height);
}
// println!("txins.into_iter(): {:?}", i.elapsed());
// let i = Instant::now();
if check_collisions {
let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter();
txs.iter()
.try_for_each(|(txindex, _, _, _, prev_txindex_opt)| -> Result<()> {
let Some(prev_txindex) = prev_txindex_opt else {
return Ok(());
};
for (txindex, _, _, _, prev_txindex_opt) in txs.iter() {
let Some(prev_txindex) = prev_txindex_opt else {
continue;
};
// In case if we start at an already parsed height
if txindex == prev_txindex {
return Ok(());
}
// In case if we start at an already parsed height
if txindex == prev_txindex {
continue;
}
let len = vecs.txindex_to_txid.len();
// Ok if `get` is not par as should happen only twice
let prev_txid = txindex_to_txid_iter
.get(*prev_txindex)
.ok_or(Error::Str("To have txid for txindex"))
.inspect_err(|_| {
dbg!(txindex, len);
})?;
let len = vecs.txindex_to_txid.len();
// Ok if `get` is not par as should happen only twice
let prev_txid = txindex_to_txid_iter
.get(*prev_txindex)
.ok_or(Error::Str("To have txid for txindex"))
.inspect_err(|_| {
dbg!(txindex, len);
})?;
let prev_txid = prev_txid.as_ref();
let prev_txid = prev_txid.as_ref();
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let only_known_dup_txids = [
bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)
.unwrap()
.into(),
bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)
.unwrap()
.into(),
];
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let only_known_dup_txids = [
bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)
.unwrap()
.into(),
bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)
.unwrap()
.into(),
];
let is_dup = only_known_dup_txids.contains(prev_txid);
let is_dup = only_known_dup_txids.contains(prev_txid);
if !is_dup {
dbg!(height, txindex, prev_txid, prev_txindex);
return Err(Error::Str("Expect none"));
}
Ok(())
})?;
if !is_dup {
dbg!(height, txindex, prev_txid, prev_txindex);
return Err(Error::Str("Expect none"));
}
}
}
// println!("txindex_to_tx_and_txid = : {:?}", i.elapsed());
// let i = Instant::now();
txs.into_iter().try_for_each(
|(txindex, tx, txid, txid_prefix, prev_txindex_opt)| -> Result<()> {
if prev_txindex_opt.is_none() {
stores
.txidprefix_to_txindex
.insert_if_needed(txid_prefix, txindex, height);
}
for (txindex, tx, txid, txid_prefix, prev_txindex_opt) in txs {
if prev_txindex_opt.is_none() {
stores
.txidprefix_to_txindex
.insert_if_needed(txid_prefix, txindex, height);
}
vecs.txindex_to_height.push_if_needed(txindex, height)?;
vecs.txindex_to_txversion
.push_if_needed(txindex, tx.version.into())?;
vecs.txindex_to_txid.push_if_needed(txindex, txid)?;
vecs.txindex_to_rawlocktime
.push_if_needed(txindex, tx.lock_time.into())?;
vecs.txindex_to_base_size
.push_if_needed(txindex, tx.base_size().into())?;
vecs.txindex_to_total_size
.push_if_needed(txindex, tx.total_size().into())?;
vecs.txindex_to_is_explicitly_rbf
.push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?;
Ok(())
},
)?;
vecs.txindex_to_height.push_if_needed(txindex, height)?;
vecs.txindex_to_txversion
.push_if_needed(txindex, tx.version.into())?;
vecs.txindex_to_txid.push_if_needed(txindex, txid)?;
vecs.txindex_to_rawlocktime
.push_if_needed(txindex, tx.lock_time.into())?;
vecs.txindex_to_base_size
.push_if_needed(txindex, tx.base_size().into())?;
vecs.txindex_to_total_size
.push_if_needed(txindex, tx.total_size().into())?;
vecs.txindex_to_is_explicitly_rbf
.push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?;
}
// println!("txindex_to_tx_and_txid.into_iter(): {:?}", i.elapsed());
indexes.txindex += TxIndex::from(tx_len);
+2 -2
View File
@@ -59,7 +59,7 @@ pub struct Vecs {
pub txindex_to_total_size: CompressedVec<TxIndex, StoredU32>,
pub txindex_to_txid: RawVec<TxIndex, Txid>,
pub txindex_to_txversion: CompressedVec<TxIndex, TxVersion>,
pub txinindex_to_outpoint: RawVec<TxInIndex, OutPoint>,
pub txinindex_to_outpoint: CompressedVec<TxInIndex, OutPoint>,
pub txoutindex_to_outputtype: RawVec<TxOutIndex, OutputType>,
pub txoutindex_to_txindex: CompressedVec<TxOutIndex, TxIndex>,
pub txoutindex_to_typeindex: RawVec<TxOutIndex, TypeIndex>,
@@ -181,7 +181,7 @@ impl Vecs {
txindex_to_total_size: CompressedVec::forced_import(&db, "total_size", version)?,
txindex_to_txid: RawVec::forced_import(&db, "txid", version)?,
txindex_to_txversion: CompressedVec::forced_import(&db, "txversion", version)?,
txinindex_to_outpoint: RawVec::forced_import(&db, "outpoint", version)?,
txinindex_to_outpoint: CompressedVec::forced_import(&db, "outpoint", version)?,
txoutindex_to_outputtype: RawVec::forced_import(&db, "outputtype", version)?,
txoutindex_to_txindex: CompressedVec::forced_import(&db, "txindex", version)?,
txoutindex_to_typeindex: RawVec::forced_import(&db, "typeindex", version)?,
+3 -3
View File
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]
use brk_query::{PaginatedIndexParam, PaginationParam, Params, Query};
use brk_query::{AsyncQuery, PaginatedIndexParam, PaginationParam, Params};
use brk_rmcp::{
ErrorData as McpError, RoleServer, ServerHandler,
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
@@ -16,7 +16,7 @@ pub mod route;
#[derive(Clone)]
pub struct MCP {
query: Query,
query: AsyncQuery,
tool_router: ToolRouter<MCP>,
}
@@ -24,7 +24,7 @@ const VERSION: &str = env!("CARGO_PKG_VERSION");
#[tool_router]
impl MCP {
pub fn new(query: &Query) -> Self {
pub fn new(query: &AsyncQuery) -> Self {
Self {
query: query.clone(),
tool_router: Self::tool_router(),
+3 -3
View File
@@ -1,5 +1,5 @@
use aide::axum::ApiRouter;
use brk_query::Query;
use brk_query::AsyncQuery;
use brk_rmcp::transport::{
StreamableHttpServerConfig,
streamable_http_server::{StreamableHttpService, session::local::LocalSessionManager},
@@ -10,14 +10,14 @@ use log::info;
use crate::MCP;
pub trait MCPRoutes {
fn add_mcp_routes(self, query: &Query, mcp: bool) -> Self;
fn add_mcp_routes(self, query: &AsyncQuery, mcp: bool) -> Self;
}
impl<T> MCPRoutes for ApiRouter<T>
where
T: Clone + Send + Sync + 'static,
{
fn add_mcp_routes(self, query: &Query, mcp: bool) -> Self {
fn add_mcp_routes(self, query: &AsyncQuery, mcp: bool) -> Self {
if !mcp {
return self;
}
+4
View File
@@ -9,6 +9,9 @@ repository.workspace = true
rust-version.workspace = true
build = "build.rs"
[features]
tokio = ["dep:tokio"]
[dependencies]
bitcoin = { workspace = true }
brk_computer = { workspace = true }
@@ -24,4 +27,5 @@ quickmatch = "0.1.8"
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, optional = true }
vecdb = { workspace = true }
+18
View File
@@ -0,0 +1,18 @@
// Should be async
// anything related to IO should use
//
// Sync function
// fn get(db: &CandyStore, key: &str) -> Option<Vec<u8>> {
// db.get(key).ok().flatten()
// }
use crate::Query;
// // Async function
// async fn get_async(db: Arc<CandyStore>, key: String) -> Option<Vec<u8>> {
// tokio::task::spawn_blocking(move || {
// db.get(&key).ok().flatten()
// }).await.ok()?
// }
#[derive(Clone)]
pub struct AsyncQuery(Query);
+2
View File
@@ -13,6 +13,7 @@ use brk_types::{
};
use vecdb::{AnyCollectableVec, AnyStoredVec};
mod r#async;
mod chain;
mod deser;
mod output;
@@ -20,6 +21,7 @@ mod pagination;
mod params;
mod vecs;
pub use r#async::*;
pub use output::{Output, Value};
pub use pagination::{PaginatedIndexParam, PaginatedMetrics, PaginationParam};
pub use params::{Params, ParamsDeprec, ParamsOpt};
+5 -6
View File
@@ -19,7 +19,7 @@ use axum::{
use brk_error::Result;
use brk_logger::OwoColorize;
use brk_mcp::route::MCPRoutes;
use brk_query::Query;
use brk_query::AsyncQuery;
use files::FilesRoutes;
use log::{error, info};
use quick_cache::sync::Cache;
@@ -31,19 +31,18 @@ mod api;
mod extended;
mod files;
use api::*;
use extended::*;
use crate::api::create_openapi;
#[derive(Clone)]
pub struct AppState {
query: Query,
query: AsyncQuery,
path: Option<PathBuf>,
cache: Arc<Cache<String, Bytes>>,
}
impl Deref for AppState {
type Target = Query;
type Target = AsyncQuery;
fn deref(&self) -> &Self::Target {
&self.query
}
@@ -54,7 +53,7 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub struct Server(AppState);
impl Server {
pub fn new(query: &Query, files_path: Option<PathBuf>) -> Self {
pub fn new(query: &AsyncQuery, files_path: Option<PathBuf>) -> Self {
Self(AppState {
query: query.clone(),
path: files_path,
+1
View File
@@ -16,6 +16,7 @@ brk_error = { workspace = true }
brk_types = { workspace = true }
byteview6 = { version = "=0.6.1", package = "byteview" }
byteview8 = { version = "~0.8.0", package = "byteview" }
candystore = "0.5.5"
fjall2 = { workspace = true }
# fjall3 = { workspace = true }
log = { workspace = true }
+2
View File
@@ -1,6 +1,7 @@
use allocative::Allocative;
use schemars::JsonSchema;
use serde::Serialize;
use vecdb::StoredCompressed;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::{TxIndex, Vout};
@@ -22,6 +23,7 @@ use crate::{TxIndex, Vout};
Allocative,
JsonSchema,
Hash,
StoredCompressed,
)]
pub struct OutPoint(u64);
+4
View File
@@ -71,6 +71,10 @@ impl Sats {
pub fn is_not_zero(&self) -> bool {
*self != Self::ZERO
}
pub fn is_max(&self) -> bool {
*self == Self::MAX
}
}
impl Add for Sats {
+4
View File
@@ -30,6 +30,10 @@ use super::Vin;
pub struct TxInIndex(u64);
impl TxInIndex {
pub fn new(index: u64) -> Self {
Self(index)
}
pub fn incremented(self) -> Self {
Self(*self + 1)
}
+4
View File
@@ -36,6 +36,10 @@ impl TxOutIndex {
pub const COINBASE: Self = Self(u64::MAX);
pub fn new(index: u64) -> Self {
Self(index)
}
pub fn incremented(self) -> Self {
Self(*self + 1)
}
+6
View File
@@ -83,6 +83,12 @@ impl From<Vout> for u64 {
}
}
impl From<Vout> for usize {
fn from(value: Vout) -> Self {
value.0 as usize
}
}
impl From<&[u8]> for Vout {
fn from(value: &[u8]) -> Self {
Self(u16::from_be_bytes(copy_first_2bytes(value).unwrap()))