global: snapshot + lock file + better errors

This commit is contained in:
nym21
2025-08-07 17:29:30 +02:00
parent 4740610923
commit 03e3760152
38 changed files with 851 additions and 473 deletions
+4 -4
View File
@@ -47,7 +47,7 @@ on:
jobs:
# Run 'dist plan' (or host) to determine what tasks we need to do
plan:
runs-on: "ubuntu-latest"
runs-on: "ubuntu-22.04"
outputs:
val: ${{ steps.plan.outputs.manifest }}
tag: ${{ !github.event.pull_request && github.ref_name || '' }}
@@ -170,7 +170,7 @@ jobs:
needs:
- plan
- build-local-artifacts
runs-on: "ubuntu-latest"
runs-on: "ubuntu-22.04"
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BUILD_MANIFEST_NAME: target/distrib/global-dist-manifest.json
@@ -221,7 +221,7 @@ jobs:
if: ${{ always() && needs.plan.outputs.publishing == 'true' && (needs.build-global-artifacts.result == 'skipped' || needs.build-global-artifacts.result == 'success') && (needs.build-local-artifacts.result == 'skipped' || needs.build-local-artifacts.result == 'success') }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
runs-on: "ubuntu-latest"
runs-on: "ubuntu-22.04"
outputs:
val: ${{ steps.host.outputs.manifest }}
steps:
@@ -286,7 +286,7 @@ jobs:
# still allowing individual publish jobs to skip themselves (for prereleases).
# "host" however must run to completion, no skipping allowed!
if: ${{ always() && needs.host.result == 'success' }}
runs-on: "ubuntu-latest"
runs-on: "ubuntu-22.04"
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
Generated
+412 -185
View File
File diff suppressed because it is too large Load Diff
+1 -6
View File
@@ -66,12 +66,7 @@ tag-message = "release: v{{version}}"
cargo-dist-version = "0.29.0"
ci = "github"
installers = []
targets = [
"aarch64-apple-darwin",
"aarch64-unknown-linux-gnu",
"x86_64-apple-darwin",
"x86_64-unknown-linux-gnu",
]
targets = ["aarch64-apple-darwin", "aarch64-unknown-linux-gnu", "x86_64-unknown-linux-gnu"]
# [workspace.metadata.dist.github-custom-runners]
# global = "ubuntu-latest"
+1 -1
View File
@@ -11,7 +11,7 @@ build = "../../build.rs"
[dependencies]
log = { workspace = true }
notify = "8.2.0"
brk_rolldown = "0.1.1"
brk_rolldown = "0.1.4"
# brk_rolldown = { path = "../../../rolldown/crates/rolldown"}
sugar_path = "1.2.0"
tokio = { workspace = true }
+6 -8
View File
@@ -52,14 +52,12 @@ pub async fn bundle(websites_path: &Path, source_folder: &str, watch: bool) -> i
let write_index = move || {
let mut contents = fs::read_to_string(&absolute_source_index_path).unwrap();
if let Ok(entry) = fs::read_to_string(absolute_dist_path_clone.join("scripts/entry.js")) {
if let Some(start) = entry.find("main") {
if let Some(end) = entry.find(".js") {
let main_hashed = &entry[start..end];
contents =
contents.replace("/scripts/main.js", &format!("/scripts/{main_hashed}.js"));
}
}
if let Ok(entry) = fs::read_to_string(absolute_dist_path_clone.join("scripts/entry.js"))
&& let Some(start) = entry.find("main")
&& let Some(end) = entry.find(".js")
{
let main_hashed = &entry[start..end];
contents = contents.replace("/scripts/main.js", &format!("/scripts/{main_hashed}.js"));
}
let _ = fs::write(&absolute_dist_index_path, contents);
+1 -1
View File
@@ -19,7 +19,7 @@ brk_logger = { workspace = true }
brk_parser = { workspace = true }
brk_server = { workspace = true }
brk_vecs = { workspace = true }
clap = { version = "4.5.42", features = ["string"] }
clap = { version = "4.5.43", features = ["string"] }
clap_derive = "4.5.41"
color-eyre = "0.6.5"
log = { workspace = true }
+17 -10
View File
@@ -1,4 +1,8 @@
use std::{path::Path, thread};
use std::{
path::Path,
thread::{self, sleep},
time::{Duration, Instant},
};
use brk_computer::Computer;
use brk_error::Result;
@@ -23,25 +27,28 @@ pub fn main() -> Result<()> {
let exit = Exit::new();
exit.set_ctrlc_handler();
// Can't increase main thread's stack programatically, thus we need to use another thread
// Can't increase main thread's stack size, thus we need to use another thread
thread::Builder::new()
.stack_size(256 * 1024 * 1024)
.spawn(move || -> Result<()> {
let outputs_dir = Path::new("../../_outputs");
let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk");
// let outputs_dir = Path::new("../../_outputs");
let parser = Parser::new(bitcoin_dir.join("blocks"), outputs_dir.to_path_buf(), rpc);
let mut indexer = Indexer::forced_import(outputs_dir)?;
let mut indexer = Indexer::forced_import(&outputs_dir)?;
let fetcher = Fetcher::import(None)?;
let mut computer = Computer::forced_import(outputs_dir, &indexer, Some(fetcher))?;
let mut computer = Computer::forced_import(&outputs_dir, &indexer, Some(fetcher))?;
let starting_indexes = indexer.index(&parser, rpc, &exit, true)?;
computer.compute(&indexer, starting_indexes, &exit)?;
Ok(())
loop {
let i = Instant::now();
let starting_indexes = indexer.index(&parser, rpc, &exit, true)?;
computer.compute(&indexer, starting_indexes, &exit)?;
dbg!(i.elapsed());
sleep(Duration::from_secs(10));
}
})?
.join()
.unwrap()
+12 -2
View File
@@ -146,6 +146,18 @@ impl Vecs {
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.compute_(indexer, indexes, starting_indexes, exit)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.timeindexes_to_timestamp.compute_all(
indexer,
@@ -251,8 +263,6 @@ impl Vecs {
exit,
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+24 -2
View File
@@ -308,6 +308,30 @@ impl Vecs {
transactions: &transactions::Vecs,
stateful: &stateful::Vecs,
exit: &Exit,
) -> Result<()> {
self.compute_(
indexer,
indexes,
starting_indexes,
price,
transactions,
stateful,
exit,
)?;
self.file.flush_then_punch()?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
price: Option<&price::Vecs>,
transactions: &transactions::Vecs,
stateful: &stateful::Vecs,
exit: &Exit,
) -> Result<()> {
let circulating_supply = &stateful.utxo_cohorts.all.1.height_to_supply;
@@ -707,8 +731,6 @@ impl Vecs {
)?;
}
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+12 -2
View File
@@ -87,6 +87,18 @@ impl Vecs {
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.compute_(indexer, indexes, starting_indexes, exit)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.constant_0.compute_all(
indexer,
@@ -156,8 +168,6 @@ impl Vecs {
},
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+12 -2
View File
@@ -48,6 +48,18 @@ impl Vecs {
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.compute_(indexer, indexes, starting_indexes, exit)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
let height_to_timestamp = &indexer.vecs.height_to_timestamp;
let index = starting_indexes
@@ -115,8 +127,6 @@ impl Vecs {
})?;
self.dateindex_to_ohlc_in_cents.safe_flush(exit)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+19 -11
View File
@@ -580,6 +580,17 @@ impl Vecs {
indexer: &Indexer,
starting_indexes: brk_indexer::Indexes,
exit: &Exit,
) -> Result<Indexes> {
let idxs = self.compute_(indexer, starting_indexes, exit)?;
self.file.flush_then_punch()?;
Ok(idxs)
}
fn compute_(
&mut self,
indexer: &Indexer,
starting_indexes: brk_indexer::Indexes,
exit: &Exit,
) -> Result<Indexes> {
// ---
// OutputIndex
@@ -749,14 +760,14 @@ impl Vecs {
starting_indexes.height,
&indexer.vecs.height_to_timestamp,
|(h, timestamp, height_to_timestamp_fixed_iter)| {
if prev_timestamp_fixed.is_none() {
if let Some(prev_h) = h.decremented() {
prev_timestamp_fixed.replace(
height_to_timestamp_fixed_iter
.into_iter()
.unwrap_get_inner(prev_h),
);
}
if prev_timestamp_fixed.is_none()
&& let Some(prev_h) = h.decremented()
{
prev_timestamp_fixed.replace(
height_to_timestamp_fixed_iter
.into_iter()
.unwrap_get_inner(prev_h),
);
}
let timestamp_fixed =
prev_timestamp_fixed.map_or(timestamp, |prev_d| prev_d.max(timestamp));
@@ -1117,9 +1128,6 @@ impl Vecs {
exit,
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(Indexes {
indexes: starting_indexes,
dateindex: starting_dateindex,
+21 -2
View File
@@ -1530,6 +1530,27 @@ impl Vecs {
transactions: &mut transactions::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.compute_(
indexer,
indexes,
price,
transactions,
starting_indexes,
exit,
)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
price: &price::Vecs,
transactions: &mut transactions::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.height_to_marketcap.compute_multiply(
starting_indexes.height,
@@ -2163,8 +2184,6 @@ impl Vecs {
},
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+12 -2
View File
@@ -77,6 +77,18 @@ impl Vecs {
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.compute_(indexer, indexes, starting_indexes, exit)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
let mut height_to_difficultyepoch_iter = indexes.height_to_difficultyepoch.into_iter();
self.indexes_to_difficultyepoch.compute_all(
@@ -135,8 +147,6 @@ impl Vecs {
Some(&indexer.vecs.height_to_difficulty),
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+13 -2
View File
@@ -372,6 +372,19 @@ impl Vecs {
starting_indexes: &Indexes,
fetched: &fetched::Vecs,
exit: &Exit,
) -> Result<()> {
self.compute_(indexer, indexes, starting_indexes, fetched, exit)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
fetched: &fetched::Vecs,
exit: &Exit,
) -> Result<()> {
self.height_to_open_in_cents.compute_transform(
starting_indexes.height,
@@ -1267,8 +1280,6 @@ impl Vecs {
})?;
self.decadeindex_to_ohlc_in_sats.safe_flush(exit)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
@@ -1,24 +0,0 @@
use std::collections::BTreeSet;
use brk_structs::TypeIndex;
use derive_deref::{Deref, DerefMut};
use super::ByAddressType;
#[derive(Debug, Deref, DerefMut)]
pub struct AddressTypeToTypeIndexSet(ByAddressType<BTreeSet<TypeIndex>>);
impl Default for AddressTypeToTypeIndexSet {
fn default() -> Self {
Self(ByAddressType {
p2pk65: BTreeSet::default(),
p2pk33: BTreeSet::default(),
p2pkh: BTreeSet::default(),
p2sh: BTreeSet::default(),
p2wpkh: BTreeSet::default(),
p2wsh: BTreeSet::default(),
p2tr: BTreeSet::default(),
p2a: BTreeSet::default(),
})
}
}
+25 -3
View File
@@ -28,7 +28,6 @@ mod address_cohorts;
mod addresstype_to_addresscount;
mod addresstype_to_height_to_addresscount;
mod addresstype_to_indexes_to_addresscount;
mod addresstype_to_typeindex_set;
mod addresstype_to_typeindex_tree;
mod addresstype_to_vec;
mod common;
@@ -520,6 +519,31 @@ impl Vecs {
// Must take ownership as its indexes will be updated for this specific function
starting_indexes: &mut Indexes,
exit: &Exit,
) -> Result<()> {
self.compute_(
indexer,
indexes,
transactions,
price,
market,
starting_indexes,
exit,
)?;
self.file.flush_then_punch()?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
transactions: &transactions::Vecs,
price: Option<&price::Vecs>,
market: &market::Vecs,
// Must take ownership as its indexes will be updated for this specific function
starting_indexes: &mut Indexes,
exit: &Exit,
) -> Result<()> {
let height_to_first_outputindex = &indexer.vecs.height_to_first_outputindex;
let height_to_first_inputindex = &indexer.vecs.height_to_first_inputindex;
@@ -1356,8 +1380,6 @@ impl Vecs {
exit,
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
@@ -53,12 +53,12 @@ impl CohortState {
pub fn increment(&mut self, supply_state: &SupplyState, price: Option<Dollars>) {
self.supply += supply_state;
if supply_state.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
let price = price.unwrap();
realized.increment(supply_state, price);
self.price_to_amount.increment(price, supply_state);
}
if supply_state.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut()
{
let price = price.unwrap();
realized.increment(supply_state, price);
self.price_to_amount.increment(price, supply_state);
}
}
@@ -70,23 +70,23 @@ impl CohortState {
) {
self.supply += supply_state;
if supply_state.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
realized.increment_(realized_cap);
self.price_to_amount.increment(realized_price, supply_state);
}
if supply_state.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut()
{
realized.increment_(realized_cap);
self.price_to_amount.increment(realized_price, supply_state);
}
}
pub fn decrement(&mut self, supply_state: &SupplyState, price: Option<Dollars>) {
self.supply -= supply_state;
if supply_state.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
let price = price.unwrap();
realized.decrement(supply_state, price);
self.price_to_amount.decrement(price, supply_state);
}
if supply_state.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut()
{
let price = price.unwrap();
realized.decrement(supply_state, price);
self.price_to_amount.decrement(price, supply_state);
}
}
@@ -98,11 +98,11 @@ impl CohortState {
) {
self.supply -= supply_state;
if supply_state.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
realized.decrement_(realized_cap);
self.price_to_amount.decrement(realized_price, supply_state);
}
if supply_state.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut()
{
realized.decrement_(realized_cap);
self.price_to_amount.decrement(realized_price, supply_state);
}
}
@@ -124,21 +124,22 @@ impl CohortState {
) {
self.supply += supply_state;
if supply_state.value > Sats::ZERO {
if let Some(realized) = self.realized.as_mut() {
let price = price.unwrap();
realized.receive(supply_state, price);
if supply_state.value > Sats::ZERO
&& let Some(realized) = self.realized.as_mut()
{
let price = price.unwrap();
realized.receive(supply_state, price);
if let Some((price, supply)) = price_to_amount_increment
&& supply.value.is_not_zero()
{
self.price_to_amount.increment(price, supply);
}
if let Some((price, supply)) = price_to_amount_decrement
&& supply.value.is_not_zero()
{
self.price_to_amount.decrement(price, supply);
}
if let Some((price, supply)) = price_to_amount_increment
&& supply.value.is_not_zero()
{
self.price_to_amount.increment(price, supply);
}
if let Some((price, supply)) = price_to_amount_decrement
&& supply.value.is_not_zero()
{
self.price_to_amount.decrement(price, supply);
}
}
}
+15 -4
View File
@@ -291,7 +291,7 @@ impl Vecs {
// )?;
let txindex_to_fee = ComputedVecFrom2::forced_import_or_init_from_2(
computation,
Computation::Eager,
&file,
"fee",
version + VERSION + Version::ZERO,
@@ -314,7 +314,7 @@ impl Vecs {
)?;
let txindex_to_feerate = ComputedVecFrom2::forced_import_or_init_from_2(
computation,
Computation::Eager,
&file,
"feerate",
version + VERSION + Version::ZERO,
@@ -719,6 +719,19 @@ impl Vecs {
starting_indexes: &Indexes,
price: Option<&price::Vecs>,
exit: &Exit,
) -> Result<()> {
self.compute_(indexer, indexes, starting_indexes, price, exit)?;
self.file.flush_then_punch()?;
Ok(())
}
fn compute_(
&mut self,
indexer: &Indexer,
indexes: &indexes::Vecs,
starting_indexes: &Indexes,
price: Option<&price::Vecs>,
exit: &Exit,
) -> Result<()> {
self.indexes_to_tx_count.compute_all(
indexer,
@@ -1214,8 +1227,6 @@ impl Vecs {
},
)?;
self.file.flush()?;
self.file.punch_holes()?;
Ok(())
}
+9 -9
View File
@@ -1,5 +1,5 @@
use std::{
fmt::{self, Debug},
fmt::{self, Debug, Display},
io, result, time,
};
@@ -88,14 +88,14 @@ impl<A, B> From<zerocopy::error::SizeError<A, B>> for Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::IO(error) => Debug::fmt(&error, f),
Error::Minreq(error) => Debug::fmt(&error, f),
Error::SerdeJson(error) => Debug::fmt(&error, f),
Error::Vecs(error) => Debug::fmt(&error, f),
Error::BitcoinRPC(error) => Debug::fmt(&error, f),
Error::SystemTimeError(error) => Debug::fmt(&error, f),
Error::Jiff(error) => Debug::fmt(&error, f),
Error::Fjall(error) => Debug::fmt(&error, f),
Error::IO(error) => Display::fmt(&error, f),
Error::Minreq(error) => Display::fmt(&error, f),
Error::SerdeJson(error) => Display::fmt(&error, f),
Error::Vecs(error) => Display::fmt(&error, f),
Error::BitcoinRPC(error) => Display::fmt(&error, f),
Error::SystemTimeError(error) => Display::fmt(&error, f),
Error::Jiff(error) => Display::fmt(&error, f),
Error::Fjall(error) => Display::fmt(&error, f),
Error::ZeroCopyError => write!(f, "ZeroCopy error"),
Error::WrongLength => write!(f, "Wrong length"),
+5 -5
View File
@@ -72,11 +72,11 @@ impl<'a> Interface<'a> {
})
.map(|mut id| {
let mut res = self.vecs.id_to_index_to_vec.get(id.as_str());
if res.is_none() {
if let Ok(index) = Index::try_from(id.as_str()) {
id = index.possible_values().last().unwrap().to_string();
res = self.vecs.id_to_index_to_vec.get(id.as_str())
}
if res.is_none()
&& let Ok(index) = Index::try_from(id.as_str())
{
id = index.possible_values().last().unwrap().to_string();
res = self.vecs.id_to_index_to_vec.get(id.as_str())
}
(id, res)
})
+9 -9
View File
@@ -92,16 +92,16 @@ impl ParamsOpt {
}
pub fn to(&self) -> Option<i64> {
if self.to.is_none() {
if let Some(c) = self.count {
let c = c as i64;
if let Some(f) = self.from {
if f >= 0 || f.abs() > c {
return Some(f + c);
}
} else {
return Some(c);
if self.to.is_none()
&& let Some(c) = self.count
{
let c = c as i64;
if let Some(f) = self.from {
if f >= 0 || f.abs() > c {
return Some(f + c);
}
} else {
return Some(c);
}
}
self.to
+11 -11
View File
@@ -5,7 +5,7 @@ use std::{
path::{Path, PathBuf},
};
use crate::{blk_recap::BlkRecap, BlkIndexToBlkPath, Height};
use crate::{BlkIndexToBlkPath, Height, blk_recap::BlkRecap};
#[derive(Debug)]
pub struct BlkIndexToBlkRecap {
@@ -48,12 +48,12 @@ impl BlkIndexToBlkRecap {
.iter()
.for_each(|(blk_index, blk_path)| {
unprocessed_keys.remove(blk_index);
if let Some(blk_recap) = self.tree.get(blk_index) {
if blk_recap.has_different_modified_time(blk_path) {
self.tree.remove(blk_index).unwrap();
if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) {
min_removed_blk_index.replace(*blk_index);
}
if let Some(blk_recap) = self.tree.get(blk_index)
&& blk_recap.has_different_modified_time(blk_path)
{
self.tree.remove(blk_index).unwrap();
if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) {
min_removed_blk_index.replace(*blk_index);
}
}
});
@@ -85,10 +85,10 @@ impl BlkIndexToBlkRecap {
start = Some(*found.0);
}
if let Some(min_removed) = min_removed {
if start.is_none_or(|start| start > min_removed) {
start = Some(min_removed);
}
if let Some(min_removed) = min_removed
&& start.is_none_or(|start| start > min_removed)
{
start = Some(min_removed);
}
// Should only be none if asking for a too high start
+1 -7
View File
@@ -97,17 +97,11 @@ fn req_to_response_res(
s.into_response()
}
Output::Json(v) => {
let json = match v {
brk_interface::Value::Single(v) => serde_json::to_vec(&v)?,
brk_interface::Value::List(v) => serde_json::to_vec(&v)?,
brk_interface::Value::Matrix(v) => serde_json::to_vec(&v)?,
};
let json = serde_json::to_vec(&v)?;
if let GuardResult::Guard(g) = guard_res {
g.insert(json.clone().into())
.map_err(|_| Error::QuickCacheError)?;
}
json.into_response()
}
}
+11 -11
View File
@@ -102,23 +102,23 @@ impl HeaderMapExtended for HeaderMap {
.to_zoned(TimeZone::UTC)
.datetime();
if let Some(if_modified_since) = self.get_if_modified_since() {
if if_modified_since == date {
return Ok((ModifiedState::NotModifiedSince, date));
}
if let Some(if_modified_since) = self.get_if_modified_since()
&& if_modified_since == date
{
return Ok((ModifiedState::NotModifiedSince, date));
}
Ok((ModifiedState::ModifiedSince, date))
}
fn get_if_modified_since(&self) -> Option<DateTime> {
if let Some(modified_since) = self.get(IF_MODIFIED_SINCE) {
if let Ok(modified_since) = modified_since.to_str() {
return strtime::parse(MODIFIED_SINCE_FORMAT, modified_since)
.unwrap()
.to_datetime()
.ok();
}
if let Some(modified_since) = self.get(IF_MODIFIED_SINCE)
&& let Ok(modified_since) = modified_since.to_str()
{
return strtime::parse(MODIFIED_SINCE_FORMAT, modified_since)
.unwrap()
.to_datetime()
.ok();
}
None
+1 -1
View File
@@ -88,7 +88,7 @@ where
})
}
pub fn get(&self, key: &'a K) -> Result<Option<Cow<V>>> {
pub fn get(&'_ self, key: &'a K) -> Result<Option<Cow<'_, V>>> {
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(slice) = self
+2 -1
View File
@@ -83,7 +83,7 @@ fn main() -> Result<()> {
);
}
file.write_all_to_region_at(region1_i.into(), &[1], 18)?;
file.write_all_to_region_at(region1_i.into(), &[0, 0, 0, 0, 0, 1], 13)?;
{
let region = file.get_region(region1_i.into())?;
@@ -118,6 +118,7 @@ fn main() -> Result<()> {
println!("Disk usage - post sync: {}", file.disk_usage());
file.truncate_region(region1_i.into(), 10)?;
file.punch_holes()?;
{
let region = file.get_region(region1_i.into())?;
+17 -6
View File
@@ -1,6 +1,6 @@
use std::{
fmt::{self, Debug},
io, result, time,
fmt::{self, Debug, Display},
fs, io, result, time,
};
use crate::Version;
@@ -10,6 +10,7 @@ pub type Result<T, E = Error> = result::Result<T, E>;
#[derive(Debug)]
pub enum Error {
IO(io::Error),
TryLockError(fs::TryLockError),
SerdeJson(serde_json::Error),
SystemTimeError(time::SystemTimeError),
PCO(pco::errors::PcoError),
@@ -38,6 +39,12 @@ impl From<io::Error> for Error {
}
}
impl From<fs::TryLockError> for Error {
fn from(value: fs::TryLockError) -> Self {
Self::TryLockError(value)
}
}
impl From<pco::errors::PcoError> for Error {
fn from(value: pco::errors::PcoError) -> Self {
Self::PCO(value)
@@ -65,10 +72,14 @@ impl From<serde_json::Error> for Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::IO(error) => Debug::fmt(&error, f),
Error::PCO(error) => Debug::fmt(&error, f),
Error::SystemTimeError(error) => Debug::fmt(&error, f),
Error::SerdeJson(error) => Debug::fmt(&error, f),
Error::IO(error) => Display::fmt(&error, f),
Error::TryLockError(_) => write!(
f,
"Couldn't lock file. It must be already opened by another process."
),
Error::PCO(error) => Display::fmt(&error, f),
Error::SystemTimeError(error) => Display::fmt(&error, f),
Error::SerdeJson(error) => Display::fmt(&error, f),
Error::ZeroCopyError => write!(f, "ZeroCopy error"),
Error::WrongEndian => write!(f, "Wrong endian"),
+64 -72
View File
@@ -6,7 +6,6 @@ use std::{
};
use libc::off_t;
use log::info;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{RwLock, RwLockReadGuard};
@@ -52,6 +51,7 @@ impl File {
.write(true)
.truncate(false)
.open(Self::data_path_(path))?;
file.try_lock()?;
let mmap = Self::create_mmap(&file)?;
@@ -201,7 +201,7 @@ impl File {
// Write to reserved space if possible
if new_len <= reserved {
// println!(
// info!(
// "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})"
// );
@@ -233,7 +233,7 @@ impl File {
// If is last continue writing
if layout.is_last_anything(region_index) {
// println!("{region_index} Append to file at {write_start}");
// info!("{region_index} Append to file at {write_start}");
self.set_min_len(start + new_reserved)?;
let mut region = region_lock.write();
@@ -256,7 +256,7 @@ impl File {
.get_hole(hole_start)
.is_some_and(|gap| gap >= added_reserve)
{
// println!("Expand {region_index} to hole");
// info!("Expand {region_index} to hole");
layout.remove_or_compress_hole(hole_start, added_reserve);
let mut region = region_lock.write();
@@ -275,7 +275,7 @@ impl File {
// Find hole big enough to move the region
if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
// println!("Move {region_index} to hole at {hole_start}");
// info!("Move {region_index} to hole at {hole_start}");
layout.remove_or_compress_hole(hole_start, new_reserved);
drop(layout);
@@ -301,7 +301,7 @@ impl File {
let new_start = layout.len(&regions);
// Write at the end
// println!(
// info!(
// "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
// start + reserved,
// new_start + new_reserved
@@ -349,37 +349,20 @@ impl File {
///
/// From relative to start
///
/// DO NOT call any `write_all` function right after as there could be a race condition if hole punching happens
/// Non destructive
///
pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> {
let Some(region) = self.regions.read().get_region(identifier.clone()) else {
return Err(Error::Str("Unknown region"));
};
let mut region_ = region.write();
let start = region_.start();
let len = region_.len();
let reserved = region_.reserved();
// dbg!(from, start);
if from == len {
return Ok(());
} else if from > len {
return Err(Error::Str("Truncating further than length"));
}
region_.set_len(from);
let end = start + reserved;
let start = Self::ceil_number_to_page_size_multiple(start + from);
if start > end {
unreachable!("Should not be possible");
} else if start < end {
let length = end - start;
// if length > PAGE_SIZE {
self.punch_hole(start, length)?;
// }
}
Ok(())
}
@@ -400,8 +383,6 @@ impl File {
layout.remove_region(index, &region_)?;
self.punch_hole(region_.start(), region_.reserved())?;
drop(region_);
Ok(Some(region))
@@ -456,56 +437,71 @@ impl File {
regions.flush()
}
/// Do not write, right after as there might be a race condition
pub fn punch_holes(&self) -> Result<()> {
let file = self.file.write();
let mmap = self.mmap.read();
let regions = self.regions.read();
let layout = self.layout.read();
Self::punch_holes_(&file, &mmap, &regions, &layout)
pub fn flush_then_punch(&self) -> Result<()> {
self.flush()?;
self.punch_holes()
}
fn punch_holes_(
file: &fs::File,
mmap: &MmapMut,
regions: &Regions,
layout: &Layout,
) -> Result<()> {
regions
pub fn punch_holes(&self) -> Result<()> {
let file = self.file.write();
let mut mmap = self.mmap.write();
let regions = self.regions.read();
let layout = self.layout.read();
let mut punched = regions
.index_to_region()
.iter()
.par_iter()
.flatten()
.try_for_each(|region_lock| -> Result<()> {
.map(|region_lock| -> Result<usize> {
let region = region_lock.read();
let start = region.start();
let rstart = region.start();
let len = region.len();
let reserved = region.reserved();
let ceil_len = Self::ceil_number_to_page_size_multiple(len);
assert!(len <= ceil_len);
let reserved = region.reserved();
if ceil_len > reserved {
panic!()
} else if ceil_len < reserved {
let start = start + ceil_len;
let start = rstart + ceil_len;
let hole = reserved - ceil_len;
if Self::approx_has_punchable_data(mmap, start, hole) {
info!("Punching a hole of {hole} bytes at {start}...");
Self::punch_hole_(file, start, hole)?;
if Self::approx_has_punchable_data(&mmap, start, hole) {
// info!(
// "dbg: {:?}",
// (region, rstart, len, ceil_len, reserved, start, hole)
// );
// info!("Punching a hole of {hole} bytes at {start}...");
Self::punch_hole(&file, start, hole)?;
return Ok(1);
}
}
Ok(())
})?;
Ok(0)
})
.sum::<Result<usize>>()?;
layout
punched += layout
.start_to_hole()
.par_iter()
.try_for_each(|(&start, &hole)| -> Result<()> {
if Self::approx_has_punchable_data(mmap, start, hole) {
info!("Punching a hole of {hole} bytes at {start}...");
Self::punch_hole_(file, start, hole)
.map(|(&start, &hole)| -> Result<usize> {
if Self::approx_has_punchable_data(&mmap, start, hole) {
// info!("dbg: {:?}", (start, hole));
// info!("Punching a hole of {hole} bytes at {start}...");
Self::punch_hole(&file, start, hole)?;
Ok(1)
} else {
Ok(())
Ok(0)
}
})
.sum::<Result<usize>>()?;
if punched > 0 {
// info!("Remaping post hole punching...");
unsafe {
libc::fsync(file.as_raw_fd());
}
*mmap = Self::create_mmap(&file)?;
}
Ok(())
}
fn approx_has_punchable_data(mmap: &MmapMut, start: u64, len: u64) -> bool {
@@ -517,7 +513,15 @@ impl File {
let check = |start, end| {
assert!(start >= min);
assert!(end < max);
mmap[start] != 0 || mmap[end] != 0
let start_is_some = mmap[start] != 0;
// if start_is_some {
// info!("mmap[start = {}] = {}", start, mmap[start])
// }
let end_is_some = mmap[end] != 0;
// if end_is_some {
// info!("mmap[end = {}] = {}", end, mmap[end])
// }
start_is_some || end_is_some
};
// Check first page (first and last byte)
@@ -551,20 +555,8 @@ impl File {
false
}
#[inline]
fn punch_hole(&self, start: u64, length: u64) -> Result<()> {
let file = self.file.write();
Self::punch_hole_(&file, start, length)
}
#[inline]
fn punch_hole_(file: &fs::File, start: u64, length: u64) -> Result<()> {
// println!("Punching {length} bytes hole at {start}");
Self::punch_hole_impl(file, start, length)
}
#[cfg(target_os = "macos")]
fn punch_hole_impl(file: &fs::File, start: u64, length: u64) -> Result<()> {
fn punch_hole(file: &fs::File, start: u64, length: u64) -> Result<()> {
let fpunchhole = FPunchhole {
fp_flags: 0,
reserved: 0,
@@ -589,7 +581,7 @@ impl File {
}
#[cfg(target_os = "linux")]
fn punch_hole_impl(file: &fs::File, start: u64, length: u64) -> Result<()> {
fn punch_hole(file: &fs::File, start: u64, length: u64) -> Result<()> {
let result = unsafe {
libc::fallocate(
file.as_raw_fd(),
@@ -608,7 +600,7 @@ impl File {
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn punch_hole_impl(_file: &fs::File, _start: u64, _length: u64) -> Result<()> {
fn punch_hole(_file: &fs::File, _start: u64, _length: u64) -> Result<()> {
Err(Error::String(
"Hole punching not supported on this platform".to_string(),
))
+1 -1
View File
@@ -68,7 +68,7 @@ impl Region {
}
pub trait RegionReader {
fn create_reader(self, file: &File) -> Reader;
fn create_reader(self, file: &'_ File) -> Reader<'_>;
}
impl<'a> RegionReader for RwLockReadGuard<'a, Region> {
+1
View File
@@ -43,6 +43,7 @@ impl Regions {
.write(true)
.truncate(false)
.open(path.join("index_to_region"))?;
index_to_region_file.try_lock()?;
let index_to_region_mmap = unsafe { MmapMut::map_mut(&index_to_region_file)? };
+6 -4
View File
@@ -25,7 +25,7 @@ where
///
/// You'll want to drop the reader before mutable ops
///
fn create_reader(&self) -> Reader {
fn create_reader(&'_ self) -> Reader<'_> {
self.create_static_reader()
}
@@ -55,11 +55,11 @@ where
fn read_(&self, index: usize, reader: &Reader) -> Result<T>;
#[inline]
fn get_or_read(&self, index: I, reader: &Reader) -> Result<Option<Cow<T>>> {
fn get_or_read(&'_ self, index: I, reader: &Reader) -> Result<Option<Cow<'_, T>>> {
self.get_or_read_(index.to_usize()?, reader)
}
#[inline]
fn get_or_read_(&self, index: usize, reader: &Reader) -> Result<Option<Cow<T>>> {
fn get_or_read_(&'_ self, index: usize, reader: &Reader) -> Result<Option<Cow<'_, T>>> {
let stored_len = self.stored_len();
let holes = self.holes();
@@ -136,7 +136,9 @@ where
}
}
if self.pushed_len() * Self::SIZE_OF_T >= MAX_CACHE_SIZE {
let pushed_bytes = self.pushed_len() * Self::SIZE_OF_T;
if pushed_bytes >= MAX_CACHE_SIZE {
// info!("pushed_bytes ({pushed_bytes}) >= MAX_CACHE_SIZE ({MAX_CACHE_SIZE})");
self.safe_flush(exit)?;
}
+6 -1
View File
@@ -1,10 +1,14 @@
use crate::{AnyVec, Exit, File, Result, Stamp, variants::Header};
use parking_lot::RwLock;
use crate::{AnyVec, Exit, File, Result, Stamp, file::Region, variants::Header};
pub trait AnyStoredVec: AnyVec {
fn file(&self) -> &File;
fn region_index(&self) -> usize;
fn region(&self) -> &RwLock<Region>;
fn header(&self) -> &Header;
fn mut_header(&mut self) -> &mut Header;
@@ -13,6 +17,7 @@ pub trait AnyStoredVec: AnyVec {
#[inline]
fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
// info!("safe flush {}", self.name());
let _lock = exit.lock();
self.flush()
}
+16 -5
View File
@@ -12,6 +12,7 @@ use crate::{
AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, AsInnerSlice, BaseVecIterator,
BoxedVecIterator, CollectableVec, Error, File, Format, FromInnerSlice, GenericStoredVec,
HEADER_OFFSET, Header, RawVec, Reader, Result, StoredCompressed, StoredIndex, Version,
file::Region,
};
mod page;
@@ -58,7 +59,7 @@ where
}
pub fn import(file: &Arc<File>, name: &str, version: Version) -> Result<Self> {
let inner = RawVec::import_(file, name, version, Format::Compressed)?;
let inner = RawVec::any_import(file, name, version, Format::Compressed)?;
let pages = Pages::import(file, &Self::pages_region_name_(name))?;
@@ -191,6 +192,10 @@ where
self.inner.file()
}
fn region(&self) -> &RwLock<Region> {
self.inner.region()
}
fn region_index(&self) -> usize {
self.inner.region_index()
}
@@ -212,6 +217,7 @@ where
self.inner.write_header_if_needed()?;
if self.is_pushed_empty() {
// info!("Nothing to push {}", self.region_index());
return Ok(());
}
@@ -232,7 +238,12 @@ where
values = Self::decode_page_(stored_len, last_page_index, &reader, &pages)
.inspect_err(|_| {
dbg!(last_page_index, &pages);
dbg!((
last_page_index,
&pages,
self.region_index(),
&self.region().read()
));
})
.unwrap();
@@ -271,8 +282,10 @@ where
let file = self.file();
if let Some(truncate_at) = truncate_at {
// info!("truncate_write_all_to_region {}", self.region_index());
file.truncate_write_all_to_region(self.region_index().into(), truncate_at, &buf)?;
} else {
// info!("write_all_to_region {}", self.region_index());
file.write_all_to_region(self.region_index().into(), &buf)?;
}
@@ -379,9 +392,7 @@ where
pages.flush(file)?;
file.truncate_region(self.region_index().into(), from)?;
file.write_all_to_region(self.region_index().into(), &buf)?;
file.truncate_write_all_to_region(self.region_index().into(), from, &buf)?;
Ok(())
}
+20 -14
View File
@@ -9,11 +9,13 @@ use std::{
};
use log::info;
use parking_lot::RwLock;
use crate::{
AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BoxedVecIterator, CheckedSub,
CollectableVec, Exit, File, Format, GenericStoredVec, Reader, Result, StoredCompressed,
StoredIndex, StoredRaw, StoredVec, StoredVecIterator, VecIterator, Version, variants::Header,
StoredIndex, StoredRaw, StoredVec, StoredVecIterator, VecIterator, Version, file::Region,
variants::Header,
};
#[derive(Debug, Clone)]
@@ -960,19 +962,6 @@ where
}
}
impl<'a, I, T> IntoIterator for &'a EagerVec<I, T>
where
I: StoredIndex,
T: StoredCompressed,
{
type Item = (I, Cow<'a, T>);
type IntoIter = StoredVecIterator<'a, I, T>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl<I, T> AnyVec for EagerVec<I, T>
where
I: StoredIndex,
@@ -1017,6 +1006,10 @@ where
self.0.region_index()
}
fn region(&self) -> &RwLock<Region> {
self.0.region()
}
fn header(&self) -> &Header {
self.0.header()
}
@@ -1082,6 +1075,19 @@ where
}
}
impl<'a, I, T> IntoIterator for &'a EagerVec<I, T>
where
I: StoredIndex,
T: StoredCompressed,
{
type Item = (I, Cow<'a, T>);
type IntoIter = StoredVecIterator<'a, I, T>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl<I, T> AnyIterableVec<I, T> for EagerVec<I, T>
where
I: StoredIndex,
+13 -4
View File
@@ -61,10 +61,15 @@ where
}
pub fn import(file: &Arc<File>, name: &str, version: Version) -> Result<Self> {
Self::import_(file, name, version, Format::Raw)
Self::any_import(file, name, version, Format::Raw)
}
pub fn import_(file: &Arc<File>, name: &str, version: Version, format: Format) -> Result<Self> {
pub fn any_import(
file: &Arc<File>,
name: &str,
version: Version,
format: Format,
) -> Result<Self> {
let (region_index, region) = file.create_region_if_needed(&Self::vec_region_name_(name))?;
let region_len = region.read().len() as usize;
@@ -239,13 +244,13 @@ where
let (holes_index, _) = self
.file
.create_region_if_needed(&self.holes_region_name())?;
self.file.truncate_region(holes_index.into(), 0)?;
let bytes = self
.holes
.iter()
.flat_map(|i| i.to_ne_bytes())
.collect::<Vec<_>>();
self.file.write_all_to_region(holes_index.into(), &bytes)?;
self.file
.truncate_write_all_to_region(holes_index.into(), 0, &bytes)?;
} else if had_holes {
self.has_stored_holes = false;
let _ = self.file.remove_region(self.holes_region_name().into());
@@ -262,6 +267,10 @@ where
fn region_index(&self) -> usize {
self.region_index
}
fn region(&self) -> &RwLock<Region> {
&self.region
}
}
impl<I, T> GenericStoredVec<I, T> for RawVec<I, T>
@@ -46,10 +46,10 @@ impl Format {
}
pub fn validate(&self, path: &Path) -> Result<()> {
if let Ok(prev_compressed) = Format::try_from(path) {
if prev_compressed != *self {
return Err(Error::DifferentCompressionMode);
}
if let Ok(prev_compressed) = Format::try_from(path)
&& prev_compressed != *self
{
return Err(Error::DifferentCompressionMode);
}
Ok(())
+10 -1
View File
@@ -7,7 +7,7 @@ use std::{
use crate::{
AnyCollectableVec, AnyIterableVec, AnyStoredVec, AnyVec, BaseVecIterator, BoxedVecIterator,
CollectableVec, File, GenericStoredVec, Header, Result, StoredCompressed, StoredIndex, Version,
file::Reader,
file::{Reader, Region},
};
use super::{CompressedVec, CompressedVecIterator, RawVec, RawVecIterator};
@@ -15,6 +15,7 @@ use super::{CompressedVec, CompressedVecIterator, RawVec, RawVecIterator};
mod format;
pub use format::*;
use parking_lot::RwLock;
#[derive(Debug, Clone)]
pub enum StoredVec<I, T> {
@@ -105,6 +106,14 @@ where
}
}
#[inline]
fn region(&self) -> &RwLock<Region> {
match self {
StoredVec::Raw(v) => v.region(),
StoredVec::Compressed(v) => v.region(),
}
}
#[inline]
fn header(&self) -> &Header {
match self {