computer: store part 2

This commit is contained in:
nym21
2025-06-27 19:38:44 +02:00
parent e73daa6214
commit 8ea13544de
16 changed files with 393 additions and 118 deletions
+111 -7
View File
@@ -1,19 +1,23 @@
use std::{path::Path, thread};
use brk_core::{
AddressData, EmptyAddressData, P2AAddressIndex, P2AAddressIndexOutputindex, P2PK33AddressIndex,
P2PK33AddressIndexOutputindex, P2PK65AddressIndex, P2PK65AddressIndexOutputindex,
P2PKHAddressIndex, P2PKHAddressIndexOutputindex, P2SHAddressIndex, P2SHAddressIndexOutputindex,
P2TRAddressIndex, P2TRAddressIndexOutputindex, P2WPKHAddressIndex,
P2WPKHAddressIndexOutputindex, P2WSHAddressIndex, P2WSHAddressIndexOutputindex, Unit, Version,
AddressData, EmptyAddressData, Height, P2AAddressIndex, P2AAddressIndexOutputindex,
P2PK33AddressIndex, P2PK33AddressIndexOutputindex, P2PK65AddressIndex,
P2PK65AddressIndexOutputindex, P2PKHAddressIndex, P2PKHAddressIndexOutputindex,
P2SHAddressIndex, P2SHAddressIndexOutputindex, P2TRAddressIndex, P2TRAddressIndexOutputindex,
P2WPKHAddressIndex, P2WPKHAddressIndexOutputindex, P2WSHAddressIndex,
P2WSHAddressIndexOutputindex, Result, Unit, Version,
};
use brk_store::Store;
use fjall::TransactionalKeyspace;
use brk_store::{AnyStore, Store};
use fjall::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
const VERSION: Version = Version::ZERO;
#[derive(Clone)]
pub struct Stores {
keyspace: TransactionalKeyspace,
pub p2aaddressindex_to_addressdata: Store<P2AAddressIndex, AddressData>,
pub p2aaddressindex_to_emptyaddressdata: Store<P2AAddressIndex, EmptyAddressData>,
pub p2aaddressindex_to_utxos_received: Store<P2AAddressIndexOutputindex, Unit>,
@@ -420,6 +424,8 @@ impl Stores {
});
Ok(Self {
keyspace: keyspace.clone(),
p2aaddressindex_to_addressdata,
p2aaddressindex_to_emptyaddressdata,
p2aaddressindex_to_utxos_received,
@@ -461,4 +467,102 @@ impl Stores {
p2wshaddressindex_to_utxos_sent,
})
}
pub fn starting_height(&self) -> Height {
self.as_slice()
.into_iter()
.map(|store| store.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
self.as_mut_slice()
.into_par_iter()
.try_for_each(|store| store.commit(height))?;
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
pub fn rotate_memtables(&self) {
self.as_slice()
.into_iter()
.for_each(|store| store.rotate_memtable());
}
fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 32] {
[
&self.p2aaddressindex_to_addressdata,
&self.p2aaddressindex_to_emptyaddressdata,
&self.p2aaddressindex_to_utxos_received,
&self.p2aaddressindex_to_utxos_sent,
&self.p2pk33addressindex_to_addressdata,
&self.p2pk33addressindex_to_emptyaddressdata,
&self.p2pk33addressindex_to_utxos_received,
&self.p2pk33addressindex_to_utxos_sent,
&self.p2pk65addressindex_to_addressdata,
&self.p2pk65addressindex_to_emptyaddressdata,
&self.p2pk65addressindex_to_utxos_received,
&self.p2pk65addressindex_to_utxos_sent,
&self.p2pkhaddressindex_to_addressdata,
&self.p2pkhaddressindex_to_emptyaddressdata,
&self.p2pkhaddressindex_to_utxos_received,
&self.p2pkhaddressindex_to_utxos_sent,
&self.p2shaddressindex_to_addressdata,
&self.p2shaddressindex_to_emptyaddressdata,
&self.p2shaddressindex_to_utxos_received,
&self.p2shaddressindex_to_utxos_sent,
&self.p2traddressindex_to_addressdata,
&self.p2traddressindex_to_emptyaddressdata,
&self.p2traddressindex_to_utxos_received,
&self.p2traddressindex_to_utxos_sent,
&self.p2wpkhaddressindex_to_addressdata,
&self.p2wpkhaddressindex_to_emptyaddressdata,
&self.p2wpkhaddressindex_to_utxos_received,
&self.p2wpkhaddressindex_to_utxos_sent,
&self.p2wshaddressindex_to_addressdata,
&self.p2wshaddressindex_to_emptyaddressdata,
&self.p2wshaddressindex_to_utxos_received,
&self.p2wshaddressindex_to_utxos_sent,
]
}
fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 32] {
[
&mut self.p2aaddressindex_to_addressdata,
&mut self.p2aaddressindex_to_emptyaddressdata,
&mut self.p2aaddressindex_to_utxos_received,
&mut self.p2aaddressindex_to_utxos_sent,
&mut self.p2pk33addressindex_to_addressdata,
&mut self.p2pk33addressindex_to_emptyaddressdata,
&mut self.p2pk33addressindex_to_utxos_received,
&mut self.p2pk33addressindex_to_utxos_sent,
&mut self.p2pk65addressindex_to_addressdata,
&mut self.p2pk65addressindex_to_emptyaddressdata,
&mut self.p2pk65addressindex_to_utxos_received,
&mut self.p2pk65addressindex_to_utxos_sent,
&mut self.p2pkhaddressindex_to_addressdata,
&mut self.p2pkhaddressindex_to_emptyaddressdata,
&mut self.p2pkhaddressindex_to_utxos_received,
&mut self.p2pkhaddressindex_to_utxos_sent,
&mut self.p2shaddressindex_to_addressdata,
&mut self.p2shaddressindex_to_emptyaddressdata,
&mut self.p2shaddressindex_to_utxos_received,
&mut self.p2shaddressindex_to_utxos_sent,
&mut self.p2traddressindex_to_addressdata,
&mut self.p2traddressindex_to_emptyaddressdata,
&mut self.p2traddressindex_to_utxos_received,
&mut self.p2traddressindex_to_utxos_sent,
&mut self.p2wpkhaddressindex_to_addressdata,
&mut self.p2wpkhaddressindex_to_emptyaddressdata,
&mut self.p2wpkhaddressindex_to_utxos_received,
&mut self.p2wpkhaddressindex_to_utxos_sent,
&mut self.p2wshaddressindex_to_addressdata,
&mut self.p2wshaddressindex_to_emptyaddressdata,
&mut self.p2wshaddressindex_to_utxos_received,
&mut self.p2wshaddressindex_to_utxos_sent,
]
}
}
+61 -5
View File
@@ -1,11 +1,11 @@
use byteview::ByteView;
use zerocopy::{FromBytes, IntoBytes};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
use zerocopy_derive::{FromBytes, Immutable, KnownLayout};
use crate::{Dollars, Sats};
use crate::{CheckedSub, Dollars, EmptyAddressData, Error, Result, Sats};
#[derive(Debug, Default, Clone, FromBytes, Immutable, IntoBytes, KnownLayout)]
#[repr(C, packed)]
#[derive(Debug, Default, Clone, FromBytes, Immutable, KnownLayout)]
#[repr(C)]
pub struct AddressData {
pub sent: Sats,
pub received: Sats,
@@ -17,6 +17,54 @@ impl AddressData {
pub fn amount(&self) -> Sats {
(u64::from(self.received) - u64::from(self.sent)).into()
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
if self.amount() == Sats::ZERO {
if self.outputs_len != 0 {
unreachable!();
}
true
} else {
false
}
}
pub fn receive(&mut self, amount: Sats, price: Dollars) {
self.received += amount;
self.outputs_len += 1;
self.realized_cap += price * amount;
}
pub fn send(&mut self, amount: Sats, previous_price: Dollars) -> Result<()> {
if self.amount() < amount {
return Err(Error::String("Previous_amount smaller than sent amount"));
}
self.sent += amount;
self.outputs_len -= 1;
self.realized_cap = self
.realized_cap
.checked_sub(previous_price * amount)
.unwrap();
Ok(())
}
}
impl From<EmptyAddressData> for AddressData {
fn from(value: EmptyAddressData) -> Self {
Self::from(&value)
}
}
impl From<&EmptyAddressData> for AddressData {
fn from(value: &EmptyAddressData) -> Self {
Self {
sent: value.transfered,
received: value.transfered,
realized_cap: Dollars::ZERO,
outputs_len: 0,
}
}
}
impl From<ByteView> for AddressData {
@@ -31,6 +79,14 @@ impl From<AddressData> for ByteView {
}
impl From<&AddressData> for ByteView {
fn from(value: &AddressData) -> Self {
Self::new(value.as_bytes())
Self::new(
&[
value.sent.as_bytes(),
value.received.as_bytes(),
value.realized_cap.as_bytes(),
value.outputs_len.as_bytes(),
]
.concat(),
)
}
}
@@ -2,14 +2,30 @@ use byteview::ByteView;
use zerocopy::{FromBytes, IntoBytes};
use zerocopy_derive::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::Sats;
use crate::{AddressData, Sats};
#[derive(Debug, Default, Clone, FromBytes, Immutable, IntoBytes, KnownLayout)]
pub struct EmptyAddressData {
pub transfered: Sats,
}
impl EmptyAddressData {}
impl From<AddressData> for EmptyAddressData {
fn from(value: AddressData) -> Self {
Self::from(&value)
}
}
impl From<&AddressData> for EmptyAddressData {
fn from(value: &AddressData) -> Self {
if value.sent != value.received {
dbg!(&value);
panic!("Trying to convert not empty wallet to empty !");
}
Self {
transfered: value.sent,
}
}
}
impl From<ByteView> for EmptyAddressData {
fn from(value: ByteView) -> Self {
@@ -11,6 +11,15 @@ pub struct P2AAddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2AAddressIndex, OutputIndex)> for P2AAddressIndexOutputindex {
fn from(value: (P2AAddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2AAddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2PK33AddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2PK33AddressIndex, OutputIndex)> for P2PK33AddressIndexOutputindex {
fn from(value: (P2PK33AddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2PK33AddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2PK65AddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2PK65AddressIndex, OutputIndex)> for P2PK65AddressIndexOutputindex {
fn from(value: (P2PK65AddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2PK65AddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2PKHAddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2PKHAddressIndex, OutputIndex)> for P2PKHAddressIndexOutputindex {
fn from(value: (P2PKHAddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2PKHAddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2SHAddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2SHAddressIndex, OutputIndex)> for P2SHAddressIndexOutputindex {
fn from(value: (P2SHAddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2SHAddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2TRAddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2TRAddressIndex, OutputIndex)> for P2TRAddressIndexOutputindex {
fn from(value: (P2TRAddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2TRAddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2WPKHAddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2WPKHAddressIndex, OutputIndex)> for P2WPKHAddressIndexOutputindex {
fn from(value: (P2WPKHAddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2WPKHAddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
@@ -11,6 +11,15 @@ pub struct P2WSHAddressIndexOutputindex {
outputindex: OutputIndex,
}
impl From<(P2WSHAddressIndex, OutputIndex)> for P2WSHAddressIndexOutputindex {
fn from(value: (P2WSHAddressIndex, OutputIndex)) -> Self {
Self {
addressindex: value.0,
outputindex: value.1,
}
}
}
impl From<ByteView> for P2WSHAddressIndexOutputindex {
fn from(value: ByteView) -> Self {
let addressindex =
+1
View File
@@ -14,6 +14,7 @@ use brk_core::{
use bitcoin::{Transaction, TxIn, TxOut};
use brk_exit::Exit;
use brk_parser::Parser;
use brk_store::AnyStore;
use brk_vec::{AnyVec, VecIterator};
use color_eyre::eyre::{ContextCompat, eyre};
use log::{error, info};
+29 -29
View File
@@ -4,9 +4,10 @@ use brk_core::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutputType, Result, TxIndex,
TxidPrefix, TypeIndex, Value, Version,
};
use brk_store::Store;
use brk_store::{AnyStore, Store};
use brk_vec::AnyIterableVec;
use fjall::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use crate::Indexes;
@@ -283,34 +284,17 @@ impl Stores {
}
pub fn starting_height(&self) -> Height {
[
self.addressbyteshash_to_typeindex.height(),
self.blockhashprefix_to_height.height(),
self.txidprefix_to_txindex.height(),
]
.into_iter()
.map(|height| height.map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
self.as_slice()
.into_iter()
.map(|store| store.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
thread::scope(|scope| -> Result<()> {
let addressbyteshash_to_typeindex_commit_handle =
scope.spawn(|| self.addressbyteshash_to_typeindex.commit(height));
let blockhashprefix_to_height_commit_handle =
scope.spawn(|| self.blockhashprefix_to_height.commit(height));
let txidprefix_to_txindex_commit_handle =
scope.spawn(|| self.txidprefix_to_txindex.commit(height));
addressbyteshash_to_typeindex_commit_handle
.join()
.unwrap()?;
blockhashprefix_to_height_commit_handle.join().unwrap()?;
txidprefix_to_txindex_commit_handle.join().unwrap()?;
Ok(())
})?;
self.as_mut_slice()
.into_par_iter()
.try_for_each(|store| store.commit(height))?;
self.keyspace
.persist(PersistMode::SyncAll)
@@ -318,8 +302,24 @@ impl Stores {
}
pub fn rotate_memtables(&self) {
self.addressbyteshash_to_typeindex.rotate_memtable();
self.blockhashprefix_to_height.rotate_memtable();
self.txidprefix_to_txindex.rotate_memtable();
self.as_slice()
.into_iter()
.for_each(|store| store.rotate_memtable());
}
fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 3] {
[
&self.addressbyteshash_to_typeindex,
&self.blockhashprefix_to_height,
&self.txidprefix_to_txindex,
]
}
fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 3] {
[
&mut self.addressbyteshash_to_typeindex,
&mut self.blockhashprefix_to_height,
&mut self.txidprefix_to_txindex,
]
}
}
+1 -1
View File
@@ -1,7 +1,7 @@
use std::path::Path;
use brk_core::{Dollars, Height, Result, Sats, Version};
use brk_store::Store;
use brk_store::{AnyStore, Store};
fn main() -> Result<()> {
let p = Path::new("./examples/_fjall");
+81 -74
View File
@@ -18,7 +18,10 @@ use fjall::{
};
mod meta;
mod r#trait;
use meta::*;
pub use r#trait::*;
pub struct Store<Key, Value> {
meta: StoreMeta,
@@ -33,7 +36,7 @@ pub struct Store<Key, Value> {
/// Use default if will read
const DEFAULT_BLOOM_FILTER_BITS: Option<u8> = Some(5);
// const CHECK_COLLISISONS: bool = true;
// const CHECK_COLLISIONS: bool = true;
const MAJOR_FJALL_VERSION: Version = Version::TWO;
pub fn open_keyspace(path: &Path) -> fjall::Result<TransactionalKeyspace> {
@@ -161,79 +164,6 @@ where
// });
// }
pub fn commit(&mut self, height: Height) -> Result<()> {
if self.has(height) && self.puts.is_empty() && self.dels.is_empty() {
return Ok(());
}
self.meta.export(self.len(), height)?;
let mut wtx = self.keyspace.write_tx();
let partition = self.partition.as_ref().unwrap();
mem::take(&mut self.dels)
.into_iter()
.for_each(|key| wtx.remove(partition, ByteView::from(key)));
mem::take(&mut self.puts)
.into_iter()
.for_each(|(key, value)| {
// if CHECK_COLLISISONS {
// #[allow(unused_must_use)]
// if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) {
// dbg!(
// &key,
// V::try_from(value.as_bytes().into()).unwrap(),
// &self.meta,
// self.rtx.get(&self.partition, key.as_bytes())
// );
// unreachable!();
// }
// }
wtx.insert(partition, ByteView::from(key), ByteView::from(value))
});
wtx.commit()?;
self.rtx = self.keyspace.read_tx();
Ok(())
}
pub fn rotate_memtable(&self) {
let _ = self.partition.as_ref().unwrap().inner().rotate_memtable();
}
pub fn height(&self) -> Option<Height> {
self.meta.height()
}
pub fn len(&self) -> usize {
let len = self.meta.len() + self.puts.len() - self.dels.len();
if len > 18440000000000000000 {
dbg!((
len,
self.meta.path(),
self.meta.len(),
self.puts.len(),
&self.dels,
));
unreachable!()
}
len
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
pub fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
fn open_partition_handle(
keyspace: &TransactionalKeyspace,
name: &str,
@@ -268,6 +198,83 @@ where
}
}
impl<'a, K, V> AnyStore for Store<K, V>
where
K: Debug + Clone + From<ByteView> + Ord + 'a,
V: Debug + Clone + From<ByteView>,
ByteView: From<K> + From<&'a K> + From<V>,
{
fn commit(&mut self, height: Height) -> Result<()> {
if self.has(height) && self.puts.is_empty() && self.dels.is_empty() {
return Ok(());
}
self.meta.export(self.len(), height)?;
let mut wtx = self.keyspace.write_tx();
let partition = self.partition.as_ref().unwrap();
mem::take(&mut self.dels)
.into_iter()
.for_each(|key| wtx.remove(partition, ByteView::from(key)));
mem::take(&mut self.puts)
.into_iter()
.for_each(|(key, value)| {
// if CHECK_COLLISIONS {
// #[allow(unused_must_use)]
// if let Ok(Some(value)) = wtx.get(&self.partition, key.as_bytes()) {
// dbg!(
// &key,
// V::try_from(value.as_bytes().into()).unwrap(),
// &self.meta,
// self.rtx.get(&self.partition, key.as_bytes())
// );
// unreachable!();
// }
// }
wtx.insert(partition, ByteView::from(key), ByteView::from(value))
});
wtx.commit()?;
self.rtx = self.keyspace.read_tx();
Ok(())
}
fn rotate_memtable(&self) {
let _ = self.partition.as_ref().unwrap().inner().rotate_memtable();
}
fn height(&self) -> Option<Height> {
self.meta.height()
}
fn len(&self) -> usize {
let len = self.meta.len() + self.puts.len() - self.dels.len();
if len > 18440000000000000000 {
dbg!((
len,
self.meta.path(),
self.meta.len(),
self.puts.len(),
&self.dels,
));
unreachable!()
}
len
}
fn has(&self, height: Height) -> bool {
self.meta.has(height)
}
fn needs(&self, height: Height) -> bool {
self.meta.needs(height)
}
}
impl<Key, Value> Clone for Store<Key, Value>
where
Key: Clone,
+19
View File
@@ -0,0 +1,19 @@
use brk_core::{Height, Result};
pub trait AnyStore {
fn commit(&mut self, height: Height) -> Result<()>;
fn rotate_memtable(&self);
fn height(&self) -> Option<Height>;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
fn has(&self, height: Height) -> bool;
fn needs(&self, height: Height) -> bool;
}