daemon: switch to writing heuristics output to ND-JSON

ND-JSON (newline-delimited JSON) is just a file with a list of JSON
objects separated by newlines. This way, as the analyzer harness
processes new packets, it can simply append JSON-serialized results
to a file without parsing the entire thing first.

Also simplifies the analysis logic so that it all operates in the diag thread.
This commit is contained in:
Will Greenberg
2024-05-09 14:46:41 -07:00
parent 4a5bede4ee
commit bfc688ad21
10 changed files with 256 additions and 224 deletions

View File

@@ -1,56 +0,0 @@
use std::sync::Arc;
use axum::{extract::State, http::StatusCode, Json};
use log::error;
use rayhunter::{analysis::analyzer::{AnalysisReport, Harness}, diag::MessagesContainer};
use tokio::sync;
use tokio_util::task::TaskTracker;
use crate::server::ServerState;
// Control messages accepted by the analysis worker spawned by
// `run_analysis_thread`.
#[derive(Debug)]
pub enum AnalysisMessage {
    // Discard all accumulated analysis state by recreating the harness with a
    // fresh set of analyzers.
    Reset,
    // Build the current AnalysisReport and send it back over the provided
    // oneshot channel.
    GetReport(sync::oneshot::Sender<AnalysisReport>),
    // Run the analysis harness over the messages in this container.
    AnalyzeContainer(MessagesContainer),
    // Ask the analysis thread to exit its receive loop.
    StopThread,
}
// Spawns the analysis worker on the given task tracker and returns the
// channel used to send it AnalysisMessages. The worker owns a Harness with
// all analyzers enabled and processes messages until it receives StopThread
// or every Sender has been dropped.
pub fn run_analysis_thread(task_tracker: &TaskTracker) -> sync::mpsc::Sender<AnalysisMessage> {
    let (msg_tx, mut msg_rx) = sync::mpsc::channel(5);
    task_tracker.spawn(async move {
        let mut harness = Harness::new_with_all_analyzers();
        // recv() returning None means all Senders were dropped, which also
        // ends the loop
        while let Some(msg) = msg_rx.recv().await {
            match msg {
                AnalysisMessage::GetReport(sender) => {
                    // this might fail if the client closes their connection
                    // before we're done building the report
                    if let Err(e) = sender.send(harness.build_analysis_report()) {
                        error!("failed to send analysis report: {:?}", e);
                    }
                }
                AnalysisMessage::Reset => harness = Harness::new_with_all_analyzers(),
                AnalysisMessage::AnalyzeContainer(container) => harness.analyze_qmdl_messages(container),
                AnalysisMessage::StopThread => break,
            }
        }
    });
    msg_tx
}
pub async fn get_analysis_report(State(state): State<Arc<ServerState>>) -> Result<Json<AnalysisReport>, (StatusCode, String)> {
if state.readonly_mode {
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
}
let analysis_tx = state.maybe_analysis_tx.as_ref().unwrap();
let (report_tx, report_rx) = tokio::sync::oneshot::channel();
if let Err(e) = analysis_tx.send(AnalysisMessage::GetReport(report_tx)).await {
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("error reaching analysis thread: {:?}", e)));
}
match report_rx.await {
Ok(report) => Ok(Json(report)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("error fetching analysis report: {:?}", e)))
}
}

View File

