mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 22:59:59 -07:00
The airport utility path doesn't exist on newer macOS versions. Added fallback methods using networksetup and ifconfig to detect WiFi availability.
2186 lines
83 KiB
Python
2186 lines
83 KiB
Python
"""
|
|
TSCM Advanced Features Module
|
|
|
|
Implements:
|
|
1. Capability & Coverage Reality Panel
|
|
2. Baseline Diff & Baseline Health
|
|
3. Per-Device Timelines
|
|
4. Meeting-Window Summary Enhancements
|
|
5. WiFi Advanced Indicators (Evil Twin, Probes, Deauth)
|
|
6. Bluetooth Risk Explainability & Proximity Heuristics
|
|
7. Operator Playbooks
|
|
|
|
DISCLAIMER: This system performs wireless and RF surveillance screening.
|
|
Findings indicate anomalies and indicators, not confirmed surveillance devices.
|
|
All claims are probabilistic pattern matches requiring professional verification.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
logger = logging.getLogger('intercept.tscm.advanced')
|
|
|
|
|
|
# =============================================================================
|
|
# 1. Capability & Coverage Reality Panel
|
|
# =============================================================================
|
|
|
|
class WifiMode(Enum):
    """Operating mode recorded for the WiFi adapter used by a sweep."""

    # Interface is in monitor mode.
    MONITOR = 'monitor'

    # Interface is in ordinary managed mode.
    MANAGED = 'managed'

    # No usable WiFi interface or tooling was found.
    UNAVAILABLE = 'unavailable'
|
|
|
|
|
|
class BluetoothMode(Enum):
    """Capability level recorded for the Bluetooth adapter used by a sweep."""

    # Both BLE and classic Bluetooth are usable.
    BLE_CLASSIC = 'ble_classic'

    # Only BLE is usable.
    BLE_ONLY = 'ble_only'

    # An adapter exists but with reduced capability.
    LIMITED = 'limited'

    # No usable Bluetooth adapter or tooling was found.
    UNAVAILABLE = 'unavailable'
|
|
|
|
|
|
@dataclass
class RFCapability:
    """Describes the RF/SDR hardware available to a sweep."""

    # Identification of the SDR; 'none' means no device.
    device_type: str = 'none'
    driver: str = ''

    # Tunable frequency span, in MHz.
    min_frequency_mhz: float = 0.0
    max_frequency_mhz: float = 0.0

    # Highest supported sample rate (presumably samples/second - TODO confirm).
    sample_rate_max: int = 0

    # Whether an SDR is usable for this sweep.
    available: bool = False

    # Human-readable caveats about what the hardware cannot cover.
    limitations: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class SweepCapabilities:
    """
    Point-in-time record of what a TSCM sweep is able to observe.

    Collects host details, WiFi/Bluetooth adapter state and SDR hardware
    limits, together with the human-readable limitations that follow
    from them.
    """
    # Host details
    os_name: str = ''
    os_version: str = ''
    is_root: bool = False

    # WiFi adapter state
    wifi_mode: WifiMode = WifiMode.UNAVAILABLE
    wifi_interface: str = ''
    wifi_driver: str = ''
    wifi_monitor_capable: bool = False
    wifi_limitations: list[str] = field(default_factory=list)

    # Bluetooth adapter state
    bt_mode: BluetoothMode = BluetoothMode.UNAVAILABLE
    bt_adapter: str = ''
    bt_version: str = ''
    bt_limitations: list[str] = field(default_factory=list)

    # SDR hardware
    rf_capability: RFCapability = field(default_factory=RFCapability)

    # Combined list of every limitation that applies to the sweep
    all_limitations: list[str] = field(default_factory=list)

    # When this snapshot was created
    captured_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> dict:
        """Return a JSON-serializable dictionary view of the snapshot."""
        rf = self.rf_capability

        system_section = {
            'os': self.os_name,
            'os_version': self.os_version,
            'is_root': self.is_root,
        }
        wifi_section = {
            'mode': self.wifi_mode.value,
            'interface': self.wifi_interface,
            'driver': self.wifi_driver,
            'monitor_capable': self.wifi_monitor_capable,
            'limitations': self.wifi_limitations,
        }
        bluetooth_section = {
            'mode': self.bt_mode.value,
            'adapter': self.bt_adapter,
            'version': self.bt_version,
            'limitations': self.bt_limitations,
        }
        rf_section = {
            'device_type': rf.device_type,
            'driver': rf.driver,
            'frequency_range_mhz': {
                'min': rf.min_frequency_mhz,
                'max': rf.max_frequency_mhz,
            },
            'sample_rate_max': rf.sample_rate_max,
            'available': rf.available,
            'limitations': rf.limitations,
        }
        disclaimer_text = (
            "Capabilities are detected at sweep start time and may change. "
            "Limitations listed affect what this sweep can reliably detect."
        )

        return {
            'system': system_section,
            'wifi': wifi_section,
            'bluetooth': bluetooth_section,
            'rf': rf_section,
            'all_limitations': self.all_limitations,
            'captured_at': self.captured_at.isoformat(),
            'disclaimer': disclaimer_text,
        }
|
|
|
|
|
|
def detect_sweep_capabilities(
    wifi_interface: str = '',
    bt_adapter: str = '',
    sdr_device: Any = None
) -> SweepCapabilities:
    """
    Build a capability snapshot for the current host.

    Args:
        wifi_interface: WiFi interface name to inspect (may be empty)
        bt_adapter: Bluetooth adapter name to record (may be empty)
        sdr_device: SDR device object, forwarded to the RF detection step

    Returns:
        A populated SweepCapabilities instance
    """
    snapshot = SweepCapabilities()

    # Host details. geteuid is absent on some platforms; treat that as non-root.
    snapshot.os_name = platform.system()
    snapshot.os_version = platform.release()
    if hasattr(os, 'geteuid'):
        snapshot.is_root = os.geteuid() == 0
    else:
        snapshot.is_root = False

    # Each detector fills in its own section of the snapshot in place.
    _detect_wifi_capabilities(snapshot, wifi_interface)
    _detect_bluetooth_capabilities(snapshot, bt_adapter)
    _detect_rf_capabilities(snapshot, sdr_device)

    # Merge the per-subsystem limitations into one fresh list.
    merged = [
        *snapshot.wifi_limitations,
        *snapshot.bt_limitations,
        *snapshot.rf_capability.limitations,
    ]
    if not snapshot.is_root:
        merged.append(
            "Running without root privileges - some features may be limited"
        )
    snapshot.all_limitations = merged

    return snapshot
|
|
|
|
|
|
def _detect_wifi_capabilities(caps: SweepCapabilities, interface: str) -> None:
    """
    Detect WiFi adapter capabilities and record them on ``caps``.

    On macOS the adapter is always reported as managed-only when any of the
    presence probes succeeds. On other platforms ``iw`` is used to look for
    monitor-mode support and, when an interface name is given, its current
    mode and kernel driver.

    Args:
        caps: Snapshot object updated in place
        interface: WiFi interface name to inspect (may be empty)
    """
    caps.wifi_interface = interface

    if platform.system() == 'Darwin':
        if _macos_wifi_present():
            caps.wifi_mode = WifiMode.MANAGED
            caps.wifi_driver = 'apple80211'
            caps.wifi_monitor_capable = False
            caps.wifi_limitations = [
                "macOS WiFi operates in managed mode only.",
                "Cannot capture probe requests or deauthentication frames.",
                "Evil twin detection limited to SSID/BSSID comparison only.",
            ]
        else:
            caps.wifi_mode = WifiMode.UNAVAILABLE
            caps.wifi_limitations = ["WiFi scanning unavailable - no interface found"]
        return

    # Linux (and anything else that ships `iw`).
    try:
        result = subprocess.run(
            ['iw', 'list'],
            capture_output=True,
            text=True,
            timeout=5
        )
        iw_listing = result.stdout.lower()

        if not iw_listing.strip():
            # `iw list` printed nothing: no wireless hardware is visible, so
            # do not claim a managed-mode adapter exists.
            caps.wifi_mode = WifiMode.UNAVAILABLE
            caps.wifi_monitor_capable = False
            caps.wifi_limitations = ["WiFi scanning unavailable - no interface found"]
        elif 'monitor' in iw_listing:
            # NOTE: this matches 'monitor' anywhere in the listing, i.e. for
            # any wireless device, not specifically for `interface`.
            caps.wifi_monitor_capable = True
            caps.wifi_mode = WifiMode.MANAGED
            if interface:
                mode_result = subprocess.run(
                    ['iw', 'dev', interface, 'info'],
                    capture_output=True,
                    text=True,
                    timeout=5
                )
                if 'type monitor' in mode_result.stdout.lower():
                    caps.wifi_mode = WifiMode.MONITOR
                else:
                    caps.wifi_limitations.append(
                        "WiFi interface in managed mode. "
                        "Probe requests and deauth detection require monitor mode."
                    )
        else:
            caps.wifi_mode = WifiMode.MANAGED
            caps.wifi_monitor_capable = False
            caps.wifi_limitations = [
                "Passive WiFi frame analysis is not available in this sweep.",
                "WiFi adapter does not support monitor mode.",
                "Probe request and deauthentication detection unavailable.",
            ]

        # Driver name is the basename of the sysfs driver symlink target.
        if interface:
            try:
                driver_path = f'/sys/class/net/{interface}/device/driver'
                if os.path.exists(driver_path):
                    caps.wifi_driver = os.path.basename(os.readlink(driver_path))
            except Exception as exc:
                # Driver name is informational only; keep going without it.
                logger.debug("Could not read WiFi driver for %s: %s", interface, exc)

    except (subprocess.SubprocessError, OSError) as exc:
        # Covers a missing/non-executable `iw` binary as well as timeouts.
        logger.debug("WiFi capability detection via iw failed: %s", exc)
        caps.wifi_mode = WifiMode.UNAVAILABLE
        caps.wifi_limitations = ["WiFi scanning tools not available"]


def _macos_wifi_present() -> bool:
    """Return True when any macOS probe suggests a WiFi interface exists."""
    # Probe 1: legacy airport utility (absent on newer macOS releases).
    airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport'
    if os.path.exists(airport_path):
        return True

    # Probe 2: look for a Wi-Fi/AirPort hardware port.
    try:
        result = subprocess.run(
            ['networksetup', '-listallhardwareports'],
            capture_output=True,
            text=True,
            timeout=5
        )
        if 'Wi-Fi' in result.stdout or 'AirPort' in result.stdout:
            return True
    except Exception as exc:
        # Best-effort probe: fall through to the next method.
        logger.debug("networksetup WiFi probe failed: %s", exc)

    # Probe 3: en0 is commonly the WiFi interface on macOS.
    try:
        result = subprocess.run(
            ['ifconfig', 'en0'],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode == 0:
            return True
    except Exception as exc:
        logger.debug("ifconfig en0 WiFi probe failed: %s", exc)

    return False
|
|
|
|
|
|
def _detect_bluetooth_capabilities(caps: SweepCapabilities, adapter: str) -> None:
    """
    Detect Bluetooth adapter capabilities and record them on ``caps``.

    macOS is probed with ``system_profiler``; other platforms with
    ``hciconfig -a``.

    Args:
        caps: Snapshot object updated in place
        adapter: Bluetooth adapter name to record (may be empty)
    """
    caps.bt_adapter = adapter

    if platform.system() == 'Darwin':
        try:
            result = subprocess.run(
                ['system_profiler', 'SPBluetoothDataType', '-json'],
                capture_output=True,
                text=True,
                timeout=10
            )
            if 'Bluetooth' in result.stdout:
                caps.bt_mode = BluetoothMode.BLE_CLASSIC
                caps.bt_version = 'macOS CoreBluetooth'
                caps.bt_limitations = [
                    "BLE scanning limited to advertising devices only.",
                    "Classic Bluetooth discovery may be incomplete.",
                    "Manufacturer data parsing depends on device advertising.",
                ]
            else:
                caps.bt_mode = BluetoothMode.UNAVAILABLE
                caps.bt_limitations = ["Bluetooth not available"]
        except (subprocess.SubprocessError, OSError):
            # Covers timeouts and a missing/non-executable system_profiler.
            caps.bt_mode = BluetoothMode.UNAVAILABLE
            caps.bt_limitations = ["Bluetooth detection failed"]
        return

    try:
        result = subprocess.run(
            ['hciconfig', '-a'],
            capture_output=True,
            text=True,
            timeout=5
        )
        report = result.stdout
        report_lower = report.lower()

        if 'hci' not in report_lower:
            caps.bt_mode = BluetoothMode.UNAVAILABLE
            caps.bt_limitations = ["No Bluetooth adapter found"]
            return

        # hciconfig lists LE-capable controllers with feature flags such as
        # "<LE support>" / "<LE and BR/EDR>". A bare 'le' substring test
        # would also match unrelated text like "<role switch>" or
        # "<simple pairing>", so look for the flag prefix instead.
        if '<le ' in report_lower:
            caps.bt_mode = BluetoothMode.BLE_CLASSIC
            caps.bt_limitations = [
                "BLE scanning range depends on adapter sensitivity.",
                "Some devices may not be detected if not advertising.",
            ]
        else:
            caps.bt_mode = BluetoothMode.LIMITED
            caps.bt_limitations = [
                "Adapter may not support BLE scanning.",
                "Limited to classic Bluetooth discovery.",
            ]

        # Record the first "HCI Version" line verbatim as the version string.
        for line in report.split('\n'):
            if 'hci version' in line.lower():
                caps.bt_version = line.strip()
                break

    except (subprocess.SubprocessError, OSError):
        # Covers timeouts and a missing/non-executable hciconfig.
        caps.bt_mode = BluetoothMode.UNAVAILABLE
        caps.bt_limitations = ["Bluetooth tools not available"]
|
|
|
|
|
|
def _detect_rf_capabilities(caps: SweepCapabilities, sdr_device: Any) -> None:
    """
    Probe for SDR hardware and store the result on ``caps.rf_capability``.

    Only the first device reported by ``SDRFactory.detect_devices()`` is
    considered. ``sdr_device`` is accepted but not consulted here.
    """
    detected = RFCapability()

    try:
        from utils.sdr import SDRFactory
        found = SDRFactory.detect_devices()

        if not found:
            detected.available = False
            detected.device_type = 'none'
            detected.limitations = [
                "No SDR device detected.",
                "RF spectrum analysis is not available in this sweep.",
                "Cannot scan for wireless microphones, bugs, or RF transmitters.",
            ]
        else:
            primary = found[0]  # first detected device wins
            detected.available = True
            detected.device_type = primary.get('type', 'unknown')
            detected.driver = primary.get('driver', '')

            # Frequency coverage is looked up from the device type name.
            kind = detected.device_type.lower()
            if 'rtl' in kind:
                detected.min_frequency_mhz = 24.0
                detected.max_frequency_mhz = 1766.0
                detected.sample_rate_max = 3200000
                detected.limitations = [
                    "RTL-SDR frequency range: 24-1766 MHz typical.",
                    "Cannot reliably cover frequencies below 24 MHz.",
                    "Cannot cover microwave bands (>1.8 GHz) without upconverter.",
                    "Signal detection limited by SDR noise floor and dynamic range.",
                ]
            elif 'hackrf' in kind:
                detected.min_frequency_mhz = 1.0
                detected.max_frequency_mhz = 6000.0
                detected.sample_rate_max = 20000000
                detected.limitations = [
                    "HackRF frequency range: 1 MHz - 6 GHz.",
                    "8-bit ADC limits dynamic range for weak signal detection.",
                ]
            else:
                detected.limitations = [
                    f"Unknown SDR type: {detected.device_type}",
                    "Frequency coverage and capabilities uncertain.",
                ]

    except ImportError:
        detected.available = False
        detected.limitations = [
            "SDR support not installed.",
            "RF spectrum analysis unavailable.",
        ]
    except Exception as e:
        reason = str(e)
        detected.available = False
        detected.limitations = [f"SDR detection failed: {reason}"]

    caps.rf_capability = detected
|
|
|
|
|
|
# =============================================================================
|
|
# 2. Baseline Diff & Baseline Health
|
|
# =============================================================================
|
|
|
|
class BaselineHealth(Enum):
    """Overall reliability rating assigned to a baseline."""

    # Baseline is considered reliable.
    HEALTHY = 'healthy'

    # Baseline is usable but shows variability.
    NOISY = 'noisy'

    # Baseline should be refreshed.
    STALE = 'stale'
|
|
|
|
|
|
@dataclass
class DeviceChange:
    """One difference found between a baseline and the current sweep."""

    # Device identity: an identifier string plus the protocol it was seen on.
    identifier: str
    protocol: str

    # Kind of change: 'new', 'missing', 'rssi_drift', 'channel_change', 'security_change'
    change_type: str

    # Human-readable summary of the change.
    description: str

    # True when the change is considered normal/expected.
    expected: bool = False

    # Free-form supporting data for the change.
    details: dict = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class BaselineDiff:
    """
    Result of comparing one sweep against one baseline.

    Holds the detected changes, a health assessment of the baseline,
    and the split between expected and unexpected changes.
    """
    baseline_id: int
    sweep_id: int

    # Health assessment (score is 0-1, higher is healthier)
    health: BaselineHealth = BaselineHealth.HEALTHY
    health_score: float = 1.0
    health_reasons: list[str] = field(default_factory=list)

    # Baseline age
    baseline_age_hours: float = 0.0
    is_stale: bool = False

    # Changes grouped by kind
    new_devices: list[DeviceChange] = field(default_factory=list)
    missing_devices: list[DeviceChange] = field(default_factory=list)
    changed_devices: list[DeviceChange] = field(default_factory=list)

    # Counts for the three groups above
    total_new: int = 0
    total_missing: int = 0
    total_changed: int = 0

    # The same changes, split by whether they are expected
    expected_changes: list[DeviceChange] = field(default_factory=list)
    unexpected_changes: list[DeviceChange] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a JSON-serializable dictionary view of the diff."""

        def brief(change):
            # Compact form used for new and missing devices.
            return {'identifier': change.identifier, 'protocol': change.protocol,
                    'description': change.description, 'details': change.details}

        def detailed(change):
            # Changed devices additionally report the change type and expectation.
            return {'identifier': change.identifier, 'protocol': change.protocol,
                    'change_type': change.change_type, 'description': change.description,
                    'expected': change.expected, 'details': change.details}

        health_section = {
            'status': self.health.value,
            'score': round(self.health_score, 2),
            'reasons': self.health_reasons,
        }
        age_section = {
            'hours': round(self.baseline_age_hours, 1),
            'is_stale': self.is_stale,
        }
        summary_section = {
            'new_devices': self.total_new,
            'missing_devices': self.total_missing,
            'changed_devices': self.total_changed,
            'expected_changes': len(self.expected_changes),
            'unexpected_changes': len(self.unexpected_changes),
        }

        return {
            'baseline_id': self.baseline_id,
            'sweep_id': self.sweep_id,
            'health': health_section,
            'age': age_section,
            'summary': summary_section,
            'new_devices': [brief(item) for item in self.new_devices],
            'missing_devices': [brief(item) for item in self.missing_devices],
            'changed_devices': [detailed(item) for item in self.changed_devices],
            'disclaimer': (
                "Baseline comparison shows differences, not confirmed threats. "
                "New devices may be legitimate. Missing devices may have been powered off."
            ),
        }
