On-demand analysis of past recordings

* rayhunter-daemon: API for triggering and reading analysis
* rayhunter-daemon: rename readonly mode to debug mode
* rayhunter-daemon: debug mode allows live-loading frontend files
* rayhunter-check: rework to handle directories
* rayhunter-check: better output
* CI: build rayhunter-check
This commit is contained in:
Will Greenberg
2024-08-15 18:18:19 -07:00
committed by Cooper Quintin
parent c59fb7c013
commit df84faa1f9
13 changed files with 636 additions and 134 deletions

240
bin/src/analysis.rs Normal file
View File

@@ -0,0 +1,240 @@
use std::sync::Arc;
use std::{future, pin};
use axum::Json;
use axum::{
extract::{Path, State},
http::StatusCode,
};
use futures::TryStreamExt;
use log::{debug, error, info};
use rayhunter::analysis::analyzer::Harness;
use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::qmdl::QmdlReader;
use serde::Serialize;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio_util::task::TaskTracker;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
/// Streams analysis results for a single recording to an on-disk file.
///
/// We write our analysis results to a file immediately to minimize the amount of
/// state Rayhunter has to keep track of in memory. The analysis file's format is
/// Newline Delimited JSON
/// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which
/// lets us simply append new rows to the end without parsing the entire JSON
/// object beforehand.
pub struct AnalysisWriter {
    // Buffered handle to the analysis file on disk.
    writer: BufWriter<File>,
    // Runs all registered analyzers over incoming QMDL message containers.
    harness: Harness,
    // Running total of bytes written to the file, reported back to callers.
    bytes_written: usize,
}
impl AnalysisWriter {
    /// Wraps `file` in a buffered writer and writes the harness's metadata as
    /// the first NDJSON row, so readers can interpret the rows that follow.
    pub async fn new(file: File) -> Result<Self, std::io::Error> {
        let mut result = Self {
            writer: BufWriter::new(file),
            harness: Harness::new_with_all_analyzers(),
            bytes_written: 0,
        };
        let metadata = result.harness.get_metadata();
        result.write(&metadata).await?;
        Ok(result)
    }

    // Runs the analysis harness on the given container, serializing the results
    // to the analysis file and returning the file's new length.
    pub async fn analyze(&mut self, container: MessagesContainer) -> Result<usize, std::io::Error> {
        let row = self.harness.analyze_qmdl_messages(container);
        // Only containers that produced analysis output get a row; empty rows
        // would just bloat the NDJSON file.
        if !row.is_empty() {
            self.write(&row).await?;
        }
        Ok(self.bytes_written)
    }

