indexer: speed

This commit is contained in:
nym21
2025-10-19 21:18:15 +02:00
parent 71078b5bdd
commit e9f6295014
27 changed files with 248 additions and 288 deletions
Generated
+6 -8
View File
@@ -613,7 +613,7 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
"fjall 2.11.2",
"fjall 2.11.3",
"fjall 3.0.0-pre.1",
"jiff",
"minreq",
@@ -658,7 +658,7 @@ dependencies = [
"brk_store",
"brk_structs",
"brk_traversable",
"fjall 2.11.2",
"fjall 2.11.3",
"fjall 3.0.0-pre.1",
"log",
"rayon",
@@ -816,7 +816,7 @@ dependencies = [
"brk_structs",
"byteview 0.6.1",
"byteview 0.8.0",
"fjall 2.11.2",
"fjall 2.11.3",
"fjall 3.0.0-pre.1",
"log",
"parking_lot 0.12.5",
@@ -1579,9 +1579,7 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]]
name = "fjall"
version = "2.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc"
version = "2.11.3"
dependencies = [
"byteorder",
"byteview 0.6.1",
@@ -3216,9 +3214,9 @@ checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
[[package]]
name = "pco"
version = "0.4.6"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab068b64f2c6f074cbdcafc80ebd83a27da92a3848deba2fabc21eba6691fc65"
checksum = "daea1197f2969fab4d5c6620eade5d46c98a8e9b04ad2bc3725fc5dfc4eb8a49"
dependencies = [
"better_io",
"dtype_dispatch",
+5 -18
View File
@@ -23,23 +23,9 @@ panic = "abort"
strip = true
overflow-checks = false
[profile.profiling]
inherits = "release"
debug = true
[profile.dist]
inherits = "release"
[profile.clippy]
inherits = "dev"
lto = "off"
codegen-units = 256
opt-level = 0
debug = false
overflow-checks = false
panic = "abort"
debug-assertions = false
[workspace.dependencies]
aide = { version = "0.15.2", features = ["axum-json", "axum-query"], package = "brk-aide" }
allocative = { version = "0.3.4", features = ["parking_lot"] }
@@ -67,10 +53,11 @@ brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_d
byteview = "=0.6.1"
# byteview = "~0.8.0"
derive_deref = "1.1.1"
fjall_v2 = { version = "2.11.2", package = "fjall" }
# fjall_v3 = { version = "=3.0.0-pre.0", package = "fjall" }
fjall_v3 = { path = "../fjall", package = "fjall" }
# fjall_v3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "bb15057500dce3115d7644d268b9deeaa895b431", package = "fjall" }
fjall2 = { path = "../fjall2", package = "fjall" }
# fjall2 = { version = "2.11.2", package = "fjall" }
# fjall3 = { version = "=3.0.0-pre.0", package = "fjall" }
fjall3 = { path = "../fjall3", package = "fjall" }
# fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "bb15057500dce3115d7644d268b9deeaa895b431", package = "fjall" }
jiff = "0.2.15"
log = "0.4.28"
minreq = { version = "2.14.1", features = ["https", "serde_json"] }
+20 -21
View File
@@ -5,8 +5,8 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_structs::{
Bitcoin, CheckedSub, DateIndex, DecadeIndex, DifficultyEpoch, Dollars, FeeRate, HalvingEpoch,
Height, TxInIndex, MonthIndex, ONE_DAY_IN_SEC_F64, TxOutIndex, QuarterIndex, Sats,
SemesterIndex, StoredBool, StoredF32, StoredF64, StoredU32, StoredU64, Timestamp, TxIndex,
Height, MonthIndex, ONE_DAY_IN_SEC_F64, QuarterIndex, Sats, SemesterIndex, StoredBool,
StoredF32, StoredF64, StoredU32, StoredU64, Timestamp, TxInIndex, TxIndex, TxOutIndex,
TxVersion, Version, WeekIndex, Weight, YearIndex,
};
use brk_traversable::Traversable;
@@ -98,8 +98,7 @@ pub struct Vecs {
pub indexes_to_tx_vsize: ComputedVecsFromTxindex<StoredU64>,
pub indexes_to_tx_weight: ComputedVecsFromTxindex<Weight>,
pub indexes_to_unknownoutput_count: ComputedVecsFromHeight<StoredU64>,
pub txinindex_to_value:
LazyVecFrom2<TxInIndex, Sats, TxInIndex, TxOutIndex, TxOutIndex, Sats>,
pub txinindex_to_value: LazyVecFrom2<TxInIndex, Sats, TxInIndex, TxOutIndex, TxOutIndex, Sats>,
pub indexes_to_input_count: ComputedVecsFromTxindex<StoredU64>,
pub txindex_to_is_coinbase: LazyVecFrom2<TxIndex, StoredBool, TxIndex, Height, Height, TxIndex>,
pub indexes_to_output_count: ComputedVecsFromTxindex<StoredU64>,
@@ -165,21 +164,21 @@ impl Vecs {
indexer.vecs.txinindex_to_txoutindex.boxed_clone(),
indexer.vecs.txoutindex_to_value.boxed_clone(),
|index: TxInIndex, txinindex_to_txoutindex_iter, txoutindex_to_value_iter| {
txinindex_to_txoutindex_iter
.next_at(index.unwrap_to_usize())
.map(|(txinindex, txoutindex)| {
txinindex_to_txoutindex_iter.next_at(index.to_usize()).map(
|(txinindex, txoutindex)| {
let txoutindex = txoutindex.into_owned();
if txoutindex == TxOutIndex::COINBASE {
Sats::ZERO
} else if let Some((_, value)) =
txoutindex_to_value_iter.next_at(txoutindex.unwrap_to_usize())
txoutindex_to_value_iter.next_at(txoutindex.to_usize())
{
value.into_owned()
} else {
dbg!(txinindex, txoutindex);
panic!()
}
})
},
)
},
);
@@ -189,7 +188,7 @@ impl Vecs {
indexer.vecs.txindex_to_base_size.boxed_clone(),
indexer.vecs.txindex_to_total_size.boxed_clone(),
|index: TxIndex, txindex_to_base_size_iter, txindex_to_total_size_iter| {
let index = index.unwrap_to_usize();
let index = index.to_usize();
txindex_to_base_size_iter
.next_at(index)
.map(|(_, base_size)| {
@@ -213,7 +212,7 @@ impl Vecs {
version + Version::ZERO,
txindex_to_weight.boxed_clone(),
|index: TxIndex, iter| {
let index = index.unwrap_to_usize();
let index = index.to_usize();
iter.next_at(index).map(|(_, weight)| {
StoredU64::from(
bitcoin::Weight::from(weight.into_owned()).to_vbytes_ceil() as usize
@@ -229,11 +228,11 @@ impl Vecs {
indexer.vecs.height_to_first_txindex.boxed_clone(),
|index: TxIndex, txindex_to_height_iter, height_to_first_txindex_iter| {
txindex_to_height_iter
.next_at(index.unwrap_to_usize())
.next_at(index.to_usize())
.map(|(_, height)| {
let height = height.into_owned();
let txindex = height_to_first_txindex_iter
.next_at(height.unwrap_to_usize())
.next_at(height.to_usize())
.unwrap()
.1
.into_owned();
@@ -252,7 +251,7 @@ impl Vecs {
txindex_to_first_txinindex_iter,
txindex_to_input_count_iter,
txinindex_to_value_iter| {
let txindex = index.unwrap_to_usize();
let txindex = index.to_usize();
txindex_to_first_txinindex_iter
.next_at(txindex)
.map(|(_, first_index)| {
@@ -299,7 +298,7 @@ impl Vecs {
txindex_to_first_txoutindex_iter,
txindex_to_output_count_iter,
txoutindex_to_value_iter| {
let txindex = index.unwrap_to_usize();
let txindex = index.to_usize();
txindex_to_first_txoutindex_iter
.next_at(txindex)
.map(|(_, first_index)| {
@@ -1539,14 +1538,15 @@ impl Vecs {
|(height, txindex, ..)| {
let first_txoutindex = txindex_to_first_txoutindex_iter
.unwrap_get_inner(txindex)
.unwrap_to_usize();
.to_usize();
let output_count = txindex_to_output_count_iter.unwrap_get_inner(txindex);
let mut sats = Sats::ZERO;
(first_txoutindex..first_txoutindex + usize::from(output_count))
.for_each(|txoutindex| {
(first_txoutindex..first_txoutindex + usize::from(output_count)).for_each(
|txoutindex| {
sats += txoutindex_to_value_iter
.unwrap_get_inner(TxOutIndex::from(txoutindex));
});
},
);
(height, sats)
},
exit,
@@ -1624,8 +1624,7 @@ impl Vecs {
self.indexes_to_subsidy.sats.height.as_ref().unwrap(),
|(height, subsidy, ..)| {
let halving = HalvingEpoch::from(height);
let expected =
Sats::FIFTY_BTC / 2_usize.pow(halving.unwrap_to_usize() as u32);
let expected = Sats::FIFTY_BTC / 2_usize.pow(halving.to_usize() as u32);
(height, expected.checked_sub(subsidy).unwrap())
},
exit,
+2 -3
View File
@@ -105,7 +105,7 @@ impl Vecs {
.try_for_each(|(i, v)| -> Result<()> {
let d = v.into_owned();
if prev.is_none() {
let i = i.unwrap_to_usize();
let i = i.to_usize();
prev.replace(if i > 0 {
self.dateindex_to_price_ohlc_in_cents
.into_iter()
@@ -115,8 +115,7 @@ impl Vecs {
});
}
let ohlc = if i.unwrap_to_usize() + 100
>= self.dateindex_to_price_ohlc_in_cents.len()
let ohlc = if i.to_usize() + 100 >= self.dateindex_to_price_ohlc_in_cents.len()
&& let Ok(mut ohlc) = self.fetcher.get_date(d)
{
let prev_open = *prev.as_ref().unwrap().close;
@@ -68,7 +68,7 @@ where
.map_or_else(|| source.as_ref().unwrap().clone(), |v| v.clone()),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
source
@@ -95,7 +95,7 @@ where
),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
source
@@ -114,7 +114,7 @@ where
.map_or_else(|| source.as_ref().unwrap().clone(), |v| v.clone()),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
S1I::inclusive_range_from(i, source.len())
@@ -133,7 +133,7 @@ where
.map_or_else(|| source.as_ref().unwrap().clone(), |v| v.clone()),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
S1I::inclusive_range_from(i, source.len())
@@ -152,7 +152,7 @@ where
.map_or_else(|| source.as_ref().unwrap().clone(), |v| v.clone()),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
let vec = S1I::inclusive_range_from(i, source.len())
@@ -182,7 +182,7 @@ where
.map_or_else(|| source.as_ref().unwrap().clone(), |v| v.clone()),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
let vec = S1I::inclusive_range_from(i, source.len())
@@ -204,7 +204,7 @@ where
source_extra.cumulative.as_ref().unwrap().boxed_clone(),
len_source.clone(),
|i: I, source, len_source| {
if i.unwrap_to_usize() >= len_source.len() {
if i.to_usize() >= len_source.len() {
return None;
}
source
@@ -249,7 +249,7 @@ impl ComputedVecsFromTxindex<Bitcoin> {
let starting_index = self.height.starting_index(starting_indexes.height);
(starting_index.unwrap_to_usize()..indexer.vecs.height_to_weight.len())
(starting_index.to_usize()..indexer.vecs.height_to_weight.len())
.map(Height::from)
.try_for_each(|height| -> Result<()> {
if let Some(first) = self.height.first.as_mut() {
@@ -430,7 +430,7 @@ impl ComputedVecsFromTxindex<Dollars> {
let mut close_iter = price.chainindexes_to_price_close.height.into_iter();
(starting_index.unwrap_to_usize()..indexer.vecs.height_to_weight.len())
(starting_index.to_usize()..indexer.vecs.height_to_weight.len())
.map(Height::from)
.try_for_each(|height| -> Result<()> {
let price = *close_iter.unwrap_get_inner(height);
@@ -384,8 +384,8 @@ impl ComputedRatioVecsFromDateIndex {
.min(starting_indexes.dateindex);
let mut sorted = self.ratio.dateindex.as_ref().unwrap().collect_range(
Some(min_ratio_date.unwrap_to_usize()),
Some(starting_dateindex.unwrap_to_usize()),
Some(min_ratio_date.to_usize()),
Some(starting_dateindex.to_usize()),
);
sorted.sort_unstable();
@@ -477,8 +477,8 @@ impl ComputedStandardDeviationVecsFromDateIndex {
.min(starting_indexes.dateindex);
let mut sorted = source.collect_range(
Some(min_date.unwrap_to_usize()),
Some(starting_dateindex.unwrap_to_usize()),
Some(min_date.to_usize()),
Some(starting_dateindex.to_usize()),
);
sorted.sort_unstable();
@@ -551,8 +551,7 @@ impl ComputedStandardDeviationVecsFromDateIndex {
let avg = sma_iter.unwrap_get_inner(index);
let population =
index.checked_sub(min_date).unwrap().unwrap_to_usize() as f32 + 1.0;
let population = index.checked_sub(min_date).unwrap().to_usize() as f32 + 1.0;
let sd = StoredF32::from(
(sorted.iter().map(|v| (**v - *avg).powi(2)).sum::<f32>() / population)
@@ -58,7 +58,7 @@ impl ComputedValueVecsFromTxindex {
version + VERSION,
source_vec.map_or_else(|| sats.txindex.as_ref().unwrap().boxed_clone(), |s| s),
|txindex: TxIndex, iter| {
iter.next_at(txindex.unwrap_to_usize()).map(|(_, value)| {
iter.next_at(txindex.to_usize()).map(|(_, value)| {
let sats = value.into_owned();
Bitcoin::from(sats)
})
@@ -85,7 +85,7 @@ impl ComputedValueVecsFromTxindex {
txindex_to_btc_iter,
txindex_to_height_iter,
height_to_price_close_iter| {
let txindex = txindex.unwrap_to_usize();
let txindex = txindex.to_usize();
txindex_to_btc_iter.next_at(txindex).and_then(|(_, value)| {
let btc = value.into_owned();
txindex_to_height_iter
@@ -93,7 +93,7 @@ impl ComputedValueVecsFromTxindex {
.and_then(|(_, value)| {
let height = value.into_owned();
height_to_price_close_iter
.next_at(height.unwrap_to_usize())
.next_at(height.to_usize())
.map(|(_, close)| *close.into_owned() * btc)
})
})
+7 -7
View File
@@ -4,11 +4,11 @@ use brk_error::Result;
use brk_indexer::Indexer;
use brk_structs::{
Date, DateIndex, DecadeIndex, DifficultyEpoch, EmptyOutputIndex, HalvingEpoch, Height,
TxInIndex, MonthIndex, OpReturnIndex, TxOutIndex, P2AAddressIndex, P2ABytes, P2MSOutputIndex,
P2PK33AddressIndex, P2PK33Bytes, P2PK65AddressIndex, P2PK65Bytes, P2PKHAddressIndex,
P2PKHBytes, P2SHAddressIndex, P2SHBytes, P2TRAddressIndex, P2TRBytes, P2WPKHAddressIndex,
P2WPKHBytes, P2WSHAddressIndex, P2WSHBytes, QuarterIndex, Sats, SemesterIndex, StoredU64,
Timestamp, TxIndex, Txid, UnknownOutputIndex, Version, WeekIndex, YearIndex,
MonthIndex, OpReturnIndex, P2AAddressIndex, P2ABytes, P2MSOutputIndex, P2PK33AddressIndex,
P2PK33Bytes, P2PK65AddressIndex, P2PK65Bytes, P2PKHAddressIndex, P2PKHBytes, P2SHAddressIndex,
P2SHBytes, P2TRAddressIndex, P2TRBytes, P2WPKHAddressIndex, P2WPKHBytes, P2WSHAddressIndex,
P2WSHBytes, QuarterIndex, Sats, SemesterIndex, StoredU64, Timestamp, TxInIndex, TxIndex,
TxOutIndex, Txid, UnknownOutputIndex, Version, WeekIndex, YearIndex,
};
use brk_traversable::Traversable;
use vecdb::{
@@ -130,7 +130,7 @@ impl Vecs {
indexer.vecs.txindex_to_first_txinindex.boxed_clone(),
indexer.vecs.txinindex_to_txoutindex.boxed_clone(),
|index: TxIndex, txindex_to_first_txinindex_iter, txinindex_to_txoutindex_iter| {
let txindex = index.unwrap_to_usize();
let txindex = index.to_usize();
txindex_to_first_txinindex_iter
.next_at(txindex)
.map(|(_, start)| {
@@ -150,7 +150,7 @@ impl Vecs {
indexer.vecs.txindex_to_first_txoutindex.boxed_clone(),
indexer.vecs.txoutindex_to_value.boxed_clone(),
|index: TxIndex, txindex_to_first_txoutindex_iter, txoutindex_to_value_iter| {
let txindex = index.unwrap_to_usize();
let txindex = index.to_usize();
txindex_to_first_txoutindex_iter
.next_at(txindex)
.map(|(_, start)| {
+2 -2
View File
@@ -1589,7 +1589,7 @@ impl Vecs {
self.indexes_to_price_ath.dateindex.as_ref().unwrap(),
|(i, ath, slf)| {
if prev.is_none() {
let i = i.unwrap_to_usize();
let i = i.to_usize();
prev.replace(if i > 0 {
slf.into_iter().unwrap_get_inner_(i - 1)
} else {
@@ -1620,7 +1620,7 @@ impl Vecs {
.unwrap(),
|(i, days, slf)| {
if prev.is_none() {
let i = i.unwrap_to_usize();
let i = i.to_usize();
prev.replace(if i > 0 {
slf.into_iter().unwrap_get_inner_(i - 1)
} else {
+4 -6
View File
@@ -4,7 +4,7 @@ use allocative::Allocative;
use brk_error::Result;
use brk_indexer::Indexer;
use brk_store::AnyStore;
use brk_structs::{Address, AddressBytes, Height, TxOutIndex, OutputType, PoolId, Pools, pools};
use brk_structs::{Address, AddressBytes, Height, OutputType, PoolId, Pools, TxOutIndex, pools};
use brk_traversable::Traversable;
use rayon::prelude::*;
use vecdb::{
@@ -122,8 +122,7 @@ impl Vecs {
)?;
let mut height_to_first_txindex_iter = indexer.vecs.height_to_first_txindex.iter();
let mut txindex_to_first_txoutindex_iter =
indexer.vecs.txindex_to_first_txoutindex.iter();
let mut txindex_to_first_txoutindex_iter = indexer.vecs.txindex_to_first_txoutindex.iter();
let mut txindex_to_output_count_iter = indexes.txindex_to_output_count.iter();
let mut txoutindex_to_outputtype_iter = indexer.vecs.txoutindex_to_outputtype.iter();
let mut txoutindex_to_typeindex_iter = indexer.vecs.txoutindex_to_typeindex.iter();
@@ -147,7 +146,7 @@ impl Vecs {
let min = starting_indexes
.height
.unwrap_to_usize()
.to_usize()
.min(self.height_to_pool.len());
indexer
@@ -163,8 +162,7 @@ impl Vecs {
let pool = (*txoutindex..(*txoutindex + *outputcount))
.map(TxOutIndex::from)
.find_map(|txoutindex| {
let outputtype =
txoutindex_to_outputtype_iter.unwrap_get_inner(txoutindex);
let outputtype = txoutindex_to_outputtype_iter.unwrap_get_inner(txoutindex);
let typeindex = txoutindex_to_typeindex_iter.unwrap_get_inner(txoutindex);
match outputtype {
+1 -1
View File
@@ -352,7 +352,7 @@ impl Vecs {
self.indexes_to_blocks_mined.dateindex.unwrap_cumulative(),
|(i, sum, cumulative, slf)| {
if prev.is_none() {
let i = i.unwrap_to_usize();
let i = i.to_usize();
prev.replace(if i > 0 {
slf.into_iter().unwrap_get_inner_(i - 1)
} else {
+12 -12
View File
@@ -11,10 +11,10 @@ use brk_grouper::{ByAddressType, ByAnyAddress, Filtered};
use brk_indexer::Indexer;
use brk_structs::{
AnyAddressDataIndexEnum, AnyAddressIndex, CheckedSub, DateIndex, Dollars, EmptyAddressData,
EmptyAddressIndex, Height, TxInIndex, LoadedAddressData, LoadedAddressIndex, TxOutIndex,
OutputType, P2AAddressIndex, P2PK33AddressIndex, P2PK65AddressIndex, P2PKHAddressIndex,
P2SHAddressIndex, P2TRAddressIndex, P2WPKHAddressIndex, P2WSHAddressIndex, Sats, StoredU64,
Timestamp, TypeIndex, Version,
EmptyAddressIndex, Height, LoadedAddressData, LoadedAddressIndex, OutputType, P2AAddressIndex,
P2PK33AddressIndex, P2PK65AddressIndex, P2PKHAddressIndex, P2SHAddressIndex, P2TRAddressIndex,
P2WPKHAddressIndex, P2WSHAddressIndex, Sats, StoredU64, Timestamp, TxInIndex, TxOutIndex,
TypeIndex, Version,
};
use brk_traversable::Traversable;
use log::info;
@@ -201,7 +201,7 @@ impl Vecs {
.unwrap()
.boxed_clone(),
|height: Height, iter| {
iter.next_at(height.unwrap_to_usize()).map(|(_, value)| {
iter.next_at(height.to_usize()).map(|(_, value)| {
let d: Dollars = value.into_owned();
d
})
@@ -874,7 +874,7 @@ impl Vecs {
.unwrap_or_default(),
);
(height.unwrap_to_usize()..height_to_date_fixed.len())
(height.to_usize()..height_to_date_fixed.len())
.map(Height::from)
.try_for_each(|_height| -> Result<()> {
height = _height;
@@ -895,10 +895,10 @@ impl Vecs {
.map(|i| *i.unwrap_get_inner(height));
let first_txoutindex = height_to_first_txoutindex_iter
.unwrap_get_inner(height)
.unwrap_to_usize();
.to_usize();
let first_txinindex = height_to_first_txinindex_iter
.unwrap_get_inner(height)
.unwrap_to_usize();
.to_usize();
let output_count = height_to_output_count_iter.unwrap_get_inner(height);
let input_count = height_to_input_count_iter.unwrap_get_inner(height);
@@ -1268,7 +1268,7 @@ impl Vecs {
)
})?;
if height != last_height && height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
if height != last_height && height != Height::ZERO && height.to_usize() % 10_000 == 0 {
let _lock = exit.lock();
addresstypeindex_to_anyaddressindex_reader_opt.take();
@@ -1967,13 +1967,13 @@ impl HeightToAddressTypeToVec<(TypeIndex, Sats)> {
self.0.into_iter().try_for_each(|(prev_height, v)| {
let prev_price = height_to_price_close_vec
.as_ref()
.map(|v| **v.get(prev_height.unwrap_to_usize()).unwrap());
.map(|v| **v.get(prev_height.to_usize()).unwrap());
let prev_timestamp = *height_to_timestamp_fixed_vec
.get(prev_height.unwrap_to_usize())
.get(prev_height.to_usize())
.unwrap();
let blocks_old = height.unwrap_to_usize() - prev_height.unwrap_to_usize();
let blocks_old = height.to_usize() - prev_height.to_usize();
let days_old = timestamp.difference_in_days_between_float(prev_timestamp);
@@ -1514,13 +1514,13 @@ impl Vecs {
let chain_state_len = chain_state.len();
height_to_sent.into_iter().for_each(|(height, sent)| {
chain_state[height.unwrap_to_usize()].supply -= &sent.spendable_supply;
chain_state[height.to_usize()].supply -= &sent.spendable_supply;
let block_state = chain_state.get(height.unwrap_to_usize()).unwrap();
let block_state = chain_state.get(height.to_usize()).unwrap();
let prev_price = block_state.price;
let blocks_old = chain_state_len - 1 - height.unwrap_to_usize();
let blocks_old = chain_state_len - 1 - height.to_usize();
let days_old = last_timestamp.difference_in_days_between(block_state.timestamp);
let days_old_float =
+6 -7
View File
@@ -42,7 +42,7 @@ impl ComputeDCAStackViaLen for EagerVec<DateIndex, Sats> {
let index = max_from.min(DateIndex::from(self.len()));
closes.iter_at(index).try_for_each(|(i, closes)| {
let price = *closes.into_owned();
let i_usize = i.unwrap_to_usize();
let i_usize = i.to_usize();
if prev.is_none() {
if i_usize == 0 {
prev.replace(Sats::ZERO);
@@ -92,7 +92,7 @@ impl ComputeDCAStackViaLen for EagerVec<DateIndex, Sats> {
let index = max_from.min(DateIndex::from(self.len()));
closes.iter_at(index).try_for_each(|(i, closes)| {
let price = *closes.into_owned();
let i_usize = i.unwrap_to_usize();
let i_usize = i.to_usize();
if prev.is_none() {
if i_usize == 0 {
prev.replace(Sats::ZERO);
@@ -157,8 +157,8 @@ impl ComputeDCAAveragePriceViaLen for EagerVec<DateIndex, Dollars> {
if i > first_price_date {
avg_price = DCA_AMOUNT
* len
.min(i.unwrap_to_usize() + 1)
.min(i.checked_sub(first_price_date).unwrap().unwrap_to_usize() + 1)
.min(i.to_usize() + 1)
.min(i.checked_sub(first_price_date).unwrap().to_usize() + 1)
/ Bitcoin::from(stack);
}
self.forced_push_at(i, avg_price, exit)
@@ -182,14 +182,13 @@ impl ComputeDCAAveragePriceViaLen for EagerVec<DateIndex, Dollars> {
let index = max_from.min(DateIndex::from(self.len()));
let from_usize = from.unwrap_to_usize();
let from_usize = from.to_usize();
stacks.iter_at(index).try_for_each(|(i, stack)| {
let stack = stack.into_owned();
let mut avg_price = Dollars::from(f64::NAN);
if i >= from {
avg_price =
DCA_AMOUNT * (i.unwrap_to_usize() + 1 - from_usize) / Bitcoin::from(stack);
avg_price = DCA_AMOUNT * (i.to_usize() + 1 - from_usize) / Bitcoin::from(stack);
}
self.forced_push_at(i, avg_price, exit)
})?;
+2 -2
View File
@@ -12,8 +12,8 @@ build = "build.rs"
[dependencies]
bitcoin = { workspace = true }
bitcoincore-rpc = { workspace = true }
fjall_v2 = { workspace = true }
fjall_v3 = { workspace = true }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
sonic-rs = { workspace = true }
+6 -6
View File
@@ -12,8 +12,8 @@ pub enum Error {
IO(io::Error),
BitcoinRPC(bitcoincore_rpc::Error),
Jiff(jiff::Error),
FjallV2(fjall_v2::Error),
FjallV3(fjall_v3::Error),
FjallV2(fjall2::Error),
FjallV3(fjall3::Error),
VecDB(vecdb::Error),
SeqDB(vecdb::SeqDBError),
Minreq(minreq::Error),
@@ -107,14 +107,14 @@ impl From<jiff::Error> for Error {
}
}
impl From<fjall_v3::Error> for Error {
fn from(value: fjall_v3::Error) -> Self {
impl From<fjall3::Error> for Error {
fn from(value: fjall3::Error) -> Self {
Self::FjallV3(value)
}
}
impl From<fjall_v2::Error> for Error {
fn from(value: fjall_v2::Error) -> Self {
impl From<fjall2::Error> for Error {
fn from(value: fjall2::Error) -> Self {
Self::FjallV2(value)
}
}
+2 -2
View File
@@ -19,8 +19,8 @@ brk_reader = { workspace = true }
brk_store = { workspace = true }
brk_structs = { workspace = true }
brk_traversable = { workspace = true }
fjall_v2 = { workspace = true }
fjall_v3 = { workspace = true }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
rustc-hash = { workspace = true }
+82 -87
View File
@@ -1,8 +1,8 @@
#![doc = include_str!("../README.md")]
use std::{collections::BTreeMap, path::Path, str::FromStr, thread, time::Instant};
use std::{path::Path, str::FromStr, thread, time::Instant};
use bitcoin::{Transaction, TxIn, TxOut};
use bitcoin::{TxIn, TxOut};
use brk_error::{Error, Result};
use brk_store::AnyStore;
use brk_structs::{
@@ -170,7 +170,7 @@ impl Indexer {
.push_if_needed(height, block.weight().into())?;
// let i = Instant::now();
let txid_prefix_and_txid_and_block_txindex_and_prev_txindex = block
let txs = block
.txdata
.par_iter()
.enumerate()
@@ -189,13 +189,21 @@ impl Indexer {
};
Ok((
idxs.txindex + TxIndex::from(index),
tx,
txid,
txid_prefix,
(tx, txid, TxIndex::from(index), prev_txindex_opt),
prev_txindex_opt,
))
})
.collect::<Result<FxHashMap<_, _>>>()?;
.collect::<Result<Vec<_>>>()?;
// println!("txid_prefix_and_txid_and_... = : {:?}", i.elapsed());
let txid_prefix_to_txindex = txs
.iter()
.map(|(txindex, _, _, prefix, _)| (*prefix, txindex))
.collect::<FxHashMap<_, _>>();
// let i = Instant::now();
let inputs = block
.txdata
@@ -238,15 +246,12 @@ impl Indexer {
} else {
let vout = Vout::from(outpoint.vout);
let block_txindex = txid_prefix_and_txid_and_block_txindex_and_prev_txindex
let prev_txindex = **txid_prefix_to_txindex
.get(&txid_prefix)
.ok_or(Error::Str("txid should be in same block")).inspect_err(|_| {
dbg!(&txid_prefix_and_txid_and_block_txindex_and_prev_txindex);
dbg!(&txs);
// panic!();
})?
.2;
let prev_txindex = idxs.txindex + block_txindex;
})?;
let outpoint = OutPoint::new(prev_txindex, vout);
@@ -255,7 +260,7 @@ impl Indexer {
let vout = Vout::from(outpoint.vout);
let txoutindex = vecs.txindex_to_first_txoutindex.get_or_read(prev_txindex, &readers.txindex_to_first_txoutindex)?
let txoutindex = vecs.txindex_to_first_txoutindex.get_pushed_or_read(prev_txindex, &readers.txindex_to_first_txoutindex)?
.ok_or(Error::Str("Expect txoutindex to not be none"))
.inspect_err(|_| {
dbg!(outpoint.txid, prev_txindex, vout);
@@ -264,7 +269,7 @@ impl Indexer {
let outpoint = OutPoint::new(prev_txindex, vout);
let outputtype = vecs.txoutindex_to_outputtype.get_or_read(txoutindex, &readers.txoutindex_to_outputtype)?
let outputtype = vecs.txoutindex_to_outputtype.get_pushed_or_read(txoutindex, &readers.txoutindex_to_outputtype)?
.ok_or(Error::Str("Expect outputtype to not be none"))?.into_owned();
let mut tuple = (
@@ -279,7 +284,7 @@ impl Indexer {
if outputtype.is_address() {
let typeindex = vecs
.txoutindex_to_typeindex
.get_or_read(txoutindex, &readers.txoutindex_to_typeindex)?
.get_pushed_or_read(txoutindex, &readers.txoutindex_to_typeindex)?
.ok_or(Error::Str("Expect typeindex to not be none"))?.into_owned();
tuple.3 = Some((outputtype, typeindex));
}
@@ -369,56 +374,56 @@ impl Indexer {
let prev_addressbytes_opt = match outputtype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2pk65addressindex_to_p2pk65bytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PK33 => vecs
.p2pk33addressindex_to_p2pk33bytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2pk33addressindex_to_p2pk33bytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PKH => vecs
.p2pkhaddressindex_to_p2pkhbytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2pkhaddressindex_to_p2pkhbytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2SH => vecs
.p2shaddressindex_to_p2shbytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2shaddressindex_to_p2shbytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WPKH => vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2wpkhaddressindex_to_p2wpkhbytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WSH => vecs
.p2wshaddressindex_to_p2wshbytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2wshaddressindex_to_p2wshbytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2TR => vecs
.p2traddressindex_to_p2trbytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2traddressindex_to_p2trbytes,
)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2A => vecs
.p2aaddressindex_to_p2abytes
.get_or_read(
.get_pushed_or_read(
typeindex.into(),
&readers.p2aaddressindex_to_p2abytes,
)?
@@ -696,78 +701,67 @@ impl Indexer {
})?;
// println!("txinindex_and_txindata.into_iter(): {:?}", i.elapsed());
let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter();
// let i = Instant::now();
let txindex_to_tx_and_txid = txid_prefix_and_txid_and_block_txindex_and_prev_txindex
.into_iter()
.map(
|(txid_prefix, (tx, txid, index, prev_txindex_opt))| -> Result<(TxIndex, (&Transaction, Txid))> {
let txindex = idxs.txindex + index;
if check_collisions {
let mut txindex_to_txid_iter = vecs.txindex_to_txid.into_iter();
txs.iter()
.try_for_each(|(txindex, _, _, _, prev_txindex_opt)| -> Result<()> {
let Some(prev_txindex) = prev_txindex_opt else {
return Ok(());
};
let tuple = (txindex, (tx, txid));
match prev_txindex_opt {
None => {
stores
.txidprefix_to_txindex
.insert_if_needed(txid_prefix, txindex, height);
}
Some(prev_txindex) => {
// In case if we start at an already parsed height
if txindex == prev_txindex {
return Ok(tuple);
}
if !check_collisions {
return Ok(tuple);
}
let len = vecs.txindex_to_txid.len();
// Ok if `get` is not par as should happen only twice
let prev_txid = txindex_to_txid_iter
.get(prev_txindex)
.ok_or(Error::Str("To have txid for txindex"))
.inspect_err(|_| {
dbg!(txindex, len);
})?;
let prev_txid = prev_txid.as_ref();
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let only_known_dup_txids = [
bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)
.unwrap()
.into(),
bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)
.unwrap()
.into(),
];
let is_dup = only_known_dup_txids.contains(prev_txid);
if !is_dup {
dbg!(height, txindex, prev_txid, prev_txindex);
return Err(Error::Str("Expect none"));
}
}
// In case if we start at an already parsed height
if txindex == prev_txindex {
return Ok(());
}
Ok(tuple)
},
).collect::<Result<BTreeMap<_, _>>>()?;
let len = vecs.txindex_to_txid.len();
// Ok if `get` is not par as should happen only twice
let prev_txid = txindex_to_txid_iter
.get(*prev_txindex)
.ok_or(Error::Str("To have txid for txindex"))
.inspect_err(|_| {
dbg!(txindex, len);
})?;
let prev_txid = prev_txid.as_ref();
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let only_known_dup_txids = [
bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)
.unwrap()
.into(),
bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)
.unwrap()
.into(),
];
let is_dup = only_known_dup_txids.contains(prev_txid);
if !is_dup {
dbg!(height, txindex, prev_txid, prev_txindex);
return Err(Error::Str("Expect none"));
}
Ok(())
})?;
}
// println!("txindex_to_tx_and_txid = : {:?}", i.elapsed());
drop(txindex_to_txid_iter);
// let i = Instant::now();
txindex_to_tx_and_txid.into_iter().try_for_each(
|(txindex, (tx, txid))| -> Result<()> {
txs.into_iter().try_for_each(
|(txindex, tx, txid, txid_prefix, prev_txindex_opt)| -> Result<()> {
if prev_txindex_opt.is_none() {
stores
.txidprefix_to_txindex
.insert_if_needed(txid_prefix, txindex, height);
}
vecs.txindex_to_txversion
.push_if_needed(txindex, tx.version.into())?;
vecs.txindex_to_txid.push_if_needed(txindex, txid)?;
@@ -779,6 +773,7 @@ impl Indexer {
.push_if_needed(txindex, tx.total_size().into())?;
vecs.txindex_to_is_explicitly_rbf
.push_if_needed(txindex, StoredBool::from(tx.is_explicitly_rbf()))?;
Ok(())
},
)?;
+2 -2
View File
@@ -7,7 +7,7 @@ use brk_structs::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
};
use fjall_v2::{PersistMode, TransactionalKeyspace};
use fjall2::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
@@ -196,7 +196,7 @@ impl Stores {
self.blockhashprefix_to_height.remove(blockhashprefix);
});
(starting_indexes.height.unwrap_to_usize()..vecs.height_to_blockhash.len())
(starting_indexes.height.to_usize()..vecs.height_to_blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
+7 -5
View File
@@ -7,7 +7,7 @@ use brk_structs::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
};
use fjall_v3::{PersistMode, TxDatabase};
use fjall3::{PersistMode, TxDatabase};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
@@ -136,9 +136,11 @@ impl Stores {
)
.try_for_each(|store| store.commit(height))?;
self.database
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
Ok(())
// self.database
// .persist(PersistMode::SyncAll)
// .map_err(|e| e.into())
}
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
@@ -196,7 +198,7 @@ impl Stores {
self.blockhashprefix_to_height.remove(blockhashprefix);
});
(starting_indexes.height.unwrap_to_usize()..vecs.height_to_blockhash.len())
(starting_indexes.height.to_usize()..vecs.height_to_blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
+4 -4
View File
@@ -14,10 +14,10 @@ build = "build.rs"
[dependencies]
brk_error = { workspace = true }
brk_structs = { workspace = true }
byteview_v6 = { version = "=0.6.1", package = "byteview" }
byteview_v8 = { version = "~0.8.0", package = "byteview" }
fjall_v2 = { workspace = true }
fjall_v3 = { workspace = true }
byteview6 = { version = "=0.6.1", package = "byteview" }
byteview8 = { version = "~0.8.0", package = "byteview" }
fjall2 = { workspace = true }
fjall3 = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
rustc-hash = { workspace = true }
+4 -1
View File
@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_structs::Version;
use fjall_v2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle};
use super::Height;
@@ -75,12 +75,15 @@ impl StoreMeta {
path.join("version")
}
#[inline]
pub fn height(&self) -> Option<Height> {
self.height
}
#[inline]
pub fn needs(&self, height: Height) -> bool {
self.height.is_none_or(|self_height| height > self_height)
}
#[inline]
pub fn has(&self, height: Height) -> bool {
!self.needs(height)
}
+28 -34
View File
@@ -1,21 +1,21 @@
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path, sync::Arc};
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, mem, path::Path, sync::Arc};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview_v6::ByteView;
use fjall_v2::{
use byteview6::ByteView;
use fjall2::{
PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace,
TransactionalPartitionHandle,
};
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
mod meta;
use log::info;
use meta::*;
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
#[derive(Clone)]
pub struct StoreV2<Key, Value> {
@@ -32,8 +32,8 @@ pub struct StoreV2<Key, Value> {
// const CHECK_COLLISIONS: bool = true;
const MAJOR_FJALL_VERSION: Version = Version::TWO;
pub fn open_keyspace(path: &Path) -> fjall_v2::Result<TransactionalKeyspace> {
fjall_v2::Config::new(path.join("fjall"))
pub fn open_keyspace(path: &Path) -> fjall2::Result<TransactionalKeyspace> {
fjall2::Config::new(path.join("fjall"))
// .cache_size(1024 * 1024 * 1024) // for tests only
.max_write_buffer_size(32 * 1024 * 1024)
.open_transactional()
@@ -97,6 +97,7 @@ where
})
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
@@ -135,33 +136,26 @@ where
.map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v))))
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
self.dels.remove(&key);
// unreachable!("Shouldn't reach this");
}
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
}
#[inline]
pub fn remove(&mut self, key: K) {
// if self.is_empty()? {
// return Ok(());
// }
// if !self.puts.is_empty() {
// unreachable!("Shouldn't reach this");
// }
if (self.puts.is_empty() || self.puts.remove(&key).is_none()) && !self.dels.insert(key) {
dbg!(&self.meta.path());
unreachable!();
// Hot path: key was recently inserted
if self.puts.remove(&key).is_some() {
return;
}
// Ok(())
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
@@ -181,10 +175,12 @@ where
// });
// }
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
@@ -217,16 +213,14 @@ where
let partition = partition.as_ref().unwrap();
let mut dels = self.dels.drain().collect::<Vec<_>>();
dels.sort_unstable();
dels.into_iter()
.for_each(|key| wtx.remove(partition, ByteView::from(key)));
wtx.remove_batch(partition, self.dels.drain().map(ByteView::from));
let mut puts = self.puts.drain().collect::<Vec<_>>();
puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
puts.into_iter().for_each(|(key, value)| {
wtx.insert(partition, ByteView::from(key), ByteView::from(value))
});
wtx.insert_batch(
partition,
self.puts
.drain()
.map(|(k, v)| (ByteView::from(k), ByteView::from(v))),
);
wtx.commit()?;
+4 -1
View File
@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_structs::Version;
use fjall_v3::{PersistMode, TxDatabase, TxKeyspace};
use fjall3::{PersistMode, TxDatabase, TxKeyspace};
use super::Height;
@@ -77,12 +77,15 @@ impl StoreMeta {
path.join("version")
}
#[inline]
pub fn height(&self) -> Option<Height> {
self.height
}
#[inline]
pub fn needs(&self, height: Height) -> bool {
self.height.is_none_or(|self_height| height > self_height)
}
#[inline]
pub fn has(&self, height: Height) -> bool {
!self.needs(height)
}
+22 -38
View File
@@ -2,8 +2,8 @@ use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path, sync::Arc};
use brk_error::Result;
use brk_structs::{Height, Version};
use byteview_v8::ByteView;
use fjall_v3::{KeyspaceCreateOptions, PersistMode, ReadTransaction, TxDatabase, TxKeyspace};
use byteview8::ByteView;
use fjall3::{KeyspaceCreateOptions, PersistMode, TxDatabase, TxKeyspace};
mod meta;
@@ -20,16 +20,16 @@ pub struct StoreV3<Key, Value> {
name: &'static str,
database: TxDatabase,
keyspace: Arc<RwLock<Option<TxKeyspace>>>,
rtx: Arc<RwLock<Option<ReadTransaction>>>,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::new(3);
pub fn open_database(path: &Path) -> fjall_v3::Result<TxDatabase> {
pub fn open_database(path: &Path) -> fjall3::Result<TxDatabase> {
TxDatabase::builder(path.join("fjall"))
.cache_size(4 * 1024 * 1024 * 1024)
// .max_write_buffer_size(bytes)
.open()
}
@@ -43,7 +43,7 @@ where
database
.keyspace(
name,
KeyspaceCreateOptions::default().manual_journal_persist(true),
KeyspaceCreateOptions::default().max_memtable_size(8 * 1024 * 1024), // .manual_journal_persist(true),
)
.map_err(|e| e.into())
}
@@ -69,19 +69,17 @@ where
},
)?;
let rtx = database.read_tx();
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
database: database.clone(),
keyspace: Arc::new(RwLock::new(Some(keyspace))),
rtx: Arc::new(RwLock::new(Some(rtx))),
puts: FxHashMap::default(),
dels: FxHashSet::default(),
})
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
@@ -89,10 +87,8 @@ where
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(slice) = self
.rtx
.read()
.as_ref()
.unwrap()
.database
.read_tx()
.get(self.keyspace.read().as_ref().unwrap(), ByteView::from(key))?
{
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
@@ -101,11 +97,10 @@ where
}
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
self.rtx
.read()
.as_ref()
.unwrap()
self.database
.read_tx()
.is_empty(self.keyspace.read().as_ref().unwrap())
.map_err(|e| e.into())
}
@@ -122,33 +117,26 @@ where
// .map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v))))
// }
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
self.dels.remove(&key);
// unreachable!("Shouldn't reach this");
}
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
}
#[inline]
pub fn remove(&mut self, key: K) {
// if self.is_empty()? {
// return Ok(());
// }
// if !self.puts.is_empty() {
// unreachable!("Shouldn't reach this");
// }
if (self.puts.is_empty() || self.puts.remove(&key).is_none()) && !self.dels.insert(key) {
dbg!(&self.meta.path());
unreachable!();
// Hot path: key was recently inserted
if self.puts.remove(&key).is_some() {
return;
}
// Ok(())
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
@@ -168,10 +156,12 @@ where
// });
// }
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
@@ -195,10 +185,6 @@ where
return Ok(());
}
let mut rtx = self.rtx.write();
let bad_rtx = rtx.take();
drop(bad_rtx);
let mut wtx = self.database.write_tx();
let keyspace = self.keyspace.read();
@@ -218,8 +204,6 @@ where
wtx.commit()?;
rtx.replace(self.database.read_tx());
Ok(())
}