diff --git a/README.md b/README.md index a1a0738..a127e17 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,9 @@ Support the developer of this open-source project ## CW / Morse Decoder Notes +Live backend: +- Uses `rtl_fm` piped into `multimon-ng` (`MORSE_CW`) for real-time decode. + Recommended baseline settings: - **Tone**: `700 Hz` - **Bandwidth**: `200 Hz` (use `100 Hz` for crowded bands, `400 Hz` for drifting signals) diff --git a/routes/morse.py b/routes/morse.py index ae793f8..2a76e2c 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -3,8 +3,13 @@ from __future__ import annotations import contextlib +import math import os +import pty import queue +import re +import select +import struct import subprocess import tempfile import threading @@ -16,11 +21,10 @@ from flask import Blueprint, Response, jsonify, request import app as app_module from utils.event_pipeline import process_event +from utils.dependencies import get_tool_path from utils.logging import sensor_logger as logger from utils.morse import ( decode_morse_wav_file, - morse_decoder_thread, - morse_iq_decoder_thread, ) from utils.process import register_process, safe_terminate, unregister_process from utils.sdr import SDRFactory, SDRType @@ -53,9 +57,12 @@ morse_session_id = 0 morse_decoder_worker: threading.Thread | None = None morse_stderr_worker: threading.Thread | None = None +morse_relay_worker: threading.Thread | None = None morse_stop_event: threading.Event | None = None morse_control_queue: queue.Queue | None = None +MORSE_LINE_RE = re.compile(r'^\s*(?:MORSE(?:_CW)?(?:\([^)]*\))?)\s*:\s*(.*)$', re.IGNORECASE) + def _set_state(state: str, message: str = '', *, enqueue: bool = True, extra: dict[str, Any] | None = None) -> None: """Update lifecycle state and optionally emit a status queue event.""" @@ -112,42 +119,287 @@ def _stdout_target_path() -> str: return '-' -def _is_real_tool_binary(cmd: list[str]) -> bool: - """Heuristic to avoid FIFO plumbing in unit tests with fake binaries.""" - if not cmd: - return False - exe = str(cmd[0] or '') - return exe.startswith('/') and Path(exe).exists() +def _queue_morse_event(payload: dict[str, Any]) -> None: + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait(payload) -def _prepare_fifo_output(cmd: list[str], *, token: str) -> tuple[list[str], Any | None, str | None]: - """Optionally route rtl_* output through a named pipe for robust reading.""" - if os.name != 'posix' or not _is_real_tool_binary(cmd): - return cmd, None, None - if not cmd: - return cmd, None, None +def _parse_multimon_morse_text(line: str) -> str | None: + cleaned = str(line or '').strip() + if not cleaned: + return None + + matched = MORSE_LINE_RE.match(cleaned) + if matched: + return matched.group(1).strip() + + lower = cleaned.lower() + if lower.startswith(('multimon-ng', 'available demodulators', 'enabled demodulators')): + return None + + if ':' in cleaned: + label, payload = cleaned.split(':', 1) + if 'morse' in label.upper(): + return payload.strip() + return None + + if len(cleaned) <= 128 and re.fullmatch(r"[A-Za-z0-9 /.,'!?+\-]+", cleaned): + return cleaned + + return None + + +def _emit_decoded_text(text: str) -> None: + filtered = ''.join(ch for ch in str(text or '') if ch == ' ' or 32 <= ord(ch) <= 126) + if not filtered: + return + + timestamp = time.strftime('%H:%M:%S') + for ch in filtered: + if ch.isspace(): + _queue_morse_event({ + 'type': 'morse_space', + 'timestamp': timestamp, + }) + else: + _queue_morse_event({ + 'type': 'morse_char', + 'char': ch, + 'morse': '', + 'timestamp': timestamp, + }) + + +def _compress_amplitudes(samples: tuple[int, ...], bins: int = 96) -> list[int]: + if not samples: + return [] + + step = max(1, len(samples) // bins) + out: list[int] = [] + for idx in range(0, len(samples), step): + if len(out) >= bins: + break + chunk = samples[idx:idx + step] + if not chunk: + continue + out.append(int(sum(abs(v) for v in chunk) / len(chunk))) + return out + + +def _read_pcm_chunk( + stream: Any, + chunk_bytes: int, + stop_event: threading.Event, + timeout_s: float = 0.2, +) -> bytes | None: + if stream is None: + return b'' + try: - fifo_dir = Path(tempfile.gettempdir()) - fifo_path = fifo_dir / f'morse_{token}_{int(time.time() * 1000)}.fifo' - with contextlib.suppress(FileNotFoundError): - fifo_path.unlink() - os.mkfifo(fifo_path, 0o600) - - fifo_cmd = list(cmd) - if fifo_cmd: - fifo_cmd[-1] = str(fifo_path) - - reader_fd = os.open(str(fifo_path), os.O_RDONLY | os.O_NONBLOCK) - reader = os.fdopen(reader_fd, 'rb', buffering=0) - return fifo_cmd, reader, str(fifo_path) + fileno = stream.fileno() except Exception: + if stop_event.is_set(): + return b'' with contextlib.suppress(Exception): - if 'reader' in locals() and reader is not None: - reader.close() - with contextlib.suppress(Exception): - if 'fifo_path' in locals(): - Path(fifo_path).unlink(missing_ok=True) - return cmd, None, None + return stream.read(chunk_bytes) + return b'' + + while not stop_event.is_set(): + try: + ready, _, _ = select.select([fileno], [], [], timeout_s) + except Exception: + return b'' + + if not ready: + return None + + try: + return os.read(fileno, chunk_bytes) + except BlockingIOError: + continue + except OSError: + return b'' + + return b'' + + +def _morse_audio_relay_thread( + rtl_stdout: Any, + multimon_stdin: Any, + output_queue: queue.Queue, + stop_event: threading.Event, + control_queue: queue.Queue | None, + runtime_config: dict[str, Any], + pcm_ready_event: threading.Event, +) -> None: + chunk_bytes = 4096 + scope_interval = 0.1 + waiting_threshold = 0.7 + + tone_freq = _float_value(runtime_config.get('tone_freq'), 700.0) + wpm = _float_value(runtime_config.get('wpm'), 15.0) + threshold_mode = str(runtime_config.get('threshold_mode', 'auto')).strip().lower() + manual_threshold = _float_value(runtime_config.get('manual_threshold'), 0.0) + threshold_multiplier = _float_value(runtime_config.get('threshold_multiplier'), 2.8) + threshold_offset = _float_value(runtime_config.get('threshold_offset'), 0.0) + signal_gate = _float_value(runtime_config.get('min_signal_gate'), 0.0) + + last_scope_emit = 0.0 + last_pcm_at = 0.0 + noise_floor = 0.0 + threshold = manual_threshold if threshold_mode == 'manual' else 0.0 + + try: + while not stop_event.is_set(): + if control_queue is not None: + while True: + try: + control_msg = control_queue.get_nowait() + except queue.Empty: + break + + cmd = str(control_msg.get('cmd', '')).strip().lower() + if cmd == 'shutdown': + stop_event.set() + break + if cmd == 'reset': + noise_floor = 0.0 + threshold = manual_threshold if threshold_mode == 'manual' else 0.0 + _queue_morse_event({ + 'type': 'info', + 'text': '[morse] Calibration reset applied', + }) + if stop_event.is_set(): + break + + payload = _read_pcm_chunk(rtl_stdout, chunk_bytes, stop_event) + now = time.monotonic() + + if payload is None: + if now - last_scope_emit >= scope_interval: + last_scope_emit = now + waiting = (last_pcm_at <= 0.0) or ((now - last_pcm_at) >= waiting_threshold) + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'scope', + 'waiting': waiting, + 'amplitudes': [], + 'tone_on': False, + 'level': 0.0, + 'threshold': round(threshold, 4), + 'noise_floor': round(noise_floor, 4), + 'tone_freq': tone_freq, + 'wpm': wpm, + }) + continue + + if not payload: + break + + last_pcm_at = now + pcm_ready_event.set() + + try: + multimon_stdin.write(payload) + multimon_stdin.flush() + except (BrokenPipeError, OSError): + break + + sample_count = len(payload) // 2 + if sample_count <= 0: + continue + try: + samples = struct.unpack(f'<{sample_count}h', payload[:sample_count * 2]) + except struct.error: + continue + + amplitudes = _compress_amplitudes(samples) + rms = math.sqrt(sum(s * s for s in samples) / sample_count) / 32768.0 + level = max(0.0, min(1.0, rms)) + + if noise_floor <= 0.0: + noise_floor = level + elif level <= noise_floor: + noise_floor = (noise_floor * 0.9) + (level * 0.1) + else: + noise_floor = (noise_floor * 0.995) + (level * 0.005) + + if threshold_mode == 'manual': + threshold = manual_threshold + else: + threshold = max(0.0, (noise_floor * threshold_multiplier) + threshold_offset) + + tone_on = level >= max(signal_gate, threshold) + + if now - last_scope_emit >= scope_interval: + last_scope_emit = now + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'scope', + 'waiting': False, + 'amplitudes': amplitudes, + 'tone_on': tone_on, + 'level': round(level, 4), + 'threshold': round(threshold, 4), + 'noise_floor': round(noise_floor, 4), + 'tone_freq': tone_freq, + 'wpm': wpm, + }) + except Exception as exc: + logger.debug('Morse audio relay error: %s', exc) + finally: + _close_pipe(multimon_stdin) + + +def _morse_multimon_output_thread( + master_fd: int, + process: subprocess.Popen[bytes], + stop_event: threading.Event, +) -> None: + buffer = '' + try: + while not stop_event.is_set(): + try: + ready, _, _ = select.select([master_fd], [], [], 0.2) + except Exception: + break + + if ready: + try: + raw = os.read(master_fd, 2048) + except OSError: + break + if not raw: + if process.poll() is not None: + break + continue + + buffer += raw.decode('utf-8', errors='replace') + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = line.strip() + if not line: + continue + text = _parse_multimon_morse_text(line) + if text is None: + _queue_morse_event({'type': 'info', 'text': f'[multimon] {line}'}) + continue + if text: + _emit_decoded_text(text) + + if process.poll() is not None: + break + + tail = buffer.strip() + if tail: + tail_text = _parse_multimon_morse_text(tail) + if tail_text: + _emit_decoded_text(tail_text) + except Exception as exc: + _queue_morse_event({'type': 'error', 'text': f'multimon output error: {exc}'}) + finally: + with contextlib.suppress(OSError): + os.close(master_fd) def _bool_value(value: Any, default: bool = False) -> bool: @@ -252,20 +504,24 @@ def _snapshot_live_resources() -> list[str]: alive.append('decoder_thread') if morse_stderr_worker and morse_stderr_worker.is_alive(): alive.append('stderr_thread') + if morse_relay_worker and morse_relay_worker.is_alive(): + alive.append('relay_thread') if app_module.morse_process and app_module.morse_process.poll() is None: - alive.append('rtl_process') + alive.append('multimon_process') + rtl_proc = getattr(app_module.morse_process, '_rtl_process', None) + if rtl_proc is not None and rtl_proc.poll() is None: + alive.append('rtl_process') return alive @morse_bp.route('/morse/start', methods=['POST']) def start_morse() -> Response: - global morse_active_device, morse_decoder_worker, morse_stderr_worker + global morse_active_device, morse_decoder_worker, morse_stderr_worker, morse_relay_worker global morse_stop_event, morse_control_queue, morse_runtime_config global morse_last_error, morse_session_id data = request.json or {} - # Validate standard SDR inputs try: freq = validate_frequency(data.get('frequency', '14.060'), min_mhz=0.5, max_mhz=30.0) gain = validate_gain(data.get('gain', '0')) @@ -274,7 +530,6 @@ def start_morse() -> Response: except ValueError as e: return jsonify({'status': 'error', 'message': str(e)}), 400 - # Validate Morse-specific inputs try: tone_freq = _validate_tone_freq(data.get('tone_freq', '700')) wpm = _validate_wpm(data.get('wpm', '15')) @@ -299,7 +554,6 @@ def start_morse() -> Response: 'state': morse_state, }), 409 - # Claim SDR device device_int = int(device) error = app_module.claim_sdr_device(device_int, 'morse') if error: @@ -314,10 +568,8 @@ def start_morse() -> Response: morse_session_id += 1 _drain_queue(app_module.morse_queue) - _set_state(MORSE_STARTING, 'Starting decoder...') - # Use pager-proven audio rate for rtl_fm compatibility across builds. sample_rate = 22050 bias_t = _bool_value(data.get('bias_t', False), False) @@ -330,13 +582,21 @@ def start_morse() -> Response: sdr_device = SDRFactory.create_default_device(sdr_type, index=device) builder = SDRFactory.get_builder(sdr_device.sdr_type) - def _build_rtl_cmd( - *, - direct_sampling_mode: int | None, - force_squelch_off: bool, - add_resample_rate: bool, - add_dc_fast: bool, - ) -> list[str]: + multimon_path = get_tool_path('multimon-ng') + if not multimon_path: + msg = 'multimon-ng not found' + with app_module.morse_lock: + if morse_active_device is not None: + app_module.release_sdr_device(morse_active_device) + morse_active_device = None + morse_last_error = msg + _set_state(MORSE_ERROR, msg) + _set_state(MORSE_IDLE, 'Idle') + return jsonify({'status': 'error', 'message': msg}), 400 + + multimon_cmd = [multimon_path, '-t', 'raw', '-a', 'MORSE_CW', '-f', 'alpha', '-'] + + def _build_rtl_cmd(direct_sampling_mode: int | None) -> list[str]: fm_kwargs: dict[str, Any] = { 'device': sdr_device, 'frequency_mhz': freq, @@ -346,138 +606,38 @@ def start_morse() -> Response: 'modulation': 'usb', 'bias_t': bias_t, } - - # Only rtl_fm supports direct sampling flags. if direct_sampling_mode in (1, 2): fm_kwargs['direct_sampling'] = int(direct_sampling_mode) - cmd = builder.build_fm_demod_command(**fm_kwargs) - - # Some rtl_fm builds behave as if squelch is enabled unless -l is explicit. - # Force continuous audio for CW analysis. - if force_squelch_off and sdr_device.sdr_type == SDRType.RTL_SDR and '-l' not in cmd: - if cmd and cmd[-1] == '-': - cmd[-1:-1] = ['-l', '0'] - else: - cmd.extend(['-l', '0']) + cmd = list(builder.build_fm_demod_command(**fm_kwargs)) + insert_at = len(cmd) - 1 if cmd else 0 + if insert_at < 0: + insert_at = 0 if sdr_device.sdr_type == SDRType.RTL_SDR: - insert_at = len(cmd) - 1 if cmd and cmd[-1] == '-' else len(cmd) - if add_resample_rate and '-r' not in cmd: + if '-l' not in cmd: + cmd[insert_at:insert_at] = ['-l', '0'] + insert_at += 2 + if '-r' not in cmd: cmd[insert_at:insert_at] = ['-r', str(sample_rate)] insert_at += 2 - if add_dc_fast: - # Used in other stable modes to improve rtl_fm stream behavior. - if '-A' not in cmd: - cmd[insert_at:insert_at] = ['-A', 'fast'] - insert_at += 2 - if '-E' not in cmd or 'dc' not in cmd: - cmd[insert_at:insert_at] = ['-E', 'dc'] - # Some rtl_fm builds treat "-" as a literal filename. Use an - # explicit fd-backed stdout path for deterministic piping. - out_target = _stdout_target_path() - if cmd and cmd[-1] == '-': + if '-A' not in cmd: + cmd[insert_at:insert_at] = ['-A', 'fast'] + insert_at += 2 + if '-E' not in cmd: + cmd[insert_at:insert_at] = ['-E', 'dc'] + + out_target = _stdout_target_path() + if cmd: + if cmd[-1] == '-': cmd[-1] = out_target - elif out_target not in cmd: + elif cmd[-1] not in {out_target, '/dev/stdout', '/proc/self/fd/1', '/dev/fd/1'}: cmd.append(out_target) + return cmd - # Use a hardware-friendly IQ rate (matches common RTL-SDR stable rates - # and waterfall defaults) before decimating to audio. - iq_sample_rate = 1024000 - - def _build_iq_cmd(*, direct_sampling_mode: int | None) -> tuple[list[str], float]: - # CW USB-style offset tuning: keep the configured RF frequency sounding - # near the selected tone frequency in the software demod chain. - tune_mhz = max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)) - iq_cmd = builder.build_iq_capture_command( - device=sdr_device, - frequency_mhz=tune_mhz, - sample_rate=iq_sample_rate, - gain=float(gain) if gain and gain != '0' else None, - ppm=int(ppm) if ppm and ppm != '0' else None, - bias_t=bias_t, - ) - if ( - sdr_device.sdr_type == SDRType.RTL_SDR - and direct_sampling_mode is not None - and '-D' not in iq_cmd - ): - if iq_cmd and iq_cmd[-1] == '-': - iq_cmd[-1:-1] = ['-D', str(direct_sampling_mode)] - else: - iq_cmd.extend(['-D', str(direct_sampling_mode)]) - # Some rtl_sdr builds treat "-" as a literal filename instead of stdout. - # Use an explicit fd-backed stdout path for deterministic piping. - out_target = _stdout_target_path() - if iq_cmd: - if iq_cmd[-1] == '-': - iq_cmd[-1] = out_target - elif out_target not in iq_cmd: - iq_cmd.append(out_target) - return iq_cmd, tune_mhz - - can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and freq < 24.0) - if can_try_direct_sampling: - # IQ-first strategy: avoid repeated rtl_fm/rtl_sdr handoffs that can - # leave the tuner in a bad state on some Linux builds. - command_attempts: list[dict[str, Any]] = [ - { - 'source': 'iq', - 'direct_sampling_mode': 2, - }, - { - 'source': 'iq', - 'direct_sampling_mode': 1, - }, - { - 'source': 'iq', - 'direct_sampling_mode': None, - }, - { - 'source': 'rtl_fm', - 'direct_sampling_mode': 2, - 'force_squelch_off': False, - 'add_resample_rate': True, - 'add_dc_fast': True, - }, - { - 'source': 'rtl_fm', - 'direct_sampling_mode': 1, - 'force_squelch_off': False, - 'add_resample_rate': True, - 'add_dc_fast': True, - }, - { - 'source': 'rtl_fm', - 'direct_sampling_mode': None, - 'force_squelch_off': False, - 'add_resample_rate': True, - 'add_dc_fast': True, - }, - ] - else: - command_attempts = [ - { - 'source': 'iq', - 'direct_sampling_mode': None, - }, - { - 'source': 'rtl_fm', - 'direct_sampling_mode': None, - 'force_squelch_off': False, - 'add_resample_rate': True, - 'add_dc_fast': True, - }, - ] - - rtl_process: subprocess.Popen | None = None - stop_event: threading.Event | None = None - decoder_thread: threading.Thread | None = None - stderr_thread: threading.Thread | None = None - control_queue: queue.Queue | None = None - decoder_input: Any | None = None - fifo_path: str | None = None + can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and float(freq) < 24.0) + direct_sampling_attempts: list[int | None] = [2, 1, None] if can_try_direct_sampling else [None] runtime_config: dict[str, Any] = { 'sample_rate': sample_rate, @@ -493,154 +653,113 @@ def start_morse() -> Response: 'wpm_mode': wpm_mode, 'wpm_lock': wpm_lock, 'min_signal_gate': min_signal_gate, + 'source': 'rtl_fm', } + active_rtl_process: subprocess.Popen[bytes] | None = None + active_multimon_process: subprocess.Popen[bytes] | None = None + active_stop_event: threading.Event | None = None + active_control_queue: queue.Queue | None = None + active_decoder_thread: threading.Thread | None = None + active_stderr_thread: threading.Thread | None = None + active_relay_thread: threading.Thread | None = None + active_master_fd: int | None = None + rtl_process: subprocess.Popen[bytes] | None = None + multimon_process: subprocess.Popen[bytes] | None = None + stop_event: threading.Event | None = None + control_queue: queue.Queue | None = None + decoder_thread: threading.Thread | None = None + stderr_thread: threading.Thread | None = None + relay_thread: threading.Thread | None = None + master_fd: int | None = None + + def _cleanup_attempt( + rtl_proc: subprocess.Popen[bytes] | None, + multimon_proc: subprocess.Popen[bytes] | None, + stop_evt: threading.Event | None, + control_q: queue.Queue | None, + decoder_worker: threading.Thread | None, + stderr_worker: threading.Thread | None, + relay_worker: threading.Thread | None, + master_fd: int | None, + ) -> None: + if stop_evt is not None: + stop_evt.set() + if control_q is not None: + with contextlib.suppress(queue.Full): + control_q.put_nowait({'cmd': 'shutdown'}) + + if master_fd is not None: + with contextlib.suppress(OSError): + os.close(master_fd) + + if rtl_proc is not None: + _close_pipe(getattr(rtl_proc, 'stdout', None)) + _close_pipe(getattr(rtl_proc, 'stderr', None)) + if multimon_proc is not None: + _close_pipe(getattr(multimon_proc, 'stdin', None)) + + if rtl_proc is not None: + safe_terminate(rtl_proc, timeout=0.4) + unregister_process(rtl_proc) + if multimon_proc is not None: + safe_terminate(multimon_proc, timeout=0.4) + unregister_process(multimon_proc) + + _join_thread(relay_worker, timeout_s=0.35) + _join_thread(decoder_worker, timeout_s=0.35) + _join_thread(stderr_worker, timeout_s=0.35) + + full_cmd = '' + attempt_errors: list[str] = [] + try: - def _cleanup_attempt( - proc: subprocess.Popen | None, - attempt_stop_event: threading.Event | None, - attempt_control_queue: queue.Queue | None, - attempt_decoder_thread: threading.Thread | None, - attempt_stderr_thread: threading.Thread | None, - attempt_stream_handle: Any | None = None, - attempt_fifo_path: str | None = None, - ) -> None: - if attempt_stop_event is not None: - attempt_stop_event.set() - if attempt_control_queue is not None: - with contextlib.suppress(queue.Full): - attempt_control_queue.put_nowait({'cmd': 'shutdown'}) - if attempt_stream_handle is not None and ( - proc is None or attempt_stream_handle is not getattr(proc, 'stdout', None) - ): - _close_pipe(attempt_stream_handle) - if proc is not None: - # Close stdout to unblock decoder reads. Keep stderr open until - # after stderr monitor thread exits to avoid ValueError races. - _close_pipe(getattr(proc, 'stdout', None)) - # Keep startup retries responsive; avoid long waits inside - # generic safe_terminate() during a failed attempt. - if proc.poll() is None: - with contextlib.suppress(Exception): - proc.terminate() - with contextlib.suppress(subprocess.TimeoutExpired, Exception): - proc.wait(timeout=0.15) - if proc.poll() is None: - with contextlib.suppress(Exception): - proc.kill() - with contextlib.suppress(subprocess.TimeoutExpired, Exception): - proc.wait(timeout=0.25) - unregister_process(proc) - _join_thread(attempt_decoder_thread, timeout_s=0.20) - stderr_joined = _join_thread(attempt_stderr_thread, timeout_s=0.35) - if proc is not None: - if not stderr_joined: - # Force-close the pipe if stderr reader is still blocked. - _close_pipe(getattr(proc, 'stderr', None)) - _join_thread(attempt_stderr_thread, timeout_s=0.15) - _close_pipe(getattr(proc, 'stderr', None)) - if attempt_fifo_path: - with contextlib.suppress(Exception): - Path(attempt_fifo_path).unlink(missing_ok=True) - - attempt_errors: list[str] = [] - full_cmd = '' - - for attempt_index, attempt in enumerate(command_attempts, start=1): + for attempt_index, direct_sampling_mode in enumerate(direct_sampling_attempts, start=1): + rtl_process = None + multimon_process = None + stop_event = None + control_queue = None + decoder_thread = None + stderr_thread = None + relay_thread = None + master_fd = None runtime_config.pop('startup_waiting', None) runtime_config.pop('startup_warning', None) - source = str(attempt.get('source', 'rtl_fm')).strip().lower() - force_squelch_off = bool(attempt.get('force_squelch_off', True)) - add_resample_rate = bool(attempt.get('add_resample_rate', False)) - add_dc_fast = bool(attempt.get('add_dc_fast', False)) - direct_sampling_mode_raw = attempt.get('direct_sampling_mode') - try: - direct_sampling_mode = ( - int(direct_sampling_mode_raw) - if direct_sampling_mode_raw is not None - else None - ) - except (TypeError, ValueError): - direct_sampling_mode = None - if source == 'iq': - rtl_cmd, tuned_freq_mhz = _build_iq_cmd( - direct_sampling_mode=int(direct_sampling_mode) - if direct_sampling_mode is not None else None, - ) - thread_target = morse_iq_decoder_thread - attempt_desc = ( - f'source=iq direct_mode={direct_sampling_mode if direct_sampling_mode is not None else "none"} ' - f'iq_sr={iq_sample_rate}' - ) - else: - rtl_cmd = _build_rtl_cmd( - direct_sampling_mode=direct_sampling_mode, - force_squelch_off=force_squelch_off, - add_resample_rate=add_resample_rate, - add_dc_fast=add_dc_fast, - ) - tuned_freq_mhz = float(freq) - thread_target = morse_decoder_thread - attempt_desc = ( - f'source=rtl_fm direct_mode={direct_sampling_mode if direct_sampling_mode is not None else "none"} ' - f'squelch_forced={int(force_squelch_off)} ' - f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)}' - ) - - full_cmd = ' '.join(rtl_cmd) + rtl_cmd = _build_rtl_cmd(direct_sampling_mode) + direct_mode_label = direct_sampling_mode if direct_sampling_mode is not None else 'none' + full_cmd = ' '.join(rtl_cmd) + ' | ' + ' '.join(multimon_cmd) logger.info( - f'Morse decoder attempt {attempt_index}/{len(command_attempts)} ' - f'({attempt_desc}): {full_cmd}' + 'Morse decoder attempt %s/%s (source=rtl_fm direct_mode=%s): %s', + attempt_index, + len(direct_sampling_attempts), + direct_mode_label, + full_cmd, ) - - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[cmd] {full_cmd}', - }) - - fifo_cmd, fifo_reader, fifo_path = _prepare_fifo_output( - rtl_cmd, - token=f'{morse_session_id}_{attempt_index}', - ) - if fifo_cmd is not rtl_cmd: - full_cmd = ' '.join(fifo_cmd) - logger.info( - f'Morse decoder attempt {attempt_index}/{len(command_attempts)} ' - f'({attempt_desc}) via fifo: {full_cmd}' - ) - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[cmd] {full_cmd}', - }) + _queue_morse_event({'type': 'info', 'text': f'[cmd] {full_cmd}'}) rtl_process = subprocess.Popen( - fifo_cmd, - stdout=(subprocess.DEVNULL if fifo_reader is not None else subprocess.PIPE), + rtl_cmd, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) register_process(rtl_process) - decoder_input = fifo_reader if fifo_reader is not None else rtl_process.stdout stop_event = threading.Event() control_queue = queue.Queue(maxsize=16) pcm_ready_event = threading.Event() - stream_ready_event = threading.Event() - attempt_stderr_lines: list[str] = [] + stderr_lines: list[str] = [] def monitor_stderr( - proc: subprocess.Popen = rtl_process, + proc: subprocess.Popen[bytes] = rtl_process, proc_stop_event: threading.Event = stop_event, - tool_label: str = rtl_cmd[0], - stderr_lines: list[str] = attempt_stderr_lines, + capture_lines: list[str] = stderr_lines, ) -> None: + stderr_stream = proc.stderr + if stderr_stream is None: + return try: - stderr_stream = proc.stderr - if stderr_stream is None: - return while not proc_stop_event.is_set(): line = stderr_stream.readline() if not line: @@ -649,17 +768,13 @@ def start_morse() -> Response: time.sleep(0.02) continue err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - if len(stderr_lines) >= 40: - del stderr_lines[:10] - stderr_lines.append(err_text) - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[{tool_label}] {err_text}', - }) - except ValueError: - # Pipe was closed during shutdown; expected during retries. + if not err_text: + continue + if len(capture_lines) >= 40: + del capture_lines[:10] + capture_lines.append(err_text) + _queue_morse_event({'type': 'info', 'text': f'[rtl_fm] {err_text}'}) + except (ValueError, OSError): return except Exception: return @@ -667,51 +782,50 @@ def start_morse() -> Response: stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') stderr_thread.start() - if source == 'iq': - decoder_thread = threading.Thread( - target=thread_target, - args=( - decoder_input, - app_module.morse_queue, - stop_event, - iq_sample_rate, - ), - kwargs={ - 'sample_rate': sample_rate, - 'tone_freq': tone_freq, - 'wpm': wpm, - 'decoder_config': runtime_config, - 'control_queue': control_queue, - 'pcm_ready_event': pcm_ready_event, - 'stream_ready_event': stream_ready_event, - }, - daemon=True, - name='morse-decoder', - ) - else: - decoder_thread = threading.Thread( - target=thread_target, - args=( - decoder_input, - app_module.morse_queue, - stop_event, - sample_rate, - tone_freq, - wpm, - ), - kwargs={ - 'decoder_config': runtime_config, - 'control_queue': control_queue, - 'pcm_ready_event': pcm_ready_event, - 'stream_ready_event': stream_ready_event, - 'strip_text_chunks': False, - }, - daemon=True, - name='morse-decoder', + master_fd, slave_fd = pty.openpty() + try: + multimon_process = subprocess.Popen( + multimon_cmd, + stdin=subprocess.PIPE, + stdout=slave_fd, + stderr=slave_fd, + close_fds=True, ) + finally: + with contextlib.suppress(OSError): + os.close(slave_fd) + register_process(multimon_process) + + if rtl_process.stdout is None: + raise RuntimeError('rtl_fm stdout unavailable') + if multimon_process.stdin is None: + raise RuntimeError('multimon-ng stdin unavailable') + + relay_thread = threading.Thread( + target=_morse_audio_relay_thread, + args=( + rtl_process.stdout, + multimon_process.stdin, + app_module.morse_queue, + stop_event, + control_queue, + runtime_config, + pcm_ready_event, + ), + daemon=True, + name='morse-relay', + ) + relay_thread.start() + + decoder_thread = threading.Thread( + target=_morse_multimon_output_thread, + args=(master_fd, multimon_process, stop_event), + daemon=True, + name='morse-decoder', + ) decoder_thread.start() - startup_deadline = time.monotonic() + (4.0 if source == 'iq' else 2.0) + startup_deadline = time.monotonic() + 2.5 startup_ok = False startup_error = '' @@ -720,86 +834,91 @@ def start_morse() -> Response: startup_ok = True break if rtl_process.poll() is not None: - startup_error = f'{rtl_cmd[0]} exited during startup (code {rtl_process.returncode})' + startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' + break + if multimon_process.poll() is not None: + startup_error = f'multimon-ng exited during startup (code {multimon_process.returncode})' break time.sleep(0.05) if not startup_ok: if not startup_error: startup_error = 'No PCM samples received within startup timeout' - if attempt_stderr_lines: - startup_error = f'{startup_error}; stderr: {attempt_stderr_lines[-1]}' - if stream_ready_event.is_set(): - startup_error = f'{startup_error}; stream=alive' - - is_last_attempt = attempt_index == len(command_attempts) - if ( - is_last_attempt - and rtl_process.poll() is None - and decoder_thread.is_alive() - ): - # Avoid hard-failing startup when SDR is alive but muted. + if stderr_lines: + startup_error = f'{startup_error}; stderr: {stderr_lines[-1]}' + is_last_attempt = attempt_index == len(direct_sampling_attempts) + if is_last_attempt and rtl_process.poll() is None and multimon_process.poll() is None: startup_ok = True runtime_config['startup_waiting'] = True runtime_config['startup_warning'] = startup_error logger.warning( 'Morse startup continuing without PCM (attempt %s/%s): %s', attempt_index, - len(command_attempts), + len(direct_sampling_attempts), startup_error, ) - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': '[morse] stream alive but no PCM yet; continuing in waiting mode', - }) + _queue_morse_event({ + 'type': 'info', + 'text': '[morse] waiting for PCM stream...', + }) if startup_ok: - runtime_config['source'] = source - runtime_config['command'] = full_cmd - runtime_config['tuned_frequency_mhz'] = tuned_freq_mhz + runtime_config['direct_sampling_mode'] = direct_sampling_mode runtime_config['direct_sampling'] = ( - int(direct_sampling_mode) - if source == 'iq' and direct_sampling_mode is not None - else (int(direct_sampling_mode) if direct_sampling_mode is not None else 0) + int(direct_sampling_mode) if direct_sampling_mode is not None else 0 ) - runtime_config['iq_sample_rate'] = iq_sample_rate if source == 'iq' else None - runtime_config['direct_sampling_mode'] = direct_sampling_mode if source == 'iq' else None + runtime_config['command'] = full_cmd + + active_rtl_process = rtl_process + active_multimon_process = multimon_process + active_stop_event = stop_event + active_control_queue = control_queue + active_decoder_thread = decoder_thread + active_stderr_thread = stderr_thread + active_relay_thread = relay_thread + active_master_fd = master_fd break attempt_errors.append( - f'attempt {attempt_index}/{len(command_attempts)} ({attempt_desc}): {startup_error}' + f'attempt {attempt_index}/{len(direct_sampling_attempts)} ' + f'(source=rtl_fm direct_mode={direct_mode_label}): {startup_error}' ) - logger.warning(f'Morse startup attempt failed: {attempt_errors[-1]}') - - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[morse] startup attempt failed: {startup_error}', - }) + logger.warning('Morse startup attempt failed: %s', attempt_errors[-1]) + _queue_morse_event({'type': 'info', 'text': f'[morse] startup attempt failed: {startup_error}'}) _cleanup_attempt( rtl_process, + multimon_process, stop_event, control_queue, decoder_thread, stderr_thread, - decoder_input, - fifo_path, + relay_thread, + master_fd, ) rtl_process = None + multimon_process = None stop_event = None control_queue = None decoder_thread = None stderr_thread = None - decoder_input = None - fifo_path = None + relay_thread = None + master_fd = None - if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None: + if ( + active_rtl_process is None + or active_multimon_process is None + or active_stop_event is None + or active_control_queue is None + or active_decoder_thread is None + or active_stderr_thread is None + or active_relay_thread is None + or active_master_fd is None + ): msg = 'SDR capture started but no PCM stream was received.' if attempt_errors: - msg = msg + ' ' + ' | '.join(attempt_errors) - logger.error(f'Morse startup failed: {msg}') + msg += ' ' + ' | '.join(attempt_errors) + logger.error('Morse startup failed: %s', msg) with app_module.morse_lock: if morse_active_device is not None: app_module.release_sdr_device(morse_active_device) @@ -810,27 +929,23 @@ def start_morse() -> Response: return jsonify({'status': 'error', 'message': msg}), 500 with app_module.morse_lock: - app_module.morse_process = rtl_process - app_module.morse_process._stop_decoder = stop_event - app_module.morse_process._decoder_thread = decoder_thread - app_module.morse_process._stderr_thread = stderr_thread - app_module.morse_process._control_queue = control_queue - app_module.morse_process._stream_handle = decoder_input - app_module.morse_process._fifo_path = fifo_path + app_module.morse_process = active_multimon_process + app_module.morse_process._rtl_process = active_rtl_process + app_module.morse_process._stop_decoder = active_stop_event + app_module.morse_process._decoder_thread = active_decoder_thread + app_module.morse_process._stderr_thread = active_stderr_thread + app_module.morse_process._relay_thread = active_relay_thread + app_module.morse_process._control_queue = active_control_queue + app_module.morse_process._master_fd = active_master_fd - morse_stop_event = stop_event - morse_control_queue = control_queue - morse_decoder_worker = decoder_thread - morse_stderr_worker = stderr_thread + morse_stop_event = active_stop_event + morse_control_queue = active_control_queue + morse_decoder_worker = active_decoder_thread + morse_stderr_worker = active_stderr_thread + morse_relay_worker = active_relay_thread morse_runtime_config = dict(runtime_config) _set_state(MORSE_RUNNING, 'Listening') - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[cmd] {full_cmd}', - }) - return jsonify({ 'status': 'started', 'state': MORSE_RUNNING, @@ -842,15 +957,16 @@ def start_morse() -> Response: }) except FileNotFoundError as e: - if rtl_process is not None: - unregister_process(rtl_process) - if decoder_input is not None and ( - rtl_process is None or decoder_input is not getattr(rtl_process, 'stdout', None) - ): - _close_pipe(decoder_input) - if fifo_path: - with contextlib.suppress(Exception): - Path(fifo_path).unlink(missing_ok=True) + _cleanup_attempt( + rtl_process if rtl_process is not None else active_rtl_process, + multimon_process if multimon_process is not None else active_multimon_process, + stop_event if stop_event is not None else active_stop_event, + control_queue if control_queue is not None else active_control_queue, + decoder_thread if decoder_thread is not None else active_decoder_thread, + stderr_thread if stderr_thread is not None else active_stderr_thread, + relay_thread if relay_thread is not None else active_relay_thread, + master_fd if master_fd is not None else active_master_fd, + ) with app_module.morse_lock: if morse_active_device is not None: app_module.release_sdr_device(morse_active_device) @@ -861,20 +977,16 @@ def start_morse() -> Response: return jsonify({'status': 'error', 'message': morse_last_error}), 400 except Exception as e: - if rtl_process is not None: - safe_terminate(rtl_process, timeout=0.5) - unregister_process(rtl_process) - if decoder_input is not None and ( - rtl_process is None or decoder_input is not getattr(rtl_process, 'stdout', None) - ): - _close_pipe(decoder_input) - if fifo_path: - with contextlib.suppress(Exception): - Path(fifo_path).unlink(missing_ok=True) - if stop_event is not None: - stop_event.set() - _join_thread(decoder_thread, timeout_s=0.25) - _join_thread(stderr_thread, timeout_s=0.25) + _cleanup_attempt( + rtl_process if rtl_process is not None else active_rtl_process, + multimon_process if multimon_process is not None else active_multimon_process, + stop_event if stop_event is not None else active_stop_event, + control_queue if control_queue is not None else active_control_queue, + decoder_thread if decoder_thread is not None else active_decoder_thread, + stderr_thread if stderr_thread is not None else active_stderr_thread, + relay_thread if relay_thread is not None else active_relay_thread, + master_fd if master_fd is not None else active_master_fd, + ) with app_module.morse_lock: if morse_active_device is not None: app_module.release_sdr_device(morse_active_device) @@ -887,7 +999,7 @@ def start_morse() -> Response: @morse_bp.route('/morse/stop', methods=['POST']) def stop_morse() -> Response: - global morse_active_device, morse_decoder_worker, morse_stderr_worker + global morse_active_device, morse_decoder_worker, morse_stderr_worker, morse_relay_worker global morse_stop_event, morse_control_queue stop_started = time.perf_counter() @@ -897,27 +1009,34 @@ def stop_morse() -> Response: return jsonify({'status': 'stopping', 'state': MORSE_STOPPING}), 202 proc = app_module.morse_process + rtl_proc = getattr(proc, '_rtl_process', None) if proc else None stop_event = morse_stop_event or getattr(proc, '_stop_decoder', None) decoder_thread = morse_decoder_worker or getattr(proc, '_decoder_thread', None) stderr_thread = morse_stderr_worker or getattr(proc, '_stderr_thread', None) + relay_thread = morse_relay_worker or getattr(proc, '_relay_thread', None) control_queue = morse_control_queue or getattr(proc, '_control_queue', None) - stream_handle = getattr(proc, '_stream_handle', None) if proc else None - fifo_path = getattr(proc, '_fifo_path', None) if proc else None + master_fd = getattr(proc, '_master_fd', None) if proc else None active_device = morse_active_device - if not proc and not stop_event and not decoder_thread and not stderr_thread: + if ( + not proc + and not rtl_proc + and not stop_event + and not decoder_thread + and not stderr_thread + and not relay_thread + ): _set_state(MORSE_IDLE, 'Idle', enqueue=False) return jsonify({'status': 'not_running', 'state': MORSE_IDLE}) - # Prevent new starts while cleanup is in progress. _set_state(MORSE_STOPPING, 'Stopping decoder...') - # Detach global runtime pointers immediately to avoid double-stop races. app_module.morse_process = None morse_stop_event = None morse_control_queue = None morse_decoder_worker = None morse_stderr_worker = None + morse_relay_worker = None cleanup_steps: list[str] = [] @@ -936,33 +1055,34 @@ def stop_morse() -> Response: control_queue.put_nowait({'cmd': 'shutdown'}) _mark('control_queue shutdown signal sent') - if stream_handle is not None and ( - proc is None or stream_handle is not getattr(proc, 'stdout', None) - ): - _close_pipe(stream_handle) - _mark('decoder input stream closed') + if master_fd is not None: + with contextlib.suppress(OSError): + os.close(master_fd) + _mark('pty master fd closed') + + if rtl_proc is not None: + _close_pipe(getattr(rtl_proc, 'stdout', None)) + _close_pipe(getattr(rtl_proc, 'stderr', None)) + _mark('rtl_fm pipes closed') if proc is not None: - _close_pipe(getattr(proc, 'stdout', None)) - _mark('stdout pipe closed') + _close_pipe(getattr(proc, 'stdin', None)) + _mark('multimon stdin closed') - safe_terminate(proc, timeout=0.6) - unregister_process(proc) + if rtl_proc is not None: + safe_terminate(rtl_proc, timeout=0.6) + unregister_process(rtl_proc) _mark('rtl_fm process terminated') + if proc is not None: + safe_terminate(proc, timeout=0.6) + unregister_process(proc) + _mark('multimon process terminated') + + relay_joined = _join_thread(relay_thread, timeout_s=0.45) decoder_joined = _join_thread(decoder_thread, timeout_s=0.45) stderr_joined = _join_thread(stderr_thread, timeout_s=0.45) - if proc is not None: - if not stderr_joined: - _close_pipe(getattr(proc, 'stderr', None)) - stderr_joined = _join_thread(stderr_thread, timeout_s=0.20) - _mark('stderr pipe force-closed') - _close_pipe(getattr(proc, 'stderr', None)) - _mark('stderr pipe closed') - if fifo_path: - with contextlib.suppress(Exception): - Path(fifo_path).unlink(missing_ok=True) - _mark('fifo path removed') + _mark(f'relay thread joined={relay_joined}') _mark(f'decoder thread joined={decoder_joined}') _mark(f'stderr thread joined={stderr_joined}') @@ -972,10 +1092,16 @@ def stop_morse() -> Response: stop_ms = round((time.perf_counter() - stop_started) * 1000.0, 1) alive_after = [] + if not relay_joined: + alive_after.append('relay_thread') if not decoder_joined: alive_after.append('decoder_thread') if not stderr_joined: alive_after.append('stderr_thread') + if rtl_proc is not None and rtl_proc.poll() is None: + alive_after.append('rtl_process') + if proc is not None and proc.poll() is None: + alive_after.append('multimon_process') with app_module.morse_lock: morse_active_device = None diff --git a/tests/test_morse.py b/tests/test_morse.py index a53274f..43fabdf 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -243,6 +243,7 @@ class TestMorseLifecycleRoutes: morse_routes.morse_active_device = None morse_routes.morse_decoder_worker = None morse_routes.morse_stderr_worker = None + morse_routes.morse_relay_worker = None morse_routes.morse_stop_event = None morse_routes.morse_control_queue = None morse_routes.morse_runtime_config = {} @@ -264,31 +265,57 @@ class TestMorseLifecycleRoutes: class DummyBuilder: def build_fm_demod_command(self, **kwargs): - return ['rtl_fm', '-f', '14060000'] - - def build_iq_capture_command(self, **kwargs): - cmd = ['rtl_sdr', '-f', '14060000', '-s', '250000'] - if kwargs.get('gain') is not None: - cmd.extend(['-g', str(kwargs['gain'])]) - cmd.append('-') - return cmd + return ['rtl_fm', '-f', '14060000', '-'] monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) - monkeypatch.setattr(morse_routes.time, 'sleep', lambda _secs: None) + monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') - pcm = generate_morse_audio('E', wpm=15) + pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) - class FakeProc: - def __init__(self): - self.stdout = io.BytesIO(pcm) + class FakeRtlProc: + def __init__(self, payload: bytes): + self.stdout = io.BytesIO(payload) self.stderr = io.BytesIO(b'') self.returncode = None def poll(self): return self.returncode - monkeypatch.setattr(morse_routes.subprocess, 'Popen', lambda *args, **kwargs: FakeProc()) + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + self.returncode = 0 + return 0 + + def kill(self): + self.returncode = -9 + + class FakeMultimonProc: + def __init__(self): + self.stdin = io.BytesIO() + self.returncode = None + + def poll(self): + return self.returncode + + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + self.returncode = 0 + return 0 + + def kill(self): + self.returncode = -9 + + def fake_popen(cmd, *args, **kwargs): + if 'multimon' in str(cmd[0]): + return FakeMultimonProc() + return FakeRtlProc(pcm) + + monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen) monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None) monkeypatch.setattr(morse_routes, 'unregister_process', lambda _proc: None) monkeypatch.setattr( @@ -339,22 +366,19 @@ class TestMorseLifecycleRoutes: class DummyBuilder: def build_fm_demod_command(self, **kwargs): cmd = ['rtl_fm', '-f', '14.060M', '-M', 'usb', '-s', '22050'] - if kwargs.get('direct_sampling') == 2: - cmd.extend(['-E', 'direct2']) + if kwargs.get('direct_sampling') is not None: + cmd.extend(['--direct', str(kwargs['direct_sampling'])]) cmd.append('-') return cmd - def build_iq_capture_command(self, **kwargs): - cmd = ['rtl_sdr', '-f', '14.0593M', '-s', '250000', '-'] - return cmd - monkeypatch.setattr(morse_routes.SDRFactory, 'create_default_device', staticmethod(lambda sdr_type, index: DummyDevice())) monkeypatch.setattr(morse_routes.SDRFactory, 'get_builder', staticmethod(lambda sdr_type: DummyBuilder())) + monkeypatch.setattr(morse_routes, 'get_tool_path', lambda _name: '/usr/bin/multimon-ng') pcm = generate_morse_audio('E', wpm=15, sample_rate=22050) - popen_cmds = [] + rtl_cmds = [] - class FakeProc: + class FakeRtlProc: def __init__(self, stdout_bytes: bytes, returncode: int | None): self.stdout = io.BytesIO(stdout_bytes) self.stderr = io.BytesIO(b'') @@ -363,11 +387,41 @@ class TestMorseLifecycleRoutes: def poll(self): return self.returncode + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + self.returncode = 0 + return 0 + + def kill(self): + self.returncode = -9 + + class FakeMultimonProc: + def __init__(self): + self.stdin = io.BytesIO() + self.returncode = None + + def poll(self): + return self.returncode + + def terminate(self): + self.returncode = 0 + + def wait(self, timeout=None): + self.returncode = 0 + return 0 + + def kill(self): + self.returncode = -9 + def fake_popen(cmd, *args, **kwargs): - popen_cmds.append(cmd) - if len(popen_cmds) == 1: - return FakeProc(b'', 1) - return FakeProc(pcm, None) + if 'multimon' in str(cmd[0]): + return FakeMultimonProc() + rtl_cmds.append(cmd) + if len(rtl_cmds) == 1: + return FakeRtlProc(b'', 1) + return FakeRtlProc(pcm, None) monkeypatch.setattr(morse_routes.subprocess, 'Popen', fake_popen) monkeypatch.setattr(morse_routes, 'register_process', lambda _proc: None) @@ -388,13 +442,13 @@ class TestMorseLifecycleRoutes: }) assert start_resp.status_code == 200 assert start_resp.get_json()['status'] == 'started' - assert len(popen_cmds) >= 2 - assert popen_cmds[0][0] == 'rtl_sdr' - assert '-D' in popen_cmds[0] - assert '2' in popen_cmds[0] - assert popen_cmds[1][0] == 'rtl_sdr' - assert '-D' in popen_cmds[1] - assert '1' in popen_cmds[1] + assert len(rtl_cmds) >= 2 + assert rtl_cmds[0][0] == 'rtl_fm' + assert '--direct' in rtl_cmds[0] + assert '2' in rtl_cmds[0] + assert rtl_cmds[1][0] == 'rtl_fm' + assert '--direct' in rtl_cmds[1] + assert '1' in rtl_cmds[1] stop_resp = client.post('/morse/stop') assert stop_resp.status_code == 200