diff --git a/Cargo.lock b/Cargo.lock index 51de1dc30..6751a8ca0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -513,9 +513,11 @@ name = "brk_cli" version = "0.0.83" dependencies = [ "bitcoincore-rpc", + "brk_bundler", "brk_computer", "brk_fetcher", "brk_indexer", + "brk_interface", "brk_logger", "brk_parser", "brk_server", @@ -524,9 +526,11 @@ dependencies = [ "clap_derive", "color-eyre", "log", + "minreq", "serde", "tokio", "toml", + "zip", ] [[package]] @@ -559,6 +563,7 @@ dependencies = [ "fjall", "jiff", "minreq", + "serde_json", "zerocopy", ] @@ -1003,7 +1008,6 @@ version = "0.0.83" dependencies = [ "axum", "bitcoincore-rpc", - "brk_bundler", "brk_computer", "brk_error", "brk_fetcher", @@ -1013,19 +1017,13 @@ dependencies = [ "brk_mcp", "brk_parser", "brk_vecs", - "clap", - "clap_derive", "jiff", "log", - "minreq", - "owo-colors", "quick_cache", - "serde", "serde_json", "tokio", "tower-http", "tracing", - "zip", ] [[package]] @@ -1061,12 +1059,10 @@ dependencies = [ "brk_vecs", "byteview", "derive_deref", - "fjall", "jiff", "rapidhash", "serde", "serde_bytes", - "serde_json", "zerocopy", "zerocopy-derive", ] @@ -1084,6 +1080,7 @@ dependencies = [ "pco", "rayon", "serde", + "serde_derive", "serde_json", "zerocopy", "zerocopy-derive", @@ -2507,9 +2504,9 @@ checksum = "610a5acd306ec67f907abe5567859a3c693fb9886eb1f012ab8f2a47bef3db51" [[package]] name = "notify" -version = "8.1.0" +version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3163f59cd3fa0e9ef8c32f242966a7b9994fd7378366099593e0e73077cd8c97" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" dependencies = [ "bitflags 2.9.1", "fsevent-sys", @@ -2638,11 +2635,12 @@ dependencies = [ [[package]] name = "oxc-browserslist" -version = "2.0.12" +version = "2.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e05d19022f54d3e0b8b1679c80f02d140e95e4308385eb247ba3168c299c81bb" +checksum = "b94a2f0da0105f3a813eeb6b182a791b2dc491f494432e1953fd8e72c1f3887d" dependencies = [ "bincode", + "flate2", "nom", "rustc-hash", "serde", @@ -3356,9 +3354,9 @@ dependencies = [ [[package]] name = "quick_cache" -version = "0.6.15" +version = "0.6.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8565e62e02af316570d4b492f17af1481d6c07cea60f4e7edd71700da5052ba9" +checksum = "9ad6644cb07b7f3488b9f3d2fde3b4c0a7fa367cafefb39dff93a659f76eb786" dependencies = [ "ahash", "equivalent", @@ -4220,9 +4218,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index ea0196499..3f03bb547 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,14 +41,11 @@ brk_store = { version = "0.0.83", path = "crates/brk_store" } brk_vecs = { version = "0.0.83", path = "crates/brk_vecs" } brk_vecs_macros = { version = "0.0.83", path = "crates/brk_vecs_macros" } byteview = "=0.6.1" -clap = { version = "4.5.42", features = ["string"] } -clap_derive = "4.5.41" derive_deref = "1.1.1" fjall = "2.11.2" jiff = "0.2.15" log = "0.4.27" minreq = { version = "2.14.0", features = ["https", "serde_json"] } -owo-colors = "4.2.2" parking_lot = "0.12.4" rayon = "1.10.0" serde = "1.0.219" diff --git a/crates/brk_bundler/Cargo.toml b/crates/brk_bundler/Cargo.toml index f3f6ebb68..160acc309 100644 --- a/crates/brk_bundler/Cargo.toml +++ b/crates/brk_bundler/Cargo.toml @@ -9,7 +9,7 @@ repository.workspace = true [dependencies] log = { workspace = true } -notify = "8.1.0" +notify = "8.2.0" brk_rolldown = "0.1.1" # brk_rolldown = { path = "../../../rolldown/crates/rolldown"} sugar_path = "1.2.0" diff --git a/crates/brk_bundler/src/lib.rs b/crates/brk_bundler/src/lib.rs index c86a16872..e9dc3bc3b 100644 --- a/crates/brk_bundler/src/lib.rs +++ b/crates/brk_bundler/src/lib.rs @@ -1,4 +1,8 @@ -use std::{fs, io, path::Path, sync::Arc}; +use std::{ + fs, io, + path::{Path, PathBuf}, + sync::Arc, +}; use brk_rolldown::{Bundler, BundlerOptions, RawMinifyOptions, SourceMapType}; use log::error; @@ -8,7 +12,7 @@ use tokio::sync::Mutex; const VERSION: &str = env!("CARGO_PKG_VERSION"); -pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> io::Result<()> { +pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> io::Result { let source_path = websites_path.join(source_folder); let dist_path = websites_path.join("dist"); @@ -72,7 +76,7 @@ pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> i write_sw(); if !watch { - return Ok(()); + return Ok(dist_path); } tokio::spawn(async move { @@ -127,7 +131,7 @@ pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> i watcher.start().await; }); - Ok(()) + Ok(dist_path) } fn copy_dir_all(src: impl AsRef, dst: impl AsRef) -> io::Result<()> { diff --git a/crates/brk_cli/Cargo.toml b/crates/brk_cli/Cargo.toml index da73ca3bb..75d841bde 100644 --- a/crates/brk_cli/Cargo.toml +++ b/crates/brk_cli/Cargo.toml @@ -9,20 +9,24 @@ repository.workspace = true [dependencies] bitcoincore-rpc = { workspace = true } +brk_bundler = { workspace = true } brk_computer = { workspace = true } brk_fetcher = { workspace = true } brk_indexer = { workspace = true } +brk_interface = { workspace = true } brk_logger = { workspace = true } brk_parser = { workspace = true } brk_server = { workspace = true } brk_vecs = { workspace = true } -clap = { workspace = true } -clap_derive = { workspace = true } +clap = { version = "4.5.42", features = ["string"] } +clap_derive = "4.5.41" color-eyre = "0.6.5" log = { workspace = true } +minreq = { workspace = true } serde = { workspace = true } tokio = { workspace = true } toml = "0.9.4" +zip = { version = "4.3.0", default-features = false, features = ["deflate"] } [[bin]] name = "brk" diff --git a/crates/brk_cli/README.md b/crates/brk_cli/README.md index 023057bb4..a93bfaee8 100644 --- a/crates/brk_cli/README.md +++ b/crates/brk_cli/README.md @@ -64,10 +64,10 @@ You can find a pre-built binary for your operating system in the [releases page] ```bash # Install -cargo install brk # or `cargo install brk_cli`, the result is the same +cargo install brk --locked # or `cargo install brk_cli`, the result is the same # Update -cargo install brk # or `cargo install-update -a` if you have `cargo-update` installed +cargo install brk --locked # or `cargo install-update -a` if you have `cargo-update` installed ``` ### Source diff --git a/crates/brk_server/src/api/interface/bridge.rs b/crates/brk_cli/src/bridge.rs similarity index 98% rename from crates/brk_server/src/api/interface/bridge.rs rename to crates/brk_cli/src/bridge.rs index 5f0cd2bce..9b679f566 100644 --- a/crates/brk_server/src/api/interface/bridge.rs +++ b/crates/brk_cli/src/bridge.rs @@ -1,8 +1,9 @@ use std::{fs, io, path::Path}; use brk_interface::{Index, Interface}; +use brk_server::VERSION; -use crate::{VERSION, Website}; +use crate::website::Website; const SCRIPTS: &str = "scripts"; diff --git a/crates/brk_cli/src/config.rs b/crates/brk_cli/src/config.rs index e74571963..63b27e454 100644 --- a/crates/brk_cli/src/config.rs +++ b/crates/brk_cli/src/config.rs @@ -5,13 +5,12 @@ use std::{ use bitcoincore_rpc::{self, Auth, Client}; use brk_fetcher::Fetcher; -use brk_server::Website; use clap::Parser; use clap_derive::Parser; use color_eyre::eyre::eyre; use serde::{Deserialize, Deserializer, Serialize}; -use crate::{default_bitcoin_path, default_brk_path, dot_brk_path}; +use crate::{default_bitcoin_path, default_brk_path, dot_brk_path, website::Website}; const DOWNLOADS: &str = "downloads"; diff --git a/crates/brk_cli/src/lib.rs b/crates/brk_cli/src/lib.rs index b96bc45af..dd48bd680 100644 --- a/crates/brk_cli/src/lib.rs +++ b/crates/brk_cli/src/lib.rs @@ -1,13 +1,28 @@ #![doc = include_str!("../README.md")] -use std::{fs, thread}; +use std::{ + fs, + io::Cursor, + path::Path, + thread::{self, sleep}, + time::Duration, +}; +use bitcoincore_rpc::{self, RpcApi}; +use brk_bundler::bundle; +use brk_computer::Computer; +use brk_indexer::Indexer; +use brk_interface::Interface; +use brk_server::{Server, VERSION}; +use brk_vecs::Exit; +use log::info; + +mod bridge; mod config; mod paths; -mod run; +mod website; -pub use paths::*; -use run::*; +use crate::{bridge::Bridge, config::Config, paths::*}; pub fn main() -> color_eyre::Result<()> { color_eyre::install()?; @@ -22,3 +37,118 @@ pub fn main() -> color_eyre::Result<()> { .join() .unwrap() } + +pub fn run() -> color_eyre::Result<()> { + let config = Config::import()?; + + let rpc = config.rpc()?; + + let exit = Exit::new(); + exit.set_ctrlc_handler(); + + let parser = brk_parser::Parser::new(config.blocksdir(), config.brkdir(), rpc); + + let mut indexer = Indexer::forced_import(&config.brkdir())?; + + let wait_for_synced_node = |rpc_client: &bitcoincore_rpc::Client| -> color_eyre::Result<()> { + let is_synced = || -> color_eyre::Result { + let info = rpc_client.get_blockchain_info()?; + Ok(info.headers == info.blocks) + }; + + if !is_synced()? { + info!("Waiting for node to be synced..."); + while !is_synced()? { + sleep(Duration::from_secs(1)) + } + } + + Ok(()) + }; + + let mut computer = Computer::forced_import(&config.brkdir(), &indexer, config.fetcher())?; + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(async { + let interface = Interface::build(&indexer, &computer); + + let website = config.website(); + + let downloads_path = config.downloads_dir(); + + let bundle_path = if website.is_some() { + let websites_dev_path = Path::new("../../websites"); + + let websites_path = if fs::exists(websites_dev_path)? { + websites_dev_path.to_path_buf() + } else { + let downloaded_websites_path = + downloads_path.join(format!("brk-{VERSION}")).join("websites"); + + if !fs::exists(&downloaded_websites_path)? { + info!("Downloading websites from Github..."); + + let url = format!( + "https://github.com/bitcoinresearchkit/brk/archive/refs/tags/v{VERSION}.zip", + ); + + let response = minreq::get(url).send()?; + let bytes = response.as_bytes(); + let cursor = Cursor::new(bytes); + + let mut zip = zip::ZipArchive::new(cursor).unwrap(); + + zip.extract(downloads_path).unwrap(); + } + + downloaded_websites_path + }; + + interface.generate_bridge_file(website, websites_path.as_path())?; + + let watch = config.watch(); + + Some(bundle(&websites_path, website.to_folder_name(), watch).await?) + } else { + None + }; + + let server = Server::new( + interface, + bundle_path, + ); + + let mcp = config.mcp(); + + tokio::spawn(async move { + server.serve(mcp).await.unwrap(); + }); + + sleep(Duration::from_secs(1)); + + loop { + wait_for_synced_node(rpc)?; + + let block_count = rpc.get_block_count()?; + + info!("{} blocks found.", block_count + 1); + + let starting_indexes = + indexer.index(&parser, rpc, &exit, config.check_collisions())?; + + computer.compute(&indexer, starting_indexes, &exit)?; + + if let Some(delay) = config.delay() { + sleep(Duration::from_secs(delay)) + } + + info!("Waiting for new blocks..."); + + while block_count == rpc.get_block_count()? { + sleep(Duration::from_secs(1)) + } + } + }) +} diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs deleted file mode 100644 index aa5a0c22b..000000000 --- a/crates/brk_cli/src/run.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::{thread::sleep, time::Duration}; - -use bitcoincore_rpc::{self, RpcApi}; -use brk_computer::Computer; -use brk_indexer::Indexer; -use brk_server::Server; -use brk_vecs::Exit; -use log::info; - -use crate::config::Config; - -pub fn run() -> color_eyre::Result<()> { - let config = Config::import()?; - - let rpc = config.rpc()?; - - let exit = Exit::new(); - exit.set_ctrlc_handler(); - - let parser = brk_parser::Parser::new(config.blocksdir(), config.brkdir(), rpc); - - let mut indexer = Indexer::forced_import(&config.brkdir())?; - - let wait_for_synced_node = |rpc_client: &bitcoincore_rpc::Client| -> color_eyre::Result<()> { - let is_synced = || -> color_eyre::Result { - let info = rpc_client.get_blockchain_info()?; - Ok(info.headers == info.blocks) - }; - - if !is_synced()? { - info!("Waiting for node to be synced..."); - while !is_synced()? { - sleep(Duration::from_secs(1)) - } - } - - Ok(()) - }; - - let mut computer = Computer::forced_import(&config.brkdir(), &indexer, config.fetcher())?; - - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()? - .block_on(async { - let served_indexer = indexer.clone(); - let served_computer = computer.clone(); - - let server = Server::new( - served_indexer, - served_computer, - config.website(), - &config.downloads_dir(), - )?; - - let watch = config.watch(); - let mcp = config.mcp(); - - tokio::spawn(async move { - server.serve(watch, mcp).await.unwrap(); - }); - - sleep(Duration::from_secs(1)); - - loop { - wait_for_synced_node(rpc)?; - - let block_count = rpc.get_block_count()?; - - info!("{} blocks found.", block_count + 1); - - let starting_indexes = - indexer.index(&parser, rpc, &exit, config.check_collisions())?; - - computer.compute(&indexer, starting_indexes, &exit)?; - - if let Some(delay) = config.delay() { - sleep(Duration::from_secs(delay)) - } - - info!("Waiting for new blocks..."); - - while block_count == rpc.get_block_count()? { - sleep(Duration::from_secs(1)) - } - } - }) -} diff --git a/crates/brk_server/src/files/website.rs b/crates/brk_cli/src/website.rs similarity index 91% rename from crates/brk_server/src/files/website.rs rename to crates/brk_cli/src/website.rs index 2a4fb4660..209d8edcd 100644 --- a/crates/brk_server/src/files/website.rs +++ b/crates/brk_cli/src/website.rs @@ -17,7 +17,7 @@ impl Website { !self.is_none() } - pub fn to_folder_name(&self) -> &str { + pub fn to_folder_name(self) -> &'static str { match self { Self::Custom => "custom", Self::Default => "default", diff --git a/crates/brk_computer/src/lib.rs b/crates/brk_computer/src/lib.rs index 28388ece8..7791c15f0 100644 --- a/crates/brk_computer/src/lib.rs +++ b/crates/brk_computer/src/lib.rs @@ -239,4 +239,8 @@ impl Computer { .flatten() .collect::>() } + + pub fn static_clone(&self) -> &'static Self { + Box::leak(Box::new(self.clone())) + } } diff --git a/crates/brk_error/Cargo.toml b/crates/brk_error/Cargo.toml index c6324f200..9a3987e28 100644 --- a/crates/brk_error/Cargo.toml +++ b/crates/brk_error/Cargo.toml @@ -13,4 +13,5 @@ bitcoincore-rpc = { workspace = true } fjall = { workspace = true } jiff = { workspace = true } minreq = { workspace = true } +serde_json = { workspace = true } zerocopy = { workspace = true } diff --git a/crates/brk_error/src/lib.rs b/crates/brk_error/src/lib.rs index df167ddd7..00a57b77f 100644 --- a/crates/brk_error/src/lib.rs +++ b/crates/brk_error/src/lib.rs @@ -13,12 +13,14 @@ pub enum Error { Fjall(fjall::Error), Minreq(minreq::Error), SystemTimeError(time::SystemTimeError), + SerdeJson(serde_json::Error), ZeroCopyError, Vecs(brk_vecs::Error), WrongLength, WrongAddressType, UnindexableDate, + QuickCacheError, Str(&'static str), String(String), } @@ -29,6 +31,12 @@ impl From for Error { } } +impl From for Error { + fn from(error: serde_json::Error) -> Self { + Self::SerdeJson(error) + } +} + impl From for Error { fn from(value: io::Error) -> Self { Self::IO(value) @@ -82,6 +90,7 @@ impl fmt::Display for Error { match self { Error::IO(error) => Debug::fmt(&error, f), Error::Minreq(error) => Debug::fmt(&error, f), + Error::SerdeJson(error) => Debug::fmt(&error, f), Error::Vecs(error) => Debug::fmt(&error, f), Error::BitcoinRPC(error) => Debug::fmt(&error, f), Error::SystemTimeError(error) => Debug::fmt(&error, f), @@ -90,6 +99,7 @@ impl fmt::Display for Error { Error::ZeroCopyError => write!(f, "ZeroCopy error"), Error::WrongLength => write!(f, "Wrong length"), + Error::QuickCacheError => write!(f, "Quick cache error"), Error::WrongAddressType => write!(f, "Wrong address type"), Error::UnindexableDate => write!( f, diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index 917acda47..7e906d0ec 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -221,7 +221,7 @@ where I: StoredRaw + StoredIndex + From, T: StoredRaw, { - let h = Height::from(u64::from(height_to_index.stamp())); + let h = Height::from(height_to_index.stamp()); if h.is_zero() { None } else if h + 1_u32 == starting_height { diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 382f99205..e457e6a81 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -800,6 +800,10 @@ impl Indexer { Ok(starting_indexes) } + + pub fn static_clone(&self) -> &'static Self { + Box::leak(Box::new(self.clone())) + } } #[derive(Debug)] diff --git a/crates/brk_indexer/src/vecs.rs b/crates/brk_indexer/src/vecs.rs index 932fab6ec..edd571b73 100644 --- a/crates/brk_indexer/src/vecs.rs +++ b/crates/brk_indexer/src/vecs.rs @@ -443,7 +443,7 @@ impl Vecs { pub fn flush(&mut self, height: Height) -> Result<()> { self.mut_vecs() .into_par_iter() - .try_for_each(|vec| vec.stamped_flush(Stamp::from(u64::from(height))))?; + .try_for_each(|vec| vec.stamped_flush(Stamp::from(height)))?; Ok(()) } @@ -451,7 +451,7 @@ impl Vecs { self.mut_vecs() .into_iter() .map(|vec| { - let h = Height::from(u64::from(vec.stamp())); + let h = Height::from(vec.stamp()); if h > Height::ZERO { h.incremented() } else { h } }) .min() diff --git a/crates/brk_interface/src/lib.rs b/crates/brk_interface/src/lib.rs index b48920587..f2a25252e 100644 --- a/crates/brk_interface/src/lib.rs +++ b/crates/brk_interface/src/lib.rs @@ -32,23 +32,28 @@ use vecs::Vecs; use crate::vecs::{IdToVec, IndexToVec}; +#[allow(dead_code)] pub struct Interface<'a> { vecs: Vecs<'a>, - _indexer: &'a Indexer, - _computer: &'a Computer, + indexer: &'a Indexer, + computer: &'a Computer, } impl<'a> Interface<'a> { - pub fn build(indexer: &'a Indexer, computer: &'a Computer) -> Self { + pub fn build(indexer: &Indexer, computer: &Computer) -> Self { + let indexer = indexer.static_clone(); + let computer = computer.static_clone(); + let vecs = Vecs::build(indexer, computer); + Self { - vecs: Vecs::build(indexer, computer), - _indexer: indexer, - _computer: computer, + vecs, + indexer, + computer, } } pub fn get_height(&self) -> Height { - Height::from(u64::from(self._indexer.vecs.height_to_blockhash.stamp())) + Height::from(self.indexer.vecs.height_to_blockhash.stamp()) } pub fn search(&self, params: &Params) -> Vec<(String, &&dyn AnyCollectableVec)> { diff --git a/crates/brk_logger/Cargo.toml b/crates/brk_logger/Cargo.toml index a67c2941a..23054c86d 100644 --- a/crates/brk_logger/Cargo.toml +++ b/crates/brk_logger/Cargo.toml @@ -11,4 +11,4 @@ repository.workspace = true env_logger = "0.11.8" jiff = { workspace = true } log = { workspace = true } -owo-colors = { workspace = true } +owo-colors = "4.2.2" diff --git a/crates/brk_logger/src/lib.rs b/crates/brk_logger/src/lib.rs index 892f185d0..b94a15e19 100644 --- a/crates/brk_logger/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -12,7 +12,7 @@ use std::{ use env_logger::{Builder, Env}; use jiff::{Timestamp, tz}; -use owo_colors::OwoColorize; +pub use owo_colors::OwoColorize; #[inline] pub fn init(path: Option<&Path>) { diff --git a/crates/brk_server/Cargo.toml b/crates/brk_server/Cargo.toml index ed2a8239b..8a150ca9d 100644 --- a/crates/brk_server/Cargo.toml +++ b/crates/brk_server/Cargo.toml @@ -10,7 +10,6 @@ repository.workspace = true [dependencies] axum = { workspace = true } bitcoincore-rpc = { workspace = true } -brk_bundler = { workspace = true } brk_computer = { workspace = true } brk_error = { workspace = true } brk_fetcher = { workspace = true } @@ -20,19 +19,13 @@ brk_logger = { workspace = true } brk_mcp = { workspace = true } brk_parser = { workspace = true } brk_vecs = { workspace = true } -clap = { workspace = true } -clap_derive = { workspace = true } jiff = { workspace = true } log = { workspace = true } -minreq = { workspace = true } -owo-colors = { workspace = true } -quick_cache = "0.6.15" -serde = { workspace = true } +quick_cache = "0.6.16" serde_json = { workspace = true } tokio = { workspace = true } tower-http = { version = "0.6.6", features = ["compression-full", "trace"] } tracing = "0.1.41" -zip = { version = "4.3.0", default-features = false, features = ["deflate"] } [package.metadata.cargo-machete] ignored = ["clap"] diff --git a/crates/brk_server/examples/main.rs b/crates/brk_server/examples/main.rs index b100fc64e..a902271da 100644 --- a/crates/brk_server/examples/main.rs +++ b/crates/brk_server/examples/main.rs @@ -6,8 +6,9 @@ use brk_computer::Computer; use brk_error::Result; use brk_fetcher::Fetcher; use brk_indexer::Indexer; +use brk_interface::Interface; use brk_parser::Parser; -use brk_server::{Server, Website}; +use brk_server::Server; use brk_vecs::Exit; pub fn main() -> Result<()> { @@ -39,18 +40,12 @@ pub fn main() -> Result<()> { .enable_all() .build()? .block_on(async { - let served_indexer = indexer.clone(); - let served_computer = computer.clone(); + let interface = Interface::build(&indexer, &computer); - let server = Server::new( - served_indexer, - served_computer, - Website::Default, - Path::new(""), - )?; + let server = Server::new(interface, None); let server = tokio::spawn(async move { - server.serve(true, true).await.unwrap(); + server.serve(true).await.unwrap(); }); if process { diff --git a/crates/brk_server/src/api/explorer/mod.rs b/crates/brk_server/src/api/explorer.rs similarity index 100% rename from crates/brk_server/src/api/explorer/mod.rs rename to crates/brk_server/src/api/explorer.rs diff --git a/crates/brk_server/src/api/interface/mod.rs b/crates/brk_server/src/api/interface.rs similarity index 51% rename from crates/brk_server/src/api/interface/mod.rs rename to crates/brk_server/src/api/interface.rs index 282af2e52..81955d91a 100644 --- a/crates/brk_server/src/api/interface/mod.rs +++ b/crates/brk_server/src/api/interface.rs @@ -1,29 +1,30 @@ +use std::time::Duration; + use axum::{ Json, + body::Body, extract::{Query, State}, - http::{HeaderMap, StatusCode}, + http::{HeaderMap, StatusCode, Uri}, response::{IntoResponse, Response}, }; use brk_error::{Error, Result}; use brk_interface::{Format, Output, Params}; use brk_vecs::Stamp; +use quick_cache::sync::GuardResult; -use crate::traits::{HeaderMapExtended, ResponseExtended}; +use crate::{HeaderMapExtended, ResponseExtended}; use super::AppState; -mod bridge; - -pub use bridge::*; - const MAX_WEIGHT: usize = 320_000; pub async fn handler( + uri: Uri, headers: HeaderMap, query: Query, State(app_state): State, ) -> Response { - match req_to_response_res(headers, query, app_state) { + match req_to_response_res(uri, headers, query, app_state) { Ok(response) => response, Err(error) => { let mut response = @@ -35,9 +36,12 @@ pub async fn handler( } fn req_to_response_res( + uri: Uri, headers: HeaderMap, Query(params): Query, - AppState { interface, .. }: AppState, + AppState { + interface, cache, .. + }: AppState, ) -> Result { let vecs = interface.search(¶ms); @@ -67,7 +71,7 @@ fn req_to_response_res( .first() .unwrap() .1 - .etag(Stamp::from(u64::from(interface.get_height())), to); + .etag(Stamp::from(interface.get_height()), to); if headers .get_if_none_match() @@ -76,17 +80,51 @@ fn req_to_response_res( return Ok(Response::new_not_modified()); } - let output = interface.format(vecs, ¶ms.rest)?; + let guard_res = cache.get_value_or_guard( + &format!("{}{}{etag}", uri.path(), uri.query().unwrap_or("")), + Some(Duration::from_millis(500)), + ); - let mut response = match output { - Output::CSV(s) => s.into_response(), - Output::TSV(s) => s.into_response(), - Output::Json(v) => match v { - brk_interface::Value::Single(v) => Json(v).into_response(), - brk_interface::Value::List(v) => Json(v).into_response(), - brk_interface::Value::Matrix(v) => Json(v).into_response(), - }, - Output::MD(s) => s.into_response(), + let mut response = if let GuardResult::Value(v) = guard_res { + Response::new(Body::from(v)) + } else { + match interface.format(vecs, ¶ms.rest)? { + Output::CSV(s) => { + if let GuardResult::Guard(g) = guard_res { + g.insert(s.clone().into()) + .map_err(|_| Error::QuickCacheError)?; + } + s.into_response() + } + Output::TSV(s) => { + if let GuardResult::Guard(g) = guard_res { + g.insert(s.clone().into()) + .map_err(|_| Error::QuickCacheError)?; + } + s.into_response() + } + Output::MD(s) => { + if let GuardResult::Guard(g) = guard_res { + g.insert(s.clone().into()) + .map_err(|_| Error::QuickCacheError)?; + } + s.into_response() + } + Output::Json(v) => { + let json = match v { + brk_interface::Value::Single(v) => serde_json::to_vec(&v)?, + brk_interface::Value::List(v) => serde_json::to_vec(&v)?, + brk_interface::Value::Matrix(v) => serde_json::to_vec(&v)?, + }; + + if let GuardResult::Guard(g) = guard_res { + g.insert(json.clone().into()) + .map_err(|_| Error::QuickCacheError)?; + } + + json.into_response() + } + } }; let headers = response.headers_mut(); diff --git a/crates/brk_server/src/api/mod.rs b/crates/brk_server/src/api/mod.rs index 78c54e522..3e5fe5098 100644 --- a/crates/brk_server/src/api/mod.rs +++ b/crates/brk_server/src/api/mod.rs @@ -1,7 +1,7 @@ use axum::{ Json, Router, extract::{Path, Query, State}, - http::HeaderMap, + http::{HeaderMap, Uri}, response::{IntoResponse, Redirect, Response}, routing::get, }; @@ -12,8 +12,6 @@ use super::AppState; mod explorer; mod interface; -pub use interface::Bridge; - pub trait ApiRoutes { fn add_api_routes(self) -> Self; } @@ -87,7 +85,8 @@ impl ApiRoutes for Router { .route( "/api/vecs/{variant}", get( - async |headers: HeaderMap, + async |uri: Uri, + headers: HeaderMap, Path(variant): Path, Query(params_opt): Query, state: State| @@ -100,7 +99,7 @@ impl ApiRoutes for Router { (index, split.collect::>().join(TO_SEPARATOR)), params_opt, )); - interface::handler(headers, Query(params), state).await + interface::handler(uri, headers, Query(params), state).await } else { "Bad variant".into_response() } @@ -127,22 +126,3 @@ impl ApiRoutes for Router { ) } } - -// pub async fn variants_handler(State(app_state): State) -> Response { -// Json( -// app_state -// .query -// .vec_trees -// .index_to_id_to_vec -// .iter() -// .flat_map(|(index, id_to_vec)| { -// let index_ser = index.serialize_long(); -// id_to_vec -// .keys() -// .map(|id| format!("{}-to-{}", index_ser, id)) -// .collect::>() -// }) -// .collect::>(), -// ) -// .into_response() -// } diff --git a/crates/brk_server/src/traits/header_map.rs b/crates/brk_server/src/extended/header_map.rs similarity index 100% rename from crates/brk_server/src/traits/header_map.rs rename to crates/brk_server/src/extended/header_map.rs diff --git a/crates/brk_server/src/traits/mod.rs b/crates/brk_server/src/extended/mod.rs similarity index 100% rename from crates/brk_server/src/traits/mod.rs rename to crates/brk_server/src/extended/mod.rs diff --git a/crates/brk_server/src/traits/response.rs b/crates/brk_server/src/extended/response.rs similarity index 100% rename from crates/brk_server/src/traits/response.rs rename to crates/brk_server/src/extended/response.rs diff --git a/crates/brk_server/src/files/file.rs b/crates/brk_server/src/files/file.rs index 61fab5a00..12686cdb3 100644 --- a/crates/brk_server/src/files/file.rs +++ b/crates/brk_server/src/files/file.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::{fs, path::Path, time::Duration}; use axum::{ body::Body, @@ -6,13 +6,11 @@ use axum::{ http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, }; -use brk_error::Result; +use brk_error::{Error, Result}; use log::{error, info}; +use quick_cache::sync::GuardResult; -use crate::{ - AppState, - traits::{HeaderMapExtended, ModifiedState, ResponseExtended}, -}; +use crate::{AppState, HeaderMapExtended, ModifiedState, ResponseExtended}; pub async fn file_handler( headers: HeaderMap, @@ -31,12 +29,12 @@ fn any_handler( app_state: AppState, path: Option>, ) -> Response { - let dist_path = app_state.dist_path(); + let files_path = app_state.path.as_ref().unwrap(); if let Some(path) = path.as_ref() { let path = path.0.replace("..", "").replace("\\", ""); - let mut path = dist_path.join(&path); + let mut path = files_path.join(&path); if !path.exists() || path.is_dir() { if path.extension().is_some() { @@ -50,18 +48,18 @@ fn any_handler( return response; } else { - path = dist_path.join("index.html"); + path = files_path.join("index.html"); } } - path_to_response(&headers, &path) + path_to_response(&headers, &app_state, &path) } else { - path_to_response(&headers, &dist_path.join("index.html")) + path_to_response(&headers, &app_state, &files_path.join("index.html")) } } -fn path_to_response(headers: &HeaderMap, path: &Path) -> Response { - match path_to_response_(headers, path) { +fn path_to_response(headers: &HeaderMap, app_state: &AppState, path: &Path) -> Response { + match path_to_response_(headers, app_state, path) { Ok(response) => response, Err(error) => { let mut response = @@ -74,32 +72,51 @@ fn path_to_response(headers: &HeaderMap, path: &Path) -> Response { } } -fn path_to_response_(headers: &HeaderMap, path: &Path) -> Result { +fn path_to_response_(headers: &HeaderMap, app_state: &AppState, path: &Path) -> Result { let (modified, date) = headers.check_if_modified_since(path)?; if modified == ModifiedState::NotModifiedSince { return Ok(Response::new_not_modified()); } - let content = fs::read(path).unwrap_or_else(|error| { - error!("{error}"); - let path = path.to_str().unwrap(); - info!("Can't read file {path}"); - panic!("") - }); + let serialized_path = path.to_str().unwrap(); - let mut response = Response::new(content.into()); + let must_revalidate = path + .extension() + .is_some_and(|extension| extension == "html") + || serialized_path.ends_with("service-worker.js"); + + let guard_res = if !must_revalidate { + Some(app_state.cache.get_value_or_guard( + &path.to_str().unwrap().to_owned(), + Some(Duration::from_millis(500)), + )) + } else { + None + }; + + let mut response = if let Some(GuardResult::Value(v)) = guard_res { + Response::new(Body::from(v)) + } else { + let content = fs::read(path).unwrap_or_else(|error| { + error!("{error}"); + let path = path.to_str().unwrap(); + info!("Can't read file {path}"); + panic!("") + }); + + if let Some(GuardResult::Guard(g)) = guard_res { + g.insert(content.clone().into()) + .map_err(|_| Error::QuickCacheError)?; + } + + Response::new(content.into()) + }; let headers = response.headers_mut(); headers.insert_cors(); headers.insert_content_type(path); - let serialized_path = path.to_str().unwrap(); - - if path - .extension() - .is_some_and(|extension| extension == "html") - || serialized_path.ends_with("service-worker.js") - { + if must_revalidate { headers.insert_cache_control_must_revalidate(); } else if path.extension().is_some_and(|extension| { extension == "jpg" diff --git a/crates/brk_server/src/files/mod.rs b/crates/brk_server/src/files/mod.rs index 6ea032579..71b31082b 100644 --- a/crates/brk_server/src/files/mod.rs +++ b/crates/brk_server/src/files/mod.rs @@ -1,20 +1,20 @@ +use std::path::PathBuf; + use axum::{Router, routing::get}; use super::AppState; mod file; -mod website; use file::{file_handler, index_handler}; -pub use website::Website; pub trait FilesRoutes { - fn add_website_routes(self, website: Website) -> Self; + fn add_files_routes(self, path: Option<&PathBuf>) -> Self; } impl FilesRoutes for Router { - fn add_website_routes(self, website: Website) -> Self { - if website.is_some() { + fn add_files_routes(self, path: Option<&PathBuf>) -> Self { + if path.is_some() { self.route("/{*path}", get(file_handler)) .route("/", get(index_handler)) } else { diff --git a/crates/brk_server/src/lib.rs b/crates/brk_server/src/lib.rs index 01ff86d84..8d1f504bc 100644 --- a/crates/brk_server/src/lib.rs +++ b/crates/brk_server/src/lib.rs @@ -3,124 +3,57 @@ #![doc = include_str!("../examples/main.rs")] #![doc = "```"] -use std::{ - fs, - io::Cursor, - path::{Path, PathBuf}, - time::Duration, -}; +use std::{path::PathBuf, sync::Arc, time::Duration}; -use api::{ApiRoutes, Bridge}; +use api::ApiRoutes; use axum::{ Json, Router, - body::Body, + body::{Body, Bytes}, http::{Request, Response, StatusCode, Uri}, middleware::Next, routing::get, serve, }; -use brk_bundler::bundle; -use brk_computer::Computer; use brk_error::Result; -use brk_indexer::Indexer; use brk_interface::Interface; +use brk_logger::OwoColorize; use brk_mcp::route::MCPRoutes; use files::FilesRoutes; use log::{error, info}; -use owo_colors::OwoColorize; +use quick_cache::sync::Cache; use tokio::net::TcpListener; use tower_http::{compression::CompressionLayer, trace::TraceLayer}; +use tracing::Span; mod api; +mod extended; mod files; -mod traits; -pub use files::Website; -use tracing::Span; +use extended::*; #[derive(Clone)] pub struct AppState { interface: &'static Interface<'static>, - website: Website, - websites_path: Option, -} - -impl AppState { - pub fn dist_path(&self) -> PathBuf { - self.websites_path - .as_ref() - .expect("Should never reach here is websites_path is None") - .join("dist") - } + path: Option, + cache: Arc>, } pub const VERSION: &str = env!("CARGO_PKG_VERSION"); -const DEV_PATH: &str = "../.."; -const WEBSITES: &str = "websites"; - pub struct Server(AppState); impl Server { - pub fn new( - indexer: Indexer, - computer: Computer, - website: Website, - downloads_path: &Path, - ) -> Result { - let indexer = Box::leak(Box::new(indexer)); - let computer = Box::leak(Box::new(computer)); - let interface = Box::leak(Box::new(Interface::build(indexer, computer))); - - let websites_path = if website.is_some() { - let websites_dev_path = Path::new(DEV_PATH).join(WEBSITES); - - let websites_path = if fs::exists(&websites_dev_path)? { - websites_dev_path - } else { - let downloaded_websites_path = - downloads_path.join(format!("brk-{VERSION}")).join(WEBSITES); - - if !fs::exists(&downloaded_websites_path)? { - info!("Downloading websites from Github..."); - - let url = format!( - "https://github.com/bitcoinresearchkit/brk/archive/refs/tags/v{VERSION}.zip", - ); - - let response = minreq::get(url).send()?; - let bytes = response.as_bytes(); - let cursor = Cursor::new(bytes); - - let mut zip = zip::ZipArchive::new(cursor).unwrap(); - - zip.extract(downloads_path).unwrap(); - } - - downloaded_websites_path - }; - - interface.generate_bridge_file(website, websites_path.as_path())?; - - Some(websites_path) - } else { - None - }; - - Ok(Self(AppState { - interface, - website, - websites_path, - })) + pub fn new(interface: Interface<'static>, files_path: Option) -> Self { + Self(AppState { + interface: Box::leak(Box::new(interface)), + path: files_path, + cache: Arc::new(Cache::new(10_000)), + }) } - pub async fn serve(self, watch: bool, mcp: bool) -> Result<()> { + pub async fn serve(self, mcp: bool) -> Result<()> { let state = self.0; - if let Some(websites_path) = state.websites_path.clone() { - bundle(&websites_path, state.website.to_folder_name(), watch).await?; - } - let compression_layer = CompressionLayer::new() .br(true) .deflate(true) @@ -162,7 +95,7 @@ impl Server { let router = Router::new() .add_api_routes() - .add_website_routes(state.website) + .add_files_routes(state.path.as_ref()) .add_mcp_routes(state.interface, mcp) .route("/version", get(Json(VERSION))) .with_state(state) diff --git a/crates/brk_structs/Cargo.toml b/crates/brk_structs/Cargo.toml index 09a14e324..76501a417 100644 --- a/crates/brk_structs/Cargo.toml +++ b/crates/brk_structs/Cargo.toml @@ -14,12 +14,10 @@ brk_error = {workspace = true} brk_vecs = {workspace = true} byteview = { workspace = true } derive_deref = { workspace = true } -fjall = { workspace = true } jiff = { workspace = true } rapidhash = "2.0.2" serde = { workspace = true } serde_bytes = { workspace = true } -serde_json = { workspace = true } zerocopy = { workspace = true } zerocopy-derive = { workspace = true } diff --git a/crates/brk_vecs/.gitignore b/crates/brk_vecs/.gitignore index 75c17868d..bb90405e8 100644 --- a/crates/brk_vecs/.gitignore +++ b/crates/brk_vecs/.gitignore @@ -1,2 +1,3 @@ /vecs /raw +/compressed diff --git a/crates/brk_vecs/Cargo.toml b/crates/brk_vecs/Cargo.toml index 3f707206d..431f69681 100644 --- a/crates/brk_vecs/Cargo.toml +++ b/crates/brk_vecs/Cargo.toml @@ -18,6 +18,7 @@ memmap2 = "0.9.7" parking_lot = { workspace = true } rayon = { workspace = true } serde = { workspace = true } +serde_derive = { workspace = true } serde_json = { workspace = true } zerocopy = { workspace = true } zerocopy-derive = { workspace = true } diff --git a/crates/brk_vecs/examples/compressed.rs b/crates/brk_vecs/examples/compressed.rs new file mode 100644 index 000000000..8128b2d5f --- /dev/null +++ b/crates/brk_vecs/examples/compressed.rs @@ -0,0 +1,153 @@ +use std::{fs, path::Path, sync::Arc}; + +use brk_vecs::{ + AnyStoredVec, AnyVec, CollectableVec, CompressedVec, File, GenericStoredVec, Stamp, + VecIterator, Version, +}; + +#[allow(clippy::upper_case_acronyms)] +type VEC = CompressedVec; + +fn main() -> Result<(), Box> { + let _ = fs::remove_dir_all("compressed"); + + let version = Version::TWO; + + let file = Arc::new(File::open(Path::new("compressed"))?); + + { + let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?; + + (0..21_u32).for_each(|v| { + vec.push(v); + }); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0)); + dbg!(iter.get(1)); + dbg!(iter.get(2)); + dbg!(iter.get(20)); + dbg!(iter.get(21)); + drop(iter); + + dbg!("flush"); + vec.flush()?; + dbg!("flushed"); + + dbg!(vec.header()); + } + + { + let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?; + + vec.mut_header().update_stamp(Stamp::new(100)); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0)); + dbg!(iter.get(1)); + dbg!(iter.get(2)); + dbg!(iter.get(3)); + dbg!(iter.get(4)); + dbg!(iter.get(5)); + dbg!(iter.get(20)); + dbg!(iter.get(20)); + dbg!(iter.get(0)); + drop(iter); + + vec.push(21); + vec.push(22); + + let mut iter = vec.into_iter(); + dbg!(iter.get(20)); + dbg!(iter.get(21)); + dbg!(iter.get(22)); + dbg!(iter.get(23)); + drop(iter); + + vec.flush()?; + } + + { + let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?; + + let mut iter = vec.into_iter(); + dbg!(iter.get(0)); + dbg!(iter.get(20)); + dbg!(iter.get(21)); + dbg!(iter.get(22)); + drop(iter); + + vec.truncate_if_needed(14)?; + + let mut iter = vec.into_iter(); + dbg!(iter.get(0)); + dbg!(iter.get(5)); + dbg!(iter.get(20)); + drop(iter); + + dbg!(vec.collect_signed_range(Some(-5), None)?); + + vec.push(vec.len() as u32); + dbg!(VecIterator::last(vec.into_iter())); + + dbg!(vec.into_iter().collect::>()); + } + + { + let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?; + + vec.reset()?; + + dbg!(vec.header(), vec.pushed_len(), vec.stored_len(), vec.len()); + + (0..21_u32).for_each(|v| { + vec.push(v); + }); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0)); + dbg!(iter.get(20)); + dbg!(iter.get(21)); + drop(iter); + + // let reader = vec.create_static_reader(); + // dbg!(vec.take(10, &reader)?); + // dbg!(vec.get_or_read(10, &reader)?); + // dbg!(vec.holes()); + // drop(reader); + + vec.flush()?; + dbg!(vec.holes()); + } + + { + let mut vec: VEC = CompressedVec::forced_import(&file, "vec", version)?; + + dbg!(vec.holes()); + + let reader = vec.create_static_reader(); + dbg!(vec.get_or_read(10, &reader)?); + drop(reader); + + // vec.update(10, 10)?; + // vec.update(0, 10)?; + + let reader = vec.create_static_reader(); + dbg!( + vec.holes(), + vec.get_or_read(0, &reader)?, + vec.get_or_read(10, &reader)? + ); + drop(reader); + + vec.flush()?; + } + + { + let vec: VEC = CompressedVec::forced_import(&file, "vec", version)?; + + dbg!(vec.collect()?); + } + + Ok(()) +} diff --git a/crates/brk_vecs/examples/file.rs b/crates/brk_vecs/examples/file.rs index 2678c8168..ac6f3812b 100644 --- a/crates/brk_vecs/examples/file.rs +++ b/crates/brk_vecs/examples/file.rs @@ -99,6 +99,8 @@ fn main() -> Result<()> { ); } + dbg!(1); + file.write_all_to_region_at(region1_i.into(), &[1; 8000], 0)?; { @@ -146,7 +148,7 @@ fn main() -> Result<()> { let layout = file.layout(); assert!(layout.start_to_index().is_empty()); - assert!(layout.start_to_hole().is_empty()); + assert!(layout.start_to_hole().len() == 1); } let (region1_i, _) = file.create_region_if_needed("region1")?; diff --git a/crates/brk_vecs/examples/raw.rs b/crates/brk_vecs/examples/raw.rs index fbdd6beae..6ca1069a6 100644 --- a/crates/brk_vecs/examples/raw.rs +++ b/crates/brk_vecs/examples/raw.rs @@ -12,8 +12,7 @@ fn main() -> Result<(), Box> { let _ = fs::remove_dir_all("raw"); let version = Version::TWO; - // let format = Format::Raw; - // + let file = Arc::new(File::open(Path::new("raw"))?); { diff --git a/crates/brk_vecs/src/file/layout.rs b/crates/brk_vecs/src/file/layout.rs index c13259835..d12154513 100644 --- a/crates/brk_vecs/src/file/layout.rs +++ b/crates/brk_vecs/src/file/layout.rs @@ -195,4 +195,8 @@ impl Layout { unreachable!(); } } + + pub fn reserved(&mut self, start: u64) -> Option { + self.start_to_reserved.remove(&start) + } } diff --git a/crates/brk_vecs/src/file/mod.rs b/crates/brk_vecs/src/file/mod.rs index 001a9de65..a70d9fd45 100644 --- a/crates/brk_vecs/src/file/mod.rs +++ b/crates/brk_vecs/src/file/mod.rs @@ -178,28 +178,27 @@ impl File { let data_len = data.len() as u64; let new_len = at.map_or(len + data_len, |at| (at + data_len).max(len)); + let write_start = start + at.unwrap_or(len); + if at.is_some_and(|at| at >= start + reserved) { return Err(Error::Str("Invalid at parameter")); } // Write to reserved space if possible if new_len <= reserved { - // println!( - // "Write to {region_index} reserved space at {}", - // start + at.unwrap_or(len) - // ); + println!("Write to {region_index} reserved space at {write_start}"); if at.is_none() { - self.write(start + len, data); + self.write(write_start, data); } let mut region = region_lock.write(); - if let Some(at) = at { - self.write(start + at, data); - } - if len != new_len { - region.set_len(new_len); + + if at.is_some() { + self.write(write_start, data); } + + region.set_len(new_len); regions.write_to_mmap(®ion, region_index); return Ok(()); @@ -217,10 +216,7 @@ impl File { // If is last continue writing if layout.is_last_anything(region_index) { - // println!( - // "{region_index} Append to file at {}", - // start + at.unwrap_or(len) - // ); + println!("{region_index} Append to file at {write_start}"); self.set_min_len(start + new_reserved)?; let mut region = region_lock.write(); @@ -228,7 +224,7 @@ impl File { drop(region); drop(layout); - self.write(start + len, data); + self.write(write_start, data); let mut region = region_lock.write(); region.set_len(new_len); @@ -243,7 +239,7 @@ impl File { .get_hole(hole_start) .is_some_and(|gap| gap >= added_reserve) { - // println!("Expand {region_index} to hole"); + println!("Expand {region_index} to hole"); layout.remove_or_compress_hole(hole_start, added_reserve); let mut region = region_lock.write(); @@ -251,7 +247,7 @@ impl File { drop(region); drop(layout); - self.write(start + len, data); + self.write(write_start, data); let mut region = region_lock.write(); region.set_len(new_len); @@ -262,16 +258,16 @@ impl File { // Find hole big enough to move the region if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) { - // println!("Move {region_index} to hole at {hole_start}"); + println!("Move {region_index} to hole at {hole_start}"); layout.remove_or_compress_hole(hole_start, new_reserved); drop(layout); self.write( hole_start, - &self.mmap.read()[start as usize..(start + len) as usize], + &self.mmap.read()[start as usize..write_start as usize], ); - self.write(hole_start + len, data); + self.write(hole_start + at.unwrap_or(len), data); let mut region = region_lock.write(); let mut layout = self.layout.write(); @@ -288,24 +284,25 @@ impl File { let new_start = layout.len(®ions); // Write at the end - // println!( - // "Move {region_index} to the end, from {start}..{} to {new_start}..{}", - // start + reserved, - // new_start + new_reserved - // ); + println!( + "Move {region_index} to the end, from {start}..{} to {new_start}..{}", + start + reserved, + new_start + new_reserved + ); self.set_min_len(new_start + new_reserved)?; layout.reserve(new_start, new_reserved); drop(layout); self.write( new_start, - &self.mmap.read()[start as usize..(start + len) as usize], + &self.mmap.read()[start as usize..write_start as usize], ); - self.write(new_start + len, data); + self.write(new_start + at.unwrap_or(len), data); let mut region = region_lock.write(); let mut layout = self.layout.write(); layout.move_region(new_start, region_index, ®ion)?; + assert!(layout.reserved(new_start) == Some(new_reserved)); drop(layout); region.set_start(new_start); diff --git a/crates/brk_vecs/src/lib.rs b/crates/brk_vecs/src/lib.rs index bc49cc1cd..a23f34d6c 100644 --- a/crates/brk_vecs/src/lib.rs +++ b/crates/brk_vecs/src/lib.rs @@ -2,7 +2,7 @@ #![doc = "\n## Examples\n"] #![doc = "\n### File\n\n```rust"] #![doc = include_str!("../examples/file.rs")] -#![doc = "```"] +#![doc = "```\n"] #![doc = "\n### Raw\n\n```rust"] #![doc = include_str!("../examples/raw.rs")] #![doc = "```"] diff --git a/crates/brk_vecs/src/variants/compressed/compressed_page_meta.rs b/crates/brk_vecs/src/variants/compressed/compressed_page_meta.rs deleted file mode 100644 index a0b18cf7e..000000000 --- a/crates/brk_vecs/src/variants/compressed/compressed_page_meta.rs +++ /dev/null @@ -1,18 +0,0 @@ -use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; - -#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)] -pub struct CompressedPageMetadata { - pub start: u64, - pub bytes_len: u32, - pub values_len: u32, -} - -impl CompressedPageMetadata { - pub fn new(start: u64, bytes_len: u32, values_len: u32) -> Self { - Self { - start, - bytes_len, - values_len, - } - } -} diff --git a/crates/brk_vecs/src/variants/compressed/compressed_pages_meta.rs b/crates/brk_vecs/src/variants/compressed/compressed_pages_meta.rs deleted file mode 100644 index 941def5e7..000000000 --- a/crates/brk_vecs/src/variants/compressed/compressed_pages_meta.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::{ - fs::{self, OpenOptions}, - io::{self, Seek, SeekFrom, Write}, - path::{Path, PathBuf}, -}; - -use rayon::prelude::*; -use zerocopy::{IntoBytes, TryFromBytes}; - -use crate::Result; - -use super::{CompressedPageMetadata, UnsafeSlice}; - -#[derive(Debug, Clone)] -pub struct CompressedPagesMetadata { - vec: Vec, - change_at: Option, - path: PathBuf, -} - -impl CompressedPagesMetadata { - const PAGE_SIZE: usize = size_of::(); - - pub fn read(path: &Path) -> Result { - let this = Self { - vec: fs::read(path) - .unwrap_or_default() - .chunks(Self::PAGE_SIZE) - .map(|bytes| { - if bytes.len() != Self::PAGE_SIZE { - panic!() - } - CompressedPageMetadata::try_read_from_bytes(bytes).unwrap() - }) - .collect::>(), - path: path.to_owned(), - change_at: None, - }; - - Ok(this) - } - - pub fn write(&mut self) -> io::Result<()> { - if self.change_at.is_none() { - return Ok(()); - } - - let change_at = self.change_at.take().unwrap(); - - let len = (self.vec.len() - change_at) * Self::PAGE_SIZE; - - let mut bytes: Vec = vec![0; len]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - self.vec[change_at..] - .par_iter() - .enumerate() - .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::PAGE_SIZE, v.as_bytes())); - - let mut file = OpenOptions::new() - .read(true) - .create(true) - .truncate(false) - .append(true) - .open(&self.path)?; - - file.set_len((change_at * Self::PAGE_SIZE) as u64)?; - file.seek(SeekFrom::End(0))?; - - file.write_all(&bytes)?; - - Ok(()) - } - - pub fn len(&self) -> usize { - self.vec.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn get(&self, page_index: usize) -> Option<&CompressedPageMetadata> { - self.vec.get(page_index) - } - - pub fn last(&self) -> Option<&CompressedPageMetadata> { - self.vec.last() - } - - pub fn pop(&mut self) -> Option { - self.vec.pop() - } - - pub fn push(&mut self, page_index: usize, page: CompressedPageMetadata) { - if page_index != self.vec.len() { - panic!(); - } - - self.set_changed_at(page_index); - - self.vec.push(page); - } - - fn set_changed_at(&mut self, page_index: usize) { - if self.change_at.is_none_or(|pi| pi > page_index) { - self.change_at.replace(page_index); - } - } - - pub fn truncate(&mut self, page_index: usize) -> Option { - let page = self.get(page_index).cloned(); - self.vec.truncate(page_index); - self.set_changed_at(page_index); - page - } -} diff --git a/crates/brk_vecs/src/variants/compressed/mod.rs b/crates/brk_vecs/src/variants/compressed/mod.rs index 32af408c0..e6a9d6212 100644 --- a/crates/brk_vecs/src/variants/compressed/mod.rs +++ b/crates/brk_vecs/src/variants/compressed/mod.rs @@ -1,37 +1,35 @@ use std::{ borrow::Cow, collections::{BTreeMap, BTreeSet}, - fs, mem, + mem, sync::Arc, }; -use memmap2::Mmap; use parking_lot::{RwLock, RwLockReadGuard}; -use pco::data_types::Number; use rayon::prelude::*; use crate::{ AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, AsInnerSlice, BaseVecIterator, - BoxedVecIterator, CollectableVec, Error, File, FromInnerSlice, GenericStoredVec, HEADER_OFFSET, - Header, RawVec, Reader, Result, StoredCompressed, StoredIndex, UnsafeSlice, Version, + BoxedVecIterator, CollectableVec, Error, File, Format, FromInnerSlice, GenericStoredVec, + HEADER_OFFSET, Header, RawVec, Reader, Result, StoredCompressed, StoredIndex, Version, }; -mod compressed_page_meta; -mod compressed_pages_meta; +mod page; +mod pages; -use compressed_page_meta::*; -use compressed_pages_meta::*; +use page::*; +use pages::*; const ONE_KIB: usize = 1024; const MAX_PAGE_SIZE: usize = 16 * ONE_KIB; const PCO_COMPRESSION_LEVEL: usize = 4; -const VERSION: Version = Version::TWO; +const VERSION: Version = Version::ONE; #[derive(Debug)] pub struct CompressedVec { inner: RawVec, - pages_meta: Arc>, + pages: Arc>, } impl CompressedVec @@ -39,8 +37,7 @@ where I: StoredIndex, T: StoredCompressed, { - pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; - pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; + const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; /// Same as import but will reset the vec under certain errors, so be careful ! pub fn forced_import(file: &Arc, name: &str, mut version: Version) -> Result { @@ -51,73 +48,50 @@ where | Err(Error::WrongEndian) | Err(Error::WrongLength) | Err(Error::DifferentVersion { .. }) => { - todo!(); - - // let path = Self::path_(file, name); - // fs::remove_file(path)?; - // Self::import(file, name, version) + let _ = file.remove_region(Self::vec_region_name_(name).into()); + let _ = file.remove_region(Self::holes_region_name_(name).into()); + let _ = file.remove_region(Self::pages_region_name_(name).into()); + Self::import(file, name, version) } _ => res, } } - #[allow(unreachable_code, unused_variables)] pub fn import(file: &Arc, name: &str, version: Version) -> Result { - // let mut inner = RawVec::import(file, name, version)?; + let inner = RawVec::import_(file, name, version, Format::Compressed)?; - todo!(); + let pages = Pages::import(file, &Self::pages_region_name_(name))?; - // let pages_meta = { - // let path = inner - // .folder() - // .join(format!("{}-pages-meta", I::to_string())); - // if inner.is_empty() { - // let _ = fs::remove_file(&path); - // } - // CompressedPagesMetadata::read(&path)? - // }; - - // inner.set_stored_len(if let Some(last) = pages_meta.last() { - // (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize - // } else { - // 0 - // }); - - // Ok(Self { - // inner, - // pages_meta: Arc::new(RwLock::new(pages_meta)), - // }) + Ok(Self { + inner, + pages: Arc::new(RwLock::new(pages)), + }) } fn decode_page(&self, page_index: usize, reader: &Reader) -> Result> { - Self::decode_page_( - self.stored_len(), - page_index, - reader, - &self.pages_meta.read(), - ) + Self::decode_page_(self.stored_len(), page_index, reader, &self.pages.read()) } fn decode_page_( stored_len: usize, page_index: usize, reader: &Reader, - compressed_pages_meta: &CompressedPagesMetadata, + pages: &Pages, ) -> Result> { if Self::page_index_to_index(page_index) >= stored_len { return Err(Error::IndexTooHigh); - } else if compressed_pages_meta.len() <= page_index { + } else if page_index >= pages.len() { return Err(Error::ExpectVecToHaveIndex); } - let page = compressed_pages_meta.get(page_index).unwrap(); - let len = page.bytes_len as usize; - let offset = page.start as usize; + let page = pages.get(page_index).unwrap(); + let len = page.bytes as u64; + let offset = page.start; - let slice: &[u8] = reader.read(offset as u64, (offset + len) as u64); + let slice = reader.read(offset, len); let vec: Vec = pco::standalone::simple_decompress(slice)?; - let vec: Vec = T::from_inner_slice(vec); + let vec = T::from_inner_slice(vec); Ok(vec) } @@ -156,13 +130,17 @@ where iter.set_(i); iter } + + fn pages_region_name_(name: &str) -> String { + format!("{}_pages", Self::vec_region_name_(name)) + } } impl Clone for CompressedVec { fn clone(&self) -> Self { Self { inner: self.inner.clone(), - pages_meta: self.pages_meta.clone(), + pages: self.pages.clone(), } } } @@ -221,92 +199,87 @@ where #[inline] fn stored_len(&self) -> usize { - self.inner.stored_len() + self.pages.read().stored_len(Self::PER_PAGE) } fn flush(&mut self) -> Result<()> { - todo!(); + self.inner.write_header_if_needed()?; - // let file_opt = self.inner.write_header_if_needed()?; + let pushed_len = self.pushed_len(); - // let pushed_len = self.pushed_len(); + let has_new_data = pushed_len != 0; - // if pushed_len == 0 { - // return Ok(()); - // } + if !has_new_data { + return Ok(()); + } - // let stored_len = self.stored_len(); + let stored_len = self.stored_len(); - // let mut file = file_opt.unwrap_or(self.open_file()?); + let mut pages = self.pages.write(); - // let mut pages_meta = self.pages_meta.read(); + let mut starting_page_index = pages.len(); + let mut values = vec![]; + let mut truncate_at = None; - // let mut starting_page_index = pages_meta.len(); - // let mut values = vec![]; - // let mut truncate_at = None; + if stored_len % Self::PER_PAGE != 0 { + if pages.is_empty() { + unreachable!() + } - // if self.stored_len() % Self::PER_PAGE != 0 { - // if pages_meta.is_empty() { - // unreachable!() - // } + let last_page_index = pages.len() - 1; - // let last_page_index = pages_meta.len() - 1; + let reader = self.create_reader(); - // let mmap = unsafe { Mmap::map(&file)? }; + values = Self::decode_page_(stored_len, last_page_index, &reader, &pages) + .inspect_err(|_| { + dbg!(last_page_index, &pages); + }) + .unwrap(); - // values = Self::decode_page_(stored_len, last_page_index, &mmap, &pages_meta) - // .inspect_err(|_| { - // dbg!(last_page_index, &pages_meta); - // }) - // .unwrap(); + truncate_at.replace(pages.pop().unwrap().start); + starting_page_index = last_page_index; + } - // truncate_at.replace(pages_meta.pop().unwrap().start); - // starting_page_index = last_page_index; - // } + let compressed = values + .into_par_iter() + .chain(mem::take(self.inner.mut_pushed()).into_par_iter()) + .chunks(Self::PER_PAGE) + .map(|chunk| (Self::compress_page(chunk.as_slice()), chunk.len())) + .collect::>(); - // let compressed = values - // .into_par_iter() - // .chain(mem::take(self.mut_pushed()).into_par_iter()) - // .chunks(Self::PER_PAGE) - // .map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len())) - // .collect::>(); + compressed.iter().enumerate().for_each(|(i, (bytes, len))| { + let page_index = starting_page_index + i; - // compressed - // .iter() - // .enumerate() - // .for_each(|(i, (compressed_bytes, values_len))| { - // let page_index = starting_page_index + i; + let start = if page_index != 0 { + let prev = pages.get(page_index - 1).unwrap(); + prev.start + prev.bytes as u64 + } else { + 0 + }; - // let start = if page_index != 0 { - // let prev = pages_meta.get(page_index - 1).unwrap(); - // prev.start + prev.bytes_len as u64 - // } else { - // 0 - // }; - // let offsetted_start = start + HEADER_OFFSET as u64; + let page = Page::new( + start + HEADER_OFFSET as u64, + bytes.len() as u32, + *len as u32, + ); - // let bytes_len = compressed_bytes.len() as u32; - // let values_len = *values_len as u32; + pages.checked_push(page_index, page); + }); - // let page = CompressedPageMetadata::new(offsetted_start, bytes_len, values_len); + let buf = compressed + .into_iter() + .flat_map(|(v, _)| v) + .collect::>(); - // pages_meta.push(page_index, page); - // }); + let file = self.file(); - // let buf = compressed - // .into_iter() - // .flat_map(|(v, _)| v) - // .collect::>(); + if let Some(truncate_at) = truncate_at { + file.truncate_region(self.region_index().into(), truncate_at)?; + } - // pages_meta.write()?; + file.write_all_to_region(self.region_index().into(), &buf)?; - // if let Some(truncate_at) = truncate_at { - // self.file_set_len(&mut file, truncate_at)?; - // } - - // self.file_write_all(&mut file, &buf)?; - - // self.pages_meta.store(Arc::new(pages_meta)); + pages.flush(file)?; Ok(()) } @@ -321,11 +294,10 @@ where fn read_(&self, index: usize, reader: &Reader) -> Result { let page_index = Self::index_to_page_index(index); let decoded_index = index % Self::PER_PAGE; - Ok(unsafe { - self.decode_page(page_index, reader)? + *self + .decode_page(page_index, reader)? .get_unchecked(decoded_index) - .clone() }) } @@ -355,10 +327,8 @@ where } fn reset(&mut self) -> Result<()> { - // let mut pages_meta = (**self.pages_meta.load()).clone(); - // pages_meta.truncate(0); - // pages_meta.write()?; - // self.pages_meta.store(Arc::new(pages_meta)); + let file = self.file(); + self.pages.write().reset(file)?; self.reset_() } @@ -374,36 +344,47 @@ where return Ok(()); } - let mut pages_meta = self.pages_meta.write(); + let stored_len = self.stored_len(); + + let mut pages = self.pages.write(); + + let last_page_index = pages.len() - 1; let page_index = Self::index_to_page_index(index); - let reader = self.create_static_reader(); - let values = self.decode_page(page_index, &reader)?; - drop(reader); + let values = Self::decode_page_( + stored_len, + last_page_index, + &self.create_static_reader(), + &pages, + )?; let mut buf = vec![]; - let mut page = pages_meta.truncate(page_index).unwrap(); - - let len = page.start; + let mut page = pages.truncate(page_index).unwrap(); let decoded_index = index % Self::PER_PAGE; + let from = page.start; + if decoded_index != 0 { let chunk = &values[..decoded_index]; buf = Self::compress_page(chunk); - page.values_len = chunk.len() as u32; - page.bytes_len = buf.len() as u32; + page.values = chunk.len() as u32; + page.bytes = buf.len() as u32; - pages_meta.push(page_index, page); + pages.checked_push(page_index, page); } - pages_meta.write()?; + let file = self.file(); - // self.file_truncate_and_write_all(&mut file, len, &buf)?; + pages.flush(file)?; + + file.truncate_region(self.region_index().into(), from)?; + + file.write_all_to_region(self.region_index().into(), &buf)?; Ok(()) } @@ -413,8 +394,8 @@ where pub struct CompressedVecIterator<'a, I, T> { vec: &'a CompressedVec, reader: Reader<'a>, - decoded_page: Option<(usize, Vec)>, - pages_meta: RwLockReadGuard<'a, CompressedPagesMetadata>, + decoded: Option<(usize, Vec)>, + pages: RwLockReadGuard<'a, Pages>, stored_len: usize, index: usize, } @@ -472,23 +453,23 @@ where } else { let page_index = i / Self::PER_PAGE; - if self.decoded_page.as_ref().is_none_or(|b| b.0 != page_index) { + if self.decoded.as_ref().is_none_or(|b| b.0 != page_index) { let values = CompressedVec::::decode_page_( stored_len, page_index, &self.reader, - &self.pages_meta, + &self.pages, ) .unwrap(); - self.decoded_page.replace((page_index, values)); + self.decoded.replace((page_index, values)); } - self.decoded_page + self.decoded .as_ref() .unwrap() .1 .get(i % Self::PER_PAGE) - .map(|v| (I::from(i), Cow::Owned(v.clone()))) + .map(|v| (I::from(i), Cow::Owned(*v))) }; self.index += 1; @@ -506,14 +487,14 @@ where type IntoIter = CompressedVecIterator<'a, I, T>; fn into_iter(self) -> Self::IntoIter { - let pages_meta = self.pages_meta.read(); + let pages = self.pages.read(); let stored_len = self.stored_len(); CompressedVecIterator { vec: self, reader: self.create_static_reader(), - decoded_page: None, - pages_meta, + decoded: None, + pages, index: 0, stored_len, } diff --git a/crates/brk_vecs/src/variants/compressed/page.rs b/crates/brk_vecs/src/variants/compressed/page.rs new file mode 100644 index 000000000..33ea4ebc8 --- /dev/null +++ b/crates/brk_vecs/src/variants/compressed/page.rs @@ -0,0 +1,19 @@ +use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +#[derive(Debug, Clone, IntoBytes, Immutable, FromBytes, KnownLayout)] +#[repr(C)] +pub struct Page { + pub start: u64, + pub bytes: u32, + pub values: u32, +} + +impl Page { + pub fn new(start: u64, bytes: u32, values: u32) -> Self { + Self { + start, + bytes, + values, + } + } +} diff --git a/crates/brk_vecs/src/variants/compressed/pages.rs b/crates/brk_vecs/src/variants/compressed/pages.rs new file mode 100644 index 000000000..d837b056e --- /dev/null +++ b/crates/brk_vecs/src/variants/compressed/pages.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; + +use parking_lot::RwLock; +use zerocopy::{FromBytes, IntoBytes}; + +use crate::{ + File, Result, + file::{Region, RegionReader}, +}; + +use super::Page; + +#[derive(Debug, Clone)] +pub struct Pages { + _region: Arc>, + region_index: usize, + + vec: Vec, + change_at: Option, +} + +impl Pages { + const SIZE_OF_PAGE: usize = size_of::(); + + pub fn import(file: &File, name: &str) -> Result { + let (region_index, _region) = file.create_region_if_needed(name)?; + + let vec = _region + .read() + .create_reader(file) + .read_all() + .chunks(Self::SIZE_OF_PAGE) + .map(|b| Page::read_from_bytes(b).map_err(|e| e.into())) + .collect::>()?; + + Ok(Self { + _region, + region_index, + vec, + change_at: None, + }) + } + + pub fn flush(&mut self, file: &File) -> Result<()> { + if self.change_at.is_none() { + return Ok(()); + } + + let change_at = self.change_at.take().unwrap(); + let at = (change_at * Self::SIZE_OF_PAGE) as u64; + + file.truncate_region(self.region_index.into(), at)?; + + file.write_all_to_region_at( + self.region_index.into(), + self.vec[change_at..].as_bytes(), + at, + )?; + + Ok(()) + } + + pub fn len(&self) -> usize { + self.vec.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn get(&self, page_index: usize) -> Option<&Page> { + self.vec.get(page_index) + } + + pub fn last(&self) -> Option<&Page> { + self.vec.last() + } + + pub fn pop(&mut self) -> Option { + let popped = self.vec.pop(); + if popped.is_some() { + self.set_changed_at(self.vec.len()); + } + popped + } + + pub fn checked_push(&mut self, page_index: usize, page: Page) { + if page_index != self.vec.len() { + panic!(); + } + + self.set_changed_at(page_index); + + self.vec.push(page); + } + + fn set_changed_at(&mut self, page_index: usize) { + if self.change_at.is_none_or(|pi| pi > page_index) { + self.change_at.replace(page_index); + } + } + + pub fn reset(&mut self, file: &File) -> Result<()> { + self.truncate(0); + self.flush(file) + } + + pub fn truncate(&mut self, page_index: usize) -> Option { + let page = self.get(page_index).cloned(); + self.vec.truncate(page_index); + self.set_changed_at(page_index); + page + } + + pub fn stored_len(&self, per_page: usize) -> usize { + if let Some(last) = self.last() { + (self.len() - 1) * per_page + last.values as usize + } else { + 0 + } + } +} diff --git a/crates/brk_vecs/src/variants/computed/computation.rs b/crates/brk_vecs/src/variants/computed/computation.rs index 0f6142e61..eceb25524 100644 --- a/crates/brk_vecs/src/variants/computed/computation.rs +++ b/crates/brk_vecs/src/variants/computed/computation.rs @@ -1,4 +1,4 @@ -use serde::{Deserialize, Serialize}; +use serde_derive::{Deserialize, Serialize}; #[derive(Default, Debug, PartialEq, PartialOrd, Ord, Eq, Clone, Copy, Serialize, Deserialize)] pub enum Computation { diff --git a/crates/brk_vecs/src/variants/raw/header.rs b/crates/brk_vecs/src/variants/raw/header.rs index 61aa9844d..ae9d3cf5f 100644 --- a/crates/brk_vecs/src/variants/raw/header.rs +++ b/crates/brk_vecs/src/variants/raw/header.rs @@ -51,6 +51,11 @@ impl Header { self.inner.write().stamp = stamp; } + pub fn update_format(&mut self, format: Format) { + self.modified = true; + self.inner.write().compressed = ZeroCopyBool::from(format); + } + pub fn update_computed_version(&mut self, computed_version: Version) { self.modified = true; self.inner.write().computed_version = computed_version; diff --git a/crates/brk_vecs/src/variants/raw/mod.rs b/crates/brk_vecs/src/variants/raw/mod.rs index 1c242e161..4f8588193 100644 --- a/crates/brk_vecs/src/variants/raw/mod.rs +++ b/crates/brk_vecs/src/variants/raw/mod.rs @@ -7,8 +7,7 @@ use std::{ }; use parking_lot::RwLock; -use rayon::prelude::*; -use zerocopy::FromBytes; +use zerocopy::{FromBytes, IntoBytes}; use crate::{ AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BaseVecIterator, BoxedVecIterator, @@ -19,10 +18,8 @@ use crate::{ use super::Format; mod header; -mod unsafe_slice; pub use header::*; -pub use unsafe_slice::*; const VERSION: Version = Version::ONE; @@ -64,25 +61,24 @@ where } pub fn import(file: &Arc, name: &str, version: Version) -> Result { + Self::import_(file, name, version, Format::Raw) + } + + pub fn import_(file: &Arc, name: &str, version: Version, format: Format) -> Result { let (region_index, region) = file.create_region_if_needed(&Self::vec_region_name_(name))?; let region_len = region.read().len() as usize; if region_len > 0 - && (region_len < HEADER_OFFSET || (region_len - HEADER_OFFSET) % Self::SIZE_OF_T != 0) + && (region_len < HEADER_OFFSET + || (format.is_raw() && (region_len - HEADER_OFFSET) % Self::SIZE_OF_T != 0)) { return Err(Error::Str("Region was saved incorrectly")); } let header = if region_len == 0 { - Header::create_and_write(file, region_index, version, Format::Raw)? + Header::create_and_write(file, region_index, version, format)? } else { - Header::import_and_verify( - file, - region_index, - region.read().len(), - version, - Format::Raw, - )? + Header::import_and_verify(file, region_index, region.read().len(), version, format)? }; let holes = if let Ok(holes) = file.get_region(Self::holes_region_name_(name).into()) { @@ -200,13 +196,7 @@ where #[inline] fn stored_len(&self) -> usize { - (self - .file - .get_region(self.region_index.into()) - .unwrap() - .len() as usize - - HEADER_OFFSET) - / Self::SIZE_OF_T + (self.region.read().len() as usize - HEADER_OFFSET) / Self::SIZE_OF_T } fn flush(&mut self) -> Result<()> { @@ -227,22 +217,10 @@ where let file = &self.file; if has_new_data { - let bytes = { - let mut bytes: Vec = vec![0; pushed_len * Self::SIZE_OF_T]; - - let unsafe_bytes = UnsafeSlice::new(&mut bytes); - - mem::take(&mut self.pushed) - .into_par_iter() - .enumerate() - .for_each(|(i, v)| { - unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()) - }); - - bytes - }; - - file.write_all_to_region(self.region_index.into(), &bytes)?; + file.write_all_to_region( + self.region_index.into(), + mem::take(&mut self.pushed).as_bytes(), + )?; } if has_updated_data { diff --git a/crates/brk_vecs/src/variants/raw/unsafe_slice.rs b/crates/brk_vecs/src/variants/raw/unsafe_slice.rs deleted file mode 100644 index 37bdeb6f4..000000000 --- a/crates/brk_vecs/src/variants/raw/unsafe_slice.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::cell::UnsafeCell; - -#[derive(Copy, Clone)] -pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell]); -unsafe impl Send for UnsafeSlice<'_, T> {} -unsafe impl Sync for UnsafeSlice<'_, T> {} - -impl<'a, T> UnsafeSlice<'a, T> { - pub fn new(slice: &'a mut [T]) -> Self { - let ptr = slice as *mut [T] as *const [UnsafeCell]; - Self(unsafe { &*ptr }) - } - - /// SAFETY: It is UB if two threads write to the same index without - /// synchronization. - pub fn write(&self, i: usize, value: T) { - unsafe { - *self.0[i].get() = value; - } - } - - /// SAFETY: It is UB - pub fn get(&self, i: usize) -> *mut T { - self.0[i].get() - } - - pub fn copy_slice(&self, start: usize, slice: &[T]) - where - T: Copy, - { - slice.iter().enumerate().for_each(|(i, v)| { - self.write(start + i, *v); - }); - } -} diff --git a/crates/brk_vecs/src/variants/stored/format.rs b/crates/brk_vecs/src/variants/stored/format.rs index a458d2dae..bf86d86f1 100644 --- a/crates/brk_vecs/src/variants/stored/format.rs +++ b/crates/brk_vecs/src/variants/stored/format.rs @@ -1,6 +1,6 @@ use std::{fs, io, path::Path}; -use serde::{Deserialize, Serialize}; +use serde_derive::{Deserialize, Serialize}; use crate::{Error, Result};