From 589bb0241138a2dbefb6e5e77de46da1e18ba298 Mon Sep 17 00:00:00 2001 From: nym21 Date: Mon, 23 Jun 2025 20:48:00 +0200 Subject: [PATCH] vec: single file with header --- Cargo.lock | 100 +++++----- Cargo.toml | 6 +- README.md | 4 + crates/brk_cli/src/config.rs | 2 +- crates/brk_computer/src/vecs/blocks.rs | 4 +- crates/brk_computer/src/vecs/cointime.rs | 4 +- crates/brk_computer/src/vecs/constants.rs | 4 +- crates/brk_computer/src/vecs/fetched.rs | 4 +- crates/brk_computer/src/vecs/indexes.rs | 4 +- crates/brk_computer/src/vecs/market.rs | 4 +- crates/brk_computer/src/vecs/mining.rs | 4 +- crates/brk_computer/src/vecs/mod.rs | 4 +- .../brk_computer/src/vecs/stateful/cohort.rs | 4 +- crates/brk_computer/src/vecs/transactions.rs | 4 +- crates/brk_core/src/enums/error.rs | 4 +- crates/brk_core/src/structs/version.rs | 13 +- crates/brk_core/src/utils/rlimit.rs | 4 +- crates/brk_fetcher/src/lib.rs | 6 +- crates/brk_indexer/examples/main.rs | 2 +- crates/brk_indexer/src/indexes.rs | 10 +- crates/brk_indexer/src/lib.rs | 6 +- crates/brk_indexer/src/vecs.rs | 9 +- crates/brk_logger/src/lib.rs | 2 +- crates/brk_parser/src/lib.rs | 2 +- crates/brk_server/Cargo.toml | 2 +- crates/brk_state/src/outputs/by_date_range.rs | 2 +- crates/brk_state/src/outputs/by_term.rs | 4 +- crates/brk_store/src/lib.rs | 35 ++-- crates/brk_store/src/meta.rs | 12 +- crates/brk_vec/examples/main.rs | 39 +++- .../src/structs/compressed_pages_meta.rs | 8 +- crates/brk_vec/src/structs/format.rs | 4 + crates/brk_vec/src/structs/header.rs | 183 ++++++++++++++++++ crates/brk_vec/src/structs/mod.rs | 6 +- crates/brk_vec/src/traits/generic.rs | 93 +++++---- crates/brk_vec/src/variants/compressed.rs | 78 +++++--- crates/brk_vec/src/variants/eager.rs | 161 +++++++-------- crates/brk_vec/src/variants/indexed.rs | 81 ++++---- crates/brk_vec/src/variants/raw.rs | 119 ++++++++---- crates/brk_vec/src/variants/stored.rs | 52 ++++- 40 files changed, 685 insertions(+), 404 deletions(-) create mode 100644 crates/brk_vec/src/structs/header.rs diff --git a/Cargo.lock b/Cargo.lock index c7fb3f350..a7b6b10d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -197,7 +197,7 @@ checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -589,7 +589,7 @@ dependencies = [ "brk_vec", "color-eyre", "derive_deref", - "schemars 1.0.0-rc.2", + "schemars 1.0.0", "serde", "serde_json", "serde_with", @@ -623,9 +623,9 @@ dependencies = [ [[package]] name = "brk_rmcp" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90d257949059da288c47b4f88ac80abcaa18afe65972d8b543369ee2099b37db" +checksum = "7ecf167efab04b87a8850e12196228388733aa43adaeafd5b4d6f9e29d850263" dependencies = [ "base64 0.22.1", "brk_rmcp-macros", @@ -638,7 +638,7 @@ dependencies = [ "paste", "pin-project-lite", "rand 0.9.1", - "schemars 1.0.0-rc.2", + "schemars 1.0.0", "serde", "serde_json", "sse-stream", @@ -660,7 +660,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1222,7 +1222,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1446,7 +1446,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1457,7 +1457,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1498,7 +1498,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1529,7 +1529,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", "unicode-xid", ] @@ -1577,7 +1577,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -1664,9 +1664,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "fjall" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13279146a877c2060f668bc4c477af8ef5aa42732c58dca32fcb4aff40edc5b4" +checksum = "f5cb653019268f6dc8de3b254b633a2d233a775054349b804b9cfbf18bbe3426" dependencies = [ "byteorder", "byteview", @@ -1776,7 +1776,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -2145,7 +2145,7 @@ checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -2300,9 +2300,9 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "lsm-tree" -version = "2.10.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0d03b764a7e3009cc4d314bfce42ce28b4a2c458fc7149b57817cbed7898f43" +checksum = "1d2bd4cdc451a8dcf11329190afb9b78eb8988bed07a3da29b8d73d2e0c731ff" dependencies = [ "byteorder", "crossbeam-skiplist", @@ -2324,9 +2324,9 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.11.5" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" [[package]] name = "matchers" @@ -2594,7 +2594,7 @@ checksum = "3bd3da01a295024fa79e3b4aba14b590d91256a274ff29cc5ee8f55183b2df24" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -2637,7 +2637,7 @@ dependencies = [ "phf", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3117,7 +3117,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3146,7 +3146,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3216,7 +3216,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3364,7 +3364,7 @@ checksum = "1165225c21bff1f3bbce98f5a1f889949bc902d3575308cc7b0de30b4f6d27c7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3552,9 +3552,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.0.0-rc.2" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "284cf8058b5165c2e1aedb8f56f4b09a838a764d1c110178e1747d1f77a01ceb" +checksum = "febc07c7e70b5db4f023485653c754d76e1bbe8d9dbfa20193ce13da9f9633f4" dependencies = [ "dyn-clone", "ref-cast", @@ -3565,14 +3565,14 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "1.0.0-rc.2" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ce51e6bea03670e88edba2d724eed817b97f4ab017a1001bae5da325ce93fe7" +checksum = "c1eeedaab7b1e1d09b5b4661121f4d27f9e7487089b0117833ccd7a882ee1ecc" dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3660,7 +3660,7 @@ checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3671,7 +3671,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3746,7 +3746,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3883,9 +3883,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.103" +version = "2.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" dependencies = [ "proc-macro2", "quote", @@ -3919,7 +3919,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -3990,7 +3990,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4001,7 +4001,7 @@ checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4068,7 +4068,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4204,7 +4204,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4302,7 +4302,7 @@ checksum = "e9d4ed7b4c18cc150a6a0a1e9ea1ecfa688791220781af6e119f9599a8502a0a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", "termcolor", ] @@ -4499,7 +4499,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", "wasm-bindgen-shared", ] @@ -4521,7 +4521,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4593,7 +4593,7 @@ checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4604,7 +4604,7 @@ checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4833,7 +4833,7 @@ checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] @@ -4853,14 +4853,14 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.103", + "syn 2.0.104", ] [[package]] name = "zip" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af7dcdb4229c0e79c2531a24de7726a0e980417a74fb4d030a35f535665439a0" +checksum = "95ab361742de920c5535880f89bbd611ee62002bf11341d16a5f057bb8ba6899" dependencies = [ "aes", "arbitrary", diff --git a/Cargo.toml b/Cargo.toml index dd96ac4af..0ea75d29d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ brk_indexer = { version = "0.0.66", path = "crates/brk_indexer" } brk_interface = { version = "0.0.66", path = "crates/brk_interface" } brk_logger = { version = "0.0.66", path = "crates/brk_logger" } brk_parser = { version = "0.0.66", path = "crates/brk_parser" } -brk_rmcp = { version = "0.1.6", features = ["transport-streamable-http-server", "transport-worker"]} +brk_rmcp = { version = "0.1.7", features = ["transport-streamable-http-server", "transport-worker"]} brk_server = { version = "0.0.66", path = "crates/brk_server" } brk_state = { version = "0.0.66", path = "crates/brk_state" } brk_store = { version = "0.0.66", path = "crates/brk_store" } @@ -42,12 +42,12 @@ clap = { version = "4.5.40", features = ["string"] } clap_derive = "4.5.40" color-eyre = "0.6.5" derive_deref = "1.1.1" -fjall = "2.11.0" +fjall = "2.11.1" jiff = "0.2.15" log = { version = "0.4.27" } minreq = { version = "2.13.4", features = ["https", "serde_json"] } rayon = "1.10.0" -schemars = "1.0.0-rc.2" +schemars = "1.0.0" serde = { version = "1.0.219" } serde_bytes = "0.11.17" serde_derive = "1.0.219" diff --git a/README.md b/README.md index 70eca211a..35586e70a 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,10 @@ The toolkit can be used in various ways to accommodate as many needs as possible - **[API](https://github.com/bitcoinresearchkit/brk/tree/main/crates/brk_server#endpoints)** \ Researchers and developers are free to use BRK's public API with ![Datasets variant count](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fbitcoinresearchkit.org%2Fapi%2Fvecs%2Fvariant-count&query=%24&style=flat&label=%20&color=white) dataset variants at their disposal. \ Just like the website, it's entirely free, with no authentication or rate-limiting. +- AI \ + LLMs have to possibility to connect to BRK's backend through a [MCP](https://modelcontextprotocol.io/introduction). \ + It will give them access to the same tools as the API, with no restrictions, and allow you to have your very own data analysts. \ + One-shot output examples: [Document](https://claude.ai/public/artifacts/71194d29-f965-417c-ba09-fdf0e4ecb1d5) // [Dashboard](https://claude.ai/public/artifacts/beef143f-399a-4ed4-b8bf-c986b776de42) // [Dashboard 2](https://claude.ai/public/artifacts/5430ae49-bb3d-4fc1-ab24-f1e33deb40dc) - **[CLI](https://crates.io/crates/brk_cli)** \ Node runners are strongly encouraged to try out and self-host their own instance using BRK's command line interface. \ The CLI has multiple cogs available for users to tweak to adapt to all situations with even the possibility for web developers to create their own custom website which could later on be added as an alternative front-end. diff --git a/crates/brk_cli/src/config.rs b/crates/brk_cli/src/config.rs index 58fa8c0f6..2f978b03a 100644 --- a/crates/brk_cli/src/config.rs +++ b/crates/brk_cli/src/config.rs @@ -87,7 +87,7 @@ pub struct Config { #[arg(long, value_name = "SECONDS")] delay: Option, - /// Activate the Model Context Protocol (MCP) endpoint to give LLMs access to BRK, default: false, saved + /// Activate the Model Context Protocol (MCP) endpoint to give LLMs access to BRK (experimental), default: false, saved #[serde(default, deserialize_with = "default_on_error")] #[arg(long, value_name = "BOOL")] mcp: Option, diff --git a/crates/brk_computer/src/vecs/blocks.rs b/crates/brk_computer/src/vecs/blocks.rs index 72358fc39..75644192d 100644 --- a/crates/brk_computer/src/vecs/blocks.rs +++ b/crates/brk_computer/src/vecs/blocks.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{ CheckedSub, DifficultyEpoch, HalvingEpoch, Height, StoredU32, StoredU64, StoredUsize, @@ -37,8 +37,6 @@ impl Vecs { _computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - Ok(Self { height_to_interval: EagerVec::forced_import( path, diff --git a/crates/brk_computer/src/vecs/cointime.rs b/crates/brk_computer/src/vecs/cointime.rs index 641177395..ecb6563ef 100644 --- a/crates/brk_computer/src/vecs/cointime.rs +++ b/crates/brk_computer/src/vecs/cointime.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{Bitcoin, CheckedSub, Dollars, StoredF64, Version}; use brk_exit::Exit; @@ -57,8 +57,6 @@ impl Vecs { ) -> color_eyre::Result { let compute_dollars = fetched.is_some(); - fs::create_dir_all(path)?; - Ok(Self { indexes_to_coinblocks_created: ComputedVecsFromHeight::forced_import( path, diff --git a/crates/brk_computer/src/vecs/constants.rs b/crates/brk_computer/src/vecs/constants.rs index 34a105924..d686a1f6a 100644 --- a/crates/brk_computer/src/vecs/constants.rs +++ b/crates/brk_computer/src/vecs/constants.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{StoredU8, Version}; use brk_exit::Exit; @@ -28,8 +28,6 @@ impl Vecs { _computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - Ok(Self { _0: ComputedVecsFromHeight::forced_import( path, diff --git a/crates/brk_computer/src/vecs/fetched.rs b/crates/brk_computer/src/vecs/fetched.rs index 4f0dec6c3..2fdd97e17 100644 --- a/crates/brk_computer/src/vecs/fetched.rs +++ b/crates/brk_computer/src/vecs/fetched.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{ Cents, Close, DateIndex, DecadeIndex, DifficultyEpoch, Dollars, Height, High, Low, MonthIndex, @@ -75,8 +75,6 @@ impl Vecs { _computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - let mut fetched_path = path.to_owned(); fetched_path.pop(); fetched_path = fetched_path.join("fetched"); diff --git a/crates/brk_computer/src/vecs/indexes.rs b/crates/brk_computer/src/vecs/indexes.rs index b4717ca75..ef11f59e3 100644 --- a/crates/brk_computer/src/vecs/indexes.rs +++ b/crates/brk_computer/src/vecs/indexes.rs @@ -1,4 +1,4 @@ -use std::{fs, ops::Deref, path::Path}; +use std::{ops::Deref, path::Path}; use brk_core::{ Date, DateIndex, DecadeIndex, DifficultyEpoch, EmptyOutputIndex, HalvingEpoch, Height, @@ -94,8 +94,6 @@ impl Vecs { computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - let outputindex_to_outputindex = ComputedVec::forced_import_or_init_from_1( computation, path, diff --git a/crates/brk_computer/src/vecs/market.rs b/crates/brk_computer/src/vecs/market.rs index e1d002b39..98e670660 100644 --- a/crates/brk_computer/src/vecs/market.rs +++ b/crates/brk_computer/src/vecs/market.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path, thread}; +use std::{path::Path, thread}; use brk_core::{Date, DateIndex, Dollars, Height, Sats, StoredF32, StoredUsize, Version}; use brk_exit::Exit; @@ -168,8 +168,6 @@ impl Vecs { _computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - Ok(Self { height_to_marketcap: EagerVec::forced_import( path, diff --git a/crates/brk_computer/src/vecs/mining.rs b/crates/brk_computer/src/vecs/mining.rs index bc563d0ad..0dfc6e144 100644 --- a/crates/brk_computer/src/vecs/mining.rs +++ b/crates/brk_computer/src/vecs/mining.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{DifficultyEpoch, HalvingEpoch, StoredF64, Version}; use brk_exit::Exit; @@ -27,8 +27,6 @@ impl Vecs { _computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - Ok(Self { indexes_to_difficulty: ComputedVecsFromHeight::forced_import( path, diff --git a/crates/brk_computer/src/vecs/mod.rs b/crates/brk_computer/src/vecs/mod.rs index 5a65c91b8..24683c3b3 100644 --- a/crates/brk_computer/src/vecs/mod.rs +++ b/crates/brk_computer/src/vecs/mod.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path, thread}; +use std::{path::Path, thread}; use brk_core::Version; use brk_exit::Exit; @@ -44,8 +44,6 @@ impl Vecs { computation: Computation, format: Format, ) -> color_eyre::Result { - fs::create_dir_all(path)?; - let (indexes, fetched) = thread::scope(|s| { let indexes_handle = s.spawn(|| { indexes::Vecs::forced_import( diff --git a/crates/brk_computer/src/vecs/stateful/cohort.rs b/crates/brk_computer/src/vecs/stateful/cohort.rs index 00477cc94..356248199 100644 --- a/crates/brk_computer/src/vecs/stateful/cohort.rs +++ b/crates/brk_computer/src/vecs/stateful/cohort.rs @@ -1,5 +1,5 @@ use core::panic; -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{ Bitcoin, DateIndex, Dollars, Height, Result, Sats, StoredF32, StoredF64, StoredUsize, Version, @@ -143,8 +143,6 @@ impl Vecs { ) -> color_eyre::Result { let compute_dollars = fetched.is_some(); - fs::create_dir_all(path)?; - // let prefix = |s: &str| cohort_name.map_or(s.to_string(), |name| format!("{s}_{name}")); let suffix = |s: &str| cohort_name.map_or(s.to_string(), |name| format!("{name}_{s}")); diff --git a/crates/brk_computer/src/vecs/transactions.rs b/crates/brk_computer/src/vecs/transactions.rs index 0edde8e9d..cc5499a9d 100644 --- a/crates/brk_computer/src/vecs/transactions.rs +++ b/crates/brk_computer/src/vecs/transactions.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{ CheckedSub, Feerate, HalvingEpoch, Height, InputIndex, OutputIndex, Sats, StoredU32, @@ -98,8 +98,6 @@ impl Vecs { ) -> color_eyre::Result { let compute_dollars = fetched.is_some(); - fs::create_dir_all(path)?; - let inputindex_to_value = ComputedVec::forced_import_or_init_from_2( computation, path, diff --git a/crates/brk_core/src/enums/error.rs b/crates/brk_core/src/enums/error.rs index 60ef08d1b..14964904d 100644 --- a/crates/brk_core/src/enums/error.rs +++ b/crates/brk_core/src/enums/error.rs @@ -20,6 +20,7 @@ pub enum Error { WrongEndian, DifferentVersion { found: Version, expected: Version }, + UnexpectedData, MmapsVecIsTooSmall, IndexTooHigh, EmptyVec, @@ -102,12 +103,13 @@ impl fmt::Display for Error { Error::BincodeDecodeError(error) => Debug::fmt(&error, f), Error::BincodeEncodeError(error) => Debug::fmt(&error, f), Error::ZeroCopyError => write!(f, "ZeroCopy error"), + Error::UnexpectedData => write!(f, "Unexpected data"), Error::WrongEndian => write!(f, "Wrong endian"), Error::DifferentVersion { found, expected } => { write!( f, - "Different version; found: {found:?}, expected: {expected:?}" + "Different version found: {found:?}, expected: {expected:?}" ) } Error::MmapsVecIsTooSmall => write!(f, "Mmaps vec is too small"), diff --git a/crates/brk_core/src/structs/version.rs b/crates/brk_core/src/structs/version.rs index 8a555293d..dc740367c 100644 --- a/crates/brk_core/src/structs/version.rs +++ b/crates/brk_core/src/structs/version.rs @@ -12,7 +12,18 @@ use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; use crate::{Error, Result}; #[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, Immutable, KnownLayout, + Default, + Debug, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + FromBytes, + IntoBytes, + Immutable, + KnownLayout, )] pub struct Version(u64); diff --git a/crates/brk_core/src/utils/rlimit.rs b/crates/brk_core/src/utils/rlimit.rs index 6f078c5d1..162a52574 100644 --- a/crates/brk_core/src/utils/rlimit.rs +++ b/crates/brk_core/src/utils/rlimit.rs @@ -4,14 +4,12 @@ use rlimit::{Resource, getrlimit}; pub fn setrlimit() -> io::Result<()> { let no_file_limit = getrlimit(Resource::NOFILE)?; + rlimit::setrlimit( Resource::NOFILE, no_file_limit.0.max(210_000), no_file_limit.1, )?; - // let no_stack = getrlimit(Resource::STACK)?; - // rlimit::setrlimit(Resource::STACK, no_stack.1, no_stack.1)?; - Ok(()) } diff --git a/crates/brk_fetcher/src/lib.rs b/crates/brk_fetcher/src/lib.rs index 9d7f10a57..f27391975 100644 --- a/crates/brk_fetcher/src/lib.rs +++ b/crates/brk_fetcher/src/lib.rs @@ -3,7 +3,7 @@ #![doc = include_str!("../examples/main.rs")] #![doc = "```"] -use std::{collections::BTreeMap, fs, path::Path, thread::sleep, time::Duration}; +use std::{collections::BTreeMap, path::Path, thread::sleep, time::Duration}; use brk_core::{Close, Date, Dollars, Height, High, Low, OHLCCents, Open, Timestamp}; use color_eyre::eyre::Error; @@ -24,10 +24,6 @@ pub struct Fetcher { impl Fetcher { pub fn import(hars_path: Option<&Path>) -> color_eyre::Result { - if let Some(path) = hars_path { - fs::create_dir_all(path)?; - } - Ok(Self { binance: Binance::init(hars_path), kraken: Kraken::default(), diff --git a/crates/brk_indexer/examples/main.rs b/crates/brk_indexer/examples/main.rs index 276511a7d..5fa9c2a40 100644 --- a/crates/brk_indexer/examples/main.rs +++ b/crates/brk_indexer/examples/main.rs @@ -26,7 +26,7 @@ fn main() -> color_eyre::Result<()> { let mut indexer = Indexer::forced_import(outputs)?; - indexer.index(&parser, rpc, &exit, false)?; + indexer.index(&parser, rpc, &exit, true)?; dbg!(i.elapsed()); diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index 1719c35da..338311509 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -5,7 +5,7 @@ use brk_core::{ P2SHIndex, P2TRIndex, P2WPKHIndex, P2WSHIndex, Result, TxIndex, UnknownOutputIndex, }; use brk_parser::NUMBER_OF_UNSAFE_BLOCKS; -use brk_vec::{AnyIterableVec, AnyVec, IndexedVec, StoredIndex, StoredType}; +use brk_vec::{AnyIndexedVec, AnyIterableVec, AnyVec, IndexedVec, StoredIndex, StoredType}; use color_eyre::eyre::ContextCompat; use crate::{Stores, Vecs}; @@ -215,10 +215,10 @@ where I: StoredType + StoredIndex + From, T: StoredType, { - if height_to_index - .height() - .is_ok_and(|h| h + 1_u32 == starting_height) - { + let h = height_to_index.height(); + if h.is_zero() { + None + } else if height_to_index.height() + 1_u32 == starting_height { Some(I::from(index_to_else.len())) } else { height_to_index.iter().get_inner(starting_height) diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 260476737..1694d000a 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -53,11 +53,7 @@ impl Indexer { check_collisions: bool, ) -> color_eyre::Result { let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc)) - .unwrap_or_else(|_report| { - let indexes = Indexes::default(); - indexes.push_if_needed(&mut self.vecs).unwrap(); - indexes - }); + .unwrap_or_else(|_report| Indexes::default()); exit.block(); self.stores diff --git a/crates/brk_indexer/src/vecs.rs b/crates/brk_indexer/src/vecs.rs index 2e329b0b7..664a3930f 100644 --- a/crates/brk_indexer/src/vecs.rs +++ b/crates/brk_indexer/src/vecs.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::path::Path; use brk_core::{ AddressBytes, BlockHash, EmptyOutputIndex, Height, InputIndex, OpReturnIndex, OutputIndex, @@ -67,8 +67,6 @@ pub struct Vecs { impl Vecs { pub fn forced_import(path: &Path, version: Version) -> color_eyre::Result { - fs::create_dir_all(path)?; - Ok(Self { emptyoutputindex_to_txindex: IndexedVec::forced_import( path, @@ -493,7 +491,10 @@ impl Vecs { pub fn starting_height(&mut self) -> Height { self.mut_vecs() .into_iter() - .map(|vec| vec.height().map(Height::incremented).unwrap_or_default()) + .map(|vec| { + let h = vec.height(); + if h > Height::ZERO { h.incremented() } else { h } + }) .min() .unwrap() } diff --git a/crates/brk_logger/src/lib.rs b/crates/brk_logger/src/lib.rs index e1e90c748..e95f7b465 100644 --- a/crates/brk_logger/src/lib.rs +++ b/crates/brk_logger/src/lib.rs @@ -26,7 +26,7 @@ pub fn init(path: Option<&Path>) { }); Builder::from_env(Env::default().default_filter_or( - "info,fjall=off,lsm_tree=off,rolldown=off,brk_rolldown=off,rmcp=off,brk_rmcp=off,tracing=off", + "info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm_tree=off,rolldown=off,brk_rolldown=off,rmcp=off,brk_rmcp=off,tracing=off", )) .format(move |buf, record| { let date_time = Timestamp::now() diff --git a/crates/brk_parser/src/lib.rs b/crates/brk_parser/src/lib.rs index 3c39923e2..59b80bdb7 100644 --- a/crates/brk_parser/src/lib.rs +++ b/crates/brk_parser/src/lib.rs @@ -31,7 +31,7 @@ use utils::*; use xor_bytes::*; use xor_index::*; -pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000; +pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 100; const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217]; const BOUND_CAP: usize = 50; diff --git a/crates/brk_server/Cargo.toml b/crates/brk_server/Cargo.toml index 6f0140ee6..b7d800202 100644 --- a/crates/brk_server/Cargo.toml +++ b/crates/brk_server/Cargo.toml @@ -31,7 +31,7 @@ serde = { workspace = true } tokio = { workspace = true } tower-http = { version = "0.6.6", features = ["compression-full", "trace"] } tracing = "0.1.41" -zip = "4.1.0" +zip = "4.2.0" [package.metadata.cargo-machete] ignored = ["clap"] diff --git a/crates/brk_state/src/outputs/by_date_range.rs b/crates/brk_state/src/outputs/by_date_range.rs index 04f2dfa85..d482171e1 100644 --- a/crates/brk_state/src/outputs/by_date_range.rs +++ b/crates/brk_state/src/outputs/by_date_range.rs @@ -42,7 +42,7 @@ impl From> for OutputsByDateRange<(OutputFilter, T)> { _5y_to_6y: (OutputFilter::Range(5 * 365..6 * 365), value._5y_to_6y), _6y_to_7y: (OutputFilter::Range(6 * 365..7 * 365), value._6y_to_7y), _7y_to_8y: (OutputFilter::Range(7 * 365..8 * 365), value._7y_to_8y), - _8y_to_10y: (OutputFilter::Range(7 * 365..10 * 365), value._8y_to_10y), + _8y_to_10y: (OutputFilter::Range(8 * 365..10 * 365), value._8y_to_10y), _10y_to_15y: (OutputFilter::Range(10 * 365..15 * 365), value._10y_to_15y), _15y_to_end: (OutputFilter::From(15 * 365), value._15y_to_end), } diff --git a/crates/brk_state/src/outputs/by_term.rs b/crates/brk_state/src/outputs/by_term.rs index b10c2359b..1a80fbb34 100644 --- a/crates/brk_state/src/outputs/by_term.rs +++ b/crates/brk_state/src/outputs/by_term.rs @@ -21,8 +21,8 @@ impl OutputsByTerm<(OutputFilter, T)> { impl From> for OutputsByTerm<(OutputFilter, T)> { fn from(value: OutputsByTerm) -> Self { Self { - long: (OutputFilter::From(150), value.long), - short: (OutputFilter::To(150), value.short), + short: (OutputFilter::To(5 * 30), value.short), + long: (OutputFilter::From(5 * 30), value.long), } } } diff --git a/crates/brk_store/src/lib.rs b/crates/brk_store/src/lib.rs index b849cfdfa..7b99eace6 100644 --- a/crates/brk_store/src/lib.rs +++ b/crates/brk_store/src/lib.rs @@ -8,10 +8,8 @@ use std::{ fmt::Debug, mem, path::Path, - sync::Arc, }; -use arc_swap::ArcSwap; use brk_core::{Height, Result, Value, Version}; use byteview::ByteView; use fjall::{ @@ -26,7 +24,7 @@ pub struct Store { meta: StoreMeta, name: String, keyspace: TransactionalKeyspace, - partition: Arc>, + partition: Option, rtx: ReadTransaction, puts: BTreeMap, dels: BTreeSet, @@ -75,7 +73,7 @@ where meta, name: name.to_owned(), keyspace: keyspace.clone(), - partition: Arc::new(ArcSwap::from_pointee(partition)), + partition: Some(partition), rtx, puts: BTreeMap::new(), dels: BTreeSet::new(), @@ -86,7 +84,10 @@ where pub fn get(&self, key: &'a K) -> Result>> { if let Some(v) = self.puts.get(key) { Ok(Some(Value::Ref(v))) - } else if let Some(slice) = self.rtx.get(&self.partition.load(), ByteView::from(key))? { + } else if let Some(slice) = self + .rtx + .get(self.partition.as_ref().unwrap(), ByteView::from(key))? + { Ok(Some(Value::Owned(V::from(ByteView::from(slice))))) } else { Ok(None) @@ -169,7 +170,7 @@ where let mut wtx = self.keyspace.write_tx(); - let partition = &self.partition.load(); + let partition = self.partition.as_ref().unwrap(); mem::take(&mut self.dels) .into_iter() @@ -201,7 +202,7 @@ where } pub fn rotate_memtable(&self) { - let _ = self.partition.load().inner().rotate_memtable(); + let _ = self.partition.as_ref().unwrap().inner().rotate_memtable(); } pub fn height(&self) -> Option { @@ -250,22 +251,18 @@ where } pub fn reset_partition(&mut self) -> Result<()> { - let partition = Arc::try_unwrap(self.partition.swap(unsafe { - #[allow(clippy::uninit_assumed_init, invalid_value)] - mem::MaybeUninit::uninit().assume_init() - })) - .ok() - .unwrap(); + let partition: TransactionalPartitionHandle = self.partition.take().unwrap(); self.keyspace.delete_partition(partition)?; self.keyspace.persist(PersistMode::SyncAll)?; - self.partition.store(Arc::new(Self::open_partition_handle( - &self.keyspace, - &self.name, - self.bloom_filter_bits, - )?)); + self.meta.reset(); + + let partition = + Self::open_partition_handle(&self.keyspace, &self.name, self.bloom_filter_bits)?; + + self.partition.replace(partition); Ok(()) } @@ -281,7 +278,7 @@ where meta: self.meta.clone(), name: self.name.clone(), keyspace: self.keyspace.clone(), - partition: self.partition.clone(), + partition: None, rtx: self.keyspace.read_tx(), puts: self.puts.clone(), dels: self.dels.clone(), diff --git a/crates/brk_store/src/meta.rs b/crates/brk_store/src/meta.rs index 8b662236b..bf052f9a4 100644 --- a/crates/brk_store/src/meta.rs +++ b/crates/brk_store/src/meta.rs @@ -37,7 +37,8 @@ impl StoreMeta { let mut partition = open_partition_handle()?; if !is_same_version { - Self::reset_(path)?; + fs::remove_dir_all(path)?; + fs::create_dir(path)?; keyspace.delete_partition(partition)?; keyspace.persist(fjall::PersistMode::SyncAll)?; partition = open_partition_handle()?; @@ -76,12 +77,9 @@ impl StoreMeta { height.write(&self.path_height()) } - // pub fn reset(&self) -> io::Result<()> { - // Self::reset_(self.pathbuf.as_path()) - // } - fn reset_(path: &Path) -> io::Result<()> { - fs::remove_dir_all(path)?; - fs::create_dir(path) + pub fn reset(&mut self) { + self.height.take(); + self.len = 0 } pub fn path(&self) -> &Path { diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs index 40168e841..f8c26ab02 100644 --- a/crates/brk_vec/examples/main.rs +++ b/crates/brk_vec/examples/main.rs @@ -1,17 +1,18 @@ use std::{fs, path::Path}; -use brk_core::{DateIndex, Version}; +use brk_core::{DateIndex, Height, Version}; use brk_vec::{AnyVec, CollectableVec, Format, GenericStoredVec, StoredVec, VecIterator}; +type VEC = StoredVec; + fn main() -> Result<(), Box> { let _ = fs::remove_dir_all("./vec"); - let version = Version::ZERO; + let version = Version::TWO; let format = Format::Compressed; { - let mut vec: StoredVec = - StoredVec::forced_import(Path::new("."), "vec", version, format)?; + let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; (0..21_u32).for_each(|v| { vec.push(v); @@ -23,14 +24,16 @@ fn main() -> Result<(), Box> { dbg!(iter.get(21.into())); vec.flush()?; + + // dbg!(vec.header()); } { - let mut vec: StoredVec = - StoredVec::forced_import(Path::new("."), "vec", version, format)?; - let mut iter = vec.into_iter(); + let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; - dbg!(iter.get(0.into())); + vec.mut_header().update_height(Height::new(100)); + + let mut iter = vec.into_iter(); dbg!(iter.get(0.into())); dbg!(iter.get(1.into())); dbg!(iter.get(2.into())); @@ -52,8 +55,7 @@ fn main() -> Result<(), Box> { } { - let mut vec: StoredVec = - StoredVec::forced_import(Path::new("."), "vec", version, format)?; + let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; let mut iter = vec.into_iter(); dbg!(iter.get(0.into())); @@ -77,5 +79,22 @@ fn main() -> Result<(), Box> { dbg!(vec.into_iter().collect::>()); } + { + let mut vec: VEC = StoredVec::forced_import(Path::new("."), "vec", version, format)?; + + vec.reset()?; + + dbg!(vec.header(), vec.pushed_len(), vec.stored_len(), vec.len()); + + (0..21_u32).for_each(|v| { + vec.push(v); + }); + + let mut iter = vec.into_iter(); + dbg!(iter.get(0.into())); + dbg!(iter.get(20.into())); + dbg!(iter.get(21.into())); + } + Ok(()) } diff --git a/crates/brk_vec/src/structs/compressed_pages_meta.rs b/crates/brk_vec/src/structs/compressed_pages_meta.rs index e2f2b20ff..6d170e064 100644 --- a/crates/brk_vec/src/structs/compressed_pages_meta.rs +++ b/crates/brk_vec/src/structs/compressed_pages_meta.rs @@ -21,10 +21,8 @@ impl CompressedPagesMetadata { const PAGE_SIZE: usize = size_of::(); pub fn read(path: &Path) -> Result { - let path = path.join("pages_meta"); - - let slf = Self { - vec: fs::read(&path) + let this = Self { + vec: fs::read(path) .unwrap_or_default() .chunks(Self::PAGE_SIZE) .map(|bytes| { @@ -38,7 +36,7 @@ impl CompressedPagesMetadata { change_at: None, }; - Ok(slf) + Ok(this) } pub fn write(&mut self) -> io::Result<()> { diff --git a/crates/brk_vec/src/structs/format.rs b/crates/brk_vec/src/structs/format.rs index 1ba767b37..ab5e5452c 100644 --- a/crates/brk_vec/src/structs/format.rs +++ b/crates/brk_vec/src/structs/format.rs @@ -18,6 +18,10 @@ impl Format { fs::write(path, self.as_bytes()) } + pub fn is_raw(&self) -> bool { + *self == Self::Raw + } + pub fn is_compressed(&self) -> bool { *self == Self::Compressed } diff --git a/crates/brk_vec/src/structs/header.rs b/crates/brk_vec/src/structs/header.rs new file mode 100644 index 000000000..bfe6972ab --- /dev/null +++ b/crates/brk_vec/src/structs/header.rs @@ -0,0 +1,183 @@ +use std::{ + fs::File, + io::{self, Seek, SeekFrom}, + os::unix::fs::FileExt, + sync::Arc, +}; + +use arc_swap::ArcSwap; +use brk_core::{Error, Height, Result, Version}; +use memmap2::Mmap; +use zerocopy::{FromBytes, IntoBytes}; +use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use crate::Format; + +const HEADER_VERSION: Version = Version::ONE; +pub const HEADER_OFFSET: usize = size_of::(); + +#[derive(Debug, Clone)] +pub struct Header { + inner: Arc>, + modified: bool, +} + +impl Header { + pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { + let inner = HeaderInner::create_and_write(file, vec_version, format)?; + Ok(Self { + inner: Arc::new(ArcSwap::from_pointee(inner)), + modified: false, + }) + } + + pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result { + let inner = HeaderInner::import_and_verify(mmap, vec_version, format)?; + Ok(Self { + inner: Arc::new(ArcSwap::from_pointee(inner)), + modified: false, + }) + } + + pub fn update_height(&mut self, height: Height) { + self.modified = true; + self.inner.rcu(|header| { + let mut header = (**header).clone(); + header.height = height; + header + }); + } + + pub fn update_computed_version(&mut self, computed_version: Version) { + self.modified = true; + self.inner.rcu(|header| { + let mut header = (**header).clone(); + header.computed_version = computed_version; + header + }); + } + + pub fn vec_version(&self) -> Version { + self.inner.load().vec_version + } + + pub fn computed_version(&self) -> Version { + self.inner.load().computed_version + } + + pub fn height(&self) -> Height { + self.inner.load().height + } + + pub fn write_if_needed(&mut self, file: &mut File) -> io::Result<()> { + if self.modified { + self.inner.load().write(file)?; + self.modified = false; + } + Ok(()) + } +} + +#[repr(C)] +#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)] +struct HeaderInner { + pub header_version: Version, + pub vec_version: Version, + pub computed_version: Version, + pub height: Height, + pub compressed: ZeroCopyBool, +} + +impl HeaderInner { + pub fn create_and_write(file: &mut File, vec_version: Version, format: Format) -> Result { + let header = Self { + header_version: HEADER_VERSION, + vec_version, + computed_version: Version::default(), + height: Height::default(), + compressed: ZeroCopyBool::from(format), + }; + header.write(file)?; + // dbg!(file.bytes().map(|b| b.unwrap()).collect::>()); + file.seek(SeekFrom::End(0))?; + Ok(header) + } + + pub fn write(&self, file: &mut File) -> io::Result<()> { + file.write_all_at(self.as_bytes(), 0) + } + + pub fn import_and_verify(mmap: &Mmap, vec_version: Version, format: Format) -> Result { + if mmap.len() < HEADER_OFFSET { + return Err(Error::WrongLength); + } + // dbg!(mmap.len()); + let header = HeaderInner::read_from_bytes(&mmap[..HEADER_OFFSET])?; + // dbg!(&header); + if header.header_version != HEADER_VERSION { + return Err(Error::DifferentVersion { + found: header.header_version, + expected: HEADER_VERSION, + }); + } + if header.vec_version != vec_version { + return Err(Error::DifferentVersion { + found: header.vec_version, + expected: vec_version, + }); + } + if header.compressed.is_broken() { + return Err(Error::WrongEndian); + } + if (header.compressed.is_true() && format.is_raw()) + || (header.compressed.is_false() && format.is_compressed()) + { + return Err(Error::DifferentCompressionMode); + } + Ok(header) + } +} + +#[derive( + Debug, + Clone, + Copy, + Default, + PartialEq, + Eq, + PartialOrd, + Ord, + FromBytes, + IntoBytes, + Immutable, + KnownLayout, +)] +#[repr(C)] +pub struct ZeroCopyBool(u32); + +impl ZeroCopyBool { + pub const TRUE: Self = Self(1); + pub const FALSE: Self = Self(0); + + pub fn is_true(&self) -> bool { + *self == Self::TRUE + } + + pub fn is_false(&self) -> bool { + *self == Self::FALSE + } + + pub fn is_broken(&self) -> bool { + *self > Self::TRUE + } +} + +impl From for ZeroCopyBool { + fn from(value: Format) -> Self { + if value.is_raw() { + Self::FALSE + } else { + Self::TRUE + } + } +} diff --git a/crates/brk_vec/src/structs/mod.rs b/crates/brk_vec/src/structs/mod.rs index 44c669288..265e9eaad 100644 --- a/crates/brk_vec/src/structs/mod.rs +++ b/crates/brk_vec/src/structs/mod.rs @@ -1,11 +1,13 @@ mod compressed_page_meta; mod compressed_pages_meta; mod format; -mod length; +mod header; +// mod length; mod unsafe_slice; pub use compressed_page_meta::*; pub use compressed_pages_meta::*; pub use format::*; -pub use length::*; +pub use header::*; +// pub use length::*; pub use unsafe_slice::*; diff --git a/crates/brk_vec/src/traits/generic.rs b/crates/brk_vec/src/traits/generic.rs index d4c0d665e..6232c44ca 100644 --- a/crates/brk_vec/src/traits/generic.rs +++ b/crates/brk_vec/src/traits/generic.rs @@ -10,10 +10,13 @@ use arc_swap::ArcSwap; use brk_core::{Result, Value}; use memmap2::Mmap; +use crate::{AnyVec, HEADER_OFFSET, Header}; + use super::{StoredIndex, StoredType}; pub trait GenericStoredVec: Send + Sync where + Self: AnyVec, I: StoredIndex, T: StoredType, { @@ -50,6 +53,10 @@ where self.stored_len() + self.pushed_len() } + fn index_to_name(&self) -> String { + format!("{}_to_{}", I::to_string(), self.name()) + } + fn mmap(&self) -> &ArcSwap; fn stored_len(&self) -> usize; @@ -66,26 +73,47 @@ where self.mut_pushed().push(value) } - fn path(&self) -> PathBuf; + fn header(&self) -> &Header; + fn mut_header(&mut self) -> &mut Header; + + fn parent(&self) -> &Path; + + fn folder(&self) -> PathBuf { + self.parent().join(self.name()) + } + + fn folder_(parent: &Path, name: &str) -> PathBuf { + parent.join(name) + } + + fn path(&self) -> PathBuf { + Self::path_(self.parent(), self.name()) + } + + fn path_(parent: &Path, name: &str) -> PathBuf { + Self::folder_(parent, name).join(I::to_string()) + } // --- - fn open_file(&self) -> io::Result { - Self::open_file_(&self.path_vec()) - } fn open_file_(path: &Path) -> io::Result { - OpenOptions::new() + let mut file = OpenOptions::new() .read(true) .create(true) + .write(true) .truncate(false) - .append(true) - .open(path) + .open(path)?; + file.seek(SeekFrom::End(0))?; + Ok(file) } + fn file(&self) -> &File; + fn mut_file(&mut self) -> &mut File; + fn file_set_len(&mut self, len: u64) -> Result<()> { - let mut file = self.open_file()?; - Self::file_set_len_(&mut file, len)?; - self.update_mmap(file) + let file = self.mut_file(); + Self::file_set_len_(file, len)?; + self.update_mmap() } fn file_set_len_(file: &mut File, len: u64) -> Result<()> { file.set_len(len)?; @@ -94,29 +122,31 @@ where } fn file_write_all(&mut self, buf: &[u8]) -> Result<()> { - let mut file = self.open_file()?; + let file = self.mut_file(); file.write_all(buf)?; - self.update_mmap(file) + self.update_mmap() } fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> { - let mut file = self.open_file()?; - Self::file_set_len_(&mut file, len)?; + let file = self.mut_file(); + Self::file_set_len_(file, len)?; file.write_all(buf)?; - self.update_mmap(file) + self.update_mmap() } + fn reset(&mut self) -> Result<()>; + #[inline] - fn reset(&mut self) -> Result<()> { - self.file_truncate_and_write_all(0, &[]) + fn reset_(&mut self) -> Result<()> { + self.file_truncate_and_write_all(HEADER_OFFSET as u64, &[]) } - fn new_mmap(file: File) -> Result> { - Ok(Arc::new(unsafe { Mmap::map(&file)? })) + fn new_mmap(file: &File) -> Result> { + Ok(Arc::new(unsafe { Mmap::map(file)? })) } - fn update_mmap(&mut self, file: File) -> Result<()> { - let mmap = Self::new_mmap(file)?; + fn update_mmap(&mut self) -> Result<()> { + let mmap = Self::new_mmap(self.file())?; self.mmap().store(mmap); Ok(()) } @@ -139,28 +169,9 @@ where fn truncate_if_needed(&mut self, index: I) -> Result<()>; - #[inline] - fn path_vec(&self) -> PathBuf { - Self::path_vec_(&self.path()) - } - #[inline] - fn path_vec_(path: &Path) -> PathBuf { - path.join("vec") - } - - #[inline] - fn path_version_(path: &Path) -> PathBuf { - path.join("version") - } - - #[inline] - fn path_compressed_(path: &Path) -> PathBuf { - path.join("compressed") - } - fn modified_time_(&self) -> Result { Ok(self - .path_vec() + .file() .metadata()? .modified()? .duration_since(time::UNIX_EPOCH)?) diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index a1cef56c7..d0c76ed6a 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -14,8 +14,8 @@ use zstd::DEFAULT_COMPRESSION_LEVEL; use crate::{ AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - CompressedPageMetadata, CompressedPagesMetadata, GenericStoredVec, RawVec, StoredIndex, - StoredType, UnsafeSlice, + CompressedPageMetadata, CompressedPagesMetadata, GenericStoredVec, HEADER_OFFSET, Header, + RawVec, StoredIndex, StoredType, UnsafeSlice, }; const ONE_KIB: usize = 1024; @@ -23,7 +23,7 @@ const ONE_MIB: usize = ONE_KIB * ONE_KIB; pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB; pub const MAX_PAGE_SIZE: usize = 64 * ONE_KIB; -const VERSION: Version = Version::ONE; +const VERSION: Version = Version::TWO; #[derive(Debug)] pub struct CompressedVec { @@ -41,47 +41,38 @@ where pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; /// Same as import but will reset the folder under certain errors, so be careful ! - pub fn forced_import(path: &Path, name: &str, mut version: Version) -> Result { + pub fn forced_import(parent: &Path, name: &str, mut version: Version) -> Result { version = version + VERSION; - - let res = Self::import(path, name, version); + let res = Self::import(parent, name, version); match res { - Err(Error::WrongEndian) - | Err(Error::DifferentVersion { .. }) - | Err(Error::DifferentCompressionMode) => { - fs::remove_dir_all(path)?; - Self::import(path, name, version) + Err(Error::DifferentCompressionMode) + | Err(Error::WrongEndian) + | Err(Error::WrongLength) + | Err(Error::DifferentVersion { .. }) => { + let path = Self::path_(parent, name); + fs::remove_file(path)?; + Self::import(parent, name, version) } _ => res, } } - pub fn import(path: &Path, name: &str, version: Version) -> Result { + pub fn import(parent: &Path, name: &str, version: Version) -> Result { + let inner = RawVec::import(parent, name, version)?; + let pages_meta = { - let path = path.join(name).join(I::to_string()); - - let vec_exists = fs::exists(Self::path_vec_(&path)).is_ok_and(|b| b); - let compressed_path = Self::path_compressed_(&path); - let compressed_exists = fs::exists(&compressed_path).is_ok_and(|b| b); - - if vec_exists && !compressed_exists { - return Err(Error::DifferentCompressionMode); + let path = inner + .folder() + .join(format!("{}-pages-meta", I::to_string())); + if inner.is_empty() { + let _ = fs::remove_file(&path); } - - if !vec_exists && !compressed_exists { - fs::create_dir_all(&path)?; - File::create(&compressed_path)?; - } - Arc::new(ArcSwap::new(Arc::new(CompressedPagesMetadata::read( &path, )?))) }; - Ok(Self { - inner: RawVec::import(path, name, version)?, - pages_meta, - }) + Ok(Self { inner, pages_meta }) } fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result> { @@ -182,11 +173,31 @@ where .cloned()) } + fn header(&self) -> &Header { + self.inner.header() + } + + fn mut_header(&mut self) -> &mut Header { + self.inner.mut_header() + } + #[inline] fn mmap(&self) -> &ArcSwap { self.inner.mmap() } + fn parent(&self) -> &Path { + self.inner.parent() + } + + fn file(&self) -> &File { + self.inner.file() + } + + fn mut_file(&mut self) -> &mut File { + self.inner.mut_file() + } + #[inline] fn stored_len(&self) -> usize { Self::stored_len__(&self.pages_meta.load()) @@ -211,6 +222,8 @@ where } fn flush(&mut self) -> Result<()> { + self.inner.write_header_if_needed()?; + let pushed_len = self.pushed_len(); if pushed_len == 0 { @@ -266,11 +279,12 @@ where } else { 0 }; + let offsetted_start = start + HEADER_OFFSET as u64; let bytes_len = compressed_bytes.len() as u32; let values_len = *values_len as u32; - let page = CompressedPageMetadata::new(start, bytes_len, values_len); + let page = CompressedPageMetadata::new(offsetted_start, bytes_len, values_len); pages_meta.push(page_index, page); }); @@ -298,7 +312,7 @@ where pages_meta.truncate(0); pages_meta.write()?; self.pages_meta.store(Arc::new(pages_meta)); - self.file_truncate_and_write_all(0, &[]) + self.reset_() } fn truncate_if_needed(&mut self, index: I) -> Result<()> { diff --git a/crates/brk_vec/src/variants/eager.rs b/crates/brk_vec/src/variants/eager.rs index 5cf4d8a17..0c621843d 100644 --- a/crates/brk_vec/src/variants/eager.rs +++ b/crates/brk_vec/src/variants/eager.rs @@ -5,7 +5,6 @@ use std::{ fmt::Debug, ops::{Add, Div, Mul}, path::{Path, PathBuf}, - sync::Arc, time::Duration, }; @@ -29,10 +28,8 @@ const MAX_CACHE_SIZE: usize = 210 * ONE_MIB; const DCA_AMOUNT: Dollars = Dollars::mint(100.0); #[derive(Debug, Clone)] -pub struct EagerVec { - computed_version: Arc>>, - inner: StoredVec, -} +pub struct EagerVec(StoredVec); +// computed_version: Arc>>, impl EagerVec where @@ -47,12 +44,9 @@ where version: Version, format: Format, ) -> Result { - let inner = StoredVec::forced_import(path, value_name, version, format)?; - - Ok(Self { - computed_version: Arc::new(ArcSwap::from_pointee(None)), - inner, - }) + Ok(Self(StoredVec::forced_import( + path, value_name, version, format, + )?)) } fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> { @@ -63,7 +57,7 @@ where if !blocked { exit.block(); } - self.inner.truncate_if_needed(index)?; + self.0.truncate_if_needed(index)?; if !blocked { exit.release(); } @@ -80,11 +74,11 @@ where if ord == Ordering::Greater { self.safe_truncate_if_needed(index, exit)?; } - self.inner.push(value); + self.0.push(value); } } - if self.inner.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { + if self.0.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { self.safe_flush(exit) } else { Ok(()) @@ -99,7 +93,7 @@ where if !blocked { exit.block(); } - self.inner.flush()?; + self.0.flush()?; if !blocked { exit.release(); } @@ -107,33 +101,34 @@ where } pub fn path(&self) -> PathBuf { - self.inner.path() + self.0.path() } pub fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { - self.inner.get_or_read(index, mmap) + self.0.get_or_read(index, mmap) } pub fn mmap(&self) -> &ArcSwap { - self.inner.mmap() + self.0.mmap() } pub fn inner_version(&self) -> Version { - self.inner.version() + self.0.version() } - #[inline] - fn path_computed_version(&self) -> PathBuf { - self.inner.path().join("computed_version") + fn update_computed_version(&mut self, computed_version: Version) { + self.0 + .mut_header() + .update_computed_version(computed_version); } pub fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> { - let path = self.path_computed_version(); - if version.validate(path.as_ref()).is_err() { - self.inner.reset()?; + if version != self.0.header().computed_version() { + self.update_computed_version(version); + if !self.is_empty() { + self.0.reset()?; + } } - version.write(path.as_ref())?; - self.computed_version.store(Arc::new(Some(version))); if self.is_empty() { info!( @@ -157,9 +152,7 @@ where where F: FnMut(I) -> (I, T), { - self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + version, - )?; + self.validate_computed_version_or_reset_file(Version::ZERO + self.0.version() + version)?; let index = max_from.min(I::from(self.len())); (index.to_usize()?..to).try_for_each(|i| { @@ -216,7 +209,7 @@ where F: FnMut((A, B, &Self)) -> (I, T), { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + other.version(), + Version::ZERO + self.0.version() + other.version(), )?; let index = max_from.min(A::from(self.len())); @@ -239,7 +232,7 @@ where T: Add, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + added.version() + adder.version(), + Version::ZERO + self.0.version() + added.version() + adder.version(), )?; let index = max_from.min(I::from(self.len())); @@ -265,7 +258,7 @@ where T: CheckedSub, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + subtracted.version() + subtracter.version(), + Version::ZERO + self.0.version() + subtracted.version() + subtracter.version(), )?; let index = max_from.min(I::from(self.len())); @@ -294,7 +287,7 @@ where T2: StoredType, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + source.version(), + Version::ZERO + self.0.version() + source.version(), )?; let index = max_from.min(I::from(self.len())); @@ -333,7 +326,7 @@ where T: From, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + multiplied.version() + multiplier.version(), + Version::ZERO + self.0.version() + multiplied.version() + multiplier.version(), )?; let index = max_from.min(I::from(self.len())); @@ -416,7 +409,7 @@ where T: From, { self.validate_computed_version_or_reset_file( - Version::ONE + self.inner.version() + divided.version() + divider.version(), + Version::ONE + self.0.version() + divided.version() + divider.version(), )?; let index = max_from.min(I::from(self.len())); @@ -453,7 +446,7 @@ where T: From, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + ath.version() + close.version(), + Version::ZERO + self.0.version() + ath.version() + close.version(), )?; let index = max_from.min(I::from(self.len())); @@ -483,12 +476,11 @@ where T: StoredIndex, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + other.version(), + Version::ZERO + self.0.version() + other.version(), )?; let index = max_from.min( - VecIterator::last(self.inner.into_iter()) - .map_or_else(T::default, |(_, v)| v.into_inner()), + VecIterator::last(self.0.into_iter()).map_or_else(T::default, |(_, v)| v.into_inner()), ); let mut prev_i = None; other.iter_at(index).try_for_each(|(v, i)| -> Result<()> { @@ -518,10 +510,7 @@ where T: StoredIndex, { self.validate_computed_version_or_reset_file( - Version::ZERO - + self.inner.version() - + first_indexes.version() - + indexes_count.version(), + Version::ZERO + self.0.version() + first_indexes.version() + indexes_count.version(), )?; let mut indexes_count_iter = indexes_count.iter(); @@ -613,10 +602,7 @@ where >::Error: error::Error + 'static, { self.validate_computed_version_or_reset_file( - Version::ZERO - + self.inner.version() - + first_indexes.version() - + other_to_else.version(), + Version::ZERO + self.0.version() + first_indexes.version() + other_to_else.version(), )?; let mut other_iter = first_indexes.iter(); @@ -654,10 +640,7 @@ where A: StoredIndex + StoredType, { self.validate_computed_version_or_reset_file( - Version::ZERO - + self.inner.version() - + self_to_other.version() - + other_to_self.version(), + Version::ZERO + self.0.version() + self_to_other.version() + other_to_self.version(), )?; let mut other_to_self_iter = other_to_self.iter(); @@ -686,10 +669,7 @@ where T2: StoredIndex + StoredType, { self.validate_computed_version_or_reset_file( - Version::ZERO - + self.inner.version() - + first_indexes.version() - + indexes_count.version(), + Version::ZERO + self.0.version() + first_indexes.version() + indexes_count.version(), )?; let mut indexes_count_iter = indexes_count.iter(); @@ -721,7 +701,7 @@ where T: From + Add, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + others.iter().map(|v| v.version()).sum(), + Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(), )?; if others.is_empty() { @@ -756,7 +736,7 @@ where T: From + Add + Ord, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + others.iter().map(|v| v.version()).sum(), + Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(), )?; if others.is_empty() { @@ -793,7 +773,7 @@ where T: From + Add + Ord, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + others.iter().map(|v| v.version()).sum(), + Version::ZERO + self.0.version() + others.iter().map(|v| v.version()).sum(), )?; if others.is_empty() { @@ -849,12 +829,13 @@ where f32: From + From, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + source.version(), + Version::ONE + self.0.version() + source.version(), )?; let index = max_from.min(I::from(self.len())); let mut prev = None; let min_prev_i = min_i.unwrap_or_default().unwrap_to_usize(); + let mut other_iter = source.iter(); source.iter_at(index).try_for_each(|(i, value)| { let value = value.into_inner(); @@ -867,11 +848,21 @@ where T::from(0.0) }); } - let len = (i.unwrap_to_usize() - min_prev_i + 1).min(sma); - let sma = T::from( - (f32::from(prev.clone().unwrap()) * (len - 1) as f32 + f32::from(value)) - / len as f32, - ); + + let processed_values_count = i.unwrap_to_usize() - min_prev_i + 1; + let len = (processed_values_count).min(sma); + + let value = f32::from(value); + + let sma = T::from(if processed_values_count > sma { + let prev_sum = f32::from(prev.clone().unwrap()) * len as f32; + let value_to_subtract = f32::from( + other_iter.unwrap_get_inner_(i.unwrap_to_usize().checked_sub(sma).unwrap()), + ); + (prev_sum - value_to_subtract + value) / len as f32 + } else { + (f32::from(prev.clone().unwrap()) * (len - 1) as f32 + value) / len as f32 + }); prev.replace(sma.clone()); self.forced_push_at(i, sma, exit) @@ -897,7 +888,7 @@ where T: From, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + source.version(), + Version::ZERO + self.0.version() + source.version(), )?; let index = max_from.min(I::from(self.len())); @@ -928,7 +919,7 @@ where T: CheckedSub + Default, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + source.version(), + Version::ZERO + self.0.version() + source.version(), )?; let index = max_from.min(I::from(self.len())); @@ -961,7 +952,7 @@ where T: From, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + source.version(), + Version::ZERO + self.0.version() + source.version(), )?; let index = max_from.min(I::from(self.len())); @@ -997,7 +988,7 @@ where T: From, { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + percentage_returns.version(), + Version::ZERO + self.0.version() + percentage_returns.version(), )?; if days % 365 != 0 { @@ -1056,7 +1047,7 @@ impl EagerVec { exit: &Exit, ) -> Result<()> { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + closes.version(), + Version::ZERO + self.0.version() + closes.version(), )?; let mut other_iter = closes.iter(); @@ -1105,7 +1096,7 @@ impl EagerVec { exit: &Exit, ) -> Result<()> { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + closes.version(), + Version::ZERO + self.0.version() + closes.version(), )?; let mut prev = None; @@ -1146,7 +1137,7 @@ impl EagerVec { exit: &Exit, ) -> Result<()> { self.validate_computed_version_or_reset_file( - Version::ONE + self.inner.version() + stacks.version(), + Version::ONE + self.0.version() + stacks.version(), )?; let index = max_from.min(DateIndex::from(self.len())); @@ -1177,7 +1168,7 @@ impl EagerVec { exit: &Exit, ) -> Result<()> { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + stacks.version(), + Version::ZERO + self.0.version() + stacks.version(), )?; let index = max_from.min(DateIndex::from(self.len())); @@ -1209,7 +1200,7 @@ where exit: &Exit, ) -> Result<()> { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + sats.version(), + Version::ZERO + self.0.version() + sats.version(), )?; let index = max_from.min(I::from(self.len())); @@ -1234,7 +1225,7 @@ where exit: &Exit, ) -> Result<()> { self.validate_computed_version_or_reset_file( - Version::ZERO + self.inner.version() + bitcoin.version(), + Version::ZERO + self.0.version() + bitcoin.version(), )?; let mut price_iter = price.iter(); @@ -1260,7 +1251,7 @@ where // ) -> Result<()> { // self.validate_computed_version_or_reset_file( // Version::ZERO -// + self.inner.version() +// + self.0.version() // + bitcoin.version() // + i_to_height.version() // + price.version(), @@ -1289,7 +1280,7 @@ where type IntoIter = StoredVecIterator<'a, I, T>; fn into_iter(self) -> Self::IntoIter { - self.inner.into_iter() + self.0.into_iter() } } @@ -1300,28 +1291,22 @@ where { #[inline] fn version(&self) -> Version { - self.computed_version - .load() - .or_else(|| { - dbg!(self.path()); - None - }) - .unwrap() + self.0.header().computed_version() } #[inline] fn name(&self) -> &str { - self.inner.name() + self.0.name() } #[inline] fn len(&self) -> usize { - self.inner.len() + self.0.len() } #[inline] fn modified_time(&self) -> Result { - self.inner.modified_time() + self.0.modified_time() } #[inline] @@ -1345,7 +1330,7 @@ where I: StoredIndex, T: StoredType + 'a, { - Box::new(self.inner.into_iter()) + Box::new(self.0.into_iter()) } } diff --git a/crates/brk_vec/src/variants/indexed.rs b/crates/brk_vec/src/variants/indexed.rs index ebe81f64d..99436dcf2 100644 --- a/crates/brk_vec/src/variants/indexed.rs +++ b/crates/brk_vec/src/variants/indexed.rs @@ -1,25 +1,17 @@ -use std::{ - cmp::Ordering, - fmt::Debug, - path::{Path, PathBuf}, - time::Duration, -}; +use std::{cmp::Ordering, fmt::Debug, path::Path, time::Duration}; use arc_swap::ArcSwap; use brk_core::{Error, Height, Result, Value, Version}; use crate::{ AnyCollectableVec, AnyIterableVec, AnyVec, BoxedVecIterator, CollectableVec, Format, - GenericStoredVec, Mmap, StoredIndex, StoredType, StoredVec, + GenericStoredVec, Header, Mmap, StoredIndex, StoredType, StoredVec, }; use super::StoredVecIterator; #[derive(Debug, Clone)] -pub struct IndexedVec { - height: Option, - inner: StoredVec, -} +pub struct IndexedVec(StoredVec); impl IndexedVec where @@ -28,26 +20,23 @@ where { pub fn forced_import( path: &Path, - value_name: &str, + name: &str, version: Version, format: Format, ) -> Result { - let inner = StoredVec::forced_import(path, value_name, version, format)?; - - Ok(Self { - height: Height::try_from(Self::path_height_(&inner.path()).as_path()).ok(), - inner, - }) + Ok(Self( + StoredVec::forced_import(path, name, version, format).unwrap(), + )) } #[inline] pub fn get_or_read(&self, index: I, mmap: &Mmap) -> Result>> { - self.inner.get_or_read(index, mmap) + self.0.get_or_read(index, mmap) } #[inline] pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { - let len = self.inner.len(); + let len = self.0.len(); match len.cmp(&index.to_usize()?) { Ordering::Greater => { // dbg!(len, index, &self.pathbuf); @@ -55,46 +44,42 @@ where Ok(()) } Ordering::Equal => { - self.inner.push(value); + self.0.push(value); Ok(()) } Ordering::Less => { - dbg!(index, value, len, self.path_height()); + dbg!(index, value, len, self.0.header()); Err(Error::IndexTooHigh) } } } + fn update_height(&mut self, height: Height) { + self.0.mut_header().update_height(height); + } + pub fn truncate_if_needed(&mut self, index: I, height: Height) -> Result<()> { - if self.height.is_none_or(|self_height| self_height != height) { - height.write(&self.path_height())?; - } - self.inner.truncate_if_needed(index)?; + self.update_height(height); + self.0.truncate_if_needed(index)?; Ok(()) } pub fn flush(&mut self, height: Height) -> Result<()> { - height.write(&self.path_height())?; - self.inner.flush() + self.update_height(height); + self.0.flush() + } + + pub fn header(&self) -> &Header { + self.0.header() } pub fn mmap(&self) -> &ArcSwap { - self.inner.mmap() + self.0.mmap() } #[inline] pub fn hasnt(&self, index: I) -> Result { - self.inner.has(index).map(|b| !b) - } - - pub fn height(&self) -> brk_core::Result { - Height::try_from(self.path_height().as_path()) - } - fn path_height(&self) -> PathBuf { - Self::path_height_(&self.inner.path()) - } - fn path_height_(path: &Path) -> PathBuf { - path.join("height") + self.0.has(index).map(|b| !b) } } @@ -105,22 +90,22 @@ where { #[inline] fn version(&self) -> Version { - self.inner.version() + self.0.version() } #[inline] fn name(&self) -> &str { - self.inner.name() + self.0.name() } #[inline] fn len(&self) -> usize { - self.inner.len() + self.0.len() } #[inline] fn modified_time(&self) -> Result { - self.inner.modified_time() + self.0.modified_time() } #[inline] @@ -135,7 +120,7 @@ where } pub trait AnyIndexedVec: AnyVec { - fn height(&self) -> brk_core::Result; + fn height(&self) -> Height; fn flush(&mut self, height: Height) -> Result<()>; } @@ -144,8 +129,8 @@ where I: StoredIndex, T: StoredType, { - fn height(&self) -> brk_core::Result { - self.height() + fn height(&self) -> Height { + self.0.header().height() } fn flush(&mut self, height: Height) -> Result<()> { @@ -162,7 +147,7 @@ where type IntoIter = StoredVecIterator<'a, I, T>; fn into_iter(self) -> Self::IntoIter { - self.inner.into_iter() + self.0.into_iter() } } diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index 0ee103b44..d8c565896 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -1,5 +1,6 @@ use std::{ - fs, + fs::{self, File}, + io, marker::PhantomData, mem, path::{Path, PathBuf}, @@ -14,14 +15,17 @@ use rayon::prelude::*; use crate::{ AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - GenericStoredVec, StoredIndex, StoredType, UnsafeSlice, + Format, GenericStoredVec, HEADER_OFFSET, Header, StoredIndex, StoredType, UnsafeSlice, }; +const VERSION: Version = Version::ONE; + #[derive(Debug)] pub struct RawVec { - version: Version, + header: Header, parent: PathBuf, name: String, + file: Option, // Consider Arc>> for dataraces when reorg ? mmap: Arc>, pushed: Vec, @@ -34,40 +38,58 @@ where T: StoredType, { /// Same as import but will reset the folder under certain errors, so be careful ! - pub fn forced_import(path: &Path, name: &str, version: Version) -> Result { - let res = Self::import(path, name, version); + pub fn forced_import(parent: &Path, name: &str, mut version: Version) -> Result { + version = version + VERSION; + let res = Self::import(parent, name, version); match res { - Err(Error::WrongEndian) | Err(Error::DifferentVersion { .. }) => { - fs::remove_dir_all(path)?; - Self::import(path, name, version) - } + // Err(Error::DifferentCompressionMode) + // | Err(Error::WrongEndian) + // | Err(Error::WrongLength) + // | Err(Error::DifferentVersion { .. }) => { + // let path = Self::path_(parent, name); + // fs::remove_file(path)?; + // Self::import(parent, name, version) + // } _ => res, } } - pub fn import(path: &Path, name: &str, version: Version) -> Result { - let (version, mmap) = { - let path = path.join(name).join(I::to_string()); - - fs::create_dir_all(&path)?; - - let version_path = Self::path_version_(&path); - - if !version.validate(version_path.as_ref())? { - version.write(version_path.as_ref())?; + pub fn import(parent: &Path, name: &str, version: Version) -> Result { + let path = Self::path_(parent, name); + let (file, mmap, header) = match Self::open_file_(&path) { + Ok(mut file) => { + if file.metadata()?.len() == 0 { + let header = Header::create_and_write(&mut file, version, Format::Raw)?; + let mmap = Self::new_mmap(&file)?; + (file, mmap, header) + } else { + let mmap = Self::new_mmap(&file)?; + // dbg!(&mmap[..]); + let header = Header::import_and_verify(&mmap, version, Format::Raw)?; + // dbg!((&header, name, I::to_string())); + (file, mmap, header) + } } - - let file = Self::open_file_(Self::path_vec_(&path).as_path())?; - let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?)); - - (version, mmap) + Err(e) => match e.kind() { + io::ErrorKind::NotFound => { + fs::create_dir_all(Self::folder_(parent, name))?; + let mut file = Self::open_file_(&path)?; + let header = Header::create_and_write(&mut file, version, Format::Raw)?; + let mmap = Self::new_mmap(&file)?; + (file, mmap, header) + } + _ => return Err(e.into()), + }, }; + let mmap = Arc::new(ArcSwap::new(mmap)); + Ok(Self { mmap, - version, + header, + file: Some(file), name: name.to_string(), - parent: path.to_owned(), + parent: parent.to_owned(), pushed: vec![], phantom: PhantomData, }) @@ -89,6 +111,10 @@ where iter.set_(i); iter } + + pub fn write_header_if_needed(&mut self) -> io::Result<()> { + self.header.write_if_needed(self.file.as_mut().unwrap()) + } } impl GenericStoredVec for RawVec @@ -98,25 +124,43 @@ where { #[inline] fn read_(&self, index: usize, mmap: &Mmap) -> Result> { - let index = index * Self::SIZE_OF_T; + let index = index * Self::SIZE_OF_T + HEADER_OFFSET; let slice = &mmap[index..(index + Self::SIZE_OF_T)]; T::try_read_from_bytes(slice) .map(|v| Some(v)) .map_err(Error::from) } + fn header(&self) -> &Header { + &self.header + } + + fn mut_header(&mut self) -> &mut Header { + &mut self.header + } + #[inline] fn mmap(&self) -> &ArcSwap { &self.mmap } + #[inline] + fn file(&self) -> &File { + self.file.as_ref().unwrap() + } + + #[inline] + fn mut_file(&mut self) -> &mut File { + self.file.as_mut().unwrap() + } + #[inline] fn stored_len(&self) -> usize { self.stored_len_(&self.mmap.load()) } #[inline] fn stored_len_(&self, mmap: &Mmap) -> usize { - mmap.len() / Self::SIZE_OF_T + (mmap.len() - HEADER_OFFSET) / Self::SIZE_OF_T } #[inline] @@ -129,11 +173,13 @@ where } #[inline] - fn path(&self) -> PathBuf { - self.parent.join(self.name()).join(I::to_string()) + fn parent(&self) -> &Path { + &self.parent } fn flush(&mut self) -> Result<()> { + self.write_header_if_needed()?; + let pushed_len = self.pushed_len(); if pushed_len == 0 { @@ -172,12 +218,16 @@ where return Ok(()); } - let len = index * Self::SIZE_OF_T; + let len = index * Self::SIZE_OF_T + HEADER_OFFSET; self.file_set_len(len as u64)?; Ok(()) } + + fn reset(&mut self) -> Result<()> { + self.reset_() + } } impl AnyVec for RawVec @@ -187,12 +237,12 @@ where { #[inline] fn version(&self) -> Version { - self.version + self.header.vec_version() } #[inline] fn name(&self) -> &str { - self.name.as_str() + &self.name } #[inline] @@ -219,9 +269,10 @@ where impl Clone for RawVec { fn clone(&self) -> Self { Self { - version: self.version, + header: self.header.clone(), parent: self.parent.clone(), name: self.name.clone(), + file: None, mmap: self.mmap.clone(), pushed: vec![], phantom: PhantomData, diff --git a/crates/brk_vec/src/variants/stored.rs b/crates/brk_vec/src/variants/stored.rs index 7e9493994..20874aabb 100644 --- a/crates/brk_vec/src/variants/stored.rs +++ b/crates/brk_vec/src/variants/stored.rs @@ -1,4 +1,5 @@ use std::{ + fs::File, path::{Path, PathBuf}, time::Duration, }; @@ -9,7 +10,7 @@ use memmap2::Mmap; use crate::{ AnyCollectableVec, AnyIterableVec, AnyVec, BaseVecIterator, BoxedVecIterator, CollectableVec, - Format, GenericStoredVec, StoredIndex, StoredType, + Format, GenericStoredVec, Header, StoredIndex, StoredType, }; use super::{CompressedVec, CompressedVecIterator, RawVec, RawVecIterator}; @@ -31,8 +32,6 @@ where version: Version, format: Format, ) -> Result { - // let path = I::path(path, value_name); - if version == Version::ZERO { dbg!(path, name); panic!("Version must be at least 1, can't verify endianess otherwise"); @@ -61,6 +60,22 @@ where } } + #[inline] + fn header(&self) -> &Header { + match self { + StoredVec::Raw(v) => v.header(), + StoredVec::Compressed(v) => v.header(), + } + } + + #[inline] + fn mut_header(&mut self) -> &mut Header { + match self { + StoredVec::Raw(v) => v.mut_header(), + StoredVec::Compressed(v) => v.mut_header(), + } + } + #[inline] fn mmap(&self) -> &ArcSwap { match self { @@ -69,6 +84,30 @@ where } } + #[inline] + fn parent(&self) -> &Path { + match self { + StoredVec::Raw(v) => v.parent(), + StoredVec::Compressed(v) => v.parent(), + } + } + + #[inline] + fn file(&self) -> &File { + match self { + StoredVec::Raw(v) => v.file(), + StoredVec::Compressed(v) => v.file(), + } + } + + #[inline] + fn mut_file(&mut self) -> &mut File { + match self { + StoredVec::Raw(v) => v.mut_file(), + StoredVec::Compressed(v) => v.mut_file(), + } + } + #[inline] fn stored_len(&self) -> usize { match self { @@ -120,6 +159,13 @@ where StoredVec::Compressed(v) => v.truncate_if_needed(index), } } + + fn reset(&mut self) -> Result<()> { + match self { + StoredVec::Raw(v) => v.reset(), + StoredVec::Compressed(v) => v.reset(), + } + } } impl AnyVec for StoredVec