Compare commits

...

10 Commits

Author SHA1 Message Date
Will Greenberg f8c9701556 Add bmw's ZIP reader sanity check 2026-06-03 19:15:44 -07:00
Will Greenberg fda0659ba4 Revert manifest change, rename some structs
Because QmdlMessageReader no longer cares about tracking the
uncompressed bytes read thus far, we can use the old manifest QMDL file
size name again.
2026-06-03 19:11:53 -07:00
Will Greenberg 0cd70ad73c Refactor and simplify QmdlReader
In the past, QmdlReader was written to share a trait with DiagDevice, so
it had to pretend to be reading MessagesContainers. This needlessly
complicated both its code and that of its consumers. Instead,
QmdlReader now returns a stream of diag Messages.

QmdlReader also automatically detects if it's reading a compressed QMDL
stream or not.

Additionally, QmdlReader can no longer be bounded by a file size limit,
and instead relies on HDLC message framing to detect file truncation.
This works for both compressed and uncompressed QMDL files.
2026-06-03 18:27:19 -07:00
Will Greenberg af4a9aeb95 server: compressed QMDL is decompressed in a zip
This is perhaps a bit silly, but the current output of a compressed
QmdlReader is always decompressed, so when we export a ZIP, we're
basically un-gzipping the QMDL before zipping it.

However, this approach makes for simpler code and results in a uniform
set of files for the user, which I think is worth the slight amount of
wasted processing.
2026-04-03 13:40:31 -07:00
Will Greenberg 73e3c9a5c2 simplify QmdlWriter 2026-04-03 13:40:26 -07:00
Will Greenberg 58c60c2661 daemon: put QmdlWriter in a Box
This'll balance the enum size given QmdlWriter's larger size
2026-04-01 12:29:58 -07:00
Will Greenberg e0ae8a0298 run cargo fmt 2026-04-01 11:55:56 -07:00
Will Greenberg e6a3a4331e daemon: fix zip test 2026-04-01 11:43:48 -07:00
Will Greenberg 9191540e86 qmdl_store: maintain backwards compatibility 2026-04-01 11:40:04 -07:00
Will Greenberg 0a93e93838 Add support for compressed QMDL
This reworks the QmdlWriter to output gzipped QMDL files by default,
and allows QmdlReader to operate on either compressed or uncompressed
QMDLs.

