diff --git a/crates/brk_cli/src/lib.rs b/crates/brk_cli/src/lib.rs index 810c65bec..ba0a5e828 100644 --- a/crates/brk_cli/src/lib.rs +++ b/crates/brk_cli/src/lib.rs @@ -1,4 +1,4 @@ -use std::fs; +use std::{fs, thread}; use brk_core::{dot_brk_log_path, dot_brk_path}; use brk_query::Params as QueryArgs; @@ -35,8 +35,12 @@ pub fn main() -> color_eyre::Result<()> { let cli = Cli::parse(); - match cli.command { - Commands::Run(args) => run(args), - Commands::Query(args) => query(args), - } + thread::Builder::new() + .stack_size(128 * 1024 * 1024) + .spawn(|| match cli.command { + Commands::Run(args) => run(args), + Commands::Query(args) => query(args), + })? + .join() + .unwrap() } diff --git a/crates/brk_cli/src/query.rs b/crates/brk_cli/src/query.rs index 41763e90b..57b3d2379 100644 --- a/crates/brk_cli/src/query.rs +++ b/crates/brk_cli/src/query.rs @@ -19,12 +19,14 @@ pub fn query(params: QueryParams) -> color_eyre::Result<()> { let query = Query::build(&indexer, &computer); let index = Index::try_from(params.index.as_str())?; - let ids = params.values.iter().map(|s| s.as_str()).collect::>(); + let from = params.from(); + let to = params.to(); + let format = params.format(); - let res = query.search_and_format(index, &ids, params.from, params.to, params.format)?; + let res = query.search_and_format(index, &ids, from, to, format)?; - if params.format.is_some() { + if format.is_some() { println!("{}", res); } else { println!( diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs index c34ac3f33..148cb0f1a 100644 --- a/crates/brk_cli/src/run.rs +++ b/crates/brk_cli/src/run.rs @@ -1,7 +1,7 @@ use std::{ fs, path::{Path, PathBuf}, - thread::{self, sleep}, + thread::sleep, time::Duration, }; @@ -49,69 +49,61 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> { Ok(()) }; - let f = move || -> color_eyre::Result<()> { - let mut computer = Computer::new(&config.outputsdir(), config.fetcher(), format); - computer.import_stores(&indexer)?; - computer.import_vecs(&indexer, config.computation())?; + let mut computer = Computer::new(&config.outputsdir(), config.fetcher(), format); + computer.import_stores(&indexer)?; + computer.import_vecs(&indexer, config.computation())?; - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()? - .block_on(async { - let server = if config.serve() { - let served_indexer = indexer.clone(); - let served_computer = computer.clone(); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(async { + let server = if config.serve() { + let served_indexer = indexer.clone(); + let served_computer = computer.clone(); - let server = Server::new(served_indexer, served_computer, config.website())?; + let server = Server::new(served_indexer, served_computer, config.website())?; - let opt = Some(tokio::spawn(async move { - server.serve().await.unwrap(); - })); + let opt = Some(tokio::spawn(async move { + server.serve().await.unwrap(); + })); - sleep(Duration::from_secs(1)); + sleep(Duration::from_secs(1)); - opt - } else { - None - }; + opt + } else { + None + }; - if config.process() { - loop { - wait_for_synced_node()?; + if config.process() { + loop { + wait_for_synced_node()?; - let block_count = rpc.get_block_count()?; + let block_count = rpc.get_block_count()?; - info!("{} blocks found.", block_count + 1); + info!("{} blocks found.", block_count + 1); - let starting_indexes = indexer.index(&parser, rpc, &exit)?; + let starting_indexes = indexer.index(&parser, rpc, &exit)?; - computer.compute(&mut indexer, starting_indexes, &exit)?; + computer.compute(&mut indexer, starting_indexes, &exit)?; - if let Some(delay) = config.delay() { - sleep(Duration::from_secs(delay)) - } + if let Some(delay) = config.delay() { + sleep(Duration::from_secs(delay)) + } - info!("Waiting for new blocks..."); + info!("Waiting for new blocks..."); - while block_count == rpc.get_block_count()? { - sleep(Duration::from_secs(1)) - } + while block_count == rpc.get_block_count()? { + sleep(Duration::from_secs(1)) } } + } - if let Some(handle) = server { - handle.await.unwrap(); - } + if let Some(handle) = server { + handle.await.unwrap(); + } - Ok(()) - }) - }; - - thread::Builder::new() - .stack_size(128 * 1024 * 1024) - .spawn(f)? - .join() - .unwrap() + Ok(()) + }) } #[derive(Parser, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)] diff --git a/crates/brk_computer/src/vecs/mod.rs b/crates/brk_computer/src/vecs/mod.rs index daafaba55..66ec810d1 100644 --- a/crates/brk_computer/src/vecs/mod.rs +++ b/crates/brk_computer/src/vecs/mod.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::{fs, path::Path, thread}; use brk_core::Version; use brk_exit::Exit; @@ -44,22 +44,31 @@ impl Vecs { ) -> color_eyre::Result { fs::create_dir_all(path)?; - let indexes = indexes::Vecs::forced_import( - path, - version + VERSION + Version::ZERO, - indexer, - computation, - format, - )?; + let (indexes, fetched) = thread::scope(|s| { + let indexes_handle = s.spawn(|| { + indexes::Vecs::forced_import( + path, + version + VERSION + Version::ZERO, + indexer, + computation, + format, + ) + .unwrap() + }); - let fetched = fetch.then(|| { - fetched::Vecs::forced_import( - path, - version + VERSION + Version::ZERO, - computation, - format, - ) - .unwrap() + let fetch_handle = s.spawn(|| { + fetch.then(|| { + fetched::Vecs::forced_import( + path, + version + VERSION + Version::ZERO, + computation, + format, + ) + .unwrap() + }) + }); + + (indexes_handle.join().unwrap(), fetch_handle.join().unwrap()) }); Ok(Self { diff --git a/crates/brk_core/src/structs/version.rs b/crates/brk_core/src/structs/version.rs index fa2451d8f..8a555293d 100644 --- a/crates/brk_core/src/structs/version.rs +++ b/crates/brk_core/src/structs/version.rs @@ -33,7 +33,9 @@ impl Version { Self(self.0.swap_bytes()) } - pub fn validate(&self, path: &Path) -> Result<()> { + /// Ok(true) if existed and is same + /// Ok(false) if didn't exist + pub fn validate(&self, path: &Path) -> Result { if let Ok(prev_version) = Version::try_from(path) { if prev_version != *self { if prev_version.swap_bytes() == *self { @@ -44,9 +46,11 @@ impl Version { expected: *self, }); } - } - Ok(()) + Ok(true) + } else { + Ok(false) + } } } diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index a9985f091..aad5ca73e 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -19,7 +19,7 @@ mod vec_trees; pub use format::Format; pub use index::Index; pub use output::{Output, Value}; -pub use params::Params; +pub use params::{Params, ParamsOpt}; pub use table::Tabled; use vec_trees::VecTrees; diff --git a/crates/brk_query/src/params.rs b/crates/brk_query/src/params.rs index 64f648f24..6446ac010 100644 --- a/crates/brk_query/src/params.rs +++ b/crates/brk_query/src/params.rs @@ -1,6 +1,8 @@ +use std::{fmt::Display, ops::Deref, str::FromStr}; + use clap::builder::PossibleValuesParser; use clap_derive::Parser; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use serde_with::{OneOrMany, formats::PreferOne, serde_as}; use crate::{Format, Index}; @@ -17,18 +19,111 @@ pub struct Params { #[serde_as(as = "OneOrMany<_, PreferOne>")] /// Names of the values requested pub values: Vec, + + #[clap(flatten)] + #[serde(flatten)] + pub rest: ParamsOpt, +} + +// The macro creates custom deserialization code. +// You need to specify a function name and the field name of the flattened field. +serde_with::flattened_maybe!(deserialize_rest, "rest"); + +impl Deref for Params { + type Target = ParamsOpt; + fn deref(&self) -> &Self::Target { + &self.rest + } +} + +impl From<((String, String), ParamsOpt)> for Params { + fn from(((index, id), rest): ((String, String), ParamsOpt)) -> Self { + Self { + index, + values: vec![id], + rest, + } + } +} + +#[serde_as] +#[derive(Debug, Deserialize, Parser)] +pub struct ParamsOpt { #[clap(short, long, allow_hyphen_values = true)] - #[serde(alias = "f")] + #[serde(default, alias = "f", deserialize_with = "de_unquote_i64")] /// Inclusive starting index, if negative will be from the end - pub from: Option, + from: Option, #[clap(short, long, allow_hyphen_values = true)] - #[serde(default, alias = "t")] + #[serde(default, alias = "t", deserialize_with = "de_unquote_i64")] /// Exclusive ending index, if negative will be from the end, overrides 'count' - pub to: Option, - #[serde(default, alias = "c")] + to: Option, + #[clap(short, long, allow_hyphen_values = true)] + #[serde(default, alias = "c", deserialize_with = "de_unquote_usize")] /// Number of values - pub count: Option, + count: Option, #[clap(short = 'F', long)] /// Format of the output - pub format: Option, + format: Option, +} + +impl ParamsOpt { + pub fn from(&self) -> Option { + self.from + } + + pub fn to(&self) -> Option { + if self.to.is_none() { + if let Some(c) = self.count { + let c = c as i64; + if let Some(f) = self.from { + if f.is_positive() || f.abs() > c { + return Some(f + c); + } + } else { + return Some(c); + } + } + } + self.to + } + + pub fn format(&self) -> Option { + self.format + } +} + +fn de_unquote_i64<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + de_unquote(deserializer) +} + +fn de_unquote_usize<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + de_unquote(deserializer) +} + +fn de_unquote<'de, D, F>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, + F: FromStr + Display, + ::Err: Display, +{ + let opt: Option = Option::deserialize(deserializer)?; + let s = match opt { + None => return Ok(None), + Some(mut s) => { + // strip any leading/trailing quotes + if s.starts_with('"') && s.ends_with('"') && s.len() >= 2 { + s = s[1..s.len() - 1].to_string(); + } + s + } + }; + s.parse::() + .map(Some) + .map_err(|e| serde::de::Error::custom(format!("cannot parse `{}` as type: {}", s, e))) } diff --git a/crates/brk_server/README.md b/crates/brk_server/README.md index d6c769a3e..930bd5c0f 100644 --- a/crates/brk_server/README.md +++ b/crates/brk_server/README.md @@ -41,19 +41,35 @@ The API uses `brk_query` and so inherites all of its features including formats. ### API -#### `GET /api/vecs/indexes` +#### [`GET /api/vecs/index-count`](https://bitcoinresearchkit.org/api/vecs/index-count) + +Count of all possible indexes + +#### [`GET /api/vecs/id-count`](https://bitcoinresearchkit.org/api/vecs/id-count) + +Count of all possible ids + +#### [`GET /api/vecs/variant-count`](https://bitcoinresearchkit.org/api/vecs/variant-count) + +Count of all possible variants + +#### [`GET /api/vecs/indexes`](https://bitcoinresearchkit.org/api/vecs/indexes) A list of all possible vec indexes and their accepted variants -#### `GET /api/vecs/ids` +#### [`GET /api/vecs/ids`](https://bitcoinresearchkit.org/api/vecs/ids) A list of all possible vec ids -#### `GET /api/vecs/id-to-indexes` +#### [`GET /api/vecs/variants`](https://bitcoinresearchkit.org/api/vecs/variants) + +A list of all possible variants + +#### [`GET /api/vecs/id-to-indexes`](https://bitcoinresearchkit.org/api/vecs/id-to-indexes) A list of all possible vec ids and their supported vec indexes -#### `GET /api/vecs/index-to-ids` +#### [`GET /api/vecs/index-to-ids`](https://bitcoinresearchkit.org/api/vecs/index-to-ids) A list of all possible vec indexes and their supported vec ids @@ -67,8 +83,9 @@ This endpoint retrieves data based on the specified vector index and values. | --- | --- | --- | --- | | `index` | `VecIndex` | Yes | The vector index to query. | | `values` | `VecId[]` | Yes | A comma or space-separated list of vector IDs to retrieve. | -| `from` | `unsigned int` | No | The starting index for pagination (default is 0). | -| `to` | `unsigned int` | No | The ending index for pagination (default is the total number of results). | +| `from` | `signed int` | No | Inclusive starting index for pagination (default is 0). | +| `to` | `signed int` | No | Exclusive ending index for pagination (default is the total number of results). Overrides `count` | +| `count` | `unsigned int` | No | The number of values requested | | `format` | `string` | No | The format of the response. Options include `json`, `csv`, `tsv`, or `md` (default is `json`). | **Examples:** @@ -80,7 +97,7 @@ GET /api/query?index=week&values=ohlc,block-interval-average&from=0&to=20&format ### Meta -#### `GET /version` +#### [`GET /version`](https://bitcoinresearchkit.org/version) The version of the server and thus BRK. diff --git a/crates/brk_server/src/api/mod.rs b/crates/brk_server/src/api/mod.rs index d6af28b26..3e8fff14d 100644 --- a/crates/brk_server/src/api/mod.rs +++ b/crates/brk_server/src/api/mod.rs @@ -1,11 +1,13 @@ use std::collections::BTreeMap; use axum::{ - Router, - extract::State, + Json, Router, + extract::{Path, Query, State}, + http::HeaderMap, response::{IntoResponse, Redirect, Response}, routing::get, }; +use brk_query::{Params, ParamsOpt}; use super::AppState; @@ -20,24 +22,29 @@ pub trait ApiRoutes { impl ApiRoutes for Router { fn add_api_routes(self) -> Self { - self.route( - "/api", - get(|| async { - Redirect::permanent( - "https://github.com/bitcoinresearchkit/brk/tree/main/crates/brk_server#api", - ) - }), - ) - .route("/api/query", get(query::handler)) - .route("/api/vecs/ids", get(vecids_handler)) - .route("/api/vecs/indexes", get(vecindexes_handler)) - .route("/api/vecs/id-to-indexes", get(vecid_to_vecindexes_handler)) - .route("/api/vecs/index-to-ids", get(vecindex_to_vecids_handler)) + self.route("/api/query", get(query::handler)) + .route("/api/vecs/id-count", get(id_count_handler)) + .route("/api/vecs/index-count", get(index_count_handler)) + .route("/api/vecs/variant-count", get(variant_count_handler)) + .route("/api/vecs/ids", get(ids_handler)) + .route("/api/vecs/indexes", get(indexes_handler)) + .route("/api/vecs/variants", get(variants_handler)) + .route("/api/vecs/id-to-indexes", get(id_to_indexes_handler)) + .route("/api/vecs/index-to-ids", get(index_to_ids_handler)) + .route("/api/{variant}", get(variant_handler)) + .route( + "/api", + get(|| async { + Redirect::temporary( + "https://github.com/bitcoinresearchkit/brk/tree/main/crates/brk_server#api", + ) + }), + ) } } -pub async fn vecids_handler(State(app_state): State) -> Response { - axum::Json( +pub async fn ids_handler(State(app_state): State) -> Response { + Json( app_state .query .vec_trees @@ -48,8 +55,29 @@ pub async fn vecids_handler(State(app_state): State) -> Response { .into_response() } -pub async fn vecindexes_handler(State(app_state): State) -> Response { - axum::Json( +pub async fn variant_count_handler(State(app_state): State) -> Response { + Json( + app_state + .query + .vec_trees + .index_to_id_to_vec + .values() + .map(|tree| tree.len()) + .sum::(), + ) + .into_response() +} + +pub async fn id_count_handler(State(app_state): State) -> Response { + Json(app_state.query.vec_trees.id_to_index_to_vec.keys().count()).into_response() +} + +pub async fn index_count_handler(State(app_state): State) -> Response { + Json(app_state.query.vec_trees.index_to_id_to_vec.keys().count()).into_response() +} + +pub async fn indexes_handler(State(app_state): State) -> Response { + Json( app_state .query .vec_trees @@ -61,10 +89,48 @@ pub async fn vecindexes_handler(State(app_state): State) -> Response { .into_response() } -pub async fn vecid_to_vecindexes_handler(State(app_state): State) -> Response { - axum::Json(app_state.query.vec_trees.serialize_id_to_index_to_vec()).into_response() +pub async fn variants_handler(State(app_state): State) -> Response { + Json( + app_state + .query + .vec_trees + .index_to_id_to_vec + .iter() + .flat_map(|(index, id_to_vec)| { + let index_ser = index.serialize_long(); + id_to_vec + .keys() + .map(|id| format!("{}-to-{}", index_ser, id)) + .collect::>() + }) + .collect::>(), + ) + .into_response() } -pub async fn vecindex_to_vecids_handler(State(app_state): State) -> Response { - axum::Json(app_state.query.vec_trees.serialize_index_to_id_to_vec()).into_response() +pub async fn id_to_indexes_handler(State(app_state): State) -> Response { + Json(app_state.query.vec_trees.serialize_id_to_index_to_vec()).into_response() +} + +pub async fn index_to_ids_handler(State(app_state): State) -> Response { + Json(app_state.query.vec_trees.serialize_index_to_id_to_vec()).into_response() +} + +const TO_SEPARATOR: &str = "-to-"; + +pub async fn variant_handler( + headers: HeaderMap, + Path(variant): Path, + Query(params_opt): Query, + state: State, +) -> Response { + let mut split = variant.split(TO_SEPARATOR); + let params = Params::from(( + ( + split.next().unwrap().to_string(), + split.collect::>().join(TO_SEPARATOR), + ), + params_opt, + )); + query::handler(headers, Query(params), state).await } diff --git a/crates/brk_server/src/api/query/mod.rs b/crates/brk_server/src/api/query/mod.rs index 9b301ea99..380806441 100644 --- a/crates/brk_server/src/api/query/mod.rs +++ b/crates/brk_server/src/api/query/mod.rs @@ -37,30 +37,14 @@ pub async fn handler( fn req_to_response_res( headers: HeaderMap, AxumQuery(Params { - format, - from, index, - mut to, - count, values, + rest, }): AxumQuery, AppState { query, .. }: AppState, ) -> color_eyre::Result { let index = Index::try_from(index.as_str())?; - if to.is_none() { - if let Some(c) = count { - let c = c as i64; - if let Some(f) = from { - if f.is_positive() || f.abs() > c { - to.replace(f + c); - } - } else { - to.replace(c); - } - } - } - let vecs = query.search( index, &values.iter().map(|v| v.as_str()).collect::>(), @@ -70,6 +54,10 @@ fn req_to_response_res( return Ok(Json(vec![] as Vec).into_response()); } + let from = rest.from(); + let to = rest.to(); + let format = rest.format(); + let weight = vecs .iter() .map(|(_, v)| { diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index 02f023f8f..9b682087a 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -48,8 +48,10 @@ where fs::create_dir_all(path)?; let version_path = Self::path_version_(path); - version.validate(version_path.as_ref())?; - version.write(version_path.as_ref())?; + + if !version.validate(version_path.as_ref())? { + version.write(version_path.as_ref())?; + } let file = Self::open_file_(Self::path_vec_(path).as_path())?; let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?));