Add MAC-randomization resistant device detection for TSCM

- New device_identity.py: Clusters BLE/WiFi observations into probable
  physical devices using passive fingerprinting (not MAC addresses)
- Fingerprinting based on manufacturer data, service UUIDs, capabilities,
  timing patterns, and RSSI trajectories
- Session tracking with automatic gap detection
- Risk indicators: stable RSSI, MAC rotation, ESP32 chipsets, audio-capable
- Full audit trail for all clustering decisions

- New ble_scanner.py: Cross-platform BLE scanning with bleak library
- Detects AirTags, Tile, SmartTags, ESP32 by manufacturer ID
- Fallback to system tools (btmgmt, hcitool, system_profiler)

- Added API endpoints for device identity clustering (/tscm/identity/*)
- Updated setup.sh with bleak dependency
- Updated documentation with TSCM features and hardware requirements

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-01-14 15:19:20 +00:00
parent 35ca3f3a07
commit 21b0a153e8
9 changed files with 2189 additions and 9 deletions

View File

@@ -75,13 +75,47 @@ Complete feature list for all modules.
## Bluetooth Scanning
- **BLE and Classic** Bluetooth device scanning
- **Multiple scan modes** - hcitool, bluetoothctl
- **Multiple scan modes** - hcitool, bluetoothctl, bleak
- **Tracker detection** - AirTag, Tile, Samsung SmartTag, Chipolo
- **Device classification** - phones, audio, wearables, computers
- **Manufacturer lookup** via OUI database
- **Manufacturer lookup** via OUI database and Bluetooth Company IDs
- **Proximity radar** visualization
- **Device type breakdown** chart
## TSCM Counter-Surveillance Mode
Technical Surveillance Countermeasures (TSCM) screening for detecting wireless surveillance indicators.
### Wireless Sweep Features
- **BLE scanning** with manufacturer data detection (AirTags, Tile, SmartTags, ESP32)
- **WiFi scanning** for rogue APs, hidden SSIDs, camera devices
- **RF spectrum analysis** (requires RTL-SDR) - FM bugs, ISM bands, video transmitters
- **Cross-protocol correlation** - links devices across BLE/WiFi/RF
- **Baseline comparison** - detect new/unknown devices vs known environment
### MAC-Randomization Resistant Detection
- **Device fingerprinting** based on advertisement payloads, not MAC addresses
- **Behavioral clustering** - groups observations into probable physical devices
- **Session tracking** - monitors device presence windows
- **Timing pattern analysis** - detects characteristic advertising intervals
- **RSSI trajectory correlation** - identifies co-located devices
### Risk Assessment
- **Three-tier scoring model**:
- Informational (0-2): Known or expected devices
- Needs Review (3-5): Unusual devices requiring assessment
- High Interest (6+): Multiple indicators warrant investigation
- **Risk indicators**: Stable RSSI, audio-capable, ESP32 chipsets, hidden identity, MAC rotation
- **Audit trail** - full evidence chain for each link/flag
- **Client-safe disclaimers** - findings are indicators, not confirmed surveillance
### Limitations (Documented)
- Cannot detect non-transmitting devices
- False positives/negatives expected
- Results require professional verification
- No cryptographic de-randomization
- Passive screening only (no active probing by default)
## User Interface
- **Mode-specific header stats** - real-time badges showing key metrics per mode

View File

@@ -179,6 +179,7 @@ Open **http://localhost:5050** in your browser.
|---------|---------|
| `flask` | Web server |
| `skyfield` | Satellite tracking |
| `bleak` | BLE scanning with manufacturer data (TSCM) |
---
@@ -199,9 +200,57 @@ https://github.com/flightaware/dump1090
---
## TSCM Mode Requirements
TSCM (Technical Surveillance Countermeasures) mode requires specific hardware for full functionality:
### BLE Scanning (Tracker Detection)
- Any Bluetooth adapter supported by your OS
- `bleak` Python library for manufacturer data detection
- Detects: AirTags, Tile, SmartTags, ESP32/ESP8266 devices
```bash
# Install bleak
pip install bleak>=0.21.0
# Or via apt (Debian/Ubuntu)
sudo apt install python3-bleak
```
### RF Spectrum Analysis
- **RTL-SDR dongle** (required for RF sweeps)
- `rtl_power` command from `rtl-sdr` package
Frequency bands scanned:
| Band | Frequency | Purpose |
|------|-----------|---------|
| FM Broadcast | 88-108 MHz | FM bugs |
| 315 MHz ISM | 315 MHz | US wireless devices |
| 433 MHz ISM | 433-434 MHz | EU wireless devices |
| 868 MHz ISM | 868-869 MHz | EU IoT devices |
| 915 MHz ISM | 902-928 MHz | US IoT devices |
| 1.2 GHz | 1200-1300 MHz | Video transmitters |
| 2.4 GHz ISM | 2400-2500 MHz | WiFi/BT/Video |
```bash
# Linux
sudo apt install rtl-sdr
# macOS
brew install librtlsdr
```
### WiFi Scanning
- Standard WiFi adapter (managed mode for basic scanning)
- Monitor mode capable adapter for advanced features
- `aircrack-ng` suite for monitor mode management
---
## Notes
- **Bluetooth on macOS**: Uses native CoreBluetooth, bluez tools not needed
- **Bluetooth on macOS**: Uses bleak library (CoreBluetooth backend), bluez tools not needed
- **WiFi on macOS**: Monitor mode has limited support, full functionality on Linux
- **System tools**: `iw`, `iwconfig`, `rfkill`, `ip` are pre-installed on most Linux systems
- **TSCM on macOS**: BLE and WiFi scanning work; RF spectrum requires RTL-SDR

View File

@@ -2,6 +2,9 @@
flask>=2.0.0
requests>=2.28.0
# BLE scanning with manufacturer data detection (optional - for TSCM)
bleak>=0.21.0
# Satellite tracking (optional - only needed for satellite features)
skyfield>=1.45
@@ -14,4 +17,4 @@ pyserial>=3.5
# ruff>=0.1.0
# black>=23.0.0
# mypy>=1.0.0
flask-sock
flask-sock

View File

@@ -763,7 +763,12 @@ def _scan_wifi_networks(interface: str) -> list[dict]:
def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
"""Scan for Bluetooth devices using system tools."""
"""
Scan for Bluetooth devices with manufacturer data detection.
Uses the BLE scanner module (bleak library) for proper manufacturer ID
detection, with fallback to system tools if bleak is unavailable.
"""
import platform
import os
import re
@@ -775,6 +780,47 @@ def _scan_bluetooth_devices(interface: str, duration: int = 10) -> list[dict]:
logger.info(f"Starting Bluetooth scan (duration={duration}s, interface={interface})")
# Try the BLE scanner module first (uses bleak for proper manufacturer detection)
try:
from utils.tscm.ble_scanner import get_ble_scanner, scan_ble_devices
logger.info("Using BLE scanner module with manufacturer detection")
ble_devices = scan_ble_devices(duration)
for ble_dev in ble_devices:
mac = ble_dev.get('mac', '').upper()
if mac and mac not in seen_macs:
seen_macs.add(mac)
device = {
'mac': mac,
'name': ble_dev.get('name', 'Unknown'),
'rssi': ble_dev.get('rssi'),
'type': 'ble',
'manufacturer': ble_dev.get('manufacturer_name'),
'manufacturer_id': ble_dev.get('manufacturer_id'),
'is_tracker': ble_dev.get('is_tracker', False),
'tracker_type': ble_dev.get('tracker_type'),
'is_airtag': ble_dev.get('is_airtag', False),
'is_tile': ble_dev.get('is_tile', False),
'is_smarttag': ble_dev.get('is_smarttag', False),
'is_espressif': ble_dev.get('is_espressif', False),
'service_uuids': ble_dev.get('service_uuids', []),
}
devices.append(device)
if devices:
logger.info(f"BLE scanner found {len(devices)} devices")
trackers = [d for d in devices if d.get('is_tracker')]
if trackers:
logger.info(f"Trackers detected: {[d.get('tracker_type') for d in trackers]}")
return devices
except ImportError:
logger.warning("BLE scanner module not available, using fallback")
except Exception as e:
logger.warning(f"BLE scanner failed: {e}, using fallback")
if platform.system() == 'Darwin':
# macOS: Use system_profiler for basic Bluetooth info
try:
@@ -1820,3 +1866,298 @@ def _generate_assessment(summary: dict) -> str:
"BASELINE ENVIRONMENT: No significant anomalies detected. "
"Environment appears consistent with expected wireless activity."
)
# =============================================================================
# Device Identity Endpoints (MAC-Randomization Resistant Detection)
# =============================================================================
@tscm_bp.route('/identity/ingest/ble', methods=['POST'])
def ingest_ble_observation():
"""
Ingest a BLE observation for device identity clustering.
This endpoint accepts BLE advertisement data and feeds it into the
MAC-randomization resistant device detection engine.
Expected JSON payload:
{
"timestamp": "2024-01-01T12:00:00", // ISO format or omit for now
"addr": "AA:BB:CC:DD:EE:FF", // BLE address (may be randomized)
"addr_type": "rpa", // public/random_static/rpa/nrpa/unknown
"rssi": -65, // dBm
"tx_power": -10, // dBm (optional)
"adv_type": "ADV_IND", // Advertisement type
"manufacturer_id": 1234, // Company ID (optional)
"manufacturer_data": "0102030405", // Hex string (optional)
"service_uuids": ["uuid1", "uuid2"], // List of UUIDs (optional)
"local_name": "Device Name", // Advertised name (optional)
"appearance": 960, // BLE appearance (optional)
"packet_length": 31 // Total packet length (optional)
}
"""
try:
from utils.tscm.device_identity import ingest_ble_dict
data = request.get_json()
if not data:
return jsonify({'status': 'error', 'message': 'No data provided'}), 400
session = ingest_ble_dict(data)
return jsonify({
'status': 'success',
'session_id': session.session_id,
'observation_count': len(session.observations),
})
except Exception as e:
logger.error(f"BLE ingestion error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/ingest/wifi', methods=['POST'])
def ingest_wifi_observation():
"""
Ingest a WiFi observation for device identity clustering.
Expected JSON payload:
{
"timestamp": "2024-01-01T12:00:00",
"src_mac": "AA:BB:CC:DD:EE:FF", // Client MAC (may be randomized)
"dst_mac": "11:22:33:44:55:66", // Destination MAC
"bssid": "11:22:33:44:55:66", // AP BSSID
"ssid": "NetworkName", // SSID if available
"frame_type": "probe_request", // Frame type
"rssi": -70, // dBm
"channel": 6, // WiFi channel
"ht_capable": true, // 802.11n capable
"vht_capable": true, // 802.11ac capable
"he_capable": false, // 802.11ax capable
"supported_rates": [1, 2, 5.5, 11], // Supported rates
"vendor_ies": [["001122", 10]], // [(OUI, length), ...]
"probed_ssids": ["ssid1", "ssid2"] // For probe requests
}
"""
try:
from utils.tscm.device_identity import ingest_wifi_dict
data = request.get_json()
if not data:
return jsonify({'status': 'error', 'message': 'No data provided'}), 400
session = ingest_wifi_dict(data)
return jsonify({
'status': 'success',
'session_id': session.session_id,
'observation_count': len(session.observations),
})
except Exception as e:
logger.error(f"WiFi ingestion error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/ingest/batch', methods=['POST'])
def ingest_batch_observations():
"""
Ingest multiple observations in a single request.
Expected JSON payload:
{
"ble": [<ble_observation>, ...],
"wifi": [<wifi_observation>, ...]
}
"""
try:
from utils.tscm.device_identity import ingest_ble_dict, ingest_wifi_dict
data = request.get_json()
if not data:
return jsonify({'status': 'error', 'message': 'No data provided'}), 400
ble_count = 0
wifi_count = 0
for ble_obs in data.get('ble', []):
ingest_ble_dict(ble_obs)
ble_count += 1
for wifi_obs in data.get('wifi', []):
ingest_wifi_dict(wifi_obs)
wifi_count += 1
return jsonify({
'status': 'success',
'ble_ingested': ble_count,
'wifi_ingested': wifi_count,
})
except Exception as e:
logger.error(f"Batch ingestion error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/clusters')
def get_device_clusters():
"""
Get all device clusters (probable physical device identities).
Query parameters:
- min_confidence: Minimum cluster confidence (0-1, default 0)
- protocol: Filter by protocol ('ble' or 'wifi')
- risk_level: Filter by risk level ('high', 'medium', 'low', 'informational')
"""
try:
from utils.tscm.device_identity import get_identity_engine
engine = get_identity_engine()
min_conf = request.args.get('min_confidence', 0, type=float)
protocol = request.args.get('protocol')
risk_filter = request.args.get('risk_level')
clusters = engine.get_clusters(min_confidence=min_conf)
if protocol:
clusters = [c for c in clusters if c.protocol == protocol]
if risk_filter:
clusters = [c for c in clusters if c.risk_level.value == risk_filter]
return jsonify({
'status': 'success',
'count': len(clusters),
'clusters': [c.to_dict() for c in clusters],
'disclaimer': (
"Clusters represent PROBABLE device identities based on passive "
"fingerprinting. Results are statistical correlations, not "
"confirmed matches. False positives/negatives are expected."
)
})
except Exception as e:
logger.error(f"Get clusters error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/clusters/high-risk')
def get_high_risk_clusters():
"""Get device clusters with HIGH risk level."""
try:
from utils.tscm.device_identity import get_identity_engine
engine = get_identity_engine()
clusters = engine.get_high_risk_clusters()
return jsonify({
'status': 'success',
'count': len(clusters),
'clusters': [c.to_dict() for c in clusters],
'disclaimer': (
"High-risk classification indicates multiple behavioral indicators "
"consistent with potential surveillance devices. This does NOT "
"confirm surveillance activity. Professional verification required."
)
})
except Exception as e:
logger.error(f"Get high-risk clusters error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/summary')
def get_identity_summary():
"""
Get summary of device identity analysis.
Returns statistics, cluster counts by risk level, and monitoring period.
"""
try:
from utils.tscm.device_identity import get_identity_engine
engine = get_identity_engine()
summary = engine.get_summary()
return jsonify({
'status': 'success',
'summary': summary
})
except Exception as e:
logger.error(f"Get identity summary error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/finalize', methods=['POST'])
def finalize_identity_sessions():
"""
Finalize all active sessions and complete clustering.
Call this at the end of a monitoring period to ensure all observations
are properly clustered and assessed.
"""
try:
from utils.tscm.device_identity import get_identity_engine
engine = get_identity_engine()
engine.finalize_all_sessions()
summary = engine.get_summary()
return jsonify({
'status': 'success',
'message': 'All sessions finalized',
'summary': summary
})
except Exception as e:
logger.error(f"Finalize sessions error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/reset', methods=['POST'])
def reset_identity_engine():
"""
Reset the device identity engine.
Clears all sessions, clusters, and monitoring state.
"""
try:
from utils.tscm.device_identity import reset_identity_engine as reset_engine
reset_engine()
return jsonify({
'status': 'success',
'message': 'Device identity engine reset'
})
except Exception as e:
logger.error(f"Reset identity engine error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@tscm_bp.route('/identity/cluster/<cluster_id>')
def get_cluster_detail(cluster_id: str):
"""Get detailed information for a specific cluster."""
try:
from utils.tscm.device_identity import get_identity_engine
engine = get_identity_engine()
if cluster_id not in engine.clusters:
return jsonify({
'status': 'error',
'message': 'Cluster not found'
}), 404
cluster = engine.clusters[cluster_id]
return jsonify({
'status': 'success',
'cluster': cluster.to_dict()
})
except Exception as e:
logger.error(f"Get cluster detail error: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500

View File

@@ -348,6 +348,7 @@ install_macos_packages() {
brew_install gpsd
warn "macOS note: hcitool/hciconfig are Linux (BlueZ) utilities and often unavailable on macOS."
info "TSCM BLE scanning uses bleak library (installed via pip) for manufacturer data detection."
echo
}
@@ -539,6 +540,8 @@ install_debian_packages() {
# Install Python packages via apt (more reliable than pip on modern Debian/Ubuntu)
$SUDO apt-get install -y python3-flask python3-requests python3-serial >/dev/null 2>&1 || true
$SUDO apt-get install -y python3-skyfield >/dev/null 2>&1 || true
# bleak for BLE scanning with manufacturer data (TSCM mode)
$SUDO apt-get install -y python3-bleak >/dev/null 2>&1 || true
progress "Installing dump1090"
if ! cmd_exists dump1090 && ! cmd_exists dump1090-mutability; then

View File

@@ -1,10 +1,11 @@
"""
TSCM (Technical Surveillance Countermeasures) Utilities Package
Provides baseline recording, threat detection, and analysis tools
Provides baseline recording, threat detection, correlation analysis,
BLE scanning, and MAC-randomization resistant device identity tools
for counter-surveillance operations.
"""
from __future__ import annotations
__all__ = ['detector', 'baseline']
__all__ = ['detector', 'baseline', 'correlation', 'ble_scanner', 'device_identity']

476
utils/tscm/ble_scanner.py Normal file
View File

@@ -0,0 +1,476 @@
"""
BLE Scanner for TSCM
Cross-platform BLE scanning with manufacturer data detection.
Supports macOS and Linux using the bleak library with fallback to system tools.
Detects:
- Apple AirTags (company ID 0x004C)
- Tile trackers
- Samsung SmartTags
- ESP32/ESP8266 devices (Espressif, company ID 0x02E5)
- Generic BLE devices with suspicious characteristics
"""
import asyncio
import logging
import platform
import re
import subprocess
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
logger = logging.getLogger('intercept.tscm.ble')
# Manufacturer company IDs (Bluetooth SIG assigned)
COMPANY_IDS = {
0x004C: 'Apple',
0x02E5: 'Espressif',
0x0059: 'Nordic Semiconductor',
0x000D: 'Texas Instruments',
0x0075: 'Samsung',
0x00E0: 'Google',
0x0006: 'Microsoft',
0x01DA: 'Tile',
}
# Known tracker signatures
TRACKER_SIGNATURES = {
# Apple AirTag detection patterns
'airtag': {
'company_id': 0x004C,
'data_patterns': [
b'\x12\x19', # AirTag/Find My advertisement prefix
b'\x07\x19', # Offline Finding
],
'name_patterns': ['airtag', 'findmy', 'find my'],
},
# Tile tracker
'tile': {
'company_id': 0x01DA,
'name_patterns': ['tile'],
},
# Samsung SmartTag
'smarttag': {
'company_id': 0x0075,
'name_patterns': ['smarttag', 'smart tag', 'galaxy smart'],
},
# ESP32/ESP8266
'espressif': {
'company_id': 0x02E5,
'name_patterns': ['esp32', 'esp8266', 'espressif'],
},
}
@dataclass
class BLEDevice:
"""Represents a detected BLE device with full advertisement data."""
mac: str
name: Optional[str] = None
rssi: Optional[int] = None
manufacturer_id: Optional[int] = None
manufacturer_name: Optional[str] = None
manufacturer_data: bytes = field(default_factory=bytes)
service_uuids: list = field(default_factory=list)
tx_power: Optional[int] = None
is_connectable: bool = True
# Detection flags
is_airtag: bool = False
is_tile: bool = False
is_smarttag: bool = False
is_espressif: bool = False
is_tracker: bool = False
tracker_type: Optional[str] = None
first_seen: datetime = field(default_factory=datetime.now)
last_seen: datetime = field(default_factory=datetime.now)
detection_count: int = 1
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
'mac': self.mac,
'name': self.name or 'Unknown',
'rssi': self.rssi,
'manufacturer_id': self.manufacturer_id,
'manufacturer_name': self.manufacturer_name,
'service_uuids': self.service_uuids,
'tx_power': self.tx_power,
'is_connectable': self.is_connectable,
'is_airtag': self.is_airtag,
'is_tile': self.is_tile,
'is_smarttag': self.is_smarttag,
'is_espressif': self.is_espressif,
'is_tracker': self.is_tracker,
'tracker_type': self.tracker_type,
'detection_count': self.detection_count,
'type': 'ble',
}
class BLEScanner:
"""
Cross-platform BLE scanner with manufacturer data detection.
Uses bleak library for proper BLE scanning, with fallback to
system tools (hcitool/btmgmt on Linux, system_profiler on macOS).
"""
def __init__(self):
self.devices: dict[str, BLEDevice] = {}
self._bleak_available = self._check_bleak()
self._scanning = False
def _check_bleak(self) -> bool:
"""Check if bleak library is available."""
try:
import bleak
return True
except ImportError:
logger.warning("bleak library not available - using fallback scanning")
return False
async def scan_async(self, duration: int = 10) -> list[BLEDevice]:
"""
Perform async BLE scan using bleak.
Args:
duration: Scan duration in seconds
Returns:
List of detected BLE devices
"""
if not self._bleak_available:
# Use synchronous fallback
return self._scan_fallback(duration)
try:
from bleak import BleakScanner
from bleak.backends.device import BLEDevice as BleakDevice
from bleak.backends.scanner import AdvertisementData
detected = {}
def detection_callback(device: BleakDevice, adv_data: AdvertisementData):
"""Callback for each detected device."""
mac = device.address.upper()
if mac in detected:
# Update existing device
detected[mac].rssi = adv_data.rssi
detected[mac].last_seen = datetime.now()
detected[mac].detection_count += 1
else:
# Create new device entry
ble_device = BLEDevice(
mac=mac,
name=adv_data.local_name or device.name,
rssi=adv_data.rssi,
service_uuids=list(adv_data.service_uuids) if adv_data.service_uuids else [],
tx_power=adv_data.tx_power,
)
# Parse manufacturer data
if adv_data.manufacturer_data:
for company_id, data in adv_data.manufacturer_data.items():
ble_device.manufacturer_id = company_id
ble_device.manufacturer_name = COMPANY_IDS.get(company_id, f'Unknown ({hex(company_id)})')
ble_device.manufacturer_data = bytes(data)
# Check for known trackers
self._identify_tracker(ble_device, company_id, data)
# Also check name patterns
self._check_name_patterns(ble_device)
detected[mac] = ble_device
logger.info(f"Starting BLE scan with bleak (duration={duration}s)")
scanner = BleakScanner(detection_callback=detection_callback)
await scanner.start()
await asyncio.sleep(duration)
await scanner.stop()
# Update internal device list
for mac, device in detected.items():
if mac in self.devices:
self.devices[mac].rssi = device.rssi
self.devices[mac].last_seen = device.last_seen
self.devices[mac].detection_count += 1
else:
self.devices[mac] = device
logger.info(f"BLE scan complete: {len(detected)} devices found")
return list(detected.values())
except Exception as e:
logger.error(f"Bleak scan failed: {e}")
return self._scan_fallback(duration)
def scan(self, duration: int = 10) -> list[BLEDevice]:
"""
Synchronous wrapper for BLE scanning.
Args:
duration: Scan duration in seconds
Returns:
List of detected BLE devices
"""
if self._bleak_available:
try:
# Try to get existing event loop
try:
loop = asyncio.get_running_loop()
# We're in an async context, can't use run()
future = asyncio.ensure_future(self.scan_async(duration))
return asyncio.get_event_loop().run_until_complete(future)
except RuntimeError:
# No running loop, create one
return asyncio.run(self.scan_async(duration))
except Exception as e:
logger.error(f"Async scan failed: {e}")
return self._scan_fallback(duration)
else:
return self._scan_fallback(duration)
def _identify_tracker(self, device: BLEDevice, company_id: int, data: bytes):
"""Identify if device is a known tracker type."""
# Apple AirTag detection
if company_id == 0x004C: # Apple
# Check for Find My / AirTag advertisement patterns
if len(data) >= 2:
# AirTag advertisements have specific byte patterns
if data[0] == 0x12 and data[1] == 0x19:
device.is_airtag = True
device.is_tracker = True
device.tracker_type = 'AirTag'
logger.info(f"AirTag detected: {device.mac}")
elif data[0] == 0x07: # Offline Finding
device.is_airtag = True
device.is_tracker = True
device.tracker_type = 'AirTag (Offline)'
logger.info(f"AirTag (offline mode) detected: {device.mac}")
# Tile tracker
elif company_id == 0x01DA: # Tile
device.is_tile = True
device.is_tracker = True
device.tracker_type = 'Tile'
logger.info(f"Tile tracker detected: {device.mac}")
# Samsung SmartTag
elif company_id == 0x0075: # Samsung
# Check if it's specifically a SmartTag
device.is_smarttag = True
device.is_tracker = True
device.tracker_type = 'SmartTag'
logger.info(f"Samsung SmartTag detected: {device.mac}")
# Espressif (ESP32/ESP8266)
elif company_id == 0x02E5: # Espressif
device.is_espressif = True
device.tracker_type = 'ESP32/ESP8266'
logger.info(f"ESP32/ESP8266 device detected: {device.mac}")
def _check_name_patterns(self, device: BLEDevice):
"""Check device name for tracker patterns."""
if not device.name:
return
name_lower = device.name.lower()
# Check each tracker type
for tracker_type, sig in TRACKER_SIGNATURES.items():
patterns = sig.get('name_patterns', [])
for pattern in patterns:
if pattern in name_lower:
if tracker_type == 'airtag':
device.is_airtag = True
device.is_tracker = True
device.tracker_type = 'AirTag'
elif tracker_type == 'tile':
device.is_tile = True
device.is_tracker = True
device.tracker_type = 'Tile'
elif tracker_type == 'smarttag':
device.is_smarttag = True
device.is_tracker = True
device.tracker_type = 'SmartTag'
elif tracker_type == 'espressif':
device.is_espressif = True
device.tracker_type = 'ESP32/ESP8266'
logger.info(f"Tracker identified by name: {device.name} -> {tracker_type}")
return
def _scan_fallback(self, duration: int = 10) -> list[BLEDevice]:
"""
Fallback scanning using system tools when bleak is unavailable.
Works on both macOS and Linux.
"""
system = platform.system()
if system == 'Darwin':
return self._scan_macos(duration)
else:
return self._scan_linux(duration)
def _scan_macos(self, duration: int = 10) -> list[BLEDevice]:
"""Fallback BLE scanning on macOS using system_profiler."""
devices = []
try:
import json
result = subprocess.run(
['system_profiler', 'SPBluetoothDataType', '-json'],
capture_output=True, text=True, timeout=15
)
data = json.loads(result.stdout)
bt_data = data.get('SPBluetoothDataType', [{}])[0]
# Get connected/paired devices
for section in ['device_connected', 'device_title']:
section_data = bt_data.get(section, {})
if isinstance(section_data, dict):
for name, info in section_data.items():
if isinstance(info, dict):
mac = info.get('device_address', '').upper()
if mac:
device = BLEDevice(
mac=mac,
name=name,
)
# Check name patterns
self._check_name_patterns(device)
devices.append(device)
logger.info(f"macOS fallback scan found {len(devices)} devices")
except Exception as e:
logger.error(f"macOS fallback scan failed: {e}")
return devices
def _scan_linux(self, duration: int = 10) -> list[BLEDevice]:
"""Fallback BLE scanning on Linux using bluetoothctl/btmgmt."""
import shutil
devices = []
seen_macs = set()
# Method 1: Try btmgmt for BLE devices
if shutil.which('btmgmt'):
try:
logger.info("Trying btmgmt find...")
result = subprocess.run(
['btmgmt', 'find'],
capture_output=True, text=True, timeout=duration + 5
)
for line in result.stdout.split('\n'):
if 'dev_found' in line.lower() or ('type' in line.lower() and ':' in line):
mac_match = re.search(
r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:'
r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})',
line
)
if mac_match:
mac = mac_match.group(1).upper()
if mac not in seen_macs:
seen_macs.add(mac)
name_match = re.search(r'name\s+(.+?)(?:\s|$)', line, re.I)
name = name_match.group(1) if name_match else None
device = BLEDevice(mac=mac, name=name)
self._check_name_patterns(device)
devices.append(device)
logger.info(f"btmgmt found {len(devices)} devices")
except Exception as e:
logger.warning(f"btmgmt failed: {e}")
# Method 2: Try hcitool lescan
if not devices and shutil.which('hcitool'):
try:
logger.info("Trying hcitool lescan...")
# Start lescan in background
process = subprocess.Popen(
['hcitool', 'lescan', '--duplicates'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
import time
time.sleep(duration)
process.terminate()
stdout, _ = process.communicate(timeout=2)
for line in stdout.split('\n'):
mac_match = re.search(
r'([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:'
r'[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})',
line
)
if mac_match:
mac = mac_match.group(1).upper()
if mac not in seen_macs:
seen_macs.add(mac)
# Extract name (comes after MAC)
parts = line.strip().split()
name = ' '.join(parts[1:]) if len(parts) > 1 else None
device = BLEDevice(mac=mac, name=name if name != '(unknown)' else None)
self._check_name_patterns(device)
devices.append(device)
logger.info(f"hcitool lescan found {len(devices)} devices")
except Exception as e:
logger.warning(f"hcitool lescan failed: {e}")
return devices
def get_trackers(self) -> list[BLEDevice]:
"""Get all detected tracker devices."""
return [d for d in self.devices.values() if d.is_tracker]
def get_espressif_devices(self) -> list[BLEDevice]:
"""Get all detected ESP32/ESP8266 devices."""
return [d for d in self.devices.values() if d.is_espressif]
def clear(self):
"""Clear all detected devices."""
self.devices.clear()
# Singleton instance
_scanner: Optional[BLEScanner] = None
def get_ble_scanner() -> BLEScanner:
"""Get the global BLE scanner instance."""
global _scanner
if _scanner is None:
_scanner = BLEScanner()
return _scanner
def scan_ble_devices(duration: int = 10) -> list[dict]:
"""
Convenience function to scan for BLE devices.
Args:
duration: Scan duration in seconds
Returns:
List of device dictionaries
"""
scanner = get_ble_scanner()
devices = scanner.scan(duration)
return [d.to_dict() for d in devices]

View File

@@ -447,8 +447,62 @@ class CorrelationEngine:
mac_prefix = mac[:8] if len(mac) >= 8 else ''
tracker_detected = False
# Check for Apple AirTag
if mac_prefix in TRACKER_SIGNATURES.get('airtag_oui', []):
# Check for tracker flags from BLE scanner (manufacturer ID detection)
if device.get('is_airtag'):
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
'Apple AirTag detected via manufacturer data',
{'mac': mac, 'tracker_type': 'AirTag'}
)
profile.device_type = device.get('tracker_type', 'AirTag')
tracker_detected = True
if device.get('is_tile'):
profile.add_indicator(
IndicatorType.TILE_DETECTED,
'Tile tracker detected via manufacturer data',
{'mac': mac, 'tracker_type': 'Tile'}
)
profile.device_type = 'Tile Tracker'
tracker_detected = True
if device.get('is_smarttag'):
profile.add_indicator(
IndicatorType.SMARTTAG_DETECTED,
'Samsung SmartTag detected via manufacturer data',
{'mac': mac, 'tracker_type': 'SmartTag'}
)
profile.device_type = 'Samsung SmartTag'
tracker_detected = True
if device.get('is_espressif'):
profile.add_indicator(
IndicatorType.ESP32_DEVICE,
'ESP32/ESP8266 detected via Espressif manufacturer ID',
{'mac': mac, 'chipset': 'Espressif'}
)
profile.manufacturer = 'Espressif'
profile.device_type = device.get('tracker_type', 'ESP32/ESP8266')
tracker_detected = True
# Check manufacturer_id directly
mfg_id = device.get('manufacturer_id')
if mfg_id:
if mfg_id == 0x004C and not device.get('is_airtag'):
# Apple device - could be AirTag
profile.manufacturer = 'Apple'
elif mfg_id == 0x02E5 and not device.get('is_espressif'):
# Espressif device
profile.add_indicator(
IndicatorType.ESP32_DEVICE,
'ESP32/ESP8266 detected via manufacturer ID',
{'mac': mac, 'manufacturer_id': mfg_id}
)
profile.manufacturer = 'Espressif'
tracker_detected = True
# Fallback: Check for Apple AirTag by OUI
if not tracker_detected and mac_prefix in TRACKER_SIGNATURES.get('airtag_oui', []):
profile.add_indicator(
IndicatorType.AIRTAG_DETECTED,
'Apple AirTag detected - potential tracking device',

File diff suppressed because it is too large Load Diff