Compare commits

...

4 Commits

Author SHA1 Message Date
Will Greenberg cedb94ad3c more clippy appeasement 2026-06-04 10:03:46 -07:00
Will Greenberg d8c7d9cd0b fix clippy 2026-06-03 21:12:28 -07:00
Will Greenberg b8fd4204db run cargo fmt 2026-06-03 20:19:44 -07:00
Will Greenberg 19d9b3967c Add support for compressed QMDL
Major changes:
* QmdlWriter now outputs gzipped QMDL files by default
* QmdlReader renamed to QmdlMessageReader, and reads both compressed and
  uncompressed QMDL. It no longer requires bounding to avoid reading
  partially written files.
2026-06-03 20:19:44 -07:00
13 changed files with 621 additions and 415 deletions
Generated
+25 -32
View File
@@ -276,13 +276,12 @@ dependencies = [
[[package]] [[package]]
name = "async-compression" name = "async-compression"
version = "0.4.33" version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93c1f86859c1af3d514fa19e8323147ff10ea98684e6c7b307912509f50e67b2" checksum = "e79b3f8a79cccc2898f31920fc69f304859b3bd567490f75ebf51ae1c792a9ac"
dependencies = [ dependencies = [
"compression-codecs", "compression-codecs",
"compression-core", "compression-core",
"futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
] ]
@@ -1007,9 +1006,9 @@ dependencies = [
[[package]] [[package]]
name = "compression-codecs" name = "compression-codecs"
version = "0.4.32" version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "680dc087785c5230f8e8843e2e57ac7c1c90488b6a91b88caa265410568f441b" checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf"
dependencies = [ dependencies = [
"compression-core", "compression-core",
"flate2", "flate2",
@@ -1906,9 +1905,9 @@ checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.1" version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"miniz_oxide", "miniz_oxide",
@@ -1997,9 +1996,9 @@ dependencies = [
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@@ -2012,9 +2011,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@@ -2022,15 +2021,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@@ -2039,9 +2038,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]] [[package]]
name = "futures-lite" name = "futures-lite"
@@ -2058,9 +2057,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -2069,21 +2068,21 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@@ -2093,7 +2092,6 @@ dependencies = [
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils",
"slab", "slab",
] ]
@@ -4356,12 +4354,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "piper" name = "piper"
version = "0.2.4" version = "0.2.4"
@@ -4856,6 +4848,7 @@ checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
name = "rayhunter" name = "rayhunter"
version = "0.11.2" version = "0.11.2"
dependencies = [ dependencies = [
"async-compression",
"bytes", "bytes",
"chrono", "chrono",
"crc", "crc",
+18 -31
View File
@@ -1,15 +1,13 @@
use clap::Parser; use clap::Parser;
use futures::TryStreamExt;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use pcap_file_tokio::pcapng::{Block, PcapNgReader}; use pcap_file_tokio::pcapng::{Block, PcapNgReader};
use rayhunter::{ use rayhunter::{
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness}, analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
diag::DataType,
gsmtap_parser, gsmtap_parser,
pcap::GsmtapPcapWriter, pcap::GsmtapPcapWriter,
qmdl::QmdlReader, qmdl::QmdlMessageReader,
}; };
use std::{collections::HashMap, future, path::PathBuf, pin::pin}; use std::{collections::HashMap, path::PathBuf};
use tokio::fs::File; use tokio::fs::File;
use walkdir::WalkDir; use walkdir::WalkDir;
@@ -113,26 +111,16 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let file_size = qmdl_file let mut qmdl_reader = QmdlMessageReader::new(qmdl_file)
.metadata()
.await .await
.expect("failed to get QMDL file metadata") .expect("failed to open QmdlReader");
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(
qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
);
let mut report = Report::new(qmdl_path); let mut report = Report::new(qmdl_path);
while let Some(container) = qmdl_stream while let Some(maybe_message) = qmdl_reader
.try_next() .get_next_message()
.await .await
.expect("failed getting QMDL container") .expect("failed to get message")
{ {
for row in harness.analyze_qmdl_messages(container) { report.process_row(harness.analyze_qmdl_message(maybe_message));
report.process_row(row);
}
} }
report.print_summary(show_skipped); report.print_summary(show_skipped);
} }
@@ -141,8 +129,9 @@ async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path) let qmdl_file = &mut File::open(&qmdl_path)
.await .await
.expect("failed to open qmdl file"); .expect("failed to open qmdl file");
let qmdl_file_size = qmdl_file.metadata().await.unwrap().len(); let mut qmdl_reader = QmdlMessageReader::new(qmdl_file)
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize)); .await
.expect("failed to open QmdlReader");
let mut pcap_path = qmdl_path.clone(); let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcapng"); pcap_path.set_extension("pcapng");
let pcap_file = &mut File::create(&pcap_path) let pcap_file = &mut File::create(&pcap_path)
@@ -150,20 +139,20 @@ async fn pcapify(qmdl_path: &PathBuf) {
.expect("failed to open pcap file"); .expect("failed to open pcap file");
let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap(); let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap();
pcap_writer.write_iface_header().await.unwrap(); pcap_writer.write_iface_header().await.unwrap();
while let Some(container) = qmdl_reader while let Some(maybe_message) = qmdl_reader
.get_next_messages_container() .get_next_message()
.await .await
.expect("failed to get container") .expect("failed to get message")
{
if let Ok(msg) = maybe_message
&& let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg)
{ {
for msg in container.messages().into_iter().flatten() {
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
pcap_writer pcap_writer
.write_gsmtap_message(parsed, timestamp, None) .write_gsmtap_message(parsed, timestamp, None)
.await .await
.expect("failed to write"); .expect("failed to write");
} }
} }
}
info!("wrote pcap to {:?}", &pcap_path); info!("wrote pcap to {:?}", &pcap_path);
} }
@@ -197,9 +186,7 @@ async fn main() {
let name_str = name.to_str().unwrap(); let name_str = name.to_str().unwrap();
let path = entry.path(); let path = entry.path();
let path_str = path.to_str().unwrap(); let path_str = path.to_str().unwrap();
// instead of relying on the QMDL extension, can we check if a file is if name_str.ends_with(".qmdl") || name_str.ends_with(".qmdl.gz") {
// QMDL by inspecting the contents?
if name_str.ends_with(".qmdl") {
info!("**** Beginning analysis of {name_str}"); info!("**** Beginning analysis of {name_str}");
analyze_qmdl(path_str, args.show_skipped).await; analyze_qmdl(path_str, args.show_skipped).await;
if args.pcapify { if args.pcapify {
+1 -1
View File
@@ -35,7 +35,7 @@ futures-macro = "0.3.30"
include_dir = "0.7.3" include_dir = "0.7.3"
chrono = { version = "0.4.31", features = ["serde"] } chrono = { version = "0.4.31", features = ["serde"] }
tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] } tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] }
futures = { version = "0.3.30", default-features = false } futures = { version = "0.3.32", default-features = false, features = ["std"] }
serde_json = "1.0.114" serde_json = "1.0.114"
image = { version = "0.25.1", default-features = false, features = ["png", "gif"] } image = { version = "0.25.1", default-features = false, features = ["png", "gif"] }
tempfile = "3.10.2" tempfile = "3.10.2"
+24 -22
View File
@@ -1,16 +1,15 @@
use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use std::{cmp, future, pin};
use axum::Json; use axum::Json;
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
http::StatusCode, http::StatusCode,
}; };
use futures::TryStreamExt;
use log::{error, info}; use log::{error, info};
use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness}; use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness};
use rayhunter::diag::{DataType, MessagesContainer}; use rayhunter::diag::{DiagParsingError, Message, MessagesContainer};
use rayhunter::qmdl::QmdlReader; use rayhunter::qmdl::QmdlMessageReader;
use serde::Serialize; use serde::Serialize;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::io::{AsyncWriteExt, BufWriter};
@@ -47,7 +46,7 @@ impl AnalysisWriter {
// Runs the analysis harness on the given container, serializing the results // Runs the analysis harness on the given container, serializing the results
// to the analysis file, returning the whether any warnings were detected // to the analysis file, returning the whether any warnings were detected
pub async fn analyze( pub async fn analyze_container(
&mut self, &mut self,
container: MessagesContainer, container: MessagesContainer,
) -> Result<EventType, std::io::Error> { ) -> Result<EventType, std::io::Error> {
@@ -62,6 +61,17 @@ impl AnalysisWriter {
Ok(max_type) Ok(max_type)
} }
pub async fn analyze_message(
&mut self,
maybe_qmdl_msg: Result<Message, DiagParsingError>,
) -> Result<EventType, std::io::Error> {
let row = self.harness.analyze_qmdl_message(maybe_qmdl_msg);
if !row.is_empty() {
self.write(&row).await?;
}
Ok(row.get_max_event_type())
}
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> { async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
let mut value_str = serde_json::to_string(value).unwrap(); let mut value_str = serde_json::to_string(value).unwrap();
value_str.push('\n'); value_str.push('\n');
@@ -135,7 +145,7 @@ async fn perform_analysis(
analyzer_config: &AnalyzerConfig, analyzer_config: &AnalyzerConfig,
) -> Result<(), String> { ) -> Result<(), String> {
info!("Opening QMDL and analysis file for {name}..."); info!("Opening QMDL and analysis file for {name}...");
let (analysis_file, qmdl_file) = { let (analysis_file, mut qmdl_reader) = {
let mut qmdl_store = qmdl_store_lock.write().await; let mut qmdl_store = qmdl_store_lock.write().await;
let (entry_index, _) = qmdl_store let (entry_index, _) = qmdl_store
.entry_for_name(name) .entry_for_name(name)
@@ -149,33 +159,25 @@ async fn perform_analysis(
.await .await
.map_err(|e| format!("{e:?}"))? .map_err(|e| format!("{e:?}"))?
.ok_or("QMDL file not found")?; .ok_or("QMDL file not found")?;
let qmdl_reader = QmdlMessageReader::new(qmdl_file)
.await
.map_err(|e| format!("{e:?}"))?;
(analysis_file, qmdl_file) (analysis_file, qmdl_reader)
}; };
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config) let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
let file_size = qmdl_file
.metadata()
.await
.expect("failed to get QMDL file metadata")
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin::pin!(
qmdl_reader
.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
);
info!("Starting analysis for {name}..."); info!("Starting analysis for {name}...");
while let Some(container) = qmdl_stream while let Some(maybe_message) = qmdl_reader
.try_next() .get_next_message()
.await .await
.expect("failed getting QMDL container") .expect("failed to get message")
{ {
let _ = analysis_writer let _ = analysis_writer
.analyze(container) .analyze_message(maybe_message)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
} }
+26 -19
View File
@@ -74,7 +74,7 @@ pub struct DiagTask {
enum DiagState { enum DiagState {
Recording { Recording {
qmdl_writer: QmdlWriter<File>, qmdl_writer: Box<QmdlWriter<File>>,
analysis_writer: Box<AnalysisWriter>, analysis_writer: Box<AnalysisWriter>,
}, },
Stopped, Stopped,
@@ -158,7 +158,7 @@ impl DiagTask {
DiskSpaceCheck::Failed => {} DiskSpaceCheck::Failed => {}
} }
let (qmdl_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?; let (qmdl_gz_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?;
// For fixed-mode sessions, write the configured coordinates to the storage // For fixed-mode sessions, write the configured coordinates to the storage
// immediately so the per-session GPS is stored durably and isn't affected // immediately so the per-session GPS is stored durably and isn't affected
@@ -185,13 +185,11 @@ impl DiagTask {
.await .await
.map_err(RecordingStoreError::WriteFileError)?; .map_err(RecordingStoreError::WriteFileError)?;
} }
self.stop_current_recording().await; self.stop_current_recording().await;
let qmdl_writer = QmdlWriter::new(qmdl_file); let qmdl_writer = Box::new(QmdlWriter::new(qmdl_gz_file));
let analysis_writer = AnalysisWriter::new(analysis_file, &self.analyzer_config) let analysis_writer = AnalysisWriter::new(analysis_file, &self.analyzer_config)
.await .await
.map_err(RecordingStoreError::WriteFileError)?; .map_err(RecordingStoreError::WriteFileError)?;
self.state = DiagState::Recording { self.state = DiagState::Recording {
qmdl_writer, qmdl_writer,
analysis_writer: Box::new(analysis_writer), analysis_writer: Box::new(analysis_writer),
@@ -300,13 +298,23 @@ impl DiagTask {
let mut state = DiagState::Stopped; let mut state = DiagState::Stopped;
std::mem::swap(&mut self.state, &mut state); std::mem::swap(&mut self.state, &mut state);
if let DiagState::Recording { if let DiagState::Recording {
analysis_writer, .. qmdl_writer,
analysis_writer,
..
} = state } = state
{ {
analysis_writer match (qmdl_writer.close().await, analysis_writer.close().await) {
.close() (Ok(()), Ok(())) => {}
.await (qmdl_result, analysis_result) => {
.expect("failed to close analysis writer"); if let Err(err) = qmdl_result {
error!("failed to close QmdlWriter: {:?}", err);
}
if let Err(err) = analysis_result {
error!("failed to close AnalysisWriter: {:?}", err);
}
panic!();
}
}
} }
} }
@@ -374,23 +382,22 @@ impl DiagTask {
self.stop(qmdl_store, Some(reason)).await; self.stop(qmdl_store, Some(reason)).await;
return; return;
} }
if let Ok(file_size) = qmdl_writer.size().await {
debug!( debug!(
"total QMDL bytes written: {}, updating manifest...", "total QMDL bytes written: {}, updating manifest...",
qmdl_writer.total_written file_size
); );
let index = qmdl_store let index = qmdl_store.current_entry.expect(
.current_entry "DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???",
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); );
if let Err(e) = qmdl_store if let Err(e) = qmdl_store.update_entry_qmdl_size(index, file_size).await {
.update_entry_qmdl_size(index, qmdl_writer.total_written)
.await
{
let reason = format!("failed to update manifest (disk full?): {e}"); let reason = format!("failed to update manifest (disk full?): {e}");
error!("{reason}"); error!("{reason}");
self.stop(qmdl_store, Some(reason)).await; self.stop(qmdl_store, Some(reason)).await;
return; return;
} }
debug!("done!"); debug!("done!");
}
// Extract the latest packet timestamp from this container // Extract the latest packet timestamp from this container
if let Some(ts) = container if let Some(ts) = container
@@ -407,7 +414,7 @@ impl DiagTask {
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
self.bytes_since_space_check += container_bytes; self.bytes_since_space_check += container_bytes;
let max_type = match analysis_writer.analyze(container).await { let max_type = match analysis_writer.analyze_container(container).await {
Ok(t) => t, Ok(t) => t,
Err(e) => { Err(e) => {
warn!("failed to analyze container: {e}"); warn!("failed to analyze container: {e}");
+9 -16
View File
@@ -10,12 +10,11 @@ use axum::http::StatusCode;
use axum::http::header::CONTENT_TYPE; use axum::http::header::CONTENT_TYPE;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use log::error; use log::error;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser; use rayhunter::gsmtap_parser;
use rayhunter::pcap::{GpsPoint, GsmtapPcapWriter}; use rayhunter::pcap::{GpsPoint, GsmtapPcapWriter};
use rayhunter::qmdl::QmdlReader; use rayhunter::qmdl::QmdlMessageReader;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, duplex}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
#[cfg_attr(feature = "apidocs", utoipa::path( #[cfg_attr(feature = "apidocs", utoipa::path(
@@ -51,18 +50,20 @@ pub async fn get_pcap(
"QMDL file is empty, try again in a bit!".to_string(), "QMDL file is empty, try again in a bit!".to_string(),
)); ));
} }
let qmdl_size_bytes = entry.qmdl_size_bytes;
let qmdl_file = qmdl_store let qmdl_file = qmdl_store
.open_file(entry_index, FileKind::Qmdl) .open_file(entry_index, FileKind::Qmdl)
.await .await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))? .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?
.ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?;
let qmdl_reader = QmdlMessageReader::new(qmdl_file)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
let (reader, writer) = duplex(1024); let (reader, writer) = duplex(1024);
let gps_records = load_gps_records_for_entry(&state, entry_index).await; let gps_records = load_gps_records_for_entry(&state, entry_index).await;
drop(qmdl_store); drop(qmdl_store);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes, gps_records).await { if let Err(e) = generate_pcap_data(writer, qmdl_reader, gps_records).await {
error!("failed to generate PCAP: {e:?}"); error!("failed to generate PCAP: {e:?}");
} }
}); });
@@ -131,24 +132,17 @@ fn find_nearest_gps(records: &[GpsRecord], packet_timestamp: i64) -> Option<GpsP
pub async fn generate_pcap_data<R, W>( pub async fn generate_pcap_data<R, W>(
writer: W, writer: W,
qmdl_file: R, mut reader: QmdlMessageReader<R>,
qmdl_size_bytes: usize,
gps_records: Vec<GpsRecord>, gps_records: Vec<GpsRecord>,
) -> Result<(), Error> ) -> Result<(), Error>
where where
W: AsyncWrite + Unpin + Send, W: AsyncWrite + Unpin + Send,
R: AsyncRead + Unpin, R: AsyncRead + AsyncSeek + Unpin,
{ {
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?; let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
pcap_writer.write_iface_header().await?; pcap_writer.write_iface_header().await?;
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes)); while let Some(maybe_msg) = reader.get_next_message().await? {
while let Some(container) = reader.get_next_messages_container().await? {
if container.data_type != DataType::UserSpace {
continue;
}
for maybe_msg in container.messages() {
match maybe_msg { match maybe_msg {
Ok(msg) => { Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
@@ -163,7 +157,6 @@ where
Err(e) => error!("error parsing message: {e:?}"), Err(e) => error!("error parsing message: {e:?}"),
} }
} }
}
Ok(()) Ok(())
} }
+32 -13
View File
@@ -55,16 +55,24 @@ impl FileKind {
// List of all possible physical files on disk. // List of all possible physical files on disk.
pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps]; pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps];
pub fn get_filename(&self, entry_name: &str) -> String { pub fn get_filename(&self, entry_name: &str, qmdl_compressed: bool) -> String {
match self { match self {
FileKind::Qmdl if qmdl_compressed => format!("{}.qmdl.gz", entry_name),
FileKind::Qmdl => format!("{}.qmdl", entry_name), FileKind::Qmdl => format!("{}.qmdl", entry_name),
FileKind::Analysis => format!("{}.ndjson", entry_name), FileKind::Analysis => format!("{}.ndjson", entry_name),
FileKind::Gps => format!("{}-gps.ndjson", entry_name), FileKind::Gps => format!("{}-gps.ndjson", entry_name),
} }
} }
pub fn get_filepath<P: AsRef<Path>>(&self, entry_name: &str, base_path: P) -> PathBuf { pub fn get_filepath<P: AsRef<Path>>(
base_path.as_ref().join(self.get_filename(entry_name)) &self,
entry_name: &str,
base_path: P,
qmdl_compressed: bool,
) -> PathBuf {
base_path
.as_ref()
.join(self.get_filename(entry_name, qmdl_compressed))
} }
} }
@@ -101,7 +109,6 @@ pub struct ManifestEntry {
/// The system time when the last message was recorded to the file /// The system time when the last message was recorded to the file
#[cfg_attr(feature = "apidocs", schema(value_type = String))] #[cfg_attr(feature = "apidocs", schema(value_type = String))]
pub last_message_time: Option<DateTime<Local>>, pub last_message_time: Option<DateTime<Local>>,
/// The size of the QMDL file in bytes
pub qmdl_size_bytes: usize, pub qmdl_size_bytes: usize,
/// The rayhunter daemon version which generated the file /// The rayhunter daemon version which generated the file
pub rayhunter_version: Option<String>, pub rayhunter_version: Option<String>,
@@ -116,6 +123,8 @@ pub struct ManifestEntry {
pub upload_time: Option<DateTime<Local>>, pub upload_time: Option<DateTime<Local>>,
#[serde(default)] #[serde(default)]
pub gps_mode: Option<GpsMode>, pub gps_mode: Option<GpsMode>,
#[serde(default)]
pub compressed: bool,
} }
impl ManifestEntry { impl ManifestEntry {
@@ -133,11 +142,12 @@ impl ManifestEntry {
stop_reason: None, stop_reason: None,
upload_time: None, upload_time: None,
gps_mode: Some(gps_mode), gps_mode: Some(gps_mode),
compressed: true,
} }
} }
pub fn get_filepath<P: AsRef<Path>>(&self, file_kind: FileKind, path: P) -> PathBuf { pub fn get_filepath<P: AsRef<Path>>(&self, file_kind: FileKind, path: P) -> PathBuf {
file_kind.get_filepath(&self.name, path) file_kind.get_filepath(&self.name, path, self.compressed)
} }
} }
@@ -196,8 +206,9 @@ impl RecordingStore {
} }
// Does a best-effort attempt to recover the manifest from a directory of // Does a best-effort attempt to recover the manifest from a directory of
// QMDL files. We expect these files to be named like "<timestamp>.qmdl", // QMDL files. We expect these files to be named like "<timestamp>.qmdl"
// and skip any files which don't match that pattern. // or "<timestamp>.qmdl.gz", and skip any files which don't match that
// pattern.
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError> pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
where where
P: AsRef<Path>, P: AsRef<Path>,
@@ -217,11 +228,14 @@ impl RecordingStore {
continue; continue;
}; };
if !filename.ends_with(".qmdl") { let (stem, compressed) = if filename.ends_with(".qmdl") {
(filename.trim_end_matches(".qmdl"), false)
} else if filename.ends_with(".qmdl.gz") {
(filename.trim_end_matches(".qmdl.gz"), true)
} else {
continue; continue;
} };
let stem = filename.trim_end_matches(".qmdl");
let Ok(start_timestamp) = stem.parse::<i64>() else { let Ok(start_timestamp) = stem.parse::<i64>() else {
warn!("QMDL file has invalid name {os_filename:?}, skipping"); warn!("QMDL file has invalid name {os_filename:?}, skipping");
continue; continue;
@@ -248,6 +262,7 @@ impl RecordingStore {
info!("successfully recovered QMDL entry {os_filename:?}!"); info!("successfully recovered QMDL entry {os_filename:?}!");
manifest_entries.push(ManifestEntry { manifest_entries.push(ManifestEntry {
name: stem.to_string(), name: stem.to_string(),
compressed,
start_time: start_time.into(), start_time: start_time.into(),
last_message_time: Some(last_message_time.into()), last_message_time: Some(last_message_time.into()),
qmdl_size_bytes: metadata.size() as usize, qmdl_size_bytes: metadata.size() as usize,
@@ -322,7 +337,7 @@ impl RecordingStore {
file_kind: FileKind, file_kind: FileKind,
) -> Result<Option<File>, RecordingStoreError> { ) -> Result<Option<File>, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index]; let entry = &self.manifest.entries[entry_index];
let filepath = file_kind.get_filepath(&entry.name, &self.path); let filepath = file_kind.get_filepath(&entry.name, &self.path, entry.compressed);
match File::open(&filepath).await { match File::open(&filepath).await {
Ok(file) => Ok(Some(file)), Ok(file) => Ok(Some(file)),
@@ -496,7 +511,11 @@ impl RecordingStore {
self.write_manifest().await?; self.write_manifest().await?;
for &file_kind in FileKind::ALL { for &file_kind in FileKind::ALL {
let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path); let filepath = file_kind.get_filepath(
&entry_to_delete.name,
&self.path,
entry_to_delete.compressed,
);
remove_file_if_exists(&filepath) remove_file_if_exists(&filepath)
.await .await
.map_err(RecordingStoreError::DeleteFileError)?; .map_err(RecordingStoreError::DeleteFileError)?;
@@ -513,7 +532,7 @@ impl RecordingStore {
'entries: for entry in &self.manifest.entries { 'entries: for entry in &self.manifest.entries {
for &file_kind in FileKind::ALL { for &file_kind in FileKind::ALL {
let filepath = file_kind.get_filepath(&entry.name, &self.path); let filepath = file_kind.get_filepath(&entry.name, &self.path, entry.compressed);
if let Err(e) = remove_file_if_exists(&filepath).await { if let Err(e) = remove_file_if_exists(&filepath).await {
log::warn!("failed to remove {filepath:?}: {e:?}"); log::warn!("failed to remove {filepath:?}: {e:?}");
// Some error happened with deleting this entry, abort and go to the next one. // Some error happened with deleting this entry, abort and go to the next one.
+78 -39
View File
@@ -11,10 +11,13 @@ use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
use log::{error, warn}; use log::{error, warn};
use rayhunter::qmdl::QmdlMessageReader;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::write; use tokio::fs::write;
use tokio::io::{AsyncReadExt, copy, duplex}; use tokio::io::AsyncReadExt;
use tokio::io::copy;
use tokio::io::duplex;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio_util::compat::FuturesAsyncWriteCompatExt; use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -81,14 +84,18 @@ pub async fn get_qmdl(
) )
})? })?
.ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64); let qmdl_reader = QmdlMessageReader::new(qmdl_file).await.map_err(|err| {
let qmdl_stream = ReaderStream::new(limited_qmdl_file); (
StatusCode::INTERNAL_SERVER_ERROR,
format!("error reading QMDL file: {err}"),
)
})?;
let headers = [ let headers = [
(CONTENT_TYPE, "application/octet-stream"), (CONTENT_TYPE, "application/octet-stream"),
(CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()), (CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()),
]; ];
let body = Body::from_stream(qmdl_stream); let body = Body::from_stream(qmdl_reader.into_qmdl_stream());
Ok((headers, body).into_response()) Ok((headers, body).into_response())
} }
@@ -334,7 +341,7 @@ pub async fn get_zip(
Path(entry_name): Path<String>, Path(entry_name): Path<String>,
) -> Result<Response, (StatusCode, String)> { ) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned(); let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
let (entry_index, qmdl_size_bytes) = { let (entry_index, compressed, qmdl_file_size) = {
let qmdl_store = state.qmdl_store_lock.read().await; let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or(( let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
@@ -348,7 +355,7 @@ pub async fn get_zip(
)); ));
} }
(entry_index, entry.qmdl_size_bytes) (entry_index, entry.compressed, entry.qmdl_size_bytes)
}; };
let qmdl_store_lock = state.qmdl_store_lock.clone(); let qmdl_store_lock = state.qmdl_store_lock.clone();
@@ -377,23 +384,22 @@ pub async fn get_zip(
continue; continue;
}; };
let entry = ZipEntryBuilder::new( let zip_entry = ZipEntryBuilder::new(
file_kind.get_filename(&qmdl_idx).into(), file_kind.get_filename(&qmdl_idx, compressed).into(),
Compression::Stored, Compression::Stored,
); );
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does // FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed // not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
// once https://github.com/Majored/rs-async-zip/pull/160 is released. // once https://github.com/Majored/rs-async-zip/pull/160 is released.
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let mut entry_writer = zip.write_entry_stream(zip_entry).await?.compat_write();
// Truncating to qmdl_size_bytes is an attempt to ignore partial writes by the diag // Truncating to qmdl_size_bytes is an attempt to ignore partial writes by the diag
// thread. // thread.
if file_kind == FileKind::Qmdl { if file_kind == FileKind::Qmdl {
copy(&mut file.take(qmdl_size_bytes as u64), &mut entry_writer).await?; copy(&mut file.take(qmdl_file_size as u64), &mut entry_writer).await?;
} else { } else {
copy(&mut file, &mut entry_writer).await?; copy(&mut file, &mut entry_writer).await?;
} }
entry_writer.into_inner().close().await?; entry_writer.into_inner().close().await?;
} }
@@ -409,16 +415,11 @@ pub async fn get_zip(
.open_file(entry_index, FileKind::Qmdl) .open_file(entry_index, FileKind::Qmdl)
.await? .await?
.ok_or_else(|| anyhow::anyhow!("QMDL file not found"))? .ok_or_else(|| anyhow::anyhow!("QMDL file not found"))?
.take(qmdl_size_bytes as u64)
}; };
let qmdl_reader = QmdlMessageReader::new(qmdl_file_for_pcap).await?;
if let Err(e) = generate_pcap_data( if let Err(e) =
&mut entry_writer, generate_pcap_data(&mut entry_writer, qmdl_reader, gps_records).await
qmdl_file_for_pcap,
qmdl_size_bytes,
gps_records,
)
.await
{ {
// if we fail to generate the PCAP file, we should still continue and give the // if we fail to generate the PCAP file, we should still continue and give the
// user the QMDL. // user the QMDL.
@@ -532,10 +533,17 @@ pub async fn debug_set_display_state(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::Cursor;
use super::*; use super::*;
use crate::config::GpsMode; use crate::config::GpsMode;
use async_zip::base::read::mem::ZipFileReader; use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State}; use axum::extract::{Path, State};
use futures::AsyncReadExt;
use rayhunter::{
diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer},
qmdl::{QmdlMessageReader, QmdlWriter},
};
use tempfile::TempDir; use tempfile::TempDir;
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) { async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
@@ -549,24 +557,25 @@ mod tests {
async fn create_test_entry_with_data( async fn create_test_entry_with_data(
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>, store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
test_data: &[u8], test_data: &MessagesContainer,
) -> String { ) -> String {
let entry_name = { let entry_name = {
let mut store = store_lock.write().await; let mut store = store_lock.write().await;
let (mut qmdl_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap(); let (mut qmdl_gz_file, _analysis_file) =
store.new_entry(GpsMode::Disabled).await.unwrap();
if !test_data.is_empty() { let mut writer = QmdlWriter::new(&mut qmdl_gz_file);
use tokio::io::AsyncWriteExt; writer.write_container(test_data).await.unwrap();
qmdl_file.write_all(test_data).await.unwrap(); writer.close().await.unwrap();
qmdl_file.flush().await.unwrap();
} let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize;
let current_entry = store.current_entry.unwrap(); let current_entry = store.current_entry.unwrap();
let entry = &store.manifest.entries[current_entry]; let entry = &store.manifest.entries[current_entry];
let entry_name = entry.name.clone(); let entry_name = entry.name.clone();
store store
.update_entry_qmdl_size(current_entry, test_data.len()) .update_entry_qmdl_size(current_entry, qmdl_file_size)
.await .await
.unwrap(); .unwrap();
entry_name entry_name
@@ -604,17 +613,32 @@ mod tests {
}) })
} }
// valid HDLC encapsulated diag message generated from
// rayhunter::diag::test::get_test_message
fn create_test_container() -> MessagesContainer {
MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![HdlcEncapsulatedMessage {
len: 39,
data: vec![
16, 0, 32, 0, 32, 0, 192, 176, 26, 165, 245, 135, 118, 35, 2, 1, 20, 14, 48, 0,
160, 0, 2, 8, 0, 0, 217, 15, 5, 0, 0, 0, 0, 1, 0, 10, 13, 196, 126,
],
}],
}
}
#[tokio::test] #[tokio::test]
async fn test_get_zip_success() { async fn test_get_zip_success() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await; let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E]; let test_qmdl_data = create_test_container();
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
let state = create_test_server_state(store_lock); let state = create_test_server_state(store_lock);
let result = get_zip(State(state), Path(entry_name.clone())).await; let response = get_zip(State(state), Path(entry_name.clone()))
.await
assert!(result.is_ok()); .unwrap();
let response = result.unwrap();
let headers = response.headers(); let headers = response.headers();
assert_eq!(headers.get("content-type").unwrap(), "application/zip"); assert_eq!(headers.get("content-type").unwrap(), "application/zip");
@@ -623,21 +647,36 @@ mod tests {
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
let zip_reader_file = zip_reader.file();
let filenames = zip_reader let filenames: Vec<String> = zip_reader_file
.file()
.entries() .entries()
.iter() .iter()
.map(|entry| entry.filename().as_str().unwrap().to_owned()) .map(|entry| entry.filename().as_str().unwrap().to_string())
.collect::<Vec<String>>(); .collect();
assert_eq!( assert_eq!(
filenames, filenames,
vec![ vec![
format!("{entry_name}.qmdl"), format!("{entry_name}.qmdl.gz"),
format!("{entry_name}-gps.ndjson"), format!("{entry_name}-gps.ndjson"),
format!("{entry_name}.pcapng"), format!("{entry_name}.pcapng"),
] ]
); );
let mut qmdl_body = Vec::with_capacity(128);
zip_reader
.reader_without_entry(0)
.await
.unwrap()
.read_to_end(&mut qmdl_body)
.await
.unwrap();
let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body))
.await
.unwrap();
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
assert_eq!(
qmdl_reader.get_next_message().await.unwrap(),
Some(Ok(expected_message)),
);
} }
} }
+4 -3
View File
@@ -102,7 +102,8 @@ async fn try_upload_entry(
shutdown_token: CancellationToken, shutdown_token: CancellationToken,
) -> Option<()> { ) -> Option<()> {
let read_lock = store.read().await; let read_lock = store.read().await;
let entry_idx = read_lock.entry_for_name(&entry_name)?.0; let (entry_idx, entry) = read_lock.entry_for_name(&entry_name)?;
let compressed = entry.compressed;
let file = read_lock.open_file(entry_idx, file_kind).await; let file = read_lock.open_file(entry_idx, file_kind).await;
drop(read_lock); drop(read_lock);
@@ -118,7 +119,7 @@ async fn try_upload_entry(
} }
}; };
let file_name = file_kind.get_filename(&entry_name); let file_name = file_kind.get_filename(&entry_name, compressed);
let res = select! { let res = select! {
_ = shutdown_token.cancelled() => { _ = shutdown_token.cancelled() => {
@@ -331,7 +332,7 @@ mod tests {
let recorded = captured.lock().await; let recorded = captured.lock().await;
assert_eq!(recorded.len(), 3); assert_eq!(recorded.len(), 3);
let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect(); let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect();
let qmdl_path = format!("dav/{}.qmdl", entry_name); let qmdl_path = format!("dav/{}.qmdl.gz", entry_name);
let ndjson_path = format!("dav/{}.ndjson", entry_name); let ndjson_path = format!("dav/{}.ndjson", entry_name);
let gps_path = format!("dav/{}-gps.ndjson", entry_name); let gps_path = format!("dav/{}-gps.ndjson", entry_name);
assert!(paths.contains(&qmdl_path.as_str())); assert!(paths.contains(&qmdl_path.as_str()));
+1
View File
@@ -30,6 +30,7 @@ serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
num_enum = "0.7.4" num_enum = "0.7.4"
utoipa = { version = "5.4.0", optional = true } utoipa = { version = "5.4.0", optional = true }
async-compression = { version = "0.4.41", features = ["tokio", "gzip"] }
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"
+23 -17
View File
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use crate::analysis::diagnostic::DiagnosticAnalyzer; use crate::analysis::diagnostic::DiagnosticAnalyzer;
use crate::diag::{DiagParsingError, Message};
use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType}; use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType};
use crate::util::RuntimeMetadata; use crate::util::RuntimeMetadata;
use crate::{diag::MessagesContainer, gsmtap_parser}; use crate::{diag::MessagesContainer, gsmtap_parser};
@@ -223,7 +224,7 @@ impl AnalysisLineNormalizer {
} }
} }
#[derive(Serialize, Debug)] #[derive(Serialize, Debug, Default)]
pub struct AnalysisRow { pub struct AnalysisRow {
pub packet_timestamp: Option<DateTime<FixedOffset>>, pub packet_timestamp: Option<DateTime<FixedOffset>>,
pub skipped_message_reason: Option<String>, pub skipped_message_reason: Option<String>,
@@ -231,6 +232,10 @@ pub struct AnalysisRow {
} }
impl AnalysisRow { impl AnalysisRow {
pub fn new() -> Self {
Self::default()
}
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.skipped_message_reason.is_none() && !self.contains_warnings() self.skipped_message_reason.is_none() && !self.contains_warnings()
} }
@@ -412,36 +417,30 @@ impl Harness {
row row
} }
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> { pub fn analyze_qmdl_message(
let mut rows = Vec::new(); &mut self,
for maybe_qmdl_message in container.messages() { maybe_qmdl_message: Result<Message, DiagParsingError>,
) -> AnalysisRow {
let mut row = AnalysisRow::new();
self.packet_num += 1; self.packet_num += 1;
rows.push(AnalysisRow {
packet_timestamp: None,
skipped_message_reason: None,
events: Vec::new(),
});
// unwrap is safe here since we just pushed a value
let row = rows.last_mut().unwrap();
let qmdl_message = match maybe_qmdl_message { let qmdl_message = match maybe_qmdl_message {
Ok(msg) => msg, Ok(msg) => msg,
Err(err) => { Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}")); row.skipped_message_reason = Some(format!("{err:?}"));
continue; return row;
} }
}; };
let gsmtap_message = match gsmtap_parser::parse(qmdl_message) { let gsmtap_message = match gsmtap_parser::parse(qmdl_message) {
Ok(msg) => msg, Ok(msg) => msg,
Err(err) => { Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}")); row.skipped_message_reason = Some(format!("{err:?}"));
continue; return row;
} }
}; };
let Some((timestamp, gsmtap_msg)) = gsmtap_message else { let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
continue; return row;
}; };
row.packet_timestamp = Some(timestamp.to_datetime()); row.packet_timestamp = Some(timestamp.to_datetime());
@@ -449,13 +448,20 @@ impl Harness {
Ok(element) => element, Ok(element) => element,
Err(err) => { Err(err) => {
row.skipped_message_reason = Some(format!("{err:?}")); row.skipped_message_reason = Some(format!("{err:?}"));
continue; return row;
} }
}; };
row.events = self.analyze_information_element(&element); row.events = self.analyze_information_element(&element);
row
} }
rows
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
container
.messages()
.drain(..)
.map(|maybe_message| self.analyze_qmdl_message(maybe_message))
.collect()
} }
fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> { fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> {
+32 -20
View File
@@ -1,5 +1,6 @@
//! Diag protocol serialization/deserialization //! Diag protocol serialization/deserialization
use bytes::Bytes;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use crc::{Algorithm, Crc}; use crc::{Algorithm, Crc};
use deku::prelude::*; use deku::prelude::*;
@@ -89,30 +90,19 @@ impl MessagesContainer {
let mut result = Vec::new(); let mut result = Vec::new();
for msg in &self.messages { for msg in &self.messages {
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
match hdlc_decapsulate(sub_msg, &CRC_CCITT) { result.push(Message::from_hdlc(sub_msg));
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!(
"warning: {} leftover bytes when parsing Message",
leftover_bytes.len()
);
}
result.push(Ok(res));
}
Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))),
},
Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(
err,
sub_msg.to_vec(),
))),
}
} }
} }
result result
} }
} }
impl From<MessagesContainer> for Bytes {
fn from(value: MessagesContainer) -> Self {
value.to_bytes().unwrap().into()
}
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
pub struct HdlcEncapsulatedMessage { pub struct HdlcEncapsulatedMessage {
pub len: u32, pub len: u32,
@@ -152,6 +142,26 @@ pub enum Message {
}, },
} }
impl Message {
pub fn from_hdlc(data: &[u8]) -> Result<Message, DiagParsingError> {
match hdlc_decapsulate(data, &CRC_CCITT) {
Ok(data) => match Message::from_bytes((&data, 0)) {
Ok(((leftover_bytes, _), res)) => {
if !leftover_bytes.is_empty() {
warn!(
"warning: {} leftover bytes when parsing Message",
leftover_bytes.len()
);
}
Ok(res)
}
Err(e) => Err(DiagParsingError::MessageParsingError(e, data)),
},
Err(err) => Err(DiagParsingError::HdlcDecapsulationError(err, data.to_vec())),
}
}
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
#[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")] #[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")]
pub enum LogBody { pub enum LogBody {
@@ -411,7 +421,7 @@ pub fn build_log_mask_request(
} }
#[cfg(test)] #[cfg(test)]
mod test { pub(crate) mod test {
use super::*; use super::*;
// Just about all of these test cases from manually parsing diag packets w/ QCSuper // Just about all of these test cases from manually parsing diag packets w/ QCSuper
@@ -525,7 +535,7 @@ mod test {
// this log is based on one captured on a real device -- if it fails to // this log is based on one captured on a real device -- if it fails to
// serialize or deserialize, that's probably a problem with this mock, not // serialize or deserialize, that's probably a problem with this mock, not
// the DiagReader implementation // the DiagReader implementation
fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { pub fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
let length_with_payload = 31 + payload.len() as u16; let length_with_payload = 31 + payload.len() as u16;
let message = Message::Log { let message = Message::Log {
pending_msgs: 0, pending_msgs: 0,
@@ -559,6 +569,8 @@ mod test {
len: encapsulated_data.len() as u32, len: encapsulated_data.len() as u32,
data: encapsulated_data, data: encapsulated_data,
}; };
// sanity check
assert_eq!(&Message::from_hdlc(&encapsulated.data).unwrap(), &message);
(encapsulated, message) (encapsulated, message)
} }
+293 -147
View File
@@ -3,109 +3,214 @@
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
//! QMDL files. //! QMDL files.
use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer}; use std::io::ErrorKind;
use std::pin::Pin;
use std::task::Poll;
use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContainer};
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use futures::TryStream; use futures::TryStream;
use log::error; use tokio::io::{
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt,
BufReader,
};
const GZIP_MAGIC_NUMBER: u16 = 0x1f8b;
pub struct QmdlWriter<T> pub struct QmdlWriter<T>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
{ {
writer: T, writer: GzipEncoder<T>,
pub total_written: usize,
} }
impl<T> QmdlWriter<T> impl<T> QmdlWriter<T>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + AsyncSeek + Unpin,
{ {
pub fn new(writer: T) -> Self { pub fn new(writer: T) -> Self {
QmdlWriter::new_with_existing_size(writer, 0) let gzip_writer = GzipEncoder::new(writer);
QmdlWriter {
writer: gzip_writer,
}
} }
pub fn new_with_existing_size(writer: T, existing_size: usize) -> Self { pub async fn size(&mut self) -> std::io::Result<usize> {
QmdlWriter { let size = self.writer.get_mut().stream_position().await?;
writer, Ok(size as usize)
total_written: existing_size,
}
} }
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
for msg in &container.messages { for msg in &container.messages {
self.writer.write_all(&msg.data).await?; self.writer.write_all(&msg.data).await?;
self.total_written += msg.data.len(); self.writer.flush().await?;
} }
Ok(()) Ok(())
} }
pub async fn close(mut self) -> std::io::Result<()> {
self.writer.shutdown().await?;
Ok(())
}
} }
pub struct QmdlReader<T> #[derive(Debug)]
enum QmdlReaderSource<T> {
Compressed {
reader: GzipDecoder<BufReader<T>>,
eof: bool,
},
Uncompressed {
reader: T,
},
}
#[derive(Debug)]
struct QmdlAsyncReader<T> {
source: QmdlReaderSource<T>,
}
impl<T> QmdlAsyncReader<T>
where where
T: AsyncRead, T: AsyncRead,
{ {
reader: BufReader<T>, pub fn new(reader: T, compressed: bool) -> Self {
bytes_read: usize, let source = if compressed {
max_bytes: Option<usize>, QmdlReaderSource::Compressed {
reader: GzipDecoder::new(BufReader::new(reader)),
eof: false,
}
} else {
QmdlReaderSource::Uncompressed { reader }
};
Self { source }
}
} }
impl<T> QmdlReader<T> impl<T> AsyncRead for QmdlAsyncReader<T>
where where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
pub fn new(reader: T, max_bytes: Option<usize>) -> Self { fn poll_read(
QmdlReader { self: Pin<&mut Self>,
reader: BufReader::new(reader), cx: &mut std::task::Context<'_>,
bytes_read: 0, buf: &mut tokio::io::ReadBuf<'_>,
max_bytes, ) -> Poll<std::io::Result<()>> {
match &mut self.get_mut().source {
QmdlReaderSource::Compressed { reader, eof } => {
// if we already determined we've reached the Gzip EOF, don't read more
if *eof {
return Poll::Ready(Ok(()));
}
match Pin::new(reader).poll_read(cx, buf) {
// if we hit an unexpected EOF in a Gzip file, it shouldn't
// be considered fatal, just a truncated file. mark that
// we're done and return the result as usual
Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => {
*eof = true;
Poll::Ready(Ok(()))
}
res => res,
}
}
QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf),
}
} }
} }
pub fn as_stream( #[derive(Debug)]
&mut self, pub struct QmdlMessageReader<T>
) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> + '_ { where
futures::stream::try_unfold(self, |reader| async { T: AsyncRead,
let maybe_container = reader.get_next_messages_container().await?; {
match maybe_container { buf_reader: BufReader<QmdlAsyncReader<T>>,
Some(container) => Ok(Some((container, reader))), }
async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool>
where
T: AsyncRead + AsyncSeek + Unpin,
{
let magic_number = reader.read_u16().await?;
reader.rewind().await?;
// this is safe because 0x1f8b.... doesn't overlap with any known
// diag::DataType values
Ok(magic_number == GZIP_MAGIC_NUMBER)
}
impl<T> QmdlMessageReader<T>
where
T: AsyncRead + AsyncSeek + Unpin,
{
pub async fn new(mut reader: T) -> std::io::Result<Self> {
let compressed = is_gzip_stream(&mut reader).await.unwrap_or(false);
Ok(QmdlMessageReader {
buf_reader: BufReader::new(QmdlAsyncReader::new(reader, compressed)),
})
}
pub fn is_compressed(&self) -> bool {
matches!(
self.buf_reader.get_ref().source,
QmdlReaderSource::Compressed { .. }
)
}
pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
let mut buf = vec![];
match reader
.buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf)
.await
{
Err(err) => Err(err),
Ok(0) => Ok(None),
Ok(_) => Ok(Some((buf, reader))),
}
})
}
pub fn into_message_stream(
self,
) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
match reader.get_next_message().await? {
Some(res) => Ok(Some((res, reader))),
None => Ok(None), None => Ok(None),
} }
}) })
} }
pub async fn get_next_messages_container( pub async fn get_next_message(
&mut self, &mut self,
) -> Result<Option<MessagesContainer>, std::io::Error> { ) -> Result<Option<Result<Message, DiagParsingError>>, std::io::Error> {
if let Some(max_bytes) = self.max_bytes let mut buf = vec![];
&& self.bytes_read >= max_bytes if self
.buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf)
.await?
== 0
{ {
if self.bytes_read > max_bytes {
error!(
"warning: {} bytes read, but max_bytes was {}",
self.bytes_read, max_bytes
);
}
return Ok(None); return Ok(None);
} }
let mut buf = Vec::new(); Ok(Some(Message::from_hdlc(&buf)))
let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?; }
self.bytes_read += bytes_read; }
// Since QMDL is just a flat list of messages, we can't actually impl<T> AsyncRead for QmdlMessageReader<T>
// reproduce the container structure they came from in the original where
// read. So we'll just pretend that all containers had exactly one T: AsyncRead + Unpin,
// message. As far as I know, the number of messages per container {
// doesn't actually affect anything, so this should be fine. fn poll_read(
Ok(Some(MessagesContainer { self: Pin<&mut Self>,
data_type: DataType::UserSpace, cx: &mut std::task::Context<'_>,
num_messages: 1, buf: &mut tokio::io::ReadBuf<'_>,
messages: vec![HdlcEncapsulatedMessage { ) -> Poll<std::io::Result<()>> {
len: bytes_read as u32, Pin::new(&mut self.get_mut().buf_reader).poll_read(cx, buf)
data: buf,
}],
}))
} }
} }
@@ -113,132 +218,173 @@ where
mod test { mod test {
use std::io::Cursor; use std::io::Cursor;
use crate::diag::CRC_CCITT; use crate::diag::{DataType, HdlcEncapsulatedMessage, test::get_test_message};
use crate::hdlc::hdlc_encapsulate;
use super::*; use super::*;
fn get_test_messages() -> Vec<HdlcEncapsulatedMessage> { fn get_test_messages() -> (Vec<HdlcEncapsulatedMessage>, Vec<Message>) {
let messages: Vec<HdlcEncapsulatedMessage> = (10..20) let mut hdlcs = Vec::new();
.map(|i| { let mut messages = Vec::new();
let data = hdlc_encapsulate(&vec![i as u8; i], &CRC_CCITT); for i in 10..20 {
HdlcEncapsulatedMessage { let (hdlc, msg) = get_test_message(&[i]);
len: data.len() as u32, hdlcs.push(hdlc);
data, messages.push(msg);
} }
}) (hdlcs, messages)
.collect();
messages
} }
// returns a byte array consisting of concatenated HDLC encapsulated // returns a byte array consisting of concatenated HDLC encapsulated
// test messages // test messages
fn get_test_message_bytes() -> Vec<u8> { fn get_test_message_bytes() -> Vec<u8> {
get_test_messages() let (hdlcs, _) = get_test_messages();
.iter() hdlcs.iter().flat_map(|msg| msg.data.clone()).collect()
.flat_map(|msg| msg.data.clone())
.collect()
} }
fn get_test_containers() -> Vec<MessagesContainer> { fn get_test_containers() -> Vec<MessagesContainer> {
let messages = get_test_messages(); let (hdlcs, _) = get_test_messages();
let (messages1, messages2) = messages.split_at(5); let (hdlcs1, hdlcs2) = hdlcs.split_at(5);
vec![ vec![
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: messages1.len() as u32, num_messages: hdlcs1.len() as u32,
messages: messages1.to_vec(), messages: hdlcs1.to_vec(),
}, },
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: messages2.len() as u32, num_messages: hdlcs2.len() as u32,
messages: messages2.to_vec(), messages: hdlcs2.to_vec(),
}, },
] ]
} }
#[tokio::test] #[tokio::test]
async fn test_unbounded_qmdl_reader() { async fn test_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes()); let mut buf = Cursor::new(get_test_message_bytes());
let mut reader = QmdlReader::new(&mut buf, None); let mut reader = QmdlMessageReader::new(&mut buf).await.unwrap();
let expected_messages = get_test_messages(); assert!(!reader.is_compressed());
for message in expected_messages { let (_, expected_messages) = get_test_messages();
let expected_container = MessagesContainer { for msg in expected_messages {
data_type: DataType::UserSpace, assert_eq!(Ok(msg), reader.get_next_message().await.unwrap().unwrap());
num_messages: 1,
messages: vec![message],
};
assert_eq!(
expected_container,
reader.get_next_messages_container().await.unwrap().unwrap()
);
} }
} }
#[tokio::test] #[tokio::test]
async fn test_bounded_qmdl_reader() { async fn test_truncation() {
let mut buf = Cursor::new(get_test_message_bytes()); run_truncation_tests(false).await;
// bound the reader to the first two messages
let mut expected_messages = get_test_messages();
let limit = expected_messages[0].len + expected_messages[1].len;
let mut reader = QmdlReader::new(&mut buf, Some(limit as usize));
for message in expected_messages.drain(0..2) {
let expected_container = MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![message],
};
assert_eq!(
expected_container,
reader.get_next_messages_container().await.unwrap().unwrap()
);
}
assert!(matches!(
reader.get_next_messages_container().await,
Ok(None)
));
} }
#[tokio::test] #[tokio::test]
async fn test_qmdl_writer() { async fn test_compressed_truncation() {
run_truncation_tests(true).await;
}
async fn run_truncation_tests(compressed: bool) {
let (hdlcs, expected_messages) = get_test_messages();
let (bytes, message_lengths): (Vec<u8>, Vec<usize>) = if compressed {
let mut buf = Vec::new(); let mut buf = Vec::new();
let mut writer = QmdlWriter::new(&mut buf); let mut compressed_lengths = Vec::new();
let expected_containers = get_test_containers(); let mut writer = GzipEncoder::new(&mut buf);
for container in &expected_containers { for hdlc in &hdlcs {
writer.write_container(container).await.unwrap(); let before = writer.get_ref().len();
writer.write_all(&hdlc.data).await.unwrap();
writer.flush().await.unwrap();
let after = writer.get_ref().len();
compressed_lengths.push(after - before);
} }
assert_eq!(writer.total_written, buf.len()); (buf, compressed_lengths)
assert_eq!(buf, get_test_message_bytes()); } else {
(
get_test_message_bytes(),
hdlcs.iter().map(|hdlc| hdlc.data.len()).collect(),
)
};
for truncated_hdlc_i in 1..hdlcs.len() - 1 {
let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum();
for truncated_byte in 1..message_lengths[truncated_hdlc_i] {
let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]);
let mut reader = QmdlMessageReader::new(&mut truncated_bytes).await.unwrap();
for msg in expected_messages.iter().take(truncated_hdlc_i) {
assert_eq!(
Ok(msg),
reader.get_next_message().await.unwrap().unwrap().as_ref()
);
}
if compressed {
// for a compressed reader, we have a couple possible
// outcomes, depending on how far along the Gzip DEFLATE
// block was before it was truncated:
match reader.get_next_message().await.unwrap() {
// if the block was truncated early enough, the
// GzipDecoder will detect an unexpected EOF, and our
// QmdlReader will indicate the stream of messages is
// done
None => {}
// if it's further along, the expanded result will be an
// invalid HDLC block. if that's the case, make sure the
// QmdlReader indicates the stream of messages is over
// with afterwards
Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => {
assert!(matches!(reader.get_next_message().await, Ok(None)));
}
// if it's further along still, we may get a complete
// Message, so make sure it matches the next expected
// one. then, make sure we've hit the end of the message
// stream
Some(Ok(msg)) => {
assert_eq!(&msg, &expected_messages[truncated_hdlc_i]);
assert!(matches!(reader.get_next_message().await, Ok(None)));
}
// we should never be able to decapsulate the HDLC into
// an invalid Diag message
Some(Err(DiagParsingError::MessageParsingError(_, _))) => {
panic!("unexpected MessageParsingError");
}
}
} else {
// a truncated uncompressed reader should always end on an
// HdlcDecapsulationError, and then return Ok(None) to
// indicate the message stream is over
assert!(matches!(
reader.get_next_message().await,
Ok(Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))))
));
assert!(matches!(reader.get_next_message().await, Ok(None)));
}
}
}
}
/// Writes the test containers to a QmdlWriter, optionally finishing the
/// gzip stream with a footer. Then, attempts to decompress the buffer with
/// a QmdlWriter, asserting that the containers match what's expected.
async fn run_compressed_reading_and_writing_tests(do_close: bool) {
let containers = get_test_containers();
let mut buf = Cursor::new(Vec::new());
{
let mut writer = QmdlWriter::new(&mut buf);
for container in &containers {
writer.write_container(&container).await.unwrap();
}
if do_close {
writer.close().await.unwrap();
}
}
buf.set_position(0);
let mut reader = QmdlMessageReader::new(buf).await.unwrap();
assert!(reader.is_compressed());
let (_, expected_messages) = get_test_messages();
for message in expected_messages {
assert_eq!(
Ok(message),
reader.get_next_message().await.unwrap().unwrap()
);
}
assert!(matches!(reader.get_next_message().await, Ok(None)));
} }
#[tokio::test] #[tokio::test]
async fn test_writing_and_reading() { async fn test_compressed_reading_and_writing() {
let mut buf = Vec::new(); run_compressed_reading_and_writing_tests(true).await;
let mut writer = QmdlWriter::new(&mut buf); run_compressed_reading_and_writing_tests(false).await;
let expected_containers = get_test_containers();
for container in &expected_containers {
writer.write_container(container).await.unwrap();
}
let limit = Some(buf.len());
let mut reader = QmdlReader::new(Cursor::new(&mut buf), limit);
let expected_messages = get_test_messages();
for message in expected_messages {
let expected_container = MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![message],
};
assert_eq!(
expected_container,
reader.get_next_messages_container().await.unwrap().unwrap()
);
}
assert!(matches!(
reader.get_next_messages_container().await,
Ok(None)
));
} }
} }