vec: rework part 3

This commit is contained in:
nym21
2025-04-10 01:11:52 +02:00
parent 41cf0225e3
commit 0dd7e9359e
12 changed files with 84 additions and 42 deletions

View File

@@ -272,9 +272,10 @@ impl RunConfig {
} }
fn read(path: &Path) -> Self { fn read(path: &Path) -> Self {
fs::read_to_string(path).map_or_else(RunConfig::default, |contents| { fs::read_to_string(path).map_or_else(
toml::from_str(&contents).unwrap_or_default() |_| RunConfig::default(),
}) |contents| toml::from_str(&contents).unwrap_or_default(),
)
} }
fn write(&self, path: &Path) -> std::io::Result<()> { fn write(&self, path: &Path) -> std::io::Result<()> {

View File

@@ -2,7 +2,6 @@ use core::error;
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
fmt::Debug, fmt::Debug,
io,
ops::{Add, Sub}, ops::{Add, Sub},
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
@@ -73,7 +72,7 @@ where
} }
if self.vec.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE { if self.vec.pushed_len() * Self::SIZE_OF >= MAX_CACHE_SIZE {
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} else { } else {
Ok(()) Ok(())
} }
@@ -117,9 +116,9 @@ where
self.vec.get(index) self.vec.get(index)
} }
// pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> { pub fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
// self.vec.collect_range(from, to) self.vec.collect_range(from, to)
// } }
#[inline] #[inline]
fn path_computed_version(&self) -> PathBuf { fn path_computed_version(&self) -> PathBuf {
@@ -157,7 +156,7 @@ where
self.forced_push_at(i, v, exit) self.forced_push_at(i, v, exit)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_inverse_more_to_less( pub fn compute_inverse_more_to_less(
@@ -187,7 +186,7 @@ where
} }
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_inverse_less_to_more( pub fn compute_inverse_less_to_more(
@@ -213,7 +212,7 @@ where
.try_for_each(|index| self.forced_push_at(I::from(index), value, exit)) .try_for_each(|index| self.forced_push_at(I::from(index), value, exit))
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_last_index_from_first( pub fn compute_last_index_from_first(
@@ -248,7 +247,7 @@ where
)?; )?;
} }
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_count_from_indexes<T2>( pub fn compute_count_from_indexes<T2>(
@@ -276,7 +275,7 @@ where
self.forced_push_at(i, count.into(), exit) self.forced_push_at(i, count.into(), exit)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_is_first_ordered<A>( pub fn compute_is_first_ordered<A>(
@@ -304,7 +303,7 @@ where
) )
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
pub fn compute_sum_from_indexes<T2>( pub fn compute_sum_from_indexes<T2>(
@@ -330,7 +329,7 @@ where
self.forced_push_at(index, count.into(), exit) self.forced_push_at(index, count.into(), exit)
})?; })?;
Ok(self.safe_flush(exit)?) self.safe_flush(exit)
} }
} }

View File

