global: snapshot

nym21
2025-08-03 23:38:58 +02:00
parent f7aa9424db
commit a2f5704581
50 changed files with 818 additions and 704 deletions

Cargo.lock (generated)
View File

@@ -513,9 +513,11 @@ name = "brk_cli"
version = "0.0.83"
dependencies = [
"bitcoincore-rpc",
"brk_bundler",
"brk_computer",
"brk_fetcher",
"brk_indexer",
"brk_interface",
"brk_logger",
"brk_parser",
"brk_server",
@@ -524,9 +526,11 @@ dependencies = [
"clap_derive",
"color-eyre",
"log",
"minreq",
"serde",
"tokio",
"toml",
"zip",
]
[[package]]
@@ -559,6 +563,7 @@ dependencies = [
"fjall",
"jiff",
"minreq",
"serde_json",
"zerocopy",
]
@@ -1003,7 +1008,6 @@ version = "0.0.83"
dependencies = [
"axum",
"bitcoincore-rpc",
"brk_bundler",
"brk_computer",
"brk_error",
"brk_fetcher",
@@ -1013,19 +1017,13 @@ dependencies = [
"brk_mcp",
"brk_parser",
"brk_vecs",
"clap",
"clap_derive",
"jiff",
"log",
"minreq",
"owo-colors",
"quick_cache",
"serde",
"serde_json",
"tokio",
"tower-http",
"tracing",
"zip",
]
[[package]]
@@ -1061,12 +1059,10 @@ dependencies = [
"brk_vecs",
"byteview",
"derive_deref",
"fjall",
"jiff",
"rapidhash",
"serde",
"serde_bytes",
"serde_json",
"zerocopy",
"zerocopy-derive",
]
@@ -1084,6 +1080,7 @@ dependencies = [
"pco",
"rayon",
"serde",
"serde_derive",
"serde_json",
"zerocopy",
"zerocopy-derive",
@@ -2507,9 +2504,9 @@ checksum = "610a5acd306ec67f907abe5567859a3c693fb9886eb1f012ab8f2a47bef3db51"
[[package]]
name = "notify"
version = "8.1.0"
version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3163f59cd3fa0e9ef8c32f242966a7b9994fd7378366099593e0e73077cd8c97"
checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3"
dependencies = [
"bitflags 2.9.1",
"fsevent-sys",
@@ -2638,11 +2635,12 @@ dependencies = [
[[package]]
name = "oxc-browserslist"
version = "2.0.12"
version = "2.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05d19022f54d3e0b8b1679c80f02d140e95e4308385eb247ba3168c299c81bb"
checksum = "b94a2f0da0105f3a813eeb6b182a791b2dc491f494432e1953fd8e72c1f3887d"
dependencies = [
"bincode",
"flate2",
"nom",
"rustc-hash",
"serde",
@@ -3356,9 +3354,9 @@ dependencies = [
[[package]]
name = "quick_cache"
version = "0.6.15"
version = "0.6.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8565e62e02af316570d4b492f17af1481d6c07cea60f4e7edd71700da5052ba9"
checksum = "9ad6644cb07b7f3488b9f3d2fde3b4c0a7fa367cafefb39dff93a659f76eb786"
dependencies = [
"ahash",
"equivalent",
@@ -4220,9 +4218,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.15"
version = "0.7.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5"
dependencies = [
"bytes",
"futures-core",

View File

@@ -41,14 +41,11 @@ brk_store = { version = "0.0.83", path = "crates/brk_store" }
brk_vecs = { version = "0.0.83", path = "crates/brk_vecs" }
brk_vecs_macros = { version = "0.0.83", path = "crates/brk_vecs_macros" }
byteview = "=0.6.1"
clap = { version = "4.5.42", features = ["string"] }
clap_derive = "4.5.41"
derive_deref = "1.1.1"
fjall = "2.11.2"
jiff = "0.2.15"
log = "0.4.27"
minreq = { version = "2.14.0", features = ["https", "serde_json"] }
owo-colors = "4.2.2"
parking_lot = "0.12.4"
rayon = "1.10.0"
serde = "1.0.219"

View File

@@ -9,7 +9,7 @@ repository.workspace = true
[dependencies]
log = { workspace = true }
notify = "8.1.0"
notify = "8.2.0"
brk_rolldown = "0.1.1"
# brk_rolldown = { path = "../../../rolldown/crates/rolldown"}
sugar_path = "1.2.0"

View File

@@ -1,4 +1,8 @@
use std::{fs, io, path::Path, sync::Arc};
use std::{
fs, io,
path::{Path, PathBuf},
sync::Arc,
};
use brk_rolldown::{Bundler, BundlerOptions, RawMinifyOptions, SourceMapType};
use log::error;
@@ -8,7 +12,7 @@ use tokio::sync::Mutex;
const VERSION: &str = env!("CARGO_PKG_VERSION");
pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> io::Result<()> {
pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> io::Result<PathBuf> {
let source_path = websites_path.join(source_folder);
let dist_path = websites_path.join("dist");
@@ -72,7 +76,7 @@ pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> i
write_sw();
if !watch {
return Ok(());
return Ok(dist_path);
}
tokio::spawn(async move {
@@ -127,7 +131,7 @@ pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> i
watcher.start().await;
});
Ok(())
Ok(dist_path)
}
fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> {
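A note on the signature change above: `bundle` now reports where it wrote the output. A minimal sketch of the intended call site, mirroring the CLI code later in this commit (`interface`, `websites_path`, `website`, and `watch` are assumed to be in scope):

```rust
// Hypothetical call site: the caller no longer has to re-derive
// `websites_path/dist` itself -- `bundle` hands the output directory back.
let dist_path = bundle(&websites_path, website.to_folder_name(), watch).await?;
let server = Server::new(interface, Some(dist_path));
```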

View File

@@ -9,20 +9,24 @@ repository.workspace = true
[dependencies]
bitcoincore-rpc = { workspace = true }
brk_bundler = { workspace = true }
brk_computer = { workspace = true }
brk_fetcher = { workspace = true }
brk_indexer = { workspace = true }
brk_interface = { workspace = true }
brk_logger = { workspace = true }
brk_parser = { workspace = true }
brk_server = { workspace = true }
brk_vecs = { workspace = true }
clap = { workspace = true }
clap_derive = { workspace = true }
clap = { version = "4.5.42", features = ["string"] }
clap_derive = "4.5.41"
color-eyre = "0.6.5"
log = { workspace = true }
minreq = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
toml = "0.9.4"
zip = { version = "4.3.0", default-features = false, features = ["deflate"] }
[[bin]]
name = "brk"

View File

@@ -64,10 +64,10 @@ You can find a pre-built binary for your operating system in the [releases page]
```bash
# Install
cargo install brk # or `cargo install brk_cli`, the result is the same
cargo install brk --locked # or `cargo install brk_cli`, the result is the same
# Update
cargo install brk # or `cargo install-update -a` if you have `cargo-update` installed
cargo install brk --locked # or `cargo install-update -a` if you have `cargo-update` installed
```
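A note on the flag: `--locked` makes cargo install against the `Cargo.lock` shipped with the published crate instead of re-resolving dependencies to their newest semver-compatible versions, so the installed binary uses the exact dependency set the release was built with.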
### Source

View File

@@ -1,8 +1,9 @@
use std::{fs, io, path::Path};
use brk_interface::{Index, Interface};
use brk_server::VERSION;
use crate::{VERSION, Website};
use crate::website::Website;
const SCRIPTS: &str = "scripts";

View File

@@ -5,13 +5,12 @@ use std::{
use bitcoincore_rpc::{self, Auth, Client};
use brk_fetcher::Fetcher;
use brk_server::Website;
use clap::Parser;
use clap_derive::Parser;
use color_eyre::eyre::eyre;
use serde::{Deserialize, Deserializer, Serialize};
use crate::{default_bitcoin_path, default_brk_path, dot_brk_path};
use crate::{default_bitcoin_path, default_brk_path, dot_brk_path, website::Website};
const DOWNLOADS: &str = "downloads";

View File

@@ -1,13 +1,28 @@
#![doc = include_str!("../README.md")]
use std::{fs, thread};
use std::{
fs,
io::Cursor,
path::Path,
thread::{self, sleep},
time::Duration,
};
use bitcoincore_rpc::{self, RpcApi};
use brk_bundler::bundle;
use brk_computer::Computer;
use brk_indexer::Indexer;
use brk_interface::Interface;
use brk_server::{Server, VERSION};
use brk_vecs::Exit;
use log::info;
mod bridge;
mod config;
mod paths;
mod run;
mod website;
pub use paths::*;
use run::*;
use crate::{bridge::Bridge, config::Config, paths::*};
pub fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
@@ -22,3 +37,118 @@ pub fn main() -> color_eyre::Result<()> {
.join()
.unwrap()
}
pub fn run() -> color_eyre::Result<()> {
let config = Config::import()?;
let rpc = config.rpc()?;
let exit = Exit::new();
exit.set_ctrlc_handler();
let parser = brk_parser::Parser::new(config.blocksdir(), config.brkdir(), rpc);
let mut indexer = Indexer::forced_import(&config.brkdir())?;
let wait_for_synced_node = |rpc_client: &bitcoincore_rpc::Client| -> color_eyre::Result<()> {
let is_synced = || -> color_eyre::Result<bool> {
let info = rpc_client.get_blockchain_info()?;
Ok(info.headers == info.blocks)
};
if !is_synced()? {
info!("Waiting for node to be synced...");
while !is_synced()? {
sleep(Duration::from_secs(1))
}
}
Ok(())
};
let mut computer = Computer::forced_import(&config.brkdir(), &indexer, config.fetcher())?;
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let interface = Interface::build(&indexer, &computer);
let website = config.website();
let downloads_path = config.downloads_dir();
let bundle_path = if website.is_some() {
let websites_dev_path = Path::new("../../websites");
let websites_path = if fs::exists(websites_dev_path)? {
websites_dev_path.to_path_buf()
} else {
let downloaded_websites_path =
downloads_path.join(format!("brk-{VERSION}")).join("websites");
if !fs::exists(&downloaded_websites_path)? {
info!("Downloading websites from Github...");
let url = format!(
"https://github.com/bitcoinresearchkit/brk/archive/refs/tags/v{VERSION}.zip",
);
let response = minreq::get(url).send()?;
let bytes = response.as_bytes();
let cursor = Cursor::new(bytes);
let mut zip = zip::ZipArchive::new(cursor).unwrap();
zip.extract(downloads_path).unwrap();
}
downloaded_websites_path
};
interface.generate_bridge_file(website, websites_path.as_path())?;
let watch = config.watch();
Some(bundle(&websites_path, website.to_folder_name(), watch).await?)
} else {
None
};
let server = Server::new(
interface,
bundle_path,
);
let mcp = config.mcp();
tokio::spawn(async move {
server.serve(mcp).await.unwrap();
});
sleep(Duration::from_secs(1));
loop {
wait_for_synced_node(rpc)?;
let block_count = rpc.get_block_count()?;
info!("{} blocks found.", block_count + 1);
let starting_indexes =
indexer.index(&parser, rpc, &exit, config.check_collisions())?;
computer.compute(&indexer, starting_indexes, &exit)?;
if let Some(delay) = config.delay() {
sleep(Duration::from_secs(delay))
}
info!("Waiting for new blocks...");
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
}
})
}
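Net effect of this file: the former `run.rs` (deleted below) is folded into `lib.rs`, and locating, downloading, and bundling the websites folder moves out of `brk_server` and into the CLI, which then passes the bundled `dist` path to `Server::new`.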

View File

@@ -1,88 +0,0 @@
use std::{thread::sleep, time::Duration};
use bitcoincore_rpc::{self, RpcApi};
use brk_computer::Computer;
use brk_indexer::Indexer;
use brk_server::Server;
use brk_vecs::Exit;
use log::info;
use crate::config::Config;
pub fn run() -> color_eyre::Result<()> {
let config = Config::import()?;
let rpc = config.rpc()?;
let exit = Exit::new();
exit.set_ctrlc_handler();
let parser = brk_parser::Parser::new(config.blocksdir(), config.brkdir(), rpc);
let mut indexer = Indexer::forced_import(&config.brkdir())?;
let wait_for_synced_node = |rpc_client: &bitcoincore_rpc::Client| -> color_eyre::Result<()> {
let is_synced = || -> color_eyre::Result<bool> {
let info = rpc_client.get_blockchain_info()?;
Ok(info.headers == info.blocks)
};
if !is_synced()? {
info!("Waiting for node to be synced...");
while !is_synced()? {
sleep(Duration::from_secs(1))
}
}
Ok(())
};
let mut computer = Computer::forced_import(&config.brkdir(), &indexer, config.fetcher())?;
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let served_indexer = indexer.clone();
let served_computer = computer.clone();
let server = Server::new(
served_indexer,
served_computer,
config.website(),
&config.downloads_dir(),
)?;
let watch = config.watch();
let mcp = config.mcp();
tokio::spawn(async move {
server.serve(watch, mcp).await.unwrap();
});
sleep(Duration::from_secs(1));
loop {
wait_for_synced_node(rpc)?;
let block_count = rpc.get_block_count()?;
info!("{} blocks found.", block_count + 1);
let starting_indexes =
indexer.index(&parser, rpc, &exit, config.check_collisions())?;
computer.compute(&indexer, starting_indexes, &exit)?;
if let Some(delay) = config.delay() {
sleep(Duration::from_secs(delay))
}
info!("Waiting for new blocks...");
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
}
})
}

View File

@@ -17,7 +17,7 @@ impl Website {
!self.is_none()
}
pub fn to_folder_name(&self) -> &str {
pub fn to_folder_name(self) -> &'static str {
match self {
Self::Custom => "custom",
Self::Default => "default",

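Since `Website` is presumably a small `Copy` enum, taking `self` by value here lets the signature promise `&'static str` outright instead of tying the returned string's lifetime to a borrow of the enum.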
View File

@@ -239,4 +239,8 @@ impl Computer {
.flatten()
.collect::<Vec<_>>()
}
pub fn static_clone(&self) -> &'static Self {
Box::leak(Box::new(self.clone()))
}
}
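`static_clone` (an identical helper lands on `Indexer` below) promotes a clone to a `&'static` reference by leaking it. A sketch of the underlying pattern, written as a hypothetical free function for illustration:

```rust
// Box::leak hands back a &'static mut T, which coerces to &'static T.
// The allocation is intentionally never freed -- acceptable for
// process-lifetime singletons like the indexer and computer.
fn static_clone<T: Clone>(value: &T) -> &'static T {
    Box::leak(Box::new(value.clone()))
}
```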

View File

@@ -13,4 +13,5 @@ bitcoincore-rpc = { workspace = true }
fjall = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true }

View File

@@ -13,12 +13,14 @@ pub enum Error {
Fjall(fjall::Error),
Minreq(minreq::Error),
SystemTimeError(time::SystemTimeError),
SerdeJson(serde_json::Error),
ZeroCopyError,
Vecs(brk_vecs::Error),
WrongLength,
WrongAddressType,
UnindexableDate,
QuickCacheError,
Str(&'static str),
String(String),
}
@@ -29,6 +31,12 @@ impl From<time::SystemTimeError> for Error {
}
}
impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Self {
Self::SerdeJson(error)
}
}
impl From<io::Error> for Error {
fn from(value: io::Error) -> Self {
Self::IO(value)
@@ -82,6 +90,7 @@ impl fmt::Display for Error {
match self {
Error::IO(error) => Debug::fmt(&error, f),
Error::Minreq(error) => Debug::fmt(&error, f),
Error::SerdeJson(error) => Debug::fmt(&error, f),
Error::Vecs(error) => Debug::fmt(&error, f),
Error::BitcoinRPC(error) => Debug::fmt(&error, f),
Error::SystemTimeError(error) => Debug::fmt(&error, f),
@@ -90,6 +99,7 @@ impl fmt::Display for Error {
Error::ZeroCopyError => write!(f, "ZeroCopy error"),
Error::WrongLength => write!(f, "Wrong length"),
Error::QuickCacheError => write!(f, "Quick cache error"),
Error::WrongAddressType => write!(f, "Wrong address type"),
Error::UnindexableDate => write!(
f,

View File

@@ -221,7 +221,7 @@ where
I: StoredRaw + StoredIndex + From<usize>,
T: StoredRaw,
{
let h = Height::from(u64::from(height_to_index.stamp()));
let h = Height::from(height_to_index.stamp());
if h.is_zero() {
None
} else if h + 1_u32 == starting_height {
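Dropping the `u64` round-trip here (and in the `Stamp::from(height)` call sites below) suggests direct conversions between `Stamp` and `Height` were added elsewhere. An assumed shape, not shown in this diff, building on the `u64` conversions the old code used:

```rust
// Assumed impls that make `Height::from(stamp)` and
// `Stamp::from(height)` compile at the updated call sites.
impl From<Stamp> for Height {
    fn from(stamp: Stamp) -> Self {
        Self::from(u64::from(stamp))
    }
}

impl From<Height> for Stamp {
    fn from(height: Height) -> Self {
        Self::from(u64::from(height))
    }
}
```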

View File

@@ -800,6 +800,10 @@ impl Indexer {
Ok(starting_indexes)
}
pub fn static_clone(&self) -> &'static Self {
Box::leak(Box::new(self.clone()))
}
}
#[derive(Debug)]

View File

@@ -443,7 +443,7 @@ impl Vecs {
pub fn flush(&mut self, height: Height) -> Result<()> {
self.mut_vecs()
.into_par_iter()
.try_for_each(|vec| vec.stamped_flush(Stamp::from(u64::from(height))))?;
.try_for_each(|vec| vec.stamped_flush(Stamp::from(height)))?;
Ok(())
}
@@ -451,7 +451,7 @@ impl Vecs {
self.mut_vecs()
.into_iter()
.map(|vec| {
let h = Height::from(u64::from(vec.stamp()));
let h = Height::from(vec.stamp());
if h > Height::ZERO { h.incremented() } else { h }
})
.min()

View File

@@ -32,23 +32,28 @@ use vecs::Vecs;
use crate::vecs::{IdToVec, IndexToVec};
#[allow(dead_code)]
pub struct Interface<'a> {
vecs: Vecs<'a>,
_indexer: &'a Indexer,
_computer: &'a Computer,
indexer: &'a Indexer,
computer: &'a Computer,
}
impl<'a> Interface<'a> {
pub fn build(indexer: &'a Indexer, computer: &'a Computer) -> Self {
pub fn build(indexer: &Indexer, computer: &Computer) -> Self {
let indexer = indexer.static_clone();
let computer = computer.static_clone();
let vecs = Vecs::build(indexer, computer);
Self {
vecs: Vecs::build(indexer, computer),
_indexer: indexer,
_computer: computer,
vecs,
indexer,
computer,
}
}
pub fn get_height(&self) -> Height {
Height::from(u64::from(self._indexer.vecs.height_to_blockhash.stamp()))
Height::from(self.indexer.vecs.height_to_blockhash.stamp())
}
pub fn search(&self, params: &Params) -> Vec<(String, &&dyn AnyCollectableVec)> {

View File

@@ -11,4 +11,4 @@ repository.workspace = true
env_logger = "0.11.8"
jiff = { workspace = true }
log = { workspace = true }
owo-colors = { workspace = true }
owo-colors = "4.2.2"

View File

@@ -12,7 +12,7 @@ use std::{
use env_logger::{Builder, Env};
use jiff::{Timestamp, tz};
use owo_colors::OwoColorize;
pub use owo_colors::OwoColorize;
#[inline]
pub fn init(path: Option<&Path>) {

View File

@@ -10,7 +10,6 @@ repository.workspace = true
[dependencies]
axum = { workspace = true }
bitcoincore-rpc = { workspace = true }
brk_bundler = { workspace = true }
brk_computer = { workspace = true }
brk_error = { workspace = true }
brk_fetcher = { workspace = true }
@@ -20,19 +19,13 @@ brk_logger = { workspace = true }
brk_mcp = { workspace = true }
brk_parser = { workspace = true }
brk_vecs = { workspace = true }
clap = { workspace = true }
clap_derive = { workspace = true }
jiff = { workspace = true }
log = { workspace = true }
minreq = { workspace = true }
owo-colors = { workspace = true }
quick_cache = "0.6.15"
serde = { workspace = true }
quick_cache = "0.6.16"
serde_json = { workspace = true }
tokio = { workspace = true }
tower-http = { version = "0.6.6", features = ["compression-full", "trace"] }
tracing = "0.1.41"
zip = { version = "4.3.0", default-features = false, features = ["deflate"] }
[package.metadata.cargo-machete]
ignored = ["clap"]

View File

@@ -6,8 +6,9 @@ use brk_computer::Computer;
use brk_error::Result;
use brk_fetcher::Fetcher;
use brk_indexer::Indexer;
use brk_interface::Interface;
use brk_parser::Parser;
use brk_server::{Server, Website};
use brk_server::Server;
use brk_vecs::Exit;
pub fn main() -> Result<()> {
@@ -39,18 +40,12 @@ pub fn main() -> Result<()> {
.enable_all()
.build()?
.block_on(async {
let served_indexer = indexer.clone();
let served_computer = computer.clone();
let interface = Interface::build(&indexer, &computer);
let server = Server::new(
served_indexer,
served_computer,
Website::Default,
Path::new(""),
)?;
let server = Server::new(interface, None);
let server = tokio::spawn(async move {
server.serve(true, true).await.unwrap();
server.serve(true).await.unwrap();
});
if process {

View File

@@ -1,29 +1,30 @@
use std::time::Duration;
use axum::{
Json,
body::Body,
extract::{Query, State},
http::{HeaderMap, StatusCode},
http::{HeaderMap, StatusCode, Uri},
response::{IntoResponse, Response},
};
use brk_error::{Error, Result};
use brk_interface::{Format, Output, Params};
use brk_vecs::Stamp;
use quick_cache::sync::GuardResult;
use crate::traits::{HeaderMapExtended, ResponseExtended};
use crate::{HeaderMapExtended, ResponseExtended};
use super::AppState;
mod bridge;
pub use bridge::*;
const MAX_WEIGHT: usize = 320_000;
pub async fn handler(
uri: Uri,
headers: HeaderMap,
query: Query<Params>,
State(app_state): State<AppState>,
) -> Response {
match req_to_response_res(headers, query, app_state) {
match req_to_response_res(uri, headers, query, app_state) {
Ok(response) => response,
Err(error) => {
let mut response =
@@ -35,9 +36,12 @@ pub async fn handler(
}
fn req_to_response_res(
uri: Uri,
headers: HeaderMap,
Query(params): Query<Params>,
AppState { interface, .. }: AppState,
AppState {
interface, cache, ..
}: AppState,
) -> Result<Response> {
let vecs = interface.search(&params);
@@ -67,7 +71,7 @@ fn req_to_response_res(
.first()
.unwrap()
.1
.etag(Stamp::from(u64::from(interface.get_height())), to);
.etag(Stamp::from(interface.get_height()), to);
if headers
.get_if_none_match()
@@ -76,17 +80,51 @@ fn req_to_response_res(
return Ok(Response::new_not_modified());
}
let output = interface.format(vecs, &params.rest)?;
let guard_res = cache.get_value_or_guard(
&format!("{}{}{etag}", uri.path(), uri.query().unwrap_or("")),
Some(Duration::from_millis(500)),
);
let mut response = match output {
Output::CSV(s) => s.into_response(),
Output::TSV(s) => s.into_response(),
Output::Json(v) => match v {
brk_interface::Value::Single(v) => Json(v).into_response(),
brk_interface::Value::List(v) => Json(v).into_response(),
brk_interface::Value::Matrix(v) => Json(v).into_response(),
},
Output::MD(s) => s.into_response(),
let mut response = if let GuardResult::Value(v) = guard_res {
Response::new(Body::from(v))
} else {
match interface.format(vecs, &params.rest)? {
Output::CSV(s) => {
if let GuardResult::Guard(g) = guard_res {
g.insert(s.clone().into())
.map_err(|_| Error::QuickCacheError)?;
}
s.into_response()
}
Output::TSV(s) => {
if let GuardResult::Guard(g) = guard_res {
g.insert(s.clone().into())
.map_err(|_| Error::QuickCacheError)?;
}
s.into_response()
}
Output::MD(s) => {
if let GuardResult::Guard(g) = guard_res {
g.insert(s.clone().into())
.map_err(|_| Error::QuickCacheError)?;
}
s.into_response()
}
Output::Json(v) => {
let json = match v {
brk_interface::Value::Single(v) => serde_json::to_vec(&v)?,
brk_interface::Value::List(v) => serde_json::to_vec(&v)?,
brk_interface::Value::Matrix(v) => serde_json::to_vec(&v)?,
};
if let GuardResult::Guard(g) = guard_res {
g.insert(json.clone().into())
.map_err(|_| Error::QuickCacheError)?;
}
json.into_response()
}
}
};
let headers = response.headers_mut();
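The handler now keeps serialized response bodies in the shared `quick_cache`, keyed by path, query, and ETag stamp. `get_value_or_guard` either returns a cached value, hands back a placement guard (meaning this request computes and publishes the entry), or times out waiting on a concurrent writer. A minimal sketch of the pattern with illustrative types (the real cache stores `Bytes`, and `expensive_render` is a stand-in):

```rust
use std::{sync::Arc, time::Duration};
use quick_cache::sync::{Cache, GuardResult};

fn cached_body(cache: &Arc<Cache<String, Vec<u8>>>, key: String) -> Vec<u8> {
    match cache.get_value_or_guard(&key, Some(Duration::from_millis(500))) {
        // Another request already rendered and cached this body.
        GuardResult::Value(body) => body,
        // We hold the placeholder: render once, then publish for others.
        GuardResult::Guard(guard) => {
            let body = expensive_render();
            let _ = guard.insert(body.clone());
            body
        }
        // Gave up waiting on a concurrent renderer; render uncached.
        GuardResult::Timeout => expensive_render(),
    }
}

fn expensive_render() -> Vec<u8> {
    b"rendered output".to_vec()
}
```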

View File

@@ -1,7 +1,7 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
http::HeaderMap,
http::{HeaderMap, Uri},
response::{IntoResponse, Redirect, Response},
routing::get,
};
@@ -12,8 +12,6 @@ use super::AppState;
mod explorer;
mod interface;
pub use interface::Bridge;
pub trait ApiRoutes {
fn add_api_routes(self) -> Self;
}
@@ -87,7 +85,8 @@ impl ApiRoutes for Router<AppState> {
.route(
"/api/vecs/{variant}",
get(
async |headers: HeaderMap,
async |uri: Uri,
headers: HeaderMap,
Path(variant): Path<String>,
Query(params_opt): Query<ParamsOpt>,
state: State<AppState>|
@@ -100,7 +99,7 @@ impl ApiRoutes for Router<AppState> {
(index, split.collect::<Vec<_>>().join(TO_SEPARATOR)),
params_opt,
));
interface::handler(headers, Query(params), state).await
interface::handler(uri, headers, Query(params), state).await
} else {
"Bad variant".into_response()
}
@@ -127,22 +126,3 @@ impl ApiRoutes for Router<AppState> {
)
}
}
// pub async fn variants_handler(State(app_state): State<AppState>) -> Response {
// Json(
// app_state
// .query
// .vec_trees
// .index_to_id_to_vec
// .iter()
// .flat_map(|(index, id_to_vec)| {
// let index_ser = index.serialize_long();
// id_to_vec
// .keys()
// .map(|id| format!("{}-to-{}", index_ser, id))
// .collect::<Vec<_>>()
// })
// .collect::<Vec<_>>(),
// )
// .into_response()
// }

View File

@@ -1,4 +1,4 @@
use std::{fs, path::Path};
use std::{fs, path::Path, time::Duration};
use axum::{
body::Body,
@@ -6,13 +6,11 @@ use axum::{
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
use brk_error::Result;
use brk_error::{Error, Result};
use log::{error, info};
use quick_cache::sync::GuardResult;
use crate::{
AppState,
traits::{HeaderMapExtended, ModifiedState, ResponseExtended},
};
use crate::{AppState, HeaderMapExtended, ModifiedState, ResponseExtended};
pub async fn file_handler(
headers: HeaderMap,
@@ -31,12 +29,12 @@ fn any_handler(
app_state: AppState,
path: Option<extract::Path<String>>,
) -> Response {
let dist_path = app_state.dist_path();
let files_path = app_state.path.as_ref().unwrap();
if let Some(path) = path.as_ref() {
let path = path.0.replace("..", "").replace("\\", "");
let mut path = dist_path.join(&path);
let mut path = files_path.join(&path);
if !path.exists() || path.is_dir() {
if path.extension().is_some() {
@@ -50,18 +48,18 @@ fn any_handler(
return response;
} else {
path = dist_path.join("index.html");
path = files_path.join("index.html");
}
}
path_to_response(&headers, &path)
path_to_response(&headers, &app_state, &path)
} else {
path_to_response(&headers, &dist_path.join("index.html"))
path_to_response(&headers, &app_state, &files_path.join("index.html"))
}
}
fn path_to_response(headers: &HeaderMap, path: &Path) -> Response {
match path_to_response_(headers, path) {
fn path_to_response(headers: &HeaderMap, app_state: &AppState, path: &Path) -> Response {
match path_to_response_(headers, app_state, path) {
Ok(response) => response,
Err(error) => {
let mut response =
@@ -74,32 +72,51 @@ fn path_to_response(headers: &HeaderMap, path: &Path) -> Response {
}
}
fn path_to_response_(headers: &HeaderMap, path: &Path) -> Result<Response> {
fn path_to_response_(headers: &HeaderMap, app_state: &AppState, path: &Path) -> Result<Response> {
let (modified, date) = headers.check_if_modified_since(path)?;
if modified == ModifiedState::NotModifiedSince {
return Ok(Response::new_not_modified());
}
let content = fs::read(path).unwrap_or_else(|error| {
error!("{error}");
let path = path.to_str().unwrap();
info!("Can't read file {path}");
panic!("")
});
let serialized_path = path.to_str().unwrap();
let mut response = Response::new(content.into());
let must_revalidate = path
.extension()
.is_some_and(|extension| extension == "html")
|| serialized_path.ends_with("service-worker.js");
let guard_res = if !must_revalidate {
Some(app_state.cache.get_value_or_guard(
&path.to_str().unwrap().to_owned(),
Some(Duration::from_millis(500)),
))
} else {
None
};
let mut response = if let Some(GuardResult::Value(v)) = guard_res {
Response::new(Body::from(v))
} else {
let content = fs::read(path).unwrap_or_else(|error| {
error!("{error}");
let path = path.to_str().unwrap();
info!("Can't read file {path}");
panic!("")
});
if let Some(GuardResult::Guard(g)) = guard_res {
g.insert(content.clone().into())
.map_err(|_| Error::QuickCacheError)?;
}
Response::new(content.into())
};
let headers = response.headers_mut();
headers.insert_cors();
headers.insert_content_type(path);
let serialized_path = path.to_str().unwrap();
if path
.extension()
.is_some_and(|extension| extension == "html")
|| serialized_path.ends_with("service-worker.js")
{
if must_revalidate {
headers.insert_cache_control_must_revalidate();
} else if path.extension().is_some_and(|extension| {
extension == "jpg"

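Static files take the same guarded-cache route: anything that isn't `must_revalidate` (HTML and the service worker are) is read from disk once and served from the in-memory cache afterwards, with the 500 ms guard timeout bounding how long concurrent requests wait on the first read.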
View File

@@ -1,20 +1,20 @@
use std::path::PathBuf;
use axum::{Router, routing::get};
use super::AppState;
mod file;
mod website;
use file::{file_handler, index_handler};
pub use website::Website;
pub trait FilesRoutes {
fn add_website_routes(self, website: Website) -> Self;
fn add_files_routes(self, path: Option<&PathBuf>) -> Self;
}
impl FilesRoutes for Router<AppState> {
fn add_website_routes(self, website: Website) -> Self {
if website.is_some() {
fn add_files_routes(self, path: Option<&PathBuf>) -> Self {
if path.is_some() {
self.route("/{*path}", get(file_handler))
.route("/", get(index_handler))
} else {

View File

@@ -3,124 +3,57 @@
#![doc = include_str!("../examples/main.rs")]
#![doc = "```"]
use std::{
fs,
io::Cursor,
path::{Path, PathBuf},
time::Duration,
};
use std::{path::PathBuf, sync::Arc, time::Duration};
use api::{ApiRoutes, Bridge};
use api::ApiRoutes;
use axum::{
Json, Router,
body::Body,
body::{Body, Bytes},
http::{Request, Response, StatusCode, Uri},
middleware::Next,
routing::get,
serve,
};
use brk_bundler::bundle;
use brk_computer::Computer;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_interface::Interface;
use brk_logger::OwoColorize;
use brk_mcp::route::MCPRoutes;
use files::FilesRoutes;
use log::{error, info};
use owo_colors::OwoColorize;
use quick_cache::sync::Cache;
use tokio::net::TcpListener;
use tower_http::{compression::CompressionLayer, trace::TraceLayer};
use tracing::Span;
mod api;
mod extended;
mod files;
mod traits;
pub use files::Website;
use tracing::Span;
use extended::*;
#[derive(Clone)]
pub struct AppState {
interface: &'static Interface<'static>,
website: Website,
websites_path: Option<PathBuf>,
}
impl AppState {
pub fn dist_path(&self) -> PathBuf {
self.websites_path
.as_ref()
.expect("Should never reach here if websites_path is None")
.join("dist")
}
path: Option<PathBuf>,
cache: Arc<Cache<String, Bytes>>,
}
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
const DEV_PATH: &str = "../..";
const WEBSITES: &str = "websites";
pub struct Server(AppState);
impl Server {
pub fn new(
indexer: Indexer,
computer: Computer,
website: Website,
downloads_path: &Path,
) -> Result<Self> {
let indexer = Box::leak(Box::new(indexer));
let computer = Box::leak(Box::new(computer));
let interface = Box::leak(Box::new(Interface::build(indexer, computer)));
let websites_path = if website.is_some() {
let websites_dev_path = Path::new(DEV_PATH).join(WEBSITES);
let websites_path = if fs::exists(&websites_dev_path)? {
websites_dev_path
} else {
let downloaded_websites_path =
downloads_path.join(format!("brk-{VERSION}")).join(WEBSITES);
if !fs::exists(&downloaded_websites_path)? {
info!("Downloading websites from Github...");
let url = format!(
"https://github.com/bitcoinresearchkit/brk/archive/refs/tags/v{VERSION}.zip",
);
let response = minreq::get(url).send()?;
let bytes = response.as_bytes();
let cursor = Cursor::new(bytes);
let mut zip = zip::ZipArchive::new(cursor).unwrap();
zip.extract(downloads_path).unwrap();
}
downloaded_websites_path
};
interface.generate_bridge_file(website, websites_path.as_path())?;
Some(websites_path)
} else {
None
};
Ok(Self(AppState {
interface,
website,
websites_path,
}))
pub fn new(interface: Interface<'static>, files_path: Option<PathBuf>) -> Self {
Self(AppState {
interface: Box::leak(Box::new(interface)),
path: files_path,
cache: Arc::new(Cache::new(10_000)),
})
}
pub async fn serve(self, watch: bool, mcp: bool) -> Result<()> {
pub async fn serve(self, mcp: bool) -> Result<()> {
let state = self.0;
if let Some(websites_path) = state.websites_path.clone() {
bundle(&websites_path, state.website.to_folder_name(), watch).await?;
}
let compression_layer = CompressionLayer::new()
.br(true)
.deflate(true)
@@ -162,7 +95,7 @@ impl Server {
let router = Router::new()
.add_api_routes()
.add_website_routes(state.website)
.add_files_routes(state.path.as_ref())
.add_mcp_routes(state.interface, mcp)
.route("/version", get(Json(VERSION)))
.with_state(state)

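`Server::new` is now infallible: it no longer downloads or bundles anything, it just leaks the prebuilt `Interface`, stores the optional files path, and creates the 10,000-entry `quick_cache` shared by the API and file handlers.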
View File

@@ -14,12 +14,10 @@ brk_error = {workspace = true}
brk_vecs = {workspace = true}
byteview = { workspace = true }
derive_deref = { workspace = true }
fjall = { workspace = true }
jiff = { workspace = true }
rapidhash = "2.0.2"
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true }
zerocopy-derive = { workspace = true }

View File

@@ -1,2 +1,3 @@
/vecs
/raw
/compressed

View File

@@ -18,6 +18,7 @@ memmap2 = "0.9.7"
parking_lot = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true }
zerocopy-derive = { workspace = true }

View File

@@ -0,0 +1,153 @@
use std::{fs, path::Path, sync::Arc};
use brk_vecs::{
AnyStoredVec, AnyVec, CollectableVec, CompressedVec, File, GenericStoredVec, Stamp,
VecIterator, Version,
};
#[allow(clippy::upper_case_acronyms)]
type VEC = CompressedVec<usize, u32>;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = fs::remove_dir_all("compressed");
let version = Version::TWO;
let file = Arc::new(File::open(Path::new("compressed"))?);
{
let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?;
(0..21_u32).for_each(|v| {
vec.push(v);
});
let mut iter = vec.into_iter();
dbg!(iter.get(0));
dbg!(iter.get(1));
dbg!(iter.get(2));
dbg!(iter.get(20));
dbg!(iter.get(21));
drop(iter);
dbg!("flush");
vec.flush()?;
dbg!("flushed");
dbg!(vec.header());
}
{
let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?;
vec.mut_header().update_stamp(Stamp::new(100));
let mut iter = vec.into_iter();
dbg!(iter.get(0));
dbg!(iter.get(1));
dbg!(iter.get(2));
dbg!(iter.get(3));
dbg!(iter.get(4));
dbg!(iter.get(5));
dbg!(iter.get(20));
dbg!(iter.get(20));
dbg!(iter.get(0));
drop(iter);
vec.push(21);
vec.push(22);
let mut iter = vec.into_iter();
dbg!(iter.get(20));
dbg!(iter.get(21));
dbg!(iter.get(22));
dbg!(iter.get(23));
drop(iter);
vec.flush()?;
}
{
let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?;
let mut iter = vec.into_iter();
dbg!(iter.get(0));
dbg!(iter.get(20));
dbg!(iter.get(21));
dbg!(iter.get(22));
drop(iter);
vec.truncate_if_needed(14)?;
let mut iter = vec.into_iter();
dbg!(iter.get(0));
dbg!(iter.get(5));
dbg!(iter.get(20));
drop(iter);
dbg!(vec.collect_signed_range(Some(-5), None)?);
vec.push(vec.len() as u32);
dbg!(VecIterator::last(vec.into_iter()));
dbg!(vec.into_iter().collect::<Vec<_>>());
}
{
let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?;
vec.reset()?;
dbg!(vec.header(), vec.pushed_len(), vec.stored_len(), vec.len());
(0..21_u32).for_each(|v| {
vec.push(v);
});
let mut iter = vec.into_iter();
dbg!(iter.get(0));
dbg!(iter.get(20));
dbg!(iter.get(21));
drop(iter);
// let reader = vec.create_static_reader();
// dbg!(vec.take(10, &reader)?);
// dbg!(vec.get_or_read(10, &reader)?);
// dbg!(vec.holes());
// drop(reader);
vec.flush()?;
dbg!(vec.holes());
}
{
let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?;
dbg!(vec.holes());
let reader = vec.create_static_reader();
dbg!(vec.get_or_read(10, &reader)?);
drop(reader);
// vec.update(10, 10)?;
// vec.update(0, 10)?;
let reader = vec.create_static_reader();
dbg!(
vec.holes(),
vec.get_or_read(0, &reader)?,
vec.get_or_read(10, &reader)?
);
drop(reader);
vec.flush()?;
}
{
let vec: VEC = CompressedVec::forced_import(&file, "vec", version)?;
dbg!(vec.collect()?);
}
Ok(())
}

View File

@@ -99,6 +99,8 @@ fn main() -> Result<()> {
);
}
dbg!(1);
file.write_all_to_region_at(region1_i.into(), &[1; 8000], 0)?;
{
@@ -146,7 +148,7 @@ fn main() -> Result<()> {
let layout = file.layout();
assert!(layout.start_to_index().is_empty());
assert!(layout.start_to_hole().is_empty());
assert!(layout.start_to_hole().len() == 1);
}
let (region1_i, _) = file.create_region_if_needed("region1")?;

View File

@@ -12,8 +12,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = fs::remove_dir_all("raw");
let version = Version::TWO;
// let format = Format::Raw;
//
let file = Arc::new(File::open(Path::new("raw"))?);
{

View File

@@ -195,4 +195,8 @@ impl Layout {
unreachable!();
}
}
pub fn reserved(&mut self, start: u64) -> Option<u64> {
self.start_to_reserved.remove(&start)
}
}
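Note that despite the getter-style name, `reserved` removes the entry from `start_to_reserved`, so the new `assert!(layout.reserved(new_start) == Some(new_reserved))` in the move-to-end path below both verifies and clears the reservation bookkeeping in one step.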

View File

@@ -178,28 +178,27 @@ impl File {
let data_len = data.len() as u64;
let new_len = at.map_or(len + data_len, |at| (at + data_len).max(len));
let write_start = start + at.unwrap_or(len);
if at.is_some_and(|at| at >= start + reserved) {
return Err(Error::Str("Invalid at parameter"));
}
// Write to reserved space if possible
if new_len <= reserved {
// println!(
// "Write to {region_index} reserved space at {}",
// start + at.unwrap_or(len)
// );
println!("Write to {region_index} reserved space at {write_start}");
if at.is_none() {
self.write(start + len, data);
self.write(write_start, data);
}
let mut region = region_lock.write();
if let Some(at) = at {
self.write(start + at, data);
}
if len != new_len {
region.set_len(new_len);
if at.is_some() {
self.write(write_start, data);
}
region.set_len(new_len);
regions.write_to_mmap(&region, region_index);
return Ok(());
@@ -217,10 +216,7 @@ impl File {
// If the region is last in the file, continue writing
if layout.is_last_anything(region_index) {
// println!(
// "{region_index} Append to file at {}",
// start + at.unwrap_or(len)
// );
println!("{region_index} Append to file at {write_start}");
self.set_min_len(start + new_reserved)?;
let mut region = region_lock.write();
@@ -228,7 +224,7 @@ impl File {
drop(region);
drop(layout);
self.write(start + len, data);
self.write(write_start, data);
let mut region = region_lock.write();
region.set_len(new_len);
@@ -243,7 +239,7 @@ impl File {
.get_hole(hole_start)
.is_some_and(|gap| gap >= added_reserve)
{
// println!("Expand {region_index} to hole");
println!("Expand {region_index} to hole");
layout.remove_or_compress_hole(hole_start, added_reserve);
let mut region = region_lock.write();
@@ -251,7 +247,7 @@ impl File {
drop(region);
drop(layout);
self.write(start + len, data);
self.write(write_start, data);
let mut region = region_lock.write();
region.set_len(new_len);
@@ -262,16 +258,16 @@ impl File {
// Find hole big enough to move the region
if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
// println!("Move {region_index} to hole at {hole_start}");
println!("Move {region_index} to hole at {hole_start}");
layout.remove_or_compress_hole(hole_start, new_reserved);
drop(layout);
self.write(
hole_start,
&self.mmap.read()[start as usize..(start + len) as usize],
&self.mmap.read()[start as usize..write_start as usize],
);
self.write(hole_start + len, data);
self.write(hole_start + at.unwrap_or(len), data);
let mut region = region_lock.write();
let mut layout = self.layout.write();
@@ -288,24 +284,25 @@ impl File {
let new_start = layout.len(&regions);
// Write at the end
// println!(
// "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
// start + reserved,
// new_start + new_reserved
// );
println!(
"Move {region_index} to the end, from {start}..{} to {new_start}..{}",
start + reserved,
new_start + new_reserved
);
self.set_min_len(new_start + new_reserved)?;
layout.reserve(new_start, new_reserved);
drop(layout);
self.write(
new_start,
&self.mmap.read()[start as usize..(start + len) as usize],
&self.mmap.read()[start as usize..write_start as usize],
);
self.write(new_start + len, data);
self.write(new_start + at.unwrap_or(len), data);
let mut region = region_lock.write();
let mut layout = self.layout.write();
layout.move_region(new_start, region_index, &region)?;
assert!(layout.reserved(new_start) == Some(new_reserved));
drop(layout);
region.set_start(new_start);

View File

@@ -2,7 +2,7 @@
#![doc = "\n## Examples\n"]
#![doc = "\n### File\n\n```rust"]
#![doc = include_str!("../examples/file.rs")]
#![doc = "```"]
#![doc = "```\n"]
#![doc = "\n### Raw\n\n```rust"]
#![doc = include_str!("../examples/raw.rs")]
#![doc = "```"]

View File

@@ -1,18 +0,0 @@
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)]
pub struct CompressedPageMetadata {
pub start: u64,
pub bytes_len: u32,
pub values_len: u32,
}
impl CompressedPageMetadata {
pub fn new(start: u64, bytes_len: u32, values_len: u32) -> Self {
Self {
start,
bytes_len,
values_len,
}
}
}

View File

@@ -1,118 +0,0 @@
use std::{
fs::{self, OpenOptions},
io::{self, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use rayon::prelude::*;
use zerocopy::{IntoBytes, TryFromBytes};
use crate::Result;
use super::{CompressedPageMetadata, UnsafeSlice};
#[derive(Debug, Clone)]
pub struct CompressedPagesMetadata {
vec: Vec<CompressedPageMetadata>,
change_at: Option<usize>,
path: PathBuf,
}
impl CompressedPagesMetadata {
const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>();
pub fn read(path: &Path) -> Result<CompressedPagesMetadata> {
let this = Self {
vec: fs::read(path)
.unwrap_or_default()
.chunks(Self::PAGE_SIZE)
.map(|bytes| {
if bytes.len() != Self::PAGE_SIZE {
panic!()
}
CompressedPageMetadata::try_read_from_bytes(bytes).unwrap()
})
.collect::<Vec<_>>(),
path: path.to_owned(),
change_at: None,
};
Ok(this)
}
pub fn write(&mut self) -> io::Result<()> {
if self.change_at.is_none() {
return Ok(());
}
let change_at = self.change_at.take().unwrap();
let len = (self.vec.len() - change_at) * Self::PAGE_SIZE;
let mut bytes: Vec<u8> = vec![0; len];
let unsafe_bytes = UnsafeSlice::new(&mut bytes);
self.vec[change_at..]
.par_iter()
.enumerate()
.for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::PAGE_SIZE, v.as_bytes()));
let mut file = OpenOptions::new()
.read(true)
.create(true)
.truncate(false)
.append(true)
.open(&self.path)?;
file.set_len((change_at * Self::PAGE_SIZE) as u64)?;
file.seek(SeekFrom::End(0))?;
file.write_all(&bytes)?;
Ok(())
}
pub fn len(&self) -> usize {
self.vec.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn get(&self, page_index: usize) -> Option<&CompressedPageMetadata> {
self.vec.get(page_index)
}
pub fn last(&self) -> Option<&CompressedPageMetadata> {
self.vec.last()
}
pub fn pop(&mut self) -> Option<CompressedPageMetadata> {
self.vec.pop()
}
pub fn push(&mut self, page_index: usize, page: CompressedPageMetadata) {
if page_index != self.vec.len() {
panic!();
}
self.set_changed_at(page_index);
self.vec.push(page);
}
fn set_changed_at(&mut self, page_index: usize) {
if self.change_at.is_none_or(|pi| pi > page_index) {
self.change_at.replace(page_index);
}
}
pub fn truncate(&mut self, page_index: usize) -> Option<CompressedPageMetadata> {
let page = self.get(page_index).cloned();
self.vec.truncate(page_index);
self.set_changed_at(page_index);
page
}
}

View File

@@ -1,37 +1,35 @@
use std::{
borrow::Cow,
collections::{BTreeMap, BTreeSet},
fs, mem,
mem,
sync::Arc,
};
use memmap2::Mmap;
use parking_lot::{RwLock, RwLockReadGuard};
use pco::data_types::Number;
use rayon::prelude::*;
use crate::{
AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, AsInnerSlice, BaseVecIterator,
BoxedVecIterator, CollectableVec, Error, File, FromInnerSlice, GenericStoredVec, HEADER_OFFSET,
Header, RawVec, Reader, Result, StoredCompressed, StoredIndex, UnsafeSlice, Version,
BoxedVecIterator, CollectableVec, Error, File, Format, FromInnerSlice, GenericStoredVec,
HEADER_OFFSET, Header, RawVec, Reader, Result, StoredCompressed, StoredIndex, Version,
};
mod compressed_page_meta;
mod compressed_pages_meta;
mod page;
mod pages;
use compressed_page_meta::*;
use compressed_pages_meta::*;
use page::*;
use pages::*;
const ONE_KIB: usize = 1024;
const MAX_PAGE_SIZE: usize = 16 * ONE_KIB;
const PCO_COMPRESSION_LEVEL: usize = 4;
const VERSION: Version = Version::TWO;
const VERSION: Version = Version::ONE;
#[derive(Debug)]
pub struct CompressedVec<I, T> {
inner: RawVec<I, T>,
pages_meta: Arc<RwLock<CompressedPagesMetadata>>,
pages: Arc<RwLock<Pages>>,
}
impl<I, T> CompressedVec<I, T>
@@ -39,8 +37,7 @@ where
I: StoredIndex,
T: StoredCompressed,
{
pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T;
const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
/// Same as import but will reset the vec under certain errors, so be careful !
pub fn forced_import(file: &Arc<File>, name: &str, mut version: Version) -> Result<Self> {
@@ -51,73 +48,50 @@ where
| Err(Error::WrongEndian)
| Err(Error::WrongLength)
| Err(Error::DifferentVersion { .. }) => {
todo!();
// let path = Self::path_(file, name);
// fs::remove_file(path)?;
// Self::import(file, name, version)
let _ = file.remove_region(Self::vec_region_name_(name).into());
let _ = file.remove_region(Self::holes_region_name_(name).into());
let _ = file.remove_region(Self::pages_region_name_(name).into());
Self::import(file, name, version)
}
_ => res,
}
}
#[allow(unreachable_code, unused_variables)]
pub fn import(file: &Arc<File>, name: &str, version: Version) -> Result<Self> {
// let mut inner = RawVec::import(file, name, version)?;
let inner = RawVec::import_(file, name, version, Format::Compressed)?;
todo!();
let pages = Pages::import(file, &Self::pages_region_name_(name))?;
// let pages_meta = {
// let path = inner
// .folder()
// .join(format!("{}-pages-meta", I::to_string()));
// if inner.is_empty() {
// let _ = fs::remove_file(&path);
// }
// CompressedPagesMetadata::read(&path)?
// };
// inner.set_stored_len(if let Some(last) = pages_meta.last() {
// (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize
// } else {
// 0
// });
// Ok(Self {
// inner,
// pages_meta: Arc::new(RwLock::new(pages_meta)),
// })
Ok(Self {
inner,
pages: Arc::new(RwLock::new(pages)),
})
}
fn decode_page(&self, page_index: usize, reader: &Reader) -> Result<Vec<T>> {
Self::decode_page_(
self.stored_len(),
page_index,
reader,
&self.pages_meta.read(),
)
Self::decode_page_(self.stored_len(), page_index, reader, &self.pages.read())
}
fn decode_page_(
stored_len: usize,
page_index: usize,
reader: &Reader,
compressed_pages_meta: &CompressedPagesMetadata,
pages: &Pages,
) -> Result<Vec<T>> {
if Self::page_index_to_index(page_index) >= stored_len {
return Err(Error::IndexTooHigh);
} else if compressed_pages_meta.len() <= page_index {
} else if page_index >= pages.len() {
return Err(Error::ExpectVecToHaveIndex);
}
let page = compressed_pages_meta.get(page_index).unwrap();
let len = page.bytes_len as usize;
let offset = page.start as usize;
let page = pages.get(page_index).unwrap();
let len = page.bytes as u64;
let offset = page.start;
let slice: &[u8] = reader.read(offset as u64, (offset + len) as u64);
let slice = reader.read(offset, len);
let vec: Vec<T::NumberType> = pco::standalone::simple_decompress(slice)?;
let vec: Vec<T> = T::from_inner_slice(vec);
let vec = T::from_inner_slice(vec);
Ok(vec)
}
@@ -156,13 +130,17 @@ where
iter.set_(i);
iter
}
fn pages_region_name_(name: &str) -> String {
format!("{}_pages", Self::vec_region_name_(name))
}
}
impl<I, T> Clone for CompressedVec<I, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
pages_meta: self.pages_meta.clone(),
pages: self.pages.clone(),
}
}
}
@@ -221,92 +199,87 @@ where
#[inline]
fn stored_len(&self) -> usize {
self.inner.stored_len()
self.pages.read().stored_len(Self::PER_PAGE)
}
fn flush(&mut self) -> Result<()> {
todo!();
self.inner.write_header_if_needed()?;
// let file_opt = self.inner.write_header_if_needed()?;
let pushed_len = self.pushed_len();
// let pushed_len = self.pushed_len();
let has_new_data = pushed_len != 0;
// if pushed_len == 0 {
// return Ok(());
// }
if !has_new_data {
return Ok(());
}
// let stored_len = self.stored_len();
let stored_len = self.stored_len();
// let mut file = file_opt.unwrap_or(self.open_file()?);
let mut pages = self.pages.write();
// let mut pages_meta = self.pages_meta.read();
let mut starting_page_index = pages.len();
let mut values = vec![];
let mut truncate_at = None;
// let mut starting_page_index = pages_meta.len();
// let mut values = vec![];
// let mut truncate_at = None;
if stored_len % Self::PER_PAGE != 0 {
if pages.is_empty() {
unreachable!()
}
// if self.stored_len() % Self::PER_PAGE != 0 {
// if pages_meta.is_empty() {
// unreachable!()
// }
let last_page_index = pages.len() - 1;
// let last_page_index = pages_meta.len() - 1;
let reader = self.create_reader();
// let mmap = unsafe { Mmap::map(&file)? };
values = Self::decode_page_(stored_len, last_page_index, &reader, &pages)
.inspect_err(|_| {
dbg!(last_page_index, &pages);
})
.unwrap();
// values = Self::decode_page_(stored_len, last_page_index, &mmap, &pages_meta)
// .inspect_err(|_| {
// dbg!(last_page_index, &pages_meta);
// })
// .unwrap();
truncate_at.replace(pages.pop().unwrap().start);
starting_page_index = last_page_index;
}
// truncate_at.replace(pages_meta.pop().unwrap().start);
// starting_page_index = last_page_index;
// }
let compressed = values
.into_par_iter()
.chain(mem::take(self.inner.mut_pushed()).into_par_iter())
.chunks(Self::PER_PAGE)
.map(|chunk| (Self::compress_page(chunk.as_slice()), chunk.len()))
.collect::<Vec<_>>();
// let compressed = values
// .into_par_iter()
// .chain(mem::take(self.mut_pushed()).into_par_iter())
// .chunks(Self::PER_PAGE)
// .map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len()))
// .collect::<Vec<_>>();
compressed.iter().enumerate().for_each(|(i, (bytes, len))| {
let page_index = starting_page_index + i;
// compressed
// .iter()
// .enumerate()
// .for_each(|(i, (compressed_bytes, values_len))| {
// let page_index = starting_page_index + i;
let start = if page_index != 0 {
let prev = pages.get(page_index - 1).unwrap();
prev.start + prev.bytes as u64
} else {
0
};
// let start = if page_index != 0 {
// let prev = pages_meta.get(page_index - 1).unwrap();
// prev.start + prev.bytes_len as u64
// } else {
// 0
// };
// let offsetted_start = start + HEADER_OFFSET as u64;
let page = Page::new(
start + HEADER_OFFSET as u64,
bytes.len() as u32,
*len as u32,
);
// let bytes_len = compressed_bytes.len() as u32;
// let values_len = *values_len as u32;
pages.checked_push(page_index, page);
});
// let page = CompressedPageMetadata::new(offsetted_start, bytes_len, values_len);
let buf = compressed
.into_iter()
.flat_map(|(v, _)| v)
.collect::<Vec<_>>();
// pages_meta.push(page_index, page);
// });
let file = self.file();
// let buf = compressed
// .into_iter()
// .flat_map(|(v, _)| v)
// .collect::<Vec<_>>();
if let Some(truncate_at) = truncate_at {
file.truncate_region(self.region_index().into(), truncate_at)?;
}
// pages_meta.write()?;
file.write_all_to_region(self.region_index().into(), &buf)?;
// if let Some(truncate_at) = truncate_at {
// self.file_set_len(&mut file, truncate_at)?;
// }
// self.file_write_all(&mut file, &buf)?;
// self.pages_meta.store(Arc::new(pages_meta));
pages.flush(file)?;
Ok(())
}
@@ -321,11 +294,10 @@ where
fn read_(&self, index: usize, reader: &Reader) -> Result<T> {
let page_index = Self::index_to_page_index(index);
let decoded_index = index % Self::PER_PAGE;
Ok(unsafe {
self.decode_page(page_index, reader)?
*self
.decode_page(page_index, reader)?
.get_unchecked(decoded_index)
.clone()
})
}
@@ -355,10 +327,8 @@ where
}
fn reset(&mut self) -> Result<()> {
// let mut pages_meta = (**self.pages_meta.load()).clone();
// pages_meta.truncate(0);
// pages_meta.write()?;
// self.pages_meta.store(Arc::new(pages_meta));
let file = self.file();
self.pages.write().reset(file)?;
self.reset_()
}
@@ -374,36 +344,47 @@ where
return Ok(());
}
let mut pages_meta = self.pages_meta.write();
let stored_len = self.stored_len();
let mut pages = self.pages.write();
let last_page_index = pages.len() - 1;
let page_index = Self::index_to_page_index(index);
let reader = self.create_static_reader();
let values = self.decode_page(page_index, &reader)?;
drop(reader);
let values = Self::decode_page_(
stored_len,
last_page_index,
&self.create_static_reader(),
&pages,
)?;
let mut buf = vec![];
let mut page = pages_meta.truncate(page_index).unwrap();
let len = page.start;
let mut page = pages.truncate(page_index).unwrap();
let decoded_index = index % Self::PER_PAGE;
let from = page.start;
if decoded_index != 0 {
let chunk = &values[..decoded_index];
buf = Self::compress_page(chunk);
page.values_len = chunk.len() as u32;
page.bytes_len = buf.len() as u32;
page.values = chunk.len() as u32;
page.bytes = buf.len() as u32;
pages_meta.push(page_index, page);
pages.checked_push(page_index, page);
}
pages_meta.write()?;
let file = self.file();
// self.file_truncate_and_write_all(&mut file, len, &buf)?;
pages.flush(file)?;
file.truncate_region(self.region_index().into(), from)?;
file.write_all_to_region(self.region_index().into(), &buf)?;
Ok(())
}
@@ -413,8 +394,8 @@ where
pub struct CompressedVecIterator<'a, I, T> {
vec: &'a CompressedVec<I, T>,
reader: Reader<'a>,
decoded_page: Option<(usize, Vec<T>)>,
pages_meta: RwLockReadGuard<'a, CompressedPagesMetadata>,
decoded: Option<(usize, Vec<T>)>,
pages: RwLockReadGuard<'a, Pages>,
stored_len: usize,
index: usize,
}
@@ -472,23 +453,23 @@ where
} else {
let page_index = i / Self::PER_PAGE;
if self.decoded_page.as_ref().is_none_or(|b| b.0 != page_index) {
if self.decoded.as_ref().is_none_or(|b| b.0 != page_index) {
let values = CompressedVec::<I, T>::decode_page_(
stored_len,
page_index,
&self.reader,
&self.pages_meta,
&self.pages,
)
.unwrap();
self.decoded_page.replace((page_index, values));
self.decoded.replace((page_index, values));
}
self.decoded_page
self.decoded
.as_ref()
.unwrap()
.1
.get(i % Self::PER_PAGE)
.map(|v| (I::from(i), Cow::Owned(v.clone())))
.map(|v| (I::from(i), Cow::Owned(*v)))
};
self.index += 1;
@@ -506,14 +487,14 @@ where
type IntoIter = CompressedVecIterator<'a, I, T>;
fn into_iter(self) -> Self::IntoIter {
let pages_meta = self.pages_meta.read();
let pages = self.pages.read();
let stored_len = self.stored_len();
CompressedVecIterator {
vec: self,
reader: self.create_static_reader(),
decoded_page: None,
pages_meta,
decoded: None,
pages,
index: 0,
stored_len,
}

View File

@@ -0,0 +1,19 @@
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)]
#[repr(C)]
pub struct Page {
pub start: u64,
pub bytes: u32,
pub values: u32,
}
impl Page {
pub fn new(start: u64, bytes: u32, values: u32) -> Self {
Self {
start,
bytes,
values,
}
}
}
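Unlike the removed `CompressedPageMetadata`, `Page` is `#[repr(C)]`, giving it a defined layout that the zerocopy derives rely on when `Pages::flush` writes the whole slice with `as_bytes()`. The fields pack without padding:

```rust
// u64 + u32 + u32, 8-byte aligned: exactly 16 bytes per page record.
assert_eq!(std::mem::size_of::<Page>(), 16);
```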

View File

@@ -0,0 +1,122 @@
use std::sync::Arc;
use parking_lot::RwLock;
use zerocopy::{FromBytes, IntoBytes};
use crate::{
File, Result,
file::{Region, RegionReader},
};
use super::Page;
#[derive(Debug, Clone)]
pub struct Pages {
_region: Arc<RwLock<Region>>,
region_index: usize,
vec: Vec<Page>,
change_at: Option<usize>,
}
impl Pages {
const SIZE_OF_PAGE: usize = size_of::<Page>();
pub fn import(file: &File, name: &str) -> Result<Self> {
let (region_index, _region) = file.create_region_if_needed(name)?;
let vec = _region
.read()
.create_reader(file)
.read_all()
.chunks(Self::SIZE_OF_PAGE)
.map(|b| Page::read_from_bytes(b).map_err(|e| e.into()))
.collect::<Result<_>>()?;
Ok(Self {
_region,
region_index,
vec,
change_at: None,
})
}
pub fn flush(&mut self, file: &File) -> Result<()> {
if self.change_at.is_none() {
return Ok(());
}
let change_at = self.change_at.take().unwrap();
let at = (change_at * Self::SIZE_OF_PAGE) as u64;
file.truncate_region(self.region_index.into(), at)?;
file.write_all_to_region_at(
self.region_index.into(),
self.vec[change_at..].as_bytes(),
at,
)?;
Ok(())
}
pub fn len(&self) -> usize {
self.vec.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn get(&self, page_index: usize) -> Option<&Page> {
self.vec.get(page_index)
}
pub fn last(&self) -> Option<&Page> {
self.vec.last()
}
pub fn pop(&mut self) -> Option<Page> {
let popped = self.vec.pop();
if popped.is_some() {
self.set_changed_at(self.vec.len());
}
popped
}
pub fn checked_push(&mut self, page_index: usize, page: Page) {
if page_index != self.vec.len() {
panic!();
}
self.set_changed_at(page_index);
self.vec.push(page);
}
fn set_changed_at(&mut self, page_index: usize) {
if self.change_at.is_none_or(|pi| pi > page_index) {
self.change_at.replace(page_index);
}
}
pub fn reset(&mut self, file: &File) -> Result<()> {
self.truncate(0);
self.flush(file)
}
pub fn truncate(&mut self, page_index: usize) -> Option<Page> {
let page = self.get(page_index).cloned();
self.vec.truncate(page_index);
self.set_changed_at(page_index);
page
}
pub fn stored_len(&self, per_page: usize) -> usize {
if let Some(last) = self.last() {
(self.len() - 1) * per_page + last.values as usize
} else {
0
}
}
}
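The `stored_len` arithmetic relies on only the last page being partially filled: for `u32` values, `PER_PAGE = 16 KiB / 4 = 4096`, so three pages whose last entry records `values = 100` hold `(3 - 1) * 4096 + 100 = 8292` stored values.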

View File

@@ -1,4 +1,4 @@
use serde::{Deserialize, Serialize};
use serde_derive::{Deserialize, Serialize};
#[derive(Default, Debug, PartialEq, PartialOrd, Ord, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum Computation {

View File

@@ -51,6 +51,11 @@ impl Header {
self.inner.write().stamp = stamp;
}
pub fn update_format(&mut self, format: Format) {
self.modified = true;
self.inner.write().compressed = ZeroCopyBool::from(format);
}
pub fn update_computed_version(&mut self, computed_version: Version) {
self.modified = true;
self.inner.write().computed_version = computed_version;

View File

@@ -7,8 +7,7 @@ use std::{
};
use parking_lot::RwLock;
use rayon::prelude::*;
use zerocopy::FromBytes;
use zerocopy::{FromBytes, IntoBytes};
use crate::{
AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BaseVecIterator, BoxedVecIterator,
@@ -19,10 +18,8 @@ use crate::{
use super::Format;
mod header;
mod unsafe_slice;
pub use header::*;
pub use unsafe_slice::*;
const VERSION: Version = Version::ONE;
@@ -64,25 +61,24 @@ where
}
pub fn import(file: &Arc<File>, name: &str, version: Version) -> Result<Self> {
Self::import_(file, name, version, Format::Raw)
}
pub fn import_(file: &Arc<File>, name: &str, version: Version, format: Format) -> Result<Self> {
let (region_index, region) = file.create_region_if_needed(&Self::vec_region_name_(name))?;
let region_len = region.read().len() as usize;
if region_len > 0
&& (region_len < HEADER_OFFSET || (region_len - HEADER_OFFSET) % Self::SIZE_OF_T != 0)
&& (region_len < HEADER_OFFSET
|| (format.is_raw() && (region_len - HEADER_OFFSET) % Self::SIZE_OF_T != 0))
{
return Err(Error::Str("Region was saved incorrectly"));
}
let header = if region_len == 0 {
Header::create_and_write(file, region_index, version, Format::Raw)?
Header::create_and_write(file, region_index, version, format)?
} else {
Header::import_and_verify(
file,
region_index,
region.read().len(),
version,
Format::Raw,
)?
Header::import_and_verify(file, region_index, region.read().len(), version, format)?
};
let holes = if let Ok(holes) = file.get_region(Self::holes_region_name_(name).into()) {
@@ -200,13 +196,7 @@ where
#[inline]
fn stored_len(&self) -> usize {
(self
.file
.get_region(self.region_index.into())
.unwrap()
.len() as usize
- HEADER_OFFSET)
/ Self::SIZE_OF_T
(self.region.read().len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T
}
fn flush(&mut self) -> Result<()> {
@@ -227,22 +217,10 @@ where
let file = &self.file;
if has_new_data {
let bytes = {
let mut bytes: Vec<u8> = vec![0; pushed_len * Self::SIZE_OF_T];
let unsafe_bytes = UnsafeSlice::new(&mut bytes);
mem::take(&mut self.pushed)
.into_par_iter()
.enumerate()
.for_each(|(i, v)| {
unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())
});
bytes
};
file.write_all_to_region(self.region_index.into(), &bytes)?;
file.write_all_to_region(
self.region_index.into(),
mem::take(&mut self.pushed).as_bytes(),
)?;
}
if has_updated_data {

View File

@@ -1,35 +0,0 @@
use std::cell::UnsafeCell;
#[derive(Copy, Clone)]
pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell<T>]);
unsafe impl<T: Send + Sync> Send for UnsafeSlice<'_, T> {}
unsafe impl<T: Send + Sync> Sync for UnsafeSlice<'_, T> {}
impl<'a, T> UnsafeSlice<'a, T> {
pub fn new(slice: &'a mut [T]) -> Self {
let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
Self(unsafe { &*ptr })
}
/// SAFETY: It is UB if two threads write to the same index without
/// synchronization.
pub fn write(&self, i: usize, value: T) {
unsafe {
*self.0[i].get() = value;
}
}
/// SAFETY: It is UB
pub fn get(&self, i: usize) -> *mut T {
self.0[i].get()
}
pub fn copy_slice(&self, start: usize, slice: &[T])
where
T: Copy,
{
slice.iter().enumerate().for_each(|(i, v)| {
self.write(start + i, *v);
});
}
}
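`UnsafeSlice` becomes deletable because both remaining byte-serialization paths now lean on zerocopy instead of a hand-rolled parallel copy: `RawVec::flush` writes `mem::take(&mut self.pushed).as_bytes()` directly, and `Pages::flush` does the same for its page records.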

View File

@@ -1,6 +1,6 @@
use std::{fs, io, path::Path};
use serde::{Deserialize, Serialize};
use serde_derive::{Deserialize, Serialize};
use crate::{Error, Result};