mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-05-29 20:49:28 -07:00
daemon: do a best-effort manifest recovery
Revises @cooperq's recovery a bit by preventing any panics, recovering whichever files we can and skipping the ones we can't.
This commit is contained in:
committed by
Cooper Quintin
parent
1011c4b123
commit
f1e283b52c
@@ -34,8 +34,6 @@ use diag::{
|
||||
use log::{error, info};
|
||||
use qmdl_store::RecordingStoreError;
|
||||
use rayhunter::diag_device::DiagDevice;
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
@@ -94,7 +92,7 @@ async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
|
||||
|
||||
// Loads a RecordingStore if one exists, and if not, only create one if we're
|
||||
// not in debug mode. If we fail to parse the manifest AND we're not in debug
|
||||
// mode, try to recover by making a new (empty) manifest in the same directory.
|
||||
// mode, try to recover the manifest from the existing QMDL files
|
||||
async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, RayhunterError> {
|
||||
let store_exists = RecordingStore::exists(&config.qmdl_store_path).await?;
|
||||
if config.debug_mode {
|
||||
@@ -110,28 +108,8 @@ async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, Rayh
|
||||
Ok(store) => Ok(store),
|
||||
Err(RecordingStoreError::ParseManifestError(err)) => {
|
||||
error!("failed to parse QMDL manifest: {err}");
|
||||
info!("creating new empty manifest...");
|
||||
let mut recording_store = RecordingStore::create(&config.qmdl_store_path).await?;
|
||||
info!("parsing existing qmdl files into recording store...");
|
||||
let path = Path::new(&config.qmdl_store_path);
|
||||
let mut entries = fs::read_dir(path).await?;
|
||||
|
||||
// We might want to sort these newest to oldest so we don't have entries in manifest.toml in random order
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let file_name = entry.file_name();
|
||||
let file_name_str = match file_name.to_str() {
|
||||
Some(s) => s,
|
||||
None => continue, // skip non-UTF-8 names
|
||||
};
|
||||
|
||||
if file_name_str.ends_with(".qmdl") {
|
||||
let name = file_name_str.trim_end_matches(".qmdl");
|
||||
info!("making entry for {}", name);
|
||||
recording_store.new_entry_from_existing(name.to_string()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(recording_store)
|
||||
info!("recovering manifest from existing QMDL files...");
|
||||
Ok(RecordingStore::recover(&config.qmdl_store_path).await?)
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use chrono::{DateTime, Local, TimeZone};
|
||||
use rayhunter::analysis;
|
||||
use chrono::{DateTime, Local};
|
||||
use log::warn;
|
||||
use rayhunter::util::RuntimeMetadata;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
@@ -138,6 +138,73 @@ impl RecordingStore {
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// Does a best-effort attempt to recover the manifest from a directory of
|
||||
// QMDL files. We expect these files to be named like "<timestamp>.qmdl",
|
||||
// and skip any files which don't match that pattern.
|
||||
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let mut dir_entries = fs::read_dir(path.as_ref()).await
|
||||
.map_err(RecordingStoreError::OpenDirError)?;
|
||||
let mut manifest_entries = Vec::new();
|
||||
|
||||
while let Some(entry) = dir_entries.next_entry().await.map_err(RecordingStoreError::OpenDirError)? {
|
||||
let os_filename = entry.file_name();
|
||||
let Some(filename) = os_filename.to_str() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if !filename.ends_with(".qmdl") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let stem = filename.trim_end_matches(".qmdl");
|
||||
let Ok(start_timestamp) = stem.parse::<i64>() else {
|
||||
warn!("QMDL file has invalid name {os_filename:?}, skipping");
|
||||
continue;
|
||||
};
|
||||
|
||||
let metadata = match entry.metadata().await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) => {
|
||||
warn!("failed to read QMDL file metadata: {err:?}, skipping");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let start_time = DateTime::from_timestamp_nanos(start_timestamp);
|
||||
let Ok(last_message_time) = metadata.modified() else {
|
||||
warn!("failed to get modified time for QMDL file {os_filename:?}, skipping");
|
||||
continue;
|
||||
};
|
||||
|
||||
manifest_entries.push(ManifestEntry {
|
||||
name: stem.to_string(),
|
||||
start_time: start_time.into(),
|
||||
last_message_time: Some(last_message_time.into()),
|
||||
qmdl_size_bytes: metadata.size() as usize,
|
||||
rayhunter_version: None,
|
||||
system_os: None,
|
||||
arch: None,
|
||||
});
|
||||
}
|
||||
|
||||
// sort chronologically
|
||||
manifest_entries.sort_by(|a, b| a.start_time.cmp(&b.start_time));
|
||||
|
||||
let mut store = RecordingStore {
|
||||
path: path.as_ref().to_path_buf(),
|
||||
manifest: Manifest {
|
||||
entries: manifest_entries,
|
||||
},
|
||||
current_entry: None,
|
||||
};
|
||||
store.write_manifest().await?;
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
async fn read_manifest<P>(path: P) -> Result<Manifest, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
@@ -171,40 +238,6 @@ impl RecordingStore {
|
||||
self.write_manifest().await?;
|
||||
Ok((qmdl_file, analysis_file))
|
||||
}
|
||||
|
||||
pub async fn new_entry_from_existing(&mut self, name: String) -> Result<(File, File), RecordingStoreError> {
|
||||
// if we've already got an entry open, close it
|
||||
if self.current_entry.is_some() {
|
||||
self.close_current_entry().await?;
|
||||
}
|
||||
let mut new_entry = ManifestEntry::new();
|
||||
new_entry.name = name;
|
||||
let qmdl_filepath = new_entry.get_qmdl_filepath(&self.path);
|
||||
let qmdl_file = File::open(&qmdl_filepath)
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadFileError)?;
|
||||
let qmdl_meta = qmdl_file.metadata().await.map_err(RecordingStoreError::ReadFileError)?;
|
||||
let analysis_filepath = new_entry.get_analysis_filepath(&self.path);
|
||||
let analysis_file = File::open(&analysis_filepath)
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadFileError)?;
|
||||
|
||||
let timestamp = Local.timestamp_opt(new_entry.name.parse::<i64>().expect("Invalid timestamp"), 0).unwrap();
|
||||
new_entry.start_time = timestamp;
|
||||
|
||||
// I can't think of a better way to find this
|
||||
let update = qmdl_meta.modified().expect("no mod date").duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards");
|
||||
new_entry.last_message_time = Some(Local.timestamp_opt(update.as_secs().try_into().expect("error"), 0).unwrap());
|
||||
|
||||
new_entry.analysis_size_bytes = analysis_file.metadata().await.map_err(RecordingStoreError::ReadFileError)?.len().try_into().expect("file too large");
|
||||
new_entry.qmdl_size_bytes = qmdl_meta.len().try_into().expect("file too large");
|
||||
|
||||
self.manifest.entries.push(new_entry);
|
||||
self.current_entry = Some(self.manifest.entries.len() - 1);
|
||||
self.write_manifest().await?;
|
||||
Ok((qmdl_file, analysis_file))
|
||||
}
|
||||
|
||||
// Returns the corresponding QMDL file for a given entry
|
||||
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> {
|
||||
@@ -262,6 +295,8 @@ impl RecordingStore {
|
||||
}
|
||||
|
||||
async fn write_manifest(&mut self) -> Result<(), RecordingStoreError> {
|
||||
// we don't technically need a mutable reference to `self` here, but it
|
||||
// does prevent multiple concurrent writes across different threads
|
||||
let tmp_path = self.path.join("manifest.toml.new");
|
||||
let mut manifest_tmp_file = File::create(&tmp_path)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user