from __future__ import annotations import atexit import logging import signal import subprocess import re import threading import time from typing import Any, Callable from .dependencies import check_tool logger = logging.getLogger('intercept.process') # Track all spawned processes for cleanup _spawned_processes: list[subprocess.Popen] = [] _process_lock = threading.Lock() def register_process(process: subprocess.Popen) -> None: """Register a spawned process for cleanup on exit.""" with _process_lock: _spawned_processes.append(process) def unregister_process(process: subprocess.Popen) -> None: """Unregister a process from cleanup list.""" with _process_lock: if process in _spawned_processes: _spawned_processes.remove(process) def cleanup_all_processes() -> None: """Clean up all registered processes on exit.""" logger.info("Cleaning up all spawned processes...") with _process_lock: for process in _spawned_processes: if process and process.poll() is None: try: process.terminate() process.wait(timeout=2) except subprocess.TimeoutExpired: process.kill() except Exception as e: logger.warning(f"Error cleaning up process: {e}") _spawned_processes.clear() def safe_terminate(process: subprocess.Popen | None, timeout: float = 2.0) -> bool: """ Safely terminate a process. Args: process: Process to terminate timeout: Seconds to wait before killing Returns: True if process was terminated, False if already dead or None """ if not process: return False if process.poll() is not None: # Already dead unregister_process(process) return False try: process.terminate() process.wait(timeout=timeout) unregister_process(process) return True except subprocess.TimeoutExpired: process.kill() unregister_process(process) return True except Exception as e: logger.warning(f"Error terminating process: {e}") return False # Register cleanup handlers atexit.register(cleanup_all_processes) # Handle signals for graceful shutdown def _signal_handler(signum, frame): """Handle termination signals. Keep this minimal — logging and lock acquisition in signal handlers can deadlock when another thread holds the logging or process lock. Process cleanup is handled by the atexit handler registered above. """ import sys if signum == signal.SIGINT: raise KeyboardInterrupt() sys.exit(0) # Only register signal handlers if we're not in a thread try: signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) except ValueError: # Can't set signal handlers from a thread pass def cleanup_stale_processes() -> None: """Kill any stale processes from previous runs (but not system services).""" # Note: dump1090 is NOT included here as users may run it as a system service processes_to_kill = ['rtl_adsb', 'rtl_433', 'multimon-ng', 'rtl_fm'] for proc_name in processes_to_kill: try: subprocess.run(['pkill', '-9', proc_name], capture_output=True) except (subprocess.SubprocessError, OSError): pass def is_valid_mac(mac: str | None) -> bool: """Validate MAC address format.""" if not mac: return False return bool(re.match(r'^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$', mac)) def is_valid_channel(channel: str | int | None) -> bool: """Validate WiFi channel number.""" try: ch = int(channel) # type: ignore[arg-type] return 1 <= ch <= 200 except (ValueError, TypeError): return False def detect_devices() -> list[dict[str, Any]]: """Detect RTL-SDR devices.""" devices: list[dict[str, Any]] = [] if not check_tool('rtl_test'): return devices try: result = subprocess.run( ['rtl_test', '-t'], capture_output=True, text=True, timeout=5 ) output = result.stderr + result.stdout # Parse device info device_pattern = r'(\d+):\s+(.+?)(?:,\s*SN:\s*(\S+))?$' for line in output.split('\n'): line = line.strip() match = re.match(device_pattern, line) if match: devices.append({ 'index': int(match.group(1)), 'name': match.group(2).strip().rstrip(','), 'serial': match.group(3) or 'N/A' }) if not devices: found_match = re.search(r'Found (\d+) device', output) if found_match: count = int(found_match.group(1)) for i in range(count): devices.append({ 'index': i, 'name': f'RTL-SDR Device {i}', 'serial': 'Unknown' }) except Exception: pass return devices