From 794dd693cf75ad1c4d9e79fb8fca05fba31f7e54 Mon Sep 17 00:00:00 2001 From: Smittix Date: Thu, 26 Feb 2026 17:38:22 +0000 Subject: [PATCH] morse: auto-fallback to alternate SDR device on no-PCM startup --- README.md | 1 + routes/morse.py | 452 ++++++++++++++++++++++++++------------------ tests/test_morse.py | 123 ++++++++++++ 3 files changed, 394 insertions(+), 182 deletions(-) diff --git a/README.md b/README.md index a127e17..be8a5a4 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Auto Tone Track behavior: Troubleshooting (no decode / noisy decode): - Confirm demod path is **USB/CW-compatible** and frequency is tuned correctly. +- If multiple SDRs are connected and the selected one has no PCM output, Morse startup now auto-tries other detected SDR devices and reports the active device/serial in status logs. - Match **tone** and **bandwidth** to the actual sidetone/pitch. - Try **Threshold Auto** first; if needed, switch to manual threshold and recalibrate. - Use **Reset/Calibrate** after major frequency or band condition changes. diff --git a/routes/morse.py b/routes/morse.py index fd22cf5..0277d3f 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -565,8 +565,29 @@ def start_morse() -> Response: except ValueError: sdr_type = SDRType.RTL_SDR - sdr_device = SDRFactory.create_default_device(sdr_type, index=device) - builder = SDRFactory.get_builder(sdr_device.sdr_type) + requested_device_index = int(device) + active_device_index = requested_device_index + builder = SDRFactory.get_builder(sdr_type) + + device_catalog: dict[int, dict[str, str]] = {} + candidate_device_indices: list[int] = [requested_device_index] + with contextlib.suppress(Exception): + detected_devices = SDRFactory.detect_devices() + same_type_devices = [d for d in detected_devices if d.sdr_type == sdr_type] + for d in same_type_devices: + device_catalog[d.index] = { + 'name': str(d.name or f'SDR {d.index}'), + 'serial': str(d.serial or 'Unknown'), + } + for d in sorted(same_type_devices, key=lambda dev: dev.index): + if d.index not in candidate_device_indices: + candidate_device_indices.append(d.index) + + def _device_label(device_index: int) -> str: + meta = device_catalog.get(device_index, {}) + serial = str(meta.get('serial') or 'Unknown') + name = str(meta.get('name') or f'SDR {device_index}') + return f'device {device_index} ({name}, SN: {serial})' multimon_path = get_tool_path('multimon-ng') if not multimon_path: @@ -582,7 +603,8 @@ def start_morse() -> Response: multimon_cmd = [multimon_path, '-t', 'raw', '-a', 'MORSE_CW', '-f', 'alpha', '-'] - def _build_rtl_cmd(direct_sampling_mode: int | None) -> list[str]: + def _build_rtl_cmd(device_index: int, direct_sampling_mode: int | None) -> list[str]: + sdr_device = SDRFactory.create_default_device(sdr_type, index=device_index) fm_kwargs: dict[str, Any] = { 'device': sdr_device, 'frequency_mhz': freq, @@ -601,7 +623,7 @@ def start_morse() -> Response: cmd.append('-') return cmd - can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and float(freq) < 24.0) + can_try_direct_sampling = bool(sdr_type == SDRType.RTL_SDR and float(freq) < 24.0) direct_sampling_attempts: list[int | None] = [2, 1, None] if can_try_direct_sampling else [None] runtime_config: dict[str, Any] = { @@ -619,6 +641,10 @@ def start_morse() -> Response: 'wpm_lock': wpm_lock, 'min_signal_gate': min_signal_gate, 'source': 'rtl_fm', + 'requested_device': requested_device_index, + 'active_device': active_device_index, + 'device_serial': str(device_catalog.get(active_device_index, {}).get('serial') or 'Unknown'), + 'candidate_devices': list(candidate_device_indices), } active_rtl_process: subprocess.Popen[bytes] | None = None @@ -679,196 +705,255 @@ def start_morse() -> Response: attempt_errors: list[str] = [] try: - for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1): - rtl_process = None - multimon_process = None - stop_event = None - control_queue = None - decoder_thread = None - stderr_thread = None - relay_thread = None - master_fd = None + startup_succeeded = False + for device_pos, candidate_device_index in enumerate(candidate_device_indices, start=1): + if candidate_device_index != active_device_index: + prev_device = active_device_index + claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse') + if claim_error: + msg = f'{_device_label(candidate_device_index)} unavailable: {claim_error}' + attempt_errors.append(msg) + logger.warning('Morse startup device fallback skipped: %s', msg) + _queue_morse_event({'type': 'info', 'text': f'[morse] {msg}'}) + continue + + if prev_device is not None: + app_module.release_sdr_device(prev_device) + active_device_index = candidate_device_index + with app_module.morse_lock: + morse_active_device = active_device_index + + _queue_morse_event({ + 'type': 'info', + 'text': ( + f'[morse] switching to {_device_label(active_device_index)} ' + f'({device_pos}/{len(candidate_device_indices)})' + ), + }) + + runtime_config['active_device'] = active_device_index + runtime_config['device_serial'] = str( + device_catalog.get(active_device_index, {}).get('serial') or 'Unknown' + ) runtime_config.pop('startup_waiting', None) runtime_config.pop('startup_warning', None) - rtl_cmd = _build_rtl_cmd(direct_sampling_mode) - direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none' - full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) - logger.info( - 'Morse decoder attempt %s/%s (source=rtl_fm direct_mode=%s): %s', - attempt_index, - len(direct_sampling_attempts), - direct_mode_label, - full_cmd, - ) - _queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'}) + for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1): + rtl_process = None + multimon_process = None + stop_event = None + control_queue = None + decoder_thread = None + stderr_thread = None + relay_thread = None + master_fd = None - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=0, - ) - register_process(rtl_process) - - stop_event = threading.Event() - control_queue = queue.Queue(maxsize=16) - pcm_ready_event = threading.Event() - stderr_lines: list[str] = [] - - def monitor_stderr( - proc: subprocess.Popen[bytes] = rtl_process, - proc_stop_event: threading.Event = stop_event, - capture_lines: list[str] = stderr_lines, - ) -> None: - stderr_stream = proc.stderr - if stderr_stream is None: - return - try: - while not proc_stop_event.is_set(): - line = stderr_stream.readline() - if not line: - if proc.poll() is not None: - break - time.sleep(0.02) - continue - err_text = line.decode('utf-8', errors='replace').strip() - if not err_text: - continue - if len(capture_lines) >= 40: - del capture_lines[:10] - capture_lines.append(err_text) - _queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'}) - except (ValueError, OSError): - return - except Exception: - return - - stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') - stderr_thread.start() - - master_fd, slave_fd = pty.openpty() - try: - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=subprocess.PIPE, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True, + rtl_cmd = _build_rtl_cmd(active_device_index, direct_sampling_mode) + direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none' + full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) + logger.info( + 'Morse decoder attempt device=%s (%s/%s) direct_mode=%s (%s/%s): %s', + active_device_index, + device_pos, + len(candidate_device_indices), + direct_mode_label, + attempt_index, + len(direct_sampling_attempts), + full_cmd, ) - finally: - with contextlib.suppress(OSError): - os.close(slave_fd) - register_process(multimon_process) + _queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'}) - if rtl_process.stdout is None: - raise RuntimeError('rtl_fm stdout unavailable') - if multimon_process.stdin is None: - raise RuntimeError('multimon-ng stdin unavailable') + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + register_process(rtl_process) - relay_thread = threading.Thread( - target=_morse_audio_relay_thread, - args=( - rtl_process.stdout, - multimon_process.stdin, - app_module.morse_queue, + stop_event = threading.Event() + control_queue = queue.Queue(maxsize=16) + pcm_ready_event = threading.Event() + stderr_lines: list[str] = [] + + def monitor_stderr( + proc: subprocess.Popen[bytes] = rtl_process, + proc_stop_event: threading.Event = stop_event, + capture_lines: list[str] = stderr_lines, + ) -> None: + stderr_stream = proc.stderr + if stderr_stream is None: + return + try: + while not proc_stop_event.is_set(): + line = stderr_stream.readline() + if not line: + if proc.poll() is not None: + break + time.sleep(0.02) + continue + err_text = line.decode('utf-8', errors='replace').strip() + if not err_text: + continue + if len(capture_lines) >= 40: + del capture_lines[:10] + capture_lines.append(err_text) + _queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'}) + except (ValueError, OSError): + return + except Exception: + return + + stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') + stderr_thread.start() + + master_fd, slave_fd = pty.openpty() + try: + multimon_process = subprocess.Popen( + multimon_cmd, + stdin=subprocess.PIPE, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True, + ) + finally: + with contextlib.suppress(OSError): + os.close(slave_fd) + register_process(multimon_process) + + if rtl_process.stdout is None: + raise RuntimeError('rtl_fm stdout unavailable') + if multimon_process.stdin is None: + raise RuntimeError('multimon-ng stdin unavailable') + + relay_thread = threading.Thread( + target=_morse_audio_relay_thread, + args=( + rtl_process.stdout, + multimon_process.stdin, + app_module.morse_queue, + stop_event, + control_queue, + runtime_config, + pcm_ready_event, + ), + daemon=True, + name='morse-relay', + ) + relay_thread.start() + + decoder_thread = threading.Thread( + target=_morse_multimon_output_thread, + args=(master_fd, multimon_process, stop_event), + daemon=True, + name='morse-decoder', + ) + decoder_thread.start() + + startup_deadline = time.monotonic() + 4.0 + startup_ok = False + startup_error = '' + + while time.monotonic() < startup_deadline: + if pcm_ready_event.is_set(): + startup_ok = True + break + if rtl_process.poll() is not None: + startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' + break + if multimon_process.poll() is not None: + startup_error = f'multimon-ng exited during startup (code {multimon_process.returncode})' + break + time.sleep(0.05) + + if not startup_ok: + if not startup_error: + startup_error = 'No PCM samples received within startup timeout' + if stderr_lines: + startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}' + is_last_device = device_pos == len(candidate_device_indices) + is_last_attempt = attempt_index == len(direct_sampling_attempts) + if ( + is_last_device + and is_last_attempt + and rtl_process.poll() is None + and multimon_process.poll() is None + ): + startup_ok = True + runtime_config['startup_waiting'] = True + runtime_config['startup_warning'] = startup_error + logger.warning( + 'Morse startup continuing without PCM on %s: %s', + _device_label(active_device_index), + startup_error, + ) + _queue_morse_event({ + 'type': 'info', + 'text': '[morse] waiting for PCM stream...', + }) + + if startup_ok: + runtime_config['direct_sampling_mode'] = direct_sampling_mode + runtime_config['direct_sampling'] = ( + int(direct_sampling_mode) if direct_sampling_mode is not None else 0 + ) + runtime_config['command'] = full_cmd + runtime_config['active_device'] = active_device_index + + active_rtl_process = rtl_process + active_multimon_process = multimon_process + active_stop_event = stop_event + active_control_queue = control_queue + active_decoder_thread = decoder_thread + active_stderr_thread = stderr_thread + active_relay_thread = relay_thread + active_master_fd = master_fd + startup_succeeded = True + break + + attempt_errors.append( + f'{_device_label(active_device_index)} ' + f'attempt {attempt_index}/{len(direct_sampling_attempts)} ' + f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}' + ) + logger.warning('Morse startup attempt failed: %s', attempt_errors[-1]) + _queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'}) + + _cleanup_attempt( + rtl_process, + multimon_process, stop_event, control_queue, - runtime_config, - pcm_ready_event, - ), - daemon=True, - name='morse-relay', - ) - relay_thread.start() - - decoder_thread = threading.Thread( - target=_morse_multimon_output_thread, - args=(master_fd, multimon_process, stop_event), - daemon=True, - name='morse-decoder', - ) - decoder_thread.start() - - startup_deadline = time.monotonic() + 4.0 - startup_ok = False - startup_error = '' - - while time.monotonic() < startup_deadline: - if pcm_ready_event.is_set(): - startup_ok = True - break - if rtl_process.poll() is not None: - startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' - break - if multimon_process.poll() is not None: - startup_error = f'multimon-ng exited during startup (code {multimon_process.returncode})' - break - time.sleep(0.05) - - if not startup_ok: - if not startup_error: - startup_error = 'No PCM samples received within startup timeout' - if stderr_lines: - startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}' - is_last_attempt = attempt_index == len(direct_sampling_attempts) - if is_last_attempt and rtl_process.poll() is None and multimon_process.poll() is None: - startup_ok = True - runtime_config['startup_waiting'] = True - runtime_config['startup_warning'] = startup_error - logger.warning( - 'Morse startup continuing without PCM (attempt %s/%s): %s', - attempt_index, - len(direct_sampling_attempts), - startup_error, - ) - _queue_morse_event({ - 'type': 'info', - 'text': '[morse] waiting for PCM stream...', - }) - - if startup_ok: - runtime_config['direct_sampling_mode'] = direct_sampling_mode - runtime_config['direct_sampling'] = ( - int(direct_sampling_mode) if direct_sampling_mode is not None else 0 + decoder_thread, + stderr_thread, + relay_thread, + master_fd, ) - runtime_config['command'] = full_cmd + rtl_process = None + multimon_process = None + stop_event = None + control_queue = None + decoder_thread = None + stderr_thread = None + relay_thread = None + master_fd = None - active_rtl_process = rtl_process - active_multimon_process = multimon_process - active_stop_event = stop_event - active_control_queue = control_queue - active_decoder_thread = decoder_thread - active_stderr_thread = stderr_thread - active_relay_thread = relay_thread - active_master_fd = master_fd + if startup_succeeded: break - attempt_errors.append( - f'attempt {attempt_index}/{len(direct_sampling_attempts)} ' - f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}' - ) - logger.warning('Morse startup attempt failed: %s', attempt_errors[-1]) - _queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'}) - - _cleanup_attempt( - rtl_process, - multimon_process, - stop_event, - control_queue, - decoder_thread, - stderr_thread, - relay_thread, - master_fd, - ) - rtl_process = None - multimon_process = None - stop_event = None - control_queue = None - decoder_thread = None - stderr_thread = None - relay_thread = None - master_fd = None + if device_pos < len(candidate_device_indices): + next_device = candidate_device_indices[device_pos] + _queue_morse_event({ + 'type': 'status', + 'state': MORSE_STARTING, + 'status': MORSE_STARTING, + 'message': ( + f'No PCM on {_device_label(active_device_index)}. ' + f'Trying {_device_label(next_device)}...' + ), + 'session_id': morse_session_id, + 'timestamp': time.strftime('%H:%M:%S'), + }) if ( active_rtl_process is None @@ -880,7 +965,10 @@ def start_morse() -> Response: or active_relay_thread is None or active_master_fd is None ): - msg = 'SDR capture started but no PCM stream was received.' + msg = ( + f'SDR capture started but no PCM stream was received from ' + f'{_device_label(active_device_index)}.' + ) if attempt_errors: msg += ' ' + ' | '.join(attempt_errors) logger.error('Morse startup failed: %s', msg) diff --git a/tests/test_morse.py b/tests/test_morse.py index 43fabdf..20fcd46 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -269,6 +269,7 @@ class TestMorseLifecycleRoutes: monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) + monkeypatch.setattr(morse_routes.SDRFactory, 'detect_devices', staticmethod(lambda: [])) monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) @@ -373,6 +374,7 @@ class TestMorseLifecycleRoutes: monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) + monkeypatch.setattr(morse_routes.SDRFactory, 'detect_devices', staticmethod(lambda: [])) monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) @@ -455,6 +457,127 @@ class TestMorseLifecycleRoutes: assert stop_resp.get_json()['status'] == 'stopped' assert 0 in released_devices + def test_start_falls_back_to_next_device_when_selected_device_has_no_pcm(self, client, monkeypatch): + _login_session(client) + self._reset_route_state() + + released_devices = [] + + monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None) + monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx)) + monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') + + class DummyDevice: + def __init__(self, index: int): + self.sdr_type = morse_routes.SDRType.RTL_SDR + self.index = index + + class DummyDetected: + def __init__(self, index: int, serial: str): + self.sdr_type = morse_routes.SDRType.RTL_SDR + self.index = index + self.name = f'RTL {index}' + self.serial = serial + + class DummyBuilder: + def build_fm_demod_command(self, **kwargs): + cmd = ['rtl_fm', '-d', str(kwargs['device'].index), '-f', '14.060M', '-M', 'usb', '-s', '22050'] + if kwargs.get('direct_sampling') is not None: + cmd.extend(['--direct', str(kwargs['direct_sampling'])]) + cmd.append('-') + return cmd + + monkeypatch.setattr( + morse_routes.SDRFactory, + 'create_default_device', + staticmethod(lambda sdr_type, index: DummyDevice(int(index))), + ) + monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) + monkeypatch.setattr( + morse_routes.SDRFactory, + 'detect_devices', + staticmethod(lambda: [DummyDetected(0, 'AAA00000'), DummyDetected(1, 'BBB11111')]), + ) + + pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) + + class FakeRtlProc: + def __init__(self, stdout_bytes: bytes, returncode: int | None): + self.stdout = io.BytesIO(stdout_bytes) + self.stderr = io.BytesIO(b'') + self.returncode = returncode + + def poll(self): + return self.returncode + + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + self.returncode = 0 + return 0 + + def kill(self): + self.returncode = -9 + + class FakeMultimonProc: + def __init__(self): + self.stdin = io.BytesIO() + self.returncode = None + + def poll(self): + return self.returncode + + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + self.returncode = 0 + return 0 + + def kill(self): + self.returncode = -9 + + def fake_popen(cmd, *args, **kwargs): + if 'multimon' in str(cmd[0]): + return FakeMultimonProc() + try: + dev = int(cmd[cmd.index('-d') + 1]) + except Exception: + dev = 0 + if dev == 0: + return FakeRtlProc(b'', 1) + return FakeRtlProc(pcm, None) + + monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen) + monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None) + monkeypatch.setattr(morse_routes, 'unregister_process', lambda _proc: None) + monkeypatch.setattr( + morse_routes, + 'safe_terminate', + lambda proc, timeout=0.0: setattr(proc, 'returncode', 0), + ) + + start_resp = client.post('/morse/start', json={ + 'frequency': '14.060', + 'gain': '20', + 'ppm': '0', + 'device': '0', + 'tone_freq': '700', + 'wpm': '15', + }) + assert start_resp.status_code == 200 + start_data = start_resp.get_json() + assert start_data['status'] == 'started' + assert start_data['config']['active_device'] == 1 + assert start_data['config']['device_serial'] == 'BBB11111' + assert 0 in released_devices + + stop_resp = client.post('/morse/stop') + assert stop_resp.status_code == 200 + assert stop_resp.get_json()['status'] == 'stopped' + assert 1 in released_devices + # --------------------------------------------------------------------------- # Integration: synthetic CW -> WAV decode