refactor: extract shared capability detection from agent

utils/capabilities.py now owns interface detection and mode
availability; the agent delegates via detect_interfaces() and
detect_mode_availability(). The agent keeps config gating and
tool_details population to preserve its result shape exactly.

The moved fallback path uses utils.dependencies.check_tool instead of
the agent's old shutil.which fallback; check_tool also searches
Homebrew paths, a strict superset (strictly better detection).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
James Smith
2026-06-12 08:31:54 +01:00
parent e14271c5ee
commit 0588055d1f
3 changed files with 301 additions and 212 deletions
+15 -212
View File
@@ -36,12 +36,15 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Import dependency checking from Intercept utils
try:
from utils.dependencies import TOOL_DEPENDENCIES, check_all_dependencies, check_tool
from utils.dependencies import TOOL_DEPENDENCIES, check_all_dependencies
HAS_DEPENDENCIES_MODULE = True
except ImportError:
HAS_DEPENDENCIES_MODULE = False
# Shared capability detection (extracted so app and agent cannot drift)
from utils.capabilities import MODE_DEPENDENCY_MAP, detect_interfaces, detect_mode_availability
# Import TSCM modules for consistent analysis (same as local mode)
try:
from utils.tscm.correlation import CorrelationEngine
@@ -391,14 +394,6 @@ class ModeManager:
logger.warning("Dependencies module not available")
return self._dependencies
def _check_tool(self, tool_name: str) -> bool:
"""Check if a tool is available using Intercept's dependency checker."""
deps = self._get_dependencies()
if deps and hasattr(deps, "check_tool"):
return deps.check_tool(tool_name)
# Fallback to simple which check
return shutil.which(tool_name) is not None
def _get_tool_path(self, tool_name: str) -> str | None:
"""Get tool path using Intercept's dependency module."""
deps = self._get_dependencies()
@@ -425,59 +420,30 @@ class ModeManager:
"tool_details": {}, # Detailed tool status
}
# Detect interfaces using Intercept's TSCM device detection
self._detect_interfaces(capabilities)
# Detect interfaces via shared capability detection
capabilities["interfaces"] = detect_interfaces()
# Use Intercept's comprehensive dependency checking if available
# Detect mode availability via shared capability detection
raw_modes = detect_mode_availability()
for mode, ready in raw_modes.items():
# Apply this agent's per-mode config gating on top of tool readiness
capabilities["modes"][mode] = ready if config.modes_enabled.get(mode, True) else False
# Populate detailed tool info for modes tracked in dependencies.py
if HAS_DEPENDENCIES_MODULE:
try:
dep_status = check_all_dependencies()
# Map dependency status to mode availability
mode_mapping = {
"pager": "pager",
"sensor": "sensor",
"aircraft": "adsb",
"ais": "ais",
"acars": "acars",
"aprs": "aprs",
"wifi": "wifi",
"bluetooth": "bluetooth",
"tscm": "tscm",
"satellite": "satellite",
}
for dep_mode, cap_mode in mode_mapping.items():
for dep_mode, cap_mode in MODE_DEPENDENCY_MAP.items():
if dep_mode in dep_status:
mode_info = dep_status[dep_mode]
# Check if mode is enabled in config
if not config.modes_enabled.get(cap_mode, True):
capabilities["modes"][cap_mode] = False
else:
capabilities["modes"][cap_mode] = mode_info["ready"]
# Store detailed tool info
capabilities["tool_details"][cap_mode] = {
"name": mode_info["name"],
"ready": mode_info["ready"],
"missing_required": mode_info["missing_required"],
"tools": mode_info["tools"],
}
# Handle modes not in dependencies.py
extra_modes = ["dsc", "rtlamr", "listening_post"]
extra_tools = {
"dsc": ["rtl_fm"],
"rtlamr": ["rtlamr"],
"listening_post": ["rtl_fm"],
}
for mode in extra_modes:
if not config.modes_enabled.get(mode, True):
capabilities["modes"][mode] = False
else:
tools = extra_tools.get(mode, [])
capabilities["modes"][mode] = all(check_tool(tool) for tool in tools) if tools else True
except Exception as e:
logger.warning(f"Dependency check failed, using fallback: {e}")
self._detect_capabilities_fallback(capabilities)
else:
self._detect_capabilities_fallback(capabilities)
logger.warning(f"Tool detail collection failed: {e}")
# Use Intercept's SDR detection
sdr_factory = self._get_sdr_factory()
@@ -501,169 +467,6 @@ class ModeManager:
self._capabilities = capabilities
return capabilities
def _detect_interfaces(self, capabilities: dict):
"""Detect WiFi interfaces and Bluetooth adapters."""
import platform
interfaces = capabilities.get("interfaces", {})
# Detect WiFi interfaces
if platform.system() == "Darwin": # macOS
try:
result = subprocess.run(
["networksetup", "-listallhardwareports"], capture_output=True, text=True, timeout=5
)
lines = result.stdout.split("\n")
for i, line in enumerate(lines):
if "Wi-Fi" in line or "AirPort" in line:
port_name = line.replace("Hardware Port:", "").strip()
for j in range(i + 1, min(i + 3, len(lines))):
if "Device:" in lines[j]:
device = lines[j].split("Device:")[1].strip()
interfaces["wifi_interfaces"].append(
{
"name": device,
"display_name": f"{port_name} ({device})",
"type": "internal",
"monitor_capable": False,
}
)
break
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
else: # Linux
try:
result = subprocess.run(["iw", "dev"], capture_output=True, text=True, timeout=5)
current_iface = None
for line in result.stdout.split("\n"):
line = line.strip()
if line.startswith("Interface"):
current_iface = line.split()[1]
elif current_iface and "type" in line:
iface_type = line.split()[-1]
interfaces["wifi_interfaces"].append(
{
"name": current_iface,
"display_name": f"Wireless ({current_iface}) - {iface_type}",
"type": iface_type,
"monitor_capable": True,
}
)
current_iface = None
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
# Fall back to iwconfig
try:
result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=5)
for line in result.stdout.split("\n"):
if "IEEE 802.11" in line:
iface = line.split()[0]
interfaces["wifi_interfaces"].append(
{
"name": iface,
"display_name": f"Wireless ({iface})",
"type": "managed",
"monitor_capable": True,
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
# Detect Bluetooth adapters
if platform.system() == "Linux":
try:
result = subprocess.run(["hciconfig"], capture_output=True, text=True, timeout=5)
blocks = re.split(r"(?=^hci\d+:)", result.stdout, flags=re.MULTILINE)
for block in blocks:
if block.strip():
first_line = block.split("\n")[0]
match = re.match(r"(hci\d+):", first_line)
if match:
iface_name = match.group(1)
is_up = "UP RUNNING" in block or "\tUP " in block
interfaces["bt_adapters"].append(
{
"name": iface_name,
"display_name": f"Bluetooth Adapter ({iface_name})",
"type": "hci",
"status": "up" if is_up else "down",
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
# Try bluetoothctl as fallback
try:
result = subprocess.run(["bluetoothctl", "list"], capture_output=True, text=True, timeout=5)
for line in result.stdout.split("\n"):
if "Controller" in line:
parts = line.split()
if len(parts) >= 3:
addr = parts[1]
name = " ".join(parts[2:]) if len(parts) > 2 else "Bluetooth"
interfaces["bt_adapters"].append(
{
"name": addr,
"display_name": f"{name} ({addr[-8:]})",
"type": "controller",
"status": "available",
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
elif platform.system() == "Darwin":
try:
result = subprocess.run(
["system_profiler", "SPBluetoothDataType"], capture_output=True, text=True, timeout=10
)
bt_name = "Built-in Bluetooth"
bt_addr = ""
for line in result.stdout.split("\n"):
if "Address:" in line:
bt_addr = line.split("Address:")[1].strip()
break
interfaces["bt_adapters"].append(
{
"name": "default",
"display_name": f"{bt_name}" + (f" ({bt_addr[-8:]})" if bt_addr else ""),
"type": "macos",
"status": "available",
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
interfaces["bt_adapters"].append(
{"name": "default", "display_name": "Built-in Bluetooth", "type": "macos", "status": "available"}
)
def _detect_capabilities_fallback(self, capabilities: dict):
"""Fallback capability detection when dependencies module unavailable."""
tool_checks = {
"pager": ["rtl_fm", "multimon-ng"],
"sensor": ["rtl_433"],
"adsb": ["dump1090"],
"ais": ["AIS-catcher"],
"acars": ["acarsdec"],
"aprs": ["rtl_fm", "direwolf"],
"wifi": ["airmon-ng", "airodump-ng"],
"bluetooth": ["bluetoothctl"],
"dsc": ["rtl_fm"],
"rtlamr": ["rtlamr"],
"satellite": [],
"listening_post": ["rtl_fm"],
"tscm": ["rtl_fm"],
}
for mode, tools in tool_checks.items():
if not config.modes_enabled.get(mode, True):
capabilities["modes"][mode] = False
continue
if not tools:
capabilities["modes"][mode] = True
continue
if mode == "adsb":
capabilities["modes"][mode] = (
self._check_tool("dump1090") or self._check_tool("dump1090-fa") or self._check_tool("readsb")
)
else:
capabilities["modes"][mode] = all(self._check_tool(tool) for tool in tools)
def get_status(self) -> dict:
"""Get overall agent status."""
# Build running modes with device info for multi-SDR tracking
+43
View File
@@ -0,0 +1,43 @@
"""Tests for shared capability detection."""
from unittest.mock import patch
from utils.capabilities import detect_interfaces, detect_mode_availability
class TestModeAvailability:
def test_all_tools_present(self):
with patch("utils.capabilities.check_all_dependencies") as mock_deps:
mock_deps.return_value = {
key: {"ready": True}
for key in (
"pager",
"sensor",
"aircraft",
"ais",
"acars",
"aprs",
"wifi",
"bluetooth",
"tscm",
"satellite",
)
}
modes = detect_mode_availability()
assert modes.get("sensor") is True
assert modes.get("pager") is True
assert modes.get("adsb") is True # maps from dep key "aircraft"
def test_no_tools_present(self):
with patch("utils.capabilities.check_all_dependencies") as mock_deps:
mock_deps.return_value = {}
modes = detect_mode_availability()
assert modes.get("sensor") is False
class TestInterfaceDetection:
def test_returns_expected_shape(self, fake_process):
with patch("subprocess.Popen", return_value=fake_process()):
interfaces = detect_interfaces()
assert set(interfaces) == {"wifi_interfaces", "bt_adapters", "sdr_devices"}
assert isinstance(interfaces["wifi_interfaces"], list)
+243
View File
@@ -0,0 +1,243 @@
"""Shared tool/interface capability detection.
Extracted from the standalone agent (``intercept_agent.py``) so the app and the
agent share one implementation and cannot drift. Mode availability is derived
from :func:`utils.dependencies.check_all_dependencies`; interface detection
probes the host for WiFi interfaces and Bluetooth adapters.
This module is intentionally config-agnostic: it reports raw tool/hardware
availability. Callers that gate modes behind their own configuration apply that
gating on top of the values returned here.
"""
from __future__ import annotations
import platform
import re
import subprocess
from utils.dependencies import check_all_dependencies, check_tool
from utils.logging import get_logger
logger = get_logger("intercept.capabilities")
# Mapping from utils.dependencies mode key -> capability/mode key used by callers.
MODE_DEPENDENCY_MAP = {
"pager": "pager",
"sensor": "sensor",
"aircraft": "adsb",
"ais": "ais",
"acars": "acars",
"aprs": "aprs",
"wifi": "wifi",
"bluetooth": "bluetooth",
"tscm": "tscm",
"satellite": "satellite",
}
# Modes not represented in utils.dependencies; keyed by cap mode -> required tools.
EXTRA_MODE_TOOLS = {
"dsc": ["rtl_fm"],
"rtlamr": ["rtlamr"],
"listening_post": ["rtl_fm"],
}
# Fallback tool checks when the dependencies module is unavailable.
FALLBACK_TOOL_CHECKS = {
"pager": ["rtl_fm", "multimon-ng"],
"sensor": ["rtl_433"],
"adsb": ["dump1090"],
"ais": ["AIS-catcher"],
"acars": ["acarsdec"],
"aprs": ["rtl_fm", "direwolf"],
"wifi": ["airmon-ng", "airodump-ng"],
"bluetooth": ["bluetoothctl"],
"dsc": ["rtl_fm"],
"rtlamr": ["rtlamr"],
"satellite": [],
"listening_post": ["rtl_fm"],
"tscm": ["rtl_fm"],
}
def detect_mode_availability() -> dict[str, bool]:
"""Detect mode availability from tool dependencies.
Returns a ``{cap_mode: bool}`` map of raw tool readiness. Falls back to
direct tool checks if :func:`check_all_dependencies` raises.
"""
modes: dict[str, bool] = {}
try:
dep_status = check_all_dependencies()
except Exception as e:
logger.warning(f"Dependency check failed, using fallback: {e}")
return _detect_mode_availability_fallback()
for dep_mode, cap_mode in MODE_DEPENDENCY_MAP.items():
if dep_mode in dep_status:
modes[cap_mode] = dep_status[dep_mode]["ready"]
else:
modes[cap_mode] = False
# Modes not in dependencies.py
for cap_mode, tools in EXTRA_MODE_TOOLS.items():
modes[cap_mode] = all(check_tool(tool) for tool in tools) if tools else True
return modes
def _detect_mode_availability_fallback() -> dict[str, bool]:
"""Fallback mode availability when the dependencies module is unavailable.
Note: this uses ``utils.dependencies.check_tool``, which also searches
Homebrew paths (a strict superset of ``shutil.which``).
"""
modes: dict[str, bool] = {}
for mode, tools in FALLBACK_TOOL_CHECKS.items():
if not tools:
modes[mode] = True
elif mode == "adsb":
modes[mode] = check_tool("dump1090") or check_tool("dump1090-fa") or check_tool("readsb")
else:
modes[mode] = all(check_tool(tool) for tool in tools)
return modes
def detect_interfaces() -> dict[str, list]:
"""Detect WiFi interfaces and Bluetooth adapters on the host.
Returns ``{"wifi_interfaces": [...], "bt_adapters": [...], "sdr_devices": []}``.
``sdr_devices`` is left empty here; SDR enumeration is handled by callers.
"""
interfaces: dict[str, list] = {
"wifi_interfaces": [],
"bt_adapters": [],
"sdr_devices": [],
}
# Detect WiFi interfaces
if platform.system() == "Darwin": # macOS
try:
result = subprocess.run(
["networksetup", "-listallhardwareports"], capture_output=True, text=True, timeout=5
)
lines = result.stdout.split("\n")
for i, line in enumerate(lines):
if "Wi-Fi" in line or "AirPort" in line:
port_name = line.replace("Hardware Port:", "").strip()
for j in range(i + 1, min(i + 3, len(lines))):
if "Device:" in lines[j]:
device = lines[j].split("Device:")[1].strip()
interfaces["wifi_interfaces"].append(
{
"name": device,
"display_name": f"{port_name} ({device})",
"type": "internal",
"monitor_capable": False,
}
)
break
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
else: # Linux
try:
result = subprocess.run(["iw", "dev"], capture_output=True, text=True, timeout=5)
current_iface = None
for line in result.stdout.split("\n"):
line = line.strip()
if line.startswith("Interface"):
current_iface = line.split()[1]
elif current_iface and "type" in line:
iface_type = line.split()[-1]
interfaces["wifi_interfaces"].append(
{
"name": current_iface,
"display_name": f"Wireless ({current_iface}) - {iface_type}",
"type": iface_type,
"monitor_capable": True,
}
)
current_iface = None
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
# Fall back to iwconfig
try:
result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=5)
for line in result.stdout.split("\n"):
if "IEEE 802.11" in line:
iface = line.split()[0]
interfaces["wifi_interfaces"].append(
{
"name": iface,
"display_name": f"Wireless ({iface})",
"type": "managed",
"monitor_capable": True,
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
# Detect Bluetooth adapters
if platform.system() == "Linux":
try:
result = subprocess.run(["hciconfig"], capture_output=True, text=True, timeout=5)
blocks = re.split(r"(?=^hci\d+:)", result.stdout, flags=re.MULTILINE)
for block in blocks:
if block.strip():
first_line = block.split("\n")[0]
match = re.match(r"(hci\d+):", first_line)
if match:
iface_name = match.group(1)
is_up = "UP RUNNING" in block or "\tUP " in block
interfaces["bt_adapters"].append(
{
"name": iface_name,
"display_name": f"Bluetooth Adapter ({iface_name})",
"type": "hci",
"status": "up" if is_up else "down",
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
# Try bluetoothctl as fallback
try:
result = subprocess.run(["bluetoothctl", "list"], capture_output=True, text=True, timeout=5)
for line in result.stdout.split("\n"):
if "Controller" in line:
parts = line.split()
if len(parts) >= 3:
addr = parts[1]
name = " ".join(parts[2:]) if len(parts) > 2 else "Bluetooth"
interfaces["bt_adapters"].append(
{
"name": addr,
"display_name": f"{name} ({addr[-8:]})",
"type": "controller",
"status": "available",
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
pass
elif platform.system() == "Darwin":
try:
result = subprocess.run(
["system_profiler", "SPBluetoothDataType"], capture_output=True, text=True, timeout=10
)
bt_name = "Built-in Bluetooth"
bt_addr = ""
for line in result.stdout.split("\n"):
if "Address:" in line:
bt_addr = line.split("Address:")[1].strip()
break
interfaces["bt_adapters"].append(
{
"name": "default",
"display_name": f"{bt_name}" + (f" ({bt_addr[-8:]})" if bt_addr else ""),
"type": "macos",
"status": "available",
}
)
except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError):
interfaces["bt_adapters"].append(
{"name": "default", "display_name": "Built-in Bluetooth", "type": "macos", "status": "available"}
)
return interfaces