diff --git a/bin/src/daemon.rs b/bin/src/daemon.rs index 54df436..f0754ac 100644 --- a/bin/src/daemon.rs +++ b/bin/src/daemon.rs @@ -10,13 +10,18 @@ mod qmdl_store; mod server; mod stats; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + use crate::config::{parse_args, parse_config}; use crate::diag::run_diag_read_thread; use crate::error::RayhunterError; use crate::pcap::get_pcap; use crate::qmdl_store::RecordingStore; -use crate::server::{get_qmdl, serve_static, ServerState}; -use crate::stats::get_system_stats; +use crate::server::{get_qmdl, restart_daemon, serve_static, ServerState}; +use crate::stats::{get_qmdl_manifest, get_system_stats}; use analysis::{ get_analysis_status, run_analysis_thread, start_analysis, AnalysisCtrlMessage, AnalysisStatus, @@ -31,13 +36,12 @@ use diag::{ use log::{error, info}; use qmdl_store::RecordingStoreError; use rayhunter::diag_device::DiagDevice; -use stats::get_qmdl_manifest; -use std::net::SocketAddr; -use std::sync::Arc; use tokio::net::TcpListener; +use tokio::select; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::{oneshot, RwLock}; use tokio::task::JoinHandle; +use tokio::time::sleep; use tokio_util::task::TaskTracker; type AppRouter = Router>; @@ -55,6 +59,7 @@ fn get_router() -> AppRouter { .route("/api/analysis-report/{name}", get(get_analysis_report)) .route("/api/analysis", get(get_analysis_status)) .route("/api/analysis/{name}", post(start_analysis)) + .route("/api/restart-daemon", post(restart_daemon)) .route("/", get(|| async { Redirect::permanent("/index.html") })) .route("/{*path}", get(serve_static)) } @@ -117,46 +122,60 @@ async fn init_qmdl_store(config: &config::Config) -> Result, + daemon_restart_rx: oneshot::Receiver<()>, + should_restart_flag: Arc, server_shutdown_tx: oneshot::Sender<()>, maybe_ui_shutdown_tx: Option>, qmdl_store_lock: Arc>, analysis_tx: Sender, ) -> JoinHandle> { + info!("create shutdown thread"); + task_tracker.spawn(async move { - match tokio::signal::ctrl_c().await { - Ok(()) => { - let mut qmdl_store = qmdl_store_lock.write().await; - if qmdl_store.current_entry.is_some() { - info!("Closing current QMDL entry..."); - qmdl_store.close_current_entry().await?; - info!("Done!"); + select! { + res = tokio::signal::ctrl_c() => { + if let Err(err) = res { + error!("Unable to listen for shutdown signal: {}", err); } - server_shutdown_tx - .send(()) - .expect("couldn't send server shutdown signal"); - info!("sending UI shutdown"); - if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx { - ui_shutdown_tx - .send(()) - .expect("couldn't send ui shutdown signal"); + should_restart_flag.store(false, Ordering::Relaxed); + } + res = daemon_restart_rx => { + if let Err(err) = res { + error!("Unable to listen for shutdown signal: {}", err); } - diag_device_sender - .send(DiagDeviceCtrlMessage::Exit) - .await - .expect("couldn't send Exit message to diag thread"); - analysis_tx - .send(AnalysisCtrlMessage::Exit) - .await - .expect("couldn't send Exit message to analysis thread"); - } - Err(err) => { - error!("Unable to listen for shutdown signal: {}", err); + + should_restart_flag.store(true, Ordering::Relaxed); } + }; + + let mut qmdl_store = qmdl_store_lock.write().await; + if qmdl_store.current_entry.is_some() { + info!("Closing current QMDL entry..."); + qmdl_store.close_current_entry().await?; + info!("Done!"); } + + server_shutdown_tx + .send(()) + .expect("couldn't send server shutdown signal"); + info!("sending UI shutdown"); + if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx { + ui_shutdown_tx + .send(()) + .expect("couldn't send ui shutdown signal"); + } + diag_device_sender + .send(DiagDeviceCtrlMessage::Exit) + .await + .expect("couldn't send Exit message to diag thread"); + analysis_tx + .send(AnalysisCtrlMessage::Exit) + .await + .expect("couldn't send Exit message to analysis thread"); Ok(()) }) } @@ -166,8 +185,21 @@ async fn main() -> Result<(), RayhunterError> { env_logger::init(); let args = parse_args(); - let config = parse_config(&args.config_path)?; + loop { + let config = parse_config(&args.config_path)?; + if !run_with_config(&config).await? { + return Ok(()); + } + + // For some reason the diag device needs a very long time to become available again within + // the same process, on TP-Link M7350 v3. While process restart would reset it faster. + println!("Restarting Rayhunter. Waiting for 5 seconds..."); + sleep(Duration::from_secs(5)).await; + } +} + +async fn run_with_config(config: &config::Config) -> Result { // TaskTrackers give us an interface to spawn tokio threads, and then // eventually await all of them ending let task_tracker = TaskTracker::new(); @@ -207,8 +239,9 @@ async fn main() -> Result<(), RayhunterError> { info!("Starting Key Input service"); key_input::run_key_input_thread(&task_tracker, &config, diag_tx.clone()); } + + let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>(); let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>(); - info!("create shutdown thread"); let analysis_status_lock = Arc::new(RwLock::new(analysis_status)); run_analysis_thread( &task_tracker, @@ -218,9 +251,13 @@ async fn main() -> Result<(), RayhunterError> { config.enable_dummy_analyzer, config.analyzers.clone(), ); - run_ctrl_c_thread( + let should_restart_flag = Arc::new(AtomicBool::new(false)); + + run_shutdown_thread( &task_tracker, diag_tx.clone(), + daemon_restart_rx, + should_restart_flag.clone(), server_shutdown_tx, maybe_ui_shutdown_tx, qmdl_store_lock.clone(), @@ -233,6 +270,7 @@ async fn main() -> Result<(), RayhunterError> { debug_mode: config.debug_mode, analysis_status_lock, analysis_sender: analysis_tx, + daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))), }); run_server(&task_tracker, &config, state, server_shutdown_rx).await; @@ -240,7 +278,7 @@ async fn main() -> Result<(), RayhunterError> { task_tracker.wait().await; info!("see you space cowboy..."); - Ok(()) + Ok(should_restart_flag.load(Ordering::Relaxed)) } #[cfg(test)] diff --git a/bin/src/server.rs b/bin/src/server.rs index 9065733..19d990b 100644 --- a/bin/src/server.rs +++ b/bin/src/server.rs @@ -8,7 +8,7 @@ use include_dir::{include_dir, Dir}; use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::Sender; -use tokio::sync::RwLock; +use tokio::sync::{oneshot, RwLock}; use tokio_util::io::ReaderStream; use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus}; @@ -22,6 +22,7 @@ pub struct ServerState { pub analysis_status_lock: Arc>, pub analysis_sender: Sender, pub debug_mode: bool, + pub daemon_restart_tx: Arc>>>, } pub async fn get_qmdl( @@ -76,3 +77,25 @@ pub async fn serve_static( .unwrap(), } } + +pub async fn restart_daemon( + State(state): State>, +) -> Result<(StatusCode, String), (StatusCode, String)> { + let mut restart_tx = state.daemon_restart_tx.write().await; + + if let Some(sender) = restart_tx.take() { + sender.send(()).map_err(|()| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "couldn't send restart signal".to_string(), + ) + })?; + + Ok((StatusCode::ACCEPTED, "restart signal sent".to_string())) + } else { + Ok(( + StatusCode::ACCEPTED, + "restart already triggered".to_string(), + )) + } +} diff --git a/lib/src/diag_device.rs b/lib/src/diag_device.rs index 8693233..f364e1f 100644 --- a/lib/src/diag_device.rs +++ b/lib/src/diag_device.rs @@ -7,7 +7,7 @@ use crate::log_codes; use deku::prelude::*; use futures::TryStream; -use log::{error, info}; +use log::{debug, error, info}; use std::io::ErrorKind; use std::os::fd::AsRawFd; use thiserror::Error; @@ -123,7 +123,7 @@ impl DiagDevice { .map_err(DiagDeviceError::DeviceReadFailed)?; } - info!( + debug!( "Parsing messages container size = {:?} [{:?}]", bytes_read, &self.read_buf[0..bytes_read]