From 257de37dfebf2548fee4bf4b20ef17f956d5cece Mon Sep 17 00:00:00 2001 From: Smittix Date: Tue, 10 Feb 2026 18:07:52 +0000 Subject: [PATCH] Update SDR device claims --- app.py | 120 ++++++++++++--------- intercept_agent.py | 197 +++++++++++++++++++--------------- routes/adsb.py | 108 ++++++++++++++----- routes/ais.py | 13 ++- routes/listening_post.py | 182 +++++++++++++++++-------------- routes/pager.py | 129 +++++++++++----------- routes/sensor.py | 94 ++++++++-------- routes/waterfall_websocket.py | 85 ++++++++------- templates/ais_dashboard.html | 77 ++++++++----- 9 files changed, 585 insertions(+), 420 deletions(-) diff --git a/app.py b/app.py index 7a82bd5..2c8f428 100644 --- a/app.py +++ b/app.py @@ -235,62 +235,77 @@ cleanup_manager.register(deauth_alerts) # ============================================ # SDR DEVICE REGISTRY # ============================================ -# Tracks which mode is using which SDR device to prevent conflicts -# Key: device_index (int), Value: mode_name (str) -sdr_device_registry: dict[int, str] = {} -sdr_device_registry_lock = threading.Lock() - - -def claim_sdr_device(device_index: int, mode_name: str) -> str | None: - """Claim an SDR device for a mode. +# Tracks which mode is using which SDR device to prevent conflicts +# Key: device_index (int), Value: {sdr_type: mode_name} +sdr_device_registry: dict[int, dict[str, str]] = {} +sdr_device_registry_lock = threading.Lock() + + +def claim_sdr_device(device_index: int, mode_name: str, sdr_type: str = 'rtlsdr') -> str | None: + """Claim an SDR device for a mode. Checks the in-app registry first, then probes the USB device to catch stale handles held by external processes (e.g. a leftover rtl_fm from a previous crash). Args: - device_index: The SDR device index to claim - mode_name: Name of the mode claiming the device (e.g., 'sensor', 'rtlamr') + device_index: The SDR device index to claim + mode_name: Name of the mode claiming the device (e.g., 'sensor', 'rtlamr') + sdr_type: SDR hardware type (e.g., 'rtlsdr', 'hackrf') Returns: Error message if device is in use, None if successfully claimed """ - with sdr_device_registry_lock: - if device_index in sdr_device_registry: - in_use_by = sdr_device_registry[device_index] - return f'SDR device {device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.' - - # Probe the USB device to catch external processes holding the handle - try: - from utils.sdr.detection import probe_rtlsdr_device - usb_error = probe_rtlsdr_device(device_index) - if usb_error: - return usb_error - except Exception: - pass # If probe fails, let the caller proceed normally - - sdr_device_registry[device_index] = mode_name - return None - - -def release_sdr_device(device_index: int) -> None: - """Release an SDR device from the registry. + sdr_type_key = str(sdr_type or 'rtlsdr').lower() + + with sdr_device_registry_lock: + device_entry = sdr_device_registry.get(device_index, {}) + if sdr_type_key in device_entry: + in_use_by = device_entry[sdr_type_key] + return f'SDR device {device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.' + + # Probe the USB device to catch external processes holding the handle + # Only relevant for RTL-SDR devices + if sdr_type_key == 'rtlsdr': + try: + from utils.sdr.detection import probe_rtlsdr_device + usb_error = probe_rtlsdr_device(device_index) + if usb_error: + return usb_error + except Exception: + pass # If probe fails, let the caller proceed normally + + if device_index not in sdr_device_registry: + sdr_device_registry[device_index] = {} + sdr_device_registry[device_index][sdr_type_key] = mode_name + return None + + +def release_sdr_device(device_index: int, sdr_type: str = 'rtlsdr') -> None: + """Release an SDR device from the registry. Args: - device_index: The SDR device index to release - """ - with sdr_device_registry_lock: - sdr_device_registry.pop(device_index, None) + device_index: The SDR device index to release + sdr_type: SDR hardware type (e.g., 'rtlsdr', 'hackrf') + """ + sdr_type_key = str(sdr_type or 'rtlsdr').lower() + with sdr_device_registry_lock: + entry = sdr_device_registry.get(device_index) + if not entry: + return + entry.pop(sdr_type_key, None) + if not entry: + sdr_device_registry.pop(device_index, None) -def get_sdr_device_status() -> dict[int, str]: - """Get current SDR device allocations. +def get_sdr_device_status() -> dict[int, dict[str, str]]: + """Get current SDR device allocations. Returns: - Dictionary mapping device indices to mode names - """ - with sdr_device_registry_lock: - return dict(sdr_device_registry) + Dictionary mapping device indices to {sdr_type: mode_name} + """ + with sdr_device_registry_lock: + return {idx: dict(modes) for idx, modes in sdr_device_registry.items()} # ============================================ @@ -388,17 +403,20 @@ def get_devices() -> Response: @app.route('/devices/status') -def get_devices_status() -> Response: - """Get all SDR devices with usage status.""" - devices = SDRFactory.detect_devices() - registry = get_sdr_device_status() - - result = [] - for device in devices: - d = device.to_dict() - d['in_use'] = device.index in registry - d['used_by'] = registry.get(device.index) - result.append(d) +def get_devices_status() -> Response: + """Get all SDR devices with usage status.""" + devices = SDRFactory.detect_devices() + registry = get_sdr_device_status() + + result = [] + for device in devices: + d = device.to_dict() + sdr_type_key = device.sdr_type.value if hasattr(device.sdr_type, 'value') else str(device.sdr_type) + sdr_type_key = str(sdr_type_key).lower() + device_registry = registry.get(device.index, {}) + d['in_use'] = sdr_type_key in device_registry + d['used_by'] = device_registry.get(sdr_type_key) + result.append(d) return jsonify(result) diff --git a/intercept_agent.py b/intercept_agent.py index ac635a3..8bd6c1f 100644 --- a/intercept_agent.py +++ b/intercept_agent.py @@ -673,13 +673,14 @@ class ModeManager: def get_status(self) -> dict: """Get overall agent status.""" # Build running modes with device info for multi-SDR tracking - running_modes_detail = {} - for mode, info in self.running_modes.items(): - params = info.get('params', {}) - running_modes_detail[mode] = { - 'started_at': info.get('started_at'), - 'device': params.get('device', params.get('device_index', 0)), - } + running_modes_detail = {} + for mode, info in self.running_modes.items(): + params = info.get('params', {}) + running_modes_detail[mode] = { + 'started_at': info.get('started_at'), + 'device': params.get('device', params.get('device_index', 0)), + 'sdr_type': str(params.get('sdr_type', 'rtlsdr')).lower(), + } status = { 'running_modes': list(self.running_modes.keys()), @@ -698,22 +699,24 @@ class ModeManager: # Modes that use RTL-SDR devices SDR_MODES = {'adsb', 'sensor', 'pager', 'ais', 'acars', 'dsc', 'rtlamr', 'listening_post'} - def get_sdr_in_use(self, device: int = 0) -> str | None: - """Check if an SDR device is in use by another mode. - - Returns the mode name using the device, or None if available. - """ - for mode, info in self.running_modes.items(): - if mode in self.SDR_MODES: - mode_device = info.get('params', {}).get('device', 0) - # Normalize to int for comparison - try: - mode_device = int(mode_device) - except (ValueError, TypeError): - mode_device = 0 - if mode_device == device: - return mode - return None + def get_sdr_in_use(self, device: int = 0, sdr_type: str = 'rtlsdr') -> str | None: + """Check if an SDR device is in use by another mode. + + Returns the mode name using the device, or None if available. + """ + sdr_type_key = str(sdr_type or 'rtlsdr').lower() + for mode, info in self.running_modes.items(): + if mode in self.SDR_MODES: + mode_device = info.get('params', {}).get('device', 0) + mode_sdr_type = str(info.get('params', {}).get('sdr_type', 'rtlsdr')).lower() + # Normalize to int for comparison + try: + mode_device = int(mode_device) + except (ValueError, TypeError): + mode_device = 0 + if mode_device == device and mode_sdr_type == sdr_type_key: + return mode + return None def start_mode(self, mode: str, params: dict) -> dict: """Start a mode with given parameters.""" @@ -725,18 +728,19 @@ class ModeManager: return {'status': 'error', 'message': f'{mode} not available (missing tools)'} # Check SDR device conflicts for SDR-based modes - if mode in self.SDR_MODES: - device = params.get('device', 0) - try: - device = int(device) - except (ValueError, TypeError): - device = 0 - in_use_by = self.get_sdr_in_use(device) - if in_use_by: - return { - 'status': 'error', - 'message': f'SDR device {device} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.' - } + if mode in self.SDR_MODES: + device = params.get('device', 0) + try: + device = int(device) + except (ValueError, TypeError): + device = 0 + sdr_type = str(params.get('sdr_type', 'rtlsdr')).lower() + in_use_by = self.get_sdr_in_use(device, sdr_type) + if in_use_by: + return { + 'status': 'error', + 'message': f'SDR device {device} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.' + } # Initialize lock if needed if mode not in self.locks: @@ -1097,10 +1101,15 @@ class ModeManager: if mode in self.data_snapshots: del self.data_snapshots[mode] - # Mode-specific cleanup - if mode == 'adsb': - self.adsb_aircraft.clear() - elif mode == 'wifi': + # Mode-specific cleanup + if mode == 'adsb': + self.adsb_aircraft.clear() + if 'adsb_mlat' in self.output_threads: + thread = self.output_threads['adsb_mlat'] + if thread and thread.is_alive(): + thread.join(timeout=1) + del self.output_threads['adsb_mlat'] + elif mode == 'wifi': self.wifi_networks.clear() self.wifi_clients.clear() elif mode == 'bluetooth': @@ -1311,14 +1320,20 @@ class ModeManager: """Start dump1090 ADS-B mode using Intercept's utilities.""" gain = params.get('gain', '40') device = params.get('device', '0') - bias_t = params.get('bias_t', False) - sdr_type_str = params.get('sdr_type', 'rtlsdr') - remote_sbs_host = params.get('remote_sbs_host') - remote_sbs_port = params.get('remote_sbs_port', 30003) + bias_t = params.get('bias_t', False) + sdr_type_str = params.get('sdr_type', 'rtlsdr') + remote_sbs_host = params.get('remote_sbs_host') + remote_sbs_port = params.get('remote_sbs_port', 30003) + mlat_sbs_host = params.get('mlat_sbs_host') + mlat_sbs_port = params.get('mlat_sbs_port', 30105) - # If remote SBS host provided, just connect to it - if remote_sbs_host: - return self._start_adsb_sbs_connection(remote_sbs_host, remote_sbs_port) + # If remote SBS host provided, just connect to it + if remote_sbs_host: + result = self._start_adsb_sbs_connection(remote_sbs_host, remote_sbs_port, source_tag='adsb', thread_name='adsb') + if mlat_sbs_host: + self._start_adsb_sbs_connection(mlat_sbs_host, mlat_sbs_port, source_tag='mlat', thread_name='adsb_mlat') + result['mlat_source'] = f'{mlat_sbs_host}:{mlat_sbs_port}' + return result # Check if dump1090 already running on port 30003 try: @@ -1326,9 +1341,13 @@ class ModeManager: sock.settimeout(1.0) result = sock.connect_ex(('localhost', 30003)) sock.close() - if result == 0: - logger.info("dump1090 already running, connecting to SBS port") - return self._start_adsb_sbs_connection('localhost', 30003) + if result == 0: + logger.info("dump1090 already running, connecting to SBS port") + result = self._start_adsb_sbs_connection('localhost', 30003, source_tag='adsb', thread_name='adsb') + if mlat_sbs_host: + self._start_adsb_sbs_connection(mlat_sbs_host, mlat_sbs_port, source_tag='mlat', thread_name='adsb_mlat') + result['mlat_source'] = f'{mlat_sbs_host}:{mlat_sbs_port}' + return result except Exception: pass @@ -1380,12 +1399,16 @@ class ModeManager: # Wait for dump1090 to start time.sleep(2) - if proc.poll() is not None: - stderr = proc.stderr.read().decode('utf-8', errors='ignore') - return {'status': 'error', 'message': f'dump1090 failed to start: {stderr[:200]}'} - - # Connect to SBS port - return self._start_adsb_sbs_connection('localhost', 30003) + if proc.poll() is not None: + stderr = proc.stderr.read().decode('utf-8', errors='ignore') + return {'status': 'error', 'message': f'dump1090 failed to start: {stderr[:200]}'} + + # Connect to SBS port + result = self._start_adsb_sbs_connection('localhost', 30003, source_tag='adsb', thread_name='adsb') + if mlat_sbs_host: + self._start_adsb_sbs_connection(mlat_sbs_host, mlat_sbs_port, source_tag='mlat', thread_name='adsb_mlat') + result['mlat_source'] = f'{mlat_sbs_host}:{mlat_sbs_port}' + return result except FileNotFoundError: return {'status': 'error', 'message': 'dump1090 not found'} @@ -1414,27 +1437,27 @@ class ModeManager: return path return None - def _start_adsb_sbs_connection(self, host: str, port: int) -> dict: - """Connect to SBS port and start parsing.""" - thread = threading.Thread( - target=self._adsb_sbs_reader, - args=(host, port), - daemon=True - ) - thread.start() - self.output_threads['adsb'] = thread - - return { - 'status': 'started', - 'mode': 'adsb', - 'sbs_source': f'{host}:{port}', + def _start_adsb_sbs_connection(self, host: str, port: int, *, source_tag: str = 'adsb', thread_name: str = 'adsb') -> dict: + """Connect to SBS port and start parsing.""" + thread = threading.Thread( + target=self._adsb_sbs_reader, + args=(host, port, source_tag), + daemon=True + ) + thread.start() + self.output_threads[thread_name] = thread + + return { + 'status': 'started', + 'mode': 'adsb', + 'sbs_source': f'{host}:{port}', 'gps_enabled': gps_manager.is_running } - def _adsb_sbs_reader(self, host: str, port: int): - """Read and parse SBS data from dump1090.""" - mode = 'adsb' - stop_event = self.stop_events.get(mode) + def _adsb_sbs_reader(self, host: str, port: int, source_tag: str = 'adsb'): + """Read and parse SBS data from dump1090.""" + mode = 'adsb' + stop_event = self.stop_events.get(mode) retry_count = 0 max_retries = 5 @@ -1443,8 +1466,8 @@ class ModeManager: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5.0) sock.connect((host, port)) - logger.info(f"Connected to SBS at {host}:{port}") - retry_count = 0 + logger.info(f"Connected to SBS at {host}:{port} ({source_tag})") + retry_count = 0 buffer = "" sock.settimeout(1.0) @@ -1458,7 +1481,7 @@ class ModeManager: while '\n' in buffer: line, buffer = buffer.split('\n', 1) - self._parse_sbs_line(line.strip()) + self._parse_sbs_line(line.strip(), source_tag) except socket.timeout: continue @@ -1475,10 +1498,10 @@ class ModeManager: logger.info("ADS-B SBS reader stopped") - def _parse_sbs_line(self, line: str): - """Parse SBS format line and update aircraft dict.""" - if not line: - return + def _parse_sbs_line(self, line: str, source_tag: str = 'adsb'): + """Parse SBS format line and update aircraft dict.""" + if not line: + return parts = line.split(',') if len(parts) < 11 or parts[0] != 'MSG': @@ -1503,12 +1526,14 @@ class ModeManager: if callsign: aircraft['callsign'] = callsign - elif msg_type == '3' and len(parts) > 15: - if parts[11]: - aircraft['altitude'] = int(float(parts[11])) - if parts[14] and parts[15]: - aircraft['lat'] = float(parts[14]) - aircraft['lon'] = float(parts[15]) + elif msg_type == '3' and len(parts) > 15: + if parts[11]: + aircraft['altitude'] = int(float(parts[11])) + if parts[14] and parts[15]: + aircraft['lat'] = float(parts[14]) + aircraft['lon'] = float(parts[15]) + if source_tag: + aircraft['position_source'] = source_tag elif msg_type == '4' and len(parts) > 16: if parts[12]: diff --git a/routes/adsb.py b/routes/adsb.py index 0239c4a..12bac43 100644 --- a/routes/adsb.py +++ b/routes/adsb.py @@ -35,6 +35,9 @@ from config import ( ADSB_DB_USER, ADSB_AUTO_START, ADSB_HISTORY_ENABLED, + ADSB_MLAT_ENABLED, + ADSB_MLAT_SBS_HOST, + ADSB_MLAT_SBS_PORT, SHARED_OBSERVER_LOCATION_ENABLED, ) from utils.logging import adsb_logger as logger @@ -71,7 +74,10 @@ adsb_last_message_time = None adsb_bytes_received = 0 adsb_lines_received = 0 adsb_active_device = None # Track which device index is being used +adsb_active_sdr_type = None _sbs_error_logged = False # Suppress repeated connection error logs +adsb_connected_sources: set[str] = set() +_adsb_connection_lock = threading.Lock() # Track ICAOs already looked up in aircraft database (avoid repeated lookups) _looked_up_icaos: set[str] = set() @@ -318,7 +324,29 @@ def check_dump1090_service(): return None -def parse_sbs_stream(service_addr): +def _reset_adsb_state() -> None: + global adsb_connected, adsb_messages_received, adsb_last_message_time, adsb_bytes_received, adsb_lines_received, _sbs_error_logged + adsb_connected = False + adsb_messages_received = 0 + adsb_last_message_time = None + adsb_bytes_received = 0 + adsb_lines_received = 0 + _sbs_error_logged = False + with _adsb_connection_lock: + adsb_connected_sources.clear() + + +def _set_adsb_connected(source_key: str, connected: bool) -> None: + global adsb_connected + with _adsb_connection_lock: + if connected: + adsb_connected_sources.add(source_key) + else: + adsb_connected_sources.discard(source_key) + adsb_connected = bool(adsb_connected_sources) + + +def parse_sbs_stream(service_addr: str, source_tag: str | None = None): """Parse SBS format data from dump1090 SBS port.""" global adsb_using_service, adsb_connected, adsb_messages_received, adsb_last_message_time, adsb_bytes_received, adsb_lines_received, _sbs_error_logged @@ -327,26 +355,23 @@ def parse_sbs_stream(service_addr): host, port = service_addr.split(':') port = int(port) + source_label = source_tag or 'adsb' - logger.info(f"SBS stream parser started, connecting to {host}:{port}") - adsb_connected = False - adsb_messages_received = 0 - _sbs_error_logged = False + logger.info(f"SBS stream parser started ({source_label}), connecting to {host}:{port}") while adsb_using_service: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(SBS_SOCKET_TIMEOUT) sock.connect((host, port)) - adsb_connected = True + _set_adsb_connected(service_addr, True) _sbs_error_logged = False # Reset so we log next error - logger.info("Connected to SBS stream") + logger.info(f"Connected to SBS stream ({source_label})") buffer = "" last_update = time.time() pending_updates = set() - adsb_bytes_received = 0 - adsb_lines_received = 0 + local_lines_received = 0 while adsb_using_service: try: @@ -364,13 +389,14 @@ def parse_sbs_stream(service_addr): continue adsb_lines_received += 1 + local_lines_received += 1 # Log first few lines for debugging - if adsb_lines_received <= 3: - logger.info(f"SBS line {adsb_lines_received}: {line[:100]}") + if local_lines_received <= 3: + logger.info(f"SBS line ({source_label}) {local_lines_received}: {line[:100]}") parts = line.split(',') if len(parts) < 11 or parts[0] != 'MSG': - if adsb_lines_received <= 5: + if local_lines_received <= 5: logger.debug(f"Skipping non-MSG line: {line[:50]}") continue @@ -421,6 +447,8 @@ def parse_sbs_stream(service_addr): try: aircraft['lat'] = float(parts[14]) aircraft['lon'] = float(parts[15]) + if source_label: + aircraft['position_source'] = source_label except (ValueError, TypeError): pass @@ -494,18 +522,26 @@ def parse_sbs_stream(service_addr): continue sock.close() - adsb_connected = False + _set_adsb_connected(service_addr, False) except OSError as e: - adsb_connected = False + _set_adsb_connected(service_addr, False) if not _sbs_error_logged: logger.warning(f"SBS connection error: {e}, reconnecting...") _sbs_error_logged = True time.sleep(SBS_RECONNECT_DELAY) - adsb_connected = False + _set_adsb_connected(service_addr, False) logger.info("SBS stream parser stopped") +def _start_mlat_stream(host: str, port: int) -> str: + mlat_addr = f"{host}:{port}" + logger.info(f"Connecting to MLAT SBS at {mlat_addr}") + thread = threading.Thread(target=parse_sbs_stream, args=(mlat_addr, 'mlat'), daemon=True) + thread.start() + return mlat_addr + + @adsb_bp.route('/tools') def check_adsb_tools(): """Check for ADS-B decoding tools and hardware.""" @@ -580,7 +616,7 @@ def adsb_session(): @adsb_bp.route('/start', methods=['POST']) def start_adsb(): """Start ADS-B tracking.""" - global adsb_using_service, adsb_active_device + global adsb_using_service, adsb_active_device, adsb_active_sdr_type with app_module.adsb_lock: if adsb_using_service: @@ -601,10 +637,22 @@ def start_adsb(): device = validate_device_index(data.get('device', '0')) except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 + _reset_adsb_state() # Check for remote SBS connection (e.g., remote dump1090) remote_sbs_host = data.get('remote_sbs_host') remote_sbs_port = data.get('remote_sbs_port', 30003) + mlat_sbs_host = (data.get('mlat_sbs_host') or '').strip() + mlat_sbs_port = data.get('mlat_sbs_port', ADSB_MLAT_SBS_PORT) + if not mlat_sbs_host and ADSB_MLAT_ENABLED and ADSB_MLAT_SBS_HOST: + mlat_sbs_host = ADSB_MLAT_SBS_HOST + mlat_sbs_port = ADSB_MLAT_SBS_PORT + if mlat_sbs_host: + try: + mlat_sbs_host = validate_rtl_tcp_host(mlat_sbs_host) + mlat_sbs_port = validate_rtl_tcp_port(mlat_sbs_port) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 if remote_sbs_host: # Validate and connect to remote dump1090 SBS output @@ -617,8 +665,10 @@ def start_adsb(): remote_addr = f"{remote_sbs_host}:{remote_sbs_port}" logger.info(f"Connecting to remote dump1090 SBS at {remote_addr}") adsb_using_service = True - thread = threading.Thread(target=parse_sbs_stream, args=(remote_addr,), daemon=True) + thread = threading.Thread(target=parse_sbs_stream, args=(remote_addr, 'adsb'), daemon=True) thread.start() + if mlat_sbs_host: + _start_mlat_stream(mlat_sbs_host, mlat_sbs_port) session = _record_session_start( device_index=device, sdr_type='remote', @@ -638,8 +688,10 @@ def start_adsb(): if existing_service: logger.info(f"Found existing dump1090 service at {existing_service}") adsb_using_service = True - thread = threading.Thread(target=parse_sbs_stream, args=(existing_service,), daemon=True) + thread = threading.Thread(target=parse_sbs_stream, args=(existing_service, 'adsb'), daemon=True) thread.start() + if mlat_sbs_host: + _start_mlat_stream(mlat_sbs_host, mlat_sbs_port) session = _record_session_start( device_index=device, sdr_type='external', @@ -689,7 +741,7 @@ def start_adsb(): # Check if device is available before starting local dump1090 device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'adsb') + error = app_module.claim_sdr_device(device_int, 'adsb', sdr_type.value) if error: return jsonify({ 'status': 'error', @@ -726,7 +778,7 @@ def start_adsb(): if app_module.adsb_process.poll() is not None: # Process exited - release device and get error message - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type.value) stderr_output = '' if app_module.adsb_process.stderr: try: @@ -772,9 +824,12 @@ def start_adsb(): }) adsb_using_service = True - adsb_active_device = device # Track which device is being used - thread = threading.Thread(target=parse_sbs_stream, args=(f'localhost:{ADSB_SBS_PORT}',), daemon=True) + adsb_active_device = device # Track which device index is being used + adsb_active_sdr_type = sdr_type.value + thread = threading.Thread(target=parse_sbs_stream, args=(f'localhost:{ADSB_SBS_PORT}', 'adsb'), daemon=True) thread.start() + if mlat_sbs_host: + _start_mlat_stream(mlat_sbs_host, mlat_sbs_port) session = _record_session_start( device_index=device, @@ -792,14 +847,14 @@ def start_adsb(): }) except Exception as e: # Release device on failure - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type.value) return jsonify({'status': 'error', 'message': str(e)}) @adsb_bp.route('/stop', methods=['POST']) def stop_adsb(): """Stop ADS-B tracking.""" - global adsb_using_service, adsb_active_device + global adsb_using_service, adsb_active_device, adsb_active_sdr_type data = request.json or {} stop_source = data.get('source') stopped_by = request.remote_addr @@ -823,10 +878,12 @@ def stop_adsb(): # Release device from registry if adsb_active_device is not None: - app_module.release_sdr_device(adsb_active_device) + app_module.release_sdr_device(adsb_active_device, adsb_active_sdr_type or 'rtlsdr') adsb_using_service = False adsb_active_device = None + adsb_active_sdr_type = None + _reset_adsb_state() app_module.adsb_aircraft.clear() _looked_up_icaos.clear() @@ -868,6 +925,7 @@ def adsb_dashboard(): 'adsb_dashboard.html', shared_observer_location=SHARED_OBSERVER_LOCATION_ENABLED, adsb_auto_start=ADSB_AUTO_START, + adsb_mlat_enabled=ADSB_MLAT_ENABLED, ) diff --git a/routes/ais.py b/routes/ais.py index b481fd8..6260de6 100644 --- a/routes/ais.py +++ b/routes/ais.py @@ -44,6 +44,7 @@ ais_connected = False ais_messages_received = 0 ais_last_message_time = None ais_active_device = None +ais_active_sdr_type = None _ais_error_logged = True # Common installation paths for AIS-catcher @@ -326,7 +327,7 @@ def ais_status(): @ais_bp.route('/start', methods=['POST']) def start_ais(): """Start AIS tracking.""" - global ais_running, ais_active_device + global ais_running, ais_active_device, ais_active_sdr_type with app_module.ais_lock: if ais_running: @@ -373,7 +374,7 @@ def start_ais(): # Check if device is available device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'ais') + error = app_module.claim_sdr_device(device_int, 'ais', sdr_type.value) if error: return jsonify({ 'status': 'error', @@ -412,7 +413,7 @@ def start_ais(): if app_module.ais_process.poll() is not None: # Release device on failure - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type.value) stderr_output = '' if app_module.ais_process.stderr: try: @@ -426,6 +427,7 @@ def start_ais(): ais_running = True ais_active_device = device + ais_active_sdr_type = sdr_type.value # Start TCP parser thread thread = threading.Thread(target=parse_ais_stream, args=(tcp_port,), daemon=True) @@ -439,7 +441,7 @@ def start_ais(): }) except Exception as e: # Release device on failure - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type.value) logger.error(f"Failed to start AIS-catcher: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 @@ -466,10 +468,11 @@ def stop_ais(): # Release device from registry if ais_active_device is not None: - app_module.release_sdr_device(ais_active_device) + app_module.release_sdr_device(ais_active_device, ais_active_sdr_type or 'rtlsdr') ais_running = False ais_active_device = None + ais_active_sdr_type = None app_module.ais_vessels.clear() return jsonify({'status': 'stopped'}) diff --git a/routes/listening_post.py b/routes/listening_post.py index cb43aec..b1fd051 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -46,13 +46,15 @@ audio_modulation = 'fm' # Scanner state scanner_thread: Optional[threading.Thread] = None -scanner_running = False -scanner_lock = threading.Lock() -scanner_paused = False -scanner_current_freq = 0.0 -scanner_active_device: Optional[int] = None -listening_active_device: Optional[int] = None -scanner_power_process: Optional[subprocess.Popen] = None +scanner_running = False +scanner_lock = threading.Lock() +scanner_paused = False +scanner_current_freq = 0.0 +scanner_active_device: Optional[int] = None +scanner_active_sdr_type: Optional[str] = None +listening_active_device: Optional[int] = None +listening_active_sdr_type: Optional[str] = None +scanner_power_process: Optional[subprocess.Popen] = None scanner_config = { 'start_freq': 88.0, 'end_freq': 108.0, @@ -936,7 +938,8 @@ def check_tools() -> Response: @listening_post_bp.route('/scanner/start', methods=['POST']) def start_scanner() -> Response: """Start the frequency scanner.""" - global scanner_thread, scanner_running, scanner_config, scanner_active_device, listening_active_device + global scanner_thread, scanner_running, scanner_config, scanner_active_device, listening_active_device + global scanner_active_sdr_type, listening_active_sdr_type with scanner_lock: if scanner_running: @@ -1002,21 +1005,23 @@ def start_scanner() -> Response: 'message': 'rtl_power not found. Install rtl-sdr tools.' }), 503 # Release listening device if active - if listening_active_device is not None: - app_module.release_sdr_device(listening_active_device) - listening_active_device = None - # Claim device for scanner - error = app_module.claim_sdr_device(scanner_config['device'], 'scanner') - if error: - return jsonify({ - 'status': 'error', - 'error_type': 'DEVICE_BUSY', - 'message': error - }), 409 - scanner_active_device = scanner_config['device'] - scanner_running = True - scanner_thread = threading.Thread(target=scanner_loop_power, daemon=True) - scanner_thread.start() + if listening_active_device is not None: + app_module.release_sdr_device(listening_active_device, listening_active_sdr_type or 'rtlsdr') + listening_active_device = None + listening_active_sdr_type = None + # Claim device for scanner + error = app_module.claim_sdr_device(scanner_config['device'], 'scanner', sdr_type) + if error: + return jsonify({ + 'status': 'error', + 'error_type': 'DEVICE_BUSY', + 'message': error + }), 409 + scanner_active_device = scanner_config['device'] + scanner_active_sdr_type = sdr_type + scanner_running = True + scanner_thread = threading.Thread(target=scanner_loop_power, daemon=True) + scanner_thread.start() else: if sdr_type == 'rtlsdr': if not find_rtl_fm(): @@ -1030,17 +1035,19 @@ def start_scanner() -> Response: 'status': 'error', 'message': f'rx_fm not found. Install SoapySDR utilities for {sdr_type}.' }), 503 - if listening_active_device is not None: - app_module.release_sdr_device(listening_active_device) - listening_active_device = None - error = app_module.claim_sdr_device(scanner_config['device'], 'scanner') - if error: - return jsonify({ - 'status': 'error', - 'error_type': 'DEVICE_BUSY', - 'message': error - }), 409 - scanner_active_device = scanner_config['device'] + if listening_active_device is not None: + app_module.release_sdr_device(listening_active_device, listening_active_sdr_type or 'rtlsdr') + listening_active_device = None + listening_active_sdr_type = None + error = app_module.claim_sdr_device(scanner_config['device'], 'scanner', sdr_type) + if error: + return jsonify({ + 'status': 'error', + 'error_type': 'DEVICE_BUSY', + 'message': error + }), 409 + scanner_active_device = scanner_config['device'] + scanner_active_sdr_type = sdr_type scanner_running = True scanner_thread = threading.Thread(target=scanner_loop, daemon=True) @@ -1053,9 +1060,9 @@ def start_scanner() -> Response: @listening_post_bp.route('/scanner/stop', methods=['POST']) -def stop_scanner() -> Response: - """Stop the frequency scanner.""" - global scanner_running, scanner_active_device, scanner_power_process +def stop_scanner() -> Response: + """Stop the frequency scanner.""" + global scanner_running, scanner_active_device, scanner_power_process, scanner_active_sdr_type scanner_running = False _stop_audio_stream() @@ -1069,9 +1076,10 @@ def stop_scanner() -> Response: except Exception: pass scanner_power_process = None - if scanner_active_device is not None: - app_module.release_sdr_device(scanner_active_device) - scanner_active_device = None + if scanner_active_device is not None: + app_module.release_sdr_device(scanner_active_device, scanner_active_sdr_type or 'rtlsdr') + scanner_active_device = None + scanner_active_sdr_type = None return jsonify({'status': 'stopped'}) @@ -1242,14 +1250,16 @@ def get_presets() -> Response: @listening_post_bp.route('/audio/start', methods=['POST']) def start_audio() -> Response: """Start audio at specific frequency (manual mode).""" - global scanner_running, scanner_active_device, listening_active_device, scanner_power_process, scanner_thread + global scanner_running, scanner_active_device, listening_active_device, scanner_power_process, scanner_thread + global scanner_active_sdr_type, listening_active_sdr_type, waterfall_active_sdr_type # Stop scanner if running if scanner_running: scanner_running = False - if scanner_active_device is not None: - app_module.release_sdr_device(scanner_active_device) - scanner_active_device = None + if scanner_active_device is not None: + app_module.release_sdr_device(scanner_active_device, scanner_active_sdr_type or 'rtlsdr') + scanner_active_device = None + scanner_active_sdr_type = None if scanner_thread and scanner_thread.is_alive(): try: scanner_thread.join(timeout=2.0) @@ -1306,18 +1316,19 @@ def start_audio() -> Response: scanner_config['sdr_type'] = sdr_type # Stop waterfall if it's using the same SDR (SSE path) - if waterfall_running and waterfall_active_device == device: - _stop_waterfall_internal() - time.sleep(0.2) + if waterfall_running and waterfall_active_device == device and waterfall_active_sdr_type == sdr_type: + _stop_waterfall_internal() + time.sleep(0.2) # Claim device for listening audio. The WebSocket waterfall handler # may still be tearing down its IQ capture process (thread join + # safe_terminate can take several seconds), so we retry with back-off # to give the USB device time to be fully released. if listening_active_device is None or listening_active_device != device: - if listening_active_device is not None: - app_module.release_sdr_device(listening_active_device) - listening_active_device = None + if listening_active_device is not None: + app_module.release_sdr_device(listening_active_device, listening_active_sdr_type or 'rtlsdr') + listening_active_device = None + listening_active_sdr_type = None error = None max_claim_attempts = 6 @@ -1325,13 +1336,13 @@ def start_audio() -> Response: # Force-release a stale waterfall registry entry on each # attempt — the WebSocket handler may not have finished # cleanup yet. - device_status = app_module.get_sdr_device_status() - if device_status.get(device) == 'waterfall': - app_module.release_sdr_device(device) - - error = app_module.claim_sdr_device(device, 'listening') - if not error: - break + device_status = app_module.get_sdr_device_status() + if device_status.get(device, {}).get(sdr_type) == 'waterfall': + app_module.release_sdr_device(device, sdr_type) + + error = app_module.claim_sdr_device(device, 'listening', sdr_type) + if not error: + break if attempt < max_claim_attempts - 1: logger.debug( f"Device claim attempt {attempt + 1}/{max_claim_attempts} " @@ -1345,7 +1356,8 @@ def start_audio() -> Response: 'error_type': 'DEVICE_BUSY', 'message': error }), 409 - listening_active_device = device + listening_active_device = device + listening_active_sdr_type = sdr_type _start_audio_stream(frequency, modulation) @@ -1363,14 +1375,15 @@ def start_audio() -> Response: @listening_post_bp.route('/audio/stop', methods=['POST']) -def stop_audio() -> Response: - """Stop audio.""" - global listening_active_device - _stop_audio_stream() - if listening_active_device is not None: - app_module.release_sdr_device(listening_active_device) - listening_active_device = None - return jsonify({'status': 'stopped'}) +def stop_audio() -> Response: + """Stop audio.""" + global listening_active_device, listening_active_sdr_type + _stop_audio_stream() + if listening_active_device is not None: + app_module.release_sdr_device(listening_active_device, listening_active_sdr_type or 'rtlsdr') + listening_active_device = None + listening_active_sdr_type = None + return jsonify({'status': 'stopped'}) @listening_post_bp.route('/audio/status') @@ -1547,9 +1560,10 @@ waterfall_process: Optional[subprocess.Popen] = None waterfall_thread: Optional[threading.Thread] = None waterfall_running = False waterfall_lock = threading.Lock() -waterfall_queue: queue.Queue = queue.Queue(maxsize=200) -waterfall_active_device: Optional[int] = None -waterfall_config = { +waterfall_queue: queue.Queue = queue.Queue(maxsize=200) +waterfall_active_device: Optional[int] = None +waterfall_active_sdr_type: Optional[str] = None +waterfall_config = { 'start_freq': 88.0, 'end_freq': 108.0, 'bin_size': 10000, @@ -1723,9 +1737,9 @@ def _waterfall_loop(): logger.info("Waterfall loop stopped") -def _stop_waterfall_internal() -> None: - """Stop the waterfall display and release resources.""" - global waterfall_running, waterfall_process, waterfall_active_device +def _stop_waterfall_internal() -> None: + """Stop the waterfall display and release resources.""" + global waterfall_running, waterfall_process, waterfall_active_device, waterfall_active_sdr_type waterfall_running = False if waterfall_process and waterfall_process.poll() is None: @@ -1739,15 +1753,16 @@ def _stop_waterfall_internal() -> None: pass waterfall_process = None - if waterfall_active_device is not None: - app_module.release_sdr_device(waterfall_active_device) - waterfall_active_device = None + if waterfall_active_device is not None: + app_module.release_sdr_device(waterfall_active_device, waterfall_active_sdr_type or 'rtlsdr') + waterfall_active_device = None + waterfall_active_sdr_type = None @listening_post_bp.route('/waterfall/start', methods=['POST']) -def start_waterfall() -> Response: - """Start the waterfall/spectrogram display.""" - global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device +def start_waterfall() -> Response: + """Start the waterfall/spectrogram display.""" + global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device, waterfall_active_sdr_type with waterfall_lock: if waterfall_running: @@ -1788,11 +1803,12 @@ def start_waterfall() -> Response: pass # Claim SDR device - error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall') - if error: - return jsonify({'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error}), 409 - - waterfall_active_device = waterfall_config['device'] + error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall', 'rtlsdr') + if error: + return jsonify({'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error}), 409 + + waterfall_active_device = waterfall_config['device'] + waterfall_active_sdr_type = 'rtlsdr' waterfall_running = True waterfall_thread = threading.Thread(target=_waterfall_loop, daemon=True) waterfall_thread.start() diff --git a/routes/pager.py b/routes/pager.py index 3253a6c..8a3dfa1 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -32,8 +32,9 @@ from utils.dependencies import get_tool_path pager_bp = Blueprint('pager', __name__) -# Track which device is being used -pager_active_device: int | None = None +# Track which device is being used +pager_active_device: int | None = None +pager_active_sdr_type: str | None = None def parse_multimon_output(line: str) -> dict[str, str] | None: @@ -205,7 +206,7 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.output_queue.put({'type': 'error', 'text': str(e)}) finally: - global pager_active_device + global pager_active_device, pager_active_sdr_type try: os.close(master_fd) except OSError: @@ -233,14 +234,15 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: with app_module.process_lock: app_module.current_process = None # Release SDR device - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') + pager_active_device = None + pager_active_sdr_type = None @pager_bp.route('/start', methods=['POST']) -def start_decoding() -> Response: - global pager_active_device +def start_decoding() -> Response: + global pager_active_device, pager_active_sdr_type with app_module.process_lock: if app_module.current_process: @@ -262,33 +264,42 @@ def start_decoding() -> Response: squelch = int(squelch) if not 0 <= squelch <= 1000: raise ValueError("Squelch must be between 0 and 1000") - except (ValueError, TypeError): - return jsonify({'status': 'error', 'message': 'Invalid squelch value'}), 400 - - # Check for rtl_tcp (remote SDR) connection - rtl_tcp_host = data.get('rtl_tcp_host') - rtl_tcp_port = data.get('rtl_tcp_port', 1234) - - # Claim local device if not using remote rtl_tcp - if not rtl_tcp_host: - device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'pager') - if error: - return jsonify({ - 'status': 'error', - 'error_type': 'DEVICE_BUSY', - 'message': error - }), 409 - pager_active_device = device_int + except (ValueError, TypeError): + return jsonify({'status': 'error', 'message': 'Invalid squelch value'}), 400 + + # Get SDR type and build command via abstraction layer + sdr_type_str = data.get('sdr_type', 'rtlsdr') + try: + sdr_type = SDRType(sdr_type_str) + except ValueError: + sdr_type = SDRType.RTL_SDR + + # Check for rtl_tcp (remote SDR) connection + rtl_tcp_host = data.get('rtl_tcp_host') + rtl_tcp_port = data.get('rtl_tcp_port', 1234) + + # Claim local device if not using remote rtl_tcp + if not rtl_tcp_host: + device_int = int(device) + error = app_module.claim_sdr_device(device_int, 'pager', sdr_type.value) + if error: + return jsonify({ + 'status': 'error', + 'error_type': 'DEVICE_BUSY', + 'message': error + }), 409 + pager_active_device = device_int + pager_active_sdr_type = sdr_type.value # Validate protocols valid_protocols = ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX'] protocols = data.get('protocols', valid_protocols) - if not isinstance(protocols, list): - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - return jsonify({'status': 'error', 'message': 'Protocols must be a list'}), 400 + if not isinstance(protocols, list): + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') + pager_active_device = None + pager_active_sdr_type = None + return jsonify({'status': 'error', 'message': 'Protocols must be a list'}), 400 protocols = [p for p in protocols if p in valid_protocols] if not protocols: protocols = valid_protocols @@ -312,14 +323,7 @@ def start_decoding() -> Response: elif proto == 'FLEX': decoders.extend(['-a', 'FLEX']) - # Get SDR type and build command via abstraction layer - sdr_type_str = data.get('sdr_type', 'rtlsdr') - try: - sdr_type = SDRType(sdr_type_str) - except ValueError: - sdr_type = SDRType.RTL_SDR - - if rtl_tcp_host: + if rtl_tcp_host: # Validate and create network device try: rtl_tcp_host = validate_rtl_tcp_host(rtl_tcp_host) @@ -416,22 +420,23 @@ def start_decoding() -> Response: return jsonify({'status': 'started', 'command': full_cmd}) - except FileNotFoundError as e: - # Kill orphaned rtl_fm process - try: - rtl_process.terminate() - rtl_process.wait(timeout=2) + except FileNotFoundError as e: + # Kill orphaned rtl_fm process + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) except Exception: try: rtl_process.kill() except Exception: pass - # Release device on failure - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) - except Exception as e: + # Release device on failure + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') + pager_active_device = None + pager_active_sdr_type = None + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) + except Exception as e: # Kill orphaned rtl_fm process if it was started try: rtl_process.terminate() @@ -441,16 +446,17 @@ def start_decoding() -> Response: rtl_process.kill() except Exception: pass - # Release device on failure - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) + # Release device on failure + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') + pager_active_device = None + pager_active_sdr_type = None + return jsonify({'status': 'error', 'message': str(e)}) @pager_bp.route('/stop', methods=['POST']) -def stop_decoding() -> Response: - global pager_active_device +def stop_decoding() -> Response: + global pager_active_device, pager_active_sdr_type with app_module.process_lock: if app_module.current_process: @@ -485,10 +491,11 @@ def stop_decoding() -> Response: app_module.current_process = None - # Release device from registry - if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) - pager_active_device = None + # Release device from registry + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') + pager_active_device = None + pager_active_sdr_type = None return jsonify({'status': 'stopped'}) diff --git a/routes/sensor.py b/routes/sensor.py index e2110fb..3ad7d7f 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -25,8 +25,9 @@ from utils.sdr import SDRFactory, SDRType sensor_bp = Blueprint('sensor', __name__) -# Track which device is being used -sensor_active_device: int | None = None +# Track which device is being used +sensor_active_device: int | None = None +sensor_active_sdr_type: str | None = None def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: @@ -75,7 +76,7 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) finally: - global sensor_active_device + global sensor_active_device, sensor_active_sdr_type # Ensure process is terminated try: process.terminate() @@ -90,9 +91,10 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: with app_module.sensor_lock: app_module.sensor_process = None # Release SDR device - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None @sensor_bp.route('/sensor/status') @@ -104,8 +106,8 @@ def sensor_status() -> Response: @sensor_bp.route('/start_sensor', methods=['POST']) -def start_sensor() -> Response: - global sensor_active_device +def start_sensor() -> Response: + global sensor_active_device, sensor_active_sdr_type with app_module.sensor_lock: if app_module.sensor_process: @@ -122,21 +124,29 @@ def start_sensor() -> Response: except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 - # Check for rtl_tcp (remote SDR) connection - rtl_tcp_host = data.get('rtl_tcp_host') - rtl_tcp_port = data.get('rtl_tcp_port', 1234) + # Get SDR type and build command via abstraction layer + sdr_type_str = data.get('sdr_type', 'rtlsdr') + try: + sdr_type = SDRType(sdr_type_str) + except ValueError: + sdr_type = SDRType.RTL_SDR + + # Check for rtl_tcp (remote SDR) connection + rtl_tcp_host = data.get('rtl_tcp_host') + rtl_tcp_port = data.get('rtl_tcp_port', 1234) # Claim local device if not using remote rtl_tcp if not rtl_tcp_host: device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'sensor') - if error: - return jsonify({ - 'status': 'error', - 'error_type': 'DEVICE_BUSY', - 'message': error - }), 409 - sensor_active_device = device_int + error = app_module.claim_sdr_device(device_int, 'sensor', sdr_type.value) + if error: + return jsonify({ + 'status': 'error', + 'error_type': 'DEVICE_BUSY', + 'message': error + }), 409 + sensor_active_device = device_int + sensor_active_sdr_type = sdr_type.value # Clear queue while not app_module.sensor_queue.empty(): @@ -145,14 +155,7 @@ def start_sensor() -> Response: except queue.Empty: break - # Get SDR type and build command via abstraction layer - sdr_type_str = data.get('sdr_type', 'rtlsdr') - try: - sdr_type = SDRType(sdr_type_str) - except ValueError: - sdr_type = SDRType.RTL_SDR - - if rtl_tcp_host: + if rtl_tcp_host: # Validate and create network device try: rtl_tcp_host = validate_rtl_tcp_host(rtl_tcp_host) @@ -214,23 +217,25 @@ def start_sensor() -> Response: return jsonify({'status': 'started', 'command': full_cmd}) - except FileNotFoundError: - # Release device on failure - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) - except Exception as e: - # Release device on failure - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) + except FileNotFoundError: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None + return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) + except Exception as e: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None + return jsonify({'status': 'error', 'message': str(e)}) @sensor_bp.route('/stop_sensor', methods=['POST']) -def stop_sensor() -> Response: - global sensor_active_device +def stop_sensor() -> Response: + global sensor_active_device, sensor_active_sdr_type with app_module.sensor_lock: if app_module.sensor_process: @@ -242,9 +247,10 @@ def stop_sensor() -> Response: app_module.sensor_process = None # Release device from registry - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None return jsonify({'status': 'stopped'}) diff --git a/routes/waterfall_websocket.py b/routes/waterfall_websocket.py index 5512d6f..346a3a0 100644 --- a/routes/waterfall_websocket.py +++ b/routes/waterfall_websocket.py @@ -83,10 +83,11 @@ def init_waterfall_websocket(app: Flask): # Import app module for device claiming import app as app_module - iq_process = None - reader_thread = None - stop_event = threading.Event() - claimed_device = None + iq_process = None + reader_thread = None + stop_event = threading.Event() + claimed_device = None + claimed_sdr_type = None # Queue for outgoing messages — only the main loop touches ws.send() send_queue = queue.Queue(maxsize=120) @@ -136,13 +137,14 @@ def init_waterfall_websocket(app: Flask): stop_event.set() if reader_thread and reader_thread.is_alive(): reader_thread.join(timeout=2) - if iq_process: - safe_terminate(iq_process) - unregister_process(iq_process) - iq_process = None - if claimed_device is not None: - app_module.release_sdr_device(claimed_device) - claimed_device = None + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + iq_process = None + if claimed_device is not None: + app_module.release_sdr_device(claimed_device, claimed_sdr_type or 'rtlsdr') + claimed_device = None + claimed_sdr_type = None stop_event.clear() # Flush stale frames from previous capture while not send_queue.empty(): @@ -185,15 +187,16 @@ def init_waterfall_websocket(app: Flask): end_freq = center_freq + effective_span_mhz / 2 # Claim the device - claim_err = app_module.claim_sdr_device(device_index, 'waterfall') - if claim_err: - ws.send(json.dumps({ - 'status': 'error', + claim_err = app_module.claim_sdr_device(device_index, 'waterfall', sdr_type.value) + if claim_err: + ws.send(json.dumps({ + 'status': 'error', 'message': claim_err, 'error_type': 'DEVICE_BUSY', })) continue - claimed_device = device_index + claimed_device = device_index + claimed_sdr_type = sdr_type.value # Build I/Q capture command try: @@ -207,11 +210,12 @@ def init_waterfall_websocket(app: Flask): ppm=ppm, bias_t=bias_t, ) - except NotImplementedError as e: - app_module.release_sdr_device(device_index) - claimed_device = None - ws.send(json.dumps({ - 'status': 'error', + except NotImplementedError as e: + app_module.release_sdr_device(device_index, sdr_type.value) + claimed_device = None + claimed_sdr_type = None + ws.send(json.dumps({ + 'status': 'error', 'message': str(e), })) continue @@ -255,10 +259,11 @@ def init_waterfall_websocket(app: Flask): safe_terminate(iq_process) unregister_process(iq_process) iq_process = None - app_module.release_sdr_device(device_index) - claimed_device = None - ws.send(json.dumps({ - 'status': 'error', + app_module.release_sdr_device(device_index, sdr_type.value) + claimed_device = None + claimed_sdr_type = None + ws.send(json.dumps({ + 'status': 'error', 'message': f'Failed to start I/Q capture: {e}', })) continue @@ -345,15 +350,16 @@ def init_waterfall_websocket(app: Flask): if reader_thread and reader_thread.is_alive(): reader_thread.join(timeout=2) reader_thread = None - if iq_process: - safe_terminate(iq_process) - unregister_process(iq_process) - iq_process = None - if claimed_device is not None: - app_module.release_sdr_device(claimed_device) - claimed_device = None - stop_event.clear() - ws.send(json.dumps({'status': 'stopped'})) + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + iq_process = None + if claimed_device is not None: + app_module.release_sdr_device(claimed_device, claimed_sdr_type or 'rtlsdr') + claimed_device = None + claimed_sdr_type = None + stop_event.clear() + ws.send(json.dumps({'status': 'stopped'})) except Exception as e: logger.info(f"WebSocket waterfall closed: {e}") @@ -362,11 +368,12 @@ def init_waterfall_websocket(app: Flask): stop_event.set() if reader_thread and reader_thread.is_alive(): reader_thread.join(timeout=2) - if iq_process: - safe_terminate(iq_process) - unregister_process(iq_process) - if claimed_device is not None: - app_module.release_sdr_device(claimed_device) + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) + if claimed_device is not None: + app_module.release_sdr_device(claimed_device, claimed_sdr_type or 'rtlsdr') + claimed_sdr_type = None # Complete WebSocket close handshake, then shut down the # raw socket so Werkzeug cannot write its HTTP 200 response # on top of the WebSocket stream (which browsers see as diff --git a/templates/ais_dashboard.html b/templates/ais_dashboard.html index 492ac68..6fafce0 100644 --- a/templates/ais_dashboard.html +++ b/templates/ais_dashboard.html @@ -449,7 +449,10 @@ devices.forEach((d, i) => { const opt = document.createElement('option'); opt.value = d.index; - opt.textContent = `SDR ${d.index}: ${d.name}`; + const sdrType = (d.sdr_type || d.type || 'rtlsdr').toLowerCase(); + const sdrLabel = sdrType.toUpperCase(); + opt.dataset.sdrType = sdrType; + opt.textContent = `SDR ${d.index} (${sdrLabel}): ${d.name}`; aisSelect.appendChild(opt); }); } @@ -457,18 +460,23 @@ // Populate DSC device selector const dscSelect = document.getElementById('dscDeviceSelect'); dscSelect.innerHTML = ''; - if (devices.length === 0) { - dscSelect.innerHTML = ''; + const dscDevices = devices.filter(d => { + const sdrType = (d.sdr_type || d.type || 'rtlsdr').toLowerCase(); + return sdrType === 'rtlsdr'; + }); + if (dscDevices.length === 0) { + dscSelect.innerHTML = ''; } else { - devices.forEach((d, i) => { + dscDevices.forEach((d, i) => { const opt = document.createElement('option'); opt.value = d.index; - opt.textContent = `SDR ${d.index}: ${d.name}`; + opt.dataset.sdrType = 'rtlsdr'; + opt.textContent = `SDR ${d.index} (RTLSDR): ${d.name}`; dscSelect.appendChild(opt); }); // Default to second device if available - if (devices.length > 1) { - dscSelect.value = devices[1].index; + if (dscDevices.length > 1) { + dscSelect.value = dscDevices[1].index; } } }) @@ -546,7 +554,9 @@ } function startTracking() { - const device = document.getElementById('aisDeviceSelect').value; + const aisSelect = document.getElementById('aisDeviceSelect'); + const device = aisSelect.value; + const sdrType = (aisSelect.selectedOptions[0]?.dataset?.sdrType || 'rtlsdr').toLowerCase(); const gain = document.getElementById('aisGain').value; // Check if using agent mode @@ -561,7 +571,7 @@ fetch(`/controller/agents/${aisCurrentAgent}/ais/start`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ device, gain, bias_t: getBiasTEnabled() }) + body: JSON.stringify({ device, gain, bias_t: getBiasTEnabled(), sdr_type: sdrType }) }) .then(r => r.json()) .then(result => { @@ -586,7 +596,7 @@ fetch('/ais/start', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ device, gain, bias_t: getBiasTEnabled() }) + body: JSON.stringify({ device, gain, bias_t: getBiasTEnabled(), sdr_type: sdrType }) }) .then(r => r.json()) .then(data => { @@ -1170,7 +1180,9 @@ } function startDscTracking() { - const device = document.getElementById('dscDeviceSelect').value; + const dscSelect = document.getElementById('dscDeviceSelect'); + const device = dscSelect.value; + const sdrType = (dscSelect.selectedOptions[0]?.dataset?.sdrType || 'rtlsdr').toLowerCase(); const gain = document.getElementById('dscGain').value; // Check if using agent mode @@ -1185,7 +1197,7 @@ fetch(endpoint, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ device, gain }) + body: JSON.stringify({ device, gain, sdr_type: sdrType }) }) .then(r => r.json()) .then(data => { @@ -1617,21 +1629,32 @@ const aisSelect = document.getElementById('aisDeviceSelect'); const dscSelect = document.getElementById('dscDeviceSelect'); - [aisSelect, dscSelect].forEach(select => { + const aisDevices = devices || []; + const dscDevices = aisDevices.filter(device => { + const sdrType = (device.sdr_type || device.type || 'rtlsdr').toLowerCase(); + return sdrType === 'rtlsdr'; + }); + + const fillSelect = (select, list, emptyLabel) => { if (!select) return; select.innerHTML = ''; - - if (devices.length === 0) { - select.innerHTML = ''; - } else { - devices.forEach(device => { - const opt = document.createElement('option'); - opt.value = device.index; - opt.textContent = `Device ${device.index}: ${device.name || device.type || 'SDR'}`; - select.appendChild(opt); - }); + if (list.length === 0) { + select.innerHTML = ``; + return; } - }); + list.forEach(device => { + const opt = document.createElement('option'); + const sdrType = (device.sdr_type || device.type || 'rtlsdr').toLowerCase(); + const sdrLabel = sdrType.toUpperCase(); + opt.value = device.index; + opt.dataset.sdrType = sdrType; + opt.textContent = `Device ${device.index} (${sdrLabel}): ${device.name || device.type || 'SDR'}`; + select.appendChild(opt); + }); + }; + + fillSelect(aisSelect, aisDevices, 'No SDR found'); + fillSelect(dscSelect, dscDevices, 'No RTL-SDR found'); } // Override startTracking for agent support @@ -1645,13 +1668,15 @@ return; } - const device = document.getElementById('aisDeviceSelect').value; + const aisSelect = document.getElementById('aisDeviceSelect'); + const device = aisSelect.value; + const sdrType = (aisSelect.selectedOptions[0]?.dataset?.sdrType || 'rtlsdr').toLowerCase(); const gain = document.getElementById('aisGain').value; fetch(`/controller/agents/${aisCurrentAgent}/ais/start`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ device, gain, bias_t: getBiasTEnabled() }) + body: JSON.stringify({ device, gain, bias_t: getBiasTEnabled(), sdr_type: sdrType }) }) .then(r => r.json()) .then(data => {