From 5aa68a49c65558fb2805a95ab7dc9c76d4134460 Mon Sep 17 00:00:00 2001 From: Smittix Date: Fri, 27 Feb 2026 09:06:41 +0000 Subject: [PATCH] fix: SDR device registry collision with multiple SDR types The registry used plain int keys (device index), so HackRF at index 0 and RTL-SDR at index 0 would collide. Changed to composite string keys ("sdr_type:index") so each SDR type+index pair is tracked independently. Updated all route callers, frontend device selectors, and session restore. Co-Authored-By: Claude Opus 4.6 --- app.py | 46 +-- routes/acars.py | 67 +++-- routes/adsb.py | 15 +- routes/ais.py | 15 +- routes/aprs.py | 28 +- routes/dsc.py | 25 +- routes/listening_post.py | 54 ++-- routes/morse.py | 28 +- routes/pager.py | 159 ++++++----- routes/sensor.py | 522 +++++++++++++++++----------------- routes/vdl2.py | 188 ++++++------ routes/waterfall_websocket.py | 21 +- routes/wefax.py | 18 +- templates/adsb_dashboard.html | 80 ++++-- templates/index.html | 31 +- tests/test_morse.py | 12 +- tests/test_wefax.py | 384 ++++++++++++------------- 17 files changed, 907 insertions(+), 786 deletions(-) diff --git a/app.py b/app.py index 1584e83..b74410b 100644 --- a/app.py +++ b/app.py @@ -257,12 +257,12 @@ cleanup_manager.register(deauth_alerts) # SDR DEVICE REGISTRY # ============================================ # Tracks which mode is using which SDR device to prevent conflicts -# Key: device_index (int), Value: mode_name (str) -sdr_device_registry: dict[int, str] = {} +# Key: "sdr_type:device_index" (str), Value: mode_name (str) +sdr_device_registry: dict[str, str] = {} sdr_device_registry_lock = threading.Lock() -def claim_sdr_device(device_index: int, mode_name: str) -> str | None: +def claim_sdr_device(device_index: int, mode_name: str, sdr_type: str = 'rtlsdr') -> str | None: """Claim an SDR device for a mode. Checks the in-app registry first, then probes the USB device to @@ -272,43 +272,48 @@ def claim_sdr_device(device_index: int, mode_name: str) -> str | None: Args: device_index: The SDR device index to claim mode_name: Name of the mode claiming the device (e.g., 'sensor', 'rtlamr') + sdr_type: SDR type string (e.g., 'rtlsdr', 'hackrf', 'limesdr') Returns: Error message if device is in use, None if successfully claimed """ + key = f"{sdr_type}:{device_index}" with sdr_device_registry_lock: - if device_index in sdr_device_registry: - in_use_by = sdr_device_registry[device_index] - return f'SDR device {device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.' + if key in sdr_device_registry: + in_use_by = sdr_device_registry[key] + return f'SDR device {sdr_type}:{device_index} is in use by {in_use_by}. Stop {in_use_by} first or use a different device.' # Probe the USB device to catch external processes holding the handle - try: - from utils.sdr.detection import probe_rtlsdr_device - usb_error = probe_rtlsdr_device(device_index) - if usb_error: - return usb_error - except Exception: - pass # If probe fails, let the caller proceed normally + if sdr_type == 'rtlsdr': + try: + from utils.sdr.detection import probe_rtlsdr_device + usb_error = probe_rtlsdr_device(device_index) + if usb_error: + return usb_error + except Exception: + pass # If probe fails, let the caller proceed normally - sdr_device_registry[device_index] = mode_name + sdr_device_registry[key] = mode_name return None -def release_sdr_device(device_index: int) -> None: +def release_sdr_device(device_index: int, sdr_type: str = 'rtlsdr') -> None: """Release an SDR device from the registry. Args: device_index: The SDR device index to release + sdr_type: SDR type string (e.g., 'rtlsdr', 'hackrf', 'limesdr') """ + key = f"{sdr_type}:{device_index}" with sdr_device_registry_lock: - sdr_device_registry.pop(device_index, None) + sdr_device_registry.pop(key, None) -def get_sdr_device_status() -> dict[int, str]: +def get_sdr_device_status() -> dict[str, str]: """Get current SDR device allocations. Returns: - Dictionary mapping device indices to mode names + Dictionary mapping 'sdr_type:device_index' keys to mode names """ with sdr_device_registry_lock: return dict(sdr_device_registry) @@ -429,8 +434,9 @@ def get_devices_status() -> Response: result = [] for device in devices: d = device.to_dict() - d['in_use'] = device.index in registry - d['used_by'] = registry.get(device.index) + key = f"{device.sdr_type.value}:{device.index}" + d['in_use'] = key in registry + d['used_by'] = registry.get(key) result.append(d) return jsonify(result) diff --git a/routes/acars.py b/routes/acars.py index 7b9da78..5ffabef 100644 --- a/routes/acars.py +++ b/routes/acars.py @@ -21,7 +21,7 @@ import app as app_module from utils.logging import sensor_logger as logger from utils.validation import validate_device_index, validate_gain, validate_ppm from utils.sdr import SDRFactory, SDRType -from utils.sse import sse_stream_fanout +from utils.sse import sse_stream_fanout from utils.event_pipeline import process_event from utils.constants import ( PROCESS_TERMINATE_TIMEOUT, @@ -45,6 +45,7 @@ acars_last_message_time = None # Track which device is being used acars_active_device: int | None = None +acars_active_sdr_type: str | None = None def find_acarsdec(): @@ -151,7 +152,7 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) - logger.error(f"ACARS stream error: {e}") app_module.acars_queue.put({'type': 'error', 'message': str(e)}) finally: - global acars_active_device + global acars_active_device, acars_active_sdr_type # Ensure process is terminated try: process.terminate() @@ -167,8 +168,9 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) - app_module.acars_process = None # Release SDR device if acars_active_device is not None: - app_module.release_sdr_device(acars_active_device) + app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr') acars_active_device = None + acars_active_sdr_type = None @acars_bp.route('/tools') @@ -200,7 +202,7 @@ def acars_status() -> Response: @acars_bp.route('/start', methods=['POST']) def start_acars() -> Response: """Start ACARS decoder.""" - global acars_message_count, acars_last_message_time, acars_active_device + global acars_message_count, acars_last_message_time, acars_active_device, acars_active_sdr_type with app_module.acars_lock: if app_module.acars_process and app_module.acars_process.poll() is None: @@ -227,9 +229,12 @@ def start_acars() -> Response: except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 + # Resolve SDR type for device selection + sdr_type_str = data.get('sdr_type', 'rtlsdr') + # Check if device is available device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'acars') + error = app_module.claim_sdr_device(device_int, 'acars', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -238,6 +243,7 @@ def start_acars() -> Response: }), 409 acars_active_device = device_int + acars_active_sdr_type = sdr_type_str # Get frequencies - use provided or defaults frequencies = data.get('frequencies', DEFAULT_ACARS_FREQUENCIES) @@ -255,8 +261,6 @@ def start_acars() -> Response: acars_message_count = 0 acars_last_message_time = None - # Resolve SDR type for device selection - sdr_type_str = data.get('sdr_type', 'rtlsdr') try: sdr_type = SDRType(sdr_type_str) except ValueError: @@ -343,8 +347,9 @@ def start_acars() -> Response: if process.poll() is not None: # Process died - release device if acars_active_device is not None: - app_module.release_sdr_device(acars_active_device) + app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr') acars_active_device = None + acars_active_sdr_type = None stderr = '' if process.stderr: stderr = process.stderr.read().decode('utf-8', errors='replace') @@ -375,8 +380,9 @@ def start_acars() -> Response: except Exception as e: # Release device on failure if acars_active_device is not None: - app_module.release_sdr_device(acars_active_device) + app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr') acars_active_device = None + acars_active_sdr_type = None logger.error(f"Failed to start ACARS decoder: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 @@ -384,7 +390,7 @@ def start_acars() -> Response: @acars_bp.route('/stop', methods=['POST']) def stop_acars() -> Response: """Stop ACARS decoder.""" - global acars_active_device + global acars_active_device, acars_active_sdr_type with app_module.acars_lock: if not app_module.acars_process: @@ -405,31 +411,32 @@ def stop_acars() -> Response: # Release device from registry if acars_active_device is not None: - app_module.release_sdr_device(acars_active_device) + app_module.release_sdr_device(acars_active_device, acars_active_sdr_type or 'rtlsdr') acars_active_device = None + acars_active_sdr_type = None return jsonify({'status': 'stopped'}) -@acars_bp.route('/stream') -def stream_acars() -> Response: - """SSE stream for ACARS messages.""" - def _on_msg(msg: dict[str, Any]) -> None: - process_event('acars', msg, msg.get('type')) - - response = Response( - sse_stream_fanout( - source_queue=app_module.acars_queue, - channel_key='acars', - timeout=SSE_QUEUE_TIMEOUT, - keepalive_interval=SSE_KEEPALIVE_INTERVAL, - on_message=_on_msg, - ), - mimetype='text/event-stream', - ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - return response +@acars_bp.route('/stream') +def stream_acars() -> Response: + """SSE stream for ACARS messages.""" + def _on_msg(msg: dict[str, Any]) -> None: + process_event('acars', msg, msg.get('type')) + + response = Response( + sse_stream_fanout( + source_queue=app_module.acars_queue, + channel_key='acars', + timeout=SSE_QUEUE_TIMEOUT, + keepalive_interval=SSE_KEEPALIVE_INTERVAL, + on_message=_on_msg, + ), + mimetype='text/event-stream', + ) + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + return response @acars_bp.route('/frequencies') diff --git a/routes/adsb.py b/routes/adsb.py index 08c3186..60cc4a9 100644 --- a/routes/adsb.py +++ b/routes/adsb.py @@ -72,6 +72,7 @@ adsb_last_message_time = None adsb_bytes_received = 0 adsb_lines_received = 0 adsb_active_device = None # Track which device index is being used +adsb_active_sdr_type: str | None = None _sbs_error_logged = False # Suppress repeated connection error logs # Track ICAOs already looked up in aircraft database (avoid repeated lookups) @@ -674,7 +675,7 @@ def adsb_session(): @adsb_bp.route('/start', methods=['POST']) def start_adsb(): """Start ADS-B tracking.""" - global adsb_using_service, adsb_active_device + global adsb_using_service, adsb_active_device, adsb_active_sdr_type with app_module.adsb_lock: if adsb_using_service: @@ -787,7 +788,7 @@ def start_adsb(): # Check if device is available before starting local dump1090 device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'adsb') + error = app_module.claim_sdr_device(device_int, 'adsb', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -825,7 +826,7 @@ def start_adsb(): if app_module.adsb_process.poll() is not None: # Process exited - release device and get error message - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type_str) stderr_output = '' if app_module.adsb_process.stderr: try: @@ -872,6 +873,7 @@ def start_adsb(): adsb_using_service = True adsb_active_device = device # Track which device is being used + adsb_active_sdr_type = sdr_type_str thread = threading.Thread(target=parse_sbs_stream, args=(f'localhost:{ADSB_SBS_PORT}',), daemon=True) thread.start() @@ -891,14 +893,14 @@ def start_adsb(): }) except Exception as e: # Release device on failure - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type_str) return jsonify({'status': 'error', 'message': str(e)}) @adsb_bp.route('/stop', methods=['POST']) def stop_adsb(): """Stop ADS-B tracking.""" - global adsb_using_service, adsb_active_device + global adsb_using_service, adsb_active_device, adsb_active_sdr_type data = request.get_json(silent=True) or {} stop_source = data.get('source') stopped_by = request.remote_addr @@ -923,10 +925,11 @@ def stop_adsb(): # Release device from registry if adsb_active_device is not None: - app_module.release_sdr_device(adsb_active_device) + app_module.release_sdr_device(adsb_active_device, adsb_active_sdr_type or 'rtlsdr') adsb_using_service = False adsb_active_device = None + adsb_active_sdr_type = None app_module.adsb_aircraft.clear() _looked_up_icaos.clear() diff --git a/routes/ais.py b/routes/ais.py index 091d1ae..021fa24 100644 --- a/routes/ais.py +++ b/routes/ais.py @@ -44,6 +44,7 @@ ais_connected = False ais_messages_received = 0 ais_last_message_time = None ais_active_device = None +ais_active_sdr_type: str | None = None _ais_error_logged = True # Common installation paths for AIS-catcher @@ -350,7 +351,7 @@ def ais_status(): @ais_bp.route('/start', methods=['POST']) def start_ais(): """Start AIS tracking.""" - global ais_running, ais_active_device + global ais_running, ais_active_device, ais_active_sdr_type with app_module.ais_lock: if ais_running: @@ -397,7 +398,7 @@ def start_ais(): # Check if device is available device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'ais') + error = app_module.claim_sdr_device(device_int, 'ais', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -436,7 +437,7 @@ def start_ais(): if app_module.ais_process.poll() is not None: # Release device on failure - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type_str) stderr_output = '' if app_module.ais_process.stderr: try: @@ -450,6 +451,7 @@ def start_ais(): ais_running = True ais_active_device = device + ais_active_sdr_type = sdr_type_str # Start TCP parser thread thread = threading.Thread(target=parse_ais_stream, args=(tcp_port,), daemon=True) @@ -463,7 +465,7 @@ def start_ais(): }) except Exception as e: # Release device on failure - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type_str) logger.error(f"Failed to start AIS-catcher: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 @@ -471,7 +473,7 @@ def start_ais(): @ais_bp.route('/stop', methods=['POST']) def stop_ais(): """Stop AIS tracking.""" - global ais_running, ais_active_device + global ais_running, ais_active_device, ais_active_sdr_type with app_module.ais_lock: if app_module.ais_process: @@ -490,10 +492,11 @@ def stop_ais(): # Release device from registry if ais_active_device is not None: - app_module.release_sdr_device(ais_active_device) + app_module.release_sdr_device(ais_active_device, ais_active_sdr_type or 'rtlsdr') ais_running = False ais_active_device = None + ais_active_sdr_type = None app_module.ais_vessels.clear() return jsonify({'status': 'stopped'}) diff --git a/routes/aprs.py b/routes/aprs.py index 3bc77a9..93ce5e9 100644 --- a/routes/aprs.py +++ b/routes/aprs.py @@ -37,6 +37,7 @@ aprs_bp = Blueprint('aprs', __name__, url_prefix='/aprs') # Track which SDR device is being used aprs_active_device: int | None = None +aprs_active_sdr_type: str | None = None # APRS frequencies by region (MHz) APRS_FREQUENCIES = { @@ -1454,7 +1455,7 @@ def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_pr - type='meter': Audio level meter readings (rate-limited) """ global aprs_packet_count, aprs_station_count, aprs_last_packet_time, aprs_stations - global _last_meter_time, _last_meter_level, aprs_active_device + global _last_meter_time, _last_meter_level, aprs_active_device, aprs_active_sdr_type # Capture the device claimed by THIS session so the finally block only # releases our own device, not one claimed by a subsequent start. @@ -1588,8 +1589,9 @@ def stream_aprs_output(master_fd: int, rtl_process: subprocess.Popen, decoder_pr pass # Release SDR device — only if it's still ours (not reclaimed by a new start) if my_device is not None and aprs_active_device == my_device: - app_module.release_sdr_device(my_device) + app_module.release_sdr_device(my_device, aprs_active_sdr_type or 'rtlsdr') aprs_active_device = None + aprs_active_sdr_type = None @aprs_bp.route('/tools') @@ -1658,7 +1660,7 @@ def aprs_data() -> Response: def start_aprs() -> Response: """Start APRS decoder.""" global aprs_packet_count, aprs_station_count, aprs_last_packet_time, aprs_stations - global aprs_active_device + global aprs_active_device, aprs_active_sdr_type with app_module.aprs_lock: if app_module.aprs_process and app_module.aprs_process.poll() is None: @@ -1707,7 +1709,7 @@ def start_aprs() -> Response: }), 400 # Reserve SDR device to prevent conflicts with other modes - error = app_module.claim_sdr_device(device, 'aprs') + error = app_module.claim_sdr_device(device, 'aprs', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -1715,6 +1717,7 @@ def start_aprs() -> Response: 'message': error }), 409 aprs_active_device = device + aprs_active_sdr_type = sdr_type_str # Get frequency for region region = data.get('region', 'north_america') @@ -1756,8 +1759,9 @@ def start_aprs() -> Response: rtl_cmd = rtl_cmd[:-1] + ['-E', 'dc', '-A', 'fast', '-'] except Exception as e: if aprs_active_device is not None: - app_module.release_sdr_device(aprs_active_device) + app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr') aprs_active_device = None + aprs_active_sdr_type = None return jsonify({'status': 'error', 'message': f'Failed to build SDR command: {e}'}), 500 # Build decoder command @@ -1859,8 +1863,9 @@ def start_aprs() -> Response: except Exception: pass if aprs_active_device is not None: - app_module.release_sdr_device(aprs_active_device) + app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr') aprs_active_device = None + aprs_active_sdr_type = None return jsonify({'status': 'error', 'message': error_msg}), 500 if decoder_process.poll() is not None: @@ -1886,8 +1891,9 @@ def start_aprs() -> Response: except Exception: pass if aprs_active_device is not None: - app_module.release_sdr_device(aprs_active_device) + app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr') aprs_active_device = None + aprs_active_sdr_type = None return jsonify({'status': 'error', 'message': error_msg}), 500 # Store references for status checks and cleanup @@ -1915,15 +1921,16 @@ def start_aprs() -> Response: except Exception as e: logger.error(f"Failed to start APRS decoder: {e}") if aprs_active_device is not None: - app_module.release_sdr_device(aprs_active_device) + app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr') aprs_active_device = None + aprs_active_sdr_type = None return jsonify({'status': 'error', 'message': str(e)}), 500 @aprs_bp.route('/stop', methods=['POST']) def stop_aprs() -> Response: """Stop APRS decoder.""" - global aprs_active_device + global aprs_active_device, aprs_active_sdr_type with app_module.aprs_lock: processes_to_stop = [] @@ -1963,8 +1970,9 @@ def stop_aprs() -> Response: # Release SDR device if aprs_active_device is not None: - app_module.release_sdr_device(aprs_active_device) + app_module.release_sdr_device(aprs_active_device, aprs_active_sdr_type or 'rtlsdr') aprs_active_device = None + aprs_active_sdr_type = None return jsonify({'status': 'stopped'}) diff --git a/routes/dsc.py b/routes/dsc.py index cbdbe33..5ef13d1 100644 --- a/routes/dsc.py +++ b/routes/dsc.py @@ -51,6 +51,7 @@ dsc_running = False # Track which device is being used dsc_active_device: int | None = None +dsc_active_sdr_type: str | None = None def _get_dsc_decoder_path() -> str | None: @@ -171,7 +172,7 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non 'error': str(e) }) finally: - global dsc_active_device + global dsc_active_device, dsc_active_sdr_type try: os.close(master_fd) except OSError: @@ -197,8 +198,9 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non app_module.dsc_rtl_process = None # Release SDR device if dsc_active_device is not None: - app_module.release_sdr_device(dsc_active_device) + app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr') dsc_active_device = None + dsc_active_sdr_type = None def _store_critical_alert(msg: dict) -> None: @@ -331,10 +333,13 @@ def start_decoding() -> Response: 'message': str(e) }), 400 + # Get SDR type from request + sdr_type_str = data.get('sdr_type', 'rtlsdr') + # Check if device is available using centralized registry - global dsc_active_device + global dsc_active_device, dsc_active_sdr_type device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'dsc') + error = app_module.claim_sdr_device(device_int, 'dsc', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -343,6 +348,7 @@ def start_decoding() -> Response: }), 409 dsc_active_device = device_int + dsc_active_sdr_type = sdr_type_str # Clear queue while not app_module.dsc_queue.empty(): @@ -440,8 +446,9 @@ def start_decoding() -> Response: pass # Release device on failure if dsc_active_device is not None: - app_module.release_sdr_device(dsc_active_device) + app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr') dsc_active_device = None + dsc_active_sdr_type = None return jsonify({ 'status': 'error', 'message': f'Tool not found: {e.filename}' @@ -458,8 +465,9 @@ def start_decoding() -> Response: pass # Release device on failure if dsc_active_device is not None: - app_module.release_sdr_device(dsc_active_device) + app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr') dsc_active_device = None + dsc_active_sdr_type = None logger.error(f"Failed to start DSC decoder: {e}") return jsonify({ 'status': 'error', @@ -470,7 +478,7 @@ def start_decoding() -> Response: @dsc_bp.route('/stop', methods=['POST']) def stop_decoding() -> Response: """Stop DSC decoder.""" - global dsc_running, dsc_active_device + global dsc_running, dsc_active_device, dsc_active_sdr_type with app_module.dsc_lock: if not app_module.dsc_process: @@ -509,8 +517,9 @@ def stop_decoding() -> Response: # Release device from registry if dsc_active_device is not None: - app_module.release_sdr_device(dsc_active_device) + app_module.release_sdr_device(dsc_active_device, dsc_active_sdr_type or 'rtlsdr') dsc_active_device = None + dsc_active_sdr_type = None return jsonify({'status': 'stopped'}) diff --git a/routes/listening_post.py b/routes/listening_post.py index 1ae8428..b59bb40 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -55,7 +55,9 @@ scanner_lock = threading.Lock() scanner_paused = False scanner_current_freq = 0.0 scanner_active_device: Optional[int] = None +scanner_active_sdr_type: str = 'rtlsdr' receiver_active_device: Optional[int] = None +receiver_active_sdr_type: str = 'rtlsdr' scanner_power_process: Optional[subprocess.Popen] = None scanner_config = { 'start_freq': 88.0, @@ -996,7 +998,7 @@ def check_tools() -> Response: @receiver_bp.route('/scanner/start', methods=['POST']) def start_scanner() -> Response: """Start the frequency scanner.""" - global scanner_thread, scanner_running, scanner_config, scanner_active_device, receiver_active_device + global scanner_thread, scanner_running, scanner_config, scanner_active_device, scanner_active_sdr_type, receiver_active_device, receiver_active_sdr_type with scanner_lock: if scanner_running: @@ -1063,10 +1065,11 @@ def start_scanner() -> Response: }), 503 # Release listening device if active if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) + app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type) receiver_active_device = None + receiver_active_sdr_type = 'rtlsdr' # Claim device for scanner - error = app_module.claim_sdr_device(scanner_config['device'], 'scanner') + error = app_module.claim_sdr_device(scanner_config['device'], 'scanner', scanner_config['sdr_type']) if error: return jsonify({ 'status': 'error', @@ -1074,6 +1077,7 @@ def start_scanner() -> Response: 'message': error }), 409 scanner_active_device = scanner_config['device'] + scanner_active_sdr_type = scanner_config['sdr_type'] scanner_running = True scanner_thread = threading.Thread(target=scanner_loop_power, daemon=True) scanner_thread.start() @@ -1091,9 +1095,10 @@ def start_scanner() -> Response: 'message': f'rx_fm not found. Install SoapySDR utilities for {sdr_type}.' }), 503 if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) + app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type) receiver_active_device = None - error = app_module.claim_sdr_device(scanner_config['device'], 'scanner') + receiver_active_sdr_type = 'rtlsdr' + error = app_module.claim_sdr_device(scanner_config['device'], 'scanner', scanner_config['sdr_type']) if error: return jsonify({ 'status': 'error', @@ -1101,6 +1106,7 @@ def start_scanner() -> Response: 'message': error }), 409 scanner_active_device = scanner_config['device'] + scanner_active_sdr_type = scanner_config['sdr_type'] scanner_running = True scanner_thread = threading.Thread(target=scanner_loop, daemon=True) @@ -1115,7 +1121,7 @@ def start_scanner() -> Response: @receiver_bp.route('/scanner/stop', methods=['POST']) def stop_scanner() -> Response: """Stop the frequency scanner.""" - global scanner_running, scanner_active_device, scanner_power_process + global scanner_running, scanner_active_device, scanner_active_sdr_type, scanner_power_process scanner_running = False _stop_audio_stream() @@ -1130,8 +1136,9 @@ def stop_scanner() -> Response: pass scanner_power_process = None if scanner_active_device is not None: - app_module.release_sdr_device(scanner_active_device) + app_module.release_sdr_device(scanner_active_device, scanner_active_sdr_type) scanner_active_device = None + scanner_active_sdr_type = 'rtlsdr' return jsonify({'status': 'stopped'}) @@ -1296,7 +1303,7 @@ def get_presets() -> Response: @receiver_bp.route('/audio/start', methods=['POST']) def start_audio() -> Response: """Start audio at specific frequency (manual mode).""" - global scanner_running, scanner_active_device, receiver_active_device, scanner_power_process, scanner_thread + global scanner_running, scanner_active_device, scanner_active_sdr_type, receiver_active_device, receiver_active_sdr_type, scanner_power_process, scanner_thread global audio_running, audio_frequency, audio_modulation, audio_source, audio_start_token data = request.json or {} @@ -1356,8 +1363,9 @@ def start_audio() -> Response: if scanner_running: scanner_running = False if scanner_active_device is not None: - app_module.release_sdr_device(scanner_active_device) + app_module.release_sdr_device(scanner_active_device, scanner_active_sdr_type) scanner_active_device = None + scanner_active_sdr_type = 'rtlsdr' scanner_thread_ref = scanner_thread scanner_proc_ref = scanner_power_process scanner_power_process = None @@ -1419,8 +1427,9 @@ def start_audio() -> Response: audio_source = 'waterfall' # Shared monitor uses the waterfall's existing SDR claim. if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) + app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type) receiver_active_device = None + receiver_active_sdr_type = 'rtlsdr' return jsonify({ 'status': 'started', 'frequency': frequency, @@ -1443,13 +1452,14 @@ def start_audio() -> Response: # to give the USB device time to be fully released. if receiver_active_device is None or receiver_active_device != device: if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) + app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type) receiver_active_device = None + receiver_active_sdr_type = 'rtlsdr' error = None max_claim_attempts = 6 for attempt in range(max_claim_attempts): - error = app_module.claim_sdr_device(device, 'receiver') + error = app_module.claim_sdr_device(device, 'receiver', sdr_type) if not error: break if attempt < max_claim_attempts - 1: @@ -1466,6 +1476,7 @@ def start_audio() -> Response: 'message': error }), 409 receiver_active_device = device + receiver_active_sdr_type = sdr_type _start_audio_stream( frequency, @@ -1489,8 +1500,9 @@ def start_audio() -> Response: # Avoid leaving a stale device claim after startup failure. if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) + app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type) receiver_active_device = None + receiver_active_sdr_type = 'rtlsdr' start_error = '' for log_path in ('/tmp/rtl_fm_stderr.log', '/tmp/ffmpeg_stderr.log'): @@ -1515,11 +1527,12 @@ def start_audio() -> Response: @receiver_bp.route('/audio/stop', methods=['POST']) def stop_audio() -> Response: """Stop audio.""" - global receiver_active_device + global receiver_active_device, receiver_active_sdr_type _stop_audio_stream() if receiver_active_device is not None: - app_module.release_sdr_device(receiver_active_device) + app_module.release_sdr_device(receiver_active_device, receiver_active_sdr_type) receiver_active_device = None + receiver_active_sdr_type = 'rtlsdr' return jsonify({'status': 'stopped'}) @@ -1825,6 +1838,7 @@ waterfall_running = False waterfall_lock = threading.Lock() waterfall_queue: queue.Queue = queue.Queue(maxsize=200) waterfall_active_device: Optional[int] = None +waterfall_active_sdr_type: str = 'rtlsdr' waterfall_config = { 'start_freq': 88.0, 'end_freq': 108.0, @@ -2033,7 +2047,7 @@ def _waterfall_loop(): def _stop_waterfall_internal() -> None: """Stop the waterfall display and release resources.""" - global waterfall_running, waterfall_process, waterfall_active_device + global waterfall_running, waterfall_process, waterfall_active_device, waterfall_active_sdr_type waterfall_running = False if waterfall_process and waterfall_process.poll() is None: @@ -2048,14 +2062,15 @@ def _stop_waterfall_internal() -> None: waterfall_process = None if waterfall_active_device is not None: - app_module.release_sdr_device(waterfall_active_device) + app_module.release_sdr_device(waterfall_active_device, waterfall_active_sdr_type) waterfall_active_device = None + waterfall_active_sdr_type = 'rtlsdr' @receiver_bp.route('/waterfall/start', methods=['POST']) def start_waterfall() -> Response: """Start the waterfall/spectrogram display.""" - global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device + global waterfall_thread, waterfall_running, waterfall_config, waterfall_active_device, waterfall_active_sdr_type with waterfall_lock: if waterfall_running: @@ -2101,11 +2116,12 @@ def start_waterfall() -> Response: pass # Claim SDR device - error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall') + error = app_module.claim_sdr_device(waterfall_config['device'], 'waterfall', 'rtlsdr') if error: return jsonify({'status': 'error', 'error_type': 'DEVICE_BUSY', 'message': error}), 409 waterfall_active_device = waterfall_config['device'] + waterfall_active_sdr_type = 'rtlsdr' waterfall_running = True waterfall_thread = threading.Thread(target=_waterfall_loop, daemon=True) waterfall_thread.start() diff --git a/routes/morse.py b/routes/morse.py index 5491979..42c1818 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -51,6 +51,7 @@ class _FilteredQueue: # Track which device is being used morse_active_device: int | None = None +morse_active_sdr_type: str | None = None # Runtime lifecycle state. MORSE_IDLE = 'idle' @@ -231,7 +232,7 @@ def _snapshot_live_resources() -> list[str]: @morse_bp.route('/morse/start', methods=['POST']) def start_morse() -> Response: - global morse_active_device, morse_decoder_worker, morse_stderr_worker + global morse_active_device, morse_active_sdr_type, morse_decoder_worker, morse_stderr_worker global morse_stop_event, morse_control_queue, morse_runtime_config global morse_last_error, morse_session_id @@ -261,6 +262,8 @@ def start_morse() -> Response: except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 + sdr_type_str = data.get('sdr_type', 'rtlsdr') + with app_module.morse_lock: if morse_state in {MORSE_STARTING, MORSE_RUNNING, MORSE_STOPPING}: return jsonify({ @@ -270,7 +273,7 @@ def start_morse() -> Response: }), 409 device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'morse') + error = app_module.claim_sdr_device(device_int, 'morse', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -279,6 +282,7 @@ def start_morse() -> Response: }), 409 morse_active_device = device_int + morse_active_sdr_type = sdr_type_str morse_last_error = '' morse_session_id += 1 @@ -288,7 +292,6 @@ def start_morse() -> Response: sample_rate = 22050 bias_t = _bool_value(data.get('bias_t', False), False) - sdr_type_str = data.get('sdr_type', 'rtlsdr') try: sdr_type = SDRType(sdr_type_str) except ValueError: @@ -408,7 +411,7 @@ def start_morse() -> Response: for device_pos, candidate_device_index in enumerate(candidate_device_indices, start=1): if candidate_device_index != active_device_index: prev_device = active_device_index - claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse') + claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse', sdr_type_str) if claim_error: msg = f'{_device_label(candidate_device_index)} unavailable: {claim_error}' attempt_errors.append(msg) @@ -417,7 +420,7 @@ def start_morse() -> Response: continue if prev_device is not None: - app_module.release_sdr_device(prev_device) + app_module.release_sdr_device(prev_device, morse_active_sdr_type or 'rtlsdr') active_device_index = candidate_device_index with app_module.morse_lock: morse_active_device = active_device_index @@ -634,8 +637,9 @@ def start_morse() -> Response: logger.error('Morse startup failed: %s', msg) with app_module.morse_lock: if morse_active_device is not None: - app_module.release_sdr_device(morse_active_device) + app_module.release_sdr_device(morse_active_device, morse_active_sdr_type or 'rtlsdr') morse_active_device = None + morse_active_sdr_type = None morse_last_error = msg _set_state(MORSE_ERROR, msg) _set_state(MORSE_IDLE, 'Idle') @@ -675,8 +679,9 @@ def start_morse() -> Response: ) with app_module.morse_lock: if morse_active_device is not None: - app_module.release_sdr_device(morse_active_device) + app_module.release_sdr_device(morse_active_device, morse_active_sdr_type or 'rtlsdr') morse_active_device = None + morse_active_sdr_type = None morse_last_error = f'Tool not found: {e.filename}' _set_state(MORSE_ERROR, morse_last_error) _set_state(MORSE_IDLE, 'Idle') @@ -692,8 +697,9 @@ def start_morse() -> Response: ) with app_module.morse_lock: if morse_active_device is not None: - app_module.release_sdr_device(morse_active_device) + app_module.release_sdr_device(morse_active_device, morse_active_sdr_type or 'rtlsdr') morse_active_device = None + morse_active_sdr_type = None morse_last_error = str(e) _set_state(MORSE_ERROR, morse_last_error) _set_state(MORSE_IDLE, 'Idle') @@ -702,7 +708,7 @@ def start_morse() -> Response: @morse_bp.route('/morse/stop', methods=['POST']) def stop_morse() -> Response: - global morse_active_device, morse_decoder_worker, morse_stderr_worker + global morse_active_device, morse_active_sdr_type, morse_decoder_worker, morse_stderr_worker global morse_stop_event, morse_control_queue stop_started = time.perf_counter() @@ -717,6 +723,7 @@ def stop_morse() -> Response: stderr_thread = morse_stderr_worker or getattr(rtl_proc, '_stderr_thread', None) control_queue = morse_control_queue or getattr(rtl_proc, '_control_queue', None) active_device = morse_active_device + active_sdr_type = morse_active_sdr_type if ( not rtl_proc @@ -768,7 +775,7 @@ def stop_morse() -> Response: _mark(f'stderr thread joined={stderr_joined}') if active_device is not None: - app_module.release_sdr_device(active_device) + app_module.release_sdr_device(active_device, active_sdr_type or 'rtlsdr') _mark(f'SDR device {active_device} released') stop_ms = round((time.perf_counter() - stop_started) * 1000.0, 1) @@ -782,6 +789,7 @@ def stop_morse() -> Response: with app_module.morse_lock: morse_active_device = None + morse_active_sdr_type = None _set_state(MORSE_IDLE, 'Stopped', extra={ 'stop_ms': stop_ms, 'cleanup_steps': cleanup_steps, diff --git a/routes/pager.py b/routes/pager.py index 6dfcc3b..7777ced 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -24,7 +24,7 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm, validate_rtl_tcp_host, validate_rtl_tcp_port ) -from utils.sse import sse_stream_fanout +from utils.sse import sse_stream_fanout from utils.event_pipeline import process_event from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType, SDRValidationError @@ -34,6 +34,7 @@ pager_bp = Blueprint('pager', __name__) # Track which device is being used pager_active_device: int | None = None +pager_active_sdr_type: str | None = None def parse_multimon_output(line: str) -> dict[str, str] | None: @@ -96,7 +97,7 @@ def parse_multimon_output(line: str) -> dict[str, str] | None: return None -def log_message(msg: dict[str, Any]) -> None: +def log_message(msg: dict[str, Any]) -> None: """Log a message to file if logging is enabled.""" if not app_module.logging_enabled: return @@ -104,39 +105,39 @@ def log_message(msg: dict[str, Any]) -> None: with open(app_module.log_file_path, 'a') as f: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') f.write(f"{timestamp} | {msg.get('protocol', 'UNKNOWN')} | {msg.get('address', '')} | {msg.get('message', '')}\n") - except Exception as e: - logger.error(f"Failed to log message: {e}") - - -def _encode_scope_waveform(samples: tuple[int, ...], window_size: int = 256) -> list[int]: - """Compress recent PCM samples into a signed 8-bit waveform for SSE.""" - if not samples: - return [] - - window = samples[-window_size:] if len(samples) > window_size else samples - waveform: list[int] = [] - for sample in window: - # Convert int16 PCM to int8 range for lightweight transport. - packed = int(round(sample / 256)) - waveform.append(max(-127, min(127, packed))) - return waveform - - -def audio_relay_thread( - rtl_stdout, - multimon_stdin, - output_queue: queue.Queue, - stop_event: threading.Event, -) -> None: - """Relay audio from rtl_fm to multimon-ng while computing signal levels. - - Reads raw 16-bit LE PCM from *rtl_stdout*, writes every chunk straight - through to *multimon_stdin*, and every ~100 ms pushes an RMS / peak scope - event plus a compact waveform sample onto *output_queue*. - """ - CHUNK = 4096 # bytes – 2048 samples at 16-bit mono - INTERVAL = 0.1 # seconds between scope updates - last_scope = time.monotonic() + except Exception as e: + logger.error(f"Failed to log message: {e}") + + +def _encode_scope_waveform(samples: tuple[int, ...], window_size: int = 256) -> list[int]: + """Compress recent PCM samples into a signed 8-bit waveform for SSE.""" + if not samples: + return [] + + window = samples[-window_size:] if len(samples) > window_size else samples + waveform: list[int] = [] + for sample in window: + # Convert int16 PCM to int8 range for lightweight transport. + packed = int(round(sample / 256)) + waveform.append(max(-127, min(127, packed))) + return waveform + + +def audio_relay_thread( + rtl_stdout, + multimon_stdin, + output_queue: queue.Queue, + stop_event: threading.Event, +) -> None: + """Relay audio from rtl_fm to multimon-ng while computing signal levels. + + Reads raw 16-bit LE PCM from *rtl_stdout*, writes every chunk straight + through to *multimon_stdin*, and every ~100 ms pushes an RMS / peak scope + event plus a compact waveform sample onto *output_queue*. + """ + CHUNK = 4096 # bytes – 2048 samples at 16-bit mono + INTERVAL = 0.1 # seconds between scope updates + last_scope = time.monotonic() try: while not stop_event.is_set(): @@ -160,16 +161,16 @@ def audio_relay_thread( if n_samples == 0: continue samples = struct.unpack(f'<{n_samples}h', data[:n_samples * 2]) - peak = max(abs(s) for s in samples) - rms = int(math.sqrt(sum(s * s for s in samples) / n_samples)) - output_queue.put_nowait({ - 'type': 'scope', - 'rms': rms, - 'peak': peak, - 'waveform': _encode_scope_waveform(samples), - }) - except (struct.error, ValueError, queue.Full): - pass + peak = max(abs(s) for s in samples) + rms = int(math.sqrt(sum(s * s for s in samples) / n_samples)) + output_queue.put_nowait({ + 'type': 'scope', + 'rms': rms, + 'peak': peak, + 'waveform': _encode_scope_waveform(samples), + }) + except (struct.error, ValueError, queue.Full): + pass except Exception as e: logger.debug(f"Audio relay error: {e}") finally: @@ -220,7 +221,7 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.output_queue.put({'type': 'error', 'text': str(e)}) finally: - global pager_active_device + global pager_active_device, pager_active_sdr_type try: os.close(master_fd) except OSError: @@ -249,13 +250,14 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: app_module.current_process = None # Release SDR device if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') pager_active_device = None + pager_active_sdr_type = None @pager_bp.route('/start', methods=['POST']) def start_decoding() -> Response: - global pager_active_device + global pager_active_device, pager_active_sdr_type with app_module.process_lock: if app_module.current_process: @@ -284,10 +286,13 @@ def start_decoding() -> Response: rtl_tcp_host = data.get('rtl_tcp_host') rtl_tcp_port = data.get('rtl_tcp_port', 1234) + # Get SDR type early so we can pass it to claim/release + sdr_type_str = data.get('sdr_type', 'rtlsdr') + # Claim local device if not using remote rtl_tcp if not rtl_tcp_host: device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'pager') + error = app_module.claim_sdr_device(device_int, 'pager', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -295,14 +300,16 @@ def start_decoding() -> Response: 'message': error }), 409 pager_active_device = device_int + pager_active_sdr_type = sdr_type_str # Validate protocols valid_protocols = ['POCSAG512', 'POCSAG1200', 'POCSAG2400', 'FLEX'] protocols = data.get('protocols', valid_protocols) if not isinstance(protocols, list): if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') pager_active_device = None + pager_active_sdr_type = None return jsonify({'status': 'error', 'message': 'Protocols must be a list'}), 400 protocols = [p for p in protocols if p in valid_protocols] if not protocols: @@ -327,8 +334,7 @@ def start_decoding() -> Response: elif proto == 'FLEX': decoders.extend(['-a', 'FLEX']) - # Get SDR type and build command via abstraction layer - sdr_type_str = data.get('sdr_type', 'rtlsdr') + # Build command via SDR abstraction layer try: sdr_type = SDRType(sdr_type_str) except ValueError: @@ -443,8 +449,9 @@ def start_decoding() -> Response: pass # Release device on failure if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') pager_active_device = None + pager_active_sdr_type = None return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) except Exception as e: # Kill orphaned rtl_fm process if it was started @@ -458,14 +465,15 @@ def start_decoding() -> Response: pass # Release device on failure if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') pager_active_device = None + pager_active_sdr_type = None return jsonify({'status': 'error', 'message': str(e)}) @pager_bp.route('/stop', methods=['POST']) def stop_decoding() -> Response: - global pager_active_device + global pager_active_device, pager_active_sdr_type with app_module.process_lock: if app_module.current_process: @@ -502,8 +510,9 @@ def stop_decoding() -> Response: # Release device from registry if pager_active_device is not None: - app_module.release_sdr_device(pager_active_device) + app_module.release_sdr_device(pager_active_device, pager_active_sdr_type or 'rtlsdr') pager_active_device = None + pager_active_sdr_type = None return jsonify({'status': 'stopped'}) @@ -553,22 +562,22 @@ def toggle_logging() -> Response: return jsonify({'logging': app_module.logging_enabled, 'log_file': app_module.log_file_path}) -@pager_bp.route('/stream') -def stream() -> Response: - def _on_msg(msg: dict[str, Any]) -> None: - process_event('pager', msg, msg.get('type')) - - response = Response( - sse_stream_fanout( - source_queue=app_module.output_queue, - channel_key='pager', - timeout=1.0, - keepalive_interval=30.0, - on_message=_on_msg, - ), - mimetype='text/event-stream', - ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - response.headers['Connection'] = 'keep-alive' - return response +@pager_bp.route('/stream') +def stream() -> Response: + def _on_msg(msg: dict[str, Any]) -> None: + process_event('pager', msg, msg.get('type')) + + response = Response( + sse_stream_fanout( + source_queue=app_module.output_queue, + channel_key='pager', + timeout=1.0, + keepalive_interval=30.0, + on_message=_on_msg, + ), + mimetype='text/event-stream', + ) + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + response.headers['Connection'] = 'keep-alive' + return response diff --git a/routes/sensor.py b/routes/sensor.py index ab34c8e..29026fa 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -1,5 +1,5 @@ -"""RTL_433 sensor monitoring routes.""" - +"""RTL_433 sensor monitoring routes.""" + from __future__ import annotations import json @@ -10,25 +10,26 @@ import threading import time from datetime import datetime from typing import Any, Generator - -from flask import Blueprint, jsonify, request, Response - -import app as app_module -from utils.logging import sensor_logger as logger -from utils.validation import ( - validate_frequency, validate_device_index, validate_gain, validate_ppm, - validate_rtl_tcp_host, validate_rtl_tcp_port -) + +from flask import Blueprint, jsonify, request, Response + +import app as app_module +from utils.logging import sensor_logger as logger +from utils.validation import ( + validate_frequency, validate_device_index, validate_gain, validate_ppm, + validate_rtl_tcp_host, validate_rtl_tcp_port +) from utils.sse import sse_stream_fanout -from utils.event_pipeline import process_event -from utils.process import safe_terminate, register_process, unregister_process -from utils.sdr import SDRFactory, SDRType - -sensor_bp = Blueprint('sensor', __name__) - -# Track which device is being used -sensor_active_device: int | None = None - +from utils.event_pipeline import process_event +from utils.process import safe_terminate, register_process, unregister_process +from utils.sdr import SDRFactory, SDRType + +sensor_bp = Blueprint('sensor', __name__) + +# Track which device is being used +sensor_active_device: int | None = None +sensor_active_sdr_type: str | None = None + # RSSI history per device (model_id -> list of (timestamp, rssi)) sensor_rssi_history: dict[str, list[tuple[float, float]]] = {} _MAX_RSSI_HISTORY = 60 @@ -65,36 +66,36 @@ def _build_scope_waveform(rssi: float, snr: float, noise: float, points: int = 2 def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: - """Stream rtl_433 JSON output to queue.""" - try: - app_module.sensor_queue.put({'type': 'status', 'text': 'started'}) - - for line in iter(process.stdout.readline, b''): - line = line.decode('utf-8', errors='replace').strip() - if not line: - continue - - try: - # rtl_433 outputs JSON objects, one per line - data = json.loads(line) - data['type'] = 'sensor' - app_module.sensor_queue.put(data) - - # Track RSSI history per device - _model = data.get('model', '') - _dev_id = data.get('id', '') - _rssi_val = data.get('rssi') - if _rssi_val is not None and _model: - _hist_key = f"{_model}_{_dev_id}" - hist = sensor_rssi_history.setdefault(_hist_key, []) - hist.append((time.time(), float(_rssi_val))) - if len(hist) > _MAX_RSSI_HISTORY: - del hist[: len(hist) - _MAX_RSSI_HISTORY] - - # Push scope event when signal level data is present - rssi = data.get('rssi') - snr = data.get('snr') - noise = data.get('noise') + """Stream rtl_433 JSON output to queue.""" + try: + app_module.sensor_queue.put({'type': 'status', 'text': 'started'}) + + for line in iter(process.stdout.readline, b''): + line = line.decode('utf-8', errors='replace').strip() + if not line: + continue + + try: + # rtl_433 outputs JSON objects, one per line + data = json.loads(line) + data['type'] = 'sensor' + app_module.sensor_queue.put(data) + + # Track RSSI history per device + _model = data.get('model', '') + _dev_id = data.get('id', '') + _rssi_val = data.get('rssi') + if _rssi_val is not None and _model: + _hist_key = f"{_model}_{_dev_id}" + hist = sensor_rssi_history.setdefault(_hist_key, []) + hist.append((time.time(), float(_rssi_val))) + if len(hist) > _MAX_RSSI_HISTORY: + del hist[: len(hist) - _MAX_RSSI_HISTORY] + + # Push scope event when signal level data is present + rssi = data.get('rssi') + snr = data.get('snr') + noise = data.get('noise') if rssi is not None or snr is not None: try: rssi_value = float(rssi) if rssi is not None else 0.0 @@ -113,204 +114,211 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: }) except (TypeError, ValueError, queue.Full): pass - - # Log if enabled - if app_module.logging_enabled: - try: - with open(app_module.log_file_path, 'a') as f: - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - f.write(f"{timestamp} | {data.get('model', 'Unknown')} | {json.dumps(data)}\n") - except Exception: - pass - except json.JSONDecodeError: - # Not JSON, send as raw - app_module.sensor_queue.put({'type': 'raw', 'text': line}) - - except Exception as e: - app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) - finally: - global sensor_active_device - # Ensure process is terminated - try: - process.terminate() - process.wait(timeout=2) - except Exception: - try: - process.kill() - except Exception: - pass - unregister_process(process) - app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'}) - with app_module.sensor_lock: - app_module.sensor_process = None - # Release SDR device - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - - -@sensor_bp.route('/sensor/status') -def sensor_status() -> Response: - """Check if sensor decoder is currently running.""" - with app_module.sensor_lock: - running = app_module.sensor_process is not None and app_module.sensor_process.poll() is None - return jsonify({'running': running}) - - -@sensor_bp.route('/start_sensor', methods=['POST']) -def start_sensor() -> Response: - global sensor_active_device - - with app_module.sensor_lock: - if app_module.sensor_process: - return jsonify({'status': 'error', 'message': 'Sensor already running'}), 409 - - data = request.json or {} - - # Validate inputs - try: - freq = validate_frequency(data.get('frequency', '433.92')) - gain = validate_gain(data.get('gain', '0')) - ppm = validate_ppm(data.get('ppm', '0')) - device = validate_device_index(data.get('device', '0')) - except ValueError as e: - return jsonify({'status': 'error', 'message': str(e)}), 400 - - # Check for rtl_tcp (remote SDR) connection - rtl_tcp_host = data.get('rtl_tcp_host') - rtl_tcp_port = data.get('rtl_tcp_port', 1234) - - # Claim local device if not using remote rtl_tcp - if not rtl_tcp_host: - device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'sensor') - if error: - return jsonify({ - 'status': 'error', - 'error_type': 'DEVICE_BUSY', - 'message': error - }), 409 - sensor_active_device = device_int - - # Clear queue - while not app_module.sensor_queue.empty(): - try: - app_module.sensor_queue.get_nowait() - except queue.Empty: - break - - # Get SDR type and build command via abstraction layer - sdr_type_str = data.get('sdr_type', 'rtlsdr') - try: - sdr_type = SDRType(sdr_type_str) - except ValueError: - sdr_type = SDRType.RTL_SDR - - if rtl_tcp_host: - # Validate and create network device - try: - rtl_tcp_host = validate_rtl_tcp_host(rtl_tcp_host) - rtl_tcp_port = validate_rtl_tcp_port(rtl_tcp_port) - except ValueError as e: - return jsonify({'status': 'error', 'message': str(e)}), 400 - - sdr_device = SDRFactory.create_network_device(rtl_tcp_host, rtl_tcp_port) - logger.info(f"Using remote SDR: rtl_tcp://{rtl_tcp_host}:{rtl_tcp_port}") - else: - # Create local device object - sdr_device = SDRFactory.create_default_device(sdr_type, index=device) - - builder = SDRFactory.get_builder(sdr_device.sdr_type) - - # Build ISM band decoder command - bias_t = data.get('bias_t', False) - cmd = builder.build_ism_command( - device=sdr_device, - frequency_mhz=freq, - gain=float(gain) if gain and gain != 0 else None, - ppm=int(ppm) if ppm and ppm != 0 else None, - bias_t=bias_t - ) - - full_cmd = ' '.join(cmd) - logger.info(f"Running: {full_cmd}") - - # Add signal level metadata so the frontend scope can display RSSI/SNR - # Disable stats reporting to suppress "row count limit 50 reached" warnings - cmd.extend(['-M', 'level', '-M', 'stats:0']) - - try: - app_module.sensor_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - register_process(app_module.sensor_process) - - # Start output thread - thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,)) - thread.daemon = True - thread.start() - - # Monitor stderr - # Filter noisy rtl_433 diagnostics that aren't useful to display - _stderr_noise = ( - 'bitbuffer_add_bit', - 'row count limit', - ) - - def monitor_stderr(): - for line in app_module.sensor_process.stderr: - err = line.decode('utf-8', errors='replace').strip() - if err and not any(noise in err for noise in _stderr_noise): - logger.debug(f"[rtl_433] {err}") - app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) - - stderr_thread = threading.Thread(target=monitor_stderr) - stderr_thread.daemon = True - stderr_thread.start() - - app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) - - return jsonify({'status': 'started', 'command': full_cmd}) - - except FileNotFoundError: - # Release device on failure - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) - except Exception as e: - # Release device on failure - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - return jsonify({'status': 'error', 'message': str(e)}) - - -@sensor_bp.route('/stop_sensor', methods=['POST']) -def stop_sensor() -> Response: - global sensor_active_device - - with app_module.sensor_lock: - if app_module.sensor_process: - app_module.sensor_process.terminate() - try: - app_module.sensor_process.wait(timeout=2) - except subprocess.TimeoutExpired: - app_module.sensor_process.kill() - app_module.sensor_process = None - - # Release device from registry - if sensor_active_device is not None: - app_module.release_sdr_device(sensor_active_device) - sensor_active_device = None - - return jsonify({'status': 'stopped'}) - - return jsonify({'status': 'not_running'}) - - + + # Log if enabled + if app_module.logging_enabled: + try: + with open(app_module.log_file_path, 'a') as f: + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + f.write(f"{timestamp} | {data.get('model', 'Unknown')} | {json.dumps(data)}\n") + except Exception: + pass + except json.JSONDecodeError: + # Not JSON, send as raw + app_module.sensor_queue.put({'type': 'raw', 'text': line}) + + except Exception as e: + app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) + finally: + global sensor_active_device, sensor_active_sdr_type + # Ensure process is terminated + try: + process.terminate() + process.wait(timeout=2) + except Exception: + try: + process.kill() + except Exception: + pass + unregister_process(process) + app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'}) + with app_module.sensor_lock: + app_module.sensor_process = None + # Release SDR device + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None + + +@sensor_bp.route('/sensor/status') +def sensor_status() -> Response: + """Check if sensor decoder is currently running.""" + with app_module.sensor_lock: + running = app_module.sensor_process is not None and app_module.sensor_process.poll() is None + return jsonify({'running': running}) + + +@sensor_bp.route('/start_sensor', methods=['POST']) +def start_sensor() -> Response: + global sensor_active_device, sensor_active_sdr_type + + with app_module.sensor_lock: + if app_module.sensor_process: + return jsonify({'status': 'error', 'message': 'Sensor already running'}), 409 + + data = request.json or {} + + # Validate inputs + try: + freq = validate_frequency(data.get('frequency', '433.92')) + gain = validate_gain(data.get('gain', '0')) + ppm = validate_ppm(data.get('ppm', '0')) + device = validate_device_index(data.get('device', '0')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 + + # Check for rtl_tcp (remote SDR) connection + rtl_tcp_host = data.get('rtl_tcp_host') + rtl_tcp_port = data.get('rtl_tcp_port', 1234) + + # Get SDR type early so we can pass it to claim/release + sdr_type_str = data.get('sdr_type', 'rtlsdr') + + # Claim local device if not using remote rtl_tcp + if not rtl_tcp_host: + device_int = int(device) + error = app_module.claim_sdr_device(device_int, 'sensor', sdr_type_str) + if error: + return jsonify({ + 'status': 'error', + 'error_type': 'DEVICE_BUSY', + 'message': error + }), 409 + sensor_active_device = device_int + sensor_active_sdr_type = sdr_type_str + + # Clear queue + while not app_module.sensor_queue.empty(): + try: + app_module.sensor_queue.get_nowait() + except queue.Empty: + break + + # Build command via SDR abstraction layer + try: + sdr_type = SDRType(sdr_type_str) + except ValueError: + sdr_type = SDRType.RTL_SDR + + if rtl_tcp_host: + # Validate and create network device + try: + rtl_tcp_host = validate_rtl_tcp_host(rtl_tcp_host) + rtl_tcp_port = validate_rtl_tcp_port(rtl_tcp_port) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 + + sdr_device = SDRFactory.create_network_device(rtl_tcp_host, rtl_tcp_port) + logger.info(f"Using remote SDR: rtl_tcp://{rtl_tcp_host}:{rtl_tcp_port}") + else: + # Create local device object + sdr_device = SDRFactory.create_default_device(sdr_type, index=device) + + builder = SDRFactory.get_builder(sdr_device.sdr_type) + + # Build ISM band decoder command + bias_t = data.get('bias_t', False) + cmd = builder.build_ism_command( + device=sdr_device, + frequency_mhz=freq, + gain=float(gain) if gain and gain != 0 else None, + ppm=int(ppm) if ppm and ppm != 0 else None, + bias_t=bias_t + ) + + full_cmd = ' '.join(cmd) + logger.info(f"Running: {full_cmd}") + + # Add signal level metadata so the frontend scope can display RSSI/SNR + # Disable stats reporting to suppress "row count limit 50 reached" warnings + cmd.extend(['-M', 'level', '-M', 'stats:0']) + + try: + app_module.sensor_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + register_process(app_module.sensor_process) + + # Start output thread + thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,)) + thread.daemon = True + thread.start() + + # Monitor stderr + # Filter noisy rtl_433 diagnostics that aren't useful to display + _stderr_noise = ( + 'bitbuffer_add_bit', + 'row count limit', + ) + + def monitor_stderr(): + for line in app_module.sensor_process.stderr: + err = line.decode('utf-8', errors='replace').strip() + if err and not any(noise in err for noise in _stderr_noise): + logger.debug(f"[rtl_433] {err}") + app_module.sensor_queue.put({'type': 'info', 'text': f'[rtl_433] {err}'}) + + stderr_thread = threading.Thread(target=monitor_stderr) + stderr_thread.daemon = True + stderr_thread.start() + + app_module.sensor_queue.put({'type': 'info', 'text': f'Command: {full_cmd}'}) + + return jsonify({'status': 'started', 'command': full_cmd}) + + except FileNotFoundError: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None + return jsonify({'status': 'error', 'message': 'rtl_433 not found. Install with: brew install rtl_433'}) + except Exception as e: + # Release device on failure + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None + return jsonify({'status': 'error', 'message': str(e)}) + + +@sensor_bp.route('/stop_sensor', methods=['POST']) +def stop_sensor() -> Response: + global sensor_active_device, sensor_active_sdr_type + + with app_module.sensor_lock: + if app_module.sensor_process: + app_module.sensor_process.terminate() + try: + app_module.sensor_process.wait(timeout=2) + except subprocess.TimeoutExpired: + app_module.sensor_process.kill() + app_module.sensor_process = None + + # Release device from registry + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device, sensor_active_sdr_type or 'rtlsdr') + sensor_active_device = None + sensor_active_sdr_type = None + + return jsonify({'status': 'stopped'}) + + return jsonify({'status': 'not_running'}) + + @sensor_bp.route('/stream_sensor') def stream_sensor() -> Response: def _on_msg(msg: dict[str, Any]) -> None: @@ -330,12 +338,12 @@ def stream_sensor() -> Response: response.headers['X-Accel-Buffering'] = 'no' response.headers['Connection'] = 'keep-alive' return response - - -@sensor_bp.route('/sensor/rssi_history') -def get_rssi_history() -> Response: - """Return RSSI history for all tracked sensor devices.""" - result = {} - for key, entries in sensor_rssi_history.items(): - result[key] = [{'t': round(t, 1), 'rssi': rssi} for t, rssi in entries] - return jsonify({'status': 'success', 'devices': result}) + + +@sensor_bp.route('/sensor/rssi_history') +def get_rssi_history() -> Response: + """Return RSSI history for all tracked sensor devices.""" + result = {} + for key, entries in sensor_rssi_history.items(): + result[key] = [{'t': round(t, 1), 'rssi': rssi} for t, rssi in entries] + return jsonify({'status': 'success', 'devices': result}) diff --git a/routes/vdl2.py b/routes/vdl2.py index ca2200a..5f22878 100644 --- a/routes/vdl2.py +++ b/routes/vdl2.py @@ -1,17 +1,17 @@ """VDL2 aircraft datalink routes.""" -from __future__ import annotations - -import io -import json -import os -import platform -import pty -import queue -import shutil -import subprocess -import threading -import time +from __future__ import annotations + +import io +import json +import os +import platform +import pty +import queue +import shutil +import subprocess +import threading +import time from datetime import datetime from typing import Generator @@ -21,7 +21,7 @@ import app as app_module from utils.logging import sensor_logger as logger from utils.validation import validate_device_index, validate_gain, validate_ppm from utils.sdr import SDRFactory, SDRType -from utils.sse import sse_stream_fanout +from utils.sse import sse_stream_fanout from utils.event_pipeline import process_event from utils.constants import ( PROCESS_TERMINATE_TIMEOUT, @@ -48,6 +48,7 @@ vdl2_last_message_time = None # Track which device is being used vdl2_active_device: int | None = None +vdl2_active_sdr_type: str | None = None def find_dumpvdl2(): @@ -55,22 +56,22 @@ def find_dumpvdl2(): return shutil.which('dumpvdl2') -def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> None: - """Stream dumpvdl2 JSON output to queue.""" - global vdl2_message_count, vdl2_last_message_time - - try: - app_module.vdl2_queue.put({'type': 'status', 'status': 'started'}) - - # Use appropriate sentinel based on mode (text mode for pty on macOS) - sentinel = '' if is_text_mode else b'' - for line in iter(process.stdout.readline, sentinel): - if is_text_mode: - line = line.strip() - else: - line = line.decode('utf-8', errors='replace').strip() - if not line: - continue +def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> None: + """Stream dumpvdl2 JSON output to queue.""" + global vdl2_message_count, vdl2_last_message_time + + try: + app_module.vdl2_queue.put({'type': 'status', 'status': 'started'}) + + # Use appropriate sentinel based on mode (text mode for pty on macOS) + sentinel = '' if is_text_mode else b'' + for line in iter(process.stdout.readline, sentinel): + if is_text_mode: + line = line.strip() + else: + line = line.decode('utf-8', errors='replace').strip() + if not line: + continue try: data = json.loads(line) @@ -110,7 +111,7 @@ def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> logger.error(f"VDL2 stream error: {e}") app_module.vdl2_queue.put({'type': 'error', 'message': str(e)}) finally: - global vdl2_active_device + global vdl2_active_device, vdl2_active_sdr_type # Ensure process is terminated try: process.terminate() @@ -126,8 +127,9 @@ def stream_vdl2_output(process: subprocess.Popen, is_text_mode: bool = False) -> app_module.vdl2_process = None # Release SDR device if vdl2_active_device is not None: - app_module.release_sdr_device(vdl2_active_device) + app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr') vdl2_active_device = None + vdl2_active_sdr_type = None @vdl2_bp.route('/tools') @@ -159,7 +161,7 @@ def vdl2_status() -> Response: @vdl2_bp.route('/start', methods=['POST']) def start_vdl2() -> Response: """Start VDL2 decoder.""" - global vdl2_message_count, vdl2_last_message_time, vdl2_active_device + global vdl2_message_count, vdl2_last_message_time, vdl2_active_device, vdl2_active_sdr_type with app_module.vdl2_lock: if app_module.vdl2_process and app_module.vdl2_process.poll() is None: @@ -186,9 +188,16 @@ def start_vdl2() -> Response: except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 + # Resolve SDR type for device selection + sdr_type_str = data.get('sdr_type', 'rtlsdr') + try: + sdr_type = SDRType(sdr_type_str) + except ValueError: + sdr_type = SDRType.RTL_SDR + # Check if device is available device_int = int(device) - error = app_module.claim_sdr_device(device_int, 'vdl2') + error = app_module.claim_sdr_device(device_int, 'vdl2', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -197,6 +206,7 @@ def start_vdl2() -> Response: }), 409 vdl2_active_device = device_int + vdl2_active_sdr_type = sdr_type_str # Get frequencies - use provided or defaults # dumpvdl2 expects frequencies in Hz (integers) @@ -215,13 +225,6 @@ def start_vdl2() -> Response: vdl2_message_count = 0 vdl2_last_message_time = None - # Resolve SDR type for device selection - sdr_type_str = data.get('sdr_type', 'rtlsdr') - try: - sdr_type = SDRType(sdr_type_str) - except ValueError: - sdr_type = SDRType.RTL_SDR - is_soapy = sdr_type not in (SDRType.RTL_SDR,) # Build dumpvdl2 command @@ -252,28 +255,28 @@ def start_vdl2() -> Response: logger.info(f"Starting VDL2 decoder: {' '.join(cmd)}") try: - is_text_mode = False - - # On macOS, use pty to avoid stdout buffering issues - if platform.system() == 'Darwin': - master_fd, slave_fd = pty.openpty() - process = subprocess.Popen( - cmd, - stdout=slave_fd, - stderr=subprocess.PIPE, - start_new_session=True - ) - os.close(slave_fd) - # Wrap master_fd as a text file for line-buffered reading - process.stdout = io.open(master_fd, 'r', buffering=1) - is_text_mode = True - else: - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - start_new_session=True - ) + is_text_mode = False + + # On macOS, use pty to avoid stdout buffering issues + if platform.system() == 'Darwin': + master_fd, slave_fd = pty.openpty() + process = subprocess.Popen( + cmd, + stdout=slave_fd, + stderr=subprocess.PIPE, + start_new_session=True + ) + os.close(slave_fd) + # Wrap master_fd as a text file for line-buffered reading + process.stdout = io.open(master_fd, 'r', buffering=1) + is_text_mode = True + else: + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True + ) # Wait briefly to check if process started time.sleep(PROCESS_START_WAIT) @@ -281,8 +284,9 @@ def start_vdl2() -> Response: if process.poll() is not None: # Process died - release device if vdl2_active_device is not None: - app_module.release_sdr_device(vdl2_active_device) + app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr') vdl2_active_device = None + vdl2_active_sdr_type = None stderr = '' if process.stderr: stderr = process.stderr.read().decode('utf-8', errors='replace') @@ -295,12 +299,12 @@ def start_vdl2() -> Response: app_module.vdl2_process = process register_process(process) - # Start output streaming thread - thread = threading.Thread( - target=stream_vdl2_output, - args=(process, is_text_mode), - daemon=True - ) + # Start output streaming thread + thread = threading.Thread( + target=stream_vdl2_output, + args=(process, is_text_mode), + daemon=True + ) thread.start() return jsonify({ @@ -313,8 +317,9 @@ def start_vdl2() -> Response: except Exception as e: # Release device on failure if vdl2_active_device is not None: - app_module.release_sdr_device(vdl2_active_device) + app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr') vdl2_active_device = None + vdl2_active_sdr_type = None logger.error(f"Failed to start VDL2 decoder: {e}") return jsonify({'status': 'error', 'message': str(e)}), 500 @@ -322,7 +327,7 @@ def start_vdl2() -> Response: @vdl2_bp.route('/stop', methods=['POST']) def stop_vdl2() -> Response: """Stop VDL2 decoder.""" - global vdl2_active_device + global vdl2_active_device, vdl2_active_sdr_type with app_module.vdl2_lock: if not app_module.vdl2_process: @@ -343,31 +348,32 @@ def stop_vdl2() -> Response: # Release device from registry if vdl2_active_device is not None: - app_module.release_sdr_device(vdl2_active_device) + app_module.release_sdr_device(vdl2_active_device, vdl2_active_sdr_type or 'rtlsdr') vdl2_active_device = None + vdl2_active_sdr_type = None return jsonify({'status': 'stopped'}) -@vdl2_bp.route('/stream') -def stream_vdl2() -> Response: - """SSE stream for VDL2 messages.""" - def _on_msg(msg: dict[str, Any]) -> None: - process_event('vdl2', msg, msg.get('type')) - - response = Response( - sse_stream_fanout( - source_queue=app_module.vdl2_queue, - channel_key='vdl2', - timeout=SSE_QUEUE_TIMEOUT, - keepalive_interval=SSE_KEEPALIVE_INTERVAL, - on_message=_on_msg, - ), - mimetype='text/event-stream', - ) - response.headers['Cache-Control'] = 'no-cache' - response.headers['X-Accel-Buffering'] = 'no' - return response +@vdl2_bp.route('/stream') +def stream_vdl2() -> Response: + """SSE stream for VDL2 messages.""" + def _on_msg(msg: dict[str, Any]) -> None: + process_event('vdl2', msg, msg.get('type')) + + response = Response( + sse_stream_fanout( + source_queue=app_module.vdl2_queue, + channel_key='vdl2', + timeout=SSE_QUEUE_TIMEOUT, + keepalive_interval=SSE_KEEPALIVE_INTERVAL, + on_message=_on_msg, + ), + mimetype='text/event-stream', + ) + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + return response @vdl2_bp.route('/frequencies') diff --git a/routes/waterfall_websocket.py b/routes/waterfall_websocket.py index de31227..99ceab3 100644 --- a/routes/waterfall_websocket.py +++ b/routes/waterfall_websocket.py @@ -367,6 +367,7 @@ def init_waterfall_websocket(app: Flask): reader_thread = None stop_event = threading.Event() claimed_device = None + claimed_sdr_type = 'rtlsdr' my_generation = None # tracks which capture generation this handler owns capture_center_mhz = 0.0 capture_start_freq = 0.0 @@ -430,8 +431,9 @@ def init_waterfall_websocket(app: Flask): unregister_process(iq_process) iq_process = None if claimed_device is not None: - app_module.release_sdr_device(claimed_device) + app_module.release_sdr_device(claimed_device, claimed_sdr_type) claimed_device = None + claimed_sdr_type = 'rtlsdr' _set_shared_capture_state(running=False, generation=my_generation) my_generation = None stop_event.clear() @@ -513,7 +515,7 @@ def init_waterfall_websocket(app: Flask): max_claim_attempts = 4 if was_restarting else 1 claim_err = None for _claim_attempt in range(max_claim_attempts): - claim_err = app_module.claim_sdr_device(device_index, 'waterfall') + claim_err = app_module.claim_sdr_device(device_index, 'waterfall', sdr_type_str) if not claim_err: break if _claim_attempt < max_claim_attempts - 1: @@ -526,6 +528,7 @@ def init_waterfall_websocket(app: Flask): })) continue claimed_device = device_index + claimed_sdr_type = sdr_type_str # Build I/Q capture command try: @@ -539,8 +542,9 @@ def init_waterfall_websocket(app: Flask): bias_t=bias_t, ) except NotImplementedError as e: - app_module.release_sdr_device(device_index) + app_module.release_sdr_device(device_index, sdr_type_str) claimed_device = None + claimed_sdr_type = 'rtlsdr' ws.send(json.dumps({ 'status': 'error', 'message': str(e), @@ -549,8 +553,9 @@ def init_waterfall_websocket(app: Flask): # Pre-flight: check the capture binary exists if not shutil.which(iq_cmd[0]): - app_module.release_sdr_device(device_index) + app_module.release_sdr_device(device_index, sdr_type_str) claimed_device = None + claimed_sdr_type = 'rtlsdr' ws.send(json.dumps({ 'status': 'error', 'message': f'Required tool "{iq_cmd[0]}" not found. Install SoapySDR tools (rx_sdr).', @@ -602,8 +607,9 @@ def init_waterfall_websocket(app: Flask): safe_terminate(iq_process) unregister_process(iq_process) iq_process = None - app_module.release_sdr_device(device_index) + app_module.release_sdr_device(device_index, sdr_type_str) claimed_device = None + claimed_sdr_type = 'rtlsdr' ws.send(json.dumps({ 'status': 'error', 'message': f'Failed to start I/Q capture: {e}', @@ -806,8 +812,9 @@ def init_waterfall_websocket(app: Flask): unregister_process(iq_process) iq_process = None if claimed_device is not None: - app_module.release_sdr_device(claimed_device) + app_module.release_sdr_device(claimed_device, claimed_sdr_type) claimed_device = None + claimed_sdr_type = 'rtlsdr' _set_shared_capture_state(running=False, generation=my_generation) my_generation = None stop_event.clear() @@ -825,7 +832,7 @@ def init_waterfall_websocket(app: Flask): safe_terminate(iq_process) unregister_process(iq_process) if claimed_device is not None: - app_module.release_sdr_device(claimed_device) + app_module.release_sdr_device(claimed_device, claimed_sdr_type) _set_shared_capture_state(running=False, generation=my_generation) # Complete WebSocket close handshake, then shut down the # raw socket so Werkzeug cannot write its HTTP 200 response diff --git a/routes/wefax.py b/routes/wefax.py index 497cdf0..401672a 100644 --- a/routes/wefax.py +++ b/routes/wefax.py @@ -33,11 +33,12 @@ _wefax_queue: queue.Queue = queue.Queue(maxsize=100) # Track active SDR device wefax_active_device: int | None = None +wefax_active_sdr_type: str | None = None def _progress_callback(data: dict) -> None: """Callback to queue progress updates for SSE stream.""" - global wefax_active_device + global wefax_active_device, wefax_active_sdr_type try: _wefax_queue.put_nowait(data) @@ -56,8 +57,9 @@ def _progress_callback(data: dict) -> None: and data.get('status') in ('complete', 'error', 'stopped') and wefax_active_device is not None ): - app_module.release_sdr_device(wefax_active_device) + app_module.release_sdr_device(wefax_active_device, wefax_active_sdr_type or 'rtlsdr') wefax_active_device = None + wefax_active_sdr_type = None @wefax_bp.route('/status') @@ -169,9 +171,9 @@ def start_decoder(): }), 400 # Claim SDR device - global wefax_active_device + global wefax_active_device, wefax_active_sdr_type device_int = int(device_index) - error = app_module.claim_sdr_device(device_int, 'wefax') + error = app_module.claim_sdr_device(device_int, 'wefax', sdr_type_str) if error: return jsonify({ 'status': 'error', @@ -194,6 +196,7 @@ def start_decoder(): if success: wefax_active_device = device_int + wefax_active_sdr_type = sdr_type_str return jsonify({ 'status': 'started', 'frequency_khz': frequency_khz, @@ -209,7 +212,7 @@ def start_decoder(): 'device': device_int, }) else: - app_module.release_sdr_device(device_int) + app_module.release_sdr_device(device_int, sdr_type_str) return jsonify({ 'status': 'error', 'message': 'Failed to start decoder', @@ -219,13 +222,14 @@ def start_decoder(): @wefax_bp.route('/stop', methods=['POST']) def stop_decoder(): """Stop WeFax decoder.""" - global wefax_active_device + global wefax_active_device, wefax_active_sdr_type decoder = get_wefax_decoder() decoder.stop() if wefax_active_device is not None: - app_module.release_sdr_device(wefax_active_device) + app_module.release_sdr_device(wefax_active_device, wefax_active_sdr_type or 'rtlsdr') wefax_active_device = None + wefax_active_sdr_type = None return jsonify({'status': 'stopped'}) diff --git a/templates/adsb_dashboard.html b/templates/adsb_dashboard.html index c35636d..6ce0a96 100644 --- a/templates/adsb_dashboard.html +++ b/templates/adsb_dashboard.html @@ -1752,31 +1752,37 @@ ACARS: ${r.statistics.acarsMessages} messages`; airbandSelect.innerHTML = ''; if (devices.length === 0) { - adsbSelect.innerHTML = ''; - airbandSelect.innerHTML = ''; + adsbSelect.innerHTML = ''; + airbandSelect.innerHTML = ''; airbandSelect.disabled = true; } else { devices.forEach((dev, i) => { const idx = dev.index !== undefined ? dev.index : i; + const sdrType = dev.sdr_type || 'rtlsdr'; + const compositeVal = `${sdrType}:${idx}`; const displayName = `SDR ${idx}: ${dev.name}`; // Add to ADS-B selector const adsbOpt = document.createElement('option'); - adsbOpt.value = idx; + adsbOpt.value = compositeVal; + adsbOpt.dataset.sdrType = sdrType; + adsbOpt.dataset.index = idx; adsbOpt.textContent = displayName; adsbSelect.appendChild(adsbOpt); // Add to Airband selector const airbandOpt = document.createElement('option'); - airbandOpt.value = idx; + airbandOpt.value = compositeVal; + airbandOpt.dataset.sdrType = sdrType; + airbandOpt.dataset.index = idx; airbandOpt.textContent = displayName; airbandSelect.appendChild(airbandOpt); }); // Default: ADS-B uses first device, Airband uses second (if available) - adsbSelect.value = devices[0].index !== undefined ? devices[0].index : 0; + adsbSelect.value = adsbSelect.options[0]?.value || 'rtlsdr:0'; if (devices.length > 1) { - airbandSelect.value = devices[1].index !== undefined ? devices[1].index : 1; + airbandSelect.value = airbandSelect.options[1]?.value || airbandSelect.options[0]?.value || 'rtlsdr:0'; } // Show warning if only one device @@ -1787,8 +1793,8 @@ ACARS: ${r.statistics.acarsMessages} messages`; } }) .catch(() => { - document.getElementById('adsbDeviceSelect').innerHTML = ''; - document.getElementById('airbandDeviceSelect').innerHTML = ''; + document.getElementById('adsbDeviceSelect').innerHTML = ''; + document.getElementById('airbandDeviceSelect').innerHTML = ''; }); } @@ -2151,11 +2157,14 @@ sudo make install } } - // Get selected ADS-B device - const adsbDevice = parseInt(document.getElementById('adsbDeviceSelect').value) || 0; + // Get selected ADS-B device (composite value "sdr_type:index") + const adsbSelectVal = document.getElementById('adsbDeviceSelect').value || 'rtlsdr:0'; + const [adsbSdrType, adsbDeviceIdx] = adsbSelectVal.includes(':') ? adsbSelectVal.split(':') : ['rtlsdr', adsbSelectVal]; + const adsbDevice = parseInt(adsbDeviceIdx) || 0; const requestBody = { device: adsbDevice, + sdr_type: adsbSdrType, bias_t: getBiasTEnabled() }; if (remoteConfig) { @@ -2306,11 +2315,13 @@ sudo make install } const sessionDevice = session.device_index; + const sessionSdrType = session.sdr_type || 'rtlsdr'; if (sessionDevice !== null && sessionDevice !== undefined) { adsbActiveDevice = sessionDevice; const adsbSelect = document.getElementById('adsbDeviceSelect'); if (adsbSelect) { - adsbSelect.value = sessionDevice; + // Use composite value to select the correct device+type + adsbSelect.value = `${sessionSdrType}:${sessionDevice}`; } } @@ -3663,8 +3674,9 @@ sudo make install function startAcars() { const acarsSelect = document.getElementById('acarsDeviceSelect'); - const device = acarsSelect.value; - const sdr_type = acarsSelect.selectedOptions[0]?.dataset.sdrType || 'rtlsdr'; + const compositeVal = acarsSelect.value || 'rtlsdr:0'; + const [sdr_type, deviceIdx] = compositeVal.includes(':') ? compositeVal.split(':') : ['rtlsdr', compositeVal]; + const device = deviceIdx; const frequencies = getAcarsRegionFreqs(); // Check if using agent mode @@ -3896,18 +3908,21 @@ sudo make install const select = document.getElementById('acarsDeviceSelect'); select.innerHTML = ''; if (devices.length === 0) { - select.innerHTML = ''; + select.innerHTML = ''; } else { devices.forEach((d, i) => { const opt = document.createElement('option'); - opt.value = d.index || i; - opt.dataset.sdrType = d.sdr_type || 'rtlsdr'; - opt.textContent = `SDR ${d.index || i}: ${d.name || d.type || 'SDR'}`; + const sdrType = d.sdr_type || 'rtlsdr'; + const idx = d.index !== undefined ? d.index : i; + opt.value = `${sdrType}:${idx}`; + opt.dataset.sdrType = sdrType; + opt.dataset.index = idx; + opt.textContent = `SDR ${idx}: ${d.name || d.type || 'SDR'}`; select.appendChild(opt); }); // Default to device 1 if available (device 0 likely used for ADS-B) if (devices.length > 1) { - select.value = '1'; + select.value = select.options[1]?.value || select.options[0]?.value; } } }); @@ -3998,8 +4013,9 @@ sudo make install function startVdl2() { const vdl2Select = document.getElementById('vdl2DeviceSelect'); - const device = vdl2Select.value; - const sdr_type = vdl2Select.selectedOptions[0]?.dataset.sdrType || 'rtlsdr'; + const compositeVal = vdl2Select.value || 'rtlsdr:0'; + const [sdr_type, deviceIdx] = compositeVal.includes(':') ? compositeVal.split(':') : ['rtlsdr', compositeVal]; + const device = deviceIdx; const frequencies = getVdl2RegionFreqs(); // Check if using agent mode @@ -4419,17 +4435,20 @@ sudo make install const select = document.getElementById('vdl2DeviceSelect'); select.innerHTML = ''; if (devices.length === 0) { - select.innerHTML = ''; + select.innerHTML = ''; } else { devices.forEach((d, i) => { const opt = document.createElement('option'); - opt.value = d.index || i; - opt.dataset.sdrType = d.sdr_type || 'rtlsdr'; - opt.textContent = `SDR ${d.index || i}: ${d.name || d.type || 'SDR'}`; + const sdrType = d.sdr_type || 'rtlsdr'; + const idx = d.index !== undefined ? d.index : i; + opt.value = `${sdrType}:${idx}`; + opt.dataset.sdrType = sdrType; + opt.dataset.index = idx; + opt.textContent = `SDR ${idx}: ${d.name || d.type || 'SDR'}`; select.appendChild(opt); }); if (devices.length > 1) { - select.value = '1'; + select.value = select.options[1]?.value || select.options[0]?.value; } } }); @@ -5403,13 +5422,16 @@ sudo make install select.innerHTML = ''; if (devices.length === 0) { - select.innerHTML = ''; + select.innerHTML = ''; } else { devices.forEach(device => { const opt = document.createElement('option'); - opt.value = device.index; - opt.dataset.sdrType = device.sdr_type || 'rtlsdr'; - opt.textContent = `SDR ${device.index}: ${device.name || device.type || 'SDR'}`; + const sdrType = device.sdr_type || 'rtlsdr'; + const idx = device.index !== undefined ? device.index : 0; + opt.value = `${sdrType}:${idx}`; + opt.dataset.sdrType = sdrType; + opt.dataset.index = idx; + opt.textContent = `SDR ${idx}: ${device.name || device.type || 'SDR'}`; select.appendChild(opt); }); } diff --git a/templates/index.html b/templates/index.html index 279503f..58d1497 100644 --- a/templates/index.html +++ b/templates/index.html @@ -5646,37 +5646,41 @@ let currentDeviceList = []; // SDR Device Usage Tracking - // Tracks which mode is using which device index + // Tracks which mode is using which device (keyed by "sdr_type:index") const sdrDeviceUsage = { - // deviceIndex: 'modeName' (e.g., 0: 'pager', 1: 'scanner') + // "sdr_type:index": 'modeName' (e.g., "rtlsdr:0": 'pager', "hackrf:0": 'scanner') }; - function getDeviceInUseBy(deviceIndex) { - return sdrDeviceUsage[deviceIndex] || null; + function getDeviceInUseBy(deviceIndex, sdrType) { + const key = `${sdrType || getSelectedSDRType()}:${deviceIndex}`; + return sdrDeviceUsage[key] || null; } - function isDeviceInUse(deviceIndex) { - return sdrDeviceUsage[deviceIndex] !== undefined; + function isDeviceInUse(deviceIndex, sdrType) { + const key = `${sdrType || getSelectedSDRType()}:${deviceIndex}`; + return sdrDeviceUsage[key] !== undefined; } - function reserveDevice(deviceIndex, modeName) { - sdrDeviceUsage[deviceIndex] = modeName; + function reserveDevice(deviceIndex, modeName, sdrType) { + const key = `${sdrType || getSelectedSDRType()}:${deviceIndex}`; + sdrDeviceUsage[key] = modeName; updateDeviceSelectStatus(); } function releaseDevice(modeName) { - for (const [idx, mode] of Object.entries(sdrDeviceUsage)) { + for (const [key, mode] of Object.entries(sdrDeviceUsage)) { if (mode === modeName) { - delete sdrDeviceUsage[idx]; + delete sdrDeviceUsage[key]; } } updateDeviceSelectStatus(); } function getAvailableDevice() { - // Find first device not in use + // Find first device not in use (within selected SDR type) + const sdrType = getSelectedSDRType(); for (const device of currentDeviceList) { - if (!isDeviceInUse(device.index)) { + if ((device.sdr_type || 'rtlsdr') === sdrType && !isDeviceInUse(device.index, sdrType)) { return device.index; } } @@ -5688,10 +5692,11 @@ const select = document.getElementById('deviceSelect'); if (!select) return; + const sdrType = getSelectedSDRType(); const options = select.querySelectorAll('option'); options.forEach(opt => { const idx = parseInt(opt.value); - const usedBy = getDeviceInUseBy(idx); + const usedBy = getDeviceInUseBy(idx, sdrType); const baseName = opt.textContent.replace(/ \[.*\]$/, ''); // Remove existing status if (usedBy) { opt.textContent = `${baseName} [${usedBy.toUpperCase()}]`; diff --git a/tests/test_morse.py b/tests/test_morse.py index b17c7de..22fafc0 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -257,8 +257,8 @@ class TestMorseLifecycleRoutes: released_devices = [] - monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None) - monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx)) + monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode, sdr_type='rtlsdr': None) + monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx, sdr_type='rtlsdr': released_devices.append(idx)) class DummyDevice: sdr_type = morse_routes.SDRType.RTL_SDR @@ -337,8 +337,8 @@ class TestMorseLifecycleRoutes: released_devices = [] - monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None) - monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx)) + monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode, sdr_type='rtlsdr': None) + monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx, sdr_type='rtlsdr': released_devices.append(idx)) class DummyDevice: sdr_type = morse_routes.SDRType.RTL_SDR @@ -421,8 +421,8 @@ class TestMorseLifecycleRoutes: released_devices = [] - monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None) - monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx)) + monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode, sdr_type='rtlsdr': None) + monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx, sdr_type='rtlsdr': released_devices.append(idx)) class DummyDevice: def __init__(self, index: int): diff --git a/tests/test_wefax.py b/tests/test_wefax.py index 9a62192..54c2d14 100644 --- a/tests/test_wefax.py +++ b/tests/test_wefax.py @@ -54,72 +54,72 @@ class TestWeFaxStations: from utils.wefax_stations import get_station assert get_station('noj') is not None - def test_get_station_not_found(self): - """get_station() should return None for unknown callsign.""" - from utils.wefax_stations import get_station - assert get_station('XXXXX') is None - - def test_resolve_tuning_frequency_auto_uses_carrier_for_known_station(self): - """Known station frequencies default to carrier-list behavior in auto mode.""" - from utils.wefax_stations import resolve_tuning_frequency_khz - - tuned, reference, offset_applied = resolve_tuning_frequency_khz( - listed_frequency_khz=4298.0, - station_callsign='NOJ', - frequency_reference='auto', - ) - - assert math.isclose(tuned, 4296.1, abs_tol=1e-6) - assert reference == 'carrier' - assert offset_applied is True - - def test_resolve_tuning_frequency_auto_preserves_unknown_station_input(self): - """Ad-hoc frequencies (no station metadata) should be treated as dial.""" - from utils.wefax_stations import resolve_tuning_frequency_khz - - tuned, reference, offset_applied = resolve_tuning_frequency_khz( - listed_frequency_khz=4298.0, - station_callsign='', - frequency_reference='auto', - ) - - assert math.isclose(tuned, 4298.0, abs_tol=1e-6) - assert reference == 'dial' - assert offset_applied is False - - def test_resolve_tuning_frequency_dial_override(self): - """Explicit dial reference must bypass USB alignment.""" - from utils.wefax_stations import resolve_tuning_frequency_khz - - tuned, reference, offset_applied = resolve_tuning_frequency_khz( - listed_frequency_khz=4298.0, - station_callsign='NOJ', - frequency_reference='dial', - ) - - assert math.isclose(tuned, 4298.0, abs_tol=1e-6) - assert reference == 'dial' - assert offset_applied is False - - def test_resolve_tuning_frequency_rejects_invalid_reference(self): - """Invalid frequency reference values should raise a validation error.""" - from utils.wefax_stations import resolve_tuning_frequency_khz - - try: - resolve_tuning_frequency_khz( - listed_frequency_khz=4298.0, - station_callsign='NOJ', - frequency_reference='invalid', - ) - assert False, "Expected ValueError for invalid frequency_reference" - except ValueError as exc: - assert 'frequency_reference' in str(exc) - - def test_station_frequencies_have_khz(self): - """Each frequency entry must have 'khz' and 'description'.""" - from utils.wefax_stations import load_stations - for station in load_stations(): - for freq in station['frequencies']: + def test_get_station_not_found(self): + """get_station() should return None for unknown callsign.""" + from utils.wefax_stations import get_station + assert get_station('XXXXX') is None + + def test_resolve_tuning_frequency_auto_uses_carrier_for_known_station(self): + """Known station frequencies default to carrier-list behavior in auto mode.""" + from utils.wefax_stations import resolve_tuning_frequency_khz + + tuned, reference, offset_applied = resolve_tuning_frequency_khz( + listed_frequency_khz=4298.0, + station_callsign='NOJ', + frequency_reference='auto', + ) + + assert math.isclose(tuned, 4296.1, abs_tol=1e-6) + assert reference == 'carrier' + assert offset_applied is True + + def test_resolve_tuning_frequency_auto_preserves_unknown_station_input(self): + """Ad-hoc frequencies (no station metadata) should be treated as dial.""" + from utils.wefax_stations import resolve_tuning_frequency_khz + + tuned, reference, offset_applied = resolve_tuning_frequency_khz( + listed_frequency_khz=4298.0, + station_callsign='', + frequency_reference='auto', + ) + + assert math.isclose(tuned, 4298.0, abs_tol=1e-6) + assert reference == 'dial' + assert offset_applied is False + + def test_resolve_tuning_frequency_dial_override(self): + """Explicit dial reference must bypass USB alignment.""" + from utils.wefax_stations import resolve_tuning_frequency_khz + + tuned, reference, offset_applied = resolve_tuning_frequency_khz( + listed_frequency_khz=4298.0, + station_callsign='NOJ', + frequency_reference='dial', + ) + + assert math.isclose(tuned, 4298.0, abs_tol=1e-6) + assert reference == 'dial' + assert offset_applied is False + + def test_resolve_tuning_frequency_rejects_invalid_reference(self): + """Invalid frequency reference values should raise a validation error.""" + from utils.wefax_stations import resolve_tuning_frequency_khz + + try: + resolve_tuning_frequency_khz( + listed_frequency_khz=4298.0, + station_callsign='NOJ', + frequency_reference='invalid', + ) + assert False, "Expected ValueError for invalid frequency_reference" + except ValueError as exc: + assert 'frequency_reference' in str(exc) + + def test_station_frequencies_have_khz(self): + """Each frequency entry must have 'khz' and 'description'.""" + from utils.wefax_stations import load_stations + for station in load_stations(): + for freq in station['frequencies']: assert 'khz' in freq, f"{station['callsign']} missing khz" assert 'description' in freq, f"{station['callsign']} missing description" assert isinstance(freq['khz'], (int, float)) @@ -281,7 +281,7 @@ class TestWeFaxDecoder: # Route tests # --------------------------------------------------------------------------- -class TestWeFaxRoutes: +class TestWeFaxRoutes: """WeFax route endpoint tests.""" def test_status(self, client): @@ -390,11 +390,11 @@ class TestWeFaxRoutes: data = response.get_json() assert 'LPM' in data['message'] - def test_start_success(self, client): - """POST /wefax/start with valid params should succeed.""" - _login_session(client) - mock_decoder = MagicMock() - mock_decoder.is_running = False + def test_start_success(self, client): + """POST /wefax/start with valid params should succeed.""" + _login_session(client) + mock_decoder = MagicMock() + mock_decoder.is_running = False mock_decoder.start.return_value = True with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \ @@ -411,46 +411,46 @@ class TestWeFaxRoutes: content_type='application/json', ) - assert response.status_code == 200 - data = response.get_json() - assert data['status'] == 'started' - assert data['frequency_khz'] == 4298 - assert data['usb_offset_applied'] is True - assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6) - assert data['frequency_reference'] == 'carrier' - assert data['station'] == 'NOJ' - mock_decoder.start.assert_called_once() - start_kwargs = mock_decoder.start.call_args.kwargs - assert math.isclose(start_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6) - - def test_start_respects_dial_reference_override(self, client): - """POST /wefax/start with dial reference should not apply USB offset.""" - _login_session(client) - mock_decoder = MagicMock() - mock_decoder.is_running = False - mock_decoder.start.return_value = True - - with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \ - patch('routes.wefax.app_module.claim_sdr_device', return_value=None): - response = client.post( - '/wefax/start', - data=json.dumps({ - 'frequency_khz': 4298, - 'station': 'NOJ', - 'device': 0, - 'frequency_reference': 'dial', - }), - content_type='application/json', - ) - - assert response.status_code == 200 - data = response.get_json() - assert data['status'] == 'started' - assert data['usb_offset_applied'] is False - assert math.isclose(data['tuned_frequency_khz'], 4298.0, abs_tol=1e-6) - assert data['frequency_reference'] == 'dial' - start_kwargs = mock_decoder.start.call_args.kwargs - assert math.isclose(start_kwargs['frequency_khz'], 4298.0, abs_tol=1e-6) + assert response.status_code == 200 + data = response.get_json() + assert data['status'] == 'started' + assert data['frequency_khz'] == 4298 + assert data['usb_offset_applied'] is True + assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6) + assert data['frequency_reference'] == 'carrier' + assert data['station'] == 'NOJ' + mock_decoder.start.assert_called_once() + start_kwargs = mock_decoder.start.call_args.kwargs + assert math.isclose(start_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6) + + def test_start_respects_dial_reference_override(self, client): + """POST /wefax/start with dial reference should not apply USB offset.""" + _login_session(client) + mock_decoder = MagicMock() + mock_decoder.is_running = False + mock_decoder.start.return_value = True + + with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder), \ + patch('routes.wefax.app_module.claim_sdr_device', return_value=None): + response = client.post( + '/wefax/start', + data=json.dumps({ + 'frequency_khz': 4298, + 'station': 'NOJ', + 'device': 0, + 'frequency_reference': 'dial', + }), + content_type='application/json', + ) + + assert response.status_code == 200 + data = response.get_json() + assert data['status'] == 'started' + assert data['usb_offset_applied'] is False + assert math.isclose(data['tuned_frequency_khz'], 4298.0, abs_tol=1e-6) + assert data['frequency_reference'] == 'dial' + start_kwargs = mock_decoder.start.call_args.kwargs + assert math.isclose(start_kwargs['frequency_khz'], 4298.0, abs_tol=1e-6) def test_start_device_busy(self, client): """POST /wefax/start should return 409 when device is busy.""" @@ -509,83 +509,83 @@ class TestWeFaxRoutes: assert response.status_code == 400 - def test_delete_image_wrong_extension(self, client): - """DELETE /wefax/images/ should reject non-PNG.""" - _login_session(client) - mock_decoder = MagicMock() - - with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder): - response = client.delete('/wefax/images/test.jpg') - - assert response.status_code == 400 - - def test_schedule_enable_applies_usb_alignment(self, client): - """Scheduler should receive tuned USB dial frequency in auto mode.""" - _login_session(client) - mock_scheduler = MagicMock() - mock_scheduler.enable.return_value = { - 'enabled': True, - 'scheduled_count': 2, - 'total_broadcasts': 2, - } - - with patch('utils.wefax_scheduler.get_wefax_scheduler', return_value=mock_scheduler): - response = client.post( - '/wefax/schedule/enable', - data=json.dumps({ - 'station': 'NOJ', - 'frequency_khz': 4298, - 'device': 0, - }), - content_type='application/json', - ) - - assert response.status_code == 200 - data = response.get_json() - assert data['status'] == 'ok' - assert data['usb_offset_applied'] is True - assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6) - enable_kwargs = mock_scheduler.enable.call_args.kwargs - assert math.isclose(enable_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6) - - -class TestWeFaxProgressCallback: - """Regression tests for WeFax route-level progress callback behavior.""" - - def test_terminal_progress_releases_active_device(self): - """Terminal decoder events must release any manually claimed SDR.""" - import routes.wefax as wefax_routes - - original_device = wefax_routes.wefax_active_device - try: - wefax_routes.wefax_active_device = 3 - with patch('routes.wefax.app_module.release_sdr_device') as mock_release: - wefax_routes._progress_callback({ - 'type': 'wefax_progress', - 'status': 'error', - 'message': 'decode failed', - }) - - mock_release.assert_called_once_with(3) - assert wefax_routes.wefax_active_device is None - finally: - wefax_routes.wefax_active_device = original_device - - def test_non_terminal_progress_does_not_release_active_device(self): - """Non-terminal progress updates must not release SDR ownership.""" - import routes.wefax as wefax_routes - - original_device = wefax_routes.wefax_active_device - try: - wefax_routes.wefax_active_device = 4 - with patch('routes.wefax.app_module.release_sdr_device') as mock_release: - wefax_routes._progress_callback({ - 'type': 'wefax_progress', - 'status': 'receiving', - 'line_count': 120, - }) - - mock_release.assert_not_called() - assert wefax_routes.wefax_active_device == 4 - finally: - wefax_routes.wefax_active_device = original_device + def test_delete_image_wrong_extension(self, client): + """DELETE /wefax/images/ should reject non-PNG.""" + _login_session(client) + mock_decoder = MagicMock() + + with patch('routes.wefax.get_wefax_decoder', return_value=mock_decoder): + response = client.delete('/wefax/images/test.jpg') + + assert response.status_code == 400 + + def test_schedule_enable_applies_usb_alignment(self, client): + """Scheduler should receive tuned USB dial frequency in auto mode.""" + _login_session(client) + mock_scheduler = MagicMock() + mock_scheduler.enable.return_value = { + 'enabled': True, + 'scheduled_count': 2, + 'total_broadcasts': 2, + } + + with patch('utils.wefax_scheduler.get_wefax_scheduler', return_value=mock_scheduler): + response = client.post( + '/wefax/schedule/enable', + data=json.dumps({ + 'station': 'NOJ', + 'frequency_khz': 4298, + 'device': 0, + }), + content_type='application/json', + ) + + assert response.status_code == 200 + data = response.get_json() + assert data['status'] == 'ok' + assert data['usb_offset_applied'] is True + assert math.isclose(data['tuned_frequency_khz'], 4296.1, abs_tol=1e-6) + enable_kwargs = mock_scheduler.enable.call_args.kwargs + assert math.isclose(enable_kwargs['frequency_khz'], 4296.1, abs_tol=1e-6) + + +class TestWeFaxProgressCallback: + """Regression tests for WeFax route-level progress callback behavior.""" + + def test_terminal_progress_releases_active_device(self): + """Terminal decoder events must release any manually claimed SDR.""" + import routes.wefax as wefax_routes + + original_device = wefax_routes.wefax_active_device + try: + wefax_routes.wefax_active_device = 3 + with patch('routes.wefax.app_module.release_sdr_device') as mock_release: + wefax_routes._progress_callback({ + 'type': 'wefax_progress', + 'status': 'error', + 'message': 'decode failed', + }) + + mock_release.assert_called_once_with(3, 'rtlsdr') + assert wefax_routes.wefax_active_device is None + finally: + wefax_routes.wefax_active_device = original_device + + def test_non_terminal_progress_does_not_release_active_device(self): + """Non-terminal progress updates must not release SDR ownership.""" + import routes.wefax as wefax_routes + + original_device = wefax_routes.wefax_active_device + try: + wefax_routes.wefax_active_device = 4 + with patch('routes.wefax.app_module.release_sdr_device') as mock_release: + wefax_routes._progress_callback({ + 'type': 'wefax_progress', + 'status': 'receiving', + 'line_count': 120, + }) + + mock_release.assert_not_called() + assert wefax_routes.wefax_active_device == 4 + finally: + wefax_routes.wefax_active_device = original_device