Compare commits

..

9 Commits

Author SHA1 Message Date
nym21 9aec991da6 release: v0.0.20 2025-04-10 21:39:34 +02:00
nym21 910701ce04 cleanup: old files 2025-04-10 21:39:12 +02:00
nym21 34b462d511 global: snapshot 2025-04-10 21:38:39 +02:00
nym21 139e93b2f0 vec: rework part 4 2025-04-10 15:55:26 +02:00
nym21 0dd7e9359e vec: rework part 3 2025-04-10 01:11:52 +02:00
nym21 41cf0225e3 vec: rework part 2 2025-04-09 22:59:18 +02:00
nym21 962254e511 vec: rework part 1 2025-04-09 16:31:31 +02:00
nym21 a7f2b24bac comp + vec: tiny opti 2025-04-08 15:38:20 +02:00
nym21 1323d988af computer + kibo: part 8 2025-04-08 11:40:35 +02:00
44 changed files with 2034 additions and 1377 deletions
Generated
+20 -12
View File
@@ -138,6 +138,12 @@ dependencies = [
"derive_arbitrary",
]
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arrayvec"
version = "0.7.6"
@@ -368,7 +374,7 @@ dependencies = [
[[package]]
name = "brk"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"brk_cli",
"brk_computer",
@@ -385,7 +391,7 @@ dependencies = [
[[package]]
name = "brk_cli"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"brk_computer",
"brk_core",
@@ -406,7 +412,7 @@ dependencies = [
[[package]]
name = "brk_computer"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"brk_core",
"brk_exit",
@@ -421,7 +427,7 @@ dependencies = [
[[package]]
name = "brk_core"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
@@ -438,7 +444,7 @@ dependencies = [
[[package]]
name = "brk_exit"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"brk_logger",
"ctrlc",
@@ -447,7 +453,7 @@ dependencies = [
[[package]]
name = "brk_fetcher"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"brk_core",
"brk_logger",
@@ -460,7 +466,7 @@ dependencies = [
[[package]]
name = "brk_indexer"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
@@ -479,7 +485,7 @@ dependencies = [
[[package]]
name = "brk_logger"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"color-eyre",
"env_logger",
@@ -489,7 +495,7 @@ dependencies = [
[[package]]
name = "brk_parser"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
@@ -504,7 +510,7 @@ dependencies = [
[[package]]
name = "brk_query"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"brk_computer",
"brk_indexer",
@@ -520,7 +526,7 @@ dependencies = [
[[package]]
name = "brk_server"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"axum",
"brk_computer",
@@ -546,8 +552,10 @@ dependencies = [
[[package]]
name = "brk_vec"
version = "0.0.19"
version = "0.0.20"
dependencies = [
"arc-swap",
"axum",
"memmap2",
"rayon",
"serde",
+2 -1
View File
@@ -4,7 +4,7 @@ members = ["crates/*"]
package.description = "The Bitcoin Research Kit is a suite of tools designed to extract, compute and display data stored on a Bitcoin Core node"
package.license = "MIT"
package.edition = "2024"
package.version = "0.0.19"
package.version = "0.0.20"
package.repository = "https://github.com/bitcoinresearchkit/brk"
[profile.release]
@@ -16,6 +16,7 @@ panic = "abort"
inherits = "release"
[workspace.dependencies]
axum = "0.8.3"
bitcoin = { version = "0.32.5", features = ["serde"] }
bitcoincore-rpc = "0.19.0"
brk_cli = { version = "0", path = "crates/brk_cli" }
+3 -2
View File
@@ -35,7 +35,7 @@
</p>
> **WARNING**
>
>
> This project is still a work in progress and while it's much better in many ways than its previous version ([kibo v0.5](https://github.com/kibo-money/kibo)), it doesn't yet include all of those datasets. If you're interested in having everything right now, please use the latter until feature parity is achieved.
>
> The explorer part (mempool.space/electrs) is also not viable just yet.
@@ -58,6 +58,7 @@ The toolkit can be used in various ways to accommodate as many needs as possible
For more information visit: [`brk_cli`](https://crates.io/crates/brk_cli)
- **[Crates](https://crates.io/crates/brk)** \
Rust developers have access to a wide range of crates, each built upon one another with its own specific purpose, enabling independent use and offering great flexibility.
PRs are welcome, especially if their goal is to introduce additional datasets.
The primary goal of this project is to be fully-featured and accessible for everyone, regardless of their background or financial situation - whether that person is an enthusiast, researcher, miner, analyst, or simply curious.
@@ -76,7 +77,7 @@ In contrast, existing alternatives tend to be either [very costly](https://studi
- [`brk_parser`](https://crates.io/crates/brk_parser): A very fast Bitcoin Core block parser and iterator built on top of bitcoin-rust
- [`brk_query`](https://crates.io/crates/brk_query): A library that finds requested datasets.
- [`brk_server`](https://crates.io/crates/brk_server): A server that serves Bitcoin data and swappable front-ends, built on top of `brk_indexer`, `brk_fetcher` and `brk_computer`
- [`brk_vec`](https://crates.io/crates/brk_vec): A very small, fast, efficient and simple storable Vec.
- [`brk_vec`](https://crates.io/crates/brk_vec): A push-only, truncatable, compressible, saveable Vec
## Acknowledgments
+4 -3
View File
@@ -272,9 +272,10 @@ impl RunConfig {
}
fn read(path: &Path) -> Self {
fs::read_to_string(path).map_or(RunConfig::default(), |contents| {
toml::from_str(&contents).unwrap_or_default()
})
fs::read_to_string(path).map_or_else(
|_| RunConfig::default(),
|contents| toml::from_str(&contents).unwrap_or_default(),
)
}
fn write(&self, path: &Path) -> std::io::Result<()> {
+91 -59
View File
@@ -2,7 +2,6 @@ use core::error;
use std::{
cmp::Ordering,
fmt::Debug,
io,
ops::{Add, Sub},
path::{Path, PathBuf},
};
@@ -10,15 +9,23 @@ use std::{
use brk_core::CheckedSub;
use brk_exit::Exit;
use brk_vec::{
AnyStorableVec, Compressed, Error, Result, StorableVec, StoredIndex, StoredType, Version,
Compressed, DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, StoredVec, Value,
Version,
};
use log::info;
const FLUSH_EVERY: usize = 10_000;
const ONE_KIB: usize = 1024;
const ONE_MIB: usize = ONE_KIB * ONE_KIB;
const MAX_CACHE_SIZE: usize = 210 * ONE_MIB;
#[derive(Debug)]
pub struct ComputedVec<I, T> {
pub struct ComputedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
computed_version: Option<Version>,
vec: StorableVec<I, T>,
inner: StoredVec<I, T>,
}
impl<I, T> ComputedVec<I, T>
@@ -26,16 +33,18 @@ where
I: StoredIndex,
T: StoredType,
{
const SIZE_OF: usize = size_of::<T>();
pub fn forced_import(
path: &Path,
version: Version,
compressed: Compressed,
) -> brk_vec::Result<Self> {
let vec = StorableVec::forced_import(path, version, compressed)?;
let inner = StoredVec::forced_import(path, version, compressed)?;
Ok(Self {
computed_version: None,
vec,
inner,
})
}
@@ -44,7 +53,7 @@ where
return Ok(());
}
exit.block();
self.vec.truncate_if_needed(index)?;
self.inner.truncate_if_needed(index)?;
exit.release();
Ok(())
}
@@ -59,102 +68,118 @@ where
if ord == Ordering::Greater {
self.safe_truncate_if_needed(index, exit)?;
}
self.vec.push(value);
self.inner.push(value);
}
}
if self.vec.pushed_len() >= FLUSH_EVERY {
Ok(self.safe_flush(exit)?)
if self.inner.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
self.safe_flush(exit)
} else {
Ok(())
}
}
pub fn safe_flush(&mut self, exit: &Exit) -> io::Result<()> {
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
if exit.triggered() {
return Ok(());
}
exit.block();
self.vec.flush()?;
self.inner.flush()?;
exit.release();
Ok(())
}
fn version(&self) -> Version {
self.vec.version()
self.inner.version()
}
pub fn len(&self) -> usize {
self.vec.len()
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn vec(&self) -> &StorableVec<I, T> {
&self.vec
fn file_name(&self) -> String {
self.inner.file_name()
}
pub fn mut_vec(&mut self) -> &mut StorableVec<I, T> {
&mut self.vec
pub fn vec(&self) -> &StoredVec<I, T> {
&self.inner
}
pub fn any_vec(&self) -> &dyn AnyStorableVec {
&self.vec
pub fn mut_vec(&mut self) -> &mut StoredVec<I, T> {
&mut self.inner
}
pub fn mut_any_vec(&mut self) -> &mut dyn AnyStorableVec {
&mut self.vec
pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec {
&self.inner
}
pub fn get(&mut self, index: I) -> Result<Option<&T>> {
self.vec.get(index)
pub fn mut_any_vec(&mut self) -> &mut dyn brk_vec::AnyStoredVec {
&mut self.inner
}
pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
self.vec.collect_range(from, to)
pub fn cached_get(&mut self, index: I) -> Result<Option<Value<T>>> {
self.inner.cached_get(index)
}
pub fn collect_inclusive_range(&self, from: I, to: I) -> Result<Vec<T>> {
self.inner.collect_inclusive_range(from, to)
}
pub fn path(&self) -> &Path {
self.inner.path()
}
#[inline]
fn path_computed_version(&self) -> PathBuf {
self.vec.path().join("computed_version")
self.inner.path().join("computed_version")
}
fn validate_computed_version_or_reset_file(&mut self, version: Version) -> Result<()> {
let path = self.path_computed_version();
if version.validate(path.as_ref()).is_err() {
self.vec.reset()?;
self.inner.reset()?;
}
version.write(path.as_ref())?;
if self.is_empty() {
info!("Computing {}...", self.file_name())
}
Ok(())
}
pub fn compute_transform<A, B, F>(
&mut self,
max_from: A,
other: &mut StorableVec<A, B>,
other: &mut StoredVec<A, B>,
mut t: F,
exit: &Exit,
) -> Result<()>
where
A: StoredIndex,
B: StoredType,
F: FnMut((A, B, &mut Self, &mut StorableVec<A, B>)) -> (I, T),
F: FnMut((A, B, &mut Self, &mut dyn DynamicVec<I = A, T = B>)) -> (I, T),
{
self.validate_computed_version_or_reset_file(
Version::ZERO + self.version() + other.version(),
)?;
let index = max_from.min(A::from(self.len()));
other.iter_from_cloned(index, |(a, b, other)| {
other.iter_from(index, |(a, b, other)| {
let (i, v) = t((a, b, self, other));
self.forced_push_at(i, v, exit)
})?;
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
pub fn compute_inverse_more_to_less(
&mut self,
max_from: T,
other: &mut StorableVec<T, I>,
other: &mut StoredVec<T, I>,
exit: &Exit,
) -> Result<()>
where
@@ -165,24 +190,27 @@ where
Version::ZERO + self.version() + other.version(),
)?;
let index = max_from.min(self.vec.get_last()?.cloned().unwrap_or_default());
let index = max_from.min(
self.inner
.cached_get_last()?
.map_or_else(T::default, |v| v.into_inner()),
);
other.iter_from(index, |(v, i, ..)| {
let i = *i;
if self.get(i).unwrap().is_none_or(|old_v| *old_v > v) {
if self.cached_get(i).unwrap().is_none_or(|old_v| *old_v > v) {
self.forced_push_at(i, v, exit)
} else {
Ok(())
}
})?;
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
pub fn compute_inverse_less_to_more(
&mut self,
max_from: T,
first_indexes: &mut StorableVec<T, I>,
last_indexes: &mut StorableVec<T, I>,
first_indexes: &mut StoredVec<T, I>,
last_indexes: &mut StoredVec<T, I>,
exit: &Exit,
) -> Result<()>
where
@@ -196,18 +224,18 @@ where
let index = max_from.min(T::from(self.len()));
first_indexes.iter_from(index, |(value, first_index, ..)| {
let first_index = (first_index).to_usize()?;
let last_index = (last_indexes.get(value)?.unwrap()).to_usize()?;
let last_index = (last_indexes.cached_get(value)?.unwrap()).to_usize()?;
(first_index..last_index)
.try_for_each(|index| self.forced_push_at(I::from(index), value, exit))
})?;
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
pub fn compute_last_index_from_first(
&mut self,
max_from: I,
first_indexes: &mut StorableVec<I, T>,
first_indexes: &mut StoredVec<I, T>,
final_len: usize,
exit: &Exit,
) -> Result<()>
@@ -236,14 +264,14 @@ where
)?;
}
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
pub fn compute_count_from_indexes<T2>(
&mut self,
max_from: I,
first_indexes: &mut StorableVec<I, T2>,
last_indexes: &mut StorableVec<I, T2>,
first_indexes: &mut StoredVec<I, T2>,
last_indexes: &mut StoredVec<I, T2>,
exit: &Exit,
) -> Result<()>
where
@@ -257,21 +285,21 @@ where
let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = last_indexes.get(i)?.unwrap();
let last_index = last_indexes.cached_get(i)?.unwrap();
let count = (*last_index + 1_usize)
.checked_sub(*first_index)
.checked_sub(first_index)
.unwrap_or_default();
self.forced_push_at(i, count.into(), exit)
})?;
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
pub fn compute_is_first_ordered<A>(
&mut self,
max_from: I,
self_to_other: &mut StorableVec<I, A>,
other_to_self: &mut StorableVec<A, I>,
self_to_other: &mut StoredVec<I, A>,
other_to_self: &mut StoredVec<A, I>,
exit: &Exit,
) -> Result<()>
where
@@ -285,17 +313,21 @@ where
let index = max_from.min(I::from(self.len()));
self_to_other.iter_from(index, |(i, other, ..)| {
self.forced_push_at(i, T::from(other_to_self.get(*other)?.unwrap() == &i), exit)
self.forced_push_at(
i,
T::from(other_to_self.cached_get(other)?.unwrap().into_inner() == i),
exit,
)
})?;
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
pub fn compute_sum_from_indexes<T2>(
&mut self,
max_from: I,
first_indexes: &mut StorableVec<I, T2>,
last_indexes: &mut StorableVec<I, T2>,
first_indexes: &mut StoredVec<I, T2>,
last_indexes: &mut StoredVec<I, T2>,
exit: &Exit,
) -> Result<()>
where
@@ -309,12 +341,12 @@ where
let index = max_from.min(I::from(self.len()));
first_indexes.iter_from(index, |(index, first_index, ..)| {
let last_index = last_indexes.get(index)?.unwrap();
let count = *last_index + 1_usize - *first_index;
let last_index = last_indexes.cached_get(index)?.unwrap();
let count = *last_index + 1_usize - first_index;
self.forced_push_at(index, count.into(), exit)
})?;
Ok(self.safe_flush(exit)?)
self.safe_flush(exit)
}
}
@@ -326,7 +358,7 @@ where
fn clone(&self) -> Self {
Self {
computed_version: self.computed_version,
vec: self.vec.clone(),
inner: self.inner.clone(),
}
}
}
@@ -4,7 +4,7 @@ use brk_core::{CheckedSub, StoredU32, StoredU64, StoredUsize, Timestamp, Weight}
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_parser::bitcoin;
use brk_vec::{AnyStorableVec, Compressed, Version};
use brk_vec::{Compressed, Version};
use super::{
Indexes,
@@ -156,7 +156,7 @@ impl Vecs {
Ok(())
}
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
[
self.indexes_to_block_interval.any_vecs(),
self.indexes_to_block_count.any_vecs(),
@@ -1,7 +1,9 @@
use std::path::Path;
use brk_exit::Exit;
use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, StoredIndex, StoredType, Version};
use brk_vec::{
Compressed, DynamicVec, GenericVec, Result, StoredIndex, StoredType, StoredVec, Version,
};
use crate::storage::vecs::base::ComputedVec;
@@ -114,12 +116,7 @@ where
Ok(s)
}
pub fn extend(
&mut self,
max_from: I,
source: &mut StorableVec<I, T>,
exit: &Exit,
) -> Result<()> {
pub fn extend(&mut self, max_from: I, source: &mut StoredVec<I, T>, exit: &Exit) -> Result<()> {
if self.total.is_none() {
return Ok(());
};
@@ -128,17 +125,16 @@ where
let total_vec = self.total.as_mut().unwrap();
source.iter_from(index, |(i, v)| {
source.iter_from(index, |(i, v, ..)| {
let prev = i
.to_usize()
.unwrap()
.checked_sub(1)
.map_or(T::from(0_usize), |prev_i| {
total_vec
.get(I::from(prev_i))
.cached_get(I::from(prev_i))
.unwrap()
.unwrap_or(&T::from(0_usize))
.to_owned()
.map_or(T::from(0_usize), |v| v.into_inner())
});
let value = v.clone() + prev;
total_vec.forced_push_at(i, value, exit)?;
@@ -154,9 +150,9 @@ where
pub fn compute<I2>(
&mut self,
max_from: I,
source: &mut StorableVec<I2, T>,
first_indexes: &mut StorableVec<I, I2>,
last_indexes: &mut StorableVec<I, I2>,
source: &mut StoredVec<I2, T>,
first_indexes: &mut StoredVec<I, I2>,
last_indexes: &mut StoredVec<I, I2>,
exit: &Exit,
) -> Result<()>
where
@@ -166,23 +162,25 @@ where
{
let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index)| {
let first_index = *first_index;
let last_index = *last_indexes.get(i).unwrap().unwrap();
first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = *last_indexes.cached_get(i)?.unwrap();
if let Some(first) = self.first.as_mut() {
let v = source.get(first_index).unwrap().unwrap();
first.forced_push_at(index, v.clone(), exit)?;
let v = source.cached_get(first_index)?.unwrap().into_inner();
first.forced_push_at(index, v, exit)?;
}
if let Some(last) = self.last.as_mut() {
let v = source.get(last_index).unwrap().unwrap();
last.forced_push_at(index, v.clone(), exit)?;
let v = source
.cached_get(last_index)
.inspect_err(|_| {
dbg!(last.path(), last_index);
})?
.unwrap()
.into_inner();
last.forced_push_at(index, v, exit)?;
}
let first_index = first_index.to_usize()?;
let last_index = last_index.to_usize()?;
let needs_sum_or_total = self.sum.is_some() || self.total.is_some();
let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some();
let needs_sorted = self.max.is_some()
@@ -195,8 +193,7 @@ where
let needs_values = needs_sorted || needs_average_sum_or_total;
if needs_values {
let mut values =
source.collect_range(Some(first_index as i64), Some(last_index as i64))?;
let mut values = source.collect_inclusive_range(first_index, last_index)?;
if needs_sorted {
values.sort_unstable();
@@ -254,7 +251,12 @@ where
let prev = i.to_usize().unwrap().checked_sub(1).map_or(
T::from(0_usize),
|prev_i| {
total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned()
total_vec
.cached_get(I::from(prev_i))
.unwrap()
.unwrap()
.to_owned()
.into_inner()
},
);
total_vec.forced_push_at(i, prev + sum, exit)?;
@@ -276,8 +278,8 @@ where
&mut self,
max_from: I,
source: &mut ComputedVecBuilder<I2, T>,
first_indexes: &mut StorableVec<I, I2>,
last_indexes: &mut StorableVec<I, I2>,
first_indexes: &mut StoredVec<I, I2>,
last_indexes: &mut StoredVec<I, I2>,
exit: &Exit,
) -> Result<()>
where
@@ -296,19 +298,18 @@ where
let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index)| {
let first_index = *first_index;
let last_index = *last_indexes.get(i).unwrap().unwrap();
first_indexes.iter_from(index, |(i, first_index, ..)| {
let last_index = *last_indexes.cached_get(i).unwrap().unwrap();
if let Some(first) = self.first.as_mut() {
let v = source
.first
.as_mut()
.unwrap()
.get(first_index)
.cached_get(first_index)
.unwrap()
.cloned()
.unwrap();
.unwrap()
.into_inner();
first.forced_push_at(index, v, exit)?;
}
@@ -317,16 +318,13 @@ where
.last
.as_mut()
.unwrap()
.get(last_index)
.cached_get(last_index)
.unwrap()
.cloned()
.unwrap();
.unwrap()
.into_inner();
last.forced_push_at(index, v, exit)?;
}
let first_index = Some(first_index.to_usize()? as i64);
let last_index = Some(last_index.to_usize()? as i64);
let needs_sum_or_total = self.sum.is_some() || self.total.is_some();
let needs_average_sum_or_total = needs_sum_or_total || self.average.is_some();
let needs_sorted = self.max.is_some() || self.min.is_some();
@@ -339,7 +337,7 @@ where
.max
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
values.sort_unstable();
max.forced_push_at(i, values.last().unwrap().clone(), exit)?;
}
@@ -349,7 +347,7 @@ where
.min
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
values.sort_unstable();
min.forced_push_at(i, values.first().unwrap().clone(), exit)?;
}
@@ -361,7 +359,7 @@ where
.average
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
let len = values.len() as f64;
let total = values
.into_iter()
@@ -378,7 +376,7 @@ where
.sum
.as_ref()
.unwrap()
.collect_range(first_index, last_index)?;
.collect_inclusive_range(first_index, last_index)?;
let sum = values.into_iter().fold(T::from(0), |a, b| a + b);
if let Some(sum_vec) = self.sum.as_mut() {
@@ -389,7 +387,11 @@ where
let prev = i.to_usize().unwrap().checked_sub(1).map_or(
T::from(0_usize),
|prev_i| {
total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned()
total_vec
.cached_get(I::from(prev_i))
.unwrap()
.unwrap()
.into_inner()
},
);
total_vec.forced_push_at(i, prev + sum, exit)?;
@@ -434,8 +436,8 @@ where
))
}
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
let mut v: Vec<&dyn AnyStorableVec> = vec![];
pub fn any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
let mut v: Vec<&dyn brk_vec::AnyStoredVec> = vec![];
if let Some(first) = self.first.as_ref() {
v.push(first.any_vec());
@@ -3,7 +3,7 @@ use std::path::Path;
use brk_core::{Dateindex, Decadeindex, Monthindex, Quarterindex, Weekindex, Yearindex};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, Version};
use brk_vec::{AnyStoredVec, Compressed, Result, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -126,7 +126,7 @@ where
Ok(())
}
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[
vec![self.dateindex.any_vec()],
self.dateindex_extra.any_vecs(),
@@ -5,7 +5,7 @@ use brk_core::{
};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, StorableVec, Version};
use brk_vec::{AnyStoredVec, Compressed, Result, StoredVec, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -102,7 +102,7 @@ where
indexes: &mut indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
height: Option<&mut StorableVec<Height, T>>,
height: Option<&mut StoredVec<Height, T>>,
) -> color_eyre::Result<()> {
let height = height.unwrap_or_else(|| self.height.as_mut().unwrap().mut_vec());
@@ -168,7 +168,7 @@ where
Ok(())
}
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[
self.height.as_ref().map_or(vec![], |v| vec![v.any_vec()]),
self.height_extra.any_vecs(),
@@ -3,7 +3,7 @@ use std::path::Path;
use brk_core::{Difficultyepoch, Height};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, Version};
use brk_vec::{AnyStoredVec, Compressed, Result, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -84,7 +84,7 @@ where
Ok(())
}
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[
vec![self.height.any_vec()],
self.height_extra.any_vecs(),
@@ -6,7 +6,7 @@ use brk_core::{
};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Result, Version};
use brk_vec::{AnyStoredVec, Compressed, Result, StoredVec, Version};
use crate::storage::vecs::{Indexes, base::ComputedVec, indexes};
@@ -17,7 +17,7 @@ pub struct ComputedVecsFromTxindex<T>
where
T: ComputedType + PartialOrd,
{
pub txindex: ComputedVec<Txindex, T>,
pub txindex: Option<ComputedVec<Txindex, T>>,
pub txindex_extra: ComputedVecBuilder<Txindex, T>,
pub height: ComputedVecBuilder<Height, T>,
pub dateindex: ComputedVecBuilder<Dateindex, T>,
@@ -38,15 +38,19 @@ where
pub fn forced_import(
path: &Path,
name: &str,
compute_source: bool,
version: Version,
compressed: Compressed,
options: StorableVecGeneatorOptions,
) -> color_eyre::Result<Self> {
let txindex = ComputedVec::forced_import(
&path.join(format!("txindex_to_{name}")),
version,
compressed,
)?;
let txindex = compute_source.then(|| {
ComputedVec::forced_import(
&path.join(format!("txindex_to_{name}")),
version,
compressed,
)
.unwrap()
});
let txindex_extra = ComputedVecBuilder::forced_import(
path,
@@ -75,7 +79,7 @@ where
})
}
pub fn compute<F>(
pub fn compute_all<F>(
&mut self,
indexer: &mut Indexer,
indexes: &mut indexes::Vecs,
@@ -92,14 +96,35 @@ where
&Exit,
) -> Result<()>,
{
compute(&mut self.txindex, indexer, indexes, starting_indexes, exit)?;
compute(
self.txindex.as_mut().unwrap(),
indexer,
indexes,
starting_indexes,
exit,
)?;
self.compute_rest(indexer, indexes, starting_indexes, exit, None)?;
Ok(())
}
pub fn compute_rest(
&mut self,
indexer: &mut Indexer,
indexes: &mut indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
txindex: Option<&mut StoredVec<Txindex, T>>,
) -> color_eyre::Result<()> {
let txindex = txindex.unwrap_or_else(|| self.txindex.as_mut().unwrap().mut_vec());
self.txindex_extra
.extend(starting_indexes.txindex, self.txindex.mut_vec(), exit)?;
.extend(starting_indexes.txindex, txindex, exit)?;
self.height.compute(
starting_indexes.height,
self.txindex.mut_vec(),
txindex,
indexer.mut_vecs().height_to_first_txindex.mut_vec(),
indexes.height_to_last_txindex.mut_vec(),
exit,
@@ -164,9 +189,9 @@ where
Ok(())
}
pub fn any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[
vec![self.txindex.any_vec()],
self.txindex.as_ref().map_or(vec![], |v| vec![v.any_vec()]),
self.txindex_extra.any_vecs(),
self.height.any_vecs(),
self.dateindex.any_vecs(),
+57 -36
View File
@@ -6,7 +6,7 @@ use brk_core::{
};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Version};
use brk_vec::{Compressed, Version};
use super::ComputedVec;
@@ -345,7 +345,7 @@ impl Vecs {
|(h, d, s, ..)| {
let d = h
.decremented()
.and_then(|h| s.get(h).ok())
.and_then(|h| s.cached_get(h).ok())
.flatten()
.map_or(d, |prev_d| {
let prev_d = *prev_d;
@@ -365,9 +365,8 @@ impl Vecs {
let starting_dateindex = self
.height_to_dateindex
.get(starting_indexes.height.decremented().unwrap_or_default())?
.copied()
.unwrap_or_default();
.cached_get(starting_indexes.height.decremented().unwrap_or_default())?
.map_or_else(Default::default, |v| v.into_inner());
self.height_to_dateindex.compute_transform(
starting_indexes.height,
@@ -378,8 +377,8 @@ impl Vecs {
let starting_dateindex = if let Some(dateindex) = self
.height_to_dateindex
.get(starting_indexes.height.decremented().unwrap_or_default())?
.copied()
.cached_get(starting_indexes.height.decremented().unwrap_or_default())?
.map(|v| v.into_inner())
{
starting_dateindex.min(dateindex)
} else {
@@ -451,9 +450,8 @@ impl Vecs {
let starting_weekindex = self
.dateindex_to_weekindex
.get(starting_dateindex)?
.copied()
.unwrap_or_default();
.cached_get(starting_dateindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.dateindex_to_weekindex.compute_transform(
starting_dateindex,
@@ -487,7 +485,12 @@ impl Vecs {
self.weekindex_to_timestamp.compute_transform(
starting_weekindex,
self.weekindex_to_first_dateindex.mut_vec(),
|(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()),
|(i, d, ..)| {
(
i,
*self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(),
)
},
exit,
)?;
@@ -495,9 +498,8 @@ impl Vecs {
let starting_monthindex = self
.dateindex_to_monthindex
.get(starting_dateindex)?
.copied()
.unwrap_or_default();
.cached_get(starting_dateindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.dateindex_to_monthindex.compute_transform(
starting_dateindex,
@@ -533,7 +535,12 @@ impl Vecs {
self.monthindex_to_timestamp.compute_transform(
starting_monthindex,
self.monthindex_to_first_dateindex.mut_vec(),
|(i, d, ..)| (i, *self.dateindex_to_timestamp.get(d).unwrap().unwrap()),
|(i, d, ..)| {
(
i,
*self.dateindex_to_timestamp.cached_get(d).unwrap().unwrap(),
)
},
exit,
)?;
@@ -541,9 +548,8 @@ impl Vecs {
let starting_quarterindex = self
.monthindex_to_quarterindex
.get(starting_monthindex)?
.copied()
.unwrap_or_default();
.cached_get(starting_monthindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.monthindex_to_quarterindex.compute_transform(
starting_monthindex,
@@ -579,7 +585,12 @@ impl Vecs {
self.quarterindex_to_timestamp.compute_transform(
starting_quarterindex,
self.quarterindex_to_first_monthindex.mut_vec(),
|(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()),
|(i, m, ..)| {
(
i,
*self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(),
)
},
exit,
)?;
@@ -587,9 +598,8 @@ impl Vecs {
let starting_yearindex = self
.monthindex_to_yearindex
.get(starting_monthindex)?
.copied()
.unwrap_or_default();
.cached_get(starting_monthindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.monthindex_to_yearindex.compute_transform(
starting_monthindex,
@@ -625,7 +635,12 @@ impl Vecs {
self.yearindex_to_timestamp.compute_transform(
starting_yearindex,
self.yearindex_to_first_monthindex.mut_vec(),
|(i, m, ..)| (i, *self.monthindex_to_timestamp.get(m).unwrap().unwrap()),
|(i, m, ..)| {
(
i,
*self.monthindex_to_timestamp.cached_get(m).unwrap().unwrap(),
)
},
exit,
)?;
@@ -633,9 +648,8 @@ impl Vecs {
let starting_decadeindex = self
.yearindex_to_decadeindex
.get(starting_yearindex)?
.copied()
.unwrap_or_default();
.cached_get(starting_yearindex)?
.map_or_else(Default::default, |v| v.into_inner());
self.yearindex_to_decadeindex.compute_transform(
starting_yearindex,
@@ -669,7 +683,12 @@ impl Vecs {
self.decadeindex_to_timestamp.compute_transform(
starting_decadeindex,
self.decadeindex_to_first_yearindex.mut_vec(),
|(i, y, ..)| (i, *self.yearindex_to_timestamp.get(y).unwrap().unwrap()),
|(i, y, ..)| {
(
i,
*self.yearindex_to_timestamp.cached_get(y).unwrap().unwrap(),
)
},
exit,
)?;
@@ -677,9 +696,8 @@ impl Vecs {
let starting_difficultyepoch = self
.height_to_difficultyepoch
.get(starting_indexes.height)?
.copied()
.unwrap_or_default();
.cached_get(starting_indexes.height)?
.map_or_else(Default::default, |v| v.into_inner());
self.height_to_difficultyepoch.compute_transform(
starting_indexes.height,
@@ -716,7 +734,11 @@ impl Vecs {
|(i, h, ..)| {
(
i,
*indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(),
*indexer_vecs
.height_to_timestamp
.cached_get(h)
.unwrap()
.unwrap(),
)
},
exit,
@@ -726,9 +748,8 @@ impl Vecs {
let starting_halvingepoch = self
.height_to_halvingepoch
.get(starting_indexes.height)?
.copied()
.unwrap_or_default();
.cached_get(starting_indexes.height)?
.map_or_else(Default::default, |v| v.into_inner());
self.height_to_halvingepoch.compute_transform(
starting_indexes.height,
@@ -765,7 +786,7 @@ impl Vecs {
// |(i, h, ..)| {
// (
// i,
// *indexer_vecs.height_to_timestamp.get(h).unwrap().unwrap(),
// *indexer_vecs.height_to_timestamp.cached_get(h).unwrap().unwrap(),
// )
// },
// exit,
@@ -784,7 +805,7 @@ impl Vecs {
})
}
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
vec![
self.dateindex_to_date.any_vec(),
self.dateindex_to_dateindex.any_vec(),
@@ -7,7 +7,7 @@ use brk_core::{
use brk_exit::Exit;
use brk_fetcher::Fetcher;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Version};
use brk_vec::{Compressed, Version};
use super::{
ComputedVec, Indexes,
@@ -243,8 +243,9 @@ impl Vecs {
.get_height(
h,
t,
h.decremented()
.map(|prev_h| *height_to_timestamp.get(prev_h).unwrap().unwrap()),
h.decremented().map(|prev_h| {
*height_to_timestamp.cached_get(prev_h).unwrap().unwrap()
}),
)
.unwrap();
(h, ohlc)
@@ -470,7 +471,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -479,7 +480,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -488,7 +489,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -516,7 +517,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -525,7 +526,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -534,7 +535,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -562,7 +563,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -571,7 +572,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -580,7 +581,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -608,7 +609,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -617,7 +618,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -626,7 +627,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -654,7 +655,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -663,7 +664,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -672,7 +673,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -704,7 +705,7 @@ impl Vecs {
.first
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
high: *self
@@ -713,7 +714,7 @@ impl Vecs {
.max
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
low: *self
@@ -722,7 +723,7 @@ impl Vecs {
.min
.as_mut()
.unwrap()
.get(i)
.cached_get(i)
.unwrap()
.unwrap(),
close,
@@ -765,7 +766,7 @@ impl Vecs {
Ok(())
}
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
vec![
vec![
self.dateindex_to_close_in_cents.any_vec(),
+2 -2
View File
@@ -3,7 +3,7 @@ use std::{fs, path::Path};
use brk_exit::Exit;
use brk_fetcher::Fetcher;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed};
use brk_vec::{AnyStoredVec, Compressed};
mod base;
mod blocks;
@@ -63,7 +63,7 @@ impl Vecs {
Ok(())
}
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
[
self.indexes.as_any_vecs(),
self.blocks.as_any_vecs(),
@@ -1,13 +1,13 @@
use std::{fs, path::Path};
use brk_core::{StoredU64, Txindex};
use brk_core::{Sats, StoredU8, StoredU32, StoredU64, Txindex, Txinindex, Txoutindex};
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_vec::{AnyStorableVec, Compressed, Version};
use brk_vec::{Compressed, DynamicVec, Version};
use super::{
ComputedVec, Indexes,
grouped::{ComputedVecsFromTxindex, StorableVecGeneatorOptions},
grouped::{ComputedVecsFromHeight, ComputedVecsFromTxindex, StorableVecGeneatorOptions},
indexes,
};
@@ -21,15 +21,24 @@ pub struct Vecs {
// pub height_to_outputcount: ComputedVec<Height, u32>,
// pub height_to_subsidy: ComputedVec<Height, u32>,
// pub height_to_totalfees: ComputedVec<Height, Sats>,
// pub height_to_txcount: ComputedVec<Height, u32>,
// pub txindex_to_fee: ComputedVec<Txindex, Sats>,
pub txindex_to_is_coinbase: ComputedVec<Txindex, bool>,
// pub txindex_to_feerate: ComputedVec<Txindex, Feerate>,
pub txindex_to_input_count: ComputedVecsFromTxindex<StoredU64>,
// pub txindex_to_input_sum: ComputedVec<Txindex, Sats>,
pub txindex_to_output_count: ComputedVecsFromTxindex<StoredU64>,
// pub txindex_to_output_sum: ComputedVec<Txindex, Sats>,
// pub txindex_to_output_value: ComputedVecsFromTxindex<Sats>,
pub txindex_to_v1: ComputedVec<Txindex, StoredU8>,
pub txindex_to_v2: ComputedVec<Txindex, StoredU8>,
pub txindex_to_v3: ComputedVec<Txindex, StoredU8>,
pub indexes_to_tx_v1: ComputedVecsFromHeight<StoredU32>,
pub indexes_to_tx_v2: ComputedVecsFromHeight<StoredU32>,
pub indexes_to_tx_v3: ComputedVecsFromHeight<StoredU32>,
// pub txinindex_to_value: ComputedVec<Txinindex, Sats>,
pub height_to_tx_count: ComputedVecsFromHeight<StoredU64>,
pub txindex_to_input_count: ComputedVecsFromTxindex<StoredU64>,
pub txindex_to_is_coinbase: ComputedVec<Txindex, bool>,
pub txindex_to_output_count: ComputedVecsFromTxindex<StoredU64>,
/// Value == 0 when Coinbase
pub txinindex_to_value: ComputedVec<Txinindex, Sats>,
}
impl Vecs {
@@ -37,6 +46,14 @@ impl Vecs {
fs::create_dir_all(path)?;
Ok(Self {
height_to_tx_count: ComputedVecsFromHeight::forced_import(
path,
"tx_count",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
// height_to_fee: StorableVec::forced_import(&path.join("height_to_fee"), Version::ZERO)?,
// height_to_input_count: StorableVec::forced_import(
// &path.join("height_to_input_count"),
@@ -65,6 +82,7 @@ impl Vecs {
txindex_to_input_count: ComputedVecsFromTxindex::forced_import(
path,
"input_count",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
@@ -72,15 +90,62 @@ impl Vecs {
txindex_to_output_count: ComputedVecsFromTxindex::forced_import(
path,
"output_count",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
// txinindex_to_value: StorableVec::forced_import(
// &path.join("txinindex_to_value"),
// txindex_to_output_value: ComputedVecsFromTxindex::forced_import(
// path,
// "output_value",
// Version::ZERO,
// compressed,
// StorableVecGeneatorOptions::default().add_sum().add_total(),
// )?,
txinindex_to_value: ComputedVec::forced_import(
&path.join("txinindex_to_value"),
Version::ZERO,
compressed,
)?,
txindex_to_v1: ComputedVec::forced_import(
&path.join("txindex_to_v1"),
Version::ZERO,
compressed,
)?,
txindex_to_v2: ComputedVec::forced_import(
&path.join("txindex_to_v2"),
Version::ZERO,
compressed,
)?,
txindex_to_v3: ComputedVec::forced_import(
&path.join("txindex_to_v3"),
Version::ZERO,
compressed,
)?,
indexes_to_tx_v1: ComputedVecsFromHeight::forced_import(
path,
"tx_v1",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
indexes_to_tx_v2: ComputedVecsFromHeight::forced_import(
path,
"tx_v2",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
indexes_to_tx_v3: ComputedVecsFromHeight::forced_import(
path,
"tx_v3",
true,
Version::ZERO,
compressed,
StorableVecGeneatorOptions::default().add_sum().add_total(),
)?,
})
}
@@ -91,7 +156,22 @@ impl Vecs {
starting_indexes: &Indexes,
exit: &Exit,
) -> color_eyre::Result<()> {
self.txindex_to_input_count.compute(
self.height_to_tx_count.compute_all(
indexer,
indexes,
starting_indexes,
exit,
|v, indexer, indexes, starting_indexes, exit| {
v.compute_count_from_indexes(
starting_indexes.height,
indexer.mut_vecs().height_to_first_txindex.mut_vec(),
indexes.height_to_last_txindex.mut_vec(),
exit,
)
},
)?;
self.txindex_to_input_count.compute_all(
indexer,
indexes,
starting_indexes,
@@ -106,7 +186,7 @@ impl Vecs {
},
)?;
self.txindex_to_output_count.compute(
self.txindex_to_output_count.compute_all(
indexer,
indexes,
starting_indexes,
@@ -121,6 +201,14 @@ impl Vecs {
},
)?;
// self.txindex_to_output_value.compute_rest(
// indexer,
// indexes,
// starting_indexes,
// exit,
// Some(indexer.mut_vecs().txoutindex_to_value.mut_vec()),
// )?;
let indexer_vecs = indexer.mut_vecs();
self.txindex_to_is_coinbase.compute_is_first_ordered(
@@ -130,23 +218,73 @@ impl Vecs {
exit,
)?;
// self.txinindex_to_value.compute_transform(
// starting_indexes.txinindex,
// indexer_vecs.txinindex_to_txoutindex.mut_vec(),
// |(txinindex, txoutindex, slf, other)| {
// let value =
// if let Ok(Some(value)) = indexer_vecs.txoutindex_to_value.read(txoutindex) {
// *value
// } else {
// dbg!(txinindex, txoutindex, slf.len(), other.len());
// panic!()
// };
// (txinindex, value)
self.txindex_to_v1.compute_transform(
starting_indexes.txindex,
indexer_vecs.txindex_to_txversion.mut_vec(),
|(i, v, ..)| (i, StoredU8::from(v)),
exit,
)?;
// self.indexes_to_tx_v1.compute_all(
// indexer,
// indexes,
// starting_indexes,
// exit,
// |vec, indexer, indexes, indexes, exit| {
// vec.compute_transform(
// starting_indexes.height,
// indexer.mut_vecs().txindex_to_txversion.mut_vec(),
// || {},
// exit,
// )?;
// },
// )?;
self.txindex_to_v2.compute_transform(
starting_indexes.txindex,
indexer_vecs.txindex_to_txversion.mut_vec(),
|(i, v, ..)| (i, StoredU8::from(v)),
exit,
)?;
// self.indexes_to_tx_v1.compute_rest(
// starting_indexes.txindex,
// indexer_vecs.txindex_to_txversion.mut_vec(),
// |(i, v, ..)| (i, StoredU8::from(v)),
// exit,
// )?;
self.txindex_to_v3.compute_transform(
starting_indexes.txindex,
indexer_vecs.txindex_to_txversion.mut_vec(),
|(i, v, ..)| (i, StoredU8::from(v)),
exit,
)?;
// self.indexes_to_tx_v1.compute_rest(
// starting_indexes.txindex,
// indexer_vecs.txindex_to_txversion.mut_vec(),
// |(i, v, ..)| (i, StoredU8::from(v)),
// exit,
// )?;
// self.vecs.txindex_to_fee.compute_transform(
self.txinindex_to_value.compute_transform(
starting_indexes.txinindex,
indexer_vecs.txinindex_to_txoutindex.mut_vec(),
|(txinindex, txoutindex, slf, other)| {
let value = if txoutindex == Txoutindex::COINBASE {
Sats::ZERO
} else if let Ok(Some(value)) = indexer_vecs
.txoutindex_to_value
.mut_vec()
.cached_get(txoutindex)
{
*value
} else {
dbg!(txinindex, txoutindex, slf.len(), other.len());
panic!()
};
(txinindex, value)
},
exit,
)?;
// self.txindex_to_fee.compute_transform(
// &mut self.vecs.txindex_to_height,
// &mut indexer.vecs().height_to_first_txindex,
// )?;
@@ -154,11 +292,21 @@ impl Vecs {
Ok(())
}
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn brk_vec::AnyStoredVec> {
[
vec![self.txindex_to_is_coinbase.any_vec()],
vec![
self.txindex_to_is_coinbase.any_vec(),
self.txinindex_to_value.any_vec(),
self.txindex_to_v1.any_vec(),
self.txindex_to_v2.any_vec(),
self.txindex_to_v3.any_vec(),
],
self.height_to_tx_count.any_vecs(),
self.txindex_to_output_count.any_vecs(),
self.txindex_to_input_count.any_vecs(),
self.indexes_to_tx_v1.any_vecs(),
self.indexes_to_tx_v2.any_vecs(),
self.indexes_to_tx_v3.any_vecs(),
]
.concat()
}
+2
View File
@@ -22,6 +22,7 @@ mod quarterindex;
mod sats;
mod stored_u32;
mod stored_u64;
mod stored_u8;
mod stored_usize;
mod timestamp;
mod txid;
@@ -58,6 +59,7 @@ pub use monthindex::*;
pub use ohlc::*;
pub use quarterindex::*;
pub use sats::*;
pub use stored_u8::*;
pub use stored_u32::*;
pub use stored_u64::*;
pub use stored_usize::*;
+7 -1
View File
@@ -6,7 +6,7 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::CheckedSub;
use super::{Txinindex, Txoutindex};
use super::{Txindex, Txinindex, Txoutindex};
#[derive(
Debug,
@@ -80,6 +80,12 @@ impl From<StoredU64> for f64 {
}
}
/// Builds a [`StoredU64`] from a transaction index by widening the dereferenced index to `u64`.
impl From<Txindex> for StoredU64 {
    fn from(value: Txindex) -> Self {
        let raw = *value;
        Self(raw as u64)
    }
}
impl From<Txinindex> for StoredU64 {
fn from(value: Txinindex) -> Self {
Self(*value)
+79
View File
@@ -0,0 +1,79 @@
use std::ops::{Add, Div};
use derive_deref::Deref;
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::CheckedSub;
/// Newtype around a single `u8`.
///
/// The derives give it value semantics (`Clone`, `Copy`, ordering and equality),
/// `Deref` access to the inner `u8`, `serde` serialization, and the `zerocopy`
/// traits (`FromBytes`, `IntoBytes`, `Immutable`, `KnownLayout`) for converting
/// to and from raw bytes.
#[derive(
Debug,
Deref,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
FromBytes,
Immutable,
IntoBytes,
KnownLayout,
Serialize,
)]
pub struct StoredU8(u8);
impl StoredU8 {
pub const ZERO: Self = Self(0);
pub fn new(counter: u8) -> Self {
Self(counter)
}
}
impl From<u8> for StoredU8 {
fn from(value: u8) -> Self {
Self(value)
}
}
impl From<usize> for StoredU8 {
    /// Narrows `value` with an `as` cast: only the low 8 bits are kept, so
    /// inputs above `u8::MAX` wrap around instead of failing.
    fn from(value: usize) -> Self {
        let low_byte = value as u8;
        Self(low_byte)
    }
}
impl CheckedSub<StoredU8> for StoredU8 {
    /// Returns `self - rhs`, or `None` when the subtraction would underflow.
    fn checked_sub(self, rhs: Self) -> Option<Self> {
        let difference = self.0.checked_sub(rhs.0)?;
        Some(Self(difference))
    }
}
impl Div<usize> for StoredU8 {
type Output = Self;
fn div(self, rhs: usize) -> Self::Output {
Self(self.0 / rhs as u8)
}
}
impl Add for StoredU8 {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
Self(self.0 + rhs.0)
}
}
impl From<f64> for StoredU8 {
    /// Converts a float to a [`StoredU8`], dropping the fractional part.
    ///
    /// # Panics
    /// Panics when `value` is negative or greater than `u8::MAX`. The guard
    /// compares against the range of `u8`, the type actually stored, so an
    /// out-of-range input is reported instead of being saturated by the cast.
    fn from(value: f64) -> Self {
        if value < 0.0 || value > f64::from(u8::MAX) {
            panic!("{value} is out of range for StoredU8")
        }
        Self(value as u8)
    }
}
impl From<StoredU8> for f64 {
fn from(value: StoredU8) -> Self {
value.0 as f64
}
}
+8
View File
@@ -2,6 +2,8 @@ use derive_deref::Deref;
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use super::StoredU8;
#[derive(Debug, Deref, Clone, Copy, Immutable, IntoBytes, KnownLayout, FromBytes, Serialize)]
pub struct TxVersion(i32);
@@ -16,3 +18,9 @@ impl From<TxVersion> for bitcoin::transaction::Version {
Self(value.0)
}
}
impl From<TxVersion> for StoredU8 {
    /// Keeps only the low byte of the `i32` version (`as u8` cast), so versions
    /// outside `0..=255` wrap around instead of failing.
    fn from(value: TxVersion) -> Self {
        let low_byte = value.0 as u8;
        low_byte.into()
    }
}
+9 -5
View File
@@ -58,9 +58,13 @@ Stores: `src/storage/stores/mod.rs`
## Benchmark
Indexing `0..885_835` took `11 hours 6 min 50 s` on a Macbook Pro M3 Pro with 36 GB of RAM
### Result 1 - 2025-04-10
`footprint` report:
- Peak memory: `5115 MB`
- Memory while waiting for a new block: `890 MB`
- Reclaimable memory: `6478 MB`
- version: `v0.0.20`
- machine: `Macbook Pro M3 Pro (36GB RAM)`
- mode: `raw`
- from: `0`
- to: `891_810`
- time: `8 hours 27 min 3s`
- peak memory: `6.5GB`
- disk usage: `270 GB`
+5 -1
View File
@@ -1,4 +1,4 @@
use std::path::Path;
use std::{path::Path, time::Instant};
use brk_core::default_bitcoin_path;
use brk_exit::Exit;
@@ -8,6 +8,8 @@ use brk_parser::{Parser, rpc};
fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
let i = Instant::now();
brk_logger::init(Some(Path::new(".log")));
let bitcoin_dir = default_bitcoin_path();
@@ -29,5 +31,7 @@ fn main() -> color_eyre::Result<()> {
indexer.index(&parser, rpc, &exit)?;
dbg!(i.elapsed());
Ok(())
}
+36 -86
View File
@@ -1,21 +1,24 @@
use std::{
cmp::Ordering,
fmt::Debug,
io,
path::{Path, PathBuf},
};
use brk_vec::{
AnyStorableVec, Compressed, Error, MAX_CACHE_SIZE, MAX_PAGE_SIZE, Result, StorableVec,
StoredIndex, StoredType, Value, Version,
Compressed, DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, StoredVec, Value,
Version,
};
use super::Height;
#[derive(Debug)]
pub struct IndexedVec<I, T> {
#[derive(Debug, Clone)]
pub struct IndexedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
height: Option<Height>,
vec: StorableVec<I, T>,
inner: StoredVec<I, T>,
}
impl<I, T> IndexedVec<I, T>
@@ -23,91 +26,51 @@ where
I: StoredIndex,
T: StoredType,
{
pub const SIZE_OF_T: usize = size_of::<T>();
pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T;
pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE;
pub fn forced_import(
path: &Path,
version: Version,
compressed: Compressed,
) -> brk_vec::Result<Self> {
let mut vec = StorableVec::forced_import(path, version, compressed)?;
let mut inner = StoredVec::forced_import(path, version, compressed)?;
vec.enable_large_cache();
inner.enable_large_cache_if_needed();
Ok(Self {
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
vec,
inner,
})
}
#[inline]
pub fn get(&self, index: I) -> Result<Option<Value<'_, T>>> {
self.get_(index.to_usize()?)
self.inner.get(index)
}
fn get_(&self, index: usize) -> Result<Option<Value<'_, T>>> {
match self.vec.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.vec.pushed().get(index).map(|v| Value::Ref(v)));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let large_cache_len = self.vec.large_cache_len();
if large_cache_len != 0 {
let page_index = Self::index_to_page_index(index);
let last_index = self.vec.stored_len() - 1;
let max_page_index = Self::index_to_page_index(last_index);
let min_page_index = (max_page_index + 1) - large_cache_len;
if page_index >= min_page_index {
let values = self
.vec
.pages()
.unwrap()
.get(page_index - min_page_index)
.ok_or(Error::MmapsVecIsTooSmall)?
.get_or_init(|| self.vec.decode_page(page_index).unwrap());
return Ok(values.get(index)?.map(|v| Value::Ref(v)));
}
}
Ok(self.vec.read_(index)?.map(|v| Value::Owned(v)))
#[inline]
pub fn cached_get(&mut self, index: I) -> Result<Option<Value<'_, T>>> {
self.inner.cached_get(index)
}
pub fn iter_from<F>(&mut self, index: I, f: F) -> Result<()>
where
F: FnMut((I, &T)) -> Result<()>,
F: FnMut((I, T, &mut dyn DynamicVec<I = I, T = T>)) -> Result<()>,
{
self.vec.iter_from(index, f)
}
#[inline(always)]
fn index_to_page_index(index: usize) -> usize {
index / Self::PER_PAGE
self.inner.iter_from(index, f)
}
#[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
match self.vec.len().cmp(&index.to_usize()?) {
match self.inner.len().cmp(&index.to_usize()?) {
Ordering::Greater => {
// dbg!(len, index, &self.pathbuf);
// panic!();
Ok(())
}
Ordering::Equal => {
self.vec.push(value);
self.inner.push(value);
Ok(())
}
Ordering::Less => {
dbg!(index, value, self.vec.len(), self.path_height());
dbg!(index, value, self.inner.len(), self.path_height());
Err(Error::IndexTooHigh)
}
}
@@ -117,67 +80,54 @@ where
if self.height.is_none_or(|self_height| self_height != height) {
height.write(&self.path_height())?;
}
self.vec.truncate_if_needed(index)?;
self.inner.truncate_if_needed(index)?;
Ok(())
}
pub fn flush(&mut self, height: Height) -> io::Result<()> {
pub fn flush(&mut self, height: Height) -> Result<()> {
height.write(&self.path_height())?;
self.vec.flush()
self.inner.flush()
}
pub fn vec(&self) -> &StorableVec<I, T> {
&self.vec
pub fn vec(&self) -> &StoredVec<I, T> {
&self.inner
}
pub fn mut_vec(&mut self) -> &mut StorableVec<I, T> {
&mut self.vec
pub fn mut_vec(&mut self) -> &mut StoredVec<I, T> {
&mut self.inner
}
pub fn any_vec(&self) -> &dyn AnyStorableVec {
&self.vec
pub fn any_vec(&self) -> &dyn brk_vec::AnyStoredVec {
&self.inner
}
pub fn len(&self) -> usize {
self.vec.len()
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.vec.is_empty()
self.inner.is_empty()
}
#[inline]
pub fn hasnt(&self, index: I) -> Result<bool> {
self.vec.has(index).map(|b| !b)
self.inner.has(index).map(|b| !b)
}
pub fn height(&self) -> brk_core::Result<Height> {
Height::try_from(self.path_height().as_path())
}
fn path_height(&self) -> PathBuf {
Self::path_height_(self.vec.path())
Self::path_height_(self.inner.path())
}
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
}
}
/// Field-by-field clone: the cached height is copied and the underlying vec is cloned.
impl<I, T> Clone for IndexedVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    fn clone(&self) -> Self {
        let Self { height, vec } = self;
        Self {
            height: *height,
            vec: vec.clone(),
        }
    }
}
pub trait AnyIndexedVec: Send + Sync {
fn height(&self) -> brk_core::Result<Height>;
fn flush(&mut self, height: Height) -> io::Result<()>;
fn flush(&mut self, height: Height) -> Result<()>;
}
impl<I, T> AnyIndexedVec for IndexedVec<I, T>
@@ -189,7 +139,7 @@ where
self.height()
}
fn flush(&mut self, height: Height) -> io::Result<()> {
fn flush(&mut self, height: Height) -> Result<()> {
self.flush(height)
}
}
+4 -5
View File
@@ -1,4 +1,4 @@
use std::{fs, io, path::Path};
use std::{fs, path::Path};
use brk_core::{
Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height,
@@ -7,7 +7,7 @@ use brk_core::{
P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats,
StoredUsize, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight,
};
use brk_vec::{AnyStorableVec, Compressed, Version};
use brk_vec::{AnyStoredVec, Compressed, Result, Version};
use rayon::prelude::*;
use crate::Indexes;
@@ -374,7 +374,6 @@ impl Vecs {
pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> brk_vec::Result<()> {
let saved_height = starting_indexes.height.decremented().unwrap_or_default();
// Now we can cut everything that's out of date
let &Indexes {
addressindex,
height,
@@ -595,7 +594,7 @@ impl Vecs {
}
}
pub fn flush(&mut self, height: Height) -> io::Result<()> {
pub fn flush(&mut self, height: Height) -> Result<()> {
self.as_mut_any_vecs()
.into_par_iter()
.try_for_each(|vec| vec.flush(height))
@@ -609,7 +608,7 @@ impl Vecs {
.unwrap()
}
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStorableVec> {
pub fn as_any_vecs(&self) -> Vec<&dyn AnyStoredVec> {
vec![
self.addressindex_to_addresstype.any_vec(),
self.addressindex_to_addresstypeindex.any_vec(),
+4 -4
View File
@@ -5,7 +5,7 @@
use brk_computer::Computer;
use brk_indexer::Indexer;
use brk_vec::AnyStorableVec;
use brk_vec::AnyStoredVec;
use tabled::settings::Style;
mod format;
@@ -51,7 +51,7 @@ impl<'a> Query<'a> {
}
}
pub fn search(&self, index: Index, ids: &[&str]) -> Vec<(String, &&dyn AnyStorableVec)> {
pub fn search(&self, index: Index, ids: &[&str]) -> Vec<(String, &&dyn AnyStoredVec)> {
let tuples = ids
.iter()
.flat_map(|s| {
@@ -86,7 +86,7 @@ impl<'a> Query<'a> {
pub fn format(
&self,
vecs: Vec<(String, &&dyn AnyStorableVec)>,
vecs: Vec<(String, &&dyn AnyStoredVec)>,
from: Option<i64>,
to: Option<i64>,
format: Option<Format>,
@@ -94,7 +94,7 @@ impl<'a> Query<'a> {
let mut values = vecs
.iter()
.map(|(_, vec)| -> brk_vec::Result<Vec<serde_json::Value>> {
vec.collect_range_values(from, to)
vec.collect_range_serde_json(from, to)
})
.collect::<brk_vec::Result<Vec<_>>>()?;
+4 -4
View File
@@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use brk_vec::AnyStorableVec;
use brk_vec::AnyStoredVec;
use derive_deref::{Deref, DerefMut};
use super::index::Index;
@@ -13,7 +13,7 @@ pub struct VecTrees<'a> {
impl<'a> VecTrees<'a> {
// Not the most performant or type safe but only built once so that's okay
pub fn insert(&mut self, vec: &'a dyn AnyStorableVec) {
pub fn insert(&mut self, vec: &'a dyn AnyStoredVec) {
let file_name = vec.file_name();
let split = file_name.split("_to_").collect::<Vec<_>>();
if split.len() != 2 {
@@ -88,7 +88,7 @@ impl<'a> VecTrees<'a> {
}
#[derive(Default, Deref, DerefMut)]
pub struct IndexToVec<'a>(BTreeMap<Index, &'a dyn AnyStorableVec>);
pub struct IndexToVec<'a>(BTreeMap<Index, &'a dyn AnyStoredVec>);
#[derive(Default, Deref, DerefMut)]
pub struct IdToVec<'a>(BTreeMap<String, &'a dyn AnyStorableVec>);
pub struct IdToVec<'a>(BTreeMap<String, &'a dyn AnyStoredVec>);
+1 -1
View File
@@ -7,7 +7,7 @@ license.workspace = true
repository.workspace = true
[dependencies]
axum = "0.8.3"
axum = { workspace = true }
brk_computer = { workspace = true }
brk_exit = { workspace = true }
brk_fetcher = { workspace = true }
+3 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "brk_vec"
description = "A very small, fast, efficient and simple storable Vec"
description = "A push-only, truncatable, compressible, saveable Vec"
keywords = ["vec", "disk", "data"]
categories = ["database"]
version.workspace = true
@@ -9,6 +9,8 @@ license.workspace = true
repository.workspace = true
[dependencies]
axum = { workspace = true }
arc-swap = "1.7.1"
memmap2 = "0.9.5"
rayon = { workspace = true }
serde = { workspace = true }
-16
View File
@@ -39,19 +39,3 @@ A `Vec` (an array) that is stored on disk and thus which can be much larger than
Compared to a key/value store, the data stored is the raw byte representation of the Vec's values without any overhead, which is very efficient. Additionally, it uses close to no RAM when caching isn't active and up to 100 MB when it is.
Compression is also available and built on top of [`zstd`](https://crates.io/crates/zstd) to save even more space (from 0 to 75%). The tradeoff is slower reading speeds, especially random reading speeds. This is due to the data being stored in compressed pages of 16 KB, which means that if you want to read even one value in a page, you have to decompress the whole page.
## Disclaimer
Portability will depend on the type of values.
Non bytes/slices types (`u8`, `u16`, ...) will be read as slice in an unsafe manner (using `std::slice::from_raw_parts`) and thus have the endianness of the system. On the other hand, `&[u8]` should be inserted as is.
If portability is important to you, just create a wrapper struct which has custom `get`, `push`, ... methods and does something like:
```rust
impl StorableVecU64 {
pub fn push(&mut self, value: u64) {
self.push(&value.to_be_bytes())
}
}
```
+14 -11
View File
@@ -1,13 +1,16 @@
use std::{fs, path::Path};
use brk_vec::{Compressed, StorableVec, Version};
use brk_vec::{Compressed, DynamicVec, GenericVec, StoredVec, Version};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = fs::remove_dir_all("./vec");
let version = Version::ZERO;
let compressed = Compressed::YES;
{
let mut vec: StorableVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?;
let mut vec: StoredVec<usize, u32> =
StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
(0..21_u32).for_each(|v| {
vec.push(v);
@@ -20,8 +23,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
{
let mut vec: StorableVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?;
let mut vec: StoredVec<usize, u32> =
StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
dbg!(vec.get(0)?);
dbg!(vec.get(0)?);
@@ -42,10 +45,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
{
let mut vec: StorableVec<usize, u32> =
StorableVec::forced_import(Path::new("./vec"), Version::ZERO, Compressed::YES)?;
let mut vec: StoredVec<usize, u32> =
StoredVec::forced_import(Path::new("./vec"), version, compressed)?;
vec.enable_large_cache();
vec.enable_large_cache_if_needed();
dbg!(vec.get(0)?);
dbg!(vec.get(20)?);
@@ -58,17 +61,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
dbg!(vec.get(5)?);
dbg!(vec.get(20)?);
vec.iter(|(_, v)| {
vec.iter(|(_, v, ..)| {
dbg!(v);
Ok(())
})?;
vec.iter_from(5, |(_, v)| {
vec.iter_from(5, |(_, v, ..)| {
dbg!(v);
Ok(())
})?;
dbg!(vec.collect_range(Some(-5), None)?);
dbg!(vec.collect_signed_range(Some(-5), None)?);
}
Ok(())
+10
View File
@@ -15,6 +15,7 @@ pub enum Error {
IO(io::Error),
ZeroCopyError,
IndexTooHigh,
EmptyVec,
IndexTooLow,
ExpectFileToHaveIndex,
ExpectVecToHaveIndex,
@@ -22,6 +23,7 @@ pub enum Error {
UnsupportedUnflushedState,
RangeFromAfterTo(usize, usize),
DifferentCompressionMode,
ToSerdeJsonValueError(serde_json::Error),
}
impl From<io::Error> for Error {
@@ -42,6 +44,12 @@ impl<A, B> From<zerocopy::error::SizeError<A, B>> for Error {
}
}
impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Self {
Self::ToSerdeJsonValueError(error)
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
@@ -68,6 +76,8 @@ impl fmt::Display for Error {
Error::ZeroCopyError => write!(f, "Zero copy convert error"),
Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"),
Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"),
Error::EmptyVec => write!(f, "The Vec is empty, maybe wait for a bit"),
Error::ToSerdeJsonValueError(error) => Debug::fmt(&error, f),
}
}
}
-2
View File
@@ -1,7 +1,5 @@
mod error;
mod value;
mod values;
pub use error::*;
pub use value::*;
pub use values::*;
-83
View File
@@ -1,83 +0,0 @@
use std::ops::Range;
use memmap2::Mmap;
use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
use crate::MAX_PAGE_SIZE;
use super::Result;
/// A batch of `T` values, held either as a boxed slice or as a memory map.
#[derive(Debug)]
pub enum Values<T> {
// Values held in memory as a boxed slice.
Owned(Box<[T]>),
// Values read from the bytes of a boxed memory map.
Ref(Box<Mmap>),
}
impl<T> Values<T> {
    /// How many `T` values fit within `MAX_PAGE_SIZE` bytes.
    const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
    /// Size in bytes of a single `T`.
    const SIZE_OF_T: usize = size_of::<T>();

    /// Returns a reference to the value at `index`, reduced modulo `PER_PAGE`.
    ///
    /// Yields `Ok(None)` when the reduced index is outside the held data, for
    /// both variants: the memory-mapped variant uses checked slicing so an
    /// out-of-range index is reported the same way as for the owned slice
    /// instead of panicking.
    ///
    /// # Errors
    /// Fails when the mapped bytes cannot be reinterpreted as a `T`.
    pub fn get(&self, index: usize) -> Result<Option<&T>>
    where
        T: TryFromBytes + IntoBytes + Immutable + KnownLayout,
    {
        let index = Self::index_to_decoded_index(index);

        match self {
            Self::Owned(a) => Ok(a.get(index)),
            Self::Ref(m) => {
                let range = Self::index_to_byte_range(index);
                let Some(source) = m.get(range) else {
                    return Ok(None);
                };
                Ok(Some(T::try_ref_from_bytes(source)?))
            }
        }
    }

    /// Returns the owned slice.
    ///
    /// # Panics
    /// Panics (via `unreachable!`) when called on the `Ref` variant.
    pub fn as_arr(&self) -> &[T] {
        match self {
            Self::Owned(a) => a,
            Self::Ref(_) => unreachable!(),
        }
    }

    /// Returns the memory map.
    ///
    /// # Panics
    /// Panics (via `unreachable!`) when called on the `Owned` variant.
    pub fn as_mmap(&self) -> &Mmap {
        match self {
            Self::Owned(_) => unreachable!(),
            Self::Ref(m) => m,
        }
    }

    /// Byte range covered by the value at `index`.
    #[inline]
    fn index_to_byte_range(index: usize) -> Range<usize> {
        let index = Self::index_to_byte_index(index) as usize;
        index..(index + Self::SIZE_OF_T)
    }

    /// Byte offset of the value at `index`.
    #[inline]
    fn index_to_byte_index(index: usize) -> u64 {
        (index * Self::SIZE_OF_T) as u64
    }

    /// Reduces `index` to its position modulo `PER_PAGE`.
    #[inline(always)]
    fn index_to_decoded_index(index: usize) -> usize {
        index % Self::PER_PAGE
    }
}
impl<T> From<Box<[T]>> for Values<T> {
fn from(value: Box<[T]>) -> Self {
Self::Owned(value)
}
}
/// A memory map is boxed and becomes the `Ref` variant.
impl<T> From<Mmap> for Values<T> {
    fn from(value: Mmap) -> Self {
        let boxed = Box::new(value);
        Self::Ref(boxed)
    }
}
/// The default is an `Owned` variant holding an empty slice.
impl<T> Default for Values<T> {
    fn default() -> Self {
        let empty: Vec<T> = Vec::new();
        Self::Owned(empty.into_boxed_slice())
    }
}
+198 -820
View File
File diff suppressed because it is too large Load Diff
@@ -22,8 +22,10 @@ impl CompressedPagesMetadata {
const PAGE_SIZE: usize = size_of::<CompressedPageMetadata>();
pub fn read(path: &Path) -> Result<CompressedPagesMetadata> {
let path = path.join("pages_meta");
let slf = Self {
vec: fs::read(path)
vec: fs::read(&path)
.unwrap_or_default()
.chunks(Self::PAGE_SIZE)
.map(|bytes| {
-61
View File
@@ -1,61 +0,0 @@
use std::{io, path::PathBuf};
use crate::{Result, StorableVec};
use super::{StoredIndex, StoredType};
/// Type-erased interface over a `StorableVec`, independent of its index and
/// value types so vecs of different types can be handled through `dyn AnyStorableVec`.
pub trait AnyStorableVec: Send + Sync {
/// Name of the vec as a string.
fn file_name(&self) -> String;
/// Name of the index type as a string.
fn index_type_to_string(&self) -> &str;
/// Number of values in the vec.
fn len(&self) -> usize;
/// Whether the vec holds no values.
fn is_empty(&self) -> bool;
/// Collects the values selected by `from` and `to` as JSON values.
// NOTE(review): negative bounds presumably count from the end — confirm against `collect_range`.
fn collect_range_values(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>>;
/// Flushes the vec, reporting I/O failures.
fn flush(&mut self) -> io::Result<()>;
/// Path associated with the vec.
fn path_vec(&self) -> PathBuf;
}
/// Implements the type-erased interface for every `StorableVec`.
// NOTE(review): most methods call a same-named method on `self`; this presumably
// resolves to the inherent `StorableVec` methods rather than recursing — confirm.
impl<I, T> AnyStorableVec for StorableVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
fn file_name(&self) -> String {
self.file_name()
}
fn index_type_to_string(&self) -> &str {
self.index_type_to_string()
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.is_empty()
}
fn flush(&mut self) -> io::Result<()> {
self.flush()
}
/// Collects the requested range, then converts each value to a `serde_json::Value`.
fn collect_range_values(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>> {
Ok(self
.collect_range(from, to)?
.into_iter()
// Panics if a value fails to serialize: the `unwrap` is not turned into an `Err`.
.map(|v| serde_json::to_value(v).unwrap())
.collect::<Vec<_>>())
}
/// Forwards to `path_vec` on the value returned by `base()`.
fn path_vec(&self) -> PathBuf {
self.base().path_vec()
}
}
+116
View File
@@ -0,0 +1,116 @@
use std::sync::Arc;
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use crate::{Error, Result, Value};
use super::{StoredIndex, StoredType};
pub trait DynamicVec: Send + Sync {
type I: StoredIndex;
type T: StoredType;
#[inline]
fn get(&self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
}
#[inline]
fn cached_get(&mut self, index: Self::I) -> Result<Option<Value<Self::T>>> {
self.get_(index.to_usize()?)
}
#[inline]
fn get_(&self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
Ok(self
.get_stored_(index.to_usize()?, self.guard().as_ref().unwrap())?
.map(Value::Owned))
}
fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
fn get_last(&self) -> Result<Option<Value<Self::T>>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
self.get_(len - 1)
}
#[inline]
fn cached_get_(&mut self, index: usize) -> Result<Option<Value<Self::T>>> {
match self.index_to_pushed_index(index) {
Ok(index) => {
if let Some(index) = index {
return Ok(self.pushed().get(index).map(Value::Ref));
}
}
Err(Error::IndexTooHigh) => return Ok(None),
Err(Error::IndexTooLow) => {}
Err(error) => return Err(error),
}
let mmap = Arc::clone(self.guard().as_ref().unwrap());
Ok(self
.cached_get_stored_(index.to_usize()?, &mmap)?
.map(Value::Owned))
}
fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<Self::T>>;
fn cached_get_last(&mut self) -> Result<Option<Value<Self::T>>> {
let len = self.len();
if len == 0 {
return Ok(None);
}
self.cached_get_(len - 1)
}
#[inline]
fn len(&self) -> usize {
self.stored_len() + self.pushed_len()
}
#[inline]
fn is_empty(&self) -> bool {
self.len() == 0
}
fn mmap(&self) -> &ArcSwap<Mmap>;
fn guard(&self) -> &Option<Guard<Arc<Mmap>>>;
fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>>;
fn stored_len(&self) -> usize;
fn pushed(&self) -> &[Self::T];
#[inline]
fn pushed_len(&self) -> usize {
self.pushed().len()
}
fn mut_pushed(&mut self) -> &mut Vec<Self::T>;
#[inline]
fn push(&mut self, value: Self::T) {
self.mut_pushed().push(value)
}
#[inline]
fn index_to_pushed_index(&self, index: usize) -> Result<Option<usize>> {
let stored_len = self.stored_len();
if index >= stored_len {
let index = index - stored_len;
if index >= self.pushed_len() {
Err(Error::IndexTooHigh)
} else {
Ok(Some(index))
}
} else {
Err(Error::IndexTooLow)
}
}
}
+209
View File
@@ -0,0 +1,209 @@
use std::{
fs::{File, OpenOptions},
io::{self, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::Arc,
};
use axum::{
Json,
response::{IntoResponse, Response},
};
use memmap2::Mmap;
use serde_json::Value;
use crate::{Error, Result, Version};
use super::{DynamicVec, StoredIndex, StoredType};
/// File-backed operations shared by the vec implementations: opening and
/// resizing the data file, refreshing the memory map, collecting ranges and
/// building the paths of the files kept in the vec's folder.
pub trait GenericVec<I, T>: DynamicVec<I = I, T = T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Size in bytes of a single value.
    const SIZE_OF_T: usize = size_of::<Self::T>();

    /// Opens the data file of this vec, see [`open_file_`](Self::open_file_).
    fn open_file(&self) -> io::Result<File> {
        Self::open_file_(&self.path_vec())
    }

    /// Opens `path` for reading and appending, creating the file when it is
    /// missing and never truncating it.
    fn open_file_(path: &Path) -> io::Result<File> {
        OpenOptions::new()
            .read(true)
            .create(true)
            .truncate(false)
            .append(true)
            .open(path)
    }

    /// Sets the length of the data file to `len` bytes and refreshes the
    /// memory map.
    fn file_set_len(&mut self, len: u64) -> Result<()> {
        let mut file = self.open_file()?;
        Self::file_set_len_(&mut file, len)?;
        self.update_mmap(file)
    }

    /// Sets the length of `file` to `len` bytes and moves its cursor to the
    /// new end.
    fn file_set_len_(file: &mut File, len: u64) -> Result<()> {
        file.set_len(len)?;
        file.seek(SeekFrom::End(0))?;
        Ok(())
    }

    /// Appends `buf` to the data file and refreshes the memory map.
    fn file_write_all(&mut self, buf: &[u8]) -> Result<()> {
        let mut file = self.open_file()?;
        file.write_all(buf)?;
        self.update_mmap(file)
    }

    /// Sets the length of the data file to `len` bytes, appends `buf` and
    /// refreshes the memory map.
    fn file_truncate_and_write_all(&mut self, len: u64, buf: &[u8]) -> Result<()> {
        let mut file = self.open_file()?;
        Self::file_set_len_(&mut file, len)?;
        file.write_all(buf)?;
        self.update_mmap(file)
    }

    /// Discards every stored value by truncating the data file to zero bytes
    /// and refreshing the memory map. Pushed values are left untouched.
    ///
    /// Implementors that keep metadata about the stored data next to the file
    /// should override this method to clear that metadata as well.
    #[inline]
    fn reset(&mut self) -> Result<()> {
        // The file is opened in append mode: writing an empty buffer would
        // leave it untouched, so it has to be truncated explicitly.
        self.file_truncate_and_write_all(0, &[])
    }

    /// Creates a read-only memory map of `file`.
    fn new_mmap(file: File) -> Result<Arc<Mmap>> {
        Ok(Arc::new(unsafe { Mmap::map(&file)? }))
    }

    /// Maps `file` again, publishes the new map and replaces the guard held
    /// by this instance.
    ///
    /// # Panics
    ///
    /// Panics when this instance holds no guard, which the panic message
    /// attributes to cloned instances.
    fn update_mmap(&mut self, file: File) -> Result<()> {
        let mmap = Self::new_mmap(file)?;
        self.mmap().store(mmap);
        if self.guard().is_some() {
            let guard = self.mmap().load();
            self.mut_guard().replace(guard);
        } else {
            unreachable!("This function shouldn't be called in a cloned instance")
        }
        Ok(())
    }

    /// Whether there is no pushed value waiting to be flushed.
    #[inline]
    fn is_pushed_empty(&self) -> bool {
        self.pushed_len() == 0
    }

    /// Whether `index` designates an existing value (stored or pushed).
    #[inline]
    fn has(&self, index: Self::I) -> Result<bool> {
        Ok(self.has_(index.to_usize()?))
    }

    /// Same as [`has`](Self::has) for an already converted index.
    #[inline]
    fn has_(&self, index: usize) -> bool {
        index < self.len()
    }

    /// Name of the index type, as given by `Self::I::to_string()`.
    #[inline]
    fn index_type_to_string(&self) -> &str {
        Self::I::to_string()
    }

    /// Calls `f` through [`iter_from`](Self::iter_from), starting at the
    /// default index.
    #[inline]
    fn iter<F>(&mut self, f: F) -> Result<()>
    where
        F: FnMut(
            (
                Self::I,
                Self::T,
                &mut dyn DynamicVec<I = Self::I, T = Self::T>,
            ),
        ) -> Result<()>,
    {
        self.iter_from(Self::I::default(), f)
    }

    /// Calls `f` with every `(index, value, vec)` triple starting at `index`.
    fn iter_from<F>(&mut self, index: Self::I, f: F) -> Result<()>
    where
        F: FnMut(
            (
                Self::I,
                Self::T,
                &mut dyn DynamicVec<I = Self::I, T = Self::T>,
            ),
        ) -> Result<()>;

    /// Writes the pushed values to the data file.
    fn flush(&mut self) -> Result<()>;

    /// Drops the stored values from `index` onwards, doing nothing when
    /// `index` is past the stored data.
    fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>;

    /// Collects the values in `from..to`; a missing bound means the start or
    /// the end of the vec.
    fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>>;

    /// Collects the values from `from` to `to`, both included.
    #[inline]
    fn collect_inclusive_range(&self, from: I, to: I) -> Result<Vec<Self::T>> {
        self.collect_range(Some(from.to_usize()?), Some(to.to_usize()? + 1))
    }

    /// Converts a signed position into an index: negative values count from
    /// `len` and are clamped to `0`.
    #[inline]
    fn i64_to_usize(i: i64, len: usize) -> usize {
        if i >= 0 {
            i as usize
        } else {
            let v = len as i64 + i;
            if v < 0 { 0 } else { v as usize }
        }
    }

    /// Same as [`collect_range`](Self::collect_range) with signed bounds, see
    /// [`i64_to_usize`](Self::i64_to_usize).
    fn collect_signed_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
        let len = self.len();
        let from = from.map(|i| Self::i64_to_usize(i, len));
        let to = to.map(|i| Self::i64_to_usize(i, len));
        self.collect_range(from, to)
    }

    /// Collects a signed range and wraps it into an axum [`Json`].
    #[inline]
    fn collect_range_axum_json(
        &self,
        from: Option<i64>,
        to: Option<i64>,
    ) -> Result<Json<Vec<Self::T>>> {
        Ok(Json(self.collect_signed_range(from, to)?))
    }

    /// Collects a signed range and converts every value to a
    /// `serde_json::Value`.
    #[inline]
    fn collect_range_serde_json(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Value>> {
        self.collect_signed_range(from, to)?
            .into_iter()
            .map(|v| serde_json::to_value(v).map_err(Error::from))
            .collect::<Result<Vec<_>>>()
    }

    /// Collects a signed range and turns it into an axum [`Response`].
    #[inline]
    fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response> {
        Ok(self.collect_range_axum_json(from, to)?.into_response())
    }

    /// Folder of this vec.
    fn path(&self) -> &Path;

    /// Path of the data file of this vec.
    #[inline]
    fn path_vec(&self) -> PathBuf {
        Self::path_vec_(self.path())
    }

    /// Path of the data file inside the folder `path`.
    #[inline]
    fn path_vec_(path: &Path) -> PathBuf {
        path.join("vec")
    }

    /// Path of the version file inside the folder `path`.
    #[inline]
    fn path_version_(path: &Path) -> PathBuf {
        path.join("version")
    }

    /// Path of the `compressed` file inside the folder `path`.
    #[inline]
    fn path_compressed_(path: &Path) -> PathBuf {
        path.join("compressed")
    }

    /// Last component of [`path`](Self::path).
    ///
    /// # Panics
    ///
    /// Panics when the path has no final component or when that component is
    /// not valid UTF-8.
    #[inline]
    fn file_name(&self) -> String {
        self.path()
            .file_name()
            .unwrap()
            .to_str()
            .unwrap()
            .to_owned()
    }

    /// Version this vec was imported with.
    fn version(&self) -> Version;
}
+4 -3
View File
@@ -1,8 +1,9 @@
mod any;
// mod bytes;
mod dynamic;
mod generic;
mod stored_index;
mod stored_type;
pub use any::*;
pub use dynamic::*;
pub use generic::*;
pub use stored_index::*;
pub use stored_type::*;
+20 -2
View File
@@ -5,10 +5,28 @@ use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
pub trait StoredType
where
Self: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize,
Self: Sized
+ Debug
+ Clone
+ TryFromBytes
+ IntoBytes
+ Immutable
+ KnownLayout
+ Send
+ Sync
+ Serialize,
{
}
impl<T> StoredType for T where
T: Sized + Debug + Clone + TryFromBytes + IntoBytes + Immutable + KnownLayout + Send + Sync + Serialize
T: Sized
+ Debug
+ Clone
+ TryFromBytes
+ IntoBytes
+ Immutable
+ KnownLayout
+ Send
+ Sync
+ Serialize
{
}
+517
View File
@@ -0,0 +1,517 @@
use std::{
fs::{self, File},
mem,
path::Path,
sync::{Arc, OnceLock},
};
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use rayon::prelude::*;
use zstd::DEFAULT_COMPRESSION_LEVEL;
use crate::{
CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result,
StoredIndex, StoredType, UnsafeSlice, Version,
};
/// Number of bytes in one kibibyte.
const ONE_KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const ONE_MIB: usize = ONE_KIB * ONE_KIB;
/// Byte budget from which `CompressedVec::CACHE_LENGTH` is derived.
pub const MAX_CACHE_SIZE: usize = 100 * ONE_MIB;
/// Upper bound, in bytes, of one decoded page.
pub const MAX_PAGE_SIZE: usize = 16 * ONE_KIB;
/// Vec whose stored values are kept on disk as zstd-compressed pages,
/// layered on top of a [`RawVec`].
#[derive(Debug)]
pub struct CompressedVec<I, T> {
    /// Underlying vec providing the file, the memory map and the pushed values.
    inner: RawVec<I, T>,
    /// Small cache: the most recently decoded page as `(page index, values)`.
    decoded_page: Option<(usize, Vec<T>)>,
    /// Large cache of lazily decoded pages, `None` while disabled.
    decoded_pages: Option<Vec<OnceLock<Vec<T>>>>,
    /// Metadata of every compressed page, shared with the clones.
    pages_meta: Arc<ArcSwap<CompressedPagesMetadata>>,
    // pages: Option<Vec<OnceLock<Values<T>>>>,
    // page: Option<(usize, Values<T>)>,
    // length: Length
}
impl<I, T> CompressedVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T;
pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T;
pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE;
/// Same as import but will reset the folder under certain errors, so be careful !
pub fn forced_import(path: &Path, version: Version) -> Result<Self> {
let res = Self::import(path, version);
match res {
Err(Error::WrongEndian)
| Err(Error::DifferentVersion { .. })
| Err(Error::DifferentCompressionMode) => {
fs::remove_dir_all(path)?;
Self::import(path, version)
}
_ => res,
}
}
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
let vec_exists = fs::exists(Self::path_vec_(path)).is_ok_and(|b| b);
let compressed_path = Self::path_compressed_(path);
let compressed_exists = fs::exists(&compressed_path).is_ok_and(|b| b);
if vec_exists && !compressed_exists {
return Err(Error::DifferentCompressionMode);
}
if !vec_exists && !compressed_exists {
File::create(&compressed_path)?;
}
Ok(Self {
inner: RawVec::import(path, version)?,
decoded_page: None,
decoded_pages: None,
pages_meta: Arc::new(ArcSwap::new(Arc::new(CompressedPagesMetadata::read(path)?))),
})
}
fn cached_get_stored__(
index: usize,
mmap: &Mmap,
stored_len: usize,
decoded_page: &mut Option<(usize, Vec<T>)>,
compressed_pages_meta: &CompressedPagesMetadata,
) -> Result<Option<T>> {
let page_index = Self::index_to_page_index(index);
if decoded_page.as_ref().is_none_or(|b| b.0 != page_index) {
let values = Self::decode_page_(stored_len, page_index, mmap, compressed_pages_meta)?;
decoded_page.replace((page_index, values));
}
Ok(decoded_page
.as_ref()
.unwrap()
.1
.get(index % Self::PER_PAGE)
.cloned())
}
fn decode_page(&self, page_index: usize, mmap: &Mmap) -> Result<Vec<T>> {
Self::decode_page_(self.stored_len(), page_index, mmap, &self.pages_meta.load())
}
fn decode_page_(
stored_len: usize,
page_index: usize,
mmap: &Mmap,
compressed_pages_meta: &CompressedPagesMetadata,
) -> Result<Vec<T>> {
if Self::page_index_to_index(page_index) >= stored_len {
return Err(Error::IndexTooHigh);
} else if compressed_pages_meta.len() <= page_index {
return Err(Error::ExpectVecToHaveIndex);
}
let page = compressed_pages_meta.get(page_index).unwrap();
let len = page.bytes_len as usize;
let offset = page.start as usize;
Ok(zstd::decode_all(&mmap[offset..offset + len])
.inspect_err(|_| {
dbg!((len, offset, page_index, &mmap[..], &mmap.len()));
})?
.chunks(Self::SIZE_OF_T)
.map(|slice| T::try_read_from_bytes(slice).unwrap())
.collect::<Vec<_>>())
}
fn compress_page(chunk: &[T]) -> Vec<u8> {
if chunk.len() > Self::PER_PAGE {
panic!();
}
let mut bytes: Vec<u8> = vec![0; chunk.len() * Self::SIZE_OF_T];
let unsafe_bytes = UnsafeSlice::new(&mut bytes);
chunk
.into_par_iter()
.enumerate()
.for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()));
zstd::encode_all(bytes.as_slice(), DEFAULT_COMPRESSION_LEVEL).unwrap()
}
pub fn enable_large_cache(&mut self) {
self.decoded_pages.replace(vec![]);
self.reset_large_cache();
}
pub fn disable_large_cache(&mut self) {
self.decoded_pages.take();
}
fn reset_large_cache(&mut self) {
let stored_len = self.stored_len();
if let Some(pages) = self.decoded_pages.as_mut() {
pages.par_iter_mut().for_each(|lock| {
lock.take();
});
let len = (stored_len as f64 / Self::PER_PAGE as f64).ceil() as usize;
let len = Self::CACHE_LENGTH.min(len);
if pages.len() != len {
pages.resize_with(len, Default::default);
}
}
}
pub fn large_cache_len(&self) -> usize {
self.decoded_pages.as_ref().map_or(0, |v| v.len())
}
fn reset_small_cache(&mut self) {
self.decoded_page.take();
}
fn reset_caches(&mut self) {
self.reset_small_cache();
self.reset_large_cache();
}
#[inline(always)]
fn index_to_page_index(index: usize) -> usize {
index / Self::PER_PAGE
}
#[inline(always)]
fn page_index_to_index(page_index: usize) -> usize {
page_index * Self::PER_PAGE
}
}
impl<I, T> DynamicVec for CompressedVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type I = I;
    type T = T;

    /// Reads the stored value at `index`.
    ///
    /// When the large cache is enabled, its slots hold the last
    /// `decoded_pages.len()` pages of the vec: slot `i` is the page
    /// `first_cached_page + i`, so the last slot is always the last page.
    /// Pages outside of that window are decoded on every call.
    #[inline]
    fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
        let page_index = Self::index_to_page_index(index);
        let decoded_index = index % Self::PER_PAGE;

        if let Some(decoded_pages) = self.decoded_pages.as_ref() {
            let pages_count = self.pages_meta.load().len();
            let first_cached_page = pages_count.saturating_sub(decoded_pages.len());

            // Slots are addressed by whole pages so that a slot can never
            // serve values belonging to two different pages.
            let slot = page_index
                .checked_sub(first_cached_page)
                .and_then(|slot_index| decoded_pages.get(slot_index));

            if let Some(slot) = slot {
                let page = match slot.get() {
                    Some(page) => page,
                    None => {
                        // Decode outside of the initializer so that a failure
                        // is returned instead of panicking.
                        let decoded = self.decode_page(page_index, mmap)?;
                        slot.get_or_init(|| decoded)
                    }
                };
                return Ok(page.get(decoded_index).cloned());
            }
        }

        Ok(self
            .decode_page(page_index, mmap)?
            .get(decoded_index)
            .cloned())
    }

    /// Reads the stored value at `index` through the small cache.
    #[inline]
    fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
        Self::cached_get_stored__(
            index,
            mmap,
            self.stored_len(),
            &mut self.decoded_page,
            &self.pages_meta.load(),
        )
    }

    #[inline]
    fn mmap(&self) -> &ArcSwap<Mmap> {
        self.inner.mmap()
    }

    #[inline]
    fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
        self.inner.guard()
    }

    #[inline]
    fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
        self.inner.mut_guard()
    }

    /// Number of stored values, derived from the page metadata: every page
    /// but the last counts as full, the last one contributes its `values_len`.
    fn stored_len(&self) -> usize {
        let pages_meta = self.pages_meta.load();
        if let Some(last) = pages_meta.last() {
            (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize
        } else {
            0
        }
    }

    #[inline]
    fn pushed(&self) -> &[T] {
        self.inner.pushed()
    }

    #[inline]
    fn mut_pushed(&mut self) -> &mut Vec<T> {
        self.inner.mut_pushed()
    }
}
impl<I, T> GenericVec<I, T> for CompressedVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Discards every stored value: clears the page metadata, truncates the
    /// data file to zero bytes and resets the caches.
    ///
    /// Overrides the default because the page metadata has to be cleared
    /// together with the data file, otherwise `stored_len` would keep
    /// reporting the discarded values.
    fn reset(&mut self) -> Result<()> {
        let mut pages_meta = (**self.pages_meta.load()).clone();
        // The removed page is of no interest here.
        let _ = pages_meta.truncate(0);
        pages_meta.write()?;
        self.pages_meta.store(Arc::new(pages_meta));

        self.file_truncate_and_write_all(0, &[])?;
        self.reset_caches();

        Ok(())
    }

    /// Calls `f` with every stored `(index, value, vec)` triple starting at
    /// `index`, going through the small cache.
    ///
    /// # Errors
    ///
    /// Returns [`Error::UnsupportedUnflushedState`] when values are waiting
    /// to be flushed.
    fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()>
    where
        F: FnMut((I, T, &mut dyn DynamicVec<I = Self::I, T = Self::T>)) -> Result<()>,
    {
        if !self.is_pushed_empty() {
            return Err(Error::UnsupportedUnflushedState);
        }

        let start = index.to_usize()?;
        let stored_len = self.stored_len();
        if start >= stored_len {
            return Ok(());
        }

        let mmap = self.mmap().load();
        let pages_meta = self.pages_meta.load();

        (start..stored_len).try_for_each(|index| {
            let v = Self::cached_get_stored__(
                index,
                &mmap,
                stored_len,
                &mut self.decoded_page,
                &pages_meta,
            )?
            .unwrap();
            f((I::from(index), v, self as &mut dyn DynamicVec<I = I, T = T>))
        })
    }

    /// Collects the stored values in `from..to`, with `to` clamped to the
    /// stored length; a missing bound means the start or the end of the vec.
    ///
    /// # Errors
    ///
    /// Returns [`Error::UnsupportedUnflushedState`] when values are waiting
    /// to be flushed.
    fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<Self::T>> {
        if !self.is_pushed_empty() {
            return Err(Error::UnsupportedUnflushedState);
        }

        let stored_len = self.stored_len();
        let from = from.unwrap_or_default();
        let to = to.map_or(stored_len, |i| i.min(stored_len));
        if from >= stored_len {
            return Ok(vec![]);
        }

        let mmap = self.mmap().load();
        let pages_meta = self.pages_meta.load();
        // Local cache so that consecutive values of a page decode it once.
        let mut decoded_page: Option<(usize, Vec<T>)> = None;

        (from..to)
            .map(|index| {
                Self::cached_get_stored__(index, &mmap, stored_len, &mut decoded_page, &pages_meta)
                    .map(|opt| opt.unwrap())
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Compresses the pushed values into pages and appends them to the data
    /// file. When the last stored page is not full, it is decoded, merged
    /// with the pushed values and rewritten.
    fn flush(&mut self) -> Result<()> {
        if self.is_pushed_empty() {
            return Ok(());
        }

        let stored_len = self.stored_len();
        let mut pages_meta = (**self.pages_meta.load()).clone();
        let mut starting_page_index = pages_meta.len();
        let mut values = vec![];
        let mut truncate_at = None;

        if stored_len % Self::PER_PAGE != 0 {
            if pages_meta.is_empty() {
                unreachable!()
            }

            let last_page_index = pages_meta.len() - 1;

            // Reuse an already decoded copy of the last page when a cache
            // holds one, decode it otherwise.
            values = if let Some(values) = self
                .decoded_pages
                .as_mut()
                .and_then(|v| v.last_mut().and_then(|lock| lock.take()))
            {
                values
            } else if self
                .decoded_page
                .as_ref()
                .is_some_and(|(page_index, _)| *page_index == last_page_index)
            {
                self.decoded_page.take().unwrap().1
            } else {
                // A failure is returned to the caller rather than unwrapped.
                Self::decode_page_(
                    stored_len,
                    last_page_index,
                    self.guard().as_ref().unwrap(),
                    &pages_meta,
                )?
            };

            // The partial page is rewritten: drop its metadata and remember
            // where its bytes start in the file.
            truncate_at.replace(pages_meta.pop().unwrap().start);
            starting_page_index = last_page_index;
        }

        let compressed = values
            .into_par_iter()
            .chain(mem::take(self.mut_pushed()).into_par_iter())
            .chunks(Self::PER_PAGE)
            .map(|chunk| (Self::compress_page(chunk.as_ref()), chunk.len()))
            .collect::<Vec<_>>();

        compressed
            .iter()
            .enumerate()
            .for_each(|(i, (compressed_bytes, values_len))| {
                let page_index = starting_page_index + i;

                // A page starts right after the previous one.
                let start = if page_index != 0 {
                    let prev = pages_meta.get(page_index - 1).unwrap();
                    prev.start + prev.bytes_len as u64
                } else {
                    0
                };

                let bytes_len = compressed_bytes.len() as u32;
                let values_len = *values_len as u32;
                let page = CompressedPageMetadata::new(start, bytes_len, values_len);
                pages_meta.push(page_index, page);
            });

        let buf = compressed
            .into_iter()
            .flat_map(|(v, _)| v)
            .collect::<Vec<_>>();

        pages_meta.write()?;

        if let Some(truncate_at) = truncate_at {
            self.file_set_len(truncate_at)?;
        }
        self.file_write_all(&buf)?;

        self.pages_meta.store(Arc::new(pages_meta));
        self.reset_caches();

        Ok(())
    }

    /// Drops the stored values from `index` onwards, doing nothing when
    /// `index` is past the stored data. The page containing `index` is
    /// rewritten with the values that precede it.
    fn truncate_if_needed(&mut self, index: I) -> Result<()> {
        let index = index.to_usize()?;
        if index >= self.stored_len() {
            return Ok(());
        }
        if index == 0 {
            self.reset()?;
            return Ok(());
        }

        let page_index = Self::index_to_page_index(index);
        let guard = self.guard().as_ref().unwrap();
        let mut pages_meta = (**self.pages_meta.load()).clone();
        let values = self.decode_page(page_index, guard)?;

        let mut buf = vec![];
        let mut page = pages_meta.truncate(page_index).unwrap();
        // The file is cut where the page containing `index` starts.
        let len = page.start;

        let decoded_index = index % Self::PER_PAGE;
        if decoded_index != 0 {
            let chunk = &values[..decoded_index];
            buf = Self::compress_page(chunk);
            page.values_len = chunk.len() as u32;
            page.bytes_len = buf.len() as u32;
            pages_meta.push(page_index, page);
        }

        pages_meta.write()?;
        self.pages_meta.store(Arc::new(pages_meta));

        self.file_truncate_and_write_all(len, &buf)?;
        self.reset_caches();

        Ok(())
    }

    #[inline]
    fn path(&self) -> &Path {
        self.inner.path()
    }

    #[inline]
    fn version(&self) -> Version {
        self.inner.version()
    }
}
impl<I, T> Clone for CompressedVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Clones the inner vec and shares the page metadata handle; the clone
    /// starts with both caches empty.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let pages_meta = self.pages_meta.clone();

        Self {
            inner,
            pages_meta,
            decoded_page: None,
            decoded_pages: None,
        }
    }
}
+5
View File
@@ -0,0 +1,5 @@
mod compressed;
mod raw;
pub use compressed::*;
pub use raw::*;
+241
View File
@@ -0,0 +1,241 @@
use std::{
fs,
marker::PhantomData,
mem,
path::{Path, PathBuf},
sync::Arc,
};
use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use rayon::prelude::*;
use crate::{DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Version};
/// Vec whose stored values are written back to back, uncompressed, in a
/// memory-mapped file.
#[derive(Debug)]
pub struct RawVec<I, T> {
    /// Version the vec was imported with.
    version: Version,
    /// Folder holding the files of the vec.
    pathbuf: PathBuf,
    // Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
    /// Swappable memory map of the data file, shared with the clones.
    mmap: Arc<ArcSwap<Mmap>>,
    /// Guard on the current memory map; `None` in clones.
    guard: Option<Guard<Arc<Mmap>>>,
    /// Values pushed but not yet flushed to the file.
    pushed: Vec<T>,
    /// Ties the vec to its index type without storing one.
    phantom: PhantomData<I>,
}
impl<I, T> RawVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
/// Same as import but will reset the folder under certain errors, so be careful !
pub fn forced_import(path: &Path, version: Version) -> Result<Self> {
let res = Self::import(path, version);
match res {
Err(Error::WrongEndian) | Err(Error::DifferentVersion { .. }) => {
fs::remove_dir_all(path)?;
Self::import(path, version)
}
_ => res,
}
}
pub fn import(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
let version_path = Self::path_version_(path);
version.validate(version_path.as_ref())?;
version.write(version_path.as_ref())?;
let file = Self::open_file_(Self::path_vec_(path).as_path())?;
let mmap = Arc::new(ArcSwap::new(Self::new_mmap(file)?));
let guard = Some(mmap.load());
Ok(Self {
mmap,
guard,
version,
pathbuf: path.to_owned(),
pushed: vec![],
phantom: PhantomData,
})
}
}
impl<I, T> DynamicVec for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type I = I;
    type T = T;

    /// Reads the stored value at `index` straight from `mmap`.
    ///
    /// Returns `Ok(None)` when the bytes of that value are not fully inside
    /// the map, and an error when they cannot be read as a `T`.
    #[inline]
    fn get_stored_(&self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
        let start = index * Self::SIZE_OF_T;

        // Checked access: an out-of-range index yields `None` instead of a
        // slicing panic.
        let Some(slice) = mmap.get(start..start + Self::SIZE_OF_T) else {
            return Ok(None);
        };

        Self::T::try_read_from_bytes(slice)
            .map(Some)
            .map_err(Error::from)
    }

    /// Same as [`get_stored_`](Self::get_stored_): nothing is cached here.
    #[inline]
    fn cached_get_stored_(&mut self, index: usize, mmap: &Mmap) -> Result<Option<T>> {
        self.get_stored_(index, mmap)
    }

    #[inline]
    fn mmap(&self) -> &ArcSwap<Mmap> {
        &self.mmap
    }

    #[inline]
    fn guard(&self) -> &Option<Guard<Arc<Mmap>>> {
        &self.guard
    }

    #[inline]
    fn mut_guard(&mut self) -> &mut Option<Guard<Arc<Mmap>>> {
        &mut self.guard
    }

    /// Number of stored values: the length of the map divided by the size of
    /// a value, read from the guard when there is one and from the current
    /// map otherwise.
    #[inline]
    fn stored_len(&self) -> usize {
        if let Some(guard) = self.guard() {
            guard.len() / Self::SIZE_OF_T
        } else {
            self.mmap.load().len() / Self::SIZE_OF_T
        }
    }

    #[inline]
    fn pushed(&self) -> &[T] {
        self.pushed.as_slice()
    }

    #[inline]
    fn mut_pushed(&mut self) -> &mut Vec<T> {
        &mut self.pushed
    }
}
impl<I, T> GenericVec<I, T> for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Calls `f` with every stored `(index, value, vec)` triple starting at
    /// `index`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::UnsupportedUnflushedState`] when values are waiting
    /// to be flushed.
    fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()>
    where
        F: FnMut((I, T, &mut dyn DynamicVec<I = Self::I, T = Self::T>)) -> Result<()>,
    {
        if !self.is_pushed_empty() {
            return Err(Error::UnsupportedUnflushedState);
        }

        let start = index.to_usize()?;
        let stored_len = self.stored_len();
        if start >= stored_len {
            return Ok(());
        }

        let guard = self.mmap.load();

        (start..stored_len).try_for_each(|index| {
            let v = self.get_stored_(index, &guard)?.unwrap();
            f((I::from(index), v, self as &mut dyn DynamicVec<I = I, T = T>))
        })
    }

    /// Collects the stored values in `from..to`, with `to` clamped to the
    /// stored length; a missing bound means the start or the end of the vec.
    ///
    /// # Errors
    ///
    /// Returns [`Error::UnsupportedUnflushedState`] when values are waiting
    /// to be flushed.
    fn collect_range(&self, from: Option<usize>, to: Option<usize>) -> Result<Vec<T>> {
        if !self.is_pushed_empty() {
            return Err(Error::UnsupportedUnflushedState);
        }

        let stored_len = self.stored_len();
        let from = from.unwrap_or_default();
        let to = to.map_or(stored_len, |i| i.min(stored_len));
        if from >= stored_len {
            return Ok(vec![]);
        }

        let mmap = self.mmap.load();

        (from..to)
            .map(|index| self.get_stored_(index, &mmap).map(|opt| opt.unwrap()))
            .collect::<Result<Vec<_>>>()
    }

    /// Serializes the pushed values and appends them to the data file,
    /// leaving the pushed buffer empty.
    fn flush(&mut self) -> Result<()> {
        let pushed_len = self.pushed_len();
        if pushed_len == 0 {
            return Ok(());
        }

        let bytes = {
            let pushed = &mut self.pushed;
            let mut bytes: Vec<u8> = vec![0; pushed.len() * Self::SIZE_OF_T];
            let unsafe_bytes = UnsafeSlice::new(&mut bytes);

            // Every value is copied to its own, non-overlapping byte range.
            mem::take(pushed)
                .into_par_iter()
                .enumerate()
                .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes()));

            bytes
        };

        self.file_write_all(&bytes)?;

        Ok(())
    }

    /// Drops the stored values from `index` onwards by shrinking the data
    /// file, doing nothing when `index` is past the stored data.
    fn truncate_if_needed(&mut self, index: I) -> Result<()> {
        let index = index.to_usize()?;
        if index >= self.stored_len() {
            return Ok(());
        }

        // `index == 0` needs no special case: the new length is then zero
        // bytes, which empties the file.
        let len = index * Self::SIZE_OF_T;
        self.file_set_len(len as u64)?;

        Ok(())
    }

    #[inline]
    fn path(&self) -> &Path {
        self.pathbuf.as_path()
    }

    #[inline]
    fn version(&self) -> Version {
        self.version
    }
}
impl<I, T> Clone for RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    /// Produces a clone sharing the memory map handle of the original; the
    /// clone holds no guard and starts without any pushed value.
    fn clone(&self) -> Self {
        let version = self.version;
        let pathbuf = self.pathbuf.clone();
        // Consider Arc<ArcSwap<Option<Mmap>>> for dataraces when reorg ?
        let mmap = self.mmap.clone();

        Self {
            version,
            pathbuf,
            mmap,
            guard: None,
            pushed: vec![],
            phantom: PhantomData,
        }
    }
}
+46 -36
View File
@@ -18,7 +18,7 @@
* "Transactions" |
* "US Dollars" |
* "Virtual Bytes" |
* "Weight"
* "Weight Units"
* } Unit
*
* @typedef {Object} BaseSeriesBlueprint
@@ -810,23 +810,28 @@ function createPartialOptions(colors) {
name: "Charts",
tree: [
{
name: "btc/usd",
title: "Bitcoin Price in US Dollars",
},
{
name: "usd/sats",
title: "Satoshis Per US Dollar",
unit: "Satoshis",
bottom: [
name: "Price",
tree: [
{
key: "sats-per-dollar",
title: "Satoshis",
color: colors.bitcoin,
name: "btc/usd",
title: "Bitcoin Price in US Dollars",
},
{
name: "usd/sats",
title: "Satoshis Per US Dollar",
unit: "Satoshis",
bottom: [
{
key: "sats-per-dollar",
title: "Satoshis",
color: colors.bitcoin,
},
],
},
],
},
{
name: "Blocks",
name: "Block",
tree: [
createBaseSumTotal({
name: "Count",
@@ -856,28 +861,33 @@ function createPartialOptions(colors) {
],
},
{
name: "Transactions",
name: "Transaction",
tree: [
{
name: "Inputs",
tree: [
createBaseSumTotal({
name: "Count",
title: "Transaction Input Count",
key: "input-count",
}),
],
},
{
name: "Outputs",
tree: [
createBaseSumTotal({
name: "Count",
title: "Transaction Output Count",
key: "output-count",
}),
],
},
createBaseSumTotal({
name: "Count",
title: "Transaction Count",
key: "tx-count",
}),
],
},
{
name: "Input",
tree: [
createBaseSumTotal({
name: "Count",
title: "Transaction Input Count",
key: "input-count",
}),
],
},
{
name: "Output",
tree: [
createBaseSumTotal({
name: "Count",
title: "Transaction Output Count",
key: "output-count",
}),
],
},
],
@@ -934,7 +944,7 @@ function createPartialOptions(colors) {
url: () => "https://status.kibo.money/",
},
{
name: "Crate",
name: "Crates",
url: () => "https://crates.io/crates/brk",
},
],
@@ -1241,7 +1251,7 @@ export function initOptions({
} else if (key.includes("-size")) {
anyPartial.unit = "Megabytes";
} else if (key.includes("-weight")) {
anyPartial.unit = "Weight";
anyPartial.unit = "Weight Units";
} else if (key.includes("-vbytes")) {
anyPartial.unit = "Virtual Bytes";
} else {
@@ -152,10 +152,25 @@ export function createVecIdToIndexes() {
"total-input-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-output-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-size": [Txindex],
"total-tx-count": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-tx-v1": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-tx-v2": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"total-tx-v3": [Dateindex, Height, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-count": [Height],
"tx-count-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-v1": [Height],
"tx-v1-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-v2": [Height],
"tx-v2-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
"tx-v3": [Height],
"tx-v3-sum": [Dateindex, Weekindex, Monthindex, Quarterindex, Yearindex, Decadeindex, Difficultyepoch],
txid: [Txindex],
txoutindex: [Txinindex],
txversion: [Txindex],
value: [Txoutindex],
v1: [Txindex],
v2: [Txindex],
v3: [Txindex],
value: [Txinindex, Txoutindex],
weekindex: [Dateindex, Weekindex],
yearindex: [Monthindex, Yearindex],
}