QmdlReader has been significantly rewritten to expose a single AsyncRead
interface to both compressed and uncompressed QMDL sources.
2026-03-30 19:58:13 -07:00
12 changed files with 669 additions and 404 deletions
Generated
+50 -27
View File
@@ -274,6 +274,18 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "async-compression"
version = "0.4.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1"
dependencies = [
"compression-codecs",
"compression-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "async-executor" name = "async-executor"
version = "1.13.3" version = "1.13.3"
@@ -945,6 +957,23 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "compression-codecs"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7"
dependencies = [
"compression-core",
"flate2",
"memchr",
]
[[package]]
name = "compression-core"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d"
[[package]] [[package]]
name = "concurrent-queue" name = "concurrent-queue"
version = "2.5.0" version = "2.5.0"
@@ -1733,9 +1762,9 @@ dependencies = [
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.1" version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"miniz_oxide", "miniz_oxide",
@@ -1812,9 +1841,9 @@ dependencies = [
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@@ -1827,9 +1856,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@@ -1837,15 +1866,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@@ -1854,9 +1883,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]] [[package]]
name = "futures-lite" name = "futures-lite"
@@ -1873,9 +1902,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -1884,21 +1913,21 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@@ -1908,7 +1937,6 @@ dependencies = [
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils",
"slab", "slab",
] ]
@@ -4132,12 +4160,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "piper" name = "piper"
version = "0.2.4" version = "0.2.4"
@@ -4674,6 +4696,7 @@ checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
name = "rayhunter" name = "rayhunter"
version = "0.10.2" version = "0.10.2"
dependencies = [ dependencies = [
"async-compression",
"bytes", "bytes",
"chrono", "chrono",
"crc", "crc",
+13 -30
View File
@@ -1,15 +1,13 @@
use clap::Parser; use clap::Parser;
use futures::TryStreamExt;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use pcap_file_tokio::pcapng::{Block, PcapNgReader}; use pcap_file_tokio::pcapng::{Block, PcapNgReader};
use rayhunter::{ use rayhunter::{
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness}, analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
diag::DataType,
gsmtap_parser, gsmtap_parser,
pcap::GsmtapPcapWriter, pcap::GsmtapPcapWriter,
qmdl::QmdlReader, qmdl::QmdlMessageReader,
}; };
use std::{collections::HashMap, future, path::PathBuf, pin::pin}; use std::{collections::HashMap, path::PathBuf};
use tokio::fs::File; use tokio::fs::File;
use walkdir::WalkDir; use walkdir::WalkDir;
@@ -113,26 +111,14 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let file_size = qmdl_file let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader");
.metadata()
.await
.expect("failed to get QMDL file metadata")
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(
qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
);
let mut report = Report::new(qmdl_path); let mut report = Report::new(qmdl_path);
while let Some(container) = qmdl_stream while let Some(maybe_message) = qmdl_reader
.try_next() .get_next_message()
.await .await
.expect("failed getting QMDL container") .expect("failed to get message")
{ {
for row in harness.analyze_qmdl_messages(container) { report.process_row(harness.analyze_qmdl_message(maybe_message));
report.process_row(row);
}
} }
report.print_summary(show_skipped); report.print_summary(show_skipped);
} }
@@ -141,8 +127,7 @@ async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path) let qmdl_file = &mut File::open(&qmdl_path)
.await .await
.expect("failed to open qmdl file"); .expect("failed to open qmdl file");
let qmdl_file_size = qmdl_file.metadata().await.unwrap().len(); let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize));
let mut pcap_path = qmdl_path.clone(); let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcapng"); pcap_path.set_extension("pcapng");
let pcap_file = &mut File::create(&pcap_path) let pcap_file = &mut File::create(&pcap_path)
@@ -150,12 +135,12 @@ async fn pcapify(qmdl_path: &PathBuf) {
.expect("failed to open pcap file"); .expect("failed to open pcap file");
let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap(); let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap();
pcap_writer.write_iface_header().await.unwrap(); pcap_writer.write_iface_header().await.unwrap();
while let Some(container) = qmdl_reader while let Some(maybe_message) = qmdl_reader
.get_next_messages_container() .get_next_message()
.await .await
.expect("failed to get container") .expect("failed to get message")
{ {
for msg in container.into_messages().into_iter().flatten() { if let Ok(msg) = maybe_message {
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) { if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
pcap_writer pcap_writer
.write_gsmtap_message(parsed, timestamp) .write_gsmtap_message(parsed, timestamp)
@@ -197,9 +182,7 @@ async fn main() {
let name_str = name.to_str().unwrap(); let name_str = name.to_str().unwrap();
let path = entry.path(); let path = entry.path();
let path_str = path.to_str().unwrap(); let path_str = path.to_str().unwrap();
// instead of relying on the QMDL extension, can we check if a file is if name_str.ends_with(".qmdl") || name_str.ends_with(".qmdl.gz") {
// QMDL by inspecting the contents?
if name_str.ends_with(".qmdl") {
info!("**** Beginning analysis of {name_str}"); info!("**** Beginning analysis of {name_str}");
analyze_qmdl(path_str, args.show_skipped).await; analyze_qmdl(path_str, args.show_skipped).await;
if args.pcapify { if args.pcapify {
+1 -1
View File
@@ -33,7 +33,7 @@ futures-macro = "0.3.30"
include_dir = "0.7.3" include_dir = "0.7.3"
chrono = { version = "0.4.31", features = ["serde"] } chrono = { version = "0.4.31", features = ["serde"] }
tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] } tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] }
futures = { version = "0.3.30", default-features = false } futures = { version = "0.3.32", default-features = false, features = ["std"] }
serde_json = "1.0.114" serde_json = "1.0.114"
image = { version = "0.25.1", default-features = false, features = ["png", "gif"] } image = { version = "0.25.1", default-features = false, features = ["png", "gif"] }
tempfile = "3.10.2" tempfile = "3.10.2"
+21 -23
View File
@@ -1,16 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use std::{cmp, future, pin}; use std::cmp;
use axum::Json; use axum::Json;
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
http::StatusCode, http::StatusCode,
}; };
use futures::TryStreamExt;
use log::{error, info}; use log::{error, info};
use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness}; use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness};
use rayhunter::diag::{DataType, MessagesContainer}; use rayhunter::diag::{DiagParsingError, Message, MessagesContainer};
use rayhunter::qmdl::QmdlReader;
use serde::Serialize; use serde::Serialize;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::io::{AsyncWriteExt, BufWriter};
@@ -47,7 +45,7 @@ impl AnalysisWriter {
// Runs the analysis harness on the given container, serializing the results // Runs the analysis harness on the given container, serializing the results
// to the analysis file, returning whether any warnings were detected // to the analysis file, returning whether any warnings were detected
pub async fn analyze( pub async fn analyze_container(
&mut self, &mut self,
container: MessagesContainer, container: MessagesContainer,
) -> Result<EventType, std::io::Error> { ) -> Result<EventType, std::io::Error> {
@@ -62,6 +60,17 @@ impl AnalysisWriter {
Ok(max_type) Ok(max_type)
} }
// Runs the analysis harness on a single QMDL message (or on the parsing
// error produced in its place), hands the resulting row to `self.write`
// unless the row is empty, and returns the row's maximum event type. An
// error from the write is propagated to the caller.
pub async fn analyze_message(
&mut self,
maybe_qmdl_msg: Result<Message, DiagParsingError>,
) -> Result<EventType, std::io::Error> {
let row = self.harness.analyze_qmdl_message(maybe_qmdl_msg);
// Empty rows are skipped rather than written out.
if !row.is_empty() {
self.write(&row).await?;
}
Ok(row.get_max_event_type())
}
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> { async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
let mut value_str = serde_json::to_string(value).unwrap(); let mut value_str = serde_json::to_string(value).unwrap();
value_str.push('\n'); value_str.push('\n');
@@ -135,7 +144,7 @@ async fn perform_analysis(
analyzer_config: &AnalyzerConfig, analyzer_config: &AnalyzerConfig,
) -> Result<(), String> { ) -> Result<(), String> {
info!("Opening QMDL and analysis file for {name}..."); info!("Opening QMDL and analysis file for {name}...");
let (analysis_file, qmdl_file) = { let (analysis_file, mut qmdl_reader) = {
let mut qmdl_store = qmdl_store_lock.write().await; let mut qmdl_store = qmdl_store_lock.write().await;
let (entry_index, _) = qmdl_store let (entry_index, _) = qmdl_store
.entry_for_name(name) .entry_for_name(name)
@@ -144,37 +153,26 @@ async fn perform_analysis(
.clear_and_open_entry_analysis(entry_index) .clear_and_open_entry_analysis(entry_index)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
let qmdl_file = qmdl_store let qmdl_reader = qmdl_store
.open_entry_qmdl(entry_index) .open_entry_qmdl(entry_index)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
(analysis_file, qmdl_file) (analysis_file, qmdl_reader)
}; };
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config) let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
let file_size = qmdl_file
.metadata()
.await
.expect("failed to get QMDL file metadata")
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin::pin!(
qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
);
info!("Starting analysis for {name}..."); info!("Starting analysis for {name}...");
while let Some(container) = qmdl_stream while let Some(maybe_message) = qmdl_reader
.try_next() .get_next_message()
.await .await
.expect("failed getting QMDL container") .expect("failed to get message")
{ {
let _ = analysis_writer let _ = analysis_writer
.analyze(container) .analyze_message(maybe_message)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
} }
+23 -11
View File
@@ -63,7 +63,7 @@ pub struct DiagTask {
enum DiagState { enum DiagState {
Recording { Recording {
qmdl_writer: QmdlWriter<File>, qmdl_writer: Box<QmdlWriter<File>>,
analysis_writer: Box<AnalysisWriter>, analysis_writer: Box<AnalysisWriter>,
}, },
Stopped, Stopped,
@@ -143,7 +143,7 @@ impl DiagTask {
DiskSpaceCheck::Failed => {} DiskSpaceCheck::Failed => {}
} }
let (qmdl_file, analysis_file) = match qmdl_store.new_entry().await { let (qmdl_gz_file, analysis_file) = match qmdl_store.new_entry().await {
Ok(files) => files, Ok(files) => files,
Err(e) => { Err(e) => {
let msg = format!("failed creating QMDL file entry: {e}"); let msg = format!("failed creating QMDL file entry: {e}");
@@ -152,7 +152,7 @@ impl DiagTask {
} }
}; };
self.stop_current_recording().await; self.stop_current_recording().await;
let qmdl_writer = QmdlWriter::new(qmdl_file); let qmdl_writer = Box::new(QmdlWriter::new(qmdl_gz_file));
let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await
{ {
Ok(writer) => Box::new(writer), Ok(writer) => Box::new(writer),
@@ -237,13 +237,23 @@ impl DiagTask {
let mut state = DiagState::Stopped; let mut state = DiagState::Stopped;
std::mem::swap(&mut self.state, &mut state); std::mem::swap(&mut self.state, &mut state);
if let DiagState::Recording { if let DiagState::Recording {
analysis_writer, .. qmdl_writer,
analysis_writer,
..
} = state } = state
{ {
analysis_writer match (qmdl_writer.close().await, analysis_writer.close().await) {
.close() (Ok(()), Ok(())) => {}
.await (qmdl_result, analysis_result) => {
.expect("failed to close analysis writer"); if let Err(err) = qmdl_result {
error!("failed to close QmdlWriter: {:?}", err);
}
if let Err(err) = analysis_result {
error!("failed to close AnalysisWriter: {:?}", err);
}
panic!();
}
}
} }
} }
@@ -313,15 +323,16 @@ impl DiagTask {
self.stop(qmdl_store, Some(reason)).await; self.stop(qmdl_store, Some(reason)).await;
return; return;
} }
if let Ok(file_size) = qmdl_writer.size().await {
debug!( debug!(
"total QMDL bytes written: {}, updating manifest...", "total QMDL bytes written: {}, updating manifest...",
qmdl_writer.total_written file_size
); );
let index = qmdl_store let index = qmdl_store
.current_entry .current_entry
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
if let Err(e) = qmdl_store if let Err(e) = qmdl_store
.update_entry_qmdl_size(index, qmdl_writer.total_written) .update_entry_qmdl_size(index, file_size)
.await .await
{ {
let reason = format!("failed to update manifest (disk full?): {e}"); let reason = format!("failed to update manifest (disk full?): {e}");
@@ -330,9 +341,10 @@ impl DiagTask {
return; return;
} }
debug!("done!"); debug!("done!");
}
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
self.bytes_since_space_check += container_bytes; self.bytes_since_space_check += container_bytes;
let max_type = match analysis_writer.analyze(container).await { let max_type = match analysis_writer.analyze_container(container).await {
Ok(t) => t, Ok(t) => t,
Err(e) => { Err(e) => {
warn!("failed to analyze container: {e}"); warn!("failed to analyze container: {e}");
+7 -22
View File
@@ -7,12 +7,11 @@ use axum::http::StatusCode;
use axum::http::header::CONTENT_TYPE; use axum::http::header::CONTENT_TYPE;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use log::error; use log::error;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser; use rayhunter::gsmtap_parser;
use rayhunter::pcap::GsmtapPcapWriter; use rayhunter::pcap::GsmtapPcapWriter;
use rayhunter::qmdl::QmdlReader; use rayhunter::qmdl::QmdlMessageReader;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, duplex}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data // Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
@@ -51,17 +50,14 @@ pub async fn get_pcap(
"QMDL file is empty, try again in a bit!".to_string(), "QMDL file is empty, try again in a bit!".to_string(),
)); ));
} }
let qmdl_size_bytes = entry.qmdl_size_bytes; let qmdl_reader = qmdl_store
let qmdl_file = qmdl_store
.open_entry_qmdl(entry_index) .open_entry_qmdl(entry_index)
.await .await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
// the QMDL reader should stop at the last successfully written data chunk
// (entry.size_bytes)
let (reader, writer) = duplex(1024); let (reader, writer) = duplex(1024);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await { if let Err(e) = generate_pcap_data(writer, qmdl_reader).await {
error!("failed to generate PCAP: {e:?}"); error!("failed to generate PCAP: {e:?}");
} }
}); });
@@ -71,25 +67,15 @@ pub async fn get_pcap(
Ok((headers, body).into_response()) Ok((headers, body).into_response())
} }
pub async fn generate_pcap_data<R, W>( pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlMessageReader<R>) -> Result<(), Error>
writer: W,
qmdl_file: R,
qmdl_size_bytes: usize,
) -> Result<(), Error>
where where
W: AsyncWrite + Unpin + Send, W: AsyncWrite + Unpin + Send,
R: AsyncRead + Unpin, R: AsyncRead + AsyncSeek + Unpin,
{ {
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?; let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
pcap_writer.write_iface_header().await?; pcap_writer.write_iface_header().await?;
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes)); while let Some(maybe_msg) = reader.get_next_message().await? {
while let Some(container) = reader.get_next_messages_container().await? {
if container.data_type != DataType::UserSpace {
continue;
}
for maybe_msg in container.into_messages() {
match maybe_msg { match maybe_msg {
Ok(msg) => { Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
@@ -102,7 +88,6 @@ where
Err(e) => error!("error parsing message: {e:?}"), Err(e) => error!("error parsing message: {e:?}"),
} }
} }
}
Ok(()) Ok(())
} }
+30 -9
View File
@@ -4,6 +4,7 @@ use std::path::{Path, PathBuf};
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
use log::{info, warn}; use log::{info, warn};
use rayhunter::qmdl::QmdlMessageReader;
use rayhunter::util::RuntimeMetadata; use rayhunter::util::RuntimeMetadata;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
@@ -57,7 +58,6 @@ pub struct ManifestEntry {
/// The system time when the last message was recorded to the file /// The system time when the last message was recorded to the file
#[cfg_attr(feature = "apidocs", schema(value_type = String))] #[cfg_attr(feature = "apidocs", schema(value_type = String))]
pub last_message_time: Option<DateTime<Local>>, pub last_message_time: Option<DateTime<Local>>,
/// The size of the QMDL file in bytes
pub qmdl_size_bytes: usize, pub qmdl_size_bytes: usize,
/// The rayhunter daemon version which generated the file /// The rayhunter daemon version which generated the file
pub rayhunter_version: Option<String>, pub rayhunter_version: Option<String>,
@@ -67,6 +67,8 @@ pub struct ManifestEntry {
pub arch: Option<String>, pub arch: Option<String>,
#[serde(default)] #[serde(default)]
pub stop_reason: Option<String>, pub stop_reason: Option<String>,
#[serde(default)]
pub compressed: bool,
} }
impl ManifestEntry { impl ManifestEntry {
@@ -82,12 +84,17 @@ impl ManifestEntry {
system_os: Some(metadata.system_os), system_os: Some(metadata.system_os),
arch: Some(metadata.arch), arch: Some(metadata.arch),
stop_reason: None, stop_reason: None,
compressed: true,
} }
} }
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf { pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
let mut filepath = path.as_ref().join(&self.name); let mut filepath = path.as_ref().join(&self.name);
if self.compressed {
filepath.set_extension("qmdl.gz");
} else {
filepath.set_extension("qmdl"); filepath.set_extension("qmdl");
}
filepath filepath
} }
@@ -153,8 +160,9 @@ impl RecordingStore {
} }
// Does a best-effort attempt to recover the manifest from a directory of // Does a best-effort attempt to recover the manifest from a directory of
// QMDL files. We expect these files to be named like "<timestamp>.qmdl", // QMDL files. We expect these files to be named like "<timestamp>.qmdl"
// and skip any files which don't match that pattern. // or "<timestamp>.qmdl.gz", and skip any files which don't match that
// pattern.
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError> pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
where where
P: AsRef<Path>, P: AsRef<Path>,
@@ -174,11 +182,14 @@ impl RecordingStore {
continue; continue;
}; };
if !filename.ends_with(".qmdl") { let (stem, compressed) = if filename.ends_with(".qmdl") {
(filename.trim_end_matches(".qmdl"), false)
} else if filename.ends_with(".qmdl.gz") {
(filename.trim_end_matches(".qmdl.gz"), true)
} else {
continue; continue;
} };
let stem = filename.trim_end_matches(".qmdl");
let Ok(start_timestamp) = stem.parse::<i64>() else { let Ok(start_timestamp) = stem.parse::<i64>() else {
warn!("QMDL file has invalid name {os_filename:?}, skipping"); warn!("QMDL file has invalid name {os_filename:?}, skipping");
continue; continue;
@@ -205,6 +216,7 @@ impl RecordingStore {
info!("successfully recovered QMDL entry {os_filename:?}!"); info!("successfully recovered QMDL entry {os_filename:?}!");
manifest_entries.push(ManifestEntry { manifest_entries.push(ManifestEntry {
name: stem.to_string(), name: stem.to_string(),
compressed,
start_time: start_time.into(), start_time: start_time.into(),
last_message_time: Some(last_message_time.into()), last_message_time: Some(last_message_time.into()),
qmdl_size_bytes: metadata.size() as usize, qmdl_size_bytes: metadata.size() as usize,
@@ -265,9 +277,15 @@ impl RecordingStore {
} }
// Returns the corresponding QMDL file for a given entry // Returns the corresponding QMDL file for a given entry
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> { pub async fn open_entry_qmdl(
&self,
entry_index: usize,
) -> Result<QmdlMessageReader<File>, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index]; let entry = &self.manifest.entries[entry_index];
File::open(entry.get_qmdl_filepath(&self.path)) let file = File::open(entry.get_qmdl_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)?;
QmdlMessageReader::new(file)
.await .await
.map_err(RecordingStoreError::ReadFileError) .map_err(RecordingStoreError::ReadFileError)
} }
@@ -490,7 +508,10 @@ mod tests {
.entry_for_name(&store.manifest.entries[entry_index].name) .entry_for_name(&store.manifest.entries[entry_index].name)
.unwrap(); .unwrap();
assert!(entry.last_message_time.is_some()); assert!(entry.last_message_time.is_some());
assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000); assert_eq!(
store.manifest.entries[entry_index].qmdl_size_bytes,
1000
);
assert_eq!( assert_eq!(
RecordingStore::read_manifest(dir.path()).await.unwrap(), RecordingStore::read_manifest(dir.path()).await.unwrap(),
store.manifest store.manifest
+117 -52
View File
@@ -14,7 +14,8 @@ use log::{error, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::write; use tokio::fs::write;
use tokio::io::{AsyncReadExt, copy, duplex}; use tokio::io::copy;
use tokio::io::duplex;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio_util::compat::FuturesAsyncWriteCompatExt; use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -64,7 +65,7 @@ pub async fn get_qmdl(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
format!("couldn't find qmdl file with name {qmdl_idx}"), format!("couldn't find qmdl file with name {qmdl_idx}"),
))?; ))?;
let qmdl_file = qmdl_store let qmdl_reader = qmdl_store
.open_entry_qmdl(entry_index) .open_entry_qmdl(entry_index)
.await .await
.map_err(|err| { .map_err(|err| {
@@ -73,14 +74,15 @@ pub async fn get_qmdl(
format!("error opening QMDL file: {err}"), format!("error opening QMDL file: {err}"),
) )
})?; })?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
let headers = [ let headers = [
(CONTENT_TYPE, "application/octet-stream"), (CONTENT_TYPE, "application/octet-stream"),
(CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()), (
CONTENT_LENGTH,
&entry.qmdl_size_bytes.to_string(),
),
]; ];
let body = Body::from_stream(qmdl_stream); let body = Body::from_stream(qmdl_reader.into_qmdl_stream());
Ok((headers, body).into_response()) Ok((headers, body).into_response())
} }
@@ -308,7 +310,7 @@ pub async fn get_zip(
Path(entry_name): Path<String>, Path(entry_name): Path<String>,
) -> Result<Response, (StatusCode, String)> { ) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned(); let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
let (entry_index, qmdl_size_bytes) = { let (entry_index, _) = {
let qmdl_store = state.qmdl_store_lock.read().await; let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or(( let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
@@ -322,7 +324,7 @@ pub async fn get_zip(
)); ));
} }
(entry_index, entry.qmdl_size_bytes) (entry_index, entry.compressed)
}; };
let qmdl_store_lock = state.qmdl_store_lock.clone(); let qmdl_store_lock = state.qmdl_store_lock.clone();
@@ -335,22 +337,18 @@ pub async fn get_zip(
// Add QMDL file // Add QMDL file
{ {
let entry = let entry = ZipEntryBuilder::new(
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored); format!("{qmdl_idx}.qmdl.gz").into(),
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does Compression::Stored,
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed );
// once https://github.com/Majored/rs-async-zip/pull/160 is released. // FuturesAsyncWriteCompatExt::compat_write because async-zip's
// entrystream does not impl tokio's AsyncWrite, but only
// future's AsyncWrite. This can be removed once
// https://github.com/Majored/rs-async-zip/pull/160 is released.
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
let mut qmdl_file = {
let qmdl_store = qmdl_store_lock.read().await; let qmdl_store = qmdl_store_lock.read().await;
qmdl_store let mut qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
.open_entry_qmdl(entry_index) copy(&mut qmdl_reader, &mut entry_writer).await?;
.await?
.take(qmdl_size_bytes as u64)
};
copy(&mut qmdl_file, &mut entry_writer).await?;
entry_writer.into_inner().close().await?; entry_writer.into_inner().close().await?;
} }
@@ -360,17 +358,10 @@ pub async fn get_zip(
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored); ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
let qmdl_file_for_pcap = {
let qmdl_store = qmdl_store_lock.read().await; let qmdl_store = qmdl_store_lock.read().await;
qmdl_store let qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
.open_entry_qmdl(entry_index)
.await?
.take(qmdl_size_bytes as u64)
};
if let Err(e) = if let Err(e) = generate_pcap_data(&mut entry_writer, qmdl_reader).await {
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
{
// if we fail to generate the PCAP file, we should still continue and give the // if we fail to generate the PCAP file, we should still continue and give the
// user the QMDL. // user the QMDL.
error!("Failed to generate PCAP: {e:?}"); error!("Failed to generate PCAP: {e:?}");
@@ -434,9 +425,13 @@ pub async fn debug_set_display_state(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::Cursor;
use super::*; use super::*;
use async_zip::base::read::mem::ZipFileReader; use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State}; use axum::extract::{Path, State};
use futures::AsyncReadExt;
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}};
use tempfile::TempDir; use tempfile::TempDir;
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) { async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
@@ -450,24 +445,24 @@ mod tests {
async fn create_test_entry_with_data( async fn create_test_entry_with_data(
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>, store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
test_data: &[u8], test_data: &MessagesContainer,
) -> String { ) -> String {
let entry_name = { let entry_name = {
let mut store = store_lock.write().await; let mut store = store_lock.write().await;
let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap(); let (mut qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap();
if !test_data.is_empty() { let mut writer = QmdlWriter::new(&mut qmdl_gz_file);
use tokio::io::AsyncWriteExt; writer.write_container(test_data).await.unwrap();
qmdl_file.write_all(test_data).await.unwrap(); writer.close().await.unwrap();
qmdl_file.flush().await.unwrap();
} let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize;
let current_entry = store.current_entry.unwrap(); let current_entry = store.current_entry.unwrap();
let entry = &store.manifest.entries[current_entry]; let entry = &store.manifest.entries[current_entry];
let entry_name = entry.name.clone(); let entry_name = entry.name.clone();
store store
.update_entry_qmdl_size(current_entry, test_data.len()) .update_entry_qmdl_size(current_entry, qmdl_file_size)
.await .await
.unwrap(); .unwrap();
entry_name entry_name
@@ -501,17 +496,69 @@ mod tests {
}) })
} }
// Builds a container holding one valid HDLC encapsulated diag message. The
// bytes were generated from rayhunter::diag::test::get_test_message.
fn create_test_container() -> MessagesContainer {
    // the encapsulated frame, 39 bytes long
    let data: Vec<u8> = vec![
        16, 0, 32, 0, 32, 0, 192, 176, 26, 165, 245, 135, 118, 35, 2, 1, 20, 14, 48, 0,
        160, 0, 2, 8, 0, 0, 217, 15, 5, 0, 0, 0, 0, 1, 0, 10, 13, 196, 126,
    ];
    let message = HdlcEncapsulatedMessage { len: 39, data };
    MessagesContainer {
        data_type: DataType::UserSpace,
        num_messages: 1,
        messages: vec![message],
    }
}
#[tokio::test] #[tokio::test]
async fn test_get_zip_success() { async fn test_get_zip_success() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await; let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E]; let test_qmdl_data = create_test_container();
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
let state = create_test_server_state(store_lock); let state = create_test_server_state(store_lock);
let result = get_zip(State(state), Path(entry_name.clone())).await; let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap();
assert!(result.is_ok());
let response = result.unwrap();
let headers = response.headers(); let headers = response.headers();
assert_eq!(headers.get("content-type").unwrap(), "application/zip"); assert_eq!(headers.get("content-type").unwrap(), "application/zip");
@@ -520,17 +567,35 @@ mod tests {
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
let zip_reader_file = zip_reader.file();
let filenames = zip_reader // sanity check that our ZIP has no empty files
.file() for entry in zip_reader_file.entries() {
.entries() assert_ne!(entry.uncompressed_size(), 0);
}
let filenames: Vec<String> = zip_reader_file.entries()
.iter() .iter()
.map(|entry| entry.filename().as_str().unwrap().to_owned()) .map(|entry| entry.filename().as_str().unwrap().to_string())
.collect::<Vec<String>>(); .collect();
assert_eq!( assert_eq!(
filenames, filenames,
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),] vec![
format!("{entry_name}.qmdl.gz"),
format!("{entry_name}.pcapng"),
]
);
let mut qmdl_body = Vec::with_capacity(128);
zip_reader.reader_without_entry(0)
.await
.unwrap()
.read_to_end(&mut qmdl_body)
.await
.unwrap();
let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap();
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
assert_eq!(
qmdl_reader.get_next_message().await.unwrap(),
Some(Ok(expected_message)),
); );
} }
} }
+1
View File
@@ -30,5 +30,6 @@ serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
num_enum = "0.7.4" num_enum = "0.7.4"
utoipa = { version = "5.4.0", optional = true } utoipa = { version = "5.4.0", optional = true }
async-compression = { version = "0.4.41", features = ["tokio", "gzip"] }
[dev-dependencies] [dev-dependencies]
+22 -16
View File
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use crate::analysis::diagnostic::DiagnosticAnalyzer; use crate::analysis::diagnostic::DiagnosticAnalyzer;
use crate::diag::{DiagParsingError, Message};
use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType}; use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType};
use crate::util::RuntimeMetadata; use crate::util::RuntimeMetadata;
use crate::{diag::MessagesContainer, gsmtap_parser}; use crate::{diag::MessagesContainer, gsmtap_parser};
@@ -231,6 +232,14 @@ pub struct AnalysisRow {
} }
impl AnalysisRow { impl AnalysisRow {
/// Creates a blank row: no packet timestamp, no skip reason, and no events.
pub fn new() -> Self {
    let events = Vec::new();
    Self {
        events,
        skipped_message_reason: None,
        packet_timestamp: None,
    }
}
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.skipped_message_reason.is_none() && !self.contains_warnings() self.skipped_message_reason.is_none() && !self.contains_warnings()
} }
@@ -412,36 +421,27 @@ impl Harness {
row row
} }
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> { pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result<Message, DiagParsingError>) -> AnalysisRow {
let mut rows = Vec::new(); let mut row = AnalysisRow::new();
for maybe_qmdl_message in container.into_messages() {
self.packet_num += 1; self.packet_num += 1;
rows.push(AnalysisRow {
packet_timestamp: None,
skipped_message_reason: None,
events: Vec::new(),
});
// unwrap is safe here since we just pushed a value
let row = rows.last_mut().unwrap();
let qmdl_message = match maybe_qmdl_message { let qmdl_message = match maybe_qmdl_message {
Ok(msg) => msg, Ok(msg) => msg,
Err(err) => { Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}")); row.skipped_message_reason = Some(format!("{err:?}"));
continue; return row;
} }
}; };
let gsmtap_message = match gsmtap_parser::parse(qmdl_message) { let gsmtap_message = match gsmtap_parser::parse(qmdl_message) {
Ok(msg) => msg, Ok(msg) => msg,
Err(err) => { Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}")); row.skipped_message_reason = Some(format!("{err:?}"));
continue; return row;
} }
}; };
let Some((timestamp, gsmtap_msg)) = gsmtap_message else { let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
continue; return row;
}; };
row.packet_timestamp = Some(timestamp.to_datetime()); row.packet_timestamp = Some(timestamp.to_datetime());
@@ -449,13 +449,19 @@ impl Harness {
Ok(element) => element, Ok(element) => element,
Err(err) => { Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}")); row.skipped_message_reason = Some(format!("{err:?}"));
continue; return row;
} }
}; };
row.events = self.analyze_information_element(&element); row.events = self.analyze_information_element(&element);
row
} }
rows
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
container.into_messages()
.drain(..)
.map(|maybe_message| self.analyze_qmdl_message(maybe_message))
.collect()
} }
fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> { fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> {
+35 -20
View File
@@ -1,5 +1,6 @@
//! Diag protocol serialization/deserialization //! Diag protocol serialization/deserialization
use bytes::Bytes;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use crc::{Algorithm, Crc}; use crc::{Algorithm, Crc};
use deku::prelude::*; use deku::prelude::*;
@@ -89,30 +90,19 @@ impl MessagesContainer {
let mut result = Vec::new(); let mut result = Vec::new();
for msg in self.messages { for msg in self.messages {
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
match hdlc_decapsulate(sub_msg, &CRC_CCITT) { result.push(Message::from_hdlc(sub_msg));
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!(
"warning: {} leftover bytes when parsing Message",
leftover_bytes.len()
);
}
result.push(Ok(res));
}
Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))),
},
Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(
err,
sub_msg.to_vec(),
))),
}
} }
} }
result result
} }
} }
impl From<MessagesContainer> for Bytes {
fn from(value: MessagesContainer) -> Self {
value.to_bytes().unwrap().into()
}
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
pub struct HdlcEncapsulatedMessage { pub struct HdlcEncapsulatedMessage {
pub len: u32, pub len: u32,
@@ -152,6 +142,29 @@ pub enum Message {
}, },
} }
impl Message {
    /// Decodes one HDLC-encapsulated frame into a diag `Message`.
    ///
    /// The frame is decapsulated using `CRC_CCITT`, and the resulting payload
    /// is then parsed as a `Message`. Bytes left over after parsing are not
    /// treated as an error; they are only reported through a `warn!` log.
    ///
    /// # Errors
    ///
    /// * `DiagParsingError::HdlcDecapsulationError` if decapsulation fails;
    ///   it carries a copy of the input `data`.
    /// * `DiagParsingError::MessageParsingError` if the decapsulated payload
    ///   can't be parsed; it carries the decapsulated bytes.
    pub fn from_hdlc(data: &[u8]) -> Result<Message, DiagParsingError> {
        let payload = hdlc_decapsulate(data, &CRC_CCITT)
            .map_err(|err| DiagParsingError::HdlcDecapsulationError(err, data.to_vec()))?;

        let ((leftover_bytes, _), message) = match Message::from_bytes((&payload, 0)) {
            Ok(parsed) => parsed,
            Err(err) => return Err(DiagParsingError::MessageParsingError(err, payload)),
        };

        if !leftover_bytes.is_empty() {
            warn!(
                "warning: {} leftover bytes when parsing Message",
                leftover_bytes.len()
            );
        }
        Ok(message)
    }
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
#[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")] #[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")]
pub enum LogBody { pub enum LogBody {
@@ -411,7 +424,7 @@ pub fn build_log_mask_request(
} }
#[cfg(test)] #[cfg(test)]
mod test { pub(crate) mod test {
use super::*; use super::*;
// Just about all of these test cases from manually parsing diag packets w/ QCSuper // Just about all of these test cases from manually parsing diag packets w/ QCSuper
@@ -525,7 +538,7 @@ mod test {
// this log is based on one captured on a real device -- if it fails to // this log is based on one captured on a real device -- if it fails to
// serialize or deserialize, that's probably a problem with this mock, not // serialize or deserialize, that's probably a problem with this mock, not
// the DiagReader implementation // the DiagReader implementation
fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { pub fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
let length_with_payload = 31 + payload.len() as u16; let length_with_payload = 31 + payload.len() as u16;
let message = Message::Log { let message = Message::Log {
pending_msgs: 0, pending_msgs: 0,
@@ -559,6 +572,8 @@ mod test {
len: encapsulated_data.len() as u32, len: encapsulated_data.len() as u32,
data: encapsulated_data, data: encapsulated_data,
}; };
// sanity check
assert_eq!(&Message::from_hdlc(&encapsulated.data).unwrap(), &message);
(encapsulated, message) (encapsulated, message)
} }
+294 -138
View File
@@ -3,109 +3,212 @@
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
//! QMDL files. //! QMDL files.
use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer}; use std::io::ErrorKind;
use std::pin::Pin;
use std::task::Poll;
use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContainer};
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use futures::TryStream; use futures::TryStream;
use log::error; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
/// The 16-bit value that `is_gzip_stream` compares against the first two
/// bytes of a stream to decide whether it is gzip-compressed.
const GZIP_MAGIC_NUMBER: u16 = 0x1f8b;
pub struct QmdlWriter<T> pub struct QmdlWriter<T>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
{ {
writer: T, writer: GzipEncoder<T>,
pub total_written: usize,
} }
impl<T> QmdlWriter<T> impl<T> QmdlWriter<T>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + AsyncSeek + Unpin,
{ {
pub fn new(writer: T) -> Self { pub fn new(writer: T) -> Self {
QmdlWriter::new_with_existing_size(writer, 0) let gzip_writer = GzipEncoder::new(writer);
QmdlWriter {
writer: gzip_writer,
}
} }
pub fn new_with_existing_size(writer: T, existing_size: usize) -> Self { pub async fn size(&mut self) -> std::io::Result<usize> {
QmdlWriter { let size = self.writer.get_mut().stream_position().await?;
writer, Ok(size as usize)
total_written: existing_size,
}
} }
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
for msg in &container.messages { for msg in &container.messages {
self.writer.write_all(&msg.data).await?; self.writer.write_all(&msg.data).await?;
self.total_written += msg.data.len(); self.writer.flush().await?;
} }
Ok(()) Ok(())
} }
pub async fn close(mut self) -> std::io::Result<()> {
self.writer.shutdown().await?;
Ok(())
}
} }
pub struct QmdlReader<T> #[derive(Debug)]
enum QmdlReaderSource<T> {
Compressed {
reader: GzipDecoder<BufReader<T>>,
eof: bool,
},
Uncompressed {
reader: T,
},
}
/// Reader over a QMDL byte source that is either gzip-compressed or plain,
/// as recorded in `source`.
#[derive(Debug)]
struct QmdlAsyncReader<T> {
// which kind of underlying reader this wraps (compressed or uncompressed)
source: QmdlReaderSource<T>,
}
impl<T> QmdlAsyncReader<T>
where where
T: AsyncRead, T: AsyncRead,
{ {
reader: BufReader<T>, pub fn new(reader: T, compressed: bool) -> Self {
bytes_read: usize, let source = if compressed {
max_bytes: Option<usize>, QmdlReaderSource::Compressed {
reader: GzipDecoder::new(BufReader::new(reader)),
eof: false,
}
} else {
QmdlReaderSource::Uncompressed { reader }
};
Self {
source,
}
}
} }
impl<T> QmdlReader<T> impl<T> AsyncRead for QmdlAsyncReader<T>
where where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
pub fn new(reader: T, max_bytes: Option<usize>) -> Self { fn poll_read(
QmdlReader { self: Pin<&mut Self>,
reader: BufReader::new(reader), cx: &mut std::task::Context<'_>,
bytes_read: 0, buf: &mut tokio::io::ReadBuf<'_>,
max_bytes, ) -> Poll<std::io::Result<()>> {
let res = match &mut self.get_mut().source {
QmdlReaderSource::Compressed { reader, eof } => {
// if we already determined we've reached the Gzip EOF, don't read more
if *eof {
return Poll::Ready(Ok(()));
}
match Pin::new(reader).poll_read(cx, buf) {
// if we hit an unexpected EOF in a Gzip file, it shouldn't
// be considered fatal, just a truncated file. mark that
// we're done and return the result as usual
Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => {
*eof = true;
Poll::Ready(Ok(()))
}
res => res,
}
}
QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf),
};
res
} }
} }
pub fn as_stream( #[derive(Debug)]
&mut self, pub struct QmdlMessageReader<T>
) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> + '_ { where
futures::stream::try_unfold(self, |reader| async { T: AsyncRead,
let maybe_container = reader.get_next_messages_container().await?; {
match maybe_container { buf_reader: BufReader<QmdlAsyncReader<T>>,
Some(container) => Ok(Some((container, reader))), }
/// Probes `reader` for the gzip magic number, then rewinds it to the start.
///
/// Returns `Ok(true)` when the first two bytes equal `GZIP_MAGIC_NUMBER`. A
/// stream too short to hold those two bytes is reported as `Ok(false)`.
///
/// The reader is rewound whether or not the probe read succeeded, so a failed
/// probe never leaves the stream positioned past bytes the caller still needs
/// to read.
///
/// # Errors
///
/// Returns the error from rewinding, or any read error other than
/// `UnexpectedEof`.
async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool>
where
    T: AsyncRead + AsyncSeek + Unpin,
{
    let probe = reader.read_u16().await;
    // rewind before inspecting the result: a short read may already have
    // consumed a byte, and callers go on to read the stream from the start
    reader.rewind().await?;
    match probe {
        // this is safe because 0x1f8b.... doesn't overlap with any known
        // diag::DataType values
        Ok(magic_number) => Ok(magic_number == GZIP_MAGIC_NUMBER),
        // fewer than two bytes available: can't be a gzip stream
        Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => Ok(false),
        Err(err) => Err(err),
    }
}
impl<T> QmdlMessageReader<T>
where
T: AsyncRead + AsyncSeek + Unpin,
{
/// Wraps `reader` in a buffered message reader, choosing between the
/// compressed and uncompressed source by probing it with `is_gzip_stream`.
///
/// If the probe itself returns an error, the stream is treated as
/// uncompressed rather than failing construction.
pub async fn new(mut reader: T) -> std::io::Result<Self> {
    let compressed = is_gzip_stream(&mut reader).await.unwrap_or(false);
    let source = QmdlAsyncReader::new(reader, compressed);
    let buf_reader = BufReader::new(source);
    Ok(QmdlMessageReader { buf_reader })
}
/// Returns `true` when the wrapped source is the `Compressed` variant.
pub fn is_compressed(&self) -> bool {
    match self.buf_reader.get_ref().source {
        QmdlReaderSource::Compressed { .. } => true,
        QmdlReaderSource::Uncompressed { .. } => false,
    }
}
/// Consumes the reader and yields its contents as a stream of byte chunks.
///
/// Each chunk is the result of one `read_until(MESSAGE_TERMINATOR, ..)` call
/// on the buffered reader. The stream ends when a read returns zero bytes,
/// and read errors are passed through as stream errors.
pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
    futures::stream::try_unfold(self, |mut state| async {
        let mut chunk = vec![];
        let bytes_read = state
            .buf_reader
            .read_until(MESSAGE_TERMINATOR, &mut chunk)
            .await?;
        if bytes_read == 0 {
            return Ok(None);
        }
        Ok(Some((chunk, state)))
    })
}
pub fn into_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
match reader.get_next_message().await? {
Some(res) => Ok(Some((res, reader))),
None => Ok(None), None => Ok(None),
} }
}) })
} }
pub async fn get_next_messages_container( pub async fn get_next_message(
&mut self, &mut self,
) -> Result<Option<MessagesContainer>, std::io::Error> { ) -> Result<Option<Result<Message, DiagParsingError>>, std::io::Error> {
if let Some(max_bytes) = self.max_bytes let mut buf = vec![];
&& self.bytes_read >= max_bytes if self
.buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf)
.await?
== 0
{ {
if self.bytes_read > max_bytes {
error!(
"warning: {} bytes read, but max_bytes was {}",
self.bytes_read, max_bytes
);
}
return Ok(None); return Ok(None);
} }
let mut buf = Vec::new(); Ok(Some(Message::from_hdlc(&buf)))
let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?; }
self.bytes_read += bytes_read; }
// Since QMDL is just a flat list of messages, we can't actually impl<T> AsyncRead for QmdlMessageReader<T>
// reproduce the container structure they came from in the original where
// read. So we'll just pretend that all containers had exactly one T: AsyncRead + Unpin,
// message. As far as I know, the number of messages per container {
// doesn't actually affect anything, so this should be fine. fn poll_read(
Ok(Some(MessagesContainer { self: Pin<&mut Self>,
data_type: DataType::UserSpace, cx: &mut std::task::Context<'_>,
num_messages: 1, buf: &mut tokio::io::ReadBuf<'_>,
messages: vec![HdlcEncapsulatedMessage { ) -> Poll<std::io::Result<()>> {
len: bytes_read as u32, Pin::new(&mut self.get_mut().buf_reader).poll_read(cx, buf)
data: buf,
}],
}))
} }
} }
@@ -113,132 +216,185 @@ where
mod test { mod test {
use std::io::Cursor; use std::io::Cursor;
use crate::diag::CRC_CCITT; use crate::diag::{DataType, HdlcEncapsulatedMessage, test::get_test_message};
use crate::hdlc::hdlc_encapsulate;
use super::*; use super::*;
fn get_test_messages() -> Vec<HdlcEncapsulatedMessage> { fn get_test_messages() -> (Vec<HdlcEncapsulatedMessage>, Vec<Message>) {
let messages: Vec<HdlcEncapsulatedMessage> = (10..20) let mut hdlcs = Vec::new();
.map(|i| { let mut messages = Vec::new();
let data = hdlc_encapsulate(&vec![i as u8; i], &CRC_CCITT); for i in 10..20 {
HdlcEncapsulatedMessage { let (hdlc, msg) = get_test_message(&[i]);
len: data.len() as u32, hdlcs.push(hdlc);
data, messages.push(msg);
} }
}) (hdlcs, messages)
.collect();
messages
} }
// returns a byte array consisting of concatenated HDLC encapsulated // returns a byte array consisting of concatenated HDLC encapsulated
// test messages // test messages
fn get_test_message_bytes() -> Vec<u8> { fn get_test_message_bytes() -> Vec<u8> {
get_test_messages() let (hdlcs, _) = get_test_messages();
hdlcs
.iter() .iter()
.flat_map(|msg| msg.data.clone()) .flat_map(|msg| msg.data.clone())
.collect() .collect()
} }
fn get_test_containers() -> Vec<MessagesContainer> { fn get_test_containers() -> Vec<MessagesContainer> {
let messages = get_test_messages(); let (hdlcs, _) = get_test_messages();
let (messages1, messages2) = messages.split_at(5); let (hdlcs1, hdlcs2) = hdlcs.split_at(5);
vec![ vec![
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: messages1.len() as u32, num_messages: hdlcs1.len() as u32,
messages: messages1.to_vec(), messages: hdlcs1.to_vec(),
}, },
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: messages2.len() as u32, num_messages: hdlcs2.len() as u32,
messages: messages2.to_vec(), messages: hdlcs2.to_vec(),
}, },
] ]
} }
#[tokio::test] #[tokio::test]
async fn test_unbounded_qmdl_reader() { async fn test_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes()); let mut buf = Cursor::new(get_test_message_bytes());
let mut reader = QmdlReader::new(&mut buf, None); let mut reader = QmdlMessageReader::new(&mut buf).await.unwrap();
let expected_messages = get_test_messages(); assert!(!reader.is_compressed());
for message in expected_messages { let (_, expected_messages) = get_test_messages();
let expected_container = MessagesContainer { for msg in expected_messages {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![message],
};
assert_eq!( assert_eq!(
expected_container, Ok(msg),
reader.get_next_messages_container().await.unwrap().unwrap() reader.get_next_message().await.unwrap().unwrap()
); );
} }
} }
#[tokio::test] #[tokio::test]
async fn test_bounded_qmdl_reader() { async fn test_truncation() {
let mut buf = Cursor::new(get_test_message_bytes()); run_truncation_tests(false).await;
}
// bound the reader to the first two messages #[tokio::test]
let mut expected_messages = get_test_messages(); async fn test_compressed_truncation() {
let limit = expected_messages[0].len + expected_messages[1].len; run_truncation_tests(true).await;
}
let mut reader = QmdlReader::new(&mut buf, Some(limit as usize)); async fn run_truncation_tests(compressed: bool) {
for message in expected_messages.drain(0..2) { let (hdlcs, expected_messages) = get_test_messages();
let expected_container = MessagesContainer { let (bytes, message_lengths): (Vec<u8>, Vec<usize>) = if compressed {
data_type: DataType::UserSpace, let mut buf = Vec::new();
num_messages: 1, let mut compressed_lengths = Vec::new();
messages: vec![message], let mut writer = GzipEncoder::new(&mut buf);
for hdlc in &hdlcs {
let before = writer.get_ref().len();
writer.write_all(&hdlc.data).await.unwrap();
writer.flush().await.unwrap();
let after = writer.get_ref().len();
compressed_lengths.push(after - before);
}
(buf, compressed_lengths)
} else {
(
get_test_message_bytes(),
hdlcs.iter()
.map(|hdlc| hdlc.data.len())
.collect()
)
}; };
for truncated_hdlc_i in 1..hdlcs.len() - 1 {
let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum();
for truncated_byte in 1..message_lengths[truncated_hdlc_i] {
let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]);
let mut reader = QmdlMessageReader::new(&mut truncated_bytes).await.unwrap();
for msg in expected_messages.iter().take(truncated_hdlc_i) {
assert_eq!( assert_eq!(
expected_container, Ok(msg),
reader.get_next_messages_container().await.unwrap().unwrap() reader.get_next_message().await.unwrap().unwrap().as_ref()
);
}
if compressed {
// for a compressed reader, we have a couple possible
// outcomes, depending on how far along the Gzip DEFLATE
// block was before it was truncated:
match reader.get_next_message().await.unwrap() {
// if the block was truncated early enough, the
// GzipDecoder will detect an unexpected EOF, and our
// QmdlReader will indicate the stream of messages is
// done
None => {},
// if it's further along, the expanded result will be an
// invalid HDLC block. if that's the case, make sure the
// QmdlReader indicates the stream of messages is over
// with afterwards
Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => {
assert!(matches!(reader.get_next_message().await, Ok(None)));
},
// if it's further along still, we may get a complete
// Message, so make sure it matches the next expected
// one. then, make sure we've hit the end of the message
// stream
Some(Ok(msg)) => {
assert_eq!(&msg, &expected_messages[truncated_hdlc_i]);
assert!(matches!(reader.get_next_message().await, Ok(None)));
},
// we should never be able to decapsulate the HDLC into
// an invalid Diag message
Some(Err(DiagParsingError::MessageParsingError(_, _)))
=> {
panic!("unexpected MessageParsingError");
}
}
} else {
// a truncated uncompressed reader should always end on an
// HdlcDecapsulationError, and then return Ok(None) to
// indicate the message stream is over
assert!(matches!(
reader.get_next_message().await,
Ok(Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))))
));
assert!(matches!(reader.get_next_message().await, Ok(None)));
}
}
}
}
/// Writes the test containers to a QmdlWriter, optionally finishing the
/// gzip stream with a footer. Then, attempts to decompress the buffer with
/// a QmdlMessageReader, asserting that the messages match what's expected.
async fn run_compressed_reading_and_writing_tests(do_close: bool) {
let containers = get_test_containers();
let mut buf = Cursor::new(Vec::new());
{
let mut writer = QmdlWriter::new(&mut buf);
for container in &containers {
writer.write_container(&container).await.unwrap();
}
if do_close {
writer.close().await.unwrap();
}
}
buf.set_position(0);
let mut reader = QmdlMessageReader::new(buf).await.unwrap();
assert!(reader.is_compressed());
let (_, expected_messages) = get_test_messages();
for message in expected_messages {
assert_eq!(
Ok(message),
reader.get_next_message().await.unwrap().unwrap()
); );
} }
assert!(matches!( assert!(matches!(
reader.get_next_messages_container().await, reader.get_next_message().await,
Ok(None) Ok(None)
)); ));
} }
#[tokio::test] #[tokio::test]
async fn test_qmdl_writer() { async fn test_compressed_reading_and_writing() {
let mut buf = Vec::new(); run_compressed_reading_and_writing_tests(true).await;
let mut writer = QmdlWriter::new(&mut buf); run_compressed_reading_and_writing_tests(false).await;
let expected_containers = get_test_containers();
for container in &expected_containers {
writer.write_container(container).await.unwrap();
}
assert_eq!(writer.total_written, buf.len());
assert_eq!(buf, get_test_message_bytes());
}
#[tokio::test]
async fn test_writing_and_reading() {
let mut buf = Vec::new();
let mut writer = QmdlWriter::new(&mut buf);
let expected_containers = get_test_containers();
for container in &expected_containers {
writer.write_container(container).await.unwrap();
}
let limit = Some(buf.len());
let mut reader = QmdlReader::new(Cursor::new(&mut buf), limit);
let expected_messages = get_test_messages();
for message in expected_messages {
let expected_container = MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![message],
};
assert_eq!(
expected_container,
reader.get_next_messages_container().await.unwrap().unwrap()
);
}
assert!(matches!(
reader.get_next_messages_container().await,
Ok(None)
));
} }
} }