diff --git a/Cargo.lock b/Cargo.lock index 6a27c048d..3395a206b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,10 +506,13 @@ dependencies = [ "brk_fetcher", "brk_grouper", "brk_indexer", + "brk_iterator", "brk_logger", "brk_mcp", + "brk_monitor", "brk_query", "brk_reader", + "brk_rpc", "brk_server", "brk_store", "brk_traversable", @@ -612,9 +615,11 @@ dependencies = [ "brk_error", "brk_fetcher", "brk_indexer", + "brk_iterator", "brk_logger", "brk_query", "brk_reader", + "brk_rpc", "brk_server", "clap", "color-eyre", @@ -637,8 +642,10 @@ dependencies = [ "brk_fetcher", "brk_grouper", "brk_indexer", + "brk_iterator", "brk_logger", "brk_reader", + "brk_rpc", "brk_store", "brk_traversable", "brk_types", @@ -783,6 +790,7 @@ dependencies = [ "brk_error", "brk_indexer", "brk_reader", + "brk_rpc", "brk_traversable", "brk_types", "derive_deref", @@ -1224,10 +1232,12 @@ dependencies = [ "brk_error", "brk_fetcher", "brk_indexer", + "brk_iterator", "brk_logger", "brk_mcp", "brk_query", "brk_reader", + "brk_rpc", "brk_traversable", "brk_types", "jiff", @@ -1375,9 +1385,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.41" +version = "1.2.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7" +checksum = "81bbf3b3619004ad9bd139f62a9ab5cfe467f307455a0d307b0cf58bf070feaa" dependencies = [ "find-msvc-tools", "jobserver", @@ -2018,9 +2028,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc5a4e564e38c699f2880d3fda590bedc2e69f3f84cd48b457bd892ce61d0aa9" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" dependencies = [ "crc32fast", "libz-rs-sys", @@ -3757,9 +3767,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e0f6df8eaa422d97d72edcd152e1451618fed47fabbdbd5a8864167b1d4aff7" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] diff --git a/crates/brk_binder/src/js.rs b/crates/brk_binder/src/js.rs index c847cf615..9e1c3e653 100644 --- a/crates/brk_binder/src/js.rs +++ b/crates/brk_binder/src/js.rs @@ -18,7 +18,7 @@ pub trait Bridge { fn generate_js_files(&self, modules_path: &Path) -> io::Result<()>; } -impl Bridge for Query<'static> { +impl Bridge for Query { fn generate_js_files(&self, modules_path: &Path) -> io::Result<()> { let path = modules_path.join("brk-client"); @@ -81,7 +81,7 @@ export const POOL_ID_TO_POOL_NAME = /** @type {const} */ ({ fs::write(path, contents) } -fn generate_metrics_file(query: &Query<'static>, parent: &Path) -> io::Result<()> { +fn generate_metrics_file(query: &Query, parent: &Path) -> io::Result<()> { let path = parent.join(Path::new("metrics.js")); let indexes = Index::all(); diff --git a/crates/brk_cli/Cargo.toml b/crates/brk_cli/Cargo.toml index 393b1d5b6..e0d7990c5 100644 --- a/crates/brk_cli/Cargo.toml +++ b/crates/brk_cli/Cargo.toml @@ -16,9 +16,11 @@ brk_computer = { workspace = true } brk_error = { workspace = true } brk_fetcher = { workspace = true } brk_indexer = { workspace = true } +brk_iterator = { workspace = true } brk_query = { workspace = true } brk_logger = { workspace = true } brk_reader = { workspace = true } +brk_rpc = { workspace = true } brk_server = { workspace = true } vecdb = { workspace = true } clap = { version = "4.5.50", features = ["derive", "string"] } diff --git a/crates/brk_cli/src/config.rs b/crates/brk_cli/src/config.rs index 83711ce69..c7bc510ad 100644 --- a/crates/brk_cli/src/config.rs +++ b/crates/brk_cli/src/config.rs @@ -3,10 +3,10 @@ use std::{ path::{Path, PathBuf}, }; -use bitcoincore_rpc::{self, Auth, Client}; +use brk_error::{Error, Result}; use brk_fetcher::Fetcher; +use brk_rpc::{Auth, Client}; use clap::Parser; -use color_eyre::eyre::eyre; use serde::{Deserialize, Deserializer, Serialize}; use crate::{default_bitcoin_path, default_brk_path, dot_brk_path, website::Website}; @@ -78,7 +78,7 @@ pub struct Config { } impl Config { - pub fn import() -> color_eyre::Result { + pub fn import() -> Result { let config_args = Some(Config::parse()); let path = dot_brk_path(); @@ -196,18 +196,18 @@ Finally, you can run the program with '-h' for help." fs::write(path, toml::to_string(self).unwrap()) } - pub fn rpc(&self) -> color_eyre::Result<&'static Client> { - Ok(Box::leak(Box::new(Client::new( + pub fn rpc(&self) -> Result { + Ok(Client::new( &format!( "http://{}:{}", self.rpcconnect().unwrap_or(&"localhost".to_string()), self.rpcport().unwrap_or(8332) ), self.rpc_auth().unwrap(), - )?))) + )?) } - fn rpc_auth(&self) -> color_eyre::Result { + fn rpc_auth(&self) -> Result { let cookie = self.path_cookiefile(); if cookie.is_file() { @@ -218,7 +218,7 @@ Finally, you can run the program with '-h' for help." self.rpcpassword.clone().unwrap(), )) } else { - Err(eyre!("Failed to find correct auth")) + Err(Error::Str("Failed to find correct auth")) } } diff --git a/crates/brk_cli/src/lib.rs b/crates/brk_cli/src/lib.rs index 0b2b7ef44..71f789dc3 100644 --- a/crates/brk_cli/src/lib.rs +++ b/crates/brk_cli/src/lib.rs @@ -8,12 +8,12 @@ use std::{ time::Duration, }; -use bitcoincore_rpc::{self, RpcApi}; use brk_binder::Bridge; use brk_bundler::bundle; use brk_computer::Computer; use brk_error::Result; use brk_indexer::Indexer; +use brk_iterator::Blocks; use brk_query::Query; use brk_reader::Reader; use brk_server::{Server, VERSION}; @@ -27,12 +27,7 @@ mod website; use crate::{config::Config, paths::*}; pub fn main() -> color_eyre::Result<()> { - color_eyre::install()?; - - fs::create_dir_all(dot_brk_path())?; - - brk_logger::init(Some(&dot_brk_log_path()))?; - + // Can't increase main thread's stack size, thus we need to use another thread thread::Builder::new() .stack_size(512 * 1024 * 1024) .spawn(run)? @@ -41,14 +36,22 @@ pub fn main() -> color_eyre::Result<()> { } pub fn run() -> color_eyre::Result<()> { + color_eyre::install()?; + + fs::create_dir_all(dot_brk_path())?; + + brk_logger::init(Some(&dot_brk_log_path()))?; + let config = Config::import()?; - let rpc = config.rpc()?; + let client = config.rpc()?; let exit = Exit::new(); exit.set_ctrlc_handler(); - let reader = Reader::new(config.blocksdir(), rpc); + let reader = Reader::new(config.blocksdir(), &client); + + let blocks = Blocks::new(&client, &reader); let mut indexer = Indexer::forced_import(&config.brkdir())?; @@ -112,7 +115,7 @@ pub fn run() -> color_eyre::Result<()> { None }; - let server = Server::new(query, bundle_path); + let server = Server::new(&query, bundle_path); tokio::spawn(async move { server.serve(true).await.unwrap(); @@ -128,40 +131,24 @@ pub fn run() -> color_eyre::Result<()> { let _handle = runtime.spawn(future); loop { - wait_for_synced_node(rpc)?; + client.wait_for_synced_node()?; - let block_count = rpc.get_block_count()?; + let last_height = client.get_last_height()?; - info!("{} blocks found.", block_count + 1); + info!("{} blocks found.", u32::from(last_height) + 1); let starting_indexes = if config.check_collisions() { - indexer.checked_index(&reader, rpc, &exit)?; + indexer.checked_index(&blocks, &client, &exit)? } else { - indexer.index(&reader, rpc, &exit)?; + indexer.index(&blocks, &client, &exit)? }; computer.compute(&indexer, starting_indexes, &reader, &exit)?; info!("Waiting for new blocks..."); - while block_count == rpc.get_block_count()? { + while last_height == client.get_last_height()? { sleep(Duration::from_secs(1)) } } } - -fn wait_for_synced_node(rpc_client: &bitcoincore_rpc::Client) -> color_eyre::Result<()> { - let is_synced = || -> color_eyre::Result { - let info = rpc_client.get_blockchain_info()?; - Ok(info.headers == info.blocks) - }; - - if !is_synced()? { - info!("Waiting for node to sync..."); - while !is_synced()? { - sleep(Duration::from_secs(1)) - } - } - - Ok(()) -} diff --git a/crates/brk_computer/examples/computer.rs b/crates/brk_computer/examples/computer.rs index e1cdce747..d2f41959c 100644 --- a/crates/brk_computer/examples/computer.rs +++ b/crates/brk_computer/examples/computer.rs @@ -14,6 +14,15 @@ use brk_rpc::{Auth, Client}; use vecdb::Exit; pub fn main() -> Result<()> { + // Can't increase main thread's stack size, thus we need to use another thread + thread::Builder::new() + .stack_size(512 * 1024 * 1024) + .spawn(run)? + .join() + .unwrap() +} + +fn run() -> Result<()> { brk_logger::init(Some(Path::new(".log")))?; let bitcoin_dir = Path::new(&std::env::var("HOME").unwrap()) @@ -22,6 +31,9 @@ pub fn main() -> Result<()> { .join("Bitcoin"); // let bitcoin_dir = Path::new("/Volumes/WD_BLACK/bitcoin"); + let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk"); + // let outputs_dir = Path::new("../../_outputs"); + let client = Client::new( "http://localhost:8332", Auth::CookieFile(bitcoin_dir.join(".cookie")), @@ -31,9 +43,6 @@ pub fn main() -> Result<()> { let blocks = Blocks::new(&client, &reader); - let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk"); - // let outputs_dir = Path::new("../../_outputs"); - let mut indexer = Indexer::forced_import(&outputs_dir)?; let fetcher = Fetcher::import(true, None)?; @@ -41,20 +50,13 @@ pub fn main() -> Result<()> { let exit = Exit::new(); exit.set_ctrlc_handler(); - // Can't increase main thread's stack size, thus we need to use another thread - thread::Builder::new() - .stack_size(256 * 1024 * 1024) - .spawn(move || -> Result<()> { - let mut computer = Computer::forced_import(&outputs_dir, &indexer, Some(fetcher))?; + let mut computer = Computer::forced_import(&outputs_dir, &indexer, Some(fetcher))?; - loop { - let i = Instant::now(); - let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?; - computer.compute(&indexer, starting_indexes, &reader, &exit)?; - dbg!(i.elapsed()); - sleep(Duration::from_secs(10)); - } - })? - .join() - .unwrap() + loop { + let i = Instant::now(); + let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?; + computer.compute(&indexer, starting_indexes, &reader, &exit)?; + dbg!(i.elapsed()); + sleep(Duration::from_secs(10)); + } } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 8d09b9648..ae23698ac 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -124,20 +124,12 @@ impl Indexer { }; let mut readers = Readers::new(&self.vecs); - let mut already_added_addressbyteshash: FxHashMap = - FxHashMap::default(); - let mut same_block_spent_outpoints: FxHashSet = FxHashSet::default(); - let mut same_block_output_info: FxHashMap = - FxHashMap::default(); let vecs = &mut self.vecs; let stores = &mut self.stores; for block in blocks.after(prev_hash)? { // let i_tot = Instant::now(); - already_added_addressbyteshash.clear(); - same_block_spent_outpoints.clear(); - same_block_output_info.clear(); let height = block.height(); let blockhash = block.hash(); @@ -305,16 +297,19 @@ impl Indexer { // println!("txinindex_and_txindata = : {:?}", i.elapsed()); // let i = Instant::now(); - same_block_spent_outpoints.extend(txins.iter().filter_map(|(_, input_source)| { - let InputSource::SameBlock((_, _, _, outpoint)) = input_source else { - return None; - }; - if !outpoint.is_coinbase() { - Some(*outpoint) - } else { - None - } - })); + let same_block_spent_outpoints: FxHashSet = txins + .iter() + .filter_map(|(_, input_source)| { + let InputSource::SameBlock((_, _, _, outpoint)) = input_source else { + return None; + }; + if !outpoint.is_coinbase() { + Some(*outpoint) + } else { + None + } + }) + .collect(); // println!("same_block_spent_outpoints = : {:?}", i.elapsed()); // let i = Instant::now(); @@ -475,6 +470,10 @@ impl Indexer { let tx_len = block.txdata.len(); // let i = Instant::now(); + let mut already_added_addressbyteshash: FxHashMap = + FxHashMap::default(); + let mut same_block_output_info: FxHashMap = + FxHashMap::default(); txouts .into_iter() .try_for_each(|data| -> Result<()> { diff --git a/crates/brk_indexer/src/stores_v2.rs b/crates/brk_indexer/src/stores_v2.rs index 6ebcf5afb..f3ce4e8e5 100644 --- a/crates/brk_indexer/src/stores_v2.rs +++ b/crates/brk_indexer/src/stores_v2.rs @@ -8,7 +8,7 @@ use brk_types::{ TxOutIndex, TxidPrefix, TypeIndex, TypeIndexAndOutPoint, TypeIndexAndTxIndex, Unit, Version, Vout, }; -use fjall2::{Keyspace, PersistMode}; +use fjall2::{PersistMode, TransactionalKeyspace}; use rayon::prelude::*; use vecdb::{AnyVec, StoredIndex, VecIterator}; @@ -18,7 +18,7 @@ use super::Vecs; #[derive(Clone)] pub struct Stores { - pub keyspace: Keyspace, + pub keyspace: TransactionalKeyspace, pub addressbyteshash_to_typeindex: Store, pub blockhashprefix_to_height: Store, diff --git a/crates/brk_rpc/src/lib.rs b/crates/brk_rpc/src/lib.rs index 0a272dcc3..efee7955b 100644 --- a/crates/brk_rpc/src/lib.rs +++ b/crates/brk_rpc/src/lib.rs @@ -1,8 +1,11 @@ +use std::thread::sleep; use std::{mem, sync::Arc, time::Duration}; use bitcoin::block::Header; use bitcoin::consensus::encode; -use bitcoincore_rpc::json::{GetBlockHeaderResult, GetBlockResult, GetTxOutResult}; +use bitcoincore_rpc::json::{ + GetBlockHeaderResult, GetBlockResult, GetBlockchainInfoResult, GetTxOutResult, +}; use bitcoincore_rpc::{Client as CoreClient, Error as RpcError, RpcApi}; use brk_error::Result; use brk_types::{BlockHash, Height, Sats, Transaction, TxIn, TxOut, TxStatus, Txid, Vout}; @@ -12,6 +15,7 @@ pub use bitcoincore_rpc::Auth; mod inner; use inner::ClientInner; +use log::info; /// /// Bitcoin Core RPC Client @@ -40,6 +44,13 @@ impl Client { )?))) } + /// Returns a data structure containing various state info regarding + /// blockchain processing. + pub fn get_blockchain_info(&self) -> Result { + self.call(move |c| c.get_blockchain_info()) + .map_err(Into::into) + } + pub fn get_block<'a, H>(&self, hash: &'a H) -> Result where &'a H: Into<&'a bitcoin::BlockHash>, @@ -230,6 +241,22 @@ impl Client { } } + pub fn wait_for_synced_node(&self) -> Result<()> { + let is_synced = || -> Result { + let info = self.get_blockchain_info()?; + Ok(info.headers == info.blocks) + }; + + if !is_synced()? { + info!("Waiting for node to sync..."); + while !is_synced()? { + sleep(Duration::from_secs(1)) + } + } + + Ok(()) + } + pub fn call(&self, f: F) -> Result where F: Fn(&CoreClient) -> Result, diff --git a/crates/brk_server/examples/main.rs b/crates/brk_server/examples/main.rs index 05300dba7..6ff46f4c8 100644 --- a/crates/brk_server/examples/main.rs +++ b/crates/brk_server/examples/main.rs @@ -1,4 +1,8 @@ -use std::{path::Path, thread::sleep, time::Duration}; +use std::{ + path::Path, + thread::{self, sleep}, + time::Duration, +}; use brk_computer::Computer; @@ -10,14 +14,29 @@ use brk_query::Query; use brk_reader::Reader; use brk_rpc::{Auth, Client}; use brk_server::Server; +use log::info; use vecdb::Exit; pub fn main() -> Result<()> { + // Can't increase main thread's stack size, thus we need to use another thread + thread::Builder::new() + .stack_size(512 * 1024 * 1024) + .spawn(run)? + .join() + .unwrap() +} + +fn run() -> Result<()> { brk_logger::init(Some(Path::new(".log")))?; - let process = true; + let bitcoin_dir = Path::new(&std::env::var("HOME").unwrap()) + .join("Library") + .join("Application Support") + .join("Bitcoin"); + // let bitcoin_dir = Path::new("/Volumes/WD_BLACK1/bitcoin"); - let bitcoin_dir = Path::new(""); + let outputs_dir = Path::new(&std::env::var("HOME").unwrap()).join(".brk"); + // let outputs_dir = Path::new("../../_outputs"); let client = Client::new( "http://localhost:8332", @@ -28,46 +47,48 @@ pub fn main() -> Result<()> { let blocks = Blocks::new(&client, &reader); - let outputs_dir = Path::new("../../_outputs"); - - let mut indexer = Indexer::forced_import(outputs_dir)?; + let mut indexer = Indexer::forced_import(&outputs_dir)?; let fetcher = Some(Fetcher::import(true, None)?); - let mut computer = Computer::forced_import(outputs_dir, &indexer, fetcher)?; + let mut computer = Computer::forced_import(&outputs_dir, &indexer, fetcher)?; let exit = Exit::new(); exit.set_ctrlc_handler(); - tokio::runtime::Builder::new_multi_thread() + let query = Query::build(&reader, &indexer, &computer); + + let future = async move { + let server = Server::new(&query, None); + + tokio::spawn(async move { + server.serve(true).await.unwrap(); + }); + + Ok(()) as Result<()> + }; + + let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() - .build()? - .block_on(async { - let query = Query::build(&reader, &indexer, &computer); + .build()?; - let server = Server::new(&query, None); + let _handle = runtime.spawn(future); - let server = tokio::spawn(async move { - server.serve(true).await.unwrap(); - }); + loop { + client.wait_for_synced_node()?; - if process { - loop { - let last_height = client.get_last_height()?; + let last_height = client.get_last_height()?; - let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?; + info!("{} blocks found.", u32::from(last_height) + 1); - computer.compute(&indexer, starting_indexes, &reader, &exit)?; + let starting_indexes = indexer.checked_index(&blocks, &client, &exit)?; - while last_height == client.get_last_height()? { - sleep(Duration::from_secs(1)) - } - } - } + computer.compute(&indexer, starting_indexes, &reader, &exit)?; - #[allow(unreachable_code)] - server.await.unwrap(); + info!("Waiting for new blocks..."); - Ok(()) - }) + while last_height == client.get_last_height()? { + sleep(Duration::from_secs(1)) + } + } } diff --git a/crates/brk_store/src/v2/meta.rs b/crates/brk_store/src/v2/meta.rs index 8bcf063eb..8e5e91071 100644 --- a/crates/brk_store/src/v2/meta.rs +++ b/crates/brk_store/src/v2/meta.rs @@ -5,7 +5,7 @@ use std::{ use brk_error::Result; use brk_types::Version; -use fjall2::{Keyspace, PartitionHandle, PersistMode}; +use fjall2::{PersistMode, TransactionalKeyspace, TransactionalPartitionHandle}; use super::Height; @@ -18,13 +18,13 @@ pub struct StoreMeta { impl StoreMeta { pub fn checked_open( - keyspace: &Keyspace, + keyspace: &TransactionalKeyspace, path: &Path, version: Version, open_partition_handle: F, - ) -> Result<(Self, PartitionHandle)> + ) -> Result<(Self, TransactionalPartitionHandle)> where - F: Fn() -> Result, + F: Fn() -> Result, { fs::create_dir_all(path)?; diff --git a/crates/brk_store/src/v2/mod.rs b/crates/brk_store/src/v2/mod.rs index 493712ee4..732a04f92 100644 --- a/crates/brk_store/src/v2/mod.rs +++ b/crates/brk_store/src/v2/mod.rs @@ -1,10 +1,11 @@ -use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, path::Path}; +use std::{borrow::Cow, cmp, fmt::Debug, fs, hash::Hash, mem, path::Path}; use brk_error::Result; use brk_types::{Height, Version}; use byteview6::ByteView; use fjall2::{ - InnerItem, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode, ValueType, + InnerItem, PartitionCreateOptions, PersistMode, TransactionalKeyspace, + TransactionalPartitionHandle, ValueType, }; use rustc_hash::{FxHashMap, FxHashSet}; @@ -18,18 +19,18 @@ use meta::*; pub struct StoreV2 { meta: StoreMeta, name: &'static str, - keyspace: Keyspace, - partition: PartitionHandle, + keyspace: TransactionalKeyspace, + partition: TransactionalPartitionHandle, puts: FxHashMap, dels: FxHashSet, } const MAJOR_FJALL_VERSION: Version = Version::TWO; -pub fn open_keyspace(path: &Path) -> fjall2::Result { +pub fn open_keyspace(path: &Path) -> fjall2::Result { fjall2::Config::new(path.join("fjall")) .max_write_buffer_size(32 * 1024 * 1024) - .open() + .open_transactional() } impl StoreV2 @@ -39,23 +40,25 @@ where ByteView: From + From, { fn open_partition_handle( - keyspace: &Keyspace, + keyspace: &TransactionalKeyspace, name: &str, bloom_filters: Option, - ) -> Result { + ) -> Result { let mut options = PartitionCreateOptions::default() .max_memtable_size(8 * 1024 * 1024) .manual_journal_persist(true); if bloom_filters.is_some_and(|b| !b) { options = options.bloom_filter_bits(None); + } else { + options = options.bloom_filter_bits(Some(5)); } keyspace.open_partition(name, options).map_err(|e| e.into()) } pub fn import( - keyspace: &Keyspace, + keyspace: &TransactionalKeyspace, path: &Path, name: &str, version: Version, @@ -100,12 +103,16 @@ where } pub fn is_empty(&self) -> Result { - self.partition.is_empty().map_err(|e| e.into()) + self.keyspace + .read_tx() + .is_empty(&self.partition) + .map_err(|e| e.into()) } pub fn iter(&self) -> impl Iterator { - self.partition - .iter() + self.keyspace + .read_tx() + .iter(&self.partition) .map(|res| res.unwrap()) .map(|(k, v)| (K::from(ByteView::from(&*k)), V::from(ByteView::from(&*v)))) } @@ -140,6 +147,11 @@ where } } + #[inline] + pub fn approximate_len(&self) -> usize { + self.partition.approximate_len() + } + #[inline] fn has(&self, height: Height) -> bool { self.meta.has(height) @@ -169,16 +181,19 @@ where return Ok(()); } - let mut items = self - .puts - .drain() + let mut items = mem::take(&mut self.puts) + .into_iter() .map(|(key, value)| Item::Value { key, value }) - .chain(self.dels.drain().map(|key| Item::Tomb(key))) + .chain( + mem::take(&mut self.dels) + .into_iter() + .map(|key| Item::Tomb(key)), + ) .collect::>(); items.sort_unstable(); - self.keyspace.batch().commit_partition( - &self.partition, + self.keyspace.inner().batch().commit_partition( + self.partition.inner(), items.into_iter().map(InnerItem::from).collect::>(), )?;