vecs: init

This commit is contained in:
nym21
2025-07-21 11:02:25 +02:00
parent 7ef70b953b
commit 5347523921
23 changed files with 471 additions and 86 deletions

View File

@@ -747,9 +747,10 @@ impl Vecs {
separate_address_vecs
.par_iter_mut()
.try_for_each(|(_, v)| v.state.reset_price_to_amount())?;
}
};
let last_height = indexer.vecs.height_to_blockhash.height();
if starting_height <= last_height {
let inputindex_to_outputindex_mmap = inputindex_to_outputindex.create_mmap()?;
let outputindex_to_value_mmap = outputindex_to_value.create_mmap()?;
@@ -1237,7 +1238,7 @@ impl Vecs {
if height != Height::ZERO && height.unwrap_to_usize() % 10_000 == 0 {
info!("Flushing...");
exit.block();
let _lock = exit.lock();
self.flush_states(height, &chain_state, mem::take(&mut addresstype_to_typeindex_to_loadedaddressdata), mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata), exit)?;
@@ -1245,14 +1246,12 @@ impl Vecs {
&mut addresstypeindex_to_anyaddressindex_mmap_opt,
&mut anyaddressindex_to_anyaddressdata_mmap_opt,
);
exit.release();
}
Ok(())
})?;
exit.block();
let _lock = exit.lock();
info!("Flushing...");
@@ -1263,8 +1262,6 @@ impl Vecs {
mem::take(&mut addresstype_to_typeindex_to_emptyaddressdata),
exit,
)?;
} else {
exit.block();
}
unsafe { libc::sync() }
@@ -1420,8 +1417,6 @@ impl Vecs {
unsafe { libc::sync() }
exit.release();
Ok(())
}

View File

