mirror of
https://github.com/smittix/intercept.git
synced 2026-04-24 14:50:00 -07:00
Add Listening Post, improve setup and documentation
- Add Listening Post mode with frequency scanner and audio monitoring - Add dependency warning for aircraft dashboard listen feature - Auto-restart audio when switching frequencies - Fix toolbar overflow on aircraft dashboard custom frequency - Update setup script with full macOS/Debian support - Clean up README and documentation for clarity - Add sox and dump1090 to Dockerfile - Add comprehensive tool reference to HARDWARE.md - Add correlation, settings, and database utilities - Add new test files for routes, validation, correlation, database 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
313
utils/correlation.py
Normal file
313
utils/correlation.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""
|
||||
Device correlation engine for matching WiFi and Bluetooth devices.
|
||||
|
||||
Uses timing-based correlation to identify when WiFi and Bluetooth
|
||||
signals likely belong to the same physical device.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from utils.database import add_correlation, get_correlations as db_get_correlations
|
||||
|
||||
logger = logging.getLogger('intercept.correlation')
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeviceObservation:
|
||||
"""A single observation of a device."""
|
||||
mac: str
|
||||
first_seen: datetime
|
||||
last_seen: datetime
|
||||
rssi: int | None = None
|
||||
name: str | None = None
|
||||
manufacturer: str | None = None
|
||||
|
||||
|
||||
class DeviceCorrelator:
|
||||
"""
|
||||
Correlates WiFi and Bluetooth devices based on timing patterns.
|
||||
|
||||
Devices are considered potentially correlated if:
|
||||
1. They appear within a short time window of each other
|
||||
2. They have similar signal strength patterns (optional)
|
||||
3. They share the same OUI/manufacturer (bonus confidence)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
time_window_seconds: int = 30,
|
||||
min_confidence: float = 0.5,
|
||||
rssi_threshold: int = 20
|
||||
):
|
||||
"""
|
||||
Initialize correlator.
|
||||
|
||||
Args:
|
||||
time_window_seconds: Max time difference for correlation (default 30s)
|
||||
min_confidence: Minimum confidence score to report (default 0.5)
|
||||
rssi_threshold: Max RSSI difference for signal-based correlation
|
||||
"""
|
||||
self.time_window = timedelta(seconds=time_window_seconds)
|
||||
self.min_confidence = min_confidence
|
||||
self.rssi_threshold = rssi_threshold
|
||||
|
||||
def correlate(
|
||||
self,
|
||||
wifi_devices: dict[str, dict[str, Any]],
|
||||
bt_devices: dict[str, dict[str, Any]]
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Find correlations between WiFi and Bluetooth devices.
|
||||
|
||||
Args:
|
||||
wifi_devices: Dict of WiFi devices keyed by MAC
|
||||
bt_devices: Dict of Bluetooth devices keyed by MAC
|
||||
|
||||
Returns:
|
||||
List of correlation results with confidence scores
|
||||
"""
|
||||
correlations = []
|
||||
|
||||
for wifi_mac, wifi_data in wifi_devices.items():
|
||||
wifi_obs = self._to_observation(wifi_mac, wifi_data, 'wifi')
|
||||
if not wifi_obs:
|
||||
continue
|
||||
|
||||
for bt_mac, bt_data in bt_devices.items():
|
||||
bt_obs = self._to_observation(bt_mac, bt_data, 'bluetooth')
|
||||
if not bt_obs:
|
||||
continue
|
||||
|
||||
confidence = self._calculate_confidence(wifi_obs, bt_obs)
|
||||
|
||||
if confidence >= self.min_confidence:
|
||||
correlations.append({
|
||||
'wifi_mac': wifi_mac,
|
||||
'wifi_name': wifi_obs.name,
|
||||
'bt_mac': bt_mac,
|
||||
'bt_name': bt_obs.name,
|
||||
'confidence': round(confidence, 2),
|
||||
'reason': self._get_correlation_reason(wifi_obs, bt_obs)
|
||||
})
|
||||
|
||||
# Persist high-confidence correlations
|
||||
if confidence >= 0.7:
|
||||
try:
|
||||
add_correlation(
|
||||
wifi_mac=wifi_mac,
|
||||
bt_mac=bt_mac,
|
||||
confidence=confidence,
|
||||
metadata={
|
||||
'wifi_name': wifi_obs.name,
|
||||
'bt_name': bt_obs.name
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to persist correlation: {e}")
|
||||
|
||||
# Sort by confidence (highest first)
|
||||
correlations.sort(key=lambda x: x['confidence'], reverse=True)
|
||||
|
||||
return correlations
|
||||
|
||||
def _to_observation(
|
||||
self,
|
||||
mac: str,
|
||||
data: dict[str, Any],
|
||||
device_type: str
|
||||
) -> DeviceObservation | None:
|
||||
"""Convert device dict to observation."""
|
||||
try:
|
||||
# Handle different timestamp formats
|
||||
first_seen = data.get('first_seen') or data.get('firstSeen')
|
||||
last_seen = data.get('last_seen') or data.get('lastSeen')
|
||||
|
||||
if isinstance(first_seen, str):
|
||||
first_seen = datetime.fromisoformat(first_seen.replace('Z', '+00:00'))
|
||||
elif isinstance(first_seen, (int, float)):
|
||||
first_seen = datetime.fromtimestamp(first_seen / 1000)
|
||||
else:
|
||||
first_seen = datetime.now()
|
||||
|
||||
if isinstance(last_seen, str):
|
||||
last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
|
||||
elif isinstance(last_seen, (int, float)):
|
||||
last_seen = datetime.fromtimestamp(last_seen / 1000)
|
||||
else:
|
||||
last_seen = datetime.now()
|
||||
|
||||
# Get RSSI (different field names)
|
||||
rssi = data.get('rssi') or data.get('power') or data.get('signal')
|
||||
if rssi is not None:
|
||||
rssi = int(rssi)
|
||||
|
||||
# Get name
|
||||
name = data.get('name') or data.get('essid') or data.get('ssid')
|
||||
|
||||
# Get manufacturer
|
||||
manufacturer = data.get('manufacturer') or data.get('vendor')
|
||||
|
||||
return DeviceObservation(
|
||||
mac=mac,
|
||||
first_seen=first_seen,
|
||||
last_seen=last_seen,
|
||||
rssi=rssi,
|
||||
name=name,
|
||||
manufacturer=manufacturer
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to parse device {mac}: {e}")
|
||||
return None
|
||||
|
||||
def _calculate_confidence(
|
||||
self,
|
||||
wifi: DeviceObservation,
|
||||
bt: DeviceObservation
|
||||
) -> float:
|
||||
"""
|
||||
Calculate correlation confidence score.
|
||||
|
||||
Score components:
|
||||
- Timing overlap: 0.0-0.5 (primary factor)
|
||||
- Same manufacturer: +0.2
|
||||
- Similar RSSI: +0.1
|
||||
- Both named: +0.1
|
||||
|
||||
Returns:
|
||||
Confidence score 0.0-1.0
|
||||
"""
|
||||
confidence = 0.0
|
||||
|
||||
# Timing correlation (most important)
|
||||
time_diff = abs((wifi.first_seen - bt.first_seen).total_seconds())
|
||||
if time_diff <= self.time_window.total_seconds():
|
||||
# Linear decay from 0.5 to 0.0 as time difference increases
|
||||
timing_score = 0.5 * (1 - time_diff / self.time_window.total_seconds())
|
||||
confidence += timing_score
|
||||
else:
|
||||
# Check if observation windows overlap at all
|
||||
wifi_end = wifi.last_seen
|
||||
bt_end = bt.last_seen
|
||||
|
||||
# If observation periods overlap
|
||||
if wifi.first_seen <= bt_end and bt.first_seen <= wifi_end:
|
||||
confidence += 0.25 # Partial credit for overlapping presence
|
||||
|
||||
# Manufacturer match
|
||||
if wifi.manufacturer and bt.manufacturer:
|
||||
wifi_mfg = wifi.manufacturer.lower()
|
||||
bt_mfg = bt.manufacturer.lower()
|
||||
if wifi_mfg == bt_mfg:
|
||||
confidence += 0.2
|
||||
elif wifi_mfg[:5] == bt_mfg[:5]: # Partial match
|
||||
confidence += 0.1
|
||||
|
||||
# OUI match (first 3 octets of MAC)
|
||||
wifi_oui = wifi.mac[:8].upper()
|
||||
bt_oui = bt.mac[:8].upper()
|
||||
if wifi_oui == bt_oui:
|
||||
confidence += 0.15
|
||||
|
||||
# RSSI similarity
|
||||
if wifi.rssi is not None and bt.rssi is not None:
|
||||
rssi_diff = abs(wifi.rssi - bt.rssi)
|
||||
if rssi_diff <= self.rssi_threshold:
|
||||
rssi_score = 0.1 * (1 - rssi_diff / self.rssi_threshold)
|
||||
confidence += rssi_score
|
||||
|
||||
# Both have names (suggests user device)
|
||||
if wifi.name and bt.name:
|
||||
confidence += 0.05
|
||||
|
||||
return min(confidence, 1.0)
|
||||
|
||||
def _get_correlation_reason(
|
||||
self,
|
||||
wifi: DeviceObservation,
|
||||
bt: DeviceObservation
|
||||
) -> str:
|
||||
"""Generate human-readable reason for correlation."""
|
||||
reasons = []
|
||||
|
||||
time_diff = abs((wifi.first_seen - bt.first_seen).total_seconds())
|
||||
if time_diff <= self.time_window.total_seconds():
|
||||
reasons.append(f"appeared within {int(time_diff)}s")
|
||||
|
||||
wifi_oui = wifi.mac[:8].upper()
|
||||
bt_oui = bt.mac[:8].upper()
|
||||
if wifi_oui == bt_oui:
|
||||
reasons.append("same OUI")
|
||||
|
||||
if wifi.manufacturer and bt.manufacturer:
|
||||
if wifi.manufacturer.lower() == bt.manufacturer.lower():
|
||||
reasons.append(f"same manufacturer ({wifi.manufacturer})")
|
||||
|
||||
if wifi.rssi is not None and bt.rssi is not None:
|
||||
rssi_diff = abs(wifi.rssi - bt.rssi)
|
||||
if rssi_diff <= self.rssi_threshold:
|
||||
reasons.append("similar signal strength")
|
||||
|
||||
return "; ".join(reasons) if reasons else "timing overlap"
|
||||
|
||||
|
||||
# Global correlator instance
|
||||
correlator = DeviceCorrelator()
|
||||
|
||||
|
||||
def get_correlations(
|
||||
wifi_devices: dict[str, dict] | None = None,
|
||||
bt_devices: dict[str, dict] | None = None,
|
||||
min_confidence: float = 0.5,
|
||||
include_historical: bool = True
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Get device correlations.
|
||||
|
||||
Args:
|
||||
wifi_devices: Current WiFi devices (or None to use only historical)
|
||||
bt_devices: Current Bluetooth devices (or None to use only historical)
|
||||
min_confidence: Minimum confidence threshold
|
||||
include_historical: Include correlations from database
|
||||
|
||||
Returns:
|
||||
List of correlations sorted by confidence
|
||||
"""
|
||||
results = []
|
||||
|
||||
# Get live correlations
|
||||
if wifi_devices and bt_devices:
|
||||
correlator.min_confidence = min_confidence
|
||||
results.extend(correlator.correlate(wifi_devices, bt_devices))
|
||||
|
||||
# Get historical correlations from database
|
||||
if include_historical:
|
||||
try:
|
||||
historical = db_get_correlations(min_confidence)
|
||||
for h in historical:
|
||||
# Avoid duplicates
|
||||
existing = next(
|
||||
(r for r in results
|
||||
if r['wifi_mac'] == h['wifi_mac'] and r['bt_mac'] == h['bt_mac']),
|
||||
None
|
||||
)
|
||||
if not existing:
|
||||
results.append({
|
||||
'wifi_mac': h['wifi_mac'],
|
||||
'bt_mac': h['bt_mac'],
|
||||
'confidence': h['confidence'],
|
||||
'reason': 'historical correlation',
|
||||
'first_seen': h['first_seen'],
|
||||
'last_seen': h['last_seen']
|
||||
})
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to get historical correlations: {e}")
|
||||
|
||||
# Sort by confidence
|
||||
results.sort(key=lambda x: x['confidence'], reverse=True)
|
||||
|
||||
return results
|
||||
Reference in New Issue
Block a user