    // Serializes `value` as a single NDJSON row and flushes it to disk.
    async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
        // Surface serialization failures as I/O errors instead of panicking,
        // since this fn already returns std::io::Error.
        let mut value_str = serde_json::to_string(value)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        value_str.push('\n');
        self.bytes_written += value_str.len();
        self.writer.write_all(value_str.as_bytes()).await?;
        // Flush eagerly so a crash never leaves a partially-written row.
        self.writer.flush().await?;
        Ok(())
    }

    // Flushes any pending I/O to disk before dropping the writer
    pub async fn close(mut self) -> Result<(), std::io::Error> {
        self.writer.flush().await?;
        Ok(())
    }
}
/// Snapshot of the analysis queue, serialized to clients of the analysis API.
#[derive(Debug, Serialize, Clone, Default)]
pub struct AnalysisStatus {
    // Names of recordings waiting to be analyzed, in FIFO order.
    queued: Vec<String>,
    // Name of the recording currently being analyzed, if any.
    running: Option<String>,
}
/// Control messages for the background analysis task.
pub enum AnalysisCtrlMessage {
    // One or more recordings were appended to the analysis queue.
    NewFilesQueued,
    // Shut the analysis task down.
    Exit,
}
/// Returns how many recordings are currently waiting in the analysis queue.
async fn queued_len(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) -> usize {
    let status = analysis_status_lock.read().await;
    status.queued.len()
}
/// Pops the oldest queued recording name and marks it as the running analysis,
/// returning the name.
///
/// Panics if the queue is empty, or if another analysis is already marked as
/// running (the single analysis task guarantees neither happens).
async fn dequeue_to_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) -> String {
    let mut status = analysis_status_lock.write().await;
    let next = status.queued.remove(0);
    assert!(status.running.is_none());
    status.running = Some(next.clone());
    next
}
/// Marks no analysis as currently running.
async fn clear_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) {
    analysis_status_lock.write().await.running = None;
}
/// Re-runs the full analyzer harness over the named recording, replacing any
/// previous analysis file and keeping the manifest's analysis size current.
///
/// Returns a human-readable error string on failure (logged by the caller)
/// instead of panicking, so one bad recording can't kill the analysis task.
async fn perform_analysis(
    name: &str,
    qmdl_store_lock: Arc<RwLock<RecordingStore>>,
) -> Result<(), String> {
    info!("Opening QMDL and analysis file for {}...", name);
    // Hold the store's write lock only long enough to resolve the entry and
    // open both files.
    let (analysis_file, qmdl_file, entry_index) = {
        let mut qmdl_store = qmdl_store_lock.write().await;
        let (entry_index, _) = qmdl_store
            .entry_for_name(name)
            .ok_or(format!("failed to find QMDL store entry for {}", name))?;
        let analysis_file = qmdl_store
            .clear_and_open_entry_analysis(entry_index)
            .await
            .map_err(|e| format!("{:?}", e))?;
        let qmdl_file = qmdl_store
            .open_entry_qmdl(entry_index)
            .await
            .map_err(|e| format!("{:?}", e))?;
        (analysis_file, qmdl_file, entry_index)
    };
    let mut analysis_writer = AnalysisWriter::new(analysis_file)
        .await
        .map_err(|e| format!("{:?}", e))?;
    // Bound the reader at the file's current size so we don't chase a file
    // that may still be growing.
    let file_size = qmdl_file
        .metadata()
        .await
        .map_err(|e| format!("failed to get QMDL file metadata: {:?}", e))?
        .len();
    let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
    let mut qmdl_stream = pin::pin!(qmdl_reader
        .as_stream()
        .try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
    info!("Starting analysis for {}...", name);
    while let Some(container) = qmdl_stream
        .try_next()
        .await
        .map_err(|e| format!("failed getting QMDL container: {:?}", e))?
    {
        let size_bytes = analysis_writer
            .analyze(container)
            .await
            .map_err(|e| format!("{:?}", e))?;
        debug!("{} analysis: {} bytes written", name, size_bytes);
        // Persist the growing analysis size after each container so progress
        // is visible and survives a crash.
        let mut qmdl_store = qmdl_store_lock.write().await;
        qmdl_store
            .update_entry_analysis_size(entry_index, size_bytes)
            .await
            .map_err(|e| format!("{:?}", e))?;
    }
    analysis_writer
        .close()
        .await
        .map_err(|e| format!("{:?}", e))?;
    info!("Analysis for {} complete!", name);
    Ok(())
}
/// Spawns the background task that services the analysis queue.
///
/// The task drains the queue each time it's notified of new files, and stops
/// when it receives `Exit` or the control channel closes.
pub fn run_analysis_thread(
    task_tracker: &TaskTracker,
    mut analysis_rx: Receiver<AnalysisCtrlMessage>,
    qmdl_store_lock: Arc<RwLock<RecordingStore>>,
    analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
) {
    task_tracker.spawn(async move {
        // `Exit` and a closed channel both fall out of this loop, since
        // `NewFilesQueued` is the only other variant.
        while let Some(AnalysisCtrlMessage::NewFilesQueued) = analysis_rx.recv().await {
            // Snapshot the queue length up front: anything enqueued after
            // this point triggers its own NewFilesQueued message.
            let count = queued_len(analysis_status_lock.clone()).await;
            for _ in 0..count {
                let name = dequeue_to_running(analysis_status_lock.clone()).await;
                if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone()).await {
                    error!("failed to analyze {}: {}", name, err);
                }
                clear_running(analysis_status_lock.clone()).await;
            }
        }
    });
}
/// GET handler: returns the current analysis queue state as JSON.
pub async fn get_analysis_status(
    State(state): State<Arc<ServerState>>,
) -> Result<Json<AnalysisStatus>, (StatusCode, String)> {
    let status = state.analysis_status_lock.read().await.clone();
    Ok(Json(status))
}
/// Appends `name` to the analysis queue unless it's already queued or
/// currently being analyzed. Returns whether the name was actually queued.
fn queue_qmdl(name: &str, analysis_status: &mut RwLockWriteGuard<AnalysisStatus>) -> bool {
    let already_queued = analysis_status.queued.iter().any(|n| n == name);
    let currently_running = analysis_status.running.as_deref() == Some(name);
    if already_queued || currently_running {
        return false;
    }
    analysis_status.queued.push(name.to_string());
    true
}
/// POST handler: queues one recording (by name) for analysis, or every
/// non-current recording when the name is empty.
///
/// Responds with the updated queue state, and notifies the analysis task only
/// if at least one new name was actually queued.
pub async fn start_analysis(
    State(state): State<Arc<ServerState>>,
    Path(qmdl_name): Path<String>,
) -> Result<(StatusCode, Json<AnalysisStatus>), (StatusCode, String)> {
    let mut analysis_status = state.analysis_status_lock.write().await;
    let store = state.qmdl_store_lock.read().await;
    let queued = if qmdl_name.is_empty() {
        let mut entry_names: Vec<&str> = store
            .manifest
            .entries
            .iter()
            .map(|e| e.name.as_str())
            .collect();
        // Skip the entry currently being recorded to; `entry_names` is built
        // in manifest order, so `current_entry` indexes it directly.
        if let Some(current_entry) = store.current_entry {
            entry_names.remove(current_entry);
        }
        // Queue every entry. A short-circuiting `any` would stop at the first
        // successfully queued name and silently skip the rest.
        let mut any_queued = false;
        for name in entry_names {
            any_queued |= queue_qmdl(name, &mut analysis_status);
        }
        any_queued
    } else {
        queue_qmdl(&qmdl_name, &mut analysis_status)
    };
    if queued {
        state
            .analysis_sender
            .send(AnalysisCtrlMessage::NewFilesQueued)
            .await
            .map_err(|e| {
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("failed to queue new analysis files: {:?}", e),
                )
            })?;
    }
    Ok((StatusCode::ACCEPTED, Json(analysis_status.clone())))
}

