feat: Add cross-mode analytics dashboard with geofencing, correlations, and data export

Adds a unified analytics mode under the Security nav group that aggregates
data across all signal modes. Includes emergency squawk alerting (7700/7600/7500),
vertical rate anomaly detection, ACARS/VDL2-to-ADS-B flight correlation,
geofence zones with enter/exit detection for aircraft/vessels/APRS stations,
temporal pattern detection, RSSI history tracking, Meshtastic topology mapping,
and JSON/CSV data export.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-02-17 12:59:31 +00:00
parent 831426948f
commit 0f5a414a09
22 changed files with 1943 additions and 108 deletions
+202
View File
@@ -0,0 +1,202 @@
"""Tests for analytics endpoints, export, and squawk detection."""
import json
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
@pytest.fixture(scope='session')
def app():
"""Create application for testing."""
import app as app_module
import utils.database as db_mod
from routes import register_blueprints
app_module.app.config['TESTING'] = True
# Use temp directory for test database
tmp_dir = Path(tempfile.mkdtemp())
db_mod.DB_DIR = tmp_dir
db_mod.DB_PATH = tmp_dir / 'test_intercept.db'
# Reset thread-local connection so it picks up new path
if hasattr(db_mod._local, 'connection') and db_mod._local.connection:
db_mod._local.connection.close()
db_mod._local.connection = None
db_mod.init_db()
if 'pager' not in app_module.app.blueprints:
register_blueprints(app_module.app)
return app_module.app
@pytest.fixture
def client(app):
client = app.test_client()
# Set session login to bypass require_login before_request hook
with client.session_transaction() as sess:
sess['logged_in'] = True
return client
class TestAnalyticsSummary:
"""Tests for /analytics/summary endpoint."""
def test_summary_returns_json(self, client):
response = client.get('/analytics/summary')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert 'counts' in data
assert 'health' in data
assert 'squawks' in data
def test_summary_counts_structure(self, client):
response = client.get('/analytics/summary')
data = json.loads(response.data)
counts = data['counts']
assert 'adsb' in counts
assert 'ais' in counts
assert 'wifi' in counts
assert 'bluetooth' in counts
assert 'dsc' in counts
# All should be integers
for val in counts.values():
assert isinstance(val, int)
def test_summary_health_structure(self, client):
response = client.get('/analytics/summary')
data = json.loads(response.data)
health = data['health']
# Should have process statuses
assert 'pager' in health
assert 'sensor' in health
assert 'adsb' in health
# Each should have a running flag
for mode_info in health.values():
if isinstance(mode_info, dict) and 'running' in mode_info:
assert isinstance(mode_info['running'], bool)
class TestAnalyticsExport:
"""Tests for /analytics/export/<mode> endpoint."""
def test_export_adsb_json(self, client):
response = client.get('/analytics/export/adsb?format=json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert data['mode'] == 'adsb'
assert 'data' in data
assert isinstance(data['data'], list)
def test_export_adsb_csv(self, client):
response = client.get('/analytics/export/adsb?format=csv')
assert response.status_code == 200
assert response.content_type.startswith('text/csv')
assert 'Content-Disposition' in response.headers
def test_export_invalid_mode(self, client):
response = client.get('/analytics/export/invalid_mode')
assert response.status_code == 400
data = json.loads(response.data)
assert data['status'] == 'error'
def test_export_wifi_json(self, client):
response = client.get('/analytics/export/wifi?format=json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert data['mode'] == 'wifi'
class TestAnalyticsSquawks:
"""Tests for squawk detection."""
def test_squawks_endpoint(self, client):
response = client.get('/analytics/squawks')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert isinstance(data['squawks'], list)
def test_get_emergency_squawks_detects_7700(self):
from utils.analytics import get_emergency_squawks
# Mock the adsb_aircraft DataStore
mock_store = MagicMock()
mock_store.items.return_value = [
('ABC123', {'squawk': '7700', 'callsign': 'TEST01', 'altitude': 35000}),
('DEF456', {'squawk': '1200', 'callsign': 'TEST02'}),
]
with patch('utils.analytics.app_module') as mock_app:
mock_app.adsb_aircraft = mock_store
squawks = get_emergency_squawks()
assert len(squawks) == 1
assert squawks[0]['squawk'] == '7700'
assert squawks[0]['meaning'] == 'General Emergency'
assert squawks[0]['icao'] == 'ABC123'
class TestGeofenceCRUD:
"""Tests for geofence CRUD endpoints."""
def test_list_geofences(self, client):
response = client.get('/analytics/geofences')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert isinstance(data['zones'], list)
def test_create_geofence(self, client):
response = client.post('/analytics/geofences',
data=json.dumps({
'name': 'Test Zone',
'lat': 51.5074,
'lon': -0.1278,
'radius_m': 500,
}),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert 'zone_id' in data
def test_create_geofence_missing_fields(self, client):
response = client.post('/analytics/geofences',
data=json.dumps({'name': 'No coords'}),
content_type='application/json')
assert response.status_code == 400
def test_create_geofence_invalid_coords(self, client):
response = client.post('/analytics/geofences',
data=json.dumps({
'name': 'Bad',
'lat': 100,
'lon': 0,
'radius_m': 100,
}),
content_type='application/json')
assert response.status_code == 400
def test_delete_geofence_not_found(self, client):
response = client.delete('/analytics/geofences/99999')
assert response.status_code == 404
class TestAnalyticsActivity:
"""Tests for /analytics/activity endpoint."""
def test_activity_returns_sparklines(self, client):
response = client.get('/analytics/activity')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'success'
assert 'sparklines' in data
assert isinstance(data['sparklines'], dict)
+99
View File
@@ -0,0 +1,99 @@
"""Tests for FlightCorrelator: ACARS/VDL2 message matching."""
import pytest
from utils.flight_correlator import FlightCorrelator
class TestFlightCorrelator:
"""Test ACARS/VDL2 message matching by callsign."""
@pytest.fixture(autouse=True)
def setup(self):
self.correlator = FlightCorrelator(max_messages=100)
def test_add_acars_message(self):
self.correlator.add_acars_message({
'flight': 'BAW123', 'tail': 'G-ABCD', 'text': 'Hello',
})
assert self.correlator.acars_count == 1
def test_add_vdl2_message(self):
self.correlator.add_vdl2_message({
'flight': 'DLH456', 'text': 'World',
})
assert self.correlator.vdl2_count == 1
def test_match_by_callsign(self):
self.correlator.add_acars_message({
'flight': 'BAW123', 'text': 'msg1',
})
self.correlator.add_acars_message({
'flight': 'DLH456', 'text': 'msg2',
})
result = self.correlator.get_messages_for_aircraft(callsign='BAW123')
assert len(result['acars']) == 1
assert result['acars'][0]['text'] == 'msg1'
def test_match_by_icao(self):
self.correlator.add_vdl2_message({
'icao': 'ABC123', 'text': 'vdl2 msg',
})
result = self.correlator.get_messages_for_aircraft(icao='ABC123')
assert len(result['vdl2']) == 1
assert result['vdl2'][0]['text'] == 'vdl2 msg'
def test_no_match_returns_empty(self):
self.correlator.add_acars_message({'flight': 'BAW123', 'text': 'msg'})
result = self.correlator.get_messages_for_aircraft(callsign='NOMATCH')
assert result['acars'] == []
assert result['vdl2'] == []
def test_empty_search_returns_empty(self):
result = self.correlator.get_messages_for_aircraft()
assert result == {'acars': [], 'vdl2': []}
def test_ring_buffer_limit(self):
correlator = FlightCorrelator(max_messages=5)
for i in range(10):
correlator.add_acars_message({'flight': f'FL{i}', 'text': f'msg{i}'})
assert correlator.acars_count == 5
# First 5 messages should have been evicted
result = correlator.get_messages_for_aircraft(callsign='FL0')
assert len(result['acars']) == 0
# Last message should still be there
result = correlator.get_messages_for_aircraft(callsign='FL9')
assert len(result['acars']) == 1
def test_case_insensitive_matching(self):
self.correlator.add_acars_message({'flight': 'baw123', 'text': 'lowercase'})
result = self.correlator.get_messages_for_aircraft(callsign='BAW123')
assert len(result['acars']) == 1
def test_match_by_tail_field(self):
self.correlator.add_acars_message({
'tail': 'G-ABCD', 'text': 'tail match',
})
result = self.correlator.get_messages_for_aircraft(callsign='G-ABCD')
assert len(result['acars']) == 1
def test_internal_fields_not_returned(self):
self.correlator.add_acars_message({'flight': 'TEST', 'text': 'msg'})
result = self.correlator.get_messages_for_aircraft(callsign='TEST')
msg = result['acars'][0]
assert '_corr_time' not in msg
def test_both_acars_and_vdl2_returned(self):
self.correlator.add_acars_message({'flight': 'UAL789', 'text': 'acars'})
self.correlator.add_vdl2_message({'flight': 'UAL789', 'text': 'vdl2'})
result = self.correlator.get_messages_for_aircraft(callsign='UAL789')
assert len(result['acars']) == 1
assert len(result['vdl2']) == 1
+114
View File
@@ -0,0 +1,114 @@
"""Tests for geofence haversine, enter/exit detection, and persistence."""
from unittest.mock import MagicMock, patch
import pytest
class TestHaversineDistance:
"""Test haversine_distance accuracy."""
def test_same_point_zero_distance(self):
from utils.geofence import haversine_distance
assert haversine_distance(51.5, -0.1, 51.5, -0.1) == 0.0
def test_known_distance_london_paris(self):
from utils.geofence import haversine_distance
# London to Paris ~340km
dist = haversine_distance(51.5074, -0.1278, 48.8566, 2.3522)
assert 340_000 < dist < 345_000
def test_short_distance(self):
from utils.geofence import haversine_distance
# Two points ~111m apart (0.001 degrees latitude at equator)
dist = haversine_distance(0.0, 0.0, 0.001, 0.0)
assert 100 < dist < 120
def test_antipodal_distance(self):
from utils.geofence import haversine_distance
# North pole to south pole ~20015km
dist = haversine_distance(90.0, 0.0, -90.0, 0.0)
assert 20_000_000 < dist < 20_050_000
class TestGeofenceManager:
"""Test enter/exit detection logic."""
@pytest.fixture(autouse=True)
def _setup(self):
"""Provide a fresh GeofenceManager with mocked DB."""
from utils.geofence import GeofenceManager
with patch('utils.geofence._ensure_table'), patch('utils.geofence.get_db') as mock_db:
# Mock the context manager
mock_conn = MagicMock()
mock_db.return_value.__enter__ = MagicMock(return_value=mock_conn)
mock_db.return_value.__exit__ = MagicMock(return_value=False)
self.manager = GeofenceManager()
# Override list_zones to return test data
self._zones = []
self.manager.list_zones = lambda: self._zones
def test_no_zones_returns_empty(self):
events = self.manager.check_position('TEST1', 'aircraft', 51.5, -0.1)
assert events == []
def test_enter_event(self):
self._zones = [{
'id': 1, 'name': 'London', 'lat': 51.5074, 'lon': -0.1278,
'radius_m': 10000, 'alert_on': 'enter_exit',
}]
# First position inside zone
events = self.manager.check_position('AC1', 'aircraft', 51.5074, -0.1278)
assert len(events) == 1
assert events[0]['type'] == 'geofence_enter'
assert events[0]['zone_name'] == 'London'
def test_no_duplicate_enter(self):
self._zones = [{
'id': 1, 'name': 'London', 'lat': 51.5074, 'lon': -0.1278,
'radius_m': 10000, 'alert_on': 'enter_exit',
}]
# First enter
self.manager.check_position('AC1', 'aircraft', 51.5074, -0.1278)
# Second check still inside - should not fire enter again
events = self.manager.check_position('AC1', 'aircraft', 51.508, -0.128)
assert len(events) == 0
def test_exit_event(self):
self._zones = [{
'id': 1, 'name': 'London', 'lat': 51.5074, 'lon': -0.1278,
'radius_m': 1000, 'alert_on': 'enter_exit',
}]
# Enter
self.manager.check_position('AC1', 'aircraft', 51.5074, -0.1278)
# Exit (far away)
events = self.manager.check_position('AC1', 'aircraft', 52.0, 0.0)
assert len(events) == 1
assert events[0]['type'] == 'geofence_exit'
def test_enter_only_mode(self):
self._zones = [{
'id': 1, 'name': 'London', 'lat': 51.5074, 'lon': -0.1278,
'radius_m': 1000, 'alert_on': 'enter',
}]
# Enter
events = self.manager.check_position('AC1', 'aircraft', 51.5074, -0.1278)
assert len(events) == 1
assert events[0]['type'] == 'geofence_enter'
# Exit should not fire
events = self.manager.check_position('AC1', 'aircraft', 52.0, 0.0)
assert len(events) == 0
def test_metadata_included_in_event(self):
self._zones = [{
'id': 1, 'name': 'Zone', 'lat': 0.0, 'lon': 0.0,
'radius_m': 100000, 'alert_on': 'enter_exit',
}]
events = self.manager.check_position(
'AC1', 'aircraft', 0.0, 0.0,
metadata={'callsign': 'TEST01', 'altitude': 35000}
)
assert events[0]['callsign'] == 'TEST01'
assert events[0]['altitude'] == 35000