diff --git a/routes/morse.py b/routes/morse.py index 2fe823e..4108944 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -278,39 +278,52 @@ def start_morse() -> Response: sdr_device = SDRFactory.create_default_device(sdr_type, index=device) builder = SDRFactory.get_builder(sdr_device.sdr_type) - fm_kwargs: dict[str, Any] = { - 'device': sdr_device, - 'frequency_mhz': freq, - 'sample_rate': sample_rate, - 'gain': float(gain) if gain and gain != '0' else None, - 'ppm': int(ppm) if ppm and ppm != '0' else None, - 'modulation': 'usb', - 'bias_t': bias_t, - } + def _build_rtl_cmd(*, use_direct_sampling: bool, force_squelch_off: bool) -> list[str]: + fm_kwargs: dict[str, Any] = { + 'device': sdr_device, + 'frequency_mhz': freq, + 'sample_rate': sample_rate, + 'gain': float(gain) if gain and gain != '0' else None, + 'ppm': int(ppm) if ppm and ppm != '0' else None, + 'modulation': 'usb', + 'bias_t': bias_t, + } - # Only rtl_fm supports direct sampling flags. - if sdr_device.sdr_type == SDRType.RTL_SDR and freq < 24.0: - fm_kwargs['direct_sampling'] = 2 + # Only rtl_fm supports direct sampling flags. + if use_direct_sampling: + fm_kwargs['direct_sampling'] = 2 - rtl_cmd = builder.build_fm_demod_command( - **fm_kwargs, - ) + cmd = builder.build_fm_demod_command(**fm_kwargs) - # Some rtl_fm builds behave as if squelch is enabled unless -l is explicit. - # Force continuous audio for CW analysis. - if sdr_device.sdr_type == SDRType.RTL_SDR and '-l' not in rtl_cmd: - if rtl_cmd and rtl_cmd[-1] == '-': - rtl_cmd[-1:-1] = ['-l', '0'] - else: - rtl_cmd.extend(['-l', '0']) + # Some rtl_fm builds behave as if squelch is enabled unless -l is explicit. + # Force continuous audio for CW analysis. + if force_squelch_off and sdr_device.sdr_type == SDRType.RTL_SDR and '-l' not in cmd: + if cmd and cmd[-1] == '-': + cmd[-1:-1] = ['-l', '0'] + else: + cmd.extend(['-l', '0']) + return cmd - full_cmd = ' '.join(rtl_cmd) - logger.info(f'Morse decoder running: {full_cmd}') + can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and freq < 24.0) + if can_try_direct_sampling: + # Prefer direct2 with explicit squelch-off, then retry with safer variants + # if the command starts but never emits PCM. + command_attempts = [ + {'use_direct_sampling': True, 'force_squelch_off': True}, + {'use_direct_sampling': True, 'force_squelch_off': False}, + {'use_direct_sampling': False, 'force_squelch_off': True}, + ] + else: + command_attempts = [ + {'use_direct_sampling': False, 'force_squelch_off': True}, + {'use_direct_sampling': False, 'force_squelch_off': False}, + ] rtl_process: subprocess.Popen | None = None stop_event: threading.Event | None = None decoder_thread: threading.Thread | None = None stderr_thread: threading.Thread | None = None + control_queue: queue.Queue | None = None runtime_config: dict[str, Any] = { 'sample_rate': sample_rate, @@ -329,70 +342,150 @@ def start_morse() -> Response: } try: - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=0, - ) - register_process(rtl_process) + def _cleanup_attempt( + proc: subprocess.Popen | None, + attempt_stop_event: threading.Event | None, + attempt_control_queue: queue.Queue | None, + attempt_decoder_thread: threading.Thread | None, + attempt_stderr_thread: threading.Thread | None, + ) -> None: + if attempt_stop_event is not None: + attempt_stop_event.set() + if attempt_control_queue is not None: + with contextlib.suppress(queue.Full): + attempt_control_queue.put_nowait({'cmd': 'shutdown'}) + if proc is not None: + _close_pipe(getattr(proc, 'stdout', None)) + _close_pipe(getattr(proc, 'stderr', None)) + safe_terminate(proc, timeout=0.5) + unregister_process(proc) + _join_thread(attempt_decoder_thread, timeout_s=0.35) + _join_thread(attempt_stderr_thread, timeout_s=0.35) - stop_event = threading.Event() - control_queue: queue.Queue = queue.Queue(maxsize=16) + attempt_errors: list[str] = [] + full_cmd = '' - def monitor_stderr() -> None: - if not rtl_process or rtl_process.stderr is None: - return - for line in rtl_process.stderr: - if stop_event.is_set(): + for attempt_index, attempt in enumerate(command_attempts, start=1): + use_direct_sampling = bool(attempt.get('use_direct_sampling', False)) + force_squelch_off = bool(attempt.get('force_squelch_off', True)) + + rtl_cmd = _build_rtl_cmd( + use_direct_sampling=use_direct_sampling, + force_squelch_off=force_squelch_off, + ) + full_cmd = ' '.join(rtl_cmd) + logger.info(f'Morse decoder attempt {attempt_index}/{len(command_attempts)}: {full_cmd}') + + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': f'[cmd] {full_cmd}', + }) + + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + register_process(rtl_process) + + stop_event = threading.Event() + control_queue = queue.Queue(maxsize=16) + pcm_ready_event = threading.Event() + + def monitor_stderr( + proc: subprocess.Popen = rtl_process, + proc_stop_event: threading.Event = stop_event, + ) -> None: + if proc.stderr is None: + return + for line in proc.stderr: + if proc_stop_event.is_set(): + break + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + logger.debug(f'[rtl_fm/morse] {err_text}') + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': f'[rtl_fm] {err_text}', + }) + + stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') + stderr_thread.start() + + decoder_thread = threading.Thread( + target=morse_decoder_thread, + args=( + rtl_process.stdout, + app_module.morse_queue, + stop_event, + sample_rate, + tone_freq, + wpm, + ), + kwargs={ + 'decoder_config': runtime_config, + 'control_queue': control_queue, + 'pcm_ready_event': pcm_ready_event, + }, + daemon=True, + name='morse-decoder', + ) + decoder_thread.start() + + startup_deadline = time.monotonic() + 2.5 + startup_ok = False + startup_error = '' + + while time.monotonic() < startup_deadline: + if pcm_ready_event.is_set(): + startup_ok = True break - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f'[rtl_fm/morse] {err_text}') - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[rtl_fm] {err_text}', - }) + if rtl_process.poll() is not None: + startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' + break + time.sleep(0.05) - stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') - stderr_thread.start() + if startup_ok: + runtime_config['direct_sampling'] = 2 if use_direct_sampling else 0 + runtime_config['force_squelch_off'] = force_squelch_off + break - decoder_thread = threading.Thread( - target=morse_decoder_thread, - args=( - rtl_process.stdout, - app_module.morse_queue, + if not startup_error: + startup_error = 'No PCM samples received within startup timeout' + + attempt_errors.append( + f'attempt {attempt_index}/{len(command_attempts)} ' + f'(direct={int(use_direct_sampling)} squelch_forced={int(force_squelch_off)}): {startup_error}' + ) + logger.warning(f'Morse startup attempt failed: {attempt_errors[-1]}') + + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': f'[morse] startup attempt failed: {startup_error}', + }) + + _cleanup_attempt( + rtl_process, stop_event, - sample_rate, - tone_freq, - wpm, - ), - kwargs={ - 'decoder_config': runtime_config, - 'control_queue': control_queue, - }, - daemon=True, - name='morse-decoder', - ) - decoder_thread.start() + control_queue, + decoder_thread, + stderr_thread, + ) + rtl_process = None + stop_event = None + control_queue = None + decoder_thread = None + stderr_thread = None - # Detect immediate startup failure (e.g. device busy, no device) - time.sleep(0.30) - if rtl_process.poll() is not None: - stop_event.set() - stderr_text = '' - try: - if rtl_process.stderr: - stderr_text = rtl_process.stderr.read().decode('utf-8', errors='replace').strip() - except Exception: - stderr_text = '' - msg = stderr_text or f'rtl_fm exited immediately (code {rtl_process.returncode})' + if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None or stderr_thread is None: + msg = 'rtl_fm started but no PCM stream was received.' + if attempt_errors: + msg = msg + ' ' + ' | '.join(attempt_errors[-2:]) logger.error(f'Morse rtl_fm startup failed: {msg}') - safe_terminate(rtl_process, timeout=0.4) - unregister_process(rtl_process) - _join_thread(decoder_thread, timeout_s=0.25) - _join_thread(stderr_thread, timeout_s=0.25) with app_module.morse_lock: if morse_active_device is not None: app_module.release_sdr_device(morse_active_device) diff --git a/tests/test_morse.py b/tests/test_morse.py index 1bd7596..3a9dac2 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -317,6 +317,77 @@ class TestMorseLifecycleRoutes: assert final_status['state'] == 'idle' assert 0 in released_devices + def test_start_retries_after_early_process_exit(self, client, monkeypatch): + _login_session(client) + self._reset_route_state() + + released_devices = [] + + monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None) + monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx)) + + class DummyDevice: + sdr_type = morse_routes.SDRType.RTL_SDR + + class DummyBuilder: + def build_fm_demod_command(self, **kwargs): + cmd = ['rtl_fm', '-f', '14.060M', '-M', 'usb', '-s', '22050'] + if kwargs.get('direct_sampling') == 2: + cmd.extend(['-E', 'direct2']) + cmd.append('-') + return cmd + + monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) + monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) + + pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) + popen_cmds = [] + + class FakeProc: + def __init__(self, stdout_bytes: bytes, returncode: int | None): + self.stdout = io.BytesIO(stdout_bytes) + self.stderr = io.BytesIO(b'') + self.returncode = returncode + + def poll(self): + return self.returncode + + def fake_popen(cmd, *args, **kwargs): + popen_cmds.append(cmd) + if len(popen_cmds) == 1: + return FakeProc(b'', 1) + return FakeProc(pcm, None) + + monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen) + monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None) + monkeypatch.setattr(morse_routes, 'unregister_process', lambda _proc: None) + monkeypatch.setattr( + morse_routes, + 'safe_terminate', + lambda proc, timeout=0.0: setattr(proc, 'returncode', 0), + ) + + start_resp = client.post('/morse/start', json={ + 'frequency': '14.060', + 'gain': '20', + 'ppm': '0', + 'device': '0', + 'tone_freq': '700', + 'wpm': '15', + }) + assert start_resp.status_code == 200 + assert start_resp.get_json()['status'] == 'started' + assert len(popen_cmds) >= 2 + assert '-E' in popen_cmds[0] and 'direct2' in popen_cmds[0] + assert '-l' in popen_cmds[0] + assert '-E' in popen_cmds[1] and 'direct2' in popen_cmds[1] + assert '-l' not in popen_cmds[1] + + stop_resp = client.post('/morse/stop') + assert stop_resp.status_code == 200 + assert stop_resp.get_json()['status'] == 'stopped' + assert 0 in released_devices + # --------------------------------------------------------------------------- # Integration: synthetic CW -> WAV decode diff --git a/utils/morse.py b/utils/morse.py index 497187a..c7dd8d5 100644 --- a/utils/morse.py +++ b/utils/morse.py @@ -734,6 +734,7 @@ def morse_decoder_thread( wpm: int = 15, decoder_config: dict[str, Any] | None = None, control_queue: queue.Queue | None = None, + pcm_ready_event: threading.Event | None = None, ) -> None: """Decode Morse from live PCM stream and push events to *output_queue*.""" import logging @@ -856,6 +857,8 @@ def morse_decoder_thread( if not first_pcm_logged: first_pcm_logged = True + if pcm_ready_event is not None: + pcm_ready_event.set() with contextlib.suppress(queue.Full): output_queue.put_nowait({ 'type': 'info',