diff --git a/routes/morse.py b/routes/morse.py index 0e953b0..8936f3c 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -544,6 +544,7 @@ def start_morse() -> Response: stop_event = threading.Event() control_queue = queue.Queue(maxsize=16) pcm_ready_event = threading.Event() + stream_ready_event = threading.Event() attempt_stderr_lines: list[str] = [] def monitor_stderr( @@ -598,6 +599,7 @@ def start_morse() -> Response: 'decoder_config': runtime_config, 'control_queue': control_queue, 'pcm_ready_event': pcm_ready_event, + 'stream_ready_event': stream_ready_event, }, daemon=True, name='morse-decoder', @@ -617,6 +619,7 @@ def start_morse() -> Response: 'decoder_config': runtime_config, 'control_queue': control_queue, 'pcm_ready_event': pcm_ready_event, + 'stream_ready_event': stream_ready_event, 'strip_text_chunks': False, }, daemon=True, @@ -642,6 +645,8 @@ def start_morse() -> Response: startup_error = 'No PCM samples received within startup timeout' if attempt_stderr_lines: startup_error = f'{startup_error}; stderr: {attempt_stderr_lines[-1]}' + if stream_ready_event.is_set(): + startup_error = f'{startup_error}; stream=alive' is_last_attempt = attempt_index == len(command_attempts) if ( diff --git a/utils/morse.py b/utils/morse.py index 915b953..58c03dc 100644 --- a/utils/morse.py +++ b/utils/morse.py @@ -11,7 +11,9 @@ from __future__ import annotations import contextlib import math +import os import queue +import select import struct import threading import time @@ -763,6 +765,7 @@ def morse_decoder_thread( decoder_config: dict[str, Any] | None = None, control_queue: queue.Queue | None = None, pcm_ready_event: threading.Event | None = None, + stream_ready_event: threading.Event | None = None, strip_text_chunks: bool = False, ) -> None: """Decode Morse from live PCM stream and push events to *output_queue*.""" @@ -800,16 +803,26 @@ def morse_decoder_thread( first_pcm_logged = False reader_done = threading.Event() reader_thread: threading.Thread | None = None + first_raw_logged = False raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96) try: def _reader_loop() -> None: """Blocking PCM reader isolated from decode/control loop.""" + nonlocal first_raw_logged try: + fd = None + with contextlib.suppress(Exception): + fd = rtl_stdout.fileno() while not stop_event.is_set(): try: - if hasattr(rtl_stdout, 'read1'): + if fd is not None: + ready, _, _ = select.select([fd], [], [], 0.20) + if not ready: + continue + data = os.read(fd, CHUNK) + elif hasattr(rtl_stdout, 'read1'): data = rtl_stdout.read1(CHUNK) else: data = rtl_stdout.read(CHUNK) @@ -827,6 +840,16 @@ def morse_decoder_thread( if not data: break + if not first_raw_logged: + first_raw_logged = True + if stream_ready_event is not None: + stream_ready_event.set() + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[pcm] first raw chunk: {len(data)} bytes', + }) + if strip_text_chunks and _is_probably_rtl_log_text(data): try: text = data.decode('utf-8', errors='replace') @@ -1026,6 +1049,7 @@ def morse_iq_decoder_thread( decoder_config: dict[str, Any] | None = None, control_queue: queue.Queue | None = None, pcm_ready_event: threading.Event | None = None, + stream_ready_event: threading.Event | None = None, ) -> None: """Decode Morse from raw IQ (cu8) by in-process USB demodulation.""" import logging @@ -1062,15 +1086,25 @@ def morse_iq_decoder_thread( first_pcm_logged = False reader_done = threading.Event() reader_thread: threading.Thread | None = None + first_raw_logged = False raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96) try: def _reader_loop() -> None: + nonlocal first_raw_logged try: + fd = None + with contextlib.suppress(Exception): + fd = iq_stdout.fileno() while not stop_event.is_set(): try: - if hasattr(iq_stdout, 'read1'): + if fd is not None: + ready, _, _ = select.select([fd], [], [], 0.20) + if not ready: + continue + data = os.read(fd, CHUNK) + elif hasattr(iq_stdout, 'read1'): data = iq_stdout.read1(CHUNK) else: data = iq_stdout.read(CHUNK) @@ -1087,6 +1121,16 @@ def morse_iq_decoder_thread( if not data: break + if not first_raw_logged: + first_raw_logged = True + if stream_ready_event is not None: + stream_ready_event.set() + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[iq] first raw chunk: {len(data)} bytes', + }) + try: raw_queue.put(data, timeout=0.2) except queue.Full: