Observer Location
+
+
+
+
@@ -3636,8 +3668,30 @@
+
+
+
+
+
+
+
+ ⚪ Disconnected
+
+
@@ -4279,6 +4333,12 @@
let rangeRingsLayer = null;
let observerMarkerAdsb = null;
+ // GPS Dongle state
+ let gpsDevices = [];
+ let gpsConnected = false;
+ let gpsEventSource = null;
+ let gpsLastPosition = null;
+
// Audio alert system using Web Audio API (uses shared audioContext declared later)
function getAdsbAudioContext() {
if (!window.adsbAudioCtx) {
@@ -8247,10 +8307,23 @@
const mapContainer = document.getElementById('aircraftMap');
if (!mapContainer || aircraftMap) return;
+ // Use GPS position if available, otherwise use observerLocation or default
+ let initialLat = observerLocation.lat || 51.5;
+ let initialLon = observerLocation.lon || -0.1;
+
+ // Check if GPS has a recent position
+ if (gpsLastPosition && gpsLastPosition.latitude && gpsLastPosition.longitude) {
+ initialLat = gpsLastPosition.latitude;
+ initialLon = gpsLastPosition.longitude;
+ observerLocation.lat = initialLat;
+ observerLocation.lon = initialLon;
+ console.log('GPS: Initializing map with GPS position', initialLat, initialLon);
+ }
+
// Initialize Leaflet map
aircraftMap = L.map('aircraftMap', {
- center: [51.5, -0.1], // Default to London
- zoom: 5,
+ center: [initialLat, initialLon],
+ zoom: 8,
zoomControl: true,
attributionControl: true
});
@@ -8294,6 +8367,17 @@
// Initial update
updateAircraftMarkers();
+
+ // Update input fields with current position
+ const adsbLatInput = document.getElementById('adsbObsLat');
+ const adsbLonInput = document.getElementById('adsbObsLon');
+ if (adsbLatInput) adsbLatInput.value = observerLocation.lat.toFixed(4);
+ if (adsbLonInput) adsbLonInput.value = observerLocation.lon.toFixed(4);
+
+ // Draw initial range rings if GPS is connected
+ if (gpsConnected) {
+ drawRangeRings();
+ }
}
function toggleAircraftClustering() {
@@ -8997,6 +9081,207 @@
}
}
+ // ============================================
+ // GPS DONGLE FUNCTIONS
+ // ============================================
+
+ async function checkGpsDongleAvailable() {
+ try {
+ const response = await fetch('/gps/available');
+ const data = await response.json();
+ return data.available;
+ } catch (e) {
+ console.warn('GPS dongle check failed:', e);
+ return false;
+ }
+ }
+
+ async function refreshGpsDevices() {
+ try {
+ const response = await fetch('/gps/devices');
+ const data = await response.json();
+ if (data.status === 'ok') {
+ gpsDevices = data.devices;
+ updateGpsDeviceSelectors();
+ return data.devices;
+ }
+ } catch (e) {
+ console.warn('Failed to get GPS devices:', e);
+ }
+ return [];
+ }
+
+ function updateGpsDeviceSelectors() {
+ // Update all GPS device selectors in the UI
+ const selectors = document.querySelectorAll('.gps-device-select');
+ selectors.forEach(select => {
+ const currentValue = select.value;
+ select.innerHTML = '';
+ gpsDevices.forEach(device => {
+ const option = document.createElement('option');
+ option.value = device.path;
+ option.textContent = device.name + (device.accessible ? '' : ' (no access)');
+ option.disabled = !device.accessible;
+ select.appendChild(option);
+ });
+ if (currentValue && gpsDevices.some(d => d.path === currentValue)) {
+ select.value = currentValue;
+ }
+ });
+ }
+
+ async function startGpsDongle(devicePath) {
+ if (!devicePath) {
+ showError('Please select a GPS device');
+ return false;
+ }
+
+ try {
+ const response = await fetch('/gps/start', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ device: devicePath, baudrate: 9600 })
+ });
+ const data = await response.json();
+
+ if (data.status === 'started') {
+ gpsConnected = true;
+ startGpsStream();
+ updateGpsStatus(true);
+ showInfo('GPS dongle connected: ' + devicePath);
+ return true;
+ } else {
+ showError('Failed to start GPS: ' + data.message);
+ return false;
+ }
+ } catch (e) {
+ showError('GPS connection error: ' + e.message);
+ return false;
+ }
+ }
+
+ async function stopGpsDongle() {
+ try {
+ if (gpsEventSource) {
+ gpsEventSource.close();
+ gpsEventSource = null;
+ }
+ await fetch('/gps/stop', { method: 'POST' });
+ gpsConnected = false;
+ gpsLastPosition = null;
+ updateGpsStatus(false);
+ showInfo('GPS dongle disconnected');
+ } catch (e) {
+ console.warn('GPS stop error:', e);
+ }
+ }
+
+ function startGpsStream() {
+ if (gpsEventSource) {
+ gpsEventSource.close();
+ }
+
+ gpsEventSource = new EventSource('/gps/stream');
+ gpsEventSource.onmessage = (event) => {
+ try {
+ const data = JSON.parse(event.data);
+ console.log('GPS data received:', data);
+ if (data.type === 'position') {
+ gpsLastPosition = data;
+ updateLocationFromGps(data);
+ // Update status indicator with coordinates
+ const statusIndicators = document.querySelectorAll('.gps-status-indicator');
+ statusIndicators.forEach(indicator => {
+ if (data.latitude && data.longitude) {
+ indicator.textContent = `🟢 ${data.latitude.toFixed(4)}, ${data.longitude.toFixed(4)}`;
+ indicator.style.color = 'var(--accent-green)';
+ }
+ });
+ } else if (data.type === 'keepalive') {
+ console.log('GPS keepalive');
+ }
+ } catch (e) {
+ console.error('GPS parse error:', e);
+ }
+ };
+ gpsEventSource.onerror = (e) => {
+ console.warn('GPS stream error:', e);
+ gpsConnected = false;
+ updateGpsStatus(false);
+ };
+ }
+
+ function updateLocationFromGps(position) {
+ if (!position || !position.latitude || !position.longitude) {
+ console.warn('GPS: Invalid position data', position);
+ return;
+ }
+
+ console.log('GPS: Updating location to', position.latitude, position.longitude);
+
+ // Update satellite observer location
+ const satLatInput = document.getElementById('obsLat');
+ const satLonInput = document.getElementById('obsLon');
+ if (satLatInput) satLatInput.value = position.latitude.toFixed(4);
+ if (satLonInput) satLonInput.value = position.longitude.toFixed(4);
+
+ // Update ADS-B observer location
+ const adsbLatInput = document.getElementById('adsbObsLat');
+ const adsbLonInput = document.getElementById('adsbObsLon');
+ if (adsbLatInput) adsbLatInput.value = position.latitude.toFixed(4);
+ if (adsbLonInput) adsbLonInput.value = position.longitude.toFixed(4);
+
+ // Update observerLocation for ADS-B calculations
+ observerLocation.lat = position.latitude;
+ observerLocation.lon = position.longitude;
+
+ // Center ADS-B map on new location (only on first fix or significant movement)
+ if (typeof aircraftMap !== 'undefined' && aircraftMap) {
+ const currentCenter = aircraftMap.getCenter();
+ const distance = Math.sqrt(
+ Math.pow(currentCenter.lat - position.latitude, 2) +
+ Math.pow(currentCenter.lng - position.longitude, 2)
+ );
+ console.log('GPS: Map exists, distance from current center:', distance);
+ // Only recenter if moved more than ~1km (0.01 degrees)
+ if (distance > 0.01 || !aircraftMap._gpsInitialized) {
+ console.log('GPS: Centering map on', position.latitude, position.longitude);
+ aircraftMap.setView([position.latitude, position.longitude], aircraftMap.getZoom());
+ aircraftMap._gpsInitialized = true;
+ }
+ } else {
+ console.log('GPS: aircraftMap not available yet');
+ }
+
+ // Trigger map updates
+ if (typeof drawRangeRings === 'function') {
+ drawRangeRings();
+ }
+ }
+
+ function updateGpsStatus(connected) {
+ const statusIndicators = document.querySelectorAll('.gps-status-indicator');
+ statusIndicators.forEach(indicator => {
+ indicator.textContent = connected ? '🟢 Connected' : '⚪ Disconnected';
+ indicator.style.color = connected ? 'var(--accent-green)' : 'var(--text-secondary)';
+ });
+
+ const connectBtns = document.querySelectorAll('.gps-connect-btn');
+ const disconnectBtns = document.querySelectorAll('.gps-disconnect-btn');
+ connectBtns.forEach(btn => btn.style.display = connected ? 'none' : 'block');
+ disconnectBtns.forEach(btn => btn.style.display = connected ? 'block' : 'none');
+ }
+
+ function toggleGpsSection(show) {
+ const gpsSections = document.querySelectorAll('.gps-dongle-section');
+ gpsSections.forEach(section => {
+ section.style.display = show ? 'block' : 'none';
+ });
+ if (show) {
+ refreshGpsDevices();
+ }
+ }
+
function initPolarPlot() {
const canvas = document.getElementById('polarPlotCanvas');
if (!canvas) return;
diff --git a/utils/gps.py b/utils/gps.py
new file mode 100644
index 0000000..1633fd4
--- /dev/null
+++ b/utils/gps.py
@@ -0,0 +1,506 @@
+"""
+GPS dongle support for INTERCEPT.
+
+Provides detection and reading of USB GPS dongles via serial port.
+Parses NMEA sentences to extract location data.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import re
+import glob
+import threading
+import time
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional, Callable
+
+logger = logging.getLogger('intercept.gps')
+
+# Try to import serial, but don't fail if not available
+try:
+ import serial
+ SERIAL_AVAILABLE = True
+except ImportError:
+ SERIAL_AVAILABLE = False
+ logger.warning("pyserial not installed - GPS dongle support disabled")
+
+
+@dataclass
+class GPSPosition:
+ """GPS position data."""
+ latitude: float
+ longitude: float
+ altitude: Optional[float] = None
+ speed: Optional[float] = None # knots
+ heading: Optional[float] = None # degrees
+ satellites: Optional[int] = None
+ fix_quality: int = 0 # 0=invalid, 1=GPS, 2=DGPS
+ timestamp: Optional[datetime] = None
+ device: Optional[str] = None
+
+ def to_dict(self) -> dict:
+ """Convert to dictionary for JSON serialization."""
+ return {
+ 'latitude': self.latitude,
+ 'longitude': self.longitude,
+ 'altitude': self.altitude,
+ 'speed': self.speed,
+ 'heading': self.heading,
+ 'satellites': self.satellites,
+ 'fix_quality': self.fix_quality,
+ 'timestamp': self.timestamp.isoformat() if self.timestamp else None,
+ 'device': self.device,
+ }
+
+
+def detect_gps_devices() -> list[dict]:
+ """
+ Detect potential GPS serial devices.
+
+ Returns a list of device info dictionaries.
+ """
+ devices = []
+
+ # Common GPS device patterns by platform
+ patterns = []
+
+ if os.name == 'posix':
+ # Linux
+ patterns.extend([
+ '/dev/ttyUSB*', # USB serial adapters
+ '/dev/ttyACM*', # USB CDC ACM devices (many GPS)
+ '/dev/gps*', # gpsd symlinks
+ ])
+ # macOS
+ patterns.extend([
+ '/dev/tty.usbserial*',
+ '/dev/tty.usbmodem*',
+ '/dev/cu.usbserial*',
+ '/dev/cu.usbmodem*',
+ ])
+
+ for pattern in patterns:
+ for path in glob.glob(pattern):
+ # Try to get device info
+ device_info = {
+ 'path': path,
+ 'name': os.path.basename(path),
+ 'type': 'serial',
+ }
+
+ # Check if it's readable
+ if os.access(path, os.R_OK):
+ device_info['accessible'] = True
+ else:
+ device_info['accessible'] = False
+ device_info['error'] = 'Permission denied'
+
+ devices.append(device_info)
+
+ return devices
+
+
+def parse_nmea_coordinate(coord: str, direction: str) -> Optional[float]:
+ """
+ Parse NMEA coordinate format to decimal degrees.
+
+ NMEA format: DDDMM.MMMM or DDMM.MMMM
+ """
+ if not coord or not direction:
+ return None
+
+ try:
+ # Find the decimal point
+ dot_pos = coord.index('.')
+
+ # Degrees are everything before the last 2 digits before decimal
+ degrees = int(coord[:dot_pos - 2])
+ minutes = float(coord[dot_pos - 2:])
+
+ result = degrees + (minutes / 60.0)
+
+ # Apply direction
+ if direction in ('S', 'W'):
+ result = -result
+
+ return result
+ except (ValueError, IndexError):
+ return None
+
+
+def parse_gga(parts: list[str]) -> Optional[GPSPosition]:
+ """
+ Parse GPGGA/GNGGA sentence (Global Positioning System Fix Data).
+
+ Format: $GPGGA,time,lat,N/S,lon,E/W,quality,satellites,hdop,altitude,M,...
+ """
+ if len(parts) < 10:
+ return None
+
+ try:
+ fix_quality = int(parts[6]) if parts[6] else 0
+
+ # No fix
+ if fix_quality == 0:
+ return None
+
+ lat = parse_nmea_coordinate(parts[2], parts[3])
+ lon = parse_nmea_coordinate(parts[4], parts[5])
+
+ if lat is None or lon is None:
+ return None
+
+ # Parse optional fields
+ satellites = int(parts[7]) if parts[7] else None
+ altitude = float(parts[9]) if parts[9] else None
+
+ # Parse time (HHMMSS.sss)
+ timestamp = None
+ if parts[1]:
+ try:
+ time_str = parts[1].split('.')[0]
+ if len(time_str) >= 6:
+ now = datetime.utcnow()
+ timestamp = now.replace(
+ hour=int(time_str[0:2]),
+ minute=int(time_str[2:4]),
+ second=int(time_str[4:6]),
+ microsecond=0
+ )
+ except (ValueError, IndexError):
+ pass
+
+ return GPSPosition(
+ latitude=lat,
+ longitude=lon,
+ altitude=altitude,
+ satellites=satellites,
+ fix_quality=fix_quality,
+ timestamp=timestamp,
+ )
+ except (ValueError, IndexError) as e:
+ logger.debug(f"GGA parse error: {e}")
+ return None
+
+
+def parse_rmc(parts: list[str]) -> Optional[GPSPosition]:
+ """
+ Parse GPRMC/GNRMC sentence (Recommended Minimum).
+
+ Format: $GPRMC,time,status,lat,N/S,lon,E/W,speed,heading,date,...
+ """
+ if len(parts) < 8:
+ return None
+
+ try:
+ # Check status (A=active/valid, V=void/invalid)
+ if parts[2] != 'A':
+ return None
+
+ lat = parse_nmea_coordinate(parts[3], parts[4])
+ lon = parse_nmea_coordinate(parts[5], parts[6])
+
+ if lat is None or lon is None:
+ return None
+
+ # Parse optional fields
+ speed = float(parts[7]) if parts[7] else None # knots
+ heading = float(parts[8]) if len(parts) > 8 and parts[8] else None
+
+ # Parse timestamp
+ timestamp = None
+ if parts[1] and len(parts) > 9 and parts[9]:
+ try:
+ time_str = parts[1].split('.')[0]
+ date_str = parts[9]
+ if len(time_str) >= 6 and len(date_str) >= 6:
+ timestamp = datetime(
+ year=2000 + int(date_str[4:6]),
+ month=int(date_str[2:4]),
+ day=int(date_str[0:2]),
+ hour=int(time_str[0:2]),
+ minute=int(time_str[2:4]),
+ second=int(time_str[4:6]),
+ )
+ except (ValueError, IndexError):
+ pass
+
+ return GPSPosition(
+ latitude=lat,
+ longitude=lon,
+ speed=speed,
+ heading=heading,
+ timestamp=timestamp,
+ fix_quality=1, # RMC with A status means valid fix
+ )
+ except (ValueError, IndexError) as e:
+ logger.debug(f"RMC parse error: {e}")
+ return None
+
+
+def parse_nmea_sentence(sentence: str) -> Optional[GPSPosition]:
+ """
+ Parse an NMEA sentence and extract position data.
+
+ Supports: GGA, RMC sentences (with GP, GN, GL prefixes)
+ """
+ sentence = sentence.strip()
+
+ # Validate checksum if present
+ if '*' in sentence:
+ data, checksum = sentence.rsplit('*', 1)
+ if data.startswith('$'):
+ data = data[1:]
+
+ # Calculate checksum
+ calc_checksum = 0
+ for char in data:
+ calc_checksum ^= ord(char)
+
+ try:
+ if int(checksum, 16) != calc_checksum:
+ logger.debug(f"Checksum mismatch: {sentence}")
+ return None
+ except ValueError:
+ pass
+
+ # Remove $ prefix if present
+ if sentence.startswith('$'):
+ sentence = sentence[1:]
+
+ # Remove checksum for parsing
+ if '*' in sentence:
+ sentence = sentence.split('*')[0]
+
+ parts = sentence.split(',')
+ if not parts:
+ return None
+
+ msg_type = parts[0]
+
+ # Handle various NMEA talker IDs (GP=GPS, GN=GNSS, GL=GLONASS, GA=Galileo)
+ if msg_type.endswith('GGA'):
+ return parse_gga(parts)
+ elif msg_type.endswith('RMC'):
+ return parse_rmc(parts)
+
+ return None
+
+
+class GPSReader:
+ """
+ Reads GPS data from a serial device.
+
+ Runs in a background thread and maintains current position.
+ """
+
+ def __init__(self, device_path: str, baudrate: int = 9600):
+ self.device_path = device_path
+ self.baudrate = baudrate
+ self._position: Optional[GPSPosition] = None
+ self._lock = threading.Lock()
+ self._running = False
+ self._thread: Optional[threading.Thread] = None
+ self._serial: Optional['serial.Serial'] = None
+ self._last_update: Optional[datetime] = None
+ self._error: Optional[str] = None
+ self._callbacks: list[Callable[[GPSPosition], None]] = []
+
+ @property
+ def position(self) -> Optional[GPSPosition]:
+ """Get the current GPS position."""
+ with self._lock:
+ return self._position
+
+ @property
+ def is_running(self) -> bool:
+ """Check if the reader is running."""
+ return self._running
+
+ @property
+ def last_update(self) -> Optional[datetime]:
+ """Get the time of the last position update."""
+ with self._lock:
+ return self._last_update
+
+ @property
+ def error(self) -> Optional[str]:
+ """Get any error message."""
+ with self._lock:
+ return self._error
+
+ def add_callback(self, callback: Callable[[GPSPosition], None]) -> None:
+ """Add a callback to be called on position updates."""
+ self._callbacks.append(callback)
+
+ def remove_callback(self, callback: Callable[[GPSPosition], None]) -> None:
+ """Remove a position update callback."""
+ if callback in self._callbacks:
+ self._callbacks.remove(callback)
+
+ def start(self) -> bool:
+ """Start reading GPS data in a background thread."""
+ if not SERIAL_AVAILABLE:
+ self._error = "pyserial not installed"
+ return False
+
+ if self._running:
+ return True
+
+ try:
+ self._serial = serial.Serial(
+ self.device_path,
+ baudrate=self.baudrate,
+ timeout=1.0
+ )
+ self._running = True
+ self._error = None
+
+ self._thread = threading.Thread(target=self._read_loop, daemon=True)
+ self._thread.start()
+
+ logger.info(f"Started GPS reader on {self.device_path}")
+ return True
+
+ except serial.SerialException as e:
+ self._error = str(e)
+ logger.error(f"Failed to open GPS device {self.device_path}: {e}")
+ return False
+
+ def stop(self) -> None:
+ """Stop reading GPS data."""
+ self._running = False
+
+ if self._serial:
+ try:
+ self._serial.close()
+ except Exception:
+ pass
+ self._serial = None
+
+ if self._thread:
+ self._thread.join(timeout=2.0)
+ self._thread = None
+
+ logger.info(f"Stopped GPS reader on {self.device_path}")
+
+ def _read_loop(self) -> None:
+ """Background thread loop for reading GPS data."""
+ buffer = ""
+ sentence_count = 0
+
+ while self._running and self._serial:
+ try:
+ # Read available data
+ if self._serial.in_waiting:
+ data = self._serial.read(self._serial.in_waiting)
+ buffer += data.decode('ascii', errors='ignore')
+
+ # Process complete lines
+ while '\n' in buffer:
+ line, buffer = buffer.split('\n', 1)
+ line = line.strip()
+
+ if line.startswith('$'):
+ sentence_count += 1
+ # Log first few sentences and periodically after that
+ if sentence_count <= 5 or sentence_count % 100 == 0:
+ logger.debug(f"GPS NMEA [{sentence_count}]: {line[:60]}...")
+
+ position = parse_nmea_sentence(line)
+ if position:
+ logger.info(f"GPS fix: {position.latitude:.6f}, {position.longitude:.6f} (sats: {position.satellites}, quality: {position.fix_quality})")
+ position.device = self.device_path
+ self._update_position(position)
+ else:
+ time.sleep(0.1)
+
+ except serial.SerialException as e:
+ logger.error(f"GPS read error: {e}")
+ with self._lock:
+ self._error = str(e)
+ break
+ except Exception as e:
+ logger.debug(f"GPS parse error: {e}")
+
+ def _update_position(self, position: GPSPosition) -> None:
+ """Update the current position and notify callbacks."""
+ with self._lock:
+ # Merge data from different sentence types
+ if self._position:
+ # Keep altitude from GGA if RMC doesn't have it
+ if position.altitude is None and self._position.altitude:
+ position.altitude = self._position.altitude
+ # Keep satellites from GGA
+ if position.satellites is None and self._position.satellites:
+ position.satellites = self._position.satellites
+
+ self._position = position
+ self._last_update = datetime.utcnow()
+ self._error = None
+
+ # Notify callbacks
+ for callback in self._callbacks:
+ try:
+ callback(position)
+ except Exception as e:
+ logger.error(f"GPS callback error: {e}")
+
+
+# Global GPS reader instance
+_gps_reader: Optional[GPSReader] = None
+_gps_lock = threading.Lock()
+
+
+def get_gps_reader() -> Optional[GPSReader]:
+ """Get the global GPS reader instance."""
+ with _gps_lock:
+ return _gps_reader
+
+
+def start_gps(device_path: str, baudrate: int = 9600) -> bool:
+ """
+ Start the global GPS reader.
+
+ Args:
+ device_path: Path to the GPS serial device
+ baudrate: Serial baudrate (default 9600)
+
+ Returns:
+ True if started successfully
+ """
+ global _gps_reader
+
+ with _gps_lock:
+ # Stop existing reader if any
+ if _gps_reader:
+ _gps_reader.stop()
+
+ _gps_reader = GPSReader(device_path, baudrate)
+ return _gps_reader.start()
+
+
+def stop_gps() -> None:
+ """Stop the global GPS reader."""
+ global _gps_reader
+
+ with _gps_lock:
+ if _gps_reader:
+ _gps_reader.stop()
+ _gps_reader = None
+
+
+def get_current_position() -> Optional[GPSPosition]:
+ """Get the current GPS position from the global reader."""
+ reader = get_gps_reader()
+ if reader:
+ return reader.position
+ return None
+
+
+def is_serial_available() -> bool:
+ """Check if pyserial is available."""
+ return SERIAL_AVAILABLE
diff --git a/utils/sdr/__init__.py b/utils/sdr/__init__.py
index aa7cbcc..d29f054 100644
--- a/utils/sdr/__init__.py
+++ b/utils/sdr/__init__.py
@@ -21,6 +21,8 @@ Example usage:
cmd = builder.build_fm_demod_command(device, frequency_mhz=153.35)
"""
+from __future__ import annotations
+
from typing import Optional
from .base import CommandBuilder, SDRCapabilities, SDRDevice, SDRType
diff --git a/utils/sdr/base.py b/utils/sdr/base.py
index 29f2f8f..7f6f49a 100644
--- a/utils/sdr/base.py
+++ b/utils/sdr/base.py
@@ -5,6 +5,8 @@ This module provides the core abstractions for supporting multiple SDR hardware
types (RTL-SDR, LimeSDR, HackRF, etc.) through a unified interface.
"""
+from __future__ import annotations
+
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
diff --git a/utils/sdr/detection.py b/utils/sdr/detection.py
index ec0a08e..6c996d5 100644
--- a/utils/sdr/detection.py
+++ b/utils/sdr/detection.py
@@ -4,6 +4,8 @@ Multi-hardware SDR device detection.
Detects RTL-SDR devices via rtl_test and other SDR hardware via SoapySDR.
"""
+from __future__ import annotations
+
import logging
import re
import shutil
diff --git a/utils/sdr/hackrf.py b/utils/sdr/hackrf.py
index 5fa3a07..7abcef8 100644
--- a/utils/sdr/hackrf.py
+++ b/utils/sdr/hackrf.py
@@ -5,6 +5,8 @@ Uses SoapySDR-based tools for FM demodulation and signal capture.
HackRF supports 1 MHz to 6 GHz frequency range.
"""
+from __future__ import annotations
+
from typing import Optional
from .base import CommandBuilder, SDRCapabilities, SDRDevice, SDRType
diff --git a/utils/sdr/limesdr.py b/utils/sdr/limesdr.py
index 7f7ffdd..4f8fcb2 100644
--- a/utils/sdr/limesdr.py
+++ b/utils/sdr/limesdr.py
@@ -5,6 +5,8 @@ Uses SoapySDR-based tools for FM demodulation and signal capture.
LimeSDR supports 100 kHz to 3.8 GHz frequency range.
"""
+from __future__ import annotations
+
from typing import Optional
from .base import CommandBuilder, SDRCapabilities, SDRDevice, SDRType
diff --git a/utils/sdr/rtlsdr.py b/utils/sdr/rtlsdr.py
index 7cdf243..3c58918 100644
--- a/utils/sdr/rtlsdr.py
+++ b/utils/sdr/rtlsdr.py
@@ -5,6 +5,8 @@ Uses native rtl_* tools (rtl_fm, rtl_433) and dump1090 for maximum compatibility
with existing RTL-SDR installations. No SoapySDR dependency required.
"""
+from __future__ import annotations
+
from typing import Optional
from .base import CommandBuilder, SDRCapabilities, SDRDevice, SDRType