From e28a0cde5579e280f725be33f7ac49b327845189 Mon Sep 17 00:00:00 2001 From: nym21 Date: Mon, 4 Aug 2025 23:48:20 +0200 Subject: [PATCH] vecs: fix race condition --- crates/brk_indexer/src/indexes.rs | 245 +++++++++++------- crates/brk_indexer/src/lib.rs | 44 ++-- crates/brk_vecs/src/file/mod.rs | 60 +++-- crates/brk_vecs/src/file/reader.rs | 5 +- crates/brk_vecs/src/file/region.rs | 2 + .../brk_vecs/src/variants/compressed/mod.rs | 33 ++- .../brk_vecs/src/variants/compressed/pages.rs | 6 +- crates/brk_vecs/src/variants/raw/header.rs | 6 +- crates/brk_vecs/src/variants/raw/mod.rs | 7 +- 9 files changed, 251 insertions(+), 157 deletions(-) diff --git a/crates/brk_indexer/src/indexes.rs b/crates/brk_indexer/src/indexes.rs index 7e906d0ec..a55334e18 100644 --- a/crates/brk_indexer/src/indexes.rs +++ b/crates/brk_indexer/src/indexes.rs @@ -97,12 +97,20 @@ impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes { let stores_starting_height = stores.starting_height(); let starting_height = vecs_starting_height.min(stores_starting_height); + // dbg!( + // vecs_starting_height, + // stores_starting_height, + // starting_height + // ); + let range = u32::from( starting_height .checked_sub(NUMBER_OF_UNSAFE_BLOCKS as u32) .unwrap_or_default(), )..u32::from(starting_height); + let mut height_to_blockhash_iter = vecs.height_to_blockhash.iter(); + // But we also need to check the chain and start earlier in case of a reorg let height = range // ..= because of last saved + 1 .map(Height::from) @@ -113,101 +121,160 @@ impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes { }) .unwrap(); - vecs.height_to_blockhash - .iter() + height_to_blockhash_iter .get(*height) .is_none_or(|saved_blockhash| &rpc_blockhash != saved_blockhash.as_ref()) }) .unwrap_or(starting_height); - Ok(Self { - emptyoutputindex: starting_index( - &vecs.height_to_first_emptyoutputindex, - &vecs.emptyoutputindex_to_txindex, - height, - ) - .ok_or(Error::Str(""))?, + let emptyoutputindex = starting_index( + &vecs.height_to_first_emptyoutputindex, + &vecs.emptyoutputindex_to_txindex, height, - p2msoutputindex: starting_index( - &vecs.height_to_first_p2msoutputindex, - &vecs.p2msoutputindex_to_txindex, - height, - ) - .ok_or(Error::Str(""))?, - opreturnindex: starting_index( - &vecs.height_to_first_opreturnindex, - &vecs.opreturnindex_to_txindex, - height, - ) - .ok_or(Error::Str(""))?, - p2pk33addressindex: starting_index( - &vecs.height_to_first_p2pk33addressindex, - &vecs.p2pk33addressindex_to_p2pk33bytes, - height, - ) - .ok_or(Error::Str(""))?, - p2pk65addressindex: starting_index( - &vecs.height_to_first_p2pk65addressindex, - &vecs.p2pk65addressindex_to_p2pk65bytes, - height, - ) - .ok_or(Error::Str(""))?, - p2pkhaddressindex: starting_index( - &vecs.height_to_first_p2pkhaddressindex, - &vecs.p2pkhaddressindex_to_p2pkhbytes, - height, - ) - .ok_or(Error::Str(""))?, - p2shaddressindex: starting_index( - &vecs.height_to_first_p2shaddressindex, - &vecs.p2shaddressindex_to_p2shbytes, - height, - ) - .ok_or(Error::Str(""))?, - p2traddressindex: starting_index( - &vecs.height_to_first_p2traddressindex, - &vecs.p2traddressindex_to_p2trbytes, - height, - ) - .ok_or(Error::Str(""))?, - p2wpkhaddressindex: starting_index( - &vecs.height_to_first_p2wpkhaddressindex, - &vecs.p2wpkhaddressindex_to_p2wpkhbytes, - height, - ) - .ok_or(Error::Str(""))?, - p2wshaddressindex: starting_index( - &vecs.height_to_first_p2wshaddressindex, - &vecs.p2wshaddressindex_to_p2wshbytes, - height, - ) - .ok_or(Error::Str(""))?, - p2aaddressindex: starting_index( - &vecs.height_to_first_p2aaddressindex, - &vecs.p2aaddressindex_to_p2abytes, - height, - ) - .ok_or(Error::Str(""))?, - txindex: starting_index(&vecs.height_to_first_txindex, &vecs.txindex_to_txid, height) - .ok_or(Error::Str(""))?, - inputindex: starting_index( - &vecs.height_to_first_inputindex, - &vecs.inputindex_to_outputindex, - height, - ) - .ok_or(Error::Str(""))?, - outputindex: starting_index( - &vecs.height_to_first_outputindex, - &vecs.outputindex_to_value, - height, - ) - .ok_or(Error::Str(""))?, - unknownoutputindex: starting_index( - &vecs.height_to_first_unknownoutputindex, - &vecs.unknownoutputindex_to_txindex, - height, - ) - .ok_or(Error::Str(""))?, + ) + .ok_or(Error::Str(""))?; + + // dbg!(emptyoutputindex); + + let p2msoutputindex = starting_index( + &vecs.height_to_first_p2msoutputindex, + &vecs.p2msoutputindex_to_txindex, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2msoutputindex); + + let opreturnindex = starting_index( + &vecs.height_to_first_opreturnindex, + &vecs.opreturnindex_to_txindex, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(opreturnindex); + + let p2pk33addressindex = starting_index( + &vecs.height_to_first_p2pk33addressindex, + &vecs.p2pk33addressindex_to_p2pk33bytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2pk33addressindex); + + let p2pk65addressindex = starting_index( + &vecs.height_to_first_p2pk65addressindex, + &vecs.p2pk65addressindex_to_p2pk65bytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2pk65addressindex); + + let p2pkhaddressindex = starting_index( + &vecs.height_to_first_p2pkhaddressindex, + &vecs.p2pkhaddressindex_to_p2pkhbytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2pkhaddressindex); + + let p2shaddressindex = starting_index( + &vecs.height_to_first_p2shaddressindex, + &vecs.p2shaddressindex_to_p2shbytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2shaddressindex); + + let p2traddressindex = starting_index( + &vecs.height_to_first_p2traddressindex, + &vecs.p2traddressindex_to_p2trbytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2traddressindex); + + let p2wpkhaddressindex = starting_index( + &vecs.height_to_first_p2wpkhaddressindex, + &vecs.p2wpkhaddressindex_to_p2wpkhbytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2wpkhaddressindex); + + let p2wshaddressindex = starting_index( + &vecs.height_to_first_p2wshaddressindex, + &vecs.p2wshaddressindex_to_p2wshbytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2wshaddressindex); + + let p2aaddressindex = starting_index( + &vecs.height_to_first_p2aaddressindex, + &vecs.p2aaddressindex_to_p2abytes, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(p2aaddressindex); + + let txindex = starting_index(&vecs.height_to_first_txindex, &vecs.txindex_to_txid, height) + .ok_or(Error::Str(""))?; + + // dbg!(txindex); + + let inputindex = starting_index( + &vecs.height_to_first_inputindex, + &vecs.inputindex_to_outputindex, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(inputindex); + + let outputindex = starting_index( + &vecs.height_to_first_outputindex, + &vecs.outputindex_to_value, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(outputindex); + + let unknownoutputindex = starting_index( + &vecs.height_to_first_unknownoutputindex, + &vecs.unknownoutputindex_to_txindex, + height, + ) + .ok_or(Error::Str(""))?; + + // dbg!(unknownoutputindex); + + Ok(Self { + emptyoutputindex, + height, + p2msoutputindex, + opreturnindex, + p2pk33addressindex, + p2pk65addressindex, + p2pkhaddressindex, + p2shaddressindex, + p2traddressindex, + p2wpkhaddressindex, + p2wshaddressindex, + p2aaddressindex, + txindex, + inputindex, + outputindex, + unknownoutputindex, }) } } diff --git a/crates/brk_indexer/src/lib.rs b/crates/brk_indexer/src/lib.rs index 99c6ad0bc..c1314789f 100644 --- a/crates/brk_indexer/src/lib.rs +++ b/crates/brk_indexer/src/lib.rs @@ -26,7 +26,7 @@ pub use indexes::*; pub use stores::*; pub use vecs::*; -const SNAPSHOT_BLOCK_RANGE: usize = 1000; +const SNAPSHOT_BLOCK_RANGE: usize = 1_000; const COLLISIONS_CHECKED_UP_TO: Height = Height::new(907_000); const VERSION: Version = Version::ONE; @@ -64,11 +64,13 @@ impl Indexer { ) -> Result { let file = self.file.clone(); + // dbg!(self.file.regions().id_to_index()); + // dbg!(self.file.layout()); + let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc)) .unwrap_or_else(|_report| Indexes::default()); - // dbg!(starting_indexes); - // panic!(); + // dbg!(&starting_indexes); let lock = exit.lock(); self.stores @@ -180,15 +182,15 @@ impl Indexer { idxs.height = height; - let txindex_to_first_outputindex_mmap = txindex_to_first_outputindex_reader_opt.as_ref().unwrap(); - let p2pk65addressindex_to_p2pk65bytes_mmap = p2pk65addressindex_to_p2pk65bytes_reader_opt.as_ref().unwrap(); - let p2pk33addressindex_to_p2pk33bytes_mmap = p2pk33addressindex_to_p2pk33bytes_reader_opt.as_ref().unwrap(); - let p2pkhaddressindex_to_p2pkhbytes_mmap = p2pkhaddressindex_to_p2pkhbytes_reader_opt.as_ref().unwrap(); - let p2shaddressindex_to_p2shbytes_mmap = p2shaddressindex_to_p2shbytes_reader_opt.as_ref().unwrap(); - let p2wpkhaddressindex_to_p2wpkhbytes_mmap = p2wpkhaddressindex_to_p2wpkhbytes_reader_opt.as_ref().unwrap(); - let p2wshaddressindex_to_p2wshbytes_mmap = p2wshaddressindex_to_p2wshbytes_reader_opt.as_ref().unwrap(); - let p2traddressindex_to_p2trbytes_mmap = p2traddressindex_to_p2trbytes_reader_opt.as_ref().unwrap(); - let p2aaddressindex_to_p2abytes_mmap = p2aaddressindex_to_p2abytes_reader_opt.as_ref().unwrap(); + let txindex_to_first_outputindex_reader = txindex_to_first_outputindex_reader_opt.as_ref().unwrap(); + let p2pk65addressindex_to_p2pk65bytes_reader = p2pk65addressindex_to_p2pk65bytes_reader_opt.as_ref().unwrap(); + let p2pk33addressindex_to_p2pk33bytes_reader = p2pk33addressindex_to_p2pk33bytes_reader_opt.as_ref().unwrap(); + let p2pkhaddressindex_to_p2pkhbytes_reader = p2pkhaddressindex_to_p2pkhbytes_reader_opt.as_ref().unwrap(); + let p2shaddressindex_to_p2shbytes_reader = p2shaddressindex_to_p2shbytes_reader_opt.as_ref().unwrap(); + let p2wpkhaddressindex_to_p2wpkhbytes_reader = p2wpkhaddressindex_to_p2wpkhbytes_reader_opt.as_ref().unwrap(); + let p2wshaddressindex_to_p2wshbytes_reader = p2wshaddressindex_to_p2wshbytes_reader_opt.as_ref().unwrap(); + let p2traddressindex_to_p2trbytes_reader = p2traddressindex_to_p2trbytes_reader_opt.as_ref().unwrap(); + let p2aaddressindex_to_p2abytes_reader = p2aaddressindex_to_p2abytes_reader_opt.as_ref().unwrap(); // Used to check rapidhash collisions let check_collisions = check_collisions && height > COLLISIONS_CHECKED_UP_TO ; @@ -291,7 +293,7 @@ impl Indexer { let vout = Vout::from(outpoint.vout); - let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_mmap)? + let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_reader)? .ok_or(Error::Str("Expect outputindex to not be none")) .inspect_err(|_| { dbg!(outpoint.txid, prev_txindex, vout); @@ -377,35 +379,35 @@ impl Indexer { let prev_addressbytes_opt = match outputtype { OutputType::P2PK65 => vecs .p2pk65addressindex_to_p2pk65bytes - .get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_mmap)? + .get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2PK33 => vecs .p2pk33addressindex_to_p2pk33bytes - .get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_mmap)? + .get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2PKH => vecs .p2pkhaddressindex_to_p2pkhbytes - .get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_mmap)? + .get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2SH => vecs .p2shaddressindex_to_p2shbytes - .get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_mmap)? + .get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2WPKH => vecs .p2wpkhaddressindex_to_p2wpkhbytes - .get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_mmap)? + .get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2WSH => vecs .p2wshaddressindex_to_p2wshbytes - .get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_mmap)? + .get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2TR => vecs .p2traddressindex_to_p2trbytes - .get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_mmap)? + .get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), OutputType::P2A => vecs .p2aaddressindex_to_p2abytes - .get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_mmap)? + .get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_reader)? .map(|v| AddressBytes::from(v.into_owned())), _ => { unreachable!() diff --git a/crates/brk_vecs/src/file/mod.rs b/crates/brk_vecs/src/file/mod.rs index a70d9fd45..15f4a9464 100644 --- a/crates/brk_vecs/src/file/mod.rs +++ b/crates/brk_vecs/src/file/mod.rs @@ -143,7 +143,7 @@ impl File { #[inline] pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> { - self.write_all_to_region_at_(identifier, data, None) + self.write_all_to_region_at_(identifier, data, None, false) } #[inline] @@ -153,14 +153,25 @@ impl File { data: &[u8], at: u64, ) -> Result<()> { - self.write_all_to_region_at_(identifier, data, Some(at)) + self.write_all_to_region_at_(identifier, data, Some(at), false) } + pub fn truncate_write_all_to_region( + &self, + identifier: Identifier, + at: u64, + data: &[u8], + ) -> Result<()> { + self.write_all_to_region_at_(identifier, data, Some(at), true) + } + + // NEW fn write_all_to_region_at_( &self, identifier: Identifier, data: &[u8], at: Option, + truncate: bool, ) -> Result<()> { let regions = self.regions.read(); let Some(region_lock) = regions.get_region(identifier.clone()) else { @@ -176,17 +187,22 @@ impl File { drop(region); let data_len = data.len() as u64; - let new_len = at.map_or(len + data_len, |at| (at + data_len).max(len)); - + let new_len = at.map_or(len + data_len, |at| { + assert!(at <= len); + let new_len = at + data_len; + if truncate { new_len } else { new_len.max(len) } + }); let write_start = start + at.unwrap_or(len); - if at.is_some_and(|at| at >= start + reserved) { + if at.is_some_and(|at| at >= reserved) { return Err(Error::Str("Invalid at parameter")); } // Write to reserved space if possible if new_len <= reserved { - println!("Write to {region_index} reserved space at {write_start}"); + // println!( + // "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})" + // ); if at.is_none() { self.write(write_start, data); @@ -216,7 +232,7 @@ impl File { // If is last continue writing if layout.is_last_anything(region_index) { - println!("{region_index} Append to file at {write_start}"); + // println!("{region_index} Append to file at {write_start}"); self.set_min_len(start + new_reserved)?; let mut region = region_lock.write(); @@ -239,7 +255,7 @@ impl File { .get_hole(hole_start) .is_some_and(|gap| gap >= added_reserve) { - println!("Expand {region_index} to hole"); + // println!("Expand {region_index} to hole"); layout.remove_or_compress_hole(hole_start, added_reserve); let mut region = region_lock.write(); @@ -258,7 +274,7 @@ impl File { // Find hole big enough to move the region if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) { - println!("Move {region_index} to hole at {hole_start}"); + // println!("Move {region_index} to hole at {hole_start}"); layout.remove_or_compress_hole(hole_start, new_reserved); drop(layout); @@ -284,11 +300,11 @@ impl File { let new_start = layout.len(®ions); // Write at the end - println!( - "Move {region_index} to the end, from {start}..{} to {new_start}..{}", - start + reserved, - new_start + new_reserved - ); + // println!( + // "Move {region_index} to the end, from {start}..{} to {new_start}..{}", + // start + reserved, + // new_start + new_reserved + // ); self.set_min_len(new_start + new_reserved)?; layout.reserve(new_start, new_reserved); drop(layout); @@ -329,9 +345,13 @@ impl File { slice[start..end].copy_from_slice(data); } + /// /// From relative to start + /// + /// DO NOT call any `write_all` function right after as there could be a race condition if hole punching happens + /// pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> { - let Some(region) = self.regions.read().get_region(identifier) else { + let Some(region) = self.regions.read().get_region(identifier.clone()) else { return Err(Error::Str("Unknown region")); }; let mut region_ = region.write(); @@ -354,7 +374,10 @@ impl File { if start > end { unreachable!("Should not be possible"); } else if start < end { - self.punch_hole(start, end - start)?; + let length = end - start; + // if length > PAGE_SIZE { + self.punch_hole(start, length)?; + // } } Ok(()) } @@ -457,12 +480,15 @@ impl File { }) } + #[inline] fn punch_hole(&self, start: u64, length: u64) -> Result<()> { let file = self.file.write(); - Self::punch_hole_impl(&file, start, length) + Self::punch_hole_(&file, start, length) } + #[inline] fn punch_hole_(file: &fs::File, start: u64, length: u64) -> Result<()> { + // println!("Punching {length} bytes hole at {start}"); Self::punch_hole_impl(file, start, length) } diff --git a/crates/brk_vecs/src/file/reader.rs b/crates/brk_vecs/src/file/reader.rs index e95bb0bfb..62be55c54 100644 --- a/crates/brk_vecs/src/file/reader.rs +++ b/crates/brk_vecs/src/file/reader.rs @@ -32,7 +32,8 @@ impl<'a> Reader<'a> { &self.region } - pub fn prefixed(&self, offset: usize) -> &[u8] { - &self.mmap[offset..] + pub fn prefixed(&self, offset: u64) -> &[u8] { + let start = self.region.start() + offset; + &self.mmap[start as usize..] } } diff --git a/crates/brk_vecs/src/file/region.rs b/crates/brk_vecs/src/file/region.rs index ec74f5deb..c08021e3d 100644 --- a/crates/brk_vecs/src/file/region.rs +++ b/crates/brk_vecs/src/file/region.rs @@ -12,6 +12,7 @@ pub struct Region { len: u64, /// Must be multiple of 4096, greater or equal to len reserved: u64, + padding: u64, } pub const SIZE_OF_REGION: usize = size_of::(); @@ -27,6 +28,7 @@ impl Region { start, len, reserved, + padding: 0, } } diff --git a/crates/brk_vecs/src/variants/compressed/mod.rs b/crates/brk_vecs/src/variants/compressed/mod.rs index e6a9d6212..a2fb8d2ae 100644 --- a/crates/brk_vecs/src/variants/compressed/mod.rs +++ b/crates/brk_vecs/src/variants/compressed/mod.rs @@ -93,6 +93,12 @@ where let vec: Vec = pco::standalone::simple_decompress(slice)?; let vec = T::from_inner_slice(vec); + if vec.len() != page.values as usize { + dbg!((offset, len)); + dbg!(vec); + unreachable!() + } + Ok(vec) } @@ -205,11 +211,7 @@ where fn flush(&mut self) -> Result<()> { self.inner.write_header_if_needed()?; - let pushed_len = self.pushed_len(); - - let has_new_data = pushed_len != 0; - - if !has_new_data { + if self.is_pushed_empty() { return Ok(()); } @@ -222,9 +224,7 @@ where let mut truncate_at = None; if stored_len % Self::PER_PAGE != 0 { - if pages.is_empty() { - unreachable!() - } + assert!(!pages.is_empty()); let last_page_index = pages.len() - 1; @@ -236,7 +236,8 @@ where }) .unwrap(); - truncate_at.replace(pages.pop().unwrap().start); + let start = pages.pop().unwrap().start; + truncate_at.replace(start); starting_page_index = last_page_index; } @@ -254,14 +255,10 @@ where let prev = pages.get(page_index - 1).unwrap(); prev.start + prev.bytes as u64 } else { - 0 + HEADER_OFFSET as u64 }; - let page = Page::new( - start + HEADER_OFFSET as u64, - bytes.len() as u32, - *len as u32, - ); + let page = Page::new(start, bytes.len() as u32, *len as u32); pages.checked_push(page_index, page); }); @@ -274,11 +271,11 @@ where let file = self.file(); if let Some(truncate_at) = truncate_at { - file.truncate_region(self.region_index().into(), truncate_at)?; + file.truncate_write_all_to_region(self.region_index().into(), truncate_at, &buf)?; + } else { + file.write_all_to_region(self.region_index().into(), &buf)?; } - file.write_all_to_region(self.region_index().into(), &buf)?; - pages.flush(file)?; Ok(()) diff --git a/crates/brk_vecs/src/variants/compressed/pages.rs b/crates/brk_vecs/src/variants/compressed/pages.rs index d837b056e..e55b03593 100644 --- a/crates/brk_vecs/src/variants/compressed/pages.rs +++ b/crates/brk_vecs/src/variants/compressed/pages.rs @@ -49,12 +49,10 @@ impl Pages { let change_at = self.change_at.take().unwrap(); let at = (change_at * Self::SIZE_OF_PAGE) as u64; - file.truncate_region(self.region_index.into(), at)?; - - file.write_all_to_region_at( + file.truncate_write_all_to_region( self.region_index.into(), - self.vec[change_at..].as_bytes(), at, + self.vec[change_at..].as_bytes(), )?; Ok(()) diff --git a/crates/brk_vecs/src/variants/raw/header.rs b/crates/brk_vecs/src/variants/raw/header.rs index ae9d3cf5f..9541682b4 100644 --- a/crates/brk_vecs/src/variants/raw/header.rs +++ b/crates/brk_vecs/src/variants/raw/header.rs @@ -84,14 +84,15 @@ impl Header { } } -#[repr(C)] #[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)] +#[repr(C)] struct HeaderInner { pub header_version: Version, pub vec_version: Version, pub computed_version: Version, pub stamp: Stamp, pub compressed: ZeroCopyBool, + pub padding: [u8; 31], } impl HeaderInner { @@ -107,6 +108,7 @@ impl HeaderInner { computed_version: Version::default(), stamp: Stamp::default(), compressed: ZeroCopyBool::from(format), + padding: Default::default(), }; header.write(file, region_index)?; Ok(header) @@ -172,7 +174,7 @@ impl HeaderInner { Immutable, KnownLayout, )] -pub struct ZeroCopyBool(u64); +pub struct ZeroCopyBool(u8); impl ZeroCopyBool { pub const TRUE: Self = Self(1); diff --git a/crates/brk_vecs/src/variants/raw/mod.rs b/crates/brk_vecs/src/variants/raw/mod.rs index 4f8588193..9f32e74d2 100644 --- a/crates/brk_vecs/src/variants/raw/mod.rs +++ b/crates/brk_vecs/src/variants/raw/mod.rs @@ -202,9 +202,7 @@ where fn flush(&mut self) -> Result<()> { self.write_header_if_needed()?; - let pushed_len = self.pushed_len(); - - let has_new_data = pushed_len != 0; + let has_new_data = !self.is_pushed_empty(); let has_updated_data = !self.updated.is_empty(); let has_holes = !self.holes.is_empty(); let had_holes = self.has_stored_holes && !has_holes; @@ -273,7 +271,7 @@ where { #[inline] fn read_(&self, index: usize, reader: &Reader) -> Result { - T::read_from_prefix(reader.prefixed(index * Self::SIZE_OF_T + HEADER_OFFSET)) + T::read_from_prefix(reader.prefixed((index * Self::SIZE_OF_T + HEADER_OFFSET) as u64)) .map(|(v, _)| v) .map_err(Error::from) } @@ -318,6 +316,7 @@ where } let from = index * Self::SIZE_OF_T + HEADER_OFFSET; + self.file .truncate_region(self.region_index.into(), from as u64) }