Fix ADS-B SSE fanout for multi-client streams

This commit is contained in:
Smittix
2026-02-19 18:26:23 +00:00
parent 06a00ca6b5
commit 694786d4e0
2 changed files with 117 additions and 47 deletions

View File

@@ -77,6 +77,11 @@ _sbs_error_logged = False # Suppress repeated connection error logs
# Track ICAOs already looked up in aircraft database (avoid repeated lookups) # Track ICAOs already looked up in aircraft database (avoid repeated lookups)
_looked_up_icaos: set[str] = set() _looked_up_icaos: set[str] = set()
# Per-client SSE queues for ADS-B stream fanout.
# Each connected /stream client registers its own bounded queue in this set;
# broadcasts iterate a snapshot of it so slow clients cannot block others.
_adsb_stream_subscribers: set[queue.Queue] = set()
# Guards membership changes and snapshots of _adsb_stream_subscribers.
_adsb_stream_subscribers_lock = threading.Lock()
# Max events buffered per client before the oldest queued event is dropped.
_ADSB_STREAM_CLIENT_QUEUE_SIZE = 500
# Load aircraft database at module init # Load aircraft database at module init
aircraft_db.load_database() aircraft_db.load_database()
@@ -203,6 +208,31 @@ def _parse_int_param(value: str | None, default: int, min_value: int | None = No
return parsed return parsed
def _broadcast_adsb_update(payload: dict[str, Any]) -> None:
    """Deliver *payload* to every registered ADS-B SSE client queue.

    A snapshot of the subscriber set is taken under the lock so clients can
    connect or disconnect while delivery is in progress. When a client queue
    is saturated, its oldest pending event is discarded to make room; if the
    queue is still full after that, the payload is skipped for that client
    only — other clients are unaffected.
    """
    with _adsb_stream_subscribers_lock:
        current_clients = list(_adsb_stream_subscribers)
    for client_queue in current_clients:
        try:
            client_queue.put_nowait(payload)
            continue
        except queue.Full:
            pass
        # Queue saturated: evict the oldest queued event and retry once.
        try:
            client_queue.get_nowait()
            client_queue.put_nowait(payload)
        except (queue.Empty, queue.Full):
            # Still saturated (another producer won the slot); drop this
            # payload for the slow client.
            pass
def _adsb_stream_queue_depth() -> int:
    """Return the total number of events queued across ADS-B SSE clients.

    Best-effort only: qsize() is approximate under concurrency, and the
    subscriber snapshot may race with clients connecting or disconnecting.
    """
    with _adsb_stream_subscribers_lock:
        snapshot = list(_adsb_stream_subscribers)
    total = 0
    for client_queue in snapshot:
        total += client_queue.qsize()
    return total
def _get_active_session() -> dict[str, Any] | None: def _get_active_session() -> dict[str, Any] | None:
if not ADSB_HISTORY_ENABLED or not PSYCOPG2_AVAILABLE: if not ADSB_HISTORY_ENABLED or not PSYCOPG2_AVAILABLE:
return None return None
@@ -481,7 +511,7 @@ def parse_sbs_stream(service_addr):
for update_icao in pending_updates: for update_icao in pending_updates:
if update_icao in app_module.adsb_aircraft: if update_icao in app_module.adsb_aircraft:
snapshot = app_module.adsb_aircraft[update_icao] snapshot = app_module.adsb_aircraft[update_icao]
app_module.adsb_queue.put({ _broadcast_adsb_update({
'type': 'aircraft', 'type': 'aircraft',
**snapshot **snapshot
}) })
@@ -580,7 +610,7 @@ def adsb_status():
'last_message_time': adsb_last_message_time, 'last_message_time': adsb_last_message_time,
'aircraft_count': len(app_module.adsb_aircraft), 'aircraft_count': len(app_module.adsb_aircraft),
'aircraft': dict(app_module.adsb_aircraft), # Full aircraft data 'aircraft': dict(app_module.adsb_aircraft), # Full aircraft data
'queue_size': app_module.adsb_queue.qsize(), 'queue_size': _adsb_stream_queue_depth(),
'dump1090_path': find_dump1090(), 'dump1090_path': find_dump1090(),
'dump1090_running': dump1090_running, 'dump1090_running': dump1090_running,
'port_30003_open': check_dump1090_service() is not None 'port_30003_open': check_dump1090_service() is not None
@@ -871,23 +901,39 @@ def stop_adsb():
@adsb_bp.route('/stream') @adsb_bp.route('/stream')
def stream_adsb(): def stream_adsb():
"""SSE stream for ADS-B aircraft.""" """SSE stream for ADS-B aircraft."""
client_queue: queue.Queue = queue.Queue(maxsize=_ADSB_STREAM_CLIENT_QUEUE_SIZE)
with _adsb_stream_subscribers_lock:
_adsb_stream_subscribers.add(client_queue)
# Prime new clients with current known aircraft so they don't wait for the
# next positional update before rendering.
for snapshot in list(app_module.adsb_aircraft.values()):
try:
client_queue.put_nowait({'type': 'aircraft', **snapshot})
except queue.Full:
break
def generate(): def generate():
last_keepalive = time.time() last_keepalive = time.time()
while True: try:
try: while True:
msg = app_module.adsb_queue.get(timeout=SSE_QUEUE_TIMEOUT)
last_keepalive = time.time()
try: try:
process_event('adsb', msg, msg.get('type')) msg = client_queue.get(timeout=SSE_QUEUE_TIMEOUT)
except Exception: last_keepalive = time.time()
pass try:
yield format_sse(msg) process_event('adsb', msg, msg.get('type'))
except queue.Empty: except Exception:
now = time.time() pass
if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: yield format_sse(msg)
yield format_sse({'type': 'keepalive'}) except queue.Empty:
last_keepalive = now now = time.time()
if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL:
yield format_sse({'type': 'keepalive'})
last_keepalive = now
finally:
with _adsb_stream_subscribers_lock:
_adsb_stream_subscribers.discard(client_queue)
response = Response(generate(), mimetype='text/event-stream') response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache' response.headers['Cache-Control'] = 'no-cache'

View File

@@ -13,6 +13,7 @@ from __future__ import annotations
import json import json
import logging import logging
import queue import queue
import threading
import time import time
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Generator from typing import Generator
@@ -36,10 +37,28 @@ from utils.trilateration import (
logger = logging.getLogger('intercept.controller') logger = logging.getLogger('intercept.controller')
controller_bp = Blueprint('controller', __name__, url_prefix='/controller') controller_bp = Blueprint('controller', __name__, url_prefix='/controller')
# Multi-agent data queue for combined SSE stream # Multi-agent SSE fanout state (per-client queues).
agent_data_queue: queue.Queue = queue.Queue(maxsize=1000) _agent_stream_subscribers: set[queue.Queue] = set()
_agent_stream_subscribers_lock = threading.Lock()
_AGENT_STREAM_CLIENT_QUEUE_SIZE = 500
def _broadcast_agent_data(payload: dict) -> None:
    """Fan out an ingested payload to all active /controller/stream/all clients.

    The subscriber set is snapshotted under the lock and delivery happens
    outside it, so a slow client never blocks registration or other clients.
    A full client queue sheds its oldest event to make room; if it is still
    full after that, the client simply misses this payload.
    """
    with _agent_stream_subscribers_lock:
        recipients = tuple(_agent_stream_subscribers)

    def _offer(client_queue: queue.Queue) -> None:
        # Best-effort, non-blocking delivery to a single client.
        try:
            client_queue.put_nowait(payload)
        except queue.Full:
            try:
                client_queue.get_nowait()  # shed oldest event to free a slot
                client_queue.put_nowait(payload)
            except (queue.Empty, queue.Full):
                pass  # still saturated; drop for this client only

    for recipient in recipients:
        _offer(recipient)
# ============================================================================= # =============================================================================
@@ -625,19 +644,16 @@ def ingest_push_data():
received_at=data.get('received_at') received_at=data.get('received_at')
) )
# Emit to SSE stream # Emit to SSE stream (fanout to all connected clients)
try: _broadcast_agent_data({
agent_data_queue.put_nowait({ 'type': 'agent_data',
'type': 'agent_data', 'agent_id': agent['id'],
'agent_id': agent['id'], 'agent_name': agent_name,
'agent_name': agent_name, 'scan_type': data.get('scan_type'),
'scan_type': data.get('scan_type'), 'interface': data.get('interface'),
'interface': data.get('interface'), 'payload': data.get('payload'),
'payload': data.get('payload'), 'received_at': data.get('received_at') or datetime.now(timezone.utc).isoformat()
'received_at': data.get('received_at') or datetime.now(timezone.utc).isoformat() })
})
except queue.Full:
logger.warning("Agent data queue full, data may be lost")
return jsonify({ return jsonify({
'status': 'accepted', 'status': 'accepted',
@@ -674,27 +690,35 @@ def get_payloads():
# ============================================================================= # =============================================================================
@controller_bp.route('/stream/all') @controller_bp.route('/stream/all')
def stream_all_agents(): def stream_all_agents():
""" """
Combined SSE stream for data from all agents. Combined SSE stream for data from all agents.
This endpoint streams push data as it arrives from agents. This endpoint streams push data as it arrives from agents.
Each message is tagged with agent_id and agent_name. Each message is tagged with agent_id and agent_name.
""" """
def generate() -> Generator[str, None, None]: client_queue: queue.Queue = queue.Queue(maxsize=_AGENT_STREAM_CLIENT_QUEUE_SIZE)
last_keepalive = time.time() with _agent_stream_subscribers_lock:
keepalive_interval = 30.0 _agent_stream_subscribers.add(client_queue)
while True: def generate() -> Generator[str, None, None]:
try: last_keepalive = time.time()
msg = agent_data_queue.get(timeout=1.0) keepalive_interval = 30.0
last_keepalive = time.time()
yield format_sse(msg) try:
except queue.Empty: while True:
now = time.time() try:
if now - last_keepalive >= keepalive_interval: msg = client_queue.get(timeout=1.0)
yield format_sse({'type': 'keepalive'}) last_keepalive = time.time()
last_keepalive = now yield format_sse(msg)
except queue.Empty:
now = time.time()
if now - last_keepalive >= keepalive_interval:
yield format_sse({'type': 'keepalive'})
last_keepalive = now
finally:
with _agent_stream_subscribers_lock:
_agent_stream_subscribers.discard(client_queue)
response = Response(generate(), mimetype='text/event-stream') response = Response(generate(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache' response.headers['Cache-Control'] = 'no-cache'