rpc: init wrapper crate + global: snapshot

This commit is contained in:
nym21
2025-10-20 23:06:25 +02:00
parent 9b230d23dd
commit 4ffa2e3993
39 changed files with 1055 additions and 832 deletions

1248
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -41,10 +41,12 @@ brk_fetcher = { version = "0.0.111", path = "crates/brk_fetcher" }
brk_grouper = { version = "0.0.111", path = "crates/brk_grouper" }
brk_indexer = { version = "0.0.111", path = "crates/brk_indexer" }
brk_interface = { version = "0.0.111", path = "crates/brk_interface" }
brk_iterator = { version = "0.0.111", path = "crates/brk_iterator" }
brk_logger = { version = "0.0.111", path = "crates/brk_logger" }
brk_mcp = { version = "0.0.111", path = "crates/brk_mcp" }
brk_monitor = { version = "0.0.111", path = "crates/brk_monitor" }
brk_reader = { version = "0.0.111", path = "crates/brk_reader" }
brk_rpc = { version = "0.0.111", path = "crates/brk_rpc" }
brk_server = { version = "0.0.111", path = "crates/brk_server" }
brk_store = { version = "0.0.111", path = "crates/brk_store" }
brk_structs = { version = "0.0.111", path = "crates/brk_structs" }
@@ -53,10 +55,11 @@ brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_d
byteview = "=0.6.1"
# byteview = "~0.8.0"
derive_deref = "1.1.1"
fjall2 = { path = "../fjall2", package = "fjall" }
fjall2 = { version = "2.11.5", package = "brk_fjall" }
# fjall2 = { path = "../fjall2", package = "brk_fjall" }
# fjall2 = { version = "2.11.2", package = "fjall" }
# fjall3 = { version = "=3.0.0-pre.0", package = "fjall" }
fjall3 = { path = "../fjall3", package = "fjall" }
# fjall3 = { path = "../fjall3", package = "fjall" }
# fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "bb15057500dce3115d7644d268b9deeaa895b431", package = "fjall" }
jiff = "0.2.15"
log = "0.4.28"

View File

@@ -12,7 +12,8 @@ build = "build.rs"
[dependencies]
log = { workspace = true }
notify = "8.2.0"
rolldown = "0.1.0"
rolldown = { version = "0.2.3", package = "brk_rolldown" }
# rolldown = "0.1.0"
# brk_rolldown = "0.2.3"
# brk_rolldown = { path = "../../../rolldown/crates/rolldown"}
sugar_path = "1.2.0"

View File

@@ -22,7 +22,7 @@ brk_logger = { workspace = true }
brk_reader = { workspace = true }
brk_server = { workspace = true }
vecdb = { workspace = true }
clap = { version = "4.5.49", features = ["derive", "string"] }
clap = { version = "4.5.50", features = ["derive", "string"] }
color-eyre = "0.6.5"
log = { workspace = true }
minreq = { workspace = true }

View File

