diff --git a/routes/adsb.py b/routes/adsb.py index 861a0d5..4b7e8de 100644 --- a/routes/adsb.py +++ b/routes/adsb.py @@ -77,6 +77,11 @@ _sbs_error_logged = False # Suppress repeated connection error logs # Track ICAOs already looked up in aircraft database (avoid repeated lookups) _looked_up_icaos: set[str] = set() +# Per-client SSE queues for ADS-B stream fanout. +_adsb_stream_subscribers: set[queue.Queue] = set() +_adsb_stream_subscribers_lock = threading.Lock() +_ADSB_STREAM_CLIENT_QUEUE_SIZE = 500 + # Load aircraft database at module init aircraft_db.load_database() @@ -203,6 +208,31 @@ def _parse_int_param(value: str | None, default: int, min_value: int | None = No return parsed +def _broadcast_adsb_update(payload: dict[str, Any]) -> None: + """Fan out a payload to all active ADS-B SSE subscribers.""" + with _adsb_stream_subscribers_lock: + subscribers = tuple(_adsb_stream_subscribers) + + for subscriber in subscribers: + try: + subscriber.put_nowait(payload) + except queue.Full: + # Drop oldest queued event for that client and try once more. + try: + subscriber.get_nowait() + subscriber.put_nowait(payload) + except (queue.Empty, queue.Full): + # Client queue remains saturated; skip this payload. + continue + + +def _adsb_stream_queue_depth() -> int: + """Best-effort aggregate queue depth across connected ADS-B SSE clients.""" + with _adsb_stream_subscribers_lock: + subscribers = tuple(_adsb_stream_subscribers) + return sum(subscriber.qsize() for subscriber in subscribers) + + def _get_active_session() -> dict[str, Any] | None: if not ADSB_HISTORY_ENABLED or not PSYCOPG2_AVAILABLE: return None @@ -481,7 +511,7 @@ def parse_sbs_stream(service_addr): for update_icao in pending_updates: if update_icao in app_module.adsb_aircraft: snapshot = app_module.adsb_aircraft[update_icao] - app_module.adsb_queue.put({ + _broadcast_adsb_update({ 'type': 'aircraft', **snapshot }) @@ -580,7 +610,7 @@ def adsb_status(): 'last_message_time': adsb_last_message_time, 'aircraft_count': len(app_module.adsb_aircraft), 'aircraft': dict(app_module.adsb_aircraft), # Full aircraft data - 'queue_size': app_module.adsb_queue.qsize(), + 'queue_size': _adsb_stream_queue_depth(), 'dump1090_path': find_dump1090(), 'dump1090_running': dump1090_running, 'port_30003_open': check_dump1090_service() is not None @@ -871,23 +901,39 @@ def stop_adsb(): @adsb_bp.route('/stream') def stream_adsb(): """SSE stream for ADS-B aircraft.""" + client_queue: queue.Queue = queue.Queue(maxsize=_ADSB_STREAM_CLIENT_QUEUE_SIZE) + with _adsb_stream_subscribers_lock: + _adsb_stream_subscribers.add(client_queue) + + # Prime new clients with current known aircraft so they don't wait for the + # next positional update before rendering. + for snapshot in list(app_module.adsb_aircraft.values()): + try: + client_queue.put_nowait({'type': 'aircraft', **snapshot}) + except queue.Full: + break + def generate(): last_keepalive = time.time() - while True: - try: - msg = app_module.adsb_queue.get(timeout=SSE_QUEUE_TIMEOUT) - last_keepalive = time.time() + try: + while True: try: - process_event('adsb', msg, msg.get('type')) - except Exception: - pass - yield format_sse(msg) - except queue.Empty: - now = time.time() - if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: - yield format_sse({'type': 'keepalive'}) - last_keepalive = now + msg = client_queue.get(timeout=SSE_QUEUE_TIMEOUT) + last_keepalive = time.time() + try: + process_event('adsb', msg, msg.get('type')) + except Exception: + pass + yield format_sse(msg) + except queue.Empty: + now = time.time() + if now - last_keepalive >= SSE_KEEPALIVE_INTERVAL: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now + finally: + with _adsb_stream_subscribers_lock: + _adsb_stream_subscribers.discard(client_queue) response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache' diff --git a/routes/controller.py b/routes/controller.py index 920724c..eb46e92 100644 --- a/routes/controller.py +++ b/routes/controller.py @@ -13,6 +13,7 @@ from __future__ import annotations import json import logging import queue +import threading import time from datetime import datetime, timezone from typing import Generator @@ -36,10 +37,28 @@ from utils.trilateration import ( logger = logging.getLogger('intercept.controller') -controller_bp = Blueprint('controller', __name__, url_prefix='/controller') - -# Multi-agent data queue for combined SSE stream -agent_data_queue: queue.Queue = queue.Queue(maxsize=1000) +controller_bp = Blueprint('controller', __name__, url_prefix='/controller') + +# Multi-agent SSE fanout state (per-client queues). +_agent_stream_subscribers: set[queue.Queue] = set() +_agent_stream_subscribers_lock = threading.Lock() +_AGENT_STREAM_CLIENT_QUEUE_SIZE = 500 + + +def _broadcast_agent_data(payload: dict) -> None: + """Fan out an ingested payload to all active /controller/stream/all clients.""" + with _agent_stream_subscribers_lock: + subscribers = tuple(_agent_stream_subscribers) + + for subscriber in subscribers: + try: + subscriber.put_nowait(payload) + except queue.Full: + try: + subscriber.get_nowait() + subscriber.put_nowait(payload) + except (queue.Empty, queue.Full): + continue # ============================================================================= @@ -625,19 +644,16 @@ def ingest_push_data(): received_at=data.get('received_at') ) - # Emit to SSE stream - try: - agent_data_queue.put_nowait({ - 'type': 'agent_data', - 'agent_id': agent['id'], - 'agent_name': agent_name, - 'scan_type': data.get('scan_type'), - 'interface': data.get('interface'), - 'payload': data.get('payload'), - 'received_at': data.get('received_at') or datetime.now(timezone.utc).isoformat() - }) - except queue.Full: - logger.warning("Agent data queue full, data may be lost") + # Emit to SSE stream (fanout to all connected clients) + _broadcast_agent_data({ + 'type': 'agent_data', + 'agent_id': agent['id'], + 'agent_name': agent_name, + 'scan_type': data.get('scan_type'), + 'interface': data.get('interface'), + 'payload': data.get('payload'), + 'received_at': data.get('received_at') or datetime.now(timezone.utc).isoformat() + }) return jsonify({ 'status': 'accepted', @@ -674,27 +690,35 @@ def get_payloads(): # ============================================================================= @controller_bp.route('/stream/all') -def stream_all_agents(): +def stream_all_agents(): """ Combined SSE stream for data from all agents. This endpoint streams push data as it arrives from agents. Each message is tagged with agent_id and agent_name. """ - def generate() -> Generator[str, None, None]: - last_keepalive = time.time() - keepalive_interval = 30.0 - - while True: - try: - msg = agent_data_queue.get(timeout=1.0) - last_keepalive = time.time() - yield format_sse(msg) - except queue.Empty: - now = time.time() - if now - last_keepalive >= keepalive_interval: - yield format_sse({'type': 'keepalive'}) - last_keepalive = now + client_queue: queue.Queue = queue.Queue(maxsize=_AGENT_STREAM_CLIENT_QUEUE_SIZE) + with _agent_stream_subscribers_lock: + _agent_stream_subscribers.add(client_queue) + + def generate() -> Generator[str, None, None]: + last_keepalive = time.time() + keepalive_interval = 30.0 + + try: + while True: + try: + msg = client_queue.get(timeout=1.0) + last_keepalive = time.time() + yield format_sse(msg) + except queue.Empty: + now = time.time() + if now - last_keepalive >= keepalive_interval: + yield format_sse({'type': 'keepalive'}) + last_keepalive = now + finally: + with _agent_stream_subscribers_lock: + _agent_stream_subscribers.discard(client_queue) response = Response(generate(), mimetype='text/event-stream') response.headers['Cache-Control'] = 'no-cache'