global: snapshot

This commit is contained in:
nym21
2025-02-28 11:52:25 +01:00
parent 5b1ca3711a
commit 1b93ccf608
35 changed files with 460 additions and 271 deletions

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../../../README.md")]
#[cfg(feature = "core")]
pub mod core {
#[doc(inline)]
@@ -10,6 +12,12 @@ pub mod computer {
pub use brk_computer::*;
}
#[cfg(feature = "exit")]
pub mod exit {
#[doc(inline)]
pub use brk_exit::*;
}
#[cfg(feature = "fetcher")]
pub mod fetcher {
#[doc(inline)]
@@ -39,3 +47,9 @@ pub mod server {
#[doc(inline)]
pub use brk_server::*;
}
#[cfg(feature = "vec")]
pub mod vec {
#[doc(inline)]
pub use brk_vec::*;
}

View File

@@ -0,0 +1,2 @@
#![doc = include_str!("../README.md")]

View File

@@ -9,7 +9,10 @@ repository.workspace = true
[dependencies]
brk_core = { workspace = true }
brk_exit = { workspace = true }
brk_fetcher = { workspace = true }
brk_indexer = { workspace = true }
brk_logger = { workspace = true }
brk_parser = { workspace = true }
brk_vec = { workspace = true }
color-eyre = { workspace = true }
log = { workspace = true }

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::path::{Path, PathBuf};
use brk_exit::Exit;
@@ -37,42 +39,52 @@ impl Computer<SINGLE_THREAD> {
// TODO: Remove all outdated
self.vecs
.txindex_to_last_txinindex
.compute_last_index_from_first(&mut indexer.vecs.txindex_to_first_txinindex, txinindexes_count)?;
self.vecs.txindex_to_last_txinindex.compute_last_index_from_first(
&mut indexer.vecs.txindex_to_first_txinindex,
txinindexes_count,
exit,
)?;
self.vecs.txindex_to_inputs_count.compute_count_from_indexes(
&mut indexer.vecs.txindex_to_first_txinindex,
&mut self.vecs.txindex_to_last_txinindex,
exit,
)?;
self.vecs
.txindex_to_last_txoutindex
.compute_last_index_from_first(&mut indexer.vecs.txindex_to_first_txoutindex, txoutindexes_count)?;
self.vecs.txindex_to_last_txoutindex.compute_last_index_from_first(
&mut indexer.vecs.txindex_to_first_txoutindex,
txoutindexes_count,
exit,
)?;
self.vecs.txindex_to_outputs_count.compute_count_from_indexes(
&mut indexer.vecs.txindex_to_first_txoutindex,
&mut self.vecs.txindex_to_last_txoutindex,
exit,
)?;
self.vecs
.height_to_date
.compute_transform(&mut indexer.vecs.height_to_timestamp, |timestamp| {
Date::from(*timestamp)
})?;
self.vecs.height_to_date.compute_transform(
&mut indexer.vecs.height_to_timestamp,
|timestamp| Date::from(*timestamp),
exit,
)?;
self.vecs
.height_to_last_txindex
.compute_last_index_from_first(&mut indexer.vecs.height_to_first_txindex, height_count)?;
self.vecs.height_to_last_txindex.compute_last_index_from_first(
&mut indexer.vecs.height_to_first_txindex,
height_count,
exit,
)?;
self.vecs.txindex_to_height.compute_inverse_less_to_more(
&mut indexer.vecs.height_to_first_txindex,
&mut self.vecs.height_to_last_txindex,
exit,
)?;
self.vecs.txindex_to_is_coinbase.compute_is_first_ordered(
&mut self.vecs.txindex_to_height,
&mut indexer.vecs.height_to_first_txindex,
exit,
)?;
// self.vecs.txindex_to_fee.compute_transform(
@@ -86,7 +98,7 @@ impl Computer<SINGLE_THREAD> {
self.vecs
.dateindex_to_first_height
.compute_inverse_more_to_less(&mut self.vecs.height_to_dateindex)?;
.compute_inverse_more_to_less(&mut self.vecs.height_to_dateindex, exit)?;
// ---
// Date to X

View File

@@ -1,25 +1,50 @@
use std::path::Path;
use std::{path::Path, thread::sleep, time::Duration};
use brk_computer::Computer;
use brk_exit::Exit;
use brk_indexer::Indexer;
use brk_parser::{
Parser,
rpc::{self, RpcApi},
};
use brk_vec::CACHED_GETS;
use log::info;
pub fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
brk_logger::init(Some(Path::new(".log")));
let data_dir = Path::new("../../../bitcoin");
let rpc = Box::leak(Box::new(rpc::Client::new(
"http://localhost:8332",
rpc::Auth::CookieFile(Path::new(data_dir).join(".cookie")),
)?));
let exit = Exit::new();
let i = std::time::Instant::now();
let parser = Parser::new(data_dir, rpc);
let outputs_dir = Path::new("../../_outputs");
let indexer = Indexer::import(&outputs_dir.join("indexes"))?;
let indexer: Indexer<CACHED_GETS> = Indexer::import(&outputs_dir.join("indexes"))?;
let mut computer = Computer::import(&outputs_dir.join("computed"))?;
// let mut computer = Computer::import(&outputs_dir.join("computed"))?;
computer.compute(indexer, &exit)?;
// loop {
// let block_count = rpc.get_block_count()?;
dbg!(i.elapsed());
// info!("{block_count} blocks found.");
// indexer.index(&parser, rpc, &exit)?;
// computer.compute(indexer, &exit)?;
// info!("Waiting for new blocks...");
// while block_count == rpc.get_block_count()? {
// sleep(Duration::from_secs(1))
// }
// }
Ok(())
}

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
mod error;
mod structs;
mod utils;

View File

@@ -12,6 +12,10 @@ impl Date {
pub const INDEX_ONE: Self = Self(20090109);
pub const INDEX_ONE_: Date_ = Date_::constant(2009, 1, 9);
pub fn new(year: u16, month: u8, day: u8) -> Self {
Self(year as u32 * 1_00_00 + month as u32 * 1_00 + day as u32)
}
pub fn year(&self) -> u16 {
(self.0 / 1_00_00) as u16
}
@@ -33,7 +37,7 @@ impl Default for Date {
impl From<Date_> for Date {
fn from(value: Date_) -> Self {
Self(value.year() as u32 * 1_00_00 + value.month() as u32 * 1_00 + value.day() as u32)
Self::new(value.year() as u16, value.month() as u8, value.day() as u8)
}
}
@@ -58,3 +62,9 @@ impl From<Dateindex> for Date {
)
}
}
impl std::fmt::Display for Date {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!("{}-{}-{}", self.year(), self.month(), self.day()))
}
}

View File

@@ -1,4 +1,5 @@
use derive_deref::Deref;
use jiff::{civil::date, tz::TimeZone};
use serde::Serialize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
@@ -7,12 +8,27 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
)]
pub struct Timestamp(u32);
impl Timestamp {
pub fn floor_seconds(self) -> Self {
let t = jiff::Timestamp::from(self).to_zoned(TimeZone::UTC);
let d = jiff::civil::DateTime::from(t);
let d = date(d.year(), d.month(), d.day()).at(d.hour(), d.minute(), 0, 0);
Self::from(d.to_zoned(TimeZone::UTC).unwrap().timestamp())
}
}
impl From<u32> for Timestamp {
fn from(value: u32) -> Self {
Self(value)
}
}
impl From<jiff::Timestamp> for Timestamp {
fn from(value: jiff::Timestamp) -> Self {
Self(value.as_second() as u32)
}
}
impl From<Timestamp> for jiff::Timestamp {
fn from(value: Timestamp) -> Self {
jiff::Timestamp::from_second(*value as i64).unwrap()

View File

@@ -1,6 +1,6 @@
use byteview::ByteView;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Unit;
impl From<ByteView> for Unit {

View File

@@ -0,0 +1 @@
# BRK Exit

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{
process::exit,
sync::{

View File

@@ -11,19 +11,19 @@ use color_eyre::eyre::{ContextCompat, eyre};
use log::info;
use serde_json::Value;
use crate::{Close, Date, Dollars, High, Low, Open, Pricer, fetchers::retry};
use crate::{Close, Date, Dollars, Fetcher, High, Low, Open, fetchers::retry};
pub struct Binance {
path: PathBuf,
path: Option<PathBuf>,
_1mn: Option<BTreeMap<Timestamp, OHLCCents>>,
_1d: Option<BTreeMap<Date, OHLCCents>>,
har: Option<BTreeMap<Timestamp, OHLCCents>>,
}
impl Binance {
pub fn init(path: &Path) -> Self {
pub fn init(path: Option<&Path>) -> Self {
Self {
path: path.to_owned(),
path: path.map(|p| p.to_owned()),
_1mn: None,
_1d: None,
har: None,
@@ -38,12 +38,23 @@ impl Binance {
if self._1mn.is_none() || self._1mn.as_ref().unwrap().last_key_value().unwrap().0 <= &timestamp {
self._1mn.replace(Self::fetch_1mn()?);
}
Pricer::find_height_ohlc(
let res = Fetcher::find_height_ohlc(
self._1mn.as_ref().unwrap(),
timestamp,
previous_timestamp,
"binance 1mn",
)
);
if res.is_ok() {
return res;
}
if self.har.is_none() {
self.har.replace(self.read_har().unwrap_or_default());
}
Fetcher::find_height_ohlc(self.har.as_ref().unwrap(), timestamp, previous_timestamp, "binance har")
}
pub fn fetch_1mn() -> color_eyre::Result<BTreeMap<Timestamp, OHLCCents>> {
@@ -72,8 +83,6 @@ impl Binance {
pub fn fetch_1d() -> color_eyre::Result<BTreeMap<Date, OHLCCents>> {
info!("Fetching daily prices from Kraken...");
dbg!(&Self::url("interval=1d"));
retry(
|_| Self::json_to_date_to_ohlc(&minreq::get(Self::url("interval=1d")).send()?.json()?),
30,
@@ -81,21 +90,14 @@ impl Binance {
)
}
pub fn get_from_har_binance(
&mut self,
timestamp: Timestamp,
previous_timestamp: Option<Timestamp>,
) -> color_eyre::Result<OHLCCents> {
if self.har.is_none() {
self.har.replace(self.read_har().unwrap_or_default());
}
Pricer::find_height_ohlc(self.har.as_ref().unwrap(), timestamp, previous_timestamp, "binance har")
}
fn read_har(&self) -> color_eyre::Result<BTreeMap<Timestamp, OHLCCents>> {
if self.path.is_none() {
return Err(eyre!("Path missing"));
}
info!("Reading Binance har file...");
let path = &self.path;
let path = self.path.as_ref().unwrap();
fs::create_dir_all(path)?;

View File

@@ -27,7 +27,7 @@ impl Kibo {
}
}
pub fn get_from_height_kibo(&mut self, height: Height) -> color_eyre::Result<OHLCCents> {
pub fn get_from_height(&mut self, height: Height) -> color_eyre::Result<OHLCCents> {
#[allow(clippy::map_entry)]
if !self.height_to_ohlc_vec.contains_key(&height)
|| ((usize::from(height) + self.height_to_ohlc_vec.get(&height).unwrap().len()) <= usize::from(height))
@@ -77,7 +77,7 @@ impl Kibo {
)
}
pub fn get_from_date_kibo(&mut self, date: &Date) -> color_eyre::Result<OHLCCents> {
pub fn get_from_date(&mut self, date: &Date) -> color_eyre::Result<OHLCCents> {
let year = date.year();
#[allow(clippy::map_entry)]

View File

@@ -5,7 +5,7 @@ use color_eyre::eyre::ContextCompat;
use log::info;
use serde_json::Value;
use crate::{Pricer, fetchers::retry};
use crate::{Fetcher, fetchers::retry};
#[derive(Default)]
pub struct Kraken {
@@ -22,7 +22,7 @@ impl Kraken {
if self._1mn.is_none() || self._1mn.as_ref().unwrap().last_key_value().unwrap().0 <= &timestamp {
self._1mn.replace(Self::fetch_1mn()?);
}
Pricer::find_height_ohlc(self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "kraken 1m")
Fetcher::find_height_ohlc(self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "kraken 1m")
}
fn fetch_1mn() -> color_eyre::Result<BTreeMap<Timestamp, OHLCCents>> {

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{collections::BTreeMap, fs, path::Path};
use brk_core::{Cents, Close, Date, Dollars, Height, High, Low, OHLCCents, Open, Timestamp};
@@ -6,94 +8,80 @@ use color_eyre::eyre::Error;
mod fetchers;
// use brk_indexer::Indexer;
pub use fetchers::*;
use fetchers::*;
pub struct Pricer {
pub struct Fetcher {
binance: Binance,
kraken: Kraken,
kibo: Kibo,
}
impl Pricer {
pub fn import(path: &Path) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
impl Fetcher {
pub fn import(hars_path: Option<&Path>) -> color_eyre::Result<Self> {
if let Some(path) = hars_path {
fs::create_dir_all(path)?;
}
Ok(Self {
binance: Binance::init(path),
binance: Binance::init(hars_path),
kraken: Kraken::default(),
kibo: Kibo::default(),
})
}
fn get_date_ohlc(&mut self, date: Date) -> color_eyre::Result<OHLCCents> {
todo!();
// if self.ohlc.date.is_key_safe(date) {
// Ok(self.ohlc.date.get_or_import(&date).unwrap().to_owned())
// } else {
// let ohlc = self
// .get_from_daily_kraken(&date)
// .or_else(|_| self.get_from_daily_binance(&date))
// .or_else(|_| self.get_from_date_kibo(&date))?;
// self.ohlc.date.insert(date, ohlc);
// Ok(ohlc)
// }
pub fn get_date(&mut self, date: Date) -> color_eyre::Result<OHLCCents> {
self.kraken
.get_from_1d(&date)
.or_else(|_| self.binance.get_from_1d(&date))
.or_else(|_| self.kibo.get_from_date(&date))
}
fn get_height_ohlc(
pub fn get_height(
&mut self,
height: Height,
timestamp: Timestamp,
previous_timestamp: Option<Timestamp>,
) -> color_eyre::Result<OHLCCents> {
todo!();
let timestamp = timestamp.floor_seconds();
// if let Some(ohlc) = self.ohlc.height.get_or_import(&height) {
// return Ok(ohlc);
// }
if previous_timestamp.is_none() && height != Height::ZERO {
panic!("Shouldn't be possible");
}
// let timestamp = timestamp.to_floored_seconds();
let previous_timestamp = previous_timestamp.map(|t| t.floor_seconds());
// if previous_timestamp.is_none() && !height.is_first() {
// panic!("Shouldn't be possible");
// }
let ohlc = self
.kraken
.get_from_1mn(timestamp, previous_timestamp)
.unwrap_or_else(|_| {
self.binance
.get_from_1mn(timestamp, previous_timestamp)
.unwrap_or_else(|_| {
self.kibo.get_from_height(height).unwrap_or_else(|_| {
let date = Date::from(timestamp);
// let previous_timestamp = previous_timestamp.map(|t| t.to_floored_seconds());
panic!(
"Can't find the price for: height: {height} - date: {date}
1mn APIs are limited to the last 16 hours for Binance's and the last 10 hours for Kraken's
How to fix this:
1. Go to https://www.binance.com/en/trade/BTC_USDT?type=spot
2. Select 1mn interval
3. Open the inspector/dev tools
4. Go to the Network Tab
5. Filter URLs by 'uiKlines'
6. Go back to the chart and scroll until you pass the date mentioned few lines ago
7. Go back to the dev tools
8. Export to a har file (if there is no explicit button, click on the cog button)
9. Move the file to 'parser/imports/binance.har'
"
)
})
})
});
// let ohlc = self
// .get_from_1mn_kraken(timestamp, previous_timestamp)
// .unwrap_or_else(|_| {
// self.get_from_1mn_binance(timestamp, previous_timestamp)
// .unwrap_or_else(|_| {
// self.get_from_har_binance(timestamp, previous_timestamp, config)
// .unwrap_or_else(|_| {
// self.get_from_height_kibo(&height).unwrap_or_else(|_| {
// let date = timestamp.to_date();
// self.ohlc.height.insert(height, ohlc);
// panic!(
// "Can't find the price for: height: {height} - date: {date}
// 1mn APIs are limited to the last 16 hours for Binance's and the last 10 hours for Kraken's
// How to fix this:
// 1. Go to https://www.binance.com/en/trade/BTC_USDT?type=spot
// 2. Select 1mn interval
// 3. Open the inspector/dev tools
// 4. Go to the Network Tab
// 5. Filter URLs by 'uiKlines'
// 6. Go back to the chart and scroll until you pass the date mentioned few lines ago
// 7. Go back to the dev tools
// 8. Export to a har file (if there is no explicit button, click on the cog button)
// 9. Move the file to 'parser/imports/binance.har'
// "
// )
// })
// })
// })
// });
// // self.ohlc.height.insert(height, ohlc);
// Ok(ohlc)
Ok(ohlc)
}
fn find_height_ohlc(

View File

@@ -1,16 +1,15 @@
use brk_fetcher::Binance;
use brk_core::Date;
use brk_fetcher::Fetcher;
fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
brk_logger::init(None);
dbg!(Binance::fetch_1d()?);
// dbg!(Binance::fetch_1mn_prices());
// dbg!(Kraken::fetch_1d()?);
// dbg!(Kraken::fetch_1mn_prices()?);
// dbg!(Kibo::fetch_date_prices(2025)?);
// dbg!(Kibo::fetch_height_prices(Height::from(880_000_u32))?);
let mut fetcher = Fetcher::import(None)?;
dbg!(fetcher.get_date(Date::new(2025, 1, 1))?);
dbg!(fetcher.get_height(885604_u32.into(), 1740683986.into(), Some(1740683000.into()))?);
Ok(())
}

View File

@@ -7,9 +7,9 @@ use brk_parser::NUMBER_OF_UNSAFE_BLOCKS;
use brk_vec::CACHED_GETS;
use color_eyre::eyre::ContextCompat;
use crate::storage::{Stores, Vecs};
use crate::{Stores, Vecs};
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct Indexes {
pub addressindex: Addressindex,
pub emptyindex: Emptyindex,

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{
collections::BTreeMap,
path::Path,
@@ -17,13 +19,13 @@ use brk_vec::CACHED_GETS;
use color_eyre::eyre::{ContextCompat, eyre};
use log::info;
use rayon::prelude::*;
mod indexes;
mod stores;
mod vecs;
mod storage;
mod structs;
pub use storage::{AnyStorableVec, StorableVec, Store, StoreMeta};
use storage::{Stores, Vecs};
pub use structs::*;
pub use indexes::*;
pub use stores::*;
pub use vecs::*;
const SNAPSHOT_BLOCK_RANGE: usize = 1000;
@@ -47,7 +49,7 @@ impl<const MODE: u8> Indexer<MODE> {
}
impl Indexer<CACHED_GETS> {
pub fn index(&mut self, parser: &Parser, rpc: &'static rpc::Client, exit: &Exit) -> color_eyre::Result<()> {
pub fn index(&mut self, parser: &Parser, rpc: &'static rpc::Client, exit: &Exit) -> color_eyre::Result<Indexes> {
let check_collisions = true;
let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc)).unwrap_or_else(|_| {
@@ -82,14 +84,13 @@ impl Indexer<CACHED_GETS> {
let vecs = &mut self.vecs;
let stores = &mut self.stores;
let mut idxs = starting_indexes;
if idxs.height > Height::try_from(rpc)? {
return Ok(());
if starting_indexes.height > Height::try_from(rpc)? {
return Ok(starting_indexes);
}
info!("Started indexing...");
let mut idxs = starting_indexes.clone();
parser.parse(Some(idxs.height), None).iter().try_for_each(
|(height, block, blockhash)| -> color_eyre::Result<()> {
info!("Indexing block {height}...");
@@ -633,7 +634,7 @@ impl Indexer<CACHED_GETS> {
export_if_needed(stores, vecs, idxs.height, true, exit)?;
Ok(())
Ok(starting_indexes)
}
}

View File

@@ -40,6 +40,5 @@ fn main() -> color_eyre::Result<()> {
}
#[allow(unreachable_code)]
// To make sure that Fjall had the time to flush everything properly
sleep(Duration::from_millis(100));
Ok(())
}

View File

@@ -1,5 +0,0 @@
mod stores;
mod vecs;
pub use stores::*;
pub use vecs::*;

View File

@@ -29,8 +29,8 @@ const CHECK_COLLISISONS: bool = true;
impl<K, V> Store<K, V>
where
K: Debug + Into<ByteView> + Ord + Immutable + IntoBytes,
V: Debug + Into<ByteView> + TryFrom<ByteView>,
K: Debug + Clone + Into<ByteView> + Ord + Immutable + IntoBytes,
V: Debug + Clone + Into<ByteView> + TryFrom<ByteView>,
<V as TryFrom<ByteView>>::Error: error::Error + Send + Sync + 'static,
{
pub fn import(path: &Path, version: Version) -> color_eyre::Result<Self> {
@@ -161,7 +161,26 @@ where
fn open_partition_handle(keyspace: &TransactionalKeyspace) -> Result<TransactionalPartitionHandle> {
keyspace.open_partition(
"partition",
PartitionCreateOptions::default().manual_journal_persist(true),
PartitionCreateOptions::default()
.bloom_filter_bits(Some(5))
.manual_journal_persist(true),
)
}
}
impl<Key, Value> Clone for Store<Key, Value>
where
Key: Clone,
Value: Clone,
{
fn clone(&self) -> Self {
Self {
meta: self.meta.clone(),
keyspace: self.keyspace.clone(),
part: self.part.clone(),
rtx: self.keyspace.read_tx(),
puts: self.puts.clone(),
dels: self.dels.clone(),
}
}
}

View File

@@ -8,7 +8,7 @@ use zerocopy::{FromBytes, IntoBytes};
use super::Height;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StoreMeta {
pathbuf: PathBuf,
version: Version,

View File

@@ -13,6 +13,7 @@ pub use meta::*;
use super::Vecs;
#[derive(Clone)]
pub struct Stores {
pub addresshash_to_addressindex: Store<AddressHash, Addressindex>,
pub blockhash_prefix_to_height: Store<BlockHashPrefix, Height>,

View File

@@ -1,3 +0,0 @@
mod indexes;
pub use indexes::*;

View File

@@ -5,7 +5,7 @@ use std::{
path::{Path, PathBuf},
};
use brk_vec::{StoredIndex, StoredType, Version};
use brk_vec::{CACHED_GETS, StoredIndex, StoredType, Version};
use super::Height;
@@ -27,11 +27,6 @@ where
})
}
pub fn flush(&mut self, height: Height) -> io::Result<()> {
height.write(&self.path_height())?;
self.vec.flush()
}
pub fn truncate_if_needed(&mut self, index: I, height: Height) -> brk_vec::Result<Option<T>> {
if self.height.is_none_or(|self_height| self_height != height) {
height.write(&self.path_height())?;
@@ -50,6 +45,17 @@ where
}
}
impl<I, T> StorableVec<I, T, CACHED_GETS>
where
I: StoredIndex,
T: StoredType,
{
pub fn flush(&mut self, height: Height) -> io::Result<()> {
height.write(&self.path_height())?;
self.vec.flush()
}
}
impl<I, T, const MODE: u8> Deref for StorableVec<I, T, MODE> {
type Target = brk_vec::StorableVec<I, T, MODE>;
fn deref(&self) -> &Self::Target {
@@ -67,7 +73,7 @@ pub trait AnyStorableVec: Send + Sync {
fn flush(&mut self, height: Height) -> io::Result<()>;
}
impl<I, T, const MODE: u8> AnyStorableVec for StorableVec<I, T, MODE>
impl<I, T> AnyStorableVec for StorableVec<I, T, CACHED_GETS>
where
I: StoredIndex,
T: StoredType,

View File

@@ -293,22 +293,8 @@ impl<const MODE: u8> Vecs<MODE> {
Ok(())
}
pub fn flush(&mut self, height: Height) -> io::Result<()> {
self.as_mut_any_vec_slice()
.into_par_iter()
.try_for_each(|vec| vec.flush(height))
}
pub fn starting_height(&mut self) -> Height {
self.as_mut_any_vec_slice()
.into_iter()
.map(|vec| vec.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
pub fn as_any_json_vec_slice(&self) -> [&dyn AnyJsonStorableVec; 43] {
[
pub fn as_any_json_vecs(&self) -> Vec<&dyn AnyJsonStorableVec> {
vec![
&*self.addressindex_to_addresstype as &dyn AnyJsonStorableVec,
&*self.addressindex_to_addresstypeindex,
&*self.addressindex_to_height,
@@ -354,54 +340,6 @@ impl<const MODE: u8> Vecs<MODE> {
&*self.txoutindex_to_value,
]
}
pub fn as_mut_any_vec_slice(&mut self) -> [&mut dyn AnyStorableVec; 43] {
[
&mut self.addressindex_to_addresstype as &mut dyn AnyStorableVec,
&mut self.addressindex_to_addresstypeindex,
&mut self.addressindex_to_height,
&mut self.height_to_blockhash,
&mut self.height_to_difficulty,
&mut self.height_to_first_addressindex,
&mut self.height_to_first_emptyindex,
&mut self.height_to_first_multisigindex,
&mut self.height_to_first_opreturnindex,
&mut self.height_to_first_pushonlyindex,
&mut self.height_to_first_txindex,
&mut self.height_to_first_txinindex,
&mut self.height_to_first_txoutindex,
&mut self.height_to_first_unknownindex,
&mut self.height_to_first_p2pk33index,
&mut self.height_to_first_p2pk65index,
&mut self.height_to_first_p2pkhindex,
&mut self.height_to_first_p2shindex,
&mut self.height_to_first_p2trindex,
&mut self.height_to_first_p2wpkhindex,
&mut self.height_to_first_p2wshindex,
&mut self.height_to_size,
&mut self.height_to_timestamp,
&mut self.height_to_weight,
&mut self.p2pk33index_to_p2pk33addressbytes,
&mut self.p2pk65index_to_p2pk65addressbytes,
&mut self.p2pkhindex_to_p2pkhaddressbytes,
&mut self.p2shindex_to_p2shaddressbytes,
&mut self.p2trindex_to_p2traddressbytes,
&mut self.p2wpkhindex_to_p2wpkhaddressbytes,
&mut self.p2wshindex_to_p2wshaddressbytes,
&mut self.txindex_to_first_txinindex,
&mut self.txindex_to_first_txoutindex,
&mut self.txindex_to_height,
&mut self.txindex_to_locktime,
&mut self.txindex_to_txid,
&mut self.txindex_to_base_size,
&mut self.txindex_to_total_size,
&mut self.txindex_to_is_explicitly_rbf,
&mut self.txindex_to_txversion,
&mut self.txinindex_to_txoutindex,
&mut self.txoutindex_to_addressindex,
&mut self.txoutindex_to_value,
]
}
}
impl Vecs<CACHED_GETS> {
@@ -471,4 +409,66 @@ impl Vecs<CACHED_GETS> {
Addressbytes::P2TR(bytes) => self.p2trindex_to_p2traddressbytes.push_if_needed(index.into(), bytes),
}
}
pub fn flush(&mut self, height: Height) -> io::Result<()> {
self.as_mut_any_vecs()
.into_par_iter()
.try_for_each(|vec| vec.flush(height))
}
pub fn starting_height(&mut self) -> Height {
self.as_mut_any_vecs()
.into_iter()
.map(|vec| vec.height().map(Height::incremented).unwrap_or_default())
.min()
.unwrap()
}
fn as_mut_any_vecs(&mut self) -> Vec<&mut dyn AnyStorableVec> {
vec![
&mut self.addressindex_to_addresstype as &mut dyn AnyStorableVec,
&mut self.addressindex_to_addresstypeindex,
&mut self.addressindex_to_height,
&mut self.height_to_blockhash,
&mut self.height_to_difficulty,
&mut self.height_to_first_addressindex,
&mut self.height_to_first_emptyindex,
&mut self.height_to_first_multisigindex,
&mut self.height_to_first_opreturnindex,
&mut self.height_to_first_pushonlyindex,
&mut self.height_to_first_txindex,
&mut self.height_to_first_txinindex,
&mut self.height_to_first_txoutindex,
&mut self.height_to_first_unknownindex,
&mut self.height_to_first_p2pk33index,
&mut self.height_to_first_p2pk65index,
&mut self.height_to_first_p2pkhindex,
&mut self.height_to_first_p2shindex,
&mut self.height_to_first_p2trindex,
&mut self.height_to_first_p2wpkhindex,
&mut self.height_to_first_p2wshindex,
&mut self.height_to_size,
&mut self.height_to_timestamp,
&mut self.height_to_weight,
&mut self.p2pk33index_to_p2pk33addressbytes,
&mut self.p2pk65index_to_p2pk65addressbytes,
&mut self.p2pkhindex_to_p2pkhaddressbytes,
&mut self.p2shindex_to_p2shaddressbytes,
&mut self.p2trindex_to_p2traddressbytes,
&mut self.p2wpkhindex_to_p2wpkhaddressbytes,
&mut self.p2wshindex_to_p2wshaddressbytes,
&mut self.txindex_to_first_txinindex,
&mut self.txindex_to_first_txoutindex,
&mut self.txindex_to_height,
&mut self.txindex_to_locktime,
&mut self.txindex_to_txid,
&mut self.txindex_to_base_size,
&mut self.txindex_to_total_size,
&mut self.txindex_to_is_explicitly_rbf,
&mut self.txindex_to_txversion,
&mut self.txinindex_to_txoutindex,
&mut self.txoutindex_to_addressindex,
&mut self.txoutindex_to_value,
]
}
}

View File

@@ -0,0 +1 @@
# BRK Logger

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{
fmt::Display,
fs::{self, OpenOptions},

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{
cmp::Ordering,
collections::BTreeMap,

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::time::Instant;
use api::{ApiRoutes, VecIdToIndexToVec};
@@ -31,7 +33,7 @@ pub async fn main(indexer: Indexer<STATELESS>, computer: Computer<STATELESS>) ->
indexer
.vecs
.as_any_json_vec_slice()
.as_any_json_vecs()
.into_iter()
.for_each(|vec| vecs.insert(vec));

View File

@@ -12,6 +12,7 @@ repository.workspace = true
json = ["serde", "serde_json"]
[dependencies]
brk_exit = { workspace = true }
memmap2 = "0.9.5"
rayon = { workspace = true }
serde = { workspace = true, optional = true }

View File

@@ -1,3 +1,5 @@
#![doc = include_str!("../README.md")]
use std::{
cmp::Ordering,
error,
@@ -11,6 +13,7 @@ use std::{
sync::OnceLock,
};
use brk_exit::Exit;
pub use memmap2;
use rayon::prelude::*;
pub use zerocopy;
@@ -53,6 +56,7 @@ pub const STATELESS: u8 = 2;
///
#[derive(Debug)]
pub struct StorableVec<I, T, const MODE: u8> {
version: Version,
pathbuf: PathBuf,
file: File,
/// **Number of values NOT number of bytes**
@@ -103,26 +107,15 @@ where
fs::create_dir_all(path)?;
if MODE != STATELESS {
let path_version = Self::path_version_(path);
if let Ok(prev_version) = Version::try_from(path_version.as_path()) {
if prev_version != version {
if prev_version.swap_bytes() == version {
return Err(Error::WrongEndian);
}
return Err(Error::DifferentVersion {
found: prev_version,
expected: version,
});
}
}
version.write(&path_version)?;
let path = Self::path_version_(path);
version.validate(path.as_ref())?;
version.write(path.as_ref())?;
}
let file = Self::open_file_(&Self::path_vec_(path))?;
let mut slf = Self {
version,
pathbuf: path.to_owned(),
file_position: 0,
file_len: Self::read_disk_len_(&file)?,
@@ -265,7 +258,7 @@ where
self.has(index).map(|b| !b)
}
pub fn flush(&mut self) -> io::Result<()> {
fn _flush(&mut self) -> io::Result<()> {
if self.pushed.is_empty() {
return Ok(());
}
@@ -286,6 +279,11 @@ where
Ok(())
}
fn reset(&mut self) -> Result<()> {
self.truncate_if_needed(I::from(0))?;
Ok(())
}
pub fn truncate_if_needed(&mut self, index: I) -> Result<Option<T>> {
let index = Self::i_to_usize(index)?;
@@ -463,6 +461,11 @@ where
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
self.push_if_needed_(index, value)
}
#[inline]
pub fn flush(&mut self) -> io::Result<()> {
self._flush()
}
}
const FLUSH_EVERY: usize = 10_000;
@@ -486,7 +489,7 @@ where
res
}
pub fn last(&mut self) -> Result<Option<&T>> {
fn last(&mut self) -> Result<Option<&T>> {
let len = self.len();
if len == 0 {
return Ok(None);
@@ -494,28 +497,28 @@ where
Ok(self.get_(len - 1).ok())
}
#[inline]
pub fn push(&mut self, value: T) {
self.push_(value)
}
// #[inline]
// fn push(&mut self, value: T) {
// self.push_(value)
// }
#[inline]
pub fn push_if_needed(&mut self, index: I, value: T) -> Result<()> {
fn blocked_push_if_needed(&mut self, index: I, value: T, exit: &Exit) -> Result<()> {
self.push_if_needed_(index, value)?;
if self.pushed_len() >= FLUSH_EVERY {
Ok(self.flush()?)
Ok(self.blocked_flush(exit)?)
} else {
Ok(())
}
}
pub fn iter<F>(&mut self, f: F) -> Result<()>
where
F: FnMut((I, &T)) -> Result<()>,
{
self.iter_from(I::default(), f)
}
// pub fn iter<F>(&mut self, f: F) -> Result<()>
// where
// F: FnMut((I, &T)) -> Result<()>,
// {
// self.iter_from(I::default(), f)
// }
pub fn iter_from<F>(&mut self, mut index: I, mut f: F) -> Result<()>
where
@@ -545,103 +548,157 @@ where
Ok(())
}
pub fn compute_transform<A, F>(&mut self, other: &mut StorableVec<I, A, SINGLE_THREAD>, t: F) -> Result<()>
#[inline]
fn path_computed_version(&self) -> PathBuf {
self.path().join("computed_version")
}
// #[inline]
// fn path_computed_version_(path: &Path) -> PathBuf {
// path.join("computed_version")
// }
fn validate_or_reset(&mut self, version: Version) -> Result<()> {
let path = self.path_computed_version();
if version.validate(path.as_ref()).is_err() {
self.reset()?;
}
version.write(path.as_ref())?;
Ok(())
}
fn blocked_flush(&mut self, exit: &Exit) -> io::Result<()> {
if exit.triggered() {
return Ok(());
}
exit.block();
self._flush()?;
exit.release();
Ok(())
}
pub fn compute_transform<A, F>(
&mut self,
other: &mut StorableVec<I, A, SINGLE_THREAD>,
t: F,
exit: &Exit,
) -> Result<()>
where
A: StoredType,
F: Fn(&A) -> T,
{
other.iter_from(I::from(self.len()), |(i, a)| self.push_if_needed(i, t(a)))?;
Ok(self.flush()?)
self.validate_or_reset(Version::from(0) + self.version + other.version)?;
other.iter_from(I::from(self.len()), |(i, a)| self.blocked_push_if_needed(i, t(a), exit))?;
Ok(self.blocked_flush(exit)?)
}
pub fn compute_inverse_more_to_less(&mut self, other: &mut StorableVec<T, I, SINGLE_THREAD>) -> Result<()>
pub fn compute_inverse_more_to_less(
&mut self,
other: &mut StorableVec<T, I, SINGLE_THREAD>,
exit: &Exit,
) -> Result<()>
where
I: StoredType,
T: StoredIndex,
{
self.validate_or_reset(Version::from(0) + self.version + other.version)?;
let index = self.last()?.cloned().unwrap_or_default();
other.iter_from(index, |(v, i)| self.push_if_needed(*i, v))?;
Ok(self.flush()?)
other.iter_from(index, |(v, i)| self.blocked_push_if_needed(*i, v, exit))?;
Ok(self.blocked_flush(exit)?)
}
pub fn compute_inverse_less_to_more(
&mut self,
first_indexes: &mut StorableVec<T, I, SINGLE_THREAD>,
last_indexes: &mut StorableVec<T, I, SINGLE_THREAD>,
exit: &Exit,
) -> Result<()>
where
I: StoredType,
T: StoredIndex,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?;
first_indexes.iter_from(T::from(self.len()), |(value, first_index)| {
let first_index = Self::i_to_usize(*first_index)?;
let last_index = Self::i_to_usize(*last_indexes.get(value)?)?;
(first_index..last_index).try_for_each(|index| self.push_if_needed(I::from(index), value))
(first_index..last_index).try_for_each(|index| self.blocked_push_if_needed(I::from(index), value, exit))
})?;
Ok(self.flush()?)
Ok(self.blocked_flush(exit)?)
}
pub fn compute_last_index_from_first(
&mut self,
first_index_vec: &mut StorableVec<I, T, SINGLE_THREAD>,
first_indexes: &mut StorableVec<I, T, SINGLE_THREAD>,
final_len: usize,
exit: &Exit,
) -> Result<()>
where
T: Copy + From<usize> + Sub<T, Output = T> + StoredIndex,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version)?;
let one = T::from(1);
let mut prev_index: Option<I> = None;
first_index_vec.iter_from(I::from(self.len()), |(i, v)| {
first_indexes.iter_from(I::from(self.len()), |(i, v)| {
if let Some(prev_index) = prev_index {
self.push_if_needed(prev_index, *v - one)?;
self.blocked_push_if_needed(prev_index, *v - one, exit)?;
}
prev_index.replace(i);
Ok(())
})?;
if let Some(prev_index) = prev_index {
self.push_if_needed(prev_index, T::from(final_len) - one)?;
self.blocked_push_if_needed(prev_index, T::from(final_len) - one, exit)?;
}
Ok(self.flush()?)
Ok(self.blocked_flush(exit)?)
}
pub fn compute_count_from_indexes<T2>(
&mut self,
first_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
last_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
exit: &Exit,
) -> Result<()>
where
T: From<T2>,
T2: StoredType + Copy + Add<usize, Output = T2> + Sub<T2, Output = T2> + TryInto<T>,
<T2 as TryInto<T>>::Error: error::Error + 'static,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?;
first_indexes.iter_from(I::from(self.len()), |(i, first_index)| {
let last_index = last_indexes.get(i)?;
let count = *last_index + 1_usize - *first_index;
self.push_if_needed(i, count.into())
self.blocked_push_if_needed(i, count.into(), exit)
})?;
Ok(self.flush()?)
Ok(self.blocked_flush(exit)?)
}
pub fn compute_is_first_ordered<A>(
&mut self,
self_to_other: &mut StorableVec<I, A, SINGLE_THREAD>,
other_to_self: &mut StorableVec<A, I, SINGLE_THREAD>,
exit: &Exit,
) -> Result<()>
where
I: StoredType,
T: From<bool>,
A: StoredIndex + StoredType,
{
self.validate_or_reset(Version::from(0) + self.version + self_to_other.version + other_to_self.version)?;
self_to_other.iter_from(I::from(self.len()), |(i, other)| {
self.push_if_needed(i, T::from(other_to_self.get(*other)? == &i))
self.blocked_push_if_needed(i, T::from(other_to_self.get(*other)? == &i), exit)
})?;
Ok(self.flush()?)
Ok(self.blocked_flush(exit)?)
}
pub fn compute_sum_from_indexes<T2, F>(
&mut self,
first_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
last_indexes: &mut StorableVec<I, T2, SINGLE_THREAD>,
exit: &Exit,
) -> Result<()>
where
T: From<T2>,
@@ -649,12 +706,14 @@ where
<T2 as TryInto<T>>::Error: error::Error + 'static,
F: Fn(&T2) -> T,
{
self.validate_or_reset(Version::from(0) + self.version + first_indexes.version + last_indexes.version)?;
first_indexes.iter_from(I::from(self.len()), |(i, first_index)| {
let last_index = last_indexes.get(i)?;
let count = *last_index + 1_usize - *first_index;
self.push_if_needed(i, count.into())
self.blocked_push_if_needed(i, count.into(), exit)
})?;
Ok(self.flush()?)
Ok(self.blocked_flush(exit)?)
}
}

View File

@@ -1,12 +1,13 @@
use std::{
fs,
io::{self, Read},
ops::Add,
path::Path,
};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};
use crate::Error;
use crate::{Error, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, Immutable, KnownLayout)]
pub struct Version(u32);
@@ -19,6 +20,22 @@ impl Version {
pub fn swap_bytes(self) -> Self {
Self(self.0.swap_bytes())
}
pub fn validate(&self, path: &Path) -> Result<()> {
if let Ok(prev_version) = Version::try_from(path) {
if prev_version != *self {
if prev_version.swap_bytes() == *self {
return Err(Error::WrongEndian);
}
return Err(Error::DifferentVersion {
found: prev_version,
expected: *self,
});
}
}
Ok(())
}
}
impl From<u32> for Version {
@@ -35,3 +52,10 @@ impl TryFrom<&Path> for Version {
Ok(*(Self::ref_from_bytes(&buf)?))
}
}
impl Add<Version> for Version {
type Output = Version;
fn add(self, rhs: Version) -> Self::Output {
Self(self.0 + rhs.0)
}
}

View File

@@ -1,6 +1,6 @@
use std::{io, mem};
use std::mem;
use crate::{Result, StorableVec, STATELESS};
use crate::{Result, STATELESS, StorableVec};
use super::{StoredIndex, StoredType};
@@ -9,7 +9,7 @@ pub trait AnyStorableVec: Send + Sync {
fn index_type_to_string(&self) -> &str;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn flush(&mut self) -> io::Result<()>;
// fn flush(&mut self) -> io::Result<()>;
}
impl<I, T, const MODE: u8> AnyStorableVec for StorableVec<I, T, MODE>
@@ -33,9 +33,9 @@ where
self.is_empty()
}
fn flush(&mut self) -> io::Result<()> {
self.flush()
}
// fn flush(&mut self) -> io::Result<()> {
// self.flush()
// }
}
#[cfg(feature = "json")]