diff --git a/CHANGELOG.md b/CHANGELOG.md index c5456c4da..ae5122317 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ - Created separate crate for indexing called `bindex` - Created a crate a storage engine specialized in storing datasets that have indexes as keys and thus can be represented by an array/vec called `storable-vec` - Removed the need for the `-txindex=1` parameter when starting your Bitcoin Core node as kibō has its own indexes now -- Tried different storage engines such as `fjall`, `canopydb` and `heed`, the first ended up being 3 times slower than `sanakirja` and the rest wouldn't play nice with `rayon` which is a dealbreaker +- Tried different storage engines such as `fjall`, `canopydb` and `heed`, the first ended up being 3 times slower than `sanakirja` and had very fragile files (just looking at them would corrupt them), and the rest wouldn't play nice with `rayon` which is a dealbreaker - `snkrj` added a robust auto defragmentation to improve disk usage without the need for user's intervention ## Git diff --git a/Cargo.lock b/Cargo.lock index b4de7143a..534b8a9e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,10 +71,10 @@ name = "bindex" version = "0.1.0" dependencies = [ "bitcoin_hashes 0.16.0", - "biter 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "biter", "color-eyre", - "ctrlc", "derive_deref", + "exit", "jiff", "rayon", "snkrj", @@ -197,21 +197,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "biter" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02527a2b7944d9edc90db0a73339c57cd521f9623f99416574d1943312c9d290" -dependencies = [ - "bitcoin", - "bitcoincore-rpc", - "crossbeam", - "derived-deref", - "rayon", - "serde", - "serde_json", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -220,9 +205,13 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.7.0" +version = 
"2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1be3f42a67d6d345ecd59f675f3f012d6974981560836e938c22b424b85ce1be" +checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" + +[[package]] +name = "bomputer" +version = "0.1.0" [[package]] name = "byteorder" @@ -372,6 +361,13 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "exit" +version = "0.1.0" +dependencies = [ + "ctrlc", +] + [[package]] name = "eyre" version = "0.6.12" @@ -564,7 +560,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "cfg-if", "cfg_aliases", "libc", diff --git a/Cargo.toml b/Cargo.toml index 1a3669c99..95fb25da8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ # edition = "2021" [workspace] -members = ["bindex", "biter", "iterable", "snkrj", "storable_vec"] +members = ["bindex", "biter", "bomputer", "exit", "iterable", "snkrj", "storable_vec"] resolver = "2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/bindex/.gitignore b/bindex/.gitignore index e2647cd44..752861df6 100644 --- a/bindex/.gitignore +++ b/bindex/.gitignore @@ -1 +1,2 @@ /database +/indexes diff --git a/bindex/Cargo.toml b/bindex/Cargo.toml index 4b86ae73d..549182e38 100644 --- a/bindex/Cargo.toml +++ b/bindex/Cargo.toml @@ -5,10 +5,10 @@ edition = "2021" [dependencies] bitcoin_hashes = "0.16.0" -biter = "0.2.2" +biter = { path = "../biter" } color-eyre = "0.6.3" -ctrlc = "3.4.5" derive_deref = "1.1.1" +exit = { path = "../exit" } jiff = "0.1.24" rayon = "1.10.0" snkrj = { path = "../snkrj" } diff --git a/bindex/src/indexer.rs b/bindex/src/indexer.rs new file mode 100644 index 000000000..38b8591fe 
--- /dev/null +++ b/bindex/src/indexer.rs @@ -0,0 +1,672 @@ +use std::{ + collections::BTreeMap, + io::{Read, Write}, + path::Path, + str::FromStr, + thread::{self}, +}; + +use biter::{ + bitcoin::{Transaction, TxIn, TxOut, Txid}, + rpc, +}; +use exit::Exit; + +use crate::storage::{Stores, Vecs}; +use crate::structs::{ + Addressbytes, AddressbytesPrefix, Addressindex, Addresstype, Amount, BlockHashPrefix, Height, Timestamp, + TxidPrefix, Txindex, Txoutindex, Vout, +}; +use color_eyre::eyre::{eyre, ContextCompat}; +use rayon::prelude::*; + +#[derive(Debug)] +enum TxInOrAddressindextoutindex<'a> { + TxIn(&'a TxIn), + AddressTxTxoutIndexes((Addressindex, Txindex, Txoutindex)), +} + +const UNSAFE_BLOCKS: u32 = 100; +const DAILY_BLOCK_TARGET: usize = 144; +const SNAPSHOT_BLOCK_RANGE: usize = DAILY_BLOCK_TARGET * 10; + +#[derive(Debug, Default)] +pub struct Indexer { + // +} + +impl Indexer { + pub fn index(indexes_dir: &Path, bitcoin_dir: &Path, rpc: rpc::Client, exit: Exit) -> color_eyre::Result<()> { + let check_collisions = true; + + let mut vecs = Vecs::import(&indexes_dir.join("vecs"))?; + + let open_stores = || Stores::open(&indexes_dir.join("stores")); + let stores = open_stores()?; + + let mut height = vecs + .min_height() + .unwrap_or_default() + .min(stores.min_height()) + .and_then(|h| h.checked_sub(UNSAFE_BLOCKS)) + .map(Height::from) + .unwrap_or_default(); + // let mut height = Height::default(); + + let mut txindex_global = vecs.height_to_first_txindex.get_or_default(height)?; + let mut txoutindex_global = vecs.height_to_first_txoutindex.get_or_default(height)?; + let mut addressindex_global = vecs.height_to_first_addressindex.get_or_default(height)?; + let mut emptyindex_global = vecs.height_to_first_emptyindex.get_or_default(height)?; + let mut multisigindex_global = vecs.height_to_first_emptyindex.get_or_default(height)?; + let mut opreturnindex_global = vecs.height_to_first_emptyindex.get_or_default(height)?; + let mut pushonlyindex_global = 
vecs.height_to_first_emptyindex.get_or_default(height)?; + let mut unknownindex_global = vecs.height_to_first_emptyindex.get_or_default(height)?; + let mut p2pk33index_global = vecs.height_to_p2pk33index.get_or_default(height)?; + let mut p2pk65index_global = vecs.height_to_p2pk65index.get_or_default(height)?; + let mut p2pkhindex_global = vecs.height_to_p2pkhindex.get_or_default(height)?; + let mut p2shindex_global = vecs.height_to_p2shindex.get_or_default(height)?; + let mut p2trindex_global = vecs.height_to_p2trindex.get_or_default(height)?; + let mut p2wpkhindex_global = vecs.height_to_p2wpkhindex.get_or_default(height)?; + let mut p2wshindex_global = vecs.height_to_p2wshindex.get_or_default(height)?; + + let export = |stores: Stores, vecs: &mut Vecs, height: Height| -> color_eyre::Result<()> { + println!("Exporting..."); + + exit.block(); + // At 401760 + // Memory: 1.87 GB + // Real Memory: 13.46 GB + // if height >= Height::from(400_000_u32) { + // pause(); + // } + println!("Flushing vecs..."); + + thread::scope(|scope| -> color_eyre::Result<()> { + let vecs_handle = scope.spawn(|| vecs.flush(height)); + let stores_handle = scope.spawn(|| stores.export(height)); + vecs_handle.join().unwrap()?; + stores_handle.join().unwrap()?; + Ok(()) + })?; + + // At 401760 + // Memory: 1.83 GB + // Real Memory: 9.45 GB + // if height >= Height::from(400_000_u32) { + // pause(); + // } + + // At: 401760 + // Memory: 1.34 GB + // Real Memory: 1.52 GB + println!("All done..."); + // if height >= Height::from(400_000_u32) { + // pause(); + // } + exit.unblock(); + Ok(()) + }; + + let mut stores_opt = Some(stores); + + biter::new(bitcoin_dir, Some(height.into()), None, rpc) + .iter() + .try_for_each(|(_height, block, blockhash)| -> color_eyre::Result<()> { + println!("Processing block {_height}..."); + + height = Height::from(_height); + let timestamp = Timestamp::try_from(block.header.time)?; + + let mut stores = stores_opt.take().context("option should have wtx")?; + + if 
let Some(saved_blockhash) = vecs.height_to_blockhash.get(height)? { + if &blockhash != saved_blockhash.as_ref() { + todo!("Rollback not implemented"); + // parts.rollback_from(&mut wtx, height, &exit)?; + } + } + + let blockhash_prefix = BlockHashPrefix::try_from(&blockhash)?; + + if stores + .blockhash_prefix_to_height + .get(&blockhash_prefix)? + .is_some_and(|prev_height| *prev_height != height) + { + dbg!(blockhash); + return Err(eyre!("Collision, expect prefix to need be set yet")); + } + + stores + .blockhash_prefix_to_height + .insert_if_needed(blockhash_prefix, height, height); + + vecs.height_to_blockhash.push_if_needed(height, blockhash)?; + vecs.height_to_timestamp.push_if_needed(height, timestamp)?; + vecs.height_to_size.push_if_needed(height, block.total_size())?; + vecs.height_to_weight.push_if_needed(height, block.weight())?; + vecs.height_to_first_txindex.push_if_needed(height, txindex_global)?; + vecs.height_to_first_txoutindex + .push_if_needed(height, txoutindex_global)?; + vecs.height_to_first_addressindex + .push_if_needed(height, addressindex_global)?; + vecs.height_to_first_emptyindex + .push_if_needed(height, emptyindex_global)?; + vecs.height_to_first_multisigindex + .push_if_needed(height, multisigindex_global)?; + vecs.height_to_first_opreturnindex + .push_if_needed(height, opreturnindex_global)?; + vecs.height_to_first_pushonlyindex + .push_if_needed(height, pushonlyindex_global)?; + vecs.height_to_first_unknownindex + .push_if_needed(height, unknownindex_global)?; + vecs.height_to_p2pk33index.push_if_needed(height, p2pk33index_global)?; + vecs.height_to_p2pk65index.push_if_needed(height, p2pk65index_global)?; + vecs.height_to_p2pkhindex.push_if_needed(height, p2pkhindex_global)?; + vecs.height_to_p2shindex.push_if_needed(height, p2shindex_global)?; + vecs.height_to_p2trindex.push_if_needed(height, p2trindex_global)?; + vecs.height_to_p2wpkhindex.push_if_needed(height, p2wpkhindex_global)?; + 
vecs.height_to_p2wshindex.push_if_needed(height, p2wshindex_global)?; + + let outputs = block + .txdata + .iter() + .enumerate() + .flat_map(|(index, tx)| { + tx.output + .iter() + .enumerate() + .map(move |(vout, txout)| (Txindex::from(index), Vout::from(vout), txout, tx)) + }) + .collect::>(); + + let tx_len = block.txdata.len(); + let outputs_len = outputs.len(); + + let ( + txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle, + txin_or_addressindextxoutindex_vec_handle, + txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle, + ) = thread::scope(|scope| { + let txid_prefix_to_txid_and_block_txindex_and_prev_txindex_handle = + scope.spawn(|| -> color_eyre::Result<_> { + block + .txdata + .par_iter() + .enumerate() + .map(|(index, tx)| -> color_eyre::Result<_> { + let txid = tx.compute_txid(); + + let txid_prefix = TxidPrefix::try_from(&txid)?; + + let prev_txindex_slice_opt = + if check_collisions && stores.txid_prefix_to_txindex.needs(height) { + // Should only find collisions for two txids (duplicates), see below + stores.txid_prefix_to_txindex.get(&txid_prefix)?.cloned() + } else { + None + }; + + Ok((txid_prefix, (tx, txid, Txindex::from(index), prev_txindex_slice_opt))) + }) + .try_fold(BTreeMap::new, |mut map, tuple| { + let (key, value) = tuple?; + map.insert(key, value); + Ok(map) + }) + .try_reduce(BTreeMap::new, |mut map, mut map2| { + if map.len() > map2.len() { + map.append(&mut map2); + Ok(map) + } else { + map2.append(&mut map); + Ok(map2) + } + }) + }); + + let txin_or_addressindextxoutindex_vec_handle = + scope.spawn(|| -> color_eyre::Result> { + block + .txdata + .par_iter() + .filter(|tx| !tx.is_coinbase()) + .flat_map(|tx| &tx.input) + .map(|txin| -> color_eyre::Result<_> { + let outpoint = txin.previous_output; + let txid = outpoint.txid; + let vout = Vout::from(outpoint.vout); + + let txindex = if let Some(txindex) = stores + .txid_prefix_to_txindex + .get(&TxidPrefix::try_from(&txid)?)? 
+ .and_then(|txindex| { + // Checking if not finding txindex from the future + (txindex < &txindex_global).then_some(txindex) + }) { + *txindex + } else { + return Ok(TxInOrAddressindextoutindex::TxIn(txin)); + }; + + let txoutindex = *vecs + .txindex_to_first_txoutindex + .get(txindex)? + .context("Expect txoutindex to not be none") + .inspect_err(|_| { + dbg!(outpoint.txid, txindex, vout); + })? + + vout; + + let addressindex = *vecs + .txoutindex_to_addressindex + .get(txoutindex)? + .context("Expect addressindex to not be none") + .inspect_err(|_| { + // let height = vecdisks.txindex_to_height.get(txindex.into()).expect("txindex_to_height get not fail") + // .expect("Expect height for txindex"); + dbg!(outpoint.txid, txindex, vout, txoutindex); + })?; + + Ok(TxInOrAddressindextoutindex::AddressTxTxoutIndexes(( + addressindex, + txindex, + txoutindex, + ))) + }) + .try_fold(Vec::new, |mut vec, res| { + vec.push(res?); + Ok(vec) + }) + .try_reduce(Vec::new, |mut v, mut v2| { + if v.len() > v2.len() { + v.append(&mut v2); + Ok(v) + } else { + v2.append(&mut v); + Ok(v2) + } + }) + }); + + let txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle = scope.spawn(|| { + outputs + .into_par_iter() + .enumerate() + .map( + #[allow(clippy::type_complexity)] + |(block_txoutindex, (block_txindex, vout, txout, tx))| -> color_eyre::Result<( + Txoutindex, + ( + &TxOut, + Txindex, + Vout, + Addresstype, + color_eyre::Result, + Option, + &Transaction, + ), + )> { + let txindex = txindex_global + block_txindex; + let txoutindex = txoutindex_global + Txoutindex::from(block_txoutindex); + + let script = &txout.script_pubkey; + + let addresstype = Addresstype::from(script); + + let addressbytes_res = + Addressbytes::try_from((script, addresstype)).inspect_err(|_| { + // dbg!(&txout, height, txi, &tx.compute_txid()); + }); + + let addressindex_opt = addressbytes_res.as_ref().ok().and_then(|addressbytes| { + stores + .addressbytes_prefix_to_addressindex + 
.get(&AddressbytesPrefix::from((addressbytes, addresstype))) + .unwrap() + .cloned() + // Checking if not in the future + .and_then(|addressindex_local| { + (addressindex_local < addressindex_global).then_some(addressindex_local) + }) + }); // OK + + if let Some(Some(addressindex)) = check_collisions.then_some(addressindex_opt) { + let addressbytes = addressbytes_res.as_ref().unwrap(); + + let prev_addresstype = *vecs + .addressindex_to_addresstype + .get(addressindex)? + .context("Expect to have address type")?; + + let addresstypeindex = *vecs + .addressindex_to_addresstypeindex + .get(addressindex)? + .context("Expect to have address type index")?; + // Good first time + // Wrong after rerun + + let prev_addressbytes_opt = + vecs.get_addressbytes(prev_addresstype, addresstypeindex)?; + + let prev_addressbytes = + prev_addressbytes_opt.as_ref().context("Expect to have addressbytes")?; + + if (vecs.addressindex_to_addresstype.hasnt(addressindex) + && addresstype != prev_addresstype) + || (stores.addressbytes_prefix_to_addressindex.needs(height) + && prev_addressbytes != addressbytes) + { + let txid = tx.compute_txid(); + dbg!( + _height, + txid, + vout, + block_txindex, + addresstype, + prev_addresstype, + prev_addressbytes, + addressbytes, + addressindex_global, + addressindex, + addresstypeindex, + txout, + AddressbytesPrefix::from((addressbytes, addresstype)), + AddressbytesPrefix::from((prev_addressbytes, prev_addresstype)) + ); + panic!() + } + } + + Ok(( + txoutindex, + ( + txout, + txindex, + vout, + addresstype, + addressbytes_res, + addressindex_opt, + tx, + ), + )) + }, + ) + .try_fold(BTreeMap::new, |mut map, tuple| -> color_eyre::Result<_> { + let (key, value) = tuple?; + map.insert(key, value); + Ok(map) + }) + .try_reduce(BTreeMap::new, |mut map, mut map2| { + if map.len() > map2.len() { + map.append(&mut map2); + Ok(map) + } else { + map2.append(&mut map); + Ok(map2) + } + }) + }); + + ( + 
txid_prefix_to_txid_and_block_txindex_and_prev_txindex_handle.join(), + txin_or_addressindextxoutindex_vec_handle.join(), + txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle.join(), + ) + }); + + let txid_prefix_to_txid_and_block_txindex_and_prev_txindex = + txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle + .ok() + .context( + "Expect txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle to join", + )??; + + let txin_or_addressindextxoutindex_vec = txin_or_addressindextxoutindex_vec_handle + .ok() + .context("Export txin_or_addressindextxoutindex_vec_handle to join")??; + + let txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt = + txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle + .ok() + .context( + "Expect txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle to join", + )??; + + let mut new_txindexvout_to_addressindextxoutindex: BTreeMap< + (Txindex, Vout), + (Addressindex, Txoutindex), + > = BTreeMap::new(); + + let mut already_added_addressbytes_prefix: BTreeMap = BTreeMap::new(); + + txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt + .into_iter() + .try_for_each( + |( + txoutindex, + (txout, txindex, vout, addresstype, addressbytes_res, addressindex_opt, _tx), + )| + -> color_eyre::Result<()> { + let amount = Amount::from(txout.value); + + if vout.is_zero() { + vecs.txindex_to_first_txoutindex.push_if_needed(txindex, txoutindex)?; + } + + vecs.txoutindex_to_amount.push_if_needed(txoutindex, amount)?; + + let mut addressindex = addressindex_global; + + let mut addressbytes_prefix = None; + + if let Some(addressindex_local) = addressindex_opt.or_else(|| { + addressbytes_res.as_ref().ok().and_then(|addressbytes| { + // Check if address was first seen before in this iterator + // Example: 
https://mempool.space/address/046a0765b5865641ce08dd39690aade26dfbf5511430ca428a3089261361cef170e3929a68aee3d8d4848b0c5111b0a37b82b86ad559fd2a745b44d8e8d9dfdc0c + addressbytes_prefix.replace(AddressbytesPrefix::from((addressbytes, addresstype))); + already_added_addressbytes_prefix + .get(addressbytes_prefix.as_ref().unwrap()) + .cloned() + }) + }) { + addressindex = addressindex_local; + } else { + addressindex_global.increment(); + + let addresstypeindex = match addresstype { + Addresstype::Empty => emptyindex_global.clone_then_increment(), + Addresstype::Multisig => multisigindex_global.clone_then_increment(), + Addresstype::OpReturn => opreturnindex_global.clone_then_increment(), + Addresstype::PushOnly => pushonlyindex_global.clone_then_increment(), + Addresstype::Unknown => unknownindex_global.clone_then_increment(), + Addresstype::P2PK65 => p2pk65index_global.clone_then_increment(), + Addresstype::P2PK33 => p2pk33index_global.clone_then_increment(), + Addresstype::P2PKH => p2pkhindex_global.clone_then_increment(), + Addresstype::P2SH => p2shindex_global.clone_then_increment(), + Addresstype::P2WPKH => p2wpkhindex_global.clone_then_increment(), + Addresstype::P2WSH => p2wshindex_global.clone_then_increment(), + Addresstype::P2TR => p2trindex_global.clone_then_increment(), + }; + + vecs.addressindex_to_addresstype + .push_if_needed(addressindex, addresstype)?; + + vecs.addressindex_to_addresstypeindex + .push_if_needed(addressindex, addresstypeindex)?; + + vecs.addressindex_to_height + .push_if_needed(addressindex, height)?; + + if let Ok(addressbytes) = addressbytes_res { + let addressbytes_prefix = addressbytes_prefix.unwrap(); + + already_added_addressbytes_prefix + .insert(addressbytes_prefix.clone(), addressindex); + + stores.addressbytes_prefix_to_addressindex.insert_if_needed( + addressbytes_prefix, + addressindex, + height, + ); + + vecs.push_addressbytes_if_needed(addresstypeindex, addressbytes)?; + } + } + + new_txindexvout_to_addressindextxoutindex 
+ .insert((txindex, vout), (addressindex, txoutindex)); + + vecs.txoutindex_to_addressindex + .push_if_needed(txoutindex, addressindex)?; + + stores + .addressindex_to_txoutindex_in + .insert_if_needed(addressindex, txoutindex, height); + + Ok(()) + }, + )?; + + drop(already_added_addressbytes_prefix); + + txin_or_addressindextxoutindex_vec + .into_iter() + .map( + |txin_or_addressindextxoutindex| -> color_eyre::Result<(Addressindex, Txindex, Txoutindex)> { + match txin_or_addressindextxoutindex { + TxInOrAddressindextoutindex::AddressTxTxoutIndexes(triplet) => Ok(triplet), + TxInOrAddressindextoutindex::TxIn(txin) => { + let outpoint = txin.previous_output; + let txid = outpoint.txid; + let vout = Vout::from(outpoint.vout); + let index = txid_prefix_to_txid_and_block_txindex_and_prev_txindex + .get(&TxidPrefix::try_from(&txid)?) + .context("txid should be in same block")? + .2; + let txindex = txindex_global + index; + + let (addressindex, txoutindex) = new_txindexvout_to_addressindextxoutindex + .remove(&(txindex, vout)) + .context("should have found addressindex from same block") + .inspect_err(|_| { + dbg!(&new_txindexvout_to_addressindextxoutindex, txin, txindex, vout, txid); + })?; + + Ok((addressindex, txindex, txoutindex)) + } + } + }, + ) + .try_for_each(|res| -> color_eyre::Result<()> { + let (addressindex, txindex, txoutindex) = res?; + stores + .addressindex_to_txoutindex_out + .insert_if_needed(addressindex, txoutindex, height); + stores + .txindex_to_txoutindex_in + .insert_if_needed(txindex, txoutindex, height); + Ok(()) + })?; + + drop(new_txindexvout_to_addressindextxoutindex); + + let mut txindex_to_tx_and_txid: BTreeMap = BTreeMap::default(); + + txid_prefix_to_txid_and_block_txindex_and_prev_txindex + .into_iter() + .try_for_each( + |(txid_prefix, (tx, txid, index, prev_txindex_opt))| -> color_eyre::Result<()> { + let txindex = txindex_global + index; + + txindex_to_tx_and_txid.insert(txindex, (tx, txid)); + + match prev_txindex_opt { + None => 
{ + stores + .txid_prefix_to_txindex + .insert_if_needed(txid_prefix, txindex, height); + } + Some(prev_txindex) => { + // In case if we start at an already parsed height + if txindex == prev_txindex { + return Ok(()); + } + + let len = vecs.txindex_to_txid.len(); + // Ok if `get` is not par as should happen only twice + let prev_txid = vecs + .txindex_to_txid + .get(prev_txindex)? + .context("To have txid for txindex") + .inspect_err(|_| { + dbg!(txindex, txid, len); + })?; + + // #[allow(clippy::redundant_locals)] + // let prev_txid = prev_txid; + let prev_txid = prev_txid.as_ref(); + + // If another Txid needs to be added to the list + // We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated + let only_known_dup_txids = [ + Txid::from_str( + "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599", + )?, + Txid::from_str( + "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468", + )?, + ]; + + let is_dup = only_known_dup_txids.contains(prev_txid); + + if !is_dup { + let prev_height = + vecs.txindex_to_height.get(prev_txindex)?.expect("To have height"); + dbg!(height, txid, txindex, prev_height, prev_txid, prev_txindex); + return Err(eyre!("Expect none")); + } + } + } + + Ok(()) + }, + )?; + + txindex_to_tx_and_txid + .into_iter() + .try_for_each(|(txindex, (tx, txid))| -> color_eyre::Result<()> { + vecs.txindex_to_txversion.push_if_needed(txindex, tx.version)?; + vecs.txindex_to_txid.push_if_needed(txindex, txid)?; + vecs.txindex_to_height.push_if_needed(txindex, height)?; + vecs.txindex_to_locktime.push_if_needed(txindex, tx.lock_time)?; + Ok(()) + })?; + + txindex_global += Txindex::from(tx_len); + txoutindex_global += Txoutindex::from(outputs_len); + + let should_snapshot = _height != 0 && _height % SNAPSHOT_BLOCK_RANGE == 0 && !exit.active(); + if should_snapshot { + export(stores, &mut vecs, height)?; + stores_opt.replace(open_stores()?); + } else { + stores_opt.replace(stores); + } + + Ok(()) + 
})?; + + export(stores_opt.take().context("option should have wtx")?, &mut vecs, height)?; + + Ok(()) + } +} + +fn pause() { + let mut stdin = std::io::stdin(); + let mut stdout = std::io::stdout(); + write!(stdout, "Press any key to continue...").unwrap(); + stdout.flush().unwrap(); + let _ = stdin.read(&mut [0u8]).unwrap(); +} diff --git a/bindex/src/main.rs b/bindex/src/main.rs index 30f1e1de6..bd571c95b 100644 --- a/bindex/src/main.rs +++ b/bindex/src/main.rs @@ -1,591 +1,32 @@ -use std::{ - collections::BTreeMap, - io::{Read, Write}, - path::Path, - str::FromStr, - thread::{self}, -}; +use std::path::Path; -use biter::{ - bitcoin::{Transaction, TxIn, TxOut, Txid}, - bitcoincore_rpc::{Auth, Client}, -}; +use biter::rpc; +mod indexer; +mod storage; mod structs; -use color_eyre::eyre::{eyre, ContextCompat}; -use rayon::prelude::*; -use structs::{ - Addressbytes, AddressbytesPrefix, Addressindex, Addressindextxoutindex, Addresstype, Addresstypeindex, Amount, - BlockHashPrefix, Date, Exit, Height, Stores, Timestamp, TxidPrefix, Txindex, Txindexvout, Txoutindex, Vecs, -}; +use exit::Exit; +use indexer::Indexer; +use structs::{AddressbytesPrefix, Addressindex, BlockHashPrefix, Height, TxidPrefix, Txindex, Txoutindex}; // https://github.com/romanz/electrs/blob/master/doc/schema.md -#[derive(Debug)] -enum TxInOrAddressindextoutindex<'a> { - TxIn(&'a TxIn), - Addressindextoutindex(Addressindextxoutindex), -} - -const UNSAFE_BLOCKS: u32 = 100; -const DAILY_BLOCK_TARGET: usize = 144; -const SNAPSHOT_BLOCK_RANGE: usize = DAILY_BLOCK_TARGET * 10; - fn main() -> color_eyre::Result<()> { color_eyre::install()?; - let i = std::time::Instant::now(); - - let check_collisions = true; - let data_dir = Path::new("../../bitcoin"); - let cookie = Path::new(data_dir).join(".cookie"); - let rpc = Client::new("http://localhost:8332", Auth::CookieFile(cookie))?; - + let rpc = rpc::Client::new( + "http://localhost:8332", + rpc::Auth::CookieFile(Path::new(data_dir).join(".cookie")), + )?; 
let exit = Exit::new(); - let path_database = Path::new("./database"); - let path_stores = path_database.join("stores"); + let i = std::time::Instant::now(); - let stores = Stores::open(&path_stores)?; - - let mut vecs = Vecs::import(&path_database.join("vecs"))?; - - let mut height = vecs - .min_height() - .unwrap_or_default() - .min(stores.min_height()) - .and_then(|h| h.checked_sub(UNSAFE_BLOCKS)) - .map(Height::from) - .unwrap_or_default(); - - let mut txindex = vecs - .height_to_first_txindex - .get(height)? - .map(|v| *v) - .unwrap_or(Txindex::default()); - - let mut txoutindex = vecs - .height_to_first_txoutindex - .get(height)? - .map(|v| *v) - .unwrap_or(Txoutindex::default()); - - let mut addressindex = vecs - .height_to_first_addressindex - .get(height)? - .map(|v| *v) - .unwrap_or(Addressindex::default()); - - let export = |stores: Stores, vecs: &mut Vecs, height: Height| -> color_eyre::Result<()> { - exit.block(); - println!("Exporting..."); - // Memory: 3.76 GB - // Real Memory: 22.47 GB - // Private Memory: 12.44 GB - // if height > Height::from(400_000_u32) { - // pause(); - // } - // vecs.reset_cache(); - // At: 403200 - // Memory: 3.78 GB - // Real Memory: 12.65 GB - // Private Memory: 11.39 GB - // if height > Height::from(400_000_u32) { - // pause(); - // } - vecs.flush(height)?; - // At: 403200 - // Memory: 3.79 GB - // Real Memory: 12.37 GB - // Private Memory: 10.95 GB - // Gone up wtf - // if height > Height::from(400_000_u32) { - // pause(); - // } - stores.export(height); - println!("Export done"); - // At: 403200 - // Memory: 2.23 GB - // Real Memory: 1.05 GB - // Private Memory: 0.109 GB - // if height > Height::from(400_000_u32) { - // pause(); - // } - exit.unblock(); - Ok(()) - }; - - let mut stores_opt = Some(stores); - - biter::new(data_dir, Some(height.into()), Some(400_000), rpc) - .iter() - .try_for_each(|(_height, block, blockhash)| -> color_eyre::Result<()> { - println!("Processing block {_height}..."); - - height = 
Height::from(_height); - let timestamp = Timestamp::try_from(block.header.time)?; - let date = Date::from(×tamp); - - let mut stores = stores_opt.take().context("option should have wtx")?; - - if let Some(saved_blockhash) = vecs.height_to_blockhash.get(height)? { - // if &blockhash != saved_blockhash { - if &blockhash != saved_blockhash.as_ref() { - todo!("Rollback not implemented"); - // parts.rollback_from(&mut wtx, height, &exit)?; - } - } - - if stores.blockhash_prefix_to_height.needs(height) { - let blockhash_prefix = BlockHashPrefix::try_from(&blockhash)?; - - if check_collisions { - if let Some(prev_height) = - stores.blockhash_prefix_to_height.get(&blockhash_prefix) - { - dbg!(blockhash, prev_height); - return Err(eyre!("Collision, expect prefix to need be set yet")); - } - } - - stores.blockhash_prefix_to_height.insert(blockhash_prefix,height); - } - - vecs.height_to_blockhash.push_if_needed(height, blockhash)?; - vecs.height_to_first_txindex.push_if_needed(height, txindex)?; - vecs.height_to_first_txoutindex.push_if_needed(height, txoutindex)?; - vecs.height_to_first_addressindex.push_if_needed(height, addressindex)?; - vecs.height_to_timestamp.push_if_needed(height, timestamp)?; - vecs.height_to_date.push_if_needed(height, date)?; - - let outputs = block - .txdata - .iter() - .enumerate() - .flat_map(|(index, tx)| { - tx.output - .iter() - .enumerate() - .map(move |(vout, txout)| (Txindex::from(index), vout as u32, txout, tx)) - }).collect::>(); - - let tx_len = block.txdata.len(); - let outputs_len = outputs.len(); - - let (txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle, txin_or_addressindextxoutindex_vec_handle, txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle) = thread::scope(|scope| { - let txid_prefix_to_txid_and_block_txindex_and_prev_txindex_handle = scope.spawn(|| -> color_eyre::Result<_> { - block - .txdata - .par_iter() - .enumerate() - .map(|(index, tx)| -> color_eyre::Result<_> { - let txid = 
tx.compute_txid(); - - let txid_prefix = TxidPrefix::try_from(&txid)?; - - let prev_txindex_slice_opt = if check_collisions && stores.txid_prefix_to_txindex.needs(height) { - // Should only find collisions for two txids (duplicates), see below - stores.txid_prefix_to_txindex.get(&txid_prefix).cloned() - } else { - None - }; - - Ok((txid_prefix, (tx, txid, Txindex::from(index), prev_txindex_slice_opt))) - }) - .try_fold( - BTreeMap::new, - |mut map, tuple| { - let (key, value) = tuple?; - map.insert(key, value); - Ok(map) - }, - ) - .try_reduce(BTreeMap::new, |mut map, mut map2| { - if map.len() > map2.len() { - map.append(&mut map2); - Ok(map) - } else { - map2.append(&mut map); - Ok(map2) - } - })}); - - let txin_or_addressindextxoutindex_vec_handle = scope.spawn(|| -> color_eyre::Result> { - block - .txdata - .par_iter() - .filter(|tx| !tx.is_coinbase()) - .flat_map(|tx| &tx.input) - .map(|txin| -> color_eyre::Result<_> { - let outpoint = txin.previous_output; - let txid = outpoint.txid; - let vout = outpoint.vout; - - let txindex_local = if let Some(txindex_local) = stores.txid_prefix_to_txindex - .get(&TxidPrefix::try_from(&txid)?).and_then(|txindex_local| { - // Checking if not finding txindex from the future - (txindex_local < &txindex).then_some(txindex_local) - }) - { - *txindex_local - } else { - return Ok(TxInOrAddressindextoutindex::TxIn(txin)); - }; - - let txindexvout = Txindexvout::from((txindex_local, vout)); - - let txoutindex = - *stores.txindexvout_to_txoutindex.get(&txindexvout) - .context("Expect txoutindex to not be none") - .inspect_err(|_| { - dbg!(outpoint.txid, txindex_local, vout, txindexvout); - })?; - - let addressindex = *vecs.txoutindex_to_addressindex.get(txoutindex)? 
- .context("Expect addressindex to not be none") - .inspect_err(|_| { - // let height = vecdisks.txindex_to_height.get(txindex.into()).expect("txindex_to_height get not fail") - // .expect("Expect height for txindex"); - dbg!(outpoint.txid, txindex_local, vout, txindexvout); - })?; - - Ok(TxInOrAddressindextoutindex::Addressindextoutindex(Addressindextxoutindex::from(( - addressindex, - txoutindex, - )))) - }) - .try_fold( - Vec::new, - |mut vec, addressindextxoutindex| { - vec.push(addressindextxoutindex?); - Ok(vec) - }, - ) - .try_reduce(Vec::new, |mut v, mut v2| { - if v.len() > v2.len() { - v.append(&mut v2); - Ok(v) - } else { - v2.append(&mut v); - Ok(v2) - } - }) - }); - - let txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle = scope.spawn(|| { - outputs.into_par_iter().enumerate() - .map( - #[allow(clippy::type_complexity)] - |(block_txoutindex, (block_txindex, vout, txout, tx))| -> color_eyre::Result<(Txoutindex, - (&TxOut, Txindexvout, Addresstype, color_eyre::Result, Option))> { - let txindex_local = txindex + block_txindex; - let txindexvout = Txindexvout::from((txindex_local, vout)); - let txoutindex_local = txoutindex + Txoutindex::from(block_txoutindex); - - let script = &txout.script_pubkey; - - let addresstype = Addresstype::from(script); - - let addressbytes_res = Addressbytes::try_from((script, addresstype)).inspect_err(|_| { - // dbg!(&txout, height, txi, &tx.compute_txid()); - }); - - let addressindex_opt = addressbytes_res.as_ref().ok().and_then(|addressbytes| { - stores.addressbytes_prefix_to_addressindex.get( - &AddressbytesPrefix::from((addressbytes, addresstype)), - ) - .cloned() - // Checking if not in the future - .and_then(|addressindex_local| (addressindex_local < addressindex) - .then_some(addressindex_local)) - }); // OK - - if let Some(Some(addressindex_local)) = check_collisions.then_some(addressindex_opt) { - let addressbytes = addressbytes_res.as_ref().unwrap(); - - let prev_addresstype = 
*vecs.addressindex_to_addresstype.get( - addressindex_local, - )?.context("Expect to have address type")?; - - let addresstypeindex = *vecs.addressindex_to_addresstypeindex.get( - addressindex_local, - )?.context("Expect to have address type index")?; - // Good first time - // Wrong after rerun - - let prev_addressbytes_opt= vecs.get_addressbytes(prev_addresstype, addresstypeindex)?; - - let prev_addressbytes = prev_addressbytes_opt.as_ref().context("Expect to have addressbytes")?; - - if (vecs.addressindex_to_addresstype.hasnt(addressindex_local) && addresstype != prev_addresstype) || (stores.addressbytes_prefix_to_addressindex.needs(height) && prev_addressbytes != addressbytes) { - let txid = tx.compute_txid(); - dbg!(_height, txid, vout, block_txindex, addresstype, prev_addresstype, prev_addressbytes, addressbytes, addressindex, addressindex_local, addresstypeindex, txout, AddressbytesPrefix::from((addressbytes, addresstype)), AddressbytesPrefix::from((prev_addressbytes, prev_addresstype))); - panic!() - } - } - - Ok(( - txoutindex_local, - (txout, txindexvout, addresstype, addressbytes_res, addressindex_opt), - )) - }, - ) - .try_fold( - BTreeMap::new, - |mut map, tuple| -> color_eyre::Result<_> { - let (key, value) = tuple?; - map.insert(key, value); - Ok(map) - }, - ) - .try_reduce(BTreeMap::new, |mut map, mut map2| { - if map.len() > map2.len() { - map.append(&mut map2); - Ok(map) - } else { - map2.append(&mut map); - Ok(map2) - } - }) - }); - - (txid_prefix_to_txid_and_block_txindex_and_prev_txindex_handle.join(), txin_or_addressindextxoutindex_vec_handle.join(), txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle.join()) - }); - - let txid_prefix_to_txid_and_block_txindex_and_prev_txindex = txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle.ok().context("Expect txid_prefix_to_txid_and_block_txindex_and_prev_txindex_join_handle to join")??; - - let txin_or_addressindextxoutindex_vec = 
txin_or_addressindextxoutindex_vec_handle.ok().context("Export txin_or_addressindextxoutindex_vec_handle to join")??; - - let txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt = txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle.ok().context("Expect txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle to join")??; - - let mut new_txindexvout_to_addressindextxoutindex: BTreeMap = BTreeMap::new(); - - let mut already_added_addressbytes_prefix: BTreeMap = BTreeMap::new(); - - txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt - .into_iter() - .try_for_each(|(txoutindex, (txout, txindexvout, addresstype, addressbytes_res, addressindex_opt))| -> color_eyre::Result<()> { - let amount = Amount::from(txout.value); - - stores.txindexvout_to_txoutindex.insert_if_needed( - txindexvout, - txoutindex, - height, - ); - - vecs.txoutindex_to_amount.push_if_needed( - txoutindex, - amount, - )?; - - let mut addressindex_local = addressindex; - - let mut addressbytes_prefix = None; - - if let Some(addressindex) = addressindex_opt.or_else(|| addressbytes_res.as_ref().ok().and_then(|addressbytes| { - // Check if address was first seen before in this iterator - // Example: https://mempool.space/address/046a0765b5865641ce08dd39690aade26dfbf5511430ca428a3089261361cef170e3929a68aee3d8d4848b0c5111b0a37b82b86ad559fd2a745b44d8e8d9dfdc0c - addressbytes_prefix.replace(AddressbytesPrefix::from((addressbytes, addresstype))); - already_added_addressbytes_prefix.get( - addressbytes_prefix.as_ref().unwrap(), - ).cloned() - })) { - addressindex_local = addressindex; - } else { - addressindex.increment(); - - // TODO: Create counter of other addresstypes instead - let addresstypeindex = Addresstypeindex::from(vecs.addresstype_to_addressbytes(addresstype).map_or(0, |vec| vec.len())); - - vecs.addressindex_to_addresstype.push_if_needed(addressindex_local, addresstype)?; - - 
vecs.addressindex_to_addresstypeindex.push_if_needed(addressindex_local, addresstypeindex)?; - - if let Ok(addressbytes) = addressbytes_res { - let addressbytes_prefix = addressbytes_prefix.unwrap(); - - // if addressindex_local == Addressindex::from(257905_u32) || addressbytes_prefix == AddressbytesPrefix::from( - // [ - // 116_u8, - // 86, - // 96, - // 52, - // 2, - // 87, - // 151, - // 177, - // ], - // ) { - // dbg!(addressindex_local, addressbytes, addressbytes_prefix, addresstypeindex); - // panic!(); - // } - - already_added_addressbytes_prefix.insert(addressbytes_prefix.clone(), addressindex_local); - - stores.addressbytes_prefix_to_addressindex.insert_if_needed( - addressbytes_prefix, - addressindex_local, - height - ); - - vecs.push_addressbytes_if_needed(addresstypeindex, addressbytes)?; - } - } - - let addressindextxoutindex = Addressindextxoutindex::from((addressindex_local, txoutindex)); - - new_txindexvout_to_addressindextxoutindex.insert(txindexvout, addressindextxoutindex); - - vecs.txoutindex_to_addressindex.push_if_needed( - txoutindex, - addressindex_local, - )?; - - stores.addressindextxoutindex_in.insert_if_needed( - addressindextxoutindex, - (), - height, - ); - - Ok(()) - })?; - - drop(already_added_addressbytes_prefix); - - if stores.addressindextxoutindex_out.needs(height) { - txin_or_addressindextxoutindex_vec - .into_iter() - .map(|txin_or_addressindextxoutindex| -> color_eyre::Result { - match txin_or_addressindextxoutindex { - TxInOrAddressindextoutindex::Addressindextoutindex(addressindextxoutindex) => Ok(addressindextxoutindex), - TxInOrAddressindextoutindex::TxIn(txin) => { - let outpoint = txin.previous_output; - let txid = outpoint.txid; - let vout = outpoint.vout; - let index = txid_prefix_to_txid_and_block_txindex_and_prev_txindex - .get(&TxidPrefix::try_from(&txid)?) 
- .context("txid should be in same block")?.2; - let txindex_local = txindex + index; - - let txindexvout = Txindexvout::from((txindex_local, vout)); - - new_txindexvout_to_addressindextxoutindex - .remove(&txindexvout) - .context("should have found addressindex from same block").inspect_err(|_| { - dbg!(&new_txindexvout_to_addressindextxoutindex, txin, txindexvout, txid); - }) - } - } - }) - .try_for_each(|addressindextxoutindex| -> color_eyre::Result<()> { - stores.addressindextxoutindex_out.insert( - addressindextxoutindex?, - (), - ); - Ok(()) - })?; - } - - drop(new_txindexvout_to_addressindextxoutindex); - - let mut txindex_to_tx_and_txid: BTreeMap = BTreeMap::default(); - - txid_prefix_to_txid_and_block_txindex_and_prev_txindex.into_iter().try_for_each( - |(txid_prefix, (tx, txid, index, prev_txindex_opt))| -> color_eyre::Result<()> { - let txindex_local = txindex + index; - - txindex_to_tx_and_txid.insert(txindex_local, (tx, txid)); - - match prev_txindex_opt { - None => { - stores.txid_prefix_to_txindex.insert_if_needed(txid_prefix, txindex_local, height); - }, - Some(prev_txindex) => { - // In case if we start at an already parsed height - if txindex_local == prev_txindex { - return Ok(()) - } - - let len = vecs.txindex_to_txid.len(); - // Ok if `get` is not par as should happen only twice - let prev_txid = - vecs.txindex_to_txid.get(prev_txindex)? 
- .context("To have txid for txindex").inspect_err(|_| { - dbg!(txindex_local, txid, len); - })?; - - // #[allow(clippy::redundant_locals)] - // let prev_txid = prev_txid; - let prev_txid = prev_txid.as_ref(); - - // If another Txid needs to be added to the list - // We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated - let only_known_dup_txids = [ - Txid::from_str("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")?, - Txid::from_str("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")?, - ]; - - let is_dup = only_known_dup_txids.contains(prev_txid); - - if !is_dup { - let prev_height = vecs.txindex_to_height.get(prev_txindex)?.expect("To have height"); - dbg!(height, txid, txindex_local, prev_height, prev_txid, prev_txindex); - return Err(eyre!("Expect none")); - } - } - } - - Ok(()) - }, - )?; - - txindex_to_tx_and_txid.into_iter().try_for_each(|(txindex, (tx, txid))| -> color_eyre::Result<()> { - vecs.txindex_to_txversion.push_if_needed(txindex, tx.version)?; - vecs.txindex_to_txid.push_if_needed(txindex, txid)?; - vecs.txindex_to_height.push_if_needed(txindex, height)?; - vecs.txindex_to_inputcount.push_if_needed(txindex, tx.input.len() as u32)?; - vecs.txindex_to_outputcount.push_if_needed(txindex, tx.output.len() as u32)?; - Ok(()) - })?; - - vecs.height_to_last_txindex.push_if_needed(height, txindex.decremented())?; - vecs.height_to_last_txoutindex.push_if_needed(height, txoutindex.decremented())?; - vecs.height_to_last_addressindex.push_if_needed(height, addressindex.decremented())?; - - let should_snapshot = _height % SNAPSHOT_BLOCK_RANGE == 0 && !exit.active(); - if should_snapshot { - export(stores, &mut vecs, height)?; - stores_opt.replace(Stores::open(&path_stores)?); - } else { - stores_opt.replace(stores); - } - - txindex += Txindex::from(tx_len); - txoutindex += Txoutindex::from(outputs_len); - - Ok(()) - })?; - - let stores = stores_opt.take().context("option should have 
wtx")?; - export(stores, &mut vecs, height)?; + Indexer::index(Path::new("indexes"), data_dir, rpc, exit)?; dbg!(i.elapsed()); - pause(); - Ok(()) } - -fn pause() { - let mut stdin = std::io::stdin(); - let mut stdout = std::io::stdout(); - - // We want the cursor to stay at the end of the line, so we print without a newline and flush manually. - write!(stdout, "Press any key to continue...").unwrap(); - stdout.flush().unwrap(); - - // Read a single byte and discard - let _ = stdin.read(&mut [0u8]).unwrap(); -} diff --git a/bindex/src/storage/mod.rs b/bindex/src/storage/mod.rs new file mode 100644 index 000000000..96983bd40 --- /dev/null +++ b/bindex/src/storage/mod.rs @@ -0,0 +1,5 @@ +mod stores; +mod vecs; + +pub use stores::*; +pub use vecs::*; diff --git a/bindex/src/storage/stores/meta.rs b/bindex/src/storage/stores/meta.rs new file mode 100644 index 000000000..b73ac3385 --- /dev/null +++ b/bindex/src/storage/stores/meta.rs @@ -0,0 +1,81 @@ +use std::{ + fs, io, + path::{Path, PathBuf}, +}; + +use snkrj::UnitDatabase; + +use super::{Height, Version}; + +pub struct StoreMeta { + pathbuf: PathBuf, + version: Version, + height: Option, + pub len: usize, +} + +impl StoreMeta { + pub fn checked_open(path: &Path, version: Version) -> Result { + fs::create_dir_all(path)?; + + let is_same_version = + Version::try_from(Self::path_version_(path).as_path()).is_ok_and(|prev_version| version == prev_version); + + if !is_same_version { + fs::remove_dir_all(path)?; + fs::create_dir(path)?; + } + + let this = Self { + pathbuf: path.to_owned(), + version, + height: Height::try_from(Self::path_height_(path).as_path()).ok(), + len: UnitDatabase::read_length_(path), + }; + + this.version.write(&this.path_version())?; + + Ok(this) + } + + #[allow(unused)] + pub fn len(&self) -> usize { + self.len + } + + pub fn export(mut self, height: Height) -> Result<(), io::Error> { + self.height = Some(height); + height.write(&self.path_height())?; + UnitDatabase::write_length_(&self.pathbuf, 
self.len) + } + + pub fn path_parts(&self) -> PathBuf { + Self::path_parts_(&self.pathbuf) + } + fn path_parts_(path: &Path) -> PathBuf { + path.join("parts") + } + + fn path_version(&self) -> PathBuf { + Self::path_version_(&self.pathbuf) + } + fn path_version_(path: &Path) -> PathBuf { + path.join("version") + } + + pub fn height(&self) -> Option<&Height> { + self.height.as_ref() + } + pub fn needs(&self, height: Height) -> bool { + self.height.is_none_or(|self_height| height > self_height) + } + pub fn has(&self, height: Height) -> bool { + !self.needs(height) + } + fn path_height(&self) -> PathBuf { + Self::path_height_(&self.pathbuf) + } + fn path_height_(path: &Path) -> PathBuf { + path.join("height") + } +} diff --git a/bindex/src/structs/stores.rs b/bindex/src/storage/stores/mod.rs similarity index 67% rename from bindex/src/structs/stores.rs rename to bindex/src/storage/stores/mod.rs index 37e360bf7..e9e500fa7 100644 --- a/bindex/src/structs/stores.rs +++ b/bindex/src/storage/stores/mod.rs @@ -1,33 +1,44 @@ use std::{path::Path, thread}; -use crate::structs::Height; - -use super::{ - AddressbytesPrefix, Addressindex, Addressindextxoutindex, BlockHashPrefix, Store, TxidPrefix, Txindex, Txindexvout, - Txoutindex, Version, +use crate::{ + structs::Version, AddressbytesPrefix, Addressindex, BlockHashPrefix, Height, TxidPrefix, Txindex, Txoutindex, }; +mod meta; +mod multi; +mod unique; + +use meta::*; +use multi::*; +use unique::*; + pub struct Stores { - pub addressbytes_prefix_to_addressindex: Store, - pub addressindextxoutindex_in: Store, - pub addressindextxoutindex_out: Store, - pub blockhash_prefix_to_height: Store, - pub txid_prefix_to_txindex: Store, - pub txindexvout_to_txoutindex: Store, + pub addressbytes_prefix_to_addressindex: StoreUnique, + pub addressindex_to_txoutindex_in: StoreMulti, // Received + pub addressindex_to_txoutindex_out: StoreMulti, // Spent + pub blockhash_prefix_to_height: StoreUnique, + pub txid_prefix_to_txindex: StoreUnique, + 
pub txindex_to_txoutindex_in: StoreMulti, // Inputs } impl Stores { pub fn open(path: &Path) -> color_eyre::Result { Ok(Self { - addressbytes_prefix_to_addressindex: Store::open( + addressbytes_prefix_to_addressindex: StoreUnique::open( &path.join("addressbytes_prefix_to_addressindex"), Version::from(1), )?, - addressindextxoutindex_in: Store::open(&path.join("addresstxoutindexes_in"), Version::from(1))?, - addressindextxoutindex_out: Store::open(&path.join("addresstxoutindexes_out"), Version::from(1))?, - blockhash_prefix_to_height: Store::open(&path.join("blockhash_prefix_to_height"), Version::from(1))?, - txid_prefix_to_txindex: Store::open(&path.join("txid_prefix_to_txindex"), Version::from(1))?, - txindexvout_to_txoutindex: Store::open(&path.join("txindexvout_to_txoutindex"), Version::from(1))?, + addressindex_to_txoutindex_in: StoreMulti::open( + &path.join("addressindex_to_txoutindex_in"), + Version::from(1), + )?, + addressindex_to_txoutindex_out: StoreMulti::open( + &path.join("addressindex_to_txoutindex_out"), + Version::from(1), + )?, + blockhash_prefix_to_height: StoreUnique::open(&path.join("blockhash_prefix_to_height"), Version::from(1))?, + txid_prefix_to_txindex: StoreUnique::open(&path.join("txid_prefix_to_txindex"), Version::from(1))?, + txindex_to_txoutindex_in: StoreMulti::open(&path.join("txindex_to_txoutindex_in"), Version::from(1))?, }) } @@ -153,11 +164,11 @@ impl Stores { pub fn min_height(&self) -> Option { [ self.addressbytes_prefix_to_addressindex.height(), - self.addressindextxoutindex_in.height(), - self.addressindextxoutindex_out.height(), + self.addressindex_to_txoutindex_in.height(), + self.addressindex_to_txoutindex_out.height(), self.blockhash_prefix_to_height.height(), self.txid_prefix_to_txindex.height(), - self.txindexvout_to_txoutindex.height(), + self.txindex_to_txoutindex_in.height(), ] .into_iter() .min() @@ -165,14 +176,18 @@ impl Stores { .cloned() } - pub fn export(self, height: Height) { + pub fn export(self, height: 
Height) -> Result<(), snkrj::Error> { thread::scope(|scope| { - scope.spawn(|| self.addressbytes_prefix_to_addressindex.export(height)); - scope.spawn(|| self.addressindextxoutindex_in.export(height)); - scope.spawn(|| self.addressindextxoutindex_out.export(height)); - scope.spawn(|| self.blockhash_prefix_to_height.export(height)); - scope.spawn(|| self.txid_prefix_to_txindex.export(height)); - scope.spawn(|| self.txindexvout_to_txoutindex.export(height)); - }); + vec![ + scope.spawn(|| self.addressbytes_prefix_to_addressindex.export(height)), + scope.spawn(|| self.addressindex_to_txoutindex_in.export(height)), + scope.spawn(|| self.addressindex_to_txoutindex_out.export(height)), + scope.spawn(|| self.blockhash_prefix_to_height.export(height)), + scope.spawn(|| self.txid_prefix_to_txindex.export(height)), + scope.spawn(|| self.txindex_to_txoutindex_in.export(height)), + ] + .into_iter() + .try_for_each(|handle| -> Result<(), snkrj::Error> { handle.join().unwrap() }) + }) } } diff --git a/bindex/src/storage/stores/multi.rs b/bindex/src/storage/stores/multi.rs new file mode 100644 index 000000000..2c5cf2e4b --- /dev/null +++ b/bindex/src/storage/stores/multi.rs @@ -0,0 +1,101 @@ +use std::{array, path::Path, sync::OnceLock}; + +use rayon::prelude::*; +use snkrj::{DatabaseKey, DatabaseMulti, DatabaseValue}; + +use super::{Height, StoreMeta, Version}; + +pub struct StoreMulti +where + K: DatabaseKey, + V: DatabaseValue, +{ + meta: StoreMeta, + pub parts: [OnceLock>>; 64], +} + +impl StoreMulti +where + K: DatabaseKey, + V: DatabaseValue, +{ + pub fn open(path: &Path, version: Version) -> Result { + let meta = StoreMeta::checked_open(path, version)?; + + Ok(Self { + meta, + parts: array::from_fn(|_| OnceLock::new()), + }) + } + + // pub fn len(&self) -> usize { + // self.meta.len() + // } + + fn get_or_init_store(&self, key: &K) -> &DatabaseMulti { + self.get_or_init_store_(key.as_ne_six_bits() as usize) + } + + fn get_or_init_store_(&self, storeindex: usize) -> 
&DatabaseMulti { + self.parts[storeindex] + .get_or_init(|| Box::new(DatabaseMulti::open(self.meta.path_parts().join(storeindex.to_string())).unwrap())) + } + + fn get_or_init_mut_store(&mut self, key: &K) -> &mut DatabaseMulti { + self.get_or_init_store(key); + + self.parts + .get_mut(key.as_ne_six_bits() as usize) + .unwrap() + .get_mut() + .unwrap() + } + + #[allow(unused)] + pub fn open_all(&self) { + (0..=(u8::MAX) as usize).for_each(|storeindex| { + self.get_or_init_store_(storeindex); + }); + } + + pub fn get(&self, key: &K) -> Result, snkrj::Error> { + self.get_or_init_store(key).get(key) + } + + pub fn insert(&mut self, key: K, value: V) -> Option { + self.meta.len += 1; + self.get_or_init_mut_store(&key).insert(key, value) + } + + pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { + if self.meta.needs(height) { + self.insert(key, value); + } + } + + pub fn export(self, height: Height) -> Result<(), snkrj::Error> { + if self.has(height) { + return Ok(()); + } + + self.meta.export(height)?; + + self.parts.into_par_iter().try_for_each(|s| { + if let Some(db) = s.into_inner() { + db.export() + } else { + Ok(()) + } + }) + } + + pub fn height(&self) -> Option<&Height> { + self.meta.height() + } + pub fn needs(&self, height: Height) -> bool { + self.meta.needs(height) + } + pub fn has(&self, height: Height) -> bool { + self.meta.has(height) + } +} diff --git a/bindex/src/storage/stores/unique.rs b/bindex/src/storage/stores/unique.rs new file mode 100644 index 000000000..ff4864181 --- /dev/null +++ b/bindex/src/storage/stores/unique.rs @@ -0,0 +1,102 @@ +use std::{array, path::Path, sync::OnceLock}; + +use rayon::prelude::*; +use snkrj::{DatabaseKey, DatabaseUnique, DatabaseValue}; + +use super::{Height, StoreMeta, Version}; + +pub struct StoreUnique +where + K: DatabaseKey, + V: DatabaseValue, +{ + meta: StoreMeta, + pub parts: [OnceLock>>; 64], +} + +impl StoreUnique +where + K: DatabaseKey, + V: DatabaseValue, +{ + pub fn open(path: &Path, 
version: Version) -> Result { + let meta = StoreMeta::checked_open(path, version)?; + + Ok(Self { + meta, + parts: array::from_fn(|_| OnceLock::new()), + }) + } + + // pub fn len(&self) -> usize { + // self.meta.len() + // } + + fn get_or_init_store(&self, key: &K) -> &DatabaseUnique { + self.get_or_init_store_(key.as_ne_six_bits() as usize) + } + + fn get_or_init_store_(&self, storeindex: usize) -> &DatabaseUnique { + self.parts[storeindex].get_or_init(|| { + Box::new(DatabaseUnique::open(self.meta.path_parts().join(storeindex.to_string())).unwrap()) + }) + } + + fn get_or_init_mut_store(&mut self, key: &K) -> &mut DatabaseUnique { + self.get_or_init_store(key); + + self.parts + .get_mut(key.as_ne_six_bits() as usize) + .unwrap() + .get_mut() + .unwrap() + } + + #[allow(unused)] + pub fn open_all(&self) { + (0..=(u8::MAX) as usize).for_each(|storeindex| { + self.get_or_init_store_(storeindex); + }); + } + + pub fn get(&self, key: &K) -> Result, snkrj::Error> { + self.get_or_init_store(key).get(key) + } + + pub fn insert(&mut self, key: K, value: V) -> Option { + self.meta.len += 1; + self.get_or_init_mut_store(&key).insert(key, value) + } + + pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { + if self.meta.needs(height) { + self.insert(key, value); + } + } + + pub fn export(self, height: Height) -> Result<(), snkrj::Error> { + if self.has(height) { + return Ok(()); + } + + self.meta.export(height)?; + + self.parts.into_par_iter().try_for_each(|s| { + if let Some(db) = s.into_inner() { + db.export() + } else { + Ok(()) + } + }) + } + + pub fn height(&self) -> Option<&Height> { + self.meta.height() + } + pub fn needs(&self, height: Height) -> bool { + self.meta.needs(height) + } + pub fn has(&self, height: Height) -> bool { + self.meta.has(height) + } +} diff --git a/bindex/src/structs/vec.rs b/bindex/src/storage/vecs/base.rs similarity index 69% rename from bindex/src/structs/vec.rs rename to bindex/src/storage/vecs/base.rs index 
c78d93d97..834bfc435 100644 --- a/bindex/src/structs/vec.rs +++ b/bindex/src/storage/vecs/base.rs @@ -8,6 +8,7 @@ use std::{ use super::{Height, Version}; pub struct StorableVec { + height: Option, pathbuf: PathBuf, version: Version, vec: storable_vec::StorableVec, @@ -22,41 +23,48 @@ where fs::create_dir_all(path)?; let pathbuf = path.to_owned(); - let path_vec = Self::_path_vec(path); - let path_version = Self::_path_version(path); + let path_vec = Self::path_vec_(path); + let path_version = Self::path_version_(path); let is_same_version = Version::try_from(path_version.as_path()).is_ok_and(|prev_version| version == prev_version); if !is_same_version { let _ = fs::remove_file(&path_vec); let _ = fs::remove_file(&path_version); - let _ = fs::remove_file(Self::_path_height(path)); + let _ = fs::remove_file(Self::path_height_(path)); } - Ok(Self { + let this = Self { + height: Height::try_from(Self::path_height_(path).as_path()).ok(), pathbuf, version, vec: storable_vec::StorableVec::import(&path_vec)?, - }) + }; + + this.version.write(&this.path_version())?; + + Ok(this) } pub fn flush(&mut self, height: Height) -> io::Result<()> { - height.write(&self.path_height())?; - self.version.write(&self.path_version())?; + if self.needs(height) { + height.write(&self.path_height())?; + } + self.vec.flush() } // fn path_vec(&self) -> PathBuf { // Self::_path_vec(&self.path) // } - fn _path_vec(path: &Path) -> PathBuf { + fn path_vec_(path: &Path) -> PathBuf { path.join("vec") } fn path_version(&self) -> PathBuf { - Self::_path_version(&self.pathbuf) + Self::path_version_(&self.pathbuf) } - fn _path_version(path: &Path) -> PathBuf { + fn path_version_(path: &Path) -> PathBuf { path.join("version") } @@ -64,15 +72,18 @@ where Height::try_from(self.path_height().as_path()) } fn path_height(&self) -> PathBuf { - Self::_path_height(&self.pathbuf) + Self::path_height_(&self.pathbuf) } - fn _path_height(path: &Path) -> PathBuf { + fn path_height_(path: &Path) -> PathBuf { 
path.join("height") } - // pub fn needs(&self, height: Height) -> bool { - // self.height() // store height in struct - // } + pub fn needs(&self, height: Height) -> bool { + self.height.is_none_or(|self_height| height > self_height) + } + pub fn has(&self, height: Height) -> bool { + !self.needs(height) + } } impl Deref for StorableVec { diff --git a/bindex/src/structs/vecs.rs b/bindex/src/storage/vecs/mod.rs similarity index 70% rename from bindex/src/structs/vecs.rs rename to bindex/src/storage/vecs/mod.rs index 59e6ebcc7..05ff6b180 100644 --- a/bindex/src/structs/vecs.rs +++ b/bindex/src/storage/vecs/mod.rs @@ -1,38 +1,44 @@ use std::{fs, io, path::Path}; -use biter::bitcoin::{transaction, BlockHash, Txid}; +use biter::bitcoin::{self, transaction, BlockHash, Txid, Weight}; use color_eyre::eyre::eyre; +use exit::Exit; use rayon::prelude::*; use storable_vec::AnyStorableVec; -use super::{ - Addressbytes, Addressindex, Addresstype, Addresstypeindex, Amount, AnyBindexVec, Date, Exit, Height, - P2PK33AddressBytes, P2PK65AddressBytes, P2PKHAddressBytes, P2SHAddressBytes, P2TRAddressBytes, P2WPKHAddressBytes, - P2WSHAddressBytes, StorableVec, Timestamp, Txindex, Txoutindex, Version, +use crate::structs::{ + Addressbytes, Addressindex, Addresstype, Addresstypeindex, Amount, Height, P2PK33AddressBytes, P2PK65AddressBytes, + P2PKHAddressBytes, P2SHAddressBytes, P2TRAddressBytes, P2WPKHAddressBytes, P2WSHAddressBytes, Timestamp, Txindex, + Txoutindex, Version, }; +mod base; + +use base::*; + pub struct Vecs { pub addressindex_to_addresstype: StorableVec, pub addressindex_to_addresstypeindex: StorableVec, + pub addressindex_to_height: StorableVec, pub height_to_blockhash: StorableVec, - pub height_to_date: StorableVec, - pub height_to_totalfees: StorableVec, pub height_to_first_addressindex: StorableVec, + pub height_to_first_emptyindex: StorableVec, + pub height_to_first_multisigindex: StorableVec, + pub height_to_first_opreturnindex: StorableVec, + pub 
height_to_first_pushonlyindex: StorableVec, pub height_to_first_txindex: StorableVec, pub height_to_first_txoutindex: StorableVec, - pub height_to_inputcount: StorableVec, - pub height_to_last_addressindex: StorableVec, - pub height_to_last_txindex: StorableVec, - pub height_to_last_txoutindex: StorableVec, - pub height_to_outputcount: StorableVec, + pub height_to_first_unknownindex: StorableVec, + pub height_to_p2pk33index: StorableVec, + pub height_to_p2pk65index: StorableVec, + pub height_to_p2pkhindex: StorableVec, + pub height_to_p2shindex: StorableVec, + pub height_to_p2trindex: StorableVec, + pub height_to_p2wpkhindex: StorableVec, + pub height_to_p2wshindex: StorableVec, + pub height_to_size: StorableVec, pub height_to_timestamp: StorableVec, - pub height_to_txcount: StorableVec, - // pub height_to_size: StorableVec, - // pub height_to_weight: StorableVec, - // pub height_to_subsidy: StorableVec, - // pub height_to_minfeerate: StorableVec, - // pub height_to_maxfeerate: StorableVec, - // pub height_to_medianfeerate: StorableVec, + pub height_to_weight: StorableVec, pub p2pk33index_to_p2pk33addressbytes: StorableVec, pub p2pk65index_to_p2pk65addressbytes: StorableVec, pub p2pkhindex_to_p2pkhaddressbytes: StorableVec, @@ -40,15 +46,30 @@ pub struct Vecs { pub p2trindex_to_p2traddressbytes: StorableVec, pub p2wpkhindex_to_p2wpkhaddressbytes: StorableVec, pub p2wshindex_to_p2wshaddressbytes: StorableVec, - pub txindex_to_fee: StorableVec, - // pub txindex_to_feerate: StorableVec, + pub txindex_to_first_txoutindex: StorableVec, pub txindex_to_height: StorableVec, - pub txindex_to_inputcount: StorableVec, - pub txindex_to_outputcount: StorableVec, + pub txindex_to_locktime: StorableVec, pub txindex_to_txid: StorableVec, pub txindex_to_txversion: StorableVec, pub txoutindex_to_addressindex: StorableVec, pub txoutindex_to_amount: StorableVec, + // Can be computed later: + // pub height_to_date: StorableVec, + // pub height_to_totalfees: StorableVec, + // pub 
height_to_inputcount: StorableVec, + // pub height_to_last_addressindex: StorableVec, + // pub height_to_last_txindex: StorableVec, + // pub height_to_last_txoutindex: StorableVec, + // pub height_to_outputcount: StorableVec, + // pub height_to_txcount: StorableVec, + // pub height_to_subsidy: StorableVec, + // pub height_to_minfeerate: StorableVec, + // pub height_to_maxfeerate: StorableVec, + // pub height_to_medianfeerate: StorableVec, + // pub txindex_to_feerate: StorableVec, + // pub txindex_to_inputcount: StorableVec, + // pub txindex_to_outputcount: StorableVec, + // pub txindex_to_last_txoutindex: StorableVec, } // const UNSAFE_BLOCKS: usize = 100; @@ -66,28 +87,47 @@ impl Vecs { &path.join("addressindex_to_addresstypeindex"), Version::from(1), )?, + addressindex_to_height: StorableVec::import(&path.join("addressindex_to_height"), Version::from(1))?, height_to_blockhash: StorableVec::import(&path.join("height_to_blockhash"), Version::from(1))?, - height_to_date: StorableVec::import(&path.join("height_to_date"), Version::from(1))?, height_to_first_addressindex: StorableVec::import( &path.join("height_to_first_addressindex"), Version::from(1), )?, + height_to_first_emptyindex: StorableVec::import( + &path.join("height_to_first_emptyindex"), + Version::from(1), + )?, + height_to_first_multisigindex: StorableVec::import( + &path.join("height_to_first_multisigindex"), + Version::from(1), + )?, + height_to_first_opreturnindex: StorableVec::import( + &path.join("height_to_first_opreturnindex"), + Version::from(1), + )?, + height_to_first_pushonlyindex: StorableVec::import( + &path.join("height_to_first_pushonlyindex"), + Version::from(1), + )?, height_to_first_txindex: StorableVec::import(&path.join("height_to_first_txindex"), Version::from(1))?, height_to_first_txoutindex: StorableVec::import( &path.join("height_to_first_txoutindex"), Version::from(1), )?, - height_to_inputcount: StorableVec::import(&path.join("height_to_inputcount"), Version::from(1))?, - 
height_to_last_addressindex: StorableVec::import( - &path.join("height_to_last_addressindex"), + height_to_first_unknownindex: StorableVec::import( + &path.join("height_to_first_unkownindex"), Version::from(1), )?, - height_to_last_txindex: StorableVec::import(&path.join("height_to_last_txindex"), Version::from(1))?, - height_to_last_txoutindex: StorableVec::import(&path.join("height_to_last_txoutindex"), Version::from(1))?, - height_to_outputcount: StorableVec::import(&path.join("height_to_outputcount"), Version::from(1))?, + height_to_p2pk33index: StorableVec::import(&path.join("height_to_p2pk33index"), Version::from(1))?, + height_to_p2pk65index: StorableVec::import(&path.join("height_to_p2pk65index"), Version::from(1))?, + height_to_p2pkhindex: StorableVec::import(&path.join("height_to_p2pkhindex"), Version::from(1))?, + height_to_p2shindex: StorableVec::import(&path.join("height_to_p2shindex"), Version::from(1))?, + height_to_p2trindex: StorableVec::import(&path.join("height_to_p2trindex"), Version::from(1))?, + height_to_p2wpkhindex: StorableVec::import(&path.join("height_to_p2wpkhindex"), Version::from(1))?, + height_to_p2wshindex: StorableVec::import(&path.join("height_to_p2wshindex"), Version::from(1))?, + height_to_size: StorableVec::import(&path.join("height_to_size"), Version::from(1))?, height_to_timestamp: StorableVec::import(&path.join("height_to_timestamp"), Version::from(1))?, - height_to_totalfees: StorableVec::import(&path.join("height_to_totalfees"), Version::from(1))?, - height_to_txcount: StorableVec::import(&path.join("height_to_txcount"), Version::from(1))?, + height_to_weight: StorableVec::import(&path.join("height_to_weight"), Version::from(1))?, p2pk33index_to_p2pk33addressbytes: StorableVec::import( &path.join("p2pk33index_to_p2pk33addressbytes"), Version::from(1), @@ -116,10 +156,12 @@ impl Vecs { &path.join("p2wshindex_to_p2wshaddressbytes"), Version::from(1), )?, - txindex_to_fee: StorableVec::import(&path.join("txindex_to_fee"), 
Version::from(1))?, + txindex_to_first_txoutindex: StorableVec::import( + &path.join("txindex_to_first_txoutindex"), + Version::from(1), + )?, txindex_to_height: StorableVec::import(&path.join("txindex_to_height"), Version::from(1))?, - txindex_to_inputcount: StorableVec::import(&path.join("txindex_to_inputcount"), Version::from(1))?, - txindex_to_outputcount: StorableVec::import(&path.join("txindex_to_outputcount"), Version::from(1))?, + txindex_to_locktime: StorableVec::import(&path.join("txindex_to_locktime"), Version::from(1))?, txindex_to_txid: StorableVec::import(&path.join("txindex_to_txid"), Version::from(1))?, txindex_to_txversion: StorableVec::import(&path.join("txindex_to_txversion"), Version::from(1))?, txoutindex_to_addressindex: StorableVec::import( @@ -130,19 +172,6 @@ impl Vecs { }) } - pub fn addresstype_to_addressbytes(&self, addresstype: Addresstype) -> color_eyre::Result<&dyn AnyStorableVec> { - match addresstype { - Addresstype::P2PK65 => Ok(&*self.p2pk65index_to_p2pk65addressbytes), - Addresstype::P2PK33 => Ok(&*self.p2pk33index_to_p2pk33addressbytes), - Addresstype::P2PKH => Ok(&*self.p2pkhindex_to_p2pkhaddressbytes), - Addresstype::P2SH => Ok(&*self.p2shindex_to_p2shaddressbytes), - Addresstype::P2WPKH => Ok(&*self.p2wpkhindex_to_p2wpkhaddressbytes), - Addresstype::P2WSH => Ok(&*self.p2wshindex_to_p2wshaddressbytes), - Addresstype::P2TR => Ok(&*self.p2trindex_to_p2traddressbytes), - _ => Err(eyre!("wrong address type")), - } - } - pub fn push_addressbytes_if_needed( &mut self, index: Addresstypeindex, @@ -332,23 +361,30 @@ impl Vecs { .min()) } - pub fn as_slice(&self) -> [&dyn AnyBindexVec; 30] { + pub fn as_slice(&self) -> [&dyn AnyBindexVec; 36] { [ &self.addressindex_to_addresstype as &dyn AnyBindexVec, &self.addressindex_to_addresstypeindex, + &self.addressindex_to_height, &self.height_to_blockhash, - &self.height_to_date, - &self.height_to_totalfees, &self.height_to_first_addressindex, + &self.height_to_first_emptyindex, + 
&self.height_to_first_multisigindex, + &self.height_to_first_opreturnindex, + &self.height_to_first_pushonlyindex, &self.height_to_first_txindex, &self.height_to_first_txoutindex, - &self.height_to_inputcount, - &self.height_to_last_addressindex, - &self.height_to_last_txindex, - &self.height_to_last_txoutindex, - &self.height_to_outputcount, + &self.height_to_first_unknownindex, + &self.height_to_p2pk33index, + &self.height_to_p2pk65index, + &self.height_to_p2pkhindex, + &self.height_to_p2shindex, + &self.height_to_p2trindex, + &self.height_to_p2wpkhindex, + &self.height_to_p2wshindex, + &self.height_to_size, &self.height_to_timestamp, - &self.height_to_txcount, + &self.height_to_weight, &self.p2pk33index_to_p2pk33addressbytes, &self.p2pk65index_to_p2pk65addressbytes, &self.p2pkhindex_to_p2pkhaddressbytes, @@ -356,10 +392,9 @@ impl Vecs { &self.p2trindex_to_p2traddressbytes, &self.p2wpkhindex_to_p2wpkhaddressbytes, &self.p2wshindex_to_p2wshaddressbytes, - &self.txindex_to_fee, + &self.txindex_to_first_txoutindex, &self.txindex_to_height, - &self.txindex_to_inputcount, - &self.txindex_to_outputcount, + &self.txindex_to_locktime, &self.txindex_to_txid, &self.txindex_to_txversion, &self.txoutindex_to_addressindex, @@ -367,23 +402,30 @@ impl Vecs { ] } - pub fn as_mut_slice(&mut self) -> [&mut (dyn AnyBindexVec + Send + Sync); 30] { + pub fn as_mut_slice(&mut self) -> [&mut (dyn AnyBindexVec + Send + Sync); 36] { [ &mut self.addressindex_to_addresstype as &mut (dyn AnyBindexVec + Send + Sync), &mut self.addressindex_to_addresstypeindex, + &mut self.addressindex_to_height, &mut self.height_to_blockhash, - &mut self.height_to_date, - &mut self.height_to_totalfees, // <- &mut self.height_to_first_addressindex, + &mut self.height_to_first_emptyindex, + &mut self.height_to_first_multisigindex, + &mut self.height_to_first_opreturnindex, + &mut self.height_to_first_pushonlyindex, &mut self.height_to_first_txindex, &mut self.height_to_first_txoutindex, - &mut 
self.height_to_inputcount, // <- - &mut self.height_to_last_addressindex, - &mut self.height_to_last_txindex, - &mut self.height_to_last_txoutindex, - &mut self.height_to_outputcount, // <- + &mut self.height_to_first_unknownindex, + &mut self.height_to_p2pk33index, + &mut self.height_to_p2pk65index, + &mut self.height_to_p2pkhindex, + &mut self.height_to_p2shindex, + &mut self.height_to_p2trindex, + &mut self.height_to_p2wpkhindex, + &mut self.height_to_p2wshindex, + &mut self.height_to_size, &mut self.height_to_timestamp, - &mut self.height_to_txcount, // <- + &mut self.height_to_weight, &mut self.p2pk33index_to_p2pk33addressbytes, &mut self.p2pk65index_to_p2pk65addressbytes, &mut self.p2pkhindex_to_p2pkhaddressbytes, @@ -391,10 +433,9 @@ impl Vecs { &mut self.p2trindex_to_p2traddressbytes, &mut self.p2wpkhindex_to_p2wpkhaddressbytes, &mut self.p2wshindex_to_p2wshaddressbytes, - &mut self.txindex_to_fee, // <- + &mut self.txindex_to_first_txoutindex, &mut self.txindex_to_height, - &mut self.txindex_to_inputcount, // <- - &mut self.txindex_to_outputcount, // <- + &mut self.txindex_to_locktime, &mut self.txindex_to_txid, &mut self.txindex_to_txversion, &mut self.txoutindex_to_addressindex, diff --git a/bindex/src/structs/addresstypeindex.rs b/bindex/src/structs/addresstypeindex.rs index a2ca0e5de..5b126b31d 100644 --- a/bindex/src/structs/addresstypeindex.rs +++ b/bindex/src/structs/addresstypeindex.rs @@ -17,6 +17,12 @@ impl Addresstypeindex { pub fn incremented(self) -> Self { Self(*self + 1) } + + pub fn clone_then_increment(&mut self) -> Self { + let i = *self; + self.increment(); + i + } } impl From for Addresstypeindex { diff --git a/bindex/src/structs/height.rs b/bindex/src/structs/height.rs index b3df4bc15..1fdb478c3 100644 --- a/bindex/src/structs/height.rs +++ b/bindex/src/structs/height.rs @@ -4,7 +4,7 @@ use std::{ path::Path, }; -use biter::bitcoincore_rpc::{self, RpcApi}; +use biter::rpc::{self, RpcApi}; use derive_deref::{Deref, DerefMut}; use 
snkrj::{direct_repr, Storable, UnsizedStorable}; use storable_vec::UnsafeSizedSerDe; @@ -113,9 +113,9 @@ impl TryFrom<&Path> for Height { } } -impl TryFrom<&bitcoincore_rpc::Client> for Height { - type Error = bitcoincore_rpc::Error; - fn try_from(value: &bitcoincore_rpc::Client) -> Result { +impl TryFrom<&rpc::Client> for Height { + type Error = rpc::Error; + fn try_from(value: &rpc::Client) -> Result { Ok((value.get_blockchain_info()?.blocks as usize - 1).into()) } } diff --git a/bindex/src/structs/mod.rs b/bindex/src/structs/mod.rs index 0c0a84e02..44e08c7c1 100644 --- a/bindex/src/structs/mod.rs +++ b/bindex/src/structs/mod.rs @@ -1,41 +1,27 @@ mod addressbytes; mod addressindex; -mod addressindextxoutindex; mod addresstype; mod addresstypeindex; mod amount; -mod date; -mod exit; mod height; mod prefix; mod slice; -mod store; -mod stores; mod timestamp; mod txindex; -mod txindexvout; mod txoutindex; -mod vec; -mod vecs; mod version; +mod vout; pub use addressbytes::*; pub use addressindex::*; -pub use addressindextxoutindex::*; pub use addresstype::*; pub use addresstypeindex::*; pub use amount::*; -pub use date::*; -pub use exit::*; pub use height::*; pub use prefix::*; pub use slice::*; -pub use store::*; -pub use stores::*; pub use timestamp::*; pub use txindex::*; -pub use txindexvout::*; pub use txoutindex::*; -pub use vec::*; -pub use vecs::*; pub use version::*; +pub use vout::*; diff --git a/bindex/src/structs/store.rs b/bindex/src/structs/store.rs deleted file mode 100644 index eefd70b2e..000000000 --- a/bindex/src/structs/store.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::{ - array, fs, - path::{Path, PathBuf}, - sync::OnceLock, -}; - -use rayon::prelude::*; -use snkrj::{Database, DatabaseKey, DatabaseValue, UnitDatabase}; -use storable_vec::UnsafeSizedSerDe; - -use super::{Height, Version}; - -pub struct Store -where - K: DatabaseKey, - V: DatabaseValue, -{ - pathbuf: PathBuf, - version: Version, - height: Option, - len: usize, - pub parts: 
[OnceLock>>; 256], -} - -impl Store -where - K: DatabaseKey, - V: DatabaseValue, -{ - pub fn open(path: &Path, version: Version) -> Result { - fs::create_dir_all(path)?; - - let is_same_version = - Version::try_from(Self::path_version_(path).as_path()).is_ok_and(|prev_version| version == prev_version); - - if !is_same_version { - fs::remove_dir(path)?; - fs::create_dir_all(path)?; - } - - let height = Height::try_from(Self::path_height_(path).as_path()).ok(); - - Ok(Self { - pathbuf: path.to_owned(), - version, - height, - len: UnitDatabase::read_length_(path), - parts: array::from_fn(|_| OnceLock::new()), - }) - } - - #[allow(unused)] - pub fn len(&self) -> usize { - self.len - } - - fn key_to_byte(key: &K) -> u8 { - let slice = key.unsafe_as_slice(); - - *(if cfg!(target_endian = "big") { - slice.last() - } else { - slice.first() - }) - .unwrap() - } - - fn get_or_init_store(&self, key: &K) -> &Database { - self.get_or_init_store_(Self::key_to_byte(key) as usize) - } - - fn get_or_init_store_(&self, storeindex: usize) -> &Database { - self.parts[storeindex] - .get_or_init(|| Box::new(Database::open(self.path_parts().join(storeindex.to_string())).unwrap())) - } - - fn get_or_init_mut_store(&mut self, key: &K) -> &mut Database { - self.get_or_init_store(key); - - self.parts - .get_mut(Self::key_to_byte(key) as usize) - .unwrap() - .get_mut() - .unwrap() - } - - #[allow(unused)] - pub fn open_all(&self) { - (0..=(u8::MAX) as usize).for_each(|storeindex| { - self.get_or_init_store_(storeindex); - }); - } - - pub fn get(&self, key: &K) -> Option<&V> { - self.get_or_init_store(key).get(key) - } - - pub fn insert(&mut self, key: K, value: V) -> Option { - self.len += 1; - self.get_or_init_mut_store(&key).insert(key, value) - } - - pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { - if self.needs(height) { - self.insert(key, value); - } - } - - pub fn export(mut self, height: Height) -> Result<(), snkrj::Error> { - if 
self.height.is_some_and(|self_height| self_height >= height) { - return Ok(()); - } - - self.height = Some(height); - self.version.write(&self.path_version())?; - height.write(&self.path_height())?; - UnitDatabase::write_length_(&self.pathbuf, self.len)?; - self.parts.into_par_iter().try_for_each(|s| { - if let Some(db) = s.into_inner() { - db.export() - } else { - Ok(()) - } - }) - } - - fn path_parts(&self) -> PathBuf { - Self::path_parts_(&self.pathbuf) - } - fn path_parts_(path: &Path) -> PathBuf { - path.join("parts") - } - - fn path_version(&self) -> PathBuf { - Self::path_version_(&self.pathbuf) - } - fn path_version_(path: &Path) -> PathBuf { - path.join("version") - } - - pub fn height(&self) -> Option<&Height> { - self.height.as_ref() - } - pub fn needs(&self, height: Height) -> bool { - self.height.is_none_or(|self_height| height > self_height) - } - fn path_height(&self) -> PathBuf { - Self::path_height_(&self.pathbuf) - } - fn path_height_(path: &Path) -> PathBuf { - path.join("height") - } -} diff --git a/bindex/src/structs/txoutindex.rs b/bindex/src/structs/txoutindex.rs index 6424bf70a..f0dfba968 100644 --- a/bindex/src/structs/txoutindex.rs +++ b/bindex/src/structs/txoutindex.rs @@ -3,6 +3,8 @@ use std::ops::{Add, AddAssign}; use derive_deref::{Deref, DerefMut}; use snkrj::{direct_repr, Storable, UnsizedStorable}; +use super::Vout; + #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Deref, DerefMut, Default)] pub struct Txoutindex(u64); direct_repr!(Txoutindex); @@ -24,6 +26,13 @@ impl Add for Txoutindex { } } +impl Add for Txoutindex { + type Output = Self; + fn add(self, rhs: Vout) -> Self::Output { + Self(self.0 + u64::from(rhs)) + } +} + impl AddAssign for Txoutindex { fn add_assign(&mut self, rhs: Txoutindex) { self.0 += rhs.0 diff --git a/bindex/src/structs/vout.rs b/bindex/src/structs/vout.rs new file mode 100644 index 000000000..a5f267823 --- /dev/null +++ b/bindex/src/structs/vout.rs @@ -0,0 +1,30 @@ +use derive_deref::Deref; + 
+#[derive(Debug, Deref, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Vout(u32); + +impl Vout { + const ZERO: Self = Vout(0_u32); + + pub fn is_zero(&self) -> bool { + *self == Self::ZERO + } +} + +impl From for Vout { + fn from(value: u32) -> Self { + Self(value) + } +} + +impl From for Vout { + fn from(value: usize) -> Self { + Self(value as u32) + } +} + +impl From for u64 { + fn from(value: Vout) -> Self { + value.0 as u64 + } +} diff --git a/biter/src/lib.rs b/biter/src/lib.rs index c554537cc..03afdfebb 100644 --- a/biter/src/lib.rs +++ b/biter/src/lib.rs @@ -18,7 +18,7 @@ use crossbeam::channel::{bounded, Receiver}; use rayon::prelude::*; pub use bitcoin; -pub use bitcoincore_rpc; +pub use bitcoincore_rpc as rpc; mod blk_index_to_blk_path; mod blk_index_to_blk_recap; diff --git a/bomputer/Cargo.toml b/bomputer/Cargo.toml new file mode 100644 index 000000000..ad1754861 --- /dev/null +++ b/bomputer/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "bomputer" +version = "0.1.0" +edition = "2021" + +[dependencies] diff --git a/bomputer/src/lib.rs b/bomputer/src/lib.rs new file mode 100644 index 000000000..00a8d195b --- /dev/null +++ b/bomputer/src/lib.rs @@ -0,0 +1,18 @@ +mod structs; + +use structs::*; + +pub fn add(left: u64, right: u64) -> u64 { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/bindex/src/structs/date.rs b/bomputer/src/structs/date.rs similarity index 70% rename from bindex/src/structs/date.rs rename to bomputer/src/structs/date.rs index ae3a76ff5..207c88d3d 100644 --- a/bindex/src/structs/date.rs +++ b/bomputer/src/structs/date.rs @@ -10,3 +10,10 @@ impl From<&Timestamp> for Date { Self(jiff::civil::Date::from(value.to_zoned(TimeZone::UTC))) } } + +impl From for usize { + // 2009-01-03 => 0 + // 2009-01-09 => 1 + // 2009-01-10 => 2 + // ... 
+} diff --git a/bomputer/src/structs/feerate.rs b/bomputer/src/structs/feerate.rs new file mode 100644 index 000000000..52ed2fad9 --- /dev/null +++ b/bomputer/src/structs/feerate.rs @@ -0,0 +1,4 @@ +use derive_deref::Deref; + +#[derive(Debug, Deref, Clone, Copy)] +pub struct Feerate(f32); diff --git a/bomputer/src/structs/mod.rs b/bomputer/src/structs/mod.rs new file mode 100644 index 000000000..9d48ff76d --- /dev/null +++ b/bomputer/src/structs/mod.rs @@ -0,0 +1,5 @@ +mod date; +mod feerate; + +pub use date::*; +pub use feerate::*; diff --git a/exit/Cargo.toml b/exit/Cargo.toml new file mode 100644 index 000000000..9a58fb4bb --- /dev/null +++ b/exit/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "exit" +version = "0.1.0" +edition = "2021" + +[dependencies] +ctrlc = "3.4.5" diff --git a/bindex/src/structs/exit.rs b/exit/src/lib.rs similarity index 100% rename from bindex/src/structs/exit.rs rename to exit/src/lib.rs diff --git a/snkrj/src/base.rs b/snkrj/src/base.rs new file mode 100644 index 000000000..113ffe77a --- /dev/null +++ b/snkrj/src/base.rs @@ -0,0 +1,174 @@ +// https://docs.rs/sanakirja/latest/sanakirja/index.html +// https://pijul.org/posts/2021-02-06-rethinking-sanakirja/ + +use std::{ + collections::BTreeMap, + fs::{self, File}, + io, + path::{Component, Path, PathBuf}, + result::Result, +}; + +use sanakirja::btree::{page, Db_}; +pub use sanakirja::*; + +use crate::{DatabaseKey, DatabaseValue}; + +pub type UnitDatabase = Base<(), ()>; + +/// +/// A simple wrapper around Sanakirja database that acts as a very fast on disk BTreeMap. +/// +/// The state of the tree is uncommited until `.export()` is called during which it is unsafe to stop the program. 
+/// +pub struct Base +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + pathbuf: PathBuf, + db: Db_>, + txn: MutTxn, +} + +const ROOT_DB: usize = 0; +const PAGE_SIZE: u64 = 4096; + +const DEFRAGMENT_RATIO_THRESHOLD: f64 = 0.5; + +impl Base +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + const KEY_SIZE: usize = size_of::(); + const VALUE_SIZE: usize = size_of::(); + const KEY_AND_VALUE_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE; + + /// Open a database without a lock file where only one instance is safe to open. + pub fn open(pathbuf: PathBuf) -> Result { + fs::create_dir_all(&pathbuf)?; + + let env = unsafe { Env::new_nolock(Self::path_sanakirja_(&pathbuf), PAGE_SIZE, 1)? }; + + let mut txn = Env::mut_txn_begin(env)?; + + let db = txn + .root_db(ROOT_DB) + .unwrap_or_else(|| unsafe { btree::create_db_(&mut txn).unwrap() }); + + Ok(Self { pathbuf, db, txn }) + } + + pub fn path_sanakirja(&self) -> PathBuf { + Self::path_sanakirja_(&self.pathbuf) + } + fn path_sanakirja_(path: &Path) -> PathBuf { + path.join("sanakirja") + } + + pub fn path_self_defragmented(&self) -> PathBuf { + let defragmented_path_opt: Option = self.pathbuf.components().last(); + let folder = match defragmented_path_opt { + Some(Component::Normal(f)) => f.to_str().unwrap(), + _ => unreachable!(), + }; + let mut original_path = self.pathbuf.clone(); + original_path.pop(); + original_path.join(format!("{folder}-defragmented")) + } + + pub fn read_length(&self) -> usize { + Self::read_length_(&self.pathbuf) + } + pub fn read_length_(path: &Path) -> usize { + fs::read(Self::path_length(path)) + .map(|v| { + let mut buf = [0_u8; 8]; + v.iter().enumerate().take(8).for_each(|(i, b)| { + buf[i] = *b; + }); + usize::from_le_bytes(buf) + }) + .unwrap_or_default() + } + pub fn write_length(&self, len: usize) -> Result<(), io::Error> { + Self::write_length_(&self.pathbuf, len) + } + pub fn write_length_(path: &Path, len: usize) -> Result<(), io::Error> { + 
fs::write(Self::path_length(path), len.to_le_bytes()) + } + fn path_length(path: &Path) -> PathBuf { + path.join("length") + } + + #[inline] + pub fn get(&self, key: &Key) -> Result, Error> { + let option = btree::get(&self.txn, &self.db, key, None)?; + if let Some((key_found, v)) = option { + if key == key_found { + return Ok(Some(v)); + } + } + Ok(None) + } + + /// Iterate over key/value pairs from the database (disk) + #[inline] + #[allow(clippy::type_complexity)] + pub fn iter( + &self, + ) -> Result, Key, Value, page::Page>, Error> { + btree::iter(&self.txn, &self.db, None) + } + + pub fn put(&mut self, key: &Key, value: &Value) -> Result { + btree::put(&mut self.txn, &mut self.db, key, value) + } + + pub fn del(&mut self, key: &Key, value: Option<&Value>) -> Result { + btree::del(&mut self.txn, &mut self.db, key, value) + } + + fn get_file_size_to_data_size_ratio(&self, len: usize) -> Result { + let data_bytes = (len * Self::KEY_AND_VALUE_SIZE) as f64; + let file_bytes = File::open(&self.pathbuf)?.metadata()?.len() as f64; + Ok(file_bytes / data_bytes) + } + + pub fn should_defragment(&self, len: usize) -> Result { + Ok(self.get_file_size_to_data_size_ratio(len)? 
>= DEFRAGMENT_RATIO_THRESHOLD) + } + + pub fn iter_collect(&self) -> Result, Error> { + self.iter()?.collect::<_>() + } + + pub fn iter_collect_multi(&self) -> Result>, Error> { + let mut tree: BTreeMap<_, Vec<_>> = BTreeMap::new(); + self.iter()?.try_for_each(|res| -> Result<(), Error> { + let (key, value): (&Key, &Value) = res?; + tree.entry(key).or_default().push(value); + Ok(()) + })?; + Ok(tree) + } + + pub fn commit(mut self, len: usize) -> Result<(), Error> { + // dbg!(&self.pathbuf, len); + // panic!(); + self.write_length(len)?; + self.txn.set_root(ROOT_DB, self.db.db.into()); + self.txn.commit() + } + + pub fn destroy(self) -> io::Result<()> { + let path = self.pathbuf.to_owned(); + drop(self); + fs::remove_dir_all(&path) + } + + pub fn path(&self) -> &Path { + &self.pathbuf + } +} diff --git a/snkrj/src/lib.rs b/snkrj/src/lib.rs index 00189538b..241bfd3b7 100644 --- a/snkrj/src/lib.rs +++ b/snkrj/src/lib.rs @@ -1,346 +1,14 @@ // https://docs.rs/sanakirja/latest/sanakirja/index.html // https://pijul.org/posts/2021-02-06-rethinking-sanakirja/ -use core::panic; -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Debug, - fs::{self, File}, - io, mem, - path::{Path, PathBuf}, - result::Result, -}; - -use sanakirja::btree::{page, Db_}; pub use sanakirja::*; -/// -/// A simple wrapper around Sanakirja aatabase that acts as a very fast on disk BTreeMap. -/// -/// The state of the tree is uncommited until `.export()` is called during which it is unsafe to stop the program. 
-/// -pub struct Database -where - Key: DatabaseKey, - Value: DatabaseValue, -{ - pathbuf: PathBuf, - puts: BTreeMap, - dels: BTreeSet, - len: usize, - db: Db_>, - txn: MutTxn, -} +mod base; +mod multi; +mod traits; +mod unique; -const ROOT_DB: usize = 0; -const PAGE_SIZE: u64 = 4096; - -pub type UnitDatabase = Database<(), ()>; - -const DEFRAGMENT_RATIO_THRESHOLD: f64 = 0.5; - -impl Database -where - Key: DatabaseKey, - Value: DatabaseValue, -{ - const KEY_SIZE: usize = size_of::(); - const VALUE_SIZE: usize = size_of::(); - const KEY_AND_VALUE_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE; - - /// Open a database without a lock file where only one instance is safe to open. - pub fn open(pathbuf: PathBuf) -> Result { - fs::create_dir_all(&pathbuf)?; - - let env = unsafe { Env::new_nolock(Self::path_sanakirja_(&pathbuf), PAGE_SIZE, 1)? }; - - let mut txn = Env::mut_txn_begin(env)?; - - let db = txn - .root_db(ROOT_DB) - .unwrap_or_else(|| unsafe { btree::create_db_(&mut txn).unwrap() }); - - Ok(Self { - len: Self::read_length_(&pathbuf), - pathbuf, - puts: BTreeMap::default(), - dels: BTreeSet::default(), - db, - txn, - }) - } - - pub fn path_sanakirja(&self) -> PathBuf { - Self::path_sanakirja_(&self.pathbuf) - } - fn path_sanakirja_(path: &Path) -> PathBuf { - path.join("sanakirja") - } - - pub fn read_length(&self) -> usize { - Self::read_length_(&self.pathbuf) - } - pub fn read_length_(path: &Path) -> usize { - fs::read(Self::path_length(path)) - .map(|v| { - let mut buf = [0_u8; 8]; - v.iter().enumerate().take(8).for_each(|(i, b)| { - buf[i] = *b; - }); - usize::from_le_bytes(buf) - }) - .unwrap_or_default() - } - pub fn write_length(&self) -> Result<(), io::Error> { - Self::write_length_(&self.pathbuf, self.len) - } - pub fn write_length_(path: &Path, len: usize) -> Result<(), io::Error> { - fs::write(Self::path_length(path), len.to_le_bytes()) - } - fn path_length(path: &Path) -> PathBuf { - path.join("length") - } - - #[inline] - pub fn get(&self, key: 
&Key) -> Option<&Value> { - if let Some(cached_put) = self.get_from_ram(key) { - return Some(cached_put); - } - - self.get_from_disk(key) - } - - /// Get only from the uncommited tree (ram) without checking the database (disk) - #[inline] - pub fn get_from_ram(&self, key: &Key) -> Option<&Value> { - self.puts.get(key) - } - - /// Get mut only from the uncommited tree (ram) without checking the database (disk) - #[inline] - pub fn get_mut_from_ram(&mut self, key: &Key) -> Option<&mut Value> { - self.puts.get_mut(key) - } - - /// Get only from the database (disk) without checking the uncommited tree (ram) - #[inline] - pub fn get_from_disk(&self, key: &Key) -> Option<&Value> { - let option = btree::get(&self.txn, &self.db, key, None).unwrap(); - - if let Some((key_found, v)) = option { - if key == key_found { - return Some(v); - } - } - - None - } - - #[inline] - pub fn insert(&mut self, key: Key, value: Value) -> Option { - self.dels.remove(&key); - self.insert_to_ram(key, value) - } - - /// Insert without removing the key to the dels tree, so be sure that it hasn't added to the delete set - #[inline] - pub fn insert_to_ram(&mut self, key: Key, value: Value) -> Option { - self.len += 1; - self.puts.insert(key, value) - } - - #[inline] - pub fn update(&mut self, key: Key, value: Value) -> Option { - self.dels.insert(key.clone()); - self.puts.insert(key, value) - } - - #[inline] - pub fn remove(&mut self, key: &Key) -> Option { - self.len -= 1; - self.puts.remove(key).or_else(|| { - self.dels.insert(key.clone()); - None - }) - } - - /// Get only from the uncommited tree (ram) without checking the database (disk) - #[inline] - pub fn remove_from_ram(&mut self, key: &Key) -> Option { - self.len -= 1; - self.puts.remove(key) - } - - /// Add the key only to the dels tree without checking if it's present in the puts tree, only use if you are positive that you neither added nor updated an entry with this key - #[inline] - pub fn remove_later_from_disk(&mut self, key: &Key) 
{ - self.len -= 1; - self.dels.insert(key.clone()); - } - - /// Iterate over key/value pairs from the uncommited tree (ram) - #[inline] - pub fn iter_ram(&self) -> std::collections::btree_map::Iter<'_, Key, Value> { - self.puts.iter() - } - - /// Iterate over key/value pairs from the database (disk) - #[inline] - pub fn iter_disk( - &self, - ) -> btree::Iter<'_, MutTxn, Key, Value, page::Page> { - btree::iter(&self.txn, &self.db, None).unwrap() - } - - /// Iterate over key/value pairs - #[inline] - pub fn iter_ram_then_disk(&self) -> impl Iterator { - self.iter_ram().chain(self.iter_disk().map(|r| r.unwrap())) - } - - /// Collect a **clone** of all uncommited key/value pairs (ram) - pub fn collect_ram(&self) -> BTreeMap { - self.puts.clone() - } - - /// Collect a **clone** of all key/value pairs from the database (disk) - pub fn collect_disk(&self) -> BTreeMap { - self.iter_disk() - .map(|r| r.unwrap()) - .map(|(key, value)| (key.clone(), value.clone())) - .collect::<_>() - } - - #[inline] - pub fn len(&self) -> usize { - self.len - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.len == 0 - } - - // pub fn export(self) -> Result<(), Error> { - // self.boxed().boxed_export() - // } - - // pub fn boxed(self) -> Box { - // Box::new(self) - // } - - pub fn get_file_size_to_data_ratio(&self) -> Result { - let data_bytes = (self.len() * Self::KEY_AND_VALUE_SIZE) as f64; - let file_bytes = File::open(&self.pathbuf)?.metadata()?.len() as f64; - Ok(file_bytes / data_bytes) - } - - /// Flush all puts and dels from the ram to disk with an option to defragment the database to save some disk space - /// - /// /!\ Do not kill the program while this function is runnning /!\ - pub fn export(mut self) -> Result<(), Error> { - let defragment = self.get_file_size_to_data_ratio()? 
>= DEFRAGMENT_RATIO_THRESHOLD; - - if defragment { - let mut btree = self.collect_disk(); - - let disk_len = btree.len(); - let dels_len = self.dels.len(); - let puts_len = self.puts.len(); - - let path = self.pathbuf.to_owned(); - self.dels.iter().for_each(|key| { - btree.remove(key); - }); - btree.append(&mut self.puts); - - let len = btree.len(); - - if len != self.len { - dbg!(len, self.len, path, disk_len, dels_len, puts_len); - panic!("Len should be the same"); - } - - self.destroy()?; - - self = Self::open(path).unwrap(); - - if !self.is_empty() { - panic!() - } - - self.len = len; - self.puts = btree; - } - - self.write_length()?; - - if self.dels.is_empty() && self.puts.is_empty() { - return Ok(()); - } - - mem::take(&mut self.dels) - .into_iter() - .try_for_each(|key| -> Result<(), Error> { - btree::del(&mut self.txn, &mut self.db, &key, None)?; - Ok(()) - })?; - - mem::take(&mut self.puts).into_iter().try_for_each( - |(key, value)| -> Result<(), Error> { - btree::put(&mut self.txn, &mut self.db, &key, &value)?; - Ok(()) - }, - )?; - - self.txn.set_root(ROOT_DB, self.db.db.into()); - - self.txn.commit() - } - - pub fn destroy(self) -> io::Result<()> { - let path = self.pathbuf.to_owned(); - - drop(self); - - fs::remove_dir_all(&path) - } -} - -pub trait AnyDatabase { - fn export(self) -> Result<(), Error>; - // fn boxed_export(self: Box) -> Result<(), Error>; - fn destroy(self) -> io::Result<()>; -} - -impl AnyDatabase for Database -where - Key: DatabaseKey, - Value: DatabaseValue, -{ - fn export(self) -> Result<(), Error> { - self.export() - } - - // fn boxed_export(self: Box) -> Result<(), Error> { - // self.boxed_export() - // } - - fn destroy(self) -> io::Result<()> { - self.destroy() - } -} - -pub trait DatabaseKey -where - Self: Ord + Clone + Debug + Storable + Send + Sync, -{ -} -impl DatabaseKey for T where T: Ord + Clone + Debug + Storable + Send + Sync {} - -pub trait DatabaseValue -where - Self: Clone + Storable + PartialEq + Send + Sync, -{ -} 
-impl DatabaseValue for T where T: Clone + Storable + PartialEq + Send + Sync {} +pub use base::*; +pub use multi::*; +pub use traits::*; +pub use unique::*; diff --git a/snkrj/src/main.rs b/snkrj/src/main.rs index a28e195de..055d45f79 100644 --- a/snkrj/src/main.rs +++ b/snkrj/src/main.rs @@ -1,16 +1,16 @@ -use snkrj::Database; +use snkrj::DatabaseUnique; fn main() { let path = std::env::temp_dir().join("./db"); - let database: Database = Database::open(path.clone()).unwrap(); - let _ = database.destroy(); + // let database: DatabaseUnique = DatabaseUnique::open(path.clone()).unwrap(); + // let _ = database.destroy(); - let mut database: Database = Database::open(path.clone()).unwrap(); + let mut database: DatabaseUnique = DatabaseUnique::open(path.clone()).unwrap(); database.insert(64, 128); database.export().unwrap(); - let mut database: Database = Database::open(path).unwrap(); + let mut database: DatabaseUnique = DatabaseUnique::open(path).unwrap(); database.insert(1, 2); database.insert(128, 256); println!("iter_ram:"); @@ -18,11 +18,11 @@ fn main() { println!("{:?}", pair); }); println!("iter_disk:"); - database.iter_disk().for_each(|pair| { + database.iter_disk().unwrap().for_each(|pair| { println!("{:?}", pair.unwrap()); }); println!("iter_ram_then_disk:"); - database.iter_ram_then_disk().for_each(|pair| { + database.iter_ram_then_disk().unwrap().for_each(|pair| { println!("{:?}", pair); }); database.export().unwrap(); diff --git a/snkrj/src/multi.rs b/snkrj/src/multi.rs new file mode 100644 index 000000000..24f8cbebe --- /dev/null +++ b/snkrj/src/multi.rs @@ -0,0 +1,240 @@ +// https://docs.rs/sanakirja/latest/sanakirja/index.html +// https://pijul.org/posts/2021-02-06-rethinking-sanakirja/ + +use core::panic; +use std::{ + collections::{BTreeMap, BTreeSet}, + fs, mem, + path::PathBuf, + result::Result, +}; + +use sanakirja::btree::page; +pub use sanakirja::*; + +use crate::{AnyDatabase, Base, DatabaseKey, DatabaseValue}; + +/// +/// A simple wrapper 
around Sanakirja database that acts as a very fast on disk BTreeMap. +/// +/// The state of the tree is uncommited until `.export()` is called during which it is unsafe to stop the program. +/// +pub struct DatabaseMulti +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + puts: BTreeMap>, + dels: BTreeSet, + len: usize, + db: Base, +} + +impl DatabaseMulti +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + /// Open a database without a lock file where only one instance is safe to open. + pub fn open(pathbuf: PathBuf) -> Result { + let db = Base::open(pathbuf)?; + Ok(Self { + len: db.read_length(), + puts: BTreeMap::default(), + dels: BTreeSet::default(), + db, + }) + } + + #[inline] + pub fn get(&self, key: &Key) -> Result, Error> { + if let Some(cached_put) = self.get_uncommited(key) { + return Ok(Some(cached_put)); + } + + self.db.get(key) + } + + /// Get only from the uncommited tree (ram) without checking the database (disk) + #[inline] + pub fn get_uncommited(&self, key: &Key) -> Option<&Value> { + self.puts.get(key).and_then(|v| v.first()) + } + + /// Get mut only from the uncommited tree (ram) without checking the database (disk) + #[inline] + pub fn get_mut_uncommited(&mut self, key: &Key) -> Option<&mut Value> { + self.puts.get_mut(key).and_then(|v| v.first_mut()) + } + + #[inline] + pub fn insert(&mut self, key: Key, value: Value) -> Option { + self.dels.remove(&key); + self.unchecked_insert(key, value) + } + + /// Insert without removing the key to the dels tree, so be sure that it hasn't added to the delete set + #[inline] + pub fn unchecked_insert(&mut self, key: Key, value: Value) -> Option { + self.len += 1; + self.puts.entry(key).or_default().push(value); + None + } + + #[inline] + pub fn update(&mut self, key: Key, value: Value) -> Option { + todo!() + // self.dels.insert(key.clone()); + // self.puts.insert(key, value) + } + + #[inline] + pub fn remove(&mut self, key: &Key) -> Option { + todo!() + // self.len -= 1; + // 
self.puts.remove(key).or_else(|| { + // self.dels.insert(key.clone()); + // None + // }) + } + + /// Remove only from the uncommited tree (ram) without checking the database (disk) + #[inline] + pub fn remove_from_uncommited(&mut self, key: &Key) -> Option { + todo!() + // self.len -= 1; + // self.puts.remove(key) + } + + /// Add the key only to the dels tree without checking if it's present in the puts tree, only use if you are positive that you neither added nor updated an entry with this key + #[inline] + pub fn remove_later_from_disk(&mut self, key: &Key) { + todo!() + // self.len -= 1; + // self.dels.insert(key.clone()); + } + + /// Iterate over key/value pairs from the uncommited tree (ram) + #[inline] + pub fn iter_ram(&self) -> std::collections::btree_map::Iter<'_, Key, Vec> { + self.puts.iter() + } + + /// Iterate over key/value pairs from the database (disk) + #[inline] + #[allow(clippy::type_complexity)] + pub fn iter_disk( + &self, + ) -> Result, Key, Value, page::Page>, Error> { + self.db.iter() + } + + /// Iterate over key/value pairs + // #[inline] + // pub fn iter_ram_then_disk(&self) -> Result, Error> { + // todo!(); + // // Ok(self.iter_ram().chain(self.iter_disk()?.map(|r| r.unwrap()))) + // } + + /// Collect a **clone** of all uncommited key/value pairs (ram) + pub fn collect_ram(&self) -> BTreeMap { + todo!() + // self.puts.clone() + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Flush all puts and dels from the ram to disk with an option to defragment the database to save some disk space + /// + /// /!\ Do not kill the program while this function is runnning /!\ + pub fn export(mut self) -> Result<(), Error> { + if self.dels.is_empty() && self.puts.is_empty() { + return Ok(()); + } + + if self.db.should_defragment(self.len)? 
{ + let mut btree = self.db.iter_collect_multi()?; + // TODO: + // self.dels.iter().for_each(|key| { + // btree.remove(key); + // }); + self.puts.iter().for_each(|(key, values)| { + // btree.insert(key, value); + let vec = btree.entry(key).or_default(); + vec.extend(values.iter()); + }); + + let path_self_original = self.db.path().to_owned(); + let path_self_defragmented = self.db.path_self_defragmented(); + + let len = btree.values().map(|v| v.len()).sum::(); + + if len != self.len { + dbg!(len, self.len, path_self_defragmented); + panic!("Len should be the same"); + } + + { + let mut defragmented = Self::open(path_self_defragmented.clone()).unwrap(); + + btree + .into_iter() + .try_for_each(|(key, values)| -> Result<(), Error> { + values + .into_iter() + .try_for_each(|value| -> Result<(), Error> { + defragmented.db.put(key, value)?; + Ok(()) + })?; + Ok(()) + })?; + + defragmented.len = len; + defragmented.db.commit(self.len)?; + } + + drop(self); + + fs::remove_dir_all(&path_self_original)?; + fs::rename(&path_self_defragmented, &path_self_original)?; + + Ok(()) + } else { + mem::take(&mut self.dels) + .into_iter() + .try_for_each(|key| -> Result<(), Error> { + self.db.del(&key, None)?; + Ok(()) + })?; + + mem::take(&mut self.puts).into_iter().try_for_each( + |(key, vec): (Key, Vec)| -> Result<(), Error> { + vec.into_iter().try_for_each(|value| -> Result<(), Error> { + self.db.put(&key, &value)?; + Ok(()) + }) + }, + )?; + + self.db.commit(self.len) + } + } +} + +impl AnyDatabase for DatabaseMulti +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + fn export(self) -> Result<(), Error> { + self.export() + } +} diff --git a/snkrj/src/traits.rs b/snkrj/src/traits.rs new file mode 100644 index 000000000..758b70472 --- /dev/null +++ b/snkrj/src/traits.rs @@ -0,0 +1,59 @@ +use std::fmt::Debug; + +use sanakirja::Storable; + +pub trait AnyDatabase { + fn export(self) -> Result<(), sanakirja::Error>; + // fn destroy(self) -> io::Result<()>; +} + +pub trait 
DatabaseKey +where + Self: Ord + Clone + Debug + Storable + Send + Sync, +{ + const SIZE: usize = size_of::(); + const SIZE_SMALLER_THAN_TWO: bool = Self::SIZE < 2; + + fn as_ne_byte(&self) -> u8 { + let data: *const Self = self; + let data: *const u8 = data as *const u8; + let slice = unsafe { std::slice::from_raw_parts(data, Self::SIZE) }; + + *(if cfg!(target_endian = "big") { + slice.last() + } else { + slice.first() + }) + .unwrap() + } + + fn as_ne_six_bits(&self) -> u8 { + self.as_ne_byte() >> 2 + } + + fn as_ne_two_bytes(&self) -> [u8; 2] { + let data: *const Self = self; + let data: *const u8 = data as *const u8; + let slice = unsafe { std::slice::from_raw_parts(data, Self::SIZE) }; + + if Self::SIZE_SMALLER_THAN_TWO { + panic!("Doesn't make sense") + } + + if cfg!(target_endian = "big") { + let mut iter = slice.iter().rev(); + [*iter.next().unwrap(), *iter.next().unwrap()] + } else { + let mut iter = slice.iter(); + [*iter.next().unwrap(), *iter.next().unwrap()] + } + } +} +impl DatabaseKey for T where T: Ord + Clone + Debug + Storable + Send + Sync {} + +pub trait DatabaseValue +where + Self: Clone + Storable + PartialEq + Send + Sync, +{ +} +impl DatabaseValue for T where T: Clone + Storable + PartialEq + Send + Sync {} diff --git a/snkrj/src/unique.rs b/snkrj/src/unique.rs new file mode 100644 index 000000000..f5dc6582e --- /dev/null +++ b/snkrj/src/unique.rs @@ -0,0 +1,223 @@ +// https://docs.rs/sanakirja/latest/sanakirja/index.html +// https://pijul.org/posts/2021-02-06-rethinking-sanakirja/ + +use core::panic; +use std::{ + collections::{BTreeMap, BTreeSet}, + fs, mem, + path::PathBuf, + result::Result, +}; + +use sanakirja::btree::page; +pub use sanakirja::*; + +use crate::{AnyDatabase, Base, DatabaseKey, DatabaseValue}; + +/// +/// A simple wrapper around Sanakirja database that acts as a very fast on disk BTreeMap. +/// +/// The state of the tree is uncommitted until `.export()` is called during which it is unsafe to stop the program. 
+/// +pub struct DatabaseUnique +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + puts: BTreeMap, + dels: BTreeSet, + len: usize, + db: Base, +} + +impl DatabaseUnique +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + /// Open a database without a lock file where only one instance is safe to open. + pub fn open(pathbuf: PathBuf) -> Result { + let db = Base::open(pathbuf)?; + Ok(Self { + len: db.read_length(), + puts: BTreeMap::default(), + dels: BTreeSet::default(), + db, + }) + } + + #[inline] + pub fn get(&self, key: &Key) -> Result, Error> { + if let Some(cached_put) = self.get_uncommited(key) { + return Ok(Some(cached_put)); + } + + self.db.get(key) + } + + /// Get only from the uncommited tree (ram) without checking the database (disk) + #[inline] + pub fn get_uncommited(&self, key: &Key) -> Option<&Value> { + self.puts.get(key) + } + + /// Get mut only from the uncommited tree (ram) without checking the database (disk) + #[inline] + pub fn get_mut_uncommited(&mut self, key: &Key) -> Option<&mut Value> { + self.puts.get_mut(key) + } + + #[inline] + pub fn insert(&mut self, key: Key, value: Value) -> Option { + self.dels.remove(&key); + self.unchecked_insert(key, value) + } + + /// Insert without removing the key to the dels tree, so be sure that it hasn't added to the delete set + #[inline] + pub fn unchecked_insert(&mut self, key: Key, value: Value) -> Option { + self.len += 1; + self.puts.insert(key, value) + } + + #[inline] + pub fn update(&mut self, key: Key, value: Value) -> Option { + self.dels.insert(key.clone()); + self.puts.insert(key, value) + } + + #[inline] + pub fn remove(&mut self, key: &Key) -> Option { + self.len -= 1; + self.puts.remove(key).or_else(|| { + self.dels.insert(key.clone()); + None + }) + } + + /// Remove only from the uncommited tree (ram) without checking the database (disk) + #[inline] + pub fn remove_from_uncommited(&mut self, key: &Key) -> Option { + self.len -= 1; + self.puts.remove(key) + } + + /// Add the key 
only to the dels tree without checking if it's present in the puts tree, only use if you are positive that you neither added nor updated an entry with this key + #[inline] + pub fn remove_later_from_disk(&mut self, key: &Key) { + self.len -= 1; + self.dels.insert(key.clone()); + } + + /// Iterate over key/value pairs from the uncommitted tree (ram) + #[inline] + pub fn iter_ram(&self) -> std::collections::btree_map::Iter<'_, Key, Value> { + self.puts.iter() + } + + /// Iterate over key/value pairs from the database (disk) + #[inline] + #[allow(clippy::type_complexity)] + pub fn iter_disk( + &self, + ) -> Result, Key, Value, page::Page>, Error> { + self.db.iter() + } + + /// Iterate over key/value pairs + #[inline] + pub fn iter_ram_then_disk(&self) -> Result, Error> { + Ok(self.iter_ram().chain(self.iter_disk()?.map(|r| r.unwrap()))) + } + + /// Collect a **clone** of all uncommitted key/value pairs (ram) + pub fn collect_ram(&self) -> BTreeMap { + self.puts.clone() + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Flush all puts and dels from the ram to disk with an option to defragment the database to save some disk space + /// + /// /!\ Do not kill the program while this function is running /!\ + pub fn export(mut self) -> Result<(), Error> { + if self.dels.is_empty() && self.puts.is_empty() { + return Ok(()); + } + + if self.db.should_defragment(self.len)? 
{ + let mut btree = self.db.iter_collect()?; + self.dels.iter().for_each(|key| { + btree.remove(key); + }); + self.puts.iter().for_each(|(key, value)| { + btree.insert(key, value); + }); + + let path_self_original = self.db.path().to_owned(); + let path_self_defragmented = self.db.path_self_defragmented(); + + let len = btree.len(); + + if len != self.len { + dbg!(len, self.len, path_self_defragmented); + panic!("Len should be the same"); + } + + { + let mut defragmented = Self::open(path_self_defragmented.clone()).unwrap(); + + btree + .into_iter() + .try_for_each(|(key, value)| -> Result<(), Error> { + defragmented.db.put(key, value)?; + Ok(()) + })?; + + defragmented.len = len; + defragmented.db.commit(self.len)?; + } + + drop(self); + + fs::remove_dir_all(&path_self_original)?; + fs::rename(&path_self_defragmented, &path_self_original)?; + + Ok(()) + } else { + mem::take(&mut self.dels) + .into_iter() + .try_for_each(|key| -> Result<(), Error> { + self.db.del(&key, None)?; + Ok(()) + })?; + + mem::take(&mut self.puts).into_iter().try_for_each( + |(key, value)| -> Result<(), Error> { + self.db.put(&key, &value)?; + Ok(()) + }, + )?; + + self.db.commit(self.len) + } + } +} + +impl AnyDatabase for DatabaseUnique +where + Key: DatabaseKey, + Value: DatabaseValue, +{ + fn export(self) -> Result<(), Error> { + self.export() + } +} diff --git a/storable_vec/src/lib.rs b/storable_vec/src/lib.rs index 3037b9080..69839dbd7 100644 --- a/storable_vec/src/lib.rs +++ b/storable_vec/src/lib.rs @@ -26,7 +26,6 @@ use memmap2::{Mmap, MmapOptions}; #[derive(Debug)] pub struct StorableVec { pathbuf: PathBuf, - // file_for_reads: File, file: File, cache: Vec>>, // Boxed to reduce the size of the lock (24 > 16) disk_len: usize, @@ -42,46 +41,6 @@ const MAX_PAGE_SIZE: usize = 4 * 4096; const ONE_MB: usize = 1000 * 1024; const MAX_CACHE_SIZE: usize = 100 * ONE_MB; -#[derive(Debug, Clone)] -pub enum Value<'a, T> { - Ref(&'a T), - Owned(T), -} - -impl Value<'_, T> -where - T: Sized + 
Debug + Clone, -{ - pub fn into_inner(self) -> T { - match self { - Self::Ref(t) => t.to_owned(), - Self::Owned(t) => t, - } - } -} - -impl Deref for Value<'_, T> { - type Target = T; - fn deref(&self) -> &Self::Target { - match self { - Self::Ref(t) => t, - Self::Owned(t) => t, - } - } -} - -impl AsRef for Value<'_, T> -where - T: Sized + Debug + Clone, -{ - fn as_ref(&self) -> &T { - match self { - Self::Ref(t) => t, - Self::Owned(t) => t, - } - } -} - impl StorableVec where I: Into, @@ -91,7 +50,8 @@ where pub const PER_PAGE: usize = MAX_PAGE_SIZE / Self::SIZE_OF_T; /// In bytes pub const PAGE_SIZE: usize = Self::PER_PAGE * Self::SIZE_OF_T; - pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; + pub const CACHE_LENGTH: usize = usize::MAX; + // pub const CACHE_LENGTH: usize = MAX_CACHE_SIZE / Self::PAGE_SIZE; pub fn import(path: &Path) -> Result { let file = Self::open_file(path)?; @@ -100,7 +60,6 @@ where pathbuf: path.to_owned(), disk_len: Self::byte_index_to_index(file.metadata()?.len() as usize), file, - // file_for_reads: Self::open_file(path)?, cache: vec![], pushed: vec![], // updated: BTreeMap::new(), @@ -109,9 +68,6 @@ where phantom: PhantomData, }; - this.cache.resize_with(Self::CACHE_LENGTH, Default::default); - this.cache.shrink_to_fit(); - this.reset_cache(); Ok(this) @@ -121,9 +77,14 @@ where // let len = (self.disk_len as f64 / Self::PER_PAGE as f64).ceil() as usize; // self.cache.clear(); // self.cache.resize_with(len, Default::default); + self.cache.iter_mut().for_each(|lock| { lock.take(); }); + let len = (self.disk_len as f64 / Self::PER_PAGE as f64).ceil() as usize; + self.cache + .resize_with(Self::CACHE_LENGTH.min(len), Default::default); + self.cache.shrink_to_fit(); } fn open_file(path: &Path) -> Result { @@ -164,7 +125,6 @@ where } } - #[allow(unused)] #[inline] pub fn get(&self, index: I) -> Result>> { self.get_(index.into()) @@ -191,7 +151,7 @@ where let last_index = self.disk_len - 1; let max_page_index = last_index / 
Self::PER_PAGE; let min_page_index = (max_page_index + 1) - .checked_sub(Self::CACHE_LENGTH) + .checked_sub(self.cache.len()) .unwrap_or_default(); if page_index >= min_page_index { @@ -215,7 +175,6 @@ where Ok(Some(Value::Ref(T::unsafe_try_from_slice(slice)?))) } else { - // let mut file = &self.file_for_reads; let mut file = Self::open_file(&self.pathbuf).unwrap(); file.seek(SeekFrom::Start(byte_index as u64)).unwrap(); @@ -228,6 +187,15 @@ where Ok(Some(Value::Owned(value.to_owned()))) } } + pub fn get_or_default(&self, index: I) -> Result + where + T: Default + Clone, + { + Ok(self + .get(index)? + .map(|v| (*v).clone()) + .unwrap_or(Default::default())) + } #[allow(unused)] pub fn first(&self) -> Result>> { @@ -253,7 +221,11 @@ where pub fn push_if_needed_(&mut self, index: usize, value: T) -> Result<()> { let len = self.len(); match len.cmp(&index) { - Ordering::Greater => Ok(()), + Ordering::Greater => { + // dbg!(len, index); + // panic!(); + Ok(()) + } Ordering::Equal => { self.push(value); Ok(()) @@ -325,23 +297,21 @@ where } pub fn flush(&mut self) -> io::Result<()> { - self.disk_len += self.pushed.len(); - self.reset_cache(); + if self.pushed.is_empty() { + return Ok(()); + } + + self.disk_len += self.pushed.len(); + let mut bytes: Vec = vec![]; mem::take(&mut self.pushed) .into_iter() .for_each(|v| bytes.extend_from_slice(v.unsafe_as_slice())); - // self.file.seek(SeekFrom::End(0))?; self.file.write_all(&bytes)?; - // self.file_for_mmaps.write_all(&bytes)?; - - // self.file_for_mmaps.sync_all()?; - // self.file_for_reads.sync_all()?; - // self.file_for_reads = Self::open_file(&self.pathbuf)?; Ok(()) } @@ -371,6 +341,44 @@ where } } +#[derive(Debug, Clone)] +pub enum Value<'a, T> { + Ref(&'a T), + Owned(T), +} + +impl Value<'_, T> +where + T: Sized + Debug + Clone, +{ + pub fn into_inner(self) -> T { + match self { + Self::Ref(t) => t.to_owned(), + Self::Owned(t) => t, + } + } +} +impl Deref for Value<'_, T> { + type Target = T; + fn deref(&self) -> 
&Self::Target { + match self { + Self::Ref(t) => t, + Self::Owned(t) => t, + } + } +} +impl AsRef for Value<'_, T> +where + T: Sized + Debug + Clone, +{ + fn as_ref(&self) -> &T { + match self { + Self::Ref(t) => t, + Self::Owned(t) => t, + } + } +} + pub trait UnsafeSizedSerDe where Self: Sized,