Files
intercept/utils/correlation.py
Smittix e00fbfddc1 v2.26.0: fix SSE fanout crash and branded logo FOUC
- Fix SSE fanout thread AttributeError when source queue is None during
  interpreter shutdown by snapshotting to local variable with null guard
- Fix branded "i" logo rendering oversized on first page load (FOUC) by
  adding inline width/height to SVG elements across 10 templates
- Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:51:27 +00:00

314 lines
11 KiB
Python

"""
Device correlation engine for matching WiFi and Bluetooth devices.
Uses timing-based correlation to identify when WiFi and Bluetooth
signals likely belong to the same physical device.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
from utils.database import add_correlation
from utils.database import get_correlations as db_get_correlations
logger = logging.getLogger('intercept.correlation')
@dataclass
class DeviceObservation:
"""A single observation of a device."""
mac: str
first_seen: datetime
last_seen: datetime
rssi: int | None = None
name: str | None = None
manufacturer: str | None = None
class DeviceCorrelator:
"""
Correlates WiFi and Bluetooth devices based on timing patterns.
Devices are considered potentially correlated if:
1. They appear within a short time window of each other
2. They have similar signal strength patterns (optional)
3. They share the same OUI/manufacturer (bonus confidence)
"""
def __init__(
self,
time_window_seconds: int = 30,
min_confidence: float = 0.5,
rssi_threshold: int = 20
):
"""
Initialize correlator.
Args:
time_window_seconds: Max time difference for correlation (default 30s)
min_confidence: Minimum confidence score to report (default 0.5)
rssi_threshold: Max RSSI difference for signal-based correlation
"""
self.time_window = timedelta(seconds=time_window_seconds)
self.min_confidence = min_confidence
self.rssi_threshold = rssi_threshold
def correlate(
self,
wifi_devices: dict[str, dict[str, Any]],
bt_devices: dict[str, dict[str, Any]]
) -> list[dict]:
"""
Find correlations between WiFi and Bluetooth devices.
Args:
wifi_devices: Dict of WiFi devices keyed by MAC
bt_devices: Dict of Bluetooth devices keyed by MAC
Returns:
List of correlation results with confidence scores
"""
correlations = []
for wifi_mac, wifi_data in wifi_devices.items():
wifi_obs = self._to_observation(wifi_mac, wifi_data, 'wifi')
if not wifi_obs:
continue
for bt_mac, bt_data in bt_devices.items():
bt_obs = self._to_observation(bt_mac, bt_data, 'bluetooth')
if not bt_obs:
continue
confidence = self._calculate_confidence(wifi_obs, bt_obs)
if confidence >= self.min_confidence:
correlations.append({
'wifi_mac': wifi_mac,
'wifi_name': wifi_obs.name,
'bt_mac': bt_mac,
'bt_name': bt_obs.name,
'confidence': round(confidence, 2),
'reason': self._get_correlation_reason(wifi_obs, bt_obs)
})
# Persist high-confidence correlations
if confidence >= 0.7:
try:
add_correlation(
wifi_mac=wifi_mac,
bt_mac=bt_mac,
confidence=confidence,
metadata={
'wifi_name': wifi_obs.name,
'bt_name': bt_obs.name
}
)
except Exception as e:
logger.debug(f"Failed to persist correlation: {e}")
# Sort by confidence (highest first)
correlations.sort(key=lambda x: x['confidence'], reverse=True)
return correlations
def _to_observation(
self,
mac: str,
data: dict[str, Any],
device_type: str
) -> DeviceObservation | None:
"""Convert device dict to observation."""
try:
# Handle different timestamp formats
first_seen = data.get('first_seen') or data.get('firstSeen')
last_seen = data.get('last_seen') or data.get('lastSeen')
if isinstance(first_seen, str):
first_seen = datetime.fromisoformat(first_seen.replace('Z', '+00:00'))
elif isinstance(first_seen, (int, float)):
first_seen = datetime.fromtimestamp(first_seen / 1000)
else:
first_seen = datetime.now()
if isinstance(last_seen, str):
last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
elif isinstance(last_seen, (int, float)):
last_seen = datetime.fromtimestamp(last_seen / 1000)
else:
last_seen = datetime.now()
# Get RSSI (different field names)
rssi = data.get('rssi') or data.get('power') or data.get('signal')
if rssi is not None:
rssi = int(rssi)
# Get name
name = data.get('name') or data.get('essid') or data.get('ssid')
# Get manufacturer
manufacturer = data.get('manufacturer') or data.get('vendor')
return DeviceObservation(
mac=mac,
first_seen=first_seen,
last_seen=last_seen,
rssi=rssi,
name=name,
manufacturer=manufacturer
)
except Exception as e:
logger.debug(f"Failed to parse device {mac}: {e}")
return None
def _calculate_confidence(
self,
wifi: DeviceObservation,
bt: DeviceObservation
) -> float:
"""
Calculate correlation confidence score.
Score components:
- Timing overlap: 0.0-0.5 (primary factor)
- Same manufacturer: +0.2
- Similar RSSI: +0.1
- Both named: +0.1
Returns:
Confidence score 0.0-1.0
"""
confidence = 0.0
# Timing correlation (most important)
time_diff = abs((wifi.first_seen - bt.first_seen).total_seconds())
if time_diff <= self.time_window.total_seconds():
# Linear decay from 0.5 to 0.0 as time difference increases
timing_score = 0.5 * (1 - time_diff / self.time_window.total_seconds())
confidence += timing_score
else:
# Check if observation windows overlap at all
wifi_end = wifi.last_seen
bt_end = bt.last_seen
# If observation periods overlap
if wifi.first_seen <= bt_end and bt.first_seen <= wifi_end:
confidence += 0.25 # Partial credit for overlapping presence
# Manufacturer match
if wifi.manufacturer and bt.manufacturer:
wifi_mfg = wifi.manufacturer.lower()
bt_mfg = bt.manufacturer.lower()
if wifi_mfg == bt_mfg:
confidence += 0.2
elif wifi_mfg[:5] == bt_mfg[:5]: # Partial match
confidence += 0.1
# OUI match (first 3 octets of MAC)
wifi_oui = wifi.mac[:8].upper()
bt_oui = bt.mac[:8].upper()
if wifi_oui == bt_oui:
confidence += 0.15
# RSSI similarity
if wifi.rssi is not None and bt.rssi is not None:
rssi_diff = abs(wifi.rssi - bt.rssi)
if rssi_diff <= self.rssi_threshold:
rssi_score = 0.1 * (1 - rssi_diff / self.rssi_threshold)
confidence += rssi_score
# Both have names (suggests user device)
if wifi.name and bt.name:
confidence += 0.05
return min(confidence, 1.0)
def _get_correlation_reason(
self,
wifi: DeviceObservation,
bt: DeviceObservation
) -> str:
"""Generate human-readable reason for correlation."""
reasons = []
time_diff = abs((wifi.first_seen - bt.first_seen).total_seconds())
if time_diff <= self.time_window.total_seconds():
reasons.append(f"appeared within {int(time_diff)}s")
wifi_oui = wifi.mac[:8].upper()
bt_oui = bt.mac[:8].upper()
if wifi_oui == bt_oui:
reasons.append("same OUI")
if wifi.manufacturer and bt.manufacturer and wifi.manufacturer.lower() == bt.manufacturer.lower():
reasons.append(f"same manufacturer ({wifi.manufacturer})")
if wifi.rssi is not None and bt.rssi is not None:
rssi_diff = abs(wifi.rssi - bt.rssi)
if rssi_diff <= self.rssi_threshold:
reasons.append("similar signal strength")
return "; ".join(reasons) if reasons else "timing overlap"
# Global correlator instance
correlator = DeviceCorrelator()
def get_correlations(
wifi_devices: dict[str, dict] | None = None,
bt_devices: dict[str, dict] | None = None,
min_confidence: float = 0.5,
include_historical: bool = True
) -> list[dict]:
"""
Get device correlations.
Args:
wifi_devices: Current WiFi devices (or None to use only historical)
bt_devices: Current Bluetooth devices (or None to use only historical)
min_confidence: Minimum confidence threshold
include_historical: Include correlations from database
Returns:
List of correlations sorted by confidence
"""
results = []
# Get live correlations
if wifi_devices and bt_devices:
correlator.min_confidence = min_confidence
results.extend(correlator.correlate(wifi_devices, bt_devices))
# Get historical correlations from database
if include_historical:
try:
historical = db_get_correlations(min_confidence)
for h in historical:
# Avoid duplicates
existing = next(
(r for r in results
if r['wifi_mac'] == h['wifi_mac'] and r['bt_mac'] == h['bt_mac']),
None
)
if not existing:
results.append({
'wifi_mac': h['wifi_mac'],
'bt_mac': h['bt_mac'],
'confidence': h['confidence'],
'reason': 'historical correlation',
'first_seen': h['first_seen'],
'last_seen': h['last_seen']
})
except Exception as e:
logger.debug(f"Failed to get historical correlations: {e}")
# Sort by confidence
results.sort(key=lambda x: x['confidence'], reverse=True)
return results