Fix SDR device lock-up from unreleased device registry on process crash

Stream threads for sensor, pager, acars, rtlamr, dmr, and dsc modes
never called release_sdr_device() when their SDR process crashed,
leaving devices permanently locked in the registry. Also fixes orphaned
companion processes (rtl_fm, rtl_tcp) not being killed on crash, start
path failures leaking processes, DMR stop handler missing lock, and
listening post/audio websocket pkill nuking all system-wide rtl_fm
processes. Wires up register_process()/unregister_process() so the
atexit/signal cleanup safety net actually works, and adds rtl_tcp,
rtl_power, rtlamr, ffmpeg to the killall endpoint.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-07 15:18:15 +00:00
parent b208576068
commit 60d3cff5e7
9 changed files with 196 additions and 45 deletions

3
app.py
View File

@@ -671,7 +671,8 @@ def kill_all() -> Response:
'rtl_fm', 'multimon-ng', 'rtl_433',
'airodump-ng', 'aireplay-ng', 'airmon-ng',
'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher',
'hcitool', 'bluetoothctl', 'dsd'
'hcitool', 'bluetoothctl', 'dsd',
'rtl_tcp', 'rtl_power', 'rtlamr', 'ffmpeg',
]
for proc in processes_to_kill:

View File

@@ -27,6 +27,7 @@ from utils.constants import (
SSE_QUEUE_TIMEOUT,
PROCESS_START_WAIT,
)
from utils.process import register_process, unregister_process
acars_bp = Blueprint('acars', __name__, url_prefix='/acars')
@@ -144,9 +145,24 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) -
logger.error(f"ACARS stream error: {e}")
app_module.acars_queue.put({'type': 'error', 'message': str(e)})
finally:
global acars_active_device
# Ensure process is terminated
try:
process.terminate()
process.wait(timeout=2)
except Exception:
try:
process.kill()
except Exception:
pass
unregister_process(process)
app_module.acars_queue.put({'type': 'status', 'status': 'stopped'})
with app_module.acars_lock:
app_module.acars_process = None
# Release SDR device
if acars_active_device is not None:
app_module.release_sdr_device(acars_active_device)
acars_active_device = None
@acars_bp.route('/tools')
@@ -311,6 +327,7 @@ def start_acars() -> Response:
return jsonify({'status': 'error', 'message': error_msg}), 500
app_module.acars_process = process
register_process(process)
# Start output streaming thread
thread = threading.Thread(

View File

@@ -66,12 +66,6 @@ def kill_audio_processes():
pass
rtl_process = None
# Kill any orphaned processes
try:
subprocess.run(['pkill', '-9', '-f', 'rtl_fm'], capture_output=True, timeout=1)
except:
pass
time.sleep(0.3)
@@ -228,13 +222,13 @@ def init_audio_websocket(app: Flask):
except TimeoutError:
pass
except Exception as e:
msg = str(e).lower()
if "connection closed" in msg:
logger.info("WebSocket closed by client")
break
if "timed out" not in msg:
logger.error(f"WebSocket receive error: {e}")
except Exception as e:
msg = str(e).lower()
if "connection closed" in msg:
logger.info("WebSocket closed by client")
break
if "timed out" not in msg:
logger.error(f"WebSocket receive error: {e}")
# Stream audio data if active
if streaming and proc and proc.poll() is None:

View File

@@ -18,6 +18,7 @@ from flask import Blueprint, jsonify, request, Response
import app as app_module
from utils.logging import get_logger
from utils.sse import format_sse
from utils.process import register_process, unregister_process
from utils.constants import (
SSE_QUEUE_TIMEOUT,
SSE_KEEPALIVE_INTERVAL,
@@ -244,6 +245,7 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop
except Exception as e:
logger.error(f"DSD stream error: {e}")
finally:
global dmr_active_device, dmr_rtl_process, dmr_dsd_process
dmr_running = False
# Capture exit info for diagnostics
rc = dsd_process.poll()
@@ -258,7 +260,26 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop
except Exception:
pass
logger.warning(f"DSD process exited with code {rc}: {detail}")
# Cleanup both processes
for proc in [dsd_process, rtl_process]:
if proc and proc.poll() is None:
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
proc.kill()
except Exception:
pass
if proc:
unregister_process(proc)
dmr_rtl_process = None
dmr_dsd_process = None
_queue_put({'type': 'status', 'text': reason, 'exit_code': rc, 'detail': detail})
# Release SDR device
if dmr_active_device is not None:
app_module.release_sdr_device(dmr_active_device)
dmr_active_device = None
logger.info("DSD stream thread stopped")
@@ -354,6 +375,7 @@ def start_dmr() -> Response:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
register_process(dmr_rtl_process)
dmr_dsd_process = subprocess.Popen(
dsd_cmd,
@@ -361,6 +383,7 @@ def start_dmr() -> Response:
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
register_process(dmr_dsd_process)
# Allow rtl_fm to send directly to dsd
dmr_rtl_process.stdout.close()
@@ -428,21 +451,24 @@ def stop_dmr() -> Response:
"""Stop digital voice decoding."""
global dmr_rtl_process, dmr_dsd_process, dmr_running, dmr_active_device
dmr_running = False
with dmr_lock:
dmr_running = False
for proc in [dmr_dsd_process, dmr_rtl_process]:
if proc and proc.poll() is None:
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
for proc in [dmr_dsd_process, dmr_rtl_process]:
if proc and proc.poll() is None:
try:
proc.kill()
proc.terminate()
proc.wait(timeout=2)
except Exception:
pass
try:
proc.kill()
except Exception:
pass
if proc:
unregister_process(proc)
dmr_rtl_process = None
dmr_dsd_process = None
dmr_rtl_process = None
dmr_dsd_process = None
if dmr_active_device is not None:
app_module.release_sdr_device(dmr_active_device)

View File

@@ -39,6 +39,7 @@ from utils.sse import format_sse
from utils.validation import validate_device_index, validate_gain
from utils.sdr import SDRFactory, SDRType
from utils.dependencies import get_tool_path
from utils.process import register_process, unregister_process
logger = logging.getLogger('intercept.dsc')
@@ -169,17 +170,34 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non
'error': str(e)
})
finally:
global dsc_active_device
try:
os.close(master_fd)
except OSError:
pass
decoder_process.wait()
dsc_running = False
# Cleanup both processes
with app_module.dsc_lock:
rtl_proc = app_module.dsc_rtl_process
for proc in [rtl_proc, decoder_process]:
if proc:
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
proc.kill()
except Exception:
pass
unregister_process(proc)
app_module.dsc_queue.put({'type': 'status', 'status': 'stopped'})
with app_module.dsc_lock:
app_module.dsc_process = None
app_module.dsc_rtl_process = None
# Release SDR device
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)
dsc_active_device = None
def _store_critical_alert(msg: dict) -> None:
@@ -362,6 +380,7 @@ def start_decoding() -> Response:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(rtl_process)
# Start stderr monitor thread
stderr_thread = threading.Thread(
@@ -382,6 +401,7 @@ def start_decoding() -> Response:
stderr=slave_fd,
close_fds=True
)
register_process(decoder_process)
os.close(slave_fd)
rtl_process.stdout.close()
@@ -408,6 +428,15 @@ def start_decoding() -> Response:
})
except FileNotFoundError as e:
# Kill orphaned rtl_fm process
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
# Release device on failure
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)
@@ -417,6 +446,15 @@ def start_decoding() -> Response:
'message': f'Tool not found: {e.filename}'
}), 400
except Exception as e:
# Kill orphaned rtl_fm process if it was started
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
# Release device on failure
if dsc_active_device is not None:
app_module.release_sdr_device(dsc_active_device)