@@ -48,13 +48,13 @@ pub fn run() -> color_eyre::Result<()> {
let exit = Exit::new();
exit.set_ctrlc_handler();
let parser = Reader::new(config.blocksdir(), rpc);
let reader = Reader::new(config.blocksdir(), rpc);
let mut indexer = Indexer::forced_import(&config.brkdir())?;
let mut computer = Computer::forced_import(&config.brkdir(), &indexer, config.fetcher())?;
let interface = Interface::build(&parser, &indexer, &computer);
let interface = Interface::build(&reader, &indexer, &computer);
let website = config.website();
@@ -135,11 +135,11 @@ pub fn run() -> color_eyre::Result<()> {
info!("{} blocks found.", block_count + 1);
let starting_indexes = indexer
.index(&parser, rpc, &exit, config.check_collisions())
.index(&reader, rpc, &exit, config.check_collisions())
.unwrap();
computer
.compute(&indexer, starting_indexes, &parser, &exit)
.compute(&indexer, starting_indexes, &reader, &exit)
.unwrap();
info!("Waiting for new blocks...");

View File

@@ -24,7 +24,7 @@ brk_structs = { workspace = true }
brk_traversable = { workspace = true }
derive_deref = { workspace = true }
log = { workspace = true }
pco = "0.4.6"
pco = "0.4.7"
rayon = { workspace = true }
serde = { workspace = true }
vecdb = { workspace = true }

View File

@@ -34,7 +34,7 @@ pub fn main() -> Result<()> {
let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk");
// let outputs_dir = Path::new("../../_outputs");
let parser = Reader::new(bitcoin_dir.join("blocks"), rpc);
let reader = Reader::new(bitcoin_dir.join("blocks"), rpc);
let mut indexer = Indexer::forced_import(&outputs_dir)?;
@@ -44,8 +44,8 @@ pub fn main() -> Result<()> {
loop {
let i = Instant::now();
let starting_indexes = indexer.index(&parser, rpc, &exit, true)?;
computer.compute(&indexer, starting_indexes, &parser, &exit)?;
let starting_indexes = indexer.index(&reader, rpc, &exit, true)?;
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
dbg!(i.elapsed());
sleep(Duration::from_secs(10));
}

View File

@@ -70,6 +70,8 @@ impl Computer {
price::Vecs::forced_import(&computed_path, VERSION + Version::ZERO, &indexes).unwrap()
});
import in theads
Ok(Self {
constants: constants::Vecs::forced_import(
&computed_path,

View File

@@ -13,7 +13,7 @@ build = "build.rs"
bitcoin = { workspace = true }
bitcoincore-rpc = { workspace = true }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
# fjall3 = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
sonic-rs = { workspace = true }

View File

@@ -13,7 +13,7 @@ pub enum Error {
BitcoinRPC(bitcoincore_rpc::Error),
Jiff(jiff::Error),
FjallV2(fjall2::Error),
FjallV3(fjall3::Error),
// FjallV3(fjall3::Error),
VecDB(vecdb::Error),
SeqDB(vecdb::SeqDBError),
Minreq(minreq::Error),
@@ -107,11 +107,11 @@ impl From<jiff::Error> for Error {
}
}
impl From<fjall3::Error> for Error {
fn from(value: fjall3::Error) -> Self {
Self::FjallV3(value)
}
}
// impl From<fjall3::Error> for Error {
// fn from(value: fjall3::Error) -> Self {
// Self::FjallV3(value)
// }
// }
impl From<fjall2::Error> for Error {
fn from(value: fjall2::Error) -> Self {
@@ -145,7 +145,7 @@ impl fmt::Display for Error {
Error::BitcoinFromScriptError(error) => Display::fmt(&error, f),
Error::BitcoinRPC(error) => Display::fmt(&error, f),
Error::FjallV2(error) => Display::fmt(&error, f),
Error::FjallV3(error) => Display::fmt(&error, f),
// Error::FjallV3(error) => Display::fmt(&error, f),
Error::IO(error) => Display::fmt(&error, f),
Error::Jiff(error) => Display::fmt(&error, f),
Error::Minreq(error) => Display::fmt(&error, f),

View File

@@ -30,7 +30,7 @@ pub struct ByAddressType<T> {
}
impl<T> ByAddressType<T> {
pub fn new<F>(f: F) -> Result<Self>
pub fn new_with_name<F>(f: F) -> Result<Self>
where
F: Fn(&'static str) -> Result<T>,
{
@@ -46,6 +46,22 @@ impl<T> ByAddressType<T> {
})
}
pub fn new_with_index<F>(f: F) -> Result<Self>
where
F: Fn(usize) -> Result<T>,
{
Ok(Self {
p2pk65: f(0)?,
p2pk33: f(1)?,
p2pkh: f(2)?,
p2sh: f(3)?,
p2wpkh: f(4)?,
p2wsh: f(5)?,
p2tr: f(6)?,
p2a: f(7)?,
})
}
pub fn get_unwrap(&self, address_type: OutputType) -> &T {
self.get(address_type).unwrap()
}

View File

@@ -20,7 +20,7 @@ brk_store = { workspace = true }
brk_structs = { workspace = true }
brk_traversable = { workspace = true }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
# fjall3 = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
rustc-hash = { workspace = true }

View File

@@ -33,7 +33,7 @@ fn main() -> Result<()> {
let exit = Exit::new();
exit.set_ctrlc_handler();
let parser = Reader::new(blocks_dir, rpc);
let reader = Reader::new(blocks_dir, rpc);
fs::create_dir_all(&outputs_dir)?;
@@ -47,7 +47,7 @@ fn main() -> Result<()> {
loop {
let i = Instant::now();
indexer.index(&parser, rpc, &exit, true)?;
indexer.index(&reader, rpc, &exit, true)?;
dbg!(i.elapsed());
sleep(Duration::from_secs(60));

View File

@@ -28,7 +28,7 @@ pub use vecs::*;
// Increment on **change _OR_ addition**
const VERSION: Version = Version::new(22);
const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
const COLLISIONS_CHECKED_UP_TO: Height = Height::new(909_150);
const COLLISIONS_CHECKED_UP_TO: Height = Height::new(0);
#[derive(Clone)]
pub struct Indexer {

View File

@@ -46,61 +46,30 @@ impl Stores {
let keyspace_ref = &keyspace;
let create_addressindex_and_txindex_store = |cohort| {
let create_addressindex_and_txindex_store = |index| {
Store::import(
keyspace_ref,
path,
&format!("{}addressindex_and_txindex", cohort),
&format!("a2t{}", index),
version,
Some(false),
)
};
let create_addressindex_and_unspentoutpoint_store = |cohort| {
Store::import(
keyspace_ref,
path,
&format!("{}addressindex_and_unspentoutpoint", cohort),
version,
None,
)
};
let create_addressindex_and_unspentoutpoint_store =
|index| Store::import(keyspace_ref, path, &format!("a2u{}", index), version, None);
Ok(Self {
keyspace: keyspace.clone(),
height_to_coinbase_tag: Store::import(
keyspace_ref,
path,
"height_to_coinbase_tag",
version,
None,
)?,
addressbyteshash_to_typeindex: Store::import(
keyspace_ref,
path,
"addressbyteshash_to_typeindex",
version,
None,
)?,
blockhashprefix_to_height: Store::import(
keyspace_ref,
path,
"blockhashprefix_to_height",
version,
None,
)?,
txidprefix_to_txindex: Store::import(
keyspace_ref,
path,
"txidprefix_to_txindex",
version,
None,
)?,
addresstype_to_typeindex_and_txindex: ByAddressType::new(
height_to_coinbase_tag: Store::import(keyspace_ref, path, "h2c", version, None)?,
addressbyteshash_to_typeindex: Store::import(keyspace_ref, path, "a2t", version, None)?,
blockhashprefix_to_height: Store::import(keyspace_ref, path, "b2h", version, None)?,
txidprefix_to_txindex: Store::import(keyspace_ref, path, "t2t", version, None)?,
addresstype_to_typeindex_and_txindex: ByAddressType::new_with_index(
create_addressindex_and_txindex_store,
)?,
addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new(
addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new_with_index(
create_addressindex_and_unspentoutpoint_store,
)?,
})

View File

@@ -29,13 +29,13 @@ pub fn main() -> Result<()> {
let exit = Exit::new();
exit.set_ctrlc_handler();
let parser = Reader::new(blocks_dir, rpc);
let reader = Reader::new(blocks_dir, rpc);
let indexer = Indexer::forced_import(&outputs_dir)?;
let computer = Computer::forced_import(&outputs_dir, &indexer, None)?;
let interface = Interface::build(&parser, &indexer, &computer);
let interface = Interface::build(&reader, &indexer, &computer);
dbg!(interface.search_and_format(Params {
index: Index::Height,

View File

@@ -34,7 +34,7 @@ pub fn get_transaction_info(
let txid = indexer.vecs.txindex_to_txid.iter().unwrap_get_inner(index);
let parser = interface.parser();
let reader = interface.parser();
let computer = interface.computer();
let position = computer
@@ -48,7 +48,7 @@ pub fn get_transaction_info(
.iter()
.unwrap_get_inner(index);
let blk_index_to_blk_path = parser.blk_index_to_blk_path();
let blk_index_to_blk_path = reader.blk_index_to_blk_path();
let Some(blk_path) = blk_index_to_blk_path.get(&position.blk_index()) else {
return Err(Error::Str("Failed to get the correct blk file"));
@@ -72,7 +72,7 @@ pub fn get_transaction_info(
if file.read_exact(&mut buffer).is_err() {
return Err(Error::Str("Failed to read the transaction (read exact)"));
}
xori.bytes(&mut buffer, parser.xor_bytes());
xori.bytes(&mut buffer, reader.xor_bytes());
let mut reader = Cursor::new(buffer);
let Ok(_) = bitcoin::Transaction::consensus_decode(&mut reader) else {

View File

@@ -40,14 +40,14 @@ pub struct Interface<'a> {
impl<'a> Interface<'a> {
pub fn build(parser: &Reader, indexer: &Indexer, computer: &Computer) -> Self {
let parser = parser.static_clone();
let reader = parser.static_clone();
let indexer = indexer.static_clone();
let computer = computer.static_clone();
let vecs = Vecs::build(indexer, computer);
Self {
vecs,
parser,
parser: reader,
indexer,
computer,
}

View File

@@ -0,0 +1,15 @@
[package]
name = "brk_iterator"
description = "A Bitcoin block iterator"
version.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true
build = "build.rs"
[dependencies]
bitcoin = { workspace = true }
brk_reader = { workspace = true }
brk_rpc = { workspace = true }

View File

@@ -0,0 +1 @@
# brk_iterator

View File

@@ -0,0 +1,8 @@
fn main() {
let profile = std::env::var("PROFILE").unwrap_or_default();
if profile == "release" {
println!("cargo:rustc-flag=-C");
println!("cargo:rustc-flag=target-cpu=native");
}
}

View File

@@ -0,0 +1 @@
fn main() {}

View File

@@ -0,0 +1,13 @@
use brk_rpc::Client;
pub struct BlockIterator {
client: Client,
}
impl BlockIterator {
pub fn new(client: Client) -> Self {
Self { client }
}
pub fn iter() {}
}

View File

@@ -1,71 +0,0 @@
use std::path::Path;
use bitcoincore_rpc::{Auth, Client};
use brk_reader::Reader;
use brk_structs::{Height, OutputType};
fn main() {
let i = std::time::Instant::now();
let bitcoin_dir = Path::new("").join("");
let rpc = Box::leak(Box::new(
Client::new(
"http://localhost:8332",
Auth::CookieFile(bitcoin_dir.join(".cookie")),
)
.unwrap(),
));
// let start = None;
// let end = None;
let parser = Reader::new(bitcoin_dir.join("blocks"), rpc);
// parser
// .parse(start, end)
// .iter()
// .for_each(|(height, _block, hash)| {
// println!("{height}: {hash}");
// });
// println!(
// "{}",
// parser
// .get(Height::new(0))
// .txdata
// .first()
// .unwrap()
// .output
// .first()
// .unwrap()
// .script_pubkey
// );
let block_850_000 = parser.get(Height::new(850_000)).unwrap();
let tx = block_850_000.txdata.iter().find(|tx| {
tx.compute_txid().to_string()
== "b10c0000004da5a9d1d9b4ae32e09f0b3e62d21a5cce5428d4ad714fb444eb5d"
});
let output = tx.unwrap().tx_out(7).unwrap();
dbg!(OutputType::from(&output.script_pubkey));
dbg!(output);
// println!(
// "{}",
// .txdata
// .first()
// .unwrap()
// .output
// .first()
// .unwrap()
// .value
// );
dbg!(i.elapsed());
}

View File

@@ -1,13 +1,13 @@
use bitcoin::{Transaction, VarInt, block::Header, consensus::Decodable, io::Cursor};
use bitcoincore_rpc::RpcApi;
use brk_error::Result;
use brk_structs::{BlkMetadata, Block, Height, ParsedBlock};
use brk_structs::{BlkMetadata, Block, Height, ReadBlock};
use crate::{XORBytes, XORIndex};
pub enum AnyBlock {
Raw(Vec<u8>),
Decoded(ParsedBlock),
Decoded(ReadBlock),
Skipped,
}
@@ -67,7 +67,7 @@ impl AnyBlock {
let block = bitcoin::Block { header, txdata };
let block = Block::from((height, hash, block));
let block = ParsedBlock::from((block, metadata, tx_metadata));
let block = ReadBlock::from((block, metadata, tx_metadata));
Ok(Self::Decoded(block))
}

View File

@@ -16,7 +16,7 @@ use bitcoin::{block::Header, consensus::Decodable};
use bitcoincore_rpc::RpcApi;
use blk_index_to_blk_path::*;
use brk_error::Result;
use brk_structs::{BlkMetadata, BlkPosition, Block, Height, ParsedBlock};
use brk_structs::{BlkMetadata, BlkPosition, Block, Height, ReadBlock};
use crossbeam::channel::{Receiver, bounded};
use parking_lot::{RwLock, RwLockReadGuard};
use rayon::prelude::*;
@@ -75,7 +75,7 @@ impl Reader {
///
/// For an example checkout `./main.rs`
///
pub fn read(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ParsedBlock> {
pub fn read(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ReadBlock> {
let rpc = self.rpc;
let (send_bytes, recv_bytes) = bounded(BOUND_CAP / 2);

18
crates/brk_rpc/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "brk_rpc"
description = "A thin wrapper around bitcoincore-rpc"
version.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true
build = "build.rs"
[dependencies]
bitcoin = { workspace = true }
bitcoincore-rpc = "0.19.0"
brk_error = { workspace = true }
brk_logger = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }

1
crates/brk_rpc/README.md Normal file
View File

@@ -0,0 +1 @@
# brk_rpc

8
crates/brk_rpc/build.rs Normal file
View File

@@ -0,0 +1,8 @@
fn main() {
let profile = std::env::var("PROFILE").unwrap_or_default();
if profile == "release" {
println!("cargo:rustc-flag=-C");
println!("cargo:rustc-flag=target-cpu=native");
}
}

View File

@@ -0,0 +1,21 @@
use std::path::Path;
use bitcoincore_rpc::RpcApi;
use brk_rpc::{Auth, Client};
fn main() {
brk_logger::init(None).unwrap();
let bitcoin_dir = Path::new(&std::env::var("HOME").unwrap())
.join("Library")
.join("Application Support")
.join("Bitcoin");
let auth = Auth::CookieFile(bitcoin_dir.join(".cookie"));
let client = Client::new("http://localhost:8332", auth).unwrap();
loop {
println!("{:?}", client.call(|c| c.get_block_count()).unwrap());
}
}

139
crates/brk_rpc/src/inner.rs Normal file
View File

@@ -0,0 +1,139 @@
use bitcoincore_rpc::{Client as CoreClient, Error as RpcError, jsonrpc};
use brk_error::Result;
use log::info;
use parking_lot::RwLock;
use std::time::Duration;
pub use bitcoincore_rpc::Auth;
pub struct ClientInner {
url: String,
auth: Auth,
client: RwLock<CoreClient>,
max_retries: usize,
retry_delay: Duration,
}
impl ClientInner {
pub fn new(url: &str, auth: Auth, max_retries: usize, retry_delay: Duration) -> Result<Self> {
let client = Self::retry(max_retries, retry_delay, || {
CoreClient::new(url, auth.clone()).map_err(Into::into)
})?;
Ok(Self {
url: url.to_string(),
auth,
client: RwLock::new(client),
max_retries,
retry_delay,
})
}
fn recreate(&self) -> Result<()> {
*self.client.write() = CoreClient::new(&self.url, self.auth.clone())?;
Ok(())
}
fn is_retriable(error: &RpcError) -> bool {
matches!(
error,
RpcError::JsonRpc(jsonrpc::Error::Rpc(e))
if e.code == -32600 || e.code == 401 || e.code == -28
) || matches!(error, RpcError::JsonRpc(jsonrpc::Error::Transport(_)))
}
fn retry<F, T>(max_retries: usize, delay: Duration, mut f: F) -> Result<T>
where
F: FnMut() -> Result<T>,
{
let mut last_error = None;
for attempt in 0..=max_retries {
if attempt > 0 {
info!(
"Retrying to connect to Bitcoin Core (attempt {}/{})",
attempt, max_retries
);
std::thread::sleep(delay);
}
match f() {
Ok(value) => {
if attempt > 0 {
info!(
"Successfully connected to Bitcoin Core after {} retries",
attempt
);
}
return Ok(value);
}
Err(e) => {
if attempt == 0 {
info!("Could not connect to Bitcoin Core, retrying: {}", e);
}
last_error = Some(e);
}
}
}
let err = last_error.unwrap();
info!(
"Failed to connect to Bitcoin Core after {} attempts",
max_retries + 1
);
Err(err)
}
pub fn call_with_retry<F, T>(&self, f: F) -> Result<T, RpcError>
where
F: Fn(&CoreClient) -> Result<T, RpcError>,
{
for attempt in 0..=self.max_retries {
if attempt > 0 {
info!(
"Trying to reconnect to Bitcoin Core (attempt {}/{})",
attempt, self.max_retries
);
self.recreate().ok();
std::thread::sleep(self.retry_delay);
}
match f(&self.client.read()) {
Ok(value) => {
if attempt > 0 {
info!(
"Successfully reconnected to Bitcoin Core after {} attempts",
attempt
);
}
return Ok(value);
}
Err(e) if Self::is_retriable(&e) => {
if attempt == 0 {
info!("Lost connection to Bitcoin Core, reconnecting...");
}
}
Err(e) => return Err(e),
}
}
info!(
"Could not reconnect to Bitcoin Core after {} attempts",
self.max_retries + 1
);
Err(RpcError::JsonRpc(jsonrpc::Error::Rpc(
jsonrpc::error::RpcError {
code: -1,
message: "Max retries exceeded".to_string(),
data: None,
},
)))
}
pub fn call_once<F, T>(&self, f: F) -> Result<T, RpcError>
where
F: Fn(&CoreClient) -> Result<T, RpcError>,
{
f(&self.client.read())
}
}

103
crates/brk_rpc/src/lib.rs Normal file
View File

@@ -0,0 +1,103 @@
use bitcoin::BlockHash;
use bitcoincore_rpc::json::GetBlockResult;
use bitcoincore_rpc::{Client as CoreClient, Error as RpcError, RpcApi};
use brk_error::Result;
use std::sync::Arc;
use std::time::Duration;
pub use bitcoincore_rpc::Auth;
mod inner;
use inner::ClientInner;
///
/// Bitcoin Core RPC Client
///
/// Free to clone (Arc)
///
#[derive(Clone)]
pub struct Client(Arc<ClientInner>);
impl Client {
pub fn new(url: &str, auth: Auth) -> Result<Self> {
Self::new_with(url, auth, 1_000_000, Duration::from_secs(1))
}
pub fn new_with(
url: &str,
auth: Auth,
max_retries: usize,
retry_delay: Duration,
) -> Result<Self> {
Ok(Self(Arc::new(ClientInner::new(
url,
auth,
max_retries,
retry_delay,
)?)))
}
pub fn get_block_info(&self, hash: &BlockHash) -> Result<GetBlockResult> {
self.call(|c| c.get_block_info(hash)).map_err(Into::into)
}
/// Checks if a block is in the main chain (has positive confirmations)
pub fn is_in_main_chain(&self, hash: &BlockHash) -> Result<bool> {
let block_info = self.get_block_info(hash)?;
Ok(block_info.confirmations > 0)
}
pub fn get_closest_valid_height(&self, hash: BlockHash) -> Result<u64> {
// First, try to get block info for the hash
match self.get_block_info(&hash) {
Ok(block_info) => {
// Check if this block is in the main chain
if self.is_in_main_chain(&hash)? {
// Block is in the main chain
Ok(block_info.height as u64)
} else {
// Confirmations is -1, meaning it's on a fork
// We need to find where it diverged from the main chain
// Get the previous block hash and walk backwards
let mut current_hash = block_info
.previousblockhash
.ok_or("Genesis block has no previous block")?;
loop {
if self.is_in_main_chain(&current_hash)? {
// Found a block in the main chain
let current_info = self.get_block_info(&current_hash)?;
return Ok(current_info.height as u64);
}
// Continue walking backwards
let current_info = self.get_block_info(&current_hash)?;
current_hash = current_info
.previousblockhash
.ok_or("Reached genesis without finding main chain")?;
}
}
}
Err(_) => {
// Block not found in the node's database at all
Err("Block hash not found in blockchain".into())
}
}
}
pub fn call<F, T>(&self, f: F) -> Result<T, RpcError>
where
F: Fn(&CoreClient) -> Result<T, RpcError>,
{
self.0.call_with_retry(f)
}
pub fn call_once<F, T>(&self, f: F) -> Result<T, RpcError>
where
F: Fn(&CoreClient) -> Result<T, RpcError>,
{
self.0.call_once(f)
}
}

View File

@@ -27,7 +27,7 @@ brk_traversable = { workspace = true }
vecdb = { workspace = true }
jiff = { workspace = true }
log = { workspace = true }
quick_cache = "0.6.17"
quick_cache = "0.6.18"
schemars = { workspace = true }
serde = { workspace = true }
sonic-rs = { workspace = true }

View File

@@ -25,7 +25,7 @@ pub fn main() -> Result<()> {
let exit = Exit::new();
exit.set_ctrlc_handler();
let parser = Reader::new(bitcoin_dir.join("blocks"), rpc);
let reader = Reader::new(bitcoin_dir.join("blocks"), rpc);
let outputs_dir = Path::new("../../_outputs");
@@ -39,7 +39,7 @@ pub fn main() -> Result<()> {
.enable_all()
.build()?
.block_on(async {
let interface = Interface::build(&parser, &indexer, &computer);
let interface = Interface::build(&reader, &indexer, &computer);
let server = Server::new(interface, None);
@@ -51,9 +51,9 @@ pub fn main() -> Result<()> {
loop {
let block_count = rpc.get_block_count()?;
let starting_indexes = indexer.index(&parser, rpc, &exit, true)?;
let starting_indexes = indexer.index(&reader, rpc, &exit, true)?;
computer.compute(&indexer, starting_indexes, &parser, &exit)?;
computer.compute(&indexer, starting_indexes, &reader, &exit)?;
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))

View File

@@ -17,7 +17,7 @@ brk_structs = { workspace = true }
byteview6 = { version = "=0.6.1", package = "byteview" }
byteview8 = { version = "~0.8.0", package = "byteview" }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
# fjall3 = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
rustc-hash = { workspace = true }

View File

@@ -1,9 +1,11 @@
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path};
use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, path::Path};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview6::ByteView;
use fjall2::{InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode};
use fjall2::{
InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode, ValueType,
};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
@@ -170,14 +172,15 @@ where
let mut items = self
.puts
.drain()
.map(|(key, value)| InnerItem::Value { key, value })
.chain(self.dels.drain().map(|key| InnerItem::WeakTombstone(key)))
.map(|(key, value)| Item::Value { key, value })
.chain(self.dels.drain().map(|key| Item::Tomb(key)))
.collect::<Vec<_>>();
items.sort_unstable();
self.keyspace
.batch()
.commit_single_partition(&self.partition, items)?;
self.keyspace.batch().commit_partition(
&self.partition,
items.into_iter().map(InnerItem::from).collect::<Vec<_>>(),
)?;
Ok(())
}
@@ -208,3 +211,57 @@ where
self.meta.version()
}
}
pub enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
impl<K: Ord, V> Ord for Item<K, V> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
impl<K, V> Item<K, V> {
fn key(&self) -> &K {
match self {
Self::Value { key, .. } | Self::Tomb(key) => key,
}
}
}
impl<K, V> From<Item<K, V>> for InnerItem
where
K: Into<ByteView>,
V: Into<ByteView>,
{
fn from(value: Item<K, V>) -> Self {
match value {
Item::Value { key, value } => Self {
key: key.into().into(),
value: value.into().into(),
value_type: ValueType::Value,
},
Item::Tomb(key) => Self {
key: key.into().into(),
value: [].into(),
value_type: ValueType::WeakTombstone,
},
}
}
}

View File

@@ -18,7 +18,7 @@ byteview = { workspace = true }
derive_deref = { workspace = true }
itoa = "1.0.15"
jiff = { workspace = true }
num_enum = "0.7.4"
num_enum = "0.7.5"
rapidhash = "4.1.0"
ryu = "1.0.20"
schemars = { workspace = true }

View File

@@ -56,13 +56,13 @@ impl Deref for Block {
}
#[derive(Debug)]
pub struct ParsedBlock {
pub struct ReadBlock {
block: Block,
metadata: BlkMetadata,
tx_metadata: Vec<BlkMetadata>,
}
impl From<(Block, BlkMetadata, Vec<BlkMetadata>)> for ParsedBlock {
impl From<(Block, BlkMetadata, Vec<BlkMetadata>)> for ReadBlock {
fn from((block, metadata, tx_metadata): (Block, BlkMetadata, Vec<BlkMetadata>)) -> Self {
Self {
block,
@@ -72,7 +72,7 @@ impl From<(Block, BlkMetadata, Vec<BlkMetadata>)> for ParsedBlock {
}
}
impl ParsedBlock {
impl ReadBlock {
pub fn metadata(&self) -> &BlkMetadata {
&self.metadata
}
@@ -82,7 +82,7 @@ impl ParsedBlock {
}
}
impl Deref for ParsedBlock {
impl Deref for ReadBlock {
type Target = Block;
fn deref(&self) -> &Self::Target {
&self.block