diff --git a/tests/test_sse.py b/tests/test_sse.py index 729c502..4330be1 100644 --- a/tests/test_sse.py +++ b/tests/test_sse.py @@ -6,6 +6,8 @@ import queue import time import uuid +import pytest + from utils.sse import subscribe_fanout_queue @@ -13,8 +15,8 @@ def _channel_key(prefix: str) -> str: return f"{prefix}-{uuid.uuid4()}" -def test_fanout_does_not_drain_source_queue_without_subscribers() -> None: - """Queued messages should remain buffered while no SSE clients are connected.""" +def test_fanout_drains_source_queue_without_subscribers() -> None: + """Queued messages should be dropped while no SSE clients are connected.""" source = queue.Queue() channel_key = _channel_key("sse-idle") @@ -25,19 +27,18 @@ def test_fanout_does_not_drain_source_queue_without_subscribers() -> None: source.put({"type": "aprs", "callsign": "N0CALL"}) time.sleep(0.05) - assert source.qsize() == 1 + assert source.qsize() == 0 -def test_fanout_delivers_buffered_message_after_re_subscribe() -> None: - """A message queued while disconnected should be delivered on reconnect.""" +def test_fanout_does_not_replay_stale_message_after_re_subscribe() -> None: + """A message queued while disconnected should not be replayed on reconnect.""" source = queue.Queue() channel_key = _channel_key("sse-resub") _, unsubscribe = subscribe_fanout_queue(source, channel_key=channel_key, source_timeout=0.01) unsubscribe() - expected = {"type": "aprs", "callsign": "K1ABC"} - source.put(expected) + source.put({"type": "aprs", "callsign": "K1ABC"}) subscriber, unsubscribe2 = subscribe_fanout_queue( source, @@ -45,8 +46,12 @@ def test_fanout_delivers_buffered_message_after_re_subscribe() -> None: source_timeout=0.01, ) try: + with pytest.raises(queue.Empty): + subscriber.get(timeout=0.1) + live = {"type": "aprs", "callsign": "LIVE01"} + source.put(live) got = subscriber.get(timeout=0.25) finally: unsubscribe2() - assert got == expected + assert got == live diff --git a/utils/sse.py b/utils/sse.py index 92cb974..a2a9ccc 100644 --- a/utils/sse.py +++ b/utils/sse.py @@ -26,13 +26,26 @@ _fanout_channels_lock = threading.Lock() def _run_fanout(channel: _QueueFanoutChannel) -> None: """Drain source queue and fan out each message to all subscribers.""" + idle_drain_batch = 512 + while True: with channel.lock: - has_subscribers = bool(channel.subscribers) + subscribers = tuple(channel.subscribers) - if not has_subscribers: - # Do not drain source_queue when no clients are connected. - time.sleep(channel.source_timeout) + if not subscribers: + # Keep ingest pipelines responsive even if UI clients disconnect: + # drain and drop stale backlog while idle so producer threads do + # not block on full source queues. + drained = 0 + for _ in range(idle_drain_batch): + try: + channel.source_queue.get_nowait() + drained += 1 + except queue.Empty: + break + + if drained == 0: + time.sleep(channel.source_timeout) continue try: @@ -40,17 +53,6 @@ def _run_fanout(channel: _QueueFanoutChannel) -> None: except queue.Empty: continue - with channel.lock: - subscribers = tuple(channel.subscribers) - - if not subscribers: - # Subscriber set changed after we dequeued; requeue best-effort. - try: - channel.source_queue.put_nowait(msg) - except queue.Full: - pass - continue - for subscriber in subscribers: try: subscriber.put_nowait(msg)