Add the test analyzer entirely via daemon flags

Also consolidate the duplicate AnalysisWriter implementation
This commit is contained in:
Will Greenberg
2024-10-08 12:58:34 -07:00
committed by Cooper Quintin
parent a6fce6d568
commit 16f705f29c
9 changed files with 101 additions and 84 deletions
+15 -7
View File
@@ -20,6 +20,7 @@ use tokio_util::task::TaskTracker;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
use crate::dummy_analyzer::TestAnalyzer;
pub struct AnalysisWriter {
writer: BufWriter<File>,
@@ -34,11 +35,16 @@ pub struct AnalysisWriter {
// lets us simply append new rows to the end without parsing the entire JSON
// object beforehand.
impl AnalysisWriter {
pub async fn new(file: File) -> Result<Self, std::io::Error> {
pub async fn new(file: File, enable_dummy_analyzer: bool) -> Result<Self, std::io::Error> {
let mut harness = Harness::new_with_all_analyzers();
if enable_dummy_analyzer {
harness.add_analyzer(Box::new(TestAnalyzer { count: 0 }));
}
let mut result = Self {
writer: BufWriter::new(file),
harness: Harness::new_with_all_analyzers(),
bytes_written: 0,
harness,
};
let metadata = result.harness.get_metadata();
result.write(&metadata).await?;
@@ -47,12 +53,12 @@ impl AnalysisWriter {
// Runs the analysis harness on the given container, serializing the results
// to the analysis file and returning the file's new length.
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<usize, std::io::Error> {
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<(usize, bool), std::io::Error> {
let row = self.harness.analyze_qmdl_messages(container);
if !row.is_empty() {
self.write(&row).await?;
}
Ok(self.bytes_written)
Ok((self.bytes_written, row.contains_warnings()))
}
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
@@ -102,6 +108,7 @@ async fn clear_running(analysis_status_lock: Arc<RwLock<AnalysisStatus>>) {
async fn perform_analysis(
name: &str,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
enable_dummy_analyzer: bool,
) -> Result<(), String> {
info!("Opening QMDL and analysis file for {}...", name);
let (analysis_file, qmdl_file, entry_index) = {
@@ -121,7 +128,7 @@ async fn perform_analysis(
(analysis_file, qmdl_file, entry_index)
};
let mut analysis_writer = AnalysisWriter::new(analysis_file)
let mut analysis_writer = AnalysisWriter::new(analysis_file, enable_dummy_analyzer)
.await
.map_err(|e| format!("{:?}", e))?;
let file_size = qmdl_file
@@ -140,7 +147,7 @@ async fn perform_analysis(
.await
.expect("failed getting QMDL container")
{
let size_bytes = analysis_writer
let (size_bytes, _) = analysis_writer
.analyze(container)
.await
.map_err(|e| format!("{:?}", e))?;
@@ -166,6 +173,7 @@ pub fn run_analysis_thread(
mut analysis_rx: Receiver<AnalysisCtrlMessage>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
analysis_status_lock: Arc<RwLock<AnalysisStatus>>,
enable_dummy_analyzer: bool,
) {
task_tracker.spawn(async move {
loop {
@@ -174,7 +182,7 @@ pub fn run_analysis_thread(
let count = queued_len(analysis_status_lock.clone()).await;
for _ in 0..count {
let name = dequeue_to_running(analysis_status_lock.clone()).await;
if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone()).await {
if let Err(err) = perform_analysis(&name, qmdl_store_lock.clone(), enable_dummy_analyzer).await {
error!("failed to analyze {}: {}", name, err);
}
clear_running(analysis_status_lock.clone()).await;
+8
View File
@@ -4,6 +4,8 @@ use tokio::fs::{metadata, read_dir, File};
use clap::Parser;
use futures::TryStreamExt;
mod dummy_analyzer;
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
@@ -12,6 +14,9 @@ struct Args {
#[arg(long)]
show_skipped: bool,
#[arg(long)]
enable_dummy_analyzer: bool,
}
async fn analyze_file(harness: &mut Harness, qmdl_path: &str, show_skipped: bool) {
@@ -55,6 +60,9 @@ async fn main() {
let args = Args::parse();
let mut harness = Harness::new_with_all_analyzers();
if args.enable_dummy_analyzer {
harness.add_analyzer(Box::new(dummy_analyzer::TestAnalyzer { count: 0 }));
}
println!("Analyzers:");
for analyzer in harness.get_metadata().analyzers {
println!(" - {}: {}", analyzer.name, analyzer.description);
+8 -4
View File
@@ -8,6 +8,7 @@ struct ConfigFile {
port: Option<u16>,
debug_mode: Option<bool>,
ui_level: Option<u8>,
enable_dummy_analyzer: Option<bool>,
}
#[derive(Debug)]
@@ -16,6 +17,7 @@ pub struct Config {
pub port: u16,
pub debug_mode: bool,
pub ui_level: u8,
pub enable_dummy_analyzer: bool,
}
impl Default for Config {
@@ -25,6 +27,7 @@ impl Default for Config {
port: 8080,
debug_mode: false,
ui_level: 1,
enable_dummy_analyzer: false,
}
}
}
@@ -34,10 +37,11 @@ pub fn parse_config<P>(path: P) -> Result<Config, RayhunterError> where P: AsRef
if let Ok(config_file) = std::fs::read_to_string(&path) {
let parsed_config: ConfigFile = toml::from_str(&config_file)
.map_err(RayhunterError::ConfigFileParsingError)?;
if let Some(path) = parsed_config.qmdl_store_path { config.qmdl_store_path = path }
if let Some(port) = parsed_config.port { config.port = port }
if let Some(debug_mode) = parsed_config.debug_mode { config.debug_mode = debug_mode }
if let Some(ui_level) = parsed_config.ui_level { config.ui_level = ui_level }
parsed_config.qmdl_store_path.map(|v| config.qmdl_store_path = v);
parsed_config.port.map(|v| config.port = v);
parsed_config.debug_mode.map(|v| config.debug_mode = v);
parsed_config.ui_level.map(|v| config.ui_level = v);
parsed_config.enable_dummy_analyzer.map(|v| config.enable_dummy_analyzer = v);
}
Ok(config)
}
+3 -2
View File
@@ -7,6 +7,7 @@ mod stats;
mod qmdl_store;
mod diag;
mod framebuffer;
mod dummy_analyzer;
use crate::config::{parse_config, parse_args};
use crate::diag::run_diag_read_thread;
@@ -223,14 +224,14 @@ async fn main() -> Result<(), RayhunterError> {
.map_err(RayhunterError::DiagInitError)?;
info!("Starting Diag Thread");
run_diag_read_thread(&task_tracker, dev, rx, ui_update_tx.clone(), qmdl_store_lock.clone());
run_diag_read_thread(&task_tracker, dev, rx, ui_update_tx.clone(), qmdl_store_lock.clone(), config.enable_dummy_analyzer);
info!("Starting UI");
update_ui(&task_tracker, &config, ui_shutdown_rx, ui_update_rx);
}
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
info!("create shutdown thread");
let analysis_status_lock = Arc::new(RwLock::new(AnalysisStatus::default()));
run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone());
run_analysis_thread(&task_tracker, analysis_rx, qmdl_store_lock.clone(), analysis_status_lock.clone(), config.enable_dummy_analyzer);
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, maybe_ui_shutdown_tx, qmdl_store_lock.clone(), analysis_tx.clone());
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, ui_update_tx, tx, analysis_tx, analysis_status_lock).await;
+6 -58
View File
@@ -6,17 +6,13 @@ use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use rayhunter::analysis::analyzer::Harness;
use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::diag::DataType;
use rayhunter::diag_device::DiagDevice;
use serde::Serialize;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, Sender};
use rayhunter::qmdl::QmdlWriter;
use log::{debug, error, info};
use tokio::fs::File;
use tokio::io::BufWriter;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};
@@ -24,6 +20,7 @@ use futures::{StreamExt, TryStreamExt};
use crate::framebuffer;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;
use crate::analysis::AnalysisWriter;
pub enum DiagDeviceCtrlMessage {
StopRecording,
@@ -31,68 +28,19 @@ pub enum DiagDeviceCtrlMessage {
Exit,
}
struct AnalysisWriter {
writer: BufWriter<File>,
harness: Harness,
bytes_written: usize,
}
// We write our analysis results to a file immediately to minimize the amount of
// state Rayhunter has to keep track of in memory. The analysis file's format is
// Newline Delimited JSON
// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which
// lets us simply append new rows to the end without parsing the entire JSON
// object beforehand.
impl AnalysisWriter {
pub async fn new(file: File) -> Result<Self, std::io::Error> {
let mut result = Self {
writer: BufWriter::new(file),
harness: Harness::new_with_all_analyzers(),
bytes_written: 0,
};
let metadata = result.harness.get_metadata();
result.write(&metadata).await?;
Ok(result)
}
// Runs the analysis harness on the given container, serializing the results
// to the analysis file and returning the file's new length.
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<(usize, bool), std::io::Error> {
let row = self.harness.analyze_qmdl_messages(container);
if !row.is_empty() {
self.write(&row).await?;
}
Ok((self.bytes_written, ! &row.analysis.is_empty()))
}
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
let mut value_str = serde_json::to_string(value).unwrap();
value_str.push('\n');
self.bytes_written += value_str.len();
self.writer.write_all(value_str.as_bytes()).await?;
self.writer.flush().await?;
Ok(())
}
// Flushes any pending I/O to disk before dropping the writer
pub async fn close(mut self) -> Result<(), std::io::Error> {
self.writer.flush().await?;
Ok(())
}
}
pub fn run_diag_read_thread(
task_tracker: &TaskTracker,
mut dev: DiagDevice,
mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>,
ui_update_sender: Sender<framebuffer::DisplayState>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
enable_dummy_analyzer: bool,
) {
task_tracker.spawn(async move {
let (initial_qmdl_file, initial_analysis_file) = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut maybe_qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_qmdl_file));
let mut diag_stream = pin!(dev.as_stream().into_stream());
let mut maybe_analysis_writer = Some(AnalysisWriter::new(initial_analysis_file).await
let mut maybe_analysis_writer = Some(AnalysisWriter::new(initial_analysis_file, enable_dummy_analyzer).await
.expect("failed to create analysis writer"));
loop {
tokio::select! {
@@ -103,7 +51,7 @@ pub fn run_diag_read_thread(
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
maybe_analysis_writer = Some(AnalysisWriter::new(new_analysis_file).await
maybe_analysis_writer = Some(AnalysisWriter::new(new_analysis_file, enable_dummy_analyzer).await
.expect("failed to write to analysis file"));
},
Some(DiagDeviceCtrlMessage::StopRecording) => {
+45
View File
@@ -0,0 +1,45 @@
use std::borrow::Cow;
use rayhunter::telcom_parser::lte_rrc::{PCCH_MessageType, PCCH_MessageType_c1, PagingUE_Identity};
use rayhunter::analysis::analyzer::{Analyzer, Event, EventType, Severity};
use rayhunter::analysis::information_element::{InformationElement, LteInformationElement};
pub struct TestAnalyzer{
pub count: i32,
}
impl Analyzer for TestAnalyzer{
fn get_name(&self) -> Cow<str> {
Cow::from("Example Analyzer")
}
fn get_description(&self) -> Cow<str> {
Cow::from("Always returns true, if you are seeing this you are either a developer or you are about to have problems.")
}
fn analyze_information_element(&mut self, ie: &InformationElement) -> Option<Event> {
self.count += 1;
if self.count % 100 == 0 {
return Some(Event {
event_type: EventType::Informational ,
message: "multiple of 100 events processed".to_string(),
})
}
let InformationElement::LTE(LteInformationElement::PCCH(pcch_msg)) = ie else {
return None;
};
let PCCH_MessageType::C1(PCCH_MessageType_c1::Paging(paging)) = &pcch_msg.message else {
return None;
};
for record in &paging.paging_record_list.as_ref()?.0 {
if let PagingUE_Identity::S_TMSI(_) = record.ue_identity {
return Some(Event {
event_type: EventType::QualitativeWarning { severity: Severity::Low },
message: "TMSI was provided to cell".to_string(),
})
}
}
None
}
}
-3
View File
@@ -24,6 +24,3 @@ tokio = { version = "1.35.1", features = ["full"] }
futures-core = "0.3.30"
futures = "0.3.30"
serde = { version = "1.0.197", features = ["derive"] }
[features]
debug = []
+13 -10
View File
@@ -6,12 +6,6 @@ use crate::{diag::MessagesContainer, gsmtap_parser};
use super::{imsi_provided::ImsiProvidedAnalyzer, information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer, null_cipher::NullCipherAnalyzer};
#[cfg(feature="debug")]
use log::warn;
#[cfg(feature="debug")]
use super::test_analyzer::TestAnalyzer;
/// Qualitative measure of how severe a Warning event type is.
/// The levels should break down like this:
/// * Low: if combined with a large number of other Warnings, user should investigate
@@ -92,6 +86,19 @@ impl AnalysisRow {
pub fn is_empty(&self) -> bool {
self.skipped_message_reasons.is_empty() && self.analysis.is_empty()
}
pub fn contains_warnings(&self) -> bool {
for analysis in &self.analysis {
for maybe_event in &analysis.events {
if let Some(event) = maybe_event {
if matches!(event.event_type, EventType::QualitativeWarning { .. }) {
return true;
}
}
}
}
false
}
}
pub struct Harness {
@@ -109,10 +116,6 @@ impl Harness {
harness.add_analyzer(Box::new(ImsiProvidedAnalyzer{}));
harness.add_analyzer(Box::new(NullCipherAnalyzer{}));
#[cfg(feature="debug")] {
warn!("Loading test analyzers!");
harness.add_analyzer(Box::new(TestAnalyzer{count:0}));
}
harness
}
+3
View File
@@ -7,3 +7,6 @@ pub mod gsmtap;
pub mod gsmtap_parser;
pub mod pcap;
pub mod analysis;
// re-export telcom_parser, since we use its types in our API
pub use telcom_parser;