@@ -4,7 +4,7 @@ use brk_core::{CheckedSub, StoredU32, StoredU64, StoredUsize, Timestamp, Weight}
use brk_exit::Exit; use brk_exit::Exit;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_parser::bitcoin; use brk_parser::bitcoin;
use brk_vec::{Compressed, DynamicVec, Version}; use brk_vec::{Compressed, Version};
use super::{ use super::{
Indexes, Indexes,

View File

@@ -164,7 +164,6 @@ where
let index = self.starting_index(max_from); let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index, ..)| { first_indexes.iter_from(index, |(i, first_index, ..)| {
let first_index = first_index;
let last_index = *last_indexes.get(i).unwrap().unwrap(); let last_index = *last_indexes.get(i).unwrap().unwrap();
if let Some(first) = self.first.as_mut() { if let Some(first) = self.first.as_mut() {
@@ -298,8 +297,7 @@ where
let index = self.starting_index(max_from); let index = self.starting_index(max_from);
first_indexes.iter_from(index, |(i, first_index)| { first_indexes.iter_from(index, |(i, first_index, ..)| {
let first_index = *first_index;
let last_index = *last_indexes.get(i).unwrap().unwrap(); let last_index = *last_indexes.get(i).unwrap().unwrap();
if let Some(first) = self.first.as_mut() { if let Some(first) = self.first.as_mut() {
@@ -321,8 +319,8 @@ where
.unwrap() .unwrap()
.get(last_index) .get(last_index)
.unwrap() .unwrap()
.cloned() .unwrap()
.unwrap(); .into_inner();
last.forced_push_at(index, v, exit)?; last.forced_push_at(index, v, exit)?;
} }
@@ -391,7 +389,11 @@ where
let prev = i.to_usize().unwrap().checked_sub(1).map_or( let prev = i.to_usize().unwrap().checked_sub(1).map_or(
T::from(0_usize), T::from(0_usize),
|prev_i| { |prev_i| {
total_vec.get(I::from(prev_i)).unwrap().unwrap().to_owned() total_vec
.get(I::from(prev_i))
.unwrap()
.unwrap()
.into_inner()
}, },
); );
total_vec.forced_push_at(i, prev + sum, exit)?; total_vec.forced_push_at(i, prev + sum, exit)?;

View File

@@ -7,7 +7,7 @@ use brk_core::{
use brk_exit::Exit; use brk_exit::Exit;
use brk_fetcher::Fetcher; use brk_fetcher::Fetcher;
use brk_indexer::Indexer; use brk_indexer::Indexer;
use brk_vec::{Compressed, DynamicVec, Version}; use brk_vec::{Compressed, Version};
use super::{ use super::{
ComputedVec, Indexes, ComputedVec, Indexes,

View File

@@ -1,7 +1,6 @@
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
fmt::Debug, fmt::Debug,
io,
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };

View File

@@ -94,7 +94,7 @@ impl<'a> Query<'a> {
let mut values = vecs let mut values = vecs
.iter() .iter()
.map(|(_, vec)| -> brk_vec::Result<Vec<serde_json::Value>> { .map(|(_, vec)| -> brk_vec::Result<Vec<serde_json::Value>> {
vec.collect_range_values(from, to) vec.collect_range_serde_json(from, to)
}) })
.collect::<brk_vec::Result<Vec<_>>>()?; .collect::<brk_vec::Result<Vec<_>>>()?;

View File

@@ -23,6 +23,7 @@ pub enum Error {
UnsupportedUnflushedState, UnsupportedUnflushedState,
RangeFromAfterTo(usize, usize), RangeFromAfterTo(usize, usize),
DifferentCompressionMode, DifferentCompressionMode,
ToSerdeJsonValueError(serde_json::Error),
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
@@ -43,6 +44,12 @@ impl<A, B> From<zerocopy::error::SizeError<A, B>> for Error {
} }
} }
impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Self {
Self::ToSerdeJsonValueError(error)
}
}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
@@ -70,6 +77,7 @@ impl fmt::Display for Error {
Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"), Error::RangeFromAfterTo(from, to) => write!(f, "Range, from {from} is after to {to}"),
Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"), Error::DifferentCompressionMode => write!(f, "Different compression mode chosen"),
Error::EmptyVec => write!(f, "The Vec is empty, maybe wait for a bit"), Error::EmptyVec => write!(f, "The Vec is empty, maybe wait for a bit"),
Error::ToSerdeJsonValueError(error) => Debug::fmt(&error, f),
} }
} }
} }

View File

@@ -14,7 +14,7 @@ use std::{
}; };
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, Guard};
use axum::{Json, response::Response}; use axum::response::Response;
pub use enums::*; pub use enums::*;
use memmap2::Mmap; use memmap2::Mmap;
pub use structs::*; pub use structs::*;
@@ -109,7 +109,7 @@ where
} }
} }
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Json<Vec<T>>> { fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
match self { match self {
StoredVec::Raw(v) => v.collect_range(from, to), StoredVec::Raw(v) => v.collect_range(from, to),
StoredVec::Compressed(v) => v.collect_range(from, to), StoredVec::Compressed(v) => v.collect_range(from, to),
@@ -176,6 +176,11 @@ pub trait AnyStoredVec: Send + Sync {
fn len(&self) -> usize; fn len(&self) -> usize;
fn is_empty(&self) -> bool; fn is_empty(&self) -> bool;
fn flush(&mut self) -> Result<()>; fn flush(&mut self) -> Result<()>;
fn collect_range_serde_json(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>>;
fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response>; fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response>;
fn path_vec(&self) -> PathBuf; fn path_vec(&self) -> PathBuf;
} }
@@ -204,6 +209,16 @@ where
GenericVec::flush(self) GenericVec::flush(self)
} }
#[inline]
fn collect_range_serde_json(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Vec<serde_json::Value>> {
GenericVec::collect_range_serde_json(self, from, to)
}
#[inline]
fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response> { fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response> {
GenericVec::collect_range_response(self, from, to) GenericVec::collect_range_response(self, from, to)
} }

View File

@@ -11,6 +11,7 @@ use axum::{
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use memmap2::Mmap; use memmap2::Mmap;
use serde_json::Value;
use crate::{Error, Result, Version}; use crate::{Error, Result, Version};
@@ -159,10 +160,28 @@ where
fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>; fn truncate_if_needed(&mut self, index: Self::I) -> Result<()>;
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Json<Vec<Self::T>>>; fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>>;
#[inline]
fn collect_range_axum_json(
&self,
from: Option<i64>,
to: Option<i64>,
) -> Result<Json<Vec<Self::T>>> {
Ok(Json(self.collect_range(from, to)?))
}
#[inline]
fn collect_range_serde_json(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Value>> {
self.collect_range(from, to)?
.into_iter()
.map(|v| serde_json::to_value(v).map_err(Error::from))
.collect::<Result<Vec<_>>>()
}
#[inline]
fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response> { fn collect_range_response(&self, from: Option<i64>, to: Option<i64>) -> Result<Response> {
Ok(self.collect_range(from, to)?.into_response()) Ok(self.collect_range_axum_json(from, to)?.into_response())
} }
fn path(&self) -> &Path; fn path(&self) -> &Path;
@@ -181,6 +200,7 @@ where
path.join("version") path.join("version")
} }
#[inline]
fn file_name(&self) -> String { fn file_name(&self) -> String {
self.path() self.path()
.file_name() .file_name()

View File

@@ -5,7 +5,6 @@ use std::{
}; };
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, Guard};
use axum::Json;
use memmap2::Mmap; use memmap2::Mmap;
use rayon::prelude::*; use rayon::prelude::*;
use zstd::DEFAULT_COMPRESSION_LEVEL; use zstd::DEFAULT_COMPRESSION_LEVEL;
@@ -232,7 +231,7 @@ where
todo!() todo!()
} }
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Json<Vec<T>>> { fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<Self::T>> {
todo!() todo!()
} }