View File

@@ -891,17 +891,6 @@ def _stop_audio_stream_internal():
audio_process = None
audio_rtl_process = None
# Kill any orphaned rtl_fm, rtl_power, and ffmpeg processes
for proc_pattern in ['rtl_fm', 'rtl_power']:
try:
subprocess.run(['pkill', '-9', proc_pattern], capture_output=True, timeout=0.5)
except Exception:
pass
try:
subprocess.run(['pkill', '-9', '-f', 'ffmpeg.*pipe:0'], capture_output=True, timeout=0.5)
except Exception:
pass
# Pause for SDR device USB interface to be released by kernel
time.sleep(1.0)

View File

@@ -23,7 +23,7 @@ from utils.validation import (
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import format_sse
from utils.process import safe_terminate, register_process
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType, SDRValidationError
from utils.dependencies import get_tool_path
@@ -146,14 +146,32 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None:
except Exception as e:
app_module.output_queue.put({'type': 'error', 'text': str(e)})
finally:
global pager_active_device
try:
os.close(master_fd)
except OSError:
pass
process.wait()
# Cleanup companion rtl_fm process and decoder
with app_module.process_lock:
rtl_proc = getattr(app_module.current_process, '_rtl_process', None)
for proc in [rtl_proc, process]:
if proc:
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
proc.kill()
except Exception:
pass
unregister_process(proc)
app_module.output_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.process_lock:
app_module.current_process = None
# Release SDR device
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
pager_active_device = None
@pager_bp.route('/start', methods=['POST'])
@@ -281,6 +299,7 @@ def start_decoding() -> Response:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(rtl_process)
# Start a thread to monitor rtl_fm stderr for errors
def monitor_rtl_stderr():
@@ -304,6 +323,7 @@ def start_decoding() -> Response:
stderr=slave_fd,
close_fds=True
)
register_process(multimon_process)
os.close(slave_fd)
rtl_process.stdout.close()
@@ -322,12 +342,30 @@ def start_decoding() -> Response:
return jsonify({'status': 'started', 'command': full_cmd})
except FileNotFoundError as e:
# Kill orphaned rtl_fm process
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)
pager_active_device = None
return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'})
except Exception as e:
# Kill orphaned rtl_fm process if it was started
try:
rtl_process.terminate()
rtl_process.wait(timeout=2)
except Exception:
try:
rtl_process.kill()
except Exception:
pass
# Release device on failure
if pager_active_device is not None:
app_module.release_sdr_device(pager_active_device)

