Add support for compressed QMDL

Major changes:
* QmdlWriter now outputs gzipped QMDL files by default
* QmdlReader renamed to QmdlMessageReader, and reads both compressed and
  uncompressed QMDL. It no longer requires bounding to avoid reading
  partially written files.
This commit is contained in:
Will Greenberg
2026-03-30 15:56:03 -07:00
committed by Brad Warren
parent f5a0cddc88
commit 94b989c3c0
13 changed files with 641 additions and 394 deletions
+113 -36
View File
@@ -11,10 +11,13 @@ use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use chrono::{DateTime, Local};
use log::{error, warn};
use rayhunter::qmdl::QmdlMessageReader;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use std::sync::Arc;
use tokio::fs::write;
use tokio::io::{AsyncReadExt, copy, duplex};
use tokio::io::copy;
use tokio::io::duplex;
use tokio::sync::RwLock;
use tokio::sync::mpsc::Sender;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -81,14 +84,23 @@ pub async fn get_qmdl(
)
})?
.ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
let qmdl_reader = QmdlMessageReader::new(qmdl_file)
.await
.map_err(|err| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("error reading QMDL file: {err}"),
)
})?;
let headers = [
(CONTENT_TYPE, "application/octet-stream"),
(CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()),
(
CONTENT_LENGTH,
&entry.qmdl_size_bytes.to_string(),
),
];
let body = Body::from_stream(qmdl_stream);
let body = Body::from_stream(qmdl_reader.into_qmdl_stream());
Ok((headers, body).into_response())
}
@@ -334,7 +346,7 @@ pub async fn get_zip(
Path(entry_name): Path<String>,
) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
let (entry_index, qmdl_size_bytes) = {
let (entry_index, compressed, qmdl_file_size) = {
let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
StatusCode::NOT_FOUND,
@@ -348,7 +360,7 @@ pub async fn get_zip(
));
}
(entry_index, entry.qmdl_size_bytes)
(entry_index, entry.compressed, entry.qmdl_size_bytes)
};
let qmdl_store_lock = state.qmdl_store_lock.clone();
@@ -377,23 +389,22 @@ pub async fn get_zip(
continue;
};
let entry = ZipEntryBuilder::new(
file_kind.get_filename(&qmdl_idx).into(),
let zip_entry = ZipEntryBuilder::new(
file_kind.get_filename(&qmdl_idx, compressed).into(),
Compression::Stored,
);
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
// once https://github.com/Majored/rs-async-zip/pull/160 is released.
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
let mut entry_writer = zip.write_entry_stream(zip_entry).await?.compat_write();
// Truncating to qmdl_size_bytes is an attempt to ignore partial writes by the diag
// thread.
if file_kind == FileKind::Qmdl {
copy(&mut file.take(qmdl_size_bytes as u64), &mut entry_writer).await?;
copy(&mut file.take(qmdl_file_size as u64), &mut entry_writer).await?;
} else {
copy(&mut file, &mut entry_writer).await?;
}
entry_writer.into_inner().close().await?;
}
@@ -409,13 +420,12 @@ pub async fn get_zip(
.open_file(entry_index, FileKind::Qmdl)
.await?
.ok_or_else(|| anyhow::anyhow!("QMDL file not found"))?
.take(qmdl_size_bytes as u64)
};
let qmdl_reader = QmdlMessageReader::new(qmdl_file_for_pcap).await?;
if let Err(e) = generate_pcap_data(
&mut entry_writer,
qmdl_file_for_pcap,
qmdl_size_bytes,
qmdl_reader,
gps_records,
)
.await
@@ -532,10 +542,14 @@ pub async fn debug_set_display_state(
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use crate::config::GpsMode;
use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State};
use futures::AsyncReadExt;
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}};
use tempfile::TempDir;
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
@@ -549,24 +563,24 @@ mod tests {
async fn create_test_entry_with_data(
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
test_data: &[u8],
test_data: &MessagesContainer,
) -> String {
let entry_name = {
let mut store = store_lock.write().await;
let (mut qmdl_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap();
let (mut qmdl_gz_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap();
if !test_data.is_empty() {
use tokio::io::AsyncWriteExt;
qmdl_file.write_all(test_data).await.unwrap();
qmdl_file.flush().await.unwrap();
}
let mut writer = QmdlWriter::new(&mut qmdl_gz_file);
writer.write_container(test_data).await.unwrap();
writer.close().await.unwrap();
let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize;
let current_entry = store.current_entry.unwrap();
let entry = &store.manifest.entries[current_entry];
let entry_name = entry.name.clone();
store
.update_entry_qmdl_size(current_entry, test_data.len())
.update_entry_qmdl_size(current_entry, qmdl_file_size)
.await
.unwrap();
entry_name
@@ -604,17 +618,69 @@ mod tests {
})
}
// valid HDLC encapsulated diag message generated from
// rayhunter::diag::test::get_test_message
fn create_test_container() -> MessagesContainer {
MessagesContainer {
data_type: DataType::UserSpace,
num_messages: 1,
messages: vec![
HdlcEncapsulatedMessage {
len: 39,
data: vec![
16,
0,
32,
0,
32,
0,
192,
176,
26,
165,
245,
135,
118,
35,
2,
1,
20,
14,
48,
0,
160,
0,
2,
8,
0,
0,
217,
15,
5,
0,
0,
0,
0,
1,
0,
10,
13,
196,
126,
],
},
],
}
}
#[tokio::test]
async fn test_get_zip_success() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E];
let test_qmdl_data = create_test_container();
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
let state = create_test_server_state(store_lock);
let result = get_zip(State(state), Path(entry_name.clone())).await;
assert!(result.is_ok());
let response = result.unwrap();
let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap();
let headers = response.headers();
assert_eq!(headers.get("content-type").unwrap(), "application/zip");
@@ -623,21 +689,32 @@ mod tests {
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
let filenames = zip_reader
.file()
.entries()
let zip_reader_file = zip_reader.file();
let filenames: Vec<String> = zip_reader_file.entries()
.iter()
.map(|entry| entry.filename().as_str().unwrap().to_owned())
.collect::<Vec<String>>();
.map(|entry| entry.filename().as_str().unwrap().to_string())
.collect();
assert_eq!(
filenames,
vec![
format!("{entry_name}.qmdl"),
format!("{entry_name}.qmdl.gz"),
format!("{entry_name}-gps.ndjson"),
format!("{entry_name}.pcapng"),
]
);
let mut qmdl_body = Vec::with_capacity(128);
zip_reader.reader_without_entry(0)
.await
.unwrap()
.read_to_end(&mut qmdl_body)
.await
.unwrap();
let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap();
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
assert_eq!(
qmdl_reader.get_next_message().await.unwrap(),
Some(Ok(expected_message)),
);
}
}