View File

@@ -1,6 +1,6 @@
use std::{collections::HashMap, future, path::PathBuf, pin::pin};
use rayhunter::{analysis::analyzer::Harness, diag::DataType, qmdl::QmdlReader};
use tokio::fs::File;
use tokio::fs::{metadata, read_dir, File};
use clap::Parser;
use futures::TryStreamExt;
@@ -9,24 +9,17 @@ use futures::TryStreamExt;
struct Args {
#[arg(short, long)]
qmdl_path: PathBuf,
#[arg(long)]
show_skipped: bool,
}
#[tokio::main]
async fn main() {
env_logger::init();
let args = Args::parse();
let mut harness = Harness::new_with_all_analyzers();
let qmdl_file = File::open(args.qmdl_path).await.expect("failed to open QMDL file");
async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool) {
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let file_size = qmdl_file.metadata().await.expect("failed to get QMDL file metadata").len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(qmdl_reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
println!("Analyzers:");
for analyzer in harness.get_metadata().analyzers {
println!(" - {}: {}", analyzer.name, analyzer.description);
}
let mut skipped_reasons: HashMap<String, i32> = HashMap::new();
let mut total_messages = 0;
let mut warnings = 0;
@@ -47,11 +40,37 @@ async fn main() {
}
}
}
if skipped > 0 {
println!("Messages skipped:");
if show_skipped && skipped > 0 {
println!("{}: messages skipped:", qmdl_path);
for (reason, count) in skipped_reasons.iter() {
println!(" - {}: \"{}\"", count, reason);
}
}
println!("{} messages analyzed, {} warnings, {} messages skipped", total_messages, warnings, skipped);
println!("{}: {} messages analyzed, {} warnings, {} messages skipped", qmdl_path, total_messages, warnings, skipped);
}
#[tokio::main]
async fn main() {
env_logger::init();
let args = Args::parse();
let mut harness = Harness::new_with_all_analyzers();
println!("Analyzers:");
for analyzer in harness.get_metadata().analyzers {
println!(" - {}: {}", analyzer.name, analyzer.description);
}
let metadata = metadata(&args.qmdl_path).await.expect("failed to get metadata");
if metadata.is_dir() {
let mut dir = read_dir(&args.qmdl_path).await.expect("failed to read dir");
while let Some(entry) = dir.next_entry().await.expect("failed to get entry") {
let name = entry.file_name();
let name_str = name.to_str().unwrap();
if name_str.ends_with(".qmdl") {
analyze_file(&mut harness, entry.path().to_str().unwrap(), args.show_skipped).await;
}
}
} else {
analyze_file(&mut harness, args.qmdl_path.to_str().unwrap(), args.show_skipped).await;
}
}

View File

