diff --git a/routes/morse.py b/routes/morse.py index 779b3df..ae793f8 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -112,6 +112,44 @@ def _stdout_target_path() -> str: return '-' +def _is_real_tool_binary(cmd: list[str]) -> bool: + """Heuristic to avoid FIFO plumbing in unit tests with fake binaries.""" + if not cmd: + return False + exe = str(cmd[0] or '') + return exe.startswith('/') and Path(exe).exists() + + +def _prepare_fifo_output(cmd: list[str], *, token: str) -> tuple[list[str], Any | None, str | None]: + """Optionally route rtl_* output through a named pipe for robust reading.""" + if os.name != 'posix' or not _is_real_tool_binary(cmd): + return cmd, None, None + if not cmd: + return cmd, None, None + try: + fifo_dir = Path(tempfile.gettempdir()) + fifo_path = fifo_dir / f'morse_{token}_{int(time.time() * 1000)}.fifo' + with contextlib.suppress(FileNotFoundError): + fifo_path.unlink() + os.mkfifo(fifo_path, 0o600) + + fifo_cmd = list(cmd) + if fifo_cmd: + fifo_cmd[-1] = str(fifo_path) + + reader_fd = os.open(str(fifo_path), os.O_RDONLY | os.O_NONBLOCK) + reader = os.fdopen(reader_fd, 'rb', buffering=0) + return fifo_cmd, reader, str(fifo_path) + except Exception: + with contextlib.suppress(Exception): + if 'reader' in locals() and reader is not None: + reader.close() + with contextlib.suppress(Exception): + if 'fifo_path' in locals(): + Path(fifo_path).unlink(missing_ok=True) + return cmd, None, None + + def _bool_value(value: Any, default: bool = False) -> bool: if isinstance(value, bool): return value @@ -438,6 +476,8 @@ def start_morse() -> Response: decoder_thread: threading.Thread | None = None stderr_thread: threading.Thread | None = None control_queue: queue.Queue | None = None + decoder_input: Any | None = None + fifo_path: str | None = None runtime_config: dict[str, Any] = { 'sample_rate': sample_rate, @@ -462,12 +502,18 @@ def start_morse() -> Response: attempt_control_queue: queue.Queue | None, attempt_decoder_thread: threading.Thread | None, attempt_stderr_thread: threading.Thread | None, + attempt_stream_handle: Any | None = None, + attempt_fifo_path: str | None = None, ) -> None: if attempt_stop_event is not None: attempt_stop_event.set() if attempt_control_queue is not None: with contextlib.suppress(queue.Full): attempt_control_queue.put_nowait({'cmd': 'shutdown'}) + if attempt_stream_handle is not None and ( + proc is None or attempt_stream_handle is not getattr(proc, 'stdout', None) + ): + _close_pipe(attempt_stream_handle) if proc is not None: # Close stdout to unblock decoder reads. Keep stderr open until # after stderr monitor thread exits to avoid ValueError races. @@ -493,6 +539,9 @@ def start_morse() -> Response: _close_pipe(getattr(proc, 'stderr', None)) _join_thread(attempt_stderr_thread, timeout_s=0.15) _close_pipe(getattr(proc, 'stderr', None)) + if attempt_fifo_path: + with contextlib.suppress(Exception): + Path(attempt_fifo_path).unlink(missing_ok=True) attempt_errors: list[str] = [] full_cmd = '' @@ -551,13 +600,30 @@ def start_morse() -> Response: 'text': f'[cmd] {full_cmd}', }) - rtl_process = subprocess.Popen( + fifo_cmd, fifo_reader, fifo_path = _prepare_fifo_output( rtl_cmd, - stdout=subprocess.PIPE, + token=f'{morse_session_id}_{attempt_index}', + ) + if fifo_cmd is not rtl_cmd: + full_cmd = ' '.join(fifo_cmd) + logger.info( + f'Morse decoder attempt {attempt_index}/{len(command_attempts)} ' + f'({attempt_desc}) via fifo: {full_cmd}' + ) + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': f'[cmd] {full_cmd}', + }) + + rtl_process = subprocess.Popen( + fifo_cmd, + stdout=(subprocess.DEVNULL if fifo_reader is not None else subprocess.PIPE), stderr=subprocess.PIPE, bufsize=0, ) register_process(rtl_process) + decoder_input = fifo_reader if fifo_reader is not None else rtl_process.stdout stop_event = threading.Event() control_queue = queue.Queue(maxsize=16) @@ -605,7 +671,7 @@ def start_morse() -> Response: decoder_thread = threading.Thread( target=thread_target, args=( - rtl_process.stdout, + decoder_input, app_module.morse_queue, stop_event, iq_sample_rate, @@ -626,7 +692,7 @@ def start_morse() -> Response: decoder_thread = threading.Thread( target=thread_target, args=( - rtl_process.stdout, + decoder_input, app_module.morse_queue, stop_event, sample_rate, @@ -718,12 +784,16 @@ def start_morse() -> Response: control_queue, decoder_thread, stderr_thread, + decoder_input, + fifo_path, ) rtl_process = None stop_event = None control_queue = None decoder_thread = None stderr_thread = None + decoder_input = None + fifo_path = None if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None: msg = 'SDR capture started but no PCM stream was received.' @@ -745,6 +815,8 @@ def start_morse() -> Response: app_module.morse_process._decoder_thread = decoder_thread app_module.morse_process._stderr_thread = stderr_thread app_module.morse_process._control_queue = control_queue + app_module.morse_process._stream_handle = decoder_input + app_module.morse_process._fifo_path = fifo_path morse_stop_event = stop_event morse_control_queue = control_queue @@ -772,6 +844,13 @@ def start_morse() -> Response: except FileNotFoundError as e: if rtl_process is not None: unregister_process(rtl_process) + if decoder_input is not None and ( + rtl_process is None or decoder_input is not getattr(rtl_process, 'stdout', None) + ): + _close_pipe(decoder_input) + if fifo_path: + with contextlib.suppress(Exception): + Path(fifo_path).unlink(missing_ok=True) with app_module.morse_lock: if morse_active_device is not None: app_module.release_sdr_device(morse_active_device) @@ -785,6 +864,13 @@ def start_morse() -> Response: if rtl_process is not None: safe_terminate(rtl_process, timeout=0.5) unregister_process(rtl_process) + if decoder_input is not None and ( + rtl_process is None or decoder_input is not getattr(rtl_process, 'stdout', None) + ): + _close_pipe(decoder_input) + if fifo_path: + with contextlib.suppress(Exception): + Path(fifo_path).unlink(missing_ok=True) if stop_event is not None: stop_event.set() _join_thread(decoder_thread, timeout_s=0.25) @@ -815,6 +901,8 @@ def stop_morse() -> Response: decoder_thread = morse_decoder_worker or getattr(proc, '_decoder_thread', None) stderr_thread = morse_stderr_worker or getattr(proc, '_stderr_thread', None) control_queue = morse_control_queue or getattr(proc, '_control_queue', None) + stream_handle = getattr(proc, '_stream_handle', None) if proc else None + fifo_path = getattr(proc, '_fifo_path', None) if proc else None active_device = morse_active_device if not proc and not stop_event and not decoder_thread and not stderr_thread: @@ -848,6 +936,12 @@ def stop_morse() -> Response: control_queue.put_nowait({'cmd': 'shutdown'}) _mark('control_queue shutdown signal sent') + if stream_handle is not None and ( + proc is None or stream_handle is not getattr(proc, 'stdout', None) + ): + _close_pipe(stream_handle) + _mark('decoder input stream closed') + if proc is not None: _close_pipe(getattr(proc, 'stdout', None)) _mark('stdout pipe closed') @@ -865,6 +959,10 @@ def stop_morse() -> Response: _mark('stderr pipe force-closed') _close_pipe(getattr(proc, 'stderr', None)) _mark('stderr pipe closed') + if fifo_path: + with contextlib.suppress(Exception): + Path(fifo_path).unlink(missing_ok=True) + _mark('fifo path removed') _mark(f'decoder thread joined={decoder_joined}') _mark(f'stderr thread joined={stderr_joined}')