|
|
|
|
|
|
def calculate_baseline_diff(
    baseline: dict,
    current_wifi: list[dict],
    current_bt: list[dict],
    current_rf: list[dict],
    sweep_id: int
) -> BaselineDiff:
    """
    Calculate comprehensive diff between baseline and current scan.

    Args:
        baseline: Baseline dict from database. Keys read here: 'id',
            'created_at' (ISO string or datetime), 'wifi_networks',
            'bt_devices', 'rf_frequencies'.
        current_wifi: Current WiFi devices
        current_bt: Current Bluetooth devices
        current_rf: Current RF signals
        sweep_id: Current sweep ID

    Returns:
        BaselineDiff with complete comparison results
    """
    diff = BaselineDiff(
        baseline_id=baseline.get('id', 0),
        sweep_id=sweep_id
    )

    # Work out when the baseline was created; unparseable values leave the
    # age at its default of 0.
    created_at = baseline.get('created_at')
    created = None
    if isinstance(created_at, str):
        try:
            # fromisoformat() on older Pythons rejects a trailing 'Z'.
            created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
        except ValueError:
            created = None
    elif isinstance(created_at, datetime):
        created = created_at

    if created is not None:
        # Compare like with like: an aware timestamp is measured against
        # "now" in its own timezone, a naive one against naive local time.
        # Dropping the tzinfo instead would skew the age by the UTC offset.
        if created.utcoffset() is not None:
            now = datetime.now(created.tzinfo)
        else:
            now = datetime.now()
        diff.baseline_age_hours = (now - created).total_seconds() / 3600

    # Check if baseline is stale (>72 hours old)
    diff.is_stale = diff.baseline_age_hours > 72

    def index_by_id(records, primary_key, fallback_key):
        # Key each record by its upper-cased identifier. A key that is
        # present but None/empty falls back to the alternative key, and
        # records with no identifier at all are skipped.
        indexed = {}
        for record in records or []:
            ident = record.get(primary_key) or record.get(fallback_key)
            if ident:
                indexed[str(ident).upper()] = record
        return indexed

    # Build baseline lookup dicts
    baseline_wifi = index_by_id(baseline.get('wifi_networks'), 'bssid', 'mac')
    baseline_bt = index_by_id(baseline.get('bt_devices'), 'mac', 'address')
    baseline_rf = {
        round(d.get('frequency', 0), 1): d
        for d in baseline.get('rf_frequencies') or []
        if d.get('frequency')
    }

    # Each comparison appends its findings to the diff in place.
    _compare_wifi(diff, baseline_wifi, current_wifi)
    _compare_bluetooth(diff, baseline_bt, current_bt)
    _compare_rf(diff, baseline_rf, current_rf)

    # Calculate totals
    diff.total_new = len(diff.new_devices)
    diff.total_missing = len(diff.missing_devices)
    diff.total_changed = len(diff.changed_devices)

    # Separate expected vs unexpected changes
    for change in diff.new_devices + diff.missing_devices + diff.changed_devices:
        if change.expected:
            diff.expected_changes.append(change)
        else:
            diff.unexpected_changes.append(change)

    # Calculate health
    _calculate_baseline_health(diff, baseline)

    return diff
