mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 22:59:59 -07:00
679 lines
25 KiB
Python
679 lines
25 KiB
Python
"""
|
|
TSCM Threat Detection Engine
|
|
|
|
Analyzes WiFi, Bluetooth, and RF data to identify potential surveillance devices
|
|
and classify threats based on known patterns and baseline comparison.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from data.tscm_frequencies import (
|
|
BLE_TRACKER_SIGNATURES,
|
|
THREAT_TYPES,
|
|
WIFI_CAMERA_PATTERNS,
|
|
get_frequency_risk,
|
|
get_threat_severity,
|
|
is_known_tracker,
|
|
is_potential_camera,
|
|
)
|
|
from utils.tscm.signal_classification import (
|
|
classify_signal_strength,
|
|
get_signal_strength_info,
|
|
SignalStrength,
|
|
)
|
|
|
|
logger = logging.getLogger('intercept.tscm.detector')
|
|
|
|
# Classification levels for TSCM devices
|
|
CLASSIFICATION_LEVELS = {
|
|
'informational': {
|
|
'color': '#00cc00', # Green
|
|
'label': 'Informational',
|
|
'description': 'Known device, expected infrastructure, or background noise',
|
|
},
|
|
'review': {
|
|
'color': '#ffcc00', # Yellow
|
|
'label': 'Needs Review',
|
|
'description': 'Unknown device requiring investigation',
|
|
},
|
|
'high_interest': {
|
|
'color': '#ff3333', # Red
|
|
'label': 'High Interest',
|
|
'description': 'Suspicious device requiring immediate attention',
|
|
},
|
|
}
|
|
|
|
# BLE device types that can transmit audio (potential bugs)
|
|
AUDIO_CAPABLE_BLE_NAMES = [
|
|
'headphone', 'headset', 'earphone', 'earbud', 'speaker',
|
|
'audio', 'mic', 'microphone', 'airpod', 'buds',
|
|
'jabra', 'bose', 'sony wf', 'sony wh', 'beats',
|
|
'jbl', 'soundcore', 'anker', 'skullcandy',
|
|
]
|
|
|
|
# Device history for tracking repeat detections across scans
|
|
_device_history: dict[str, list[datetime]] = {}
|
|
_history_window_hours = 24 # Consider detections within 24 hours
|
|
|
|
|
|
def _record_device_seen(identifier: str) -> int:
|
|
"""Record a device sighting and return count of times seen."""
|
|
now = datetime.now()
|
|
if identifier not in _device_history:
|
|
_device_history[identifier] = []
|
|
|
|
# Clean old entries
|
|
cutoff = now.timestamp() - (_history_window_hours * 3600)
|
|
_device_history[identifier] = [
|
|
dt for dt in _device_history[identifier]
|
|
if dt.timestamp() > cutoff
|
|
]
|
|
|
|
_device_history[identifier].append(now)
|
|
return len(_device_history[identifier])
|
|
|
|
|
|
def _is_audio_capable_ble(name: str | None, device_type: str | None = None) -> bool:
|
|
"""Check if a BLE device might be audio-capable."""
|
|
if name:
|
|
name_lower = name.lower()
|
|
for pattern in AUDIO_CAPABLE_BLE_NAMES:
|
|
if pattern in name_lower:
|
|
return True
|
|
if device_type:
|
|
type_lower = device_type.lower()
|
|
if any(t in type_lower for t in ['audio', 'headset', 'headphone', 'speaker']):
|
|
return True
|
|
return False
|
|
|
|
|
|
class ThreatDetector:
|
|
"""
|
|
Analyzes scan results to detect potential surveillance threats.
|
|
"""
|
|
|
|
def __init__(self, baseline: dict | None = None):
|
|
"""
|
|
Initialize the threat detector.
|
|
|
|
Args:
|
|
baseline: Optional baseline dict containing expected devices
|
|
"""
|
|
self.baseline = baseline
|
|
self.baseline_wifi_macs = set()
|
|
self.baseline_bt_macs = set()
|
|
self.baseline_rf_freqs = set()
|
|
|
|
if baseline:
|
|
self._load_baseline(baseline)
|
|
|
|
def _load_baseline(self, baseline: dict) -> None:
|
|
"""Load baseline device identifiers for comparison."""
|
|
# WiFi networks and clients
|
|
for network in baseline.get('wifi_networks', []):
|
|
if 'bssid' in network:
|
|
self.baseline_wifi_macs.add(network['bssid'].upper())
|
|
if 'clients' in network:
|
|
for client in network['clients']:
|
|
if 'mac' in client:
|
|
self.baseline_wifi_macs.add(client['mac'].upper())
|
|
|
|
for client in baseline.get('wifi_clients', []):
|
|
if 'mac' in client:
|
|
self.baseline_wifi_macs.add(client['mac'].upper())
|
|
|
|
# Bluetooth devices
|
|
for device in baseline.get('bt_devices', []):
|
|
if 'mac' in device:
|
|
self.baseline_bt_macs.add(device['mac'].upper())
|
|
|
|
# RF frequencies (rounded to nearest 0.1 MHz)
|
|
for freq in baseline.get('rf_frequencies', []):
|
|
if isinstance(freq, dict):
|
|
self.baseline_rf_freqs.add(round(freq.get('frequency', 0), 1))
|
|
else:
|
|
self.baseline_rf_freqs.add(round(freq, 1))
|
|
|
|
logger.info(
|
|
f"Loaded baseline: {len(self.baseline_wifi_macs)} WiFi, "
|
|
f"{len(self.baseline_bt_macs)} BT, {len(self.baseline_rf_freqs)} RF"
|
|
)
|
|
|
|
def classify_wifi_device(self, device: dict) -> dict:
|
|
"""
|
|
Classify a WiFi device into informational/review/high_interest.
|
|
|
|
Returns:
|
|
Dict with 'classification', 'reasons', and metadata
|
|
"""
|
|
mac = device.get('bssid', device.get('mac', '')).upper()
|
|
ssid = device.get('essid', device.get('ssid', ''))
|
|
signal = device.get('power', device.get('signal', -100))
|
|
|
|
reasons = []
|
|
classification = 'informational'
|
|
|
|
# Track repeat detections
|
|
times_seen = _record_device_seen(f'wifi:{mac}') if mac else 1
|
|
|
|
# Check if in baseline (known device)
|
|
in_baseline = mac in self.baseline_wifi_macs if self.baseline else False
|
|
|
|
if in_baseline:
|
|
reasons.append('Known device in baseline')
|
|
classification = 'informational'
|
|
else:
|
|
# New/unknown device
|
|
reasons.append('New WiFi access point')
|
|
classification = 'review'
|
|
|
|
# Check for suspicious patterns -> high interest
|
|
if is_potential_camera(ssid=ssid, mac=mac):
|
|
reasons.append('Matches camera device patterns')
|
|
classification = 'high_interest'
|
|
|
|
try:
|
|
signal_val = int(signal) if signal else -100
|
|
except (ValueError, TypeError):
|
|
signal_val = -100
|
|
|
|
# Use standardized signal classification
|
|
signal_info = get_signal_strength_info(signal_val)
|
|
if not ssid and signal_info['strength'] in ('strong', 'very_strong'):
|
|
reasons.append(f"Hidden SSID with {signal_info['label'].lower()} signal")
|
|
classification = 'high_interest'
|
|
|
|
# Repeat detections across scans
|
|
if times_seen >= 3:
|
|
reasons.append(f'Repeat detection ({times_seen} times)')
|
|
if classification != 'high_interest':
|
|
classification = 'high_interest'
|
|
|
|
# Include standardized signal classification
|
|
signal_info = get_signal_strength_info(signal_val)
|
|
|
|
return {
|
|
'classification': classification,
|
|
'reasons': reasons,
|
|
'in_baseline': in_baseline,
|
|
'times_seen': times_seen,
|
|
'signal_strength': signal_info['strength'],
|
|
'signal_label': signal_info['label'],
|
|
'signal_confidence': signal_info['confidence'],
|
|
}
|
|
|
|
def classify_bt_device(self, device: dict) -> dict:
|
|
"""
|
|
Classify a Bluetooth device into informational/review/high_interest.
|
|
|
|
Now uses the v2 tracker detection data if available.
|
|
|
|
Returns:
|
|
Dict with 'classification', 'reasons', and metadata
|
|
"""
|
|
mac = device.get('mac', device.get('address', '')).upper()
|
|
name = device.get('name', '')
|
|
rssi = device.get('rssi', device.get('signal', -100))
|
|
device_type = device.get('type', '')
|
|
manufacturer_data = device.get('manufacturer_data')
|
|
|
|
reasons = []
|
|
classification = 'informational'
|
|
|
|
# Track repeat detections
|
|
times_seen = _record_device_seen(f'bt:{mac}') if mac else 1
|
|
|
|
# Check if in baseline (known device)
|
|
in_baseline = mac in self.baseline_bt_macs if self.baseline else False
|
|
|
|
# Use v2 tracker detection data if available (from get_tscm_bluetooth_snapshot)
|
|
tracker_data = device.get('tracker', {})
|
|
is_tracker_v2 = tracker_data.get('is_tracker', False)
|
|
tracker_type_v2 = tracker_data.get('type')
|
|
tracker_name_v2 = tracker_data.get('name')
|
|
tracker_confidence_v2 = tracker_data.get('confidence')
|
|
tracker_evidence_v2 = tracker_data.get('evidence', [])
|
|
|
|
# Use v2 risk analysis if available
|
|
risk_data = device.get('risk_analysis', {})
|
|
risk_score = risk_data.get('risk_score', 0)
|
|
risk_factors = risk_data.get('risk_factors', [])
|
|
|
|
# Fall back to legacy detection if v2 not available
|
|
tracker_info_legacy = None
|
|
if not is_tracker_v2:
|
|
tracker_info_legacy = is_known_tracker(name, manufacturer_data)
|
|
|
|
is_tracker = is_tracker_v2 or (tracker_info_legacy is not None)
|
|
|
|
if in_baseline:
|
|
reasons.append('Known device in baseline')
|
|
classification = 'informational'
|
|
else:
|
|
# New/unknown BLE device
|
|
if not name or name == 'Unknown':
|
|
reasons.append('Unknown BLE device')
|
|
classification = 'review'
|
|
else:
|
|
reasons.append('New Bluetooth device')
|
|
classification = 'review'
|
|
|
|
# Check for trackers -> high interest
|
|
if is_tracker_v2:
|
|
tracker_label = tracker_name_v2 or tracker_type_v2 or 'Unknown tracker'
|
|
conf_label = f' ({tracker_confidence_v2})' if tracker_confidence_v2 else ''
|
|
reasons.append(f"Tracker detected: {tracker_label}{conf_label}")
|
|
classification = 'high_interest'
|
|
|
|
# Add evidence from v2 detection
|
|
for evidence_item in tracker_evidence_v2[:2]: # First 2 items
|
|
reasons.append(f"Evidence: {evidence_item}")
|
|
|
|
# Add risk factors if significant
|
|
if risk_score >= 0.3:
|
|
reasons.append(f"Risk score: {int(risk_score * 100)}%")
|
|
for factor in risk_factors[:2]: # First 2 factors
|
|
reasons.append(f"Risk: {factor}")
|
|
|
|
elif tracker_info_legacy:
|
|
reasons.append(f"Known tracker: {tracker_info_legacy.get('name', 'Unknown')}")
|
|
classification = 'high_interest'
|
|
|
|
# Check for audio-capable devices -> high interest
|
|
if _is_audio_capable_ble(name, device_type):
|
|
reasons.append('Audio-capable BLE device')
|
|
classification = 'high_interest'
|
|
|
|
# Strong signal from unknown device - use standardized classification
|
|
try:
|
|
rssi_val = int(rssi) if rssi else -100
|
|
except (ValueError, TypeError):
|
|
rssi_val = -100
|
|
|
|
signal_info = get_signal_strength_info(rssi_val)
|
|
if signal_info['strength'] in ('strong', 'very_strong') and not name:
|
|
reasons.append(f"{signal_info['label']} signal from unnamed device")
|
|
classification = 'high_interest'
|
|
|
|
# Repeat detections across scans
|
|
if times_seen >= 3:
|
|
reasons.append(f'Repeat detection ({times_seen} times)')
|
|
if classification != 'high_interest':
|
|
classification = 'high_interest'
|
|
|
|
# Include standardized signal classification
|
|
try:
|
|
rssi_val = int(rssi) if rssi else -100
|
|
except (ValueError, TypeError):
|
|
rssi_val = -100
|
|
signal_info = get_signal_strength_info(rssi_val)
|
|
|
|
return {
|
|
'classification': classification,
|
|
'reasons': reasons,
|
|
'in_baseline': in_baseline,
|
|
'times_seen': times_seen,
|
|
'is_tracker': is_tracker,
|
|
'tracker_type': tracker_type_v2,
|
|
'tracker_name': tracker_name_v2,
|
|
'tracker_confidence': tracker_confidence_v2,
|
|
'risk_score': risk_score,
|
|
'is_audio_capable': _is_audio_capable_ble(name, device_type),
|
|
'signal_strength': signal_info['strength'],
|
|
'signal_label': signal_info['label'],
|
|
'signal_confidence': signal_info['confidence'],
|
|
}
|
|
|
|
def classify_rf_signal(self, signal: dict) -> dict:
|
|
"""
|
|
Classify an RF signal into informational/review/high_interest.
|
|
|
|
Returns:
|
|
Dict with 'classification', 'reasons', and metadata
|
|
"""
|
|
frequency = signal.get('frequency', 0)
|
|
power = signal.get('power', signal.get('level', -100))
|
|
band = signal.get('band', '')
|
|
|
|
reasons = []
|
|
classification = 'informational'
|
|
freq_rounded = round(frequency, 1)
|
|
|
|
# Track repeat detections
|
|
times_seen = _record_device_seen(f'rf:{freq_rounded}')
|
|
|
|
# Check if in baseline (known frequency)
|
|
in_baseline = freq_rounded in self.baseline_rf_freqs if self.baseline else False
|
|
|
|
# Get frequency risk info
|
|
risk, band_name = get_frequency_risk(frequency)
|
|
|
|
if in_baseline:
|
|
reasons.append('Known frequency in baseline')
|
|
classification = 'informational'
|
|
else:
|
|
# New/unidentified RF carrier
|
|
reasons.append(f'Unidentified RF carrier in {band_name}')
|
|
|
|
if risk == 'low':
|
|
reasons.append('Background RF noise band')
|
|
classification = 'review'
|
|
elif risk == 'medium':
|
|
reasons.append('ISM band signal')
|
|
classification = 'review'
|
|
elif risk in ['high', 'critical']:
|
|
reasons.append(f'High-risk surveillance band: {band_name}')
|
|
classification = 'high_interest'
|
|
|
|
# Strong persistent signal - use standardized classification
|
|
if power:
|
|
power_info = get_signal_strength_info(float(power))
|
|
if power_info['strength'] in ('strong', 'very_strong'):
|
|
reasons.append(f"{power_info['label']} persistent transmitter")
|
|
classification = 'high_interest'
|
|
|
|
# Repeat detections (persistent transmitter)
|
|
if times_seen >= 2:
|
|
reasons.append(f'Persistent transmitter ({times_seen} detections)')
|
|
classification = 'high_interest'
|
|
|
|
# Include standardized signal classification
|
|
try:
|
|
power_val = float(power) if power else -100
|
|
except (ValueError, TypeError):
|
|
power_val = -100
|
|
signal_info = get_signal_strength_info(power_val)
|
|
|
|
return {
|
|
'classification': classification,
|
|
'reasons': reasons,
|
|
'in_baseline': in_baseline,
|
|
'times_seen': times_seen,
|
|
'risk_level': risk,
|
|
'band_name': band_name,
|
|
'signal_strength': signal_info['strength'],
|
|
'signal_label': signal_info['label'],
|
|
'signal_confidence': signal_info['confidence'],
|
|
}
|
|
|
|
def analyze_wifi_device(self, device: dict) -> dict | None:
|
|
"""
|
|
Analyze a WiFi device for threats.
|
|
|
|
Args:
|
|
device: WiFi device dict with bssid, essid, etc.
|
|
|
|
Returns:
|
|
Threat dict if threat detected, None otherwise
|
|
"""
|
|
mac = device.get('bssid', device.get('mac', '')).upper()
|
|
ssid = device.get('essid', device.get('ssid', ''))
|
|
vendor = device.get('vendor', '')
|
|
signal = device.get('power', device.get('signal', -100))
|
|
|
|
threats = []
|
|
|
|
# Check if new device (not in baseline)
|
|
if self.baseline and mac and mac not in self.baseline_wifi_macs:
|
|
threats.append({
|
|
'type': 'new_device',
|
|
'severity': get_threat_severity('new_device', {'signal_strength': signal}),
|
|
'reason': 'Device not present in baseline',
|
|
})
|
|
|
|
# Check for hidden camera patterns
|
|
if is_potential_camera(ssid=ssid, mac=mac, vendor=vendor):
|
|
threats.append({
|
|
'type': 'hidden_camera',
|
|
'severity': get_threat_severity('hidden_camera', {'signal_strength': signal}),
|
|
'reason': 'Device matches WiFi camera patterns',
|
|
})
|
|
|
|
# Check for hidden SSID with strong signal - use standardized classification
|
|
try:
|
|
signal_int = int(signal) if signal else -100
|
|
except (ValueError, TypeError):
|
|
signal_int = -100
|
|
|
|
signal_info = get_signal_strength_info(signal_int)
|
|
if not ssid and signal_info['strength'] in ('strong', 'very_strong'):
|
|
threats.append({
|
|
'type': 'anomaly',
|
|
'severity': 'medium',
|
|
'reason': f"Hidden SSID with {signal_info['label'].lower()} signal",
|
|
})
|
|
|
|
if not threats:
|
|
return None
|
|
|
|
# Return highest severity threat
|
|
threats.sort(key=lambda t: ['low', 'medium', 'high', 'critical'].index(t['severity']), reverse=True)
|
|
|
|
return {
|
|
'threat_type': threats[0]['type'],
|
|
'severity': threats[0]['severity'],
|
|
'source': 'wifi',
|
|
'identifier': mac,
|
|
'name': ssid or 'Hidden Network',
|
|
'signal_strength': signal,
|
|
'details': {
|
|
'all_threats': threats,
|
|
'vendor': vendor,
|
|
'ssid': ssid,
|
|
}
|
|
}
|
|
|
|
def analyze_bt_device(self, device: dict) -> dict | None:
|
|
"""
|
|
Analyze a Bluetooth device for threats.
|
|
|
|
Args:
|
|
device: BT device dict with mac, name, rssi, etc.
|
|
|
|
Returns:
|
|
Threat dict if threat detected, None otherwise
|
|
"""
|
|
mac = device.get('mac', device.get('address', '')).upper()
|
|
name = device.get('name', '')
|
|
rssi = device.get('rssi', device.get('signal', -100))
|
|
manufacturer = device.get('manufacturer', '')
|
|
device_type = device.get('type', '')
|
|
manufacturer_data = device.get('manufacturer_data')
|
|
tracker_data = device.get('tracker', {}) or {}
|
|
|
|
threats = []
|
|
|
|
# Check if new device (not in baseline)
|
|
if self.baseline and mac and mac not in self.baseline_bt_macs:
|
|
threats.append({
|
|
'type': 'new_device',
|
|
'severity': get_threat_severity('new_device', {'signal_strength': rssi}),
|
|
'reason': 'Device not present in baseline',
|
|
})
|
|
|
|
# Check for known trackers (v2 tracker data if available)
|
|
if tracker_data.get('is_tracker'):
|
|
tracker_label = tracker_data.get('name') or tracker_data.get('type') or 'Tracker'
|
|
confidence = str(tracker_data.get('confidence') or '').lower()
|
|
severity = 'high' if confidence in ('high', 'medium') else 'medium'
|
|
threats.append({
|
|
'type': 'tracker',
|
|
'severity': severity,
|
|
'reason': f"Tracker detected: {tracker_label}",
|
|
'tracker_type': tracker_label,
|
|
'details': tracker_data.get('evidence', []),
|
|
})
|
|
|
|
# Check for known trackers (legacy patterns)
|
|
tracker_info = is_known_tracker(name, manufacturer_data)
|
|
if tracker_info:
|
|
threats.append({
|
|
'type': 'tracker',
|
|
'severity': tracker_info.get('risk', 'high'),
|
|
'reason': f"Known tracker detected: {tracker_info.get('name', 'Unknown')}",
|
|
'tracker_type': tracker_info.get('name'),
|
|
})
|
|
|
|
# Check for suspicious BLE beacons (unnamed, persistent) - use standardized classification
|
|
try:
|
|
rssi_int = int(rssi) if rssi else -100
|
|
except (ValueError, TypeError):
|
|
rssi_int = -100
|
|
|
|
signal_info = get_signal_strength_info(rssi_int)
|
|
if not name and signal_info['strength'] in ('moderate', 'strong', 'very_strong'):
|
|
threats.append({
|
|
'type': 'anomaly',
|
|
'severity': 'medium',
|
|
'reason': f"Unnamed BLE device with {signal_info['label'].lower()} signal",
|
|
})
|
|
|
|
if not threats:
|
|
return None
|
|
|
|
# Return highest severity threat
|
|
threats.sort(key=lambda t: ['low', 'medium', 'high', 'critical'].index(t['severity']), reverse=True)
|
|
|
|
return {
|
|
'threat_type': threats[0]['type'],
|
|
'severity': threats[0]['severity'],
|
|
'source': 'bluetooth',
|
|
'identifier': mac,
|
|
'name': name or 'Unknown BLE Device',
|
|
'signal_strength': rssi,
|
|
'details': {
|
|
'all_threats': threats,
|
|
'manufacturer': manufacturer,
|
|
'device_type': device_type,
|
|
}
|
|
}
|
|
|
|
def analyze_rf_signal(self, signal: dict) -> dict | None:
|
|
"""
|
|
Analyze an RF signal for threats.
|
|
|
|
Args:
|
|
signal: RF signal dict with frequency, level, etc.
|
|
|
|
Returns:
|
|
Threat dict if threat detected, None otherwise
|
|
"""
|
|
frequency = signal.get('frequency', 0)
|
|
level = signal.get('level', signal.get('power', -100))
|
|
modulation = signal.get('modulation', '')
|
|
|
|
if not frequency:
|
|
return None
|
|
|
|
threats = []
|
|
freq_rounded = round(frequency, 1)
|
|
|
|
# Check if new frequency (not in baseline)
|
|
if self.baseline and freq_rounded not in self.baseline_rf_freqs:
|
|
risk, band_name = get_frequency_risk(frequency)
|
|
threats.append({
|
|
'type': 'unknown_signal',
|
|
'severity': risk,
|
|
'reason': f'New signal in {band_name}',
|
|
})
|
|
|
|
# Check frequency risk even without baseline
|
|
risk, band_name = get_frequency_risk(frequency)
|
|
if risk in ['high', 'critical']:
|
|
threats.append({
|
|
'type': 'unknown_signal',
|
|
'severity': risk,
|
|
'reason': f'Signal in high-risk band: {band_name}',
|
|
})
|
|
|
|
if not threats:
|
|
return None
|
|
|
|
# Return highest severity threat
|
|
threats.sort(key=lambda t: ['low', 'medium', 'high', 'critical'].index(t['severity']), reverse=True)
|
|
|
|
return {
|
|
'threat_type': threats[0]['type'],
|
|
'severity': threats[0]['severity'],
|
|
'source': 'rf',
|
|
'identifier': f'{frequency:.3f} MHz',
|
|
'name': f'RF Signal @ {frequency:.3f} MHz',
|
|
'signal_strength': level,
|
|
'frequency': frequency,
|
|
'details': {
|
|
'all_threats': threats,
|
|
'modulation': modulation,
|
|
'band_name': band_name,
|
|
}
|
|
}
|
|
|
|
def analyze_all(
|
|
self,
|
|
wifi_devices: list[dict] | None = None,
|
|
bt_devices: list[dict] | None = None,
|
|
rf_signals: list[dict] | None = None
|
|
) -> list[dict]:
|
|
"""
|
|
Analyze all provided devices and signals for threats.
|
|
|
|
Returns:
|
|
List of detected threats sorted by severity
|
|
"""
|
|
threats = []
|
|
|
|
if wifi_devices:
|
|
for device in wifi_devices:
|
|
threat = self.analyze_wifi_device(device)
|
|
if threat:
|
|
threats.append(threat)
|
|
|
|
if bt_devices:
|
|
for device in bt_devices:
|
|
threat = self.analyze_bt_device(device)
|
|
if threat:
|
|
threats.append(threat)
|
|
|
|
if rf_signals:
|
|
for signal in rf_signals:
|
|
threat = self.analyze_rf_signal(signal)
|
|
if threat:
|
|
threats.append(threat)
|
|
|
|
# Sort by severity (critical first)
|
|
severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
|
|
threats.sort(key=lambda t: severity_order.get(t.get('severity', 'low'), 3))
|
|
|
|
return threats
|
|
|
|
|
|
def classify_device_threat(
|
|
source: str,
|
|
device: dict,
|
|
baseline: dict | None = None
|
|
) -> dict | None:
|
|
"""
|
|
Convenience function to classify a single device.
|
|
|
|
Args:
|
|
source: Device source ('wifi', 'bluetooth', 'rf')
|
|
device: Device data dict
|
|
baseline: Optional baseline for comparison
|
|
|
|
Returns:
|
|
Threat dict if threat detected, None otherwise
|
|
"""
|
|
detector = ThreatDetector(baseline)
|
|
|
|
if source == 'wifi':
|
|
return detector.analyze_wifi_device(device)
|
|
elif source == 'bluetooth':
|
|
return detector.analyze_bt_device(device)
|
|
elif source == 'rf':
|
|
return detector.analyze_rf_signal(device)
|
|
|
|
return None
|