mirror of
https://github.com/smittix/intercept.git
synced 2026-06-12 16:03:29 -07:00
Fix SSE fanout packet loss on reconnect
This commit is contained in:
@@ -0,0 +1,52 @@
|
||||
"""Tests for SSE fanout queue behavior."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from utils.sse import subscribe_fanout_queue
|
||||
|
||||
|
||||
def _channel_key(prefix: str) -> str:
|
||||
return f"{prefix}-{uuid.uuid4()}"
|
||||
|
||||
|
||||
def test_fanout_does_not_drain_source_queue_without_subscribers() -> None:
|
||||
"""Queued messages should remain buffered while no SSE clients are connected."""
|
||||
source = queue.Queue()
|
||||
channel_key = _channel_key("sse-idle")
|
||||
|
||||
# Start fanout distributor, then remove the only subscriber.
|
||||
_, unsubscribe = subscribe_fanout_queue(source, channel_key=channel_key, source_timeout=0.01)
|
||||
unsubscribe()
|
||||
|
||||
source.put({"type": "aprs", "callsign": "N0CALL"})
|
||||
time.sleep(0.05)
|
||||
|
||||
assert source.qsize() == 1
|
||||
|
||||
|
||||
def test_fanout_delivers_buffered_message_after_re_subscribe() -> None:
|
||||
"""A message queued while disconnected should be delivered on reconnect."""
|
||||
source = queue.Queue()
|
||||
channel_key = _channel_key("sse-resub")
|
||||
|
||||
_, unsubscribe = subscribe_fanout_queue(source, channel_key=channel_key, source_timeout=0.01)
|
||||
unsubscribe()
|
||||
|
||||
expected = {"type": "aprs", "callsign": "K1ABC"}
|
||||
source.put(expected)
|
||||
|
||||
subscriber, unsubscribe2 = subscribe_fanout_queue(
|
||||
source,
|
||||
channel_key=channel_key,
|
||||
source_timeout=0.01,
|
||||
)
|
||||
try:
|
||||
got = subscriber.get(timeout=0.25)
|
||||
finally:
|
||||
unsubscribe2()
|
||||
|
||||
assert got == expected
|
||||
+31
-3
@@ -27,6 +27,14 @@ _fanout_channels_lock = threading.Lock()
|
||||
def _run_fanout(channel: _QueueFanoutChannel) -> None:
|
||||
"""Drain source queue and fan out each message to all subscribers."""
|
||||
while True:
|
||||
with channel.lock:
|
||||
has_subscribers = bool(channel.subscribers)
|
||||
|
||||
if not has_subscribers:
|
||||
# Do not drain source_queue when no clients are connected.
|
||||
time.sleep(channel.source_timeout)
|
||||
continue
|
||||
|
||||
try:
|
||||
msg = channel.source_queue.get(timeout=channel.source_timeout)
|
||||
except queue.Empty:
|
||||
@@ -35,6 +43,14 @@ def _run_fanout(channel: _QueueFanoutChannel) -> None:
|
||||
with channel.lock:
|
||||
subscribers = tuple(channel.subscribers)
|
||||
|
||||
if not subscribers:
|
||||
# Subscriber set changed after we dequeued; requeue best-effort.
|
||||
try:
|
||||
channel.source_queue.put_nowait(msg)
|
||||
except queue.Full:
|
||||
pass
|
||||
continue
|
||||
|
||||
for subscriber in subscribers:
|
||||
try:
|
||||
subscriber.put_nowait(msg)
|
||||
@@ -52,13 +68,24 @@ def _ensure_fanout_channel(
|
||||
source_queue: queue.Queue,
|
||||
source_timeout: float,
|
||||
) -> _QueueFanoutChannel:
|
||||
"""Get/create a fanout channel and ensure distributor thread is running."""
|
||||
"""Get/create a fanout channel."""
|
||||
with _fanout_channels_lock:
|
||||
channel = _fanout_channels.get(channel_key)
|
||||
if channel is None:
|
||||
channel = _QueueFanoutChannel(source_queue=source_queue, source_timeout=source_timeout)
|
||||
_fanout_channels[channel_key] = channel
|
||||
|
||||
if channel.source_queue is not source_queue:
|
||||
# Keep channel in sync if source queue object is replaced.
|
||||
channel.source_queue = source_queue
|
||||
channel.source_timeout = source_timeout
|
||||
|
||||
return channel
|
||||
|
||||
|
||||
def _ensure_distributor_running(channel: _QueueFanoutChannel, channel_key: str) -> None:
|
||||
"""Ensure fanout distributor thread is running for a channel."""
|
||||
with _fanout_channels_lock:
|
||||
if channel.distributor is None or not channel.distributor.is_alive():
|
||||
channel.distributor = threading.Thread(
|
||||
target=_run_fanout,
|
||||
@@ -68,8 +95,6 @@ def _ensure_fanout_channel(
|
||||
)
|
||||
channel.distributor.start()
|
||||
|
||||
return channel
|
||||
|
||||
|
||||
def subscribe_fanout_queue(
|
||||
source_queue: queue.Queue,
|
||||
@@ -89,6 +114,9 @@ def subscribe_fanout_queue(
|
||||
with channel.lock:
|
||||
channel.subscribers.add(subscriber)
|
||||
|
||||
# Start distributor only after subscriber is registered to avoid initial-loss race.
|
||||
_ensure_distributor_running(channel, channel_key)
|
||||
|
||||
def _unsubscribe() -> None:
|
||||
with channel.lock:
|
||||
channel.subscribers.discard(subscriber)
|
||||
|
||||
Reference in New Issue
Block a user