From fda0659ba49c648a6eef0039caf82a409bed6635 Mon Sep 17 00:00:00 2001 From: Will Greenberg Date: Wed, 3 Jun 2026 19:11:53 -0700 Subject: [PATCH] Revert manifest change, rename some structs Because QmdlMessageReader no longer cares about tracking the uncompressed bytes read thus far, we can use the old manifest QMDL file size name again. --- check/src/main.rs | 6 +++--- daemon/src/diag.rs | 34 ++++++++++++++++++---------------- daemon/src/pcap.rs | 6 +++--- daemon/src/qmdl_store.rs | 19 ++++++++----------- daemon/src/server.rs | 19 ++++++++++--------- lib/src/qmdl.rs | 37 +++++++++++++++++++++++-------------- 6 files changed, 65 insertions(+), 56 deletions(-) diff --git a/check/src/main.rs b/check/src/main.rs index 34501b0..320765c 100644 --- a/check/src/main.rs +++ b/check/src/main.rs @@ -5,7 +5,7 @@ use rayhunter::{ analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness}, gsmtap_parser, pcap::GsmtapPcapWriter, - qmdl::QmdlReader, + qmdl::QmdlMessageReader, }; use std::{collections::HashMap, path::PathBuf}; use tokio::fs::File; @@ -111,7 +111,7 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); - let mut qmdl_reader = QmdlReader::new(qmdl_file).await.expect("failed to open QmdlReader"); + let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); let mut report = Report::new(qmdl_path); while let Some(maybe_message) = qmdl_reader .get_next_message() @@ -127,7 +127,7 @@ async fn pcapify(qmdl_path: &PathBuf) { let qmdl_file = &mut File::open(&qmdl_path) .await .expect("failed to open qmdl file"); - let mut qmdl_reader = QmdlReader::new(qmdl_file).await.expect("failed to open QmdlReader"); + let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); let mut pcap_path = qmdl_path.clone(); pcap_path.set_extension("pcapng"); let pcap_file = &mut File::create(&pcap_path) diff --git a/daemon/src/diag.rs b/daemon/src/diag.rs index a0dfe3f..cc48992 100644 --- a/daemon/src/diag.rs +++ b/daemon/src/diag.rs @@ -323,23 +323,25 @@ impl DiagTask { self.stop(qmdl_store, Some(reason)).await; return; } - debug!( - "total QMDL bytes written: {}, updating manifest...", - qmdl_writer.total_uncompressed_bytes - ); - let index = qmdl_store - .current_entry - .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); - if let Err(e) = qmdl_store - .update_entry_qmdl_size(index, qmdl_writer.total_uncompressed_bytes) - .await - { - let reason = format!("failed to update manifest (disk full?): {e}"); - error!("{reason}"); - self.stop(qmdl_store, Some(reason)).await; - return; + if let Ok(file_size) = qmdl_writer.size().await { + debug!( + "total QMDL bytes written: {}, updating manifest...", + file_size + ); + let index = qmdl_store + .current_entry + .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); + if let Err(e) = qmdl_store + .update_entry_qmdl_size(index, file_size) + .await + { + let reason = format!("failed to update manifest (disk full?): {e}"); + error!("{reason}"); + self.stop(qmdl_store, Some(reason)).await; + return; + } + debug!("done!"); } - debug!("done!"); let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); self.bytes_since_space_check += container_bytes; let max_type = match analysis_writer.analyze_container(container).await { diff --git a/daemon/src/pcap.rs b/daemon/src/pcap.rs index 88e14d5..5af5639 100644 --- a/daemon/src/pcap.rs +++ b/daemon/src/pcap.rs @@ -9,7 +9,7 @@ use axum::response::{IntoResponse, Response}; use log::error; use rayhunter::gsmtap_parser; use rayhunter::pcap::GsmtapPcapWriter; -use rayhunter::qmdl::QmdlReader; +use rayhunter::qmdl::QmdlMessageReader; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex}; use tokio_util::io::ReaderStream; @@ -44,7 +44,7 @@ pub async fn get_pcap( StatusCode::NOT_FOUND, format!("couldn't find manifest entry with name {qmdl_name}"), ))?; - if entry.uncompressed_qmdl_size_bytes == 0 { + if entry.qmdl_size_bytes == 0 { return Err(( StatusCode::SERVICE_UNAVAILABLE, "QMDL file is empty, try again in a bit!".to_string(), @@ -67,7 +67,7 @@ pub async fn get_pcap( Ok((headers, body).into_response()) } -pub async fn generate_pcap_data(writer: W, mut reader: QmdlReader) -> Result<(), Error> +pub async fn generate_pcap_data(writer: W, mut reader: QmdlMessageReader) -> Result<(), Error> where W: AsyncWrite + Unpin + Send, R: AsyncRead + AsyncSeek + Unpin, diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index 0ade581..7bfadb4 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -4,7 +4,7 @@ use std::path::{Path, PathBuf}; use chrono::{DateTime, Local}; use log::{info, warn}; -use rayhunter::qmdl::QmdlReader; +use rayhunter::qmdl::QmdlMessageReader; use rayhunter::util::RuntimeMetadata; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -58,10 +58,7 @@ pub struct ManifestEntry { /// The system time when the last message was recorded to the file #[cfg_attr(feature = "apidocs", schema(value_type = String))] pub last_message_time: Option>, - /// The size of the uncompressed QMDL data in bytes. Previously this was - /// called `qmdl_size_bytes`, so alias it for backwards compatibility. - #[serde(alias = "qmdl_size_bytes")] - pub uncompressed_qmdl_size_bytes: usize, + pub qmdl_size_bytes: usize, /// The rayhunter daemon version which generated the file pub rayhunter_version: Option, /// The OS which created the file @@ -82,7 +79,7 @@ impl ManifestEntry { name: format!("{}", now.timestamp()), start_time: now, last_message_time: None, - uncompressed_qmdl_size_bytes: 0, + qmdl_size_bytes: 0, rayhunter_version: Some(metadata.rayhunter_version), system_os: Some(metadata.system_os), arch: Some(metadata.arch), @@ -222,7 +219,7 @@ impl RecordingStore { compressed, start_time: start_time.into(), last_message_time: Some(last_message_time.into()), - uncompressed_qmdl_size_bytes: metadata.size() as usize, + qmdl_size_bytes: metadata.size() as usize, rayhunter_version: None, system_os: None, arch: None, @@ -283,12 +280,12 @@ impl RecordingStore { pub async fn open_entry_qmdl( &self, entry_index: usize, - ) -> Result, RecordingStoreError> { + ) -> Result, RecordingStoreError> { let entry = &self.manifest.entries[entry_index]; let file = File::open(entry.get_qmdl_filepath(&self.path)) .await .map_err(RecordingStoreError::ReadFileError)?; - QmdlReader::new(file) + QmdlMessageReader::new(file) .await .map_err(RecordingStoreError::ReadFileError) } @@ -335,7 +332,7 @@ impl RecordingStore { entry_index: usize, size_bytes: usize, ) -> Result<(), RecordingStoreError> { - self.manifest.entries[entry_index].uncompressed_qmdl_size_bytes = size_bytes; + self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes; self.manifest.entries[entry_index].last_message_time = Some(rayhunter::clock::get_adjusted_now()); self.write_manifest().await @@ -512,7 +509,7 @@ mod tests { .unwrap(); assert!(entry.last_message_time.is_some()); assert_eq!( - store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes, + store.manifest.entries[entry_index].qmdl_size_bytes, 1000 ); assert_eq!( diff --git a/daemon/src/server.rs b/daemon/src/server.rs index 1566336..26cc063 100644 --- a/daemon/src/server.rs +++ b/daemon/src/server.rs @@ -79,10 +79,10 @@ pub async fn get_qmdl( (CONTENT_TYPE, "application/octet-stream"), ( CONTENT_LENGTH, - &entry.uncompressed_qmdl_size_bytes.to_string(), + &entry.qmdl_size_bytes.to_string(), ), ]; - let body = Body::from_stream(qmdl_reader.as_qmdl_stream()); + let body = Body::from_stream(qmdl_reader.into_qmdl_stream()); Ok((headers, body).into_response()) } @@ -317,7 +317,7 @@ pub async fn get_zip( format!("couldn't find entry with name {qmdl_idx}"), ))?; - if entry.uncompressed_qmdl_size_bytes == 0 { + if entry.qmdl_size_bytes == 0 { return Err(( StatusCode::SERVICE_UNAVAILABLE, "QMDL file is empty, try again in a bit!".to_string(), @@ -431,7 +431,7 @@ mod tests { use async_zip::base::read::mem::ZipFileReader; use axum::extract::{Path, State}; use futures::AsyncReadExt; - use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlReader, QmdlWriter}}; + use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}}; use tempfile::TempDir; async fn create_test_qmdl_store() -> (TempDir, Arc>) { @@ -449,19 +449,20 @@ mod tests { ) -> String { let entry_name = { let mut store = store_lock.write().await; - let (qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap(); + let (mut qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap(); - let mut writer = QmdlWriter::new(qmdl_gz_file); + let mut writer = QmdlWriter::new(&mut qmdl_gz_file); writer.write_container(test_data).await.unwrap(); - let test_data_len = writer.total_uncompressed_bytes; writer.close().await.unwrap(); + let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize; + let current_entry = store.current_entry.unwrap(); let entry = &store.manifest.entries[current_entry]; let entry_name = entry.name.clone(); store - .update_entry_qmdl_size(current_entry, test_data_len) + .update_entry_qmdl_size(current_entry, qmdl_file_size) .await .unwrap(); entry_name @@ -586,7 +587,7 @@ mod tests { .read_to_end(&mut qmdl_body) .await .unwrap(); - let mut qmdl_reader = QmdlReader::new(Cursor::new(qmdl_body)).await.unwrap(); + let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap(); let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap(); assert_eq!( qmdl_reader.get_next_message().await.unwrap(), diff --git a/lib/src/qmdl.rs b/lib/src/qmdl.rs index 86bd040..6864d87 100644 --- a/lib/src/qmdl.rs +++ b/lib/src/qmdl.rs @@ -21,26 +21,28 @@ where T: AsyncWrite + Unpin, { writer: GzipEncoder, - pub total_uncompressed_bytes: usize, } impl QmdlWriter where - T: AsyncWrite + Unpin, + T: AsyncWrite + AsyncSeek + Unpin, { pub fn new(writer: T) -> Self { let gzip_writer = GzipEncoder::new(writer); QmdlWriter { writer: gzip_writer, - total_uncompressed_bytes: 0, } } + pub async fn size(&mut self) -> std::io::Result { + let size = self.writer.get_mut().stream_position().await?; + Ok(size as usize) + } + pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { for msg in &container.messages { self.writer.write_all(&msg.data).await?; self.writer.flush().await?; - self.total_uncompressed_bytes += msg.data.len(); } Ok(()) } @@ -120,7 +122,7 @@ where } #[derive(Debug)] -pub struct QmdlReader +pub struct QmdlMessageReader where T: AsyncRead, { @@ -138,7 +140,7 @@ where Ok(magic_number == GZIP_MAGIC_NUMBER) } -impl QmdlReader +impl QmdlMessageReader where T: AsyncRead + AsyncSeek + Unpin, { @@ -146,7 +148,7 @@ where let compressed = is_gzip_stream(&mut reader) .await .unwrap_or(false); - Ok(QmdlReader { + Ok(QmdlMessageReader { buf_reader: BufReader::new(QmdlAsyncReader::new( reader, compressed, @@ -154,7 +156,11 @@ where }) } - pub fn as_qmdl_stream(self) -> impl TryStream, Error = std::io::Error> { + pub fn is_compressed(&self) -> bool { + matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. }) + } + + pub fn into_qmdl_stream(self) -> impl TryStream, Error = std::io::Error> { futures::stream::try_unfold(self, |mut reader| async { let mut buf = vec![]; match reader .buf_reader @@ -167,7 +173,7 @@ where }) } - pub fn as_message_stream(self) -> impl TryStream, Error = std::io::Error> { + pub fn into_message_stream(self) -> impl TryStream, Error = std::io::Error> { futures::stream::try_unfold(self, |mut reader| async { match reader.get_next_message().await? { Some(res) => Ok(Some((res, reader))), @@ -193,7 +199,7 @@ where } } -impl AsyncRead for QmdlReader +impl AsyncRead for QmdlMessageReader where T: AsyncRead + Unpin, { @@ -255,7 +261,8 @@ mod test { #[tokio::test] async fn test_qmdl_reader() { let mut buf = Cursor::new(get_test_message_bytes()); - let mut reader = QmdlReader::new(&mut buf).await.unwrap(); + let mut reader = QmdlMessageReader::new(&mut buf).await.unwrap(); + assert!(!reader.is_compressed()); let (_, expected_messages) = get_test_messages(); for msg in expected_messages { assert_eq!( @@ -301,7 +308,7 @@ mod test { let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum(); for truncated_byte in 1..message_lengths[truncated_hdlc_i] { let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]); - let mut reader = QmdlReader::new(&mut truncated_bytes).await.unwrap(); + let mut reader = QmdlMessageReader::new(&mut truncated_bytes).await.unwrap(); for msg in expected_messages.iter().take(truncated_hdlc_i) { assert_eq!( Ok(msg), @@ -359,7 +366,7 @@ mod test { /// a QmdlWriter, asserting that the containers match what's expected. async fn run_compressed_reading_and_writing_tests(do_close: bool) { let containers = get_test_containers(); - let mut buf = Vec::new(); + let mut buf = Cursor::new(Vec::new()); { let mut writer = QmdlWriter::new(&mut buf); for container in &containers { @@ -369,7 +376,9 @@ mod test { writer.close().await.unwrap(); } } - let mut reader = QmdlReader::new(Cursor::new(buf)).await.unwrap(); + buf.set_position(0); + let mut reader = QmdlMessageReader::new(buf).await.unwrap(); + assert!(reader.is_compressed()); let (_, expected_messages) = get_test_messages(); for message in expected_messages { assert_eq!(