diff --git a/wavehunter/src/config.rs b/wavehunter/src/config.rs new file mode 100644 index 0000000..1cd20dc --- /dev/null +++ b/wavehunter/src/config.rs @@ -0,0 +1,51 @@ +use crate::error::WavehunterError; + +use serde::Deserialize; +use toml; + +#[derive(Deserialize)] +struct ConfigFile { + qmdl_path: Option, + port: Option, +} + +#[derive(Debug)] +pub struct Config { + pub qmdl_path: String, + pub port: u16, +} + +impl Default for Config { + fn default() -> Self { + Config { + qmdl_path: "./wavehunter.qmdl".to_string(), + port: 8080, + } + } +} + +pub fn parse_config

(path: P) -> Result where P: AsRef { + let config_file = std::fs::read_to_string(&path) + .map_err(|_| WavehunterError::MissingConfigFile(format!("{:?}", path.as_ref())))?; + let parsed_config: ConfigFile = toml::from_str(&config_file) + .map_err(WavehunterError::ConfigFileParsingError)?; + let mut config = Config::default(); + parsed_config.qmdl_path.map(|path| config.qmdl_path = path); + parsed_config.port.map(|path| config.port = path); + Ok(config) +} + +pub struct Args { + pub config_path: String, +} + +pub fn parse_args() -> Args { + let args: Vec = std::env::args().collect(); + if args.len() != 2 { + println!("Usage: {} /path/to/config/file", args[0]); + std::process::exit(1); + } + Args { + config_path: args[1].clone(), + } +} diff --git a/wavehunter/src/error.rs b/wavehunter/src/error.rs new file mode 100644 index 0000000..72200a9 --- /dev/null +++ b/wavehunter/src/error.rs @@ -0,0 +1,16 @@ +use thiserror::Error; +use orca::diag_device::DiagDeviceError; + +#[derive(Error, Debug)] +pub enum WavehunterError { + #[error("Missing config file: {0}")] + MissingConfigFile(String), + #[error("Config file parsing error: {0}")] + ConfigFileParsingError(#[from] toml::de::Error), + #[error("Diag intialization error: {0}")] + DiagInitError(DiagDeviceError), + #[error("Diag read error: {0}")] + DiagReadError(DiagDeviceError), + #[error("Tokio error: {0}")] + TokioError(#[from] tokio::io::Error), +} diff --git a/wavehunter/src/main.rs b/wavehunter/src/main.rs index f75ad04..7e63924 100644 --- a/wavehunter/src/main.rs +++ b/wavehunter/src/main.rs @@ -1,90 +1,20 @@ -use axum::body::Body; -use axum::http::header::CONTENT_TYPE; -use futures_core::Stream; -use log::error; -use orca::diag_device::{DiagDevice, DiagDeviceError}; +mod config; +mod error; +mod server; + +use crate::config::{parse_config, parse_args}; +use crate::server::{ServerState, serve_pcap}; +use crate::error::WavehunterError; + +use orca::diag_device::DiagDevice; use orca::diag_reader::DiagReader; -use orca::gsmtap_parser::GsmtapParser; -use orca::pcap::GsmtapPcapWriter; -use orca::qmdl::{QmdlReader, QmdlReaderError}; -use axum::Router; -use axum::extract::State; -use axum::http::StatusCode; -use axum::response::{Response, IntoResponse}; use axum::routing::get; -use std::fs::File; -use thiserror::Error; -use serde::Deserialize; -use std::io::Write; -use std::sync::Arc; +use axum::Router; use std::net::SocketAddr; -use std::pin::Pin; -use std::task::{Poll, Context}; use tokio::net::TcpListener; -use tokio::sync::{mpsc, RwLock}; -use toml; - -#[derive(Error, Debug)] -enum WavehunterError { - #[error("Missing config file: {0}")] - MissingConfigFile(String), - #[error("Config file parsing error: {0}")] - ConfigFileParsingError(#[from] toml::de::Error), - #[error("Diag intialization error: {0}")] - DiagInitError(DiagDeviceError), - #[error("Diag read error: {0}")] - DiagReadError(DiagDeviceError), - #[error("Tokio error: {0}")] - TokioError(#[from] tokio::io::Error), -} - -#[derive(Deserialize)] -struct ConfigFile { - qmdl_path: Option, - port: Option, -} - -#[derive(Debug)] -struct Config { - qmdl_path: String, - port: u16, -} - -impl Default for Config { - fn default() -> Self { - Config { - qmdl_path: "./wavehunter.qmdl".to_string(), - port: 8080, - } - } -} - -fn parse_config

