diff --git a/Cargo.toml b/Cargo.toml index 93444b3db..58be4b15d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ smallvec = "1.15.1" tokio = { version = "1.48.0", features = ["rt-multi-thread"] } vecdb = { path = "../anydb/crates/vecdb", features = ["derive", "serde_json", "pco"] } # vecdb = { git = "https://github.com/anydb-rs/anydb", features = ["derive", "serde_json", "pco"] } -# vecdb = { version = "0.3.20", features = ["derive", "serde_json", "pco"] } +# vecdb = { version = "0.4.0", features = ["derive", "serde_json", "pco"] } [workspace.metadata.release] shared-version = true diff --git a/crates/brk_mcp/src/lib.rs b/crates/brk_mcp/src/lib.rs index 1cffb5d85..4a45b43a6 100644 --- a/crates/brk_mcp/src/lib.rs +++ b/crates/brk_mcp/src/lib.rs @@ -56,7 +56,7 @@ Get the list of all existing indexes and their accepted variants. async fn get_indexes(&self) -> Result { info!("mcp: get_indexes"); Ok(CallToolResult::success(vec![ - Content::json(self.query.inner().get_indexes()).unwrap(), + Content::json(self.query.inner().indexes()).unwrap(), ])) } @@ -71,7 +71,7 @@ If the `page` param is omitted, it will default to the first page. ) -> Result { info!("mcp: get_metrics"); Ok(CallToolResult::success(vec![ - Content::json(self.query.sync(|q| q.get_metrics(pagination))).unwrap(), + Content::json(self.query.sync(|q| q.metrics(pagination))).unwrap(), ])) } @@ -88,7 +88,7 @@ If the `page` param is omitted, it will default to the first page. let result = self .query .inner() - .get_index_to_vecids(paginated_index) + .index_to_vecids(paginated_index) .unwrap_or_default(); Ok(CallToolResult::success(vec![ Content::json(result).unwrap(), @@ -122,7 +122,7 @@ The response's format will depend on the given parameters, it will be: ) -> Result { info!("mcp: get_vecs"); Ok(CallToolResult::success(vec![Content::text( - match self.query.run(move |q| q.search_and_format(params)).await { + match self.query.run(move |q| q.search_and_format_legacy(params)).await { Ok(output) => output.to_string(), Err(e) => format!("Error:\n{e}"), }, diff --git a/crates/brk_query/examples/query.rs b/crates/brk_query/examples/query.rs index 1caefa19c..2b858255b 100644 --- a/crates/brk_query/examples/query.rs +++ b/crates/brk_query/examples/query.rs @@ -7,7 +7,7 @@ use brk_mempool::Mempool; use brk_query::Query; use brk_reader::Reader; use brk_rpc::{Auth, Client}; -use brk_types::{Address, DataRangeFormat, Index, MetricSelection, OutputType}; +use brk_types::{Address, OutputType}; use vecdb::Exit; pub fn main() -> Result<()> { @@ -62,11 +62,11 @@ fn run() -> Result<()> { .approximate_len() ); - dbg!(query.address(Address { + let _ = dbg!(query.address(Address { address: "bc1qwzrryqr3ja8w7hnja2spmkgfdcgvqwp5swz4af4ngsjecfz0w0pqud7k38".to_string(), })); - dbg!(query.address_txids( + let _ = dbg!(query.address_txids( Address { address: "bc1qwzrryqr3ja8w7hnja2spmkgfdcgvqwp5swz4af4ngsjecfz0w0pqud7k38".to_string(), }, @@ -74,7 +74,7 @@ fn run() -> Result<()> { 25 )); - dbg!(query.address_utxos(Address { + let _ = dbg!(query.address_utxos(Address { address: "bc1qwzrryqr3ja8w7hnja2spmkgfdcgvqwp5swz4af4ngsjecfz0w0pqud7k38".to_string(), })); diff --git a/crates/brk_query/src/impl/metrics.rs b/crates/brk_query/src/impl/metrics.rs new file mode 100644 index 000000000..06ed3db66 --- /dev/null +++ b/crates/brk_query/src/impl/metrics.rs @@ -0,0 +1,257 @@ +use std::collections::BTreeMap; + +use brk_error::{Error, Result}; +use brk_traversable::TreeNode; +use brk_types::{ + Format, Index, IndexInfo, Limit, Metric, MetricCount, MetricData, PaginatedMetrics, Pagination, + PaginationIndex, +}; +use vecdb::AnyExportableVec; + +use crate::vecs::{IndexToVec, MetricToVec}; +use crate::{DataRangeFormat, MetricSelection, Output, Query}; + +/// Estimated bytes per column header +const CSV_HEADER_BYTES_PER_COL: usize = 10; +/// Estimated bytes per cell value +const CSV_CELL_BYTES: usize = 15; + +impl Query { + pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&'static str> { + self.vecs().matches(metric, limit) + } + + pub fn metric_not_found_error(&self, metric: &Metric) -> Error { + if let Some(first) = self.match_metric(metric, Limit::MIN).first() { + Error::String(format!("Could not find '{metric}', did you mean '{first}'?")) + } else { + Error::String(format!("Could not find '{metric}'.")) + } + } + + pub(crate) fn columns_to_csv( + columns: &[&dyn AnyExportableVec], + from: Option, + to: Option, + ) -> Result { + if columns.is_empty() { + return Ok(String::new()); + } + + let num_rows = columns[0].range_count(from, to); + let num_cols = columns.len(); + + let estimated_size = + num_cols * CSV_HEADER_BYTES_PER_COL + num_rows * num_cols * CSV_CELL_BYTES; + let mut csv = String::with_capacity(estimated_size); + + for (i, col) in columns.iter().enumerate() { + if i > 0 { + csv.push(','); + } + csv.push_str(col.name()); + } + csv.push('\n'); + + let mut writers: Vec<_> = columns + .iter() + .map(|col| col.create_writer(from, to)) + .collect(); + + for _ in 0..num_rows { + for (i, writer) in writers.iter_mut().enumerate() { + if i > 0 { + csv.push(','); + } + writer.write_next(&mut csv)?; + } + csv.push('\n'); + } + + Ok(csv) + } + + /// Format single metric - returns `MetricData` + pub fn format( + &self, + metric: &dyn AnyExportableVec, + params: &DataRangeFormat, + ) -> Result { + let from = params.from().map(|from| metric.i64_to_usize(from)); + let to = params.to().map(|to| metric.i64_to_usize(to)); + + Ok(match params.format() { + Format::CSV => Output::CSV(Self::columns_to_csv( + &[metric], + from.map(|v| v as i64), + to.map(|v| v as i64), + )?), + Format::JSON => { + let mut buf = Vec::new(); + MetricData::serialize(metric, from, to, &mut buf)?; + Output::Json(buf) + } + }) + } + + /// Format multiple metrics - returns `Vec` + pub fn format_bulk( + &self, + metrics: &[&dyn AnyExportableVec], + params: &DataRangeFormat, + ) -> Result { + let from = params.from().map(|from| { + metrics + .iter() + .map(|v| v.i64_to_usize(from)) + .min() + .unwrap_or_default() + }); + + let to = params.to().map(|to| { + metrics + .iter() + .map(|v| v.i64_to_usize(to)) + .min() + .unwrap_or_default() + }); + + let format = params.format(); + + Ok(match format { + Format::CSV => Output::CSV(Self::columns_to_csv( + metrics, + from.map(|v| v as i64), + to.map(|v| v as i64), + )?), + Format::JSON => { + if metrics.is_empty() { + return Ok(Output::default(format)); + } + + let mut buf = Vec::new(); + buf.push(b'['); + for (i, vec) in metrics.iter().enumerate() { + if i > 0 { + buf.push(b','); + } + MetricData::serialize(*vec, from, to, &mut buf)?; + } + buf.push(b']'); + Output::Json(buf) + } + }) + } + + /// Search for vecs matching the given metrics and index + pub fn search(&self, params: &MetricSelection) -> Vec<&'static dyn AnyExportableVec> { + params + .metrics + .iter() + .filter_map(|metric| self.vecs().get(metric, params.index)) + .collect() + } + + /// Calculate total weight of the vecs for the given range + pub fn weight(vecs: &[&dyn AnyExportableVec], from: Option, to: Option) -> usize { + vecs.iter().map(|v| v.range_weight(from, to)).sum() + } + + /// Search and format single metric + pub fn search_and_format(&self, params: MetricSelection) -> Result { + self.search_and_format_checked(params, usize::MAX) + } + + /// Search and format single metric with weight limit + pub fn search_and_format_checked( + &self, + params: MetricSelection, + max_weight: usize, + ) -> Result { + let vecs = self.search(¶ms); + + let Some(metric) = vecs.first() else { + let metric = params.metrics.first().cloned().unwrap_or_else(|| Metric::from("")); + return Err(self.metric_not_found_error(&metric)); + }; + + let weight = Self::weight(&vecs, params.from(), params.to()); + if weight > max_weight { + return Err(Error::String(format!( + "Request too heavy: {weight} bytes exceeds limit of {max_weight} bytes" + ))); + } + + self.format(*metric, ¶ms.range) + } + + /// Search and format bulk metrics + pub fn search_and_format_bulk(&self, params: MetricSelection) -> Result { + self.search_and_format_bulk_checked(params, usize::MAX) + } + + /// Search and format bulk metrics with weight limit (for DDoS prevention) + pub fn search_and_format_bulk_checked( + &self, + params: MetricSelection, + max_weight: usize, + ) -> Result { + let vecs = self.search(¶ms); + + if vecs.is_empty() { + return Ok(Output::default(params.range.format())); + } + + let weight = Self::weight(&vecs, params.from(), params.to()); + if weight > max_weight { + return Err(Error::String(format!( + "Request too heavy: {weight} bytes exceeds limit of {max_weight} bytes" + ))); + } + + self.format_bulk(&vecs, ¶ms.range) + } + + pub fn metric_to_index_to_vec(&self) -> &BTreeMap<&str, IndexToVec<'_>> { + &self.vecs().metric_to_index_to_vec + } + + pub fn index_to_metric_to_vec(&self) -> &BTreeMap> { + &self.vecs().index_to_metric_to_vec + } + + pub fn metric_count(&self) -> MetricCount { + MetricCount { + distinct_metrics: self.distinct_metric_count(), + total_endpoints: self.total_metric_count(), + } + } + + pub fn distinct_metric_count(&self) -> usize { + self.vecs().distinct_metric_count + } + + pub fn total_metric_count(&self) -> usize { + self.vecs().total_metric_count + } + + pub fn indexes(&self) -> &[IndexInfo] { + &self.vecs().indexes + } + + pub fn metrics(&self, pagination: Pagination) -> PaginatedMetrics { + self.vecs().metrics(pagination) + } + + pub fn metrics_catalog(&self) -> &TreeNode { + self.vecs().catalog() + } + + pub fn index_to_vecids(&self, paginated_index: PaginationIndex) -> Option<&[&str]> { + self.vecs().index_to_ids(paginated_index) + } + + pub fn metric_to_indexes(&self, metric: Metric) -> Option<&Vec> { + self.vecs().metric_to_indexes(metric) + } +} diff --git a/crates/brk_query/src/impl/metrics_legacy.rs b/crates/brk_query/src/impl/metrics_legacy.rs new file mode 100644 index 000000000..e26ec6369 --- /dev/null +++ b/crates/brk_query/src/impl/metrics_legacy.rs @@ -0,0 +1,75 @@ +//! Deprecated metrics formatting without MetricData wrapper. + +use brk_error::{Error, Result}; +use brk_types::Format; +use vecdb::AnyExportableVec; + +use crate::{DataRangeFormat, LegacyValue, MetricSelection, OutputLegacy, Query}; + +impl Query { + /// Deprecated - raw data without MetricData wrapper + pub fn format_legacy(&self, metrics: &[&dyn AnyExportableVec], params: &DataRangeFormat) -> Result { + let from = params + .from() + .map(|from| metrics.iter().map(|v| v.i64_to_usize(from)).min().unwrap_or_default()); + + let to = params + .to() + .map(|to| metrics.iter().map(|v| v.i64_to_usize(to)).min().unwrap_or_default()); + + let format = params.format(); + + Ok(match format { + Format::CSV => OutputLegacy::CSV(Self::columns_to_csv(metrics, from.map(|v| v as i64), to.map(|v| v as i64))?), + Format::JSON => { + if metrics.is_empty() { + return Ok(OutputLegacy::default(format)); + } + + if metrics.len() == 1 { + let metric = metrics[0]; + let count = metric.range_count(from.map(|v| v as i64), to.map(|v| v as i64)); + let mut buf = Vec::new(); + if count == 1 { + metric.write_json_value(from, &mut buf)?; + OutputLegacy::Json(LegacyValue::Value(buf)) + } else { + metric.write_json(from, to, &mut buf)?; + OutputLegacy::Json(LegacyValue::List(buf)) + } + } else { + let mut values = Vec::with_capacity(metrics.len()); + for vec in metrics { + let mut buf = Vec::new(); + vec.write_json(from, to, &mut buf)?; + values.push(buf); + } + OutputLegacy::Json(LegacyValue::Matrix(values)) + } + } + }) + } + + /// Deprecated - use search_and_format instead + pub fn search_and_format_legacy(&self, params: MetricSelection) -> Result { + self.search_and_format_legacy_checked(params, usize::MAX) + } + + /// Deprecated - use search_and_format_checked instead + pub fn search_and_format_legacy_checked(&self, params: MetricSelection, max_weight: usize) -> Result { + let vecs = self.search(¶ms); + + if vecs.is_empty() { + return Ok(OutputLegacy::default(params.range.format())); + } + + let weight = Self::weight(&vecs, params.from(), params.to()); + if weight > max_weight { + return Err(Error::String(format!( + "Request too heavy: {weight} bytes exceeds limit of {max_weight} bytes" + ))); + } + + self.format_legacy(&vecs, ¶ms.range) + } +} diff --git a/crates/brk_query/src/impl/mod.rs b/crates/brk_query/src/impl/mod.rs index 9228f7850..44bff5a35 100644 --- a/crates/brk_query/src/impl/mod.rs +++ b/crates/brk_query/src/impl/mod.rs @@ -5,6 +5,8 @@ mod address; mod block; mod mempool; +mod metrics; +mod metrics_legacy; mod mining; mod transaction; diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index b217febc3..d28259401 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -1,16 +1,14 @@ #![doc = include_str!("../README.md")] #![allow(clippy::module_inception)] -use std::{collections::BTreeMap, sync::Arc}; +use std::sync::Arc; use brk_computer::Computer; -use brk_error::{Error, Result}; use brk_indexer::Indexer; use brk_mempool::Mempool; use brk_reader::Reader; -use brk_traversable::TreeNode; -use brk_types::{Format, Height, Index, IndexInfo, Limit, Metric, MetricCount}; -use vecdb::{AnyExportableVec, AnyStoredVec}; +use brk_types::Height; +use vecdb::AnyStoredVec; // Infrastructure modules #[cfg(feature = "tokio")] @@ -29,9 +27,8 @@ pub use brk_types::{ Pagination, PaginationIndex, }; pub use r#impl::BLOCK_TXS_PAGE_SIZE; -pub use output::{Output, Value}; +pub use output::{LegacyValue, Output, OutputLegacy}; -use crate::vecs::{IndexToVec, MetricToVec}; use vecs::Vecs; #[derive(Clone)] @@ -70,194 +67,6 @@ impl Query { Height::from(self.indexer().vecs.block.height_to_blockhash.stamp()) } - // === Metrics methods === - - pub fn match_metric(&self, metric: &Metric, limit: Limit) -> Vec<&'static str> { - self.vecs().matches(metric, limit) - } - - fn columns_to_csv( - columns: &[&&dyn AnyExportableVec], - from: Option, - to: Option, - ) -> Result { - if columns.is_empty() { - return Ok(String::new()); - } - - let num_rows = columns[0].range_count(from, to); - let num_cols = columns.len(); - - let estimated_size = num_cols * 10 + num_rows * num_cols * 15; - let mut csv = String::with_capacity(estimated_size); - - // Write headers from column names - for (idx, col) in columns.iter().enumerate() { - if idx > 0 { - csv.push(','); - } - csv.push_str(col.name()); - } - csv.push('\n'); - - // Create one writer per column - let mut writers: Vec<_> = columns - .iter() - .map(|col| col.create_writer(from, to)) - .collect(); - - for _ in 0..num_rows { - for (index, writer) in writers.iter_mut().enumerate() { - if index > 0 { - csv.push(','); - } - writer.write_next(&mut csv)?; - } - csv.push('\n'); - } - - Ok(csv) - } - - pub fn format( - &self, - metrics: Vec<&&dyn AnyExportableVec>, - params: &DataRangeFormat, - ) -> Result { - let from = params.from().map(|from| { - metrics - .iter() - .map(|v| v.i64_to_usize(from)) - .min() - .unwrap_or_default() - }); - - let to = params.to().map(|to| { - metrics - .iter() - .map(|v| v.i64_to_usize(to)) - .min() - .unwrap_or_default() - }); - - let format = params.format(); - - Ok(match format { - Format::CSV => Output::CSV(Self::columns_to_csv( - &metrics, - from.map(|v| v as i64), - to.map(|v| v as i64), - )?), - Format::JSON => { - let mut values = metrics - .iter() - .map(|vec| vec.collect_range_json_bytes(from, to).map_err(Error::from)) - .collect::>>()?; - - if values.is_empty() { - return Ok(Output::default(format)); - } - - if values.len() == 1 { - Output::Json(Value::List(values.pop().unwrap())) - } else { - Output::Json(Value::Matrix(values)) - } - } - }) - } - - /// Search for vecs matching the given metrics and index - pub fn search(&self, params: &MetricSelection) -> Vec<&'static dyn AnyExportableVec> { - params - .metrics - .iter() - .filter_map(|metric| self.vecs().get(metric, params.index)) - .collect() - } - - /// Calculate total weight of the vecs for the given range - pub fn weight(vecs: &[&dyn AnyExportableVec], from: Option, to: Option) -> usize { - vecs.iter().map(|v| v.range_weight(from, to)).sum() - } - - pub fn search_and_format(&self, params: MetricSelection) -> Result { - let vecs = self.search(¶ms); - - if vecs.is_empty() { - return Ok(Output::default(params.range.format())); - } - - self.format(vecs.iter().collect(), ¶ms.range) - } - - /// Search and format with weight limit (for DDoS prevention) - pub fn search_and_format_checked( - &self, - params: MetricSelection, - max_weight: usize, - ) -> Result { - let vecs = self.search(¶ms); - - if vecs.is_empty() { - return Ok(Output::default(params.range.format())); - } - - let weight = Self::weight(&vecs, params.from(), params.to()); - if weight > max_weight { - return Err(Error::String(format!( - "Request too heavy: {weight} bytes exceeds limit of {max_weight} bytes" - ))); - } - - self.format(vecs.iter().collect(), ¶ms.range) - } - - pub fn metric_to_index_to_vec(&self) -> &BTreeMap<&str, IndexToVec<'_>> { - &self.vecs().metric_to_index_to_vec - } - - pub fn index_to_metric_to_vec(&self) -> &BTreeMap> { - &self.vecs().index_to_metric_to_vec - } - - pub fn metric_count(&self) -> MetricCount { - MetricCount { - distinct_metrics: self.distinct_metric_count(), - total_endpoints: self.total_metric_count(), - } - } - - pub fn distinct_metric_count(&self) -> usize { - self.vecs().distinct_metric_count - } - - pub fn total_metric_count(&self) -> usize { - self.vecs().total_metric_count - } - - pub fn get_indexes(&self) -> &[IndexInfo] { - &self.vecs().indexes - } - - pub fn get_metrics(&self, pagination: Pagination) -> PaginatedMetrics { - self.vecs().metrics(pagination) - } - - pub fn get_metrics_catalog(&self) -> &TreeNode { - self.vecs().catalog() - } - - pub fn get_index_to_vecids(&self, paginated_index: PaginationIndex) -> Option<&[&str]> { - self.vecs().index_to_ids(paginated_index) - } - - pub fn metric_to_indexes(&self, metric: Metric) -> Option<&Vec> { - self.vecs().metric_to_indexes(metric) - } - - // === Core accessors === - #[inline] pub fn reader(&self) -> &Reader { &self.0.reader diff --git a/crates/brk_query/src/output.rs b/crates/brk_query/src/output.rs index 23755e0f7..7e5da2451 100644 --- a/crates/brk_query/src/output.rs +++ b/crates/brk_query/src/output.rs @@ -1,8 +1,9 @@ use brk_types::Format; +/// New format with MetricData metadata wrapper #[derive(Debug)] pub enum Output { - Json(Value), + Json(Vec), CSV(String), } @@ -11,44 +12,67 @@ impl Output { pub fn to_string(self) -> String { match self { Output::CSV(s) => s, - Output::Json(v) => unsafe { String::from_utf8_unchecked(v.to_vec()) }, + Output::Json(v) => unsafe { String::from_utf8_unchecked(v) }, + } + } + + pub fn default(format: Format) -> Self { + match format { + Format::CSV => Output::CSV(String::new()), + Format::JSON => Output::Json(br#"{"len":0,"from":0,"to":0,"data":[]}"#.to_vec()), } } } +/// Deprecated: Raw JSON without metadata wrapper #[derive(Debug)] -pub enum Value { - Matrix(Vec>), - List(Vec), +pub enum OutputLegacy { + Json(LegacyValue), + CSV(String), } -impl Value { +impl OutputLegacy { + #[allow(clippy::inherent_to_string)] + pub fn to_string(self) -> String { + match self { + OutputLegacy::CSV(s) => s, + OutputLegacy::Json(v) => unsafe { String::from_utf8_unchecked(v.to_vec()) }, + } + } + + pub fn default(format: Format) -> Self { + match format { + Format::CSV => OutputLegacy::CSV(String::new()), + Format::JSON => OutputLegacy::Json(LegacyValue::List(b"[]".to_vec())), + } + } +} + +/// Deprecated: Raw JSON without metadata wrapper. +#[derive(Debug)] +pub enum LegacyValue { + Matrix(Vec>), + List(Vec), + Value(Vec), +} + +impl LegacyValue { pub fn to_vec(self) -> Vec { match self { - Value::List(v) => v, - Self::Matrix(m) => { - let total_size = m.iter().map(|v| v.len()).sum::() + m.len() - 1 + 2; - let mut matrix = Vec::with_capacity(total_size); - matrix.push(b'['); - + LegacyValue::Value(v) | LegacyValue::List(v) => v, + LegacyValue::Matrix(m) => { + let total_size = m.iter().map(|v| v.len()).sum::() + m.len() + 1; + let mut buf = Vec::with_capacity(total_size); + buf.push(b'['); for (i, vec) in m.into_iter().enumerate() { if i > 0 { - matrix.push(b','); + buf.push(b','); } - matrix.extend(vec); + buf.extend(vec); } - matrix.push(b']'); - matrix + buf.push(b']'); + buf } } } } - -impl Output { - pub fn default(format: Format) -> Self { - match format { - Format::CSV => Output::CSV("".to_string()), - Format::JSON => Output::Json(Value::List(b"[]".to_vec())), - } - } -} diff --git a/crates/brk_server/src/api/metrics/bulk.rs b/crates/brk_server/src/api/metrics/bulk.rs new file mode 100644 index 000000000..843d1884b --- /dev/null +++ b/crates/brk_server/src/api/metrics/bulk.rs @@ -0,0 +1,103 @@ +use std::time::Duration; + +use axum::{ + body::Body, + extract::{Query, State}, + http::{HeaderMap, StatusCode, Uri}, + response::{IntoResponse, Response}, +}; +use brk_query::{MetricSelection, Output}; +use brk_types::Format; +use quick_cache::sync::GuardResult; + +use crate::{ + CacheStrategy, api::metrics::MAX_WEIGHT, cache::CacheParams, extended::HeaderMapExtended, +}; + +use super::AppState; + +pub async fn handler( + uri: Uri, + headers: HeaderMap, + query: Query, + State(state): State, +) -> Response { + match req_to_response_res(uri, headers, query, state).await { + Ok(response) => response, + Err(error) => { + let mut response = + (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()).into_response(); + response.headers_mut().insert_cors(); + response + } + } +} + +async fn req_to_response_res( + uri: Uri, + headers: HeaderMap, + Query(params): Query, + AppState { query, cache, .. }: AppState, +) -> brk_error::Result { + let format = params.format(); + let height = query.sync(|q| q.height()); + let to = params.to(); + + let cache_params = CacheParams::resolve(&CacheStrategy::height_with(format!("{to:?}")), || { + height.into() + }); + + if cache_params.matches_etag(&headers) { + let mut response = (StatusCode::NOT_MODIFIED, "").into_response(); + response.headers_mut().insert_cors(); + return Ok(response); + } + + let cache_key = format!( + "{}{}{}", + uri.path(), + uri.query().unwrap_or(""), + cache_params.etag_str() + ); + let guard_res = cache.get_value_or_guard(&cache_key, Some(Duration::from_millis(50))); + + let mut response = if let GuardResult::Value(v) = guard_res { + Response::new(Body::from(v)) + } else { + match query + .run(move |q| q.search_and_format_bulk_checked(params, MAX_WEIGHT)) + .await? + { + Output::CSV(s) => { + if let GuardResult::Guard(g) = guard_res { + let _ = g.insert(s.clone().into()); + } + s.into_response() + } + Output::Json(v) => { + let json = v.to_vec(); + if let GuardResult::Guard(g) = guard_res { + let _ = g.insert(json.clone().into()); + } + json.into_response() + } + } + }; + + let headers = response.headers_mut(); + headers.insert_cors(); + if let Some(etag) = &cache_params.etag { + headers.insert_etag(etag); + } + headers.insert_cache_control(&cache_params.cache_control); + + match format { + Format::CSV => { + headers.insert_content_disposition_attachment(); + headers.insert_content_type_text_csv() + } + Format::JSON => headers.insert_content_type_application_json(), + } + + Ok(response) +} diff --git a/crates/brk_server/src/api/metrics/data.rs b/crates/brk_server/src/api/metrics/data.rs index 97f9cfd27..dd9ef9fa8 100644 --- a/crates/brk_server/src/api/metrics/data.rs +++ b/crates/brk_server/src/api/metrics/data.rs @@ -1,3 +1,5 @@ +//! Handler for single metric data endpoint. + use std::time::Duration; use axum::{ @@ -10,13 +12,12 @@ use brk_query::{MetricSelection, Output}; use brk_types::Format; use quick_cache::sync::GuardResult; -use crate::{CacheStrategy, cache::CacheParams, extended::HeaderMapExtended}; +use crate::{ + CacheStrategy, api::metrics::MAX_WEIGHT, cache::CacheParams, extended::HeaderMapExtended, +}; use super::AppState; -/// Maximum allowed request weight in bytes (650KB) -const MAX_WEIGHT: usize = 65 * 10_000; - pub async fn handler( uri: Uri, headers: HeaderMap, @@ -44,7 +45,9 @@ async fn req_to_response_res( let height = query.sync(|q| q.height()); let to = params.to(); - let cache_params = CacheParams::resolve(&CacheStrategy::height_with(format!("{to:?}")), || height.into()); + let cache_params = CacheParams::resolve(&CacheStrategy::height_with(format!("{to:?}")), || { + height.into() + }); if cache_params.matches_etag(&headers) { let mut response = (StatusCode::NOT_MODIFIED, "").into_response(); @@ -52,7 +55,12 @@ async fn req_to_response_res( return Ok(response); } - let cache_key = format!("{}{}{}", uri.path(), uri.query().unwrap_or(""), cache_params.etag_str()); + let cache_key = format!( + "single-{}{}{}", + uri.path(), + uri.query().unwrap_or(""), + cache_params.etag_str() + ); let guard_res = cache.get_value_or_guard(&cache_key, Some(Duration::from_millis(50))); let mut response = if let GuardResult::Value(v) = guard_res { diff --git a/crates/brk_server/src/api/metrics/legacy.rs b/crates/brk_server/src/api/metrics/legacy.rs new file mode 100644 index 000000000..f25818283 --- /dev/null +++ b/crates/brk_server/src/api/metrics/legacy.rs @@ -0,0 +1,105 @@ +//! Deprecated handler for legacy endpoints - returns raw data without MetricData wrapper. + +use std::time::Duration; + +use axum::{ + body::Body, + extract::{Query, State}, + http::{HeaderMap, StatusCode, Uri}, + response::{IntoResponse, Response}, +}; +use brk_query::{MetricSelection, OutputLegacy}; +use brk_types::Format; +use quick_cache::sync::GuardResult; + +use crate::{ + CacheStrategy, api::metrics::MAX_WEIGHT, cache::CacheParams, extended::HeaderMapExtended, +}; + +use super::AppState; + +pub async fn handler( + uri: Uri, + headers: HeaderMap, + query: Query, + State(state): State, +) -> Response { + match req_to_response_res(uri, headers, query, state).await { + Ok(response) => response, + Err(error) => { + let mut response = + (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()).into_response(); + response.headers_mut().insert_cors(); + response + } + } +} + +async fn req_to_response_res( + uri: Uri, + headers: HeaderMap, + Query(params): Query, + AppState { query, cache, .. }: AppState, +) -> brk_error::Result { + let format = params.format(); + let height = query.sync(|q| q.height()); + let to = params.to(); + + let cache_params = CacheParams::resolve(&CacheStrategy::height_with(format!("{to:?}")), || { + height.into() + }); + + if cache_params.matches_etag(&headers) { + let mut response = (StatusCode::NOT_MODIFIED, "").into_response(); + response.headers_mut().insert_cors(); + return Ok(response); + } + + let cache_key = format!( + "legacy-{}{}{}", + uri.path(), + uri.query().unwrap_or(""), + cache_params.etag_str() + ); + let guard_res = cache.get_value_or_guard(&cache_key, Some(Duration::from_millis(50))); + + let mut response = if let GuardResult::Value(v) = guard_res { + Response::new(Body::from(v)) + } else { + match query + .run(move |q| q.search_and_format_legacy_checked(params, MAX_WEIGHT)) + .await? + { + OutputLegacy::CSV(s) => { + if let GuardResult::Guard(g) = guard_res { + let _ = g.insert(s.clone().into()); + } + s.into_response() + } + OutputLegacy::Json(v) => { + let json = v.to_vec(); + if let GuardResult::Guard(g) = guard_res { + let _ = g.insert(json.clone().into()); + } + json.into_response() + } + } + }; + + let headers = response.headers_mut(); + headers.insert_cors(); + if let Some(etag) = &cache_params.etag { + headers.insert_etag(etag); + } + headers.insert_cache_control(&cache_params.cache_control); + + match format { + Format::CSV => { + headers.insert_content_disposition_attachment(); + headers.insert_content_type_text_csv() + } + Format::JSON => headers.insert_content_type_application_json(), + } + + Ok(response) +} diff --git a/crates/brk_server/src/api/metrics/mod.rs b/crates/brk_server/src/api/metrics/mod.rs index 19c748d52..c7c172f2f 100644 --- a/crates/brk_server/src/api/metrics/mod.rs +++ b/crates/brk_server/src/api/metrics/mod.rs @@ -5,15 +5,24 @@ use axum::{ response::{IntoResponse, Redirect, Response}, routing::get, }; -use brk_query::{DataRangeFormat, MetricSelection, MetricSelectionLegacy, PaginatedMetrics, Pagination}; +use brk_query::{ + DataRangeFormat, MetricSelection, MetricSelectionLegacy, PaginatedMetrics, Pagination, +}; use brk_traversable::TreeNode; -use brk_types::{Index, IndexInfo, Limit, Metric, MetricCount, Metrics}; +use brk_types::{ + Index, IndexInfo, Limit, Metric, MetricCount, MetricData, MetricWithIndex, Metrics, +}; use crate::{CacheStrategy, extended::TransformResponseExtended}; use super::AppState; +mod bulk; mod data; +mod legacy; + +/// Maximum allowed request weight in bytes (650KB) +const MAX_WEIGHT: usize = 65 * 10_000; pub trait ApiMetricsRoutes { fn add_metrics_routes(self) -> Self; @@ -48,7 +57,7 @@ impl ApiMetricsRoutes for ApiRouter { headers: HeaderMap, State(state): State | { - state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.get_indexes().to_vec())).await + state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.indexes().to_vec())).await }, |op| op .metrics_tag() @@ -68,7 +77,7 @@ impl ApiMetricsRoutes for ApiRouter { State(state): State, Query(pagination): Query | { - state.cached_json(&headers, CacheStrategy::Static, move |q| Ok(q.get_metrics(pagination))).await + state.cached_json(&headers, CacheStrategy::Static, move |q| Ok(q.metrics(pagination))).await }, |op| op .metrics_tag() @@ -82,7 +91,7 @@ impl ApiMetricsRoutes for ApiRouter { "/api/metrics/catalog", get_with( async |headers: HeaderMap, State(state): State| { - state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.get_metrics_catalog().clone())).await + state.cached_json(&headers, CacheStrategy::Static, |q| Ok(q.metrics_catalog().clone())).await }, |op| op .metrics_tag() @@ -125,13 +134,7 @@ impl ApiMetricsRoutes for ApiRouter { if let Some(indexes) = q.metric_to_indexes(metric.clone()) { return Ok(indexes.clone()) } - Err(brk_error::Error::String( - if let Some(first) = q.match_metric(&metric, Limit::MIN).first() { - format!("Could not find '{metric}', did you mean '{first}' ?") - } else { - format!("Could not find '{metric}'.") - } - )) + Err(q.metric_not_found_error(&metric)) }).await }, |op| op @@ -146,25 +149,48 @@ impl ApiMetricsRoutes for ApiRouter { .not_found(), ), ) - // WIP - .route("/api/metrics/bulk", get(data::handler)) - .route( + .api_route( "/api/metric/{metric}/{index}", - get( + get_with( async |uri: Uri, headers: HeaderMap, state: State, - Path((metric, index)): Path<(Metric, Index)>, + Path(path): Path, Query(range): Query| -> Response { data::handler( uri, headers, - Query(MetricSelection::from((index, metric, range))), + Query(MetricSelection::from((path.index, path.metric, range))), state, ) .await }, + |op| op + .metrics_tag() + .summary("Get metric data") + .description( + "Fetch data for a specific metric at the given index. \ + Use query parameters to filter by date range and format (json/csv)." + ) + .ok_response::() + .not_modified() + .not_found(), + ), + ) + .api_route( + "/api/metrics/bulk", + get_with( + bulk::handler, + |op| op + .metrics_tag() + .summary("Bulk metric data") + .description( + "Fetch multiple metrics in a single request. Supports filtering by index and date range. \ + Returns an array of MetricData objects." + ) + .ok_response::>() + .not_modified(), ), ) // !!! @@ -178,7 +204,7 @@ impl ApiMetricsRoutes for ApiRouter { Query(params): Query, state: State| -> Response { - data::handler(uri, headers, Query(params.into()), state).await + legacy::handler(uri, headers, Query(params.into()), state).await }, ), ) @@ -208,7 +234,7 @@ impl ApiMetricsRoutes for ApiRouter { Metrics::from(split.collect::>().join(separator)), range, )); - data::handler(uri, headers, Query(params), state).await + legacy::handler(uri, headers, Query(params), state).await }, ), ) diff --git a/crates/brk_types/src/feerate.rs b/crates/brk_types/src/feerate.rs index 5a5fb4a49..ba95c24f8 100644 --- a/crates/brk_types/src/feerate.rs +++ b/crates/brk_types/src/feerate.rs @@ -9,6 +9,7 @@ use vecdb::{Formattable, Pco}; use super::{Sats, VSize}; +/// Fee rate in sats/vB #[derive(Debug, Default, Clone, Copy, Serialize, Pco, JsonSchema)] pub struct FeeRate(f64); diff --git a/crates/brk_types/src/lib.rs b/crates/brk_types/src/lib.rs index c460f0313..c42d6cac1 100644 --- a/crates/brk_types/src/lib.rs +++ b/crates/brk_types/src/lib.rs @@ -34,6 +34,8 @@ mod blocktimestamp; mod blockweightentry; mod bytes; mod cents; +mod datarange; +mod datarangeformat; mod date; mod dateindex; mod decadeindex; @@ -62,14 +64,14 @@ mod loadedaddressindex; mod mempoolblock; mod mempoolentryinfo; mod mempoolinfo; -mod datarange; -mod datarangeformat; mod metric; mod metriccount; +mod metricdata; mod metrics; mod metricselection; mod metricselectionlegacy; mod metricspaginated; +mod metricwithindex; mod monthindex; mod ohlc; mod opreturnindex; @@ -180,6 +182,8 @@ pub use blocktimestamp::*; pub use blockweightentry::*; pub use bytes::*; pub use cents::*; +pub use datarange::*; +pub use datarangeformat::*; pub use date::*; pub use dateindex::*; pub use decadeindex::*; @@ -208,14 +212,14 @@ pub use loadedaddressindex::*; pub use mempoolblock::*; pub use mempoolentryinfo::*; pub use mempoolinfo::*; -pub use datarange::*; -pub use datarangeformat::*; pub use metric::*; pub use metriccount::*; +pub use metricdata::*; pub use metrics::*; pub use metricselection::*; pub use metricselectionlegacy::*; pub use metricspaginated::*; +pub use metricwithindex::*; pub use monthindex::*; pub use ohlc::*; pub use opreturnindex::*; diff --git a/crates/brk_types/src/metricdata.rs b/crates/brk_types/src/metricdata.rs new file mode 100644 index 000000000..239e8e90b --- /dev/null +++ b/crates/brk_types/src/metricdata.rs @@ -0,0 +1,44 @@ +use std::io::Write; + +use schemars::JsonSchema; +use serde_json::Value; +use vecdb::AnySerializableVec; + +/// Metric data with range information. +/// +/// All metric data endpoints return this structure when format is JSON. +/// This type is not instantiated - use `MetricData::serialize()` to write JSON bytes directly. +#[derive(JsonSchema)] +pub struct MetricData { + /// Number of data points returned + pub len: usize, + /// Start index (inclusive) of the returned range + pub from: usize, + /// End index (exclusive) of the returned range + pub to: usize, + /// The metric data + pub data: Vec, +} + +impl MetricData { + /// Write metric data as JSON to buffer: `{"len":N,"from":N,"to":N,"data":[...]}` + pub fn serialize( + vec: &dyn AnySerializableVec, + from: Option, + to: Option, + buf: &mut Vec, + ) -> vecdb::Result<()> { + let len = vec.len(); + let from_idx = from.unwrap_or(0); + let to_idx = to.unwrap_or(len).min(len); + let range_len = to_idx.saturating_sub(from_idx); + + write!( + buf, + r#"{{"len":{range_len},"from":{from_idx},"to":{to_idx},"data":"# + )?; + vec.write_json(from, to, buf)?; + buf.push(b'}'); + Ok(()) + } +} diff --git a/crates/brk_types/src/metricwithindex.rs b/crates/brk_types/src/metricwithindex.rs new file mode 100644 index 000000000..461a4e524 --- /dev/null +++ b/crates/brk_types/src/metricwithindex.rs @@ -0,0 +1,10 @@ +use schemars::JsonSchema; +use serde::Deserialize; + +use crate::{Index, Metric}; + +#[derive(Deserialize, JsonSchema)] +pub struct MetricWithIndex { + pub metric: Metric, + pub index: Index, +}