mirror of
https://github.com/smittix/intercept.git
synced 2026-04-25 07:10:00 -07:00
Improve TSCM detection and include WiFi clients
This commit is contained in:
@@ -118,10 +118,15 @@ class DeviceProfile:
|
||||
identifier: str # MAC, BSSID, or frequency
|
||||
protocol: str # 'bluetooth', 'wifi', 'rf'
|
||||
|
||||
# Device info
|
||||
name: Optional[str] = None
|
||||
manufacturer: Optional[str] = None
|
||||
device_type: Optional[str] = None
|
||||
# Device info
|
||||
name: Optional[str] = None
|
||||
manufacturer: Optional[str] = None
|
||||
device_type: Optional[str] = None
|
||||
tracker_type: Optional[str] = None
|
||||
tracker_name: Optional[str] = None
|
||||
tracker_confidence: Optional[str] = None
|
||||
tracker_confidence_score: Optional[float] = None
|
||||
tracker_evidence: list[str] = field(default_factory=list)
|
||||
|
||||
# Bluetooth-specific
|
||||
services: list[str] = field(default_factory=list)
|
||||
@@ -231,14 +236,19 @@ class DeviceProfile:
|
||||
indicator_count = len(self.indicators)
|
||||
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert to dictionary for JSON serialization."""
|
||||
return {
|
||||
'identifier': self.identifier,
|
||||
'protocol': self.protocol,
|
||||
'name': self.name,
|
||||
'manufacturer': self.manufacturer,
|
||||
'device_type': self.device_type,
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert to dictionary for JSON serialization."""
|
||||
return {
|
||||
'identifier': self.identifier,
|
||||
'protocol': self.protocol,
|
||||
'name': self.name,
|
||||
'manufacturer': self.manufacturer,
|
||||
'device_type': self.device_type,
|
||||
'tracker_type': self.tracker_type,
|
||||
'tracker_name': self.tracker_name,
|
||||
'tracker_confidence': self.tracker_confidence,
|
||||
'tracker_confidence_score': self.tracker_confidence_score,
|
||||
'tracker_evidence': self.tracker_evidence,
|
||||
'ssid': self.ssid,
|
||||
'frequency': self.frequency,
|
||||
'first_seen': self.first_seen.isoformat() if self.first_seen else None,
|
||||
@@ -266,14 +276,33 @@ class DeviceProfile:
|
||||
|
||||
|
||||
# Known audio-capable BLE service UUIDs
|
||||
AUDIO_SERVICE_UUIDS = [
|
||||
'0000110b-0000-1000-8000-00805f9b34fb', # A2DP Sink
|
||||
'0000110a-0000-1000-8000-00805f9b34fb', # A2DP Source
|
||||
'0000111e-0000-1000-8000-00805f9b34fb', # Handsfree
|
||||
'0000111f-0000-1000-8000-00805f9b34fb', # Handsfree Audio Gateway
|
||||
'00001108-0000-1000-8000-00805f9b34fb', # Headset
|
||||
'00001203-0000-1000-8000-00805f9b34fb', # Generic Audio
|
||||
]
|
||||
AUDIO_SERVICE_UUIDS = [
|
||||
'0000110b-0000-1000-8000-00805f9b34fb', # A2DP Sink
|
||||
'0000110a-0000-1000-8000-00805f9b34fb', # A2DP Source
|
||||
'0000111e-0000-1000-8000-00805f9b34fb', # Handsfree
|
||||
'0000111f-0000-1000-8000-00805f9b34fb', # Handsfree Audio Gateway
|
||||
'00001108-0000-1000-8000-00805f9b34fb', # Headset
|
||||
'00001203-0000-1000-8000-00805f9b34fb', # Generic Audio
|
||||
]
|
||||
|
||||
_BT_BASE_UUID_SUFFIX = '-0000-1000-8000-00805f9b34fb'
|
||||
|
||||
|
||||
def _normalize_bt_uuid(value: str) -> str:
|
||||
"""Normalize BLE UUIDs to 16-bit where possible."""
|
||||
if not value:
|
||||
return ''
|
||||
uuid = str(value).lower().strip()
|
||||
if uuid.startswith('0x'):
|
||||
uuid = uuid[2:]
|
||||
if uuid.endswith(_BT_BASE_UUID_SUFFIX) and len(uuid) >= 8:
|
||||
return uuid[4:8]
|
||||
if len(uuid) == 4:
|
||||
return uuid
|
||||
return uuid
|
||||
|
||||
|
||||
AUDIO_SERVICE_UUIDS_16 = {_normalize_bt_uuid(u) for u in AUDIO_SERVICE_UUIDS}
|
||||
|
||||
# Generic chipset vendors (often used in covert devices)
|
||||
GENERIC_CHIPSET_VENDORS = [
|
||||
@@ -415,10 +444,24 @@ class CorrelationEngine:
|
||||
# Update profile data
|
||||
profile.name = device.get('name') or profile.name
|
||||
profile.manufacturer = device.get('manufacturer') or profile.manufacturer
|
||||
profile.device_type = device.get('type') or profile.device_type
|
||||
profile.services = device.get('services', []) or profile.services
|
||||
profile.company_id = device.get('company_id') or profile.company_id
|
||||
profile.advertising_interval = device.get('advertising_interval') or profile.advertising_interval
|
||||
profile.device_type = device.get('type') or profile.device_type
|
||||
services = device.get('services')
|
||||
if not services:
|
||||
services = device.get('service_uuids')
|
||||
profile.services = services or profile.services
|
||||
profile.company_id = device.get('company_id') or profile.company_id
|
||||
profile.advertising_interval = device.get('advertising_interval') or profile.advertising_interval
|
||||
tracker_data = device.get('tracker') or {}
|
||||
if tracker_data:
|
||||
profile.tracker_type = tracker_data.get('type') or profile.tracker_type
|
||||
profile.tracker_name = tracker_data.get('name') or profile.tracker_name
|
||||
profile.tracker_confidence = tracker_data.get('confidence') or profile.tracker_confidence
|
||||
profile.tracker_confidence_score = tracker_data.get('confidence_score') or profile.tracker_confidence_score
|
||||
evidence = tracker_data.get('evidence')
|
||||
if isinstance(evidence, list):
|
||||
profile.tracker_evidence = evidence
|
||||
elif evidence:
|
||||
profile.tracker_evidence = [str(evidence)]
|
||||
|
||||
# Add RSSI sample
|
||||
rssi = device.get('rssi', device.get('signal'))
|
||||
@@ -431,15 +474,28 @@ class CorrelationEngine:
|
||||
# Clear previous indicators for fresh analysis
|
||||
profile.indicators = []
|
||||
|
||||
# === Detection Logic ===
|
||||
|
||||
# 1. Unknown manufacturer or generic chipset
|
||||
if not profile.manufacturer:
|
||||
profile.add_indicator(
|
||||
IndicatorType.UNKNOWN_DEVICE,
|
||||
'Unknown manufacturer',
|
||||
{'manufacturer': None}
|
||||
)
|
||||
# === Detection Logic ===
|
||||
|
||||
# 1. Unknown manufacturer or generic chipset
|
||||
if not profile.manufacturer and mac and not device.get('is_randomized_mac'):
|
||||
try:
|
||||
first_octet = int(mac.split(':')[0], 16)
|
||||
except (ValueError, IndexError):
|
||||
first_octet = None
|
||||
if first_octet is None or not (first_octet & 0x02):
|
||||
try:
|
||||
from data.oui import get_manufacturer
|
||||
vendor = get_manufacturer(mac)
|
||||
if vendor and vendor != 'Unknown':
|
||||
profile.manufacturer = vendor
|
||||
except Exception:
|
||||
pass
|
||||
if not profile.manufacturer:
|
||||
profile.add_indicator(
|
||||
IndicatorType.UNKNOWN_DEVICE,
|
||||
'Unknown manufacturer',
|
||||
{'manufacturer': None}
|
||||
)
|
||||
elif any(v in profile.manufacturer.lower() for v in GENERIC_CHIPSET_VENDORS):
|
||||
profile.add_indicator(
|
||||
IndicatorType.UNKNOWN_DEVICE,
|
||||
@@ -455,16 +511,16 @@ class CorrelationEngine:
|
||||
{'name': profile.name}
|
||||
)
|
||||
|
||||
# 3. Audio-capable services
|
||||
if profile.services:
|
||||
audio_services = [s for s in profile.services
|
||||
if s.lower() in [u.lower() for u in AUDIO_SERVICE_UUIDS]]
|
||||
if audio_services:
|
||||
profile.add_indicator(
|
||||
IndicatorType.AUDIO_CAPABLE,
|
||||
'Audio-capable BLE services detected',
|
||||
{'services': audio_services}
|
||||
)
|
||||
# 3. Audio-capable services
|
||||
if profile.services:
|
||||
normalized_services = {_normalize_bt_uuid(s) for s in profile.services if s}
|
||||
audio_services = [s for s in normalized_services if s in AUDIO_SERVICE_UUIDS_16]
|
||||
if audio_services:
|
||||
profile.add_indicator(
|
||||
IndicatorType.AUDIO_CAPABLE,
|
||||
'Audio-capable BLE services detected',
|
||||
{'services': audio_services}
|
||||
)
|
||||
|
||||
# Check name for audio keywords
|
||||
if profile.name:
|
||||
@@ -518,15 +574,47 @@ class CorrelationEngine:
|
||||
{'mac': mac}
|
||||
)
|
||||
|
||||
# 9. Known tracker detection (AirTag, Tile, SmartTag, ESP32)
|
||||
mac_prefix = mac[:8] if len(mac) >= 8 else ''
|
||||
tracker_detected = False
|
||||
|
||||
# Check for tracker flags from BLE scanner (manufacturer ID detection)
|
||||
if device.get('is_airtag'):
|
||||
profile.add_indicator(
|
||||
IndicatorType.AIRTAG_DETECTED,
|
||||
'Apple AirTag detected via manufacturer data',
|
||||
# 9. Known tracker detection (AirTag, Tile, SmartTag, ESP32)
|
||||
mac_prefix = mac[:8] if len(mac) >= 8 else ''
|
||||
tracker_detected = False
|
||||
tracker_data = device.get('tracker') or {}
|
||||
|
||||
if tracker_data.get('is_tracker'):
|
||||
tracker_detected = True
|
||||
tracker_label = tracker_data.get('name') or tracker_data.get('type')
|
||||
if tracker_label:
|
||||
label_lower = str(tracker_label).lower()
|
||||
if 'airtag' in label_lower or 'find my' in label_lower:
|
||||
profile.add_indicator(
|
||||
IndicatorType.AIRTAG_DETECTED,
|
||||
f'Tracker detected: {tracker_label}',
|
||||
{'mac': mac, 'tracker_type': tracker_label}
|
||||
)
|
||||
profile.device_type = 'AirTag'
|
||||
elif 'tile' in label_lower:
|
||||
profile.add_indicator(
|
||||
IndicatorType.TILE_DETECTED,
|
||||
f'Tracker detected: {tracker_label}',
|
||||
{'mac': mac, 'tracker_type': tracker_label}
|
||||
)
|
||||
profile.device_type = 'Tile Tracker'
|
||||
elif 'smarttag' in label_lower or 'samsung' in label_lower:
|
||||
profile.add_indicator(
|
||||
IndicatorType.SMARTTAG_DETECTED,
|
||||
f'Tracker detected: {tracker_label}',
|
||||
{'mac': mac, 'tracker_type': tracker_label}
|
||||
)
|
||||
profile.device_type = 'Samsung SmartTag'
|
||||
else:
|
||||
profile.device_type = tracker_label
|
||||
elif not profile.device_type:
|
||||
profile.device_type = 'Tracker'
|
||||
|
||||
# Check for tracker flags from BLE scanner (manufacturer ID detection)
|
||||
if device.get('is_airtag'):
|
||||
profile.add_indicator(
|
||||
IndicatorType.AIRTAG_DETECTED,
|
||||
'Apple AirTag detected via manufacturer data',
|
||||
{'mac': mac, 'tracker_type': 'AirTag'}
|
||||
)
|
||||
profile.device_type = device.get('tracker_type', 'AirTag')
|
||||
@@ -662,31 +750,41 @@ class CorrelationEngine:
|
||||
|
||||
return profile
|
||||
|
||||
def analyze_wifi_device(self, device: dict) -> DeviceProfile:
|
||||
"""
|
||||
Analyze a Wi-Fi device/AP for suspicious indicators.
|
||||
def analyze_wifi_device(self, device: dict) -> DeviceProfile:
|
||||
"""
|
||||
Analyze a Wi-Fi device/AP for suspicious indicators.
|
||||
|
||||
Args:
|
||||
device: Dict with bssid, ssid, channel, rssi, encryption, etc.
|
||||
|
||||
Returns:
|
||||
DeviceProfile with risk assessment
|
||||
"""
|
||||
bssid = device.get('bssid', device.get('mac', '')).upper()
|
||||
profile = self.get_or_create_profile(bssid, 'wifi')
|
||||
|
||||
# Update profile data
|
||||
ssid = device.get('ssid', device.get('essid', ''))
|
||||
profile.ssid = ssid if ssid else profile.ssid
|
||||
profile.name = ssid or f'Hidden Network ({bssid[-8:]})'
|
||||
profile.channel = device.get('channel') or profile.channel
|
||||
profile.encryption = device.get('encryption', device.get('privacy')) or profile.encryption
|
||||
profile.beacon_interval = device.get('beacon_interval') or profile.beacon_interval
|
||||
profile.is_hidden = not ssid or ssid in ['', 'Hidden', '[Hidden]']
|
||||
|
||||
# Extract manufacturer from OUI
|
||||
if bssid and len(bssid) >= 8:
|
||||
profile.manufacturer = device.get('vendor') or profile.manufacturer
|
||||
Returns:
|
||||
DeviceProfile with risk assessment
|
||||
"""
|
||||
bssid = device.get('bssid', device.get('mac', '')).upper()
|
||||
profile = self.get_or_create_profile(bssid, 'wifi')
|
||||
is_client = bool(device.get('is_client') or device.get('role') == 'client')
|
||||
|
||||
# Update profile data
|
||||
ssid = device.get('ssid', device.get('essid', ''))
|
||||
if is_client:
|
||||
profile.name = device.get('name') or device.get('vendor') or profile.name or f'Client ({bssid[-8:]})'
|
||||
profile.device_type = 'client'
|
||||
profile.ssid = profile.ssid # Clients are not SSIDs
|
||||
profile.channel = device.get('channel') or profile.channel
|
||||
profile.encryption = profile.encryption
|
||||
profile.beacon_interval = profile.beacon_interval
|
||||
profile.is_hidden = False
|
||||
else:
|
||||
profile.ssid = ssid if ssid else profile.ssid
|
||||
profile.name = ssid or f'Hidden Network ({bssid[-8:]})'
|
||||
profile.channel = device.get('channel') or profile.channel
|
||||
profile.encryption = device.get('encryption', device.get('privacy')) or profile.encryption
|
||||
profile.beacon_interval = device.get('beacon_interval') or profile.beacon_interval
|
||||
profile.is_hidden = not ssid or ssid in ['', 'Hidden', '[Hidden]']
|
||||
|
||||
# Extract manufacturer from OUI
|
||||
if bssid and len(bssid) >= 8:
|
||||
profile.manufacturer = device.get('vendor') or profile.manufacturer
|
||||
|
||||
# Add RSSI sample
|
||||
rssi = device.get('rssi', device.get('power', device.get('signal')))
|
||||
@@ -699,78 +797,118 @@ class CorrelationEngine:
|
||||
# Clear previous indicators
|
||||
profile.indicators = []
|
||||
|
||||
# === Detection Logic ===
|
||||
|
||||
# 1. Hidden or unnamed SSID
|
||||
if profile.is_hidden:
|
||||
profile.add_indicator(
|
||||
IndicatorType.HIDDEN_IDENTITY,
|
||||
'Hidden or empty SSID',
|
||||
{'ssid': ssid}
|
||||
)
|
||||
|
||||
# 2. BSSID not in authorized list (would need baseline)
|
||||
# For now, mark as unknown if no manufacturer
|
||||
if not profile.manufacturer:
|
||||
profile.add_indicator(
|
||||
IndicatorType.UNKNOWN_DEVICE,
|
||||
'Unknown AP manufacturer',
|
||||
{'bssid': bssid}
|
||||
)
|
||||
|
||||
# 3. Consumer device OUI in restricted environment
|
||||
consumer_ouis = ['tp-link', 'netgear', 'd-link', 'linksys', 'asus']
|
||||
if profile.manufacturer and any(c in profile.manufacturer.lower() for c in consumer_ouis):
|
||||
profile.add_indicator(
|
||||
IndicatorType.ROGUE_AP,
|
||||
f'Consumer-grade AP detected: {profile.manufacturer}',
|
||||
{'manufacturer': profile.manufacturer}
|
||||
)
|
||||
|
||||
# 4. Camera device patterns
|
||||
camera_keywords = ['cam', 'camera', 'ipcam', 'dvr', 'nvr', 'wyze',
|
||||
'ring', 'arlo', 'nest', 'blink', 'eufy', 'yi']
|
||||
if ssid and any(k in ssid.lower() for k in camera_keywords):
|
||||
profile.add_indicator(
|
||||
IndicatorType.AUDIO_CAPABLE, # Cameras often have mics
|
||||
f'Potential camera device: {ssid}',
|
||||
{'ssid': ssid}
|
||||
)
|
||||
|
||||
# 5. Persistent presence
|
||||
if profile.detection_count >= 3:
|
||||
profile.add_indicator(
|
||||
IndicatorType.PERSISTENT,
|
||||
f'Persistent AP ({profile.detection_count} detections)',
|
||||
{'count': profile.detection_count}
|
||||
)
|
||||
|
||||
# 6. Stable RSSI (fixed placement)
|
||||
rssi_stability = profile.get_rssi_stability()
|
||||
if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
|
||||
profile.add_indicator(
|
||||
IndicatorType.STABLE_RSSI,
|
||||
f'Stable signal (stability: {rssi_stability:.0%})',
|
||||
{'stability': rssi_stability}
|
||||
)
|
||||
|
||||
# 7. Meeting correlation
|
||||
if self.is_during_meeting():
|
||||
profile.add_indicator(
|
||||
IndicatorType.MEETING_CORRELATED,
|
||||
'Detected during sensitive period',
|
||||
{'during_meeting': True}
|
||||
)
|
||||
|
||||
# 8. Strong hidden AP (very suspicious)
|
||||
if profile.is_hidden and profile.rssi_samples:
|
||||
latest_rssi = profile.rssi_samples[-1][1]
|
||||
if latest_rssi > -50:
|
||||
profile.add_indicator(
|
||||
IndicatorType.ROGUE_AP,
|
||||
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
|
||||
{'rssi': latest_rssi}
|
||||
)
|
||||
# === Detection Logic ===
|
||||
if is_client:
|
||||
if not profile.manufacturer:
|
||||
profile.add_indicator(
|
||||
IndicatorType.UNKNOWN_DEVICE,
|
||||
'Unknown client manufacturer',
|
||||
{'mac': bssid}
|
||||
)
|
||||
|
||||
if profile.detection_count >= 3:
|
||||
profile.add_indicator(
|
||||
IndicatorType.PERSISTENT,
|
||||
f'Persistent client ({profile.detection_count} detections)',
|
||||
{'count': profile.detection_count}
|
||||
)
|
||||
|
||||
rssi_stability = profile.get_rssi_stability()
|
||||
if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
|
||||
profile.add_indicator(
|
||||
IndicatorType.STABLE_RSSI,
|
||||
f'Stable client signal (stability: {rssi_stability:.0%})',
|
||||
{'stability': rssi_stability}
|
||||
)
|
||||
|
||||
if self.is_during_meeting():
|
||||
profile.add_indicator(
|
||||
IndicatorType.MEETING_CORRELATED,
|
||||
'Detected during sensitive period',
|
||||
{'during_meeting': True}
|
||||
)
|
||||
|
||||
try:
|
||||
first_octet = int(bssid.split(':')[0], 16)
|
||||
if first_octet & 0x02:
|
||||
profile.add_indicator(
|
||||
IndicatorType.MAC_ROTATION,
|
||||
'Random/locally administered MAC detected',
|
||||
{'mac': bssid}
|
||||
)
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
else:
|
||||
# 1. Hidden or unnamed SSID
|
||||
if profile.is_hidden:
|
||||
profile.add_indicator(
|
||||
IndicatorType.HIDDEN_IDENTITY,
|
||||
'Hidden or empty SSID',
|
||||
{'ssid': ssid}
|
||||
)
|
||||
|
||||
# 2. BSSID not in authorized list (would need baseline)
|
||||
# For now, mark as unknown if no manufacturer
|
||||
if not profile.manufacturer:
|
||||
profile.add_indicator(
|
||||
IndicatorType.UNKNOWN_DEVICE,
|
||||
'Unknown AP manufacturer',
|
||||
{'bssid': bssid}
|
||||
)
|
||||
|
||||
# 3. Consumer device OUI in restricted environment
|
||||
consumer_ouis = ['tp-link', 'netgear', 'd-link', 'linksys', 'asus']
|
||||
if profile.manufacturer and any(c in profile.manufacturer.lower() for c in consumer_ouis):
|
||||
profile.add_indicator(
|
||||
IndicatorType.ROGUE_AP,
|
||||
f'Consumer-grade AP detected: {profile.manufacturer}',
|
||||
{'manufacturer': profile.manufacturer}
|
||||
)
|
||||
|
||||
# 4. Camera device patterns
|
||||
camera_keywords = ['cam', 'camera', 'ipcam', 'dvr', 'nvr', 'wyze',
|
||||
'ring', 'arlo', 'nest', 'blink', 'eufy', 'yi']
|
||||
if ssid and any(k in ssid.lower() for k in camera_keywords):
|
||||
profile.add_indicator(
|
||||
IndicatorType.AUDIO_CAPABLE, # Cameras often have mics
|
||||
f'Potential camera device: {ssid}',
|
||||
{'ssid': ssid}
|
||||
)
|
||||
|
||||
# 5. Persistent presence
|
||||
if profile.detection_count >= 3:
|
||||
profile.add_indicator(
|
||||
IndicatorType.PERSISTENT,
|
||||
f'Persistent AP ({profile.detection_count} detections)',
|
||||
{'count': profile.detection_count}
|
||||
)
|
||||
|
||||
# 6. Stable RSSI (fixed placement)
|
||||
rssi_stability = profile.get_rssi_stability()
|
||||
if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
|
||||
profile.add_indicator(
|
||||
IndicatorType.STABLE_RSSI,
|
||||
f'Stable signal (stability: {rssi_stability:.0%})',
|
||||
{'stability': rssi_stability}
|
||||
)
|
||||
|
||||
# 7. Meeting correlation
|
||||
if self.is_during_meeting():
|
||||
profile.add_indicator(
|
||||
IndicatorType.MEETING_CORRELATED,
|
||||
'Detected during sensitive period',
|
||||
{'during_meeting': True}
|
||||
)
|
||||
|
||||
# 8. Strong hidden AP (very suspicious)
|
||||
if profile.is_hidden and profile.rssi_samples:
|
||||
latest_rssi = profile.rssi_samples[-1][1]
|
||||
if latest_rssi > -50:
|
||||
profile.add_indicator(
|
||||
IndicatorType.ROGUE_AP,
|
||||
f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
|
||||
{'rssi': latest_rssi}
|
||||
)
|
||||
|
||||
self._apply_known_device_modifier(profile, bssid, 'wifi')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user