chore: cargo fmt

This commit is contained in:
oopsbagel
2025-04-13 22:00:54 -07:00
committed by Will Greenberg
parent 151e186ef9
commit 9fe75ac961
37 changed files with 1246 additions and 819 deletions

View File

@@ -18,9 +18,9 @@ use tokio::sync::mpsc::Receiver;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio_util::task::TaskTracker;
use crate::dummy_analyzer::TestAnalyzer;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
use crate::dummy_analyzer::TestAnalyzer;
pub struct AnalysisWriter {
writer: BufWriter<File>,
@@ -53,7 +53,10 @@ impl AnalysisWriter {
// Runs the analysis harness on the given container, serializing the results
// to the analysis file and returning the file's new length.
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<(usize, bool), std::io::Error> {
pub async fn analyze(
&mut self,
container: MessagesContainer,
) -> Result<(usize, bool), std::io::Error> {
let row = self.harness.analyze_qmdl_messages(container);
if !row.is_empty() {
self.write(&row).await?;
@@ -182,7 +185,10 @@ pub fn run_analysis_thread(
let count = queued_len(analysis_status_lock.clone()).await;
for _ in 0..count {
let name = dequeue_to_running(analysis_status_lock.clone()).await;
if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone(), enable_dummy_analyzer).await {
if let Err(err) =
perform_analysis(&name, qmdl_store_lock.clone(), enable_dummy_analyzer)
.await
{
error!("failed to analyze {}: {}", name, err);
}
clear_running(analysis_status_lock.clone()).await;

View File

@@ -1,9 +1,15 @@
use std::{collections::HashMap, future, path::PathBuf, pin::pin};
use log::{info, warn};
use rayhunter::{analysis::analyzer::{EventType, Harness}, diag::DataType, gsmtap_parser, pcap::GsmtapPcapWriter, qmdl::QmdlReader};
use tokio::fs::{metadata, read_dir, File};
use clap::Parser;
use futures::TryStreamExt;
use log::{info, warn};
use rayhunter::{
analysis::analyzer::{EventType, Harness},
diag::DataType,
gsmtap_parser,
pcap::GsmtapPcapWriter,
qmdl::QmdlReader,
};
use std::{collections::HashMap, future, path::PathBuf, pin::pin};
use tokio::fs::{metadata, read_dir, File};
mod dummy_analyzer;
@@ -28,15 +34,24 @@ struct Args {
async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool) {
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let file_size = qmdl_file.metadata().await.expect("failed to get QMDL file metadata").len();
let file_size = qmdl_file
.metadata()
.await
.expect("failed to get QMDL file metadata")
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(qmdl_reader.as_stream()
let mut qmdl_stream = pin!(qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
let mut skipped_reasons: HashMap<String, i32> = HashMap::new();
let mut total_messages = 0;
let mut warnings = 0;
let mut skipped = 0;
while let Some(container) = qmdl_stream.try_next().await.expect("failed getting QMDL container") {
while let Some(container) = qmdl_stream
.try_next()
.await
.expect("failed getting QMDL container")
{
let row = harness.analyze_qmdl_messages(container);
total_messages += 1;
for reason in row.skipped_message_reasons {
@@ -50,18 +65,13 @@ async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool
EventType::Informational => {
info!(
"{}: INFO - {} {}",
qmdl_path,
analysis.timestamp,
event.message,
qmdl_path, analysis.timestamp, event.message,
);
}
EventType::QualitativeWarning { severity } => {
warn!(
"{}: WARNING (Severity: {:?}) - {} {}",
qmdl_path,
severity,
analysis.timestamp,
event.message,
qmdl_path, severity, analysis.timestamp, event.message,
);
warnings += 1;
}
@@ -75,22 +85,36 @@ async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool
info!(" - {}: \"{}\"", count, reason);
}
}
info!("{}: {} messages analyzed, {} warnings, {} messages skipped", qmdl_path, total_messages, warnings, skipped);
info!(
"{}: {} messages analyzed, {} warnings, {} messages skipped",
qmdl_path, total_messages, warnings, skipped
);
}
async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open qmdl file");
let qmdl_file = &mut File::open(&qmdl_path)
.await
.expect("failed to open qmdl file");
let qmdl_file_size = qmdl_file.metadata().await.unwrap().len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize));
let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcap");
let pcap_file = &mut File::create(&pcap_path).await.expect("failed to open pcap file");
let pcap_file = &mut File::create(&pcap_path)
.await
.expect("failed to open pcap file");
let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap();
pcap_writer.write_iface_header().await.unwrap();
while let Some(container) = qmdl_reader.get_next_messages_container().await.expect("failed to get container") {
while let Some(container) = qmdl_reader
.get_next_messages_container()
.await
.expect("failed to get container")
{
for msg in container.into_messages().into_iter().flatten() {
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
pcap_writer.write_gsmtap_message(parsed, timestamp).await.expect("failed to write");
pcap_writer
.write_gsmtap_message(parsed, timestamp)
.await
.expect("failed to write");
}
}
}
@@ -109,7 +133,8 @@ async fn main() {
.with_colors(true)
.without_timestamps()
.with_level(level)
.init().unwrap();
.init()
.unwrap();
let mut harness = Harness::new_with_all_analyzers();
if args.enable_dummy_analyzer {
@@ -120,7 +145,9 @@ async fn main() {
info!(" - {}: {}", analyzer.name, analyzer.description);
}
let metadata = metadata(&args.qmdl_path).await.expect("failed to get metadata");
let metadata = metadata(&args.qmdl_path)
.await
.expect("failed to get metadata");
if metadata.is_dir() {
let mut dir = read_dir(&args.qmdl_path).await.expect("failed to read dir");
while let Some(entry) = dir.next_entry().await.expect("failed to get entry") {

View File

@@ -2,8 +2,7 @@ use crate::error::RayhunterError;
use serde::Deserialize;
#[derive(Debug)]
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct Config {
pub qmdl_store_path: String,
@@ -27,7 +26,10 @@ impl Default for Config {
}
}
pub fn parse_config<P>(path: P) -> Result<Config, RayhunterError> where P: AsRef<std::path::Path> {
pub fn parse_config<P>(path: P) -> Result<Config, RayhunterError>
where
P: AsRef<std::path::Path>,
{
if let Ok(config_file) = std::fs::read_to_string(&path) {
Ok(toml::from_str(&config_file).map_err(RayhunterError::ConfigFileParsingError)?)
} else {

View File

@@ -1,38 +1,43 @@
mod analysis;
mod config;
mod error;
mod pcap;
mod server;
mod stats;
mod qmdl_store;
mod diag;
mod display;
mod dummy_analyzer;
mod error;
mod pcap;
mod qmdl_store;
mod server;
mod stats;
use crate::config::{parse_config, parse_args};
use crate::config::{parse_args, parse_config};
use crate::diag::run_diag_read_thread;
use crate::qmdl_store::RecordingStore;
use crate::server::{ServerState, get_qmdl, serve_static};
use crate::pcap::get_pcap;
use crate::stats::get_system_stats;
use crate::error::RayhunterError;
use crate::pcap::get_pcap;
use crate::qmdl_store::RecordingStore;
use crate::server::{get_qmdl, serve_static, ServerState};
use crate::stats::get_system_stats;
use analysis::{get_analysis_status, run_analysis_thread, start_analysis, AnalysisCtrlMessage, AnalysisStatus};
use analysis::{
get_analysis_status, run_analysis_thread, start_analysis, AnalysisCtrlMessage, AnalysisStatus,
};
use axum::response::Redirect;
use diag::{delete_all_recordings, delete_recording, get_analysis_report, start_recording, stop_recording, DiagDeviceCtrlMessage};
use log::{info, error};
use qmdl_store::RecordingStoreError;
use rayhunter::diag_device::DiagDevice;
use axum::routing::{get, post};
use axum::Router;
use diag::{
delete_all_recordings, delete_recording, get_analysis_report, start_recording, stop_recording,
DiagDeviceCtrlMessage,
};
use log::{error, info};
use qmdl_store::RecordingStoreError;
use rayhunter::diag_device::DiagDevice;
use stats::get_qmdl_manifest;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use tokio_util::task::TaskTracker;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, oneshot};
use std::sync::Arc;
type AppRouter = Router<Arc<ServerState>>;
@@ -70,7 +75,8 @@ async fn run_server(
info!("The orca is hunting for stingrays...");
axum::serve(listener, app)
.with_graceful_shutdown(server_shutdown_signal(server_shutdown_rx))
.await.unwrap();
.await
.unwrap();
})
}
@@ -88,7 +94,9 @@ async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, Rayh
if store_exists {
Ok(RecordingStore::load(&config.qmdl_store_path).await?)
} else {
Err(RayhunterError::NoStoreDebugMode(config.qmdl_store_path.clone()))
Err(RayhunterError::NoStoreDebugMode(
config.qmdl_store_path.clone(),
))
}
} else if store_exists {
match RecordingStore::load(&config.qmdl_store_path).await {
@@ -97,7 +105,7 @@ async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, Rayh
error!("failed to parse QMDL manifest: {}", err);
info!("creating new empty manifest...");
Ok(RecordingStore::create(&config.qmdl_store_path).await?)
},
}
Err(err) => Err(err.into()),
}
} else {
@@ -126,18 +134,24 @@ fn run_ctrl_c_thread(
info!("Done!");
}
server_shutdown_tx.send(())
server_shutdown_tx
.send(())
.expect("couldn't send server shutdown signal");
info!("sending UI shutdown");
if let Some(ui_shutdown_tx) = maybe_ui_shutdown_tx {
ui_shutdown_tx.send(())
ui_shutdown_tx
.send(())
.expect("couldn't send ui shutdown signal");
}
diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await
diag_device_sender
.send(DiagDeviceCtrlMessage::Exit)
.await
.expect("couldn't send Exit message to diag thread");
analysis_tx.send(AnalysisCtrlMessage::Exit).await
analysis_tx
.send(AnalysisCtrlMessage::Exit)
.await
.expect("couldn't send Exit message to analysis thread");
},
}
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
}
@@ -146,7 +160,6 @@ fn run_ctrl_c_thread(
})
}
#[tokio::main]
async fn main() -> Result<(), RayhunterError> {
env_logger::init();
@@ -167,21 +180,43 @@ async fn main() -> Result<(), RayhunterError> {
if !config.debug_mode {
let (ui_shutdown_tx, ui_shutdown_rx) = oneshot::channel();
maybe_ui_shutdown_tx = Some(ui_shutdown_tx);
let mut dev = DiagDevice::new().await
let mut dev = DiagDevice::new()
.await
.map_err(RayhunterError::DiagInitError)?;
dev.config_logs().await
dev.config_logs()
.await
.map_err(RayhunterError::DiagInitError)?;
info!("Starting Diag Thread");
run_diag_read_thread(&task_tracker, dev, rx, ui_update_tx.clone(), qmdl_store_lock.clone(), config.enable_dummy_analyzer);
run_diag_read_thread(
&task_tracker,
dev,
rx,
ui_update_tx.clone(),
qmdl_store_lock.clone(),
config.enable_dummy_analyzer,
);
info!("Starting UI");
display::update_ui(&task_tracker, &config, ui_shutdown_rx, ui_update_rx);
}
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
info!("create shutdown thread");
let analysis_status_lock = Arc::new(RwLock::new(AnalysisStatus::default()));
run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone(), config.enable_dummy_analyzer);
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, maybe_ui_shutdown_tx, qmdl_store_lock.clone(), analysis_tx.clone());
run_analysis_thread(
&task_tracker,
analysis_rx,
qmdl_store_lock.clone(),
analysis_status_lock.clone(),
config.enable_dummy_analyzer,
);
run_ctrl_c_thread(
&task_tracker,
tx.clone(),
server_shutdown_tx,
maybe_ui_shutdown_tx,
qmdl_store_lock.clone(),
analysis_tx.clone(),
);
let state = Arc::new(ServerState {
qmdl_store_lock: qmdl_store_lock.clone(),
diag_device_ctrl_sender: tx,

View File

@@ -6,21 +6,21 @@ use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use futures::{StreamExt, TryStreamExt};
use log::{debug, error, info};
use rayhunter::diag::DataType;
use rayhunter::diag_device::DiagDevice;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, Sender};
use rayhunter::qmdl::QmdlWriter;
use log::{debug, error, info};
use tokio::fs::File;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::RwLock;
use tokio_util::io::ReaderStream;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};
use crate::analysis::AnalysisWriter;
use crate::display;
use crate::qmdl_store::{RecordingStore, RecordingStoreError};
use crate::server::ServerState;
use crate::analysis::AnalysisWriter;
pub enum DiagDeviceCtrlMessage {
StopRecording,
@@ -119,35 +119,82 @@ pub fn run_diag_read_thread(
});
}
pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
pub async fn start_recording(
State(state): State<Arc<ServerState>>,
) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.debug_mode {
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
let (qmdl_file, analysis_file) = qmdl_store.new_entry().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?;
let (qmdl_file, analysis_file) = qmdl_store.new_entry().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't create new qmdl entry: {}", e),
)
})?;
let qmdl_writer = QmdlWriter::new(qmdl_file);
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording((qmdl_writer, analysis_file))).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
state
.diag_device_ctrl_sender
.send(DiagDeviceCtrlMessage::StartRecording((
qmdl_writer,
analysis_file,
)))
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send stop recording message: {}", e),
)
})?;
let display_state = display::DisplayState::Recording;
state.ui_update_sender.send(display_state).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send ui update message: {}", e)))?;
state
.ui_update_sender
.send(display_state)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send ui update message: {}", e),
)
})?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
pub async fn stop_recording(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
pub async fn stop_recording(
State(state): State<Arc<ServerState>>,
) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.debug_mode {
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
qmdl_store.close_current_entry().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't close current qmdl entry: {}", e)))?;
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StopRecording).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
state.ui_update_sender.send(display::DisplayState::Paused).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send ui update message: {}", e)))?;
qmdl_store.close_current_entry().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't close current qmdl entry: {}", e),
)
})?;
state
.diag_device_ctrl_sender
.send(DiagDeviceCtrlMessage::StopRecording)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send stop recording message: {}", e),
)
})?;
state
.ui_update_sender
.send(display::DisplayState::Paused)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send ui update message: {}", e),
)
})?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
@@ -160,45 +207,98 @@ pub async fn delete_recording(
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
match qmdl_store.delete_entry(&qmdl_name).await {
Err(RecordingStoreError::NoSuchEntryError) => return Err((StatusCode::BAD_REQUEST, format!("no recording with name {qmdl_name}"))),
Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't delete recording: {e}"))),
Ok(_) => {},
Err(RecordingStoreError::NoSuchEntryError) => {
return Err((
StatusCode::BAD_REQUEST,
format!("no recording with name {qmdl_name}"),
))
}
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't delete recording: {e}"),
))
}
Ok(_) => {}
}
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StopRecording).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
state.ui_update_sender.send(display::DisplayState::Paused).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send ui update message: {}", e)))?;
state
.diag_device_ctrl_sender
.send(DiagDeviceCtrlMessage::StopRecording)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send stop recording message: {}", e),
)
})?;
state
.ui_update_sender
.send(display::DisplayState::Paused)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send ui update message: {}", e),
)
})?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
pub async fn delete_all_recordings(State(state): State<Arc<ServerState>>) -> Result<(StatusCode, String), (StatusCode, String)> {
pub async fn delete_all_recordings(
State(state): State<Arc<ServerState>>,
) -> Result<(StatusCode, String), (StatusCode, String)> {
if state.debug_mode {
return Err((StatusCode::FORBIDDEN, "server is in debug mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
qmdl_store.delete_all_entries().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't delete all recordings: {}", e)))?;
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StopRecording).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
state.ui_update_sender.send(display::DisplayState::Paused).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send ui update message: {}", e)))?;
qmdl_store.delete_all_entries().await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't delete all recordings: {}", e),
)
})?;
state
.diag_device_ctrl_sender
.send(DiagDeviceCtrlMessage::StopRecording)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send stop recording message: {}", e),
)
})?;
state
.ui_update_sender
.send(display::DisplayState::Paused)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("couldn't send ui update message: {}", e),
)
})?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
pub async fn get_analysis_report(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
pub async fn get_analysis_report(
State(state): State<Arc<ServerState>>,
Path(qmdl_name): Path<String>,
) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, _) = if qmdl_name == "live" {
qmdl_store.get_current_entry().ok_or((
StatusCode::SERVICE_UNAVAILABLE,
"No QMDL data's being recorded to analyze, try starting a new recording!".to_string()
"No QMDL data's being recorded to analyze, try starting a new recording!".to_string(),
))?
} else {
qmdl_store.entry_for_name(&qmdl_name).ok_or((
StatusCode::NOT_FOUND,
format!("Couldn't find QMDL entry with name \"{}\"", qmdl_name)
format!("Couldn't find QMDL entry with name \"{}\"", qmdl_name),
))?
};
let analysis_file = qmdl_store.open_entry_analysis(entry_index).await
let analysis_file = qmdl_store
.open_entry_analysis(entry_index)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
let analysis_stream = ReaderStream::new(analysis_file);

