Use a cancellation token for restart logic as well (#602)

This commit is contained in:
Simon Fondrie-Teitler
2025-09-18 04:00:07 -04:00
committed by GitHub
parent 766f3461d3
commit b00f17d8fc
2 changed files with 16 additions and 43 deletions

View File

@@ -13,7 +13,6 @@ mod stats;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::config::{parse_args, parse_config};
use crate::diag::run_diag_read_thread;
@@ -43,8 +42,8 @@ use rayhunter::diag_device::DiagDevice;
use stats::get_log;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::{RwLock, oneshot};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
@@ -126,12 +125,9 @@ async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, Rayh
// Start a thread that'll track when user hits ctrl+c. When that happens,
// trigger various cleanup tasks, including sending signals to other threads to
// shutdown
#[allow(clippy::too_many_arguments)]
fn run_shutdown_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
daemon_restart_rx: oneshot::Receiver<()>,
should_restart_flag: Arc<AtomicBool>,
shutdown_token: CancellationToken,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_tx: Sender<AnalysisCtrlMessage>,
@@ -144,17 +140,9 @@ fn run_shutdown_thread(
if let Err(err) = res {
error!("Unable to listen for shutdown signal: {err}");
}
should_restart_flag.store(false, Ordering::Relaxed);
}
res = daemon_restart_rx => {
if let Err(err) = res {
error!("Unable to listen for shutdown signal: {err}");
}
should_restart_flag.store(true, Ordering::Relaxed);
}
};
_ = shutdown_token.cancelled() => {}
}
let mut qmdl_store = qmdl_store_lock.write().await;
if qmdl_store.current_entry.is_some() {
@@ -209,7 +197,8 @@ async fn run_with_config(
let (diag_tx, diag_rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
let (ui_update_tx, ui_update_rx) = mpsc::channel::<display::DisplayState>(1);
let (analysis_tx, analysis_rx) = mpsc::channel::<AnalysisCtrlMessage>(5);
let shutdown_token = CancellationToken::new();
let restart_token = CancellationToken::new();
let shutdown_token = restart_token.child_token();
let notification_service = NotificationService::new(config.ntfy_url.clone());
@@ -255,7 +244,6 @@ async fn run_with_config(
);
}
let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>();
let analysis_status_lock = Arc::new(RwLock::new(analysis_status));
run_analysis_thread(
&task_tracker,
@@ -264,13 +252,10 @@ async fn run_with_config(
analysis_status_lock.clone(),
config.analyzers.clone(),
);
let should_restart_flag = Arc::new(AtomicBool::new(false));
run_shutdown_thread(
&task_tracker,
diag_tx.clone(),
daemon_restart_rx,
should_restart_flag.clone(),
shutdown_token.clone(),
qmdl_store_lock.clone(),
analysis_tx.clone(),
@@ -283,7 +268,7 @@ async fn run_with_config(
diag_device_ctrl_sender: diag_tx,
analysis_status_lock,
analysis_sender: analysis_tx,
daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))),
daemon_restart_token: restart_token.clone(),
ui_update_sender: Some(ui_update_tx),
});
run_server(&task_tracker, state, shutdown_token.clone()).await;
@@ -292,7 +277,7 @@ async fn run_with_config(
task_tracker.wait().await;
info!("see you space cowboy...");
Ok(should_restart_flag.load(Ordering::Relaxed))
Ok(restart_token.is_cancelled())
}
#[cfg(test)]