Files
intercept/utils/tscm/advanced.py

2240 lines
85 KiB
Python

"""
TSCM Advanced Features Module
Implements:
1. Capability & Coverage Reality Panel
2. Baseline Diff & Baseline Health
3. Per-Device Timelines
4. Meeting-Window Summary Enhancements
5. WiFi Advanced Indicators (Evil Twin, Probes, Deauth)
6. Bluetooth Risk Explainability & Proximity Heuristics
7. Operator Playbooks
DISCLAIMER: This system performs wireless and RF surveillance screening.
Findings indicate anomalies and indicators, not confirmed surveillance devices.
All claims are probabilistic pattern matches requiring professional verification.
"""
from __future__ import annotations
import logging
import os
import platform
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional
logger = logging.getLogger('intercept.tscm.advanced')
# =============================================================================
# 1. Capability & Coverage Reality Panel
# =============================================================================
class WifiMode(Enum):
"""WiFi adapter operating modes."""
MONITOR = 'monitor'
MANAGED = 'managed'
UNAVAILABLE = 'unavailable'
class BluetoothMode(Enum):
"""Bluetooth adapter capabilities."""
BLE_CLASSIC = 'ble_classic'
BLE_ONLY = 'ble_only'
LIMITED = 'limited'
UNAVAILABLE = 'unavailable'
@dataclass
class RFCapability:
"""RF/SDR device capabilities."""
device_type: str = 'none'
driver: str = ''
min_frequency_mhz: float = 0.0
max_frequency_mhz: float = 0.0
sample_rate_max: int = 0
available: bool = False
limitations: list[str] = field(default_factory=list)
@dataclass
class SweepCapabilities:
"""
Complete capabilities snapshot for a TSCM sweep.
Exposes what the current sweep CAN and CANNOT detect based on
OS, privileges, adapters, and SDR hardware limits.
"""
# System info
os_name: str = ''
os_version: str = ''
is_root: bool = False
# WiFi capabilities
wifi_mode: WifiMode = WifiMode.UNAVAILABLE
wifi_interface: str = ''
wifi_driver: str = ''
wifi_monitor_capable: bool = False
wifi_limitations: list[str] = field(default_factory=list)
# Bluetooth capabilities
bt_mode: BluetoothMode = BluetoothMode.UNAVAILABLE
bt_adapter: str = ''
bt_version: str = ''
bt_limitations: list[str] = field(default_factory=list)
# RF/SDR capabilities
rf_capability: RFCapability = field(default_factory=RFCapability)
# Overall limitations
all_limitations: list[str] = field(default_factory=list)
# Timestamp
captured_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'system': {
'os': self.os_name,
'os_version': self.os_version,
'is_root': self.is_root,
},
'wifi': {
'mode': self.wifi_mode.value,
'interface': self.wifi_interface,
'driver': self.wifi_driver,
'monitor_capable': self.wifi_monitor_capable,
'limitations': self.wifi_limitations,
},
'bluetooth': {
'mode': self.bt_mode.value,
'adapter': self.bt_adapter,
'version': self.bt_version,
'limitations': self.bt_limitations,
},
'rf': {
'device_type': self.rf_capability.device_type,
'driver': self.rf_capability.driver,
'frequency_range_mhz': {
'min': self.rf_capability.min_frequency_mhz,
'max': self.rf_capability.max_frequency_mhz,
},
'sample_rate_max': self.rf_capability.sample_rate_max,
'available': self.rf_capability.available,
'limitations': self.rf_capability.limitations,
},
'all_limitations': self.all_limitations,
'captured_at': self.captured_at.isoformat(),
'disclaimer': (
"Capabilities are detected at sweep start time and may change. "
"Limitations listed affect what this sweep can reliably detect."
),
}
def detect_sweep_capabilities(
wifi_interface: str = '',
bt_adapter: str = '',
sdr_device: Any = None
) -> SweepCapabilities:
"""
Detect current system capabilities for TSCM sweeping.
Args:
wifi_interface: Specific WiFi interface to check
bt_adapter: Specific BT adapter to check
sdr_device: SDR device object if available
Returns:
SweepCapabilities object with complete capability assessment
"""
caps = SweepCapabilities()
# System info
caps.os_name = platform.system()
caps.os_version = platform.release()
caps.is_root = os.geteuid() == 0 if hasattr(os, 'geteuid') else False
# Detect WiFi capabilities
_detect_wifi_capabilities(caps, wifi_interface)
# Detect Bluetooth capabilities
_detect_bluetooth_capabilities(caps, bt_adapter)
# Detect RF/SDR capabilities
_detect_rf_capabilities(caps, sdr_device)
# Compile all limitations
caps.all_limitations = (
caps.wifi_limitations +
caps.bt_limitations +
caps.rf_capability.limitations
)
# Add privilege-based limitations
if not caps.is_root:
caps.all_limitations.append(
"Running without root privileges - some features may be limited"
)
return caps
def _detect_wifi_capabilities(caps: SweepCapabilities, interface: str) -> None:
"""Detect WiFi adapter capabilities."""
caps.wifi_interface = interface
if platform.system() == 'Darwin':
# macOS: Check for WiFi capability using multiple methods
wifi_available = False
# Method 1: Check airport utility (older macOS)
airport_path = '/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport'
if os.path.exists(airport_path):
wifi_available = True
# Method 2: Check for WiFi interface using networksetup (works on all macOS)
if not wifi_available:
try:
result = subprocess.run(
['networksetup', '-listallhardwareports'],
capture_output=True,
text=True,
timeout=5
)
if 'Wi-Fi' in result.stdout or 'AirPort' in result.stdout:
wifi_available = True
except Exception:
pass
# Method 3: Check if en0 exists (common WiFi interface on macOS)
if not wifi_available:
try:
result = subprocess.run(
['ifconfig', 'en0'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
wifi_available = True
except Exception:
pass
if wifi_available:
caps.wifi_mode = WifiMode.MANAGED
caps.wifi_driver = 'apple80211'
caps.wifi_monitor_capable = False
caps.wifi_limitations = [
"macOS WiFi operates in managed mode only.",
"Cannot capture probe requests or deauthentication frames.",
"Evil twin detection limited to SSID/BSSID comparison only.",
]
else:
caps.wifi_mode = WifiMode.UNAVAILABLE
caps.wifi_limitations = ["WiFi scanning unavailable - no interface found"]
else:
# Linux: Check for monitor mode capability
try:
# Check if interface supports monitor mode
result = subprocess.run(
['iw', 'list'],
capture_output=True,
text=True,
timeout=5
)
if 'monitor' in result.stdout.lower():
# Check current mode
if interface:
mode_result = subprocess.run(
['iw', 'dev', interface, 'info'],
capture_output=True,
text=True,
timeout=5
)
if 'type monitor' in mode_result.stdout.lower():
caps.wifi_mode = WifiMode.MONITOR
caps.wifi_monitor_capable = True
else:
caps.wifi_mode = WifiMode.MANAGED
caps.wifi_monitor_capable = True
caps.wifi_limitations.append(
"WiFi interface in managed mode. "
"Probe requests and deauth detection require monitor mode."
)
else:
caps.wifi_mode = WifiMode.MANAGED
caps.wifi_monitor_capable = True
else:
caps.wifi_mode = WifiMode.MANAGED
caps.wifi_monitor_capable = False
caps.wifi_limitations = [
"Passive WiFi frame analysis is not available in this sweep.",
"WiFi adapter does not support monitor mode.",
"Probe request and deauthentication detection unavailable.",
]
# Get driver info
if interface:
try:
driver_path = f'/sys/class/net/{interface}/device/driver'
if os.path.exists(driver_path):
caps.wifi_driver = os.path.basename(os.readlink(driver_path))
except Exception:
pass
except (subprocess.TimeoutExpired, FileNotFoundError):
caps.wifi_mode = WifiMode.UNAVAILABLE
caps.wifi_limitations = ["WiFi scanning tools not available"]
def _detect_bluetooth_capabilities(caps: SweepCapabilities, adapter: str) -> None:
"""Detect Bluetooth adapter capabilities."""
caps.bt_adapter = adapter
if platform.system() == 'Darwin':
# macOS: Use system_profiler
try:
result = subprocess.run(
['system_profiler', 'SPBluetoothDataType', '-json'],
capture_output=True,
text=True,
timeout=10
)
if 'Bluetooth' in result.stdout:
caps.bt_mode = BluetoothMode.BLE_CLASSIC
caps.bt_version = 'macOS CoreBluetooth'
caps.bt_limitations = [
"BLE scanning limited to advertising devices only.",
"Classic Bluetooth discovery may be incomplete.",
"Manufacturer data parsing depends on device advertising.",
]
else:
caps.bt_mode = BluetoothMode.UNAVAILABLE
caps.bt_limitations = ["Bluetooth not available"]
except (subprocess.TimeoutExpired, FileNotFoundError):
caps.bt_mode = BluetoothMode.UNAVAILABLE
caps.bt_limitations = ["Bluetooth detection failed"]
else:
# Linux: Check bluetoothctl/hciconfig
try:
result = subprocess.run(
['hciconfig', '-a'],
capture_output=True,
text=True,
timeout=5
)
if 'hci' in result.stdout.lower():
# Check for BLE support
if 'le' in result.stdout.lower():
caps.bt_mode = BluetoothMode.BLE_CLASSIC
caps.bt_limitations = [
"BLE scanning range depends on adapter sensitivity.",
"Some devices may not be detected if not advertising.",
]
else:
caps.bt_mode = BluetoothMode.LIMITED
caps.bt_limitations = [
"Adapter may not support BLE scanning.",
"Limited to classic Bluetooth discovery.",
]
# Extract version
for line in result.stdout.split('\n'):
if 'hci version' in line.lower():
caps.bt_version = line.strip()
break
else:
caps.bt_mode = BluetoothMode.UNAVAILABLE
caps.bt_limitations = ["No Bluetooth adapter found"]
except (subprocess.TimeoutExpired, FileNotFoundError):
caps.bt_mode = BluetoothMode.UNAVAILABLE
caps.bt_limitations = ["Bluetooth tools not available"]
def _detect_rf_capabilities(caps: SweepCapabilities, sdr_device: Any) -> None:
"""Detect RF/SDR device capabilities."""
rf_cap = RFCapability()
try:
from utils.sdr import SDRFactory
devices = SDRFactory.detect_devices()
if devices:
device = devices[0] # Use first device
rf_cap.available = True
rf_cap.device_type = getattr(device, 'sdr_type', 'unknown')
if hasattr(rf_cap.device_type, 'value'):
rf_cap.device_type = rf_cap.device_type.value
rf_cap.driver = getattr(device, 'driver', '')
# Set frequency ranges based on device type
if 'rtl' in rf_cap.device_type.lower():
rf_cap.min_frequency_mhz = 24.0
rf_cap.max_frequency_mhz = 1766.0
rf_cap.sample_rate_max = 3200000
rf_cap.limitations = [
"RTL-SDR frequency range: 24-1766 MHz typical.",
"Cannot reliably cover frequencies below 24 MHz.",
"Cannot cover microwave bands (>1.8 GHz) without upconverter.",
"Signal detection limited by SDR noise floor and dynamic range.",
]
elif 'hackrf' in rf_cap.device_type.lower():
rf_cap.min_frequency_mhz = 1.0
rf_cap.max_frequency_mhz = 6000.0
rf_cap.sample_rate_max = 20000000
rf_cap.limitations = [
"HackRF frequency range: 1 MHz - 6 GHz.",
"8-bit ADC limits dynamic range for weak signal detection.",
]
else:
rf_cap.limitations = [
f"Unknown SDR type: {rf_cap.device_type}",
"Frequency coverage and capabilities uncertain.",
]
else:
rf_cap.available = False
rf_cap.device_type = 'none'
rf_cap.limitations = [
"No SDR device detected.",
"RF spectrum analysis is not available in this sweep.",
"Cannot scan for wireless microphones, bugs, or RF transmitters.",
]
except ImportError:
rf_cap.available = False
rf_cap.limitations = [
"SDR support not installed.",
"RF spectrum analysis unavailable.",
]
except Exception as e:
rf_cap.available = False
rf_cap.limitations = [f"SDR detection failed: {str(e)}"]
caps.rf_capability = rf_cap
# =============================================================================
# 2. Baseline Diff & Baseline Health
# =============================================================================
class BaselineHealth(Enum):
"""Baseline health status."""
HEALTHY = 'healthy'
NOISY = 'noisy'
STALE = 'stale'
@dataclass
class DeviceChange:
"""Represents a change detected compared to baseline."""
identifier: str
protocol: str
change_type: str # 'new', 'missing', 'rssi_drift', 'channel_change', 'security_change'
description: str
expected: bool = False # True if this is an expected/normal change
details: dict = field(default_factory=dict)
@dataclass
class BaselineDiff:
"""
Complete diff between a baseline and a sweep.
Shows what changed, whether baseline is reliable,
and separates expected vs unexpected changes.
"""
baseline_id: int
sweep_id: int
# Health assessment
health: BaselineHealth = BaselineHealth.HEALTHY
health_score: float = 1.0 # 0-1, higher is healthier
health_reasons: list[str] = field(default_factory=list)
# Age metrics
baseline_age_hours: float = 0.0
is_stale: bool = False
# Device changes
new_devices: list[DeviceChange] = field(default_factory=list)
missing_devices: list[DeviceChange] = field(default_factory=list)
changed_devices: list[DeviceChange] = field(default_factory=list)
# Summary counts
total_new: int = 0
total_missing: int = 0
total_changed: int = 0
# Expected vs unexpected
expected_changes: list[DeviceChange] = field(default_factory=list)
unexpected_changes: list[DeviceChange] = field(default_factory=list)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'baseline_id': self.baseline_id,
'sweep_id': self.sweep_id,
'health': {
'status': self.health.value,
'score': round(self.health_score, 2),
'reasons': self.health_reasons,
},
'age': {
'hours': round(self.baseline_age_hours, 1),
'is_stale': self.is_stale,
},
'summary': {
'new_devices': self.total_new,
'missing_devices': self.total_missing,
'changed_devices': self.total_changed,
'expected_changes': len(self.expected_changes),
'unexpected_changes': len(self.unexpected_changes),
},
'new_devices': [
{'identifier': d.identifier, 'protocol': d.protocol,
'description': d.description, 'details': d.details}
for d in self.new_devices
],
'missing_devices': [
{'identifier': d.identifier, 'protocol': d.protocol,
'description': d.description, 'details': d.details}
for d in self.missing_devices
],
'changed_devices': [
{'identifier': d.identifier, 'protocol': d.protocol,
'change_type': d.change_type, 'description': d.description,
'expected': d.expected, 'details': d.details}
for d in self.changed_devices
],
'disclaimer': (
"Baseline comparison shows differences, not confirmed threats. "
"New devices may be legitimate. Missing devices may have been powered off."
),
}
def calculate_baseline_diff(
baseline: dict,
current_wifi: list[dict],
current_wifi_clients: list[dict],
current_bt: list[dict],
current_rf: list[dict],
sweep_id: int
) -> BaselineDiff:
"""
Calculate comprehensive diff between baseline and current scan.
Args:
baseline: Baseline dict from database
current_wifi: Current WiFi devices
current_wifi_clients: Current WiFi clients
current_bt: Current Bluetooth devices
current_rf: Current RF signals
sweep_id: Current sweep ID
Returns:
BaselineDiff with complete comparison results
"""
diff = BaselineDiff(
baseline_id=baseline.get('id', 0),
sweep_id=sweep_id
)
# Calculate baseline age
created_at = baseline.get('created_at')
if created_at:
if isinstance(created_at, str):
try:
created = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
diff.baseline_age_hours = (datetime.now() - created.replace(tzinfo=None)).total_seconds() / 3600
except ValueError:
diff.baseline_age_hours = 0
elif isinstance(created_at, datetime):
diff.baseline_age_hours = (datetime.now() - created_at).total_seconds() / 3600
# Check if baseline is stale (>72 hours old)
diff.is_stale = diff.baseline_age_hours > 72
# Build baseline lookup dicts
baseline_wifi = {
d.get('bssid', d.get('mac', '')).upper(): d
for d in baseline.get('wifi_networks', [])
if d.get('bssid') or d.get('mac')
}
baseline_wifi_clients = {
d.get('mac', d.get('address', '')).upper(): d
for d in baseline.get('wifi_clients', [])
if d.get('mac') or d.get('address')
}
baseline_bt = {
d.get('mac', d.get('address', '')).upper(): d
for d in baseline.get('bt_devices', [])
if d.get('mac') or d.get('address')
}
baseline_rf = {
round(d.get('frequency', 0), 1): d
for d in baseline.get('rf_frequencies', [])
if d.get('frequency')
}
# Compare WiFi
_compare_wifi(diff, baseline_wifi, current_wifi)
# Compare WiFi clients
_compare_wifi_clients(diff, baseline_wifi_clients, current_wifi_clients)
# Compare Bluetooth
_compare_bluetooth(diff, baseline_bt, current_bt)
# Compare RF
_compare_rf(diff, baseline_rf, current_rf)
# Calculate totals
diff.total_new = len(diff.new_devices)
diff.total_missing = len(diff.missing_devices)
diff.total_changed = len(diff.changed_devices)
# Separate expected vs unexpected changes
for change in diff.new_devices + diff.missing_devices + diff.changed_devices:
if change.expected:
diff.expected_changes.append(change)
else:
diff.unexpected_changes.append(change)
# Calculate health
_calculate_baseline_health(diff, baseline)
return diff
def _compare_wifi(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
"""Compare WiFi devices between baseline and current."""
current_macs = {
d.get('bssid', d.get('mac', '')).upper(): d
for d in current
if d.get('bssid') or d.get('mac')
}
# Find new devices
for mac, device in current_macs.items():
if mac not in baseline:
ssid = device.get('essid', device.get('ssid', 'Hidden'))
diff.new_devices.append(DeviceChange(
identifier=mac,
protocol='wifi',
change_type='new',
description=f'New WiFi AP: {ssid}',
expected=False,
details={
'ssid': ssid,
'channel': device.get('channel'),
'rssi': device.get('power', device.get('signal')),
}
))
def _compare_wifi_clients(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
"""Compare WiFi clients between baseline and current."""
current_macs = {
d.get('mac', d.get('address', '')).upper(): d
for d in current
if d.get('mac') or d.get('address')
}
# Find new clients
for mac, device in current_macs.items():
if mac not in baseline:
name = device.get('vendor', 'WiFi Client')
diff.new_devices.append(DeviceChange(
identifier=mac,
protocol='wifi_client',
change_type='new',
description=f'New WiFi client: {name}',
expected=False,
details={
'vendor': name,
'rssi': device.get('rssi'),
'associated_bssid': device.get('associated_bssid'),
}
))
# Find missing clients
for mac, device in baseline.items():
if mac not in current_macs:
name = device.get('vendor', 'WiFi Client')
diff.missing_devices.append(DeviceChange(
identifier=mac,
protocol='wifi_client',
change_type='missing',
description=f'Missing WiFi client: {name}',
expected=True,
details={
'vendor': name,
}
))
else:
# Check for changes
baseline_dev = baseline[mac]
changes = []
# RSSI drift
curr_rssi = device.get('power', device.get('signal'))
base_rssi = baseline_dev.get('power', baseline_dev.get('signal'))
if curr_rssi and base_rssi:
rssi_diff = abs(int(curr_rssi) - int(base_rssi))
if rssi_diff > 15:
changes.append(('rssi_drift', f'RSSI changed by {rssi_diff} dBm'))
# Channel change
curr_chan = device.get('channel')
base_chan = baseline_dev.get('channel')
if curr_chan and base_chan and curr_chan != base_chan:
changes.append(('channel_change', f'Channel changed from {base_chan} to {curr_chan}'))
# Security change
curr_sec = device.get('encryption', device.get('privacy', ''))
base_sec = baseline_dev.get('encryption', baseline_dev.get('privacy', ''))
if curr_sec and base_sec and curr_sec != base_sec:
changes.append(('security_change', f'Security changed from {base_sec} to {curr_sec}'))
for change_type, desc in changes:
diff.changed_devices.append(DeviceChange(
identifier=mac,
protocol='wifi',
change_type=change_type,
description=desc,
expected=change_type == 'rssi_drift', # RSSI drift is often expected
details={
'ssid': device.get('essid', device.get('ssid')),
'baseline': baseline_dev,
'current': device,
}
))
# Find missing devices
for mac, device in baseline.items():
if mac not in current_macs:
ssid = device.get('essid', device.get('ssid', 'Hidden'))
diff.missing_devices.append(DeviceChange(
identifier=mac,
protocol='wifi',
change_type='missing',
description=f'Missing WiFi AP: {ssid}',
expected=False, # Could be powered off
details={
'ssid': ssid,
'last_channel': device.get('channel'),
}
))
def _compare_bluetooth(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
"""Compare Bluetooth devices between baseline and current."""
current_macs = {
d.get('mac', d.get('address', '')).upper(): d
for d in current
if d.get('mac') or d.get('address')
}
# Find new devices
for mac, device in current_macs.items():
if mac not in baseline:
name = device.get('name', 'Unknown')
diff.new_devices.append(DeviceChange(
identifier=mac,
protocol='bluetooth',
change_type='new',
description=f'New BLE device: {name}',
expected=False,
details={
'name': name,
'rssi': device.get('rssi'),
'manufacturer': device.get('manufacturer'),
}
))
else:
# Check for changes
baseline_dev = baseline[mac]
# Name change (device renamed)
curr_name = device.get('name', '')
base_name = baseline_dev.get('name', '')
if curr_name and base_name and curr_name != base_name:
diff.changed_devices.append(DeviceChange(
identifier=mac,
protocol='bluetooth',
change_type='name_change',
description=f'Device renamed: {base_name} -> {curr_name}',
expected=True,
details={'old_name': base_name, 'new_name': curr_name}
))
# Find missing devices
for mac, device in baseline.items():
if mac not in current_macs:
name = device.get('name', 'Unknown')
diff.missing_devices.append(DeviceChange(
identifier=mac,
protocol='bluetooth',
change_type='missing',
description=f'Missing BLE device: {name}',
expected=True, # BLE devices often go to sleep
details={'name': name}
))
def _compare_rf(diff: BaselineDiff, baseline: dict, current: list[dict]) -> None:
"""Compare RF signals between baseline and current."""
current_freqs = {
round(s.get('frequency', 0), 1): s
for s in current
if s.get('frequency')
}
# Find new signals
for freq, signal in current_freqs.items():
if freq not in baseline:
diff.new_devices.append(DeviceChange(
identifier=f'{freq:.1f} MHz',
protocol='rf',
change_type='new',
description=f'New RF signal at {freq:.3f} MHz',
expected=False,
details={
'frequency': freq,
'power': signal.get('power', signal.get('level')),
'modulation': signal.get('modulation'),
}
))
# Find missing signals
for freq, signal in baseline.items():
if freq not in current_freqs:
diff.missing_devices.append(DeviceChange(
identifier=f'{freq:.1f} MHz',
protocol='rf',
change_type='missing',
description=f'Missing RF signal at {freq:.1f} MHz',
expected=True, # RF signals can be intermittent
details={'frequency': freq}
))
def _calculate_baseline_health(diff: BaselineDiff, baseline: dict) -> None:
"""Calculate baseline health score and status."""
score = 1.0
reasons = []
# Age penalty
if diff.baseline_age_hours > 168: # > 1 week
score -= 0.4
reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old (>1 week)")
elif diff.baseline_age_hours > 72: # > 3 days
score -= 0.2
reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old (>3 days)")
elif diff.baseline_age_hours > 24:
score -= 0.1
reasons.append(f"Baseline is {diff.baseline_age_hours:.0f} hours old")
# Device churn penalty
total_baseline = (
len(baseline.get('wifi_networks', [])) +
len(baseline.get('wifi_clients', [])) +
len(baseline.get('bt_devices', [])) +
len(baseline.get('rf_frequencies', []))
)
if total_baseline > 0:
churn_rate = (diff.total_new + diff.total_missing) / total_baseline
if churn_rate > 0.5:
score -= 0.3
reasons.append(f"High device churn rate: {churn_rate:.0%}")
elif churn_rate > 0.25:
score -= 0.15
reasons.append(f"Moderate device churn rate: {churn_rate:.0%}")
# Small baseline penalty
if total_baseline < 3:
score -= 0.2
reasons.append(f"Baseline has few devices ({total_baseline}) - may be incomplete")
# Set health status
diff.health_score = max(0, min(1, score))
if diff.health_score >= 0.7:
diff.health = BaselineHealth.HEALTHY
elif diff.health_score >= 0.4:
diff.health = BaselineHealth.NOISY
if not reasons:
reasons.append("Baseline showing moderate variability")
else:
diff.health = BaselineHealth.STALE
if not reasons:
reasons.append("Baseline requires refresh")
diff.health_reasons = reasons
# =============================================================================
# 3. Per-Device Timelines
# =============================================================================
@dataclass
class DeviceObservation:
"""A single observation of a device."""
timestamp: datetime
rssi: Optional[int] = None
present: bool = True
channel: Optional[int] = None
frequency: Optional[float] = None
attributes: dict = field(default_factory=dict)
@dataclass
class DeviceTimeline:
"""
Complete timeline for a device showing behavior over time.
Used to assess signal stability, movement patterns, and
meeting window correlation.
"""
identifier: str
protocol: str
name: Optional[str] = None
# Observation history (time-bucketed)
observations: list[DeviceObservation] = field(default_factory=list)
# Computed metrics
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
total_observations: int = 0
presence_ratio: float = 0.0 # % of time device was present
# Signal metrics
rssi_min: Optional[int] = None
rssi_max: Optional[int] = None
rssi_mean: Optional[float] = None
rssi_stability: float = 0.0 # 0-1, higher = more stable
# Movement assessment
appears_stationary: bool = True
movement_pattern: str = 'unknown' # 'stationary', 'mobile', 'intermittent'
# Meeting correlation
meeting_correlated: bool = False
meeting_observations: int = 0
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'identifier': self.identifier,
'protocol': self.protocol,
'name': self.name,
'observations': [
{
'timestamp': obs.timestamp.isoformat(),
'rssi': obs.rssi,
'present': obs.present,
'channel': obs.channel,
'frequency': obs.frequency,
}
for obs in self.observations[-50:] # Limit to last 50
],
'metrics': {
'first_seen': self.first_seen.isoformat() if self.first_seen else None,
'last_seen': self.last_seen.isoformat() if self.last_seen else None,
'total_observations': self.total_observations,
'presence_ratio': round(self.presence_ratio, 2),
},
'signal': {
'rssi_min': self.rssi_min,
'rssi_max': self.rssi_max,
'rssi_mean': round(self.rssi_mean, 1) if self.rssi_mean else None,
'stability': round(self.rssi_stability, 2),
},
'movement': {
'appears_stationary': self.appears_stationary,
'pattern': self.movement_pattern,
},
'meeting_correlation': {
'correlated': self.meeting_correlated,
'observations_during_meeting': self.meeting_observations,
},
}
class TimelineManager:
"""
Manages per-device timelines with time-bucketing.
Buckets observations to keep memory bounded while preserving
useful behavioral patterns.
"""
def __init__(self, bucket_seconds: int = 30, max_observations: int = 200):
"""
Args:
bucket_seconds: Time bucket size in seconds
max_observations: Maximum observations to keep per device
"""
self.bucket_seconds = bucket_seconds
self.max_observations = max_observations
self.timelines: dict[str, DeviceTimeline] = {}
self._meeting_windows: list[tuple[datetime, Optional[datetime]]] = []
def add_observation(
self,
identifier: str,
protocol: str,
rssi: Optional[int] = None,
channel: Optional[int] = None,
frequency: Optional[float] = None,
name: Optional[str] = None,
attributes: Optional[dict] = None
) -> None:
"""Add an observation for a device."""
key = f"{protocol}:{identifier.upper()}"
now = datetime.now()
if key not in self.timelines:
self.timelines[key] = DeviceTimeline(
identifier=identifier.upper(),
protocol=protocol,
name=name,
first_seen=now,
)
timeline = self.timelines[key]
# Update name if provided
if name:
timeline.name = name
# Check if we should bucket with previous observation
if timeline.observations:
last_obs = timeline.observations[-1]
time_diff = (now - last_obs.timestamp).total_seconds()
if time_diff < self.bucket_seconds:
# Update existing bucket
if rssi is not None:
# Average RSSI
if last_obs.rssi is not None:
last_obs.rssi = (last_obs.rssi + rssi) // 2
else:
last_obs.rssi = rssi
return
# Add new observation
obs = DeviceObservation(
timestamp=now,
rssi=rssi,
present=True,
channel=channel,
frequency=frequency,
attributes=attributes or {},
)
timeline.observations.append(obs)
# Enforce max observations
if len(timeline.observations) > self.max_observations:
timeline.observations = timeline.observations[-self.max_observations:]
# Update metrics
timeline.last_seen = now
timeline.total_observations = len(timeline.observations)
# Check meeting correlation
if self._is_during_meeting(now):
timeline.meeting_observations += 1
timeline.meeting_correlated = True
def start_meeting_window(self) -> None:
"""Mark the start of a meeting window."""
self._meeting_windows.append((datetime.now(), None))
def end_meeting_window(self) -> None:
"""Mark the end of a meeting window."""
if self._meeting_windows and self._meeting_windows[-1][1] is None:
start = self._meeting_windows[-1][0]
self._meeting_windows[-1] = (start, datetime.now())
def _is_during_meeting(self, timestamp: datetime) -> bool:
"""Check if timestamp falls within a meeting window."""
for start, end in self._meeting_windows:
if end is None:
if timestamp >= start:
return True
elif start <= timestamp <= end:
return True
return False
def compute_metrics(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
"""Compute all metrics for a device timeline."""
key = f"{protocol}:{identifier.upper()}"
if key not in self.timelines:
return None
timeline = self.timelines[key]
if not timeline.observations:
return timeline
# RSSI metrics
rssi_values = [obs.rssi for obs in timeline.observations if obs.rssi is not None]
if rssi_values:
timeline.rssi_min = min(rssi_values)
timeline.rssi_max = max(rssi_values)
timeline.rssi_mean = sum(rssi_values) / len(rssi_values)
# Calculate stability (0-1)
if len(rssi_values) >= 3:
variance = sum((r - timeline.rssi_mean) ** 2 for r in rssi_values) / len(rssi_values)
timeline.rssi_stability = max(0, 1 - (variance / 100))
# Movement assessment based on RSSI variance
rssi_range = timeline.rssi_max - timeline.rssi_min
if rssi_range < 10:
timeline.appears_stationary = True
timeline.movement_pattern = 'stationary'
elif rssi_range < 25:
timeline.appears_stationary = False
timeline.movement_pattern = 'mobile'
else:
timeline.appears_stationary = False
timeline.movement_pattern = 'intermittent'
# Presence ratio
if timeline.first_seen and timeline.last_seen:
total_duration = (timeline.last_seen - timeline.first_seen).total_seconds()
if total_duration > 0:
# Estimate presence based on observation count and bucket size
estimated_present_time = timeline.total_observations * self.bucket_seconds
timeline.presence_ratio = min(1.0, estimated_present_time / total_duration)
return timeline
def get_timeline(self, identifier: str, protocol: str) -> Optional[DeviceTimeline]:
"""Get computed timeline for a device."""
return self.compute_metrics(identifier, protocol)
def get_all_timelines(self) -> list[DeviceTimeline]:
"""Get all device timelines with computed metrics."""
for key in self.timelines:
protocol, identifier = key.split(':', 1)
self.compute_metrics(identifier, protocol)
return list(self.timelines.values())
# =============================================================================
# 5. Meeting-Window Summary Enhancements
# =============================================================================
@dataclass
class MeetingWindowSummary:
"""
Summary of device activity during a meeting window.
Tracks devices first seen during meeting, behavior changes,
and applies meeting-window scoring modifiers.
"""
meeting_id: int
name: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
duration_minutes: float = 0.0
# Devices first seen during meeting (high interest)
devices_first_seen: list[dict] = field(default_factory=list)
# Devices with behavior change during meeting
devices_behavior_change: list[dict] = field(default_factory=list)
# All active devices during meeting
active_devices: list[dict] = field(default_factory=list)
# Summary metrics
total_devices_active: int = 0
new_devices_count: int = 0
behavior_changes_count: int = 0
high_interest_count: int = 0
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'meeting_id': self.meeting_id,
'name': self.name,
'start_time': self.start_time.isoformat() if self.start_time else None,
'end_time': self.end_time.isoformat() if self.end_time else None,
'duration_minutes': round(self.duration_minutes, 1),
'summary': {
'total_devices_active': self.total_devices_active,
'new_devices': self.new_devices_count,
'behavior_changes': self.behavior_changes_count,
'high_interest': self.high_interest_count,
},
'devices_first_seen': self.devices_first_seen,
'devices_behavior_change': self.devices_behavior_change,
'disclaimer': (
"Meeting-correlated activity indicates temporal correlation only, "
"not confirmed surveillance. Devices may have legitimate reasons "
"for appearing during meetings."
),
}
def generate_meeting_summary(
meeting_window: dict,
device_timelines: list[DeviceTimeline],
device_profiles: list[dict]
) -> MeetingWindowSummary:
"""
Generate summary of device activity during a meeting window.
Args:
meeting_window: Meeting window dict from database
device_timelines: List of device timelines
device_profiles: List of device profiles from correlation engine
Returns:
MeetingWindowSummary with analysis
"""
summary = MeetingWindowSummary(
meeting_id=meeting_window.get('id', 0),
name=meeting_window.get('name'),
)
# Parse times
start_str = meeting_window.get('start_time')
end_str = meeting_window.get('end_time')
if start_str:
if isinstance(start_str, str):
summary.start_time = datetime.fromisoformat(start_str.replace('Z', '+00:00')).replace(tzinfo=None)
else:
summary.start_time = start_str
if end_str:
if isinstance(end_str, str):
summary.end_time = datetime.fromisoformat(end_str.replace('Z', '+00:00')).replace(tzinfo=None)
else:
summary.end_time = end_str
if summary.start_time and summary.end_time:
summary.duration_minutes = (summary.end_time - summary.start_time).total_seconds() / 60
if not summary.start_time:
return summary
# Analyze device timelines
for timeline in device_timelines:
if not timeline.first_seen:
continue
# Check if device was active during meeting
was_active = False
first_seen_during = False
for obs in timeline.observations:
if summary.end_time:
if summary.start_time <= obs.timestamp <= summary.end_time:
was_active = True
if timeline.first_seen and abs((obs.timestamp - timeline.first_seen).total_seconds()) < 60:
first_seen_during = True
break
else:
# Meeting still ongoing
if obs.timestamp >= summary.start_time:
was_active = True
if timeline.first_seen and abs((obs.timestamp - timeline.first_seen).total_seconds()) < 60:
first_seen_during = True
break
if was_active:
device_info = {
'identifier': timeline.identifier,
'protocol': timeline.protocol,
'name': timeline.name,
'meeting_correlated': True,
}
summary.active_devices.append(device_info)
if first_seen_during:
device_info['first_seen_during_meeting'] = True
summary.devices_first_seen.append({
**device_info,
'description': 'Device first seen during meeting window',
'risk_modifier': '+2 (meeting-correlated activity)',
})
# Update counts
summary.total_devices_active = len(summary.active_devices)
summary.new_devices_count = len(summary.devices_first_seen)
summary.behavior_changes_count = len(summary.devices_behavior_change)
# Count high interest from profiles
for profile in device_profiles:
if profile.get('risk_level') == 'high_interest':
indicators = profile.get('indicators', [])
if any(i.get('type') == 'meeting_correlated' for i in indicators):
summary.high_interest_count += 1
return summary
# =============================================================================
# 7. WiFi Advanced Indicators (LIMITED SCOPE)
# =============================================================================
@dataclass
class WiFiAdvancedIndicator:
"""An advanced WiFi indicator detection."""
indicator_type: str # 'evil_twin', 'probe_request', 'deauth_burst'
severity: str # 'high', 'medium', 'low'
description: str
details: dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
requires_monitor_mode: bool = False
def to_dict(self) -> dict:
return {
'type': self.indicator_type,
'severity': self.severity,
'description': self.description,
'details': self.details,
'timestamp': self.timestamp.isoformat(),
'requires_monitor_mode': self.requires_monitor_mode,
'disclaimer': (
"Pattern detected - this is an indicator, not confirmation of an attack. "
"Further investigation required."
),
}
class WiFiAdvancedDetector:
"""
Detects advanced WiFi indicators.
LIMITED SCOPE - Only implements:
1. Evil Twin patterns (same SSID, different BSSID/security/abnormal signal)
2. Probe requests for sensitive SSIDs (requires monitor mode)
3. Deauthentication bursts (requires monitor mode)
All findings labeled as "pattern detected", never called attacks.
"""
def __init__(self, monitor_mode_available: bool = False):
self.monitor_mode = monitor_mode_available
self.known_networks: dict[str, dict] = {} # SSID -> expected BSSID/security
self.probe_requests: list[dict] = []
self.deauth_frames: list[dict] = []
self.indicators: list[WiFiAdvancedIndicator] = []
def set_known_networks(self, networks: list[dict]) -> None:
"""Set known/expected networks from baseline."""
for net in networks:
ssid = net.get('essid', net.get('ssid', ''))
if ssid:
self.known_networks[ssid] = {
'bssid': net.get('bssid', net.get('mac', '')).upper(),
'security': net.get('encryption', net.get('privacy', '')),
'channel': net.get('channel'),
'rssi': net.get('power', net.get('signal')),
}
def analyze_network(self, network: dict) -> list[WiFiAdvancedIndicator]:
"""
Analyze a network for evil twin patterns.
Detects: Same SSID with different BSSID, security, or abnormal signal.
"""
indicators = []
ssid = network.get('essid', network.get('ssid', ''))
bssid = network.get('bssid', network.get('mac', '')).upper()
security = network.get('encryption', network.get('privacy', ''))
rssi = network.get('power', network.get('signal'))
if not ssid or ssid in ['', 'Hidden', '[Hidden]']:
return indicators
if ssid in self.known_networks:
known = self.known_networks[ssid]
# Different BSSID for same SSID
if known['bssid'] and known['bssid'] != bssid:
# Check security mismatch
security_mismatch = known['security'] and security and known['security'] != security
# Check signal anomaly (significantly stronger than expected)
signal_anomaly = False
if rssi and known.get('rssi'):
try:
rssi_diff = int(rssi) - int(known['rssi'])
signal_anomaly = rssi_diff > 20 # Much stronger than expected
except (ValueError, TypeError):
pass
if security_mismatch:
indicators.append(WiFiAdvancedIndicator(
indicator_type='evil_twin',
severity='high',
description=f'Evil twin pattern detected for SSID "{ssid}"',
details={
'ssid': ssid,
'detected_bssid': bssid,
'expected_bssid': known['bssid'],
'detected_security': security,
'expected_security': known['security'],
'pattern': 'Different BSSID with security downgrade',
},
requires_monitor_mode=False,
))
elif signal_anomaly:
indicators.append(WiFiAdvancedIndicator(
indicator_type='evil_twin',
severity='medium',
description=f'Possible evil twin pattern for SSID "{ssid}"',
details={
'ssid': ssid,
'detected_bssid': bssid,
'expected_bssid': known['bssid'],
'signal_difference': f'+{rssi_diff} dBm stronger than expected',
'pattern': 'Different BSSID with abnormally strong signal',
},
requires_monitor_mode=False,
))
else:
indicators.append(WiFiAdvancedIndicator(
indicator_type='evil_twin',
severity='low',
description=f'Duplicate SSID detected: "{ssid}"',
details={
'ssid': ssid,
'detected_bssid': bssid,
'expected_bssid': known['bssid'],
'pattern': 'Multiple APs with same SSID (may be legitimate)',
},
requires_monitor_mode=False,
))
self.indicators.extend(indicators)
return indicators
def add_probe_request(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
"""
Record a probe request frame (requires monitor mode).
Detects repeated probing for sensitive SSIDs.
"""
if not self.monitor_mode:
return None
self.probe_requests.append({
'timestamp': datetime.now(),
'src_mac': frame.get('src_mac', '').upper(),
'probed_ssid': frame.get('ssid', ''),
})
# Keep last 1000 probe requests
if len(self.probe_requests) > 1000:
self.probe_requests = self.probe_requests[-1000:]
# Check for sensitive SSID probing
ssid = frame.get('ssid', '')
sensitive_patterns = [
'corp', 'internal', 'private', 'secure', 'vpn',
'admin', 'management', 'executive', 'board',
]
is_sensitive = any(p in ssid.lower() for p in sensitive_patterns) if ssid else False
if is_sensitive:
# Count recent probes for this SSID
recent_cutoff = datetime.now() - timedelta(minutes=5)
recent_probes = [
p for p in self.probe_requests
if p['probed_ssid'] == ssid and p['timestamp'] > recent_cutoff
]
if len(recent_probes) >= 3:
indicator = WiFiAdvancedIndicator(
indicator_type='probe_request',
severity='medium',
description=f'Repeated probing for sensitive SSID "{ssid}"',
details={
'ssid': ssid,
'probe_count': len(recent_probes),
'source_macs': list(set(p['src_mac'] for p in recent_probes)),
'pattern': 'Multiple probe requests for potentially sensitive network',
},
requires_monitor_mode=True,
)
self.indicators.append(indicator)
return indicator
return None
def add_deauth_frame(self, frame: dict) -> Optional[WiFiAdvancedIndicator]:
"""
Record a deauthentication frame (requires monitor mode).
Detects abnormal deauth volume potentially indicating attack.
"""
if not self.monitor_mode:
return None
self.deauth_frames.append({
'timestamp': datetime.now(),
'src_mac': frame.get('src_mac', '').upper(),
'dst_mac': frame.get('dst_mac', '').upper(),
'bssid': frame.get('bssid', '').upper(),
'reason': frame.get('reason_code'),
})
# Keep last 500 deauth frames
if len(self.deauth_frames) > 500:
self.deauth_frames = self.deauth_frames[-500:]
# Check for deauth burst (>10 deauths in 10 seconds)
recent_cutoff = datetime.now() - timedelta(seconds=10)
recent_deauths = [d for d in self.deauth_frames if d['timestamp'] > recent_cutoff]
if len(recent_deauths) >= 10:
# Check if targeting specific BSSID
bssid = frame.get('bssid', '').upper()
targeting_bssid = len([d for d in recent_deauths if d['bssid'] == bssid]) >= 5
indicator = WiFiAdvancedIndicator(
indicator_type='deauth_burst',
severity='high' if targeting_bssid else 'medium',
description='Deauthentication burst pattern detected',
details={
'deauth_count': len(recent_deauths),
'time_window_seconds': 10,
'targeted_bssid': bssid if targeting_bssid else None,
'unique_sources': len(set(d['src_mac'] for d in recent_deauths)),
'pattern': 'Abnormal deauthentication frame volume',
},
requires_monitor_mode=True,
)
self.indicators.append(indicator)
# Clear recent to avoid repeated alerts
self.deauth_frames = [d for d in self.deauth_frames if d['timestamp'] <= recent_cutoff]
return indicator
return None
def get_all_indicators(self) -> list[dict]:
"""Get all detected indicators."""
return [i.to_dict() for i in self.indicators]
def get_unavailable_features(self) -> list[str]:
"""Get list of features unavailable without monitor mode."""
if self.monitor_mode:
return []
return [
"Probe request analysis: Requires monitor mode to capture probe frames.",
"Deauthentication detection: Requires monitor mode to capture management frames.",
"Raw 802.11 frame analysis: Not available in managed mode.",
]
# =============================================================================
# 8. Bluetooth Risk Explainability & Proximity Heuristics
# =============================================================================
class BLEProximity(Enum):
"""RSSI-based proximity estimation."""
VERY_CLOSE = 'very_close' # Within ~1m
CLOSE = 'close' # Within ~3m
MODERATE = 'moderate' # Within ~10m
FAR = 'far' # Beyond ~10m
UNKNOWN = 'unknown'
@dataclass
class BLERiskExplanation:
"""
Explainable risk assessment for a BLE device.
Provides human-readable explanations, proximity estimates,
and recommended actions.
"""
identifier: str
name: Optional[str] = None
# Risk assessment
risk_level: str = 'informational'
risk_score: int = 0
risk_explanation: str = ''
# Proximity
proximity: BLEProximity = BLEProximity.UNKNOWN
proximity_explanation: str = ''
estimated_distance: str = ''
# Tracker detection
is_tracker: bool = False
tracker_type: Optional[str] = None
tracker_explanation: str = ''
# Meeting correlation
meeting_correlated: bool = False
meeting_explanation: str = ''
# Recommended action
recommended_action: str = ''
action_rationale: str = ''
# All indicators with explanations
indicators: list[dict] = field(default_factory=list)
def to_dict(self) -> dict:
return {
'identifier': self.identifier,
'name': self.name,
'risk': {
'level': self.risk_level,
'score': self.risk_score,
'explanation': self.risk_explanation,
},
'proximity': {
'estimate': self.proximity.value,
'explanation': self.proximity_explanation,
'estimated_distance': self.estimated_distance,
},
'tracker': {
'is_tracker': self.is_tracker,
'type': self.tracker_type,
'explanation': self.tracker_explanation,
},
'meeting_correlation': {
'correlated': self.meeting_correlated,
'explanation': self.meeting_explanation,
},
'recommended_action': {
'action': self.recommended_action,
'rationale': self.action_rationale,
},
'indicators': self.indicators,
'disclaimer': (
"Risk assessment is based on observable indicators and heuristics. "
"Proximity estimates are approximate based on RSSI and may vary with environment. "
"Tracker detection indicates brand presence, not confirmed threat."
),
}
def estimate_ble_proximity(rssi: int) -> tuple[BLEProximity, str, str]:
"""
Estimate BLE device proximity from RSSI.
Note: RSSI-based distance is highly variable due to:
- TX power differences between devices
- Environmental factors (walls, interference)
- Antenna characteristics
Returns:
Tuple of (proximity enum, explanation, estimated distance string)
"""
if rssi is None:
return (
BLEProximity.UNKNOWN,
"RSSI not available - cannot estimate proximity",
"Unknown"
)
# These thresholds are heuristic approximations
if rssi >= -50:
return (
BLEProximity.VERY_CLOSE,
f"Very strong signal ({rssi} dBm) suggests device is very close",
"< 1 meter (approximate)"
)
elif rssi >= -65:
return (
BLEProximity.CLOSE,
f"Strong signal ({rssi} dBm) suggests device is nearby",
"1-3 meters (approximate)"
)
elif rssi >= -80:
return (
BLEProximity.MODERATE,
f"Moderate signal ({rssi} dBm) suggests device is in the area",
"3-10 meters (approximate)"
)
else:
return (
BLEProximity.FAR,
f"Weak signal ({rssi} dBm) suggests device is distant",
"> 10 meters (approximate)"
)
def generate_ble_risk_explanation(
device: dict,
profile: Optional[dict] = None,
is_during_meeting: bool = False
) -> BLERiskExplanation:
"""
Generate human-readable risk explanation for a BLE device.
Args:
device: BLE device dict with mac, name, rssi, etc.
profile: DeviceProfile dict from correlation engine
is_during_meeting: Whether device was detected during meeting
Returns:
BLERiskExplanation with complete assessment
"""
mac = device.get('mac', device.get('address', '')).upper()
name = device.get('name', '')
rssi = device.get('rssi', device.get('signal'))
explanation = BLERiskExplanation(
identifier=mac,
name=name if name else None,
)
# Proximity estimation
if rssi:
try:
rssi_int = int(rssi)
prox, prox_exp, dist = estimate_ble_proximity(rssi_int)
explanation.proximity = prox
explanation.proximity_explanation = prox_exp
explanation.estimated_distance = dist
except (ValueError, TypeError):
explanation.proximity = BLEProximity.UNKNOWN
explanation.proximity_explanation = "Could not parse RSSI value"
# Tracker detection with explanation
tracker_info = device.get('tracker_type') or device.get('is_tracker')
if device.get('is_airtag'):
explanation.is_tracker = True
explanation.tracker_type = 'Apple AirTag'
explanation.tracker_explanation = (
"Apple AirTag detected via manufacturer data. AirTags are legitimate "
"tracking devices but may indicate unwanted tracking if not recognized. "
"Apple's Find My network will alert iPhone users to unknown AirTags."
)
elif device.get('is_tile'):
explanation.is_tracker = True
explanation.tracker_type = 'Tile'
explanation.tracker_explanation = (
"Tile tracker detected. Tile trackers are common consumer devices "
"for finding lost items. Presence does not indicate surveillance."
)
elif device.get('is_smarttag'):
explanation.is_tracker = True
explanation.tracker_type = 'Samsung SmartTag'
explanation.tracker_explanation = (
"Samsung SmartTag detected. SmartTags are consumer tracking devices "
"similar to AirTags. Samsung phones can detect unknown SmartTags."
)
elif device.get('is_espressif'):
explanation.tracker_type = 'ESP32/ESP8266'
explanation.tracker_explanation = (
"Espressif chipset (ESP32/ESP8266) detected. These are programmable "
"development boards commonly used in IoT projects. They can be configured "
"for various purposes including custom tracking devices."
)
# Meeting correlation explanation
if is_during_meeting or device.get('meeting_correlated'):
explanation.meeting_correlated = True
explanation.meeting_explanation = (
"Device detected during a marked meeting window. This temporal correlation "
"is noted but does not confirm malicious intent - many legitimate devices "
"are active during meetings (phones, laptops, wearables)."
)
# Build risk explanation from profile
if profile:
explanation.risk_level = profile.get('risk_level', 'informational')
explanation.risk_score = profile.get('total_score', 0)
# Convert indicators to explanations
for ind in profile.get('indicators', []):
ind_type = ind.get('type', '')
ind_desc = ind.get('description', '')
explanation.indicators.append({
'type': ind_type,
'description': ind_desc,
'explanation': _get_indicator_explanation(ind_type),
})
# Build overall risk explanation
if explanation.risk_level == 'high_interest':
explanation.risk_explanation = (
f"This device has accumulated {explanation.risk_score} risk points "
"across multiple indicators, warranting closer investigation. "
"High interest does not confirm surveillance - manual verification required."
)
elif explanation.risk_level == 'review':
explanation.risk_explanation = (
f"This device shows {explanation.risk_score} risk points indicating "
"it should be reviewed but is not immediately concerning."
)
else:
explanation.risk_explanation = (
"This device shows typical characteristics and does not raise "
"significant concerns based on observable indicators."
)
else:
explanation.risk_explanation = "No detailed profile available for risk assessment."
# Recommended action
_set_recommended_action(explanation)
return explanation
def _get_indicator_explanation(indicator_type: str) -> str:
"""Get human-readable explanation for an indicator type."""
explanations = {
'unknown_device': (
"Device manufacturer is unknown or uses a generic chipset. "
"This is common in DIY/hobbyist devices and some surveillance equipment."
),
'audio_capable': (
"Device advertises audio services (headphones, speakers, etc.). "
"Audio-capable devices could theoretically transmit captured audio."
),
'persistent': (
"Device has been detected repeatedly across multiple scans. "
"Persistence suggests a fixed or regularly present device."
),
'meeting_correlated': (
"Device activity correlates with marked meeting windows. "
"This is a temporal pattern that warrants attention."
),
'hidden_identity': (
"Device does not broadcast a name or uses minimal advertising. "
"Some legitimate devices minimize advertising for battery life."
),
'stable_rssi': (
"Signal strength is very stable, suggesting a stationary device. "
"Fixed placement could indicate a planted device."
),
'mac_rotation': (
"Device appears to use MAC address randomization. "
"This is a privacy feature in modern devices, also used to evade detection."
),
'known_tracker': (
"Device matches known tracking device signatures. "
"May be a legitimate item tracker or unwanted surveillance."
),
'airtag_detected': (
"Apple AirTag identified. Check if this belongs to someone present."
),
'tile_detected': (
"Tile tracker identified. Common consumer tracking device."
),
'smarttag_detected': (
"Samsung SmartTag identified. Consumer tracking device."
),
'esp32_device': (
"Espressif development board detected. Highly programmable, "
"could be configured for custom surveillance applications."
),
}
return explanations.get(indicator_type, "Indicator detected requiring review.")
def _set_recommended_action(explanation: BLERiskExplanation) -> None:
"""Set recommended action based on risk assessment."""
if explanation.risk_level == 'high_interest':
if explanation.is_tracker and explanation.proximity == BLEProximity.VERY_CLOSE:
explanation.recommended_action = 'Investigate immediately'
explanation.action_rationale = (
"Unknown tracker in very close proximity warrants immediate "
"physical search of the area and personal belongings."
)
elif explanation.is_tracker:
explanation.recommended_action = 'Investigate location'
explanation.action_rationale = (
"Tracker detected - recommend searching the area to locate "
"the physical device and determine if it belongs to someone present."
)
else:
explanation.recommended_action = 'Review and document'
explanation.action_rationale = (
"Multiple risk indicators present. Document the finding, "
"attempt to identify the device, and consider physical search "
"if other indicators suggest surveillance."
)
elif explanation.risk_level == 'review':
explanation.recommended_action = 'Monitor and document'
explanation.action_rationale = (
"Device shows some indicators worth noting. Add to monitoring list "
"and compare against future sweeps to identify patterns."
)
else:
explanation.recommended_action = 'Continue monitoring'
explanation.action_rationale = (
"No immediate action required. Device will be tracked in subsequent "
"sweeps for pattern analysis."
)
# =============================================================================
# 9. Operator Playbooks ("What To Do Next")
# =============================================================================
@dataclass
class PlaybookStep:
"""A single step in an operator playbook."""
step_number: int
action: str
details: str
safety_note: Optional[str] = None
@dataclass
class OperatorPlaybook:
"""
Procedural guidance for TSCM operators based on findings.
Playbooks are procedural (what to do), not prescriptive (how to decide).
All guidance is legally safe and professional.
"""
playbook_id: str
title: str
risk_level: str
description: str
steps: list[PlaybookStep] = field(default_factory=list)
when_to_escalate: str = ''
documentation_required: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
'playbook_id': self.playbook_id,
'title': self.title,
'risk_level': self.risk_level,
'description': self.description,
'steps': [
{
'step': s.step_number,
'action': s.action,
'details': s.details,
'safety_note': s.safety_note,
}
for s in self.steps
],
'when_to_escalate': self.when_to_escalate,
'documentation_required': self.documentation_required,
'disclaimer': (
"This playbook provides procedural guidance only. Actions should be "
"adapted to local laws, organizational policies, and professional judgment. "
"Do not disassemble, interfere with, or remove suspected devices without "
"proper authorization and legal guidance."
),
}
# Predefined playbooks by risk level
PLAYBOOKS = {
'high_interest_tracker': OperatorPlaybook(
playbook_id='PB-001',
title='High Interest: Unknown Tracker Detection',
risk_level='high_interest',
description='Guidance for responding to unknown tracking device detection',
steps=[
PlaybookStep(
step_number=1,
action='Document the finding',
details='Record device identifier, signal strength, location, and timestamp. Take screenshots of the detection.',
),
PlaybookStep(
step_number=2,
action='Estimate device location',
details='Use signal strength variations while moving to triangulate approximate device position. Note areas of strongest signal.',
safety_note='Do not touch or disturb any physical device found.',
),
PlaybookStep(
step_number=3,
action='Physical search (if authorized)',
details='Systematically search the high-signal area. Check common hiding spots: under furniture, in plants, behind fixtures, in bags/belongings.',
safety_note='Only conduct physical searches with proper authorization.',
),
PlaybookStep(
step_number=4,
action='Identify device owner',
details='If device is located, determine if it belongs to someone legitimately present. Apple/Samsung/Tile devices can be scanned by their respective apps.',
),
PlaybookStep(
step_number=5,
action='Escalate if unidentified',
details='If device owner cannot be determined and device is in sensitive location, escalate to security management.',
),
],
when_to_escalate='Escalate immediately if: device is concealed in sensitive area, owner cannot be identified, or multiple unknown trackers are found.',
documentation_required=[
'Device identifier (MAC address)',
'Signal strength readings at multiple locations',
'Physical location description',
'Photos of any located devices',
'Names of individuals present during search',
],
),
'high_interest_generic': OperatorPlaybook(
playbook_id='PB-002',
title='High Interest: Suspicious Device Pattern',
risk_level='high_interest',
description='Guidance for devices with multiple high-risk indicators',
steps=[
PlaybookStep(
step_number=1,
action='Review all indicators',
details='Examine each risk indicator in the device profile. Understand why the device scored high interest.',
),
PlaybookStep(
step_number=2,
action='Cross-reference with baseline',
details='Check if device appears in baseline. New devices warrant more scrutiny than known devices.',
),
PlaybookStep(
step_number=3,
action='Monitor for pattern',
details='Continue sweep and note if device persists, moves, or correlates with sensitive activities.',
),
PlaybookStep(
step_number=4,
action='Attempt identification',
details='Research manufacturer OUI, check for matching devices in the environment, ask occupants about devices.',
),
PlaybookStep(
step_number=5,
action='Document and report',
details='Add finding to sweep report with full details. Include in meeting/client debrief.',
),
],
when_to_escalate='Escalate if: device cannot be identified, shows surveillance-consistent behavior, or correlates strongly with sensitive activities.',
documentation_required=[
'Complete device profile',
'All risk indicators with scores',
'Timeline of observations',
'Correlation with meeting windows',
'Any identification attempts and results',
],
),
'needs_review': OperatorPlaybook(
playbook_id='PB-003',
title='Needs Review: Unknown Device',
risk_level='needs_review',
description='Guidance for devices requiring investigation but not immediately concerning',
steps=[
PlaybookStep(
step_number=1,
action='Note the device',
details='Add device to monitoring list. Record basic details: identifier, type, signal strength.',
),
PlaybookStep(
step_number=2,
action='Check against known devices',
details='Verify device is not a known infrastructure device or personal device of authorized personnel.',
),
PlaybookStep(
step_number=3,
action='Continue sweep',
details='Complete the sweep. Review device in context of all findings.',
),
PlaybookStep(
step_number=4,
action='Assess in final review',
details='During sweep wrap-up, decide if device warrants further investigation or can be added to baseline.',
),
],
when_to_escalate='Escalate if: multiple "needs review" devices appear together, or device shows high-interest indicators in subsequent sweeps.',
documentation_required=[
'Device identifier and type',
'Brief description of why flagged',
'Decision made (investigate further / add to baseline / monitor)',
],
),
'informational': OperatorPlaybook(
playbook_id='PB-004',
title='Informational: Known/Expected Device',
risk_level='informational',
description='Guidance for devices that appear normal and expected',
steps=[
PlaybookStep(
step_number=1,
action='Verify against baseline',
details='Confirm device matches baseline entry. Note any changes (signal strength, channel, etc.).',
),
PlaybookStep(
step_number=2,
action='Log observation',
details='Record observation for timeline tracking. Even known devices should be logged.',
),
PlaybookStep(
step_number=3,
action='Continue sweep',
details='No further action required. Proceed with sweep.',
),
],
when_to_escalate='Only escalate if device shows unexpected behavior changes or additional risk indicators.',
documentation_required=[
'Device identifier (for timeline)',
'Observation timestamp',
],
),
'wifi_evil_twin': OperatorPlaybook(
playbook_id='PB-005',
title='High Interest: Evil Twin Pattern Detected',
risk_level='high_interest',
description='Guidance when duplicate SSID with security mismatch is detected',
steps=[
PlaybookStep(
step_number=1,
action='Document both access points',
details='Record details of legitimate AP and suspected rogue: BSSID, security, signal strength, channel.',
),
PlaybookStep(
step_number=2,
action='Verify legitimate AP',
details='Confirm which AP is the authorized infrastructure. Check with IT/facilities if needed.',
),
PlaybookStep(
step_number=3,
action='Locate rogue AP',
details='Use signal strength to estimate rogue AP location. Walk the area noting signal variations.',
safety_note='Do not connect to or interact with the suspected rogue AP.',
),
PlaybookStep(
step_number=4,
action='Physical search',
details='Search suspected area for unauthorized access point. Check for hidden devices, suspicious equipment.',
),
PlaybookStep(
step_number=5,
action='Report to IT Security',
details='Even if device not found, report the finding to IT Security for network monitoring.',
),
],
when_to_escalate='Escalate immediately. Evil twin attacks can capture credentials and traffic.',
documentation_required=[
'Both AP details (BSSID, SSID, security, channel, signal)',
'Location where detected',
'Signal strength map if created',
'Physical search results',
],
),
}
def get_playbook_for_finding(
risk_level: str,
finding_type: Optional[str] = None,
indicators: Optional[list[dict]] = None
) -> OperatorPlaybook:
"""
Get appropriate playbook for a finding.
Args:
risk_level: Risk level string
finding_type: Optional specific finding type
indicators: Optional list of indicators
Returns:
Appropriate OperatorPlaybook
"""
# Check for specific finding types
if finding_type == 'evil_twin':
return PLAYBOOKS['wifi_evil_twin']
# Check indicators for tracker
if indicators:
tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker']
if any(i.get('type') in tracker_types for i in indicators):
if risk_level == 'high_interest':
return PLAYBOOKS['high_interest_tracker']
# Return based on risk level
if risk_level == 'high_interest':
return PLAYBOOKS['high_interest_generic']
elif risk_level in ['review', 'needs_review']:
return PLAYBOOKS['needs_review']
else:
return PLAYBOOKS['informational']
def attach_playbook_to_finding(finding: dict) -> dict:
"""
Attach appropriate playbook to a finding dict.
Args:
finding: Finding dict with risk_level, indicators, etc.
Returns:
Finding dict with playbook attached
"""
risk_level = finding.get('risk_level', 'informational')
finding_type = finding.get('finding_type')
indicators = finding.get('indicators', [])
playbook = get_playbook_for_finding(risk_level, finding_type, indicators)
finding['suggested_playbook'] = playbook.to_dict()
finding['suggested_next_steps'] = [
f"Step {s.step_number}: {s.action}"
for s in playbook.steps[:3] # First 3 steps as quick reference
]
return finding
# =============================================================================
# Global Instance Management
# =============================================================================
_timeline_manager: Optional[TimelineManager] = None
_wifi_detector: Optional[WiFiAdvancedDetector] = None
def get_timeline_manager() -> TimelineManager:
"""Get or create global timeline manager."""
global _timeline_manager
if _timeline_manager is None:
_timeline_manager = TimelineManager()
return _timeline_manager
def reset_timeline_manager() -> None:
"""Reset global timeline manager."""
global _timeline_manager
_timeline_manager = TimelineManager()
def get_wifi_detector(monitor_mode: bool = False) -> WiFiAdvancedDetector:
"""Get or create global WiFi detector."""
global _wifi_detector
if _wifi_detector is None:
_wifi_detector = WiFiAdvancedDetector(monitor_mode)
return _wifi_detector
def reset_wifi_detector(monitor_mode: bool = False) -> None:
"""Reset global WiFi detector."""
global _wifi_detector
_wifi_detector = WiFiAdvancedDetector(monitor_mode)