Fix Morse stderr thread race and broaden startup fallbacks

This commit is contained in:
Smittix
2026-02-26 15:37:17 +00:00
parent 6a54bc8cf3
commit 64f0e687a0
2 changed files with 106 additions and 31 deletions

View File

@@ -284,7 +284,7 @@ def start_morse() -> Response:
def _build_rtl_cmd(
*,
use_direct_sampling: bool,
direct_sampling_mode: int | None,
force_squelch_off: bool,
add_resample_rate: bool,
add_dc_fast: bool,
@@ -300,8 +300,8 @@ def start_morse() -> Response:
}
# Only rtl_fm supports direct sampling flags.
if use_direct_sampling:
fm_kwargs['direct_sampling'] = 2
if direct_sampling_mode in (1, 2):
fm_kwargs['direct_sampling'] = int(direct_sampling_mode)
cmd = builder.build_fm_demod_command(**fm_kwargs)
@@ -370,20 +370,31 @@ def start_morse() -> Response:
'source': 'iq',
'direct_sampling_mode': 2,
},
{
'source': 'iq',
'direct_sampling_mode': 1,
},
{
'source': 'iq',
'direct_sampling_mode': None,
},
{
'source': 'rtl_fm',
'use_direct_sampling': True,
'direct_sampling_mode': 2,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
},
{
'source': 'rtl_fm',
'use_direct_sampling': False,
'direct_sampling_mode': 1,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
},
{
'source': 'rtl_fm',
'direct_sampling_mode': None,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
@@ -397,7 +408,7 @@ def start_morse() -> Response:
},
{
'source': 'rtl_fm',
'use_direct_sampling': False,
'direct_sampling_mode': None,
'force_squelch_off': False,
'add_resample_rate': True,
'add_dc_fast': True,
@@ -440,8 +451,9 @@ def start_morse() -> Response:
with contextlib.suppress(queue.Full):
attempt_control_queue.put_nowait({'cmd': 'shutdown'})
if proc is not None:
# Close stdout to unblock decoder reads. Keep stderr open until
# after stderr monitor thread exits to avoid ValueError races.
_close_pipe(getattr(proc, 'stdout', None))
_close_pipe(getattr(proc, 'stderr', None))
# Keep startup retries responsive; avoid long waits inside
# generic safe_terminate() during a failed attempt.
if proc.poll() is None:
@@ -456,18 +468,33 @@ def start_morse() -> Response:
proc.wait(timeout=0.25)
unregister_process(proc)
_join_thread(attempt_decoder_thread, timeout_s=0.20)
_join_thread(attempt_stderr_thread, timeout_s=0.20)
stderr_joined = _join_thread(attempt_stderr_thread, timeout_s=0.35)
if proc is not None:
if not stderr_joined:
# Force-close the pipe if stderr reader is still blocked.
_close_pipe(getattr(proc, 'stderr', None))
_join_thread(attempt_stderr_thread, timeout_s=0.15)
_close_pipe(getattr(proc, 'stderr', None))
attempt_errors: list[str] = []
full_cmd = ''
for attempt_index, attempt in enumerate(command_attempts, start=1):
runtime_config.pop('startup_waiting', None)
runtime_config.pop('startup_warning', None)
source = str(attempt.get('source', 'rtl_fm')).strip().lower()
use_direct_sampling = bool(attempt.get('use_direct_sampling', False))
force_squelch_off = bool(attempt.get('force_squelch_off', True))
add_resample_rate = bool(attempt.get('add_resample_rate', False))
add_dc_fast = bool(attempt.get('add_dc_fast', False))
direct_sampling_mode = attempt.get('direct_sampling_mode')
direct_sampling_mode_raw = attempt.get('direct_sampling_mode')
try:
direct_sampling_mode = (
int(direct_sampling_mode_raw)
if direct_sampling_mode_raw is not None
else None
)
except (TypeError, ValueError):
direct_sampling_mode = None
if source == 'iq':
rtl_cmd, tuned_freq_mhz = _build_iq_cmd(
@@ -481,7 +508,7 @@ def start_morse() -> Response:
)
else:
rtl_cmd = _build_rtl_cmd(
use_direct_sampling=use_direct_sampling,
direct_sampling_mode=direct_sampling_mode,
force_squelch_off=force_squelch_off,
add_resample_rate=add_resample_rate,
add_dc_fast=add_dc_fast,
@@ -489,7 +516,7 @@ def start_morse() -> Response:
tuned_freq_mhz = float(freq)
thread_target = morse_decoder_thread
attempt_desc = (
f'source=rtl_fm direct={int(use_direct_sampling)} '
f'source=rtl_fm direct_mode={direct_sampling_mode if direct_sampling_mode is not None else "none"} '
f'squelch_forced={int(force_squelch_off)} '
f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)}'
)
@@ -517,24 +544,40 @@ def start_morse() -> Response:
stop_event = threading.Event()
control_queue = queue.Queue(maxsize=16)
pcm_ready_event = threading.Event()
attempt_stderr_lines: list[str] = []
def monitor_stderr(
proc: subprocess.Popen = rtl_process,
proc_stop_event: threading.Event = stop_event,
tool_label: str = rtl_cmd[0],
stderr_lines: list[str] = attempt_stderr_lines,
) -> None:
if proc.stderr is None:
try:
stderr_stream = proc.stderr
if stderr_stream is None:
return
while not proc_stop_event.is_set():
line = stderr_stream.readline()
if not line:
if proc.poll() is not None:
break
time.sleep(0.02)
continue
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
if len(stderr_lines) >= 40:
del stderr_lines[:10]
stderr_lines.append(err_text)
with contextlib.suppress(queue.Full):
app_module.morse_queue.put_nowait({
'type': 'info',
'text': f'[{tool_label}] {err_text}',
})
except ValueError:
# Pipe was closed during shutdown; expected during retries.
return
except Exception:
return
for line in proc.stderr:
if proc_stop_event.is_set():
break
err_text = line.decode('utf-8', errors='replace').strip()
if err_text:
with contextlib.suppress(queue.Full):
app_module.morse_queue.put_nowait({
'type': 'info',
'text': f'[{tool_label}] {err_text}',
})
stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr')
stderr_thread.start()
@@ -581,7 +624,7 @@ def start_morse() -> Response:
)
decoder_thread.start()
startup_deadline = time.monotonic() + (2.5 if source == 'iq' else 1.2)
startup_deadline = time.monotonic() + (4.0 if source == 'iq' else 2.0)
startup_ok = False
startup_error = ''
@@ -594,6 +637,34 @@ def start_morse() -> Response:
break
time.sleep(0.05)
if not startup_ok:
if not startup_error:
startup_error = 'No PCM samples received within startup timeout'
if attempt_stderr_lines:
startup_error = f'{startup_error}; stderr: {attempt_stderr_lines[-1]}'
is_last_attempt = attempt_index == len(command_attempts)
if (
is_last_attempt
and rtl_process.poll() is None
and decoder_thread.is_alive()
):
# Avoid hard-failing startup when SDR is alive but muted.
startup_ok = True
runtime_config['startup_waiting'] = True
runtime_config['startup_warning'] = startup_error
logger.warning(
'Morse startup continuing without PCM (attempt %s/%s): %s',
attempt_index,
len(command_attempts),
startup_error,
)
with contextlib.suppress(queue.Full):
app_module.morse_queue.put_nowait({
'type': 'info',
'text': '[morse] stream alive but no PCM yet; continuing in waiting mode',
})
if startup_ok:
runtime_config['source'] = source
runtime_config['command'] = full_cmd
@@ -601,15 +672,12 @@ def start_morse() -> Response:
runtime_config['direct_sampling'] = (
int(direct_sampling_mode)
if source == 'iq' and direct_sampling_mode is not None
else (2 if use_direct_sampling else 0)
else (int(direct_sampling_mode) if direct_sampling_mode is not None else 0)
)
runtime_config['iq_sample_rate'] = iq_sample_rate if source == 'iq' else None
runtime_config['direct_sampling_mode'] = direct_sampling_mode if source == 'iq' else None
break
if not startup_error:
startup_error = 'No PCM samples received within startup timeout'
attempt_errors.append(
f'attempt {attempt_index}/{len(command_attempts)} ({attempt_desc}): {startup_error}'
)
@@ -759,8 +827,7 @@ def stop_morse() -> Response:
if proc is not None:
_close_pipe(getattr(proc, 'stdout', None))
_close_pipe(getattr(proc, 'stderr', None))
_mark('stdout/stderr pipes closed')
_mark('stdout pipe closed')
safe_terminate(proc, timeout=0.6)
unregister_process(proc)
@@ -768,6 +835,13 @@ def stop_morse() -> Response:
decoder_joined = _join_thread(decoder_thread, timeout_s=0.45)
stderr_joined = _join_thread(stderr_thread, timeout_s=0.45)
if proc is not None:
if not stderr_joined:
_close_pipe(getattr(proc, 'stderr', None))
stderr_joined = _join_thread(stderr_thread, timeout_s=0.20)
_mark('stderr pipe force-closed')
_close_pipe(getattr(proc, 'stderr', None))
_mark('stderr pipe closed')
_mark(f'decoder thread joined={decoder_joined}')
_mark(f'stderr thread joined={stderr_joined}')

View File

@@ -393,7 +393,8 @@ class TestMorseLifecycleRoutes:
assert '-D' in popen_cmds[0]
assert '2' in popen_cmds[0]
assert popen_cmds[1][0] == 'rtl_sdr'
assert '-D' not in popen_cmds[1]
assert '-D' in popen_cmds[1]
assert '1' in popen_cmds[1]
stop_resp = client.post('/morse/stop')
assert stop_resp.status_code == 200