diff --git a/routes/waterfall_websocket.py b/routes/waterfall_websocket.py index 75c937b..7ca757d 100644 --- a/routes/waterfall_websocket.py +++ b/routes/waterfall_websocket.py @@ -11,14 +11,14 @@ from typing import Any import numpy as np from flask import Flask - -try: - from flask_sock import Sock - WEBSOCKET_AVAILABLE = True -except ImportError: - WEBSOCKET_AVAILABLE = False - Sock = None - + +try: + from flask_sock import Sock + WEBSOCKET_AVAILABLE = True +except ImportError: + WEBSOCKET_AVAILABLE = False + Sock = None + from utils.logging import get_logger from utils.process import register_process, safe_terminate, unregister_process from utils.sdr import SDRFactory, SDRType @@ -46,7 +46,7 @@ _shared_state: dict[str, Any] = { 'monitor_modulation': 'wfm', 'monitor_squelch': 0, } - + # Maximum bandwidth per SDR type (Hz) MAX_BANDWIDTH = { SDRType.RTL_SDR: 2400000, @@ -290,48 +290,48 @@ def _pick_sample_rate(span_hz: int, caps: SDRCapabilities, sdr_type: SDRType) -> def _resolve_sdr_type(sdr_type_str: str) -> SDRType: """Convert client sdr_type string to SDRType enum.""" - mapping = { - 'rtlsdr': SDRType.RTL_SDR, - 'rtl_sdr': SDRType.RTL_SDR, - 'hackrf': SDRType.HACKRF, - 'limesdr': SDRType.LIME_SDR, - 'lime_sdr': SDRType.LIME_SDR, - 'airspy': SDRType.AIRSPY, - 'sdrplay': SDRType.SDRPLAY, - } - return mapping.get(sdr_type_str.lower(), SDRType.RTL_SDR) - - -def _build_dummy_device(device_index: int, sdr_type: SDRType) -> SDRDevice: - """Build a minimal SDRDevice for command building.""" - builder = SDRFactory.get_builder(sdr_type) - caps = builder.get_capabilities() - return SDRDevice( - sdr_type=sdr_type, - index=device_index, - name=f'{sdr_type.value}-{device_index}', - serial='N/A', - driver=sdr_type.value, - capabilities=caps, - ) - - -def init_waterfall_websocket(app: Flask): - """Initialize WebSocket waterfall streaming.""" - if not WEBSOCKET_AVAILABLE: - logger.warning("flask-sock not installed, WebSocket waterfall disabled") - return - - sock = Sock(app) - - @sock.route('/ws/waterfall') - def waterfall_stream(ws): - """WebSocket endpoint for real-time waterfall streaming.""" - logger.info("WebSocket waterfall client connected") - - # Import app module for device claiming - import app as app_module - + mapping = { + 'rtlsdr': SDRType.RTL_SDR, + 'rtl_sdr': SDRType.RTL_SDR, + 'hackrf': SDRType.HACKRF, + 'limesdr': SDRType.LIME_SDR, + 'lime_sdr': SDRType.LIME_SDR, + 'airspy': SDRType.AIRSPY, + 'sdrplay': SDRType.SDRPLAY, + } + return mapping.get(sdr_type_str.lower(), SDRType.RTL_SDR) + + +def _build_dummy_device(device_index: int, sdr_type: SDRType) -> SDRDevice: + """Build a minimal SDRDevice for command building.""" + builder = SDRFactory.get_builder(sdr_type) + caps = builder.get_capabilities() + return SDRDevice( + sdr_type=sdr_type, + index=device_index, + name=f'{sdr_type.value}-{device_index}', + serial='N/A', + driver=sdr_type.value, + capabilities=caps, + ) + + +def init_waterfall_websocket(app: Flask): + """Initialize WebSocket waterfall streaming.""" + if not WEBSOCKET_AVAILABLE: + logger.warning("flask-sock not installed, WebSocket waterfall disabled") + return + + sock = Sock(app) + + @sock.route('/ws/waterfall') + def waterfall_stream(ws): + """WebSocket endpoint for real-time waterfall streaming.""" + logger.info("WebSocket waterfall client connected") + + # Import app module for device claiming + import app as app_module + iq_process = None reader_thread = None stop_event = threading.Event() @@ -342,47 +342,47 @@ def init_waterfall_websocket(app: Flask): capture_span_mhz = 0.0 # Queue for outgoing messages — only the main loop touches ws.send() send_queue = queue.Queue(maxsize=120) - - try: - while True: - # Drain send queue first (non-blocking) - while True: - try: - outgoing = send_queue.get_nowait() - except queue.Empty: - break - try: - ws.send(outgoing) - except Exception: - stop_event.set() - break - - try: + + try: + while True: + # Drain send queue first (non-blocking) + while True: + try: + outgoing = send_queue.get_nowait() + except queue.Empty: + break + try: + ws.send(outgoing) + except Exception: + stop_event.set() + break + + try: msg = ws.receive(timeout=0.01) - except Exception as e: - err = str(e).lower() - if "closed" in err: - break - if "timed out" not in err: - logger.error(f"WebSocket receive error: {e}") - continue - - if msg is None: - # simple-websocket returns None on timeout AND on - # close; check ws.connected to tell them apart. - if not ws.connected: - break - if stop_event.is_set(): - break - continue - - try: - data = json.loads(msg) - except (json.JSONDecodeError, TypeError): - continue - - cmd = data.get('cmd') - + except Exception as e: + err = str(e).lower() + if "closed" in err: + break + if "timed out" not in err: + logger.error(f"WebSocket receive error: {e}") + continue + + if msg is None: + # simple-websocket returns None on timeout AND on + # close; check ws.connected to tell them apart. + if not ws.connected: + break + if stop_event.is_set(): + break + continue + + try: + data = json.loads(msg) + except (json.JSONDecodeError, TypeError): + continue + + cmd = data.get('cmd') + if cmd == 'start': # Stop any existing capture was_restarting = iq_process is not None @@ -462,8 +462,16 @@ def init_waterfall_websocket(app: Flask): start_freq = center_freq_mhz - effective_span_mhz / 2 end_freq = center_freq_mhz + effective_span_mhz / 2 - # Claim the device - claim_err = app_module.claim_sdr_device(device_index, 'waterfall') + # Claim the device (retry when restarting to allow + # the kernel time to release the USB handle). + max_claim_attempts = 4 if was_restarting else 1 + claim_err = None + for _claim_attempt in range(max_claim_attempts): + claim_err = app_module.claim_sdr_device(device_index, 'waterfall') + if not claim_err: + break + if _claim_attempt < max_claim_attempts - 1: + time.sleep(0.4) if claim_err: ws.send(json.dumps({ 'status': 'error', @@ -716,33 +724,33 @@ def init_waterfall_websocket(app: Flask): reader_thread.join(timeout=2) reader_thread = None if iq_process: - safe_terminate(iq_process) - unregister_process(iq_process) - iq_process = None + safe_terminate(iq_process) + unregister_process(iq_process) + iq_process = None if claimed_device is not None: app_module.release_sdr_device(claimed_device) claimed_device = None _set_shared_capture_state(running=False) stop_event.clear() ws.send(json.dumps({'status': 'stopped'})) - - except Exception as e: - logger.info(f"WebSocket waterfall closed: {e}") + + except Exception as e: + logger.info(f"WebSocket waterfall closed: {e}") finally: # Cleanup stop_event.set() if reader_thread and reader_thread.is_alive(): reader_thread.join(timeout=2) - if iq_process: - safe_terminate(iq_process) - unregister_process(iq_process) + if iq_process: + safe_terminate(iq_process) + unregister_process(iq_process) if claimed_device is not None: app_module.release_sdr_device(claimed_device) _set_shared_capture_state(running=False) # Complete WebSocket close handshake, then shut down the # raw socket so Werkzeug cannot write its HTTP 200 response - # on top of the WebSocket stream (which browsers see as - # "Invalid frame header"). + # on top of the WebSocket stream (which browsers see as + # "Invalid frame header"). with suppress(Exception): ws.close() with suppress(Exception): diff --git a/static/js/modes/waterfall.js b/static/js/modes/waterfall.js index 4a42a23..71ebb4b 100644 --- a/static/js/modes/waterfall.js +++ b/static/js/modes/waterfall.js @@ -2488,6 +2488,17 @@ const Waterfall = (function () { } else if (msg.status === 'error') { _running = false; _scanStartPending = false; + _pendingSharedMonitorRearm = false; + // If the monitor was using the shared IQ stream that + // just failed, tear down the stale monitor state so + // the button becomes clickable again after restart. + if (_monitoring && _monitorSource === 'waterfall') { + clearTimeout(_monitorRetuneTimer); + _monitoring = false; + _monitorSource = 'process'; + _syncMonitorButtons(); + _setMonitorState('Monitor stopped (waterfall error)'); + } if (_scanRunning) { _scanAwaitingCapture = true; _setScanState(msg.message || 'Waterfall retune error, retrying...', true);