Merge branch 'main' into dockerize

This commit is contained in:
deadmanoz
2025-07-15 08:50:22 -07:00
committed by GitHub
196 changed files with 19187 additions and 6607 deletions

View File

@@ -7,14 +7,15 @@ use std::{collections::BTreeMap, path::Path, str::FromStr, thread};
use brk_core::{
AddressBytes, AddressBytesHash, BlockHash, BlockHashPrefix, Height, InputIndex, OutputIndex,
OutputType, Sats, Timestamp, TxIndex, Txid, TxidPrefix, TypeIndex, Version, Vin, Vout,
setrlimit,
OutputType, Sats, Timestamp, TxIndex, Txid, TxidPrefix, TypeIndex, TypeIndexWithOutputindex,
Unit, Version, Vin, Vout, setrlimit,
};
use bitcoin::{Transaction, TxIn, TxOut};
use brk_exit::Exit;
use brk_parser::Parser;
use brk_vec::{AnyVec, VecIterator};
use brk_store::AnyStore;
use brk_vec::{AnyVec, Mmap, VecIterator};
use color_eyre::eyre::{ContextCompat, eyre};
use log::{error, info};
use rayon::prelude::*;
@@ -39,6 +40,7 @@ pub struct Indexer {
impl Indexer {
pub fn forced_import(outputs_dir: &Path) -> color_eyre::Result<Self> {
setrlimit()?;
Ok(Self {
vecs: Vecs::forced_import(&outputs_dir.join("vecs/indexed"), VERSION + Version::ZERO)?,
stores: Stores::forced_import(&outputs_dir.join("stores"), VERSION + Version::ZERO)?,
@@ -55,6 +57,9 @@ impl Indexer {
let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc))
.unwrap_or_else(|_report| Indexes::default());
// dbg!(starting_indexes);
// panic!();
exit.block();
self.stores
.rollback_if_needed(&mut self.vecs, &starting_indexes)?;
@@ -84,9 +89,9 @@ impl Indexer {
height: Height,
rem: bool,
exit: &Exit|
-> color_eyre::Result<()> {
-> color_eyre::Result<bool> {
if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() {
return Ok(());
return Ok(false);
}
info!("Exporting...");
@@ -94,15 +99,88 @@ impl Indexer {
stores.commit(height)?;
vecs.flush(height)?;
exit.release();
Ok(())
Ok(true)
};
let mut txindex_to_first_outputindex_mmap_opt = None;
let mut p2pk65addressindex_to_p2pk65bytes_mmap_opt = None;
let mut p2pk33addressindex_to_p2pk33bytes_mmap_opt = None;
let mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt = None;
let mut p2shaddressindex_to_p2shbytes_mmap_opt = None;
let mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt = None;
let mut p2wshaddressindex_to_p2wshbytes_mmap_opt = None;
let mut p2traddressindex_to_p2trbytes_mmap_opt = None;
let mut p2aaddressindex_to_p2abytes_mmap_opt = None;
let reset_mmaps_options =
|vecs: &mut Vecs,
txindex_to_first_outputindex_mmap_opt: &mut Option<Mmap>,
p2pk65addressindex_to_p2pk65bytes_mmap_opt: &mut Option<Mmap>,
p2pk33addressindex_to_p2pk33bytes_mmap_opt: &mut Option<Mmap>,
p2pkhaddressindex_to_p2pkhbytes_mmap_opt: &mut Option<Mmap>,
p2shaddressindex_to_p2shbytes_mmap_opt: &mut Option<Mmap>,
p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt: &mut Option<Mmap>,
p2wshaddressindex_to_p2wshbytes_mmap_opt: &mut Option<Mmap>,
p2traddressindex_to_p2trbytes_mmap_opt: &mut Option<Mmap>,
p2aaddressindex_to_p2abytes_mmap_opt: &mut Option<Mmap>| {
txindex_to_first_outputindex_mmap_opt
.replace(vecs.txindex_to_first_outputindex.create_mmap().unwrap());
p2pk65addressindex_to_p2pk65bytes_mmap_opt.replace(
vecs.p2pk65addressindex_to_p2pk65bytes
.create_mmap()
.unwrap(),
);
p2pk33addressindex_to_p2pk33bytes_mmap_opt.replace(
vecs.p2pk33addressindex_to_p2pk33bytes
.create_mmap()
.unwrap(),
);
p2pkhaddressindex_to_p2pkhbytes_mmap_opt
.replace(vecs.p2pkhaddressindex_to_p2pkhbytes.create_mmap().unwrap());
p2shaddressindex_to_p2shbytes_mmap_opt
.replace(vecs.p2shaddressindex_to_p2shbytes.create_mmap().unwrap());
p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt.replace(
vecs.p2wpkhaddressindex_to_p2wpkhbytes
.create_mmap()
.unwrap(),
);
p2wshaddressindex_to_p2wshbytes_mmap_opt
.replace(vecs.p2wshaddressindex_to_p2wshbytes.create_mmap().unwrap());
p2traddressindex_to_p2trbytes_mmap_opt
.replace(vecs.p2traddressindex_to_p2trbytes.create_mmap().unwrap());
p2aaddressindex_to_p2abytes_mmap_opt
.replace(vecs.p2aaddressindex_to_p2abytes.create_mmap().unwrap());
};
reset_mmaps_options(
vecs,
&mut txindex_to_first_outputindex_mmap_opt,
&mut p2pk65addressindex_to_p2pk65bytes_mmap_opt,
&mut p2pk33addressindex_to_p2pk33bytes_mmap_opt,
&mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt,
&mut p2shaddressindex_to_p2shbytes_mmap_opt,
&mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt,
&mut p2wshaddressindex_to_p2wshbytes_mmap_opt,
&mut p2traddressindex_to_p2trbytes_mmap_opt,
&mut p2aaddressindex_to_p2abytes_mmap_opt,
);
parser.parse(start, end).iter().try_for_each(
|(height, block, blockhash)| -> color_eyre::Result<()> {
info!("Indexing block {height}...");
idxs.height = height;
let txindex_to_first_outputindex_mmap = txindex_to_first_outputindex_mmap_opt.as_ref().unwrap();
let p2pk65addressindex_to_p2pk65bytes_mmap = p2pk65addressindex_to_p2pk65bytes_mmap_opt.as_ref().unwrap();
let p2pk33addressindex_to_p2pk33bytes_mmap = p2pk33addressindex_to_p2pk33bytes_mmap_opt.as_ref().unwrap();
let p2pkhaddressindex_to_p2pkhbytes_mmap = p2pkhaddressindex_to_p2pkhbytes_mmap_opt.as_ref().unwrap();
let p2shaddressindex_to_p2shbytes_mmap = p2shaddressindex_to_p2shbytes_mmap_opt.as_ref().unwrap();
let p2wpkhaddressindex_to_p2wpkhbytes_mmap = p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt.as_ref().unwrap();
let p2wshaddressindex_to_p2wshbytes_mmap = p2wshaddressindex_to_p2wshbytes_mmap_opt.as_ref().unwrap();
let p2traddressindex_to_p2trbytes_mmap = p2traddressindex_to_p2trbytes_mmap_opt.as_ref().unwrap();
let p2aaddressindex_to_p2abytes_mmap = p2aaddressindex_to_p2abytes_mmap_opt.as_ref().unwrap();
// Used to check rapidhash collisions
let check_collisions = check_collisions && height > Height::new(COLLISIONS_CHECKED_UP_TO);
@@ -132,34 +210,6 @@ impl Indexer {
vecs.height_to_total_size.push_if_needed(height, block.total_size().into())?;
vecs.height_to_weight.push_if_needed(height, block.weight().into())?;
let inputs = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
})
.collect::<Vec<_>>();
let outputs = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.output
.iter()
.enumerate()
.map(move |(vout, txout)| (TxIndex::from(index), Vout::from(vout), txout, tx))
})
.collect::<Vec<_>>();
let tx_len = block.txdata.len();
let outputs_len = outputs.len();
let inputs_len = inputs.len();
let (
txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle,
input_source_vec_handle,
@@ -169,9 +219,9 @@ impl Indexer {
scope.spawn(|| -> color_eyre::Result<_> {
block
.txdata
.par_iter()
.iter()
.enumerate()
.map(|(index, tx)| -> color_eyre::Result<_> {
.map(|(index, tx)| {
let txid = Txid::from(tx.compute_txid());
let txid_prefix = TxidPrefix::from(&txid);
@@ -186,26 +236,21 @@ impl Indexer {
Ok((txid_prefix, (tx, txid, TxIndex::from(index), prev_txindex_opt)))
})
.try_fold(BTreeMap::new, |mut map, tuple| {
let (key, value) = tuple?;
map.insert(key, value);
Ok(map)
})
.try_reduce(BTreeMap::new, |mut map, mut map2| {
if map.len() > map2.len() {
map.append(&mut map2);
Ok(map)
} else {
map2.append(&mut map);
Ok(map2)
}
})
.collect::<color_eyre::Result<BTreeMap<_, _>>>()
});
let input_source_vec_handle = scope.spawn(|| {
let txindex_to_first_outputindex_mmap = vecs
.txindex_to_first_outputindex.mmap().load();
let inputs = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.input
.iter()
.enumerate()
.map(move |(vin, txin)| (TxIndex::from(index), Vin::from(vin), txin, tx))
})
.collect::<Vec<_>>();
inputs
.into_par_iter()
@@ -237,11 +282,11 @@ impl Indexer {
let vout = Vout::from(outpoint.vout);
let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, &txindex_to_first_outputindex_mmap)?
let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_mmap)?
.context("Expect outputindex to not be none")
.inspect_err(|_| {
dbg!(outpoint.txid, prev_txindex, vout);
})?.into_inner()
})?.into_owned()
+ vout;
Ok((inputindex, InputSource::PreviousBlock((
@@ -266,157 +311,155 @@ impl Indexer {
})
});
let outputindex_to_txout_outputtype_addressbytes_res_addressindex_opt_handle = scope.spawn(|| {
let p2pk65addressindex_to_p2pk65bytes_mmap = vecs
.p2pk65addressindex_to_p2pk65bytes.mmap().load();
let p2pk33addressindex_to_p2pk33bytes_mmap = vecs.p2pk33addressindex_to_p2pk33bytes.mmap().load();
let p2pkhaddressindex_to_p2pkhbytes_mmap = vecs.p2pkhaddressindex_to_p2pkhbytes.mmap().load();
let p2shaddressindex_to_p2shbytes_mmap = vecs.p2shaddressindex_to_p2shbytes.mmap().load();
let p2wpkhaddressindex_to_p2wpkhbytes_mmap = vecs.p2wpkhaddressindex_to_p2wpkhbytes.mmap().load();
let p2wshaddressindex_to_p2wshbytes_mmap = vecs.p2wshaddressindex_to_p2wshbytes.mmap().load();
let p2traddressindex_to_p2trbytes_mmap = vecs.p2traddressindex_to_p2trbytes.mmap().load();
let p2aaddressindex_to_p2abytes_mmap = vecs.p2aaddressindex_to_p2abytes.mmap().load();
let outputs = block
.txdata
.iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.output
.iter()
.enumerate()
.map(move |(vout, txout)| (TxIndex::from(index), Vout::from(vout), txout, tx))
}).collect::<Vec<_>>();
outputs
.into_par_iter()
.enumerate()
.map(
#[allow(clippy::type_complexity)]
|(block_outputindex, (block_txindex, vout, txout, tx))| -> color_eyre::Result<(
OutputIndex,
(
&TxOut,
TxIndex,
Vout,
OutputType,
brk_core::Result<AddressBytes>,
Option<TypeIndex>,
&Transaction,
),
)> {
let txindex = idxs.txindex + block_txindex;
let outputindex = idxs.outputindex + OutputIndex::from(block_outputindex);
let outputindex_to_txout_outputtype_addressbytes_res_addressindex = outputs.into_par_iter()
.enumerate()
.map(
#[allow(clippy::type_complexity)]
|(block_outputindex, (block_txindex, vout, txout, tx))| -> color_eyre::Result<(
OutputIndex,
(
&TxOut,
TxIndex,
Vout,
OutputType,
brk_core::Result<AddressBytes>,
Option<TypeIndex>,
&Transaction,
),
)> {
let txindex = idxs.txindex + block_txindex;
let outputindex = idxs.outputindex + OutputIndex::from(block_outputindex);
let script = &txout.script_pubkey;
let script = &txout.script_pubkey;
let outputtype = OutputType::from(script);
let outputtype = OutputType::from(script);
let address_bytes_res =
AddressBytes::try_from((script, outputtype)).inspect_err(|_| {
// dbg!(&txout, height, txi, &tx.compute_txid());
});
let typeindex_opt = address_bytes_res.as_ref().ok().and_then(|addressbytes| {
stores
.addressbyteshash_to_typeindex
.get(&AddressBytesHash::from((addressbytes, outputtype)))
.unwrap()
.map(|v| *v)
// Checking if not in the future
.and_then(|typeindex_local| {
(typeindex_local < idxs.typeindex(outputtype)).then_some(typeindex_local)
})
let address_bytes_res =
AddressBytes::try_from((script, outputtype)).inspect_err(|_| {
// dbg!(&txout, height, txi, &tx.compute_txid());
});
if let Some(Some(typeindex)) = check_collisions.then_some(typeindex_opt) {
let addressbytes = address_bytes_res.as_ref().unwrap();
let typeindex_opt = address_bytes_res.as_ref().ok().and_then(|addressbytes| {
stores
.addressbyteshash_to_typeindex
.get(&AddressBytesHash::from((addressbytes, outputtype)))
.unwrap()
.map(|v| *v)
// Checking if not in the future
.and_then(|typeindex_local| {
(typeindex_local < idxs.typeindex(outputtype)).then_some(typeindex_local)
})
});
let prev_addressbytes_opt = match outputtype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_or_read(typeindex.into(), &p2pk65addressindex_to_p2pk65bytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2PK33 => vecs
.p2pk33addressindex_to_p2pk33bytes
.get_or_read(typeindex.into(), &p2pk33addressindex_to_p2pk33bytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2PKH => vecs
.p2pkhaddressindex_to_p2pkhbytes
.get_or_read(typeindex.into(), &p2pkhaddressindex_to_p2pkhbytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2SH => vecs
.p2shaddressindex_to_p2shbytes
.get_or_read(typeindex.into(), &p2shaddressindex_to_p2shbytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2WPKH => vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.get_or_read(typeindex.into(), &p2wpkhaddressindex_to_p2wpkhbytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2WSH => vecs
.p2wshaddressindex_to_p2wshbytes
.get_or_read(typeindex.into(), &p2wshaddressindex_to_p2wshbytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2TR => vecs
.p2traddressindex_to_p2trbytes
.get_or_read(typeindex.into(), &p2traddressindex_to_p2trbytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::P2A => vecs
.p2aaddressindex_to_p2abytes
.get_or_read(typeindex.into(), &p2aaddressindex_to_p2abytes_mmap)?
.map(|v| AddressBytes::from(v.into_inner())),
OutputType::Empty | OutputType::OpReturn | OutputType::P2MS | OutputType::Unknown => {
unreachable!()
}
};
let prev_addressbytes =
prev_addressbytes_opt.as_ref().context("Expect to have addressbytes")?;
if let Some(Some(typeindex)) = check_collisions.then_some(typeindex_opt) {
let addressbytes = address_bytes_res.as_ref().unwrap();
if stores.addressbyteshash_to_typeindex.needs(height)
&& prev_addressbytes != addressbytes
{
let txid = tx.compute_txid();
dbg!(
height,
txid,
vout,
block_txindex,
outputtype,
prev_addressbytes,
addressbytes,
&idxs,
typeindex,
typeindex,
txout,
AddressBytesHash::from((addressbytes, outputtype)),
);
panic!()
let prev_addressbytes_opt = match outputtype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PK33 => vecs
.p2pk33addressindex_to_p2pk33bytes
.get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PKH => vecs
.p2pkhaddressindex_to_p2pkhbytes
.get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2SH => vecs
.p2shaddressindex_to_p2shbytes
.get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WPKH => vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WSH => vecs
.p2wshaddressindex_to_p2wshbytes
.get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2TR => vecs
.p2traddressindex_to_p2trbytes
.get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2A => vecs
.p2aaddressindex_to_p2abytes
.get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_mmap)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::Empty | OutputType::OpReturn | OutputType::P2MS | OutputType::Unknown => {
unreachable!()
}
}
};
let prev_addressbytes =
prev_addressbytes_opt.as_ref().context("Expect to have addressbytes")?;
Ok((
outputindex,
(
txout,
txindex,
if stores.addressbyteshash_to_typeindex.needs(height)
&& prev_addressbytes != addressbytes
{
let txid = tx.compute_txid();
dbg!(
height,
txid,
vout,
block_txindex,
outputtype,
address_bytes_res,
typeindex_opt,
tx,
),
))
},
)
.try_fold(BTreeMap::new, |mut map, tuple| -> color_eyre::Result<_> {
let (key, value) = tuple?;
map.insert(key, value);
Ok(map)
})
.try_reduce(BTreeMap::new, |mut map, mut map2| {
if map.len() > map2.len() {
map.append(&mut map2);
Ok(map)
} else {
map2.append(&mut map);
Ok(map2)
prev_addressbytes,
addressbytes,
&idxs,
typeindex,
typeindex,
txout,
AddressBytesHash::from((addressbytes, outputtype)),
);
panic!()
}
}
})
});
Ok((
outputindex,
(
txout,
txindex,
vout,
outputtype,
address_bytes_res,
typeindex_opt,
tx,
),
))
},
)
.try_fold(BTreeMap::new, |mut map, tuple| -> color_eyre::Result<_> {
let (key, value) = tuple?;
map.insert(key, value);
Ok(map)
})
.try_reduce(BTreeMap::new, |mut map, mut map2| {
if map.len() > map2.len() {
map.append(&mut map2);
Ok(map)
} else {
map2.append(&mut map);
Ok(map2)
}
});
(
txid_prefix_to_txid_and_block_txindex_and_prev_txindex_handle.join(),
input_source_vec_handle.join(),
outputindex_to_txout_outputtype_addressbytes_res_addressindex_opt_handle.join(),
outputindex_to_txout_outputtype_addressbytes_res_addressindex,
)
});
@@ -436,7 +479,11 @@ impl Indexer {
.ok()
.context(
"Expect outputindex_to_txout_outputtype_addressbytes_res_addressindex_opt_handle to join",
)??;
)?;
let outputs_len = outputindex_to_txout_outputtype_addressbytes_res_addressindex_opt.len();
let inputs_len = input_source_vec.len();
let tx_len = block.txdata.len();
let mut new_txindexvout_to_outputindex: BTreeMap<
(TxIndex, Vout),
@@ -546,6 +593,10 @@ impl Indexer {
new_txindexvout_to_outputindex
.insert((txindex, vout), outputindex);
if outputtype.is_address() {
stores.addresstype_to_typeindex_with_outputindex.get_mut(outputtype).unwrap().insert_if_needed(TypeIndexWithOutputindex::from((typeindex, outputindex)), Unit, height);
}
Ok(())
},
)?;
@@ -687,7 +738,22 @@ impl Indexer {
idxs.inputindex += InputIndex::from(inputs_len);
idxs.outputindex += OutputIndex::from(outputs_len);
export_if_needed(stores, vecs, height, false, exit)?;
let exported = export_if_needed(stores, vecs, height, false, exit)?;
if exported {
reset_mmaps_options(
vecs,
&mut txindex_to_first_outputindex_mmap_opt,
&mut p2pk65addressindex_to_p2pk65bytes_mmap_opt,
&mut p2pk33addressindex_to_p2pk33bytes_mmap_opt,
&mut p2pkhaddressindex_to_p2pkhbytes_mmap_opt,
&mut p2shaddressindex_to_p2shbytes_mmap_opt,
&mut p2wpkhaddressindex_to_p2wpkhbytes_mmap_opt,
&mut p2wshaddressindex_to_p2wshbytes_mmap_opt,
&mut p2traddressindex_to_p2trbytes_mmap_opt,
&mut p2aaddressindex_to_p2abytes_mmap_opt,
);
}
Ok(())
},
@@ -695,8 +761,6 @@ impl Indexer {
export_if_needed(stores, vecs, idxs.height, true, exit)?;
stores.rotate_memtables();
Ok(starting_indexes)
}
}

View File

@@ -1,12 +1,13 @@
use std::{fs, path::Path, thread};
use std::{borrow::Cow, fs, path::Path, thread};
use brk_core::{
AddressBytes, AddressBytesHash, BlockHashPrefix, Height, OutputType, Result, TxIndex,
TxidPrefix, TypeIndex, Value, Version,
AddressBytes, AddressBytesHash, BlockHashPrefix, ByAddressType, Height, OutputIndex,
OutputType, Result, TxIndex, TxidPrefix, TypeIndex, TypeIndexWithOutputindex, Unit, Version,
};
use brk_store::Store;
use brk_vec::AnyIterableVec;
use brk_store::{AnyStore, Store};
use brk_vec::{AnyIterableVec, VecIterator};
use fjall::{PersistMode, TransactionalKeyspace};
use rayon::prelude::*;
use crate::Indexes;
@@ -15,9 +16,12 @@ use super::Vecs;
#[derive(Clone)]
pub struct Stores {
pub keyspace: TransactionalKeyspace,
pub addressbyteshash_to_typeindex: Store<AddressBytesHash, TypeIndex>,
pub blockhashprefix_to_height: Store<BlockHashPrefix, Height>,
pub txidprefix_to_txindex: Store<TxidPrefix, TxIndex>,
pub addresstype_to_typeindex_with_outputindex:
ByAddressType<Store<TypeIndexWithOutputindex, Unit>>,
}
const VERSION: Version = Version::ZERO;
@@ -62,24 +66,193 @@ impl Stores {
None,
)
});
let p2aaddressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2aaddressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2pk33addressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2pk33addressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2pk65addressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2pk65addressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2pkhaddressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2pkhaddressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2shaddressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2shaddressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2traddressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2traddressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2wpkhaddressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2wpkhaddressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
let p2wshaddressindex_with_outputindex = scope.spawn(|| {
Store::import(
&keyspace,
path,
"p2wshaddressindex_with_outputindex",
version + VERSION + Version::ZERO,
Some(false),
)
});
Ok(Self {
keyspace: keyspace.clone(),
addressbyteshash_to_typeindex: addressbyteshash_to_typeindex.join().unwrap()?,
blockhashprefix_to_height: blockhashprefix_to_height.join().unwrap()?,
txidprefix_to_txindex: txidprefix_to_txindex.join().unwrap()?,
addresstype_to_typeindex_with_outputindex: ByAddressType {
p2pk65: p2pk65addressindex_with_outputindex.join().unwrap()?,
p2pk33: p2pk33addressindex_with_outputindex.join().unwrap()?,
p2pkh: p2pkhaddressindex_with_outputindex.join().unwrap()?,
p2sh: p2shaddressindex_with_outputindex.join().unwrap()?,
p2wpkh: p2wpkhaddressindex_with_outputindex.join().unwrap()?,
p2wsh: p2wshaddressindex_with_outputindex.join().unwrap()?,
p2tr: p2traddressindex_with_outputindex.join().unwrap()?,
p2a: p2aaddressindex_with_outputindex.join().unwrap()?,
},
})
})
}
pub fn starting_height(&self) -> Height {
self.as_slice()
.into_iter()
.map(|store| {
// let height =
store.height().map(Height::incremented).unwrap_or_default()
// dbg!((height, store.name()));
})
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
self.as_mut_slice()
.into_par_iter()
.try_for_each(|store| store.commit(height))?;
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
fn as_slice(&self) -> [&(dyn AnyStore + Send + Sync); 11] {
[
&self.addressbyteshash_to_typeindex,
&self.blockhashprefix_to_height,
&self.txidprefix_to_txindex,
&self.addresstype_to_typeindex_with_outputindex.p2a,
&self.addresstype_to_typeindex_with_outputindex.p2pk33,
&self.addresstype_to_typeindex_with_outputindex.p2pk65,
&self.addresstype_to_typeindex_with_outputindex.p2pkh,
&self.addresstype_to_typeindex_with_outputindex.p2sh,
&self.addresstype_to_typeindex_with_outputindex.p2tr,
&self.addresstype_to_typeindex_with_outputindex.p2wpkh,
&self.addresstype_to_typeindex_with_outputindex.p2wsh,
]
}
fn as_mut_slice(&mut self) -> [&mut (dyn AnyStore + Send + Sync); 11] {
[
&mut self.addressbyteshash_to_typeindex,
&mut self.blockhashprefix_to_height,
&mut self.txidprefix_to_txindex,
&mut self.addresstype_to_typeindex_with_outputindex.p2a,
&mut self.addresstype_to_typeindex_with_outputindex.p2pk33,
&mut self.addresstype_to_typeindex_with_outputindex.p2pk65,
&mut self.addresstype_to_typeindex_with_outputindex.p2pkh,
&mut self.addresstype_to_typeindex_with_outputindex.p2sh,
&mut self.addresstype_to_typeindex_with_outputindex.p2tr,
&mut self.addresstype_to_typeindex_with_outputindex.p2wpkh,
&mut self.addresstype_to_typeindex_with_outputindex.p2wsh,
]
}
pub fn rollback_if_needed(
&mut self,
vecs: &mut Vecs,
starting_indexes: &Indexes,
) -> color_eyre::Result<()> {
if self.addressbyteshash_to_typeindex.is_empty()
&& self.blockhashprefix_to_height.is_empty()
&& self.txidprefix_to_txindex.is_empty()
if self.addressbyteshash_to_typeindex.is_empty()?
&& self.blockhashprefix_to_height.is_empty()?
&& self.txidprefix_to_txindex.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2a
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2pk33
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2pk65
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2pkh
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2sh
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2tr
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2wpkh
.is_empty()?
&& self
.addresstype_to_typeindex_with_outputindex
.p2wsh
.is_empty()?
{
return Ok(());
}
@@ -88,7 +261,7 @@ impl Stores {
vecs.height_to_blockhash
.iter_at(starting_indexes.height)
.for_each(|(_, v)| {
let blockhashprefix = BlockHashPrefix::from(Value::into_inner(v));
let blockhashprefix = BlockHashPrefix::from(v.into_owned());
self.blockhashprefix_to_height.remove(blockhashprefix);
});
@@ -96,14 +269,14 @@ impl Stores {
.height_to_first_p2pk65addressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2pk65addressindex_to_p2pk65bytes_iter =
vecs.p2pk65addressindex_to_p2pk65bytes.iter();
while let Some(typedbytes) = p2pk65addressindex_to_p2pk65bytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK65));
@@ -116,14 +289,14 @@ impl Stores {
.height_to_first_p2pk33addressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2pk33addressindex_to_p2pk33bytes_iter =
vecs.p2pk33addressindex_to_p2pk33bytes.iter();
while let Some(typedbytes) = p2pk33addressindex_to_p2pk33bytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PK33));
@@ -136,14 +309,14 @@ impl Stores {
.height_to_first_p2pkhaddressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2pkhaddressindex_to_p2pkhbytes_iter =
vecs.p2pkhaddressindex_to_p2pkhbytes.iter();
while let Some(typedbytes) = p2pkhaddressindex_to_p2pkhbytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2PKH));
@@ -156,14 +329,14 @@ impl Stores {
.height_to_first_p2shaddressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2shaddressindex_to_p2shbytes_iter =
vecs.p2shaddressindex_to_p2shbytes.iter();
while let Some(typedbytes) = p2shaddressindex_to_p2shbytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2SH));
@@ -176,14 +349,14 @@ impl Stores {
.height_to_first_p2traddressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2traddressindex_to_p2trbytes_iter =
vecs.p2traddressindex_to_p2trbytes.iter();
while let Some(typedbytes) = p2traddressindex_to_p2trbytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2TR));
@@ -196,14 +369,14 @@ impl Stores {
.height_to_first_p2wpkhaddressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2wpkhaddressindex_to_p2wpkhbytes_iter =
vecs.p2wpkhaddressindex_to_p2wpkhbytes.iter();
while let Some(typedbytes) = p2wpkhaddressindex_to_p2wpkhbytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WPKH));
@@ -216,14 +389,14 @@ impl Stores {
.height_to_first_p2wshaddressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2wshaddressindex_to_p2wshbytes_iter =
vecs.p2wshaddressindex_to_p2wshbytes.iter();
while let Some(typedbytes) = p2wshaddressindex_to_p2wshbytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2WSH));
@@ -236,13 +409,13 @@ impl Stores {
.height_to_first_p2aaddressindex
.iter()
.get(starting_indexes.height)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let mut p2aaddressindex_to_p2abytes_iter = vecs.p2aaddressindex_to_p2abytes.iter();
while let Some(typedbytes) = p2aaddressindex_to_p2abytes_iter
.get(index)
.map(Value::into_inner)
.map(Cow::into_owned)
{
let bytes = AddressBytes::from(typedbytes);
let hash = AddressBytesHash::from((&bytes, OutputType::P2A));
@@ -251,15 +424,15 @@ impl Stores {
}
}
} else {
self.blockhashprefix_to_height.reset_partition()?;
self.addressbyteshash_to_typeindex.reset_partition()?;
self.blockhashprefix_to_height.reset()?;
self.addressbyteshash_to_typeindex.reset()?;
}
if starting_indexes.txindex != TxIndex::ZERO {
vecs.txindex_to_txid
.iter_at(starting_indexes.txindex)
.for_each(|(txindex, txid)| {
let txidprefix = TxidPrefix::from(&txid.into_inner());
let txidprefix = TxidPrefix::from(&txid.into_owned());
// "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599"
let is_not_first_dup = txindex != TxIndex::new(142783)
@@ -274,52 +447,51 @@ impl Stores {
}
});
} else {
self.txidprefix_to_txindex.reset_partition()?;
self.txidprefix_to_txindex.reset()?;
}
if starting_indexes.outputindex != OutputIndex::ZERO {
let mut outputindex_to_typeindex_iter = vecs.outputindex_to_typeindex.into_iter();
vecs.outputindex_to_outputtype
.iter_at(starting_indexes.outputindex)
.filter(|(_, outputtype)| outputtype.is_address())
.for_each(|(outputindex, outputtype)| {
let outputtype = outputtype.into_owned();
let typeindex = outputindex_to_typeindex_iter.unwrap_get_inner(outputindex);
self.addresstype_to_typeindex_with_outputindex
.get_mut(outputtype)
.unwrap()
.remove(TypeIndexWithOutputindex::from((typeindex, outputindex)));
});
} else {
self.addresstype_to_typeindex_with_outputindex.p2a.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2pk33
.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2pk65
.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2pkh
.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2sh
.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2tr
.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2wpkh
.reset()?;
self.addresstype_to_typeindex_with_outputindex
.p2wsh
.reset()?;
}
self.commit(starting_indexes.height.decremented().unwrap_or_default())?;
Ok(())
}
pub fn starting_height(&self) -> Height {
[
self.addressbyteshash_to_typeindex.height(),
self.blockhashprefix_to_height.height(),
self.txidprefix_to_txindex.height(),
]
.into_iter()
.map(|height| height.map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
pub fn commit(&mut self, height: Height) -> Result<()> {
thread::scope(|scope| -> Result<()> {
let addressbyteshash_to_typeindex_commit_handle =
scope.spawn(|| self.addressbyteshash_to_typeindex.commit(height));
let blockhashprefix_to_height_commit_handle =
scope.spawn(|| self.blockhashprefix_to_height.commit(height));
let txidprefix_to_txindex_commit_handle =
scope.spawn(|| self.txidprefix_to_txindex.commit(height));
addressbyteshash_to_typeindex_commit_handle
.join()
.unwrap()?;
blockhashprefix_to_height_commit_handle.join().unwrap()?;
txidprefix_to_txindex_commit_handle.join().unwrap()?;
Ok(())
})?;
self.keyspace
.persist(PersistMode::SyncAll)
.map_err(|e| e.into())
}
pub fn rotate_memtables(&self) {
self.addressbyteshash_to_typeindex.rotate_memtable();
self.blockhashprefix_to_height.rotate_memtable();
self.txidprefix_to_txindex.rotate_memtable();
}
}