wavehunter: add QMDL storage

Instead of reading/writing to a single QMDL file, we now can manage
a directory of several files, and have the ability to start/stop writing
to them on the fly.

This commit also adds graceful exiting to the server, so we can perform
cleanup steps when the server's exiting.
This commit is contained in:
Will Greenberg
2024-01-10 15:47:52 -08:00
parent 3d869971d9
commit 7b972ef5e4
14 changed files with 742 additions and 126 deletions

View File

@@ -3,64 +3,106 @@ mod error;
mod pcap;
mod server;
mod stats;
mod qmdl_store;
mod diag;
use crate::config::{parse_config, parse_args};
use crate::diag::run_diag_read_thread;
use crate::qmdl_store::QmdlStore;
use crate::server::{ServerState, get_qmdl, serve_static};
use crate::pcap::get_pcap;
use crate::stats::{get_system_stats, get_diag_stats};
use crate::stats::get_system_stats;
use crate::error::WavehunterError;
use axum::response::Redirect;
use diag::{DiagDeviceCtrlMessage, start_recording, stop_recording};
use log::{info, error};
use orca::diag_device::DiagDevice;
use orca::diag_reader::DiagReader;
use axum::routing::get;
use axum::routing::{get, post};
use axum::Router;
use tokio::fs::File;
use log::debug;
use orca::qmdl::QmdlWriter;
use stats::get_qmdl_manifest;
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;
use tokio_util::task::TaskTracker;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::sync::{RwLock, oneshot};
use std::sync::Arc;
fn run_diag_read_thread(mut dev: DiagDevice, bytes_read_lock: Arc<RwLock<usize>>) -> JoinHandle<Result<(), WavehunterError>> {
tokio::task::spawn_blocking(move || {
loop {
// TODO: once we're actually doing analysis, we'll wanna use the messages
// returned here. Until then, the DiagDevice has already written those messages
// to the QMDL file, so we can just ignore them.
debug!("reading response from diag device...");
let _messages = dev.read_response().map_err(WavehunterError::DiagReadError)?;
debug!("got diag response ({} messages)", _messages.len());
// keep track of how many bytes were written to the QMDL file so we can read
// a valid block of data from it in the HTTP server
debug!("total QMDL bytes written: {}, updating state...", dev.qmdl_writer.total_written);
let mut bytes_read = bytes_read_lock.blocking_write();
*bytes_read = dev.qmdl_writer.total_written;
debug!("done!");
}
})
}
async fn run_server(config: &config::Config, qmdl_bytes_written: Arc<RwLock<usize>>) -> Result<(), WavehunterError> {
async fn run_server(
task_tracker: &TaskTracker,
config: &config::Config,
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
server_shutdown_rx: oneshot::Receiver<()>,
diag_device_sender: Sender<DiagDeviceCtrlMessage>
) -> JoinHandle<()> {
let state = Arc::new(ServerState {
qmdl_bytes_written,
qmdl_path: config.qmdl_path.clone(),
qmdl_store_lock,
diag_device_ctrl_sender: diag_device_sender,
readonly_mode: config.readonly_mode,
});
let app = Router::new()
.route("/api/pcap/latest.pcap", get(get_pcap))
.route("/api/qmdl/latest.qmdl", get(get_qmdl))
.route("/api/pcap/*name", get(get_pcap))
.route("/api/qmdl/*name", get(get_qmdl))
.route("/api/system-stats", get(get_system_stats))
.route("/api/diag-stats", get(get_diag_stats))
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
.route("/api/start-recording", post(start_recording))
.route("/api/stop-recording", post(stop_recording))
.route("/", get(|| async { Redirect::permanent("/index.html") }))
.route("/*path", get(serve_static))
.with_state(state);
let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
let listener = TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
task_tracker.spawn(async move {
info!("The orca is hunting for stingrays...");
axum::serve(listener, app)
.with_graceful_shutdown(server_shutdown_signal(server_shutdown_rx))
.await.unwrap();
})
}
async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
server_shutdown_rx.await.unwrap();
info!("Server received shutdown signal, exiting...");
}
async fn init_qmdl_store(config: &config::Config) -> Result<QmdlStore, WavehunterError> {
match (QmdlStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
(true, _) => Ok(QmdlStore::load(&config.qmdl_store_path).await?),
(false, false) => Ok(QmdlStore::create(&config.qmdl_store_path).await?),
(false, true) => Err(WavehunterError::NoStoreReadonlyMode(config.qmdl_store_path.clone())),
}
}
fn run_ctrl_c_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
server_shutdown_tx: oneshot::Sender<()>,
qmdl_store_lock: Arc<RwLock<QmdlStore>>
) -> JoinHandle<Result<(), WavehunterError>> {
task_tracker.spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
let mut qmdl_store = qmdl_store_lock.write().await;
if qmdl_store.current_entry.is_some() {
info!("Closing current QMDL entry...");
qmdl_store.close_current_entry().await?;
info!("Done!");
}
server_shutdown_tx.send(())
.expect("couldn't send server shutdown signal");
diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await
.expect("couldn't send Exit message to diag thread");
},
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
}
}
Ok(())
})
}
#[tokio::main]
@@ -70,23 +112,27 @@ async fn main() -> Result<(), WavehunterError> {
let args = parse_args();
let config = parse_config(&args.config_path)?;
let qmdl_bytes_lock: Arc<RwLock<usize>>;
if !config.debug_mode {
let mut dev = DiagDevice::new(&config.qmdl_path)
let task_tracker = TaskTracker::new();
let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?));
let (tx, rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
if !config.readonly_mode {
let qmdl_file = qmdl_store_lock.write().await.new_entry().await?;
let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await);
let mut dev = DiagDevice::new(Some(qmdl_writer))
.map_err(WavehunterError::DiagInitError)?;
dev.config_logs()
.map_err(WavehunterError::DiagInitError)?;
qmdl_bytes_lock = Arc::new(RwLock::new(dev.qmdl_writer.total_written));
// TODO: handle exiting gracefully
let _read_thread_handle = run_diag_read_thread(dev, qmdl_bytes_lock.clone());
} else {
let qmdl_file = File::open(&config.qmdl_path).await.expect("couldn't open QMDL file");
let qmdl_file_size = qmdl_file.metadata().await.expect("couldn't get QMDL file metadata")
.len() as usize;
qmdl_bytes_lock = Arc::new(RwLock::new(qmdl_file_size));
run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone());
}
println!("The orca is hunting for stingrays...");
run_server(&config, qmdl_bytes_lock).await
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, qmdl_store_lock.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, tx).await;
task_tracker.close();
task_tracker.wait().await;
Ok(())
}