bitbase: rayon done

This commit is contained in:
nym21
2025-01-08 18:01:28 +01:00
parent 5b1735db2b
commit 1c3cb91ecd
3 changed files with 339 additions and 280 deletions

View File

@@ -1,7 +1,7 @@
use std::{collections::BTreeMap, path::Path, str::FromStr};
use std::{collections::BTreeMap, path::Path, str::FromStr, thread};
use biter::{
bitcoin::{hashes::Hash, Txid},
bitcoin::{hashes::Hash, TxIn, TxOut, Txid},
bitcoincore_rpc::{Auth, Client},
};
@@ -22,9 +22,16 @@ const DAILY_BLOCK_TARGET: usize = 144;
const MONTHLY_BLOCK_TARGET: usize = DAILY_BLOCK_TARGET * 30;
const U16MAX: usize = u16::MAX as usize;
enum TxInOrAddresstxoutindex<'a> {
TxIn(&'a TxIn),
Addresstxoutindex(Addresstxoutindex),
}
fn main() -> color_eyre::Result<()> {
let i = std::time::Instant::now();
let check_collisions = true;
let data_dir = Path::new("../../../../bitcoin");
let cookie = Path::new(data_dir).join(".cookie");
let rpc = Client::new("http://localhost:8332", Auth::CookieFile(cookie)).unwrap();
@@ -86,14 +93,16 @@ fn main() -> color_eyre::Result<()> {
}
if parts.blockhash_prefix_to_height.needs(height) {
if let Some(prev_height_slice) =
wtx.fetch_update(parts.blockhash_prefix_to_height.data(), blockhash.prefix(), |_| {
Some(Slice::from(height))
})?
{
dbg!(blockhash, Height::from(prev_height_slice));
return Err(eyre!("Collision, expect prefix to need be set yet"));
if check_collisions {
if let Some(prev_height_slice) =
wtx.get(parts.blockhash_prefix_to_height.data(), blockhash.prefix())?
{
dbg!(blockhash, Height::from(prev_height_slice));
return Err(eyre!("Collision, expect prefix to need be set yet"));
}
}
wtx.insert(parts.blockhash_prefix_to_height.data(), blockhash.prefix(),Slice::from(height));
}
if parts.height_to_blockhash.needs(height) {
@@ -111,107 +120,316 @@ fn main() -> color_eyre::Result<()> {
let txlen = block.txdata.len();
let last_txi = txlen - 1;
let mut txi_to_txid_and_prev_txindex_slice_opt = block
.txdata
.par_iter()
.enumerate()
.map(|(txi, tx)| -> color_eyre::Result<(usize, (Txid, Option<Slice>))> {
let txid = tx.compute_txid();
let (txid_to_index_join_handle, txin_or_addresstxoutindex_vec_handle, txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle) = thread::scope(|scope| {
let txid_to_index_handle = scope.spawn(|| -> color_eyre::Result<_> {
block
.txdata
.par_iter()
.enumerate()
.map(|(index, tx)| -> color_eyre::Result<_> {
let txid = tx.compute_txid();
let prev_txindex_slice_opt = wtx.get(parts.txid_prefix_to_txindex.data(), txid.prefix())?;
// Could be removed as should only trigger for two txid (duplicates)
let prev_txindex_slice_opt = wtx.get(parts.txid_prefix_to_txindex.data(), txid.prefix())?;
Ok((txi, (txid, prev_txindex_slice_opt)))
})
.try_fold(
|| -> BTreeMap<usize, (Txid, Option<Slice>)> { BTreeMap::default() },
|mut map, tuple| -> color_eyre::Result<BTreeMap<usize, (Txid, Option<Slice>)>> {
let (txi, tuple) = tuple?;
map.insert(txi, tuple);
Ok(map)
},
)
.try_reduce(BTreeMap::default, |mut map, mut map2| {
if map.len() > map2.len() {
map.append(&mut map2);
Ok(map)
} else {
map2.append(&mut map);
Ok(map2)
Ok((txid, (index, prev_txindex_slice_opt)))
})
.try_fold(
BTreeMap::new,
|mut map, tuple| {
let (key, value) = tuple?;
map.insert(key, value);
Ok(map)
},
)
.try_reduce(BTreeMap::new, |mut map, mut map2| {
if map.len() > map2.len() {
map.append(&mut map2);
Ok(map)
} else {
map2.append(&mut map);
Ok(map2)
}
})});
let txin_or_addresstxoutindex_vec_handle = scope.spawn(|| -> color_eyre::Result<Vec<TxInOrAddresstxoutindex>> {
block
.txdata
.par_iter()
.filter(|tx| !tx.is_coinbase())
.flat_map(|tx| &tx.input)
.map(|txin| -> color_eyre::Result<_> {
let outpoint = txin.previous_output;
let txid_prefix = outpoint.txid.prefix();
let vout = outpoint.vout as u16;
let txindex = if let Some(txindex) = wtx
.get(parts.txid_prefix_to_txindex.data(), txid_prefix)?
.map(Txindex::from)
{
txindex
} else {
return Ok(TxInOrAddresstxoutindex::TxIn(txin));
};
let txoutindex = Txoutindex::from((txindex, vout));
let addressindex = Addressindex::from(
wtx.get(parts.txoutindex_to_addressindex.data(), Slice::from(txoutindex))?
.context("Expect addressindex to not be none")
.inspect_err(|_| {
let height = Height::from(
wtx.get(parts.txindex_to_height.data(), Slice::from(txindex))
.expect("txindex_to_height get not fail")
.expect("Expect height for txindex"),
);
dbg!(outpoint.txid, txindex, vout, txoutindex, height);
})?,
);
Ok(TxInOrAddresstxoutindex::Addresstxoutindex(Addresstxoutindex::from((
addressindex,
txoutindex,
))))
})
.try_fold(
Vec::new,
|mut vec, addresstxoutindex| {
// There is no need to check for bad_tx as there are only 2 instances known
// Which you can find below and which are coinbase tx and thus which are already filtered
if parts.addresstxoutindexes_out.needs(height) {
vec.push(addresstxoutindex?);
}
Ok(vec)
},
)
.try_reduce(Vec::new, |mut v, mut v2| {
if v.len() > v2.len() {
v.append(&mut v2);
Ok(v)
} else {
v2.append(&mut v);
Ok(v2)
}
})
});
let txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle = scope.spawn(|| -> color_eyre::Result<BTreeMap<Txoutindex,
(&TxOut, Addresstype, color_eyre::Result<Addressbytes>, Option<Slice>)>> {
block
.txdata
.par_iter()
.enumerate()
.flat_map(|(index, tx)| {
tx.output
.par_iter()
.enumerate()
.map(move |(vout, txout)| (index, vout, txout))
})
.map(
|(index, vout, txout)| {
if vout > U16MAX {
return Err(eyre!("vout bigger than u16::MAX"));
}
let vout = vout as u16;
let txindex = txindex + index;
let txoutindex = Txoutindex::from((txindex, vout));
let script = &txout.script_pubkey;
let addresstype = Addresstype::from(script);
let addressbytes_res = Addressbytes::try_from((script, addresstype)).inspect_err(|_| {
// dbg!(&txout, height, txi, &tx.compute_txid());
});
let addressindex_slice_opt = addressbytes_res.as_ref().ok().and_then(|addressbytes| {
wtx.get(
parts.addressbytes_prefix_to_addressindex.data(),
Slice::from(addressbytes),
)
.ok()
.and_then(|s| s)
});
let is_new_address = addressindex_slice_opt.is_none();
if check_collisions && is_new_address {
if let Ok(addressbytes) = &addressbytes_res {
if let Some(prev) = wtx.get(
parts.addressbytes_prefix_to_addressindex.data(),
Slice::from(addressbytes),
)? {
dbg!(prev);
return Err(eyre!("addressbytes_prefix_to_addressindex collision, expect none"));
}
}
}
Ok((
txoutindex,
(txout, addresstype, addressbytes_res, addressindex_slice_opt),
))
},
)
.try_fold(
BTreeMap::new,
|mut map, tuple| -> color_eyre::Result<_> {
let (key, value) = tuple?;
map.insert(key, value);
Ok(map)
},
)
.try_reduce(BTreeMap::new, |mut map, mut map2| {
if map.len() > map2.len() {
map.append(&mut map2);
Ok(map)
} else {
map2.append(&mut map);
Ok(map2)
}
})
});
(txid_to_index_handle.join(), txin_or_addresstxoutindex_vec_handle.join(), txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle.join())
});
let txid_to_index = txid_to_index_join_handle.ok().context("Expect txid_to_index_join_handle to join")??;
let txin_or_addresstxoutindex_vec = txin_or_addresstxoutindex_vec_handle.ok().context("Export txin_or_addresstxoutindex_vec_handle to join")??;
let txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt = txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle.ok().context("Expect txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt_handle to join")??;
let mut new_txoutindex_to_addressindex: BTreeMap<Txoutindex, Addressindex> = BTreeMap::new();
txoutindex_to_txout_addresstype_addressbytes_res_addressindex_opt
.into_iter()
.try_for_each(|(txoutindex, (txout, addresstype, addressbytes_res, addressindex_slice_opt))| -> color_eyre::Result<()> {
let amount = Amount::from(txout.value);
let mut addressindex_local = addressindex;
if amount.is_zero() {
if parts.zero_txoutindexes.needs(height) {
wtx.insert(
parts.zero_txoutindexes.data(),
Slice::from(txoutindex),
Slice::default(),
);
}
} else if parts.txoutindex_to_amount.needs(height) {
wtx.insert(
parts.txoutindex_to_amount.data(),
Slice::from(txoutindex),
Slice::from(amount),
);
}
if let Some(addressindex_slice) = addressindex_slice_opt {
addressindex_local = addressindex_slice.into()
} else {
new_txoutindex_to_addressindex.insert(txoutindex, addressindex_local);
if parts.addressindex_to_addresstype.needs(height) {
wtx.insert(
parts.addressindex_to_addresstype.data(),
Slice::from(addressindex_local),
Slice::from(addresstype),
);
}
if let Ok(addressbytes) = addressbytes_res {
if parts.addressbytes_prefix_to_addressindex.needs(height) {
wtx.insert(
parts.addressbytes_prefix_to_addressindex.data(),
Slice::from(addressbytes.prefix()),
Slice::from(addressindex_local),
);
}
if parts.addressindex_to_addressbytes.needs(height) {
wtx.insert(
parts.addressindex_to_addressbytes.data(),
Slice::from(addressindex_local),
Slice::from(&addressbytes),
);
}
}
addressindex.increment();
}
if parts.txoutindex_to_addressindex.needs(height) {
wtx.insert(
parts.txoutindex_to_addressindex.data(),
Slice::from(txoutindex),
Slice::from(addressindex_local),
);
}
let addresstxoutindex = Addresstxoutindex::from((addressindex_local, txoutindex));
if parts.addresstxoutindexes_in.needs(height) {
wtx.insert(
parts.addresstxoutindexes_in.data(),
Slice::from(addresstxoutindex),
Slice::default(),
);
}
Ok(())
})?;
// let addresstxoutindexes_out = block
// .txdata
// .par_iter()
// .filter(|tx| !tx.is_coinbase())
// .flat_map(|tx| &tx.input)
// .try_fold(
// || -> Vec<Addresstxoutindex> { vec![] },
// |mut vec, txin| -> color_eyre::Result<Vec<Addresstxoutindex>> {
// let outpoint = txin.previous_output;
// let txid_prefix = outpoint.txid.prefix();
// let vout = outpoint.vout as u16;
// let txindex = Txindex::from(
// wtx.get(parts.txid_prefix_to_txindex.data(), txid_prefix)?
// .context("Expect txid to be saved")?,
// );
// let txoutindex = Txoutindex::from((txindex, vout));
// let addressindex = Addressindex::from(
// wtx.get(parts.txoutindex_to_addressindex.data(), Slice::from(txoutindex))?
// .context("Expect addressindex to not be none")
// .inspect_err(|_| {
// let height = Height::from(
// wtx.get(parts.txindex_to_height.data(), Slice::from(txindex))
// .expect("txindex_to_height get not fail")
// .expect("Expect height for txindex"),
// );
// dbg!(outpoint.txid, txindex, vout, txoutindex, height);
// })?,
// );
// if parts.addresstxoutindexes_out.needs(height) {
// vec.push(Addresstxoutindex::from((addressindex, txoutindex)));
// }
// Ok(vec)
// },
// )
// .try_reduce(Vec::new, |mut v, mut v2| {
// if v.len() > v2.len() {
// v.append(&mut v2);
// Ok(v)
// } else {
// v2.append(&mut v);
// Ok(v2)
// }
// })?;
// addresstxoutindexes_out.into_iter().for_each(|addresstxoutindex| {
// wtx.insert(
// parts.addresstxoutindexes_out.data(),
// Slice::from(addresstxoutindex),
// Slice::default(),
// );
// });
block
.txdata
if parts.addresstxoutindexes_out.needs(height) {
txin_or_addresstxoutindex_vec
.into_iter()
.enumerate()
.try_for_each(|(txi, tx)| -> color_eyre::Result<()> {
let is_coinbase = tx.is_coinbase();
.map(|txin_or_addresstxoutindex| -> color_eyre::Result<Addresstxoutindex> {
match txin_or_addresstxoutindex {
TxInOrAddresstxoutindex::Addresstxoutindex(addresstxoutindex) => Ok(addresstxoutindex),
TxInOrAddresstxoutindex::TxIn(txin) => {
let outpoint = txin.previous_output;
let txid = outpoint.txid;
let vout = outpoint.vout as u16;
let index = txid_to_index
.get(&txid)
.context("txid should be in same block")?.0;
let txindex = txindex + index;
let txoutindex = Txoutindex::from((txindex, vout));
let addressindex = new_txoutindex_to_addressindex
.remove(&txoutindex)
.context("should have found addressindex from same block").inspect_err(|_| {
dbg!(txin, txoutindex);
})?;
if txi == 0 && parts.height_to_first_txindex.needs(height) {
Ok(Addresstxoutindex::from((addressindex, txoutindex)))
}
}
})
.try_for_each(|addresstxoutindex| -> color_eyre::Result<()> {
wtx.insert(
parts.addresstxoutindexes_out.data(),
Slice::from(addresstxoutindex?),
Slice::default(),
);
Ok(())
})?;
}
txid_to_index.into_iter().try_for_each(
|(txid, (index, prev_txindex_slice_opt))| -> color_eyre::Result<()> {
let txindex = txindex + index;
if index == 0 && parts.height_to_first_txindex.needs(height) {
wtx.insert(
parts.height_to_first_txindex.data(),
Slice::from(height),
Slice::from(txindex),
);
}
if txi == last_txi && parts.height_to_last_txindex.needs(height) {
if index == last_txi && parts.height_to_last_txindex.needs(height) {
wtx.insert(
parts.height_to_last_txindex.data(),
Slice::from(height),
@@ -219,15 +437,6 @@ fn main() -> color_eyre::Result<()> {
);
}
let mut bad_tx = false;
let (txid, prev_txindex_slice_opt) = txi_to_txid_and_prev_txindex_slice_opt
.remove(&txi)
.context("Par compute of tx should have worked")
.inspect_err(|_| {
dbg!(&txi_to_txid_and_prev_txindex_slice_opt, &txi, tx.compute_txid());
})?;
if parts.txindex_to_txid.needs(height) {
wtx.insert(parts.txindex_to_txid.data(), Slice::from(txindex), txid);
}
@@ -239,18 +448,20 @@ fn main() -> color_eyre::Result<()> {
}
}
Some(prev_txindex_slice) => {
// Ok if `get` is not par as should happen only twice
let prev_txid = Txid::from_slice(
&wtx.get(parts.txindex_to_txid.data(), &prev_txindex_slice)?
.expect("To have txid for txindex"),
)?;
// If another Txid needs to be added to the list
// We need to check that it's also a coinbase tx otherwise par_iter inputs needs to be updated
let only_known_dup_txids = [
Txid::from_str("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")?,
Txid::from_str("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")?,
];
let is_dup = only_known_dup_txids.contains(&prev_txid);
bad_tx = is_dup;
if !is_dup {
let prev_height = Height::from(
@@ -272,174 +483,9 @@ fn main() -> color_eyre::Result<()> {
);
}
tx.input
.par_iter()
// .into_par_iter()
.try_fold(
|| -> Vec<Addresstxoutindex> { vec![] },
|mut vec, txin| -> color_eyre::Result<Vec<Addresstxoutindex>> {
if is_coinbase {
return Ok(vec);
}
let outpoint = txin.previous_output;
let txid_prefix = outpoint.txid.prefix();
let vout = outpoint.vout as u16;
let txindex = Txindex::from(
wtx.get(parts.txid_prefix_to_txindex.data(), txid_prefix)?
.context("Expect txid to be saved")?,
);
let txoutindex = Txoutindex::from((txindex, vout));
let addressindex = Addressindex::from(
wtx.get(parts.txoutindex_to_addressindex.data(), Slice::from(txoutindex))?
.context("Expect addressindex to not be none")
.inspect_err(|_| {
let height = Height::from(
wtx.get(parts.txindex_to_height.data(), Slice::from(txindex))
.expect("txindex_to_height get not fail")
.expect("Expect height for txindex"),
);
dbg!(txid, outpoint.txid, txindex, vout, txoutindex, height);
})?,
);
if bad_tx {
dbg!(tx.compute_txid(), outpoint);
panic!("bad tx in input")
}
if !bad_tx && parts.addresstxoutindexes_out.needs(height) {
vec.push(Addresstxoutindex::from((addressindex, txoutindex)));
}
Ok(vec)
},
)
.try_reduce(Vec::new, |mut v, mut v2| {
if v.len() > v2.len() {
v.append(&mut v2);
Ok(v)
} else {
v2.append(&mut v);
Ok(v2)
}
})?
.into_iter()
.for_each(|addresstxoutindex| {
wtx.insert(
parts.addresstxoutindexes_out.data(),
Slice::from(addresstxoutindex),
Slice::default(),
);
});
tx.output
.into_iter()
.enumerate()
.try_for_each(|(vout, txout)| -> color_eyre::Result<()> {
if vout > U16MAX {
return Err(eyre!("vout bigger than u16::MAX"));
}
let vout = vout as u16;
let txoutindex = Txoutindex::from((txindex, vout));
let amount = Amount::from(txout.value);
if amount.is_zero() {
if parts.zero_txoutindexes.needs(height) {
wtx.insert(
parts.zero_txoutindexes.data(),
Slice::from(txoutindex),
Slice::default(),
);
}
} else if parts.txoutindex_to_amount.needs(height) {
wtx.insert(
parts.txoutindex_to_amount.data(),
Slice::from(txoutindex),
Slice::from(amount),
);
}
let script = &txout.script_pubkey;
let mut addressindex_local = addressindex;
let addresstype = Addresstype::from(script);
let addressbytes = Addressbytes::try_from((script, addresstype)).inspect_err(|_| {
// dbg!(&txout, height, txi, &tx.compute_txid());
});
if let Some(addressindex_slice) = addressbytes.as_ref().ok().and_then(|addressbytes| {
wtx.get(
parts.addressbytes_prefix_to_addressindex.data(),
Slice::from(addressbytes),
)
.ok()
.and_then(|s| s)
}) {
addressindex_local = addressindex_slice.into()
} else {
if parts.addressindex_to_addresstype.needs(height) {
wtx.insert(
parts.addressindex_to_addresstype.data(),
Slice::from(addressindex_local),
Slice::from(addresstype),
);
}
if let Ok(addressbytes) = addressbytes {
if parts.addressbytes_prefix_to_addressindex.needs(height) {
if let Some(prev) = wtx.fetch_update(
parts.addressbytes_prefix_to_addressindex.data(),
Slice::from(&addressbytes),
|_| Some(Slice::from(addressindex_local)),
)? {
dbg!(prev);
return Err(eyre!("Expect none"));
}
}
if parts.addressindex_to_addressbytes.needs(height) {
wtx.insert(
parts.addressindex_to_addressbytes.data(),
Slice::from(addressindex_local),
Slice::from(&addressbytes),
);
}
}
addressindex.increment();
}
if parts.txoutindex_to_addressindex.needs(height) {
wtx.insert(
parts.txoutindex_to_addressindex.data(),
Slice::from(txoutindex),
Slice::from(addressindex_local),
);
}
let addresstxoutindex = Addresstxoutindex::from((addressindex_local, txoutindex));
if !bad_tx && parts.addresstxoutindexes_in.needs(height) {
wtx.insert(
parts.addresstxoutindexes_in.data(),
Slice::from(addresstxoutindex),
Slice::default(),
);
}
Ok(())
})?;
txindex.increment();
Ok(())
})?;
},
)?;
if parts.height_to_last_addressindex.needs(height) {
wtx.insert(
@@ -457,10 +503,12 @@ fn main() -> color_eyre::Result<()> {
wtx_opt.replace(wtx);
}
txindex += txlen;
Ok(())
})?;
let wtx = wtx_opt.take().context("option should have WriteTransaction")?;
let wtx = wtx_opt.take().context("option should have wtx")?;
export(&keyspace, wtx, &parts, height)?;
dbg!(i.elapsed());

View File

@@ -5,7 +5,7 @@ use fjall::Slice;
use super::Addresstype;
#[derive(Debug, Deref, DerefMut)]
#[derive(Debug, Deref, DerefMut, PartialEq, Eq, PartialOrd, Ord)]
pub struct Addressbytes(Slice);
impl TryFrom<(&ScriptBuf, Addresstype)> for Addressbytes {

View File

@@ -1,3 +1,5 @@
use std::ops::{Add, AddAssign};
use derive_deref::{Deref, DerefMut};
use fjall::Slice;
@@ -9,10 +11,6 @@ pub struct Txindex(u32);
impl Txindex {
pub const BYTES: usize = size_of::<Self>();
pub fn increment(&mut self) {
self.0 += 1;
}
pub fn incremented(self) -> Self {
Self(*self + 1)
}
@@ -45,3 +43,16 @@ impl From<Txindex> for Slice {
value.to_be_bytes().into()
}
}
impl Add<usize> for Txindex {
type Output = Self;
fn add(self, rhs: usize) -> Self::Output {
Self(self.0 + rhs as u32)
}
}
impl AddAssign<usize> for Txindex {
fn add_assign(&mut self, rhs: usize) {
self.0 += rhs as u32
}
}