diff --git a/intercept_agent.py b/intercept_agent.py
index 1cc91b1..7405e2b 100644
--- a/intercept_agent.py
+++ b/intercept_agent.py
@@ -838,14 +838,15 @@ class ModeManager:
data['data'] = list(getattr(self, 'ais_vessels', {}).values())
elif mode == 'aprs':
data['data'] = list(getattr(self, 'aprs_stations', {}).values())
- elif mode == 'tscm':
- data['data'] = {
- 'anomalies': getattr(self, 'tscm_anomalies', []),
- 'baseline': getattr(self, 'tscm_baseline', {}),
- 'wifi_devices': list(self.wifi_networks.values()),
- 'bt_devices': list(self.bluetooth_devices.values()),
- 'rf_signals': getattr(self, 'tscm_rf_signals', []),
- }
+ elif mode == 'tscm':
+ data['data'] = {
+ 'anomalies': getattr(self, 'tscm_anomalies', []),
+ 'baseline': getattr(self, 'tscm_baseline', {}),
+ 'wifi_devices': list(self.wifi_networks.values()),
+ 'wifi_clients': list(getattr(self, 'tscm_wifi_clients', {}).values()),
+ 'bt_devices': list(self.bluetooth_devices.values()),
+ 'rf_signals': getattr(self, 'tscm_rf_signals', []),
+ }
elif mode == 'listening_post':
data['data'] = {
'activity': getattr(self, 'listening_post_activity', []),
@@ -1104,23 +1105,24 @@ class ModeManager:
self.wifi_clients.clear()
elif mode == 'bluetooth':
self.bluetooth_devices.clear()
- elif mode == 'tscm':
- # Clean up TSCM sub-threads
- for sub_thread_name in ['tscm_wifi', 'tscm_bt', 'tscm_rf']:
- if sub_thread_name in self.output_threads:
- thread = self.output_threads[sub_thread_name]
- if thread and thread.is_alive():
- thread.join(timeout=2)
- del self.output_threads[sub_thread_name]
- # Clear TSCM data
- self.tscm_anomalies = []
- self.tscm_baseline = {}
- self.tscm_rf_signals = []
- # Clear reported threat tracking sets
- if hasattr(self, '_tscm_reported_wifi'):
- self._tscm_reported_wifi.clear()
- if hasattr(self, '_tscm_reported_bt'):
- self._tscm_reported_bt.clear()
+ elif mode == 'tscm':
+ # Clean up TSCM sub-threads
+ for sub_thread_name in ['tscm_wifi', 'tscm_bt', 'tscm_rf']:
+ if sub_thread_name in self.output_threads:
+ thread = self.output_threads[sub_thread_name]
+ if thread and thread.is_alive():
+ thread.join(timeout=2)
+ del self.output_threads[sub_thread_name]
+ # Clear TSCM data
+ self.tscm_anomalies = []
+ self.tscm_baseline = {}
+ self.tscm_rf_signals = []
+ self.tscm_wifi_clients = {}
+ # Clear reported threat tracking sets
+ if hasattr(self, '_tscm_reported_wifi'):
+ self._tscm_reported_wifi.clear()
+ if hasattr(self, '_tscm_reported_bt'):
+ self._tscm_reported_bt.clear()
elif mode == 'dsc':
# Clear DSC data
if hasattr(self, 'dsc_messages'):
@@ -3111,9 +3113,12 @@ class ModeManager:
self.tscm_baseline = {}
if not hasattr(self, 'tscm_anomalies'):
self.tscm_anomalies = []
- if not hasattr(self, 'tscm_rf_signals'):
- self.tscm_rf_signals = []
- self.tscm_anomalies.clear()
+ if not hasattr(self, 'tscm_rf_signals'):
+ self.tscm_rf_signals = []
+ if not hasattr(self, 'tscm_wifi_clients'):
+ self.tscm_wifi_clients = {}
+ self.tscm_anomalies.clear()
+ self.tscm_wifi_clients.clear()
# Get params for what to scan
scan_wifi = params.get('wifi', True)
@@ -3168,7 +3173,7 @@ class ModeManager:
stop_event = self.stop_events.get(mode)
# Import existing Intercept TSCM functions
- from routes.tscm import _scan_wifi_networks, _scan_bluetooth_devices, _scan_rf_signals
+ from routes.tscm import _scan_wifi_networks, _scan_wifi_clients, _scan_bluetooth_devices, _scan_rf_signals
logger.info("TSCM imports successful")
sweep_ranges = None
@@ -3202,8 +3207,9 @@ class ModeManager:
self._tscm_correlation = None
# Track devices seen during this sweep (like local mode's all_wifi/all_bt dicts)
- seen_wifi = {}
- seen_bt = {}
+ seen_wifi = {}
+ seen_wifi_clients = {}
+ seen_bt = {}
last_rf_scan = 0
rf_scan_interval = 30
@@ -3261,10 +3267,51 @@ class ModeManager:
for i in profile.indicators
]
enriched['recommended_action'] = profile.recommended_action
-
- self.wifi_networks[bssid] = enriched
- except Exception as e:
- logger.debug(f"WiFi scan error: {e}")
+
+ self.wifi_networks[bssid] = enriched
+
+ # WiFi clients (monitor mode only)
+ try:
+ wifi_clients = _scan_wifi_clients(wifi_interface or '')
+ for client in wifi_clients:
+ mac = (client.get('mac') or '').upper()
+ if not mac or mac in seen_wifi_clients:
+ continue
+ seen_wifi_clients[mac] = client
+
+ rssi_val = client.get('rssi_current')
+ if rssi_val is None:
+ rssi_val = client.get('rssi_median') or client.get('rssi_ema')
+
+ client_device = {
+ 'mac': mac,
+ 'vendor': client.get('vendor'),
+ 'name': client.get('vendor') or 'WiFi Client',
+ 'rssi': rssi_val,
+ 'associated_bssid': client.get('associated_bssid'),
+ 'probed_ssids': client.get('probed_ssids', []),
+ 'probe_count': client.get('probe_count', len(client.get('probed_ssids', []))),
+ 'is_client': True,
+ }
+
+ if self._tscm_correlation:
+ profile = self._tscm_correlation.analyze_wifi_device(client_device)
+ client_device['classification'] = profile.risk_level.value
+ client_device['score'] = profile.total_score
+ client_device['score_modifier'] = profile.score_modifier
+ client_device['known_device'] = profile.known_device
+ client_device['known_device_name'] = profile.known_device_name
+ client_device['indicators'] = [
+ {'type': i.type.value, 'desc': i.description}
+ for i in profile.indicators
+ ]
+ client_device['recommended_action'] = profile.recommended_action
+
+ self.tscm_wifi_clients[mac] = client_device
+ except Exception as e:
+ logger.debug(f"WiFi client scan error: {e}")
+ except Exception as e:
+ logger.debug(f"WiFi scan error: {e}")
# Bluetooth scan using Intercept's function (same as local mode)
if scan_bt:
diff --git a/routes/bluetooth_v2.py b/routes/bluetooth_v2.py
index e75ec7f..8051477 100644
--- a/routes/bluetooth_v2.py
+++ b/routes/bluetooth_v2.py
@@ -944,23 +944,34 @@ def get_tscm_bluetooth_snapshot(duration: int = 8) -> list[dict]:
devices = scanner.get_devices()
logger.info(f"TSCM snapshot: get_devices() returned {len(devices)} devices")
- # Convert to TSCM format with tracker detection data
- tscm_devices = []
- for device in devices:
- device_data = {
- 'mac': device.address,
- 'address_type': device.address_type,
- 'device_key': device.device_key,
- 'name': device.name or 'Unknown',
- 'rssi': device.rssi_current or -100,
- 'rssi_median': device.rssi_median,
- 'rssi_ema': round(device.rssi_ema, 1) if device.rssi_ema else None,
- 'type': _classify_device_type(device),
- 'manufacturer': device.manufacturer_name,
- 'manufacturer_id': device.manufacturer_id,
- 'manufacturer_data': device.manufacturer_bytes.hex() if device.manufacturer_bytes else None,
- 'protocol': device.protocol,
- 'first_seen': device.first_seen.isoformat(),
+ # Convert to TSCM format with tracker detection data
+ tscm_devices = []
+ for device in devices:
+ manufacturer_name = device.manufacturer_name
+ if (not manufacturer_name) or str(manufacturer_name).lower().startswith('unknown'):
+ if device.address and not device.is_randomized_mac:
+ try:
+ from data.oui import get_manufacturer
+ oui_vendor = get_manufacturer(device.address)
+ if oui_vendor and oui_vendor != 'Unknown':
+ manufacturer_name = oui_vendor
+ except Exception:
+ pass
+
+ device_data = {
+ 'mac': device.address,
+ 'address_type': device.address_type,
+ 'device_key': device.device_key,
+ 'name': device.name or 'Unknown',
+ 'rssi': device.rssi_current or -100,
+ 'rssi_median': device.rssi_median,
+ 'rssi_ema': round(device.rssi_ema, 1) if device.rssi_ema else None,
+ 'type': _classify_device_type(device),
+ 'manufacturer': manufacturer_name,
+ 'manufacturer_id': device.manufacturer_id,
+ 'manufacturer_data': device.manufacturer_bytes.hex() if device.manufacturer_bytes else None,
+ 'protocol': device.protocol,
+ 'first_seen': device.first_seen.isoformat(),
'last_seen': device.last_seen.isoformat(),
'seen_count': device.seen_count,
'range_band': device.range_band,
@@ -1174,14 +1185,38 @@ def get_device_timeseries(device_key: str):
return jsonify(result)
-def _classify_device_type(device: BTDeviceAggregate) -> str:
- """Classify device type from available data."""
- name_lower = (device.name or '').lower()
- manufacturer_lower = (device.manufacturer_name or '').lower()
-
- # Check by name patterns
- if any(x in name_lower for x in ['airpods', 'headphone', 'earbuds', 'buds', 'beats']):
- return 'audio'
+def _classify_device_type(device: BTDeviceAggregate) -> str:
+ """Classify device type from available data."""
+ name_lower = (device.name or '').lower()
+ manufacturer_lower = (device.manufacturer_name or '').lower()
+ service_uuids = device.service_uuids or []
+
+ if (not manufacturer_lower) or manufacturer_lower.startswith('unknown'):
+ if device.address and not device.is_randomized_mac:
+ try:
+ from data.oui import get_manufacturer
+ oui_vendor = get_manufacturer(device.address)
+ if oui_vendor and oui_vendor != 'Unknown':
+ manufacturer_lower = oui_vendor.lower()
+ except Exception:
+ pass
+
+ def normalize_uuid(uuid: str) -> str:
+ if not uuid:
+ return ''
+ value = str(uuid).lower().strip()
+ if value.startswith('0x'):
+ value = value[2:]
+ # Bluetooth Base UUID normalization (16-bit UUIDs)
+ if value.endswith('-0000-1000-8000-00805f9b34fb') and len(value) >= 8:
+ return value[4:8]
+ if len(value) == 4:
+ return value
+ return value
+
+ # Check by name patterns
+ if any(x in name_lower for x in ['airpods', 'headphone', 'earbuds', 'buds', 'beats']):
+ return 'audio'
if any(x in name_lower for x in ['watch', 'band', 'fitbit', 'garmin']):
return 'wearable'
if any(x in name_lower for x in ['iphone', 'pixel', 'galaxy', 'phone']):
@@ -1190,18 +1225,41 @@ def _classify_device_type(device: BTDeviceAggregate) -> str:
return 'computer'
if any(x in name_lower for x in ['mouse', 'keyboard', 'trackpad']):
return 'peripheral'
- if any(x in name_lower for x in ['tile', 'airtag', 'smarttag', 'chipolo']):
- return 'tracker'
- if any(x in name_lower for x in ['speaker', 'sonos', 'echo', 'home']):
- return 'speaker'
- if any(x in name_lower for x in ['tv', 'chromecast', 'roku', 'firestick']):
- return 'media'
-
- # Check by manufacturer
- if 'apple' in manufacturer_lower:
- return 'apple_device'
- if 'samsung' in manufacturer_lower:
- return 'samsung_device'
+ if any(x in name_lower for x in ['tile', 'airtag', 'smarttag', 'chipolo']):
+ return 'tracker'
+ if any(x in name_lower for x in ['speaker', 'sonos', 'echo', 'home']):
+ return 'speaker'
+ if any(x in name_lower for x in ['tv', 'chromecast', 'roku', 'firestick']):
+ return 'media'
+
+ # Tracker signals (metadata or Find My service)
+ if getattr(device, 'is_tracker', False) or getattr(device, 'tracker_type', None):
+ return 'tracker'
+
+ normalized_uuids = {normalize_uuid(u) for u in service_uuids if u}
+ if 'fd6f' in normalized_uuids:
+ return 'tracker'
+
+ # Service UUIDs (GATT / classic)
+ audio_uuids = {'110b', '110a', '111e', '111f', '1108', '1203'}
+ wearable_uuids = {'180d', '1814', '1816'}
+ hid_uuids = {'1812'}
+ beacon_uuids = {'feaa', 'feab', 'feb1', 'febe'}
+
+ if normalized_uuids & audio_uuids:
+ return 'audio'
+ if normalized_uuids & hid_uuids:
+ return 'peripheral'
+ if normalized_uuids & wearable_uuids:
+ return 'wearable'
+ if normalized_uuids & beacon_uuids:
+ return 'beacon'
+
+ # Check by manufacturer
+ if 'apple' in manufacturer_lower:
+ return 'apple_device'
+ if 'samsung' in manufacturer_lower:
+ return 'samsung_device'
# Check by class of device
if device.major_class:
diff --git a/routes/tscm.py b/routes/tscm.py
index 6f9bdab..784d248 100644
--- a/routes/tscm.py
+++ b/routes/tscm.py
@@ -1072,6 +1072,32 @@ def _scan_wifi_networks(interface: str) -> list[dict]:
return []
+def _scan_wifi_clients(interface: str) -> list[dict]:
+ """
+ Get WiFi client observations from the unified WiFi scanner.
+
+ Clients are only available when monitor-mode scanning is active.
+ """
+ try:
+ from utils.wifi import get_wifi_scanner
+
+ scanner = get_wifi_scanner()
+ if interface:
+ try:
+ if not scanner._is_monitor_mode_interface(interface):
+ return []
+ except Exception:
+ return []
+
+ return [client.to_dict() for client in scanner.clients]
+ except ImportError as e:
+ logger.error(f"Failed to import wifi scanner: {e}")
+ return []
+ except Exception as e:
+ logger.exception(f"WiFi client scan failed: {e}")
+ return []
+
+
def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
"""
Scan for Bluetooth devices with manufacturer data detection.
@@ -1606,6 +1632,7 @@ def _run_sweep(
threats_found = 0
severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
all_wifi = {} # Use dict for deduplication by BSSID
+ all_wifi_clients = {} # Use dict for deduplication by client MAC
all_bt = {} # Use dict for deduplication by MAC
all_rf = []
@@ -1702,6 +1729,7 @@ def _run_sweep(
'channel': network.get('channel', ''),
'signal': network.get('power', ''),
'security': network.get('privacy', ''),
+ 'vendor': network.get('vendor'),
'is_threat': is_threat,
'is_new': not classification.get('in_baseline', False),
'classification': profile.risk_level.value,
@@ -1715,6 +1743,77 @@ def _run_sweep(
})
except Exception as e:
logger.error(f"WiFi device processing error for {network.get('bssid', '?')}: {e}")
+
+ # WiFi clients (monitor mode only)
+ try:
+ wifi_clients = _scan_wifi_clients(wifi_interface)
+ for client in wifi_clients:
+ mac = (client.get('mac') or '').upper()
+ if not mac or mac in all_wifi_clients:
+ continue
+ all_wifi_clients[mac] = client
+
+ rssi_val = client.get('rssi_current')
+ if rssi_val is None:
+ rssi_val = client.get('rssi_median') or client.get('rssi_ema')
+
+ client_device = {
+ 'mac': mac,
+ 'vendor': client.get('vendor'),
+ 'name': client.get('vendor') or 'WiFi Client',
+ 'rssi': rssi_val,
+ 'associated_bssid': client.get('associated_bssid'),
+ 'probed_ssids': client.get('probed_ssids', []),
+ 'probe_count': client.get('probe_count', len(client.get('probed_ssids', []))),
+ 'is_client': True,
+ }
+
+ try:
+ timeline_manager.add_observation(
+ identifier=mac,
+ protocol='wifi',
+ rssi=rssi_val,
+ name=client_device.get('vendor') or f'WiFi Client {mac[-5:]}',
+ attributes={'client': True, 'associated_bssid': client_device.get('associated_bssid')}
+ )
+ except Exception as e:
+ logger.debug(f"WiFi client timeline observation error: {e}")
+ _maybe_store_timeline(
+ identifier=mac,
+ protocol='wifi',
+ rssi=rssi_val,
+ attributes={'client': True, 'associated_bssid': client_device.get('associated_bssid')}
+ )
+
+ profile = correlation.analyze_wifi_device(client_device)
+ client_device['classification'] = profile.risk_level.value
+ client_device['score'] = profile.total_score
+ client_device['score_modifier'] = profile.score_modifier
+ client_device['known_device'] = profile.known_device
+ client_device['known_device_name'] = profile.known_device_name
+ client_device['indicators'] = [
+ {'type': i.type.value, 'desc': i.description}
+ for i in profile.indicators
+ ]
+ client_device['recommended_action'] = profile.recommended_action
+
+ # Feed to identity engine for MAC-randomization resistant clustering
+ try:
+ wifi_obs = {
+ 'timestamp': datetime.now().isoformat(),
+ 'src_mac': mac,
+ 'bssid': client_device.get('associated_bssid'),
+ 'rssi': rssi_val,
+ 'frame_type': 'probe_request',
+ 'probed_ssids': client_device.get('probed_ssids', []),
+ }
+ ingest_wifi_dict(wifi_obs)
+ except Exception as e:
+ logger.debug(f"Identity engine WiFi client ingest error: {e}")
+
+ _emit_event('wifi_client', client_device)
+ except Exception as e:
+ logger.debug(f"WiFi client scan error: {e}")
except Exception as e:
last_wifi_scan = current_time
logger.error(f"WiFi scan error: {e}")
@@ -1793,6 +1892,9 @@ def _run_sweep(
'name': device.get('name', 'Unknown'),
'device_type': device.get('type', ''),
'rssi': device.get('rssi', ''),
+ 'manufacturer': device.get('manufacturer'),
+ 'tracker': device.get('tracker'),
+ 'tracker_type': device.get('tracker_type'),
'is_threat': is_threat,
'is_new': not classification.get('in_baseline', False),
'classification': profile.risk_level.value,
@@ -1936,6 +2038,7 @@ def _run_sweep(
if verbose_results:
wifi_payload = list(all_wifi.values())
+ wifi_client_payload = list(all_wifi_clients.values())
bt_payload = list(all_bt.values())
rf_payload = list(all_rf)
else:
@@ -1951,6 +2054,28 @@ def _run_sweep(
}
for d in all_wifi.values()
]
+ wifi_client_payload = []
+ for client in all_wifi_clients.values():
+ mac = client.get('mac') or client.get('address')
+ if isinstance(mac, str):
+ mac = mac.upper()
+ probed_ssids = client.get('probed_ssids') or []
+ rssi = client.get('rssi')
+ if rssi is None:
+ rssi = client.get('rssi_current')
+ if rssi is None:
+ rssi = client.get('rssi_median')
+ if rssi is None:
+ rssi = client.get('rssi_ema')
+ wifi_client_payload.append({
+ 'mac': mac,
+ 'vendor': client.get('vendor'),
+ 'rssi': rssi,
+ 'associated_bssid': client.get('associated_bssid'),
+ 'is_associated': client.get('is_associated'),
+ 'probed_ssids': probed_ssids,
+ 'probe_count': client.get('probe_count', len(probed_ssids)),
+ })
bt_payload = [
{
'mac': d.get('mac') or d.get('address'),
@@ -1975,9 +2100,11 @@ def _run_sweep(
status='completed',
results={
'wifi_devices': wifi_payload,
+ 'wifi_clients': wifi_client_payload,
'bt_devices': bt_payload,
'rf_signals': rf_payload,
'wifi_count': len(all_wifi),
+ 'wifi_client_count': len(all_wifi_clients),
'bt_count': len(all_bt),
'rf_count': len(all_rf),
'severity_counts': severity_counts,
@@ -2022,6 +2149,7 @@ def _run_sweep(
'sweep_id': _current_sweep_id,
'threats_found': threats_found,
'wifi_count': len(all_wifi),
+ 'wifi_client_count': len(all_wifi_clients),
'bt_count': len(all_bt),
'rf_count': len(all_rf),
'severity_counts': severity_counts,
diff --git a/static/css/modes/tscm.css b/static/css/modes/tscm.css
index 0335c0c..883faf1 100644
--- a/static/css/modes/tscm.css
+++ b/static/css/modes/tscm.css
@@ -196,6 +196,28 @@
margin-left: 6px;
font-size: 10px;
}
+.tracker-badge {
+ margin-left: 6px;
+ font-size: 9px;
+ padding: 1px 4px;
+ border-radius: 3px;
+ background: rgba(255, 51, 102, 0.2);
+ color: #ff3366;
+ border: 1px solid rgba(255, 51, 102, 0.4);
+ text-transform: uppercase;
+ letter-spacing: 0.4px;
+}
+.client-badge {
+ margin-left: 6px;
+ font-size: 9px;
+ padding: 1px 4px;
+ border-radius: 3px;
+ background: rgba(74, 158, 255, 0.2);
+ color: #4a9eff;
+ border: 1px solid rgba(74, 158, 255, 0.4);
+ text-transform: uppercase;
+ letter-spacing: 0.4px;
+}
.known-badge {
margin-left: 6px;
font-size: 9px;
diff --git a/templates/index.html b/templates/index.html
index 661dc9c..7934eef 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -1565,6 +1565,17 @@
+
+
+
+
+
Start a sweep to scan for WiFi clients
+
+
+
${getScoreBadge(d.score)}
@@ -12371,7 +12482,7 @@
${d.mac}
${d.rssi || '--'} dBm
- ${d.device_type || 'Unknown'}
+ ${escapeHtml([d.device_type, d.manufacturer].filter(Boolean).join(' • ') || 'Unknown')}
${d.indicators && d.indicators.length > 0 ? `
${formatIndicators(d.indicators)}
` : ''}
${d.recommended_action && d.recommended_action !== 'monitor' ? `
Action: ${d.recommended_action}
` : ''}
@@ -12535,6 +12646,10 @@
const identityClusters = data.identity_clusters || (tscmIdentitySummary ? tscmIdentitySummary.total : 0);
const baselineNew = data.baseline_new_devices || 0;
const baselineMissing = data.baseline_missing_devices || 0;
+ const wifiCount = data.wifi_count ?? tscmWifiDevices.length;
+ const wifiClientCount = data.wifi_client_count ?? tscmWifiClients.length;
+ const btCount = data.bt_count ?? tscmBtDevices.length;
+ const rfCount = data.rf_count ?? tscmRfSignals.length;
let assessment = 'BASELINE ENVIRONMENT';
let assessmentClass = 'informational';
@@ -12574,6 +12689,9 @@
` : ''}
+
+ Devices: ${wifiCount} WiFi AP • ${wifiClientCount} WiFi Clients • ${btCount} BT • ${rfCount} RF
+
Assessment: ${assessment}
diff --git a/utils/tscm/correlation.py b/utils/tscm/correlation.py
index 1e6e795..d924f4d 100644
--- a/utils/tscm/correlation.py
+++ b/utils/tscm/correlation.py
@@ -118,10 +118,15 @@ class DeviceProfile:
identifier: str # MAC, BSSID, or frequency
protocol: str # 'bluetooth', 'wifi', 'rf'
- # Device info
- name: Optional[str] = None
- manufacturer: Optional[str] = None
- device_type: Optional[str] = None
+ # Device info
+ name: Optional[str] = None
+ manufacturer: Optional[str] = None
+ device_type: Optional[str] = None
+ tracker_type: Optional[str] = None
+ tracker_name: Optional[str] = None
+ tracker_confidence: Optional[str] = None
+ tracker_confidence_score: Optional[float] = None
+ tracker_evidence: list[str] = field(default_factory=list)
# Bluetooth-specific
services: list[str] = field(default_factory=list)
@@ -231,14 +236,19 @@ class DeviceProfile:
indicator_count = len(self.indicators)
self.confidence = min(1.0, (indicator_count * 0.15) + (self.total_score * 0.05))
- def to_dict(self) -> dict:
- """Convert to dictionary for JSON serialization."""
- return {
- 'identifier': self.identifier,
- 'protocol': self.protocol,
- 'name': self.name,
- 'manufacturer': self.manufacturer,
- 'device_type': self.device_type,
+ def to_dict(self) -> dict:
+ """Convert to dictionary for JSON serialization."""
+ return {
+ 'identifier': self.identifier,
+ 'protocol': self.protocol,
+ 'name': self.name,
+ 'manufacturer': self.manufacturer,
+ 'device_type': self.device_type,
+ 'tracker_type': self.tracker_type,
+ 'tracker_name': self.tracker_name,
+ 'tracker_confidence': self.tracker_confidence,
+ 'tracker_confidence_score': self.tracker_confidence_score,
+ 'tracker_evidence': self.tracker_evidence,
'ssid': self.ssid,
'frequency': self.frequency,
'first_seen': self.first_seen.isoformat() if self.first_seen else None,
@@ -266,14 +276,33 @@ class DeviceProfile:
# Known audio-capable BLE service UUIDs
-AUDIO_SERVICE_UUIDS = [
- '0000110b-0000-1000-8000-00805f9b34fb', # A2DP Sink
- '0000110a-0000-1000-8000-00805f9b34fb', # A2DP Source
- '0000111e-0000-1000-8000-00805f9b34fb', # Handsfree
- '0000111f-0000-1000-8000-00805f9b34fb', # Handsfree Audio Gateway
- '00001108-0000-1000-8000-00805f9b34fb', # Headset
- '00001203-0000-1000-8000-00805f9b34fb', # Generic Audio
-]
+AUDIO_SERVICE_UUIDS = [
+ '0000110b-0000-1000-8000-00805f9b34fb', # A2DP Sink
+ '0000110a-0000-1000-8000-00805f9b34fb', # A2DP Source
+ '0000111e-0000-1000-8000-00805f9b34fb', # Handsfree
+ '0000111f-0000-1000-8000-00805f9b34fb', # Handsfree Audio Gateway
+ '00001108-0000-1000-8000-00805f9b34fb', # Headset
+ '00001203-0000-1000-8000-00805f9b34fb', # Generic Audio
+]
+
+_BT_BASE_UUID_SUFFIX = '-0000-1000-8000-00805f9b34fb'
+
+
+def _normalize_bt_uuid(value: str) -> str:
+ """Normalize BLE UUIDs to 16-bit where possible."""
+ if not value:
+ return ''
+ uuid = str(value).lower().strip()
+ if uuid.startswith('0x'):
+ uuid = uuid[2:]
+ if uuid.endswith(_BT_BASE_UUID_SUFFIX) and len(uuid) >= 8:
+ return uuid[4:8]
+ if len(uuid) == 4:
+ return uuid
+ return uuid
+
+
+AUDIO_SERVICE_UUIDS_16 = {_normalize_bt_uuid(u) for u in AUDIO_SERVICE_UUIDS}
# Generic chipset vendors (often used in covert devices)
GENERIC_CHIPSET_VENDORS = [
@@ -415,10 +444,24 @@ class CorrelationEngine:
# Update profile data
profile.name = device.get('name') or profile.name
profile.manufacturer = device.get('manufacturer') or profile.manufacturer
- profile.device_type = device.get('type') or profile.device_type
- profile.services = device.get('services', []) or profile.services
- profile.company_id = device.get('company_id') or profile.company_id
- profile.advertising_interval = device.get('advertising_interval') or profile.advertising_interval
+ profile.device_type = device.get('type') or profile.device_type
+ services = device.get('services')
+ if not services:
+ services = device.get('service_uuids')
+ profile.services = services or profile.services
+ profile.company_id = device.get('company_id') or profile.company_id
+ profile.advertising_interval = device.get('advertising_interval') or profile.advertising_interval
+ tracker_data = device.get('tracker') or {}
+ if tracker_data:
+ profile.tracker_type = tracker_data.get('type') or profile.tracker_type
+ profile.tracker_name = tracker_data.get('name') or profile.tracker_name
+ profile.tracker_confidence = tracker_data.get('confidence') or profile.tracker_confidence
+ profile.tracker_confidence_score = tracker_data.get('confidence_score') or profile.tracker_confidence_score
+ evidence = tracker_data.get('evidence')
+ if isinstance(evidence, list):
+ profile.tracker_evidence = evidence
+ elif evidence:
+ profile.tracker_evidence = [str(evidence)]
# Add RSSI sample
rssi = device.get('rssi', device.get('signal'))
@@ -431,15 +474,28 @@ class CorrelationEngine:
# Clear previous indicators for fresh analysis
profile.indicators = []
- # === Detection Logic ===
-
- # 1. Unknown manufacturer or generic chipset
- if not profile.manufacturer:
- profile.add_indicator(
- IndicatorType.UNKNOWN_DEVICE,
- 'Unknown manufacturer',
- {'manufacturer': None}
- )
+ # === Detection Logic ===
+
+ # 1. Unknown manufacturer or generic chipset
+ if not profile.manufacturer and mac and not device.get('is_randomized_mac'):
+ try:
+ first_octet = int(mac.split(':')[0], 16)
+ except (ValueError, IndexError):
+ first_octet = None
+ if first_octet is None or not (first_octet & 0x02):
+ try:
+ from data.oui import get_manufacturer
+ vendor = get_manufacturer(mac)
+ if vendor and vendor != 'Unknown':
+ profile.manufacturer = vendor
+ except Exception:
+ pass
+ if not profile.manufacturer:
+ profile.add_indicator(
+ IndicatorType.UNKNOWN_DEVICE,
+ 'Unknown manufacturer',
+ {'manufacturer': None}
+ )
elif any(v in profile.manufacturer.lower() for v in GENERIC_CHIPSET_VENDORS):
profile.add_indicator(
IndicatorType.UNKNOWN_DEVICE,
@@ -455,16 +511,16 @@ class CorrelationEngine:
{'name': profile.name}
)
- # 3. Audio-capable services
- if profile.services:
- audio_services = [s for s in profile.services
- if s.lower() in [u.lower() for u in AUDIO_SERVICE_UUIDS]]
- if audio_services:
- profile.add_indicator(
- IndicatorType.AUDIO_CAPABLE,
- 'Audio-capable BLE services detected',
- {'services': audio_services}
- )
+ # 3. Audio-capable services
+ if profile.services:
+ normalized_services = {_normalize_bt_uuid(s) for s in profile.services if s}
+ audio_services = [s for s in normalized_services if s in AUDIO_SERVICE_UUIDS_16]
+ if audio_services:
+ profile.add_indicator(
+ IndicatorType.AUDIO_CAPABLE,
+ 'Audio-capable BLE services detected',
+ {'services': audio_services}
+ )
# Check name for audio keywords
if profile.name:
@@ -518,15 +574,47 @@ class CorrelationEngine:
{'mac': mac}
)
- # 9. Known tracker detection (AirTag, Tile, SmartTag, ESP32)
- mac_prefix = mac[:8] if len(mac) >= 8 else ''
- tracker_detected = False
-
- # Check for tracker flags from BLE scanner (manufacturer ID detection)
- if device.get('is_airtag'):
- profile.add_indicator(
- IndicatorType.AIRTAG_DETECTED,
- 'Apple AirTag detected via manufacturer data',
+ # 9. Known tracker detection (AirTag, Tile, SmartTag, ESP32)
+ mac_prefix = mac[:8] if len(mac) >= 8 else ''
+ tracker_detected = False
+ tracker_data = device.get('tracker') or {}
+
+ if tracker_data.get('is_tracker'):
+ tracker_detected = True
+ tracker_label = tracker_data.get('name') or tracker_data.get('type')
+ if tracker_label:
+ label_lower = str(tracker_label).lower()
+ if 'airtag' in label_lower or 'find my' in label_lower:
+ profile.add_indicator(
+ IndicatorType.AIRTAG_DETECTED,
+ f'Tracker detected: {tracker_label}',
+ {'mac': mac, 'tracker_type': tracker_label}
+ )
+ profile.device_type = 'AirTag'
+ elif 'tile' in label_lower:
+ profile.add_indicator(
+ IndicatorType.TILE_DETECTED,
+ f'Tracker detected: {tracker_label}',
+ {'mac': mac, 'tracker_type': tracker_label}
+ )
+ profile.device_type = 'Tile Tracker'
+ elif 'smarttag' in label_lower or 'samsung' in label_lower:
+ profile.add_indicator(
+ IndicatorType.SMARTTAG_DETECTED,
+ f'Tracker detected: {tracker_label}',
+ {'mac': mac, 'tracker_type': tracker_label}
+ )
+ profile.device_type = 'Samsung SmartTag'
+ else:
+ profile.device_type = tracker_label
+ elif not profile.device_type:
+ profile.device_type = 'Tracker'
+
+ # Check for tracker flags from BLE scanner (manufacturer ID detection)
+ if device.get('is_airtag'):
+ profile.add_indicator(
+ IndicatorType.AIRTAG_DETECTED,
+ 'Apple AirTag detected via manufacturer data',
{'mac': mac, 'tracker_type': 'AirTag'}
)
profile.device_type = device.get('tracker_type', 'AirTag')
@@ -662,31 +750,41 @@ class CorrelationEngine:
return profile
- def analyze_wifi_device(self, device: dict) -> DeviceProfile:
- """
- Analyze a Wi-Fi device/AP for suspicious indicators.
+ def analyze_wifi_device(self, device: dict) -> DeviceProfile:
+ """
+ Analyze a Wi-Fi device/AP for suspicious indicators.
Args:
device: Dict with bssid, ssid, channel, rssi, encryption, etc.
- Returns:
- DeviceProfile with risk assessment
- """
- bssid = device.get('bssid', device.get('mac', '')).upper()
- profile = self.get_or_create_profile(bssid, 'wifi')
-
- # Update profile data
- ssid = device.get('ssid', device.get('essid', ''))
- profile.ssid = ssid if ssid else profile.ssid
- profile.name = ssid or f'Hidden Network ({bssid[-8:]})'
- profile.channel = device.get('channel') or profile.channel
- profile.encryption = device.get('encryption', device.get('privacy')) or profile.encryption
- profile.beacon_interval = device.get('beacon_interval') or profile.beacon_interval
- profile.is_hidden = not ssid or ssid in ['', 'Hidden', '[Hidden]']
-
- # Extract manufacturer from OUI
- if bssid and len(bssid) >= 8:
- profile.manufacturer = device.get('vendor') or profile.manufacturer
+ Returns:
+ DeviceProfile with risk assessment
+ """
+ bssid = device.get('bssid', device.get('mac', '')).upper()
+ profile = self.get_or_create_profile(bssid, 'wifi')
+ is_client = bool(device.get('is_client') or device.get('role') == 'client')
+
+ # Update profile data
+ ssid = device.get('ssid', device.get('essid', ''))
+ if is_client:
+ profile.name = device.get('name') or device.get('vendor') or profile.name or f'Client ({bssid[-8:]})'
+ profile.device_type = 'client'
+ profile.ssid = profile.ssid # Clients are not SSIDs
+ profile.channel = device.get('channel') or profile.channel
+ profile.encryption = profile.encryption
+ profile.beacon_interval = profile.beacon_interval
+ profile.is_hidden = False
+ else:
+ profile.ssid = ssid if ssid else profile.ssid
+ profile.name = ssid or f'Hidden Network ({bssid[-8:]})'
+ profile.channel = device.get('channel') or profile.channel
+ profile.encryption = device.get('encryption', device.get('privacy')) or profile.encryption
+ profile.beacon_interval = device.get('beacon_interval') or profile.beacon_interval
+ profile.is_hidden = not ssid or ssid in ['', 'Hidden', '[Hidden]']
+
+ # Extract manufacturer from OUI
+ if bssid and len(bssid) >= 8:
+ profile.manufacturer = device.get('vendor') or profile.manufacturer
# Add RSSI sample
rssi = device.get('rssi', device.get('power', device.get('signal')))
@@ -699,78 +797,118 @@ class CorrelationEngine:
# Clear previous indicators
profile.indicators = []
- # === Detection Logic ===
-
- # 1. Hidden or unnamed SSID
- if profile.is_hidden:
- profile.add_indicator(
- IndicatorType.HIDDEN_IDENTITY,
- 'Hidden or empty SSID',
- {'ssid': ssid}
- )
-
- # 2. BSSID not in authorized list (would need baseline)
- # For now, mark as unknown if no manufacturer
- if not profile.manufacturer:
- profile.add_indicator(
- IndicatorType.UNKNOWN_DEVICE,
- 'Unknown AP manufacturer',
- {'bssid': bssid}
- )
-
- # 3. Consumer device OUI in restricted environment
- consumer_ouis = ['tp-link', 'netgear', 'd-link', 'linksys', 'asus']
- if profile.manufacturer and any(c in profile.manufacturer.lower() for c in consumer_ouis):
- profile.add_indicator(
- IndicatorType.ROGUE_AP,
- f'Consumer-grade AP detected: {profile.manufacturer}',
- {'manufacturer': profile.manufacturer}
- )
-
- # 4. Camera device patterns
- camera_keywords = ['cam', 'camera', 'ipcam', 'dvr', 'nvr', 'wyze',
- 'ring', 'arlo', 'nest', 'blink', 'eufy', 'yi']
- if ssid and any(k in ssid.lower() for k in camera_keywords):
- profile.add_indicator(
- IndicatorType.AUDIO_CAPABLE, # Cameras often have mics
- f'Potential camera device: {ssid}',
- {'ssid': ssid}
- )
-
- # 5. Persistent presence
- if profile.detection_count >= 3:
- profile.add_indicator(
- IndicatorType.PERSISTENT,
- f'Persistent AP ({profile.detection_count} detections)',
- {'count': profile.detection_count}
- )
-
- # 6. Stable RSSI (fixed placement)
- rssi_stability = profile.get_rssi_stability()
- if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
- profile.add_indicator(
- IndicatorType.STABLE_RSSI,
- f'Stable signal (stability: {rssi_stability:.0%})',
- {'stability': rssi_stability}
- )
-
- # 7. Meeting correlation
- if self.is_during_meeting():
- profile.add_indicator(
- IndicatorType.MEETING_CORRELATED,
- 'Detected during sensitive period',
- {'during_meeting': True}
- )
-
- # 8. Strong hidden AP (very suspicious)
- if profile.is_hidden and profile.rssi_samples:
- latest_rssi = profile.rssi_samples[-1][1]
- if latest_rssi > -50:
- profile.add_indicator(
- IndicatorType.ROGUE_AP,
- f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
- {'rssi': latest_rssi}
- )
+ # === Detection Logic ===
+ if is_client:
+ if not profile.manufacturer:
+ profile.add_indicator(
+ IndicatorType.UNKNOWN_DEVICE,
+ 'Unknown client manufacturer',
+ {'mac': bssid}
+ )
+
+ if profile.detection_count >= 3:
+ profile.add_indicator(
+ IndicatorType.PERSISTENT,
+ f'Persistent client ({profile.detection_count} detections)',
+ {'count': profile.detection_count}
+ )
+
+ rssi_stability = profile.get_rssi_stability()
+ if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
+ profile.add_indicator(
+ IndicatorType.STABLE_RSSI,
+ f'Stable client signal (stability: {rssi_stability:.0%})',
+ {'stability': rssi_stability}
+ )
+
+ if self.is_during_meeting():
+ profile.add_indicator(
+ IndicatorType.MEETING_CORRELATED,
+ 'Detected during sensitive period',
+ {'during_meeting': True}
+ )
+
+ try:
+ first_octet = int(bssid.split(':')[0], 16)
+ if first_octet & 0x02:
+ profile.add_indicator(
+ IndicatorType.MAC_ROTATION,
+ 'Random/locally administered MAC detected',
+ {'mac': bssid}
+ )
+ except (ValueError, IndexError):
+ pass
+ else:
+ # 1. Hidden or unnamed SSID
+ if profile.is_hidden:
+ profile.add_indicator(
+ IndicatorType.HIDDEN_IDENTITY,
+ 'Hidden or empty SSID',
+ {'ssid': ssid}
+ )
+
+ # 2. BSSID not in authorized list (would need baseline)
+ # For now, mark as unknown if no manufacturer
+ if not profile.manufacturer:
+ profile.add_indicator(
+ IndicatorType.UNKNOWN_DEVICE,
+ 'Unknown AP manufacturer',
+ {'bssid': bssid}
+ )
+
+ # 3. Consumer device OUI in restricted environment
+ consumer_ouis = ['tp-link', 'netgear', 'd-link', 'linksys', 'asus']
+ if profile.manufacturer and any(c in profile.manufacturer.lower() for c in consumer_ouis):
+ profile.add_indicator(
+ IndicatorType.ROGUE_AP,
+ f'Consumer-grade AP detected: {profile.manufacturer}',
+ {'manufacturer': profile.manufacturer}
+ )
+
+ # 4. Camera device patterns
+ camera_keywords = ['cam', 'camera', 'ipcam', 'dvr', 'nvr', 'wyze',
+ 'ring', 'arlo', 'nest', 'blink', 'eufy', 'yi']
+ if ssid and any(k in ssid.lower() for k in camera_keywords):
+ profile.add_indicator(
+ IndicatorType.AUDIO_CAPABLE, # Cameras often have mics
+ f'Potential camera device: {ssid}',
+ {'ssid': ssid}
+ )
+
+ # 5. Persistent presence
+ if profile.detection_count >= 3:
+ profile.add_indicator(
+ IndicatorType.PERSISTENT,
+ f'Persistent AP ({profile.detection_count} detections)',
+ {'count': profile.detection_count}
+ )
+
+ # 6. Stable RSSI (fixed placement)
+ rssi_stability = profile.get_rssi_stability()
+ if rssi_stability > 0.7 and len(profile.rssi_samples) >= 5:
+ profile.add_indicator(
+ IndicatorType.STABLE_RSSI,
+ f'Stable signal (stability: {rssi_stability:.0%})',
+ {'stability': rssi_stability}
+ )
+
+ # 7. Meeting correlation
+ if self.is_during_meeting():
+ profile.add_indicator(
+ IndicatorType.MEETING_CORRELATED,
+ 'Detected during sensitive period',
+ {'during_meeting': True}
+ )
+
+ # 8. Strong hidden AP (very suspicious)
+ if profile.is_hidden and profile.rssi_samples:
+ latest_rssi = profile.rssi_samples[-1][1]
+ if latest_rssi > -50:
+ profile.add_indicator(
+ IndicatorType.ROGUE_AP,
+ f'Strong hidden AP (RSSI: {latest_rssi} dBm)',
+ {'rssi': latest_rssi}
+ )
self._apply_known_device_modifier(profile, bssid, 'wifi')
diff --git a/utils/tscm/detector.py b/utils/tscm/detector.py
index d09412b..4245706 100644
--- a/utils/tscm/detector.py
+++ b/utils/tscm/detector.py
@@ -476,11 +476,12 @@ class ThreatDetector:
mac = device.get('mac', device.get('address', '')).upper()
name = device.get('name', '')
rssi = device.get('rssi', device.get('signal', -100))
- manufacturer = device.get('manufacturer', '')
- device_type = device.get('type', '')
- manufacturer_data = device.get('manufacturer_data')
-
- threats = []
+ manufacturer = device.get('manufacturer', '')
+ device_type = device.get('type', '')
+ manufacturer_data = device.get('manufacturer_data')
+ tracker_data = device.get('tracker', {}) or {}
+
+ threats = []
# Check if new device (not in baseline)
if self.baseline and mac and mac not in self.baseline_bt_macs:
@@ -490,12 +491,25 @@ class ThreatDetector:
'reason': 'Device not present in baseline',
})
- # Check for known trackers
- tracker_info = is_known_tracker(name, manufacturer_data)
- if tracker_info:
- threats.append({
- 'type': 'tracker',
- 'severity': tracker_info.get('risk', 'high'),
+ # Check for known trackers (v2 tracker data if available)
+ if tracker_data.get('is_tracker'):
+ tracker_label = tracker_data.get('name') or tracker_data.get('type') or 'Tracker'
+ confidence = str(tracker_data.get('confidence') or '').lower()
+ severity = 'high' if confidence in ('high', 'medium') else 'medium'
+ threats.append({
+ 'type': 'tracker',
+ 'severity': severity,
+ 'reason': f"Tracker detected: {tracker_label}",
+ 'tracker_type': tracker_label,
+ 'details': tracker_data.get('evidence', []),
+ })
+
+ # Check for known trackers (legacy patterns)
+ tracker_info = is_known_tracker(name, manufacturer_data)
+ if tracker_info:
+ threats.append({
+ 'type': 'tracker',
+ 'severity': tracker_info.get('risk', 'high'),
'reason': f"Known tracker detected: {tracker_info.get('name', 'Unknown')}",
'tracker_type': tracker_info.get('name'),
})
diff --git a/utils/tscm/reports.py b/utils/tscm/reports.py
index 627bb03..e90b851 100644
--- a/utils/tscm/reports.py
+++ b/utils/tscm/reports.py
@@ -102,13 +102,14 @@ class TSCMReport:
# Meeting window summaries
meeting_summaries: list[ReportMeetingSummary] = field(default_factory=list)
- # Statistics
- total_devices_scanned: int = 0
- wifi_devices: int = 0
- bluetooth_devices: int = 0
- rf_signals: int = 0
- new_devices: int = 0
- missing_devices: int = 0
+ # Statistics
+ total_devices_scanned: int = 0
+ wifi_devices: int = 0
+ wifi_clients: int = 0
+ bluetooth_devices: int = 0
+ rf_signals: int = 0
+ new_devices: int = 0
+ missing_devices: int = 0
# Sweep duration
sweep_start: Optional[datetime] = None
@@ -200,12 +201,13 @@ def generate_executive_summary(report: TSCMReport) -> str:
lines.append("")
# Key statistics
- lines.append("SCAN STATISTICS:")
- lines.append(f" - Total devices scanned: {report.total_devices_scanned}")
- lines.append(f" - WiFi access points: {report.wifi_devices}")
- lines.append(f" - Bluetooth devices: {report.bluetooth_devices}")
- lines.append(f" - RF signals: {report.rf_signals}")
- lines.append("")
+ lines.append("SCAN STATISTICS:")
+ lines.append(f" - Total devices scanned: {report.total_devices_scanned}")
+ lines.append(f" - WiFi access points: {report.wifi_devices}")
+ lines.append(f" - WiFi clients: {report.wifi_clients}")
+ lines.append(f" - Bluetooth devices: {report.bluetooth_devices}")
+ lines.append(f" - RF signals: {report.rf_signals}")
+ lines.append("")
# Findings summary
lines.append("FINDINGS SUMMARY:")
@@ -427,13 +429,14 @@ def generate_technical_annex_json(report: TSCMReport) -> dict:
'capabilities': report.capabilities,
'limitations': report.limitations,
- 'statistics': {
- 'total_devices': report.total_devices_scanned,
- 'wifi_devices': report.wifi_devices,
- 'bluetooth_devices': report.bluetooth_devices,
- 'rf_signals': report.rf_signals,
- 'new_devices': report.new_devices,
- 'missing_devices': report.missing_devices,
+ 'statistics': {
+ 'total_devices': report.total_devices_scanned,
+ 'wifi_devices': report.wifi_devices,
+ 'wifi_clients': report.wifi_clients,
+ 'bluetooth_devices': report.bluetooth_devices,
+ 'rf_signals': report.rf_signals,
+ 'new_devices': report.new_devices,
+ 'missing_devices': report.missing_devices,
'high_interest_count': len(report.high_interest_findings),
'needs_review_count': len(report.needs_review_findings),
'informational_count': len(report.informational_findings),
@@ -781,21 +784,23 @@ class TSCMReportBuilder:
self.report.meeting_summaries.append(meeting)
return self
- def add_statistics(
- self,
- wifi: int = 0,
- bluetooth: int = 0,
- rf: int = 0,
- new: int = 0,
- missing: int = 0
- ) -> 'TSCMReportBuilder':
- self.report.wifi_devices = wifi
- self.report.bluetooth_devices = bluetooth
- self.report.rf_signals = rf
- self.report.total_devices_scanned = wifi + bluetooth + rf
- self.report.new_devices = new
- self.report.missing_devices = missing
- return self
+ def add_statistics(
+ self,
+ wifi: int = 0,
+ wifi_clients: int = 0,
+ bluetooth: int = 0,
+ rf: int = 0,
+ new: int = 0,
+ missing: int = 0
+ ) -> 'TSCMReportBuilder':
+ self.report.wifi_devices = wifi
+ self.report.wifi_clients = wifi_clients
+ self.report.bluetooth_devices = bluetooth
+ self.report.rf_signals = rf
+ self.report.total_devices_scanned = wifi + wifi_clients + bluetooth + rf
+ self.report.new_devices = new
+ self.report.missing_devices = missing
+ return self
def add_device_timelines(self, timelines: list[dict]) -> 'TSCMReportBuilder':
self.report.device_timelines = timelines
@@ -890,25 +895,30 @@ def generate_report(
builder.add_findings_from_profiles(device_profiles)
# Statistics
- results = sweep_data.get('results', {})
- wifi_count = results.get('wifi_count')
- if wifi_count is None:
- wifi_count = len(results.get('wifi_devices', results.get('wifi', [])))
-
- bt_count = results.get('bt_count')
- if bt_count is None:
- bt_count = len(results.get('bt_devices', results.get('bluetooth', [])))
+ results = sweep_data.get('results', {})
+ wifi_count = results.get('wifi_count')
+ if wifi_count is None:
+ wifi_count = len(results.get('wifi_devices', results.get('wifi', [])))
+
+ wifi_client_count = results.get('wifi_client_count')
+ if wifi_client_count is None:
+ wifi_client_count = len(results.get('wifi_clients', []))
+
+ bt_count = results.get('bt_count')
+ if bt_count is None:
+ bt_count = len(results.get('bt_devices', results.get('bluetooth', [])))
rf_count = results.get('rf_count')
if rf_count is None:
rf_count = len(results.get('rf_signals', results.get('rf', [])))
- builder.add_statistics(
- wifi=wifi_count,
- bluetooth=bt_count,
- rf=rf_count,
- new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0,
- missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0,
+ builder.add_statistics(
+ wifi=wifi_count,
+ wifi_clients=wifi_client_count,
+ bluetooth=bt_count,
+ rf=rf_count,
+ new=baseline_diff.get('summary', {}).get('new_devices', 0) if baseline_diff else 0,
+ missing=baseline_diff.get('summary', {}).get('missing_devices', 0) if baseline_diff else 0,
)
# Technical data
diff --git a/utils/wifi/constants.py b/utils/wifi/constants.py
index 1fec3be..ccb4d11 100644
--- a/utils/wifi/constants.py
+++ b/utils/wifi/constants.py
@@ -414,14 +414,27 @@ VENDOR_OUIS = {
}
-def get_vendor_from_mac(mac: str) -> str | None:
- """Get vendor name from MAC address OUI."""
- if not mac:
- return None
- # Normalize MAC format
- mac_upper = mac.upper().replace('-', ':')
- oui = mac_upper[:8]
- return VENDOR_OUIS.get(oui)
+def get_vendor_from_mac(mac: str) -> str | None:
+ """Get vendor name from MAC address OUI."""
+ if not mac:
+ return None
+ # Normalize MAC format
+ mac_upper = mac.upper().replace('-', ':')
+ oui = mac_upper[:8]
+ vendor = VENDOR_OUIS.get(oui)
+ if vendor:
+ return vendor
+
+ # Fallback to expanded OUI database if available
+ try:
+ from data.oui import get_manufacturer
+ manufacturer = get_manufacturer(mac_upper)
+ if manufacturer and manufacturer != 'Unknown':
+ return manufacturer
+ except Exception:
+ return None
+
+ return None
# =============================================================================
diff --git a/utils/wifi/models.py b/utils/wifi/models.py
index fa2f817..b49e02c 100644
--- a/utils/wifi/models.py
+++ b/utils/wifi/models.py
@@ -259,16 +259,17 @@ class WiFiAccessPoint:
'in_baseline': self.in_baseline,
}
- def to_legacy_dict(self) -> dict:
- """Convert to legacy format for TSCM compatibility."""
- return {
- 'bssid': self.bssid,
- 'essid': self.essid or '',
- 'power': str(self.rssi_current) if self.rssi_current else '-100',
- 'channel': str(self.channel) if self.channel else '',
- 'privacy': self.security,
- 'first_seen': self.first_seen.isoformat() if self.first_seen else '',
- 'last_seen': self.last_seen.isoformat() if self.last_seen else '',
+ def to_legacy_dict(self) -> dict:
+ """Convert to legacy format for TSCM compatibility."""
+ return {
+ 'bssid': self.bssid,
+ 'essid': self.essid or '',
+ 'vendor': self.vendor,
+ 'power': str(self.rssi_current) if self.rssi_current else '-100',
+ 'channel': str(self.channel) if self.channel else '',
+ 'privacy': self.security,
+ 'first_seen': self.first_seen.isoformat() if self.first_seen else '',
+ 'last_seen': self.last_seen.isoformat() if self.last_seen else '',
'beacon_count': str(self.beacon_count),
'lan_ip': '', # Not tracked in new system
}