mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-05-02 18:29:59 -07:00
wavehunter: add comments
This commit is contained in:
@@ -4,7 +4,6 @@ use serde::Deserialize;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ConfigFile {
|
||||
qmdl_path: Option<String>,
|
||||
qmdl_store_path: Option<String>,
|
||||
port: Option<u16>,
|
||||
readonly_mode: Option<bool>,
|
||||
@@ -12,7 +11,6 @@ struct ConfigFile {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
pub qmdl_path: String,
|
||||
pub qmdl_store_path: String,
|
||||
pub port: u16,
|
||||
pub readonly_mode: bool,
|
||||
@@ -21,7 +19,6 @@ pub struct Config {
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Config {
|
||||
qmdl_path: "./wavehunter.qmdl".to_string(),
|
||||
qmdl_store_path: "/data/wavehunter".to_string(),
|
||||
port: 8080,
|
||||
readonly_mode: false,
|
||||
@@ -34,7 +31,6 @@ pub fn parse_config<P>(path: P) -> Result<Config, WavehunterError> where P: AsRe
|
||||
if let Ok(config_file) = std::fs::read_to_string(&path) {
|
||||
let parsed_config: ConfigFile = toml::from_str(&config_file)
|
||||
.map_err(WavehunterError::ConfigFileParsingError)?;
|
||||
if let Some(path) = parsed_config.qmdl_path { config.qmdl_path = path }
|
||||
if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path }
|
||||
if let Some(port) = parsed_config.port { config.port = port }
|
||||
if let Some(debug_mode) = parsed_config.readonly_mode { config.readonly_mode = debug_mode }
|
||||
|
||||
@@ -23,7 +23,12 @@ pub enum DiagDeviceCtrlMessage {
|
||||
}
|
||||
|
||||
pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) -> JoinHandle<Result<(), WavehunterError>> {
|
||||
// mpsc channel for updating QmdlStore entry filesizes. First usize is the
|
||||
// index, second is the size in bytes
|
||||
let (tx, mut rx) = mpsc::channel::<(usize, usize)>(1);
|
||||
|
||||
// Spawn a thread to monitor the (usize, usize) channel for updates,
|
||||
// triggering QmdlStore updates
|
||||
let qmdl_store_lock_clone = qmdl_store_lock.clone();
|
||||
task_tracker.spawn(async move {
|
||||
while let Some((entry_idx, new_size)) = rx.recv().await {
|
||||
@@ -34,13 +39,19 @@ pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut
|
||||
info!("QMDL store size updater thread exiting...");
|
||||
});
|
||||
|
||||
// Spawn a thread to drive the DiagDevice reading loop. Since DiagDevice
|
||||
// works via synchronous I/O, we have to spawn a "blocking" thread to avoid
|
||||
// gumming up tokio's event loop.
|
||||
task_tracker.spawn_blocking(move || {
|
||||
loop {
|
||||
// First check if we've gotten any control meesages
|
||||
match qmdl_file_rx.try_recv() {
|
||||
Ok(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)) => {
|
||||
dev.qmdl_writer = Some(qmdl_writer);
|
||||
},
|
||||
Ok(DiagDeviceCtrlMessage::StopRecording) => dev.qmdl_writer = None,
|
||||
// Disconnected means all the Senders have been dropped, so it's
|
||||
// time to go
|
||||
Ok(DiagDeviceCtrlMessage::Exit) | Err(TryRecvError::Disconnected) => {
|
||||
info!("Diag reader thread exiting...");
|
||||
return Ok(())
|
||||
|
||||
@@ -30,6 +30,9 @@ use tokio::net::TcpListener;
|
||||
use tokio::sync::{RwLock, oneshot};
|
||||
use std::sync::Arc;
|
||||
|
||||
// Runs the axum server, taking all the elements needed to build up our
|
||||
// ServerState and a oneshot Receiver that'll fire when it's time to shutdown
|
||||
// (i.e. user hit ctrl+c)
|
||||
async fn run_server(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
@@ -68,6 +71,8 @@ async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
|
||||
info!("Server received shutdown signal, exiting...");
|
||||
}
|
||||
|
||||
// Loads a QmdlStore if one exists, and if not, only create one if we're not in
|
||||
// readonly mode.
|
||||
async fn init_qmdl_store(config: &config::Config) -> Result<QmdlStore, WavehunterError> {
|
||||
match (QmdlStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
|
||||
(true, _) => Ok(QmdlStore::load(&config.qmdl_store_path).await?),
|
||||
@@ -76,6 +81,9 @@ async fn init_qmdl_store(config: &config::Config) -> Result<QmdlStore, Wavehunte
|
||||
}
|
||||
}
|
||||
|
||||
// Start a thread that'll track when user hits ctrl+c. When that happens,
|
||||
// trigger various cleanup tasks, including sending signals to other threads to
|
||||
// shutdown
|
||||
fn run_ctrl_c_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
|
||||
@@ -112,6 +120,8 @@ async fn main() -> Result<(), WavehunterError> {
|
||||
let args = parse_args();
|
||||
let config = parse_config(&args.config_path)?;
|
||||
|
||||
// TaskTrackers give us an interface to spawn tokio threads, and then
|
||||
// eventually await all of them ending
|
||||
let task_tracker = TaskTracker::new();
|
||||
|
||||
let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?));
|
||||
|
||||
@@ -120,6 +120,7 @@ impl QmdlStore {
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
// Returns the corresponding QMDL file for a given entry
|
||||
pub async fn open_entry(&self, entry: &ManifestEntry) -> Result<File, QmdlStoreError> {
|
||||
let mut file_path = self.path.join(&entry.name);
|
||||
file_path.set_extension("qmdl");
|
||||
|
||||
Reference in New Issue
Block a user