Refactor and simplify QmdlReader

In the past, QmdlReader was written to share a trait with DiagDevice, so
it had to pretend to be reading MessagesContainers. This needlessly
complicated both its code as well as that of consumers'. Instead,
QmdlReader now returns a stream of diag Messages.

QmdlReader also automatically detects if it's reading a compressed QMDL
stream or not.

Additionally, QmdlReader no longer can be bounded by a filesize limit,
and instead relies on HDLC message framing to detect file truncation.
This works for both compressed and uncompressed QMDL files.
This commit is contained in:
Will Greenberg
2026-05-07 16:46:59 -07:00
parent af4a9aeb95
commit 0cd70ad73c
9 changed files with 368 additions and 270 deletions
+11 -23
View File
@@ -1,15 +1,13 @@
use clap::Parser; use clap::Parser;
use futures::TryStreamExt;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use pcap_file_tokio::pcapng::{Block, PcapNgReader}; use pcap_file_tokio::pcapng::{Block, PcapNgReader};
use rayhunter::{ use rayhunter::{
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness}, analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
diag::DataType,
gsmtap_parser, gsmtap_parser,
pcap::GsmtapPcapWriter, pcap::GsmtapPcapWriter,
qmdl::QmdlReader, qmdl::QmdlReader,
}; };
use std::{collections::HashMap, future, path::PathBuf, pin::pin}; use std::{collections::HashMap, path::PathBuf};
use tokio::fs::File; use tokio::fs::File;
use walkdir::WalkDir; use walkdir::WalkDir;
@@ -113,22 +111,14 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let compressed = qmdl_path.ends_with(".gz"); let mut qmdl_reader = QmdlReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let qmdl_reader = QmdlReader::new(qmdl_file, compressed, None);
let mut qmdl_stream = pin!(
qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
);
let mut report = Report::new(qmdl_path); let mut report = Report::new(qmdl_path);
while let Some(container) = qmdl_stream while let Some(maybe_message) = qmdl_reader
.try_next() .get_next_message()
.await .await
.expect("failed getting QMDL container") .expect("failed to get message")
{ {
for row in harness.analyze_qmdl_messages(container) { report.process_row(harness.analyze_qmdl_message(maybe_message));
report.process_row(row);
}
} }
report.print_summary(show_skipped); report.print_summary(show_skipped);
} }
@@ -137,9 +127,7 @@ async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path) let qmdl_file = &mut File::open(&qmdl_path)
.await .await
.expect("failed to open qmdl file"); .expect("failed to open qmdl file");
let compressed = qmdl_path.ends_with(".gz"); let mut qmdl_reader = QmdlReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let qmdl_file_size = qmdl_file.metadata().await.unwrap().len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, compressed, Some(qmdl_file_size as usize));
let mut pcap_path = qmdl_path.clone(); let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcapng"); pcap_path.set_extension("pcapng");
let pcap_file = &mut File::create(&pcap_path) let pcap_file = &mut File::create(&pcap_path)
@@ -147,12 +135,12 @@ async fn pcapify(qmdl_path: &PathBuf) {
.expect("failed to open pcap file"); .expect("failed to open pcap file");
let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap(); let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap();
pcap_writer.write_iface_header().await.unwrap(); pcap_writer.write_iface_header().await.unwrap();
while let Some(container) = qmdl_reader while let Some(maybe_message) = qmdl_reader
.get_next_messages_container() .get_next_message()
.await .await
.expect("failed to get container") .expect("failed to get message")
{ {
for msg in container.into_messages().into_iter().flatten() { if let Ok(msg) = maybe_message {
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) { if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
pcap_writer pcap_writer
.write_gsmtap_message(parsed, timestamp) .write_gsmtap_message(parsed, timestamp)
+19 -14
View File
@@ -1,15 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use std::{cmp, future, pin}; use std::cmp;
use axum::Json; use axum::Json;
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
http::StatusCode, http::StatusCode,
}; };
use futures::TryStreamExt;
use log::{error, info}; use log::{error, info};
use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness}; use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness};
use rayhunter::diag::{DataType, MessagesContainer}; use rayhunter::diag::{DiagParsingError, Message, MessagesContainer};
use serde::Serialize; use serde::Serialize;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::io::{AsyncWriteExt, BufWriter};
@@ -46,7 +45,7 @@ impl AnalysisWriter {
// Runs the analysis harness on the given container, serializing the results // Runs the analysis harness on the given container, serializing the results
// to the analysis file, returning the whether any warnings were detected // to the analysis file, returning the whether any warnings were detected
pub async fn analyze( pub async fn analyze_container(
&mut self, &mut self,
container: MessagesContainer, container: MessagesContainer,
) -> Result<EventType, std::io::Error> { ) -> Result<EventType, std::io::Error> {
@@ -61,6 +60,17 @@ impl AnalysisWriter {
Ok(max_type) Ok(max_type)
} }
pub async fn analyze_message(
&mut self,
maybe_qmdl_msg: Result<Message, DiagParsingError>,
) -> Result<EventType, std::io::Error> {
let row = self.harness.analyze_qmdl_message(maybe_qmdl_msg);
if !row.is_empty() {
self.write(&row).await?;
}
Ok(row.get_max_event_type())
}
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> { async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
let mut value_str = serde_json::to_string(value).unwrap(); let mut value_str = serde_json::to_string(value).unwrap();
value_str.push('\n'); value_str.push('\n');
@@ -134,7 +144,7 @@ async fn perform_analysis(
analyzer_config: &AnalyzerConfig, analyzer_config: &AnalyzerConfig,
) -> Result<(), String> { ) -> Result<(), String> {
info!("Opening QMDL and analysis file for {name}..."); info!("Opening QMDL and analysis file for {name}...");
let (analysis_file, qmdl_reader) = { let (analysis_file, mut qmdl_reader) = {
let mut qmdl_store = qmdl_store_lock.write().await; let mut qmdl_store = qmdl_store_lock.write().await;
let (entry_index, _) = qmdl_store let (entry_index, _) = qmdl_store
.entry_for_name(name) .entry_for_name(name)
@@ -154,20 +164,15 @@ async fn perform_analysis(
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config) let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
let mut qmdl_stream = pin::pin!(
qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
);
info!("Starting analysis for {name}..."); info!("Starting analysis for {name}...");
while let Some(container) = qmdl_stream while let Some(maybe_message) = qmdl_reader
.try_next() .get_next_message()
.await .await
.expect("failed getting QMDL container") .expect("failed to get message")
{ {
let _ = analysis_writer let _ = analysis_writer
.analyze(container) .analyze_message(maybe_message)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
} }
+1 -1
View File
@@ -342,7 +342,7 @@ impl DiagTask {
debug!("done!"); debug!("done!");
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
self.bytes_since_space_check += container_bytes; self.bytes_since_space_check += container_bytes;
let max_type = match analysis_writer.analyze(container).await { let max_type = match analysis_writer.analyze_container(container).await {
Ok(t) => t, Ok(t) => t,
Err(e) => { Err(e) => {
warn!("failed to analyze container: {e}"); warn!("failed to analyze container: {e}");
+11 -18
View File
@@ -7,12 +7,11 @@ use axum::http::StatusCode;
use axum::http::header::CONTENT_TYPE; use axum::http::header::CONTENT_TYPE;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use log::error; use log::error;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser; use rayhunter::gsmtap_parser;
use rayhunter::pcap::GsmtapPcapWriter; use rayhunter::pcap::GsmtapPcapWriter;
use rayhunter::qmdl::QmdlReader; use rayhunter::qmdl::QmdlReader;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, duplex}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data // Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
@@ -71,28 +70,22 @@ pub async fn get_pcap(
pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlReader<R>) -> Result<(), Error> pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlReader<R>) -> Result<(), Error>
where where
W: AsyncWrite + Unpin + Send, W: AsyncWrite + Unpin + Send,
R: AsyncRead + Unpin, R: AsyncRead + AsyncSeek + Unpin,
{ {
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?; let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
pcap_writer.write_iface_header().await?; pcap_writer.write_iface_header().await?;
while let Some(container) = reader.get_next_messages_container().await? { while let Some(maybe_msg) = reader.get_next_message().await? {
if container.data_type != DataType::UserSpace { match maybe_msg {
continue; Ok(msg) => {
} let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
for maybe_msg in container.into_messages() { pcap_writer
match maybe_msg { .write_gsmtap_message(gsmtap_msg, timestamp)
Ok(msg) => { .await?;
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer
.write_gsmtap_message(gsmtap_msg, timestamp)
.await?;
}
} }
Err(e) => error!("error parsing message: {e:?}"),
} }
Err(e) => error!("error parsing message: {e:?}"),
} }
} }
+3 -5
View File
@@ -288,11 +288,9 @@ impl RecordingStore {
let file = File::open(entry.get_qmdl_filepath(&self.path)) let file = File::open(entry.get_qmdl_filepath(&self.path))
.await .await
.map_err(RecordingStoreError::ReadFileError)?; .map_err(RecordingStoreError::ReadFileError)?;
Ok(QmdlReader::new( QmdlReader::new(file)
file, .await
entry.compressed, .map_err(RecordingStoreError::ReadFileError)
Some(entry.uncompressed_qmdl_size_bytes),
))
} }
// Returns the corresponding QMDL file for a given entry // Returns the corresponding QMDL file for a given entry
+89 -23
View File
@@ -82,7 +82,7 @@ pub async fn get_qmdl(
&entry.uncompressed_qmdl_size_bytes.to_string(), &entry.uncompressed_qmdl_size_bytes.to_string(),
), ),
]; ];
let body = Body::from_stream(qmdl_reader.as_stream()); let body = Body::from_stream(qmdl_reader.as_qmdl_stream());
Ok((headers, body).into_response()) Ok((headers, body).into_response())
} }
@@ -310,7 +310,7 @@ pub async fn get_zip(
Path(entry_name): Path<String>, Path(entry_name): Path<String>,
) -> Result<Response, (StatusCode, String)> { ) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned(); let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
let (entry_index, compressed) = { let (entry_index, _) = {
let qmdl_store = state.qmdl_store_lock.read().await; let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or(( let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
@@ -338,7 +338,7 @@ pub async fn get_zip(
// Add QMDL file // Add QMDL file
{ {
let entry = ZipEntryBuilder::new( let entry = ZipEntryBuilder::new(
format!("{qmdl_idx}.qmdl").into(), format!("{qmdl_idx}.qmdl.gz").into(),
Compression::Stored, Compression::Stored,
); );
// FuturesAsyncWriteCompatExt::compat_write because async-zip's // FuturesAsyncWriteCompatExt::compat_write because async-zip's
@@ -425,9 +425,13 @@ pub async fn debug_set_display_state(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::Cursor;
use super::*; use super::*;
use async_zip::base::read::mem::ZipFileReader; use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State}; use axum::extract::{Path, State};
use futures::AsyncReadExt;
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlReader, QmdlWriter}};
use tempfile::TempDir; use tempfile::TempDir;
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) { async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
@@ -441,24 +445,23 @@ mod tests {
async fn create_test_entry_with_data( async fn create_test_entry_with_data(
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>, store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
test_data: &[u8], test_data: &MessagesContainer,
) -> String { ) -> String {
let entry_name = { let entry_name = {
let mut store = store_lock.write().await; let mut store = store_lock.write().await;
let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap(); let (qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap();
if !test_data.is_empty() { let mut writer = QmdlWriter::new(qmdl_gz_file);
use tokio::io::AsyncWriteExt; writer.write_container(test_data).await.unwrap();
qmdl_file.write_all(test_data).await.unwrap(); let test_data_len = writer.total_uncompressed_bytes;
qmdl_file.flush().await.unwrap(); writer.close().await.unwrap();
}
let current_entry = store.current_entry.unwrap(); let current_entry = store.current_entry.unwrap();
let entry = &store.manifest.entries[current_entry]; let entry = &store.manifest.entries[current_entry];
let entry_name = entry.name.clone(); let entry_name = entry.name.clone();
store store
.update_entry_qmdl_size(current_entry, test_data.len()) .update_entry_qmdl_size(current_entry, test_data_len)
.await .await
.unwrap(); .unwrap();
entry_name entry_name
@@ -492,17 +495,69 @@ mod tests {
}) })
} }
// valid HDLC encapsulated diag message generated from
// rayhunter::diag::test::get_test_message
fn create_test_container() -> MessagesContainer {
MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![
HdlcEncapsulatedMessage {
len: 39,
data: vec![
16,
0,
32,
0,
32,
0,
192,
176,
26,
165,
245,
135,
118,
35,
2,
1,
20,
14,
48,
0,
160,
0,
2,
8,
0,
0,
217,
15,
5,
0,
0,
0,
0,
1,
0,
10,
13,
196,
126,
],
},
],
}
}
#[tokio::test] #[tokio::test]
async fn test_get_zip_success() { async fn test_get_zip_success() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await; let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E]; let test_qmdl_data = create_test_container();
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
let state = create_test_server_state(store_lock); let state = create_test_server_state(store_lock);
let result = get_zip(State(state), Path(entry_name.clone())).await; let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap();
assert!(result.is_ok());
let response = result.unwrap();
let headers = response.headers(); let headers = response.headers();
assert_eq!(headers.get("content-type").unwrap(), "application/zip"); assert_eq!(headers.get("content-type").unwrap(), "application/zip");
@@ -511,14 +566,11 @@ mod tests {
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
let zip_reader_file = zip_reader.file();
let filenames = zip_reader let filenames: Vec<String> = zip_reader_file.entries()
.file()
.entries()
.iter() .iter()
.map(|entry| entry.filename().as_str().unwrap().to_owned()) .map(|entry| entry.filename().as_str().unwrap().to_string())
.collect::<Vec<String>>(); .collect();
assert_eq!( assert_eq!(
filenames, filenames,
vec![ vec![
@@ -526,5 +578,19 @@ mod tests {
format!("{entry_name}.pcapng"), format!("{entry_name}.pcapng"),
] ]
); );
let mut qmdl_body = Vec::with_capacity(128);
zip_reader.reader_without_entry(0)
.await
.unwrap()
.read_to_end(&mut qmdl_body)
.await
.unwrap();
let mut qmdl_reader = QmdlReader::new(Cursor::new(qmdl_body)).await.unwrap();
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
assert_eq!(
qmdl_reader.get_next_message().await.unwrap(),
Some(Ok(expected_message)),
);
} }
} }
+49 -43
View File
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use crate::analysis::diagnostic::DiagnosticAnalyzer; use crate::analysis::diagnostic::DiagnosticAnalyzer;
use crate::diag::{DiagParsingError, Message};
use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType}; use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType};
use crate::util::RuntimeMetadata; use crate::util::RuntimeMetadata;
use crate::{diag::MessagesContainer, gsmtap_parser}; use crate::{diag::MessagesContainer, gsmtap_parser};
@@ -231,6 +232,14 @@ pub struct AnalysisRow {
} }
impl AnalysisRow { impl AnalysisRow {
pub fn new() -> Self {
Self {
packet_timestamp: None,
skipped_message_reason: None,
events: vec![],
}
}
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.skipped_message_reason.is_none() && !self.contains_warnings() self.skipped_message_reason.is_none() && !self.contains_warnings()
} }
@@ -412,50 +421,47 @@ impl Harness {
row row
} }
pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result<Message, DiagParsingError>) -> AnalysisRow {
let mut row = AnalysisRow::new();
self.packet_num += 1;
let qmdl_message = match maybe_qmdl_message {
Ok(msg) => msg,
Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}"));
return row;
}
};
let gsmtap_message = match gsmtap_parser::parse(qmdl_message) {
Ok(msg) => msg,
Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}"));
return row;
}
};
let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
return row;
};
row.packet_timestamp = Some(timestamp.to_datetime());
let element = match InformationElement::try_from(&gsmtap_msg) {
Ok(element) => element,
Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}"));
return row;
}
};
row.events = self.analyze_information_element(&element);
row
}
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> { pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
let mut rows = Vec::new(); container.into_messages()
for maybe_qmdl_message in container.into_messages() { .drain(..)
self.packet_num += 1; .map(|maybe_message| self.analyze_qmdl_message(maybe_message))
.collect()
rows.push(AnalysisRow {
packet_timestamp: None,
skipped_message_reason: None,
events: Vec::new(),
});
// unwrap is safe here since we just pushed a value
let row = rows.last_mut().unwrap();
let qmdl_message = match maybe_qmdl_message {
Ok(msg) => msg,
Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}"));
continue;
}
};
let gsmtap_message = match gsmtap_parser::parse(qmdl_message) {
Ok(msg) => msg,
Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}"));
continue;
}
};
let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
continue;
};
row.packet_timestamp = Some(timestamp.to_datetime());
let element = match InformationElement::try_from(&gsmtap_msg) {
Ok(element) => element,
Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}"));
continue;
}
};
row.events = self.analyze_information_element(&element);
}
rows
} }
fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> { fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> {
+28 -20
View File
@@ -90,24 +90,7 @@ impl MessagesContainer {
let mut result = Vec::new(); let mut result = Vec::new();
for msg in self.messages { for msg in self.messages {
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
match hdlc_decapsulate(sub_msg, &CRC_CCITT) { result.push(Message::from_hdlc(sub_msg));
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!(
"warning: {} leftover bytes when parsing Message",
leftover_bytes.len()
);
}
result.push(Ok(res));
}
Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))),
},
Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(
err,
sub_msg.to_vec(),
))),
}
} }
} }
result result
@@ -159,6 +142,29 @@ pub enum Message {
}, },
} }
impl Message {
pub fn from_hdlc(data: &[u8]) -> Result<Message, DiagParsingError> {
match hdlc_decapsulate(data, &CRC_CCITT) {
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!(
"warning: {} leftover bytes when parsing Message",
leftover_bytes.len()
);
}
Ok(res)
}
Err(e) => Err(DiagParsingError::MessageParsingError(e, data)),
},
Err(err) => Err(DiagParsingError::HdlcDecapsulationError(
err,
data.to_vec(),
)),
}
}
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
#[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")] #[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")]
pub enum LogBody { pub enum LogBody {
@@ -418,7 +424,7 @@ pub fn build_log_mask_request(
} }
#[cfg(test)] #[cfg(test)]
mod test { pub(crate) mod test {
use super::*; use super::*;
// Just about all of these test cases from manually parsing diag packets w/ QCSuper // Just about all of these test cases from manually parsing diag packets w/ QCSuper
@@ -532,7 +538,7 @@ mod test {
// this log is based on one captured on a real device -- if it fails to // this log is based on one captured on a real device -- if it fails to
// serialize or deserialize, that's probably a problem with this mock, not // serialize or deserialize, that's probably a problem with this mock, not
// the DiagReader implementation // the DiagReader implementation
fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { pub fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
let length_with_payload = 31 + payload.len() as u16; let length_with_payload = 31 + payload.len() as u16;
let message = Message::Log { let message = Message::Log {
pending_msgs: 0, pending_msgs: 0,
@@ -566,6 +572,8 @@ mod test {
len: encapsulated_data.len() as u32, len: encapsulated_data.len() as u32,
data: encapsulated_data, data: encapsulated_data,
}; };
// sanity check
assert_eq!(&Message::from_hdlc(&encapsulated.data).unwrap(), &message);
(encapsulated, message) (encapsulated, message)
} }
+157 -123
View File
@@ -3,17 +3,18 @@
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
//! QMDL files. //! QMDL files.
use std::io::{Cursor, ErrorKind}; use std::io::ErrorKind;
use std::pin::Pin; use std::pin::Pin;
use std::task::Poll; use std::task::Poll;
use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer}; use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContainer};
use async_compression::tokio::bufread::GzipDecoder; use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder; use async_compression::tokio::write::GzipEncoder;
use futures::TryStream; use futures::TryStream;
use log::error; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
const GZIP_MAGIC_NUMBER: u16 = 0x1f8b;
pub struct QmdlWriter<T> pub struct QmdlWriter<T>
where where
@@ -64,15 +65,13 @@ enum QmdlReaderSource<T> {
#[derive(Debug)] #[derive(Debug)]
struct QmdlAsyncReader<T> { struct QmdlAsyncReader<T> {
source: QmdlReaderSource<T>, source: QmdlReaderSource<T>,
uncompressed_bytes_read: usize,
max_uncompressed_bytes: Option<usize>,
} }
impl<T> QmdlAsyncReader<T> impl<T> QmdlAsyncReader<T>
where where
T: AsyncRead, T: AsyncRead,
{ {
pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option<usize>) -> Self { pub fn new(reader: T, compressed: bool) -> Self {
let source = if compressed { let source = if compressed {
QmdlReaderSource::Compressed { QmdlReaderSource::Compressed {
reader: GzipDecoder::new(BufReader::new(reader)), reader: GzipDecoder::new(BufReader::new(reader)),
@@ -83,8 +82,6 @@ where
}; };
Self { Self {
source, source,
uncompressed_bytes_read: 0,
max_uncompressed_bytes,
} }
} }
} }
@@ -98,23 +95,7 @@ where
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
// if we've already read beyond the byte limit, return without reading let res = match &mut self.get_mut().source {
// into the buffer, essentially signalling EOF
if let Some(max_bytes) = self.max_uncompressed_bytes
&& self.uncompressed_bytes_read >= max_bytes
{
if self.uncompressed_bytes_read > max_bytes {
error!(
"warning: {} bytes read, but max_bytes was {}",
self.uncompressed_bytes_read, max_bytes
);
}
return Poll::Ready(Ok(()));
}
let before = buf.filled().len();
let this = self.get_mut();
let res = match &mut this.source {
QmdlReaderSource::Compressed { reader, eof } => { QmdlReaderSource::Compressed { reader, eof } => {
// if we already determined we've reached the Gzip EOF, don't read more // if we already determined we've reached the Gzip EOF, don't read more
if *eof { if *eof {
@@ -134,17 +115,6 @@ where
} }
QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf), QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf),
}; };
// if we read more bytes than is allowed, cap the buffer by
// our max bytes
let after = buf.filled().len();
let read = after - before;
if let Some(max_bytes) = this.max_uncompressed_bytes
&& this.uncompressed_bytes_read + read > max_bytes
{
let overread = this.uncompressed_bytes_read + read - max_bytes;
buf.set_filled(after - overread);
}
res res
} }
} }
@@ -157,34 +127,59 @@ where
buf_reader: BufReader<QmdlAsyncReader<T>>, buf_reader: BufReader<QmdlAsyncReader<T>>,
} }
async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool>
where
T: AsyncRead + AsyncSeek + Unpin
{
let magic_number = reader.read_u16().await?;
reader.rewind().await?;
// this is safe because 0x1f8b.... doesn't overlap with any known
// diag::DataType values
Ok(magic_number == GZIP_MAGIC_NUMBER)
}
impl<T> QmdlReader<T> impl<T> QmdlReader<T>
where where
T: AsyncRead + Unpin, T: AsyncRead + AsyncSeek + Unpin,
{ {
pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option<usize>) -> Self { pub async fn new(mut reader: T) -> std::io::Result<Self> {
QmdlReader { let compressed = is_gzip_stream(&mut reader)
.await
.unwrap_or(false);
Ok(QmdlReader {
buf_reader: BufReader::new(QmdlAsyncReader::new( buf_reader: BufReader::new(QmdlAsyncReader::new(
reader, reader,
compressed, compressed,
max_uncompressed_bytes,
)), )),
} })
} }
pub fn as_stream(self) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> { pub fn as_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async { futures::stream::try_unfold(self, |mut reader| async {
let maybe_container = reader.get_next_messages_container().await?; let mut buf = vec![];
match maybe_container { match reader .buf_reader
Some(container) => Ok(Some((container, reader))), .read_until(MESSAGE_TERMINATOR, &mut buf)
.await {
Err(err) => Err(err),
Ok(0) => Ok(None),
Ok(_) => Ok(Some((buf, reader))),
}
})
}
pub fn as_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
match reader.get_next_message().await? {
Some(res) => Ok(Some((res, reader))),
None => Ok(None), None => Ok(None),
} }
}) })
} }
pub async fn get_next_messages_container( pub async fn get_next_message(
&mut self, &mut self,
) -> Result<Option<MessagesContainer>, std::io::Error> { ) -> Result<Option<Result<Message, DiagParsingError>>, std::io::Error> {
let mut buf = Vec::new(); let mut buf = vec![];
if self if self
.buf_reader .buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf) .read_until(MESSAGE_TERMINATOR, &mut buf)
@@ -194,19 +189,7 @@ where
return Ok(None); return Ok(None);
} }
// Since QMDL is just a flat list of messages, we can't actually Ok(Some(Message::from_hdlc(&buf)))
// reproduce the container structure they came from in the original
// read. So we'll just pretend that all containers had exactly one
// message. As far as I know, the number of messages per container
// doesn't actually affect anything, so this should be fine.
Ok(Some(MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![HdlcEncapsulatedMessage {
len: buf.len() as u32,
data: buf,
}],
}))
} }
} }
@@ -227,92 +210,148 @@ where
mod test { mod test {
use std::io::Cursor; use std::io::Cursor;
use crate::diag::CRC_CCITT; use crate::diag::{DataType, HdlcEncapsulatedMessage, test::get_test_message};
use crate::hdlc::hdlc_encapsulate;
use super::*; use super::*;
fn get_test_messages() -> Vec<HdlcEncapsulatedMessage> { fn get_test_messages() -> (Vec<HdlcEncapsulatedMessage>, Vec<Message>) {
let messages: Vec<HdlcEncapsulatedMessage> = (10..20) let mut hdlcs = Vec::new();
.map(|i| { let mut messages = Vec::new();
let data = hdlc_encapsulate(&vec![i as u8; i], &CRC_CCITT); for i in 10..20 {
HdlcEncapsulatedMessage { let (hdlc, msg) = get_test_message(&[i]);
len: data.len() as u32, hdlcs.push(hdlc);
data, messages.push(msg);
} }
}) (hdlcs, messages)
.collect();
messages
} }
// returns a byte array consisting of concatenated HDLC encapsulated // returns a byte array consisting of concatenated HDLC encapsulated
// test messages // test messages
fn get_test_message_bytes() -> Vec<u8> { fn get_test_message_bytes() -> Vec<u8> {
get_test_messages() let (hdlcs, _) = get_test_messages();
hdlcs
.iter() .iter()
.flat_map(|msg| msg.data.clone()) .flat_map(|msg| msg.data.clone())
.collect() .collect()
} }
fn get_test_containers() -> Vec<MessagesContainer> { fn get_test_containers() -> Vec<MessagesContainer> {
let messages = get_test_messages(); let (hdlcs, _) = get_test_messages();
let (messages1, messages2) = messages.split_at(5); let (hdlcs1, hdlcs2) = hdlcs.split_at(5);
vec![ vec![
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: messages1.len() as u32, num_messages: hdlcs1.len() as u32,
messages: messages1.to_vec(), messages: hdlcs1.to_vec(),
}, },
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: messages2.len() as u32, num_messages: hdlcs2.len() as u32,
messages: messages2.to_vec(), messages: hdlcs2.to_vec(),
}, },
] ]
} }
#[tokio::test] #[tokio::test]
async fn test_unbounded_qmdl_reader() { async fn test_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes()); let mut buf = Cursor::new(get_test_message_bytes());
let mut reader = QmdlReader::new(&mut buf, false, None); let mut reader = QmdlReader::new(&mut buf).await.unwrap();
let expected_messages = get_test_messages(); let (_, expected_messages) = get_test_messages();
for message in expected_messages { for msg in expected_messages {
let expected_container = MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![message],
};
assert_eq!( assert_eq!(
expected_container, Ok(msg),
reader.get_next_messages_container().await.unwrap().unwrap() reader.get_next_message().await.unwrap().unwrap()
); );
} }
} }
#[tokio::test] #[tokio::test]
async fn test_bounded_qmdl_reader() { async fn test_truncation() {
let mut buf = Cursor::new(get_test_message_bytes()); run_truncation_tests(false).await;
}
// bound the reader to the first two messages #[tokio::test]
let mut expected_messages = get_test_messages(); async fn test_compressed_truncation() {
let limit = expected_messages[0].len + expected_messages[1].len; run_truncation_tests(true).await;
}
let mut reader = QmdlReader::new(&mut buf, false, Some(limit as usize)); async fn run_truncation_tests(compressed: bool) {
for message in expected_messages.drain(0..2) { let (hdlcs, expected_messages) = get_test_messages();
let expected_container = MessagesContainer { let (bytes, message_lengths): (Vec<u8>, Vec<usize>) = if compressed {
data_type: DataType::UserSpace, let mut buf = Vec::new();
num_messages: 1, let mut compressed_lengths = Vec::new();
messages: vec![message], let mut writer = GzipEncoder::new(&mut buf);
}; for hdlc in &hdlcs {
assert_eq!( let before = writer.get_ref().len();
expected_container, writer.write_all(&hdlc.data).await.unwrap();
reader.get_next_messages_container().await.unwrap().unwrap() writer.flush().await.unwrap();
); let after = writer.get_ref().len();
compressed_lengths.push(after - before);
}
(buf, compressed_lengths)
} else {
(
get_test_message_bytes(),
hdlcs.iter()
.map(|hdlc| hdlc.data.len())
.collect()
)
};
for truncated_hdlc_i in 1..hdlcs.len() - 1 {
let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum();
for truncated_byte in 1..message_lengths[truncated_hdlc_i] {
let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]);
let mut reader = QmdlReader::new(&mut truncated_bytes).await.unwrap();
for msg in expected_messages.iter().take(truncated_hdlc_i) {
assert_eq!(
Ok(msg),
reader.get_next_message().await.unwrap().unwrap().as_ref()
);
}
if compressed {
// for a compressed reader, we have a couple possible
// outcomes, depending on how far along the Gzip DEFLATE
// block was before it was truncated:
match reader.get_next_message().await.unwrap() {
// if the block was truncated early enough, the
// GzipDecoder will detect an unexpected EOF, and our
// QmdlReader will indicate the stream of messages is
// done
None => {},
// if it's further along, the expanded result will be an
// invalid HDLC block. if that's the case, make sure the
// QmdlReader indicates the stream of messages is over
// with afterwards
Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => {
assert!(matches!(reader.get_next_message().await, Ok(None)));
},
// if it's further along still, we may get a complete
// Message, so make sure it matches the next expected
// one. then, make sure we've hit the end of the message
// stream
Some(Ok(msg)) => {
assert_eq!(&msg, &expected_messages[truncated_hdlc_i]);
assert!(matches!(reader.get_next_message().await, Ok(None)));
},
// we should never be able to decapsulate the HDLC into
// an invalid Diag message
Some(Err(DiagParsingError::MessageParsingError(_, _)))
=> {
panic!("unexpected MessageParsingError");
}
}
} else {
// a truncated uncompressed reader should always end on an
// HdlcDecapsulationError, and then return Ok(None) to
// indicate the message stream is over
assert!(matches!(
reader.get_next_message().await,
Ok(Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))))
));
assert!(matches!(reader.get_next_message().await, Ok(None)));
}
}
} }
assert!(matches!(
reader.get_next_messages_container().await,
Ok(None)
));
} }
/// Writes the test containers to a QmdlWriter, optionally finishing the /// Writes the test containers to a QmdlWriter, optionally finishing the
@@ -330,21 +369,16 @@ mod test {
writer.close().await.unwrap(); writer.close().await.unwrap();
} }
} }
let mut reader = QmdlReader::new(Cursor::new(buf), true, None); let mut reader = QmdlReader::new(Cursor::new(buf)).await.unwrap();
let expected_messages = get_test_messages(); let (_, expected_messages) = get_test_messages();
for message in expected_messages { for message in expected_messages {
let expected_container = MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![message],
};
assert_eq!( assert_eq!(
expected_container, Ok(message),
reader.get_next_messages_container().await.unwrap().unwrap() reader.get_next_message().await.unwrap().unwrap()
); );
} }
assert!(matches!( assert!(matches!(
reader.get_next_messages_container().await, reader.get_next_message().await,
Ok(None) Ok(None)
)); ));
} }