add basic restart endpoint

This commit is contained in:
Markus Unterwaditzer
2025-06-02 21:11:55 +02:00
parent 0b05d1617c
commit 9b759e6b42
3 changed files with 100 additions and 39 deletions

View File

@@ -10,13 +10,18 @@ mod qmdl_store;
mod server;
mod stats;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::config::{parse_args, parse_config};
use crate::diag::run_diag_read_thread;
use crate::error::RayhunterError;
use crate::pcap::get_pcap;
use crate::qmdl_store::RecordingStore;
use crate::server::{get_qmdl, serve_static, ServerState};
use crate::stats::get_system_stats;
use crate::server::{get_qmdl, restart_daemon, serve_static, ServerState};
use crate::stats::{get_qmdl_manifest, get_system_stats};
use analysis::{
get_analysis_status, run_analysis_thread, start_analysis, AnalysisCtrlMessage, AnalysisStatus,
@@ -31,13 +36,12 @@ use diag::{
use log::{error, info};
use qmdl_store::RecordingStoreError;
use rayhunter::diag_device::DiagDevice;
use stats::get_qmdl_manifest;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_util::task::TaskTracker;
type AppRouter = Router<Arc<ServerState>>;
@@ -55,6 +59,7 @@ fn get_router() -> AppRouter {
.route("/api/analysis-report/{name}", get(get_analysis_report))
.route("/api/analysis", get(get_analysis_status))
.route("/api/analysis/{name}", post(start_analysis))
.route("/api/restart-daemon", post(restart_daemon))
.route("/", get(|| async { Redirect::permanent("/index.html") }))
.route("/{*path}", get(serve_static))
}
@@ -117,46 +122,60 @@ async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, Rayh
// Start a thread that'll track when user hits ctrl+c. When that happens,
// trigger various cleanup tasks, including sending signals to other threads to
// shutdown
fn run_ctrl_c_thread(
fn run_shutdown_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
daemon_restart_rx: oneshot::Receiver<()>,
should_restart_flag: Arc<AtomicBool>,
server_shutdown_tx: oneshot::Sender<()>,
maybe_ui_shutdown_tx: Option<oneshot::Sender<()>>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_tx: Sender<AnalysisCtrlMessage>,
) -> JoinHandle<Result<(), RayhunterError>> {
info!("create shutdown thread");
task_tracker.spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
let mut qmdl_store = qmdl_store_lock.write().await;
if qmdl_store.current_entry.is_some() {
info!("Closing current QMDL entry...");
qmdl_store.close_current_entry().await?;
info!("Done!");
select! {
res = tokio::signal::ctrl_c() => {
if let Err(err) = res {
error!("Unable to listen for shutdown signal: {}", err);
}
server_shutdown_tx
.send(())
.expect("couldn't send server shutdown signal");
info!("sending UI shutdown");
if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx {
ui_shutdown_tx
.send(())
.expect("couldn't send ui shutdown signal");
should_restart_flag.store(false, Ordering::Relaxed);
}
res = daemon_restart_rx => {
if let Err(err) = res {
error!("Unable to listen for shutdown signal: {}", err);
}
diag_device_sender
.send(DiagDeviceCtrlMessage::Exit)
.await
.expect("couldn't send Exit message to diag thread");
analysis_tx
.send(AnalysisCtrlMessage::Exit)
.await
.expect("couldn't send Exit message to analysis thread");
}
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
should_restart_flag.store(true, Ordering::Relaxed);
}
};
let mut qmdl_store = qmdl_store_lock.write().await;
if qmdl_store.current_entry.is_some() {
info!("Closing current QMDL entry...");
qmdl_store.close_current_entry().await?;
info!("Done!");
}
server_shutdown_tx
.send(())
.expect("couldn't send server shutdown signal");
info!("sending UI shutdown");
if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx {
ui_shutdown_tx
.send(())
.expect("couldn't send ui shutdown signal");
}
diag_device_sender
.send(DiagDeviceCtrlMessage::Exit)
.await
.expect("couldn't send Exit message to diag thread");
analysis_tx
.send(AnalysisCtrlMessage::Exit)
.await
.expect("couldn't send Exit message to analysis thread");
Ok(())
})
}
@@ -166,8 +185,21 @@ async fn main() -> Result<(), RayhunterError> {
env_logger::init();
let args = parse_args();
let config = parse_config(&args.config_path)?;
loop {
let config = parse_config(&args.config_path)?;
if !run_with_config(&config).await? {
return Ok(());
}
// For some reason the diag device needs a very long time to become available again within
// the same process, on TP-Link M7350 v3. While process restart would reset it faster.
println!("Restarting Rayhunter. Waiting for 5 seconds...");
sleep(Duration::from_secs(5)).await;
}
}
async fn run_with_config(config: &config::Config) -> Result<bool, RayhunterError> {
// TaskTrackers give us an interface to spawn tokio threads, and then
// eventually await all of them ending
let task_tracker = TaskTracker::new();
@@ -207,8 +239,9 @@ async fn main() -> Result<(), RayhunterError> {
info!("Starting Key Input service");
key_input::run_key_input_thread(&task_tracker, &config, diag_tx.clone());
}
let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>();
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
info!("create shutdown thread");
let analysis_status_lock = Arc::new(RwLock::new(analysis_status));
run_analysis_thread(
&task_tracker,
@@ -218,9 +251,13 @@ async fn main() -> Result<(), RayhunterError> {
config.enable_dummy_analyzer,
config.analyzers.clone(),
);
run_ctrl_c_thread(
let should_restart_flag = Arc::new(AtomicBool::new(false));
run_shutdown_thread(
&task_tracker,
diag_tx.clone(),
daemon_restart_rx,
should_restart_flag.clone(),
server_shutdown_tx,
maybe_ui_shutdown_tx,
qmdl_store_lock.clone(),
@@ -233,6 +270,7 @@ async fn main() -> Result<(), RayhunterError> {
debug_mode: config.debug_mode,
analysis_status_lock,
analysis_sender: analysis_tx,
daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))),
});
run_server(&task_tracker, &config, state, server_shutdown_rx).await;
@@ -240,7 +278,7 @@ async fn main() -> Result<(), RayhunterError> {
task_tracker.wait().await;
info!("see you space cowboy...");
Ok(())
Ok(should_restart_flag.load(Ordering::Relaxed))
}
#[cfg(test)]

View File

@@ -8,7 +8,7 @@ use include_dir::{include_dir, Dir};
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tokio::sync::{oneshot, RwLock};
use tokio_util::io::ReaderStream;
use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
@@ -22,6 +22,7 @@ pub struct ServerState {
pub analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
pub analysis_sender: Sender<AnalysisCtrlMessage>,
pub debug_mode: bool,
pub daemon_restart_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
}
pub async fn get_qmdl(
@@ -76,3 +77,25 @@ pub async fn serve_static(
.unwrap(),
}
}
pub async fn restart_daemon(
State(state): State<Arc<ServerState>>,
) -> Result<(StatusCode, String), (StatusCode, String)> {
let mut restart_tx = state.daemon_restart_tx.write().await;
if let Some(sender) = restart_tx.take() {
sender.send(()).map_err(|()| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"couldn't send restart signal".to_string(),
)
})?;
Ok((StatusCode::ACCEPTED, "restart signal sent".to_string()))
} else {
Ok((
StatusCode::ACCEPTED,
"restart already triggered".to_string(),
))
}
}