global: snapshot

This commit is contained in:
nym21
2025-10-25 16:30:14 +02:00
parent 1e4acfe124
commit 7d01e9e91e
12 changed files with 201 additions and 138 deletions

22
Cargo.lock generated
View File

@@ -506,10 +506,13 @@ dependencies = [
"brk_fetcher",
"brk_grouper",
"brk_indexer",
"brk_iterator",
"brk_logger",
"brk_mcp",
"brk_monitor",
"brk_query",
"brk_reader",
"brk_rpc",
"brk_server",
"brk_store",
"brk_traversable",
@@ -612,9 +615,11 @@ dependencies = [
"brk_error",
"brk_fetcher",
"brk_indexer",
"brk_iterator",
"brk_logger",
"brk_query",
"brk_reader",
"brk_rpc",
"brk_server",
"clap",
"color-eyre",
@@ -637,8 +642,10 @@ dependencies = [
"brk_fetcher",
"brk_grouper",
"brk_indexer",
"brk_iterator",
"brk_logger",
"brk_reader",
"brk_rpc",
"brk_store",
"brk_traversable",
"brk_types",
@@ -783,6 +790,7 @@ dependencies = [
"brk_error",
"brk_indexer",
"brk_reader",
"brk_rpc",
"brk_traversable",
"brk_types",
"derive_deref",
@@ -1224,10 +1232,12 @@ dependencies = [
"brk_error",
"brk_fetcher",
"brk_indexer",
"brk_iterator",
"brk_logger",
"brk_mcp",
"brk_query",
"brk_reader",
"brk_rpc",
"brk_traversable",
"brk_types",
"jiff",
@@ -1375,9 +1385,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.41"
version = "1.2.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7"
checksum = "81bbf3b3619004ad9bd139f62a9ab5cfe467f307455a0d307b0cf58bf070feaa"
dependencies = [
"find-msvc-tools",
"jobserver",
@@ -2018,9 +2028,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "flate2"
version = "1.1.4"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc5a4e564e38c699f2880d3fda590bedc2e69f3f84cd48b457bd892ce61d0aa9"
checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb"
dependencies = [
"crc32fast",
"libz-rs-sys",
@@ -3757,9 +3767,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.102"
version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e0f6df8eaa422d97d72edcd152e1451618fed47fabbdbd5a8864167b1d4aff7"
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [
"unicode-ident",
]

View File

@@ -18,7 +18,7 @@ pub trait Bridge {
fn generate_js_files(&self, modules_path: &Path) -> io::Result<()>;
}
impl Bridge for Query<'static> {
impl Bridge for Query {
fn generate_js_files(&self, modules_path: &Path) -> io::Result<()> {
let path = modules_path.join("brk-client");
@@ -81,7 +81,7 @@ export const POOL_ID_TO_POOL_NAME = /** @type {const} */ ({
fs::write(path, contents)
}
fn generate_metrics_file(query: &Query<'static>, parent: &Path) -> io::Result<()> {
fn generate_metrics_file(query: &Query, parent: &Path) -> io::Result<()> {
let path = parent.join(Path::new("metrics.js"));
let indexes = Index::all();

View File

@@ -16,9 +16,11 @@ brk_computer = { workspace = true }
brk_error = { workspace = true }
brk_fetcher = { workspace = true }
brk_indexer = { workspace = true }
brk_iterator = { workspace = true }
brk_query = { workspace = true }
brk_logger = { workspace = true }
brk_reader = { workspace = true }
brk_rpc = { workspace = true }
brk_server = { workspace = true }
vecdb = { workspace = true }
clap = { version = "4.5.50", features = ["derive", "string"] }

View File

@@ -3,10 +3,10 @@ use std::{
path::{Path, PathBuf},
};
use bitcoincore_rpc::{self, Auth, Client};
use brk_error::{Error, Result};
use brk_fetcher::Fetcher;
use brk_rpc::{Auth, Client};
use clap::Parser;
use color_eyre::eyre::eyre;
use serde::{Deserialize, Deserializer, Serialize};
use crate::{default_bitcoin_path, default_brk_path, dot_brk_path, website::Website};
@@ -78,7 +78,7 @@ pub struct Config {
}
impl Config {
pub fn import() -> color_eyre::Result<Self> {
pub fn import() -> Result<Self> {
let config_args = Some(Config::parse());
let path = dot_brk_path();
@@ -196,18 +196,18 @@ Finally, you can run the program with '-h' for help."
fs::write(path, toml::to_string(self).unwrap())
}
pub fn rpc(&self) -> color_eyre::Result<&'static Client> {
Ok(Box::leak(Box::new(Client::new(
pub fn rpc(&self) -> Result<Client> {
Ok(Client::new(
&format!(
"http://{}:{}",
self.rpcconnect().unwrap_or(&"localhost".to_string()),
self.rpcport().unwrap_or(8332)
),
self.rpc_auth().unwrap(),
)?)))
)?)
}
fn rpc_auth(&self) -> color_eyre::Result<Auth> {
fn rpc_auth(&self) -> Result<Auth> {
let cookie = self.path_cookiefile();
if cookie.is_file() {
@@ -218,7 +218,7 @@ Finally, you can run the program with '-h' for help."
self.rpcpassword.clone().unwrap(),
))
} else {
Err(eyre!("Failed to find correct auth"))
Err(Error::Str("Failed to find correct auth"))
}
}

View File

@@ -8,12 +8,12 @@ use std::{
time::Duration,
};
use bitcoincore_rpc::{self, RpcApi};
use brk_binder::Bridge;
use brk_bundler::bundle;
use brk_computer::Computer;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_iterator::Blocks;
use brk_query::Query;
use brk_reader::Reader;
use brk_server::{Server, VERSION};
@@ -27,12 +27,7 @@ mod website;
use crate::{config::Config, paths::*};
pub fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
fs::create_dir_all(dot_brk_path())?;
brk_logger::init(Some(&dot_brk_log_path()))?;
// Can't increase main thread's stack size, thus we need to use another thread
thread::Builder::new()
.stack_size(512 * 1024 * 1024)
.spawn(run)?
@@ -41,14 +36,22 @@ pub fn main() -> color_eyre::Result<()> {
}
pub fn run() -> color_eyre::Result<()> {
color_eyre::install()?;
fs::create_dir_all(dot_brk_path())?;
brk_logger::init(Some(&dot_brk_log_path()))?;
let config = Config::import()?;
let rpc = config.rpc()?;
let client = config.rpc()?;
let exit = Exit::new();
exit.set_ctrlc_handler();
let reader = Reader::new(config.blocksdir(), rpc);
let reader = Reader::new(config.blocksdir(), &client);
let blocks = Blocks::new(&client, &reader);
let mut indexer = Indexer::forced_import(&config.brkdir())?;
@@ -112,7 +115,7 @@ pub fn run() -> color_eyre::Result<()> {
None
};
let server = Server::new(query, bundle_path);
let server = Server::new(&query, bundle_path);
tokio::spawn(async move {
server.serve(true).await.unwrap();
@@ -128,40 +131,24 @@ pub fn run() -> color_eyre::Result<()> {
let _handle = runtime.spawn(future);
loop {
wait_for_synced_node(rpc)?;
client.wait_for_synced_node()?;
let block_count = rpc.get_block_count()?;
let last_height = client.get_last_height()?;
info!("{} blocks found.", block_count + 1);
info!("{} blocks found.", u32::from(last_height) + 1);
let starting_indexes = if config.check_collisions() {
indexer.checked_index(&reader, rpc, &exit)?;
indexer.checked_index(&blocks, &client, &exit)?
} else {
indexer.index(&reader, rpc, &exit)?;
indexer.index(&blocks, &client, &exit)?
};
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
info!("Waiting for new blocks...");
while block_count == rpc.get_block_count()? {
while last_height == client.get_last_height()? {
sleep(Duration::from_secs(1))
}
}
}
fn wait_for_synced_node(rpc_client: &bitcoincore_rpc::Client) -> color_eyre::Result<()> {
let is_synced = || -> color_eyre::Result<bool> {
let info = rpc_client.get_blockchain_info()?;
Ok(info.headers == info.blocks)
};
if !is_synced()? {
info!("Waiting for node to sync...");
while !is_synced()? {
sleep(Duration::from_secs(1))
}
}
Ok(())
}

View File

@@ -14,6 +14,15 @@ use brk_rpc::{Auth, Client};
use vecdb::Exit;
pub fn main() -> Result<()> {
// Can't increase main thread's stack size, thus we need to use another thread
thread::Builder::new()
.stack_size(512 * 1024 * 1024)
.spawn(run)?
.join()
.unwrap()
}
fn run() -> Result<()> {
brk_logger::init(Some(Path::new(".log")))?;
let bitcoin_dir = Path::new(&std::env::var("HOME").unwrap())
@@ -22,6 +31,9 @@ pub fn main() -> Result<()> {
.join("Bitcoin");
// let bitcoin_dir = Path::new("/Volumes/WD_BLACK/bitcoin");
let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk");
// let outputs_dir = Path::new("../../_outputs");
let client = Client::new(
"http://localhost:8332",
Auth::CookieFile(bitcoin_dir.join(".cookie")),
@@ -31,9 +43,6 @@ pub fn main() -> Result<()> {
let blocks = Blocks::new(&client, &reader);
let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk");
// let outputs_dir = Path::new("../../_outputs");
let mut indexer = Indexer::forced_import(&outputs_dir)?;
let fetcher = Fetcher::import(true, None)?;
@@ -41,20 +50,13 @@ pub fn main() -> Result<()> {
let exit = Exit::new();
exit.set_ctrlc_handler();
// Can't increase main thread's stack size, thus we need to use another thread
thread::Builder::new()
.stack_size(256 * 1024 * 1024)
.spawn(move || -> Result<()> {
let mut computer = Computer::forced_import(&outputs_dir, &indexer, Some(fetcher))?;
let mut computer = Computer::forced_import(&outputs_dir, &indexer, Some(fetcher))?;
loop {
let i = Instant::now();
let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?;
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
dbg!(i.elapsed());
sleep(Duration::from_secs(10));
}
})?
.join()
.unwrap()
loop {
let i = Instant::now();
let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?;
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
dbg!(i.elapsed());
sleep(Duration::from_secs(10));
}
}

View File

@@ -124,20 +124,12 @@ impl Indexer {
};
let mut readers = Readers::new(&self.vecs);
let mut already_added_addressbyteshash: FxHashMap<AddressBytesHash, TypeIndex> =
FxHashMap::default();
let mut same_block_spent_outpoints: FxHashSet<OutPoint> = FxHashSet::default();
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::default();
let vecs = &mut self.vecs;
let stores = &mut self.stores;
for block in blocks.after(prev_hash)? {
// let i_tot = Instant::now();
already_added_addressbyteshash.clear();
same_block_spent_outpoints.clear();
same_block_output_info.clear();
let height = block.height();
let blockhash = block.hash();
@@ -305,16 +297,19 @@ impl Indexer {
// println!("txinindex_and_txindata = : {:?}", i.elapsed());
// let i = Instant::now();
same_block_spent_outpoints.extend(txins.iter().filter_map(|(_, input_source)| {
let InputSource::SameBlock((_, _, _, outpoint)) = input_source else {
return None;
};
if !outpoint.is_coinbase() {
Some(*outpoint)
} else {
None
}
}));
let same_block_spent_outpoints: FxHashSet<OutPoint> = txins
.iter()
.filter_map(|(_, input_source)| {
let InputSource::SameBlock((_, _, _, outpoint)) = input_source else {
return None;
};
if !outpoint.is_coinbase() {
Some(*outpoint)
} else {
None
}
})
.collect();
// println!("same_block_spent_outpoints = : {:?}", i.elapsed());
// let i = Instant::now();
@@ -475,6 +470,10 @@ impl Indexer {
let tx_len = block.txdata.len();
// let i = Instant::now();
let mut already_added_addressbyteshash: FxHashMap<AddressBytesHash, TypeIndex> =
FxHashMap::default();
let mut same_block_output_info: FxHashMap<OutPoint, (OutputType, TypeIndex)> =
FxHashMap::default();
txouts
.into_iter()
.try_for_each(|data| -> Result<()> {

View File

@@ -8,7 +8,7 @@ use brk_types::{
TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
Vout,
};
use fjall2::{Keyspace, PersistMode};
use fjall2::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
@@ -18,7 +18,7 @@ use super::Vecs;
#[derive(Clone)]
pub struct Stores {
pub keyspace: Keyspace,
pub keyspace: TransactionalKeyspace,
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,

View File

@@ -1,8 +1,11 @@
use std::thread::sleep;
use std::{mem, sync::Arc, time::Duration};
use bitcoin::block::Header;
use bitcoin::consensus::encode;
use bitcoincore_rpc::json::{GetBlockHeaderResult, GetBlockResult, GetTxOutResult};
use bitcoincore_rpc::json::{
GetBlockHeaderResult, GetBlockResult, GetBlockchainInfoResult, GetTxOutResult,
};
use bitcoincore_rpc::{Client as CoreClient, Error as RpcError, RpcApi};
use brk_error::Result;
use brk_types::{BlockHash, Height, Sats, Transaction, TxIn, TxOut, TxStatus, Txid, Vout};
@@ -12,6 +15,7 @@ pub use bitcoincore_rpc::Auth;
mod inner;
use inner::ClientInner;
use log::info;
///
/// Bitcoin Core RPC Client
@@ -40,6 +44,13 @@ impl Client {
)?)))
}
/// Returns a data structure containing various state info regarding
/// blockchain processing.
pub fn get_blockchain_info(&self) -> Result<GetBlockchainInfoResult> {
self.call(move |c| c.get_blockchain_info())
.map_err(Into::into)
}
pub fn get_block<'a, H>(&self, hash: &'a H) -> Result<bitcoin::Block>
where
&'a H: Into<&'a bitcoin::BlockHash>,
@@ -230,6 +241,22 @@ impl Client {
}
}
pub fn wait_for_synced_node(&self) -> Result<()> {
let is_synced = || -> Result<bool> {
let info = self.get_blockchain_info()?;
Ok(info.headers == info.blocks)
};
if !is_synced()? {
info!("Waiting for node to sync...");
while !is_synced()? {
sleep(Duration::from_secs(1))
}
}
Ok(())
}
pub fn call<F, T>(&self, f: F) -> Result<T, RpcError>
where
F: Fn(&CoreClient) -> Result<T, RpcError>,

View File

@@ -1,4 +1,8 @@
use std::{path::Path, thread::sleep, time::Duration};
use std::{
path::Path,
thread::{self, sleep},
time::Duration,
};
use brk_computer::Computer;
@@ -10,14 +14,29 @@ use brk_query::Query;
use brk_reader::Reader;
use brk_rpc::{Auth, Client};
use brk_server::Server;
use log::info;
use vecdb::Exit;
pub fn main() -> Result<()> {
// Can't increase main thread's stack size, thus we need to use another thread
thread::Builder::new()
.stack_size(512 * 1024 * 1024)
.spawn(run)?
.join()
.unwrap()
}
fn run() -> Result<()> {
brk_logger::init(Some(Path::new(".log")))?;
let process = true;
let bitcoin_dir = Path::new(&std::env::var("HOME").unwrap())
.join("Library")
.join("Application Support")
.join("Bitcoin");
// let bitcoin_dir = Path::new("/Volumes/WD_BLACK1/bitcoin");
let bitcoin_dir = Path::new("");
let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk");
// let outputs_dir = Path::new("../../_outputs");
let client = Client::new(
"http://localhost:8332",
@@ -28,46 +47,48 @@ pub fn main() -> Result<()> {
let blocks = Blocks::new(&client, &reader);
let outputs_dir = Path::new("../../_outputs");
let mut indexer = Indexer::forced_import(outputs_dir)?;
let mut indexer = Indexer::forced_import(&outputs_dir)?;
let fetcher = Some(Fetcher::import(true, None)?);
let mut computer = Computer::forced_import(outputs_dir, &indexer, fetcher)?;
let mut computer = Computer::forced_import(&outputs_dir, &indexer, fetcher)?;
let exit = Exit::new();
exit.set_ctrlc_handler();
tokio::runtime::Builder::new_multi_thread()
let query = Query::build(&reader, &indexer, &computer);
let future = async move {
let server = Server::new(&query, None);
tokio::spawn(async move {
server.serve(true).await.unwrap();
});
Ok(()) as Result<()>
};
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let query = Query::build(&reader, &indexer, &computer);
.build()?;
let server = Server::new(&query, None);
let _handle = runtime.spawn(future);
let server = tokio::spawn(async move {
server.serve(true).await.unwrap();
});
loop {
client.wait_for_synced_node()?;
if process {
loop {
let last_height = client.get_last_height()?;
let last_height = client.get_last_height()?;
let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?;
info!("{} blocks found.", u32::from(last_height) + 1);
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?;
while last_height == client.get_last_height()? {
sleep(Duration::from_secs(1))
}
}
}
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
#[allow(unreachable_code)]
server.await.unwrap();
info!("Waiting for new blocks...");
Ok(())
})
while last_height == client.get_last_height()? {
sleep(Duration::from_secs(1))
}
}
}

View File

@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_types::Version;
use fjall2::{Keyspace, PartitionHandle, PersistMode};
use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use super::Height;
@@ -18,13 +18,13 @@ pub struct StoreMeta {
impl StoreMeta {
pub fn checked_open<F>(
keyspace: &Keyspace,
keyspace: &TransactionalKeyspace,
path: &Path,
version: Version,
open_partition_handle: F,
) -> Result<(Self, PartitionHandle)>
) -> Result<(Self, TransactionalPartitionHandle)>
where
F: Fn() -> Result<PartitionHandle>,
F: Fn() -> Result<TransactionalPartitionHandle>,
{
fs::create_dir_all(path)?;

View File

@@ -1,10 +1,11 @@
use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, path::Path};
use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, mem, path::Path};
use brk_error::Result;
use brk_types::{Height, Version};
use byteview6::ByteView;
use fjall2::{
InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode, ValueType,
InnerItem, PartitionCreateOptions, PersistMode, TransactionalKeyspace,
TransactionalPartitionHandle, ValueType,
};
use rustc_hash::{FxHashMap, FxHashSet};
@@ -18,18 +19,18 @@ use meta::*;
pub struct StoreV2<Key, Value> {
meta: StoreMeta,
name: &'static str,
keyspace: Keyspace,
partition: PartitionHandle,
keyspace: TransactionalKeyspace,
partition: TransactionalPartitionHandle,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::TWO;
pub fn open_keyspace(path: &Path) -> fjall2::Result<Keyspace> {
pub fn open_keyspace(path: &Path) -> fjall2::Result<TransactionalKeyspace> {
fjall2::Config::new(path.join("fjall"))
.max_write_buffer_size(32 * 1024 * 1024)
.open()
.open_transactional()
}
impl<K, V> StoreV2<K, V>
@@ -39,23 +40,25 @@ where
ByteView: From<K> + From<V>,
{
fn open_partition_handle(
keyspace: &Keyspace,
keyspace: &TransactionalKeyspace,
name: &str,
bloom_filters: Option<bool>,
) -> Result<PartitionHandle> {
) -> Result<TransactionalPartitionHandle> {
let mut options = PartitionCreateOptions::default()
.max_memtable_size(8 * 1024 * 1024)
.manual_journal_persist(true);
if bloom_filters.is_some_and(|b| !b) {
options = options.bloom_filter_bits(None);
} else {
options = options.bloom_filter_bits(Some(5));
}
keyspace.open_partition(name, options).map_err(|e| e.into())
}
pub fn import(
keyspace: &Keyspace,
keyspace: &TransactionalKeyspace,
path: &Path,
name: &str,
version: Version,
@@ -100,12 +103,16 @@ where
}
pub fn is_empty(&self) -> Result<bool> {
self.partition.is_empty().map_err(|e| e.into())
self.keyspace
.read_tx()
.is_empty(&self.partition)
.map_err(|e| e.into())
}
pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
self.partition
.iter()
self.keyspace
.read_tx()
.iter(&self.partition)
.map(|res| res.unwrap())
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
@@ -140,6 +147,11 @@ where
}
}
#[inline]
pub fn approximate_len(&self) -> usize {
self.partition.approximate_len()
}
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
@@ -169,16 +181,19 @@ where
return Ok(());
}
let mut items = self
.puts
.drain()
let mut items = mem::take(&mut self.puts)
.into_iter()
.map(|(key, value)| Item::Value { key, value })
.chain(self.dels.drain().map(|key| Item::Tomb(key)))
.chain(
mem::take(&mut self.dels)
.into_iter()
.map(|key| Item::Tomb(key)),
)
.collect::<Vec<_>>();
items.sort_unstable();
self.keyspace.batch().commit_partition(
&self.partition,
self.keyspace.inner().batch().commit_partition(
self.partition.inner(),
items.into_iter().map(InnerItem::from).collect::<Vec<_>>(),
)?;