diff --git a/docs/FEATURES.md b/docs/FEATURES.md index 03ec3c4..0622dbe 100644 --- a/docs/FEATURES.md +++ b/docs/FEATURES.md @@ -75,13 +75,47 @@ Complete feature list for all modules. ## Bluetooth Scanning - **BLE and Classic** Bluetooth device scanning -- **Multiple scan modes** - hcitool, bluetoothctl +- **Multiple scan modes** - hcitool, bluetoothctl, bleak - **Tracker detection** - AirTag, Tile, Samsung SmartTag, Chipolo - **Device classification** - phones, audio, wearables, computers -- **Manufacturer lookup** via OUI database +- **Manufacturer lookup** via OUI database and Bluetooth Company IDs - **Proximity radar** visualization - **Device type breakdown** chart +## TSCM Counter-Surveillance Mode + +Technical Surveillance Countermeasures (TSCM) screening for detecting wireless surveillance indicators. + +### Wireless Sweep Features +- **BLE scanning** with manufacturer data detection (AirTags, Tile, SmartTags, ESP32) +- **WiFi scanning** for rogue APs, hidden SSIDs, camera devices +- **RF spectrum analysis** (requires RTL-SDR) - FM bugs, ISM bands, video transmitters +- **Cross-protocol correlation** - links devices across BLE/WiFi/RF +- **Baseline comparison** - detect new/unknown devices vs known environment + +### MAC-Randomization Resistant Detection +- **Device fingerprinting** based on advertisement payloads, not MAC addresses +- **Behavioral clustering** - groups observations into probable physical devices +- **Session tracking** - monitors device presence windows +- **Timing pattern analysis** - detects characteristic advertising intervals +- **RSSI trajectory correlation** - identifies co-located devices + +### Risk Assessment +- **Three-tier scoring model**: + - Informational (0-2): Known or expected devices + - Needs Review (3-5): Unusual devices requiring assessment + - High Interest (6+): Multiple indicators warrant investigation +- **Risk indicators**: Stable RSSI, audio-capable, ESP32 chipsets, hidden identity, MAC rotation +- **Audit trail** 
- full evidence chain for each link/flag +- **Client-safe disclaimers** - findings are indicators, not confirmed surveillance + +### Limitations (Documented) +- Cannot detect non-transmitting devices +- False positives/negatives expected +- Results require professional verification +- No cryptographic de-randomization +- Passive screening only (no active probing by default) + ## User Interface - **Mode-specific header stats** - real-time badges showing key metrics per mode diff --git a/docs/HARDWARE.md b/docs/HARDWARE.md index 87ba978..68bea41 100644 --- a/docs/HARDWARE.md +++ b/docs/HARDWARE.md @@ -179,6 +179,7 @@ Open **http://localhost:5050** in your browser. |---------|---------| | `flask` | Web server | | `skyfield` | Satellite tracking | +| `bleak` | BLE scanning with manufacturer data (TSCM) | --- @@ -199,9 +200,57 @@ https://github.com/flightaware/dump1090 --- +## TSCM Mode Requirements + +TSCM (Technical Surveillance Countermeasures) mode requires specific hardware for full functionality: + +### BLE Scanning (Tracker Detection) +- Any Bluetooth adapter supported by your OS +- `bleak` Python library for manufacturer data detection +- Detects: AirTags, Tile, SmartTags, ESP32/ESP8266 devices + +```bash +# Install bleak +pip install bleak>=0.21.0 + +# Or via apt (Debian/Ubuntu) +sudo apt install python3-bleak +``` + +### RF Spectrum Analysis +- **RTL-SDR dongle** (required for RF sweeps) +- `rtl_power` command from `rtl-sdr` package + +Frequency bands scanned: +| Band | Frequency | Purpose | +|------|-----------|---------| +| FM Broadcast | 88-108 MHz | FM bugs | +| 315 MHz ISM | 315 MHz | US wireless devices | +| 433 MHz ISM | 433-434 MHz | EU wireless devices | +| 868 MHz ISM | 868-869 MHz | EU IoT devices | +| 915 MHz ISM | 902-928 MHz | US IoT devices | +| 1.2 GHz | 1200-1300 MHz | Video transmitters | +| 2.4 GHz ISM | 2400-2500 MHz | WiFi/BT/Video | + +```bash +# Linux +sudo apt install rtl-sdr + +# macOS +brew install librtlsdr +``` + +### WiFi Scanning 
+- Standard WiFi adapter (managed mode for basic scanning) +- Monitor mode capable adapter for advanced features +- `aircrack-ng` suite for monitor mode management + +--- + ## Notes -- **Bluetooth on macOS**: Uses native CoreBluetooth, bluez tools not needed +- **Bluetooth on macOS**: Uses bleak library (CoreBluetooth backend), bluez tools not needed - **WiFi on macOS**: Monitor mode has limited support, full functionality on Linux - **System tools**: `iw`, `iwconfig`, `rfkill`, `ip` are pre-installed on most Linux systems +- **TSCM on macOS**: BLE and WiFi scanning work; RF spectrum requires RTL-SDR diff --git a/requirements.txt b/requirements.txt index 68a3781..bc6fa65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,9 @@ flask>=2.0.0 requests>=2.28.0 +# BLE scanning with manufacturer data detection (optional - for TSCM) +bleak>=0.21.0 + # Satellite tracking (optional - only needed for satellite features) skyfield>=1.45 @@ -14,4 +17,4 @@ pyserial>=3.5 # ruff>=0.1.0 # black>=23.0.0 # mypy>=1.0.0 -flask-sock +flask-sock diff --git a/routes/tscm.py b/routes/tscm.py index e550624..29719f1 100644 --- a/routes/tscm.py +++ b/routes/tscm.py @@ -763,7 +763,12 @@ def _scan_wifi_networks(interface: str) -> list[dict]: def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]: - """Scan for Bluetooth devices using system tools.""" + """ + Scan for Bluetooth devices with manufacturer data detection. + + Uses the BLE scanner module (bleak library) for proper manufacturer ID + detection, with fallback to system tools if bleak is unavailable. 
+ """ import platform import os import re @@ -775,6 +780,47 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]: logger.info(f"Starting Bluetooth scan (duration={duration}s, interface={interface})") + # Try the BLE scanner module first (uses bleak for proper manufacturer detection) + try: + from utils.tscm.ble_scanner import get_ble_scanner, scan_ble_devices + + logger.info("Using BLE scanner module with manufacturer detection") + ble_devices = scan_ble_devices(duration) + + for ble_dev in ble_devices: + mac = ble_dev.get('mac', '').upper() + if mac and mac not in seen_macs: + seen_macs.add(mac) + + device = { + 'mac': mac, + 'name': ble_dev.get('name', 'Unknown'), + 'rssi': ble_dev.get('rssi'), + 'type': 'ble', + 'manufacturer': ble_dev.get('manufacturer_name'), + 'manufacturer_id': ble_dev.get('manufacturer_id'), + 'is_tracker': ble_dev.get('is_tracker', False), + 'tracker_type': ble_dev.get('tracker_type'), + 'is_airtag': ble_dev.get('is_airtag', False), + 'is_tile': ble_dev.get('is_tile', False), + 'is_smarttag': ble_dev.get('is_smarttag', False), + 'is_espressif': ble_dev.get('is_espressif', False), + 'service_uuids': ble_dev.get('service_uuids', []), + } + devices.append(device) + + if devices: + logger.info(f"BLE scanner found {len(devices)} devices") + trackers = [d for d in devices if d.get('is_tracker')] + if trackers: + logger.info(f"Trackers detected: {[d.get('tracker_type') for d in trackers]}") + return devices + + except ImportError: + logger.warning("BLE scanner module not available, using fallback") + except Exception as e: + logger.warning(f"BLE scanner failed: {e}, using fallback") + if platform.system() == 'Darwin': # macOS: Use system_profiler for basic Bluetooth info try: @@ -1820,3 +1866,298 @@ def _generate_assessment(summary: dict) -> str: "BASELINE ENVIRONMENT: No significant anomalies detected. " "Environment appears consistent with expected wireless activity." 
) + + +# ============================================================================= +# Device Identity Endpoints (MAC-Randomization Resistant Detection) +# ============================================================================= + +@tscm_bp.route('/identity/ingest/ble', methods=['POST']) +def ingest_ble_observation(): + """ + Ingest a BLE observation for device identity clustering. + + This endpoint accepts BLE advertisement data and feeds it into the + MAC-randomization resistant device detection engine. + + Expected JSON payload: + { + "timestamp": "2024-01-01T12:00:00", // ISO format or omit for now + "addr": "AA:BB:CC:DD:EE:FF", // BLE address (may be randomized) + "addr_type": "rpa", // public/random_static/rpa/nrpa/unknown + "rssi": -65, // dBm + "tx_power": -10, // dBm (optional) + "adv_type": "ADV_IND", // Advertisement type + "manufacturer_id": 1234, // Company ID (optional) + "manufacturer_data": "0102030405", // Hex string (optional) + "service_uuids": ["uuid1", "uuid2"], // List of UUIDs (optional) + "local_name": "Device Name", // Advertised name (optional) + "appearance": 960, // BLE appearance (optional) + "packet_length": 31 // Total packet length (optional) + } + """ + try: + from utils.tscm.device_identity import ingest_ble_dict + + data = request.get_json() + if not data: + return jsonify({'status': 'error', 'message': 'No data provided'}), 400 + + session = ingest_ble_dict(data) + + return jsonify({ + 'status': 'success', + 'session_id': session.session_id, + 'observation_count': len(session.observations), + }) + + except Exception as e: + logger.error(f"BLE ingestion error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/ingest/wifi', methods=['POST']) +def ingest_wifi_observation(): + """ + Ingest a WiFi observation for device identity clustering. 
+
+    Expected JSON payload:
+    {
+        "timestamp": "2024-01-01T12:00:00",
+        "src_mac": "AA:BB:CC:DD:EE:FF",     // Client MAC (may be randomized)
+        "dst_mac": "11:22:33:44:55:66",     // Destination MAC
+        "bssid": "11:22:33:44:55:66",       // AP BSSID
+        "ssid": "NetworkName",              // SSID if available
+        "frame_type": "probe_request",      // Frame type
+        "rssi": -70,                        // dBm
+        "channel": 6,                       // WiFi channel
+        "ht_capable": true,                 // 802.11n capable
+        "vht_capable": true,                // 802.11ac capable
+        "he_capable": false,                // 802.11ax capable
+        "supported_rates": [1, 2, 5.5, 11], // Supported rates
+        "vendor_ies": [["001122", 10]],     // [(OUI, length), ...]
+        "probed_ssids": ["ssid1", "ssid2"]  // For probe requests
+    }
+    """
+    try:
+        from utils.tscm.device_identity import ingest_wifi_dict
+
+        data = request.get_json()
+        if not data:
+            return jsonify({'status': 'error', 'message': 'No data provided'}), 400
+
+        session = ingest_wifi_dict(data)
+
+        return jsonify({
+            'status': 'success',
+            'session_id': session.session_id,
+            'observation_count': len(session.observations),
+        })
+
+    except Exception as e:
+        logger.error(f"WiFi ingestion error: {e}")
+        return jsonify({'status': 'error', 'message': str(e)}), 500
+
+
+@tscm_bp.route('/identity/ingest/batch', methods=['POST'])
+def ingest_batch_observations():
+    """
+    Ingest multiple observations in a single request.
+
+    Expected JSON payload:
+    {
+        "ble": [<ble observation dict>, ...],
+        "wifi": [<wifi observation dict>, ...]
+ } + """ + try: + from utils.tscm.device_identity import ingest_ble_dict, ingest_wifi_dict + + data = request.get_json() + if not data: + return jsonify({'status': 'error', 'message': 'No data provided'}), 400 + + ble_count = 0 + wifi_count = 0 + + for ble_obs in data.get('ble', []): + ingest_ble_dict(ble_obs) + ble_count += 1 + + for wifi_obs in data.get('wifi', []): + ingest_wifi_dict(wifi_obs) + wifi_count += 1 + + return jsonify({ + 'status': 'success', + 'ble_ingested': ble_count, + 'wifi_ingested': wifi_count, + }) + + except Exception as e: + logger.error(f"Batch ingestion error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/clusters') +def get_device_clusters(): + """ + Get all device clusters (probable physical device identities). + + Query parameters: + - min_confidence: Minimum cluster confidence (0-1, default 0) + - protocol: Filter by protocol ('ble' or 'wifi') + - risk_level: Filter by risk level ('high', 'medium', 'low', 'informational') + """ + try: + from utils.tscm.device_identity import get_identity_engine + + engine = get_identity_engine() + min_conf = request.args.get('min_confidence', 0, type=float) + protocol = request.args.get('protocol') + risk_filter = request.args.get('risk_level') + + clusters = engine.get_clusters(min_confidence=min_conf) + + if protocol: + clusters = [c for c in clusters if c.protocol == protocol] + + if risk_filter: + clusters = [c for c in clusters if c.risk_level.value == risk_filter] + + return jsonify({ + 'status': 'success', + 'count': len(clusters), + 'clusters': [c.to_dict() for c in clusters], + 'disclaimer': ( + "Clusters represent PROBABLE device identities based on passive " + "fingerprinting. Results are statistical correlations, not " + "confirmed matches. False positives/negatives are expected." 
+ ) + }) + + except Exception as e: + logger.error(f"Get clusters error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/clusters/high-risk') +def get_high_risk_clusters(): + """Get device clusters with HIGH risk level.""" + try: + from utils.tscm.device_identity import get_identity_engine + + engine = get_identity_engine() + clusters = engine.get_high_risk_clusters() + + return jsonify({ + 'status': 'success', + 'count': len(clusters), + 'clusters': [c.to_dict() for c in clusters], + 'disclaimer': ( + "High-risk classification indicates multiple behavioral indicators " + "consistent with potential surveillance devices. This does NOT " + "confirm surveillance activity. Professional verification required." + ) + }) + + except Exception as e: + logger.error(f"Get high-risk clusters error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/summary') +def get_identity_summary(): + """ + Get summary of device identity analysis. + + Returns statistics, cluster counts by risk level, and monitoring period. + """ + try: + from utils.tscm.device_identity import get_identity_engine + + engine = get_identity_engine() + summary = engine.get_summary() + + return jsonify({ + 'status': 'success', + 'summary': summary + }) + + except Exception as e: + logger.error(f"Get identity summary error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/finalize', methods=['POST']) +def finalize_identity_sessions(): + """ + Finalize all active sessions and complete clustering. + + Call this at the end of a monitoring period to ensure all observations + are properly clustered and assessed. 
+ """ + try: + from utils.tscm.device_identity import get_identity_engine + + engine = get_identity_engine() + engine.finalize_all_sessions() + summary = engine.get_summary() + + return jsonify({ + 'status': 'success', + 'message': 'All sessions finalized', + 'summary': summary + }) + + except Exception as e: + logger.error(f"Finalize sessions error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/reset', methods=['POST']) +def reset_identity_engine(): + """ + Reset the device identity engine. + + Clears all sessions, clusters, and monitoring state. + """ + try: + from utils.tscm.device_identity import reset_identity_engine as reset_engine + + reset_engine() + + return jsonify({ + 'status': 'success', + 'message': 'Device identity engine reset' + }) + + except Exception as e: + logger.error(f"Reset identity engine error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@tscm_bp.route('/identity/cluster/') +def get_cluster_detail(cluster_id: str): + """Get detailed information for a specific cluster.""" + try: + from utils.tscm.device_identity import get_identity_engine + + engine = get_identity_engine() + + if cluster_id not in engine.clusters: + return jsonify({ + 'status': 'error', + 'message': 'Cluster not found' + }), 404 + + cluster = engine.clusters[cluster_id] + + return jsonify({ + 'status': 'success', + 'cluster': cluster.to_dict() + }) + + except Exception as e: + logger.error(f"Get cluster detail error: {e}") + return jsonify({'status': 'error', 'message': str(e)}), 500 diff --git a/setup.sh b/setup.sh index 66a0f65..fbbe6d6 100755 --- a/setup.sh +++ b/setup.sh @@ -348,6 +348,7 @@ install_macos_packages() { brew_install gpsd warn "macOS note: hcitool/hciconfig are Linux (BlueZ) utilities and often unavailable on macOS." + info "TSCM BLE scanning uses bleak library (installed via pip) for manufacturer data detection." 
echo } @@ -539,6 +540,8 @@ install_debian_packages() { # Install Python packages via apt (more reliable than pip on modern Debian/Ubuntu) $SUDO apt-get install -y python3-flask python3-requests python3-serial >/dev/null 2>&1 || true $SUDO apt-get install -y python3-skyfield >/dev/null 2>&1 || true + # bleak for BLE scanning with manufacturer data (TSCM mode) + $SUDO apt-get install -y python3-bleak >/dev/null 2>&1 || true progress "Installing dump1090" if ! cmd_exists dump1090 && ! cmd_exists dump1090-mutability; then diff --git a/utils/tscm/__init__.py b/utils/tscm/__init__.py index f67f9a4..062d447 100644 --- a/utils/tscm/__init__.py +++ b/utils/tscm/__init__.py @@ -1,10 +1,11 @@ """ TSCM (Technical Surveillance Countermeasures) Utilities Package -Provides baseline recording, threat detection, and analysis tools +Provides baseline recording, threat detection, correlation analysis, +BLE scanning, and MAC-randomization resistant device identity tools for counter-surveillance operations. """ from __future__ import annotations -__all__ = ['detector', 'baseline'] +__all__ = ['detector', 'baseline', 'correlation', 'ble_scanner', 'device_identity'] diff --git a/utils/tscm/ble_scanner.py b/utils/tscm/ble_scanner.py new file mode 100644 index 0000000..81bfc05 --- /dev/null +++ b/utils/tscm/ble_scanner.py @@ -0,0 +1,476 @@ +""" +BLE Scanner for TSCM + +Cross-platform BLE scanning with manufacturer data detection. +Supports macOS and Linux using the bleak library with fallback to system tools. 
+ +Detects: +- Apple AirTags (company ID 0x004C) +- Tile trackers +- Samsung SmartTags +- ESP32/ESP8266 devices (Espressif, company ID 0x02E5) +- Generic BLE devices with suspicious characteristics +""" + +import asyncio +import logging +import platform +import re +import subprocess +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional + +logger = logging.getLogger('intercept.tscm.ble') + +# Manufacturer company IDs (Bluetooth SIG assigned) +COMPANY_IDS = { + 0x004C: 'Apple', + 0x02E5: 'Espressif', + 0x0059: 'Nordic Semiconductor', + 0x000D: 'Texas Instruments', + 0x0075: 'Samsung', + 0x00E0: 'Google', + 0x0006: 'Microsoft', + 0x01DA: 'Tile', +} + +# Known tracker signatures +TRACKER_SIGNATURES = { + # Apple AirTag detection patterns + 'airtag': { + 'company_id': 0x004C, + 'data_patterns': [ + b'\x12\x19', # AirTag/Find My advertisement prefix + b'\x07\x19', # Offline Finding + ], + 'name_patterns': ['airtag', 'findmy', 'find my'], + }, + # Tile tracker + 'tile': { + 'company_id': 0x01DA, + 'name_patterns': ['tile'], + }, + # Samsung SmartTag + 'smarttag': { + 'company_id': 0x0075, + 'name_patterns': ['smarttag', 'smart tag', 'galaxy smart'], + }, + # ESP32/ESP8266 + 'espressif': { + 'company_id': 0x02E5, + 'name_patterns': ['esp32', 'esp8266', 'espressif'], + }, +} + + +@dataclass +class BLEDevice: + """Represents a detected BLE device with full advertisement data.""" + mac: str + name: Optional[str] = None + rssi: Optional[int] = None + manufacturer_id: Optional[int] = None + manufacturer_name: Optional[str] = None + manufacturer_data: bytes = field(default_factory=bytes) + service_uuids: list = field(default_factory=list) + tx_power: Optional[int] = None + is_connectable: bool = True + + # Detection flags + is_airtag: bool = False + is_tile: bool = False + is_smarttag: bool = False + is_espressif: bool = False + is_tracker: bool = False + tracker_type: Optional[str] = None + + first_seen: datetime = 
field(default_factory=datetime.now) + last_seen: datetime = field(default_factory=datetime.now) + detection_count: int = 1 + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + 'mac': self.mac, + 'name': self.name or 'Unknown', + 'rssi': self.rssi, + 'manufacturer_id': self.manufacturer_id, + 'manufacturer_name': self.manufacturer_name, + 'service_uuids': self.service_uuids, + 'tx_power': self.tx_power, + 'is_connectable': self.is_connectable, + 'is_airtag': self.is_airtag, + 'is_tile': self.is_tile, + 'is_smarttag': self.is_smarttag, + 'is_espressif': self.is_espressif, + 'is_tracker': self.is_tracker, + 'tracker_type': self.tracker_type, + 'detection_count': self.detection_count, + 'type': 'ble', + } + + +class BLEScanner: + """ + Cross-platform BLE scanner with manufacturer data detection. + + Uses bleak library for proper BLE scanning, with fallback to + system tools (hcitool/btmgmt on Linux, system_profiler on macOS). + """ + + def __init__(self): + self.devices: dict[str, BLEDevice] = {} + self._bleak_available = self._check_bleak() + self._scanning = False + + def _check_bleak(self) -> bool: + """Check if bleak library is available.""" + try: + import bleak + return True + except ImportError: + logger.warning("bleak library not available - using fallback scanning") + return False + + async def scan_async(self, duration: int = 10) -> list[BLEDevice]: + """ + Perform async BLE scan using bleak. 
+ + Args: + duration: Scan duration in seconds + + Returns: + List of detected BLE devices + """ + if not self._bleak_available: + # Use synchronous fallback + return self._scan_fallback(duration) + + try: + from bleak import BleakScanner + from bleak.backends.device import BLEDevice as BleakDevice + from bleak.backends.scanner import AdvertisementData + + detected = {} + + def detection_callback(device: BleakDevice, adv_data: AdvertisementData): + """Callback for each detected device.""" + mac = device.address.upper() + + if mac in detected: + # Update existing device + detected[mac].rssi = adv_data.rssi + detected[mac].last_seen = datetime.now() + detected[mac].detection_count += 1 + else: + # Create new device entry + ble_device = BLEDevice( + mac=mac, + name=adv_data.local_name or device.name, + rssi=adv_data.rssi, + service_uuids=list(adv_data.service_uuids) if adv_data.service_uuids else [], + tx_power=adv_data.tx_power, + ) + + # Parse manufacturer data + if adv_data.manufacturer_data: + for company_id, data in adv_data.manufacturer_data.items(): + ble_device.manufacturer_id = company_id + ble_device.manufacturer_name = COMPANY_IDS.get(company_id, f'Unknown ({hex(company_id)})') + ble_device.manufacturer_data = bytes(data) + + # Check for known trackers + self._identify_tracker(ble_device, company_id, data) + + # Also check name patterns + self._check_name_patterns(ble_device) + + detected[mac] = ble_device + + logger.info(f"Starting BLE scan with bleak (duration={duration}s)") + + scanner = BleakScanner(detection_callback=detection_callback) + await scanner.start() + await asyncio.sleep(duration) + await scanner.stop() + + # Update internal device list + for mac, device in detected.items(): + if mac in self.devices: + self.devices[mac].rssi = device.rssi + self.devices[mac].last_seen = device.last_seen + self.devices[mac].detection_count += 1 + else: + self.devices[mac] = device + + logger.info(f"BLE scan complete: {len(detected)} devices found") + return 
list(detected.values()) + + except Exception as e: + logger.error(f"Bleak scan failed: {e}") + return self._scan_fallback(duration) + + def scan(self, duration: int = 10) -> list[BLEDevice]: + """ + Synchronous wrapper for BLE scanning. + + Args: + duration: Scan duration in seconds + + Returns: + List of detected BLE devices + """ + if self._bleak_available: + try: + # Try to get existing event loop + try: + loop = asyncio.get_running_loop() + # We're in an async context, can't use run() + future = asyncio.ensure_future(self.scan_async(duration)) + return asyncio.get_event_loop().run_until_complete(future) + except RuntimeError: + # No running loop, create one + return asyncio.run(self.scan_async(duration)) + except Exception as e: + logger.error(f"Async scan failed: {e}") + return self._scan_fallback(duration) + else: + return self._scan_fallback(duration) + + def _identify_tracker(self, device: BLEDevice, company_id: int, data: bytes): + """Identify if device is a known tracker type.""" + + # Apple AirTag detection + if company_id == 0x004C: # Apple + # Check for Find My / AirTag advertisement patterns + if len(data) >= 2: + # AirTag advertisements have specific byte patterns + if data[0] == 0x12 and data[1] == 0x19: + device.is_airtag = True + device.is_tracker = True + device.tracker_type = 'AirTag' + logger.info(f"AirTag detected: {device.mac}") + elif data[0] == 0x07: # Offline Finding + device.is_airtag = True + device.is_tracker = True + device.tracker_type = 'AirTag (Offline)' + logger.info(f"AirTag (offline mode) detected: {device.mac}") + + # Tile tracker + elif company_id == 0x01DA: # Tile + device.is_tile = True + device.is_tracker = True + device.tracker_type = 'Tile' + logger.info(f"Tile tracker detected: {device.mac}") + + # Samsung SmartTag + elif company_id == 0x0075: # Samsung + # Check if it's specifically a SmartTag + device.is_smarttag = True + device.is_tracker = True + device.tracker_type = 'SmartTag' + logger.info(f"Samsung SmartTag 
detected: {device.mac}") + + # Espressif (ESP32/ESP8266) + elif company_id == 0x02E5: # Espressif + device.is_espressif = True + device.tracker_type = 'ESP32/ESP8266' + logger.info(f"ESP32/ESP8266 device detected: {device.mac}") + + def _check_name_patterns(self, device: BLEDevice): + """Check device name for tracker patterns.""" + if not device.name: + return + + name_lower = device.name.lower() + + # Check each tracker type + for tracker_type, sig in TRACKER_SIGNATURES.items(): + patterns = sig.get('name_patterns', []) + for pattern in patterns: + if pattern in name_lower: + if tracker_type == 'airtag': + device.is_airtag = True + device.is_tracker = True + device.tracker_type = 'AirTag' + elif tracker_type == 'tile': + device.is_tile = True + device.is_tracker = True + device.tracker_type = 'Tile' + elif tracker_type == 'smarttag': + device.is_smarttag = True + device.is_tracker = True + device.tracker_type = 'SmartTag' + elif tracker_type == 'espressif': + device.is_espressif = True + device.tracker_type = 'ESP32/ESP8266' + + logger.info(f"Tracker identified by name: {device.name} -> {tracker_type}") + return + + def _scan_fallback(self, duration: int = 10) -> list[BLEDevice]: + """ + Fallback scanning using system tools when bleak is unavailable. + Works on both macOS and Linux. 
+ """ + system = platform.system() + + if system == 'Darwin': + return self._scan_macos(duration) + else: + return self._scan_linux(duration) + + def _scan_macos(self, duration: int = 10) -> list[BLEDevice]: + """Fallback BLE scanning on macOS using system_profiler.""" + devices = [] + + try: + import json + result = subprocess.run( + ['system_profiler', 'SPBluetoothDataType', '-json'], + capture_output=True, text=True, timeout=15 + ) + data = json.loads(result.stdout) + bt_data = data.get('SPBluetoothDataType', [{}])[0] + + # Get connected/paired devices + for section in ['device_connected', 'device_title']: + section_data = bt_data.get(section, {}) + if isinstance(section_data, dict): + for name, info in section_data.items(): + if isinstance(info, dict): + mac = info.get('device_address', '').upper() + if mac: + device = BLEDevice( + mac=mac, + name=name, + ) + # Check name patterns + self._check_name_patterns(device) + devices.append(device) + + logger.info(f"macOS fallback scan found {len(devices)} devices") + except Exception as e: + logger.error(f"macOS fallback scan failed: {e}") + + return devices + + def _scan_linux(self, duration: int = 10) -> list[BLEDevice]: + """Fallback BLE scanning on Linux using bluetoothctl/btmgmt.""" + import shutil + + devices = [] + seen_macs = set() + + # Method 1: Try btmgmt for BLE devices + if shutil.which('btmgmt'): + try: + logger.info("Trying btmgmt find...") + result = subprocess.run( + ['btmgmt', 'find'], + capture_output=True, text=True, timeout=duration + 5 + ) + + for line in result.stdout.split('\n'): + if 'dev_found' in line.lower() or ('type' in line.lower() and ':' in line): + mac_match = re.search( + r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' + r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})', + line + ) + if mac_match: + mac = mac_match.group(1).upper() + if mac not in seen_macs: + seen_macs.add(mac) + name_match = re.search(r'name\s+(.+?)(?:\s|$)', line, re.I) + name = name_match.group(1) if name_match 
else None + + device = BLEDevice(mac=mac, name=name) + self._check_name_patterns(device) + devices.append(device) + + logger.info(f"btmgmt found {len(devices)} devices") + except Exception as e: + logger.warning(f"btmgmt failed: {e}") + + # Method 2: Try hcitool lescan + if not devices and shutil.which('hcitool'): + try: + logger.info("Trying hcitool lescan...") + # Start lescan in background + process = subprocess.Popen( + ['hcitool', 'lescan', '--duplicates'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + import time + time.sleep(duration) + process.terminate() + + stdout, _ = process.communicate(timeout=2) + + for line in stdout.split('\n'): + mac_match = re.search( + r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:' + r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})', + line + ) + if mac_match: + mac = mac_match.group(1).upper() + if mac not in seen_macs: + seen_macs.add(mac) + # Extract name (comes after MAC) + parts = line.strip().split() + name = ' '.join(parts[1:]) if len(parts) > 1 else None + + device = BLEDevice(mac=mac, name=name if name != '(unknown)' else None) + self._check_name_patterns(device) + devices.append(device) + + logger.info(f"hcitool lescan found {len(devices)} devices") + except Exception as e: + logger.warning(f"hcitool lescan failed: {e}") + + return devices + + def get_trackers(self) -> list[BLEDevice]: + """Get all detected tracker devices.""" + return [d for d in self.devices.values() if d.is_tracker] + + def get_espressif_devices(self) -> list[BLEDevice]: + """Get all detected ESP32/ESP8266 devices.""" + return [d for d in self.devices.values() if d.is_espressif] + + def clear(self): + """Clear all detected devices.""" + self.devices.clear() + + +# Singleton instance +_scanner: Optional[BLEScanner] = None + + +def get_ble_scanner() -> BLEScanner: + """Get the global BLE scanner instance.""" + global _scanner + if _scanner is None: + _scanner = BLEScanner() + return _scanner + + +def 
scan_ble_devices(duration: int = 10) -> list[dict]: + """ + Convenience function to scan for BLE devices. + + Args: + duration: Scan duration in seconds + + Returns: + List of device dictionaries + """ + scanner = get_ble_scanner() + devices = scanner.scan(duration) + return [d.to_dict() for d in devices] diff --git a/utils/tscm/correlation.py b/utils/tscm/correlation.py index ce09007..59fba1d 100644 --- a/utils/tscm/correlation.py +++ b/utils/tscm/correlation.py @@ -447,8 +447,62 @@ class CorrelationEngine: mac_prefix = mac[:8] if len(mac) >= 8 else '' tracker_detected = False - # Check for Apple AirTag - if mac_prefix in TRACKER_SIGNATURES.get('airtag_oui', []): + # Check for tracker flags from BLE scanner (manufacturer ID detection) + if device.get('is_airtag'): + profile.add_indicator( + IndicatorType.AIRTAG_DETECTED, + 'Apple AirTag detected via manufacturer data', + {'mac': mac, 'tracker_type': 'AirTag'} + ) + profile.device_type = device.get('tracker_type', 'AirTag') + tracker_detected = True + + if device.get('is_tile'): + profile.add_indicator( + IndicatorType.TILE_DETECTED, + 'Tile tracker detected via manufacturer data', + {'mac': mac, 'tracker_type': 'Tile'} + ) + profile.device_type = 'Tile Tracker' + tracker_detected = True + + if device.get('is_smarttag'): + profile.add_indicator( + IndicatorType.SMARTTAG_DETECTED, + 'Samsung SmartTag detected via manufacturer data', + {'mac': mac, 'tracker_type': 'SmartTag'} + ) + profile.device_type = 'Samsung SmartTag' + tracker_detected = True + + if device.get('is_espressif'): + profile.add_indicator( + IndicatorType.ESP32_DEVICE, + 'ESP32/ESP8266 detected via Espressif manufacturer ID', + {'mac': mac, 'chipset': 'Espressif'} + ) + profile.manufacturer = 'Espressif' + profile.device_type = device.get('tracker_type', 'ESP32/ESP8266') + tracker_detected = True + + # Check manufacturer_id directly + mfg_id = device.get('manufacturer_id') + if mfg_id: + if mfg_id == 0x004C and not device.get('is_airtag'): + # Apple 
device - could be AirTag + profile.manufacturer = 'Apple' + elif mfg_id == 0x02E5 and not device.get('is_espressif'): + # Espressif device + profile.add_indicator( + IndicatorType.ESP32_DEVICE, + 'ESP32/ESP8266 detected via manufacturer ID', + {'mac': mac, 'manufacturer_id': mfg_id} + ) + profile.manufacturer = 'Espressif' + tracker_detected = True + + # Fallback: Check for Apple AirTag by OUI + if not tracker_detected and mac_prefix in TRACKER_SIGNATURES.get('airtag_oui', []): profile.add_indicator( IndicatorType.AIRTAG_DETECTED, 'Apple AirTag detected - potential tracking device', diff --git a/utils/tscm/device_identity.py b/utils/tscm/device_identity.py new file mode 100644 index 0000000..9b0fe66 --- /dev/null +++ b/utils/tscm/device_identity.py @@ -0,0 +1,1219 @@ +""" +Randomized MAC Resistant Device Detection + +Clusters BLE and WiFi observations into "probable same physical device" +identities using passive fingerprinting techniques. Does NOT attempt to +de-randomize MACs cryptographically or bypass privacy protections. + +This is passive screening + correlation only for TSCM purposes. + +LIMITATIONS AND DISCLAIMERS: +- Clustering confidence scores indicate statistical similarity, not certainty +- False positives and false negatives are expected +- Results should be treated as indicators requiring professional verification +- No attribution claims about specific device models or manufacturers +- Cannot detect devices that don't transmit or use advanced evasion + +Key Techniques Used: +1. Advertisement payload fingerprinting (manufacturer data, service UUIDs) +2. Timing correlation (appearance/disappearance patterns, ad intervals) +3. RSSI trajectory analysis (physical proximity/movement patterns) +4. Capability fingerprinting (WiFi HT/VHT/HE, rates, vendor IEs) +5. 
Behavioral pattern matching (frame types, payload structure) +""" + +from __future__ import annotations + +import hashlib +import logging +import math +import statistics +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from enum import Enum +from typing import Optional + +logger = logging.getLogger('intercept.tscm.device_identity') + + +# ============================================================================= +# Constants and Configuration +# ============================================================================= + +# Session gap thresholds (seconds) +BLE_SESSION_GAP = 60 # New session if no observations for 60s +WIFI_SESSION_GAP = 120 # WiFi clients may probe less frequently + +# Clustering thresholds +MIN_CLUSTER_CONFIDENCE = 0.3 # Minimum confidence to consider clustering +HIGH_CONFIDENCE_THRESHOLD = 0.7 +VERY_HIGH_CONFIDENCE_THRESHOLD = 0.85 + +# RSSI proximity threshold for "same location" assessment +RSSI_PROXIMITY_THRESHOLD = 10 # dBm difference + +# Time window for temporal correlation +TEMPORAL_CORRELATION_WINDOW = timedelta(seconds=5) + +# Fingerprint weights (sum to 1.0 for normalization) +FINGERPRINT_WEIGHTS = { + 'manufacturer_data': 0.25, + 'service_uuids': 0.20, + 'capabilities': 0.15, + 'payload_structure': 0.15, + 'timing_pattern': 0.10, + 'rssi_trajectory': 0.10, + 'name_similarity': 0.05, +} + + +class AddressType(Enum): + """BLE address types per Bluetooth spec.""" + PUBLIC = 'public' + RANDOM_STATIC = 'random_static' + RPA = 'rpa' # Resolvable Private Address + NRPA = 'nrpa' # Non-Resolvable Private Address + UNKNOWN = 'unknown' + + +class AdvType(Enum): + """BLE advertisement types.""" + ADV_IND = 'ADV_IND' + ADV_DIRECT_IND = 'ADV_DIRECT_IND' + ADV_NONCONN_IND = 'ADV_NONCONN_IND' + ADV_SCAN_IND = 'ADV_SCAN_IND' + SCAN_RSP = 'SCAN_RSP' + UNKNOWN = 'unknown' + + +class WifiFrameType(Enum): + """WiFi frame types of interest.""" + BEACON = 'beacon' + PROBE_REQUEST 
= 'probe_request' + PROBE_RESPONSE = 'probe_response' + AUTH = 'auth' + ASSOC_REQUEST = 'assoc_request' + ASSOC_RESPONSE = 'assoc_response' + DEAUTH = 'deauth' + DISASSOC = 'disassoc' + DATA = 'data' + UNKNOWN = 'unknown' + + +class RiskLevel(Enum): + """TSCM risk levels for device clusters.""" + INFORMATIONAL = 'informational' + LOW = 'low' + MEDIUM = 'medium' + HIGH = 'high' + + +# ============================================================================= +# Observation Data Classes +# ============================================================================= + +@dataclass +class BLEObservation: + """Single BLE advertisement observation.""" + timestamp: datetime + addr: str # MAC-like address + addr_type: AddressType = AddressType.UNKNOWN + rssi: Optional[int] = None + tx_power: Optional[int] = None + adv_type: AdvType = AdvType.UNKNOWN + adv_flags: Optional[int] = None + manufacturer_id: Optional[int] = None + manufacturer_data: Optional[bytes] = None + service_uuids: list[str] = field(default_factory=list) + service_data: Optional[bytes] = None + local_name: Optional[str] = None + appearance: Optional[int] = None + packet_length: Optional[int] = None + phy: Optional[str] = None + + def __post_init__(self): + if isinstance(self.addr_type, str): + try: + self.addr_type = AddressType(self.addr_type) + except ValueError: + self.addr_type = AddressType.UNKNOWN + if isinstance(self.adv_type, str): + try: + self.adv_type = AdvType(self.adv_type) + except ValueError: + self.adv_type = AdvType.UNKNOWN + + def compute_fingerprint_hash(self) -> str: + """ + Compute a fingerprint hash based on stable (non-MAC) features. + + This hash helps identify similar payloads across different MACs. 
+ """ + components = [] + + if self.manufacturer_id is not None: + components.append(f"mfg:{self.manufacturer_id:04x}") + + if self.manufacturer_data: + # Use first 8 bytes of manufacturer data (often contains device type) + data_prefix = self.manufacturer_data[:8].hex() + components.append(f"mfg_data:{data_prefix}") + + if self.service_uuids: + # Sort for consistency + uuids = sorted(set(self.service_uuids)) + components.append(f"uuids:{','.join(uuids)}") + + if self.adv_flags is not None: + components.append(f"flags:{self.adv_flags:02x}") + + if self.appearance is not None: + components.append(f"appear:{self.appearance:04x}") + + if self.tx_power is not None: + components.append(f"txp:{self.tx_power}") + + if self.packet_length is not None: + components.append(f"plen:{self.packet_length}") + + if not components: + return "" + + fingerprint_str = "|".join(components) + return hashlib.sha256(fingerprint_str.encode()).hexdigest()[:16] + + def is_randomized_address(self) -> bool: + """Check if the address appears to be randomized.""" + if self.addr_type in (AddressType.RPA, AddressType.NRPA): + return True + + # Check MAC address format for random bit + # Bit 1 of first octet set = locally administered (random) + try: + first_octet = int(self.addr.split(':')[0], 16) + return bool(first_octet & 0x02) + except (ValueError, IndexError): + return False + + +@dataclass +class WifiObservation: + """Single WiFi frame observation.""" + timestamp: datetime + src_mac: str + dst_mac: Optional[str] = None + bssid: Optional[str] = None + ssid: Optional[str] = None + frame_type: WifiFrameType = WifiFrameType.UNKNOWN + rssi: Optional[int] = None + channel: Optional[int] = None + bandwidth: Optional[int] = None # 20/40/80/160 + encryption: Optional[str] = None + beacon_interval: Optional[int] = None + capabilities: Optional[int] = None + supported_rates: list[float] = field(default_factory=list) + extended_rates: list[float] = field(default_factory=list) + ht_capable: bool = False + 
vht_capable: bool = False + he_capable: bool = False + ht_capabilities: Optional[int] = None + vht_capabilities: Optional[int] = None + vendor_ies: list[tuple[str, int]] = field(default_factory=list) # (OUI, length) + wps_present: bool = False + sequence_number: Optional[int] = None + probed_ssids: list[str] = field(default_factory=list) + + def __post_init__(self): + if isinstance(self.frame_type, str): + try: + self.frame_type = WifiFrameType(self.frame_type) + except ValueError: + self.frame_type = WifiFrameType.UNKNOWN + + def compute_fingerprint_hash(self) -> str: + """ + Compute a fingerprint hash based on stable capability features. + + For clients, this captures the "device type" signature. + """ + components = [] + + # Rate set fingerprint + all_rates = sorted(set(self.supported_rates + self.extended_rates)) + if all_rates: + components.append(f"rates:{','.join(str(r) for r in all_rates)}") + + # Capability fingerprint + caps = [] + if self.ht_capable: + caps.append('HT') + if self.vht_capable: + caps.append('VHT') + if self.he_capable: + caps.append('HE') + if caps: + components.append(f"caps:{'+'.join(caps)}") + + if self.ht_capabilities is not None: + components.append(f"htcap:{self.ht_capabilities:04x}") + + if self.vht_capabilities is not None: + components.append(f"vhtcap:{self.vht_capabilities:08x}") + + # Vendor IE fingerprint (OUIs only, not content) + if self.vendor_ies: + ouis = sorted(set(oui for oui, _ in self.vendor_ies)) + components.append(f"vie:{','.join(ouis)}") + + if self.capabilities is not None: + components.append(f"cap:{self.capabilities:04x}") + + if not components: + return "" + + fingerprint_str = "|".join(components) + return hashlib.sha256(fingerprint_str.encode()).hexdigest()[:16] + + def is_randomized_address(self) -> bool: + """Check if source MAC appears to be randomized.""" + try: + first_octet = int(self.src_mac.split(':')[0], 16) + return bool(first_octet & 0x02) + except (ValueError, IndexError): + return False + + +# 
============================================================================= +# Session and Cluster Data Classes +# ============================================================================= + +@dataclass +class DeviceSession: + """ + A session represents a contiguous presence window of a device. + + Multiple observations from the same MAC (or clustered identity) within + the session gap threshold belong to the same session. + """ + session_id: str + protocol: str # 'ble' or 'wifi' + first_seen: datetime + last_seen: datetime + observations: list = field(default_factory=list) + primary_mac: Optional[str] = None + observed_macs: set[str] = field(default_factory=set) + fingerprint_hashes: set[str] = field(default_factory=set) + + # Aggregated metrics + rssi_samples: list[int] = field(default_factory=list) + observation_intervals: list[float] = field(default_factory=list) + + def add_observation(self, obs) -> None: + """Add an observation to this session.""" + self.observations.append(obs) + self.last_seen = obs.timestamp + + if hasattr(obs, 'addr'): + self.observed_macs.add(obs.addr) + if self.primary_mac is None: + self.primary_mac = obs.addr + elif hasattr(obs, 'src_mac'): + self.observed_macs.add(obs.src_mac) + if self.primary_mac is None: + self.primary_mac = obs.src_mac + + fp = obs.compute_fingerprint_hash() + if fp: + self.fingerprint_hashes.add(fp) + + if obs.rssi is not None: + self.rssi_samples.append(obs.rssi) + + # Calculate interval from previous observation + if len(self.observations) > 1: + prev = self.observations[-2] + interval = (obs.timestamp - prev.timestamp).total_seconds() + if interval > 0: + self.observation_intervals.append(interval) + + def get_duration(self) -> timedelta: + """Get session duration.""" + return self.last_seen - self.first_seen + + def get_mean_rssi(self) -> Optional[float]: + """Get mean RSSI across session.""" + if not self.rssi_samples: + return None + return statistics.mean(self.rssi_samples) + + def 
get_rssi_stability(self) -> float: + """ + Calculate RSSI stability (0-1, higher = more stable). + + Stable RSSI suggests a stationary device. + """ + if len(self.rssi_samples) < 3: + return 0.0 + try: + stdev = statistics.stdev(self.rssi_samples) + # Convert to 0-1 scale (stdev of 0 = 1.0, stdev of 20+ = ~0) + return max(0, 1 - (stdev / 20)) + except statistics.StatisticsError: + return 0.0 + + def get_mean_interval(self) -> Optional[float]: + """Get mean advertising/probing interval.""" + if not self.observation_intervals: + return None + return statistics.mean(self.observation_intervals) + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return { + 'session_id': self.session_id, + 'protocol': self.protocol, + 'first_seen': self.first_seen.isoformat(), + 'last_seen': self.last_seen.isoformat(), + 'duration_seconds': self.get_duration().total_seconds(), + 'observation_count': len(self.observations), + 'primary_mac': self.primary_mac, + 'observed_macs': list(self.observed_macs), + 'fingerprint_hashes': list(self.fingerprint_hashes), + 'mean_rssi': self.get_mean_rssi(), + 'rssi_stability': self.get_rssi_stability(), + 'mean_interval': self.get_mean_interval(), + } + + +@dataclass +class RiskIndicator: + """A TSCM risk indicator for a device cluster.""" + indicator_type: str + description: str + score: int # 0-10 + evidence: dict = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> dict: + return { + 'type': self.indicator_type, + 'description': self.description, + 'score': self.score, + 'evidence': self.evidence, + 'timestamp': self.timestamp.isoformat(), + } + + +@dataclass +class DeviceCluster: + """ + A cluster represents a probable physical device identity. + + Multiple sessions and MACs may be linked to the same cluster based + on fingerprint similarity, temporal correlation, and RSSI patterns. 
+ """ + cluster_id: str + protocol: str + created_at: datetime = field(default_factory=datetime.now) + updated_at: datetime = field(default_factory=datetime.now) + + sessions: list[DeviceSession] = field(default_factory=list) + linked_macs: set[str] = field(default_factory=set) + fingerprint_hashes: set[str] = field(default_factory=set) + + # Cluster confidence and linking evidence + confidence: float = 0.0 + link_evidence: list[dict] = field(default_factory=list) + + # Best available identifiers + best_name: Optional[str] = None + manufacturer_id: Optional[int] = None + manufacturer_name: Optional[str] = None + device_type: Optional[str] = None + + # TSCM risk assessment + risk_level: RiskLevel = RiskLevel.INFORMATIONAL + risk_score: int = 0 + risk_indicators: list[RiskIndicator] = field(default_factory=list) + + # Behavioral profile + total_observations: int = 0 + first_seen: Optional[datetime] = None + last_seen: Optional[datetime] = None + presence_ratio: float = 0.0 # % of monitoring period device was present + + def add_session(self, session: DeviceSession, link_reason: str, + link_confidence: float) -> None: + """Add a session to this cluster with linking evidence.""" + self.sessions.append(session) + self.linked_macs.update(session.observed_macs) + self.fingerprint_hashes.update(session.fingerprint_hashes) + self.total_observations += len(session.observations) + self.updated_at = datetime.now() + + if self.first_seen is None or session.first_seen < self.first_seen: + self.first_seen = session.first_seen + if self.last_seen is None or session.last_seen > self.last_seen: + self.last_seen = session.last_seen + + self.link_evidence.append({ + 'session_id': session.session_id, + 'reason': link_reason, + 'confidence': link_confidence, + 'timestamp': datetime.now().isoformat(), + }) + + # Update overall confidence (weighted average) + if self.link_evidence: + self.confidence = statistics.mean( + e['confidence'] for e in self.link_evidence + ) + + def 
add_risk_indicator(self, indicator: RiskIndicator) -> None: + """Add a risk indicator and update risk assessment.""" + self.risk_indicators.append(indicator) + self.risk_score = sum(i.score for i in self.risk_indicators) + + # Update risk level based on score + if self.risk_score >= 15: + self.risk_level = RiskLevel.HIGH + elif self.risk_score >= 8: + self.risk_level = RiskLevel.MEDIUM + elif self.risk_score >= 3: + self.risk_level = RiskLevel.LOW + else: + self.risk_level = RiskLevel.INFORMATIONAL + + def get_all_rssi_samples(self) -> list[int]: + """Get all RSSI samples across all sessions.""" + samples = [] + for session in self.sessions: + samples.extend(session.rssi_samples) + return samples + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return { + 'cluster_id': self.cluster_id, + 'protocol': self.protocol, + 'created_at': self.created_at.isoformat(), + 'updated_at': self.updated_at.isoformat(), + 'confidence': round(self.confidence, 3), + 'session_count': len(self.sessions), + 'linked_macs': list(self.linked_macs), + 'fingerprint_hashes': list(self.fingerprint_hashes), + 'best_name': self.best_name, + 'manufacturer_id': self.manufacturer_id, + 'manufacturer_name': self.manufacturer_name, + 'device_type': self.device_type, + 'risk_level': self.risk_level.value, + 'risk_score': self.risk_score, + 'risk_indicators': [i.to_dict() for i in self.risk_indicators], + 'total_observations': self.total_observations, + 'first_seen': self.first_seen.isoformat() if self.first_seen else None, + 'last_seen': self.last_seen.isoformat() if self.last_seen else None, + 'presence_ratio': round(self.presence_ratio, 3), + 'link_evidence': self.link_evidence, + 'sessions': [s.to_dict() for s in self.sessions], + } + + +# ============================================================================= +# Fingerprint Similarity Functions +# ============================================================================= + +def jaccard_similarity(set1: set, 
set2: set) -> float: + """Calculate Jaccard similarity between two sets.""" + if not set1 and not set2: + return 0.0 + intersection = len(set1 & set2) + union = len(set1 | set2) + return intersection / union if union > 0 else 0.0 + + +def manufacturer_data_similarity(data1: Optional[bytes], + data2: Optional[bytes]) -> float: + """ + Calculate similarity between manufacturer data blobs. + + Many devices include consistent patterns in manufacturer data + even when MAC randomizes. + """ + if not data1 or not data2: + return 0.0 + + # Compare lengths + len_sim = 1.0 - abs(len(data1) - len(data2)) / max(len(data1), len(data2)) + + # Compare common prefix (often contains device type info) + prefix_len = min(8, len(data1), len(data2)) + prefix_match = sum( + 1 for i in range(prefix_len) if data1[i] == data2[i] + ) / prefix_len if prefix_len > 0 else 0.0 + + # Compare full content via byte-level similarity + min_len = min(len(data1), len(data2)) + byte_matches = sum(1 for i in range(min_len) if data1[i] == data2[i]) + content_sim = byte_matches / max(len(data1), len(data2)) + + # Weight prefix more heavily (device type usually in prefix) + return 0.5 * prefix_match + 0.3 * content_sim + 0.2 * len_sim + + +def rssi_trajectory_similarity(samples1: list[int], + samples2: list[int], + time_window: float = 5.0) -> float: + """ + Calculate RSSI trajectory similarity. + + Devices at the same physical location show similar RSSI patterns. + This helps correlate observations that may be from the same device. 
+ """ + if len(samples1) < 3 or len(samples2) < 3: + return 0.0 + + # Compare mean RSSI (proximity indicator) + mean1 = statistics.mean(samples1) + mean2 = statistics.mean(samples2) + mean_diff = abs(mean1 - mean2) + + # If means are very different, devices are likely in different locations + if mean_diff > 20: + return 0.0 + + mean_sim = 1.0 - (mean_diff / 20) + + # Compare RSSI variance (movement pattern) + try: + var1 = statistics.variance(samples1) + var2 = statistics.variance(samples2) + var_diff = abs(var1 - var2) + var_sim = 1.0 / (1.0 + var_diff / 50) + except statistics.StatisticsError: + var_sim = 0.5 + + return 0.6 * mean_sim + 0.4 * var_sim + + +def timing_pattern_similarity(intervals1: list[float], + intervals2: list[float]) -> float: + """ + Calculate advertising/probing interval similarity. + + Devices often have characteristic timing patterns. + """ + if len(intervals1) < 2 or len(intervals2) < 2: + return 0.0 + + mean1 = statistics.mean(intervals1) + mean2 = statistics.mean(intervals2) + + # Calculate relative difference + if mean1 == 0 or mean2 == 0: + return 0.0 + + ratio = min(mean1, mean2) / max(mean1, mean2) + + # Also compare variance in timing + try: + cv1 = statistics.stdev(intervals1) / mean1 if mean1 > 0 else 0 + cv2 = statistics.stdev(intervals2) / mean2 if mean2 > 0 else 0 + cv_sim = 1.0 - abs(cv1 - cv2) + except statistics.StatisticsError: + cv_sim = 0.5 + + return 0.7 * ratio + 0.3 * max(0, cv_sim) + + +def name_similarity(name1: Optional[str], name2: Optional[str]) -> float: + """Calculate similarity between device names.""" + if not name1 or not name2: + return 0.0 + + # Normalize names + n1 = name1.lower().strip() + n2 = name2.lower().strip() + + if n1 == n2: + return 1.0 + + # Check if one is prefix of other (common with truncation) + if n1.startswith(n2) or n2.startswith(n1): + return 0.8 + + # Simple character-level similarity + common = sum(1 for c in set(n1) if c in n2) + total = len(set(n1) | set(n2)) + return common / total 
if total > 0 else 0.0 + + +# ============================================================================= +# Device Identity Engine +# ============================================================================= + +class DeviceIdentityEngine: + """ + Main engine for MAC-randomization resistant device detection. + + Ingests BLE and WiFi observations, creates sessions, clusters them + into probable device identities, and generates TSCM risk assessments. + """ + + def __init__(self): + self.ble_sessions: dict[str, DeviceSession] = {} + self.wifi_sessions: dict[str, DeviceSession] = {} + self.clusters: dict[str, DeviceCluster] = {} + + # Fingerprint index for efficient lookup + self._fingerprint_to_sessions: dict[str, list[str]] = defaultdict(list) + + # Session counters + self._session_counter = 0 + self._cluster_counter = 0 + + # Monitoring period for presence calculation + self.monitoring_start: Optional[datetime] = None + self.monitoring_end: Optional[datetime] = None + + def _generate_session_id(self, protocol: str) -> str: + """Generate unique session ID.""" + self._session_counter += 1 + return f"{protocol}_{self._session_counter:06d}" + + def _generate_cluster_id(self, protocol: str) -> str: + """Generate unique cluster ID.""" + self._cluster_counter += 1 + return f"cluster_{protocol}_{self._cluster_counter:06d}" + + def ingest_ble_observation(self, obs: BLEObservation) -> DeviceSession: + """ + Ingest a BLE observation and return/update the associated session. 
+ """ + if self.monitoring_start is None: + self.monitoring_start = obs.timestamp + self.monitoring_end = obs.timestamp + + # Find or create session for this MAC + session_key = f"ble_{obs.addr}" + + if session_key in self.ble_sessions: + session = self.ble_sessions[session_key] + # Check if this is a continuation or new session + gap = (obs.timestamp - session.last_seen).total_seconds() + if gap > BLE_SESSION_GAP: + # Close old session, start new one + self._finalize_session(session) + session = self._create_ble_session(obs) + self.ble_sessions[session_key] = session + else: + session.add_observation(obs) + else: + session = self._create_ble_session(obs) + self.ble_sessions[session_key] = session + + # Update fingerprint index + fp = obs.compute_fingerprint_hash() + if fp: + if session.session_id not in self._fingerprint_to_sessions[fp]: + self._fingerprint_to_sessions[fp].append(session.session_id) + + return session + + def _create_ble_session(self, obs: BLEObservation) -> DeviceSession: + """Create a new BLE session from initial observation.""" + session = DeviceSession( + session_id=self._generate_session_id('ble'), + protocol='ble', + first_seen=obs.timestamp, + last_seen=obs.timestamp, + ) + session.add_observation(obs) + return session + + def ingest_wifi_observation(self, obs: WifiObservation) -> DeviceSession: + """ + Ingest a WiFi observation and return/update the associated session. 
+ """ + if self.monitoring_start is None: + self.monitoring_start = obs.timestamp + self.monitoring_end = obs.timestamp + + # For WiFi, track by source MAC + session_key = f"wifi_{obs.src_mac}" + + if session_key in self.wifi_sessions: + session = self.wifi_sessions[session_key] + gap = (obs.timestamp - session.last_seen).total_seconds() + if gap > WIFI_SESSION_GAP: + self._finalize_session(session) + session = self._create_wifi_session(obs) + self.wifi_sessions[session_key] = session + else: + session.add_observation(obs) + else: + session = self._create_wifi_session(obs) + self.wifi_sessions[session_key] = session + + # Update fingerprint index + fp = obs.compute_fingerprint_hash() + if fp: + if session.session_id not in self._fingerprint_to_sessions[fp]: + self._fingerprint_to_sessions[fp].append(session.session_id) + + return session + + def _create_wifi_session(self, obs: WifiObservation) -> DeviceSession: + """Create a new WiFi session from initial observation.""" + session = DeviceSession( + session_id=self._generate_session_id('wifi'), + protocol='wifi', + first_seen=obs.timestamp, + last_seen=obs.timestamp, + ) + session.add_observation(obs) + return session + + def _finalize_session(self, session: DeviceSession) -> None: + """Finalize a session and attempt to cluster it.""" + # Try to find existing cluster for this session + cluster = self._find_matching_cluster(session) + + if cluster: + # Add to existing cluster + similarity = self._calculate_cluster_similarity(cluster, session) + cluster.add_session( + session, + link_reason=f"Fingerprint/behavioral match", + link_confidence=similarity + ) + else: + # Create new cluster + cluster = self._create_cluster_from_session(session) + self.clusters[cluster.cluster_id] = cluster + + # Run risk assessment on the cluster + self._assess_cluster_risk(cluster) + + def _find_matching_cluster(self, session: DeviceSession) -> Optional[DeviceCluster]: + """ + Find an existing cluster that matches this session. 
+ + Uses fingerprint matching, temporal correlation, and RSSI similarity. + """ + best_match = None + best_score = MIN_CLUSTER_CONFIDENCE + + for cluster in self.clusters.values(): + if cluster.protocol != session.protocol: + continue + + similarity = self._calculate_cluster_similarity(cluster, session) + if similarity > best_score: + best_score = similarity + best_match = cluster + + return best_match + + def _calculate_cluster_similarity(self, cluster: DeviceCluster, + session: DeviceSession) -> float: + """ + Calculate similarity between a cluster and a session. + + Returns a confidence score 0-1. + """ + scores = {} + + # 1. Fingerprint hash matching (strongest signal) + fp_overlap = cluster.fingerprint_hashes & session.fingerprint_hashes + if fp_overlap: + fp_score = len(fp_overlap) / max( + len(cluster.fingerprint_hashes), + len(session.fingerprint_hashes) + ) + scores['fingerprint'] = min(1.0, fp_score * 1.5) # Boost for exact match + + # 2. Manufacturer data similarity + cluster_mfg_data = self._get_cluster_manufacturer_data(cluster) + session_mfg_data = self._get_session_manufacturer_data(session) + if cluster_mfg_data and session_mfg_data: + scores['manufacturer_data'] = manufacturer_data_similarity( + cluster_mfg_data, session_mfg_data + ) + + # 3. Service UUID overlap + cluster_uuids = self._get_cluster_service_uuids(cluster) + session_uuids = self._get_session_service_uuids(session) + if cluster_uuids or session_uuids: + scores['service_uuids'] = jaccard_similarity( + cluster_uuids, session_uuids + ) + + # 4. RSSI trajectory similarity + cluster_rssi = cluster.get_all_rssi_samples() + if cluster_rssi and session.rssi_samples: + scores['rssi_trajectory'] = rssi_trajectory_similarity( + cluster_rssi, session.rssi_samples + ) + + # 5. 
Timing pattern similarity + cluster_intervals = self._get_cluster_intervals(cluster) + if cluster_intervals and session.observation_intervals: + scores['timing_pattern'] = timing_pattern_similarity( + cluster_intervals, session.observation_intervals + ) + + # 6. Name similarity + session_name = self._get_session_name(session) + if cluster.best_name and session_name: + scores['name_similarity'] = name_similarity( + cluster.best_name, session_name + ) + + if not scores: + return 0.0 + + # Weighted average + total_weight = 0.0 + weighted_sum = 0.0 + + for key, score in scores.items(): + weight = FINGERPRINT_WEIGHTS.get(key, 0.1) + weighted_sum += score * weight + total_weight += weight + + return weighted_sum / total_weight if total_weight > 0 else 0.0 + + def _get_cluster_manufacturer_data(self, cluster: DeviceCluster) -> Optional[bytes]: + """Get representative manufacturer data from cluster.""" + for session in cluster.sessions: + for obs in session.observations: + if hasattr(obs, 'manufacturer_data') and obs.manufacturer_data: + return obs.manufacturer_data + return None + + def _get_session_manufacturer_data(self, session: DeviceSession) -> Optional[bytes]: + """Get manufacturer data from session.""" + for obs in session.observations: + if hasattr(obs, 'manufacturer_data') and obs.manufacturer_data: + return obs.manufacturer_data + return None + + def _get_cluster_service_uuids(self, cluster: DeviceCluster) -> set[str]: + """Get all service UUIDs from cluster.""" + uuids = set() + for session in cluster.sessions: + for obs in session.observations: + if hasattr(obs, 'service_uuids') and obs.service_uuids: + uuids.update(obs.service_uuids) + return uuids + + def _get_session_service_uuids(self, session: DeviceSession) -> set[str]: + """Get service UUIDs from session.""" + uuids = set() + for obs in session.observations: + if hasattr(obs, 'service_uuids') and obs.service_uuids: + uuids.update(obs.service_uuids) + return uuids + + def _get_cluster_intervals(self, 
cluster: DeviceCluster) -> list[float]:
+        """Get all observation intervals from cluster."""
+        intervals = []
+        for session in cluster.sessions:
+            intervals.extend(session.observation_intervals)
+        return intervals
+
+    def _get_session_name(self, session: DeviceSession) -> Optional[str]:
+        """Get device name from session (first non-empty local_name seen)."""
+        for obs in session.observations:
+            if hasattr(obs, 'local_name') and obs.local_name:
+                return obs.local_name
+        return None
+
+    def _create_cluster_from_session(self, session: DeviceSession) -> DeviceCluster:
+        """Create a new cluster seeded from a single session."""
+        cluster = DeviceCluster(
+            cluster_id=self._generate_cluster_id(session.protocol),
+            protocol=session.protocol,
+        )
+
+        cluster.add_session(
+            session,
+            link_reason="Initial session",
+            link_confidence=1.0
+        )
+
+        # Extract identifying information (last observation wins on conflicts)
+        for obs in session.observations:
+            if hasattr(obs, 'local_name') and obs.local_name:
+                cluster.best_name = obs.local_name
+            if hasattr(obs, 'manufacturer_id') and obs.manufacturer_id:
+                cluster.manufacturer_id = obs.manufacturer_id
+
+        return cluster
+
+    def _assess_cluster_risk(self, cluster: DeviceCluster) -> None:
+        """
+        Assess TSCM risk indicators for a cluster.
+
+        Flags behaviors that may indicate surveillance devices:
+        - High presence ratio (always present)
+        - Stable RSSI (stationary/hidden device)
+        - Audio-capable services
+        - ESP32/generic chipsets
+        - Suspicious advertising patterns
+        - MAC rotation patterns
+        """
+        # Calculate presence ratio
+        if self.monitoring_start and self.monitoring_end:
+            total_duration = (self.monitoring_end - self.monitoring_start).total_seconds()
+            if total_duration > 0 and cluster.first_seen and cluster.last_seen:
+                presence_duration = (cluster.last_seen - cluster.first_seen).total_seconds()
+                cluster.presence_ratio = min(1.0, presence_duration / total_duration)  # NOTE(review): span-based; gaps between sightings count as "present"
+
+        # Risk: High presence ratio (device always present)
+        if cluster.presence_ratio > 0.8:
+            cluster.add_risk_indicator(RiskIndicator(
+                indicator_type='high_presence',
+                description='Device present for >80% of monitoring period',
+                score=2,
+                evidence={'presence_ratio': round(cluster.presence_ratio, 2)}
+            ))
+
+        # Risk: Very stable RSSI (stationary device)
+        rssi_samples = cluster.get_all_rssi_samples()
+        if len(rssi_samples) >= 5:
+            try:
+                stdev = statistics.stdev(rssi_samples)
+                if stdev < 3:
+                    cluster.add_risk_indicator(RiskIndicator(
+                        indicator_type='stable_rssi',
+                        description='Very stable signal suggests fixed placement',
+                        score=2,
+                        evidence={
+                            'rssi_stdev': round(stdev, 2),
+                            'sample_count': len(rssi_samples)
+                        }
+                    ))
+            except statistics.StatisticsError:
+                pass
+
+        # Risk: Multiple MAC addresses observed (MAC rotation)
+        if len(cluster.linked_macs) > 1:
+            cluster.add_risk_indicator(RiskIndicator(
+                indicator_type='mac_rotation',
+                description=f'Multiple MACs ({len(cluster.linked_macs)}) linked to same device',
+                score=1,
+                evidence={'mac_count': len(cluster.linked_macs)}
+            ))
+
+        # Risk: Check for suspicious manufacturer IDs
+        if cluster.manufacturer_id:
+            suspicious_mfg = {
+                0x02E5: ('Espressif', 3, 'Programmable ESP32/ESP8266 device'),
+            }
+            if cluster.manufacturer_id in suspicious_mfg:
+                name, score, desc = suspicious_mfg[cluster.manufacturer_id]
+                cluster.add_risk_indicator(RiskIndicator(
+                    indicator_type='suspicious_chipset',
+                    description=desc,
+                    score=score,
+                    evidence={'manufacturer': name, 'id': hex(cluster.manufacturer_id)}
+                ))
+
+        # Risk: Check for audio-capable services (BLE)
+        audio_service_prefixes = ['0000110', '00001203']  # 0000110x covers A2DP/Headset family; 00001203 Generic Audio ('00001108' was subsumed by '0000110')
+        cluster_uuids = set()
+        for session in cluster.sessions:
+            cluster_uuids.update(self._get_session_service_uuids(session))
+
+        for uuid in cluster_uuids:
+            if any(uuid.lower().startswith(prefix) for prefix in audio_service_prefixes):
+                cluster.add_risk_indicator(RiskIndicator(
+                    indicator_type='audio_capable',
+                    description='Audio-capable BLE services detected',
+                    score=2,
+                    evidence={'service_uuid': uuid}
+                ))
+                break
+
+        # Risk: No name advertised (hidden identity)
+        if not cluster.best_name:
+            cluster.add_risk_indicator(RiskIndicator(
+                indicator_type='no_name',
+                description='Device does not advertise a name',
+                score=1,
+                evidence={}
+            ))
+
+        # Risk: High observation count relative to duration (aggressive advertising)
+        if cluster.first_seen and cluster.last_seen:
+            duration = (cluster.last_seen - cluster.first_seen).total_seconds()
+            if duration > 60 and cluster.total_observations > 0:
+                obs_rate = cluster.total_observations / duration
+                if obs_rate > 2.0:  # More than 2 observations per second
+                    cluster.add_risk_indicator(RiskIndicator(
+                        indicator_type='high_ad_rate',
+                        description='Unusually high advertising rate',
+                        score=2,
+                        evidence={
+                            'rate': round(obs_rate, 2),
+                            'observations': cluster.total_observations,
+                            'duration': round(duration, 1)
+                        }
+                    ))
+
+    def finalize_all_sessions(self) -> None:
+        """Finalize all active sessions (call at end of monitoring)."""
+        for session in list(self.ble_sessions.values()):
+            self._finalize_session(session)
+        for session in list(self.wifi_sessions.values()):
+            self._finalize_session(session)
+
+    def get_clusters(self, min_confidence: float = 0.0) -> 
list[DeviceCluster]: + """Get all clusters above minimum confidence.""" + return [ + c for c in self.clusters.values() + if c.confidence >= min_confidence + ] + + def get_high_risk_clusters(self) -> list[DeviceCluster]: + """Get clusters with HIGH risk level.""" + return [ + c for c in self.clusters.values() + if c.risk_level == RiskLevel.HIGH + ] + + def get_summary(self) -> dict: + """Get summary of all clusters and sessions.""" + clusters_by_risk = { + 'high': [], + 'medium': [], + 'low': [], + 'informational': [] + } + + for cluster in self.clusters.values(): + clusters_by_risk[cluster.risk_level.value].append(cluster.to_dict()) + + return { + 'monitoring_period': { + 'start': self.monitoring_start.isoformat() if self.monitoring_start else None, + 'end': self.monitoring_end.isoformat() if self.monitoring_end else None, + 'duration_seconds': ( + (self.monitoring_end - self.monitoring_start).total_seconds() + if self.monitoring_start and self.monitoring_end else 0 + ) + }, + 'statistics': { + 'total_clusters': len(self.clusters), + 'ble_sessions': len(self.ble_sessions), + 'wifi_sessions': len(self.wifi_sessions), + 'high_risk_count': len(clusters_by_risk['high']), + 'medium_risk_count': len(clusters_by_risk['medium']), + 'low_risk_count': len(clusters_by_risk['low']), + 'unique_fingerprints': len(self._fingerprint_to_sessions), + }, + 'clusters_by_risk': clusters_by_risk, + 'disclaimer': ( + "Device clustering uses passive fingerprinting and statistical correlation. " + "Results indicate probable device identities, NOT confirmed matches. " + "Confidence scores reflect similarity measures, not certainty. " + "False positives and false negatives are expected." 
+ ), + } + + def clear(self) -> None: + """Clear all state.""" + self.ble_sessions.clear() + self.wifi_sessions.clear() + self.clusters.clear() + self._fingerprint_to_sessions.clear() + self._session_counter = 0 + self._cluster_counter = 0 + self.monitoring_start = None + self.monitoring_end = None + + +# ============================================================================= +# Convenience Functions +# ============================================================================= + +# Global engine instance +_identity_engine: Optional[DeviceIdentityEngine] = None + + +def get_identity_engine() -> DeviceIdentityEngine: + """Get or create the global identity engine.""" + global _identity_engine + if _identity_engine is None: + _identity_engine = DeviceIdentityEngine() + return _identity_engine + + +def reset_identity_engine() -> None: + """Reset the global identity engine.""" + global _identity_engine + _identity_engine = DeviceIdentityEngine() + + +def ingest_ble_dict(data: dict) -> DeviceSession: + """ + Ingest BLE observation from dictionary. + + Convenience function for API integration. 
+ """ + obs = BLEObservation( + timestamp=datetime.fromisoformat(data['timestamp']) if isinstance(data.get('timestamp'), str) + else data.get('timestamp', datetime.now()), + addr=data.get('addr', data.get('mac', '')).upper(), + addr_type=data.get('addr_type', 'unknown'), + rssi=data.get('rssi'), + tx_power=data.get('tx_power'), + adv_type=data.get('adv_type', 'unknown'), + adv_flags=data.get('adv_flags'), + manufacturer_id=data.get('manufacturer_id'), + manufacturer_data=bytes.fromhex(data['manufacturer_data']) if data.get('manufacturer_data') else None, + service_uuids=data.get('service_uuids', []), + service_data=bytes.fromhex(data['service_data']) if data.get('service_data') else None, + local_name=data.get('local_name', data.get('name')), + appearance=data.get('appearance'), + packet_length=data.get('packet_length'), + phy=data.get('phy'), + ) + return get_identity_engine().ingest_ble_observation(obs) + + +def ingest_wifi_dict(data: dict) -> DeviceSession: + """ + Ingest WiFi observation from dictionary. + + Convenience function for API integration. 
+ """ + obs = WifiObservation( + timestamp=datetime.fromisoformat(data['timestamp']) if isinstance(data.get('timestamp'), str) + else data.get('timestamp', datetime.now()), + src_mac=data.get('src_mac', data.get('mac', '')).upper(), + dst_mac=data.get('dst_mac'), + bssid=data.get('bssid'), + ssid=data.get('ssid'), + frame_type=data.get('frame_type', 'unknown'), + rssi=data.get('rssi'), + channel=data.get('channel'), + bandwidth=data.get('bandwidth'), + encryption=data.get('encryption'), + beacon_interval=data.get('beacon_interval'), + capabilities=data.get('capabilities'), + supported_rates=data.get('supported_rates', []), + extended_rates=data.get('extended_rates', []), + ht_capable=data.get('ht_capable', False), + vht_capable=data.get('vht_capable', False), + he_capable=data.get('he_capable', False), + ht_capabilities=data.get('ht_capabilities'), + vht_capabilities=data.get('vht_capabilities'), + vendor_ies=data.get('vendor_ies', []), + wps_present=data.get('wps_present', False), + sequence_number=data.get('sequence_number'), + probed_ssids=data.get('probed_ssids', []), + ) + return get_identity_engine().ingest_wifi_observation(obs)