From 0588055d1f845858c9a19f43031d609d34482223 Mon Sep 17 00:00:00 2001 From: James Smith Date: Fri, 12 Jun 2026 08:31:54 +0100 Subject: [PATCH] refactor: extract shared capability detection from agent utils/capabilities.py now owns interface detection and mode availability; the agent delegates via detect_interfaces() and detect_mode_availability(). The agent keeps config gating and tool_details population to preserve its result shape exactly. The moved fallback path uses utils.dependencies.check_tool instead of the agent's old shutil.which fallback; check_tool also searches Homebrew paths, a strict superset (strictly better detection). Co-Authored-By: Claude Fable 5 --- intercept_agent.py | 227 +++------------------------------- tests/test_capabilities.py | 43 +++++++ utils/capabilities.py | 243 +++++++++++++++++++++++++++++++++++++ 3 files changed, 301 insertions(+), 212 deletions(-) create mode 100644 tests/test_capabilities.py create mode 100644 utils/capabilities.py diff --git a/intercept_agent.py b/intercept_agent.py index d93fc1f..cd32358 100644 --- a/intercept_agent.py +++ b/intercept_agent.py @@ -36,12 +36,15 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # Import dependency checking from Intercept utils try: - from utils.dependencies import TOOL_DEPENDENCIES, check_all_dependencies, check_tool + from utils.dependencies import TOOL_DEPENDENCIES, check_all_dependencies HAS_DEPENDENCIES_MODULE = True except ImportError: HAS_DEPENDENCIES_MODULE = False +# Shared capability detection (extracted so app and agent cannot drift) +from utils.capabilities import MODE_DEPENDENCY_MAP, detect_interfaces, detect_mode_availability + # Import TSCM modules for consistent analysis (same as local mode) try: from utils.tscm.correlation import CorrelationEngine @@ -391,14 +394,6 @@ class ModeManager: logger.warning("Dependencies module not available") return self._dependencies - def _check_tool(self, tool_name: str) -> bool: - """Check if a tool is available using Intercept's dependency checker.""" - deps = self._get_dependencies() - if deps and hasattr(deps, "check_tool"): - return deps.check_tool(tool_name) - # Fallback to simple which check - return shutil.which(tool_name) is not None - def _get_tool_path(self, tool_name: str) -> str | None: """Get tool path using Intercept's dependency module.""" deps = self._get_dependencies() @@ -425,59 +420,30 @@ class ModeManager: "tool_details": {}, # Detailed tool status } - # Detect interfaces using Intercept's TSCM device detection - self._detect_interfaces(capabilities) + # Detect interfaces via shared capability detection + capabilities["interfaces"] = detect_interfaces() - # Use Intercept's comprehensive dependency checking if available + # Detect mode availability via shared capability detection + raw_modes = detect_mode_availability() + for mode, ready in raw_modes.items(): + # Apply this agent's per-mode config gating on top of tool readiness + capabilities["modes"][mode] = ready if config.modes_enabled.get(mode, True) else False + + # Populate detailed tool info for modes tracked in dependencies.py if HAS_DEPENDENCIES_MODULE: try: dep_status = check_all_dependencies() - # Map dependency status to mode availability - mode_mapping = { - "pager": "pager", - "sensor": "sensor", - "aircraft": "adsb", - "ais": "ais", - "acars": "acars", - "aprs": "aprs", - "wifi": "wifi", - "bluetooth": "bluetooth", - "tscm": "tscm", - "satellite": "satellite", - } - for dep_mode, cap_mode in mode_mapping.items(): + for dep_mode, cap_mode in MODE_DEPENDENCY_MAP.items(): if dep_mode in dep_status: mode_info = dep_status[dep_mode] - # Check if mode is enabled in config - if not config.modes_enabled.get(cap_mode, True): - capabilities["modes"][cap_mode] = False - else: - capabilities["modes"][cap_mode] = mode_info["ready"] - # Store detailed tool info capabilities["tool_details"][cap_mode] = { "name": mode_info["name"], "ready": mode_info["ready"], "missing_required": mode_info["missing_required"], "tools": mode_info["tools"], } - # Handle modes not in dependencies.py - extra_modes = ["dsc", "rtlamr", "listening_post"] - extra_tools = { - "dsc": ["rtl_fm"], - "rtlamr": ["rtlamr"], - "listening_post": ["rtl_fm"], - } - for mode in extra_modes: - if not config.modes_enabled.get(mode, True): - capabilities["modes"][mode] = False - else: - tools = extra_tools.get(mode, []) - capabilities["modes"][mode] = all(check_tool(tool) for tool in tools) if tools else True except Exception as e: - logger.warning(f"Dependency check failed, using fallback: {e}") - self._detect_capabilities_fallback(capabilities) - else: - self._detect_capabilities_fallback(capabilities) + logger.warning(f"Tool detail collection failed: {e}") # Use Intercept's SDR detection sdr_factory = self._get_sdr_factory() @@ -501,169 +467,6 @@ class ModeManager: self._capabilities = capabilities return capabilities - def _detect_interfaces(self, capabilities: dict): - """Detect WiFi interfaces and Bluetooth adapters.""" - import platform - - interfaces = capabilities.get("interfaces", {}) - - # Detect WiFi interfaces - if platform.system() == "Darwin": # macOS - try: - result = subprocess.run( - ["networksetup", "-listallhardwareports"], capture_output=True, text=True, timeout=5 - ) - lines = result.stdout.split("\n") - for i, line in enumerate(lines): - if "Wi-Fi" in line or "AirPort" in line: - port_name = line.replace("Hardware Port:", "").strip() - for j in range(i + 1, min(i + 3, len(lines))): - if "Device:" in lines[j]: - device = lines[j].split("Device:")[1].strip() - interfaces["wifi_interfaces"].append( - { - "name": device, - "display_name": f"{port_name} ({device})", - "type": "internal", - "monitor_capable": False, - } - ) - break - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): - pass - else: # Linux - try: - result = subprocess.run(["iw", "dev"], capture_output=True, text=True, timeout=5) - current_iface = None - for line in result.stdout.split("\n"): - line = line.strip() - if line.startswith("Interface"): - current_iface = line.split()[1] - elif current_iface and "type" in line: - iface_type = line.split()[-1] - interfaces["wifi_interfaces"].append( - { - "name": current_iface, - "display_name": f"Wireless ({current_iface}) - {iface_type}", - "type": iface_type, - "monitor_capable": True, - } - ) - current_iface = None - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): - # Fall back to iwconfig - try: - result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=5) - for line in result.stdout.split("\n"): - if "IEEE 802.11" in line: - iface = line.split()[0] - interfaces["wifi_interfaces"].append( - { - "name": iface, - "display_name": f"Wireless ({iface})", - "type": "managed", - "monitor_capable": True, - } - ) - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): - pass - - # Detect Bluetooth adapters - if platform.system() == "Linux": - try: - result = subprocess.run(["hciconfig"], capture_output=True, text=True, timeout=5) - blocks = re.split(r"(?=^hci\d+:)", result.stdout, flags=re.MULTILINE) - for block in blocks: - if block.strip(): - first_line = block.split("\n")[0] - match = re.match(r"(hci\d+):", first_line) - if match: - iface_name = match.group(1) - is_up = "UP RUNNING" in block or "\tUP " in block - interfaces["bt_adapters"].append( - { - "name": iface_name, - "display_name": f"Bluetooth Adapter ({iface_name})", - "type": "hci", - "status": "up" if is_up else "down", - } - ) - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): - # Try bluetoothctl as fallback - try: - result = subprocess.run(["bluetoothctl", "list"], capture_output=True, text=True, timeout=5) - for line in result.stdout.split("\n"): - if "Controller" in line: - parts = line.split() - if len(parts) >= 3: - addr = parts[1] - name = " ".join(parts[2:]) if len(parts) > 2 else "Bluetooth" - interfaces["bt_adapters"].append( - { - "name": addr, - "display_name": f"{name} ({addr[-8:]})", - "type": "controller", - "status": "available", - } - ) - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): - pass - elif platform.system() == "Darwin": - try: - result = subprocess.run( - ["system_profiler", "SPBluetoothDataType"], capture_output=True, text=True, timeout=10 - ) - bt_name = "Built-in Bluetooth" - bt_addr = "" - for line in result.stdout.split("\n"): - if "Address:" in line: - bt_addr = line.split("Address:")[1].strip() - break - interfaces["bt_adapters"].append( - { - "name": "default", - "display_name": f"{bt_name}" + (f" ({bt_addr[-8:]})" if bt_addr else ""), - "type": "macos", - "status": "available", - } - ) - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): - interfaces["bt_adapters"].append( - {"name": "default", "display_name": "Built-in Bluetooth", "type": "macos", "status": "available"} - ) - - def _detect_capabilities_fallback(self, capabilities: dict): - """Fallback capability detection when dependencies module unavailable.""" - tool_checks = { - "pager": ["rtl_fm", "multimon-ng"], - "sensor": ["rtl_433"], - "adsb": ["dump1090"], - "ais": ["AIS-catcher"], - "acars": ["acarsdec"], - "aprs": ["rtl_fm", "direwolf"], - "wifi": ["airmon-ng", "airodump-ng"], - "bluetooth": ["bluetoothctl"], - "dsc": ["rtl_fm"], - "rtlamr": ["rtlamr"], - "satellite": [], - "listening_post": ["rtl_fm"], - "tscm": ["rtl_fm"], - } - - for mode, tools in tool_checks.items(): - if not config.modes_enabled.get(mode, True): - capabilities["modes"][mode] = False - continue - if not tools: - capabilities["modes"][mode] = True - continue - if mode == "adsb": - capabilities["modes"][mode] = ( - self._check_tool("dump1090") or self._check_tool("dump1090-fa") or self._check_tool("readsb") - ) - else: - capabilities["modes"][mode] = all(self._check_tool(tool) for tool in tools) - def get_status(self) -> dict: """Get overall agent status.""" # Build running modes with device info for multi-SDR tracking diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py new file mode 100644 index 0000000..1a6fa1e --- /dev/null +++ b/tests/test_capabilities.py @@ -0,0 +1,43 @@ +"""Tests for shared capability detection.""" + +from unittest.mock import patch + +from utils.capabilities import detect_interfaces, detect_mode_availability + + +class TestModeAvailability: + def test_all_tools_present(self): + with patch("utils.capabilities.check_all_dependencies") as mock_deps: + mock_deps.return_value = { + key: {"ready": True} + for key in ( + "pager", + "sensor", + "aircraft", + "ais", + "acars", + "aprs", + "wifi", + "bluetooth", + "tscm", + "satellite", + ) + } + modes = detect_mode_availability() + assert modes.get("sensor") is True + assert modes.get("pager") is True + assert modes.get("adsb") is True # maps from dep key "aircraft" + + def test_no_tools_present(self): + with patch("utils.capabilities.check_all_dependencies") as mock_deps: + mock_deps.return_value = {} + modes = detect_mode_availability() + assert modes.get("sensor") is False + + +class TestInterfaceDetection: + def test_returns_expected_shape(self, fake_process): + with patch("subprocess.Popen", return_value=fake_process()): + interfaces = detect_interfaces() + assert set(interfaces) == {"wifi_interfaces", "bt_adapters", "sdr_devices"} + assert isinstance(interfaces["wifi_interfaces"], list) diff --git a/utils/capabilities.py b/utils/capabilities.py new file mode 100644 index 0000000..d1885e0 --- /dev/null +++ b/utils/capabilities.py @@ -0,0 +1,243 @@ +"""Shared tool/interface capability detection. + +Extracted from the standalone agent (``intercept_agent.py``) so the app and the +agent share one implementation and cannot drift. Mode availability is derived +from :func:`utils.dependencies.check_all_dependencies`; interface detection +probes the host for WiFi interfaces and Bluetooth adapters. + +This module is intentionally config-agnostic: it reports raw tool/hardware +availability. Callers that gate modes behind their own configuration apply that +gating on top of the values returned here. +""" + +from __future__ import annotations + +import platform +import re +import subprocess + +from utils.dependencies import check_all_dependencies, check_tool +from utils.logging import get_logger + +logger = get_logger("intercept.capabilities") + +# Mapping from utils.dependencies mode key -> capability/mode key used by callers. +MODE_DEPENDENCY_MAP = { + "pager": "pager", + "sensor": "sensor", + "aircraft": "adsb", + "ais": "ais", + "acars": "acars", + "aprs": "aprs", + "wifi": "wifi", + "bluetooth": "bluetooth", + "tscm": "tscm", + "satellite": "satellite", +} + +# Modes not represented in utils.dependencies; keyed by cap mode -> required tools. +EXTRA_MODE_TOOLS = { + "dsc": ["rtl_fm"], + "rtlamr": ["rtlamr"], + "listening_post": ["rtl_fm"], +} + +# Fallback tool checks when the dependencies module is unavailable. +FALLBACK_TOOL_CHECKS = { + "pager": ["rtl_fm", "multimon-ng"], + "sensor": ["rtl_433"], + "adsb": ["dump1090"], + "ais": ["AIS-catcher"], + "acars": ["acarsdec"], + "aprs": ["rtl_fm", "direwolf"], + "wifi": ["airmon-ng", "airodump-ng"], + "bluetooth": ["bluetoothctl"], + "dsc": ["rtl_fm"], + "rtlamr": ["rtlamr"], + "satellite": [], + "listening_post": ["rtl_fm"], + "tscm": ["rtl_fm"], +} + + +def detect_mode_availability() -> dict[str, bool]: + """Detect mode availability from tool dependencies. + + Returns a ``{cap_mode: bool}`` map of raw tool readiness. Falls back to + direct tool checks if :func:`check_all_dependencies` raises. + """ + modes: dict[str, bool] = {} + try: + dep_status = check_all_dependencies() + except Exception as e: + logger.warning(f"Dependency check failed, using fallback: {e}") + return _detect_mode_availability_fallback() + + for dep_mode, cap_mode in MODE_DEPENDENCY_MAP.items(): + if dep_mode in dep_status: + modes[cap_mode] = dep_status[dep_mode]["ready"] + else: + modes[cap_mode] = False + + # Modes not in dependencies.py + for cap_mode, tools in EXTRA_MODE_TOOLS.items(): + modes[cap_mode] = all(check_tool(tool) for tool in tools) if tools else True + + return modes + + +def _detect_mode_availability_fallback() -> dict[str, bool]: + """Fallback mode availability when the dependencies module is unavailable. + + Note: this uses ``utils.dependencies.check_tool``, which also searches + Homebrew paths (a strict superset of ``shutil.which``). + """ + modes: dict[str, bool] = {} + for mode, tools in FALLBACK_TOOL_CHECKS.items(): + if not tools: + modes[mode] = True + elif mode == "adsb": + modes[mode] = check_tool("dump1090") or check_tool("dump1090-fa") or check_tool("readsb") + else: + modes[mode] = all(check_tool(tool) for tool in tools) + return modes + + +def detect_interfaces() -> dict[str, list]: + """Detect WiFi interfaces and Bluetooth adapters on the host. + + Returns ``{"wifi_interfaces": [...], "bt_adapters": [...], "sdr_devices": []}``. + ``sdr_devices`` is left empty here; SDR enumeration is handled by callers. + """ + interfaces: dict[str, list] = { + "wifi_interfaces": [], + "bt_adapters": [], + "sdr_devices": [], + } + + # Detect WiFi interfaces + if platform.system() == "Darwin": # macOS + try: + result = subprocess.run( + ["networksetup", "-listallhardwareports"], capture_output=True, text=True, timeout=5 + ) + lines = result.stdout.split("\n") + for i, line in enumerate(lines): + if "Wi-Fi" in line or "AirPort" in line: + port_name = line.replace("Hardware Port:", "").strip() + for j in range(i + 1, min(i + 3, len(lines))): + if "Device:" in lines[j]: + device = lines[j].split("Device:")[1].strip() + interfaces["wifi_interfaces"].append( + { + "name": device, + "display_name": f"{port_name} ({device})", + "type": "internal", + "monitor_capable": False, + } + ) + break + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + pass + else: # Linux + try: + result = subprocess.run(["iw", "dev"], capture_output=True, text=True, timeout=5) + current_iface = None + for line in result.stdout.split("\n"): + line = line.strip() + if line.startswith("Interface"): + current_iface = line.split()[1] + elif current_iface and "type" in line: + iface_type = line.split()[-1] + interfaces["wifi_interfaces"].append( + { + "name": current_iface, + "display_name": f"Wireless ({current_iface}) - {iface_type}", + "type": iface_type, + "monitor_capable": True, + } + ) + current_iface = None + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + # Fall back to iwconfig + try: + result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=5) + for line in result.stdout.split("\n"): + if "IEEE 802.11" in line: + iface = line.split()[0] + interfaces["wifi_interfaces"].append( + { + "name": iface, + "display_name": f"Wireless ({iface})", + "type": "managed", + "monitor_capable": True, + } + ) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + pass + + # Detect Bluetooth adapters + if platform.system() == "Linux": + try: + result = subprocess.run(["hciconfig"], capture_output=True, text=True, timeout=5) + blocks = re.split(r"(?=^hci\d+:)", result.stdout, flags=re.MULTILINE) + for block in blocks: + if block.strip(): + first_line = block.split("\n")[0] + match = re.match(r"(hci\d+):", first_line) + if match: + iface_name = match.group(1) + is_up = "UP RUNNING" in block or "\tUP " in block + interfaces["bt_adapters"].append( + { + "name": iface_name, + "display_name": f"Bluetooth Adapter ({iface_name})", + "type": "hci", + "status": "up" if is_up else "down", + } + ) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + # Try bluetoothctl as fallback + try: + result = subprocess.run(["bluetoothctl", "list"], capture_output=True, text=True, timeout=5) + for line in result.stdout.split("\n"): + if "Controller" in line: + parts = line.split() + if len(parts) >= 3: + addr = parts[1] + name = " ".join(parts[2:]) if len(parts) > 2 else "Bluetooth" + interfaces["bt_adapters"].append( + { + "name": addr, + "display_name": f"{name} ({addr[-8:]})", + "type": "controller", + "status": "available", + } + ) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + pass + elif platform.system() == "Darwin": + try: + result = subprocess.run( + ["system_profiler", "SPBluetoothDataType"], capture_output=True, text=True, timeout=10 + ) + bt_name = "Built-in Bluetooth" + bt_addr = "" + for line in result.stdout.split("\n"): + if "Address:" in line: + bt_addr = line.split("Address:")[1].strip() + break + interfaces["bt_adapters"].append( + { + "name": "default", + "display_name": f"{bt_name}" + (f" ({bt_addr[-8:]})" if bt_addr else ""), + "type": "macos", + "status": "available", + } + ) + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError): + interfaces["bt_adapters"].append( + {"name": "default", "display_name": "Built-in Bluetooth", "type": "macos", "status": "available"} + ) + + return interfaces