diff --git a/crates/brk_core/src/structs/height.rs b/crates/brk_core/src/structs/height.rs index 3e5b3573d..23cae0d5a 100644 --- a/crates/brk_core/src/structs/height.rs +++ b/crates/brk_core/src/structs/height.rs @@ -28,8 +28,8 @@ use crate::CheckedSub; pub struct Height(u32); impl Height { - pub const ZERO: Self = Height(0); - pub const MAX: Self = Height(u32::MAX); + pub const ZERO: Self = Self(0); + pub const MAX: Self = Self(u32::MAX); pub fn new(height: u32) -> Self { Self(height) diff --git a/crates/brk_core/src/structs/txindex.rs b/crates/brk_core/src/structs/txindex.rs index 5ad4c19a1..2095713c4 100644 --- a/crates/brk_core/src/structs/txindex.rs +++ b/crates/brk_core/src/structs/txindex.rs @@ -29,6 +29,12 @@ use super::StoredU32; pub struct TxIndex(u32); impl TxIndex { + pub const ZERO: Self = Self(0); + + pub fn new(txindex: u32) -> Self { + Self(txindex) + } + pub fn incremented(self) -> Self { Self(*self + 1) } diff --git a/crates/brk_core/src/structs/weight.rs b/crates/brk_core/src/structs/weight.rs index a4975ea4a..7ec7bd5c8 100644 --- a/crates/brk_core/src/structs/weight.rs +++ b/crates/brk_core/src/structs/weight.rs @@ -19,36 +19,29 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; FromBytes, Serialize, )] -pub struct Weight(u32); +pub struct Weight(u64); impl From for Weight { fn from(value: bitcoin::Weight) -> Self { - let wu = value.to_wu(); - if wu > u32::MAX as u64 { - unreachable!("wu is too big, shouldn't happen") - } - Self(wu as u32) + Self(value.to_wu()) } } impl From for bitcoin::Weight { fn from(value: Weight) -> Self { - Self::from_wu(*value as u64) + Self::from_wu(value.0) } } impl From for Weight { fn from(value: usize) -> Self { - if value > u32::MAX as usize { - panic!() - } - Self(value as u32) + Self(value as u64) } } impl From for Weight { fn from(value: f64) -> Self { - Self(value as u32) + Self(value as u64) } } diff --git a/crates/brk_indexer/src/stores/base.rs b/crates/brk_indexer/src/stores/base.rs index 1bfbd4e2d..024a1f212 100644 --- a/crates/brk_indexer/src/stores/base.rs +++ b/crates/brk_indexer/src/stores/base.rs @@ -10,7 +10,7 @@ use brk_core::Height; use brk_vec::{Value, Version}; use byteview::ByteView; use fjall::{ - PartitionCreateOptions, ReadTransaction, Result, TransactionalKeyspace, + PartitionCreateOptions, PersistMode, ReadTransaction, Result, TransactionalKeyspace, TransactionalPartitionHandle, }; use zerocopy::{Immutable, IntoBytes}; @@ -19,6 +19,7 @@ use super::StoreMeta; pub struct Store { meta: StoreMeta, + name: String, keyspace: TransactionalKeyspace, partition: TransactionalPartitionHandle, rtx: ReadTransaction, @@ -58,6 +59,7 @@ where Ok(Self { meta, + name: name.to_owned(), keyspace, partition, rtx, @@ -177,6 +179,13 @@ where .manual_journal_persist(true), ) } + + pub fn reset_partition(&mut self) -> Result<()> { + self.keyspace.delete_partition(self.partition.clone())?; + self.keyspace.persist(PersistMode::SyncAll)?; + self.partition = Self::open_partition_handle(&self.keyspace, &self.name)?; + Ok(()) + } } impl Clone for Store @@ -187,6 +196,7 @@ where fn clone(&self) -> Self { Self { meta: self.meta.clone(), + name: self.name.clone(), keyspace: self.keyspace.clone(), partition: self.partition.clone(), rtx: self.keyspace.read_tx(), diff --git a/crates/brk_indexer/src/stores/mod.rs b/crates/brk_indexer/src/stores/mod.rs index 2c76ccbfa..43f4f4978 100644 --- a/crates/brk_indexer/src/stores/mod.rs +++ b/crates/brk_indexer/src/stores/mod.rs @@ -86,137 +86,173 @@ impl Stores { return Ok(()); } - vecs.height_to_blockhash - .iter_from(starting_indexes.height, |(_, blockhash, ..)| { - let blockhashprefix = BlockHashPrefix::from(blockhash); - self.blockhashprefix_to_height.remove(blockhashprefix); - Ok(()) - })?; + if starting_indexes.height != Height::ZERO { + vecs.height_to_blockhash + .iter_from(starting_indexes.height, |(_, blockhash, ..)| { + let blockhashprefix = BlockHashPrefix::from(blockhash); + self.blockhashprefix_to_height.remove(blockhashprefix); + Ok(()) + })?; - vecs.txindex_to_txid - .iter_from(starting_indexes.txindex, |(_txindex, txid, ..)| { - let txidprefix = TxidPrefix::from(txid); - self.txidprefix_to_txindex.remove(txidprefix); - Ok(()) - })?; - - if let Some(index) = vecs - .height_to_first_p2pk65index - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2pk65index_to_p2pk65bytes - .get(index)? + if let Some(mut index) = vecs + .height_to_first_p2pk65index + .get(starting_indexes.height)? .map(Value::into_inner) { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2PK65)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); + while let Some(typedbytes) = vecs + .p2pk65index_to_p2pk65bytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2PK65)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } } + + if let Some(mut index) = vecs + .height_to_first_p2pk33index + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = vecs + .p2pk33index_to_p2pk33bytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2PK33)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + + if let Some(mut index) = vecs + .height_to_first_p2pkhindex + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = vecs + .p2pkhindex_to_p2pkhbytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2PKH)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + + if let Some(mut index) = vecs + .height_to_first_p2shindex + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = vecs + .p2shindex_to_p2shbytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2SH)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + + if let Some(mut index) = vecs + .height_to_first_p2trindex + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = vecs + .p2trindex_to_p2trbytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2TR)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + + if let Some(mut index) = vecs + .height_to_first_p2wpkhindex + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = vecs + .p2wpkhindex_to_p2wpkhbytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2WPKH)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + + if let Some(mut index) = vecs + .height_to_first_p2wshindex + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = vecs + .p2wshindex_to_p2wshbytes + .get(index)? + .map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2WSH)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + + if let Some(mut index) = vecs + .height_to_first_p2aindex + .get(starting_indexes.height)? + .map(Value::into_inner) + { + while let Some(typedbytes) = + vecs.p2aindex_to_p2abytes.get(index)?.map(Value::into_inner) + { + let bytes = AddressBytes::from(typedbytes); + let hash = AddressBytesHash::from((&bytes, OutputType::P2A)); + self.addressbyteshash_to_outputtypeindex.remove(hash); + index.increment(); + } + } + } else { + self.blockhashprefix_to_height.reset_partition()?; + self.addressbyteshash_to_outputtypeindex.reset_partition()?; } - if let Some(index) = vecs - .height_to_first_p2pk33index - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2pk33index_to_p2pk33bytes - .get(index)? - .map(Value::into_inner) - { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2PK33)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); - } - } + if starting_indexes.txindex != TxIndex::ZERO { + vecs.txindex_to_txid + .iter_from(starting_indexes.txindex, |(txindex, txid, ..)| { + let txidprefix = TxidPrefix::from(&txid); - if let Some(index) = vecs - .height_to_first_p2pkhindex - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2pkhindex_to_p2pkhbytes - .get(index)? - .map(Value::into_inner) - { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2PKH)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); - } - } + // "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599" + let is_not_first_dup = txindex != TxIndex::new(142783) + || txidprefix != TxidPrefix::from([153, 133, 216, 41, 84, 225, 15, 34]); - if let Some(index) = vecs - .height_to_first_p2shindex - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2shindex_to_p2shbytes - .get(index)? - .map(Value::into_inner) - { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2SH)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); - } - } + // "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468" + let is_not_second_dup = txindex != TxIndex::new(142841) + || txidprefix != TxidPrefix::from([104, 180, 95, 88, 182, 116, 233, 78]); - if let Some(index) = vecs - .height_to_first_p2trindex - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2trindex_to_p2trbytes - .get(index)? - .map(Value::into_inner) - { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2TR)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); - } - } + if is_not_first_dup && is_not_second_dup { + self.txidprefix_to_txindex.remove(txidprefix); + } - if let Some(index) = vecs - .height_to_first_p2wpkhindex - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2wpkhindex_to_p2wpkhbytes - .get(index)? - .map(Value::into_inner) - { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2WPKH)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); - } - } - - if let Some(index) = vecs - .height_to_first_p2wshindex - .get(starting_indexes.height)? - { - let mut index = index.into_inner(); - while let Some(typedbytes) = vecs - .p2wshindex_to_p2wshbytes - .get(index)? - .map(Value::into_inner) - { - let bytes = AddressBytes::from(typedbytes); - let hash = AddressBytesHash::from((&bytes, OutputType::P2WSH)); - self.addressbyteshash_to_outputtypeindex.remove(hash); - index.increment(); - } + Ok(()) + })?; + } else { + self.txidprefix_to_txindex.reset_partition()?; } self.commit(starting_indexes.height.decremented().unwrap_or_default())?; diff --git a/crates/brk_parser/src/blk_index_to_blk_recap.rs b/crates/brk_parser/src/blk_index_to_blk_recap.rs index 19f74ba3d..0f5ac2e8b 100644 --- a/crates/brk_parser/src/blk_index_to_blk_recap.rs +++ b/crates/brk_parser/src/blk_index_to_blk_recap.rs @@ -14,7 +14,11 @@ pub struct BlkIndexToBlkRecap { } impl BlkIndexToBlkRecap { - pub fn import(bitcoin_dir: &Path, blk_index_to_blk_path: &BlkIndexToBlkPath, start: Option) -> (Self, u16) { + pub fn import( + bitcoin_dir: &Path, + blk_index_to_blk_path: &BlkIndexToBlkPath, + start: Option, + ) -> (Self, u16) { let path = bitcoin_dir.join("blk_index_to_blk_recap.json"); let tree = { @@ -40,17 +44,19 @@ impl BlkIndexToBlkRecap { let mut unprocessed_keys = self.tree.keys().copied().collect::>(); - blk_index_to_blk_path.iter().for_each(|(blk_index, blk_path)| { - unprocessed_keys.remove(blk_index); - if let Some(blk_recap) = self.tree.get(blk_index) { - if blk_recap.has_different_modified_time(blk_path) { - self.tree.remove(blk_index).unwrap(); - if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) { - min_removed_blk_index.replace(*blk_index); + blk_index_to_blk_path + .iter() + .for_each(|(blk_index, blk_path)| { + unprocessed_keys.remove(blk_index); + if let Some(blk_recap) = self.tree.get(blk_index) { + if blk_recap.has_different_modified_time(blk_path) { + self.tree.remove(blk_index).unwrap(); + if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) { + min_removed_blk_index.replace(*blk_index); + } } } - } - }); + }); unprocessed_keys.into_iter().for_each(|blk_index| { self.tree.remove(&blk_index).unwrap(); @@ -71,7 +77,11 @@ impl BlkIndexToBlkRecap { let mut start = None; - if let Some(found) = self.tree.iter().find(|(_, recap)| recap.max_height >= height) { + if let Some(found) = self + .tree + .iter() + .find(|(_, recap)| recap.max_height >= height) + { start = Some(*found.0); } diff --git a/crates/brk_vec/examples/main.rs b/crates/brk_vec/examples/main.rs index c183a4546..c81b15234 100644 --- a/crates/brk_vec/examples/main.rs +++ b/crates/brk_vec/examples/main.rs @@ -72,6 +72,11 @@ fn main() -> Result<(), Box> { })?; dbg!(vec.collect_signed_range(Some(-5), None)?); + + vec.push(vec.len() as u32); + dbg!(vec.get_last()); + + dbg!(vec.into_iter().map(|v| v).collect::>()); } Ok(()) diff --git a/crates/brk_vec/src/lib.rs b/crates/brk_vec/src/lib.rs index 355113dcd..0dd313592 100644 --- a/crates/brk_vec/src/lib.rs +++ b/crates/brk_vec/src/lib.rs @@ -239,3 +239,43 @@ where GenericVec::file_name(self) } } + +#[derive(Debug)] +pub enum StoredVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + Raw(RawVecIterator<'a, I, T>), + Compressed(CompressedVecIterator<'a, I, T>), +} + +impl<'a, I, T> Iterator for StoredVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Value<'a, T>); + fn next(&mut self) -> Option { + match self { + Self::Compressed(i) => i.next(), + Self::Raw(i) => i.next(), + } + } +} + +impl<'a, I, T> IntoIterator for &'a StoredVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Value<'a, T>); + type IntoIter = StoredVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + match self { + StoredVec::Compressed(v) => StoredVecIterator::Compressed(v.into_iter()), + StoredVec::Raw(v) => StoredVecIterator::Raw(v.into_iter()), + } + } +} diff --git a/crates/brk_vec/src/variants/compressed.rs b/crates/brk_vec/src/variants/compressed.rs index 21e545abe..bd2eacc3c 100644 --- a/crates/brk_vec/src/variants/compressed.rs +++ b/crates/brk_vec/src/variants/compressed.rs @@ -12,7 +12,7 @@ use zstd::DEFAULT_COMPRESSION_LEVEL; use crate::{ CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result, - StoredIndex, StoredType, UnsafeSlice, Version, + StoredIndex, StoredType, UnsafeSlice, Value, Version, }; const ONE_KIB: usize = 1024; @@ -193,6 +193,14 @@ where fn page_index_to_index(page_index: usize) -> usize { page_index * Self::PER_PAGE } + + fn stored_len_(pages_meta: &Guard>) -> usize { + if let Some(last) = pages_meta.last() { + (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize + } else { + 0 + } + } } impl DynamicVec for CompressedVec @@ -262,12 +270,7 @@ where } fn stored_len(&self) -> usize { - let pages_meta = self.pages_meta.load(); - if let Some(last) = pages_meta.last() { - (pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize - } else { - 0 - } + Self::stored_len_(&self.pages_meta.load()) } #[inline] @@ -524,3 +527,74 @@ where } } } + +#[derive(Debug)] +pub struct CompressedVecIterator<'a, I, T> { + vec: &'a CompressedVec, + guard: Guard>, + decoded_page: Option<(usize, Vec)>, + // second_decoded_page?: Option<(usize, Vec)>, + pages_meta: Guard>, + stored_len: usize, + index: usize, +} + +impl<'a, I, T> Iterator for CompressedVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Value<'a, T>); + fn next(&mut self) -> Option { + let mmap = &self.guard; + let i = self.index; + let stored_len = self.stored_len; + + let result = if i >= stored_len { + let j = i - stored_len; + if j >= self.vec.pushed_len() { + return None; + } + self.vec + .pushed() + .get(j) + .map(|v| (I::from(i), Value::Ref(v))) + } else { + CompressedVec::::cached_get_stored__( + i, + mmap, + stored_len, + &mut self.decoded_page, + &self.pages_meta, + ) + .unwrap() + .map(|v| (I::from(i), Value::Owned(v))) + }; + + self.index += 1; + + result + } +} + +impl<'a, I, T> IntoIterator for &'a CompressedVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Value<'a, T>); + type IntoIter = CompressedVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + let pages_meta = self.pages_meta.load(); + let stored_len = CompressedVec::::stored_len_(&pages_meta); + CompressedVecIterator { + vec: self, + guard: self.mmap().load(), + decoded_page: None, + pages_meta, + stored_len, + index: 0, + } + } +} diff --git a/crates/brk_vec/src/variants/raw.rs b/crates/brk_vec/src/variants/raw.rs index fd59d709a..28479ed51 100644 --- a/crates/brk_vec/src/variants/raw.rs +++ b/crates/brk_vec/src/variants/raw.rs @@ -10,7 +10,9 @@ use arc_swap::{ArcSwap, Guard}; use memmap2::Mmap; use rayon::prelude::*; -use crate::{DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Version}; +use crate::{ + DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version, +}; #[derive(Debug)] pub struct RawVec { @@ -239,3 +241,61 @@ where } } } + +#[derive(Debug)] +pub struct RawVecIterator<'a, I, T> { + vec: &'a RawVec, + guard: Guard>, + index: usize, +} + +impl RawVecIterator<'_, I, T> { + const SIZE_OF_T: usize = size_of::(); +} + +impl<'a, I, T> Iterator for RawVecIterator<'a, I, T> +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Value<'a, T>); + fn next(&mut self) -> Option { + let mmap = &self.guard; + let vec = self.vec; + let i = self.index; + + let stored_len = mmap.len() / Self::SIZE_OF_T; + + let result = if i >= stored_len { + let j = i - stored_len; + if j >= vec.pushed_len() { + return None; + } + vec.pushed().get(j).map(|v| (I::from(i), Value::Ref(v))) + } else { + vec.get_stored_(i, mmap) + .unwrap() + .map(|v| (I::from(i), Value::Owned(v))) + }; + + self.index += 1; + result + } +} + +impl<'a, I, T> IntoIterator for &'a RawVec +where + I: StoredIndex, + T: StoredType, +{ + type Item = (I, Value<'a, T>); + type IntoIter = RawVecIterator<'a, I, T>; + + fn into_iter(self) -> Self::IntoIter { + RawVecIterator { + vec: self, + guard: self.mmap.load(), + index: 0, + } + } +}