diff --git a/routes/morse.py b/routes/morse.py index ab057ac..9018fb5 100644 --- a/routes/morse.py +++ b/routes/morse.py @@ -16,7 +16,11 @@ from flask import Blueprint, Response, jsonify, request import app as app_module from utils.event_pipeline import process_event from utils.logging import sensor_logger as logger -from utils.morse import decode_morse_wav_file, morse_decoder_thread +from utils.morse import ( + decode_morse_wav_file, + morse_decoder_thread, + morse_iq_decoder_thread, +) from utils.process import register_process, safe_terminate, unregister_process from utils.sdr import SDRFactory, SDRType from utils.sse import sse_stream_fanout @@ -323,87 +327,70 @@ def start_morse() -> Response: cmd[insert_at:insert_at] = ['-E', 'dc'] return cmd + iq_sample_rate = 250000 + + def _build_iq_cmd(*, direct_sampling_mode: int | None) -> tuple[list[str], float]: + # CW USB-style offset tuning: keep the configured RF frequency sounding + # near the selected tone frequency in the software demod chain. + tune_mhz = max(0.5, float(freq) - (float(tone_freq) / 1_000_000.0)) + iq_cmd = builder.build_iq_capture_command( + device=sdr_device, + frequency_mhz=tune_mhz, + sample_rate=iq_sample_rate, + gain=float(gain) if gain and gain != '0' else None, + ppm=int(ppm) if ppm and ppm != '0' else None, + bias_t=bias_t, + ) + if ( + sdr_device.sdr_type == SDRType.RTL_SDR + and direct_sampling_mode is not None + and '-D' not in iq_cmd + ): + if iq_cmd and iq_cmd[-1] == '-': + iq_cmd[-1:-1] = ['-D', str(direct_sampling_mode)] + else: + iq_cmd.extend(['-D', str(direct_sampling_mode)]) + return iq_cmd, tune_mhz + can_try_direct_sampling = bool(sdr_device.sdr_type == SDRType.RTL_SDR and freq < 24.0) if can_try_direct_sampling: - # Cross-platform note: some rtl_fm builds treat "-l 0" as hard squelch and - # emit no PCM. Try the no-"-l" form first, then legacy variants. - command_attempts = [ - { - 'use_direct_sampling': True, - 'force_squelch_off': False, - 'add_resample_rate': False, - 'add_dc_fast': False, - }, + # Keep rtl_fm attempts first (cheap), then switch to IQ capture fallback. + command_attempts: list[dict[str, Any]] = [ { + 'source': 'rtl_fm', 'use_direct_sampling': True, 'force_squelch_off': False, 'add_resample_rate': True, 'add_dc_fast': True, }, { - 'use_direct_sampling': True, - 'force_squelch_off': True, - 'add_resample_rate': True, - 'add_dc_fast': True, - }, - { + 'source': 'rtl_fm', 'use_direct_sampling': False, 'force_squelch_off': False, 'add_resample_rate': True, 'add_dc_fast': True, }, { - 'use_direct_sampling': False, - 'force_squelch_off': False, - 'add_resample_rate': False, - 'add_dc_fast': False, + 'source': 'iq', + 'direct_sampling_mode': 2, }, { - 'use_direct_sampling': False, - 'force_squelch_off': True, - 'add_resample_rate': True, - 'add_dc_fast': True, - 'merge_stderr': False, - }, - { - # Last-resort compatibility mode: some rtl_fm variants may route - # payload unexpectedly; merge stderr->stdout and strip text logs. - 'use_direct_sampling': False, - 'force_squelch_off': False, - 'add_resample_rate': True, - 'add_dc_fast': True, - 'merge_stderr': True, + 'source': 'iq', + 'direct_sampling_mode': None, }, ] else: command_attempts = [ { - 'use_direct_sampling': False, - 'force_squelch_off': False, - 'add_resample_rate': False, - 'add_dc_fast': False, - 'merge_stderr': False, - }, - { + 'source': 'rtl_fm', 'use_direct_sampling': False, 'force_squelch_off': False, 'add_resample_rate': True, 'add_dc_fast': True, - 'merge_stderr': False, }, { - 'use_direct_sampling': False, - 'force_squelch_off': True, - 'add_resample_rate': True, - 'add_dc_fast': True, - 'merge_stderr': False, - }, - { - 'use_direct_sampling': False, - 'force_squelch_off': False, - 'add_resample_rate': True, - 'add_dc_fast': True, - 'merge_stderr': True, + 'source': 'iq', + 'direct_sampling_mode': None, }, ] @@ -454,20 +441,43 @@ def start_morse() -> Response: full_cmd = '' for attempt_index, attempt in enumerate(command_attempts, start=1): + source = str(attempt.get('source', 'rtl_fm')).strip().lower() use_direct_sampling = bool(attempt.get('use_direct_sampling', False)) force_squelch_off = bool(attempt.get('force_squelch_off', True)) add_resample_rate = bool(attempt.get('add_resample_rate', False)) add_dc_fast = bool(attempt.get('add_dc_fast', False)) - merge_stderr = bool(attempt.get('merge_stderr', False)) + direct_sampling_mode = attempt.get('direct_sampling_mode') + + if source == 'iq': + rtl_cmd, tuned_freq_mhz = _build_iq_cmd( + direct_sampling_mode=int(direct_sampling_mode) + if direct_sampling_mode is not None else None, + ) + thread_target = morse_iq_decoder_thread + attempt_desc = ( + f'source=iq direct_mode={direct_sampling_mode if direct_sampling_mode is not None else "none"} ' + f'iq_sr={iq_sample_rate}' + ) + else: + rtl_cmd = _build_rtl_cmd( + use_direct_sampling=use_direct_sampling, + force_squelch_off=force_squelch_off, + add_resample_rate=add_resample_rate, + add_dc_fast=add_dc_fast, + ) + tuned_freq_mhz = float(freq) + thread_target = morse_decoder_thread + attempt_desc = ( + f'source=rtl_fm direct={int(use_direct_sampling)} ' + f'squelch_forced={int(force_squelch_off)} ' + f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)}' + ) - rtl_cmd = _build_rtl_cmd( - use_direct_sampling=use_direct_sampling, - force_squelch_off=force_squelch_off, - add_resample_rate=add_resample_rate, - add_dc_fast=add_dc_fast, - ) full_cmd = ' '.join(rtl_cmd) - logger.info(f'Morse decoder attempt {attempt_index}/{len(command_attempts)}: {full_cmd}') + logger.info( + f'Morse decoder attempt {attempt_index}/{len(command_attempts)} ' + f'({attempt_desc}): {full_cmd}' + ) with contextlib.suppress(queue.Full): app_module.morse_queue.put_nowait({ @@ -478,7 +488,7 @@ def start_morse() -> Response: rtl_process = subprocess.Popen( rtl_cmd, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT if merge_stderr else subprocess.PIPE, + stderr=subprocess.PIPE, bufsize=0, ) register_process(rtl_process) @@ -487,49 +497,67 @@ def start_morse() -> Response: control_queue = queue.Queue(maxsize=16) pcm_ready_event = threading.Event() - if not merge_stderr: - def monitor_stderr( - proc: subprocess.Popen = rtl_process, - proc_stop_event: threading.Event = stop_event, - ) -> None: - if proc.stderr is None: - return - for line in proc.stderr: - if proc_stop_event.is_set(): - break - err_text = line.decode('utf-8', errors='replace').strip() - if err_text: - logger.debug(f'[rtl_fm/morse] {err_text}') - with contextlib.suppress(queue.Full): - app_module.morse_queue.put_nowait({ - 'type': 'info', - 'text': f'[rtl_fm] {err_text}', - }) + def monitor_stderr( + proc: subprocess.Popen = rtl_process, + proc_stop_event: threading.Event = stop_event, + tool_label: str = rtl_cmd[0], + ) -> None: + if proc.stderr is None: + return + for line in proc.stderr: + if proc_stop_event.is_set(): + break + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + with contextlib.suppress(queue.Full): + app_module.morse_queue.put_nowait({ + 'type': 'info', + 'text': f'[{tool_label}] {err_text}', + }) - stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') - stderr_thread.start() + stderr_thread = threading.Thread(target=monitor_stderr, daemon=True, name='morse-stderr') + stderr_thread.start() + + if source == 'iq': + decoder_thread = threading.Thread( + target=thread_target, + args=( + rtl_process.stdout, + app_module.morse_queue, + stop_event, + iq_sample_rate, + ), + kwargs={ + 'sample_rate': sample_rate, + 'tone_freq': tone_freq, + 'wpm': wpm, + 'decoder_config': runtime_config, + 'control_queue': control_queue, + 'pcm_ready_event': pcm_ready_event, + }, + daemon=True, + name='morse-decoder', + ) else: - stderr_thread = None - - decoder_thread = threading.Thread( - target=morse_decoder_thread, - args=( - rtl_process.stdout, - app_module.morse_queue, - stop_event, - sample_rate, - tone_freq, - wpm, - ), - kwargs={ - 'decoder_config': runtime_config, - 'control_queue': control_queue, - 'pcm_ready_event': pcm_ready_event, - 'strip_text_chunks': merge_stderr, - }, - daemon=True, - name='morse-decoder', - ) + decoder_thread = threading.Thread( + target=thread_target, + args=( + rtl_process.stdout, + app_module.morse_queue, + stop_event, + sample_rate, + tone_freq, + wpm, + ), + kwargs={ + 'decoder_config': runtime_config, + 'control_queue': control_queue, + 'pcm_ready_event': pcm_ready_event, + 'strip_text_chunks': False, + }, + daemon=True, + name='morse-decoder', + ) decoder_thread.start() startup_deadline = time.monotonic() + 2.0 @@ -541,25 +569,28 @@ def start_morse() -> Response: startup_ok = True break if rtl_process.poll() is not None: - startup_error = f'rtl_fm exited during startup (code {rtl_process.returncode})' + startup_error = f'{rtl_cmd[0]} exited during startup (code {rtl_process.returncode})' break time.sleep(0.05) if startup_ok: - runtime_config['direct_sampling'] = 2 if use_direct_sampling else 0 - runtime_config['force_squelch_off'] = force_squelch_off - runtime_config['resample_rate'] = sample_rate if add_resample_rate else None - runtime_config['dc_fast'] = add_dc_fast - runtime_config['merge_stderr'] = merge_stderr + runtime_config['source'] = source + runtime_config['command'] = full_cmd + runtime_config['tuned_frequency_mhz'] = tuned_freq_mhz + runtime_config['direct_sampling'] = ( + int(direct_sampling_mode) + if source == 'iq' and direct_sampling_mode is not None + else (2 if use_direct_sampling else 0) + ) + runtime_config['iq_sample_rate'] = iq_sample_rate if source == 'iq' else None + runtime_config['direct_sampling_mode'] = direct_sampling_mode if source == 'iq' else None break if not startup_error: startup_error = 'No PCM samples received within startup timeout' attempt_errors.append( - f'attempt {attempt_index}/{len(command_attempts)} ' - f'(direct={int(use_direct_sampling)} squelch_forced={int(force_squelch_off)} ' - f'resample={int(add_resample_rate)} dc_fast={int(add_dc_fast)} merged={int(merge_stderr)}): {startup_error}' + f'attempt {attempt_index}/{len(command_attempts)} ({attempt_desc}): {startup_error}' ) logger.warning(f'Morse startup attempt failed: {attempt_errors[-1]}') @@ -582,11 +613,11 @@ def start_morse() -> Response: decoder_thread = None stderr_thread = None - if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None or stderr_thread is None: - msg = 'rtl_fm started but no PCM stream was received.' + if rtl_process is None or stop_event is None or control_queue is None or decoder_thread is None: + msg = 'SDR capture started but no PCM stream was received.' if attempt_errors: msg = msg + ' ' + ' | '.join(attempt_errors[-2:]) - logger.error(f'Morse rtl_fm startup failed: {msg}') + logger.error(f'Morse startup failed: {msg}') with app_module.morse_lock: if morse_active_device is not None: app_module.release_sdr_device(morse_active_device) diff --git a/tests/test_morse.py b/tests/test_morse.py index 929259b..95adff8 100644 --- a/tests/test_morse.py +++ b/tests/test_morse.py @@ -379,8 +379,9 @@ class TestMorseLifecycleRoutes: assert start_resp.get_json()['status'] == 'started' assert len(popen_cmds) >= 2 assert '-E' in popen_cmds[0] and 'direct2' in popen_cmds[0] - assert '-l' not in popen_cmds[0] - assert '-E' in popen_cmds[1] and 'direct2' in popen_cmds[1] + assert '-r' in popen_cmds[0] + assert '-A' in popen_cmds[0] + assert '-E' in popen_cmds[1] and 'direct2' not in popen_cmds[1] assert '-r' in popen_cmds[1] assert '-A' in popen_cmds[1] assert 'dc' in popen_cmds[1] diff --git a/utils/morse.py b/utils/morse.py index 2148776..915b953 100644 --- a/utils/morse.py +++ b/utils/morse.py @@ -956,3 +956,254 @@ def morse_decoder_thread( 'status': 'stopped', 'metrics': decoder.get_metrics(), }) + + +def _cu8_to_complex(raw: bytes) -> np.ndarray: + """Convert interleaved unsigned 8-bit IQ to complex64 samples.""" + if len(raw) < 2: + return np.empty(0, dtype=np.complex64) + usable = len(raw) - (len(raw) % 2) + if usable <= 0: + return np.empty(0, dtype=np.complex64) + u8 = np.frombuffer(raw[:usable], dtype=np.uint8).astype(np.float32) + i = (u8[0::2] - 127.5) / 128.0 + q = (u8[1::2] - 127.5) / 128.0 + return (i + 1j * q).astype(np.complex64) + + +def _iq_usb_to_pcm16( + iq_samples: np.ndarray, + iq_sample_rate: int, + audio_sample_rate: int, +) -> bytes: + """Minimal USB demod from complex IQ to 16-bit PCM.""" + if iq_samples.size < 16 or iq_sample_rate <= 0 or audio_sample_rate <= 0: + return b'' + + audio = np.real(iq_samples).astype(np.float64) + audio -= float(np.mean(audio)) + + # Cheap decimation first, then linear resample for exact output rate. + decim = max(1, int(iq_sample_rate // max(audio_sample_rate, 1))) + if decim > 1: + usable = (audio.size // decim) * decim + if usable < decim: + return b'' + audio = audio[:usable].reshape(-1, decim).mean(axis=1) + fs1 = float(iq_sample_rate) / float(decim) + if audio.size < 8: + return b'' + + taps = int(max(1, min(31, fs1 / 12000.0))) + if taps > 1: + kernel = np.ones(taps, dtype=np.float64) / float(taps) + audio = np.convolve(audio, kernel, mode='same') + + if abs(fs1 - float(audio_sample_rate)) > 1.0: + out_len = int(audio.size * float(audio_sample_rate) / fs1) + if out_len < 8: + return b'' + x_old = np.linspace(0.0, 1.0, audio.size, endpoint=False, dtype=np.float64) + x_new = np.linspace(0.0, 1.0, out_len, endpoint=False, dtype=np.float64) + audio = np.interp(x_new, x_old, audio) + + peak = float(np.max(np.abs(audio))) if audio.size else 0.0 + if peak > 0.0: + audio = audio * min(8.0, 0.85 / peak) + + pcm = np.clip(audio, -1.0, 1.0) + return (pcm * 32767.0).astype(np.int16).tobytes() + + +def morse_iq_decoder_thread( + iq_stdout, + output_queue: queue.Queue, + stop_event: threading.Event, + iq_sample_rate: int, + sample_rate: int = 22050, + tone_freq: float = 700.0, + wpm: int = 15, + decoder_config: dict[str, Any] | None = None, + control_queue: queue.Queue | None = None, + pcm_ready_event: threading.Event | None = None, +) -> None: + """Decode Morse from raw IQ (cu8) by in-process USB demodulation.""" + import logging + logger = logging.getLogger('intercept.morse') + + CHUNK = 65536 + SCOPE_INTERVAL = 0.10 + WAITING_INTERVAL = 0.25 + STALLED_AFTER_DATA_SECONDS = 1.5 + + cfg = dict(decoder_config or {}) + decoder = MorseDecoder( + sample_rate=int(cfg.get('sample_rate', sample_rate)), + tone_freq=float(cfg.get('tone_freq', tone_freq)), + wpm=int(cfg.get('wpm', wpm)), + bandwidth_hz=int(cfg.get('bandwidth_hz', 200)), + auto_tone_track=_coerce_bool(cfg.get('auto_tone_track', True), True), + tone_lock=_coerce_bool(cfg.get('tone_lock', False), False), + threshold_mode=_normalize_threshold_mode(cfg.get('threshold_mode', 'auto')), + manual_threshold=float(cfg.get('manual_threshold', 0.0) or 0.0), + threshold_multiplier=float(cfg.get('threshold_multiplier', 2.8) or 2.8), + threshold_offset=float(cfg.get('threshold_offset', 0.0) or 0.0), + wpm_mode=_normalize_wpm_mode(cfg.get('wpm_mode', 'auto')), + wpm_lock=_coerce_bool(cfg.get('wpm_lock', False), False), + min_signal_gate=float(cfg.get('min_signal_gate', 0.0) or 0.0), + ) + + last_scope = time.monotonic() + last_waiting_emit = 0.0 + waiting_since: float | None = None + last_pcm_at: float | None = None + pcm_bytes = 0 + pcm_report_at = time.monotonic() + first_pcm_logged = False + reader_done = threading.Event() + reader_thread: threading.Thread | None = None + + raw_queue: queue.Queue[bytes] = queue.Queue(maxsize=96) + + try: + def _reader_loop() -> None: + try: + while not stop_event.is_set(): + try: + if hasattr(iq_stdout, 'read1'): + data = iq_stdout.read1(CHUNK) + else: + data = iq_stdout.read(CHUNK) + except Exception as e: + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[iq] reader error: {e}', + }) + break + + if data is None: + continue + if not data: + break + + try: + raw_queue.put(data, timeout=0.2) + except queue.Full: + with contextlib.suppress(queue.Empty): + raw_queue.get_nowait() + with contextlib.suppress(queue.Full): + raw_queue.put_nowait(data) + finally: + reader_done.set() + with contextlib.suppress(queue.Full): + raw_queue.put_nowait(b'') + + reader_thread = threading.Thread( + target=_reader_loop, + daemon=True, + name='morse-iq-reader', + ) + reader_thread.start() + + while not stop_event.is_set(): + if not _drain_control_queue(control_queue, decoder): + break + + try: + raw = raw_queue.get(timeout=0.20) + except queue.Empty: + now = time.monotonic() + should_emit_waiting = False + if last_pcm_at is None: + should_emit_waiting = True + elif (now - last_pcm_at) >= STALLED_AFTER_DATA_SECONDS: + should_emit_waiting = True + + if should_emit_waiting and waiting_since is None: + waiting_since = now + if should_emit_waiting and now - last_waiting_emit >= WAITING_INTERVAL: + last_waiting_emit = now + _emit_waiting_scope(output_queue, waiting_since) + + if reader_done.is_set(): + break + continue + + if not raw: + if reader_done.is_set() and last_pcm_at is None: + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': '[iq] stream ended before samples were received', + }) + break + + iq = _cu8_to_complex(raw) + pcm = _iq_usb_to_pcm16( + iq_samples=iq, + iq_sample_rate=int(iq_sample_rate), + audio_sample_rate=int(decoder.sample_rate), + ) + if not pcm: + continue + + waiting_since = None + last_pcm_at = time.monotonic() + pcm_bytes += len(pcm) + + if not first_pcm_logged: + first_pcm_logged = True + if pcm_ready_event is not None: + pcm_ready_event.set() + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[pcm] first IQ demod chunk: {len(pcm)} bytes', + }) + + events = decoder.process_block(pcm) + for event in events: + if event.get('type') == 'scope': + now = time.monotonic() + if now - last_scope >= SCOPE_INTERVAL: + last_scope = now + with contextlib.suppress(queue.Full): + output_queue.put_nowait(event) + else: + with contextlib.suppress(queue.Full): + output_queue.put_nowait(event) + + now = time.monotonic() + if (now - pcm_report_at) >= 1.0: + kbps = (pcm_bytes * 8.0) / max(1e-6, (now - pcm_report_at)) / 1000.0 + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[pcm] {pcm_bytes} B in {now - pcm_report_at:.1f}s ({kbps:.1f} kbps)', + }) + pcm_bytes = 0 + pcm_report_at = now + + except Exception as e: # pragma: no cover - runtime safety + logger.debug(f'Morse IQ decoder thread error: {e}') + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'info', + 'text': f'[iq] decoder thread error: {e}', + }) + finally: + stop_event.set() + if reader_thread is not None: + reader_thread.join(timeout=0.35) + + for event in decoder.flush(): + with contextlib.suppress(queue.Full): + output_queue.put_nowait(event) + + with contextlib.suppress(queue.Full): + output_queue.put_nowait({ + 'type': 'status', + 'status': 'stopped', + 'metrics': decoder.get_metrics(), + })