Move Morse PCM ingestion to dedicated reader thread

This commit is contained in:
Smittix
2026-02-26 11:53:03 +00:00
parent d9228fb05a
commit 63cc1647fb

View File

@@ -11,9 +11,7 @@ from __future__ import annotations
import contextlib
import math
import os
import queue
import select
import struct
import threading
import time
@@ -769,54 +767,86 @@ def morse_decoder_thread(
last_pcm_at: float | None = None
pcm_bytes = 0
pcm_report_at = time.monotonic()
reader_done = threading.Event()
reader_thread: threading.Thread | None = None
raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96)
try:
fd: int | None
try:
fd = rtl_stdout.fileno()
except Exception:
fd = None
def _reader_loop() -> None:
"""Blocking PCM reader isolated from decode/control loop."""
try:
while not stop_event.is_set():
try:
if hasattr(rtl_stdout, 'read1'):
data = rtl_stdout.read1(CHUNK)
else:
data = rtl_stdout.read(CHUNK)
except Exception as e:
with contextlib.suppress(queue.Full):
output_queue.put_nowait({
'type': 'info',
'text': f'[pcm] reader error: {e}',
})
break
if data is None:
continue
if not data:
break
try:
raw_queue.put(data, timeout=0.2)
except queue.Full:
# Keep latest PCM flowing even if downstream hiccups.
with contextlib.suppress(queue.Empty):
raw_queue.get_nowait()
with contextlib.suppress(queue.Full):
raw_queue.put_nowait(data)
finally:
reader_done.set()
with contextlib.suppress(queue.Full):
raw_queue.put_nowait(b'')
reader_thread = threading.Thread(
target=_reader_loop,
daemon=True,
name='morse-pcm-reader',
)
reader_thread.start()
while not stop_event.is_set():
if not _drain_control_queue(control_queue, decoder):
break
data = b''
if fd is not None:
try:
ready, _, _ = select.select([fd], [], [], 0.20)
except Exception:
try:
data = raw_queue.get(timeout=0.20)
except queue.Empty:
now = time.monotonic()
should_emit_waiting = False
if last_pcm_at is None:
should_emit_waiting = True
elif (now - last_pcm_at) >= STALLED_AFTER_DATA_SECONDS:
should_emit_waiting = True
if should_emit_waiting and waiting_since is None:
waiting_since = now
if should_emit_waiting and now - last_waiting_emit >= WAITING_INTERVAL:
last_waiting_emit = now
_emit_waiting_scope(output_queue, waiting_since)
if reader_done.is_set():
break
if ready:
try:
# Use buffered stream read first for cross-platform stability.
if hasattr(rtl_stdout, 'read1'):
data = rtl_stdout.read1(CHUNK)
else:
data = os.read(fd, CHUNK)
except Exception:
break
else:
now = time.monotonic()
should_emit_waiting = False
if last_pcm_at is None:
should_emit_waiting = True
elif (now - last_pcm_at) >= STALLED_AFTER_DATA_SECONDS:
should_emit_waiting = True
if should_emit_waiting and waiting_since is None:
waiting_since = now
now = time.monotonic()
if should_emit_waiting and now - last_waiting_emit >= WAITING_INTERVAL:
last_waiting_emit = now
_emit_waiting_scope(output_queue, waiting_since)
continue
else:
# Fallback for test streams without fileno().
data = rtl_stdout.read(CHUNK)
continue
if not data:
if reader_done.is_set() and last_pcm_at is None:
with contextlib.suppress(queue.Full):
output_queue.put_nowait({
'type': 'info',
'text': '[pcm] stream ended before samples were received',
})
break
waiting_since = None
@@ -848,7 +878,16 @@ def morse_decoder_thread(
except Exception as e: # pragma: no cover - defensive runtime guard
logger.debug(f'Morse decoder thread error: {e}')
with contextlib.suppress(queue.Full):
output_queue.put_nowait({
'type': 'info',
'text': f'[pcm] decoder thread error: {e}',
})
finally:
stop_event.set()
if reader_thread is not None:
reader_thread.join(timeout=0.35)
for event in decoder.flush():
with contextlib.suppress(queue.Full):
output_queue.put_nowait(event)