diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml index b337f9d..cb44dd9 100644 --- a/.github/workflows/build-release.yml +++ b/.github/workflows/build-release.yml @@ -8,7 +8,7 @@ env: CARGO_TERM_COLOR: always jobs: - build_serial: + build_serial_and_check: strategy: matrix: platform: @@ -28,6 +28,14 @@ jobs: name: serial-${{ matrix.platform.os }} path: ./target/release/${{ matrix.platform.build_name }} if-no-files-found: error + - name: Build check + run: cargo build --bin rayhunter-check --release + - uses: actions/upload-artifact@v4 + with: + name: rayhunter-check-${{ matrix.platform.os }} + path: ./target/release/rayhunter-check + if-no-files-found: error build_rootshell_and_rayhunter: runs-on: ubuntu-latest steps: @@ -63,7 +72,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/download-artifact@v4 - name: Fix executable permissions on binaries - run: chmod +x serial-*/serial rayhunter-daemon/rayhunter-daemon + run: chmod +x serial-*/serial rayhunter-check-*/rayhunter-check rayhunter-daemon/rayhunter-daemon - name: Setup release directory run: mv rayhunter-daemon/rayhunter-daemon rootshell/rootshell serial-* dist - name: Archive release directory diff --git a/bin/src/analysis.rs b/bin/src/analysis.rs new file mode 100644 index 0000000..4cdc52e --- /dev/null +++ b/bin/src/analysis.rs @@ -0,0 +1,240 @@ +use std::sync::Arc; +use std::{future, pin}; + +use axum::Json; +use axum::{ + extract::{Path, State}, + http::StatusCode, +}; +use futures::TryStreamExt; +use log::{debug, error, info}; +use rayhunter::analysis::analyzer::Harness; +use rayhunter::diag::{DataType, MessagesContainer}; +use rayhunter::qmdl::QmdlReader; +use serde::Serialize; +use tokio::fs::File; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio_util::task::TaskTracker; +use crate::qmdl_store::RecordingStore; +use 
crate::server::ServerState; + +pub struct AnalysisWriter { + writer: BufWriter, + harness: Harness, + bytes_written: usize, +} + +// We write our analysis results to a file immediately to minimize the amount of +// state Rayhunter has to keep track of in memory. The analysis file's format is +// Newline Delimited JSON +// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which +// lets us simply append new rows to the end without parsing the entire JSON +// object beforehand. +impl AnalysisWriter { + pub async fn new(file: File) -> Result { + let mut result = Self { + writer: BufWriter::new(file), + harness: Harness::new_with_all_analyzers(), + bytes_written: 0, + }; + let metadata = result.harness.get_metadata(); + result.write(&metadata).await?; + Ok(result) + } + + // Runs the analysis harness on the given container, serializing the results + // to the analysis file and returning the file's new length. + pub async fn analyze(&mut self, container: MessagesContainer) -> Result { + let row = self.harness.analyze_qmdl_messages(container); + if !row.is_empty() { + self.write(&row).await?; + } + Ok(self.bytes_written) + } + + async fn write(&mut self, value: &T) -> Result<(), std::io::Error> { + let mut value_str = serde_json::to_string(value).unwrap(); + value_str.push('\n'); + self.bytes_written += value_str.len(); + self.writer.write_all(value_str.as_bytes()).await?; + self.writer.flush().await?; + Ok(()) + } + + // Flushes any pending I/O to disk before dropping the writer + pub async fn close(mut self) -> Result<(), std::io::Error> { + self.writer.flush().await?; + Ok(()) + } +} + +#[derive(Debug, Serialize, Clone, Default)] +pub struct AnalysisStatus { + queued: Vec, + running: Option, +} + +pub enum AnalysisCtrlMessage { + NewFilesQueued, + Exit, +} + +async fn queued_len(analysis_status_lock: Arc>) -> usize { + analysis_status_lock.read().await.queued.len() +} + +async fn dequeue_to_running(analysis_status_lock: Arc>) -> String { + let mut 
analysis_status = analysis_status_lock.write().await; + let name = analysis_status.queued.remove(0); + assert!(analysis_status.running.is_none()); + analysis_status.running = Some(name.clone()); + name +} + +async fn clear_running(analysis_status_lock: Arc>) { + let mut analysis_status = analysis_status_lock.write().await; + analysis_status.running = None; +} + +async fn perform_analysis( + name: &str, + qmdl_store_lock: Arc>, +) -> Result<(), String> { + info!("Opening QMDL and analysis file for {}...", name); + let (analysis_file, qmdl_file, entry_index) = { + let mut qmdl_store = qmdl_store_lock.write().await; + let (entry_index, _) = qmdl_store + .entry_for_name(&name) + .ok_or(format!("failed to find QMDL store entry for {}", name))?; + let analysis_file = qmdl_store + .clear_and_open_entry_analysis(entry_index) + .await + .map_err(|e| format!("{:?}", e))?; + let qmdl_file = qmdl_store + .open_entry_qmdl(entry_index) + .await + .map_err(|e| format!("{:?}", e))?; + + (analysis_file, qmdl_file, entry_index) + }; + + let mut analysis_writer = AnalysisWriter::new(analysis_file) + .await + .map_err(|e| format!("{:?}", e))?; + let file_size = qmdl_file + .metadata() + .await + .expect("failed to get QMDL file metadata") + .len(); + let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize)); + let mut qmdl_stream = pin::pin!(qmdl_reader + .as_stream() + .try_filter(|container| future::ready(container.data_type == DataType::UserSpace))); + + info!("Starting analysis for {}...", name); + while let Some(container) = qmdl_stream + .try_next() + .await + .expect("failed getting QMDL container") + { + let size_bytes = analysis_writer + .analyze(container) + .await + .map_err(|e| format!("{:?}", e))?; + debug!("{} analysis: {} bytes written", name, size_bytes); + let mut qmdl_store = qmdl_store_lock.write().await; + qmdl_store + .update_entry_analysis_size(entry_index, size_bytes) + .await + .map_err(|e| format!("{:?}", e))?; + } + + analysis_writer + 
.close() + .await + .map_err(|e| format!("{:?}", e))?; + info!("Analysis for {} complete!", name); + + Ok(()) +} + +pub fn run_analysis_thread( + task_tracker: &TaskTracker, + mut analysis_rx: Receiver, + qmdl_store_lock: Arc>, + analysis_status_lock: Arc>, +) { + task_tracker.spawn(async move { + loop { + match analysis_rx.recv().await { + Some(AnalysisCtrlMessage::NewFilesQueued) => { + let count = queued_len(analysis_status_lock.clone()).await; + for _ in 0..count { + let name = dequeue_to_running(analysis_status_lock.clone()).await; + if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone()).await { + error!("failed to analyze {}: {}", name, err); + } + clear_running(analysis_status_lock.clone()).await; + } + } + Some(AnalysisCtrlMessage::Exit) | None => return, + } + } + }); +} + +pub async fn get_analysis_status( + State(state): State>, +) -> Result, (StatusCode, String)> { + Ok(Json(state.analysis_status_lock.read().await.clone())) +} + +fn queue_qmdl(name: &str, analysis_status: &mut RwLockWriteGuard) -> bool { + if analysis_status.queued.iter().any(|n| n == name) + || analysis_status.running.iter().any(|n| n == name) + { + return false; + } + analysis_status.queued.push(name.to_string()); + true +} + +pub async fn start_analysis( + State(state): State>, + Path(qmdl_name): Path, +) -> Result<(StatusCode, Json), (StatusCode, String)> { + let mut analysis_status = state.analysis_status_lock.write().await; + let store = state.qmdl_store_lock.read().await; + let queued = if qmdl_name.is_empty() { + let mut entry_names: Vec<&str> = store + .manifest + .entries + .iter() + .map(|e| e.name.as_str()) + .collect(); + if let Some(current_entry) = store.current_entry { + entry_names.remove(current_entry); + } + entry_names + .iter() + .fold(false, |queued, name| queue_qmdl(name, &mut analysis_status) || queued) + } else { + queue_qmdl(&qmdl_name, &mut analysis_status) + }; + if queued { + state + .analysis_sender + .send(AnalysisCtrlMessage::NewFilesQueued) + .await + .map_err(|e| { + 
( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to queue new analysis files: {:?}", e), + ) + })?; + } + Ok((StatusCode::ACCEPTED, Json(analysis_status.clone()))) +} diff --git a/bin/src/check.rs b/bin/src/check.rs index ebc210b..90fa6b2 100644 --- a/bin/src/check.rs +++ b/bin/src/check.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, future, path::PathBuf, pin::pin}; use rayhunter::{analysis::analyzer::Harness, diag::DataType, qmdl::QmdlReader}; -use tokio::fs::File; +use tokio::fs::{metadata, read_dir, File}; use clap::Parser; use futures::TryStreamExt; @@ -9,24 +9,17 @@ use futures::TryStreamExt; struct Args { #[arg(short, long)] qmdl_path: PathBuf, + + #[arg(long)] + show_skipped: bool, } -#[tokio::main] -async fn main() { - env_logger::init(); - let args = Args::parse(); - - let mut harness = Harness::new_with_all_analyzers(); - - let qmdl_file = File::open(args.qmdl_path).await.expect("failed to open QMDL file"); +async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool) { + let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); let file_size = qmdl_file.metadata().await.expect("failed to get QMDL file metadata").len(); let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize)); let mut qmdl_stream = pin!(qmdl_reader.as_stream() .try_filter(|container| future::ready(container.data_type == DataType::UserSpace))); - println!("Analyzers:"); - for analyzer in harness.get_metadata().analyzers { - println!(" - {}: {}", analyzer.name, analyzer.description); - } let mut skipped_reasons: HashMap = HashMap::new(); let mut total_messages = 0; let mut warnings = 0; @@ -47,11 +40,37 @@ async fn main() { } } } - if skipped > 0 { - println!("Messages skipped:"); + if show_skipped && skipped > 0 { + println!("{}: messages skipped:", qmdl_path); for (reason, count) in skipped_reasons.iter() { println!(" - {}: \"{}\"", count, reason); } } - println!("{} messages analyzed, {} warnings, {} messages 
skipped", total_messages, warnings, skipped); + println!("{}: {} messages analyzed, {} warnings, {} messages skipped", qmdl_path, total_messages, warnings, skipped); +} + +#[tokio::main] +async fn main() { + env_logger::init(); + let args = Args::parse(); + + let mut harness = Harness::new_with_all_analyzers(); + println!("Analyzers:"); + for analyzer in harness.get_metadata().analyzers { + println!(" - {}: {}", analyzer.name, analyzer.description); + } + + let metadata = metadata(&args.qmdl_path).await.expect("failed to get metadata"); + if metadata.is_dir() { + let mut dir = read_dir(&args.qmdl_path).await.expect("failed to read dir"); + while let Some(entry) = dir.next_entry().await.expect("failed to get entry") { + let name = entry.file_name(); + let name_str = name.to_str().unwrap(); + if name_str.ends_with(".qmdl") { + analyze_file(&mut harness, entry.path().to_str().unwrap(), args.show_skipped).await; + } + } + } else { + analyze_file(&mut harness, args.qmdl_path.to_str().unwrap(), args.show_skipped).await; + } } diff --git a/bin/src/config.rs b/bin/src/config.rs index c554b02..70557a1 100644 --- a/bin/src/config.rs +++ b/bin/src/config.rs @@ -6,7 +6,7 @@ use serde::Deserialize; struct ConfigFile { qmdl_store_path: Option, port: Option, - readonly_mode: Option, + debug_mode: Option, ui_level: Option, } @@ -14,7 +14,7 @@ struct ConfigFile { pub struct Config { pub qmdl_store_path: String, pub port: u16, - pub readonly_mode: bool, + pub debug_mode: bool, pub ui_level: u8, } @@ -23,7 +23,7 @@ impl Default for Config { Config { qmdl_store_path: "/data/rayhunter/qmdl".to_string(), port: 8080, - readonly_mode: false, + debug_mode: false, ui_level: 1, } } @@ -36,7 +36,7 @@ pub fn parse_config

(path: P) -> Result where P: AsRef .map_err(RayhunterError::ConfigFileParsingError)?; if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path } if let Some(port) = parsed_config.port { config.port = port } - if let Some(readonly_mode) = parsed_config.readonly_mode { config.readonly_mode = readonly_mode } + if let Some(debug_mode) = parsed_config.debug_mode { config.debug_mode = debug_mode } if let Some(ui_level) = parsed_config.ui_level { config.ui_level = ui_level } } Ok(config) diff --git a/bin/src/daemon.rs b/bin/src/daemon.rs index 1711778..a6e572e 100644 --- a/bin/src/daemon.rs +++ b/bin/src/daemon.rs @@ -1,3 +1,4 @@ +mod analysis; mod config; mod error; mod pcap; @@ -16,6 +17,7 @@ use crate::stats::get_system_stats; use crate::error::RayhunterError; use crate::framebuffer::Framebuffer; +use analysis::{get_analysis_status, run_analysis_thread, start_analysis, AnalysisCtrlMessage, AnalysisStatus}; use axum::response::Redirect; use diag::{get_analysis_report, start_recording, stop_recording, DiagDeviceCtrlMessage}; use log::{info, error}; @@ -44,14 +46,18 @@ async fn run_server( qmdl_store_lock: Arc>, server_shutdown_rx: oneshot::Receiver<()>, ui_update_tx: Sender, - diag_device_sender: Sender + diag_device_sender: Sender, + analysis_sender: Sender, + analysis_status_lock: Arc>, ) -> JoinHandle<()> { info!("spinning up server"); let state = Arc::new(ServerState { qmdl_store_lock, diag_device_ctrl_sender: diag_device_sender, ui_update_sender: ui_update_tx, - readonly_mode: config.readonly_mode + debug_mode: config.debug_mode, + analysis_status_lock, + analysis_sender, }); let app = Router::new() @@ -61,7 +67,9 @@ async fn run_server( .route("/api/qmdl-manifest", get(get_qmdl_manifest)) .route("/api/start-recording", post(start_recording)) .route("/api/stop-recording", post(stop_recording)) - .route("/api/analysis-report", get(get_analysis_report)) + .route("/api/analysis-report/*name", get(get_analysis_report)) + .route("/api/analysis", 
get(get_analysis_status)) + .route("/api/analysis/*name", post(start_analysis)) .route("/", get(|| async { Redirect::permanent("/index.html") })) .route("/*path", get(serve_static)) .with_state(state); @@ -81,12 +89,12 @@ async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) { } // Loads a QmdlStore if one exists, and if not, only create one if we're not in -// readonly mode. +// debug mode. async fn init_qmdl_store(config: &config::Config) -> Result { - match (RecordingStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) { + match (RecordingStore::exists(&config.qmdl_store_path).await?, config.debug_mode) { (true, _) => Ok(RecordingStore::load(&config.qmdl_store_path).await?), (false, false) => Ok(RecordingStore::create(&config.qmdl_store_path).await?), - (false, true) => Err(RayhunterError::NoStoreReadonlyMode(config.qmdl_store_path.clone())), + (false, true) => Err(RayhunterError::NoStoreDebugMode(config.qmdl_store_path.clone())), } } @@ -97,8 +105,9 @@ fn run_ctrl_c_thread( task_tracker: &TaskTracker, diag_device_sender: Sender, server_shutdown_tx: oneshot::Sender<()>, - ui_shutdown_tx: oneshot::Sender<()>, - qmdl_store_lock: Arc> + maybe_ui_shutdown_tx: Option>, + qmdl_store_lock: Arc>, + analysis_tx: Sender, ) -> JoinHandle> { task_tracker.spawn(async move { match tokio::signal::ctrl_c().await { @@ -113,10 +122,14 @@ fn run_ctrl_c_thread( server_shutdown_tx.send(()) .expect("couldn't send server shutdown signal"); info!("sending UI shutdown"); - ui_shutdown_tx.send(()) - .expect("couldn't send ui shutdown signal"); + if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx { + ui_shutdown_tx.send(()) + .expect("couldn't send ui shutdown signal"); + } diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await .expect("couldn't send Exit message to diag thread"); + analysis_tx.send(AnalysisCtrlMessage::Exit).await + .expect("couldn't send Exit message to analysis thread"); }, Err(err) => { error!("Unable to listen for shutdown 
signal: {}", err); @@ -151,8 +164,7 @@ fn update_ui(task_tracker: &TaskTracker, config: &config::Config, mut ui_shutdo break; }, Err(TryRecvError::Empty) => {}, - Err(e) => error!("error receiving shutdown message: {e}") - + Err(e) => panic!("error receiving shutdown message: {e}") } match ui_update_rx.try_recv() { Ok(state) => { @@ -161,7 +173,7 @@ fn update_ui(task_tracker: &TaskTracker, config: &config::Config, mut ui_shutdo Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}, Err(e) => error!("error receiving framebuffer update message: {e}") } - + match display_level { 2 => { fb.draw_gif(img.unwrap()); @@ -200,8 +212,11 @@ async fn main() -> Result<(), RayhunterError> { let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?)); let (tx, rx) = mpsc::channel::(1); let (ui_update_tx, ui_update_rx) = mpsc::channel::(1); - let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel(); - if !config.readonly_mode { + let (analysis_tx, analysis_rx) = mpsc::channel::(5); + let mut maybe_ui_shutdown_tx = None; + if !config.debug_mode { + let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel(); + maybe_ui_shutdown_tx = Some(ui_shutdown_tx); let mut dev = DiagDevice::new().await .map_err(RayhunterError::DiagInitError)?; dev.config_logs().await @@ -214,8 +229,10 @@ async fn main() -> Result<(), RayhunterError> { } let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>(); info!("create shutdown thread"); - run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, ui_shutdown_tx, qmdl_store_lock.clone()); - run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, ui_update_tx, tx).await; + let analysis_status_lock = Arc::new(RwLock::new(AnalysisStatus::default())); + run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone()); + run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, maybe_ui_shutdown_tx, qmdl_store_lock.clone(), analysis_tx.clone()); + 
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, ui_update_tx, tx, analysis_tx, analysis_status_lock).await; task_tracker.close(); task_tracker.wait().await; diff --git a/bin/src/diag.rs b/bin/src/diag.rs index 564679a..d4680a6 100644 --- a/bin/src/diag.rs +++ b/bin/src/diag.rs @@ -2,7 +2,7 @@ use std::pin::pin; use std::sync::Arc; use axum::body::Body; -use axum::extract::State; +use axum::extract::{Path, State}; use axum::http::header::CONTENT_TYPE; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -15,7 +15,8 @@ use tokio::sync::mpsc::{Receiver, Sender}; use rayhunter::qmdl::QmdlWriter; use log::{debug, error, info}; use tokio::fs::File; -use tokio::io::{BufWriter, AsyncWriteExt}; +use tokio::io::BufWriter; +use tokio::io::AsyncWriteExt; use tokio_util::io::ReaderStream; use tokio_util::task::TaskTracker; use futures::{StreamExt, TryStreamExt}; @@ -171,8 +172,8 @@ pub fn run_diag_read_thread( } pub async fn start_recording(State(state): State>) -> Result<(StatusCode, String), (StatusCode, String)> { - if state.readonly_mode { - return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string())); + if state.debug_mode { + return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string())); } let mut qmdl_store = state.qmdl_store_lock.write().await; let (qmdl_file, analysis_file) = qmdl_store.new_entry().await @@ -186,8 +187,8 @@ pub async fn start_recording(State(state): State>) -> Result<(S } pub async fn stop_recording(State(state): State>) -> Result<(StatusCode, String), (StatusCode, String)> { - if state.readonly_mode { - return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string())); + if state.debug_mode { + return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string())); } let mut qmdl_store = state.qmdl_store_lock.write().await; qmdl_store.close_current_entry().await @@ -199,15 +200,20 @@ pub async fn stop_recording(State(state): State>) -> Result<(St 
Ok((StatusCode::ACCEPTED, "ok".to_string())) } -pub async fn get_analysis_report(State(state): State>) -> Result { +pub async fn get_analysis_report(State(state): State>, Path(qmdl_name): Path) -> Result { let qmdl_store = state.qmdl_store_lock.read().await; - let Some(entry) = qmdl_store.get_current_entry() else { - return Err(( + let (entry_index, _) = if qmdl_name == "live" { + qmdl_store.get_current_entry().ok_or(( StatusCode::SERVICE_UNAVAILABLE, "No QMDL data's being recorded to analyze, try starting a new recording!".to_string() - )); + ))? + } else { + qmdl_store.entry_for_name(&qmdl_name).ok_or(( + StatusCode::NOT_FOUND, + format!("Couldn't find QMDL entry with name \"{}\"", qmdl_name) + ))? }; - let analysis_file = qmdl_store.open_entry_analysis(entry).await + let analysis_file = qmdl_store.open_entry_analysis(entry_index).await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?; let analysis_stream = ReaderStream::new(analysis_file); diff --git a/bin/src/error.rs b/bin/src/error.rs index 6942ed1..2983256 100644 --- a/bin/src/error.rs +++ b/bin/src/error.rs @@ -13,6 +13,6 @@ pub enum RayhunterError{ TokioError(#[from] tokio::io::Error), #[error("QmdlStore error: {0}")] QmdlStoreError(#[from] RecordingStoreError), - #[error("No QMDL store found at path {0}, but can't create a new one due to readonly mode")] - NoStoreReadonlyMode(String), + #[error("No QMDL store found at path {0}, but can't create a new one due to debug mode")] + NoStoreDebugMode(String), } diff --git a/bin/src/pcap.rs b/bin/src/pcap.rs index 0a0b46c..4f7395c 100644 --- a/bin/src/pcap.rs +++ b/bin/src/pcap.rs @@ -21,7 +21,7 @@ use futures::TryStreamExt; // pcap data to a channel that's piped to the client. 
pub async fn get_pcap(State(state): State>, Path(qmdl_name): Path) -> Result { let qmdl_store = state.qmdl_store_lock.read().await; - let entry = qmdl_store.entry_for_name(&qmdl_name) + let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name) .ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?; if entry.qmdl_size_bytes == 0 { return Err(( @@ -29,8 +29,8 @@ pub async fn get_pcap(State(state): State>, Path(qmdl_name): Pa "QMDL file is empty, try again in a bit!".to_string() )); } - - let qmdl_file = qmdl_store.open_entry_qmdl(&entry).await + let qmdl_size_bytes = entry.qmdl_size_bytes; + let qmdl_file = qmdl_store.open_entry_qmdl(entry_index).await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?; // the QMDL reader should stop at the last successfully written data chunk // (entry.size_bytes) @@ -39,10 +39,10 @@ pub async fn get_pcap(State(state): State>, Path(qmdl_name): Pa pcap_writer.write_iface_header().await.unwrap(); tokio::spawn(async move { - let mut reader = QmdlReader::new(qmdl_file, Some(entry.qmdl_size_bytes)); + let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes)); let mut messages_stream = pin!(reader.as_stream() .try_filter(|container| future::ready(container.data_type == DataType::UserSpace))); - + while let Some(container) = messages_stream.try_next().await.expect("failed getting QMDL container") { for maybe_msg in container.into_messages() { match maybe_msg { diff --git a/bin/src/qmdl_store.rs b/bin/src/qmdl_store.rs index accd7ce..11c6eda 100644 --- a/bin/src/qmdl_store.rs +++ b/bin/src/qmdl_store.rs @@ -1,8 +1,11 @@ -use std::path::{PathBuf, Path}; -use thiserror::Error; -use tokio::{fs::{self, File, try_exists}, io::AsyncWriteExt}; -use serde::{Deserialize, Serialize}; use chrono::{DateTime, Local}; +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use thiserror::Error; +use tokio::{ + fs::{self, try_exists, File, OpenOptions}, + 
io::AsyncWriteExt, +}; #[derive(Debug, Error)] pub enum RecordingStoreError { @@ -19,7 +22,7 @@ pub enum RecordingStoreError { #[error("Couldn't write manifest file: {0}")] WriteManifestError(tokio::io::Error), #[error("Couldn't parse QMDL store manifest file: {0}")] - ParseManifestError(toml::de::Error) + ParseManifestError(toml::de::Error), } pub struct RecordingStore { @@ -70,16 +73,26 @@ impl ManifestEntry { impl RecordingStore { // Returns whether a directory with a "manifest.toml" exists at the given // path (though doesn't check if that manifest is valid) - pub async fn exists

(path: P) -> Result where P: AsRef { + pub async fn exists

(path: P) -> Result + where + P: AsRef, + { let manifest_path = path.as_ref().join("manifest.toml"); - let dir_exists = try_exists(path).await.map_err(RecordingStoreError::OpenDirError)?; - let manifest_exists = try_exists(manifest_path).await.map_err(RecordingStoreError::ReadManifestError)?; + let dir_exists = try_exists(path) + .await + .map_err(RecordingStoreError::OpenDirError)?; + let manifest_exists = try_exists(manifest_path) + .await + .map_err(RecordingStoreError::ReadManifestError)?; Ok(dir_exists && manifest_exists) } // Loads an existing RecordingStore at the given path. Errors if no store exists, // or if it's malformed. - pub async fn load

(path: P) -> Result where P: AsRef { + pub async fn load

(path: P) -> Result + where + P: AsRef, + { let path: PathBuf = path.as_ref().to_path_buf(); let manifest = RecordingStore::read_manifest(&path).await?; Ok(RecordingStore { @@ -91,26 +104,38 @@ impl RecordingStore { // Creates a new RecordingStore at the given path. This involves creating a dir // and writing an empty manifest. - pub async fn create

(path: P) -> Result where P: AsRef { + pub async fn create

(path: P) -> Result + where + P: AsRef, + { let manifest_path = path.as_ref().join("manifest.toml"); - fs::create_dir_all(&path).await + fs::create_dir_all(&path) + .await .map_err(RecordingStoreError::OpenDirError)?; - let mut manifest_file = File::create(&manifest_path).await + let mut manifest_file = File::create(&manifest_path) + .await .map_err(RecordingStoreError::WriteManifestError)?; - let empty_manifest = Manifest { entries: Vec::new() }; - let empty_manifest_contents = toml::to_string_pretty(&empty_manifest) - .expect("failed to serialize manifest"); - manifest_file.write_all(empty_manifest_contents.as_bytes()).await + let empty_manifest = Manifest { + entries: Vec::new(), + }; + let empty_manifest_contents = + toml::to_string_pretty(&empty_manifest).expect("failed to serialize manifest"); + manifest_file + .write_all(empty_manifest_contents.as_bytes()) + .await .map_err(RecordingStoreError::WriteManifestError)?; RecordingStore::load(path).await } - async fn read_manifest

(path: P) -> Result where P: AsRef { + async fn read_manifest

(path: P) -> Result + where + P: AsRef, + { let manifest_path = path.as_ref().join("manifest.toml"); - let file_contents = fs::read_to_string(&manifest_path).await + let file_contents = fs::read_to_string(&manifest_path) + .await .map_err(RecordingStoreError::ReadManifestError)?; - toml::from_str(&file_contents) - .map_err(RecordingStoreError::ParseManifestError) + toml::from_str(&file_contents).map_err(RecordingStoreError::ParseManifestError) } // Closes the current entry (if needed), creates a new entry based on the @@ -126,13 +151,15 @@ impl RecordingStore { let qmdl_file = File::options() .create(true) .write(true) - .open(&qmdl_filepath).await + .open(&qmdl_filepath) + .await .map_err(RecordingStoreError::CreateFileError)?; let analysis_filepath = new_entry.get_analysis_filepath(&self.path); let analysis_file = File::options() .create(true) .write(true) - .open(&analysis_filepath).await + .open(&analysis_filepath) + .await .map_err(RecordingStoreError::CreateFileError)?; self.manifest.entries.push(new_entry); self.current_entry = Some(self.manifest.entries.len() - 1); @@ -141,37 +168,71 @@ impl RecordingStore { } // Returns the corresponding QMDL file for a given entry - pub async fn open_entry_qmdl(&self, entry: &ManifestEntry) -> Result { - File::open(entry.get_qmdl_filepath(&self.path)).await + pub async fn open_entry_qmdl( + &self, + entry_index: usize, + ) -> Result { + let entry = &self.manifest.entries[entry_index]; + File::open(entry.get_qmdl_filepath(&self.path)) + .await .map_err(RecordingStoreError::ReadFileError) } // Returns the corresponding QMDL file for a given entry - pub async fn open_entry_analysis(&self, entry: &ManifestEntry) -> Result { - File::open(entry.get_analysis_filepath(&self.path)).await + pub async fn open_entry_analysis( + &self, + entry_index: usize, + ) -> Result { + let entry = &self.manifest.entries[entry_index]; + File::open(entry.get_analysis_filepath(&self.path)) + .await .map_err(RecordingStoreError::ReadFileError) } + 
pub async fn clear_and_open_entry_analysis( + &mut self, + entry_index: usize, + ) -> Result { + let entry = &self.manifest.entries[entry_index]; + let file = OpenOptions::new() + .write(true) + .truncate(true) + .open(entry.get_analysis_filepath(&self.path)) + .await + .map_err(RecordingStoreError::ReadFileError)?; + self.update_entry_analysis_size(entry_index, 0) + .await?; + Ok(file) + } + // Unsets the current entry pub async fn close_current_entry(&mut self) -> Result<(), RecordingStoreError> { match self.current_entry { Some(_) => { self.current_entry = None; Ok(()) - }, - None => Err(RecordingStoreError::NoCurrentEntry) + } + None => Err(RecordingStoreError::NoCurrentEntry), } } // Sets the given entry's size and updates the last_message_time to now, updating the manifest - pub async fn update_entry_qmdl_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), RecordingStoreError> { + pub async fn update_entry_qmdl_size( + &mut self, + entry_index: usize, + size_bytes: usize, + ) -> Result<(), RecordingStoreError> { self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes; self.manifest.entries[entry_index].last_message_time = Some(Local::now()); self.write_manifest().await } // Sets the given entry's analysis file size - pub async fn update_entry_analysis_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), RecordingStoreError> { + pub async fn update_entry_analysis_size( + &mut self, + entry_index: usize, + size_bytes: usize, + ) -> Result<(), RecordingStoreError> { self.manifest.entries[entry_index].analysis_size_bytes = size_bytes; self.write_manifest().await } @@ -179,32 +240,37 @@ impl RecordingStore { async fn write_manifest(&mut self) -> Result<(), RecordingStoreError> { let mut manifest_file = File::options() .write(true) - .open(self.path.join("manifest.toml")).await + .open(self.path.join("manifest.toml")) + .await .map_err(RecordingStoreError::WriteManifestError)?; - let manifest_contents = 
toml::to_string_pretty(&self.manifest) - .expect("failed to serialize manifest"); - manifest_file.write_all(manifest_contents.as_bytes()).await + let manifest_contents = + toml::to_string_pretty(&self.manifest).expect("failed to serialize manifest"); + manifest_file + .write_all(manifest_contents.as_bytes()) + .await .map_err(RecordingStoreError::WriteManifestError)?; Ok(()) } // Finds an entry by filename - pub fn entry_for_name(&self, name: &str) -> Option { - self.manifest.entries.iter() - .find(|entry| entry.name == name) - .cloned() + pub fn entry_for_name(&self, name: &str) -> Option<(usize, &ManifestEntry)> { + let entry_index = self.manifest + .entries + .iter() + .position(|entry| entry.name == name)?; + Some((entry_index, &self.manifest.entries[entry_index])) } - pub fn get_current_entry(&self) -> Option<&ManifestEntry> { + pub fn get_current_entry(&self) -> Option<(usize, &ManifestEntry)> { let entry_index = self.current_entry?; - self.manifest.entries.get(entry_index) + Some((entry_index, &self.manifest.entries[entry_index])) } } #[cfg(test)] mod tests { - use tempfile::{TempDir, Builder}; use super::*; + use tempfile::{Builder, TempDir}; fn make_temp_dir() -> TempDir { Builder::new().prefix("qmdl_store_test").tempdir().unwrap() @@ -226,17 +292,33 @@ mod tests { let mut store = RecordingStore::create(dir.path()).await.unwrap(); let _ = store.new_entry().await.unwrap(); let entry_index = store.current_entry.unwrap(); - assert_eq!(RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest); - assert!(store.manifest.entries[entry_index].last_message_time.is_none()); + assert_eq!( + RecordingStore::read_manifest(dir.path()).await.unwrap(), + store.manifest + ); + assert!(store.manifest.entries[entry_index] + .last_message_time + .is_none()); - store.update_entry_qmdl_size(entry_index, 1000).await.unwrap(); - let entry = store.entry_for_name(&store.manifest.entries[entry_index].name).unwrap(); + store + .update_entry_qmdl_size(entry_index, 
1000) + .await + .unwrap(); + let (entry_index, entry) = store + .entry_for_name(&store.manifest.entries[entry_index].name) + .unwrap(); assert!(entry.last_message_time.is_some()); assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000); - assert_eq!(RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest); + assert_eq!( + RecordingStore::read_manifest(dir.path()).await.unwrap(), + store.manifest + ); store.close_current_entry().await.unwrap(); - assert!(matches!(store.close_current_entry().await, Err(RecordingStoreError::NoCurrentEntry))); + assert!(matches!( + store.close_current_entry().await, + Err(RecordingStoreError::NoCurrentEntry) + )); } #[tokio::test] diff --git a/bin/src/server.rs b/bin/src/server.rs index 028df21..3f9f481 100644 --- a/bin/src/server.rs +++ b/bin/src/server.rs @@ -4,6 +4,7 @@ use axum::extract::State; use axum::http::{StatusCode, HeaderValue}; use axum::response::{Response, IntoResponse}; use axum::extract::Path; +use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::Sender; use std::sync::Arc; @@ -12,20 +13,23 @@ use tokio_util::io::ReaderStream; use include_dir::{include_dir, Dir}; use crate::{framebuffer, DiagDeviceCtrlMessage}; +use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus}; use crate::qmdl_store::RecordingStore; pub struct ServerState { pub qmdl_store_lock: Arc>, pub diag_device_ctrl_sender: Sender, pub ui_update_sender: Sender, - pub readonly_mode: bool + pub analysis_status_lock: Arc>, + pub analysis_sender: Sender, + pub debug_mode: bool } pub async fn get_qmdl(State(state): State>, Path(qmdl_name): Path) -> Result { let qmdl_store = state.qmdl_store_lock.read().await; - let entry = qmdl_store.entry_for_name(&qmdl_name) + let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name) .ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?; - let qmdl_file = qmdl_store.open_entry_qmdl(&entry).await + let qmdl_file = 
qmdl_store.open_entry_qmdl(entry_index).await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {}", e)))?; let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64); let qmdl_stream = ReaderStream::new(limited_qmdl_file); @@ -38,10 +42,39 @@ pub async fn get_qmdl(State(state): State>, Path(qmdl_name): Pa // Bundles the server's static files (html/css/js) into the binary for easy distribution static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/static"); -pub async fn serve_static(Path(path): Path) -> impl IntoResponse { +pub async fn serve_static(State(state): State>, Path(path): Path) -> impl IntoResponse { let path = path.trim_start_matches('/'); let mime_type = mime_guess::from_path(path).first_or_text_plain(); + // if we're in debug mode, return the files from the build directory so we + // don't have to rebuild every time the JS/HTML change + if state.debug_mode { + let mut build_path = std::path::PathBuf::new(); + build_path.push("bin"); + build_path.push("static"); + for part in path.split("/") { + build_path.push(part); + } + return match File::open(build_path).await { + Ok(mut file) => { + let mut body = String::new(); + file.read_to_string(&mut body).await.expect("failed to read file"); + Response::builder() + .status(StatusCode::OK) + .header( + header::CONTENT_TYPE, + HeaderValue::from_str(mime_type.as_ref()).unwrap(), + ) + .body(Body::from(body)) + .unwrap() + }, + Err(_) => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() + }; + } + match STATIC_DIR.get_file(path) { None => Response::builder() .status(StatusCode::NOT_FOUND) diff --git a/bin/static/index.html b/bin/static/index.html index 878f79d..c14e7ea 100644 --- a/bin/static/index.html +++ b/bin/static/index.html @@ -27,15 +27,16 @@ Size (bytes) PCAP QMDL + Analysis Result

-

System stats

+

Live System stats

Loading...
-

Analysis Report

+

Analysis Report of Current Capture

Loading...
diff --git a/bin/static/js/main.js b/bin/static/js/main.js index fd78bee..af4f8e7 100644 --- a/bin/static/js/main.js +++ b/bin/static/js/main.js @@ -1,16 +1,99 @@ +const STATUS_RUNNING = 'running'; +const STATUS_QUEUED = 'queued'; +const STATUS_NEEDS_UPDATE = 'needs-update'; +const STATUS_COMPLETE = 'complete'; + async function populateDivs() { const systemStats = await getSystemStats(); const systemStatsDiv = document.getElementById('system-stats'); systemStatsDiv.innerHTML = JSON.stringify(systemStats, null, 2); - const analysisReport = await getAnalysisReport(); const analysisReportDiv = document.getElementById('analysis-report'); - analysisReportDiv.innerHTML = JSON.stringify(analysisReport, null, 2); + try { + const analysisReport = await getAnalysisReport('live'); + analysisReportDiv.innerHTML = JSON.stringify(analysisReport, null, 2); + } catch (e) { + analysisReportDiv.innerHTML = e.toString(); + } const qmdlManifest = await getQmdlManifest(); + await updateAnalysisStatus(qmdlManifest); + await updateAnalysisResults(qmdlManifest); updateQmdlManifestTable(qmdlManifest); } +function setStatus(qmdlManifest, name, status) { + // ignore qmdlManifest.current_entry, it's always running + for (const entry of qmdlManifest.entries) { + if (entry.name === name) { + entry['status'] = status; + return; + } + } +} + +async function updateAnalysisStatus(qmdlManifest) { + const status = JSON.parse(await req('GET', '/api/analysis')); + if (status.running) { + setStatus(qmdlManifest, status.running, STATUS_RUNNING); + } + for (const queued of status.queued) { + setStatus(qmdlManifest, queued, STATUS_QUEUED); + } +} + +function parseNewlineDelimitedJSON(inputStr) { + const lines = inputStr.split('\n'); + const result = []; + let currentLine = ''; + while (lines.length > 0) { + currentLine += lines.shift(); + try { + const entry = JSON.parse(currentLine); + result.push(entry); + currentLine = ''; + // if this chunk wasn't valid JSON, there was an escaped newline in the + // JSON line, so simply 
continue to the next one + } catch (e) {} + } + return result; +} + +async function updateEntryAnalysisResult(entry) { + entry.analysis = { + warnings: [], + }; + const report = parseNewlineDelimitedJSON(await req('GET', `/api/analysis-report/${entry.name}`)); + for (const row of report) { + if (row["analysis"]) { + const timestamp = new Date(row["timestamp"]); + const analysis = row["analysis"]; + for (const warning of analysis) { + entry.analysis.warnings.push({ + timestamp, + warning, + }) + } + } + } + if (entry.analysis.warnings.length === 0) { + entry.analysis_result = `0 warnings!`; + } else { + entry.analysis_result = `!!! ${entry.analysis.warnings.length} warnings !!!`; + } +} + +async function updateAnalysisResults(qmdlManifest) { + if (qmdlManifest.current_entry) { + await updateEntryAnalysisResult(qmdlManifest.current_entry); + } + for (const entry of qmdlManifest.entries) { + if (entry.status === STATUS_NEEDS_UPDATE) { + await updateEntryAnalysisResult(entry); + entry.status = STATUS_COMPLETE; + } + } +} + function updateQmdlManifestTable(manifest) { const table = document.getElementById('qmdl-manifest-table'); const numRows = table.rows.length; @@ -18,43 +101,52 @@ function updateQmdlManifestTable(manifest) { table.deleteRow(1); } if (manifest.current_entry) { - const row = createEntryRow(manifest.current_entry); + const row = createEntryRow(manifest.current_entry, true); row.classList.add('current'); table.appendChild(row) } for (let entry of manifest.entries) { - table.appendChild(createEntryRow(entry)); + table.appendChild(createEntryRow(entry, false)); } } -function createEntryRow(entry) { +function createLink(uri, text) { + const link = document.createElement('a'); + link.href = uri; + link.innerText = text; + return link; +} + +function createEntryRow(entry, isCurrent) { const row = document.createElement('tr'); const name = document.createElement('th'); name.scope = 'row'; name.innerText = entry.name; row.appendChild(name); + for (const key of 
['start_time', 'last_message_time', 'qmdl_size_bytes']) { const td = document.createElement('td'); td.innerText = entry[key]; row.appendChild(td); } - const pcap_td = document.createElement('td'); - const pcap_link = document.createElement('a'); - pcap_link.href = `/api/pcap/${entry.name}`; - pcap_link.innerText = 'pcap'; - pcap_td.appendChild(pcap_link); - row.appendChild(pcap_td); - const qmdl_td = document.createElement('td'); - const qmdl_link = document.createElement('a'); - qmdl_link.href = `/api/qmdl/${entry.name}`; - qmdl_link.innerText = 'qmdl'; - qmdl_td.appendChild(qmdl_link); - row.appendChild(qmdl_td); + + const pcapTd = document.createElement('td'); + pcapTd.appendChild(createLink(`/api/pcap/${entry.name}`, 'pcap')); + row.appendChild(pcapTd); + + const qmdlTd = document.createElement('td'); + qmdlTd.appendChild(createLink(`/api/qmdl/${entry.name}`, 'qmdl')); + row.appendChild(qmdlTd); + + const analysisResult = document.createElement('td'); + analysisResult.innerText = entry.analysis_result; + row.appendChild(analysisResult); + return row; } -async function getAnalysisReport() { - const rows = await req('GET', '/api/analysis-report'); +async function getAnalysisReport(name) { + const rows = await req('GET', `/api/analysis-report/${name}`); return rows.split('\n') .filter(row => row.length > 0) .map(row => JSON.parse(row)); @@ -67,6 +159,8 @@ async function getSystemStats() { async function getQmdlManifest() { const manifest = JSON.parse(await req('GET', '/api/qmdl-manifest')); if (manifest.current_entry) { + manifest.current_entry.status = STATUS_NEEDS_UPDATE; + manifest.current_entry.analysis_result = 'Waiting...'; manifest.current_entry.start_time = new Date(manifest.current_entry.start_time); if (manifest.current_entry.last_message_time === undefined) { manifest.current_entry.last_message_time = "N/A"; @@ -75,6 +169,8 @@ async function getQmdlManifest() { } } for (entry of manifest.entries) { + entry.status = STATUS_NEEDS_UPDATE; + 
entry.analysis_result = 'Waiting...'; entry.start_time = new Date(entry.start_time); entry.last_message_time = new Date(entry.last_message_time); } diff --git a/dist/config.toml.example b/dist/config.toml.example index 1492ccd..edfdaed 100644 --- a/dist/config.toml.example +++ b/dist/config.toml.example @@ -1,10 +1,9 @@ # cat config.toml qmdl_store_path = "/data/rayhunter/qmdl" port = 8080 -readonly_mode = false -# UI Levels: -# 0 = invisible mode, no indicator that rayhunter is running -# 1 = Subtle mode, display a green line at the top of the screen when rayhunter is running -# 2 = Demo Mode, display a fun orca gif +# UI Levels: +# 0 = invisible mode, no indicator that rayhunter is running +# 1 = Subtle mode, display a green line at the top of the screen when rayhunter is running +# 2 = Demo Mode, display a fun orca gif # 3 = display the EFF logo ui_level = 1