diff --git a/routes/morse.py b/routes/morse.py index c1fa79b..83338c2 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -3,12 +3,7 @@ from __future__ import annotations import contextlib -import math -import os -import pty import queue -import re -import struct import subprocess import tempfile import threading @@ -20,10 +15,10 @@ from flask import Blueprint, Response, jsonify, request import app as app_module from utils.event_pipeline import process_event -from utils.dependencies import get_tool_path from utils.logging import sensor_logger as logger from utils.morse import ( decode_morse_wav_file, + morse_decoder_thread, ) from utils.process import register_process, safe_terminate, unregister_process from utils.sdr import SDRFactory, SDRType @@ -56,13 +51,9 @@ morse_session_id = 0 morse_decoder_worker: threading.Thread | None = None morse_stderr_worker: threading.Thread | None = None -morse_relay_worker: threading.Thread | None = None morse_stop_event: threading.Event | None = None morse_control_queue: queue.Queue | None = None -MORSE_LINE_RE = re.compile(r'^\s*(?:MORSE(?:_CW)?(?:\([^)]*\))?)\s*:\s*(.*)$', re.IGNORECASE) - - def _set_state(state: str, message: str = '', *, enqueue: bool = True, extra: dict[str, Any] | None = None) -> None: """Update lifecycle state and optionally emit a status queue event.""" global morse_state, morse_state_message, morse_state_since @@ -114,303 +105,6 @@ def _queue_morse_event(payload: dict[str, Any]) -> None: app_module.morse_queue.put_nowait(payload) -def _resolve_multimon_morse_modes(multimon_path: str) -> list[str]: - preferred = ['MORSE_CW', 'MORSE'] - discovered: list[str] = [] - try: - result = subprocess.run( - [multimon_path, '-h'], - capture_output=True, - text=True, - timeout=2, - check=False, - ) - blob = f'{result.stdout}\n{result.stderr}'.upper() - for mode in preferred: - if mode in blob and mode not in discovered: - discovered.append(mode) - except Exception: - pass - - if not discovered: - return preferred - return discovered - - -def _parse_multimon_morse_text(line: str) -> str | None: - cleaned = str(line or '').strip() - if not cleaned: - return None - - matched = MORSE_LINE_RE.match(cleaned) - if matched: - return matched.group(1).strip() - - lower = cleaned.lower() - if lower.startswith(('multimon-ng', 'available demodulators', 'enabled demodulators')): - return None - - if ':' in cleaned: - label, payload = cleaned.split(':', 1) - if 'morse' in label.upper(): - return payload.strip() - return None - - if len(cleaned) <= 128 and re.fullmatch(r"[A-Za-z0-9 /.,'!?+\-]+", cleaned): - return cleaned - - return None - - -def _emit_decoded_text(text: str) -> None: - filtered = ''.join(ch for ch in str(text or '') if ch == ' ' or 32 <= ord(ch) <= 126) - if not filtered: - return - - timestamp = time.strftime('%H:%M:%S') - for ch in filtered: - if ch.isspace(): - _queue_morse_event({ - 'type': 'morse_space', - 'timestamp': timestamp, - }) - else: - _queue_morse_event({ - 'type': 'morse_char', - 'char': ch, - 'morse': '', - 'timestamp': timestamp, - }) - - -def _compress_amplitudes(samples: tuple[int, ...], bins: int = 96) -> list[int]: - if not samples: - return [] - - step = max(1, len(samples) // bins) - out: list[int] = [] - for idx in range(0, len(samples), step): - if len(out) >= bins: - break - chunk = samples[idx:idx + step] - if not chunk: - continue - out.append(int(sum(abs(v) for v in chunk) / len(chunk))) - return out - - -def _morse_audio_relay_thread( - rtl_stdout: Any, - multimon_stdin: Any, - output_queue: queue.Queue, - stop_event: threading.Event, - control_queue: queue.Queue | None, - runtime_config: dict[str, Any], - pcm_ready_event: threading.Event, -) -> None: - chunk_bytes = 4096 - scope_interval = 0.1 - waiting_threshold = 0.7 - - tone_freq = _float_value(runtime_config.get('tone_freq'), 700.0) - wpm = _float_value(runtime_config.get('wpm'), 15.0) - threshold_mode = str(runtime_config.get('threshold_mode', 'auto')).strip().lower() - manual_threshold = _float_value(runtime_config.get('manual_threshold'), 0.0) - threshold_multiplier = _float_value(runtime_config.get('threshold_multiplier'), 2.8) - threshold_offset = _float_value(runtime_config.get('threshold_offset'), 0.0) - signal_gate = _float_value(runtime_config.get('min_signal_gate'), 0.0) - - last_scope_emit = 0.0 - last_pcm_at = 0.0 - noise_floor = 0.0 - threshold = manual_threshold if threshold_mode == 'manual' else 0.0 - pcm_announced = False - - fd: int | None = None - non_blocking = False - if rtl_stdout is not None: - with contextlib.suppress(Exception): - fd = rtl_stdout.fileno() - if fd is not None: - with contextlib.suppress(Exception): - os.set_blocking(fd, False) - non_blocking = True - - try: - while not stop_event.is_set(): - if control_queue is not None: - while True: - try: - control_msg = control_queue.get_nowait() - except queue.Empty: - break - - cmd = str(control_msg.get('cmd', '')).strip().lower() - if cmd == 'shutdown': - stop_event.set() - break - if cmd == 'reset': - noise_floor = 0.0 - threshold = manual_threshold if threshold_mode == 'manual' else 0.0 - _queue_morse_event({ - 'type': 'info', - 'text': '[morse] Calibration reset applied', - }) - if stop_event.is_set(): - break - - payload: bytes | None = None - if non_blocking and fd is not None: - try: - payload = os.read(fd, chunk_bytes) - except BlockingIOError: - payload = None - except OSError: - payload = b'' - else: - try: - payload = rtl_stdout.read(chunk_bytes) - except Exception: - payload = b'' - - now = time.monotonic() - - if payload is None: - if now - last_scope_emit >= scope_interval: - last_scope_emit = now - waiting = (last_pcm_at <= 0.0) or ((now - last_pcm_at) >= waiting_threshold) - with contextlib.suppress(queue.Full): - output_queue.put_nowait({ - 'type': 'scope', - 'waiting': waiting, - 'amplitudes': [], - 'tone_on': False, - 'level': 0.0, - 'threshold': round(threshold, 4), - 'noise_floor': round(noise_floor, 4), - 'tone_freq': tone_freq, - 'wpm': wpm, - }) - time.sleep(0.02) - continue - - if not payload: - break - - last_pcm_at = now - pcm_ready_event.set() - if not pcm_announced: - pcm_announced = True - _queue_morse_event({'type': 'info', 'text': '[morse] PCM stream active'}) - - try: - multimon_stdin.write(payload) - multimon_stdin.flush() - except (BrokenPipeError, OSError): - break - - sample_count = len(payload) // 2 - if sample_count <= 0: - continue - try: - samples = struct.unpack(f'<{sample_count}h', payload[:sample_count * 2]) - except struct.error: - continue - - amplitudes = _compress_amplitudes(samples) - rms = math.sqrt(sum(s * s for s in samples) / sample_count) / 32768.0 - level = max(0.0, min(1.0, rms)) - - if noise_floor <= 0.0: - noise_floor = level - elif level <= noise_floor: - noise_floor = (noise_floor * 0.9) + (level * 0.1) - else: - noise_floor = (noise_floor * 0.995) + (level * 0.005) - - if threshold_mode == 'manual': - threshold = manual_threshold - else: - threshold = max(0.0, (noise_floor * threshold_multiplier) + threshold_offset) - - tone_on = level >= max(signal_gate, threshold) - - if now - last_scope_emit >= scope_interval: - last_scope_emit = now - with contextlib.suppress(queue.Full): - output_queue.put_nowait({ - 'type': 'scope', - 'waiting': False, - 'amplitudes': amplitudes, - 'tone_on': tone_on, - 'level': round(level, 4), - 'threshold': round(threshold, 4), - 'noise_floor': round(noise_floor, 4), - 'tone_freq': tone_freq, - 'wpm': wpm, - }) - except Exception as exc: - logger.warning('Morse audio relay error: %s', exc) - _queue_morse_event({'type': 'error', 'text': f'morse relay error: {exc}'}) - finally: - _close_pipe(multimon_stdin) - - -def _morse_multimon_output_thread( - master_fd: int, - process: subprocess.Popen[bytes], - stop_event: threading.Event, -) -> None: - buffer = '' - with contextlib.suppress(Exception): - os.set_blocking(master_fd, False) - try: - while not stop_event.is_set(): - raw: bytes | None = None - try: - raw = os.read(master_fd, 2048) - except BlockingIOError: - raw = None - except OSError: - break - - if raw is None: - if process.poll() is not None: - break - time.sleep(0.02) - continue - - if not raw: - if process.poll() is not None: - break - time.sleep(0.02) - continue - - buffer += raw.decode('utf-8', errors='replace') - while '\n' in buffer: - line, buffer = buffer.split('\n', 1) - line = line.strip() - if not line: - continue - text = _parse_multimon_morse_text(line) - if text is None: - _queue_morse_event({'type': 'info', 'text': f'[multimon] {line}'}) - continue - if text: - _emit_decoded_text(text) - - tail = buffer.strip() - if tail: - tail_text = _parse_multimon_morse_text(tail) - if tail_text: - _emit_decoded_text(tail_text) - except Exception as exc: - _queue_morse_event({'type': 'error', 'text': f'multimon output error: {exc}'}) - finally: - with contextlib.suppress(OSError): - os.close(master_fd) - - def _bool_value(value: Any, default: bool = False) -> bool: if isinstance(value, bool): return value @@ -513,19 +207,14 @@ def _snapshot_live_resources() -> list[str]: alive.append('decoder_thread') if morse_stderr_worker and morse_stderr_worker.is_alive(): alive.append('stderr_thread') - if morse_relay_worker and morse_relay_worker.is_alive(): - alive.append('relay_thread') if app_module.morse_process and app_module.morse_process.poll() is None: - alive.append('multimon_process') - rtl_proc = getattr(app_module.morse_process, '_rtl_process', None) - if rtl_proc is not None and rtl_proc.poll() is None: - alive.append('rtl_process') + alive.append('rtl_process') return alive @morse_bp.route('/morse/start', methods=['POST']) def start_morse() -> Response: - global morse_active_device, morse_decoder_worker, morse_stderr_worker, morse_relay_worker + global morse_active_device, morse_decoder_worker, morse_stderr_worker global morse_stop_event, morse_control_queue, morse_runtime_config global morse_last_error, morse_session_id @@ -612,20 +301,6 @@ def start_morse() -> Response: name = str(meta.get('name') or f'SDR {device_index}') return f'device {device_index} ({name}, SN: {serial})' - multimon_path = get_tool_path('multimon-ng') - if not multimon_path: - msg = 'multimon-ng not found' - with app_module.morse_lock: - if morse_active_device is not None: - app_module.release_sdr_device(morse_active_device) - morse_active_device = None - morse_last_error = msg - _set_state(MORSE_ERROR, msg) - _set_state(MORSE_IDLE, 'Idle') - return jsonify({'status': 'error', 'message': msg}), 400 - - multimon_decoder_modes = _resolve_multimon_morse_modes(multimon_path) - def _build_rtl_cmd(device_index: int, direct_sampling_mode: int | None) -> list[str]: tuned_frequency_mhz = max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)) sdr_device = SDRFactory.create_default_device(sdr_type, index=device_index) @@ -671,35 +346,25 @@ def start_morse() -> Response: 'active_device': active_device_index, 'device_serial': str(device_catalog.get(active_device_index, {}).get('serial') or 'Unknown'), 'candidate_devices': list(candidate_device_indices), - 'multimon_decoder_modes': list(multimon_decoder_modes), } active_rtl_process: subprocess.Popen[bytes] | None = None - active_multimon_process: subprocess.Popen[bytes] | None = None active_stop_event: threading.Event | None = None active_control_queue: queue.Queue | None = None active_decoder_thread: threading.Thread | None = None active_stderr_thread: threading.Thread | None = None - active_relay_thread: threading.Thread | None = None - active_master_fd: int | None = None rtl_process: subprocess.Popen[bytes] | None = None - multimon_process: subprocess.Popen[bytes] | None = None stop_event: threading.Event | None = None control_queue: queue.Queue | None = None decoder_thread: threading.Thread | None = None stderr_thread: threading.Thread | None = None - relay_thread: threading.Thread | None = None - master_fd: int | None = None def _cleanup_attempt( rtl_proc: subprocess.Popen[bytes] | None, - multimon_proc: subprocess.Popen[bytes] | None, stop_evt: threading.Event | None, control_q: queue.Queue | None, decoder_worker: threading.Thread | None, stderr_worker: threading.Thread | None, - relay_worker: threading.Thread | None, - master_fd: int | None, ) -> None: if stop_evt is not None: stop_evt.set() @@ -707,24 +372,14 @@ def start_morse() -> Response: with contextlib.suppress(queue.Full): control_q.put_nowait({'cmd': 'shutdown'}) - if master_fd is not None: - with contextlib.suppress(OSError): - os.close(master_fd) - if rtl_proc is not None: _close_pipe(getattr(rtl_proc, 'stdout', None)) _close_pipe(getattr(rtl_proc, 'stderr', None)) - if multimon_proc is not None: - _close_pipe(getattr(multimon_proc, 'stdin', None)) if rtl_proc is not None: safe_terminate(rtl_proc, timeout=0.4) unregister_process(rtl_proc) - if multimon_proc is not None: - safe_terminate(multimon_proc, timeout=0.4) - unregister_process(multimon_proc) - _join_thread(relay_worker, timeout_s=0.35) _join_thread(decoder_worker, timeout_s=0.35) _join_thread(stderr_worker, timeout_s=0.35) @@ -733,282 +388,225 @@ def start_morse() -> Response: try: startup_succeeded = False - for decoder_pos, decoder_mode in enumerate(multimon_decoder_modes, start=1): - multimon_cmd = [multimon_path, '-t', 'raw', '-a', decoder_mode, '-f', 'alpha', '-'] - runtime_config['multimon_decoder'] = decoder_mode - if decoder_pos > 1: + for device_pos, candidate_device_index in enumerate(candidate_device_indices, start=1): + if candidate_device_index != active_device_index: + prev_device = active_device_index + claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse') + if claim_error: + msg = f'{_device_label(candidate_device_index)} unavailable: {claim_error}' + attempt_errors.append(msg) + logger.warning('Morse startup device fallback skipped: %s', msg) + _queue_morse_event({'type': 'info', 'text': f'[morse] {msg}'}) + continue + + if prev_device is not None: + app_module.release_sdr_device(prev_device) + active_device_index = candidate_device_index + with app_module.morse_lock: + morse_active_device = active_device_index + _queue_morse_event({ 'type': 'info', - 'text': f'[morse] retrying with multimon decoder {decoder_mode}', + 'text': ( + f'[morse] switching to {_device_label(active_device_index)} ' + f'({device_pos}/{len(candidate_device_indices)})' + ), }) - for device_pos, candidate_device_index in enumerate(candidate_device_indices, start=1): - if candidate_device_index != active_device_index: - prev_device = active_device_index - claim_error = app_module.claim_sdr_device(candidate_device_index, 'morse') - if claim_error: - msg = f'{_device_label(candidate_device_index)} unavailable: {claim_error}' - attempt_errors.append(msg) - logger.warning('Morse startup device fallback skipped: %s', msg) - _queue_morse_event({'type': 'info', 'text': f'[morse] {msg}'}) - continue + runtime_config['active_device'] = active_device_index + runtime_config['device_serial'] = str( + device_catalog.get(active_device_index, {}).get('serial') or 'Unknown' + ) + runtime_config.pop('startup_waiting', None) + runtime_config.pop('startup_warning', None) - if prev_device is not None: - app_module.release_sdr_device(prev_device) - active_device_index = candidate_device_index - with app_module.morse_lock: - morse_active_device = active_device_index + for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1): + rtl_process = None + stop_event = None + control_queue = None + decoder_thread = None + stderr_thread = None - _queue_morse_event({ - 'type': 'info', - 'text': ( - f'[morse] switching to {_device_label(active_device_index)} ' - f'({device_pos}/{len(candidate_device_indices)})' - ), - }) - - runtime_config['active_device'] = active_device_index - runtime_config['device_serial'] = str( - device_catalog.get(active_device_index, {}).get('serial') or 'Unknown' + rtl_cmd = _build_rtl_cmd(active_device_index, direct_sampling_mode) + direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none' + full_cmd = ' '.join(rtl_cmd) + logger.info( + 'Morse decoder attempt device=%s (%s/%s) rf=%.6f tuned=%.6f direct_mode=%s (%s/%s): %s', + active_device_index, + device_pos, + len(candidate_device_indices), + float(freq), + float(runtime_config.get('tuned_frequency_mhz', freq)), + direct_mode_label, + attempt_index, + len(direct_sampling_attempts), + full_cmd, ) - runtime_config.pop('startup_waiting', None) - runtime_config.pop('startup_warning', None) + _queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'}) - for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1): - rtl_process = None - multimon_process = None - stop_event = None - control_queue = None - decoder_thread = None - stderr_thread = None - relay_thread = None - master_fd = None + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + register_process(rtl_process) - rtl_cmd = _build_rtl_cmd(active_device_index, direct_sampling_mode) - direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none' - full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) - logger.info( - 'Morse decoder attempt decoder=%s device=%s (%s/%s) rf=%.6f tuned=%.6f direct_mode=%s (%s/%s): %s', - decoder_mode, - active_device_index, - device_pos, - len(candidate_device_indices), - float(freq), - float(runtime_config.get('tuned_frequency_mhz', freq)), - direct_mode_label, - attempt_index, - len(direct_sampling_attempts), - full_cmd, - ) - _queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'}) + stop_event = threading.Event() + control_queue = queue.Queue(maxsize=16) + pcm_ready_event = threading.Event() + stderr_lines: list[str] = [] - rtl_process = subprocess.Popen( - rtl_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=0, - ) - register_process(rtl_process) - - stop_event = threading.Event() - control_queue = queue.Queue(maxsize=16) - pcm_ready_event = threading.Event() - stderr_lines: list[str] = [] - - def monitor_stderr( - proc: subprocess.Popen[bytes] = rtl_process, - proc_stop_event: threading.Event = stop_event, - capture_lines: list[str] = stderr_lines, - ) -> None: - stderr_stream = proc.stderr - if stderr_stream is None: - return - try: - while not proc_stop_event.is_set(): - line = stderr_stream.readline() - if not line: - if proc.poll() is not None: - break - time.sleep(0.02) - continue - err_text = line.decode('utf-8', errors='replace').strip() - if not err_text: - continue - if len(capture_lines) >= 40: - del capture_lines[:10] - capture_lines.append(err_text) - _queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'}) - except (ValueError, OSError): - return - except Exception: - return - - stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') - stderr_thread.start() - - master_fd, slave_fd = pty.openpty() + def monitor_stderr( + proc: subprocess.Popen[bytes] = rtl_process, + proc_stop_event: threading.Event = stop_event, + capture_lines: list[str] = stderr_lines, + ) -> None: + stderr_stream = proc.stderr + if stderr_stream is None: + return try: - multimon_process = subprocess.Popen( - multimon_cmd, - stdin=subprocess.PIPE, - stdout=slave_fd, - stderr=slave_fd, - close_fds=True, - ) - finally: - with contextlib.suppress(OSError): - os.close(slave_fd) - register_process(multimon_process) + while not proc_stop_event.is_set(): + line = stderr_stream.readline() + if not line: + if proc.poll() is not None: + break + time.sleep(0.02) + continue + err_text = line.decode('utf-8', errors='replace').strip() + if not err_text: + continue + if len(capture_lines) >= 40: + del capture_lines[:10] + capture_lines.append(err_text) + _queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'}) + except (ValueError, OSError): + return + except Exception: + return - if rtl_process.stdout is None: - raise RuntimeError('rtl_fm stdout unavailable') - if multimon_process.stdin is None: - raise RuntimeError('multimon-ng stdin unavailable') + stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') + stderr_thread.start() - relay_thread = threading.Thread( - target=_morse_audio_relay_thread, - args=( - rtl_process.stdout, - multimon_process.stdin, - app_module.morse_queue, - stop_event, - control_queue, - runtime_config, - pcm_ready_event, - ), - daemon=True, - name='morse-relay', - ) - relay_thread.start() + if rtl_process.stdout is None: + raise RuntimeError('rtl_fm stdout unavailable') - decoder_thread = threading.Thread( - target=_morse_multimon_output_thread, - args=(master_fd, multimon_process, stop_event), - daemon=True, - name='morse-decoder', - ) - decoder_thread.start() + decoder_thread = threading.Thread( + target=morse_decoder_thread, + kwargs={ + 'rtl_stdout': rtl_process.stdout, + 'output_queue': app_module.morse_queue, + 'stop_event': stop_event, + 'sample_rate': sample_rate, + 'tone_freq': tone_freq, + 'wpm': wpm, + 'decoder_config': runtime_config, + 'control_queue': control_queue, + 'pcm_ready_event': pcm_ready_event, + }, + daemon=True, + name='morse-decoder', + ) + decoder_thread.start() - startup_deadline = time.monotonic() + 4.0 - startup_ok = False - startup_error = '' + startup_deadline = time.monotonic() + 4.0 + startup_ok = False + startup_error = '' - while time.monotonic() < startup_deadline: - if pcm_ready_event.is_set(): - startup_ok = True - break - if rtl_process.poll() is not None: - startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' - break - if multimon_process.poll() is not None: - startup_error = f'multimon-ng exited during startup (code {multimon_process.returncode})' - break - time.sleep(0.05) - - if not startup_ok: - if not startup_error: - startup_error = 'No PCM samples received within startup timeout' - if stderr_lines: - startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}' - is_last_decoder = decoder_pos == len(multimon_decoder_modes) - is_last_device = device_pos == len(candidate_device_indices) - is_last_attempt = attempt_index == len(direct_sampling_attempts) - if ( - is_last_decoder - and is_last_device - and is_last_attempt - and rtl_process.poll() is None - and multimon_process.poll() is None - ): - startup_ok = True - runtime_config['startup_waiting'] = True - runtime_config['startup_warning'] = startup_error - logger.warning( - 'Morse startup continuing without PCM on %s: %s', - _device_label(active_device_index), - startup_error, - ) - _queue_morse_event({ - 'type': 'info', - 'text': '[morse] waiting for PCM stream...', - }) - - if startup_ok: - runtime_config['direct_sampling_mode'] = direct_sampling_mode - runtime_config['direct_sampling'] = ( - int(direct_sampling_mode) if direct_sampling_mode is not None else 0 - ) - runtime_config['command'] = full_cmd - runtime_config['active_device'] = active_device_index - runtime_config['multimon_decoder'] = decoder_mode - - active_rtl_process = rtl_process - active_multimon_process = multimon_process - active_stop_event = stop_event - active_control_queue = control_queue - active_decoder_thread = decoder_thread - active_stderr_thread = stderr_thread - active_relay_thread = relay_thread - active_master_fd = master_fd - startup_succeeded = True + while time.monotonic() < startup_deadline: + if pcm_ready_event.is_set(): + startup_ok = True break + if rtl_process.poll() is not None: + startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' + break + time.sleep(0.05) - attempt_errors.append( - f'{_device_label(active_device_index)} decoder={decoder_mode} ' - f'attempt {attempt_index}/{len(direct_sampling_attempts)} ' - f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}' + if not startup_ok: + if not startup_error: + startup_error = 'No PCM samples received within startup timeout' + if stderr_lines: + startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}' + is_last_device = device_pos == len(candidate_device_indices) + is_last_attempt = attempt_index == len(direct_sampling_attempts) + if ( + is_last_device + and is_last_attempt + and rtl_process.poll() is None + ): + startup_ok = True + runtime_config['startup_waiting'] = True + runtime_config['startup_warning'] = startup_error + logger.warning( + 'Morse startup continuing without PCM on %s: %s', + _device_label(active_device_index), + startup_error, + ) + _queue_morse_event({ + 'type': 'info', + 'text': '[morse] waiting for PCM stream...', + }) + + if startup_ok: + runtime_config['direct_sampling_mode'] = direct_sampling_mode + runtime_config['direct_sampling'] = ( + int(direct_sampling_mode) if direct_sampling_mode is not None else 0 ) - logger.warning('Morse startup attempt failed: %s', attempt_errors[-1]) - _queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'}) + runtime_config['command'] = full_cmd + runtime_config['active_device'] = active_device_index - _cleanup_attempt( - rtl_process, - multimon_process, - stop_event, - control_queue, - decoder_thread, - stderr_thread, - relay_thread, - master_fd, - ) - rtl_process = None - multimon_process = None - stop_event = None - control_queue = None - decoder_thread = None - stderr_thread = None - relay_thread = None - master_fd = None - - if startup_succeeded: + active_rtl_process = rtl_process + active_stop_event = stop_event + active_control_queue = control_queue + active_decoder_thread = decoder_thread + active_stderr_thread = stderr_thread + startup_succeeded = True break - if device_pos < len(candidate_device_indices): - next_device = candidate_device_indices[device_pos] - _queue_morse_event({ - 'type': 'status', - 'state': MORSE_STARTING, - 'status': MORSE_STARTING, - 'message': ( - f'No PCM on {_device_label(active_device_index)}. ' - f'Trying {_device_label(next_device)}...' - ), - 'session_id': morse_session_id, - 'timestamp': time.strftime('%H:%M:%S'), - }) + attempt_errors.append( + f'{_device_label(active_device_index)} ' + f'attempt {attempt_index}/{len(direct_sampling_attempts)} ' + f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}' + ) + logger.warning('Morse startup attempt failed: %s', attempt_errors[-1]) + _queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'}) + + _cleanup_attempt( + rtl_process, + stop_event, + control_queue, + decoder_thread, + stderr_thread, + ) + rtl_process = None + stop_event = None + control_queue = None + decoder_thread = None + stderr_thread = None if startup_succeeded: break + if device_pos < len(candidate_device_indices): + next_device = candidate_device_indices[device_pos] + _queue_morse_event({ + 'type': 'status', + 'state': MORSE_STARTING, + 'status': MORSE_STARTING, + 'message': ( + f'No PCM on {_device_label(active_device_index)}. ' + f'Trying {_device_label(next_device)}...' + ), + 'session_id': morse_session_id, + 'timestamp': time.strftime('%H:%M:%S'), + }) + if ( active_rtl_process is None - or active_multimon_process is None or active_stop_event is None or active_control_queue is None or active_decoder_thread is None or active_stderr_thread is None - or active_relay_thread is None - or active_master_fd is None ): msg = ( f'SDR capture started but no PCM stream was received from ' @@ -1027,20 +625,16 @@ def start_morse() -> Response: return jsonify({'status': 'error', 'message': msg}), 500 with app_module.morse_lock: - app_module.morse_process = active_multimon_process - app_module.morse_process._rtl_process = active_rtl_process + app_module.morse_process = active_rtl_process app_module.morse_process._stop_decoder = active_stop_event app_module.morse_process._decoder_thread = active_decoder_thread app_module.morse_process._stderr_thread = active_stderr_thread - app_module.morse_process._relay_thread = active_relay_thread app_module.morse_process._control_queue = active_control_queue - app_module.morse_process._master_fd = active_master_fd morse_stop_event = active_stop_event morse_control_queue = active_control_queue morse_decoder_worker = active_decoder_thread morse_stderr_worker = active_stderr_thread - morse_relay_worker = active_relay_thread morse_runtime_config = dict(runtime_config) _set_state(MORSE_RUNNING, 'Listening') @@ -1057,13 +651,10 @@ def start_morse() -> Response: except FileNotFoundError as e: _cleanup_attempt( rtl_process if rtl_process is not None else active_rtl_process, - multimon_process if multimon_process is not None else active_multimon_process, stop_event if stop_event is not None else active_stop_event, control_queue if control_queue is not None else active_control_queue, decoder_thread if decoder_thread is not None else active_decoder_thread, stderr_thread if stderr_thread is not None else active_stderr_thread, - relay_thread if relay_thread is not None else active_relay_thread, - master_fd if master_fd is not None else active_master_fd, ) with app_module.morse_lock: if morse_active_device is not None: @@ -1077,13 +668,10 @@ def start_morse() -> Response: except Exception as e: _cleanup_attempt( rtl_process if rtl_process is not None else active_rtl_process, - multimon_process if multimon_process is not None else active_multimon_process, stop_event if stop_event is not None else active_stop_event, control_queue if control_queue is not None else active_control_queue, decoder_thread if decoder_thread is not None else active_decoder_thread, stderr_thread if stderr_thread is not None else active_stderr_thread, - relay_thread if relay_thread is not None else active_relay_thread, - master_fd if master_fd is not None else active_master_fd, ) with app_module.morse_lock: if morse_active_device is not None: @@ -1097,7 +685,7 @@ def start_morse() -> Response: @morse_bp.route('/morse/stop', methods=['POST']) def stop_morse() -> Response: - global morse_active_device, morse_decoder_worker, morse_stderr_worker, morse_relay_worker + global morse_active_device, morse_decoder_worker, morse_stderr_worker global morse_stop_event, morse_control_queue stop_started = time.perf_counter() @@ -1106,23 +694,18 @@ def stop_morse() -> Response: if morse_state == MORSE_STOPPING: return jsonify({'status': 'stopping', 'state': MORSE_STOPPING}), 202 - proc = app_module.morse_process - rtl_proc = getattr(proc, '_rtl_process', None) if proc else None - stop_event = morse_stop_event or getattr(proc, '_stop_decoder', None) - decoder_thread = morse_decoder_worker or getattr(proc, '_decoder_thread', None) - stderr_thread = morse_stderr_worker or getattr(proc, '_stderr_thread', None) - relay_thread = morse_relay_worker or getattr(proc, '_relay_thread', None) - control_queue = morse_control_queue or getattr(proc, '_control_queue', None) - master_fd = getattr(proc, '_master_fd', None) if proc else None + rtl_proc = app_module.morse_process + stop_event = morse_stop_event or getattr(rtl_proc, '_stop_decoder', None) + decoder_thread = morse_decoder_worker or getattr(rtl_proc, '_decoder_thread', None) + stderr_thread = morse_stderr_worker or getattr(rtl_proc, '_stderr_thread', None) + control_queue = morse_control_queue or getattr(rtl_proc, '_control_queue', None) active_device = morse_active_device if ( - not proc - and not rtl_proc + not rtl_proc and not stop_event and not decoder_thread and not stderr_thread - and not relay_thread ): _set_state(MORSE_IDLE, 'Idle', enqueue=False) return jsonify({'status': 'not_running', 'state': MORSE_IDLE}) @@ -1134,7 +717,6 @@ def stop_morse() -> Response: morse_control_queue = None morse_decoder_worker = None morse_stderr_worker = None - morse_relay_worker = None cleanup_steps: list[str] = [] @@ -1153,34 +735,18 @@ def stop_morse() -> Response: control_queue.put_nowait({'cmd': 'shutdown'}) _mark('control_queue shutdown signal sent') - if master_fd is not None: - with contextlib.suppress(OSError): - os.close(master_fd) - _mark('pty master fd closed') - if rtl_proc is not None: _close_pipe(getattr(rtl_proc, 'stdout', None)) _close_pipe(getattr(rtl_proc, 'stderr', None)) _mark('rtl_fm pipes closed') - if proc is not None: - _close_pipe(getattr(proc, 'stdin', None)) - _mark('multimon stdin closed') - if rtl_proc is not None: safe_terminate(rtl_proc, timeout=0.6) unregister_process(rtl_proc) _mark('rtl_fm process terminated') - if proc is not None: - safe_terminate(proc, timeout=0.6) - unregister_process(proc) - _mark('multimon process terminated') - - relay_joined = _join_thread(relay_thread, timeout_s=0.45) decoder_joined = _join_thread(decoder_thread, timeout_s=0.45) stderr_joined = _join_thread(stderr_thread, timeout_s=0.45) - _mark(f'relay thread joined={relay_joined}') _mark(f'decoder thread joined={decoder_joined}') _mark(f'stderr thread joined={stderr_joined}') @@ -1190,16 +756,12 @@ def stop_morse() -> Response: stop_ms = round((time.perf_counter() - stop_started) * 1000.0, 1) alive_after = [] - if not relay_joined: - alive_after.append('relay_thread') if not decoder_joined: alive_after.append('decoder_thread') if not stderr_joined: alive_after.append('stderr_thread') if rtl_proc is not None and rtl_proc.poll() is None: alive_after.append('rtl_process') - if proc is not None and proc.poll() is None: - alive_after.append('multimon_process') with app_module.morse_lock: morse_active_device = None diff --git a/tests/test_morse.py b/tests/test_morse.py index 20fcd46..571a7a7 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -270,7 +270,6 @@ class TestMorseLifecycleRoutes: monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) monkeypatch.setattr(morse_routes.SDRFactory, 'detect_devices', staticmethod(lambda: [])) - monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) @@ -293,27 +292,7 @@ class TestMorseLifecycleRoutes: def kill(self): self.returncode = -9 - class FakeMultimonProc: - def __init__(self): - self.stdin = io.BytesIO() - self.returncode = None - - def poll(self): - return self.returncode - - def terminate(self): - self.returncode = 0 - - def wait(self, timeout=None): - self.returncode = 0 - return 0 - - def kill(self): - self.returncode = -9 - def fake_popen(cmd, *args, **kwargs): - if 'multimon' in str(cmd[0]): - return FakeMultimonProc() return FakeRtlProc(pcm) monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen) @@ -375,7 +354,6 @@ class TestMorseLifecycleRoutes: monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) monkeypatch.setattr(morse_routes.SDRFactory, 'detect_devices', staticmethod(lambda: [])) - monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) rtl_cmds = [] @@ -399,27 +377,7 @@ class TestMorseLifecycleRoutes: def kill(self): self.returncode = -9 - class FakeMultimonProc: - def __init__(self): - self.stdin = io.BytesIO() - self.returncode = None - - def poll(self): - return self.returncode - - def terminate(self): - self.returncode = 0 - - def wait(self, timeout=None): - self.returncode = 0 - return 0 - - def kill(self): - self.returncode = -9 - def fake_popen(cmd, *args, **kwargs): - if 'multimon' in str(cmd[0]): - return FakeMultimonProc() rtl_cmds.append(cmd) if len(rtl_cmds) == 1: return FakeRtlProc(b'', 1) @@ -465,7 +423,6 @@ class TestMorseLifecycleRoutes: monkeypatch.setattr(app_module, 'claim_sdr_device', lambda idx, mode: None) monkeypatch.setattr(app_module, 'release_sdr_device', lambda idx: released_devices.append(idx)) - monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') class DummyDevice: def __init__(self, index: int): @@ -520,27 +477,7 @@ class TestMorseLifecycleRoutes: def kill(self): self.returncode = -9 - class FakeMultimonProc: - def __init__(self): - self.stdin = io.BytesIO() - self.returncode = None - - def poll(self): - return self.returncode - - def terminate(self): - self.returncode = 0 - - def wait(self, timeout=None): - self.returncode = 0 - return 0 - - def kill(self): - self.returncode = -9 - def fake_popen(cmd, *args, **kwargs): - if 'multimon' in str(cmd[0]): - return FakeMultimonProc() try: dev = int(cmd[cmd.index('-d') + 1]) except Exception: