server: add support for /api/X-to-Y + fix query cli + add meta api endpoints

This commit is contained in:
nym21
2025-06-08 20:30:53 +02:00
parent e0bf1d736f
commit be492d5084
11 changed files with 311 additions and 132 deletions
+9 -5
View File
@@ -1,4 +1,4 @@
use std::fs;
use std::{fs, thread};
use brk_core::{dot_brk_log_path, dot_brk_path};
use brk_query::Params as QueryArgs;
@@ -35,8 +35,12 @@ pub fn main() -> color_eyre::Result<()> {
let cli = Cli::parse();
match cli.command {
Commands::Run(args) => run(args),
Commands::Query(args) => query(args),
}
thread::Builder::new()
.stack_size(128 * 1024 * 1024)
.spawn(|| match cli.command {
Commands::Run(args) => run(args),
Commands::Query(args) => query(args),
})?
.join()
.unwrap()
}
+5 -3
View File
@@ -19,12 +19,14 @@ pub fn query(params: QueryParams) -> color_eyre::Result<()> {
let query = Query::build(&indexer, &computer);
let index = Index::try_from(params.index.as_str())?;
let ids = params.values.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let from = params.from();
let to = params.to();
let format = params.format();
let res = query.search_and_format(index, &ids, params.from, params.to, params.format)?;
let res = query.search_and_format(index, &ids, from, to, format)?;
if params.format.is_some() {
if format.is_some() {
println!("{}", res);
} else {
println!(
+39 -47
View File
@@ -1,7 +1,7 @@
use std::{
fs,
path::{Path, PathBuf},
thread::{self, sleep},
thread::sleep,
time::Duration,
};
@@ -49,69 +49,61 @@ pub fn run(config: RunConfig) -> color_eyre::Result<()> {
Ok(())
};
let f = move || -> color_eyre::Result<()> {
let mut computer = Computer::new(&config.outputsdir(), config.fetcher(), format);
computer.import_stores(&indexer)?;
computer.import_vecs(&indexer, config.computation())?;
let mut computer = Computer::new(&config.outputsdir(), config.fetcher(), format);
computer.import_stores(&indexer)?;
computer.import_vecs(&indexer, config.computation())?;
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let server = if config.serve() {
let served_indexer = indexer.clone();
let served_computer = computer.clone();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let server = if config.serve() {
let served_indexer = indexer.clone();
let served_computer = computer.clone();
let server = Server::new(served_indexer, served_computer, config.website())?;
let server = Server::new(served_indexer, served_computer, config.website())?;
let opt = Some(tokio::spawn(async move {
server.serve().await.unwrap();
}));
let opt = Some(tokio::spawn(async move {
server.serve().await.unwrap();
}));
sleep(Duration::from_secs(1));
sleep(Duration::from_secs(1));
opt
} else {
None
};
opt
} else {
None
};
if config.process() {
loop {
wait_for_synced_node()?;
if config.process() {
loop {
wait_for_synced_node()?;
let block_count = rpc.get_block_count()?;
let block_count = rpc.get_block_count()?;
info!("{} blocks found.", block_count + 1);
info!("{} blocks found.", block_count + 1);
let starting_indexes = indexer.index(&parser, rpc, &exit)?;
let starting_indexes = indexer.index(&parser, rpc, &exit)?;
computer.compute(&mut indexer, starting_indexes, &exit)?;
computer.compute(&mut indexer, starting_indexes, &exit)?;
if let Some(delay) = config.delay() {
sleep(Duration::from_secs(delay))
}
if let Some(delay) = config.delay() {
sleep(Duration::from_secs(delay))
}
info!("Waiting for new blocks...");
info!("Waiting for new blocks...");
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
}
}
if let Some(handle) = server {
handle.await.unwrap();
}
if let Some(handle) = server {
handle.await.unwrap();
}
Ok(())
})
};
thread::Builder::new()
.stack_size(128 * 1024 * 1024)
.spawn(f)?
.join()
.unwrap()
Ok(())
})
}
#[derive(Parser, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+25 -16
View File
@@ -1,4 +1,4 @@
use std::{fs, path::Path};
use std::{fs, path::Path, thread};
use brk_core::Version;
use brk_exit::Exit;
@@ -44,22 +44,31 @@ impl Vecs {
) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
let indexes = indexes::Vecs::forced_import(
path,
version + VERSION + Version::ZERO,
indexer,
computation,
format,
)?;
let (indexes, fetched) = thread::scope(|s| {
let indexes_handle = s.spawn(|| {
indexes::Vecs::forced_import(
path,
version + VERSION + Version::ZERO,
indexer,
computation,
format,
)
.unwrap()
});
let fetched = fetch.then(|| {
fetched::Vecs::forced_import(
path,
version + VERSION + Version::ZERO,
computation,
format,
)
.unwrap()
let fetch_handle = s.spawn(|| {
fetch.then(|| {
fetched::Vecs::forced_import(
path,
version + VERSION + Version::ZERO,
computation,
format,
)
.unwrap()
})
});
(indexes_handle.join().unwrap(), fetch_handle.join().unwrap())
});
Ok(Self {
+7 -3
View File
@@ -33,7 +33,9 @@ impl Version {
Self(self.0.swap_bytes())
}
pub fn validate(&self, path: &Path) -> Result<()> {
/// Ok(true) if existed and is same
/// Ok(false) if didn't exist
pub fn validate(&self, path: &Path) -> Result<bool> {
if let Ok(prev_version) = Version::try_from(path) {
if prev_version != *self {
if prev_version.swap_bytes() == *self {
@@ -44,9 +46,11 @@ impl Version {
expected: *self,
});
}
}
Ok(())
Ok(true)
} else {
Ok(false)
}
}
}
+1 -1
View File
@@ -19,7 +19,7 @@ mod vec_trees;
pub use format::Format;
pub use index::Index;
pub use output::{Output, Value};
pub use params::Params;
pub use params::{Params, ParamsOpt};
pub use table::Tabled;
use vec_trees::VecTrees;
+103 -8
View File
@@ -1,6 +1,8 @@
use std::{fmt::Display, ops::Deref, str::FromStr};
use clap::builder::PossibleValuesParser;
use clap_derive::Parser;
use serde::Deserialize;
use serde::{Deserialize, Deserializer};
use serde_with::{OneOrMany, formats::PreferOne, serde_as};
use crate::{Format, Index};
@@ -17,18 +19,111 @@ pub struct Params {
#[serde_as(as = "OneOrMany<_, PreferOne>")]
/// Names of the values requested
pub values: Vec<String>,
#[clap(flatten)]
#[serde(flatten)]
pub rest: ParamsOpt,
}
// The macro creates custom deserialization code.
// You need to specify a function name and the field name of the flattened field.
serde_with::flattened_maybe!(deserialize_rest, "rest");
impl Deref for Params {
type Target = ParamsOpt;
fn deref(&self) -> &Self::Target {
&self.rest
}
}
impl From<((String, String), ParamsOpt)> for Params {
fn from(((index, id), rest): ((String, String), ParamsOpt)) -> Self {
Self {
index,
values: vec![id],
rest,
}
}
}
#[serde_as]
#[derive(Debug, Deserialize, Parser)]
pub struct ParamsOpt {
#[clap(short, long, allow_hyphen_values = true)]
#[serde(alias = "f")]
#[serde(default, alias = "f", deserialize_with = "de_unquote_i64")]
/// Inclusive starting index, if negative will be from the end
pub from: Option<i64>,
from: Option<i64>,
#[clap(short, long, allow_hyphen_values = true)]
#[serde(default, alias = "t")]
#[serde(default, alias = "t", deserialize_with = "de_unquote_i64")]
/// Exclusive ending index, if negative will be from the end, overrides 'count'
pub to: Option<i64>,
#[serde(default, alias = "c")]
to: Option<i64>,
#[clap(short, long, allow_hyphen_values = true)]
#[serde(default, alias = "c", deserialize_with = "de_unquote_usize")]
/// Number of values
pub count: Option<usize>,
count: Option<usize>,
#[clap(short = 'F', long)]
/// Format of the output
pub format: Option<Format>,
format: Option<Format>,
}
impl ParamsOpt {
pub fn from(&self) -> Option<i64> {
self.from
}
pub fn to(&self) -> Option<i64> {
if self.to.is_none() {
if let Some(c) = self.count {
let c = c as i64;
if let Some(f) = self.from {
if f.is_positive() || f.abs() > c {
return Some(f + c);
}
} else {
return Some(c);
}
}
}
self.to
}
pub fn format(&self) -> Option<Format> {
self.format
}
}
fn de_unquote_i64<'de, D>(deserializer: D) -> Result<Option<i64>, D::Error>
where
D: Deserializer<'de>,
{
de_unquote(deserializer)
}
fn de_unquote_usize<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
where
D: Deserializer<'de>,
{
de_unquote(deserializer)
}
fn de_unquote<'de, D, F>(deserializer: D) -> Result<Option<F>, D::Error>
where
D: Deserializer<'de>,
F: FromStr + Display,
<F as std::str::FromStr>::Err: Display,
{
let opt: Option<String> = Option::deserialize(deserializer)?;
let s = match opt {
None => return Ok(None),
Some(mut s) => {
// strip any leading/trailing quotes
if s.starts_with('"') && s.ends_with('"') && s.len() >= 2 {
s = s[1..s.len() - 1].to_string();
}
s
}
};
s.parse::<F>()
.map(Some)
.map_err(|e| serde::de::Error::custom(format!("cannot parse `{}` as type: {}", s, e)))
}
+24 -7
View File
@@ -41,19 +41,35 @@ The API uses `brk_query` and so inherites all of its features including formats.
### API
#### `GET /api/vecs/indexes`
#### [`GET /api/vecs/index-count`](https://bitcoinresearchkit.org/api/vecs/index-count)
Count of all possible indexes
#### [`GET /api/vecs/id-count`](https://bitcoinresearchkit.org/api/vecs/id-count)
Count of all possible ids
#### [`GET /api/vecs/variant-count`](https://bitcoinresearchkit.org/api/vecs/variant-count)
Count of all possible variants
#### [`GET /api/vecs/indexes`](https://bitcoinresearchkit.org/api/vecs/indexes)
A list of all possible vec indexes and their accepted variants
#### `GET /api/vecs/ids`
#### [`GET /api/vecs/ids`](https://bitcoinresearchkit.org/api/vecs/ids)
A list of all possible vec ids
#### `GET /api/vecs/id-to-indexes`
#### [`GET /api/vecs/variants`](https://bitcoinresearchkit.org/api/vecs/variants)
A list of all possible variants
#### [`GET /api/vecs/id-to-indexes`](https://bitcoinresearchkit.org/api/vecs/id-to-indexes)
A list of all possible vec ids and their supported vec indexes
#### `GET /api/vecs/index-to-ids`
#### [`GET /api/vecs/index-to-ids`](https://bitcoinresearchkit.org/api/vecs/index-to-ids)
A list of all possible vec indexes and their supported vec ids
@@ -67,8 +83,9 @@ This endpoint retrieves data based on the specified vector index and values.
| --- | --- | --- | --- |
| `index` | `VecIndex` | Yes | The vector index to query. |
| `values` | `VecId[]` | Yes | A comma or space-separated list of vector IDs to retrieve. |
| `from` | `unsigned int` | No | The starting index for pagination (default is 0). |
| `to` | `unsigned int` | No | The ending index for pagination (default is the total number of results). |
| `from` | `signed int` | No | Inclusive starting index for pagination (default is 0). |
| `to` | `signed int` | No | Exclusive ending index for pagination (default is the total number of results). Overrides `count` |
| `count` | `unsigned int` | No | The number of values requested |
| `format` | `string` | No | The format of the response. Options include `json`, `csv`, `tsv`, or `md` (default is `json`). |
**Examples:**
@@ -80,7 +97,7 @@ GET /api/query?index=week&values=ohlc,block-interval-average&from=0&to=20&format
### Meta
#### `GET /version`
#### [`GET /version`](https://bitcoinresearchkit.org/version)
The version of the server and thus BRK.
+89 -23
View File
@@ -1,11 +1,13 @@
use std::collections::BTreeMap;
use axum::{
Router,
extract::State,
Json, Router,
extract::{Path, Query, State},
http::HeaderMap,
response::{IntoResponse, Redirect, Response},
routing::get,
};
use brk_query::{Params, ParamsOpt};
use super::AppState;
@@ -20,24 +22,29 @@ pub trait ApiRoutes {
impl ApiRoutes for Router<AppState> {
fn add_api_routes(self) -> Self {
self.route(
"/api",
get(|| async {
Redirect::permanent(
"https://github.com/bitcoinresearchkit/brk/tree/main/crates/brk_server#api",
)
}),
)
.route("/api/query", get(query::handler))
.route("/api/vecs/ids", get(vecids_handler))
.route("/api/vecs/indexes", get(vecindexes_handler))
.route("/api/vecs/id-to-indexes", get(vecid_to_vecindexes_handler))
.route("/api/vecs/index-to-ids", get(vecindex_to_vecids_handler))
self.route("/api/query", get(query::handler))
.route("/api/vecs/id-count", get(id_count_handler))
.route("/api/vecs/index-count", get(index_count_handler))
.route("/api/vecs/variant-count", get(variant_count_handler))
.route("/api/vecs/ids", get(ids_handler))
.route("/api/vecs/indexes", get(indexes_handler))
.route("/api/vecs/variants", get(variants_handler))
.route("/api/vecs/id-to-indexes", get(id_to_indexes_handler))
.route("/api/vecs/index-to-ids", get(index_to_ids_handler))
.route("/api/{variant}", get(variant_handler))
.route(
"/api",
get(|| async {
Redirect::temporary(
"https://github.com/bitcoinresearchkit/brk/tree/main/crates/brk_server#api",
)
}),
)
}
}
pub async fn vecids_handler(State(app_state): State<AppState>) -> Response {
axum::Json(
pub async fn ids_handler(State(app_state): State<AppState>) -> Response {
Json(
app_state
.query
.vec_trees
@@ -48,8 +55,29 @@ pub async fn vecids_handler(State(app_state): State<AppState>) -> Response {
.into_response()
}
pub async fn vecindexes_handler(State(app_state): State<AppState>) -> Response {
axum::Json(
pub async fn variant_count_handler(State(app_state): State<AppState>) -> Response {
Json(
app_state
.query
.vec_trees
.index_to_id_to_vec
.values()
.map(|tree| tree.len())
.sum::<usize>(),
)
.into_response()
}
pub async fn id_count_handler(State(app_state): State<AppState>) -> Response {
Json(app_state.query.vec_trees.id_to_index_to_vec.keys().count()).into_response()
}
pub async fn index_count_handler(State(app_state): State<AppState>) -> Response {
Json(app_state.query.vec_trees.index_to_id_to_vec.keys().count()).into_response()
}
pub async fn indexes_handler(State(app_state): State<AppState>) -> Response {
Json(
app_state
.query
.vec_trees
@@ -61,10 +89,48 @@ pub async fn vecindexes_handler(State(app_state): State<AppState>) -> Response {
.into_response()
}
pub async fn vecid_to_vecindexes_handler(State(app_state): State<AppState>) -> Response {
axum::Json(app_state.query.vec_trees.serialize_id_to_index_to_vec()).into_response()
pub async fn variants_handler(State(app_state): State<AppState>) -> Response {
Json(
app_state
.query
.vec_trees
.index_to_id_to_vec
.iter()
.flat_map(|(index, id_to_vec)| {
let index_ser = index.serialize_long();
id_to_vec
.keys()
.map(|id| format!("{}-to-{}", index_ser, id))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>(),
)
.into_response()
}
pub async fn vecindex_to_vecids_handler(State(app_state): State<AppState>) -> Response {
axum::Json(app_state.query.vec_trees.serialize_index_to_id_to_vec()).into_response()
pub async fn id_to_indexes_handler(State(app_state): State<AppState>) -> Response {
Json(app_state.query.vec_trees.serialize_id_to_index_to_vec()).into_response()
}
pub async fn index_to_ids_handler(State(app_state): State<AppState>) -> Response {
Json(app_state.query.vec_trees.serialize_index_to_id_to_vec()).into_response()
}
const TO_SEPARATOR: &str = "-to-";
pub async fn variant_handler(
headers: HeaderMap,
Path(variant): Path<String>,
Query(params_opt): Query<ParamsOpt>,
state: State<AppState>,
) -> Response {
let mut split = variant.split(TO_SEPARATOR);
let params = Params::from((
(
split.next().unwrap().to_string(),
split.collect::<Vec<_>>().join(TO_SEPARATOR),
),
params_opt,
));
query::handler(headers, Query(params), state).await
}
+5 -17
View File
@@ -37,30 +37,14 @@ pub async fn handler(
fn req_to_response_res(
headers: HeaderMap,
AxumQuery(Params {
format,
from,
index,
mut to,
count,
values,
rest,
}): AxumQuery<Params>,
AppState { query, .. }: AppState,
) -> color_eyre::Result<Response> {
let index = Index::try_from(index.as_str())?;
if to.is_none() {
if let Some(c) = count {
let c = c as i64;
if let Some(f) = from {
if f.is_positive() || f.abs() > c {
to.replace(f + c);
}
} else {
to.replace(c);
}
}
}
let vecs = query.search(
index,
&values.iter().map(|v| v.as_str()).collect::<Vec<_>>(),
@@ -70,6 +54,10 @@ fn req_to_response_res(
return Ok(Json(vec![] as Vec<usize>).into_response());
}
let from = rest.from();
let to = rest.to();
let format = rest.format();
let weight = vecs
.iter()
.map(|(_, v)| {
+4 -2
View File
@@ -48,8 +48,10 @@ where
fs::create_dir_all(path)?;
let version_path = Self::path_version_(path);
version.validate(version_path.as_ref())?;
version.write(version_path.as_ref())?;
if !version.validate(version_path.as_ref())? {
version.write(version_path.as_ref())?;
}
let file = Self::open_file_(Self::path_vec_(path).as_path())?;
let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?));