Compare commits

..

9 Commits

Author SHA1 Message Date
nym21 9aec991da6 release: v0.0.20 2025-04-10 21:39:34 +02:00
nym21 910701ce04 cleanup: old files 2025-04-10 21:39:12 +02:00
nym21 34b462d511 global: snapshot 2025-04-10 21:38:39 +02:00
nym21 139e93b2f0 vec: rework part 4 2025-04-10 15:55:26 +02:00
nym21 0dd7e9359e vec: rework part 3 2025-04-10 01:11:52 +02:00
nym21 41cf0225e3 vec: rework part 2 2025-04-09 22:59:18 +02:00
nym21 962254e511 vec: rework part 1 2025-04-09 16:31:31 +02:00
nym21 a7f2b24bac comp + vec: tiny opti 2025-04-08 15:38:20 +02:00
nym21 1323d988af computer + kibo: part 8 2025-04-08 11:40:35 +02:00
44 changed files with 2034 additions and 1377 deletions
Generated
+20 -12
View File
@@ -138,6 +138,12 @@ dependencies = [
"derive_arbitrary", "derive_arbitrary",
] ]
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.6" version = "0.7.6"
@@ -368,7 +374,7 @@ dependencies = [
[[package]] [[package]]
name = "brk" name = "brk"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"brk_cli", "brk_cli",
"brk_computer", "brk_computer",
@@ -385,7 +391,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_cli" name = "brk_cli"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"brk_computer", "brk_computer",
"brk_core", "brk_core",
@@ -406,7 +412,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_computer" name = "brk_computer"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"brk_core", "brk_core",
"brk_exit", "brk_exit",
@@ -421,7 +427,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_core" name = "brk_core"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"bitcoin", "bitcoin",
"bitcoincore-rpc", "bitcoincore-rpc",
@@ -438,7 +444,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_exit" name = "brk_exit"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"brk_logger", "brk_logger",
"ctrlc", "ctrlc",
@@ -447,7 +453,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_fetcher" name = "brk_fetcher"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"brk_core", "brk_core",
"brk_logger", "brk_logger",
@@ -460,7 +466,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_indexer" name = "brk_indexer"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"bitcoin", "bitcoin",
"bitcoincore-rpc", "bitcoincore-rpc",
@@ -479,7 +485,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_logger" name = "brk_logger"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"color-eyre", "color-eyre",
"env_logger", "env_logger",
@@ -489,7 +495,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_parser" name = "brk_parser"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"bitcoin", "bitcoin",
"bitcoincore-rpc", "bitcoincore-rpc",
@@ -504,7 +510,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_query" name = "brk_query"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"brk_computer", "brk_computer",
"brk_indexer", "brk_indexer",
@@ -520,7 +526,7 @@ dependencies = [
[[package]] [[package]]
name = "brk_server" name = "brk_server"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"axum", "axum",
"brk_computer", "brk_computer",
@@ -546,8 +552,10 @@ dependencies = [
[[package]] [[package]]
name = "brk_vec" name = "brk_vec"
version = "0.0.19" version = "0.0.20"
dependencies = [ dependencies = [
"arc-swap",
"axum",
"memmap2", "memmap2",
"rayon", "rayon",
"serde", "serde",
+2 -1
View File
@@ -4,7 +4,7 @@ members = ["crates/*"]
package.description = "The Bitcoin Research Kit is a suite of tools designed to extract, compute and display data stored on a Bitcoin Core node" package.description = "The Bitcoin Research Kit is a suite of tools designed to extract, compute and display data stored on a Bitcoin Core node"
package.license = "MIT" package.license = "MIT"
package.edition = "2024" package.edition = "2024"
package.version = "0.0.19" package.version = "0.0.20"
package.repository = "https://github.com/bitcoinresearchkit/brk" package.repository = "https://github.com/bitcoinresearchkit/brk"
[profile.release] [profile.release]
@@ -16,6 +16,7 @@ panic = "abort"
inherits = "release" inherits = "release"
[workspace.dependencies] [workspace.dependencies]
axum = "0.8.3"
bitcoin = { version = "0.32.5", features = ["serde"] } bitcoin = { version = "0.32.5", features = ["serde"] }
bitcoincore-rpc = "0.19.0" bitcoincore-rpc = "0.19.0"
brk_cli = { version = "0", path = "crates/brk_cli" } brk_cli = { version = "0", path = "crates/brk_cli" }
+3 -2
View File
@@ -35,7 +35,7 @@
</p> </p>
> **WARNING** > **WARNING**
> >
> This project is still a work in progress and while it's much better in many ways than its previous version ([kibo v0.5](https://github.com/kibo-money/kibo)), it doesn't yet include all of those datasets. If you're interested in having everything right now, please use the latter until feature parity is achieved. > This project is still a work in progress and while it's much better in many ways than its previous version ([kibo v0.5](https://github.com/kibo-money/kibo)), it doesn't yet include all of those datasets. If you're interested in having everything right now, please use the latter until feature parity is achieved.
> >
> The explorer part (mempool.space/electrs) is also not viable just yet. > The explorer part (mempool.space/electrs) is also not viable just yet.
@@ -58,6 +58,7 @@ The toolkit can be used in various ways to accommodate as many needs as possible
For more information visit: [`brk_cli`](https://crates.io/crates/brk_cli) For more information visit: [`brk_cli`](https://crates.io/crates/brk_cli)
- **[Crates](https://crates.io/crates/brk)** \ - **[Crates](https://crates.io/crates/brk)** \
Rust developers have access to a wide range crates, each built upon one another with its own specific purpose, enabling independent use and offering great flexibility. Rust developers have access to a wide range crates, each built upon one another with its own specific purpose, enabling independent use and offering great flexibility.
PRs are welcome, especially if their goal is to introduce additional datasets.
The primary goal of this project is to be fully-featured and accessible for everyone, regardless of their background or financial situation - whether that person is an enthusiast, researcher, miner, analyst, or simply curious. The primary goal of this project is to be fully-featured and accessible for everyone, regardless of their background or financial situation - whether that person is an enthusiast, researcher, miner, analyst, or simply curious.
@@ -76,7 +77,7 @@ In contrast, existing alternatives tend to be either [very costly](https://studi
- [`brk_parser`](https://crates.io/crates/brk_parser): A very fast Bitcoin Core block parser and iterator built on top of bitcoin-rust - [`brk_parser`](https://crates.io/crates/brk_parser): A very fast Bitcoin Core block parser and iterator built on top of bitcoin-rust
- [`brk_query`](https://crates.io/crates/brk_query): A library that finds requested datasets. - [`brk_query`](https://crates.io/crates/brk_query): A library that finds requested datasets.
- [`brk_server`](https://crates.io/crates/brk_server): A server that serves Bitcoin data and swappable front-ends, built on top of `brk_indexer`, `brk_fetcher` and `brk_computer` - [`brk_server`](https://crates.io/crates/brk_server): A server that serves Bitcoin data and swappable front-ends, built on top of `brk_indexer`, `brk_fetcher` and `brk_computer`
- [`brk_vec`](https://crates.io/crates/brk_vec): A very small, fast, efficient and simple storable Vec. - [`brk_vec`](https://crates.io/crates/brk_vec): A push-only, truncable, compressable, saveable Vec
## Acknowledgments ## Acknowledgments
+4 -3
View File
@@ -272,9 +272,10 @@ impl RunConfig {
} }
fn read(path: &Path) -> Self { fn read(path: &Path) -> Self {
fs::read_to_string(path).map_or(RunConfig::default(), |contents| { fs::read_to_string(path).map_or_else(
toml::from_str(&contents).unwrap_or_default() |_| RunConfig::default(),
}) |contents| toml::from_str(&contents).unwrap_or_default(),
)
} }
fn write(&self, path: &Path) -> std::io::Result<()> { fn write(&self, path: &Path) -> std::io::Result<()> {
+91 -59
View File
@@ -2,7 +2,6 @@ use core::error;
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
fmt::Debug, fmt::Debug,
io,
ops::{Add, Sub}, ops::{Add, Sub},
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
@@ -10,15 +9,23 @@ use std::{
use brk_core::CheckedSub; use brk_core::CheckedSub;
use brk_exit::Exit; use brk_exit::Exit;
use brk_vec::{ use brk_vec::{
AnyStorableVec, Compressed, Error, Result, StorableVec, StoredIndex, StoredType, Version, Compressed, DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, StoredVec, Value,
Version,
}; };
use log::info;
const FLUSH_EVERY: usize = 10_000; const ONE_KIB: usize = 1024;
const ONE_MIB: usize = ONE_KIB * ONE_KIB;
const MAX_CACHE_SIZE: usize = 210 * ONE_MIB;
#[derive(Debug)] #[derive(Debug)]
pub struct ComputedVec<I, T> { pub struct ComputedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
computed_version: Option<Version>, computed_version: Option<Version>,
vec: StorableVec<I, T>, inner: StoredVec<I, T>,
} }
impl<I, T> ComputedVec<I, T> impl<I, T> ComputedVec<I, T>
@@ -26,16 +33,18 @@ where
I: StoredIndex, I: StoredIndex,
T: StoredType, T: StoredType,
{ {
const SIZE_OF: usize = size_of::<T>();
pub fn forced_import( pub fn forced_import(
path: &Path, path: &Path,
version: Version, version: Version,
compressed: Compressed, compressed: Compressed,
) -> brk_vec::Result<Self> { ) -> brk_vec::Result<Self> {
let vec = StorableVec::forced_import(path, version, compressed)?; let inner = StoredVec::forced_import(path, version, compressed)?;
Ok(Self { Ok(Self {
computed_version: None, computed_version: None,
vec, inner,
}) })
} }
@@ -44,7 +53,7 @@ where
return Ok(()); return Ok(());
} }
exit.block(); exit.block();
self.vec.truncate_if_needed(index)?; self.inner.truncate_if_needed(index)?;
exit.release(); exit.release();
Ok(()) Ok(())
} }
@@ -59,102 +68,118 @@ where
if ord == Ordering::Greater { if ord == Ordering::Greater {
self.safe_truncate_if_needed(index, exit)?; self.safe_truncate_if_needed(index, exit)?;
} }
self.vec.push(value); self.inner.push(value);
} }
} }
if self.vec.pushed_len() >= FLUSH_EVERY { if self.inner.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} else { } else {
Ok(()) Ok(())
} }
} }
pub fn safe_flush(&mut self, exit: &Exit) -> io::Result<()> { pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
if exit.triggered() { if exit.triggered() {
return Ok(()); return Ok(());
} }
exit.block(); exit.block();
self.vec.flush()?; self.inner.flush()?;
exit.release(); exit.release();
Ok(()) Ok(())
} }
fn version(&self) -> Version { fn version(&self) -> Version {
self.vec.version() self.inner.version()
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.vec.len() self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
} }
pub fn vec(&self) -> &StorableVec<I, T> { fn file_name(&self) -> String {
&self.vec self.inner.file_name()
} }
pub fn mut_vec(&mut self) -> &mut StorableVec<I, T> { pub fn vec(&self) -> &StoredVec<I, T> {
&mut self.vec &self.inner
} }
pub fn any_vec(&self) -> &dyn AnyStorableVec { pub fn mut_vec(&mut self) -> &mut StoredVec<I, T> {
&self.vec &mut self.inner
} }
pub fn mut_any_vec(&mut self) -> &mut dyn AnyStorableVec { pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec {
&mut self.vec &self.inner
} }
pub fn get(&mut self, index: I) -> Result<Option<&T>> { pub fn mut_any_vec(&mut self) -> &mut dyn brk_vec::AnyStoredVec {
self.vec.get(index) &mut self.inner
} }
pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> { pub fn cached_get(&mut self, index: I) -> Result<Option<Value<T>>> {
self.vec.collect_range(from, to) self.inner.cached_get(index)
}
pub fn collect_inclusive_range(&self, from: I, to: I) -> Result<Vec<T>> {
self.inner.collect_inclusive_range(from, to)
}
pub fn path(&self) -> &Path {
self.inner.path()
} }
#[inline] #[inline]
fn path_computed_version(&self) -> PathBuf { fn path_computed_version(&self) -> PathBuf {
self.vec.path().join("computed_version") self.inner.path().join("computed_version")
} }
fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> { fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> {
let path = self.path_computed_version(); let path = self.path_computed_version();
if version.validate(path.as_ref()).is_err() { if version.validate(path.as_ref()).is_err() {
self.vec.reset()?; self.inner.reset()?;
} }
version.write(path.as_ref())?; version.write(path.as_ref())?;
if self.is_empty() {
info!("Computing {}...", self.file_name())
}
Ok(()) Ok(())
} }
pub fn compute_transform<A, B, F>( pub fn compute_transform<A, B, F>(
&mut self, &mut self,
max_from: A, max_from: A,
other: &mut StorableVec<A, B>, other: &mut StoredVec<A, B>,
mut t: F, mut t: F,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
A: StoredIndex, A: StoredIndex,
B: StoredType, B: StoredType,
F: FnMut((A, B, &mut Self, &mut StorableVec<A, B>)) -> (I, T), F: FnMut((A, B, &mut Self, &mut dyn DynamicVec<I = A, T = B>)) -> (I, T),
{ {
self.validate_computed_version_or_reset_file( self.validate_computed_version_or_reset_file(
Version::ZERO + self.version() + other.version(), Version::ZERO + self.version() + other.version(),
)?; )?;
let index = max_from.min(A::from(self.len())); let index = max_from.min(A::from(self.len()));
other.iter_from_cloned(index, |(a, b, other)| { other.iter_from(index, |(a, b, other)| {
let (i, v) = t((a, b, self, other)); let (i, v) = t((a, b, self, other));
self.forced_push_at(i, v, exit) self.forced_push_at(i, v, exit)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_inverse_more_to_less( pub fn compute_inverse_more_to_less(
&mut self, &mut self,
max_from: T, max_from: T,
other: &mut StorableVec<T, I>, other: &mut StoredVec<T, I>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -165,24 +190,27 @@ where
Version::ZERO + self.version() + other.version(), Version::ZERO + self.version() + other.version(),
)?; )?;
let index = max_from.min(self.vec.get_last()?.cloned().unwrap_or_default()); let index = max_from.min(
self.inner
.cached_get_last()?
.map_or_else(T::default, |v| v.into_inner()),
);
other.iter_from(index, |(v, i, ..)| { other.iter_from(index, |(v, i, ..)| {
let i = *i; if self.cached_get(i).unwrap().is_none_or(|old_v| *old_v > v) {
if self.get(i).unwrap().is_none_or(|old_v| *old_v > v) {
self.forced_push_at(i, v, exit) self.forced_push_at(i, v, exit)
} else { } else {
Ok(()) Ok(())
} }
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_inverse_less_to_more( pub fn compute_inverse_less_to_more(
&mut self, &mut self,
max_from: T, max_from: T,
first_indexes: &mut StorableVec<T, I>, first_indexes: &mut StoredVec<T, I>,
last_indexes: &mut StorableVec<T, I>, last_indexes: &mut StoredVec<T, I>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -196,18 +224,18 @@ where
let index = max_from.min(T::from(self.len())); let index = max_from.min(T::from(self.len()));
first_indexes.iter_from(index, |(value, first_index, ..)| { first_indexes.iter_from(index, |(value, first_index, ..)| {
let first_index = (first_index).to_usize()?; let first_index = (first_index).to_usize()?;
let last_index = (last_indexes.get(value)?.unwrap()).to_usize()?; let last_index = (last_indexes.cached_get(value)?.unwrap()).to_usize()?;
(first_index..last_index) (first_index..last_index)
.try_for_each(|index| self.forced_push_at(I::from(index), value, exit)) .try_for_each(|index| self.forced_push_at(I::from(index), value, exit))
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_last_index_from_first( pub fn compute_last_index_from_first(
&mut self, &mut self,
max_from: I, max_from: I,
first_indexes: &mut StorableVec<I, T>, first_indexes: &mut StoredVec<I, T>,
final_len: usize, final_len: usize,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
@@ -236,14 +264,14 @@ where
)?; )?;
} }
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_count_from_indexes<T2>( pub fn compute_count_from_indexes<T2>(
&mut self, &mut self,
max_from: I, max_from: I,
first_indexes: &mut StorableVec<I, T2>, first_indexes: &mut StoredVec<I, T2>,
last_indexes: &mut StorableVec<I, T2>, last_indexes: &mut StoredVec<I, T2>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -257,21 +285,21 @@ where
let index = max_from.min(I::from(self.len())); let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(i, first_index, ..)| { first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = last_indexes.get(i)?.unwrap(); let last_index = last_indexes.cached_get(i)?.unwrap();
let count = (*last_index + 1_usize) let count = (*last_index + 1_usize)
.checked_sub(*first_index) .checked_sub(first_index)
.unwrap_or_default(); .unwrap_or_default();
self.forced_push_at(i, count.into(), exit) self.forced_push_at(i, count.into(), exit)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_is_first_ordered<A>( pub fn compute_is_first_ordered<A>(
&mut self, &mut self,
max_from: I, max_from: I,
self_to_other: &mut StorableVec<I, A>, self_to_other: &mut StoredVec<I, A>,
other_to_self: &mut StorableVec<A, I>, other_to_self: &mut StoredVec<A, I>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -285,17 +313,21 @@ where
let index = max_from.min(I::from(self.len())); let index = max_from.min(I::from(self.len()));
self_to_other.iter_from(index, |(i, other, ..)| { self_to_other.iter_from(index, |(i, other, ..)| {
self.forced_push_at(i, T::from(other_to_self.get(*other)?.unwrap() == &i), exit) self.forced_push_at(
i,
T::from(other_to_self.cached_get(other)?.unwrap().into_inner() == i),
exit,
)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_sum_from_indexes<T2>( pub fn compute_sum_from_indexes<T2>(
&mut self, &mut self,
max_from: I, max_from: I,
first_indexes: &mut StorableVec<I, T2>, first_indexes: &mut StoredVec<I, T2>,
last_indexes: &mut StorableVec<I, T2>, last_indexes: &mut StoredVec<I, T2>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -309,12 +341,12 @@ where
let index = max_from.min(I::from(self.len())); let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(index, first_index, ..)| { first_indexes.iter_from(index, |(index, first_index, ..)| {
let last_index = last_indexes.get(index)?.unwrap(); let last_index = last_indexes.cached_get(index)?.unwrap();
let count = *last_index + 1_usize - *first_index; let count = *last_index + 1_usize - first_index;
self.forced_push_at(index, count.into(), exit) self.forced_push_at(index, count.into(), exit)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
} }
@@ -326,7 +358,7 @@ where
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
computed_version: self.computed_version, computed_version: self.computed_version,
vec: self.vec.clone(), inner: self.inner.clone(),
} }
} }
} }
@@ -4,7 +4,7 @@ use brk_core::{CheckedSub, StoredU32, StoredU64, StoredUsize, Timestamp, Weight}
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_parser::bitcoin; use brk_parser::bitcoin;
use brk_vec::{AnyStorableVec, Compressed, Version}; use brk_vec::{Compressed, Version};
use super::{ use super::{
Indexes, Indexes,
@@ -156,7 +156,7 @@ impl Vecs {
Ok(()) Ok(())
} }
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
[ [
self.indexes_to_block_interval.any_vecs(), self.indexes_to_block_interval.any_vecs(),
self.indexes_to_block_count.any_vecs(), self.indexes_to_block_count.any_vecs(),
@@ -1,7 +1,9 @@
use std::path::Path; use std::path::Path;
use brk_exit::Exit; use brk_exit::Exit;
use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, StoredIndex, StoredType, Version}; use brk_vec::{
Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec, Version,
};
use crate::storage::vecs::base::ComputedVec; use crate::storage::vecs::base::ComputedVec;
@@ -114,12 +116,7 @@ where
Ok(s) Ok(s)
} }
pub fn extend( pub fn extend(&mut self, max_from: I, source: &mut StoredVec<I, T>, exit: &Exit) -> Result<()> {
&mut self,
max_from: I,
source: &mut StorableVec<I, T>,
exit: &Exit,
) -> Result<()> {
if self.total.is_none() { if self.total.is_none() {
return Ok(()); return Ok(());
}; };
@@ -128,17 +125,16 @@ where
let total_vec = self.total.as_mut().unwrap(); let total_vec = self.total.as_mut().unwrap();
source.iter_from(index, |(i, v)| { source.iter_from(index, |(i, v, ..)| {
let prev = i let prev = i
.to_usize() .to_usize()
.unwrap() .unwrap()
.checked_sub(1) .checked_sub(1)
.map_or(T::from(0_usize), |prev_i| { .map_or(T::from(0_usize), |prev_i| {
total_vec total_vec
.get(I::from(prev_i)) .cached_get(I::from(prev_i))
.unwrap() .unwrap()
.unwrap_or(&T::from(0_usize)) .map_or(T::from(0_usize), |v| v.into_inner())
.to_owned()
}); });
let value = v.clone() + prev; let value = v.clone() + prev;
total_vec.forced_push_at(i, value, exit)?; total_vec.forced_push_at(i, value, exit)?;
@@ -154,9 +150,9 @@ where
pub fn compute<I2>( pub fn compute<I2>(
&mut self, &mut self,
max_from: I, max_from: I,
source: &mut StorableVec<I2, T>, source: &mut StoredVec<I2, T>,
first_indexes: &mut StorableVec<I, I2>, first_indexes: &mut StoredVec<I, I2>,
last_indexes: &mut StorableVec<I, I2>, last_indexes: &mut StoredVec<I, I2>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -166,23 +162,25 @@ where
{ {
let index = self.starting_index(max_from); let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index)| { first_indexes.iter_from(index, |(i, first_index, ..)| {
let first_index = *first_index; let last_index = *last_indexes.cached_get(i)?.unwrap();
let last_index = *last_indexes.get(i).unwrap().unwrap();
if let Some(first) = self.first.as_mut() { if let Some(first) = self.first.as_mut() {
let v = source.get(first_index).unwrap().unwrap(); let v = source.cached_get(first_index)?.unwrap().into_inner();
first.forced_push_at(index, v.clone(), exit)?; first.forced_push_at(index, v, exit)?;
} }
if let Some(last) = self.last.as_mut() { if let Some(last) = self.last.as_mut() {
let v = source.get(last_index).unwrap().unwrap(); let v = source
last.forced_push_at(index, v.clone(), exit)?; .cached_get(last_index)
.inspect_err(|_| {
dbg!(last.path(), last_index);
})?
.unwrap()
.into_inner();
last.forced_push_at(index, v, exit)?;
} }
let first_index = first_index.to_usize()?;
let last_index = last_index.to_usize()?;
let needs_sum_or_total = self.sum.is_some() || self.total.is_some(); let needs_sum_or_total = self.sum.is_some() || self.total.is_some();
let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some(); let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some();
let needs_sorted = self.max.is_some() let needs_sorted = self.max.is_some()
@@ -195,8 +193,7 @@ where
let needs_values = needs_sorted || needs_average_sum_or_total; let needs_values = needs_sorted || needs_average_sum_or_total;
if needs_values { if needs_values {
let mut values = let mut values = source.collect_inclusive_range(first_index, last_index)?;
source.collect_range(Some(first_index as i64), Some(last_index as i64))?;
if needs_sorted { if needs_sorted {
values.sort_unstable(); values.sort_unstable();
@@ -254,7 +251,12 @@ where
let prev = i.to_usize().unwrap().checked_sub(1).map_or( let prev = i.to_usize().unwrap().checked_sub(1).map_or(
T::from(0_usize), T::from(0_usize),
|prev_i| { |prev_i| {
total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned() total_vec
.cached_get(I::from(prev_i))
.unwrap()
.unwrap()
.to_owned()
.into_inner()
}, },
); );
total_vec.forced_push_at(i, prev + sum, exit)?; total_vec.forced_push_at(i, prev + sum, exit)?;
@@ -276,8 +278,8 @@ where
&mut self, &mut self,
max_from: I, max_from: I,
source: &mut ComputedVecBuilder<I2, T>, source: &mut ComputedVecBuilder<I2, T>,
first_indexes: &mut StorableVec<I, I2>, first_indexes: &mut StoredVec<I, I2>,
last_indexes: &mut StorableVec<I, I2>, last_indexes: &mut StoredVec<I, I2>,
exit: &Exit, exit: &Exit,
) -> Result<()> ) -> Result<()>
where where
@@ -296,19 +298,18 @@ where
let index = self.starting_index(max_from); let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index)| { first_indexes.iter_from(index, |(i, first_index, ..)| {
let first_index = *first_index; let last_index = *last_indexes.cached_get(i).unwrap().unwrap();
let last_index = *last_indexes.get(i).unwrap().unwrap();
if let Some(first) = self.first.as_mut() { if let Some(first) = self.first.as_mut() {
let v = source let v = source
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(first_index) .cached_get(first_index)
.unwrap() .unwrap()
.cloned() .unwrap()
.unwrap(); .into_inner();
first.forced_push_at(index, v, exit)?; first.forced_push_at(index, v, exit)?;
} }
@@ -317,16 +318,13 @@ where
.last .last
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(last_index) .cached_get(last_index)
.unwrap() .unwrap()
.cloned() .unwrap()
.unwrap(); .into_inner();
last.forced_push_at(index, v, exit)?; last.forced_push_at(index, v, exit)?;
} }
let first_index = Some(first_index.to_usize()? as i64);
let last_index = Some(last_index.to_usize()? as i64);
let needs_sum_or_total = self.sum.is_some() || self.total.is_some(); let needs_sum_or_total = self.sum.is_some() || self.total.is_some();
let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some(); let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some();
let needs_sorted = self.max.is_some() || self.min.is_some(); let needs_sorted = self.max.is_some() || self.min.is_some();
@@ -339,7 +337,7 @@ where
.max .max
.as_ref() .as_ref()
.unwrap() .unwrap()
.collect_range(first_index, last_index)?; .collect_inclusive_range(first_index, last_index)?;
values.sort_unstable(); values.sort_unstable();
max.forced_push_at(i, values.last().unwrap().clone(), exit)?; max.forced_push_at(i, values.last().unwrap().clone(), exit)?;
} }
@@ -349,7 +347,7 @@ where
.min .min
.as_ref() .as_ref()
.unwrap() .unwrap()
.collect_range(first_index, last_index)?; .collect_inclusive_range(first_index, last_index)?;
values.sort_unstable(); values.sort_unstable();
min.forced_push_at(i, values.first().unwrap().clone(), exit)?; min.forced_push_at(i, values.first().unwrap().clone(), exit)?;
} }
@@ -361,7 +359,7 @@ where
.average .average
.as_ref() .as_ref()
.unwrap() .unwrap()
.collect_range(first_index, last_index)?; .collect_inclusive_range(first_index, last_index)?;
let len = values.len() as f64; let len = values.len() as f64;
let total = values let total = values
.into_iter() .into_iter()
@@ -378,7 +376,7 @@ where
.sum .sum
.as_ref() .as_ref()
.unwrap() .unwrap()
.collect_range(first_index, last_index)?; .collect_inclusive_range(first_index, last_index)?;
let sum = values.into_iter().fold(T::from(0), |a, b| a + b); let sum = values.into_iter().fold(T::from(0), |a, b| a + b);
if let Some(sum_vec) = self.sum.as_mut() { if let Some(sum_vec) = self.sum.as_mut() {
@@ -389,7 +387,11 @@ where
let prev = i.to_usize().unwrap().checked_sub(1).map_or( let prev = i.to_usize().unwrap().checked_sub(1).map_or(
T::from(0_usize), T::from(0_usize),
|prev_i| { |prev_i| {
total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned() total_vec
.cached_get(I::from(prev_i))
.unwrap()
.unwrap()
.into_inner()
}, },
); );
total_vec.forced_push_at(i, prev + sum, exit)?; total_vec.forced_push_at(i, prev + sum, exit)?;
@@ -434,8 +436,8 @@ where
)) ))
} }
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
let mut v: Vec<&dyn AnyStorableVec> = vec![]; let mut v: Vec<&dyn brk_vec::AnyStoredVec> = vec![];
if let Some(first) = self.first.as_ref() { if let Some(first) = self.first.as_ref() {
v.push(first.any_vec()); v.push(first.any_vec());
@@ -3,7 +3,7 @@ use std::path::Path;
use brk_core::{Dateindex, Decadeindex, Monthindex, Quarterindex, Weekindex, Yearindex}; use brk_core::{Dateindex, Decadeindex, Monthindex, Quarterindex, Weekindex, Yearindex};
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, Version}; use brk_vec::{AnyStoredVec, Compressed, Result, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -126,7 +126,7 @@ where
Ok(()) Ok(())
} }
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[ [
vec![self.dateindex.any_vec()], vec![self.dateindex.any_vec()],
self.dateindex_extra.any_vecs(), self.dateindex_extra.any_vecs(),
@@ -5,7 +5,7 @@ use brk_core::{
}; };
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, Version}; use brk_vec::{AnyStoredVec, Compressed, Result, StoredVec, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -102,7 +102,7 @@ where
indexes: &mut indexes::Vecs, indexes: &mut indexes::Vecs,
starting_indexes: &Indexes, starting_indexes: &Indexes,
exit: &Exit, exit: &Exit,
height: Option<&mut StorableVec<Height, T>>, height: Option<&mut StoredVec<Height, T>>,
) -> color_eyre::Result<()> { ) -> color_eyre::Result<()> {
let height = height.unwrap_or_else(|| self.height.as_mut().unwrap().mut_vec()); let height = height.unwrap_or_else(|| self.height.as_mut().unwrap().mut_vec());
@@ -168,7 +168,7 @@ where
Ok(()) Ok(())
} }
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[ [
self.height.as_ref().map_or(vec![], |v| vec![v.any_vec()]), self.height.as_ref().map_or(vec![], |v| vec![v.any_vec()]),
self.height_extra.any_vecs(), self.height_extra.any_vecs(),
@@ -3,7 +3,7 @@ use std::path::Path;
use brk_core::{Difficultyepoch, Height}; use brk_core::{Difficultyepoch, Height};
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, Version}; use brk_vec::{AnyStoredVec, Compressed, Result, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -84,7 +84,7 @@ where
Ok(()) Ok(())
} }
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[ [
vec![self.height.any_vec()], vec![self.height.any_vec()],
self.height_extra.any_vecs(), self.height_extra.any_vecs(),
@@ -6,7 +6,7 @@ use brk_core::{
}; };
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, Version}; use brk_vec::{AnyStoredVec, Compressed, Result, StoredVec, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes}; use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -17,7 +17,7 @@ pub struct ComputedVecsFromTxindex<T>
where where
T: ComputedType + PartialOrd, T: ComputedType + PartialOrd,
{ {
pub txindex: ComputedVec<Txindex, T>, pub txindex: Option<ComputedVec<Txindex, T>>,
pub txindex_extra: ComputedVecBuilder<Txindex, T>, pub txindex_extra: ComputedVecBuilder<Txindex, T>,
pub height: ComputedVecBuilder<Height, T>, pub height: ComputedVecBuilder<Height, T>,
pub dateindex: ComputedVecBuilder<Dateindex, T>, pub dateindex: ComputedVecBuilder<Dateindex, T>,
@@ -38,15 +38,19 @@ where
pub fn forced_import( pub fn forced_import(
path: &Path, path: &Path,
name: &str, name: &str,
compute_source: bool,
version: Version, version: Version,
compressed: Compressed, compressed: Compressed,
options: StorableVecGeneatorOptions, options: StorableVecGeneatorOptions,
) -> color_eyre::Result<Self> { ) -> color_eyre::Result<Self> {
let txindex = ComputedVec::forced_import( let txindex = compute_source.then(|| {
&path.join(format!("txindex_to_{name}")), ComputedVec::forced_import(
version, &path.join(format!("txindex_to_{name}")),
compressed, version,
)?; compressed,
)
.unwrap()
});
let txindex_extra = ComputedVecBuilder::forced_import( let txindex_extra = ComputedVecBuilder::forced_import(
path, path,
@@ -75,7 +79,7 @@ where
}) })
} }
pub fn compute<F>( pub fn compute_all<F>(
&mut self, &mut self,
indexer: &mut Indexer, indexer: &mut Indexer,
indexes: &mut indexes::Vecs, indexes: &mut indexes::Vecs,
@@ -92,14 +96,35 @@ where
&Exit, &Exit,
) -> Result<()>, ) -> Result<()>,
{ {
compute(&mut self.txindex, indexer, indexes, starting_indexes, exit)?; compute(
self.txindex.as_mut().unwrap(),
indexer,
indexes,
starting_indexes,
exit,
)?;
self.compute_rest(indexer, indexes, starting_indexes, exit, None)?;
Ok(())
}
pub fn compute_rest(
&mut self,
indexer: &mut Indexer,
indexes: &mut indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
txindex: Option<&mut StoredVec<Txindex, T>>,
) -> color_eyre::Result<()> {
let txindex = txindex.unwrap_or_else(|| self.txindex.as_mut().unwrap().mut_vec());
self.txindex_extra self.txindex_extra
.extend(starting_indexes.txindex, self.txindex.mut_vec(), exit)?; .extend(starting_indexes.txindex, txindex, exit)?;
self.height.compute( self.height.compute(
starting_indexes.height, starting_indexes.height,
self.txindex.mut_vec(), txindex,
indexer.mut_vecs().height_to_first_txindex.mut_vec(), indexer.mut_vecs().height_to_first_txindex.mut_vec(),
indexes.height_to_last_txindex.mut_vec(), indexes.height_to_last_txindex.mut_vec(),
exit, exit,
@@ -164,9 +189,9 @@ where
Ok(()) Ok(())
} }
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[ [
vec![self.txindex.any_vec()], self.txindex.as_ref().map_or(vec![], |v| vec![v.any_vec()]),
self.txindex_extra.any_vecs(), self.txindex_extra.any_vecs(),
self.height.any_vecs(), self.height.any_vecs(),
self.dateindex.any_vecs(), self.dateindex.any_vecs(),
+57 -36
View File
@@ -6,7 +6,7 @@ use brk_core::{
}; };
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Version}; use brk_vec::{Compressed, Version};
use super::ComputedVec; use super::ComputedVec;
@@ -345,7 +345,7 @@ impl Vecs {
|(h, d, s, ..)| { |(h, d, s, ..)| {
let d = h let d = h
.decremented() .decremented()
.and_then(|h| s.get(h).ok()) .and_then(|h| s.cached_get(h).ok())
.flatten() .flatten()
.map_or(d, |prev_d| { .map_or(d, |prev_d| {
let prev_d = *prev_d; let prev_d = *prev_d;
@@ -365,9 +365,8 @@ impl Vecs {
let starting_dateindex = self let starting_dateindex = self
.height_to_dateindex .height_to_dateindex
.get(starting_indexes.height.decremented().unwrap_or_default())? .cached_get(starting_indexes.height.decremented().unwrap_or_default())?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.height_to_dateindex.compute_transform( self.height_to_dateindex.compute_transform(
starting_indexes.height, starting_indexes.height,
@@ -378,8 +377,8 @@ impl Vecs {
let starting_dateindex = if let Some(dateindex) = self let starting_dateindex = if let Some(dateindex) = self
.height_to_dateindex .height_to_dateindex
.get(starting_indexes.height.decremented().unwrap_or_default())? .cached_get(starting_indexes.height.decremented().unwrap_or_default())?
.copied() .map(|v| v.into_inner())
{ {
starting_dateindex.min(dateindex) starting_dateindex.min(dateindex)
} else { } else {
@@ -451,9 +450,8 @@ impl Vecs {
let starting_weekindex = self let starting_weekindex = self
.dateindex_to_weekindex .dateindex_to_weekindex
.get(starting_dateindex)? .cached_get(starting_dateindex)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.dateindex_to_weekindex.compute_transform( self.dateindex_to_weekindex.compute_transform(
starting_dateindex, starting_dateindex,
@@ -487,7 +485,12 @@ impl Vecs {
self.weekindex_to_timestamp.compute_transform( self.weekindex_to_timestamp.compute_transform(
starting_weekindex, starting_weekindex,
self.weekindex_to_first_dateindex.mut_vec(), self.weekindex_to_first_dateindex.mut_vec(),
|(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()), |(i, d, ..)| {
(
i,
*self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(),
)
},
exit, exit,
)?; )?;
@@ -495,9 +498,8 @@ impl Vecs {
let starting_monthindex = self let starting_monthindex = self
.dateindex_to_monthindex .dateindex_to_monthindex
.get(starting_dateindex)? .cached_get(starting_dateindex)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.dateindex_to_monthindex.compute_transform( self.dateindex_to_monthindex.compute_transform(
starting_dateindex, starting_dateindex,
@@ -533,7 +535,12 @@ impl Vecs {
self.monthindex_to_timestamp.compute_transform( self.monthindex_to_timestamp.compute_transform(
starting_monthindex, starting_monthindex,
self.monthindex_to_first_dateindex.mut_vec(), self.monthindex_to_first_dateindex.mut_vec(),
|(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()), |(i, d, ..)| {
(
i,
*self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(),
)
},
exit, exit,
)?; )?;
@@ -541,9 +548,8 @@ impl Vecs {
let starting_quarterindex = self let starting_quarterindex = self
.monthindex_to_quarterindex .monthindex_to_quarterindex
.get(starting_monthindex)? .cached_get(starting_monthindex)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.monthindex_to_quarterindex.compute_transform( self.monthindex_to_quarterindex.compute_transform(
starting_monthindex, starting_monthindex,
@@ -579,7 +585,12 @@ impl Vecs {
self.quarterindex_to_timestamp.compute_transform( self.quarterindex_to_timestamp.compute_transform(
starting_quarterindex, starting_quarterindex,
self.quarterindex_to_first_monthindex.mut_vec(), self.quarterindex_to_first_monthindex.mut_vec(),
|(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()), |(i, m, ..)| {
(
i,
*self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(),
)
},
exit, exit,
)?; )?;
@@ -587,9 +598,8 @@ impl Vecs {
let starting_yearindex = self let starting_yearindex = self
.monthindex_to_yearindex .monthindex_to_yearindex
.get(starting_monthindex)? .cached_get(starting_monthindex)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.monthindex_to_yearindex.compute_transform( self.monthindex_to_yearindex.compute_transform(
starting_monthindex, starting_monthindex,
@@ -625,7 +635,12 @@ impl Vecs {
self.yearindex_to_timestamp.compute_transform( self.yearindex_to_timestamp.compute_transform(
starting_yearindex, starting_yearindex,
self.yearindex_to_first_monthindex.mut_vec(), self.yearindex_to_first_monthindex.mut_vec(),
|(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()), |(i, m, ..)| {
(
i,
*self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(),
)
},
exit, exit,
)?; )?;
@@ -633,9 +648,8 @@ impl Vecs {
let starting_decadeindex = self let starting_decadeindex = self
.yearindex_to_decadeindex .yearindex_to_decadeindex
.get(starting_yearindex)? .cached_get(starting_yearindex)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.yearindex_to_decadeindex.compute_transform( self.yearindex_to_decadeindex.compute_transform(
starting_yearindex, starting_yearindex,
@@ -669,7 +683,12 @@ impl Vecs {
self.decadeindex_to_timestamp.compute_transform( self.decadeindex_to_timestamp.compute_transform(
starting_decadeindex, starting_decadeindex,
self.decadeindex_to_first_yearindex.mut_vec(), self.decadeindex_to_first_yearindex.mut_vec(),
|(i, y, ..)| (i, *self.yearindex_to_timestamp.get(y).unwrap().unwrap()), |(i, y, ..)| {
(
i,
*self.yearindex_to_timestamp.cached_get(y).unwrap().unwrap(),
)
},
exit, exit,
)?; )?;
@@ -677,9 +696,8 @@ impl Vecs {
let starting_difficultyepoch = self let starting_difficultyepoch = self
.height_to_difficultyepoch .height_to_difficultyepoch
.get(starting_indexes.height)? .cached_get(starting_indexes.height)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.height_to_difficultyepoch.compute_transform( self.height_to_difficultyepoch.compute_transform(
starting_indexes.height, starting_indexes.height,
@@ -716,7 +734,11 @@ impl Vecs {
|(i, h, ..)| { |(i, h, ..)| {
( (
i, i,
*indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(), *indexer_vecs
.height_to_timestamp
.cached_get(h)
.unwrap()
.unwrap(),
) )
}, },
exit, exit,
@@ -726,9 +748,8 @@ impl Vecs {
let starting_halvingepoch = self let starting_halvingepoch = self
.height_to_halvingepoch .height_to_halvingepoch
.get(starting_indexes.height)? .cached_get(starting_indexes.height)?
.copied() .map_or_else(Default::default, |v| v.into_inner());
.unwrap_or_default();
self.height_to_halvingepoch.compute_transform( self.height_to_halvingepoch.compute_transform(
starting_indexes.height, starting_indexes.height,
@@ -765,7 +786,7 @@ impl Vecs {
// |(i, h, ..)| { // |(i, h, ..)| {
// ( // (
// i, // i,
// *indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(), // *indexer_vecs.height_to_timestamp.cached_get(h).unwrap().unwrap(),
// ) // )
// }, // },
// exit, // exit,
@@ -784,7 +805,7 @@ impl Vecs {
}) })
} }
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
vec![ vec![
self.dateindex_to_date.any_vec(), self.dateindex_to_date.any_vec(),
self.dateindex_to_dateindex.any_vec(), self.dateindex_to_dateindex.any_vec(),
@@ -7,7 +7,7 @@ use brk_core::{
use brk_exit::Exit; use brk_exit::Exit;
use brk_fetcher::Fetcher; use brk_fetcher::Fetcher;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Version}; use brk_vec::{Compressed, Version};
use super::{ use super::{
ComputedVec, Indexes, ComputedVec, Indexes,
@@ -243,8 +243,9 @@ impl Vecs {
.get_height( .get_height(
h, h,
t, t,
h.decremented() h.decremented().map(|prev_h| {
.map(|prev_h| *height_to_timestamp.get(prev_h).unwrap().unwrap()), *height_to_timestamp.cached_get(prev_h).unwrap().unwrap()
}),
) )
.unwrap(); .unwrap();
(h, ohlc) (h, ohlc)
@@ -470,7 +471,7 @@ impl Vecs {
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
high: *self high: *self
@@ -479,7 +480,7 @@ impl Vecs {
.max .max
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
low: *self low: *self
@@ -488,7 +489,7 @@ impl Vecs {
.min .min
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
close, close,
@@ -516,7 +517,7 @@ impl Vecs {
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
high: *self high: *self
@@ -525,7 +526,7 @@ impl Vecs {
.max .max
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
low: *self low: *self
@@ -534,7 +535,7 @@ impl Vecs {
.min .min
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
close, close,
@@ -562,7 +563,7 @@ impl Vecs {
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
high: *self high: *self
@@ -571,7 +572,7 @@ impl Vecs {
.max .max
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
low: *self low: *self
@@ -580,7 +581,7 @@ impl Vecs {
.min .min
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
close, close,
@@ -608,7 +609,7 @@ impl Vecs {
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
high: *self high: *self
@@ -617,7 +618,7 @@ impl Vecs {
.max .max
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
low: *self low: *self
@@ -626,7 +627,7 @@ impl Vecs {
.min .min
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
close, close,
@@ -654,7 +655,7 @@ impl Vecs {
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
high: *self high: *self
@@ -663,7 +664,7 @@ impl Vecs {
.max .max
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
low: *self low: *self
@@ -672,7 +673,7 @@ impl Vecs {
.min .min
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
close, close,
@@ -704,7 +705,7 @@ impl Vecs {
.first .first
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
high: *self high: *self
@@ -713,7 +714,7 @@ impl Vecs {
.max .max
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
low: *self low: *self
@@ -722,7 +723,7 @@ impl Vecs {
.min .min
.as_mut() .as_mut()
.unwrap() .unwrap()
.get(i) .cached_get(i)
.unwrap() .unwrap()
.unwrap(), .unwrap(),
close, close,
@@ -765,7 +766,7 @@ impl Vecs {
Ok(()) Ok(())
} }
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
vec![ vec![
vec![ vec![
self.dateindex_to_close_in_cents.any_vec(), self.dateindex_to_close_in_cents.any_vec(),
+2 -2
View File
@@ -3,7 +3,7 @@ use std::{fs, path::Path};
use brk_exit::Exit; use brk_exit::Exit;
use brk_fetcher::Fetcher; use brk_fetcher::Fetcher;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed}; use brk_vec::{AnyStoredVec, Compressed};
mod base; mod base;
mod blocks; mod blocks;
@@ -63,7 +63,7 @@ impl Vecs {
Ok(()) Ok(())
} }
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[ [
self.indexes.as_any_vecs(), self.indexes.as_any_vecs(),
self.blocks.as_any_vecs(), self.blocks.as_any_vecs(),
@@ -1,13 +1,13 @@
use std::{fs, path::Path}; use std::{fs, path::Path};
use brk_core::{StoredU64, Txindex}; use brk_core::{Sats, StoredU8, StoredU32, StoredU64, Txindex, Txinindex, Txoutindex};
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Version}; use brk_vec::{Compressed, DynamicVec, Version};
use super::{ use super::{
ComputedVec, Indexes, ComputedVec, Indexes,
grouped::{ComputedVecsFromTxindex, StorableVecGeneatorOptions}, grouped::{ComputedVecsFromHeight, ComputedVecsFromTxindex, StorableVecGeneatorOptions},
indexes, indexes,
}; };
@@ -21,15 +21,24 @@ pub struct Vecs {
// pub height_to_outputcount: ComputedVec<Height, u32>, // pub height_to_outputcount: ComputedVec<Height, u32>,
// pub height_to_subsidy: ComputedVec<Height, u32>, // pub height_to_subsidy: ComputedVec<Height, u32>,
// pub height_to_totalfees: ComputedVec<Height, Sats>, // pub height_to_totalfees: ComputedVec<Height, Sats>,
// pub height_to_txcount: ComputedVec<Height, u32>,
// pub txindex_to_fee: ComputedVec<Txindex, Sats>, // pub txindex_to_fee: ComputedVec<Txindex, Sats>,
pub txindex_to_is_coinbase: ComputedVec<Txindex, bool>,
// pub txindex_to_feerate: ComputedVec<Txindex, Feerate>, // pub txindex_to_feerate: ComputedVec<Txindex, Feerate>,
pub txindex_to_input_count: ComputedVecsFromTxindex<StoredU64>,
// pub txindex_to_input_sum: ComputedVec<Txindex, Sats>, // pub txindex_to_input_sum: ComputedVec<Txindex, Sats>,
pub txindex_to_output_count: ComputedVecsFromTxindex<StoredU64>,
// pub txindex_to_output_sum: ComputedVec<Txindex, Sats>, // pub txindex_to_output_sum: ComputedVec<Txindex, Sats>,
// pub txindex_to_output_value: ComputedVecsFromTxindex<Sats>,
pub txindex_to_v1: ComputedVec<Txindex, StoredU8>,
pub txindex_to_v2: ComputedVec<Txindex, StoredU8>,
pub txindex_to_v3: ComputedVec<Txindex, StoredU8>,
pub indexes_to_tx_v1: ComputedVecsFromHeight<StoredU32>,
pub indexes_to_tx_v2: ComputedVecsFromHeight<StoredU32>,
pub indexes_to_tx_v3: ComputedVecsFromHeight<StoredU32>,
// pub txinindex_to_value: ComputedVec<Txinindex, Sats>, // pub txinindex_to_value: ComputedVec<Txinindex, Sats>,
pub height_to_tx_count: ComputedVecsFromHeight<StoredU64>,
pub txindex_to_input_count: ComputedVecsFromTxindex<StoredU64>,
pub txindex_to_is_coinbase: ComputedVec<Txindex, bool>,
pub txindex_to_output_count: ComputedVecsFromTxindex<StoredU64>,
/// Value == 0 when Coinbase
pub txinindex_to_value: ComputedVec<Txinindex, Sats>,
} }
impl Vecs { impl Vecs {
@@ -37,6 +46,14 @@ impl Vecs {
fs::create_dir_all(path)?; fs::create_dir_all(path)?;
Ok(Self { Ok(Self {
height_to_tx_count: ComputedVecsFromHeight::forced_import(
path,
"tx_count",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
// height_to_fee: StorableVec::forced_import(&path.join("height_to_fee"), Version::ZERO)?, // height_to_fee: StorableVec::forced_import(&path.join("height_to_fee"), Version::ZERO)?,
// height_to_input_count: StorableVec::forced_import( // height_to_input_count: StorableVec::forced_import(
// &path.join("height_to_input_count"), // &path.join("height_to_input_count"),
@@ -65,6 +82,7 @@ impl Vecs {
txindex_to_input_count: ComputedVecsFromTxindex::forced_import( txindex_to_input_count: ComputedVecsFromTxindex::forced_import(
path, path,
"input_count", "input_count",
true,
Version::ZERO, Version::ZERO,
compressed, compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(), StorableVecGeneatorOptions::default().add_sum().add_total(),
@@ -72,15 +90,62 @@ impl Vecs {
txindex_to_output_count: ComputedVecsFromTxindex::forced_import( txindex_to_output_count: ComputedVecsFromTxindex::forced_import(
path, path,
"output_count", "output_count",
true,
Version::ZERO, Version::ZERO,
compressed, compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(), StorableVecGeneatorOptions::default().add_sum().add_total(),
)?, )?,
// txinindex_to_value: StorableVec::forced_import( // txindex_to_output_value: ComputedVecsFromTxindex::forced_import(
// &path.join("txinindex_to_value"), // path,
// "output_value",
// Version::ZERO, // Version::ZERO,
// compressed, // compressed,
// StorableVecGeneatorOptions::default().add_sum().add_total(),
// )?, // )?,
txinindex_to_value: ComputedVec::forced_import(
&path.join("txinindex_to_value"),
Version::ZERO,
compressed,
)?,
txindex_to_v1: ComputedVec::forced_import(
&path.join("txindex_to_v1"),
Version::ZERO,
compressed,
)?,
txindex_to_v2: ComputedVec::forced_import(
&path.join("txindex_to_v2"),
Version::ZERO,
compressed,
)?,
txindex_to_v3: ComputedVec::forced_import(
&path.join("txindex_to_v3"),
Version::ZERO,
compressed,
)?,
indexes_to_tx_v1: ComputedVecsFromHeight::forced_import(
path,
"tx_v1",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
indexes_to_tx_v2: ComputedVecsFromHeight::forced_import(
path,
"tx_v2",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
indexes_to_tx_v3: ComputedVecsFromHeight::forced_import(
path,
"tx_v3",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
}) })
} }
@@ -91,7 +156,22 @@ impl Vecs {
starting_indexes: &Indexes, starting_indexes: &Indexes,
exit: &Exit, exit: &Exit,
) -> color_eyre::Result<()> { ) -> color_eyre::Result<()> {
self.txindex_to_input_count.compute( self.height_to_tx_count.compute_all(
indexer,
indexes,
starting_indexes,
exit,
|v, indexer, indexes, starting_indexes, exit| {
v.compute_count_from_indexes(
starting_indexes.height,
indexer.mut_vecs().height_to_first_txindex.mut_vec(),
indexes.height_to_last_txindex.mut_vec(),
exit,
)
},
)?;
self.txindex_to_input_count.compute_all(
indexer, indexer,
indexes, indexes,
starting_indexes, starting_indexes,
@@ -106,7 +186,7 @@ impl Vecs {
}, },
)?; )?;
self.txindex_to_output_count.compute( self.txindex_to_output_count.compute_all(
indexer, indexer,
indexes, indexes,
starting_indexes, starting_indexes,
@@ -121,6 +201,14 @@ impl Vecs {
}, },
)?; )?;
// self.txindex_to_output_value.compute_rest(
// indexer,
// indexes,
// starting_indexes,
// exit,
// Some(indexer.mut_vecs().txoutindex_to_value.mut_vec()),
// )?;
let indexer_vecs = indexer.mut_vecs(); let indexer_vecs = indexer.mut_vecs();
self.txindex_to_is_coinbase.compute_is_first_ordered( self.txindex_to_is_coinbase.compute_is_first_ordered(
@@ -130,23 +218,73 @@ impl Vecs {
exit, exit,
)?; )?;
// self.txinindex_to_value.compute_transform( self.txindex_to_v1.compute_transform(
// starting_indexes.txinindex, starting_indexes.txindex,
// indexer_vecs.txinindex_to_txoutindex.mut_vec(), indexer_vecs.txindex_to_txversion.mut_vec(),
// |(txinindex, txoutindex, slf, other)| { |(i, v, ..)| (i, StoredU8::from(v)),
// let value = exit,
// if let Ok(Some(value)) = indexer_vecs.txoutindex_to_value.read(txoutindex) { )?;
// *value // self.indexes_to_tx_v1.compute_all(
// } else { // indexer,
// dbg!(txinindex, txoutindex, slf.len(), other.len()); // indexes,
// panic!() // starting_indexes,
// }; // exit,
// (txinindex, value) // |vec, indexer, indexes, indexes, exit| {
// vec.compute_transform(
// starting_indexes.height,
// indexer.mut_vecs().txindex_to_txversion.mut_vec(),
// || {},
// exit,
// )?;
// }, // },
// )?;
self.txindex_to_v2.compute_transform(
starting_indexes.txindex,
indexer_vecs.txindex_to_txversion.mut_vec(),
|(i, v, ..)| (i, StoredU8::from(v)),
exit,
)?;
// self.indexes_to_tx_v1.compute_rest(
// starting_indexes.txindex,
// indexer_vecs.txindex_to_txversion.mut_vec(),
// |(i, v, ..)| (i, StoredU8::from(v)),
// exit,
// )?;
self.txindex_to_v3.compute_transform(
starting_indexes.txindex,
indexer_vecs.txindex_to_txversion.mut_vec(),
|(i, v, ..)| (i, StoredU8::from(v)),
exit,
)?;
// self.indexes_to_tx_v1.compute_rest(
// starting_indexes.txindex,
// indexer_vecs.txindex_to_txversion.mut_vec(),
// |(i, v, ..)| (i, StoredU8::from(v)),
// exit, // exit,
// )?; // )?;
// self.vecs.txindex_to_fee.compute_transform( self.txinindex_to_value.compute_transform(
starting_indexes.txinindex,
indexer_vecs.txinindex_to_txoutindex.mut_vec(),
|(txinindex, txoutindex, slf, other)| {
let value = if txoutindex == Txoutindex::COINBASE {
Sats::ZERO
} else if let Ok(Some(value)) = indexer_vecs
.txoutindex_to_value
.mut_vec()
.cached_get(txoutindex)
{
*value
} else {
dbg!(txinindex, txoutindex, slf.len(), other.len());
panic!()
};
(txinindex, value)
},
exit,
)?;
// self.txindex_to_fee.compute_transform(
// &mut self.vecs.txindex_to_height, // &mut self.vecs.txindex_to_height,
// &mut indexer.vecs().height_to_first_txindex, // &mut indexer.vecs().height_to_first_txindex,
// )?; // )?;
@@ -154,11 +292,21 @@ impl Vecs {
Ok(()) Ok(())
} }
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
[ [
vec![self.txindex_to_is_coinbase.any_vec()], vec![
self.txindex_to_is_coinbase.any_vec(),
self.txinindex_to_value.any_vec(),
self.txindex_to_v1.any_vec(),
self.txindex_to_v2.any_vec(),
self.txindex_to_v3.any_vec(),
],
self.height_to_tx_count.any_vecs(),
self.txindex_to_output_count.any_vecs(), self.txindex_to_output_count.any_vecs(),
self.txindex_to_input_count.any_vecs(), self.txindex_to_input_count.any_vecs(),
self.indexes_to_tx_v1.any_vecs(),
self.indexes_to_tx_v2.any_vecs(),
self.indexes_to_tx_v3.any_vecs(),
] ]
.concat() .concat()
} }
+2
View File
@@ -22,6 +22,7 @@ mod quarterindex;
mod sats; mod sats;
mod stored_u32; mod stored_u32;
mod stored_u64; mod stored_u64;
mod stored_u8;
mod stored_usize; mod stored_usize;
mod timestamp; mod timestamp;
mod txid; mod txid;
@@ -58,6 +59,7 @@ pub use monthindex::*;
pub use ohlc::*; pub use ohlc::*;
pub use quarterindex::*; pub use quarterindex::*;
pub use sats::*; pub use sats::*;
pub use stored_u8::*;
pub use stored_u32::*; pub use stored_u32::*;
pub use stored_u64::*; pub use stored_u64::*;
pub use stored_usize::*; pub use stored_usize::*;
+7 -1
View File
@@ -6,7 +6,7 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::CheckedSub; use crate::CheckedSub;
use super::{Txinindex, Txoutindex}; use super::{Txindex, Txinindex, Txoutindex};
#[derive( #[derive(
Debug, Debug,
@@ -80,6 +80,12 @@ impl From<StoredU64> for f64 {
} }
} }
impl From<Txindex> for StoredU64 {
fn from(value: Txindex) -> Self {
Self(*value as u64)
}
}
impl From<Txinindex> for StoredU64 { impl From<Txinindex> for StoredU64 {
fn from(value: Txinindex) -> Self { fn from(value: Txinindex) -> Self {
Self(*value) Self(*value)
+79
View File
@@ -0,0 +1,79 @@
use std::ops::{Add, Div};
use derive_deref::Deref;
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::CheckedSub;
#[derive(
Debug,
Deref,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
FromBytes,
Immutable,
IntoBytes,
KnownLayout,
Serialize,
)]
pub struct StoredU8(u8);
impl StoredU8 {
pub const ZERO: Self = Self(0);
pub fn new(counter: u8) -> Self {
Self(counter)
}
}
impl From<u8> for StoredU8 {
fn from(value: u8) -> Self {
Self(value)
}
}
impl From<usize> for StoredU8 {
fn from(value: usize) -> Self {
Self(value as u8)
}
}
impl CheckedSub<StoredU8> for StoredU8 {
fn checked_sub(self, rhs: Self) -> Option<Self> {
self.0.checked_sub(rhs.0).map(Self)
}
}
impl Div<usize> for StoredU8 {
type Output = Self;
fn div(self, rhs: usize) -> Self::Output {
Self(self.0 / rhs as u8)
}
}
impl Add for StoredU8 {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
Self(self.0 + rhs.0)
}
}
impl From<f64> for StoredU8 {
fn from(value: f64) -> Self {
if value < 0.0 || value > u32::MAX as f64 {
panic!()
}
Self(value as u8)
}
}
impl From<StoredU8> for f64 {
fn from(value: StoredU8) -> Self {
value.0 as f64
}
}
+8
View File
@@ -2,6 +2,8 @@ use derive_deref::Deref;
use serde::Serialize; use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use super::StoredU8;
#[derive(Debug, Deref, Clone, Copy, Immutable, IntoBytes, KnownLayout, FromBytes, Serialize)] #[derive(Debug, Deref, Clone, Copy, Immutable, IntoBytes, KnownLayout, FromBytes, Serialize)]
pub struct TxVersion(i32); pub struct TxVersion(i32);
@@ -16,3 +18,9 @@ impl From<TxVersion> for bitcoin::transaction::Version {
Self(value.0) Self(value.0)
} }
} }
impl From<TxVersion> for StoredU8 {
fn from(value: TxVersion) -> Self {
Self::from(value.0 as u8)
}
}
+9 -5
View File
@@ -58,9 +58,13 @@ Stores: `src/storage/stores/mod.rs`
## Benchmark ## Benchmark
Indexing `0..885_835` took `11 hours 6 min 50 s` on a Macbook Pro M3 Pro with 36 GB of RAM ### Result 1 - 2025-04-10
`footprint` report: - version: `v0.0.20`
- Peak memory: `5115 MB` - machine: `Macbook Pro M3 Pro (36GB RAM)`
- Memory while waiting for a new block: `890 MB` - mode: `raw`
- Reclaimable memory: `6478 MB` - from: `0`
- to: `891_810`
- time: `8 hours 27 min 3s`
- peak memory: `6.5GB`
- disk usage: `270 GB`
+5 -1
View File
@@ -1,4 +1,4 @@
use std::path::Path; use std::{path::Path, time::Instant};
use brk_core::default_bitcoin_path; use brk_core::default_bitcoin_path;
use brk_exit::Exit; use brk_exit::Exit;
@@ -8,6 +8,8 @@ use brk_parser::{Parser, rpc};
fn main() -> color_eyre::Result<()> { fn main() -> color_eyre::Result<()> {
color_eyre::install()?; color_eyre::install()?;
let i = Instant::now();
brk_logger::init(Some(Path::new(".log"))); brk_logger::init(Some(Path::new(".log")));
let bitcoin_dir = default_bitcoin_path(); let bitcoin_dir = default_bitcoin_path();
@@ -29,5 +31,7 @@ fn main() -> color_eyre::Result<()> {
indexer.index(&parser, rpc, &exit)?; indexer.index(&parser, rpc, &exit)?;
dbg!(i.elapsed());
Ok(()) Ok(())
} }
+36 -86
View File
@@ -1,21 +1,24 @@
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
fmt::Debug, fmt::Debug,
io,
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use brk_vec::{ use brk_vec::{
AnyStorableVec, Compressed, Error, MAX_CACHE_SIZE, MAX_PAGE_SIZE, Result, StorableVec, Compressed, DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, StoredVec, Value,
StoredIndex, StoredType, Value, Version, Version,
}; };
use super::Height; use super::Height;
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct IndexedVec<I, T> { pub struct IndexedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
height: Option<Height>, height: Option<Height>,
vec: StorableVec<I, T>, inner: StoredVec<I, T>,
} }
impl<I, T> IndexedVec<I, T> impl<I, T> IndexedVec<I, T>
@@ -23,91 +26,51 @@ where
I: StoredIndex, I: StoredIndex,
T: StoredType, T: StoredType,
{ {
pub const SIZE_OF_T: usize = size_of::<T>();
pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T;
pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE;
pub fn forced_import( pub fn forced_import(
path: &Path, path: &Path,
version: Version, version: Version,
compressed: Compressed, compressed: Compressed,
) -> brk_vec::Result<Self> { ) -> brk_vec::Result<Self> {
let mut vec = StorableVec::forced_import(path, version, compressed)?; let mut inner = StoredVec::forced_import(path, version, compressed)?;
vec.enable_large_cache(); inner.enable_large_cache_if_needed();
Ok(Self { Ok(Self {
height: Height::try_from(Self::path_height_(path).as_path()).ok(), height: Height::try_from(Self::path_height_(path).as_path()).ok(),
vec, inner,
}) })
} }
#[inline] #[inline]
pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> { pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
self.get_(index.to_usize()?) self.inner.get(index)
} }
fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> { #[inline]
match self.vec.index_to_pushed_index(index) { pub fn cached_get(&mut self, index: I) -> Result<Option<Value<'_, T>>> {
Ok(index) => { self.inner.cached_get(index)
if let Some(index) = index {
return Ok(self.vec.pushed().get(index).map(|v| Value::Ref(v)));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let large_cache_len = self.vec.large_cache_len();
if large_cache_len != 0 {
let page_index = Self::index_to_page_index(index);
let last_index = self.vec.stored_len() - 1;
let max_page_index = Self::index_to_page_index(last_index);
let min_page_index = (max_page_index + 1) - large_cache_len;
if page_index >= min_page_index {
let values = self
.vec
.pages()
.unwrap()
.get(page_index - min_page_index)
.ok_or(Error::MmapsVecIsTooSmall)?
.get_or_init(|| self.vec.decode_page(page_index).unwrap());
return Ok(values.get(index)?.map(|v| Value::Ref(v)));
}
}
Ok(self.vec.read_(index)?.map(|v| Value::Owned(v)))
} }
pub fn iter_from<F>(&mut self, index: I, f: F) -> Result<()> pub fn iter_from<F>(&mut self, index: I, f: F) -> Result<()>
where where
F: FnMut((I, &T)) -> Result<()>, F: FnMut((I, T, &mut dyn DynamicVec<I = I, T = T>)) -> Result<()>,
{ {
self.vec.iter_from(index, f) self.inner.iter_from(index, f)
}
#[inline(always)]
fn index_to_page_index(index: usize) -> usize {
index / Self::PER_PAGE
} }
#[inline] #[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> { pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
match self.vec.len().cmp(&index.to_usize()?) { match self.inner.len().cmp(&index.to_usize()?) {
Ordering::Greater => { Ordering::Greater => {
// dbg!(len, index, &self.pathbuf); // dbg!(len, index, &self.pathbuf);
// panic!(); // panic!();
Ok(()) Ok(())
} }
Ordering::Equal => { Ordering::Equal => {
self.vec.push(value); self.inner.push(value);
Ok(()) Ok(())
} }
Ordering::Less => { Ordering::Less => {
dbg!(index, value, self.vec.len(), self.path_height()); dbg!(index, value, self.inner.len(), self.path_height());
Err(Error::IndexTooHigh) Err(Error::IndexTooHigh)
} }
} }
@@ -117,67 +80,54 @@ where
if self.height.is_none_or(|self_height| self_height != height) { if self.height.is_none_or(|self_height| self_height != height) {
height.write(&self.path_height())?; height.write(&self.path_height())?;
} }
self.vec.truncate_if_needed(index)?; self.inner.truncate_if_needed(index)?;
Ok(()) Ok(())
} }
pub fn flush(&mut self, height: Height) -> io::Result<()> { pub fn flush(&mut self, height: Height) -> Result<()> {
height.write(&self.path_height())?; height.write(&self.path_height())?;
self.vec.flush() self.inner.flush()
} }
pub fn vec(&self) -> &StorableVec<I, T> { pub fn vec(&self) -> &StoredVec<I, T> {
&self.vec &self.inner
} }
pub fn mut_vec(&mut self) -> &mut StorableVec<I, T> { pub fn mut_vec(&mut self) -> &mut StoredVec<I, T> {
&mut self.vec &mut self.inner
} }
pub fn any_vec(&self) -> &dyn AnyStorableVec { pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec {
&self.vec &self.inner
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.vec.len() self.inner.len()
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.vec.is_empty() self.inner.is_empty()
} }
#[inline] #[inline]
pub fn hasnt(&self, index: I) -> Result<bool> { pub fn hasnt(&self, index: I) -> Result<bool> {
self.vec.has(index).map(|b| !b) self.inner.has(index).map(|b| !b)
} }
pub fn height(&self) -> brk_core::Result<Height> { pub fn height(&self) -> brk_core::Result<Height> {
Height::try_from(self.path_height().as_path()) Height::try_from(self.path_height().as_path())
} }
fn path_height(&self) -> PathBuf { fn path_height(&self) -> PathBuf {
Self::path_height_(self.vec.path()) Self::path_height_(self.inner.path())
} }
fn path_height_(path: &Path) -> PathBuf { fn path_height_(path: &Path) -> PathBuf {
path.join("height") path.join("height")
} }
} }
impl<I, T> Clone for IndexedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn clone(&self) -> Self {
Self {
height: self.height,
vec: self.vec.clone(),
}
}
}
pub trait AnyIndexedVec: Send + Sync { pub trait AnyIndexedVec: Send + Sync {
fn height(&self) -> brk_core::Result<Height>; fn height(&self) -> brk_core::Result<Height>;
fn flush(&mut self, height: Height) -> io::Result<()>; fn flush(&mut self, height: Height) -> Result<()>;
} }
impl<I, T> AnyIndexedVec for IndexedVec<I, T> impl<I, T> AnyIndexedVec for IndexedVec<I, T>
@@ -189,7 +139,7 @@ where
self.height() self.height()
} }
fn flush(&mut self, height: Height) -> io::Result<()> { fn flush(&mut self, height: Height) -> Result<()> {
self.flush(height) self.flush(height)
} }
} }
+4 -5
View File
@@ -1,4 +1,4 @@
use std::{fs, io, path::Path}; use std::{fs, path::Path};
use brk_core::{ use brk_core::{
Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height, Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height,
@@ -7,7 +7,7 @@ use brk_core::{
P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats, P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats,
StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight, StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight,
}; };
use brk_vec::{AnyStorableVec, Compressed, Version}; use brk_vec::{AnyStoredVec, Compressed, Result, Version};
use rayon::prelude::*; use rayon::prelude::*;
use crate::Indexes; use crate::Indexes;
@@ -374,7 +374,6 @@ impl Vecs {
pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> brk_vec::Result<()> { pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> brk_vec::Result<()> {
let saved_height = starting_indexes.height.decremented().unwrap_or_default(); let saved_height = starting_indexes.height.decremented().unwrap_or_default();
// Now we can cut everything that's out of date
let &Indexes { let &Indexes {
addressindex, addressindex,
height, height,
@@ -595,7 +594,7 @@ impl Vecs {
} }
} }
pub fn flush(&mut self, height: Height) -> io::Result<()> { pub fn flush(&mut self, height: Height) -> Result<()> {
self.as_mut_any_vecs() self.as_mut_any_vecs()
.into_par_iter() .into_par_iter()
.try_for_each(|vec| vec.flush(height)) .try_for_each(|vec| vec.flush(height))
@@ -609,7 +608,7 @@ impl Vecs {
.unwrap() .unwrap()
} }
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> { pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
vec![ vec![
self.addressindex_to_addresstype.any_vec(), self.addressindex_to_addresstype.any_vec(),
self.addressindex_to_addresstypeindex.any_vec(), self.addressindex_to_addresstypeindex.any_vec(),
+4 -4
View File
@@ -5,7 +5,7 @@
use brk_computer::Computer; use brk_computer::Computer;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::AnyStorableVec; use brk_vec::AnyStoredVec;
use tabled::settings::Style; use tabled::settings::Style;
mod format; mod format;
@@ -51,7 +51,7 @@ impl<'a> Query<'a> {
} }
} }
pub fn search(&self, index: Index, ids: &[&str]) -> Vec<(String, &&dyn AnyStorableVec)> { pub fn search(&self, index: Index, ids: &[&str]) -> Vec<(String, &&dyn AnyStoredVec)> {
let tuples = ids let tuples = ids
.iter() .iter()
.flat_map(|s| { .flat_map(|s| {
@@ -86,7 +86,7 @@ impl<'a> Query<'a> {
pub fn format( pub fn format(
&self, &self,
vecs: Vec<(String, &&dyn AnyStorableVec)>, vecs: Vec<(String, &&dyn AnyStoredVec)>,
from: Option<i64>, from: Option<i64>,
to: Option<i64>, to: Option<i64>,
format: Option<Format>, format: Option<Format>,
@@ -94,7 +94,7 @@ impl<'a> Query<'a> {
let mut values = vecs let mut values = vecs
.iter() .iter()
.map(|(_, vec)| -> brk_vec::Result<Vec<serde_json::Value>> { .map(|(_, vec)| -> brk_vec::Result<Vec<serde_json::Value>> {
vec.collect_range_values(from, to) vec.collect_range_serde_json(from, to)
}) })
.collect::<brk_vec::Result<Vec<_>>>()?; .collect::<brk_vec::Result<Vec<_>>>()?;
+4 -4
View File
@@ -1,6 +1,6 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use brk_vec::AnyStorableVec; use brk_vec::AnyStoredVec;
use derive_deref::{Deref, DerefMut}; use derive_deref::{Deref, DerefMut};
use super::index::Index; use super::index::Index;
@@ -13,7 +13,7 @@ pub struct VecTrees<'a> {
impl<'a> VecTrees<'a> { impl<'a> VecTrees<'a> {
// Not the most performant or type safe but only built once so that's okay // Not the most performant or type safe but only built once so that's okay
pub fn insert(&mut self, vec: &'a dyn AnyStorableVec) { pub fn insert(&mut self, vec: &'a dyn AnyStoredVec) {
let file_name = vec.file_name(); let file_name = vec.file_name();
let split = file_name.split("_to_").collect::<Vec<_>>(); let split = file_name.split("_to_").collect::<Vec<_>>();
if split.len() != 2 { if split.len() != 2 {
@@ -88,7 +88,7 @@ impl<'a> VecTrees<'a> {
} }
#[derive(Default, Deref, DerefMut)] #[derive(Default, Deref, DerefMut)]
pub struct IndexToVec<'a>(BTreeMap<Index, &'a dyn AnyStorableVec>); pub struct IndexToVec<'a>(BTreeMap<Index, &'a dyn AnyStoredVec>);
#[derive(Default, Deref, DerefMut)] #[derive(Default, Deref, DerefMut)]
pub struct IdToVec<'a>(BTreeMap<String, &'a dyn AnyStorableVec>); pub struct IdToVec<'a>(BTreeMap<String, &'a dyn AnyStoredVec>);
+1 -1
View File
@@ -7,7 +7,7 @@ license.workspace = true
repository.workspace = true repository.workspace = true
[dependencies] [dependencies]
axum = "0.8.3" axum = { workspace = true }
brk_computer = { workspace = true } brk_computer = { workspace = true }
brk_exit = { workspace = true } brk_exit = { workspace = true }
brk_fetcher = { workspace = true } brk_fetcher = { workspace = true }
+3 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "brk_vec" name = "brk_vec"
description = "A very small, fast, efficient and simple storable Vec" description = "A push-only, truncable, compressable, saveable Vec"
keywords = ["vec", "disk", "data"] keywords = ["vec", "disk", "data"]
categories = ["database"] categories = ["database"]
version.workspace = true version.workspace = true
@@ -9,6 +9,8 @@ license.workspace = true
repository.workspace = true repository.workspace = true
[dependencies] [dependencies]
axum = { workspace = true }
arc-swap = "1.7.1"
memmap2 = "0.9.5" memmap2 = "0.9.5"
rayon = { workspace = true } rayon = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
-16
View File
@@ -39,19 +39,3 @@ A `Vec` (an array) that is stored on disk and thus which can be much larger than
Compared to a key/value store, the data stored is raw byte interpretation of the Vec's values without any overhead which is very efficient. Additionally it uses close to no RAM when caching isn't active and up to 100 MB when it is. Compared to a key/value store, the data stored is raw byte interpretation of the Vec's values without any overhead which is very efficient. Additionally it uses close to no RAM when caching isn't active and up to 100 MB when it is.
Compression is also available and built on top [`zstd`](https://crates.io/crates/zstd) to save even more space (from 0 to 75%). The tradeoff being slower reading speeds, especially random reading speeds. This is due to the data being stored in compressed pages of 16 KB, which means that if you to read even one value in that page you have to uncompress the whole page. Compression is also available and built on top [`zstd`](https://crates.io/crates/zstd) to save even more space (from 0 to 75%). The tradeoff being slower reading speeds, especially random reading speeds. This is due to the data being stored in compressed pages of 16 KB, which means that if you to read even one value in that page you have to uncompress the whole page.
## Disclaimer
Portability will depend on the type of values.
Non bytes/slices types (`u8`, `u16`, ...) will be read as slice in an unsafe manner (using `std::slice::from_raw_parts`) and thus have the endianness of the system. On the other hand, `&[u8]` should be inserted as is.
If portability is important to you, just create a wrapper struct which has custom `get`, `push`, ... methods and does something like:
```rust
impl StorableVecU64 {
pub fn push(&mut self, value: u64) {
self.push(&value.to_be_bytes())
}
}
```
+14 -11
View File
@@ -1,13 +1,16 @@
use std::{fs, path::Path}; use std::{fs, path::Path};
use brk_vec::{Compressed, StorableVec, Version}; use brk_vec::{Compressed, DynamicVec, GenericVec, StoredVec, Version};
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = fs::remove_dir_all("./vec"); let _ = fs::remove_dir_all("./vec");
let version = Version::ZERO;
let compressed = Compressed::YES;
{ {
let mut vec: StorableVec<usize, u32> = let mut vec: StoredVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?; StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
(0..21_u32).for_each(|v| { (0..21_u32).for_each(|v| {
vec.push(v); vec.push(v);
@@ -20,8 +23,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
{ {
let mut vec: StorableVec<usize, u32> = let mut vec: StoredVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?; StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
dbg!(vec.get(0)?); dbg!(vec.get(0)?);
dbg!(vec.get(0)?); dbg!(vec.get(0)?);
@@ -42,10 +45,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
{ {
let mut vec: StorableVec<usize, u32> = let mut vec: StoredVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?; StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
vec.enable_large_cache(); vec.enable_large_cache_if_needed();
dbg!(vec.get(0)?); dbg!(vec.get(0)?);
dbg!(vec.get(20)?); dbg!(vec.get(20)?);
@@ -58,17 +61,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
dbg!(vec.get(5)?); dbg!(vec.get(5)?);
dbg!(vec.get(20)?); dbg!(vec.get(20)?);
vec.iter(|(_, v)| { vec.iter(|(_, v, ..)| {
dbg!(v); dbg!(v);
Ok(()) Ok(())
})?; })?;
vec.iter_from(5, |(_, v)| { vec.iter_from(5, |(_, v, ..)| {
dbg!(v); dbg!(v);
Ok(()) Ok(())
})?; })?;
dbg!(vec.collect_range(Some(-5), None)?); dbg!(vec.collect_signed_range(Some(-5), None)?);
} }
Ok(()) Ok(())
+10
View File
@@ -15,6 +15,7 @@ pub enum Error {
IO(io::Error), IO(io::Error),
ZeroCopyError, ZeroCopyError,
IndexTooHigh, IndexTooHigh,
EmptyVec,
IndexTooLow, IndexTooLow,
ExpectFileToHaveIndex, ExpectFileToHaveIndex,
ExpectVecToHaveIndex, ExpectVecToHaveIndex,
@@ -22,6 +23,7 @@ pub enum Error {
UnsupportedUnflushedState, UnsupportedUnflushedState,
RangeFromAfterTo(usize, usize), RangeFromAfterTo(usize, usize),
DifferentCompressionMode, DifferentCompressionMode,
ToSerdeJsonValueError(serde_json::Error),
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
@@ -42,6 +44,12 @@ impl<A, B> From<zerocopy::error::SizeError<A, B>> for Error {
} }
} }
impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Self {
Self::ToSerdeJsonValueError(error)
}
}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
@@ -68,6 +76,8 @@ impl fmt::Display for Error {
Error::ZeroCopyError => write!(f, "Zero copy convert error"), Error::ZeroCopyError => write!(f, "Zero copy convert error"),
Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"), Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"),
Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"), Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"),
Error::EmptyVec => write!(f, "The Vec is empty, maybe wait for a bit"),
Error::ToSerdeJsonValueError(error) => Debug::fmt(&error, f),
} }
} }
} }
-2
View File
@@ -1,7 +1,5 @@
mod error; mod error;
mod value; mod value;
mod values;
pub use error::*; pub use error::*;
pub use value::*; pub use value::*;
pub use values::*;
-83
View File
@@ -1,83 +0,0 @@
use std::ops::Range;
use memmap2::Mmap;
use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
use crate::MAX_PAGE_SIZE;
use super::Result;
#[derive(Debug)]
pub enum Values<T> {
Owned(Box<[T]>),
Ref(Box<Mmap>),
}
impl<T> Values<T> {
const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
const SIZE_OF_T: usize = size_of::<T>();
pub fn get(&self, index: usize) -> Result<Option<&T>>
where
T: TryFromBytes + IntoBytes + Immutable + KnownLayout,
{
let index = Self::index_to_decoded_index(index);
Ok(match self {
Self::Owned(a) => a.get(index),
Self::Ref(m) => {
let range = Self::index_to_byte_range(index);
let source = &m[range];
Some(T::try_ref_from_bytes(source)?)
}
})
}
pub fn as_arr(&self) -> &[T] {
match self {
Self::Owned(a) => a,
Self::Ref(_) => unreachable!(),
}
}
pub fn as_mmap(&self) -> &Mmap {
match self {
Self::Owned(_) => unreachable!(),
Self::Ref(m) => m,
}
}
#[inline]
fn index_to_byte_range(index: usize) -> Range<usize> {
let index = Self::index_to_byte_index(index) as usize;
index..(index + Self::SIZE_OF_T)
}
#[inline]
fn index_to_byte_index(index: usize) -> u64 {
(index * Self::SIZE_OF_T) as u64
}
#[inline(always)]
fn index_to_decoded_index(index: usize) -> usize {
index % Self::PER_PAGE
}
}
impl<T> From<Box<[T]>> for Values<T> {
fn from(value: Box<[T]>) -> Self {
Self::Owned(value)
}
}
impl<T> From<Mmap> for Values<T> {
fn from(value: Mmap) -> Self {
Self::Ref(Box::new(value))
}
}
impl<T> Default for Values<T> {
fn default() -> Self {
Self::Owned(vec![].into_boxed_slice())
}
}
+198 -820
View File
File diff suppressed because it is too large Load Diff
@@ -22,8 +22,10 @@ impl CompressedPagesMetadata {
const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>(); const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>();
pub fn read(path: &Path) -> Result<CompressedPagesMetadata> { pub fn read(path: &Path) -> Result<CompressedPagesMetadata> {
let path = path.join("pages_meta");
let slf = Self { let slf = Self {
vec: fs::read(path) vec: fs::read(&path)
.unwrap_or_default() .unwrap_or_default()
.chunks(Self::PAGE_SIZE) .chunks(Self::PAGE_SIZE)
.map(|bytes| { .map(|bytes| {
-61
View File
@@ -1,61 +0,0 @@
use std::{io, path::PathBuf};
use crate::{Result, StorableVec};
use super::{StoredIndex, StoredType};
pub trait AnyStorableVec: Send + Sync {
fn file_name(&self) -> String;
fn index_type_to_string(&self) -> &str;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn collect_range_values(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>>;
fn flush(&mut self) -> io::Result<()>;
fn path_vec(&self) -> PathBuf;
}
impl<I, T> AnyStorableVec for StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn file_name(&self) -> String {
self.file_name()
}
fn index_type_to_string(&self) -> &str {
self.index_type_to_string()
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.is_empty()
}
fn flush(&mut self) -> io::Result<()> {
self.flush()
}
fn collect_range_values(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>> {
Ok(self
.collect_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).unwrap())
.collect::<Vec<_>>())
}
fn path_vec(&self) -> PathBuf {
self.base().path_vec()
}
}
+116
View File
@@ -0,0 +1,116 @@
use std::sync::Arc;
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use crate::{Error, Result, Value};
use super::{StoredIndex, StoredType};
pub trait DynamicVec: Send + Sync {
type I: StoredIndex;
type T: StoredType;
#[inline]
fn get(&self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
}
#[inline]
fn cached_get(&mut self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
}
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
Ok(self
.get_stored_(index.to_usize()?, self.guard().as_ref().unwrap())?
.map(Value::Owned))
}
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
fn get_last(&self) -> Result<Option<Value<Self::T>>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
self.get_(len - 1)
}
#[inline]
fn cached_get_(&mut self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let mmap = Arc::clone(self.guard().as_ref().unwrap());
Ok(self
.cached_get_stored_(index.to_usize()?, &mmap)?
.map(Value::Owned))
}
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
fn cached_get_last(&mut self) -> Result<Option<Value<Self::T>>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
self.cached_get_(len - 1)
}
#[inline]
fn len(&self) -> usize {
self.stored_len() + self.pushed_len()
}
#[inline]
fn is_empty(&self) -> bool {
self.len() == 0
}
fn mmap(&self) -> &ArcSwap<Mmap>;
fn guard(&self) -> &Option<Guard<Arc<Mmap>>>;
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>>;
fn stored_len(&self) -> usize;
fn pushed(&self) -> &[Self::T];
#[inline]
fn pushed_len(&self) -> usize {
self.pushed().len()
}
fn mut_pushed(&mut self) -> &mut Vec<Self::T>;
#[inline]
fn push(&mut self, value: Self::T) {
self.mut_pushed().push(value)
}
#[inline]
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
let stored_len = self.stored_len();
if index >= stored_len {
let index = index - stored_len;
if index >= self.pushed_len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Err(Error::IndexTooLow)
}
}
}
+209
View File
@@ -0,0 +1,209 @@
use std::{
fs::{File, OpenOptions},
io::{self, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::Arc,
};
use axum::{
Json,
response::{IntoResponse, Response},
};
use memmap2::Mmap;
use serde_json::Value;
use crate::{Error, Result, Version};
use super::{DynamicVec, StoredIndex, StoredType};
pub trait GenericVec<I, T>: DynamicVec<I = I, T = T>
where
I: StoredIndex,
T: StoredType,
{
const SIZE_OF_T: usize = size_of::<Self::T>();
fn open_file(&self) -> io::Result<File> {
Self::open_file_(&self.path_vec())
}
fn open_file_(path: &Path) -> io::Result<File> {
OpenOptions::new()
.read(true)
.create(true)
.truncate(false)
.append(true)
.open(path)
}
fn file_set_len(&mut self, len: u64) -> Result<()> {
let mut file = self.open_file()?;
Self::file_set_len_(&mut file, len)?;
self.update_mmap(file)
}
fn file_set_len_(file: &mut File, len: u64) -> Result<()> {
file.set_len(len)?;
file.seek(SeekFrom::End(0))?;
Ok(())
}
fn file_write_all(&mut self, buf: &[u8]) -> Result<()> {
let mut file = self.open_file()?;
file.write_all(buf)?;
self.update_mmap(file)
}
fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> {
let mut file = self.open_file()?;
Self::file_set_len_(&mut file, len)?;
file.write_all(buf)?;
self.update_mmap(file)
}
#[inline]
fn reset(&mut self) -> Result<()> {
self.file_write_all(&[])?;
Ok(())
}
fn new_mmap(file: File) -> Result<Arc<Mmap>> {
Ok(Arc::new(unsafe { Mmap::map(&file)? }))
}
fn update_mmap(&mut self, file: File) -> Result<()> {
let mmap = Self::new_mmap(file)?;
self.mmap().store(mmap);
if self.guard().is_some() {
let guard = self.mmap().load();
self.mut_guard().replace(guard);
} else {
unreachable!("This function shouldn't be called in a cloned instance")
}
Ok(())
}
#[inline]
fn is_pushed_empty(&self) -> bool {
self.pushed_len() == 0
}
#[inline]
fn has(&self, index: Self::I) -> Result<bool> {
Ok(self.has_(index.to_usize()?))
}
#[inline]
fn has_(&self, index: usize) -> bool {
index < self.len()
}
#[inline]
fn index_type_to_string(&self) -> &str {
Self::I::to_string()
}
#[inline]
fn iter<F>(&mut self, f: F) -> Result<()>
where
F: FnMut(
(
Self::I,
Self::T,
&mut dyn DynamicVec<I = Self::I, T = Self::T>,
),
) -> Result<()>,
{
self.iter_from(Self::I::default(), f)
}
fn iter_from<F>(&mut self, index: Self::I, f: F) -> Result<()>
where
F: FnMut(
(
Self::I,
Self::T,
&mut dyn DynamicVec<I = Self::I, T = Self::T>,
),
) -> Result<()>;
fn flush(&mut self) -> Result<()>;
fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>;
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>>;
#[inline]
fn collect_inclusive_range(&self, from: I, to: I) -> Result<Vec<Self::T>> {
self.collect_range(Some(from.to_usize()?), Some(to.to_usize()? + 1))
}
#[inline]
fn i64_to_usize(i: i64, len: usize) -> usize {
if i >= 0 {
i as usize
} else {
let v = len as i64 + i;
if v < 0 { 0 } else { v as usize }
}
}
fn collect_signed_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
let len = self.len();
let from = from.map(|i| Self::i64_to_usize(i, len));
let to = to.map(|i| Self::i64_to_usize(i, len));
self.collect_range(from, to)
}
#[inline]
fn collect_range_axum_json(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Json<Vec<Self::T>>> {
Ok(Json(self.collect_signed_range(from, to)?))
}
#[inline]
fn collect_range_serde_json(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Value>> {
self.collect_signed_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).map_err(Error::from))
.collect::<Result<Vec<_>>>()
}
#[inline]
fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response> {
Ok(self.collect_range_axum_json(from, to)?.into_response())
}
fn path(&self) -> &Path;
#[inline]
fn path_vec(&self) -> PathBuf {
Self::path_vec_(self.path())
}
#[inline]
fn path_vec_(path: &Path) -> PathBuf {
path.join("vec")
}
#[inline]
fn path_version_(path: &Path) -> PathBuf {
path.join("version")
}
#[inline]
fn path_compressed_(path: &Path) -> PathBuf {
path.join("compressed")
}
#[inline]
fn file_name(&self) -> String {
self.path()
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_owned()
}
fn version(&self) -> Version;
}
+4 -3
View File
@@ -1,8 +1,9 @@
mod any; mod dynamic;
// mod bytes; mod generic;
mod stored_index; mod stored_index;
mod stored_type; mod stored_type;
pub use any::*; pub use dynamic::*;
pub use generic::*;
pub use stored_index::*; pub use stored_index::*;
pub use stored_type::*; pub use stored_type::*;
+20 -2
View File
@@ -5,10 +5,28 @@ use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
pub trait StoredType pub trait StoredType
where where
Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize, Self: Sized
+ Debug
+ Clone
+ TryFromBytes
+ IntoBytes
+ Immutable
+ KnownLayout
+ Send
+ Sync
+ Serialize,
{ {
} }
impl<T> StoredType for T where impl<T> StoredType for T where
T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize T: Sized
+ Debug
+ Clone
+ TryFromBytes
+ IntoBytes
+ Immutable
+ KnownLayout
+ Send
+ Sync
+ Serialize
{ {
} }
+517
View File
@@ -0,0 +1,517 @@
use std::{
fs::{self, File},
mem,
path::Path,
sync::{Arc, OnceLock},
};
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use rayon::prelude::*;
use zstd::DEFAULT_COMPRESSION_LEVEL;
use crate::{
CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result,
StoredIndex, StoredType, UnsafeSlice, Version,
};
const ONE_KIB: usize = 1024;
const ONE_MIB: usize = ONE_KIB * ONE_KIB;
pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB;
pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB;
#[derive(Debug)]
pub struct CompressedVec<I, T> {
inner: RawVec<I, T>,
decoded_page: Option<(usize, Vec<T>)>,
decoded_pages: Option<Vec<OnceLock<Vec<T>>>>,
pages_meta: Arc<ArcSwap<CompressedPagesMetadata>>,
// pages: Option<Vec<OnceLock<Values<T>>>>,
// page: Option<(usize, Values<T>)>,
// length: Length
}
impl<I, T> CompressedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T;
pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE;
/// Same as import but will reset the folder under certain errors, so be careful !
pub fn forced_import(path: &Path, version: Version) -> Result<Self> {
let res = Self::import(path, version);
match res {
Err(Error::WrongEndian)
| Err(Error::DifferentVersion { .. })
| Err(Error::DifferentCompressionMode) => {
fs::remove_dir_all(path)?;
Self::import(path, version)
}
_ => res,
}
}
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
let vec_exists = fs::exists(Self::path_vec_(path)).is_ok_and(|b| b);
let compressed_path = Self::path_compressed_(path);
let compressed_exists = fs::exists(&compressed_path).is_ok_and(|b| b);
if vec_exists && !compressed_exists {
return Err(Error::DifferentCompressionMode);
}
if !vec_exists && !compressed_exists {
File::create(&compressed_path)?;
}
Ok(Self {
inner: RawVec::import(path, version)?,
decoded_page: None,
decoded_pages: None,
pages_meta: Arc::new(ArcSwap::new(Arc::new(CompressedPagesMetadata::read(path)?))),
})
}
fn cached_get_stored__(
index: usize,
mmap: &Mmap,
stored_len: usize,
decoded_page: &mut Option<(usize, Vec<T>)>,
compressed_pages_meta: &CompressedPagesMetadata,
) -> Result<Option<T>> {
let page_index = Self::index_to_page_index(index);
if decoded_page.as_ref().is_none_or(|b| b.0 != page_index) {
let values = Self::decode_page_(stored_len, page_index, mmap, compressed_pages_meta)?;
decoded_page.replace((page_index, values));
}
Ok(decoded_page
.as_ref()
.unwrap()
.1
.get(index % Self::PER_PAGE)
.cloned())
}
fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result<Vec<T>> {
Self::decode_page_(self.stored_len(), page_index, mmap, &self.pages_meta.load())
}
fn decode_page_(
stored_len: usize,
page_index: usize,
mmap: &Mmap,
compressed_pages_meta: &CompressedPagesMetadata,
) -> Result<Vec<T>> {
if Self::page_index_to_index(page_index) >= stored_len {
return Err(Error::IndexTooHigh);
} else if compressed_pages_meta.len() <= page_index {
return Err(Error::ExpectVecToHaveIndex);
}
let page = compressed_pages_meta.get(page_index).unwrap();
let len = page.bytes_len as usize;
let offset = page.start as usize;
Ok(zstd::decode_all(&mmap[offset..offset + len])
.inspect_err(|_| {
dbg!((len, offset, page_index, &mmap[..], &mmap.len()));
})?
.chunks(Self::SIZE_OF_T)
.map(|slice| T::try_read_from_bytes(slice).unwrap())
.collect::<Vec<_>>())
}
fn compress_page(chunk: &[T]) -> Vec<u8> {
if chunk.len() > Self::PER_PAGE {
panic!();
}
let mut bytes: Vec<u8> = vec![0; chunk.len() * Self::SIZE_OF_T];
let unsafe_bytes = UnsafeSlice::new(&mut bytes);
chunk
.into_par_iter()
.enumerate()
.for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()));
zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap()
}
pub fn enable_large_cache(&mut self) {
self.decoded_pages.replace(vec![]);
self.reset_large_cache();
}
pub fn disable_large_cache(&mut self) {
self.decoded_pages.take();
}
fn reset_large_cache(&mut self) {
let stored_len = self.stored_len();
if let Some(pages) = self.decoded_pages.as_mut() {
pages.par_iter_mut().for_each(|lock| {
lock.take();
});
let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
if pages.len() != len {
pages.resize_with(len, Default::default);
}
}
}
pub fn large_cache_len(&self) -> usize {
self.decoded_pages.as_ref().map_or(0, |v| v.len())
}
fn reset_small_cache(&mut self) {
self.decoded_page.take();
}
fn reset_caches(&mut self) {
self.reset_small_cache();
self.reset_large_cache();
}
#[inline(always)]
fn index_to_page_index(index: usize) -> usize {
index / Self::PER_PAGE
}
#[inline(always)]
fn page_index_to_index(page_index: usize) -> usize {
page_index * Self::PER_PAGE
}
}
impl<I, T> DynamicVec for CompressedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
type I = I;
type T = T;
#[inline]
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let cached_start = self
.stored_len()
.checked_sub(Self::CACHE_LENGTH)
.unwrap_or_default();
let decoded_index = index % Self::PER_PAGE;
if index >= cached_start {
let trimmed_index = index - cached_start;
if let Some(decoded_pages) = self.decoded_pages.as_ref() {
let decoded_page = decoded_pages
.get(Self::index_to_page_index(trimmed_index))
.unwrap();
return Ok(decoded_page
.get_or_init(|| {
self.decode_page(Self::index_to_page_index(index), mmap)
.unwrap()
})
.get(decoded_index)
.cloned());
}
}
let page_index = Self::index_to_page_index(index);
Ok(self
.decode_page(page_index, mmap)?
.get(decoded_index)
.cloned())
}
#[inline]
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
Self::cached_get_stored__(
index,
mmap,
self.stored_len(),
&mut self.decoded_page,
&self.pages_meta.load(),
)
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
self.inner.mmap()
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
self.inner.guard()
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
self.inner.mut_guard()
}
fn stored_len(&self) -> usize {
let pages_meta = self.pages_meta.load();
if let Some(last) = pages_meta.last() {
(pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize
} else {
0
}
}
#[inline]
fn pushed(&self) -> &[T] {
self.inner.pushed()
}
#[inline]
fn mut_pushed(&mut self) -> &mut Vec<T> {
self.inner.mut_pushed()
}
}
impl<I, T> GenericVec<I, T> for CompressedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()>
where
F: FnMut((I, T, &mut dyn DynamicVec<I = Self::I, T = Self::T>)) -> Result<()>,
{
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
let start = index.to_usize()?;
let stored_len = self.stored_len();
if start >= stored_len {
return Ok(());
}
let mmap = self.mmap().load();
let pages_meta = self.pages_meta.load();
(start..stored_len).try_for_each(|index| {
let v = Self::cached_get_stored__(
index,
&mmap,
stored_len,
&mut self.decoded_page,
&pages_meta,
)?
.unwrap();
f((I::from(index), v, self as &mut dyn DynamicVec<I = I, T = T>))
})
}
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>> {
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
let stored_len = self.stored_len();
let from = from.unwrap_or_default();
let to = to.map_or(stored_len, |i| i.min(stored_len));
if from >= stored_len {
return Ok(vec![]);
}
let mmap = self.mmap().load();
let pages_meta = self.pages_meta.load();
let mut decoded_page: Option<(usize, Vec<T>)> = None;
(from..to)
.map(|index| {
Self::cached_get_stored__(index, &mmap, stored_len, &mut decoded_page, &pages_meta)
.map(|opt| opt.unwrap())
})
.collect::<Result<Vec<_>>>()
}
fn flush(&mut self) -> Result<()> {
let pushed_len = self.pushed_len();
if pushed_len == 0 {
return Ok(());
}
let stored_len = self.stored_len();
let mut pages_meta = (**self.pages_meta.load()).clone();
let mut starting_page_index = pages_meta.len();
let mut values = vec![];
let mut truncate_at = None;
if self.stored_len() % Self::PER_PAGE != 0 {
if pages_meta.is_empty() {
unreachable!()
}
let last_page_index = pages_meta.len() - 1;
values = if let Some(values) = self
.decoded_pages
.as_mut()
.and_then(|v| v.last_mut().and_then(|lock| lock.take()))
{
values
} else if self
.decoded_page
.as_ref()
.is_some_and(|(page_index, _)| *page_index == last_page_index)
{
self.decoded_page.take().unwrap().1
} else {
Self::decode_page_(
stored_len,
last_page_index,
self.guard().as_ref().unwrap(),
&pages_meta,
)
.inspect_err(|_| {
dbg!(last_page_index, &pages_meta);
})
.unwrap()
};
truncate_at.replace(pages_meta.pop().unwrap().start);
starting_page_index = last_page_index;
}
let compressed = values
.into_par_iter()
.chain(mem::take(self.mut_pushed()).into_par_iter())
.chunks(Self::PER_PAGE)
.map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len()))
.collect::<Vec<_>>();
compressed
.iter()
.enumerate()
.for_each(|(i, (compressed_bytes, values_len))| {
let page_index = starting_page_index + i;
let start = if page_index != 0 {
let prev = pages_meta.get(page_index - 1).unwrap();
prev.start + prev.bytes_len as u64
} else {
0
};
let bytes_len = compressed_bytes.len() as u32;
let values_len = *values_len as u32;
let page = CompressedPageMetadata::new(start, bytes_len, values_len);
pages_meta.push(page_index, page);
});
let buf = compressed
.into_iter()
.flat_map(|(v, _)| v)
.collect::<Vec<_>>();
pages_meta.write()?;
if let Some(truncate_at) = truncate_at {
self.file_set_len(truncate_at)?;
}
self.file_write_all(&buf)?;
self.pages_meta.store(Arc::new(pages_meta));
self.reset_caches();
Ok(())
}
fn truncate_if_needed(&mut self, index: I) -> Result<()> {
let index = index.to_usize()?;
if index >= self.stored_len() {
return Ok(());
}
if index == 0 {
self.reset()?;
return Ok(());
}
let page_index = Self::index_to_page_index(index);
let guard = self.guard().as_ref().unwrap();
let mut pages_meta = (**self.pages_meta.load()).clone();
let values = self.decode_page(page_index, guard)?;
let mut buf = vec![];
let mut page = pages_meta.truncate(page_index).unwrap();
let len = page.start;
let decoded_index = index % Self::PER_PAGE;
if decoded_index != 0 {
let chunk = &values[..decoded_index];
buf = Self::compress_page(chunk);
page.values_len = chunk.len() as u32;
page.bytes_len = buf.len() as u32;
pages_meta.push(page_index, page);
}
pages_meta.write()?;
self.pages_meta.store(Arc::new(pages_meta));
self.file_truncate_and_write_all(len, &buf)?;
self.reset_caches();
Ok(())
}
#[inline]
fn path(&self) -> &Path {
self.inner.path()
}
#[inline]
fn version(&self) -> Version {
self.inner.version()
}
}
impl<I, T> Clone for CompressedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
decoded_page: None,
decoded_pages: None,
pages_meta: self.pages_meta.clone(),
}
}
}
+5
View File
@@ -0,0 +1,5 @@
mod compressed;
mod raw;
pub use compressed::*;
pub use raw::*;
+241
View File
@@ -0,0 +1,241 @@
use std::{
fs,
marker::PhantomData,
mem,
path::{Path, PathBuf},
sync::Arc,
};
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use rayon::prelude::*;
use crate::{DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Version};
#[derive(Debug)]
pub struct RawVec<I, T> {
version: Version,
pathbuf: PathBuf,
// Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
mmap: Arc<ArcSwap<Mmap>>,
guard: Option<Guard<Arc<Mmap>>>,
pushed: Vec<T>,
phantom: PhantomData<I>,
}
impl<I, T> RawVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
/// Same as import but will reset the folder under certain errors, so be careful !
pub fn forced_import(path: &Path, version: Version) -> Result<Self> {
let res = Self::import(path, version);
match res {
Err(Error::WrongEndian) | Err(Error::DifferentVersion { .. }) => {
fs::remove_dir_all(path)?;
Self::import(path, version)
}
_ => res,
}
}
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
let version_path = Self::path_version_(path);
version.validate(version_path.as_ref())?;
version.write(version_path.as_ref())?;
let file = Self::open_file_(Self::path_vec_(path).as_path())?;
let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?));
let guard = Some(mmap.load());
Ok(Self {
mmap,
guard,
version,
pathbuf: path.to_owned(),
pushed: vec![],
phantom: PhantomData,
})
}
}
impl<I, T> DynamicVec for RawVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
type I = I;
type T = T;
#[inline]
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
let index = index * Self::SIZE_OF_T;
let slice = &mmap[index..(index + Self::SIZE_OF_T)];
Self::T::try_read_from_bytes(slice)
.map(|v| Some(v))
.map_err(Error::from)
}
#[inline]
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
self.get_stored_(index, mmap)
}
#[inline]
fn mmap(&self) -> &ArcSwap<Mmap> {
&self.mmap
}
#[inline]
fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
&self.guard
}
#[inline]
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
&mut self.guard
}
#[inline]
fn stored_len(&self) -> usize {
if let Some(guard) = self.guard() {
guard.len() / Self::SIZE_OF_T
} else {
self.mmap.load().len() / Self::SIZE_OF_T
}
}
#[inline]
fn pushed(&self) -> &[T] {
self.pushed.as_slice()
}
#[inline]
fn mut_pushed(&mut self) -> &mut Vec<T> {
&mut self.pushed
}
}
impl<I, T> GenericVec<I, T> for RawVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()>
where
F: FnMut((I, T, &mut dyn DynamicVec<I = Self::I, T = Self::T>)) -> Result<()>,
{
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
let start = index.to_usize()?;
let stored_len = self.stored_len();
if start >= stored_len {
return Ok(());
}
let guard = self.mmap.load();
(start..stored_len).try_for_each(|index| {
let v = self.get_stored_(index, &guard)?.unwrap();
f((I::from(index), v, self as &mut dyn DynamicVec<I = I, T = T>))
})
}
fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<T>> {
if !self.is_pushed_empty() {
return Err(Error::UnsupportedUnflushedState);
}
let stored_len = self.stored_len();
let from = from.unwrap_or_default();
let to = to.map_or(stored_len, |i| i.min(stored_len));
if from >= stored_len {
return Ok(vec![]);
}
let mmap = self.mmap.load();
(from..to)
.map(|index| self.get_stored_(index, &mmap).map(|opt| opt.unwrap()))
.collect::<Result<Vec<_>>>()
}
fn flush(&mut self) -> Result<()> {
let pushed_len = self.pushed_len();
if pushed_len == 0 {
return Ok(());
}
let bytes = {
let pushed = &mut self.pushed;
let mut bytes: Vec<u8> = vec![0; pushed.len() * Self::SIZE_OF_T];
let unsafe_bytes = UnsafeSlice::new(&mut bytes);
mem::take(pushed)
.into_par_iter()
.enumerate()
.for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()));
bytes
};
self.file_write_all(&bytes)?;
Ok(())
}
fn truncate_if_needed(&mut self, index: I) -> Result<()> {
let index = index.to_usize()?;
if index >= self.stored_len() {
return Ok(());
}
if index == 0 {
self.reset()?;
return Ok(());
}
let len = index * Self::SIZE_OF_T;
self.file_set_len(len as u64)?;
Ok(())
}
#[inline]
fn path(&self) -> &Path {
self.pathbuf.as_path()
}
#[inline]
fn version(&self) -> Version {
self.version
}
}
impl<I, T> Clone for RawVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn clone(&self) -> Self {
Self {
version: self.version,
pathbuf: self.pathbuf.clone(),
// Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
mmap: self.mmap.clone(),
guard: None,
pushed: vec![],
phantom: PhantomData,
}
}
}
+46 -36
View File
@@ -18,7 +18,7 @@
* "Transactions" | * "Transactions" |
* "US Dollars" | * "US Dollars" |
* "Virtual Bytes" | * "Virtual Bytes" |
* "Weight" * "Weight Units"
* } Unit * } Unit
* *
* @typedef {Object} BaseSeriesBlueprint * @typedef {Object} BaseSeriesBlueprint
@@ -810,23 +810,28 @@ function createPartialOptions(colors) {
name: "Charts", name: "Charts",
tree: [ tree: [
{ {
name: "btc/usd", name: "Price",
title: "Bitcoin Price in US Dollars", tree: [
},
{
name: "usd/sats",
title: "Satoshis Per US Dollar",
unit: "Satoshis",
bottom: [
{ {
key: "sats-per-dollar", name: "btc/usd",
title: "Satoshis", title: "Bitcoin Price in US Dollars",
color: colors.bitcoin, },
{
name: "usd/sats",
title: "Satoshis Per US Dollar",
unit: "Satoshis",
bottom: [
{
key: "sats-per-dollar",
title: "Satoshis",
color: colors.bitcoin,
},
],
}, },
], ],
}, },
{ {
name: "Blocks", name: "Block",
tree: [ tree: [
createBaseSumTotal({ createBaseSumTotal({
name: "Count", name: "Count",
@@ -856,28 +861,33 @@ function createPartialOptions(colors) {
], ],
}, },
{ {
name: "Transactions", name: "Transaction",
tree: [ tree: [
{ createBaseSumTotal({
name: "Inputs", name: "Count",
tree: [ title: "Transaction Count",
createBaseSumTotal({ key: "tx-count",
name: "Count", }),
title: "Transaction Input Count", ],
key: "input-count", },
}), {
], name: "Input",
}, tree: [
{ createBaseSumTotal({
name: "Outputs", name: "Count",
tree: [ title: "Transaction Input Count",
createBaseSumTotal({ key: "input-count",
name: "Count", }),
title: "Transaction Output Count", ],
key: "output-count", },
}), {
], name: "Output",
}, tree: [
createBaseSumTotal({
name: "Count",
title: "Transaction Output Count",
key: "output-count",
}),
], ],
}, },
], ],
@@ -934,7 +944,7 @@ function createPartialOptions(colors) {
url: () => "https://status.kibo.money/", url: () => "https://status.kibo.money/",
}, },
{ {
name: "Crate", name: "Crates",
url: () => "https://crates.io/crates/brk", url: () => "https://crates.io/crates/brk",
}, },
], ],
@@ -1241,7 +1251,7 @@ export function initOptions({
} else if (key.includes("-size")) { } else if (key.includes("-size")) {
anyPartial.unit = "Megabytes"; anyPartial.unit = "Megabytes";
} else if (key.includes("-weight")) { } else if (key.includes("-weight")) {
anyPartial.unit = "Weight"; anyPartial.unit = "Weight Units";
} else if (key.includes("-vbytes")) { } else if (key.includes("-vbytes")) {
anyPartial.unit = "Virtual Bytes"; anyPartial.unit = "Virtual Bytes";
} else { } else {
@@ -152,10 +152,25 @@ export function createVecIdToIndexes() {
"total-input-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch], "total-input-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-output-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch], "total-output-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-size": [Txindex], "total-size": [Txindex],
"total-tx-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-tx-v1": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-tx-v2": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-tx-v3": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-count": [Height],
"tx-count-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-v1": [Height],
"tx-v1-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-v2": [Height],
"tx-v2-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-v3": [Height],
"tx-v3-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
txid: [Txindex], txid: [Txindex],
txoutindex: [Txinindex], txoutindex: [Txinindex],
txversion: [Txindex], txversion: [Txindex],
value: [Txoutindex], v1: [Txindex],
v2: [Txindex],
v3: [Txindex],
value: [Txinindex, Txoutindex],
weekindex: [Dateindex, Weekindex], weekindex: [Dateindex, Weekindex],
yearindex: [Monthindex, Yearindex], yearindex: [Monthindex, Yearindex],
} }