bindex: snapshot

This commit is contained in:
nym21
2025-01-23 11:11:39 +01:00
parent 1296a2e9ec
commit d629ae8fbb
64 changed files with 334 additions and 3844 deletions
+46
View File
@@ -0,0 +1,46 @@
use std::{
collections::BTreeMap,
fs,
path::{Path, PathBuf},
};
use derived_deref::{Deref, DerefMut};
const BLK: &str = "blk";
const DAT: &str = ".dat";
#[derive(Debug, Deref, DerefMut)]
pub struct BlkIndexToBlkPath(BTreeMap<usize, PathBuf>);
impl BlkIndexToBlkPath {
pub fn scan(data_dir: &Path) -> Self {
let blocks_dir = data_dir.join("blocks");
Self(
fs::read_dir(blocks_dir)
.unwrap()
.map(|entry| entry.unwrap().path())
.filter(|path| {
let is_file = path.is_file();
if is_file {
let file_name = path.file_name().unwrap().to_str().unwrap();
file_name.starts_with(BLK) && file_name.ends_with(DAT)
} else {
false
}
})
.map(|path| {
let file_name = path.file_name().unwrap().to_str().unwrap();
let blk_index = file_name[BLK.len()..(file_name.len() - DAT.len())]
.parse::<usize>()
.unwrap();
(blk_index, path)
})
.collect::<BTreeMap<_, _>>(),
)
}
}
+127
View File
@@ -0,0 +1,127 @@
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
fs::{self, File},
io::{BufReader, BufWriter},
path::{Path, PathBuf},
};
use derived_deref::{Deref, DerefMut};
use crate::{blk_recap::BlkRecap, BlkIndexToBlkPath, BlkMetadataAndBlock};
const TARGET_BLOCKS_PER_MONTH: usize = 144 * 30;
#[derive(Deref, DerefMut, Debug)]
pub struct BlkIndexToBlkRecap {
path: PathBuf,
#[target]
tree: BTreeMap<usize, BlkRecap>,
last_safe_height: Option<usize>,
}
impl BlkIndexToBlkRecap {
pub fn import(blocks_dir: &BlkIndexToBlkPath, data_dir: &Path) -> Self {
let path = data_dir.join("blk_index_to_blk_recap.json");
let tree = {
fs::create_dir_all(data_dir).unwrap();
if let Ok(file) = File::open(&path) {
let reader = BufReader::new(file);
serde_json::from_reader(reader).unwrap_or_default()
} else {
BTreeMap::default()
}
};
let mut this = Self {
path,
tree,
last_safe_height: None,
};
this.clean_outdated(blocks_dir);
this
}
pub fn clean_outdated(&mut self, blocks_dir: &BlkIndexToBlkPath) {
let mut unprocessed_keys = self.keys().copied().collect::<BTreeSet<_>>();
blocks_dir.iter().for_each(|(blk_index, blk_path)| {
unprocessed_keys.remove(blk_index);
if let Some(blk_recap) = self.get(blk_index) {
if blk_recap.has_different_modified_time(blk_path) {
self.remove(blk_index);
}
}
});
unprocessed_keys.into_iter().for_each(|blk_index| {
self.remove(&blk_index);
});
self.last_safe_height = self.iter().map(|(_, recap)| recap.height()).max();
}
pub fn get_start_recap(&self, start: Option<usize>) -> Option<(usize, BlkRecap)> {
if let Some(start) = start {
let (last_key, last_value) = self.last_key_value()?;
if last_value.height() < start {
return Some((*last_key, *last_value));
} else if let Some((blk_index, _)) = self
.iter()
.find(|(_, blk_recap)| blk_recap.is_younger_than(start))
{
if *blk_index != 0 {
let blk_index = *blk_index - 1;
return Some((blk_index, *self.get(&blk_index).unwrap()));
}
}
}
None
}
pub fn update(&mut self, blk_metadata_and_block: &BlkMetadataAndBlock, height: usize) {
let blk_index = blk_metadata_and_block.blk_metadata.index;
if let Some(last_entry) = self.last_entry() {
match last_entry.key().cmp(&blk_index) {
Ordering::Greater => {
last_entry.remove_entry();
}
Ordering::Less => {
self.insert(blk_index, BlkRecap::from(height, blk_metadata_and_block));
}
Ordering::Equal => {}
};
} else {
if blk_index != 0 || height != 0 {
// dbg!(blk_index, height);
unreachable!();
}
self.insert(blk_index, BlkRecap::first(blk_metadata_and_block));
}
if self
.last_safe_height
.map_or(true, |safe_height| height >= safe_height)
&& (height % TARGET_BLOCKS_PER_MONTH) == 0
{
self.export();
}
}
pub fn export(&self) {
let file = File::create(&self.path).unwrap_or_else(|_| {
dbg!(&self.path);
panic!("No such file or directory")
});
serde_json::to_writer_pretty(&mut BufWriter::new(file), &self.tree).unwrap();
}
}
+18
View File
@@ -0,0 +1,18 @@
use std::path::PathBuf;
use crate::path_to_modified_time;
#[derive(Clone, Copy)]
pub struct BlkMetadata {
pub index: usize,
pub modified_time: u64,
}
impl BlkMetadata {
pub fn new(index: usize, path: &PathBuf) -> Self {
Self {
index,
modified_time: path_to_modified_time(path),
}
}
}
+17
View File
@@ -0,0 +1,17 @@
use bitcoin::Block;
use crate::BlkMetadata;
pub struct BlkMetadataAndBlock {
pub blk_metadata: BlkMetadata,
pub block: Block,
}
impl BlkMetadataAndBlock {
pub fn new(blk_metadata: BlkMetadata, block: Block) -> Self {
Self {
blk_metadata,
block,
}
}
}
+47
View File
@@ -0,0 +1,47 @@
use std::path::PathBuf;
use bitcoin::{hashes::Hash, BlockHash};
use serde::{Deserialize, Serialize};
use crate::{path_to_modified_time, BlkMetadataAndBlock};
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct BlkRecap {
min_continuous_height: usize,
min_continuous_prev_hash: BlockHash,
modified_time: u64,
}
impl BlkRecap {
pub fn first(blk_metadata_and_block: &BlkMetadataAndBlock) -> Self {
Self {
min_continuous_height: 0,
min_continuous_prev_hash: BlockHash::all_zeros(),
modified_time: blk_metadata_and_block.blk_metadata.modified_time,
}
}
pub fn from(height: usize, blk_metadata_and_block: &BlkMetadataAndBlock) -> Self {
Self {
min_continuous_height: height,
min_continuous_prev_hash: blk_metadata_and_block.block.header.prev_blockhash,
modified_time: blk_metadata_and_block.blk_metadata.modified_time,
}
}
pub fn has_different_modified_time(&self, blk_path: &PathBuf) -> bool {
self.modified_time != path_to_modified_time(blk_path)
}
pub fn is_younger_than(&self, height: usize) -> bool {
self.min_continuous_height > height
}
pub fn height(&self) -> usize {
self.min_continuous_height
}
pub fn prev_hash(&self) -> &BlockHash {
&self.min_continuous_prev_hash
}
}
+385
View File
@@ -0,0 +1,385 @@
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
fs::{self},
ops::ControlFlow,
path::Path,
thread,
};
use bitcoin::{
consensus::{Decodable, ReadExt},
hashes::Hash,
io::{Cursor, Read},
Block, BlockHash,
};
use bitcoincore_rpc::RpcApi;
use blk_index_to_blk_path::*;
use crossbeam::channel::{bounded, Receiver};
use rayon::prelude::*;
pub use bitcoin;
pub use bitcoincore_rpc;
mod blk_index_to_blk_path;
mod blk_index_to_blk_recap;
mod blk_metadata;
mod blk_metadata_and_block;
mod blk_recap;
mod utils;
use blk_index_to_blk_recap::*;
use blk_metadata::*;
use blk_metadata_and_block::*;
use utils::*;
pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 100;
const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217];
const BOUND_CAP: usize = 210;
enum BlockState {
Raw(Vec<u8>),
Decoded(Block),
}
///
/// Returns a crossbeam channel receiver that receives `(usize, Block, BlockHash)` tuples (with `usize` being the height) in sequential order.
///
/// # Arguments
///
/// * `data_dir` - Path to the Bitcoin data directory
/// * `start` - Inclusive starting height of the blocks received, `None` for 0
/// * `end` - Inclusive ending height of the blocks received, `None` for the last one
/// * `rpc` - RPC client to filter out forks
///
/// # Example
///
/// ```rust
/// use std::path::Path;
///
/// use bitcoincore_rpc::{Auth, Client};
///
/// let i = std::time::Instant::now();
///
/// let data_dir = Path::new("../../bitcoin");
/// let url = "http://localhost:8332";
/// let cookie = Path::new(data_dir).join(".cookie");
/// let auth = Auth::CookieFile(cookie);
/// let rpc = Client::new(url, auth).unwrap();
///
/// let start = Some(850_000);
/// let end = None;
///
/// biter::new(data_dir, start, end, rpc)
/// .iter()
/// .for_each(|(height, _block, hash)| {
/// println!("{height}: {hash}");
/// });
///
/// dbg!(i.elapsed());
/// ```
///
pub fn new(
data_dir: &Path,
start: Option<usize>,
end: Option<usize>,
rpc: bitcoincore_rpc::Client,
) -> Receiver<(usize, Block, BlockHash)> {
let (send_block_reader, recv_block_reader) = bounded(BOUND_CAP);
let (send_block, recv_block) = bounded(BOUND_CAP);
let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP);
let blk_index_to_blk_path = BlkIndexToBlkPath::scan(data_dir);
let mut blk_index_to_blk_recap = BlkIndexToBlkRecap::import(&blk_index_to_blk_path, data_dir);
let start_recap = blk_index_to_blk_recap.get_start_recap(start);
let starting_blk_index = start_recap.as_ref().map_or(0, |(index, _)| *index);
thread::spawn(move || {
blk_index_to_blk_path
.iter()
.filter(|(blk_index, _)| blk_index >= &&starting_blk_index)
.try_for_each(move |(blk_index, blk_path)| {
let blk_metadata = BlkMetadata::new(*blk_index, blk_path);
let blk_bytes = fs::read(blk_path).unwrap();
let blk_bytes_len = blk_bytes.len() as u64;
let mut cursor = Cursor::new(blk_bytes.as_slice());
let mut current_4bytes = [0; 4];
'parent: loop {
if cursor.position() == blk_bytes_len {
break;
}
// Read until we find a valid suite of MAGIC_BYTES
loop {
current_4bytes.rotate_left(1);
if let Ok(byte) = cursor.read_u8() {
current_4bytes[3] = byte;
} else {
break 'parent;
}
if current_4bytes == MAGIC_BYTES {
break;
}
}
let block_size = cursor.read_u32().unwrap();
let mut raw_block = vec![0u8; block_size as usize];
cursor.read_exact(&mut raw_block).unwrap();
if send_block_reader
.send((blk_metadata, BlockState::Raw(raw_block)))
.is_err()
{
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
})
});
// thread::spawn(move || {
// recv_block_reader.iter().par_bridge().try_for_each(
// move |(blk_metadata, mut block_state)| {
// let raw_block = match block_state {
// BlockState::Raw(vec) => vec,
// _ => unreachable!(),
// };
// let mut cursor = Cursor::new(raw_block);
// block_state = BlockState::Decoded(Block::consensus_decode(&mut cursor).unwrap());
// if send_block
// .send(BlkMetadataAndBlock::new(
// blk_metadata,
// match block_state {
// BlockState::Decoded(block) => block,
// _ => unreachable!(),
// },
// ))
// .is_err()
// {
// return ControlFlow::Break(());
// }
// ControlFlow::Continue(())
// },
// );
// });
// Can't use the previous code because .send() blocks all the threads if full
// And other .par_iter() are also stuck because of that
thread::spawn(move || {
let mut bulk = vec![];
let drain_and_send = |bulk: &mut Vec<_>| {
// Using a vec and sending after to not end up with stuck threads in par iter
bulk.par_iter_mut().for_each(|(_, block_state)| {
let raw_block = match block_state {
BlockState::Raw(vec) => vec,
_ => unreachable!(),
};
let mut cursor = Cursor::new(raw_block);
*block_state = BlockState::Decoded(Block::consensus_decode(&mut cursor).unwrap());
});
bulk.drain(..).try_for_each(|(blk_metadata, block_state)| {
let block = match block_state {
BlockState::Decoded(block) => block,
_ => unreachable!(),
};
if send_block
.send(BlkMetadataAndBlock::new(blk_metadata, block))
.is_err()
{
return ControlFlow::Break(());
}
ControlFlow::Continue(())
})
};
recv_block_reader.iter().try_for_each(|tuple| {
bulk.push(tuple);
if bulk.len() < BOUND_CAP / 2 {
return ControlFlow::Continue(());
}
drain_and_send(&mut bulk)
});
drain_and_send(&mut bulk)
});
// Tokio version: 1022s
// Slighlty slower than rayon version
// thread::spawn(move || {
// let rt = tokio::runtime::Runtime::new().unwrap();
// let _guard = rt.enter();
// let mut tasks = VecDeque::with_capacity(BOUND);
// recv_block_reader
// .iter()
// .try_for_each(move |(blk_metadata, block_state)| {
// let raw_block = match block_state {
// BlockState::Raw(vec) => vec,
// _ => unreachable!(),
// };
// tasks.push_back(tokio::task::spawn(async move {
// let block = Block::consensus_decode(&mut Cursor::new(raw_block)).unwrap();
// (blk_metadata, block)
// }));
// while tasks.len() > BOUND {
// let (blk_metadata, block) = rt.block_on(tasks.pop_front().unwrap()).unwrap();
// if send_block
// .send(BlkMetadataAndBlock::new(blk_metadata, block))
// .is_err()
// {
// return ControlFlow::Break(());
// }
// }
// ControlFlow::Continue(())
// });
//
// todo!("Send the rest")
// });
thread::spawn(move || {
let mut height = start_recap.map_or(0, |(_, recap)| recap.height());
let mut future_blocks = BTreeMap::default();
let mut recent_chain: VecDeque<(BlockHash, BlkMetadataAndBlock)> = VecDeque::default();
let mut recent_hashes: BTreeSet<BlockHash> = BTreeSet::default();
let mut prev_hash =
start_recap.map_or_else(BlockHash::all_zeros, |(_, recap)| *recap.prev_hash());
let mut prepare_and_send = |(hash, tuple): (BlockHash, BlkMetadataAndBlock)| {
blk_index_to_blk_recap.update(&tuple, height);
if start.map_or(true, |start| start <= height) {
send_height_block_hash
.send((height, tuple.block, hash))
.unwrap();
}
if end == Some(height) {
return ControlFlow::Break(());
}
height += 1;
ControlFlow::Continue(())
};
let mut update_tip = |prev_hash: &mut BlockHash,
recent_hashes: &mut BTreeSet<BlockHash>,
recent_chain: &mut VecDeque<(BlockHash, BlkMetadataAndBlock)>,
future_blocks: &mut BTreeMap<BlockHash, BlkMetadataAndBlock>,
tuple: BlkMetadataAndBlock| {
let mut tuple = Some(tuple);
while let Some(tuple) = tuple.take().or_else(|| future_blocks.remove(prev_hash)) {
let hash = tuple.block.block_hash();
*prev_hash = hash;
recent_hashes.insert(hash);
recent_chain.push_back((hash, tuple));
}
while recent_chain.len() > NUMBER_OF_UNSAFE_BLOCKS {
let (hash, tuple) = recent_chain.pop_front().unwrap();
recent_hashes.remove(&hash);
if prepare_and_send((hash, tuple)).is_break() {
return ControlFlow::Break(());
}
}
ControlFlow::Continue(())
};
let flow = recv_block.iter().try_for_each(|tuple| {
// block isn't next after current tip
if prev_hash != tuple.block.header.prev_blockhash {
let is_block_active =
|hash| rpc.get_block_header_info(hash).unwrap().confirmations > 0;
// block prev has already been processed
if recent_hashes.contains(&tuple.block.header.prev_blockhash) {
let hash = tuple.block.block_hash();
if is_block_active(&hash) {
let prev_index = recent_chain
.iter()
.position(|(hash, ..)| hash == &tuple.block.header.prev_blockhash)
.unwrap();
let bad_index_start = prev_index + 1;
recent_chain.drain(bad_index_start..).for_each(|(hash, _)| {
recent_hashes.remove(&hash);
});
return update_tip(
&mut prev_hash,
&mut recent_hashes,
&mut recent_chain,
&mut future_blocks,
tuple,
);
}
// Check if there was already a future block with the same prev hash
} else if let Some(prev_tuple) =
future_blocks.insert(tuple.block.header.prev_blockhash, tuple)
{
// If the previous was the active one
if is_block_active(&prev_tuple.block.block_hash()) {
// Rollback the insert
future_blocks.insert(prev_tuple.block.header.prev_blockhash, prev_tuple);
}
}
} else {
return update_tip(
&mut prev_hash,
&mut recent_hashes,
&mut recent_chain,
&mut future_blocks,
tuple,
);
}
ControlFlow::Continue(())
});
if flow.is_continue() {
// Send the last (up to 100) blocks
recent_chain.into_iter().try_for_each(prepare_and_send);
}
blk_index_to_blk_recap.export();
});
recv_height_block_hash
}
+24
View File
@@ -0,0 +1,24 @@
use std::path::Path;
use bitcoincore_rpc::{Auth, Client};
fn main() {
let i = std::time::Instant::now();
let data_dir = Path::new("../../../bitcoin");
let url = "http://localhost:8332";
let cookie = Path::new(data_dir).join(".cookie");
let auth = Auth::CookieFile(cookie);
let rpc = Client::new(url, auth).unwrap();
let start = Some(810078);
let end = None;
biter::new(data_dir, start, end, rpc)
.iter()
.for_each(|(height, _block, hash)| {
println!("{height}: {hash}");
});
dbg!(i.elapsed());
}
+11
View File
@@ -0,0 +1,11 @@
use std::{fs, path::PathBuf, time::UNIX_EPOCH};
pub fn path_to_modified_time(path: &PathBuf) -> u64 {
fs::metadata(path)
.unwrap()
.modified()
.unwrap()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}