computer: fix stateful

This commit is contained in:
nym21
2025-07-17 11:35:40 +02:00
parent a0cfc1be2b
commit c07e66c086
16 changed files with 340 additions and 105 deletions

6
Cargo.lock generated
View File

@@ -3618,15 +3618,15 @@ checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustix"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8"
dependencies = [
"bitflags 2.9.1",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.59.0",
"windows-sys 0.60.2",
]
[[package]]

View File

@@ -10,8 +10,12 @@
- pull latest version and notify if out of date
- _computer_
- **add rollback of states (in stateful)**
- remove configurable format (raw/compressed) and choose sane ones instead
- linear reads: compressed (height/date/... + txindex_to_height + txindex_to_version + ...)
- random reads: raw (outputindex_to_value + ...)
- add prices paid by percentile (percentile cost basis) back
- add support for per index computation
- fix feerate which is always ZERO due to coinbase transaction
- fix min feerate which is always ZERO due to coinbase transaction
- before computing multiple sources check their length, panic if not equal
- add oracle price dataset (https://utxo.live/oracle/UTXOracle.py)
- add address counts relative to all datasets

View File

@@ -1,2 +1 @@
cargo build --profile profiling
flamegraph -- ../../target/profiling/brk
sudo cargo flamegraph --profile profiling --root

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{fs, thread};
use brk_core::{dot_brk_log_path, dot_brk_path};

View File

@@ -14,11 +14,7 @@ pub fn run() -> color_eyre::Result<()> {
let rpc = config.rpc()?;
let exit = Exit::new();
let parser = brk_parser::Parser::new(
config.blocksdir(),
config.brkdir(),
rpc,
);
let parser = brk_parser::Parser::new(config.blocksdir(), config.brkdir(), rpc);
let format = config.format();
@@ -59,7 +55,8 @@ pub fn run() -> color_eyre::Result<()> {
let watch = config.watch();
let mcp = config.mcp();
let server_handle = tokio::spawn(async move {
tokio::spawn(async move {
server.serve(watch, mcp).await.unwrap();
});

View File

@@ -1,7 +1,6 @@
use std::{path::Path, thread};
use brk_computer::Computer;
use brk_core::{default_bitcoin_path, default_brk_path};
use brk_exit::Exit;
use brk_fetcher::Fetcher;
use brk_indexer::Indexer;
@@ -13,7 +12,7 @@ pub fn main() -> color_eyre::Result<()> {
brk_logger::init(Some(Path::new(".log")));
// let bitcoin_dir = default_bitcoin_path();
// let bitcoin_dir = brk_core::default_bitcoin_path();
let bitcoin_dir = Path::new("/Volumes/WD_BLACK/bitcoin");
let rpc = Box::leak(Box::new(bitcoincore_rpc::Client::new(
@@ -26,7 +25,11 @@ pub fn main() -> color_eyre::Result<()> {
thread::Builder::new()
.stack_size(256 * 1024 * 1024)
.spawn(move || -> color_eyre::Result<()> {
let parser = Parser::new(bitcoin_dir.join("blocks"), default_brk_path(), rpc);
let parser = Parser::new(
bitcoin_dir.join("blocks"),
brk_core::default_brk_path(),
rpc,
);
let _outputs_dir = Path::new("/Volumes/WD_BLACK/brk").join("outputs");
let outputs_dir = _outputs_dir.as_path();

View File

@@ -1,4 +1,4 @@
use std::{collections::BTreeSet, mem};
use std::collections::BTreeSet;
use brk_core::TypeIndex;
use derive_deref::{Deref, DerefMut};
@@ -8,29 +8,6 @@ use super::ByAddressType;
/// Newtype around a `ByAddressType` that stores a `BTreeSet<TypeIndex>` in each of its slots.
///
/// `Deref` and `DerefMut` are derived, so the slots of the inner value can be reached directly
/// through this wrapper.
#[derive(Debug, Deref, DerefMut)]
pub struct AddressTypeToTypeIndexSet(ByAddressType<BTreeSet<TypeIndex>>);
impl AddressTypeToTypeIndexSet {
    /// Folds `other` into `self`, slot by slot, and returns the combined sets.
    ///
    /// Every slot of `other` is drained into the matching slot of `self`.
    pub fn merge(mut self, mut other: Self) -> Self {
        // Borrow the inner values once so the per-slot borrows below are disjoint field borrows.
        let mine = &mut self.0;
        let theirs = &mut other.0;

        for (own, incoming) in [
            (&mut mine.p2pk65, &mut theirs.p2pk65),
            (&mut mine.p2pk33, &mut theirs.p2pk33),
            (&mut mine.p2pkh, &mut theirs.p2pkh),
            (&mut mine.p2sh, &mut theirs.p2sh),
            (&mut mine.p2wpkh, &mut theirs.p2wpkh),
            (&mut mine.p2wsh, &mut theirs.p2wsh),
            (&mut mine.p2tr, &mut theirs.p2tr),
            (&mut mine.p2a, &mut theirs.p2a),
        ] {
            Self::merge_(own, incoming);
        }

        self
    }

    /// Moves every element of `other` into `own`, leaving `other` empty.
    ///
    /// The larger set is always kept as the append target.
    fn merge_(own: &mut BTreeSet<TypeIndex>, other: &mut BTreeSet<TypeIndex>) {
        // Make sure `own` is the bigger of the two before appending the smaller one into it.
        if own.len() < other.len() {
            mem::swap(own, other);
        }
        own.append(other);
    }
}
impl Default for AddressTypeToTypeIndexSet {
fn default() -> Self {
Self(ByAddressType {

View File

@@ -29,10 +29,6 @@ impl<T> AddressTypeToTypeIndexTree<T> {
mem::swap(own, other);
}
}
/// Consumes the wrapper and returns the inner `ByAddressType` of `BTreeMap<TypeIndex, T>` values.
pub fn unwrap(self) -> ByAddressType<BTreeMap<TypeIndex, T>> {
self.0
}
}
impl<T> Default for AddressTypeToTypeIndexTree<T> {

View File

@@ -55,7 +55,7 @@ use range_map::*;
use r#trait::*;
pub use withaddressdatasource::WithAddressDataSource;
const VERSION: Version = Version::new(18);
const VERSION: Version = Version::new(19);
#[derive(Clone)]
pub struct Vecs {
@@ -1188,20 +1188,10 @@ impl Vecs {
if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
info!("Flushing...");
exit.block();
self.flush_states(height, &chain_state, exit)?;
// Maybe keep some from the end for both
let addresstype_to_typeindex_to_loadedaddressdata_to_consume =
mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata);
let addresstype_to_typeindex_to_emptyaddressdata_to_consume =
mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata);
// stores.commit(
// height,
// addresstype_to_typeindex_to_loadedaddressdata_to_consume,
// addresstype_to_typeindex_to_emptyaddressdata_to_consume,
// )?;
self.flush_states(height, &chain_state, mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), exit)?;
self.reset_mmaps_options(
&mut addresstypeindex_to_anyaddressindex_mmap_opt,
@@ -1218,12 +1208,13 @@ impl Vecs {
info!("Flushing...");
self.flush_states(height, &chain_state, exit)?;
// stores.commit(
// height,
// mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata),
// mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata),
// )?;
self.flush_states(
height,
&chain_state,
mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata),
mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata),
exit,
)?;
} else {
exit.block();
}
@@ -1488,33 +1479,39 @@ impl Vecs {
.into_owned();
Some(match anyaddressindex.to_enum() {
AnyAddressDataIndexEnum::Loaded(index) => {
AnyAddressDataIndexEnum::Loaded(loadedaddressindex) => {
let mmap = anyaddressindex_to_anyaddressdata_mmap_opt
.loaded
.as_ref()
.unwrap();
let loadedaddressdata = loadedaddressindex_to_loadedaddressdata
.get_or_read(index, mmap)
.get_or_read(loadedaddressindex, mmap)
.unwrap()
.unwrap()
.into_owned();
WithAddressDataSource::FromLoadedAddressDataVec(loadedaddressdata)
WithAddressDataSource::FromLoadedAddressDataVec((
loadedaddressindex,
loadedaddressdata,
))
}
AnyAddressDataIndexEnum::Empty(index) => {
AnyAddressDataIndexEnum::Empty(emtpyaddressindex) => {
let mmap = anyaddressindex_to_anyaddressdata_mmap_opt
.empty
.as_ref()
.unwrap();
let emptyaddressdata = emptyaddressindex_to_emptyaddressdata
.get_or_read(index, mmap)
.get_or_read(emtpyaddressindex, mmap)
.unwrap()
.unwrap()
.into_owned();
WithAddressDataSource::FromEmptyAddressDataVec(emptyaddressdata.into())
WithAddressDataSource::FromEmptyAddressDataVec((
emtpyaddressindex,
emptyaddressdata.into(),
))
}
})
}
@@ -1580,6 +1577,12 @@ impl Vecs {
&mut self,
height: Height,
chain_state: &[BlockState],
mut addresstype_to_typeindex_to_loadedaddressdata: AddressTypeToTypeIndexTree<
WithAddressDataSource<LoadedAddressData>,
>,
mut addresstype_to_typeindex_to_emptyaddressdata: AddressTypeToTypeIndexTree<
WithAddressDataSource<EmptyAddressData>,
>,
exit: &Exit,
) -> Result<()> {
self.utxo_cohorts
@@ -1601,6 +1604,149 @@ impl Vecs {
.into_iter()
.try_for_each(|v| v.safe_flush(exit))?;
let mut addresstype_to_typeindex_to_new_or_updated_anyaddressindex =
AddressTypeToTypeIndexTree::default();
addresstype_to_typeindex_to_emptyaddressdata
.into_typed_vec()
.into_iter()
.try_for_each(|(_type, tree)| -> Result<()> {
tree.into_iter().try_for_each(
|(typeindex, emptyaddressdata_with_source)| -> Result<()> {
match emptyaddressdata_with_source {
WithAddressDataSource::New(emptyaddressdata) => {
let emptyaddressindex = self
.emptyaddressindex_to_emptyaddressdata
.fill_first_hole_or_push(emptyaddressdata)?;
let anyaddressindex = AnyAddressIndex::from(emptyaddressindex);
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(_type)
.unwrap()
.insert(typeindex, anyaddressindex);
Ok(())
}
WithAddressDataSource::FromEmptyAddressDataVec((
emptyaddressindex,
emptyaddressdata,
)) => self
.emptyaddressindex_to_emptyaddressdata
.update(emptyaddressindex, emptyaddressdata),
WithAddressDataSource::FromLoadedAddressDataVec((
loadedaddressindex,
emptyaddressdata,
)) => {
self.loadedaddressindex_to_loadedaddressdata
.delete(loadedaddressindex);
let emptyaddressindex = self
.emptyaddressindex_to_emptyaddressdata
.fill_first_hole_or_push(emptyaddressdata)?;
let anyaddressindex = emptyaddressindex.into();
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(_type)
.unwrap()
.insert(typeindex, anyaddressindex);
Ok(())
}
}
},
)
})?;
addresstype_to_typeindex_to_loadedaddressdata
.into_typed_vec()
.into_iter()
.try_for_each(|(_type, tree)| -> Result<()> {
tree.into_iter().try_for_each(
|(typeindex, loadedaddressdata_with_source)| -> Result<()> {
match loadedaddressdata_with_source {
WithAddressDataSource::New(loadedaddressdata) => {
let loadedaddressindex = self
.loadedaddressindex_to_loadedaddressdata
.fill_first_hole_or_push(loadedaddressdata)?;
let anyaddressindex = AnyAddressIndex::from(loadedaddressindex);
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(_type)
.unwrap()
.insert(typeindex, anyaddressindex);
Ok(())
}
WithAddressDataSource::FromLoadedAddressDataVec((
loadedaddressindex,
loadedaddressdata,
)) => self
.loadedaddressindex_to_loadedaddressdata
.update(loadedaddressindex, loadedaddressdata),
WithAddressDataSource::FromEmptyAddressDataVec((
emptyaddressindex,
loadedaddressdata,
)) => {
self.emptyaddressindex_to_emptyaddressdata
.delete(emptyaddressindex);
let loadedaddressindex = self
.loadedaddressindex_to_loadedaddressdata
.fill_first_hole_or_push(loadedaddressdata)?;
let anyaddressindex = loadedaddressindex.into();
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.get_mut(_type)
.unwrap()
.insert(typeindex, anyaddressindex);
Ok(())
}
}
},
)
})?;
addresstype_to_typeindex_to_new_or_updated_anyaddressindex
.into_typed_vec()
.into_iter()
.try_for_each(|(_type, tree)| -> Result<()> {
tree.into_iter()
.try_for_each(|(typeindex, anyaddressindex)| -> Result<()> {
match _type {
OutputType::P2PK33 => self
.p2pk33addressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2PK65 => self
.p2pk65addressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2PKH => self
.p2pkhaddressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2SH => self
.p2shaddressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2TR => self
.p2traddressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2WPKH => self
.p2wpkhaddressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2WSH => self
.p2wshaddressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
OutputType::P2A => self
.p2aaddressindex_to_anyaddressindex
.update_or_push(typeindex.into(), anyaddressindex),
_ => unreachable!(),
}
})
})?;
self.p2pk33addressindex_to_anyaddressindex.flush(height)?;
self.p2pk65addressindex_to_anyaddressindex.flush(height)?;
self.p2pkhaddressindex_to_anyaddressindex.flush(height)?;

View File

@@ -1,10 +1,10 @@
use brk_core::{EmptyAddressData, LoadedAddressData};
use brk_core::{EmptyAddressData, EmptyAddressIndex, LoadedAddressData, LoadedAddressIndex};
#[derive(Debug)]
pub enum WithAddressDataSource<T> {
New(T),
FromLoadedAddressDataVec(T),
FromEmptyAddressDataVec(T),
FromLoadedAddressDataVec((LoadedAddressIndex, T)),
FromEmptyAddressDataVec((EmptyAddressIndex, T)),
}
impl<T> WithAddressDataSource<T> {
@@ -12,27 +12,15 @@ impl<T> WithAddressDataSource<T> {
matches!(self, Self::New(_))
}
pub fn is_from_addressdata(&self) -> bool {
matches!(self, Self::FromLoadedAddressDataVec(_))
}
pub fn is_from_emptyaddressdata(&self) -> bool {
matches!(self, Self::FromEmptyAddressDataVec(_))
}
pub fn deref(&self) -> &T {
match self {
Self::New(v) => v,
Self::FromLoadedAddressDataVec(v) => v,
Self::FromEmptyAddressDataVec(v) => v,
}
}
pub fn deref_mut(&mut self) -> &mut T {
match self {
Self::New(v) => v,
Self::FromLoadedAddressDataVec(v) => v,
Self::FromEmptyAddressDataVec(v) => v,
Self::FromLoadedAddressDataVec((_, v)) => v,
Self::FromEmptyAddressDataVec((_, v)) => v,
}
}
}
@@ -41,11 +29,11 @@ impl From<WithAddressDataSource<EmptyAddressData>> for WithAddressDataSource<Loa
fn from(value: WithAddressDataSource<EmptyAddressData>) -> Self {
match value {
WithAddressDataSource::New(v) => Self::New(v.into()),
WithAddressDataSource::FromLoadedAddressDataVec(v) => {
Self::FromLoadedAddressDataVec(v.into())
WithAddressDataSource::FromLoadedAddressDataVec((i, v)) => {
Self::FromLoadedAddressDataVec((i, v.into()))
}
WithAddressDataSource::FromEmptyAddressDataVec(v) => {
Self::FromEmptyAddressDataVec(v.into())
WithAddressDataSource::FromEmptyAddressDataVec((i, v)) => {
Self::FromEmptyAddressDataVec((i, v.into()))
}
}
}
@@ -55,11 +43,11 @@ impl From<WithAddressDataSource<LoadedAddressData>> for WithAddressDataSource<Em
fn from(value: WithAddressDataSource<LoadedAddressData>) -> Self {
match value {
WithAddressDataSource::New(v) => Self::New(v.into()),
WithAddressDataSource::FromLoadedAddressDataVec(v) => {
Self::FromLoadedAddressDataVec(v.into())
WithAddressDataSource::FromLoadedAddressDataVec((i, v)) => {
Self::FromLoadedAddressDataVec((i, v.into()))
}
WithAddressDataSource::FromEmptyAddressDataVec(v) => {
Self::FromEmptyAddressDataVec(v.into())
WithAddressDataSource::FromEmptyAddressDataVec((i, v)) => {
Self::FromEmptyAddressDataVec((i, v.into()))
}
}
}

View File

@@ -19,6 +19,21 @@ impl AnyAddressIndex {
}
}
/// Maps a `LoadedAddressIndex` into the `AnyAddressIndex` space without shifting it.
///
/// Values at or above `MIN_EMPTY_INDEX` are decoded as empty-address indexes by the
/// `AnyAddressIndex` -> `AnyAddressDataIndexEnum` conversion, so a loaded index in that range
/// cannot be represented.
///
/// # Panics
///
/// Panics when the loaded index is greater than or equal to `MIN_EMPTY_INDEX`.
impl From<LoadedAddressIndex> for AnyAddressIndex {
    fn from(value: LoadedAddressIndex) -> Self {
        let raw = u32::from(value);
        assert!(
            raw < MIN_EMPTY_INDEX,
            "LoadedAddressIndex {} overlaps the empty address index range (>= {})",
            raw,
            MIN_EMPTY_INDEX
        );
        Self(*value)
    }
}
/// Maps an `EmptyAddressIndex` into the `AnyAddressIndex` space by offsetting it with
/// `MIN_EMPTY_INDEX`.
impl From<EmptyAddressIndex> for AnyAddressIndex {
    fn from(value: EmptyAddressIndex) -> Self {
        let shifted = *value + MIN_EMPTY_INDEX;
        Self(shifted)
    }
}
impl Serialize for AnyAddressIndex {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -38,9 +53,7 @@ impl From<AnyAddressIndex> for AnyAddressDataIndexEnum {
fn from(value: AnyAddressIndex) -> Self {
let uvalue = u32::from(value.0);
if uvalue >= MIN_EMPTY_INDEX {
Self::Empty(EmptyAddressIndex::from(TypeIndex::from(
uvalue - MIN_EMPTY_INDEX,
)))
Self::Empty(EmptyAddressIndex::from(uvalue - MIN_EMPTY_INDEX))
} else {
Self::Loaded(LoadedAddressIndex::from(value.0))
}

View File

@@ -1,5 +1,6 @@
use std::ops::Add;
use derive_deref::Deref;
use serde::Serialize;
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -7,13 +8,14 @@ use crate::{CheckedSub, Printable, TypeIndex};
#[derive(
Debug,
Default,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Clone,
Copy,
Default,
Deref,
FromBytes,
Immutable,
IntoBytes,
@@ -33,6 +35,11 @@ impl From<usize> for EmptyAddressIndex {
Self(TypeIndex::from(value))
}
}
impl From<u32> for EmptyAddressIndex {
fn from(value: u32) -> Self {
Self(TypeIndex::from(value))
}
}
impl From<EmptyAddressIndex> for usize {
fn from(value: EmptyAddressIndex) -> Self {

View File

@@ -1,5 +1,6 @@
use std::ops::Add;
use derive_deref::Deref;
use serde::Serialize;
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -13,6 +14,7 @@ use crate::{CheckedSub, Printable, TypeIndex};
Ord,
Clone,
Copy,
Deref,
Default,
FromBytes,
Immutable,
@@ -38,6 +40,11 @@ impl From<LoadedAddressIndex> for usize {
usize::from(value.0)
}
}
impl From<LoadedAddressIndex> for u32 {
fn from(value: LoadedAddressIndex) -> Self {
u32::from(value.0)
}
}
impl Add<usize> for LoadedAddressIndex {
type Output = Self;
fn add(self, rhs: usize) -> Self::Output {

View File

@@ -73,6 +73,12 @@ impl From<TypeIndex> for usize {
}
}
impl Add<u32> for TypeIndex {
type Output = Self;
fn add(self, rhs: u32) -> Self::Output {
Self(self.0 + rhs)
}
}
impl Add<usize> for TypeIndex {
type Output = Self;
fn add(self, rhs: usize) -> Self::Output {

View File

@@ -1,5 +1,6 @@
use std::{
borrow::Cow,
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
fs::{self, File, OpenOptions},
io::{self, Seek, SeekFrom, Write},
@@ -85,20 +86,68 @@ where
self.mut_pushed().push(value)
}
/// Stores `value` at `index`, appending when `index` equals the current length and updating
/// in place when it is below it.
///
/// # Errors
///
/// Returns `Error::IndexTooHigh` (after dumping diagnostics with `dbg!`) when `index` is past
/// the current length, and propagates failures from `to_usize` and `update`.
#[inline]
fn update_or_push(&mut self, index: I, value: T) -> Result<()> {
    let len = self.len();
    let position = index.to_usize()?;

    match position.cmp(&len) {
        // Exactly one past the last element: a plain append.
        Ordering::Equal => {
            self.push(value);
            Ok(())
        }
        // Already stored: overwrite.
        Ordering::Less => self.update(index, value),
        // Would leave a gap.
        Ordering::Greater => {
            dbg!(index, value, len, self.header());
            Err(Error::IndexTooHigh)
        }
    }
}
/// Returns the index a new value would occupy: the smallest recorded hole if there is one,
/// otherwise the value reported by `len_()`.
fn get_first_empty_index(&self) -> I {
    let slot = match self.holes().first() {
        Some(&hole) => hole,
        None => self.len_(),
    };
    slot.into()
}
/// Stores `value` in the smallest hole, or appends it when there is no hole, and returns the
/// index it ended up at.
///
/// # Errors
///
/// Propagates the error from `update` when filling a hole fails.
#[inline]
fn fill_first_hole_or_push(&mut self, value: T) -> Result<I> {
    // No hole available: append and report the last position.
    let Some(hole) = self.mut_holes().pop_first() else {
        self.push(value);
        return Ok(I::from(self.len() - 1));
    };

    // Reuse the hole that was just taken out of the set.
    let hole = I::from(hole);
    self.update(hole, value)?;
    Ok(hole)
}
/// Read-only view of the hole set: positions recorded by `unchecked_delete` and consumed again by
/// `fill_first_hole_or_push`.
fn holes(&self) -> &BTreeSet<usize>;
/// Mutable access to the same hole set.
fn mut_holes(&mut self) -> &mut BTreeSet<usize>;
/// Removes and returns the value stored at `index`, if there is one.
///
/// The value is looked up with `get_or_read`, passing `mmap` through. When a value is found, the
/// slot is released via `unchecked_delete`, which drops any pending update for it and records
/// the position as a hole. When nothing is found, no state is modified and `Ok(None)` is
/// returned.
///
/// # Errors
///
/// Propagates any error returned by `get_or_read`.
fn take(&mut self, index: I, mmap: &Mmap) -> Result<Option<T>> {
    let opt = self.get_or_read(index, mmap)?.map(|v| v.into_owned());
    if opt.is_some() {
        // `unchecked_delete` already clears the pending update and registers the hole, so the
        // same bookkeeping is not repeated inline here.
        self.unchecked_delete(index);
    }
    Ok(opt)
}
/// Marks `index` as deleted when it lies below the current length; indexes at or past the
/// length are ignored.
#[inline]
fn delete(&mut self, index: I) {
    if index.unwrap_to_usize() >= self.len() {
        return;
    }
    self.unchecked_delete(index);
}
/// Marks `index` as deleted without checking it against the length.
///
/// Any pending update for the position is dropped and the position is recorded as a hole.
#[inline]
#[doc(hidden)]
fn unchecked_delete(&mut self, index: I) {
    let slot = index.unwrap_to_usize();

    {
        let pending = self.mut_updated();
        // Skip the lookup entirely when no updates are pending.
        if !pending.is_empty() {
            pending.remove(&slot);
        }
    }

    self.mut_holes().insert(slot);
}
/// Read-only view of the pending updates, keyed by position; `unchecked_delete` removes the entry
/// of a deleted position from this map.
fn updated(&self) -> &BTreeMap<usize, T>;
/// Mutable access to the same pending-updates map.
fn mut_updated(&mut self) -> &mut BTreeMap<usize, T>;

View File

@@ -38,6 +38,30 @@ where
self.0.get_or_read(index, mmap)
}
/// Forwards to the wrapped vec's `update_or_push` and returns its result unchanged.
#[inline]
pub fn update_or_push(&mut self, index: I, value: T) -> Result<()> {
self.0.update_or_push(index, value)
}
#[inline]
pub fn checked_push(&mut self, index: I, value: T) -> Result<()> {
let len = self.0.len();
match len.cmp(&index.to_usize()?) {
Ordering::Greater => {
dbg!(index, value, len, self.0.header());
Err(Error::IndexTooLow)
}
Ordering::Equal => {
self.0.push(value);
Ok(())
}
Ordering::Less => {
dbg!(index, value, len, self.0.header());
Err(Error::IndexTooHigh)
}
}
}
#[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
let len = self.0.len();
@@ -58,6 +82,23 @@ where
}
}
/// Forwards to the wrapped vec's `fill_first_hole_or_push` and returns the index it reports.
#[inline]
pub fn fill_first_hole_or_push(&mut self, value: T) -> Result<I> {
self.0.fill_first_hole_or_push(value)
}
/// Forwards to the wrapped vec's `update` and returns its result unchanged.
pub fn update(&mut self, index: I, value: T) -> Result<()> {
self.0.update(index, value)
}
/// Forwards to the wrapped vec's `take`, passing `mmap` through, and returns its result unchanged.
pub fn take(&mut self, index: I, mmap: &Mmap) -> Result<Option<T>> {
self.0.take(index, mmap)
}
/// Forwards to the wrapped vec's `delete`.
pub fn delete(&mut self, index: I) {
self.0.delete(index)
}
/// Passes `height` to `update_height` on the wrapped vec's mutable header.
fn update_height(&mut self, height: Height) {
self.0.mut_header().update_height(height);
}