From 64f0e687a02f96ab9e6cb978e49d9d8f8fdcb152 Mon Sep 17 00:00:00 2001 From: Smittix Date: Thu, 26 Feb 2026 15:37:17 +0000 Subject: [PATCH] Fix Morse stderr thread race and broaden startup fallbacks --- routes/morse.py | 134 ++++++++++++++++++++++++++++++++++---------- tests/test_morse.py | 3 +- 2 files changed, 106 insertions(+), 31 deletions(-) diff --git a/routes/morse.py b/routes/morse.py index 2d99524..0e953b0 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -284,7 +284,7 @@ def start_morse() -> Response: def _build_rtl_cmd( *, - use_direct_sampling: bool, + direct_sampling_mode: int | None, force_squelch_off: bool, add_resample_rate: bool, add_dc_fast: bool, @@ -300,8 +300,8 @@ def start_morse() -> Response: } # Only rtl_fm supports direct sampling flags. - if use_direct_sampling: - fm_kwargs['direct_sampling'] = 2 + if direct_sampling_mode in (1, 2): + fm_kwargs['direct_sampling'] = int(direct_sampling_mode) cmd = builder.build_fm_demod_command(**fm_kwargs) @@ -370,20 +370,31 @@ def start_morse() -> Response: 'source': 'iq', 'direct_sampling_mode': 2, }, + { + 'source': 'iq', + 'direct_sampling_mode': 1, + }, { 'source': 'iq', 'direct_sampling_mode': None, }, { 'source': 'rtl_fm', - 'use_direct_sampling': True, + 'direct_sampling_mode': 2, 'force_squelch_off': False, 'add_resample_rate': True, 'add_dc_fast': True, }, { 'source': 'rtl_fm', - 'use_direct_sampling': False, + 'direct_sampling_mode': 1, + 'force_squelch_off': False, + 'add_resample_rate': True, + 'add_dc_fast': True, + }, + { + 'source': 'rtl_fm', + 'direct_sampling_mode': None, 'force_squelch_off': False, 'add_resample_rate': True, 'add_dc_fast': True, @@ -397,7 +408,7 @@ def start_morse() -> Response: }, { 'source': 'rtl_fm', - 'use_direct_sampling': False, + 'direct_sampling_mode': None, 'force_squelch_off': False, 'add_resample_rate': True, 'add_dc_fast': True, @@ -440,8 +451,9 @@ def start_morse() -> Response: with contextlib.suppress(queue.Full): attempt_control_queue.put_nowait({'cmd': 'shutdown'}) if proc is not None: + # Close stdout to unblock decoder reads. Keep stderr open until + # after stderr monitor thread exits to avoid ValueError races. _close_pipe(getattr(proc, 'stdout', None)) - _close_pipe(getattr(proc, 'stderr', None)) # Keep startup retries responsive; avoid long waits inside # generic safe_terminate() during a failed attempt. if proc.poll() is None: @@ -456,18 +468,33 @@ def start_morse() -> Response: proc.wait(timeout=0.25) unregister_process(proc) _join_thread(attempt_decoder_thread, timeout_s=0.20) - _join_thread(attempt_stderr_thread, timeout_s=0.20) + stderr_joined = _join_thread(attempt_stderr_thread, timeout_s=0.35) + if proc is not None: + if not stderr_joined: + # Force-close the pipe if stderr reader is still blocked. + _close_pipe(getattr(proc, 'stderr', None)) + _join_thread(attempt_stderr_thread, timeout_s=0.15) + _close_pipe(getattr(proc, 'stderr', None)) attempt_errors: list[str] = [] full_cmd = '' for attempt_index, attempt in enumerate(command_attempts, start=1): + runtime_config.pop('startup_waiting', None) + runtime_config.pop('startup_warning', None) source = str(attempt.get('source', 'rtl_fm')).strip().lower() - use_direct_sampling = bool(attempt.get('use_direct_sampling', False)) force_squelch_off = bool(attempt.get('force_squelch_off', True)) add_resample_rate = bool(attempt.get('add_resample_rate', False)) add_dc_fast = bool(attempt.get('add_dc_fast', False)) - direct_sampling_mode = attempt.get('direct_sampling_mode') + direct_sampling_mode_raw = attempt.get('direct_sampling_mode') + try: + direct_sampling_mode = ( + int(direct_sampling_mode_raw) + if direct_sampling_mode_raw is not None + else None + ) + except (TypeError, ValueError): + direct_sampling_mode = None if source == 'iq': rtl_cmd, tuned_freq_mhz = _build_iq_cmd( @@ -481,7 +508,7 @@ def start_morse() -> Response: ) else: rtl_cmd = _build_rtl_cmd( - use_direct_sampling=use_direct_sampling, + direct_sampling_mode=direct_sampling_mode, force_squelch_off=force_squelch_off, add_resample_rate=add_resample_rate, add_dc_fast=add_dc_fast, @@ -489,7 +516,7 @@ def start_morse() -> Response: tuned_freq_mhz = float(freq) thread_target = morse_decoder_thread attempt_desc = ( - f'source=rtl_fm direct={int(use_direct_sampling)} ' + f'source=rtl_fm direct_mode={direct_sampling_mode if direct_sampling_mode is not None else "none"} ' f'squelch_forced={int(force_squelch_off)} ' f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)}' ) @@ -517,24 +544,40 @@ def start_morse() -> Response: stop_event = threading.Event() control_queue = queue.Queue(maxsize=16) pcm_ready_event = threading.Event() + attempt_stderr_lines: list[str] = [] def monitor_stderr( proc: subprocess.Popen = rtl_process, proc_stop_event: threading.Event = stop_event, tool_label: str = rtl_cmd[0], + stderr_lines: list[str] = attempt_stderr_lines, ) -> None: - if proc.stderr is None: + try: + stderr_stream = proc.stderr + if stderr_stream is None: + return + while not proc_stop_event.is_set(): + line = stderr_stream.readline() + if not line: + if proc.poll() is not None: + break + time.sleep(0.02) + continue + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + if len(stderr_lines) >= 40: + del stderr_lines[:10] + stderr_lines.append(err_text) + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': f'[{tool_label}] {err_text}', + }) + except ValueError: + # Pipe was closed during shutdown; expected during retries. + return + except Exception: return - for line in proc.stderr: - if proc_stop_event.is_set(): - break - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[{tool_label}] {err_text}', - }) stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') stderr_thread.start() @@ -581,7 +624,7 @@ def start_morse() -> Response: ) decoder_thread.start() - startup_deadline = time.monotonic() + (2.5 if source == 'iq' else 1.2) + startup_deadline = time.monotonic() + (4.0 if source == 'iq' else 2.0) startup_ok = False startup_error = '' @@ -594,6 +637,34 @@ def start_morse() -> Response: break time.sleep(0.05) + if not startup_ok: + if not startup_error: + startup_error = 'No PCM samples received within startup timeout' + if attempt_stderr_lines: + startup_error = f'{startup_error}; stderr: {attempt_stderr_lines[-1]}' + + is_last_attempt = attempt_index == len(command_attempts) + if ( + is_last_attempt + and rtl_process.poll() is None + and decoder_thread.is_alive() + ): + # Avoid hard-failing startup when SDR is alive but muted. + startup_ok = True + runtime_config['startup_waiting'] = True + runtime_config['startup_warning'] = startup_error + logger.warning( + 'Morse startup continuing without PCM (attempt %s/%s): %s', + attempt_index, + len(command_attempts), + startup_error, + ) + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': '[morse] stream alive but no PCM yet; continuing in waiting mode', + }) + if startup_ok: runtime_config['source'] = source runtime_config['command'] = full_cmd @@ -601,15 +672,12 @@ def start_morse() -> Response: runtime_config['direct_sampling'] = ( int(direct_sampling_mode) if source == 'iq' and direct_sampling_mode is not None - else (2 if use_direct_sampling else 0) + else (int(direct_sampling_mode) if direct_sampling_mode is not None else 0) ) runtime_config['iq_sample_rate'] = iq_sample_rate if source == 'iq' else None runtime_config['direct_sampling_mode'] = direct_sampling_mode if source == 'iq' else None break - if not startup_error: - startup_error = 'No PCM samples received within startup timeout' - attempt_errors.append( f'attempt {attempt_index}/{len(command_attempts)} ({attempt_desc}): {startup_error}' ) @@ -759,8 +827,7 @@ def stop_morse() -> Response: if proc is not None: _close_pipe(getattr(proc, 'stdout', None)) - _close_pipe(getattr(proc, 'stderr', None)) - _mark('stdout/stderr pipes closed') + _mark('stdout pipe closed') safe_terminate(proc, timeout=0.6) unregister_process(proc) @@ -768,6 +835,13 @@ def stop_morse() -> Response: decoder_joined = _join_thread(decoder_thread, timeout_s=0.45) stderr_joined = _join_thread(stderr_thread, timeout_s=0.45) + if proc is not None: + if not stderr_joined: + _close_pipe(getattr(proc, 'stderr', None)) + stderr_joined = _join_thread(stderr_thread, timeout_s=0.20) + _mark('stderr pipe force-closed') + _close_pipe(getattr(proc, 'stderr', None)) + _mark('stderr pipe closed') _mark(f'decoder thread joined={decoder_joined}') _mark(f'stderr thread joined={stderr_joined}') diff --git a/tests/test_morse.py b/tests/test_morse.py index 8d48b88..a53274f 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -393,7 +393,8 @@ class TestMorseLifecycleRoutes: assert '-D' in popen_cmds[0] assert '2' in popen_cmds[0] assert popen_cmds[1][0] == 'rtl_sdr' - assert '-D' not in popen_cmds[1] + assert '-D' in popen_cmds[1] + assert '1' in popen_cmds[1] stop_resp = client.post('/morse/stop') assert stop_resp.status_code == 200