diff --git a/check/src/main.rs b/check/src/main.rs index a449030..6165a39 100644 --- a/check/src/main.rs +++ b/check/src/main.rs @@ -155,7 +155,7 @@ async fn pcapify(qmdl_path: &PathBuf) { .await .expect("failed to get container") { - for msg in container.into_messages().into_iter().flatten() { + for msg in container.messages().into_iter().flatten() { if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) { pcap_writer .write_gsmtap_message(parsed, timestamp, None) diff --git a/daemon/src/diag.rs b/daemon/src/diag.rs index 6a2f40d..7b72af7 100644 --- a/daemon/src/diag.rs +++ b/daemon/src/diag.rs @@ -23,7 +23,7 @@ use tokio_util::task::TaskTracker; #[cfg(feature = "apidocs")] use rayhunter::analysis::analyzer::ReportMetadata; use rayhunter::analysis::analyzer::{AnalysisLineNormalizer, AnalyzerConfig, EventType}; -use rayhunter::diag::{DataType, MessagesContainer}; +use rayhunter::diag::{DataType, Message, MessagesContainer}; use rayhunter::diag_device::DiagDevice; use rayhunter::qmdl::QmdlWriter; @@ -49,6 +49,10 @@ pub enum DiagDeviceCtrlMessage { DeleteAllEntries { response_tx: oneshot::Sender>, }, + GpsUpdate { + lat: f64, + lon: f64, + }, Exit, } @@ -65,6 +69,7 @@ pub struct DiagTask { max_type_seen: EventType, bytes_since_space_check: usize, low_space_warned: bool, + latest_packet_timestamp: Option, } enum DiagState { @@ -126,6 +131,7 @@ impl DiagTask { max_type_seen: EventType::Informational, bytes_since_space_check: 0, low_space_warned: false, + latest_packet_timestamp: None, } } @@ -154,7 +160,7 @@ impl DiagTask { let (qmdl_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?; - // For fixed-mode sessions, write the configured coordinates to the sidecar + // For fixed-mode sessions, write the configured coordinates to the storage // immediately so the per-session GPS is stored durably and isn't affected // by future config changes or GPS API calls. if self.gps_mode == GpsMode::Fixed @@ -164,10 +170,11 @@ impl DiagTask { let mut gps_file = qmdl_store .open_entry_gps_for_append(entry_idx) .await? - .ok_or(RecordingStoreError::GpsSidecarNotFound)?; + .ok_or(RecordingStoreError::GpsStorageNotFound)?; let record = GpsRecord { - unix_ts: chrono::Utc::now().timestamp(), + latest_packet_timestamp: None, + system_time: rayhunter::clock::get_adjusted_now().timestamp(), lat, lon, }; @@ -257,6 +264,38 @@ impl DiagTask { res } + async fn handle_gps_update(&mut self, qmdl_store: &RecordingStore, lat: f64, lon: f64) { + let Some((entry_idx, _)) = qmdl_store.get_current_entry() else { + info!("GPS update received but no recording active, not writing to storage"); + return; + }; + let mut file = match qmdl_store.open_entry_gps_for_append(entry_idx).await { + Ok(Some(f)) => f, + Ok(None) => { + error!("GPS storage not found, cannot write GPS record"); + return; + } + Err(e) => { + error!("failed to open GPS storage: {e}"); + return; + } + }; + let record = GpsRecord { + latest_packet_timestamp: self.latest_packet_timestamp, + system_time: rayhunter::clock::get_adjusted_now().timestamp(), + lat, + lon, + }; + let Ok(mut json) = serde_json::to_vec(&record) else { + error!("failed to serialize GPS record"); + return; + }; + json.push(b'\n'); + if let Err(e) = file.write_all(&json).await { + error!("failed to write GPS record to storage: {e}"); + } + } + async fn stop_current_recording(&mut self) { let mut state = DiagState::Stopped; std::mem::swap(&mut self.state, &mut state); @@ -352,6 +391,20 @@ impl DiagTask { return; } debug!("done!"); + + // Extract the latest packet timestamp from this container + if let Some(ts) = container + .messages() + .into_iter() + .filter_map(|r| match r { + Ok(Message::Log { timestamp, .. }) => Some(timestamp.to_datetime().timestamp()), + _ => None, + }) + .max() + { + self.latest_packet_timestamp = Some(ts); + } + let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); self.bytes_since_space_check += container_bytes; let max_type = match analysis_writer.analyze(container).await { @@ -465,6 +518,10 @@ pub fn run_diag_read_thread( error!("Failed to send delete all entries respons, receiver dropped"); } }, + Some(DiagDeviceCtrlMessage::GpsUpdate { lat, lon }) => { + let qmdl_store = qmdl_store_lock.read().await; + diag_task.handle_gps_update(&qmdl_store, lat, lon).await; + }, } } maybe_container = diag_stream.next() => { diff --git a/daemon/src/gps.rs b/daemon/src/gps.rs index 2be8c67..146919b 100644 --- a/daemon/src/gps.rs +++ b/daemon/src/gps.rs @@ -1,12 +1,13 @@ use axum::Json; use axum::extract::State; use axum::http::StatusCode; -use log::{error, info, warn}; +use log::{error, warn}; use serde::{Deserialize, Deserializer, Serialize}; use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, BufReader}; use crate::config::GpsMode; +use crate::diag::DiagDeviceCtrlMessage; use crate::server::ServerState; fn deserialize_latitude<'de, D>(deserializer: D) -> Result @@ -37,28 +38,6 @@ where Ok(v) } -fn deserialize_unix_ts<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - use serde::de; - use serde_json::Value; - match Value::deserialize(deserializer)? { - Value::Number(n) => n - .as_i64() - .or_else(|| n.as_f64().map(|f| f as i64)) - .ok_or_else(|| de::Error::custom("timestamp out of range")), - Value::String(s) => s - .trim() - .parse::() - .map(|f| f as i64) - .map_err(|_| de::Error::custom("timestamp must be a numeric value")), - _ => Err(de::Error::custom( - "timestamp must be a number or numeric string", - )), - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[cfg_attr(feature = "apidocs", derive(utoipa::ToSchema))] pub struct GpsData { @@ -66,18 +45,20 @@ pub struct GpsData { pub latitude: f64, #[serde(deserialize_with = "deserialize_longitude")] pub longitude: f64, - #[serde(deserialize_with = "deserialize_unix_ts")] - pub timestamp: i64, } #[derive(Serialize, Deserialize)] pub struct GpsRecord { - pub unix_ts: i64, + /// Packet timestamp (modem clock) for correlation with captured packets. + /// None if no packets have been received yet. + pub latest_packet_timestamp: Option, + /// Drift-corrected system time when this GPS fix was received + pub system_time: i64, pub lat: f64, pub lon: f64, } -/// Reads all GPS records from a sidecar NDJSON file, logging and skipping malformed lines. +/// Reads all GPS records from a storage NDJSON file, logging and skipping malformed lines. pub async fn load_gps_records(file: tokio::fs::File) -> Vec { let reader = BufReader::new(file); let mut lines = reader.lines(); @@ -86,16 +67,16 @@ pub async fn load_gps_records(file: tokio::fs::File) -> Vec { match lines.next_line().await { Ok(Some(line)) => match serde_json::from_str::(&line) { Ok(record) => records.push(record), - Err(e) => warn!("skipping malformed GPS sidecar line: {e}"), + Err(e) => warn!("skipping malformed GPS storage line: {e}"), }, Ok(None) => break, Err(e) => { - error!("error reading GPS sidecar file: {e}"); + error!("error reading GPS storage file: {e}"); break; } } } - records.sort_by_key(|r| r.unix_ts); + records.sort_by_key(|r| r.latest_packet_timestamp.unwrap_or(i64::MIN)); records } @@ -108,10 +89,10 @@ pub async fn load_gps_records(file: tokio::fs::File) -> Vec { responses( (status = StatusCode::OK, description = "GPS data accepted"), (status = StatusCode::FORBIDDEN, description = "GPS API endpoint is disabled"), - (status = StatusCode::INTERNAL_SERVER_ERROR, description = "Failed to write GPS record") + (status = StatusCode::INTERNAL_SERVER_ERROR, description = "Failed to send GPS update") ), summary = "Submit GPS coordinates", - description = "Submit GPS coordinates from an external source (e.g. a phone app). Requires gps_mode to be set to 'Api' in configuration. latitude is in decimal degrees from -90 to 90, longitude is in decimal degrees from -180 to 180, timestamp is a Unix timestamp in seconds." + description = "Submit GPS coordinates from an external source (e.g. a phone app). Requires gps_mode to be set to 'Api' in configuration. latitude is in decimal degrees from -90 to 90, longitude is in decimal degrees from -180 to 180. The timestamp is derived from the most recent packet's modem timestamp." ))] pub async fn post_gps( State(state): State>, @@ -124,48 +105,27 @@ pub async fn post_gps( .to_string(), )); } + + // Update in-memory state for GET /api/gps let mut gps = state.gps_state.write().await; *gps = Some(gps_data.clone()); drop(gps); - let qmdl_store = state.qmdl_store_lock.read().await; - if let Some((entry_idx, _)) = qmdl_store.get_current_entry() { - match qmdl_store.open_entry_gps_for_append(entry_idx).await { - Ok(Some(mut file)) => { - let record = GpsRecord { - unix_ts: gps_data.timestamp, - lat: gps_data.latitude, - lon: gps_data.longitude, - }; - let mut json = serde_json::to_vec(&record).map_err(|e| { - error!("failed to serialize GPS record: {e}"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to serialize GPS record: {e}"), - ) - })?; - json.push(b'\n'); - file.write_all(&json).await.map_err(|e| { - error!("failed to write GPS record to sidecar: {e}"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to write GPS record to sidecar: {e}"), - ) - })?; - } - Ok(None) => error!("GPS sidecar directory not found, cannot write GPS record"), - Err(e) => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("failed to open GPS sidecar: {e}"), - )); - } - } - } else { - info!( - "GPS data received but no recording is active — position updated in memory only, not persisted to sidecar" - ); - } + // Send to DiagTask to write to storage with packet timestamp + state + .diag_device_ctrl_sender + .send(DiagDeviceCtrlMessage::GpsUpdate { + lat: gps_data.latitude, + lon: gps_data.longitude, + }) + .await + .map_err(|e| { + error!("failed to send GPS update: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to send GPS update: {e}"), + ) + })?; Ok(StatusCode::OK) } diff --git a/daemon/src/main.rs b/daemon/src/main.rs index ba2b899..17dc87d 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -311,7 +311,6 @@ async fn run_with_config( (Some(lat), Some(lon)) => Some(gps::GpsData { latitude: lat, longitude: lon, - timestamp: chrono::Utc::now().timestamp(), }), _ => { warn!( diff --git a/daemon/src/pcap.rs b/daemon/src/pcap.rs index 8cbf388..c0c7a6b 100644 --- a/daemon/src/pcap.rs +++ b/daemon/src/pcap.rs @@ -85,31 +85,35 @@ pub(crate) async fn load_gps_records_for_entry( .and_then(|e| e.gps_mode); if gps_mode.is_some_and(|m| m != GpsMode::Disabled) { error!( - "GPS sidecar expected for entry {entry_index} (mode: {gps_mode:?}) but not found" + "GPS storage expected for entry {entry_index} (mode: {gps_mode:?}) but not found" ); } vec![] } Err(e) => { - error!("failed to open GPS sidecar: {e}"); + error!("failed to open GPS storage: {e}"); vec![] } } } -fn find_nearest_gps(records: &[GpsRecord], packet_unix_ts: i64) -> Option { +fn record_timestamp(r: &GpsRecord) -> i64 { + r.latest_packet_timestamp.unwrap_or(i64::MIN) +} + +fn find_nearest_gps(records: &[GpsRecord], packet_timestamp: i64) -> Option { if records.is_empty() { return None; } - let idx = records.partition_point(|r| r.unix_ts <= packet_unix_ts); + let idx = records.partition_point(|r| record_timestamp(r) <= packet_timestamp); let record = if idx == 0 { &records[0] } else if idx >= records.len() { &records[records.len() - 1] } else { let (before, after) = (&records[idx - 1], &records[idx]); - let before_delta = packet_unix_ts - before.unix_ts; - let after_delta = after.unix_ts - packet_unix_ts; + let before_delta = packet_timestamp - record_timestamp(before); + let after_delta = record_timestamp(after) - packet_timestamp; if before_delta <= after_delta { before } else { @@ -119,7 +123,7 @@ fn find_nearest_gps(records: &[GpsRecord], packet_unix_ts: i64) -> Option { let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; @@ -166,8 +170,13 @@ where mod tests { use super::*; - fn rec(unix_ts: i64, lat: f64, lon: f64) -> GpsRecord { - GpsRecord { unix_ts, lat, lon } + fn rec(latest_packet_timestamp: i64, lat: f64, lon: f64) -> GpsRecord { + GpsRecord { + latest_packet_timestamp: Some(latest_packet_timestamp), + system_time: 0, + lat, + lon, + } } #[test] diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index 54ba08f..bf197ec 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -37,8 +37,8 @@ pub enum RecordingStoreError { ParseManifestError(toml::de::Error), #[error("Insufficient disk space: {0}MB available, {1}MB required")] InsufficientDiskSpace(u64, u64), - #[error("GPS sidecar directory not found")] - GpsSidecarNotFound, + #[error("GPS storage directory not found")] + GpsStorageNotFound, #[error("Serialization error: {0}")] SerializationError(#[from] serde_json::Error), } diff --git a/daemon/web/src/lib/components/ConfigForm.svelte b/daemon/web/src/lib/components/ConfigForm.svelte index 88a3669..8d9d4de 100644 --- a/daemon/web/src/lib/components/ConfigForm.svelte +++ b/daemon/web/src/lib/components/ConfigForm.svelte @@ -798,8 +798,8 @@

{#if config.gps_mode === GpsMode.Api} - POST latitude, longitude, and timestamp to /api/gps from - any device on the network. + POST latitude and longitude to /api/gps from any device + on the network. Timestamp is derived from packet capture timing. {:else if config.gps_mode === GpsMode.Fixed} GPS coordinates are fixed to the values below. {:else} diff --git a/daemon/web/src/lib/components/SystemStatsTable.svelte b/daemon/web/src/lib/components/SystemStatsTable.svelte index e81be9a..604c10b 100644 --- a/daemon/web/src/lib/components/SystemStatsTable.svelte +++ b/daemon/web/src/lib/components/SystemStatsTable.svelte @@ -36,11 +36,6 @@ } return text; }); - - const gps_date_formatter = new Intl.DateTimeFormat(undefined, { - timeStyle: 'long', - dateStyle: 'short', - });

Latitude {gps_data.latitude.toFixed(6)} - + Longitude {gps_data.longitude.toFixed(6)} - - GPS Timestamp - - {gps_data.timestamp > 0 - ? gps_date_formatter.format(new Date(gps_data.timestamp * 1000)) - : 'Fixed'} - - {:else} GPS Data diff --git a/lib/src/analysis/analyzer.rs b/lib/src/analysis/analyzer.rs index d903775..426e934 100644 --- a/lib/src/analysis/analyzer.rs +++ b/lib/src/analysis/analyzer.rs @@ -414,7 +414,7 @@ impl Harness { pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec { let mut rows = Vec::new(); - for maybe_qmdl_message in container.into_messages() { + for maybe_qmdl_message in container.messages() { self.packet_num += 1; rows.push(AnalysisRow { diff --git a/lib/src/diag.rs b/lib/src/diag.rs index fcdb33b..323747b 100644 --- a/lib/src/diag.rs +++ b/lib/src/diag.rs @@ -85,9 +85,9 @@ pub struct MessagesContainer { } impl MessagesContainer { - pub fn into_messages(self) -> Vec> { + pub fn messages(&self) -> Vec> { let mut result = Vec::new(); - for msg in self.messages { + for msg in &self.messages { for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { match hdlc_decapsulate(sub_msg, &CRC_CCITT) { Ok(data) => match Message::from_bytes((&data, 0)) { @@ -569,7 +569,7 @@ mod test { let mut container = make_container(DataType::UserSpace, encapsulated1); container.messages.push(encapsulated2); container.num_messages += 1; - assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); + assert_eq!(container.messages(), vec![Ok(message1), Ok(message2)]); } #[test] @@ -579,7 +579,7 @@ mod test { encapsulated1.data.extend(encapsulated2.data); encapsulated1.len += encapsulated2.len; let container = make_container(DataType::UserSpace, encapsulated1); - assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); + assert_eq!(container.messages(), vec![Ok(message1), Ok(message2)]); } #[test] @@ -593,7 +593,7 @@ mod test { let mut container = make_container(DataType::UserSpace, encapsulated1); container.messages.push(encapsulated2); container.num_messages += 1; - let result = container.into_messages(); + let result = container.messages(); assert_eq!(result[0], Ok(message1)); assert!(matches!( result[1], @@ -611,7 +611,7 @@ mod test { let mut container = make_container(DataType::UserSpace, encapsulated1); container.messages.push(bad_encapsulation); container.num_messages += 1; - let result = container.into_messages(); + let result = container.messages(); assert_eq!(result[0], Ok(message1)); assert!(matches!( result[1], diff --git a/lib/src/diag_device.rs b/lib/src/diag_device.rs index 47aa496..fe42592 100644 --- a/lib/src/diag_device.rs +++ b/lib/src/diag_device.rs @@ -212,7 +212,7 @@ impl DiagDevice { if container.data_type != DataType::UserSpace { continue; } - return Ok(container.into_messages()); + return Ok(container.messages()); } }