diff --git a/daemon/src/display/generic_framebuffer.rs b/daemon/src/display/generic_framebuffer.rs index 4e3f856..d0e0ab1 100644 --- a/daemon/src/display/generic_framebuffer.rs +++ b/daemon/src/display/generic_framebuffer.rs @@ -9,9 +9,7 @@ use rayhunter::analysis::analyzer::EventType; use log::{error, info}; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; -use tokio::sync::oneshot::error::TryRecvError; -use tokio_util::task::TaskTracker; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; use include_dir::{Dir, include_dir}; @@ -173,7 +171,7 @@ pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, mut fb: impl GenericFramebuffer, - mut ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, mut ui_update_rx: Receiver, ) { static IMAGE_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/images/"); @@ -204,13 +202,9 @@ pub fn update_ui( ); } loop { - match ui_shutdown_rx.try_recv() { - Ok(_) => { - info!("received UI shutdown"); - break; - } - Err(TryRecvError::Empty) => {} - Err(e) => panic!("error receiving shutdown message: {e}"), + if shutdown_token.is_cancelled() { + info!("received UI shutdown"); + break; } match ui_update_rx.try_recv() { Ok(state) => { diff --git a/daemon/src/display/headless.rs b/daemon/src/display/headless.rs index d090b48..88cfe31 100644 --- a/daemon/src/display/headless.rs +++ b/daemon/src/display/headless.rs @@ -1,6 +1,6 @@ use log::info; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use crate::config; @@ -9,7 +9,7 @@ use crate::display::DisplayState; pub fn update_ui( _task_tracker: &TaskTracker, _config: &config::Config, - _ui_shutdown_rx: oneshot::Receiver<()>, + _shutdown_token: CancellationToken, _ui_update_rx: Receiver, ) { info!("Headless mode, not spawning UI."); diff --git a/daemon/src/display/orbic.rs b/daemon/src/display/orbic.rs index af92637..6a07e83 100644 --- a/daemon/src/display/orbic.rs +++ b/daemon/src/display/orbic.rs @@ -4,7 +4,7 @@ use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer}; use async_trait::async_trait; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; const FB_PATH: &str = "/dev/fb0"; @@ -38,14 +38,14 @@ impl GenericFramebuffer for Framebuffer { pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, ui_update_rx: Receiver, ) { generic_framebuffer::update_ui( task_tracker, config, Framebuffer, - ui_shutdown_rx, + shutdown_token, ui_update_rx, ) } diff --git a/daemon/src/display/tmobile.rs b/daemon/src/display/tmobile.rs index 2a20a59..1a39fc1 100644 --- a/daemon/src/display/tmobile.rs +++ b/daemon/src/display/tmobile.rs @@ -4,7 +4,7 @@ /// DisplayState::WarningDetected { .. } => Signal LED slowly blinks red. use log::{error, info}; use tokio::sync::mpsc; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use std::time::Duration; @@ -27,7 +27,7 @@ async fn stop_blinking(path: String) { pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - mut ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, mut ui_update_rx: mpsc::Receiver, ) { let mut invisible: bool = false; @@ -40,13 +40,9 @@ pub fn update_ui( let mut last_state = DisplayState::Paused; loop { - match ui_shutdown_rx.try_recv() { - Ok(_) => { - info!("received UI shutdown"); - break; - } - Err(oneshot::error::TryRecvError::Empty) => {} - Err(e) => panic!("error receiving shutdown message: {e}"), + if shutdown_token.is_cancelled() { + info!("received UI shutdown"); + break; } match ui_update_rx.try_recv() { Ok(new_state) => state = new_state, diff --git a/daemon/src/display/tplink.rs b/daemon/src/display/tplink.rs index 522e839..fe51277 100644 --- a/daemon/src/display/tplink.rs +++ b/daemon/src/display/tplink.rs @@ -1,6 +1,6 @@ use log::info; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use crate::config; @@ -11,7 +11,7 @@ use std::fs; pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, ui_update_rx: Receiver, ) { let display_level = config.ui_level; @@ -23,9 +23,9 @@ pub fn update_ui( // The alternative would be to make the entire initialization async if fs::exists(tplink_onebit::OLED_PATH).unwrap_or_default() { info!("detected one-bit display"); - tplink_onebit::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx) + tplink_onebit::update_ui(task_tracker, config, shutdown_token, ui_update_rx) } else { info!("fallback to framebuffer"); - tplink_framebuffer::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx) + tplink_framebuffer::update_ui(task_tracker, config, shutdown_token, ui_update_rx) } } diff --git a/daemon/src/display/tplink_framebuffer.rs b/daemon/src/display/tplink_framebuffer.rs index 5963538..8189eda 100644 --- a/daemon/src/display/tplink_framebuffer.rs +++ b/daemon/src/display/tplink_framebuffer.rs @@ -2,13 +2,13 @@ use async_trait::async_trait; use std::os::fd::AsRawFd; use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; +use tokio_util::sync::CancellationToken; use crate::config; use crate::display::DisplayState; use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer}; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; use tokio_util::task::TaskTracker; const FB_PATH: &str = "/dev/fb0"; @@ -80,14 +80,14 @@ impl GenericFramebuffer for Framebuffer { pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, ui_update_rx: Receiver, ) { generic_framebuffer::update_ui( task_tracker, config, Framebuffer, - ui_shutdown_rx, + shutdown_token, ui_update_rx, ) } diff --git a/daemon/src/display/tplink_onebit.rs b/daemon/src/display/tplink_onebit.rs index d33c4b0..fbf3ded 100644 --- a/daemon/src/display/tplink_onebit.rs +++ b/daemon/src/display/tplink_onebit.rs @@ -6,8 +6,7 @@ use crate::display::DisplayState; use log::{error, info}; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; -use tokio::sync::oneshot::error::TryRecvError; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use std::time::Duration; @@ -112,7 +111,7 @@ const STATUS_WARNING: &[u8] = pixelart! { pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - mut ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, mut ui_update_rx: Receiver, ) { let display_level = config.ui_level; @@ -124,13 +123,9 @@ pub fn update_ui( let mut pixels = STATUS_SMILING; loop { - match ui_shutdown_rx.try_recv() { - Ok(_) => { - info!("received UI shutdown"); - break; - } - Err(TryRecvError::Empty) => {} - Err(e) => panic!("error receiving shutdown message: {e}"), + if shutdown_token.is_cancelled() { + info!("received UI shutdown"); + break; } match ui_update_rx.try_recv() { diff --git a/daemon/src/display/uz801.rs b/daemon/src/display/uz801.rs index 6127258..1f7bd59 100644 --- a/daemon/src/display/uz801.rs +++ b/daemon/src/display/uz801.rs @@ -4,7 +4,7 @@ /// DisplayState::WarningDetected => Signal LED is solid red. use log::{error, info}; use tokio::sync::mpsc; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use std::time::Duration; @@ -27,7 +27,7 @@ async fn led_off(path: String) { pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - mut ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, mut ui_update_rx: mpsc::Receiver, ) { let mut invisible: bool = false; @@ -41,13 +41,9 @@ pub fn update_ui( let mut last_update = std::time::Instant::now(); loop { - match ui_shutdown_rx.try_recv() { - Ok(_) => { - info!("received UI shutdown"); - break; - } - Err(oneshot::error::TryRecvError::Empty) => {} - Err(e) => panic!("error receiving shutdown message: {e}"), + if shutdown_token.is_cancelled() { + info!("received UI shutdown"); + break; } match ui_update_rx.try_recv() { Ok(new_state) => state = new_state, diff --git a/daemon/src/display/wingtech.rs b/daemon/src/display/wingtech.rs index d88951c..36fa03a 100644 --- a/daemon/src/display/wingtech.rs +++ b/daemon/src/display/wingtech.rs @@ -10,7 +10,7 @@ use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer}; use async_trait::async_trait; use tokio::sync::mpsc::Receiver; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; const FB_PATH: &str = "/dev/fb0"; @@ -43,14 +43,14 @@ impl GenericFramebuffer for Framebuffer { pub fn update_ui( task_tracker: &TaskTracker, config: &config::Config, - ui_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, ui_update_rx: Receiver, ) { generic_framebuffer::update_ui( task_tracker, config, Framebuffer, - ui_shutdown_rx, + shutdown_token, ui_update_rx, ) } diff --git a/daemon/src/key_input.rs b/daemon/src/key_input.rs index 2a66c8c..4865fb4 100644 --- a/daemon/src/key_input.rs +++ b/daemon/src/key_input.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; use crate::config; @@ -21,7 +21,7 @@ pub fn run_key_input_thread( task_tracker: &TaskTracker, config: &config::Config, diag_tx: Sender, - mut ui_shutdown_rx: oneshot::Receiver<()>, + cancellation_token: CancellationToken, ) { if config.key_input_mode == 0 { return; @@ -43,7 +43,7 @@ pub fn run_key_input_thread( loop { tokio::select! { - _ = &mut ui_shutdown_rx => { + _ = cancellation_token.cancelled() => { info!("received key input shutdown"); return; } diff --git a/daemon/src/main.rs b/daemon/src/main.rs index b4ace33..a121d47 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -46,6 +46,7 @@ use tokio::select; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::{RwLock, oneshot}; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; type AppRouter = Router>; @@ -78,7 +79,7 @@ fn get_router() -> AppRouter { async fn run_server( task_tracker: &TaskTracker, state: Arc, - server_shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, ) -> JoinHandle<()> { info!("spinning up server"); let addr = SocketAddr::from(([0, 0, 0, 0], state.config.port)); @@ -88,17 +89,12 @@ async fn run_server( task_tracker.spawn(async move { info!("The orca is hunting for stingrays..."); axum::serve(listener, app) - .with_graceful_shutdown(server_shutdown_signal(server_shutdown_rx)) + .with_graceful_shutdown(shutdown_token.cancelled_owned()) .await .unwrap(); }) } -async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) { - server_shutdown_rx.await.unwrap(); - info!("Server received shutdown signal, exiting..."); -} - // Loads a RecordingStore if one exists, and if not, only create one if we're // not in debug mode. If we fail to parse the manifest AND we're not in debug // mode, try to recover the manifest from the existing QMDL files @@ -136,9 +132,7 @@ fn run_shutdown_thread( diag_device_sender: Sender, daemon_restart_rx: oneshot::Receiver<()>, should_restart_flag: Arc, - server_shutdown_tx: oneshot::Sender<()>, - maybe_ui_shutdown_tx: Option>, - maybe_key_input_shutdown_tx: Option>, + shutdown_token: CancellationToken, qmdl_store_lock: Arc>, analysis_tx: Sender, ) -> JoinHandle> { @@ -169,15 +163,7 @@ fn run_shutdown_thread( info!("Done!"); } - server_shutdown_tx - .send(()) - .expect("couldn't send server shutdown signal"); - if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx { - let _ = ui_shutdown_tx.send(()); - } - if let Some(key_input_shutdown_tx) = maybe_key_input_shutdown_tx { - let _ = key_input_shutdown_tx.send(()); - } + shutdown_token.cancel(); diag_device_sender .send(DiagDeviceCtrlMessage::Exit) .await @@ -223,14 +209,11 @@ async fn run_with_config( let (diag_tx, diag_rx) = mpsc::channel::(1); let (ui_update_tx, ui_update_rx) = mpsc::channel::(1); let (analysis_tx, analysis_rx) = mpsc::channel::(5); - let mut maybe_ui_shutdown_tx = None; - let mut maybe_key_input_shutdown_tx = None; + let shutdown_token = CancellationToken::new(); let notification_service = NotificationService::new(config.ntfy_url.clone()); if !config.debug_mode { - let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel(); - maybe_ui_shutdown_tx = Some(ui_shutdown_tx); info!("Using configuration for device: {0:?}", config.device); let mut dev = DiagDevice::new(&config.device) .await @@ -261,21 +244,18 @@ async fn run_with_config( Device::Pinephone => display::headless::update_ui, Device::Uz801 => display::uz801::update_ui, }; - update_ui(&task_tracker, &config, ui_shutdown_rx, ui_update_rx); + update_ui(&task_tracker, &config, shutdown_token.clone(), ui_update_rx); info!("Starting Key Input service"); - let (key_input_shutdown_tx, key_input_shutdown_rx) = oneshot::channel(); - maybe_key_input_shutdown_tx = Some(key_input_shutdown_tx); key_input::run_key_input_thread( &task_tracker, &config, diag_tx.clone(), - key_input_shutdown_rx, + shutdown_token.clone(), ); } let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>(); - let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>(); let analysis_status_lock = Arc::new(RwLock::new(analysis_status)); run_analysis_thread( &task_tracker, @@ -291,9 +271,7 @@ async fn run_with_config( diag_tx.clone(), daemon_restart_rx, should_restart_flag.clone(), - server_shutdown_tx, - maybe_ui_shutdown_tx, - maybe_key_input_shutdown_tx, + shutdown_token.clone(), qmdl_store_lock.clone(), analysis_tx.clone(), ); @@ -308,7 +286,7 @@ async fn run_with_config( daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))), ui_update_sender: Some(ui_update_tx), }); - run_server(&task_tracker, state, server_shutdown_rx).await; + run_server(&task_tracker, state, shutdown_token.clone()).await; task_tracker.close(); task_tracker.wait().await;