Add FIFO transport fallback for Morse SDR sample stream

This commit is contained in:
Smittix
2026-02-26 16:25:37 +00:00
parent 98642e43c7
commit fdffb8e88e

View File

@@ -112,6 +112,44 @@ def _stdout_target_path() -> str:
return '-'
def _is_real_tool_binary(cmd: list[str]) -> bool:
"""Heuristic to avoid FIFO plumbing in unit tests with fake binaries."""
if not cmd:
return False
exe = str(cmd[0] or '')
return exe.startswith('/') and Path(exe).exists()
def _prepare_fifo_output(cmd: list[str], *, token: str) -> tuple[list[str], Any | None, str | None]:
"""Optionally route rtl_* output through a named pipe for robust reading."""
if os.name != 'posix' or not _is_real_tool_binary(cmd):
return cmd, None, None
if not cmd:
return cmd, None, None
try:
fifo_dir = Path(tempfile.gettempdir())
fifo_path = fifo_dir / f'morse_{token}_{int(time.time() * 1000)}.fifo'
with contextlib.suppress(FileNotFoundError):
fifo_path.unlink()
os.mkfifo(fifo_path, 0o600)
fifo_cmd = list(cmd)
if fifo_cmd:
fifo_cmd[-1] = str(fifo_path)
reader_fd = os.open(str(fifo_path), os.O_RDONLY | os.O_NONBLOCK)
reader = os.fdopen(reader_fd, 'rb', buffering=0)
return fifo_cmd, reader, str(fifo_path)
except Exception:
with contextlib.suppress(Exception):
if 'reader' in locals() and reader is not None:
reader.close()
with contextlib.suppress(Exception):
if 'fifo_path' in locals():
Path(fifo_path).unlink(missing_ok=True)
return cmd, None, None
def _bool_value(value: Any, default: bool = False) -> bool:
if isinstance(value, bool):
return value
@@ -438,6 +476,8 @@ def start_morse() -> Response:
decoder_thread: threading.Thread | None = None
stderr_thread: threading.Thread | None = None
control_queue: queue.Queue | None = None
decoder_input: Any | None = None
fifo_path: str | None = None
runtime_config: dict[str, Any] = {
'sample_rate': sample_rate,
@@ -462,12 +502,18 @@ def start_morse() -> Response:
attempt_control_queue: queue.Queue | None,
attempt_decoder_thread: threading.Thread | None,
attempt_stderr_thread: threading.Thread | None,
attempt_stream_handle: Any | None = None,
attempt_fifo_path: str | None = None,
) -> None:
if attempt_stop_event is not None:
attempt_stop_event.set()
if attempt_control_queue is not None:
with contextlib.suppress(queue.Full):
attempt_control_queue.put_nowait({'cmd': 'shutdown'})
if attempt_stream_handle is not None and (
proc is None or attempt_stream_handle is not getattr(proc, 'stdout', None)
):
_close_pipe(attempt_stream_handle)
if proc is not None:
# Close stdout to unblock decoder reads. Keep stderr open until
# after stderr monitor thread exits to avoid ValueError races.
@@ -493,6 +539,9 @@ def start_morse() -> Response:
_close_pipe(getattr(proc, 'stderr', None))
_join_thread(attempt_stderr_thread, timeout_s=0.15)
_close_pipe(getattr(proc, 'stderr', None))
if attempt_fifo_path:
with contextlib.suppress(Exception):
Path(attempt_fifo_path).unlink(missing_ok=True)
attempt_errors: list[str] = []
full_cmd = ''
@@ -551,13 +600,30 @@ def start_morse() -> Response:
'text': f'[cmd] {full_cmd}',
})
rtl_process = subprocess.Popen(
fifo_cmd, fifo_reader, fifo_path = _prepare_fifo_output(
rtl_cmd,
stdout=subprocess.PIPE,
token=f'{morse_session_id}_{attempt_index}',
)
if fifo_cmd is not rtl_cmd:
full_cmd = ' '.join(fifo_cmd)
logger.info(
f'Morse decoder attempt {attempt_index}/{len(command_attempts)} '
f'({attempt_desc}) via fifo: {full_cmd}'
)
with contextlib.suppress(queue.Full):
app_module.morse_queue.put_nowait({
'type': 'info',
'text': f'[cmd] {full_cmd}',
})
rtl_process = subprocess.Popen(
fifo_cmd,
stdout=(subprocess.DEVNULL if fifo_reader is not None else subprocess.PIPE),
stderr=subprocess.PIPE,
bufsize=0,
)
register_process(rtl_process)
decoder_input = fifo_reader if fifo_reader is not None else rtl_process.stdout
stop_event = threading.Event()
control_queue = queue.Queue(maxsize=16)
@@ -605,7 +671,7 @@ def start_morse() -> Response:
decoder_thread = threading.Thread(
target=thread_target,
args=(
rtl_process.stdout,
decoder_input,
app_module.morse_queue,
stop_event,
iq_sample_rate,
@@ -626,7 +692,7 @@ def start_morse() -> Response:
decoder_thread = threading.Thread(
target=thread_target,
args=(
rtl_process.stdout,
decoder_input,
app_module.morse_queue,
stop_event,
sample_rate,
@@ -718,12 +784,16 @@ def start_morse() -> Response:
control_queue,
decoder_thread,
stderr_thread,
decoder_input,
fifo_path,
)
rtl_process = None
stop_event = None
control_queue = None
decoder_thread = None
stderr_thread = None
decoder_input = None
fifo_path = None
if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None:
msg = 'SDR capture started but no PCM stream was received.'
@@ -745,6 +815,8 @@ def start_morse() -> Response:
app_module.morse_process._decoder_thread = decoder_thread
app_module.morse_process._stderr_thread = stderr_thread
app_module.morse_process._control_queue = control_queue
app_module.morse_process._stream_handle = decoder_input
app_module.morse_process._fifo_path = fifo_path
morse_stop_event = stop_event
morse_control_queue = control_queue
@@ -772,6 +844,13 @@ def start_morse() -> Response:
except FileNotFoundError as e:
if rtl_process is not None:
unregister_process(rtl_process)
if decoder_input is not None and (
rtl_process is None or decoder_input is not getattr(rtl_process, 'stdout', None)
):
_close_pipe(decoder_input)
if fifo_path:
with contextlib.suppress(Exception):
Path(fifo_path).unlink(missing_ok=True)
with app_module.morse_lock:
if morse_active_device is not None:
app_module.release_sdr_device(morse_active_device)
@@ -785,6 +864,13 @@ def start_morse() -> Response:
if rtl_process is not None:
safe_terminate(rtl_process, timeout=0.5)
unregister_process(rtl_process)
if decoder_input is not None and (
rtl_process is None or decoder_input is not getattr(rtl_process, 'stdout', None)
):
_close_pipe(decoder_input)
if fifo_path:
with contextlib.suppress(Exception):
Path(fifo_path).unlink(missing_ok=True)
if stop_event is not None:
stop_event.set()
_join_thread(decoder_thread, timeout_s=0.25)
@@ -815,6 +901,8 @@ def stop_morse() -> Response:
decoder_thread = morse_decoder_worker or getattr(proc, '_decoder_thread', None)
stderr_thread = morse_stderr_worker or getattr(proc, '_stderr_thread', None)
control_queue = morse_control_queue or getattr(proc, '_control_queue', None)
stream_handle = getattr(proc, '_stream_handle', None) if proc else None
fifo_path = getattr(proc, '_fifo_path', None) if proc else None
active_device = morse_active_device
if not proc and not stop_event and not decoder_thread and not stderr_thread:
@@ -848,6 +936,12 @@ def stop_morse() -> Response:
control_queue.put_nowait({'cmd': 'shutdown'})
_mark('control_queue shutdown signal sent')
if stream_handle is not None and (
proc is None or stream_handle is not getattr(proc, 'stdout', None)
):
_close_pipe(stream_handle)
_mark('decoder input stream closed')
if proc is not None:
_close_pipe(getattr(proc, 'stdout', None))
_mark('stdout pipe closed')
@@ -865,6 +959,10 @@ def stop_morse() -> Response:
_mark('stderr pipe force-closed')
_close_pipe(getattr(proc, 'stderr', None))
_mark('stderr pipe closed')
if fifo_path:
with contextlib.suppress(Exception):
Path(fifo_path).unlink(missing_ok=True)
_mark('fifo path removed')
_mark(f'decoder thread joined={decoder_joined}')
_mark(f'stderr thread joined={stderr_joined}')