mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-04-27 07:59:59 -07:00
Transition to async I/O for most things
Mixing async and sync I/O leads to a multitude of complications, and generally speaking it's much more convenient to stick to one paradigm or the other. Since axum (and many other HTTP servers) use async, and since async is a convenient model for performing operations like "handle an MPSC message or file read, whichever happens first", let's commit to an async interface.
This commit is contained in:
119
bin/src/diag.rs
119
bin/src/diag.rs
@@ -1,87 +1,78 @@
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::http::StatusCode;
|
||||
use rayhunter::diag::DataType;
|
||||
use rayhunter::diag_device::DiagDevice;
|
||||
use rayhunter::diag_reader::DiagReader;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::mpsc::{Receiver, self};
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use rayhunter::qmdl::QmdlWriter;
|
||||
use log::{debug, info};
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
use log::{debug, error, info};
|
||||
use tokio::fs::File;
|
||||
use tokio_util::task::TaskTracker;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
|
||||
use crate::error::RayhunterError;
|
||||
use crate::qmdl_store::QmdlStore;
|
||||
use crate::server::ServerState;
|
||||
|
||||
pub enum DiagDeviceCtrlMessage {
|
||||
StopRecording,
|
||||
StartRecording(QmdlWriter<std::fs::File>),
|
||||
StartRecording(QmdlWriter<File>),
|
||||
Exit,
|
||||
}
|
||||
|
||||
pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) -> JoinHandle<Result<(), RayhunterError>> {
|
||||
// mpsc channel for updating QmdlStore entry filesizes. First usize is the
|
||||
// index, second is the size in bytes
|
||||
let (tx, mut rx) = mpsc::channel::<(usize, usize)>(1);
|
||||
|
||||
// Spawn a thread to monitor the (usize, usize) channel for updates,
|
||||
// triggering QmdlStore updates
|
||||
let qmdl_store_lock_clone = qmdl_store_lock.clone();
|
||||
pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) {
|
||||
task_tracker.spawn(async move {
|
||||
while let Some((entry_idx, new_size)) = rx.recv().await {
|
||||
let mut qmdl_store = qmdl_store_lock_clone.write().await;
|
||||
qmdl_store.update_entry(entry_idx, new_size).await
|
||||
.expect("failed to update qmdl file size");
|
||||
}
|
||||
info!("QMDL store size updater thread exiting...");
|
||||
});
|
||||
|
||||
// Spawn a thread to drive the DiagDevice reading loop. Since DiagDevice
|
||||
// works via synchronous I/O, we have to spawn a "blocking" thread to avoid
|
||||
// gumming up tokio's event loop.
|
||||
task_tracker.spawn_blocking(move || {
|
||||
let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
|
||||
let mut qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_file));
|
||||
let mut diag_stream = pin!(dev.as_stream().into_stream());
|
||||
loop {
|
||||
// First check if we've gotten any control meesages
|
||||
match qmdl_file_rx.try_recv() {
|
||||
Ok(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)) => {
|
||||
dev.qmdl_writer = Some(qmdl_writer);
|
||||
},
|
||||
Ok(DiagDeviceCtrlMessage::StopRecording) => dev.qmdl_writer = None,
|
||||
// Disconnected means all the Senders have been dropped, so it's
|
||||
// time to go
|
||||
Ok(DiagDeviceCtrlMessage::Exit) | Err(TryRecvError::Disconnected) => {
|
||||
info!("Diag reader thread exiting...");
|
||||
return Ok(())
|
||||
},
|
||||
// empty just means there's no message for us, so continue as normal
|
||||
Err(TryRecvError::Empty) => {},
|
||||
}
|
||||
|
||||
// remember the QmdlStore current entry index so we can update its size later
|
||||
let qmdl_store_index = qmdl_store_lock.blocking_read().current_entry;
|
||||
|
||||
// TODO: once we're actually doing analysis, we'll wanna use the messages
|
||||
// returned here. Until then, the DiagDevice has already written those messages
|
||||
// to the QMDL file, so we can just ignore them.
|
||||
debug!("reading response from diag device...");
|
||||
let _messages = dev.read_response().map_err(RayhunterError::DiagReadError)?;
|
||||
debug!("got diag response ({} messages)", _messages.len());
|
||||
|
||||
// keep track of how many bytes were written to the QMDL file so we can read
|
||||
// a valid block of data from it in the HTTP server
|
||||
if let Some(qmdl_writer) = dev.qmdl_writer.as_ref() {
|
||||
debug!("total QMDL bytes written: {}, sending update...", qmdl_writer.total_written);
|
||||
let index = qmdl_store_index.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
||||
tx.blocking_send((index, qmdl_writer.total_written)).unwrap();
|
||||
debug!("done!");
|
||||
} else {
|
||||
debug!("no qmdl_writer set, continuing...");
|
||||
tokio::select! {
|
||||
msg = qmdl_file_rx.recv() => {
|
||||
match msg {
|
||||
Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => {
|
||||
qmdl_writer = Some(new_writer);
|
||||
},
|
||||
Some(DiagDeviceCtrlMessage::StopRecording) => qmdl_writer = None,
|
||||
// None means all the Senders have been dropped, so it's
|
||||
// time to go
|
||||
Some(DiagDeviceCtrlMessage::Exit) | None => {
|
||||
info!("Diag reader thread exiting...");
|
||||
return Ok(())
|
||||
},
|
||||
}
|
||||
}
|
||||
maybe_container = diag_stream.next() => {
|
||||
match maybe_container.unwrap() {
|
||||
Ok(container) => {
|
||||
if container.data_type != DataType::UserSpace {
|
||||
debug!("skipping non-userspace diag messages...");
|
||||
continue;
|
||||
}
|
||||
// keep track of how many bytes were written to the QMDL file so we can read
|
||||
// a valid block of data from it in the HTTP server
|
||||
if let Some(writer) = qmdl_writer.as_mut() {
|
||||
writer.write_container(&container).await.expect("failed to write to QMDL writer");
|
||||
debug!("total QMDL bytes written: {}, updating manifest...", writer.total_written);
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
||||
qmdl_store.update_entry(index, writer.total_written).await
|
||||
.expect("failed to update qmdl file size");
|
||||
debug!("done!");
|
||||
} else {
|
||||
debug!("no qmdl_writer set, continuing...");
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("error reading diag device: {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
|
||||
@@ -91,7 +82,7 @@ pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(S
|
||||
let mut qmdl_store = state.qmdl_store_lock.write().await;
|
||||
let qmdl_file = qmdl_store.new_entry().await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?;
|
||||
let qmdl_writer = QmdlWriter::new(qmdl_file.into_std().await);
|
||||
let qmdl_writer = QmdlWriter::new(qmdl_file);
|
||||
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
|
||||
Ok((StatusCode::ACCEPTED, format!("ok")))
|
||||
|
||||
Reference in New Issue
Block a user