mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-06-03 19:53:33 -07:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f8c9701556 | |||
| fda0659ba4 | |||
| 0cd70ad73c | |||
| af4a9aeb95 | |||
| 73e3c9a5c2 | |||
| 58c60c2661 | |||
| e0ae8a0298 | |||
| e6a3a4331e | |||
| 9191540e86 | |||
| 0a93e93838 |
Generated
+50
-27
@@ -274,6 +274,18 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-compression"
|
||||||
|
version = "0.4.41"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1"
|
||||||
|
dependencies = [
|
||||||
|
"compression-codecs",
|
||||||
|
"compression-core",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-executor"
|
name = "async-executor"
|
||||||
version = "1.13.3"
|
version = "1.13.3"
|
||||||
@@ -945,6 +957,23 @@ dependencies = [
|
|||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "compression-codecs"
|
||||||
|
version = "0.4.37"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7"
|
||||||
|
dependencies = [
|
||||||
|
"compression-core",
|
||||||
|
"flate2",
|
||||||
|
"memchr",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "compression-core"
|
||||||
|
version = "0.4.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "concurrent-queue"
|
name = "concurrent-queue"
|
||||||
version = "2.5.0"
|
version = "2.5.0"
|
||||||
@@ -1733,9 +1762,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.1.1"
|
version = "1.1.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
|
checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crc32fast",
|
"crc32fast",
|
||||||
"miniz_oxide",
|
"miniz_oxide",
|
||||||
@@ -1812,9 +1841,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures"
|
name = "futures"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
|
checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
@@ -1827,9 +1856,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-channel"
|
name = "futures-channel"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
|
checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-sink",
|
"futures-sink",
|
||||||
@@ -1837,15 +1866,15 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-core"
|
name = "futures-core"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
|
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-executor"
|
name = "futures-executor"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
|
checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-task",
|
"futures-task",
|
||||||
@@ -1854,9 +1883,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-io"
|
name = "futures-io"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
|
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-lite"
|
name = "futures-lite"
|
||||||
@@ -1873,9 +1902,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-macro"
|
name = "futures-macro"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
|
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
@@ -1884,21 +1913,21 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-sink"
|
name = "futures-sink"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
|
checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-task"
|
name = "futures-task"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-util"
|
name = "futures-util"
|
||||||
version = "0.3.31"
|
version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
|
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
@@ -1908,7 +1937,6 @@ dependencies = [
|
|||||||
"futures-task",
|
"futures-task",
|
||||||
"memchr",
|
"memchr",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"pin-utils",
|
|
||||||
"slab",
|
"slab",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -4132,12 +4160,6 @@ version = "0.2.16"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
|
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "pin-utils"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "piper"
|
name = "piper"
|
||||||
version = "0.2.4"
|
version = "0.2.4"
|
||||||
@@ -4674,6 +4696,7 @@ checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
|
|||||||
name = "rayhunter"
|
name = "rayhunter"
|
||||||
version = "0.10.2"
|
version = "0.10.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-compression",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"crc",
|
"crc",
|
||||||
|
|||||||
+13
-30
@@ -1,15 +1,13 @@
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use futures::TryStreamExt;
|
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use pcap_file_tokio::pcapng::{Block, PcapNgReader};
|
use pcap_file_tokio::pcapng::{Block, PcapNgReader};
|
||||||
use rayhunter::{
|
use rayhunter::{
|
||||||
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
|
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
|
||||||
diag::DataType,
|
|
||||||
gsmtap_parser,
|
gsmtap_parser,
|
||||||
pcap::GsmtapPcapWriter,
|
pcap::GsmtapPcapWriter,
|
||||||
qmdl::QmdlReader,
|
qmdl::QmdlMessageReader,
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, future, path::PathBuf, pin::pin};
|
use std::{collections::HashMap, path::PathBuf};
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
@@ -113,26 +111,14 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
|
|||||||
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
|
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
|
||||||
let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
|
let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
|
||||||
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
|
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
|
||||||
let file_size = qmdl_file
|
let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader");
|
||||||
.metadata()
|
|
||||||
.await
|
|
||||||
.expect("failed to get QMDL file metadata")
|
|
||||||
.len();
|
|
||||||
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
|
|
||||||
let mut qmdl_stream = pin!(
|
|
||||||
qmdl_reader
|
|
||||||
.as_stream()
|
|
||||||
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
|
|
||||||
);
|
|
||||||
let mut report = Report::new(qmdl_path);
|
let mut report = Report::new(qmdl_path);
|
||||||
while let Some(container) = qmdl_stream
|
while let Some(maybe_message) = qmdl_reader
|
||||||
.try_next()
|
.get_next_message()
|
||||||
.await
|
.await
|
||||||
.expect("failed getting QMDL container")
|
.expect("failed to get message")
|
||||||
{
|
{
|
||||||
for row in harness.analyze_qmdl_messages(container) {
|
report.process_row(harness.analyze_qmdl_message(maybe_message));
|
||||||
report.process_row(row);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
report.print_summary(show_skipped);
|
report.print_summary(show_skipped);
|
||||||
}
|
}
|
||||||
@@ -141,8 +127,7 @@ async fn pcapify(qmdl_path: &PathBuf) {
|
|||||||
let qmdl_file = &mut File::open(&qmdl_path)
|
let qmdl_file = &mut File::open(&qmdl_path)
|
||||||
.await
|
.await
|
||||||
.expect("failed to open qmdl file");
|
.expect("failed to open qmdl file");
|
||||||
let qmdl_file_size = qmdl_file.metadata().await.unwrap().len();
|
let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader");
|
||||||
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize));
|
|
||||||
let mut pcap_path = qmdl_path.clone();
|
let mut pcap_path = qmdl_path.clone();
|
||||||
pcap_path.set_extension("pcapng");
|
pcap_path.set_extension("pcapng");
|
||||||
let pcap_file = &mut File::create(&pcap_path)
|
let pcap_file = &mut File::create(&pcap_path)
|
||||||
@@ -150,12 +135,12 @@ async fn pcapify(qmdl_path: &PathBuf) {
|
|||||||
.expect("failed to open pcap file");
|
.expect("failed to open pcap file");
|
||||||
let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap();
|
let mut pcap_writer = GsmtapPcapWriter::new(pcap_file).await.unwrap();
|
||||||
pcap_writer.write_iface_header().await.unwrap();
|
pcap_writer.write_iface_header().await.unwrap();
|
||||||
while let Some(container) = qmdl_reader
|
while let Some(maybe_message) = qmdl_reader
|
||||||
.get_next_messages_container()
|
.get_next_message()
|
||||||
.await
|
.await
|
||||||
.expect("failed to get container")
|
.expect("failed to get message")
|
||||||
{
|
{
|
||||||
for msg in container.into_messages().into_iter().flatten() {
|
if let Ok(msg) = maybe_message {
|
||||||
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
|
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
|
||||||
pcap_writer
|
pcap_writer
|
||||||
.write_gsmtap_message(parsed, timestamp)
|
.write_gsmtap_message(parsed, timestamp)
|
||||||
@@ -197,9 +182,7 @@ async fn main() {
|
|||||||
let name_str = name.to_str().unwrap();
|
let name_str = name.to_str().unwrap();
|
||||||
let path = entry.path();
|
let path = entry.path();
|
||||||
let path_str = path.to_str().unwrap();
|
let path_str = path.to_str().unwrap();
|
||||||
// instead of relying on the QMDL extension, can we check if a file is
|
if name_str.ends_with(".qmdl") || name_str.ends_with(".qmdl.gz") {
|
||||||
// QMDL by inspecting the contents?
|
|
||||||
if name_str.ends_with(".qmdl") {
|
|
||||||
info!("**** Beginning analysis of {name_str}");
|
info!("**** Beginning analysis of {name_str}");
|
||||||
analyze_qmdl(path_str, args.show_skipped).await;
|
analyze_qmdl(path_str, args.show_skipped).await;
|
||||||
if args.pcapify {
|
if args.pcapify {
|
||||||
|
|||||||
+1
-1
@@ -33,7 +33,7 @@ futures-macro = "0.3.30"
|
|||||||
include_dir = "0.7.3"
|
include_dir = "0.7.3"
|
||||||
chrono = { version = "0.4.31", features = ["serde"] }
|
chrono = { version = "0.4.31", features = ["serde"] }
|
||||||
tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] }
|
tokio-stream = { version = "0.1.14", default-features = false, features = ["io-util"] }
|
||||||
futures = { version = "0.3.30", default-features = false }
|
futures = { version = "0.3.32", default-features = false, features = ["std"] }
|
||||||
serde_json = "1.0.114"
|
serde_json = "1.0.114"
|
||||||
image = { version = "0.25.1", default-features = false, features = ["png", "gif"] }
|
image = { version = "0.25.1", default-features = false, features = ["png", "gif"] }
|
||||||
tempfile = "3.10.2"
|
tempfile = "3.10.2"
|
||||||
|
|||||||
+21
-23
@@ -1,16 +1,14 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{cmp, future, pin};
|
use std::cmp;
|
||||||
|
|
||||||
use axum::Json;
|
use axum::Json;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
};
|
};
|
||||||
use futures::TryStreamExt;
|
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness};
|
use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness};
|
||||||
use rayhunter::diag::{DataType, MessagesContainer};
|
use rayhunter::diag::{DiagParsingError, Message, MessagesContainer};
|
||||||
use rayhunter::qmdl::QmdlReader;
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||||
@@ -47,7 +45,7 @@ impl AnalysisWriter {
|
|||||||
|
|
||||||
// Runs the analysis harness on the given container, serializing the results
|
// Runs the analysis harness on the given container, serializing the results
|
||||||
// to the analysis file, returning the whether any warnings were detected
|
// to the analysis file, returning the whether any warnings were detected
|
||||||
pub async fn analyze(
|
pub async fn analyze_container(
|
||||||
&mut self,
|
&mut self,
|
||||||
container: MessagesContainer,
|
container: MessagesContainer,
|
||||||
) -> Result<EventType, std::io::Error> {
|
) -> Result<EventType, std::io::Error> {
|
||||||
@@ -62,6 +60,17 @@ impl AnalysisWriter {
|
|||||||
Ok(max_type)
|
Ok(max_type)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn analyze_message(
|
||||||
|
&mut self,
|
||||||
|
maybe_qmdl_msg: Result<Message, DiagParsingError>,
|
||||||
|
) -> Result<EventType, std::io::Error> {
|
||||||
|
let row = self.harness.analyze_qmdl_message(maybe_qmdl_msg);
|
||||||
|
if !row.is_empty() {
|
||||||
|
self.write(&row).await?;
|
||||||
|
}
|
||||||
|
Ok(row.get_max_event_type())
|
||||||
|
}
|
||||||
|
|
||||||
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
|
async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
|
||||||
let mut value_str = serde_json::to_string(value).unwrap();
|
let mut value_str = serde_json::to_string(value).unwrap();
|
||||||
value_str.push('\n');
|
value_str.push('\n');
|
||||||
@@ -135,7 +144,7 @@ async fn perform_analysis(
|
|||||||
analyzer_config: &AnalyzerConfig,
|
analyzer_config: &AnalyzerConfig,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
info!("Opening QMDL and analysis file for {name}...");
|
info!("Opening QMDL and analysis file for {name}...");
|
||||||
let (analysis_file, qmdl_file) = {
|
let (analysis_file, mut qmdl_reader) = {
|
||||||
let mut qmdl_store = qmdl_store_lock.write().await;
|
let mut qmdl_store = qmdl_store_lock.write().await;
|
||||||
let (entry_index, _) = qmdl_store
|
let (entry_index, _) = qmdl_store
|
||||||
.entry_for_name(name)
|
.entry_for_name(name)
|
||||||
@@ -144,37 +153,26 @@ async fn perform_analysis(
|
|||||||
.clear_and_open_entry_analysis(entry_index)
|
.clear_and_open_entry_analysis(entry_index)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("{e:?}"))?;
|
.map_err(|e| format!("{e:?}"))?;
|
||||||
let qmdl_file = qmdl_store
|
let qmdl_reader = qmdl_store
|
||||||
.open_entry_qmdl(entry_index)
|
.open_entry_qmdl(entry_index)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("{e:?}"))?;
|
.map_err(|e| format!("{e:?}"))?;
|
||||||
|
|
||||||
(analysis_file, qmdl_file)
|
(analysis_file, qmdl_reader)
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
|
let mut analysis_writer = AnalysisWriter::new(analysis_file, analyzer_config)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("{e:?}"))?;
|
.map_err(|e| format!("{e:?}"))?;
|
||||||
let file_size = qmdl_file
|
|
||||||
.metadata()
|
|
||||||
.await
|
|
||||||
.expect("failed to get QMDL file metadata")
|
|
||||||
.len();
|
|
||||||
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
|
|
||||||
let mut qmdl_stream = pin::pin!(
|
|
||||||
qmdl_reader
|
|
||||||
.as_stream()
|
|
||||||
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace))
|
|
||||||
);
|
|
||||||
|
|
||||||
info!("Starting analysis for {name}...");
|
info!("Starting analysis for {name}...");
|
||||||
while let Some(container) = qmdl_stream
|
while let Some(maybe_message) = qmdl_reader
|
||||||
.try_next()
|
.get_next_message()
|
||||||
.await
|
.await
|
||||||
.expect("failed getting QMDL container")
|
.expect("failed to get message")
|
||||||
{
|
{
|
||||||
let _ = analysis_writer
|
let _ = analysis_writer
|
||||||
.analyze(container)
|
.analyze_message(maybe_message)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("{e:?}"))?;
|
.map_err(|e| format!("{e:?}"))?;
|
||||||
}
|
}
|
||||||
|
|||||||
+23
-11
@@ -63,7 +63,7 @@ pub struct DiagTask {
|
|||||||
|
|
||||||
enum DiagState {
|
enum DiagState {
|
||||||
Recording {
|
Recording {
|
||||||
qmdl_writer: QmdlWriter<File>,
|
qmdl_writer: Box<QmdlWriter<File>>,
|
||||||
analysis_writer: Box<AnalysisWriter>,
|
analysis_writer: Box<AnalysisWriter>,
|
||||||
},
|
},
|
||||||
Stopped,
|
Stopped,
|
||||||
@@ -143,7 +143,7 @@ impl DiagTask {
|
|||||||
DiskSpaceCheck::Failed => {}
|
DiskSpaceCheck::Failed => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (qmdl_file, analysis_file) = match qmdl_store.new_entry().await {
|
let (qmdl_gz_file, analysis_file) = match qmdl_store.new_entry().await {
|
||||||
Ok(files) => files,
|
Ok(files) => files,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let msg = format!("failed creating QMDL file entry: {e}");
|
let msg = format!("failed creating QMDL file entry: {e}");
|
||||||
@@ -152,7 +152,7 @@ impl DiagTask {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.stop_current_recording().await;
|
self.stop_current_recording().await;
|
||||||
let qmdl_writer = QmdlWriter::new(qmdl_file);
|
let qmdl_writer = Box::new(QmdlWriter::new(qmdl_gz_file));
|
||||||
let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await
|
let analysis_writer = match AnalysisWriter::new(analysis_file, &self.analyzer_config).await
|
||||||
{
|
{
|
||||||
Ok(writer) => Box::new(writer),
|
Ok(writer) => Box::new(writer),
|
||||||
@@ -237,13 +237,23 @@ impl DiagTask {
|
|||||||
let mut state = DiagState::Stopped;
|
let mut state = DiagState::Stopped;
|
||||||
std::mem::swap(&mut self.state, &mut state);
|
std::mem::swap(&mut self.state, &mut state);
|
||||||
if let DiagState::Recording {
|
if let DiagState::Recording {
|
||||||
analysis_writer, ..
|
qmdl_writer,
|
||||||
|
analysis_writer,
|
||||||
|
..
|
||||||
} = state
|
} = state
|
||||||
{
|
{
|
||||||
analysis_writer
|
match (qmdl_writer.close().await, analysis_writer.close().await) {
|
||||||
.close()
|
(Ok(()), Ok(())) => {}
|
||||||
.await
|
(qmdl_result, analysis_result) => {
|
||||||
.expect("failed to close analysis writer");
|
if let Err(err) = qmdl_result {
|
||||||
|
error!("failed to close QmdlWriter: {:?}", err);
|
||||||
|
}
|
||||||
|
if let Err(err) = analysis_result {
|
||||||
|
error!("failed to close AnalysisWriter: {:?}", err);
|
||||||
|
}
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -313,15 +323,16 @@ impl DiagTask {
|
|||||||
self.stop(qmdl_store, Some(reason)).await;
|
self.stop(qmdl_store, Some(reason)).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if let Ok(file_size) = qmdl_writer.size().await {
|
||||||
debug!(
|
debug!(
|
||||||
"total QMDL bytes written: {}, updating manifest...",
|
"total QMDL bytes written: {}, updating manifest...",
|
||||||
qmdl_writer.total_written
|
file_size
|
||||||
);
|
);
|
||||||
let index = qmdl_store
|
let index = qmdl_store
|
||||||
.current_entry
|
.current_entry
|
||||||
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
|
||||||
if let Err(e) = qmdl_store
|
if let Err(e) = qmdl_store
|
||||||
.update_entry_qmdl_size(index, qmdl_writer.total_written)
|
.update_entry_qmdl_size(index, file_size)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
let reason = format!("failed to update manifest (disk full?): {e}");
|
let reason = format!("failed to update manifest (disk full?): {e}");
|
||||||
@@ -330,9 +341,10 @@ impl DiagTask {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
debug!("done!");
|
debug!("done!");
|
||||||
|
}
|
||||||
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
|
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
|
||||||
self.bytes_since_space_check += container_bytes;
|
self.bytes_since_space_check += container_bytes;
|
||||||
let max_type = match analysis_writer.analyze(container).await {
|
let max_type = match analysis_writer.analyze_container(container).await {
|
||||||
Ok(t) => t,
|
Ok(t) => t,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("failed to analyze container: {e}");
|
warn!("failed to analyze container: {e}");
|
||||||
|
|||||||
+7
-22
@@ -7,12 +7,11 @@ use axum::http::StatusCode;
|
|||||||
use axum::http::header::CONTENT_TYPE;
|
use axum::http::header::CONTENT_TYPE;
|
||||||
use axum::response::{IntoResponse, Response};
|
use axum::response::{IntoResponse, Response};
|
||||||
use log::error;
|
use log::error;
|
||||||
use rayhunter::diag::DataType;
|
|
||||||
use rayhunter::gsmtap_parser;
|
use rayhunter::gsmtap_parser;
|
||||||
use rayhunter::pcap::GsmtapPcapWriter;
|
use rayhunter::pcap::GsmtapPcapWriter;
|
||||||
use rayhunter::qmdl::QmdlReader;
|
use rayhunter::qmdl::QmdlMessageReader;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, duplex};
|
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, duplex};
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
|
|
||||||
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
|
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
|
||||||
@@ -51,17 +50,14 @@ pub async fn get_pcap(
|
|||||||
"QMDL file is empty, try again in a bit!".to_string(),
|
"QMDL file is empty, try again in a bit!".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
let qmdl_size_bytes = entry.qmdl_size_bytes;
|
let qmdl_reader = qmdl_store
|
||||||
let qmdl_file = qmdl_store
|
|
||||||
.open_entry_qmdl(entry_index)
|
.open_entry_qmdl(entry_index)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
|
||||||
// the QMDL reader should stop at the last successfully written data chunk
|
|
||||||
// (entry.size_bytes)
|
|
||||||
let (reader, writer) = duplex(1024);
|
let (reader, writer) = duplex(1024);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await {
|
if let Err(e) = generate_pcap_data(writer, qmdl_reader).await {
|
||||||
error!("failed to generate PCAP: {e:?}");
|
error!("failed to generate PCAP: {e:?}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -71,25 +67,15 @@ pub async fn get_pcap(
|
|||||||
Ok((headers, body).into_response())
|
Ok((headers, body).into_response())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn generate_pcap_data<R, W>(
|
pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlMessageReader<R>) -> Result<(), Error>
|
||||||
writer: W,
|
|
||||||
qmdl_file: R,
|
|
||||||
qmdl_size_bytes: usize,
|
|
||||||
) -> Result<(), Error>
|
|
||||||
where
|
where
|
||||||
W: AsyncWrite + Unpin + Send,
|
W: AsyncWrite + Unpin + Send,
|
||||||
R: AsyncRead + Unpin,
|
R: AsyncRead + AsyncSeek + Unpin,
|
||||||
{
|
{
|
||||||
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
|
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
|
||||||
pcap_writer.write_iface_header().await?;
|
pcap_writer.write_iface_header().await?;
|
||||||
|
|
||||||
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
|
while let Some(maybe_msg) = reader.get_next_message().await? {
|
||||||
while let Some(container) = reader.get_next_messages_container().await? {
|
|
||||||
if container.data_type != DataType::UserSpace {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for maybe_msg in container.into_messages() {
|
|
||||||
match maybe_msg {
|
match maybe_msg {
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
|
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
|
||||||
@@ -102,7 +88,6 @@ where
|
|||||||
Err(e) => error!("error parsing message: {e:?}"),
|
Err(e) => error!("error parsing message: {e:?}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use std::path::{Path, PathBuf};
|
|||||||
|
|
||||||
use chrono::{DateTime, Local};
|
use chrono::{DateTime, Local};
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
|
use rayhunter::qmdl::QmdlMessageReader;
|
||||||
use rayhunter::util::RuntimeMetadata;
|
use rayhunter::util::RuntimeMetadata;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
@@ -57,7 +58,6 @@ pub struct ManifestEntry {
|
|||||||
/// The system time when the last message was recorded to the file
|
/// The system time when the last message was recorded to the file
|
||||||
#[cfg_attr(feature = "apidocs", schema(value_type = String))]
|
#[cfg_attr(feature = "apidocs", schema(value_type = String))]
|
||||||
pub last_message_time: Option<DateTime<Local>>,
|
pub last_message_time: Option<DateTime<Local>>,
|
||||||
/// The size of the QMDL file in bytes
|
|
||||||
pub qmdl_size_bytes: usize,
|
pub qmdl_size_bytes: usize,
|
||||||
/// The rayhunter daemon version which generated the file
|
/// The rayhunter daemon version which generated the file
|
||||||
pub rayhunter_version: Option<String>,
|
pub rayhunter_version: Option<String>,
|
||||||
@@ -67,6 +67,8 @@ pub struct ManifestEntry {
|
|||||||
pub arch: Option<String>,
|
pub arch: Option<String>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub stop_reason: Option<String>,
|
pub stop_reason: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub compressed: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ManifestEntry {
|
impl ManifestEntry {
|
||||||
@@ -82,12 +84,17 @@ impl ManifestEntry {
|
|||||||
system_os: Some(metadata.system_os),
|
system_os: Some(metadata.system_os),
|
||||||
arch: Some(metadata.arch),
|
arch: Some(metadata.arch),
|
||||||
stop_reason: None,
|
stop_reason: None,
|
||||||
|
compressed: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
|
||||||
let mut filepath = path.as_ref().join(&self.name);
|
let mut filepath = path.as_ref().join(&self.name);
|
||||||
|
if self.compressed {
|
||||||
|
filepath.set_extension("qmdl.gz");
|
||||||
|
} else {
|
||||||
filepath.set_extension("qmdl");
|
filepath.set_extension("qmdl");
|
||||||
|
}
|
||||||
filepath
|
filepath
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,8 +160,9 @@ impl RecordingStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Does a best-effort attempt to recover the manifest from a directory of
|
// Does a best-effort attempt to recover the manifest from a directory of
|
||||||
// QMDL files. We expect these files to be named like "<timestamp>.qmdl",
|
// QMDL files. We expect these files to be named like "<timestamp>.qmdl"
|
||||||
// and skip any files which don't match that pattern.
|
// or "<timestamp>.qmdl.gz", and skip any files which don't match that
|
||||||
|
// pattern.
|
||||||
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
|
pub async fn recover<P>(path: P) -> Result<Self, RecordingStoreError>
|
||||||
where
|
where
|
||||||
P: AsRef<Path>,
|
P: AsRef<Path>,
|
||||||
@@ -174,11 +182,14 @@ impl RecordingStore {
|
|||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
if !filename.ends_with(".qmdl") {
|
let (stem, compressed) = if filename.ends_with(".qmdl") {
|
||||||
|
(filename.trim_end_matches(".qmdl"), false)
|
||||||
|
} else if filename.ends_with(".qmdl.gz") {
|
||||||
|
(filename.trim_end_matches(".qmdl.gz"), true)
|
||||||
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
};
|
||||||
|
|
||||||
let stem = filename.trim_end_matches(".qmdl");
|
|
||||||
let Ok(start_timestamp) = stem.parse::<i64>() else {
|
let Ok(start_timestamp) = stem.parse::<i64>() else {
|
||||||
warn!("QMDL file has invalid name {os_filename:?}, skipping");
|
warn!("QMDL file has invalid name {os_filename:?}, skipping");
|
||||||
continue;
|
continue;
|
||||||
@@ -205,6 +216,7 @@ impl RecordingStore {
|
|||||||
info!("successfully recovered QMDL entry {os_filename:?}!");
|
info!("successfully recovered QMDL entry {os_filename:?}!");
|
||||||
manifest_entries.push(ManifestEntry {
|
manifest_entries.push(ManifestEntry {
|
||||||
name: stem.to_string(),
|
name: stem.to_string(),
|
||||||
|
compressed,
|
||||||
start_time: start_time.into(),
|
start_time: start_time.into(),
|
||||||
last_message_time: Some(last_message_time.into()),
|
last_message_time: Some(last_message_time.into()),
|
||||||
qmdl_size_bytes: metadata.size() as usize,
|
qmdl_size_bytes: metadata.size() as usize,
|
||||||
@@ -265,9 +277,15 @@ impl RecordingStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Returns the corresponding QMDL file for a given entry
|
// Returns the corresponding QMDL file for a given entry
|
||||||
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> {
|
pub async fn open_entry_qmdl(
|
||||||
|
&self,
|
||||||
|
entry_index: usize,
|
||||||
|
) -> Result<QmdlMessageReader<File>, RecordingStoreError> {
|
||||||
let entry = &self.manifest.entries[entry_index];
|
let entry = &self.manifest.entries[entry_index];
|
||||||
File::open(entry.get_qmdl_filepath(&self.path))
|
let file = File::open(entry.get_qmdl_filepath(&self.path))
|
||||||
|
.await
|
||||||
|
.map_err(RecordingStoreError::ReadFileError)?;
|
||||||
|
QmdlMessageReader::new(file)
|
||||||
.await
|
.await
|
||||||
.map_err(RecordingStoreError::ReadFileError)
|
.map_err(RecordingStoreError::ReadFileError)
|
||||||
}
|
}
|
||||||
@@ -490,7 +508,10 @@ mod tests {
|
|||||||
.entry_for_name(&store.manifest.entries[entry_index].name)
|
.entry_for_name(&store.manifest.entries[entry_index].name)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(entry.last_message_time.is_some());
|
assert!(entry.last_message_time.is_some());
|
||||||
assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000);
|
assert_eq!(
|
||||||
|
store.manifest.entries[entry_index].qmdl_size_bytes,
|
||||||
|
1000
|
||||||
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
RecordingStore::read_manifest(dir.path()).await.unwrap(),
|
RecordingStore::read_manifest(dir.path()).await.unwrap(),
|
||||||
store.manifest
|
store.manifest
|
||||||
|
|||||||
+117
-52
@@ -14,7 +14,8 @@ use log::{error, warn};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::fs::write;
|
use tokio::fs::write;
|
||||||
use tokio::io::{AsyncReadExt, copy, duplex};
|
use tokio::io::copy;
|
||||||
|
use tokio::io::duplex;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||||
@@ -64,7 +65,7 @@ pub async fn get_qmdl(
|
|||||||
StatusCode::NOT_FOUND,
|
StatusCode::NOT_FOUND,
|
||||||
format!("couldn't find qmdl file with name {qmdl_idx}"),
|
format!("couldn't find qmdl file with name {qmdl_idx}"),
|
||||||
))?;
|
))?;
|
||||||
let qmdl_file = qmdl_store
|
let qmdl_reader = qmdl_store
|
||||||
.open_entry_qmdl(entry_index)
|
.open_entry_qmdl(entry_index)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
@@ -73,14 +74,15 @@ pub async fn get_qmdl(
|
|||||||
format!("error opening QMDL file: {err}"),
|
format!("error opening QMDL file: {err}"),
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
|
|
||||||
let qmdl_stream = ReaderStream::new(limited_qmdl_file);
|
|
||||||
|
|
||||||
let headers = [
|
let headers = [
|
||||||
(CONTENT_TYPE, "application/octet-stream"),
|
(CONTENT_TYPE, "application/octet-stream"),
|
||||||
(CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()),
|
(
|
||||||
|
CONTENT_LENGTH,
|
||||||
|
&entry.qmdl_size_bytes.to_string(),
|
||||||
|
),
|
||||||
];
|
];
|
||||||
let body = Body::from_stream(qmdl_stream);
|
let body = Body::from_stream(qmdl_reader.into_qmdl_stream());
|
||||||
Ok((headers, body).into_response())
|
Ok((headers, body).into_response())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -308,7 +310,7 @@ pub async fn get_zip(
|
|||||||
Path(entry_name): Path<String>,
|
Path(entry_name): Path<String>,
|
||||||
) -> Result<Response, (StatusCode, String)> {
|
) -> Result<Response, (StatusCode, String)> {
|
||||||
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
|
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
|
||||||
let (entry_index, qmdl_size_bytes) = {
|
let (entry_index, _) = {
|
||||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||||
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
|
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
|
||||||
StatusCode::NOT_FOUND,
|
StatusCode::NOT_FOUND,
|
||||||
@@ -322,7 +324,7 @@ pub async fn get_zip(
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
(entry_index, entry.qmdl_size_bytes)
|
(entry_index, entry.compressed)
|
||||||
};
|
};
|
||||||
|
|
||||||
let qmdl_store_lock = state.qmdl_store_lock.clone();
|
let qmdl_store_lock = state.qmdl_store_lock.clone();
|
||||||
@@ -335,22 +337,18 @@ pub async fn get_zip(
|
|||||||
|
|
||||||
// Add QMDL file
|
// Add QMDL file
|
||||||
{
|
{
|
||||||
let entry =
|
let entry = ZipEntryBuilder::new(
|
||||||
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored);
|
format!("{qmdl_idx}.qmdl.gz").into(),
|
||||||
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
|
Compression::Stored,
|
||||||
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
|
);
|
||||||
// once https://github.com/Majored/rs-async-zip/pull/160 is released.
|
// FuturesAsyncWriteCompatExt::compat_write because async-zip's
|
||||||
|
// entrystream does not impl tokio's AsyncWrite, but only
|
||||||
|
// future's AsyncWrite. This can be removed once
|
||||||
|
// https://github.com/Majored/rs-async-zip/pull/160 is released.
|
||||||
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
||||||
|
|
||||||
let mut qmdl_file = {
|
|
||||||
let qmdl_store = qmdl_store_lock.read().await;
|
let qmdl_store = qmdl_store_lock.read().await;
|
||||||
qmdl_store
|
let mut qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
|
||||||
.open_entry_qmdl(entry_index)
|
copy(&mut qmdl_reader, &mut entry_writer).await?;
|
||||||
.await?
|
|
||||||
.take(qmdl_size_bytes as u64)
|
|
||||||
};
|
|
||||||
|
|
||||||
copy(&mut qmdl_file, &mut entry_writer).await?;
|
|
||||||
entry_writer.into_inner().close().await?;
|
entry_writer.into_inner().close().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -360,17 +358,10 @@ pub async fn get_zip(
|
|||||||
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
|
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
|
||||||
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
||||||
|
|
||||||
let qmdl_file_for_pcap = {
|
|
||||||
let qmdl_store = qmdl_store_lock.read().await;
|
let qmdl_store = qmdl_store_lock.read().await;
|
||||||
qmdl_store
|
let qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
|
||||||
.open_entry_qmdl(entry_index)
|
|
||||||
.await?
|
|
||||||
.take(qmdl_size_bytes as u64)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) =
|
if let Err(e) = generate_pcap_data(&mut entry_writer, qmdl_reader).await {
|
||||||
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
|
|
||||||
{
|
|
||||||
// if we fail to generate the PCAP file, we should still continue and give the
|
// if we fail to generate the PCAP file, we should still continue and give the
|
||||||
// user the QMDL.
|
// user the QMDL.
|
||||||
error!("Failed to generate PCAP: {e:?}");
|
error!("Failed to generate PCAP: {e:?}");
|
||||||
@@ -434,9 +425,13 @@ pub async fn debug_set_display_state(
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::io::Cursor;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use async_zip::base::read::mem::ZipFileReader;
|
use async_zip::base::read::mem::ZipFileReader;
|
||||||
use axum::extract::{Path, State};
|
use axum::extract::{Path, State};
|
||||||
|
use futures::AsyncReadExt;
|
||||||
|
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
|
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
|
||||||
@@ -450,24 +445,24 @@ mod tests {
|
|||||||
|
|
||||||
async fn create_test_entry_with_data(
|
async fn create_test_entry_with_data(
|
||||||
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
|
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
|
||||||
test_data: &[u8],
|
test_data: &MessagesContainer,
|
||||||
) -> String {
|
) -> String {
|
||||||
let entry_name = {
|
let entry_name = {
|
||||||
let mut store = store_lock.write().await;
|
let mut store = store_lock.write().await;
|
||||||
let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap();
|
let (mut qmdl_gz_file, _analysis_file) = store.new_entry().await.unwrap();
|
||||||
|
|
||||||
if !test_data.is_empty() {
|
let mut writer = QmdlWriter::new(&mut qmdl_gz_file);
|
||||||
use tokio::io::AsyncWriteExt;
|
writer.write_container(test_data).await.unwrap();
|
||||||
qmdl_file.write_all(test_data).await.unwrap();
|
writer.close().await.unwrap();
|
||||||
qmdl_file.flush().await.unwrap();
|
|
||||||
}
|
let qmdl_file_size = qmdl_gz_file.metadata().await.unwrap().len() as usize;
|
||||||
|
|
||||||
let current_entry = store.current_entry.unwrap();
|
let current_entry = store.current_entry.unwrap();
|
||||||
let entry = &store.manifest.entries[current_entry];
|
let entry = &store.manifest.entries[current_entry];
|
||||||
let entry_name = entry.name.clone();
|
let entry_name = entry.name.clone();
|
||||||
|
|
||||||
store
|
store
|
||||||
.update_entry_qmdl_size(current_entry, test_data.len())
|
.update_entry_qmdl_size(current_entry, qmdl_file_size)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
entry_name
|
entry_name
|
||||||
@@ -501,17 +496,69 @@ mod tests {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// valid HDLC encapsulated diag message generated from
|
||||||
|
// rayhunter::diag::test::get_test_message
|
||||||
|
fn create_test_container() -> MessagesContainer {
|
||||||
|
MessagesContainer {
|
||||||
|
data_type: DataType::UserSpace,
|
||||||
|
num_messages: 1,
|
||||||
|
messages: vec![
|
||||||
|
HdlcEncapsulatedMessage {
|
||||||
|
len: 39,
|
||||||
|
data: vec![
|
||||||
|
16,
|
||||||
|
0,
|
||||||
|
32,
|
||||||
|
0,
|
||||||
|
32,
|
||||||
|
0,
|
||||||
|
192,
|
||||||
|
176,
|
||||||
|
26,
|
||||||
|
165,
|
||||||
|
245,
|
||||||
|
135,
|
||||||
|
118,
|
||||||
|
35,
|
||||||
|
2,
|
||||||
|
1,
|
||||||
|
20,
|
||||||
|
14,
|
||||||
|
48,
|
||||||
|
0,
|
||||||
|
160,
|
||||||
|
0,
|
||||||
|
2,
|
||||||
|
8,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
217,
|
||||||
|
15,
|
||||||
|
5,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
10,
|
||||||
|
13,
|
||||||
|
196,
|
||||||
|
126,
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_get_zip_success() {
|
async fn test_get_zip_success() {
|
||||||
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
|
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
|
||||||
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E];
|
let test_qmdl_data = create_test_container();
|
||||||
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
|
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
|
||||||
let state = create_test_server_state(store_lock);
|
let state = create_test_server_state(store_lock);
|
||||||
|
|
||||||
let result = get_zip(State(state), Path(entry_name.clone())).await;
|
let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap();
|
||||||
|
|
||||||
assert!(result.is_ok());
|
|
||||||
let response = result.unwrap();
|
|
||||||
|
|
||||||
let headers = response.headers();
|
let headers = response.headers();
|
||||||
assert_eq!(headers.get("content-type").unwrap(), "application/zip");
|
assert_eq!(headers.get("content-type").unwrap(), "application/zip");
|
||||||
@@ -520,17 +567,35 @@ mod tests {
|
|||||||
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
|
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
|
||||||
|
|
||||||
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
|
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
|
||||||
|
let zip_reader_file = zip_reader.file();
|
||||||
let filenames = zip_reader
|
// sanity check that our ZIP has no empty files
|
||||||
.file()
|
for entry in zip_reader_file.entries() {
|
||||||
.entries()
|
assert_ne!(entry.uncompressed_size(), 0);
|
||||||
|
}
|
||||||
|
let filenames: Vec<String> = zip_reader_file.entries()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|entry| entry.filename().as_str().unwrap().to_owned())
|
.map(|entry| entry.filename().as_str().unwrap().to_string())
|
||||||
.collect::<Vec<String>>();
|
.collect();
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
filenames,
|
filenames,
|
||||||
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),]
|
vec![
|
||||||
|
format!("{entry_name}.qmdl.gz"),
|
||||||
|
format!("{entry_name}.pcapng"),
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut qmdl_body = Vec::with_capacity(128);
|
||||||
|
zip_reader.reader_without_entry(0)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.read_to_end(&mut qmdl_body)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap();
|
||||||
|
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
qmdl_reader.get_next_message().await.unwrap(),
|
||||||
|
Some(Ok(expected_message)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,5 +30,6 @@ serde = { version = "1.0.197", features = ["derive"] }
|
|||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
num_enum = "0.7.4"
|
num_enum = "0.7.4"
|
||||||
utoipa = { version = "5.4.0", optional = true }
|
utoipa = { version = "5.4.0", optional = true }
|
||||||
|
async-compression = { version = "0.4.41", features = ["tokio", "gzip"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
|
||||||
use crate::analysis::diagnostic::DiagnosticAnalyzer;
|
use crate::analysis::diagnostic::DiagnosticAnalyzer;
|
||||||
|
use crate::diag::{DiagParsingError, Message};
|
||||||
use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType};
|
use crate::gsmtap::{GsmtapHeader, GsmtapMessage, GsmtapType};
|
||||||
use crate::util::RuntimeMetadata;
|
use crate::util::RuntimeMetadata;
|
||||||
use crate::{diag::MessagesContainer, gsmtap_parser};
|
use crate::{diag::MessagesContainer, gsmtap_parser};
|
||||||
@@ -231,6 +232,14 @@ pub struct AnalysisRow {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl AnalysisRow {
|
impl AnalysisRow {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
packet_timestamp: None,
|
||||||
|
skipped_message_reason: None,
|
||||||
|
events: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
self.skipped_message_reason.is_none() && !self.contains_warnings()
|
self.skipped_message_reason.is_none() && !self.contains_warnings()
|
||||||
}
|
}
|
||||||
@@ -412,36 +421,27 @@ impl Harness {
|
|||||||
row
|
row
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
|
pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result<Message, DiagParsingError>) -> AnalysisRow {
|
||||||
let mut rows = Vec::new();
|
let mut row = AnalysisRow::new();
|
||||||
for maybe_qmdl_message in container.into_messages() {
|
|
||||||
self.packet_num += 1;
|
self.packet_num += 1;
|
||||||
|
|
||||||
rows.push(AnalysisRow {
|
|
||||||
packet_timestamp: None,
|
|
||||||
skipped_message_reason: None,
|
|
||||||
events: Vec::new(),
|
|
||||||
});
|
|
||||||
// unwrap is safe here since we just pushed a value
|
|
||||||
let row = rows.last_mut().unwrap();
|
|
||||||
let qmdl_message = match maybe_qmdl_message {
|
let qmdl_message = match maybe_qmdl_message {
|
||||||
Ok(msg) => msg,
|
Ok(msg) => msg,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
row.skipped_message_reason = Some(format!("{err:?}"));
|
row.skipped_message_reason = Some(format!("{err:?}"));
|
||||||
continue;
|
return row;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let gsmtap_message = match gsmtap_parser::parse(qmdl_message) {
|
let gsmtap_message = match gsmtap_parser::parse(qmdl_message) {
|
||||||
Ok(msg) => msg,
|
Ok(msg) => msg,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
row.skipped_message_reason = Some(format!("{err:?}"));
|
row.skipped_message_reason = Some(format!("{err:?}"));
|
||||||
continue;
|
return row;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
|
let Some((timestamp, gsmtap_msg)) = gsmtap_message else {
|
||||||
continue;
|
return row;
|
||||||
};
|
};
|
||||||
row.packet_timestamp = Some(timestamp.to_datetime());
|
row.packet_timestamp = Some(timestamp.to_datetime());
|
||||||
|
|
||||||
@@ -449,13 +449,19 @@ impl Harness {
|
|||||||
Ok(element) => element,
|
Ok(element) => element,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
row.skipped_message_reason = Some(format!("{err:?}"));
|
row.skipped_message_reason = Some(format!("{err:?}"));
|
||||||
continue;
|
return row;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
row.events = self.analyze_information_element(&element);
|
row.events = self.analyze_information_element(&element);
|
||||||
|
row
|
||||||
}
|
}
|
||||||
rows
|
|
||||||
|
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
|
||||||
|
container.into_messages()
|
||||||
|
.drain(..)
|
||||||
|
.map(|maybe_message| self.analyze_qmdl_message(maybe_message))
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> {
|
fn analyze_information_element(&mut self, ie: &InformationElement) -> Vec<Option<Event>> {
|
||||||
|
|||||||
+35
-20
@@ -1,5 +1,6 @@
|
|||||||
//! Diag protocol serialization/deserialization
|
//! Diag protocol serialization/deserialization
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
use chrono::{DateTime, FixedOffset};
|
use chrono::{DateTime, FixedOffset};
|
||||||
use crc::{Algorithm, Crc};
|
use crc::{Algorithm, Crc};
|
||||||
use deku::prelude::*;
|
use deku::prelude::*;
|
||||||
@@ -89,30 +90,19 @@ impl MessagesContainer {
|
|||||||
let mut result = Vec::new();
|
let mut result = Vec::new();
|
||||||
for msg in self.messages {
|
for msg in self.messages {
|
||||||
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
|
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
|
||||||
match hdlc_decapsulate(sub_msg, &CRC_CCITT) {
|
result.push(Message::from_hdlc(sub_msg));
|
||||||
Ok(data) => match Message::from_bytes((&data, 0)) {
|
|
||||||
Ok(((leftover_bytes, _), res)) => {
|
|
||||||
if !leftover_bytes.is_empty() {
|
|
||||||
warn!(
|
|
||||||
"warning: {} leftover bytes when parsing Message",
|
|
||||||
leftover_bytes.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
result.push(Ok(res));
|
|
||||||
}
|
|
||||||
Err(e) => result.push(Err(DiagParsingError::MessageParsingError(e, data))),
|
|
||||||
},
|
|
||||||
Err(err) => result.push(Err(DiagParsingError::HdlcDecapsulationError(
|
|
||||||
err,
|
|
||||||
sub_msg.to_vec(),
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<MessagesContainer> for Bytes {
|
||||||
|
fn from(value: MessagesContainer) -> Self {
|
||||||
|
value.to_bytes().unwrap().into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
|
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
|
||||||
pub struct HdlcEncapsulatedMessage {
|
pub struct HdlcEncapsulatedMessage {
|
||||||
pub len: u32,
|
pub len: u32,
|
||||||
@@ -152,6 +142,29 @@ pub enum Message {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Message {
|
||||||
|
pub fn from_hdlc(data: &[u8]) -> Result<Message, DiagParsingError> {
|
||||||
|
match hdlc_decapsulate(data, &CRC_CCITT) {
|
||||||
|
Ok(data) => match Message::from_bytes((&data, 0)) {
|
||||||
|
Ok(((leftover_bytes, _), res)) => {
|
||||||
|
if !leftover_bytes.is_empty() {
|
||||||
|
warn!(
|
||||||
|
"warning: {} leftover bytes when parsing Message",
|
||||||
|
leftover_bytes.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
Err(e) => Err(DiagParsingError::MessageParsingError(e, data)),
|
||||||
|
},
|
||||||
|
Err(err) => Err(DiagParsingError::HdlcDecapsulationError(
|
||||||
|
err,
|
||||||
|
data.to_vec(),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
|
#[derive(Debug, Clone, PartialEq, DekuRead, DekuWrite)]
|
||||||
#[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")]
|
#[deku(ctx = "log_type: u16, hdr_len: u16", id = "log_type")]
|
||||||
pub enum LogBody {
|
pub enum LogBody {
|
||||||
@@ -411,7 +424,7 @@ pub fn build_log_mask_request(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
pub(crate) mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
// Just about all of these test cases from manually parsing diag packets w/ QCSuper
|
// Just about all of these test cases from manually parsing diag packets w/ QCSuper
|
||||||
@@ -525,7 +538,7 @@ mod test {
|
|||||||
// this log is based on one captured on a real device -- if it fails to
|
// this log is based on one captured on a real device -- if it fails to
|
||||||
// serialize or deserialize, that's probably a problem with this mock, not
|
// serialize or deserialize, that's probably a problem with this mock, not
|
||||||
// the DiagReader implementation
|
// the DiagReader implementation
|
||||||
fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
|
pub fn get_test_message(payload: &[u8]) -> (HdlcEncapsulatedMessage, Message) {
|
||||||
let length_with_payload = 31 + payload.len() as u16;
|
let length_with_payload = 31 + payload.len() as u16;
|
||||||
let message = Message::Log {
|
let message = Message::Log {
|
||||||
pending_msgs: 0,
|
pending_msgs: 0,
|
||||||
@@ -559,6 +572,8 @@ mod test {
|
|||||||
len: encapsulated_data.len() as u32,
|
len: encapsulated_data.len() as u32,
|
||||||
data: encapsulated_data,
|
data: encapsulated_data,
|
||||||
};
|
};
|
||||||
|
// sanity check
|
||||||
|
assert_eq!(&Message::from_hdlc(&encapsulated.data).unwrap(), &message);
|
||||||
(encapsulated, message)
|
(encapsulated, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+294
-138
@@ -3,109 +3,212 @@
|
|||||||
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
|
//! QmdlReader and QmdlWriter can read and write MessagesContainers to and from
|
||||||
//! QMDL files.
|
//! QMDL files.
|
||||||
|
|
||||||
use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer};
|
use std::io::ErrorKind;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
|
use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContainer};
|
||||||
|
|
||||||
|
use async_compression::tokio::bufread::GzipDecoder;
|
||||||
|
use async_compression::tokio::write::GzipEncoder;
|
||||||
use futures::TryStream;
|
use futures::TryStream;
|
||||||
use log::error;
|
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader};
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
|
|
||||||
|
const GZIP_MAGIC_NUMBER: u16 = 0x1f8b;
|
||||||
|
|
||||||
pub struct QmdlWriter<T>
|
pub struct QmdlWriter<T>
|
||||||
where
|
where
|
||||||
T: AsyncWrite + Unpin,
|
T: AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
writer: T,
|
writer: GzipEncoder<T>,
|
||||||
pub total_written: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> QmdlWriter<T>
|
impl<T> QmdlWriter<T>
|
||||||
where
|
where
|
||||||
T: AsyncWrite + Unpin,
|
T: AsyncWrite + AsyncSeek + Unpin,
|
||||||
{
|
{
|
||||||
pub fn new(writer: T) -> Self {
|
pub fn new(writer: T) -> Self {
|
||||||
QmdlWriter::new_with_existing_size(writer, 0)
|
let gzip_writer = GzipEncoder::new(writer);
|
||||||
|
QmdlWriter {
|
||||||
|
writer: gzip_writer,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_existing_size(writer: T, existing_size: usize) -> Self {
|
pub async fn size(&mut self) -> std::io::Result<usize> {
|
||||||
QmdlWriter {
|
let size = self.writer.get_mut().stream_position().await?;
|
||||||
writer,
|
Ok(size as usize)
|
||||||
total_written: existing_size,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
|
pub async fn write_container(&mut self, container: &MessagesContainer) -> std::io::Result<()> {
|
||||||
for msg in &container.messages {
|
for msg in &container.messages {
|
||||||
self.writer.write_all(&msg.data).await?;
|
self.writer.write_all(&msg.data).await?;
|
||||||
self.total_written += msg.data.len();
|
self.writer.flush().await?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn close(mut self) -> std::io::Result<()> {
|
||||||
|
self.writer.shutdown().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct QmdlReader<T>
|
#[derive(Debug)]
|
||||||
|
enum QmdlReaderSource<T> {
|
||||||
|
Compressed {
|
||||||
|
reader: GzipDecoder<BufReader<T>>,
|
||||||
|
eof: bool,
|
||||||
|
},
|
||||||
|
Uncompressed {
|
||||||
|
reader: T,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct QmdlAsyncReader<T> {
|
||||||
|
source: QmdlReaderSource<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> QmdlAsyncReader<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead,
|
T: AsyncRead,
|
||||||
{
|
{
|
||||||
reader: BufReader<T>,
|
pub fn new(reader: T, compressed: bool) -> Self {
|
||||||
bytes_read: usize,
|
let source = if compressed {
|
||||||
max_bytes: Option<usize>,
|
QmdlReaderSource::Compressed {
|
||||||
|
reader: GzipDecoder::new(BufReader::new(reader)),
|
||||||
|
eof: false,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
QmdlReaderSource::Uncompressed { reader }
|
||||||
|
};
|
||||||
|
Self {
|
||||||
|
source,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> QmdlReader<T>
|
impl<T> AsyncRead for QmdlAsyncReader<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + Unpin,
|
T: AsyncRead + Unpin,
|
||||||
{
|
{
|
||||||
pub fn new(reader: T, max_bytes: Option<usize>) -> Self {
|
fn poll_read(
|
||||||
QmdlReader {
|
self: Pin<&mut Self>,
|
||||||
reader: BufReader::new(reader),
|
cx: &mut std::task::Context<'_>,
|
||||||
bytes_read: 0,
|
buf: &mut tokio::io::ReadBuf<'_>,
|
||||||
max_bytes,
|
) -> Poll<std::io::Result<()>> {
|
||||||
|
let res = match &mut self.get_mut().source {
|
||||||
|
QmdlReaderSource::Compressed { reader, eof } => {
|
||||||
|
// if we already determined we've reached the Gzip EOF, don't read more
|
||||||
|
if *eof {
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
|
||||||
|
match Pin::new(reader).poll_read(cx, buf) {
|
||||||
|
// if we hit an unexpected EOF in a Gzip file, it shouldn't
|
||||||
|
// be considered fatal, just a truncated file. mark that
|
||||||
|
// we're done and return the result as usual
|
||||||
|
Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => {
|
||||||
|
*eof = true;
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
res => res,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf),
|
||||||
|
};
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn as_stream(
|
#[derive(Debug)]
|
||||||
&mut self,
|
pub struct QmdlMessageReader<T>
|
||||||
) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> + '_ {
|
where
|
||||||
futures::stream::try_unfold(self, |reader| async {
|
T: AsyncRead,
|
||||||
let maybe_container = reader.get_next_messages_container().await?;
|
{
|
||||||
match maybe_container {
|
buf_reader: BufReader<QmdlAsyncReader<T>>,
|
||||||
Some(container) => Ok(Some((container, reader))),
|
}
|
||||||
|
|
||||||
|
async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool>
|
||||||
|
where
|
||||||
|
T: AsyncRead + AsyncSeek + Unpin
|
||||||
|
{
|
||||||
|
let magic_number = reader.read_u16().await?;
|
||||||
|
reader.rewind().await?;
|
||||||
|
// this is safe because 0x1f8b.... doesn't overlap with any known
|
||||||
|
// diag::DataType values
|
||||||
|
Ok(magic_number == GZIP_MAGIC_NUMBER)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> QmdlMessageReader<T>
|
||||||
|
where
|
||||||
|
T: AsyncRead + AsyncSeek + Unpin,
|
||||||
|
{
|
||||||
|
pub async fn new(mut reader: T) -> std::io::Result<Self> {
|
||||||
|
let compressed = is_gzip_stream(&mut reader)
|
||||||
|
.await
|
||||||
|
.unwrap_or(false);
|
||||||
|
Ok(QmdlMessageReader {
|
||||||
|
buf_reader: BufReader::new(QmdlAsyncReader::new(
|
||||||
|
reader,
|
||||||
|
compressed,
|
||||||
|
)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_compressed(&self) -> bool {
|
||||||
|
matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
|
||||||
|
futures::stream::try_unfold(self, |mut reader| async {
|
||||||
|
let mut buf = vec![];
|
||||||
|
match reader .buf_reader
|
||||||
|
.read_until(MESSAGE_TERMINATOR, &mut buf)
|
||||||
|
.await {
|
||||||
|
Err(err) => Err(err),
|
||||||
|
Ok(0) => Ok(None),
|
||||||
|
Ok(_) => Ok(Some((buf, reader))),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
|
||||||
|
futures::stream::try_unfold(self, |mut reader| async {
|
||||||
|
match reader.get_next_message().await? {
|
||||||
|
Some(res) => Ok(Some((res, reader))),
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_next_messages_container(
|
pub async fn get_next_message(
|
||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<Option<MessagesContainer>, std::io::Error> {
|
) -> Result<Option<Result<Message, DiagParsingError>>, std::io::Error> {
|
||||||
if let Some(max_bytes) = self.max_bytes
|
let mut buf = vec![];
|
||||||
&& self.bytes_read >= max_bytes
|
if self
|
||||||
|
.buf_reader
|
||||||
|
.read_until(MESSAGE_TERMINATOR, &mut buf)
|
||||||
|
.await?
|
||||||
|
== 0
|
||||||
{
|
{
|
||||||
if self.bytes_read > max_bytes {
|
|
||||||
error!(
|
|
||||||
"warning: {} bytes read, but max_bytes was {}",
|
|
||||||
self.bytes_read, max_bytes
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut buf = Vec::new();
|
Ok(Some(Message::from_hdlc(&buf)))
|
||||||
let bytes_read = self.reader.read_until(MESSAGE_TERMINATOR, &mut buf).await?;
|
}
|
||||||
self.bytes_read += bytes_read;
|
}
|
||||||
|
|
||||||
// Since QMDL is just a flat list of messages, we can't actually
|
impl<T> AsyncRead for QmdlMessageReader<T>
|
||||||
// reproduce the container structure they came from in the original
|
where
|
||||||
// read. So we'll just pretend that all containers had exactly one
|
T: AsyncRead + Unpin,
|
||||||
// message. As far as I know, the number of messages per container
|
{
|
||||||
// doesn't actually affect anything, so this should be fine.
|
fn poll_read(
|
||||||
Ok(Some(MessagesContainer {
|
self: Pin<&mut Self>,
|
||||||
data_type: DataType::UserSpace,
|
cx: &mut std::task::Context<'_>,
|
||||||
num_messages: 1,
|
buf: &mut tokio::io::ReadBuf<'_>,
|
||||||
messages: vec![HdlcEncapsulatedMessage {
|
) -> Poll<std::io::Result<()>> {
|
||||||
len: bytes_read as u32,
|
Pin::new(&mut self.get_mut().buf_reader).poll_read(cx, buf)
|
||||||
data: buf,
|
|
||||||
}],
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,132 +216,185 @@ where
|
|||||||
mod test {
|
mod test {
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
use crate::diag::CRC_CCITT;
|
use crate::diag::{DataType, HdlcEncapsulatedMessage, test::get_test_message};
|
||||||
use crate::hdlc::hdlc_encapsulate;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn get_test_messages() -> Vec<HdlcEncapsulatedMessage> {
|
fn get_test_messages() -> (Vec<HdlcEncapsulatedMessage>, Vec<Message>) {
|
||||||
let messages: Vec<HdlcEncapsulatedMessage> = (10..20)
|
let mut hdlcs = Vec::new();
|
||||||
.map(|i| {
|
let mut messages = Vec::new();
|
||||||
let data = hdlc_encapsulate(&vec![i as u8; i], &CRC_CCITT);
|
for i in 10..20 {
|
||||||
HdlcEncapsulatedMessage {
|
let (hdlc, msg) = get_test_message(&[i]);
|
||||||
len: data.len() as u32,
|
hdlcs.push(hdlc);
|
||||||
data,
|
messages.push(msg);
|
||||||
}
|
}
|
||||||
})
|
(hdlcs, messages)
|
||||||
.collect();
|
|
||||||
messages
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns a byte array consisting of concatenated HDLC encapsulated
|
// returns a byte array consisting of concatenated HDLC encapsulated
|
||||||
// test messages
|
// test messages
|
||||||
fn get_test_message_bytes() -> Vec<u8> {
|
fn get_test_message_bytes() -> Vec<u8> {
|
||||||
get_test_messages()
|
let (hdlcs, _) = get_test_messages();
|
||||||
|
hdlcs
|
||||||
.iter()
|
.iter()
|
||||||
.flat_map(|msg| msg.data.clone())
|
.flat_map(|msg| msg.data.clone())
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_test_containers() -> Vec<MessagesContainer> {
|
fn get_test_containers() -> Vec<MessagesContainer> {
|
||||||
let messages = get_test_messages();
|
let (hdlcs, _) = get_test_messages();
|
||||||
let (messages1, messages2) = messages.split_at(5);
|
let (hdlcs1, hdlcs2) = hdlcs.split_at(5);
|
||||||
vec![
|
vec![
|
||||||
MessagesContainer {
|
MessagesContainer {
|
||||||
data_type: DataType::UserSpace,
|
data_type: DataType::UserSpace,
|
||||||
num_messages: messages1.len() as u32,
|
num_messages: hdlcs1.len() as u32,
|
||||||
messages: messages1.to_vec(),
|
messages: hdlcs1.to_vec(),
|
||||||
},
|
},
|
||||||
MessagesContainer {
|
MessagesContainer {
|
||||||
data_type: DataType::UserSpace,
|
data_type: DataType::UserSpace,
|
||||||
num_messages: messages2.len() as u32,
|
num_messages: hdlcs2.len() as u32,
|
||||||
messages: messages2.to_vec(),
|
messages: hdlcs2.to_vec(),
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_unbounded_qmdl_reader() {
|
async fn test_qmdl_reader() {
|
||||||
let mut buf = Cursor::new(get_test_message_bytes());
|
let mut buf = Cursor::new(get_test_message_bytes());
|
||||||
let mut reader = QmdlReader::new(&mut buf, None);
|
let mut reader = QmdlMessageReader::new(&mut buf).await.unwrap();
|
||||||
let expected_messages = get_test_messages();
|
assert!(!reader.is_compressed());
|
||||||
for message in expected_messages {
|
let (_, expected_messages) = get_test_messages();
|
||||||
let expected_container = MessagesContainer {
|
for msg in expected_messages {
|
||||||
data_type: DataType::UserSpace,
|
|
||||||
num_messages: 1,
|
|
||||||
messages: vec![message],
|
|
||||||
};
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
expected_container,
|
Ok(msg),
|
||||||
reader.get_next_messages_container().await.unwrap().unwrap()
|
reader.get_next_message().await.unwrap().unwrap()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_bounded_qmdl_reader() {
|
async fn test_truncation() {
|
||||||
let mut buf = Cursor::new(get_test_message_bytes());
|
run_truncation_tests(false).await;
|
||||||
|
}
|
||||||
|
|
||||||
// bound the reader to the first two messages
|
#[tokio::test]
|
||||||
let mut expected_messages = get_test_messages();
|
async fn test_compressed_truncation() {
|
||||||
let limit = expected_messages[0].len + expected_messages[1].len;
|
run_truncation_tests(true).await;
|
||||||
|
}
|
||||||
|
|
||||||
let mut reader = QmdlReader::new(&mut buf, Some(limit as usize));
|
async fn run_truncation_tests(compressed: bool) {
|
||||||
for message in expected_messages.drain(0..2) {
|
let (hdlcs, expected_messages) = get_test_messages();
|
||||||
let expected_container = MessagesContainer {
|
let (bytes, message_lengths): (Vec<u8>, Vec<usize>) = if compressed {
|
||||||
data_type: DataType::UserSpace,
|
let mut buf = Vec::new();
|
||||||
num_messages: 1,
|
let mut compressed_lengths = Vec::new();
|
||||||
messages: vec![message],
|
let mut writer = GzipEncoder::new(&mut buf);
|
||||||
|
for hdlc in &hdlcs {
|
||||||
|
let before = writer.get_ref().len();
|
||||||
|
writer.write_all(&hdlc.data).await.unwrap();
|
||||||
|
writer.flush().await.unwrap();
|
||||||
|
let after = writer.get_ref().len();
|
||||||
|
compressed_lengths.push(after - before);
|
||||||
|
}
|
||||||
|
(buf, compressed_lengths)
|
||||||
|
} else {
|
||||||
|
(
|
||||||
|
get_test_message_bytes(),
|
||||||
|
hdlcs.iter()
|
||||||
|
.map(|hdlc| hdlc.data.len())
|
||||||
|
.collect()
|
||||||
|
)
|
||||||
};
|
};
|
||||||
|
for truncated_hdlc_i in 1..hdlcs.len() - 1 {
|
||||||
|
let whole_bytes: usize = message_lengths.iter().take(truncated_hdlc_i).sum();
|
||||||
|
for truncated_byte in 1..message_lengths[truncated_hdlc_i] {
|
||||||
|
let mut truncated_bytes = Cursor::new(&bytes[0..whole_bytes + truncated_byte]);
|
||||||
|
let mut reader = QmdlMessageReader::new(&mut truncated_bytes).await.unwrap();
|
||||||
|
for msg in expected_messages.iter().take(truncated_hdlc_i) {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
expected_container,
|
Ok(msg),
|
||||||
reader.get_next_messages_container().await.unwrap().unwrap()
|
reader.get_next_message().await.unwrap().unwrap().as_ref()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if compressed {
|
||||||
|
// for a compressed reader, we have a couple possible
|
||||||
|
// outcomes, depending on how far along the Gzip DEFLATE
|
||||||
|
// block was before it was truncated:
|
||||||
|
match reader.get_next_message().await.unwrap() {
|
||||||
|
// if the block was truncated early enough, the
|
||||||
|
// GzipDecoder will detect an unexpected EOF, and our
|
||||||
|
// QmdlReader will indicate the stream of messages is
|
||||||
|
// done
|
||||||
|
None => {},
|
||||||
|
// if it's further along, the expanded result will be an
|
||||||
|
// invalid HDLC block. if that's the case, make sure the
|
||||||
|
// QmdlReader indicates the stream of messages is over
|
||||||
|
// with afterwards
|
||||||
|
Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => {
|
||||||
|
assert!(matches!(reader.get_next_message().await, Ok(None)));
|
||||||
|
},
|
||||||
|
// if it's further along still, we may get a complete
|
||||||
|
// Message, so make sure it matches the next expected
|
||||||
|
// one. then, make sure we've hit the end of the message
|
||||||
|
// stream
|
||||||
|
Some(Ok(msg)) => {
|
||||||
|
assert_eq!(&msg, &expected_messages[truncated_hdlc_i]);
|
||||||
|
assert!(matches!(reader.get_next_message().await, Ok(None)));
|
||||||
|
},
|
||||||
|
// we should never be able to decapsulate the HDLC into
|
||||||
|
// an invalid Diag message
|
||||||
|
Some(Err(DiagParsingError::MessageParsingError(_, _)))
|
||||||
|
=> {
|
||||||
|
panic!("unexpected MessageParsingError");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// a truncated uncompressed reader should always end on an
|
||||||
|
// HdlcDecapsulationError, and then return Ok(None) to
|
||||||
|
// indicate the message stream is over
|
||||||
|
assert!(matches!(
|
||||||
|
reader.get_next_message().await,
|
||||||
|
Ok(Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))))
|
||||||
|
));
|
||||||
|
assert!(matches!(reader.get_next_message().await, Ok(None)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Writes the test containers to a QmdlWriter, optionally finishing the
|
||||||
|
/// gzip stream with a footer. Then, attempts to decompress the buffer with
|
||||||
|
/// a QmdlWriter, asserting that the containers match what's expected.
|
||||||
|
async fn run_compressed_reading_and_writing_tests(do_close: bool) {
|
||||||
|
let containers = get_test_containers();
|
||||||
|
let mut buf = Cursor::new(Vec::new());
|
||||||
|
{
|
||||||
|
let mut writer = QmdlWriter::new(&mut buf);
|
||||||
|
for container in &containers {
|
||||||
|
writer.write_container(&container).await.unwrap();
|
||||||
|
}
|
||||||
|
if do_close {
|
||||||
|
writer.close().await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
buf.set_position(0);
|
||||||
|
let mut reader = QmdlMessageReader::new(buf).await.unwrap();
|
||||||
|
assert!(reader.is_compressed());
|
||||||
|
let (_, expected_messages) = get_test_messages();
|
||||||
|
for message in expected_messages {
|
||||||
|
assert_eq!(
|
||||||
|
Ok(message),
|
||||||
|
reader.get_next_message().await.unwrap().unwrap()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
reader.get_next_messages_container().await,
|
reader.get_next_message().await,
|
||||||
Ok(None)
|
Ok(None)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_qmdl_writer() {
|
async fn test_compressed_reading_and_writing() {
|
||||||
let mut buf = Vec::new();
|
run_compressed_reading_and_writing_tests(true).await;
|
||||||
let mut writer = QmdlWriter::new(&mut buf);
|
run_compressed_reading_and_writing_tests(false).await;
|
||||||
let expected_containers = get_test_containers();
|
|
||||||
for container in &expected_containers {
|
|
||||||
writer.write_container(container).await.unwrap();
|
|
||||||
}
|
|
||||||
assert_eq!(writer.total_written, buf.len());
|
|
||||||
assert_eq!(buf, get_test_message_bytes());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_writing_and_reading() {
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
let mut writer = QmdlWriter::new(&mut buf);
|
|
||||||
let expected_containers = get_test_containers();
|
|
||||||
for container in &expected_containers {
|
|
||||||
writer.write_container(container).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
let limit = Some(buf.len());
|
|
||||||
let mut reader = QmdlReader::new(Cursor::new(&mut buf), limit);
|
|
||||||
let expected_messages = get_test_messages();
|
|
||||||
for message in expected_messages {
|
|
||||||
let expected_container = MessagesContainer {
|
|
||||||
data_type: DataType::UserSpace,
|
|
||||||
num_messages: 1,
|
|
||||||
messages: vec![message],
|
|
||||||
};
|
|
||||||
assert_eq!(
|
|
||||||
expected_container,
|
|
||||||
reader.get_next_messages_container().await.unwrap().unwrap()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
assert!(matches!(
|
|
||||||
reader.get_next_messages_container().await,
|
|
||||||
Ok(None)
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user