mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-06-21 11:48:25 -07:00
Merge branch 'main' into notifications
This commit is contained in:
@@ -0,0 +1,265 @@
|
||||
use std::sync::Arc;
|
||||
use std::{future, pin};
|
||||
|
||||
use axum::Json;
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
http::StatusCode,
|
||||
};
|
||||
use futures::TryStreamExt;
|
||||
use log::{error, info};
|
||||
use rayhunter::analysis::analyzer::{AnalyzerConfig, Harness};
|
||||
use rayhunter::diag::{DataType, MessagesContainer};
|
||||
use rayhunter::qmdl::QmdlReader;
|
||||
use serde::Serialize;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::qmdl_store::RecordingStore;
|
||||
use crate::server::ServerState;
|
||||
|
||||
pub struct AnalysisWriter {
|
||||
writer: BufWriter<File>,
|
||||
harness: Harness,
|
||||
}
|
||||
|
||||
// We write our analysis results to a file immediately to minimize the amount of
|
||||
// state Rayhunter has to keep track of in memory. The analysis file's format is
|
||||
// Newline Delimited JSON
|
||||
// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which
|
||||
// lets us simply append new rows to the end without parsing the entire JSON
|
||||
// object beforehand.
|
||||
impl AnalysisWriter {
|
||||
pub async fn new(file: File, analyzer_config: &AnalyzerConfig) -> Result<Self, std::io::Error> {
|
||||
let harness = Harness::new_with_config(analyzer_config);
|
||||
|
||||
let mut result = Self {
|
||||
writer: BufWriter::new(file),
|
||||
harness,
|
||||
};
|
||||
let metadata = result.harness.get_metadata();
|
||||
result.write(&metadata).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// Runs the analysis harness on the given container, serializing the results
|
||||
// to the analysis file, returning the whether any warnings were detected
|
||||
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<bool, std::io::Error> {
|
||||
let mut warning_detected = false;
|
||||
for row in self.harness.analyze_qmdl_messages(container) {
|
||||
if !row.is_empty() {
|
||||
self.write(&row).await?;
|
||||
}
|
||||
warning_detected |= row.contains_warnings();
|
||||
}
|
||||
Ok(warning_detected)
|
||||
}
|
||||
|
||||
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
|
||||
let mut value_str = serde_json::to_string(value).unwrap();
|
||||
value_str.push('\n');
|
||||
self.writer.write_all(value_str.as_bytes()).await?;
|
||||
self.writer.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Flushes any pending I/O to disk before dropping the writer
|
||||
pub async fn close(mut self) -> Result<(), std::io::Error> {
|
||||
self.writer.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct AnalysisStatus {
|
||||
queued: Vec<String>,
|
||||
running: Option<String>,
|
||||
finished: Vec<String>,
|
||||
}
|
||||
|
||||
impl AnalysisStatus {
|
||||
pub fn new(store: &RecordingStore) -> Self {
|
||||
let existing_recordings: Vec<String> = store
|
||||
.manifest
|
||||
.entries
|
||||
.iter()
|
||||
.map(|entry| entry.name.clone())
|
||||
.collect();
|
||||
AnalysisStatus {
|
||||
queued: Vec::new(),
|
||||
running: None,
|
||||
finished: existing_recordings,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum AnalysisCtrlMessage {
|
||||
NewFilesQueued,
|
||||
RecordingFinished(String),
|
||||
Exit,
|
||||
}
|
||||
|
||||
async fn queued_len(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) -> usize {
|
||||
analysis_status_lock.read().await.queued.len()
|
||||
}
|
||||
|
||||
async fn dequeue_to_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) -> String {
|
||||
let mut analysis_status = analysis_status_lock.write().await;
|
||||
let name = analysis_status.queued.remove(0);
|
||||
assert!(analysis_status.running.is_none());
|
||||
analysis_status.running = Some(name.clone());
|
||||
name
|
||||
}
|
||||
|
||||
async fn finish_running_analysis(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) {
|
||||
let mut analysis_status = analysis_status_lock.write().await;
|
||||
let finished = analysis_status.running.take().unwrap();
|
||||
analysis_status.finished.push(finished);
|
||||
}
|
||||
|
||||
async fn perform_analysis(
|
||||
name: &str,
|
||||
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
|
||||
analyzer_config: &AnalyzerConfig,
|
||||
) -> Result<(), String> {
|
||||
info!("Opening QMDL and analysis file for {name}...");
|
||||
let (analysis_file, qmdl_file) = {
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
let (entry_index, _) = qmdl_store
|
||||
.entry_for_name(name)
|
||||
.ok_or(format!("failed to find QMDL store entry for {name}"))?;
|
||||
let analysis_file = qmdl_store
|
||||
.clear_and_open_entry_analysis(entry_index)
|
||||
.await
|
||||
.map_err(|e| format!("{e:?}"))?;
|
||||
let qmdl_file = qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await
|
||||
.map_err(|e| format!("{e:?}"))?;
|
||||
|
||||
(analysis_file, qmdl_file)
|
||||
};
|
||||
|
||||
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
|
||||
.await
|
||||
.map_err(|e| format!("{e:?}"))?;
|
||||
let file_size = qmdl_file
|
||||
.metadata()
|
||||
.await
|
||||
.expect("failed to get QMDL file metadata")
|
||||
.len();
|
||||
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
|
||||
let mut qmdl_stream = pin::pin!(
|
||||
qmdl_reader
|
||||
.as_stream()
|
||||
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
|
||||
);
|
||||
|
||||
info!("Starting analysis for {name}...");
|
||||
while let Some(container) = qmdl_stream
|
||||
.try_next()
|
||||
.await
|
||||
.expect("failed getting QMDL container")
|
||||
{
|
||||
let _ = analysis_writer
|
||||
.analyze(container)
|
||||
.await
|
||||
.map_err(|e| format!("{e:?}"))?;
|
||||
}
|
||||
|
||||
analysis_writer
|
||||
.close()
|
||||
.await
|
||||
.map_err(|e| format!("{e:?}"))?;
|
||||
info!("Analysis for {name} complete!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run_analysis_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
mut analysis_rx: Receiver<AnalysisCtrlMessage>,
|
||||
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
|
||||
analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
) {
|
||||
task_tracker.spawn(async move {
|
||||
loop {
|
||||
match analysis_rx.recv().await {
|
||||
Some(AnalysisCtrlMessage::NewFilesQueued) => {
|
||||
let count = queued_len(analysis_status_lock.clone()).await;
|
||||
for _ in 0..count {
|
||||
let name = dequeue_to_running(analysis_status_lock.clone()).await;
|
||||
if let Err(err) =
|
||||
perform_analysis(&name, qmdl_store_lock.clone(), &analyzer_config).await
|
||||
{
|
||||
error!("failed to analyze {name}: {err}");
|
||||
}
|
||||
finish_running_analysis(analysis_status_lock.clone()).await;
|
||||
}
|
||||
}
|
||||
Some(AnalysisCtrlMessage::RecordingFinished(name)) => {
|
||||
let mut status = analysis_status_lock.write().await;
|
||||
status.finished.push(name);
|
||||
}
|
||||
Some(AnalysisCtrlMessage::Exit) | None => return,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn get_analysis_status(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<Json<AnalysisStatus>, (StatusCode, String)> {
|
||||
Ok(Json(state.analysis_status_lock.read().await.clone()))
|
||||
}
|
||||
|
||||
fn queue_qmdl(name: &str, analysis_status: &mut RwLockWriteGuard<AnalysisStatus>) -> bool {
|
||||
if analysis_status.queued.iter().any(|n| n == name)
|
||||
|| analysis_status.running.iter().any(|n| n == name)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
analysis_status.queued.push(name.to_string());
|
||||
true
|
||||
}
|
||||
|
||||
pub async fn start_analysis(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(qmdl_name): Path<String>,
|
||||
) -> Result<(StatusCode, Json<AnalysisStatus>), (StatusCode, String)> {
|
||||
let mut analysis_status = state.analysis_status_lock.write().await;
|
||||
let store = state.qmdl_store_lock.read().await;
|
||||
let queued = if qmdl_name.is_empty() {
|
||||
let mut entry_names: Vec<&str> = store
|
||||
.manifest
|
||||
.entries
|
||||
.iter()
|
||||
.map(|e| e.name.as_str())
|
||||
.collect();
|
||||
if let Some(current_entry) = store.current_entry {
|
||||
entry_names.remove(current_entry);
|
||||
}
|
||||
entry_names
|
||||
.iter()
|
||||
.any(|name| queue_qmdl(name, &mut analysis_status))
|
||||
} else {
|
||||
queue_qmdl(&qmdl_name, &mut analysis_status)
|
||||
};
|
||||
if queued {
|
||||
state
|
||||
.analysis_sender
|
||||
.send(AnalysisCtrlMessage::NewFilesQueued)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to queue new analysis files: {e:?}"),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
Ok((StatusCode::ACCEPTED, Json(analysis_status.clone())))
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
use log::warn;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use rayhunter::Device;
|
||||
use rayhunter::analysis::analyzer::AnalyzerConfig;
|
||||
|
||||
use crate::error::RayhunterError;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
#[serde(default)]
|
||||
pub struct Config {
|
||||
pub qmdl_store_path: String,
|
||||
pub port: u16,
|
||||
pub debug_mode: bool,
|
||||
pub device: Device,
|
||||
pub ui_level: u8,
|
||||
pub colorblind_mode: bool,
|
||||
pub key_input_mode: u8,
|
||||
pub ntfy_topic: Option<String>,
|
||||
pub analyzers: AnalyzerConfig,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Config {
|
||||
qmdl_store_path: "/data/rayhunter/qmdl".to_string(),
|
||||
port: 8080,
|
||||
debug_mode: false,
|
||||
device: Device::Orbic,
|
||||
ui_level: 1,
|
||||
colorblind_mode: false,
|
||||
key_input_mode: 0,
|
||||
analyzers: AnalyzerConfig::default(),
|
||||
ntfy_topic: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn parse_config<P>(path: P) -> Result<Config, RayhunterError>
|
||||
where
|
||||
P: AsRef<std::path::Path>,
|
||||
{
|
||||
if let Ok(config_file) = tokio::fs::read_to_string(&path).await {
|
||||
Ok(toml::from_str(&config_file).map_err(RayhunterError::ConfigFileParsingError)?)
|
||||
} else {
|
||||
warn!("unable to read config file, using default config");
|
||||
Ok(Config::default())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Args {
|
||||
pub config_path: String,
|
||||
}
|
||||
|
||||
pub fn parse_args() -> Args {
|
||||
let args: Vec<String> = std::env::args().collect();
|
||||
if args.len() != 2 {
|
||||
println!("Usage: {} /path/to/config/file", args[0]);
|
||||
std::process::exit(1);
|
||||
}
|
||||
Args {
|
||||
config_path: args[1].clone(),
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,430 @@
|
||||
use std::ops::DerefMut;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::body::Body;
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::http::header::CONTENT_TYPE;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use log::{debug, error, info, warn};
|
||||
use rayhunter::analysis::analyzer::AnalyzerConfig;
|
||||
use rayhunter::diag::{DataType, MessagesContainer};
|
||||
use rayhunter::diag_device::DiagDevice;
|
||||
use rayhunter::qmdl::QmdlWriter;
|
||||
use tokio::fs::File;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::{RwLock, oneshot};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::analysis::{AnalysisCtrlMessage, AnalysisWriter};
|
||||
use crate::display;
|
||||
use crate::notifications::Notification;
|
||||
use crate::qmdl_store::{RecordingStore, RecordingStoreError};
|
||||
use crate::server::ServerState;
|
||||
|
||||
pub enum DiagDeviceCtrlMessage {
|
||||
StopRecording,
|
||||
StartRecording,
|
||||
DeleteEntry {
|
||||
name: String,
|
||||
response_tx: oneshot::Sender<Result<(), RecordingStoreError>>,
|
||||
},
|
||||
DeleteAllEntries {
|
||||
response_tx: oneshot::Sender<Result<(), RecordingStoreError>>,
|
||||
},
|
||||
Exit,
|
||||
}
|
||||
|
||||
pub struct DiagTask {
|
||||
ui_update_sender: Sender<display::DisplayState>,
|
||||
analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
state: DiagState,
|
||||
}
|
||||
|
||||
enum DiagState {
|
||||
Recording {
|
||||
qmdl_writer: QmdlWriter<File>,
|
||||
analysis_writer: Box<AnalysisWriter>,
|
||||
},
|
||||
Stopped,
|
||||
}
|
||||
|
||||
impl DiagTask {
|
||||
fn new(
|
||||
ui_update_sender: Sender<display::DisplayState>,
|
||||
analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
ui_update_sender,
|
||||
analysis_sender,
|
||||
analyzer_config,
|
||||
state: DiagState::Stopped,
|
||||
}
|
||||
}
|
||||
|
||||
/// Start recording
|
||||
async fn start(&mut self, qmdl_store: &mut RecordingStore) {
|
||||
let (qmdl_file, analysis_file) = qmdl_store
|
||||
.new_entry()
|
||||
.await
|
||||
.expect("failed creating QMDL file entry");
|
||||
self.stop_current_recording().await;
|
||||
let qmdl_writer = QmdlWriter::new(qmdl_file);
|
||||
let analysis_writer = AnalysisWriter::new(analysis_file, &self.analyzer_config)
|
||||
.await
|
||||
.map(Box::new)
|
||||
.expect("failed to write to analysis file");
|
||||
self.state = DiagState::Recording {
|
||||
qmdl_writer,
|
||||
analysis_writer,
|
||||
};
|
||||
if let Err(e) = self
|
||||
.ui_update_sender
|
||||
.send(display::DisplayState::Recording)
|
||||
.await
|
||||
{
|
||||
warn!("couldn't send ui update message: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop recording
|
||||
async fn stop(&mut self, qmdl_store: &mut RecordingStore) {
|
||||
self.stop_current_recording().await;
|
||||
if let Some((_, entry)) = qmdl_store.get_current_entry() {
|
||||
if let Err(e) = self
|
||||
.analysis_sender
|
||||
.send(AnalysisCtrlMessage::RecordingFinished(
|
||||
entry.name.to_string(),
|
||||
))
|
||||
.await
|
||||
{
|
||||
warn!("couldn't send analysis message: {e}");
|
||||
}
|
||||
}
|
||||
if let Err(e) = qmdl_store.close_current_entry().await {
|
||||
error!("couldn't close current entry: {e}");
|
||||
}
|
||||
if let Err(e) = self
|
||||
.ui_update_sender
|
||||
.send(display::DisplayState::Paused)
|
||||
.await
|
||||
{
|
||||
warn!("couldn't send ui update message: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_entry(
|
||||
&mut self,
|
||||
qmdl_store: &mut RecordingStore,
|
||||
name: &str,
|
||||
) -> Result<(), RecordingStoreError> {
|
||||
if qmdl_store.is_current_entry(name) {
|
||||
self.stop(qmdl_store).await;
|
||||
}
|
||||
let res = qmdl_store.delete_entry(name).await;
|
||||
if let Err(e) = res.as_ref() {
|
||||
error!("Error deleting QMDL entry {e}");
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
async fn delete_all_entries(
|
||||
&mut self,
|
||||
qmdl_store: &mut RecordingStore,
|
||||
) -> Result<(), RecordingStoreError> {
|
||||
self.stop(qmdl_store).await;
|
||||
let res = qmdl_store.delete_all_entries().await;
|
||||
if let Err(e) = res.as_ref() {
|
||||
error!("Error deleting QMDL entries {e}");
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
async fn stop_current_recording(&mut self) {
|
||||
let mut state = DiagState::Stopped;
|
||||
std::mem::swap(&mut self.state, &mut state);
|
||||
if let DiagState::Recording {
|
||||
analysis_writer, ..
|
||||
} = state
|
||||
{
|
||||
analysis_writer
|
||||
.close()
|
||||
.await
|
||||
.expect("failed to close analysis writer");
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_container(
|
||||
&mut self,
|
||||
qmdl_store: &mut RecordingStore,
|
||||
container: MessagesContainer,
|
||||
notification_channel: &tokio::sync::mpsc::Sender<Notification>,
|
||||
) {
|
||||
if container.data_type != DataType::UserSpace {
|
||||
debug!("skipping non-userspace diag messages...");
|
||||
return;
|
||||
}
|
||||
// keep track of how many bytes were written to the QMDL file so we can read
|
||||
// a valid block of data from it in the HTTP server
|
||||
if let DiagState::Recording {
|
||||
qmdl_writer,
|
||||
analysis_writer,
|
||||
} = &mut self.state
|
||||
{
|
||||
qmdl_writer
|
||||
.write_container(&container)
|
||||
.await
|
||||
.expect("failed to write to QMDL writer");
|
||||
debug!(
|
||||
"total QMDL bytes written: {}, updating manifest...",
|
||||
qmdl_writer.total_written
|
||||
);
|
||||
let index = qmdl_store
|
||||
.current_entry
|
||||
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
||||
qmdl_store
|
||||
.update_entry_qmdl_size(index, qmdl_writer.total_written)
|
||||
.await
|
||||
.expect("failed to update qmdl file size");
|
||||
debug!("done!");
|
||||
let heuristic_warning = analysis_writer
|
||||
.analyze(container)
|
||||
.await
|
||||
.expect("failed to analyze container");
|
||||
if heuristic_warning {
|
||||
info!("a heuristic triggered on this run!");
|
||||
self.ui_update_sender
|
||||
.send(display::DisplayState::WarningDetected)
|
||||
.await
|
||||
.expect("couldn't send ui update message: {}");
|
||||
notification_channel
|
||||
.send(Notification::new(
|
||||
"heuristic-warning".to_string(),
|
||||
"New warning triggered!".to_string(),
|
||||
Some(Duration::from_secs(60 * 5)),
|
||||
))
|
||||
.await
|
||||
.expect("Failed to send to notification channel");
|
||||
}
|
||||
} else {
|
||||
debug!("no qmdl_writer set, continuing...");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn run_diag_read_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
mut dev: DiagDevice,
|
||||
mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>,
|
||||
qmdl_file_tx: Sender<DiagDeviceCtrlMessage>,
|
||||
ui_update_sender: Sender<display::DisplayState>,
|
||||
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
|
||||
analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
analyzer_config: AnalyzerConfig,
|
||||
notification_channel: tokio::sync::mpsc::Sender<Notification>,
|
||||
) {
|
||||
task_tracker.spawn(async move {
|
||||
let mut diag_stream = pin!(dev.as_stream().into_stream());
|
||||
let mut diag_task = DiagTask::new(ui_update_sender, analysis_sender, analyzer_config);
|
||||
qmdl_file_tx
|
||||
.send(DiagDeviceCtrlMessage::StartRecording)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = qmdl_file_rx.recv() => {
|
||||
match msg {
|
||||
Some(DiagDeviceCtrlMessage::StartRecording) => {
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
diag_task.start(qmdl_store.deref_mut()).await;
|
||||
},
|
||||
Some(DiagDeviceCtrlMessage::StopRecording) => {
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
diag_task.stop(qmdl_store.deref_mut()).await;
|
||||
},
|
||||
// None means all the Senders have been dropped, so it's
|
||||
// time to go
|
||||
Some(DiagDeviceCtrlMessage::Exit) | None => {
|
||||
info!("Diag reader thread exiting...");
|
||||
diag_task.stop_current_recording().await;
|
||||
return Ok(())
|
||||
},
|
||||
Some(DiagDeviceCtrlMessage::DeleteEntry { name, response_tx }) => {
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
let resp = diag_task.delete_entry(qmdl_store.deref_mut(), name.as_str()).await;
|
||||
if response_tx.send(resp).is_err() {
|
||||
error!("Failed to send delete entry respons, receiver dropped");
|
||||
}
|
||||
},
|
||||
Some(DiagDeviceCtrlMessage::DeleteAllEntries { response_tx }) => {
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
let resp = diag_task.delete_all_entries(qmdl_store.deref_mut()).await;
|
||||
if response_tx.send(resp).is_err() {
|
||||
error!("Failed to send delete all entries respons, receiver dropped");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
maybe_container = diag_stream.next() => {
|
||||
match maybe_container.unwrap() {
|
||||
Ok(container) => {
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
diag_task.process_container(qmdl_store.deref_mut(), container, ¬ification_channel).await
|
||||
},
|
||||
Err(err) => {
|
||||
error!("error reading diag device: {err}");
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Start recording API for web thread
|
||||
pub async fn start_recording(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<(StatusCode, String), (StatusCode, String)> {
|
||||
if state.config.debug_mode {
|
||||
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
|
||||
}
|
||||
|
||||
state
|
||||
.diag_device_ctrl_sender
|
||||
.send(DiagDeviceCtrlMessage::StartRecording)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("couldn't send start recording message: {e}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok((StatusCode::ACCEPTED, "ok".to_string()))
|
||||
}
|
||||
|
||||
/// Stop recording API for web thread
|
||||
pub async fn stop_recording(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<(StatusCode, String), (StatusCode, String)> {
|
||||
if state.config.debug_mode {
|
||||
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
|
||||
}
|
||||
state
|
||||
.diag_device_ctrl_sender
|
||||
.send(DiagDeviceCtrlMessage::StopRecording)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("couldn't send stop recording message: {e}"),
|
||||
)
|
||||
})?;
|
||||
Ok((StatusCode::ACCEPTED, "ok".to_string()))
|
||||
}
|
||||
|
||||
pub async fn delete_recording(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(qmdl_name): Path<String>,
|
||||
) -> Result<(StatusCode, String), (StatusCode, String)> {
|
||||
if state.config.debug_mode {
|
||||
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
|
||||
}
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
state
|
||||
.diag_device_ctrl_sender
|
||||
.send(DiagDeviceCtrlMessage::DeleteEntry {
|
||||
name: qmdl_name.clone(),
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("couldn't send delete entry message: {e}"),
|
||||
)
|
||||
})?;
|
||||
match response_rx.await.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to receive delete response: {e}"),
|
||||
)
|
||||
})? {
|
||||
Ok(_) => Ok((StatusCode::ACCEPTED, "ok".to_string())),
|
||||
Err(RecordingStoreError::NoSuchEntryError) => Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("no recording with name {qmdl_name}"),
|
||||
)),
|
||||
Err(e) => Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("couldn't delete recording: {e}"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_all_recordings(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<(StatusCode, String), (StatusCode, String)> {
|
||||
if state.config.debug_mode {
|
||||
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
|
||||
}
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
state
|
||||
.diag_device_ctrl_sender
|
||||
.send(DiagDeviceCtrlMessage::DeleteAllEntries { response_tx })
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("couldn't send delete all entries message: {e}"),
|
||||
)
|
||||
})?;
|
||||
match response_rx.await.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to receive delete all response: {e}"),
|
||||
)
|
||||
})? {
|
||||
Ok(_) => Ok((StatusCode::ACCEPTED, "ok".to_string())),
|
||||
Err(e) => Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("couldn't delete recordings: {e}"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_analysis_report(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(qmdl_name): Path<String>,
|
||||
) -> Result<Response, (StatusCode, String)> {
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
let (entry_index, _) = if qmdl_name == "live" {
|
||||
qmdl_store.get_current_entry().ok_or((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"No QMDL data's being recorded to analyze, try starting a new recording!".to_string(),
|
||||
))?
|
||||
} else {
|
||||
qmdl_store.entry_for_name(&qmdl_name).ok_or((
|
||||
StatusCode::NOT_FOUND,
|
||||
format!("Couldn't find QMDL entry with name \"{qmdl_name}\""),
|
||||
))?
|
||||
};
|
||||
let analysis_file = qmdl_store
|
||||
.open_entry_analysis(entry_index)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
|
||||
let analysis_stream = ReaderStream::new(analysis_file);
|
||||
|
||||
let headers = [(CONTENT_TYPE, "application/x-ndjson")];
|
||||
let body = Body::from_stream(analysis_stream);
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
@@ -0,0 +1,203 @@
|
||||
use async_trait::async_trait;
|
||||
use image::{AnimationDecoder, DynamicImage, codecs::gif::GifDecoder, imageops::FilterType};
|
||||
use std::io::Cursor;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
|
||||
use log::{error, info};
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use include_dir::{Dir, include_dir};
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct Dimensions {
|
||||
pub height: u32,
|
||||
pub width: u32,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Copy, Clone)]
|
||||
pub enum Color {
|
||||
Red,
|
||||
Green,
|
||||
Blue,
|
||||
White,
|
||||
Black,
|
||||
Cyan,
|
||||
Yellow,
|
||||
Pink,
|
||||
}
|
||||
|
||||
impl Color {
|
||||
fn rgb(self) -> (u8, u8, u8) {
|
||||
match self {
|
||||
Color::Red => (0xff, 0, 0),
|
||||
Color::Green => (0, 0xff, 0),
|
||||
Color::Blue => (0, 0, 0xff),
|
||||
Color::White => (0xff, 0xff, 0xff),
|
||||
Color::Black => (0, 0, 0),
|
||||
Color::Cyan => (0, 0xff, 0xff),
|
||||
Color::Yellow => (0xff, 0xff, 0),
|
||||
Color::Pink => (0xfe, 0x24, 0xff),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Color {
|
||||
fn from_state(state: DisplayState, colorblind_mode: bool) -> Self {
|
||||
match state {
|
||||
DisplayState::Paused => Color::White,
|
||||
DisplayState::Recording => {
|
||||
if colorblind_mode {
|
||||
Color::Blue
|
||||
} else {
|
||||
Color::Green
|
||||
}
|
||||
}
|
||||
DisplayState::WarningDetected => Color::Red,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait GenericFramebuffer: Send + 'static {
|
||||
fn dimensions(&self) -> Dimensions;
|
||||
|
||||
async fn write_buffer(&mut self, buffer: Vec<(u8, u8, u8)>); // rgb, row-wise, left-to-right, top-to-bottom
|
||||
|
||||
async fn write_dynamic_image(&mut self, img: DynamicImage) {
|
||||
let dimensions = self.dimensions();
|
||||
let mut width = img.width();
|
||||
let mut height = img.height();
|
||||
let resized_img: DynamicImage;
|
||||
if height > dimensions.height || width > dimensions.width {
|
||||
resized_img = img.resize(dimensions.width, dimensions.height, FilterType::CatmullRom);
|
||||
width = dimensions.width.min(resized_img.width());
|
||||
height = dimensions.height.min(resized_img.height());
|
||||
} else {
|
||||
resized_img = img;
|
||||
}
|
||||
let img_rgba8 = resized_img.as_rgba8().unwrap();
|
||||
let mut buf = Vec::new();
|
||||
for y in 0..height {
|
||||
for x in 0..width {
|
||||
let px = img_rgba8.get_pixel(x, y);
|
||||
buf.push((px[0], px[1], px[2]));
|
||||
}
|
||||
}
|
||||
|
||||
self.write_buffer(buf).await
|
||||
}
|
||||
|
||||
async fn draw_gif(&mut self, img_buffer: &[u8]) {
|
||||
let cursor = Cursor::new(img_buffer);
|
||||
if let Ok(decoder) = GifDecoder::new(cursor) {
|
||||
let frames: Vec<_> = decoder
|
||||
.into_frames()
|
||||
.filter_map(|f| f.ok())
|
||||
.map(|frame| {
|
||||
let (numerator, _) = frame.delay().numer_denom_ms();
|
||||
let img = DynamicImage::from(frame.into_buffer());
|
||||
(img, numerator as u64)
|
||||
})
|
||||
.collect();
|
||||
|
||||
for (img, delay_ms) in frames {
|
||||
self.write_dynamic_image(img).await;
|
||||
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn draw_img(&mut self, img_buffer: &[u8]) {
|
||||
let img = image::load_from_memory(img_buffer).unwrap();
|
||||
self.write_dynamic_image(img).await
|
||||
}
|
||||
|
||||
async fn draw_line(&mut self, color: Color, height: u32) {
|
||||
let width = self.dimensions().width;
|
||||
let px_num = height * width;
|
||||
let mut buffer = Vec::new();
|
||||
for _ in 0..px_num {
|
||||
buffer.push(color.rgb());
|
||||
}
|
||||
|
||||
self.write_buffer(buffer).await
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
mut fb: impl GenericFramebuffer,
|
||||
mut ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
mut ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
static IMAGE_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/images/");
|
||||
let display_level = config.ui_level;
|
||||
if display_level == 0 {
|
||||
info!("Invisible mode, not spawning UI.");
|
||||
}
|
||||
|
||||
let colorblind_mode = config.colorblind_mode;
|
||||
let mut display_color = Color::from_state(DisplayState::Recording, colorblind_mode);
|
||||
|
||||
task_tracker.spawn(async move {
|
||||
// this feels wrong, is there a more rusty way to do this?
|
||||
let mut img: Option<&[u8]> = None;
|
||||
if display_level == 2 {
|
||||
img = Some(
|
||||
IMAGE_DIR
|
||||
.get_file("orca.gif")
|
||||
.expect("failed to read orca.gif")
|
||||
.contents(),
|
||||
);
|
||||
} else if display_level == 3 {
|
||||
img = Some(
|
||||
IMAGE_DIR
|
||||
.get_file("eff.png")
|
||||
.expect("failed to read eff.png")
|
||||
.contents(),
|
||||
);
|
||||
}
|
||||
loop {
|
||||
match ui_shutdown_rx.try_recv() {
|
||||
Ok(_) => {
|
||||
info!("received UI shutdown");
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Empty) => {}
|
||||
Err(e) => panic!("error receiving shutdown message: {e}"),
|
||||
}
|
||||
match ui_update_rx.try_recv() {
|
||||
Ok(state) => {
|
||||
display_color = Color::from_state(state, colorblind_mode);
|
||||
}
|
||||
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
|
||||
Err(e) => error!("error receiving framebuffer update message: {e}"),
|
||||
}
|
||||
|
||||
match display_level {
|
||||
2 => fb.draw_gif(img.unwrap()).await,
|
||||
3 => fb.draw_img(img.unwrap()).await,
|
||||
128 => {
|
||||
fb.draw_line(Color::Cyan, 128).await;
|
||||
fb.draw_line(Color::Pink, 102).await;
|
||||
fb.draw_line(Color::White, 76).await;
|
||||
fb.draw_line(Color::Pink, 50).await;
|
||||
fb.draw_line(Color::Cyan, 25).await;
|
||||
}
|
||||
// this branch id for ui_level 1, which is also the default if an
|
||||
// unknown value is used
|
||||
_ => {}
|
||||
};
|
||||
fb.draw_line(display_color, 2).await;
|
||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
use log::info;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
|
||||
pub fn update_ui(
|
||||
_task_tracker: &TaskTracker,
|
||||
_config: &config::Config,
|
||||
_ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
_ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
info!("Headless mode, not spawning UI.");
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
mod generic_framebuffer;
|
||||
|
||||
pub mod headless;
|
||||
pub mod orbic;
|
||||
pub mod tmobile;
|
||||
pub mod tplink;
|
||||
pub mod tplink_framebuffer;
|
||||
pub mod tplink_onebit;
|
||||
pub mod uz801;
|
||||
pub mod wingtech;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
pub enum DisplayState {
|
||||
Recording,
|
||||
Paused,
|
||||
WarningDetected,
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
|
||||
use async_trait::async_trait;
|
||||
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
const FB_PATH: &str = "/dev/fb0";
|
||||
|
||||
#[derive(Copy, Clone, Default)]
|
||||
struct Framebuffer;
|
||||
|
||||
#[async_trait]
|
||||
impl GenericFramebuffer for Framebuffer {
|
||||
fn dimensions(&self) -> Dimensions {
|
||||
// TODO actually poll for this, maybe w/ fbset?
|
||||
Dimensions {
|
||||
height: 128,
|
||||
width: 128,
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_buffer(&mut self, buffer: Vec<(u8, u8, u8)>) {
|
||||
let mut raw_buffer = Vec::new();
|
||||
for (r, g, b) in buffer {
|
||||
let mut rgb565: u16 = (r as u16 & 0b11111000) << 8;
|
||||
rgb565 |= (g as u16 & 0b11111100) << 3;
|
||||
rgb565 |= (b as u16) >> 3;
|
||||
raw_buffer.extend(rgb565.to_le_bytes());
|
||||
}
|
||||
|
||||
tokio::fs::write(FB_PATH, &raw_buffer).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
generic_framebuffer::update_ui(
|
||||
task_tracker,
|
||||
config,
|
||||
Framebuffer,
|
||||
ui_shutdown_rx,
|
||||
ui_update_rx,
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
/// Display module for Tmobile TMOHS1, blink LEDs on the front of the device.
|
||||
/// DisplayState::Recording => Signal LED slowly blinks blue.
|
||||
/// DisplayState::Paused => WiFi LED blinks white.
|
||||
/// DisplayState::WarningDetected => Signal LED slowly blinks red.
|
||||
use log::{error, info};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
|
||||
macro_rules! led {
|
||||
($l:expr) => {{ format!("/sys/class/leds/led:{}/blink", $l) }};
|
||||
}
|
||||
|
||||
async fn start_blinking(path: String) {
|
||||
tokio::fs::write(&path, "1").await.ok();
|
||||
}
|
||||
|
||||
async fn stop_blinking(path: String) {
|
||||
tokio::fs::write(&path, "0").await.ok();
|
||||
}
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
mut ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
mut ui_update_rx: mpsc::Receiver<DisplayState>,
|
||||
) {
|
||||
let mut invisible: bool = false;
|
||||
if config.ui_level == 0 {
|
||||
info!("Invisible mode, not spawning UI.");
|
||||
invisible = true;
|
||||
}
|
||||
task_tracker.spawn(async move {
|
||||
let mut state = DisplayState::Recording;
|
||||
let mut last_state = DisplayState::Paused;
|
||||
|
||||
loop {
|
||||
match ui_shutdown_rx.try_recv() {
|
||||
Ok(_) => {
|
||||
info!("received UI shutdown");
|
||||
break;
|
||||
}
|
||||
Err(oneshot::error::TryRecvError::Empty) => {}
|
||||
Err(e) => panic!("error receiving shutdown message: {e}"),
|
||||
}
|
||||
match ui_update_rx.try_recv() {
|
||||
Ok(new_state) => state = new_state,
|
||||
Err(mpsc::error::TryRecvError::Empty) => {}
|
||||
Err(e) => error!("error receiving ui update message: {e}"),
|
||||
};
|
||||
if invisible || state == last_state {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
match state {
|
||||
DisplayState::Paused => {
|
||||
stop_blinking(led!("signal_blue")).await;
|
||||
stop_blinking(led!("signal_red")).await;
|
||||
start_blinking(led!("wlan_white")).await;
|
||||
}
|
||||
DisplayState::Recording => {
|
||||
stop_blinking(led!("wlan_white")).await;
|
||||
stop_blinking(led!("signal_red")).await;
|
||||
start_blinking(led!("signal_blue")).await;
|
||||
}
|
||||
DisplayState::WarningDetected => {
|
||||
stop_blinking(led!("wlan_white")).await;
|
||||
stop_blinking(led!("signal_blue")).await;
|
||||
start_blinking(led!("signal_red")).await;
|
||||
}
|
||||
}
|
||||
last_state = state;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
use log::info;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::config;
|
||||
use crate::display::{DisplayState, tplink_framebuffer, tplink_onebit};
|
||||
|
||||
use std::fs;
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
let display_level = config.ui_level;
|
||||
if display_level == 0 {
|
||||
info!("Invisible mode, not spawning UI.");
|
||||
}
|
||||
|
||||
// Since this is a one-time check at startup, using sync is acceptable
|
||||
// The alternative would be to make the entire initialization async
|
||||
if fs::exists(tplink_onebit::OLED_PATH).unwrap_or_default() {
|
||||
info!("detected one-bit display");
|
||||
tplink_onebit::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx)
|
||||
} else {
|
||||
info!("fallback to framebuffer");
|
||||
tplink_framebuffer::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
use async_trait::async_trait;
|
||||
use std::os::fd::AsRawFd;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
|
||||
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
const FB_PATH: &str = "/dev/fb0";
|
||||
|
||||
struct Framebuffer;
|
||||
|
||||
#[repr(C)]
|
||||
struct fb_fillrect {
|
||||
dx: u32,
|
||||
dy: u32,
|
||||
width: u32,
|
||||
height: u32,
|
||||
color: u32,
|
||||
rop: u32,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl GenericFramebuffer for Framebuffer {
|
||||
fn dimensions(&self) -> Dimensions {
|
||||
// TODO actually poll for this, maybe w/ fbset?
|
||||
Dimensions {
|
||||
height: 128,
|
||||
width: 128,
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_buffer(&mut self, buffer: Vec<(u8, u8, u8)>) {
|
||||
// for how to write to the buffer, consult M7350v5_en_gpl/bootable/recovery/recovery_color_oled.c
|
||||
let dimensions = self.dimensions();
|
||||
let width = dimensions.width;
|
||||
let height = buffer.len() as u32 / width;
|
||||
let mut f = OpenOptions::new().write(true).open(FB_PATH).await.unwrap();
|
||||
let mut arg = fb_fillrect {
|
||||
dx: 0,
|
||||
dy: 0,
|
||||
width,
|
||||
height,
|
||||
color: 0xffff, // not sure what this is
|
||||
rop: 0,
|
||||
};
|
||||
|
||||
let mut raw_buffer = Vec::new();
|
||||
for (r, g, b) in buffer {
|
||||
let mut rgb565: u16 = (r as u16 & 0b11111000) << 8;
|
||||
rgb565 |= (g as u16 & 0b11111100) << 3;
|
||||
rgb565 |= (b as u16) >> 3;
|
||||
// note: big-endian!
|
||||
raw_buffer.extend(rgb565.to_be_bytes());
|
||||
}
|
||||
|
||||
f.write_all(&raw_buffer).await.unwrap();
|
||||
|
||||
// ioctl is a synchronous operation, but it's fast enough that it shouldn't block
|
||||
unsafe {
|
||||
let res = libc::ioctl(
|
||||
f.as_raw_fd(),
|
||||
0x4619, // FBIORECT_DISPLAY
|
||||
&mut arg as *mut _,
|
||||
std::mem::size_of::<fb_fillrect>(),
|
||||
);
|
||||
|
||||
if res < 0 {
|
||||
panic!("failed to send FBIORECT_DISPLAY ioctl, {res}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
generic_framebuffer::update_ui(
|
||||
task_tracker,
|
||||
config,
|
||||
Framebuffer,
|
||||
ui_shutdown_rx,
|
||||
ui_update_rx,
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,168 @@
|
||||
/// Display module for the TP-Link M7350 oled one-bit display.
|
||||
///
|
||||
/// https://github.com/m0veax/tplink_m7350/tree/main/oled
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
|
||||
use log::{error, info};
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
pub const OLED_PATH: &str = "/sys/class/display/oled/oled_buffer";
|
||||
|
||||
// those coordinates were mainly chosen for a spot that doesn't get regularly updated by the main
|
||||
// oledd service. otherwise we'd have to write to the display more than once per second to prevent
|
||||
// the icon from flickering.
|
||||
const STATUS_X: u8 = 104;
|
||||
const STATUS_Y: u8 = 40;
|
||||
const STATUS_W: u8 = 16;
|
||||
const STATUS_H: u8 = 16;
|
||||
|
||||
macro_rules! pixel {
|
||||
(x) => {
|
||||
0
|
||||
};
|
||||
(_) => {
|
||||
1
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! pixelart {
|
||||
(x=$x:expr, y=$y:expr, width=$width:expr, height=$height:expr; $($a:tt $b:tt $c:tt $d:tt $e:tt $f:tt $g:tt $h:tt)*) => {{
|
||||
// one bit per pixel + 4 bytes for header
|
||||
const BUF_SIZE: usize = ($width as usize * $height as usize) / 8 + 4;
|
||||
const BUF_BYTES: [u8; BUF_SIZE] = [
|
||||
$x,
|
||||
$y,
|
||||
$width,
|
||||
$height,
|
||||
$(
|
||||
(pixel!($a) << 7 | pixel!($b) << 6 | pixel!($c) << 5 | pixel!($d) << 4 | pixel!($e) << 3 | pixel!($f) << 2 | pixel!($g) << 1 | pixel!($h)),
|
||||
)*
|
||||
];
|
||||
|
||||
&BUF_BYTES
|
||||
}}
|
||||
}
|
||||
|
||||
const STATUS_PAUSED: &[u8] = pixelart! {
|
||||
x=STATUS_X, y=STATUS_Y, width=STATUS_W, height=STATUS_H;
|
||||
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
|
||||
_ _ _ x x x x x x x x x x _ _ _
|
||||
_ x x _ _ _ _ _ _ _ _ _ _ x x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ x _ _ _ _ x _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x x _ _ _ _ _ _ _ _ _ _ x x _
|
||||
_ _ _ x x x x x x x x x x _ _ _
|
||||
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
|
||||
};
|
||||
|
||||
const STATUS_SMILING: &[u8] = pixelart! {
|
||||
x=STATUS_X, y=STATUS_Y, width=STATUS_W, height=STATUS_H;
|
||||
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
|
||||
_ _ _ x x x x x x x x x x _ _ _
|
||||
_ x x _ _ _ _ _ _ _ _ _ _ x x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ x _ _ _ _ x _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ x _ _ _ _ x _ _ _ x _
|
||||
_ x _ _ _ x _ _ _ _ x _ _ _ x _
|
||||
_ x _ _ _ x x x x x x _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x x _ _ _ _ _ _ _ _ _ _ x x _
|
||||
_ _ _ x x x x x x x x x x _ _ _
|
||||
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
|
||||
};
|
||||
|
||||
const STATUS_WARNING: &[u8] = pixelart! {
|
||||
x=STATUS_X, y=STATUS_Y, width=STATUS_W, height=STATUS_H;
|
||||
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
|
||||
_ _ _ x x x x x x x x x x _ _ _
|
||||
_ x x _ _ _ _ _ _ _ _ _ _ x x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ x x _ _ _ _ _ x _
|
||||
_ x _ _ _ _ _ _ _ _ _ _ _ _ x _
|
||||
_ x x _ _ _ _ _ _ _ _ _ _ x x _
|
||||
_ _ _ x x x x x x x x x x _ _ _
|
||||
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
|
||||
};
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
mut ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
mut ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
let display_level = config.ui_level;
|
||||
if display_level == 0 {
|
||||
info!("Invisible mode, not spawning UI.");
|
||||
}
|
||||
|
||||
task_tracker.spawn(async move {
|
||||
let mut pixels = STATUS_SMILING;
|
||||
|
||||
loop {
|
||||
match ui_shutdown_rx.try_recv() {
|
||||
Ok(_) => {
|
||||
info!("received UI shutdown");
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Empty) => {}
|
||||
Err(e) => panic!("error receiving shutdown message: {e}"),
|
||||
}
|
||||
|
||||
match ui_update_rx.try_recv() {
|
||||
Ok(DisplayState::Paused) => pixels = STATUS_PAUSED,
|
||||
Ok(DisplayState::Recording) => pixels = STATUS_SMILING,
|
||||
Ok(DisplayState::WarningDetected) => pixels = STATUS_WARNING,
|
||||
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
|
||||
Err(e) => {
|
||||
error!("error receiving framebuffer update message: {e}");
|
||||
}
|
||||
};
|
||||
|
||||
// we write the status every second because it may have been overwritten through menu
|
||||
// navigation.
|
||||
if display_level != 0 {
|
||||
if let Err(e) = tokio::fs::write(OLED_PATH, pixels).await {
|
||||
error!("failed to write to display: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pixelart_macro() {
|
||||
assert_eq!(
|
||||
STATUS_WARNING,
|
||||
[
|
||||
104, 40, 16, 16, 255, 255, 224, 7, 159, 249, 191, 253, 190, 125, 190, 125, 190, 125,
|
||||
190, 125, 190, 125, 191, 253, 190, 125, 190, 125, 191, 253, 159, 249, 224, 7, 255, 255
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
/// Display module for Uz801, light LEDs on the front of the device.
|
||||
/// DisplayState::Recording => Green LED is solid.
|
||||
/// DisplayState::Paused => Signal LED is solid blue (wifi LED).
|
||||
/// DisplayState::WarningDetected => Signal LED is solid red.
|
||||
use log::{error, info};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
|
||||
macro_rules! led {
|
||||
($l:expr) => {{ format!("/sys/class/leds/{}/brightness", $l) }};
|
||||
}
|
||||
|
||||
async fn led_on(path: String) {
|
||||
tokio::fs::write(&path, "1").await.ok();
|
||||
}
|
||||
|
||||
async fn led_off(path: String) {
|
||||
tokio::fs::write(&path, "0").await.ok();
|
||||
}
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
mut ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
mut ui_update_rx: mpsc::Receiver<DisplayState>,
|
||||
) {
|
||||
let mut invisible: bool = false;
|
||||
if config.ui_level == 0 {
|
||||
info!("Invisible mode, not spawning UI.");
|
||||
invisible = true;
|
||||
}
|
||||
task_tracker.spawn(async move {
|
||||
let mut state = DisplayState::Recording;
|
||||
let mut last_state = DisplayState::Paused;
|
||||
let mut last_update = std::time::Instant::now();
|
||||
|
||||
loop {
|
||||
match ui_shutdown_rx.try_recv() {
|
||||
Ok(_) => {
|
||||
info!("received UI shutdown");
|
||||
break;
|
||||
}
|
||||
Err(oneshot::error::TryRecvError::Empty) => {}
|
||||
Err(e) => panic!("error receiving shutdown message: {e}"),
|
||||
}
|
||||
match ui_update_rx.try_recv() {
|
||||
Ok(new_state) => state = new_state,
|
||||
Err(mpsc::error::TryRecvError::Empty) => {}
|
||||
Err(e) => error!("error receiving ui update message: {e}"),
|
||||
};
|
||||
|
||||
// Update LEDs if state changed or if 5 seconds have passed since last update
|
||||
let now = std::time::Instant::now();
|
||||
let should_update = !invisible
|
||||
&& (state != last_state
|
||||
|| now.duration_since(last_update) >= Duration::from_secs(5));
|
||||
|
||||
if should_update {
|
||||
match state {
|
||||
DisplayState::Paused => {
|
||||
led_off(led!("red")).await;
|
||||
led_off(led!("green")).await;
|
||||
led_on(led!("wifi")).await;
|
||||
}
|
||||
DisplayState::Recording => {
|
||||
led_off(led!("red")).await;
|
||||
led_off(led!("wifi")).await;
|
||||
led_on(led!("green")).await;
|
||||
}
|
||||
DisplayState::WarningDetected => {
|
||||
led_off(led!("green")).await;
|
||||
led_off(led!("wifi")).await;
|
||||
led_on(led!("red")).await;
|
||||
}
|
||||
}
|
||||
last_state = state;
|
||||
last_update = now;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
use crate::config;
|
||||
use crate::display::DisplayState;
|
||||
use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
|
||||
/// Display support for the Wingtech CT2MHS01 hotspot.
|
||||
///
|
||||
/// Tested on (from `/etc/wt_version`):
|
||||
/// WT_INNER_VERSION=SW_Q89323AA1_V057_M10_CRICKET_USR_MP
|
||||
/// WT_PRODUCTION_VERSION=CT2MHS01_0.04.55
|
||||
/// WT_HARDWARE_VERSION=89323_1_20
|
||||
use async_trait::async_trait;
|
||||
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
const FB_PATH: &str = "/dev/fb0";
|
||||
|
||||
#[derive(Copy, Clone, Default)]
|
||||
struct Framebuffer;
|
||||
|
||||
#[async_trait]
|
||||
impl GenericFramebuffer for Framebuffer {
|
||||
fn dimensions(&self) -> Dimensions {
|
||||
Dimensions {
|
||||
height: 128,
|
||||
width: 160,
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_buffer(&mut self, buffer: Vec<(u8, u8, u8)>) {
|
||||
let mut raw_buffer = Vec::new();
|
||||
for (r, g, b) in buffer {
|
||||
let mut rgb565: u16 = (r as u16 & 0b11111000) << 8;
|
||||
rgb565 |= (g as u16 & 0b11111100) << 3;
|
||||
rgb565 |= (b as u16) >> 3;
|
||||
raw_buffer.extend(rgb565.to_le_bytes());
|
||||
}
|
||||
|
||||
tokio::fs::write(FB_PATH, &raw_buffer).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_ui(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
ui_update_rx: Receiver<DisplayState>,
|
||||
) {
|
||||
generic_framebuffer::update_ui(
|
||||
task_tracker,
|
||||
config,
|
||||
Framebuffer,
|
||||
ui_shutdown_rx,
|
||||
ui_update_rx,
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
use rayhunter::diag_device::DiagDeviceError;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::qmdl_store::RecordingStoreError;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum RayhunterError {
|
||||
#[error("Config file parsing error: {0}")]
|
||||
ConfigFileParsingError(#[from] toml::de::Error),
|
||||
#[error("Diag intialization error: {0}")]
|
||||
DiagInitError(DiagDeviceError),
|
||||
#[error("Tokio error: {0}")]
|
||||
TokioError(#[from] tokio::io::Error),
|
||||
#[error("QmdlStore error: {0}")]
|
||||
QmdlStoreError(#[from] RecordingStoreError),
|
||||
#[error("No QMDL store found at path {0}, but can't create a new one due to debug mode")]
|
||||
NoStoreDebugMode(String),
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
use log::{error, info};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::config;
|
||||
use crate::diag::DiagDeviceCtrlMessage;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
KeyDown,
|
||||
KeyUp,
|
||||
}
|
||||
|
||||
const INPUT_EVENT_SIZE: usize = 32;
|
||||
|
||||
pub fn run_key_input_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
config: &config::Config,
|
||||
diag_tx: Sender<DiagDeviceCtrlMessage>,
|
||||
mut ui_shutdown_rx: oneshot::Receiver<()>,
|
||||
) {
|
||||
if config.key_input_mode == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
task_tracker.spawn(async move {
|
||||
// Open the input device
|
||||
let mut file = match File::open("/dev/input/event0").await {
|
||||
Ok(file) => file,
|
||||
Err(e) => {
|
||||
error!("Failed to open /dev/input/event0: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut buffer = [0u8; INPUT_EVENT_SIZE];
|
||||
let mut last_keyup: Option<Instant> = None;
|
||||
let mut last_event_time: Option<Instant> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut ui_shutdown_rx => {
|
||||
info!("received key input shutdown");
|
||||
return;
|
||||
}
|
||||
result = file.read_exact(&mut buffer) => {
|
||||
if let Err(e) = result {
|
||||
error!("failed to read key input: {e}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let event = parse_event(buffer);
|
||||
|
||||
let now = Instant::now();
|
||||
|
||||
// On orbic it was observed that pressing the power button can trigger many successive
|
||||
// events. Drop events that are too close together.
|
||||
if let Some(last_time) = last_event_time {
|
||||
if now.duration_since(last_time) < Duration::from_millis(50) {
|
||||
last_event_time = Some(now);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
last_event_time = Some(now);
|
||||
|
||||
match event {
|
||||
Event::KeyUp => {
|
||||
if let Some(last_keyup_instant) = last_keyup {
|
||||
let elapsed = now.duration_since(last_keyup_instant);
|
||||
|
||||
if elapsed >= Duration::from_millis(100)
|
||||
&& elapsed <= Duration::from_millis(800)
|
||||
{
|
||||
if let Err(e) = diag_tx.send(DiagDeviceCtrlMessage::StopRecording).await
|
||||
{
|
||||
error!("Failed to send StopRecording: {e}");
|
||||
}
|
||||
if let Err(e) =
|
||||
diag_tx.send(DiagDeviceCtrlMessage::StartRecording).await
|
||||
{
|
||||
error!("Failed to send StartRecording: {e}");
|
||||
}
|
||||
last_keyup = None;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
last_keyup = Some(now);
|
||||
}
|
||||
Event::KeyDown => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn parse_event(input: [u8; INPUT_EVENT_SIZE]) -> Event {
|
||||
if input[12] == 0 {
|
||||
Event::KeyUp
|
||||
} else {
|
||||
Event::KeyDown
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_event_keydown_m7350_v5() {
|
||||
let input = [
|
||||
0x57, 0x6c, 0x09, 0x00, 0x7c, 0xfb, 0x03, 0x00, 0x01, 0x00, 0x74, 0x00, 0x01, 0x00,
|
||||
0x00, 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
];
|
||||
assert!(matches!(parse_event(input), Event::KeyDown));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_event_keyup_m7350_v5() {
|
||||
let input = [
|
||||
0x57, 0x6c, 0x09, 0x00, 0x1b, 0x15, 0x05, 0x00, 0x01, 0x00, 0x74, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
];
|
||||
assert!(matches!(parse_event(input), Event::KeyUp));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,322 @@
|
||||
mod analysis;
|
||||
mod config;
|
||||
mod diag;
|
||||
mod display;
|
||||
mod error;
|
||||
mod key_input;
|
||||
mod notifications;
|
||||
mod pcap;
|
||||
mod qmdl_store;
|
||||
mod server;
|
||||
mod stats;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use crate::config::{parse_args, parse_config};
|
||||
use crate::diag::run_diag_read_thread;
|
||||
use crate::error::RayhunterError;
|
||||
use crate::notifications::{run_notification_worker, NotificationService};
|
||||
use crate::pcap::get_pcap;
|
||||
use crate::qmdl_store::RecordingStore;
|
||||
use crate::server::{ServerState, get_config, get_qmdl, get_zip, serve_static, set_config};
|
||||
use crate::stats::{get_qmdl_manifest, get_system_stats};
|
||||
|
||||
use analysis::{
|
||||
AnalysisCtrlMessage, AnalysisStatus, get_analysis_status, run_analysis_thread, start_analysis,
|
||||
};
|
||||
use axum::Router;
|
||||
use axum::response::Redirect;
|
||||
use axum::routing::{get, post};
|
||||
use diag::{
|
||||
DiagDeviceCtrlMessage, delete_all_recordings, delete_recording, get_analysis_report,
|
||||
start_recording, stop_recording,
|
||||
};
|
||||
use log::{error, info};
|
||||
use qmdl_store::RecordingStoreError;
|
||||
use rayhunter::Device;
|
||||
use rayhunter::diag_device::DiagDevice;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
use tokio::sync::{RwLock, oneshot};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
type AppRouter = Router<Arc<ServerState>>;
|
||||
|
||||
fn get_router() -> AppRouter {
|
||||
Router::new()
|
||||
.route("/api/pcap/{name}", get(get_pcap))
|
||||
.route("/api/qmdl/{name}", get(get_qmdl))
|
||||
.route("/api/zip/{name}", get(get_zip))
|
||||
.route("/api/system-stats", get(get_system_stats))
|
||||
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
|
||||
.route("/api/start-recording", post(start_recording))
|
||||
.route("/api/stop-recording", post(stop_recording))
|
||||
.route("/api/delete-recording/{name}", post(delete_recording))
|
||||
.route("/api/delete-all-recordings", post(delete_all_recordings))
|
||||
.route("/api/analysis-report/{name}", get(get_analysis_report))
|
||||
.route("/api/analysis", get(get_analysis_status))
|
||||
.route("/api/analysis/{name}", post(start_analysis))
|
||||
.route("/api/config", get(get_config))
|
||||
.route("/api/config", post(set_config))
|
||||
.route("/", get(|| async { Redirect::permanent("/index.html") }))
|
||||
.route("/{*path}", get(serve_static))
|
||||
}
|
||||
|
||||
// Runs the axum server, taking all the elements needed to build up our
|
||||
// ServerState and a oneshot Receiver that'll fire when it's time to shutdown
|
||||
// (i.e. user hit ctrl+c)
|
||||
async fn run_server(
|
||||
task_tracker: &TaskTracker,
|
||||
state: Arc<ServerState>,
|
||||
server_shutdown_rx: oneshot::Receiver<()>,
|
||||
) -> JoinHandle<()> {
|
||||
info!("spinning up server");
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], state.config.port));
|
||||
let listener = TcpListener::bind(&addr).await.unwrap();
|
||||
let app = get_router().with_state(state);
|
||||
|
||||
task_tracker.spawn(async move {
|
||||
info!("The orca is hunting for stingrays...");
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(server_shutdown_signal(server_shutdown_rx))
|
||||
.await
|
||||
.unwrap();
|
||||
})
|
||||
}
|
||||
|
||||
async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {
|
||||
server_shutdown_rx.await.unwrap();
|
||||
info!("Server received shutdown signal, exiting...");
|
||||
}
|
||||
|
||||
// Loads a RecordingStore if one exists, and if not, only create one if we're
|
||||
// not in debug mode. If we fail to parse the manifest AND we're not in debug
|
||||
// mode, try to recover the manifest from the existing QMDL files
|
||||
async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, RayhunterError> {
|
||||
let store_exists = RecordingStore::exists(&config.qmdl_store_path).await?;
|
||||
if config.debug_mode {
|
||||
if store_exists {
|
||||
Ok(RecordingStore::load(&config.qmdl_store_path).await?)
|
||||
} else {
|
||||
Err(RayhunterError::NoStoreDebugMode(
|
||||
config.qmdl_store_path.clone(),
|
||||
))
|
||||
}
|
||||
} else if store_exists {
|
||||
match RecordingStore::load(&config.qmdl_store_path).await {
|
||||
Ok(store) => Ok(store),
|
||||
Err(RecordingStoreError::ParseManifestError(err)) => {
|
||||
error!("failed to parse QMDL manifest: {err}");
|
||||
info!("recovering manifest from existing QMDL files...");
|
||||
Ok(RecordingStore::recover(&config.qmdl_store_path).await?)
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
} else {
|
||||
Ok(RecordingStore::create(&config.qmdl_store_path).await?)
|
||||
}
|
||||
}
|
||||
|
||||
// Start a thread that'll track when user hits ctrl+c. When that happens,
|
||||
// trigger various cleanup tasks, including sending signals to other threads to
|
||||
// shutdown
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn run_shutdown_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
|
||||
daemon_restart_rx: oneshot::Receiver<()>,
|
||||
should_restart_flag: Arc<AtomicBool>,
|
||||
server_shutdown_tx: oneshot::Sender<()>,
|
||||
maybe_ui_shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
maybe_key_input_shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
|
||||
analysis_tx: Sender<AnalysisCtrlMessage>,
|
||||
) -> JoinHandle<Result<(), RayhunterError>> {
|
||||
info!("create shutdown thread");
|
||||
|
||||
task_tracker.spawn(async move {
|
||||
select! {
|
||||
res = tokio::signal::ctrl_c() => {
|
||||
if let Err(err) = res {
|
||||
error!("Unable to listen for shutdown signal: {err}");
|
||||
}
|
||||
|
||||
should_restart_flag.store(false, Ordering::Relaxed);
|
||||
}
|
||||
res = daemon_restart_rx => {
|
||||
if let Err(err) = res {
|
||||
error!("Unable to listen for shutdown signal: {err}");
|
||||
}
|
||||
|
||||
should_restart_flag.store(true, Ordering::Relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||
if qmdl_store.current_entry.is_some() {
|
||||
info!("Closing current QMDL entry...");
|
||||
qmdl_store.close_current_entry().await?;
|
||||
info!("Done!");
|
||||
}
|
||||
|
||||
server_shutdown_tx
|
||||
.send(())
|
||||
.expect("couldn't send server shutdown signal");
|
||||
if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx {
|
||||
let _ = ui_shutdown_tx.send(());
|
||||
}
|
||||
if let Some(key_input_shutdown_tx) = maybe_key_input_shutdown_tx {
|
||||
let _ = key_input_shutdown_tx.send(());
|
||||
}
|
||||
diag_device_sender
|
||||
.send(DiagDeviceCtrlMessage::Exit)
|
||||
.await
|
||||
.expect("couldn't send Exit message to diag thread");
|
||||
analysis_tx
|
||||
.send(AnalysisCtrlMessage::Exit)
|
||||
.await
|
||||
.expect("couldn't send Exit message to analysis thread");
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<(), RayhunterError> {
|
||||
env_logger::init();
|
||||
|
||||
rustls_rustcrypto::provider()
|
||||
.install_default()
|
||||
.expect("Couldn't install rustcrypto provider");
|
||||
|
||||
let args = parse_args();
|
||||
|
||||
loop {
|
||||
let config = parse_config(&args.config_path).await?;
|
||||
if !run_with_config(&args, config).await? {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_with_config(
|
||||
args: &config::Args,
|
||||
config: config::Config,
|
||||
) -> Result<bool, RayhunterError> {
|
||||
// TaskTrackers give us an interface to spawn tokio threads, and then
|
||||
// eventually await all of them ending
|
||||
let task_tracker = TaskTracker::new();
|
||||
println!("R A Y H U N T E R 🐳");
|
||||
|
||||
let store = init_qmdl_store(&config).await?;
|
||||
let analysis_status = AnalysisStatus::new(&store);
|
||||
let qmdl_store_lock = Arc::new(RwLock::new(store));
|
||||
let (diag_tx, diag_rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
|
||||
let (ui_update_tx, ui_update_rx) = mpsc::channel::<display::DisplayState>(1);
|
||||
let (analysis_tx, analysis_rx) = mpsc::channel::<AnalysisCtrlMessage>(5);
|
||||
let mut maybe_ui_shutdown_tx = None;
|
||||
let mut maybe_key_input_shutdown_tx = None;
|
||||
|
||||
let notification_service = NotificationService::new(config.ntfy_topic.clone());
|
||||
|
||||
if !config.debug_mode {
|
||||
let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel();
|
||||
maybe_ui_shutdown_tx = Some(ui_shutdown_tx);
|
||||
info!("Using configuration for device: {0:?}", config.device);
|
||||
let mut dev = DiagDevice::new(&config.device)
|
||||
.await
|
||||
.map_err(RayhunterError::DiagInitError)?;
|
||||
dev.config_logs()
|
||||
.await
|
||||
.map_err(RayhunterError::DiagInitError)?;
|
||||
|
||||
info!("Starting Diag Thread");
|
||||
run_diag_read_thread(
|
||||
&task_tracker,
|
||||
dev,
|
||||
diag_rx,
|
||||
diag_tx.clone(),
|
||||
ui_update_tx.clone(),
|
||||
qmdl_store_lock.clone(),
|
||||
analysis_tx.clone(),
|
||||
config.analyzers.clone(),
|
||||
notification_service.new_handler(),
|
||||
);
|
||||
info!("Starting UI");
|
||||
|
||||
let update_ui = match &config.device {
|
||||
Device::Orbic => display::orbic::update_ui,
|
||||
Device::Tplink => display::tplink::update_ui,
|
||||
Device::Tmobile => display::tmobile::update_ui,
|
||||
Device::Wingtech => display::wingtech::update_ui,
|
||||
Device::Pinephone => display::headless::update_ui,
|
||||
Device::Uz801 => display::uz801::update_ui,
|
||||
};
|
||||
update_ui(&task_tracker, &config, ui_shutdown_rx, ui_update_rx);
|
||||
|
||||
info!("Starting Key Input service");
|
||||
let (key_input_shutdown_tx, key_input_shutdown_rx) = oneshot::channel();
|
||||
maybe_key_input_shutdown_tx = Some(key_input_shutdown_tx);
|
||||
key_input::run_key_input_thread(
|
||||
&task_tracker,
|
||||
&config,
|
||||
diag_tx.clone(),
|
||||
key_input_shutdown_rx,
|
||||
);
|
||||
}
|
||||
|
||||
let (daemon_restart_tx, daemon_restart_rx) = oneshot::channel::<()>();
|
||||
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
|
||||
let analysis_status_lock = Arc::new(RwLock::new(analysis_status));
|
||||
run_analysis_thread(
|
||||
&task_tracker,
|
||||
analysis_rx,
|
||||
qmdl_store_lock.clone(),
|
||||
analysis_status_lock.clone(),
|
||||
config.analyzers.clone(),
|
||||
);
|
||||
let should_restart_flag = Arc::new(AtomicBool::new(false));
|
||||
|
||||
run_shutdown_thread(
|
||||
&task_tracker,
|
||||
diag_tx.clone(),
|
||||
daemon_restart_rx,
|
||||
should_restart_flag.clone(),
|
||||
server_shutdown_tx,
|
||||
maybe_ui_shutdown_tx,
|
||||
maybe_key_input_shutdown_tx,
|
||||
qmdl_store_lock.clone(),
|
||||
analysis_tx.clone(),
|
||||
);
|
||||
run_notification_worker(&task_tracker, notification_service);
|
||||
let state = Arc::new(ServerState {
|
||||
config_path: args.config_path.clone(),
|
||||
config,
|
||||
qmdl_store_lock: qmdl_store_lock.clone(),
|
||||
diag_device_ctrl_sender: diag_tx,
|
||||
analysis_status_lock,
|
||||
analysis_sender: analysis_tx,
|
||||
daemon_restart_tx: Arc::new(RwLock::new(Some(daemon_restart_tx))),
|
||||
});
|
||||
run_server(&task_tracker, state, server_shutdown_rx).await;
|
||||
|
||||
task_tracker.close();
|
||||
task_tracker.wait().await;
|
||||
|
||||
info!("see you space cowboy...");
|
||||
Ok(should_restart_flag.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_router() {
|
||||
// assert that creating the router does not panic from invalid route patterns.
|
||||
let _ = get_router();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
use std::{
|
||||
cmp::min,
|
||||
collections::HashMap,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use log::error;
|
||||
use tokio::sync::mpsc::{self, error::TryRecvError};
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
static NTFY_BASE_URL: &str = "https://ntfy.sh/";
|
||||
|
||||
pub struct Notification {
|
||||
message_type: String,
|
||||
message: String,
|
||||
debounce: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Notification {
|
||||
pub fn new(message_type: String, message: String, debounce: Option<Duration>) -> Self {
|
||||
Notification {
|
||||
message_type,
|
||||
message,
|
||||
debounce,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct NotificationStatus {
|
||||
message: String,
|
||||
needs_sending: bool,
|
||||
last_sent: Option<Instant>,
|
||||
last_attempt: Option<Instant>,
|
||||
failed_since_last_success: u32,
|
||||
}
|
||||
|
||||
pub struct NotificationService {
|
||||
channel_name: Option<String>,
|
||||
tx: mpsc::Sender<Notification>,
|
||||
rx: mpsc::Receiver<Notification>,
|
||||
}
|
||||
|
||||
impl NotificationService {
|
||||
pub fn new(channel_name: Option<String>) -> Self {
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
Self {
|
||||
channel_name,
|
||||
tx,
|
||||
rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_handler(&self) -> mpsc::Sender<Notification> {
|
||||
self.tx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_notification_worker(
|
||||
task_tracker: &TaskTracker,
|
||||
mut notification_service: NotificationService,
|
||||
) {
|
||||
task_tracker.spawn(async move {
|
||||
let channel_name = notification_service.channel_name.unwrap_or("".into());
|
||||
|
||||
if !channel_name.is_empty() {
|
||||
let mut notification_statuses = HashMap::new();
|
||||
let http_client = reqwest::Client::new();
|
||||
|
||||
loop {
|
||||
// Get any notifications since the last time we checked
|
||||
loop {
|
||||
match notification_service.rx.try_recv() {
|
||||
Ok(notification) => {
|
||||
let status = notification_statuses
|
||||
.entry(notification.message_type)
|
||||
.or_insert_with(|| NotificationStatus {
|
||||
message: "".to_string(),
|
||||
needs_sending: true,
|
||||
last_sent: None,
|
||||
last_attempt: None,
|
||||
failed_since_last_success: 0,
|
||||
});
|
||||
// Ignore if we're in the debounce period
|
||||
if let Some(debounce) = notification.debounce {
|
||||
if let Some(last_sent) = status.last_sent {
|
||||
if last_sent.elapsed() < debounce {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
status.message = notification.message;
|
||||
status.needs_sending = true;
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to send pending notifications
|
||||
for notification in notification_statuses.values_mut() {
|
||||
if !notification.needs_sending {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Backoff retries, up to a maximum of 256 seconds.
|
||||
if let Some(last_attempt) = notification.last_attempt {
|
||||
let min_wait_time = Duration::from_secs(
|
||||
2u64.pow(min(notification.failed_since_last_success, 8)),
|
||||
);
|
||||
if last_attempt.elapsed() < min_wait_time {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match http_client
|
||||
.post(format!("{NTFY_BASE_URL}{channel_name}"))
|
||||
.body(notification.message.clone())
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
if response.status().is_success() {
|
||||
notification.last_sent = Some(Instant::now());
|
||||
notification.failed_since_last_success = 0;
|
||||
notification.needs_sending = false;
|
||||
} else {
|
||||
notification.failed_since_last_success += 1;
|
||||
notification.last_attempt = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to send notification to ntfy: {e}");
|
||||
notification.failed_since_last_success += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
}
|
||||
}
|
||||
// If there's no channel name we'll just discard the notifications
|
||||
else {
|
||||
loop {
|
||||
notification_service.rx.recv().await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
use crate::ServerState;
|
||||
|
||||
use anyhow::Error;
|
||||
use axum::body::Body;
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::http::header::CONTENT_TYPE;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use log::error;
|
||||
use rayhunter::diag::DataType;
|
||||
use rayhunter::gsmtap_parser;
|
||||
use rayhunter::pcap::GsmtapPcapWriter;
|
||||
use rayhunter::qmdl::QmdlReader;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, duplex};
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
|
||||
// written so far. This is done by spawning a thread which streams chunks of
|
||||
// pcap data to a channel that's piped to the client.
|
||||
pub async fn get_pcap(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(mut qmdl_name): Path<String>,
|
||||
) -> Result<Response, (StatusCode, String)> {
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
if qmdl_name.ends_with("pcapng") {
|
||||
qmdl_name = qmdl_name.trim_end_matches(".pcapng").to_string();
|
||||
}
|
||||
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name).ok_or((
|
||||
StatusCode::NOT_FOUND,
|
||||
format!("couldn't find manifest entry with name {qmdl_name}"),
|
||||
))?;
|
||||
if entry.qmdl_size_bytes == 0 {
|
||||
return Err((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"QMDL file is empty, try again in a bit!".to_string(),
|
||||
));
|
||||
}
|
||||
let qmdl_size_bytes = entry.qmdl_size_bytes;
|
||||
let qmdl_file = qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
|
||||
// the QMDL reader should stop at the last successfully written data chunk
|
||||
// (entry.size_bytes)
|
||||
let (reader, writer) = duplex(1024);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await {
|
||||
error!("failed to generate PCAP: {e:?}");
|
||||
}
|
||||
});
|
||||
|
||||
let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")];
|
||||
let body = Body::from_stream(ReaderStream::new(reader));
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
|
||||
pub async fn generate_pcap_data<R, W>(
|
||||
writer: W,
|
||||
qmdl_file: R,
|
||||
qmdl_size_bytes: usize,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
W: AsyncWrite + Unpin + Send,
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
|
||||
pcap_writer.write_iface_header().await?;
|
||||
|
||||
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
|
||||
while let Some(container) = reader.get_next_messages_container().await? {
|
||||
if container.data_type != DataType::UserSpace {
|
||||
continue;
|
||||
}
|
||||
|
||||
for maybe_msg in container.into_messages() {
|
||||
match maybe_msg {
|
||||
Ok(msg) => {
|
||||
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
|
||||
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
|
||||
pcap_writer
|
||||
.write_gsmtap_message(gsmtap_msg, timestamp)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("error parsing message: {e:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,520 @@
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use chrono::{DateTime, Local};
|
||||
use log::{info, warn};
|
||||
use rayhunter::util::RuntimeMetadata;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
fs::{self, File, OpenOptions, try_exists},
|
||||
io::AsyncWriteExt,
|
||||
};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum RecordingStoreError {
|
||||
#[error("Can't close an entry when there's no current entry")]
|
||||
NoCurrentEntry,
|
||||
#[error("An entry with that name doesn't exist")]
|
||||
NoSuchEntryError,
|
||||
#[error("Couldn't create file: {0}")]
|
||||
CreateFileError(tokio::io::Error),
|
||||
#[error("Couldn't read file: {0}")]
|
||||
ReadFileError(tokio::io::Error),
|
||||
#[error("Couldn't delete file: {0}")]
|
||||
DeleteFileError(tokio::io::Error),
|
||||
#[error("Couldn't open directory at path: {0}")]
|
||||
OpenDirError(tokio::io::Error),
|
||||
#[error("Couldn't read manifest file: {0}")]
|
||||
ReadManifestError(tokio::io::Error),
|
||||
#[error("Couldn't write manifest file: {0}")]
|
||||
WriteManifestError(tokio::io::Error),
|
||||
#[error("Couldn't parse QMDL store manifest file: {0}")]
|
||||
ParseManifestError(toml::de::Error),
|
||||
}
|
||||
|
||||
pub struct RecordingStore {
|
||||
pub path: PathBuf,
|
||||
pub manifest: Manifest,
|
||||
pub current_entry: Option<usize>, // index into manifest
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
|
||||
pub struct Manifest {
|
||||
pub entries: Vec<ManifestEntry>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Debug)]
|
||||
pub struct ManifestEntry {
|
||||
pub name: String,
|
||||
pub start_time: DateTime<Local>,
|
||||
pub last_message_time: Option<DateTime<Local>>,
|
||||
pub qmdl_size_bytes: usize,
|
||||
pub rayhunter_version: Option<String>,
|
||||
pub system_os: Option<String>,
|
||||
pub arch: Option<String>,
|
||||
}
|
||||
|
||||
impl ManifestEntry {
|
||||
fn new() -> Self {
|
||||
let now = Local::now();
|
||||
let metadata = RuntimeMetadata::new();
|
||||
ManifestEntry {
|
||||
name: format!("{}", now.timestamp()),
|
||||
start_time: now,
|
||||
last_message_time: None,
|
||||
qmdl_size_bytes: 0,
|
||||
rayhunter_version: Some(metadata.rayhunter_version),
|
||||
system_os: Some(metadata.system_os),
|
||||
arch: Some(metadata.arch),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
||||
let mut filepath = path.as_ref().join(&self.name);
|
||||
filepath.set_extension("qmdl");
|
||||
filepath
|
||||
}
|
||||
|
||||
pub fn get_analysis_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
||||
let mut filepath = path.as_ref().join(&self.name);
|
||||
filepath.set_extension("ndjson");
|
||||
filepath
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordingStore {
|
||||
// Returns whether a directory with a "manifest.toml" exists at the given
|
||||
// path (though doesn't check if that manifest is valid)
|
||||
pub async fn exists<P>(path: P) -> Result<bool, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let manifest_path = path.as_ref().join("manifest.toml");
|
||||
let dir_exists = try_exists(path)
|
||||
.await
|
||||
.map_err(RecordingStoreError::OpenDirError)?;
|
||||
let manifest_exists = try_exists(manifest_path)
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadManifestError)?;
|
||||
Ok(dir_exists && manifest_exists)
|
||||
}
|
||||
|
||||
// Loads an existing RecordingStore at the given path. Errors if no store exists,
|
||||
// or if it's malformed.
|
||||
pub async fn load<P>(path: P) -> Result<Self, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let path: PathBuf = path.as_ref().to_path_buf();
|
||||
let manifest = RecordingStore::read_manifest(&path).await?;
|
||||
Ok(RecordingStore {
|
||||
path,
|
||||
manifest,
|
||||
current_entry: None,
|
||||
})
|
||||
}
|
||||
|
||||
// Creates a new RecordingStore at the given path. This involves creating a dir
|
||||
// and writing an empty manifest.
|
||||
pub async fn create<P>(path: P) -> Result<Self, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
fs::create_dir_all(&path)
|
||||
.await
|
||||
.map_err(RecordingStoreError::OpenDirError)?;
|
||||
|
||||
let mut store = RecordingStore {
|
||||
path: path.as_ref().to_owned(),
|
||||
manifest: Manifest {
|
||||
entries: Vec::new(),
|
||||
},
|
||||
current_entry: None,
|
||||
};
|
||||
|
||||
store.write_manifest().await?;
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// Does a best-effort attempt to recover the manifest from a directory of
|
||||
// QMDL files. We expect these files to be named like "<timestamp>.qmdl",
|
||||
// and skip any files which don't match that pattern.
|
||||
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let mut dir_entries = fs::read_dir(path.as_ref())
|
||||
.await
|
||||
.map_err(RecordingStoreError::OpenDirError)?;
|
||||
let mut manifest_entries = Vec::new();
|
||||
|
||||
while let Some(entry) = dir_entries
|
||||
.next_entry()
|
||||
.await
|
||||
.map_err(RecordingStoreError::OpenDirError)?
|
||||
{
|
||||
let os_filename = entry.file_name();
|
||||
let Some(filename) = os_filename.to_str() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if !filename.ends_with(".qmdl") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let stem = filename.trim_end_matches(".qmdl");
|
||||
let Ok(start_timestamp) = stem.parse::<i64>() else {
|
||||
warn!("QMDL file has invalid name {os_filename:?}, skipping");
|
||||
continue;
|
||||
};
|
||||
|
||||
let metadata = match entry.metadata().await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) => {
|
||||
warn!("failed to read QMDL file metadata: {err:?}, skipping");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(start_time) = DateTime::from_timestamp(start_timestamp, 0) else {
|
||||
warn!("QMDL filename {os_filename:?} gave an invalid timestamp, skipping");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Ok(last_message_time) = metadata.modified() else {
|
||||
warn!("failed to get modified time for QMDL file {os_filename:?}, skipping");
|
||||
continue;
|
||||
};
|
||||
|
||||
info!("successfully recovered QMDL entry {os_filename:?}!");
|
||||
manifest_entries.push(ManifestEntry {
|
||||
name: stem.to_string(),
|
||||
start_time: start_time.into(),
|
||||
last_message_time: Some(last_message_time.into()),
|
||||
qmdl_size_bytes: metadata.size() as usize,
|
||||
rayhunter_version: None,
|
||||
system_os: None,
|
||||
arch: None,
|
||||
});
|
||||
}
|
||||
|
||||
// sort chronologically
|
||||
manifest_entries.sort_by(|a, b| a.start_time.cmp(&b.start_time));
|
||||
|
||||
let mut store = RecordingStore {
|
||||
path: path.as_ref().to_path_buf(),
|
||||
manifest: Manifest {
|
||||
entries: manifest_entries,
|
||||
},
|
||||
current_entry: None,
|
||||
};
|
||||
store.write_manifest().await?;
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
async fn read_manifest<P>(path: P) -> Result<Manifest, RecordingStoreError>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let manifest_path = path.as_ref().join("manifest.toml");
|
||||
let file_contents = fs::read_to_string(&manifest_path)
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadManifestError)?;
|
||||
toml::from_str(&file_contents).map_err(RecordingStoreError::ParseManifestError)
|
||||
}
|
||||
|
||||
// Closes the current entry (if needed), creates a new entry based on the
|
||||
// current time, and updates the manifest. Returns a tuple of the entry's
|
||||
// newly created QMDL file and analysis file.
|
||||
pub async fn new_entry(&mut self) -> Result<(File, File), RecordingStoreError> {
|
||||
// if we've already got an entry open, close it
|
||||
if self.current_entry.is_some() {
|
||||
self.close_current_entry().await?;
|
||||
}
|
||||
let new_entry = ManifestEntry::new();
|
||||
let qmdl_filepath = new_entry.get_qmdl_filepath(&self.path);
|
||||
let qmdl_file = File::create(&qmdl_filepath)
|
||||
.await
|
||||
.map_err(RecordingStoreError::CreateFileError)?;
|
||||
let analysis_filepath = new_entry.get_analysis_filepath(&self.path);
|
||||
let analysis_file = File::create(&analysis_filepath)
|
||||
.await
|
||||
.map_err(RecordingStoreError::CreateFileError)?;
|
||||
self.manifest.entries.push(new_entry);
|
||||
self.current_entry = Some(self.manifest.entries.len() - 1);
|
||||
self.write_manifest().await?;
|
||||
Ok((qmdl_file, analysis_file))
|
||||
}
|
||||
|
||||
// Returns the corresponding QMDL file for a given entry
|
||||
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> {
|
||||
let entry = &self.manifest.entries[entry_index];
|
||||
File::open(entry.get_qmdl_filepath(&self.path))
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadFileError)
|
||||
}
|
||||
|
||||
// Returns the corresponding QMDL file for a given entry
|
||||
pub async fn open_entry_analysis(
|
||||
&self,
|
||||
entry_index: usize,
|
||||
) -> Result<File, RecordingStoreError> {
|
||||
let entry = &self.manifest.entries[entry_index];
|
||||
File::open(entry.get_analysis_filepath(&self.path))
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadFileError)
|
||||
}
|
||||
|
||||
pub async fn clear_and_open_entry_analysis(
|
||||
&mut self,
|
||||
entry_index: usize,
|
||||
) -> Result<File, RecordingStoreError> {
|
||||
let entry = &self.manifest.entries[entry_index];
|
||||
let file = OpenOptions::new()
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(entry.get_analysis_filepath(&self.path))
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadFileError)?;
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
// Unsets the current entry
|
||||
pub async fn close_current_entry(&mut self) -> Result<(), RecordingStoreError> {
|
||||
match self.current_entry {
|
||||
Some(_) => {
|
||||
self.current_entry = None;
|
||||
Ok(())
|
||||
}
|
||||
None => Err(RecordingStoreError::NoCurrentEntry),
|
||||
}
|
||||
}
|
||||
|
||||
// Sets the given entry's size and updates the last_message_time to now, updating the manifest
|
||||
pub async fn update_entry_qmdl_size(
|
||||
&mut self,
|
||||
entry_index: usize,
|
||||
size_bytes: usize,
|
||||
) -> Result<(), RecordingStoreError> {
|
||||
self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes;
|
||||
self.manifest.entries[entry_index].last_message_time = Some(Local::now());
|
||||
self.write_manifest().await
|
||||
}
|
||||
|
||||
async fn write_manifest(&mut self) -> Result<(), RecordingStoreError> {
|
||||
// we don't technically need a mutable reference to `self` here, but it
|
||||
// does prevent multiple concurrent writes across different threads
|
||||
let tmp_path = self.path.join("manifest.toml.new");
|
||||
let mut manifest_tmp_file = File::create(&tmp_path)
|
||||
.await
|
||||
.map_err(RecordingStoreError::WriteManifestError)?;
|
||||
|
||||
let manifest_contents =
|
||||
toml::to_string_pretty(&self.manifest).expect("failed to serialize manifest");
|
||||
manifest_tmp_file
|
||||
.write_all(manifest_contents.as_bytes())
|
||||
.await
|
||||
.map_err(RecordingStoreError::WriteManifestError)?;
|
||||
|
||||
fs::rename(tmp_path, self.path.join("manifest.toml"))
|
||||
.await
|
||||
.map_err(RecordingStoreError::WriteManifestError)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Finds an entry by filename
|
||||
pub fn entry_for_name(&self, name: &str) -> Option<(usize, &ManifestEntry)> {
|
||||
let entry_index = self
|
||||
.manifest
|
||||
.entries
|
||||
.iter()
|
||||
.position(|entry| entry.name == name)?;
|
||||
Some((entry_index, &self.manifest.entries[entry_index]))
|
||||
}
|
||||
|
||||
pub fn get_current_entry(&self) -> Option<(usize, &ManifestEntry)> {
|
||||
let entry_index = self.current_entry?;
|
||||
Some((entry_index, &self.manifest.entries[entry_index]))
|
||||
}
|
||||
|
||||
pub fn is_current_entry(&self, name: &str) -> bool {
|
||||
match self.current_entry {
|
||||
Some(idx) => match self.manifest.entries.get(idx) {
|
||||
Some(entry) => entry.name == name,
|
||||
None => false,
|
||||
},
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_entry(&mut self, name: &str) -> Result<(), RecordingStoreError> {
|
||||
let entry_to_delete_idx = self
|
||||
.manifest
|
||||
.entries
|
||||
.iter()
|
||||
.position(|entry| entry.name == name)
|
||||
.ok_or(RecordingStoreError::NoSuchEntryError)?;
|
||||
match self.current_entry {
|
||||
Some(current_entry) if current_entry == entry_to_delete_idx => {
|
||||
self.close_current_entry().await?;
|
||||
}
|
||||
Some(current_entry) => {
|
||||
self.current_entry = Some(current_entry - 1);
|
||||
}
|
||||
None => {}
|
||||
};
|
||||
let entry_to_delete = self.manifest.entries.remove(entry_to_delete_idx);
|
||||
self.write_manifest().await?;
|
||||
let qmdl_filepath = entry_to_delete.get_qmdl_filepath(&self.path);
|
||||
let analysis_filepath = entry_to_delete.get_analysis_filepath(&self.path);
|
||||
remove_file_if_exists(&qmdl_filepath)
|
||||
.await
|
||||
.map_err(RecordingStoreError::DeleteFileError)?;
|
||||
remove_file_if_exists(&analysis_filepath)
|
||||
.await
|
||||
.map_err(RecordingStoreError::DeleteFileError)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_all_entries(&mut self) -> Result<(), RecordingStoreError> {
|
||||
if self.current_entry.is_some() {
|
||||
self.close_current_entry().await?;
|
||||
}
|
||||
|
||||
let mut keep = Vec::new();
|
||||
|
||||
for entry in &self.manifest.entries {
|
||||
let qmdl_filepath = entry.get_qmdl_filepath(&self.path);
|
||||
let analysis_filepath = entry.get_analysis_filepath(&self.path);
|
||||
|
||||
if let Err(e) = remove_file_if_exists(&qmdl_filepath).await {
|
||||
log::warn!("failed to remove {qmdl_filepath:?}: {e:?}");
|
||||
keep.push(true);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = remove_file_if_exists(&analysis_filepath).await {
|
||||
log::warn!("failed to remove {analysis_filepath:?}: {e:?}");
|
||||
keep.push(true);
|
||||
continue;
|
||||
}
|
||||
|
||||
keep.push(false);
|
||||
}
|
||||
|
||||
let mut keep_iter = keep.into_iter();
|
||||
self.manifest.entries.retain(|_| keep_iter.next().unwrap());
|
||||
self.write_manifest().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_file_if_exists(path: &Path) -> Result<(), io::Error> {
|
||||
match tokio::fs::remove_file(path).await {
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
|
||||
res => res,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::{Builder, TempDir};
|
||||
|
||||
fn make_temp_dir() -> TempDir {
|
||||
Builder::new().prefix("qmdl_store_test").tempdir().unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_load_from_empty_dir() {
|
||||
let dir = make_temp_dir();
|
||||
assert!(!RecordingStore::exists(dir.path()).await.unwrap());
|
||||
let _created_store = RecordingStore::create(dir.path()).await.unwrap();
|
||||
assert!(RecordingStore::exists(dir.path()).await.unwrap());
|
||||
let loaded_store = RecordingStore::load(dir.path()).await.unwrap();
|
||||
assert_eq!(loaded_store.manifest.entries.len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_creating_updating_and_closing_entries() {
|
||||
let dir = make_temp_dir();
|
||||
let mut store = RecordingStore::create(dir.path()).await.unwrap();
|
||||
let _ = store.new_entry().await.unwrap();
|
||||
let entry_index = store.current_entry.unwrap();
|
||||
assert_eq!(
|
||||
RecordingStore::read_manifest(dir.path()).await.unwrap(),
|
||||
store.manifest
|
||||
);
|
||||
assert!(
|
||||
store.manifest.entries[entry_index]
|
||||
.last_message_time
|
||||
.is_none()
|
||||
);
|
||||
|
||||
store
|
||||
.update_entry_qmdl_size(entry_index, 1000)
|
||||
.await
|
||||
.unwrap();
|
||||
let (entry_index, entry) = store
|
||||
.entry_for_name(&store.manifest.entries[entry_index].name)
|
||||
.unwrap();
|
||||
assert!(entry.last_message_time.is_some());
|
||||
assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000);
|
||||
assert_eq!(
|
||||
RecordingStore::read_manifest(dir.path()).await.unwrap(),
|
||||
store.manifest
|
||||
);
|
||||
|
||||
store.close_current_entry().await.unwrap();
|
||||
assert!(matches!(
|
||||
store.close_current_entry().await,
|
||||
Err(RecordingStoreError::NoCurrentEntry)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_on_existing_store() {
|
||||
let dir = make_temp_dir();
|
||||
let mut store = RecordingStore::create(dir.path()).await.unwrap();
|
||||
let _ = store.new_entry().await.unwrap();
|
||||
let entry_index = store.current_entry.unwrap();
|
||||
store
|
||||
.update_entry_qmdl_size(entry_index, 1000)
|
||||
.await
|
||||
.unwrap();
|
||||
let store = RecordingStore::create(dir.path()).await.unwrap();
|
||||
assert_eq!(store.manifest.entries.len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_repeated_new_entries() {
|
||||
let dir = make_temp_dir();
|
||||
let mut store = RecordingStore::create(dir.path()).await.unwrap();
|
||||
let _ = store.new_entry().await.unwrap();
|
||||
let entry_index = store.current_entry.unwrap();
|
||||
let _ = store.new_entry().await.unwrap();
|
||||
let new_entry_index = store.current_entry.unwrap();
|
||||
assert_ne!(entry_index, new_entry_index);
|
||||
assert_eq!(store.manifest.entries.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_all_entries() {
|
||||
let dir = make_temp_dir();
|
||||
let mut store = RecordingStore::create(dir.path()).await.unwrap();
|
||||
let _ = store.new_entry().await.unwrap();
|
||||
assert!(store.current_entry.is_some());
|
||||
|
||||
store.delete_all_entries().await.unwrap();
|
||||
assert!(store.current_entry.is_none());
|
||||
|
||||
// regression test: deleting all entries should also work when there's no current
|
||||
// recording.
|
||||
store.delete_all_entries().await.unwrap();
|
||||
assert!(store.current_entry.is_none());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,345 @@
|
||||
use anyhow::Error;
|
||||
use async_zip::Compression;
|
||||
use async_zip::ZipEntryBuilder;
|
||||
use async_zip::tokio::write::ZipFileWriter;
|
||||
use axum::Json;
|
||||
use axum::body::Body;
|
||||
use axum::extract::Path;
|
||||
use axum::extract::State;
|
||||
use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE};
|
||||
use axum::http::{HeaderValue, StatusCode};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use log::{error, warn};
|
||||
use std::sync::Arc;
|
||||
use tokio::fs::write;
|
||||
use tokio::io::{AsyncReadExt, copy, duplex};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::{RwLock, oneshot};
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use crate::DiagDeviceCtrlMessage;
|
||||
use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
|
||||
use crate::config::Config;
|
||||
use crate::pcap::generate_pcap_data;
|
||||
use crate::qmdl_store::RecordingStore;
|
||||
|
||||
pub struct ServerState {
|
||||
pub config_path: String,
|
||||
pub config: Config,
|
||||
pub qmdl_store_lock: Arc<RwLock<RecordingStore>>,
|
||||
pub diag_device_ctrl_sender: Sender<DiagDeviceCtrlMessage>,
|
||||
pub analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
|
||||
pub analysis_sender: Sender<AnalysisCtrlMessage>,
|
||||
pub daemon_restart_tx: Arc<RwLock<Option<oneshot::Sender<()>>>>,
|
||||
}
|
||||
|
||||
pub async fn get_qmdl(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(qmdl_name): Path<String>,
|
||||
) -> Result<Response, (StatusCode, String)> {
|
||||
let qmdl_idx = qmdl_name.trim_end_matches(".qmdl");
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
let (entry_index, entry) = qmdl_store.entry_for_name(qmdl_idx).ok_or((
|
||||
StatusCode::NOT_FOUND,
|
||||
format!("couldn't find qmdl file with name {qmdl_idx}"),
|
||||
))?;
|
||||
let qmdl_file = qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("error opening QMDL file: {err}"),
|
||||
)
|
||||
})?;
|
||||
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
|
||||
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
|
||||
|
||||
let headers = [
|
||||
(CONTENT_TYPE, "application/octet-stream"),
|
||||
(CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()),
|
||||
];
|
||||
let body = Body::from_stream(qmdl_stream);
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
|
||||
pub async fn serve_static(
|
||||
State(_): State<Arc<ServerState>>,
|
||||
Path(path): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
let path = path.trim_start_matches('/');
|
||||
|
||||
match path {
|
||||
"rayhunter_icon.png" => (
|
||||
[(header::CONTENT_TYPE, HeaderValue::from_static("image/png"))],
|
||||
include_bytes!("../web/build/rayhunter_icon.png"),
|
||||
)
|
||||
.into_response(),
|
||||
"rayhunter_orca_only.png" => (
|
||||
[(header::CONTENT_TYPE, HeaderValue::from_static("image/png"))],
|
||||
include_bytes!("../web/build/rayhunter_orca_only.png"),
|
||||
)
|
||||
.into_response(),
|
||||
"rayhunter_text.png" => (
|
||||
[(header::CONTENT_TYPE, HeaderValue::from_static("image/png"))],
|
||||
include_bytes!("../web/build/rayhunter_text.png"),
|
||||
)
|
||||
.into_response(),
|
||||
"favicon.png" => (
|
||||
[(header::CONTENT_TYPE, HeaderValue::from_static("image/png"))],
|
||||
include_bytes!("../web/build/favicon.png"),
|
||||
)
|
||||
.into_response(),
|
||||
"index.html" => (
|
||||
[
|
||||
(header::CONTENT_TYPE, HeaderValue::from_static("text/html")),
|
||||
(header::CONTENT_ENCODING, HeaderValue::from_static("gzip")),
|
||||
],
|
||||
include_bytes!("../web/build/index.html.gz"),
|
||||
)
|
||||
.into_response(),
|
||||
path => {
|
||||
warn!("404 on path: {path}");
|
||||
StatusCode::NOT_FOUND.into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_config(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<Json<Config>, (StatusCode, String)> {
|
||||
Ok(Json(state.config.clone()))
|
||||
}
|
||||
|
||||
pub async fn set_config(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Json(config): Json<Config>,
|
||||
) -> Result<(StatusCode, String), (StatusCode, String)> {
|
||||
let config_str = toml::to_string_pretty(&config).map_err(|err| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to serialize config as TOML: {err}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
write(&state.config_path, config_str).await.map_err(|err| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("failed to write config file: {err}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Trigger daemon restart after writing config
|
||||
let mut restart_tx = state.daemon_restart_tx.write().await;
|
||||
if let Some(sender) = restart_tx.take() {
|
||||
sender.send(()).map_err(|_| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"couldn't send restart signal".to_string(),
|
||||
)
|
||||
})?;
|
||||
Ok((
|
||||
StatusCode::ACCEPTED,
|
||||
"wrote config and triggered restart".to_string(),
|
||||
))
|
||||
} else {
|
||||
Ok((
|
||||
StatusCode::ACCEPTED,
|
||||
"wrote config but restart already triggered".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_zip(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(entry_name): Path<String>,
|
||||
) -> Result<Response, (StatusCode, String)> {
|
||||
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
|
||||
let (entry_index, qmdl_size_bytes) = {
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
|
||||
StatusCode::NOT_FOUND,
|
||||
format!("couldn't find entry with name {qmdl_idx}"),
|
||||
))?;
|
||||
|
||||
if entry.qmdl_size_bytes == 0 {
|
||||
return Err((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"QMDL file is empty, try again in a bit!".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
(entry_index, entry.qmdl_size_bytes)
|
||||
};
|
||||
|
||||
let qmdl_store_lock = state.qmdl_store_lock.clone();
|
||||
|
||||
let (reader, writer) = duplex(8192);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let result: Result<(), Error> = async {
|
||||
let mut zip = ZipFileWriter::with_tokio(writer);
|
||||
|
||||
// Add QMDL file
|
||||
{
|
||||
let entry =
|
||||
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored);
|
||||
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
|
||||
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
|
||||
// once https://github.com/Majored/rs-async-zip/pull/160 is released.
|
||||
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
||||
|
||||
let mut qmdl_file = {
|
||||
let qmdl_store = qmdl_store_lock.read().await;
|
||||
qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await?
|
||||
.take(qmdl_size_bytes as u64)
|
||||
};
|
||||
|
||||
copy(&mut qmdl_file, &mut entry_writer).await?;
|
||||
entry_writer.into_inner().close().await?;
|
||||
}
|
||||
|
||||
// Add PCAP file
|
||||
{
|
||||
let entry =
|
||||
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
|
||||
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
||||
|
||||
let qmdl_file_for_pcap = {
|
||||
let qmdl_store = qmdl_store_lock.read().await;
|
||||
qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await?
|
||||
.take(qmdl_size_bytes as u64)
|
||||
};
|
||||
|
||||
if let Err(e) =
|
||||
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
|
||||
{
|
||||
// if we fail to generate the PCAP file, we should still continue and give the
|
||||
// user the QMDL.
|
||||
error!("Failed to generate PCAP: {e:?}");
|
||||
}
|
||||
|
||||
entry_writer.into_inner().close().await?;
|
||||
}
|
||||
|
||||
zip.close().await?;
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("Error generating ZIP file: {e:?}");
|
||||
}
|
||||
});
|
||||
|
||||
let headers = [(CONTENT_TYPE, "application/zip")];
|
||||
let body = Body::from_stream(ReaderStream::new(reader));
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_zip::base::read::mem::ZipFileReader;
|
||||
use axum::extract::{Path, State};
|
||||
use tempfile::TempDir;
|
||||
|
||||
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let store_path = temp_dir.path().to_path_buf();
|
||||
let store = crate::qmdl_store::RecordingStore::create(&store_path)
|
||||
.await
|
||||
.unwrap();
|
||||
(temp_dir, Arc::new(RwLock::new(store)))
|
||||
}
|
||||
|
||||
async fn create_test_entry_with_data(
|
||||
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
|
||||
test_data: &[u8],
|
||||
) -> String {
|
||||
let entry_name = {
|
||||
let mut store = store_lock.write().await;
|
||||
let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap();
|
||||
|
||||
if !test_data.is_empty() {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
qmdl_file.write_all(test_data).await.unwrap();
|
||||
qmdl_file.flush().await.unwrap();
|
||||
}
|
||||
|
||||
let current_entry = store.current_entry.unwrap();
|
||||
let entry = &store.manifest.entries[current_entry];
|
||||
let entry_name = entry.name.clone();
|
||||
|
||||
store
|
||||
.update_entry_qmdl_size(current_entry, test_data.len())
|
||||
.await
|
||||
.unwrap();
|
||||
entry_name
|
||||
};
|
||||
|
||||
let mut store = store_lock.write().await;
|
||||
store.close_current_entry().await.unwrap();
|
||||
entry_name
|
||||
}
|
||||
|
||||
fn create_test_server_state(
|
||||
store_lock: Arc<RwLock<crate::qmdl_store::RecordingStore>>,
|
||||
) -> Arc<ServerState> {
|
||||
let (tx, _rx) = tokio::sync::mpsc::channel(1);
|
||||
let (analysis_tx, _analysis_rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
let analysis_status = {
|
||||
let store = store_lock.try_read().unwrap();
|
||||
crate::analysis::AnalysisStatus::new(&store)
|
||||
};
|
||||
|
||||
Arc::new(ServerState {
|
||||
config_path: "/tmp/test_config.toml".to_string(),
|
||||
config: Config::default(),
|
||||
qmdl_store_lock: store_lock,
|
||||
diag_device_ctrl_sender: tx,
|
||||
analysis_status_lock: Arc::new(RwLock::new(analysis_status)),
|
||||
analysis_sender: analysis_tx,
|
||||
daemon_restart_tx: Arc::new(RwLock::new(None)),
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_zip_success() {
|
||||
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
|
||||
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E];
|
||||
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
|
||||
let state = create_test_server_state(store_lock);
|
||||
|
||||
let result = get_zip(State(state), Path(entry_name.clone())).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let response = result.unwrap();
|
||||
|
||||
let headers = response.headers();
|
||||
assert_eq!(headers.get("content-type").unwrap(), "application/zip");
|
||||
|
||||
let body = response.into_body();
|
||||
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
|
||||
|
||||
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
|
||||
|
||||
let filenames = zip_reader
|
||||
.file()
|
||||
.entries()
|
||||
.iter()
|
||||
.map(|entry| entry.filename().as_str().unwrap().to_owned())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
assert_eq!(
|
||||
filenames,
|
||||
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),]
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,158 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::qmdl_store::ManifestEntry;
|
||||
use crate::server::ServerState;
|
||||
|
||||
use axum::Json;
|
||||
use axum::extract::State;
|
||||
use axum::http::StatusCode;
|
||||
use log::error;
|
||||
use rayhunter::{Device, util::RuntimeMetadata};
|
||||
use serde::Serialize;
|
||||
use tokio::process::Command;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct SystemStats {
|
||||
pub disk_stats: DiskStats,
|
||||
pub memory_stats: MemoryStats,
|
||||
pub runtime_metadata: RuntimeMetadata,
|
||||
}
|
||||
|
||||
impl SystemStats {
|
||||
pub async fn new(qmdl_path: &str, device: &Device) -> Result<Self, String> {
|
||||
Ok(Self {
|
||||
disk_stats: DiskStats::new(qmdl_path, device).await?,
|
||||
memory_stats: MemoryStats::new(device).await?,
|
||||
runtime_metadata: RuntimeMetadata::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct DiskStats {
|
||||
partition: String,
|
||||
total_size: String,
|
||||
used_size: String,
|
||||
available_size: String,
|
||||
used_percent: String,
|
||||
mounted_on: String,
|
||||
}
|
||||
|
||||
impl DiskStats {
|
||||
// runs "df -h <qmdl_path>" to get storage statistics for the partition containing
|
||||
// the QMDL file.
|
||||
pub async fn new(qmdl_path: &str, device: &Device) -> Result<Self, String> {
|
||||
// Uz801 needs to be told to use the busybox df specifically
|
||||
let mut df_cmd: Command;
|
||||
if matches!(device, Device::Uz801) {
|
||||
df_cmd = Command::new("busybox");
|
||||
df_cmd.arg("df");
|
||||
} else {
|
||||
df_cmd = Command::new("df");
|
||||
}
|
||||
df_cmd.arg("-h");
|
||||
df_cmd.arg(qmdl_path);
|
||||
let stdout = get_cmd_output(df_cmd).await?;
|
||||
|
||||
// Handle standard df -h format
|
||||
let mut parts = stdout.split_whitespace().skip(7);
|
||||
Ok(Self {
|
||||
partition: parts.next().ok_or("error parsing df output")?.to_string(),
|
||||
total_size: parts.next().ok_or("error parsing df output")?.to_string(),
|
||||
used_size: parts.next().ok_or("error parsing df output")?.to_string(),
|
||||
available_size: parts.next().ok_or("error parsing df output")?.to_string(),
|
||||
used_percent: parts.next().ok_or("error parsing df output")?.to_string(),
|
||||
mounted_on: parts.next().ok_or("error parsing df output")?.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct MemoryStats {
|
||||
total: String,
|
||||
used: String,
|
||||
free: String,
|
||||
}
|
||||
|
||||
// runs the given command and returns its stdout as a string
|
||||
async fn get_cmd_output(mut cmd: Command) -> Result<String, String> {
|
||||
let cmd_str = format!("{:?}", &cmd);
|
||||
let output = cmd
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| format!("error running command {}: {}", &cmd_str, e))?;
|
||||
if !output.status.success() {
|
||||
return Err(format!(
|
||||
"command {} failed with exit code {}",
|
||||
&cmd_str,
|
||||
output.status.code().unwrap()
|
||||
));
|
||||
}
|
||||
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
||||
}
|
||||
|
||||
impl MemoryStats {
|
||||
// runs "free -k" and parses the output to retrieve memory stats for most devices,
|
||||
pub async fn new(device: &Device) -> Result<Self, String> {
|
||||
// Use busybox for Uz801
|
||||
let mut free_cmd: Command;
|
||||
if matches!(device, Device::Uz801) {
|
||||
free_cmd = Command::new("busybox");
|
||||
free_cmd.arg("free");
|
||||
} else {
|
||||
free_cmd = Command::new("free");
|
||||
}
|
||||
free_cmd.arg("-k");
|
||||
let stdout = get_cmd_output(free_cmd).await?;
|
||||
let mut numbers = stdout
|
||||
.split_whitespace()
|
||||
.flat_map(|part| part.parse::<usize>());
|
||||
Ok(Self {
|
||||
total: humanize_kb(numbers.next().ok_or("error parsing free output")?),
|
||||
used: humanize_kb(numbers.next().ok_or("error parsing free output")?),
|
||||
free: humanize_kb(numbers.next().ok_or("error parsing free output")?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// turns a number of kilobytes (like 28293) into a human-readable string (like "28.3M")
|
||||
fn humanize_kb(kb: usize) -> String {
|
||||
if kb < 1000 {
|
||||
return format!("{kb}K");
|
||||
}
|
||||
format!("{:.1}M", kb as f64 / 1024.0)
|
||||
}
|
||||
|
||||
pub async fn get_system_stats(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<Json<SystemStats>, (StatusCode, String)> {
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
match SystemStats::new(qmdl_store.path.to_str().unwrap(), &state.config.device).await {
|
||||
Ok(stats) => Ok(Json(stats)),
|
||||
Err(err) => {
|
||||
error!("error getting system stats: {err}");
|
||||
Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"error getting system stats".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct ManifestStats {
|
||||
pub entries: Vec<ManifestEntry>,
|
||||
pub current_entry: Option<ManifestEntry>,
|
||||
}
|
||||
|
||||
pub async fn get_qmdl_manifest(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
) -> Result<Json<ManifestStats>, (StatusCode, String)> {
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
let mut entries = qmdl_store.manifest.entries.clone();
|
||||
let current_entry = qmdl_store.current_entry.map(|index| entries.remove(index));
|
||||
Ok(Json(ManifestStats {
|
||||
entries,
|
||||
current_entry,
|
||||
}))
|
||||
}
|
||||
Reference in New Issue
Block a user