Simplify shutdown with cancellation tokens (#601)

This commit is contained in:
Simon Fondrie-Teitler
2025-09-17 18:33:44 -04:00
committed by GitHub
parent d30dd6fd9d
commit 766f3461d3
11 changed files with 48 additions and 89 deletions

View File

@@ -9,9 +9,7 @@ use rayhunter::analysis::analyzer::EventType;
use log::{error, info};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio_util::task::TaskTracker;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use include_dir::{Dir, include_dir};
@@ -173,7 +171,7 @@ pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
mut fb: impl GenericFramebuffer,
mut ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
mut ui_update_rx: Receiver<DisplayState>,
) {
static IMAGE_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/images/");
@@ -204,13 +202,9 @@ pub fn update_ui(
);
}
loop {
match ui_shutdown_rx.try_recv() {
Ok(_) => {
info!("received UI shutdown");
break;
}
Err(TryRecvError::Empty) => {}
Err(e) => panic!("error receiving shutdown message: {e}"),
if shutdown_token.is_cancelled() {
info!("received UI shutdown");
break;
}
match ui_update_rx.try_recv() {
Ok(state) => {

View File

@@ -1,6 +1,6 @@
use log::info;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::config;
@@ -9,7 +9,7 @@ use crate::display::DisplayState;
pub fn update_ui(
_task_tracker: &TaskTracker,
_config: &config::Config,
_ui_shutdown_rx: oneshot::Receiver<()>,
_shutdown_token: CancellationToken,
_ui_update_rx: Receiver<DisplayState>,
) {
info!("Headless mode, not spawning UI.");

View File

@@ -4,7 +4,7 @@ use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
use async_trait::async_trait;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
const FB_PATH: &str = "/dev/fb0";
@@ -38,14 +38,14 @@ impl GenericFramebuffer for Framebuffer {
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
ui_update_rx: Receiver<DisplayState>,
) {
generic_framebuffer::update_ui(
task_tracker,
config,
Framebuffer,
ui_shutdown_rx,
shutdown_token,
ui_update_rx,
)
}

View File

@@ -4,7 +4,7 @@
/// DisplayState::WarningDetected { .. } => Signal LED slowly blinks red.
use log::{error, info};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use std::time::Duration;
@@ -27,7 +27,7 @@ async fn stop_blinking(path: String) {
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
mut ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
mut ui_update_rx: mpsc::Receiver<DisplayState>,
) {
let mut invisible: bool = false;
@@ -40,13 +40,9 @@ pub fn update_ui(
let mut last_state = DisplayState::Paused;
loop {
match ui_shutdown_rx.try_recv() {
Ok(_) => {
info!("received UI shutdown");
break;
}
Err(oneshot::error::TryRecvError::Empty) => {}
Err(e) => panic!("error receiving shutdown message: {e}"),
if shutdown_token.is_cancelled() {
info!("received UI shutdown");
break;
}
match ui_update_rx.try_recv() {
Ok(new_state) => state = new_state,

View File

@@ -1,6 +1,6 @@
use log::info;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::config;
@@ -11,7 +11,7 @@ use std::fs;
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
ui_update_rx: Receiver<DisplayState>,
) {
let display_level = config.ui_level;
@@ -23,9 +23,9 @@ pub fn update_ui(
// The alternative would be to make the entire initialization async
if fs::exists(tplink_onebit::OLED_PATH).unwrap_or_default() {
info!("detected one-bit display");
tplink_onebit::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx)
tplink_onebit::update_ui(task_tracker, config, shutdown_token, ui_update_rx)
} else {
info!("fallback to framebuffer");
tplink_framebuffer::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx)
tplink_framebuffer::update_ui(task_tracker, config, shutdown_token, ui_update_rx)
}
}

View File

@@ -2,13 +2,13 @@ use async_trait::async_trait;
use std::os::fd::AsRawFd;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use crate::config;
use crate::display::DisplayState;
use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::task::TaskTracker;
const FB_PATH: &str = "/dev/fb0";
@@ -80,14 +80,14 @@ impl GenericFramebuffer for Framebuffer {
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
ui_update_rx: Receiver<DisplayState>,
) {
generic_framebuffer::update_ui(
task_tracker,
config,
Framebuffer,
ui_shutdown_rx,
shutdown_token,
ui_update_rx,
)
}

View File

@@ -6,8 +6,7 @@ use crate::display::DisplayState;
use log::{error, info};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use std::time::Duration;
@@ -112,7 +111,7 @@ const STATUS_WARNING: &[u8] = pixelart! {
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
mut ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
mut ui_update_rx: Receiver<DisplayState>,
) {
let display_level = config.ui_level;
@@ -124,13 +123,9 @@ pub fn update_ui(
let mut pixels = STATUS_SMILING;
loop {
match ui_shutdown_rx.try_recv() {
Ok(_) => {
info!("received UI shutdown");
break;
}
Err(TryRecvError::Empty) => {}
Err(e) => panic!("error receiving shutdown message: {e}"),
if shutdown_token.is_cancelled() {
info!("received UI shutdown");
break;
}
match ui_update_rx.try_recv() {

View File

@@ -4,7 +4,7 @@
/// DisplayState::WarningDetected => Signal LED is solid red.
use log::{error, info};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use std::time::Duration;
@@ -27,7 +27,7 @@ async fn led_off(path: String) {
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
mut ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
mut ui_update_rx: mpsc::Receiver<DisplayState>,
) {
let mut invisible: bool = false;
@@ -41,13 +41,9 @@ pub fn update_ui(
let mut last_update = std::time::Instant::now();
loop {
match ui_shutdown_rx.try_recv() {
Ok(_) => {
info!("received UI shutdown");
break;
}
Err(oneshot::error::TryRecvError::Empty) => {}
Err(e) => panic!("error receiving shutdown message: {e}"),
if shutdown_token.is_cancelled() {
info!("received UI shutdown");
break;
}
match ui_update_rx.try_recv() {
Ok(new_state) => state = new_state,

View File

@@ -10,7 +10,7 @@ use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
use async_trait::async_trait;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
const FB_PATH: &str = "/dev/fb0";
@@ -43,14 +43,14 @@ impl GenericFramebuffer for Framebuffer {
pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
ui_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
ui_update_rx: Receiver<DisplayState>,
) {
generic_framebuffer::update_ui(
task_tracker,
config,
Framebuffer,
ui_shutdown_rx,
shutdown_token,
ui_update_rx,
)
}

View File

@@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::config;
@@ -21,7 +21,7 @@ pub fn run_key_input_thread(
task_tracker: &TaskTracker,
config: &config::Config,
diag_tx: Sender<DiagDeviceCtrlMessage>,
mut ui_shutdown_rx: oneshot::Receiver<()>,
cancellation_token: CancellationToken,
) {
if config.key_input_mode == 0 {
return;
@@ -43,7 +43,7 @@ pub fn run_key_input_thread(
loop {
tokio::select! {
_ = &mut ui_shutdown_rx => {
_ = cancellation_token.cancelled() => {
info!("received key input shutdown");
return;
}

View File

@@ -46,6 +46,7 @@ use tokio::select;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::{RwLock, oneshot};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
type AppRouter = Router<Arc<ServerState>>;
@@ -78,7 +79,7 @@ fn get_router() -> AppRouter {
async fn run_server(
task_tracker: &TaskTracker,
state: Arc<ServerState>,
server_shutdown_rx: oneshot::Receiver<()>,
shutdown_token: CancellationToken,
) -> JoinHandle<()> {
info!("spinning up server");
let addr = SocketAddr::from(([0, 0, 0, 0], state.config.port));
@@ -88,17 +89,12 @@ async fn run_server(
task_tracker.spawn(async move {
info!("The orca is hunting for stingrays...");
axum::serve(listener, app)
.with_graceful_shutdown(server_shutdown_signal(server_shutdown_rx))
.with_graceful_shutdown(shutdown_token.cancelled_owned())
.await
.unwrap();
})
}
async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
server_shutdown_rx.await.unwrap();
info!("Server received shutdown signal, exiting...");
}
// Loads a RecordingStore if one exists, and if not, only create one if we're
// not in debug mode. If we fail to parse the manifest AND we're not in debug
// mode, try to recover the manifest from the existing QMDL files
@@ -136,9 +132,7 @@ fn run_shutdown_thread(
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
daemon_restart_rx: oneshot::Receiver<()>,
should_restart_flag: Arc<AtomicBool>,
server_shutdown_tx: oneshot::Sender<()>,
maybe_ui_shutdown_tx: Option<oneshot::Sender<()>>,
maybe_key_input_shutdown_tx: Option<oneshot::Sender<()>>,
shutdown_token: CancellationToken,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_tx: Sender<AnalysisCtrlMessage>,
) -> JoinHandle<Result<(), RayhunterError>> {
@@ -169,15 +163,7 @@ fn run_shutdown_thread(
info!("Done!");
}
server_shutdown_tx
.send(())
.expect("couldn't send server shutdown signal");
if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx {
let _ = ui_shutdown_tx.send(());
}
if let Some(key_input_shutdown_tx) = maybe_key_input_shutdown_tx {
let _ = key_input_shutdown_tx.send(());
}
shutdown_token.cancel();
diag_device_sender
.send(DiagDeviceCtrlMessage::Exit)
.await
@@ -223,14 +209,11 @@ async fn run_with_config(
let (diag_tx, diag_rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
let (ui_update_tx, ui_update_rx) = mpsc::channel::<display::DisplayState>(1);
let (analysis_tx, analysis_rx) = mpsc::channel::<AnalysisCtrlMessage>(5);
let mut maybe_ui_shutdown_tx = None;
let mut maybe_key_input_shutdown_tx = None;
let shutdown_token = CancellationToken::new();
let notification_service = NotificationService::new(config.ntfy_url.clone());
if !config.debug_mode {
let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel();
maybe_ui_shutdown_tx = Some(ui_shutdown_tx);
info!("Using configuration for device: {0:?}", config.device);
let mut dev = DiagDevice::new(&config.device)
.await
@@ -261,21 +244,18 @@ async fn run_with_config(
Device::Pinephone => display::headless::update_ui,
Device::Uz801 => display::uz801::update_ui,
};
update_ui(&task_tracker, &config, ui_shutdown_rx, ui_update_rx);
update_ui(&task_tracker, &config, shutdown_token.clone(), ui_update_rx);
info!("Starting Key Input service");
let (key_input_shutdown_tx, key_input_shutdown_rx) = oneshot::channel();
maybe_key_input_shutdown_tx = Some(key_input_shutdown_tx);
key_input::run_key_input_thread(
&task_tracker,
&config,
diag_tx.clone(),
key_input_shutdown_rx,
shutdown_token.clone(),
);
}
let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>();
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
let analysis_status_lock = Arc::new(RwLock::new(analysis_status));
run_analysis_thread(
&task_tracker,
@@ -291,9 +271,7 @@ async fn run_with_config(
diag_tx.clone(),
daemon_restart_rx,
should_restart_flag.clone(),
server_shutdown_tx,
maybe_ui_shutdown_tx,
maybe_key_input_shutdown_tx,
shutdown_token.clone(),
qmdl_store_lock.clone(),
analysis_tx.clone(),
);
@@ -308,7 +286,7 @@ async fn run_with_config(
daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))),
ui_update_sender: Some(ui_update_tx),
});
run_server(&task_tracker, state, server_shutdown_rx).await;
run_server(&task_tracker, state, shutdown_token.clone()).await;
task_tracker.close();
task_tracker.wait().await;