Compare commits

...

5 Commits

Author SHA1 Message Date
Will Greenberg 58c60c2661 daemon: put QmdlWriter in a Box
This'll balance the enum size given QmdlWriter's larger size
2026-04-01 12:29:58 -07:00
Will Greenberg e0ae8a0298 run cargo fmt 2026-04-01 11:55:56 -07:00
Will Greenberg e6a3a4331e daemon: fix zip test 2026-04-01 11:43:48 -07:00
Will Greenberg 9191540e86 qmdl_store: maintain backwards compatibility 2026-04-01 11:40:04 -07:00
Will Greenberg 0a93e93838 Add support for compressed QMDL
This reworks the QmdlWriter to output gzipped QMDL files by default,
and allows QmdlReader to operate on either compressed or uncompressed
QMDLs.

QmdlReader has been significantly rewritten to expose a single AsyncRead
interface to both compressed and uncompressed QMDL sources.
2026-03-30 19:58:13 -07:00
11 changed files with 337 additions and 174 deletions
Generated
+50 -27
View File
@@ -274,6 +274,18 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "async-compression"
version = "0.4.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1"
dependencies = [
"compression-codecs",
"compression-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "async-executor" name = "async-executor"
version = "1.13.3" version = "1.13.3"
@@ -945,6 +957,23 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "compression-codecs"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7"
dependencies = [
"compression-core",
"flate2",
"memchr",
]
[[package]]
name = "compression-core"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d"
[[package]] [[package]]
name = "concurrent-queue" name = "concurrent-queue"
version = "2.5.0" version = "2.5.0"
@@ -1733,9 +1762,9 @@ dependencies = [
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.1" version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"miniz_oxide", "miniz_oxide",
@@ -1812,9 +1841,9 @@ dependencies = [
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@@ -1827,9 +1856,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@@ -1837,15 +1866,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@@ -1854,9 +1883,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]] [[package]]
name = "futures-lite" name = "futures-lite"
@@ -1873,9 +1902,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -1884,21 +1913,21 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.31" version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@@ -1908,7 +1937,6 @@ dependencies = [
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils",
"slab", "slab",
] ]
@@ -4132,12 +4160,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "piper" name = "piper"
version = "0.2.4" version = "0.2.4"
@@ -4674,6 +4696,7 @@ checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
name = "rayhunter" name = "rayhunter"
version = "0.10.2" version = "0.10.2"
dependencies = [ dependencies = [
"async-compression",
"bytes", "bytes",
"chrono", "chrono",
"crc", "crc",
+5 -10
View File
@@ -113,12 +113,8 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let file_size = qmdl_file let compressed = qmdl_path.ends_with(".gz");
.metadata() let qmdl_reader = QmdlReader::new(qmdl_file, compressed, None);
.await
.expect("failed to get QMDL file metadata")
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!( let mut qmdl_stream = pin!(
qmdl_reader qmdl_reader
.as_stream() .as_stream()
@@ -141,8 +137,9 @@ async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path) let qmdl_file = &mut File::open(&qmdl_path)
.await .await
.expect("failed to open qmdl file"); .expect("failed to open qmdl file");
let compressed = qmdl_path.ends_with(".gz");
let qmdl_file_size = qmdl_file.metadata().await.unwrap().len(); let qmdl_file_size = qmdl_file.metadata().await.unwrap().len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize)); let mut qmdl_reader = QmdlReader::new(qmdl_file, compressed, Some(qmdl_file_size as usize));
let mut pcap_path = qmdl_path.clone(); let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcapng"); pcap_path.set_extension("pcapng");
let pcap_file = &mut File::create(&pcap_path) let pcap_file = &mut File::create(&pcap_path)
@@ -197,9 +194,7 @@ async fn main() {
let name_str = name.to_str().unwrap(); let name_str = name.to_str().unwrap();
let path = entry.path(); let path = entry.path();
let path_str = path.to_str().unwrap(); let path_str = path.to_str().unwrap();
// instead of relying on the QMDL extension, can we check if a file is if name_str.ends_with(".qmdl") || name_str.ends_with(".qmdl.gz") {
// QMDL by inspecting the contents?
if name_str.ends_with(".qmdl") {
info!("**** Beginning analysis of {name_str}"); info!("**** Beginning analysis of {name_str}");
analyze_qmdl(path_str, args.show_skipped).await; analyze_qmdl(path_str, args.show_skipped).await;
if args.pcapify { if args.pcapify {
+1 -1
View File
@@ -33,7 +33,7 @@ futures-macro = "0.3.30"
include_dir = "0.7.3" include_dir = "0.7.3"
chrono = { version = "0.4.31", features = ["serde"] } chrono = { version = "0.4.31", features = ["serde"] }
tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] } tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] }
futures = { version = "0.3.30", default-features = false } futures = { version = "0.3.32", default-features = false, features = ["std"] }
serde_json = "1.0.114" serde_json = "1.0.114"
image = { version = "0.25.1", default-features = false, features = ["png", "gif"] } image = { version = "0.25.1", default-features = false, features = ["png", "gif"] }
tempfile = "3.10.2" tempfile = "3.10.2"
+3 -10
View File
@@ -10,7 +10,6 @@ use futures::TryStreamExt;
use log::{error, info}; use log::{error, info};
use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness}; use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness};
use rayhunter::diag::{DataType, MessagesContainer}; use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::qmdl::QmdlReader;
use serde::Serialize; use serde::Serialize;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::io::{AsyncWriteExt, BufWriter};
@@ -135,7 +134,7 @@ async fn perform_analysis(
analyzer_config: &AnalyzerConfig, analyzer_config: &AnalyzerConfig,
) -> Result<(), String> { ) -> Result<(), String> {
info!("Opening QMDL and analysis file for {name}..."); info!("Opening QMDL and analysis file for {name}...");
let (analysis_file, qmdl_file) = { let (analysis_file, qmdl_reader) = {
let mut qmdl_store = qmdl_store_lock.write().await; let mut qmdl_store = qmdl_store_lock.write().await;
let (entry_index, _) = qmdl_store let (entry_index, _) = qmdl_store
.entry_for_name(name) .entry_for_name(name)
@@ -144,23 +143,17 @@ async fn perform_analysis(
.clear_and_open_entry_analysis(entry_index) .clear_and_open_entry_analysis(entry_index)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
let qmdl_file = qmdl_store let qmdl_reader = qmdl_store
.open_entry_qmdl(entry_index) .open_entry_qmdl(entry_index)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
(analysis_file, qmdl_file) (analysis_file, qmdl_reader)
}; };
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config) let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
.await .await
.map_err(|e| format!("{e:?}"))?; .map_err(|e| format!("{e:?}"))?;
let file_size = qmdl_file
.metadata()
.await
.expect("failed to get QMDL file metadata")
.len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin::pin!( let mut qmdl_stream = pin::pin!(
qmdl_reader qmdl_reader
.as_stream() .as_stream()
+20 -10
View File
@@ -63,7 +63,7 @@ pub struct DiagTask {
enum DiagState { enum DiagState {
Recording { Recording {
qmdl_writer: QmdlWriter<File>, qmdl_writer: Box<QmdlWriter<File>>,
analysis_writer: Box<AnalysisWriter>, analysis_writer: Box<AnalysisWriter>,
}, },
Stopped, Stopped,
@@ -143,7 +143,7 @@ impl DiagTask {
DiskSpaceCheck::Failed => {} DiskSpaceCheck::Failed => {}
} }
let (qmdl_file, analysis_file) = match qmdl_store.new_entry().await { let (qmdl_gz_file, analysis_file) = match qmdl_store.new_entry().await {
Ok(files) => files, Ok(files) => files,
Err(e) => { Err(e) => {
let msg = format!("failed creating QMDL file entry: {e}"); let msg = format!("failed creating QMDL file entry: {e}");
@@ -152,7 +152,7 @@ impl DiagTask {
} }
}; };
self.stop_current_recording().await; self.stop_current_recording().await;
let qmdl_writer = QmdlWriter::new(qmdl_file); let qmdl_writer = Box::new(QmdlWriter::new(qmdl_gz_file));
let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await
{ {
Ok(writer) => Box::new(writer), Ok(writer) => Box::new(writer),
@@ -237,13 +237,23 @@ impl DiagTask {
let mut state = DiagState::Stopped; let mut state = DiagState::Stopped;
std::mem::swap(&mut self.state, &mut state); std::mem::swap(&mut self.state, &mut state);
if let DiagState::Recording { if let DiagState::Recording {
analysis_writer, .. qmdl_writer,
analysis_writer,
..
} = state } = state
{ {
analysis_writer match (qmdl_writer.close().await, analysis_writer.close().await) {
.close() (Ok(()), Ok(())) => {}
.await (qmdl_result, analysis_result) => {
.expect("failed to close analysis writer"); if let Err(err) = qmdl_result {
error!("failed to close QmdlWriter: {:?}", err);
}
if let Err(err) = analysis_result {
error!("failed to close AnalysisWriter: {:?}", err);
}
panic!();
}
}
} }
} }
@@ -315,13 +325,13 @@ impl DiagTask {
} }
debug!( debug!(
"total QMDL bytes written: {}, updating manifest...", "total QMDL bytes written: {}, updating manifest...",
qmdl_writer.total_written qmdl_writer.total_uncompressed_bytes
); );
let index = qmdl_store let index = qmdl_store
.current_entry .current_entry
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
if let Err(e) = qmdl_store if let Err(e) = qmdl_store
.update_entry_qmdl_size(index, qmdl_writer.total_written) .update_entry_qmdl_size(index, qmdl_writer.total_uncompressed_bytes)
.await .await
{ {
let reason = format!("failed to update manifest (disk full?): {e}"); let reason = format!("failed to update manifest (disk full?): {e}");
+4 -12
View File
@@ -45,23 +45,20 @@ pub async fn get_pcap(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
format!("couldn't find manifest entry with name {qmdl_name}"), format!("couldn't find manifest entry with name {qmdl_name}"),
))?; ))?;
if entry.qmdl_size_bytes == 0 { if entry.uncompressed_qmdl_size_bytes == 0 {
return Err(( return Err((
StatusCode::SERVICE_UNAVAILABLE, StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string(), "QMDL file is empty, try again in a bit!".to_string(),
)); ));
} }
let qmdl_size_bytes = entry.qmdl_size_bytes; let qmdl_reader = qmdl_store
let qmdl_file = qmdl_store
.open_entry_qmdl(entry_index) .open_entry_qmdl(entry_index)
.await .await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
// the QMDL reader should stop at the last successfully written data chunk
// (entry.size_bytes)
let (reader, writer) = duplex(1024); let (reader, writer) = duplex(1024);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await { if let Err(e) = generate_pcap_data(writer, qmdl_reader).await {
error!("failed to generate PCAP: {e:?}"); error!("failed to generate PCAP: {e:?}");
} }
}); });
@@ -71,11 +68,7 @@ pub async fn get_pcap(
Ok((headers, body).into_response()) Ok((headers, body).into_response())
} }
pub async fn generate_pcap_data<R, W>( pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlReader<R>) -> Result<(), Error>
writer: W,
qmdl_file: R,
qmdl_size_bytes: usize,
) -> Result<(), Error>
where where
W: AsyncWrite + Unpin + Send, W: AsyncWrite + Unpin + Send,
R: AsyncRead + Unpin, R: AsyncRead + Unpin,
@@ -83,7 +76,6 @@ where
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?; let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
pcap_writer.write_iface_header().await?; pcap_writer.write_iface_header().await?;
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
while let Some(container) = reader.get_next_messages_container().await? { while let Some(container) = reader.get_next_messages_container().await? {
if container.data_type != DataType::UserSpace { if container.data_type != DataType::UserSpace {
continue; continue;
+41 -15
View File
@@ -4,6 +4,7 @@ use std::path::{Path, PathBuf};
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
use log::{info, warn}; use log::{info, warn};
use rayhunter::qmdl::QmdlReader;
use rayhunter::util::RuntimeMetadata; use rayhunter::util::RuntimeMetadata;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
@@ -57,8 +58,10 @@ pub struct ManifestEntry {
/// The system time when the last message was recorded to the file /// The system time when the last message was recorded to the file
#[cfg_attr(feature = "apidocs", schema(value_type = String))] #[cfg_attr(feature = "apidocs", schema(value_type = String))]
pub last_message_time: Option<DateTime<Local>>, pub last_message_time: Option<DateTime<Local>>,
/// The size of the QMDL file in bytes /// The size of the uncompressed QMDL data in bytes. Previously this was
pub qmdl_size_bytes: usize, /// called `qmdl_size_bytes`, so alias it for backwards compatibility.
#[serde(alias = "qmdl_size_bytes")]
pub uncompressed_qmdl_size_bytes: usize,
/// The rayhunter daemon version which generated the file /// The rayhunter daemon version which generated the file
pub rayhunter_version: Option<String>, pub rayhunter_version: Option<String>,
/// The OS which created the file /// The OS which created the file
@@ -67,6 +70,8 @@ pub struct ManifestEntry {
pub arch: Option<String>, pub arch: Option<String>,
#[serde(default)] #[serde(default)]
pub stop_reason: Option<String>, pub stop_reason: Option<String>,
#[serde(default)]
pub compressed: bool,
} }
impl ManifestEntry { impl ManifestEntry {
@@ -77,17 +82,22 @@ impl ManifestEntry {
name: format!("{}", now.timestamp()), name: format!("{}", now.timestamp()),
start_time: now, start_time: now,
last_message_time: None, last_message_time: None,
qmdl_size_bytes: 0, uncompressed_qmdl_size_bytes: 0,
rayhunter_version: Some(metadata.rayhunter_version), rayhunter_version: Some(metadata.rayhunter_version),
system_os: Some(metadata.system_os), system_os: Some(metadata.system_os),
arch: Some(metadata.arch), arch: Some(metadata.arch),
stop_reason: None, stop_reason: None,
compressed: true,
} }
} }
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf { pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
let mut filepath = path.as_ref().join(&self.name); let mut filepath = path.as_ref().join(&self.name);
filepath.set_extension("qmdl"); if self.compressed {
filepath.set_extension("qmdl.gz");
} else {
filepath.set_extension("qmdl");
}
filepath filepath
} }
@@ -153,8 +163,9 @@ impl RecordingStore {
} }
// Does a best-effort attempt to recover the manifest from a directory of // Does a best-effort attempt to recover the manifest from a directory of
// QMDL files. We expect these files to be named like "<timestamp>.qmdl", // QMDL files. We expect these files to be named like "<timestamp>.qmdl"
// and skip any files which don't match that pattern. // or "<timestamp>.qmdl.gz", and skip any files which don't match that
// pattern.
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError> pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
where where
P: AsRef<Path>, P: AsRef<Path>,
@@ -174,11 +185,14 @@ impl RecordingStore {
continue; continue;
}; };
if !filename.ends_with(".qmdl") { let (stem, compressed) = if filename.ends_with(".qmdl") {
(filename.trim_end_matches(".qmdl"), false)
} else if filename.ends_with(".qmdl.gz") {
(filename.trim_end_matches(".qmdl.gz"), true)
} else {
continue; continue;
} };
let stem = filename.trim_end_matches(".qmdl");
let Ok(start_timestamp) = stem.parse::<i64>() else { let Ok(start_timestamp) = stem.parse::<i64>() else {
warn!("QMDL file has invalid name {os_filename:?}, skipping"); warn!("QMDL file has invalid name {os_filename:?}, skipping");
continue; continue;
@@ -205,9 +219,10 @@ impl RecordingStore {
info!("successfully recovered QMDL entry {os_filename:?}!"); info!("successfully recovered QMDL entry {os_filename:?}!");
manifest_entries.push(ManifestEntry { manifest_entries.push(ManifestEntry {
name: stem.to_string(), name: stem.to_string(),
compressed,
start_time: start_time.into(), start_time: start_time.into(),
last_message_time: Some(last_message_time.into()), last_message_time: Some(last_message_time.into()),
qmdl_size_bytes: metadata.size() as usize, uncompressed_qmdl_size_bytes: metadata.size() as usize,
rayhunter_version: None, rayhunter_version: None,
system_os: None, system_os: None,
arch: None, arch: None,
@@ -265,11 +280,19 @@ impl RecordingStore {
} }
// Returns the corresponding QMDL file for a given entry // Returns the corresponding QMDL file for a given entry
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> { pub async fn open_entry_qmdl(
&self,
entry_index: usize,
) -> Result<QmdlReader<File>, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index]; let entry = &self.manifest.entries[entry_index];
File::open(entry.get_qmdl_filepath(&self.path)) let file = File::open(entry.get_qmdl_filepath(&self.path))
.await .await
.map_err(RecordingStoreError::ReadFileError) .map_err(RecordingStoreError::ReadFileError)?;
Ok(QmdlReader::new(
file,
entry.compressed,
Some(entry.uncompressed_qmdl_size_bytes),
))
} }
// Returns the corresponding QMDL file for a given entry // Returns the corresponding QMDL file for a given entry
@@ -314,7 +337,7 @@ impl RecordingStore {
entry_index: usize, entry_index: usize,
size_bytes: usize, size_bytes: usize,
) -> Result<(), RecordingStoreError> { ) -> Result<(), RecordingStoreError> {
self.manifest.entries[entry_index].qmdl_size_bytes = size_bytes; self.manifest.entries[entry_index].uncompressed_qmdl_size_bytes = size_bytes;
self.manifest.entries[entry_index].last_message_time = self.manifest.entries[entry_index].last_message_time =
Some(rayhunter::clock::get_adjusted_now()); Some(rayhunter::clock::get_adjusted_now());
self.write_manifest().await self.write_manifest().await
@@ -490,7 +513,10 @@ mod tests {
.entry_for_name(&store.manifest.entries[entry_index].name) .entry_for_name(&store.manifest.entries[entry_index].name)
.unwrap(); .unwrap();
assert!(entry.last_message_time.is_some()); assert!(entry.last_message_time.is_some());
assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000); assert_eq!(
store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes,
1000
);
assert_eq!( assert_eq!(
RecordingStore::read_manifest(dir.path()).await.unwrap(), RecordingStore::read_manifest(dir.path()).await.unwrap(),
store.manifest store.manifest
+26 -32
View File
@@ -14,7 +14,8 @@ use log::{error, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::write; use tokio::fs::write;
use tokio::io::{AsyncReadExt, copy, duplex}; use tokio::io::copy;
use tokio::io::duplex;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio_util::compat::FuturesAsyncWriteCompatExt; use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -64,7 +65,7 @@ pub async fn get_qmdl(
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
format!("couldn't find qmdl file with name {qmdl_idx}"), format!("couldn't find qmdl file with name {qmdl_idx}"),
))?; ))?;
let qmdl_file = qmdl_store let qmdl_reader = qmdl_store
.open_entry_qmdl(entry_index) .open_entry_qmdl(entry_index)
.await .await
.map_err(|err| { .map_err(|err| {
@@ -73,14 +74,15 @@ pub async fn get_qmdl(
format!("error opening QMDL file: {err}"), format!("error opening QMDL file: {err}"),
) )
})?; })?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
let headers = [ let headers = [
(CONTENT_TYPE, "application/octet-stream"), (CONTENT_TYPE, "application/octet-stream"),
(CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()), (
CONTENT_LENGTH,
&entry.uncompressed_qmdl_size_bytes.to_string(),
),
]; ];
let body = Body::from_stream(qmdl_stream); let body = Body::from_stream(qmdl_reader.as_stream());
Ok((headers, body).into_response()) Ok((headers, body).into_response())
} }
@@ -308,21 +310,21 @@ pub async fn get_zip(
Path(entry_name): Path<String>, Path(entry_name): Path<String>,
) -> Result<Response, (StatusCode, String)> { ) -> Result<Response, (StatusCode, String)> {
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned(); let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
let (entry_index, qmdl_size_bytes) = { let (entry_index, compressed) = {
let qmdl_store = state.qmdl_store_lock.read().await; let qmdl_store = state.qmdl_store_lock.read().await;
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or(( let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
StatusCode::NOT_FOUND, StatusCode::NOT_FOUND,
format!("couldn't find entry with name {qmdl_idx}"), format!("couldn't find entry with name {qmdl_idx}"),
))?; ))?;
if entry.qmdl_size_bytes == 0 { if entry.uncompressed_qmdl_size_bytes == 0 {
return Err(( return Err((
StatusCode::SERVICE_UNAVAILABLE, StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string(), "QMDL file is empty, try again in a bit!".to_string(),
)); ));
} }
(entry_index, entry.qmdl_size_bytes) (entry_index, entry.compressed)
}; };
let qmdl_store_lock = state.qmdl_store_lock.clone(); let qmdl_store_lock = state.qmdl_store_lock.clone();
@@ -335,22 +337,18 @@ pub async fn get_zip(
// Add QMDL file // Add QMDL file
{ {
let entry = let extension = if compressed { "qmdl.gz" } else { "qmdl" };
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored); let entry = ZipEntryBuilder::new(
format!("{qmdl_idx}.{extension}").into(),
Compression::Stored,
);
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does // FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed // not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
// once https://github.com/Majored/rs-async-zip/pull/160 is released. // once https://github.com/Majored/rs-async-zip/pull/160 is released.
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
let qmdl_store = qmdl_store_lock.read().await;
let mut qmdl_file = { let mut qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
let qmdl_store = qmdl_store_lock.read().await; copy(&mut qmdl_reader, &mut entry_writer).await?;
qmdl_store
.open_entry_qmdl(entry_index)
.await?
.take(qmdl_size_bytes as u64)
};
copy(&mut qmdl_file, &mut entry_writer).await?;
entry_writer.into_inner().close().await?; entry_writer.into_inner().close().await?;
} }
@@ -360,17 +358,10 @@ pub async fn get_zip(
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored); ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
let qmdl_file_for_pcap = { let qmdl_store = qmdl_store_lock.read().await;
let qmdl_store = qmdl_store_lock.read().await; let qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
qmdl_store
.open_entry_qmdl(entry_index)
.await?
.take(qmdl_size_bytes as u64)
};
if let Err(e) = if let Err(e) = generate_pcap_data(&mut entry_writer, qmdl_reader).await {
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
{
// if we fail to generate the PCAP file, we should still continue and give the // if we fail to generate the PCAP file, we should still continue and give the
// user the QMDL. // user the QMDL.
error!("Failed to generate PCAP: {e:?}"); error!("Failed to generate PCAP: {e:?}");
@@ -530,7 +521,10 @@ mod tests {
assert_eq!( assert_eq!(
filenames, filenames,
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),] vec![
format!("{entry_name}.qmdl.gz"),
format!("{entry_name}.pcapng"),
]
); );
} }
} }
+1
View File
@@ -30,5 +30,6 @@ serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
num_enum = "0.7.4" num_enum = "0.7.4"
utoipa = { version = "5.4.0", optional = true } utoipa = { version = "5.4.0", optional = true }
async-compression = { version = "0.4.41", features = ["tokio", "gzip"] }
[dev-dependencies] [dev-dependencies]
+7
View File
@@ -1,5 +1,6 @@
//! Diag protocol serialization/deserialization //! Diag protocol serialization/deserialization
use bytes::Bytes;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use crc::{Algorithm, Crc}; use crc::{Algorithm, Crc};
use deku::prelude::*; use deku::prelude::*;
@@ -113,6 +114,12 @@ impl MessagesContainer {
} }
} }
impl From<MessagesContainer> for Bytes {
fn from(value: MessagesContainer) -> Self {
value.to_bytes().unwrap().into()
}
}
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)] #[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
pub struct HdlcEncapsulatedMessage { pub struct HdlcEncapsulatedMessage {
pub len: u32, pub len: u32,
+179 -57
View File
@@ -3,8 +3,14 @@
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from //! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
//! QMDL files. //! QMDL files.
use std::io::{Cursor, ErrorKind};
use std::pin::Pin;
use std::task::Poll;
use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer}; use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer};
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use futures::TryStream; use futures::TryStream;
use log::error; use log::error;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
@@ -13,8 +19,8 @@ pub struct QmdlWriter<T>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
{ {
writer: T, writer: GzipEncoder<T>,
pub total_written: usize, pub total_uncompressed_bytes: usize,
} }
impl<T> QmdlWriter<T> impl<T> QmdlWriter<T>
@@ -22,50 +28,160 @@ where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
{ {
pub fn new(writer: T) -> Self { pub fn new(writer: T) -> Self {
QmdlWriter::new_with_existing_size(writer, 0) let gzip_writer = GzipEncoder::new(writer);
}
pub fn new_with_existing_size(writer: T, existing_size: usize) -> Self {
QmdlWriter { QmdlWriter {
writer, writer: gzip_writer,
total_written: existing_size, total_uncompressed_bytes: 0,
} }
} }
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> { pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
for msg in &container.messages { for msg in &container.messages {
self.writer.write_all(&msg.data).await?; // for a gzipped file, we can't use `msg.data.len()` to
self.total_written += msg.data.len(); // determine the number of bytes written, so we have to
// manually do a `write_all()` type loop
let mut buf = Cursor::new(&msg.data);
loop {
let bytes_written = self.writer.write_buf(&mut buf).await?;
self.writer.flush().await?;
if bytes_written == 0 {
break;
}
self.total_uncompressed_bytes += bytes_written;
}
} }
Ok(()) Ok(())
} }
pub async fn close(mut self) -> std::io::Result<()> {
self.writer.shutdown().await?;
Ok(())
}
} }
/// The underlying byte source of a `QmdlAsyncReader`: either a gzip stream
/// that is decoded while reading, or a plain reader that is passed through.
#[derive(Debug)]
enum QmdlReaderSource<T> {
/// A gzip-compressed source, decoded through a `GzipDecoder`.
Compressed {
// decoder wrapping a buffered view of the raw reader
reader: GzipDecoder<BufReader<T>>,
// set once the decoder reported an unexpected EOF (a truncated gzip
// stream); later reads then return without touching the decoder
eof: bool,
},
/// An uncompressed source, read from directly.
Uncompressed {
reader: T,
},
}
/// An `AsyncRead` adapter over a compressed or uncompressed QMDL source,
/// with an optional cap on the number of uncompressed bytes it yields.
#[derive(Debug)]
struct QmdlAsyncReader<T> {
// where the bytes come from (gzip-decoded or raw)
source: QmdlReaderSource<T>,
// counter compared against `max_uncompressed_bytes` in `poll_read`;
// initialised to 0 by `new`
uncompressed_bytes_read: usize,
// optional limit on uncompressed bytes; `None` disables the limit checks
max_uncompressed_bytes: Option<usize>,
}
impl<T> QmdlAsyncReader<T>
where
    T: AsyncRead,
{
    /// Builds a reader over `reader`.
    ///
    /// When `compressed` is true the input is wrapped in a buffered
    /// `GzipDecoder`; otherwise it is read from as-is.
    /// `max_uncompressed_bytes` is stored as the optional byte limit, and the
    /// byte counter starts at zero.
    pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option<usize>) -> Self {
        let source = if !compressed {
            QmdlReaderSource::Uncompressed { reader }
        } else {
            let decoder = GzipDecoder::new(BufReader::new(reader));
            QmdlReaderSource::Compressed {
                reader: decoder,
                eof: false,
            }
        };

        Self {
            source,
            uncompressed_bytes_read: 0,
            max_uncompressed_bytes,
        }
    }
}
/// Reads (decompressed, when applicable) QMDL bytes from the underlying
/// source, signalling EOF once `max_uncompressed_bytes` bytes have been
/// handed to the caller.
impl<T> AsyncRead for QmdlAsyncReader<T>
where
    T: AsyncRead + Unpin,
{
    /// Polls the underlying source for more bytes.
    ///
    /// * Returns `Ready(Ok(()))` without filling `buf` (i.e. EOF) once the
    ///   byte limit has been reached, or once a compressed source was found
    ///   to be truncated.
    /// * An `UnexpectedEof` error from the gzip decoder is treated as a
    ///   truncated file rather than a failure and is reported as EOF.
    /// * Any bytes read past the limit are dropped from `buf` before
    ///   returning.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();

        // if we've already read up to (or beyond) the byte limit, return
        // without reading into the buffer, essentially signalling EOF
        if let Some(max_bytes) = this.max_uncompressed_bytes
            && this.uncompressed_bytes_read >= max_bytes
        {
            if this.uncompressed_bytes_read > max_bytes {
                error!(
                    "warning: {} bytes read, but max_bytes was {}",
                    this.uncompressed_bytes_read, max_bytes
                );
            }
            return Poll::Ready(Ok(()));
        }

        let before = buf.filled().len();
        let res = match &mut this.source {
            QmdlReaderSource::Compressed { reader, eof } => {
                // if we already determined we've reached the Gzip EOF, don't read more
                if *eof {
                    return Poll::Ready(Ok(()));
                }
                match Pin::new(reader).poll_read(cx, buf) {
                    // an unexpected EOF in a Gzip file isn't fatal, it just
                    // means the file was truncated. remember that we're done
                    // and report a plain EOF
                    Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => {
                        *eof = true;
                        Poll::Ready(Ok(()))
                    }
                    other => other,
                }
            }
            QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf),
        };

        // work out how many of the freshly read bytes we may expose without
        // exceeding the limit, and trim the buffer if we overshot
        let read = buf.filled().len() - before;
        let allowed = match this.max_uncompressed_bytes {
            Some(max_bytes) => read.min(max_bytes.saturating_sub(this.uncompressed_bytes_read)),
            None => read,
        };
        if allowed < read {
            buf.set_filled(before + allowed);
        }

        // keep the running total up to date; without this the limit would
        // only ever apply to a single read instead of the whole stream
        this.uncompressed_bytes_read += allowed;

        res
    }
}
#[derive(Debug)]
pub struct QmdlReader<T> pub struct QmdlReader<T>
where where
T: AsyncRead, T: AsyncRead,
{ {
reader: BufReader<T>, buf_reader: BufReader<QmdlAsyncReader<T>>,
bytes_read: usize,
max_bytes: Option<usize>,
} }
impl<T> QmdlReader<T> impl<T> QmdlReader<T>
where where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
pub fn new(reader: T, max_bytes: Option<usize>) -> Self { pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option<usize>) -> Self {
QmdlReader { QmdlReader {
reader: BufReader::new(reader), buf_reader: BufReader::new(QmdlAsyncReader::new(
bytes_read: 0, reader,
max_bytes, compressed,
max_uncompressed_bytes,
)),
} }
} }
pub fn as_stream( pub fn as_stream(self) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> {
&mut self, futures::stream::try_unfold(self, |mut reader| async {
) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> + '_ {
futures::stream::try_unfold(self, |reader| async {
let maybe_container = reader.get_next_messages_container().await?; let maybe_container = reader.get_next_messages_container().await?;
match maybe_container { match maybe_container {
Some(container) => Ok(Some((container, reader))), Some(container) => Ok(Some((container, reader))),
@@ -77,22 +193,16 @@ where
pub async fn get_next_messages_container( pub async fn get_next_messages_container(
&mut self, &mut self,
) -> Result<Option<MessagesContainer>, std::io::Error> { ) -> Result<Option<MessagesContainer>, std::io::Error> {
if let Some(max_bytes) = self.max_bytes let mut buf = Vec::new();
&& self.bytes_read >= max_bytes if self
.buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf)
.await?
== 0
{ {
if self.bytes_read > max_bytes {
error!(
"warning: {} bytes read, but max_bytes was {}",
self.bytes_read, max_bytes
);
}
return Ok(None); return Ok(None);
} }
let mut buf = Vec::new();
let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?;
self.bytes_read += bytes_read;
// Since QMDL is just a flat list of messages, we can't actually // Since QMDL is just a flat list of messages, we can't actually
// reproduce the container structure they came from in the original // reproduce the container structure they came from in the original
// read. So we'll just pretend that all containers had exactly one // read. So we'll just pretend that all containers had exactly one
@@ -102,13 +212,26 @@ where
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: 1, num_messages: 1,
messages: vec![HdlcEncapsulatedMessage { messages: vec![HdlcEncapsulatedMessage {
len: bytes_read as u32, len: buf.len() as u32,
data: buf, data: buf,
}], }],
})) }))
} }
} }
/// Lets a `QmdlReader` be used directly as a byte stream by forwarding reads
/// to its internal buffered reader.
impl<T> AsyncRead for QmdlReader<T>
where
    T: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        let inner = Pin::new(&mut this.buf_reader);
        inner.poll_read(cx, buf)
    }
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io::Cursor; use std::io::Cursor;
@@ -160,7 +283,7 @@ mod test {
#[tokio::test] #[tokio::test]
async fn test_unbounded_qmdl_reader() { async fn test_unbounded_qmdl_reader() {
let mut buf = Cursor::new(get_test_message_bytes()); let mut buf = Cursor::new(get_test_message_bytes());
let mut reader = QmdlReader::new(&mut buf, None); let mut reader = QmdlReader::new(&mut buf, false, None);
let expected_messages = get_test_messages(); let expected_messages = get_test_messages();
for message in expected_messages { for message in expected_messages {
let expected_container = MessagesContainer { let expected_container = MessagesContainer {
@@ -183,7 +306,7 @@ mod test {
let mut expected_messages = get_test_messages(); let mut expected_messages = get_test_messages();
let limit = expected_messages[0].len + expected_messages[1].len; let limit = expected_messages[0].len + expected_messages[1].len;
let mut reader = QmdlReader::new(&mut buf, Some(limit as usize)); let mut reader = QmdlReader::new(&mut buf, false, Some(limit as usize));
for message in expected_messages.drain(0..2) { for message in expected_messages.drain(0..2) {
let expected_container = MessagesContainer { let expected_container = MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
@@ -201,29 +324,22 @@ mod test {
)); ));
} }
#[tokio::test] /// Writes the test containers to a QmdlWriter, optionally finishing the
async fn test_qmdl_writer() { /// gzip stream with a footer. Then, attempts to decompress the buffer with
/// a QmdlReader, asserting that the containers match what's expected.
async fn run_compressed_reading_and_writing_tests(do_close: bool) {
let containers = get_test_containers();
let mut buf = Vec::new(); let mut buf = Vec::new();
let mut writer = QmdlWriter::new(&mut buf); {
let expected_containers = get_test_containers(); let mut writer = QmdlWriter::new(&mut buf);
for container in &expected_containers { for container in &containers {
writer.write_container(container).await.unwrap(); writer.write_container(&container).await.unwrap();
}
if do_close {
writer.close().await.unwrap();
}
} }
assert_eq!(writer.total_written, buf.len()); let mut reader = QmdlReader::new(Cursor::new(buf), true, None);
assert_eq!(buf, get_test_message_bytes());
}
#[tokio::test]
async fn test_writing_and_reading() {
let mut buf = Vec::new();
let mut writer = QmdlWriter::new(&mut buf);
let expected_containers = get_test_containers();
for container in &expected_containers {
writer.write_container(container).await.unwrap();
}
let limit = Some(buf.len());
let mut reader = QmdlReader::new(Cursor::new(&mut buf), limit);
let expected_messages = get_test_messages(); let expected_messages = get_test_messages();
for message in expected_messages { for message in expected_messages {
let expected_container = MessagesContainer { let expected_container = MessagesContainer {
@@ -241,4 +357,10 @@ mod test {
Ok(None) Ok(None)
)); ));
} }
#[tokio::test]
async fn test_compressed_reading_and_writing() {
    // run the round trip first with the writer closed, then without
    for do_close in [true, false] {
        run_compressed_reading_and_writing_tests(do_close).await;
    }
}
} }