bindex: retrying fjall

This commit is contained in:
nym21
2025-01-27 12:49:19 +01:00
parent 042be6e229
commit 90a5c4fbf8
24 changed files with 974 additions and 55 deletions

View File

@@ -3,7 +3,8 @@ use std::{
io::{Read, Write},
path::Path,
str::FromStr,
thread::{self},
thread::{self, sleep},
time::Duration,
};
use biter::{
@@ -12,12 +13,13 @@ use biter::{
};
use color_eyre::eyre::{eyre, ContextCompat};
use exit::Exit;
use fjall::{PersistMode, ReadTransaction, TransactionalKeyspace};
use rayon::prelude::*;
mod storage;
mod structs;
use storage::{Stores, Vecs};
use storage::{Partitions, Vecs};
use structs::{
Addressbytes, AddressbytesPrefix, Addressindex, Addresstype, Amount, BlockHashPrefix, Height, Timestamp,
TxidPrefix, Txindex, Txinindex, Txoutindex, Vin, Vout,
@@ -35,17 +37,17 @@ impl Indexer {
let mut vecs = Vecs::import(&indexes_dir.join("vecs"))?;
let open_stores = || Stores::open(&indexes_dir.join("stores"));
let stores = open_stores()?;
let keyspace = fjall::Config::new(indexes_dir.join("fjall")).open_transactional()?;
let mut parts = Partitions::import(&keyspace, &exit)?;
let rtx = keyspace.read_tx();
let mut height = vecs
.min_height()
.unwrap_or_default()
.min(stores.min_height())
.min(parts.min_height())
.and_then(|h| h.checked_sub(UNSAFE_BLOCKS))
.map(Height::from)
.unwrap_or_default();
// let mut height = Height::default();
let mut txindex_global = vecs.height_to_first_txindex.get_or_default(height)?;
let mut txinindex_global = vecs.height_to_first_txinindex.get_or_default(height)?;
@@ -64,31 +66,37 @@ impl Indexer {
let mut p2wpkhindex_global = vecs.height_to_first_p2wpkhindex.get_or_default(height)?;
let mut p2wshindex_global = vecs.height_to_first_p2wshindex.get_or_default(height)?;
let export = |stores: Stores, vecs: &mut Vecs, height: Height| -> color_eyre::Result<()> {
let export = |keyspace: &TransactionalKeyspace,
rtx: ReadTransaction,
parts: &mut Partitions,
vecs: &mut Vecs,
height: Height|
-> color_eyre::Result<()> {
println!("Exporting...");
if height >= Height::from(400_000_u32) {
pause();
// println!("Flushing vecs...");
}
drop(rtx);
exit.block();
thread::scope(|scope| -> color_eyre::Result<()> {
let vecs_handle = scope.spawn(|| vecs.flush(height));
let stores_handle = scope.spawn(|| stores.export(height));
parts.write(keyspace, height)?;
keyspace.persist(PersistMode::SyncAll)?;
vecs_handle.join().unwrap()?;
stores_handle.join().unwrap()?;
Ok(())
})?;
exit.unblock();
Ok(())
};
let mut stores_opt = Some(stores);
// let mut stores_opt = Some(stores);
let mut rtx_opt = Some(rtx);
biter::new(bitcoin_dir, Some(height.into()), Some(500_000), rpc)
biter::new(bitcoin_dir, Some(height.into()), None, rpc)
.iter()
.try_for_each(|(_height, block, blockhash)| -> color_eyre::Result<()> {
println!("Processing block {_height}...");
@@ -96,27 +104,28 @@ impl Indexer {
height = Height::from(_height);
let timestamp = Timestamp::try_from(block.header.time)?;
let mut stores = stores_opt.take().context("option should have wtx")?;
// let mut stores = stores_opt.take().context("option should have store")?;
let rtx = rtx_opt.take().context("option should have rtx")?;
if let Some(saved_blockhash) = vecs.height_to_blockhash.get(height)? {
if &blockhash != saved_blockhash.as_ref() {
todo!("Rollback not implemented");
// parts.rollback_from(&mut wtx, height, &exit)?;
// parts.rollback_from(&mut rtx, height, &exit)?;
}
}
let blockhash_prefix = BlockHashPrefix::try_from(&blockhash)?;
if stores
if parts
.blockhash_prefix_to_height
.get(&blockhash_prefix)?
.is_some_and(|prev_height| *prev_height != height)
.get(&rtx, &blockhash_prefix)?
.is_some_and(|prev_height| prev_height != height)
{
dbg!(blockhash);
return Err(eyre!("Collision, expect prefix to need be set yet"));
}
stores
parts
.blockhash_prefix_to_height
.insert_if_needed(blockhash_prefix, height, height);
@@ -194,9 +203,9 @@ impl Indexer {
let txid_prefix = TxidPrefix::try_from(&txid)?;
let prev_txindex_slice_opt =
if check_collisions && stores.txid_prefix_to_txindex.needs(height) {
if check_collisions && parts.txid_prefix_to_txindex.needs(height) {
// Should only find collisions for two txids (duplicates), see below
stores.txid_prefix_to_txindex.get(&txid_prefix)?.cloned()
parts.txid_prefix_to_txindex.get(&rtx, &txid_prefix)?
} else {
None
};
@@ -235,14 +244,14 @@ impl Indexer {
return Ok((txinindex, InputSource::SameBlock((tx, txindex, txin, vin))));
}
let prev_txindex = if let Some(txindex) = stores
let prev_txindex = if let Some(txindex) = parts
.txid_prefix_to_txindex
.get(&TxidPrefix::try_from(&outpoint.txid)?)?
.get(&rtx, &TxidPrefix::try_from(&outpoint.txid)?)?
.and_then(|txindex| {
// Checking if not finding txindex from the future
(txindex < &txindex_global).then_some(txindex)
(txindex < txindex_global).then_some(txindex)
}) {
*txindex
txindex
} else {
// dbg!(txindex_global + block_txindex, txindex, txin, vin);
return Ok((txinindex, InputSource::SameBlock((tx, txindex, txin, vin))));
@@ -312,11 +321,10 @@ impl Indexer {
});
let addressindex_opt = addressbytes_res.as_ref().ok().and_then(|addressbytes| {
stores
parts
.addressbytes_prefix_to_addressindex
.get(&AddressbytesPrefix::from((addressbytes, addresstype)))
.get(&rtx, &AddressbytesPrefix::from((addressbytes, addresstype)))
.unwrap()
.cloned()
// Checking if not in the future
.and_then(|addressindex_local| {
(addressindex_local < addressindex_global).then_some(addressindex_local)
@@ -346,7 +354,7 @@ impl Indexer {
if (vecs.addressindex_to_addresstype.hasnt(addressindex)
&& addresstype != prev_addresstype)
|| (stores.addressbytes_prefix_to_addressindex.needs(height)
|| (parts.addressbytes_prefix_to_addressindex.needs(height)
&& prev_addressbytes != addressbytes)
{
let txid = tx.compute_txid();
@@ -494,9 +502,9 @@ impl Indexer {
let addressbytes_prefix = addressbytes_prefix.unwrap();
already_added_addressbytes_prefix
.insert(addressbytes_prefix.clone(), addressindex);
.insert(addressbytes_prefix, addressindex);
stores.addressbytes_prefix_to_addressindex.insert_if_needed(
parts.addressbytes_prefix_to_addressindex.insert_if_needed(
addressbytes_prefix,
addressindex,
height,
@@ -580,7 +588,7 @@ impl Indexer {
match prev_txindex_opt {
None => {
stores
parts
.txid_prefix_to_txindex
.insert_if_needed(txid_prefix, txindex, height);
}
@@ -646,16 +654,24 @@ impl Indexer {
let should_snapshot = _height != 0 && _height % SNAPSHOT_BLOCK_RANGE == 0 && !exit.active();
if should_snapshot {
export(stores, &mut vecs, height)?;
stores_opt.replace(open_stores()?);
export(&keyspace, rtx, &mut parts, &mut vecs, height)?;
rtx_opt.replace(keyspace.read_tx());
} else {
stores_opt.replace(stores);
rtx_opt.replace(rtx);
}
Ok(())
})?;
export(stores_opt.take().context("option should have wtx")?, &mut vecs, height)?;
export(
&keyspace,
rtx_opt.take().context("option should have wtx")?,
&mut parts,
&mut vecs,
height,
)?;
sleep(Duration::from_millis(100));
Ok(())
}