mirror of
https://github.com/EFForg/rayhunter.git
synced 2026-07-05 08:08:11 -07:00
Add ZIP download endpoint
This commit is contained in:
committed by
Will Greenberg
parent
29823d3e82
commit
b2502847a1
+2
-1
@@ -15,7 +15,7 @@ use crate::diag::run_diag_read_thread;
|
||||
use crate::error::RayhunterError;
|
||||
use crate::pcap::get_pcap;
|
||||
use crate::qmdl_store::RecordingStore;
|
||||
use crate::server::{get_qmdl, serve_static, ServerState};
|
||||
use crate::server::{get_qmdl, get_zip, serve_static, ServerState};
|
||||
use crate::stats::get_system_stats;
|
||||
|
||||
use analysis::{
|
||||
@@ -46,6 +46,7 @@ fn get_router() -> AppRouter {
|
||||
Router::new()
|
||||
.route("/api/pcap/{name}", get(get_pcap))
|
||||
.route("/api/qmdl/{name}", get(get_qmdl))
|
||||
.route("/api/zip/{name}", get(get_zip))
|
||||
.route("/api/system-stats", get(get_system_stats))
|
||||
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
|
||||
.route("/api/start-recording", post(start_recording))
|
||||
|
||||
+40
-30
@@ -1,19 +1,18 @@
|
||||
use crate::ServerState;
|
||||
|
||||
use anyhow::Error;
|
||||
use axum::body::Body;
|
||||
use axum::extract::{Path, State};
|
||||
use axum::http::header::CONTENT_TYPE;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use futures::TryStreamExt;
|
||||
use log::error;
|
||||
use rayhunter::diag::DataType;
|
||||
use rayhunter::gsmtap_parser;
|
||||
use rayhunter::pcap::GsmtapPcapWriter;
|
||||
use rayhunter::qmdl::QmdlReader;
|
||||
use std::sync::Arc;
|
||||
use std::{future, pin::pin};
|
||||
use tokio::io::duplex;
|
||||
use tokio::io::{duplex, AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
|
||||
@@ -45,35 +44,10 @@ pub async fn get_pcap(
|
||||
// the QMDL reader should stop at the last successfully written data chunk
|
||||
// (entry.size_bytes)
|
||||
let (reader, writer) = duplex(1024);
|
||||
let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap();
|
||||
pcap_writer.write_iface_header().await.unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
|
||||
let mut messages_stream = pin!(reader
|
||||
.as_stream()
|
||||
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
|
||||
|
||||
while let Some(container) = messages_stream
|
||||
.try_next()
|
||||
.await
|
||||
.expect("failed getting QMDL container")
|
||||
{
|
||||
for maybe_msg in container.into_messages() {
|
||||
match maybe_msg {
|
||||
Ok(msg) => {
|
||||
let maybe_gsmtap_msg =
|
||||
gsmtap_parser::parse(msg).expect("error parsing gsmtap message");
|
||||
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
|
||||
pcap_writer
|
||||
.write_gsmtap_message(gsmtap_msg, timestamp)
|
||||
.await
|
||||
.expect("error writing pcap packet");
|
||||
}
|
||||
}
|
||||
Err(e) => error!("error parsing message: {:?}", e),
|
||||
}
|
||||
}
|
||||
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await {
|
||||
error!("failed to generate PCAP: {:?}", e);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -81,3 +55,39 @@ pub async fn get_pcap(
|
||||
let body = Body::from_stream(ReaderStream::new(reader));
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
|
||||
pub async fn generate_pcap_data<R, W>(
|
||||
writer: W,
|
||||
qmdl_file: R,
|
||||
qmdl_size_bytes: usize,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
W: AsyncWrite + Unpin + Send,
|
||||
R: AsyncRead + Unpin,
|
||||
{
|
||||
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
|
||||
pcap_writer.write_iface_header().await?;
|
||||
|
||||
let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
|
||||
while let Some(container) = reader.get_next_messages_container().await? {
|
||||
if container.data_type != DataType::UserSpace {
|
||||
continue;
|
||||
}
|
||||
|
||||
for maybe_msg in container.into_messages() {
|
||||
match maybe_msg {
|
||||
Ok(msg) => {
|
||||
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
|
||||
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
|
||||
pcap_writer
|
||||
.write_gsmtap_message(gsmtap_msg, timestamp)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("error parsing message: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+199
-1
@@ -1,3 +1,7 @@
|
||||
use anyhow::Error;
|
||||
use async_zip::tokio::write::ZipFileWriter;
|
||||
use async_zip::Compression;
|
||||
use async_zip::ZipEntryBuilder;
|
||||
use axum::body::Body;
|
||||
use axum::extract::Path;
|
||||
use axum::extract::State;
|
||||
@@ -5,13 +9,16 @@ use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE};
|
||||
use axum::http::{HeaderValue, StatusCode};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use include_dir::{include_dir, Dir};
|
||||
use log::error;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::{copy, duplex, AsyncReadExt};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus};
|
||||
use crate::pcap::generate_pcap_data;
|
||||
use crate::qmdl_store::RecordingStore;
|
||||
use crate::{display, DiagDeviceCtrlMessage};
|
||||
|
||||
@@ -76,3 +83,194 @@ pub async fn serve_static(
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_zip(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
Path(entry_name): Path<String>,
|
||||
) -> Result<Response, (StatusCode, String)> {
|
||||
let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned();
|
||||
let (entry_index, qmdl_size_bytes) = {
|
||||
let qmdl_store = state.qmdl_store_lock.read().await;
|
||||
let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or((
|
||||
StatusCode::NOT_FOUND,
|
||||
format!("couldn't find entry with name {}", qmdl_idx),
|
||||
))?;
|
||||
|
||||
if entry.qmdl_size_bytes == 0 {
|
||||
return Err((
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"QMDL file is empty, try again in a bit!".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
(entry_index, entry.qmdl_size_bytes)
|
||||
};
|
||||
|
||||
let qmdl_store_lock = state.qmdl_store_lock.clone();
|
||||
|
||||
let (reader, writer) = duplex(8192);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let result: Result<(), Error> = async {
|
||||
let mut zip2 = ZipFileWriter::with_tokio(writer);
|
||||
|
||||
// Add QMDL file
|
||||
{
|
||||
let entry =
|
||||
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored);
|
||||
let mut entry_writer = zip2.write_entry_stream(entry).await?.compat_write();
|
||||
|
||||
let mut qmdl_file = {
|
||||
let qmdl_store = qmdl_store_lock.read().await;
|
||||
qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await?
|
||||
.take(qmdl_size_bytes as u64)
|
||||
};
|
||||
|
||||
copy(&mut qmdl_file, &mut entry_writer).await?;
|
||||
entry_writer.into_inner().close().await?;
|
||||
}
|
||||
|
||||
// Add PCAP file
|
||||
{
|
||||
let entry =
|
||||
ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored);
|
||||
let mut entry_writer = zip2.write_entry_stream(entry).await?.compat_write();
|
||||
|
||||
let qmdl_file_for_pcap = {
|
||||
let qmdl_store = qmdl_store_lock.read().await;
|
||||
qmdl_store
|
||||
.open_entry_qmdl(entry_index)
|
||||
.await?
|
||||
.take(qmdl_size_bytes as u64)
|
||||
};
|
||||
|
||||
if let Err(e) =
|
||||
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
|
||||
{
|
||||
// if we fail to generate the PCAP file, we should still continue and give the
|
||||
// user the QMDL.
|
||||
error!("Failed to generate PCAP: {:?}", e);
|
||||
}
|
||||
|
||||
entry_writer.into_inner().close().await?;
|
||||
}
|
||||
|
||||
zip2.close().await?;
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("Error generating ZIP file: {:?}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let headers = [(CONTENT_TYPE, "application/zip")];
|
||||
let body = Body::from_stream(ReaderStream::new(reader));
|
||||
Ok((headers, body).into_response())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_zip::base::read::mem::ZipFileReader;
|
||||
use axum::extract::{Path, State};
|
||||
use std::io::Cursor;
|
||||
use tempfile::TempDir;
|
||||
|
||||
async fn create_test_qmdl_store() -> (TempDir, Arc<RwLock<crate::qmdl_store::RecordingStore>>) {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let store_path = temp_dir.path().to_path_buf();
|
||||
let store = crate::qmdl_store::RecordingStore::create(&store_path)
|
||||
.await
|
||||
.unwrap();
|
||||
(temp_dir, Arc::new(RwLock::new(store)))
|
||||
}
|
||||
|
||||
async fn create_test_entry_with_data(
|
||||
store_lock: &Arc<RwLock<crate::qmdl_store::RecordingStore>>,
|
||||
test_data: &[u8],
|
||||
) -> String {
|
||||
let entry_name = {
|
||||
let mut store = store_lock.write().await;
|
||||
let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap();
|
||||
|
||||
if !test_data.is_empty() {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
qmdl_file.write_all(test_data).await.unwrap();
|
||||
qmdl_file.flush().await.unwrap();
|
||||
}
|
||||
|
||||
let current_entry = store.current_entry.unwrap();
|
||||
let entry = &store.manifest.entries[current_entry];
|
||||
let entry_name = entry.name.clone();
|
||||
|
||||
store
|
||||
.update_entry_qmdl_size(current_entry, test_data.len())
|
||||
.await
|
||||
.unwrap();
|
||||
entry_name
|
||||
};
|
||||
|
||||
let mut store = store_lock.write().await;
|
||||
store.close_current_entry().await.unwrap();
|
||||
entry_name
|
||||
}
|
||||
|
||||
fn create_test_server_state(
|
||||
store_lock: Arc<RwLock<crate::qmdl_store::RecordingStore>>,
|
||||
) -> Arc<ServerState> {
|
||||
let (tx, _rx) = tokio::sync::mpsc::channel(1);
|
||||
let (ui_tx, _ui_rx) = tokio::sync::mpsc::channel(1);
|
||||
let (analysis_tx, _analysis_rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
let analysis_status = {
|
||||
let store = store_lock.try_read().unwrap();
|
||||
crate::analysis::AnalysisStatus::new(&*store)
|
||||
};
|
||||
|
||||
Arc::new(ServerState {
|
||||
qmdl_store_lock: store_lock,
|
||||
diag_device_ctrl_sender: tx,
|
||||
ui_update_sender: ui_tx,
|
||||
analysis_status_lock: Arc::new(RwLock::new(analysis_status)),
|
||||
analysis_sender: analysis_tx,
|
||||
debug_mode: true,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_zip_success() {
|
||||
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
|
||||
let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E];
|
||||
let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await;
|
||||
let state = create_test_server_state(store_lock);
|
||||
|
||||
let result = get_zip(State(state), Path(entry_name.clone())).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let response = result.unwrap();
|
||||
|
||||
let headers = response.headers();
|
||||
assert_eq!(headers.get("content-type").unwrap(), "application/zip");
|
||||
|
||||
let body = response.into_body();
|
||||
let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap();
|
||||
|
||||
let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap();
|
||||
|
||||
let filenames = zip_reader
|
||||
.file()
|
||||
.entries()
|
||||
.iter()
|
||||
.map(|entry| entry.filename().as_str().unwrap().to_owned())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
assert_eq!(
|
||||
filenames,
|
||||
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user