Use latest packet timestamp in GPS file, move writing into DiagTask to eliminate RwLocks, remove "sidecar" word from codebase

This commit is contained in:
Markus Unterwaditzer
2026-05-18 23:43:43 +02:00
committed by Will Greenberg
parent 2ada840919
commit 0c90f8910a
11 changed files with 125 additions and 113 deletions
+1 -1
View File
@@ -155,7 +155,7 @@ async fn pcapify(qmdl_path: &PathBuf) {
.await .await
.expect("failed to get container") .expect("failed to get container")
{ {
for msg in container.into_messages().into_iter().flatten() { for msg in container.messages().into_iter().flatten() {
if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) { if let Ok(Some((timestamp, parsed))) = gsmtap_parser::parse(msg) {
pcap_writer pcap_writer
.write_gsmtap_message(parsed, timestamp, None) .write_gsmtap_message(parsed, timestamp, None)
+61 -4
View File
@@ -23,7 +23,7 @@ use tokio_util::task::TaskTracker;
#[cfg(feature = "apidocs")] #[cfg(feature = "apidocs")]
use rayhunter::analysis::analyzer::ReportMetadata; use rayhunter::analysis::analyzer::ReportMetadata;
use rayhunter::analysis::analyzer::{AnalysisLineNormalizer, AnalyzerConfig, EventType}; use rayhunter::analysis::analyzer::{AnalysisLineNormalizer, AnalyzerConfig, EventType};
use rayhunter::diag::{DataType, MessagesContainer}; use rayhunter::diag::{DataType, Message, MessagesContainer};
use rayhunter::diag_device::DiagDevice; use rayhunter::diag_device::DiagDevice;
use rayhunter::qmdl::QmdlWriter; use rayhunter::qmdl::QmdlWriter;
@@ -49,6 +49,10 @@ pub enum DiagDeviceCtrlMessage {
DeleteAllEntries { DeleteAllEntries {
response_tx: oneshot::Sender<Result<(), RecordingStoreError>>, response_tx: oneshot::Sender<Result<(), RecordingStoreError>>,
}, },
GpsUpdate {
lat: f64,
lon: f64,
},
Exit, Exit,
} }
@@ -65,6 +69,7 @@ pub struct DiagTask {
max_type_seen: EventType, max_type_seen: EventType,
bytes_since_space_check: usize, bytes_since_space_check: usize,
low_space_warned: bool, low_space_warned: bool,
latest_packet_timestamp: Option<i64>,
} }
enum DiagState { enum DiagState {
@@ -126,6 +131,7 @@ impl DiagTask {
max_type_seen: EventType::Informational, max_type_seen: EventType::Informational,
bytes_since_space_check: 0, bytes_since_space_check: 0,
low_space_warned: false, low_space_warned: false,
latest_packet_timestamp: None,
} }
} }
@@ -154,7 +160,7 @@ impl DiagTask {
let (qmdl_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?; let (qmdl_file, analysis_file) = qmdl_store.new_entry(self.gps_mode).await?;
// For fixed-mode sessions, write the configured coordinates to the sidecar // For fixed-mode sessions, write the configured coordinates to the storage
// immediately so the per-session GPS is stored durably and isn't affected // immediately so the per-session GPS is stored durably and isn't affected
// by future config changes or GPS API calls. // by future config changes or GPS API calls.
if self.gps_mode == GpsMode::Fixed if self.gps_mode == GpsMode::Fixed
@@ -164,10 +170,11 @@ impl DiagTask {
let mut gps_file = qmdl_store let mut gps_file = qmdl_store
.open_entry_gps_for_append(entry_idx) .open_entry_gps_for_append(entry_idx)
.await? .await?
.ok_or(RecordingStoreError::GpsSidecarNotFound)?; .ok_or(RecordingStoreError::GpsStorageNotFound)?;
let record = GpsRecord { let record = GpsRecord {
unix_ts: chrono::Utc::now().timestamp(), latest_packet_timestamp: None,
system_time: rayhunter::clock::get_adjusted_now().timestamp(),
lat, lat,
lon, lon,
}; };
@@ -257,6 +264,38 @@ impl DiagTask {
res res
} }
async fn handle_gps_update(&mut self, qmdl_store: &RecordingStore, lat: f64, lon: f64) {
let Some((entry_idx, _)) = qmdl_store.get_current_entry() else {
info!("GPS update received but no recording active, not writing to storage");
return;
};
let mut file = match qmdl_store.open_entry_gps_for_append(entry_idx).await {
Ok(Some(f)) => f,
Ok(None) => {
error!("GPS storage not found, cannot write GPS record");
return;
}
Err(e) => {
error!("failed to open GPS storage: {e}");
return;
}
};
let record = GpsRecord {
latest_packet_timestamp: self.latest_packet_timestamp,
system_time: rayhunter::clock::get_adjusted_now().timestamp(),
lat,
lon,
};
let Ok(mut json) = serde_json::to_vec(&record) else {
error!("failed to serialize GPS record");
return;
};
json.push(b'\n');
if let Err(e) = file.write_all(&json).await {
error!("failed to write GPS record to storage: {e}");
}
}
async fn stop_current_recording(&mut self) { async fn stop_current_recording(&mut self) {
let mut state = DiagState::Stopped; let mut state = DiagState::Stopped;
std::mem::swap(&mut self.state, &mut state); std::mem::swap(&mut self.state, &mut state);
@@ -352,6 +391,20 @@ impl DiagTask {
return; return;
} }
debug!("done!"); debug!("done!");
// Extract the latest packet timestamp from this container
if let Some(ts) = container
.messages()
.into_iter()
.filter_map(|r| match r {
Ok(Message::Log { timestamp, .. }) => Some(timestamp.to_datetime().timestamp()),
_ => None,
})
.max()
{
self.latest_packet_timestamp = Some(ts);
}
let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum(); let container_bytes: usize = container.messages.iter().map(|m| m.data.len()).sum();
self.bytes_since_space_check += container_bytes; self.bytes_since_space_check += container_bytes;
let max_type = match analysis_writer.analyze(container).await { let max_type = match analysis_writer.analyze(container).await {
@@ -465,6 +518,10 @@ pub fn run_diag_read_thread(
error!("Failed to send delete all entries respons, receiver dropped"); error!("Failed to send delete all entries respons, receiver dropped");
} }
}, },
Some(DiagDeviceCtrlMessage::GpsUpdate { lat, lon }) => {
let qmdl_store = qmdl_store_lock.read().await;
diag_task.handle_gps_update(&qmdl_store, lat, lon).await;
},
} }
} }
maybe_container = diag_stream.next() => { maybe_container = diag_stream.next() => {
+31 -71
View File
@@ -1,12 +1,13 @@
use axum::Json; use axum::Json;
use axum::extract::State; use axum::extract::State;
use axum::http::StatusCode; use axum::http::StatusCode;
use log::{error, info, warn}; use log::{error, warn};
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, BufReader};
use crate::config::GpsMode; use crate::config::GpsMode;
use crate::diag::DiagDeviceCtrlMessage;
use crate::server::ServerState; use crate::server::ServerState;
fn deserialize_latitude<'de, D>(deserializer: D) -> Result<f64, D::Error> fn deserialize_latitude<'de, D>(deserializer: D) -> Result<f64, D::Error>
@@ -37,28 +38,6 @@ where
Ok(v) Ok(v)
} }
fn deserialize_unix_ts<'de, D>(deserializer: D) -> Result<i64, D::Error>
where
D: Deserializer<'de>,
{
use serde::de;
use serde_json::Value;
match Value::deserialize(deserializer)? {
Value::Number(n) => n
.as_i64()
.or_else(|| n.as_f64().map(|f| f as i64))
.ok_or_else(|| de::Error::custom("timestamp out of range")),
Value::String(s) => s
.trim()
.parse::<f64>()
.map(|f| f as i64)
.map_err(|_| de::Error::custom("timestamp must be a numeric value")),
_ => Err(de::Error::custom(
"timestamp must be a number or numeric string",
)),
}
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "apidocs", derive(utoipa::ToSchema))] #[cfg_attr(feature = "apidocs", derive(utoipa::ToSchema))]
pub struct GpsData { pub struct GpsData {
@@ -66,18 +45,20 @@ pub struct GpsData {
pub latitude: f64, pub latitude: f64,
#[serde(deserialize_with = "deserialize_longitude")] #[serde(deserialize_with = "deserialize_longitude")]
pub longitude: f64, pub longitude: f64,
#[serde(deserialize_with = "deserialize_unix_ts")]
pub timestamp: i64,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct GpsRecord { pub struct GpsRecord {
pub unix_ts: i64, /// Packet timestamp (modem clock) for correlation with captured packets.
/// None if no packets have been received yet.
pub latest_packet_timestamp: Option<i64>,
/// Drift-corrected system time when this GPS fix was received
pub system_time: i64,
pub lat: f64, pub lat: f64,
pub lon: f64, pub lon: f64,
} }
/// Reads all GPS records from a sidecar NDJSON file, logging and skipping malformed lines. /// Reads all GPS records from a storage NDJSON file, logging and skipping malformed lines.
pub async fn load_gps_records(file: tokio::fs::File) -> Vec<GpsRecord> { pub async fn load_gps_records(file: tokio::fs::File) -> Vec<GpsRecord> {
let reader = BufReader::new(file); let reader = BufReader::new(file);
let mut lines = reader.lines(); let mut lines = reader.lines();
@@ -86,16 +67,16 @@ pub async fn load_gps_records(file: tokio::fs::File) -> Vec<GpsRecord> {
match lines.next_line().await { match lines.next_line().await {
Ok(Some(line)) => match serde_json::from_str::<GpsRecord>(&line) { Ok(Some(line)) => match serde_json::from_str::<GpsRecord>(&line) {
Ok(record) => records.push(record), Ok(record) => records.push(record),
Err(e) => warn!("skipping malformed GPS sidecar line: {e}"), Err(e) => warn!("skipping malformed GPS storage line: {e}"),
}, },
Ok(None) => break, Ok(None) => break,
Err(e) => { Err(e) => {
error!("error reading GPS sidecar file: {e}"); error!("error reading GPS storage file: {e}");
break; break;
} }
} }
} }
records.sort_by_key(|r| r.unix_ts); records.sort_by_key(|r| r.latest_packet_timestamp.unwrap_or(i64::MIN));
records records
} }
@@ -108,10 +89,10 @@ pub async fn load_gps_records(file: tokio::fs::File) -> Vec<GpsRecord> {
responses( responses(
(status = StatusCode::OK, description = "GPS data accepted"), (status = StatusCode::OK, description = "GPS data accepted"),
(status = StatusCode::FORBIDDEN, description = "GPS API endpoint is disabled"), (status = StatusCode::FORBIDDEN, description = "GPS API endpoint is disabled"),
(status = StatusCode::INTERNAL_SERVER_ERROR, description = "Failed to write GPS record") (status = StatusCode::INTERNAL_SERVER_ERROR, description = "Failed to send GPS update")
), ),
summary = "Submit GPS coordinates", summary = "Submit GPS coordinates",
description = "Submit GPS coordinates from an external source (e.g. a phone app). Requires gps_mode to be set to 'Api' in configuration. latitude is in decimal degrees from -90 to 90, longitude is in decimal degrees from -180 to 180, timestamp is a Unix timestamp in seconds." description = "Submit GPS coordinates from an external source (e.g. a phone app). Requires gps_mode to be set to 'Api' in configuration. latitude is in decimal degrees from -90 to 90, longitude is in decimal degrees from -180 to 180. The timestamp is derived from the most recent packet's modem timestamp."
))] ))]
pub async fn post_gps( pub async fn post_gps(
State(state): State<Arc<ServerState>>, State(state): State<Arc<ServerState>>,
@@ -124,48 +105,27 @@ pub async fn post_gps(
.to_string(), .to_string(),
)); ));
} }
// Update in-memory state for GET /api/gps
let mut gps = state.gps_state.write().await; let mut gps = state.gps_state.write().await;
*gps = Some(gps_data.clone()); *gps = Some(gps_data.clone());
drop(gps); drop(gps);
let qmdl_store = state.qmdl_store_lock.read().await; // Send to DiagTask to write to storage with packet timestamp
if let Some((entry_idx, _)) = qmdl_store.get_current_entry() { state
match qmdl_store.open_entry_gps_for_append(entry_idx).await { .diag_device_ctrl_sender
Ok(Some(mut file)) => { .send(DiagDeviceCtrlMessage::GpsUpdate {
let record = GpsRecord { lat: gps_data.latitude,
unix_ts: gps_data.timestamp, lon: gps_data.longitude,
lat: gps_data.latitude, })
lon: gps_data.longitude, .await
}; .map_err(|e| {
let mut json = serde_json::to_vec(&record).map_err(|e| { error!("failed to send GPS update: {e}");
error!("failed to serialize GPS record: {e}"); (
( StatusCode::INTERNAL_SERVER_ERROR,
StatusCode::INTERNAL_SERVER_ERROR, format!("failed to send GPS update: {e}"),
format!("failed to serialize GPS record: {e}"), )
) })?;
})?;
json.push(b'\n');
file.write_all(&json).await.map_err(|e| {
error!("failed to write GPS record to sidecar: {e}");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to write GPS record to sidecar: {e}"),
)
})?;
}
Ok(None) => error!("GPS sidecar directory not found, cannot write GPS record"),
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to open GPS sidecar: {e}"),
));
}
}
} else {
info!(
"GPS data received but no recording is active — position updated in memory only, not persisted to sidecar"
);
}
Ok(StatusCode::OK) Ok(StatusCode::OK)
} }
-1
View File
@@ -311,7 +311,6 @@ async fn run_with_config(
(Some(lat), Some(lon)) => Some(gps::GpsData { (Some(lat), Some(lon)) => Some(gps::GpsData {
latitude: lat, latitude: lat,
longitude: lon, longitude: lon,
timestamp: chrono::Utc::now().timestamp(),
}), }),
_ => { _ => {
warn!( warn!(
+19 -10
View File
@@ -85,31 +85,35 @@ pub(crate) async fn load_gps_records_for_entry(
.and_then(|e| e.gps_mode); .and_then(|e| e.gps_mode);
if gps_mode.is_some_and(|m| m != GpsMode::Disabled) { if gps_mode.is_some_and(|m| m != GpsMode::Disabled) {
error!( error!(
"GPS sidecar expected for entry {entry_index} (mode: {gps_mode:?}) but not found" "GPS storage expected for entry {entry_index} (mode: {gps_mode:?}) but not found"
); );
} }
vec![] vec![]
} }
Err(e) => { Err(e) => {
error!("failed to open GPS sidecar: {e}"); error!("failed to open GPS storage: {e}");
vec![] vec![]
} }
} }
} }
fn find_nearest_gps(records: &[GpsRecord], packet_unix_ts: i64) -> Option<GpsPoint> { fn record_timestamp(r: &GpsRecord) -> i64 {
r.latest_packet_timestamp.unwrap_or(i64::MIN)
}
fn find_nearest_gps(records: &[GpsRecord], packet_timestamp: i64) -> Option<GpsPoint> {
if records.is_empty() { if records.is_empty() {
return None; return None;
} }
let idx = records.partition_point(|r| r.unix_ts <= packet_unix_ts); let idx = records.partition_point(|r| record_timestamp(r) <= packet_timestamp);
let record = if idx == 0 { let record = if idx == 0 {
&records[0] &records[0]
} else if idx >= records.len() { } else if idx >= records.len() {
&records[records.len() - 1] &records[records.len() - 1]
} else { } else {
let (before, after) = (&records[idx - 1], &records[idx]); let (before, after) = (&records[idx - 1], &records[idx]);
let before_delta = packet_unix_ts - before.unix_ts; let before_delta = packet_timestamp - record_timestamp(before);
let after_delta = after.unix_ts - packet_unix_ts; let after_delta = record_timestamp(after) - packet_timestamp;
if before_delta <= after_delta { if before_delta <= after_delta {
before before
} else { } else {
@@ -119,7 +123,7 @@ fn find_nearest_gps(records: &[GpsRecord], packet_unix_ts: i64) -> Option<GpsPoi
Some(GpsPoint { Some(GpsPoint {
latitude: record.lat, latitude: record.lat,
longitude: record.lon, longitude: record.lon,
unix_ts: record.unix_ts, unix_ts: record_timestamp(record),
}) })
} }
@@ -142,7 +146,7 @@ where
continue; continue;
} }
for maybe_msg in container.into_messages() { for maybe_msg in container.messages() {
match maybe_msg { match maybe_msg {
Ok(msg) => { Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?; let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
@@ -166,8 +170,13 @@ where
mod tests { mod tests {
use super::*; use super::*;
fn rec(unix_ts: i64, lat: f64, lon: f64) -> GpsRecord { fn rec(latest_packet_timestamp: i64, lat: f64, lon: f64) -> GpsRecord {
GpsRecord { unix_ts, lat, lon } GpsRecord {
latest_packet_timestamp: Some(latest_packet_timestamp),
system_time: 0,
lat,
lon,
}
} }
#[test] #[test]
+2 -2
View File
@@ -37,8 +37,8 @@ pub enum RecordingStoreError {
ParseManifestError(toml::de::Error), ParseManifestError(toml::de::Error),
#[error("Insufficient disk space: {0}MB available, {1}MB required")] #[error("Insufficient disk space: {0}MB available, {1}MB required")]
InsufficientDiskSpace(u64, u64), InsufficientDiskSpace(u64, u64),
#[error("GPS sidecar directory not found")] #[error("GPS storage directory not found")]
GpsSidecarNotFound, GpsStorageNotFound,
#[error("Serialization error: {0}")] #[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error), SerializationError(#[from] serde_json::Error),
} }
@@ -798,8 +798,8 @@
</select> </select>
<p class="text-xs text-gray-500 mt-1"> <p class="text-xs text-gray-500 mt-1">
{#if config.gps_mode === GpsMode.Api} {#if config.gps_mode === GpsMode.Api}
POST latitude, longitude, and timestamp to <code>/api/gps</code> from POST latitude and longitude to <code>/api/gps</code> from any device
any device on the network. on the network. Timestamp is derived from packet capture timing.
{:else if config.gps_mode === GpsMode.Fixed} {:else if config.gps_mode === GpsMode.Fixed}
GPS coordinates are fixed to the values below. GPS coordinates are fixed to the values below.
{:else} {:else}
@@ -36,11 +36,6 @@
} }
return text; return text;
}); });
const gps_date_formatter = new Intl.DateTimeFormat(undefined, {
timeStyle: 'long',
dateStyle: 'short',
});
</script> </script>
<div <div
@@ -129,18 +124,10 @@
<td class="py-1 pr-4 text-gray-500 font-medium">Latitude</td> <td class="py-1 pr-4 text-gray-500 font-medium">Latitude</td>
<td class="py-1 font-mono">{gps_data.latitude.toFixed(6)}</td> <td class="py-1 font-mono">{gps_data.latitude.toFixed(6)}</td>
</tr> </tr>
<tr class="border-b border-gray-200"> <tr>
<td class="py-1 pr-4 text-gray-500 font-medium">Longitude</td> <td class="py-1 pr-4 text-gray-500 font-medium">Longitude</td>
<td class="py-1 font-mono">{gps_data.longitude.toFixed(6)}</td> <td class="py-1 font-mono">{gps_data.longitude.toFixed(6)}</td>
</tr> </tr>
<tr>
<td class="py-1 pr-4 text-gray-500 font-medium">GPS Timestamp</td>
<td class="py-1 font-mono">
{gps_data.timestamp > 0
? gps_date_formatter.format(new Date(gps_data.timestamp * 1000))
: 'Fixed'}
</td>
</tr>
{:else} {:else}
<tr> <tr>
<td class="py-1 pr-4 text-gray-500 font-medium">GPS Data</td> <td class="py-1 pr-4 text-gray-500 font-medium">GPS Data</td>
+1 -1
View File
@@ -414,7 +414,7 @@ impl Harness {
pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> { pub fn analyze_qmdl_messages(&mut self, container: MessagesContainer) -> Vec<AnalysisRow> {
let mut rows = Vec::new(); let mut rows = Vec::new();
for maybe_qmdl_message in container.into_messages() { for maybe_qmdl_message in container.messages() {
self.packet_num += 1; self.packet_num += 1;
rows.push(AnalysisRow { rows.push(AnalysisRow {
+6 -6
View File
@@ -85,9 +85,9 @@ pub struct MessagesContainer {
} }
impl MessagesContainer { impl MessagesContainer {
pub fn into_messages(self) -> Vec<Result<Message, DiagParsingError>> { pub fn messages(&self) -> Vec<Result<Message, DiagParsingError>> {
let mut result = Vec::new(); let mut result = Vec::new();
for msg in self.messages { for msg in &self.messages {
for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) { for sub_msg in msg.data.split_inclusive(|&b| b == MESSAGE_TERMINATOR) {
match hdlc_decapsulate(sub_msg, &CRC_CCITT) { match hdlc_decapsulate(sub_msg, &CRC_CCITT) {
Ok(data) => match Message::from_bytes((&data, 0)) { Ok(data) => match Message::from_bytes((&data, 0)) {
@@ -569,7 +569,7 @@ mod test {
let mut container = make_container(DataType::UserSpace, encapsulated1); let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(encapsulated2); container.messages.push(encapsulated2);
container.num_messages += 1; container.num_messages += 1;
assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); assert_eq!(container.messages(), vec![Ok(message1), Ok(message2)]);
} }
#[test] #[test]
@@ -579,7 +579,7 @@ mod test {
encapsulated1.data.extend(encapsulated2.data); encapsulated1.data.extend(encapsulated2.data);
encapsulated1.len += encapsulated2.len; encapsulated1.len += encapsulated2.len;
let container = make_container(DataType::UserSpace, encapsulated1); let container = make_container(DataType::UserSpace, encapsulated1);
assert_eq!(container.into_messages(), vec![Ok(message1), Ok(message2)]); assert_eq!(container.messages(), vec![Ok(message1), Ok(message2)]);
} }
#[test] #[test]
@@ -593,7 +593,7 @@ mod test {
let mut container = make_container(DataType::UserSpace, encapsulated1); let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(encapsulated2); container.messages.push(encapsulated2);
container.num_messages += 1; container.num_messages += 1;
let result = container.into_messages(); let result = container.messages();
assert_eq!(result[0], Ok(message1)); assert_eq!(result[0], Ok(message1));
assert!(matches!( assert!(matches!(
result[1], result[1],
@@ -611,7 +611,7 @@ mod test {
let mut container = make_container(DataType::UserSpace, encapsulated1); let mut container = make_container(DataType::UserSpace, encapsulated1);
container.messages.push(bad_encapsulation); container.messages.push(bad_encapsulation);
container.num_messages += 1; container.num_messages += 1;
let result = container.into_messages(); let result = container.messages();
assert_eq!(result[0], Ok(message1)); assert_eq!(result[0], Ok(message1));
assert!(matches!( assert!(matches!(
result[1], result[1],
+1 -1
View File
@@ -212,7 +212,7 @@ impl DiagDevice {
if container.data_type != DataType::UserSpace { if container.data_type != DataType::UserSpace {
continue; continue;
} }
return Ok(container.into_messages()); return Ok(container.messages());
} }
} }