mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-04-30 09:29:58 -07:00
daemon: run analysis in realtime
Currently we just show the results of analysis as a <pre> tagged JSON blob, but eventually we can make some actual UI
This commit is contained in:
56
bin/src/analysis.rs
Normal file
56
bin/src/analysis.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{extract::State, http::StatusCode, Json};
|
||||
use log::error;
|
||||
use rayhunter::{analysis::analyzer::{AnalysisReport, Harness}, diag::MessagesContainer};
|
||||
use tokio::sync;
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::server::ServerState;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AnalysisMessage {
|
||||
Reset,
|
||||
GetReport(sync::oneshot::Sender<AnalysisReport>),
|
||||
AnalyzeContainer(MessagesContainer),
|
||||
StopThread,
|
||||
}
|
||||
|
||||
pub fn run_analysis_thread(task_tracker: &TaskTracker) -> sync::mpsc::Sender<AnalysisMessage> {
|
||||
let (tx, mut rx) = sync::mpsc::channel(5);
|
||||
|
||||
task_tracker.spawn(async move {
|
||||
let mut harness = Harness::new_with_all_analyzers();
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Some(AnalysisMessage::GetReport(sender)) => {
|
||||
// this might fail if the client closes their connection
|
||||
// before we're done building the report
|
||||
if let Err(e) = sender.send(harness.build_analysis_report()) {
|
||||
error!("failed to send analysis report: {:?}", e);
|
||||
}
|
||||
},
|
||||
Some(AnalysisMessage::Reset) => harness = Harness::new_with_all_analyzers(),
|
||||
Some(AnalysisMessage::AnalyzeContainer(container)) => harness.analyze_qmdl_messages(container),
|
||||
Some(AnalysisMessage::StopThread) | None => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tx
|
||||
}
|
||||
|
||||
pub async fn get_analysis_report(State(state): State<Arc<ServerState>>) -> Result<Json<AnalysisReport>, (StatusCode, String)> {
|
||||
if state.readonly_mode {
|
||||
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
|
||||
}
|
||||
let analysis_tx = state.maybe_analysis_tx.as_ref().unwrap();
|
||||
let (report_tx, report_rx) = tokio::sync::oneshot::channel();
|
||||
if let Err(e) = analysis_tx.send(AnalysisMessage::GetReport(report_tx)).await {
|
||||
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("error reaching analysis thread: {:?}", e)));
|
||||
}
|
||||
match report_rx.await {
|
||||
Ok(report) => Ok(Json(report)),
|
||||
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("error fetching analysis report: {:?}", e)))
|
||||
}
|
||||
}
|
||||
108
bin/src/check.rs
108
bin/src/check.rs
@@ -1,8 +1,6 @@
|
||||
use std::{future, path::PathBuf, pin::pin};
|
||||
use chrono::{DateTime, FixedOffset};
|
||||
use rayhunter::{analysis::{analyzer::{Event, EventType, Harness}, information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer}, diag::DataType, gsmtap_parser::GsmtapParser, qmdl::QmdlReader};
|
||||
use rayhunter::{analysis::analyzer::Harness, diag::DataType, qmdl::QmdlReader};
|
||||
use tokio::fs::File;
|
||||
use serde::Serialize;
|
||||
use clap::Parser;
|
||||
use futures::TryStreamExt;
|
||||
|
||||
@@ -13,120 +11,22 @@ struct Args {
|
||||
qmdl_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct AnalyzerMetadata {
|
||||
name: String,
|
||||
description: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct ReportMetadata {
|
||||
num_packets_analyzed: usize,
|
||||
num_packets_skipped: usize,
|
||||
num_warnings: usize,
|
||||
first_packet_time: DateTime<FixedOffset>,
|
||||
last_packet_time: DateTime<FixedOffset>,
|
||||
analyzers: Vec<AnalyzerMetadata>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct PacketAnalysis {
|
||||
timestamp: DateTime<FixedOffset>,
|
||||
events: Vec<Option<Event>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct AnalysisReport {
|
||||
metadata: ReportMetadata,
|
||||
analysis: Vec<PacketAnalysis>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::init();
|
||||
let args = Args::parse();
|
||||
|
||||
let mut harness = Harness::new();
|
||||
harness.add_analyzer(Box::new(LteSib6And7DowngradeAnalyzer{}));
|
||||
|
||||
let mut num_packets_analyzed = 0;
|
||||
let mut num_warnings = 0;
|
||||
let mut first_packet_time: Option<DateTime<FixedOffset>> = None;
|
||||
let mut last_packet_time: Option<DateTime<FixedOffset>> = None;
|
||||
let mut skipped_message_reasons: Vec<String> = Vec::new();
|
||||
let mut analysis: Vec<PacketAnalysis> = Vec::new();
|
||||
let mut analyzers: Vec<AnalyzerMetadata> = Vec::new();
|
||||
|
||||
let names = harness.get_names();
|
||||
let descriptions = harness.get_names();
|
||||
for (name, description) in names.iter().zip(descriptions.iter()) {
|
||||
analyzers.push(AnalyzerMetadata {
|
||||
name: name.to_string(),
|
||||
description: description.to_string(),
|
||||
});
|
||||
}
|
||||
let mut harness = Harness::new_with_all_analyzers();
|
||||
|
||||
let qmdl_file = File::open(args.qmdl_path).await.expect("failed to open QMDL file");
|
||||
let file_size = qmdl_file.metadata().await.expect("failed to get QMDL file metadata").len();
|
||||
let mut gsmtap_parser = GsmtapParser::new();
|
||||
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
|
||||
let mut qmdl_stream = pin!(qmdl_reader.as_stream()
|
||||
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
|
||||
while let Some(container) = qmdl_stream.try_next().await.expect("failed getting QMDL container") {
|
||||
for maybe_qmdl_message in container.into_messages() {
|
||||
let qmdl_message = match maybe_qmdl_message {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
skipped_message_reasons.push(format!("{:?}", err));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let gsmtap_message = match gsmtap_parser.parse(qmdl_message) {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
skipped_message_reasons.push(format!("{:?}", err));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
|
||||
continue;
|
||||
};
|
||||
let element = match InformationElement::try_from(&gsmtap_msg) {
|
||||
Ok(element) => element,
|
||||
Err(err) => {
|
||||
skipped_message_reasons.push(format!("{:?}", err));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if first_packet_time.is_none() {
|
||||
first_packet_time = Some(timestamp.to_datetime());
|
||||
}
|
||||
last_packet_time = Some(timestamp.to_datetime());
|
||||
num_packets_analyzed += 1;
|
||||
let analysis_result = harness.analyze_information_element(&element);
|
||||
if analysis_result.iter().any(Option::is_some) {
|
||||
num_warnings += analysis_result.iter()
|
||||
.filter(|maybe_event| matches!(maybe_event, Some(Event { event_type: EventType::QualitativeWarning { .. }, .. })))
|
||||
.count();
|
||||
analysis.push(PacketAnalysis {
|
||||
timestamp: timestamp.to_datetime(),
|
||||
events: analysis_result,
|
||||
});
|
||||
}
|
||||
}
|
||||
harness.analyze_qmdl_messages(container)
|
||||
}
|
||||
|
||||
let report = AnalysisReport {
|
||||
metadata: ReportMetadata {
|
||||
num_packets_analyzed,
|
||||
num_packets_skipped: skipped_message_reasons.len(),
|
||||
num_warnings,
|
||||
first_packet_time: first_packet_time.expect("no packet times set"),
|
||||
last_packet_time: last_packet_time.expect("no packet times set"),
|
||||
analyzers,
|
||||
},
|
||||
analysis,
|
||||
};
|
||||
|
||||
let report = harness.build_analysis_report();
|
||||
println!("{}", serde_json::to_string(&report).expect("failed to serialize report"));
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
mod analysis;
|
||||
mod config;
|
||||
mod error;
|
||||
mod pcap;
|
||||
@@ -14,6 +15,7 @@ use crate::pcap::get_pcap;
|
||||
use crate::stats::get_system_stats;
|
||||
use crate::error::RayhunterError;
|
||||
|
||||
use analysis::{get_analysis_report, run_analysis_thread, AnalysisMessage};
|
||||
use axum::response::Redirect;
|
||||
use diag::{DiagDeviceCtrlMessage, start_recording, stop_recording};
|
||||
use log::{info, error};
|
||||
@@ -37,12 +39,14 @@ async fn run_server(
|
||||
config: &config::Config,
|
||||
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
|
||||
server_shutdown_rx: oneshot::Receiver<()>,
|
||||
diag_device_sender: Sender<DiagDeviceCtrlMessage>
|
||||
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
|
||||
maybe_analysis_tx: Option<Sender<AnalysisMessage>>
|
||||
) -> JoinHandle<()> {
|
||||
let state = Arc::new(ServerState {
|
||||
qmdl_store_lock,
|
||||
diag_device_ctrl_sender: diag_device_sender,
|
||||
readonly_mode: config.readonly_mode,
|
||||
maybe_analysis_tx,
|
||||
});
|
||||
|
||||
let app = Router::new()
|
||||
@@ -52,6 +56,7 @@ async fn run_server(
|
||||
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
|
||||
.route("/api/start-recording", post(start_recording))
|
||||
.route("/api/stop-recording", post(stop_recording))
|
||||
.route("/api/analysis-report", get(get_analysis_report))
|
||||
.route("/", get(|| async { Redirect::permanent("/index.html") }))
|
||||
.route("/*path", get(serve_static))
|
||||
.with_state(state);
|
||||
@@ -87,7 +92,8 @@ fn run_ctrl_c_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
|
||||
server_shutdown_tx: oneshot::Sender<()>,
|
||||
qmdl_store_lock: Arc<RwLock<QmdlStore>>
|
||||
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
|
||||
maybe_analysis_tx: Option<Sender<AnalysisMessage>>
|
||||
) -> JoinHandle<Result<(), RayhunterError>> {
|
||||
task_tracker.spawn(async move {
|
||||
match tokio::signal::ctrl_c().await {
|
||||
@@ -103,6 +109,10 @@ fn run_ctrl_c_thread(
|
||||
.expect("couldn't send server shutdown signal");
|
||||
diag_device_sender.send(DiagDeviceCtrlMessage::Exit).await
|
||||
.expect("couldn't send Exit message to diag thread");
|
||||
if let Some(analysis_tx) = maybe_analysis_tx {
|
||||
analysis_tx.send(AnalysisMessage::StopThread).await
|
||||
.expect("couldn't send Exit message to analysis thread")
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Unable to listen for shutdown signal: {}", err);
|
||||
@@ -125,18 +135,21 @@ async fn main() -> Result<(), RayhunterError> {
|
||||
|
||||
let qmdl_store_lock = Arc::new(RwLock::new(init_qmdl_store(&config).await?));
|
||||
let (tx, rx) = mpsc::channel::<DiagDeviceCtrlMessage>(1);
|
||||
let mut maybe_analysis_tx = None;
|
||||
if !config.readonly_mode {
|
||||
let mut dev = DiagDevice::new().await
|
||||
.map_err(RayhunterError::DiagInitError)?;
|
||||
dev.config_logs().await
|
||||
.map_err(RayhunterError::DiagInitError)?;
|
||||
|
||||
run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone());
|
||||
let analysis_tx = run_analysis_thread(&task_tracker);
|
||||
run_diag_read_thread(&task_tracker, dev, rx, qmdl_store_lock.clone(), analysis_tx.clone());
|
||||
maybe_analysis_tx = Some(analysis_tx);
|
||||
}
|
||||
|
||||
let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel::<()>();
|
||||
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, qmdl_store_lock.clone());
|
||||
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, tx).await;
|
||||
run_ctrl_c_thread(&task_tracker, tx.clone(), server_shutdown_tx, qmdl_store_lock.clone(), maybe_analysis_tx.clone());
|
||||
run_server(&task_tracker, &config, qmdl_store_lock.clone(), server_shutdown_rx, tx, maybe_analysis_tx).await;
|
||||
|
||||
task_tracker.close();
|
||||
task_tracker.wait().await;
|
||||
|
||||
@@ -6,13 +6,14 @@ use axum::http::StatusCode;
|
||||
use rayhunter::diag::DataType;
|
||||
use rayhunter::diag_device::DiagDevice;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use rayhunter::qmdl::QmdlWriter;
|
||||
use log::{debug, error, info};
|
||||
use tokio::fs::File;
|
||||
use tokio_util::task::TaskTracker;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
|
||||
use crate::analysis::AnalysisMessage;
|
||||
use crate::qmdl_store::QmdlStore;
|
||||
use crate::server::ServerState;
|
||||
|
||||
@@ -22,7 +23,13 @@ pub enum DiagDeviceCtrlMessage {
|
||||
Exit,
|
||||
}
|
||||
|
||||
pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) {
|
||||
pub fn run_diag_read_thread(
|
||||
task_tracker: &TaskTracker,
|
||||
mut dev: DiagDevice,
|
||||
mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>,
|
||||
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
|
||||
analysis_tx: Sender<AnalysisMessage>
|
||||
) {
|
||||
task_tracker.spawn(async move {
|
||||
let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
|
||||
let mut qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_file));
|
||||
@@ -33,8 +40,14 @@ pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut
|
||||
match msg {
|
||||
Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => {
|
||||
qmdl_writer = Some(new_writer);
|
||||
analysis_tx.send(AnalysisMessage::Reset).await
|
||||
.expect("failed to send message to analysis thread");
|
||||
},
|
||||
Some(DiagDeviceCtrlMessage::StopRecording) => {
|
||||
qmdl_writer = None;
|
||||
analysis_tx.send(AnalysisMessage::Reset).await
|
||||
.expect("failed to send message to analysis thread");
|
||||
},
|
||||
Some(DiagDeviceCtrlMessage::StopRecording) => qmdl_writer = None,
|
||||
// None means all the Senders have been dropped, so it's
|
||||
// time to go
|
||||
Some(DiagDeviceCtrlMessage::Exit) | None => {
|
||||
@@ -59,6 +72,9 @@ pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut
|
||||
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
||||
qmdl_store.update_entry(index, writer.total_written).await
|
||||
.expect("failed to update qmdl file size");
|
||||
debug!("sending container to analysis thread...");
|
||||
analysis_tx.send(AnalysisMessage::AnalyzeContainer(container)).await
|
||||
.expect("failed sending messages container to analysis thread");
|
||||
debug!("done!");
|
||||
} else {
|
||||
debug!("no qmdl_writer set, continuing...");
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::ServerState;
|
||||
|
||||
use rayhunter::diag::DataType;
|
||||
use rayhunter::gsmtap_parser::GsmtapParser;
|
||||
use rayhunter::gsmtap_parser;
|
||||
use rayhunter::pcap::GsmtapPcapWriter;
|
||||
use rayhunter::qmdl::QmdlReader;
|
||||
use axum::body::Body;
|
||||
@@ -34,7 +34,6 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
|
||||
// the QMDL reader should stop at the last successfully written data chunk
|
||||
// (entry.size_bytes)
|
||||
let mut gsmtap_parser = GsmtapParser::new();
|
||||
let (reader, writer) = duplex(1024);
|
||||
let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap();
|
||||
pcap_writer.write_iface_header().await.unwrap();
|
||||
@@ -48,7 +47,7 @@ pub async fn get_pcap(State(state): State<Arc<ServerState>>, Path(qmdl_name): Pa
|
||||
for maybe_msg in container.into_messages() {
|
||||
match maybe_msg {
|
||||
Ok(msg) => {
|
||||
let maybe_gsmtap_msg = gsmtap_parser.parse(msg)
|
||||
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)
|
||||
.expect("error parsing gsmtap message");
|
||||
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
|
||||
pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp).await
|
||||
|
||||
@@ -11,6 +11,7 @@ use tokio::sync::RwLock;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use include_dir::{include_dir, Dir};
|
||||
|
||||
use crate::analysis::AnalysisMessage;
|
||||
use crate::DiagDeviceCtrlMessage;
|
||||
use crate::qmdl_store::QmdlStore;
|
||||
|
||||
@@ -18,6 +19,7 @@ pub struct ServerState {
|
||||
pub qmdl_store_lock: Arc<RwLock<QmdlStore>>,
|
||||
pub diag_device_ctrl_sender: Sender<DiagDeviceCtrlMessage>,
|
||||
pub readonly_mode: bool,
|
||||
pub maybe_analysis_tx: Option<Sender<AnalysisMessage>>,
|
||||
}
|
||||
|
||||
pub async fn get_qmdl(State(state): State<Arc<ServerState>>, Path(qmdl_name): Path<String>) -> Result<Response, (StatusCode, String)> {
|
||||
|
||||
Reference in New Issue
Block a user