From 19d9b3967c80be2612e83309127d509b14ce25da Mon Sep 17 00:00:00 2001 From: Will Greenberg Date: Mon, 30 Mar 2026 15:56:03 -0700 Subject: [PATCH] Add support for compressed QMDL Major changes: * QmdlWriter now outputs gzipped QMDL files by default * QmdlReader renamed to QmdlMessageReader, and reads both compressed and uncompressed QMDL. It no longer requires bounding to avoid reading partially written files. --- Cargo.lock | 57 ++--- check/src/main.rs | 43 ++-- daemon/Cargo.toml | 2 +- daemon/src/analysis.rs | 46 ++-- daemon/src/diag.rs | 64 +++--- daemon/src/pcap.rs | 45 ++-- daemon/src/qmdl_store.rs | 39 ++-- daemon/src/server.rs | 149 +++++++++--- daemon/src/webdav.rs | 7 +- lib/Cargo.toml | 1 + lib/src/analysis/analyzer.rs | 92 ++++---- lib/src/diag.rs | 55 +++-- lib/src/qmdl.rs | 435 ++++++++++++++++++++++++----------- 13 files changed, 641 insertions(+), 394 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c829c3..ad3be3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,13 +276,12 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.33" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93c1f86859c1af3d514fa19e8323147ff10ea98684e6c7b307912509f50e67b2" +checksum = "e79b3f8a79cccc2898f31920fc69f304859b3bd567490f75ebf51ae1c792a9ac" dependencies = [ "compression-codecs", "compression-core", - "futures-core", "pin-project-lite", "tokio", ] @@ -1007,9 +1006,9 @@ dependencies = [ [[package]] name = "compression-codecs" -version = "0.4.32" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "680dc087785c5230f8e8843e2e57ac7c1c90488b6a91b88caa265410568f441b" +checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "compression-core", "flate2", @@ -1906,9 +1905,9 @@ checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "flate2" -version = "1.1.1" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ "crc32fast", "miniz_oxide", @@ -1997,9 +1996,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" dependencies = [ "futures-channel", "futures-core", @@ -2012,9 +2011,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", "futures-sink", @@ -2022,15 +2021,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" [[package]] name = "futures-executor" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" dependencies = [ "futures-core", "futures-task", @@ -2039,9 +2038,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" [[package]] name = "futures-lite" @@ -2058,9 +2057,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", @@ -2069,21 +2068,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" [[package]] name = "futures-task" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" [[package]] name = "futures-util" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-channel", "futures-core", @@ -2093,7 +2092,6 @@ dependencies = [ "futures-task", "memchr", "pin-project-lite", - "pin-utils", "slab", ] @@ -4356,12 +4354,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "piper" version = "0.2.4" @@ -4856,6 +4848,7 @@ checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" name = "rayhunter" version = "0.11.2" dependencies = [ + "async-compression", "bytes", "chrono", "crc", diff --git a/check/src/main.rs b/check/src/main.rs index 6165a39..940e3d8 100644 --- a/check/src/main.rs +++ b/check/src/main.rs @@ -1,15 +1,13 @@ use clap::Parser; -use futures::TryStreamExt; use log::{debug, error, info, warn}; use pcap_file_tokio::pcapng::{Block, PcapNgReader}; use rayhunter::{ analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness}, - diag::DataType, gsmtap_parser, pcap::GsmtapPcapWriter, - qmdl::QmdlReader, + qmdl::QmdlMessageReader, }; -use std::{collections::HashMap, future, path::PathBuf, pin::pin}; +use std::{collections::HashMap, path::PathBuf}; use tokio::fs::File; use walkdir::WalkDir; @@ -113,26 +111,14 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); - let file_size = qmdl_file - .metadata() - .await - .expect("failed to get QMDL file metadata") - .len(); - let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize)); - let mut qmdl_stream = pin!( - qmdl_reader - .as_stream() - .try_filter(|container| future::ready(container.data_type == DataType::UserSpace)) - ); + let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); let mut report = Report::new(qmdl_path); - while let Some(container) = qmdl_stream - .try_next() + while let Some(maybe_message) = qmdl_reader + .get_next_message() .await - .expect("failed getting QMDL container") + .expect("failed to get message") { - for row in harness.analyze_qmdl_messages(container) { - report.process_row(row); - } + report.process_row(harness.analyze_qmdl_message(maybe_message)); } report.print_summary(show_skipped); } @@ -141,8 +127,7 @@ async fn pcapify(qmdl_path: &PathBuf) { let qmdl_file = &mut File::open(&qmdl_path) .await .expect("failed to open qmdl file"); - let qmdl_file_size = qmdl_file.metadata().await.unwrap().len(); - let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize)); + let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); let mut pcap_path = qmdl_path.clone(); pcap_path.set_extension("pcapng"); let pcap_file = &mut File::create(&pcap_path) @@ -150,12 +135,12 @@ async fn pcapify(qmdl_path: &PathBuf) { .expect("failed to open pcap file"); let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap(); pcap_writer.write_iface_header().await.unwrap(); - while let Some(container) = qmdl_reader - .get_next_messages_container() + while let Some(maybe_message) = qmdl_reader + .get_next_message() .await - .expect("failed to get container") + .expect("failed to get message") { - for msg in container.messages().into_iter().flatten() { + if let Ok(msg) = maybe_message { if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) { pcap_writer .write_gsmtap_message(parsed, timestamp, None) @@ -197,9 +182,7 @@ async fn main() { let name_str = name.to_str().unwrap(); let path = entry.path(); let path_str = path.to_str().unwrap(); - // instead of relying on the QMDL extension, can we check if a file is - // QMDL by inspecting the contents? - if name_str.ends_with(".qmdl") { + if name_str.ends_with(".qmdl") || name_str.ends_with(".qmdl.gz") { info!("**** Beginning analysis of {name_str}"); analyze_qmdl(path_str, args.show_skipped).await; if args.pcapify { diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 4d51b02..b96d148 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -35,7 +35,7 @@ futures-macro = "0.3.30" include_dir = "0.7.3" chrono = { version = "0.4.31", features = ["serde"] } tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] } -futures = { version = "0.3.30", default-features = false } +futures = { version = "0.3.32", default-features = false, features = ["std"] } serde_json = "1.0.114" image = { version = "0.25.1", default-features = false, features = ["png", "gif"] } tempfile = "3.10.2" diff --git a/daemon/src/analysis.rs b/daemon/src/analysis.rs index 41efd95..723fe95 100644 --- a/daemon/src/analysis.rs +++ b/daemon/src/analysis.rs @@ -1,16 +1,15 @@ use std::sync::Arc; -use std::{cmp, future, pin}; +use std::cmp; use axum::Json; use axum::{ extract::{Path, State}, http::StatusCode, }; -use futures::TryStreamExt; use log::{error, info}; use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness}; -use rayhunter::diag::{DataType, MessagesContainer}; -use rayhunter::qmdl::QmdlReader; +use rayhunter::diag::{DiagParsingError, Message, MessagesContainer}; +use rayhunter::qmdl::QmdlMessageReader; use serde::Serialize; use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; @@ -47,7 +46,7 @@ impl AnalysisWriter { // Runs the analysis harness on the given container, serializing the results // to the analysis file, returning the whether any warnings were detected - pub async fn analyze( + pub async fn analyze_container( &mut self, container: MessagesContainer, ) -> Result { @@ -62,6 +61,17 @@ impl AnalysisWriter { Ok(max_type) } + pub async fn analyze_message( + &mut self, + maybe_qmdl_msg: Result, + ) -> Result { + let row = self.harness.analyze_qmdl_message(maybe_qmdl_msg); + if !row.is_empty() { + self.write(&row).await?; + } + Ok(row.get_max_event_type()) + } + async fn write(&mut self, value: &T) -> Result<(), std::io::Error> { let mut value_str = serde_json::to_string(value).unwrap(); value_str.push('\n'); @@ -135,7 +145,7 @@ async fn perform_analysis( analyzer_config: &AnalyzerConfig, ) -> Result<(), String> { info!("Opening QMDL and analysis file for {name}..."); - let (analysis_file, qmdl_file) = { + let (analysis_file, mut qmdl_reader) = { let mut qmdl_store = qmdl_store_lock.write().await; let (entry_index, _) = qmdl_store .entry_for_name(name) @@ -149,33 +159,25 @@ async fn perform_analysis( .await .map_err(|e| format!("{e:?}"))? .ok_or("QMDL file not found")?; + let qmdl_reader = QmdlMessageReader::new(qmdl_file) + .await + .map_err(|e| format!("{e:?}"))?; - (analysis_file, qmdl_file) + (analysis_file, qmdl_reader) }; let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config) .await .map_err(|e| format!("{e:?}"))?; - let file_size = qmdl_file - .metadata() - .await - .expect("failed to get QMDL file metadata") - .len(); - let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize)); - let mut qmdl_stream = pin::pin!( - qmdl_reader - .as_stream() - .try_filter(|container| future::ready(container.data_type == DataType::UserSpace)) - ); info!("Starting analysis for {name}..."); - while let Some(container) = qmdl_stream - .try_next() + while let Some(maybe_message) = qmdl_reader + .get_next_message() .await - .expect("failed getting QMDL container") + .expect("failed to get message") { let _ = analysis_writer - .analyze(container) + .analyze_message(maybe_message) .await .map_err(|e| format!("{e:?}"))?; } diff --git a/daemon/src/diag.rs b/daemon/src/diag.rs index 7e33912..3a4284d 100644 --- a/daemon/src/diag.rs +++ b/daemon/src/diag.rs @@ -74,7 +74,7 @@ pub struct DiagTask { enum DiagState { Recording { - qmdl_writer: QmdlWriter, + qmdl_writer: Box>, analysis_writer: Box, }, Stopped, @@ -158,7 +158,7 @@ impl DiagTask { DiskSpaceCheck::Failed => {} } - let (qmdl_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?; + let (qmdl_gz_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?; // For fixed-mode sessions, write the configured coordinates to the storage // immediately so the per-session GPS is stored durably and isn't affected @@ -185,13 +185,11 @@ impl DiagTask { .await .map_err(RecordingStoreError::WriteFileError)?; } - self.stop_current_recording().await; - let qmdl_writer = QmdlWriter::new(qmdl_file); + let qmdl_writer = Box::new(QmdlWriter::new(qmdl_gz_file)); let analysis_writer = AnalysisWriter::new(analysis_file, &self.analyzer_config) .await .map_err(RecordingStoreError::WriteFileError)?; - self.state = DiagState::Recording { qmdl_writer, analysis_writer: Box::new(analysis_writer), @@ -300,13 +298,23 @@ impl DiagTask { let mut state = DiagState::Stopped; std::mem::swap(&mut self.state, &mut state); if let DiagState::Recording { - analysis_writer, .. + qmdl_writer, + analysis_writer, + .. } = state { - analysis_writer - .close() - .await - .expect("failed to close analysis writer"); + match (qmdl_writer.close().await, analysis_writer.close().await) { + (Ok(()), Ok(())) => {} + (qmdl_result, analysis_result) => { + if let Err(err) = qmdl_result { + error!("failed to close QmdlWriter: {:?}", err); + } + if let Err(err) = analysis_result { + error!("failed to close AnalysisWriter: {:?}", err); + } + panic!(); + } + } } } @@ -374,23 +382,25 @@ impl DiagTask { self.stop(qmdl_store, Some(reason)).await; return; } - debug!( - "total QMDL bytes written: {}, updating manifest...", - qmdl_writer.total_written - ); - let index = qmdl_store - .current_entry - .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); - if let Err(e) = qmdl_store - .update_entry_qmdl_size(index, qmdl_writer.total_written) - .await - { - let reason = format!("failed to update manifest (disk full?): {e}"); - error!("{reason}"); - self.stop(qmdl_store, Some(reason)).await; - return; + if let Ok(file_size) = qmdl_writer.size().await { + debug!( + "total QMDL bytes written: {}, updating manifest...", + file_size + ); + let index = qmdl_store + .current_entry + .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); + if let Err(e) = qmdl_store + .update_entry_qmdl_size(index, file_size) + .await + { + let reason = format!("failed to update manifest (disk full?): {e}"); + error!("{reason}"); + self.stop(qmdl_store, Some(reason)).await; + return; + } + debug!("done!"); } - debug!("done!"); // Extract the latest packet timestamp from this container if let Some(ts) = container @@ -407,7 +417,7 @@ impl DiagTask { let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); self.bytes_since_space_check += container_bytes; - let max_type = match analysis_writer.analyze(container).await { + let max_type = match analysis_writer.analyze_container(container).await { Ok(t) => t, Err(e) => { warn!("failed to analyze container: {e}"); diff --git a/daemon/src/pcap.rs b/daemon/src/pcap.rs index bad30b0..8af85e3 100644 --- a/daemon/src/pcap.rs +++ b/daemon/src/pcap.rs @@ -10,12 +10,11 @@ use axum::http::StatusCode; use axum::http::header::CONTENT_TYPE; use axum::response::{IntoResponse, Response}; use log::error; -use rayhunter::diag::DataType; use rayhunter::gsmtap_parser; use rayhunter::pcap::{GpsPoint, GsmtapPcapWriter}; -use rayhunter::qmdl::QmdlReader; +use rayhunter::qmdl::QmdlMessageReader; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite, duplex}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex}; use tokio_util::io::ReaderStream; #[cfg_attr(feature = "apidocs", utoipa::path( @@ -51,18 +50,20 @@ pub async fn get_pcap( "QMDL file is empty, try again in a bit!".to_string(), )); } - let qmdl_size_bytes = entry.qmdl_size_bytes; let qmdl_file = qmdl_store .open_file(entry_index, FileKind::Qmdl) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))? .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; + let qmdl_reader = QmdlMessageReader::new(qmdl_file) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; let (reader, writer) = duplex(1024); let gps_records = load_gps_records_for_entry(&state, entry_index).await; drop(qmdl_store); tokio::spawn(async move { - if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes, gps_records).await { + if let Err(e) = generate_pcap_data(writer, qmdl_reader, gps_records).await { error!("failed to generate PCAP: {e:?}"); } }); @@ -131,37 +132,29 @@ fn find_nearest_gps(records: &[GpsRecord], packet_timestamp: i64) -> Option( writer: W, - qmdl_file: R, - qmdl_size_bytes: usize, + mut reader: QmdlMessageReader, gps_records: Vec, ) -> Result<(), Error> where W: AsyncWrite + Unpin + Send, - R: AsyncRead + Unpin, + R: AsyncRead + AsyncSeek + Unpin, { let mut pcap_writer = GsmtapPcapWriter::new(writer).await?; pcap_writer.write_iface_header().await?; - let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes)); - while let Some(container) = reader.get_next_messages_container().await? { - if container.data_type != DataType::UserSpace { - continue; - } - - for maybe_msg in container.messages() { - match maybe_msg { - Ok(msg) => { - let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; - if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { - let packet_unix_ts = timestamp.to_datetime().timestamp(); - let gps = find_nearest_gps(&gps_records, packet_unix_ts); - pcap_writer - .write_gsmtap_message(gsmtap_msg, timestamp, gps.as_ref()) - .await?; - } + while let Some(maybe_msg) = reader.get_next_message().await? { + match maybe_msg { + Ok(msg) => { + let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; + if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { + let packet_unix_ts = timestamp.to_datetime().timestamp(); + let gps = find_nearest_gps(&gps_records, packet_unix_ts); + pcap_writer + .write_gsmtap_message(gsmtap_msg, timestamp, gps.as_ref()) + .await?; } - Err(e) => error!("error parsing message: {e:?}"), } + Err(e) => error!("error parsing message: {e:?}"), } } diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index d15c1f7..c455846 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -55,16 +55,17 @@ impl FileKind { // List of all possible physical files on disk. pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps]; - pub fn get_filename(&self, entry_name: &str) -> String { + pub fn get_filename(&self, entry_name: &str, qmdl_compressed: bool) -> String { match self { + FileKind::Qmdl if qmdl_compressed => format!("{}.qmdl.gz", entry_name), FileKind::Qmdl => format!("{}.qmdl", entry_name), FileKind::Analysis => format!("{}.ndjson", entry_name), FileKind::Gps => format!("{}-gps.ndjson", entry_name), } } - pub fn get_filepath>(&self, entry_name: &str, base_path: P) -> PathBuf { - base_path.as_ref().join(self.get_filename(entry_name)) + pub fn get_filepath>(&self, entry_name: &str, base_path: P, qmdl_compressed: bool) -> PathBuf { + base_path.as_ref().join(self.get_filename(entry_name, qmdl_compressed)) } } @@ -101,7 +102,6 @@ pub struct ManifestEntry { /// The system time when the last message was recorded to the file #[cfg_attr(feature = "apidocs", schema(value_type = String))] pub last_message_time: Option>, - /// The size of the QMDL file in bytes pub qmdl_size_bytes: usize, /// The rayhunter daemon version which generated the file pub rayhunter_version: Option, @@ -116,6 +116,8 @@ pub struct ManifestEntry { pub upload_time: Option>, #[serde(default)] pub gps_mode: Option, + #[serde(default)] + pub compressed: bool, } impl ManifestEntry { @@ -133,11 +135,12 @@ impl ManifestEntry { stop_reason: None, upload_time: None, gps_mode: Some(gps_mode), + compressed: true, } } pub fn get_filepath>(&self, file_kind: FileKind, path: P) -> PathBuf { - file_kind.get_filepath(&self.name, path) + file_kind.get_filepath(&self.name, path, self.compressed) } } @@ -196,8 +199,9 @@ impl RecordingStore { } // Does a best-effort attempt to recover the manifest from a directory of - // QMDL files. We expect these files to be named like ".qmdl", - // and skip any files which don't match that pattern. + // QMDL files. We expect these files to be named like ".qmdl" + // or ".qmdl.gz", and skip any files which don't match that + // pattern. pub async fn recover

(path: P) -> Result where P: AsRef, @@ -217,11 +221,14 @@ impl RecordingStore { continue; }; - if !filename.ends_with(".qmdl") { + let (stem, compressed) = if filename.ends_with(".qmdl") { + (filename.trim_end_matches(".qmdl"), false) + } else if filename.ends_with(".qmdl.gz") { + (filename.trim_end_matches(".qmdl.gz"), true) + } else { continue; - } + }; - let stem = filename.trim_end_matches(".qmdl"); let Ok(start_timestamp) = stem.parse::() else { warn!("QMDL file has invalid name {os_filename:?}, skipping"); continue; @@ -248,6 +255,7 @@ impl RecordingStore { info!("successfully recovered QMDL entry {os_filename:?}!"); manifest_entries.push(ManifestEntry { name: stem.to_string(), + compressed, start_time: start_time.into(), last_message_time: Some(last_message_time.into()), qmdl_size_bytes: metadata.size() as usize, @@ -322,7 +330,7 @@ impl RecordingStore { file_kind: FileKind, ) -> Result, RecordingStoreError> { let entry = &self.manifest.entries[entry_index]; - let filepath = file_kind.get_filepath(&entry.name, &self.path); + let filepath = file_kind.get_filepath(&entry.name, &self.path, entry.compressed); match File::open(&filepath).await { Ok(file) => Ok(Some(file)), @@ -496,7 +504,7 @@ impl RecordingStore { self.write_manifest().await?; for &file_kind in FileKind::ALL { - let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path); + let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path, entry_to_delete.compressed); remove_file_if_exists(&filepath) .await .map_err(RecordingStoreError::DeleteFileError)?; @@ -513,7 +521,7 @@ impl RecordingStore { 'entries: for entry in &self.manifest.entries { for &file_kind in FileKind::ALL { - let filepath = file_kind.get_filepath(&entry.name, &self.path); + let filepath = file_kind.get_filepath(&entry.name, &self.path, entry.compressed); if let Err(e) = remove_file_if_exists(&filepath).await { log::warn!("failed to remove {filepath:?}: {e:?}"); // Some error happened with deleting this entry, abort and go to the next one. @@ -583,7 +591,10 @@ mod tests { .entry_for_name(&store.manifest.entries[entry_index].name) .unwrap(); assert!(entry.last_message_time.is_some()); - assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000); + assert_eq!( + store.manifest.entries[entry_index].qmdl_size_bytes, + 1000 + ); assert_eq!( RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest diff --git a/daemon/src/server.rs b/daemon/src/server.rs index c898a48..e80e96f 100644 --- a/daemon/src/server.rs +++ b/daemon/src/server.rs @@ -11,10 +11,13 @@ use axum::http::{HeaderValue, StatusCode}; use axum::response::{IntoResponse, Response}; use chrono::{DateTime, Local}; use log::{error, warn}; +use rayhunter::qmdl::QmdlMessageReader; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncReadExt; use std::sync::Arc; use tokio::fs::write; -use tokio::io::{AsyncReadExt, copy, duplex}; +use tokio::io::copy; +use tokio::io::duplex; use tokio::sync::RwLock; use tokio::sync::mpsc::Sender; use tokio_util::compat::FuturesAsyncWriteCompatExt; @@ -81,14 +84,23 @@ pub async fn get_qmdl( ) })? .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; - let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64); - let qmdl_stream = ReaderStream::new(limited_qmdl_file); + let qmdl_reader = QmdlMessageReader::new(qmdl_file) + .await + .map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("error reading QMDL file: {err}"), + ) + })?; let headers = [ (CONTENT_TYPE, "application/octet-stream"), - (CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()), + ( + CONTENT_LENGTH, + &entry.qmdl_size_bytes.to_string(), + ), ]; - let body = Body::from_stream(qmdl_stream); + let body = Body::from_stream(qmdl_reader.into_qmdl_stream()); Ok((headers, body).into_response()) } @@ -334,7 +346,7 @@ pub async fn get_zip( Path(entry_name): Path, ) -> Result { let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned(); - let (entry_index, qmdl_size_bytes) = { + let (entry_index, compressed, qmdl_file_size) = { let qmdl_store = state.qmdl_store_lock.read().await; let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or(( StatusCode::NOT_FOUND, @@ -348,7 +360,7 @@ pub async fn get_zip( )); } - (entry_index, entry.qmdl_size_bytes) + (entry_index, entry.compressed, entry.qmdl_size_bytes) }; let qmdl_store_lock = state.qmdl_store_lock.clone(); @@ -377,23 +389,22 @@ pub async fn get_zip( continue; }; - let entry = ZipEntryBuilder::new( - file_kind.get_filename(&qmdl_idx).into(), + let zip_entry = ZipEntryBuilder::new( + file_kind.get_filename(&qmdl_idx, compressed).into(), Compression::Stored, ); // FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does // not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed // once https://github.com/Majored/rs-async-zip/pull/160 is released. - let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); + let mut entry_writer = zip.write_entry_stream(zip_entry).await?.compat_write(); // Truncating to qmdl_size_bytes is an attempt to ignore partial writes by the diag // thread. if file_kind == FileKind::Qmdl { - copy(&mut file.take(qmdl_size_bytes as u64), &mut entry_writer).await?; + copy(&mut file.take(qmdl_file_size as u64), &mut entry_writer).await?; } else { copy(&mut file, &mut entry_writer).await?; } - entry_writer.into_inner().close().await?; } @@ -409,13 +420,12 @@ pub async fn get_zip( .open_file(entry_index, FileKind::Qmdl) .await? .ok_or_else(|| anyhow::anyhow!("QMDL file not found"))? - .take(qmdl_size_bytes as u64) }; + let qmdl_reader = QmdlMessageReader::new(qmdl_file_for_pcap).await?; if let Err(e) = generate_pcap_data( &mut entry_writer, - qmdl_file_for_pcap, - qmdl_size_bytes, + qmdl_reader, gps_records, ) .await @@ -532,10 +542,14 @@ pub async fn debug_set_display_state( #[cfg(test)] mod tests { + use std::io::Cursor; + use super::*; use crate::config::GpsMode; use async_zip::base::read::mem::ZipFileReader; use axum::extract::{Path, State}; + use futures::AsyncReadExt; + use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}}; use tempfile::TempDir; async fn create_test_qmdl_store() -> (TempDir, Arc>) { @@ -549,24 +563,24 @@ mod tests { async fn create_test_entry_with_data( store_lock: &Arc>, - test_data: &[u8], + test_data: &MessagesContainer, ) -> String { let entry_name = { let mut store = store_lock.write().await; - let (mut qmdl_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap(); + let (mut qmdl_gz_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap(); - if !test_data.is_empty() { - use tokio::io::AsyncWriteExt; - qmdl_file.write_all(test_data).await.unwrap(); - qmdl_file.flush().await.unwrap(); - } + let mut writer = QmdlWriter::new(&mut qmdl_gz_file); + writer.write_container(test_data).await.unwrap(); + writer.close().await.unwrap(); + + let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize; let current_entry = store.current_entry.unwrap(); let entry = &store.manifest.entries[current_entry]; let entry_name = entry.name.clone(); store - .update_entry_qmdl_size(current_entry, test_data.len()) + .update_entry_qmdl_size(current_entry, qmdl_file_size) .await .unwrap(); entry_name @@ -604,17 +618,69 @@ mod tests { }) } + // valid HDLC encapsulated diag message generated from + // rayhunter::diag::test::get_test_message + fn create_test_container() -> MessagesContainer { + MessagesContainer { + data_type: DataType::UserSpace, + num_messages: 1, + messages: vec![ + HdlcEncapsulatedMessage { + len: 39, + data: vec![ + 16, + 0, + 32, + 0, + 32, + 0, + 192, + 176, + 26, + 165, + 245, + 135, + 118, + 35, + 2, + 1, + 20, + 14, + 48, + 0, + 160, + 0, + 2, + 8, + 0, + 0, + 217, + 15, + 5, + 0, + 0, + 0, + 0, + 1, + 0, + 10, + 13, + 196, + 126, + ], + }, + ], + } + } + #[tokio::test] async fn test_get_zip_success() { let (_temp_dir, store_lock) = create_test_qmdl_store().await; - let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E]; + let test_qmdl_data = create_test_container(); let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; let state = create_test_server_state(store_lock); - let result = get_zip(State(state), Path(entry_name.clone())).await; - - assert!(result.is_ok()); - let response = result.unwrap(); + let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap(); let headers = response.headers(); assert_eq!(headers.get("content-type").unwrap(), "application/zip"); @@ -623,21 +689,32 @@ mod tests { let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); - - let filenames = zip_reader - .file() - .entries() + let zip_reader_file = zip_reader.file(); + let filenames: Vec = zip_reader_file.entries() .iter() - .map(|entry| entry.filename().as_str().unwrap().to_owned()) - .collect::>(); - + .map(|entry| entry.filename().as_str().unwrap().to_string()) + .collect(); assert_eq!( filenames, vec![ - format!("{entry_name}.qmdl"), + format!("{entry_name}.qmdl.gz"), format!("{entry_name}-gps.ndjson"), format!("{entry_name}.pcapng"), ] ); + + let mut qmdl_body = Vec::with_capacity(128); + zip_reader.reader_without_entry(0) + .await + .unwrap() + .read_to_end(&mut qmdl_body) + .await + .unwrap(); + let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap(); + let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap(); + assert_eq!( + qmdl_reader.get_next_message().await.unwrap(), + Some(Ok(expected_message)), + ); } } diff --git a/daemon/src/webdav.rs b/daemon/src/webdav.rs index 6369f9f..aa36ed3 100644 --- a/daemon/src/webdav.rs +++ b/daemon/src/webdav.rs @@ -102,7 +102,8 @@ async fn try_upload_entry( shutdown_token: CancellationToken, ) -> Option<()> { let read_lock = store.read().await; - let entry_idx = read_lock.entry_for_name(&entry_name)?.0; + let (entry_idx, entry) = read_lock.entry_for_name(&entry_name)?; + let compressed = entry.compressed; let file = read_lock.open_file(entry_idx, file_kind).await; drop(read_lock); @@ -118,7 +119,7 @@ async fn try_upload_entry( } }; - let file_name = file_kind.get_filename(&entry_name); + let file_name = file_kind.get_filename(&entry_name, compressed); let res = select! { _ = shutdown_token.cancelled() => { @@ -331,7 +332,7 @@ mod tests { let recorded = captured.lock().await; assert_eq!(recorded.len(), 3); let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect(); - let qmdl_path = format!("dav/{}.qmdl", entry_name); + let qmdl_path = format!("dav/{}.qmdl.gz", entry_name); let ndjson_path = format!("dav/{}.ndjson", entry_name); let gps_path = format!("dav/{}-gps.ndjson", entry_name); assert!(paths.contains(&qmdl_path.as_str())); diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 26099dd..1b81ebf 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -30,6 +30,7 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0" num_enum = "0.7.4" utoipa = { version = "5.4.0", optional = true } +async-compression = { version = "0.4.41", features = ["tokio", "gzip"] } [dev-dependencies] tempfile = "3" diff --git a/lib/src/analysis/analyzer.rs b/lib/src/analysis/analyzer.rs index 426e934..1d94312 100644 --- a/lib/src/analysis/analyzer.rs +++ b/lib/src/analysis/analyzer.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use crate::analysis::diagnostic::DiagnosticAnalyzer; +use crate::diag::{DiagParsingError, Message}; use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType}; use crate::util::RuntimeMetadata; use crate::{diag::MessagesContainer, gsmtap_parser}; @@ -231,6 +232,14 @@ pub struct AnalysisRow { } impl AnalysisRow { + pub fn new() -> Self { + Self { + packet_timestamp: None, + skipped_message_reason: None, + events: vec![], + } + } + pub fn is_empty(&self) -> bool { self.skipped_message_reason.is_none() && !self.contains_warnings() } @@ -412,50 +421,47 @@ impl Harness { row } + pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result) -> AnalysisRow { + let mut row = AnalysisRow::new(); + self.packet_num += 1; + + let qmdl_message = match maybe_qmdl_message { + Ok(msg) => msg, + Err(err) => { + row.skipped_message_reason = Some(format!("{err:?}")); + return row; + } + }; + let gsmtap_message = match gsmtap_parser::parse(qmdl_message) { + Ok(msg) => msg, + Err(err) => { + row.skipped_message_reason = Some(format!("{err:?}")); + return row; + } + }; + + let Some((timestamp, gsmtap_msg)) = gsmtap_message else { + return row; + }; + row.packet_timestamp = Some(timestamp.to_datetime()); + + let element = match InformationElement::try_from(&gsmtap_msg) { + Ok(element) => element, + Err(err) => { + row.skipped_message_reason = Some(format!("{err:?}")); + return row; + } + }; + + row.events = self.analyze_information_element(&element); + row + } + pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec { - let mut rows = Vec::new(); - for maybe_qmdl_message in container.messages() { - self.packet_num += 1; - - rows.push(AnalysisRow { - packet_timestamp: None, - skipped_message_reason: None, - events: Vec::new(), - }); - // unwrap is safe here since we just pushed a value - let row = rows.last_mut().unwrap(); - let qmdl_message = match maybe_qmdl_message { - Ok(msg) => msg, - Err(err) => { - row.skipped_message_reason = Some(format!("{err:?}")); - continue; - } - }; - - let gsmtap_message = match gsmtap_parser::parse(qmdl_message) { - Ok(msg) => msg, - Err(err) => { - row.skipped_message_reason = Some(format!("{err:?}")); - continue; - } - }; - - let Some((timestamp, gsmtap_msg)) = gsmtap_message else { - continue; - }; - row.packet_timestamp = Some(timestamp.to_datetime()); - - let element = match InformationElement::try_from(&gsmtap_msg) { - Ok(element) => element, - Err(err) => { - row.skipped_message_reason = Some(format!("{err:?}")); - continue; - } - }; - - row.events = self.analyze_information_element(&element); - } - rows + container.messages() + .drain(..) + .map(|maybe_message| self.analyze_qmdl_message(maybe_message)) + .collect() } fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec> { diff --git a/lib/src/diag.rs b/lib/src/diag.rs index 323747b..4c42b69 100644 --- a/lib/src/diag.rs +++ b/lib/src/diag.rs @@ -1,5 +1,6 @@ //! Diag protocol serialization/deserialization +use bytes::Bytes; use chrono::{DateTime, FixedOffset}; use crc::{Algorithm, Crc}; use deku::prelude::*; @@ -89,30 +90,19 @@ impl MessagesContainer { let mut result = Vec::new(); for msg in &self.messages { for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { - match hdlc_decapsulate(sub_msg, &CRC_CCITT) { - Ok(data) => match Message::from_bytes((&data, 0)) { - Ok(((leftover_bytes, _), res)) => { - if !leftover_bytes.is_empty() { - warn!( - "warning: {} leftover bytes when parsing Message", - leftover_bytes.len() - ); - } - result.push(Ok(res)); - } - Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))), - }, - Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError( - err, - sub_msg.to_vec(), - ))), - } + result.push(Message::from_hdlc(sub_msg)); } } result } } +impl From for Bytes { + fn from(value: MessagesContainer) -> Self { + value.to_bytes().unwrap().into() + } +} + #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] pub struct HdlcEncapsulatedMessage { pub len: u32, @@ -152,6 +142,29 @@ pub enum Message { }, } +impl Message { + pub fn from_hdlc(data: &[u8]) -> Result { + match hdlc_decapsulate(data, &CRC_CCITT) { + Ok(data) => match Message::from_bytes((&data, 0)) { + Ok(((leftover_bytes, _), res)) => { + if !leftover_bytes.is_empty() { + warn!( + "warning: {} leftover bytes when parsing Message", + leftover_bytes.len() + ); + } + Ok(res) + } + Err(e) => Err(DiagParsingError::MessageParsingError(e, data)), + }, + Err(err) => Err(DiagParsingError::HdlcDecapsulationError( + err, + data.to_vec(), + )), + } + } +} + #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")] pub enum LogBody { @@ -411,7 +424,7 @@ pub fn build_log_mask_request( } #[cfg(test)] -mod test { +pub(crate) mod test { use super::*; // Just about all of these test cases from manually parsing diag packets w/ QCSuper @@ -525,7 +538,7 @@ mod test { // this log is based on one captured on a real device -- if it fails to // serialize or deserialize, that's probably a problem with this mock, not // the DiagReader implementation - fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { + pub fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) { let length_with_payload = 31 + payload.len() as u16; let message = Message::Log { pending_msgs: 0, @@ -559,6 +572,8 @@ mod test { len: encapsulated_data.len() as u32, data: encapsulated_data, }; + // sanity check + assert_eq!(&Message::from_hdlc(&encapsulated.data).unwrap(), &message); (encapsulated, message) } diff --git a/lib/src/qmdl.rs b/lib/src/qmdl.rs index 9a2f0c6..b259ac1 100644 --- a/lib/src/qmdl.rs +++ b/lib/src/qmdl.rs @@ -3,109 +3,211 @@ //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QMDL files. -use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer}; +use std::io::ErrorKind; +use std::pin::Pin; +use std::task::Poll; +use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContainer}; + +use async_compression::tokio::bufread::GzipDecoder; +use async_compression::tokio::write::GzipEncoder; use futures::TryStream; -use log::error; -use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader}; + +const GZIP_MAGIC_NUMBER: u16 = 0x1f8b; pub struct QmdlWriter where T: AsyncWrite + Unpin, { - writer: T, - pub total_written: usize, + writer: GzipEncoder, } impl QmdlWriter where - T: AsyncWrite + Unpin, + T: AsyncWrite + AsyncSeek + Unpin, { pub fn new(writer: T) -> Self { - QmdlWriter::new_with_existing_size(writer, 0) + let gzip_writer = GzipEncoder::new(writer); + QmdlWriter { + writer: gzip_writer, + } } - pub fn new_with_existing_size(writer: T, existing_size: usize) -> Self { - QmdlWriter { - writer, - total_written: existing_size, - } + pub async fn size(&mut self) -> std::io::Result { + let size = self.writer.get_mut().stream_position().await?; + Ok(size as usize) } pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { for msg in &container.messages { self.writer.write_all(&msg.data).await?; - self.total_written += msg.data.len(); + self.writer.flush().await?; } Ok(()) } + + pub async fn close(mut self) -> std::io::Result<()> { + self.writer.shutdown().await?; + Ok(()) + } } -pub struct QmdlReader +#[derive(Debug)] +enum QmdlReaderSource { + Compressed { + reader: GzipDecoder>, + eof: bool, + }, + Uncompressed { + reader: T, + }, +} + +#[derive(Debug)] +struct QmdlAsyncReader { + source: QmdlReaderSource, +} + +impl QmdlAsyncReader where T: AsyncRead, { - reader: BufReader, - bytes_read: usize, - max_bytes: Option, + pub fn new(reader: T, compressed: bool) -> Self { + let source = if compressed { + QmdlReaderSource::Compressed { + reader: GzipDecoder::new(BufReader::new(reader)), + eof: false, + } + } else { + QmdlReaderSource::Uncompressed { reader } + }; + Self { + source, + } + } } -impl QmdlReader +impl AsyncRead for QmdlAsyncReader where T: AsyncRead + Unpin, { - pub fn new(reader: T, max_bytes: Option) -> Self { - QmdlReader { - reader: BufReader::new(reader), - bytes_read: 0, - max_bytes, + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + match &mut self.get_mut().source { + QmdlReaderSource::Compressed { reader, eof } => { + // if we already determined we've reached the Gzip EOF, don't read more + if *eof { + return Poll::Ready(Ok(())); + } + + match Pin::new(reader).poll_read(cx, buf) { + // if we hit an unexpected EOF in a Gzip file, it shouldn't + // be considered fatal, just a truncated file. mark that + // we're done and return the result as usual + Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => { + *eof = true; + Poll::Ready(Ok(())) + } + res => res, + } + } + QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf), } } +} - pub fn as_stream( - &mut self, - ) -> impl TryStream + '_ { - futures::stream::try_unfold(self, |reader| async { - let maybe_container = reader.get_next_messages_container().await?; - match maybe_container { - Some(container) => Ok(Some((container, reader))), +#[derive(Debug)] +pub struct QmdlMessageReader +where + T: AsyncRead, +{ + buf_reader: BufReader>, +} + +async fn is_gzip_stream(mut reader: T) -> std::io::Result +where + T: AsyncRead + AsyncSeek + Unpin +{ + let magic_number = reader.read_u16().await?; + reader.rewind().await?; + // this is safe because 0x1f8b.... doesn't overlap with any known + // diag::DataType values + Ok(magic_number == GZIP_MAGIC_NUMBER) +} + +impl QmdlMessageReader +where + T: AsyncRead + AsyncSeek + Unpin, +{ + pub async fn new(mut reader: T) -> std::io::Result { + let compressed = is_gzip_stream(&mut reader) + .await + .unwrap_or(false); + Ok(QmdlMessageReader { + buf_reader: BufReader::new(QmdlAsyncReader::new( + reader, + compressed, + )), + }) + } + + pub fn is_compressed(&self) -> bool { + matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. }) + } + + pub fn into_qmdl_stream(self) -> impl TryStream, Error = std::io::Error> { + futures::stream::try_unfold(self, |mut reader| async { + let mut buf = vec![]; + match reader .buf_reader + .read_until(MESSAGE_TERMINATOR, &mut buf) + .await { + Err(err) => Err(err), + Ok(0) => Ok(None), + Ok(_) => Ok(Some((buf, reader))), + } + }) + } + + pub fn into_message_stream(self) -> impl TryStream, Error = std::io::Error> { + futures::stream::try_unfold(self, |mut reader| async { + match reader.get_next_message().await? { + Some(res) => Ok(Some((res, reader))), None => Ok(None), } }) } - pub async fn get_next_messages_container( + pub async fn get_next_message( &mut self, - ) -> Result, std::io::Error> { - if let Some(max_bytes) = self.max_bytes - && self.bytes_read >= max_bytes + ) -> Result>, std::io::Error> { + let mut buf = vec![]; + if self + .buf_reader + .read_until(MESSAGE_TERMINATOR, &mut buf) + .await? + == 0 { - if self.bytes_read > max_bytes { - error!( - "warning: {} bytes read, but max_bytes was {}", - self.bytes_read, max_bytes - ); - } return Ok(None); } - let mut buf = Vec::new(); - let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?; - self.bytes_read += bytes_read; + Ok(Some(Message::from_hdlc(&buf))) + } +} - // Since QMDL is just a flat list of messages, we can't actually - // reproduce the container structure they came from in the original - // read. So we'll just pretend that all containers had exactly one - // message. As far as I know, the number of messages per container - // doesn't actually affect anything, so this should be fine. - Ok(Some(MessagesContainer { - data_type: DataType::UserSpace, - num_messages: 1, - messages: vec![HdlcEncapsulatedMessage { - len: bytes_read as u32, - data: buf, - }], - })) +impl AsyncRead for QmdlMessageReader +where + T: AsyncRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().buf_reader).poll_read(cx, buf) } } @@ -113,132 +215,185 @@ where mod test { use std::io::Cursor; - use crate::diag::CRC_CCITT; - use crate::hdlc::hdlc_encapsulate; + use crate::diag::{DataType, HdlcEncapsulatedMessage, test::get_test_message}; use super::*; - fn get_test_messages() -> Vec { - let messages: Vec = (10..20) - .map(|i| { - let data = hdlc_encapsulate(&vec![i as u8; i], &CRC_CCITT); - HdlcEncapsulatedMessage { - len: data.len() as u32, - data, - } - }) - .collect(); - messages + fn get_test_messages() -> (Vec, Vec) { + let mut hdlcs = Vec::new(); + let mut messages = Vec::new(); + for i in 10..20 { + let (hdlc, msg) = get_test_message(&[i]); + hdlcs.push(hdlc); + messages.push(msg); + } + (hdlcs, messages) } // returns a byte array consisting of concatenated HDLC encapsulated // test messages fn get_test_message_bytes() -> Vec { - get_test_messages() + let (hdlcs, _) = get_test_messages(); + hdlcs .iter() .flat_map(|msg| msg.data.clone()) .collect() } fn get_test_containers() -> Vec { - let messages = get_test_messages(); - let (messages1, messages2) = messages.split_at(5); + let (hdlcs, _) = get_test_messages(); + let (hdlcs1, hdlcs2) = hdlcs.split_at(5); vec![ MessagesContainer { data_type: DataType::UserSpace, - num_messages: messages1.len() as u32, - messages: messages1.to_vec(), + num_messages: hdlcs1.len() as u32, + messages: hdlcs1.to_vec(), }, MessagesContainer { data_type: DataType::UserSpace, - num_messages: messages2.len() as u32, - messages: messages2.to_vec(), + num_messages: hdlcs2.len() as u32, + messages: hdlcs2.to_vec(), }, ] } #[tokio::test] - async fn test_unbounded_qmdl_reader() { + async fn test_qmdl_reader() { let mut buf = Cursor::new(get_test_message_bytes()); - let mut reader = QmdlReader::new(&mut buf, None); - let expected_messages = get_test_messages(); - for message in expected_messages { - let expected_container = MessagesContainer { - data_type: DataType::UserSpace, - num_messages: 1, - messages: vec![message], - }; + let mut reader = QmdlMessageReader::new(&mut buf).await.unwrap(); + assert!(!reader.is_compressed()); + let (_, expected_messages) = get_test_messages(); + for msg in expected_messages { assert_eq!( - expected_container, - reader.get_next_messages_container().await.unwrap().unwrap() + Ok(msg), + reader.get_next_message().await.unwrap().unwrap() ); } } #[tokio::test] - async fn test_bounded_qmdl_reader() { - let mut buf = Cursor::new(get_test_message_bytes()); + async fn test_truncation() { + run_truncation_tests(false).await; + } - // bound the reader to the first two messages - let mut expected_messages = get_test_messages(); - let limit = expected_messages[0].len + expected_messages[1].len; + #[tokio::test] + async fn test_compressed_truncation() { + run_truncation_tests(true).await; + } - let mut reader = QmdlReader::new(&mut buf, Some(limit as usize)); - for message in expected_messages.drain(0..2) { - let expected_container = MessagesContainer { - data_type: DataType::UserSpace, - num_messages: 1, - messages: vec![message], - }; + async fn run_truncation_tests(compressed: bool) { + let (hdlcs, expected_messages) = get_test_messages(); + let (bytes, message_lengths): (Vec, Vec) = if compressed { + let mut buf = Vec::new(); + let mut compressed_lengths = Vec::new(); + let mut writer = GzipEncoder::new(&mut buf); + for hdlc in &hdlcs { + let before = writer.get_ref().len(); + writer.write_all(&hdlc.data).await.unwrap(); + writer.flush().await.unwrap(); + let after = writer.get_ref().len(); + compressed_lengths.push(after - before); + } + (buf, compressed_lengths) + } else { + ( + get_test_message_bytes(), + hdlcs.iter() + .map(|hdlc| hdlc.data.len()) + .collect() + ) + }; + for truncated_hdlc_i in 1..hdlcs.len() - 1 { + let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum(); + for truncated_byte in 1..message_lengths[truncated_hdlc_i] { + let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]); + let mut reader = QmdlMessageReader::new(&mut truncated_bytes).await.unwrap(); + for msg in expected_messages.iter().take(truncated_hdlc_i) { + assert_eq!( + Ok(msg), + reader.get_next_message().await.unwrap().unwrap().as_ref() + ); + } + if compressed { + // for a compressed reader, we have a couple possible + // outcomes, depending on how far along the Gzip DEFLATE + // block was before it was truncated: + match reader.get_next_message().await.unwrap() { + // if the block was truncated early enough, the + // GzipDecoder will detect an unexpected EOF, and our + // QmdlReader will indicate the stream of messages is + // done + None => {}, + // if it's further along, the expanded result will be an + // invalid HDLC block. if that's the case, make sure the + // QmdlReader indicates the stream of messages is over + // with afterwards + Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => { + assert!(matches!(reader.get_next_message().await, Ok(None))); + }, + // if it's further along still, we may get a complete + // Message, so make sure it matches the next expected + // one. then, make sure we've hit the end of the message + // stream + Some(Ok(msg)) => { + assert_eq!(&msg, &expected_messages[truncated_hdlc_i]); + assert!(matches!(reader.get_next_message().await, Ok(None))); + }, + // we should never be able to decapsulate the HDLC into + // an invalid Diag message + Some(Err(DiagParsingError::MessageParsingError(_, _))) + => { + panic!("unexpected MessageParsingError"); + } + } + } else { + // a truncated uncompressed reader should always end on an + // HdlcDecapsulationError, and then return Ok(None) to + // indicate the message stream is over + assert!(matches!( + reader.get_next_message().await, + Ok(Some(Err(DiagParsingError::HdlcDecapsulationError(_, _)))) + )); + assert!(matches!(reader.get_next_message().await, Ok(None))); + } + } + } + } + + /// Writes the test containers to a QmdlWriter, optionally finishing the + /// gzip stream with a footer. Then, attempts to decompress the buffer with + /// a QmdlWriter, asserting that the containers match what's expected. + async fn run_compressed_reading_and_writing_tests(do_close: bool) { + let containers = get_test_containers(); + let mut buf = Cursor::new(Vec::new()); + { + let mut writer = QmdlWriter::new(&mut buf); + for container in &containers { + writer.write_container(&container).await.unwrap(); + } + if do_close { + writer.close().await.unwrap(); + } + } + buf.set_position(0); + let mut reader = QmdlMessageReader::new(buf).await.unwrap(); + assert!(reader.is_compressed()); + let (_, expected_messages) = get_test_messages(); + for message in expected_messages { assert_eq!( - expected_container, - reader.get_next_messages_container().await.unwrap().unwrap() + Ok(message), + reader.get_next_message().await.unwrap().unwrap() ); } assert!(matches!( - reader.get_next_messages_container().await, + reader.get_next_message().await, Ok(None) )); } #[tokio::test] - async fn test_qmdl_writer() { - let mut buf = Vec::new(); - let mut writer = QmdlWriter::new(&mut buf); - let expected_containers = get_test_containers(); - for container in &expected_containers { - writer.write_container(container).await.unwrap(); - } - assert_eq!(writer.total_written, buf.len()); - assert_eq!(buf, get_test_message_bytes()); - } - - #[tokio::test] - async fn test_writing_and_reading() { - let mut buf = Vec::new(); - let mut writer = QmdlWriter::new(&mut buf); - let expected_containers = get_test_containers(); - for container in &expected_containers { - writer.write_container(container).await.unwrap(); - } - - let limit = Some(buf.len()); - let mut reader = QmdlReader::new(Cursor::new(&mut buf), limit); - let expected_messages = get_test_messages(); - for message in expected_messages { - let expected_container = MessagesContainer { - data_type: DataType::UserSpace, - num_messages: 1, - messages: vec![message], - }; - assert_eq!( - expected_container, - reader.get_next_messages_container().await.unwrap().unwrap() - ); - } - assert!(matches!( - reader.get_next_messages_container().await, - Ok(None) - )); + async fn test_compressed_reading_and_writing() { + run_compressed_reading_and_writing_tests(true).await; + run_compressed_reading_and_writing_tests(false).await; } }