wavehunter: adds static server, system stats

This commit does a couple things:

1. breaks out the pcap streaming logic into its own module
2. bundles wavehunter/static files into the binary for easy distribution
3. serves those static files
4. serves dynamic json representing system and diag stats

I also threw together the world's ugliest website to display all this.
This commit is contained in:
Will Greenberg
2024-01-04 19:28:13 -08:00
parent dea1d17337
commit fe0e84ba18
9 changed files with 411 additions and 155 deletions

View File

@@ -1,24 +1,29 @@
mod config;
mod error;
mod pcap;
mod server;
mod stats;
use crate::config::{parse_config, parse_args};
use crate::server::{ServerState, serve_pcap, serve_qmdl};
use crate::server::{ServerState, get_qmdl, serve_static};
use crate::pcap::get_pcap;
use crate::stats::{get_system_stats, get_diag_stats};
use crate::error::WavehunterError;
use log::debug;
use axum::response::Redirect;
use orca::diag_device::DiagDevice;
use orca::diag_reader::DiagReader;
use axum::routing::get;
use axum::Router;
use tokio::fs::File;
use log::debug;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use std::sync::Arc;
fn run_diag_read_thread(mut dev: DiagDevice, bytes_read_lock: Arc<RwLock<usize>>) -> tokio::task::JoinHandle<Result<(), WavehunterError>> {
fn run_diag_read_thread(mut dev: DiagDevice, bytes_read_lock: Arc<RwLock<usize>>) -> JoinHandle<Result<(), WavehunterError>> {
tokio::task::spawn_blocking(move || {
loop {
// TODO: once we're actually doing analysis, we'll wanna use the messages
@@ -45,8 +50,12 @@ async fn run_server(config: &config::Config, qmdl_bytes_written: Arc<RwLock<usiz
});
let app = Router::new()
.route("/output.pcap", get(serve_pcap))
.route("/output.qmdl", get(serve_qmdl))
.route("/api/pcap/latest.pcap", get(get_pcap))
.route("/api/qmdl/latest.qmdl", get(get_qmdl))
.route("/api/system-stats", get(get_system_stats))
.route("/api/diag-stats", get(get_diag_stats))
.route("/", get(|| async { Redirect::permanent("/index.html") }))
.route("/*path", get(serve_static))
.with_state(state);
let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
let listener = TcpListener::bind(&addr).await.unwrap();

109
wavehunter/src/pcap.rs Normal file
View File

@@ -0,0 +1,109 @@
use crate::ServerState;
use orca::gsmtap_parser::GsmtapParser;
use orca::pcap::GsmtapPcapWriter;
use orca::qmdl::{QmdlReader, QmdlReaderError};
use orca::diag_reader::DiagReader;
use axum::body::Body;
use axum::http::header::CONTENT_TYPE;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{Response, IntoResponse};
use std::fs::File;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Poll, Context};
use futures_core::Stream;
use log::error;
use tokio::sync::mpsc;
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
// written so far. This is done by spawning a blocking thread (a tokio thread
// capable of handling blocking operations) which streams chunks of pcap data to
// a channel that's piped to the client.
pub async fn get_pcap(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
let qmdl_bytes_written = *state.qmdl_bytes_written.read().await;
if qmdl_bytes_written == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string()
));
}
let (tx, rx) = mpsc::channel(1);
let channel_reader = ChannelReader { rx };
let channel_writer = ChannelWriter { tx };
tokio::task::spawn_blocking(move || {
// the QMDL reader should stop at the last successfully written data
// chunk (qmdl_bytes_written)
let qmdl_file = File::open(&state.qmdl_path).unwrap();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_bytes_written));
let mut gsmtap_parser = GsmtapParser::new();
let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap();
pcap_writer.write_iface_header().unwrap();
loop {
match qmdl_reader.read_response() {
Ok(messages) => {
for maybe_msg in messages {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg)
.expect("error parsing gsmtap message");
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp)
.expect("error writing pcap packet");
}
},
Err(e) => error!("error parsing message: {:?}", e),
}
}
},
// this is expected, and just means we've reached the end of the
// safely written QMDL data
Err(QmdlReaderError::MaxBytesReached(_)) => break,
Err(e) => {
error!("error reading qmdl file: {:?}", e);
break;
},
}
}
});
let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")];
let body = Body::from_stream(channel_reader);
Ok((headers, body).into_response())
}
struct ChannelWriter {
tx: mpsc::Sender<Vec<u8>>,
}
impl Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.tx.blocking_send(buf.to_vec())
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?;
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
struct ChannelReader {
rx: mpsc::Receiver<Vec<u8>>,
}
impl Stream for ChannelReader {
type Item = Result<Vec<u8>, String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -1,117 +1,22 @@
use orca::gsmtap_parser::GsmtapParser;
use orca::pcap::GsmtapPcapWriter;
use orca::qmdl::{QmdlReader, QmdlReaderError};
use orca::diag_reader::DiagReader;
use axum::body::Body;
use axum::http::header::CONTENT_TYPE;
use axum::http::header::{CONTENT_TYPE, self};
use axum::extract::State;
use axum::http::StatusCode;
use axum::http::{StatusCode, HeaderValue};
use axum::response::{Response, IntoResponse};
use axum::extract::Path;
use tokio::io::AsyncReadExt;
use std::fs::File;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::fs::File as AsyncFile;
use tokio_util::io::ReaderStream;
use std::task::{Poll, Context};
use futures_core::Stream;
use log::error;
use tokio::sync::mpsc;
use include_dir::{include_dir, Dir};
// Streams a pcap file chunk-by-chunk to the client by reading the QMDL data
// written so far. This is done by spawning a blocking thread (a tokio thread
// capable of handling blocking operations) which streams chunks of pcap data to
// a channel that's piped to the client.
pub async fn serve_pcap(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
let qmdl_bytes_written = *state.qmdl_bytes_written.read().await;
if qmdl_bytes_written == 0 {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"QMDL file is empty, try again in a bit!".to_string()
));
}
let (tx, rx) = mpsc::channel(1);
let channel_reader = ChannelReader { rx };
let channel_writer = ChannelWriter { tx };
tokio::task::spawn_blocking(move || {
// the QMDL reader should stop at the last successfully written data
// chunk (qmdl_bytes_written)
let qmdl_file = File::open(&state.qmdl_path).unwrap();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_bytes_written));
let mut gsmtap_parser = GsmtapParser::new();
let mut pcap_writer = GsmtapPcapWriter::new(channel_writer).unwrap();
pcap_writer.write_iface_header().unwrap();
loop {
match qmdl_reader.read_response() {
Ok(messages) => {
for maybe_msg in messages {
match maybe_msg {
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser.recv_message(msg)
.expect("error parsing gsmtap message");
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
pcap_writer.write_gsmtap_message(gsmtap_msg, timestamp)
.expect("error writing pcap packet");
}
},
Err(e) => error!("error parsing message: {:?}", e),
}
}
},
// this is expected, and just means we've reached the end of the
// safely written QMDL data
Err(QmdlReaderError::MaxBytesReached(_)) => break,
Err(e) => {
error!("error reading qmdl file: {:?}", e);
break;
},
}
}
});
let headers = [(CONTENT_TYPE, "application/vnd.tcpdump.pcap")];
let body = Body::from_stream(channel_reader);
Ok((headers, body).into_response())
pub struct ServerState {
pub qmdl_bytes_written: Arc<RwLock<usize>>,
pub qmdl_path: String,
}
struct ChannelWriter {
tx: mpsc::Sender<Vec<u8>>,
}
impl Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.tx.blocking_send(buf.to_vec())
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "channel closed"))?;
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
struct ChannelReader {
rx: mpsc::Receiver<Vec<u8>>,
}
impl Stream for ChannelReader {
type Item = Result<Vec<u8>, String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(Ok(msg))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
pub async fn serve_qmdl(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
pub async fn get_qmdl(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
let qmdl_file = AsyncFile::open(&state.qmdl_path).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {}", e)))?;
let qmdl_bytes_written = *state.qmdl_bytes_written.read().await;
@@ -123,7 +28,25 @@ pub async fn serve_qmdl(State(state): State<Arc<ServerState>>) -> Result<Respons
Ok((headers, body).into_response())
}
pub struct ServerState {
pub qmdl_bytes_written: Arc<RwLock<usize>>,
pub qmdl_path: String,
// Bundles the server's static files (html/css/js) into the binary for easy distribution
static STATIC_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/static");
pub async fn serve_static(Path(path): Path<String>) -> impl IntoResponse {
let path = path.trim_start_matches('/');
let mime_type = mime_guess::from_path(path).first_or_text_plain();
match STATIC_DIR.get_file(path) {
None => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap(),
Some(file) => Response::builder()
.status(StatusCode::OK)
.header(
header::CONTENT_TYPE,
HeaderValue::from_str(mime_type.as_ref()).unwrap(),
)
.body(Body::from(file.contents()))
.unwrap(),
}
}

121
wavehunter/src/stats.rs Normal file
View File

@@ -0,0 +1,121 @@
use crate::server::ServerState;
use axum::Json;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use std::sync::Arc;
use log::error;
use serde::Serialize;
use tokio::process::Command;
#[derive(Debug, Serialize)]
pub struct SystemStats {
pub disk_stats: DiskStats,
pub memory_stats: MemoryStats,
}
impl SystemStats {
pub async fn new(qmdl_path: &str) -> Result<Self, String> {
Ok(Self {
disk_stats: DiskStats::new(qmdl_path).await?,
memory_stats: MemoryStats::new().await?,
})
}
}
#[derive(Debug, Serialize)]
pub struct DiskStats {
partition: String,
total_size: String,
used_size: String,
available_size: String,
used_percent: String,
mounted_on: String,
}
impl DiskStats {
// runs "df -h <qmdl_path>" to get storage statistics for the partition containing
// the QMDL file
pub async fn new(qmdl_path: &str) -> Result<Self, String> {
let mut df_cmd = Command::new("df");
df_cmd.arg("-h");
df_cmd.arg(qmdl_path);
let stdout = get_cmd_output(df_cmd).await?;
let mut parts = stdout.split_whitespace().skip(7).to_owned();
Ok(Self {
partition: parts.next().ok_or("error parsing df output")?.to_string(),
total_size: parts.next().ok_or("error parsing df output")?.to_string(),
used_size: parts.next().ok_or("error parsing df output")?.to_string(),
available_size: parts.next().ok_or("error parsing df output")?.to_string(),
used_percent: parts.next().ok_or("error parsing df output")?.to_string(),
mounted_on: parts.next().ok_or("error parsing df output")?.to_string(),
})
}
}
#[derive(Debug, Serialize)]
pub struct MemoryStats {
total: String,
used: String,
free: String,
}
// runs the given command and returns its stdout as a string
async fn get_cmd_output(mut cmd: Command) -> Result<String, String> {
let cmd_str = format!("{:?}", &cmd);
let output = cmd.output().await
.map_err(|e| format!("error running command {}: {}", &cmd_str, e))?;
if !output.status.success() {
return Err(format!("command {} failed with exit code {}", &cmd_str, output.status.code().unwrap()));
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
impl MemoryStats {
// runs "free -k" and parses the output to retrieve memory stats
pub async fn new() -> Result<Self, String> {
let mut free_cmd = Command::new("free");
free_cmd.arg("-k");
let stdout = get_cmd_output(free_cmd).await?;
let mut numbers = stdout.split_whitespace()
.flat_map(|part| part.parse::<usize>());
Ok(Self {
total: humanize_kb(numbers.next().ok_or("error parsing free output")?),
used: humanize_kb(numbers.next().ok_or("error parsing free output")?),
free: humanize_kb(numbers.next().ok_or("error parsing free output")?),
})
}
}
// turns a number of kilobytes (like 28293) into a human-readable string (like "28.3M")
fn humanize_kb(kb: usize) -> String {
if kb < 1000{
return format!("{}K", kb);
}
format!("{:.1}M", kb as f64 / 1024.0)
}
pub async fn get_system_stats(State(state): State<Arc<ServerState>>) -> Result<Json<SystemStats>, (StatusCode, String)> {
match SystemStats::new(&state.qmdl_path).await {
Ok(stats) => Ok(Json(stats)),
Err(err) => {
error!("error getting system stats: {}", err);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error getting system stats".to_string()
));
},
}
}
#[derive(Debug, Serialize)]
pub struct DiagStats {
bytes_written: usize,
}
pub async fn get_diag_stats(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
Json(DiagStats {
bytes_written: *state.qmdl_bytes_written.read().await,
})
}