View File

@@ -7,7 +7,6 @@ use std::{
}; };
use arc_swap::{ArcSwap, Guard}; use arc_swap::{ArcSwap, Guard};
use axum::Json;
use memmap2::Mmap; use memmap2::Mmap;
use rayon::prelude::*; use rayon::prelude::*;
@@ -116,8 +115,8 @@ where
impl<I, T> GenericVec<I, T> for RawVec<I, T> impl<I, T> GenericVec<I, T> for RawVec<I, T>
where where
I: StoredIndex + Send + Sync, I: StoredIndex,
T: StoredType + Send + Sync, T: StoredType,
{ {
fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()> fn iter_from<F>(&mut self, index: I, mut f: F) -> Result<()>
where where
@@ -131,6 +130,8 @@ where
let start = index.to_usize()? * Self::SIZE_OF_T; let start = index.to_usize()? * Self::SIZE_OF_T;
dbg!(self.path());
guard[start..] guard[start..]
.chunks(Self::SIZE_OF_T) .chunks(Self::SIZE_OF_T)
.enumerate() .enumerate()
@@ -188,24 +189,22 @@ where
Ok(()) Ok(())
} }
fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Json<Vec<T>>> { fn collect_range(&self, from: Option<i64>, to: Option<i64>) -> Result<Vec<T>> {
let guard = self.mmap.load(); let guard = self.mmap.load();
let len = guard.len() / Self::SIZE_OF_T; let len = guard.len() / Self::SIZE_OF_T;
if len == 0 { if len == 0 {
return Ok(Json(vec![])); return Ok(vec![]);
} }
let from = from.map_or(0, |i| Self::fix_i64(i, len, true)); let from = from.map_or(0, |i| Self::fix_i64(i, len, true));
let to = to.map_or(len, |i| Self::fix_i64(i, len, false)); let to = to.map_or(len, |i| Self::fix_i64(i, len, false));
Ok(Json( Ok(guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T]
guard[from * Self::SIZE_OF_T..to * Self::SIZE_OF_T] .chunks(Self::SIZE_OF_T)
.chunks(Self::SIZE_OF_T) .map(|chunk| T::try_read_from_bytes(chunk).unwrap())
.map(|chunk| T::try_read_from_bytes(chunk).unwrap()) .collect::<Vec<_>>())
.collect::<Vec<_>>(),
))
} }
#[inline] #[inline]