mirror of
https://github.com/smittix/intercept.git
synced 2026-06-08 22:21:55 -07:00
Fix SSE fanout backlog causing delayed bursty updates
This commit is contained in:
+13
-8
@@ -6,6 +6,8 @@ import queue
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from utils.sse import subscribe_fanout_queue
|
||||
|
||||
|
||||
@@ -13,8 +15,8 @@ def _channel_key(prefix: str) -> str:
|
||||
return f"{prefix}-{uuid.uuid4()}"
|
||||
|
||||
|
||||
def test_fanout_does_not_drain_source_queue_without_subscribers() -> None:
|
||||
"""Queued messages should remain buffered while no SSE clients are connected."""
|
||||
def test_fanout_drains_source_queue_without_subscribers() -> None:
|
||||
"""Queued messages should be dropped while no SSE clients are connected."""
|
||||
source = queue.Queue()
|
||||
channel_key = _channel_key("sse-idle")
|
||||
|
||||
@@ -25,19 +27,18 @@ def test_fanout_does_not_drain_source_queue_without_subscribers() -> None:
|
||||
source.put({"type": "aprs", "callsign": "N0CALL"})
|
||||
time.sleep(0.05)
|
||||
|
||||
assert source.qsize() == 1
|
||||
assert source.qsize() == 0
|
||||
|
||||
|
||||
def test_fanout_delivers_buffered_message_after_re_subscribe() -> None:
|
||||
"""A message queued while disconnected should be delivered on reconnect."""
|
||||
def test_fanout_does_not_replay_stale_message_after_re_subscribe() -> None:
|
||||
"""A message queued while disconnected should not be replayed on reconnect."""
|
||||
source = queue.Queue()
|
||||
channel_key = _channel_key("sse-resub")
|
||||
|
||||
_, unsubscribe = subscribe_fanout_queue(source, channel_key=channel_key, source_timeout=0.01)
|
||||
unsubscribe()
|
||||
|
||||
expected = {"type": "aprs", "callsign": "K1ABC"}
|
||||
source.put(expected)
|
||||
source.put({"type": "aprs", "callsign": "K1ABC"})
|
||||
|
||||
subscriber, unsubscribe2 = subscribe_fanout_queue(
|
||||
source,
|
||||
@@ -45,8 +46,12 @@ def test_fanout_delivers_buffered_message_after_re_subscribe() -> None:
|
||||
source_timeout=0.01,
|
||||
)
|
||||
try:
|
||||
with pytest.raises(queue.Empty):
|
||||
subscriber.get(timeout=0.1)
|
||||
live = {"type": "aprs", "callsign": "LIVE01"}
|
||||
source.put(live)
|
||||
got = subscriber.get(timeout=0.25)
|
||||
finally:
|
||||
unsubscribe2()
|
||||
|
||||
assert got == expected
|
||||
assert got == live
|
||||
|
||||
+17
-15
@@ -26,13 +26,26 @@ _fanout_channels_lock = threading.Lock()
|
||||
|
||||
def _run_fanout(channel: _QueueFanoutChannel) -> None:
|
||||
"""Drain source queue and fan out each message to all subscribers."""
|
||||
idle_drain_batch = 512
|
||||
|
||||
while True:
|
||||
with channel.lock:
|
||||
has_subscribers = bool(channel.subscribers)
|
||||
subscribers = tuple(channel.subscribers)
|
||||
|
||||
if not has_subscribers:
|
||||
# Do not drain source_queue when no clients are connected.
|
||||
time.sleep(channel.source_timeout)
|
||||
if not subscribers:
|
||||
# Keep ingest pipelines responsive even if UI clients disconnect:
|
||||
# drain and drop stale backlog while idle so producer threads do
|
||||
# not block on full source queues.
|
||||
drained = 0
|
||||
for _ in range(idle_drain_batch):
|
||||
try:
|
||||
channel.source_queue.get_nowait()
|
||||
drained += 1
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
if drained == 0:
|
||||
time.sleep(channel.source_timeout)
|
||||
continue
|
||||
|
||||
try:
|
||||
@@ -40,17 +53,6 @@ def _run_fanout(channel: _QueueFanoutChannel) -> None:
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
with channel.lock:
|
||||
subscribers = tuple(channel.subscribers)
|
||||
|
||||
if not subscribers:
|
||||
# Subscriber set changed after we dequeued; requeue best-effort.
|
||||
try:
|
||||
channel.source_queue.put_nowait(msg)
|
||||
except queue.Full:
|
||||
pass
|
||||
continue
|
||||
|
||||
for subscriber in subscribers:
|
||||
try:
|
||||
subscriber.put_nowait(msg)
|
||||
|
||||
Reference in New Issue
Block a user