global: snapshot

This commit is contained in:
nym21
2025-11-04 11:43:04 +01:00
parent cf08e470ef
commit 2ad55bf558
37 changed files with 1903 additions and 670 deletions
Generated
+101 -31
View File
@@ -675,7 +675,8 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"bitcoincore-rpc",
"brk_fjall",
"brk_fjall 2.11.5",
"brk_fjall 3.0.0-pre.4",
"jiff",
"minreq",
"sonic-rs",
@@ -705,13 +706,27 @@ dependencies = [
"byteview 0.6.1",
"dashmap",
"log",
"lsm-tree",
"lsm-tree 2.10.4",
"path-absolutize",
"std-semaphore",
"tempfile",
"xxhash-rust",
]
[[package]]
name = "brk_fjall"
version = "3.0.0-pre.4"
dependencies = [
"byteorder-lite",
"byteview 0.8.0",
"dashmap",
"log",
"lsm-tree 3.0.0-pre.4",
"std-semaphore",
"tempfile",
"xxhash-rust",
]
[[package]]
name = "brk_grouper"
version = "0.0.111"
@@ -729,7 +744,8 @@ version = "0.0.111"
dependencies = [
"bitcoin",
"brk_error",
"brk_fjall",
"brk_fjall 2.11.5",
"brk_fjall 3.0.0-pre.4",
"brk_grouper",
"brk_iterator",
"brk_logger",
@@ -740,6 +756,7 @@ dependencies = [
"brk_types",
"log",
"rayon",
"redb",
"rustc-hash",
"vecdb",
]
@@ -1269,13 +1286,15 @@ name = "brk_store"
version = "0.0.111"
dependencies = [
"brk_error",
"brk_fjall",
"brk_fjall 2.11.5",
"brk_fjall 3.0.0-pre.4",
"brk_types",
"byteview 0.6.1",
"byteview 0.8.0",
"candystore",
"log",
"parking_lot 0.12.5",
"redb",
"rustc-hash",
]
@@ -1317,12 +1336,13 @@ dependencies = [
"allocative",
"bitcoin",
"brk_error",
"byteview 0.6.1",
"byteview 0.8.0",
"derive_deref",
"itoa",
"jiff",
"num_enum",
"rapidhash",
"redb",
"ryu",
"schemars",
"serde",
@@ -1389,6 +1409,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "byteorder-lite"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
[[package]]
name = "bytes"
version = "1.10.1"
@@ -2836,6 +2862,26 @@ dependencies = [
"xxhash-rust",
]
[[package]]
name = "lsm-tree"
version = "3.0.0-pre.4"
dependencies = [
"byteorder-lite",
"byteview 0.8.0",
"crossbeam-skiplist",
"enum_dispatch",
"interval-heap",
"log",
"lz4_flex",
"quick_cache",
"rustc-hash",
"self_cell",
"sfa",
"tempfile",
"varint-rs",
"xxhash-rust",
]
[[package]]
name = "lz4_flex"
version = "0.11.5"
@@ -4060,6 +4106,19 @@ dependencies = [
"rustversion",
]
[[package]]
name = "rawdb"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6601495283897e6193cbd70d65e4e7420995c59c0de3de953f297c2fa8d9dc7"
dependencies = [
"allocative",
"libc",
"memmap2",
"parking_lot 0.12.5",
"rayon",
]
[[package]]
name = "rayon"
version = "1.11.0"
@@ -4080,6 +4139,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "redb"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06"
dependencies = [
"libc",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@@ -4296,9 +4364,9 @@ dependencies = [
[[package]]
name = "schemars"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
checksum = "1317c3bf3e7df961da95b0a56a172a02abead31276215a0497241a7624b487ce"
dependencies = [
"chrono",
"dyn-clone",
@@ -4311,9 +4379,9 @@ dependencies = [
[[package]]
name = "schemars_derive"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80"
checksum = "5f760a6150d45dd66ec044983c124595ae76912e77ed0b44124cb3e415cce5d9"
dependencies = [
"proc-macro2",
"quote",
@@ -4380,18 +4448,6 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
[[package]]
name = "seqdb"
version = "0.2.17"
dependencies = [
"allocative",
"libc",
"memmap2",
"parking_lot 0.12.5",
"rayon",
"zerocopy",
]
[[package]]
name = "serde"
version = "1.0.228"
@@ -4502,6 +4558,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sfa"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b84349eedbb0664e9febec9c385e8a6644d0496281965ed28b1b20a0108264a9"
dependencies = [
"byteorder-lite",
"log",
"xxhash-rust",
]
[[package]]
name = "sha1"
version = "0.10.6"
@@ -4594,9 +4661,9 @@ dependencies = [
[[package]]
name = "sonic-rs"
version = "0.5.5"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22540d56ba14521e4878ad436d498518c59698c39a89d5905c694932f0bf7134"
checksum = "4425ea8d66ec950e0a8f2ef52c766cc3d68d661d9a0845c353c40833179fd866"
dependencies = [
"ahash",
"bumpalo",
@@ -4615,9 +4682,9 @@ dependencies = [
[[package]]
name = "sonic-simd"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b421f7b6aa4a5de8f685aaf398dfaa828346ee639d2b1c1061ab43d40baa6223"
checksum = "5707edbfb34a40c9f2a55fa09a49101d9fec4e0cc171ce386086bd9616f34257"
dependencies = [
"cfg-if",
]
@@ -4905,9 +4972,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.16"
version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5"
checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
dependencies = [
"bytes",
"futures-core",
@@ -5264,17 +5331,18 @@ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vecdb"
version = "0.2.17"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cffa6180edb8fb521c9958d4d00a1cca1b5de396ee371f2f8e3ec54c21205eff"
dependencies = [
"allocative",
"ctrlc",
"log",
"parking_lot 0.12.5",
"pco",
"seqdb",
"rawdb",
"serde",
"serde_derive",
"serde_json",
"sonic-rs",
"vecdb_derive",
"zerocopy",
@@ -5282,7 +5350,9 @@ dependencies = [
[[package]]
name = "vecdb_derive"
version = "0.2.17"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23dffd7af8e5579122f79e4a2df3f62f5f315ef0d593721f585384b61bc722e2"
dependencies = [
"quote",
"syn 2.0.108",
+8 -7
View File
@@ -52,30 +52,31 @@ brk_store = { version = "0.0.111", path = "crates/brk_store" }
brk_types = { version = "0.0.111", path = "crates/brk_types" }
brk_traversable = { version = "0.0.111", path = "crates/brk_traversable", features = ["derive"] }
brk_traversable_derive = { version = "0.0.111", path = "crates/brk_traversable_derive" }
byteview = "=0.6.1"
# byteview = "~0.8.0"
# byteview = "=0.6.1"
byteview = "~0.8.0"
derive_deref = "1.1.1"
fjall2 = { version = "2.11.5", package = "brk_fjall" }
# fjall2 = { path = "../fjall2", package = "brk_fjall" }
# fjall2 = { version = "2.11.2", package = "fjall" }
# fjall3 = { version = "=3.0.0-pre.0", package = "fjall" }
# fjall3 = { path = "../fjall3", package = "fjall" }
fjall3 = { path = "../fjall3", package = "brk_fjall" }
# fjall3 = { git = "https://github.com/fjall-rs/fjall.git", rev = "bb15057500dce3115d7644d268b9deeaa895b431", package = "fjall" }
jiff = "0.2.15"
log = "0.4.28"
minreq = { version = "2.14.1", features = ["https", "serde_json"] }
parking_lot = "0.12.5"
rayon = "1.11.0"
redb = "3.1.0"
rustc-hash = "2.1.1"
schemars = "1.0.4"
schemars = "1.0.5"
serde = "1.0.228"
serde_bytes = "0.11.19"
serde_derive = "1.0.228"
serde_json = { version = "1.0.145", features = ["float_roundtrip"] }
sonic-rs = "0.5.5"
sonic-rs = "0.5.6"
tokio = { version = "1.48.0", features = ["rt-multi-thread"] }
vecdb = { path = "../seqdb/crates/vecdb", features = ["derive"] }
# vecdb = { version = "0.2.17", features = ["derive"] }
# vecdb = { path = "../seqdb/crates/vecdb", features = ["derive"] }
vecdb = { version = "0.3.1", features = ["derive"] }
zerocopy = { version = "0.8.27", features = ["derive"] }
[workspace.metadata.release]
+45 -50
View File
@@ -605,12 +605,11 @@ impl Vecs {
starting_dateindex
};
self.dateindex_to_first_height
.compute_inverse_more_to_less(
starting_indexes.height,
&self.height_to_dateindex,
exit,
)?;
self.dateindex_to_first_height.compute_coarser(
starting_indexes.height,
&self.height_to_dateindex,
exit,
)?;
self.dateindex_to_dateindex.compute_from_index(
starting_dateindex,
@@ -648,8 +647,11 @@ impl Vecs {
exit,
)?;
self.weekindex_to_first_dateindex
.compute_inverse_more_to_less(starting_dateindex, &self.dateindex_to_weekindex, exit)?;
self.weekindex_to_first_dateindex.compute_coarser(
starting_dateindex,
&self.dateindex_to_weekindex,
exit,
)?;
self.weekindex_to_weekindex.compute_from_index(
starting_weekindex,
@@ -681,12 +683,11 @@ impl Vecs {
exit,
)?;
self.difficultyepoch_to_first_height
.compute_inverse_more_to_less(
starting_indexes.height,
&self.height_to_difficultyepoch,
exit,
)?;
self.difficultyepoch_to_first_height.compute_coarser(
starting_indexes.height,
&self.height_to_difficultyepoch,
exit,
)?;
self.difficultyepoch_to_difficultyepoch.compute_from_index(
starting_difficultyepoch,
@@ -719,12 +720,11 @@ impl Vecs {
exit,
)?;
self.monthindex_to_first_dateindex
.compute_inverse_more_to_less(
starting_dateindex,
&self.dateindex_to_monthindex,
exit,
)?;
self.monthindex_to_first_dateindex.compute_coarser(
starting_dateindex,
&self.dateindex_to_monthindex,
exit,
)?;
self.monthindex_to_monthindex.compute_from_index(
starting_monthindex,
@@ -756,12 +756,11 @@ impl Vecs {
exit,
)?;
self.quarterindex_to_first_monthindex
.compute_inverse_more_to_less(
starting_monthindex,
&self.monthindex_to_quarterindex,
exit,
)?;
self.quarterindex_to_first_monthindex.compute_coarser(
starting_monthindex,
&self.monthindex_to_quarterindex,
exit,
)?;
// let quarter_count = self.quarterindex_to_first_monthindex.len();
@@ -795,12 +794,11 @@ impl Vecs {
exit,
)?;
self.semesterindex_to_first_monthindex
.compute_inverse_more_to_less(
starting_monthindex,
&self.monthindex_to_semesterindex,
exit,
)?;
self.semesterindex_to_first_monthindex.compute_coarser(
starting_monthindex,
&self.monthindex_to_semesterindex,
exit,
)?;
// let semester_count = self.semesterindex_to_first_monthindex.len();
@@ -834,12 +832,11 @@ impl Vecs {
exit,
)?;
self.yearindex_to_first_monthindex
.compute_inverse_more_to_less(
starting_monthindex,
&self.monthindex_to_yearindex,
exit,
)?;
self.yearindex_to_first_monthindex.compute_coarser(
starting_monthindex,
&self.monthindex_to_yearindex,
exit,
)?;
self.yearindex_to_yearindex.compute_from_index(
starting_yearindex,
@@ -870,12 +867,11 @@ impl Vecs {
exit,
)?;
self.halvingepoch_to_first_height
.compute_inverse_more_to_less(
starting_indexes.height,
&self.height_to_halvingepoch,
exit,
)?;
self.halvingepoch_to_first_height.compute_coarser(
starting_indexes.height,
&self.height_to_halvingepoch,
exit,
)?;
self.halvingepoch_to_halvingepoch.compute_from_index(
starting_halvingepoch,
@@ -899,12 +895,11 @@ impl Vecs {
exit,
)?;
self.decadeindex_to_first_yearindex
.compute_inverse_more_to_less(
starting_yearindex,
&self.yearindex_to_decadeindex,
exit,
)?;
self.decadeindex_to_first_yearindex.compute_coarser(
starting_yearindex,
&self.yearindex_to_decadeindex,
exit,
)?;
self.decadeindex_to_decadeindex.compute_from_index(
starting_decadeindex,
+1 -1
View File
@@ -13,7 +13,7 @@ build = "build.rs"
bitcoin = { workspace = true }
bitcoincore-rpc = { workspace = true }
fjall2 = { workspace = true }
# fjall3 = { workspace = true }
fjall3 = { workspace = true }
jiff = { workspace = true }
minreq = { workspace = true }
sonic-rs = { workspace = true }
+8 -8
View File
@@ -13,7 +13,7 @@ pub enum Error {
BitcoinRPC(bitcoincore_rpc::Error),
Jiff(jiff::Error),
FjallV2(fjall2::Error),
// FjallV3(fjall3::Error),
FjallV3(fjall3::Error),
VecDB(vecdb::Error),
SeqDB(vecdb::SeqDBError),
Minreq(minreq::Error),
@@ -134,12 +134,12 @@ impl From<jiff::Error> for Error {
}
}
// impl From<fjall3::Error> for Error {
// #[inline]
// fn from(value: fjall3::Error) -> Self {
// Self::FjallV3(value)
// }
// }
impl From<fjall3::Error> for Error {
#[inline]
fn from(value: fjall3::Error) -> Self {
Self::FjallV3(value)
}
}
impl From<fjall2::Error> for Error {
#[inline]
@@ -179,7 +179,7 @@ impl fmt::Display for Error {
Error::BitcoinHexToArrayError(error) => Display::fmt(&error, f),
Error::BitcoinRPC(error) => Display::fmt(&error, f),
Error::FjallV2(error) => Display::fmt(&error, f),
// Error::FjallV3(error) => Display::fmt(&error, f),
Error::FjallV3(error) => Display::fmt(&error, f),
Error::IO(error) => Display::fmt(&error, f),
Error::Jiff(error) => Display::fmt(&error, f),
Error::Minreq(error) => Display::fmt(&error, f),
+2 -1
View File
@@ -21,8 +21,9 @@ brk_store = { workspace = true }
brk_types = { workspace = true }
brk_traversable = { workspace = true }
fjall2 = { workspace = true }
# fjall3 = { workspace = true }
fjall3 = { workspace = true }
log = { workspace = true }
rayon = { workspace = true }
redb = { workspace = true }
rustc-hash = { workspace = true }
vecdb = { workspace = true }
+1 -1
View File
@@ -14,7 +14,7 @@ fn main() -> Result<()> {
let mut sum = Sats::ZERO;
let mut count: usize = 0;
for value in indexer.vecs.txoutindex_to_value.clean_values().unwrap() {
for value in indexer.vecs.txoutindex_to_value.clean_iter().unwrap() {
sum += value;
count += 1;
}
@@ -8,7 +8,7 @@ fn run_benchmark(indexer: &Indexer) -> (Sats, std::time::Duration, usize) {
let mut sum = Sats::ZERO;
let mut count = 0;
for value in indexer.vecs.txoutindex_to_value.clean_values().unwrap() {
for value in indexer.vecs.txoutindex_to_value.clean_iter().unwrap() {
// for value in indexer.vecs.txoutindex_to_value.values() {
sum += value;
count += 1;
@@ -1,4 +1,9 @@
use std::{fs, path::Path, thread, time::Instant};
use std::{
fs,
path::Path,
thread,
time::{Duration, Instant},
};
use brk_error::Result;
use brk_indexer::Indexer;
@@ -54,7 +59,7 @@ fn main() -> Result<()> {
_ => vec![6, 4, 7, 1, 8, 5, 2],
};
let mut run_times = [std::time::Duration::ZERO; 7];
let mut run_times = [Duration::ZERO; 7];
for &method in &order {
match method {
@@ -77,7 +82,7 @@ fn main() -> Result<()> {
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
)?;
run_times[1] = time;
}
4 => {
@@ -88,7 +93,7 @@ fn main() -> Result<()> {
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
)?;
run_times[2] = time;
}
5 => {
@@ -110,7 +115,7 @@ fn main() -> Result<()> {
INPUTS_PER_BLOCK,
OUTPUT_START_OFFSET,
INPUT_START_OFFSET,
);
)?;
run_times[4] = time;
}
7 => {
@@ -177,7 +182,7 @@ fn main() -> Result<()> {
];
for (name, times) in &methods {
let avg = times.iter().sum::<std::time::Duration>() / times.len() as u32;
let avg = times.iter().sum::<Duration>() / times.len() as u32;
let min = times.iter().min().unwrap();
let max = times.iter().max().unwrap();
@@ -193,7 +198,7 @@ fn main() -> Result<()> {
let averages: Vec<_> = methods
.iter()
.map(|(name, times)| {
let avg = times.iter().sum::<std::time::Duration>() / times.len() as u32;
let avg = times.iter().sum::<Duration>() / times.len() as u32;
(*name, avg)
})
.collect();
@@ -221,7 +226,7 @@ fn run_method1(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Duration {
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
@@ -287,7 +292,7 @@ fn run_method2(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Result<Duration> {
let start_time = Instant::now();
for block_idx in 0..num_blocks {
@@ -298,23 +303,23 @@ fn run_method2(
let values: Vec<_> = vecs
.txoutindex_to_value
.iter_at(block_start)
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect();
let output_types: Vec<_> = vecs
.txoutindex_to_outputtype
.iter_at(block_start)
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect();
let typeindexes: Vec<_> = vecs
.txoutindex_to_typeindex
.iter_at(block_start)
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect();
let _outputs: Vec<_> = (0..outputs_per_block)
@@ -328,9 +333,9 @@ fn run_method2(
let outpoints: Vec<_> = vecs
.txinindex_to_outpoint
.iter_at(input_block_start)
.iter()?
.skip(input_block_start.to_usize())
.take(inputs_per_block)
.map(|(_, v)| v)
.collect();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
@@ -360,7 +365,7 @@ fn run_method2(
std::hint::black_box(input_sum);
}
start_time.elapsed()
Ok(start_time.elapsed())
}
fn run_method4(
@@ -370,7 +375,7 @@ fn run_method4(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Result<Duration> {
let start_time = Instant::now();
for block_idx in 0..num_blocks {
@@ -379,33 +384,40 @@ fn run_method4(
(output_start_offset + (block_idx * outputs_per_block)) as u64,
);
let (values, output_types, typeindexes) = thread::scope(|s| {
let h1 = s.spawn(|| {
vecs.txoutindex_to_value
.iter_at(block_start)
let (values, output_types, typeindexes) = thread::scope(|s| -> Result<_> {
let h1 = s.spawn(|| -> Result<_> {
Ok(vecs
.txoutindex_to_value
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect::<Vec<_>>()
.collect::<Vec<_>>())
});
let h2 = s.spawn(|| {
vecs.txoutindex_to_outputtype
.iter_at(block_start)
let h2 = s.spawn(|| -> Result<_> {
Ok(vecs
.txoutindex_to_outputtype
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect::<Vec<_>>()
.collect::<Vec<_>>())
});
let h3 = s.spawn(|| {
vecs.txoutindex_to_typeindex
.iter_at(block_start)
let h3 = s.spawn(|| -> Result<_> {
Ok(vecs
.txoutindex_to_typeindex
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect::<Vec<_>>()
.collect::<Vec<_>>())
});
(h1.join().unwrap(), h2.join().unwrap(), h3.join().unwrap())
});
Ok((
h1.join().unwrap()?,
h2.join().unwrap()?,
h3.join().unwrap()?,
))
})?;
let _outputs: Vec<_> = (0..outputs_per_block)
.into_par_iter()
@@ -418,9 +430,9 @@ fn run_method4(
let outpoints: Vec<_> = vecs
.txinindex_to_outpoint
.iter_at(input_block_start)
.iter()?
.skip(input_block_start.to_usize())
.take(inputs_per_block)
.map(|(_, v)| v)
.collect();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
@@ -450,7 +462,7 @@ fn run_method4(
std::hint::black_box(input_sum);
}
start_time.elapsed()
Ok(start_time.elapsed())
}
fn run_method5(
@@ -460,7 +472,7 @@ fn run_method5(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Duration {
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
@@ -528,7 +540,7 @@ fn run_method6(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Result<Duration> {
let start_time = Instant::now();
for block_idx in 0..num_blocks {
@@ -539,23 +551,23 @@ fn run_method6(
let values: Vec<_> = vecs
.txoutindex_to_value
.iter_at(block_start)
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect();
let output_types: Vec<_> = vecs
.txoutindex_to_outputtype
.iter_at(block_start)
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect();
let typeindexes: Vec<_> = vecs
.txoutindex_to_typeindex
.iter_at(block_start)
.iter()?
.skip(block_start.to_usize())
.take(outputs_per_block)
.map(|(_, v)| v)
.collect();
// Read inputs sequentially
@@ -564,9 +576,9 @@ fn run_method6(
let outpoints: Vec<_> = vecs
.txinindex_to_outpoint
.iter_at(input_block_start)
.iter()?
.skip(input_block_start.to_usize())
.take(inputs_per_block)
.map(|(_, v)| v)
.collect();
let txindex_to_first_txoutindex_reader = vecs.txindex_to_first_txoutindex.create_reader();
@@ -608,7 +620,7 @@ fn run_method6(
std::hint::black_box(input_sum);
}
start_time.elapsed()
Ok(start_time.elapsed())
}
fn run_method7(
@@ -618,7 +630,7 @@ fn run_method7(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Duration {
// Create readers ONCE outside loop
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
@@ -683,7 +695,7 @@ fn run_method8(
inputs_per_block: usize,
output_start_offset: usize,
input_start_offset: usize,
) -> std::time::Duration {
) -> Duration {
let txoutindex_to_value_reader = vecs.txoutindex_to_value.create_reader();
let txoutindex_to_outputtype_reader = vecs.txoutindex_to_outputtype.create_reader();
let txoutindex_to_typeindex_reader = vecs.txoutindex_to_typeindex.create_reader();
@@ -758,8 +770,8 @@ fn run_method8(
start_time.elapsed()
}
fn calculate_stddev(times: &[std::time::Duration]) -> std::time::Duration {
let avg = times.iter().sum::<std::time::Duration>().as_secs_f64() / times.len() as f64;
fn calculate_stddev(times: &[Duration]) -> Duration {
let avg = times.iter().sum::<Duration>().as_secs_f64() / times.len() as f64;
let variance = times
.iter()
.map(|t| {
@@ -768,5 +780,5 @@ fn calculate_stddev(times: &[std::time::Duration]) -> std::time::Duration {
})
.sum::<f64>()
/ times.len() as f64;
std::time::Duration::from_secs_f64(variance.sqrt())
Duration::from_secs_f64(variance.sqrt())
}
+1 -1
View File
@@ -233,6 +233,6 @@ where
} else if h + 1_u32 == starting_height {
Some(I::from(index_to_else.len()))
} else {
height_to_index.iter().get_inner(starting_height)
height_to_index.iter().get(starting_height)
}
}
+7 -6
View File
@@ -15,14 +15,16 @@ use brk_types::{
use log::{error, info};
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, VecIterator};
use vecdb::{AnyVec, Exit, GenericStoredVec, Reader, VecIteratorExtended};
mod indexes;
mod stores_v2;
mod stores_redb;
// mod stores_v2;
// mod stores_v3;
mod vecs;
pub use indexes::*;
pub use stores_v2::*;
pub use stores_redb::*;
// pub use stores_v2::*;
// pub use stores_v3::*;
pub use vecs::*;
@@ -80,9 +82,8 @@ impl Indexer {
exit: &Exit,
check_collisions: bool,
) -> Result<Indexes> {
let (starting_indexes, prev_hash) = if let Some(hash) =
VecIterator::last(self.vecs.height_to_blockhash.iter()).map(|(_, v)| v)
{
let last_blockhash = self.vecs.height_to_blockhash.iter()?.last();
let (starting_indexes, prev_hash) = if let Some(hash) = last_blockhash {
let (height, hash) = client.get_closest_valid_height(hash)?;
let starting_indexes =
Indexes::from((height.incremented(), &mut self.vecs, &self.stores));
+408
View File
@@ -0,0 +1,408 @@
use std::{fs, path::Path, sync::Arc};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, StoreRedb as Store};
use brk_types::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
Vout,
};
use rayon::prelude::*;
use redb::Database;
use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended};
use crate::Indexes;
use super::Vecs;
#[derive(Clone)]
pub struct Stores {
pub database: Arc<Database>,
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
pub height_to_coinbase_tag: Store<Height, StoredString>,
pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
pub addresstype_to_typeindex_and_txindex: ByAddressType<Store<TypeIndexAndTxIndex, Unit>>,
pub addresstype_to_typeindex_and_unspentoutpoint:
ByAddressType<Store<TypeIndexAndOutPoint, Unit>>,
}
impl Stores {
pub fn forced_import(parent: &Path, version: Version) -> Result<Self> {
let pathbuf = parent.join("stores");
let path = pathbuf.as_path();
fs::create_dir_all(&pathbuf)?;
let database = Arc::new(match brk_store::open_redb_database(path) {
Ok(database) => database,
Err(_) => {
fs::remove_dir_all(path)?;
return Self::forced_import(path, version);
}
});
let database_ref = &database;
let create_addressindex_and_txindex_store = |index| {
Store::import(
database_ref,
path,
&format!("a2t{}", index),
version,
Some(false),
)
};
let create_addressindex_and_unspentoutpoint_store =
|index| Store::import(database_ref, path, &format!("a2u{}", index), version, None);
Ok(Self {
database: database.clone(),
height_to_coinbase_tag: Store::import(database_ref, path, "h2c", version, None)?,
addressbyteshash_to_typeindex: Store::import(database_ref, path, "a2t", version, None)?,
blockhashprefix_to_height: Store::import(database_ref, path, "b2h", version, None)?,
txidprefix_to_txindex: Store::import(database_ref, path, "t2t", version, None)?,
addresstype_to_typeindex_and_txindex: ByAddressType::new_with_index(
create_addressindex_and_txindex_store,
)?,
addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new_with_index(
create_addressindex_and_unspentoutpoint_store,
)?,
})
}
pub fn starting_height(&self) -> Height {
self.iter_any_store()
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
[
&mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
&mut self.blockhashprefix_to_height,
&mut self.height_to_coinbase_tag,
&mut self.txidprefix_to_txindex,
]
// .into_iter() // Changed from par_iter_mut()
.into_par_iter() // Changed from par_iter_mut()
.chain(
self.addresstype_to_typeindex_and_txindex
// .iter_mut()
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.chain(
self.addresstype_to_typeindex_and_unspentoutpoint
// .iter_mut()
.par_iter_mut()
.map(|s| s as &mut dyn AnyStore),
)
.try_for_each(|store| store.commit(height))?;
Ok(())
// self.database
// .persist(PersistMode::SyncAll)
// .map_err(|e| e.into())
}
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
[
&self.addressbyteshash_to_typeindex as &dyn AnyStore,
&self.blockhashprefix_to_height,
&self.height_to_coinbase_tag,
&self.txidprefix_to_txindex,
]
.into_iter()
.chain(
self.addresstype_to_typeindex_and_txindex
.iter()
.map(|s| s as &dyn AnyStore),
)
.chain(
self.addresstype_to_typeindex_and_unspentoutpoint
.iter()
.map(|s| s as &dyn AnyStore),
)
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
starting_indexes: &Indexes,
) -> Result<()> {
if self.addressbyteshash_to_typeindex.is_empty()?
&& self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()?
&& self.height_to_coinbase_tag.is_empty()?
&& self
.addresstype_to_typeindex_and_txindex
.iter()
.map(|s| s.is_empty())
.collect::<Result<Vec<_>>>()?
.into_iter()
.all(|empty| empty)
&& self
.addresstype_to_typeindex_and_unspentoutpoint
.iter()
.map(|s| s.is_empty())
.collect::<Result<Vec<_>>>()?
.into_iter()
.all(|empty| empty)
{
return Ok(());
}
if starting_indexes.height != Height::ZERO {
vecs.height_to_blockhash
.iter()?
.skip(starting_indexes.height.to_usize())
.map(BlockHashPrefix::from)
.for_each(|prefix| {
self.blockhashprefix_to_height.remove(prefix);
});
(starting_indexes.height.to_usize()..vecs.height_to_blockhash.len())
.map(Height::from)
.for_each(|h| {
self.height_to_coinbase_tag.remove(h);
});
if let Ok(mut index) = vecs
.height_to_first_p2pk65addressindex
.one_shot_read(starting_indexes.height)
{
let mut p2pk65addressindex_to_p2pk65bytes_iter =
vecs.p2pk65addressindex_to_p2pk65bytes.iter()?;
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2pk33addressindex
.one_shot_read(starting_indexes.height)
{
let mut p2pk33addressindex_to_p2pk33bytes_iter =
vecs.p2pk33addressindex_to_p2pk33bytes.iter()?;
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2pkhaddressindex
.one_shot_read(starting_indexes.height)
{
let mut p2pkhaddressindex_to_p2pkhbytes_iter =
vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?;
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2shaddressindex
.one_shot_read(starting_indexes.height)
{
let mut p2shaddressindex_to_p2shbytes_iter =
vecs.p2shaddressindex_to_p2shbytes.iter()?;
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.one_shot_read(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2wpkhaddressindex
.one_shot_read(starting_indexes.height)
{
let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?;
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2wshaddressindex
.one_shot_read(starting_indexes.height)
{
let mut p2wshaddressindex_to_p2wshbytes_iter =
vecs.p2wshaddressindex_to_p2wshbytes.iter()?;
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
if let Ok(mut index) = vecs
.height_to_first_p2aaddressindex
.one_shot_read(starting_indexes.height)
{
let mut p2aaddressindex_to_p2abytes_iter =
vecs.p2aaddressindex_to_p2abytes.iter()?;
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
index.increment();
}
}
} else {
unreachable!();
// self.blockhashprefix_to_height.reset()?;
// self.addressbyteshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {
vecs.txindex_to_txid
.iter()?
.enumerate()
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
// "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
let is_not_first_dup = txindex != TxIndex::new(142783)
|| txidprefix != TxidPrefix::from([153, 133, 216, 41, 84, 225, 15, 34]);
// "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468"
let is_not_second_dup = txindex != TxIndex::new(142841)
|| txidprefix != TxidPrefix::from([104, 180, 95, 88, 182, 116, 233, 78]);
if is_not_first_dup && is_not_second_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
});
} else {
unreachable!();
// self.txidprefix_to_txindex.reset()?;
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?;
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
vecs.txoutindex_to_outputtype
.iter()?
.enumerate()
.skip(starting_indexes.txoutindex.to_usize())
.zip(
vecs.txoutindex_to_typeindex
.iter()?
.skip(starting_indexes.txoutindex.to_usize()),
)
.filter(|((_, outputtype), _)| outputtype.is_address())
.for_each(|((txoutindex, outputtype), typeindex)| {
let txindex = txoutindex_to_txindex_iter.unsafe_get_(txoutindex);
let vout = Vout::from(
txoutindex.to_usize()
- txindex_to_first_txoutindex_iter
.unsafe_get(txindex)
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.remove(TypeIndexAndOutPoint::from((typeindex, outpoint)));
});
// Add back outputs that were spent after the rollback point
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?;
let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?;
vecs.txinindex_to_outpoint
.iter()?
.skip(starting_indexes.txinindex.to_usize())
.for_each(|outpoint| {
if outpoint.is_coinbase() {
return;
}
let txindex = outpoint.txindex();
let vout = outpoint.vout();
// Calculate txoutindex from txindex and vout
let txoutindex = txindex_to_first_txoutindex_iter.unsafe_get(txindex) + vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype = txoutindex_to_outputtype_iter.unsafe_get(txoutindex);
if outputtype.is_address() {
let typeindex = txoutindex_to_typeindex_iter.unsafe_get(txoutindex);
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.insert(TypeIndexAndOutPoint::from((typeindex, outpoint)), Unit);
}
}
});
} else {
unreachable!();
// self.addresstype_to_typeindex_and_txindex
// .iter_mut()
// .try_for_each(|s| s.reset())?;
// self.addresstype_to_typeindex_and_unspentoutpoint
// .iter_mut()
// .try_for_each(|s| s.reset())?;
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;
Ok(())
}
}
+57 -63
View File
@@ -2,7 +2,7 @@ use std::{fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, StoreV2 as Store};
use brk_store::{AnyStore, StoreFjallV2 as Store};
use brk_types::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
@@ -10,7 +10,7 @@ use brk_types::{
};
use fjall2::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended};
use crate::Indexes;
@@ -160,10 +160,11 @@ impl Stores {
if starting_indexes.height != Height::ZERO {
vecs.height_to_blockhash
.iter_at(starting_indexes.height)
.for_each(|(_, v)| {
let blockhashprefix = BlockHashPrefix::from(v);
self.blockhashprefix_to_height.remove(blockhashprefix);
.iter()?
.skip(starting_indexes.height.to_usize())
.map(BlockHashPrefix::from)
.for_each(|prefix| {
self.blockhashprefix_to_height.remove(prefix);
});
(starting_indexes.height.to_usize()..vecs.height_to_blockhash.len())
@@ -172,13 +173,12 @@ impl Stores {
self.height_to_coinbase_tag.remove(h);
});
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2pk65addressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2pk65addressindex_to_p2pk65bytes_iter =
vecs.p2pk65addressindex_to_p2pk65bytes.iter();
vecs.p2pk65addressindex_to_p2pk65bytes.iter()?;
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -188,13 +188,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2pk33addressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2pk33addressindex_to_p2pk33bytes_iter =
vecs.p2pk33addressindex_to_p2pk33bytes.iter();
vecs.p2pk33addressindex_to_p2pk33bytes.iter()?;
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -204,13 +203,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2pkhaddressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2pkhaddressindex_to_p2pkhbytes_iter =
vecs.p2pkhaddressindex_to_p2pkhbytes.iter();
vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?;
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -220,13 +218,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2shaddressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2shaddressindex_to_p2shbytes_iter =
vecs.p2shaddressindex_to_p2shbytes.iter();
vecs.p2shaddressindex_to_p2shbytes.iter()?;
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -236,13 +233,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter();
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -252,13 +248,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2wpkhaddressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter();
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?;
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -268,13 +263,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2wshaddressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2wshaddressindex_to_p2wshbytes_iter =
vecs.p2wshaddressindex_to_p2wshbytes.iter();
vecs.p2wshaddressindex_to_p2wshbytes.iter()?;
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -284,12 +278,12 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2aaddressindex
.iter()
.get(starting_indexes.height)
.one_shot_read(starting_indexes.height)
{
let mut p2aaddressindex_to_p2abytes_iter = vecs.p2aaddressindex_to_p2abytes.iter();
let mut p2aaddressindex_to_p2abytes_iter =
vecs.p2aaddressindex_to_p2abytes.iter()?;
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
@@ -306,8 +300,12 @@ impl Stores {
if starting_indexes.txindex != TxIndex::ZERO {
vecs.txindex_to_txid
.iter_at(starting_indexes.txindex)
.iter()?
.enumerate()
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
// "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
@@ -328,23 +326,25 @@ impl Stores {
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?;
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
vecs.txoutindex_to_outputtype
.iter_at(starting_indexes.txoutindex)
.iter()?
.enumerate()
.skip(starting_indexes.txoutindex.to_usize())
.zip(
vecs.txoutindex_to_typeindex
.iter_at(starting_indexes.txoutindex),
.iter()?
.skip(starting_indexes.txoutindex.to_usize()),
)
.filter(|((_, outputtype), _)| outputtype.is_address())
.for_each(|((txoutindex, outputtype), (_, typeindex))| {
let txindex = vecs.txoutindex_to_txindex.iter().get(txoutindex).unwrap();
.for_each(|((txoutindex, outputtype), typeindex)| {
let txindex = txoutindex_to_txindex_iter.unsafe_get_(txoutindex);
let vout = Vout::from(
txoutindex.to_usize()
- vecs
.txindex_to_first_txoutindex
.iter()
.get(txindex)
.unwrap()
- txindex_to_first_txoutindex_iter
.unsafe_get(txindex)
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
@@ -356,9 +356,13 @@ impl Stores {
});
// Add back outputs that were spent after the rollback point
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?;
let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?;
vecs.txinindex_to_outpoint
.iter_at(starting_indexes.txinindex)
.for_each(|(_, outpoint)| {
.iter()?
.skip(starting_indexes.txinindex.to_usize())
.for_each(|outpoint| {
if outpoint.is_coinbase() {
return;
}
@@ -367,24 +371,14 @@ impl Stores {
let vout = outpoint.vout();
// Calculate txoutindex from txindex and vout
let txoutindex = vecs
.txindex_to_first_txoutindex
.iter()
.get(txindex)
.unwrap()
+ vout;
let txoutindex = txindex_to_first_txoutindex_iter.unsafe_get(txindex) + vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype = vecs
.txoutindex_to_outputtype
.iter()
.get(txoutindex)
.unwrap();
let outputtype = txoutindex_to_outputtype_iter.unsafe_get(txoutindex);
if outputtype.is_address() {
let typeindex =
vecs.txoutindex_to_typeindex.iter().get(txoutindex).unwrap();
let typeindex = txoutindex_to_typeindex_iter.unsafe_get(txoutindex);
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
+147 -151
View File
@@ -1,15 +1,17 @@
use std::{borrow::Cow, fs, path::Path};
use std::{fs, path::Path};
use brk_error::Result;
use brk_grouper::ByAddressType;
use brk_store::{AnyStore, StoreV3 as Store};
use brk_store::{AnyStore, StoreFjallV3 as Store};
use brk_types::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, StoredString, TxIndex, TxOutIndex,
TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutPoint, StoredString, TxIndex,
TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version,
Vout,
};
use fjall3::{PersistMode, TxDatabase};
use log::info;
use rayon::prelude::*;
use vecdb::{AnyVec, StoredIndex, VecIterator};
use vecdb::{AnyVec, GenericStoredVec, StoredIndex, VecIterator, VecIteratorExtended};
use crate::Indexes;
@@ -35,7 +37,7 @@ impl Stores {
fs::create_dir_all(&pathbuf)?;
let database = match brk_store::open_database(path) {
let database = match brk_store::open_fjall2_database(path) {
Ok(database) => database,
Err(_) => {
fs::remove_dir_all(path)?;
@@ -45,61 +47,30 @@ impl Stores {
let database_ref = &database;
let create_addressindex_and_txindex_store = |cohort| {
let create_addressindex_and_txindex_store = |index| {
Store::import(
database_ref,
path,
&format!("{}addressindex_and_txindex", cohort),
&format!("a2t{}", index),
version,
Some(false),
)
};
let create_addressindex_and_unspentoutpoint_store = |cohort| {
Store::import(
database_ref,
path,
&format!("{}addressindex_and_unspentoutpoint", cohort),
version,
None,
)
};
let create_addressindex_and_unspentoutpoint_store =
|index| Store::import(database_ref, path, &format!("a2u{}", index), version, None);
Ok(Self {
database: database.clone(),
height_to_coinbase_tag: Store::import(
database_ref,
path,
"height_to_coinbase_tag",
version,
None,
)?,
addressbyteshash_to_typeindex: Store::import(
database_ref,
path,
"addressbyteshash_to_typeindex",
version,
None,
)?,
blockhashprefix_to_height: Store::import(
database_ref,
path,
"blockhashprefix_to_height",
version,
None,
)?,
txidprefix_to_txindex: Store::import(
database_ref,
path,
"txidprefix_to_txindex",
version,
None,
)?,
addresstype_to_typeindex_and_txindex: ByAddressType::new(
height_to_coinbase_tag: Store::import(database_ref, path, "h2c", version, None)?,
addressbyteshash_to_typeindex: Store::import(database_ref, path, "a2t", version, None)?,
blockhashprefix_to_height: Store::import(database_ref, path, "b2h", version, None)?,
txidprefix_to_txindex: Store::import(database_ref, path, "t2t", version, None)?,
addresstype_to_typeindex_and_txindex: ByAddressType::new_with_index(
create_addressindex_and_txindex_store,
)?,
addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new(
addresstype_to_typeindex_and_unspentoutpoint: ByAddressType::new_with_index(
create_addressindex_and_unspentoutpoint_store,
)?,
})
@@ -117,6 +88,12 @@ impl Stores {
}
pub fn commit(&mut self, height: Height) -> Result<()> {
info!(
"database.write_buffer_size = {}",
self.database.write_buffer_size()
);
info!("database.journal_count = {}", self.database.journal_count());
[
&mut self.addressbyteshash_to_typeindex as &mut dyn AnyStore,
&mut self.blockhashprefix_to_height,
@@ -136,11 +113,15 @@ impl Stores {
)
.try_for_each(|store| store.commit(height))?;
Ok(())
info!(
"database.write_buffer_size = {}",
self.database.write_buffer_size()
);
info!("database.journal_count = {}", self.database.journal_count());
// self.database
// .persist(PersistMode::SyncAll)
// .map_err(|e| e.into())
self.database
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn iter_any_store(&self) -> impl Iterator<Item = &dyn AnyStore> {
@@ -192,10 +173,11 @@ impl Stores {
if starting_indexes.height != Height::ZERO {
vecs.height_to_blockhash
.iter_at(starting_indexes.height)
.for_each(|(_, v)| {
let blockhashprefix = BlockHashPrefix::from(v.into_owned());
self.blockhashprefix_to_height.remove(blockhashprefix);
.iter()?
.skip(starting_indexes.height.to_usize())
.map(BlockHashPrefix::from)
.for_each(|prefix| {
self.blockhashprefix_to_height.remove(prefix);
});
(starting_indexes.height.to_usize()..vecs.height_to_blockhash.len())
@@ -204,19 +186,14 @@ impl Stores {
self.height_to_coinbase_tag.remove(h);
});
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2pk65addressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2pk65addressindex_to_p2pk65bytes_iter =
vecs.p2pk65addressindex_to_p2pk65bytes.iter();
vecs.p2pk65addressindex_to_p2pk65bytes.iter()?;
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -224,19 +201,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2pk33addressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2pk33addressindex_to_p2pk33bytes_iter =
vecs.p2pk33addressindex_to_p2pk33bytes.iter();
vecs.p2pk33addressindex_to_p2pk33bytes.iter()?;
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -244,19 +216,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2pkhaddressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2pkhaddressindex_to_p2pkhbytes_iter =
vecs.p2pkhaddressindex_to_p2pkhbytes.iter();
vecs.p2pkhaddressindex_to_p2pkhbytes.iter()?;
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -264,19 +231,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2shaddressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2shaddressindex_to_p2shbytes_iter =
vecs.p2shaddressindex_to_p2shbytes.iter();
vecs.p2shaddressindex_to_p2shbytes.iter()?;
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -284,19 +246,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2traddressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter();
vecs.p2traddressindex_to_p2trbytes.iter()?;
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -304,19 +261,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2wpkhaddressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter();
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter()?;
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -324,19 +276,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2wshaddressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2wshaddressindex_to_p2wshbytes_iter =
vecs.p2wshaddressindex_to_p2wshbytes.iter();
vecs.p2wshaddressindex_to_p2wshbytes.iter()?;
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -344,18 +291,14 @@ impl Stores {
}
}
if let Some(mut index) = vecs
if let Ok(mut index) = vecs
.height_to_first_p2aaddressindex
.iter()
.get(starting_indexes.height)
.map(Cow::into_owned)
.one_shot_read(starting_indexes.height)
{
let mut p2aaddressindex_to_p2abytes_iter = vecs.p2aaddressindex_to_p2abytes.iter();
let mut p2aaddressindex_to_p2abytes_iter =
vecs.p2aaddressindex_to_p2abytes.iter()?;
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter
.get(index)
.map(Cow::into_owned)
{
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter.get(index) {
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from(&bytes);
self.addressbyteshash_to_typeindex.remove(hash);
@@ -363,15 +306,20 @@ impl Stores {
}
}
} else {
self.blockhashprefix_to_height.reset()?;
self.addressbyteshash_to_typeindex.reset()?;
unreachable!();
// self.blockhashprefix_to_height.reset()?;
// self.addressbyteshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {
vecs.txindex_to_txid
.iter_at(starting_indexes.txindex)
.iter()?
.enumerate()
.skip(starting_indexes.txindex.to_usize())
.for_each(|(txindex, txid)| {
let txidprefix = TxidPrefix::from(&txid.into_owned());
let txindex = TxIndex::from(txindex);
let txidprefix = TxidPrefix::from(&txid);
// "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
let is_not_first_dup = txindex != TxIndex::new(142783)
@@ -386,32 +334,80 @@ impl Stores {
}
});
} else {
self.txidprefix_to_txindex.reset()?;
unreachable!();
// self.txidprefix_to_txindex.reset()?;
}
if starting_indexes.txoutindex != TxOutIndex::ZERO {
// todo!();
// let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.into_iter();
// vecs.txoutindex_to_outputtype
// .iter_at(starting_indexes.txoutindex)
// .filter(|(_, outputtype)| outputtype.is_address())
// .for_each(|(txoutindex, outputtype)| {
// let outputtype = outputtype.into_owned();
let mut txoutindex_to_txindex_iter = vecs.txoutindex_to_txindex.iter()?;
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
vecs.txoutindex_to_outputtype
.iter()?
.enumerate()
.skip(starting_indexes.txoutindex.to_usize())
.zip(
vecs.txoutindex_to_typeindex
.iter()?
.skip(starting_indexes.txoutindex.to_usize()),
)
.filter(|((_, outputtype), _)| outputtype.is_address())
.for_each(|((txoutindex, outputtype), typeindex)| {
let txindex = txoutindex_to_txindex_iter.unsafe_get_(txoutindex);
// let typeindex = txoutindex_to_typeindex_iter.unwrap_get_inner(txoutindex);
let vout = Vout::from(
txoutindex.to_usize()
- txindex_to_first_txoutindex_iter
.unsafe_get(txindex)
.to_usize(),
);
let outpoint = OutPoint::new(txindex, vout);
// self.addresstype_to_typeindex_and_unspentoutpoint
// .get_mut(outputtype)
// .unwrap()
// .remove(TypeIndexAndTxIndex::from((typeindex, txoutindex)));
// });
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.remove(TypeIndexAndOutPoint::from((typeindex, outpoint)));
});
// Add back outputs that were spent after the rollback point
let mut txindex_to_first_txoutindex_iter = vecs.txindex_to_first_txoutindex.iter()?;
let mut txoutindex_to_outputtype_iter = vecs.txoutindex_to_outputtype.iter()?;
let mut txoutindex_to_typeindex_iter = vecs.txoutindex_to_typeindex.iter()?;
vecs.txinindex_to_outpoint
.iter()?
.skip(starting_indexes.txinindex.to_usize())
.for_each(|outpoint| {
if outpoint.is_coinbase() {
return;
}
let txindex = outpoint.txindex();
let vout = outpoint.vout();
// Calculate txoutindex from txindex and vout
let txoutindex = txindex_to_first_txoutindex_iter.unsafe_get(txindex) + vout;
// Only process if this output was created before the rollback point
if txoutindex < starting_indexes.txoutindex {
let outputtype = txoutindex_to_outputtype_iter.unsafe_get(txoutindex);
if outputtype.is_address() {
let typeindex = txoutindex_to_typeindex_iter.unsafe_get(txoutindex);
self.addresstype_to_typeindex_and_unspentoutpoint
.get_mut(outputtype)
.unwrap()
.insert(TypeIndexAndOutPoint::from((typeindex, outpoint)), Unit);
}
}
});
} else {
self.addresstype_to_typeindex_and_txindex
.iter_mut()
.try_for_each(|s| s.reset())?;
self.addresstype_to_typeindex_and_unspentoutpoint
.iter_mut()
.try_for_each(|s| s.reset())?;
unreachable!();
// self.addresstype_to_typeindex_and_txindex
// .iter_mut()
// .try_for_each(|s| s.reset())?;
// self.addresstype_to_typeindex_and_unspentoutpoint
// .iter_mut()
// .try_for_each(|s| s.reset())?;
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;
+1 -1
View File
@@ -23,7 +23,7 @@ pub fn init(path: Option<&Path>) -> io::Result<()> {
});
Builder::from_env(Env::default().default_filter_or(
"info,bitcoin=off,bitcoincore-rpc=off,fjall=off,lsm_tree=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off",
"debug,bitcoin=off,bitcoincore-rpc=off,rolldown=off,rolldown=off,rmcp=off,brk_rmcp=off,tracing=off,aide=off,brk_aide=off",
))
.format(move |buf, record| {
let date_time = Timestamp::now()
+2 -1
View File
@@ -18,7 +18,8 @@ byteview6 = { version = "=0.6.1", package = "byteview" }
byteview8 = { version = "~0.8.0", package = "byteview" }
candystore = "0.5.5"
fjall2 = { workspace = true }
# fjall3 = { workspace = true }
fjall3 = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
redb = { workspace = true }
rustc-hash = { workspace = true }
-2
View File
@@ -3,8 +3,6 @@ use brk_types::{Height, Version};
pub trait AnyStore: Send + Sync {
fn commit(&mut self, height: Height) -> Result<()>;
fn persist(&self) -> Result<()>;
// fn reset(&mut self) -> Result<()>;
fn name(&self) -> &'static str;
fn height(&self) -> Option<Height>;
fn has(&self, height: Height) -> bool;
@@ -4,8 +4,8 @@ use brk_error::Result;
use brk_types::{Height, Version};
use byteview6::ByteView;
use fjall2::{
InnerItem, PartitionCreateOptions, PersistMode, TransactionalKeyspace,
TransactionalPartitionHandle, ValueType,
InnerItem, PartitionCreateOptions, TransactionalKeyspace, TransactionalPartitionHandle,
ValueType,
};
use rustc_hash::{FxHashMap, FxHashSet};
@@ -16,7 +16,7 @@ mod meta;
use meta::*;
#[derive(Clone)]
pub struct StoreV2<Key, Value> {
pub struct StoreFjallV2<Key, Value> {
meta: StoreMeta,
name: &'static str,
keyspace: TransactionalKeyspace,
@@ -33,7 +33,7 @@ pub fn open_keyspace(path: &Path) -> fjall2::Result<TransactionalKeyspace> {
.open_transactional()
}
impl<K, V> StoreV2<K, V>
impl<K, V> StoreFjallV2<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
@@ -163,7 +163,7 @@ where
}
}
impl<K, V> AnyStore for StoreV2<K, V>
impl<K, V> AnyStore for StoreFjallV2<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
@@ -200,12 +200,6 @@ where
Ok(())
}
fn persist(&self) -> Result<()> {
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn name(&self) -> &'static str {
self.name
}
@@ -227,7 +221,7 @@ where
}
}
pub enum Item<K, V> {
enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
@@ -5,7 +5,7 @@ use std::{
use brk_error::Result;
use brk_types::Version;
use fjall3::{PersistMode, TxDatabase, TxKeyspace};
use fjall3::{TxDatabase, TxKeyspace};
use super::Height;
@@ -18,7 +18,7 @@ pub struct StoreMeta {
impl StoreMeta {
pub fn checked_open<F>(
database: &TxDatabase,
_database: &TxDatabase,
path: &Path,
version: Version,
open_partition_handle: F,
@@ -28,18 +28,18 @@ impl StoreMeta {
{
fs::create_dir_all(path)?;
let mut partition = open_partition_handle()?;
let partition = open_partition_handle()?;
if Version::try_from(Self::path_version_(path).as_path())
.is_ok_and(|prev_version| version != prev_version)
{
todo!();
fs::remove_dir_all(path)?;
// Doesn't exist
// database.delete_partition(partition)?;
fs::create_dir(path)?;
database.persist(PersistMode::SyncAll)?;
partition = open_partition_handle()?;
// fs::remove_dir_all(path)?;
// // Doesn't exist
// // database.delete_partition(partition)?;
// fs::create_dir(path)?;
// database.persist(PersistMode::SyncAll)?;
// partition = open_partition_handle()?;
}
let slf = Self {
@@ -62,10 +62,6 @@ impl StoreMeta {
height.write(&self.path_height())
}
pub fn reset(&mut self) {
self.height.take();
}
pub fn path(&self) -> &Path {
&self.pathbuf
}
+289
View File
@@ -0,0 +1,289 @@
use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, mem, path::Path};
use brk_error::Result;
use brk_types::{Height, Version};
use byteview8::ByteView;
use fjall3::{
KeyspaceCreateOptions, TxDatabase, TxKeyspace, ValueType,
config::{BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry},
};
mod meta;
use meta::*;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
#[derive(Clone)]
pub struct StoreFjallV3<Key, Value> {
meta: StoreMeta,
name: &'static str,
database: TxDatabase,
keyspace: TxKeyspace,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::new(3);
pub fn open_fjall2_database(path: &Path) -> fjall3::Result<TxDatabase> {
TxDatabase::builder(path.join("fjall"))
.max_write_buffer_size(32 * 1024 * 1024)
.cache_size(1024 * 1024 * 1024)
.open()
}
impl<K, V> StoreFjallV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
{
fn open_keyspace(
database: &TxDatabase,
name: &str,
bloom_filters: Option<bool>,
) -> Result<TxKeyspace> {
let mut options = KeyspaceCreateOptions::default()
.manual_journal_persist(true)
.max_memtable_size(8 * 1024 * 1024);
if bloom_filters.is_some_and(|b| !b) {
options = options.filter_policy(FilterPolicy::disabled());
} else {
options = options.filter_policy(FilterPolicy::all(FilterPolicyEntry::Bloom(
BloomConstructionPolicy::BitsPerKey(5.0),
)));
}
database.keyspace(name, options).map_err(|e| e.into())
}
pub fn import(
database: &TxDatabase,
path: &Path,
name: &str,
version: Version,
bloom_filters: Option<bool>,
) -> Result<Self> {
fs::create_dir_all(path)?;
let (meta, keyspace) = StoreMeta::checked_open(
database,
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_keyspace(database, name, bloom_filters).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
},
)?;
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
database: database.clone(),
keyspace,
puts: FxHashMap::default(),
dels: FxHashSet::default(),
})
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
// } else if let Some(slice) = self
// .database
// .read_tx()
// .get(&self.keyspace, ByteView::from(key))?
// {
} else if let Some(slice) = self.keyspace.get(ByteView::from(key))? {
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
}
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
self.database
.read_tx()
.is_empty(&self.keyspace)
.map_err(|e| e.into())
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
self.insert(key, value);
}
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn remove(&mut self, key: K) {
// Hot path: key was recently inserted
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for StoreFjallV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn commit(&mut self, height: Height) -> Result<()> {
if self.has(height) {
return Ok(());
}
self.meta.export(height)?;
if self.puts.is_empty() && self.dels.is_empty() {
return Ok(());
}
let mut batch = self.database.inner().batch();
let mut items = mem::take(&mut self.puts)
.into_iter()
.map(|(key, value)| Item::Value { key, value })
.chain(
mem::take(&mut self.dels)
.into_iter()
.map(|key| Item::Tomb(key)),
)
.collect::<Vec<_>>();
items.sort_unstable();
batch.data = items
.into_iter()
.map(|i| i.fjall(&self.keyspace))
.collect::<Vec<_>>();
batch.commit_keyspace(self.keyspace.inner())?;
// let mut wtx = self.database.write_tx();
// let mut dels = self.dels.drain().collect::<Vec<_>>();
// dels.sort_unstable();
// dels.into_iter()
// .for_each(|key| wtx.remove(&self.keyspace, ByteView::from(key)));
// let mut puts = self.puts.drain().collect::<Vec<_>>();
// puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
// puts.into_iter().for_each(|(key, value)| {
// wtx.insert(&self.keyspace, ByteView::from(key), ByteView::from(value))
// });
// wtx.commit()?;
Ok(())
}
fn name(&self) -> &'static str {
self.name
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
}
enum Item<K, V> {
Value { key: K, value: V },
Tomb(K),
}
impl<K: Ord, V> Ord for Item<K, V> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.key().cmp(other.key())
}
}
impl<K: Ord, V> PartialOrd for Item<K, V> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<K: Eq, V> PartialEq for Item<K, V> {
fn eq(&self, other: &Self) -> bool {
self.key() == other.key()
}
}
impl<K: Eq, V> Eq for Item<K, V> {}
impl<K, V> Item<K, V> {
fn key(&self) -> &K {
match self {
Self::Value { key, .. } | Self::Tomb(key) => key,
}
}
pub fn fjall(self, keyspace: &TxKeyspace) -> fjall3::Item
where
K: Into<ByteView>,
V: Into<ByteView>,
{
match self {
Item::Value { key, value } => fjall3::Item {
keyspace_id: keyspace.inner().id,
key: key.into().into(),
value: value.into().into(),
value_type: ValueType::Value,
},
Item::Tomb(key) => fjall3::Item {
keyspace_id: keyspace.inner().id,
key: key.into().into(),
value: [].into(),
value_type: ValueType::WeakTombstone,
},
}
}
}
+6 -4
View File
@@ -1,9 +1,11 @@
#![doc = include_str!("../README.md")]
mod any;
mod v2;
// mod v3;
mod fjall_v2;
mod fjall_v3;
mod redb;
pub use any::*;
pub use v2::*;
// pub use v3::*;
pub use fjall_v2::*;
pub use fjall_v3::*;
pub use redb::*;
+77
View File
@@ -0,0 +1,77 @@
use std::{
fs, io,
path::{Path, PathBuf},
};
use brk_error::Result;
use brk_types::Version;
use super::Height;
#[derive(Debug, Clone)]
pub struct StoreMeta {
pathbuf: PathBuf,
version: Version,
height: Option<Height>,
}
impl StoreMeta {
pub fn checked_open(path: &Path, version: Version) -> Result<Self> {
fs::create_dir_all(path)?;
if Version::try_from(Self::path_version_(path).as_path())
.is_ok_and(|prev_version| version != prev_version)
{
todo!();
}
let slf = Self {
pathbuf: path.to_owned(),
version,
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
};
slf.version.write(&slf.path_version())?;
Ok(slf)
}
pub fn version(&self) -> Version {
self.version
}
pub fn export(&mut self, height: Height) -> io::Result<()> {
self.height = Some(height);
height.write(&self.path_height())
}
pub fn path(&self) -> &Path {
&self.pathbuf
}
fn path_version(&self) -> PathBuf {
Self::path_version_(&self.pathbuf)
}
fn path_version_(path: &Path) -> PathBuf {
path.join("version")
}
#[inline]
pub fn height(&self) -> Option<Height> {
self.height
}
#[inline]
pub fn needs(&self, height: Height) -> bool {
self.height.is_none_or(|self_height| height > self_height)
}
#[inline]
pub fn has(&self, height: Height) -> bool {
!self.needs(height)
}
fn path_height(&self) -> PathBuf {
Self::path_height_(&self.pathbuf)
}
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
}
}
+232
View File
@@ -0,0 +1,232 @@
use std::{
borrow::{Borrow, Cow},
fmt::Debug,
fs,
hash::Hash,
mem::{self, transmute},
path::Path,
sync::Arc,
};
use brk_error::Result;
use brk_types::{Height, Version};
use parking_lot::RwLock;
use redb::{
Builder, Database, Durability, Key, ReadOnlyTable, ReadableDatabase, ReadableTableMetadata,
TableDefinition, Value,
};
mod meta;
use meta::*;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
#[derive(Clone)]
pub struct StoreRedb<K, V>
where
K: Key + 'static,
V: Value + 'static,
{
meta: StoreMeta,
name: &'static str,
db: Arc<Database>,
table: Arc<RwLock<Option<ReadOnlyTable<K, V>>>>,
puts: FxHashMap<K, V>,
dels: FxHashSet<K>,
}
const MAJOR_FJALL_VERSION: Version = Version::new(3);
pub fn open_redb_database(path: &Path) -> redb::Result<Database> {
let db = Builder::new()
.set_cache_size(4 * 1024 * 1024 * 1024)
.create(path.join("store.redb"))
.unwrap();
Ok(db)
}
impl<K, V> StoreRedb<K, V>
where
K: Key + Ord + Eq + Hash + 'static,
V: Value + Clone + 'static,
{
pub fn import(
db: &Arc<Database>,
path: &Path,
name: &str,
version: Version,
_bloom_filters: Option<bool>,
) -> Result<Self> {
fs::create_dir_all(path)?;
let meta = StoreMeta::checked_open(
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
)?;
{
let mut wtx = db.begin_write().unwrap();
wtx.set_durability(Durability::Immediate).unwrap();
let definition: TableDefinition<K, V> = TableDefinition::new(name);
let table = wtx.open_table(definition).unwrap();
drop(table);
wtx.commit().unwrap();
}
let definition: TableDefinition<K, V> = TableDefinition::new(name);
let table = db.begin_read().unwrap().open_table(definition).unwrap();
Ok(Self {
db: db.clone(),
meta,
name: Box::leak(Box::new(name.to_string())),
table: Arc::new(RwLock::new(Some(table))),
puts: FxHashMap::default(),
dels: FxHashSet::default(),
})
}
// In case my hack doesn't work:
// https://github.com/cberner/redb/issues/869
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
&'a K: Borrow<K::SelfType<'a>>,
V: From<V::SelfType<'static>>,
{
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(value) = self.table.read().as_ref().unwrap().get(key).unwrap() {
let selftype: <V as Value>::SelfType<'static> = unsafe { transmute(value.value()) };
let owned: V = selftype.into();
Ok(Some(Cow::Owned(owned)))
} else {
Ok(None)
}
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
Ok(self.table.read().as_ref().unwrap().len().unwrap() == 0)
}
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
self.insert(key, value);
}
}
#[inline]
pub fn insert(&mut self, key: K, value: V) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
#[inline]
pub fn remove(&mut self, key: K) {
// Hot path: key was recently inserted
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for StoreRedb<K, V>
where
// K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
// V: Debug + Clone + From<ByteView>,
K: Debug + Clone + Key + Ord + Eq + Hash + 'static + Borrow<K::SelfType<'static>>,
V: Debug + Clone + Value + 'static + Borrow<V::SelfType<'static>>,
// ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn commit(&mut self, height: Height) -> Result<()> {
if self.has(height) {
return Ok(());
}
self.meta.export(height)?;
if self.puts.is_empty() && self.dels.is_empty() {
return Ok(());
}
// let mut _rtx_lock = self._rtx.write();
// drop(_rtx_lock.take());
let mut table_lock = self.table.write();
drop(table_lock.take());
let mut wtx = self.db.begin_write().unwrap();
wtx.set_durability(Durability::Immediate).unwrap();
let definition: TableDefinition<K, V> = TableDefinition::new(self.name);
let mut table = wtx.open_table(definition).unwrap();
mem::take(&mut self.puts)
.into_iter()
.for_each(|(key, value)| {
table.insert(key, value).unwrap();
});
mem::take(&mut self.dels).into_iter().for_each(|key| {
table.remove(key).unwrap();
});
drop(table);
wtx.commit().unwrap();
table_lock.replace(
self.db
.begin_read()
.unwrap()
.open_table(definition)
.unwrap(),
);
Ok(())
}
fn name(&self) -> &'static str {
self.name
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
}
-256
View File
@@ -1,256 +0,0 @@
use std::{borrow::Cow, fmt::Debug, fs, hash::Hash, path::Path, sync::Arc};
use brk_error::Result;
use brk_types::{Height, Version};
use byteview8::ByteView;
use fjall3::{KeyspaceCreateOptions, PersistMode, TxDatabase, TxKeyspace};
mod meta;
use log::info;
use meta::*;
use parking_lot::RwLock;
use rustc_hash::{FxHashMap, FxHashSet};
use crate::any::AnyStore;
#[derive(Clone)]
pub struct StoreV3<Key, Value> {
meta: StoreMeta,
name: &'static str,
database: TxDatabase,
keyspace: Arc<RwLock<Option<TxKeyspace>>>,
puts: FxHashMap<Key, Value>,
dels: FxHashSet<Key>,
}
const MAJOR_FJALL_VERSION: Version = Version::new(3);
pub fn open_database(path: &Path) -> fjall3::Result<TxDatabase> {
TxDatabase::builder(path.join("fjall"))
.cache_size(4 * 1024 * 1024 * 1024)
// .max_write_buffer_size(bytes)
.open()
}
impl<K, V> StoreV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
{
fn open_keyspace(database: &TxDatabase, name: &str) -> Result<TxKeyspace> {
database
.keyspace(
name,
KeyspaceCreateOptions::default().max_memtable_size(8 * 1024 * 1024), // .manual_journal_persist(true),
)
.map_err(|e| e.into())
}
pub fn import(
database: &TxDatabase,
path: &Path,
name: &str,
version: Version,
_bloom_filters: Option<bool>,
) -> Result<Self> {
fs::create_dir_all(path)?;
let (meta, keyspace) = StoreMeta::checked_open(
database,
&path.join(format!("meta/{name}")),
MAJOR_FJALL_VERSION + version,
|| {
Self::open_keyspace(database, name).inspect_err(|e| {
eprintln!("{e}");
eprintln!("Delete {path:?} and try again");
})
},
)?;
Ok(Self {
meta,
name: Box::leak(Box::new(name.to_string())),
database: database.clone(),
keyspace: Arc::new(RwLock::new(Some(keyspace))),
puts: FxHashMap::default(),
dels: FxHashSet::default(),
})
}
#[inline]
pub fn get<'a>(&'a self, key: &'a K) -> Result<Option<Cow<'a, V>>>
where
ByteView: From<&'a K>,
{
if let Some(v) = self.puts.get(key) {
Ok(Some(Cow::Borrowed(v)))
} else if let Some(slice) = self
.database
.read_tx()
.get(self.keyspace.read().as_ref().unwrap(), ByteView::from(key))?
{
Ok(Some(Cow::Owned(V::from(ByteView::from(slice)))))
} else {
Ok(None)
}
}
#[inline]
pub fn is_empty(&self) -> Result<bool> {
self.database
.read_tx()
.is_empty(self.keyspace.read().as_ref().unwrap())
.map_err(|e| e.into())
}
// pub fn iter(&self) -> impl Iterator<Item = (K, V)> {
// let keyspace = self.keyspace.read().as_ref().unwrap();
// self.rtx
// .read()
// .as_ref()
// .unwrap()
// .iter(keyspace)
// .map(|res| res.into_inner().unwrap())
// .map(|(k, v)| (K::from(ByteView::from(k)), V::from(ByteView::from(v))))
// }
#[inline]
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
let _ = self.dels.is_empty() || self.dels.remove(&key);
self.puts.insert(key, value);
}
}
#[inline]
pub fn remove(&mut self, key: K) {
// Hot path: key was recently inserted
if self.puts.remove(&key).is_some() {
return;
}
let newly_inserted = self.dels.insert(key);
debug_assert!(newly_inserted, "Double deletion at {:?}", self.meta.path());
}
#[inline]
pub fn remove_if_needed(&mut self, key: K, height: Height) {
if self.needs(height) {
self.remove(key)
}
}
// pub fn retain_or_del<F>(&mut self, retain: F)
// where
// F: Fn(&K, &mut V) -> bool,
// {
// self.puts.retain(|k, v| {
// let ret = retain(k, v);
// if !ret {
// self.dels.insert(k.clone());
// }
// ret
// });
// }
#[inline]
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
#[inline]
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<K, V> AnyStore for StoreV3<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + Eq + Hash,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<V>,
Self: Send + Sync,
{
fn commit(&mut self, height: Height) -> Result<()> {
if self.has(height) {
return Ok(());
}
self.meta.export(height)?;
if self.puts.is_empty() && self.dels.is_empty() {
return Ok(());
}
let mut wtx = self.database.write_tx();
let keyspace = self.keyspace.read();
let partition = keyspace.as_ref().unwrap();
let mut dels = self.dels.drain().collect::<Vec<_>>();
dels.sort_unstable();
dels.into_iter()
.for_each(|key| wtx.remove(partition, ByteView::from(key)));
let mut puts = self.puts.drain().collect::<Vec<_>>();
puts.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
puts.into_iter().for_each(|(key, value)| {
wtx.insert(partition, ByteView::from(key), ByteView::from(value))
});
wtx.commit()?;
Ok(())
}
fn persist(&self) -> Result<()> {
self.database
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn name(&self) -> &'static str {
self.name
}
// fn reset(&mut self) -> Result<()> {
// info!("Resetting {}...", self.name);
// todo!();
// let mut opt = self.keyspace.write();
// let keyspace = opt.take().unwrap();
// // Doesn't exist yet
// // self.database.remove_keyspace(keyspace)?;
// self.meta.reset();
// let keyspace = Self::open_keyspace(&self.database, self.name)?;
// opt.replace(keyspace);
// Ok(())
// }
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn has(&self, height: Height) -> bool {
self.has(height)
}
fn needs(&self, height: Height) -> bool {
self.needs(height)
}
fn version(&self) -> Version {
self.meta.version()
}
}
+1
View File
@@ -19,6 +19,7 @@ itoa = "1.0.15"
jiff = { workspace = true }
num_enum = "0.7.5"
rapidhash = "4.1.1"
redb = { workspace = true }
ryu = "1.0.20"
schemars = { workspace = true }
serde = { workspace = true }
+40
View File
@@ -1,5 +1,8 @@
use std::{cmp::Ordering, mem};
use byteview::ByteView;
use derive_deref::Deref;
use redb::{Key, TypeName, Value};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use super::AddressBytes;
@@ -48,3 +51,40 @@ impl From<&AddressBytesHash> for ByteView {
Self::new(value.as_bytes())
}
}
impl Value for AddressBytesHash {
type SelfType<'a> = AddressBytesHash;
type AsBytes<'a>
= [u8; mem::size_of::<u64>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u64>())
}
fn from_bytes<'a>(data: &'a [u8]) -> AddressBytesHash
where
Self: 'a,
{
AddressBytesHash(u64::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u64>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("AddressBytesHash")
}
}
impl Key for AddressBytesHash {
fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
}
}
+40
View File
@@ -1,5 +1,8 @@
use std::{cmp::Ordering, mem};
use byteview::ByteView;
use derive_deref::Deref;
use redb::{Key, TypeName, Value};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::copy_first_8bytes;
@@ -57,3 +60,40 @@ impl From<BlockHashPrefix> for ByteView {
Self::from(&value)
}
}
impl Value for BlockHashPrefix {
type SelfType<'a> = BlockHashPrefix;
type AsBytes<'a>
= [u8; mem::size_of::<u64>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u64>())
}
fn from_bytes<'a>(data: &'a [u8]) -> BlockHashPrefix
where
Self: 'a,
{
BlockHashPrefix(u64::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u64>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("BlockHashPrefix")
}
}
impl Key for BlockHashPrefix {
fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
}
}
+40
View File
@@ -1,11 +1,14 @@
use std::{
cmp::Ordering,
fmt::Debug,
mem,
ops::{Add, AddAssign, Rem},
};
use allocative::Allocative;
use byteview::ByteView;
use derive_deref::Deref;
use redb::{Key, TypeName, Value};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use vecdb::{CheckedSub, PrintableIndex, Stamp, StoredCompressed};
@@ -280,3 +283,40 @@ impl std::fmt::Display for Height {
f.write_str(str)
}
}
impl Value for Height {
type SelfType<'a> = Height;
type AsBytes<'a>
= [u8; mem::size_of::<u32>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u32>())
}
fn from_bytes<'a>(data: &'a [u8]) -> Height
where
Self: 'a,
{
Height(u32::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u32>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("Height")
}
}
impl Key for Height {
fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
}
}
+35 -1
View File
@@ -1,7 +1,8 @@
use std::borrow::Cow;
use std::{borrow::Cow, str};
use byteview::ByteView;
use derive_deref::Deref;
use redb::{TypeName, Value};
use serde::Serialize;
use vecdb::PrintableIndex;
@@ -67,3 +68,36 @@ impl PrintableIndex for StoredString {
&["string"]
}
}
impl Value for StoredString {
type SelfType<'a>
= StoredString
where
Self: 'a;
type AsBytes<'a>
= &'a str
where
Self: 'a;
fn fixed_width() -> Option<usize> {
None
}
fn from_bytes<'a>(data: &'a [u8]) -> StoredString
where
Self: 'a,
{
StoredString(str::from_utf8(data).unwrap().to_string())
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a str
where
Self: 'b,
{
value.as_str()
}
fn type_name() -> TypeName {
TypeName::new("StoredString")
}
}
+40
View File
@@ -1,5 +1,8 @@
use std::{cmp::Ordering, mem};
use byteview::ByteView;
use derive_deref::Deref;
use redb::{Key, TypeName, Value};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::copy_first_8bytes;
@@ -64,3 +67,40 @@ impl From<[u8; 8]> for TxidPrefix {
Self(u64::from_ne_bytes(value))
}
}
impl Value for TxidPrefix {
type SelfType<'a> = TxidPrefix;
type AsBytes<'a>
= [u8; mem::size_of::<u64>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u64>())
}
fn from_bytes<'a>(data: &'a [u8]) -> TxidPrefix
where
Self: 'a,
{
TxidPrefix(u64::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u64>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("TxidPrefix")
}
}
impl Key for TxidPrefix {
fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
}
}
+36 -1
View File
@@ -1,8 +1,12 @@
use std::ops::{Add, AddAssign};
use std::{
mem,
ops::{Add, AddAssign},
};
use allocative::Allocative;
use byteview::ByteView;
use derive_deref::{Deref, DerefMut};
use redb::{TypeName, Value};
use schemars::JsonSchema;
use serde::Serialize;
use vecdb::{CheckedSub, PrintableIndex, StoredCompressed};
@@ -166,3 +170,34 @@ impl std::fmt::Display for TxIndex {
f.write_str(str)
}
}
impl Value for TxIndex {
type SelfType<'a> = TxIndex;
type AsBytes<'a>
= [u8; mem::size_of::<u32>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u32>())
}
fn from_bytes<'a>(data: &'a [u8]) -> TxIndex
where
Self: 'a,
{
TxIndex(u32::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u32>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("TxIndex")
}
}
+33 -1
View File
@@ -1,6 +1,7 @@
use std::ops::Add;
use std::{mem, ops::Add};
use byteview::ByteView;
use redb::{TypeName, Value};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use vecdb::{CheckedSub, StoredCompressed};
@@ -150,3 +151,34 @@ impl std::fmt::Display for TypeIndex {
f.write_str(str)
}
}
impl Value for TypeIndex {
type SelfType<'a> = TypeIndex;
type AsBytes<'a>
= [u8; mem::size_of::<u32>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u32>())
}
fn from_bytes<'a>(data: &'a [u8]) -> TypeIndex
where
Self: 'a,
{
TypeIndex(u32::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u32>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("TypeIndex")
}
}
+51 -1
View File
@@ -1,6 +1,10 @@
use std::hash::{Hash, Hasher};
use std::{
cmp::Ordering,
hash::{Hash, Hasher},
};
use byteview::ByteView;
use redb::{Key, TypeName, Value};
use serde::Serialize;
use zerocopy::IntoBytes;
@@ -62,3 +66,49 @@ impl From<&TypeIndexAndOutPoint> for ByteView {
)
}
}
impl Value for TypeIndexAndOutPoint {
type SelfType<'a> = TypeIndexAndOutPoint;
type AsBytes<'a>
= [u8; 10]
// 8 bytes (u64) + 2 bytes (u16)
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(10) // 8 + 2
}
fn from_bytes<'a>(data: &'a [u8]) -> TypeIndexAndOutPoint
where
Self: 'a,
{
TypeIndexAndOutPoint {
typeindexandtxindex: TypeIndexAndTxIndex::from_bytes(&data[0..8]),
vout: Vout::from_bytes(&data[8..10]),
}
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 10]
where
Self: 'a,
Self: 'b,
{
let mut bytes = [0u8; 10];
bytes[0..8].copy_from_slice(&<TypeIndexAndTxIndex as redb::Value>::as_bytes(
&value.typeindexandtxindex,
));
bytes[8..10].copy_from_slice(&<Vout as redb::Value>::as_bytes(&value.vout));
bytes
}
fn type_name() -> TypeName {
TypeName::new("TypeIndexAndOutPoint")
}
}
impl Key for TypeIndexAndOutPoint {
fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
}
}
@@ -1,4 +1,7 @@
use std::{cmp::Ordering, mem};
use byteview::ByteView;
use redb::{Key, TypeName, Value};
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -78,3 +81,40 @@ impl From<TypeIndexAndTxIndex> for u64 {
value.0
}
}
impl Value for TypeIndexAndTxIndex {
type SelfType<'a> = TypeIndexAndTxIndex;
type AsBytes<'a>
= [u8; mem::size_of::<u64>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u64>())
}
fn from_bytes<'a>(data: &'a [u8]) -> TypeIndexAndTxIndex
where
Self: 'a,
{
TypeIndexAndTxIndex(u64::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u64>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("TypeIndexAndTxIndex")
}
}
impl Key for TypeIndexAndTxIndex {
fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
}
}
+36
View File
@@ -1,4 +1,5 @@
use byteview::ByteView;
use redb::{TypeName, Value};
#[derive(Debug, Clone)]
pub struct Unit;
@@ -15,3 +16,38 @@ impl From<Unit> for ByteView {
Self::new(&[])
}
}
impl Value for Unit {
type SelfType<'a>
= Unit
where
Self: 'a;
type AsBytes<'a>
= &'a [u8]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(0)
}
#[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
fn from_bytes<'a>(_data: &'a [u8]) -> Unit
where
Self: 'a,
{
Unit
}
#[allow(clippy::ignored_unit_patterns)]
fn as_bytes<'a, 'b: 'a>(_: &'a Self::SelfType<'b>) -> &'a [u8]
where
Self: 'b,
{
&[]
}
fn type_name() -> TypeName {
TypeName::new("Unit")
}
}
+34
View File
@@ -1,5 +1,8 @@
use std::mem;
use allocative::Allocative;
use derive_deref::Deref;
use redb::{TypeName, Value};
use schemars::JsonSchema;
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -107,3 +110,34 @@ impl std::fmt::Display for Vout {
self.0.fmt(f)
}
}
impl Value for Vout {
type SelfType<'a> = Vout;
type AsBytes<'a>
= [u8; mem::size_of::<u16>()]
where
Self: 'a;
fn fixed_width() -> Option<usize> {
Some(mem::size_of::<u16>())
}
fn from_bytes<'a>(data: &'a [u8]) -> Vout
where
Self: 'a,
{
Vout(u16::from_le_bytes(data.try_into().unwrap()))
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; mem::size_of::<u16>()]
where
Self: 'a,
Self: 'b,
{
value.0.to_le_bytes()
}
fn type_name() -> TypeName {
TypeName::new("Vout")
}
}