|
|
|
|
|
|
def _compare_wifi(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
    """
    Compare WiFi devices between baseline and current.

    Appends new, missing and changed access points to ``diff`` in place.

    Args:
        diff: Diff object that receives the DeviceChange entries
        baseline: Baseline APs keyed by identifier (the lookups here use
            upper-cased BSSID/MAC strings)
        current: APs seen in the current sweep
    """
    # Key current APs by upper-cased BSSID (falling back to MAC). A key that
    # is present but None/empty must not hide the fallback identifier.
    current_macs = {}
    for record in current:
        ident = record.get('bssid') or record.get('mac')
        if ident:
            current_macs[str(ident).upper()] = record

    for mac, device in current_macs.items():
        if mac not in baseline:
            ssid = device.get('essid', device.get('ssid', 'Hidden'))
            diff.new_devices.append(DeviceChange(
                identifier=mac,
                protocol='wifi',
                change_type='new',
                description=f'New WiFi AP: {ssid}',
                expected=False,
                details={
                    'ssid': ssid,
                    'channel': device.get('channel'),
                    'rssi': device.get('power', device.get('signal')),
                }
            ))
            continue

        # Known AP: look for attribute changes.
        baseline_dev = baseline[mac]
        changes = []

        # RSSI drift (threshold: more than 15 dBm). Readings that cannot be
        # parsed as numbers are skipped rather than aborting the whole diff.
        curr_rssi = device.get('power', device.get('signal'))
        base_rssi = baseline_dev.get('power', baseline_dev.get('signal'))
        if curr_rssi and base_rssi:
            curr_val = _coerce_rssi(curr_rssi)
            base_val = _coerce_rssi(base_rssi)
            if curr_val is not None and base_val is not None:
                rssi_diff = abs(curr_val - base_val)
                if rssi_diff > 15:
                    changes.append(('rssi_drift', f'RSSI changed by {rssi_diff} dBm'))

        # Channel change
        curr_chan = device.get('channel')
        base_chan = baseline_dev.get('channel')
        if curr_chan and base_chan and curr_chan != base_chan:
            changes.append(('channel_change', f'Channel changed from {base_chan} to {curr_chan}'))

        # Security change
        curr_sec = device.get('encryption', device.get('privacy', ''))
        base_sec = baseline_dev.get('encryption', baseline_dev.get('privacy', ''))
        if curr_sec and base_sec and curr_sec != base_sec:
            changes.append(('security_change', f'Security changed from {base_sec} to {curr_sec}'))

        for change_type, desc in changes:
            diff.changed_devices.append(DeviceChange(
                identifier=mac,
                protocol='wifi',
                change_type=change_type,
                description=desc,
                expected=change_type == 'rssi_drift',  # RSSI drift is often expected
                details={
                    'ssid': device.get('essid', device.get('ssid')),
                    'baseline': baseline_dev,
                    'current': device,
                }
            ))

    # Baseline APs that were not seen in this sweep.
    for mac, device in baseline.items():
        if mac not in current_macs:
            ssid = device.get('essid', device.get('ssid', 'Hidden'))
            diff.missing_devices.append(DeviceChange(
                identifier=mac,
                protocol='wifi',
                change_type='missing',
                description=f'Missing WiFi AP: {ssid}',
                expected=False,  # Could be powered off
                details={
                    'ssid': ssid,
                    'last_channel': device.get('channel'),
                }
            ))


def _coerce_rssi(value: Any) -> Optional[int]:
    """Return ``value`` as an int, or None when it is not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        # Accept decimal strings such as '-65.5'.
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return None
|
|
|
|
|
|
def _compare_bluetooth(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
    """
    Compare Bluetooth devices between baseline and current.

    Appends new, missing and renamed devices to ``diff`` in place.

    Args:
        diff: Diff object that receives the DeviceChange entries
        baseline: Baseline devices keyed by identifier (the lookups here use
            upper-cased MAC/address strings)
        current: Devices seen in the current sweep
    """
    # Key current devices by upper-cased MAC (falling back to address). A key
    # that is present but None/empty must not hide the fallback identifier.
    current_macs = {}
    for record in current:
        ident = record.get('mac') or record.get('address')
        if ident:
            current_macs[str(ident).upper()] = record

    for mac, device in current_macs.items():
        if mac not in baseline:
            name = device.get('name', 'Unknown')
            diff.new_devices.append(DeviceChange(
                identifier=mac,
                protocol='bluetooth',
                change_type='new',
                description=f'New BLE device: {name}',
                expected=False,
                details={
                    'name': name,
                    'rssi': device.get('rssi'),
                    'manufacturer': device.get('manufacturer'),
                }
            ))
            continue

        # Known device: the only tracked change is a rename.
        baseline_dev = baseline[mac]
        curr_name = device.get('name', '')
        base_name = baseline_dev.get('name', '')
        if curr_name and base_name and curr_name != base_name:
            diff.changed_devices.append(DeviceChange(
                identifier=mac,
                protocol='bluetooth',
                change_type='name_change',
                description=f'Device renamed: {base_name} -> {curr_name}',
                expected=True,
                details={'old_name': base_name, 'new_name': curr_name}
            ))

    # Baseline devices that were not seen in this sweep.
    for mac, device in baseline.items():
        if mac not in current_macs:
            name = device.get('name', 'Unknown')
            diff.missing_devices.append(DeviceChange(
                identifier=mac,
                protocol='bluetooth',
                change_type='missing',
                description=f'Missing BLE device: {name}',
                expected=True,  # BLE devices often go to sleep
                details={'name': name}
            ))
|
|
|
|
|
|
def _compare_rf(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
    """
    Diff RF signals in the current sweep against the baseline.

    Signals are matched on frequency rounded to one decimal place.
    New and missing signals are appended to ``diff`` in place.
    """
    # Index current signals by rounded frequency; entries without one are ignored.
    seen_now = {}
    for entry in current:
        if entry.get('frequency'):
            seen_now[round(entry.get('frequency', 0), 1)] = entry

    # Signals present now but absent from the baseline.
    diff.new_devices.extend(
        DeviceChange(
            identifier=f'{freq:.1f} MHz',
            protocol='rf',
            change_type='new',
            description=f'New RF signal at {freq:.3f} MHz',
            expected=False,
            details={
                'frequency': freq,
                'power': entry.get('power', entry.get('level')),
                'modulation': entry.get('modulation'),
            }
        )
        for freq, entry in seen_now.items()
        if freq not in baseline
    )

    # Baseline signals that did not show up this time.
    diff.missing_devices.extend(
        DeviceChange(
            identifier=f'{freq:.1f} MHz',
            protocol='rf',
            change_type='missing',
            description=f'Missing RF signal at {freq:.1f} MHz',
            expected=True,  # RF signals can be intermittent
            details={'frequency': freq}
        )
        for freq in baseline
        if freq not in seen_now
    )
|
|
|
|
|
|
def _calculate_baseline_health(diff: BaselineDiff, baseline: dict) -> None:
    """
    Score the baseline's reliability and store the verdict on ``diff``.

    Starts from 1.0 and subtracts penalties for age, device churn and a
    very small baseline, then maps the clamped score to a health status.
    """
    score = 1.0
    reasons = []
    age_hours = diff.baseline_age_hours

    # Age penalty: only the first (largest) matching tier applies.
    age_tiers = (
        (168, 0.4, " (>1 week)"),
        (72, 0.2, " (>3 days)"),
        (24, 0.1, ""),
    )
    for threshold, penalty, suffix in age_tiers:
        if age_hours > threshold:
            score -= penalty
            reasons.append(f"Baseline is {age_hours:.0f} hours old{suffix}")
            break

    # Churn penalty: new plus missing devices relative to the baseline size.
    total_baseline = sum(
        len(baseline.get(section, []))
        for section in ('wifi_networks', 'bt_devices', 'rf_frequencies')
    )
    if total_baseline > 0:
        churn_rate = (diff.total_new + diff.total_missing) / total_baseline
        if churn_rate > 0.5:
            score -= 0.3
            reasons.append(f"High device churn rate: {churn_rate:.0%}")
        elif churn_rate > 0.25:
            score -= 0.15
            reasons.append(f"Moderate device churn rate: {churn_rate:.0%}")

    # Small baseline penalty
    if total_baseline < 3:
        score -= 0.2
        reasons.append(f"Baseline has few devices ({total_baseline}) - may be incomplete")

    # Clamp to the 0-1 range before classifying.
    diff.health_score = max(0, min(1, score))

    if diff.health_score >= 0.7:
        diff.health = BaselineHealth.HEALTHY
    else:
        if diff.health_score >= 0.4:
            diff.health = BaselineHealth.NOISY
            fallback_reason = "Baseline showing moderate variability"
        else:
            diff.health = BaselineHealth.STALE
            fallback_reason = "Baseline requires refresh"
        # Make sure a non-healthy verdict always carries at least one reason.
        if not reasons:
            reasons.append(fallback_reason)

    diff.health_reasons = reasons
|
|
|
|
|
|
# =============================================================================
|
|
# 3. Per-Device Timelines
|
|
# =============================================================================
|
|
|
|
@dataclass
class DeviceObservation:
    """One timestamped sighting of a device."""

    # When the sighting was recorded.
    timestamp: datetime

    # Signal strength for the sighting, if known.
    rssi: Optional[int] = None

    # Whether the device was present at this time.
    present: bool = True

    # Radio parameters, if known.
    channel: Optional[int] = None
    frequency: Optional[float] = None

    # Free-form extra data attached to the sighting.
    attributes: dict = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class DeviceTimeline:
    """
    History and derived metrics for a single device.

    Carries the recorded observations together with signal, movement and
    meeting-correlation figures derived from them.
    """
    identifier: str
    protocol: str
    name: Optional[str] = None

    # Recorded observations, oldest first
    observations: list[DeviceObservation] = field(default_factory=list)

    # Presence metrics
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    total_observations: int = 0
    presence_ratio: float = 0.0  # fraction of time the device was present

    # Signal metrics (stability is 0-1, higher = more stable)
    rssi_min: Optional[int] = None
    rssi_max: Optional[int] = None
    rssi_mean: Optional[float] = None
    rssi_stability: float = 0.0

    # Movement assessment; pattern is 'stationary', 'mobile', 'intermittent' or 'unknown'
    appears_stationary: bool = True
    movement_pattern: str = 'unknown'

    # Meeting correlation
    meeting_correlated: bool = False
    meeting_observations: int = 0

    def to_dict(self) -> dict:
        """Return a JSON-serializable dictionary view of the timeline."""
        # Only the 50 most recent observations are exported.
        recent = self.observations[-50:]
        exported = []
        for sighting in recent:
            exported.append({
                'timestamp': sighting.timestamp.isoformat(),
                'rssi': sighting.rssi,
                'present': sighting.present,
                'channel': sighting.channel,
                'frequency': sighting.frequency,
            })

        first_iso = self.first_seen.isoformat() if self.first_seen else None
        last_iso = self.last_seen.isoformat() if self.last_seen else None
        mean_rounded = round(self.rssi_mean, 1) if self.rssi_mean else None

        return {
            'identifier': self.identifier,
            'protocol': self.protocol,
            'name': self.name,
            'observations': exported,
            'metrics': {
                'first_seen': first_iso,
                'last_seen': last_iso,
                'total_observations': self.total_observations,
                'presence_ratio': round(self.presence_ratio, 2),
            },
            'signal': {
                'rssi_min': self.rssi_min,
                'rssi_max': self.rssi_max,
                'rssi_mean': mean_rounded,
                'stability': round(self.rssi_stability, 2),
            },
            'movement': {
                'appears_stationary': self.appears_stationary,
                'pattern': self.movement_pattern,
            },
            'meeting_correlation': {
                'correlated': self.meeting_correlated,
                'observations_during_meeting': self.meeting_observations,
            },
        }
|
|
|
|
|
|
class TimelineManager:
|
|
"""
|
|
Manages per-device timelines with time-bucketing.
|
|
|
|
Buckets observations to keep memory bounded while preserving
|
|
useful behavioral patterns.
|
|
"""
|
|
|
|
def __init__(self, bucket_seconds: int = 30, max_observations: int = 200):
|
|
"""
|
|
Args:
|
|
bucket_seconds: Time bucket size in seconds
|
|
max_observations: Maximum observations to keep per device
|
|
"""
|
|
self.bucket_seconds = bucket_seconds
|
|
self.max_observations = max_observations
|
|
self.timelines: dict[str, DeviceTimeline] = {}
|
|
self._meeting_windows: list[tuple[datetime, Optional[datetime]]] = []
|
|
|
|
def add_observation(
    self,
    identifier: str,
    protocol: str,
    rssi: Optional[int] = None,
    channel: Optional[int] = None,
    frequency: Optional[float] = None,
    name: Optional[str] = None,
    attributes: Optional[dict] = None
) -> None:
    """
    Add an observation for a device.

    A sighting that arrives less than ``bucket_seconds`` after the device's
    latest stored observation is merged into that observation instead of
    being stored separately.

    Args:
        identifier: Device identifier (upper-cased for the timeline key)
        protocol: Protocol label used as the key prefix
        rssi: Signal strength of this sighting, if known
        channel: Channel of this sighting, if known
        frequency: Frequency of this sighting, if known
        name: Device name; replaces the stored name when truthy
        attributes: Extra data stored on a newly created observation
    """
    device_id = identifier.upper()
    key = f"{protocol}:{device_id}"
    now = datetime.now()

    if key not in self.timelines:
        self.timelines[key] = DeviceTimeline(
            identifier=device_id,
            protocol=protocol,
            name=name,
            first_seen=now,
        )

    timeline = self.timelines[key]

    # Update name if provided
    if name:
        timeline.name = name

    # Merge into the latest bucket when this sighting is close enough in time.
    if timeline.observations:
        last_obs = timeline.observations[-1]
        time_diff = (now - last_obs.timestamp).total_seconds()

        if time_diff < self.bucket_seconds:
            if rssi is not None:
                # Blend with the bucket's RSSI (integer floor average).
                if last_obs.rssi is not None:
                    last_obs.rssi = (last_obs.rssi + rssi) // 2
                else:
                    last_obs.rssi = rssi
            # The device was still seen just now, so keep last_seen current
            # even though no new observation is stored.
            timeline.last_seen = now
            return

    # Add new observation
    obs = DeviceObservation(
        timestamp=now,
        rssi=rssi,
        present=True,
        channel=channel,
        frequency=frequency,
        attributes=attributes or {},
    )
    timeline.observations.append(obs)

    # Enforce max observations by dropping the oldest entries.
    if len(timeline.observations) > self.max_observations:
        timeline.observations = timeline.observations[-self.max_observations:]

    # Update metrics
    timeline.last_seen = now
    timeline.total_observations = len(timeline.observations)

    # Check meeting correlation
    if self._is_during_meeting(now):
        timeline.meeting_observations += 1
        timeline.meeting_correlated = True
|
|
|
|
def start_meeting_window(self) -> None:
    """Open a new meeting window starting now, with no end time yet."""
    opened_at = datetime.now()
    self._meeting_windows.append((opened_at, None))
|
|
|
|
def end_meeting_window(self) -> None:
    """Close the most recent meeting window if it is still open."""
    windows = self._meeting_windows
    if not windows:
        return
    if windows[-1][1] is not None:
        # Latest window already has an end time; nothing to close.
        return
    opened_at = windows[-1][0]
    self._meeting_windows[-1] = (opened_at, datetime.now())
|
|
|
|
def _is_during_meeting(self, timestamp: datetime) -> bool:
|
|
"""Check if timestamp falls within a meeting window."""
|
|
for start, end in self._meeting_windows:
|
|
if end is None:
|
|
if timestamp >= start:
|
|
return True
|
|
elif start <= timestamp <= end:
|
|
return True
|
|
return False
|
|
|
|
def compute_metrics(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
    """Refresh the derived metrics stored on one device timeline.

    Args:
        identifier: Device identifier; upper-cased before lookup.
        protocol: Protocol prefix used in the timeline key.

    Returns:
        The timeline stored under ``"<protocol>:<IDENTIFIER>"`` with its
        RSSI, movement and presence fields updated in place, or ``None``
        when no such timeline exists. A timeline without observations is
        returned untouched.
    """
    lookup_key = f"{protocol}:{identifier.upper()}"
    if lookup_key not in self.timelines:
        return None

    record = self.timelines[lookup_key]
    if not record.observations:
        return record

    # RSSI statistics are computed only over observations that carry a reading.
    readings = [sample.rssi for sample in record.observations if sample.rssi is not None]
    if readings:
        count = len(readings)
        lowest = min(readings)
        highest = max(readings)
        mean = sum(readings) / count

        record.rssi_min = lowest
        record.rssi_max = highest
        record.rssi_mean = mean

        # Stability is 1 - variance/100, floored at 0; needs at least 3 readings.
        if count >= 3:
            variance = sum((value - mean) ** 2 for value in readings) / count
            record.rssi_stability = max(0, 1 - (variance / 100))

        # Movement label is derived from the min-to-max RSSI spread.
        spread = highest - lowest
        if spread < 10:
            stationary, pattern = True, 'stationary'
        elif spread < 25:
            stationary, pattern = False, 'mobile'
        else:
            stationary, pattern = False, 'intermittent'
        record.appears_stationary = stationary
        record.movement_pattern = pattern

    # Presence ratio: observation count times bucket length over the seen span.
    if record.first_seen and record.last_seen:
        seen_span = (record.last_seen - record.first_seen).total_seconds()
        if seen_span > 0:
            covered = record.total_observations * self.bucket_seconds
            record.presence_ratio = min(1.0, covered / seen_span)

    return record
|
|
|
|
def get_timeline(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
    """Return one device timeline with freshly computed metrics.

    Delegates to ``compute_metrics``; the result is ``None`` when that
    method finds no timeline for the given identifier and protocol.
    """
    refreshed = self.compute_metrics(identifier, protocol)
    return refreshed
|
|
|
|
def get_all_timelines(self) -> list[DeviceTimeline]:
    """Recompute metrics for every stored timeline and return them all.

    Each key in ``self.timelines`` is split on its first ``:`` into
    protocol and identifier, which are passed to ``compute_metrics``.
    """
    for composite_key in self.timelines:
        proto, device_id = composite_key.split(':', 1)
        self.compute_metrics(device_id, proto)
    return [*self.timelines.values()]
|
|
|
|
|
|
# =============================================================================
|
|
# 5. Meeting-Window Summary Enhancements
|
|
# =============================================================================
|
|
|
|
@dataclass
class MeetingWindowSummary:
    """
    Device activity recorded for a single meeting window.

    Holds the devices that first appeared during the meeting, devices
    whose behavior changed, every device active in the window, and the
    aggregate counts derived from those lists.
    """
    meeting_id: int
    name: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    duration_minutes: float = 0.0

    # Devices whose first observation falls in the meeting (high interest)
    devices_first_seen: list[dict] = field(default_factory=list)

    # Devices that changed behavior while the meeting was running
    devices_behavior_change: list[dict] = field(default_factory=list)

    # Every device observed during the meeting
    active_devices: list[dict] = field(default_factory=list)

    # Aggregate counters
    total_devices_active: int = 0
    new_devices_count: int = 0
    behavior_changes_count: int = 0
    high_interest_count: int = 0

    def to_dict(self) -> dict:
        """Build a JSON-serializable dictionary of this summary.

        Times are emitted as ISO-8601 strings (``None`` when unset) and
        the duration is rounded to one decimal place.
        """
        started = self.start_time.isoformat() if self.start_time else None
        ended = self.end_time.isoformat() if self.end_time else None
        counters = {
            'total_devices_active': self.total_devices_active,
            'new_devices': self.new_devices_count,
            'behavior_changes': self.behavior_changes_count,
            'high_interest': self.high_interest_count,
        }
        caveat = (
            "Meeting-correlated activity indicates temporal correlation only, "
            "not confirmed surveillance. Devices may have legitimate reasons "
            "for appearing during meetings."
        )
        return {
            'meeting_id': self.meeting_id,
            'name': self.name,
            'start_time': started,
            'end_time': ended,
            'duration_minutes': round(self.duration_minutes, 1),
            'summary': counters,
            'devices_first_seen': self.devices_first_seen,
            'devices_behavior_change': self.devices_behavior_change,
            'disclaimer': caveat,
        }
|
|
|
|
|
|
def generate_meeting_summary(
    meeting_window: dict,
    device_timelines: list[DeviceTimeline],
    device_profiles: list[dict]
) -> MeetingWindowSummary:
    """
    Summarize which devices were active during a meeting window.

    Args:
        meeting_window: Meeting window dict; reads ``id``, ``name``,
            ``start_time`` and ``end_time``. Times may be ISO strings
            (a trailing ``Z`` is accepted, timezone info is dropped) or
            datetime objects, which are used as given.
        device_timelines: Timelines whose observations are checked
            against the window.
        device_profiles: Profile dicts; reads ``risk_level`` and
            ``indicators``.

    Returns:
        MeetingWindowSummary. When the window has no start time, only
        the id, name and parsed times are filled in.
    """
    def _parse_time(raw):
        # ISO strings are parsed and made naive; other values pass through.
        if isinstance(raw, str):
            return datetime.fromisoformat(raw.replace('Z', '+00:00')).replace(tzinfo=None)
        return raw

    summary = MeetingWindowSummary(
        meeting_id=meeting_window.get('id', 0),
        name=meeting_window.get('name'),
    )

    raw_start = meeting_window.get('start_time')
    raw_end = meeting_window.get('end_time')
    if raw_start:
        summary.start_time = _parse_time(raw_start)
    if raw_end:
        summary.end_time = _parse_time(raw_end)

    if summary.start_time and summary.end_time:
        elapsed = summary.end_time - summary.start_time
        summary.duration_minutes = elapsed.total_seconds() / 60

    if not summary.start_time:
        return summary

    window_start = summary.start_time
    window_end = summary.end_time

    def _in_window(moment) -> bool:
        # Without an end time the meeting is treated as still ongoing.
        if window_end:
            return window_start <= moment <= window_end
        return moment >= window_start

    for timeline in device_timelines:
        if not timeline.first_seen:
            continue

        # Only the first observation inside the window is examined.
        hit = next(
            (obs for obs in timeline.observations if _in_window(obs.timestamp)),
            None,
        )
        if hit is None:
            continue

        # "First seen during meeting" = that observation is within 60 s of first_seen.
        gap_seconds = abs((hit.timestamp - timeline.first_seen).total_seconds())
        first_seen_during = gap_seconds < 60

        entry = {
            'identifier': timeline.identifier,
            'protocol': timeline.protocol,
            'name': timeline.name,
            'meeting_correlated': True,
        }
        summary.active_devices.append(entry)

        if first_seen_during:
            entry['first_seen_during_meeting'] = True
            summary.devices_first_seen.append({
                **entry,
                'description': 'Device first seen during meeting window',
                'risk_modifier': '+2 (meeting-correlated activity)',
            })

    summary.total_devices_active = len(summary.active_devices)
    summary.new_devices_count = len(summary.devices_first_seen)
    summary.behavior_changes_count = len(summary.devices_behavior_change)

    # High-interest profiles count only when they carry a meeting_correlated indicator.
    summary.high_interest_count += sum(
        1
        for profile in device_profiles
        if profile.get('risk_level') == 'high_interest'
        and any(
            ind.get('type') == 'meeting_correlated'
            for ind in profile.get('indicators', [])
        )
    )

    return summary
|
|
|
|
|
|
# =============================================================================
|
|
# 7. WiFi Advanced Indicators (LIMITED SCOPE)
|
|
# =============================================================================
|
|
|
|
@dataclass
class WiFiAdvancedIndicator:
    """A single advanced WiFi pattern detection."""
    indicator_type: str  # 'evil_twin', 'probe_request', 'deauth_burst'
    severity: str  # 'high', 'medium', 'low'
    description: str
    details: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    requires_monitor_mode: bool = False

    def to_dict(self) -> dict:
        """Build a JSON-serializable dictionary of this indicator.

        The timestamp is emitted as an ISO-8601 string and a fixed
        disclaimer is attached.
        """
        caveat = (
            "Pattern detected - this is an indicator, not confirmation of an attack. "
            "Further investigation required."
        )
        payload = {
            'type': self.indicator_type,
            'severity': self.severity,
            'description': self.description,
            'details': self.details,
            'timestamp': self.timestamp.isoformat(),
            'requires_monitor_mode': self.requires_monitor_mode,
        }
        payload['disclaimer'] = caveat
        return payload
|
|
|
|
|
|
class WiFiAdvancedDetector:
    """
    Detects advanced WiFi indicators.

    LIMITED SCOPE - Only implements:
    1. Evil Twin patterns (same SSID, different BSSID/security/abnormal signal)
    2. Probe requests for sensitive SSIDs (requires monitor mode)
    3. Deauthentication bursts (requires monitor mode)

    All findings labeled as "pattern detected", never called attacks.
    """

    # Lower-case substrings that mark a probed SSID as potentially sensitive.
    SENSITIVE_SSID_PATTERNS = (
        'corp', 'internal', 'private', 'secure', 'vpn',
        'admin', 'management', 'executive', 'board',
    )

    def __init__(self, monitor_mode_available: bool = False):
        """Create a detector.

        Args:
            monitor_mode_available: When False, probe-request and deauth
                frames are ignored by the corresponding ``add_*`` methods.
        """
        self.monitor_mode = monitor_mode_available
        self.known_networks: dict[str, dict] = {}  # SSID -> expected BSSID/security
        self.probe_requests: list[dict] = []
        self.deauth_frames: list[dict] = []
        self.indicators: list[WiFiAdvancedIndicator] = []

    @staticmethod
    def _normalize_mac(value: Any) -> str:
        """Upper-case a MAC/BSSID value; missing or empty values become ''."""
        return str(value).upper() if value else ''

    def set_known_networks(self, networks: list[dict]) -> None:
        """Set known/expected networks from baseline.

        Each network is keyed by its SSID (``essid`` or ``ssid``); entries
        without an SSID are skipped. A later entry with the same SSID
        replaces the earlier one.
        """
        for net in networks:
            ssid = net.get('essid', net.get('ssid', ''))
            if not ssid:
                continue
            self.known_networks[ssid] = {
                'bssid': self._normalize_mac(net.get('bssid', net.get('mac', ''))),
                'security': net.get('encryption', net.get('privacy', '')),
                'channel': net.get('channel'),
                'rssi': net.get('power', net.get('signal')),
            }

    def analyze_network(self, network: dict) -> list[WiFiAdvancedIndicator]:
        """
        Analyze a network for evil twin patterns.

        Detects: Same SSID with different BSSID, security, or abnormal signal.

        Args:
            network: Scan result dict; reads ``essid``/``ssid``,
                ``bssid``/``mac``, ``encryption``/``privacy`` and
                ``power``/``signal``.

        Returns:
            Indicators raised for this network (also appended to
            ``self.indicators``). Empty for hidden/unnamed networks, for
            SSIDs not in the baseline, and when the BSSID matches.
        """
        indicators: list[WiFiAdvancedIndicator] = []
        ssid = network.get('essid', network.get('ssid', ''))
        bssid = self._normalize_mac(network.get('bssid', network.get('mac', '')))
        security = network.get('encryption', network.get('privacy', ''))
        rssi = network.get('power', network.get('signal'))

        if not ssid or ssid in ['', 'Hidden', '[Hidden]']:
            return indicators

        known = self.known_networks.get(ssid)

        # Only a different BSSID for a baselined SSID is of interest.
        if known and known['bssid'] and known['bssid'] != bssid:
            security_mismatch = bool(
                known['security'] and security and known['security'] != security
            )

            # Signal anomaly: more than 20 dB stronger than the baseline reading.
            signal_anomaly = False
            rssi_diff = None
            if rssi and known.get('rssi'):
                try:
                    rssi_diff = int(rssi) - int(known['rssi'])
                    signal_anomaly = rssi_diff > 20
                except (ValueError, TypeError):
                    # Unparseable RSSI: treat as "no anomaly" rather than fail the scan.
                    pass

            if security_mismatch:
                indicators.append(WiFiAdvancedIndicator(
                    indicator_type='evil_twin',
                    severity='high',
                    description=f'Evil twin pattern detected for SSID "{ssid}"',
                    details={
                        'ssid': ssid,
                        'detected_bssid': bssid,
                        'expected_bssid': known['bssid'],
                        'detected_security': security,
                        'expected_security': known['security'],
                        'pattern': 'Different BSSID with security downgrade',
                    },
                    requires_monitor_mode=False,
                ))
            elif signal_anomaly:
                indicators.append(WiFiAdvancedIndicator(
                    indicator_type='evil_twin',
                    severity='medium',
                    description=f'Possible evil twin pattern for SSID "{ssid}"',
                    details={
                        'ssid': ssid,
                        'detected_bssid': bssid,
                        'expected_bssid': known['bssid'],
                        'signal_difference': f'+{rssi_diff} dBm stronger than expected',
                        'pattern': 'Different BSSID with abnormally strong signal',
                    },
                    requires_monitor_mode=False,
                ))
            else:
                indicators.append(WiFiAdvancedIndicator(
                    indicator_type='evil_twin',
                    severity='low',
                    description=f'Duplicate SSID detected: "{ssid}"',
                    details={
                        'ssid': ssid,
                        'detected_bssid': bssid,
                        'expected_bssid': known['bssid'],
                        'pattern': 'Multiple APs with same SSID (may be legitimate)',
                    },
                    requires_monitor_mode=False,
                ))

        self.indicators.extend(indicators)
        return indicators

    def add_probe_request(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
        """
        Record a probe request frame (requires monitor mode).

        Detects repeated probing for sensitive SSIDs.

        Args:
            frame: Frame dict; reads ``src_mac`` and ``ssid``.

        Returns:
            An indicator when the probed SSID matches a sensitive pattern
            and has been probed at least 3 times in the last 5 minutes;
            otherwise ``None`` (always ``None`` without monitor mode).
        """
        if not self.monitor_mode:
            return None

        # One clock sample so the stored timestamp and the cutoff agree.
        now = datetime.now()
        ssid = frame.get('ssid', '')
        self.probe_requests.append({
            'timestamp': now,
            'src_mac': self._normalize_mac(frame.get('src_mac', '')),
            'probed_ssid': ssid,
        })

        # Keep last 1000 probe requests
        if len(self.probe_requests) > 1000:
            self.probe_requests = self.probe_requests[-1000:]

        if not ssid:
            return None
        lowered = ssid.lower()
        if not any(pattern in lowered for pattern in self.SENSITIVE_SSID_PATTERNS):
            return None

        # Count recent probes for this SSID
        recent_cutoff = now - timedelta(minutes=5)
        recent_probes = [
            p for p in self.probe_requests
            if p['probed_ssid'] == ssid and p['timestamp'] > recent_cutoff
        ]
        if len(recent_probes) < 3:
            return None

        indicator = WiFiAdvancedIndicator(
            indicator_type='probe_request',
            severity='medium',
            description=f'Repeated probing for sensitive SSID "{ssid}"',
            details={
                'ssid': ssid,
                'probe_count': len(recent_probes),
                # Sorted so the reported list is deterministic.
                'source_macs': sorted({p['src_mac'] for p in recent_probes}),
                'pattern': 'Multiple probe requests for potentially sensitive network',
            },
            requires_monitor_mode=True,
        )
        self.indicators.append(indicator)
        return indicator

    def add_deauth_frame(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
        """
        Record a deauthentication frame (requires monitor mode).

        Detects abnormal deauth volume potentially indicating attack.

        Args:
            frame: Frame dict; reads ``src_mac``, ``dst_mac``, ``bssid``
                and ``reason_code``.

        Returns:
            An indicator when at least 10 deauth frames were recorded in
            the last 10 seconds; otherwise ``None`` (always ``None``
            without monitor mode). Severity is ``high`` only when at least
            5 of those frames share this frame's non-empty BSSID.
        """
        if not self.monitor_mode:
            return None

        # One clock sample so the stored timestamp and the cutoff agree.
        now = datetime.now()
        bssid = self._normalize_mac(frame.get('bssid', ''))
        self.deauth_frames.append({
            'timestamp': now,
            'src_mac': self._normalize_mac(frame.get('src_mac', '')),
            'dst_mac': self._normalize_mac(frame.get('dst_mac', '')),
            'bssid': bssid,
            'reason': frame.get('reason_code'),
        })

        # Keep last 500 deauth frames
        if len(self.deauth_frames) > 500:
            self.deauth_frames = self.deauth_frames[-500:]

        # Check for deauth burst (10 or more deauths within 10 seconds)
        recent_cutoff = now - timedelta(seconds=10)
        recent_deauths = [d for d in self.deauth_frames if d['timestamp'] > recent_cutoff]
        if len(recent_deauths) < 10:
            return None

        # Frames without a BSSID must not count as "targeting" the empty BSSID.
        same_bssid = sum(1 for d in recent_deauths if d['bssid'] == bssid)
        targeting_bssid = bool(bssid) and same_bssid >= 5

        indicator = WiFiAdvancedIndicator(
            indicator_type='deauth_burst',
            severity='high' if targeting_bssid else 'medium',
            description='Deauthentication burst pattern detected',
            details={
                'deauth_count': len(recent_deauths),
                'time_window_seconds': 10,
                'targeted_bssid': bssid if targeting_bssid else None,
                'unique_sources': len({d['src_mac'] for d in recent_deauths}),
                'pattern': 'Abnormal deauthentication frame volume',
            },
            requires_monitor_mode=True,
        )
        self.indicators.append(indicator)

        # Drop the frames just reported to avoid repeated alerts for one burst.
        self.deauth_frames = [d for d in self.deauth_frames if d['timestamp'] <= recent_cutoff]

        return indicator

    def get_all_indicators(self) -> list[dict]:
        """Get all detected indicators as dictionaries."""
        return [i.to_dict() for i in self.indicators]

    def get_unavailable_features(self) -> list[str]:
        """Get list of features unavailable without monitor mode."""
        if self.monitor_mode:
            return []
        return [
            "Probe request analysis: Requires monitor mode to capture probe frames.",
            "Deauthentication detection: Requires monitor mode to capture management frames.",
            "Raw 802.11 frame analysis: Not available in managed mode.",
        ]
|
|
|
|
|
|
# =============================================================================
|
|
# 8. Bluetooth Risk Explainability & Proximity Heuristics
|
|
# =============================================================================
|
|
|
|
class BLEProximity(Enum):
    """Coarse distance bands estimated from BLE signal strength (RSSI)."""
    VERY_CLOSE = 'very_close'  # roughly within 1 m
    CLOSE = 'close'            # roughly within 3 m
    MODERATE = 'moderate'      # roughly within 10 m
    FAR = 'far'                # roughly beyond 10 m
    UNKNOWN = 'unknown'        # no usable RSSI
|
|
|
|
|
|
@dataclass
class BLERiskExplanation:
    """
    Human-readable risk assessment for one BLE device.

    Groups the risk rating, proximity estimate, tracker detection,
    meeting correlation and recommended action, each with its own
    explanatory text.
    """
    identifier: str
    name: Optional[str] = None

    # Risk rating
    risk_level: str = 'informational'
    risk_score: int = 0
    risk_explanation: str = ''

    # Proximity estimate
    proximity: BLEProximity = BLEProximity.UNKNOWN
    proximity_explanation: str = ''
    estimated_distance: str = ''

    # Tracker detection
    is_tracker: bool = False
    tracker_type: Optional[str] = None
    tracker_explanation: str = ''

    # Meeting correlation
    meeting_correlated: bool = False
    meeting_explanation: str = ''

    # Recommended action
    recommended_action: str = ''
    action_rationale: str = ''

    # Every indicator together with its explanation
    indicators: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Build a nested, JSON-serializable dictionary of this assessment.

        The proximity enum is emitted as its string value and a fixed
        disclaimer is attached.
        """
        risk_section = {
            'level': self.risk_level,
            'score': self.risk_score,
            'explanation': self.risk_explanation,
        }
        proximity_section = {
            'estimate': self.proximity.value,
            'explanation': self.proximity_explanation,
            'estimated_distance': self.estimated_distance,
        }
        tracker_section = {
            'is_tracker': self.is_tracker,
            'type': self.tracker_type,
            'explanation': self.tracker_explanation,
        }
        meeting_section = {
            'correlated': self.meeting_correlated,
            'explanation': self.meeting_explanation,
        }
        action_section = {
            'action': self.recommended_action,
            'rationale': self.action_rationale,
        }
        caveat = (
            "Risk assessment is based on observable indicators and heuristics. "
            "Proximity estimates are approximate based on RSSI and may vary with environment. "
            "Tracker detection indicates brand presence, not confirmed threat."
        )
        return {
            'identifier': self.identifier,
            'name': self.name,
            'risk': risk_section,
            'proximity': proximity_section,
            'tracker': tracker_section,
            'meeting_correlation': meeting_section,
            'recommended_action': action_section,
            'indicators': self.indicators,
            'disclaimer': caveat,
        }
|
|
|
|
|
|
def estimate_ble_proximity(rssi: int) -> tuple[BLEProximity, str, str]:
    """
    Map a BLE RSSI reading to a coarse proximity band.

    The thresholds are heuristic approximations. RSSI-based distance
    varies widely with:
    - transmit power differences between devices
    - the environment (walls, interference)
    - antenna characteristics

    Args:
        rssi: Signal strength in dBm; ``None`` yields the UNKNOWN band.

    Returns:
        Tuple of (proximity enum, explanation, estimated distance string)
    """
    if rssi is None:
        band = BLEProximity.UNKNOWN
        reason = "RSSI not available - cannot estimate proximity"
        distance = "Unknown"
    elif rssi >= -50:
        band = BLEProximity.VERY_CLOSE
        reason = f"Very strong signal ({rssi} dBm) suggests device is very close"
        distance = "< 1 meter (approximate)"
    elif rssi >= -65:
        band = BLEProximity.CLOSE
        reason = f"Strong signal ({rssi} dBm) suggests device is nearby"
        distance = "1-3 meters (approximate)"
    elif rssi >= -80:
        band = BLEProximity.MODERATE
        reason = f"Moderate signal ({rssi} dBm) suggests device is in the area"
        distance = "3-10 meters (approximate)"
    else:
        band = BLEProximity.FAR
        reason = f"Weak signal ({rssi} dBm) suggests device is distant"
        distance = "> 10 meters (approximate)"

    return (band, reason, distance)
|
|
|
|
|
|
def generate_ble_risk_explanation(
    device: dict,
    profile: Optional[dict] = None,
    is_during_meeting: bool = False
) -> BLERiskExplanation:
    """
    Generate human-readable risk explanation for a BLE device.

    Args:
        device: BLE device dict. Reads ``mac``/``address``, ``name``,
            ``rssi``/``signal``, the tracker flags ``is_airtag``,
            ``is_tile``, ``is_smarttag``, ``is_espressif`` and
            ``meeting_correlated``.
        profile: Profile dict from the correlation engine. Reads
            ``risk_level``, ``total_score`` and ``indicators``.
        is_during_meeting: Whether device was detected during meeting

    Returns:
        BLERiskExplanation with proximity, tracker, meeting, risk and
        recommended-action sections filled in.
    """
    # A missing or None MAC becomes '' instead of raising on .upper().
    mac = str(device.get('mac', device.get('address', '')) or '').upper()
    name = device.get('name', '')
    rssi = device.get('rssi', device.get('signal'))

    explanation = BLERiskExplanation(
        identifier=mac,
        name=name if name else None,
    )

    # Proximity estimation. A falsy RSSI (None, 0, '') is treated as
    # "no reading" and leaves the proximity fields at their defaults.
    if rssi:
        try:
            prox, prox_exp, dist = estimate_ble_proximity(int(rssi))
        except (ValueError, TypeError):
            explanation.proximity = BLEProximity.UNKNOWN
            explanation.proximity_explanation = "Could not parse RSSI value"
        else:
            explanation.proximity = prox
            explanation.proximity_explanation = prox_exp
            explanation.estimated_distance = dist

    # Tracker detection with explanation. First matching flag wins, in this
    # order. Espressif boards get a type and explanation but are not marked
    # as trackers.
    tracker_signatures = (
        (
            'is_airtag', True, 'Apple AirTag',
            "Apple AirTag detected via manufacturer data. AirTags are legitimate "
            "tracking devices but may indicate unwanted tracking if not recognized. "
            "Apple's Find My network will alert iPhone users to unknown AirTags."
        ),
        (
            'is_tile', True, 'Tile',
            "Tile tracker detected. Tile trackers are common consumer devices "
            "for finding lost items. Presence does not indicate surveillance."
        ),
        (
            'is_smarttag', True, 'Samsung SmartTag',
            "Samsung SmartTag detected. SmartTags are consumer tracking devices "
            "similar to AirTags. Samsung phones can detect unknown SmartTags."
        ),
        (
            'is_espressif', False, 'ESP32/ESP8266',
            "Espressif chipset (ESP32/ESP8266) detected. These are programmable "
            "development boards commonly used in IoT projects. They can be configured "
            "for various purposes including custom tracking devices."
        ),
    )
    for flag, counts_as_tracker, label, text in tracker_signatures:
        if device.get(flag):
            if counts_as_tracker:
                explanation.is_tracker = True
            explanation.tracker_type = label
            explanation.tracker_explanation = text
            break

    # Meeting correlation explanation
    if is_during_meeting or device.get('meeting_correlated'):
        explanation.meeting_correlated = True
        explanation.meeting_explanation = (
            "Device detected during a marked meeting window. This temporal correlation "
            "is noted but does not confirm malicious intent - many legitimate devices "
            "are active during meetings (phones, laptops, wearables)."
        )

    # Build risk explanation from profile
    if profile:
        explanation.risk_level = profile.get('risk_level', 'informational')
        explanation.risk_score = profile.get('total_score', 0)

        # Convert indicators to explanations
        for ind in profile.get('indicators', []):
            ind_type = ind.get('type', '')
            explanation.indicators.append({
                'type': ind_type,
                'description': ind.get('description', ''),
                'explanation': _get_indicator_explanation(ind_type),
            })

        # Build overall risk explanation
        if explanation.risk_level == 'high_interest':
            explanation.risk_explanation = (
                f"This device has accumulated {explanation.risk_score} risk points "
                "across multiple indicators, warranting closer investigation. "
                "High interest does not confirm surveillance - manual verification required."
            )
        elif explanation.risk_level == 'review':
            explanation.risk_explanation = (
                f"This device shows {explanation.risk_score} risk points indicating "
                "it should be reviewed but is not immediately concerning."
            )
        else:
            explanation.risk_explanation = (
                "This device shows typical characteristics and does not raise "
                "significant concerns based on observable indicators."
            )
    else:
        explanation.risk_explanation = "No detailed profile available for risk assessment."

    # Recommended action
    _set_recommended_action(explanation)

    return explanation
|
|
|
|
|
|
def _get_indicator_explanation(indicator_type: str) -> str:
    """Return the explanatory text for an indicator type.

    Unrecognized types get a generic "requires review" message.
    """
    fallback = "Indicator detected requiring review."
    catalog = {
        # Device identity and behavior
        'unknown_device': (
            "Device manufacturer is unknown or uses a generic chipset. "
            "This is common in DIY/hobbyist devices and some surveillance equipment."
        ),
        'audio_capable': (
            "Device advertises audio services (headphones, speakers, etc.). "
            "Audio-capable devices could theoretically transmit captured audio."
        ),
        'persistent': (
            "Device has been detected repeatedly across multiple scans. "
            "Persistence suggests a fixed or regularly present device."
        ),
        'meeting_correlated': (
            "Device activity correlates with marked meeting windows. "
            "This is a temporal pattern that warrants attention."
        ),
        'hidden_identity': (
            "Device does not broadcast a name or uses minimal advertising. "
            "Some legitimate devices minimize advertising for battery life."
        ),
        'stable_rssi': (
            "Signal strength is very stable, suggesting a stationary device. "
            "Fixed placement could indicate a planted device."
        ),
        'mac_rotation': (
            "Device appears to use MAC address randomization. "
            "This is a privacy feature in modern devices, also used to evade detection."
        ),
        # Tracker and chipset signatures
        'known_tracker': (
            "Device matches known tracking device signatures. "
            "May be a legitimate item tracker or unwanted surveillance."
        ),
        'airtag_detected': (
            "Apple AirTag identified. Check if this belongs to someone present."
        ),
        'tile_detected': (
            "Tile tracker identified. Common consumer tracking device."
        ),
        'smarttag_detected': (
            "Samsung SmartTag identified. Consumer tracking device."
        ),
        'esp32_device': (
            "Espressif development board detected. Highly programmable, "
            "could be configured for custom surveillance applications."
        ),
    }
    try:
        return catalog[indicator_type]
    except KeyError:
        return fallback
|
|
|
|
|
|
def _set_recommended_action(explanation: BLERiskExplanation) -> None:
    """Fill in ``recommended_action`` and ``action_rationale`` in place.

    The choice depends on ``risk_level`` and, for high-interest devices,
    on whether the device is a tracker and how close it is estimated
    to be.
    """
    level = explanation.risk_level

    if level == 'review':
        action = 'Monitor and document'
        rationale = (
            "Device shows some indicators worth noting. Add to monitoring list "
            "and compare against future sweeps to identify patterns."
        )
    elif level != 'high_interest':
        action = 'Continue monitoring'
        rationale = (
            "No immediate action required. Device will be tracked in subsequent "
            "sweeps for pattern analysis."
        )
    elif not explanation.is_tracker:
        action = 'Review and document'
        rationale = (
            "Multiple risk indicators present. Document the finding, "
            "attempt to identify the device, and consider physical search "
            "if other indicators suggest surveillance."
        )
    elif explanation.proximity == BLEProximity.VERY_CLOSE:
        action = 'Investigate immediately'
        rationale = (
            "Unknown tracker in very close proximity warrants immediate "
            "physical search of the area and personal belongings."
        )
    else:
        action = 'Investigate location'
        rationale = (
            "Tracker detected - recommend searching the area to locate "
            "the physical device and determine if it belongs to someone present."
        )

    explanation.recommended_action = action
    explanation.action_rationale = rationale
|
|
|
|
|
|
# =============================================================================
|
|
# 9. Operator Playbooks ("What To Do Next")
|
|
# =============================================================================
|
|
|
|
@dataclass
class PlaybookStep:
    """One numbered action within an operator playbook."""
    step_number: int  # position of the step within its playbook
    action: str  # short title of what to do
    details: str  # longer instructions for the action
    safety_note: Optional[str] = None  # optional caution shown with the step
|
|
|
|
|
|
@dataclass
class OperatorPlaybook:
    """
    Step-by-step guidance for a TSCM operator responding to a finding.

    Playbooks are procedural (what to do), not prescriptive (how to decide).
    All guidance is meant to be legally safe and professional.
    """
    playbook_id: str
    title: str
    risk_level: str
    description: str
    steps: list[PlaybookStep] = field(default_factory=list)
    when_to_escalate: str = ''
    documentation_required: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Build a JSON-serializable dictionary of this playbook.

        Each step is flattened to a dict with ``step``, ``action``,
        ``details`` and ``safety_note`` keys, and a fixed disclaimer is
        attached.
        """
        serialized_steps = []
        for entry in self.steps:
            serialized_steps.append({
                'step': entry.step_number,
                'action': entry.action,
                'details': entry.details,
                'safety_note': entry.safety_note,
            })

        caveat = (
            "This playbook provides procedural guidance only. Actions should be "
            "adapted to local laws, organizational policies, and professional judgment. "
            "Do not disassemble, interfere with, or remove suspected devices without "
            "proper authorization and legal guidance."
        )
        return {
            'playbook_id': self.playbook_id,
            'title': self.title,
            'risk_level': self.risk_level,
            'description': self.description,
            'steps': serialized_steps,
            'when_to_escalate': self.when_to_escalate,
            'documentation_required': self.documentation_required,
            'disclaimer': caveat,
        }
|
|
|
|
|
|
# Predefined playbooks by risk level
|
|
# Registry of built-in playbooks, keyed by the name used for lookup in
# get_playbook_for_finding().
PLAYBOOKS: dict[str, OperatorPlaybook] = {
    # PB-001: high-interest finding with a tracker indicator.
    'high_interest_tracker': OperatorPlaybook(
        playbook_id='PB-001',
        title='High Interest: Unknown Tracker Detection',
        risk_level='high_interest',
        description='Guidance for responding to unknown tracking device detection',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Document the finding',
                details='Record device identifier, signal strength, location, and timestamp. Take screenshots of the detection.',
            ),
            PlaybookStep(
                step_number=2,
                action='Estimate device location',
                details='Use signal strength variations while moving to triangulate approximate device position. Note areas of strongest signal.',
                safety_note='Do not touch or disturb any physical device found.',
            ),
            PlaybookStep(
                step_number=3,
                action='Physical search (if authorized)',
                details='Systematically search the high-signal area. Check common hiding spots: under furniture, in plants, behind fixtures, in bags/belongings.',
                safety_note='Only conduct physical searches with proper authorization.',
            ),
            PlaybookStep(
                step_number=4,
                action='Identify device owner',
                details='If device is located, determine if it belongs to someone legitimately present. Apple/Samsung/Tile devices can be scanned by their respective apps.',
            ),
            PlaybookStep(
                step_number=5,
                action='Escalate if unidentified',
                details='If device owner cannot be determined and device is in sensitive location, escalate to security management.',
            ),
        ],
        when_to_escalate='Escalate immediately if: device is concealed in sensitive area, owner cannot be identified, or multiple unknown trackers are found.',
        documentation_required=[
            'Device identifier (MAC address)',
            'Signal strength readings at multiple locations',
            'Physical location description',
            'Photos of any located devices',
            'Names of individuals present during search',
        ],
    ),

    # PB-002: high-interest finding without a tracker indicator.
    'high_interest_generic': OperatorPlaybook(
        playbook_id='PB-002',
        title='High Interest: Suspicious Device Pattern',
        risk_level='high_interest',
        description='Guidance for devices with multiple high-risk indicators',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Review all indicators',
                details='Examine each risk indicator in the device profile. Understand why the device scored high interest.',
            ),
            PlaybookStep(
                step_number=2,
                action='Cross-reference with baseline',
                details='Check if device appears in baseline. New devices warrant more scrutiny than known devices.',
            ),
            PlaybookStep(
                step_number=3,
                action='Monitor for pattern',
                details='Continue sweep and note if device persists, moves, or correlates with sensitive activities.',
            ),
            PlaybookStep(
                step_number=4,
                action='Attempt identification',
                details='Research manufacturer OUI, check for matching devices in the environment, ask occupants about devices.',
            ),
            PlaybookStep(
                step_number=5,
                action='Document and report',
                details='Add finding to sweep report with full details. Include in meeting/client debrief.',
            ),
        ],
        when_to_escalate='Escalate if: device cannot be identified, shows surveillance-consistent behavior, or correlates strongly with sensitive activities.',
        documentation_required=[
            'Complete device profile',
            'All risk indicators with scores',
            'Timeline of observations',
            'Correlation with meeting windows',
            'Any identification attempts and results',
        ],
    ),

    # PB-003: device flagged for review.
    'needs_review': OperatorPlaybook(
        playbook_id='PB-003',
        title='Needs Review: Unknown Device',
        risk_level='needs_review',
        description='Guidance for devices requiring investigation but not immediately concerning',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Note the device',
                details='Add device to monitoring list. Record basic details: identifier, type, signal strength.',
            ),
            PlaybookStep(
                step_number=2,
                action='Check against known devices',
                details='Verify device is not a known infrastructure device or personal device of authorized personnel.',
            ),
            PlaybookStep(
                step_number=3,
                action='Continue sweep',
                details='Complete the sweep. Review device in context of all findings.',
            ),
            PlaybookStep(
                step_number=4,
                action='Assess in final review',
                details='During sweep wrap-up, decide if device warrants further investigation or can be added to baseline.',
            ),
        ],
        when_to_escalate='Escalate if: multiple "needs review" devices appear together, or device shows high-interest indicators in subsequent sweeps.',
        documentation_required=[
            'Device identifier and type',
            'Brief description of why flagged',
            'Decision made (investigate further / add to baseline / monitor)',
        ],
    ),

    # PB-004: fallback for everything else.
    'informational': OperatorPlaybook(
        playbook_id='PB-004',
        title='Informational: Known/Expected Device',
        risk_level='informational',
        description='Guidance for devices that appear normal and expected',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Verify against baseline',
                details='Confirm device matches baseline entry. Note any changes (signal strength, channel, etc.).',
            ),
            PlaybookStep(
                step_number=2,
                action='Log observation',
                details='Record observation for timeline tracking. Even known devices should be logged.',
            ),
            PlaybookStep(
                step_number=3,
                action='Continue sweep',
                details='No further action required. Proceed with sweep.',
            ),
        ],
        when_to_escalate='Only escalate if device shows unexpected behavior changes or additional risk indicators.',
        documentation_required=[
            'Device identifier (for timeline)',
            'Observation timestamp',
        ],
    ),

    # PB-005: WiFi evil-twin finding.
    'wifi_evil_twin': OperatorPlaybook(
        playbook_id='PB-005',
        title='High Interest: Evil Twin Pattern Detected',
        risk_level='high_interest',
        description='Guidance when duplicate SSID with security mismatch is detected',
        steps=[
            PlaybookStep(
                step_number=1,
                action='Document both access points',
                details='Record details of legitimate AP and suspected rogue: BSSID, security, signal strength, channel.',
            ),
            PlaybookStep(
                step_number=2,
                action='Verify legitimate AP',
                details='Confirm which AP is the authorized infrastructure. Check with IT/facilities if needed.',
            ),
            PlaybookStep(
                step_number=3,
                action='Locate rogue AP',
                details='Use signal strength to estimate rogue AP location. Walk the area noting signal variations.',
                safety_note='Do not connect to or interact with the suspected rogue AP.',
            ),
            PlaybookStep(
                step_number=4,
                action='Physical search',
                details='Search suspected area for unauthorized access point. Check for hidden devices, suspicious equipment.',
            ),
            PlaybookStep(
                step_number=5,
                action='Report to IT Security',
                details='Even if device not found, report the finding to IT Security for network monitoring.',
            ),
        ],
        when_to_escalate='Escalate immediately. Evil twin attacks can capture credentials and traffic.',
        documentation_required=[
            'Both AP details (BSSID, SSID, security, channel, signal)',
            'Location where detected',
            'Signal strength map if created',
            'Physical search results',
        ],
    ),
}
|
|
|
|
|
|
def get_playbook_for_finding(
    risk_level: str,
    finding_type: Optional[str] = None,
    indicators: Optional[list[dict]] = None
) -> OperatorPlaybook:
    """
    Select the predefined playbook that matches a finding.

    Selection order:
        1. finding_type 'evil_twin' -> 'wifi_evil_twin'
        2. 'high_interest' with a tracker indicator -> 'high_interest_tracker'
        3. 'high_interest' otherwise -> 'high_interest_generic'
        4. 'review' or 'needs_review' -> 'needs_review'
        5. anything else -> 'informational'

    Args:
        risk_level: Risk level string
        finding_type: Optional specific finding type
        indicators: Optional list of indicator dicts; each is read via
            its 'type' key

    Returns:
        The matching OperatorPlaybook from PLAYBOOKS
    """
    # An explicit evil-twin finding takes precedence over everything else.
    if finding_type == 'evil_twin':
        return PLAYBOOKS['wifi_evil_twin']

    # Indicator types that mark the device as a known tracker.
    tracker_indicator_types = (
        'airtag_detected',
        'tile_detected',
        'smarttag_detected',
        'known_tracker',
    )
    # Scan whenever indicators are supplied, regardless of risk level.
    tracker_present = bool(indicators) and any(
        entry.get('type') in tracker_indicator_types for entry in indicators
    )

    if risk_level == 'high_interest':
        if tracker_present:
            return PLAYBOOKS['high_interest_tracker']
        return PLAYBOOKS['high_interest_generic']

    if risk_level in ('review', 'needs_review'):
        return PLAYBOOKS['needs_review']

    return PLAYBOOKS['informational']
|
|
|
|
|
|
def attach_playbook_to_finding(finding: dict) -> dict:
    """
    Add playbook guidance to a finding dict, in place.

    Reads 'risk_level' (default 'informational'), 'finding_type' and
    'indicators' (default empty list) from the finding, picks a playbook via
    get_playbook_for_finding(), and writes two keys back:

        - 'suggested_playbook': the playbook serialized with to_dict()
        - 'suggested_next_steps': "Step N: <action>" strings for the first
          three steps, as a quick reference

    Args:
        finding: Finding dict with risk_level, indicators, etc.

    Returns:
        The same finding dict, with the playbook keys added
    """
    playbook = get_playbook_for_finding(
        finding.get('risk_level', 'informational'),
        finding.get('finding_type'),
        finding.get('indicators', []),
    )

    finding['suggested_playbook'] = playbook.to_dict()

    # Only the first three steps go into the quick-reference list.
    quick_reference = []
    for step in playbook.steps[:3]:
        quick_reference.append(f"Step {step.step_number}: {step.action}")
    finding['suggested_next_steps'] = quick_reference

    return finding
|
|
|
|
|
|
# =============================================================================
|
|
# Global Instance Management
|
|
# =============================================================================
|
|
|
|
# Module-level singletons. Both start as None and are filled in by the
# get_*/reset_* functions in this section.
_timeline_manager: Optional[TimelineManager] = None
_wifi_detector: Optional[WiFiAdvancedDetector] = None
|
|
|
|
|
|
def get_timeline_manager() -> TimelineManager:
    """
    Return the shared TimelineManager, creating it on first use.

    Returns:
        The module-level TimelineManager instance
    """
    global _timeline_manager
    if _timeline_manager is not None:
        return _timeline_manager
    _timeline_manager = TimelineManager()
    return _timeline_manager
|
|
|
|
|
|
def reset_timeline_manager() -> None:
    """
    Replace the shared TimelineManager with a newly constructed one.

    The previous instance, if any, is simply dropped from the module global.
    """
    global _timeline_manager
    _timeline_manager = TimelineManager()
|
|
|
|
|
|
def get_wifi_detector(monitor_mode: bool = False) -> WiFiAdvancedDetector:
    """
    Return the shared WiFiAdvancedDetector, creating it on first use.

    Args:
        monitor_mode: Passed to WiFiAdvancedDetector only when the shared
            instance is created; ignored if an instance already exists.

    Returns:
        The module-level WiFiAdvancedDetector instance
    """
    global _wifi_detector
    if _wifi_detector is not None:
        return _wifi_detector
    _wifi_detector = WiFiAdvancedDetector(monitor_mode)
    return _wifi_detector
|
|
|
|
|
|
def reset_wifi_detector(monitor_mode: bool = False) -> None:
    """
    Replace the shared WiFiAdvancedDetector with a newly constructed one.

    Args:
        monitor_mode: Passed to the new WiFiAdvancedDetector.
    """
    global _wifi_detector
    _wifi_detector = WiFiAdvancedDetector(monitor_mode)
|