Merge branch 'main' into feature/login-system

This commit is contained in:
Jon Ander Oribe
2026-01-18 08:56:06 +01:00
60 changed files with 31086 additions and 9308 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -52,7 +52,7 @@ TOOL_DEPENDENCIES = {
'install': {
'apt': 'sudo apt install multimon-ng',
'brew': 'brew install multimon-ng',
'manual': 'https://github.com/EliasOewornal/multimon-ng'
'manual': 'https://github.com/EliasOenal/multimon-ng'
}
},
'rtl_test': {
@@ -195,6 +195,43 @@ TOOL_DEPENDENCIES = {
}
}
},
'acars': {
'name': 'Aircraft Messaging (ACARS)',
'tools': {
'acarsdec': {
'required': True,
'description': 'ACARS VHF decoder',
'install': {
'apt': 'Run ./setup.sh (builds from source)',
'brew': 'Run ./setup.sh (builds from source)',
'manual': 'https://github.com/TLeconte/acarsdec'
}
}
}
},
'aprs': {
'name': 'APRS Tracking',
'tools': {
'direwolf': {
'required': False,
'description': 'APRS/packet radio decoder (preferred)',
'install': {
'apt': 'sudo apt install direwolf',
'brew': 'brew install direwolf',
'manual': 'https://github.com/wb2osz/direwolf'
}
},
'multimon-ng': {
'required': False,
'description': 'Alternative AFSK1200 decoder',
'install': {
'apt': 'sudo apt install multimon-ng',
'brew': 'brew install multimon-ng',
'manual': 'https://github.com/EliasOenal/multimon-ng'
}
}
}
},
'satellite': {
'name': 'Satellite Tracking',
'tools': {
@@ -274,6 +311,56 @@ TOOL_DEPENDENCIES = {
}
}
}
},
'tscm': {
'name': 'TSCM Counter-Surveillance',
'tools': {
'rtl_power': {
'required': False,
'description': 'Wideband spectrum sweep for RF analysis',
'install': {
'apt': 'sudo apt install rtl-sdr',
'brew': 'brew install librtlsdr',
'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
}
},
'rtl_fm': {
'required': True,
'description': 'RF signal demodulation',
'install': {
'apt': 'sudo apt install rtl-sdr',
'brew': 'brew install librtlsdr',
'manual': 'https://osmocom.org/projects/rtl-sdr/wiki'
}
},
'rtl_433': {
'required': False,
'description': 'ISM band device decoding',
'install': {
'apt': 'sudo apt install rtl-433',
'brew': 'brew install rtl_433',
'manual': 'https://github.com/merbanan/rtl_433'
}
},
'airmon-ng': {
'required': False,
'description': 'WiFi monitor mode for network scanning',
'install': {
'apt': 'sudo apt install aircrack-ng',
'brew': 'Not available on macOS',
'manual': 'https://aircrack-ng.org'
}
},
'bluetoothctl': {
'required': False,
'description': 'Bluetooth device scanning',
'install': {
'apt': 'sudo apt install bluez',
'brew': 'Not available on macOS (use native)',
'manual': 'http://www.bluez.org'
}
}
}
}
}

View File

@@ -16,6 +16,7 @@ def get_logger(name: str) -> logging.Logger:
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger.addHandler(handler)
logger.setLevel(LOG_LEVEL)
logger.propagate = False # Prevent duplicate logs from parent handlers
return logger

View File

