diff --git a/Cargo.lock b/Cargo.lock index 85326cd..b3603d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -249,6 +249,20 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "async_zip" +version = "0.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b9f7252833d5ed4b00aa9604b563529dd5e11de9c23615de2dcdf91eb87b52" +dependencies = [ + "crc32fast", + "futures-lite", + "pin-project", + "thiserror 1.0.69", + "tokio", + "tokio-util", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -2374,6 +2388,8 @@ dependencies = [ name = "rayhunter-daemon" version = "0.3.4" dependencies = [ + "anyhow", + "async_zip", "axum", "chrono", "clap", @@ -3067,6 +3083,7 @@ checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "futures-util", "hashbrown", diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 41109e7..394bce5 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -29,7 +29,7 @@ thiserror = "1.0.52" libc = "0.2.150" log = "0.4.20" env_logger = { version = "0.11", default-features = false } -tokio-util = { version = "0.7.10", features = ["rt", "io"] } +tokio-util = { version = "0.7.10", features = ["rt", "io", "compat"] } futures-macro = "0.3.30" include_dir = "0.7.3" mime_guess = "2.0.4" @@ -41,3 +41,5 @@ serde_json = "1.0.114" image = { version = "0.25.1", default-features = false, features = ["png", "gif"] } tempfile = "3.10.1" simple_logger = "5.0.0" +async_zip = { version = "0.0.17", features = ["tokio"] } +anyhow = "1.0.98" diff --git a/bin/src/daemon.rs b/bin/src/daemon.rs index 54df436..44d4059 100644 --- a/bin/src/daemon.rs +++ b/bin/src/daemon.rs @@ -15,7 +15,7 @@ use crate::diag::run_diag_read_thread; use crate::error::RayhunterError; use crate::pcap::get_pcap; use crate::qmdl_store::RecordingStore; -use crate::server::{get_qmdl, serve_static, ServerState}; +use crate::server::{get_qmdl, get_zip, serve_static, ServerState}; use crate::stats::get_system_stats; use analysis::{ @@ -46,6 +46,7 @@ fn get_router() -> AppRouter { Router::new() .route("/api/pcap/{name}", get(get_pcap)) .route("/api/qmdl/{name}", get(get_qmdl)) + .route("/api/zip/{name}", get(get_zip)) .route("/api/system-stats", get(get_system_stats)) .route("/api/qmdl-manifest", get(get_qmdl_manifest)) .route("/api/start-recording", post(start_recording)) diff --git a/bin/src/pcap.rs b/bin/src/pcap.rs index 1d4d5ad..531bd28 100644 --- a/bin/src/pcap.rs +++ b/bin/src/pcap.rs @@ -1,19 +1,18 @@ use crate::ServerState; +use anyhow::Error; use axum::body::Body; use axum::extract::{Path, State}; use axum::http::header::CONTENT_TYPE; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use futures::TryStreamExt; use log::error; use rayhunter::diag::DataType; use rayhunter::gsmtap_parser; use rayhunter::pcap::GsmtapPcapWriter; use rayhunter::qmdl::QmdlReader; use std::sync::Arc; -use std::{future, pin::pin}; -use tokio::io::duplex; +use tokio::io::{duplex, AsyncRead, AsyncWrite}; use tokio_util::io::ReaderStream; // Streams a pcap file chunk-by-chunk to the client by reading the QMDL data @@ -45,35 +44,10 @@ pub async fn get_pcap( // the QMDL reader should stop at the last successfully written data chunk // (entry.size_bytes) let (reader, writer) = duplex(1024); - let mut pcap_writer = GsmtapPcapWriter::new(writer).await.unwrap(); - pcap_writer.write_iface_header().await.unwrap(); tokio::spawn(async move { - let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes)); - let mut messages_stream = pin!(reader - .as_stream() - .try_filter(|container| future::ready(container.data_type == DataType::UserSpace))); - - while let Some(container) = messages_stream - .try_next() - .await - .expect("failed getting QMDL container") - { - for maybe_msg in container.into_messages() { - match maybe_msg { - Ok(msg) => { - let maybe_gsmtap_msg = - gsmtap_parser::parse(msg).expect("error parsing gsmtap message"); - if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { - pcap_writer - .write_gsmtap_message(gsmtap_msg, timestamp) - .await - .expect("error writing pcap packet"); - } - } - Err(e) => error!("error parsing message: {:?}", e), - } - } + if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await { + error!("failed to generate PCAP: {:?}", e); } }); @@ -81,3 +55,39 @@ pub async fn get_pcap( let body = Body::from_stream(ReaderStream::new(reader)); Ok((headers, body).into_response()) } + +pub async fn generate_pcap_data( + writer: W, + qmdl_file: R, + qmdl_size_bytes: usize, +) -> Result<(), Error> +where + W: AsyncWrite + Unpin + Send, + R: AsyncRead + Unpin, +{ + let mut pcap_writer = GsmtapPcapWriter::new(writer).await?; + pcap_writer.write_iface_header().await?; + + let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes)); + while let Some(container) = reader.get_next_messages_container().await? { + if container.data_type != DataType::UserSpace { + continue; + } + + for maybe_msg in container.into_messages() { + match maybe_msg { + Ok(msg) => { + let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; + if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { + pcap_writer + .write_gsmtap_message(gsmtap_msg, timestamp) + .await?; + } + } + Err(e) => error!("error parsing message: {:?}", e), + } + } + } + + Ok(()) +} diff --git a/bin/src/server.rs b/bin/src/server.rs index 9065733..eee7271 100644 --- a/bin/src/server.rs +++ b/bin/src/server.rs @@ -1,3 +1,7 @@ +use anyhow::Error; +use async_zip::tokio::write::ZipFileWriter; +use async_zip::Compression; +use async_zip::ZipEntryBuilder; use axum::body::Body; use axum::extract::Path; use axum::extract::State; @@ -5,13 +9,16 @@ use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE}; use axum::http::{HeaderValue, StatusCode}; use axum::response::{IntoResponse, Response}; use include_dir::{include_dir, Dir}; +use log::error; use std::sync::Arc; -use tokio::io::AsyncReadExt; +use tokio::io::{copy, duplex, AsyncReadExt}; use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; +use tokio_util::compat::FuturesAsyncWriteCompatExt; use tokio_util::io::ReaderStream; use crate::analysis::{AnalysisCtrlMessage, AnalysisStatus}; +use crate::pcap::generate_pcap_data; use crate::qmdl_store::RecordingStore; use crate::{display, DiagDeviceCtrlMessage}; @@ -76,3 +83,194 @@ pub async fn serve_static( .unwrap(), } } + +pub async fn get_zip( + State(state): State>, + Path(entry_name): Path, +) -> Result { + let qmdl_idx = entry_name.trim_end_matches(".zip").to_owned(); + let (entry_index, qmdl_size_bytes) = { + let qmdl_store = state.qmdl_store_lock.read().await; + let (entry_index, entry) = qmdl_store.entry_for_name(&qmdl_idx).ok_or(( + StatusCode::NOT_FOUND, + format!("couldn't find entry with name {}", qmdl_idx), + ))?; + + if entry.qmdl_size_bytes == 0 { + return Err(( + StatusCode::SERVICE_UNAVAILABLE, + "QMDL file is empty, try again in a bit!".to_string(), + )); + } + + (entry_index, entry.qmdl_size_bytes) + }; + + let qmdl_store_lock = state.qmdl_store_lock.clone(); + + let (reader, writer) = duplex(8192); + + tokio::spawn(async move { + let result: Result<(), Error> = async { + let mut zip2 = ZipFileWriter::with_tokio(writer); + + // Add QMDL file + { + let entry = + ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored); + let mut entry_writer = zip2.write_entry_stream(entry).await?.compat_write(); + + let mut qmdl_file = { + let qmdl_store = qmdl_store_lock.read().await; + qmdl_store + .open_entry_qmdl(entry_index) + .await? + .take(qmdl_size_bytes as u64) + }; + + copy(&mut qmdl_file, &mut entry_writer).await?; + entry_writer.into_inner().close().await?; + } + + // Add PCAP file + { + let entry = + ZipEntryBuilder::new(format!("{qmdl_idx}.pcapng").into(), Compression::Stored); + let mut entry_writer = zip2.write_entry_stream(entry).await?.compat_write(); + + let qmdl_file_for_pcap = { + let qmdl_store = qmdl_store_lock.read().await; + qmdl_store + .open_entry_qmdl(entry_index) + .await? + .take(qmdl_size_bytes as u64) + }; + + if let Err(e) = + generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await + { + // if we fail to generate the PCAP file, we should still continue and give the + // user the QMDL. + error!("Failed to generate PCAP: {:?}", e); + } + + entry_writer.into_inner().close().await?; + } + + zip2.close().await?; + Ok(()) + } + .await; + + if let Err(e) = result { + error!("Error generating ZIP file: {:?}", e); + } + }); + + let headers = [(CONTENT_TYPE, "application/zip")]; + let body = Body::from_stream(ReaderStream::new(reader)); + Ok((headers, body).into_response()) +} + +#[cfg(test)] +mod tests { + use super::*; + use async_zip::base::read::mem::ZipFileReader; + use axum::extract::{Path, State}; + use std::io::Cursor; + use tempfile::TempDir; + + async fn create_test_qmdl_store() -> (TempDir, Arc>) { + let temp_dir = TempDir::new().unwrap(); + let store_path = temp_dir.path().to_path_buf(); + let store = crate::qmdl_store::RecordingStore::create(&store_path) + .await + .unwrap(); + (temp_dir, Arc::new(RwLock::new(store))) + } + + async fn create_test_entry_with_data( + store_lock: &Arc>, + test_data: &[u8], + ) -> String { + let entry_name = { + let mut store = store_lock.write().await; + let (mut qmdl_file, _analysis_file) = store.new_entry().await.unwrap(); + + if !test_data.is_empty() { + use tokio::io::AsyncWriteExt; + qmdl_file.write_all(test_data).await.unwrap(); + qmdl_file.flush().await.unwrap(); + } + + let current_entry = store.current_entry.unwrap(); + let entry = &store.manifest.entries[current_entry]; + let entry_name = entry.name.clone(); + + store + .update_entry_qmdl_size(current_entry, test_data.len()) + .await + .unwrap(); + entry_name + }; + + let mut store = store_lock.write().await; + store.close_current_entry().await.unwrap(); + entry_name + } + + fn create_test_server_state( + store_lock: Arc>, + ) -> Arc { + let (tx, _rx) = tokio::sync::mpsc::channel(1); + let (ui_tx, _ui_rx) = tokio::sync::mpsc::channel(1); + let (analysis_tx, _analysis_rx) = tokio::sync::mpsc::channel(1); + + let analysis_status = { + let store = store_lock.try_read().unwrap(); + crate::analysis::AnalysisStatus::new(&*store) + }; + + Arc::new(ServerState { + qmdl_store_lock: store_lock, + diag_device_ctrl_sender: tx, + ui_update_sender: ui_tx, + analysis_status_lock: Arc::new(RwLock::new(analysis_status)), + analysis_sender: analysis_tx, + debug_mode: true, + }) + } + + #[tokio::test] + async fn test_get_zip_success() { + let (_temp_dir, store_lock) = create_test_qmdl_store().await; + let test_qmdl_data = vec![0x7E, 0x00, 0x00, 0x00, 0x10, 0x00, 0x7E]; + let entry_name = create_test_entry_with_data(&store_lock, &test_qmdl_data).await; + let state = create_test_server_state(store_lock); + + let result = get_zip(State(state), Path(entry_name.clone())).await; + + assert!(result.is_ok()); + let response = result.unwrap(); + + let headers = response.headers(); + assert_eq!(headers.get("content-type").unwrap(), "application/zip"); + + let body = response.into_body(); + let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); + + let zip_reader = ZipFileReader::new(body_bytes.to_vec()).await.unwrap(); + + let filenames = zip_reader + .file() + .entries() + .iter() + .map(|entry| entry.filename().as_str().unwrap().to_owned()) + .collect::>(); + + assert_eq!( + filenames, + vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),] + ); + } +} diff --git a/bin/web/src/lib/components/ManifestCard.svelte b/bin/web/src/lib/components/ManifestCard.svelte index 93b58ef..0ccd984 100644 --- a/bin/web/src/lib/components/ManifestCard.svelte +++ b/bin/web/src/lib/components/ManifestCard.svelte @@ -59,6 +59,7 @@
+ {#if current} {:else} diff --git a/bin/web/src/lib/components/ManifestTable.svelte b/bin/web/src/lib/components/ManifestTable.svelte index 79f7d56..1493531 100644 --- a/bin/web/src/lib/components/ManifestTable.svelte +++ b/bin/web/src/lib/components/ManifestTable.svelte @@ -19,6 +19,7 @@ Size PCAP QMDL + ZIP Analysis @@ -32,6 +33,6 @@
{#each entries as entry, i} - + {/each}
\ No newline at end of file diff --git a/bin/web/src/lib/components/ManifestTableRow.svelte b/bin/web/src/lib/components/ManifestTableRow.svelte index 6ffd553..8fa0484 100644 --- a/bin/web/src/lib/components/ManifestTableRow.svelte +++ b/bin/web/src/lib/components/ManifestTableRow.svelte @@ -36,6 +36,7 @@ {entry.get_readable_qmdl_size()} + {#if current} @@ -49,7 +50,7 @@ {/if} - + diff --git a/bin/web/src/lib/manifest.svelte.ts b/bin/web/src/lib/manifest.svelte.ts index 817d1af..834e578 100644 --- a/bin/web/src/lib/manifest.svelte.ts +++ b/bin/web/src/lib/manifest.svelte.ts @@ -93,6 +93,10 @@ export class ManifestEntry { return `/api/qmdl/${this.name}.qmdl`; } + get_zip_url(): string { + return `/api/zip/${this.name}.zip`; + } + get_analysis_report_url(): string { return `/api/analysis-report/${this.name}`; }