@@ -6,7 +6,7 @@ use serde::Deserialize;
struct ConfigFile {
qmdl_store_path: Option<String>,
port: Option<u16>,
readonly_mode: Option<bool>,
debug_mode: Option<bool>,
ui_level: Option<u8>,
}
@@ -14,7 +14,7 @@ struct ConfigFile {
pub struct Config {
pub qmdl_store_path: String,
pub port: u16,
pub readonly_mode: bool,
pub debug_mode: bool,
pub ui_level: u8,
}
@@ -23,7 +23,7 @@ impl Default for Config {
Config {
qmdl_store_path: "/data/rayhunter/qmdl".to_string(),
port: 8080,
readonly_mode: false,
debug_mode: false,
ui_level: 1,
}
}
@@ -36,7 +36,7 @@ pub fn parse_config<P>(path: P) -> Result<Config, RayhunterError> where P: AsRef
.map_err(RayhunterError::ConfigFileParsingError)?;
if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path }
if let Some(port) = parsed_config.port { config.port = port }
if let Some(readonly_mode) = parsed_config.readonly_mode { config.readonly_mode = readonly_mode }
if let Some(debug_mode) = parsed_config.debug_mode { config.debug_mode = debug_mode }
if let Some(ui_level) = parsed_config.ui_level { config.ui_level = ui_level }
}
Ok(config)

View File

