Revert manifest change, rename some structs

Because QmdlMessageReader no longer cares about tracking the
uncompressed bytes read thus far, we can use the old manifest QMDL file
size name again.
This commit is contained in:
Will Greenberg
2026-06-03 19:11:53 -07:00
parent 0cd70ad73c
commit fda0659ba4
6 changed files with 65 additions and 56 deletions
+3 -3
View File
@@ -5,7 +5,7 @@ use rayhunter::{
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
gsmtap_parser,
pcap::GsmtapPcapWriter,
qmdl::QmdlReader,
qmdl::QmdlMessageReader,
};
use std::{collections::HashMap, path::PathBuf};
use tokio::fs::File;
@@ -111,7 +111,7 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let mut qmdl_reader = QmdlReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let mut report = Report::new(qmdl_path);
while let Some(maybe_message) = qmdl_reader
.get_next_message()
@@ -127,7 +127,7 @@ async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path)
.await
.expect("failed to open qmdl file");
let mut qmdl_reader = QmdlReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader");
let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcapng");
let pcap_file = &mut File::create(&pcap_path)
+18 -16
View File
@@ -323,23 +323,25 @@ impl DiagTask {
self.stop(qmdl_store, Some(reason)).await;
return;
}
debug!(
"total QMDL bytes written: {}, updating manifest...",
qmdl_writer.total_uncompressed_bytes
);
let index = qmdl_store
.current_entry
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
if let Err(e) = qmdl_store
.update_entry_qmdl_size(index, qmdl_writer.total_uncompressed_bytes)
.await
{
let reason = format!("failed to update manifest (disk full?): {e}");
error!("{reason}");
self.stop(qmdl_store, Some(reason)).await;
return;
if let Ok(file_size) = qmdl_writer.size().await {
debug!(
"total QMDL bytes written: {}, updating manifest...",
file_size
);
let index = qmdl_store
.current_entry
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
if let Err(e) = qmdl_store
.update_entry_qmdl_size(index, file_size)
.await
{
let reason = format!("failed to update manifest (disk full?): {e}");
error!("{reason}");
self.stop(qmdl_store, Some(reason)).await;
return;
}
debug!("done!");
}
debug!("done!");
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
self.bytes_since_space_check += container_bytes;
let max_type = match analysis_writer.analyze_container(container).await {
+3 -3
View File
@@ -9,7 +9,7 @@ use axum::response::{IntoResponse, Response};
use log::error;
use rayhunter::gsmtap_parser;
use rayhunter::pcap::GsmtapPcapWriter;
use rayhunter::qmdl::QmdlReader;
use rayhunter::qmdl::QmdlMessageReader;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex};
use tokio_util::io::ReaderStream;
@@ -44,7 +44,7 @@ pub async fn get_pcap(
StatusCode::NOT_FOUND,
format!("couldn't find manifest entry with name {qmdl_name}"),
))?;
if entry.uncompressed_qmdl_size_bytes == 0 {
if entry.qmdl_size_bytes == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string(),
@@ -67,7 +67,7 @@ pub async fn get_pcap(
Ok((headers, body).into_response())
}
pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlReader<R>) -> Result<(), Error>
pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlMessageReader<R>) -> Result<(), Error>
where
W: AsyncWrite + Unpin + Send,
R: AsyncRead + AsyncSeek + Unpin,
+8 -11
View File
@@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};
use chrono::{DateTime, Local};
use log::{info, warn};
use rayhunter::qmdl::QmdlReader;
use rayhunter::qmdl::QmdlMessageReader;
use rayhunter::util::RuntimeMetadata;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -58,10 +58,7 @@ pub struct ManifestEntry {
/// The system time when the last message was recorded to the file
#[cfg_attr(feature = "apidocs", schema(value_type = String))]
pub last_message_time: Option<DateTime<Local>>,
/// The size of the uncompressed QMDL data in bytes. Previously this was
/// called `qmdl_size_bytes`, so alias it for backwards compatibility.
#[serde(alias = "qmdl_size_bytes")]
pub uncompressed_qmdl_size_bytes: usize,
pub qmdl_size_bytes: usize,
/// The rayhunter daemon version which generated the file
pub rayhunter_version: Option<String>,
/// The OS which created the file
@@ -82,7 +79,7 @@ impl ManifestEntry {
name: format!("{}", now.timestamp()),
start_time: now,
last_message_time: None,
uncompressed_qmdl_size_bytes: 0,
qmdl_size_bytes: 0,
rayhunter_version: Some(metadata.rayhunter_version),
system_os: Some(metadata.system_os),
arch: Some(metadata.arch),
@@ -222,7 +219,7 @@ impl RecordingStore {
compressed,
start_time: start_time.into(),
last_message_time: Some(last_message_time.into()),
uncompressed_qmdl_size_bytes: metadata.size() as usize,
qmdl_size_bytes: metadata.size() as usize,
rayhunter_version: None,
system_os: None,
arch: None,
@@ -283,12 +280,12 @@ impl RecordingStore {
pub async fn open_entry_qmdl(
&self,
entry_index: usize,
) -> Result<QmdlReader<File>, RecordingStoreError> {
) -> Result<QmdlMessageReader<File>, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
let file = File::open(entry.get_qmdl_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)?;
QmdlReader::new(file)
QmdlMessageReader::new(file)
.await
.map_err(RecordingStoreError::ReadFileError)
}
@@ -335,7 +332,7 @@ impl RecordingStore {
entry_index: usize,
size_bytes: usize,
) -> Result<(), RecordingStoreError> {
self.manifest.entries[entry_index].uncompressed_qmdl_size_bytes = size_bytes;
self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes;
self.manifest.entries[entry_index].last_message_time =
Some(rayhunter::clock::get_adjusted_now());
self.write_manifest().await
@@ -512,7 +509,7 @@ mod tests {
.unwrap();
assert!(entry.last_message_time.is_some());
assert_eq!(
store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes,
store.manifest.entries[entry_index].qmdl_size_bytes,
1000
);
assert_eq!(
+10 -9
View File
@@ -79,10 +79,10 @@ pub async fn get_qmdl(
(CONTENT_TYPE, "application/octet-stream"),
(
CONTENT_LENGTH,
&entry.uncompressed_qmdl_size_bytes.to_string(),
&entry.qmdl_size_bytes.to_string(),
),
];
let body = Body::from_stream(qmdl_reader.as_qmdl_stream());
let body = Body::from_stream(qmdl_reader.into_qmdl_stream());
Ok((headers, body).into_response())
}
@@ -317,7 +317,7 @@ pub async fn get_zip(
format!("couldn't find entry with name {qmdl_idx}"),
))?;
if entry.uncompressed_qmdl_size_bytes == 0 {
if entry.qmdl_size_bytes == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string(),
@@ -431,7 +431,7 @@ mod tests {
use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State};
use futures::AsyncReadExt;
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlReader, QmdlWriter}};
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}};
use tempfile::TempDir;
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
@@ -449,19 +449,20 @@ mod tests {
) -> String {
let entry_name = {
let mut store = store_lock.write().await;
let (qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap();
let (mut qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap();
let mut writer = QmdlWriter::new(qmdl_gz_file);
let mut writer = QmdlWriter::new(&mut qmdl_gz_file);
writer.write_container(test_data).await.unwrap();
let test_data_len = writer.total_uncompressed_bytes;
writer.close().await.unwrap();
let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize;
let current_entry = store.current_entry.unwrap();
let entry = &store.manifest.entries[current_entry];
let entry_name = entry.name.clone();
store
.update_entry_qmdl_size(current_entry, test_data_len)
.update_entry_qmdl_size(current_entry, qmdl_file_size)
.await
.unwrap();
entry_name
@@ -586,7 +587,7 @@ mod tests {
.read_to_end(&mut qmdl_body)
.await
.unwrap();
let mut qmdl_reader = QmdlReader::new(Cursor::new(qmdl_body)).await.unwrap();
let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap();
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
assert_eq!(
qmdl_reader.get_next_message().await.unwrap(),
+23 -14
View File
@@ -21,26 +21,28 @@ where
T: AsyncWrite + Unpin,
{
writer: GzipEncoder<T>,
pub total_uncompressed_bytes: usize,
}
impl<T> QmdlWriter<T>
where
T: AsyncWrite + Unpin,
T: AsyncWrite + AsyncSeek + Unpin,
{
pub fn new(writer: T) -> Self {
let gzip_writer = GzipEncoder::new(writer);
QmdlWriter {
writer: gzip_writer,
total_uncompressed_bytes: 0,
}
}
pub async fn size(&mut self) -> std::io::Result<usize> {
let size = self.writer.get_mut().stream_position().await?;
Ok(size as usize)
}
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
for msg in &container.messages {
self.writer.write_all(&msg.data).await?;
self.writer.flush().await?;
self.total_uncompressed_bytes += msg.data.len();
}
Ok(())
}
@@ -120,7 +122,7 @@ where
}
#[derive(Debug)]
pub struct QmdlReader<T>
pub struct QmdlMessageReader<T>
where
T: AsyncRead,
{
@@ -138,7 +140,7 @@ where
Ok(magic_number == GZIP_MAGIC_NUMBER)
}
impl<T> QmdlReader<T>
impl<T> QmdlMessageReader<T>
where
T: AsyncRead + AsyncSeek + Unpin,
{
@@ -146,7 +148,7 @@ where
let compressed = is_gzip_stream(&mut reader)
.await
.unwrap_or(false);
Ok(QmdlReader {
Ok(QmdlMessageReader {
buf_reader: BufReader::new(QmdlAsyncReader::new(
reader,
compressed,
@@ -154,7 +156,11 @@ where
})
}
pub fn as_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
pub fn is_compressed(&self) -> bool {
matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. })
}
pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
let mut buf = vec![];
match reader .buf_reader
@@ -167,7 +173,7 @@ where
})
}
pub fn as_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
pub fn into_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
match reader.get_next_message().await? {
Some(res) => Ok(Some((res, reader))),
@@ -193,7 +199,7 @@ where
}
}
impl<T> AsyncRead for QmdlReader<T>
impl<T> AsyncRead for QmdlMessageReader<T>
where
T: AsyncRead + Unpin,
{
@@ -255,7 +261,8 @@ mod test {
#[tokio::test]
async fn test_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes());
let mut reader = QmdlReader::new(&mut buf).await.unwrap();
let mut reader = QmdlMessageReader::new(&mut buf).await.unwrap();
assert!(!reader.is_compressed());
let (_, expected_messages) = get_test_messages();
for msg in expected_messages {
assert_eq!(
@@ -301,7 +308,7 @@ mod test {
let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum();
for truncated_byte in 1..message_lengths[truncated_hdlc_i] {
let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]);
let mut reader = QmdlReader::new(&mut truncated_bytes).await.unwrap();
let mut reader = QmdlMessageReader::new(&mut truncated_bytes).await.unwrap();
for msg in expected_messages.iter().take(truncated_hdlc_i) {
assert_eq!(
Ok(msg),
@@ -359,7 +366,7 @@ mod test {
/// a QmdlWriter, asserting that the containers match what's expected.
async fn run_compressed_reading_and_writing_tests(do_close: bool) {
let containers = get_test_containers();
let mut buf = Vec::new();
let mut buf = Cursor::new(Vec::new());
{
let mut writer = QmdlWriter::new(&mut buf);
for container in &containers {
@@ -369,7 +376,9 @@ mod test {
writer.close().await.unwrap();
}
}
let mut reader = QmdlReader::new(Cursor::new(buf)).await.unwrap();
buf.set_position(0);
let mut reader = QmdlMessageReader::new(buf).await.unwrap();
assert!(reader.is_compressed());
let (_, expected_messages) = get_test_messages();
for message in expected_messages {
assert_eq!(