vec: compression part 1

This commit is contained in:
nym21
2025-03-13 17:11:04 +01:00
parent b4fbcf6bee
commit c459a3033d
30 changed files with 960 additions and 337 deletions

View File

@@ -22,12 +22,6 @@ Vecs: `src/storage/vecs/mod.rs`
Stores: `src/storage/stores/mod.rs`
## Examples
Rust: `src/main.rs`
Python: `../python/parse.py`
## Benchmark
Indexing blocks `0..885_835` took `11 hours 6 min 50 s` on a MacBook Pro (M3 Pro) with 36 GB of RAM

View File

@@ -1,6 +1,6 @@
use std::{path::Path, thread::sleep, time::Duration};
use std::path::Path;
use brk_core::default_bitcoin_path;
use brk_core::{default_bitcoin_path, dot_brk_path};
use brk_exit::Exit;
use brk_indexer::{Indexer, rpc::RpcApi};
use brk_parser::{
@@ -24,23 +24,25 @@ fn main() -> color_eyre::Result<()> {
let parser = Parser::new(bitcoin_dir.join("blocks"), rpc);
let mut indexer = Indexer::new(Path::new("../../_outputs/indexed").to_owned())?;
let outputs = dot_brk_path().join("outputs");
let mut indexer = Indexer::new(outputs.join("indexed").to_owned(), true)?;
indexer.import_stores()?;
indexer.import_vecs()?;
loop {
let block_count = rpc.get_block_count()?;
// loop {
let block_count = rpc.get_block_count()?;
info!("{block_count} blocks found.");
info!("{block_count} blocks found.");
indexer.index(&parser, rpc, &exit)?;
indexer.index(&parser, rpc, &exit)?;
info!("Waiting for new blocks...");
info!("Waiting for new blocks...");
while block_count == rpc.get_block_count()? {
sleep(Duration::from_secs(1))
}
}
// while block_count == rpc.get_block_count()? {
// sleep(Duration::from_secs(1))
// }
// }
#[allow(unreachable_code)]
Ok(())

View File

@@ -18,6 +18,7 @@ pub use brk_parser::*;
use bitcoin::{Transaction, TxIn, TxOut};
use brk_exit::Exit;
use brk_vec::Compressed;
use color_eyre::eyre::{ContextCompat, eyre};
use log::info;
use rayon::prelude::*;
@@ -36,15 +37,17 @@ pub struct Indexer {
path: PathBuf,
vecs: Option<Vecs>,
stores: Option<Stores>,
check_collisions: bool,
}
impl Indexer {
pub fn new(indexes_dir: PathBuf) -> color_eyre::Result<Self> {
pub fn new(indexes_dir: PathBuf, check_collisions: bool) -> color_eyre::Result<Self> {
setrlimit()?;
Ok(Self {
path: indexes_dir,
vecs: None,
stores: None,
check_collisions,
})
}
@@ -66,8 +69,6 @@ impl Indexer {
rpc: &'static rpc::Client,
exit: &Exit,
) -> color_eyre::Result<Indexes> {
let check_collisions = true;
let starting_indexes = Indexes::try_from((
self.vecs.as_mut().unwrap(),
self.stores.as_ref().unwrap(),
@@ -96,7 +97,7 @@ impl Indexer {
let mut idxs = starting_indexes.clone();
let start = Some(idxs.height);
let end = None; //Some(Height::new(400_000));
let end = None;
if starting_indexes.height > Height::try_from(rpc)?
|| end.is_some_and(|end| starting_indexes.height > end)
@@ -124,12 +125,14 @@ impl Indexer {
Ok(())
};
parser.parse(start, None).iter().try_for_each(
parser.parse(start, end).iter().try_for_each(
|(height, block, blockhash)| -> color_eyre::Result<()> {
info!("Indexing block {height}...");
idxs.height = height;
let check_collisions = self.check_collisions && height > Height::new(886_000);
let blockhash = BlockHash::from(blockhash);
let blockhash_prefix = BlockHashPrefix::from(&blockhash);
@@ -232,8 +235,6 @@ impl Indexer {
let txindex = idxs.txindex + block_txindex;
let txinindex = idxs.txinindex + Txinindex::from(block_txinindex);
// dbg!((txindex, txinindex, vin));
let outpoint = txin.previous_output;
let txid = Txid::from(outpoint.txid);
@@ -598,6 +599,10 @@ impl Indexer {
return Ok(());
}
if !check_collisions {
return Ok(())
}
let len = vecs.txindex_to_txid.len();
// It is fine that `get` is not parallelized here, as this should only happen twice
let prev_txid = vecs
@@ -608,8 +613,6 @@ impl Indexer {
dbg!(txindex, len);
})?;
// #[allow(clippy::redundant_locals)]
// let prev_txid = prev_txid;
let prev_txid = prev_txid.as_ref();
// If another Txid needs to be added to the list

View File

@@ -5,7 +5,7 @@ use std::{
path::{Path, PathBuf},
};
use brk_vec::{StoredIndex, StoredType, Version};
use brk_vec::{Compressed, StoredIndex, StoredType, Version};
use super::Height;
@@ -20,10 +20,10 @@ where
I: StoredIndex,
T: StoredType,
{
pub fn import(path: &Path, version: Version) -> brk_vec::Result<Self> {
let mut vec = brk_vec::StorableVec::forced_import(path, version)?;
pub fn import(path: &Path, version: Version, compressed: Compressed) -> brk_vec::Result<Self> {
let mut vec = brk_vec::StorableVec::forced_import(path, version, compressed)?;
vec.reset_mmaps()?;
vec.init_big_cache()?;
Ok(Self {
height: Height::try_from(Self::path_height_(path).as_path()).ok(),
@@ -31,11 +31,12 @@ where
})
}
pub fn truncate_if_needed(&mut self, index: I, height: Height) -> brk_vec::Result<Option<T>> {
pub fn truncate_if_needed(&mut self, index: I, height: Height) -> brk_vec::Result<()> {
if self.height.is_none_or(|self_height| self_height != height) {
height.write(&self.path_height())?;
}
self.vec.truncate_if_needed(index)
self.vec.truncate_if_needed(index)?;
Ok(())
}
pub fn height(&self) -> brk_core::Result<Height> {
@@ -51,7 +52,7 @@ where
pub fn flush(&mut self, height: Height) -> io::Result<()> {
height.write(&self.path_height())?;
self.vec.flush()?;
self.vec.reset_mmaps()
self.vec.init_big_cache()
}
}

View File

@@ -7,7 +7,7 @@ use brk_core::{
P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats,
Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight,
};
use brk_vec::{AnyStorableVec, Version};
use brk_vec::{AnyStorableVec, Compressed, Version};
use rayon::prelude::*;
use crate::Indexes;
@@ -71,168 +71,217 @@ impl Vecs {
addressindex_to_addresstype: StorableVec::import(
&path.join("addressindex_to_addresstype"),
Version::from(1),
Compressed::YES,
)?,
addressindex_to_addresstypeindex: StorableVec::import(
&path.join("addressindex_to_addresstypeindex"),
Version::from(1),
Compressed::YES,
)?,
addressindex_to_height: StorableVec::import(
&path.join("addressindex_to_height"),
Version::from(1),
Compressed::YES,
)?,
height_to_blockhash: StorableVec::import(
&path.join("height_to_blockhash"),
Version::from(1),
Compressed::NO,
)?,
height_to_difficulty: StorableVec::import(
&path.join("height_to_difficulty"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_addressindex: StorableVec::import(
&path.join("height_to_first_addressindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_emptyindex: StorableVec::import(
&path.join("height_to_first_emptyindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_multisigindex: StorableVec::import(
&path.join("height_to_first_multisigindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_opreturnindex: StorableVec::import(
&path.join("height_to_first_opreturnindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_pushonlyindex: StorableVec::import(
&path.join("height_to_first_pushonlyindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_txindex: StorableVec::import(
&path.join("height_to_first_txindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_txinindex: StorableVec::import(
&path.join("height_to_first_txinindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_txoutindex: StorableVec::import(
&path.join("height_to_first_txoutindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_unknownindex: StorableVec::import(
&path.join("height_to_first_unkownindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2pk33index: StorableVec::import(
&path.join("height_to_first_p2pk33index"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2pk65index: StorableVec::import(
&path.join("height_to_first_p2pk65index"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2pkhindex: StorableVec::import(
&path.join("height_to_first_p2pkhindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2shindex: StorableVec::import(
&path.join("height_to_first_p2shindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2trindex: StorableVec::import(
&path.join("height_to_first_p2trindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2wpkhindex: StorableVec::import(
&path.join("height_to_first_p2wpkhindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_first_p2wshindex: StorableVec::import(
&path.join("height_to_first_p2wshindex"),
Version::from(1),
Compressed::YES,
)?,
height_to_size: StorableVec::import(
&path.join("height_to_size"),
Version::from(1),
Compressed::YES,
)?,
height_to_size: StorableVec::import(&path.join("height_to_size"), Version::from(1))?,
height_to_timestamp: StorableVec::import(
&path.join("height_to_timestamp"),
Version::from(1),
Compressed::YES,
)?,
height_to_weight: StorableVec::import(
&path.join("height_to_weight"),
Version::from(1),
Compressed::YES,
)?,
p2pk33index_to_p2pk33addressbytes: StorableVec::import(
&path.join("p2pk33index_to_p2pk33addressbytes"),
Version::from(1),
Compressed::NO,
)?,
p2pk65index_to_p2pk65addressbytes: StorableVec::import(
&path.join("p2pk65index_to_p2pk65addressbytes"),
Version::from(1),
Compressed::NO,
)?,
p2pkhindex_to_p2pkhaddressbytes: StorableVec::import(
&path.join("p2pkhindex_to_p2pkhaddressbytes"),
Version::from(1),
Compressed::NO,
)?,
p2shindex_to_p2shaddressbytes: StorableVec::import(
&path.join("p2shindex_to_p2shaddressbytes"),
Version::from(1),
Compressed::NO,
)?,
p2trindex_to_p2traddressbytes: StorableVec::import(
&path.join("p2trindex_to_p2traddressbytes"),
Version::from(1),
Compressed::NO,
)?,
p2wpkhindex_to_p2wpkhaddressbytes: StorableVec::import(
&path.join("p2wpkhindex_to_p2wpkhaddressbytes"),
Version::from(1),
Compressed::NO,
)?,
p2wshindex_to_p2wshaddressbytes: StorableVec::import(
&path.join("p2wshindex_to_p2wshaddressbytes"),
Version::from(1),
Compressed::NO,
)?,
txindex_to_first_txinindex: StorableVec::import(
&path.join("txindex_to_first_txinindex"),
Version::from(1),
Compressed::YES,
)?,
txindex_to_first_txoutindex: StorableVec::import(
&path.join("txindex_to_first_txoutindex"),
Version::from(1),
Compressed::NO,
)?,
txindex_to_height: StorableVec::import(
&path.join("txindex_to_height"),
Version::from(1),
Compressed::YES,
)?,
txindex_to_locktime: StorableVec::import(
&path.join("txindex_to_locktime"),
Version::from(1),
Compressed::YES,
)?,
txindex_to_txid: StorableVec::import(
&path.join("txindex_to_txid"),
Version::from(1),
Compressed::NO,
)?,
txindex_to_txid: StorableVec::import(&path.join("txindex_to_txid"), Version::from(1))?,
txindex_to_base_size: StorableVec::import(
&path.join("txindex_to_base_size"),
Version::from(1),
Compressed::YES,
)?,
txindex_to_total_size: StorableVec::import(
&path.join("txindex_to_total_size"),
Version::from(1),
Compressed::YES,
)?,
txindex_to_is_explicitly_rbf: StorableVec::import(
&path.join("txindex_to_is_explicitly_rbf"),
Version::from(1),
Compressed::YES,
)?,
txindex_to_txversion: StorableVec::import(
&path.join("txindex_to_txversion"),
Version::from(1),
Compressed::YES,
)?,
txinindex_to_txoutindex: StorableVec::import(
&path.join("txinindex_to_txoutindex"),
Version::from(1),
Compressed::YES,
)?,
txoutindex_to_addressindex: StorableVec::import(
&path.join("txoutindex_to_addressindex"),
Version::from(1),
Compressed::YES,
)?,
txoutindex_to_value: StorableVec::import(
&path.join("txoutindex_to_value"),
Version::from(1),
Compressed::YES,
)?,
})
}