From 3e608c62a08fd7b4395da709cb8e979bdc96ce23 Mon Sep 17 00:00:00 2001
From: Smittix
Date: Tue, 24 Feb 2026 20:38:19 +0000
Subject: [PATCH] Fix SSE fanout packet loss on reconnect

---
 tests/test_sse.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++
 utils/sse.py      | 34 ++++++++++++++++++++++++++++---
 2 files changed, 83 insertions(+), 3 deletions(-)
 create mode 100644 tests/test_sse.py

diff --git a/tests/test_sse.py b/tests/test_sse.py
new file mode 100644
index 0000000..729c502
--- /dev/null
+++ b/tests/test_sse.py
@@ -0,0 +1,52 @@
+"""Tests for SSE fanout queue behavior."""
+
+from __future__ import annotations
+
+import queue
+import time
+import uuid
+
+from utils.sse import subscribe_fanout_queue
+
+
+def _channel_key(prefix: str) -> str:
+    return f"{prefix}-{uuid.uuid4()}"
+
+
+def test_fanout_does_not_drain_source_queue_without_subscribers() -> None:
+    """Queued messages should remain buffered while no SSE clients are connected."""
+    source = queue.Queue()
+    channel_key = _channel_key("sse-idle")
+
+    # Start fanout distributor, then remove the only subscriber.
+    _, unsubscribe = subscribe_fanout_queue(source, channel_key=channel_key, source_timeout=0.01)
+    unsubscribe()
+
+    source.put({"type": "aprs", "callsign": "N0CALL"})
+    time.sleep(0.05)
+
+    assert source.qsize() == 1
+
+
+def test_fanout_delivers_buffered_message_after_re_subscribe() -> None:
+    """A message queued while disconnected should be delivered on reconnect."""
+    source = queue.Queue()
+    channel_key = _channel_key("sse-resub")
+
+    _, unsubscribe = subscribe_fanout_queue(source, channel_key=channel_key, source_timeout=0.01)
+    unsubscribe()
+
+    expected = {"type": "aprs", "callsign": "K1ABC"}
+    source.put(expected)
+
+    subscriber, unsubscribe2 = subscribe_fanout_queue(
+        source,
+        channel_key=channel_key,
+        source_timeout=0.01,
+    )
+    try:
+        got = subscriber.get(timeout=0.25)
+    finally:
+        unsubscribe2()
+
+    assert got == expected
diff --git a/utils/sse.py b/utils/sse.py
index dafe530..92cb974 100644
--- a/utils/sse.py
+++ b/utils/sse.py
@@ -27,6 +27,14 @@ _fanout_channels_lock = threading.Lock()
 def _run_fanout(channel: _QueueFanoutChannel) -> None:
     """Drain source queue and fan out each message to all subscribers."""
     while True:
+        with channel.lock:
+            has_subscribers = bool(channel.subscribers)
+
+        if not has_subscribers:
+            # Do not drain source_queue when no clients are connected.
+            time.sleep(channel.source_timeout)
+            continue
+
         try:
             msg = channel.source_queue.get(timeout=channel.source_timeout)
         except queue.Empty:
@@ -35,6 +43,14 @@ def _run_fanout(channel: _QueueFanoutChannel) -> None:
         with channel.lock:
             subscribers = tuple(channel.subscribers)
 
+        if not subscribers:
+            # Subscriber set changed after we dequeued; requeue best-effort.
+            try:
+                channel.source_queue.put_nowait(msg)
+            except queue.Full:
+                pass
+            continue
+
         for subscriber in subscribers:
             try:
                 subscriber.put_nowait(msg)
@@ -52,13 +68,24 @@ def _ensure_fanout_channel(
     source_queue: queue.Queue,
     source_timeout: float,
 ) -> _QueueFanoutChannel:
-    """Get/create a fanout channel and ensure distributor thread is running."""
+    """Get/create a fanout channel."""
     with _fanout_channels_lock:
         channel = _fanout_channels.get(channel_key)
         if channel is None:
             channel = _QueueFanoutChannel(source_queue=source_queue, source_timeout=source_timeout)
             _fanout_channels[channel_key] = channel
 
+        if channel.source_queue is not source_queue:
+            # Keep channel in sync if source queue object is replaced.
+            channel.source_queue = source_queue
+            channel.source_timeout = source_timeout
+
+    return channel
+
+
+def _ensure_distributor_running(channel: _QueueFanoutChannel, channel_key: str) -> None:
+    """Ensure fanout distributor thread is running for a channel."""
+    with _fanout_channels_lock:
         if channel.distributor is None or not channel.distributor.is_alive():
             channel.distributor = threading.Thread(
                 target=_run_fanout,
@@ -68,8 +95,6 @@ def _ensure_fanout_channel(
             )
             channel.distributor.start()
 
-    return channel
-
 
 def subscribe_fanout_queue(
     source_queue: queue.Queue,
@@ -89,6 +114,9 @@ def subscribe_fanout_queue(
     with channel.lock:
         channel.subscribers.add(subscriber)
 
+    # Start distributor only after subscriber is registered to avoid initial-loss race.
+    _ensure_distributor_running(channel, channel_key)
+
     def _unsubscribe() -> None:
         with channel.lock:
             channel.subscribers.discard(subscriber)