Files
intercept/routes/correlation.py
Smittix e00fbfddc1 v2.26.0: fix SSE fanout crash and branded logo FOUC
- Fix SSE fanout thread AttributeError when source queue is None during
  interpreter shutdown by snapshotting to local variable with null guard
- Fix branded "i" logo rendering oversized on first page load (FOUC) by
  adding inline width/height to SVG elements across 10 templates
- Bump version to 2.26.0 in config.py, pyproject.toml, and CHANGELOG.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:51:27 +00:00

98 lines
3.2 KiB
Python

"""Device correlation routes."""
from __future__ import annotations
from flask import Blueprint, Response, request
import app as app_module
from utils.correlation import get_correlations
from utils.logging import get_logger
from utils.responses import api_error, api_success
logger = get_logger('intercept.correlation')
correlation_bp = Blueprint('correlation', __name__, url_prefix='/correlation')
@correlation_bp.route('', methods=['GET'])
def get_device_correlations() -> Response:
"""
Get device correlations between WiFi and Bluetooth.
Query params:
min_confidence: Minimum confidence threshold (default 0.5)
include_historical: Include database correlations (default true)
"""
min_confidence = request.args.get('min_confidence', 0.5, type=float)
include_historical = request.args.get('include_historical', 'true').lower() == 'true'
try:
# Get current device data
wifi_devices = dict(app_module.wifi_networks)
wifi_devices.update(dict(app_module.wifi_clients))
bt_devices = dict(app_module.bt_devices)
# Calculate correlations
correlations = get_correlations(
wifi_devices=wifi_devices,
bt_devices=bt_devices,
min_confidence=min_confidence,
include_historical=include_historical
)
return api_success(data={
'correlations': correlations,
'wifi_count': len(wifi_devices),
'bt_count': len(bt_devices)
})
except Exception as e:
logger.error(f"Error calculating correlations: {e}")
return api_error(str(e), 500)
@correlation_bp.route('/analyze', methods=['POST'])
def analyze_correlation() -> Response:
"""
Analyze specific device pair for correlation.
Request body:
wifi_mac: WiFi device MAC address
bt_mac: Bluetooth device MAC address
"""
data = request.json or {}
wifi_mac = data.get('wifi_mac')
bt_mac = data.get('bt_mac')
if not wifi_mac or not bt_mac:
return api_error('wifi_mac and bt_mac are required', 400)
try:
# Get device data
wifi_device = app_module.wifi_networks.get(wifi_mac)
if not wifi_device:
wifi_device = app_module.wifi_clients.get(wifi_mac)
bt_device = app_module.bt_devices.get(bt_mac)
if not wifi_device:
return api_error(f'WiFi device {wifi_mac} not found', 404)
if not bt_device:
return api_error(f'Bluetooth device {bt_mac} not found', 404)
# Calculate correlation for this specific pair
correlations = get_correlations(
wifi_devices={wifi_mac: wifi_device},
bt_devices={bt_mac: bt_device},
min_confidence=0.0, # Show even low confidence for analysis
include_historical=True
)
if correlations:
return api_success(data={'correlation': correlations[0]})
else:
return api_success(data={'correlation': None}, message='No correlation detected between these devices')
except Exception as e:
logger.error(f"Error analyzing correlation: {e}")
return api_error(str(e), 500)