#![doc = include_str!("../README.md")]

use std::{fs, path::Path, thread, time::Instant};

use brk_error::Result;
use brk_indexer::Indexer;
use brk_reader::Reader;
use brk_traversable::Traversable;
use brk_types::Version;
use tracing::info;
use vecdb::{Exit, Ro, Rw, StorageMode};

// Each module below (except `internal`) contributes one `Vecs` value that is
// stored in a field of `Computer` under the same name.
mod blocks;
mod cointime;
mod constants;
mod distribution;
pub mod indexes;
mod inputs;
mod internal;
mod market;
mod mining;
mod outputs;
mod pools;
mod positions;
pub mod prices;
mod scripts;
mod supply;
mod transactions;

/// Aggregate of every computed dataset, one boxed value per module.
///
/// Each field is filled by [`Computer::forced_import`] and updated by
/// [`Computer::compute`].
// NOTE(review): the generic arguments in this copy of the file look stripped
// (`Box>`, bare `Box`, bare `Result`, bare `impl Iterator`, `timed` using `T`
// without declaring it). Restore them from version control before building;
// they are deliberately left untouched here rather than guessed.
#[derive(Traversable)]
pub struct Computer {
    pub blocks: Box>,
    pub mining: Box>,
    pub transactions: Box>,
    pub scripts: Box>,
    pub positions: Box>,
    pub cointime: Box>,
    // Built with `constants::Vecs::new`, which takes no path (see `forced_import`).
    pub constants: Box,
    pub indexes: Box>,
    pub market: Box>,
    pub pools: Box>,
    pub prices: Box>,
    pub distribution: Box>,
    pub supply: Box>,
    pub inputs: Box>,
    pub outputs: Box>,
}

/// Version value handed to every module constructor in `forced_import`.
const VERSION: Version = Version::new(5);