@@ -35,7 +35,8 @@ pub enum Error {
WrongAddressType,
UnindexableDate,
String(&'static str),
Str(&'static str),
String(String),
}
impl From<time::SystemTimeError> for Error {
@@ -134,6 +135,7 @@ impl fmt::Display for Error {
"Date cannot be indexed, must be 2009-01-03, 2009-01-09 or greater"
),
Error::Str(s) => write!(f, "{s}"),
Error::String(s) => write!(f, "{s}"),
}
}

View File

@@ -44,7 +44,7 @@ impl LoadedAddressData {
pub fn send(&mut self, amount: Sats, previous_price: Option<Dollars>) -> Result<()> {
if self.amount() < amount {
return Err(Error::String("Previous_amount smaller than sent amount"));
return Err(Error::Str("Previous_amount smaller than sent amount"));
}
self.sent += amount;
self.outputs_len -= 1;

View File

@@ -5,7 +5,7 @@ pub fn copy_first_4bytes(slice: &[u8]) -> Result<[u8; 4]> {
let mut buf: [u8; 4] = [0; 4];
let buf_len = buf.len();
if slice.len() < buf_len {
return Err(Error::String("Buffer is too small to convert to 8 bytes"));
return Err(Error::Str("Buffer is too small to convert to 8 bytes"));
}
slice.iter().take(buf_len).enumerate().for_each(|(i, r)| {
buf[i] = *r;
@@ -18,7 +18,7 @@ pub fn copy_first_8bytes(slice: &[u8]) -> Result<[u8; 8]> {
let mut buf: [u8; 8] = [0; 8];
let buf_len = buf.len();
if slice.len() < buf_len {
return Err(Error::String("Buffer is too small to convert to 8 bytes"));
return Err(Error::Str("Buffer is too small to convert to 8 bytes"));
}
slice.iter().take(buf_len).enumerate().for_each(|(i, r)| {
buf[i] = *r;

View File

@@ -11,3 +11,4 @@ repository.workspace = true
brk_logger = { workspace = true }
ctrlc = { version = "3.4.7", features = ["termination"] }
log = { workspace = true }
parking_lot = { workspace = true }

View File

@@ -8,7 +8,7 @@ fn main() {
brk_logger::init(Some(Path::new(".log")));
exit.block();
let lock = exit.lock();
let mut i = 0;
while i < 21 {
@@ -17,7 +17,7 @@ fn main() {
i += 1;
}
exit.release();
drop(lock);
let mut j = 0;
while j < 10 {

View File

@@ -3,69 +3,34 @@
#![doc = include_str!("../examples/main.rs")]
#![doc = "```"]
use std::{
process::exit,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread::sleep,
time::Duration,
};
use std::{process::exit, sync::Arc};
use log::info;
use parking_lot::{RwLock, RwLockReadGuard};
#[derive(Default, Clone)]
pub struct Exit {
blocking: Arc<AtomicBool>,
triggered: Arc<AtomicBool>,
}
pub struct Exit(Arc<RwLock<()>>);
impl Exit {
pub fn new() -> Self {
let s = Self {
triggered: Arc::new(AtomicBool::new(false)),
blocking: Arc::new(AtomicBool::new(false)),
};
let arc = Arc::new(RwLock::new(()));
let triggered = s.triggered.clone();
let blocking = s.blocking.clone();
let is_blocking = move || blocking.load(Ordering::SeqCst);
let copy = arc.clone();
ctrlc::set_handler(move || {
info!("Exitting...");
triggered.store(true, Ordering::SeqCst);
if is_blocking() {
if copy.is_locked() {
info!("Waiting to exit safely...");
while is_blocking() {
sleep(Duration::from_millis(50));
}
}
let _lock = copy.write();
info!("Exiting...");
exit(0);
})
.expect("Error setting Ctrl-C handler");
s
Self(arc)
}
pub fn block(&self) {
self.blocking.store(true, Ordering::SeqCst);
}
pub fn blocked(&self) -> bool {
self.blocking.load(Ordering::SeqCst)
}
pub fn release(&self) {
self.blocking.store(false, Ordering::SeqCst);
}
pub fn triggered(&self) -> bool {
self.triggered.load(Ordering::SeqCst)
pub fn lock(&self) -> RwLockReadGuard<'_, ()> {
self.0.read()
}
}

View File

@@ -60,11 +60,11 @@ impl Indexer {
// dbg!(starting_indexes);
// panic!();
exit.block();
let lock = exit.lock();
self.stores
.rollback_if_needed(&mut self.vecs, &starting_indexes)?;
self.vecs.rollback_if_needed(&starting_indexes)?;
exit.release();
drop(lock);
let vecs = &mut self.vecs;
let stores = &mut self.stores;
@@ -90,15 +90,14 @@ impl Indexer {
rem: bool,
exit: &Exit|
-> color_eyre::Result<bool> {
if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem || exit.triggered() {
if height == 0 || (height % SNAPSHOT_BLOCK_RANGE != 0) != rem {
return Ok(false);
}
info!("Exporting...");
exit.block();
let _lock = exit.lock();
stores.commit(height)?;
vecs.flush(height)?;
exit.release();
Ok(true)
};

View File

@@ -49,17 +49,8 @@ where
}
fn safe_truncate_if_needed(&mut self, index: I, exit: &Exit) -> Result<()> {
if exit.triggered() {
return Ok(());
}
let blocked = exit.blocked();
if !blocked {
exit.block();
}
let _lock = exit.lock();
self.0.truncate_if_needed(index)?;
if !blocked {
exit.release();
}
Ok(())
}
@@ -85,17 +76,8 @@ where
}
pub fn safe_flush(&mut self, exit: &Exit) -> Result<()> {
if exit.triggered() {
return Ok(());
}
let blocked = exit.blocked();
if !blocked {
exit.block();
}
let _lock = exit.lock();
self.0.flush()?;
if !blocked {
exit.release();
}
Ok(())
}

1
crates/brk_vecs/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/vecs

View File

@@ -0,0 +1,27 @@
[package]
name = "brk_vecs"
description = "A storeable vec"
keywords = ["vec", "disk", "data"]
categories = ["database"]
version.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[dependencies]
arc-swap = { workspace = true }
bincode = { workspace = true }
brk_core = { workspace = true }
brk_exit = { workspace = true }
libc = "0.2.174"
log = { workspace = true }
memmap2 = "0.9.7"
parking_lot = {workspace = true}
rayon = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
zerocopy = { workspace = true }
zerocopy-derive = { workspace = true }
zstd = "0.13.3"

View File

@@ -0,0 +1,12 @@
use std::path::Path;
use brk_core::Result;
use brk_vecs::{File, PAGE_SIZE};
fn main() -> Result<()> {
let file = File::open(Path::new("vecs"))?;
file.grow_if_needed(PAGE_SIZE * 1_000_000)?;
Ok(())
}

View File

@@ -0,0 +1,82 @@
use std::fs::OpenOptions;
use std::sync::Arc;
use std::{collections::HashMap, fs, io::BufReader, path::Path};
use bincode::decode_from_std_read;
use bincode::{Decode, Encode, config};
use brk_core::Result;
use parking_lot::{RwLock, RwLockReadGuard};
use crate::PAGE_SIZE;
use super::Region;
#[derive(Debug)]
pub struct Layout {
file: fs::File,
pub id_to_index: HashMap<String, usize>,
pub index_to_region: Vec<Arc<RwLock<Region>>>,
// holes
}
impl Layout {
pub fn open(path: &Path) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.open(path)?;
Ok(if file.metadata()?.len() != 0 {
let config = config::standard();
let mut reader = BufReader::new(&file);
let serialized: SerializedRegions = decode_from_std_read(&mut reader, config)?;
let mut id_to_index = HashMap::new();
let mut index_to_region = vec![];
serialized.0.into_iter().for_each(|(str, region)| {
id_to_index.insert(str, index_to_region.len());
index_to_region.push(Arc::new(RwLock::new(region)));
});
Self {
file,
id_to_index,
index_to_region,
}
} else {
Self {
file,
id_to_index: HashMap::new(),
index_to_region: Vec::new(),
}
})
}
pub fn get_or_create_region_from_id(&mut self, id: String) -> Result<usize> {
if let Some(v) = self.id_to_index.get(&id) {
return Ok(*v);
}
let index = self.create_region()?;
self.id_to_index.insert(id, index);
Ok(index)
}
fn create_region(&mut self) -> Result<usize> {
let index = self.index_to_region.len();
let length = PAGE_SIZE;
Ok(0)
}
pub fn get(&self, region: usize) -> Option<RwLockReadGuard<'_, Region>> {
self.index_to_region.get(region).map(|r| r.read())
}
}
#[derive(Debug, Encode, Decode)]
struct SerializedRegions(HashMap<String, Region>);

View File

@@ -0,0 +1,128 @@
use std::{
fs::{self, OpenOptions},
os::unix::io::AsRawFd,
path::Path,
sync::Arc,
};
use brk_core::{Error, Result};
use libc::off_t;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::{RwLock, RwLockReadGuard};
mod layout;
mod reader;
mod region;
use layout::*;
use region::*;
use crate::file::reader::Reader;
pub const PAGE_SIZE: usize = 4096;
pub struct File {
layout: Arc<RwLock<Layout>>,
file: Arc<RwLock<fs::File>>,
mmap: Arc<RwLock<MmapMut>>,
}
impl File {
pub fn open(path: &Path) -> Result<Self> {
fs::create_dir_all(path)?;
let layout = Layout::open(&path.join("layout.dat"))?;
let file = OpenOptions::new()
.read(true)
.create(true)
.write(true)
.truncate(false)
.open(path.join("data.dat"))?;
let mmap = Self::mmap(&file)?;
Ok(Self {
file: Arc::new(RwLock::new(file)),
mmap: Arc::new(RwLock::new(mmap)),
layout: Arc::new(RwLock::new(layout)),
})
}
/// len % PAGE_SIZE == 0
pub fn grow_if_needed(&self, len: usize) -> Result<()> {
assert!(len % PAGE_SIZE == 0);
let file = self.file.write();
let len = len as u64;
if file.metadata()?.len() < len {
file.set_len(len)?;
self.remap_(&file)
} else {
Ok(())
}
}
pub fn get_or_create_region_from_id(&mut self, id: String) -> Result<usize> {
self.layout.write().get_or_create_region_from_id(id)
}
pub fn create_reader<'a, 'b>(&'a self, region_id: usize) -> Result<Reader<'a, 'b>>
where
'a: 'b,
{
let layout: RwLockReadGuard<'a, Layout> = self.layout.read();
let mmap: RwLockReadGuard<'a, MmapMut> = self.mmap.read();
let region: RwLockReadGuard<'b, Region> =
layout.get(region_id).ok_or(Error::Str("Unknown region"))?;
Ok(Reader::new(mmap, layout, region))
}
fn remap(&self) -> Result<()> {
*self.mmap.write() = Self::mmap(&self.file.read())?;
Ok(())
}
fn remap_(&self, file: &fs::File) -> Result<()> {
*self.mmap.write() = Self::mmap(file)?;
Ok(())
}
fn mmap(file: &fs::File) -> Result<MmapMut> {
Ok(unsafe { MmapOptions::new().map_mut(file)? })
}
pub fn delete() {}
#[cfg(target_os = "macos")]
fn punch_hole(file: &fs::File, offset: u64, length: u64) -> Result<()> {
let fpunchhole = FPunchhole {
fp_flags: 0,
reserved: 0,
fp_offset: offset as libc::off_t,
fp_length: length as libc::off_t,
};
let result = unsafe {
libc::fcntl(
file.as_raw_fd(),
libc::F_PUNCHHOLE,
&fpunchhole as *const FPunchhole,
)
};
if result == -1 {
let err = std::io::Error::last_os_error();
return Err(Error::String(format!("Failed to punch hole: {err}")));
}
Ok(())
}
}
#[repr(C)]
struct FPunchhole {
fp_flags: u32,
reserved: u32,
fp_offset: off_t,
fp_length: off_t,
}

View File

@@ -0,0 +1,44 @@
use memmap2::MmapMut;
use parking_lot::RwLockReadGuard;
use crate::file::layout::Layout;
use super::Region;
pub struct Reader<'a, 'b>
where
'a: 'b,
{
layout: RwLockReadGuard<'a, Layout>,
mmap: RwLockReadGuard<'a, MmapMut>,
region: RwLockReadGuard<'b, Region>,
}
impl<'a, 'b> Reader<'a, 'b>
where
'a: 'b,
{
pub fn new(
mmap: RwLockReadGuard<'a, MmapMut>,
layout: RwLockReadGuard<'a, Layout>,
region: RwLockReadGuard<'b, Region>,
) -> Self {
Self {
mmap,
layout,
region,
}
}
pub fn read(&self, offset: usize, len: usize) -> &[u8] {
assert!(offset + len < self.region.length());
let start = self.region.start() + offset;
let end = start + len;
&self.mmap[start..end]
}
pub fn region(&self) -> &Region {
&self.region
}
}

View File

@@ -0,0 +1,79 @@
// use std::sync::Arc;
use bincode::{Decode, Encode};
// use parking_lot::{RwLock, RwLockReadGuard};
use crate::PAGE_SIZE;
// #[derive(Debug, Encode, Decode)]
#[derive(Debug, Encode, Decode)]
pub struct Region {
// Bad name
/// Must be multiple of 4096
start: usize,
length: usize,
/// Must be multiple of 4096
reserved: usize,
// lock: Arc<RwLock<()>>,
// variant: usize, // Raw or Compressed or something else ? to know if there is a header ? Since blocks 4096, storing headers individually would be dumb
}
impl Region {
pub fn new(start: usize, length: usize, reserved: usize) -> Self {
assert!(reserved > 0);
assert!(start % PAGE_SIZE == 0);
assert!(reserved % PAGE_SIZE == 0);
assert!(length <= reserved);
Self {
start,
length,
reserved,
// lock: Arc::new(RwLock::new(())),
}
}
pub fn start(&self) -> usize {
self.start
}
pub fn length(&self) -> usize {
self.length
}
pub fn reserved(&self) -> usize {
self.reserved
}
// pub fn lock(&self) -> RwLockReadGuard<'_, ()> {
// self.lock.read()
// }
}
// #[derive(Debug, Encode, Decode)]
// pub struct RegionInner {
// start: usize,
// length: usize,
// reserved: usize,
// }
// impl From<Region> for RegionInner {
// fn from(value: Region) -> Self {
// Self {
// start: value.start,
// length: value.length,
// reserved: value.reserved,
// }
// }
// }
// impl From<RegionInner> for Region {
// fn from(value: RegionInner) -> Self {
// Self {
// start: value.start,
// length: value.length,
// reserved: value.reserved,
// lock: Arc::new(RwLock::new(())),
// }
// }
// }

View File

@@ -0,0 +1,5 @@
mod file;
mod variants;
pub use file::*;
pub use variants::*;

View File

@@ -0,0 +1,3 @@
mod raw;
pub use raw::*;

View File

@@ -0,0 +1,3 @@
pub struct RawVec {
region: usize,
}

View File

@@ -0,0 +1,3 @@
pub struct Stamp(u64);
pub struct StampedVec;