From 0dd7e9359eb20634397fb848f05e118dde917888 Mon Sep 17 00:00:00 2001 From: nym21 Date: Thu, 10 Apr 2025 01:11:52 +0200 Subject: [PATCH] vec: rework part 3 --- crates/brk_cli/src/run.rs | 7 +++--- crates/brk_computer/src/storage/vecs/base.rs | 23 +++++++++--------- .../brk_computer/src/storage/vecs/blocks.rs | 2 +- .../src/storage/vecs/grouped/builder.rs | 14 ++++++----- .../src/storage/vecs/marketprice.rs | 2 +- crates/brk_indexer/src/vecs/base.rs | 1 - crates/brk_query/src/lib.rs | 2 +- crates/brk_vec/src/enums/error.rs | 8 +++++++ crates/brk_vec/src/lib.rs | 19 +++++++++++++-- crates/brk_vec/src/traits/generic.rs | 24 +++++++++++++++++-- crates/brk_vec/src/variants/compressed.rs | 3 +-- crates/brk_vec/src/variants/raw.rs | 21 ++++++++-------- 12 files changed, 84 insertions(+), 42 deletions(-) diff --git a/crates/brk_cli/src/run.rs b/crates/brk_cli/src/run.rs index d09790366..ab10f1b99 100644 --- a/crates/brk_cli/src/run.rs +++ b/crates/brk_cli/src/run.rs @@ -272,9 +272,10 @@ impl RunConfig { } fn read(path: &Path) -> Self { - fs::read_to_string(path).map_or_else(RunConfig::default, |contents| { - toml::from_str(&contents).unwrap_or_default() - }) + fs::read_to_string(path).map_or_else( + |_| RunConfig::default(), + |contents| toml::from_str(&contents).unwrap_or_default(), + ) } fn write(&self, path: &Path) -> std::io::Result<()> { diff --git a/crates/brk_computer/src/storage/vecs/base.rs b/crates/brk_computer/src/storage/vecs/base.rs index 61b34ace0..933debe7c 100644 --- a/crates/brk_computer/src/storage/vecs/base.rs +++ b/crates/brk_computer/src/storage/vecs/base.rs @@ -2,7 +2,6 @@ use core::error; use std::{ cmp::Ordering, fmt::Debug, - io, ops::{Add, Sub}, path::{Path, PathBuf}, }; @@ -73,7 +72,7 @@ where } if self.vec.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } else { Ok(()) } @@ -117,9 +116,9 @@ where self.vec.get(index) } - // pub fn collect_range(&self, from: Option, to: Option) -> Result> { - // self.vec.collect_range(from, to) - // } + pub fn collect_range(&self, from: Option, to: Option) -> Result> { + self.vec.collect_range(from, to) + } #[inline] fn path_computed_version(&self) -> PathBuf { @@ -157,7 +156,7 @@ where self.forced_push_at(i, v, exit) })?; - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } pub fn compute_inverse_more_to_less( @@ -187,7 +186,7 @@ where } })?; - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } pub fn compute_inverse_less_to_more( @@ -213,7 +212,7 @@ where .try_for_each(|index| self.forced_push_at(I::from(index), value, exit)) })?; - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } pub fn compute_last_index_from_first( @@ -248,7 +247,7 @@ where )?; } - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } pub fn compute_count_from_indexes( @@ -276,7 +275,7 @@ where self.forced_push_at(i, count.into(), exit) })?; - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } pub fn compute_is_first_ordered( @@ -304,7 +303,7 @@ where ) })?; - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } pub fn compute_sum_from_indexes( @@ -330,7 +329,7 @@ where self.forced_push_at(index, count.into(), exit) })?; - Ok(self.safe_flush(exit)?) + self.safe_flush(exit) } } diff --git a/crates/brk_computer/src/storage/vecs/blocks.rs b/crates/brk_computer/src/storage/vecs/blocks.rs index 94c0d97d6..1cca8e0d0 100644 --- a/crates/brk_computer/src/storage/vecs/blocks.rs +++ b/crates/brk_computer/src/storage/vecs/blocks.rs @@ -4,7 +4,7 @@ use brk_core::{CheckedSub, StoredU32, StoredU64, StoredUsize, Timestamp, Weight} use brk_exit::Exit; use brk_indexer::Indexer; use brk_parser::bitcoin; -use brk_vec::{Compressed, DynamicVec, Version}; +use brk_vec::{Compressed, Version}; use super::{ Indexes, diff --git a/crates/brk_computer/src/storage/vecs/grouped/builder.rs b/crates/brk_computer/src/storage/vecs/grouped/builder.rs index 4f615b0d0..cb951347c 100644 --- a/crates/brk_computer/src/storage/vecs/grouped/builder.rs +++ b/crates/brk_computer/src/storage/vecs/grouped/builder.rs @@ -164,7 +164,6 @@ where let index = self.starting_index(max_from); first_indexes.iter_from(index, |(i, first_index, ..)| { - let first_index = first_index; let last_index = *last_indexes.get(i).unwrap().unwrap(); if let Some(first) = self.first.as_mut() { @@ -298,8 +297,7 @@ where let index = self.starting_index(max_from); - first_indexes.iter_from(index, |(i, first_index)| { - let first_index = *first_index; + first_indexes.iter_from(index, |(i, first_index, ..)| { let last_index = *last_indexes.get(i).unwrap().unwrap(); if let Some(first) = self.first.as_mut() { @@ -321,8 +319,8 @@ where .unwrap() .get(last_index) .unwrap() - .cloned() - .unwrap(); + .unwrap() + .into_inner(); last.forced_push_at(index, v, exit)?; } @@ -391,7 +389,11 @@ where let prev = i.to_usize().unwrap().checked_sub(1).map_or( T::from(0_usize), |prev_i| { - total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned() + total_vec + .get(I::from(prev_i)) + .unwrap() + .unwrap() + .into_inner() }, ); total_vec.forced_push_at(i, prev + sum, exit)?; diff --git a/crates/brk_computer/src/storage/vecs/marketprice.rs b/crates/brk_computer/src/storage/vecs/marketprice.rs index 959b5ae6f..c0aa20e33 100644 --- a/crates/brk_computer/src/storage/vecs/marketprice.rs +++ b/crates/brk_computer/src/storage/vecs/marketprice.rs @@ -7,7 +7,7 @@ use brk_core::{ use brk_exit::Exit; use brk_fetcher::Fetcher; use brk_indexer::Indexer; -use brk_vec::{Compressed, DynamicVec, Version}; +use brk_vec::{Compressed, Version}; use super::{ ComputedVec, Indexes, diff --git a/crates/brk_indexer/src/vecs/base.rs b/crates/brk_indexer/src/vecs/base.rs index daf87217f..d340f263c 100644 --- a/crates/brk_indexer/src/vecs/base.rs +++ b/crates/brk_indexer/src/vecs/base.rs @@ -1,7 +1,6 @@ use std::{ cmp::Ordering, fmt::Debug, - io, path::{Path, PathBuf}, }; diff --git a/crates/brk_query/src/lib.rs b/crates/brk_query/src/lib.rs index c3b731712..dc27f83f3 100644 --- a/crates/brk_query/src/lib.rs +++ b/crates/brk_query/src/lib.rs @@ -94,7 +94,7 @@ impl<'a> Query<'a> { let mut values = vecs .iter() .map(|(_, vec)| -> brk_vec::Result> { - vec.collect_range_values(from, to) + vec.collect_range_serde_json(from, to) }) .collect::>>()?; diff --git a/crates/brk_vec/src/enums/error.rs b/crates/brk_vec/src/enums/error.rs index f1a117467..645e5be53 100644 --- a/crates/brk_vec/src/enums/error.rs +++ b/crates/brk_vec/src/enums/error.rs @@ -23,6 +23,7 @@ pub enum Error { UnsupportedUnflushedState, RangeFromAfterTo(usize, usize), DifferentCompressionMode, + ToSerdeJsonValueError(serde_json::Error), } impl From for Error { @@ -43,6 +44,12 @@ impl From> for Error { } } +impl From for Error { + fn from(error: serde_json::Error) -> Self { + Self::ToSerdeJsonValueError(error) + } +} + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -70,6 +77,7 @@ impl fmt::Display for Error { Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"), Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"), Error::EmptyVec => write!(f, "The Vec is empty, maybe wait for a bit"), + Error::ToSerdeJsonValueError(error) => Debug::fmt(&error, f), } } } diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index 13711a91a..565930d57 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -14,7 +14,7 @@ use std::{ }; use arc_swap::{ArcSwap, Guard}; -use axum::{Json, response::Response}; +use axum::response::Response; pub use enums::*; use memmap2::Mmap; pub use structs::*; @@ -109,7 +109,7 @@ where } } - fn collect_range(&self, from: Option, to: Option) -> Result>> { + fn collect_range(&self, from: Option, to: Option) -> Result> { match self { StoredVec::Raw(v) => v.collect_range(from, to), StoredVec::Compressed(v) => v.collect_range(from, to), @@ -176,6 +176,11 @@ pub trait AnyStoredVec: Send + Sync { fn len(&self) -> usize; fn is_empty(&self) -> bool; fn flush(&mut self) -> Result<()>; + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result>; fn collect_range_response(&self, from: Option, to: Option) -> Result; fn path_vec(&self) -> PathBuf; } @@ -204,6 +209,16 @@ where GenericVec::flush(self) } + #[inline] + fn collect_range_serde_json( + &self, + from: Option, + to: Option, + ) -> Result> { + GenericVec::collect_range_serde_json(self, from, to) + } + + #[inline] fn collect_range_response(&self, from: Option, to: Option) -> Result { GenericVec::collect_range_response(self, from, to) } diff --git a/crates/brk_vec/src/traits/generic.rs b/crates/brk_vec/src/traits/generic.rs index 5402da62f..83282dd25 100644 --- a/crates/brk_vec/src/traits/generic.rs +++ b/crates/brk_vec/src/traits/generic.rs @@ -11,6 +11,7 @@ use axum::{ response::{IntoResponse, Response}, }; use memmap2::Mmap; +use serde_json::Value; use crate::{Error, Result, Version}; @@ -159,10 +160,28 @@ where fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>; - fn collect_range(&self, from: Option, to: Option) -> Result>>; + fn collect_range(&self, from: Option, to: Option) -> Result>; + #[inline] + fn collect_range_axum_json( + &self, + from: Option, + to: Option, + ) -> Result>> { + Ok(Json(self.collect_range(from, to)?)) + } + + #[inline] + fn collect_range_serde_json(&self, from: Option, to: Option) -> Result> { + self.collect_range(from, to)? + .into_iter() + .map(|v| serde_json::to_value(v).map_err(Error::from)) + .collect::>>() + } + + #[inline] fn collect_range_response(&self, from: Option, to: Option) -> Result { - Ok(self.collect_range(from, to)?.into_response()) + Ok(self.collect_range_axum_json(from, to)?.into_response()) } fn path(&self) -> &Path; @@ -181,6 +200,7 @@ where path.join("version") } + #[inline] fn file_name(&self) -> String { self.path() .file_name() diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index a2edb5708..f8e62bb43 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -5,7 +5,6 @@ use std::{ }; use arc_swap::{ArcSwap, Guard}; -use axum::Json; use memmap2::Mmap; use rayon::prelude::*; use zstd::DEFAULT_COMPRESSION_LEVEL; @@ -232,7 +231,7 @@ where todo!() } - fn collect_range(&self, from: Option, to: Option) -> Result>> { + fn collect_range(&self, from: Option, to: Option) -> Result> { todo!() } diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index 687f40e9a..92154c1d6 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -7,7 +7,6 @@ use std::{ }; use arc_swap::{ArcSwap, Guard}; -use axum::Json; use memmap2::Mmap; use rayon::prelude::*; @@ -116,8 +115,8 @@ where impl GenericVec for RawVec where - I: StoredIndex + Send + Sync, - T: StoredType + Send + Sync, + I: StoredIndex, + T: StoredType, { fn iter_from(&mut self, index: I, mut f: F) -> Result<()> where @@ -131,6 +130,8 @@ where let start = index.to_usize()? * Self::SIZE_OF_T; + dbg!(self.path()); + guard[start..] .chunks(Self::SIZE_OF_T) .enumerate() @@ -188,24 +189,22 @@ where Ok(()) } - fn collect_range(&self, from: Option, to: Option) -> Result>> { + fn collect_range(&self, from: Option, to: Option) -> Result> { let guard = self.mmap.load(); let len = guard.len() / Self::SIZE_OF_T; if len == 0 { - return Ok(Json(vec![])); + return Ok(vec![]); } let from = from.map_or(0, |i| Self::fix_i64(i, len, true)); let to = to.map_or(len, |i| Self::fix_i64(i, len, false)); - Ok(Json( - guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T] - .chunks(Self::SIZE_OF_T) - .map(|chunk| T::try_read_from_bytes(chunk).unwrap()) - .collect::>(), - )) + Ok(guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T] + .chunks(Self::SIZE_OF_T) + .map(|chunk| T::try_read_from_bytes(chunk).unwrap()) + .collect::>()) } #[inline]