impl Computer {
    /// Imports every module's `Vecs` from `<outputs_path>/computed` and
    /// assembles them into a `Computer`.
    ///
    /// The import runs in stages; each stage is wrapped in [`timed`] so its
    /// duration is logged:
    ///
    /// 1. `indexes` (calling thread) alongside `positions` (spawned thread).
    /// 2. `inputs` and `outputs`, each on its own spawned thread.
    /// 3. `constants`, then `prices`, sequentially.
    /// 4. `blocks`, `mining`, `transactions` and `scripts` on spawned threads
    ///    while `cointime` is imported on the calling thread; `pools` is
    ///    imported on the calling thread once those four have been joined.
    /// 5. `distribution`, then `supply` (which borrows `distribution`), then
    ///    `market`, sequentially.
    ///
    /// After all stages succeed, [`Self::retain_databases`] is called on the
    /// `computed` directory.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any module import, by spawning a
    /// thread, or by `retain_databases`.
    ///
    /// # Panics
    ///
    /// Panics if a spawned import thread panicked (`join().unwrap()`).
    pub fn forced_import(outputs_path: &Path, indexer: &Indexer) -> Result {
        info!("Importing computer...");
        let import_start = Instant::now();

        let computed_path = outputs_path.join("computed");

        // 8 * 1024 * 1024 bytes = 8 MiB of stack for every import thread.
        // NOTE(review): presumably the imports need more than the default
        // thread stack — confirm the reason and document it here.
        const STACK_SIZE: usize = 8 * 1024 * 1024;
        let big_thread = || thread::Builder::new().stack_size(STACK_SIZE);

        // Stage 1: `positions` does not borrow `indexes`, so the two imports overlap.
        let (indexes, positions) = timed("Imported indexes/positions", || {
            thread::scope(|s| -> Result<_> {
                let positions_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                    Ok(Box::new(positions::Vecs::forced_import(
                        &computed_path,
                        VERSION,
                    )?))
                })?;

                let indexes = Box::new(indexes::Vecs::forced_import(
                    &computed_path,
                    VERSION,
                    indexer,
                )?);
                let positions = positions_handle.join().unwrap()?;

                Ok((indexes, positions))
            })
        })?;

        // Stage 2: both imports borrow the freshly imported `indexes`.
        let (inputs, outputs) = timed("Imported inputs/outputs", || {
            thread::scope(|s| -> Result<_> {
                let inputs_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                    Ok(Box::new(inputs::Vecs::forced_import(
                        &computed_path,
                        VERSION,
                        &indexes,
                    )?))
                })?;

                let outputs_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                    Ok(Box::new(outputs::Vecs::forced_import(
                        &computed_path,
                        VERSION,
                        &indexes,
                    )?))
                })?;

                let inputs = inputs_handle.join().unwrap()?;
                let outputs = outputs_handle.join().unwrap()?;

                Ok((inputs, outputs))
            })
        })?;

        // Stage 3: sequential; `constants` is constructed with `new`, without a path.
        let (constants, prices) = timed("Imported prices/constants", || -> Result<_> {
            let constants = Box::new(constants::Vecs::new(VERSION, &indexes));
            let prices = Box::new(prices::Vecs::forced_import(
                &computed_path,
                VERSION,
                &indexes,
            )?);
            Ok((constants, prices))
        })?;

        // Stage 4: four spawned imports overlap with `cointime` on this thread.
        let (blocks, mining, transactions, scripts, pools, cointime) =
            timed("Imported blocks/mining/tx/scripts/pools/cointime", || {
                thread::scope(|s| -> Result<_> {
                    let blocks_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                        Ok(Box::new(blocks::Vecs::forced_import(
                            &computed_path,
                            VERSION,
                            indexer,
                            &indexes,
                        )?))
                    })?;

                    let mining_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                        Ok(Box::new(mining::Vecs::forced_import(
                            &computed_path,
                            VERSION,
                            &indexes,
                        )?))
                    })?;

                    let transactions_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                        Ok(Box::new(transactions::Vecs::forced_import(
                            &computed_path,
                            VERSION,
                            indexer,
                            &indexes,
                        )?))
                    })?;

                    let scripts_handle = big_thread().spawn_scoped(s, || -> Result<_> {
                        Ok(Box::new(scripts::Vecs::forced_import(
                            &computed_path,
                            VERSION,
                            &indexes,
                        )?))
                    })?;

                    // Runs on the calling thread while the four threads above work.
                    let cointime = Box::new(cointime::Vecs::forced_import(
                        &computed_path,
                        VERSION,
                        &indexes,
                    )?);

                    let blocks = blocks_handle.join().unwrap()?;
                    let mining = mining_handle.join().unwrap()?;
                    let transactions = transactions_handle.join().unwrap()?;
                    let scripts = scripts_handle.join().unwrap()?;

                    // Imported only after all four joins, so it does not overlap with them.
                    let pools = Box::new(pools::Vecs::forced_import(
                        &computed_path,
                        VERSION,
                        &indexes,
                    )?);

                    Ok((blocks, mining, transactions, scripts, pools, cointime))
                })
            })?;

        // Stage 5: sequential tail; `supply` needs the imported `distribution`.
        let distribution = timed("Imported distribution", || -> Result<_> {
            Ok(Box::new(distribution::Vecs::forced_import(
                &computed_path,
                VERSION,
                &indexes,
            )?))
        })?;

        let supply = timed("Imported supply", || -> Result<_> {
            Ok(Box::new(supply::Vecs::forced_import(
                &computed_path,
                VERSION,
                &indexes,
                &distribution,
            )?))
        })?;

        let market = timed("Imported market", || -> Result<_> {
            Ok(Box::new(market::Vecs::forced_import(
                &computed_path,
                VERSION,
                &indexes,
            )?))
        })?;

        info!("Total import time: {:?}", import_start.elapsed());

        let this = Self {
            blocks,
            mining,
            transactions,
            scripts,
            constants,
            market,
            distribution,
            supply,
            positions,
            pools,
            cointime,
            indexes,
            inputs,
            prices,
            outputs,
        };

        // Cleanup happens only once every import above has succeeded.
        Self::retain_databases(&computed_path)?;

        Ok(this)
    }

    /// Deletes every directory directly inside `computed_path` whose name is
    /// not one of the modules' `DB_NAME` constants listed in `EXPECTED_DBS`.
    ///
    /// Returns `Ok(())` without doing anything when `computed_path` does not
    /// exist. Entries that are not directories are skipped.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from listing the directory, reading an
    /// entry's file type, or removing a directory tree.
    fn retain_databases(computed_path: &Path) -> Result<()> {
        // NOTE(review): `constants::DB_NAME` is not listed here even though
        // `iter_named_exportable` refers to it; presumably `constants` has no
        // on-disk folder because it is built without a path — confirm.
        const EXPECTED_DBS: &[&str] = &[
            blocks::DB_NAME,
            mining::DB_NAME,
            transactions::DB_NAME,
            scripts::DB_NAME,
            positions::DB_NAME,
            cointime::DB_NAME,
            indexes::DB_NAME,
            market::DB_NAME,
            pools::DB_NAME,
            prices::DB_NAME,
            distribution::DB_NAME,
            supply::DB_NAME,
            inputs::DB_NAME,
            outputs::DB_NAME,
        ];

        if !computed_path.exists() {
            return Ok(());
        }

        for entry in fs::read_dir(computed_path)? {
            let entry = entry?;
            let file_type = entry.file_type()?;

            if !file_type.is_dir() {
                continue;
            }

            // A name that is not valid UTF-8 makes `to_str()` return `None`,
            // so such a directory is never removed.
            if let Some(name) = entry.file_name().to_str()
                && !EXPECTED_DBS.contains(&name)
            {
                info!("Removing obsolete database folder: {}", name);
                fs::remove_dir_all(entry.path())?;
            }
        }

        Ok(())
    }

    /// Runs every module's `compute` step in a fixed order, logging the
    /// duration of each step and of the whole run.
    ///
    /// Order of execution:
    ///
    /// 1. `indexes` (which also receives `&mut self.blocks`); its result
    ///    replaces `starting_indexes` for all later steps.
    /// 2. `prices`.
    /// 3. `positions` on a spawned thread while `blocks`, `inputs`,
    ///    `scripts`, `outputs`, `transactions` and `mining` run one after
    ///    another on the calling thread.
    /// 4. `pools` on a spawned thread while `distribution` runs on the
    ///    calling thread.
    /// 5. `market` on a spawned thread while `supply` runs on the calling
    ///    thread.
    /// 6. `cointime`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any step.
    ///
    /// # Panics
    ///
    /// Panics if a spawned compute thread panicked (`join().unwrap()`).
    pub fn compute(
        &mut self,
        indexer: &Indexer,
        starting_indexes: brk_indexer::Indexes,
        reader: &Reader,
        exit: &Exit,
    ) -> Result<()> {
        let compute_start = Instant::now();

        // Shadows the parameter with the value returned by `indexes.compute`.
        let mut starting_indexes = timed("Computed indexes", || {
            self.indexes
                .compute(indexer, &mut self.blocks, starting_indexes, exit)
        })?;

        timed("Computed prices", || {
            self.prices
                .compute(indexer, &self.indexes, &starting_indexes, exit)
        })?;

        thread::scope(|scope| -> Result<()> {
            // `positions` is the only field this thread mutates; the chain
            // below touches other fields.
            let positions = scope.spawn(|| {
                timed("Computed positions", || {
                    self.positions
                        .compute(indexer, &starting_indexes, reader, exit)
                })
            });

            timed("Computed blocks", || {
                self.blocks
                    .compute(indexer, &self.indexes, &starting_indexes, exit)
            })?;

            timed("Computed inputs", || {
                self.inputs.compute(
                    indexer,
                    &self.indexes,
                    &self.blocks,
                    &starting_indexes,
                    exit,
                )
            })?;

            // Note: `scripts` reads `self.outputs` before the `outputs` step below runs.
            timed("Computed scripts", || {
                self.scripts.compute(
                    indexer,
                    &self.blocks,
                    &self.outputs,
                    &self.prices,
                    &starting_indexes,
                    exit,
                )
            })?;

            timed("Computed outputs", || {
                self.outputs.compute(
                    indexer,
                    &self.indexes,
                    &self.inputs,
                    &self.scripts,
                    &self.blocks,
                    &starting_indexes,
                    exit,
                )
            })?;

            timed("Computed transactions", || {
                self.transactions.compute(
                    indexer,
                    &self.indexes,
                    &self.blocks,
                    &self.inputs,
                    &self.outputs,
                    &self.prices,
                    &starting_indexes,
                    exit,
                )
            })?;

            timed("Computed mining", || {
                self.mining.compute(
                    indexer,
                    &self.indexes,
                    &self.blocks,
                    &self.transactions,
                    &self.prices,
                    &starting_indexes,
                    exit,
                )
            })?;

            positions.join().unwrap()?;
            Ok(())
        })?;

        // `distribution.compute` takes `&mut starting_indexes`, so the
        // concurrently running `pools` step is given its own copy.
        let starting_indexes_clone = starting_indexes.clone();
        thread::scope(|scope| -> Result<()> {
            let pools = scope.spawn(|| {
                timed("Computed pools", || {
                    self.pools.compute(
                        indexer,
                        &self.indexes,
                        &self.blocks,
                        &self.prices,
                        &self.mining,
                        &starting_indexes_clone,
                        exit,
                    )
                })
            });

            timed("Computed distribution", || {
                self.distribution.compute(
                    indexer,
                    &self.indexes,
                    &self.inputs,
                    &self.outputs,
                    &self.transactions,
                    &self.blocks,
                    &self.prices,
                    &mut starting_indexes,
                    exit,
                )
            })?;

            pools.join().unwrap()?;
            Ok(())
        })?;

        thread::scope(|scope| -> Result<()> {
            let market = scope.spawn(|| {
                timed("Computed market", || {
                    self.market.compute(
                        &self.indexes,
                        &self.prices,
                        &self.blocks,
                        &self.mining,
                        &self.distribution,
                        &self.transactions,
                        &starting_indexes,
                        exit,
                    )
                })
            });

            timed("Computed supply", || {
                self.supply.compute(
                    &self.scripts,
                    &self.blocks,
                    &self.mining,
                    &self.transactions,
                    &self.prices,
                    &self.distribution,
                    &starting_indexes,
                    exit,
                )
            })?;

            market.join().unwrap()?;
            Ok(())
        })?;

        // Runs last: it reads both `supply` and `distribution`.
        timed("Computed cointime", || {
            self.cointime.compute(
                &starting_indexes,
                &self.prices,
                &self.blocks,
                &self.mining,
                &self.supply,
                &self.distribution,
                exit,
            )
        })?;

        info!("Total compute time: {:?}", compute_start.elapsed());
        Ok(())
    }
}

impl Computer {
    /// Iterates over every exportable vec of every field, pairing each one
    /// with the `DB_NAME` constant of the module named like that field.
    ///
    /// Fields are visited in the order given to `named!` below.
    pub fn iter_named_exportable(
        &self,
    ) -> impl Iterator {
        use brk_traversable::Traversable;

        // Expands to `empty().chain(..).chain(..)…`, one `chain` per field.
        // The same identifier is used both as the field name (`self.$field`)
        // and as the module path (`$field::DB_NAME`).
        macro_rules! named {
            ($($field:ident),+ $(,)?) => {
                std::iter::empty()
                $(.chain(self.$field.iter_any_exportable().map(|v| ($field::DB_NAME, v))))+
            };
        }

        named!(
            blocks,
            mining,
            transactions,
            scripts,
            positions,
            cointime,
            constants,
            indexes,
            market,
            pools,
            prices,
            distribution,
            supply,
            inputs,
            outputs,
        )
    }
}

/// Runs `f`, logs `"{label} in {elapsed}"` at info level, and returns
/// whatever `f` returned.
///
/// The result is not inspected, so the message is logged even when `f`
/// returns an error value.
fn timed(label: &str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = f();
    info!("{label} in {:?}", start.elapsed());
    result
}