@@ -134,8 +134,14 @@ class HackRFCommandBuilder(CommandBuilder):
Build rtl_433 command with SoapySDR support for ISM band decoding.
rtl_433 has native SoapySDR support via -d flag.
Note: rtl_433's -T flag is for timeout, NOT bias-t.
For SoapySDR devices, bias-t is passed as a device setting.
"""
# Build device string with optional bias-t setting
device_str = self._build_device_string(device)
if bias_t:
device_str = f'{device_str},bias_t=1'
cmd = [
'rtl_433',
@@ -147,9 +153,6 @@ class HackRFCommandBuilder(CommandBuilder):
if gain is not None and gain > 0:
cmd.extend(['-g', str(int(gain))])
if bias_t:
cmd.extend(['-T'])
return cmd
def get_capabilities(self) -> SDRCapabilities:

View File

@@ -10,6 +10,7 @@ from __future__ import annotations
from typing import Optional
from .base import CommandBuilder, SDRCapabilities, SDRDevice, SDRType
from utils.dependencies import get_tool_path
class RTLSDRCommandBuilder(CommandBuilder):
@@ -53,8 +54,9 @@ class RTLSDRCommandBuilder(CommandBuilder):
Used for pager decoding. Supports local devices and rtl_tcp connections.
"""
rtl_fm_path = get_tool_path('rtl_fm') or 'rtl_fm'
cmd = [
'rtl_fm',
rtl_fm_path,
'-d', self._get_device_arg(device),
'-f', f'{frequency_mhz}M',
'-M', modulation,
@@ -99,8 +101,9 @@ class RTLSDRCommandBuilder(CommandBuilder):
"connect to its SBS output (port 30003)."
)
dump1090_path = get_tool_path('dump1090') or 'dump1090'
cmd = [
'dump1090',
dump1090_path,
'--net',
'--device-index', str(device.index),
'--quiet'
@@ -126,10 +129,22 @@ class RTLSDRCommandBuilder(CommandBuilder):
Build rtl_433 command for ISM band sensor decoding.
Outputs JSON for easy parsing. Supports local devices and rtl_tcp connections.
Note: rtl_433's -T flag is for timeout, NOT bias-t.
Bias-t is enabled via the device string suffix :biast=1
"""
rtl_433_path = get_tool_path('rtl_433') or 'rtl_433'
# Build device argument with optional bias-t suffix
# rtl_433 uses :biast=1 suffix on device string, not -T flag
# (-T is timeout in rtl_433)
device_arg = self._get_device_arg(device)
if bias_t:
device_arg = f'{device_arg}:biast=1'
cmd = [
'rtl_433',
'-d', self._get_device_arg(device),
rtl_433_path,
'-d', device_arg,
'-f', f'{frequency_mhz}M',
'-F', 'json'
]
@@ -140,9 +155,6 @@ class RTLSDRCommandBuilder(CommandBuilder):
if ppm is not None and ppm != 0:
cmd.extend(['-p', str(ppm)])
if bias_t:
cmd.extend(['-T'])
return cmd
def get_capabilities(self) -> SDRCapabilities:

11
utils/tscm/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""
TSCM (Technical Surveillance Countermeasures) Utilities Package
Provides baseline recording, threat detection, correlation analysis,
BLE scanning, and MAC-randomization resistant device identity tools
for counter-surveillance operations.
"""
from __future__ import annotations
__all__ = ['detector', 'baseline', 'correlation', 'ble_scanner', 'device_identity']

2185
utils/tscm/advanced.py Normal file

File diff suppressed because it is too large Load Diff

388
utils/tscm/baseline.py Normal file
View File

@@ -0,0 +1,388 @@
"""
TSCM Baseline Recording and Comparison
Records environment "fingerprints" and compares current scans
against baselines to detect new or anomalous devices.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
from utils.database import (
create_tscm_baseline,
get_active_tscm_baseline,
get_tscm_baseline,
update_tscm_baseline,
)
logger = logging.getLogger('intercept.tscm.baseline')
class BaselineRecorder:
"""
Records and manages TSCM environment baselines.
"""
def __init__(self):
self.recording = False
self.current_baseline_id: int | None = None
self.wifi_networks: dict[str, dict] = {} # BSSID -> network info
self.bt_devices: dict[str, dict] = {} # MAC -> device info
self.rf_frequencies: dict[float, dict] = {} # Frequency -> signal info
def start_recording(
self,
name: str,
location: str | None = None,
description: str | None = None
) -> int:
"""
Start recording a new baseline.
Args:
name: Baseline name
location: Optional location description
description: Optional description
Returns:
Baseline ID
"""
self.recording = True
self.wifi_networks = {}
self.bt_devices = {}
self.rf_frequencies = {}
# Create baseline in database
self.current_baseline_id = create_tscm_baseline(
name=name,
location=location,
description=description
)
logger.info(f"Started baseline recording: {name} (ID: {self.current_baseline_id})")
return self.current_baseline_id
def stop_recording(self) -> dict:
"""
Stop recording and finalize baseline.
Returns:
Final baseline summary
"""
if not self.recording or not self.current_baseline_id:
return {'error': 'Not recording'}
self.recording = False
# Convert to lists for storage
wifi_list = list(self.wifi_networks.values())
bt_list = list(self.bt_devices.values())
rf_list = list(self.rf_frequencies.values())
# Update database
update_tscm_baseline(
self.current_baseline_id,
wifi_networks=wifi_list,
bt_devices=bt_list,
rf_frequencies=rf_list
)
summary = {
'baseline_id': self.current_baseline_id,
'wifi_count': len(wifi_list),
'bt_count': len(bt_list),
'rf_count': len(rf_list),
}
logger.info(
f"Baseline recording complete: {summary['wifi_count']} WiFi, "
f"{summary['bt_count']} BT, {summary['rf_count']} RF"
)
baseline_id = self.current_baseline_id
self.current_baseline_id = None
return summary
def add_wifi_device(self, device: dict) -> None:
"""Add a WiFi device to the current baseline."""
if not self.recording:
return
mac = device.get('bssid', device.get('mac', '')).upper()
if not mac:
return
# Update or add device
if mac in self.wifi_networks:
# Update with latest info
self.wifi_networks[mac].update({
'last_seen': datetime.now().isoformat(),
'power': device.get('power', self.wifi_networks[mac].get('power')),
})
else:
self.wifi_networks[mac] = {
'bssid': mac,
'essid': device.get('essid', device.get('ssid', '')),
'channel': device.get('channel'),
'power': device.get('power', device.get('signal')),
'vendor': device.get('vendor', ''),
'encryption': device.get('privacy', device.get('encryption', '')),
'first_seen': datetime.now().isoformat(),
'last_seen': datetime.now().isoformat(),
}
def add_bt_device(self, device: dict) -> None:
"""Add a Bluetooth device to the current baseline."""
if not self.recording:
return
mac = device.get('mac', device.get('address', '')).upper()
if not mac:
return
if mac in self.bt_devices:
self.bt_devices[mac].update({
'last_seen': datetime.now().isoformat(),
'rssi': device.get('rssi', self.bt_devices[mac].get('rssi')),
})
else:
self.bt_devices[mac] = {
'mac': mac,
'name': device.get('name', ''),
'rssi': device.get('rssi', device.get('signal')),
'manufacturer': device.get('manufacturer', ''),
'type': device.get('type', ''),
'first_seen': datetime.now().isoformat(),
'last_seen': datetime.now().isoformat(),
}
def add_rf_signal(self, signal: dict) -> None:
"""Add an RF signal to the current baseline."""
if not self.recording:
return
frequency = signal.get('frequency')
if not frequency:
return
# Round to 0.1 MHz for grouping
freq_key = round(frequency, 1)
if freq_key in self.rf_frequencies:
existing = self.rf_frequencies[freq_key]
existing['last_seen'] = datetime.now().isoformat()
existing['hit_count'] = existing.get('hit_count', 1) + 1
# Update max signal level
new_level = signal.get('level', signal.get('power', -100))
if new_level > existing.get('max_level', -100):
existing['max_level'] = new_level
else:
self.rf_frequencies[freq_key] = {
'frequency': freq_key,
'level': signal.get('level', signal.get('power')),
'max_level': signal.get('level', signal.get('power', -100)),
'modulation': signal.get('modulation', ''),
'first_seen': datetime.now().isoformat(),
'last_seen': datetime.now().isoformat(),
'hit_count': 1,
}
def get_recording_status(self) -> dict:
"""Get current recording status and counts."""
return {
'recording': self.recording,
'baseline_id': self.current_baseline_id,
'wifi_count': len(self.wifi_networks),
'bt_count': len(self.bt_devices),
'rf_count': len(self.rf_frequencies),
}
class BaselineComparator:
"""
Compares current scan results against a baseline.
"""
def __init__(self, baseline: dict):
"""
Initialize comparator with a baseline.
Args:
baseline: Baseline dict from database
"""
self.baseline = baseline
self.baseline_wifi = {
d.get('bssid', d.get('mac', '')).upper(): d
for d in baseline.get('wifi_networks', [])
if d.get('bssid') or d.get('mac')
}
self.baseline_bt = {
d.get('mac', d.get('address', '')).upper(): d
for d in baseline.get('bt_devices', [])
if d.get('mac') or d.get('address')
}
self.baseline_rf = {
round(d.get('frequency', 0), 1): d
for d in baseline.get('rf_frequencies', [])
if d.get('frequency')
}
def compare_wifi(self, current_devices: list[dict]) -> dict:
"""
Compare current WiFi devices against baseline.
Returns:
Dict with new, missing, and matching devices
"""
current_macs = {
d.get('bssid', d.get('mac', '')).upper(): d
for d in current_devices
if d.get('bssid') or d.get('mac')
}
new_devices = []
missing_devices = []
matching_devices = []
# Find new devices
for mac, device in current_macs.items():
if mac not in self.baseline_wifi:
new_devices.append(device)
else:
matching_devices.append(device)
# Find missing devices
for mac, device in self.baseline_wifi.items():
if mac not in current_macs:
missing_devices.append(device)
return {
'new': new_devices,
'missing': missing_devices,
'matching': matching_devices,
'new_count': len(new_devices),
'missing_count': len(missing_devices),
'matching_count': len(matching_devices),
}
def compare_bluetooth(self, current_devices: list[dict]) -> dict:
"""Compare current Bluetooth devices against baseline."""
current_macs = {
d.get('mac', d.get('address', '')).upper(): d
for d in current_devices
if d.get('mac') or d.get('address')
}
new_devices = []
missing_devices = []
matching_devices = []
for mac, device in current_macs.items():
if mac not in self.baseline_bt:
new_devices.append(device)
else:
matching_devices.append(device)
for mac, device in self.baseline_bt.items():
if mac not in current_macs:
missing_devices.append(device)
return {
'new': new_devices,
'missing': missing_devices,
'matching': matching_devices,
'new_count': len(new_devices),
'missing_count': len(missing_devices),
'matching_count': len(matching_devices),
}
def compare_rf(self, current_signals: list[dict]) -> dict:
"""Compare current RF signals against baseline."""
current_freqs = {
round(s.get('frequency', 0), 1): s
for s in current_signals
if s.get('frequency')
}
new_signals = []
missing_signals = []
matching_signals = []
for freq, signal in current_freqs.items():
if freq not in self.baseline_rf:
new_signals.append(signal)
else:
matching_signals.append(signal)
for freq, signal in self.baseline_rf.items():
if freq not in current_freqs:
missing_signals.append(signal)
return {
'new': new_signals,
'missing': missing_signals,
'matching': matching_signals,
'new_count': len(new_signals),
'missing_count': len(missing_signals),
'matching_count': len(matching_signals),
}
def compare_all(
self,
wifi_devices: list[dict] | None = None,
bt_devices: list[dict] | None = None,
rf_signals: list[dict] | None = None
) -> dict:
"""
Compare all current data against baseline.
Returns:
Dict with comparison results for each category
"""
results = {
'wifi': None,
'bluetooth': None,
'rf': None,
'total_new': 0,
'total_missing': 0,
}
if wifi_devices is not None:
results['wifi'] = self.compare_wifi(wifi_devices)
results['total_new'] += results['wifi']['new_count']
results['total_missing'] += results['wifi']['missing_count']
if bt_devices is not None:
results['bluetooth'] = self.compare_bluetooth(bt_devices)
results['total_new'] += results['bluetooth']['new_count']
results['total_missing'] += results['bluetooth']['missing_count']
if rf_signals is not None:
results['rf'] = self.compare_rf(rf_signals)
results['total_new'] += results['rf']['new_count']
results['total_missing'] += results['rf']['missing_count']
return results
def get_comparison_for_active_baseline(
wifi_devices: list[dict] | None = None,
bt_devices: list[dict] | None = None,
rf_signals: list[dict] | None = None
) -> dict | None:
"""
Convenience function to compare against the active baseline.
Returns:
Comparison results or None if no active baseline
"""
baseline = get_active_tscm_baseline()
if not baseline:
return None
comparator = BaselineComparator(baseline)
return comparator.compare_all(wifi_devices, bt_devices, rf_signals)

476
utils/tscm/ble_scanner.py Normal file
View File

@@ -0,0 +1,476 @@
"""
BLE Scanner for TSCM
Cross-platform BLE scanning with manufacturer data detection.
Supports macOS and Linux using the bleak library with fallback to system tools.
Detects:
- Apple AirTags (company ID 0x004C)
- Tile trackers
- Samsung SmartTags
- ESP32/ESP8266 devices (Espressif, company ID 0x02E5)
- Generic BLE devices with suspicious characteristics
"""
import asyncio
import logging
import platform
import re
import subprocess
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
logger = logging.getLogger('intercept.tscm.ble')
# Manufacturer company IDs (Bluetooth SIG assigned)
COMPANY_IDS = {
0x004C: 'Apple',
0x02E5: 'Espressif',
0x0059: 'Nordic Semiconductor',
0x000D: 'Texas Instruments',
0x0075: 'Samsung',
0x00E0: 'Google',
0x0006: 'Microsoft',
0x01DA: 'Tile',
}
# Known tracker signatures
TRACKER_SIGNATURES = {
# Apple AirTag detection patterns
'airtag': {
'company_id': 0x004C,
'data_patterns': [
b'\x12\x19', # AirTag/Find My advertisement prefix
b'\x07\x19', # Offline Finding
],
'name_patterns': ['airtag', 'findmy', 'find my'],
},
# Tile tracker
'tile': {
'company_id': 0x01DA,
'name_patterns': ['tile'],
},
# Samsung SmartTag
'smarttag': {
'company_id': 0x0075,
'name_patterns': ['smarttag', 'smart tag', 'galaxy smart'],
},
# ESP32/ESP8266
'espressif': {
'company_id': 0x02E5,
'name_patterns': ['esp32', 'esp8266', 'espressif'],
},
}
@dataclass
class BLEDevice:
"""Represents a detected BLE device with full advertisement data."""
mac: str
name: Optional[str] = None
rssi: Optional[int] = None
manufacturer_id: Optional[int] = None
manufacturer_name: Optional[str] = None
manufacturer_data: bytes = field(default_factory=bytes)
service_uuids: list = field(default_factory=list)
tx_power: Optional[int] = None
is_connectable: bool = True
# Detection flags
is_airtag: bool = False
is_tile: bool = False
is_smarttag: bool = False
is_espressif: bool = False
is_tracker: bool = False
tracker_type: Optional[str] = None
first_seen: datetime = field(default_factory=datetime.now)
last_seen: datetime = field(default_factory=datetime.now)
detection_count: int = 1
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'mac': self.mac,
'name': self.name or 'Unknown',
'rssi': self.rssi,
'manufacturer_id': self.manufacturer_id,
'manufacturer_name': self.manufacturer_name,
'service_uuids': self.service_uuids,
'tx_power': self.tx_power,
'is_connectable': self.is_connectable,
'is_airtag': self.is_airtag,
'is_tile': self.is_tile,
'is_smarttag': self.is_smarttag,
'is_espressif': self.is_espressif,
'is_tracker': self.is_tracker,
'tracker_type': self.tracker_type,
'detection_count': self.detection_count,
'type': 'ble',
}
class BLEScanner:
"""
Cross-platform BLE scanner with manufacturer data detection.
Uses bleak library for proper BLE scanning, with fallback to
system tools (hcitool/btmgmt on Linux, system_profiler on macOS).
"""
def __init__(self):
self.devices: dict[str, BLEDevice] = {}
self._bleak_available = self._check_bleak()
self._scanning = False
def _check_bleak(self) -> bool:
"""Check if bleak library is available."""
try:
import bleak
return True
except ImportError:
logger.warning("bleak library not available - using fallback scanning")
return False
async def scan_async(self, duration: int = 10) -> list[BLEDevice]:
"""
Perform async BLE scan using bleak.
Args:
duration: Scan duration in seconds
Returns:
List of detected BLE devices
"""
if not self._bleak_available:
# Use synchronous fallback
return self._scan_fallback(duration)
try:
from bleak import BleakScanner
from bleak.backends.device import BLEDevice as BleakDevice
from bleak.backends.scanner import AdvertisementData
detected = {}
def detection_callback(device: BleakDevice, adv_data: AdvertisementData):
"""Callback for each detected device."""
mac = device.address.upper()
if mac in detected:
# Update existing device
detected[mac].rssi = adv_data.rssi
detected[mac].last_seen = datetime.now()
detected[mac].detection_count += 1
else:
# Create new device entry
ble_device = BLEDevice(
mac=mac,
name=adv_data.local_name or device.name,
rssi=adv_data.rssi,
service_uuids=list(adv_data.service_uuids) if adv_data.service_uuids else [],
tx_power=adv_data.tx_power,
)
# Parse manufacturer data
if adv_data.manufacturer_data:
for company_id, data in adv_data.manufacturer_data.items():
ble_device.manufacturer_id = company_id
ble_device.manufacturer_name = COMPANY_IDS.get(company_id, f'Unknown ({hex(company_id)})')
ble_device.manufacturer_data = bytes(data)
# Check for known trackers
self._identify_tracker(ble_device, company_id, data)
# Also check name patterns
self._check_name_patterns(ble_device)
detected[mac] = ble_device
logger.info(f"Starting BLE scan with bleak (duration={duration}s)")
scanner = BleakScanner(detection_callback=detection_callback)
await scanner.start()
await asyncio.sleep(duration)
await scanner.stop()
# Update internal device list
for mac, device in detected.items():
if mac in self.devices:
self.devices[mac].rssi = device.rssi
self.devices[mac].last_seen = device.last_seen
self.devices[mac].detection_count += 1
else:
self.devices[mac] = device
logger.info(f"BLE scan complete: {len(detected)} devices found")
return list(detected.values())
except Exception as e:
logger.error(f"Bleak scan failed: {e}")
return self._scan_fallback(duration)
def scan(self, duration: int = 10) -> list[BLEDevice]:
"""
Synchronous wrapper for BLE scanning.
Args:
duration: Scan duration in seconds
Returns:
List of detected BLE devices
"""
if self._bleak_available:
try:
# Try to get existing event loop
try:
loop = asyncio.get_running_loop()
# We're in an async context, can't use run()
future = asyncio.ensure_future(self.scan_async(duration))
return asyncio.get_event_loop().run_until_complete(future)
except RuntimeError:
# No running loop, create one
return asyncio.run(self.scan_async(duration))
except Exception as e:
logger.error(f"Async scan failed: {e}")
return self._scan_fallback(duration)
else:
return self._scan_fallback(duration)
def _identify_tracker(self, device: BLEDevice, company_id: int, data: bytes):
"""Identify if device is a known tracker type."""
# Apple AirTag detection
if company_id == 0x004C: # Apple
# Check for Find My / AirTag advertisement patterns
if len(data) >= 2:
# AirTag advertisements have specific byte patterns
if data[0] == 0x12 and data[1] == 0x19:
device.is_airtag = True
device.is_tracker = True
device.tracker_type = 'AirTag'
logger.info(f"AirTag detected: {device.mac}")
elif data[0] == 0x07: # Offline Finding
device.is_airtag = True
device.is_tracker = True
device.tracker_type = 'AirTag (Offline)'
logger.info(f"AirTag (offline mode) detected: {device.mac}")
# Tile tracker
elif company_id == 0x01DA: # Tile
device.is_tile = True
device.is_tracker = True
device.tracker_type = 'Tile'
logger.info(f"Tile tracker detected: {device.mac}")
# Samsung SmartTag
elif company_id == 0x0075: # Samsung
# Check if it's specifically a SmartTag
device.is_smarttag = True
device.is_tracker = True
device.tracker_type = 'SmartTag'
logger.info(f"Samsung SmartTag detected: {device.mac}")
# Espressif (ESP32/ESP8266)
elif company_id == 0x02E5: # Espressif
device.is_espressif = True
device.tracker_type = 'ESP32/ESP8266'
logger.info(f"ESP32/ESP8266 device detected: {device.mac}")
def _check_name_patterns(self, device: BLEDevice):
"""Check device name for tracker patterns."""
if not device.name:
return
name_lower = device.name.lower()
# Check each tracker type
for tracker_type, sig in TRACKER_SIGNATURES.items():
patterns = sig.get('name_patterns', [])
for pattern in patterns:
if pattern in name_lower:
if tracker_type == 'airtag':
device.is_airtag = True
device.is_tracker = True
device.tracker_type = 'AirTag'
elif tracker_type == 'tile':
device.is_tile = True
device.is_tracker = True
device.tracker_type = 'Tile'
elif tracker_type == 'smarttag':
device.is_smarttag = True
device.is_tracker = True
device.tracker_type = 'SmartTag'
elif tracker_type == 'espressif':
device.is_espressif = True
device.tracker_type = 'ESP32/ESP8266'
logger.info(f"Tracker identified by name: {device.name} -> {tracker_type}")
return
def _scan_fallback(self, duration: int = 10) -> list[BLEDevice]:
"""
Fallback scanning using system tools when bleak is unavailable.
Works on both macOS and Linux.
"""
system = platform.system()
if system == 'Darwin':
return self._scan_macos(duration)
else:
return self._scan_linux(duration)
def _scan_macos(self, duration: int = 10) -> list[BLEDevice]:
"""Fallback BLE scanning on macOS using system_profiler."""
devices = []
try:
import json
result = subprocess.run(
['system_profiler', 'SPBluetoothDataType', '-json'],
capture_output=True, text=True, timeout=15
)
data = json.loads(result.stdout)
bt_data = data.get('SPBluetoothDataType', [{}])[0]
# Get connected/paired devices
for section in ['device_connected', 'device_title']:
section_data = bt_data.get(section, {})
if isinstance(section_data, dict):
for name, info in section_data.items():
if isinstance(info, dict):
mac = info.get('device_address', '').upper()
if mac:
device = BLEDevice(
mac=mac,
name=name,
)
# Check name patterns
self._check_name_patterns(device)
devices.append(device)
logger.info(f"macOS fallback scan found {len(devices)} devices")
except Exception as e:
logger.error(f"macOS fallback scan failed: {e}")
return devices
def _scan_linux(self, duration: int = 10) -> list[BLEDevice]:
"""Fallback BLE scanning on Linux using bluetoothctl/btmgmt."""
import shutil
devices = []
seen_macs = set()
# Method 1: Try btmgmt for BLE devices
if shutil.which('btmgmt'):
try:
logger.info("Trying btmgmt find...")
result = subprocess.run(
['btmgmt', 'find'],
capture_output=True, text=True, timeout=duration + 5
)
for line in result.stdout.split('\n'):
if 'dev_found' in line.lower() or ('type' in line.lower() and ':' in line):
mac_match = re.search(
r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:'
r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})',
line
)
if mac_match:
mac = mac_match.group(1).upper()
if mac not in seen_macs:
seen_macs.add(mac)
name_match = re.search(r'name\s+(.+?)(?:\s|$)', line, re.I)
name = name_match.group(1) if name_match else None
device = BLEDevice(mac=mac, name=name)
self._check_name_patterns(device)
devices.append(device)
logger.info(f"btmgmt found {len(devices)} devices")
except Exception as e:
logger.warning(f"btmgmt failed: {e}")
# Method 2: Try hcitool lescan
if not devices and shutil.which('hcitool'):
try:
logger.info("Trying hcitool lescan...")
# Start lescan in background
process = subprocess.Popen(
['hcitool', 'lescan', '--duplicates'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
import time
time.sleep(duration)
process.terminate()
stdout, _ = process.communicate(timeout=2)
for line in stdout.split('\n'):
mac_match = re.search(
r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:'
r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})',
line
)
if mac_match:
mac = mac_match.group(1).upper()
if mac not in seen_macs:
seen_macs.add(mac)
# Extract name (comes after MAC)
parts = line.strip().split()
name = ' '.join(parts[1:]) if len(parts) > 1 else None
device = BLEDevice(mac=mac, name=name if name != '(unknown)' else None)
self._check_name_patterns(device)
devices.append(device)
logger.info(f"hcitool lescan found {len(devices)} devices")
except Exception as e:
logger.warning(f"hcitool lescan failed: {e}")
return devices
def get_trackers(self) -> list[BLEDevice]:
"""Get all detected tracker devices."""
return [d for d in self.devices.values() if d.is_tracker]
def get_espressif_devices(self) -> list[BLEDevice]:
"""Get all detected ESP32/ESP8266 devices."""
return [d for d in self.devices.values() if d.is_espressif]
def clear(self):
"""Clear all detected devices."""
self.devices.clear()
# Singleton instance
_scanner: Optional[BLEScanner] = None
def get_ble_scanner() -> BLEScanner:
"""Get the global BLE scanner instance."""
global _scanner
if _scanner is None:
_scanner = BLEScanner()
return _scanner
def scan_ble_devices(duration: int = 10) -> list[dict]:
"""
Convenience function to scan for BLE devices.
Args:
duration: Scan duration in seconds
Returns:
List of device dictionaries
"""
scanner = get_ble_scanner()
devices = scanner.scan(duration)
return [d.to_dict() for d in devices]

959
utils/tscm/correlation.py Normal file
View File

@@ -0,0 +1,959 @@
"""
TSCM Cross-Protocol Correlation Engine
Correlates Bluetooth, Wi-Fi, and RF indicators to detect potential surveillance activity.
Implements scoring model for risk assessment and provides actionable intelligence.
DISCLAIMER: This system performs wireless and RF surveillance screening.
Findings indicate anomalies and indicators, not confirmed surveillance devices.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
logger = logging.getLogger('intercept.tscm.correlation')
class RiskLevel(Enum):
"""Risk classification levels."""
INFORMATIONAL = 'informational' # Score 0-2
NEEDS_REVIEW = 'review' # Score 3-5
HIGH_INTEREST = 'high_interest' # Score 6+
class IndicatorType(Enum):
"""Types of risk indicators."""
UNKNOWN_DEVICE = 'unknown_device'
AUDIO_CAPABLE = 'audio_capable'
PERSISTENT = 'persistent'
MEETING_CORRELATED = 'meeting_correlated'
CROSS_PROTOCOL = 'cross_protocol'
HIDDEN_IDENTITY = 'hidden_identity'
ROGUE_AP = 'rogue_ap'
BURST_TRANSMISSION = 'burst_transmission'
STABLE_RSSI = 'stable_rssi'
HIGH_FREQ_ADVERTISING = 'high_freq_advertising'
MAC_ROTATION = 'mac_rotation'
NARROWBAND_SIGNAL = 'narrowband_signal'
ALWAYS_ON_CARRIER = 'always_on_carrier'
# Tracker-specific indicators
KNOWN_TRACKER = 'known_tracker'
AIRTAG_DETECTED = 'airtag_detected'
TILE_DETECTED = 'tile_detected'
SMARTTAG_DETECTED = 'smarttag_detected'
ESP32_DEVICE = 'esp32_device'
GENERIC_CHIPSET = 'generic_chipset'
# Scoring weights for each indicator
INDICATOR_SCORES = {
IndicatorType.UNKNOWN_DEVICE: 1,
IndicatorType.AUDIO_CAPABLE: 2,
IndicatorType.PERSISTENT: 2,
IndicatorType.MEETING_CORRELATED: 2,
IndicatorType.CROSS_PROTOCOL: 3,
IndicatorType.HIDDEN_IDENTITY: 2,
IndicatorType.ROGUE_AP: 3,
IndicatorType.BURST_TRANSMISSION: 2,
IndicatorType.STABLE_RSSI: 1,
IndicatorType.HIGH_FREQ_ADVERTISING: 1,
IndicatorType.MAC_ROTATION: 1,
IndicatorType.NARROWBAND_SIGNAL: 2,
IndicatorType.ALWAYS_ON_CARRIER: 2,
# Tracker scores - higher for covert tracking devices
IndicatorType.KNOWN_TRACKER: 3,
IndicatorType.AIRTAG_DETECTED: 3,
IndicatorType.TILE_DETECTED: 2,
IndicatorType.SMARTTAG_DETECTED: 2,
IndicatorType.ESP32_DEVICE: 2,
IndicatorType.GENERIC_CHIPSET: 1,
}
# Known tracker device signatures
TRACKER_SIGNATURES = {
# Apple AirTag - OUI prefixes
'airtag_oui': ['4C:E6:76', '7C:04:D0', 'DC:A4:CA', 'F0:B3:EC'],
# Tile trackers
'tile_oui': ['D0:03:DF', 'EC:2E:4E'],
# Samsung SmartTag
'smarttag_oui': ['8C:71:F8', 'CC:2D:83', 'F0:5C:D5'],
# ESP32/ESP8266 Espressif chipsets
'espressif_oui': ['24:0A:C4', '24:6F:28', '24:62:AB', '30:AE:A4',
'3C:61:05', '3C:71:BF', '40:F5:20', '48:3F:DA',
'4C:11:AE', '54:43:B2', '58:BF:25', '5C:CF:7F',
'60:01:94', '68:C6:3A', '7C:9E:BD', '84:0D:8E',
'84:CC:A8', '84:F3:EB', '8C:AA:B5', '90:38:0C',
'94:B5:55', '98:CD:AC', 'A4:7B:9D', 'A4:CF:12',
'AC:67:B2', 'B4:E6:2D', 'BC:DD:C2', 'C4:4F:33',
'C8:2B:96', 'CC:50:E3', 'D8:A0:1D', 'DC:4F:22',
'E0:98:06', 'E8:68:E7', 'EC:FA:BC', 'F4:CF:A2'],
# Generic/suspicious chipset vendors (potential covert devices)
'generic_chipset_oui': [
'00:1A:7D', # cyber-blue(HK)
'00:25:00', # Apple (but generic BLE)
],
}
@dataclass
class Indicator:
"""A single risk indicator."""
type: IndicatorType
description: str
score: int
details: dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class DeviceProfile:
"""Complete profile for a detected device."""
# Identity
identifier: str # MAC, BSSID, or frequency
protocol: str # 'bluetooth', 'wifi', 'rf'
# Device info
name: Optional[str] = None
manufacturer: Optional[str] = None
device_type: Optional[str] = None
# Bluetooth-specific
services: list[str] = field(default_factory=list)
company_id: Optional[int] = None
advertising_interval: Optional[int] = None
# Wi-Fi-specific
ssid: Optional[str] = None
channel: Optional[int] = None
encryption: Optional[str] = None
beacon_interval: Optional[int] = None
is_hidden: bool = False
# RF-specific
frequency: Optional[float] = None
bandwidth: Optional[float] = None
modulation: Optional[str] = None
# Common measurements
rssi_samples: list[tuple[datetime, int]] = field(default_factory=list)
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
detection_count: int = 0
# Behavioral analysis
indicators: list[Indicator] = field(default_factory=list)
total_score: int = 0
risk_level: RiskLevel = RiskLevel.INFORMATIONAL
# Correlation
correlated_devices: list[str] = field(default_factory=list)
# Output
confidence: float = 0.0
recommended_action: str = 'monitor'
def add_rssi_sample(self, rssi: int) -> None:
"""Add an RSSI sample with timestamp."""
self.rssi_samples.append((datetime.now(), rssi))
# Keep last 100 samples
if len(self.rssi_samples) > 100:
self.rssi_samples = self.rssi_samples[-100:]
def get_rssi_stability(self) -> float:
"""Calculate RSSI stability (0-1, higher = more stable)."""
if len(self.rssi_samples) < 3:
return 0.0
values = [r for _, r in self.rssi_samples[-20:]]
if not values:
return 0.0
avg = sum(values) / len(values)
variance = sum((v - avg) ** 2 for v in values) / len(values)
# Convert variance to stability score (lower variance = higher stability)
# Variance of ~0 = 1.0, variance of 100+ = ~0
return max(0, 1 - (variance / 100))
def add_indicator(self, indicator_type: IndicatorType, description: str,
details: dict = None) -> None:
"""Add a risk indicator and update score."""
score = INDICATOR_SCORES.get(indicator_type, 1)
self.indicators.append(Indicator(
type=indicator_type,
description=description,
score=score,
details=details or {}
))
self._recalculate_score()
def _recalculate_score(self) -> None:
"""Recalculate total score and risk level."""
self.total_score = sum(i.score for i in self.indicators)
if self.total_score >= 6:
self.risk_level = RiskLevel.HIGH_INTEREST
self.recommended_action = 'investigate'
elif self.total_score >= 3:
self.risk_level = RiskLevel.NEEDS_REVIEW
self.recommended_action = 'review'
else:
self.risk_level = RiskLevel.INFORMATIONAL
self.recommended_action = 'monitor'
# Calculate confidence based on number and quality of indicators
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'identifier': self.identifier,
'protocol': self.protocol,
'name': self.name,
'manufacturer': self.manufacturer,
'device_type': self.device_type,
'ssid': self.ssid,
'frequency': self.frequency,
'first_seen': self.first_seen.isoformat() if self.first_seen else None,
'last_seen': self.last_seen.isoformat() if self.last_seen else None,
'detection_count': self.detection_count,
'rssi_current': self.rssi_samples[-1][1] if self.rssi_samples else None,
'rssi_stability': self.get_rssi_stability(),
'indicators': [
{
'type': i.type.value,
'description': i.description,
'score': i.score,
}
for i in self.indicators
],
'total_score': self.total_score,
'risk_level': self.risk_level.value,
'confidence': round(self.confidence, 2),
'recommended_action': self.recommended_action,
'correlated_devices': self.correlated_devices,
}
# Known audio-capable BLE service UUIDs
AUDIO_SERVICE_UUIDS = [
'0000110b-0000-1000-8000-00805f9b34fb', # A2DP Sink
'0000110a-0000-1000-8000-00805f9b34fb', # A2DP Source
'0000111e-0000-1000-8000-00805f9b34fb', # Handsfree
'0000111f-0000-1000-8000-00805f9b34fb', # Handsfree Audio Gateway
'00001108-0000-1000-8000-00805f9b34fb', # Headset
'00001203-0000-1000-8000-00805f9b34fb', # Generic Audio
]
# Generic chipset vendors (often used in covert devices)
GENERIC_CHIPSET_VENDORS = [
'espressif',
'nordic',
'texas instruments',
'silicon labs',
'realtek',
'mediatek',
'qualcomm',
'broadcom',
'cypress',
'dialog',
]
# Suspicious frequency ranges for RF
SUSPICIOUS_RF_BANDS = [
{'start': 136, 'end': 174, 'name': 'VHF', 'risk': 'high'},
{'start': 400, 'end': 470, 'name': 'UHF', 'risk': 'high'},
{'start': 315, 'end': 316, 'name': '315 MHz ISM', 'risk': 'medium'},
{'start': 433, 'end': 435, 'name': '433 MHz ISM', 'risk': 'medium'},
{'start': 868, 'end': 870, 'name': '868 MHz ISM', 'risk': 'medium'},
{'start': 902, 'end': 928, 'name': '915 MHz ISM', 'risk': 'medium'},
]
class CorrelationEngine:
"""
Cross-protocol correlation engine for TSCM analysis.
Correlates Bluetooth, Wi-Fi, and RF indicators to identify
potential surveillance activity patterns.
"""
def __init__(self):
self.device_profiles: dict[str, DeviceProfile] = {}
self.meeting_windows: list[tuple[datetime, datetime]] = []
self.correlation_window = timedelta(minutes=5)
def start_meeting_window(self) -> None:
"""Mark the start of a sensitive period (meeting)."""
self.meeting_windows.append((datetime.now(), None))
logger.info("Meeting window started")
def end_meeting_window(self) -> None:
"""Mark the end of a sensitive period."""
if self.meeting_windows and self.meeting_windows[-1][1] is None:
start = self.meeting_windows[-1][0]
self.meeting_windows[-1] = (start, datetime.now())
logger.info("Meeting window ended")
def is_during_meeting(self, timestamp: datetime = None) -> bool:
"""Check if timestamp falls within a meeting window."""
ts = timestamp or datetime.now()
for start, end in self.meeting_windows:
if end is None:
if ts >= start:
return True
elif start <= ts <= end:
return True
return False
def get_or_create_profile(self, identifier: str, protocol: str) -> DeviceProfile:
"""Get existing profile or create new one."""
key = f"{protocol}:{identifier}"
if key not in self.device_profiles:
self.device_profiles[key] = DeviceProfile(
identifier=identifier,
protocol=protocol,
first_seen=datetime.now()
)
profile = self.device_profiles[key]
profile.last_seen = datetime.now()
profile.detection_count += 1
return profile
def analyze_bluetooth_device(self, device: dict) -> DeviceProfile:
"""
Analyze a Bluetooth device for suspicious indicators.
Args:
device: Dict with mac, name, rssi, services, manufacturer, etc.
Returns:
DeviceProfile with risk assessment
"""
mac = device.get('mac', device.get('address', '')).upper()
profile = self.get_or_create_profile(mac, 'bluetooth')
# Update profile data
profile.name = device.get('name') or profile.name
profile.manufacturer = device.get('manufacturer') or profile.manufacturer
profile.device_type = device.get('type') or profile.device_type
profile.services = device.get('services', []) or profile.services
profile.company_id = device.get('company_id') or profile.company_id
profile.advertising_interval = device.get('advertising_interval') or profile.advertising_interval
# Add RSSI sample
rssi = device.get('rssi', device.get('signal'))
if rssi:
try:
profile.add_rssi_sample(int(rssi))
except (ValueError, TypeError):
pass
# Clear previous indicators for fresh analysis
profile.indicators = []
# === Detection Logic ===
# 1. Unknown manufacturer or generic chipset
if not profile.manufacturer:
profile.add_indicator(
IndicatorType.UNKNOWN_DEVICE,
'Unknown manufacturer',
{'manufacturer': None}
)
elif any(v in profile.manufacturer.lower() for v in GENERIC_CHIPSET_VENDORS):
profile.add_indicator(
IndicatorType.UNKNOWN_DEVICE,
f'Generic chipset vendor: {profile.manufacturer}',
{'manufacturer': profile.manufacturer}
)
# 2. No human-readable name
if not profile.name or profile.name in ['Unknown', '', 'N/A']:
profile.add_indicator(
IndicatorType.HIDDEN_IDENTITY,
'No device name advertised',
{'name': profile.name}
)
# 3. Audio-capable services
if profile.services:
audio_services = [s for s in profile.services
if s.lower() in [u.lower() for u in AUDIO_SERVICE_UUIDS]]
if audio_services:
profile.add_indicator(
IndicatorType.AUDIO_CAPABLE,
'Audio-capable BLE services detected',
{'services': audio_services}
)
# Check name for audio keywords
if profile.name:
audio_keywords = ['headphone', 'headset', 'earphone', 'speaker',
'mic', 'audio', 'airpod', 'buds', 'jabra', 'bose']
if any(k in profile.name.lower() for k in audio_keywords):
profile.add_indicator(
IndicatorType.AUDIO_CAPABLE,
f'Audio device name: {profile.name}',
{'name': profile.name}
)
# 4. High-frequency advertising (< 100ms interval is suspicious)
if profile.advertising_interval and profile.advertising_interval < 100:
profile.add_indicator(
IndicatorType.HIGH_FREQ_ADVERTISING,
f'High advertising frequency: {profile.advertising_interval}ms',
{'interval': profile.advertising_interval}
)
# 5. Persistent presence
if profile.detection_count >= 3:
profile.add_indicator(
IndicatorType.PERSISTENT,
f'Persistent device ({profile.detection_count} detections)',
{'count': profile.detection_count}
)
# 6. Stable RSSI (suggests fixed placement)
rssi_stability = profile.get_rssi_stability()
if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
profile.add_indicator(
IndicatorType.STABLE_RSSI,
f'Stable signal strength (stability: {rssi_stability:.0%})',
{'stability': rssi_stability}
)
# 7. Meeting correlation
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Detected during sensitive period',
{'during_meeting': True}
)
# 8. MAC rotation pattern (random MAC prefix)
if mac and mac[1] in ['2', '6', 'A', 'E', 'a', 'e']:
profile.add_indicator(
IndicatorType.MAC_ROTATION,
'Random/rotating MAC address detected',
{'mac': mac}
)
# 9. Known tracker detection (AirTag, Tile, SmartTag, ESP32)
mac_prefix = mac[:8] if len(mac) >= 8 else ''
tracker_detected = False
# Check for tracker flags from BLE scanner (manufacturer ID detection)
if device.get('is_airtag'):
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
'Apple AirTag detected via manufacturer data',
{'mac': mac, 'tracker_type': 'AirTag'}
)
profile.device_type = device.get('tracker_type', 'AirTag')
tracker_detected = True
if device.get('is_tile'):
profile.add_indicator(
IndicatorType.TILE_DETECTED,
'Tile tracker detected via manufacturer data',
{'mac': mac, 'tracker_type': 'Tile'}
)
profile.device_type = 'Tile Tracker'
tracker_detected = True
if device.get('is_smarttag'):
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
'Samsung SmartTag detected via manufacturer data',
{'mac': mac, 'tracker_type': 'SmartTag'}
)
profile.device_type = 'Samsung SmartTag'
tracker_detected = True
if device.get('is_espressif'):
profile.add_indicator(
IndicatorType.ESP32_DEVICE,
'ESP32/ESP8266 detected via Espressif manufacturer ID',
{'mac': mac, 'chipset': 'Espressif'}
)
profile.manufacturer = 'Espressif'
profile.device_type = device.get('tracker_type', 'ESP32/ESP8266')
tracker_detected = True
# Check manufacturer_id directly
mfg_id = device.get('manufacturer_id')
if mfg_id:
if mfg_id == 0x004C and not device.get('is_airtag'):
# Apple device - could be AirTag
profile.manufacturer = 'Apple'
elif mfg_id == 0x02E5 and not device.get('is_espressif'):
# Espressif device
profile.add_indicator(
IndicatorType.ESP32_DEVICE,
'ESP32/ESP8266 detected via manufacturer ID',
{'mac': mac, 'manufacturer_id': mfg_id}
)
profile.manufacturer = 'Espressif'
tracker_detected = True
# Fallback: Check for Apple AirTag by OUI
if not tracker_detected and mac_prefix in TRACKER_SIGNATURES.get('airtag_oui', []):
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
'Apple AirTag detected - potential tracking device',
{'mac': mac, 'tracker_type': 'AirTag'}
)
profile.device_type = 'AirTag'
tracker_detected = True
# Check for Tile tracker
if mac_prefix in TRACKER_SIGNATURES.get('tile_oui', []):
profile.add_indicator(
IndicatorType.TILE_DETECTED,
'Tile tracker detected',
{'mac': mac, 'tracker_type': 'Tile'}
)
profile.device_type = 'Tile Tracker'
tracker_detected = True
# Check for Samsung SmartTag
if mac_prefix in TRACKER_SIGNATURES.get('smarttag_oui', []):
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
'Samsung SmartTag detected',
{'mac': mac, 'tracker_type': 'SmartTag'}
)
profile.device_type = 'Samsung SmartTag'
tracker_detected = True
# Check for ESP32/ESP8266 devices
if mac_prefix in TRACKER_SIGNATURES.get('espressif_oui', []):
profile.add_indicator(
IndicatorType.ESP32_DEVICE,
'ESP32/ESP8266 device detected - programmable hardware',
{'mac': mac, 'chipset': 'Espressif'}
)
profile.manufacturer = 'Espressif'
tracker_detected = True
# Check for generic/suspicious chipsets
if mac_prefix in TRACKER_SIGNATURES.get('generic_chipset_oui', []):
profile.add_indicator(
IndicatorType.GENERIC_CHIPSET,
'Generic chipset vendor - often used in covert devices',
{'mac': mac}
)
tracker_detected = True
# If any tracker detected, add general tracker indicator
if tracker_detected:
profile.add_indicator(
IndicatorType.KNOWN_TRACKER,
'Known tracking device signature detected',
{'mac': mac}
)
# Also check name for tracker keywords
if profile.name:
name_lower = profile.name.lower()
if 'airtag' in name_lower or 'findmy' in name_lower:
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
f'AirTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'AirTag'
elif 'tile' in name_lower:
profile.add_indicator(
IndicatorType.TILE_DETECTED,
f'Tile tracker identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Tile Tracker'
elif 'smarttag' in name_lower:
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
f'SmartTag identified by name: {profile.name}',
{'name': profile.name}
)
profile.device_type = 'Samsung SmartTag'
return profile
def analyze_wifi_device(self, device: dict) -> DeviceProfile:
"""
Analyze a Wi-Fi device/AP for suspicious indicators.
Args:
device: Dict with bssid, ssid, channel, rssi, encryption, etc.
Returns:
DeviceProfile with risk assessment
"""
bssid = device.get('bssid', device.get('mac', '')).upper()
profile = self.get_or_create_profile(bssid, 'wifi')
# Update profile data
ssid = device.get('ssid', device.get('essid', ''))
profile.ssid = ssid if ssid else profile.ssid
profile.name = ssid or f'Hidden Network ({bssid[-8:]})'
profile.channel = device.get('channel') or profile.channel
profile.encryption = device.get('encryption', device.get('privacy')) or profile.encryption
profile.beacon_interval = device.get('beacon_interval') or profile.beacon_interval
profile.is_hidden = not ssid or ssid in ['', 'Hidden', '[Hidden]']
# Extract manufacturer from OUI
if bssid and len(bssid) >= 8:
profile.manufacturer = device.get('vendor') or profile.manufacturer
# Add RSSI sample
rssi = device.get('rssi', device.get('power', device.get('signal')))
if rssi:
try:
profile.add_rssi_sample(int(rssi))
except (ValueError, TypeError):
pass
# Clear previous indicators
profile.indicators = []
# === Detection Logic ===
# 1. Hidden or unnamed SSID
if profile.is_hidden:
profile.add_indicator(
IndicatorType.HIDDEN_IDENTITY,
'Hidden or empty SSID',
{'ssid': ssid}
)
# 2. BSSID not in authorized list (would need baseline)
# For now, mark as unknown if no manufacturer
if not profile.manufacturer:
profile.add_indicator(
IndicatorType.UNKNOWN_DEVICE,
'Unknown AP manufacturer',
{'bssid': bssid}
)
# 3. Consumer device OUI in restricted environment
consumer_ouis = ['tp-link', 'netgear', 'd-link', 'linksys', 'asus']
if profile.manufacturer and any(c in profile.manufacturer.lower() for c in consumer_ouis):
profile.add_indicator(
IndicatorType.ROGUE_AP,
f'Consumer-grade AP detected: {profile.manufacturer}',
{'manufacturer': profile.manufacturer}
)
# 4. Camera device patterns
camera_keywords = ['cam', 'camera', 'ipcam', 'dvr', 'nvr', 'wyze',
'ring', 'arlo', 'nest', 'blink', 'eufy', 'yi']
if ssid and any(k in ssid.lower() for k in camera_keywords):
profile.add_indicator(
IndicatorType.AUDIO_CAPABLE, # Cameras often have mics
f'Potential camera device: {ssid}',
{'ssid': ssid}
)
# 5. Persistent presence
if profile.detection_count >= 3:
profile.add_indicator(
IndicatorType.PERSISTENT,
f'Persistent AP ({profile.detection_count} detections)',
{'count': profile.detection_count}
)
# 6. Stable RSSI (fixed placement)
rssi_stability = profile.get_rssi_stability()
if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
profile.add_indicator(
IndicatorType.STABLE_RSSI,
f'Stable signal (stability: {rssi_stability:.0%})',
{'stability': rssi_stability}
)
# 7. Meeting correlation
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Detected during sensitive period',
{'during_meeting': True}
)
# 8. Strong hidden AP (very suspicious)
if profile.is_hidden and profile.rssi_samples:
latest_rssi = profile.rssi_samples[-1][1]
if latest_rssi > -50:
profile.add_indicator(
IndicatorType.ROGUE_AP,
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
{'rssi': latest_rssi}
)
return profile
def analyze_rf_signal(self, signal: dict) -> DeviceProfile:
"""
Analyze an RF signal for suspicious indicators.
Args:
signal: Dict with frequency, power, bandwidth, modulation, etc.
Returns:
DeviceProfile with risk assessment
"""
frequency = signal.get('frequency', 0)
freq_key = f"{frequency:.3f}"
profile = self.get_or_create_profile(freq_key, 'rf')
# Update profile data
profile.frequency = frequency
profile.name = f'{frequency:.3f} MHz'
profile.bandwidth = signal.get('bandwidth') or profile.bandwidth
profile.modulation = signal.get('modulation') or profile.modulation
# Add power sample
power = signal.get('power', signal.get('level'))
if power:
try:
profile.add_rssi_sample(int(float(power)))
except (ValueError, TypeError):
pass
# Clear previous indicators
profile.indicators = []
# === Detection Logic ===
# 1. Determine frequency band risk
band_info = None
for band in SUSPICIOUS_RF_BANDS:
if band['start'] <= frequency <= band['end']:
band_info = band
break
if band_info:
if band_info['risk'] == 'high':
profile.add_indicator(
IndicatorType.NARROWBAND_SIGNAL,
f"Signal in high-risk band: {band_info['name']}",
{'band': band_info['name'], 'frequency': frequency}
)
else:
profile.add_indicator(
IndicatorType.UNKNOWN_DEVICE,
f"Signal in ISM band: {band_info['name']}",
{'band': band_info['name'], 'frequency': frequency}
)
# 2. Narrowband FM/AM (potential bug)
if profile.modulation and profile.modulation.lower() in ['fm', 'nfm', 'am']:
profile.add_indicator(
IndicatorType.NARROWBAND_SIGNAL,
f'Narrowband {profile.modulation.upper()} signal',
{'modulation': profile.modulation}
)
# 3. Persistent/always-on carrier
if profile.detection_count >= 2:
profile.add_indicator(
IndicatorType.ALWAYS_ON_CARRIER,
f'Persistent carrier ({profile.detection_count} detections)',
{'count': profile.detection_count}
)
# 4. Strong signal (close proximity)
if profile.rssi_samples:
latest_power = profile.rssi_samples[-1][1]
if latest_power > -40:
profile.add_indicator(
IndicatorType.STABLE_RSSI,
f'Strong signal suggesting close proximity ({latest_power} dBm)',
{'power': latest_power}
)
# 5. Meeting correlation
if self.is_during_meeting():
profile.add_indicator(
IndicatorType.MEETING_CORRELATED,
'Signal detected during sensitive period',
{'during_meeting': True}
)
return profile
def correlate_devices(self) -> list[dict]:
"""
Perform cross-protocol correlation analysis.
Identifies devices across protocols that may be related.
Returns:
List of correlation findings
"""
correlations = []
now = datetime.now()
# Get recent devices by protocol
bt_devices = [p for p in self.device_profiles.values()
if p.protocol == 'bluetooth' and
p.last_seen and (now - p.last_seen) < self.correlation_window]
wifi_devices = [p for p in self.device_profiles.values()
if p.protocol == 'wifi' and
p.last_seen and (now - p.last_seen) < self.correlation_window]
rf_signals = [p for p in self.device_profiles.values()
if p.protocol == 'rf' and
p.last_seen and (now - p.last_seen) < self.correlation_window]
# Correlation 1: BLE audio device + RF narrowband signal
audio_bt = [p for p in bt_devices
if any(i.type == IndicatorType.AUDIO_CAPABLE for i in p.indicators)]
narrowband_rf = [p for p in rf_signals
if any(i.type == IndicatorType.NARROWBAND_SIGNAL for i in p.indicators)]
for bt in audio_bt:
for rf in narrowband_rf:
correlation = {
'type': 'bt_audio_rf_narrowband',
'description': 'Audio-capable BLE device detected alongside narrowband RF signal',
'devices': [bt.identifier, rf.identifier],
'protocols': ['bluetooth', 'rf'],
'score_boost': 3,
'significance': 'high',
}
correlations.append(correlation)
# Add cross-protocol indicator to both
bt.add_indicator(
IndicatorType.CROSS_PROTOCOL,
f'Correlated with RF signal at {rf.frequency:.3f} MHz',
{'correlated_device': rf.identifier}
)
rf.add_indicator(
IndicatorType.CROSS_PROTOCOL,
f'Correlated with BLE device {bt.identifier}',
{'correlated_device': bt.identifier}
)
bt.correlated_devices.append(rf.identifier)
rf.correlated_devices.append(bt.identifier)
# Correlation 2: Rogue WiFi AP + RF burst activity
rogue_aps = [p for p in wifi_devices
if any(i.type == IndicatorType.ROGUE_AP for i in p.indicators)]
rf_bursts = [p for p in rf_signals
if any(i.type in [IndicatorType.BURST_TRANSMISSION,
IndicatorType.ALWAYS_ON_CARRIER] for i in p.indicators)]
for ap in rogue_aps:
for rf in rf_bursts:
correlation = {
'type': 'rogue_ap_rf_burst',
'description': 'Rogue AP detected alongside RF transmission',
'devices': [ap.identifier, rf.identifier],
'protocols': ['wifi', 'rf'],
'score_boost': 3,
'significance': 'high',
}
correlations.append(correlation)
ap.add_indicator(
IndicatorType.CROSS_PROTOCOL,
f'Correlated with RF at {rf.frequency:.3f} MHz',
{'correlated_device': rf.identifier}
)
rf.add_indicator(
IndicatorType.CROSS_PROTOCOL,
f'Correlated with AP {ap.ssid or ap.identifier}',
{'correlated_device': ap.identifier}
)
# Correlation 3: Same vendor BLE + WiFi
for bt in bt_devices:
if bt.manufacturer:
for wifi in wifi_devices:
if wifi.manufacturer and bt.manufacturer.lower() in wifi.manufacturer.lower():
correlation = {
'type': 'same_vendor_bt_wifi',
'description': f'Same vendor ({bt.manufacturer}) on BLE and WiFi',
'devices': [bt.identifier, wifi.identifier],
'protocols': ['bluetooth', 'wifi'],
'score_boost': 2,
'significance': 'medium',
}
correlations.append(correlation)
return correlations
def get_high_interest_devices(self) -> list[DeviceProfile]:
"""Get all devices classified as high interest."""
return [p for p in self.device_profiles.values()
if p.risk_level == RiskLevel.HIGH_INTEREST]
def get_all_findings(self) -> dict:
"""
Get comprehensive findings report.
Returns:
Dict with all device profiles, correlations, and summary
"""
correlations = self.correlate_devices()
devices_by_risk = {
'high_interest': [],
'needs_review': [],
'informational': [],
}
for profile in self.device_profiles.values():
devices_by_risk[profile.risk_level.value].append(profile.to_dict())
return {
'timestamp': datetime.now().isoformat(),
'summary': {
'total_devices': len(self.device_profiles),
'high_interest': len(devices_by_risk['high_interest']),
'needs_review': len(devices_by_risk['needs_review']),
'informational': len(devices_by_risk['informational']),
'correlations_found': len(correlations),
},
'devices': devices_by_risk,
'correlations': correlations,
'disclaimer': (
"This system performs wireless and RF surveillance screening. "
"Findings indicate anomalies and indicators, not confirmed surveillance devices."
),
}
def clear_old_profiles(self, max_age_hours: int = 24) -> int:
"""Remove profiles older than specified age."""
cutoff = datetime.now() - timedelta(hours=max_age_hours)
old_keys = [
k for k, v in self.device_profiles.items()
if v.last_seen and v.last_seen < cutoff
]
for key in old_keys:
del self.device_profiles[key]
return len(old_keys)
# Global correlation engine instance
_correlation_engine: CorrelationEngine | None = None
def get_correlation_engine() -> CorrelationEngine:
"""Get or create the global correlation engine."""
global _correlation_engine
if _correlation_engine is None:
_correlation_engine = CorrelationEngine()
return _correlation_engine
def reset_correlation_engine() -> None:
"""Reset the global correlation engine."""
global _correlation_engine
_correlation_engine = CorrelationEngine()

