vec: iter + global: snapshot

This commit is contained in:
nym21
2025-04-27 00:21:21 +02:00
parent d55478da54
commit bdc3c19163
10 changed files with 389 additions and 155 deletions

View File

@@ -28,8 +28,8 @@ use crate::CheckedSub;
pub struct Height(u32);
impl Height {
pub const ZERO: Self = Height(0);
pub const MAX: Self = Height(u32::MAX);
pub const ZERO: Self = Self(0);
pub const MAX: Self = Self(u32::MAX);
pub fn new(height: u32) -> Self {
Self(height)

View File

@@ -29,6 +29,12 @@ use super::StoredU32;
pub struct TxIndex(u32);
impl TxIndex {
pub const ZERO: Self = Self(0);
pub fn new(txindex: u32) -> Self {
Self(txindex)
}
pub fn incremented(self) -> Self {
Self(*self + 1)
}

View File

@@ -19,36 +19,29 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
FromBytes,
Serialize,
)]
pub struct Weight(u32);
pub struct Weight(u64);
impl From<bitcoin::Weight> for Weight {
fn from(value: bitcoin::Weight) -> Self {
let wu = value.to_wu();
if wu > u32::MAX as u64 {
unreachable!("wu is too big, shouldn't happen")
}
Self(wu as u32)
Self(value.to_wu())
}
}
impl From<Weight> for bitcoin::Weight {
fn from(value: Weight) -> Self {
Self::from_wu(*value as u64)
Self::from_wu(value.0)
}
}
impl From<usize> for Weight {
fn from(value: usize) -> Self {
if value > u32::MAX as usize {
panic!()
}
Self(value as u32)
Self(value as u64)
}
}
impl From<f64> for Weight {
fn from(value: f64) -> Self {
Self(value as u32)
Self(value as u64)
}
}

View File

@@ -10,7 +10,7 @@ use brk_core::Height;
use brk_vec::{Value, Version};
use byteview::ByteView;
use fjall::{
PartitionCreateOptions, ReadTransaction, Result, TransactionalKeyspace,
PartitionCreateOptions, PersistMode, ReadTransaction, Result, TransactionalKeyspace,
TransactionalPartitionHandle,
};
use zerocopy::{Immutable, IntoBytes};
@@ -19,6 +19,7 @@ use super::StoreMeta;
pub struct Store<Key, Value> {
meta: StoreMeta,
name: String,
keyspace: TransactionalKeyspace,
partition: TransactionalPartitionHandle,
rtx: ReadTransaction,
@@ -58,6 +59,7 @@ where
Ok(Self {
meta,
name: name.to_owned(),
keyspace,
partition,
rtx,
@@ -177,6 +179,13 @@ where
.manual_journal_persist(true),
)
}
/// Drops and recreates the underlying fjall partition, discarding every
/// stored key/value pair while keeping this `Store` usable under the same name.
///
/// Order matters: the partition is deleted first, then the keyspace is synced
/// (`PersistMode::SyncAll`) so the deletion is durable before a fresh
/// partition handle is opened under the same name.
/// NOTE(review): `self.rtx` is not refreshed here — presumably callers obtain
/// a new read transaction afterwards; confirm against call sites.
pub fn reset_partition(&mut self) -> Result<()> {
self.keyspace.delete_partition(self.partition.clone())?;
self.keyspace.persist(PersistMode::SyncAll)?;
self.partition = Self::open_partition_handle(&self.keyspace, &self.name)?;
Ok(())
}
}
impl<Key, Value> Clone for Store<Key, Value>
@@ -187,6 +196,7 @@ where
fn clone(&self) -> Self {
Self {
meta: self.meta.clone(),
name: self.name.clone(),
keyspace: self.keyspace.clone(),
partition: self.partition.clone(),
rtx: self.keyspace.read_tx(),

View File

@@ -86,137 +86,173 @@ impl Stores {
return Ok(());
}
vecs.height_to_blockhash
.iter_from(starting_indexes.height, |(_, blockhash, ..)| {
let blockhashprefix = BlockHashPrefix::from(blockhash);
self.blockhashprefix_to_height.remove(blockhashprefix);
Ok(())
})?;
if starting_indexes.height != Height::ZERO {
vecs.height_to_blockhash
.iter_from(starting_indexes.height, |(_, blockhash, ..)| {
let blockhashprefix = BlockHashPrefix::from(blockhash);
self.blockhashprefix_to_height.remove(blockhashprefix);
Ok(())
})?;
vecs.txindex_to_txid
.iter_from(starting_indexes.txindex, |(_txindex, txid, ..)| {
let txidprefix = TxidPrefix::from(txid);
self.txidprefix_to_txindex.remove(txidprefix);
Ok(())
})?;
if let Some(index) = vecs
.height_to_first_p2pk65index
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2pk65index_to_p2pk65bytes
.get(index)?
if let Some(mut index) = vecs
.height_to_first_p2pk65index
.get(starting_indexes.height)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK65));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
while let Some(typedbytes) = vecs
.p2pk65index_to_p2pk65bytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK65));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2pk33index
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) = vecs
.p2pk33index_to_p2pk33bytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK33));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2pkhindex
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) = vecs
.p2pkhindex_to_p2pkhbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PKH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2shindex
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) = vecs
.p2shindex_to_p2shbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2SH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2trindex
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) = vecs
.p2trindex_to_p2trbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2TR));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2wpkhindex
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) = vecs
.p2wpkhindex_to_p2wpkhbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WPKH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2wshindex
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) = vecs
.p2wshindex_to_p2wshbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WSH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(mut index) = vecs
.height_to_first_p2aindex
.get(starting_indexes.height)?
.map(Value::into_inner)
{
while let Some(typedbytes) =
vecs.p2aindex_to_p2abytes.get(index)?.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2A));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
} else {
self.blockhashprefix_to_height.reset_partition()?;
self.addressbyteshash_to_outputtypeindex.reset_partition()?;
}
if let Some(index) = vecs
.height_to_first_p2pk33index
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2pk33index_to_p2pk33bytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK33));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if starting_indexes.txindex != TxIndex::ZERO {
vecs.txindex_to_txid
.iter_from(starting_indexes.txindex, |(txindex, txid, ..)| {
let txidprefix = TxidPrefix::from(&txid);
if let Some(index) = vecs
.height_to_first_p2pkhindex
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2pkhindex_to_p2pkhbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PKH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
// "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
let is_not_first_dup = txindex != TxIndex::new(142783)
|| txidprefix != TxidPrefix::from([153, 133, 216, 41, 84, 225, 15, 34]);
if let Some(index) = vecs
.height_to_first_p2shindex
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2shindex_to_p2shbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2SH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
// "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468"
let is_not_second_dup = txindex != TxIndex::new(142841)
|| txidprefix != TxidPrefix::from([104, 180, 95, 88, 182, 116, 233, 78]);
if let Some(index) = vecs
.height_to_first_p2trindex
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2trindex_to_p2trbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2TR));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if is_not_first_dup && is_not_second_dup {
self.txidprefix_to_txindex.remove(txidprefix);
}
if let Some(index) = vecs
.height_to_first_p2wpkhindex
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2wpkhindex_to_p2wpkhbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WPKH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
}
if let Some(index) = vecs
.height_to_first_p2wshindex
.get(starting_indexes.height)?
{
let mut index = index.into_inner();
while let Some(typedbytes) = vecs
.p2wshindex_to_p2wshbytes
.get(index)?
.map(Value::into_inner)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WSH));
self.addressbyteshash_to_outputtypeindex.remove(hash);
index.increment();
}
Ok(())
})?;
} else {
self.txidprefix_to_txindex.reset_partition()?;
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;

View File

@@ -14,7 +14,11 @@ pub struct BlkIndexToBlkRecap {
}
impl BlkIndexToBlkRecap {
pub fn import(bitcoin_dir: &Path, blk_index_to_blk_path: &BlkIndexToBlkPath, start: Option<Height>) -> (Self, u16) {
pub fn import(
bitcoin_dir: &Path,
blk_index_to_blk_path: &BlkIndexToBlkPath,
start: Option<Height>,
) -> (Self, u16) {
let path = bitcoin_dir.join("blk_index_to_blk_recap.json");
let tree = {
@@ -40,17 +44,19 @@ impl BlkIndexToBlkRecap {
let mut unprocessed_keys = self.tree.keys().copied().collect::<BTreeSet<_>>();
blk_index_to_blk_path.iter().for_each(|(blk_index, blk_path)| {
unprocessed_keys.remove(blk_index);
if let Some(blk_recap) = self.tree.get(blk_index) {
if blk_recap.has_different_modified_time(blk_path) {
self.tree.remove(blk_index).unwrap();
if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) {
min_removed_blk_index.replace(*blk_index);
blk_index_to_blk_path
.iter()
.for_each(|(blk_index, blk_path)| {
unprocessed_keys.remove(blk_index);
if let Some(blk_recap) = self.tree.get(blk_index) {
if blk_recap.has_different_modified_time(blk_path) {
self.tree.remove(blk_index).unwrap();
if min_removed_blk_index.is_none_or(|_blk_index| *blk_index < _blk_index) {
min_removed_blk_index.replace(*blk_index);
}
}
}
}
});
});
unprocessed_keys.into_iter().for_each(|blk_index| {
self.tree.remove(&blk_index).unwrap();
@@ -71,7 +77,11 @@ impl BlkIndexToBlkRecap {
let mut start = None;
if let Some(found) = self.tree.iter().find(|(_, recap)| recap.max_height >= height) {
if let Some(found) = self
.tree
.iter()
.find(|(_, recap)| recap.max_height >= height)
{
start = Some(*found.0);
}

View File

@@ -72,6 +72,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
})?;
dbg!(vec.collect_signed_range(Some(-5), None)?);
vec.push(vec.len() as u32);
dbg!(vec.get_last());
dbg!(vec.into_iter().map(|v| v).collect::<Vec<_>>());
}
Ok(())

View File

@@ -239,3 +239,43 @@ where
GenericVec::file_name(self)
}
}
/// Iterator over a `StoredVec`, wrapping the iterator of whichever backing
/// representation (raw mmap-backed or compressed) the vec currently uses.
#[derive(Debug)]
pub enum StoredVecIterator<'a, I, T>
where
I: StoredIndex,
T: StoredType,
{
// Backed by an uncompressed, mmap-backed vec.
Raw(RawVecIterator<'a, I, T>),
// Backed by a page-compressed vec.
Compressed(CompressedVecIterator<'a, I, T>),
}
impl<'a, I, T> Iterator for StoredVecIterator<'a, I, T>
where
I: StoredIndex,
T: StoredType,
{
type Item = (I, Value<'a, T>);
fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Compressed(i) => i.next(),
Self::Raw(i) => i.next(),
}
}
}
impl<'a, I, T> IntoIterator for &'a StoredVec<I, T>
where
I: StoredIndex,
T: StoredType,
{
type Item = (I, Value<'a, T>);
type IntoIter = StoredVecIterator<'a, I, T>;
fn into_iter(self) -> Self::IntoIter {
match self {
StoredVec::Compressed(v) => StoredVecIterator::Compressed(v.into_iter()),
StoredVec::Raw(v) => StoredVecIterator::Raw(v.into_iter()),
}
}
}

View File

@@ -12,7 +12,7 @@ use zstd::DEFAULT_COMPRESSION_LEVEL;
use crate::{
CompressedPageMetadata, CompressedPagesMetadata, DynamicVec, Error, GenericVec, RawVec, Result,
StoredIndex, StoredType, UnsafeSlice, Version,
StoredIndex, StoredType, UnsafeSlice, Value, Version,
};
const ONE_KIB: usize = 1024;
@@ -193,6 +193,14 @@ where
/// Maps a compressed-page index to the global index of that page's first value
/// (pages hold exactly `PER_PAGE` values except possibly the last).
fn page_index_to_index(page_index: usize) -> usize {
page_index * Self::PER_PAGE
}
/// Number of values persisted on disk according to a page-metadata snapshot:
/// every page except the last contributes `PER_PAGE` values, the last page
/// contributes its own `values_len` (it may be partially filled).
/// Returns 0 when no pages exist yet.
///
/// Static so iterators can compute the length from a `Guard` snapshot they
/// already hold, without re-loading `self.pages_meta`.
fn stored_len_(pages_meta: &Guard<Arc<CompressedPagesMetadata>>) -> usize {
if let Some(last) = pages_meta.last() {
(pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize
} else {
0
}
}
}
impl<I, T> DynamicVec for CompressedVec<I, T>
@@ -262,12 +270,7 @@ where
}
fn stored_len(&self) -> usize {
let pages_meta = self.pages_meta.load();
if let Some(last) = pages_meta.last() {
(pages_meta.len() - 1) * Self::PER_PAGE + last.values_len as usize
} else {
0
}
Self::stored_len_(&self.pages_meta.load())
}
#[inline]
@@ -524,3 +527,74 @@ where
}
}
}
/// Snapshot iterator over a `CompressedVec`: it captures `Guard`s on both the
/// mmap and the page metadata at construction, so iteration sees a consistent
/// view even if the vec is flushed concurrently.
#[derive(Debug)]
pub struct CompressedVecIterator<'a, I, T> {
vec: &'a CompressedVec<I, T>,
// Keeps the mmapped file snapshot alive for the whole iteration.
guard: Guard<Arc<Mmap>>,
// Cache of the most recently decompressed page (page index, decoded values),
// reused across `next()` calls so each page is decoded at most once per pass.
decoded_page: Option<(usize, Vec<T>)>,
// second_decoded_page?: Option<(usize, Vec<T>)>,
// Page metadata snapshot taken at construction time.
pages_meta: Guard<Arc<CompressedPagesMetadata>>,
// On-disk value count derived from `pages_meta` at construction; indices at
// or beyond this fall into the in-memory pushed region.
stored_len: usize,
// Next global index to yield.
index: usize,
}
impl<'a, I, T> Iterator for CompressedVecIterator<'a, I, T>
where
I: StoredIndex,
T: StoredType,
{
type Item = (I, Value<'a, T>);
/// Yields `(index, value)` pairs: first values decoded from the mmapped,
/// compressed snapshot (owned, since they come out of a decompressed page),
/// then values still only in the in-memory pushed buffer (borrowed).
fn next(&mut self) -> Option<Self::Item> {
let mmap = &self.guard;
let i = self.index;
let stored_len = self.stored_len;
let result = if i >= stored_len {
// Pushed region: index relative to the end of the stored snapshot.
let j = i - stored_len;
if j >= self.vec.pushed_len() {
// Past both regions: exhausted (index deliberately not advanced).
return None;
}
self.vec
.pushed()
.get(j)
.map(|v| (I::from(i), Value::Ref(v)))
} else {
// Stored region: decode via the shared cached-page reader, reusing
// `decoded_page` so sequential reads decompress each page only once.
// NOTE(review): `.unwrap()` assumes reads within `stored_len` of the
// snapshot cannot fail — confirm `cached_get_stored__`'s contract.
CompressedVec::<I, T>::cached_get_stored__(
i,
mmap,
stored_len,
&mut self.decoded_page,
&self.pages_meta,
)
.unwrap()
.map(|v| (I::from(i), Value::Owned(v)))
};
self.index += 1;
result
}
}
impl<'a, I, T> IntoIterator for &'a CompressedVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type Item = (I, Value<'a, T>);
    type IntoIter = CompressedVecIterator<'a, I, T>;

    /// Captures snapshots of the page metadata and the mmap (in that order,
    /// matching the write path) and starts iteration at index zero.
    fn into_iter(self) -> Self::IntoIter {
        let pages_meta = self.pages_meta.load();
        let stored_len = CompressedVec::<I, T>::stored_len_(&pages_meta);
        let guard = self.mmap().load();
        CompressedVecIterator {
            vec: self,
            guard,
            decoded_page: None,
            pages_meta,
            stored_len,
            index: 0,
        }
    }
}

View File

@@ -10,7 +10,9 @@ use arc_swap::{ArcSwap, Guard};
use memmap2::Mmap;
use rayon::prelude::*;
use crate::{DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Version};
use crate::{
DynamicVec, Error, GenericVec, Result, StoredIndex, StoredType, UnsafeSlice, Value, Version,
};
#[derive(Debug)]
pub struct RawVec<I, T> {
@@ -239,3 +241,61 @@ where
}
}
}
/// Snapshot iterator over a `RawVec`: holds a `Guard` on the mmap taken at
/// construction, so iteration sees a consistent on-disk view even if the vec
/// is flushed concurrently.
#[derive(Debug)]
pub struct RawVecIterator<'a, I, T> {
vec: &'a RawVec<I, T>,
// Keeps the mmapped file snapshot alive for the whole iteration.
guard: Guard<Arc<Mmap>>,
// Next global index to yield.
index: usize,
}
impl<I, T> RawVecIterator<'_, I, T> {
// Byte width of one stored element; used to derive the on-disk element
// count from the mmap's byte length. Assumes `T` is not zero-sized
// (otherwise the division in `next()` would panic) — StoredType values
// appear to be fixed-size records; confirm.
const SIZE_OF_T: usize = size_of::<T>();
}
impl<'a, I, T> Iterator for RawVecIterator<'a, I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type Item = (I, Value<'a, T>);

    /// Yields `(index, value)` pairs: first values decoded from the mmapped
    /// file snapshot (owned), then values still only in the in-memory pushed
    /// buffer (borrowed).
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.index;
        // Element count of the snapshot, derived from its byte length.
        let on_disk = self.guard.len() / Self::SIZE_OF_T;

        let item = if current < on_disk {
            // Persisted region: decode straight out of the mmap snapshot.
            self.vec
                .get_stored_(current, &self.guard)
                .unwrap()
                .map(|value| (I::from(current), Value::Owned(value)))
        } else {
            // Pushed-but-not-yet-flushed region, indexed past the snapshot.
            let pushed_pos = current - on_disk;
            if pushed_pos >= self.vec.pushed_len() {
                // Exhausted (index deliberately left unchanged).
                return None;
            }
            self.vec
                .pushed()
                .get(pushed_pos)
                .map(|value| (I::from(current), Value::Ref(value)))
        };
        self.index += 1;
        item
    }
}
impl<'a, I, T> IntoIterator for &'a RawVec<I, T>
where
    I: StoredIndex,
    T: StoredType,
{
    type Item = (I, Value<'a, T>);
    type IntoIter = RawVecIterator<'a, I, T>;

    /// Snapshots the current mmap and starts iteration at index zero.
    fn into_iter(self) -> Self::IntoIter {
        let guard = self.mmap.load();
        RawVecIterator {
            index: 0,
            guard,
            vec: self,
        }
    }
}