global: snapshot

This commit is contained in:
nym21
2025-03-01 15:22:34 +01:00
parent 1b93ccf608
commit 6d7ff38cf2
40 changed files with 936 additions and 768 deletions

105
Cargo.lock generated
View File

@@ -117,9 +117,9 @@ checksum = "7330592adf847ee2e3513587b4db2db410a0d751378654e7e993d9adcbe5c795"
[[package]]
name = "async-compression"
version = "0.4.18"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
checksum = "310c9bcae737a48ef5cdee3174184e6d548b292739ede61a1f955ef76a738861"
dependencies = [
"brotli",
"flate2",
@@ -318,9 +318,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "2.8.0"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
[[package]]
name = "brk"
@@ -333,6 +333,7 @@ dependencies = [
"brk_indexer",
"brk_logger",
"brk_parser",
"brk_query",
"brk_server",
"brk_vec",
]
@@ -340,6 +341,17 @@ dependencies = [
[[package]]
name = "brk_cli"
version = "0.0.2"
dependencies = [
"brk_computer",
"brk_core",
"brk_exit",
"brk_indexer",
"brk_logger",
"brk_parser",
"brk_query",
"brk_server",
"clap",
]
[[package]]
name = "brk_computer"
@@ -370,7 +382,7 @@ dependencies = [
"rlimit",
"serde",
"serde_bytes",
"zerocopy 0.8.20",
"zerocopy 0.8.21",
]
[[package]]
@@ -410,7 +422,7 @@ dependencies = [
"fjall",
"log",
"rayon",
"zerocopy 0.8.20",
"zerocopy 0.8.21",
]
[[package]]
@@ -434,7 +446,16 @@ dependencies = [
"rayon",
"serde",
"serde_json",
"zerocopy 0.8.20",
"zerocopy 0.8.21",
]
[[package]]
name = "brk_query"
version = "0.0.2"
dependencies = [
"brk_computer",
"brk_indexer",
"clap",
]
[[package]]
@@ -443,8 +464,10 @@ version = "0.0.2"
dependencies = [
"axum",
"brk_computer",
"brk_exit",
"brk_indexer",
"brk_logger",
"brk_parser",
"brk_vec",
"color-eyre",
"derive_deref",
@@ -466,7 +489,7 @@ dependencies = [
"rayon",
"serde",
"serde_json",
"zerocopy 0.8.20",
"zerocopy 0.8.21",
]
[[package]]
@@ -528,9 +551,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.15"
version = "1.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c736e259eea577f443d5c86c304f9f4ae0295c43f3ba05c21f1d66b5f06001af"
checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c"
dependencies = [
"jobserver",
"libc",
@@ -549,6 +572,46 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "clap"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.98",
]
[[package]]
name = "clap_lex"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]]
name = "color-eyre"
version = "0.6.3"
@@ -934,6 +997,12 @@ dependencies = [
"allocator-api2",
]
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hex-conservative"
version = "0.2.1"
@@ -2225,6 +2294,12 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d08889ec5408683408db66ad89e0e1f93dff55c73a4ccc71c427d5b277ee47e6"
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
version = "1.0.109"
@@ -2646,11 +2721,11 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.20"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dde3bb8c68a8f3f1ed4ac9221aad6b10cece3e60a8e2ea54a6a2dec806d0084c"
checksum = "dcf01143b2dd5d134f11f545cf9f1431b13b749695cb33bcce051e7568f99478"
dependencies = [
"zerocopy-derive 0.8.20",
"zerocopy-derive 0.8.21",
]
[[package]]
@@ -2666,9 +2741,9 @@ dependencies = [
[[package]]
name = "zerocopy-derive"
version = "0.8.20"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eea57037071898bf96a6da35fd626f4f27e9cee3ead2a6c703cf09d472b2e700"
checksum = "712c8386f4f4299382c9abee219bee7084f78fb939d88b6840fcc1320d5f6da2"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -17,9 +17,11 @@ brk_fetcher = { version = "0", path = "crates/brk_fetcher" }
brk_indexer = { version = "0", path = "crates/brk_indexer" }
brk_logger = { version = "0", path = "crates/brk_logger" }
brk_parser = { version = "0", path = "crates/brk_parser" }
brk_query = { version = "0", path = "crates/brk_query" }
brk_server = { version = "0", path = "crates/brk_server" }
brk_vec = { version = "0", path = "crates/brk_vec", features = ["json"] }
brk_vec = { version = "0", path = "crates/brk_vec" }
byteview = "0.5.4"
clap = { version = "4.5.31", features = ["derive"] }
color-eyre = "0.6.3"
derive_deref = "1.1.1"
fjall = "2.6.7"
@@ -29,4 +31,4 @@ minreq = { version = "2.13.2", features = ["https", "serde_json"] }
rayon = "1.10.0"
serde = { version = "1.0.218", features = ["derive"] }
serde_json = { version = "1.0.139", features = ["float_roundtrip"] }
zerocopy = { version = "0.8.20", features = ["derive"] }
zerocopy = { version = "0.8.21", features = ["derive"] }

View File

@@ -63,6 +63,12 @@ A very fast Bitcoin Core block parser and iterator built on top of bitcoin-rust.
> Status: ✅
### `brk_query`
A library that finds requested datasets.
> Status: ⚠️
### `brk_server`
A server that serves Bitcoin data and swappable front-ends, built on top of brk_indexer, brk_fetcher and brk_computer.
@@ -117,6 +123,11 @@ Feel free to open an issue if you want to add another instance
## Setup
### Hardware
- Latest base-model Mac mini
- External SSD
### Requirements
- At least 16 GB of RAM
@@ -124,12 +135,8 @@ Feel free to open an issue if you want to add another instance
- Recommended: Rated at 3 GB/s (Thunderbolt 4 speed)
- A running instance of bitcoin-core
- Example: `bitcoind -datadir="$HOME/.bitcoin" -blocksonly`
- Git
- Unix based operating system (Mac OS or Linux)
- Ubuntu users need to install `open-ssl` via `sudo apt install libssl-dev pkg-config`
- Mac OS:
- Disable Spotlight or exclude the `--kibodir` folder from it
- Don't use Time Machine or exclude the `--kibodir` folder (especially needed for local snapshots)
### Build

View File

@@ -16,6 +16,7 @@ full = [
"indexer",
"logger",
"parser",
"query",
"server",
"vec",
]
@@ -26,6 +27,7 @@ fetcher = ["brk_fetcher"]
indexer = ["brk_indexer"]
logger = ["brk_logger"]
parser = ["brk_parser"]
query = ["brk_query"]
server = ["brk_server"]
vec = ["brk_vec"]
@@ -37,6 +39,7 @@ brk_fetcher = { workspace = true, optional = true }
brk_indexer = { workspace = true, optional = true }
brk_logger = { workspace = true, optional = true }
brk_parser = { workspace = true, optional = true }
brk_query = { workspace = true, optional = true }
brk_server = { workspace = true, optional = true }
brk_vec = { workspace = true, optional = true }

View File

@@ -42,6 +42,12 @@ pub mod parser {
pub use brk_parser::*;
}
/// Re-exports every public item of the `brk_query` crate.
/// Only compiled when the `query` feature is enabled.
#[cfg(feature = "query")]
pub mod query {
#[doc(inline)]
pub use brk_query::*;
}
#[cfg(feature = "server")]
pub mod server {
#[doc(inline)]

View File

@@ -7,3 +7,12 @@ license.workspace = true
repository.workspace = true
[dependencies]
brk_core = { workspace = true }
brk_computer = { workspace = true }
brk_exit = { workspace = true }
brk_indexer = { workspace = true }
brk_logger = { workspace = true }
brk_parser = { workspace = true }
brk_query = { workspace = true }
brk_server = { workspace = true }
clap = { workspace = true }

View File

@@ -1,2 +0,0 @@
#![doc = include_str!("../README.md")]

View File

@@ -1,3 +1,40 @@
fn main() {
println!("Hello, world!");
use clap::{Args, Parser, Subcommand};
// Top-level command-line arguments, parsed through clap's derive API.
// `propagate_version = true` asks clap to make the version flag available on
// subcommands as well.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the program's output.
#[derive(Parser)]
#[command(version, about)]
#[command(propagate_version = true)]
struct Cli {
// The subcommand selected on the command line.
#[command(subcommand)]
command: Commands,
}
// Subcommands understood by the CLI; each variant carries its own argument struct.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments on variants into subcommand help text.
#[derive(Subcommand)]
enum Commands {
// Arguments of the `Run` subcommand.
Run(RunArgs),
// Arguments of the `Query` subcommand.
Query(QueryArgs),
}
// Arguments accepted by the `Run` subcommand.
// (`//` rather than `///`: clap's derive would expose doc comments as help text.)
#[derive(Args)]
struct RunArgs {
// Optional `name` value; `None` when it is not supplied.
name: Option<String>,
}
// Arguments accepted by the `Query` subcommand.
// (`//` rather than `///`: clap's derive would expose doc comments as help text.)
#[derive(Args)]
struct QueryArgs {
// Optional `name` value; `None` when it is not supplied.
name: Option<String>,
}
/// Entry point: parses the command line and dispatches on the chosen subcommand.
///
/// Both subcommands are still stubs that only echo their optional `name`
/// argument. Each message names the subcommand that was actually invoked
/// (the previous text was the clap example's `'myapp add'` placeholder for both).
fn main() {
    let cli = Cli::parse();

    match &cli.command {
        Commands::Run(args) => {
            println!("'run' was used, name is: {:?}", args.name);
        }
        Commands::Query(args) => {
            println!("'query' was used, name is: {:?}", args.name);
        }
    }
}

View File

@@ -1,27 +1,32 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::path::{Path, PathBuf};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_indexer::{Indexer, Indexes};
pub use brk_parser::rpc;
mod storage;
use brk_core::Date;
use brk_vec::SINGLE_THREAD;
use storage::{Stores, Vecs};
pub struct Computer<const MODE: u8> {
#[derive(Clone)]
pub struct Computer {
path: PathBuf,
pub vecs: Vecs<MODE>,
pub vecs: Vecs,
pub stores: Stores,
}
impl<const MODE: u8> Computer<MODE> {
impl Computer {
pub fn import(computed_dir: &Path) -> color_eyre::Result<Self> {
let vecs = Vecs::import(&computed_dir.join("vecs"))?;
let stores = Stores::import(&computed_dir.join("fjall"))?;
let stores = Stores::import(&computed_dir.join("stores"))?;
Ok(Self {
path: computed_dir.to_owned(),
vecs,
@@ -30,8 +35,8 @@ impl<const MODE: u8> Computer<MODE> {
}
}
impl Computer<SINGLE_THREAD> {
pub fn compute(&mut self, mut indexer: Indexer<SINGLE_THREAD>, exit: &Exit) -> color_eyre::Result<()> {
impl Computer {
pub fn compute(&mut self, indexer: &mut Indexer, starting_indexes: Indexes, exit: &Exit) -> color_eyre::Result<()> {
let height_count = indexer.vecs.height_to_size.len();
let txindexes_count = indexer.vecs.txindex_to_txid.len();
let txinindexes_count = indexer.vecs.txinindex_to_txoutindex.len();
@@ -39,53 +44,61 @@ impl Computer<SINGLE_THREAD> {
// TODO: Remove all outdated
self.vecs.txindex_to_last_txinindex.compute_last_index_from_first(
&mut indexer.vecs.txindex_to_first_txinindex,
txinindexes_count,
exit,
)?;
// self.vecs.txindex_to_last_txinindex.compute_last_index_from_first(
// starting_indexes.txindex,
// &mut indexer.vecs.txindex_to_first_txinindex,
// txinindexes_count,
// exit,
// )?;
self.vecs.txindex_to_inputs_count.compute_count_from_indexes(
&mut indexer.vecs.txindex_to_first_txinindex,
&mut self.vecs.txindex_to_last_txinindex,
exit,
)?;
// self.vecs.txindex_to_inputs_count.compute_count_from_indexes(
// starting_indexes.txindex,
// &mut indexer.vecs.txindex_to_first_txinindex,
// &mut self.vecs.txindex_to_last_txinindex,
// exit,
// )?;
self.vecs.txindex_to_last_txoutindex.compute_last_index_from_first(
&mut indexer.vecs.txindex_to_first_txoutindex,
txoutindexes_count,
exit,
)?;
// self.vecs.txindex_to_last_txoutindex.compute_last_index_from_first(
// starting_indexes.txindex,
// &mut indexer.vecs.txindex_to_first_txoutindex,
// txoutindexes_count,
// exit,
// )?;
self.vecs.txindex_to_outputs_count.compute_count_from_indexes(
&mut indexer.vecs.txindex_to_first_txoutindex,
&mut self.vecs.txindex_to_last_txoutindex,
exit,
)?;
// self.vecs.txindex_to_outputs_count.compute_count_from_indexes(
// starting_indexes.txindex,
// &mut indexer.vecs.txindex_to_first_txoutindex,
// &mut self.vecs.txindex_to_last_txoutindex,
// exit,
// )?;
self.vecs.height_to_date.compute_transform(
starting_indexes.height,
&mut indexer.vecs.height_to_timestamp,
|timestamp| Date::from(*timestamp),
exit,
)?;
self.vecs.height_to_last_txindex.compute_last_index_from_first(
&mut indexer.vecs.height_to_first_txindex,
height_count,
exit,
)?;
// self.vecs.height_to_last_txindex.compute_last_index_from_first(
// starting_indexes.height,
// &mut indexer.vecs.height_to_first_txindex,
// height_count,
// exit,
// )?;
self.vecs.txindex_to_height.compute_inverse_less_to_more(
&mut indexer.vecs.height_to_first_txindex,
&mut self.vecs.height_to_last_txindex,
exit,
)?;
// self.vecs.txindex_to_height.compute_inverse_less_to_more(
// starting_indexes.height,
// &mut indexer.vecs.height_to_first_txindex,
// &mut self.vecs.height_to_last_txindex,
// exit,
// )?;
self.vecs.txindex_to_is_coinbase.compute_is_first_ordered(
&mut self.vecs.txindex_to_height,
&mut indexer.vecs.height_to_first_txindex,
exit,
)?;
// self.vecs.txindex_to_is_coinbase.compute_is_first_ordered(
// starting_indexes.txindex,
// &mut self.vecs.txindex_to_height,
// &mut indexer.vecs.height_to_first_txindex,
// exit,
// )?;
// self.vecs.txindex_to_fee.compute_transform(
// &mut self.vecs.txindex_to_height,
@@ -96,9 +109,9 @@ impl Computer<SINGLE_THREAD> {
// self.vecs.height_to_dateindex.compute(...)
self.vecs
.dateindex_to_first_height
.compute_inverse_more_to_less(&mut self.vecs.height_to_dateindex, exit)?;
// self.vecs
// .dateindex_to_first_height
// .compute_inverse_more_to_less(&mut self.vecs.height_to_dateindex, exit)?;
// ---
// Date to X

View File

@@ -7,7 +7,6 @@ use brk_parser::{
Parser,
rpc::{self, RpcApi},
};
use brk_vec::CACHED_GETS;
use log::info;
pub fn main() -> color_eyre::Result<()> {
@@ -26,25 +25,26 @@ pub fn main() -> color_eyre::Result<()> {
let outputs_dir = Path::new("../../_outputs");
let indexer: Indexer<CACHED_GETS> = Indexer::import(&outputs_dir.join("indexes"))?;
let mut indexer = Indexer::import(&outputs_dir.join("indexed"))?;
// let mut computer = Computer::import(&outputs_dir.join("computed"))?;
let mut computer = Computer::import(&outputs_dir.join("computed"))?;
// loop {
// let block_count = rpc.get_block_count()?;
loop {
let block_count = rpc.get_block_count()?;
// info!("{block_count} blocks found.");
info!("{block_count} blocks found.");
// indexer.index(&parser, rpc, &exit)?;
let starting_indexes = indexer.index(&parser, rpc, &exit)?;
// computer.compute(indexer, &exit)?;
computer.compute(&mut indexer, starting_indexes, &exit)?;
// info!("Waiting for new blocks...");
info!("Waiting for new blocks...");
// while block_count == rpc.get_block_count()? {
// sleep(Duration::from_secs(1))
// }
// }
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
}
#[allow(unreachable_code)]
Ok(())
}

View File

@@ -4,6 +4,7 @@ use brk_core::{AddressindexTxoutindex, Unit};
use brk_indexer::Store;
use brk_vec::Version;
#[derive(Clone)]
pub struct Stores {
pub address_to_utxos_received: Store<AddressindexTxoutindex, Unit>,
pub address_to_utxos_spent: Store<AddressindexTxoutindex, Unit>,

View File

@@ -4,59 +4,60 @@ use brk_core::{
Addressindex, Cents, Close, Date, Dateindex, Dollars, Feerate, Height, High, Low, Open, Sats, Timestamp, Txindex,
Txinindex, Txoutindex,
};
use brk_vec::{StorableVec, Version};
use brk_vec::{AnyStorableVec, StorableVec, Version};
// mod base;
// use base::*;
pub struct Vecs<const MODE: u8> {
pub dateindex_to_first_height: StorableVec<Dateindex, Height, MODE>,
// pub dateindex_to_last_height: StorableVec<Dateindex, Height, MODE>,
// pub height_to_block_interval: StorableVec<Height, Timestamp, MODE>,
pub dateindex_to_close_in_cents: StorableVec<Dateindex, Close<Cents>, MODE>,
pub dateindex_to_close_in_dollars: StorableVec<Dateindex, Close<Dollars>, MODE>,
pub dateindex_to_high_in_cents: StorableVec<Dateindex, High<Cents>, MODE>,
pub dateindex_to_high_in_dollars: StorableVec<Dateindex, High<Dollars>, MODE>,
pub dateindex_to_low_in_cents: StorableVec<Dateindex, Low<Cents>, MODE>,
pub dateindex_to_low_in_dollars: StorableVec<Dateindex, Low<Dollars>, MODE>,
pub dateindex_to_open_in_cents: StorableVec<Dateindex, Open<Cents>, MODE>,
pub dateindex_to_open_in_dollars: StorableVec<Dateindex, Open<Dollars>, MODE>,
pub height_to_close_in_cents: StorableVec<Height, Close<Cents>, MODE>,
pub height_to_close_in_dollars: StorableVec<Height, Close<Dollars>, MODE>,
pub height_to_high_in_cents: StorableVec<Height, High<Cents>, MODE>,
pub height_to_high_in_dollars: StorableVec<Height, High<Dollars>, MODE>,
pub height_to_low_in_cents: StorableVec<Height, Low<Cents>, MODE>,
pub height_to_low_in_dollars: StorableVec<Height, Low<Dollars>, MODE>,
pub height_to_open_in_cents: StorableVec<Height, Open<Cents>, MODE>,
pub height_to_open_in_dollars: StorableVec<Height, Open<Dollars>, MODE>,
pub height_to_date: StorableVec<Height, Date, MODE>,
pub height_to_dateindex: StorableVec<Height, Dateindex, MODE>,
// pub height_to_fee: StorableVec<Txindex, Amount, MODE>,
// pub height_to_inputcount: StorableVec<Height, u32, MODE>,
// pub height_to_last_addressindex: StorableVec<Height, Addressindex, MODE>,
pub height_to_last_txindex: StorableVec<Height, Txindex, MODE>,
// pub height_to_last_txoutindex: StorableVec<Height, Txoutindex, MODE>,
// pub height_to_maxfeerate: StorableVec<Height, Feerate, MODE>,
// pub height_to_medianfeerate: StorableVec<Height, Feerate, MODE>,
// pub height_to_minfeerate: StorableVec<Height, Feerate, MODE>,
// pub height_to_outputcount: StorableVec<Height, u32, MODE>,
// pub height_to_subsidy: StorableVec<Height, u32, MODE>,
// pub height_to_totalfees: StorableVec<Height, Amount, MODE>,
// pub height_to_txcount: StorableVec<Height, u32, MODE>,
pub txindex_to_fee: StorableVec<Txindex, Sats, MODE>,
pub txindex_to_height: StorableVec<Txindex, Height, MODE>,
pub txindex_to_is_coinbase: StorableVec<Txindex, bool, MODE>,
// pub txindex_to_feerate: StorableVec<Txindex, Feerate, MODE>,
pub txindex_to_inputs_count: StorableVec<Txindex, u32, MODE>,
pub txindex_to_inputs_sum: StorableVec<Txindex, Sats, MODE>,
pub txindex_to_last_txinindex: StorableVec<Txindex, Txinindex, MODE>,
pub txindex_to_last_txoutindex: StorableVec<Txindex, Txoutindex, MODE>,
pub txindex_to_outputs_count: StorableVec<Txindex, u32, MODE>,
pub txindex_to_outputs_sum: StorableVec<Txindex, Sats, MODE>,
#[derive(Clone)]
pub struct Vecs {
pub dateindex_to_first_height: StorableVec<Dateindex, Height>,
// pub dateindex_to_last_height: StorableVec<Dateindex, Height>,
// pub height_to_block_interval: StorableVec<Height, Timestamp>,
pub dateindex_to_close_in_cents: StorableVec<Dateindex, Close<Cents>>,
pub dateindex_to_close_in_dollars: StorableVec<Dateindex, Close<Dollars>>,
pub dateindex_to_high_in_cents: StorableVec<Dateindex, High<Cents>>,
pub dateindex_to_high_in_dollars: StorableVec<Dateindex, High<Dollars>>,
pub dateindex_to_low_in_cents: StorableVec<Dateindex, Low<Cents>>,
pub dateindex_to_low_in_dollars: StorableVec<Dateindex, Low<Dollars>>,
pub dateindex_to_open_in_cents: StorableVec<Dateindex, Open<Cents>>,
pub dateindex_to_open_in_dollars: StorableVec<Dateindex, Open<Dollars>>,
pub height_to_close_in_cents: StorableVec<Height, Close<Cents>>,
pub height_to_close_in_dollars: StorableVec<Height, Close<Dollars>>,
pub height_to_high_in_cents: StorableVec<Height, High<Cents>>,
pub height_to_high_in_dollars: StorableVec<Height, High<Dollars>>,
pub height_to_low_in_cents: StorableVec<Height, Low<Cents>>,
pub height_to_low_in_dollars: StorableVec<Height, Low<Dollars>>,
pub height_to_open_in_cents: StorableVec<Height, Open<Cents>>,
pub height_to_open_in_dollars: StorableVec<Height, Open<Dollars>>,
pub height_to_date: StorableVec<Height, Date>,
pub height_to_dateindex: StorableVec<Height, Dateindex>,
// pub height_to_fee: StorableVec<Txindex, Amount>,
// pub height_to_inputcount: StorableVec<Height, u32>,
// pub height_to_last_addressindex: StorableVec<Height, Addressindex>,
pub height_to_last_txindex: StorableVec<Height, Txindex>,
// pub height_to_last_txoutindex: StorableVec<Height, Txoutindex>,
// pub height_to_maxfeerate: StorableVec<Height, Feerate>,
// pub height_to_medianfeerate: StorableVec<Height, Feerate>,
// pub height_to_minfeerate: StorableVec<Height, Feerate>,
// pub height_to_outputcount: StorableVec<Height, u32>,
// pub height_to_subsidy: StorableVec<Height, u32>,
// pub height_to_totalfees: StorableVec<Height, Amount>,
// pub height_to_txcount: StorableVec<Height, u32>,
pub txindex_to_fee: StorableVec<Txindex, Sats>,
pub txindex_to_height: StorableVec<Txindex, Height>,
pub txindex_to_is_coinbase: StorableVec<Txindex, bool>,
// pub txindex_to_feerate: StorableVec<Txindex, Feerate>,
pub txindex_to_inputs_count: StorableVec<Txindex, u32>,
pub txindex_to_inputs_sum: StorableVec<Txindex, Sats>,
pub txindex_to_last_txinindex: StorableVec<Txindex, Txinindex>,
pub txindex_to_last_txoutindex: StorableVec<Txindex, Txoutindex>,
pub txindex_to_outputs_count: StorableVec<Txindex, u32>,
pub txindex_to_outputs_sum: StorableVec<Txindex, Sats>,
}
impl<const MODE: u8> Vecs<MODE> {
impl Vecs {
pub fn import(path: &Path) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
@@ -148,45 +149,24 @@ impl<const MODE: u8> Vecs<MODE> {
})
}
// pub fn as_slice(&self) -> [&dyn AnyComputedStorableVec; 1] {
// [
// &self.dateindex_to_close_in_cents as &dyn AnyJsonStorableVec,
// &self.dateindex_to_close_in_dollars,
// &self.dateindex_to_high_in_cents,
// &self.dateindex_to_high_in_dollars,
// &self.dateindex_to_low_in_cents,
// &self.dateindex_to_low_in_dollars,
// &self.dateindex_to_open_in_cents,
// &self.dateindex_to_open_in_dollars,
// &self.height_to_close_in_cents,
// &self.height_to_close_in_dollars,
// &self.height_to_high_in_cents,
// &self.height_to_high_in_dollars,
// &self.height_to_low_in_cents,
// &self.height_to_low_in_dollars,
// &self.height_to_open_in_cents,
// &self.height_to_open_in_dollars,
// ]
// }
// pub fn as_mut_slice(&mut self) -> [&mut dyn AnyComputedStorableVec; 1] {
// [
// &mut self.dateindex_to_close_in_cents as &mut dyn AnyStorableVec,
// &mut self.dateindex_to_close_in_dollars,
// &mut self.dateindex_to_high_in_cents,
// &mut self.dateindex_to_high_in_dollars,
// &mut self.dateindex_to_low_in_cents,
// &mut self.dateindex_to_low_in_dollars,
// &mut self.dateindex_to_open_in_cents,
// &mut self.dateindex_to_open_in_dollars,
// &mut self.height_to_close_in_cents,
// &mut self.height_to_close_in_dollars,
// &mut self.height_to_high_in_cents,
// &mut self.height_to_high_in_dollars,
// &mut self.height_to_low_in_cents,
// &mut self.height_to_low_in_dollars,
// &mut self.height_to_open_in_cents,
// &mut self.height_to_open_in_dollars,
// ]
// }
/// Returns this struct's vecs as type-erased `&dyn AnyStorableVec` references.
///
/// Only `height_to_date` is returned for now; the remaining candidates are
/// kept below as commented-out entries.
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
vec![
// The explicit cast on the first element fixes the element type of the `vec!`.
&self.height_to_date as &dyn AnyStorableVec,
// &self.dateindex_to_close_in_dollars,
// &self.dateindex_to_high_in_cents,
// &self.dateindex_to_high_in_dollars,
// &self.dateindex_to_low_in_cents,
// &self.dateindex_to_low_in_dollars,
// &self.dateindex_to_open_in_cents,
// &self.dateindex_to_open_in_dollars,
// &self.height_to_close_in_cents,
// &self.height_to_close_in_dollars,
// &self.height_to_high_in_cents,
// &self.height_to_high_in_dollars,
// &self.height_to_low_in_cents,
// &self.height_to_low_in_dollars,
// &self.height_to_open_in_cents,
// &self.height_to_open_in_dollars,
]
}
}

View File

@@ -1,9 +1,12 @@
use jiff::{Span, civil::Date as Date_, tz::TimeZone};
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use super::{Dateindex, Timestamp};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout, Serialize,
)]
pub struct Date(u32);
impl Date {

View File

@@ -1,5 +1,6 @@
use std::ops::Add;
use serde::Serialize;
// use color_eyre::eyre::eyre;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -7,7 +8,9 @@ use crate::Error;
use super::Date;
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout, Serialize,
)]
pub struct Dateindex(u16);
impl From<Dateindex> for usize {

View File

@@ -1,4 +1,7 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::{collections::BTreeMap, fs, path::Path};

View File

@@ -14,7 +14,7 @@ Vecs are used sparingly instead of stores for multiple reasons:
Storage wise, the expected overhead should be around 30% of the chain itself.
Peaks at 11-13 GB of RAM
Peaks at 5 GB of RAM
## Outputs
@@ -27,3 +27,12 @@ Stores: `src/storage/stores/mod.rs`
Rust: `src/main.rs`
Python: `../python/parse.py`
## Benchmark
Indexing `0..885_835` took `11 hours 6 min 50 s` on a MacBook Pro M3 Pro with 36 GB of RAM
`footprint` report:
- Peak memory: `5115 MB`
- Memory while waiting for a new block: `890 MB`
- Reclaimable memory: `6478 MB`

View File

@@ -4,7 +4,6 @@ use brk_core::{
P2SHindex, P2TRindex, P2WPKHindex, P2WSHindex, Pushonlyindex, Txindex, Txinindex, Txoutindex, Unknownindex,
};
use brk_parser::NUMBER_OF_UNSAFE_BLOCKS;
use brk_vec::CACHED_GETS;
use color_eyre::eyre::ContextCompat;
use crate::{Stores, Vecs};
@@ -31,7 +30,7 @@ pub struct Indexes {
}
impl Indexes {
pub fn push_if_needed(&self, vecs: &mut Vecs<CACHED_GETS>) -> brk_vec::Result<()> {
pub fn push_if_needed(&self, vecs: &mut Vecs) -> brk_vec::Result<()> {
let height = self.height;
vecs.height_to_first_txindex.push_if_needed(height, self.txindex)?;
vecs.height_to_first_txinindex.push_if_needed(height, self.txinindex)?;
@@ -64,7 +63,7 @@ impl Indexes {
Ok(())
}
pub fn push_future_if_needed(&mut self, vecs: &mut Vecs<CACHED_GETS>) -> brk_vec::Result<()> {
pub fn push_future_if_needed(&mut self, vecs: &mut Vecs) -> brk_vec::Result<()> {
self.height.increment();
self.push_if_needed(vecs)?;
self.height.decrement();
@@ -72,9 +71,9 @@ impl Indexes {
}
}
impl TryFrom<(&mut Vecs<CACHED_GETS>, &Stores, &Client)> for Indexes {
impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes {
type Error = color_eyre::Report;
fn try_from((vecs, stores, rpc): (&mut Vecs<CACHED_GETS>, &Stores, &Client)) -> color_eyre::Result<Self> {
fn try_from((vecs, stores, rpc): (&mut Vecs, &Stores, &Client)) -> color_eyre::Result<Self> {
// Height at which we wanna start: min last saved + 1 or 0
let starting_height = vecs.starting_height().min(stores.starting_height());

View File

@@ -1,4 +1,7 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::{
collections::BTreeMap,
@@ -15,7 +18,6 @@ pub use brk_parser::*;
use bitcoin::{Transaction, TxIn, TxOut};
use brk_exit::Exit;
use brk_vec::CACHED_GETS;
use color_eyre::eyre::{ContextCompat, eyre};
use log::info;
use rayon::prelude::*;
@@ -29,12 +31,13 @@ pub use vecs::*;
const SNAPSHOT_BLOCK_RANGE: usize = 1000;
pub struct Indexer<const MODE: u8> {
pub vecs: Vecs<MODE>,
#[derive(Clone)]
pub struct Indexer {
pub vecs: Vecs,
pub stores: Stores,
}
impl<const MODE: u8> Indexer<MODE> {
impl Indexer {
pub fn import(indexes_dir: &Path) -> color_eyre::Result<Self> {
setrlimit()?;
@@ -42,13 +45,11 @@ impl<const MODE: u8> Indexer<MODE> {
let vecs = Vecs::import(&indexes_dir.join("vecs"))?;
let stores = Stores::import(&indexes_dir.join("fjall"))?;
let stores = Stores::import(&indexes_dir.join("stores"))?;
Ok(Self { vecs, stores })
}
}
impl Indexer<CACHED_GETS> {
pub fn index(&mut self, parser: &Parser, rpc: &'static rpc::Client, exit: &Exit) -> color_eyre::Result<Indexes> {
let check_collisions = true;
@@ -63,35 +64,35 @@ impl Indexer<CACHED_GETS> {
self.vecs.rollback_if_needed(&starting_indexes)?;
exit.release();
let export_if_needed = |stores: &mut Stores,
vecs: &mut Vecs<CACHED_GETS>,
height: Height,
rem: bool,
exit: &Exit|
-> color_eyre::Result<()> {
if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() {
return Ok(());
}
let export_if_needed =
|stores: &mut Stores, vecs: &mut Vecs, height: Height, rem: bool, exit: &Exit| -> color_eyre::Result<()> {
if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() {
return Ok(());
}
info!("Exporting...");
exit.block();
stores.commit(height)?;
vecs.flush(height)?;
exit.release();
Ok(())
};
info!("Exporting...");
exit.block();
stores.commit(height)?;
vecs.flush(height)?;
exit.release();
Ok(())
};
let vecs = &mut self.vecs;
let stores = &mut self.stores;
if starting_indexes.height > Height::try_from(rpc)? {
let mut idxs = starting_indexes.clone();
let start = Some(idxs.height);
let end = None; //Some(Height::new(400_000));
if starting_indexes.height > Height::try_from(rpc)? || end.is_some_and(|end| starting_indexes.height > end) {
return Ok(starting_indexes);
}
info!("Started indexing...");
let mut idxs = starting_indexes.clone();
parser.parse(Some(idxs.height), None).iter().try_for_each(
parser.parse(start, None).iter().try_for_each(
|(height, block, blockhash)| -> color_eyre::Result<()> {
info!("Indexing block {height}...");
@@ -634,6 +635,8 @@ impl Indexer<CACHED_GETS> {
export_if_needed(stores, vecs, idxs.height, true, exit)?;
stores.rotate_memtables();
Ok(starting_indexes)
}
}

View File

@@ -6,7 +6,6 @@ use brk_parser::{
Parser,
rpc::{self},
};
use brk_vec::CACHED_GETS;
use log::info;
fn main() -> color_eyre::Result<()> {
@@ -23,13 +22,13 @@ fn main() -> color_eyre::Result<()> {
let parser = Parser::new(data_dir, rpc);
let mut indexer: Indexer<CACHED_GETS> = Indexer::import(Path::new("../../_outputs/indexes"))?;
loop {
let block_count = rpc.get_block_count()?;
info!("{block_count} blocks found.");
let mut indexer = Indexer::import(Path::new("../../_outputs/indexed"))?;
indexer.index(&parser, rpc, &exit)?;
info!("Waiting for new blocks...");

View File

@@ -136,6 +136,10 @@ where
Ok(())
}
/// Asks the underlying partition to rotate its memtable.
///
/// Whatever `rotate_memtable()` returns is discarded via `let _ =`, so the
/// caller is never told about the outcome.
/// NOTE(review): presumably the discarded value is a `Result` — confirm
/// against the fjall API whether ignoring it is intended.
pub fn rotate_memtable(&self) {
let _ = self.part.inner().rotate_memtable();
}
/// Returns the height reported by this store's metadata (`self.meta`), if any.
pub fn height(&self) -> Option<Height> {
self.meta.height()
}
@@ -155,7 +159,9 @@ where
}
fn open_keyspace(path: &Path) -> Result<TransactionalKeyspace> {
fjall::Config::new(path.join("fjall")).open_transactional()
fjall::Config::new(path.join("fjall"))
.max_write_buffer_size(32 * 1024 * 1024)
.open_transactional()
}
fn open_partition_handle(keyspace: &TransactionalKeyspace) -> Result<TransactionalPartitionHandle> {
@@ -163,6 +169,7 @@ where
"partition",
PartitionCreateOptions::default()
.bloom_filter_bits(Some(5))
.max_memtable_size(8 * 1024 * 1024)
.manual_journal_persist(true),
)
}

View File

@@ -1,7 +1,7 @@
use std::{path::Path, thread};
use brk_core::{AddressHash, Addressbytes, Addressindex, Addresstype, BlockHashPrefix, Height, TxidPrefix, Txindex};
use brk_vec::{CACHED_GETS, Value, Version};
use brk_vec::{Value, Version};
use crate::Indexes;
@@ -38,14 +38,9 @@ impl Stores {
})
}
pub fn rollback_if_needed(
&mut self,
vecs: &Vecs<CACHED_GETS>,
starting_indexes: &Indexes,
) -> color_eyre::Result<()> {
pub fn rollback_if_needed(&mut self, vecs: &Vecs, starting_indexes: &Indexes) -> color_eyre::Result<()> {
vecs.height_to_blockhash
.iter_from(starting_indexes.height, |(_, blockhash)| {
let blockhash = blockhash.as_ref();
let blockhash_prefix = BlockHashPrefix::from(blockhash);
self.blockhash_prefix_to_height.remove(blockhash_prefix);
Ok(())
@@ -53,7 +48,6 @@ impl Stores {
vecs.txindex_to_txid
.iter_from(starting_indexes.txindex, |(_txindex, txid)| {
let txid = txid.as_ref();
let txid_prefix = TxidPrefix::from(txid);
self.txid_prefix_to_txindex.remove(txid_prefix);
Ok(())
@@ -173,4 +167,10 @@ impl Stores {
Ok(())
})
}
/// Calls `rotate_memtable` on each of the three stores held by `Stores`:
/// address hash -> address index, block hash prefix -> height, and
/// txid prefix -> tx index.
pub fn rotate_memtables(&self) {
self.addresshash_to_addressindex.rotate_memtable();
self.blockhash_prefix_to_height.rotate_memtable();
self.txid_prefix_to_txindex.rotate_memtable();
}
}

View File

@@ -5,25 +5,29 @@ use std::{
path::{Path, PathBuf},
};
use brk_vec::{CACHED_GETS, StoredIndex, StoredType, Version};
use brk_vec::{StoredIndex, StoredType, Version};
use super::Height;
#[derive(Debug)]
pub struct StorableVec<I, T, const MODE: u8> {
pub struct StorableVec<I, T> {
height: Option<Height>,
vec: brk_vec::StorableVec<I, T, MODE>,
vec: brk_vec::StorableVec<I, T>,
}
impl<I, T, const MODE: u8> StorableVec<I, T, MODE>
impl<I, T> StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
pub fn import(path: &Path, version: Version) -> brk_vec::Result<Self> {
let mut vec = brk_vec::StorableVec::forced_import(path, version)?;
vec.reset_mmaps()?;
Ok(Self {
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
vec: brk_vec::StorableVec::forced_import(path, version)?,
vec,
})
}
@@ -43,37 +47,44 @@ where
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
}
}
impl<I, T> StorableVec<I, T, CACHED_GETS>
where
I: StoredIndex,
T: StoredType,
{
pub fn flush(&mut self, height: Height) -> io::Result<()> {
height.write(&self.path_height())?;
self.vec.flush()
self.vec.flush()?;
self.vec.reset_mmaps()
}
}
impl<I, T, const MODE: u8> Deref for StorableVec<I, T, MODE> {
type Target = brk_vec::StorableVec<I, T, MODE>;
impl<I, T> Deref for StorableVec<I, T> {
type Target = brk_vec::StorableVec<I, T>;
fn deref(&self) -> &Self::Target {
&self.vec
}
}
impl<I, T, const MODE: u8> DerefMut for StorableVec<I, T, MODE> {
impl<I, T> DerefMut for StorableVec<I, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.vec
}
}
/// Cloning copies the recorded `height` and clones the wrapped `brk_vec::StorableVec`.
impl<I, T> Clone for StorableVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    fn clone(&self) -> Self {
        let Self { height, vec } = self;
        Self {
            height: *height,
            vec: vec.clone(),
        }
    }
}
pub trait AnyStorableVec: Send + Sync {
pub trait AnyIndexedVec: Send + Sync {
fn height(&self) -> brk_core::Result<Height>;
fn flush(&mut self, height: Height) -> io::Result<()>;
}
impl<I, T> AnyStorableVec for StorableVec<I, T, CACHED_GETS>
impl<I, T> AnyIndexedVec for StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,

View File

@@ -6,7 +6,7 @@ use brk_core::{
P2SHAddressBytes, P2SHindex, P2TRAddressBytes, P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes,
P2WSHindex, Pushonlyindex, Sats, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight,
};
use brk_vec::{AnyJsonStorableVec, CACHED_GETS, Version};
use brk_vec::{AnyStorableVec, Version};
use rayon::prelude::*;
use crate::Indexes;
@@ -15,53 +15,54 @@ mod base;
pub use base::*;
pub struct Vecs<const MODE: u8> {
pub addressindex_to_addresstype: StorableVec<Addressindex, Addresstype, MODE>,
pub addressindex_to_addresstypeindex: StorableVec<Addressindex, Addresstypeindex, MODE>,
pub addressindex_to_height: StorableVec<Addressindex, Height, MODE>,
pub height_to_blockhash: StorableVec<Height, BlockHash, MODE>,
pub height_to_difficulty: StorableVec<Height, f64, MODE>,
pub height_to_first_addressindex: StorableVec<Height, Addressindex, MODE>,
pub height_to_first_emptyindex: StorableVec<Height, Emptyindex, MODE>,
pub height_to_first_multisigindex: StorableVec<Height, Multisigindex, MODE>,
pub height_to_first_opreturnindex: StorableVec<Height, Opreturnindex, MODE>,
pub height_to_first_pushonlyindex: StorableVec<Height, Pushonlyindex, MODE>,
pub height_to_first_txindex: StorableVec<Height, Txindex, MODE>,
pub height_to_first_txinindex: StorableVec<Height, Txinindex, MODE>,
pub height_to_first_txoutindex: StorableVec<Height, Txoutindex, MODE>,
pub height_to_first_unknownindex: StorableVec<Height, Unknownindex, MODE>,
pub height_to_first_p2pk33index: StorableVec<Height, P2PK33index, MODE>,
pub height_to_first_p2pk65index: StorableVec<Height, P2PK65index, MODE>,
pub height_to_first_p2pkhindex: StorableVec<Height, P2PKHindex, MODE>,
pub height_to_first_p2shindex: StorableVec<Height, P2SHindex, MODE>,
pub height_to_first_p2trindex: StorableVec<Height, P2TRindex, MODE>,
pub height_to_first_p2wpkhindex: StorableVec<Height, P2WPKHindex, MODE>,
pub height_to_first_p2wshindex: StorableVec<Height, P2WSHindex, MODE>,
pub height_to_size: StorableVec<Height, usize, MODE>,
pub height_to_timestamp: StorableVec<Height, Timestamp, MODE>,
pub height_to_weight: StorableVec<Height, Weight, MODE>,
pub p2pk33index_to_p2pk33addressbytes: StorableVec<P2PK33index, P2PK33AddressBytes, MODE>,
pub p2pk65index_to_p2pk65addressbytes: StorableVec<P2PK65index, P2PK65AddressBytes, MODE>,
pub p2pkhindex_to_p2pkhaddressbytes: StorableVec<P2PKHindex, P2PKHAddressBytes, MODE>,
pub p2shindex_to_p2shaddressbytes: StorableVec<P2SHindex, P2SHAddressBytes, MODE>,
pub p2trindex_to_p2traddressbytes: StorableVec<P2TRindex, P2TRAddressBytes, MODE>,
pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec<P2WPKHindex, P2WPKHAddressBytes, MODE>,
pub p2wshindex_to_p2wshaddressbytes: StorableVec<P2WSHindex, P2WSHAddressBytes, MODE>,
pub txindex_to_first_txinindex: StorableVec<Txindex, Txinindex, MODE>,
pub txindex_to_first_txoutindex: StorableVec<Txindex, Txoutindex, MODE>,
pub txindex_to_height: StorableVec<Txindex, Height, MODE>,
pub txindex_to_locktime: StorableVec<Txindex, LockTime, MODE>,
pub txindex_to_txid: StorableVec<Txindex, Txid, MODE>,
pub txindex_to_base_size: StorableVec<Txindex, usize, MODE>,
pub txindex_to_total_size: StorableVec<Txindex, usize, MODE>,
pub txindex_to_is_explicitly_rbf: StorableVec<Txindex, bool, MODE>,
pub txindex_to_txversion: StorableVec<Txindex, TxVersion, MODE>,
pub txinindex_to_txoutindex: StorableVec<Txinindex, Txoutindex, MODE>,
pub txoutindex_to_addressindex: StorableVec<Txoutindex, Addressindex, MODE>,
pub txoutindex_to_value: StorableVec<Txoutindex, Sats, MODE>,
#[derive(Clone)]
pub struct Vecs {
pub addressindex_to_addresstype: StorableVec<Addressindex, Addresstype>,
pub addressindex_to_addresstypeindex: StorableVec<Addressindex, Addresstypeindex>,
pub addressindex_to_height: StorableVec<Addressindex, Height>,
pub height_to_blockhash: StorableVec<Height, BlockHash>,
pub height_to_difficulty: StorableVec<Height, f64>,
pub height_to_first_addressindex: StorableVec<Height, Addressindex>,
pub height_to_first_emptyindex: StorableVec<Height, Emptyindex>,
pub height_to_first_multisigindex: StorableVec<Height, Multisigindex>,
pub height_to_first_opreturnindex: StorableVec<Height, Opreturnindex>,
pub height_to_first_pushonlyindex: StorableVec<Height, Pushonlyindex>,
pub height_to_first_txindex: StorableVec<Height, Txindex>,
pub height_to_first_txinindex: StorableVec<Height, Txinindex>,
pub height_to_first_txoutindex: StorableVec<Height, Txoutindex>,
pub height_to_first_unknownindex: StorableVec<Height, Unknownindex>,
pub height_to_first_p2pk33index: StorableVec<Height, P2PK33index>,
pub height_to_first_p2pk65index: StorableVec<Height, P2PK65index>,
pub height_to_first_p2pkhindex: StorableVec<Height, P2PKHindex>,
pub height_to_first_p2shindex: StorableVec<Height, P2SHindex>,
pub height_to_first_p2trindex: StorableVec<Height, P2TRindex>,
pub height_to_first_p2wpkhindex: StorableVec<Height, P2WPKHindex>,
pub height_to_first_p2wshindex: StorableVec<Height, P2WSHindex>,
pub height_to_size: StorableVec<Height, usize>,
pub height_to_timestamp: StorableVec<Height, Timestamp>,
pub height_to_weight: StorableVec<Height, Weight>,
pub p2pk33index_to_p2pk33addressbytes: StorableVec<P2PK33index, P2PK33AddressBytes>,
pub p2pk65index_to_p2pk65addressbytes: StorableVec<P2PK65index, P2PK65AddressBytes>,
pub p2pkhindex_to_p2pkhaddressbytes: StorableVec<P2PKHindex, P2PKHAddressBytes>,
pub p2shindex_to_p2shaddressbytes: StorableVec<P2SHindex, P2SHAddressBytes>,
pub p2trindex_to_p2traddressbytes: StorableVec<P2TRindex, P2TRAddressBytes>,
pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec<P2WPKHindex, P2WPKHAddressBytes>,
pub p2wshindex_to_p2wshaddressbytes: StorableVec<P2WSHindex, P2WSHAddressBytes>,
pub txindex_to_first_txinindex: StorableVec<Txindex, Txinindex>,
pub txindex_to_first_txoutindex: StorableVec<Txindex, Txoutindex>,
pub txindex_to_height: StorableVec<Txindex, Height>,
pub txindex_to_locktime: StorableVec<Txindex, LockTime>,
pub txindex_to_txid: StorableVec<Txindex, Txid>,
pub txindex_to_base_size: StorableVec<Txindex, usize>,
pub txindex_to_total_size: StorableVec<Txindex, usize>,
pub txindex_to_is_explicitly_rbf: StorableVec<Txindex, bool>,
pub txindex_to_txversion: StorableVec<Txindex, TxVersion>,
pub txinindex_to_txoutindex: StorableVec<Txinindex, Txoutindex>,
pub txoutindex_to_addressindex: StorableVec<Txoutindex, Addressindex>,
pub txoutindex_to_value: StorableVec<Txoutindex, Sats>,
}
impl<const MODE: u8> Vecs<MODE> {
impl Vecs {
pub fn import(path: &Path) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
@@ -293,56 +294,6 @@ impl<const MODE: u8> Vecs<MODE> {
Ok(())
}
pub fn as_any_json_vecs(&self) -> Vec<&dyn AnyJsonStorableVec> {
vec![
&*self.addressindex_to_addresstype as &dyn AnyJsonStorableVec,
&*self.addressindex_to_addresstypeindex,
&*self.addressindex_to_height,
&*self.height_to_blockhash,
&*self.height_to_difficulty,
&*self.height_to_first_addressindex,
&*self.height_to_first_emptyindex,
&*self.height_to_first_multisigindex,
&*self.height_to_first_opreturnindex,
&*self.height_to_first_pushonlyindex,
&*self.height_to_first_txindex,
&*self.height_to_first_txinindex,
&*self.height_to_first_txoutindex,
&*self.height_to_first_unknownindex,
&*self.height_to_first_p2pk33index,
&*self.height_to_first_p2pk65index,
&*self.height_to_first_p2pkhindex,
&*self.height_to_first_p2shindex,
&*self.height_to_first_p2trindex,
&*self.height_to_first_p2wpkhindex,
&*self.height_to_first_p2wshindex,
&*self.height_to_size,
&*self.height_to_timestamp,
&*self.height_to_weight,
&*self.p2pk33index_to_p2pk33addressbytes,
&*self.p2pk65index_to_p2pk65addressbytes,
&*self.p2pkhindex_to_p2pkhaddressbytes,
&*self.p2shindex_to_p2shaddressbytes,
&*self.p2trindex_to_p2traddressbytes,
&*self.p2wpkhindex_to_p2wpkhaddressbytes,
&*self.p2wshindex_to_p2wshaddressbytes,
&*self.txindex_to_first_txinindex,
&*self.txindex_to_first_txoutindex,
&*self.txindex_to_height,
&*self.txindex_to_locktime,
&*self.txindex_to_txid,
&*self.txindex_to_base_size,
&*self.txindex_to_total_size,
&*self.txindex_to_is_explicitly_rbf,
&*self.txindex_to_txversion,
&*self.txinindex_to_txoutindex,
&*self.txoutindex_to_addressindex,
&*self.txoutindex_to_value,
]
}
}
impl Vecs<CACHED_GETS> {
pub fn get_addressbytes(
&self,
addresstype: Addresstype,
@@ -424,9 +375,57 @@ impl Vecs<CACHED_GETS> {
.unwrap()
}
fn as_mut_any_vecs(&mut self) -> Vec<&mut dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
vec![
&mut self.addressindex_to_addresstype as &mut dyn AnyStorableVec,
&*self.addressindex_to_addresstype as &dyn AnyStorableVec,
&*self.addressindex_to_addresstypeindex,
&*self.addressindex_to_height,
&*self.height_to_blockhash,
&*self.height_to_difficulty,
&*self.height_to_first_addressindex,
&*self.height_to_first_emptyindex,
&*self.height_to_first_multisigindex,
&*self.height_to_first_opreturnindex,
&*self.height_to_first_pushonlyindex,
&*self.height_to_first_txindex,
&*self.height_to_first_txinindex,
&*self.height_to_first_txoutindex,
&*self.height_to_first_unknownindex,
&*self.height_to_first_p2pk33index,
&*self.height_to_first_p2pk65index,
&*self.height_to_first_p2pkhindex,
&*self.height_to_first_p2shindex,
&*self.height_to_first_p2trindex,
&*self.height_to_first_p2wpkhindex,
&*self.height_to_first_p2wshindex,
&*self.height_to_size,
&*self.height_to_timestamp,
&*self.height_to_weight,
&*self.p2pk33index_to_p2pk33addressbytes,
&*self.p2pk65index_to_p2pk65addressbytes,
&*self.p2pkhindex_to_p2pkhaddressbytes,
&*self.p2shindex_to_p2shaddressbytes,
&*self.p2trindex_to_p2traddressbytes,
&*self.p2wpkhindex_to_p2wpkhaddressbytes,
&*self.p2wshindex_to_p2wshaddressbytes,
&*self.txindex_to_first_txinindex,
&*self.txindex_to_first_txoutindex,
&*self.txindex_to_height,
&*self.txindex_to_locktime,
&*self.txindex_to_txid,
&*self.txindex_to_base_size,
&*self.txindex_to_total_size,
&*self.txindex_to_is_explicitly_rbf,
&*self.txindex_to_txversion,
&*self.txinindex_to_txoutindex,
&*self.txoutindex_to_addressindex,
&*self.txoutindex_to_value,
]
}
fn as_mut_any_vecs(&mut self) -> Vec<&mut dyn AnyIndexedVec> {
vec![
&mut self.addressindex_to_addresstype as &mut dyn AnyIndexedVec,
&mut self.addressindex_to_addresstypeindex,
&mut self.addressindex_to_height,
&mut self.height_to_blockhash,

View File

@@ -1,4 +1,7 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::{
fmt::Display,

View File

@@ -1,4 +1,7 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::{
cmp::Ordering,

View File

@@ -0,0 +1,12 @@
[package]
name = "brk_query"
description = "A library that finds requested datasets"
license.workspace = true
edition.workspace = true
version.workspace = true
repository.workspace = true
[dependencies]
brk_computer = { workspace = true }
brk_indexer = { workspace = true }
clap = { workspace = true }

View File

@@ -0,0 +1 @@
# BRK Query

View File

@@ -0,0 +1,4 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]

View File

@@ -9,8 +9,10 @@ repository.workspace = true
[dependencies]
axum = "0.8.1"
brk_computer = { workspace = true }
brk_exit = { workspace = true }
brk_indexer = { workspace = true }
brk_logger = { workspace = true }
brk_parser = { workspace = true }
brk_vec = { workspace = true }
color-eyre = { workspace = true }
derive_deref = { workspace = true }

View File

@@ -1,6 +1,6 @@
use std::{collections::BTreeMap, fs, io};
use brk_vec::AnyJsonStorableVec;
use brk_vec::AnyStorableVec;
use derive_deref::{Deref, DerefMut};
use crate::WEBSITE_DEV_PATH;
@@ -12,7 +12,7 @@ pub struct VecIdToIndexToVec(BTreeMap<String, IndexToVec>);
impl VecIdToIndexToVec {
// Not the most performant or type safe but only built once so that's okay
pub fn insert(&mut self, vec: &'static dyn AnyJsonStorableVec) {
pub fn insert(&mut self, vec: &'static dyn AnyStorableVec) {
let file_name = vec.file_name();
let split = file_name.split("_to_").collect::<Vec<_>>();
if split.len() != 2 {
@@ -75,4 +75,4 @@ impl VecIdToIndexToVec {
}
#[derive(Default, Deref, DerefMut)]
pub struct IndexToVec(BTreeMap<Index, &'static dyn AnyJsonStorableVec>);
pub struct IndexToVec(BTreeMap<Index, &'static dyn AnyStorableVec>);

View File

@@ -1,4 +1,7 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::time::Instant;
@@ -6,7 +9,6 @@ use api::{ApiRoutes, VecIdToIndexToVec};
use axum::{Json, Router, http::StatusCode, routing::get, serve};
use brk_computer::Computer;
use brk_indexer::Indexer;
use brk_vec::STATELESS;
use color_eyre::owo_colors::OwoColorize;
use files::FilesRoutes;
use log::{error, info};
@@ -20,22 +22,19 @@ mod traits;
#[derive(Clone)]
pub struct AppState {
vecs: &'static VecIdToIndexToVec,
indexer: &'static Indexer<STATELESS>,
computer: &'static Computer<STATELESS>,
indexer: &'static Indexer,
computer: &'static Computer,
}
pub const WEBSITE_DEV_PATH: &str = "../../websites/kibo.money/";
pub async fn main(indexer: Indexer<STATELESS>, computer: Computer<STATELESS>) -> color_eyre::Result<()> {
pub async fn main(indexer: Indexer, computer: Computer) -> color_eyre::Result<()> {
let indexer = Box::leak(Box::new(indexer));
let computer = Box::leak(Box::new(computer));
let vecs = Box::leak(Box::new(VecIdToIndexToVec::default()));
indexer
.vecs
.as_any_json_vecs()
.into_iter()
.for_each(|vec| vecs.insert(vec));
indexer.vecs.as_any_vecs().into_iter().for_each(|vec| vecs.insert(vec));
computer.vecs.as_any_vecs().into_iter().for_each(|vec| vecs.insert(vec));
vecs.generate_dts_file()?;

View File

@@ -1,20 +1,62 @@
use std::path::Path;
use std::{path::Path, thread::sleep, time::Duration};
use brk_computer::Computer;
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::STATELESS;
use brk_parser::{
Parser,
rpc::{self, RpcApi},
};
use log::info;
#[tokio::main]
pub async fn main() -> color_eyre::Result<()> {
pub fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
brk_logger::init(None);
brk_logger::init(Some(Path::new(".log")));
let path = Path::new("../../_outputs");
let indexer: Indexer<STATELESS> = Indexer::import(&path.join("indexes"))?;
let computer: Computer<STATELESS> = Computer::import(&path.join("computed"))?;
let data_dir = Path::new("../../../bitcoin");
let rpc = Box::leak(Box::new(rpc::Client::new(
"http://localhost:8332",
rpc::Auth::CookieFile(Path::new(data_dir).join(".cookie")),
)?));
let exit = Exit::new();
brk_server::main(indexer, computer).await.unwrap();
let parser = Parser::new(data_dir, rpc);
Ok(())
let outputs_dir = Path::new("../../_outputs");
let mut indexer = Indexer::import(&outputs_dir.join("indexed"))?;
let mut computer = Computer::import(&outputs_dir.join("computed"))?;
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let served_indexer = indexer.clone();
let served_computer = computer.clone();
tokio::spawn(async move {
brk_server::main(served_indexer, served_computer).await.unwrap();
});
loop {
let block_count = rpc.get_block_count()?;
info!("{block_count} blocks found.");
let starting_indexes = indexer.index(&parser, rpc, &exit)?;
computer.compute(&mut indexer, starting_indexes, &exit)?;
info!("Waiting for new blocks...");
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
}
#[allow(unreachable_code)]
Ok(())
}) as color_eyre::Result<()>
}

View File

@@ -8,13 +8,10 @@ edition.workspace = true
license.workspace = true
repository.workspace = true
[features]
json = ["serde", "serde_json"]
[dependencies]
brk_exit = { workspace = true }
memmap2 = "0.9.5"
rayon = { workspace = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true }

View File

@@ -1,4 +1,7 @@
#![doc = include_str!("../README.md")]
#![doc = "\n## Example\n\n```rust"]
#![doc = include_str!("main.rs")]
#![doc = "```"]
use std::{
cmp::Ordering,
@@ -26,23 +29,6 @@ pub use enums::*;
pub use structs::*;
pub use traits::*;
type Buffer = Vec<u8>;
/// Uses `Mmap` instead of `File`
///
/// Used in `/indexer`
pub const CACHED_GETS: u8 = 0;
/// Will use the same `File` for every read, so not thread safe
///
/// Used in `/computer`
pub const SINGLE_THREAD: u8 = 1;
/// Will spin up a new `File` for every read
///
/// Used in `/server`
pub const STATELESS: u8 = 2;
///
/// A very small, fast, efficient and simple storable Vec
///
@@ -55,16 +41,15 @@ pub const STATELESS: u8 = 2;
/// If you don't call `.flush()` it just acts as a normal Vec
///
#[derive(Debug)]
pub struct StorableVec<I, T, const MODE: u8> {
pub struct StorableVec<I, T> {
version: Version,
pathbuf: PathBuf,
file: File,
/// **Number of values NOT number of bytes**
file_len: usize,
file_position: u64,
buf: Buffer,
/// Only for CACHED_GETS
cache: Vec<OnceLock<Box<memmap2::Mmap>>>, // Boxed Mmap to reduce the size of the Lock (from 24 to 16)
buf: Vec<u8>,
mmaps: Vec<OnceLock<Box<memmap2::Mmap>>>, // Boxed Mmap to reduce the size of the Lock (from 24 to 16)
pushed: Vec<T>,
// updated: BTreeMap<usize, T>,
// inserted: BTreeMap<usize, T>,
@@ -76,11 +61,12 @@ pub struct StorableVec<I, T, const MODE: u8> {
/// In bytes
const MAX_PAGE_SIZE: usize = 4 * 4096;
const ONE_MB: usize = 1000 * 1024;
const ONE_MB: usize = 1024 * 1024;
// const MAX_CACHE_SIZE: usize = usize::MAX;
const MAX_CACHE_SIZE: usize = 100 * ONE_MB;
const FLUSH_EVERY: usize = 10_000;
impl<I, T, const MODE: u8> StorableVec<I, T, MODE>
impl<I, T> StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,
@@ -106,11 +92,9 @@ where
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
if MODE != STATELESS {
let path = Self::path_version_(path);
version.validate(path.as_ref())?;
version.write(path.as_ref())?;
}
let version_path = Self::path_version_(path);
version.validate(version_path.as_ref())?;
version.write(version_path.as_ref())?;
let file = Self::open_file_(&Self::path_vec_(path))?;
@@ -121,7 +105,7 @@ where
file_len: Self::read_disk_len_(&file)?,
file,
buf: Self::create_buffer(),
cache: vec![],
mmaps: vec![],
pushed: vec![],
// updated: BTreeMap::new(),
// inserted: BTreeMap::new(),
@@ -131,13 +115,13 @@ where
// opened_mmaps: AtomicUsize::new(0),
};
slf.reset_disk_related_state()?;
slf.reset_file_metadata()?;
Ok(slf)
}
#[inline]
fn create_buffer() -> Buffer {
fn create_buffer() -> Vec<u8> {
vec![0; Self::SIZE_OF_T]
}
@@ -153,9 +137,12 @@ where
.open(path)
}
fn open_file_at_then_read(&self, index: usize) -> Result<T> {
pub fn open_then_read(&self, index: I) -> Result<T> {
self.open_then_read_(Self::i_to_usize(index)?)
}
fn open_then_read_(&self, index: usize) -> Result<T> {
let mut file = self.open_file()?;
Self::seek(&mut file, Self::index_to_byte_index(index))?;
Self::seek_(&mut file, Self::index_to_byte_index(index))?;
let mut buf = Self::create_buffer();
Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned())
}
@@ -167,35 +154,33 @@ where
Ok(Self::byte_index_to_index(file.metadata()?.len() as usize))
}
fn reset_disk_related_state(&mut self) -> io::Result<()> {
fn reset_file_metadata(&mut self) -> io::Result<()> {
self.file_len = self.read_disk_len()?;
self.file_position = 0;
self.reset_cache()
self.file_position = self.file.seek(SeekFrom::Start(0))?;
Ok(())
}
fn reset_cache(&mut self) -> io::Result<()> {
match MODE {
CACHED_GETS => {
self.cache.par_iter_mut().for_each(|lock| {
lock.take();
});
pub fn reset_mmaps(&mut self) -> io::Result<()> {
self.mmaps.par_iter_mut().for_each(|lock| {
lock.take();
});
let len = (self.file_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
let len = (self.file_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
if self.cache.len() != len {
self.cache.resize_with(len, Default::default);
// self.cache.shrink_to_fit();
}
Ok(())
}
_ => Ok(()),
if self.mmaps.len() != len {
self.mmaps.resize_with(len, Default::default);
}
Ok(())
}
#[inline]
fn seek(file: &mut File, byte_index: u64) -> io::Result<u64> {
fn seek(&mut self, byte_index: u64) -> io::Result<u64> {
self.file.seek(SeekFrom::Start(byte_index))
}
#[inline]
fn seek_(file: &mut File, byte_index: u64) -> io::Result<u64> {
file.seek(SeekFrom::Start(byte_index))
}
@@ -206,12 +191,168 @@ where
}
#[inline]
fn push_(&mut self, value: T) {
/// Typed entry point for lookups: converts `index` with `i_to_usize` (propagating a
/// conversion error) and delegates to `get_`.
pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
self.get_(Self::i_to_usize(index)?)
}
fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed.get(index).map(|v| Value::Ref(v)));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
// if !self.updated.is_empty() {
// if let Some(v) = self.updated.get(&index) {
// return Ok(Some(v));
// }
// }
let page_index = index / Self::PER_PAGE;
let last_index = self.file_len - 1;
let max_page_index = last_index / Self::PER_PAGE;
let min_page_index = (max_page_index + 1) - self.mmaps.len();
// let min_open_page = self.min.load(AtomicOrdering::SeqCst);
// if self.min.load(AtomicOrdering::SeqCst) {
// self.min.set(value)
// }
if !self.mmaps.is_empty() && page_index >= min_page_index {
let mmap = &**self
.mmaps
.get(page_index - min_page_index)
.ok_or(Error::MmapsVecIsTooSmall)?
.get_or_init(|| {
Box::new(unsafe {
memmap2::MmapOptions::new()
.len(Self::PAGE_SIZE)
.offset((page_index * Self::PAGE_SIZE) as u64)
.map(&self.file)
.unwrap()
})
});
let range = Self::index_to_byte_range(index);
let slice = &mmap[range];
return Ok(Some(Value::Ref(T::try_ref_from_bytes(slice)?)));
}
Ok(self.open_then_read_(index).map_or(None, |v| Some(Value::Owned(v))))
}
/// Typed entry point for sequential reads: converts `index` with `i_to_usize`
/// (propagating a conversion error) and delegates to `read_`.
#[inline]
pub fn read(&mut self, index: I) -> Result<Option<&T>> {
self.read_(Self::i_to_usize(index)?)
}
/// Reads the value at raw position `index` through the vec's own file handle.
///
/// The handle is only re-seeked when the tracked `file_position` differs from the
/// target byte offset, so consecutive indexes are read without extra seeks. On success
/// the tracked position advances by `SIZE_OF_T`; the returned reference points into the
/// internal `buf`.
#[inline]
pub fn read_(&mut self, index: usize) -> Result<Option<&T>> {
    let target = Self::index_to_byte_index(index);
    if self.file_position != target {
        self.file_position = self.seek(Self::index_to_byte_index(index))?;
    }

    let value = Self::read_exact(&mut self.file, &mut self.buf)?;
    self.file_position += Self::SIZE_OF_T as u64;
    Ok(Some(value))
}
/// Reads the value at position `len() - 1` via `read_`, or returns `Ok(None)` when
/// `len()` is zero.
fn read_last(&mut self) -> Result<Option<&T>> {
    match self.len() {
        0 => Ok(None),
        len => self.read_(len - 1),
    }
}
/// Visits every `(index, value)` pair by delegating to `iter_from`, starting at
/// `I::default()`. Any error returned by `f` or by `iter_from` is propagated.
pub fn iter<F>(&self, f: F) -> Result<()>
where
F: FnMut((I, &T)) -> Result<()>,
{
self.iter_from(I::default(), f)
}
/// Calls `f` with every `(index, value)` pair from `index` onwards, in order.
///
/// Values already on disk are streamed through a dedicated file handle first, followed
/// by the values still held in the `pushed` buffer (whose indexes continue after the
/// on-disk length). Iteration stops at the first error returned by `f` or by the
/// underlying reads, and that error is propagated.
///
/// Pushed values located before `index` are skipped: when the start index lies beyond
/// the on-disk length, iteration begins at the matching offset inside `pushed`.
pub fn iter_from<F>(&self, mut index: I, mut f: F) -> Result<()>
where
    F: FnMut((I, &T)) -> Result<()>,
{
    let mut file = self.open_file()?;

    let start = Self::i_to_usize(index)?;
    let disk_len = Self::read_disk_len_(&file)?;

    if start < disk_len {
        Self::seek_(&mut file, Self::index_to_byte_index(start))?;

        let mut buf = Self::create_buffer();
        let disk_end = I::from(disk_len);

        while index < disk_end {
            f((index, Self::read_exact(&mut file, &mut buf)?))?;
            index = index + 1;
        }
    }

    // Offset of the first pushed value at or after the requested start index.
    let first_pushed = start.saturating_sub(disk_len);

    for (offset, value) in self.pushed.iter().enumerate().skip(first_pushed) {
        f((I::from(disk_len + offset), value))?;
    }

    Ok(())
}
/// Reads the on-disk values in the half-open range `from..to` into a `Vec`.
///
/// * `from` defaults to `0` and `to` defaults to the on-disk length.
/// * Negative bounds count back from the end (`-1` is the last position).
/// * Both bounds are clamped to `0..=len`, so an out-of-range bound can neither wrap
///   around nor read past the end of the file.
///
/// # Errors
///
/// * `Error::UnsupportedUnflushedState` when unflushed values are pending in `pushed`.
/// * `Error::RangeFromAfterTo` when the resolved range is empty or inverted.
/// * Any error raised while opening, seeking or reading the file.
pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
    if !self.pushed.is_empty() {
        return Err(Error::UnsupportedUnflushedState);
    }

    let mut file = self.open_file()?;

    let len = Self::read_disk_len_(&file)?;

    // Resolves a possibly negative bound to a position within `0..=len`.
    // The casts are lossless: each value is capped at `len` before narrowing.
    let resolve = |bound: i64| -> usize {
        if bound >= 0 {
            (bound as u64).min(len as u64) as usize
        } else {
            len - bound.unsigned_abs().min(len as u64) as usize
        }
    };

    let from = from.map_or(0, |bound| resolve(bound));
    let to = to.map_or(len, |bound| resolve(bound));

    if from >= to {
        return Err(Error::RangeFromAfterTo);
    }

    Self::seek_(&mut file, Self::index_to_byte_index(from))?;

    let mut buf = Self::create_buffer();

    // Collecting into `Result` stops at the first failed read and returns its error.
    (from..to)
        .map(|_| Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned()))
        .collect()
}
/// Appends `value` to the in-memory `pushed` buffer; this method itself does not
/// touch the file.
#[inline]
pub fn push(&mut self, value: T) {
self.pushed.push(value)
}
#[inline]
fn push_if_needed_(&mut self, index: I, value: T) -> Result<()> {
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
match self.len().cmp(&Self::i_to_usize(index)?) {
Ordering::Greater => {
// dbg!(len, index, &self.pathbuf);
@@ -229,6 +370,27 @@ where
}
}
#[inline]
fn push_and_flush_if_needed(&mut self, index: I, value: T, exit: &Exit) -> Result<()> {
match self.len().cmp(&Self::i_to_usize(index)?) {
Ordering::Less => {
return Err(Error::IndexTooHigh);
}
ord => {
if ord == Ordering::Greater {
self.safe_truncate_if_needed(index, exit)?;
}
self.pushed.push(value);
}
}
if self.pushed_len() >= FLUSH_EVERY {
Ok(self.safe_flush(exit)?)
} else {
Ok(())
}
}
#[inline]
pub fn len(&self) -> usize {
self.file_len + self.pushed_len()
@@ -258,7 +420,7 @@ where
self.has(index).map(|b| !b)
}
fn _flush(&mut self) -> io::Result<()> {
pub fn flush(&mut self) -> io::Result<()> {
if self.pushed.is_empty() {
return Ok(());
}
@@ -274,16 +436,35 @@ where
self.file.write_all(&bytes)?;
self.reset_disk_related_state()?;
self.reset_file_metadata()?;
Ok(())
}
fn reset(&mut self) -> Result<()> {
/// Flushes pending values while holding the `exit` guard.
///
/// Does nothing (and returns `Ok`) when `exit` has already been triggered. Otherwise the
/// flush runs between `exit.block()` and `exit.release()`. The guard is released even
/// when the flush fails, so an I/O error cannot leave `exit` blocked.
///
/// # Errors
///
/// Returns the error reported by `flush`.
pub fn safe_flush(&mut self, exit: &Exit) -> io::Result<()> {
    if exit.triggered() {
        return Ok(());
    }

    exit.block();
    let result = self.flush();
    exit.release();

    result
}
/// Truncates the vec back to index 0 via `truncate_if_needed`, discarding the value it returns.
pub fn reset_file(&mut self) -> Result<()> {
    self.truncate_if_needed(I::from(0)).map(|_| ())
}
/// Checks `version` against the one stored at `path_computed_version()`.
///
/// When validation fails the file is reset through `reset_file`; in every case
/// `version` is then written back to that path.
fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> {
    let version_path = self.path_computed_version();

    let is_outdated = version.validate(version_path.as_ref()).is_err();
    if is_outdated {
        self.reset_file()?;
    }

    version.write(version_path.as_ref())?;

    Ok(())
}
pub fn truncate_if_needed(&mut self, index: I) -> Result<Option<T>> {
let index = Self::i_to_usize(index)?;
@@ -291,14 +472,23 @@ where
return Ok(None);
}
let value_at_index = self.open_file_at_then_read(index).ok();
let value_at_index = self.open_then_read_(index).ok();
self.file.set_len(Self::index_to_byte_index(index))?;
self.reset_disk_related_state()?;
self.reset_file_metadata()?;
Ok(value_at_index)
}
/// Truncates the vec at `index` while holding the `exit` guard.
///
/// Does nothing (and returns `Ok`) when `exit` has already been triggered. Otherwise
/// `truncate_if_needed` runs between `exit.block()` and `exit.release()`. The guard is
/// released even when truncation fails, so an error cannot leave `exit` blocked. The
/// value returned by `truncate_if_needed` is discarded.
///
/// # Errors
///
/// Returns the error reported by `truncate_if_needed`.
pub fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> {
    if exit.triggered() {
        return Ok(());
    }

    exit.block();
    let result = self.truncate_if_needed(index);
    exit.release();

    result.map(|_| ())
}
#[inline]
fn i_to_usize(index: I) -> Result<usize> {
@@ -357,228 +547,19 @@ where
path.join("version")
}
pub fn index_type_to_string(&self) -> &str {
std::any::type_name::<I>()
}
}
impl<I, T> StorableVec<I, T, CACHED_GETS>
where
I: StoredIndex,
T: StoredType,
{
#[inline]
pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
self.get_(Self::i_to_usize(index)?)
}
fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed.get(index).map(|v| Value::Ref(v)));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
// if !self.updated.is_empty() {
// if let Some(v) = self.updated.get(&index) {
// return Ok(Some(v));
// }
// }
let page_index = index / Self::PER_PAGE;
let last_index = self.file_len - 1;
let max_page_index = last_index / Self::PER_PAGE;
let min_page_index = (max_page_index + 1) - self.cache.len();
// let min_open_page = self.min.load(AtomicOrdering::SeqCst);
// if self.min.load(AtomicOrdering::SeqCst) {
// self.min.set(value)
// }
if page_index >= min_page_index {
let mmap = &**self
.cache
.get(page_index - min_page_index)
.ok_or(Error::MmapsVecIsTooSmall)?
.get_or_init(|| {
Box::new(unsafe {
memmap2::MmapOptions::new()
.len(Self::PAGE_SIZE)
.offset((page_index * Self::PAGE_SIZE) as u64)
.map(&self.file)
.unwrap()
})
});
let range = Self::index_to_byte_range(index);
let slice = &mmap[range];
return Ok(Some(Value::Ref(T::try_ref_from_bytes(slice)?)));
}
Ok(Some(Value::Owned(self.open_file_at_then_read(index)?.to_owned())))
}
pub fn get_or_default(&self, index: I) -> Result<T>
where
T: Default + Clone,
{
Ok(self.get(index)?.map(|v| (*v).clone()).unwrap_or(Default::default()))
}
pub fn iter_from<F>(&self, mut index: I, mut f: F) -> Result<()>
where
F: FnMut((I, Value<T>)) -> Result<()>,
{
let disk_len = I::from(Self::read_disk_len_(&self.file)?);
while index < disk_len {
f((index, self.get(index)?.unwrap()))?;
index = index + 1;
}
let mut index = I::from(0);
let pushed_len = I::from(self.pushed_len());
let disk_len = Self::i_to_usize(disk_len)?;
while index < pushed_len {
f(((index + disk_len), self.get(index)?.unwrap()))?;
index = index + 1;
}
Ok(())
}
#[inline]
pub fn push(&mut self, value: T) {
self.push_(value)
}
#[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
self.push_if_needed_(index, value)
}
#[inline]
pub fn flush(&mut self) -> io::Result<()> {
self._flush()
}
}
const FLUSH_EVERY: usize = 10_000;
impl<I, T> StorableVec<I, T, SINGLE_THREAD>
where
I: StoredIndex,
T: StoredType,
{
pub fn get(&mut self, index: I) -> Result<&T> {
self.get_(Self::i_to_usize(index)?)
}
fn get_(&mut self, index: usize) -> Result<&T> {
let byte_index = Self::index_to_byte_index(index);
if self.file_position != byte_index {
self.file_position = Self::seek(&mut self.file, byte_index)?;
}
let res = Self::read_exact(&mut self.file, &mut self.buf);
if res.is_ok() {
self.file_position += Self::SIZE_OF_T as u64;
}
res
}
fn last(&mut self) -> Result<Option<&T>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
Ok(self.get_(len - 1).ok())
}
// #[inline]
// fn push(&mut self, value: T) {
// self.push_(value)
// }
/// Runs `push_if_needed_` and then flushes through `blocked_flush` once
/// `pushed_len()` has reached `FLUSH_EVERY`.
///
/// # Errors
///
/// Returns the error of `push_if_needed_`, or the flush error converted
/// into this crate's error type.
#[inline]
fn blocked_push_if_needed(&mut self, index: I, value: T, exit: &Exit) -> Result<()> {
    self.push_if_needed_(index, value)?;

    // Below the threshold: nothing to flush yet.
    if self.pushed_len() < FLUSH_EVERY {
        return Ok(());
    }

    self.blocked_flush(exit)?;
    Ok(())
}
// pub fn iter<F>(&mut self, f: F) -> Result<()>
// where
// F: FnMut((I, &T)) -> Result<()>,
// {
// self.iter_from(I::default(), f)
// }
/// Calls `f` with `(index, &value)` for every index from `index` up to
/// (excluding) the length read from the file.
///
/// Pushed-but-unflushed values are not iterated: if any exist the call is
/// rejected up front.
///
/// # Errors
///
/// * `Error::UnsupportedUnflushedState` when `pushed` is not empty.
/// * Any error from reading the disk length, from `get`, or from `f`.
pub fn iter_from<F>(&mut self, mut index: I, mut f: F) -> Result<()>
where
    F: FnMut((I, &T)) -> Result<()>,
{
    let has_unflushed = !self.pushed.is_empty();
    if has_unflushed {
        return Err(Error::UnsupportedUnflushedState);
    }

    let stored_len = I::from(Self::read_disk_len_(&self.file)?);
    while index < stored_len {
        let value = self.get(index)?;
        f((index, value))?;
        index = index + 1;
    }

    Ok(())
}
/// Builds the path of the `computed_version` entry located under `self.path()`.
#[inline]
fn path_computed_version(&self) -> PathBuf {
    let base = self.path();
    base.join("computed_version")
}
// #[inline]
// fn path_computed_version_(path: &Path) -> PathBuf {
// path.join("computed_version")
// }
/// Checks `version` against the file at `path_computed_version()`; when the
/// check fails, `reset()` is called. In both cases `version` is then written
/// to that file.
///
/// # Errors
///
/// Returns the error of `reset()` or of the version write. A validation
/// failure itself is not an error — it only triggers the reset.
fn validate_or_reset(&mut self, version: Version) -> Result<()> {
    let version_file = self.path_computed_version();

    let is_stale = version.validate(version_file.as_ref()).is_err();
    if is_stale {
        self.reset()?;
    }

    version.write(version_file.as_ref())?;
    Ok(())
}
/// Runs `_flush` between `exit.block()` and `exit.release()`.
///
/// Returns `Ok(())` without flushing when `exit.triggered()` is already true.
///
/// # Errors
///
/// Returns the `io::Error` produced by `_flush`.
// NOTE(review): when `_flush` fails, `?` returns before `exit.release()` is
// reached, so `exit` is left in its blocked state — confirm this is intended.
fn blocked_flush(&mut self, exit: &Exit) -> io::Result<()> {
// Skip the flush entirely when exit has already been triggered.
if exit.triggered() {
return Ok(());
}
exit.block();
self._flush()?;
exit.release();
Ok(())
/// Returns the name of the index type `I` as reported by
/// `std::any::type_name`.
pub fn index_type_to_string(&self) -> &str {
    let name: &'static str = std::any::type_name::<I>();
    name
}
pub fn compute_transform<A, F>(
&mut self,
other: &mut StorableVec<I, A, SINGLE_THREAD>,
max_from: I,
other: &mut StorableVec<I, A>,
t: F,
exit: &Exit,
) -> Result<()>
@@ -586,78 +567,91 @@ where
A: StoredType,
F: Fn(&A) -> T,
{
self.validate_or_reset(Version::from(0) + self.version + other.version)?;
self.validate_computed_version_or_reset_file(Version::from(0) + self.version + other.version)?;
other.iter_from(I::from(self.len()), |(i, a)| self.blocked_push_if_needed(i, t(a), exit))?;
Ok(self.blocked_flush(exit)?)
let index = max_from.min(I::from(self.len()));
other.iter_from(index, |(i, a)| self.push_and_flush_if_needed(i, t(a), exit))?;
Ok(self.safe_flush(exit)?)
}
pub fn compute_inverse_more_to_less(
&mut self,
other: &mut StorableVec<T, I, SINGLE_THREAD>,
max_from: T,
other: &mut StorableVec<T, I>,
exit: &Exit,
) -> Result<()>
where
I: StoredType,
I: StoredType + StoredIndex,
T: StoredIndex,
{
self.validate_or_reset(Version::from(0) + self.version + other.version)?;
self.validate_computed_version_or_reset_file(Version::from(0) + self.version + other.version)?;
let index = self.last()?.cloned().unwrap_or_default();
other.iter_from(index, |(v, i)| self.blocked_push_if_needed(*i, v, exit))?;
Ok(self.blocked_flush(exit)?)
let index = max_from.min(self.read_last()?.cloned().unwrap_or_default());
other.iter_from(index, |(v, i)| self.push_and_flush_if_needed(*i, v, exit))?;
Ok(self.safe_flush(exit)?)
}
pub fn compute_inverse_less_to_more(
&mut self,
first_indexes: &mut StorableVec<T, I, SINGLE_THREAD>,
last_indexes: &mut StorableVec<T, I, SINGLE_THREAD>,
max_from: T,
first_indexes: &mut StorableVec<T, I>,
last_indexes: &mut StorableVec<T, I>,
exit: &Exit,
) -> Result<()>
where
I: StoredType,
T: StoredIndex,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?;
self.validate_computed_version_or_reset_file(
Version::from(0) + self.version + first_indexes.version + last_indexes.version,
)?;
first_indexes.iter_from(T::from(self.len()), |(value, first_index)| {
let index = max_from.min(T::from(self.len()));
first_indexes.iter_from(index, |(value, first_index)| {
let first_index = Self::i_to_usize(*first_index)?;
let last_index = Self::i_to_usize(*last_indexes.get(value)?)?;
(first_index..last_index).try_for_each(|index| self.blocked_push_if_needed(I::from(index), value, exit))
let last_index = Self::i_to_usize(*last_indexes.read(value)?.unwrap())?;
(first_index..last_index).try_for_each(|index| self.push_and_flush_if_needed(I::from(index), value, exit))
})?;
Ok(self.blocked_flush(exit)?)
Ok(self.safe_flush(exit)?)
}
pub fn compute_last_index_from_first(
&mut self,
first_indexes: &mut StorableVec<I, T, SINGLE_THREAD>,
max_from: I,
first_indexes: &mut StorableVec<I, T>,
final_len: usize,
exit: &Exit,
) -> Result<()>
where
T: Copy + From<usize> + Sub<T, Output = T> + StoredIndex,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version)?;
self.validate_computed_version_or_reset_file(Version::from(0) + self.version + first_indexes.version)?;
let index = max_from.min(I::from(self.len()));
let one = T::from(1);
let mut prev_index: Option<I> = None;
first_indexes.iter_from(I::from(self.len()), |(i, v)| {
first_indexes.iter_from(index, |(i, v)| {
if let Some(prev_index) = prev_index {
self.blocked_push_if_needed(prev_index, *v - one, exit)?;
self.push_and_flush_if_needed(prev_index, *v - one, exit)?;
}
prev_index.replace(i);
Ok(())
})?;
if let Some(prev_index) = prev_index {
self.blocked_push_if_needed(prev_index, T::from(final_len) - one, exit)?;
self.push_and_flush_if_needed(prev_index, T::from(final_len) - one, exit)?;
}
Ok(self.blocked_flush(exit)?)
Ok(self.safe_flush(exit)?)
}
pub fn compute_count_from_indexes<T2>(
&mut self,
first_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
last_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
max_from: I,
first_indexes: &mut StorableVec<I, T2>,
last_indexes: &mut StorableVec<I, T2>,
exit: &Exit,
) -> Result<()>
where
@@ -665,20 +659,25 @@ where
T2: StoredType + Copy + Add<usize, Output = T2> + Sub<T2, Output = T2> + TryInto<T>,
<T2 as TryInto<T>>::Error: error::Error + 'static,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?;
self.validate_computed_version_or_reset_file(
Version::from(0) + self.version + first_indexes.version + last_indexes.version,
)?;
first_indexes.iter_from(I::from(self.len()), |(i, first_index)| {
let last_index = last_indexes.get(i)?;
let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(i, first_index)| {
let last_index = last_indexes.read(i)?.unwrap();
let count = *last_index + 1_usize - *first_index;
self.blocked_push_if_needed(i, count.into(), exit)
self.push_and_flush_if_needed(i, count.into(), exit)
})?;
Ok(self.blocked_flush(exit)?)
Ok(self.safe_flush(exit)?)
}
pub fn compute_is_first_ordered<A>(
&mut self,
self_to_other: &mut StorableVec<I, A, SINGLE_THREAD>,
other_to_self: &mut StorableVec<A, I, SINGLE_THREAD>,
max_from: I,
self_to_other: &mut StorableVec<I, A>,
other_to_self: &mut StorableVec<A, I>,
exit: &Exit,
) -> Result<()>
where
@@ -686,18 +685,23 @@ where
T: From<bool>,
A: StoredIndex + StoredType,
{
self.validate_or_reset(Version::from(0) + self.version + self_to_other.version + other_to_self.version)?;
self.validate_computed_version_or_reset_file(
Version::from(0) + self.version + self_to_other.version + other_to_self.version,
)?;
self_to_other.iter_from(I::from(self.len()), |(i, other)| {
self.blocked_push_if_needed(i, T::from(other_to_self.get(*other)? == &i), exit)
let index = max_from.min(I::from(self.len()));
self_to_other.iter_from(index, |(i, other)| {
self.push_and_flush_if_needed(i, T::from(other_to_self.read(*other)?.unwrap() == &i), exit)
})?;
Ok(self.blocked_flush(exit)?)
Ok(self.safe_flush(exit)?)
}
pub fn compute_sum_from_indexes<T2, F>(
&mut self,
first_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
last_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
max_from: I,
first_indexes: &mut StorableVec<I, T2>,
last_indexes: &mut StorableVec<I, T2>,
exit: &Exit,
) -> Result<()>
where
@@ -706,70 +710,22 @@ where
<T2 as TryInto<T>>::Error: error::Error + 'static,
F: Fn(&T2) -> T,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?;
self.validate_computed_version_or_reset_file(
Version::from(0) + self.version + first_indexes.version + last_indexes.version,
)?;
first_indexes.iter_from(I::from(self.len()), |(i, first_index)| {
let last_index = last_indexes.get(i)?;
let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(index, first_index)| {
let last_index = last_indexes.read(index)?.unwrap();
let count = *last_index + 1_usize - *first_index;
self.blocked_push_if_needed(i, count.into(), exit)
self.push_and_flush_if_needed(index, count.into(), exit)
})?;
Ok(self.blocked_flush(exit)?)
Ok(self.safe_flush(exit)?)
}
}
impl<I, T> StorableVec<I, T, STATELESS>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Reads the value stored at `index` through `open_file_at_then_read`.
    ///
    /// On success the result is always `Some`.
    ///
    /// # Errors
    ///
    /// Fails if the index conversion or the read fails.
    #[inline]
    pub fn get(&self, index: I) -> Result<Option<T>> {
        let position = Self::i_to_usize(index)?;
        Ok(Some(self.open_file_at_then_read(position)?))
    }

    /// Collects the values in the half-open range `from..to` read from the file.
    ///
    /// * `from` defaults to `0`, `to` defaults to the on-disk length.
    /// * A negative bound counts back from the on-disk length (`-1` is the
    ///   last position); a negative bound reaching before the start is
    ///   clamped to `0`.
    /// * `to` is clamped to the on-disk length so the read never runs past
    ///   the stored data.
    ///
    /// # Errors
    ///
    /// * `Error::UnsupportedUnflushedState` when `pushed` is not empty.
    /// * `Error::RangeFromAfterTo` when the resolved `from` is not strictly
    ///   below the resolved `to`.
    /// * Any error from opening, seeking or reading the file. Read failures
    ///   are propagated instead of panicking.
    pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
        if !self.pushed.is_empty() {
            return Err(Error::UnsupportedUnflushedState);
        }

        let mut file = self.open_file()?;
        let len = Self::read_disk_len_(&file)?;

        // Turns a signed, possibly end-relative bound into an absolute
        // position without ever wrapping around.
        let resolve = |bound: i64| -> usize {
            let magnitude = usize::try_from(bound.unsigned_abs()).unwrap_or(usize::MAX);
            if bound >= 0 {
                magnitude
            } else {
                len.saturating_sub(magnitude)
            }
        };

        let from = from.map_or(0, &resolve);
        let to = to.map_or(len, &resolve).min(len);

        if from >= to {
            return Err(Error::RangeFromAfterTo);
        }

        Self::seek(&mut file, Self::index_to_byte_index(from))?;

        // One buffer is reused for every sequential read.
        let mut buf = Self::create_buffer();

        (from..to)
            .map(|_| Self::read_exact(&mut file, &mut buf).map(|v| v.to_owned()))
            .collect()
    }

    // TODO: add iter / iter_from / iter_range / collect variants,
    // and add a memory cap.
}
impl<I, T> Clone for StorableVec<I, T, STATELESS>
impl<I, T> Clone for StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,

View File

@@ -1,11 +1,10 @@
use std::path::Path;
use brk_vec::{CACHED_GETS, SINGLE_THREAD, StorableVec, Version};
use brk_vec::{StorableVec, Version};
fn main() -> Result<(), Box<dyn std::error::Error>> {
{
let mut vec: StorableVec<usize, u32, CACHED_GETS> =
StorableVec::forced_import(Path::new("./v"), Version::from(1))?;
let mut vec: StorableVec<usize, u32> = StorableVec::forced_import(Path::new("./v"), Version::from(1))?;
vec.push(0);
vec.push(1);
@@ -17,13 +16,12 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
{
let mut vec: StorableVec<usize, u32, SINGLE_THREAD> =
StorableVec::forced_import(Path::new("./v"), Version::from(1))?;
let mut vec: StorableVec<usize, u32> = StorableVec::forced_import(Path::new("./v"), Version::from(1))?;
dbg!(vec.get(0)?); // 0
dbg!(vec.get(1)?); // 0
dbg!(vec.get(2)?); // 0
dbg!(vec.get(0)?); // 0
dbg!(vec.read(0)?); // 0
dbg!(vec.read(1)?); // 0
dbg!(vec.read(2)?); // 0
dbg!(vec.read(0)?); // 0
}
Ok(())

View File

@@ -1,6 +1,6 @@
use std::mem;
use std::io;
use crate::{Result, STATELESS, StorableVec};
use crate::{Result, StorableVec};
use super::{StoredIndex, StoredType};
@@ -9,10 +9,11 @@ pub trait AnyStorableVec: Send + Sync {
fn index_type_to_string(&self) -> &str;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
// fn flush(&mut self) -> io::Result<()>;
fn collect_range_values(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<serde_json::Value>>;
fn flush(&mut self) -> io::Result<()>;
}
impl<I, T, const MODE: u8> AnyStorableVec for StorableVec<I, T, MODE>
impl<I, T> AnyStorableVec for StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,
@@ -33,33 +34,15 @@ where
self.is_empty()
}
// fn flush(&mut self) -> io::Result<()> {
// self.flush()
// }
}
fn flush(&mut self) -> io::Result<()> {
self.flush()
}
#[cfg(feature = "json")]
pub trait AnyJsonStorableVec: AnyStorableVec {
fn collect_range_values(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<serde_json::Value>>;
}
#[cfg(feature = "json")]
impl<I, T, const MODE: u8> AnyJsonStorableVec for StorableVec<I, T, MODE>
where
I: StoredIndex,
T: StoredType + serde::Serialize,
{
fn collect_range_values(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<serde_json::Value>> {
if MODE == STATELESS {
Ok(
unsafe { mem::transmute::<&StorableVec<I, T, MODE>, &StorableVec<I, T, STATELESS>>(self) }
.collect_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).unwrap())
.collect::<Vec<_>>(),
)
} else {
todo!("todo ?")
}
Ok(self
.collect_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).unwrap())
.collect::<Vec<_>>())
}
}

View File

@@ -1,13 +1,14 @@
use std::fmt::Debug;
use serde::Serialize;
use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
pub trait StoredType
where
Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync,
Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize,
{
}
impl<T> StoredType for T where
T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync
T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize
{
}