View File

@@ -1,9 +1,9 @@
use crate::config;
use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
use crate::display::DisplayState;
use crate::display::generic_framebuffer::{self, GenericFramebuffer, Dimensions};
use tokio::sync::oneshot;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::task::TaskTracker;
const FB_PATH: &str = "/dev/fb0";
@@ -20,10 +20,7 @@ impl GenericFramebuffer for Framebuffer {
}
}
fn write_buffer(
&mut self,
buffer: &[(u8, u8, u8)],
) {
fn write_buffer(&mut self, buffer: &[(u8, u8, u8)]) {
let mut raw_buffer = Vec::new();
for (r, g, b) in buffer {
let mut rgb565: u16 = (*r as u16 & 0b11111000) << 8;
@@ -40,7 +37,7 @@ pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
ui_shutdown_rx: oneshot::Receiver<()>,
ui_update_rx: Receiver<DisplayState>
ui_update_rx: Receiver<DisplayState>,
) {
generic_framebuffer::update_ui(
task_tracker,

View File

@@ -4,7 +4,7 @@ use tokio::sync::oneshot;
use tokio_util::task::TaskTracker;
use crate::config;
use crate::display::{DisplayState, tplink_onebit, tplink_framebuffer};
use crate::display::{tplink_framebuffer, tplink_onebit, DisplayState};
use std::fs;
@@ -21,19 +21,9 @@ pub fn update_ui(
if fs::exists(tplink_onebit::OLED_PATH).unwrap_or_default() {
info!("detected one-bit display");
tplink_onebit::update_ui(
task_tracker,
config,
ui_shutdown_rx,
ui_update_rx
)
tplink_onebit::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx)
} else {
info!("fallback to framebuffer");
tplink_framebuffer::update_ui(
task_tracker,
config,
ui_shutdown_rx,
ui_update_rx
)
tplink_framebuffer::update_ui(task_tracker, config, ui_shutdown_rx, ui_update_rx)
}
}

View File

@@ -3,11 +3,11 @@ use std::io::Write;
use std::os::fd::AsRawFd;
use crate::config;
use crate::display::generic_framebuffer::{self, Dimensions, GenericFramebuffer};
use crate::display::DisplayState;
use crate::display::generic_framebuffer::{self, GenericFramebuffer, Dimensions};
use tokio::sync::oneshot;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_util::task::TaskTracker;
const FB_PATH: &str = "/dev/fb0";
@@ -33,10 +33,7 @@ impl GenericFramebuffer for Framebuffer {
}
}
fn write_buffer(
&mut self,
buffer: &[(u8, u8, u8)],
) {
fn write_buffer(&mut self, buffer: &[(u8, u8, u8)]) {
// for how to write to the buffer, consult M7350v5_en_gpl/bootable/recovery/recovery_color_oled.c
let dimensions = self.dimensions();
let width = dimensions.width;
@@ -65,7 +62,7 @@ impl GenericFramebuffer for Framebuffer {
unsafe {
let res = libc::ioctl(
f.as_raw_fd(),
0x4619, // FBIORECT_DISPLAY
0x4619, // FBIORECT_DISPLAY
&mut arg as *mut _,
std::mem::size_of::<fb_fillrect>(),
);
@@ -81,7 +78,7 @@ pub fn update_ui(
task_tracker: &TaskTracker,
config: &config::Config,
ui_shutdown_rx: oneshot::Receiver<()>,
ui_update_rx: Receiver<DisplayState>
ui_update_rx: Receiver<DisplayState>,
) {
generic_framebuffer::update_ui(
task_tracker,

View File

@@ -5,11 +5,11 @@ use rayhunter::telcom_parser::lte_rrc::{PCCH_MessageType, PCCH_MessageType_c1, P
use rayhunter::analysis::analyzer::{Analyzer, Event, EventType, Severity};
use rayhunter::analysis::information_element::{InformationElement, LteInformationElement};
pub struct TestAnalyzer{
pub struct TestAnalyzer {
pub count: i32,
}
impl Analyzer for TestAnalyzer{
impl Analyzer for TestAnalyzer {
fn get_name(&self) -> Cow<str> {
Cow::from("Example Analyzer")
}
@@ -22,15 +22,15 @@ impl Analyzer for TestAnalyzer{
self.count += 1;
if self.count % 100 == 0 {
return Some(Event {
event_type: EventType::Informational ,
event_type: EventType::Informational,
message: "multiple of 100 events processed".to_string(),
})
});
}
let pcch_msg = match ie {
InformationElement::LTE(lte_ie) => match &** lte_ie {
InformationElement::LTE(lte_ie) => match &**lte_ie {
LteInformationElement::PCCH(pcch_msg) => pcch_msg,
_ => return None,
}
},
_ => return None,
};
let PCCH_MessageType::C1(PCCH_MessageType_c1::Paging(paging)) = &pcch_msg.message else {
@@ -39,9 +39,11 @@ impl Analyzer for TestAnalyzer{
for record in &paging.paging_record_list.as_ref()?.0 {
if let PagingUE_Identity::S_TMSI(_) = record.ue_identity {
return Some(Event {
event_type: EventType::QualitativeWarning { severity: Severity::Low },
event_type: EventType::QualitativeWarning {
severity: Severity::Low,
},
message: "TMSI was provided to cell".to_string(),
})
});
}
}
None

View File

@@ -1,10 +1,10 @@
use thiserror::Error;
use rayhunter::diag_device::DiagDeviceError;
use thiserror::Error;
use crate::qmdl_store::RecordingStoreError;
#[derive(Error, Debug)]
pub enum RayhunterError{
pub enum RayhunterError {
#[error("Config file parsing error: {0}")]
ConfigFileParsingError(#[from] toml::de::Error),
#[error("Diag intialization error: {0}")]

View File

@@ -1,36 +1,43 @@
use crate::ServerState;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use futures::TryStreamExt;
use log::error;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser;
use rayhunter::pcap::GsmtapPcapWriter;
use rayhunter::qmdl::QmdlReader;
use axum::body::Body;
use axum::http::header::CONTENT_TYPE;
use axum::extract::{State, Path};
use axum::http::StatusCode;
use axum::response::{Response, IntoResponse};
use std::sync::Arc;
use std::{future, pin::pin};
use tokio::io::duplex;
use tokio_util::io::ReaderStream;
use std::{future, pin::pin};
use std::sync::Arc;
use log::error;
use futures::TryStreamExt;
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
// written so far. This is done by spawning a thread which streams chunks of
// pcap data to a channel that's piped to the client.
pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
pub async fn get_pcap(
State(state): State<Arc<ServerState>>,
Path(qmdl_name): Path<String>,
) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name)
.ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_name)))?;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_name).ok_or((
StatusCode::NOT_FOUND,
format!("couldn't find qmdl file with name {}", qmdl_name),
))?;
if entry.qmdl_size_bytes == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string()
"QMDL file is empty, try again in a bit!".to_string(),
));
}
let qmdl_size_bytes = entry.qmdl_size_bytes;
let qmdl_file = qmdl_store.open_entry_qmdl(entry_index).await
let qmdl_file = qmdl_store
.open_entry_qmdl(entry_index)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
// the QMDL reader should stop at the last successfully written data chunk
// (entry.size_bytes)
@@ -40,20 +47,27 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
tokio::spawn(async move {
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
let mut messages_stream = pin!(reader.as_stream()
let mut messages_stream = pin!(reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
while let Some(container) = messages_stream.try_next().await.expect("failed getting QMDL container") {
while let Some(container) = messages_stream
.try_next()
.await
.expect("failed getting QMDL container")
{
for maybe_msg in container.into_messages() {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)
.expect("error parsing gsmtap message");
let maybe_gsmtap_msg =
gsmtap_parser::parse(msg).expect("error parsing gsmtap message");
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp).await
pcap_writer
.write_gsmtap_message(gsmtap_msg, timestamp)
.await
.expect("error writing pcap packet");
}
},
}
Err(e) => error!("error parsing message: {:?}", e),
}
}

View File

@@ -1,11 +1,11 @@
use rayhunter::util::RuntimeMetadata;
use chrono::{DateTime, Local};
use rayhunter::util::RuntimeMetadata;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::{
fs::{self, try_exists, File, OpenOptions},
io::AsyncWriteExt
io::AsyncWriteExt,
};
#[derive(Debug, Error)]
@@ -127,7 +127,7 @@ impl RecordingStore {
let mut store = RecordingStore {
path: path.as_ref().to_owned(),
manifest: Manifest {
entries: Vec::new()
entries: Vec::new(),
},
current_entry: None,
};
@@ -171,10 +171,7 @@ impl RecordingStore {
}
// Returns the corresponding QMDL file for a given entry
pub async fn open_entry_qmdl(
&self,
entry_index: usize,
) -> Result<File, RecordingStoreError> {
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
File::open(entry.get_qmdl_filepath(&self.path))
.await
@@ -203,8 +200,7 @@ impl RecordingStore {
.open(entry.get_analysis_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)?;
self.update_entry_analysis_size(entry_index, 0)
.await?;
self.update_entry_analysis_size(entry_index, 0).await?;
Ok(file)
}
@@ -253,7 +249,8 @@ impl RecordingStore {
.await
.map_err(RecordingStoreError::WriteManifestError)?;
fs::rename(tmp_path, self.path.join("manifest.toml")).await
fs::rename(tmp_path, self.path.join("manifest.toml"))
.await
.map_err(RecordingStoreError::WriteManifestError)?;
Ok(())
@@ -261,7 +258,8 @@ impl RecordingStore {
// Finds an entry by filename
pub fn entry_for_name(&self, name: &str) -> Option<(usize, &ManifestEntry)> {
let entry_index = self.manifest
let entry_index = self
.manifest
.entries
.iter()
.position(|entry| entry.name == name)?;
@@ -274,7 +272,8 @@ impl RecordingStore {
}
pub async fn delete_entry(&mut self, name: &str) -> Result<ManifestEntry, RecordingStoreError> {
let entry_to_delete_idx = self.manifest
let entry_to_delete_idx = self
.manifest
.entries
.iter()
.position(|entry| entry.name == name)

View File

@@ -1,20 +1,20 @@
use axum::body::Body;
use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE};
use axum::extract::State;
use axum::http::{StatusCode, HeaderValue};
use axum::response::{Response, IntoResponse};
use axum::extract::Path;
use axum::extract::State;
use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE};
use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use include_dir::{include_dir, Dir};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::io::ReaderStream;
use include_dir::{include_dir, Dir};
use crate::{display, DiagDeviceCtrlMessage};
use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
use crate::qmdl_store::RecordingStore;
use crate::{display, DiagDeviceCtrlMessage};
pub struct ServerState {
pub qmdl_store_lock: Arc<RwLock<RecordingStore>>,
@@ -25,13 +25,22 @@ pub struct ServerState {
pub debug_mode: bool,
}
pub async fn get_qmdl(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
pub async fn get_qmdl(
State(state): State<Arc<ServerState>>,
Path(qmdl_name): Path<String>,
) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = qmdl_name.trim_end_matches(".qmdl");
let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(qmdl_idx)
.ok_or((StatusCode::NOT_FOUND, format!("couldn't find qmdl file with name {}", qmdl_idx)))?;
let qmdl_file = qmdl_store.open_entry_qmdl(entry_index).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {}", e)))?;
let (entry_index, entry) = qmdl_store.entry_for_name(qmdl_idx).ok_or((
StatusCode::NOT_FOUND,
format!("couldn't find qmdl file with name {}", qmdl_idx),
))?;
let qmdl_file = qmdl_store.open_entry_qmdl(entry_index).await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("error opening QMDL file: {}", e),
)
})?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
@@ -46,7 +55,10 @@ pub async fn get_qmdl(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
// Bundles the server's static files (html/css/js) into the binary for easy distribution
static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/static");
pub async fn serve_static(State(state): State<Arc<ServerState>>, Path(path): Path<String>) -> impl IntoResponse {
pub async fn serve_static(
State(state): State<Arc<ServerState>>,
Path(path): Path<String>,
) -> impl IntoResponse {
let path = path.trim_start_matches('/');
let mime_type = mime_guess::from_path(path).first_or_text_plain();
@@ -62,7 +74,9 @@ pub async fn serve_static(State(state): State<Arc<ServerState>>, Path(path): Pat
return match File::open(build_path).await {
Ok(mut file) => {
let mut body = String::new();
file.read_to_string(&mut body).await.expect("failed to read file");
file.read_to_string(&mut body)
.await
.expect("failed to read file");
Response::builder()
.status(StatusCode::OK)
.header(
@@ -71,11 +85,11 @@ pub async fn serve_static(State(state): State<Arc<ServerState>>, Path(path): Pat
)
.body(Body::from(body))
.unwrap()
},
}
Err(_) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
.unwrap(),
};
}

View File

@@ -3,9 +3,9 @@ use std::sync::Arc;
use crate::qmdl_store::ManifestEntry;
use crate::server::ServerState;
use axum::Json;
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use log::error;
use serde::Serialize;
use tokio::process::Command;
@@ -65,10 +65,16 @@ pub struct MemoryStats {
// runs the given command and returns its stdout as a string
async fn get_cmd_output(mut cmd: Command) -> Result<String, String> {
let cmd_str = format!("{:?}", &cmd);
let output = cmd.output().await
let output = cmd
.output()
.await
.map_err(|e| format!("error running command {}: {}", &cmd_str, e))?;
if !output.status.success() {
return Err(format!("command {} failed with exit code {}", &cmd_str, output.status.code().unwrap()));
return Err(format!(
"command {} failed with exit code {}",
&cmd_str,
output.status.code().unwrap()
));
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
@@ -79,7 +85,8 @@ impl MemoryStats {
let mut free_cmd = Command::new("free");
free_cmd.arg("-k");
let stdout = get_cmd_output(free_cmd).await?;
let mut numbers = stdout.split_whitespace()
let mut numbers = stdout
.split_whitespace()
.flat_map(|part| part.parse::<usize>());
Ok(Self {
total: humanize_kb(numbers.next().ok_or("error parsing free output")?),
@@ -91,13 +98,15 @@ impl MemoryStats {
// turns a number of kilobytes (like 28293) into a human-readable string (like "28.3M")
fn humanize_kb(kb: usize) -> String {
if kb < 1000{
if kb < 1000 {
return format!("{}K", kb);
}
format!("{:.1}M", kb as f64 / 1024.0)
}
pub async fn get_system_stats(State(state): State<Arc<ServerState>>) -> Result<Json<SystemStats>, (StatusCode, String)> {
pub async fn get_system_stats(
State(state): State<Arc<ServerState>>,
) -> Result<Json<SystemStats>, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
match SystemStats::new(qmdl_store.path.to_str().unwrap()).await {
Ok(stats) => Ok(Json(stats)),
@@ -105,9 +114,9 @@ pub async fn get_system_stats(State(state): State<Arc<ServerState>>) -> Result<J
error!("error getting system stats: {}", err);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error getting system stats".to_string()
"error getting system stats".to_string(),
))
},
}
}
}
@@ -117,7 +126,9 @@ pub struct ManifestStats {
pub current_entry: Option<ManifestEntry>,
}
pub async fn get_qmdl_manifest(State(state): State<Arc<ServerState>>) -> Result<Json<ManifestStats>, (StatusCode, String)> {
pub async fn get_qmdl_manifest(
State(state): State<Arc<ServerState>>,
) -> Result<Json<ManifestStats>, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let mut entries = qmdl_store.manifest.entries.clone();
let current_entry = qmdl_store.current_entry.map(|index| entries.remove(index));