View File

@@ -18,7 +18,7 @@ from utils.validation import (
validate_frequency, validate_device_index, validate_gain, validate_ppm
)
from utils.sse import format_sse
from utils.process import safe_terminate, register_process
from utils.process import safe_terminate, register_process, unregister_process
rtlamr_bp = Blueprint('rtlamr', __name__)
@@ -61,10 +61,37 @@ def stream_rtlamr_output(process: subprocess.Popen[bytes]) -> None:
except Exception as e:
app_module.rtlamr_queue.put({'type': 'error', 'text': str(e)})
finally:
process.wait()
global rtl_tcp_process, rtlamr_active_device
# Ensure rtlamr process is terminated
try:
process.terminate()
process.wait(timeout=2)
except Exception:
try:
process.kill()
except Exception:
pass
unregister_process(process)
# Kill companion rtl_tcp process
with rtl_tcp_lock:
if rtl_tcp_process:
try:
rtl_tcp_process.terminate()
rtl_tcp_process.wait(timeout=2)
except Exception:
try:
rtl_tcp_process.kill()
except Exception:
pass
unregister_process(rtl_tcp_process)
rtl_tcp_process = None
app_module.rtlamr_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.rtlamr_lock:
app_module.rtlamr_process = None
# Release SDR device
if rtlamr_active_device is not None:
app_module.release_sdr_device(rtlamr_active_device)
rtlamr_active_device = None
@rtlamr_bp.route('/start_rtlamr', methods=['POST'])
@@ -133,7 +160,8 @@ def start_rtlamr() -> Response:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(rtl_tcp_process)
# Wait a moment for rtl_tcp to start
time.sleep(3)
@@ -141,6 +169,10 @@ def start_rtlamr() -> Response:
app_module.rtlamr_queue.put({'type': 'info', 'text': f'rtl_tcp: {" ".join(rtl_tcp_cmd)}'})
except Exception as e:
logger.error(f"Failed to start rtl_tcp: {e}")
# Release SDR device on rtl_tcp failure
if rtlamr_active_device is not None:
app_module.release_sdr_device(rtlamr_active_device)
rtlamr_active_device = None
return jsonify({'status': 'error', 'message': f'Failed to start rtl_tcp: {e}'}), 500
# Build rtlamr command
@@ -174,6 +206,7 @@ def start_rtlamr() -> Response:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(app_module.rtlamr_process)
# Start output thread
thread = threading.Thread(target=stream_rtlamr_output, args=(app_module.rtlamr_process,))

View File

@@ -19,7 +19,7 @@ from utils.validation import (
validate_rtl_tcp_host, validate_rtl_tcp_port
)
from utils.sse import format_sse
from utils.process import safe_terminate, register_process
from utils.process import safe_terminate, register_process, unregister_process
from utils.sdr import SDRFactory, SDRType
sensor_bp = Blueprint('sensor', __name__)
@@ -59,10 +59,24 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None:
except Exception as e:
app_module.sensor_queue.put({'type': 'error', 'text': str(e)})
finally:
process.wait()
global sensor_active_device
# Ensure process is terminated
try:
process.terminate()
process.wait(timeout=2)
except Exception:
try:
process.kill()
except Exception:
pass
unregister_process(process)
app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'})
with app_module.sensor_lock:
app_module.sensor_process = None
# Release SDR device
if sensor_active_device is not None:
app_module.release_sdr_device(sensor_active_device)
sensor_active_device = None
@sensor_bp.route('/start_sensor', methods=['POST'])
@@ -149,6 +163,7 @@ def start_sensor() -> Response:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
register_process(app_module.sensor_process)
# Start output thread
thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,))