feat: add ADS-B history filtering, export, and UI improvements

Add date range filtering, CSV export, and enhanced history page styling
for the ADS-B aircraft tracking history feature.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Smittix
2026-03-05 14:54:55 +00:00
parent 87a5715f30
commit a146a21285
3 changed files with 671 additions and 7 deletions

View File

@@ -3,6 +3,8 @@
from __future__ import annotations
import json
import csv
import io
import os
import queue
import shutil
@@ -10,7 +12,7 @@ import socket
import subprocess
import threading
import time
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import Any, Generator
from flask import Blueprint, Response, jsonify, make_response, render_template, request
@@ -207,6 +209,134 @@ def _parse_int_param(value: str | None, default: int, min_value: int | None = No
return parsed
def _parse_iso_datetime(value: Any) -> datetime | None:
if not isinstance(value, str):
return None
cleaned = value.strip()
if not cleaned:
return None
if cleaned.endswith('Z'):
cleaned = f"{cleaned[:-1]}+00:00"
try:
parsed = datetime.fromisoformat(cleaned)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _parse_export_scope(
args: Any,
) -> tuple[str, int, datetime | None, datetime | None]:
scope = str(args.get('scope') or 'window').strip().lower()
if scope not in {'window', 'all', 'custom'}:
scope = 'window'
since_minutes = _parse_int_param(args.get('since_minutes'), 1440, 1, 525600)
start = _parse_iso_datetime(args.get('start'))
end = _parse_iso_datetime(args.get('end'))
if scope == 'custom' and (start is None or end is None or end <= start):
scope = 'window'
return scope, since_minutes, start, end
def _add_time_filter(
*,
where_parts: list[str],
params: list[Any],
scope: str,
timestamp_field: str,
since_minutes: int,
start: datetime | None,
end: datetime | None,
) -> None:
if scope == 'all':
return
if scope == 'custom' and start is not None and end is not None:
where_parts.append(f"{timestamp_field} >= %s AND {timestamp_field} < %s")
params.extend([start, end])
return
where_parts.append(f"{timestamp_field} >= NOW() - INTERVAL %s")
params.append(f'{since_minutes} minutes')
def _serialize_export_value(value: Any) -> Any:
if isinstance(value, datetime):
return value.isoformat()
return value
def _rows_to_serializable(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [{key: _serialize_export_value(value) for key, value in row.items()} for row in rows]
def _build_export_csv(
*,
exported_at: str,
scope: str,
since_minutes: int | None,
icao: str,
search: str,
messages: list[dict[str, Any]],
snapshots: list[dict[str, Any]],
sessions: list[dict[str, Any]],
export_type: str,
) -> str:
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(['Exported At', exported_at])
writer.writerow(['Scope', scope])
if since_minutes is not None:
writer.writerow(['Since Minutes', since_minutes])
if icao:
writer.writerow(['ICAO Filter', icao])
if search:
writer.writerow(['Search Filter', search])
writer.writerow([])
def write_section(title: str, rows: list[dict[str, Any]], columns: list[str]) -> None:
writer.writerow([title])
writer.writerow(columns)
for row in rows:
writer.writerow([_serialize_export_value(row.get(col)) for col in columns])
writer.writerow([])
if export_type in {'messages', 'all'}:
write_section(
'Messages',
messages,
[
'received_at', 'msg_time', 'logged_time', 'icao', 'msg_type', 'callsign',
'altitude', 'speed', 'heading', 'vertical_rate', 'lat', 'lon', 'squawk',
'session_id', 'aircraft_id', 'flight_id', 'source_host', 'raw_line',
],
)
if export_type in {'snapshots', 'all'}:
write_section(
'Snapshots',
snapshots,
[
'captured_at', 'icao', 'callsign', 'registration', 'type_code', 'type_desc',
'altitude', 'speed', 'heading', 'vertical_rate', 'lat', 'lon', 'squawk',
'source_host',
],
)
if export_type in {'sessions', 'all'}:
write_section(
'Sessions',
sessions,
[
'id', 'started_at', 'ended_at', 'device_index', 'sdr_type', 'remote_host',
'remote_port', 'start_source', 'stop_source', 'started_by', 'stopped_by', 'notes',
],
)
return output.getvalue()
def _broadcast_adsb_update(payload: dict[str, Any]) -> None:
"""Fan out a payload to all active ADS-B SSE subscribers."""
with _adsb_stream_subscribers_lock:
@@ -1069,7 +1199,7 @@ def adsb_history_summary():
return jsonify({'error': 'ADS-B history is disabled'}), 503
_ensure_history_schema()
since_minutes = _parse_int_param(request.args.get('since_minutes'), 60, 1, 10080)
since_minutes = _parse_int_param(request.args.get('since_minutes'), 1440, 1, 10080)
window = f'{since_minutes} minutes'
sql = """
@@ -1099,7 +1229,7 @@ def adsb_history_aircraft():
return jsonify({'error': 'ADS-B history is disabled'}), 503
_ensure_history_schema()
since_minutes = _parse_int_param(request.args.get('since_minutes'), 60, 1, 10080)
since_minutes = _parse_int_param(request.args.get('since_minutes'), 1440, 1, 10080)
limit = _parse_int_param(request.args.get('limit'), 200, 1, 2000)
search = (request.args.get('search') or '').strip()
window = f'{since_minutes} minutes'
@@ -1153,7 +1283,7 @@ def adsb_history_timeline():
if not icao:
return jsonify({'error': 'icao is required'}), 400
since_minutes = _parse_int_param(request.args.get('since_minutes'), 60, 1, 10080)
since_minutes = _parse_int_param(request.args.get('since_minutes'), 1440, 1, 10080)
limit = _parse_int_param(request.args.get('limit'), 2000, 1, 20000)
window = f'{since_minutes} minutes'
@@ -1209,6 +1339,238 @@ def adsb_history_messages():
return jsonify({'error': 'History database unavailable'}), 503
@adsb_bp.route('/history/export')
def adsb_history_export():
"""Export ADS-B history data in CSV or JSON format."""
if not ADSB_HISTORY_ENABLED or not PSYCOPG2_AVAILABLE:
return jsonify({'error': 'ADS-B history is disabled'}), 503
_ensure_history_schema()
export_format = str(request.args.get('format') or 'csv').strip().lower()
export_type = str(request.args.get('type') or 'all').strip().lower()
if export_format not in {'csv', 'json'}:
return jsonify({'error': 'format must be csv or json'}), 400
if export_type not in {'messages', 'snapshots', 'sessions', 'all'}:
return jsonify({'error': 'type must be messages, snapshots, sessions, or all'}), 400
scope, since_minutes, start, end = _parse_export_scope(request.args)
icao = (request.args.get('icao') or '').strip().upper()
search = (request.args.get('search') or '').strip()
pattern = f'%{search}%'
snapshots: list[dict[str, Any]] = []
messages: list[dict[str, Any]] = []
sessions: list[dict[str, Any]] = []
try:
with _get_history_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
if export_type in {'snapshots', 'all'}:
snapshot_where: list[str] = []
snapshot_params: list[Any] = []
_add_time_filter(
where_parts=snapshot_where,
params=snapshot_params,
scope=scope,
timestamp_field='captured_at',
since_minutes=since_minutes,
start=start,
end=end,
)
if icao:
snapshot_where.append("icao = %s")
snapshot_params.append(icao)
if search:
snapshot_where.append("(icao ILIKE %s OR callsign ILIKE %s OR registration ILIKE %s)")
snapshot_params.extend([pattern, pattern, pattern])
snapshot_sql = """
SELECT captured_at, icao, callsign, registration, type_code, type_desc,
altitude, speed, heading, vertical_rate, lat, lon, squawk, source_host
FROM adsb_snapshots
"""
if snapshot_where:
snapshot_sql += " WHERE " + " AND ".join(snapshot_where)
snapshot_sql += " ORDER BY captured_at DESC"
cur.execute(snapshot_sql, tuple(snapshot_params))
snapshots = cur.fetchall()
if export_type in {'messages', 'all'}:
message_where: list[str] = []
message_params: list[Any] = []
_add_time_filter(
where_parts=message_where,
params=message_params,
scope=scope,
timestamp_field='received_at',
since_minutes=since_minutes,
start=start,
end=end,
)
if icao:
message_where.append("icao = %s")
message_params.append(icao)
if search:
message_where.append("(icao ILIKE %s OR callsign ILIKE %s)")
message_params.extend([pattern, pattern])
message_sql = """
SELECT received_at, msg_time, logged_time, icao, msg_type, callsign,
altitude, speed, heading, vertical_rate, lat, lon, squawk,
session_id, aircraft_id, flight_id, source_host, raw_line
FROM adsb_messages
"""
if message_where:
message_sql += " WHERE " + " AND ".join(message_where)
message_sql += " ORDER BY received_at DESC"
cur.execute(message_sql, tuple(message_params))
messages = cur.fetchall()
if export_type in {'sessions', 'all'}:
session_where: list[str] = []
session_params: list[Any] = []
if scope == 'custom' and start is not None and end is not None:
session_where.append("COALESCE(ended_at, %s) >= %s AND started_at < %s")
session_params.extend([end, start, end])
elif scope == 'window':
session_where.append("COALESCE(ended_at, NOW()) >= NOW() - INTERVAL %s")
session_params.append(f'{since_minutes} minutes')
session_sql = """
SELECT id, started_at, ended_at, device_index, sdr_type, remote_host,
remote_port, start_source, stop_source, started_by, stopped_by, notes
FROM adsb_sessions
"""
if session_where:
session_sql += " WHERE " + " AND ".join(session_where)
session_sql += " ORDER BY started_at DESC"
cur.execute(session_sql, tuple(session_params))
sessions = cur.fetchall()
except Exception as exc:
logger.warning("ADS-B history export failed: %s", exc)
return jsonify({'error': 'History database unavailable'}), 503
exported_at = datetime.now(timezone.utc).isoformat()
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
filename_scope = 'all' if scope == 'all' else ('custom' if scope == 'custom' else f'{since_minutes}m')
filename = f'adsb_history_{export_type}_{filename_scope}_{timestamp}.{export_format}'
if export_format == 'json':
payload = {
'exported_at': exported_at,
'format': export_format,
'type': export_type,
'scope': scope,
'since_minutes': None if scope != 'window' else since_minutes,
'filters': {
'icao': icao or None,
'search': search or None,
'start': start.isoformat() if start else None,
'end': end.isoformat() if end else None,
},
'counts': {
'messages': len(messages),
'snapshots': len(snapshots),
'sessions': len(sessions),
},
'messages': _rows_to_serializable(messages),
'snapshots': _rows_to_serializable(snapshots),
'sessions': _rows_to_serializable(sessions),
}
response = Response(
json.dumps(payload, indent=2, default=str),
mimetype='application/json',
)
response.headers['Content-Disposition'] = f'attachment; filename={filename}'
return response
csv_data = _build_export_csv(
exported_at=exported_at,
scope=scope,
since_minutes=since_minutes if scope == 'window' else None,
icao=icao,
search=search,
messages=messages,
snapshots=snapshots,
sessions=sessions,
export_type=export_type,
)
response = Response(csv_data, mimetype='text/csv')
response.headers['Content-Disposition'] = f'attachment; filename={filename}'
return response
@adsb_bp.route('/history/prune', methods=['POST'])
def adsb_history_prune():
"""Delete ADS-B history for a selected time range or entire dataset."""
if not ADSB_HISTORY_ENABLED or not PSYCOPG2_AVAILABLE:
return jsonify({'error': 'ADS-B history is disabled'}), 503
_ensure_history_schema()
payload = request.get_json(silent=True) or {}
mode = str(payload.get('mode') or 'range').strip().lower()
if mode not in {'range', 'all'}:
return jsonify({'error': 'mode must be range or all'}), 400
try:
with _get_history_connection() as conn:
with conn.cursor() as cur:
deleted = {'messages': 0, 'snapshots': 0}
if mode == 'all':
cur.execute("DELETE FROM adsb_messages")
deleted['messages'] = max(0, cur.rowcount or 0)
cur.execute("DELETE FROM adsb_snapshots")
deleted['snapshots'] = max(0, cur.rowcount or 0)
return jsonify({
'status': 'ok',
'mode': 'all',
'deleted': deleted,
'total_deleted': deleted['messages'] + deleted['snapshots'],
})
start = _parse_iso_datetime(payload.get('start'))
end = _parse_iso_datetime(payload.get('end'))
if start is None or end is None:
return jsonify({'error': 'start and end ISO datetime values are required'}), 400
if end <= start:
return jsonify({'error': 'end must be after start'}), 400
if end - start > timedelta(days=31):
return jsonify({'error': 'range cannot exceed 31 days'}), 400
cur.execute(
"""
DELETE FROM adsb_messages
WHERE received_at >= %s
AND received_at < %s
""",
(start, end),
)
deleted['messages'] = max(0, cur.rowcount or 0)
cur.execute(
"""
DELETE FROM adsb_snapshots
WHERE captured_at >= %s
AND captured_at < %s
""",
(start, end),
)
deleted['snapshots'] = max(0, cur.rowcount or 0)
return jsonify({
'status': 'ok',
'mode': 'range',
'start': start.isoformat(),
'end': end.isoformat(),
'deleted': deleted,
'total_deleted': deleted['messages'] + deleted['snapshots'],
})
except Exception as exc:
logger.warning("ADS-B history prune failed: %s", exc)
return jsonify({'error': 'History database unavailable'}), 503
# ============================================
# AIRCRAFT DATABASE MANAGEMENT
# ============================================