From e0ae8a0298e716cd0c524a9193bac84c89b961d3 Mon Sep 17 00:00:00 2001 From: Will Greenberg Date: Wed, 1 Apr 2026 11:55:56 -0700 Subject: [PATCH] run cargo fmt --- daemon/src/diag.rs | 2 +- daemon/src/pcap.rs | 5 +---- daemon/src/qmdl_store.rs | 16 +++++++++++++--- daemon/src/server.rs | 26 +++++++++++++++----------- lib/src/qmdl.rs | 31 +++++++++++++++++-------------- 5 files changed, 47 insertions(+), 33 deletions(-) diff --git a/daemon/src/diag.rs b/daemon/src/diag.rs index b80d40f..bf0c863 100644 --- a/daemon/src/diag.rs +++ b/daemon/src/diag.rs @@ -243,7 +243,7 @@ impl DiagTask { } = state { match (qmdl_writer.close().await, analysis_writer.close().await) { - (Ok(()), Ok(())) => {}, + (Ok(()), Ok(())) => {} (qmdl_result, analysis_result) => { if let Err(err) = qmdl_result { error!("failed to close QmdlWriter: {:?}", err); diff --git a/daemon/src/pcap.rs b/daemon/src/pcap.rs index e917e8f..93b29cc 100644 --- a/daemon/src/pcap.rs +++ b/daemon/src/pcap.rs @@ -68,10 +68,7 @@ pub async fn get_pcap( Ok((headers, body).into_response()) } -pub async fn generate_pcap_data( - writer: W, - mut reader: QmdlReader, -) -> Result<(), Error> +pub async fn generate_pcap_data(writer: W, mut reader: QmdlReader) -> Result<(), Error> where W: AsyncWrite + Unpin + Send, R: AsyncRead + Unpin, diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index ea42dae..ac52182 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -280,12 +280,19 @@ impl RecordingStore { } // Returns the corresponding QMDL file for a given entry - pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result, RecordingStoreError> { + pub async fn open_entry_qmdl( + &self, + entry_index: usize, + ) -> Result, RecordingStoreError> { let entry = &self.manifest.entries[entry_index]; let file = File::open(entry.get_qmdl_filepath(&self.path)) .await .map_err(RecordingStoreError::ReadFileError)?; - Ok(QmdlReader::new(file, entry.compressed, Some(entry.uncompressed_qmdl_size_bytes))) + Ok(QmdlReader::new( + file, + entry.compressed, + Some(entry.uncompressed_qmdl_size_bytes), + )) } // Returns the corresponding QMDL file for a given entry @@ -506,7 +513,10 @@ mod tests { .entry_for_name(&store.manifest.entries[entry_index].name) .unwrap(); assert!(entry.last_message_time.is_some()); - assert_eq!(store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes, 1000); + assert_eq!( + store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes, + 1000 + ); assert_eq!( RecordingStore::read_manifest(dir.path()).await.unwrap(), store.manifest diff --git a/daemon/src/server.rs b/daemon/src/server.rs index 979cebf..41768fd 100644 --- a/daemon/src/server.rs +++ b/daemon/src/server.rs @@ -12,9 +12,9 @@ use axum::response::{IntoResponse, Response}; use chrono::{DateTime, Local}; use log::{error, warn}; use serde::{Deserialize, Serialize}; -use tokio::io::copy; use std::sync::Arc; use tokio::fs::write; +use tokio::io::copy; use tokio::io::duplex; use tokio::sync::RwLock; use tokio::sync::mpsc::Sender; @@ -77,7 +77,10 @@ pub async fn get_qmdl( let headers = [ (CONTENT_TYPE, "application/octet-stream"), - (CONTENT_LENGTH, &entry.uncompressed_qmdl_size_bytes.to_string()), + ( + CONTENT_LENGTH, + &entry.uncompressed_qmdl_size_bytes.to_string(), + ), ]; let body = Body::from_stream(qmdl_reader.as_stream()); Ok((headers, body).into_response()) @@ -335,16 +338,16 @@ pub async fn get_zip( // Add QMDL file { let extension = if compressed { "qmdl.gz" } else { "qmdl" }; - let entry = - ZipEntryBuilder::new(format!("{qmdl_idx}.{extension}").into(), Compression::Stored); + let entry = ZipEntryBuilder::new( + format!("{qmdl_idx}.{extension}").into(), + Compression::Stored, + ); // FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does // not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed // once https://github.com/Majored/rs-async-zip/pull/160 is released. let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let qmdl_store = qmdl_store_lock.read().await; - let mut qmdl_reader = qmdl_store - .open_entry_qmdl(entry_index) - .await?; + let mut qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?; copy(&mut qmdl_reader, &mut entry_writer).await?; entry_writer.into_inner().close().await?; } @@ -356,9 +359,7 @@ pub async fn get_zip( let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); let qmdl_store = qmdl_store_lock.read().await; - let qmdl_reader = qmdl_store - .open_entry_qmdl(entry_index) - .await?; + let qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?; if let Err(e) = generate_pcap_data(&mut entry_writer, qmdl_reader).await { // if we fail to generate the PCAP file, we should still continue and give the @@ -520,7 +521,10 @@ mod tests { assert_eq!( filenames, - vec![format!("{entry_name}.qmdl.gz"), format!("{entry_name}.pcapng"),] + vec![ + format!("{entry_name}.qmdl.gz"), + format!("{entry_name}.pcapng"), + ] ); } } diff --git a/lib/src/qmdl.rs b/lib/src/qmdl.rs index a9515f8..78c41fe 100644 --- a/lib/src/qmdl.rs +++ b/lib/src/qmdl.rs @@ -9,11 +9,11 @@ use std::task::Poll; use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer}; +use async_compression::tokio::bufread::GzipDecoder; +use async_compression::tokio::write::GzipEncoder; use futures::TryStream; use log::error; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; -use async_compression::tokio::bufread::GzipDecoder; -use async_compression::tokio::write::GzipEncoder; pub struct QmdlWriter where @@ -79,7 +79,7 @@ struct QmdlAsyncReader { impl QmdlAsyncReader where - T: AsyncRead + T: AsyncRead, { pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option) -> Self { let source = if compressed { @@ -137,13 +137,11 @@ where Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => { *eof = true; Poll::Ready(Ok(())) - }, + } res => res, } - }, - QmdlReaderSource::Uncompressed { reader } => { - Pin::new(reader).poll_read(cx, buf) - }, + } + QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf), }; // if we read more bytes than is allowed, cap the buffer by @@ -175,14 +173,14 @@ where pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option) -> Self { QmdlReader { buf_reader: BufReader::new(QmdlAsyncReader::new( - reader, compressed, max_uncompressed_bytes + reader, + compressed, + max_uncompressed_bytes, )), } } - pub fn as_stream( - self, - ) -> impl TryStream { + pub fn as_stream(self) -> impl TryStream { futures::stream::try_unfold(self, |mut reader| async { let maybe_container = reader.get_next_messages_container().await?; match maybe_container { @@ -196,7 +194,12 @@ where &mut self, ) -> Result, std::io::Error> { let mut buf = Vec::new(); - if self.buf_reader.read_until(MESSAGE_TERMINATOR, &mut buf).await? == 0 { + if self + .buf_reader + .read_until(MESSAGE_TERMINATOR, &mut buf) + .await? + == 0 + { return Ok(None); } @@ -218,7 +221,7 @@ where impl AsyncRead for QmdlReader where - T: AsyncRead + Unpin + T: AsyncRead + Unpin, { fn poll_read( self: Pin<&mut Self>,