diff --git a/utils/morse.py b/utils/morse.py index c21eafa..22db88f 100644 --- a/utils/morse.py +++ b/utils/morse.py @@ -11,9 +11,7 @@ from __future__ import annotations import contextlib import math -import os import queue -import select import struct import threading import time @@ -769,54 +767,86 @@ def morse_decoder_thread( last_pcm_at: float | None = None pcm_bytes = 0 pcm_report_at = time.monotonic() + reader_done = threading.Event() + reader_thread: threading.Thread | None = None + + raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96) try: - fd: int | None - try: - fd = rtl_stdout.fileno() - except Exception: - fd = None + def _reader_loop() -> None: + """Blocking PCM reader isolated from decode/control loop.""" + try: + while not stop_event.is_set(): + try: + if hasattr(rtl_stdout, 'read1'): + data = rtl_stdout.read1(CHUNK) + else: + data = rtl_stdout.read(CHUNK) + except Exception as e: + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[pcm] reader error: {e}', + }) + break + + if data is None: + continue + + if not data: + break + + try: + raw_queue.put(data, timeout=0.2) + except queue.Full: + # Keep latest PCM flowing even if downstream hiccups. + with contextlib.suppress(queue.Empty): + raw_queue.get_nowait() + with contextlib.suppress(queue.Full): + raw_queue.put_nowait(data) + finally: + reader_done.set() + with contextlib.suppress(queue.Full): + raw_queue.put_nowait(b'') + + reader_thread = threading.Thread( + target=_reader_loop, + daemon=True, + name='morse-pcm-reader', + ) + reader_thread.start() while not stop_event.is_set(): if not _drain_control_queue(control_queue, decoder): break - data = b'' - if fd is not None: - try: - ready, _, _ = select.select([fd], [], [], 0.20) - except Exception: + try: + data = raw_queue.get(timeout=0.20) + except queue.Empty: + now = time.monotonic() + should_emit_waiting = False + if last_pcm_at is None: + should_emit_waiting = True + elif (now - last_pcm_at) >= STALLED_AFTER_DATA_SECONDS: + should_emit_waiting = True + + if should_emit_waiting and waiting_since is None: + waiting_since = now + if should_emit_waiting and now - last_waiting_emit >= WAITING_INTERVAL: + last_waiting_emit = now + _emit_waiting_scope(output_queue, waiting_since) + + if reader_done.is_set(): break - - if ready: - try: - # Use buffered stream read first for cross-platform stability. - if hasattr(rtl_stdout, 'read1'): - data = rtl_stdout.read1(CHUNK) - else: - data = os.read(fd, CHUNK) - except Exception: - break - else: - now = time.monotonic() - should_emit_waiting = False - if last_pcm_at is None: - should_emit_waiting = True - elif (now - last_pcm_at) >= STALLED_AFTER_DATA_SECONDS: - should_emit_waiting = True - - if should_emit_waiting and waiting_since is None: - waiting_since = now - now = time.monotonic() - if should_emit_waiting and now - last_waiting_emit >= WAITING_INTERVAL: - last_waiting_emit = now - _emit_waiting_scope(output_queue, waiting_since) - continue - else: - # Fallback for test streams without fileno(). - data = rtl_stdout.read(CHUNK) + continue if not data: + if reader_done.is_set() and last_pcm_at is None: + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': '[pcm] stream ended before samples were received', + }) break waiting_since = None @@ -848,7 +878,16 @@ def morse_decoder_thread( except Exception as e: # pragma: no cover - defensive runtime guard logger.debug(f'Morse decoder thread error: {e}') + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[pcm] decoder thread error: {e}', + }) finally: + stop_event.set() + if reader_thread is not None: + reader_thread.join(timeout=0.35) + for event in decoder.flush(): with contextlib.suppress(queue.Full): output_queue.put_nowait(event)