From 998db1beedc9a027f5f1fab97c6a2102e47c6cc2 Mon Sep 17 00:00:00 2001 From: nym21 Date: Wed, 10 Dec 2025 13:22:35 +0100 Subject: [PATCH] global: snapshot --- Cargo.lock | 40 +++ Cargo.toml | 3 +- crates/brk_bundler/Cargo.toml | 4 +- .../brk_computer/examples/computer_bench.rs | 21 +- crates/brk_computer/src/chain.rs | 1 - crates/brk_computer/src/indexes.rs | 2 +- crates/brk_error/src/lib.rs | 52 ++++ crates/brk_fetcher/examples/fetch_kraken.rs | 9 + .../examples/{main.rs => fetcher.rs} | 18 +- crates/brk_fetcher/src/binance.rs | 130 +++++----- crates/brk_fetcher/src/brk.rs | 51 ++-- crates/brk_fetcher/src/kraken.rs | 132 +++++----- crates/brk_fetcher/src/lib.rs | 241 ++++++++---------- crates/brk_fetcher/src/ohlc.rs | 81 ++++++ crates/brk_fetcher/src/retry.rs | 34 ++- crates/brk_fetcher/src/source.rs | 131 ++++++++++ crates/brk_grouper/src/amount_filter.rs | 6 +- crates/brk_grouper/src/filter.rs | 3 +- crates/brk_grouper/src/time_filter.rs | 108 ++++---- crates/brk_indexer/examples/indexer_bench.rs | 5 +- crates/brk_indexer/examples/indexer_bench2.rs | 3 +- crates/brk_indexer/src/processor.rs | 33 ++- 22 files changed, 698 insertions(+), 410 deletions(-) create mode 100644 crates/brk_fetcher/examples/fetch_kraken.rs rename crates/brk_fetcher/examples/{main.rs => fetcher.rs} (71%) create mode 100644 crates/brk_fetcher/src/ohlc.rs create mode 100644 crates/brk_fetcher/src/source.rs diff --git a/Cargo.lock b/Cargo.lock index 5389d6789..fb5c9e5d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -847,6 +847,8 @@ dependencies = [ [[package]] name = "brk_rolldown" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4c28de774e0ef00d245c08e58ad26c912afd40149897af597bd50a0f65a248" dependencies = [ "anyhow", "append-only-vec", @@ -899,6 +901,8 @@ dependencies = [ [[package]] name = "brk_rolldown_common" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "468af65fa5bab1026237d36c450109018a7f2b1232c22bf34c5207b0a88c6ae1" dependencies = [ "anyhow", "arcstr", @@ -929,6 +933,8 @@ dependencies = [ [[package]] name = "brk_rolldown_dev_common" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e266f6c9721ef770e47884a626f930f6b9d195fd578195c00c679ab50eb0841" dependencies = [ "brk_rolldown_common", "brk_rolldown_error", @@ -938,6 +944,8 @@ dependencies = [ [[package]] name = "brk_rolldown_devtools" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bc520612874708cfa1476f32d706756403d0cee519fc66f276c70af2a60807d" dependencies = [ "blake3", "brk_rolldown_devtools_action", @@ -952,6 +960,8 @@ dependencies = [ [[package]] name = "brk_rolldown_devtools_action" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a13eb16a0561095b134f0e39dac08b320150c249e8c57bf919154e0d4d5f310a" dependencies = [ "serde", "ts-rs", @@ -960,6 +970,8 @@ dependencies = [ [[package]] name = "brk_rolldown_ecmascript" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8582c0a01de8978aa1fab9b14e96943104aeb9ecacbb4f427fec32b9c247b7c" dependencies = [ "arcstr", "brk_rolldown_error", @@ -971,6 +983,8 @@ dependencies = [ [[package]] name = "brk_rolldown_ecmascript_utils" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f33e703d8d85cdd73fca0d1699c559af4fba345b3110d49601d3f9b6dd5b436c" dependencies = [ "brk_rolldown_common", "oxc", @@ -980,6 +994,8 @@ dependencies = [ [[package]] name = "brk_rolldown_error" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcc15aad987d1c6c8e85defe85a0df3f6fc096089bd6fc994b3b3d54e4d1233c" dependencies = [ "anyhow", "arcstr", @@ -998,6 +1014,8 @@ dependencies = [ [[package]] name = "brk_rolldown_fs" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a993d55c60467e62a1c8093a7f092d4690bf04f39f6f99dd701cd39529aa610" dependencies = [ "oxc_resolver", "vfs", @@ -1006,6 +1024,8 @@ dependencies = [ [[package]] name = "brk_rolldown_plugin" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21c0cc2fd5929acb738d3e5d6f1b43c44f71935d6c4b1cb1f67801adb85c84fd" dependencies = [ "anyhow", "arcstr", @@ -1034,6 +1054,8 @@ dependencies = [ [[package]] name = "brk_rolldown_plugin_chunk_import_map" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c201332bec8851626e62c6320b3fdc08e17a12764db111335903a554f3b8c59" dependencies = [ "arcstr", "brk_rolldown_common", @@ -1047,6 +1069,8 @@ dependencies = [ [[package]] name = "brk_rolldown_plugin_data_uri" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc1e9e46d4f01c138de7234153b2d8947a78de88503cc15e24fe9607bcd0614" dependencies = [ "arcstr", "base64-simd", @@ -1060,6 +1084,8 @@ dependencies = [ [[package]] name = "brk_rolldown_plugin_hmr" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87fbad836388ac021815de74523aef5960bdc951a8dbbaf4c186a782da8bbcd5" dependencies = [ "arcstr", "brk_rolldown_common", @@ -1070,6 +1096,8 @@ dependencies = [ [[package]] name = "brk_rolldown_plugin_oxc_runtime" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e1fa517f1cdd1b84feb1d3f3e3a13a34cea498c2cfa7f216a49e634208336fc" dependencies = [ "arcstr", "brk_rolldown_plugin", @@ -1080,6 +1108,8 @@ dependencies = [ [[package]] name = "brk_rolldown_resolver" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d231a0dac24bdabca95501d3c4914db2dfd851957cc1dfc7ab163418fd64e7" dependencies = [ "anyhow", "arcstr", @@ -1095,6 +1125,8 @@ dependencies = [ [[package]] name = "brk_rolldown_sourcemap" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388f6aad886ebcb9a93ee6a88bcea70629016617296caf4643fca935cab2f044" dependencies = [ "brk_rolldown_utils", "memchr", @@ -1106,6 +1138,8 @@ dependencies = [ [[package]] name = "brk_rolldown_std_utils" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade8f44d7f6217501f7d78baa25d30c2a953d1455026933ec67ca178b8269fdf" dependencies = [ "regex", ] @@ -1113,6 +1147,8 @@ dependencies = [ [[package]] name = "brk_rolldown_tracing" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9feb3e6d9066865a18186e6791de0061910cea10ca89cdae4cc3737db62323ab" dependencies = [ "tracing", "tracing-chrome", @@ -1122,6 +1158,8 @@ dependencies = [ [[package]] name = "brk_rolldown_utils" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115e79f091452c0dc535d17c81342e7bc227e135c7d8c21e574aaa209fee24eb" dependencies = [ "anyhow", "arcstr", @@ -1214,6 +1252,8 @@ dependencies = [ [[package]] name = "brk_string_wizard" version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd91a55e6cb47ded6db1e618443e0ce29ecb31b8f1196dcf526a738cfd24a17" dependencies = [ "memchr", "oxc_index", diff --git a/Cargo.toml b/Cargo.toml index 11e460309..6053dfd44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,8 @@ serde_derive = "1.0.228" serde_json = { version = "1.0.145", features = ["float_roundtrip"] } tokio = { version = "1.48.0", features = ["rt-multi-thread"] } vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] } -# vecdb = { version = "0.3.20", features = ["derive"] } +# vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] } +# vecdb = { version = "0.3.20", features = ["derive", "serde_json", "pco"] } [workspace.metadata.release] shared-version = true diff --git a/crates/brk_bundler/Cargo.toml b/crates/brk_bundler/Cargo.toml index debb67f7d..379fae737 100644 --- a/crates/brk_bundler/Cargo.toml +++ b/crates/brk_bundler/Cargo.toml @@ -11,7 +11,7 @@ build = "build.rs" [dependencies] log = { workspace = true } notify = "8.2.0" -rolldown = { path = "../../../rolldown/crates/rolldown", package = "brk_rolldown" } -# rolldown = { version = "0.5.0", package = "brk_rolldown", features = ["experimental"] } +# rolldown = { path = "../../../rolldown/crates/rolldown", package = "brk_rolldown" } +rolldown = { version = "0.5.1", package = "brk_rolldown" } sugar_path = "1.2.1" tokio = { workspace = true } diff --git a/crates/brk_computer/examples/computer_bench.rs b/crates/brk_computer/examples/computer_bench.rs index d6e34e1b3..6868f373e 100644 --- a/crates/brk_computer/examples/computer_bench.rs +++ b/crates/brk_computer/examples/computer_bench.rs @@ -4,11 +4,12 @@ use brk_bencher::Bencher; use brk_computer::Computer; use brk_error::Result; use brk_fetcher::Fetcher; -use brk_indexer::{Indexer, Indexes}; +use brk_indexer::Indexer; +use brk_iterator::Blocks; use brk_reader::Reader; use brk_rpc::{Auth, Client}; use log::{debug, info}; -use vecdb::Exit; +use vecdb::{AnyStoredVec, Exit}; pub fn main() -> Result<()> { // Can't increase main thread's stack size, thus we need to use another thread @@ -36,12 +37,23 @@ fn run() -> Result<()> { let reader = Reader::new(bitcoin_dir.join("blocks"), &client); - let indexer = Indexer::forced_import(&outputs_dir)?; + let blocks = Blocks::new(&client, &reader); + + let mut indexer = Indexer::forced_import(&outputs_dir)?; let fetcher = Fetcher::import(true, None)?; let mut computer = Computer::forced_import(&outputs_benches_dir, &indexer, Some(fetcher))?; + dbg!( + computer + .indexes + .txinindex_to_txoutindex + .region() + .meta() + .reserved() + ); + let mut bencher = Bencher::from_cargo_env(env!("CARGO_PKG_NAME"), &outputs_dir.join("computed"))?; bencher.start()?; @@ -55,8 +67,7 @@ fn run() -> Result<()> { }); let i = Instant::now(); - // let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?; - let starting_indexes = Indexes::default(); + let starting_indexes = indexer.index(&blocks, &client, &exit)?; info!("Done in {:?}", i.elapsed()); let i = Instant::now(); diff --git a/crates/brk_computer/src/chain.rs b/crates/brk_computer/src/chain.rs index 9e6a53748..b6e85a417 100644 --- a/crates/brk_computer/src/chain.rs +++ b/crates/brk_computer/src/chain.rs @@ -857,7 +857,6 @@ impl Vecs { compute_indexes_to_tx_vany(&mut self.indexes_to_tx_v2, TxVersion::TWO)?; compute_indexes_to_tx_vany(&mut self.indexes_to_tx_v3, TxVersion::THREE)?; - // Because random reads are needed, reading directly from the mmap is faster than using buffered iterators let txoutindex_to_value = &indexer.vecs.txoutindex_to_value; let txoutindex_to_value_reader = indexer.vecs.txoutindex_to_value.create_reader(); self.txinindex_to_value.compute_transform( diff --git a/crates/brk_computer/src/indexes.rs b/crates/brk_computer/src/indexes.rs index 098df1253..7184db8e1 100644 --- a/crates/brk_computer/src/indexes.rs +++ b/crates/brk_computer/src/indexes.rs @@ -275,7 +275,7 @@ impl Vecs { self.txindex_to_input_count.compute_count_from_indexes( starting_indexes.txindex, &indexer.vecs.txindex_to_first_txinindex, - &self.txinindex_to_txoutindex, + &indexer.vecs.txinindex_to_outpoint, exit, )?; diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index 7bba379c7..7e6d827fc 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -212,3 +212,55 @@ impl fmt::Display for Error { } impl std::error::Error for Error {} + +impl Error { + /// Returns true if this network/fetch error indicates a permanent/blocking condition + /// that won't be resolved by retrying (e.g., DNS failure, connection refused, blocked endpoint). + /// Returns false for transient errors worth retrying (timeouts, rate limits, server errors). + pub fn is_network_permanently_blocked(&self) -> bool { + match self { + Error::Minreq(e) => is_minreq_error_permanent(e), + Error::IO(e) => is_io_error_permanent(e), + // Other errors are data/parsing related, not network - treat as transient + _ => false, + } + } +} + +fn is_minreq_error_permanent(e: &minreq::Error) -> bool { + use minreq::Error::*; + match e { + // DNS resolution failure - likely blocked or misconfigured + IoError(io_err) => is_io_error_permanent(io_err), + // Check error message for common blocking indicators + other => { + let msg = format!("{:?}", other); + // DNS/connection failures + msg.contains("nodename nor servname") + || msg.contains("Name or service not known") + || msg.contains("No such host") + || msg.contains("connection refused") + || msg.contains("Connection refused") + // SSL/TLS failures (often due to blocking/MITM) + || msg.contains("certificate") + || msg.contains("SSL") + || msg.contains("TLS") + || msg.contains("handshake") + } + } +} + +fn is_io_error_permanent(e: &std::io::Error) -> bool { + use std::io::ErrorKind::*; + match e.kind() { + // Permanent errors + ConnectionRefused | PermissionDenied | AddrNotAvailable => true, + // Check the error message for DNS failures + _ => { + let msg = e.to_string(); + msg.contains("nodename nor servname") + || msg.contains("Name or service not known") + || msg.contains("No such host") + } + } +} diff --git a/crates/brk_fetcher/examples/fetch_kraken.rs b/crates/brk_fetcher/examples/fetch_kraken.rs new file mode 100644 index 000000000..c22464dcf --- /dev/null +++ b/crates/brk_fetcher/examples/fetch_kraken.rs @@ -0,0 +1,9 @@ +use brk_error::Result; +use brk_fetcher::Kraken; + +fn main() -> Result<()> { + brk_logger::init(None)?; + let _ = dbg!(Kraken::fetch_1d()); + let _ = dbg!(Kraken::fetch_1mn()); + Ok(()) +} diff --git a/crates/brk_fetcher/examples/main.rs b/crates/brk_fetcher/examples/fetcher.rs similarity index 71% rename from crates/brk_fetcher/examples/main.rs rename to crates/brk_fetcher/examples/fetcher.rs index 0972541b9..1283ebf2c 100644 --- a/crates/brk_fetcher/examples/main.rs +++ b/crates/brk_fetcher/examples/fetcher.rs @@ -9,20 +9,20 @@ fn main() -> Result<()> { dbg!(brk.get_from_height(Height::new(900_000))?); dbg!(brk.get_from_date(Date::new(2025, 6, 7))?); - let mut fetcher = Fetcher::import(true, None)?; + let mut fetcher = Fetcher::new(true, None)?; - Binance::fetch_1d().map(|b| { + let _ = Binance::fetch_1d().map(|b| { dbg!(b.last_key_value()); - })?; - Kraken::fetch_1d().map(|b| { + }); + let _ = Kraken::fetch_1d().map(|b| { dbg!(b.last_key_value()); - })?; - Binance::fetch_1mn().map(|b| { + }); + let _ = Binance::fetch_1mn().map(|b| { dbg!(b.last_key_value()); - })?; - Kraken::fetch_1mn().map(|b| { + }); + let _ = Kraken::fetch_1mn().map(|b| { dbg!(b.last_key_value()); - })?; + }); dbg!(fetcher.get_date(Date::new(2025, 6, 5))?); dbg!(fetcher.get_height( diff --git a/crates/brk_fetcher/src/binance.rs b/crates/brk_fetcher/src/binance.rs index 6d85ed40b..9fb473a8d 100644 --- a/crates/brk_fetcher/src/binance.rs +++ b/crates/brk_fetcher/src/binance.rs @@ -6,11 +6,14 @@ use std::{ }; use brk_error::{Error, Result}; -use brk_types::{Cents, OHLCCents, Timestamp}; +use brk_types::{Date, Height, OHLCCents, Timestamp}; use log::info; use serde_json::Value; -use crate::{Close, Date, Dollars, Fetcher, High, Low, Open, default_retry}; +use crate::{ + PriceSource, default_retry, + ohlc::{compute_ohlc_from_range, ohlc_from_array, timestamp_from_ms, date_from_timestamp}, +}; #[derive(Clone)] pub struct Binance { @@ -35,44 +38,43 @@ impl Binance { timestamp: Timestamp, previous_timestamp: Option, ) -> Result { + // Try live API data first if self._1mn.is_none() || self._1mn.as_ref().unwrap().last_key_value().unwrap().0 <= ×tamp { self._1mn.replace(Self::fetch_1mn()?); } - let res = Fetcher::find_height_ohlc( + let res = compute_ohlc_from_range( self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, - "binance 1mn", + "Binance 1mn", ); if res.is_ok() { return res; } + // Fall back to HAR file data if self.har.is_none() { self.har.replace(self.read_har().unwrap_or_default()); } - Fetcher::find_height_ohlc( + compute_ohlc_from_range( self.har.as_ref().unwrap(), timestamp, previous_timestamp, - "binance har", + "Binance HAR", ) } pub fn fetch_1mn() -> Result> { - info!("Fetching 1mn prices from Binance..."); - default_retry(|_| { - Self::json_to_timestamp_to_ohlc(&serde_json::from_slice( - minreq::get(Self::url("interval=1m&limit=1000")) - .send()? - .as_bytes(), - )?) + let url = Self::url("interval=1m&limit=1000"); + info!("Fetching {url} ..."); + let json: Value = serde_json::from_slice(minreq::get(url).send()?.as_bytes())?; + Self::parse_ohlc_array(&json) }) } @@ -90,12 +92,11 @@ impl Binance { } pub fn fetch_1d() -> Result> { - info!("Fetching daily prices from Binance..."); - default_retry(|_| { - Self::json_to_date_to_ohlc(&serde_json::from_slice( - minreq::get(Self::url("interval=1d")).send()?.as_bytes(), - )?) + let url = Self::url("interval=1d"); + info!("Fetching {url} ..."); + let json: Value = serde_json::from_slice(minreq::get(url).send()?.as_bytes())?; + Self::parse_date_ohlc_array(&json) }) } @@ -167,8 +168,8 @@ impl Binance { } let text = text.unwrap().as_str().unwrap(); - - Self::json_to_timestamp_to_ohlc(&serde_json::from_str(text).unwrap()) + let json: Value = serde_json::from_str(text).unwrap(); + Self::parse_ohlc_array(&json) }) .try_fold(BTreeMap::default(), |mut all, res| { all.append(&mut res?); @@ -176,63 +177,56 @@ impl Binance { }) } - fn json_to_timestamp_to_ohlc(json: &Value) -> Result> { - Self::json_to_btree(json, Self::array_to_timestamp_and_ohlc) - } - - fn json_to_date_to_ohlc(json: &Value) -> Result> { - Self::json_to_btree(json, Self::array_to_date_and_ohlc) - } - - fn json_to_btree(json: &Value, fun: F) -> Result> - where - F: Fn(&Value) -> Result<(K, V)>, - K: Ord, - { - json.as_array() - .ok_or(Error::Str("Expect to be an array"))? + fn parse_ohlc_array(json: &Value) -> Result> { + let result = json + .as_array() + .ok_or(Error::Str("Expected JSON array"))? .iter() - .map(fun) - .collect::, _>>() + .filter_map(|v| v.as_array()) + .map(|arr| { + let ts = arr.first().and_then(|v| v.as_u64()).unwrap_or(0); + (timestamp_from_ms(ts), ohlc_from_array(arr)) + }) + .collect(); + Ok(result) } - fn array_to_timestamp_and_ohlc(array: &Value) -> Result<(Timestamp, OHLCCents)> { - let array = array.as_array().ok_or(Error::Str("Expect to be array"))?; - - let timestamp = Timestamp::from((array.first().unwrap().as_u64().unwrap() / 1_000) as u32); - - let get_cents = |index: usize| { - Cents::from(Dollars::from( - array - .get(index) - .unwrap() - .as_str() - .unwrap() - .parse::() - .unwrap(), - )) - }; - - Ok(( - timestamp, - OHLCCents::from(( - Open::new(get_cents(1)), - High::new(get_cents(2)), - Low::new(get_cents(3)), - Close::new(get_cents(4)), - )), - )) - } - - fn array_to_date_and_ohlc(array: &Value) -> Result<(Date, OHLCCents)> { - Self::array_to_timestamp_and_ohlc(array).map(|(t, ohlc)| (Date::from(t), ohlc)) + fn parse_date_ohlc_array(json: &Value) -> Result> { + Self::parse_ohlc_array(json).map(|map| { + map.into_iter() + .map(|(ts, ohlc)| (date_from_timestamp(ts), ohlc)) + .collect() + }) } fn url(query: &str) -> String { format!("https://api.binance.com/api/v3/uiKlines?symbol=BTCUSDT&{query}") } - pub fn clear(&mut self) { +} + +impl PriceSource for Binance { + fn name(&self) -> &'static str { + "Binance" + } + + fn get_date(&mut self, date: Date) -> Option> { + Some(self.get_from_1d(&date)) + } + + fn get_1mn( + &mut self, + timestamp: Timestamp, + previous_timestamp: Option, + ) -> Option> { + Some(self.get_from_1mn(timestamp, previous_timestamp)) + } + + fn get_height(&mut self, _height: Height) -> Option> { + None // Binance doesn't support height-based queries + } + + fn clear(&mut self) { self._1d.take(); self._1mn.take(); } diff --git a/crates/brk_fetcher/src/brk.rs b/crates/brk_fetcher/src/brk.rs index 3027405f7..44b369d09 100644 --- a/crates/brk_fetcher/src/brk.rs +++ b/crates/brk_fetcher/src/brk.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; use brk_error::{Error, Result}; -use brk_types::{Cents, CheckedSub, Date, DateIndex, Height, OHLCCents}; +use brk_types::{Cents, CheckedSub, Close, Date, DateIndex, Dollars, Height, High, Low, OHLCCents, Open, Timestamp}; use log::info; use serde_json::Value; -use crate::{Close, Dollars, High, Low, Open, default_retry}; +use crate::{PriceSource, default_retry}; #[derive(Default, Clone)] #[allow(clippy::upper_case_acronyms)] @@ -25,12 +25,8 @@ impl BRK { if !self.height_to_ohlc.contains_key(&key) || ((key + self.height_to_ohlc.get(&key).unwrap().len()) <= height) { - self.height_to_ohlc.insert( - key, - Self::fetch_height_prices(key).inspect_err(|e| { - dbg!(e); - })?, - ); + self.height_to_ohlc + .insert(key, Self::fetch_height_prices(key)?); } self.height_to_ohlc @@ -42,14 +38,13 @@ impl BRK { } fn fetch_height_prices(height: Height) -> Result> { - info!("Fetching BRK height {height} prices..."); - default_retry(|_| { let url = format!( "{API_URL}/height-to-price-ohlc?from={}&to={}", height, height + CHUNK_SIZE ); + info!("Fetching {url} ..."); let body: Value = serde_json::from_slice(minreq::get(url).send()?.as_bytes())?; @@ -70,12 +65,8 @@ impl BRK { if !self.dateindex_to_ohlc.contains_key(&key) || ((key + self.dateindex_to_ohlc.get(&key).unwrap().len()) <= dateindex) { - self.dateindex_to_ohlc.insert( - key, - Self::fetch_date_prices(key).inspect_err(|e| { - dbg!(e); - })?, - ); + self.dateindex_to_ohlc + .insert(key, Self::fetch_date_prices(key)?); } self.dateindex_to_ohlc @@ -87,14 +78,13 @@ impl BRK { } fn fetch_date_prices(dateindex: DateIndex) -> Result> { - info!("Fetching BRK dateindex {dateindex} prices..."); - default_retry(|_| { let url = format!( "{API_URL}/dateindex-to-price-ohlc?from={}&to={}", dateindex, dateindex + CHUNK_SIZE ); + info!("Fetching {url}..."); let body: Value = serde_json::from_slice(minreq::get(url).send()?.as_bytes())?; @@ -128,7 +118,30 @@ impl BRK { ))) } - pub fn clear(&mut self) { +} + +impl PriceSource for BRK { + fn name(&self) -> &'static str { + "BRK" + } + + fn get_date(&mut self, date: Date) -> Option> { + Some(self.get_from_date(date)) + } + + fn get_1mn( + &mut self, + _timestamp: Timestamp, + _previous_timestamp: Option, + ) -> Option> { + None // BRK doesn't support timestamp-based queries + } + + fn get_height(&mut self, height: Height) -> Option> { + Some(self.get_from_height(height)) + } + + fn clear(&mut self) { self.height_to_ohlc.clear(); self.dateindex_to_ohlc.clear(); } diff --git a/crates/brk_fetcher/src/kraken.rs b/crates/brk_fetcher/src/kraken.rs index 45d81fee1..26abfd2eb 100644 --- a/crates/brk_fetcher/src/kraken.rs +++ b/crates/brk_fetcher/src/kraken.rs @@ -1,11 +1,14 @@ use std::collections::BTreeMap; use brk_error::{Error, Result}; -use brk_types::{Cents, Close, Date, Dollars, High, Low, OHLCCents, Open, Timestamp}; +use brk_types::{Date, Height, OHLCCents, Timestamp}; use log::info; use serde_json::Value; -use crate::{Fetcher, default_retry}; +use crate::{ + PriceSource, default_retry, + ohlc::{compute_ohlc_from_range, ohlc_from_array, timestamp_from_secs, date_from_timestamp}, +}; #[derive(Default, Clone)] pub struct Kraken { @@ -14,7 +17,7 @@ pub struct Kraken { } impl Kraken { - pub fn get_from_1mn( + fn get_from_1mn( &mut self, timestamp: Timestamp, previous_timestamp: Option, @@ -24,27 +27,26 @@ impl Kraken { { self._1mn.replace(Self::fetch_1mn()?); } - Fetcher::find_height_ohlc( + compute_ohlc_from_range( self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, - "kraken 1m", + "Kraken 1mn", ) } pub fn fetch_1mn() -> Result> { - info!("Fetching 1mn prices from Kraken..."); - default_retry(|_| { - Self::json_to_timestamp_to_ohlc(&serde_json::from_slice( - minreq::get(Self::url(1)).send()?.as_bytes(), - )?) + let url = Self::url(1); + info!("Fetching {url} ..."); + let json: Value = serde_json::from_slice(minreq::get(url).send()?.as_bytes())?; + Self::parse_ohlc_response(&json) }) } - pub fn get_from_1d(&mut self, date: &Date) -> Result { + fn get_from_1d(&mut self, date: &Date) -> Result { if self._1d.is_none() || self._1d.as_ref().unwrap().last_key_value().unwrap().0 <= date { - self._1d.replace(Kraken::fetch_1d()?); + self._1d.replace(Self::fetch_1d()?); } self._1d .as_ref() @@ -55,80 +57,66 @@ impl Kraken { } pub fn fetch_1d() -> Result> { - info!("Fetching daily prices from Kraken..."); - default_retry(|_| { - Self::json_to_date_to_ohlc(&serde_json::from_slice( - minreq::get(Self::url(1440)).send()?.as_bytes(), - )?) + let url = Self::url(1440); + info!("Fetching {url} ..."); + let json: Value = serde_json::from_slice(minreq::get(url).send()?.as_bytes())?; + Self::parse_date_ohlc_response(&json) }) } - fn json_to_timestamp_to_ohlc(json: &Value) -> Result> { - Self::json_to_btree(json, Self::array_to_timestamp_and_ohlc) - } - - fn json_to_date_to_ohlc(json: &Value) -> Result> { - Self::json_to_btree(json, Self::array_to_date_and_ohlc) - } - - fn json_to_btree(json: &Value, fun: F) -> Result> - where - F: Fn(&Value) -> Result<(K, V)>, - K: Ord, - { - json.as_object() - .ok_or(Error::Str("Expect to be an object"))? + /// Parse Kraken's nested JSON response: { result: { XXBTZUSD: [...] } } + fn parse_ohlc_response(json: &Value) -> Result> { + let result = json .get("result") - .ok_or(Error::Str("Expect object to have result"))? - .as_object() - .ok_or(Error::Str("Expect to be an object"))? - .get("XXBTZUSD") - .ok_or(Error::Str("Expect to have XXBTZUSD"))? - .as_array() - .ok_or(Error::Str("Expect to be an array"))? + .and_then(|r| r.get("XXBTZUSD")) + .and_then(|v| v.as_array()) + .ok_or(Error::Str("Invalid Kraken response format"))? .iter() - .map(fun) - .collect::, _>>() + .filter_map(|v| v.as_array()) + .map(|arr| { + let ts = arr.first().and_then(|v| v.as_u64()).unwrap_or(0); + (timestamp_from_secs(ts), ohlc_from_array(arr)) + }) + .collect(); + Ok(result) } - fn array_to_timestamp_and_ohlc(array: &Value) -> Result<(Timestamp, OHLCCents)> { - let array = array.as_array().ok_or(Error::Str("Expect to be array"))?; - - let timestamp = Timestamp::from(array.first().unwrap().as_u64().unwrap() as u32); - - let get_cents = |index: usize| { - Cents::from(Dollars::from( - array - .get(index) - .unwrap() - .as_str() - .unwrap() - .parse::() - .unwrap(), - )) - }; - - Ok(( - timestamp, - OHLCCents::from(( - Open::new(get_cents(1)), - High::new(get_cents(2)), - Low::new(get_cents(3)), - Close::new(get_cents(4)), - )), - )) - } - - fn array_to_date_and_ohlc(array: &Value) -> Result<(Date, OHLCCents)> { - Self::array_to_timestamp_and_ohlc(array).map(|(t, ohlc)| (Date::from(t), ohlc)) + fn parse_date_ohlc_response(json: &Value) -> Result> { + Self::parse_ohlc_response(json).map(|map| { + map.into_iter() + .map(|(ts, ohlc)| (date_from_timestamp(ts), ohlc)) + .collect() + }) } fn url(interval: usize) -> String { format!("https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval={interval}") } +} - pub fn clear(&mut self) { +impl PriceSource for Kraken { + fn name(&self) -> &'static str { + "Kraken" + } + + fn get_date(&mut self, date: Date) -> Option> { + Some(self.get_from_1d(&date)) + } + + fn get_1mn( + &mut self, + timestamp: Timestamp, + previous_timestamp: Option, + ) -> Option> { + Some(self.get_from_1mn(timestamp, previous_timestamp)) + } + + fn get_height(&mut self, _height: Height) -> Option> { + None // Kraken doesn't support height-based queries + } + + fn clear(&mut self) { self._1d.take(); self._1mn.take(); } diff --git a/crates/brk_fetcher/src/lib.rs b/crates/brk_fetcher/src/lib.rs index 5d9749f84..048196c3a 100644 --- a/crates/brk_fetcher/src/lib.rs +++ b/crates/brk_fetcher/src/lib.rs @@ -1,74 +1,87 @@ #![doc = include_str!("../README.md")] -use std::{collections::BTreeMap, path::Path, thread::sleep, time::Duration}; +use std::{path::Path, thread::sleep, time::Duration}; use brk_error::{Error, Result}; -use brk_types::{Close, Date, Dollars, Height, High, Low, OHLCCents, Open, Timestamp}; +use brk_types::{Date, Height, OHLCCents, Timestamp}; use log::info; mod binance; mod brk; mod kraken; +mod ohlc; mod retry; +mod source; pub use binance::*; pub use brk::*; pub use kraken::*; +pub use ohlc::compute_ohlc_from_range; use retry::*; +pub use source::{PriceSource, TrackedSource}; -const TRIES: usize = 12 * 60; +const MAX_RETRIES: usize = 12 * 60; // 12 hours of retrying #[derive(Clone)] pub struct Fetcher { - binance: Option, - kraken: Option, - brk: BRK, + pub binance: Option>, + pub kraken: Option>, + pub brk: TrackedSource, } impl Fetcher { pub fn import(exchanges: bool, hars_path: Option<&Path>) -> Result { + Self::new(exchanges, hars_path) + } + + pub fn new(exchanges: bool, hars_path: Option<&Path>) -> Result { Ok(Self { - binance: exchanges.then(|| Binance::init(hars_path)), - kraken: exchanges.then(Kraken::default), - brk: BRK::default(), + binance: exchanges.then(|| TrackedSource::new(Binance::init(hars_path))), + kraken: exchanges.then(|| TrackedSource::new(Kraken::default())), + brk: TrackedSource::new(BRK::default()), }) } - pub fn get_date(&mut self, date: Date) -> Result { - self.get_date_(date, 0) + /// Iterate over all active sources in priority order + fn for_each_source(&mut self, mut f: F) + where + F: FnMut(&mut dyn PriceSource), + { + if let Some(binance) = &mut self.binance { + f(binance); + } + if let Some(kraken) = &mut self.kraken { + f(kraken); + } + f(&mut self.brk); } - fn get_date_(&mut self, date: Date, tries: usize) -> Result { - self.kraken - .as_mut() - .map_or(Err(Error::Str("Kraken off")), |kraken| { - kraken.get_from_1d(&date) - }) - .or_else(|_| { - // eprintln!("{e}"); - self.binance - .as_mut() - .map_or(Err(Error::Str("Binance off")), |binance| { - binance.get_from_1d(&date) - }) - }) - .or_else(|_| { - // eprintln!("{e}"); - self.brk.get_from_date(date) - }) - .or_else(|e| { - sleep(Duration::from_secs(60)); + /// Try fetching from each source in order, return first success + fn try_sources(&mut self, mut fetch: F) -> Option> + where + F: FnMut(&mut dyn PriceSource) -> Option>, + { + if let Some(binance) = &mut self.binance + && let Some(Ok(ohlc)) = fetch(binance) + { + return Some(Ok(ohlc)); + } + if let Some(kraken) = &mut self.kraken + && let Some(Ok(ohlc)) = fetch(kraken) + { + return Some(Ok(ohlc)); + } + if let Some(Ok(ohlc)) = fetch(&mut self.brk) { + return Some(Ok(ohlc)); + } + None + } - if tries < TRIES { - self.clear(); - // dbg!(e, date, &self.binance._1d); - info!("Retrying to fetch date price..."); - self.get_date_(date, tries + 1) - } else { - info!("Failed to fetch date prices..."); - Err(e) - } - }) + pub fn get_date(&mut self, date: Date) -> Result { + self.fetch_with_retry( + |source| source.get_date(date), + || format!("Failed to fetch price for date {date}"), + ) } pub fn get_height( @@ -76,62 +89,25 @@ impl Fetcher { height: Height, timestamp: Timestamp, previous_timestamp: Option, - ) -> Result { - self.get_height_(height, timestamp, previous_timestamp, 0) - } - - fn get_height_( - &mut self, - height: Height, - timestamp: Timestamp, - previous_timestamp: Option, - tries: usize, ) -> Result { let timestamp = timestamp.floor_seconds(); - - if previous_timestamp.is_none() && height != Height::ZERO { - panic!("Shouldn't be possible"); - } - let previous_timestamp = previous_timestamp.map(|t| t.floor_seconds()); - let ohlc = self - .kraken - .as_mut() - .map_or(Err(Error::Str("Kraken off")), |kraken| { - kraken.get_from_1mn(timestamp, previous_timestamp) - }) - .unwrap_or_else(|_report| { - // eprintln!("{_report}"); - self.binance - .as_mut() - .map_or(Err(Error::Str("Binance off")), |binance| { - binance.get_from_1mn(timestamp, previous_timestamp) - }) - .unwrap_or_else(|_report| { - // // eprintln!("{_report}"); - self.brk.get_from_height(height).unwrap_or_else(|_report| { - // eprintln!("{_report}"); + if previous_timestamp.is_none() && height != Height::ZERO { + panic!("previous_timestamp required for non-genesis blocks"); + } - sleep(Duration::from_secs(60)); - - if tries < TRIES { - self.clear(); - - info!("Retrying to fetch height prices..."); - // dbg!((height, timestamp, previous_timestamp)); - - return self - .get_height_(height, timestamp, previous_timestamp, tries + 1) - .unwrap(); - } - - info!("Failed to fetch height prices"); - - let date = Date::from(timestamp); - // eprintln!("{e}"); - panic!( - " + self.fetch_with_retry( + |source| { + // Try 1mn data first, fall back to height-based + source + .get_1mn(timestamp, previous_timestamp) + .or_else(|| source.get_height(height)) + }, + || { + let date = Date::from(timestamp); + format!( + " Can't find the price for: height: {height} - date: {date} 1mn APIs are limited to the last 16 hours for Binance's and the last 10 hours for Kraken's How to fix this: @@ -145,64 +121,49 @@ How to fix this: 7. Go back to the dev tools 8. Export to a har file (if there is no explicit button, click on the cog button) 9. Move the file to 'parser/imports/binance.har' - " - ) - }) - }) - }); - - Ok(ohlc) +" + ) + }, + ) } - fn find_height_ohlc( - tree: &BTreeMap, - timestamp: Timestamp, - previous_timestamp: Option, - name: &str, - ) -> Result { - let previous_ohlc = previous_timestamp - .map_or(Some(OHLCCents::default()), |previous_timestamp| { - tree.get(&previous_timestamp).cloned() - }); + /// Try each source in order, with retries on total failure + fn fetch_with_retry(&mut self, mut fetch: F, error_message: E) -> Result + where + F: FnMut(&mut dyn PriceSource) -> Option>, + E: Fn() -> String, + { + for retry in 0..=MAX_RETRIES { + if let Some(ohlc) = self.try_sources(&mut fetch) { + return ohlc; + } - let last_ohlc = tree.get(×tamp); - - if previous_ohlc.is_none() || last_ohlc.is_none() { - return Err(Error::String(format!("Couldn't find timestamp in {name}"))); + // All sources failed + if retry < MAX_RETRIES { + info!("All sources failed, retrying in 60s..."); + sleep(Duration::from_secs(60)); + self.clear_caches(); + } } - let previous_ohlc = previous_ohlc.unwrap(); - - let mut final_ohlc = OHLCCents::from(previous_ohlc.close); - - let start = previous_timestamp.unwrap_or(Timestamp::new(0)); - let end = timestamp; - - // Otherwise it's a re-org - if start < end { - tree.range(start..=end).skip(1).for_each(|(_, ohlc)| { - if ohlc.high > final_ohlc.high { - final_ohlc.high = ohlc.high - } - - if ohlc.low < final_ohlc.low { - final_ohlc.low = ohlc.low - } - - final_ohlc.close = ohlc.close; - }); - } - - Ok(final_ohlc) + Err(Error::String(error_message())) } + fn clear_caches(&mut self) { + self.for_each_source(|s| s.clear()); + } + + /// Clear caches and reset health state for all sources pub fn clear(&mut self) { - if let Some(kraken) = self.kraken.as_mut() { - kraken.clear() + if let Some(binance) = &mut self.binance { + binance.clear(); + binance.reset_health(); } - if let Some(binance) = self.binance.as_mut() { - binance.clear() + if let Some(kraken) = &mut self.kraken { + kraken.clear(); + kraken.reset_health(); } self.brk.clear(); + self.brk.reset_health(); } } diff --git a/crates/brk_fetcher/src/ohlc.rs b/crates/brk_fetcher/src/ohlc.rs new file mode 100644 index 000000000..585ea1ddf --- /dev/null +++ b/crates/brk_fetcher/src/ohlc.rs @@ -0,0 +1,81 @@ +use std::collections::BTreeMap; + +use brk_error::{Error, Result}; +use brk_types::{Cents, Close, Date, Dollars, High, Low, OHLCCents, Open, Timestamp}; + +/// Parse OHLC value from a JSON array element at given index +pub fn parse_cents(array: &[serde_json::Value], index: usize) -> Cents { + Cents::from(Dollars::from( + array + .get(index) + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0.0), + )) +} + +/// Build OHLCCents from array indices 1-4 (open, high, low, close) +pub fn ohlc_from_array(array: &[serde_json::Value]) -> OHLCCents { + OHLCCents::from(( + Open::new(parse_cents(array, 1)), + High::new(parse_cents(array, 2)), + Low::new(parse_cents(array, 3)), + Close::new(parse_cents(array, 4)), + )) +} + +/// Compute OHLC for a block from a time series of minute data. +/// Aggregates all candles between previous_timestamp and timestamp. +pub fn compute_ohlc_from_range( + tree: &BTreeMap, + timestamp: Timestamp, + previous_timestamp: Option, + source_name: &str, +) -> Result { + let previous_ohlc = previous_timestamp + .map_or(Some(OHLCCents::default()), |t| tree.get(&t).cloned()); + + let last_ohlc = tree.get(×tamp); + + if previous_ohlc.is_none() || last_ohlc.is_none() { + return Err(Error::String(format!( + "Couldn't find timestamp in {source_name}" + ))); + } + + let previous_ohlc = previous_ohlc.unwrap(); + let mut result = OHLCCents::from(previous_ohlc.close); + + let start = previous_timestamp.unwrap_or(Timestamp::new(0)); + let end = timestamp; + + // Skip if re-org (start >= end) + if start < end { + for (_, ohlc) in tree.range(start..=end).skip(1) { + if ohlc.high > result.high { + result.high = ohlc.high; + } + if ohlc.low < result.low { + result.low = ohlc.low; + } + result.close = ohlc.close; + } + } + + Ok(result) +} + +/// Parse timestamp from milliseconds (Binance format) +pub fn timestamp_from_ms(ms: u64) -> Timestamp { + Timestamp::from((ms / 1_000) as u32) +} + +/// Parse timestamp from seconds (Kraken format) +pub fn timestamp_from_secs(secs: u64) -> Timestamp { + Timestamp::from(secs as u32) +} + +/// Convert timestamp to date +pub fn date_from_timestamp(timestamp: Timestamp) -> Date { + Date::from(timestamp) +} diff --git a/crates/brk_fetcher/src/retry.rs b/crates/brk_fetcher/src/retry.rs index e40629e41..ec5e8e674 100644 --- a/crates/brk_fetcher/src/retry.rs +++ b/crates/brk_fetcher/src/retry.rs @@ -1,32 +1,38 @@ -use std::{fmt::Debug, thread::sleep, time::Duration}; +use std::{thread::sleep, time::Duration}; use brk_error::Result; use log::info; -pub fn default_retry(function: impl Fn(usize) -> Result) -> Result -where - T: Debug, -{ +pub fn default_retry(function: impl Fn(usize) -> Result) -> Result { retry(function, 5, 6) } -fn retry(function: impl Fn(usize) -> Result, sleep_in_s: u64, retries: usize) -> Result -where - T: Debug, -{ +fn retry(function: impl Fn(usize) -> Result, sleep_in_s: u64, retries: usize) -> Result { let mut i = 0; loop { let res = function(i); - if i == retries || res.is_ok() { + if res.is_ok() { return res; - } else { - let _ = dbg!(res); - info!("Failed, waiting {sleep_in_s} seconds..."); - sleep(Duration::from_secs(sleep_in_s)); } + // Check if error is permanent (blocked endpoint, DNS failure, etc.) + // If so, fail immediately without retrying + if let Err(ref e) = res + && e.is_network_permanently_blocked() + { + info!("Permanent network error detected (blocked/unreachable), skipping retries"); + return res; + } + + if i == retries { + return res; + } + + info!("Failed, waiting {sleep_in_s} seconds..."); + sleep(Duration::from_secs(sleep_in_s)); + i += 1; } } diff --git a/crates/brk_fetcher/src/source.rs b/crates/brk_fetcher/src/source.rs new file mode 100644 index 000000000..490c56b3c --- /dev/null +++ b/crates/brk_fetcher/src/source.rs @@ -0,0 +1,131 @@ +use std::time::{Duration, Instant}; + +use brk_error::{Error, Result}; +use brk_types::{Date, Height, OHLCCents, Timestamp}; +use log::info; + +/// Default cooldown period for unhealthy sources (5 minutes) +const DEFAULT_COOLDOWN_SECS: u64 = 5 * 60; + +/// A price data source that can fetch OHLC data by date or timestamp. +pub trait PriceSource { + fn name(&self) -> &'static str; + + /// Fetch daily OHLC for a date. Returns None if this source doesn't support date queries. + fn get_date(&mut self, date: Date) -> Option>; + + /// Fetch minute OHLC for a timestamp range. Returns None if unsupported. + fn get_1mn( + &mut self, + timestamp: Timestamp, + previous_timestamp: Option, + ) -> Option>; + + /// Fetch OHLC by block height. Returns None if unsupported. + fn get_height(&mut self, height: Height) -> Option>; + + /// Clear cached data + fn clear(&mut self); +} + +/// Wraps a price source with health tracking. +/// Automatically skips blocked/unreachable sources and rechecks after cooldown. +#[derive(Clone)] +pub struct TrackedSource { + source: T, + unhealthy_since: Option, + cooldown: Duration, +} + +impl TrackedSource { + pub fn new(source: T) -> Self { + Self { + source, + unhealthy_since: None, + cooldown: Duration::from_secs(DEFAULT_COOLDOWN_SECS), + } + } + + pub fn name(&self) -> &'static str { + self.source.name() + } + + fn is_healthy(&self) -> bool { + match self.unhealthy_since { + None => true, + Some(since) => since.elapsed() >= self.cooldown, + } + } + + fn remaining_cooldown(&self) -> u64 { + self.unhealthy_since + .map(|since| self.cooldown.saturating_sub(since.elapsed()).as_secs()) + .unwrap_or(0) + } + + /// Try to fetch, tracking health state + fn try_fetch(&mut self, fetch: impl FnOnce(&mut T) -> Option>) -> Option> { + if !self.is_healthy() { + return Some(Err(Error::String(format!( + "{} temporarily disabled (recheck in {}s)", + self.name(), + self.remaining_cooldown() + )))); + } + + let result = fetch(&mut self.source)?; + self.update_health(&result); + Some(result) + } + + fn update_health(&mut self, result: &Result) { + match result { + Ok(_) => { + if self.unhealthy_since.take().is_some() { + info!("{} is back online", self.name()); + } + } + Err(e) if e.is_network_permanently_blocked() => { + if self.unhealthy_since.is_none() { + info!( + "{} marked unhealthy (blocked/unreachable), recheck in {}s", + self.name(), + self.cooldown.as_secs() + ); + self.unhealthy_since = Some(Instant::now()); + } + } + Err(_) => {} // Transient - no change + } + } + + pub fn reset_health(&mut self) { + self.unhealthy_since = None; + } +} + +impl PriceSource for TrackedSource { + fn name(&self) -> &'static str { + self.source.name() + } + + fn get_date(&mut self, date: Date) -> Option> { + self.try_fetch(|s| s.get_date(date)) + } + + fn get_1mn( + &mut self, + timestamp: Timestamp, + previous_timestamp: Option, + ) -> Option> { + self.try_fetch(|s| s.get_1mn(timestamp, previous_timestamp)) + } + + fn get_height(&mut self, height: Height) -> Option> { + self.try_fetch(|s| s.get_height(height)) + } + + fn clear(&mut self) { + self.source.clear(); + } +} diff --git a/crates/brk_grouper/src/amount_filter.rs b/crates/brk_grouper/src/amount_filter.rs index 817addbd1..1ca1ab34a 100644 --- a/crates/brk_grouper/src/amount_filter.rs +++ b/crates/brk_grouper/src/amount_filter.rs @@ -40,7 +40,11 @@ impl AmountFilter { AmountFilter::LowerThan(s) => format!("under_{}", format_sats(*s)), AmountFilter::GreaterOrEqual(s) => format!("above_{}", format_sats(*s)), AmountFilter::Range(r) => { - format!("{}_{}", format_sats(r.start), format_sats(r.end)) + format!( + "above_{}_under_{}", + format_sats(r.start), + format_sats(r.end) + ) } } } diff --git a/crates/brk_grouper/src/filter.rs b/crates/brk_grouper/src/filter.rs index 2acf4a393..367b623f6 100644 --- a/crates/brk_grouper/src/filter.rs +++ b/crates/brk_grouper/src/filter.rs @@ -58,8 +58,7 @@ impl Filter { let needs_prefix = match self { Filter::All | Filter::Term(_) | Filter::Epoch(_) | Filter::Type(_) => false, - Filter::Time(_) => matches!(context, CohortContext::Utxo), - Filter::Amount(_) => true, + Filter::Time(_) | Filter::Amount(_) => true, }; if needs_prefix { diff --git a/crates/brk_grouper/src/time_filter.rs b/crates/brk_grouper/src/time_filter.rs index 621b45303..d5db1030d 100644 --- a/crates/brk_grouper/src/time_filter.rs +++ b/crates/brk_grouper/src/time_filter.rs @@ -44,66 +44,66 @@ impl TimeFilter { pub fn to_name_suffix(&self) -> String { match self { // Special cases for common filters - TimeFilter::LowerThan(1) => "up_to_1d".to_string(), - TimeFilter::LowerThan(7) => "up_to_1w".to_string(), - TimeFilter::LowerThan(30) => "up_to_1m".to_string(), - TimeFilter::LowerThan(60) => "up_to_2m".to_string(), - TimeFilter::LowerThan(90) => "up_to_3m".to_string(), - TimeFilter::LowerThan(120) => "up_to_4m".to_string(), + TimeFilter::LowerThan(1) => "up_to_1d_old".to_string(), + TimeFilter::LowerThan(7) => "up_to_1w_old".to_string(), + TimeFilter::LowerThan(30) => "up_to_1m_old".to_string(), + TimeFilter::LowerThan(60) => "up_to_2m_old".to_string(), + TimeFilter::LowerThan(90) => "up_to_3m_old".to_string(), + TimeFilter::LowerThan(120) => "up_to_4m_old".to_string(), TimeFilter::LowerThan(150) => "sth".to_string(), - TimeFilter::LowerThan(180) => "up_to_6m".to_string(), - TimeFilter::LowerThan(365) => "up_to_1y".to_string(), - TimeFilter::LowerThan(730) => "up_to_2y".to_string(), - TimeFilter::LowerThan(1095) => "up_to_3y".to_string(), - TimeFilter::LowerThan(1460) => "up_to_4y".to_string(), - TimeFilter::LowerThan(1825) => "up_to_5y".to_string(), - TimeFilter::LowerThan(2190) => "up_to_6y".to_string(), - TimeFilter::LowerThan(2555) => "up_to_7y".to_string(), - TimeFilter::LowerThan(2920) => "up_to_8y".to_string(), - TimeFilter::LowerThan(3650) => "up_to_10y".to_string(), - TimeFilter::LowerThan(4380) => "up_to_12y".to_string(), - TimeFilter::LowerThan(5475) => "up_to_15y".to_string(), + TimeFilter::LowerThan(180) => "up_to_6m_old".to_string(), + TimeFilter::LowerThan(365) => "up_to_1y_old".to_string(), + TimeFilter::LowerThan(730) => "up_to_2y_old".to_string(), + TimeFilter::LowerThan(1095) => "up_to_3y_old".to_string(), + TimeFilter::LowerThan(1460) => "up_to_4y_old".to_string(), + TimeFilter::LowerThan(1825) => "up_to_5y_old".to_string(), + TimeFilter::LowerThan(2190) => "up_to_6y_old".to_string(), + TimeFilter::LowerThan(2555) => "up_to_7y_old".to_string(), + TimeFilter::LowerThan(2920) => "up_to_8y_old".to_string(), + TimeFilter::LowerThan(3650) => "up_to_10y_old".to_string(), + TimeFilter::LowerThan(4380) => "up_to_12y_old".to_string(), + TimeFilter::LowerThan(5475) => "up_to_15y_old".to_string(), - TimeFilter::GreaterOrEqual(1) => "at_least_1d".to_string(), - TimeFilter::GreaterOrEqual(7) => "at_least_1w".to_string(), - TimeFilter::GreaterOrEqual(30) => "at_least_1m".to_string(), - TimeFilter::GreaterOrEqual(60) => "at_least_2m".to_string(), - TimeFilter::GreaterOrEqual(90) => "at_least_3m".to_string(), - TimeFilter::GreaterOrEqual(120) => "at_least_4m".to_string(), + TimeFilter::GreaterOrEqual(1) => "at_least_1d_old".to_string(), + TimeFilter::GreaterOrEqual(7) => "at_least_1w_old".to_string(), + TimeFilter::GreaterOrEqual(30) => "at_least_1m_old".to_string(), + TimeFilter::GreaterOrEqual(60) => "at_least_2m_old".to_string(), + TimeFilter::GreaterOrEqual(90) => "at_least_3m_old".to_string(), + TimeFilter::GreaterOrEqual(120) => "at_least_4m_old".to_string(), TimeFilter::GreaterOrEqual(150) => "lth".to_string(), - TimeFilter::GreaterOrEqual(180) => "at_least_6m".to_string(), - TimeFilter::GreaterOrEqual(365) => "at_least_1y".to_string(), - TimeFilter::GreaterOrEqual(730) => "at_least_2y".to_string(), - TimeFilter::GreaterOrEqual(1095) => "at_least_3y".to_string(), - TimeFilter::GreaterOrEqual(1460) => "at_least_4y".to_string(), - TimeFilter::GreaterOrEqual(1825) => "at_least_5y".to_string(), - TimeFilter::GreaterOrEqual(2190) => "at_least_6y".to_string(), - TimeFilter::GreaterOrEqual(2555) => "at_least_7y".to_string(), - TimeFilter::GreaterOrEqual(2920) => "at_least_8y".to_string(), - TimeFilter::GreaterOrEqual(3650) => "at_least_10y".to_string(), - TimeFilter::GreaterOrEqual(4380) => "at_least_12y".to_string(), - TimeFilter::GreaterOrEqual(5475) => "at_least_15y".to_string(), + TimeFilter::GreaterOrEqual(180) => "at_least_6m_old".to_string(), + TimeFilter::GreaterOrEqual(365) => "at_least_1y_old".to_string(), + TimeFilter::GreaterOrEqual(730) => "at_least_2y_old".to_string(), + TimeFilter::GreaterOrEqual(1095) => "at_least_3y_old".to_string(), + TimeFilter::GreaterOrEqual(1460) => "at_least_4y_old".to_string(), + TimeFilter::GreaterOrEqual(1825) => "at_least_5y_old".to_string(), + TimeFilter::GreaterOrEqual(2190) => "at_least_6y_old".to_string(), + TimeFilter::GreaterOrEqual(2555) => "at_least_7y_old".to_string(), + TimeFilter::GreaterOrEqual(2920) => "at_least_8y_old".to_string(), + TimeFilter::GreaterOrEqual(3650) => "at_least_10y_old".to_string(), + TimeFilter::GreaterOrEqual(4380) => "at_least_12y_old".to_string(), + TimeFilter::GreaterOrEqual(5475) => "at_least_15y_old".to_string(), // Range special cases TimeFilter::Range(r) if *r == (0..1) => "up_to_1d".to_string(), - TimeFilter::Range(r) if *r == (1..7) => "1d_to_1w".to_string(), - TimeFilter::Range(r) if *r == (7..30) => "1w_to_1m".to_string(), - TimeFilter::Range(r) if *r == (30..60) => "1m_to_2m".to_string(), - TimeFilter::Range(r) if *r == (60..90) => "2m_to_3m".to_string(), - TimeFilter::Range(r) if *r == (90..120) => "3m_to_4m".to_string(), - TimeFilter::Range(r) if *r == (120..150) => "4m_to_5m".to_string(), - TimeFilter::Range(r) if *r == (150..180) => "5m_to_6m".to_string(), - TimeFilter::Range(r) if *r == (180..365) => "6m_to_1y".to_string(), - TimeFilter::Range(r) if *r == (365..730) => "1y_to_2y".to_string(), - TimeFilter::Range(r) if *r == (730..1095) => "2y_to_3y".to_string(), - TimeFilter::Range(r) if *r == (1095..1460) => "3y_to_4y".to_string(), - TimeFilter::Range(r) if *r == (1460..1825) => "4y_to_5y".to_string(), - TimeFilter::Range(r) if *r == (1825..2190) => "5y_to_6y".to_string(), - TimeFilter::Range(r) if *r == (2190..2555) => "6y_to_7y".to_string(), - TimeFilter::Range(r) if *r == (2555..2920) => "7y_to_8y".to_string(), - TimeFilter::Range(r) if *r == (2920..3650) => "8y_to_10y".to_string(), - TimeFilter::Range(r) if *r == (3650..4380) => "10y_to_12y".to_string(), - TimeFilter::Range(r) if *r == (4380..5475) => "12y_to_15y".to_string(), + TimeFilter::Range(r) if *r == (1..7) => "at_least_1d_up_to_1w_old".to_string(), + TimeFilter::Range(r) if *r == (7..30) => "at_least_1w_up_to_1m_old".to_string(), + TimeFilter::Range(r) if *r == (30..60) => "at_least_1m_up_to_2m_old".to_string(), + TimeFilter::Range(r) if *r == (60..90) => "at_least_2m_up_to_3m_old".to_string(), + TimeFilter::Range(r) if *r == (90..120) => "at_least_3m_up_to_4m_old".to_string(), + TimeFilter::Range(r) if *r == (120..150) => "at_least_4m_up_to_5m_old".to_string(), + TimeFilter::Range(r) if *r == (150..180) => "at_least_5m_up_to_6m_old".to_string(), + TimeFilter::Range(r) if *r == (180..365) => "at_least_6m_up_to_1y_old".to_string(), + TimeFilter::Range(r) if *r == (365..730) => "at_least_1y_up_to_2y_old".to_string(), + TimeFilter::Range(r) if *r == (730..1095) => "at_least_2y_up_to_3y_old".to_string(), + TimeFilter::Range(r) if *r == (1095..1460) => "at_least_3y_up_to_4y_old".to_string(), + TimeFilter::Range(r) if *r == (1460..1825) => "at_least_4y_up_to_5y_old".to_string(), + TimeFilter::Range(r) if *r == (1825..2190) => "at_least_5y_up_to_6y_old".to_string(), + TimeFilter::Range(r) if *r == (2190..2555) => "at_least_6y_up_to_7y_old".to_string(), + TimeFilter::Range(r) if *r == (2555..2920) => "at_least_7y_up_to_8y_old".to_string(), + TimeFilter::Range(r) if *r == (2920..3650) => "at_least_8y_up_to_10y_old".to_string(), + TimeFilter::Range(r) if *r == (3650..4380) => "at_least_10y_up_to_12y_old".to_string(), + TimeFilter::Range(r) if *r == (4380..5475) => "at_least_12y_up_to_15y_old".to_string(), // Fallback generic names TimeFilter::LowerThan(d) => format!("up_to_{}d", d), diff --git a/crates/brk_indexer/examples/indexer_bench.rs b/crates/brk_indexer/examples/indexer_bench.rs index c7d0b2cff..643518862 100644 --- a/crates/brk_indexer/examples/indexer_bench.rs +++ b/crates/brk_indexer/examples/indexer_bench.rs @@ -25,9 +25,10 @@ fn main() -> Result<()> { // let bitcoin_dir = Path::new("/Volumes/WD_BLACK1/bitcoin"); let outputs_dir = Path::new(&env::var("HOME").unwrap()).join(".brk/benches"); - fs::create_dir_all(&outputs_dir)?; // let outputs_dir = Path::new("/Volumes/WD_BLACK1/brk"); + fs::create_dir_all(&outputs_dir)?; + let client = Client::new( Client::default_url(), Auth::CookieFile(bitcoin_dir.join(".cookie")), @@ -37,8 +38,6 @@ fn main() -> Result<()> { let blocks = Blocks::new(&client, &reader); - fs::create_dir_all(&outputs_dir)?; - let mut indexer = Indexer::forced_import(&outputs_dir)?; let mut bencher = diff --git a/crates/brk_indexer/examples/indexer_bench2.rs b/crates/brk_indexer/examples/indexer_bench2.rs index e95df4d29..2533332e2 100644 --- a/crates/brk_indexer/examples/indexer_bench2.rs +++ b/crates/brk_indexer/examples/indexer_bench2.rs @@ -25,9 +25,10 @@ fn main() -> Result<()> { // let bitcoin_dir = Path::new("/Volumes/WD_BLACK1/bitcoin"); let outputs_dir = Path::new(&env::var("HOME").unwrap()).join(".brk/benches"); - fs::create_dir_all(&outputs_dir)?; // let outputs_dir = Path::new("/Volumes/WD_BLACK1/brk"); + fs::create_dir_all(&outputs_dir)?; + let client = Client::new( Client::default_url(), Auth::CookieFile(bitcoin_dir.join(".cookie")), diff --git a/crates/brk_indexer/src/processor.rs b/crates/brk_indexer/src/processor.rs index 2cb604d6f..b5c720b5c 100644 --- a/crates/brk_indexer/src/processor.rs +++ b/crates/brk_indexer/src/processor.rs @@ -1,7 +1,6 @@ use bitcoin::{Transaction, TxIn, TxOut}; use brk_error::{Error, Result}; use brk_grouper::ByAddressType; -use brk_store::AnyStore; use brk_types::{ AddressBytes, AddressHash, AddressIndexOutPoint, AddressIndexTxIndex, Block, BlockHashPrefix, Height, OutPoint, OutputType, Sats, StoredBool, Timestamp, TxInIndex, TxIndex, TxOutIndex, @@ -195,6 +194,21 @@ impl<'a> BlockProcessor<'a> { let outpoint = txin.previous_output; let txid = Txid::from(outpoint.txid); let txid_prefix = TxidPrefix::from(&txid); + let vout = Vout::from(outpoint.vout); + + if let Some(&&same_block_txindex) = txid_prefix_to_txindex + .get(&txid_prefix) { + let outpoint = OutPoint::new(same_block_txindex, vout); + return Ok(( + txinindex, + InputSource::SameBlock { + txindex, + txin, + vin, + outpoint, + }, + )); + } let prev_txindex = if let Some(txindex) = self .stores @@ -207,24 +221,9 @@ impl<'a> BlockProcessor<'a> { { txindex } else { - let vout = Vout::from(outpoint.vout); - let prev_txindex = **txid_prefix_to_txindex - .get(&txid_prefix) - .ok_or(Error::Str("txid should be in same block"))?; - let outpoint = OutPoint::new(prev_txindex, vout); - - return Ok(( - txinindex, - InputSource::SameBlock { - txindex, - txin, - vin, - outpoint, - }, - )); + return Err(Error::Str("Can't find txid = {txid}")); }; - let vout = Vout::from(outpoint.vout); let txoutindex = self .vecs .txindex_to_first_txoutindex