diff --git a/Cargo.lock b/Cargo.lock index 7a6c9395c..d339ec12e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -431,9 +431,9 @@ version = "0.1.9" dependencies = [ "brk_cohort", "brk_types", - "minreq", "serde", "serde_json", + "ureq", ] [[package]] @@ -487,11 +487,11 @@ dependencies = [ "corepc-client", "fjall", "jiff", - "minreq", "pco", "serde_json", "thiserror", "tokio", + "ureq", "vecdb", ] @@ -502,9 +502,9 @@ dependencies = [ "brk_error", "brk_logger", "brk_types", - "minreq", "serde_json", "tracing", + "ureq", ] [[package]] @@ -2120,11 +2120,8 @@ version = "2.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05015102dad0f7d61691ca347e9d9d9006685a64aefb3d79eecf62665de2153d" dependencies = [ - "rustls", - "rustls-webpki", "serde", "serde_json", - "webpki-roots", ] [[package]] @@ -2639,23 +2636,36 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" +version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ "log", + "once_cell", "ring", + "rustls-pki-types", "rustls-webpki", - "sct", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +dependencies = [ + "zeroize", ] [[package]] name = "rustls-webpki" -version = "0.101.7" +version = "0.103.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" dependencies = [ "ring", + "rustls-pki-types", "untrusted", ] @@ -2712,16 +2722,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "secp256k1" version = "0.29.1" @@ -2955,6 +2955,12 @@ dependencies = [ "syn", ] +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "2.0.117" @@ -3261,6 +3267,35 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "ureq" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc97a28575b85cfedf2a7e7d3cc64b3e11bd8ac766666318003abbacc7a21fc" +dependencies = [ + "base64 0.22.1", + "flate2", + "log", + "percent-encoding", + "rustls", + "rustls-pki-types", + "ureq-proto", + "utf-8", + "webpki-roots", +] + +[[package]] +name = "ureq-proto" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d81f9efa9df032be5934a46a068815a10a042b494b6a58cb0a1a97bb5467ed6f" +dependencies = [ + "base64 0.22.1", + "http", + "httparse", + "log", +] + [[package]] name = "url" version = "2.5.8" @@ -3274,6 +3309,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -3452,9 +3493,12 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.4" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "weezl" @@ -3898,6 +3942,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zerotrie" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index 99dde2870..fd027d1b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,8 +40,6 @@ aide = { version = "0.16.0-alpha.2", features = ["axum-json", "axum-query"] } axum = { version = "0.8.8", default-features = false, features = ["http1", "json", "query", "tokio", "tracing"] } bitcoin = { version = "0.32.8", features = ["serde"] } bitcoincore-rpc = "0.19.0" -corepc-client = { path = "/Users/k/Developer/corepc/client", features = ["client-sync"] } -corepc-jsonrpc = { package = "jsonrpc", path = "/Users/k/Developer/corepc/jsonrpc", features = ["simple_http"], default-features = false } brk_alloc = { version = "0.1.9", path = "crates/brk_alloc" } brk_bencher = { version = "0.1.9", path = "crates/brk_bencher" } brk_bindgen = { version = "0.1.9", path = "crates/brk_bindgen" } @@ -54,8 +52,8 @@ brk_fetcher = { version = "0.1.9", path = "crates/brk_fetcher" } brk_indexer = { version = "0.1.9", path = "crates/brk_indexer" } brk_iterator = { version = "0.1.9", path = "crates/brk_iterator" } brk_logger = { version = "0.1.9", path = "crates/brk_logger" } -brk_oracle = { version = "0.1.9", path = "crates/brk_oracle" } brk_mempool = { version = "0.1.9", path = "crates/brk_mempool" } +brk_oracle = { version = "0.1.9", path = "crates/brk_oracle" } brk_query = { version = "0.1.9", path = "crates/brk_query", features = ["tokio"] } brk_reader = { version = "0.1.9", path = "crates/brk_reader" } brk_rpc = { version = "0.1.9", path = "crates/brk_rpc" } @@ -67,11 +65,12 @@ brk_types = { version = "0.1.9", path = "crates/brk_types" } brk_website = { version = "0.1.9", path = "crates/brk_website" } byteview = "0.10.1" color-eyre = "0.6.5" +corepc-client = { path = "/Users/k/Developer/corepc/client", features = ["client-sync"] } +corepc-jsonrpc = { package = "jsonrpc", path = "/Users/k/Developer/corepc/jsonrpc", features = ["simple_http"], default-features = false } derive_more = { version = "2.1.1", features = ["deref", "deref_mut"] } fjall = "3.1.0" indexmap = { version = "2.13.0", features = ["serde"] } jiff = { version = "0.2.23", features = ["perf-inline", "tz-system"], default-features = false } -minreq = { version = "2.14.1", features = ["https", "json-using-serde"] } owo-colors = "4.3.0" parking_lot = "0.12.5" pco = "1.0.1" @@ -84,9 +83,10 @@ serde_derive = "1.0.228" serde_json = { version = "1.0.149", features = ["float_roundtrip", "preserve_order"] } smallvec = "1.15.1" tokio = { version = "1.50.0", features = ["rt-multi-thread"] } -tracing = { version = "0.1", default-features = false, features = ["std"] } tower-http = { version = "0.6.8", features = ["catch-panic", "compression-br", "compression-gzip", "compression-zstd", "cors", "normalize-path", "timeout", "trace"] } tower-layer = "0.3" +tracing = { version = "0.1", default-features = false, features = ["std"] } +ureq = "3.2.0" # vecdb = { version = "0.6.8", features = ["derive", "serde_json", "pco", "schemars"] } vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco", "schemars"] } diff --git a/crates/brk_bindgen/src/generators/rust/client.rs b/crates/brk_bindgen/src/generators/rust/client.rs index 8fbaa4640..e6eef3296 100644 --- a/crates/brk_bindgen/src/generators/rust/client.rs +++ b/crates/brk_bindgen/src/generators/rust/client.rs @@ -11,7 +11,8 @@ use crate::{ pub fn generate_imports(output: &mut String) { writeln!( output, - r#"use std::sync::Arc; + r#"use std::io::Read as _; +use std::sync::Arc; use std::ops::{{Bound, RangeBounds}}; use serde::de::DeserializeOwned; pub use brk_cohort::*; @@ -59,59 +60,61 @@ impl Default for BrkClientOptions {{ }} }} -/// Base HTTP client for making requests. +/// Base HTTP client for making requests. Reuses connections via ureq::Agent. #[derive(Debug, Clone)] pub struct BrkClientBase {{ + agent: ureq::Agent, base_url: String, - timeout_secs: u64, }} impl BrkClientBase {{ /// Create a new client with the given base URL. pub fn new(base_url: impl Into) -> Self {{ - Self {{ - base_url: base_url.into(), - timeout_secs: 30, - }} + Self::with_options(BrkClientOptions {{ base_url: base_url.into(), ..Default::default() }}) }} /// Create a new client with options. pub fn with_options(options: BrkClientOptions) -> Self {{ + let agent = ureq::Agent::config_builder() + .timeout_global(Some(std::time::Duration::from_secs(options.timeout_secs))) + .build() + .into(); Self {{ + agent, base_url: options.base_url, - timeout_secs: options.timeout_secs, }} }} - fn get(&self, path: &str) -> Result {{ + fn get(&self, path: &str) -> Result> {{ let base = self.base_url.trim_end_matches('/'); let url = format!("{{}}{{}}", base, path); - let response = minreq::get(&url) - .with_timeout(self.timeout_secs) - .send() + let mut response = self.agent.get(&url) + .call() .map_err(|e| BrkError {{ message: e.to_string() }})?; - if response.status_code >= 400 {{ + if response.status().as_u16() >= 400 {{ return Err(BrkError {{ - message: format!("HTTP {{}}", response.status_code), + message: format!("HTTP {{}}", response.status().as_u16()), }}); }} - Ok(response) + let mut bytes = Vec::new(); + response.body_mut().as_reader().read_to_end(&mut bytes) + .map_err(|e| BrkError {{ message: e.to_string() }})?; + Ok(bytes) }} /// Make a GET request and deserialize JSON response. pub fn get_json(&self, path: &str) -> Result {{ - self.get(path)? - .json() + let bytes = self.get(path)?; + serde_json::from_slice(&bytes) .map_err(|e| BrkError {{ message: e.to_string() }}) }} /// Make a GET request and return raw text response. pub fn get_text(&self, path: &str) -> Result {{ - self.get(path)? - .as_str() - .map(|s| s.to_string()) + let bytes = self.get(path)?; + String::from_utf8(bytes) .map_err(|e| BrkError {{ message: e.to_string() }}) }} }} diff --git a/crates/brk_client/Cargo.toml b/crates/brk_client/Cargo.toml index 2e487f063..fff699733 100644 --- a/crates/brk_client/Cargo.toml +++ b/crates/brk_client/Cargo.toml @@ -13,6 +13,6 @@ exclude = ["examples/"] [dependencies] brk_cohort = { workspace = true } brk_types = { workspace = true } -minreq = { workspace = true } +ureq = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/brk_client/src/lib.rs b/crates/brk_client/src/lib.rs index fec470553..de6053894 100644 --- a/crates/brk_client/src/lib.rs +++ b/crates/brk_client/src/lib.rs @@ -7,6 +7,7 @@ #![allow(clippy::useless_format)] #![allow(clippy::unnecessary_to_owned)] +use std::io::Read as _; use std::sync::Arc; use std::ops::{Bound, RangeBounds}; use serde::de::DeserializeOwned; @@ -47,59 +48,61 @@ impl Default for BrkClientOptions { } } -/// Base HTTP client for making requests. +/// Base HTTP client for making requests. Reuses connections via ureq::Agent. #[derive(Debug, Clone)] pub struct BrkClientBase { + agent: ureq::Agent, base_url: String, - timeout_secs: u64, } impl BrkClientBase { /// Create a new client with the given base URL. pub fn new(base_url: impl Into) -> Self { - Self { - base_url: base_url.into(), - timeout_secs: 30, - } + Self::with_options(BrkClientOptions { base_url: base_url.into(), ..Default::default() }) } /// Create a new client with options. pub fn with_options(options: BrkClientOptions) -> Self { + let agent = ureq::Agent::config_builder() + .timeout_global(Some(std::time::Duration::from_secs(options.timeout_secs))) + .build() + .into(); Self { + agent, base_url: options.base_url, - timeout_secs: options.timeout_secs, } } - fn get(&self, path: &str) -> Result { + fn get(&self, path: &str) -> Result> { let base = self.base_url.trim_end_matches('/'); let url = format!("{}{}", base, path); - let response = minreq::get(&url) - .with_timeout(self.timeout_secs) - .send() + let mut response = self.agent.get(&url) + .call() .map_err(|e| BrkError { message: e.to_string() })?; - if response.status_code >= 400 { + if response.status().as_u16() >= 400 { return Err(BrkError { - message: format!("HTTP {}", response.status_code), + message: format!("HTTP {}", response.status().as_u16()), }); } - Ok(response) + let mut bytes = Vec::new(); + response.body_mut().as_reader().read_to_end(&mut bytes) + .map_err(|e| BrkError { message: e.to_string() })?; + Ok(bytes) } /// Make a GET request and deserialize JSON response. pub fn get_json(&self, path: &str) -> Result { - self.get(path)? - .json() + let bytes = self.get(path)?; + serde_json::from_slice(&bytes) .map_err(|e| BrkError { message: e.to_string() }) } /// Make a GET request and return raw text response. pub fn get_text(&self, path: &str) -> Result { - self.get(path)? - .as_str() - .map(|s| s.to_string()) + let bytes = self.get(path)?; + String::from_utf8(bytes) .map_err(|e| BrkError { message: e.to_string() }) } } diff --git a/crates/brk_error/Cargo.toml b/crates/brk_error/Cargo.toml index 6930c331b..0009f75e6 100644 --- a/crates/brk_error/Cargo.toml +++ b/crates/brk_error/Cargo.toml @@ -13,7 +13,7 @@ bitcoincore-rpc = ["dep:bitcoincore-rpc"] corepc = ["dep:corepc-client"] fjall = ["dep:fjall"] jiff = ["dep:jiff"] -minreq = ["dep:minreq"] +ureq = ["dep:ureq"] pco = ["dep:pco"] serde_json = ["dep:serde_json"] tokio = ["dep:tokio"] @@ -25,7 +25,7 @@ bitcoincore-rpc = { workspace = true, optional = true } corepc-client = { workspace = true, optional = true } fjall = { workspace = true, optional = true } jiff = { workspace = true, optional = true } -minreq = { workspace = true, optional = true } +ureq = { workspace = true, optional = true } pco = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } thiserror = "2.0" diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index aa93b604b..2e28560a3 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -35,9 +35,9 @@ pub enum Error { #[error(transparent)] RawDB(#[from] vecdb::RawDBError), - #[cfg(feature = "minreq")] + #[cfg(feature = "ureq")] #[error(transparent)] - Minreq(#[from] minreq::Error), + Ureq(#[from] ureq::Error), #[error(transparent)] SystemTimeError(#[from] time::SystemTimeError), @@ -180,8 +180,8 @@ impl Error { /// Returns false for transient errors worth retrying (timeouts, rate limits, server errors). pub fn is_network_permanently_blocked(&self) -> bool { match self { - #[cfg(feature = "minreq")] - Error::Minreq(e) => is_minreq_error_permanent(e), + #[cfg(feature = "ureq")] + Error::Ureq(e) => is_ureq_error_permanent(e), Error::IO(e) => is_io_error_permanent(e), // 403 Forbidden suggests IP/geo blocking; 429 and 5xx are transient Error::HttpStatus { status, .. } => *status == 403, @@ -191,28 +191,18 @@ impl Error { } } -#[cfg(feature = "minreq")] -fn is_minreq_error_permanent(e: &minreq::Error) -> bool { - use minreq::Error::*; - match e { - // DNS resolution failure - likely blocked or misconfigured - IoError(io_err) => is_io_error_permanent(io_err), - // Check error message for common blocking indicators - other => { - let msg = format!("{:?}", other); - // DNS/connection failures - msg.contains("nodename nor servname") - || msg.contains("Name or service not known") - || msg.contains("No such host") - || msg.contains("connection refused") - || msg.contains("Connection refused") - // SSL/TLS failures (often due to blocking/MITM) - || msg.contains("certificate") - || msg.contains("SSL") - || msg.contains("TLS") - || msg.contains("handshake") - } - } +#[cfg(feature = "ureq")] +fn is_ureq_error_permanent(e: &ureq::Error) -> bool { + let msg = format!("{:?}", e); + msg.contains("nodename nor servname") + || msg.contains("Name or service not known") + || msg.contains("No such host") + || msg.contains("connection refused") + || msg.contains("Connection refused") + || msg.contains("certificate") + || msg.contains("SSL") + || msg.contains("TLS") + || msg.contains("handshake") } fn is_io_error_permanent(e: &std::io::Error) -> bool { diff --git a/crates/brk_fetcher/Cargo.toml b/crates/brk_fetcher/Cargo.toml index 0c256aadc..bb4b6b43c 100644 --- a/crates/brk_fetcher/Cargo.toml +++ b/crates/brk_fetcher/Cargo.toml @@ -9,9 +9,9 @@ repository.workspace = true exclude = ["examples/"] [dependencies] -brk_error = { workspace = true, features = ["minreq", "serde_json"] } +brk_error = { workspace = true, features = ["ureq", "serde_json"] } brk_logger = { workspace = true } brk_types = { workspace = true } tracing = { workspace = true } -minreq = { workspace = true } +ureq = { workspace = true } serde_json = { workspace = true } diff --git a/crates/brk_fetcher/src/binance.rs b/crates/brk_fetcher/src/binance.rs index 49cfe3428..ffffcbcd0 100644 --- a/crates/brk_fetcher/src/binance.rs +++ b/crates/brk_fetcher/src/binance.rs @@ -9,14 +9,16 @@ use brk_error::{Error, Result}; use brk_types::{Date, Height, OHLCCents, Timestamp}; use serde_json::Value; use tracing::info; +use ureq::Agent; use crate::{ - PriceSource, check_response, default_retry, + PriceSource, checked_get, default_retry, ohlc::{compute_ohlc_from_range, date_from_timestamp, ohlc_from_array, timestamp_from_ms}, }; #[derive(Clone)] pub struct Binance { + agent: Agent, path: Option, _1mn: Option>, _1d: Option>, @@ -24,8 +26,9 @@ pub struct Binance { } impl Binance { - pub fn init(path: Option<&Path>) -> Self { + pub fn new(path: Option<&Path>, agent: Agent) -> Self { Self { + agent, path: path.map(|p| p.to_owned()), _1mn: None, _1d: None, @@ -41,7 +44,7 @@ impl Binance { // Try live API data first if self._1mn.as_ref().and_then(|m| m.last_key_value()).is_none_or(|(k, _)| k <= ×tamp) { - self._1mn.replace(Self::fetch_1mn()?); + self._1mn.replace(self.fetch_1mn()?); } let res = compute_ohlc_from_range( @@ -68,11 +71,12 @@ impl Binance { ) } - pub fn fetch_1mn() -> Result> { + pub fn fetch_1mn(&self) -> Result> { + let agent = &self.agent; default_retry(|_| { let url = Self::url("interval=1m&limit=1000"); info!("Fetching {url} ..."); - let bytes = check_response(minreq::get(&url).with_timeout(30).send()?, &url)?; + let bytes = checked_get(agent, &url)?; let json: Value = serde_json::from_slice(&bytes)?; Self::parse_ohlc_array(&json) }) @@ -80,7 +84,7 @@ impl Binance { pub fn get_from_1d(&mut self, date: &Date) -> Result { if self._1d.as_ref().and_then(|m| m.last_key_value()).is_none_or(|(k, _)| k <= date) { - self._1d.replace(Self::fetch_1d()?); + self._1d.replace(self.fetch_1d()?); } self._1d @@ -91,11 +95,12 @@ impl Binance { .ok_or(Error::NotFound("Couldn't find date".into())) } - pub fn fetch_1d() -> Result> { + pub fn fetch_1d(&self) -> Result> { + let agent = &self.agent; default_retry(|_| { let url = Self::url("interval=1d"); info!("Fetching {url} ..."); - let bytes = check_response(minreq::get(&url).with_timeout(30).send()?, &url)?; + let bytes = checked_get(agent, &url)?; let json: Value = serde_json::from_slice(&bytes)?; Self::parse_date_ohlc_array(&json) }) @@ -204,10 +209,8 @@ impl Binance { format!("https://api.binance.com/api/v3/uiKlines?symbol=BTCUSDT&{query}") } - pub fn ping() -> Result<()> { - minreq::get("https://api.binance.com/api/v3/ping") - .with_timeout(10) - .send()?; + pub fn ping(&self) -> Result<()> { + self.agent.get("https://api.binance.com/api/v3/ping").call()?; Ok(()) } } @@ -234,7 +237,7 @@ impl PriceSource for Binance { } fn ping(&self) -> Result<()> { - Self::ping() + self.ping() } fn clear(&mut self) { diff --git a/crates/brk_fetcher/src/brk.rs b/crates/brk_fetcher/src/brk.rs index 5e763daea..e15cc58ce 100644 --- a/crates/brk_fetcher/src/brk.rs +++ b/crates/brk_fetcher/src/brk.rs @@ -6,16 +6,28 @@ use brk_types::{ }; use serde_json::Value; use tracing::info; +use ureq::Agent; -use crate::{PriceSource, check_response, default_retry}; +use crate::{PriceSource, checked_get, default_retry}; -#[derive(Default, Clone)] +#[derive(Clone)] #[allow(clippy::upper_case_acronyms)] pub struct BRK { + agent: Agent, height_to_ohlc: BTreeMap>, day1_to_ohlc: BTreeMap>, } +impl BRK { + pub fn new(agent: Agent) -> Self { + Self { + agent, + height_to_ohlc: BTreeMap::new(), + day1_to_ohlc: BTreeMap::new(), + } + } +} + const API_URL: &str = "https://bitview.space/api/vecs"; const CHUNK_SIZE: usize = 10_000; @@ -28,7 +40,7 @@ impl BRK { || ((key + self.height_to_ohlc.get(&key).unwrap().len()) <= height) { self.height_to_ohlc - .insert(key, Self::fetch_height_prices(key)?); + .insert(key, self.fetch_height_prices(key)?); } self.height_to_ohlc @@ -39,7 +51,8 @@ impl BRK { .ok_or(Error::NotFound("Couldn't find height in BRK".into())) } - fn fetch_height_prices(height: Height) -> Result> { + fn fetch_height_prices(&self, height: Height) -> Result> { + let agent = &self.agent; default_retry(|_| { let url = format!( "{API_URL}/height-to-price-ohlc?from={}&to={}", @@ -48,7 +61,7 @@ impl BRK { ); info!("Fetching {url} ..."); - let bytes = check_response(minreq::get(&url).with_timeout(30).send()?, &url)?; + let bytes = checked_get(agent, &url)?; let body: Value = serde_json::from_slice(&bytes)?; body.as_array() @@ -68,7 +81,7 @@ impl BRK { if !self.day1_to_ohlc.contains_key(&key) || ((key + self.day1_to_ohlc.get(&key).unwrap().len()) <= day1) { - self.day1_to_ohlc.insert(key, Self::fetch_date_prices(key)?); + self.day1_to_ohlc.insert(key, self.fetch_date_prices(key)?); } self.day1_to_ohlc @@ -79,7 +92,8 @@ impl BRK { .ok_or(Error::NotFound("Couldn't find date in BRK".into())) } - fn fetch_date_prices(day1: Day1) -> Result> { + fn fetch_date_prices(&self, day1: Day1) -> Result> { + let agent = &self.agent; default_retry(|_| { let url = format!( "{API_URL}/day1-to-price-ohlc?from={}&to={}", @@ -88,7 +102,7 @@ impl BRK { ); info!("Fetching {url}..."); - let bytes = check_response(minreq::get(&url).with_timeout(30).send()?, &url)?; + let bytes = checked_get(agent, &url)?; let body: Value = serde_json::from_slice(&bytes)?; body.as_array() @@ -121,8 +135,8 @@ impl BRK { ))) } - pub fn ping() -> Result<()> { - minreq::get(API_URL).with_timeout(10).send()?; + pub fn ping(&self) -> Result<()> { + self.agent.get(API_URL).call()?; Ok(()) } } @@ -149,7 +163,7 @@ impl PriceSource for BRK { } fn ping(&self) -> Result<()> { - Self::ping() + self.ping() } fn clear(&mut self) { diff --git a/crates/brk_fetcher/src/kraken.rs b/crates/brk_fetcher/src/kraken.rs index fc455d601..8edd5938a 100644 --- a/crates/brk_fetcher/src/kraken.rs +++ b/crates/brk_fetcher/src/kraken.rs @@ -4,18 +4,30 @@ use brk_error::{Error, Result}; use brk_types::{Date, Height, OHLCCents, Timestamp}; use serde_json::Value; use tracing::info; +use ureq::Agent; use crate::{ - PriceSource, check_response, default_retry, + PriceSource, checked_get, default_retry, ohlc::{compute_ohlc_from_range, date_from_timestamp, ohlc_from_array, timestamp_from_secs}, }; -#[derive(Default, Clone)] +#[derive(Clone)] pub struct Kraken { + agent: Agent, _1mn: Option>, _1d: Option>, } +impl Kraken { + pub fn new(agent: Agent) -> Self { + Self { + agent, + _1mn: None, + _1d: None, + } + } +} + impl Kraken { fn get_from_1mn( &mut self, @@ -24,7 +36,7 @@ impl Kraken { ) -> Result { if self._1mn.as_ref().and_then(|m| m.last_key_value()).is_none_or(|(k, _)| k <= ×tamp) { - self._1mn.replace(Self::fetch_1mn()?); + self._1mn.replace(self.fetch_1mn()?); } compute_ohlc_from_range( self._1mn.as_ref().unwrap(), @@ -34,11 +46,12 @@ impl Kraken { ) } - pub fn fetch_1mn() -> Result> { + pub fn fetch_1mn(&self) -> Result> { + let agent = &self.agent; default_retry(|_| { let url = Self::url(1); info!("Fetching {url} ..."); - let bytes = check_response(minreq::get(&url).with_timeout(30).send()?, &url)?; + let bytes = checked_get(agent, &url)?; let json: Value = serde_json::from_slice(&bytes)?; Self::parse_ohlc_response(&json) }) @@ -46,7 +59,7 @@ impl Kraken { fn get_from_1d(&mut self, date: &Date) -> Result { if self._1d.as_ref().and_then(|m| m.last_key_value()).is_none_or(|(k, _)| k <= date) { - self._1d.replace(Self::fetch_1d()?); + self._1d.replace(self.fetch_1d()?); } self._1d .as_ref() @@ -56,11 +69,12 @@ impl Kraken { .ok_or(Error::NotFound("Couldn't find date".into())) } - pub fn fetch_1d() -> Result> { + pub fn fetch_1d(&self) -> Result> { + let agent = &self.agent; default_retry(|_| { let url = Self::url(1440); info!("Fetching {url} ..."); - let bytes = check_response(minreq::get(&url).with_timeout(30).send()?, &url)?; + let bytes = checked_get(agent, &url)?; let json: Value = serde_json::from_slice(&bytes)?; Self::parse_date_ohlc_response(&json) }) @@ -95,10 +109,8 @@ impl Kraken { format!("https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval={interval}") } - pub fn ping() -> Result<()> { - minreq::get("https://api.kraken.com/0/public/Time") - .with_timeout(10) - .send()?; + pub fn ping(&self) -> Result<()> { + self.agent.get("https://api.kraken.com/0/public/Time").call()?; Ok(()) } } @@ -125,7 +137,7 @@ impl PriceSource for Kraken { } fn ping(&self) -> Result<()> { - Self::ping() + self.ping() } fn clear(&mut self) { diff --git a/crates/brk_fetcher/src/lib.rs b/crates/brk_fetcher/src/lib.rs index 26516780d..ea8f230d8 100644 --- a/crates/brk_fetcher/src/lib.rs +++ b/crates/brk_fetcher/src/lib.rs @@ -1,10 +1,12 @@ #![doc = include_str!("../README.md")] +use std::io::Read as _; use std::{path::Path, thread::sleep, time::Duration}; use brk_error::{Error, Result}; use brk_types::{Date, Height, OHLCCents, Timestamp}; use tracing::{info, warn}; +use ureq::Agent; mod binance; mod brk; @@ -22,21 +24,32 @@ pub use source::{PriceSource, TrackedSource}; const MAX_RETRIES: usize = 12 * 60; // 12 hours of retrying -/// Check HTTP response status and return bytes or error -pub fn check_response(response: minreq::Response, url: &str) -> Result> { - let status = response.status_code as u16; - if (200..300).contains(&status) { - Ok(response.into_bytes()) - } else { - Err(Error::HttpStatus { +/// Create a shared HTTP agent with connection pooling and default timeout. +pub fn new_agent(timeout_secs: u64) -> Agent { + Agent::config_builder() + .timeout_global(Some(Duration::from_secs(timeout_secs))) + .build() + .into() +} + +/// Perform a GET request and check the response status. +pub fn checked_get(agent: &Agent, url: &str) -> Result> { + let mut response = agent.get(url).call()?; + let status = response.status().as_u16(); + if status >= 400 { + return Err(Error::HttpStatus { status, url: url.to_string(), - }) + }); } + let mut bytes = Vec::new(); + response.body_mut().as_reader().read_to_end(&mut bytes)?; + Ok(bytes) } #[derive(Clone)] pub struct Fetcher { + pub agent: Agent, pub binance: TrackedSource, pub kraken: TrackedSource, pub brk: TrackedSource, @@ -48,10 +61,12 @@ impl Fetcher { } pub fn new(hars_path: Option<&Path>) -> Result { + let agent = new_agent(30); Ok(Self { - binance: TrackedSource::new(Binance::init(hars_path)), - kraken: TrackedSource::new(Kraken::default()), - brk: TrackedSource::new(BRK::default()), + binance: TrackedSource::new(Binance::new(hars_path, agent.clone())), + kraken: TrackedSource::new(Kraken::new(agent.clone())), + brk: TrackedSource::new(BRK::new(agent.clone())), + agent, }) }