mirror of
https://github.com/bitcoinresearchkit/brk.git
synced 2026-06-30 22:09:00 -07:00
bitview: reorg part 7 + fix hanging ?
This commit is contained in:
@@ -16,7 +16,7 @@ use bitcoincore_rpc::RpcApi;
|
||||
use blk_index_to_blk_path::*;
|
||||
use brk_error::Result;
|
||||
use brk_structs::{BlkMetadata, BlkPosition, Block, Height, ParsedBlock};
|
||||
use crossbeam::channel::{bounded, Receiver};
|
||||
use crossbeam::channel::{Receiver, bounded};
|
||||
use parking_lot::{RwLock, RwLockReadGuard};
|
||||
use rayon::prelude::*;
|
||||
|
||||
@@ -157,30 +157,36 @@ impl Parser {
|
||||
|
||||
let mut bulk = vec![];
|
||||
|
||||
let drain_and_send = |bulk: &mut Vec<(BlkMetadata, AnyBlock, XORIndex)>| {
|
||||
// Using a vec and sending after to not end up with stuck threads in par iter
|
||||
mem::take(bulk)
|
||||
.into_par_iter()
|
||||
.try_for_each(|(metdata, any_block, xor_i)| {
|
||||
if let Ok(AnyBlock::Decoded(block)) =
|
||||
any_block.decode(metdata, rpc, xor_i, xor_bytes, start, end)
|
||||
&& send_block.send(block).is_err()
|
||||
{
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
// Private pool to prevent collision with the global pool
|
||||
// Without it there can be hanging
|
||||
let parser_pool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(thread::available_parallelism().unwrap().get() / 2)
|
||||
.build()
|
||||
.expect("Failed to create parser thread pool");
|
||||
|
||||
ControlFlow::Continue(())
|
||||
})
|
||||
let drain_and_send = |bulk: &mut Vec<(BlkMetadata, AnyBlock, XORIndex)>| {
|
||||
parser_pool.install(|| {
|
||||
mem::take(bulk)
|
||||
.into_par_iter()
|
||||
.try_for_each(|(metdata, any_block, xor_i)| {
|
||||
if let Ok(AnyBlock::Decoded(block)) =
|
||||
any_block.decode(metdata, rpc, xor_i, xor_bytes, start, end)
|
||||
&& send_block.send(block).is_err()
|
||||
{
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
ControlFlow::Continue(())
|
||||
})
|
||||
})
|
||||
};
|
||||
|
||||
recv_bytes.iter().try_for_each(|tuple| {
|
||||
bulk.push(tuple);
|
||||
|
||||
if bulk.len() < BOUND_CAP / 2 {
|
||||
if bulk.len() < BOUND_CAP {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
// Sending in bulk to not lock threads in standby
|
||||
drain_and_send(&mut bulk)
|
||||
})?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user