mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-05-31 10:13:35 -07:00
run cargo fmt
This commit is contained in:
+1
-1
@@ -243,7 +243,7 @@ impl DiagTask {
|
||||
} = state
|
||||
{
|
||||
match (qmdl_writer.close().await, analysis_writer.close().await) {
|
||||
(Ok(()), Ok(())) => {},
|
||||
(Ok(()), Ok(())) => {}
|
||||
(qmdl_result, analysis_result) => {
|
||||
if let Err(err) = qmdl_result {
|
||||
error!("failed to close QmdlWriter: {:?}", err);
|
||||
|
||||
+1
-4
@@ -68,10 +68,7 @@ pub async fn get_pcap(
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
|
||||
pub async fn generate_pcap_data<R, W>(
|
||||
writer: W,
|
||||
mut reader: QmdlReader<R>,
|
||||
) -> Result<(), Error>
|
||||
pub async fn generate_pcap_data<R, W>(writer: W, mut reader: QmdlReader<R>) -> Result<(), Error>
|
||||
where
|
||||
W: AsyncWrite + Unpin + Send,
|
||||
R: AsyncRead + Unpin,
|
||||
|
||||
@@ -280,12 +280,19 @@ impl RecordingStore {
|
||||
}
|
||||
|
||||
// Returns the corresponding QMDL file for a given entry
|
||||
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<QmdlReader<File>, RecordingStoreError> {
|
||||
pub async fn open_entry_qmdl(
|
||||
&self,
|
||||
entry_index: usize,
|
||||
) -> Result<QmdlReader<File>, RecordingStoreError> {
|
||||
let entry = &self.manifest.entries[entry_index];
|
||||
let file = File::open(entry.get_qmdl_filepath(&self.path))
|
||||
.await
|
||||
.map_err(RecordingStoreError::ReadFileError)?;
|
||||
Ok(QmdlReader::new(file, entry.compressed, Some(entry.uncompressed_qmdl_size_bytes)))
|
||||
Ok(QmdlReader::new(
|
||||
file,
|
||||
entry.compressed,
|
||||
Some(entry.uncompressed_qmdl_size_bytes),
|
||||
))
|
||||
}
|
||||
|
||||
// Returns the corresponding QMDL file for a given entry
|
||||
@@ -506,7 +513,10 @@ mod tests {
|
||||
.entry_for_name(&store.manifest.entries[entry_index].name)
|
||||
.unwrap();
|
||||
assert!(entry.last_message_time.is_some());
|
||||
assert_eq!(store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes, 1000);
|
||||
assert_eq!(
|
||||
store.manifest.entries[entry_index].uncompressed_qmdl_size_bytes,
|
||||
1000
|
||||
);
|
||||
assert_eq!(
|
||||
RecordingStore::read_manifest(dir.path()).await.unwrap(),
|
||||
store.manifest
|
||||
|
||||
+15
-11
@@ -12,9 +12,9 @@ use axum::response::{IntoResponse, Response};
|
||||
use chrono::{DateTime, Local};
|
||||
use log::{error, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::copy;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs::write;
|
||||
use tokio::io::copy;
|
||||
use tokio::io::duplex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
@@ -77,7 +77,10 @@ pub async fn get_qmdl(
|
||||
|
||||
let headers = [
|
||||
(CONTENT_TYPE, "application/octet-stream"),
|
||||
(CONTENT_LENGTH, &entry.uncompressed_qmdl_size_bytes.to_string()),
|
||||
(
|
||||
CONTENT_LENGTH,
|
||||
&entry.uncompressed_qmdl_size_bytes.to_string(),
|
||||
),
|
||||
];
|
||||
let body = Body::from_stream(qmdl_reader.as_stream());
|
||||
Ok((headers, body).into_response())
|
||||
@@ -335,16 +338,16 @@ pub async fn get_zip(
|
||||
// Add QMDL file
|
||||
{
|
||||
let extension = if compressed { "qmdl.gz" } else { "qmdl" };
|
||||
let entry =
|
||||
ZipEntryBuilder::new(format!("{qmdl_idx}.{extension}").into(), Compression::Stored);
|
||||
let entry = ZipEntryBuilder::new(
|
||||
format!("{qmdl_idx}.{extension}").into(),
|
||||
Compression::Stored,
|
||||
);
|
||||
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
|
||||
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
|
||||
// once https://github.com/Majored/rs-async-zip/pull/160 is released.
|
||||
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
||||
let qmdl_store = qmdl_store_lock.read().await;
|
||||
let mut qmdl_reader = qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await?;
|
||||
let mut qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
|
||||
copy(&mut qmdl_reader, &mut entry_writer).await?;
|
||||
entry_writer.into_inner().close().await?;
|
||||
}
|
||||
@@ -356,9 +359,7 @@ pub async fn get_zip(
|
||||
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();
|
||||
|
||||
let qmdl_store = qmdl_store_lock.read().await;
|
||||
let qmdl_reader = qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await?;
|
||||
let qmdl_reader = qmdl_store.open_entry_qmdl(entry_index).await?;
|
||||
|
||||
if let Err(e) = generate_pcap_data(&mut entry_writer, qmdl_reader).await {
|
||||
// if we fail to generate the PCAP file, we should still continue and give the
|
||||
@@ -520,7 +521,10 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
filenames,
|
||||
vec![format!("{entry_name}.qmdl.gz"), format!("{entry_name}.pcapng"),]
|
||||
vec![
|
||||
format!("{entry_name}.qmdl.gz"),
|
||||
format!("{entry_name}.pcapng"),
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
+17
-14
@@ -9,11 +9,11 @@ use std::task::Poll;
|
||||
|
||||
use crate::diag::{DataType, HdlcEncapsulatedMessage, MESSAGE_TERMINATOR, MessagesContainer};
|
||||
|
||||
use async_compression::tokio::bufread::GzipDecoder;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use futures::TryStream;
|
||||
use log::error;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
|
||||
use async_compression::tokio::bufread::GzipDecoder;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
|
||||
pub struct QmdlWriter<T>
|
||||
where
|
||||
@@ -79,7 +79,7 @@ struct QmdlAsyncReader<T> {
|
||||
|
||||
impl<T> QmdlAsyncReader<T>
|
||||
where
|
||||
T: AsyncRead
|
||||
T: AsyncRead,
|
||||
{
|
||||
pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option<usize>) -> Self {
|
||||
let source = if compressed {
|
||||
@@ -137,13 +137,11 @@ where
|
||||
Poll::Ready(Err(err)) if err.kind() == ErrorKind::UnexpectedEof => {
|
||||
*eof = true;
|
||||
Poll::Ready(Ok(()))
|
||||
},
|
||||
}
|
||||
res => res,
|
||||
}
|
||||
},
|
||||
QmdlReaderSource::Uncompressed { reader } => {
|
||||
Pin::new(reader).poll_read(cx, buf)
|
||||
},
|
||||
}
|
||||
QmdlReaderSource::Uncompressed { reader } => Pin::new(reader).poll_read(cx, buf),
|
||||
};
|
||||
|
||||
// if we read more bytes than is allowed, cap the buffer by
|
||||
@@ -175,14 +173,14 @@ where
|
||||
pub fn new(reader: T, compressed: bool, max_uncompressed_bytes: Option<usize>) -> Self {
|
||||
QmdlReader {
|
||||
buf_reader: BufReader::new(QmdlAsyncReader::new(
|
||||
reader, compressed, max_uncompressed_bytes
|
||||
reader,
|
||||
compressed,
|
||||
max_uncompressed_bytes,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_stream(
|
||||
self,
|
||||
) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> {
|
||||
pub fn as_stream(self) -> impl TryStream<Ok = MessagesContainer, Error = std::io::Error> {
|
||||
futures::stream::try_unfold(self, |mut reader| async {
|
||||
let maybe_container = reader.get_next_messages_container().await?;
|
||||
match maybe_container {
|
||||
@@ -196,7 +194,12 @@ where
|
||||
&mut self,
|
||||
) -> Result<Option<MessagesContainer>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
if self.buf_reader.read_until(MESSAGE_TERMINATOR, &mut buf).await? == 0 {
|
||||
if self
|
||||
.buf_reader
|
||||
.read_until(MESSAGE_TERMINATOR, &mut buf)
|
||||
.await?
|
||||
== 0
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -218,7 +221,7 @@ where
|
||||
|
||||
impl<T> AsyncRead for QmdlReader<T>
|
||||
where
|
||||
T: AsyncRead + Unpin
|
||||
T: AsyncRead + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
|
||||
Reference in New Issue
Block a user