@@ -23,10 +23,9 @@ async fn main() {
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(qmdl_reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
println!("{}\n", serde_json::to_string(&harness.get_metadata()).expect("failed to serialize report metadata"));
while let Some(container) = qmdl_stream.try_next().await.expect("failed getting QMDL container") {
harness.analyze_qmdl_messages(container)
let row = harness.analyze_qmdl_messages(container);
println!("{}\n", serde_json::to_string(&row).expect("failed to serialize row"));
}
let report = harness.build_analysis_report();
println!("{}", serde_json::to_string(&report).expect("failed to serialize report"));
}

View File

@@ -1,4 +1,3 @@
mod analysis;
mod config;
mod error;
mod pcap;
@@ -9,15 +8,14 @@ mod diag;
use crate::config::{parse_config, parse_args};
use crate::diag::run_diag_read_thread;
use crate::qmdl_store::QmdlStore;
use crate::qmdl_store::RecordingStore;
use crate::server::{ServerState, get_qmdl, serve_static};
use crate::pcap::get_pcap;
use crate::stats::get_system_stats;
use crate::error::RayhunterError;
use analysis::{get_analysis_report, run_analysis_thread, AnalysisMessage};
use axum::response::Redirect;
use diag::{DiagDeviceCtrlMessage, start_recording, stop_recording};
use diag::{get_analysis_report, start_recording, stop_recording, DiagDeviceCtrlMessage};
use log::{info, error};
use rayhunter::diag_device::DiagDevice;
use axum::routing::{get, post};
@@ -37,16 +35,14 @@ use std::sync::Arc;
async fn run_server(
task_tracker: &TaskTracker,
config: &config::Config,
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
server_shutdown_rx: oneshot::Receiver<()>,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
maybe_analysis_tx: Option<Sender<AnalysisMessage>>
diag_device_sender: Sender<DiagDeviceCtrlMessage>
) -> JoinHandle<()> {
let state = Arc::new(ServerState {
qmdl_store_lock,
diag_device_ctrl_sender: diag_device_sender,
readonly_mode: config.readonly_mode,
maybe_analysis_tx,
readonly_mode: config.readonly_mode
});
let app = Router::new()
@@ -77,10 +73,10 @@ async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
// Loads a QmdlStore if one exists, and if not, only create one if we're not in
// readonly mode.
async fn init_qmdl_store(config: &config::Config) -> Result<QmdlStore, RayhunterError> {
match (QmdlStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
(true, _) => Ok(QmdlStore::load(&config.qmdl_store_path).await?),
(false, false) => Ok(QmdlStore::create(&config.qmdl_store_path).await?),
async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, RayhunterError> {
match (RecordingStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
(true, _) => Ok(RecordingStore::load(&config.qmdl_store_path).await?),
(false, false) => Ok(RecordingStore::create(&config.qmdl_store_path).await?),
(false, true) => Err(RayhunterError::NoStoreReadonlyMode(config.qmdl_store_path.clone())),
}
}
@@ -92,8 +88,7 @@ fn run_ctrl_c_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
server_shutdown_tx: oneshot::Sender<()>,
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
maybe_analysis_tx: Option<Sender<AnalysisMessage>>
qmdl_store_lock: Arc<RwLock<RecordingStore>>
) -> JoinHandle<Result<(), RayhunterError>> {
task_tracker.spawn(async move {
match tokio::signal::ctrl_c().await {
@@ -109,10 +104,6 @@ fn run_ctrl_c_thread(
.expect("couldn't send server shutdown signal");
diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await
.expect("couldn't send Exit message to diag thread");
if let Some(analysis_tx) = maybe_analysis_tx {
analysis_tx.send(AnalysisMessage::StopThread).await
.expect("couldn't send Exit message to analysis thread")
}
},
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
@@ -135,21 +126,18 @@ async fn main() -> Result<(), RayhunterError> {
let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?));
let (tx, rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
let mut maybe_analysis_tx = None;
if !config.readonly_mode {
let mut dev = DiagDevice::new().await
.map_err(RayhunterError::DiagInitError)?;
dev.config_logs().await
.map_err(RayhunterError::DiagInitError)?;
let analysis_tx = run_analysis_thread(&task_tracker);
run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone(), analysis_tx.clone());
maybe_analysis_tx = Some(analysis_tx);
run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone());
}
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, qmdl_store_lock.clone(), maybe_analysis_tx.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, tx, maybe_analysis_tx).await;
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, qmdl_store_lock.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, tx).await;
task_tracker.close();
task_tracker.wait().await;

View File

@@ -1,57 +1,122 @@
use std::pin::pin;
use std::sync::Arc;
use axum::body::Body;
use axum::extract::State;
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use rayhunter::diag::DataType;
use axum::response::{IntoResponse, Response};
use rayhunter::analysis::analyzer::Harness;
use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::diag_device::DiagDevice;
use serde::Serialize;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::Receiver;
use rayhunter::qmdl::QmdlWriter;
use log::{debug, error, info};
use tokio::fs::File;
use tokio::io::{BufWriter, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};
use crate::analysis::AnalysisMessage;
use crate::qmdl_store::QmdlStore;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
pub enum DiagDeviceCtrlMessage {
StopRecording,
StartRecording(QmdlWriter<File>),
StartRecording((QmdlWriter<File>, File)),
Exit,
}
// Streams analysis results straight to disk to minimize the amount of state
// Rayhunter has to keep in memory. The output format is newline-delimited
// JSON (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson):
// one JSON object per line, which lets us append new rows without parsing the
// whole file first.
struct AnalysisWriter {
    writer: BufWriter<File>,
    harness: Harness,
    bytes_written: usize,
}

impl AnalysisWriter {
    // Wraps the given file and immediately records the harness's metadata as
    // the first NDJSON row.
    pub async fn new(file: File) -> Result<Self, std::io::Error> {
        let mut writer = Self {
            writer: BufWriter::new(file),
            harness: Harness::new_with_all_analyzers(),
            bytes_written: 0,
        };
        let metadata = writer.harness.get_metadata();
        writer.write(&metadata).await?;
        Ok(writer)
    }

    // Runs the analysis harness on the given container, serializing the results
    // to the analysis file and returning the file's new length. Empty rows are
    // skipped entirely.
    pub async fn analyze(&mut self, container: MessagesContainer) -> Result<usize, std::io::Error> {
        let row = self.harness.analyze_qmdl_messages(container);
        if !row.is_empty() {
            self.write(&row).await?;
        }
        Ok(self.bytes_written)
    }

    // Appends one JSON-serialized value plus a trailing newline, flushing
    // immediately so readers always observe whole rows.
    async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
        let mut row_bytes = serde_json::to_vec(value).unwrap();
        row_bytes.push(b'\n');
        self.bytes_written += row_bytes.len();
        self.writer.write_all(&row_bytes).await?;
        self.writer.flush().await?;
        Ok(())
    }

    // Flushes any pending I/O to disk before dropping the writer
    pub async fn close(mut self) -> Result<(), std::io::Error> {
        self.writer.flush().await?;
        Ok(())
    }
}
pub fn run_diag_read_thread(
task_tracker: &TaskTracker,
mut dev: DiagDevice,
mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>,
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
analysis_tx: Sender<AnalysisMessage>
qmdl_store_lock: Arc<RwLock<RecordingStore>>
) {
task_tracker.spawn(async move {
let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_file));
let (initial_qmdl_file, initial_analysis_file) = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut maybe_qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_qmdl_file));
let mut diag_stream = pin!(dev.as_stream().into_stream());
let mut maybe_analysis_writer = Some(AnalysisWriter::new(initial_analysis_file).await
.expect("failed to create analysis writer"));
loop {
tokio::select! {
msg = qmdl_file_rx.recv() => {
match msg {
Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => {
qmdl_writer = Some(new_writer);
analysis_tx.send(AnalysisMessage::Reset).await
.expect("failed to send message to analysis thread");
Some(DiagDeviceCtrlMessage::StartRecording((new_writer, new_analysis_file))) => {
maybe_qmdl_writer = Some(new_writer);
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
maybe_analysis_writer = Some(AnalysisWriter::new(new_analysis_file).await
.expect("failed to write to analysis file"));
},
Some(DiagDeviceCtrlMessage::StopRecording) => {
qmdl_writer = None;
analysis_tx.send(AnalysisMessage::Reset).await
.expect("failed to send message to analysis thread");
maybe_qmdl_writer = None;
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
maybe_analysis_writer = None;
},
// None means all the Senders have been dropped, so it's
// time to go
Some(DiagDeviceCtrlMessage::Exit) | None => {
info!("Diag reader thread exiting...");
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
return Ok(())
},
}
@@ -65,20 +130,26 @@ pub fn run_diag_read_thread(
}
// keep track of how many bytes were written to the QMDL file so we can read
// a valid block of data from it in the HTTP server
if let Some(writer) = qmdl_writer.as_mut() {
writer.write_container(&container).await.expect("failed to write to QMDL writer");
debug!("total QMDL bytes written: {}, updating manifest...", writer.total_written);
if let Some(qmdl_writer) = maybe_qmdl_writer.as_mut() {
qmdl_writer.write_container(&container).await.expect("failed to write to QMDL writer");
debug!("total QMDL bytes written: {}, updating manifest...", qmdl_writer.total_written);
let mut qmdl_store = qmdl_store_lock.write().await;
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
qmdl_store.update_entry(index, writer.total_written).await
qmdl_store.update_entry_qmdl_size(index, qmdl_writer.total_written).await
.expect("failed to update qmdl file size");
debug!("sending container to analysis thread...");
analysis_tx.send(AnalysisMessage::AnalyzeContainer(container)).await
.expect("failed sending messages container to analysis thread");
debug!("done!");
} else {
debug!("no qmdl_writer set, continuing...");
}
if let Some(analysis_writer) = maybe_analysis_writer.as_mut() {
let analysis_file_len = analysis_writer.analyze(container).await
.expect("failed to analyze container");
let mut qmdl_store = qmdl_store_lock.write().await;
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
qmdl_store.update_entry_analysis_size(index, analysis_file_len as usize).await
.expect("failed to update analysis file size");
}
},
Err(err) => {
error!("error reading diag device: {}", err);
@@ -96,10 +167,10 @@ pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(S
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
let qmdl_file = qmdl_store.new_entry().await
let (qmdl_file, analysis_file) = qmdl_store.new_entry().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?;
let qmdl_writer = QmdlWriter::new(qmdl_file);
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording((qmdl_writer, analysis_file))).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
@@ -115,3 +186,20 @@ pub async fn stop_recording(State(state): State<Arc<ServerState>>) -> Result<(St
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
// HTTP handler that streams the current recording's NDJSON analysis file back
// to the client. Errors if no recording is in progress.
pub async fn get_analysis_report(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
    let qmdl_store = state.qmdl_store_lock.read().await;
    let entry = qmdl_store.get_current_entry().ok_or((
        StatusCode::SERVICE_UNAVAILABLE,
        "No QMDL data's being recorded to analyze, try starting a new recording!".to_string()
    ))?;
    let analysis_file = qmdl_store.open_entry_analysis(entry).await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
    let body = Body::from_stream(ReaderStream::new(analysis_file));
    let headers = [(CONTENT_TYPE, "application/x-ndjson")];
    Ok((headers, body).into_response())
}

View File

@@ -1,7 +1,7 @@
use thiserror::Error;
use rayhunter::diag_device::DiagDeviceError;
use crate::qmdl_store::QmdlStoreError;
use crate::qmdl_store::RecordingStoreError;
#[derive(Error, Debug)]
pub enum RayhunterError{
@@ -12,7 +12,7 @@ pub enum RayhunterError{
#[error("Tokio error: {0}")]
TokioError(#[from] tokio::io::Error),
#[error("QmdlStore error: {0}")]
QmdlStoreError(#[from] QmdlStoreError),
QmdlStoreError(#[from] RecordingStoreError),
#[error("No QMDL store found at path {0}, but can't create a new one due to readonly mode")]
NoStoreReadonlyMode(String),
}

View File

@@ -23,14 +23,14 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
let qmdl_store = state.qmdl_store_lock.read().await;
let entry = qmdl_store.entry_for_name(&qmdl_name)
.ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?;
if entry.size_bytes == 0 {
if entry.qmdl_size_bytes == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string()
));
}
let qmdl_file = qmdl_store.open_entry(&entry).await
let qmdl_file = qmdl_store.open_entry_qmdl(&entry).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
// the QMDL reader should stop at the last successfully written data chunk
// (entry.qmdl_size_bytes)
@@ -39,7 +39,7 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
pcap_writer.write_iface_header().await.unwrap();
tokio::spawn(async move {
let mut reader = QmdlReader::new(qmdl_file, Some(entry.size_bytes));
let mut reader = QmdlReader::new(qmdl_file, Some(entry.qmdl_size_bytes));
let mut messages_stream = pin!(reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use chrono::{DateTime, Local};
#[derive(Debug, Error)]
pub enum QmdlStoreError {
pub enum RecordingStoreError {
#[error("Can't close an entry when there's no current entry")]
NoCurrentEntry,
#[error("Couldn't create file: {0}")]
@@ -22,7 +22,7 @@ pub enum QmdlStoreError {
ParseManifestError(toml::de::Error)
}
pub struct QmdlStore {
pub struct RecordingStore {
pub path: PathBuf,
pub manifest: Manifest,
pub current_entry: Option<usize>, // index into manifest
@@ -38,7 +38,8 @@ pub struct ManifestEntry {
pub name: String,
pub start_time: DateTime<Local>,
pub last_message_time: Option<DateTime<Local>>,
pub size_bytes: usize,
pub qmdl_size_bytes: usize,
pub analysis_size_bytes: usize,
}
impl ManifestEntry {
@@ -48,113 +49,142 @@ impl ManifestEntry {
name: format!("{}", now.timestamp()),
start_time: now,
last_message_time: None,
size_bytes: 0,
qmdl_size_bytes: 0,
analysis_size_bytes: 0,
}
}
// Builds the path of this entry's QMDL file ("<name>.qmdl") under the given
// store directory.
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
    path.as_ref().join(&self.name).with_extension("qmdl")
}
// Builds the path of this entry's NDJSON analysis file ("<name>.ndjson")
// under the given store directory.
pub fn get_analysis_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
    path.as_ref().join(&self.name).with_extension("ndjson")
}
}
impl QmdlStore {
impl RecordingStore {
// Returns whether a directory with a "manifest.toml" exists at the given
// path (though doesn't check if that manifest is valid)
pub async fn exists<P>(path: P) -> Result<bool, QmdlStoreError> where P: AsRef<Path> {
pub async fn exists<P>(path: P) -> Result<bool, RecordingStoreError> where P: AsRef<Path> {
let manifest_path = path.as_ref().join("manifest.toml");
let dir_exists = try_exists(path).await.map_err(QmdlStoreError::OpenDirError)?;
let manifest_exists = try_exists(manifest_path).await.map_err(QmdlStoreError::ReadManifestError)?;
let dir_exists = try_exists(path).await.map_err(RecordingStoreError::OpenDirError)?;
let manifest_exists = try_exists(manifest_path).await.map_err(RecordingStoreError::ReadManifestError)?;
Ok(dir_exists && manifest_exists)
}
// Loads an existing QmdlStore at the given path. Errors if no store exists,
// Loads an existing RecordingStore at the given path. Errors if no store exists,
// or if it's malformed.
pub async fn load<P>(path: P) -> Result<Self, QmdlStoreError> where P: AsRef<Path> {
pub async fn load<P>(path: P) -> Result<Self, RecordingStoreError> where P: AsRef<Path> {
let path: PathBuf = path.as_ref().to_path_buf();
let manifest = QmdlStore::read_manifest(&path).await?;
Ok(QmdlStore {
let manifest = RecordingStore::read_manifest(&path).await?;
Ok(RecordingStore {
path,
manifest,
current_entry: None,
})
}
// Creates a new QmdlStore at the given path. This involves creating a dir
// Creates a new RecordingStore at the given path. This involves creating a dir
// and writing an empty manifest.
pub async fn create<P>(path: P) -> Result<Self, QmdlStoreError> where P: AsRef<Path> {
pub async fn create<P>(path: P) -> Result<Self, RecordingStoreError> where P: AsRef<Path> {
let manifest_path = path.as_ref().join("manifest.toml");
fs::create_dir_all(&path).await
.map_err(QmdlStoreError::OpenDirError)?;
.map_err(RecordingStoreError::OpenDirError)?;
let mut manifest_file = File::create(&manifest_path).await
.map_err(QmdlStoreError::WriteManifestError)?;
.map_err(RecordingStoreError::WriteManifestError)?;
let empty_manifest = Manifest { entries: Vec::new() };
let empty_manifest_contents = toml::to_string_pretty(&empty_manifest)
.expect("failed to serialize manifest");
manifest_file.write_all(empty_manifest_contents.as_bytes()).await
.map_err(QmdlStoreError::WriteManifestError)?;
QmdlStore::load(path).await
.map_err(RecordingStoreError::WriteManifestError)?;
RecordingStore::load(path).await
}
async fn read_manifest<P>(path: P) -> Result<Manifest, QmdlStoreError> where P: AsRef<Path> {
async fn read_manifest<P>(path: P) -> Result<Manifest, RecordingStoreError> where P: AsRef<Path> {
let manifest_path = path.as_ref().join("manifest.toml");
let file_contents = fs::read_to_string(&manifest_path).await
.map_err(QmdlStoreError::ReadManifestError)?;
.map_err(RecordingStoreError::ReadManifestError)?;
toml::from_str(&file_contents)
.map_err(QmdlStoreError::ParseManifestError)
.map_err(RecordingStoreError::ParseManifestError)
}
// Closes the current entry (if needed), creates a new entry based on the
// current time, and updates the manifest
pub async fn new_entry(&mut self) -> Result<File, QmdlStoreError> {
// current time, and updates the manifest. Returns a tuple of the entry's
// newly created QMDL file and analysis file.
pub async fn new_entry(&mut self) -> Result<(File, File), RecordingStoreError> {
// if we've already got an entry open, close it
if self.current_entry.is_some() {
self.close_current_entry().await?;
}
let new_entry = ManifestEntry::new();
let mut file_path = self.path.join(&new_entry.name);
file_path.set_extension("qmdl");
let file = File::options()
let qmdl_filepath = new_entry.get_qmdl_filepath(&self.path);
let qmdl_file = File::options()
.create(true)
.write(true)
.open(&file_path).await
.map_err(QmdlStoreError::CreateFileError)?;
.open(&qmdl_filepath).await
.map_err(RecordingStoreError::CreateFileError)?;
let analysis_filepath = new_entry.get_analysis_filepath(&self.path);
let analysis_file = File::options()
.create(true)
.write(true)
.open(&analysis_filepath).await
.map_err(RecordingStoreError::CreateFileError)?;
self.manifest.entries.push(new_entry);
self.current_entry = Some(self.manifest.entries.len() - 1);
self.write_manifest().await?;
Ok(file)
Ok((qmdl_file, analysis_file))
}
// Returns the corresponding QMDL file for a given entry
pub async fn open_entry(&self, entry: &ManifestEntry) -> Result<File, QmdlStoreError> {
let mut file_path = self.path.join(&entry.name);
file_path.set_extension("qmdl");
File::open(file_path).await
.map_err(QmdlStoreError::ReadFileError)
pub async fn open_entry_qmdl(&self, entry: &ManifestEntry) -> Result<File, RecordingStoreError> {
    // open read-only; the entry's path is derived from its name
    let filepath = entry.get_qmdl_filepath(&self.path);
    File::open(filepath).await
        .map_err(RecordingStoreError::ReadFileError)
}
// Returns the corresponding analysis file for a given entry
pub async fn open_entry_analysis(&self, entry: &ManifestEntry) -> Result<File, RecordingStoreError> {
    // open read-only; the entry's path is derived from its name
    let filepath = entry.get_analysis_filepath(&self.path);
    File::open(filepath).await
        .map_err(RecordingStoreError::ReadFileError)
}
// Unsets the current entry
pub async fn close_current_entry(&mut self) -> Result<(), QmdlStoreError> {
pub async fn close_current_entry(&mut self) -> Result<(), RecordingStoreError> {
match self.current_entry {
Some(_) => {
self.current_entry = None;
Ok(())
},
None => Err(QmdlStoreError::NoCurrentEntry)
None => Err(RecordingStoreError::NoCurrentEntry)
}
}
// Sets the given entry's size and updates the last_message_time to now, updating the manifest
pub async fn update_entry(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), QmdlStoreError> {
self.manifest.entries[entry_index].size_bytes = size_bytes;
pub async fn update_entry_qmdl_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), RecordingStoreError> {
self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes;
self.manifest.entries[entry_index].last_message_time = Some(Local::now());
self.write_manifest().await
}
async fn write_manifest(&mut self) -> Result<(), QmdlStoreError> {
// Sets the given entry's analysis file size and persists the manifest
pub async fn update_entry_analysis_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), RecordingStoreError> {
    let entry = &mut self.manifest.entries[entry_index];
    entry.analysis_size_bytes = size_bytes;
    self.write_manifest().await
}
async fn write_manifest(&mut self) -> Result<(), RecordingStoreError> {
let mut manifest_file = File::options()
.write(true)
.open(self.path.join("manifest.toml")).await
.map_err(QmdlStoreError::WriteManifestError)?;
.map_err(RecordingStoreError::WriteManifestError)?;
let manifest_contents = toml::to_string_pretty(&self.manifest)
.expect("failed to serialize manifest");
manifest_file.write_all(manifest_contents.as_bytes()).await
.map_err(QmdlStoreError::WriteManifestError)?;
.map_err(RecordingStoreError::WriteManifestError)?;
Ok(())
}
@@ -164,6 +194,11 @@ impl QmdlStore {
.find(|entry| entry.name == name)
.cloned()
}
// Returns the manifest entry currently being recorded to, if any.
pub fn get_current_entry(&self) -> Option<&ManifestEntry> {
    self.current_entry
        .and_then(|index| self.manifest.entries.get(index))
}
}
#[cfg(test)]
@@ -174,36 +209,36 @@ mod tests {
#[tokio::test]
async fn test_load_from_empty_dir() {
let dir = TempDir::new("qmdl_store_test").unwrap();
assert!(!QmdlStore::exists(dir.path()).await.unwrap());
let _created_store = QmdlStore::create(dir.path()).await.unwrap();
assert!(QmdlStore::exists(dir.path()).await.unwrap());
let loaded_store = QmdlStore::load(dir.path()).await.unwrap();
assert!(!RecordingStore::exists(dir.path()).await.unwrap());
let _created_store = RecordingStore::create(dir.path()).await.unwrap();
assert!(RecordingStore::exists(dir.path()).await.unwrap());
let loaded_store = RecordingStore::load(dir.path()).await.unwrap();
assert_eq!(loaded_store.manifest.entries.len(), 0);
}
#[tokio::test]
async fn test_creating_updating_and_closing_entries() {
let dir = TempDir::new("qmdl_store_test").unwrap();
let mut store = QmdlStore::create(dir.path()).await.unwrap();
let mut store = RecordingStore::create(dir.path()).await.unwrap();
let _ = store.new_entry().await.unwrap();
let entry_index = store.current_entry.unwrap();
assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert_eq!(RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert!(store.manifest.entries[entry_index].last_message_time.is_none());
store.update_entry(entry_index, 1000).await.unwrap();
store.update_entry_qmdl_size(entry_index, 1000).await.unwrap();
let entry = store.entry_for_name(&store.manifest.entries[entry_index].name).unwrap();
assert!(entry.last_message_time.is_some());
assert_eq!(store.manifest.entries[entry_index].size_bytes, 1000);
assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000);
assert_eq!(RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
store.close_current_entry().await.unwrap();
assert!(matches!(store.close_current_entry().await, Err(QmdlStoreError::NoCurrentEntry)));
assert!(matches!(store.close_current_entry().await, Err(RecordingStoreError::NoCurrentEntry)));
}
#[tokio::test]
async fn test_repeated_new_entries() {
let dir = TempDir::new("qmdl_store_test").unwrap();
let mut store = QmdlStore::create(dir.path()).await.unwrap();
let mut store = RecordingStore::create(dir.path()).await.unwrap();
let _ = store.new_entry().await.unwrap();
let entry_index = store.current_entry.unwrap();
let _ = store.new_entry().await.unwrap();

View File

@@ -11,24 +11,22 @@ use tokio::sync::RwLock;
use tokio_util::io::ReaderStream;
use include_dir::{include_dir, Dir};
use crate::analysis::AnalysisMessage;
use crate::DiagDeviceCtrlMessage;
use crate::qmdl_store::QmdlStore;
use crate::qmdl_store::RecordingStore;
pub struct ServerState {
pub qmdl_store_lock: Arc<RwLock<QmdlStore>>,
pub qmdl_store_lock: Arc<RwLock<RecordingStore>>,
pub diag_device_ctrl_sender: Sender<DiagDeviceCtrlMessage>,
pub readonly_mode: bool,
pub maybe_analysis_tx: Option<Sender<AnalysisMessage>>,
pub readonly_mode: bool
}
pub async fn get_qmdl(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let entry = qmdl_store.entry_for_name(&qmdl_name)
.ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?;
let qmdl_file = qmdl_store.open_entry(&entry).await
let qmdl_file = qmdl_store.open_entry_qmdl(&entry).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {}", e)))?;
let limited_qmdl_file = qmdl_file.take(entry.size_bytes as u64);
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
let headers = [(CONTENT_TYPE, "application/octet-stream")];