diff --git a/check/src/main.rs b/check/src/main.rs index 940e3d8..461c826 100644 --- a/check/src/main.rs +++ b/check/src/main.rs @@ -111,7 +111,9 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) { async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); - let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); + let mut qmdl_reader = QmdlMessageReader::new(qmdl_file) + .await + .expect("failed to open QmdlReader"); let mut report = Report::new(qmdl_path); while let Some(maybe_message) = qmdl_reader .get_next_message() @@ -127,7 +129,9 @@ async fn pcapify(qmdl_path: &PathBuf) { let qmdl_file = &mut File::open(&qmdl_path) .await .expect("failed to open qmdl file"); - let mut qmdl_reader = QmdlMessageReader::new(qmdl_file).await.expect("failed to open QmdlReader"); + let mut qmdl_reader = QmdlMessageReader::new(qmdl_file) + .await + .expect("failed to open QmdlReader"); let mut pcap_path = qmdl_path.clone(); pcap_path.set_extension("pcapng"); let pcap_file = &mut File::create(&pcap_path) diff --git a/daemon/src/analysis.rs b/daemon/src/analysis.rs index 723fe95..99fde0f 100644 --- a/daemon/src/analysis.rs +++ b/daemon/src/analysis.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::cmp; +use std::sync::Arc; use axum::Json; use axum::{ diff --git a/daemon/src/diag.rs b/daemon/src/diag.rs index 3a4284d..4acf0b7 100644 --- a/daemon/src/diag.rs +++ b/daemon/src/diag.rs @@ -387,13 +387,10 @@ impl DiagTask { "total QMDL bytes written: {}, updating manifest...", file_size ); - let index = qmdl_store - .current_entry - .expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???"); - if let Err(e) = qmdl_store - .update_entry_qmdl_size(index, file_size) - .await - { + let index = qmdl_store.current_entry.expect( + "DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???", + ); + if let Err(e) = qmdl_store.update_entry_qmdl_size(index, file_size).await { let reason = format!("failed to update manifest (disk full?): {e}"); error!("{reason}"); self.stop(qmdl_store, Some(reason)).await; diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index c455846..fd9db88 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -64,8 +64,15 @@ impl FileKind { } } - pub fn get_filepath>(&self, entry_name: &str, base_path: P, qmdl_compressed: bool) -> PathBuf { - base_path.as_ref().join(self.get_filename(entry_name, qmdl_compressed)) + pub fn get_filepath>( + &self, + entry_name: &str, + base_path: P, + qmdl_compressed: bool, + ) -> PathBuf { + base_path + .as_ref() + .join(self.get_filename(entry_name, qmdl_compressed)) } } @@ -504,7 +511,11 @@ impl RecordingStore { self.write_manifest().await?; for &file_kind in FileKind::ALL { - let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path, entry_to_delete.compressed); + let filepath = file_kind.get_filepath( + &entry_to_delete.name, + &self.path, + entry_to_delete.compressed, + ); remove_file_if_exists(&filepath) .await .map_err(RecordingStoreError::DeleteFileError)?; @@ -591,10 +602,7 @@ mod tests { .entry_for_name(&store.manifest.entries[entry_index].name) .unwrap(); assert!(entry.last_message_time.is_some()); - assert_eq!( - store.manifest.entries[entry_index].qmdl_size_bytes, - 1000 - ); + assert_eq!(store.manifest.entries[entry_index].qmdl_size_bytes, 1000); assert_eq!( RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest diff --git a/daemon/src/server.rs b/daemon/src/server.rs index e80e96f..e7b792f 100644 --- a/daemon/src/server.rs +++ b/daemon/src/server.rs @@ -13,9 +13,9 @@ use chrono::{DateTime, Local}; use log::{error, warn}; use rayhunter::qmdl::QmdlMessageReader; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use std::sync::Arc; use tokio::fs::write; +use tokio::io::AsyncReadExt; use tokio::io::copy; use tokio::io::duplex; use tokio::sync::RwLock; @@ -84,21 +84,16 @@ pub async fn get_qmdl( ) })? .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; - let qmdl_reader = QmdlMessageReader::new(qmdl_file) - .await - .map_err(|err| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("error reading QMDL file: {err}"), - ) - })?; + let qmdl_reader = QmdlMessageReader::new(qmdl_file).await.map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("error reading QMDL file: {err}"), + ) + })?; let headers = [ (CONTENT_TYPE, "application/octet-stream"), - ( - CONTENT_LENGTH, - &entry.qmdl_size_bytes.to_string(), - ), + (CONTENT_LENGTH, &entry.qmdl_size_bytes.to_string()), ]; let body = Body::from_stream(qmdl_reader.into_qmdl_stream()); Ok((headers, body).into_response()) @@ -423,12 +418,8 @@ pub async fn get_zip( }; let qmdl_reader = QmdlMessageReader::new(qmdl_file_for_pcap).await?; - if let Err(e) = generate_pcap_data( - &mut entry_writer, - qmdl_reader, - gps_records, - ) - .await + if let Err(e) = + generate_pcap_data(&mut entry_writer, qmdl_reader, gps_records).await { // if we fail to generate the PCAP file, we should still continue and give the // user the QMDL. @@ -549,7 +540,10 @@ mod tests { use async_zip::base::read::mem::ZipFileReader; use axum::extract::{Path, State}; use futures::AsyncReadExt; - use rayhunter::{diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, qmdl::{QmdlMessageReader, QmdlWriter}}; + use rayhunter::{ + diag::{DataType, HdlcEncapsulatedMessage, Message, MessagesContainer}, + qmdl::{QmdlMessageReader, QmdlWriter}, + }; use tempfile::TempDir; async fn create_test_qmdl_store() -> (TempDir, Arc>) { @@ -567,7 +561,8 @@ mod tests { ) -> String { let entry_name = { let mut store = store_lock.write().await; - let (mut qmdl_gz_file, _analysis_file) = store.new_entry(GpsMode::Disabled).await.unwrap(); + let (mut qmdl_gz_file, _analysis_file) = + store.new_entry(GpsMode::Disabled).await.unwrap(); let mut writer = QmdlWriter::new(&mut qmdl_gz_file); writer.write_container(test_data).await.unwrap(); @@ -624,52 +619,13 @@ mod tests { MessagesContainer { data_type: DataType::UserSpace, num_messages: 1, - messages: vec![ - HdlcEncapsulatedMessage { - len: 39, - data: vec![ - 16, - 0, - 32, - 0, - 32, - 0, - 192, - 176, - 26, - 165, - 245, - 135, - 118, - 35, - 2, - 1, - 20, - 14, - 48, - 0, - 160, - 0, - 2, - 8, - 0, - 0, - 217, - 15, - 5, - 0, - 0, - 0, - 0, - 1, - 0, - 10, - 13, - 196, - 126, - ], - }, - ], + messages: vec![HdlcEncapsulatedMessage { + len: 39, + data: vec![ + 16, 0, 32, 0, 32, 0, 192, 176, 26, 165, 245, 135, 118, 35, 2, 1, 20, 14, 48, 0, + 160, 0, 2, 8, 0, 0, 217, 15, 5, 0, 0, 0, 0, 1, 0, 10, 13, 196, 126, + ], + }], } } @@ -680,7 +636,9 @@ mod tests { let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; let state = create_test_server_state(store_lock); - let response = get_zip(State(state), Path(entry_name.clone())).await.unwrap(); + let response = get_zip(State(state), Path(entry_name.clone())) + .await + .unwrap(); let headers = response.headers(); assert_eq!(headers.get("content-type").unwrap(), "application/zip"); @@ -690,7 +648,8 @@ mod tests { let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); let zip_reader_file = zip_reader.file(); - let filenames: Vec = zip_reader_file.entries() + let filenames: Vec = zip_reader_file + .entries() .iter() .map(|entry| entry.filename().as_str().unwrap().to_string()) .collect(); @@ -704,13 +663,16 @@ mod tests { ); let mut qmdl_body = Vec::with_capacity(128); - zip_reader.reader_without_entry(0) + zip_reader + .reader_without_entry(0) .await .unwrap() .read_to_end(&mut qmdl_body) .await .unwrap(); - let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)).await.unwrap(); + let mut qmdl_reader = QmdlMessageReader::new(Cursor::new(qmdl_body)) + .await + .unwrap(); let expected_message = Message::from_hdlc(&test_qmdl_data.messages[0].data).unwrap(); assert_eq!( qmdl_reader.get_next_message().await.unwrap(), diff --git a/lib/src/analysis/analyzer.rs b/lib/src/analysis/analyzer.rs index 1d94312..c7dc085 100644 --- a/lib/src/analysis/analyzer.rs +++ b/lib/src/analysis/analyzer.rs @@ -421,7 +421,10 @@ impl Harness { row } - pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result) -> AnalysisRow { + pub fn analyze_qmdl_message( + &mut self, + maybe_qmdl_message: Result, + ) -> AnalysisRow { let mut row = AnalysisRow::new(); self.packet_num += 1; @@ -458,7 +461,8 @@ impl Harness { } pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec { - container.messages() + container + .messages() .drain(..) .map(|maybe_message| self.analyze_qmdl_message(maybe_message)) .collect() diff --git a/lib/src/diag.rs b/lib/src/diag.rs index 4c42b69..ce83f67 100644 --- a/lib/src/diag.rs +++ b/lib/src/diag.rs @@ -157,10 +157,7 @@ impl Message { } Err(e) => Err(DiagParsingError::MessageParsingError(e, data)), }, - Err(err) => Err(DiagParsingError::HdlcDecapsulationError( - err, - data.to_vec(), - )), + Err(err) => Err(DiagParsingError::HdlcDecapsulationError(err, data.to_vec())), } } } diff --git a/lib/src/qmdl.rs b/lib/src/qmdl.rs index b259ac1..b948a6f 100644 --- a/lib/src/qmdl.rs +++ b/lib/src/qmdl.rs @@ -12,7 +12,10 @@ use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContain use async_compression::tokio::bufread::GzipDecoder; use async_compression::tokio::write::GzipEncoder; use futures::TryStream; -use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::io::{ + AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, + BufReader, +}; const GZIP_MAGIC_NUMBER: u16 = 0x1f8b; @@ -82,9 +85,7 @@ where } else { QmdlReaderSource::Uncompressed { reader } }; - Self { - source, - } + Self { source } } } @@ -130,7 +131,7 @@ where async fn is_gzip_stream(mut reader: T) -> std::io::Result where - T: AsyncRead + AsyncSeek + Unpin + T: AsyncRead + AsyncSeek + Unpin, { let magic_number = reader.read_u16().await?; reader.rewind().await?; @@ -144,35 +145,37 @@ where T: AsyncRead + AsyncSeek + Unpin, { pub async fn new(mut reader: T) -> std::io::Result { - let compressed = is_gzip_stream(&mut reader) - .await - .unwrap_or(false); + let compressed = is_gzip_stream(&mut reader).await.unwrap_or(false); Ok(QmdlMessageReader { - buf_reader: BufReader::new(QmdlAsyncReader::new( - reader, - compressed, - )), + buf_reader: BufReader::new(QmdlAsyncReader::new(reader, compressed)), }) } pub fn is_compressed(&self) -> bool { - matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. }) + matches!( + self.buf_reader.get_ref().source, + QmdlReaderSource::Compressed { .. } + ) } pub fn into_qmdl_stream(self) -> impl TryStream, Error = std::io::Error> { futures::stream::try_unfold(self, |mut reader| async { let mut buf = vec![]; - match reader .buf_reader + match reader + .buf_reader .read_until(MESSAGE_TERMINATOR, &mut buf) - .await { - Err(err) => Err(err), - Ok(0) => Ok(None), - Ok(_) => Ok(Some((buf, reader))), + .await + { + Err(err) => Err(err), + Ok(0) => Ok(None), + Ok(_) => Ok(Some((buf, reader))), } }) } - pub fn into_message_stream(self) -> impl TryStream, Error = std::io::Error> { + pub fn into_message_stream( + self, + ) -> impl TryStream, Error = std::io::Error> { futures::stream::try_unfold(self, |mut reader| async { match reader.get_next_message().await? { Some(res) => Ok(Some((res, reader))), @@ -234,10 +237,7 @@ mod test { // test messages fn get_test_message_bytes() -> Vec { let (hdlcs, _) = get_test_messages(); - hdlcs - .iter() - .flat_map(|msg| msg.data.clone()) - .collect() + hdlcs.iter().flat_map(|msg| msg.data.clone()).collect() } fn get_test_containers() -> Vec { @@ -264,10 +264,7 @@ mod test { assert!(!reader.is_compressed()); let (_, expected_messages) = get_test_messages(); for msg in expected_messages { - assert_eq!( - Ok(msg), - reader.get_next_message().await.unwrap().unwrap() - ); + assert_eq!(Ok(msg), reader.get_next_message().await.unwrap().unwrap()); } } @@ -298,9 +295,7 @@ mod test { } else { ( get_test_message_bytes(), - hdlcs.iter() - .map(|hdlc| hdlc.data.len()) - .collect() + hdlcs.iter().map(|hdlc| hdlc.data.len()).collect(), ) }; for truncated_hdlc_i in 1..hdlcs.len() - 1 { @@ -323,14 +318,14 @@ mod test { // GzipDecoder will detect an unexpected EOF, and our // QmdlReader will indicate the stream of messages is // done - None => {}, + None => {} // if it's further along, the expanded result will be an // invalid HDLC block. if that's the case, make sure the // QmdlReader indicates the stream of messages is over // with afterwards Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => { assert!(matches!(reader.get_next_message().await, Ok(None))); - }, + } // if it's further along still, we may get a complete // Message, so make sure it matches the next expected // one. then, make sure we've hit the end of the message @@ -338,11 +333,10 @@ mod test { Some(Ok(msg)) => { assert_eq!(&msg, &expected_messages[truncated_hdlc_i]); assert!(matches!(reader.get_next_message().await, Ok(None))); - }, + } // we should never be able to decapsulate the HDLC into // an invalid Diag message - Some(Err(DiagParsingError::MessageParsingError(_, _))) - => { + Some(Err(DiagParsingError::MessageParsingError(_, _))) => { panic!("unexpected MessageParsingError"); } } @@ -385,10 +379,7 @@ mod test { reader.get_next_message().await.unwrap().unwrap() ); } - assert!(matches!( - reader.get_next_message().await, - Ok(None) - )); + assert!(matches!(reader.get_next_message().await, Ok(None))); } #[tokio::test]