From ee9356c358cbd710dd103e978b6ea709224715dd Mon Sep 17 00:00:00 2001 From: Smittix Date: Wed, 25 Feb 2026 20:58:48 +0000 Subject: [PATCH] Add CW/Morse code decoder mode New signal mode for decoding Morse code (CW) transmissions via SDR. Includes route blueprint, utility decoder, frontend UI, and tests. Co-Authored-By: Claude Opus 4.6 --- app.py | 12 +- routes/__init__.py | 2 + routes/morse.py | 251 ++++++++++++++++++ static/css/modes/morse.css | 127 +++++++++ static/js/modes/morse.js | 379 +++++++++++++++++++++++++++ templates/index.html | 34 ++- templates/partials/modes/morse.html | 125 +++++++++ templates/partials/nav.html | 2 + tests/test_morse.py | 393 ++++++++++++++++++++++++++++ utils/morse.py | 276 +++++++++++++++++++ 10 files changed, 1599 insertions(+), 2 deletions(-) create mode 100644 routes/morse.py create mode 100644 static/css/modes/morse.css create mode 100644 static/js/modes/morse.js create mode 100644 templates/partials/modes/morse.html create mode 100644 tests/test_morse.py create mode 100644 utils/morse.py diff --git a/app.py b/app.py index 0754fff..1584e83 100644 --- a/app.py +++ b/app.py @@ -198,6 +198,11 @@ tscm_lock = threading.Lock() subghz_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) subghz_lock = threading.Lock() +# CW/Morse code decoder +morse_process = None +morse_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) +morse_lock = threading.Lock() + # Deauth Attack Detection deauth_detector = None deauth_detector_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE) @@ -755,6 +760,7 @@ def health_check() -> Response: 'wifi': wifi_active, 'bluetooth': bt_active, 'dsc': dsc_process is not None and (dsc_process.poll() is None if dsc_process else False), + 'morse': morse_process is not None and (morse_process.poll() is None if morse_process else False), 'subghz': _get_subghz_active(), }, 'data': { @@ -772,7 +778,7 @@ def health_check() -> Response: def kill_all() -> Response: """Kill all decoder, WiFi, and Bluetooth processes.""" global current_process, sensor_process, wifi_process, adsb_process, ais_process, acars_process - global vdl2_process + global vdl2_process, morse_process global aprs_process, aprs_rtl_process, dsc_process, dsc_rtl_process, bt_process # Import adsb and ais modules to reset their state @@ -825,6 +831,10 @@ def kill_all() -> Response: with vdl2_lock: vdl2_process = None + # Reset Morse state + with morse_lock: + morse_process = None + # Reset APRS state with aprs_lock: aprs_process = None diff --git a/routes/__init__.py b/routes/__init__.py index cf197b9..e1e1319 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -16,6 +16,7 @@ def register_blueprints(app): from .gps import gps_bp from .listening_post import receiver_bp from .meshtastic import meshtastic_bp + from .morse import morse_bp from .offline import offline_bp from .pager import pager_bp from .recordings import recordings_bp @@ -73,6 +74,7 @@ def register_blueprints(app): app.register_blueprint(space_weather_bp) # Space weather monitoring app.register_blueprint(signalid_bp) # External signal ID enrichment app.register_blueprint(wefax_bp) # WeFax HF weather fax decoder + app.register_blueprint(morse_bp) # CW/Morse code decoder # Initialize TSCM state with queue and lock from app import app as app_module diff --git a/routes/morse.py b/routes/morse.py new file mode 100644 index 0000000..56800a2 --- /dev/null +++ b/routes/morse.py @@ -0,0 +1,251 @@ +"""CW/Morse code decoder routes.""" + +from __future__ import annotations + +import contextlib +import queue +import subprocess +import threading +from typing import Any + +from flask import Blueprint, Response, jsonify, request + +import app as app_module +from utils.event_pipeline import process_event +from utils.logging import sensor_logger as logger +from utils.morse import morse_decoder_thread +from utils.process import register_process, safe_terminate, unregister_process +from utils.sdr import SDRFactory, SDRType +from utils.sse import sse_stream_fanout +from utils.validation import ( + validate_device_index, + validate_frequency, + validate_gain, + validate_ppm, +) + +morse_bp = Blueprint('morse', __name__) + +# Track which device is being used +morse_active_device: int | None = None + + +def _validate_tone_freq(value: Any) -> float: + """Validate CW tone frequency (300-1200 Hz).""" + try: + freq = float(value) + if not 300 <= freq <= 1200: + raise ValueError("Tone frequency must be between 300 and 1200 Hz") + return freq + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid tone frequency: {value}") from e + + +def _validate_wpm(value: Any) -> int: + """Validate words per minute (5-50).""" + try: + wpm = int(value) + if not 5 <= wpm <= 50: + raise ValueError("WPM must be between 5 and 50") + return wpm + except (ValueError, TypeError) as e: + raise ValueError(f"Invalid WPM: {value}") from e + + +@morse_bp.route('/morse/start', methods=['POST']) +def start_morse() -> Response: + global morse_active_device + + with app_module.morse_lock: + if app_module.morse_process: + return jsonify({'status': 'error', 'message': 'Morse decoder already running'}), 409 + + data = request.json or {} + + # Validate standard SDR inputs + try: + freq = validate_frequency(data.get('frequency', '14.060')) + gain = validate_gain(data.get('gain', '0')) + ppm = validate_ppm(data.get('ppm', '0')) + device = validate_device_index(data.get('device', '0')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 + + # Validate Morse-specific inputs + try: + tone_freq = _validate_tone_freq(data.get('tone_freq', '700')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 + + try: + wpm = _validate_wpm(data.get('wpm', '15')) + except ValueError as e: + return jsonify({'status': 'error', 'message': str(e)}), 400 + + # Claim SDR device + device_int = int(device) + error = app_module.claim_sdr_device(device_int, 'morse') + if error: + return jsonify({ + 'status': 'error', + 'error_type': 'DEVICE_BUSY', + 'message': error, + }), 409 + morse_active_device = device_int + + # Clear queue + while not app_module.morse_queue.empty(): + try: + app_module.morse_queue.get_nowait() + except queue.Empty: + break + + # Build rtl_fm USB demodulation command + sdr_type_str = data.get('sdr_type', 'rtlsdr') + try: + sdr_type = SDRType(sdr_type_str) + except ValueError: + sdr_type = SDRType.RTL_SDR + + sdr_device = SDRFactory.create_default_device(sdr_type, index=device) + builder = SDRFactory.get_builder(sdr_device.sdr_type) + + sample_rate = 8000 + bias_t = data.get('bias_t', False) + + rtl_cmd = builder.build_fm_demod_command( + device=sdr_device, + frequency_mhz=freq, + sample_rate=sample_rate, + gain=float(gain) if gain and gain != '0' else None, + ppm=int(ppm) if ppm and ppm != '0' else None, + modulation='usb', + bias_t=bias_t, + ) + + full_cmd = ' '.join(rtl_cmd) + logger.info(f"Morse decoder running: {full_cmd}") + + try: + rtl_process = subprocess.Popen( + rtl_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + register_process(rtl_process) + + # Monitor rtl_fm stderr + def monitor_stderr(): + for line in rtl_process.stderr: + err_text = line.decode('utf-8', errors='replace').strip() + if err_text: + logger.debug(f"[rtl_fm/morse] {err_text}") + + stderr_thread = threading.Thread(target=monitor_stderr) + stderr_thread.daemon = True + stderr_thread.start() + + # Start Morse decoder thread + stop_event = threading.Event() + decoder_thread = threading.Thread( + target=morse_decoder_thread, + args=( + rtl_process.stdout, + app_module.morse_queue, + stop_event, + sample_rate, + tone_freq, + wpm, + ), + ) + decoder_thread.daemon = True + decoder_thread.start() + + app_module.morse_process = rtl_process + app_module.morse_process._stop_decoder = stop_event + app_module.morse_process._decoder_thread = decoder_thread + + app_module.morse_queue.put({'type': 'status', 'status': 'started'}) + + return jsonify({ + 'status': 'started', + 'command': full_cmd, + 'tone_freq': tone_freq, + 'wpm': wpm, + }) + + except FileNotFoundError as e: + if morse_active_device is not None: + app_module.release_sdr_device(morse_active_device) + morse_active_device = None + return jsonify({'status': 'error', 'message': f'Tool not found: {e.filename}'}), 400 + + except Exception as e: + # Clean up rtl_fm if it was started + try: + rtl_process.terminate() + rtl_process.wait(timeout=2) + except Exception: + with contextlib.suppress(Exception): + rtl_process.kill() + unregister_process(rtl_process) + if morse_active_device is not None: + app_module.release_sdr_device(morse_active_device) + morse_active_device = None + return jsonify({'status': 'error', 'message': str(e)}), 500 + + +@morse_bp.route('/morse/stop', methods=['POST']) +def stop_morse() -> Response: + global morse_active_device + + with app_module.morse_lock: + if app_module.morse_process: + # Signal decoder thread to stop + stop_event = getattr(app_module.morse_process, '_stop_decoder', None) + if stop_event: + stop_event.set() + + safe_terminate(app_module.morse_process) + unregister_process(app_module.morse_process) + app_module.morse_process = None + + if morse_active_device is not None: + app_module.release_sdr_device(morse_active_device) + morse_active_device = None + + app_module.morse_queue.put({'type': 'status', 'status': 'stopped'}) + return jsonify({'status': 'stopped'}) + + return jsonify({'status': 'not_running'}) + + +@morse_bp.route('/morse/status') +def morse_status() -> Response: + with app_module.morse_lock: + running = ( + app_module.morse_process is not None + and app_module.morse_process.poll() is None + ) + return jsonify({'running': running}) + + +@morse_bp.route('/morse/stream') +def morse_stream() -> Response: + def _on_msg(msg: dict[str, Any]) -> None: + process_event('morse', msg, msg.get('type')) + + response = Response( + sse_stream_fanout( + source_queue=app_module.morse_queue, + channel_key='morse', + timeout=1.0, + keepalive_interval=30.0, + on_message=_on_msg, + ), + mimetype='text/event-stream', + ) + response.headers['Cache-Control'] = 'no-cache' + response.headers['X-Accel-Buffering'] = 'no' + response.headers['Connection'] = 'keep-alive' + return response diff --git a/static/css/modes/morse.css b/static/css/modes/morse.css new file mode 100644 index 0000000..4e46e09 --- /dev/null +++ b/static/css/modes/morse.css @@ -0,0 +1,127 @@ +/* Morse Code / CW Decoder Styles */ + +/* Scope canvas container */ +.morse-scope-container { + background: var(--bg-primary); + border: 1px solid var(--border-color); + border-radius: 6px; + padding: 8px; + margin-bottom: 12px; +} + +.morse-scope-container canvas { + width: 100%; + height: 120px; + display: block; + border-radius: 4px; +} + +/* Decoded text panel */ +.morse-decoded-panel { + background: var(--bg-primary); + border: 1px solid var(--border-color); + border-radius: 6px; + padding: 16px; + min-height: 200px; + max-height: 400px; + overflow-y: auto; + font-family: var(--font-mono); + font-size: 18px; + line-height: 1.6; + color: var(--text-primary); + word-wrap: break-word; + flex: 1; +} + +.morse-decoded-panel:empty::before { + content: 'Decoded text will appear here...'; + color: var(--text-dim); + font-size: 14px; + font-style: italic; +} + +/* Individual decoded character with fade-in */ +.morse-char { + display: inline; + animation: morseFadeIn 0.3s ease-out; + position: relative; +} + +@keyframes morseFadeIn { + from { + opacity: 0; + color: var(--accent-cyan); + } + to { + opacity: 1; + color: var(--text-primary); + } +} + +/* Small Morse notation above character */ +.morse-char-morse { + font-size: 9px; + color: var(--text-dim); + letter-spacing: 1px; + display: block; + line-height: 1; + margin-bottom: -2px; +} + +/* Reference grid */ +.morse-ref-grid { + transition: max-height 0.3s ease, opacity 0.3s ease; + max-height: 500px; + opacity: 1; + overflow: hidden; +} + +.morse-ref-grid.collapsed { + max-height: 0; + opacity: 0; +} + +/* Toolbar: export/copy/clear */ +.morse-toolbar { + display: flex; + gap: 6px; + margin-bottom: 8px; + flex-wrap: wrap; +} + +.morse-toolbar .btn { + font-size: 11px; + padding: 4px 10px; +} + +/* Status bar at bottom */ +.morse-status-bar { + display: flex; + justify-content: space-between; + align-items: center; + font-size: 11px; + color: var(--text-dim); + padding: 6px 0; + border-top: 1px solid var(--border-color); + margin-top: 8px; +} + +.morse-status-bar .status-item { + display: flex; + align-items: center; + gap: 4px; +} + +/* Visuals container layout */ +#morseVisuals { + flex-direction: column; + gap: 12px; + padding: 16px; + height: 100%; +} + +/* Word space styling */ +.morse-word-space { + display: inline; + width: 0.5em; +} diff --git a/static/js/modes/morse.js b/static/js/modes/morse.js new file mode 100644 index 0000000..1ba5acb --- /dev/null +++ b/static/js/modes/morse.js @@ -0,0 +1,379 @@ +/** + * Morse Code (CW) decoder module. + * + * IIFE providing start/stop controls, SSE streaming, scope canvas, + * decoded text display, and export capabilities. + */ +var MorseMode = (function () { + 'use strict'; + + var state = { + running: false, + initialized: false, + eventSource: null, + charCount: 0, + decodedLog: [], // { timestamp, morse, char } + }; + + // Scope state + var scopeCtx = null; + var scopeAnim = null; + var scopeHistory = []; + var SCOPE_HISTORY_LEN = 300; + var scopeThreshold = 0; + var scopeToneOn = false; + + // ---- Initialization ---- + + function init() { + if (state.initialized) { + checkStatus(); + return; + } + state.initialized = true; + checkStatus(); + } + + function destroy() { + disconnectSSE(); + stopScope(); + } + + // ---- Status ---- + + function checkStatus() { + fetch('/morse/status') + .then(function (r) { return r.json(); }) + .then(function (data) { + if (data.running) { + state.running = true; + updateUI(true); + connectSSE(); + startScope(); + } else { + state.running = false; + updateUI(false); + } + }) + .catch(function () {}); + } + + // ---- Start / Stop ---- + + function start() { + if (state.running) return; + + var payload = { + frequency: document.getElementById('morseFrequency').value || '14.060', + gain: document.getElementById('morseGain').value || '0', + ppm: document.getElementById('morsePPM').value || '0', + device: document.getElementById('morseDevice').value || '0', + sdr_type: document.getElementById('morseSdrType').value || 'rtlsdr', + tone_freq: document.getElementById('morseToneFreq').value || '700', + wpm: document.getElementById('morseWpm').value || '15', + bias_t: document.getElementById('morseBiasT').checked, + }; + + fetch('/morse/start', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload), + }) + .then(function (r) { return r.json(); }) + .then(function (data) { + if (data.status === 'started') { + state.running = true; + state.charCount = 0; + state.decodedLog = []; + updateUI(true); + connectSSE(); + startScope(); + clearDecodedText(); + } else { + alert('Error: ' + (data.message || 'Unknown error')); + } + }) + .catch(function (err) { + alert('Failed to start Morse decoder: ' + err); + }); + } + + function stop() { + fetch('/morse/stop', { method: 'POST' }) + .then(function (r) { return r.json(); }) + .then(function () { + state.running = false; + updateUI(false); + disconnectSSE(); + stopScope(); + }) + .catch(function () {}); + } + + // ---- SSE ---- + + function connectSSE() { + disconnectSSE(); + var es = new EventSource('/morse/stream'); + + es.onmessage = function (e) { + try { + var msg = JSON.parse(e.data); + handleMessage(msg); + } catch (_) {} + }; + + es.onerror = function () { + // Reconnect handled by browser + }; + + state.eventSource = es; + } + + function disconnectSSE() { + if (state.eventSource) { + state.eventSource.close(); + state.eventSource = null; + } + } + + function handleMessage(msg) { + var type = msg.type; + + if (type === 'scope') { + // Update scope data + var amps = msg.amplitudes || []; + for (var i = 0; i < amps.length; i++) { + scopeHistory.push(amps[i]); + if (scopeHistory.length > SCOPE_HISTORY_LEN) { + scopeHistory.shift(); + } + } + scopeThreshold = msg.threshold || 0; + scopeToneOn = msg.tone_on || false; + + } else if (type === 'morse_char') { + appendChar(msg.char, msg.morse, msg.timestamp); + + } else if (type === 'morse_space') { + appendSpace(); + + } else if (type === 'status') { + if (msg.status === 'stopped') { + state.running = false; + updateUI(false); + disconnectSSE(); + stopScope(); + } + } else if (type === 'error') { + console.error('Morse error:', msg.text); + } + } + + // ---- Decoded text ---- + + function appendChar(ch, morse, timestamp) { + state.charCount++; + state.decodedLog.push({ timestamp: timestamp, morse: morse, char: ch }); + + var panel = document.getElementById('morseDecodedText'); + if (!panel) return; + + var span = document.createElement('span'); + span.className = 'morse-char'; + span.textContent = ch; + span.title = morse + ' (' + timestamp + ')'; + panel.appendChild(span); + + // Auto-scroll + panel.scrollTop = panel.scrollHeight; + + // Update count + var countEl = document.getElementById('morseCharCount'); + if (countEl) countEl.textContent = state.charCount + ' chars'; + } + + function appendSpace() { + var panel = document.getElementById('morseDecodedText'); + if (!panel) return; + + var span = document.createElement('span'); + span.className = 'morse-word-space'; + span.textContent = ' '; + panel.appendChild(span); + } + + function clearDecodedText() { + var panel = document.getElementById('morseDecodedText'); + if (panel) panel.innerHTML = ''; + state.charCount = 0; + state.decodedLog = []; + var countEl = document.getElementById('morseCharCount'); + if (countEl) countEl.textContent = '0 chars'; + } + + // ---- Scope canvas ---- + + function startScope() { + var canvas = document.getElementById('morseScopeCanvas'); + if (!canvas) return; + + var dpr = window.devicePixelRatio || 1; + var rect = canvas.getBoundingClientRect(); + canvas.width = rect.width * dpr; + canvas.height = 120 * dpr; + canvas.style.height = '120px'; + + scopeCtx = canvas.getContext('2d'); + scopeCtx.scale(dpr, dpr); + scopeHistory = []; + + function draw() { + if (!scopeCtx) return; + var w = rect.width; + var h = 120; + + scopeCtx.fillStyle = '#0a0e14'; + scopeCtx.fillRect(0, 0, w, h); + + if (scopeHistory.length === 0) { + scopeAnim = requestAnimationFrame(draw); + return; + } + + // Find max for normalization + var maxVal = 0; + for (var i = 0; i < scopeHistory.length; i++) { + if (scopeHistory[i] > maxVal) maxVal = scopeHistory[i]; + } + if (maxVal === 0) maxVal = 1; + + var barW = w / SCOPE_HISTORY_LEN; + var threshNorm = scopeThreshold / maxVal; + + // Draw amplitude bars + for (var j = 0; j < scopeHistory.length; j++) { + var norm = scopeHistory[j] / maxVal; + var barH = norm * (h - 10); + var x = j * barW; + var y = h - barH; + + // Green if above threshold, gray if below + if (scopeHistory[j] > scopeThreshold) { + scopeCtx.fillStyle = '#00ff88'; + } else { + scopeCtx.fillStyle = '#334455'; + } + scopeCtx.fillRect(x, y, Math.max(barW - 1, 1), barH); + } + + // Draw threshold line + if (scopeThreshold > 0) { + var threshY = h - (threshNorm * (h - 10)); + scopeCtx.strokeStyle = '#ff4444'; + scopeCtx.lineWidth = 1; + scopeCtx.setLineDash([4, 4]); + scopeCtx.beginPath(); + scopeCtx.moveTo(0, threshY); + scopeCtx.lineTo(w, threshY); + scopeCtx.stroke(); + scopeCtx.setLineDash([]); + } + + // Tone indicator + if (scopeToneOn) { + scopeCtx.fillStyle = '#00ff88'; + scopeCtx.beginPath(); + scopeCtx.arc(w - 12, 12, 5, 0, Math.PI * 2); + scopeCtx.fill(); + } + + scopeAnim = requestAnimationFrame(draw); + } + + draw(); + } + + function stopScope() { + if (scopeAnim) { + cancelAnimationFrame(scopeAnim); + scopeAnim = null; + } + scopeCtx = null; + } + + // ---- Export ---- + + function exportTxt() { + var text = state.decodedLog.map(function (e) { return e.char; }).join(''); + downloadFile('morse_decoded.txt', text, 'text/plain'); + } + + function exportCsv() { + var lines = ['timestamp,morse,character']; + state.decodedLog.forEach(function (e) { + lines.push(e.timestamp + ',"' + e.morse + '",' + e.char); + }); + downloadFile('morse_decoded.csv', lines.join('\n'), 'text/csv'); + } + + function copyToClipboard() { + var text = state.decodedLog.map(function (e) { return e.char; }).join(''); + navigator.clipboard.writeText(text).then(function () { + var btn = document.getElementById('morseCopyBtn'); + if (btn) { + var orig = btn.textContent; + btn.textContent = 'Copied!'; + setTimeout(function () { btn.textContent = orig; }, 1500); + } + }); + } + + function downloadFile(filename, content, type) { + var blob = new Blob([content], { type: type }); + var url = URL.createObjectURL(blob); + var a = document.createElement('a'); + a.href = url; + a.download = filename; + a.click(); + URL.revokeObjectURL(url); + } + + // ---- UI ---- + + function updateUI(running) { + var startBtn = document.getElementById('morseStartBtn'); + var stopBtn = document.getElementById('morseStopBtn'); + var indicator = document.getElementById('morseStatusIndicator'); + var statusText = document.getElementById('morseStatusText'); + + if (startBtn) startBtn.style.display = running ? 'none' : 'block'; + if (stopBtn) stopBtn.style.display = running ? 'block' : 'none'; + + if (indicator) { + indicator.style.background = running ? '#00ff88' : 'var(--text-dim)'; + } + if (statusText) { + statusText.textContent = running ? 'Listening' : 'Standby'; + } + } + + function setFreq(mhz) { + var el = document.getElementById('morseFrequency'); + if (el) el.value = mhz; + } + + // ---- Public API ---- + + return { + init: init, + destroy: destroy, + start: start, + stop: stop, + setFreq: setFreq, + exportTxt: exportTxt, + exportCsv: exportCsv, + copyToClipboard: copyToClipboard, + clearText: clearDecodedText, + }; +})(); diff --git a/templates/index.html b/templates/index.html index fe780f7..bd65eb3 100644 --- a/templates/index.html +++ b/templates/index.html @@ -80,7 +80,8 @@ subghz: "{{ url_for('static', filename='css/modes/subghz.css') }}?v={{ version }}&r=subghz_layout9", bt_locate: "{{ url_for('static', filename='css/modes/bt_locate.css') }}?v={{ version }}&r=btlocate4", spaceweather: "{{ url_for('static', filename='css/modes/space-weather.css') }}", - wefax: "{{ url_for('static', filename='css/modes/wefax.css') }}" + wefax: "{{ url_for('static', filename='css/modes/wefax.css') }}", + morse: "{{ url_for('static', filename='css/modes/morse.css') }}" }; window.INTERCEPT_MODE_STYLE_LOADED = {}; window.INTERCEPT_MODE_STYLE_PROMISES = {}; @@ -271,6 +272,10 @@ Waterfall + @@ -675,6 +680,8 @@ {% include 'partials/modes/wefax.html' %} + {% include 'partials/modes/morse.html' %} + {% include 'partials/modes/space-weather.html' %} {% include 'partials/modes/tscm.html' %} @@ -3001,6 +3008,25 @@ + + + @@ -201,6 +202,7 @@ {{ mobile_item('sensor', '433MHz', '') }} {{ mobile_item('rtlamr', 'Meters', '') }} {{ mobile_item('subghz', 'SubGHz', '') }} + {{ mobile_item('morse', 'Morse', '') }} {# Tracking #} {{ mobile_item('adsb', 'Aircraft', '', '/adsb/dashboard') }} {{ mobile_item('ais', 'Vessels', '', '/ais/dashboard') }} diff --git a/tests/test_morse.py b/tests/test_morse.py new file mode 100644 index 0000000..bb6da32 --- /dev/null +++ b/tests/test_morse.py @@ -0,0 +1,393 @@ +"""Tests for Morse code decoder (utils/morse.py) and routes.""" + +from __future__ import annotations + +import math +import queue +import struct +import threading + +import pytest + +from utils.morse import ( + CHAR_TO_MORSE, + MORSE_TABLE, + GoertzelFilter, + MorseDecoder, + morse_decoder_thread, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _login_session(client) -> None: + """Mark the Flask test session as authenticated.""" + with client.session_transaction() as sess: + sess['logged_in'] = True + sess['username'] = 'test' + sess['role'] = 'admin' + + +def generate_tone(freq: float, duration: float, sample_rate: int = 8000, amplitude: float = 0.8) -> bytes: + """Generate a pure sine wave as 16-bit LE PCM bytes.""" + n_samples = int(sample_rate * duration) + samples = [] + for i in range(n_samples): + t = i / sample_rate + val = int(amplitude * 32767 * math.sin(2 * math.pi * freq * t)) + samples.append(max(-32768, min(32767, val))) + return struct.pack(f'<{len(samples)}h', *samples) + + +def generate_silence(duration: float, sample_rate: int = 8000) -> bytes: + """Generate silence as 16-bit LE PCM bytes.""" + n_samples = int(sample_rate * duration) + return b'\x00\x00' * n_samples + + +def generate_morse_audio(text: str, wpm: int = 15, tone_freq: float = 700.0, sample_rate: int = 8000) -> bytes: + """Generate PCM audio for a Morse-encoded string.""" + dit_dur = 1.2 / wpm + dah_dur = 3 * dit_dur + element_gap = dit_dur + char_gap = 3 * dit_dur + word_gap = 7 * dit_dur + + audio = b'' + words = text.upper().split() + for wi, word in enumerate(words): + for ci, char in enumerate(word): + morse = CHAR_TO_MORSE.get(char) + if morse is None: + continue + for ei, element in enumerate(morse): + if element == '.': + audio += generate_tone(tone_freq, dit_dur, sample_rate) + elif element == '-': + audio += generate_tone(tone_freq, dah_dur, sample_rate) + if ei < len(morse) - 1: + audio += generate_silence(element_gap, sample_rate) + if ci < len(word) - 1: + audio += generate_silence(char_gap, sample_rate) + if wi < len(words) - 1: + audio += generate_silence(word_gap, sample_rate) + + # Add some leading/trailing silence for threshold settling + silence = generate_silence(0.3, sample_rate) + return silence + audio + silence + + +# --------------------------------------------------------------------------- +# MORSE_TABLE tests +# --------------------------------------------------------------------------- + +class TestMorseTable: + def test_all_26_letters_present(self): + chars = set(MORSE_TABLE.values()) + for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ': + assert letter in chars, f"Missing letter: {letter}" + + def test_all_10_digits_present(self): + chars = set(MORSE_TABLE.values()) + for digit in '0123456789': + assert digit in chars, f"Missing digit: {digit}" + + def test_reverse_lookup_consistent(self): + for morse, char in MORSE_TABLE.items(): + if char in CHAR_TO_MORSE: + assert CHAR_TO_MORSE[char] == morse + + def test_no_duplicate_morse_codes(self): + """Each morse pattern should map to exactly one character.""" + assert len(MORSE_TABLE) == len(set(MORSE_TABLE.keys())) + + +# --------------------------------------------------------------------------- +# GoertzelFilter tests +# --------------------------------------------------------------------------- + +class TestGoertzelFilter: + def test_detects_target_frequency(self): + gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160) + # Generate 700 Hz tone + samples = [0.8 * math.sin(2 * math.pi * 700 * i / 8000) for i in range(160)] + mag = gf.magnitude(samples) + assert mag > 10.0, f"Expected high magnitude for target freq, got {mag}" + + def test_rejects_off_frequency(self): + gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160) + # Generate 1500 Hz tone (well off target) + samples = [0.8 * math.sin(2 * math.pi * 1500 * i / 8000) for i in range(160)] + mag_off = gf.magnitude(samples) + + # Compare with on-target + samples_on = [0.8 * math.sin(2 * math.pi * 700 * i / 8000) for i in range(160)] + mag_on = gf.magnitude(samples_on) + + assert mag_on > mag_off * 3, "Target freq should be significantly stronger than off-freq" + + def test_silence_returns_near_zero(self): + gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=160) + samples = [0.0] * 160 + mag = gf.magnitude(samples) + assert mag < 0.01, f"Expected near-zero for silence, got {mag}" + + def test_different_block_sizes(self): + for block_size in [80, 160, 320]: + gf = GoertzelFilter(target_freq=700.0, sample_rate=8000, block_size=block_size) + samples = [0.8 * math.sin(2 * math.pi * 700 * i / 8000) for i in range(block_size)] + mag = gf.magnitude(samples) + assert mag > 5.0, f"Should detect tone with block_size={block_size}" + + +# --------------------------------------------------------------------------- +# MorseDecoder tests +# --------------------------------------------------------------------------- + +class TestMorseDecoder: + def _make_decoder(self, wpm=15): + """Create decoder with pre-warmed threshold for testing.""" + decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=wpm) + # Warm up noise floor with silence + silence = generate_silence(0.5) + decoder.process_block(silence) + # Warm up signal peak with tone + tone = generate_tone(700.0, 0.3) + decoder.process_block(tone) + # More silence to settle + silence2 = generate_silence(0.5) + decoder.process_block(silence2) + # Reset state after warm-up + decoder._tone_on = False + decoder._current_symbol = '' + decoder._tone_blocks = 0 + decoder._silence_blocks = 0 + return decoder + + def test_dit_detection(self): + """A single dit should produce a '.' in the symbol buffer.""" + decoder = self._make_decoder() + dit_dur = 1.2 / 15 + + # Send a tone burst (dit) + tone = generate_tone(700.0, dit_dur) + decoder.process_block(tone) + + # Send silence to trigger end of tone + silence = generate_silence(dit_dur * 2) + decoder.process_block(silence) + + # Symbol buffer should have a dot + assert '.' in decoder._current_symbol, f"Expected '.' in symbol, got '{decoder._current_symbol}'" + + def test_dah_detection(self): + """A longer tone should produce a '-' in the symbol buffer.""" + decoder = self._make_decoder() + dah_dur = 3 * 1.2 / 15 + + tone = generate_tone(700.0, dah_dur) + decoder.process_block(tone) + + silence = generate_silence(dah_dur) + decoder.process_block(silence) + + assert '-' in decoder._current_symbol, f"Expected '-' in symbol, got '{decoder._current_symbol}'" + + def test_decode_letter_e(self): + """E is a single dit - the simplest character.""" + decoder = self._make_decoder() + audio = generate_morse_audio('E', wpm=15) + events = decoder.process_block(audio) + events.extend(decoder.flush()) + + chars = [e for e in events if e['type'] == 'morse_char'] + decoded = ''.join(e['char'] for e in chars) + assert 'E' in decoded, f"Expected 'E' in decoded text, got '{decoded}'" + + def test_decode_letter_t(self): + """T is a single dah.""" + decoder = self._make_decoder() + audio = generate_morse_audio('T', wpm=15) + events = decoder.process_block(audio) + events.extend(decoder.flush()) + + chars = [e for e in events if e['type'] == 'morse_char'] + decoded = ''.join(e['char'] for e in chars) + assert 'T' in decoded, f"Expected 'T' in decoded text, got '{decoded}'" + + def test_word_space_detection(self): + """A long silence between words should produce decoded chars with a space.""" + decoder = self._make_decoder() + dit_dur = 1.2 / 15 + # E = dit + audio = generate_tone(700.0, dit_dur) + generate_silence(7 * dit_dur * 1.5) + # T = dah + audio += generate_tone(700.0, 3 * dit_dur) + generate_silence(3 * dit_dur) + events = decoder.process_block(audio) + events.extend(decoder.flush()) + + spaces = [e for e in events if e['type'] == 'morse_space'] + assert len(spaces) >= 1, "Expected at least one word space" + + def test_scope_events_generated(self): + """Decoder should produce scope events for visualization.""" + audio = generate_morse_audio('SOS', wpm=15) + decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) + + events = decoder.process_block(audio) + + scope_events = [e for e in events if e['type'] == 'scope'] + assert len(scope_events) > 0, "Expected scope events" + # Check scope event structure + se = scope_events[0] + assert 'amplitudes' in se + assert 'threshold' in se + assert 'tone_on' in se + + def test_adaptive_threshold_adjusts(self): + """After processing audio, threshold should be non-zero.""" + decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) + + # Process some tone + silence + audio = generate_tone(700.0, 0.3) + generate_silence(0.3) + decoder.process_block(audio) + + assert decoder._threshold > 0, "Threshold should adapt above zero" + + def test_flush_emits_pending_char(self): + """flush() should emit any accumulated but not-yet-decoded symbol.""" + decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) + decoder._current_symbol = '.' # Manually set pending dit + events = decoder.flush() + assert len(events) == 1 + assert events[0]['type'] == 'morse_char' + assert events[0]['char'] == 'E' + + def test_flush_empty_returns_nothing(self): + decoder = MorseDecoder(sample_rate=8000, tone_freq=700.0, wpm=15) + events = decoder.flush() + assert events == [] + + +# --------------------------------------------------------------------------- +# morse_decoder_thread tests +# --------------------------------------------------------------------------- + +class TestMorseDecoderThread: + def test_thread_stops_on_event(self): + """Thread should exit when stop_event is set.""" + import io + # Create a fake stdout that blocks until stop + stop = threading.Event() + q = queue.Queue(maxsize=100) + + # Feed some audio then close + audio = generate_morse_audio('E', wpm=15) + fake_stdout = io.BytesIO(audio) + + t = threading.Thread( + target=morse_decoder_thread, + args=(fake_stdout, q, stop), + ) + t.daemon = True + t.start() + t.join(timeout=5) + assert not t.is_alive(), "Thread should finish after reading all data" + + def test_thread_produces_events(self): + """Thread should push character events to the queue.""" + import io + from unittest.mock import patch + stop = threading.Event() + q = queue.Queue(maxsize=1000) + + # Generate audio with pre-warmed decoder in mind + # The thread creates a fresh decoder, so generate lots of audio + audio = generate_silence(0.5) + generate_morse_audio('SOS', wpm=10) + generate_silence(1.0) + fake_stdout = io.BytesIO(audio) + + # Patch SCOPE_INTERVAL to 0 so scope events aren't throttled in fast reads + with patch('utils.morse.time') as mock_time: + # Make monotonic() always return increasing values + counter = [0.0] + def fake_monotonic(): + counter[0] += 0.15 # each call advances 150ms + return counter[0] + mock_time.monotonic = fake_monotonic + + t = threading.Thread( + target=morse_decoder_thread, + args=(fake_stdout, q, stop), + ) + t.daemon = True + t.start() + t.join(timeout=10) + + events = [] + while not q.empty(): + events.append(q.get_nowait()) + + # Should have at least some events (scope or char) + assert len(events) > 0, "Expected events from thread" + + +# --------------------------------------------------------------------------- +# Route tests +# --------------------------------------------------------------------------- + +class TestMorseRoutes: + def test_start_missing_required_fields(self, client): + """Start should succeed with defaults.""" + _login_session(client) + with pytest.MonkeyPatch.context() as m: + m.setattr('app.morse_process', None) + # Should fail because rtl_fm won't be found in test env + resp = client.post('/morse/start', json={'frequency': '14.060'}) + assert resp.status_code in (200, 400, 409, 500) + + def test_stop_when_not_running(self, client): + """Stop when nothing is running should return not_running.""" + _login_session(client) + with pytest.MonkeyPatch.context() as m: + m.setattr('app.morse_process', None) + resp = client.post('/morse/stop') + data = resp.get_json() + assert data['status'] == 'not_running' + + def test_status_when_not_running(self, client): + """Status should report not running.""" + _login_session(client) + with pytest.MonkeyPatch.context() as m: + m.setattr('app.morse_process', None) + resp = client.get('/morse/status') + data = resp.get_json() + assert data['running'] is False + + def test_invalid_tone_freq(self, client): + """Tone frequency outside range should be rejected.""" + _login_session(client) + with pytest.MonkeyPatch.context() as m: + m.setattr('app.morse_process', None) + resp = client.post('/morse/start', json={ + 'frequency': '14.060', + 'tone_freq': '50', # too low + }) + assert resp.status_code == 400 + + def test_invalid_wpm(self, client): + """WPM outside range should be rejected.""" + _login_session(client) + with pytest.MonkeyPatch.context() as m: + m.setattr('app.morse_process', None) + resp = client.post('/morse/start', json={ + 'frequency': '14.060', + 'wpm': '100', # too high + }) + assert resp.status_code == 400 + + def test_stream_endpoint_exists(self, client): + """Stream endpoint should return SSE content type.""" + _login_session(client) + resp = client.get('/morse/stream') + assert resp.content_type.startswith('text/event-stream') diff --git a/utils/morse.py b/utils/morse.py new file mode 100644 index 0000000..cd354f3 --- /dev/null +++ b/utils/morse.py @@ -0,0 +1,276 @@ +"""Morse code (CW) decoder using Goertzel tone detection. + +Signal chain: rtl_fm -M usb → raw PCM → Goertzel filter → timing state machine → characters. +""" + +from __future__ import annotations + +import contextlib +import math +import queue +import struct +import threading +import time +from datetime import datetime +from typing import Any + +# International Morse Code table +MORSE_TABLE: dict[str, str] = { + '.-': 'A', '-...': 'B', '-.-.': 'C', '-..': 'D', '.': 'E', + '..-.': 'F', '--.': 'G', '....': 'H', '..': 'I', '.---': 'J', + '-.-': 'K', '.-..': 'L', '--': 'M', '-.': 'N', '---': 'O', + '.--.': 'P', '--.-': 'Q', '.-.': 'R', '...': 'S', '-': 'T', + '..-': 'U', '...-': 'V', '.--': 'W', '-..-': 'X', '-.--': 'Y', + '--..': 'Z', + '-----': '0', '.----': '1', '..---': '2', '...--': '3', + '....-': '4', '.....': '5', '-....': '6', '--...': '7', + '---..': '8', '----.': '9', + '.-.-.-': '.', '--..--': ',', '..--..': '?', '.----.': "'", + '-.-.--': '!', '-..-.': '/', '-.--.': '(', '-.--.-': ')', + '.-...': '&', '---...': ':', '-.-.-.': ';', '-...-': '=', + '.-.-.': '+', '-....-': '-', '..--.-': '_', '.-..-.': '"', + '...-..-': '$', '.--.-.': '@', + # Prosigns (unique codes only; -...- and -.--.- already mapped above) + '-.-.-': '', '.-.-': '', '...-.-': '', +} + +# Reverse lookup: character → morse notation +CHAR_TO_MORSE: dict[str, str] = {v: k for k, v in MORSE_TABLE.items()} + + +class GoertzelFilter: + """Single-frequency tone detector using the Goertzel algorithm. + + O(N) per block, much cheaper than FFT for detecting one frequency. + """ + + def __init__(self, target_freq: float, sample_rate: int, block_size: int): + self.target_freq = target_freq + self.sample_rate = sample_rate + self.block_size = block_size + # Precompute coefficient + k = round(target_freq * block_size / sample_rate) + omega = 2.0 * math.pi * k / block_size + self.coeff = 2.0 * math.cos(omega) + + def magnitude(self, samples: list[float] | tuple[float, ...]) -> float: + """Compute magnitude of the target frequency in the sample block.""" + s0 = 0.0 + s1 = 0.0 + s2 = 0.0 + coeff = self.coeff + for sample in samples: + s0 = sample + coeff * s1 - s2 + s2 = s1 + s1 = s0 + return math.sqrt(s1 * s1 + s2 * s2 - coeff * s1 * s2) + + +class MorseDecoder: + """Real-time Morse decoder with adaptive threshold. + + Processes blocks of PCM audio and emits decoded characters. + Timing based on PARIS standard: dit = 1.2/WPM seconds. + """ + + def __init__( + self, + sample_rate: int = 8000, + tone_freq: float = 700.0, + wpm: int = 15, + ): + self.sample_rate = sample_rate + self.tone_freq = tone_freq + self.wpm = wpm + + # Goertzel filter: ~50 blocks/sec at 8kHz + self._block_size = sample_rate // 50 + self._filter = GoertzelFilter(tone_freq, sample_rate, self._block_size) + self._block_duration = self._block_size / sample_rate # seconds per block + + # Timing thresholds (in blocks, converted from seconds) + dit_sec = 1.2 / wpm + self._dah_threshold = 2.0 * dit_sec / self._block_duration # blocks + self._dit_min = 0.3 * dit_sec / self._block_duration # min blocks for dit + self._char_gap = 3.0 * dit_sec / self._block_duration # blocks + self._word_gap = 7.0 * dit_sec / self._block_duration # blocks + + # Adaptive threshold via EMA + self._noise_floor = 0.0 + self._signal_peak = 0.0 + self._threshold = 0.0 + self._ema_alpha = 0.1 # smoothing factor + + # State machine (counts in blocks, not wall-clock time) + self._tone_on = False + self._tone_blocks = 0 # blocks since tone started + self._silence_blocks = 0 # blocks since silence started + self._current_symbol = '' # accumulates dits/dahs for current char + self._pending_buffer: list[float] = [] + self._blocks_processed = 0 # total blocks for warm-up tracking + + def process_block(self, pcm_bytes: bytes) -> list[dict[str, Any]]: + """Process a chunk of 16-bit LE PCM and return decoded events. + + Returns list of event dicts with keys: + type: 'scope' | 'morse_char' | 'morse_space' + + type-specific fields + """ + events: list[dict[str, Any]] = [] + + # Unpack PCM samples + n_samples = len(pcm_bytes) // 2 + if n_samples == 0: + return events + + samples = struct.unpack(f'<{n_samples}h', pcm_bytes[:n_samples * 2]) + + # Feed samples into pending buffer and process in blocks + self._pending_buffer.extend(samples) + + amplitudes: list[float] = [] + + while len(self._pending_buffer) >= self._block_size: + block = self._pending_buffer[:self._block_size] + self._pending_buffer = self._pending_buffer[self._block_size:] + + # Normalize to [-1, 1] + normalized = [s / 32768.0 for s in block] + mag = self._filter.magnitude(normalized) + amplitudes.append(mag) + + self._blocks_processed += 1 + + # Update adaptive threshold + if mag < self._threshold or self._threshold == 0: + self._noise_floor += self._ema_alpha * (mag - self._noise_floor) + else: + self._signal_peak += self._ema_alpha * (mag - self._signal_peak) + + self._threshold = (self._noise_floor + self._signal_peak) / 2.0 + + tone_detected = mag > self._threshold and self._threshold > 0 + + if tone_detected and not self._tone_on: + # Tone just started - check silence duration for gaps + self._tone_on = True + silence_count = self._silence_blocks + self._tone_blocks = 0 + + if self._current_symbol and silence_count >= self._char_gap: + # Character gap - decode accumulated symbol + char = MORSE_TABLE.get(self._current_symbol) + if char: + events.append({ + 'type': 'morse_char', + 'char': char, + 'morse': self._current_symbol, + 'timestamp': datetime.now().strftime('%H:%M:%S'), + }) + + if silence_count >= self._word_gap: + events.append({ + 'type': 'morse_space', + 'timestamp': datetime.now().strftime('%H:%M:%S'), + }) + + self._current_symbol = '' + + elif not tone_detected and self._tone_on: + # Tone just ended - classify as dit or dah + self._tone_on = False + tone_count = self._tone_blocks + self._silence_blocks = 0 + + if tone_count >= self._dah_threshold: + self._current_symbol += '-' + elif tone_count >= self._dit_min: + self._current_symbol += '.' + + elif tone_detected and self._tone_on: + self._tone_blocks += 1 + + elif not tone_detected and not self._tone_on: + self._silence_blocks += 1 + + # Emit scope data for visualization (~10 Hz is handled by caller) + if amplitudes: + events.append({ + 'type': 'scope', + 'amplitudes': amplitudes, + 'threshold': self._threshold, + 'tone_on': self._tone_on, + }) + + return events + + def flush(self) -> list[dict[str, Any]]: + """Flush any pending symbol at end of stream.""" + events: list[dict[str, Any]] = [] + if self._current_symbol: + char = MORSE_TABLE.get(self._current_symbol) + if char: + events.append({ + 'type': 'morse_char', + 'char': char, + 'morse': self._current_symbol, + 'timestamp': datetime.now().strftime('%H:%M:%S'), + }) + self._current_symbol = '' + return events + + +def morse_decoder_thread( + rtl_stdout, + output_queue: queue.Queue, + stop_event: threading.Event, + sample_rate: int = 8000, + tone_freq: float = 700.0, + wpm: int = 15, +) -> None: + """Thread function: reads PCM from rtl_fm, decodes Morse, pushes to queue. + + Reads raw 16-bit LE PCM from *rtl_stdout* and feeds it through the + MorseDecoder, pushing scope and character events onto *output_queue*. + """ + import logging + logger = logging.getLogger('intercept.morse') + + CHUNK = 4096 # bytes per read (2048 samples at 16-bit mono) + SCOPE_INTERVAL = 0.1 # scope updates at ~10 Hz + last_scope = time.monotonic() + + decoder = MorseDecoder( + sample_rate=sample_rate, + tone_freq=tone_freq, + wpm=wpm, + ) + + try: + while not stop_event.is_set(): + data = rtl_stdout.read(CHUNK) + if not data: + break + + events = decoder.process_block(data) + + for event in events: + if event['type'] == 'scope': + # Throttle scope events to ~10 Hz + now = time.monotonic() + if now - last_scope >= SCOPE_INTERVAL: + last_scope = now + with contextlib.suppress(queue.Full): + output_queue.put_nowait(event) + else: + # Character and space events always go through + with contextlib.suppress(queue.Full): + output_queue.put_nowait(event) + + except Exception as e: + logger.debug(f"Morse decoder thread error: {e}") + finally: + # Flush any pending symbol + for event in decoder.flush(): + with contextlib.suppress(queue.Full): + output_queue.put_nowait(event)