mirror of
https://github.com/smittix/intercept.git
synced 2026-06-14 00:33:35 -07:00
chore: Bump version to v2.18.0
Bluetooth enhancements (service data inspector, appearance codes, MAC cluster tracking, behavioral flags, IRK badges, distance estimation), ACARS SoapySDR multi-backend support, dump1090 stale process cleanup, GPS error state, and proximity radar/signal card UI improvements. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+178
@@ -483,3 +483,181 @@ def get_current_position() -> GPSPosition | None:
|
||||
if client:
|
||||
return client.position
|
||||
return None
|
||||
|
||||
|
||||
# ============================================
|
||||
# GPS device detection and gpsd auto-start
|
||||
# ============================================
|
||||
|
||||
_gpsd_process: 'subprocess.Popen | None' = None
|
||||
_gpsd_process_lock = threading.Lock()
|
||||
|
||||
|
||||
def detect_gps_devices() -> list[dict]:
|
||||
"""
|
||||
Detect connected GPS serial devices.
|
||||
|
||||
Returns list of dicts with 'path' and 'description' keys.
|
||||
"""
|
||||
import glob
|
||||
import os
|
||||
import platform
|
||||
|
||||
devices: list[dict] = []
|
||||
system = platform.system()
|
||||
|
||||
if system == 'Linux':
|
||||
# Common USB GPS device paths
|
||||
patterns = ['/dev/ttyUSB*', '/dev/ttyACM*']
|
||||
for pattern in patterns:
|
||||
for path in sorted(glob.glob(pattern)):
|
||||
desc = _describe_device_linux(path)
|
||||
devices.append({'path': path, 'description': desc})
|
||||
|
||||
# Also check /dev/serial/by-id for descriptive names
|
||||
serial_dir = '/dev/serial/by-id'
|
||||
if os.path.isdir(serial_dir):
|
||||
for name in sorted(os.listdir(serial_dir)):
|
||||
full = os.path.join(serial_dir, name)
|
||||
real = os.path.realpath(full)
|
||||
# Skip if we already found this device
|
||||
if any(d['path'] == real for d in devices):
|
||||
# Update description with the more descriptive name
|
||||
for d in devices:
|
||||
if d['path'] == real:
|
||||
d['description'] = name
|
||||
continue
|
||||
devices.append({'path': real, 'description': name})
|
||||
|
||||
elif system == 'Darwin':
|
||||
# macOS: USB serial devices (prefer cu. over tty. for outgoing)
|
||||
patterns = ['/dev/cu.usbmodem*', '/dev/cu.usbserial*']
|
||||
for pattern in patterns:
|
||||
for path in sorted(glob.glob(pattern)):
|
||||
desc = _describe_device_macos(path)
|
||||
devices.append({'path': path, 'description': desc})
|
||||
|
||||
# Sort: devices with GPS-related descriptions first
|
||||
gps_keywords = ('gps', 'gnss', 'u-blox', 'ublox', 'nmea', 'sirf', 'navigation')
|
||||
devices.sort(key=lambda d: (
|
||||
0 if any(k in d['description'].lower() for k in gps_keywords) else 1
|
||||
))
|
||||
|
||||
return devices
|
||||
|
||||
|
||||
def _describe_device_linux(path: str) -> str:
|
||||
"""Get a human-readable description of a Linux serial device."""
|
||||
import os
|
||||
basename = os.path.basename(path)
|
||||
# Try to read from sysfs
|
||||
try:
|
||||
# /sys/class/tty/ttyUSB0/device/../product
|
||||
sysfs = f'/sys/class/tty/{basename}/device/../product'
|
||||
if os.path.exists(sysfs):
|
||||
with open(sysfs) as f:
|
||||
return f.read().strip()
|
||||
except Exception:
|
||||
pass
|
||||
return basename
|
||||
|
||||
|
||||
def _describe_device_macos(path: str) -> str:
|
||||
"""Get a description of a macOS serial device."""
|
||||
import os
|
||||
return os.path.basename(path)
|
||||
|
||||
|
||||
def is_gpsd_running(host: str = 'localhost', port: int = 2947) -> bool:
|
||||
"""Check if gpsd is reachable."""
|
||||
import socket
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(1.0)
|
||||
sock.connect((host, port))
|
||||
sock.close()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def start_gpsd_daemon(device_path: str, host: str = 'localhost',
|
||||
port: int = 2947) -> tuple[bool, str]:
|
||||
"""
|
||||
Start gpsd daemon pointing at the given device.
|
||||
|
||||
Returns (success, message) tuple.
|
||||
"""
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
global _gpsd_process
|
||||
|
||||
with _gpsd_process_lock:
|
||||
# Already running?
|
||||
if is_gpsd_running(host, port):
|
||||
return True, 'gpsd already running'
|
||||
|
||||
gpsd_bin = shutil.which('gpsd')
|
||||
if not gpsd_bin:
|
||||
return False, 'gpsd not installed'
|
||||
|
||||
# Stop any existing managed process
|
||||
stop_gpsd_daemon()
|
||||
|
||||
try:
|
||||
import os
|
||||
if not os.path.exists(device_path):
|
||||
return False, f'Device {device_path} not found'
|
||||
|
||||
cmd = [gpsd_bin, '-N', '-n', '-S', str(port), device_path]
|
||||
logger.info(f"Starting gpsd: {' '.join(cmd)}")
|
||||
print(f"[GPS] Starting gpsd: {' '.join(cmd)}", flush=True)
|
||||
|
||||
_gpsd_process = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
# Give gpsd a moment to start
|
||||
import time
|
||||
time.sleep(1.5)
|
||||
|
||||
if _gpsd_process.poll() is not None:
|
||||
stderr = ''
|
||||
if _gpsd_process.stderr:
|
||||
stderr = _gpsd_process.stderr.read().decode('utf-8', errors='ignore').strip()
|
||||
msg = f'gpsd exited with code {_gpsd_process.returncode}'
|
||||
if stderr:
|
||||
msg += f': {stderr}'
|
||||
return False, msg
|
||||
|
||||
# Verify it's listening
|
||||
if is_gpsd_running(host, port):
|
||||
return True, f'gpsd started on {device_path}'
|
||||
else:
|
||||
return False, 'gpsd started but not accepting connections'
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start gpsd: {e}")
|
||||
return False, str(e)
|
||||
|
||||
|
||||
def stop_gpsd_daemon() -> None:
|
||||
"""Stop the managed gpsd daemon process."""
|
||||
global _gpsd_process
|
||||
|
||||
with _gpsd_process_lock:
|
||||
if _gpsd_process and _gpsd_process.poll() is None:
|
||||
try:
|
||||
_gpsd_process.terminate()
|
||||
_gpsd_process.wait(timeout=3.0)
|
||||
except Exception:
|
||||
try:
|
||||
_gpsd_process.kill()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info("Stopped gpsd daemon")
|
||||
print("[GPS] Stopped gpsd daemon", flush=True)
|
||||
_gpsd_process = None
|
||||
|
||||
Reference in New Issue
Block a user