run cargo fmt

This commit is contained in:
Will Greenberg
2026-06-03 20:14:09 -07:00
parent 19d9b3967c
commit b8fd4204db
8 changed files with 96 additions and 133 deletions
+6 -2
View File
@@ -111,7 +111,9 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) {
let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file");
let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); let mut qmdl_reader = QmdlMessageReader::new(qmdl_file)
.await
.expect("failed to open QmdlReader");
let mut report = Report::new(qmdl_path); let mut report = Report::new(qmdl_path);
while let Some(maybe_message) = qmdl_reader while let Some(maybe_message) = qmdl_reader
.get_next_message() .get_next_message()
@@ -127,7 +129,9 @@ async fn pcapify(qmdl_path: &PathBuf) {
let qmdl_file = &mut File::open(&qmdl_path) let qmdl_file = &mut File::open(&qmdl_path)
.await .await
.expect("failed to open qmdl file"); .expect("failed to open qmdl file");
let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); let mut qmdl_reader = QmdlMessageReader::new(qmdl_file)
.await
.expect("failed to open QmdlReader");
let mut pcap_path = qmdl_path.clone(); let mut pcap_path = qmdl_path.clone();
pcap_path.set_extension("pcapng"); pcap_path.set_extension("pcapng");
let pcap_file = &mut File::create(&pcap_path) let pcap_file = &mut File::create(&pcap_path)
+1 -1
View File
@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::cmp; use std::cmp;
use std::sync::Arc;
use axum::Json; use axum::Json;
use axum::{ use axum::{
+4 -7
View File
@@ -387,13 +387,10 @@ impl DiagTask {
"total QMDL bytes written: {}, updating manifest...", "total QMDL bytes written: {}, updating manifest...",
file_size file_size
); );
let index = qmdl_store let index = qmdl_store.current_entry.expect(
.current_entry "DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???",
.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); );
if let Err(e) = qmdl_store if let Err(e) = qmdl_store.update_entry_qmdl_size(index, file_size).await {
.update_entry_qmdl_size(index, file_size)
.await
{
let reason = format!("failed to update manifest (disk full?): {e}"); let reason = format!("failed to update manifest (disk full?): {e}");
error!("{reason}"); error!("{reason}");
self.stop(qmdl_store, Some(reason)).await; self.stop(qmdl_store, Some(reason)).await;
+15 -7
View File
@@ -64,8 +64,15 @@ impl FileKind {
} }
} }
pub fn get_filepath<P: AsRef<Path>>(&self, entry_name: &str, base_path: P, qmdl_compressed: bool) -> PathBuf { pub fn get_filepath<P: AsRef<Path>>(
base_path.as_ref().join(self.get_filename(entry_name, qmdl_compressed)) &self,
entry_name: &str,
base_path: P,
qmdl_compressed: bool,
) -> PathBuf {
base_path
.as_ref()
.join(self.get_filename(entry_name, qmdl_compressed))
} }
} }
@@ -504,7 +511,11 @@ impl RecordingStore {
self.write_manifest().await?; self.write_manifest().await?;
for &file_kind in FileKind::ALL { for &file_kind in FileKind::ALL {
let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path, entry_to_delete.compressed); let filepath = file_kind.get_filepath(
&entry_to_delete.name,
&self.path,
entry_to_delete.compressed,
);
remove_file_if_exists(&filepath) remove_file_if_exists(&filepath)
.await .await
.map_err(RecordingStoreError::DeleteFileError)?; .map_err(RecordingStoreError::DeleteFileError)?;
@@ -591,10 +602,7 @@ mod tests {
.entry_for_name(&store.manifest.entries[entry_index].name) .entry_for_name(&store.manifest.entries[entry_index].name)
.unwrap(); .unwrap();
assert!(entry.last_message_time.is_some()); assert!(entry.last_message_time.is_some());
assert_eq!( assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000);
store.manifest.entries[entry_index].qmdl_size_bytes,
1000
);
assert_eq!( assert_eq!(
RecordingStore::read_manifest(dir.path()).await.unwrap(), RecordingStore::read_manifest(dir.path()).await.unwrap(),
store.manifest store.manifest
+33 -71
View File
@@ -13,9 +13,9 @@ use chrono::{DateTime, Local};
use log::{error, warn}; use log::{error, warn};
use rayhunter::qmdl::QmdlMessageReader; use rayhunter::qmdl::QmdlMessageReader;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use std::sync::Arc; use std::sync::Arc;
use tokio::fs::write; use tokio::fs::write;
use tokio::io::AsyncReadExt;
use tokio::io::copy; use tokio::io::copy;
use tokio::io::duplex; use tokio::io::duplex;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@@ -84,21 +84,16 @@ pub async fn get_qmdl(
) )
})? })?
.ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?;
let qmdl_reader = QmdlMessageReader::new(qmdl_file) let qmdl_reader = QmdlMessageReader::new(qmdl_file).await.map_err(|err| {
.await (
.map_err(|err| { StatusCode::INTERNAL_SERVER_ERROR,
( format!("error reading QMDL file: {err}"),
StatusCode::INTERNAL_SERVER_ERROR, )
format!("error reading QMDL file: {err}"), })?;
)
})?;
let headers = [ let headers = [
(CONTENT_TYPE, "application/octet-stream"), (CONTENT_TYPE, "application/octet-stream"),
( (CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()),
CONTENT_LENGTH,
&entry.qmdl_size_bytes.to_string(),
),
]; ];
let body = Body::from_stream(qmdl_reader.into_qmdl_stream()); let body = Body::from_stream(qmdl_reader.into_qmdl_stream());
Ok((headers, body).into_response()) Ok((headers, body).into_response())
@@ -423,12 +418,8 @@ pub async fn get_zip(
}; };
let qmdl_reader = QmdlMessageReader::new(qmdl_file_for_pcap).await?; let qmdl_reader = QmdlMessageReader::new(qmdl_file_for_pcap).await?;
if let Err(e) = generate_pcap_data( if let Err(e) =
&mut entry_writer, generate_pcap_data(&mut entry_writer, qmdl_reader, gps_records).await
qmdl_reader,
gps_records,
)
.await
{ {
// if we fail to generate the PCAP file, we should still continue and give the // if we fail to generate the PCAP file, we should still continue and give the
// user the QMDL. // user the QMDL.
@@ -549,7 +540,10 @@ mod tests {
use async_zip::base::read::mem::ZipFileReader; use async_zip::base::read::mem::ZipFileReader;
use axum::extract::{Path, State}; use axum::extract::{Path, State};
use futures::AsyncReadExt; use futures::AsyncReadExt;
use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}}; use rayhunter::{
diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer},
qmdl::{QmdlMessageReader, QmdlWriter},
};
use tempfile::TempDir; use tempfile::TempDir;
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) { async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
@@ -567,7 +561,8 @@ mod tests {
) -> String { ) -> String {
let entry_name = { let entry_name = {
let mut store = store_lock.write().await; let mut store = store_lock.write().await;
let (mut qmdl_gz_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap(); let (mut qmdl_gz_file, _analysis_file) =
store.new_entry(GpsMode::Disabled).await.unwrap();
let mut writer = QmdlWriter::new(&mut qmdl_gz_file); let mut writer = QmdlWriter::new(&mut qmdl_gz_file);
writer.write_container(test_data).await.unwrap(); writer.write_container(test_data).await.unwrap();
@@ -624,52 +619,13 @@ mod tests {
MessagesContainer { MessagesContainer {
data_type: DataType::UserSpace, data_type: DataType::UserSpace,
num_messages: 1, num_messages: 1,
messages: vec![ messages: vec![HdlcEncapsulatedMessage {
HdlcEncapsulatedMessage { len: 39,
len: 39, data: vec![
data: vec![ 16, 0, 32, 0, 32, 0, 192, 176, 26, 165, 245, 135, 118, 35, 2, 1, 20, 14, 48, 0,
16, 160, 0, 2, 8, 0, 0, 217, 15, 5, 0, 0, 0, 0, 1, 0, 10, 13, 196, 126,
0, ],
32, }],
0,
32,
0,
192,
176,
26,
165,
245,
135,
118,
35,
2,
1,
20,
14,
48,
0,
160,
0,
2,
8,
0,
0,
217,
15,
5,
0,
0,
0,
0,
1,
0,
10,
13,
196,
126,
],
},
],
} }
} }
@@ -680,7 +636,9 @@ mod tests {
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
let state = create_test_server_state(store_lock); let state = create_test_server_state(store_lock);
let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap(); let response = get_zip(State(state), Path(entry_name.clone()))
.await
.unwrap();
let headers = response.headers(); let headers = response.headers();
assert_eq!(headers.get("content-type").unwrap(), "application/zip"); assert_eq!(headers.get("content-type").unwrap(), "application/zip");
@@ -690,7 +648,8 @@ mod tests {
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
let zip_reader_file = zip_reader.file(); let zip_reader_file = zip_reader.file();
let filenames: Vec<String> = zip_reader_file.entries() let filenames: Vec<String> = zip_reader_file
.entries()
.iter() .iter()
.map(|entry| entry.filename().as_str().unwrap().to_string()) .map(|entry| entry.filename().as_str().unwrap().to_string())
.collect(); .collect();
@@ -704,13 +663,16 @@ mod tests {
); );
let mut qmdl_body = Vec::with_capacity(128); let mut qmdl_body = Vec::with_capacity(128);
zip_reader.reader_without_entry(0) zip_reader
.reader_without_entry(0)
.await .await
.unwrap() .unwrap()
.read_to_end(&mut qmdl_body) .read_to_end(&mut qmdl_body)
.await .await
.unwrap(); .unwrap();
let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap(); let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body))
.await
.unwrap();
let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap(); let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap();
assert_eq!( assert_eq!(
qmdl_reader.get_next_message().await.unwrap(), qmdl_reader.get_next_message().await.unwrap(),
+6 -2
View File
@@ -421,7 +421,10 @@ impl Harness {
row row
} }
pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result<Message, DiagParsingError>) -> AnalysisRow { pub fn analyze_qmdl_message(
&mut self,
maybe_qmdl_message: Result<Message, DiagParsingError>,
) -> AnalysisRow {
let mut row = AnalysisRow::new(); let mut row = AnalysisRow::new();
self.packet_num += 1; self.packet_num += 1;
@@ -458,7 +461,8 @@ impl Harness {
} }
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> { pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
container.messages() container
.messages()
.drain(..) .drain(..)
.map(|maybe_message| self.analyze_qmdl_message(maybe_message)) .map(|maybe_message| self.analyze_qmdl_message(maybe_message))
.collect() .collect()
+1 -4
View File
@@ -157,10 +157,7 @@ impl Message {
} }
Err(e) => Err(DiagParsingError::MessageParsingError(e, data)), Err(e) => Err(DiagParsingError::MessageParsingError(e, data)),
}, },
Err(err) => Err(DiagParsingError::HdlcDecapsulationError( Err(err) => Err(DiagParsingError::HdlcDecapsulationError(err, data.to_vec())),
err,
data.to_vec(),
)),
} }
} }
} }
+30 -39
View File
@@ -12,7 +12,10 @@ use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContain
use async_compression::tokio::bufread::GzipDecoder; use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder; use async_compression::tokio::write::GzipEncoder;
use futures::TryStream; use futures::TryStream;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::io::{
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt,
BufReader,
};
const GZIP_MAGIC_NUMBER: u16 = 0x1f8b; const GZIP_MAGIC_NUMBER: u16 = 0x1f8b;
@@ -82,9 +85,7 @@ where
} else { } else {
QmdlReaderSource::Uncompressed { reader } QmdlReaderSource::Uncompressed { reader }
}; };
Self { Self { source }
source,
}
} }
} }
@@ -130,7 +131,7 @@ where
async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool> async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool>
where where
T: AsyncRead + AsyncSeek + Unpin T: AsyncRead + AsyncSeek + Unpin,
{ {
let magic_number = reader.read_u16().await?; let magic_number = reader.read_u16().await?;
reader.rewind().await?; reader.rewind().await?;
@@ -144,35 +145,37 @@ where
T: AsyncRead + AsyncSeek + Unpin, T: AsyncRead + AsyncSeek + Unpin,
{ {
pub async fn new(mut reader: T) -> std::io::Result<Self> { pub async fn new(mut reader: T) -> std::io::Result<Self> {
let compressed = is_gzip_stream(&mut reader) let compressed = is_gzip_stream(&mut reader).await.unwrap_or(false);
.await
.unwrap_or(false);
Ok(QmdlMessageReader { Ok(QmdlMessageReader {
buf_reader: BufReader::new(QmdlAsyncReader::new( buf_reader: BufReader::new(QmdlAsyncReader::new(reader, compressed)),
reader,
compressed,
)),
}) })
} }
pub fn is_compressed(&self) -> bool { pub fn is_compressed(&self) -> bool {
matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. }) matches!(
self.buf_reader.get_ref().source,
QmdlReaderSource::Compressed { .. }
)
} }
pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> { pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async { futures::stream::try_unfold(self, |mut reader| async {
let mut buf = vec![]; let mut buf = vec![];
match reader .buf_reader match reader
.buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf) .read_until(MESSAGE_TERMINATOR, &mut buf)
.await { .await
Err(err) => Err(err), {
Ok(0) => Ok(None), Err(err) => Err(err),
Ok(_) => Ok(Some((buf, reader))), Ok(0) => Ok(None),
Ok(_) => Ok(Some((buf, reader))),
} }
}) })
} }
pub fn into_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> { pub fn into_message_stream(
self,
) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async { futures::stream::try_unfold(self, |mut reader| async {
match reader.get_next_message().await? { match reader.get_next_message().await? {
Some(res) => Ok(Some((res, reader))), Some(res) => Ok(Some((res, reader))),
@@ -234,10 +237,7 @@ mod test {
// test messages // test messages
fn get_test_message_bytes() -> Vec<u8> { fn get_test_message_bytes() -> Vec<u8> {
let (hdlcs, _) = get_test_messages(); let (hdlcs, _) = get_test_messages();
hdlcs hdlcs.iter().flat_map(|msg| msg.data.clone()).collect()
.iter()
.flat_map(|msg| msg.data.clone())
.collect()
} }
fn get_test_containers() -> Vec<MessagesContainer> { fn get_test_containers() -> Vec<MessagesContainer> {
@@ -264,10 +264,7 @@ mod test {
assert!(!reader.is_compressed()); assert!(!reader.is_compressed());
let (_, expected_messages) = get_test_messages(); let (_, expected_messages) = get_test_messages();
for msg in expected_messages { for msg in expected_messages {
assert_eq!( assert_eq!(Ok(msg), reader.get_next_message().await.unwrap().unwrap());
Ok(msg),
reader.get_next_message().await.unwrap().unwrap()
);
} }
} }
@@ -298,9 +295,7 @@ mod test {
} else { } else {
( (
get_test_message_bytes(), get_test_message_bytes(),
hdlcs.iter() hdlcs.iter().map(|hdlc| hdlc.data.len()).collect(),
.map(|hdlc| hdlc.data.len())
.collect()
) )
}; };
for truncated_hdlc_i in 1..hdlcs.len() - 1 { for truncated_hdlc_i in 1..hdlcs.len() - 1 {
@@ -323,14 +318,14 @@ mod test {
// GzipDecoder will detect an unexpected EOF, and our // GzipDecoder will detect an unexpected EOF, and our
// QmdlReader will indicate the stream of messages is // QmdlReader will indicate the stream of messages is
// done // done
None => {}, None => {}
// if it's further along, the expanded result will be an // if it's further along, the expanded result will be an
// invalid HDLC block. if that's the case, make sure the // invalid HDLC block. if that's the case, make sure the
// QmdlReader indicates the stream of messages is over // QmdlReader indicates the stream of messages is over
// with afterwards // with afterwards
Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => { Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => {
assert!(matches!(reader.get_next_message().await, Ok(None))); assert!(matches!(reader.get_next_message().await, Ok(None)));
}, }
// if it's further along still, we may get a complete // if it's further along still, we may get a complete
// Message, so make sure it matches the next expected // Message, so make sure it matches the next expected
// one. then, make sure we've hit the end of the message // one. then, make sure we've hit the end of the message
@@ -338,11 +333,10 @@ mod test {
Some(Ok(msg)) => { Some(Ok(msg)) => {
assert_eq!(&msg, &expected_messages[truncated_hdlc_i]); assert_eq!(&msg, &expected_messages[truncated_hdlc_i]);
assert!(matches!(reader.get_next_message().await, Ok(None))); assert!(matches!(reader.get_next_message().await, Ok(None)));
}, }
// we should never be able to decapsulate the HDLC into // we should never be able to decapsulate the HDLC into
// an invalid Diag message // an invalid Diag message
Some(Err(DiagParsingError::MessageParsingError(_, _))) Some(Err(DiagParsingError::MessageParsingError(_, _))) => {
=> {
panic!("unexpected MessageParsingError"); panic!("unexpected MessageParsingError");
} }
} }
@@ -385,10 +379,7 @@ mod test {
reader.get_next_message().await.unwrap().unwrap() reader.get_next_message().await.unwrap().unwrap()
); );
} }
assert!(matches!( assert!(matches!(reader.get_next_message().await, Ok(None)));
reader.get_next_message().await,
Ok(None)
));
} }
#[tokio::test] #[tokio::test]