@@ -1,3 +1,4 @@
mod analysis;
mod config;
mod error;
mod pcap;
@@ -16,6 +17,7 @@ use crate::stats::get_system_stats;
use crate::error::RayhunterError;
use crate::framebuffer::Framebuffer;
use analysis::{get_analysis_status, run_analysis_thread, start_analysis, AnalysisCtrlMessage, AnalysisStatus};
use axum::response::Redirect;
use diag::{get_analysis_report, start_recording, stop_recording, DiagDeviceCtrlMessage};
use log::{info, error};
@@ -44,14 +46,18 @@ async fn run_server(
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
server_shutdown_rx: oneshot::Receiver<()>,
ui_update_tx: Sender<framebuffer::DisplayState>,
diag_device_sender: Sender<DiagDeviceCtrlMessage>
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
analysis_sender: Sender<AnalysisCtrlMessage>,
analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
) -> JoinHandle<()> {
info!("spinning up server");
let state = Arc::new(ServerState {
qmdl_store_lock,
diag_device_ctrl_sender: diag_device_sender,
ui_update_sender: ui_update_tx,
readonly_mode: config.readonly_mode
debug_mode: config.debug_mode,
analysis_status_lock,
analysis_sender,
});
let app = Router::new()
@@ -61,7 +67,9 @@ async fn run_server(
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
.route("/api/start-recording", post(start_recording))
.route("/api/stop-recording", post(stop_recording))
.route("/api/analysis-report", get(get_analysis_report))
.route("/api/analysis-report/*name", get(get_analysis_report))
.route("/api/analysis", get(get_analysis_status))
.route("/api/analysis/*name", post(start_analysis))
.route("/", get(|| async { Redirect::permanent("/index.html") }))
.route("/*path", get(serve_static))
.with_state(state);
@@ -81,12 +89,12 @@ async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
}
// Loads a QmdlStore if one exists, and if not, only create one if we're not in
// readonly mode.
// debug mode.
async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, RayhunterError> {
match (RecordingStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
match (RecordingStore::exists(&config.qmdl_store_path).await?, config.debug_mode) {
(true, _) => Ok(RecordingStore::load(&config.qmdl_store_path).await?),
(false, false) => Ok(RecordingStore::create(&config.qmdl_store_path).await?),
(false, true) => Err(RayhunterError::NoStoreReadonlyMode(config.qmdl_store_path.clone())),
(false, true) => Err(RayhunterError::NoStoreDebugMode(config.qmdl_store_path.clone())),
}
}
@@ -97,8 +105,9 @@ fn run_ctrl_c_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
server_shutdown_tx: oneshot::Sender<()>,
ui_shutdown_tx: oneshot::Sender<()>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>
maybe_ui_shutdown_tx: Option<oneshot::Sender<()>>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_tx: Sender<AnalysisCtrlMessage>,
) -> JoinHandle<Result<(), RayhunterError>> {
task_tracker.spawn(async move {
match tokio::signal::ctrl_c().await {
@@ -113,10 +122,14 @@ fn run_ctrl_c_thread(
server_shutdown_tx.send(())
.expect("couldn't send server shutdown signal");
info!("sending UI shutdown");
ui_shutdown_tx.send(())
.expect("couldn't send ui shutdown signal");
if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx {
ui_shutdown_tx.send(())
.expect("couldn't send ui shutdown signal");
}
diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await
.expect("couldn't send Exit message to diag thread");
analysis_tx.send(AnalysisCtrlMessage::Exit).await
.expect("couldn't send Exit message to analysis thread");
},
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
@@ -151,8 +164,7 @@ fn update_ui(task_tracker: &TaskTracker, config: &config::Config, mut ui_shutdo
break;
},
Err(TryRecvError::Empty) => {},
Err(e) => error!("error receiving shutdown message: {e}")
Err(e) => panic!("error receiving shutdown message: {e}")
}
match ui_update_rx.try_recv() {
Ok(state) => {
@@ -161,7 +173,7 @@ fn update_ui(task_tracker: &TaskTracker, config: &config::Config, mut ui_shutdo
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {},
Err(e) => error!("error receiving framebuffer update message: {e}")
}
match display_level {
2 => {
fb.draw_gif(img.unwrap());
@@ -200,8 +212,11 @@ async fn main() -> Result<(), RayhunterError> {
let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?));
let (tx, rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
let (ui_update_tx, ui_update_rx) = mpsc::channel::<framebuffer::DisplayState>(1);
let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel();
if !config.readonly_mode {
let (analysis_tx, analysis_rx) = mpsc::channel::<AnalysisCtrlMessage>(5);
let mut maybe_ui_shutdown_tx = None;
if !config.debug_mode {
let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel();
maybe_ui_shutdown_tx = Some(ui_shutdown_tx);
let mut dev = DiagDevice::new().await
.map_err(RayhunterError::DiagInitError)?;
dev.config_logs().await
@@ -214,8 +229,10 @@ async fn main() -> Result<(), RayhunterError> {
}
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
info!("create shutdown thread");
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, ui_shutdown_tx, qmdl_store_lock.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, ui_update_tx, tx).await;
let analysis_status_lock = Arc::new(RwLock::new(AnalysisStatus::default()));
run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone());
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, maybe_ui_shutdown_tx, qmdl_store_lock.clone(), analysis_tx.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, ui_update_tx, tx, analysis_tx, analysis_status_lock).await;
task_tracker.close();
task_tracker.wait().await;

View File

@@ -2,7 +2,7 @@ use std::pin::pin;
use std::sync::Arc;
use axum::body::Body;
use axum::extract::State;
use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
@@ -15,7 +15,8 @@ use tokio::sync::mpsc::{Receiver, Sender};
use rayhunter::qmdl::QmdlWriter;
use log::{debug, error, info};
use tokio::fs::File;
use tokio::io::{BufWriter, AsyncWriteExt};
use tokio::io::BufWriter;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};
@@ -171,8 +172,8 @@ pub fn run_diag_read_thread(
}
pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.readonly_mode {
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
if state.debug_mode {
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
let (qmdl_file, analysis_file) = qmdl_store.new_entry().await
@@ -186,8 +187,8 @@ pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(S
}
pub async fn stop_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.readonly_mode {
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
if state.debug_mode {
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
qmdl_store.close_current_entry().await
@@ -199,15 +200,20 @@ pub async fn stop_recording(State(state): State<Arc<ServerState>>) -> Result<(St
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
pub async fn get_analysis_report(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
pub async fn get_analysis_report(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let Some(entry) = qmdl_store.get_current_entry() else {
return Err((
let (entry_index, _) = if qmdl_name == "live" {
qmdl_store.get_current_entry().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"No QMDL data's being recorded to analyze, try starting a new recording!".to_string()
));
))?
} else {
qmdl_store.entry_for_name(&qmdl_name).ok_or((
StatusCode::NOT_FOUND,
format!("Couldn't find QMDL entry with name \"{}\"", qmdl_name)
))?
};
let analysis_file = qmdl_store.open_entry_analysis(entry).await
let analysis_file = qmdl_store.open_entry_analysis(entry_index).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
let analysis_stream = ReaderStream::new(analysis_file);

View File

@@ -13,6 +13,6 @@ pub enum RayhunterError{
TokioError(#[from] tokio::io::Error),
#[error("QmdlStore error: {0}")]
QmdlStoreError(#[from] RecordingStoreError),
#[error("No QMDL store found at path {0}, but can't create a new one due to readonly mode")]
NoStoreReadonlyMode(String),
#[error("No QMDL store found at path {0}, but can't create a new one due to debug mode")]
NoStoreDebugMode(String),
}

View File

@@ -21,7 +21,7 @@ use futures::TryStreamExt;
// pcap data to a channel that's piped to the client.
pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let entry = qmdl_store.entry_for_name(&qmdl_name)
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name)
.ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?;
if entry.qmdl_size_bytes == 0 {
return Err((
@@ -29,8 +29,8 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
"QMDL file is empty, try again in a bit!".to_string()
));
}
let qmdl_file = qmdl_store.open_entry_qmdl(&entry).await
let qmdl_size_bytes = entry.qmdl_size_bytes;
let qmdl_file = qmdl_store.open_entry_qmdl(entry_index).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
// the QMDL reader should stop at the last successfully written data chunk
// (entry.size_bytes)
@@ -39,10 +39,10 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
pcap_writer.write_iface_header().await.unwrap();
tokio::spawn(async move {
let mut reader = QmdlReader::new(qmdl_file, Some(entry.qmdl_size_bytes));
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
let mut messages_stream = pin!(reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
while let Some(container) = messages_stream.try_next().await.expect("failed getting QMDL container") {
for maybe_msg in container.into_messages() {
match maybe_msg {

View File

@@ -1,8 +1,11 @@
use std::path::{PathBuf, Path};
use thiserror::Error;
use tokio::{fs::{self, File, try_exists}, io::AsyncWriteExt};
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Local};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::{
fs::{self, try_exists, File, OpenOptions},
io::AsyncWriteExt,
};
#[derive(Debug, Error)]
pub enum RecordingStoreError {
@@ -19,7 +22,7 @@ pub enum RecordingStoreError {
#[error("Couldn't write manifest file: {0}")]
WriteManifestError(tokio::io::Error),
#[error("Couldn't parse QMDL store manifest file: {0}")]
ParseManifestError(toml::de::Error)
ParseManifestError(toml::de::Error),
}
pub struct RecordingStore {
@@ -70,16 +73,26 @@ impl ManifestEntry {
impl RecordingStore {
// Returns whether a directory with a "manifest.toml" exists at the given
// path (though doesn't check if that manifest is valid)
pub async fn exists<P>(path: P) -> Result<bool, RecordingStoreError> where P: AsRef<Path> {
pub async fn exists<P>(path: P) -> Result<bool, RecordingStoreError>
where
P: AsRef<Path>,
{
let manifest_path = path.as_ref().join("manifest.toml");
let dir_exists = try_exists(path).await.map_err(RecordingStoreError::OpenDirError)?;
let manifest_exists = try_exists(manifest_path).await.map_err(RecordingStoreError::ReadManifestError)?;
let dir_exists = try_exists(path)
.await
.map_err(RecordingStoreError::OpenDirError)?;
let manifest_exists = try_exists(manifest_path)
.await
.map_err(RecordingStoreError::ReadManifestError)?;
Ok(dir_exists && manifest_exists)
}
// Loads an existing RecordingStore at the given path. Errors if no store exists,
// or if it's malformed.
pub async fn load<P>(path: P) -> Result<Self, RecordingStoreError> where P: AsRef<Path> {
pub async fn load<P>(path: P) -> Result<Self, RecordingStoreError>
where
P: AsRef<Path>,
{
let path: PathBuf = path.as_ref().to_path_buf();
let manifest = RecordingStore::read_manifest(&path).await?;
Ok(RecordingStore {
@@ -91,26 +104,38 @@ impl RecordingStore {
// Creates a new RecordingStore at the given path. This involves creating a dir
// and writing an empty manifest.
pub async fn create<P>(path: P) -> Result<Self, RecordingStoreError> where P: AsRef<Path> {
pub async fn create<P>(path: P) -> Result<Self, RecordingStoreError>
where
P: AsRef<Path>,
{
let manifest_path = path.as_ref().join("manifest.toml");
fs::create_dir_all(&path).await
fs::create_dir_all(&path)
.await
.map_err(RecordingStoreError::OpenDirError)?;
let mut manifest_file = File::create(&manifest_path).await
let mut manifest_file = File::create(&manifest_path)
.await
.map_err(RecordingStoreError::WriteManifestError)?;
let empty_manifest = Manifest { entries: Vec::new() };
let empty_manifest_contents = toml::to_string_pretty(&empty_manifest)
.expect("failed to serialize manifest");
manifest_file.write_all(empty_manifest_contents.as_bytes()).await
let empty_manifest = Manifest {
entries: Vec::new(),
};
let empty_manifest_contents =
toml::to_string_pretty(&empty_manifest).expect("failed to serialize manifest");
manifest_file
.write_all(empty_manifest_contents.as_bytes())
.await
.map_err(RecordingStoreError::WriteManifestError)?;
RecordingStore::load(path).await
}
async fn read_manifest<P>(path: P) -> Result<Manifest, RecordingStoreError> where P: AsRef<Path> {
async fn read_manifest<P>(path: P) -> Result<Manifest, RecordingStoreError>
where
P: AsRef<Path>,
{
let manifest_path = path.as_ref().join("manifest.toml");
let file_contents = fs::read_to_string(&manifest_path).await
let file_contents = fs::read_to_string(&manifest_path)
.await
.map_err(RecordingStoreError::ReadManifestError)?;
toml::from_str(&file_contents)
.map_err(RecordingStoreError::ParseManifestError)
toml::from_str(&file_contents).map_err(RecordingStoreError::ParseManifestError)
}
// Closes the current entry (if needed), creates a new entry based on the
@@ -126,13 +151,15 @@ impl RecordingStore {
let qmdl_file = File::options()
.create(true)
.write(true)
.open(&qmdl_filepath).await
.open(&qmdl_filepath)
.await
.map_err(RecordingStoreError::CreateFileError)?;
let analysis_filepath = new_entry.get_analysis_filepath(&self.path);
let analysis_file = File::options()
.create(true)
.write(true)
.open(&analysis_filepath).await
.open(&analysis_filepath)
.await
.map_err(RecordingStoreError::CreateFileError)?;
self.manifest.entries.push(new_entry);
self.current_entry = Some(self.manifest.entries.len() - 1);
@@ -141,37 +168,71 @@ impl RecordingStore {
}
// Returns the corresponding QMDL file for a given entry
pub async fn open_entry_qmdl(&self, entry: &ManifestEntry) -> Result<File, RecordingStoreError> {
File::open(entry.get_qmdl_filepath(&self.path)).await
pub async fn open_entry_qmdl(
&self,
entry_index: usize,
) -> Result<File, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
File::open(entry.get_qmdl_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)
}
// Returns the corresponding analysis file for a given entry
pub async fn open_entry_analysis(&self, entry: &ManifestEntry) -> Result<File, RecordingStoreError> {
File::open(entry.get_analysis_filepath(&self.path)).await
pub async fn open_entry_analysis(
&self,
entry_index: usize,
) -> Result<File, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
File::open(entry.get_analysis_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)
}
pub async fn clear_and_open_entry_analysis(
&mut self,
entry_index: usize,
) -> Result<File, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
let file = OpenOptions::new()
.write(true)
.truncate(true)
.open(entry.get_analysis_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)?;
self.update_entry_analysis_size(entry_index, 0)
.await?;
Ok(file)
}
// Unsets the current entry
pub async fn close_current_entry(&mut self) -> Result<(), RecordingStoreError> {
match self.current_entry {
Some(_) => {
self.current_entry = None;
Ok(())
},
None => Err(RecordingStoreError::NoCurrentEntry)
}
None => Err(RecordingStoreError::NoCurrentEntry),
}
}
// Sets the given entry's size and updates the last_message_time to now, updating the manifest
pub async fn update_entry_qmdl_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), RecordingStoreError> {
pub async fn update_entry_qmdl_size(
&mut self,
entry_index: usize,
size_bytes: usize,
) -> Result<(), RecordingStoreError> {
self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes;
self.manifest.entries[entry_index].last_message_time = Some(Local::now());
self.write_manifest().await
}
// Sets the given entry's analysis file size
pub async fn update_entry_analysis_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), RecordingStoreError> {
pub async fn update_entry_analysis_size(
&mut self,
entry_index: usize,
size_bytes: usize,
) -> Result<(), RecordingStoreError> {
self.manifest.entries[entry_index].analysis_size_bytes = size_bytes;
self.write_manifest().await
}
@@ -179,32 +240,37 @@ impl RecordingStore {
async fn write_manifest(&mut self) -> Result<(), RecordingStoreError> {
let mut manifest_file = File::options()
.write(true)
.open(self.path.join("manifest.toml")).await
.open(self.path.join("manifest.toml"))
.await
.map_err(RecordingStoreError::WriteManifestError)?;
let manifest_contents = toml::to_string_pretty(&self.manifest)
.expect("failed to serialize manifest");
manifest_file.write_all(manifest_contents.as_bytes()).await
let manifest_contents =
toml::to_string_pretty(&self.manifest).expect("failed to serialize manifest");
manifest_file
.write_all(manifest_contents.as_bytes())
.await
.map_err(RecordingStoreError::WriteManifestError)?;
Ok(())
}
// Finds an entry by filename
pub fn entry_for_name(&self, name: &str) -> Option<ManifestEntry> {
self.manifest.entries.iter()
.find(|entry| entry.name == name)
.cloned()
pub fn entry_for_name(&self, name: &str) -> Option<(usize, &ManifestEntry)> {
let entry_index = self.manifest
.entries
.iter()
.position(|entry| entry.name == name)?;
Some((entry_index, &self.manifest.entries[entry_index]))
}
pub fn get_current_entry(&self) -> Option<&ManifestEntry> {
pub fn get_current_entry(&self) -> Option<(usize, &ManifestEntry)> {
let entry_index = self.current_entry?;
self.manifest.entries.get(entry_index)
Some((entry_index, &self.manifest.entries[entry_index]))
}
}
#[cfg(test)]
mod tests {
use tempfile::{TempDir, Builder};
use super::*;
use tempfile::{Builder, TempDir};
fn make_temp_dir() -> TempDir {
Builder::new().prefix("qmdl_store_test").tempdir().unwrap()
@@ -226,17 +292,33 @@ mod tests {
let mut store = RecordingStore::create(dir.path()).await.unwrap();
let _ = store.new_entry().await.unwrap();
let entry_index = store.current_entry.unwrap();
assert_eq!(RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert!(store.manifest.entries[entry_index].last_message_time.is_none());
assert_eq!(
RecordingStore::read_manifest(dir.path()).await.unwrap(),
store.manifest
);
assert!(store.manifest.entries[entry_index]
.last_message_time
.is_none());
store.update_entry_qmdl_size(entry_index, 1000).await.unwrap();
let entry = store.entry_for_name(&store.manifest.entries[entry_index].name).unwrap();
store
.update_entry_qmdl_size(entry_index, 1000)
.await
.unwrap();
let (entry_index, entry) = store
.entry_for_name(&store.manifest.entries[entry_index].name)
.unwrap();
assert!(entry.last_message_time.is_some());
assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000);
assert_eq!(RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest);
assert_eq!(
RecordingStore::read_manifest(dir.path()).await.unwrap(),
store.manifest
);
store.close_current_entry().await.unwrap();
assert!(matches!(store.close_current_entry().await, Err(RecordingStoreError::NoCurrentEntry)));
assert!(matches!(
store.close_current_entry().await,
Err(RecordingStoreError::NoCurrentEntry)
));
}
#[tokio::test]

View File

@@ -4,6 +4,7 @@ use axum::extract::State;
use axum::http::{StatusCode, HeaderValue};
use axum::response::{Response, IntoResponse};
use axum::extract::Path;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use std::sync::Arc;
@@ -12,20 +13,23 @@ use tokio_util::io::ReaderStream;
use include_dir::{include_dir, Dir};
use crate::{framebuffer, DiagDeviceCtrlMessage};
use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
use crate::qmdl_store::RecordingStore;
pub struct ServerState {
pub qmdl_store_lock: Arc<RwLock<RecordingStore>>,
pub diag_device_ctrl_sender: Sender<DiagDeviceCtrlMessage>,
pub ui_update_sender: Sender<framebuffer::DisplayState>,
pub readonly_mode: bool
pub analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
pub analysis_sender: Sender<AnalysisCtrlMessage>,
pub debug_mode: bool
}
pub async fn get_qmdl(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let entry = qmdl_store.entry_for_name(&qmdl_name)
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name)
.ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?;
let qmdl_file = qmdl_store.open_entry_qmdl(&entry).await
let qmdl_file = qmdl_store.open_entry_qmdl(entry_index).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {}", e)))?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
@@ -38,10 +42,39 @@ pub async fn get_qmdl(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
// Bundles the server's static files (html/css/js) into the binary for easy distribution
static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/static");
pub async fn serve_static(Path(path): Path<String>) -> impl IntoResponse {
pub async fn serve_static(State(state): State<Arc<ServerState>>, Path(path): Path<String>) -> impl IntoResponse {
let path = path.trim_start_matches('/');
let mime_type = mime_guess::from_path(path).first_or_text_plain();
// if we're in debug mode, return the files from the build directory so we
// don't have to rebuild every time the JS/HTML change
if state.debug_mode {
let mut build_path = std::path::PathBuf::new();
build_path.push("bin");
build_path.push("static");
for part in path.split("/") {
build_path.push(part);
}
return match File::open(build_path).await {
Ok(mut file) => {
let mut body = String::new();
file.read_to_string(&mut body).await.expect("failed to read file");
Response::builder()
.status(StatusCode::OK)
.header(
header::CONTENT_TYPE,
HeaderValue::from_str(mime_type.as_ref()).unwrap(),
)
.body(Body::from(body))
.unwrap()
},
Err(_) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
};
}
match STATIC_DIR.get_file(path) {
None => Response::builder()
.status(StatusCode::NOT_FOUND)