vecs: fix race condition

This commit is contained in:
nym21
2025-08-04 23:48:20 +02:00
parent 5b855fd835
commit e28a0cde55
9 changed files with 251 additions and 157 deletions
+156 -89
View File
@@ -97,12 +97,20 @@ impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes {
let stores_starting_height = stores.starting_height();
let starting_height = vecs_starting_height.min(stores_starting_height);
// dbg!(
// vecs_starting_height,
// stores_starting_height,
// starting_height
// );
let range = u32::from(
starting_height
.checked_sub(NUMBER_OF_UNSAFE_BLOCKS as u32)
.unwrap_or_default(),
)..u32::from(starting_height);
let mut height_to_blockhash_iter = vecs.height_to_blockhash.iter();
// But we also need to check the chain and start earlier in case of a reorg
let height = range // ..= because of last saved + 1
.map(Height::from)
@@ -113,101 +121,160 @@ impl TryFrom<(&mut Vecs, &Stores, &Client)> for Indexes {
})
.unwrap();
vecs.height_to_blockhash
.iter()
height_to_blockhash_iter
.get(*height)
.is_none_or(|saved_blockhash| &rpc_blockhash != saved_blockhash.as_ref())
})
.unwrap_or(starting_height);
Ok(Self {
emptyoutputindex: starting_index(
&vecs.height_to_first_emptyoutputindex,
&vecs.emptyoutputindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?,
let emptyoutputindex = starting_index(
&vecs.height_to_first_emptyoutputindex,
&vecs.emptyoutputindex_to_txindex,
height,
p2msoutputindex: starting_index(
&vecs.height_to_first_p2msoutputindex,
&vecs.p2msoutputindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?,
opreturnindex: starting_index(
&vecs.height_to_first_opreturnindex,
&vecs.opreturnindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?,
p2pk33addressindex: starting_index(
&vecs.height_to_first_p2pk33addressindex,
&vecs.p2pk33addressindex_to_p2pk33bytes,
height,
)
.ok_or(Error::Str(""))?,
p2pk65addressindex: starting_index(
&vecs.height_to_first_p2pk65addressindex,
&vecs.p2pk65addressindex_to_p2pk65bytes,
height,
)
.ok_or(Error::Str(""))?,
p2pkhaddressindex: starting_index(
&vecs.height_to_first_p2pkhaddressindex,
&vecs.p2pkhaddressindex_to_p2pkhbytes,
height,
)
.ok_or(Error::Str(""))?,
p2shaddressindex: starting_index(
&vecs.height_to_first_p2shaddressindex,
&vecs.p2shaddressindex_to_p2shbytes,
height,
)
.ok_or(Error::Str(""))?,
p2traddressindex: starting_index(
&vecs.height_to_first_p2traddressindex,
&vecs.p2traddressindex_to_p2trbytes,
height,
)
.ok_or(Error::Str(""))?,
p2wpkhaddressindex: starting_index(
&vecs.height_to_first_p2wpkhaddressindex,
&vecs.p2wpkhaddressindex_to_p2wpkhbytes,
height,
)
.ok_or(Error::Str(""))?,
p2wshaddressindex: starting_index(
&vecs.height_to_first_p2wshaddressindex,
&vecs.p2wshaddressindex_to_p2wshbytes,
height,
)
.ok_or(Error::Str(""))?,
p2aaddressindex: starting_index(
&vecs.height_to_first_p2aaddressindex,
&vecs.p2aaddressindex_to_p2abytes,
height,
)
.ok_or(Error::Str(""))?,
txindex: starting_index(&vecs.height_to_first_txindex, &vecs.txindex_to_txid, height)
.ok_or(Error::Str(""))?,
inputindex: starting_index(
&vecs.height_to_first_inputindex,
&vecs.inputindex_to_outputindex,
height,
)
.ok_or(Error::Str(""))?,
outputindex: starting_index(
&vecs.height_to_first_outputindex,
&vecs.outputindex_to_value,
height,
)
.ok_or(Error::Str(""))?,
unknownoutputindex: starting_index(
&vecs.height_to_first_unknownoutputindex,
&vecs.unknownoutputindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?,
)
.ok_or(Error::Str(""))?;
// dbg!(emptyoutputindex);
let p2msoutputindex = starting_index(
&vecs.height_to_first_p2msoutputindex,
&vecs.p2msoutputindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2msoutputindex);
let opreturnindex = starting_index(
&vecs.height_to_first_opreturnindex,
&vecs.opreturnindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(opreturnindex);
let p2pk33addressindex = starting_index(
&vecs.height_to_first_p2pk33addressindex,
&vecs.p2pk33addressindex_to_p2pk33bytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2pk33addressindex);
let p2pk65addressindex = starting_index(
&vecs.height_to_first_p2pk65addressindex,
&vecs.p2pk65addressindex_to_p2pk65bytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2pk65addressindex);
let p2pkhaddressindex = starting_index(
&vecs.height_to_first_p2pkhaddressindex,
&vecs.p2pkhaddressindex_to_p2pkhbytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2pkhaddressindex);
let p2shaddressindex = starting_index(
&vecs.height_to_first_p2shaddressindex,
&vecs.p2shaddressindex_to_p2shbytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2shaddressindex);
let p2traddressindex = starting_index(
&vecs.height_to_first_p2traddressindex,
&vecs.p2traddressindex_to_p2trbytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2traddressindex);
let p2wpkhaddressindex = starting_index(
&vecs.height_to_first_p2wpkhaddressindex,
&vecs.p2wpkhaddressindex_to_p2wpkhbytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2wpkhaddressindex);
let p2wshaddressindex = starting_index(
&vecs.height_to_first_p2wshaddressindex,
&vecs.p2wshaddressindex_to_p2wshbytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2wshaddressindex);
let p2aaddressindex = starting_index(
&vecs.height_to_first_p2aaddressindex,
&vecs.p2aaddressindex_to_p2abytes,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(p2aaddressindex);
let txindex = starting_index(&vecs.height_to_first_txindex, &vecs.txindex_to_txid, height)
.ok_or(Error::Str(""))?;
// dbg!(txindex);
let inputindex = starting_index(
&vecs.height_to_first_inputindex,
&vecs.inputindex_to_outputindex,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(inputindex);
let outputindex = starting_index(
&vecs.height_to_first_outputindex,
&vecs.outputindex_to_value,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(outputindex);
let unknownoutputindex = starting_index(
&vecs.height_to_first_unknownoutputindex,
&vecs.unknownoutputindex_to_txindex,
height,
)
.ok_or(Error::Str(""))?;
// dbg!(unknownoutputindex);
Ok(Self {
emptyoutputindex,
height,
p2msoutputindex,
opreturnindex,
p2pk33addressindex,
p2pk65addressindex,
p2pkhaddressindex,
p2shaddressindex,
p2traddressindex,
p2wpkhaddressindex,
p2wshaddressindex,
p2aaddressindex,
txindex,
inputindex,
outputindex,
unknownoutputindex,
})
}
}
+23 -21
View File
@@ -26,7 +26,7 @@ pub use indexes::*;
pub use stores::*;
pub use vecs::*;
const SNAPSHOT_BLOCK_RANGE: usize = 1000;
const SNAPSHOT_BLOCK_RANGE: usize = 1_000;
const COLLISIONS_CHECKED_UP_TO: Height = Height::new(907_000);
const VERSION: Version = Version::ONE;
@@ -64,11 +64,13 @@ impl Indexer {
) -> Result<Indexes> {
let file = self.file.clone();
// dbg!(self.file.regions().id_to_index());
// dbg!(self.file.layout());
let starting_indexes = Indexes::try_from((&mut self.vecs, &self.stores, rpc))
.unwrap_or_else(|_report| Indexes::default());
// dbg!(starting_indexes);
// panic!();
// dbg!(&starting_indexes);
let lock = exit.lock();
self.stores
@@ -180,15 +182,15 @@ impl Indexer {
idxs.height = height;
let txindex_to_first_outputindex_mmap = txindex_to_first_outputindex_reader_opt.as_ref().unwrap();
let p2pk65addressindex_to_p2pk65bytes_mmap = p2pk65addressindex_to_p2pk65bytes_reader_opt.as_ref().unwrap();
let p2pk33addressindex_to_p2pk33bytes_mmap = p2pk33addressindex_to_p2pk33bytes_reader_opt.as_ref().unwrap();
let p2pkhaddressindex_to_p2pkhbytes_mmap = p2pkhaddressindex_to_p2pkhbytes_reader_opt.as_ref().unwrap();
let p2shaddressindex_to_p2shbytes_mmap = p2shaddressindex_to_p2shbytes_reader_opt.as_ref().unwrap();
let p2wpkhaddressindex_to_p2wpkhbytes_mmap = p2wpkhaddressindex_to_p2wpkhbytes_reader_opt.as_ref().unwrap();
let p2wshaddressindex_to_p2wshbytes_mmap = p2wshaddressindex_to_p2wshbytes_reader_opt.as_ref().unwrap();
let p2traddressindex_to_p2trbytes_mmap = p2traddressindex_to_p2trbytes_reader_opt.as_ref().unwrap();
let p2aaddressindex_to_p2abytes_mmap = p2aaddressindex_to_p2abytes_reader_opt.as_ref().unwrap();
let txindex_to_first_outputindex_reader = txindex_to_first_outputindex_reader_opt.as_ref().unwrap();
let p2pk65addressindex_to_p2pk65bytes_reader = p2pk65addressindex_to_p2pk65bytes_reader_opt.as_ref().unwrap();
let p2pk33addressindex_to_p2pk33bytes_reader = p2pk33addressindex_to_p2pk33bytes_reader_opt.as_ref().unwrap();
let p2pkhaddressindex_to_p2pkhbytes_reader = p2pkhaddressindex_to_p2pkhbytes_reader_opt.as_ref().unwrap();
let p2shaddressindex_to_p2shbytes_reader = p2shaddressindex_to_p2shbytes_reader_opt.as_ref().unwrap();
let p2wpkhaddressindex_to_p2wpkhbytes_reader = p2wpkhaddressindex_to_p2wpkhbytes_reader_opt.as_ref().unwrap();
let p2wshaddressindex_to_p2wshbytes_reader = p2wshaddressindex_to_p2wshbytes_reader_opt.as_ref().unwrap();
let p2traddressindex_to_p2trbytes_reader = p2traddressindex_to_p2trbytes_reader_opt.as_ref().unwrap();
let p2aaddressindex_to_p2abytes_reader = p2aaddressindex_to_p2abytes_reader_opt.as_ref().unwrap();
// Used to check rapidhash collisions
let check_collisions = check_collisions && height > COLLISIONS_CHECKED_UP_TO ;
@@ -291,7 +293,7 @@ impl Indexer {
let vout = Vout::from(outpoint.vout);
let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_mmap)?
let outputindex = vecs.txindex_to_first_outputindex.get_or_read(prev_txindex, txindex_to_first_outputindex_reader)?
.ok_or(Error::Str("Expect outputindex to not be none"))
.inspect_err(|_| {
dbg!(outpoint.txid, prev_txindex, vout);
@@ -377,35 +379,35 @@ impl Indexer {
let prev_addressbytes_opt = match outputtype {
OutputType::P2PK65 => vecs
.p2pk65addressindex_to_p2pk65bytes
.get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_mmap)?
.get_or_read(typeindex.into(), p2pk65addressindex_to_p2pk65bytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PK33 => vecs
.p2pk33addressindex_to_p2pk33bytes
.get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_mmap)?
.get_or_read(typeindex.into(), p2pk33addressindex_to_p2pk33bytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2PKH => vecs
.p2pkhaddressindex_to_p2pkhbytes
.get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_mmap)?
.get_or_read(typeindex.into(), p2pkhaddressindex_to_p2pkhbytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2SH => vecs
.p2shaddressindex_to_p2shbytes
.get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_mmap)?
.get_or_read(typeindex.into(), p2shaddressindex_to_p2shbytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WPKH => vecs
.p2wpkhaddressindex_to_p2wpkhbytes
.get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_mmap)?
.get_or_read(typeindex.into(), p2wpkhaddressindex_to_p2wpkhbytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2WSH => vecs
.p2wshaddressindex_to_p2wshbytes
.get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_mmap)?
.get_or_read(typeindex.into(), p2wshaddressindex_to_p2wshbytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2TR => vecs
.p2traddressindex_to_p2trbytes
.get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_mmap)?
.get_or_read(typeindex.into(), p2traddressindex_to_p2trbytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
OutputType::P2A => vecs
.p2aaddressindex_to_p2abytes
.get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_mmap)?
.get_or_read(typeindex.into(), p2aaddressindex_to_p2abytes_reader)?
.map(|v| AddressBytes::from(v.into_owned())),
_ => {
unreachable!()
+43 -17
View File
@@ -143,7 +143,7 @@ impl File {
#[inline]
pub fn write_all_to_region(&self, identifier: Identifier, data: &[u8]) -> Result<()> {
self.write_all_to_region_at_(identifier, data, None)
self.write_all_to_region_at_(identifier, data, None, false)
}
#[inline]
@@ -153,14 +153,25 @@ impl File {
data: &[u8],
at: u64,
) -> Result<()> {
self.write_all_to_region_at_(identifier, data, Some(at))
self.write_all_to_region_at_(identifier, data, Some(at), false)
}
pub fn truncate_write_all_to_region(
&self,
identifier: Identifier,
at: u64,
data: &[u8],
) -> Result<()> {
self.write_all_to_region_at_(identifier, data, Some(at), true)
}
// NEW
fn write_all_to_region_at_(
&self,
identifier: Identifier,
data: &[u8],
at: Option<u64>,
truncate: bool,
) -> Result<()> {
let regions = self.regions.read();
let Some(region_lock) = regions.get_region(identifier.clone()) else {
@@ -176,17 +187,22 @@ impl File {
drop(region);
let data_len = data.len() as u64;
let new_len = at.map_or(len + data_len, |at| (at + data_len).max(len));
let new_len = at.map_or(len + data_len, |at| {
assert!(at <= len);
let new_len = at + data_len;
if truncate { new_len } else { new_len.max(len) }
});
let write_start = start + at.unwrap_or(len);
if at.is_some_and(|at| at >= start + reserved) {
if at.is_some_and(|at| at >= reserved) {
return Err(Error::Str("Invalid at parameter"));
}
// Write to reserved space if possible
if new_len <= reserved {
println!("Write to {region_index} reserved space at {write_start}");
// println!(
// "Write {data_len} bytes to {region_index} reserved space at {write_start} (start = {start}, at = {at:?}, len = {len})"
// );
if at.is_none() {
self.write(write_start, data);
@@ -216,7 +232,7 @@ impl File {
// If is last continue writing
if layout.is_last_anything(region_index) {
println!("{region_index} Append to file at {write_start}");
// println!("{region_index} Append to file at {write_start}");
self.set_min_len(start + new_reserved)?;
let mut region = region_lock.write();
@@ -239,7 +255,7 @@ impl File {
.get_hole(hole_start)
.is_some_and(|gap| gap >= added_reserve)
{
println!("Expand {region_index} to hole");
// println!("Expand {region_index} to hole");
layout.remove_or_compress_hole(hole_start, added_reserve);
let mut region = region_lock.write();
@@ -258,7 +274,7 @@ impl File {
// Find hole big enough to move the region
if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
println!("Move {region_index} to hole at {hole_start}");
// println!("Move {region_index} to hole at {hole_start}");
layout.remove_or_compress_hole(hole_start, new_reserved);
drop(layout);
@@ -284,11 +300,11 @@ impl File {
let new_start = layout.len(&regions);
// Write at the end
println!(
"Move {region_index} to the end, from {start}..{} to {new_start}..{}",
start + reserved,
new_start + new_reserved
);
// println!(
// "Move {region_index} to the end, from {start}..{} to {new_start}..{}",
// start + reserved,
// new_start + new_reserved
// );
self.set_min_len(new_start + new_reserved)?;
layout.reserve(new_start, new_reserved);
drop(layout);
@@ -329,9 +345,13 @@ impl File {
slice[start..end].copy_from_slice(data);
}
///
/// From relative to start
///
/// DO NOT call any `write_all` function right after as there could be a race condition if hole punching happens
///
pub fn truncate_region(&self, identifier: Identifier, from: u64) -> Result<()> {
let Some(region) = self.regions.read().get_region(identifier) else {
let Some(region) = self.regions.read().get_region(identifier.clone()) else {
return Err(Error::Str("Unknown region"));
};
let mut region_ = region.write();
@@ -354,7 +374,10 @@ impl File {
if start > end {
unreachable!("Should not be possible");
} else if start < end {
self.punch_hole(start, end - start)?;
let length = end - start;
// if length > PAGE_SIZE {
self.punch_hole(start, length)?;
// }
}
Ok(())
}
@@ -457,12 +480,15 @@ impl File {
})
}
#[inline]
fn punch_hole(&self, start: u64, length: u64) -> Result<()> {
let file = self.file.write();
Self::punch_hole_impl(&file, start, length)
Self::punch_hole_(&file, start, length)
}
#[inline]
fn punch_hole_(file: &fs::File, start: u64, length: u64) -> Result<()> {
// println!("Punching {length} bytes hole at {start}");
Self::punch_hole_impl(file, start, length)
}
+3 -2
View File
@@ -32,7 +32,8 @@ impl<'a> Reader<'a> {
&self.region
}
pub fn prefixed(&self, offset: usize) -> &[u8] {
&self.mmap[offset..]
pub fn prefixed(&self, offset: u64) -> &[u8] {
let start = self.region.start() + offset;
&self.mmap[start as usize..]
}
}
+2
View File
@@ -12,6 +12,7 @@ pub struct Region {
len: u64,
/// Must be multiple of 4096, greater or equal to len
reserved: u64,
padding: u64,
}
pub const SIZE_OF_REGION: usize = size_of::<Region>();
@@ -27,6 +28,7 @@ impl Region {
start,
len,
reserved,
padding: 0,
}
}
+15 -18
View File
@@ -93,6 +93,12 @@ where
let vec: Vec<T::NumberType> = pco::standalone::simple_decompress(slice)?;
let vec = T::from_inner_slice(vec);
if vec.len() != page.values as usize {
dbg!((offset, len));
dbg!(vec);
unreachable!()
}
Ok(vec)
}
@@ -205,11 +211,7 @@ where
fn flush(&mut self) -> Result<()> {
self.inner.write_header_if_needed()?;
let pushed_len = self.pushed_len();
let has_new_data = pushed_len != 0;
if !has_new_data {
if self.is_pushed_empty() {
return Ok(());
}
@@ -222,9 +224,7 @@ where
let mut truncate_at = None;
if stored_len % Self::PER_PAGE != 0 {
if pages.is_empty() {
unreachable!()
}
assert!(!pages.is_empty());
let last_page_index = pages.len() - 1;
@@ -236,7 +236,8 @@ where
})
.unwrap();
truncate_at.replace(pages.pop().unwrap().start);
let start = pages.pop().unwrap().start;
truncate_at.replace(start);
starting_page_index = last_page_index;
}
@@ -254,14 +255,10 @@ where
let prev = pages.get(page_index - 1).unwrap();
prev.start + prev.bytes as u64
} else {
0
HEADER_OFFSET as u64
};
let page = Page::new(
start + HEADER_OFFSET as u64,
bytes.len() as u32,
*len as u32,
);
let page = Page::new(start, bytes.len() as u32, *len as u32);
pages.checked_push(page_index, page);
});
@@ -274,11 +271,11 @@ where
let file = self.file();
if let Some(truncate_at) = truncate_at {
file.truncate_region(self.region_index().into(), truncate_at)?;
file.truncate_write_all_to_region(self.region_index().into(), truncate_at, &buf)?;
} else {
file.write_all_to_region(self.region_index().into(), &buf)?;
}
file.write_all_to_region(self.region_index().into(), &buf)?;
pages.flush(file)?;
Ok(())
@@ -49,12 +49,10 @@ impl Pages {
let change_at = self.change_at.take().unwrap();
let at = (change_at * Self::SIZE_OF_PAGE) as u64;
file.truncate_region(self.region_index.into(), at)?;
file.write_all_to_region_at(
file.truncate_write_all_to_region(
self.region_index.into(),
self.vec[change_at..].as_bytes(),
at,
self.vec[change_at..].as_bytes(),
)?;
Ok(())
+4 -2
View File
@@ -84,14 +84,15 @@ impl Header {
}
}
#[repr(C)]
#[derive(Debug, Clone, FromBytes, IntoBytes, Immutable, KnownLayout)]
#[repr(C)]
struct HeaderInner {
pub header_version: Version,
pub vec_version: Version,
pub computed_version: Version,
pub stamp: Stamp,
pub compressed: ZeroCopyBool,
pub padding: [u8; 31],
}
impl HeaderInner {
@@ -107,6 +108,7 @@ impl HeaderInner {
computed_version: Version::default(),
stamp: Stamp::default(),
compressed: ZeroCopyBool::from(format),
padding: Default::default(),
};
header.write(file, region_index)?;
Ok(header)
@@ -172,7 +174,7 @@ impl HeaderInner {
Immutable,
KnownLayout,
)]
pub struct ZeroCopyBool(u64);
pub struct ZeroCopyBool(u8);
impl ZeroCopyBool {
pub const TRUE: Self = Self(1);
+3 -4
View File
@@ -202,9 +202,7 @@ where
fn flush(&mut self) -> Result<()> {
self.write_header_if_needed()?;
let pushed_len = self.pushed_len();
let has_new_data = pushed_len != 0;
let has_new_data = !self.is_pushed_empty();
let has_updated_data = !self.updated.is_empty();
let has_holes = !self.holes.is_empty();
let had_holes = self.has_stored_holes && !has_holes;
@@ -273,7 +271,7 @@ where
{
#[inline]
fn read_(&self, index: usize, reader: &Reader) -> Result<T> {
T::read_from_prefix(reader.prefixed(index * Self::SIZE_OF_T + HEADER_OFFSET))
T::read_from_prefix(reader.prefixed((index * Self::SIZE_OF_T + HEADER_OFFSET) as u64))
.map(|(v, _)| v)
.map_err(Error::from)
}
@@ -318,6 +316,7 @@ where
}
let from = index * Self::SIZE_OF_T + HEADER_OFFSET;
self.file
.truncate_region(self.region_index.into(), from as u64)
}