580
utils/tscm/detector.py Normal file
View File

@@ -0,0 +1,580 @@
"""
TSCM Threat Detection Engine
Analyzes WiFi, Bluetooth, and RF data to identify potential surveillance devices
and classify threats based on known patterns and baseline comparison.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
from data.tscm_frequencies import (
BLE_TRACKER_SIGNATURES,
THREAT_TYPES,
WIFI_CAMERA_PATTERNS,
get_frequency_risk,
get_threat_severity,
is_known_tracker,
is_potential_camera,
)
logger = logging.getLogger('intercept.tscm.detector')
# Classification levels for TSCM devices
CLASSIFICATION_LEVELS = {
'informational': {
'color': '#00cc00', # Green
'label': 'Informational',
'description': 'Known device, expected infrastructure, or background noise',
},
'review': {
'color': '#ffcc00', # Yellow
'label': 'Needs Review',
'description': 'Unknown device requiring investigation',
},
'high_interest': {
'color': '#ff3333', # Red
'label': 'High Interest',
'description': 'Suspicious device requiring immediate attention',
},
}
# BLE device types that can transmit audio (potential bugs)
AUDIO_CAPABLE_BLE_NAMES = [
'headphone', 'headset', 'earphone', 'earbud', 'speaker',
'audio', 'mic', 'microphone', 'airpod', 'buds',
'jabra', 'bose', 'sony wf', 'sony wh', 'beats',
'jbl', 'soundcore', 'anker', 'skullcandy',
]
# Device history for tracking repeat detections across scans
_device_history: dict[str, list[datetime]] = {}
_history_window_hours = 24 # Consider detections within 24 hours
def _record_device_seen(identifier: str) -> int:
"""Record a device sighting and return count of times seen."""
now = datetime.now()
if identifier not in _device_history:
_device_history[identifier] = []
# Clean old entries
cutoff = now.timestamp() - (_history_window_hours * 3600)
_device_history[identifier] = [
dt for dt in _device_history[identifier]
if dt.timestamp() > cutoff
]
_device_history[identifier].append(now)
return len(_device_history[identifier])
def _is_audio_capable_ble(name: str | None, device_type: str | None = None) -> bool:
"""Check if a BLE device might be audio-capable."""
if name:
name_lower = name.lower()
for pattern in AUDIO_CAPABLE_BLE_NAMES:
if pattern in name_lower:
return True
if device_type:
type_lower = device_type.lower()
if any(t in type_lower for t in ['audio', 'headset', 'headphone', 'speaker']):
return True
return False
class ThreatDetector:
"""
Analyzes scan results to detect potential surveillance threats.
"""
def __init__(self, baseline: dict | None = None):
"""
Initialize the threat detector.
Args:
baseline: Optional baseline dict containing expected devices
"""
self.baseline = baseline
self.baseline_wifi_macs = set()
self.baseline_bt_macs = set()
self.baseline_rf_freqs = set()
if baseline:
self._load_baseline(baseline)
def _load_baseline(self, baseline: dict) -> None:
"""Load baseline device identifiers for comparison."""
# WiFi networks and clients
for network in baseline.get('wifi_networks', []):
if 'bssid' in network:
self.baseline_wifi_macs.add(network['bssid'].upper())
if 'clients' in network:
for client in network['clients']:
if 'mac' in client:
self.baseline_wifi_macs.add(client['mac'].upper())
# Bluetooth devices
for device in baseline.get('bt_devices', []):
if 'mac' in device:
self.baseline_bt_macs.add(device['mac'].upper())
# RF frequencies (rounded to nearest 0.1 MHz)
for freq in baseline.get('rf_frequencies', []):
if isinstance(freq, dict):
self.baseline_rf_freqs.add(round(freq.get('frequency', 0), 1))
else:
self.baseline_rf_freqs.add(round(freq, 1))
logger.info(
f"Loaded baseline: {len(self.baseline_wifi_macs)} WiFi, "
f"{len(self.baseline_bt_macs)} BT, {len(self.baseline_rf_freqs)} RF"
)
def classify_wifi_device(self, device: dict) -> dict:
"""
Classify a WiFi device into informational/review/high_interest.
Returns:
Dict with 'classification', 'reasons', and metadata
"""
mac = device.get('bssid', device.get('mac', '')).upper()
ssid = device.get('essid', device.get('ssid', ''))
signal = device.get('power', device.get('signal', -100))
reasons = []
classification = 'informational'
# Track repeat detections
times_seen = _record_device_seen(f'wifi:{mac}') if mac else 1
# Check if in baseline (known device)
in_baseline = mac in self.baseline_wifi_macs if self.baseline else False
if in_baseline:
reasons.append('Known device in baseline')
classification = 'informational'
else:
# New/unknown device
reasons.append('New WiFi access point')
classification = 'review'
# Check for suspicious patterns -> high interest
if is_potential_camera(ssid=ssid, mac=mac):
reasons.append('Matches camera device patterns')
classification = 'high_interest'
try:
signal_val = int(signal) if signal else -100
except (ValueError, TypeError):
signal_val = -100
if not ssid and signal and signal_val > -60:
reasons.append('Hidden SSID with strong signal')
classification = 'high_interest'
# Repeat detections across scans
if times_seen >= 3:
reasons.append(f'Repeat detection ({times_seen} times)')
if classification != 'high_interest':
classification = 'high_interest'
return {
'classification': classification,
'reasons': reasons,
'in_baseline': in_baseline,
'times_seen': times_seen,
}
def classify_bt_device(self, device: dict) -> dict:
"""
Classify a Bluetooth device into informational/review/high_interest.
Returns:
Dict with 'classification', 'reasons', and metadata
"""
mac = device.get('mac', device.get('address', '')).upper()
name = device.get('name', '')
rssi = device.get('rssi', device.get('signal', -100))
device_type = device.get('type', '')
manufacturer_data = device.get('manufacturer_data')
reasons = []
classification = 'informational'
tracker_info = None
# Track repeat detections
times_seen = _record_device_seen(f'bt:{mac}') if mac else 1
# Check if in baseline (known device)
in_baseline = mac in self.baseline_bt_macs if self.baseline else False
# Check for trackers (do this early for all devices)
tracker_info = is_known_tracker(name, manufacturer_data)
if in_baseline:
reasons.append('Known device in baseline')
classification = 'informational'
else:
# New/unknown BLE device
if not name or name == 'Unknown':
reasons.append('Unknown BLE device')
classification = 'review'
else:
reasons.append('New Bluetooth device')
classification = 'review'
# Check for trackers -> high interest
if tracker_info:
reasons.append(f"Known tracker: {tracker_info.get('name', 'Unknown')}")
classification = 'high_interest'
# Check for audio-capable devices -> high interest
if _is_audio_capable_ble(name, device_type):
reasons.append('Audio-capable BLE device')
classification = 'high_interest'
# Strong signal from unknown device
try:
rssi_val = int(rssi) if rssi else -100
except (ValueError, TypeError):
rssi_val = -100
if rssi and rssi_val > -50 and not name:
reasons.append('Strong signal from unnamed device')
classification = 'high_interest'
# Repeat detections across scans
if times_seen >= 3:
reasons.append(f'Repeat detection ({times_seen} times)')
if classification != 'high_interest':
classification = 'high_interest'
return {
'classification': classification,
'reasons': reasons,
'in_baseline': in_baseline,
'times_seen': times_seen,
'is_tracker': tracker_info is not None,
'is_audio_capable': _is_audio_capable_ble(name, device_type),
}
def classify_rf_signal(self, signal: dict) -> dict:
"""
Classify an RF signal into informational/review/high_interest.
Returns:
Dict with 'classification', 'reasons', and metadata
"""
frequency = signal.get('frequency', 0)
power = signal.get('power', signal.get('level', -100))
band = signal.get('band', '')
reasons = []
classification = 'informational'
freq_rounded = round(frequency, 1)
# Track repeat detections
times_seen = _record_device_seen(f'rf:{freq_rounded}')
# Check if in baseline (known frequency)
in_baseline = freq_rounded in self.baseline_rf_freqs if self.baseline else False
# Get frequency risk info
risk, band_name = get_frequency_risk(frequency)
if in_baseline:
reasons.append('Known frequency in baseline')
classification = 'informational'
else:
# New/unidentified RF carrier
reasons.append(f'Unidentified RF carrier in {band_name}')
if risk == 'low':
reasons.append('Background RF noise band')
classification = 'review'
elif risk == 'medium':
reasons.append('ISM band signal')
classification = 'review'
elif risk in ['high', 'critical']:
reasons.append(f'High-risk surveillance band: {band_name}')
classification = 'high_interest'
# Strong persistent signal
if power and float(power) > -40:
reasons.append('Strong persistent transmitter')
classification = 'high_interest'
# Repeat detections (persistent transmitter)
if times_seen >= 2:
reasons.append(f'Persistent transmitter ({times_seen} detections)')
classification = 'high_interest'
return {
'classification': classification,
'reasons': reasons,
'in_baseline': in_baseline,
'times_seen': times_seen,
'risk_level': risk,
'band_name': band_name,
}
def analyze_wifi_device(self, device: dict) -> dict | None:
"""
Analyze a WiFi device for threats.
Args:
device: WiFi device dict with bssid, essid, etc.
Returns:
Threat dict if threat detected, None otherwise
"""
mac = device.get('bssid', device.get('mac', '')).upper()
ssid = device.get('essid', device.get('ssid', ''))
vendor = device.get('vendor', '')
signal = device.get('power', device.get('signal', -100))
threats = []
# Check if new device (not in baseline)
if self.baseline and mac and mac not in self.baseline_wifi_macs:
threats.append({
'type': 'new_device',
'severity': get_threat_severity('new_device', {'signal_strength': signal}),
'reason': 'Device not present in baseline',
})
# Check for hidden camera patterns
if is_potential_camera(ssid=ssid, mac=mac, vendor=vendor):
threats.append({
'type': 'hidden_camera',
'severity': get_threat_severity('hidden_camera', {'signal_strength': signal}),
'reason': 'Device matches WiFi camera patterns',
})
# Check for hidden SSID with strong signal
try:
signal_int = int(signal) if signal else -100
except (ValueError, TypeError):
signal_int = -100
if not ssid and signal and signal_int > -60:
threats.append({
'type': 'anomaly',
'severity': 'medium',
'reason': 'Hidden SSID with strong signal',
})
if not threats:
return None
# Return highest severity threat
threats.sort(key=lambda t: ['low', 'medium', 'high', 'critical'].index(t['severity']), reverse=True)
return {
'threat_type': threats[0]['type'],
'severity': threats[0]['severity'],
'source': 'wifi',
'identifier': mac,
'name': ssid or 'Hidden Network',
'signal_strength': signal,
'details': {
'all_threats': threats,
'vendor': vendor,
'ssid': ssid,
}
}
def analyze_bt_device(self, device: dict) -> dict | None:
"""
Analyze a Bluetooth device for threats.
Args:
device: BT device dict with mac, name, rssi, etc.
Returns:
Threat dict if threat detected, None otherwise
"""
mac = device.get('mac', device.get('address', '')).upper()
name = device.get('name', '')
rssi = device.get('rssi', device.get('signal', -100))
manufacturer = device.get('manufacturer', '')
device_type = device.get('type', '')
manufacturer_data = device.get('manufacturer_data')
threats = []
# Check if new device (not in baseline)
if self.baseline and mac and mac not in self.baseline_bt_macs:
threats.append({
'type': 'new_device',
'severity': get_threat_severity('new_device', {'signal_strength': rssi}),
'reason': 'Device not present in baseline',
})
# Check for known trackers
tracker_info = is_known_tracker(name, manufacturer_data)
if tracker_info:
threats.append({
'type': 'tracker',
'severity': tracker_info.get('risk', 'high'),
'reason': f"Known tracker detected: {tracker_info.get('name', 'Unknown')}",
'tracker_type': tracker_info.get('name'),
})
# Check for suspicious BLE beacons (unnamed, persistent)
try:
rssi_int = int(rssi) if rssi else -100
except (ValueError, TypeError):
rssi_int = -100
if not name and rssi and rssi_int > -70:
threats.append({
'type': 'anomaly',
'severity': 'medium',
'reason': 'Unnamed BLE device with strong signal',
})
if not threats:
return None
# Return highest severity threat
threats.sort(key=lambda t: ['low', 'medium', 'high', 'critical'].index(t['severity']), reverse=True)
return {
'threat_type': threats[0]['type'],
'severity': threats[0]['severity'],
'source': 'bluetooth',
'identifier': mac,
'name': name or 'Unknown BLE Device',
'signal_strength': rssi,
'details': {
'all_threats': threats,
'manufacturer': manufacturer,
'device_type': device_type,
}
}
def analyze_rf_signal(self, signal: dict) -> dict | None:
"""
Analyze an RF signal for threats.
Args:
signal: RF signal dict with frequency, level, etc.
Returns:
Threat dict if threat detected, None otherwise
"""
frequency = signal.get('frequency', 0)
level = signal.get('level', signal.get('power', -100))
modulation = signal.get('modulation', '')
if not frequency:
return None
threats = []
freq_rounded = round(frequency, 1)
# Check if new frequency (not in baseline)
if self.baseline and freq_rounded not in self.baseline_rf_freqs:
risk, band_name = get_frequency_risk(frequency)
threats.append({
'type': 'unknown_signal',
'severity': risk,
'reason': f'New signal in {band_name}',
})
# Check frequency risk even without baseline
risk, band_name = get_frequency_risk(frequency)
if risk in ['high', 'critical']:
threats.append({
'type': 'unknown_signal',
'severity': risk,
'reason': f'Signal in high-risk band: {band_name}',
})
if not threats:
return None
# Return highest severity threat
threats.sort(key=lambda t: ['low', 'medium', 'high', 'critical'].index(t['severity']), reverse=True)
return {
'threat_type': threats[0]['type'],
'severity': threats[0]['severity'],
'source': 'rf',
'identifier': f'{frequency:.3f} MHz',
'name': f'RF Signal @ {frequency:.3f} MHz',
'signal_strength': level,
'frequency': frequency,
'details': {
'all_threats': threats,
'modulation': modulation,
'band_name': band_name,
}
}
def analyze_all(
self,
wifi_devices: list[dict] | None = None,
bt_devices: list[dict] | None = None,
rf_signals: list[dict] | None = None
) -> list[dict]:
"""
Analyze all provided devices and signals for threats.
Returns:
List of detected threats sorted by severity
"""
threats = []
if wifi_devices:
for device in wifi_devices:
threat = self.analyze_wifi_device(device)
if threat:
threats.append(threat)
if bt_devices:
for device in bt_devices:
threat = self.analyze_bt_device(device)
if threat:
threats.append(threat)
if rf_signals:
for signal in rf_signals:
threat = self.analyze_rf_signal(signal)
if threat:
threats.append(threat)
# Sort by severity (critical first)
severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
threats.sort(key=lambda t: severity_order.get(t.get('severity', 'low'), 3))
return threats
def classify_device_threat(
source: str,
device: dict,
baseline: dict | None = None
) -> dict | None:
"""
Convenience function to classify a single device.
Args:
source: Device source ('wifi', 'bluetooth', 'rf')
device: Device data dict
baseline: Optional baseline for comparison
Returns:
Threat dict if threat detected, None otherwise
"""
detector = ThreatDetector(baseline)
if source == 'wifi':
return detector.analyze_wifi_device(device)
elif source == 'bluetooth':
return detector.analyze_bt_device(device)
elif source == 'rf':
return detector.analyze_rf_signal(device)
return None

File diff suppressed because it is too large Load Diff

813
utils/tscm/reports.py Normal file
View File

@@ -0,0 +1,813 @@
"""
TSCM Report Generation Module
Generates:
1. Client-safe PDF reports with executive summary
2. Technical annex (JSON + CSV) with device timelines and indicators
DISCLAIMER: All reports include mandatory disclaimers.
No packet data. No claims of confirmed surveillance.
"""
from __future__ import annotations
import csv
import io
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
logger = logging.getLogger('intercept.tscm.reports')
# =============================================================================
# Report Data Structures
# =============================================================================
@dataclass
class ReportFinding:
"""A single finding for the report."""
identifier: str
protocol: str
name: Optional[str]
risk_level: str
risk_score: int
description: str
indicators: list[dict] = field(default_factory=list)
recommended_action: str = ''
playbook_reference: str = ''
@dataclass
class ReportMeetingSummary:
"""Meeting window summary for report."""
name: Optional[str]
start_time: str
end_time: Optional[str]
duration_minutes: float
devices_first_seen: int
behavior_changes: int
high_interest_devices: int
@dataclass
class TSCMReport:
"""
Complete TSCM sweep report.
Contains all data needed for both client-safe PDF and technical annex.
"""
# Report metadata
report_id: str
generated_at: datetime
sweep_id: int
sweep_type: str
# Location and context
location: Optional[str] = None
baseline_id: Optional[int] = None
baseline_name: Optional[str] = None
# Executive summary
executive_summary: str = ''
overall_risk_assessment: str = 'low' # low, moderate, elevated, high
key_findings_count: int = 0
# Capabilities used
capabilities: dict = field(default_factory=dict)
limitations: list[str] = field(default_factory=list)
# Findings by risk tier
high_interest_findings: list[ReportFinding] = field(default_factory=list)
needs_review_findings: list[ReportFinding] = field(default_factory=list)
informational_findings: list[ReportFinding] = field(default_factory=list)
# Meeting window summaries
meeting_summaries: list[ReportMeetingSummary] = field(default_factory=list)
# Statistics
total_devices_scanned: int = 0
wifi_devices: int = 0
bluetooth_devices: int = 0
rf_signals: int = 0
new_devices: int = 0
missing_devices: int = 0
# Sweep duration
sweep_start: Optional[datetime] = None
sweep_end: Optional[datetime] = None
duration_minutes: float = 0.0
# Technical data (for annex only)
device_timelines: list[dict] = field(default_factory=list)
all_indicators: list[dict] = field(default_factory=list)
baseline_diff: Optional[dict] = None
correlation_data: list[dict] = field(default_factory=list)
# =============================================================================
# Disclaimer Text
# =============================================================================
REPORT_DISCLAIMER = """
IMPORTANT DISCLAIMER
This report documents the findings of a Technical Surveillance Countermeasures
(TSCM) sweep conducted using electronic detection equipment. The following
limitations and considerations apply:
1. DETECTION LIMITATIONS: No TSCM sweep can guarantee detection of all
surveillance devices. Sophisticated devices may evade detection.
2. FINDINGS ARE INDICATORS: All findings represent patterns and indicators,
NOT confirmed surveillance devices. Each finding requires professional
interpretation and may have legitimate explanations.
3. ENVIRONMENTAL FACTORS: Wireless signals are affected by building
construction, interference, and other environmental factors that may
impact detection accuracy.
4. POINT-IN-TIME ASSESSMENT: This report reflects conditions at the time
of the sweep. Conditions may change after the assessment.
5. NOT LEGAL ADVICE: This report does not constitute legal advice. Consult
qualified legal counsel for guidance on surveillance-related matters.
6. PRIVACY CONSIDERATIONS: Some detected devices may be legitimate personal
devices of authorized individuals.
This report should be treated as confidential and distributed only to
authorized personnel on a need-to-know basis.
"""
ANNEX_DISCLAIMER = """
TECHNICAL ANNEX DISCLAIMER
This annex contains detailed technical data from the TSCM sweep. This data
is provided for documentation and audit purposes.
- No raw packet captures or intercepted communications are included
- Device identifiers (MAC addresses) are included for tracking purposes
- Signal strength values are approximate and environment-dependent
- Timeline data is time-bucketed to preserve privacy
- All interpretations require professional TSCM expertise
This data should be handled according to organizational data protection
policies and applicable privacy regulations.
"""
# =============================================================================
# Report Generation Functions
# =============================================================================
def generate_executive_summary(report: TSCMReport) -> str:
"""Generate executive summary text."""
lines = []
# Opening
lines.append(f"TSCM Sweep Report - {report.location or 'Location Not Specified'}")
lines.append(f"Conducted: {report.sweep_start.strftime('%Y-%m-%d %H:%M') if report.sweep_start else 'Unknown'}")
lines.append(f"Duration: {report.duration_minutes:.0f} minutes")
lines.append("")
# Overall assessment
assessment_text = {
'low': 'No significant indicators of surveillance activity were detected.',
'moderate': 'Some devices require review but no confirmed surveillance indicators.',
'elevated': 'Multiple indicators warrant further investigation.',
'high': 'Significant indicators detected requiring immediate attention.',
}
lines.append(f"OVERALL ASSESSMENT: {report.overall_risk_assessment.upper()}")
lines.append(assessment_text.get(report.overall_risk_assessment, ''))
lines.append("")
# Key statistics
lines.append("SCAN STATISTICS:")
lines.append(f" - Total devices scanned: {report.total_devices_scanned}")
lines.append(f" - WiFi access points: {report.wifi_devices}")
lines.append(f" - Bluetooth devices: {report.bluetooth_devices}")
lines.append(f" - RF signals: {report.rf_signals}")
lines.append("")
# Findings summary
lines.append("FINDINGS SUMMARY:")
lines.append(f" - High Interest (require investigation): {len(report.high_interest_findings)}")
lines.append(f" - Needs Review: {len(report.needs_review_findings)}")
lines.append(f" - Informational: {len(report.informational_findings)}")
lines.append("")
# Baseline comparison if available
if report.baseline_name:
lines.append(f"BASELINE COMPARISON (vs '{report.baseline_name}'):")
lines.append(f" - New devices: {report.new_devices}")
lines.append(f" - Missing devices: {report.missing_devices}")
lines.append("")
# Meeting window summary if available
if report.meeting_summaries:
lines.append("MEETING WINDOW ACTIVITY:")
for meeting in report.meeting_summaries:
lines.append(f" - {meeting.name or 'Unnamed meeting'}: "
f"{meeting.devices_first_seen} new devices, "
f"{meeting.high_interest_devices} high interest")
lines.append("")
# Limitations
if report.limitations:
lines.append("SWEEP LIMITATIONS:")
for limit in report.limitations[:3]: # Top 3 limitations
lines.append(f" - {limit}")
lines.append("")
return "\n".join(lines)
def generate_findings_section(findings: list[ReportFinding], title: str) -> str:
"""Generate a findings section for the report."""
if not findings:
return f"{title}\n\nNo findings in this category.\n"
lines = [title, "=" * len(title), ""]
for i, finding in enumerate(findings, 1):
lines.append(f"{i}. {finding.name or finding.identifier}")
lines.append(f" Protocol: {finding.protocol.upper()}")
lines.append(f" Identifier: {finding.identifier}")
lines.append(f" Risk Score: {finding.risk_score}")
lines.append(f" Description: {finding.description}")
if finding.indicators:
lines.append(" Indicators:")
for ind in finding.indicators[:5]: # Limit to 5 indicators
lines.append(f" - {ind.get('type', 'unknown')}: {ind.get('description', '')}")
lines.append(f" Recommended Action: {finding.recommended_action}")
if finding.playbook_reference:
lines.append(f" Reference: {finding.playbook_reference}")
lines.append("")
return "\n".join(lines)
def generate_meeting_section(summaries: list[ReportMeetingSummary]) -> str:
"""Generate meeting window summary section."""
if not summaries:
return "MEETING WINDOW SUMMARY\n\nNo meeting windows were marked during this sweep.\n"
lines = ["MEETING WINDOW SUMMARY", "=" * 22, ""]
for meeting in summaries:
lines.append(f"Meeting: {meeting.name or 'Unnamed'}")
lines.append(f" Time: {meeting.start_time} - {meeting.end_time or 'ongoing'}")
lines.append(f" Duration: {meeting.duration_minutes:.0f} minutes")
lines.append(f" Devices first seen during meeting: {meeting.devices_first_seen}")
lines.append(f" Behavior changes detected: {meeting.behavior_changes}")
lines.append(f" High interest devices active: {meeting.high_interest_devices}")
if meeting.devices_first_seen > 0 or meeting.high_interest_devices > 0:
lines.append(" NOTE: Meeting-correlated activity detected - see findings for details")
lines.append("")
lines.append("Meeting-correlated activity indicates temporal correlation only.")
lines.append("Devices appearing during meetings may have legitimate explanations.")
lines.append("")
return "\n".join(lines)
def generate_pdf_content(report: TSCMReport) -> str:
"""
Generate complete PDF report content.
Returns plain text that can be converted to PDF.
For actual PDF generation, use a library like reportlab or weasyprint.
"""
sections = []
# Header
sections.append("=" * 70)
sections.append("TECHNICAL SURVEILLANCE COUNTERMEASURES (TSCM) SWEEP REPORT")
sections.append("=" * 70)
sections.append("")
sections.append(f"Report ID: {report.report_id}")
sections.append(f"Generated: {report.generated_at.strftime('%Y-%m-%d %H:%M:%S')}")
sections.append(f"Sweep ID: {report.sweep_id}")
sections.append("")
# Executive Summary
sections.append("-" * 70)
sections.append("EXECUTIVE SUMMARY")
sections.append("-" * 70)
sections.append(report.executive_summary or generate_executive_summary(report))
sections.append("")
# High Interest Findings
if report.high_interest_findings:
sections.append("-" * 70)
sections.append(generate_findings_section(
report.high_interest_findings,
"HIGH INTEREST FINDINGS"
))
# Needs Review Findings
if report.needs_review_findings:
sections.append("-" * 70)
sections.append(generate_findings_section(
report.needs_review_findings,
"FINDINGS REQUIRING REVIEW"
))
# Meeting Window Summary
if report.meeting_summaries:
sections.append("-" * 70)
sections.append(generate_meeting_section(report.meeting_summaries))
# Capabilities & Limitations
sections.append("-" * 70)
sections.append("SWEEP CAPABILITIES & LIMITATIONS")
sections.append("=" * 33)
sections.append("")
if report.capabilities:
caps = report.capabilities
sections.append("Equipment Used:")
if caps.get('wifi', {}).get('mode') != 'unavailable':
sections.append(f" - WiFi: {caps.get('wifi', {}).get('mode', 'unknown')} mode")
if caps.get('bluetooth', {}).get('mode') != 'unavailable':
sections.append(f" - Bluetooth: {caps.get('bluetooth', {}).get('mode', 'unknown')}")
if caps.get('rf', {}).get('available'):
sections.append(f" - RF/SDR: {caps.get('rf', {}).get('device_type', 'unknown')}")
sections.append("")
if report.limitations:
sections.append("Limitations:")
for limit in report.limitations:
sections.append(f" - {limit}")
sections.append("")
# Disclaimer
sections.append("-" * 70)
sections.append(REPORT_DISCLAIMER)
# Footer
sections.append("")
sections.append("=" * 70)
sections.append("END OF REPORT")
sections.append("=" * 70)
return "\n".join(sections)
def generate_technical_annex_json(report: TSCMReport) -> dict:
"""
Generate technical annex as JSON.
Contains detailed device timelines, all indicators, and raw data
for audit and further analysis.
"""
return {
'annex_type': 'tscm_technical_annex',
'report_id': report.report_id,
'generated_at': report.generated_at.isoformat(),
'sweep_id': report.sweep_id,
'disclaimer': ANNEX_DISCLAIMER.strip(),
'sweep_details': {
'type': report.sweep_type,
'location': report.location,
'start_time': report.sweep_start.isoformat() if report.sweep_start else None,
'end_time': report.sweep_end.isoformat() if report.sweep_end else None,
'duration_minutes': report.duration_minutes,
'baseline_id': report.baseline_id,
'baseline_name': report.baseline_name,
},
'capabilities': report.capabilities,
'limitations': report.limitations,
'statistics': {
'total_devices': report.total_devices_scanned,
'wifi_devices': report.wifi_devices,
'bluetooth_devices': report.bluetooth_devices,
'rf_signals': report.rf_signals,
'new_devices': report.new_devices,
'missing_devices': report.missing_devices,
'high_interest_count': len(report.high_interest_findings),
'needs_review_count': len(report.needs_review_findings),
'informational_count': len(report.informational_findings),
},
'findings': {
'high_interest': [
{
'identifier': f.identifier,
'protocol': f.protocol,
'name': f.name,
'risk_score': f.risk_score,
'description': f.description,
'indicators': f.indicators,
'recommended_action': f.recommended_action,
}
for f in report.high_interest_findings
],
'needs_review': [
{
'identifier': f.identifier,
'protocol': f.protocol,
'name': f.name,
'risk_score': f.risk_score,
'description': f.description,
'indicators': f.indicators,
}
for f in report.needs_review_findings
],
},
'meeting_windows': [
{
'name': m.name,
'start_time': m.start_time,
'end_time': m.end_time,
'duration_minutes': m.duration_minutes,
'devices_first_seen': m.devices_first_seen,
'behavior_changes': m.behavior_changes,
'high_interest_devices': m.high_interest_devices,
}
for m in report.meeting_summaries
],
'device_timelines': report.device_timelines,
'all_indicators': report.all_indicators,
'baseline_diff': report.baseline_diff,
'correlations': report.correlation_data,
}
def generate_technical_annex_csv(report: TSCMReport) -> str:
"""
Generate device timeline data as CSV.
Provides spreadsheet-compatible format for further analysis.
"""
output = io.StringIO()
writer = csv.writer(output)
# Header
writer.writerow([
'identifier',
'protocol',
'name',
'risk_level',
'risk_score',
'first_seen',
'last_seen',
'observation_count',
'rssi_min',
'rssi_max',
'rssi_mean',
'rssi_stability',
'movement_pattern',
'meeting_correlated',
'indicators',
])
# Device data from timelines
for timeline in report.device_timelines:
indicators_str = '; '.join(
f"{i.get('type', '')}({i.get('score', 0)})"
for i in timeline.get('indicators', [])
)
signal = timeline.get('signal', {})
metrics = timeline.get('metrics', {})
movement = timeline.get('movement', {})
meeting = timeline.get('meeting_correlation', {})
writer.writerow([
timeline.get('identifier', ''),
timeline.get('protocol', ''),
timeline.get('name', ''),
timeline.get('risk_level', 'informational'),
timeline.get('risk_score', 0),
metrics.get('first_seen', ''),
metrics.get('last_seen', ''),
metrics.get('total_observations', 0),
signal.get('rssi_min', ''),
signal.get('rssi_max', ''),
signal.get('rssi_mean', ''),
signal.get('stability', ''),
movement.get('pattern', ''),
meeting.get('correlated', False),
indicators_str,
])
# Also add findings summary
writer.writerow([])
writer.writerow(['--- FINDINGS SUMMARY ---'])
writer.writerow(['identifier', 'protocol', 'risk_level', 'risk_score', 'description', 'recommended_action'])
all_findings = (
report.high_interest_findings +
report.needs_review_findings
)
for finding in all_findings:
writer.writerow([
finding.identifier,
finding.protocol,
finding.risk_level,
finding.risk_score,
finding.description,
finding.recommended_action,
])
return output.getvalue()
# =============================================================================
# Report Builder
# =============================================================================
class TSCMReportBuilder:
"""
Builder for constructing TSCM reports from sweep data.
Usage:
builder = TSCMReportBuilder(sweep_id=123)
builder.set_location("Conference Room A")
builder.add_capabilities(capabilities_dict)
builder.add_finding(finding)
report = builder.build()
"""
def __init__(self, sweep_id: int):
self.sweep_id = sweep_id
self.report = TSCMReport(
report_id=f"TSCM-{sweep_id}-{datetime.now().strftime('%Y%m%d%H%M%S')}",
generated_at=datetime.now(),
sweep_id=sweep_id,
sweep_type='standard',
)
def set_sweep_type(self, sweep_type: str) -> 'TSCMReportBuilder':
self.report.sweep_type = sweep_type
return self
def set_location(self, location: str) -> 'TSCMReportBuilder':
self.report.location = location
return self
def set_baseline(self, baseline_id: int, baseline_name: str) -> 'TSCMReportBuilder':
self.report.baseline_id = baseline_id
self.report.baseline_name = baseline_name
return self
def set_sweep_times(
self,
start: datetime,
end: Optional[datetime] = None
) -> 'TSCMReportBuilder':
self.report.sweep_start = start
self.report.sweep_end = end or datetime.now()
self.report.duration_minutes = (
(self.report.sweep_end - self.report.sweep_start).total_seconds() / 60
)
return self
def add_capabilities(self, capabilities: dict) -> 'TSCMReportBuilder':
self.report.capabilities = capabilities
self.report.limitations = capabilities.get('all_limitations', [])
return self
def add_finding(self, finding: ReportFinding) -> 'TSCMReportBuilder':
if finding.risk_level == 'high_interest':
self.report.high_interest_findings.append(finding)
elif finding.risk_level in ['review', 'needs_review']:
self.report.needs_review_findings.append(finding)
else:
self.report.informational_findings.append(finding)
return self
def add_findings_from_profiles(self, profiles: list[dict]) -> 'TSCMReportBuilder':
"""Add findings from correlation engine device profiles."""
for profile in profiles:
finding = ReportFinding(
identifier=profile.get('identifier', ''),
protocol=profile.get('protocol', ''),
name=profile.get('name'),
risk_level=profile.get('risk_level', 'informational'),
risk_score=profile.get('total_score', 0),
description=self._generate_finding_description(profile),
indicators=profile.get('indicators', []),
recommended_action=profile.get('recommended_action', 'monitor'),
playbook_reference=self._get_playbook_reference(profile),
)
self.add_finding(finding)
return self
def _generate_finding_description(self, profile: dict) -> str:
"""Generate description from profile indicators."""
indicators = profile.get('indicators', [])
if not indicators:
return f"{profile.get('protocol', 'Unknown').upper()} device detected"
# Use first indicator as primary description
primary = indicators[0]
desc = primary.get('description', 'Pattern detected')
if len(indicators) > 1:
desc += f" (+{len(indicators) - 1} additional indicators)"
return desc
def _get_playbook_reference(self, profile: dict) -> str:
"""Get playbook reference based on profile."""
risk_level = profile.get('risk_level', 'informational')
indicators = profile.get('indicators', [])
# Check for tracker
tracker_types = ['airtag_detected', 'tile_detected', 'smarttag_detected', 'known_tracker']
if any(i.get('type') in tracker_types for i in indicators):
if risk_level == 'high_interest':
return 'PB-001 (Tracker Detection)'
if risk_level == 'high_interest':
return 'PB-002 (Suspicious Device)'
elif risk_level in ['review', 'needs_review']:
return 'PB-003 (Unknown Device)'
return ''
def add_meeting_summary(self, summary: dict) -> 'TSCMReportBuilder':
"""Add meeting window summary."""
meeting = ReportMeetingSummary(
name=summary.get('name'),
start_time=summary.get('start_time', ''),
end_time=summary.get('end_time'),
duration_minutes=summary.get('duration_minutes', 0),
devices_first_seen=summary.get('devices_first_seen', 0),
behavior_changes=summary.get('behavior_changes', 0),
high_interest_devices=summary.get('high_interest_devices', 0),
)
self.report.meeting_summaries.append(meeting)
return self
def add_statistics(
self,
wifi: int = 0,
bluetooth: int = 0,
rf: int = 0,
new: int = 0,
missing: int = 0
) -> 'TSCMReportBuilder':
self.report.wifi_devices = wifi
self.report.bluetooth_devices = bluetooth
self.report.rf_signals = rf
self.report.total_devices_scanned = wifi + bluetooth + rf
self.report.new_devices = new
self.report.missing_devices = missing
return self
def add_device_timelines(self, timelines: list[dict]) -> 'TSCMReportBuilder':
self.report.device_timelines = timelines
return self
def add_all_indicators(self, indicators: list[dict]) -> 'TSCMReportBuilder':
self.report.all_indicators = indicators
return self
def add_baseline_diff(self, diff: dict) -> 'TSCMReportBuilder':
self.report.baseline_diff = diff
return self
def add_correlations(self, correlations: list[dict]) -> 'TSCMReportBuilder':
self.report.correlation_data = correlations
return self
def build(self) -> TSCMReport:
"""Build and return the complete report."""
# Calculate overall risk assessment
if self.report.high_interest_findings:
if len(self.report.high_interest_findings) >= 3:
self.report.overall_risk_assessment = 'high'
else:
self.report.overall_risk_assessment = 'elevated'
elif self.report.needs_review_findings:
self.report.overall_risk_assessment = 'moderate'
else:
self.report.overall_risk_assessment = 'low'
self.report.key_findings_count = (
len(self.report.high_interest_findings) +
len(self.report.needs_review_findings)
)
# Generate executive summary
self.report.executive_summary = generate_executive_summary(self.report)
return self.report
# =============================================================================
# Report Generation API Functions
# =============================================================================
def generate_report(
sweep_id: int,
sweep_data: dict,
device_profiles: list[dict],
capabilities: dict,
timelines: list[dict],
baseline_diff: Optional[dict] = None,
meeting_summaries: Optional[list[dict]] = None,
correlations: Optional[list[dict]] = None,
) -> TSCMReport:
"""
Generate a complete TSCM report from sweep data.
Args:
sweep_id: Sweep ID
sweep_data: Sweep dict from database
device_profiles: List of DeviceProfile dicts from correlation engine
capabilities: Capabilities dict
timelines: Device timeline dicts
baseline_diff: Optional baseline diff dict
meeting_summaries: Optional meeting summaries
correlations: Optional correlation data
Returns:
Complete TSCMReport
"""
builder = TSCMReportBuilder(sweep_id)
# Basic info
builder.set_sweep_type(sweep_data.get('sweep_type', 'standard'))
# Parse times
started_at = sweep_data.get('started_at')
completed_at = sweep_data.get('completed_at')
if started_at:
if isinstance(started_at, str):
started_at = datetime.fromisoformat(started_at.replace('Z', '+00:00')).replace(tzinfo=None)
if completed_at:
if isinstance(completed_at, str):
completed_at = datetime.fromisoformat(completed_at.replace('Z', '+00:00')).replace(tzinfo=None)
builder.set_sweep_times(started_at, completed_at)
# Capabilities
builder.add_capabilities(capabilities)
# Add findings from profiles
builder.add_findings_from_profiles(device_profiles)
# Statistics
results = sweep_data.get('results', {})
builder.add_statistics(
wifi=len(results.get('wifi', [])),
bluetooth=len(results.get('bluetooth', [])),
rf=len(results.get('rf', [])),
new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0,
missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0,
)
# Technical data
builder.add_device_timelines(timelines)
if baseline_diff:
builder.add_baseline_diff(baseline_diff)
if meeting_summaries:
for summary in meeting_summaries:
builder.add_meeting_summary(summary)
if correlations:
builder.add_correlations(correlations)
# Extract all indicators
all_indicators = []
for profile in device_profiles:
for ind in profile.get('indicators', []):
all_indicators.append({
'device': profile.get('identifier'),
'protocol': profile.get('protocol'),
**ind
})
builder.add_all_indicators(all_indicators)
return builder.build()
def get_pdf_report(report: TSCMReport) -> str:
"""Get PDF-ready report content."""
return generate_pdf_content(report)
def get_json_annex(report: TSCMReport) -> dict:
"""Get JSON technical annex."""
return generate_technical_annex_json(report)
def get_csv_annex(report: TSCMReport) -> str:
"""Get CSV technical annex."""
return generate_technical_annex_csv(report)