diff --git a/Cargo.lock b/Cargo.lock index 3b7c42f..f2e3207 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.2" @@ -26,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -204,6 +222,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets 0.48.5", ] @@ -343,6 +362,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "funty" version = "2.0.0" @@ -364,6 +389,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -383,9 +419,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -418,6 +456,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "hermit-abi" @@ -829,6 +871,43 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -867,6 +946,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1040,6 +1128,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand", + "remove_dir_all", +] + [[package]] name = "termcolor" version = "1.4.0" @@ -1108,6 +1206,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown", "pin-project-lite", "tokio", "tracing", @@ -1292,13 +1392,16 @@ name = "wavehunter" version = "0.1.0" dependencies = [ "axum", + "chrono", "env_logger", "futures-core", + "futures-macro", "include_dir", "log", "mime_guess", "orca", "serde", + "tempdir", "thiserror", "tokio", "tokio-util", @@ -1494,3 +1597,23 @@ checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" dependencies = [ "tap", ] + +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] diff --git a/orca/src/diag_device.rs b/orca/src/diag_device.rs index 1b1d471..4e456f6 100644 --- a/orca/src/diag_device.rs +++ b/orca/src/diag_device.rs @@ -71,7 +71,7 @@ const DIAG_IOCTL_SWITCH_LOGGING: u64 = 7; pub struct DiagDevice { file: File, - pub qmdl_writer: QmdlWriter, + pub qmdl_writer: Option>, fully_initialized: bool, read_buf: Vec, use_mdm: i32, @@ -92,16 +92,18 @@ impl DiagReader for DiagDevice { warn!("warning: {} leftover bytes when parsing MessagesContainer", leftover_bytes.len()); } - if self.fully_initialized { - self.qmdl_writer.write_container(&container) - .map_err(DiagDeviceError::QmdlFileWriteError)?; + if let Some(qmdl_writer) = self.qmdl_writer.as_mut() { + if self.fully_initialized { + qmdl_writer.write_container(&container) + .map_err(DiagDeviceError::QmdlFileWriteError)?; + } } Ok(container) } } impl DiagDevice { - pub fn new

(qmdl_path: P) -> DiagResult where P: AsRef { + pub fn new(qmdl_writer: Option>) -> DiagResult { let diag_file = std::fs::File::options() .read(true) .write(true) @@ -109,21 +111,6 @@ impl DiagDevice { .map_err(DiagDeviceError::OpenDiagDeviceError)?; let fd = diag_file.as_raw_fd(); - let qmdl_file = File::options() - .create(true) - .append(true) - .open(&qmdl_path) - .map_err(DiagDeviceError::OpenQmdlFileError)?; - let qmdl_metadata = qmdl_file.metadata().map_err(DiagDeviceError::OpenQmdlFileError)?; - if qmdl_metadata.len() != 0 { - info!( - "QMDL file {} already contains data ({} bytes), appending to it", - qmdl_path.as_ref().display(), - qmdl_metadata.len() - ); - } - let qmdl_writer = QmdlWriter::new_with_existing_size(qmdl_file, qmdl_metadata.len() as usize); - enable_frame_readwrite(fd, MEMORY_DEVICE_MODE)?; let use_mdm = determine_use_mdm(fd)?; diff --git a/wavehunter/Cargo.toml b/wavehunter/Cargo.toml index 1d94e20..af25014 100644 --- a/wavehunter/Cargo.toml +++ b/wavehunter/Cargo.toml @@ -10,11 +10,14 @@ orca = { path = "../orca" } toml = "0.8.8" serde = { version = "1.0.193", features = ["derive"] } tokio = { version = "1.35.1", features = ["full"] } -axum = "0.7.2" +axum = "0.7.3" futures-core = "0.3.30" thiserror = "1.0.52" log = "0.4.20" env_logger = "0.10.1" -tokio-util = { version = "0.7.10", features = ["io"] } +tokio-util = { version = "0.7.10", features = ["rt"] } +futures-macro = "0.3.30" include_dir = "0.7.3" mime_guess = "2.0.4" +tempdir = "0.3.7" +chrono = { version = "0.4.31", features = ["serde"] } diff --git a/wavehunter/src/config.rs b/wavehunter/src/config.rs index d8280f8..be91ddf 100644 --- a/wavehunter/src/config.rs +++ b/wavehunter/src/config.rs @@ -5,36 +5,40 @@ use serde::Deserialize; #[derive(Deserialize)] struct ConfigFile { qmdl_path: Option, + qmdl_store_path: Option, port: Option, - debug_mode: Option, + readonly_mode: Option, } #[derive(Debug)] pub struct Config { pub qmdl_path: String, + pub qmdl_store_path: String, pub port: u16, - pub debug_mode: bool, + pub readonly_mode: bool, } impl Default for Config { fn default() -> Self { Config { qmdl_path: "./wavehunter.qmdl".to_string(), + qmdl_store_path: "/data/wavehunter".to_string(), port: 8080, - debug_mode: false, + readonly_mode: false, } } } pub fn parse_config

(path: P) -> Result where P: AsRef { - let config_file = std::fs::read_to_string(&path) - .map_err(|_| WavehunterError::MissingConfigFile(format!("{:?}", path.as_ref())))?; - let parsed_config: ConfigFile = toml::from_str(&config_file) - .map_err(WavehunterError::ConfigFileParsingError)?; let mut config = Config::default(); - if let Some(path) = parsed_config.qmdl_path { config.qmdl_path = path } - if let Some(port) = parsed_config.port { config.port = port } - if let Some(debug_mode) = parsed_config.debug_mode { config.debug_mode = debug_mode } + if let Ok(config_file) = std::fs::read_to_string(&path) { + let parsed_config: ConfigFile = toml::from_str(&config_file) + .map_err(WavehunterError::ConfigFileParsingError)?; + if let Some(path) = parsed_config.qmdl_path { config.qmdl_path = path } + if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path } + if let Some(port) = parsed_config.port { config.port = port } + if let Some(debug_mode) = parsed_config.readonly_mode { config.readonly_mode = debug_mode } + } Ok(config) } diff --git a/wavehunter/src/diag.rs b/wavehunter/src/diag.rs new file mode 100644 index 0000000..2eac3e6 --- /dev/null +++ b/wavehunter/src/diag.rs @@ -0,0 +1,99 @@ +use std::sync::Arc; + +use axum::extract::State; +use axum::http::StatusCode; +use orca::diag_device::DiagDevice; +use orca::diag_reader::DiagReader; +use tokio::sync::RwLock; +use tokio::sync::mpsc::{Receiver, self}; +use orca::qmdl::QmdlWriter; +use log::{debug, info}; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::task::JoinHandle; +use tokio_util::task::TaskTracker; + +use crate::error::WavehunterError; +use crate::qmdl_store::QmdlStore; +use crate::server::ServerState; + +pub enum DiagDeviceCtrlMessage { + StopRecording, + StartRecording(QmdlWriter), + Exit, +} + +pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver, qmdl_store_lock: Arc>) -> JoinHandle> { + let (tx, mut rx) = mpsc::channel::<(usize, usize)>(1); + let qmdl_store_lock_clone = qmdl_store_lock.clone(); + task_tracker.spawn(async move { + while let Some((entry_idx, new_size)) = rx.recv().await { + let mut qmdl_store = qmdl_store_lock_clone.write().await; + qmdl_store.update_entry_size(entry_idx, new_size).await + .expect("failed to update qmdl file size"); + } + info!("QMDL store size updater thread exiting..."); + }); + + task_tracker.spawn_blocking(move || { + loop { + match qmdl_file_rx.try_recv() { + Ok(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)) => { + dev.qmdl_writer = Some(qmdl_writer); + }, + Ok(DiagDeviceCtrlMessage::StopRecording) => dev.qmdl_writer = None, + Ok(DiagDeviceCtrlMessage::Exit) | Err(TryRecvError::Disconnected) => { + info!("Diag reader thread exiting..."); + return Ok(()) + }, + // empty just means there's no message for us, so continue as normal + Err(TryRecvError::Empty) => {}, + } + + // remember the QmdlStore current entry index so we can update its size later + let qmdl_store_index = qmdl_store_lock.blocking_read().current_entry; + + // TODO: once we're actually doing analysis, we'll wanna use the messages + // returned here. Until then, the DiagDevice has already written those messages + // to the QMDL file, so we can just ignore them. + debug!("reading response from diag device..."); + let _messages = dev.read_response().map_err(WavehunterError::DiagReadError)?; + debug!("got diag response ({} messages)", _messages.len()); + + // keep track of how many bytes were written to the QMDL file so we can read + // a valid block of data from it in the HTTP server + if let Some(qmdl_writer) = dev.qmdl_writer.as_ref() { + debug!("total QMDL bytes written: {}, sending update...", qmdl_writer.total_written); + let index = qmdl_store_index.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); + tx.blocking_send((index, qmdl_writer.total_written)).unwrap(); + debug!("done!"); + } else { + debug!("no qmdl_writer set, continuing..."); + } + } + }) +} + +pub async fn start_recording(State(state): State>) -> Result<(StatusCode, String), (StatusCode, String)> { + if state.readonly_mode { + return Err((StatusCode::FORBIDDEN, format!("server is in readonly mode"))); + } + let mut qmdl_store = state.qmdl_store_lock.write().await; + let qmdl_file = qmdl_store.new_entry().await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?; + let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await); + state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?; + Ok((StatusCode::ACCEPTED, format!("ok"))) +} + +pub async fn stop_recording(State(state): State>) -> Result<(StatusCode, String), (StatusCode, String)> { + if state.readonly_mode { + return Err((StatusCode::FORBIDDEN, format!("server is in readonly mode"))); + } + let mut qmdl_store = state.qmdl_store_lock.write().await; + qmdl_store.close_current_entry().await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't close current qmdl entry: {}", e)))?; + state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StopRecording).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?; + Ok((StatusCode::ACCEPTED, format!("ok"))) +} diff --git a/wavehunter/src/error.rs b/wavehunter/src/error.rs index 72200a9..ed1d34d 100644 --- a/wavehunter/src/error.rs +++ b/wavehunter/src/error.rs @@ -1,10 +1,10 @@ use thiserror::Error; use orca::diag_device::DiagDeviceError; +use crate::qmdl_store::QmdlStoreError; + #[derive(Error, Debug)] pub enum WavehunterError { - #[error("Missing config file: {0}")] - MissingConfigFile(String), #[error("Config file parsing error: {0}")] ConfigFileParsingError(#[from] toml::de::Error), #[error("Diag intialization error: {0}")] @@ -13,4 +13,8 @@ pub enum WavehunterError { DiagReadError(DiagDeviceError), #[error("Tokio error: {0}")] TokioError(#[from] tokio::io::Error), + #[error("QmdlStore error: {0}")] + QmdlStoreError(#[from] QmdlStoreError), + #[error("No QMDL store found at path {0}, but can't create a new one due to readonly mode")] + NoStoreReadonlyMode(String), } diff --git a/wavehunter/src/main.rs b/wavehunter/src/main.rs index 404d465..9606e26 100644 --- a/wavehunter/src/main.rs +++ b/wavehunter/src/main.rs @@ -3,64 +3,106 @@ mod error; mod pcap; mod server; mod stats; +mod qmdl_store; +mod diag; use crate::config::{parse_config, parse_args}; +use crate::diag::run_diag_read_thread; +use crate::qmdl_store::QmdlStore; use crate::server::{ServerState, get_qmdl, serve_static}; use crate::pcap::get_pcap; -use crate::stats::{get_system_stats, get_diag_stats}; +use crate::stats::get_system_stats; use crate::error::WavehunterError; use axum::response::Redirect; +use diag::{DiagDeviceCtrlMessage, start_recording, stop_recording}; +use log::{info, error}; use orca::diag_device::DiagDevice; -use orca::diag_reader::DiagReader; -use axum::routing::get; +use axum::routing::{get, post}; use axum::Router; -use tokio::fs::File; -use log::debug; +use orca::qmdl::QmdlWriter; +use stats::get_qmdl_manifest; +use tokio::sync::mpsc::{self, Sender}; +use tokio::task::JoinHandle; +use tokio_util::task::TaskTracker; use std::net::SocketAddr; use tokio::net::TcpListener; -use tokio::sync::RwLock; -use tokio::task::JoinHandle; +use tokio::sync::{RwLock, oneshot}; use std::sync::Arc; -fn run_diag_read_thread(mut dev: DiagDevice, bytes_read_lock: Arc>) -> JoinHandle> { - tokio::task::spawn_blocking(move || { - loop { - // TODO: once we're actually doing analysis, we'll wanna use the messages - // returned here. Until then, the DiagDevice has already written those messages - // to the QMDL file, so we can just ignore them. - debug!("reading response from diag device..."); - let _messages = dev.read_response().map_err(WavehunterError::DiagReadError)?; - debug!("got diag response ({} messages)", _messages.len()); - - // keep track of how many bytes were written to the QMDL file so we can read - // a valid block of data from it in the HTTP server - debug!("total QMDL bytes written: {}, updating state...", dev.qmdl_writer.total_written); - let mut bytes_read = bytes_read_lock.blocking_write(); - *bytes_read = dev.qmdl_writer.total_written; - debug!("done!"); - } - }) -} - -async fn run_server(config: &config::Config, qmdl_bytes_written: Arc>) -> Result<(), WavehunterError> { +async fn run_server( + task_tracker: &TaskTracker, + config: &config::Config, + qmdl_store_lock: Arc>, + server_shutdown_rx: oneshot::Receiver<()>, + diag_device_sender: Sender +) -> JoinHandle<()> { let state = Arc::new(ServerState { - qmdl_bytes_written, - qmdl_path: config.qmdl_path.clone(), + qmdl_store_lock, + diag_device_ctrl_sender: diag_device_sender, + readonly_mode: config.readonly_mode, }); let app = Router::new() - .route("/api/pcap/latest.pcap", get(get_pcap)) - .route("/api/qmdl/latest.qmdl", get(get_qmdl)) + .route("/api/pcap/*name", get(get_pcap)) + .route("/api/qmdl/*name", get(get_qmdl)) .route("/api/system-stats", get(get_system_stats)) - .route("/api/diag-stats", get(get_diag_stats)) + .route("/api/qmdl-manifest", get(get_qmdl_manifest)) + .route("/api/start-recording", post(start_recording)) + .route("/api/stop-recording", post(stop_recording)) .route("/", get(|| async { Redirect::permanent("/index.html") })) .route("/*path", get(serve_static)) .with_state(state); let addr = SocketAddr::from(([127, 0, 0, 1], config.port)); let listener = TcpListener::bind(&addr).await.unwrap(); - axum::serve(listener, app).await.unwrap(); - Ok(()) + task_tracker.spawn(async move { + info!("The orca is hunting for stingrays..."); + axum::serve(listener, app) + .with_graceful_shutdown(server_shutdown_signal(server_shutdown_rx)) + .await.unwrap(); + }) +} + +async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) { + server_shutdown_rx.await.unwrap(); + info!("Server received shutdown signal, exiting..."); +} + +async fn init_qmdl_store(config: &config::Config) -> Result { + match (QmdlStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) { + (true, _) => Ok(QmdlStore::load(&config.qmdl_store_path).await?), + (false, false) => Ok(QmdlStore::create(&config.qmdl_store_path).await?), + (false, true) => Err(WavehunterError::NoStoreReadonlyMode(config.qmdl_store_path.clone())), + } +} + +fn run_ctrl_c_thread( + task_tracker: &TaskTracker, + diag_device_sender: Sender, + server_shutdown_tx: oneshot::Sender<()>, + qmdl_store_lock: Arc> +) -> JoinHandle> { + task_tracker.spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => { + let mut qmdl_store = qmdl_store_lock.write().await; + if qmdl_store.current_entry.is_some() { + info!("Closing current QMDL entry..."); + qmdl_store.close_current_entry().await?; + info!("Done!"); + } + + server_shutdown_tx.send(()) + .expect("couldn't send server shutdown signal"); + diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await + .expect("couldn't send Exit message to diag thread"); + }, + Err(err) => { + error!("Unable to listen for shutdown signal: {}", err); + } + } + Ok(()) + }) } #[tokio::main] @@ -70,23 +112,27 @@ async fn main() -> Result<(), WavehunterError> { let args = parse_args(); let config = parse_config(&args.config_path)?; - let qmdl_bytes_lock: Arc>; - if !config.debug_mode { - let mut dev = DiagDevice::new(&config.qmdl_path) + let task_tracker = TaskTracker::new(); + + let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?)); + let (tx, rx) = mpsc::channel::(1); + if !config.readonly_mode { + let qmdl_file = qmdl_store_lock.write().await.new_entry().await?; + let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await); + let mut dev = DiagDevice::new(Some(qmdl_writer)) .map_err(WavehunterError::DiagInitError)?; dev.config_logs() .map_err(WavehunterError::DiagInitError)?; - qmdl_bytes_lock = Arc::new(RwLock::new(dev.qmdl_writer.total_written)); - // TODO: handle exiting gracefully - let _read_thread_handle = run_diag_read_thread(dev, qmdl_bytes_lock.clone()); - } else { - let qmdl_file = File::open(&config.qmdl_path).await.expect("couldn't open QMDL file"); - let qmdl_file_size = qmdl_file.metadata().await.expect("couldn't get QMDL file metadata") - .len() as usize; - qmdl_bytes_lock = Arc::new(RwLock::new(qmdl_file_size)); + run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone()); } - println!("The orca is hunting for stingrays..."); - run_server(&config, qmdl_bytes_lock).await + let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>(); + run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, qmdl_store_lock.clone()); + run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, tx).await; + + task_tracker.close(); + task_tracker.wait().await; + + Ok(()) } diff --git a/wavehunter/src/pcap.rs b/wavehunter/src/pcap.rs index 290c60d..ea51ba1 100644 --- a/wavehunter/src/pcap.rs +++ b/wavehunter/src/pcap.rs @@ -6,10 +6,9 @@ use orca::qmdl::{QmdlReader, QmdlReaderError}; use orca::diag_reader::DiagReader; use axum::body::Body; use axum::http::header::CONTENT_TYPE; -use axum::extract::State; +use axum::extract::{State, Path}; use axum::http::StatusCode; use axum::response::{Response, IntoResponse}; -use std::fs::File; use std::io::Write; use std::pin::Pin; use std::sync::Arc; @@ -22,14 +21,19 @@ use tokio::sync::mpsc; // written so far. This is done by spawning a blocking thread (a tokio thread // capable of handling blocking operations) which streams chunks of pcap data to // a channel that's piped to the client. -pub async fn get_pcap(State(state): State>) -> Result { - let qmdl_bytes_written = *state.qmdl_bytes_written.read().await; - if qmdl_bytes_written == 0 { +pub async fn get_pcap(State(state): State>, Path(qmdl_name): Path) -> Result { + let qmdl_store = state.qmdl_store_lock.read().await; + let entry = qmdl_store.entry_for_name(&qmdl_name) + .ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?; + if entry.size_bytes == 0 { return Err(( StatusCode::SERVICE_UNAVAILABLE, "QMDL file is empty, try again in a bit!".to_string() )); } + let qmdl_file = qmdl_store.open_entry(&entry).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))? + .into_std().await; let (tx, rx) = mpsc::channel(1); let channel_reader = ChannelReader { rx }; @@ -37,8 +41,7 @@ pub async fn get_pcap(State(state): State>) -> Result, // index into manifest +} + +#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)] +pub struct Manifest { + pub entries: Vec, +} + +#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)] +pub struct ManifestEntry { + pub name: String, + pub start_time: DateTime, + pub end_time: Option>, + pub size_bytes: usize, +} + +impl ManifestEntry { + fn new() -> Self { + let now = Local::now(); + ManifestEntry { + name: format!("{}", now.timestamp()), + start_time: now, + end_time: None, + size_bytes: 0, + } + } +} + +impl QmdlStore { + // Returns whether a directory with a "manifest.toml" exists at the given + // path (though doesn't check if that manifest is valid) + pub async fn exists

(path: P) -> Result where P: AsRef { + let manifest_path = path.as_ref().join("manifest.toml"); + let dir_exists = try_exists(path).await.map_err(QmdlStoreError::OpenDirError)?; + let manifest_exists = try_exists(manifest_path).await.map_err(QmdlStoreError::ReadManifestError)?; + Ok(dir_exists && manifest_exists) + } + + // Loads an existing QmdlStore at the given path. Errors if no store exists, + // or if it's malformed. + pub async fn load

(path: P) -> Result where P: AsRef { + let path: PathBuf = path.as_ref().to_path_buf(); + let manifest = QmdlStore::read_manifest(&path).await?; + Ok(QmdlStore { + path, + manifest, + current_entry: None, + }) + } + + // Creates a new QmdlStore at the given path. This involves creating a dir + // and writing an empty manifest. + pub async fn create

(path: P) -> Result where P: AsRef { + let manifest_path = path.as_ref().join("manifest.toml"); + fs::create_dir_all(&path).await + .map_err(QmdlStoreError::OpenDirError)?; + let mut manifest_file = File::create(&manifest_path).await + .map_err(QmdlStoreError::WriteManifestError)?; + let empty_manifest = Manifest { entries: Vec::new() }; + let empty_manifest_contents = toml::to_string_pretty(&empty_manifest) + .expect("failed to serialize manifest"); + manifest_file.write_all(empty_manifest_contents.as_bytes()).await + .map_err(QmdlStoreError::WriteManifestError)?; + QmdlStore::load(path).await + } + + async fn read_manifest

(path: P) -> Result where P: AsRef { + let manifest_path = path.as_ref().join("manifest.toml"); + let file_contents = fs::read_to_string(&manifest_path).await + .map_err(QmdlStoreError::ReadManifestError)?; + toml::from_str(&file_contents) + .map_err(QmdlStoreError::ParseManifestError) + } + + // Closes the current entry (if needed), creates a new entry based on the + // current time, and updates the manifest + pub async fn new_entry(&mut self) -> Result { + // if we've already got an entry open, close it + if self.current_entry.is_some() { + self.close_current_entry().await?; + } + let new_entry = ManifestEntry::new(); + let mut file_path = self.path.join(&new_entry.name); + file_path.set_extension("qmdl"); + let file = File::options() + .create(true) + .write(true) + .open(&file_path).await + .map_err(QmdlStoreError::CreateFileError)?; + self.manifest.entries.push(new_entry); + self.current_entry = Some(self.manifest.entries.len() - 1); + self.write_manifest().await?; + Ok(file) + } + + pub async fn open_entry(&self, entry: &ManifestEntry) -> Result { + let mut file_path = self.path.join(&entry.name); + file_path.set_extension("qmdl"); + File::open(file_path).await + .map_err(QmdlStoreError::ReadFileError) + } + + // Sets the current entry's end_time, updates the manifest, and unsets the + // current entry + pub async fn close_current_entry(&mut self) -> Result<(), QmdlStoreError> { + let entry_index = self.current_entry.take() + .ok_or(QmdlStoreError::NoCurrentEntry)?; + self.manifest.entries[entry_index].end_time = Some(Local::now()); + self.write_manifest().await + } + + // Sets the given entry's size, updating the manifest + pub async fn update_entry_size(&mut self, entry_index: usize, size_bytes: usize) -> Result<(), QmdlStoreError> { + self.manifest.entries[entry_index].size_bytes = size_bytes; + self.write_manifest().await + } + + async fn write_manifest(&mut self) -> Result<(), QmdlStoreError> { + let mut manifest_file = File::options() + .write(true) + .open(self.path.join("manifest.toml")).await + .map_err(QmdlStoreError::WriteManifestError)?; + let manifest_contents = toml::to_string_pretty(&self.manifest) + .expect("failed to serialize manifest"); + manifest_file.write_all(manifest_contents.as_bytes()).await + .map_err(QmdlStoreError::WriteManifestError)?; + Ok(()) + } + + // Finds an entry by filename + pub fn entry_for_name(&self, name: &str) -> Option { + self.manifest.entries.iter() + .find(|entry| entry.name == name) + .cloned() + } +} + +#[cfg(test)] +mod tests { + use tempdir::TempDir; + use super::*; + + #[tokio::test] + async fn test_load_from_empty_dir() { + let dir = TempDir::new("qmdl_store_test").unwrap(); + assert!(!QmdlStore::exists(dir.path()).await.unwrap()); + let _created_store = QmdlStore::create(dir.path()).await.unwrap(); + assert!(QmdlStore::exists(dir.path()).await.unwrap()); + let loaded_store = QmdlStore::load(dir.path()).await.unwrap(); + assert_eq!(loaded_store.manifest.entries.len(), 0); + } + + #[tokio::test] + async fn test_creating_updating_and_closing_entries() { + let dir = TempDir::new("qmdl_store_test").unwrap(); + let mut store = QmdlStore::create(dir.path()).await.unwrap(); + let _ = store.new_entry().await.unwrap(); + let entry_index = store.current_entry.unwrap(); + assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest); + + store.update_entry_size(entry_index, 1000).await.unwrap(); + assert_eq!(store.manifest.entries[entry_index].size_bytes, 1000); + assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest); + + assert!(store.manifest.entries[entry_index].end_time.is_none()); + store.close_current_entry().await.unwrap(); + let entry = store.entry_for_name(&store.manifest.entries[entry_index].name).unwrap(); + assert!(entry.end_time.is_some()); + assert_eq!(QmdlStore::read_manifest(dir.path()).await.unwrap(), store.manifest); + + assert!(matches!(store.close_current_entry().await, Err(QmdlStoreError::NoCurrentEntry))); + } + + #[tokio::test] + async fn test_repeated_new_entries() { + let dir = TempDir::new("qmdl_store_test").unwrap(); + let mut store = QmdlStore::create(dir.path()).await.unwrap(); + let _ = store.new_entry().await.unwrap(); + let entry_index = store.current_entry.unwrap(); + let _ = store.new_entry().await.unwrap(); + let new_entry_index = store.current_entry.unwrap(); + assert_ne!(entry_index, new_entry_index); + assert_eq!(store.manifest.entries.len(), 2); + } +} diff --git a/wavehunter/src/server.rs b/wavehunter/src/server.rs index affa28d..783130d 100644 --- a/wavehunter/src/server.rs +++ b/wavehunter/src/server.rs @@ -5,22 +5,28 @@ use axum::http::{StatusCode, HeaderValue}; use axum::response::{Response, IntoResponse}; use axum::extract::Path; use tokio::io::AsyncReadExt; +use tokio::sync::mpsc::Sender; use std::sync::Arc; use tokio::sync::RwLock; -use tokio::fs::File as AsyncFile; use tokio_util::io::ReaderStream; use include_dir::{include_dir, Dir}; +use crate::DiagDeviceCtrlMessage; +use crate::qmdl_store::QmdlStore; + pub struct ServerState { - pub qmdl_bytes_written: Arc>, - pub qmdl_path: String, + pub qmdl_store_lock: Arc>, + pub diag_device_ctrl_sender: Sender, + pub readonly_mode: bool, } -pub async fn get_qmdl(State(state): State>) -> Result { - let qmdl_file = AsyncFile::open(&state.qmdl_path).await +pub async fn get_qmdl(State(state): State>, Path(qmdl_name): Path) -> Result { + let qmdl_store = state.qmdl_store_lock.read().await; + let entry = qmdl_store.entry_for_name(&qmdl_name) + .ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?; + let qmdl_file = qmdl_store.open_entry(&entry).await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {}", e)))?; - let qmdl_bytes_written = *state.qmdl_bytes_written.read().await; - let limited_qmdl_file = qmdl_file.take(qmdl_bytes_written as u64); + let limited_qmdl_file = qmdl_file.take(entry.size_bytes as u64); let qmdl_stream = ReaderStream::new(limited_qmdl_file); let headers = [(CONTENT_TYPE, "application/octet-stream")]; diff --git a/wavehunter/src/stats.rs b/wavehunter/src/stats.rs index 9ccc0fd..3ddd088 100644 --- a/wavehunter/src/stats.rs +++ b/wavehunter/src/stats.rs @@ -1,10 +1,11 @@ +use std::sync::Arc; + +use crate::qmdl_store::ManifestEntry; use crate::server::ServerState; use axum::Json; use axum::extract::State; use axum::http::StatusCode; -use axum::response::IntoResponse; -use std::sync::Arc; use log::error; use serde::Serialize; use tokio::process::Command; @@ -97,7 +98,8 @@ fn humanize_kb(kb: usize) -> String { } pub async fn get_system_stats(State(state): State>) -> Result, (StatusCode, String)> { - match SystemStats::new(&state.qmdl_path).await { + let qmdl_store = state.qmdl_store_lock.read().await; + match SystemStats::new(qmdl_store.path.to_str().unwrap()).await { Ok(stats) => Ok(Json(stats)), Err(err) => { error!("error getting system stats: {}", err); @@ -109,13 +111,18 @@ pub async fn get_system_stats(State(state): State>) -> Result, + pub current_entry: Option, } -pub async fn get_diag_stats(State(state): State>) -> impl IntoResponse { - Json(DiagStats { - bytes_written: *state.qmdl_bytes_written.read().await, - }) +pub async fn get_qmdl_manifest(State(state): State>) -> Result, (StatusCode, String)> { + let qmdl_store = state.qmdl_store_lock.read().await; + let mut entries = qmdl_store.manifest.entries.clone(); + let current_entry = qmdl_store.current_entry.map(|index| entries.remove(index)); + Ok(Json(ManifestStats { + entries, + current_entry, + })) } diff --git a/wavehunter/static/css/style.css b/wavehunter/static/css/style.css index e69de29..de89e27 100644 --- a/wavehunter/static/css/style.css +++ b/wavehunter/static/css/style.css @@ -0,0 +1,40 @@ +td, +th { + border: 1px solid rgb(190, 190, 190); + padding: 10px; +} + +td { + text-align: center; +} + +tr:nth-child(even) { + background-color: #eee; +} + +th[scope='col'] { + background-color: #696969; + color: #fff; +} + +th[scope='row'] { + background-color: #d7d9f2; +} + +tr.current { + background-color: #fe537b; + font-weight: bold; +} + +caption { + padding: 10px; + caption-side: bottom; +} + +table { + border-collapse: collapse; + border: 2px solid rgb(200, 200, 200); + letter-spacing: 1px; + font-family: sans-serif; + font-size: 0.8rem; +} diff --git a/wavehunter/static/index.html b/wavehunter/static/index.html index bc2d433..c7387b1 100644 --- a/wavehunter/static/index.html +++ b/wavehunter/static/index.html @@ -4,19 +4,35 @@ -

- Latest PCAP - Latest QMDL +
+ + +
+ + + + + + + + + + + +
NameDate StartedDate StoppedSize (bytes)PCAPQMDL
+
+

System stats

+
Loading...
-
Loading...
-
Loading...
diff --git a/wavehunter/static/js/main.js b/wavehunter/static/js/main.js index de2f9ee..b8885dd 100644 --- a/wavehunter/static/js/main.js +++ b/wavehunter/static/js/main.js @@ -1,24 +1,90 @@ async function populateDivs() { const systemStats = await getSystemStats(); - const diagStats = await getDiagStats(); - const systemStatsDiv = document.getElementById('system-stats'); - const diagStatsDiv = document.getElementById('diag-stats'); - systemStatsDiv.innerHTML = JSON.stringify(systemStats, null, 2); - diagStatsDiv.innerHTML = JSON.stringify(diagStats, null, 2); + + const qmdlManifest = await getQmdlManifest(); + updateQmdlManifestTable(qmdlManifest); +} + +function updateQmdlManifestTable(manifest) { + const table = document.getElementById('qmdl-manifest-table'); + const numRows = table.rows.length; + for (let i=1; i= 200 && response.status < 300) { + return body; + } else { + throw new Error(body); + } }