(path: P) -> Result where P: AsRef { - let config_file = std::fs::read_to_string(&path) - .map_err(|_| WavehunterError::MissingConfigFile(format!("{:?}", path.as_ref())))?; - let parsed_config: ConfigFile = toml::from_str(&config_file) - .map_err(WavehunterError::ConfigFileParsingError)?; - let mut config = Config::default(); - parsed_config.qmdl_path.map(|path| config.qmdl_path = path); - parsed_config.port.map(|path| config.port = path); - Ok(config) -} - -struct Args { - config_path: String, -} - -fn parse_args() -> Args { - let args: Vec = std::env::args().collect(); - if args.len() != 2 { - println!("Usage: {} /path/to/config/file", args[0]); - std::process::exit(1); - } - Args { - config_path: args[1].clone(), - } -} +use tokio::sync::RwLock; +use std::sync::Arc; fn run_diag_read_thread(mut dev: DiagDevice, bytes_read_lock: Arc>) -> tokio::task::JoinHandle> { tokio::task::spawn_blocking(move || { @@ -102,103 +32,6 @@ fn run_diag_read_thread(mut dev: DiagDevice, bytes_read_lock: Arc> }) } -// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data -// written so far. This is done by spawning a blocking thread (a tokio thread -// capable of handling blocking operations) which streams chunks of pcap data to -// a channel that's piped to the client. -async fn serve_pcap(State(state): State>) -> Result { - let qmdl_bytes_written = *state.qmdl_bytes_written.read().await; - if qmdl_bytes_written == 0 { - return Err(( - StatusCode::SERVICE_UNAVAILABLE, - "QMDL file is empty, try again in a bit!".to_string() - )); - } - - let (tx, rx) = mpsc::channel(1); - let channel_reader = ChannelReader { rx }; - let channel_writer = ChannelWriter { tx }; - tokio::task::spawn_blocking(move || { - // the QMDL reader should stop at the last successfully written data - // chunk (qmdl_bytes_written) - let qmdl_file = File::open(&state.qmdl_path).unwrap(); - let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_bytes_written)); - - let mut gsmtap_parser = GsmtapParser::new(); - let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap(); - pcap_writer.write_iface_header().unwrap(); - loop { - match qmdl_reader.read_response() { - Ok(messages) => { - for maybe_msg in messages { - match maybe_msg { - Ok(msg) => { - let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg) - .expect("error parsing gsmtap message"); - if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { - pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp) - .expect("error writing pcap packet"); - } - }, - Err(e) => { - error!("error parsing message: {:?}", e); - }, - } - } - }, - // this is expected, and just means we've reached the end of the - // safely written QMDL data - Err(QmdlReaderError::MaxBytesReached(_)) => break, - Err(e) => { - error!("error reading qmdl file: {:?}", e); - break; - }, - } - } - }); - - let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")]; - let body = Body::from_stream(channel_reader); - Ok((headers, body).into_response()) -} - -struct ChannelWriter { - tx: mpsc::Sender>, -} - -impl Write for ChannelWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tx.blocking_send(buf.to_vec()) - .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -struct ChannelReader { - rx: mpsc::Receiver>, -} - -impl Stream for ChannelReader { - type Item = Result, String>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.rx.poll_recv(cx) { - Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - -struct ServerState { - qmdl_bytes_written: Arc>, - qmdl_path: String, -} - #[tokio::main] async fn main() -> Result<(), WavehunterError> { env_logger::init(); diff --git a/wavehunter/src/server.rs b/wavehunter/src/server.rs new file mode 100644 index 0000000..95b74e9 --- /dev/null +++ b/wavehunter/src/server.rs @@ -0,0 +1,116 @@ +use orca::gsmtap_parser::GsmtapParser; +use orca::pcap::GsmtapPcapWriter; +use orca::qmdl::{QmdlReader, QmdlReaderError}; +use orca::diag_reader::DiagReader; + +use axum::body::Body; +use axum::http::header::CONTENT_TYPE; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::{Response, IntoResponse}; +use std::fs::File; +use std::io::Write; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::RwLock; +use std::task::{Poll, Context}; +use futures_core::Stream; +use log::error; +use tokio::sync::mpsc; + +// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data +// written so far. This is done by spawning a blocking thread (a tokio thread +// capable of handling blocking operations) which streams chunks of pcap data to +// a channel that's piped to the client. +pub async fn serve_pcap(State(state): State>) -> Result { + let qmdl_bytes_written = *state.qmdl_bytes_written.read().await; + if qmdl_bytes_written == 0 { + return Err(( + StatusCode::SERVICE_UNAVAILABLE, + "QMDL file is empty, try again in a bit!".to_string() + )); + } + + let (tx, rx) = mpsc::channel(1); + let channel_reader = ChannelReader { rx }; + let channel_writer = ChannelWriter { tx }; + tokio::task::spawn_blocking(move || { + // the QMDL reader should stop at the last successfully written data + // chunk (qmdl_bytes_written) + let qmdl_file = File::open(&state.qmdl_path).unwrap(); + let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_bytes_written)); + + let mut gsmtap_parser = GsmtapParser::new(); + let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap(); + pcap_writer.write_iface_header().unwrap(); + loop { + match qmdl_reader.read_response() { + Ok(messages) => { + for maybe_msg in messages { + match maybe_msg { + Ok(msg) => { + let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg) + .expect("error parsing gsmtap message"); + if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg { + pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp) + .expect("error writing pcap packet"); + } + }, + Err(e) => { + error!("error parsing message: {:?}", e); + }, + } + } + }, + // this is expected, and just means we've reached the end of the + // safely written QMDL data + Err(QmdlReaderError::MaxBytesReached(_)) => break, + Err(e) => { + error!("error reading qmdl file: {:?}", e); + break; + }, + } + } + }); + + let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")]; + let body = Body::from_stream(channel_reader); + Ok((headers, body).into_response()) +} + +struct ChannelWriter { + tx: mpsc::Sender>, +} + +impl Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.tx.blocking_send(buf.to_vec()) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +struct ChannelReader { + rx: mpsc::Receiver>, +} + +impl Stream for ChannelReader { + type Item = Result, String>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.rx.poll_recv(cx) { + Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +pub struct ServerState { + pub qmdl_bytes_written: Arc>, + pub qmdl_path: String, +}