indexer: fix rollback by upgrading to fjall v2.6.6 (per conversation with the maintainer on Discord)

This commit is contained in:
nym21
2025-02-27 00:50:35 +01:00
parent 66b31a62d0
commit 677aca7a03
16 changed files with 172 additions and 187 deletions

View File

@@ -14,4 +14,3 @@ jiff = { workspace = true }
log = { workspace = true }
minreq = { workspace = true }
serde_json = { workspace = true }
storable_vec = { workspace = true }

View File

@@ -10,7 +10,6 @@ use brk_core::{Cents, OHLCCents, Timestamp};
use color_eyre::eyre::{ContextCompat, eyre};
use log::info;
use serde_json::Value;
use storable_vec::STATELESS;
use crate::{Close, Date, Dollars, High, Low, Open, Pricer, fetchers::retry};
@@ -39,7 +38,7 @@ impl Binance {
if self._1mn.is_none() || self._1mn.as_ref().unwrap().last_key_value().unwrap().0 <= &timestamp {
self._1mn.replace(Self::fetch_1mn()?);
}
Pricer::<STATELESS>::find_height_ohlc(
Pricer::find_height_ohlc(
self._1mn.as_ref().unwrap(),
timestamp,
previous_timestamp,
@@ -90,7 +89,7 @@ impl Binance {
if self.har.is_none() {
self.har.replace(self.read_har().unwrap_or_default());
}
Pricer::<STATELESS>::find_height_ohlc(self.har.as_ref().unwrap(), timestamp, previous_timestamp, "binance har")
Pricer::find_height_ohlc(self.har.as_ref().unwrap(), timestamp, previous_timestamp, "binance har")
}
fn read_har(&self) -> color_eyre::Result<BTreeMap<Timestamp, OHLCCents>> {

View File

@@ -4,7 +4,6 @@ use brk_core::{Cents, Close, Date, Dollars, High, Low, OHLCCents, Open, Timestam
use color_eyre::eyre::ContextCompat;
use log::info;
use serde_json::Value;
use storable_vec::STATELESS;
use crate::{Pricer, fetchers::retry};
@@ -23,7 +22,7 @@ impl Kraken {
if self._1mn.is_none() || self._1mn.as_ref().unwrap().last_key_value().unwrap().0 <= &timestamp {
self._1mn.replace(Self::fetch_1mn()?);
}
Pricer::<STATELESS>::find_height_ohlc(self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "kraken 1m")
Pricer::find_height_ohlc(self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "kraken 1m")
}
fn fetch_1mn() -> color_eyre::Result<BTreeMap<Timestamp, OHLCCents>> {

View File

@@ -1,25 +1,20 @@
use std::{
collections::BTreeMap,
fs,
path::{Path, PathBuf},
};
use std::{collections::BTreeMap, fs, path::Path};
use brk_core::{Cents, Close, Date, Dateindex, Dollars, Height, High, Low, OHLCCents, Open, Timestamp};
use brk_core::{Cents, Close, Date, Dollars, Height, High, Low, OHLCCents, Open, Timestamp};
use color_eyre::eyre::Error;
mod fetchers;
// use brk_indexer::Indexer;
pub use fetchers::*;
use storable_vec::{AnyJsonStorableVec, AnyStorableVec, SINGLE_THREAD, StorableVec, Version};
pub struct Pricer<const MODE: u8> {
pub struct Pricer {
binance: Binance,
kraken: Kraken,
kibo: Kibo,
}
impl<const MODE: u8> Pricer<MODE> {
impl Pricer {
pub fn import(path: &Path) -> color_eyre::Result<Self> {
fs::create_dir_all(path)?;
@@ -30,27 +25,6 @@ impl<const MODE: u8> Pricer<MODE> {
})
}
pub fn compute_if_needed(&mut self) {
// TODO: Remove all outdated
// indexer
// .vecs
// .height_to_timestamp
// .iter_from(Height::default(), |v| Ok(()));
// self.open
// .multi_insert_simple_transform(heights, dates, &mut self.ohlc, &|ohlc| ohlc.open);
// self.high
// .multi_insert_simple_transform(heights, dates, &mut self.ohlc, &|ohlc| ohlc.high);
// self.low
// .multi_insert_simple_transform(heights, dates, &mut self.ohlc, &|ohlc| ohlc.low);
// self.close
// .multi_insert_simple_transform(heights, dates, &mut self.ohlc, &|ohlc| ohlc.close);
}
fn get_date_ohlc(&mut self, date: Date) -> color_eyre::Result<OHLCCents> {
todo!();
// if self.ohlc.date.is_key_safe(date) {

View File

@@ -1,5 +1,4 @@
use brk_fetcher::{Binance, Kibo, Kraken};
use serde_json::Value;
use brk_fetcher::Binance;
fn main() -> color_eyre::Result<()> {
color_eyre::install()?;

View File

@@ -14,7 +14,7 @@ pub use brk_parser::*;
use bitcoin::{Transaction, TxIn, TxOut};
use color_eyre::eyre::{ContextCompat, eyre};
use hodor::Exit;
use hodor::Hodor;
use log::info;
use rayon::prelude::*;
use storable_vec::CACHED_GETS;
@@ -35,10 +35,9 @@ pub struct Indexer<const MODE: u8> {
impl<const MODE: u8> Indexer<MODE> {
pub fn import(indexes_dir: &Path) -> color_eyre::Result<Self> {
// info!("Increasing limit of opened files to 210_000...");
rlimit::setrlimit(
rlimit::Resource::NOFILE,
210_000,
rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap().0.max(210_000),
rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap().1,
)?;
@@ -51,7 +50,7 @@ impl<const MODE: u8> Indexer<MODE> {
}
impl Indexer<CACHED_GETS> {
pub fn index(&mut self, parser: &Parser, rpc: &'static rpc::Client, exit: &Exit) -> color_eyre::Result<()> {
pub fn index(&mut self, parser: &Parser, rpc: &'static rpc::Client, hodor: &Hodor) -> color_eyre::Result<()> {
let check_collisions = true;
let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc)).unwrap_or_else(|_| {
@@ -60,25 +59,29 @@ impl Indexer<CACHED_GETS> {
indexes
});
// dbg!(starting_indexes);
// panic!();
hodor.hold();
self.stores.rollback_if_needed(&self.vecs, &starting_indexes)?;
self.vecs.rollback_if_needed(&starting_indexes)?;
hodor.release();
exit.block();
self.stores.rollback(&self.vecs, &starting_indexes)?;
self.vecs.rollback(&starting_indexes)?;
exit.unblock();
let export_if_needed = |stores: &mut Fjalls,
vecs: &mut StorableVecs<CACHED_GETS>,
height: Height,
hodor: &Hodor|
-> color_eyre::Result<()> {
if height == 0 || height % SNAPSHOT_BLOCK_RANGE != 0 || hodor.triggered() {
return Ok(());
}
let export =
|stores: &mut Fjalls, vecs: &mut StorableVecs<CACHED_GETS>, height: Height| -> color_eyre::Result<()> {
info!("Exporting...");
exit.block();
stores.commit(height)?;
info!("Exported stores");
vecs.flush(height)?;
info!("Exported vecs");
exit.unblock();
Ok(())
};
info!("Exporting...");
hodor.hold();
stores.commit(height)?;
info!("Exported stores");
vecs.flush(height)?;
info!("Exported vecs");
hodor.release();
Ok(())
};
let vecs = &mut self.vecs;
let stores = &mut self.stores;
@@ -91,9 +94,8 @@ impl Indexer<CACHED_GETS> {
info!("Started indexing...");
parser.parse(Some(idxs.height), None)
.iter()
.try_for_each(|(height, block, blockhash)| -> color_eyre::Result<()> {
parser.parse(Some(idxs.height), None).iter().try_for_each(
|(height, block, blockhash)| -> color_eyre::Result<()> {
info!("Indexing block {height}...");
idxs.height = height;
@@ -115,8 +117,10 @@ impl Indexer<CACHED_GETS> {
.insert_if_needed(blockhash_prefix, height, height);
vecs.height_to_blockhash.push_if_needed(height, blockhash)?;
vecs.height_to_difficulty.push_if_needed(height, block.header.difficulty_float())?;
vecs.height_to_timestamp.push_if_needed(height, Timestamp::from(block.header.time))?;
vecs.height_to_difficulty
.push_if_needed(height, block.header.difficulty_float())?;
vecs.height_to_timestamp
.push_if_needed(height, Timestamp::from(block.header.time))?;
vecs.height_to_size.push_if_needed(height, block.total_size())?;
vecs.height_to_weight.push_if_needed(height, block.weight().into())?;
@@ -583,10 +587,12 @@ impl Indexer<CACHED_GETS> {
let only_known_dup_txids = [
bitcoin::Txid::from_str(
"d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599",
)?.into(),
)?
.into(),
bitcoin::Txid::from_str(
"e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468",
)?.into(),
)?
.into(),
];
let is_dup = only_known_dup_txids.contains(prev_txid);
@@ -623,18 +629,15 @@ impl Indexer<CACHED_GETS> {
idxs.push_future_if_needed(vecs)?;
let should_snapshot = height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 && !exit.blocked();
if should_snapshot {
export(stores, vecs, height)?;
}
export_if_needed(stores, vecs, height, hodor)?;
Ok(())
})?;
},
)?;
if idxs.height % SNAPSHOT_BLOCK_RANGE != 0 {
export(stores, vecs, idxs.height)?;
}
export_if_needed(stores, vecs, idxs.height, hodor)?;
// To make sure that Fjall had the time to flush everything properly
sleep(Duration::from_millis(100));
Ok(())

View File

@@ -5,21 +5,21 @@ use brk_parser::{
Parser,
rpc::{self},
};
use hodor::Exit;
use hodor::Hodor;
use log::info;
use storable_vec::CACHED_GETS;
fn main() -> color_eyre::Result<()> {
color_eyre::install()?;
brk_logger::init(None);
brk_logger::init(Some(Path::new(".log")));
let data_dir = Path::new("../../../bitcoin");
let rpc = Box::leak(Box::new(rpc::Client::new(
"http://localhost:8332",
rpc::Auth::CookieFile(Path::new(data_dir).join(".cookie")),
)?));
let exit = Exit::new();
let hodor = Hodor::new();
let parser = Parser::new(data_dir, rpc);
@@ -32,7 +32,7 @@ fn main() -> color_eyre::Result<()> {
let mut indexer: Indexer<CACHED_GETS> = Indexer::import(Path::new("../../_outputs/indexes"))?;
indexer.index(&parser, rpc, &exit)?;
indexer.index(&parser, rpc, &hodor)?;
info!("Took: {:?}", i.elapsed());

View File

@@ -1,6 +1,8 @@
use std::{
collections::{BTreeMap, BTreeSet},
error, mem,
error,
fmt::Debug,
mem,
path::Path,
};
@@ -27,25 +29,32 @@ const CHECK_COLLISISONS: bool = true;
impl<K, V> Store<K, V>
where
K: Into<ByteView> + Ord + Immutable + IntoBytes,
V: Into<ByteView> + TryFrom<ByteView>,
K: Debug + Into<ByteView> + Ord + Immutable + IntoBytes,
V: Debug + Into<ByteView> + TryFrom<ByteView>,
<V as TryFrom<ByteView>>::Error: error::Error + Send + Sync + 'static,
{
pub fn import(path: &Path, version: Version) -> color_eyre::Result<Self> {
let meta = StoreMeta::checked_open(path, version)?;
let keyspace = if let Ok(keyspace) = Self::open_keyspace(path) {
keyspace
} else {
meta.reset()?;
return Self::import(path, version);
let keyspace = match Self::open_keyspace(path) {
Ok(keyspace) => keyspace,
Err(e) => {
dbg!(e);
meta.reset()?;
return Self::import(path, version);
}
};
let part = if let Ok(part) = Self::open_partition_handle(&keyspace) {
part
} else {
drop(keyspace);
meta.reset()?;
return Self::import(path, version);
let part = match Self::open_partition_handle(&keyspace) {
Ok(part) => part,
Err(e) => {
dbg!(e);
drop(keyspace);
meta.reset()?;
return Self::import(path, version);
}
};
let rtx = keyspace.read_tx();
Ok(Self {
@@ -71,8 +80,8 @@ where
pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) {
if self.needs(height) {
if !self.dels.is_empty() {
unreachable!("Shouldn't reach this");
// self.dels.remove(&key);
unreachable!("Shouldn't reach this");
}
self.puts.insert(key, value);
}
@@ -83,7 +92,10 @@ where
unreachable!("Shouldn't reach this");
// self.puts.remove(&key);
}
self.dels.insert(key);
// dbg!(&key);
if !self.dels.insert(key) {
unreachable!();
}
}
pub fn commit(&mut self, height: Height) -> Result<()> {
@@ -101,8 +113,14 @@ where
mem::take(&mut self.puts).into_iter().for_each(|(key, value)| {
if CHECK_COLLISISONS {
#[allow(unused_must_use)]
if let Ok(Some(value)) = wtx.get(&self.part, key.as_bytes()) {
dbg!(value, &self.meta);
dbg!(
&key,
V::try_from(value.into()).unwrap(),
&self.meta,
self.rtx.get(&self.part, key.as_bytes())
);
unreachable!();
}
}

View File

@@ -84,9 +84,6 @@ impl StoreMeta {
path.join("height")
}
// fn read_length(&self) -> color_eyre::Result<usize> {
// Self::read_length_(&self.pathbuf)
// }
fn read_length_(path: &Path) -> color_eyre::Result<usize> {
Ok(fs::read(Self::path_length(path))
.map(|v| usize::read_from_bytes(v.as_slice()).unwrap_or_default())

View File

@@ -21,18 +21,27 @@ pub struct Fjalls {
impl Fjalls {
pub fn import(path: &Path) -> color_eyre::Result<Self> {
let addresshash_to_addressindex = Store::import(&path.join("addresshash_to_addressindex"), Version::from(1))?;
let blockhash_prefix_to_height = Store::import(&path.join("blockhash_prefix_to_height"), Version::from(1))?;
let txid_prefix_to_txindex = Store::import(&path.join("txid_prefix_to_txindex"), Version::from(1))?;
thread::scope(|scope| {
let addresshash_to_addressindex =
scope.spawn(|| Store::import(&path.join("addresshash_to_addressindex"), Version::from(1)));
let blockhash_prefix_to_height =
scope.spawn(|| Store::import(&path.join("blockhash_prefix_to_height"), Version::from(1)));
let txid_prefix_to_txindex =
scope.spawn(|| Store::import(&path.join("txid_prefix_to_txindex"), Version::from(1)));
Ok(Self {
addresshash_to_addressindex,
blockhash_prefix_to_height,
txid_prefix_to_txindex,
Ok(Self {
addresshash_to_addressindex: addresshash_to_addressindex.join().unwrap()?,
blockhash_prefix_to_height: blockhash_prefix_to_height.join().unwrap()?,
txid_prefix_to_txindex: txid_prefix_to_txindex.join().unwrap()?,
})
})
}
pub fn rollback(&mut self, vecs: &StorableVecs<CACHED_GETS>, starting_indexes: &Indexes) -> color_eyre::Result<()> {
pub fn rollback_if_needed(
&mut self,
vecs: &StorableVecs<CACHED_GETS>,
starting_indexes: &Indexes,
) -> color_eyre::Result<()> {
vecs.height_to_blockhash
.iter_from(starting_indexes.height, |(_, blockhash)| {
let blockhash = blockhash.as_ref();
@@ -41,12 +50,13 @@ impl Fjalls {
Ok(())
})?;
vecs.txindex_to_txid.iter_from(starting_indexes.txindex, |(_, txid)| {
let txid = txid.as_ref();
let txid_prefix = TxidPrefix::from(txid);
self.txid_prefix_to_txindex.remove(txid_prefix);
Ok(())
})?;
vecs.txindex_to_txid
.iter_from(starting_indexes.txindex, |(_txindex, txid)| {
let txid = txid.as_ref();
let txid_prefix = TxidPrefix::from(txid);
self.txid_prefix_to_txindex.remove(txid_prefix);
Ok(())
})?;
if let Some(index) = vecs.height_to_first_p2pk65index.get(starting_indexes.height)? {
let mut index = index.into_inner();

View File

@@ -48,14 +48,6 @@ where
fn path_height_(path: &Path) -> PathBuf {
path.join("height")
}
pub fn needs(&self, height: Height) -> bool {
self.height.is_none_or(|self_height| height > self_height)
}
#[allow(unused)]
pub fn has(&self, height: Height) -> bool {
!self.needs(height)
}
}
impl<I, T, const MODE: u8> Deref for StorableVec<I, T, MODE> {

View File

@@ -187,7 +187,7 @@ impl<const MODE: u8> StorableVecs<MODE> {
})
}
pub fn rollback(&mut self, starting_indexes: &Indexes) -> storable_vec::Result<()> {
pub fn rollback_if_needed(&mut self, starting_indexes: &Indexes) -> storable_vec::Result<()> {
let saved_height = starting_indexes.height.decremented();
// We don't want to override the starting indexes so we cut from n + 1

View File

@@ -15,7 +15,7 @@ color-eyre = { workspace = true }
derive_deref = { workspace = true }
jiff = { workspace = true }
log = { workspace = true }
oxc = { version = "0.52.0", features = ["codegen", "minifier"] }
oxc = { version = "0.53.0", features = ["codegen", "minifier"] }
serde = { workspace = true }
serde_json = { workspace = true }
storable_vec = { workspace = true }