diff --git a/routes/tscm.py b/routes/tscm.py index 80cf421..244edcf 100644 --- a/routes/tscm.py +++ b/routes/tscm.py @@ -569,14 +569,15 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]: """Scan for Bluetooth devices using system tools.""" import platform import os - import pty import re - import select + import shutil import subprocess devices = [] seen_macs = set() + logger.info(f"Starting Bluetooth scan (duration={duration}s, interface={interface})") + if platform.system() == 'Darwin': # macOS: Use system_profiler for basic Bluetooth info try: @@ -603,108 +604,153 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]: 'type': info.get('device_minorType', 'unknown'), 'connected': section == 'device_connected' }) + logger.info(f"macOS Bluetooth scan found {len(devices)} devices") except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError, json.JSONDecodeError) as e: logger.warning(f"macOS Bluetooth scan failed: {e}") else: - # Linux: Use bluetoothctl or hcitool + # Linux: Try multiple methods iface = interface or 'hci0' - # Try bluetoothctl first - try: - master_fd, slave_fd = pty.openpty() - process = subprocess.Popen( - ['bluetoothctl'], - stdin=slave_fd, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True - ) - os.close(slave_fd) - - # Start scanning - time.sleep(0.3) - os.write(master_fd, b'power on\n') - time.sleep(0.3) - os.write(master_fd, b'scan on\n') - - # Collect devices for specified duration - scan_end = time.time() + duration - buffer = '' - - while time.time() < scan_end and _sweep_running: - readable, _, _ = select.select([master_fd], [], [], 1.0) - if readable: - try: - data = os.read(master_fd, 4096) - if not data: - break - buffer += data.decode('utf-8', errors='replace') - - while '\n' in buffer: - line, buffer = buffer.split('\n', 1) - line = re.sub(r'\x1b\[[0-9;]*m', '', line).strip() - - if 'Device' in line: - match = re.search( - r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' - r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})\s*(.*)', - line - ) - if match: - mac = match.group(1).upper() - name = match.group(2).strip() - # Remove RSSI from name if present - name = re.sub(r'\s*RSSI:\s*-?\d+\s*', '', name).strip() - - if mac not in seen_macs: - seen_macs.add(mac) - devices.append({ - 'mac': mac, - 'name': name or '[Unknown]' - }) - except OSError: - break - - # Stop scanning and cleanup - try: - os.write(master_fd, b'scan off\n') - time.sleep(0.2) - os.write(master_fd, b'quit\n') - except OSError: - pass - - process.terminate() - try: - process.wait(timeout=2) - except subprocess.TimeoutExpired: - process.kill() - - try: - os.close(master_fd) - except OSError: - pass - - except (FileNotFoundError, subprocess.SubprocessError) as e: - logger.warning(f"bluetoothctl scan failed: {e}") - - # Fallback to hcitool + # Method 1: Try hcitool scan (simpler, more reliable) + if shutil.which('hcitool'): try: + logger.info("Trying hcitool scan...") result = subprocess.run( - ['hcitool', '-i', iface, 'scan'], + ['hcitool', '-i', iface, 'scan', '--flush'], capture_output=True, text=True, timeout=duration + 5 ) for line in result.stdout.split('\n'): - parts = line.split() - if len(parts) >= 1 and ':' in parts[0]: - mac = parts[0].upper() - name = ' '.join(parts[1:]) if len(parts) > 1 else '[Unknown]' - if mac not in seen_macs: - seen_macs.add(mac) - devices.append({'mac': mac, 'name': name}) - except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e: + line = line.strip() + if line and '\t' in line: + parts = line.split('\t') + if len(parts) >= 1 and ':' in parts[0]: + mac = parts[0].strip().upper() + name = parts[1].strip() if len(parts) > 1 else 'Unknown' + if mac not in seen_macs: + seen_macs.add(mac) + devices.append({'mac': mac, 'name': name}) + logger.info(f"hcitool scan found {len(devices)} classic BT devices") + except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e: logger.warning(f"hcitool scan failed: {e}") + # Method 2: Try btmgmt for BLE devices + if shutil.which('btmgmt'): + try: + logger.info("Trying btmgmt find...") + result = subprocess.run( + ['btmgmt', 'find'], + capture_output=True, text=True, timeout=duration + 5 + ) + for line in result.stdout.split('\n'): + # Parse btmgmt output: "dev_found: XX:XX:XX:XX:XX:XX type LE..." + if 'dev_found' in line.lower() or ('type' in line.lower() and ':' in line): + mac_match = re.search( + r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' + r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})', + line + ) + if mac_match: + mac = mac_match.group(1).upper() + if mac not in seen_macs: + seen_macs.add(mac) + # Try to extract name + name_match = re.search(r'name\s+(.+?)(?:\s|$)', line, re.I) + name = name_match.group(1) if name_match else 'Unknown BLE' + devices.append({ + 'mac': mac, + 'name': name, + 'type': 'ble' if 'le' in line.lower() else 'classic' + }) + logger.info(f"btmgmt found {len(devices)} total devices") + except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e: + logger.warning(f"btmgmt find failed: {e}") + + # Method 3: Try bluetoothctl as last resort + if not devices and shutil.which('bluetoothctl'): + try: + import pty + import select + + logger.info("Trying bluetoothctl scan...") + master_fd, slave_fd = pty.openpty() + process = subprocess.Popen( + ['bluetoothctl'], + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True + ) + os.close(slave_fd) + + # Start scanning + time.sleep(0.3) + os.write(master_fd, b'power on\n') + time.sleep(0.3) + os.write(master_fd, b'scan on\n') + + # Collect devices for specified duration + scan_end = time.time() + min(duration, 10) # Cap at 10 seconds + buffer = '' + + while time.time() < scan_end: + readable, _, _ = select.select([master_fd], [], [], 1.0) + if readable: + try: + data = os.read(master_fd, 4096) + if not data: + break + buffer += data.decode('utf-8', errors='replace') + + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = re.sub(r'\x1b\[[0-9;]*m', '', line).strip() + + if 'Device' in line: + match = re.search( + r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' + r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})\s*(.*)', + line + ) + if match: + mac = match.group(1).upper() + name = match.group(2).strip() + # Remove RSSI from name if present + name = re.sub(r'\s*RSSI:\s*-?\d+\s*', '', name).strip() + + if mac not in seen_macs: + seen_macs.add(mac) + devices.append({ + 'mac': mac, + 'name': name or '[Unknown]' + }) + except OSError: + break + + # Stop scanning and cleanup + try: + os.write(master_fd, b'scan off\n') + time.sleep(0.2) + os.write(master_fd, b'quit\n') + except OSError: + pass + + process.terminate() + try: + process.wait(timeout=2) + except subprocess.TimeoutExpired: + process.kill() + + try: + os.close(master_fd) + except OSError: + pass + + logger.info(f"bluetoothctl scan found {len(devices)} devices") + + except (FileNotFoundError, subprocess.SubprocessError) as e: + logger.warning(f"bluetoothctl scan failed: {e}") + return devices @@ -728,10 +774,15 @@ def _scan_rf_signals(sdr_device: int | None, duration: int = 30) -> list[dict]: signals = [] - if not shutil.which('rtl_power'): - logger.warning("rtl_power not found, RF scanning unavailable") + logger.info(f"Starting RF scan (device={sdr_device})") + + rtl_power_path = shutil.which('rtl_power') + if not rtl_power_path: + logger.warning("rtl_power not found in PATH, RF scanning unavailable") return signals + logger.info(f"Found rtl_power at: {rtl_power_path}") + # Define frequency bands to scan (in Hz) - focus on common bug frequencies # Format: (start_freq, end_freq, bin_size, description) scan_bands = [ @@ -757,10 +808,12 @@ def _scan_rf_signals(sdr_device: int | None, duration: int = 30) -> list[dict]: if not _sweep_running: break + logger.info(f"Scanning {band_name} ({start_freq/1e6:.1f}-{end_freq/1e6:.1f} MHz)") + try: # Run rtl_power for a quick sweep of this band cmd = [ - 'rtl_power', + rtl_power_path, '-f', f'{start_freq}:{end_freq}:{bin_size}', '-g', '40', # Gain '-i', '1', # Integration interval (1 second) @@ -768,13 +821,18 @@ def _scan_rf_signals(sdr_device: int | None, duration: int = 30) -> list[dict]: '-c', '20%', # Crop 20% of edges ] + device_arg + [tmp_path] + logger.debug(f"Running: {' '.join(cmd)}") + result = subprocess.run( cmd, capture_output=True, text=True, - timeout=15 + timeout=30 ) + if result.returncode != 0: + logger.warning(f"rtl_power returned {result.returncode}: {result.stderr}") + # Parse the CSV output if os.path.exists(tmp_path) and os.path.getsize(tmp_path) > 0: with open(tmp_path, 'r') as f: diff --git a/utils/tscm/correlation.py b/utils/tscm/correlation.py new file mode 100644 index 0000000..8562554 --- /dev/null +++ b/utils/tscm/correlation.py @@ -0,0 +1,779 @@ +""" +TSCM Cross-Protocol Correlation Engine + +Correlates Bluetooth, Wi-Fi, and RF indicators to detect potential surveillance activity. +Implements scoring model for risk assessment and provides actionable intelligence. + +DISCLAIMER: This system performs wireless and RF surveillance screening. +Findings indicate anomalies and indicators, not confirmed surveillance devices. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from enum import Enum +from typing import Optional + +logger = logging.getLogger('intercept.tscm.correlation') + + +class RiskLevel(Enum): + """Risk classification levels.""" + INFORMATIONAL = 'informational' # Score 0-2 + NEEDS_REVIEW = 'review' # Score 3-5 + HIGH_INTEREST = 'high_interest' # Score 6+ + + +class IndicatorType(Enum): + """Types of risk indicators.""" + UNKNOWN_DEVICE = 'unknown_device' + AUDIO_CAPABLE = 'audio_capable' + PERSISTENT = 'persistent' + MEETING_CORRELATED = 'meeting_correlated' + CROSS_PROTOCOL = 'cross_protocol' + HIDDEN_IDENTITY = 'hidden_identity' + ROGUE_AP = 'rogue_ap' + BURST_TRANSMISSION = 'burst_transmission' + STABLE_RSSI = 'stable_rssi' + HIGH_FREQ_ADVERTISING = 'high_freq_advertising' + MAC_ROTATION = 'mac_rotation' + NARROWBAND_SIGNAL = 'narrowband_signal' + ALWAYS_ON_CARRIER = 'always_on_carrier' + + +# Scoring weights for each indicator +INDICATOR_SCORES = { + IndicatorType.UNKNOWN_DEVICE: 1, + IndicatorType.AUDIO_CAPABLE: 2, + IndicatorType.PERSISTENT: 2, + IndicatorType.MEETING_CORRELATED: 2, + IndicatorType.CROSS_PROTOCOL: 3, + IndicatorType.HIDDEN_IDENTITY: 2, + IndicatorType.ROGUE_AP: 3, + IndicatorType.BURST_TRANSMISSION: 2, + IndicatorType.STABLE_RSSI: 1, + IndicatorType.HIGH_FREQ_ADVERTISING: 1, + IndicatorType.MAC_ROTATION: 1, + IndicatorType.NARROWBAND_SIGNAL: 2, + IndicatorType.ALWAYS_ON_CARRIER: 2, +} + + +@dataclass +class Indicator: + """A single risk indicator.""" + type: IndicatorType + description: str + score: int + details: dict = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.now) + + +@dataclass +class DeviceProfile: + """Complete profile for a detected device.""" + # Identity + identifier: str # MAC, BSSID, or frequency + protocol: str # 'bluetooth', 'wifi', 'rf' + + # Device info + name: Optional[str] = None + manufacturer: Optional[str] = None + device_type: Optional[str] = None + + # Bluetooth-specific + services: list[str] = field(default_factory=list) + company_id: Optional[int] = None + advertising_interval: Optional[int] = None + + # Wi-Fi-specific + ssid: Optional[str] = None + channel: Optional[int] = None + encryption: Optional[str] = None + beacon_interval: Optional[int] = None + is_hidden: bool = False + + # RF-specific + frequency: Optional[float] = None + bandwidth: Optional[float] = None + modulation: Optional[str] = None + + # Common measurements + rssi_samples: list[tuple[datetime, int]] = field(default_factory=list) + first_seen: Optional[datetime] = None + last_seen: Optional[datetime] = None + detection_count: int = 0 + + # Behavioral analysis + indicators: list[Indicator] = field(default_factory=list) + total_score: int = 0 + risk_level: RiskLevel = RiskLevel.INFORMATIONAL + + # Correlation + correlated_devices: list[str] = field(default_factory=list) + + # Output + confidence: float = 0.0 + recommended_action: str = 'monitor' + + def add_rssi_sample(self, rssi: int) -> None: + """Add an RSSI sample with timestamp.""" + self.rssi_samples.append((datetime.now(), rssi)) + # Keep last 100 samples + if len(self.rssi_samples) > 100: + self.rssi_samples = self.rssi_samples[-100:] + + def get_rssi_stability(self) -> float: + """Calculate RSSI stability (0-1, higher = more stable).""" + if len(self.rssi_samples) < 3: + return 0.0 + values = [r for _, r in self.rssi_samples[-20:]] + if not values: + return 0.0 + avg = sum(values) / len(values) + variance = sum((v - avg) ** 2 for v in values) / len(values) + # Convert variance to stability score (lower variance = higher stability) + # Variance of ~0 = 1.0, variance of 100+ = ~0 + return max(0, 1 - (variance / 100)) + + def add_indicator(self, indicator_type: IndicatorType, description: str, + details: dict = None) -> None: + """Add a risk indicator and update score.""" + score = INDICATOR_SCORES.get(indicator_type, 1) + self.indicators.append(Indicator( + type=indicator_type, + description=description, + score=score, + details=details or {} + )) + self._recalculate_score() + + def _recalculate_score(self) -> None: + """Recalculate total score and risk level.""" + self.total_score = sum(i.score for i in self.indicators) + + if self.total_score >= 6: + self.risk_level = RiskLevel.HIGH_INTEREST + self.recommended_action = 'investigate' + elif self.total_score >= 3: + self.risk_level = RiskLevel.NEEDS_REVIEW + self.recommended_action = 'review' + else: + self.risk_level = RiskLevel.INFORMATIONAL + self.recommended_action = 'monitor' + + # Calculate confidence based on number and quality of indicators + indicator_count = len(self.indicators) + self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05)) + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'identifier': self.identifier, + 'protocol': self.protocol, + 'name': self.name, + 'manufacturer': self.manufacturer, + 'device_type': self.device_type, + 'ssid': self.ssid, + 'frequency': self.frequency, + 'first_seen': self.first_seen.isoformat() if self.first_seen else None, + 'last_seen': self.last_seen.isoformat() if self.last_seen else None, + 'detection_count': self.detection_count, + 'rssi_current': self.rssi_samples[-1][1] if self.rssi_samples else None, + 'rssi_stability': self.get_rssi_stability(), + 'indicators': [ + { + 'type': i.type.value, + 'description': i.description, + 'score': i.score, + } + for i in self.indicators + ], + 'total_score': self.total_score, + 'risk_level': self.risk_level.value, + 'confidence': round(self.confidence, 2), + 'recommended_action': self.recommended_action, + 'correlated_devices': self.correlated_devices, + } + + +# Known audio-capable BLE service UUIDs +AUDIO_SERVICE_UUIDS = [ + '0000110b-0000-1000-8000-00805f9b34fb', # A2DP Sink + '0000110a-0000-1000-8000-00805f9b34fb', # A2DP Source + '0000111e-0000-1000-8000-00805f9b34fb', # Handsfree + '0000111f-0000-1000-8000-00805f9b34fb', # Handsfree Audio Gateway + '00001108-0000-1000-8000-00805f9b34fb', # Headset + '00001203-0000-1000-8000-00805f9b34fb', # Generic Audio +] + +# Generic chipset vendors (often used in covert devices) +GENERIC_CHIPSET_VENDORS = [ + 'espressif', + 'nordic', + 'texas instruments', + 'silicon labs', + 'realtek', + 'mediatek', + 'qualcomm', + 'broadcom', + 'cypress', + 'dialog', +] + +# Suspicious frequency ranges for RF +SUSPICIOUS_RF_BANDS = [ + {'start': 136, 'end': 174, 'name': 'VHF', 'risk': 'high'}, + {'start': 400, 'end': 470, 'name': 'UHF', 'risk': 'high'}, + {'start': 315, 'end': 316, 'name': '315 MHz ISM', 'risk': 'medium'}, + {'start': 433, 'end': 435, 'name': '433 MHz ISM', 'risk': 'medium'}, + {'start': 868, 'end': 870, 'name': '868 MHz ISM', 'risk': 'medium'}, + {'start': 902, 'end': 928, 'name': '915 MHz ISM', 'risk': 'medium'}, +] + + +class CorrelationEngine: + """ + Cross-protocol correlation engine for TSCM analysis. + + Correlates Bluetooth, Wi-Fi, and RF indicators to identify + potential surveillance activity patterns. + """ + + def __init__(self): + self.device_profiles: dict[str, DeviceProfile] = {} + self.meeting_windows: list[tuple[datetime, datetime]] = [] + self.correlation_window = timedelta(minutes=5) + + def start_meeting_window(self) -> None: + """Mark the start of a sensitive period (meeting).""" + self.meeting_windows.append((datetime.now(), None)) + logger.info("Meeting window started") + + def end_meeting_window(self) -> None: + """Mark the end of a sensitive period.""" + if self.meeting_windows and self.meeting_windows[-1][1] is None: + start = self.meeting_windows[-1][0] + self.meeting_windows[-1] = (start, datetime.now()) + logger.info("Meeting window ended") + + def is_during_meeting(self, timestamp: datetime = None) -> bool: + """Check if timestamp falls within a meeting window.""" + ts = timestamp or datetime.now() + for start, end in self.meeting_windows: + if end is None: + if ts >= start: + return True + elif start <= ts <= end: + return True + return False + + def get_or_create_profile(self, identifier: str, protocol: str) -> DeviceProfile: + """Get existing profile or create new one.""" + key = f"{protocol}:{identifier}" + if key not in self.device_profiles: + self.device_profiles[key] = DeviceProfile( + identifier=identifier, + protocol=protocol, + first_seen=datetime.now() + ) + profile = self.device_profiles[key] + profile.last_seen = datetime.now() + profile.detection_count += 1 + return profile + + def analyze_bluetooth_device(self, device: dict) -> DeviceProfile: + """ + Analyze a Bluetooth device for suspicious indicators. + + Args: + device: Dict with mac, name, rssi, services, manufacturer, etc. + + Returns: + DeviceProfile with risk assessment + """ + mac = device.get('mac', device.get('address', '')).upper() + profile = self.get_or_create_profile(mac, 'bluetooth') + + # Update profile data + profile.name = device.get('name') or profile.name + profile.manufacturer = device.get('manufacturer') or profile.manufacturer + profile.device_type = device.get('type') or profile.device_type + profile.services = device.get('services', []) or profile.services + profile.company_id = device.get('company_id') or profile.company_id + profile.advertising_interval = device.get('advertising_interval') or profile.advertising_interval + + # Add RSSI sample + rssi = device.get('rssi', device.get('signal')) + if rssi: + try: + profile.add_rssi_sample(int(rssi)) + except (ValueError, TypeError): + pass + + # Clear previous indicators for fresh analysis + profile.indicators = [] + + # === Detection Logic === + + # 1. Unknown manufacturer or generic chipset + if not profile.manufacturer: + profile.add_indicator( + IndicatorType.UNKNOWN_DEVICE, + 'Unknown manufacturer', + {'manufacturer': None} + ) + elif any(v in profile.manufacturer.lower() for v in GENERIC_CHIPSET_VENDORS): + profile.add_indicator( + IndicatorType.UNKNOWN_DEVICE, + f'Generic chipset vendor: {profile.manufacturer}', + {'manufacturer': profile.manufacturer} + ) + + # 2. No human-readable name + if not profile.name or profile.name in ['Unknown', '', 'N/A']: + profile.add_indicator( + IndicatorType.HIDDEN_IDENTITY, + 'No device name advertised', + {'name': profile.name} + ) + + # 3. Audio-capable services + if profile.services: + audio_services = [s for s in profile.services + if s.lower() in [u.lower() for u in AUDIO_SERVICE_UUIDS]] + if audio_services: + profile.add_indicator( + IndicatorType.AUDIO_CAPABLE, + 'Audio-capable BLE services detected', + {'services': audio_services} + ) + + # Check name for audio keywords + if profile.name: + audio_keywords = ['headphone', 'headset', 'earphone', 'speaker', + 'mic', 'audio', 'airpod', 'buds', 'jabra', 'bose'] + if any(k in profile.name.lower() for k in audio_keywords): + profile.add_indicator( + IndicatorType.AUDIO_CAPABLE, + f'Audio device name: {profile.name}', + {'name': profile.name} + ) + + # 4. High-frequency advertising (< 100ms interval is suspicious) + if profile.advertising_interval and profile.advertising_interval < 100: + profile.add_indicator( + IndicatorType.HIGH_FREQ_ADVERTISING, + f'High advertising frequency: {profile.advertising_interval}ms', + {'interval': profile.advertising_interval} + ) + + # 5. Persistent presence + if profile.detection_count >= 3: + profile.add_indicator( + IndicatorType.PERSISTENT, + f'Persistent device ({profile.detection_count} detections)', + {'count': profile.detection_count} + ) + + # 6. Stable RSSI (suggests fixed placement) + rssi_stability = profile.get_rssi_stability() + if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5: + profile.add_indicator( + IndicatorType.STABLE_RSSI, + f'Stable signal strength (stability: {rssi_stability:.0%})', + {'stability': rssi_stability} + ) + + # 7. Meeting correlation + if self.is_during_meeting(): + profile.add_indicator( + IndicatorType.MEETING_CORRELATED, + 'Detected during sensitive period', + {'during_meeting': True} + ) + + # 8. MAC rotation pattern (random MAC prefix) + if mac and mac[1] in ['2', '6', 'A', 'E', 'a', 'e']: + profile.add_indicator( + IndicatorType.MAC_ROTATION, + 'Random/rotating MAC address detected', + {'mac': mac} + ) + + return profile + + def analyze_wifi_device(self, device: dict) -> DeviceProfile: + """ + Analyze a Wi-Fi device/AP for suspicious indicators. + + Args: + device: Dict with bssid, ssid, channel, rssi, encryption, etc. + + Returns: + DeviceProfile with risk assessment + """ + bssid = device.get('bssid', device.get('mac', '')).upper() + profile = self.get_or_create_profile(bssid, 'wifi') + + # Update profile data + ssid = device.get('ssid', device.get('essid', '')) + profile.ssid = ssid if ssid else profile.ssid + profile.name = ssid or f'Hidden Network ({bssid[-8:]})' + profile.channel = device.get('channel') or profile.channel + profile.encryption = device.get('encryption', device.get('privacy')) or profile.encryption + profile.beacon_interval = device.get('beacon_interval') or profile.beacon_interval + profile.is_hidden = not ssid or ssid in ['', 'Hidden', '[Hidden]'] + + # Extract manufacturer from OUI + if bssid and len(bssid) >= 8: + profile.manufacturer = device.get('vendor') or profile.manufacturer + + # Add RSSI sample + rssi = device.get('rssi', device.get('power', device.get('signal'))) + if rssi: + try: + profile.add_rssi_sample(int(rssi)) + except (ValueError, TypeError): + pass + + # Clear previous indicators + profile.indicators = [] + + # === Detection Logic === + + # 1. Hidden or unnamed SSID + if profile.is_hidden: + profile.add_indicator( + IndicatorType.HIDDEN_IDENTITY, + 'Hidden or empty SSID', + {'ssid': ssid} + ) + + # 2. BSSID not in authorized list (would need baseline) + # For now, mark as unknown if no manufacturer + if not profile.manufacturer: + profile.add_indicator( + IndicatorType.UNKNOWN_DEVICE, + 'Unknown AP manufacturer', + {'bssid': bssid} + ) + + # 3. Consumer device OUI in restricted environment + consumer_ouis = ['tp-link', 'netgear', 'd-link', 'linksys', 'asus'] + if profile.manufacturer and any(c in profile.manufacturer.lower() for c in consumer_ouis): + profile.add_indicator( + IndicatorType.ROGUE_AP, + f'Consumer-grade AP detected: {profile.manufacturer}', + {'manufacturer': profile.manufacturer} + ) + + # 4. Camera device patterns + camera_keywords = ['cam', 'camera', 'ipcam', 'dvr', 'nvr', 'wyze', + 'ring', 'arlo', 'nest', 'blink', 'eufy', 'yi'] + if ssid and any(k in ssid.lower() for k in camera_keywords): + profile.add_indicator( + IndicatorType.AUDIO_CAPABLE, # Cameras often have mics + f'Potential camera device: {ssid}', + {'ssid': ssid} + ) + + # 5. Persistent presence + if profile.detection_count >= 3: + profile.add_indicator( + IndicatorType.PERSISTENT, + f'Persistent AP ({profile.detection_count} detections)', + {'count': profile.detection_count} + ) + + # 6. Stable RSSI (fixed placement) + rssi_stability = profile.get_rssi_stability() + if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5: + profile.add_indicator( + IndicatorType.STABLE_RSSI, + f'Stable signal (stability: {rssi_stability:.0%})', + {'stability': rssi_stability} + ) + + # 7. Meeting correlation + if self.is_during_meeting(): + profile.add_indicator( + IndicatorType.MEETING_CORRELATED, + 'Detected during sensitive period', + {'during_meeting': True} + ) + + # 8. Strong hidden AP (very suspicious) + if profile.is_hidden and profile.rssi_samples: + latest_rssi = profile.rssi_samples[-1][1] + if latest_rssi > -50: + profile.add_indicator( + IndicatorType.ROGUE_AP, + f'Strong hidden AP (RSSI: {latest_rssi} dBm)', + {'rssi': latest_rssi} + ) + + return profile + + def analyze_rf_signal(self, signal: dict) -> DeviceProfile: + """ + Analyze an RF signal for suspicious indicators. + + Args: + signal: Dict with frequency, power, bandwidth, modulation, etc. + + Returns: + DeviceProfile with risk assessment + """ + frequency = signal.get('frequency', 0) + freq_key = f"{frequency:.3f}" + profile = self.get_or_create_profile(freq_key, 'rf') + + # Update profile data + profile.frequency = frequency + profile.name = f'{frequency:.3f} MHz' + profile.bandwidth = signal.get('bandwidth') or profile.bandwidth + profile.modulation = signal.get('modulation') or profile.modulation + + # Add power sample + power = signal.get('power', signal.get('level')) + if power: + try: + profile.add_rssi_sample(int(float(power))) + except (ValueError, TypeError): + pass + + # Clear previous indicators + profile.indicators = [] + + # === Detection Logic === + + # 1. Determine frequency band risk + band_info = None + for band in SUSPICIOUS_RF_BANDS: + if band['start'] <= frequency <= band['end']: + band_info = band + break + + if band_info: + if band_info['risk'] == 'high': + profile.add_indicator( + IndicatorType.NARROWBAND_SIGNAL, + f"Signal in high-risk band: {band_info['name']}", + {'band': band_info['name'], 'frequency': frequency} + ) + else: + profile.add_indicator( + IndicatorType.UNKNOWN_DEVICE, + f"Signal in ISM band: {band_info['name']}", + {'band': band_info['name'], 'frequency': frequency} + ) + + # 2. Narrowband FM/AM (potential bug) + if profile.modulation and profile.modulation.lower() in ['fm', 'nfm', 'am']: + profile.add_indicator( + IndicatorType.NARROWBAND_SIGNAL, + f'Narrowband {profile.modulation.upper()} signal', + {'modulation': profile.modulation} + ) + + # 3. Persistent/always-on carrier + if profile.detection_count >= 2: + profile.add_indicator( + IndicatorType.ALWAYS_ON_CARRIER, + f'Persistent carrier ({profile.detection_count} detections)', + {'count': profile.detection_count} + ) + + # 4. Strong signal (close proximity) + if profile.rssi_samples: + latest_power = profile.rssi_samples[-1][1] + if latest_power > -40: + profile.add_indicator( + IndicatorType.STABLE_RSSI, + f'Strong signal suggesting close proximity ({latest_power} dBm)', + {'power': latest_power} + ) + + # 5. Meeting correlation + if self.is_during_meeting(): + profile.add_indicator( + IndicatorType.MEETING_CORRELATED, + 'Signal detected during sensitive period', + {'during_meeting': True} + ) + + return profile + + def correlate_devices(self) -> list[dict]: + """ + Perform cross-protocol correlation analysis. + + Identifies devices across protocols that may be related. + + Returns: + List of correlation findings + """ + correlations = [] + now = datetime.now() + + # Get recent devices by protocol + bt_devices = [p for p in self.device_profiles.values() + if p.protocol == 'bluetooth' and + p.last_seen and (now - p.last_seen) < self.correlation_window] + wifi_devices = [p for p in self.device_profiles.values() + if p.protocol == 'wifi' and + p.last_seen and (now - p.last_seen) < self.correlation_window] + rf_signals = [p for p in self.device_profiles.values() + if p.protocol == 'rf' and + p.last_seen and (now - p.last_seen) < self.correlation_window] + + # Correlation 1: BLE audio device + RF narrowband signal + audio_bt = [p for p in bt_devices + if any(i.type == IndicatorType.AUDIO_CAPABLE for i in p.indicators)] + narrowband_rf = [p for p in rf_signals + if any(i.type == IndicatorType.NARROWBAND_SIGNAL for i in p.indicators)] + + for bt in audio_bt: + for rf in narrowband_rf: + correlation = { + 'type': 'bt_audio_rf_narrowband', + 'description': 'Audio-capable BLE device detected alongside narrowband RF signal', + 'devices': [bt.identifier, rf.identifier], + 'protocols': ['bluetooth', 'rf'], + 'score_boost': 3, + 'significance': 'high', + } + correlations.append(correlation) + + # Add cross-protocol indicator to both + bt.add_indicator( + IndicatorType.CROSS_PROTOCOL, + f'Correlated with RF signal at {rf.frequency:.3f} MHz', + {'correlated_device': rf.identifier} + ) + rf.add_indicator( + IndicatorType.CROSS_PROTOCOL, + f'Correlated with BLE device {bt.identifier}', + {'correlated_device': bt.identifier} + ) + bt.correlated_devices.append(rf.identifier) + rf.correlated_devices.append(bt.identifier) + + # Correlation 2: Rogue WiFi AP + RF burst activity + rogue_aps = [p for p in wifi_devices + if any(i.type == IndicatorType.ROGUE_AP for i in p.indicators)] + rf_bursts = [p for p in rf_signals + if any(i.type in [IndicatorType.BURST_TRANSMISSION, + IndicatorType.ALWAYS_ON_CARRIER] for i in p.indicators)] + + for ap in rogue_aps: + for rf in rf_bursts: + correlation = { + 'type': 'rogue_ap_rf_burst', + 'description': 'Rogue AP detected alongside RF transmission', + 'devices': [ap.identifier, rf.identifier], + 'protocols': ['wifi', 'rf'], + 'score_boost': 3, + 'significance': 'high', + } + correlations.append(correlation) + + ap.add_indicator( + IndicatorType.CROSS_PROTOCOL, + f'Correlated with RF at {rf.frequency:.3f} MHz', + {'correlated_device': rf.identifier} + ) + rf.add_indicator( + IndicatorType.CROSS_PROTOCOL, + f'Correlated with AP {ap.ssid or ap.identifier}', + {'correlated_device': ap.identifier} + ) + + # Correlation 3: Same vendor BLE + WiFi + for bt in bt_devices: + if bt.manufacturer: + for wifi in wifi_devices: + if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower(): + correlation = { + 'type': 'same_vendor_bt_wifi', + 'description': f'Same vendor ({bt.manufacturer}) on BLE and WiFi', + 'devices': [bt.identifier, wifi.identifier], + 'protocols': ['bluetooth', 'wifi'], + 'score_boost': 2, + 'significance': 'medium', + } + correlations.append(correlation) + + return correlations + + def get_high_interest_devices(self) -> list[DeviceProfile]: + """Get all devices classified as high interest.""" + return [p for p in self.device_profiles.values() + if p.risk_level == RiskLevel.HIGH_INTEREST] + + def get_all_findings(self) -> dict: + """ + Get comprehensive findings report. + + Returns: + Dict with all device profiles, correlations, and summary + """ + correlations = self.correlate_devices() + + devices_by_risk = { + 'high_interest': [], + 'needs_review': [], + 'informational': [], + } + + for profile in self.device_profiles.values(): + devices_by_risk[profile.risk_level.value].append(profile.to_dict()) + + return { + 'timestamp': datetime.now().isoformat(), + 'summary': { + 'total_devices': len(self.device_profiles), + 'high_interest': len(devices_by_risk['high_interest']), + 'needs_review': len(devices_by_risk['needs_review']), + 'informational': len(devices_by_risk['informational']), + 'correlations_found': len(correlations), + }, + 'devices': devices_by_risk, + 'correlations': correlations, + 'disclaimer': ( + "This system performs wireless and RF surveillance screening. " + "Findings indicate anomalies and indicators, not confirmed surveillance devices." + ), + } + + def clear_old_profiles(self, max_age_hours: int = 24) -> int: + """Remove profiles older than specified age.""" + cutoff = datetime.now() - timedelta(hours=max_age_hours) + old_keys = [ + k for k, v in self.device_profiles.items() + if v.last_seen and v.last_seen < cutoff + ] + for key in old_keys: + del self.device_profiles[key] + return len(old_keys) + + +# Global correlation engine instance +_correlation_engine: CorrelationEngine | None = None + + +def get_correlation_engine() -> CorrelationEngine: + """Get or create the global correlation engine.""" + global _correlation_engine + if _correlation_engine is None: + _correlation_engine = CorrelationEngine() + return _correlation_engine + + +def reset_correlation_engine() -> None: + """Reset the global correlation engine.""" + global _correlation_engine + _correlation_engine = CorrelationEngine()