run cargo fmt

This commit is contained in:
Will Greenberg
2026-06-03 20:14:09 -07:00
committed by Brad Warren
parent 94b989c3c0
commit 76ae8fccd9
8 changed files with 96 additions and 133 deletions
+6 -2
View File
@@ -421,7 +421,10 @@ impl Harness {
row
}
pub fn analyze_qmdl_message(&mut self, maybe_qmdl_message: Result<Message, DiagParsingError>) -> AnalysisRow {
pub fn analyze_qmdl_message(
&mut self,
maybe_qmdl_message: Result<Message, DiagParsingError>,
) -> AnalysisRow {
let mut row = AnalysisRow::new();
self.packet_num += 1;
@@ -458,7 +461,8 @@ impl Harness {
}
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
container.messages()
container
.messages()
.drain(..)
.map(|maybe_message| self.analyze_qmdl_message(maybe_message))
.collect()
+1 -4
View File
@@ -157,10 +157,7 @@ impl Message {
}
Err(e) => Err(DiagParsingError::MessageParsingError(e, data)),
},
Err(err) => Err(DiagParsingError::HdlcDecapsulationError(
err,
data.to_vec(),
)),
Err(err) => Err(DiagParsingError::HdlcDecapsulationError(err, data.to_vec())),
}
}
}
+30 -39
View File
@@ -12,7 +12,10 @@ use crate::diag::{DiagParsingError, MESSAGE_TERMINATOR, Message, MessagesContain
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use futures::TryStream;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::io::{
AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt,
BufReader,
};
const GZIP_MAGIC_NUMBER: u16 = 0x1f8b;
@@ -82,9 +85,7 @@ where
} else {
QmdlReaderSource::Uncompressed { reader }
};
Self {
source,
}
Self { source }
}
}
@@ -130,7 +131,7 @@ where
async fn is_gzip_stream<T>(mut reader: T) -> std::io::Result<bool>
where
T: AsyncRead + AsyncSeek + Unpin
T: AsyncRead + AsyncSeek + Unpin,
{
let magic_number = reader.read_u16().await?;
reader.rewind().await?;
@@ -144,35 +145,37 @@ where
T: AsyncRead + AsyncSeek + Unpin,
{
pub async fn new(mut reader: T) -> std::io::Result<Self> {
let compressed = is_gzip_stream(&mut reader)
.await
.unwrap_or(false);
let compressed = is_gzip_stream(&mut reader).await.unwrap_or(false);
Ok(QmdlMessageReader {
buf_reader: BufReader::new(QmdlAsyncReader::new(
reader,
compressed,
)),
buf_reader: BufReader::new(QmdlAsyncReader::new(reader, compressed)),
})
}
pub fn is_compressed(&self) -> bool {
matches!(self.buf_reader.get_ref().source, QmdlReaderSource::Compressed { .. })
matches!(
self.buf_reader.get_ref().source,
QmdlReaderSource::Compressed { .. }
)
}
pub fn into_qmdl_stream(self) -> impl TryStream<Ok = Vec<u8>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
let mut buf = vec![];
match reader .buf_reader
match reader
.buf_reader
.read_until(MESSAGE_TERMINATOR, &mut buf)
.await {
Err(err) => Err(err),
Ok(0) => Ok(None),
Ok(_) => Ok(Some((buf, reader))),
.await
{
Err(err) => Err(err),
Ok(0) => Ok(None),
Ok(_) => Ok(Some((buf, reader))),
}
})
}
pub fn into_message_stream(self) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
pub fn into_message_stream(
self,
) -> impl TryStream<Ok = Result<Message, DiagParsingError>, Error = std::io::Error> {
futures::stream::try_unfold(self, |mut reader| async {
match reader.get_next_message().await? {
Some(res) => Ok(Some((res, reader))),
@@ -234,10 +237,7 @@ mod test {
// test messages
fn get_test_message_bytes() -> Vec<u8> {
let (hdlcs, _) = get_test_messages();
hdlcs
.iter()
.flat_map(|msg| msg.data.clone())
.collect()
hdlcs.iter().flat_map(|msg| msg.data.clone()).collect()
}
fn get_test_containers() -> Vec<MessagesContainer> {
@@ -264,10 +264,7 @@ mod test {
assert!(!reader.is_compressed());
let (_, expected_messages) = get_test_messages();
for msg in expected_messages {
assert_eq!(
Ok(msg),
reader.get_next_message().await.unwrap().unwrap()
);
assert_eq!(Ok(msg), reader.get_next_message().await.unwrap().unwrap());
}
}
@@ -298,9 +295,7 @@ mod test {
} else {
(
get_test_message_bytes(),
hdlcs.iter()
.map(|hdlc| hdlc.data.len())
.collect()
hdlcs.iter().map(|hdlc| hdlc.data.len()).collect(),
)
};
for truncated_hdlc_i in 1..hdlcs.len() - 1 {
@@ -323,14 +318,14 @@ mod test {
// GzipDecoder will detect an unexpected EOF, and our
// QmdlReader will indicate the stream of messages is
// done
None => {},
None => {}
// if it's further along, the expanded result will be an
// invalid HDLC block. if that's the case, make sure the
// QmdlReader indicates the stream of messages is over
// with afterwards
Some(Err(DiagParsingError::HdlcDecapsulationError(_, _))) => {
assert!(matches!(reader.get_next_message().await, Ok(None)));
},
}
// if it's further along still, we may get a complete
// Message, so make sure it matches the next expected
// one. then, make sure we've hit the end of the message
@@ -338,11 +333,10 @@ mod test {
Some(Ok(msg)) => {
assert_eq!(&msg, &expected_messages[truncated_hdlc_i]);
assert!(matches!(reader.get_next_message().await, Ok(None)));
},
}
// we should never be able to decapsulate the HDLC into
// an invalid Diag message
Some(Err(DiagParsingError::MessageParsingError(_, _)))
=> {
Some(Err(DiagParsingError::MessageParsingError(_, _))) => {
panic!("unexpected MessageParsingError");
}
}
@@ -385,10 +379,7 @@ mod test {
reader.get_next_message().await.unwrap().unwrap()
);
}
assert!(matches!(
reader.get_next_message().await,
Ok(None)
));
assert!(matches!(reader.get_next_message().await, Ok(None)));
}
#[tokio::test]