diff --git a/Cargo.lock b/Cargo.lock index 70148638b..8d46b9174 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,11 +394,27 @@ version = "0.1.0" dependencies = [ "bindex", "biter", + "brice", "color-eyre", "derive_deref", "exit", "fjall", - "pricer", + "storable_vec", + "zerocopy 0.8.18", +] + +[[package]] +name = "brice" +version = "0.1.0" +dependencies = [ + "bindex", + "color-eyre", + "derive_deref", + "jiff", + "logger", + "minreq", + "serde", + "serde_json", "storable_vec", "zerocopy 0.8.18", ] @@ -1055,7 +1071,7 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls", + "rustls 0.23.23", "rustls-pki-types", "tokio", "tokio-rustls", @@ -1290,9 +1306,9 @@ checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jiff" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba926fdd8e5b5e7f9700355b0831d8c416afe94b014b1023424037a187c9c582" +checksum = "3590fea8e9e22d449600c9bbd481a8163bef223e4ff938e5f55899f8cf1adb93" dependencies = [ "jiff-tzdb-platform", "log", @@ -1480,8 +1496,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da0c420feb01b9fb5061f8c8f452534361dd783756dcf38ec45191ce55e7a161" dependencies = [ "log", + "once_cell", + "rustls 0.21.12", + "rustls-webpki 0.101.7", "serde", "serde_json", + "webpki-roots", ] [[package]] @@ -1575,9 +1595,9 @@ checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" [[package]] name = "openssl" -version = "0.10.70" +version = "0.10.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" +checksum = "5e14130c6a98cd258fdcb0fb6d744152343ff729cbfcb28c656a9d12b999fbcd" dependencies = [ "bitflags", "cfg-if", @@ -1607,9 +1627,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" -version = "0.9.105" +version = "0.9.106" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b22d5b84be05a8d6947c7cb71f7c849aa0f112acd4bf51c2a7c1c988ac0a9dc" +checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd" dependencies = [ "cc", "libc", @@ -1637,9 +1657,9 @@ checksum = "fb37767f6569cd834a413442455e0f066d0d522de8630436e2a1761d9726ba56" [[package]] name = "oxc" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d064edaa54040423cb94b283026dfcd679b29c26c2f070021dc268cc96fb155" +checksum = "c9f57beb511c2a1848baf900782d3975f89dbf75fc47ad2207147acf95523ab0" dependencies = [ "oxc_allocator", "oxc_ast", @@ -1680,9 +1700,9 @@ dependencies = [ [[package]] name = "oxc_allocator" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "417522e84d8fdc87cab7bfc95b6419757c3d2f2013b78491c0295135d3516945" +checksum = "5a6d450673da14c60c6946deb0c06d68377d2d6f2ee41ad5b462fa9894001560" dependencies = [ "allocator-api2", "bumpalo", @@ -1693,9 +1713,9 @@ dependencies = [ [[package]] name = "oxc_ast" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e0b6014198425ee79e1e0c71c79fc874f3a7c7007043d9ce1b353f740de488" +checksum = "692c74b135e66d3fa13312d886f4a307131a5bd2fd76e5a4137825c6161b56a5" dependencies = [ "bitflags", "cow-utils", @@ -1710,9 +1730,9 @@ dependencies = [ [[package]] name = "oxc_ast_macros" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a58764050b69c30bdf45a6f803bff212306ba9ac50277f21a231ff6e110da682" +checksum = "d4759d0523a72576036df640954ac9c4ce995f39f290cfc723221cc45ac7b11a" dependencies = [ "proc-macro2", "quote", @@ -1721,9 +1741,9 @@ dependencies = [ [[package]] name = "oxc_cfg" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "621ea878d0e487215822fe61c98afaa337e79f8290352a95bd8d37c45eab0484" +checksum = "c58e5838966f971c24e8be8c34fb901e6e8a48211ea1db6d66c76fbb77b58d6c" dependencies = [ "bitflags", "itertools", @@ -1736,9 +1756,9 @@ dependencies = [ [[package]] name = "oxc_codegen" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f68faeabfca1a9ad49b26939766185e87f4d03d08d917b465b1956db7d2ac466" +checksum = "b6e298be4a113f57749ed51a38dc253671eaed418a5de6dbd2bf29ff34c4f595" dependencies = [ "assert-unchecked", "bitflags", @@ -1758,9 +1778,9 @@ dependencies = [ [[package]] name = "oxc_data_structures" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f3c3ebb1b248fa1a5ea965b7398eb0533f45545c6cbe28cd581f57211413096" +checksum = "8e87ee74753f20549e9d6bcc00e956f5ab67fde4a23e383fd4f7d8ad752d71d3" dependencies = [ "assert-unchecked", "ropey", @@ -1768,9 +1788,9 @@ dependencies = [ [[package]] name = "oxc_diagnostics" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae69c956abfbadf5c31f08dc2b901bcefadb87a472106e13ae2b6a01135c66c2" +checksum = "e27133a883a4f5d9796259a3ca6ab28ba8c097ee32cd58fc794688e205f66727" dependencies = [ "cow-utils", "oxc-miette", @@ -1778,10 +1798,11 @@ dependencies = [ [[package]] name = "oxc_ecmascript" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2f773d18674b21240b42a7c8bcb3522de09b8841993620286e87812d7e7866a" +checksum = "16a7535550ce59cc7373d60d13e5fb32eee1817ba9dcfa3df7a1ae96bdb0945e" dependencies = [ + "cow-utils", "num-bigint", "num-traits", "oxc_ast", @@ -1791,9 +1812,9 @@ dependencies = [ [[package]] name = "oxc_estree" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"244fd8ba7157c8f12910024373d4e3665b6a35fb5947edae1d4a6c084ac8e0c3" +checksum = "a817a58818b63a5b26a15527e9c7f09c93ba1a95f570ae75e5e84bd3dee047de" [[package]] name = "oxc_index" @@ -1803,9 +1824,9 @@ checksum = "5eca5d9726cd0a6e433debe003b7bc88b2ecad0bb6109f0cef7c55e692139a34" [[package]] name = "oxc_mangler" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73839b44f57bdc707e63c3558d4da368e904960f13db1639c6d6c5e2e7e4ab8d" +checksum = "40dc09dd18e8c7cfaa85ef14f2634b81439de7ac900d579dcc3d6885c0847895" dependencies = [ "fixedbitset", "itertools", @@ -1819,9 +1840,9 @@ dependencies = [ [[package]] name = "oxc_minifier" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba22e1818002180efb9e023422a56d67b4baaa537deaf61d23d8ea6893ddcdd7" +checksum = "5c8987515e26c6765bf10c12073cd1d135bd99f531fcb2bcae399b8590e3dd44" dependencies = [ "cow-utils", "oxc_allocator", @@ -1840,9 +1861,9 @@ dependencies = [ [[package]] name = "oxc_parser" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9d2ce3556a0ec6ee45da6936c5f04d97e6f00ef7ddca08dcc473094a21cfed" +checksum = "6410afa5ad3cde657f26803f828726b8d3b475131299faf5c7948299892a560c" dependencies = [ "assert-unchecked", "bitflags", @@ -1863,9 +1884,9 @@ dependencies = [ [[package]] name = "oxc_regular_expression" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0e6d43914ff6758f62cf150f12d068a737fd464735d6b83ecefc7136f65e685" +checksum = "9711b8c74f28de9a295bc03baf7d61abf47af35556156e49f3a44fa572194325" dependencies = [ "oxc_allocator", "oxc_ast_macros", @@ -1879,9 +1900,9 @@ dependencies = [ [[package]] name = "oxc_semantic" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1e08d8bb5afc64dfc05e9f3b573c9af8b23a9a24aceb2bb1634d1f131717b736" +checksum = "18e0eac139ab28789789f993fa4955c386454d7c34c1adfe44448c1cb7769cf0" dependencies = [ "assert-unchecked", "itertools", @@ -1915,9 +1936,9 @@ dependencies = [ [[package]] name = "oxc_span" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ae8953d32ade84e814d154ee046f0f08b2b236e12f550545a690454ab2a102c" +checksum = "f8c7d6b92b30eea1fdc8907c6dfe8525b75a815024d6567628697789076fb7e1" dependencies = [ "compact_str", "oxc-miette", @@ -1928,9 +1949,9 @@ dependencies = [ [[package]] name = "oxc_syntax" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f375deb505b7fad6ed2125fdb92ffa8d550d866ca2ab0dfd24e681b125883c1" +checksum = "9540bf5b53cb0cd81dbdf07b25834a5064b762e6a330ca35d0d424a581a442b8" dependencies = [ "assert-unchecked", "bitflags", @@ -1949,9 +1970,9 @@ dependencies = [ [[package]] name = "oxc_traverse" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a4d049215b81048597d120851e1e797331be4f45ae2bb39ae2855c3be76fcc3" +checksum = "9583a428473921f6faa890fd9b564d8df59832d6638b03f4a30779a15d22e612" dependencies = [ "compact_str", "itoa", @@ -2106,22 +2127,6 @@ dependencies = [ "zerocopy 0.7.35", ] -[[package]] -name = "pricer" -version = "0.1.0" -dependencies = [ - "bindex", - "color-eyre", - "derive_deref", - "jiff", - "logger", - "reqwest", - "serde", - "serde_json", - "storable_vec", - "zerocopy 0.8.18", -] - [[package]] name = "proc-macro2" version = "1.0.93" @@ -2250,7 +2255,6 @@ version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ - "async-compression", "base64 0.22.1", "bytes", "encoding_rs", @@ -2281,7 +2285,6 @@ dependencies = [ "system-configuration", 
"tokio", "tokio-native-tls", - "tokio-util", "tower", "tower-service", "url", @@ -2349,6 +2352,18 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.23" @@ -2357,7 +2372,7 @@ checksum = "47796c98c480fce5406ef69d1c76378375492c3b0a0de587be0c1d9feb12f395" dependencies = [ "once_cell", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] @@ -2377,6 +2392,16 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -2402,9 +2427,9 @@ checksum = "6ea1a2d0a644769cc99faa24c3ad26b379b786fe7c36fd3c546254801650e6dd" [[package]] name = "ryu-js" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad97d4ce1560a5e27cec89519dc8300d1aa6035b099821261c651486a19e44d5" +checksum = "dd29631678d6fb0903b69223673e122c32e9ae559d0960a38d574695ebc0ea15" [[package]] name = "schannel" @@ -2421,6 +2446,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + 
"untrusted", +] + [[package]] name = "secp256k1" version = "0.29.1" @@ -2630,6 +2665,7 @@ name = "storable_vec" version = "0.1.2" dependencies = [ "memmap2", + "rayon", "serde", "serde_json", "zerocopy 0.8.18", @@ -2712,9 +2748,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.16.0" +version = "3.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38c246215d7d24f48ae091a2902398798e05d978b24315d6efbc00ede9a8bb91" +checksum = "22e5a0acb1f3f55f65cc4a866c361b2fb2a0ff6366785ae6fbb5f85df07ba230" dependencies = [ "cfg-if", "fastrand", @@ -2820,7 +2856,7 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f6d0975eaace0cf0fcadee4e4aaa5da15b5c079146f2cffb67c113be122bf37" dependencies = [ - "rustls", + "rustls 0.23.23", "tokio", ] @@ -3143,6 +3179,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 68e1b0483..1356c50c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,10 +20,10 @@ exit = { path = "exit" } fjall = "2.6.3" indexer = { path = "indexer", package = "bindex" } iterator = { path = "iterator", package = "biter" } -jiff = "0.2.0" +jiff = "0.2.1" logger = { path = "logger" } rayon = "1.10.0" -pricer = { path = "pricer" } +pricer = { path = "pricer", package = "brice" } rlimit = { version = "0.10.2" } serde = { version = "1.0.217", features = ["derive"] } serde_json = { version = "1.0.138", features = ["float_roundtrip"] } diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 6b68f9873..ea8f92c2a 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "bindex" +description = "A bitcoin-core indexer" version = "0.1.0" edition = "2021" diff --git 
a/indexer/README.md b/indexer/README.md new file mode 100644 index 000000000..4d5c31bf9 --- /dev/null +++ b/indexer/README.md @@ -0,0 +1,19 @@ +# Indexer + +A [Bitcoin Core](https://bitcoincore.org/en/about/) node indexer which iterates over the chain (via `../iterator`) and creates a database of the vecs (`../storable_vec`) and key/value stores ([`fjall`](https://crates.io/crates/fjall)) that can be used in your Rust code. + +The crate only stores the bare minimum to be self-sufficient and not have to use an RPC client (except for scripts which are not stored). If you need more data, check out `../computer`, which uses the outputs from the indexer to compute a whole range of datasets. + +The neat thing about using simple vecs to store data is that you can parse the outputted files with another programming language such as Python very simply; you'll find an example below. + +## Usage + +Rust: `src/main.rs` + +Python: `../python/parse.py` + +## Outputs + +Vecs: `src/storage/storable_vecs/mod.rs` + +Stores: `src/storage/fjalls/mod.rs` diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 98fa77fe0..2ecd7dd9d 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -22,7 +22,6 @@ pub use storage::{AnyStorableVec, StorableVec, Store, StoreMeta}; use storage::{Fjalls, StorableVecs}; pub use structs::*; -const UNSAFE_BLOCKS: u32 = 1000; const SNAPSHOT_BLOCK_RANGE: usize = 1000; pub struct Indexer { @@ -43,68 +42,42 @@ impl Indexer { pub fn index(&mut self, bitcoin_dir: &Path, rpc: rpc::Client, exit: &Exit) -> color_eyre::Result<()> { let check_collisions = true; - let vecs = &mut self.vecs; - let trees = &mut self.trees; + let starting_indexes = Indexes::try_from((&mut self.vecs, &self.trees, &rpc)).unwrap_or_else(|_| { + let indexes = Indexes::default(); + indexes.push_if_needed(&mut self.vecs).unwrap(); + indexes + }); - let mut height = vecs - .min_height() - .unwrap_or_default() - .min(trees.min_height()) - .and_then(|h| h.checked_sub(UNSAFE_BLOCKS)) -
.map(Height::from) - .unwrap_or_default(); - - let mut txindex_global = vecs.height_to_first_txindex.get_or_default(height)?; - let mut txinindex_global = vecs.height_to_first_txinindex.get_or_default(height)?; - let mut txoutindex_global = vecs.height_to_first_txoutindex.get_or_default(height)?; - let mut addressindex_global = vecs.height_to_first_addressindex.get_or_default(height)?; - let mut emptyindex_global = vecs.height_to_first_emptyindex.get_or_default(height)?; - let mut multisigindex_global = vecs.height_to_first_multisigindex.get_or_default(height)?; - let mut opreturnindex_global = vecs.height_to_first_opreturnindex.get_or_default(height)?; - let mut pushonlyindex_global = vecs.height_to_first_pushonlyindex.get_or_default(height)?; - let mut unknownindex_global = vecs.height_to_first_unknownindex.get_or_default(height)?; - let mut p2pk33index_global = vecs.height_to_first_p2pk33index.get_or_default(height)?; - let mut p2pk65index_global = vecs.height_to_first_p2pk65index.get_or_default(height)?; - let mut p2pkhindex_global = vecs.height_to_first_p2pkhindex.get_or_default(height)?; - let mut p2shindex_global = vecs.height_to_first_p2shindex.get_or_default(height)?; - let mut p2trindex_global = vecs.height_to_first_p2trindex.get_or_default(height)?; - let mut p2wpkhindex_global = vecs.height_to_first_p2wpkhindex.get_or_default(height)?; - let mut p2wshindex_global = vecs.height_to_first_p2wshindex.get_or_default(height)?; + exit.block(); + self.trees.rollback(&self.vecs, &starting_indexes)?; + self.vecs.rollback(&starting_indexes)?; + exit.unblock(); let export = |trees: &mut Fjalls, vecs: &mut StorableVecs, height: Height| -> color_eyre::Result<()> { info!("Exporting..."); - exit.block(); - - thread::scope(|scope| -> color_eyre::Result<()> { - let vecs_handle = scope.spawn(|| vecs.flush(height)); - trees.commit(height)?; - vecs_handle.join().unwrap()?; - Ok(()) - })?; - + trees.commit(height)?; + vecs.flush(height)?; exit.unblock(); - Ok(()) }; - 
iterator::new(bitcoin_dir, Some(height.into()), Some(400_000), rpc) + let vecs = &mut self.vecs; + let trees = &mut self.trees; + + let mut idxs = starting_indexes; + + iterator::new(bitcoin_dir, Some(idxs.height.into()), None, rpc) .iter() .try_for_each(|(_height, block, blockhash)| -> color_eyre::Result<()> { info!("Indexing block {_height}..."); + let height = Height::from(_height); + idxs.height = height; + let blockhash = BlockHash::from(blockhash); - height = Height::from(_height); - - if let Some(saved_blockhash) = vecs.height_to_blockhash.get(height)? { - if &blockhash != saved_blockhash.as_ref() { - todo!("Rollback not implemented"); - // trees.rollback_from(&mut rtx, height, &exit)?; - } - } - - let blockhash_prefix = BlockHashPrefix::try_from(&blockhash)?; + let blockhash_prefix = BlockHashPrefix::from(&blockhash); if trees .blockhash_prefix_to_height @@ -124,30 +97,6 @@ impl Indexer { vecs.height_to_timestamp.push_if_needed(height, Timestamp::from(block.header.time))?; vecs.height_to_size.push_if_needed(height, block.total_size())?; vecs.height_to_weight.push_if_needed(height, block.weight().into())?; - vecs.height_to_first_txindex.push_if_needed(height, txindex_global)?; - vecs.height_to_first_txinindex - .push_if_needed(height, txinindex_global)?; - vecs.height_to_first_txoutindex - .push_if_needed(height, txoutindex_global)?; - vecs.height_to_first_addressindex - .push_if_needed(height, addressindex_global)?; - vecs.height_to_first_emptyindex - .push_if_needed(height, emptyindex_global)?; - vecs.height_to_first_multisigindex - .push_if_needed(height, multisigindex_global)?; - vecs.height_to_first_opreturnindex - .push_if_needed(height, opreturnindex_global)?; - vecs.height_to_first_pushonlyindex - .push_if_needed(height, pushonlyindex_global)?; - vecs.height_to_first_unknownindex - .push_if_needed(height, unknownindex_global)?; - vecs.height_to_first_p2pk33index.push_if_needed(height, p2pk33index_global)?; - 
vecs.height_to_first_p2pk65index.push_if_needed(height, p2pk65index_global)?; - vecs.height_to_first_p2pkhindex.push_if_needed(height, p2pkhindex_global)?; - vecs.height_to_first_p2shindex.push_if_needed(height, p2shindex_global)?; - vecs.height_to_first_p2trindex.push_if_needed(height, p2trindex_global)?; - vecs.height_to_first_p2wpkhindex.push_if_needed(height, p2wpkhindex_global)?; - vecs.height_to_first_p2wshindex.push_if_needed(height, p2wshindex_global)?; let inputs = block .txdata @@ -191,7 +140,7 @@ impl Indexer { .map(|(index, tx)| -> color_eyre::Result<_> { let txid = Txid::from(tx.compute_txid()); - let txid_prefix = TxidPrefix::try_from(&txid)?; + let txid_prefix = TxidPrefix::from(&txid); let prev_txindex_opt = if check_collisions && trees.txid_prefix_to_txindex.needs(height) { @@ -224,8 +173,8 @@ impl Indexer { .into_par_iter() .enumerate() .map(|(block_txinindex, (block_txindex, vin, txin, tx))| -> color_eyre::Result<(Txinindex, InputSource)> { - let txindex = txindex_global + block_txindex; - let txinindex = txinindex_global + Txinindex::from(block_txinindex); + let txindex = idxs.txindex + block_txindex; + let txinindex = idxs.txinindex + Txinindex::from(block_txinindex); // dbg!((txindex, txinindex, vin)); @@ -238,15 +187,15 @@ impl Indexer { let prev_txindex = if let Some(txindex) = trees .txid_prefix_to_txindex - .get(&TxidPrefix::try_from(&txid)?)? + .get(&TxidPrefix::from(&txid))? 
.map(|v| *v) .and_then(|txindex| { // Checking if not finding txindex from the future - (txindex < txindex_global).then_some(txindex) + (txindex < idxs.txindex).then_some(txindex) }) { txindex } else { - // dbg!(txindex_global + block_txindex, txindex, txin, vin); + // dbg!(indexes.txindex + block_txindex, txindex, txin, vin); return Ok((txinindex, InputSource::SameBlock((tx, txindex, txin, vin)))); }; @@ -301,8 +250,8 @@ impl Indexer { &Transaction, ), )> { - let txindex = txindex_global + block_txindex; - let txoutindex = txoutindex_global + Txoutindex::from(block_txoutindex); + let txindex = idxs.txindex + block_txindex; + let txoutindex = idxs.txoutindex + Txoutindex::from(block_txoutindex); let script = &txout.script_pubkey; @@ -321,7 +270,7 @@ impl Indexer { .map(|v| *v) // Checking if not in the future .and_then(|addressindex_local| { - (addressindex_local < addressindex_global).then_some(addressindex_local) + (addressindex_local < idxs.addressindex).then_some(addressindex_local) }) }); @@ -359,7 +308,7 @@ impl Indexer { prev_addresstype, prev_addressbytes, addressbytes, - addressindex_global, + idxs.addressindex, addressindex, addresstypeindex, txout, @@ -448,7 +397,7 @@ impl Indexer { vecs.txoutindex_to_value.push_if_needed(txoutindex, sats)?; - let mut addressindex = addressindex_global; + let mut addressindex = idxs.addressindex; let mut addresshash = None; @@ -464,21 +413,21 @@ impl Indexer { }) { addressindex = addressindex_local; } else { - addressindex_global.increment(); + idxs.addressindex.increment(); let addresstypeindex = match addresstype { - Addresstype::Empty => emptyindex_global.copy_then_increment(), - Addresstype::Multisig => multisigindex_global.copy_then_increment(), - Addresstype::OpReturn => opreturnindex_global.copy_then_increment(), - Addresstype::PushOnly => pushonlyindex_global.copy_then_increment(), - Addresstype::Unknown => unknownindex_global.copy_then_increment(), - Addresstype::P2PK65 => 
p2pk65index_global.copy_then_increment(), - Addresstype::P2PK33 => p2pk33index_global.copy_then_increment(), - Addresstype::P2PKH => p2pkhindex_global.copy_then_increment(), - Addresstype::P2SH => p2shindex_global.copy_then_increment(), - Addresstype::P2WPKH => p2wpkhindex_global.copy_then_increment(), - Addresstype::P2WSH => p2wshindex_global.copy_then_increment(), - Addresstype::P2TR => p2trindex_global.copy_then_increment(), + Addresstype::Empty => idxs.emptyindex.copy_then_increment(), + Addresstype::Multisig => idxs.multisigindex.copy_then_increment(), + Addresstype::OpReturn => idxs.opreturnindex.copy_then_increment(), + Addresstype::PushOnly => idxs.pushonlyindex.copy_then_increment(), + Addresstype::Unknown => idxs.unknownindex.copy_then_increment(), + Addresstype::P2PK65 => idxs.p2pk65index.copy_then_increment(), + Addresstype::P2PK33 => idxs.p2pk33index.copy_then_increment(), + Addresstype::P2PKH => idxs.p2pkhindex.copy_then_increment(), + Addresstype::P2SH => idxs.p2shindex.copy_then_increment(), + Addresstype::P2WPKH => idxs.p2wpkhindex.copy_then_increment(), + Addresstype::P2WSH => idxs.p2wshindex.copy_then_increment(), + Addresstype::P2TR => idxs.p2trindex.copy_then_increment(), }; vecs.addressindex_to_addresstype @@ -537,13 +486,13 @@ impl Indexer { let vout = Vout::from(outpoint.vout); let block_txindex = txid_prefix_to_txid_and_block_txindex_and_prev_txindex - .get(&TxidPrefix::try_from(&txid)?) + .get(&TxidPrefix::from(&txid)) .context("txid should be in same block").inspect_err(|_| { dbg!(&txid_prefix_to_txid_and_block_txindex_and_prev_txindex); // panic!(); })? 
.2; - let prev_txindex = txindex_global + block_txindex; + let prev_txindex = idxs.txindex + block_txindex; let prev_txoutindex = new_txindexvout_to_txoutindex .remove(&(prev_txindex, vout)) @@ -577,7 +526,7 @@ impl Indexer { .into_iter() .try_for_each( |(txid_prefix, (tx, txid, index, prev_txindex_opt))| -> color_eyre::Result<()> { - let txindex = txindex_global + index; + let txindex = idxs.txindex + index; txindex_to_tx_and_txid.insert(txindex, (tx, txid)); @@ -640,12 +589,19 @@ impl Indexer { vecs.txindex_to_txid.push_if_needed(txindex, txid)?; vecs.txindex_to_height.push_if_needed(txindex, height)?; vecs.txindex_to_locktime.push_if_needed(txindex, tx.lock_time.into())?; + // tx.base_size() + // tx.total_size() + // tx.is_explicitly_rbf() + // tx.weight() + // tx.vsize() in computer as it can be computed from the weight Ok(()) })?; - txindex_global += Txindex::from(tx_len); - txinindex_global += Txinindex::from(inputs_len); - txoutindex_global += Txoutindex::from(outputs_len); + idxs.txindex += Txindex::from(tx_len); + idxs.txinindex += Txinindex::from(inputs_len); + idxs.txoutindex += Txoutindex::from(outputs_len); + + idxs.push_future_if_needed(vecs)?; let should_snapshot = _height != 0 && _height % SNAPSHOT_BLOCK_RANGE == 0 && !exit.blocked(); if should_snapshot { @@ -655,7 +611,7 @@ impl Indexer { Ok(()) })?; - export(trees, vecs, height)?; + export(trees, vecs, idxs.height)?; sleep(Duration::from_millis(100)); diff --git a/indexer/src/storage/fjalls/base.rs b/indexer/src/storage/fjalls/base.rs index 6d6030f2e..7f75d1af8 100644 --- a/indexer/src/storage/fjalls/base.rs +++ b/indexer/src/storage/fjalls/base.rs @@ -1,4 +1,8 @@ -use std::{collections::BTreeMap, error, mem, path::Path}; +use std::{ + collections::{BTreeMap, BTreeSet}, + error, mem, + path::Path, +}; use fjall::{ PartitionCreateOptions, PersistMode, ReadTransaction, Result, Slice, TransactionalKeyspace, @@ -17,8 +21,11 @@ pub struct Store { part: TransactionalPartitionHandle, rtx: 
ReadTransaction, puts: BTreeMap, + dels: BTreeSet, } +const CHECK_COLLISISONS: bool = true; + impl Store where K: Into + Ord + Immutable + IntoBytes, @@ -48,6 +55,7 @@ where part, rtx, puts: BTreeMap::new(), + dels: BTreeSet::new(), }) } @@ -63,21 +71,44 @@ where pub fn insert_if_needed(&mut self, key: K, value: V, height: Height) { if self.needs(height) { + if !self.dels.is_empty() { + unreachable!("Shouldn't reach this"); + // self.dels.remove(&key); + } self.puts.insert(key, value); } } + pub fn remove(&mut self, key: K) { + if !self.puts.is_empty() { + unreachable!("Shouldn't reach this"); + // self.puts.remove(&key); + } + self.dels.insert(key); + } + pub fn commit(&mut self, height: Height) -> Result<()> { - if self.has(height) && self.puts.is_empty() { + if self.has(height) && self.puts.is_empty() && self.dels.is_empty() { return Ok(()); } self.meta.export(self.len(), height)?; let mut wtx = self.keyspace.write_tx(); - mem::take(&mut self.puts) + + mem::take(&mut self.dels) .into_iter() - .for_each(|(key, value)| wtx.insert(&self.part, key, value)); + .for_each(|key| wtx.remove(&self.part, key)); + + mem::take(&mut self.puts).into_iter().for_each(|(key, value)| { + if CHECK_COLLISISONS { + if let Ok(Some(value)) = wtx.get(&self.part, key.as_bytes()) { + dbg!(value); + unreachable!(); + } + } + wtx.insert(&self.part, key, value) + }); wtx.commit()?; @@ -88,12 +119,12 @@ where Ok(()) } - pub fn height(&self) -> Option<&Height> { + pub fn height(&self) -> Option { self.meta.height() } pub fn len(&self) -> usize { - self.meta.len() + self.puts.len() + self.meta.len() + self.puts.len() - self.dels.len() } pub fn is_empty(&self) -> bool { self.len() == 0 diff --git a/indexer/src/storage/fjalls/meta.rs b/indexer/src/storage/fjalls/meta.rs index 35f7bf24d..b9a79c38f 100644 --- a/indexer/src/storage/fjalls/meta.rs +++ b/indexer/src/storage/fjalls/meta.rs @@ -67,8 +67,8 @@ impl StoreMeta { path.join("version") } - pub fn height(&self) -> Option<&Height> { - 
self.height.as_ref() + pub fn height(&self) -> Option { + self.height } pub fn needs(&self, height: Height) -> bool { self.height.is_none_or(|self_height| height > self_height) diff --git a/indexer/src/storage/fjalls/mod.rs b/indexer/src/storage/fjalls/mod.rs index 4c6724929..7b2a0c029 100644 --- a/indexer/src/storage/fjalls/mod.rs +++ b/indexer/src/storage/fjalls/mod.rs @@ -1,16 +1,19 @@ use std::{path::Path, thread}; -use storable_vec::Version; +use storable_vec::{Value, Version, CACHED_GETS}; -use crate::{AddressHash, Addressindex, BlockHashPrefix, Height, TxidPrefix, Txindex}; +use crate::{ + AddressHash, Addressbytes, Addressindex, Addresstype, BlockHashPrefix, Height, Indexes, TxidPrefix, Txindex, +}; mod base; mod meta; -// mod version; pub use base::*; pub use meta::*; +use super::StorableVecs; + pub struct Fjalls { pub addresshash_to_addressindex: Store, pub blockhash_prefix_to_height: Store, @@ -30,135 +33,135 @@ impl Fjalls { }) } - // pub fn rollback_from( - // &mut self, - // _wtx: &mut WriteTransaction, - // _height: Height, - // _exit: &Exit, - // ) -> color_eyre::Result<()> { - // panic!(); - // let mut txindex = None; + pub fn rollback(&mut self, vecs: &StorableVecs, starting_indexes: &Indexes) -> color_eyre::Result<()> { + vecs.height_to_blockhash + .iter_from(starting_indexes.height, |(_, blockhash)| { + let blockhash = blockhash.as_ref(); + let blockhash_prefix = BlockHashPrefix::from(blockhash); + self.blockhash_prefix_to_height.remove(blockhash_prefix); + Ok(()) + })?; - // wtx.range(self.height_to_blockhash.data(), Slice::from(height)..) 
- // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (height_slice, slice_blockhash) = slice?; - // let blockhash = BlockHash::from_slice(&slice_blockhash)?; + vecs.txindex_to_txid.iter_from(starting_indexes.txindex, |(_, txid)| { + let txid = txid.as_ref(); + let txid_prefix = TxidPrefix::from(txid); + self.txid_prefix_to_txindex.remove(txid_prefix); + Ok(()) + })?; - // wtx.remove(self.height_to_blockhash.data(), height_slice); + vecs.height_to_first_p2pk65index + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2pk65index_to_p2pk65addressbytes + .get(index.into_inner())? + .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2PK65)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // wtx.remove(self.blockhash_prefix_to_height.data(), blockhash.prefix()); + vecs.height_to_first_p2pk33index + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2pk33index_to_p2pk33addressbytes + .get(index.into_inner())? + .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2PK33)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // if txindex.is_none() { - // txindex.replace( - // wtx.get(self.height_to_first_txindex.data(), height_slice)? - // .context("for height to have first txindex")?, - // ); - // } - // wtx.remove(self.height_to_first_txindex.data(), height_slice); - // wtx.remove(self.height_to_last_txindex.data(), height_slice); + vecs.height_to_first_p2pkhindex + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2pkhindex_to_p2pkhaddressbytes + .get(index.into_inner())? 
+ .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2PKH)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // Ok(()) - // })?; + vecs.height_to_first_p2shindex + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2shindex_to_p2shaddressbytes + .get(index.into_inner())? + .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2SH)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // let txindex = txindex.context("txindex to not be none by now")?; + vecs.height_to_first_p2trindex + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2trindex_to_p2traddressbytes + .get(index.into_inner())? + .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2TR)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // wtx.range(self.txindex_to_txid.data(), Slice::from(txindex)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (slice_txindex, slice_txid) = slice?; - // let txindex = Txindex::from(slice_txindex); - // let txid = Txid::from_slice(&slice_txid)?; + vecs.height_to_first_p2wpkhindex + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2wpkhindex_to_p2wpkhaddressbytes + .get(index.into_inner())? 
+ .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2WPKH)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // wtx.remove(self.txindex_to_txid.data(), Slice::from(txindex)); - // wtx.remove(self.txindex_to_height.data(), Slice::from(txindex)); - // wtx.remove(self.txid_prefix_to_txindex.data(), txid.prefix()); + vecs.height_to_first_p2wshindex + .iter_from(starting_indexes.height, |(_, index)| { + if let Some(typedbytes) = vecs + .p2wshindex_to_p2wshaddressbytes + .get(index.into_inner())? + .map(Value::into_inner) + { + let bytes = Addressbytes::from(typedbytes); + let hash = AddressHash::from((&bytes, Addresstype::P2WSH)); + self.addresshash_to_addressindex.remove(hash); + } + Ok(()) + })?; - // Ok(()) - // })?; + self.commit(starting_indexes.height.decremented())?; - // let txoutindex = Txoutindex::from(txindex); + Ok(()) + } - // let mut addressindexes = BTreeSet::new(); - - // wtx.range(self.txoutindex_to_amount.data(), Slice::from(txoutindex)..) - // .try_for_each(|slice| -> color_eyre::Result<()> { - // let (txoutindex_slice, _) = slice?; - - // wtx.remove(self.txoutindex_to_amount.data(), txoutindex_slice); - - // if let Some(addressindex_slice) = - // wtx.get(self.txoutindex_to_addressindex.data(), txoutindex_slice)? 
- // { - // wtx.remove(self.txoutindex_to_addressindex.data(), txoutindex_slice); - - // let addressindex = Addressindex::from(addressindex_slice); - // addressindexes.insert(addressindex); - - // let txoutindex = Txoutindex::from(txoutindex_slice); - // let addresstxoutindex = Addresstxoutindex::from((addressindex, txoutindex)); - - // wtx.remove( - // self.addressindex_to_txoutindexes.data(), - // Slice::from(addresstxoutindex), - // ); - // } - - // Ok(()) - // })?; - - // addressindexes - // .into_iter() - // .filter(|addressindex| { - // let is_empty = wtx - // .prefix( - // self.addressindex_to_txoutindexes.data(), - // Slice::from(*addressindex), - // ) - // .next() - // .is_none(); - // is_empty - // }) - // .try_for_each(|addressindex| -> color_eyre::Result<()> { - // let addressindex_slice = Slice::from(addressindex); - - // let addressbytes = Addressbytes::from( - // wtx.get( - // self.addressindex_to_addressbytes.data(), - // &addressindex_slice, - // )? - // .context("addressindex_to_address to have value")?, - // ); - // wtx.remove( - // self.addressbytes_prefix_to_addressindex.data(), - // addressbytes.prefix(), - // ); - // wtx.remove( - // self.addressindex_to_addressbytes.data(), - // &addressindex_slice, - // ); - // wtx.remove(self.addressindex_to_addresstype.data(), &addressindex_slice); - - // Ok(()) - // })?; - // - - // todo!("clear addresstxoutindexes_out") - // todo!("clear addresstxoutindexes_in") - // todo!("clear zero_txoutindexes") - // todo!("clear txindexvout_to_txoutindex") - - // Ok(()) - // } - - pub fn min_height(&self) -> Option { + pub fn starting_height(&self) -> Height { [ self.addresshash_to_addressindex.height(), self.blockhash_prefix_to_height.height(), self.txid_prefix_to_txindex.height(), ] .into_iter() + .map(|height| height.map(Height::incremented).unwrap_or_default()) .min() - .flatten() - .cloned() + .unwrap() } pub fn commit(&mut self, height: Height) -> fjall::Result<()> { @@ -176,22 +179,4 @@ impl Fjalls { Ok(()) 
}) } - - // pub fn udpate_meta(&self, wtx: &mut WriteTransaction, height: Height) { - // self.addressbytes_prefix_to_addressindex.update_meta(wtx, height); - // self.blockhash_prefix_to_height.update_meta(wtx, height); - // self.txid_prefix_to_txindex.update_meta(wtx, height); - // } - - // pub fn export(self, height: Height) -> Result<(), snkrj::Error> { - // thread::scope(|scope| { - // vec![ - // scope.spawn(|| self.addressbytes_prefix_to_addressindex.export(height)), - // scope.spawn(|| self.blockhash_prefix_to_height.export(height)), - // scope.spawn(|| self.txid_prefix_to_txindex.export(height)), - // ] - // .into_iter() - // .try_for_each(|handle| -> Result<(), snkrj::Error> { handle.join().unwrap() }) - // }) - // } } diff --git a/indexer/src/storage/storable_vecs/base.rs b/indexer/src/storage/storable_vecs/base.rs index 0f4f95ca9..adb76511f 100644 --- a/indexer/src/storage/storable_vecs/base.rs +++ b/indexer/src/storage/storable_vecs/base.rs @@ -28,14 +28,14 @@ where } pub fn flush(&mut self, height: Height) -> io::Result<()> { - if self.needs(height) { - height.write(&self.path_height())?; - } + height.write(&self.path_height())?; self.vec.flush() } pub fn truncate_if_needed(&mut self, index: I, height: Height) -> storable_vec::Result> { - height.write(&self.path_height())?; + if self.height.is_none_or(|self_height| self_height != height) { + height.write(&self.path_height())?; + } self.vec.truncate_if_needed(index) } diff --git a/indexer/src/storage/storable_vecs/mod.rs b/indexer/src/storage/storable_vecs/mod.rs index 7b063f867..170456860 100644 --- a/indexer/src/storage/storable_vecs/mod.rs +++ b/indexer/src/storage/storable_vecs/mod.rs @@ -1,14 +1,17 @@ -use std::{collections::BTreeMap, fs, io, path::Path}; +use std::{fs, io, path::Path}; -use exit::Exit; use rayon::prelude::*; use storable_vec::{AnyJsonStorableVec, Version, CACHED_GETS}; -use crate::structs::{ - Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height, 
LockTime, Multisigindex, - Opreturnindex, P2PK33AddressBytes, P2PK33index, P2PK65AddressBytes, P2PK65index, P2PKHAddressBytes, P2PKHindex, - P2SHAddressBytes, P2SHindex, P2TRAddressBytes, P2TRindex, P2WPKHAddressBytes, P2WPKHindex, P2WSHAddressBytes, - P2WSHindex, Pushonlyindex, Sats, Timestamp, TxVersion, Txid, Txindex, Txinindex, Txoutindex, Unknownindex, Weight, +use crate::{ + structs::{ + Addressbytes, Addressindex, Addresstype, Addresstypeindex, BlockHash, Emptyindex, Height, LockTime, + Multisigindex, Opreturnindex, P2PK33AddressBytes, P2PK33index, P2PK65AddressBytes, P2PK65index, + P2PKHAddressBytes, P2PKHindex, P2SHAddressBytes, P2SHindex, P2TRAddressBytes, P2TRindex, P2WPKHAddressBytes, + P2WPKHindex, P2WSHAddressBytes, P2WSHindex, Pushonlyindex, Sats, Timestamp, TxVersion, Txid, Txindex, + Txinindex, Txoutindex, Unknownindex, Weight, + }, + Indexes, }; mod base; @@ -58,8 +61,6 @@ pub struct StorableVecs { pub txoutindex_to_value: StorableVec, } -// const UNSAFE_BLOCKS: usize = 1000; - impl StorableVecs { pub fn import(path: &Path) -> color_eyre::Result { fs::create_dir_all(path)?; @@ -180,113 +181,104 @@ impl StorableVecs { }) } - #[allow(unused)] - pub fn rollback_from(&mut self, height: Height, exit: &Exit) -> storable_vec::Result<()> { - let prev_height = height.decremented(); + pub fn rollback(&mut self, starting_indexes: &Indexes) -> storable_vec::Result<()> { + let saved_height = starting_indexes.height.decremented(); - let mut truncated_indexes: BTreeMap> = BTreeMap::new(); + // We don't want to override the starting indexes so we cut from n + 1 + let height = starting_indexes.height.incremented(); - let addressindex = self - .height_to_first_addressindex - .truncate_if_needed(height, prev_height)?; - let txindex = self.height_to_first_txindex.truncate_if_needed(height, prev_height)?; - let txinindex = self.height_to_first_txinindex.truncate_if_needed(height, prev_height)?; - let txoutindex = self - .height_to_first_txoutindex - 
.truncate_if_needed(height, prev_height)?; - let p2pk33index = self - .height_to_first_p2pk33index - .truncate_if_needed(height, prev_height)?; - let p2pk65index = self - .height_to_first_p2pk65index - .truncate_if_needed(height, prev_height)?; - let p2pkhindex = self - .height_to_first_p2pkhindex - .truncate_if_needed(height, prev_height)?; - let p2shindex = self.height_to_first_p2shindex.truncate_if_needed(height, prev_height)?; - let p2trindex = self.height_to_first_p2trindex.truncate_if_needed(height, prev_height)?; - let p2wpkhindex = self - .height_to_first_p2wpkhindex - .truncate_if_needed(height, prev_height)?; - let p2wshindex = self - .height_to_first_p2wshindex - .truncate_if_needed(height, prev_height)?; - - self.height_to_blockhash.truncate_if_needed(height, prev_height)?; - self.height_to_difficulty.truncate_if_needed(height, prev_height)?; + self.height_to_first_addressindex + .truncate_if_needed(height, saved_height)?; self.height_to_first_emptyindex - .truncate_if_needed(height, prev_height)?; + .truncate_if_needed(height, saved_height)?; self.height_to_first_multisigindex - .truncate_if_needed(height, prev_height)?; + .truncate_if_needed(height, saved_height)?; self.height_to_first_opreturnindex - .truncate_if_needed(height, prev_height)?; + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2pk33index + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2pk65index + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2pkhindex + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2shindex + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2trindex + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2wpkhindex + .truncate_if_needed(height, saved_height)?; + self.height_to_first_p2wshindex + .truncate_if_needed(height, saved_height)?; self.height_to_first_pushonlyindex - .truncate_if_needed(height, prev_height)?; + .truncate_if_needed(height, 
saved_height)?; + self.height_to_first_txindex.truncate_if_needed(height, saved_height)?; + self.height_to_first_txinindex + .truncate_if_needed(height, saved_height)?; + self.height_to_first_txoutindex + .truncate_if_needed(height, saved_height)?; self.height_to_first_unknownindex - .truncate_if_needed(height, prev_height)?; - self.height_to_size.truncate_if_needed(height, prev_height)?; - self.height_to_timestamp.truncate_if_needed(height, prev_height)?; - self.height_to_weight.truncate_if_needed(height, prev_height)?; + .truncate_if_needed(height, saved_height)?; - if let Some(addressindex) = addressindex { - self.addressindex_to_addresstype - .truncate_if_needed(addressindex, prev_height)?; - self.addressindex_to_addresstypeindex - .truncate_if_needed(addressindex, prev_height)?; - self.addressindex_to_height - .truncate_if_needed(addressindex, prev_height)?; - } + // Now we can cut everything that's out of date + let &Indexes { + addressindex, + height, + p2pk33index, + p2pk65index, + p2pkhindex, + p2shindex, + p2trindex, + p2wpkhindex, + p2wshindex, + txindex, + txinindex, + txoutindex, + .. 
+ } = starting_indexes; - if let Some(p2pk33index) = p2pk33index { - self.p2pk33index_to_p2pk33addressbytes - .truncate_if_needed(p2pk33index, prev_height)?; - } - if let Some(p2pk65index) = p2pk65index { - self.p2pk65index_to_p2pk65addressbytes - .truncate_if_needed(p2pk65index, prev_height)?; - } - if let Some(p2pkhindex) = p2pkhindex { - self.p2pkhindex_to_p2pkhaddressbytes - .truncate_if_needed(p2pkhindex, prev_height)?; - } - if let Some(p2shindex) = p2shindex { - self.p2shindex_to_p2shaddressbytes - .truncate_if_needed(p2shindex, prev_height)?; - } - if let Some(p2trindex) = p2trindex { - self.p2trindex_to_p2traddressbytes - .truncate_if_needed(p2trindex, prev_height)?; - } - if let Some(p2wpkhindex) = p2wpkhindex { - self.p2wpkhindex_to_p2wpkhaddressbytes - .truncate_if_needed(p2wpkhindex, prev_height)?; - } - if let Some(p2wshindex) = p2wshindex { - self.p2wshindex_to_p2wshaddressbytes - .truncate_if_needed(p2wshindex, prev_height); - } + self.height_to_blockhash.truncate_if_needed(height, saved_height)?; + self.height_to_difficulty.truncate_if_needed(height, saved_height)?; + self.height_to_size.truncate_if_needed(height, saved_height)?; + self.height_to_timestamp.truncate_if_needed(height, saved_height)?; + self.height_to_weight.truncate_if_needed(height, saved_height)?; - if let Some(txindex) = txindex { - self.txindex_to_first_txinindex - .truncate_if_needed(txindex, prev_height)?; - self.txindex_to_first_txoutindex - .truncate_if_needed(txindex, prev_height)?; - self.txindex_to_height.truncate_if_needed(txindex, prev_height)?; - self.txindex_to_locktime.truncate_if_needed(txindex, prev_height)?; - self.txindex_to_txid.truncate_if_needed(txindex, prev_height)?; - self.txindex_to_txversion.truncate_if_needed(txindex, prev_height)?; - } + self.addressindex_to_addresstype + .truncate_if_needed(addressindex, saved_height)?; + self.addressindex_to_addresstypeindex + .truncate_if_needed(addressindex, saved_height)?; + self.addressindex_to_height + 
.truncate_if_needed(addressindex, saved_height)?; - if let Some(txinindex) = txinindex { - self.txinindex_to_txoutindex - .truncate_if_needed(txinindex, prev_height)?; - } + self.p2pk33index_to_p2pk33addressbytes + .truncate_if_needed(p2pk33index, saved_height)?; + self.p2pk65index_to_p2pk65addressbytes + .truncate_if_needed(p2pk65index, saved_height)?; + self.p2pkhindex_to_p2pkhaddressbytes + .truncate_if_needed(p2pkhindex, saved_height)?; + self.p2shindex_to_p2shaddressbytes + .truncate_if_needed(p2shindex, saved_height)?; + self.p2trindex_to_p2traddressbytes + .truncate_if_needed(p2trindex, saved_height)?; + self.p2wpkhindex_to_p2wpkhaddressbytes + .truncate_if_needed(p2wpkhindex, saved_height)?; + self.p2wshindex_to_p2wshaddressbytes + .truncate_if_needed(p2wshindex, saved_height)?; - if let Some(txoutindex) = txoutindex { - self.txoutindex_to_addressindex - .truncate_if_needed(txoutindex, prev_height)?; - self.txoutindex_to_value.truncate_if_needed(txoutindex, prev_height)?; - } + self.txindex_to_first_txinindex + .truncate_if_needed(txindex, saved_height)?; + self.txindex_to_first_txoutindex + .truncate_if_needed(txindex, saved_height)?; + self.txindex_to_height.truncate_if_needed(txindex, saved_height)?; + self.txindex_to_locktime.truncate_if_needed(txindex, saved_height)?; + self.txindex_to_txid.truncate_if_needed(txindex, saved_height)?; + self.txindex_to_txversion.truncate_if_needed(txindex, saved_height)?; + + self.txinindex_to_txoutindex + .truncate_if_needed(txinindex, saved_height)?; + + self.txoutindex_to_addressindex + .truncate_if_needed(txoutindex, saved_height)?; + self.txoutindex_to_value.truncate_if_needed(txoutindex, saved_height)?; Ok(()) } @@ -297,12 +289,12 @@ impl StorableVecs { .try_for_each(|vec| vec.flush(height)) } - pub fn min_height(&mut self) -> color_eyre::Result> { - Ok(self - .as_mut_any_vec_slice() + pub fn starting_height(&mut self) -> Height { + self.as_mut_any_vec_slice() .into_iter() - .map(|vec| 
vec.height().unwrap_or_default()) - .min()) + .map(|vec| vec.height().map(Height::incremented).unwrap_or_default()) + .min() + .unwrap() } pub fn as_any_json_vec_slice(&self) -> [&dyn AnyJsonStorableVec; 40] { diff --git a/indexer/src/structs/blockhash.rs b/indexer/src/structs/blockhash.rs index ac3eff4ae..0bb44bd82 100644 --- a/indexer/src/structs/blockhash.rs +++ b/indexer/src/structs/blockhash.rs @@ -1,9 +1,12 @@ use std::mem; use derive_deref::Deref; +use iterator::rpc::{Client, RpcApi}; use serde::Serialize; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; +use super::Height; + #[derive(Debug, Deref, Clone, PartialEq, Eq, Immutable, IntoBytes, KnownLayout, FromBytes, Serialize)] pub struct BlockHash([u8; 32]); @@ -18,3 +21,10 @@ impl From for bitcoin::BlockHash { unsafe { mem::transmute(value) } } } + +impl TryFrom<(&Client, Height)> for BlockHash { + type Error = iterator::rpc::Error; + fn try_from((rpc, height): (&Client, Height)) -> Result { + Ok(Self::from(rpc.get_block_hash(u64::from(height))?)) + } +} diff --git a/indexer/src/structs/compressed.rs b/indexer/src/structs/compressed.rs index 92301d14b..fc482eba9 100644 --- a/indexer/src/structs/compressed.rs +++ b/indexer/src/structs/compressed.rs @@ -41,10 +41,9 @@ impl From for Slice { #[derive(Debug, Deref, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout)] pub struct BlockHashPrefix([u8; 8]); -impl TryFrom<&BlockHash> for BlockHashPrefix { - type Error = color_eyre::Report; - fn try_from(value: &BlockHash) -> Result { - Ok(Self(copy_first_8bytes(&value[..]))) +impl From<&BlockHash> for BlockHashPrefix { + fn from(value: &BlockHash) -> Self { + Self(copy_first_8bytes(&value[..]).unwrap()) } } impl TryFrom for BlockHashPrefix { @@ -66,10 +65,9 @@ impl From for Slice { #[derive(Debug, Deref, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, Immutable, IntoBytes, KnownLayout)] pub struct TxidPrefix([u8; 8]); -impl TryFrom<&Txid> for TxidPrefix { - 
type Error = color_eyre::Report; - fn try_from(value: &Txid) -> Result { - Ok(Self(copy_first_8bytes(&value[..]))) +impl From<&Txid> for TxidPrefix { + fn from(value: &Txid) -> Self { + Self(copy_first_8bytes(&value[..]).unwrap()) } } impl TryFrom for TxidPrefix { @@ -89,14 +87,14 @@ impl From for Slice { } } -fn copy_first_8bytes(slice: &[u8]) -> [u8; 8] { +fn copy_first_8bytes(slice: &[u8]) -> Result<[u8; 8], ()> { let mut buf: [u8; 8] = [0; 8]; let buf_len = buf.len(); if slice.len() < buf_len { - panic!("bad len"); + return Err(()); } slice.iter().take(buf_len).enumerate().for_each(|(i, r)| { buf[i] = *r; }); - buf + Ok(buf) } diff --git a/indexer/src/structs/height.rs b/indexer/src/structs/height.rs index daaa11c15..e19631fd3 100644 --- a/indexer/src/structs/height.rs +++ b/indexer/src/structs/height.rs @@ -30,13 +30,31 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; pub struct Height(u32); impl Height { + const ZERO: Self = Height(0); + pub fn write(&self, path: &Path) -> Result<(), io::Error> { fs::write(path, self.as_bytes()) } - pub fn decremented(&self) -> Self { + pub fn increment(&mut self) { + self.0 += 1; + } + + pub fn incremented(self) -> Self { + Self(self.0 + 1) + } + + pub fn decrement(&mut self) { + self.0 -= 1; + } + + pub fn decremented(self) -> Self { Self(self.0.checked_sub(1).unwrap_or_default()) } + + pub fn is_zero(self) -> bool { + self == Self::ZERO + } } impl PartialEq for Height { @@ -133,6 +151,12 @@ impl From for usize { } } +impl From for u64 { + fn from(value: Height) -> Self { + value.0 as u64 + } +} + impl TryFrom<&Path> for Height { type Error = storable_vec::Error; fn try_from(value: &Path) -> Result { diff --git a/indexer/src/structs/indexes.rs b/indexer/src/structs/indexes.rs new file mode 100644 index 000000000..2392ca4b5 --- /dev/null +++ b/indexer/src/structs/indexes.rs @@ -0,0 +1,114 @@ +use color_eyre::eyre::ContextCompat; +use iterator::rpc::Client; +use iterator::NUMBER_OF_UNSAFE_BLOCKS; +use 
storable_vec::CACHED_GETS; + +use crate::storage::{Fjalls, StorableVecs}; + +use super::{ + Addressindex, BlockHash, Emptyindex, Height, Multisigindex, Opreturnindex, P2PK33index, P2PK65index, P2PKHindex, + P2SHindex, P2TRindex, P2WPKHindex, P2WSHindex, Pushonlyindex, Txindex, Txinindex, Txoutindex, Unknownindex, +}; + +#[derive(Debug, Default)] +pub struct Indexes { + pub addressindex: Addressindex, + pub emptyindex: Emptyindex, + pub height: Height, + pub multisigindex: Multisigindex, + pub opreturnindex: Opreturnindex, + pub p2pk33index: P2PK33index, + pub p2pk65index: P2PK65index, + pub p2pkhindex: P2PKHindex, + pub p2shindex: P2SHindex, + pub p2trindex: P2TRindex, + pub p2wpkhindex: P2WPKHindex, + pub p2wshindex: P2WSHindex, + pub pushonlyindex: Pushonlyindex, + pub txindex: Txindex, + pub txinindex: Txinindex, + pub txoutindex: Txoutindex, + pub unknownindex: Unknownindex, +} + +impl Indexes { + pub fn push_if_needed(&self, vecs: &mut StorableVecs) -> storable_vec::Result<()> { + let height = self.height; + vecs.height_to_first_txindex.push_if_needed(height, self.txindex)?; + vecs.height_to_first_txinindex.push_if_needed(height, self.txinindex)?; + vecs.height_to_first_txoutindex + .push_if_needed(height, self.txoutindex)?; + vecs.height_to_first_addressindex + .push_if_needed(height, self.addressindex)?; + vecs.height_to_first_emptyindex + .push_if_needed(height, self.emptyindex)?; + vecs.height_to_first_multisigindex + .push_if_needed(height, self.multisigindex)?; + vecs.height_to_first_opreturnindex + .push_if_needed(height, self.opreturnindex)?; + vecs.height_to_first_pushonlyindex + .push_if_needed(height, self.pushonlyindex)?; + vecs.height_to_first_unknownindex + .push_if_needed(height, self.unknownindex)?; + vecs.height_to_first_p2pk33index + .push_if_needed(height, self.p2pk33index)?; + vecs.height_to_first_p2pk65index + .push_if_needed(height, self.p2pk65index)?; + vecs.height_to_first_p2pkhindex + .push_if_needed(height, self.p2pkhindex)?; + 
vecs.height_to_first_p2shindex.push_if_needed(height, self.p2shindex)?; + vecs.height_to_first_p2trindex.push_if_needed(height, self.p2trindex)?; + vecs.height_to_first_p2wpkhindex + .push_if_needed(height, self.p2wpkhindex)?; + vecs.height_to_first_p2wshindex + .push_if_needed(height, self.p2wshindex)?; + Ok(()) + } + + pub fn push_future_if_needed(&mut self, vecs: &mut StorableVecs) -> storable_vec::Result<()> { + self.height.increment(); + self.push_if_needed(vecs)?; + self.height.decrement(); + Ok(()) + } +} + +impl TryFrom<(&mut StorableVecs, &Fjalls, &Client)> for Indexes { + type Error = color_eyre::Report; + fn try_from((vecs, trees, rpc): (&mut StorableVecs, &Fjalls, &Client)) -> color_eyre::Result { + // Height at which we wanna start: min last saved + 1 or 0 + let starting_height = vecs.starting_height().min(trees.starting_height()); + + // But we also need to check the chain and start earlier in case of a reorg + let height = (starting_height + .checked_sub(NUMBER_OF_UNSAFE_BLOCKS as u32) + .unwrap_or_default()..*starting_height) // ..= because of last saved + 1 + .map(Height::from) + .find(|height| { + let rpc_blockhash = BlockHash::try_from((rpc, *height)).unwrap(); + let saved_blockhash = vecs.height_to_blockhash.get(*height).unwrap().unwrap(); + &rpc_blockhash != saved_blockhash.as_ref() + }) + .unwrap_or(starting_height); + + Ok(Self { + addressindex: *vecs.height_to_first_addressindex.get(height)?.context("")?, + emptyindex: *vecs.height_to_first_emptyindex.get(height)?.context("")?, + height, + multisigindex: *vecs.height_to_first_multisigindex.get(height)?.context("")?, + opreturnindex: *vecs.height_to_first_opreturnindex.get(height)?.context("")?, + p2pk33index: *vecs.height_to_first_p2pk33index.get(height)?.context("")?, + p2pk65index: *vecs.height_to_first_p2pk65index.get(height)?.context("")?, + p2pkhindex: *vecs.height_to_first_p2pkhindex.get(height)?.context("")?, + p2shindex: *vecs.height_to_first_p2shindex.get(height)?.context("")?, + 
p2trindex: *vecs.height_to_first_p2trindex.get(height)?.context("")?, + p2wpkhindex: *vecs.height_to_first_p2wpkhindex.get(height)?.context("")?, + p2wshindex: *vecs.height_to_first_p2wshindex.get(height)?.context("")?, + pushonlyindex: *vecs.height_to_first_pushonlyindex.get(height)?.context("")?, + txindex: *vecs.height_to_first_txindex.get(height)?.context("")?, + txinindex: *vecs.height_to_first_txinindex.get(height)?.context("")?, + txoutindex: *vecs.height_to_first_txoutindex.get(height)?.context("")?, + unknownindex: *vecs.height_to_first_unknownindex.get(height)?.context("")?, + }) + } +} diff --git a/indexer/src/structs/mod.rs b/indexer/src/structs/mod.rs index ff4050491..e31c7bffc 100644 --- a/indexer/src/structs/mod.rs +++ b/indexer/src/structs/mod.rs @@ -5,6 +5,7 @@ mod addresstypeindex; mod blockhash; mod compressed; mod height; +mod indexes; mod locktime; mod sats; mod timestamp; @@ -24,6 +25,7 @@ pub use addresstypeindex::*; pub use blockhash::*; pub use compressed::*; pub use height::*; +pub use indexes::*; pub use locktime::*; pub use sats::*; pub use timestamp::*; diff --git a/iterator/Cargo.toml b/iterator/Cargo.toml index 15a8700c5..943555b16 100644 --- a/iterator/Cargo.toml +++ b/iterator/Cargo.toml @@ -16,4 +16,3 @@ serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.138" derive_deref = { workspace = true } bitcoincore-rpc = "0.19.0" -# tokio = { version = "1.39.2", features = ["rt-multi-thread"] } diff --git a/iterator/src/lib.rs b/iterator/src/lib.rs index 3cf4f4e11..f76ef2a50 100644 --- a/iterator/src/lib.rs +++ b/iterator/src/lib.rs @@ -193,45 +193,6 @@ pub fn new( drain_and_send(&mut bulk) }); - // Tokio version: 1022s - // Slighlty slower than rayon version - // thread::spawn(move || { - // let rt = tokio::runtime::Runtime::new().unwrap(); - // let _guard = rt.enter(); - - // let mut tasks = VecDeque::with_capacity(BOUND); - - // recv_block_reader - // .iter() - // .try_for_each(move |(blk_metadata, block_state)| { - 
// let raw_block = match block_state { - // BlockState::Raw(vec) => vec, - // _ => unreachable!(), - // }; - - // tasks.push_back(tokio::task::spawn(async move { - // let block = Block::consensus_decode(&mut Cursor::new(raw_block)).unwrap(); - - // (blk_metadata, block) - // })); - - // while tasks.len() > BOUND { - // let (blk_metadata, block) = rt.block_on(tasks.pop_front().unwrap()).unwrap(); - - // if send_block - // .send(BlkMetadataAndBlock::new(blk_metadata, block)) - // .is_err() - // { - // return ControlFlow::Break(()); - // } - // } - - // ControlFlow::Continue(()) - // }); - // - // todo!("Send the rest") - // }); - thread::spawn(move || { let mut height = start_recap.map_or(0, |(_, recap)| recap.height()); diff --git a/iterator/src/main.rs b/iterator/src/main.rs index 908ba6127..f0a0b4e8a 100644 --- a/iterator/src/main.rs +++ b/iterator/src/main.rs @@ -11,7 +11,7 @@ fn main() { let auth = Auth::CookieFile(cookie); let rpc = Client::new(url, auth).unwrap(); - let start = Some(749900); + let start = None; let end = None; biter::new(data_dir, start, end, rpc) diff --git a/pricer/Cargo.toml b/pricer/Cargo.toml index 82473bf62..55e4c724f 100644 --- a/pricer/Cargo.toml +++ b/pricer/Cargo.toml @@ -1,5 +1,6 @@ [package] -name = "pricer" +name = "brice" +description = "A bitcoin price fetcher" version = "0.1.0" edition = "2021" @@ -7,16 +8,9 @@ edition = "2021" color-eyre = { workspace = true } derive_deref = { workspace = true } indexer = { workspace = true } -logger = { workspace = true } -reqwest = { version = "0.12.12", features = [ - "blocking", - "brotli", - "deflate", - "gzip", - "json", - "zstd", -] } jiff = { workspace = true } +logger = { workspace = true } +minreq = { version = "2.13.2", features = ["https", "serde_json"] } serde = { workspace = true } serde_json = { workspace = true } storable_vec = { workspace = true } diff --git a/pricer/src/fetchers/binance.rs b/pricer/src/fetchers/binance.rs index 944e25688..265462ca6 100644 --- 
a/pricer/src/fetchers/binance.rs +++ b/pricer/src/fetchers/binance.rs @@ -35,7 +35,7 @@ impl Binance { } } - fn get_from_1mn( + pub fn get_from_1mn( &mut self, timestamp: Timestamp, previous_timestamp: Option, @@ -44,7 +44,7 @@ impl Binance { self._1mn.replace(Self::fetch_1mn()?); } Pricer::::find_height_ohlc( - &self._1mn.as_ref().unwrap(), + self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "binance 1mn", @@ -55,7 +55,7 @@ impl Binance { info!("Fetching 1mn prices from Binance..."); retry( - |_| Self::json_to_timestamp_to_ohlc(&reqwest::blocking::get(Self::url("interval=1m&limit=1000"))?.json()?), + |_| Self::json_to_timestamp_to_ohlc(&minreq::get(Self::url("interval=1m&limit=1000")).send()?.json()?), 30, 10, ) @@ -74,11 +74,13 @@ impl Binance { .ok_or(color_eyre::eyre::Error::msg("Couldn't find date")) } - fn fetch_1d() -> color_eyre::Result> { + pub fn fetch_1d() -> color_eyre::Result> { info!("Fetching daily prices from Kraken..."); + dbg!(&Self::url("interval=1d")); + retry( - |_| Self::json_to_date_to_ohlc(&reqwest::blocking::get(Self::url("interval=1d"))?.json()?), + |_| Self::json_to_date_to_ohlc(&minreq::get(Self::url("interval=1d")).send()?.json()?), 30, 10, ) @@ -92,12 +94,7 @@ impl Binance { if self.har.is_none() { self.har.replace(self.read_har().unwrap_or_default()); } - Pricer::::find_height_ohlc( - &self.har.as_ref().unwrap(), - timestamp, - previous_timestamp, - "binance har", - ) + Pricer::::find_height_ohlc(self.har.as_ref().unwrap(), timestamp, previous_timestamp, "binance har") } fn read_har(&self) -> color_eyre::Result> { diff --git a/pricer/src/fetchers/kibo.rs b/pricer/src/fetchers/kibo.rs index 8cefa2299..847ab32a4 100644 --- a/pricer/src/fetchers/kibo.rs +++ b/pricer/src/fetchers/kibo.rs @@ -55,8 +55,9 @@ impl Kibo { |try_index| { let base_url = Self::get_base_url(try_index); - let body: Value = - reqwest::blocking::get(format!("{base_url}/height-to-price?chunk={}", height))?.json()?; + let body: Value = 
minreq::get(format!("{base_url}/height-to-price?chunk={}", height)) + .send()? + .json()?; let vec = body .as_object() @@ -112,7 +113,9 @@ impl Kibo { |try_index| { let base_url = Self::get_base_url(try_index); - let body: Value = reqwest::blocking::get(format!("{base_url}/date-to-price?chunk={}", year))?.json()?; + let body: Value = minreq::get(format!("{base_url}/date-to-price?chunk={}", year)) + .send()? + .json()?; body.as_object() .context("Expect to be an object")? diff --git a/pricer/src/fetchers/kraken.rs b/pricer/src/fetchers/kraken.rs index 4e8689756..6de067b6c 100644 --- a/pricer/src/fetchers/kraken.rs +++ b/pricer/src/fetchers/kraken.rs @@ -23,14 +23,14 @@ impl Kraken { if self._1mn.is_none() || self._1mn.as_ref().unwrap().last_key_value().unwrap().0 <= ×tamp { self._1mn.replace(Self::fetch_1mn()?); } - Pricer::::find_height_ohlc(&self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "kraken 1m") + Pricer::::find_height_ohlc(self._1mn.as_ref().unwrap(), timestamp, previous_timestamp, "kraken 1m") } fn fetch_1mn() -> color_eyre::Result> { info!("Fetching 1mn prices from Kraken..."); retry( - |_| Self::json_to_timestamp_to_ohlc(&reqwest::blocking::get(Self::url(1))?.json()?), + |_| Self::json_to_timestamp_to_ohlc(&minreq::get(Self::url(1)).send()?.json()?), 30, 10, ) @@ -52,7 +52,7 @@ impl Kraken { info!("Fetching daily prices from Kraken..."); retry( - |_| Self::json_to_date_to_ohlc(&reqwest::blocking::get(Self::url(1440))?.json()?), + |_| Self::json_to_date_to_ohlc(&minreq::get(Self::url(1440)).send()?.json()?), 30, 10, ) diff --git a/pricer/src/main.rs b/pricer/src/main.rs index 9efd33647..eaa6b12d5 100644 --- a/pricer/src/main.rs +++ b/pricer/src/main.rs @@ -1,14 +1,13 @@ -// fn main() {} - +use brice::{Binance, Kibo, Kraken}; use indexer::Height; -use pricer::{Binance, Kibo, Kraken}; +use serde_json::Value; fn main() -> color_eyre::Result<()> { color_eyre::install()?; logger::init_log(None); - // dbg!(Binance::fetch_1d_prices()?); + 
dbg!(Binance::fetch_1d()?); // dbg!(Binance::fetch_1mn_prices()); // dbg!(Kraken::fetch_1d()?); // dbg!(Kraken::fetch_1mn_prices()?); diff --git a/python/example.py b/python/parse.py similarity index 100% rename from python/example.py rename to python/parse.py diff --git a/server/Cargo.toml b/server/Cargo.toml index e2ad61572..e3e66f7ea 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,7 +11,7 @@ derive_deref = { workspace = true } indexer = { workspace = true } jiff = { workspace = true } logger = { workspace = true } -oxc = { version = "0.50.0", features = ["codegen", "minifier"] } +oxc = { version = "0.51.0", features = ["codegen", "minifier"] } reqwest = { version = "0.12.12", features = ["blocking", "json"] } serde = { workspace = true } serde_json = { workspace = true } diff --git a/storable_vec/Cargo.toml b/storable_vec/Cargo.toml index dc4fd3c8f..dcef7c1c4 100644 --- a/storable_vec/Cargo.toml +++ b/storable_vec/Cargo.toml @@ -12,6 +12,7 @@ json = ["dep:serde", "dep:serde_json"] [dependencies] memmap2 = "0.9.5" -zerocopy = { workspace = true } +rayon = { workspace = true } serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } +zerocopy = { workspace = true } diff --git a/storable_vec/src/lib.rs b/storable_vec/src/lib.rs index 40c0eb220..51f8da646 100644 --- a/storable_vec/src/lib.rs +++ b/storable_vec/src/lib.rs @@ -12,6 +12,7 @@ use std::{ }; pub use memmap2; +use rayon::prelude::*; pub use zerocopy; mod enums; @@ -71,9 +72,9 @@ pub struct StorableVec { /// In bytes const MAX_PAGE_SIZE: usize = 4 * 4096; -// const ONE_MB: usize = 1000 * 1024; -const MAX_CACHE_SIZE: usize = usize::MAX; -// const MAX_CACHE_SIZE: usize = 100 * ONE_MB; +const ONE_MB: usize = 1000 * 1024; +// const MAX_CACHE_SIZE: usize = usize::MAX; +const MAX_CACHE_SIZE: usize = 100 * ONE_MB; impl StorableVec where @@ -182,8 +183,7 @@ where fn reset_cache(&mut self) -> io::Result<()> { match MODE { CACHED_GETS => { - // par_iter_mut ? 
- self.cache.iter_mut().for_each(|lock| { + self.cache.par_iter_mut().for_each(|lock| { lock.take(); }); @@ -192,7 +192,7 @@ where if self.cache.len() != len { self.cache.resize_with(len, Default::default); - self.cache.shrink_to_fit(); + // self.cache.shrink_to_fit(); } Ok(()) @@ -270,10 +270,14 @@ where return Ok(()); } - let mut bytes: Vec = Vec::with_capacity(self.pushed_len() * Self::SIZE_OF_T); + let mut bytes: Vec = vec![0; self.pushed_len() * Self::SIZE_OF_T]; + + let unsafe_bytes = UnsafeSlice::new(&mut bytes); + mem::take(&mut self.pushed) - .into_iter() - .for_each(|v| bytes.extend_from_slice(v.as_bytes())); + .into_par_iter() + .enumerate() + .for_each(|(i, v)| unsafe_bytes.copy_slice(i * Self::SIZE_OF_T, v.as_bytes())); self.file.write_all(&bytes)?; @@ -291,8 +295,7 @@ where let value_at_index = self.open_file_at_then_read(index).ok(); - self.file - .set_len(Self::index_to_byte_index(index.checked_sub(1).unwrap_or_default()))?; + self.file.set_len(Self::index_to_byte_index(index))?; self.reset_disk_related_state()?; @@ -429,6 +432,28 @@ where Ok(self.get(index)?.map(|v| (*v).clone()).unwrap_or(Default::default())) } + pub fn iter_from(&self, mut index: I, mut f: F) -> Result<()> + where + F: FnMut((I, Value)) -> Result<()>, + { + let disk_len = I::from(Self::read_disk_len_(&self.file)?); + + while index < disk_len { + f((index, self.get(index)?.unwrap()))?; + index = index + 1; + } + + let mut index = I::from(0); + let pushed_len = I::from(self.pushed_len()); + let disk_len = Self::i_to_usize(disk_len)?; + while index < pushed_len { + f(((index + disk_len), self.get(index)?.map(Value::from).unwrap()))?; + index = index + 1; + } + + Ok(()) + } + #[inline] pub fn push(&mut self, value: T) { self.push_(value) diff --git a/storable_vec/src/structs/mod.rs b/storable_vec/src/structs/mod.rs index 7a60bc383..51482371f 100644 --- a/storable_vec/src/structs/mod.rs +++ b/storable_vec/src/structs/mod.rs @@ -1,3 +1,5 @@ +mod unsafe_slice; mod version; +pub use 
use std::cell::UnsafeCell;

/// A copyable, shareable view over a mutable slice that allows writes through
/// a shared reference.
///
/// It exists so several threads can each fill their own region of one buffer
/// without a lock. The type performs no synchronisation: callers must make
/// sure that no index is written by two threads at the same time.
#[derive(Copy, Clone)]
pub struct UnsafeSlice<'a, T>(&'a [UnsafeCell<T>]);

// SAFETY: the wrapper only hands out writes to individual elements, and its
// contract requires callers to keep concurrent writes on disjoint indices.
// Under that contract, moving or sharing the view across threads is no more
// dangerous than moving or sharing the `T` values themselves.
unsafe impl<T: Send + Sync> Send for UnsafeSlice<'_, T> {}
unsafe impl<T: Send + Sync> Sync for UnsafeSlice<'_, T> {}

impl<'a, T> UnsafeSlice<'a, T> {
    /// Wraps `slice`, keeping it exclusively borrowed for `'a`.
    pub fn new(slice: &'a mut [T]) -> Self {
        let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
        // SAFETY: `UnsafeCell<T>` has the same in-memory representation as
        // `T`, so the cast keeps length and layout. The exclusive borrow of
        // `slice` lasts for `'a`, so nothing outside this view can touch the
        // elements while it is alive.
        Self(unsafe { &*ptr })
    }

    /// Stores `value` at index `i`, replacing the previous element.
    ///
    /// Caller contract: it is UB if two threads write to the same index
    /// without synchronization.
    ///
    /// NOTE(review): this is a safe `fn` whose soundness depends on the
    /// contract above; it is kept safe here so existing callers still compile.
    ///
    /// # Panics
    ///
    /// Panics if `i` is out of bounds.
    pub fn write(&self, i: usize, value: T) {
        let cell = &self.0[i];
        // SAFETY: `cell` is a live, in-bounds element; exclusive access to
        // this index is the caller's responsibility (see the contract above).
        unsafe {
            *cell.get() = value;
        }
    }

    /// Copies every element of `slice` into this view, starting at `start`.
    ///
    /// The destination range is validated once before anything is written, so
    /// an out-of-range call panics without modifying the buffer. The same
    /// caller contract as [`UnsafeSlice::write`] applies to every index in
    /// `start..start + slice.len()`.
    ///
    /// # Panics
    ///
    /// Panics if `start + slice.len()` overflows or exceeds the view's length.
    pub fn copy_slice(&self, start: usize, slice: &[T])
    where
        T: Copy,
    {
        let end = start
            .checked_add(slice.len())
            .expect("UnsafeSlice::copy_slice: range end overflows usize");

        // Single range check for the whole copy instead of one per element.
        let destination = &self.0[start..end];

        for (cell, value) in destination.iter().zip(slice) {
            // SAFETY: `cell` comes from the bounds-checked sub-slice above;
            // exclusive access to these indices is the caller's
            // responsibility. `T: Copy`, so overwriting runs no destructor.
            unsafe {
                *cell.get() = *value;
            }
        }
    }
}