mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 06:40:00 -07:00
fix: waterfall device claim fails on frequency change due to USB release lag
When restarting capture for a new frequency, the USB handle from the just-killed process may not be released by the kernel in time for the rtl_test probe inside claim_sdr_device. Add retry logic (up to 4 attempts with 0.4s backoff) matching the pattern already used by the audio start endpoint. Also clean up stale shared-monitor state in the frontend error handler so the monitor button is not left disabled when the capture restart fails.
This commit is contained in:
@@ -11,14 +11,14 @@ from typing import Any
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from flask_sock import Sock
|
from flask_sock import Sock
|
||||||
WEBSOCKET_AVAILABLE = True
|
WEBSOCKET_AVAILABLE = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
WEBSOCKET_AVAILABLE = False
|
WEBSOCKET_AVAILABLE = False
|
||||||
Sock = None
|
Sock = None
|
||||||
|
|
||||||
from utils.logging import get_logger
|
from utils.logging import get_logger
|
||||||
from utils.process import register_process, safe_terminate, unregister_process
|
from utils.process import register_process, safe_terminate, unregister_process
|
||||||
from utils.sdr import SDRFactory, SDRType
|
from utils.sdr import SDRFactory, SDRType
|
||||||
@@ -46,7 +46,7 @@ _shared_state: dict[str, Any] = {
|
|||||||
'monitor_modulation': 'wfm',
|
'monitor_modulation': 'wfm',
|
||||||
'monitor_squelch': 0,
|
'monitor_squelch': 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Maximum bandwidth per SDR type (Hz)
|
# Maximum bandwidth per SDR type (Hz)
|
||||||
MAX_BANDWIDTH = {
|
MAX_BANDWIDTH = {
|
||||||
SDRType.RTL_SDR: 2400000,
|
SDRType.RTL_SDR: 2400000,
|
||||||
@@ -290,48 +290,48 @@ def _pick_sample_rate(span_hz: int, caps: SDRCapabilities, sdr_type: SDRType) ->
|
|||||||
|
|
||||||
def _resolve_sdr_type(sdr_type_str: str) -> SDRType:
|
def _resolve_sdr_type(sdr_type_str: str) -> SDRType:
|
||||||
"""Convert client sdr_type string to SDRType enum."""
|
"""Convert client sdr_type string to SDRType enum."""
|
||||||
mapping = {
|
mapping = {
|
||||||
'rtlsdr': SDRType.RTL_SDR,
|
'rtlsdr': SDRType.RTL_SDR,
|
||||||
'rtl_sdr': SDRType.RTL_SDR,
|
'rtl_sdr': SDRType.RTL_SDR,
|
||||||
'hackrf': SDRType.HACKRF,
|
'hackrf': SDRType.HACKRF,
|
||||||
'limesdr': SDRType.LIME_SDR,
|
'limesdr': SDRType.LIME_SDR,
|
||||||
'lime_sdr': SDRType.LIME_SDR,
|
'lime_sdr': SDRType.LIME_SDR,
|
||||||
'airspy': SDRType.AIRSPY,
|
'airspy': SDRType.AIRSPY,
|
||||||
'sdrplay': SDRType.SDRPLAY,
|
'sdrplay': SDRType.SDRPLAY,
|
||||||
}
|
}
|
||||||
return mapping.get(sdr_type_str.lower(), SDRType.RTL_SDR)
|
return mapping.get(sdr_type_str.lower(), SDRType.RTL_SDR)
|
||||||
|
|
||||||
|
|
||||||
def _build_dummy_device(device_index: int, sdr_type: SDRType) -> SDRDevice:
|
def _build_dummy_device(device_index: int, sdr_type: SDRType) -> SDRDevice:
|
||||||
"""Build a minimal SDRDevice for command building."""
|
"""Build a minimal SDRDevice for command building."""
|
||||||
builder = SDRFactory.get_builder(sdr_type)
|
builder = SDRFactory.get_builder(sdr_type)
|
||||||
caps = builder.get_capabilities()
|
caps = builder.get_capabilities()
|
||||||
return SDRDevice(
|
return SDRDevice(
|
||||||
sdr_type=sdr_type,
|
sdr_type=sdr_type,
|
||||||
index=device_index,
|
index=device_index,
|
||||||
name=f'{sdr_type.value}-{device_index}',
|
name=f'{sdr_type.value}-{device_index}',
|
||||||
serial='N/A',
|
serial='N/A',
|
||||||
driver=sdr_type.value,
|
driver=sdr_type.value,
|
||||||
capabilities=caps,
|
capabilities=caps,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def init_waterfall_websocket(app: Flask):
|
def init_waterfall_websocket(app: Flask):
|
||||||
"""Initialize WebSocket waterfall streaming."""
|
"""Initialize WebSocket waterfall streaming."""
|
||||||
if not WEBSOCKET_AVAILABLE:
|
if not WEBSOCKET_AVAILABLE:
|
||||||
logger.warning("flask-sock not installed, WebSocket waterfall disabled")
|
logger.warning("flask-sock not installed, WebSocket waterfall disabled")
|
||||||
return
|
return
|
||||||
|
|
||||||
sock = Sock(app)
|
sock = Sock(app)
|
||||||
|
|
||||||
@sock.route('/ws/waterfall')
|
@sock.route('/ws/waterfall')
|
||||||
def waterfall_stream(ws):
|
def waterfall_stream(ws):
|
||||||
"""WebSocket endpoint for real-time waterfall streaming."""
|
"""WebSocket endpoint for real-time waterfall streaming."""
|
||||||
logger.info("WebSocket waterfall client connected")
|
logger.info("WebSocket waterfall client connected")
|
||||||
|
|
||||||
# Import app module for device claiming
|
# Import app module for device claiming
|
||||||
import app as app_module
|
import app as app_module
|
||||||
|
|
||||||
iq_process = None
|
iq_process = None
|
||||||
reader_thread = None
|
reader_thread = None
|
||||||
stop_event = threading.Event()
|
stop_event = threading.Event()
|
||||||
@@ -342,47 +342,47 @@ def init_waterfall_websocket(app: Flask):
|
|||||||
capture_span_mhz = 0.0
|
capture_span_mhz = 0.0
|
||||||
# Queue for outgoing messages — only the main loop touches ws.send()
|
# Queue for outgoing messages — only the main loop touches ws.send()
|
||||||
send_queue = queue.Queue(maxsize=120)
|
send_queue = queue.Queue(maxsize=120)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
# Drain send queue first (non-blocking)
|
# Drain send queue first (non-blocking)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
outgoing = send_queue.get_nowait()
|
outgoing = send_queue.get_nowait()
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
ws.send(outgoing)
|
ws.send(outgoing)
|
||||||
except Exception:
|
except Exception:
|
||||||
stop_event.set()
|
stop_event.set()
|
||||||
break
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
msg = ws.receive(timeout=0.01)
|
msg = ws.receive(timeout=0.01)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err = str(e).lower()
|
err = str(e).lower()
|
||||||
if "closed" in err:
|
if "closed" in err:
|
||||||
break
|
break
|
||||||
if "timed out" not in err:
|
if "timed out" not in err:
|
||||||
logger.error(f"WebSocket receive error: {e}")
|
logger.error(f"WebSocket receive error: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if msg is None:
|
if msg is None:
|
||||||
# simple-websocket returns None on timeout AND on
|
# simple-websocket returns None on timeout AND on
|
||||||
# close; check ws.connected to tell them apart.
|
# close; check ws.connected to tell them apart.
|
||||||
if not ws.connected:
|
if not ws.connected:
|
||||||
break
|
break
|
||||||
if stop_event.is_set():
|
if stop_event.is_set():
|
||||||
break
|
break
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = json.loads(msg)
|
data = json.loads(msg)
|
||||||
except (json.JSONDecodeError, TypeError):
|
except (json.JSONDecodeError, TypeError):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
cmd = data.get('cmd')
|
cmd = data.get('cmd')
|
||||||
|
|
||||||
if cmd == 'start':
|
if cmd == 'start':
|
||||||
# Stop any existing capture
|
# Stop any existing capture
|
||||||
was_restarting = iq_process is not None
|
was_restarting = iq_process is not None
|
||||||
@@ -462,8 +462,16 @@ def init_waterfall_websocket(app: Flask):
|
|||||||
start_freq = center_freq_mhz - effective_span_mhz / 2
|
start_freq = center_freq_mhz - effective_span_mhz / 2
|
||||||
end_freq = center_freq_mhz + effective_span_mhz / 2
|
end_freq = center_freq_mhz + effective_span_mhz / 2
|
||||||
|
|
||||||
# Claim the device
|
# Claim the device (retry when restarting to allow
|
||||||
claim_err = app_module.claim_sdr_device(device_index, 'waterfall')
|
# the kernel time to release the USB handle).
|
||||||
|
max_claim_attempts = 4 if was_restarting else 1
|
||||||
|
claim_err = None
|
||||||
|
for _claim_attempt in range(max_claim_attempts):
|
||||||
|
claim_err = app_module.claim_sdr_device(device_index, 'waterfall')
|
||||||
|
if not claim_err:
|
||||||
|
break
|
||||||
|
if _claim_attempt < max_claim_attempts - 1:
|
||||||
|
time.sleep(0.4)
|
||||||
if claim_err:
|
if claim_err:
|
||||||
ws.send(json.dumps({
|
ws.send(json.dumps({
|
||||||
'status': 'error',
|
'status': 'error',
|
||||||
@@ -716,33 +724,33 @@ def init_waterfall_websocket(app: Flask):
|
|||||||
reader_thread.join(timeout=2)
|
reader_thread.join(timeout=2)
|
||||||
reader_thread = None
|
reader_thread = None
|
||||||
if iq_process:
|
if iq_process:
|
||||||
safe_terminate(iq_process)
|
safe_terminate(iq_process)
|
||||||
unregister_process(iq_process)
|
unregister_process(iq_process)
|
||||||
iq_process = None
|
iq_process = None
|
||||||
if claimed_device is not None:
|
if claimed_device is not None:
|
||||||
app_module.release_sdr_device(claimed_device)
|
app_module.release_sdr_device(claimed_device)
|
||||||
claimed_device = None
|
claimed_device = None
|
||||||
_set_shared_capture_state(running=False)
|
_set_shared_capture_state(running=False)
|
||||||
stop_event.clear()
|
stop_event.clear()
|
||||||
ws.send(json.dumps({'status': 'stopped'}))
|
ws.send(json.dumps({'status': 'stopped'}))
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(f"WebSocket waterfall closed: {e}")
|
logger.info(f"WebSocket waterfall closed: {e}")
|
||||||
finally:
|
finally:
|
||||||
# Cleanup
|
# Cleanup
|
||||||
stop_event.set()
|
stop_event.set()
|
||||||
if reader_thread and reader_thread.is_alive():
|
if reader_thread and reader_thread.is_alive():
|
||||||
reader_thread.join(timeout=2)
|
reader_thread.join(timeout=2)
|
||||||
if iq_process:
|
if iq_process:
|
||||||
safe_terminate(iq_process)
|
safe_terminate(iq_process)
|
||||||
unregister_process(iq_process)
|
unregister_process(iq_process)
|
||||||
if claimed_device is not None:
|
if claimed_device is not None:
|
||||||
app_module.release_sdr_device(claimed_device)
|
app_module.release_sdr_device(claimed_device)
|
||||||
_set_shared_capture_state(running=False)
|
_set_shared_capture_state(running=False)
|
||||||
# Complete WebSocket close handshake, then shut down the
|
# Complete WebSocket close handshake, then shut down the
|
||||||
# raw socket so Werkzeug cannot write its HTTP 200 response
|
# raw socket so Werkzeug cannot write its HTTP 200 response
|
||||||
# on top of the WebSocket stream (which browsers see as
|
# on top of the WebSocket stream (which browsers see as
|
||||||
# "Invalid frame header").
|
# "Invalid frame header").
|
||||||
with suppress(Exception):
|
with suppress(Exception):
|
||||||
ws.close()
|
ws.close()
|
||||||
with suppress(Exception):
|
with suppress(Exception):
|
||||||
|
|||||||
@@ -2488,6 +2488,17 @@ const Waterfall = (function () {
|
|||||||
} else if (msg.status === 'error') {
|
} else if (msg.status === 'error') {
|
||||||
_running = false;
|
_running = false;
|
||||||
_scanStartPending = false;
|
_scanStartPending = false;
|
||||||
|
_pendingSharedMonitorRearm = false;
|
||||||
|
// If the monitor was using the shared IQ stream that
|
||||||
|
// just failed, tear down the stale monitor state so
|
||||||
|
// the button becomes clickable again after restart.
|
||||||
|
if (_monitoring && _monitorSource === 'waterfall') {
|
||||||
|
clearTimeout(_monitorRetuneTimer);
|
||||||
|
_monitoring = false;
|
||||||
|
_monitorSource = 'process';
|
||||||
|
_syncMonitorButtons();
|
||||||
|
_setMonitorState('Monitor stopped (waterfall error)');
|
||||||
|
}
|
||||||
if (_scanRunning) {
|
if (_scanRunning) {
|
||||||
_scanAwaitingCapture = true;
|
_scanAwaitingCapture = true;
|
||||||
_setScanState(msg.message || 'Waterfall retune error, retrying...', true);
|
_setScanState(msg.message || 'Waterfall retune error, retrying...', true);
|
||||||
|
|||||||
Reference in New Issue
Block a user