diff --git a/app.py b/app.py index 5891d3d..cf1934a 100644 --- a/app.py +++ b/app.py @@ -671,7 +671,8 @@ def kill_all() -> Response: 'rtl_fm', 'multimon-ng', 'rtl_433', 'airodump-ng', 'aireplay-ng', 'airmon-ng', 'dump1090', 'acarsdec', 'direwolf', 'AIS-catcher', - 'hcitool', 'bluetoothctl', 'dsd' + 'hcitool', 'bluetoothctl', 'dsd', + 'rtl_tcp', 'rtl_power', 'rtlamr', 'ffmpeg', ] for proc in processes_to_kill: diff --git a/routes/acars.py b/routes/acars.py index 160ecd9..1615293 100644 --- a/routes/acars.py +++ b/routes/acars.py @@ -27,6 +27,7 @@ from utils.constants import ( SSE_QUEUE_TIMEOUT, PROCESS_START_WAIT, ) +from utils.process import register_process, unregister_process acars_bp = Blueprint('acars', __name__, url_prefix='/acars') @@ -144,9 +145,24 @@ def stream_acars_output(process: subprocess.Popen, is_text_mode: bool = False) - logger.error(f"ACARS stream error: {e}") app_module.acars_queue.put({'type': 'error', 'message': str(e)}) finally: + global acars_active_device + # Ensure process is terminated + try: + process.terminate() + process.wait(timeout=2) + except Exception: + try: + process.kill() + except Exception: + pass + unregister_process(process) app_module.acars_queue.put({'type': 'status', 'status': 'stopped'}) with app_module.acars_lock: app_module.acars_process = None + # Release SDR device + if acars_active_device is not None: + app_module.release_sdr_device(acars_active_device) + acars_active_device = None @acars_bp.route('/tools') @@ -311,6 +327,7 @@ def start_acars() -> Response: return jsonify({'status': 'error', 'message': error_msg}), 500 app_module.acars_process = process + register_process(process) # Start output streaming thread thread = threading.Thread( diff --git a/routes/audio_websocket.py b/routes/audio_websocket.py index 17abaa2..6d70d0b 100644 --- a/routes/audio_websocket.py +++ b/routes/audio_websocket.py @@ -66,12 +66,6 @@ def kill_audio_processes(): pass rtl_process = None - # Kill any orphaned processes - try: - subprocess.run(['pkill', '-9', '-f', 'rtl_fm'], capture_output=True, timeout=1) - except: - pass - time.sleep(0.3) @@ -228,13 +222,13 @@ def init_audio_websocket(app: Flask): except TimeoutError: pass - except Exception as e: - msg = str(e).lower() - if "connection closed" in msg: - logger.info("WebSocket closed by client") - break - if "timed out" not in msg: - logger.error(f"WebSocket receive error: {e}") + except Exception as e: + msg = str(e).lower() + if "connection closed" in msg: + logger.info("WebSocket closed by client") + break + if "timed out" not in msg: + logger.error(f"WebSocket receive error: {e}") # Stream audio data if active if streaming and proc and proc.poll() is None: diff --git a/routes/dmr.py b/routes/dmr.py index f02a644..8770583 100644 --- a/routes/dmr.py +++ b/routes/dmr.py @@ -18,6 +18,7 @@ from flask import Blueprint, jsonify, request, Response import app as app_module from utils.logging import get_logger from utils.sse import format_sse +from utils.process import register_process, unregister_process from utils.constants import ( SSE_QUEUE_TIMEOUT, SSE_KEEPALIVE_INTERVAL, @@ -244,6 +245,7 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop except Exception as e: logger.error(f"DSD stream error: {e}") finally: + global dmr_active_device, dmr_rtl_process, dmr_dsd_process dmr_running = False # Capture exit info for diagnostics rc = dsd_process.poll() @@ -258,7 +260,26 @@ def stream_dsd_output(rtl_process: subprocess.Popen, dsd_process: subprocess.Pop except Exception: pass logger.warning(f"DSD process exited with code {rc}: {detail}") + # Cleanup both processes + for proc in [dsd_process, rtl_process]: + if proc and proc.poll() is None: + try: + proc.terminate() + proc.wait(timeout=2) + except Exception: + try: + proc.kill() + except Exception: + pass + if proc: + unregister_process(proc) + dmr_rtl_process = None + dmr_dsd_process = None _queue_put({'type': 'status', 'text': reason, 'exit_code': rc, 'detail': detail}) + # Release SDR device + if dmr_active_device is not None: + app_module.release_sdr_device(dmr_active_device) + dmr_active_device = None logger.info("DSD stream thread stopped") @@ -354,6 +375,7 @@ def start_dmr() -> Response: stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) + register_process(dmr_rtl_process) dmr_dsd_process = subprocess.Popen( dsd_cmd, @@ -361,6 +383,7 @@ def start_dmr() -> Response: stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) + register_process(dmr_dsd_process) # Allow rtl_fm to send directly to dsd dmr_rtl_process.stdout.close() @@ -428,21 +451,24 @@ def stop_dmr() -> Response: """Stop digital voice decoding.""" global dmr_rtl_process, dmr_dsd_process, dmr_running, dmr_active_device - dmr_running = False + with dmr_lock: + dmr_running = False - for proc in [dmr_dsd_process, dmr_rtl_process]: - if proc and proc.poll() is None: - try: - proc.terminate() - proc.wait(timeout=2) - except Exception: + for proc in [dmr_dsd_process, dmr_rtl_process]: + if proc and proc.poll() is None: try: - proc.kill() + proc.terminate() + proc.wait(timeout=2) except Exception: - pass + try: + proc.kill() + except Exception: + pass + if proc: + unregister_process(proc) - dmr_rtl_process = None - dmr_dsd_process = None + dmr_rtl_process = None + dmr_dsd_process = None if dmr_active_device is not None: app_module.release_sdr_device(dmr_active_device) diff --git a/routes/dsc.py b/routes/dsc.py index 913b1ba..92853ed 100644 --- a/routes/dsc.py +++ b/routes/dsc.py @@ -39,6 +39,7 @@ from utils.sse import format_sse from utils.validation import validate_device_index, validate_gain from utils.sdr import SDRFactory, SDRType from utils.dependencies import get_tool_path +from utils.process import register_process, unregister_process logger = logging.getLogger('intercept.dsc') @@ -169,17 +170,34 @@ def stream_dsc_decoder(master_fd: int, decoder_process: subprocess.Popen) -> Non 'error': str(e) }) finally: + global dsc_active_device try: os.close(master_fd) except OSError: pass - decoder_process.wait() dsc_running = False + # Cleanup both processes + with app_module.dsc_lock: + rtl_proc = app_module.dsc_rtl_process + for proc in [rtl_proc, decoder_process]: + if proc: + try: + proc.terminate() + proc.wait(timeout=2) + except Exception: + try: + proc.kill() + except Exception: + pass + unregister_process(proc) app_module.dsc_queue.put({'type': 'status', 'status': 'stopped'}) - with app_module.dsc_lock: app_module.dsc_process = None app_module.dsc_rtl_process = None + # Release SDR device + if dsc_active_device is not None: + app_module.release_sdr_device(dsc_active_device) + dsc_active_device = None def _store_critical_alert(msg: dict) -> None: @@ -362,6 +380,7 @@ def start_decoding() -> Response: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + register_process(rtl_process) # Start stderr monitor thread stderr_thread = threading.Thread( @@ -382,6 +401,7 @@ def start_decoding() -> Response: stderr=slave_fd, close_fds=True ) + register_process(decoder_process) os.close(slave_fd) rtl_process.stdout.close() @@ -408,6 +428,15 @@ def start_decoding() -> Response: }) except FileNotFoundError as e: + # Kill orphaned rtl_fm process + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass # Release device on failure if dsc_active_device is not None: app_module.release_sdr_device(dsc_active_device) @@ -417,6 +446,15 @@ def start_decoding() -> Response: 'message': f'Tool not found: {e.filename}' }), 400 except Exception as e: + # Kill orphaned rtl_fm process if it was started + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass # Release device on failure if dsc_active_device is not None: app_module.release_sdr_device(dsc_active_device) diff --git a/routes/listening_post.py b/routes/listening_post.py index a2cccfc..d023207 100644 --- a/routes/listening_post.py +++ b/routes/listening_post.py @@ -891,17 +891,6 @@ def _stop_audio_stream_internal(): audio_process = None audio_rtl_process = None - # Kill any orphaned rtl_fm, rtl_power, and ffmpeg processes - for proc_pattern in ['rtl_fm', 'rtl_power']: - try: - subprocess.run(['pkill', '-9', proc_pattern], capture_output=True, timeout=0.5) - except Exception: - pass - try: - subprocess.run(['pkill', '-9', '-f', 'ffmpeg.*pipe:0'], capture_output=True, timeout=0.5) - except Exception: - pass - # Pause for SDR device USB interface to be released by kernel time.sleep(1.0) diff --git a/routes/pager.py b/routes/pager.py index 8a94d08..5dd1efe 100644 --- a/routes/pager.py +++ b/routes/pager.py @@ -23,7 +23,7 @@ from utils.validation import ( validate_rtl_tcp_host, validate_rtl_tcp_port ) from utils.sse import format_sse -from utils.process import safe_terminate, register_process +from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType, SDRValidationError from utils.dependencies import get_tool_path @@ -146,14 +146,32 @@ def stream_decoder(master_fd: int, process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.output_queue.put({'type': 'error', 'text': str(e)}) finally: + global pager_active_device try: os.close(master_fd) except OSError: pass - process.wait() + # Cleanup companion rtl_fm process and decoder + with app_module.process_lock: + rtl_proc = getattr(app_module.current_process, '_rtl_process', None) + for proc in [rtl_proc, process]: + if proc: + try: + proc.terminate() + proc.wait(timeout=2) + except Exception: + try: + proc.kill() + except Exception: + pass + unregister_process(proc) app_module.output_queue.put({'type': 'status', 'text': 'stopped'}) with app_module.process_lock: app_module.current_process = None + # Release SDR device + if pager_active_device is not None: + app_module.release_sdr_device(pager_active_device) + pager_active_device = None @pager_bp.route('/start', methods=['POST']) @@ -281,6 +299,7 @@ def start_decoding() -> Response: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + register_process(rtl_process) # Start a thread to monitor rtl_fm stderr for errors def monitor_rtl_stderr(): @@ -304,6 +323,7 @@ def start_decoding() -> Response: stderr=slave_fd, close_fds=True ) + register_process(multimon_process) os.close(slave_fd) rtl_process.stdout.close() @@ -322,12 +342,30 @@ def start_decoding() -> Response: return jsonify({'status': 'started', 'command': full_cmd}) except FileNotFoundError as e: + # Kill orphaned rtl_fm process + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass # Release device on failure if pager_active_device is not None: app_module.release_sdr_device(pager_active_device) pager_active_device = None return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}) except Exception as e: + # Kill orphaned rtl_fm process if it was started + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + try: + rtl_process.kill() + except Exception: + pass # Release device on failure if pager_active_device is not None: app_module.release_sdr_device(pager_active_device) diff --git a/routes/rtlamr.py b/routes/rtlamr.py index a269d67..8d31167 100644 --- a/routes/rtlamr.py +++ b/routes/rtlamr.py @@ -18,7 +18,7 @@ from utils.validation import ( validate_frequency, validate_device_index, validate_gain, validate_ppm ) from utils.sse import format_sse -from utils.process import safe_terminate, register_process +from utils.process import safe_terminate, register_process, unregister_process rtlamr_bp = Blueprint('rtlamr', __name__) @@ -61,10 +61,37 @@ def stream_rtlamr_output(process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.rtlamr_queue.put({'type': 'error', 'text': str(e)}) finally: - process.wait() + global rtl_tcp_process, rtlamr_active_device + # Ensure rtlamr process is terminated + try: + process.terminate() + process.wait(timeout=2) + except Exception: + try: + process.kill() + except Exception: + pass + unregister_process(process) + # Kill companion rtl_tcp process + with rtl_tcp_lock: + if rtl_tcp_process: + try: + rtl_tcp_process.terminate() + rtl_tcp_process.wait(timeout=2) + except Exception: + try: + rtl_tcp_process.kill() + except Exception: + pass + unregister_process(rtl_tcp_process) + rtl_tcp_process = None app_module.rtlamr_queue.put({'type': 'status', 'text': 'stopped'}) with app_module.rtlamr_lock: app_module.rtlamr_process = None + # Release SDR device + if rtlamr_active_device is not None: + app_module.release_sdr_device(rtlamr_active_device) + rtlamr_active_device = None @rtlamr_bp.route('/start_rtlamr', methods=['POST']) @@ -133,7 +160,8 @@ def start_rtlamr() -> Response: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - + register_process(rtl_tcp_process) + # Wait a moment for rtl_tcp to start time.sleep(3) @@ -141,6 +169,10 @@ def start_rtlamr() -> Response: app_module.rtlamr_queue.put({'type': 'info', 'text': f'rtl_tcp: {" ".join(rtl_tcp_cmd)}'}) except Exception as e: logger.error(f"Failed to start rtl_tcp: {e}") + # Release SDR device on rtl_tcp failure + if rtlamr_active_device is not None: + app_module.release_sdr_device(rtlamr_active_device) + rtlamr_active_device = None return jsonify({'status': 'error', 'message': f'Failed to start rtl_tcp: {e}'}), 500 # Build rtlamr command @@ -174,6 +206,7 @@ def start_rtlamr() -> Response: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + register_process(app_module.rtlamr_process) # Start output thread thread = threading.Thread(target=stream_rtlamr_output, args=(app_module.rtlamr_process,)) diff --git a/routes/sensor.py b/routes/sensor.py index e6dec53..da99c8f 100644 --- a/routes/sensor.py +++ b/routes/sensor.py @@ -19,7 +19,7 @@ from utils.validation import ( validate_rtl_tcp_host, validate_rtl_tcp_port ) from utils.sse import format_sse -from utils.process import safe_terminate, register_process +from utils.process import safe_terminate, register_process, unregister_process from utils.sdr import SDRFactory, SDRType sensor_bp = Blueprint('sensor', __name__) @@ -59,10 +59,24 @@ def stream_sensor_output(process: subprocess.Popen[bytes]) -> None: except Exception as e: app_module.sensor_queue.put({'type': 'error', 'text': str(e)}) finally: - process.wait() + global sensor_active_device + # Ensure process is terminated + try: + process.terminate() + process.wait(timeout=2) + except Exception: + try: + process.kill() + except Exception: + pass + unregister_process(process) app_module.sensor_queue.put({'type': 'status', 'text': 'stopped'}) with app_module.sensor_lock: app_module.sensor_process = None + # Release SDR device + if sensor_active_device is not None: + app_module.release_sdr_device(sensor_active_device) + sensor_active_device = None @sensor_bp.route('/start_sensor', methods=['POST']) @@ -149,6 +163,7 @@ def start_sensor() -> Response: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + register_process(app_module.sensor_process) # Start output thread thread = threading.Thread(target=stream_sensor_output, args=(app_module.sensor_process,))